# NOTE: this text appears to have been copied from a repository blame view (commits attributed to Javi Moreno); the viewer's navigation labels and line-number gutter have been dropped.
import threading
from queue import Queue
import grpc
from common.rpc_method_wrapper.ServiceExceptions import ServiceException
from context.client.ContextClient import ContextClient
from context.proto import kpi_sample_types_pb2
from context.proto.context_pb2 import Empty, EventTypeEnum
from common.logger import getJSONLogger
from monitoring.client.monitoring_client import MonitoringClient
from monitoring.proto import monitoring_pb2
# Module-level logger shared by everything in this file; obtained from the
# project's getJSONLogger helper and set to emit DEBUG-level records and above.
LOGGER = getJSONLogger('monitoringservice-server')
LOGGER.setLevel('DEBUG')
class EventsDeviceCollector:
    """Buffer device events from the Context service and create KPIs for them.

    A worker thread copies every event received on the stream returned by
    ``ContextClient.GetDeviceEvents`` into an internal queue.  ``listen_events``
    later drains that queue and, for each device-creation event, asks the
    Monitoring service to create one KPI per device endpoint.
    """

    def __init__(self, context_client_grpc : ContextClient, monitoring_client_grpc : MonitoringClient) -> None: # pylint: disable=redefined-outer-name
        """Open the device-event stream and prepare the collector thread.

        The thread is only created here; call ``start()`` to run it.

        Args:
            context_client_grpc: client used to subscribe to device events and
                to fetch the device referenced by an event.
            monitoring_client_grpc: client used to create KPIs.
        """
        self._events_queue = Queue()

        self._device_stream = context_client_grpc.GetDeviceEvents(Empty())
        self._context_client = context_client_grpc
        self._monitoring_client = monitoring_client_grpc

        self._device_thread = threading.Thread(
            target=self._collect, args=(self._device_stream,), daemon=False)

    def _collect(self, events_stream) -> None:
        """Copy every event yielded by ``events_stream`` into the internal queue.

        A ``grpc.RpcError`` whose status is ``CANCELLED`` ends the loop quietly
        (presumably the result of ``stop()`` cancelling the stream); any other
        ``grpc.RpcError`` is re-raised.
        """
        try:
            for event in events_stream:
                self._events_queue.put_nowait(event)
        except grpc.RpcError as e:
            if e.code() != grpc.StatusCode.CANCELLED: # pylint: disable=no-member
                raise # pragma: no cover

    def start(self):
        """Start the thread that collects device events."""
        self._device_thread.start()

    def get_event(self, block : bool = True, timeout : float = 0.1):
        """Return the next queued event.

        Args:
            block: whether to wait for an event when the queue is empty.
            timeout: maximum number of seconds to wait when ``block`` is true.

        Raises:
            queue.Empty: if no event is available (within ``timeout`` when
                blocking, immediately otherwise).
        """
        return self._events_queue.get(block=block, timeout=timeout)

    def stop(self):
        """Cancel the event stream and wait for the collector thread to end."""
        self._device_stream.cancel()
        self._device_thread.join()

    def listen_events(self):
        """Create KPIs for the device-creation events currently in the queue.

        The events queued at the time of the call are consumed.  For each one
        whose type is ``EVENTTYPE_CREATE`` the device is fetched from the
        Context service and one KPI is created per device endpoint.

        Returns:
            The list of KPI ids returned by ``CreateKpi`` (empty when nothing
            was queued), or ``None`` if an exception was raised; the exception
            is logged and not propagated.
        """
        LOGGER.info('getting Kpi by KpiID')
        try:
            kpi_id_list = []

            # Snapshot the backlog once so the loop is bounded even while the
            # collector thread keeps adding events.  Without this assignment
            # `qsize` was an undefined name and the method always failed.
            qsize = self._events_queue.qsize()

            for i in range(qsize):
                print("Queue size: "+str(qsize))
                event = self.get_event(block=True)
                if event.event.event_type != EventTypeEnum.EVENTTYPE_CREATE:
                    continue

                device = self._context_client.GetDevice(event.device_id)
                print("Endpoints value: " + str(len(device.device_endpoints)))

                for end_point in device.device_endpoints:
                    kpi_descriptor = monitoring_pb2.KpiDescriptor()
                    kpi_descriptor.kpi_description = device.device_type
                    kpi_descriptor.kpi_sample_type = kpi_sample_types_pb2.KpiSampleType.KPISAMPLETYPE_PACKETS_TRANSMITTED
                    kpi_descriptor.device_id.CopyFrom(device.device_id)
                    kpi_descriptor.endpoint_id.CopyFrom(end_point.endpoint_id)
                    # The service uuid is derived from the 1-based position of
                    # the event within this batch.
                    kpi_descriptor.service_id.service_uuid.uuid = "SERV"+str(i+1)

                    kpi_id = self._monitoring_client.CreateKpi(kpi_descriptor)
                    kpi_id_list.append(kpi_id)

            return kpi_id_list
        except ServiceException:
            LOGGER.exception('ListenEvents exception')
        except Exception: # pragma: no cover
            LOGGER.exception('ListenEvents exception')
        return None