Commit ff8ebacf authored by Lluis Gifre Renom
Browse files

Monitoring component:

- Added mapping of UUIDs to names for samples stored in QuestDB
- Extracted QuestDB from Monitoring component
- Improved management of device events
parent ea8e6350
Loading
Loading
Loading
Loading
+54 −20
Original line number Diff line number Diff line
@@ -12,18 +12,27 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Dict
import grpc, logging, queue, threading
from common.method_wrappers.ServiceExceptions import ServiceException
from common.proto import monitoring_pb2
from common.proto.context_pb2 import Empty, EventTypeEnum
from common.proto.context_pb2 import DeviceOperationalStatusEnum, Empty, EventTypeEnum
from common.proto.kpi_sample_types_pb2 import KpiSampleType
from context.client.ContextClient import ContextClient
from monitoring.client.MonitoringClient import MonitoringClient
from monitoring.service.MonitoringServiceServicerImpl import LOGGER
from monitoring.service.NameMapping import NameMapping

LOGGER = logging.getLogger(__name__)

DEVICE_OP_STATUS_UNDEFINED   = DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_UNDEFINED
DEVICE_OP_STATUS_DISABLED    = DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_DISABLED
DEVICE_OP_STATUS_ENABLED     = DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_ENABLED
DEVICE_OP_STATUS_NOT_ENABLED = {DEVICE_OP_STATUS_UNDEFINED, DEVICE_OP_STATUS_DISABLED}
KPISAMPLETYPE_UNKNOWN        = KpiSampleType.KPISAMPLETYPE_UNKNOWN

class EventsDeviceCollector:
    def __init__(self) -> None: # pylint: disable=redefined-outer-name
    def __init__(self, name_mapping : NameMapping) -> None: # pylint: disable=redefined-outer-name
        self._events_queue = queue.Queue()

        self._context_client_grpc = ContextClient()
@@ -34,6 +43,9 @@ class EventsDeviceCollector:

        self._device_thread   = threading.Thread(target=self._collect, args=(self._device_stream,), daemon=False)

        self._device_to_state : Dict[str, DeviceOperationalStatusEnum] = dict()
        self._name_mapping = name_mapping

    def grpc_server_on(self):
        try:
            grpc.channel_ready_future(self._channel).result(timeout=15)
@@ -52,7 +64,7 @@ class EventsDeviceCollector:
    def start(self):
        try:
            self._device_thread.start()
        except RuntimeError as e:
        except RuntimeError:
            LOGGER.exception('Start EventTools exception')

    def get_event(self, block : bool = True, timeout : float = 0.1):
@@ -71,20 +83,43 @@ class EventsDeviceCollector:
                try:
                    event = self.get_event(block=True, timeout=0.5)

                    if event.event.event_type == EventTypeEnum.EVENTTYPE_CREATE:
                    event_type = event.event.event_type
                    device_uuid = event.device_id.device_uuid.uuid
                    if event_type in {EventTypeEnum.EVENTTYPE_REMOVE}:
                        self._device_to_state.pop(device_uuid, None)
                        continue

                    if event_type not in {EventTypeEnum.EVENTTYPE_CREATE, EventTypeEnum.EVENTTYPE_UPDATE}:
                        # Unknown event type
                        continue

                    device = self._context_client.GetDevice(event.device_id)
                        for j,end_point in enumerate(device.device_endpoints):
                            #for i, value in enumerate(kpi_sample_types_pb2.KpiSampleType.values()):
                            for i, value in enumerate(end_point.kpi_sample_types):
                                #if value == kpi_sample_types_pb2.KpiSampleType.KPISAMPLETYPE_UNKNOWN: continue
                    self._name_mapping.set_device_name(device_uuid, device.name)

                                kpi_descriptor = monitoring_pb2.KpiDescriptor()
                    old_operational_status = self._device_to_state.get(device_uuid, DEVICE_OP_STATUS_UNDEFINED)
                    device_was_not_enabled = (old_operational_status in DEVICE_OP_STATUS_NOT_ENABLED)

                    new_operational_status = device.device_operational_status
                    device_is_enabled = (new_operational_status == DEVICE_OP_STATUS_ENABLED)
                    self._device_to_state[device_uuid] = new_operational_status

                    activate_monitoring = device_was_not_enabled and device_is_enabled
                    if not activate_monitoring:
                        # device is not ready for monitoring
                        continue

                    for endpoint in device.device_endpoints:
                        endpoint_uuid = endpoint.endpoint_id.endpoint_uuid.uuid
                        self._name_mapping.set_endpoint_name(endpoint_uuid, endpoint.name)

                        for value in endpoint.kpi_sample_types:
                            if value == KPISAMPLETYPE_UNKNOWN: continue

                            kpi_descriptor = monitoring_pb2.KpiDescriptor()
                            kpi_descriptor.kpi_description = device.device_type
                            kpi_descriptor.kpi_sample_type = value
                                #kpi_descriptor.service_id.service_uuid.uuid         = ""
                                kpi_descriptor.device_id.CopyFrom(device.device_id)
                                kpi_descriptor.endpoint_id.CopyFrom(end_point.endpoint_id)
                            kpi_descriptor.device_id.CopyFrom(device.device_id)         # pylint: disable=no-member
                            kpi_descriptor.endpoint_id.CopyFrom(endpoint.endpoint_id)   # pylint: disable=no-member

                            kpi_id = self._monitoring_client.SetKpi(kpi_descriptor)
                            kpi_id_list.append(kpi_id)
