# MonitoringLoops.py
import logging, queue, re, threading
from datetime import datetime
from typing import Dict
from common.orm.Database import Database
from common.orm.HighLevel import get_object
from common.orm.backend.Tools import key_to_str
from monitoring.client.monitoring_client import MonitoringClient
from monitoring.proto.monitoring_pb2 import Kpi
from .database.KpiModel import KpiModel
from .database.RelationModels import EndPointMonitorKpiModel
from .driver_api._Driver import _Driver

# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)
# Timeout (seconds) for each blocking get() on the samples queue; when it expires
# the exporter loop re-checks its terminate flag before waiting again.
QUEUE_GET_WAIT_TIMEOUT = 0.5

class MonitoringLoop:
    """Collect samples from one device driver on a background thread.

    The stream returned by ``driver.GetState(blocking=True, terminate=...)`` is
    iterated in a daemon thread; every item is prefixed with the device UUID
    and pushed onto the shared samples queue.
    """

    def __init__(self, device_uuid : str, driver : _Driver, samples_queue : queue.Queue) -> None:
        """Prepare the collector; the thread is only launched by start().

        Args:
            device_uuid: identifier prepended to every collected sample.
            driver: object whose ``GetState()`` stream is consumed.
            samples_queue: queue receiving ``(device_uuid, *sample)`` tuples.
        """
        self._device_uuid = device_uuid
        self._driver = driver
        self._samples_queue = samples_queue
        self._running = threading.Event()
        self._terminate = threading.Event()
        # The terminate event is handed to the driver together with the request
        # for a blocking stream; the same event is checked in _collect().
        self._samples_stream = self._driver.GetState(blocking=True, terminate=self._terminate)
        self._collector_thread = threading.Thread(target=self._collect, daemon=True)

    def _collect(self) -> None:
        """Thread body: forward samples until the stream ends or stop() is requested."""
        try:
            for sample in self._samples_stream:
                if self._terminate.is_set(): break
                self._samples_queue.put_nowait((self._device_uuid, *sample))
        finally:
            # The thread is exiting (stream exhausted, stop requested or an
            # error); make is_running reflect that.
            self._running.clear()

    def start(self):
        """Launch the collector thread and mark the loop as running.

        Raises:
            RuntimeError: propagated from ``Thread.start()`` (e.g. when the
                thread was already started).
        """
        # Raise the flag *before* launching: a thread that finishes immediately
        # clears it in _collect(), and must not be overwritten afterwards.
        self._running.set()
        try:
            self._collector_thread.start()
        except RuntimeError:
            # Do not report a loop as running when no thread is alive.
            if not self._collector_thread.is_alive(): self._running.clear()
            raise

    @property
    def is_running(self):
        """True from start() until the collector thread exits or stop() returns."""
        return self._running.is_set()

    def stop(self):
        """Request termination, wait for the collector thread and clear the running flag.

        Safe to call on a loop that was never started, and more than once.
        """
        self._terminate.set()
        # join() raises RuntimeError on a thread that was never started;
        # ident is None until the thread has been started.
        if self._collector_thread.ident is not None:
            self._collector_thread.join()
        self._running.clear()

