# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging, queue, re, threading
from datetime import datetime
from typing import Dict
from common.orm.Database import Database
from common.orm.HighLevel import get_object
from common.orm.backend.Tools import key_to_str
from common.proto.monitoring_pb2 import Kpi
from monitoring.client.MonitoringClient import MonitoringClient
from .database.KpiModel import KpiModel
from .database.RelationModels import EndPointMonitorKpiModel
from .driver_api._Driver import _Driver

LOGGER = logging.getLogger(__name__)

# Seconds the exporter waits on the samples queue before re-checking whether
# it has been asked to terminate.
QUEUE_GET_WAIT_TIMEOUT = 0.5

# Every character that is not an ASCII letter or digit; such characters in a
# sampled resource key are replaced by '.' before building the database key.
_RE_NON_ALPHANUMERIC = re.compile(r'[^A-Za-z0-9]')


def _select_kpi_value_field(value):
    """Choose the ``kpi_value`` field name and cast callable for a sample value.

    Returns a ``(field_name, cast)`` tuple where ``field_name`` is one of
    'boolVal', 'int64Val', 'floatVal' or 'stringVal' and ``cast`` is the
    matching builtin type used to convert ``value`` before sending it.
    """
    # bool must be tested before int: bool is a subclass of int, so testing
    # int first would route every boolean sample to 'int64Val'.
    if isinstance(value, bool):
        return 'boolVal', bool
    # FIXME: uint32 used for intVal results in out of range issues. Temporarily changed to float
    #        extend the 'kpi_value' to support long integers (uint64 / int64 / ...)
    # NOTE(review): the FIXME above predates the current code, which sends
    # integers through 'int64Val'; confirm whether it can be removed.
    if isinstance(value, int):
        return 'int64Val', int
    if isinstance(value, float):
        return 'floatVal', float
    return 'stringVal', str


class MonitoringLoop:
    """Forward the samples streamed by one device driver into a shared queue.

    The state stream is requested from the driver at construction time via
    ``driver.GetState(blocking=True, terminate=...)``; a daemon thread iterates
    it once ``start()`` is called.
    """

    def __init__(self, device_uuid : str, driver : _Driver, samples_queue : queue.Queue) -> None:
        """Store the collaborators and prepare (but do not start) the collector thread.

        Args:
            device_uuid: identifier prepended to every sample put on the queue.
            driver: driver whose ``GetState`` stream is consumed.
            samples_queue: queue that receives ``(device_uuid, *sample)`` tuples.
        """
        self._device_uuid = device_uuid
        self._driver = driver
        self._samples_queue = samples_queue
        self._running = threading.Event()
        self._terminate = threading.Event()
        # The same event is handed to the driver and checked in _collect().
        self._samples_stream = self._driver.GetState(blocking=True, terminate=self._terminate)
        self._collector_thread = threading.Thread(target=self._collect, daemon=True)

    def _collect(self) -> None:
        """Thread body: tag each streamed sample with the device UUID and enqueue it."""
        for sample in self._samples_stream:
            if self._terminate.is_set(): break
            sample = (self._device_uuid, *sample)
            self._samples_queue.put_nowait(sample)

    def start(self):
        """Start the collector thread and flag the loop as running."""
        self._collector_thread.start()
        self._running.set()

    @property
    def is_running(self): 
        """True once ``start()`` has been called (the flag is not cleared by ``stop()``)."""
        return self._running.is_set()

    def stop(self):
        """Signal termination and wait for the collector thread to finish."""
        self._terminate.set()
        self._collector_thread.join()


