Skip to content
Snippets Groups Projects
MonitoringLoops.py 6.55 KiB
Newer Older
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging, queue, re, threading
from datetime import datetime
from typing import Dict
from common.orm.Database import Database
from common.orm.HighLevel import get_object
from common.orm.backend.Tools import key_to_str
from common.proto.monitoring_pb2 import Kpi
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from monitoring.client.MonitoringClient import MonitoringClient
from .database.KpiModel import KpiModel
from .database.RelationModels import EndPointMonitorKpiModel
from .driver_api._Driver import _Driver

LOGGER = logging.getLogger(__name__)
QUEUE_GET_WAIT_TIMEOUT = 0.5

class MonitoringLoop:
    def __init__(self, device_uuid : str, driver : _Driver, samples_queue : queue.Queue) -> None:
        self._device_uuid = device_uuid
        self._driver = driver
        self._samples_queue = samples_queue
        self._running = threading.Event()
        self._terminate = threading.Event()
        self._samples_stream = self._driver.GetState(blocking=True, terminate=self._terminate)
        self._collector_thread = threading.Thread(target=self._collect, daemon=True)

    def _collect(self) -> None:
        for sample in self._samples_stream:
            if self._terminate.is_set(): break
            sample = (self._device_uuid, *sample)
            self._samples_queue.put_nowait(sample)

    def start(self):
        self._collector_thread.start()
        self._running.set()

    @property
    def is_running(self): return self._running.is_set()

    def stop(self):
        self._terminate.set()
        self._collector_thread.join()

class MonitoringLoops:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def __init__(self, database : Database) -> None:
        self._monitoring_client = MonitoringClient()
        self._database = database
        self._samples_queue = queue.Queue()
        self._running = threading.Event()
        self._terminate = threading.Event()
        self._lock = threading.Lock()
        self._device_uuid__to__monitoring_loop : Dict[str, MonitoringLoop] = {}
        self._exporter_thread = threading.Thread(target=self._export, daemon=True)

    def add(self, device_uuid : str, driver : _Driver) -> None:
        with self._lock:
            monitoring_loop = self._device_uuid__to__monitoring_loop.get(device_uuid)
            if (monitoring_loop is not None) and monitoring_loop.is_running: return
            monitoring_loop = MonitoringLoop(device_uuid, driver, self._samples_queue)
            self._device_uuid__to__monitoring_loop[device_uuid] = monitoring_loop
            monitoring_loop.start()

    def remove(self, device_uuid : str) -> None:
        with self._lock:
            monitoring_loop = self._device_uuid__to__monitoring_loop.get(device_uuid)
            if monitoring_loop is None: return
            if monitoring_loop.is_running: monitoring_loop.stop()
            self._device_uuid__to__monitoring_loop.pop(device_uuid, None)

    def start(self):
        self._exporter_thread.start()

    @property
    def is_running(self): return self._running.is_set()

    def stop(self):
        self._terminate.set()
        self._exporter_thread.join()

    def _export(self) -> None:
        if self._database is None:
            LOGGER.error('[MonitoringLoops:_export] Database not set. Terminating Exporter.')
            return

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        self._running.set()
        while not self._terminate.is_set():
            try:
                sample = self._samples_queue.get(block=True, timeout=QUEUE_GET_WAIT_TIMEOUT)
                #LOGGER.debug('[MonitoringLoops:_export] sample={:s}'.format(str(sample)))
            except queue.Empty:
                continue

            device_uuid, timestamp, endpoint_monitor_resource_key, value = sample
            endpoint_monitor_resource_key = re.sub('[^A-Za-z0-9]', '.', endpoint_monitor_resource_key)
            str_endpoint_monitor_kpi_key = key_to_str([device_uuid, endpoint_monitor_resource_key], separator=':')

            #db_entries = self._database.dump()
            #LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
            #for db_entry in db_entries:
            #    LOGGER.info('  [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
            #LOGGER.info('-----------------------------------------------------------')

            db_endpoint_monitor_kpi : EndPointMonitorKpiModel = get_object(
                self._database, EndPointMonitorKpiModel, str_endpoint_monitor_kpi_key, raise_if_not_found=False)
            if db_endpoint_monitor_kpi is None:
                LOGGER.warning('EndPointMonitorKpi({:s}) not found'.format(str_endpoint_monitor_kpi_key))
                continue

            str_kpi_key = db_endpoint_monitor_kpi.kpi_fk
            db_kpi : KpiModel = get_object(
                self._database, KpiModel, str_kpi_key, raise_if_not_found=False)
            if db_kpi is None:
                LOGGER.warning('Kpi({:s}) not found'.format(str_kpi_key))
                continue

            # FIXME: uint32 used for intVal results in out of range issues. Temporarily changed to float
            #        extend the 'kpi_value' to support long integers (uint64 / int64 / ...)
            if isinstance(value, int):
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
                kpi_value_field_name = 'int64Val'
                kpi_value_field_cast = int
            elif isinstance(value, float):
                kpi_value_field_name = 'floatVal'
                kpi_value_field_cast = float
            elif isinstance(value, bool):
                kpi_value_field_name = 'boolVal'
                kpi_value_field_cast = bool
            else:
                kpi_value_field_name = 'stringVal'
                kpi_value_field_cast = str

            try:
                self._monitoring_client.IncludeKpi(Kpi(**{
                    'kpi_id'   : {'kpi_id': {'uuid': db_kpi.kpi_uuid}},
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
                    'timestamp': {'timestamp': timestamp},
                    'kpi_value': {kpi_value_field_name: kpi_value_field_cast(value)}
                }))
            except: # pylint: disable=bare-except
                LOGGER.exception('Unable to format/send Kpi')
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

        self._running.clear()