diff --git a/src/monitoring/service/EventTools.py b/src/monitoring/service/EventTools.py index 221a0ddbfdbb65b1a908e134cc25f55e235b7564..cdf8afc04c8c714bcdc2517e13063d75bc837df2 100644 --- a/src/monitoring/service/EventTools.py +++ b/src/monitoring/service/EventTools.py @@ -12,18 +12,27 @@ # See the License for the specific language governing permissions and # limitations under the License. +from typing import Dict import grpc, logging, queue, threading from common.method_wrappers.ServiceExceptions import ServiceException from common.proto import monitoring_pb2 -from common.proto.context_pb2 import Empty, EventTypeEnum +from common.proto.context_pb2 import DeviceOperationalStatusEnum, Empty, EventTypeEnum +from common.proto.kpi_sample_types_pb2 import KpiSampleType from context.client.ContextClient import ContextClient from monitoring.client.MonitoringClient import MonitoringClient from monitoring.service.MonitoringServiceServicerImpl import LOGGER +from monitoring.service.NameMapping import NameMapping LOGGER = logging.getLogger(__name__) +DEVICE_OP_STATUS_UNDEFINED = DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_UNDEFINED +DEVICE_OP_STATUS_DISABLED = DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_DISABLED +DEVICE_OP_STATUS_ENABLED = DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_ENABLED +DEVICE_OP_STATUS_NOT_ENABLED = {DEVICE_OP_STATUS_UNDEFINED, DEVICE_OP_STATUS_DISABLED} +KPISAMPLETYPE_UNKNOWN = KpiSampleType.KPISAMPLETYPE_UNKNOWN + class EventsDeviceCollector: - def __init__(self) -> None: # pylint: disable=redefined-outer-name + def __init__(self, name_mapping : NameMapping) -> None: # pylint: disable=redefined-outer-name self._events_queue = queue.Queue() self._context_client_grpc = ContextClient() @@ -34,6 +43,9 @@ class EventsDeviceCollector: self._device_thread = threading.Thread(target=self._collect, args=(self._device_stream,), daemon=False) + self._device_to_state : Dict[str, DeviceOperationalStatusEnum] = dict() + self._name_mapping = 
name_mapping + def grpc_server_on(self): try: grpc.channel_ready_future(self._channel).result(timeout=15) @@ -52,7 +64,7 @@ class EventsDeviceCollector: def start(self): try: self._device_thread.start() - except RuntimeError as e: + except RuntimeError: LOGGER.exception('Start EventTools exception') def get_event(self, block : bool = True, timeout : float = 0.1): @@ -71,29 +83,51 @@ class EventsDeviceCollector: try: event = self.get_event(block=True, timeout=0.5) - if event.event.event_type == EventTypeEnum.EVENTTYPE_CREATE: - device = self._context_client.GetDevice(event.device_id) - for j,end_point in enumerate(device.device_endpoints): - #for i, value in enumerate(kpi_sample_types_pb2.KpiSampleType.values()): - for i, value in enumerate(end_point.kpi_sample_types): - #if value == kpi_sample_types_pb2.KpiSampleType.KPISAMPLETYPE_UNKNOWN: continue + event_type = event.event.event_type + device_uuid = event.device_id.device_uuid.uuid + if event_type in {EventTypeEnum.EVENTTYPE_REMOVE}: + self._device_to_state.pop(device_uuid, None) + continue + + if event_type not in {EventTypeEnum.EVENTTYPE_CREATE, EventTypeEnum.EVENTTYPE_UPDATE}: + # Unknown event type + continue + + device = self._context_client.GetDevice(event.device_id) + self._name_mapping.set_device_name(device_uuid, device.name) + + old_operational_status = self._device_to_state.get(device_uuid, DEVICE_OP_STATUS_UNDEFINED) + device_was_not_enabled = (old_operational_status in DEVICE_OP_STATUS_NOT_ENABLED) - kpi_descriptor = monitoring_pb2.KpiDescriptor() + new_operational_status = device.device_operational_status + device_is_enabled = (new_operational_status == DEVICE_OP_STATUS_ENABLED) + self._device_to_state[device_uuid] = new_operational_status - kpi_descriptor.kpi_description = device.device_type - kpi_descriptor.kpi_sample_type = value - #kpi_descriptor.service_id.service_uuid.uuid = "" - kpi_descriptor.device_id.CopyFrom(device.device_id) - kpi_descriptor.endpoint_id.CopyFrom(end_point.endpoint_id) + 
activate_monitoring = device_was_not_enabled and device_is_enabled + if not activate_monitoring: + # device is not ready for monitoring + continue - kpi_id = self._monitoring_client.SetKpi(kpi_descriptor) - kpi_id_list.append(kpi_id) + for endpoint in device.device_endpoints: + endpoint_uuid = endpoint.endpoint_id.endpoint_uuid.uuid + self._name_mapping.set_endpoint_name(endpoint_uuid, endpoint.name) + + for value in endpoint.kpi_sample_types: + if value == KPISAMPLETYPE_UNKNOWN: continue + + kpi_descriptor = monitoring_pb2.KpiDescriptor() + kpi_descriptor.kpi_description = device.device_type + kpi_descriptor.kpi_sample_type = value + kpi_descriptor.device_id.CopyFrom(device.device_id) # pylint: disable=no-member + kpi_descriptor.endpoint_id.CopyFrom(endpoint.endpoint_id) # pylint: disable=no-member + + kpi_id = self._monitoring_client.SetKpi(kpi_descriptor) + kpi_id_list.append(kpi_id) except queue.Empty: break return kpi_id_list - except ServiceException as e: + except ServiceException: LOGGER.exception('ListenEvents exception') - except Exception as e: # pragma: no cover + except Exception: # pragma: no cover # pylint: disable=broad-except LOGGER.exception('ListenEvents exception') - diff --git a/src/monitoring/service/MetricsDBTools.py b/src/monitoring/service/MetricsDBTools.py index 1d3888d5348bdbe2995f077310ca448827290382..a386d5f184694c87493681e7a31f7bc06301e50d 100644 --- a/src/monitoring/service/MetricsDBTools.py +++ b/src/monitoring/service/MetricsDBTools.py @@ -23,14 +23,18 @@ import datetime from common.tools.timestamp.Converters import timestamp_float_to_string, timestamp_utcnow_to_float import psycopg2 +from monitoring.service.NameMapping import NameMapping + LOGGER = logging.getLogger(__name__) class MetricsDB(): - def __init__(self, host, ilp_port=9009, rest_port=9000, table="monitoring", commit_lag_ms=1000, retries=10, - postgre=False, postgre_port=8812, postgre_user='admin', postgre_password='quest'): + def __init__(self, host, name_mapping : 
NameMapping, ilp_port=9009, rest_port=9000, table="monitoring", + commit_lag_ms=1000, retries=10, postgre=False, postgre_port=8812, postgre_user='admin', + postgre_password='quest'): try: self.host = host + self.name_mapping = name_mapping self.ilp_port = int(ilp_port) self.rest_port = rest_port self.table = table @@ -85,7 +89,9 @@ class MetricsDB(): '(kpi_id SYMBOL,' \ 'kpi_sample_type SYMBOL,' \ 'device_id SYMBOL,' \ + 'device_name SYMBOL,' \ 'endpoint_id SYMBOL,' \ + 'endpoint_name SYMBOL,' \ 'service_id SYMBOL,' \ 'slice_id SYMBOL,' \ 'connection_id SYMBOL,' \ @@ -100,6 +106,9 @@ class MetricsDB(): raise Exception def write_KPI(self, time, kpi_id, kpi_sample_type, device_id, endpoint_id, service_id, slice_id, connection_id, kpi_value): + device_name = self.name_mapping.get_device_name(device_id) or '' + endpoint_name = self.name_mapping.get_endpoint_name(endpoint_id) or '' + counter = 0 while (counter < self.retries): try: @@ -110,7 +119,9 @@ class MetricsDB(): 'kpi_id': kpi_id, 'kpi_sample_type': kpi_sample_type, 'device_id': device_id, + 'device_name': device_name, 'endpoint_id': endpoint_id, + 'endpoint_name': endpoint_name, 'service_id': service_id, 'slice_id': slice_id, 'connection_id': connection_id,}, diff --git a/src/monitoring/service/MonitoringService.py b/src/monitoring/service/MonitoringService.py index e2cbe2862894aec7b571ae857ad4c4fffa3c94c6..10611768a8cc91c45637f676536d1840114d8f33 100644 --- a/src/monitoring/service/MonitoringService.py +++ b/src/monitoring/service/MonitoringService.py @@ -17,12 +17,13 @@ from common.Settings import get_service_port_grpc from common.proto.monitoring_pb2_grpc import add_MonitoringServiceServicer_to_server from common.tools.service.GenericGrpcService import GenericGrpcService from monitoring.service.MonitoringServiceServicerImpl import MonitoringServiceServicerImpl +from monitoring.service.NameMapping import NameMapping class MonitoringService(GenericGrpcService): - def __init__(self, cls_name: str = __name__) -> 
None: + def __init__(self, name_mapping : NameMapping, cls_name: str = __name__) -> None: port = get_service_port_grpc(ServiceNameEnum.MONITORING) super().__init__(port, cls_name=cls_name) - self.monitoring_servicer = MonitoringServiceServicerImpl() + self.monitoring_servicer = MonitoringServiceServicerImpl(name_mapping) def install_servicers(self): add_MonitoringServiceServicer_to_server(self.monitoring_servicer, self.server) diff --git a/src/monitoring/service/MonitoringServiceServicerImpl.py b/src/monitoring/service/MonitoringServiceServicerImpl.py index 99bed8f4d658d413b60430960c1e3ffda3cb1829..bf9e7cabdd812680a97754da80b0bcb0ba1722e3 100644 --- a/src/monitoring/service/MonitoringServiceServicerImpl.py +++ b/src/monitoring/service/MonitoringServiceServicerImpl.py @@ -34,6 +34,7 @@ from device.client.DeviceClient import DeviceClient from prometheus_client import Counter, Summary from monitoring.service.AlarmManager import AlarmManager +from monitoring.service.NameMapping import NameMapping from monitoring.service.SubscriptionManager import SubscriptionManager LOGGER = getJSONLogger('monitoringservice-server') @@ -49,14 +50,14 @@ METRICSDB_REST_PORT = os.environ.get("METRICSDB_REST_PORT") METRICSDB_TABLE = os.environ.get("METRICSDB_TABLE") class MonitoringServiceServicerImpl(MonitoringServiceServicer): - def __init__(self): + def __init__(self, name_mapping : NameMapping): LOGGER.info('Init monitoringService') # Init sqlite monitoring db self.management_db = ManagementDBTools.ManagementDB('monitoring.db') self.deviceClient = DeviceClient() - self.metrics_db = MetricsDBTools.MetricsDB(METRICSDB_HOSTNAME, METRICSDB_ILP_PORT, METRICSDB_REST_PORT, - METRICSDB_TABLE) + self.metrics_db = MetricsDBTools.MetricsDB( + METRICSDB_HOSTNAME, name_mapping, METRICSDB_ILP_PORT, METRICSDB_REST_PORT, METRICSDB_TABLE) self.subs_manager = SubscriptionManager(self.metrics_db) self.alarm_manager = AlarmManager(self.metrics_db) LOGGER.info('MetricsDB initialized') diff --git 
# Patch entry: new file src/monitoring/service/NameMapping.py (mode 100644),
# restored from its collapsed diff hunk as plain, formatted Python.
#
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import threading
from typing import Dict, Optional


