From 2980429b762c389c0cca6f3d62cd7d6abfe9d99f Mon Sep 17 00:00:00 2001 From: Lluis Gifre Date: Thu, 4 Nov 2021 19:13:37 +0100 Subject: [PATCH] Implemented skeleton of Device Monitoring Loops. --- .../service/DeviceServiceServicerImpl.py | 3 +- src/device/service/MonitoringLoops.py | 63 ++++++++++++++++++- 2 files changed, 62 insertions(+), 4 deletions(-) diff --git a/src/device/service/DeviceServiceServicerImpl.py b/src/device/service/DeviceServiceServicerImpl.py index 746ef40b4..ea42ab282 100644 --- a/src/device/service/DeviceServiceServicerImpl.py +++ b/src/device/service/DeviceServiceServicerImpl.py @@ -223,8 +223,7 @@ class DeviceServiceServicerImpl(DeviceServiceServicer): assert len(results) == 4 for result in results: assert isinstance(result, bool) and result - - self.monitoring_loops.add() + self.monitoring_loops.add(device_uuid, driver) raise NotImplementedError() return Empty() diff --git a/src/device/service/MonitoringLoops.py b/src/device/service/MonitoringLoops.py index 831534232..48c185fed 100644 --- a/src/device/service/MonitoringLoops.py +++ b/src/device/service/MonitoringLoops.py @@ -1,3 +1,62 @@ +import threading +from queue import Queue +from typing import Dict +from monitoring.client.monitoring_client import MonitoringClient +from monitoring.proto.monitoring_pb2 import Kpi +from .driver_api._Driver import _Driver + +QUEUE_GET_WAIT_TIMEOUT = 0.5 + +class MonitoringLoop: + def __init__(self, driver : _Driver, monitoring_client : MonitoringClient) -> None: + self._driver = driver + self._monitoring_client = monitoring_client + self._samples_queue = Queue() + self._running = threading.Event() + self._terminate = threading.Event() + self._samples_stream = self._driver.GetState(blocking=True) + self._collector_thread = threading.Thread(target=self._collect, daemon=False) + self._exporter_thread = threading.Thread(target=self._export, daemon=False) + + def _collect(self) -> None: + for event in self._samples_stream: + if self._terminate.is_set(): break + # TODO: add timestamp (if not present) + self._samples_queue.put_nowait(event) + + def _export(self) -> None: + while not self._terminate.is_set(): + sample = self._samples_queue.get(block=True, timeout=QUEUE_GET_WAIT_TIMEOUT) + # TODO: find in database the KpiId, format KPI and send to Monitoring + kpi_data = {} + self._monitoring_client.IncludeKpi(Kpi(**kpi_data)) + + def start(self): + self._collector_thread.start() + self._exporter_thread.start() + self._running.set() + + @property + def is_running(self): return self._running.is_set() + + def stop(self): + self._terminate.set() + self._samples_stream.cancel() + self._collector_thread.join() + self._exporter_thread.join() + class MonitoringLoops: - def __init__(self) -> None: - pass + def __init__(self, monitoring_client : MonitoringClient) -> None: + self.monitoring_client = monitoring_client + self._device_uuid__to__monitoring_loop : Dict[str, MonitoringLoop] = {} + + def add(self, device_uuid : str, driver : _Driver) -> None: + monitoring_loop = self._device_uuid__to__monitoring_loop.get(device_uuid) + if (monitoring_loop is not None) and monitoring_loop.is_running: return + self._device_uuid__to__monitoring_loop.setdefault(device_uuid, MonitoringLoop(driver, self.monitoring_client)) + + def remove(self, device_uuid : str) -> None: + monitoring_loop = self._device_uuid__to__monitoring_loop.get(device_uuid) + if monitoring_loop is None: return + if monitoring_loop.is_running: monitoring_loop.stop() + self._device_uuid__to__monitoring_loop.pop(device_uuid, None) -- GitLab