@@ -92,8 +127,7 @@ class EventsDeviceCollector:
                    break

            return kpi_id_list
        except ServiceException as e:
        except ServiceException:
            LOGGER.exception('ListenEvents exception')
        except Exception as e:  # pragma: no cover
        except Exception:  # pragma: no cover # pylint: disable=broad-except
            LOGGER.exception('ListenEvents exception')
+13 −2
Original line number Diff line number Diff line
@@ -23,14 +23,18 @@ import datetime
from common.tools.timestamp.Converters import timestamp_float_to_string, timestamp_utcnow_to_float
import psycopg2

from monitoring.service.NameMapping import NameMapping

LOGGER = logging.getLogger(__name__)


class MetricsDB():
    def __init__(self, host, ilp_port=9009, rest_port=9000, table="monitoring", commit_lag_ms=1000, retries=10,
                 postgre=False, postgre_port=8812, postgre_user='admin', postgre_password='quest'):
    def __init__(self, host, name_mapping : NameMapping, ilp_port=9009, rest_port=9000, table="monitoring",
                 commit_lag_ms=1000, retries=10, postgre=False, postgre_port=8812, postgre_user='admin',
                 postgre_password='quest'):
        try:
            self.host = host
            self.name_mapping = name_mapping
            self.ilp_port = int(ilp_port)
            self.rest_port = rest_port
            self.table = table
@@ -85,7 +89,9 @@ class MetricsDB():
                    '(kpi_id SYMBOL,' \
                    'kpi_sample_type SYMBOL,' \
                    'device_id SYMBOL,' \
                    'device_name SYMBOL,' \
                    'endpoint_id SYMBOL,' \
                    'endpoint_name SYMBOL,' \
                    'service_id SYMBOL,' \
                    'slice_id SYMBOL,' \
                    'connection_id SYMBOL,' \
@@ -100,6 +106,9 @@ class MetricsDB():
            raise Exception

    def write_KPI(self, time, kpi_id, kpi_sample_type, device_id, endpoint_id, service_id, slice_id, connection_id, kpi_value):
        device_name = self.name_mapping.get_device_name(device_id) or ''
        endpoint_name = self.name_mapping.get_endpoint_name(endpoint_id) or ''

        counter = 0
        while (counter < self.retries):
            try:
@@ -110,7 +119,9 @@ class MetricsDB():
                            'kpi_id': kpi_id,
                            'kpi_sample_type': kpi_sample_type,
                            'device_id': device_id,
                            'device_name': device_name,
                            'endpoint_id': endpoint_id,
                            'endpoint_name': endpoint_name,
                            'service_id': service_id,
                            'slice_id': slice_id,
                            'connection_id': connection_id,},
+3 −2
Original line number Diff line number Diff line
@@ -17,12 +17,13 @@ from common.Settings import get_service_port_grpc
from common.proto.monitoring_pb2_grpc import add_MonitoringServiceServicer_to_server
from common.tools.service.GenericGrpcService import GenericGrpcService
from monitoring.service.MonitoringServiceServicerImpl import MonitoringServiceServicerImpl
from monitoring.service.NameMapping import NameMapping

