# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging, queue, re, threading
from datetime import datetime
from typing import Dict
from common.orm.Database import Database
from common.orm.HighLevel import get_object
from common.orm.backend.Tools import key_to_str
from common.proto.monitoring_pb2 import Kpi
from monitoring.client.MonitoringClient import MonitoringClient
from .database.KpiModel import KpiModel
from .database.RelationModels import EndPointMonitorKpiModel
from .driver_api._Driver import _Driver
# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)
# Timeout (seconds) passed to the samples queue `get()` in the exporter loop; when it expires the
# loop re-checks its termination flag before waiting again.
QUEUE_GET_WAIT_TIMEOUT = 0.5
class MonitoringLoop:
    """Forward the state samples of one device driver into a shared queue.

    The sample stream is obtained once, at construction time, from
    ``driver.GetState(blocking=True, terminate=<event>)``. After ``start()`` a daemon thread
    iterates that stream, prefixes every sample with the device UUID and puts the resulting tuple
    into ``samples_queue``.
    """

    def __init__(self, device_uuid : str, driver : _Driver, samples_queue : queue.Queue) -> None:
        """Prepare the collector thread; nothing runs until ``start()`` is called.

        Args:
            device_uuid: identifier prepended to every sample pushed to the queue.
            driver: object exposing ``GetState(blocking=..., terminate=...)`` returning an iterable
                of samples; each sample must be unpackable with ``*``.
            samples_queue: queue receiving ``(device_uuid, *sample)`` tuples.
        """
        self._device_uuid = device_uuid
        self._driver = driver
        self._samples_queue = samples_queue
        self._running = threading.Event()
        self._terminate = threading.Event()
        self._samples_stream = self._driver.GetState(blocking=True, terminate=self._terminate)
        self._collector_thread = threading.Thread(target=self._collect, daemon=True)

    def _collect(self) -> None:
        """Collector thread body: drain the sample stream into the queue until told to stop."""
        try:
            for sample in self._samples_stream:
                if self._terminate.is_set(): break
                self._samples_queue.put_nowait((self._device_uuid, *sample))
        finally:
            # Whatever ends the thread (termination, exhausted stream or an error in the stream),
            # the loop is no longer collecting, so `is_running` must stop reporting True.
            self._running.clear()

    def start(self):
        """Start the collector thread and mark the loop as running."""
        # Set the flag BEFORE starting the thread: a stream that ends immediately clears the flag
        # from the collector thread, and setting it afterwards would leave it stuck at True.
        self._running.set()
        self._collector_thread.start()

    @property
    def is_running(self):
        """True from ``start()`` until ``stop()`` is called or the collector thread exits."""
        return self._running.is_set()

    def stop(self):
        """Signal termination and wait for the collector thread to finish.

        Calling it on a loop that was never started (or already finished) is a no-op.
        """
        self._terminate.set()
        # `Thread.join()` raises RuntimeError on a thread that was never started.
        if self._collector_thread.is_alive():
            self._collector_thread.join()
        self._running.clear()
