Loading src/device/service/DeviceServiceServicerImpl.py +1 −2 Original line number Diff line number Diff line Loading @@ -223,8 +223,7 @@ class DeviceServiceServicerImpl(DeviceServiceServicer): assert len(results) == 4 for result in results: assert isinstance(result, bool) and result self.monitoring_loops.add() self.monitoring_loops.add(device_uuid, driver) raise NotImplementedError() return Empty() src/device/service/MonitoringLoops.py +61 −2 Original line number Diff line number Diff line import threading from queue import Queue from typing import Dict from monitoring.client.monitoring_client import MonitoringClient from monitoring.proto.monitoring_pb2 import Kpi from .driver_api._Driver import _Driver QUEUE_GET_WAIT_TIMEOUT = 0.5 class MonitoringLoop: def __init__(self, driver : _Driver, monitoring_client : MonitoringClient) -> None: self._driver = driver self._monitoring_client = monitoring_client self._samples_queue = Queue() self._running = threading.Event() self._terminate = threading.Event() self._samples_stream = self._driver.GetState(blocking=True) self._collector_thread = threading.Thread(target=self._collect, daemon=False) self._exporter_thread = threading.Thread(target=self._export, daemon=False) def _collect(self) -> None: for event in self._samples_stream: if self._terminate.is_set(): break # TODO: add timestamp (if not present) self._samples_queue.put_nowait(event) def _export(self) -> None: while not self._terminate.is_set(): sample = self._samples_queue.get(block=True, timeout=QUEUE_GET_WAIT_TIMEOUT) # TODO: find in database the KpiId, format KPI and send to Monitoring kpi_data = {} self._monitoring_client.IncludeKpi(Kpi(**kpi_data)) def start(self): self._collector_thread.start() self._exporter_thread.start() self._running.set() @property def is_running(self): return self._running.is_set() def stop(self): self._terminate.set() self._samples_stream.cancel() self._collector_thread.join() self._exporter_thread.join() class MonitoringLoops: def __init__(self) -> None: pass def __init__(self, monitoring_client : MonitoringClient) -> None: self.monitoring_client = monitoring_client self._device_uuid__to__monitoring_loop : Dict[str, MonitoringLoop] = {} def add(self, device_uuid : str, driver : _Driver) -> None: monitoring_loop = self._device_uuid__to__monitoring_loop.get(device_uuid) if (monitoring_loop is not None) and monitoring_loop.is_running: return self._device_uuid__to__monitoring_loop.setdefault(device_uuid, MonitoringLoop(driver, self.monitoring_client)) def remove(self, device_uuid : str) -> None: monitoring_loop = self._device_uuid__to__monitoring_loop.get(device_uuid) if monitoring_loop is None: return if monitoring_loop.is_running: monitoring_loop.stop() self._device_uuid__to__monitoring_loop.pop(device_uuid, None) Loading
src/device/service/DeviceServiceServicerImpl.py +1 −2 Original line number Diff line number Diff line Loading @@ -223,8 +223,7 @@ class DeviceServiceServicerImpl(DeviceServiceServicer): assert len(results) == 4 for result in results: assert isinstance(result, bool) and result self.monitoring_loops.add() self.monitoring_loops.add(device_uuid, driver) raise NotImplementedError() return Empty()
src/device/service/MonitoringLoops.py +61 −2 Original line number Diff line number Diff line import threading from queue import Queue from typing import Dict from monitoring.client.monitoring_client import MonitoringClient from monitoring.proto.monitoring_pb2 import Kpi from .driver_api._Driver import _Driver QUEUE_GET_WAIT_TIMEOUT = 0.5 class MonitoringLoop: def __init__(self, driver : _Driver, monitoring_client : MonitoringClient) -> None: self._driver = driver self._monitoring_client = monitoring_client self._samples_queue = Queue() self._running = threading.Event() self._terminate = threading.Event() self._samples_stream = self._driver.GetState(blocking=True) self._collector_thread = threading.Thread(target=self._collect, daemon=False) self._exporter_thread = threading.Thread(target=self._export, daemon=False) def _collect(self) -> None: for event in self._samples_stream: if self._terminate.is_set(): break # TODO: add timestamp (if not present) self._samples_queue.put_nowait(event) def _export(self) -> None: while not self._terminate.is_set(): sample = self._samples_queue.get(block=True, timeout=QUEUE_GET_WAIT_TIMEOUT) # TODO: find in database the KpiId, format KPI and send to Monitoring kpi_data = {} self._monitoring_client.IncludeKpi(Kpi(**kpi_data)) def start(self): self._collector_thread.start() self._exporter_thread.start() self._running.set() @property def is_running(self): return self._running.is_set() def stop(self): self._terminate.set() self._samples_stream.cancel() self._collector_thread.join() self._exporter_thread.join() class MonitoringLoops: def __init__(self) -> None: pass def __init__(self, monitoring_client : MonitoringClient) -> None: self.monitoring_client = monitoring_client self._device_uuid__to__monitoring_loop : Dict[str, MonitoringLoop] = {} def add(self, device_uuid : str, driver : _Driver) -> None: monitoring_loop = self._device_uuid__to__monitoring_loop.get(device_uuid) if (monitoring_loop is not None) and monitoring_loop.is_running: return self._device_uuid__to__monitoring_loop.setdefault(device_uuid, MonitoringLoop(driver, self.monitoring_client)) def remove(self, device_uuid : str) -> None: monitoring_loop = self._device_uuid__to__monitoring_loop.get(device_uuid) if monitoring_loop is None: return if monitoring_loop.is_running: monitoring_loop.stop() self._device_uuid__to__monitoring_loop.pop(device_uuid, None)