class MonitoringService(GenericGrpcService):
    def __init__(self, cls_name: str = __name__) -> None:
    def __init__(self, name_mapping : NameMapping, cls_name: str = __name__) -> None:
        port = get_service_port_grpc(ServiceNameEnum.MONITORING)
        super().__init__(port, cls_name=cls_name)
        self.monitoring_servicer = MonitoringServiceServicerImpl()
        self.monitoring_servicer = MonitoringServiceServicerImpl(name_mapping)

    def install_servicers(self):
        add_MonitoringServiceServicer_to_server(self.monitoring_servicer, self.server)
+4 −3
Original line number Diff line number Diff line
@@ -34,6 +34,7 @@ from device.client.DeviceClient import DeviceClient
from prometheus_client import Counter, Summary

from monitoring.service.AlarmManager import AlarmManager
from monitoring.service.NameMapping import NameMapping
from monitoring.service.SubscriptionManager import SubscriptionManager

LOGGER = getJSONLogger('monitoringservice-server')
@@ -49,14 +50,14 @@ METRICSDB_REST_PORT = os.environ.get("METRICSDB_REST_PORT")
METRICSDB_TABLE = os.environ.get("METRICSDB_TABLE")

class MonitoringServiceServicerImpl(MonitoringServiceServicer):
    def __init__(self):
    def __init__(self, name_mapping : NameMapping):
        LOGGER.info('Init monitoringService')

        # Init sqlite monitoring db
        self.management_db = ManagementDBTools.ManagementDB('monitoring.db')
        self.deviceClient = DeviceClient()
        self.metrics_db = MetricsDBTools.MetricsDB(METRICSDB_HOSTNAME, METRICSDB_ILP_PORT, METRICSDB_REST_PORT,
                                                   METRICSDB_TABLE)
        self.metrics_db = MetricsDBTools.MetricsDB(
            METRICSDB_HOSTNAME, name_mapping, METRICSDB_ILP_PORT, METRICSDB_REST_PORT, METRICSDB_TABLE)
        self.subs_manager = SubscriptionManager(self.metrics_db)
        self.alarm_manager = AlarmManager(self.metrics_db)
        LOGGER.info('MetricsDB initialized')
+46 −0
Original line number Diff line number Diff line
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import threading
from typing import Dict, Optional

class NameMapping:
    """Lock-guarded lookup tables from device/endpoint UUIDs to names.

    Two independent dictionaries are kept, one keyed by device UUID and one
    keyed by endpoint UUID. Every read and write is performed while holding
    a single ``threading.Lock`` shared by both tables.
    """

    def __init__(self) -> None:
        # One lock protects both dictionaries below.
        self.__lock = threading.Lock()
        self.__device_to_name   : Dict[str, str] = {}
        self.__endpoint_to_name : Dict[str, str] = {}

    # ----- Devices ------------------------------------------------------

    def set_device_name(self, device_uuid : str, device_name : str) -> None:
        """Store ``device_name`` under ``device_uuid``, replacing any previous value."""
        with self.__lock:
            self.__device_to_name[device_uuid] = device_name

    def get_device_name(self, device_uuid : str) -> Optional[str]:
        """Return the name stored for ``device_uuid``, or ``None`` if there is none."""
        with self.__lock:
            found = self.__device_to_name.get(device_uuid)
        return found

    def delete_device_name(self, device_uuid : str) -> None:
        """Drop the entry for ``device_uuid``; a missing entry is silently ignored."""
        with self.__lock:
            if device_uuid in self.__device_to_name:
                del self.__device_to_name[device_uuid]

    # ----- Endpoints ----------------------------------------------------

    def set_endpoint_name(self, endpoint_uuid : str, endpoint_name : str) -> None:
        """Store ``endpoint_name`` under ``endpoint_uuid``, replacing any previous value."""
        with self.__lock:
            self.__endpoint_to_name[endpoint_uuid] = endpoint_name

    def get_endpoint_name(self, endpoint_uuid : str) -> Optional[str]:
        """Return the name stored for ``endpoint_uuid``, or ``None`` if there is none."""
        with self.__lock:
            found = self.__endpoint_to_name.get(endpoint_uuid)
        return found

    def delete_endpoint_name(self, endpoint_uuid : str) -> None:
        """Drop the entry for ``endpoint_uuid``; a missing entry is silently ignored."""
        with self.__lock:
            if endpoint_uuid in self.__endpoint_to_name:
                del self.__endpoint_to_name[endpoint_uuid]
Loading