class NameMapping:
    """Lock-guarded lookup tables from device/endpoint UUIDs to names.

    Two independent dictionaries are kept, one for devices and one for
    endpoints. Every read and write acquires the same ``threading.Lock``.
    """

    def __init__(self) -> None:
        """Create an empty mapping with its own lock."""
        self.__lock = threading.Lock()
        self.__device_to_name: Dict[str, str] = {}
        self.__endpoint_to_name: Dict[str, str] = {}

    def get_device_name(
        self, device_uuid: str, default: Optional[str] = None
    ) -> Optional[str]:
        """Return the name stored for ``device_uuid``.

        Args:
            device_uuid: UUID the name was registered under.
            default: Value returned when the UUID is unknown. Defaults to
                ``None``, which is what the method returned before this
                argument existed.
        """
        with self.__lock:
            return self.__device_to_name.get(device_uuid, default)

    def get_endpoint_name(
        self, endpoint_uuid: str, default: Optional[str] = None
    ) -> Optional[str]:
        """Return the name stored for ``endpoint_uuid``.

        Args:
            endpoint_uuid: UUID the name was registered under.
            default: Value returned when the UUID is unknown. Defaults to
                ``None``, which is what the method returned before this
                argument existed.
        """
        with self.__lock:
            return self.__endpoint_to_name.get(endpoint_uuid, default)

    def set_device_name(self, device_uuid: str, device_name: str) -> None:
        """Store ``device_name`` under ``device_uuid``, replacing any old value."""
        with self.__lock:
            self.__device_to_name[device_uuid] = device_name

    def set_endpoint_name(self, endpoint_uuid: str, endpoint_name: str) -> None:
        """Store ``endpoint_name`` under ``endpoint_uuid``, replacing any old value."""
        with self.__lock:
            self.__endpoint_to_name[endpoint_uuid] = endpoint_name

    def delete_device_name(self, device_uuid: str) -> None:
        """Forget the name of ``device_uuid``; unknown UUIDs are ignored."""
        with self.__lock:
            # pop() with a default so a missing key does not raise KeyError.
            self.__device_to_name.pop(device_uuid, None)

    def delete_endpoint_name(self, endpoint_uuid: str) -> None:
        """Forget the name of ``endpoint_uuid``; unknown UUIDs are ignored."""
        with self.__lock:
            # pop() with a default so a missing key does not raise KeyError.
            self.__endpoint_to_name.pop(endpoint_uuid, None)

