Scheduled maintenance on Saturday, 27 September 2025, from 07:00 AM to 4:00 PM GMT (09:00 AM to 6:00 PM CEST) - some services may be unavailable -

Skip to content
Snippets Groups Projects
Select Git revision
  • 11a69c44d3b4c00d85a83c2ad127ffe9e68a0be8
  • master default
  • feat/320-cttc-ietf-simap-basic-support-with-kafka-yang-push
  • feat/307-update-python-version-service
  • feat/292-cttc-implement-integration-test-for-ryu-openflow
  • cnit_tapi
  • feat/314-tid-new-service-for-ipowdm-configuration-fron-orchestrator-to-ipowdm-controller
  • feat/327-tid-new-service-to-ipowdm-controller-to-manage-transceivers-configuration-on-external-agent
  • cnit-p2mp-premerge
  • feat/325-tid-nbi-e2e-to-manage-e2e-path-computation
  • feat/326-tid-external-management-of-devices-telemetry-nbi
  • openroadm-flex-grid
  • feat/310-cttc-implement-nbi-connector-to-interface-with-osm-client
  • develop protected
  • feat/324-tid-nbi-ietf_l3vpn-deploy-fail
  • feat/321-add-support-for-gnmi-configuration-via-proto
  • feat/322-add-read-support-for-ipinfusion-devices-via-netconf
  • feat/323-add-support-for-restconf-protocol-in-devices
  • feat/policy-refactor
  • feat/192-cttc-implement-telemetry-backend-collector-gnmi-openconfig
  • feat/307-update-python-version
  • feat/telemetry-collector-int
  • v5.0.0 protected
  • v4.0.0 protected
  • demo-dpiab-eucnc2024
  • v3.0.0 protected
  • v2.1.0 protected
  • v2.0.0 protected
  • v1.0.0 protected
29 results