class MonitoringLoops:
    """Registry of per-device MonitoringLoop collectors plus a single exporter thread.

    Collectors push ``(device_uuid, timestamp, resource_key, value)`` samples
    onto a shared queue. The exporter thread pops them, resolves the matching
    KPI through the database and forwards the value to the monitoring client.
    """

    # Matches every character that is not an ASCII letter or digit; compiled
    # once instead of on every sample.
    _NON_ALPHANUMERIC = re.compile(r'[^A-Za-z0-9]')

    def __init__(self, monitoring_client : MonitoringClient, database : Database) -> None:
        """Store the collaborators and prepare the exporter thread (not started yet).

        Args:
            monitoring_client: client whose ``IncludeKpi()`` receives each exported KPI.
            database: database used to look up EndPointMonitorKpiModel and KpiModel
                objects; when it is None the exporter logs an error and exits.
        """
        self._monitoring_client = monitoring_client
        self._database = database
        self._samples_queue = queue.Queue()
        self._running = threading.Event()
        self._terminate = threading.Event()
        # Guards the device_uuid -> MonitoringLoop mapping in add()/remove().
        self._lock = threading.Lock()
        self._device_uuid__to__monitoring_loop : Dict[str, MonitoringLoop] = {}
        self._exporter_thread = threading.Thread(target=self._export, daemon=True)

    def add(self, device_uuid : str, driver : _Driver) -> None:
        """Create and start a collector for the device unless one is already running."""
        with self._lock:
            monitoring_loop = self._device_uuid__to__monitoring_loop.get(device_uuid)
            if (monitoring_loop is not None) and monitoring_loop.is_running: return
            monitoring_loop = MonitoringLoop(device_uuid, driver, self._samples_queue)
            self._device_uuid__to__monitoring_loop[device_uuid] = monitoring_loop
            monitoring_loop.start()

    def remove(self, device_uuid : str) -> None:
        """Stop (if running) and forget the collector of the device; no-op if unknown."""
        with self._lock:
            monitoring_loop = self._device_uuid__to__monitoring_loop.get(device_uuid)
            if monitoring_loop is None: return
            if monitoring_loop.is_running: monitoring_loop.stop()
            self._device_uuid__to__monitoring_loop.pop(device_uuid, None)

    def start(self):
        """Launch the exporter thread and mark the exporter as running.

        Raises:
            RuntimeError: propagated from ``Thread.start()`` (e.g. when the
                thread was already started).
        """
        # Raise the flag before launching so an exporter that exits at once
        # (e.g. no database) clears it last and is not reported as running.
        self._running.set()
        try:
            self._exporter_thread.start()
        except RuntimeError:
            if not self._exporter_thread.is_alive(): self._running.clear()
            raise

    @property
    def is_running(self):
        """True from start() until the exporter thread exits or stop() returns."""
        return self._running.is_set()

    def stop(self):
        """Request exporter termination, wait for it and clear the running flag.

        Only the exporter thread is stopped here; per-device collectors are
        stopped through remove(). Safe to call when never started.
        """
        self._terminate.set()
        # join() raises RuntimeError on a thread that was never started;
        # ident is None until the thread has been started.
        if self._exporter_thread.ident is not None:
            self._exporter_thread.join()
        self._running.clear()

    @staticmethod
    def _select_kpi_value_field(value):
        """Return ``(kpi_value field name, cast)`` matching the Python type of value."""
        # bool must be tested before int: bool is a subclass of int, so an
        # int-first check would report every boolean as 'intVal'.
        if isinstance(value, bool): return 'boolVal', bool
        if isinstance(value, int): return 'intVal', int
        if isinstance(value, float): return 'floatVal', float
        return 'stringVal', str

    def _export_sample(self, sample) -> None:
        """Resolve the KPI bound to one sample and send its value to the monitoring client."""
        device_uuid, timestamp, endpoint_monitor_resource_key, value = sample
        # Normalize the resource key: every non-alphanumeric character becomes '.'.
        endpoint_monitor_resource_key = self._NON_ALPHANUMERIC.sub('.', endpoint_monitor_resource_key)
        str_endpoint_monitor_kpi_key = key_to_str([device_uuid, endpoint_monitor_resource_key], separator=':')

        db_endpoint_monitor_kpi : EndPointMonitorKpiModel = get_object(
            self._database, EndPointMonitorKpiModel, str_endpoint_monitor_kpi_key, raise_if_not_found=False)
        if db_endpoint_monitor_kpi is None:
            LOGGER.warning('EndPointMonitorKpi({:s}) not found'.format(str_endpoint_monitor_kpi_key))
            return

        str_kpi_key = db_endpoint_monitor_kpi.kpi_fk
        db_kpi : KpiModel = get_object(
            self._database, KpiModel, str_kpi_key, raise_if_not_found=False)
        if db_kpi is None:
            LOGGER.warning('Kpi({:s}) not found'.format(str_kpi_key))
            return

        kpi_value_field_name, kpi_value_field_cast = self._select_kpi_value_field(value)

        try:
            self._monitoring_client.IncludeKpi(Kpi(**{
                'kpi_id'   : {'kpi_id': {'uuid': db_kpi.kpi_uuid}},
                # timestamp is interpreted as POSIX seconds and rendered as UTC.
                'timestamp': datetime.utcfromtimestamp(timestamp).isoformat() + 'Z',
                'kpi_value': {kpi_value_field_name: kpi_value_field_cast(value)}
            }))
        except Exception: # pylint: disable=broad-except
            # Best effort: a KPI that cannot be built or sent is logged and
            # dropped so the exporter keeps serving later samples.
            LOGGER.exception('Unable to format/send Kpi')

    def _export(self) -> None:
        """Exporter thread body: drain the samples queue until stop() is requested."""
        try:
            if self._database is None:
                LOGGER.error('[MonitoringLoops:_export] Database not set. Terminating Exporter.')
                return

            while not self._terminate.is_set():
                try:
                    # Bounded wait so the terminate flag is re-checked periodically.
                    sample = self._samples_queue.get(block=True, timeout=QUEUE_GET_WAIT_TIMEOUT)
                except queue.Empty:
                    continue
                self._export_sample(sample)
        finally:
            # The exporter thread is exiting; make is_running reflect that.
            self._running.clear()