# diff --git  (header of the next patch entry; it continues on the following line)
a/src/monitoring/service/__main__.py b/src/monitoring/service/__main__.py index 78764ea64e39c48d927901ad88e7cff569e7447b..5483bf5639dd6e4d157c9e8c35d330af492896ef 100644 --- a/src/monitoring/service/__main__.py +++ b/src/monitoring/service/__main__.py @@ -21,6 +21,7 @@ from common.Settings import ( from common.proto import monitoring_pb2 from .EventTools import EventsDeviceCollector from .MonitoringService import MonitoringService +from .NameMapping import NameMapping terminate = threading.Event() LOGGER = None @@ -29,10 +30,10 @@ def signal_handler(signal, frame): # pylint: disable=redefined-outer-name LOGGER.warning('Terminate signal received') terminate.set() -def start_monitoring(): +def start_monitoring(name_mapping : NameMapping): LOGGER.info('Start Monitoring...',) - events_collector = EventsDeviceCollector() + events_collector = EventsDeviceCollector(name_mapping) events_collector.start() # TODO: redesign this method to be more clear and clean @@ -79,11 +80,13 @@ def main(): metrics_port = get_metrics_port() start_http_server(metrics_port) + name_mapping = NameMapping() + # Starting monitoring service - grpc_service = MonitoringService() + grpc_service = MonitoringService(name_mapping) grpc_service.start() - start_monitoring() + start_monitoring(name_mapping) # Wait for Ctrl+C or termination signal while not terminate.wait(timeout=0.1): pass