MonitoringLoops.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    MonitoringLoops.py 6.62 KiB
    # Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #      http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    import logging, queue, re, threading
    from datetime import datetime
    from typing import Dict
    from common.orm.Database import Database
    from common.orm.HighLevel import get_object
    from common.orm.backend.Tools import key_to_str
    from monitoring.client.monitoring_client import MonitoringClient
    from monitoring.proto.monitoring_pb2 import Kpi
    from .database.KpiModel import KpiModel
    from .database.RelationModels import EndPointMonitorKpiModel
    from .driver_api._Driver import _Driver
    
    # Module-level logger, named after this module.
    LOGGER = logging.getLogger(__name__)
    # Timeout (seconds) passed to the samples queue `get()` in MonitoringLoops._export; when it
    # expires with no sample, the exporter loops and re-checks its terminate flag.
    QUEUE_GET_WAIT_TIMEOUT = 0.5
    
    class MonitoringLoop:
        """Collect the state samples of one device on a background daemon thread.

        The sample stream is obtained at construction time by calling
        ``driver.GetState(blocking=True, terminate=<event>)``. Once started, every
        sample yielded by that stream is prefixed with the device UUID and pushed
        into the samples queue given to the constructor.
        """

        def __init__(self, device_uuid : str, driver : _Driver, samples_queue : queue.Queue) -> None:
            """Prepare the loop; the collector thread is created but not started.

            Args:
                device_uuid: identifier prepended to every collected sample.
                driver: driver whose ``GetState`` stream is consumed.
                samples_queue: queue receiving the ``(device_uuid, *sample)`` tuples.
            """
            self._device_uuid = device_uuid
            self._driver = driver
            self._samples_queue = samples_queue
            self._running = threading.Event()
            self._terminate = threading.Event()
            # The terminate event is handed to the driver as well.
            # NOTE(review): presumably the driver ends the stream once it is set -- confirm per driver.
            self._samples_stream = self._driver.GetState(blocking=True, terminate=self._terminate)
            self._collector_thread = threading.Thread(target=self._collect, daemon=True)

        def _collect(self) -> None:
            """Thread body: forward samples to the queue until the stream ends or termination is requested."""
            for sample in self._samples_stream:
                # Termination is only noticed here when the stream yields a sample.
                if self._terminate.is_set(): break
                sample = (self._device_uuid, *sample)
                self._samples_queue.put_nowait(sample)

        def start(self):
            """Start the collector thread and flag the loop as running."""
            self._collector_thread.start()
            self._running.set()

        @property
        def is_running(self):
            """True between a call to ``start()`` and the completion of ``stop()``."""
            return self._running.is_set()

        def stop(self):
            """Request termination, wait for the collector thread and clear the running flag.

            Calling this on a loop that was never started, or was already stopped, is a no-op
            apart from setting the terminate event.
            """
            self._terminate.set()
            # Thread.join() raises RuntimeError on a thread that was never started; only join a live one.
            if self._collector_thread.is_alive():
                self._collector_thread.join()
            # Without this, is_running would keep reporting True after the loop has been stopped.
            self._running.clear()
    
    class MonitoringLoops:
        """Registry of per-device monitoring loops plus the exporter that forwards their samples as KPIs.

        Every registered MonitoringLoop pushes ``(device_uuid, timestamp, resource_key, value)``
        tuples into a shared queue. A single daemon exporter thread drains that queue, resolves the
        KPI associated with each sample through the database, and sends the value to the
        monitoring client.
        """

        # Matches every character of a resource key that is not an ASCII letter or digit;
        # such characters are replaced by '.' before the database key is built.
        _RE_NON_ALPHANUMERIC = re.compile(r'[^A-Za-z0-9]')

        def __init__(self, monitoring_client : MonitoringClient, database : Database) -> None:
            """Create the registry; the exporter thread is created but not started.

            Args:
                monitoring_client: client whose ``IncludeKpi`` receives the exported KPIs.
                database: database used to map samples to KPIs; if None the exporter exits at once.
            """
            self._monitoring_client = monitoring_client
            self._database = database
            self._samples_queue = queue.Queue()
            self._running = threading.Event()
            self._terminate = threading.Event()
            # Guards the device_uuid -> MonitoringLoop mapping in add() and remove().
            self._lock = threading.Lock()
            self._device_uuid__to__monitoring_loop : Dict[str, MonitoringLoop] = {}
            self._exporter_thread = threading.Thread(target=self._export, daemon=True)

        def add(self, device_uuid : str, driver : _Driver) -> None:
            """Create and start a monitoring loop for the device unless one is already running."""
            with self._lock:
                monitoring_loop = self._device_uuid__to__monitoring_loop.get(device_uuid)
                if (monitoring_loop is not None) and monitoring_loop.is_running: return
                monitoring_loop = MonitoringLoop(device_uuid, driver, self._samples_queue)
                self._device_uuid__to__monitoring_loop[device_uuid] = monitoring_loop
                monitoring_loop.start()

        def remove(self, device_uuid : str) -> None:
            """Stop (if running) and forget the monitoring loop of the device; unknown devices are ignored."""
            with self._lock:
                monitoring_loop = self._device_uuid__to__monitoring_loop.get(device_uuid)
                if monitoring_loop is None: return
                if monitoring_loop.is_running: monitoring_loop.stop()
                self._device_uuid__to__monitoring_loop.pop(device_uuid, None)

        def start(self):
            """Start the exporter thread and flag the exporter as running."""
            self._exporter_thread.start()
            self._running.set()

        @property
        def is_running(self):
            """True between a call to ``start()`` and the completion of ``stop()``."""
            return self._running.is_set()

        def stop(self):
            """Request exporter termination, wait for its thread and clear the running flag.

            Only the exporter is stopped; registered per-device loops are left untouched.
            Calling this before ``start()`` or more than once does not raise.
            """
            self._terminate.set()
            # Thread.join() raises RuntimeError on a thread that was never started; only join a live one.
            if self._exporter_thread.is_alive():
                self._exporter_thread.join()
            # Without this, is_running would keep reporting True after the exporter has been stopped.
            self._running.clear()

        @staticmethod
        def _select_kpi_value_field(value):
            """Return ``(field_name, cast)`` describing how `value` is stored in the Kpi ``kpi_value``.

            bool must be tested before int: bool is a subclass of int, so testing int first
            would make the boolean branch unreachable and export booleans as floats.
            """
            if isinstance(value, bool):
                return 'boolVal', bool
            # FIXME: uint32 used for intVal results in out of range issues. Temporarily changed to float
            #        extend the 'kpi_value' to support long integers (uint64 / int64 / ...)
            if isinstance(value, (int, float)):
                return 'floatVal', float    # integers would otherwise use ('intVal', int)
            return 'stringVal', str

        def _export_sample(self, sample) -> None:
            """Resolve the KPI bound to one queued sample and send its value to the monitoring client.

            Samples whose endpoint-monitor entry or KPI cannot be found are logged and dropped;
            failures while building/sending the Kpi are logged and do not propagate.
            """
            device_uuid, timestamp, endpoint_monitor_resource_key, value = sample
            endpoint_monitor_resource_key = self._RE_NON_ALPHANUMERIC.sub('.', endpoint_monitor_resource_key)
            str_endpoint_monitor_kpi_key = key_to_str([device_uuid, endpoint_monitor_resource_key], separator=':')

            db_endpoint_monitor_kpi : EndPointMonitorKpiModel = get_object(
                self._database, EndPointMonitorKpiModel, str_endpoint_monitor_kpi_key, raise_if_not_found=False)
            if db_endpoint_monitor_kpi is None:
                LOGGER.warning('EndPointMonitorKpi({:s}) not found'.format(str_endpoint_monitor_kpi_key))
                return

            str_kpi_key = db_endpoint_monitor_kpi.kpi_fk
            db_kpi : KpiModel = get_object(
                self._database, KpiModel, str_kpi_key, raise_if_not_found=False)
            if db_kpi is None:
                LOGGER.warning('Kpi({:s}) not found'.format(str_kpi_key))
                return

            kpi_value_field_name, kpi_value_field_cast = self._select_kpi_value_field(value)

            # The cast stays inside the try so a conversion error is logged instead of killing the exporter.
            try:
                self._monitoring_client.IncludeKpi(Kpi(**{
                    'kpi_id'   : {'kpi_id': {'uuid': db_kpi.kpi_uuid}},
                    'timestamp': datetime.utcfromtimestamp(timestamp).isoformat() + 'Z',
                    'kpi_value': {kpi_value_field_name: kpi_value_field_cast(value)}
                }))
            except Exception: # pylint: disable=broad-except
                LOGGER.exception('Unable to format/send Kpi')

        def _export(self) -> None:
            """Exporter thread body: drain the samples queue until termination is requested."""
            if self._database is None:
                LOGGER.error('[MonitoringLoops:_export] Database not set. Terminating Exporter.')
                return

            while not self._terminate.is_set():
                try:
                    # Bounded wait so the terminate flag is re-checked even when no samples arrive.
                    sample = self._samples_queue.get(block=True, timeout=QUEUE_GET_WAIT_TIMEOUT)
                except queue.Empty:
                    continue
                self._export_sample(sample)