Loading src/kpi_manager/service/NameMapping.py 0 → 100644 +46 −0 Original line number Original line Diff line number Diff line # Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import threading from typing import Dict, Optional class NameMapping: def __init__(self) -> None: self.__lock = threading.Lock() self.__device_to_name : Dict[str, str] = dict() self.__endpoint_to_name : Dict[str, str] = dict() def get_device_name(self, device_uuid : str) -> Optional[str]: with self.__lock: return self.__device_to_name.get(device_uuid) def get_endpoint_name(self, endpoint_uuid : str) -> Optional[str]: with self.__lock: return self.__endpoint_to_name.get(endpoint_uuid) def set_device_name(self, device_uuid : str, device_name : str) -> None: with self.__lock: self.__device_to_name[device_uuid] = device_name def set_endpoint_name(self, endpoint_uuid : str, endpoint_name : str) -> None: with self.__lock: self.__endpoint_to_name[endpoint_uuid] = endpoint_name def delete_device_name(self, device_uuid : str) -> None: with self.__lock: self.__device_to_name.pop(device_uuid, None) def delete_endpoint_name(self, endpoint_uuid : str) -> None: with self.__lock: self.__endpoint_to_name.pop(endpoint_uuid, None) src/kpi_manager/service/__main__.py +30 −30 Original line number Original line Diff line number Diff line Loading @@ -30,34 +30,34 @@ def signal_handler(signal, frame): # pylint: disable=redefined-outer-name LOGGER.warning('Terminate signal received') LOGGER.warning('Terminate signal received') terminate.set() terminate.set() def start_kpi_manager(name_mapping : NameMapping): # def start_kpi_manager(name_mapping : NameMapping): LOGGER.info('Start Kpi Manager...',) # LOGGER.info('Start Kpi Manager...',) events_collector = EventsDeviceCollector(name_mapping) # events_collector = EventsDeviceCollector(name_mapping) events_collector.start() # events_collector.start() # TODO: redesign this method to be more clear and clean # # TODO: redesign this method to be more clear and clean # Iterate while terminate is not set # # Iterate while terminate is not set while not terminate.is_set(): # while not terminate.is_set(): list_new_kpi_ids = events_collector.listen_events() # list_new_kpi_ids = events_collector.listen_events() # Monitor Kpis # # Monitor Kpis if bool(list_new_kpi_ids): # if bool(list_new_kpi_ids): for kpi_id in list_new_kpi_ids: # for kpi_id in list_new_kpi_ids: # Create Monitor Kpi Requests # # Create Monitor Kpi Requests monitor_kpi_request = monitoring_pb2.MonitorKpiRequest() # monitor_kpi_request = monitoring_pb2.MonitorKpiRequest() monitor_kpi_request.kpi_id.CopyFrom(kpi_id) # monitor_kpi_request.kpi_id.CopyFrom(kpi_id) monitor_kpi_request.monitoring_window_s = 86400 # monitor_kpi_request.monitoring_window_s = 86400 monitor_kpi_request.sampling_rate_s = 10 # monitor_kpi_request.sampling_rate_s = 10 events_collector._monitoring_client.MonitorKpi(monitor_kpi_request) # events_collector._monitoring_client.MonitorKpi(monitor_kpi_request) time.sleep(0.5) # let other tasks run; do not overload CPU # time.sleep(0.5) # let other tasks run; do not overload CPU else: # else: # Terminate is set, looping terminates # # Terminate is set, looping terminates LOGGER.warning("Stopping execution...") # LOGGER.warning("Stopping execution...") events_collector.start() # events_collector.start() def main(): def main(): global LOGGER # pylint: disable=global-statement global LOGGER # pylint: disable=global-statement Loading @@ -76,7 +76,7 @@ def main(): signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) signal.signal(signal.SIGTERM, signal_handler) LOGGER.info('Starting...') LOGGER.debug('Starting...') # Start metrics server # Start metrics server metrics_port = get_metrics_port() metrics_port = get_metrics_port() Loading @@ -91,15 +91,15 @@ def main(): grpc_service = KpiManagerService(name_mapping) grpc_service = KpiManagerService(name_mapping) grpc_service.start() grpc_service.start() start_kpi_manager(name_mapping) # start_kpi_manager(name_mapping) # Wait for Ctrl+C or termination signal # Wait for Ctrl+C or termination signal while not terminate.wait(timeout=1.0): pass while not terminate.wait(timeout=1.0): pass LOGGER.info('Terminating...') LOGGER.debug('Terminating...') grpc_service.stop() grpc_service.stop() LOGGER.info('Bye') LOGGER.debug('Bye') return 0 return 0 if __name__ == '__main__': if __name__ == '__main__': Loading src/kpi_value_api/tests/messages.py +2 −2 Original line number Original line Diff line number Diff line Loading @@ -20,8 +20,8 @@ def create_kpi_value_list(): _create_kpi_value_list = KpiValueList() _create_kpi_value_list = KpiValueList() # To run this experiment sucessfully, already existing UUID in KPI DB in necessary. # To run this experiment sucessfully, already existing UUID in KPI DB in necessary. # because the UUID is used to get the descriptor form KPI DB. # because the UUID is used to get the descriptor form KPI DB. EXISTING_KPI_IDs = ["198a5a83-ddd3-4818-bdcb-e468eda03e18", EXISTING_KPI_IDs = ["725ce3ad-ac67-4373-bd35-8cd9d6a86e09", "c288ea27-db40-419e-81d3-f675df22c8f4", str(uuid.uuid4()), str(uuid.uuid4())] str(uuid.uuid4())] for kpi_id_uuid in EXISTING_KPI_IDs: for kpi_id_uuid in EXISTING_KPI_IDs: Loading src/kpi_value_writer/service/KpiValueWriter.py +0 −32 Original line number Original line Diff line number Diff line Loading @@ -93,35 +93,3 @@ class KpiValueWriter: LOGGER.debug("Error in extracting row {:}".format(kpi_descriptor_object)) LOGGER.debug("Error in extracting row {:}".format(kpi_descriptor_object)) except Exception as e: except Exception as e: print ("Unable to get Descriptor. Error: {:}".format(e)) print ("Unable to get Descriptor. Error: {:}".format(e)) def kpi_manager_service(): LOGGER.info('Initializing KpiManagerService...') name_mapping = NameMapping() # _service = MonitoringService(name_mapping) _service = KpiManagerService(name_mapping) _service.start() # yield the server, when test finishes, execution will resume to stop it LOGGER.info('Yielding KpiManagerService...') yield _service LOGGER.info('Terminating KpiManagerService...') _service.stop() LOGGER.info('Terminated KpiManagerService...') def kpi_manager_client_a(kpi_manager_service : KpiManagerService): # pylint: disable=redefined-outer-name,unused-argument LOGGER.info('Initializing KpiManagerClient...') _client = KpiManagerClient() # yield the server, when test finishes, execution will resume to stop it LOGGER.info('Yielding KpiManagerClient...') yield _client LOGGER.info('Closing KpiManagerClient...') _client.close() LOGGER.info('Closed KpiManagerClient...') No newline at end of file src/kpi_value_writer/service/NameMapping.py 0 → 100644 +46 −0 Original line number Original line Diff line number Diff line # Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import threading from typing import Dict, Optional class NameMapping: def __init__(self) -> None: self.__lock = threading.Lock() self.__device_to_name : Dict[str, str] = dict() self.__endpoint_to_name : Dict[str, str] = dict() def get_device_name(self, device_uuid : str) -> Optional[str]: with self.__lock: return self.__device_to_name.get(device_uuid) def get_endpoint_name(self, endpoint_uuid : str) -> Optional[str]: with self.__lock: return self.__endpoint_to_name.get(endpoint_uuid) def set_device_name(self, device_uuid : str, device_name : str) -> None: with self.__lock: self.__device_to_name[device_uuid] = device_name def set_endpoint_name(self, endpoint_uuid : str, endpoint_name : str) -> None: with self.__lock: self.__endpoint_to_name[endpoint_uuid] = endpoint_name def delete_device_name(self, device_uuid : str) -> None: with self.__lock: self.__device_to_name.pop(device_uuid, None) def delete_endpoint_name(self, endpoint_uuid : str) -> None: with self.__lock: self.__endpoint_to_name.pop(endpoint_uuid, None) Loading
src/kpi_manager/service/NameMapping.py 0 → 100644 +46 −0 Original line number Original line Diff line number Diff line # Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import threading from typing import Dict, Optional class NameMapping: def __init__(self) -> None: self.__lock = threading.Lock() self.__device_to_name : Dict[str, str] = dict() self.__endpoint_to_name : Dict[str, str] = dict() def get_device_name(self, device_uuid : str) -> Optional[str]: with self.__lock: return self.__device_to_name.get(device_uuid) def get_endpoint_name(self, endpoint_uuid : str) -> Optional[str]: with self.__lock: return self.__endpoint_to_name.get(endpoint_uuid) def set_device_name(self, device_uuid : str, device_name : str) -> None: with self.__lock: self.__device_to_name[device_uuid] = device_name def set_endpoint_name(self, endpoint_uuid : str, endpoint_name : str) -> None: with self.__lock: self.__endpoint_to_name[endpoint_uuid] = endpoint_name def delete_device_name(self, device_uuid : str) -> None: with self.__lock: self.__device_to_name.pop(device_uuid, None) def delete_endpoint_name(self, endpoint_uuid : str) -> None: with self.__lock: self.__endpoint_to_name.pop(endpoint_uuid, None)
src/kpi_manager/service/__main__.py +30 −30 Original line number Original line Diff line number Diff line Loading @@ -30,34 +30,34 @@ def signal_handler(signal, frame): # pylint: disable=redefined-outer-name LOGGER.warning('Terminate signal received') LOGGER.warning('Terminate signal received') terminate.set() terminate.set() def start_kpi_manager(name_mapping : NameMapping): # def start_kpi_manager(name_mapping : NameMapping): LOGGER.info('Start Kpi Manager...',) # LOGGER.info('Start Kpi Manager...',) events_collector = EventsDeviceCollector(name_mapping) # events_collector = EventsDeviceCollector(name_mapping) events_collector.start() # events_collector.start() # TODO: redesign this method to be more clear and clean # # TODO: redesign this method to be more clear and clean # Iterate while terminate is not set # # Iterate while terminate is not set while not terminate.is_set(): # while not terminate.is_set(): list_new_kpi_ids = events_collector.listen_events() # list_new_kpi_ids = events_collector.listen_events() # Monitor Kpis # # Monitor Kpis if bool(list_new_kpi_ids): # if bool(list_new_kpi_ids): for kpi_id in list_new_kpi_ids: # for kpi_id in list_new_kpi_ids: # Create Monitor Kpi Requests # # Create Monitor Kpi Requests monitor_kpi_request = monitoring_pb2.MonitorKpiRequest() # monitor_kpi_request = monitoring_pb2.MonitorKpiRequest() monitor_kpi_request.kpi_id.CopyFrom(kpi_id) # monitor_kpi_request.kpi_id.CopyFrom(kpi_id) monitor_kpi_request.monitoring_window_s = 86400 # monitor_kpi_request.monitoring_window_s = 86400 monitor_kpi_request.sampling_rate_s = 10 # monitor_kpi_request.sampling_rate_s = 10 events_collector._monitoring_client.MonitorKpi(monitor_kpi_request) # events_collector._monitoring_client.MonitorKpi(monitor_kpi_request) time.sleep(0.5) # let other tasks run; do not overload CPU # time.sleep(0.5) # let other tasks run; do not overload CPU else: # else: # Terminate is set, looping terminates # # Terminate is set, looping terminates LOGGER.warning("Stopping execution...") # LOGGER.warning("Stopping execution...") events_collector.start() # events_collector.start() def main(): def main(): global LOGGER # pylint: disable=global-statement global LOGGER # pylint: disable=global-statement Loading @@ -76,7 +76,7 @@ def main(): signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) signal.signal(signal.SIGTERM, signal_handler) LOGGER.info('Starting...') LOGGER.debug('Starting...') # Start metrics server # Start metrics server metrics_port = get_metrics_port() metrics_port = get_metrics_port() Loading @@ -91,15 +91,15 @@ def main(): grpc_service = KpiManagerService(name_mapping) grpc_service = KpiManagerService(name_mapping) grpc_service.start() grpc_service.start() start_kpi_manager(name_mapping) # start_kpi_manager(name_mapping) # Wait for Ctrl+C or termination signal # Wait for Ctrl+C or termination signal while not terminate.wait(timeout=1.0): pass while not terminate.wait(timeout=1.0): pass LOGGER.info('Terminating...') LOGGER.debug('Terminating...') grpc_service.stop() grpc_service.stop() LOGGER.info('Bye') LOGGER.debug('Bye') return 0 return 0 if __name__ == '__main__': if __name__ == '__main__': Loading
src/kpi_value_api/tests/messages.py +2 −2 Original line number Original line Diff line number Diff line Loading @@ -20,8 +20,8 @@ def create_kpi_value_list(): _create_kpi_value_list = KpiValueList() _create_kpi_value_list = KpiValueList() # To run this experiment sucessfully, already existing UUID in KPI DB in necessary. # To run this experiment sucessfully, already existing UUID in KPI DB in necessary. # because the UUID is used to get the descriptor form KPI DB. # because the UUID is used to get the descriptor form KPI DB. EXISTING_KPI_IDs = ["198a5a83-ddd3-4818-bdcb-e468eda03e18", EXISTING_KPI_IDs = ["725ce3ad-ac67-4373-bd35-8cd9d6a86e09", "c288ea27-db40-419e-81d3-f675df22c8f4", str(uuid.uuid4()), str(uuid.uuid4())] str(uuid.uuid4())] for kpi_id_uuid in EXISTING_KPI_IDs: for kpi_id_uuid in EXISTING_KPI_IDs: Loading
src/kpi_value_writer/service/KpiValueWriter.py +0 −32 Original line number Original line Diff line number Diff line Loading @@ -93,35 +93,3 @@ class KpiValueWriter: LOGGER.debug("Error in extracting row {:}".format(kpi_descriptor_object)) LOGGER.debug("Error in extracting row {:}".format(kpi_descriptor_object)) except Exception as e: except Exception as e: print ("Unable to get Descriptor. Error: {:}".format(e)) print ("Unable to get Descriptor. Error: {:}".format(e)) def kpi_manager_service(): LOGGER.info('Initializing KpiManagerService...') name_mapping = NameMapping() # _service = MonitoringService(name_mapping) _service = KpiManagerService(name_mapping) _service.start() # yield the server, when test finishes, execution will resume to stop it LOGGER.info('Yielding KpiManagerService...') yield _service LOGGER.info('Terminating KpiManagerService...') _service.stop() LOGGER.info('Terminated KpiManagerService...') def kpi_manager_client_a(kpi_manager_service : KpiManagerService): # pylint: disable=redefined-outer-name,unused-argument LOGGER.info('Initializing KpiManagerClient...') _client = KpiManagerClient() # yield the server, when test finishes, execution will resume to stop it LOGGER.info('Yielding KpiManagerClient...') yield _client LOGGER.info('Closing KpiManagerClient...') _client.close() LOGGER.info('Closed KpiManagerClient...') No newline at end of file
src/kpi_value_writer/service/NameMapping.py 0 → 100644 +46 −0 Original line number Original line Diff line number Diff line # Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import threading from typing import Dict, Optional class NameMapping: def __init__(self) -> None: self.__lock = threading.Lock() self.__device_to_name : Dict[str, str] = dict() self.__endpoint_to_name : Dict[str, str] = dict() def get_device_name(self, device_uuid : str) -> Optional[str]: with self.__lock: return self.__device_to_name.get(device_uuid) def get_endpoint_name(self, endpoint_uuid : str) -> Optional[str]: with self.__lock: return self.__endpoint_to_name.get(endpoint_uuid) def set_device_name(self, device_uuid : str, device_name : str) -> None: with self.__lock: self.__device_to_name[device_uuid] = device_name def set_endpoint_name(self, endpoint_uuid : str, endpoint_name : str) -> None: with self.__lock: self.__endpoint_to_name[endpoint_uuid] = endpoint_name def delete_device_name(self, device_uuid : str) -> None: with self.__lock: self.__device_to_name.pop(device_uuid, None) def delete_endpoint_name(self, endpoint_uuid : str) -> None: with self.__lock: self.__endpoint_to_name.pop(endpoint_uuid, None)