Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
#import logging, queue, threading
#from typing import Dict
#from monitoring.client.monitoring_client import MonitoringClient
#from monitoring.proto.monitoring_pb2 import Kpi
#from .driver_api._Driver import _Driver
#
#LOGGER = logging.getLogger(__name__)
#QUEUE_GET_WAIT_TIMEOUT = 0.5
#
#class MonitoringLoop:
#    def __init__(self, driver : _Driver, samples_queue : queue.Queue) -> None:
#        self._driver = driver
#        self._samples_queue = samples_queue
#        self._running = threading.Event()
#        self._terminate = threading.Event()
#        self._samples_stream = self._driver.GetState(blocking=True)
#        self._collector_thread = threading.Thread(target=self._collect, daemon=False)
#
#    def _collect(self) -> None:
#        for sample in self._samples_stream:
#            if self._terminate.is_set(): break
#            LOGGER.info('[MonitoringLoop:_collect] sample={:s}'.format(str(sample)))
#            # TODO: add timestamp (if not present)
#            self._samples_queue.put_nowait(sample)
#
#    def start(self):
#        self._collector_thread.start()
#        self._running.set()
#
#    @property
#    def is_running(self): return self._running.is_set()
#
#    def stop(self):
#        self._terminate.set()
#        self._samples_stream.cancel()
#        self._collector_thread.join()
#
#class MonitoringLoops:
#    def __init__(self, monitoring_client : MonitoringClient) -> None:
#        self._monitoring_client = monitoring_client
#        self._samples_queue = queue.Queue()
#        self._running = threading.Event()
#        self._terminate = threading.Event()
#        self._lock = threading.Lock()
#        self._device_uuid__to__monitoring_loop : Dict[str, MonitoringLoop] = {}
#        self._exporter_thread = threading.Thread(target=self._export, daemon=False)
#
#    def add(self, device_uuid : str, driver : _Driver) -> None:
#        with self._lock:
#            monitoring_loop = self._device_uuid__to__monitoring_loop.get(device_uuid)
#            if (monitoring_loop is not None) and monitoring_loop.is_running: return
#            monitoring_loop = MonitoringLoop(driver, self._samples_queue)
#            self._device_uuid__to__monitoring_loop[device_uuid] = monitoring_loop
#            monitoring_loop.start()
#
#    def remove(self, device_uuid : str) -> None:
#        with self._lock:
#            monitoring_loop = self._device_uuid__to__monitoring_loop.get(device_uuid)
#            if monitoring_loop is None: return
#            if monitoring_loop.is_running: monitoring_loop.stop()
#            self._device_uuid__to__monitoring_loop.pop(device_uuid, None)
#
#    def start(self):
#        self._exporter_thread.start()
#        self._running.set()
#
#    @property
#    def is_running(self): return self._running.is_set()
#
#    def stop(self):
#        self._terminate.set()
#        self._exporter_thread.join()
#
#    def _export(self) -> None:
#        while not self._terminate.is_set():
#            try:
#                sample = self._samples_queue.get(block=True, timeout=QUEUE_GET_WAIT_TIMEOUT)
#                LOGGER.info('[MonitoringLoops:_export] sample={:s}'.format(str(sample)))
#            except queue.Empty:
#                continue
#            # TODO: find in database the KpiId, format KPI and send to Monitoring
#            kpi_data = {}
#            self._monitoring_client.IncludeKpi(Kpi(**kpi_data))