class MonitoringLoops:
    """Manage one ``MonitoringLoop`` per device and export their samples as KPIs.

    All loops share a single samples queue. A daemon exporter thread drains
    that queue, resolves each sample to a KPI through the database, and sends
    it with the monitoring client.
    """

    def __init__(self, database : Database) -> None:
        """Create the monitoring client, shared queue and (unstarted) exporter thread.

        Args:
            database: database used to look up EndPointMonitorKpi and Kpi records.
        """
        self._monitoring_client = MonitoringClient()
        self._database = database
        self._samples_queue = queue.Queue()
        self._running = threading.Event()
        self._terminate = threading.Event()
        # Guards _device_uuid__to__monitoring_loop in add() and remove().
        self._lock = threading.Lock()
        self._device_uuid__to__monitoring_loop : Dict[str, MonitoringLoop] = {}
        self._exporter_thread = threading.Thread(target=self._export, daemon=True)

    def add(self, device_uuid : str, driver : _Driver) -> None:
        """Create and start a monitoring loop for a device.

        Does nothing when a loop for ``device_uuid`` is already registered and
        running; otherwise a new loop replaces any existing entry.
        """
        with self._lock:
            monitoring_loop = self._device_uuid__to__monitoring_loop.get(device_uuid)
            if (monitoring_loop is not None) and monitoring_loop.is_running: return
            monitoring_loop = MonitoringLoop(device_uuid, driver, self._samples_queue)
            self._device_uuid__to__monitoring_loop[device_uuid] = monitoring_loop
            monitoring_loop.start()

    def remove(self, device_uuid : str) -> None:
        """Stop (if running) and unregister the monitoring loop of a device, if any."""
        with self._lock:
            monitoring_loop = self._device_uuid__to__monitoring_loop.get(device_uuid)
            if monitoring_loop is None: return
            if monitoring_loop.is_running: monitoring_loop.stop()
            self._device_uuid__to__monitoring_loop.pop(device_uuid, None)

    def start(self):
        """Start the exporter thread."""
        self._exporter_thread.start()

    @property
    def is_running(self): 
        """True while the exporter thread is inside its export loop."""
        return self._running.is_set()

    def stop(self):
        """Signal termination and wait for the exporter thread to finish."""
        self._terminate.set()
        self._exporter_thread.join()

    def _export(self) -> None:
        """Thread body: drain the samples queue and send each sample as a Kpi.

        Samples whose EndPointMonitorKpi or Kpi record cannot be found are
        logged and skipped. Failures while building or sending a Kpi are
        logged and do not stop the loop.
        """
        if self._database is None:
            LOGGER.error('[MonitoringLoops:_export] Database not set. Terminating Exporter.')
            return

        self._running.set()
        while not self._terminate.is_set():
            # Use a timed get so the terminate flag is re-checked periodically.
            try:
                sample = self._samples_queue.get(block=True, timeout=QUEUE_GET_WAIT_TIMEOUT)
                #LOGGER.debug('[MonitoringLoops:_export] sample={:s}'.format(str(sample)))
            except queue.Empty:
                continue

            device_uuid, timestamp, endpoint_monitor_resource_key, value = sample
            endpoint_monitor_resource_key = _RE_NON_ALPHANUMERIC.sub('.', endpoint_monitor_resource_key)
            str_endpoint_monitor_kpi_key = key_to_str([device_uuid, endpoint_monitor_resource_key], separator=':')

            #db_entries = self._database.dump()
            #LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
            #for db_entry in db_entries:
            #    LOGGER.info('  [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
            #LOGGER.info('-----------------------------------------------------------')

            db_endpoint_monitor_kpi : EndPointMonitorKpiModel = get_object(
                self._database, EndPointMonitorKpiModel, str_endpoint_monitor_kpi_key, raise_if_not_found=False)
            if db_endpoint_monitor_kpi is None:
                LOGGER.warning('EndPointMonitorKpi(%s) not found', str_endpoint_monitor_kpi_key)
                continue

            str_kpi_key = db_endpoint_monitor_kpi.kpi_fk
            db_kpi : KpiModel = get_object(
                self._database, KpiModel, str_kpi_key, raise_if_not_found=False)
            if db_kpi is None:
                LOGGER.warning('Kpi(%s) not found', str_kpi_key)
                continue

            kpi_value_field_name, kpi_value_field_cast = _select_kpi_value_field(value)

            try:
                self._monitoring_client.IncludeKpi(Kpi(**{
                    'kpi_id'   : {'kpi_id': {'uuid': db_kpi.kpi_uuid}},
                    'timestamp': {'timestamp': timestamp},
                    'kpi_value': {kpi_value_field_name: kpi_value_field_cast(value)}
                }))
            except Exception: # pylint: disable=broad-except
                # Best effort: one bad sample must not stop the exporter.
                LOGGER.exception('Unable to format/send Kpi')

        self._running.clear()