#import logging, queue, threading #from typing import Dict #from monitoring.client.monitoring_client import MonitoringClient #from monitoring.proto.monitoring_pb2 import Kpi #from .driver_api._Driver import _Driver # #LOGGER = logging.getLogger(__name__) #QUEUE_GET_WAIT_TIMEOUT = 0.5 # #class MonitoringLoop: # def __init__(self, driver : _Driver, samples_queue : queue.Queue) -> None: # self._driver = driver # self._samples_queue = samples_queue # self._running = threading.Event() # self._terminate = threading.Event() # self._samples_stream = self._driver.GetState(blocking=True) # self._collector_thread = threading.Thread(target=self._collect, daemon=False) # # def _collect(self) -> None: # for sample in self._samples_stream: # if self._terminate.is_set(): break # LOGGER.info('[MonitoringLoop:_collect] sample={:s}'.format(str(sample))) # # TODO: add timestamp (if not present) # self._samples_queue.put_nowait(sample) # # def start(self): # self._collector_thread.start() # self._running.set() # # @property # def is_running(self): return self._running.is_set() # # def stop(self): # self._terminate.set() # self._samples_stream.cancel() # self._collector_thread.join() # #class MonitoringLoops: # def __init__(self, monitoring_client : MonitoringClient) -> None: # self._monitoring_client = monitoring_client # self._samples_queue = queue.Queue() # self._running = threading.Event() # self._terminate = threading.Event() # self._lock = threading.Lock() # self._device_uuid__to__monitoring_loop : Dict[str, MonitoringLoop] = {} # self._exporter_thread = threading.Thread(target=self._export, daemon=False) # # def add(self, device_uuid : str, driver : _Driver) -> None: # with self._lock: # monitoring_loop = self._device_uuid__to__monitoring_loop.get(device_uuid) # if (monitoring_loop is not None) and monitoring_loop.is_running: return # monitoring_loop = MonitoringLoop(driver, self._samples_queue) # self._device_uuid__to__monitoring_loop[device_uuid] = monitoring_loop # monitoring_loop.start() # # def remove(self, device_uuid : str) -> None: # with self._lock: # monitoring_loop = self._device_uuid__to__monitoring_loop.get(device_uuid) # if monitoring_loop is None: return # if monitoring_loop.is_running: monitoring_loop.stop() # self._device_uuid__to__monitoring_loop.pop(device_uuid, None) # # def start(self): # self._exporter_thread.start() # self._running.set() # # @property # def is_running(self): return self._running.is_set() # # def stop(self): # self._terminate.set() # self._exporter_thread.join() # # def _export(self) -> None: # while not self._terminate.is_set(): # try: # sample = self._samples_queue.get(block=True, timeout=QUEUE_GET_WAIT_TIMEOUT) # LOGGER.info('[MonitoringLoops:_export] sample={:s}'.format(str(sample))) # except queue.Empty: # continue # # TODO: find in database the KpiId, format KPI and send to Monitoring # kpi_data = {} # self._monitoring_client.IncludeKpi(Kpi(**kpi_data))