class MonitoringLoops:
    """Registry of per-device ``MonitoringLoop`` objects plus the thread that exports their samples.

    Every registered loop pushes ``(device_uuid, timestamp, resource_key, value)`` tuples into one
    shared queue. The exporter thread pops them, resolves the KPI associated with the
    device/resource pair through the database, and sends the value to the Monitoring component.
    """

    # Any character outside [A-Za-z0-9] in a resource key is replaced by '.' before the key is
    # used to look up the EndPointMonitorKpiModel entry.
    _NON_ALPHANUMERIC = re.compile(r'[^A-Za-z0-9]')

    def __init__(self, database : Database) -> None:
        """Create the shared queue and the (not yet started) exporter thread.

        Args:
            database: database handed to ``get_object`` to resolve KPI entries; when it is None
                the exporter logs an error and terminates as soon as it is started.
        """
        self._monitoring_client = MonitoringClient()
        self._database = database
        self._samples_queue = queue.Queue()
        self._running = threading.Event()
        self._terminate = threading.Event()
        self._lock = threading.Lock()
        self._device_uuid__to__monitoring_loop : Dict[str, MonitoringLoop] = {}
        self._exporter_thread = threading.Thread(target=self._export, daemon=True)

    def add(self, device_uuid : str, driver : _Driver) -> None:
        """Create and start a monitoring loop for the device unless a running one is registered."""
        with self._lock:
            monitoring_loop = self._device_uuid__to__monitoring_loop.get(device_uuid)
            if (monitoring_loop is not None) and monitoring_loop.is_running: return
            monitoring_loop = MonitoringLoop(device_uuid, driver, self._samples_queue)
            self._device_uuid__to__monitoring_loop[device_uuid] = monitoring_loop
            monitoring_loop.start()

    def remove(self, device_uuid : str) -> None:
        """Stop (when running) and unregister the monitoring loop of the device, if there is one."""
        with self._lock:
            monitoring_loop = self._device_uuid__to__monitoring_loop.get(device_uuid)
            if monitoring_loop is None: return
            if monitoring_loop.is_running: monitoring_loop.stop()
            self._device_uuid__to__monitoring_loop.pop(device_uuid, None)

    def start(self):
        """Start the exporter thread and mark the exporter as running."""
        # Flag first: the exporter clears it on exit (e.g. missing database), and setting it after
        # the thread already finished would leave it stuck at True.
        self._running.set()
        self._exporter_thread.start()

    @property
    def is_running(self):
        """True from ``start()`` until ``stop()`` is called or the exporter thread exits."""
        return self._running.is_set()

    def stop(self):
        """Signal termination and wait for the exporter thread to finish.

        Calling it before ``start()`` (or after the exporter already finished) is a no-op.
        Per-device monitoring loops are not stopped here; use ``remove()`` for those.
        """
        self._terminate.set()
        # `Thread.join()` raises RuntimeError on a thread that was never started.
        if self._exporter_thread.is_alive():
            self._exporter_thread.join()
        self._running.clear()

    @staticmethod
    def _format_kpi_value(value) -> Dict[str, object]:
        """Return the single-entry ``kpi_value`` mapping matching the Python type of ``value``.

        ``bool`` is tested before ``int`` because ``bool`` is a subclass of ``int``; in the opposite
        order booleans would be reported through 'int64Val'. Integers use 'int64Val' (the earlier
        FIXME in this code notes that a uint32 'intVal' field ran out of range). Anything that is
        not bool/int/float is sent as its ``str()`` representation.
        """
        if isinstance(value, bool):
            return {'boolVal': bool(value)}
        if isinstance(value, int):
            return {'int64Val': int(value)}
        if isinstance(value, float):
            return {'floatVal': float(value)}
        return {'stringVal': str(value)}

    def _export_sample(self, sample) -> None:
        """Resolve the KPI a sample belongs to and send its value to the Monitoring component.

        Samples whose EndPointMonitorKpi or Kpi database entry cannot be found are dropped with a
        warning. Failures while building or sending the Kpi message are logged, not raised.
        """
        # NOTE: the timestamp is unpacked but not included in the Kpi message built below.
        device_uuid, _timestamp, endpoint_monitor_resource_key, value = sample
        endpoint_monitor_resource_key = self._NON_ALPHANUMERIC.sub('.', endpoint_monitor_resource_key)
        str_endpoint_monitor_kpi_key = key_to_str([device_uuid, endpoint_monitor_resource_key], separator=':')

        db_endpoint_monitor_kpi : EndPointMonitorKpiModel = get_object(
            self._database, EndPointMonitorKpiModel, str_endpoint_monitor_kpi_key, raise_if_not_found=False)
        if db_endpoint_monitor_kpi is None:
            LOGGER.warning('EndPointMonitorKpi({:s}) not found'.format(str_endpoint_monitor_kpi_key))
            return

        str_kpi_key = db_endpoint_monitor_kpi.kpi_fk
        db_kpi : KpiModel = get_object(
            self._database, KpiModel, str_kpi_key, raise_if_not_found=False)
        if db_kpi is None:
            LOGGER.warning('Kpi({:s}) not found'.format(str_kpi_key))
            return

        try:
            self._monitoring_client.IncludeKpi(Kpi(**{
                'kpi_id'   : {'kpi_id': {'uuid': db_kpi.kpi_uuid}},
                'kpi_value': self._format_kpi_value(value),
            }))
        except Exception: # pylint: disable=broad-except
            # Best effort: one sample that cannot be formatted/sent must not stop the exporter.
            LOGGER.exception('Unable to format/send Kpi')

    def _export(self) -> None:
        """Exporter thread body: pop samples from the queue and export them until terminated."""
        try:
            if self._database is None:
                LOGGER.error('[MonitoringLoops:_export] Database not set. Terminating Exporter.')
                return

            while not self._terminate.is_set():
                try:
                    # Bounded wait so the termination flag is re-checked periodically.
                    sample = self._samples_queue.get(block=True, timeout=QUEUE_GET_WAIT_TIMEOUT)
                except queue.Empty:
                    continue
                self._export_sample(sample)
        finally:
            # The exporter is no longer consuming samples, whatever the reason it ended.
            self._running.clear()