# EventTools.py
import threading
from queue import Queue

import grpc

from common.rpc_method_wrapper.ServiceExceptions import ServiceException
from context.client.ContextClient import ContextClient
from context.proto import kpi_sample_types_pb2
from context.proto.context_pb2 import Empty, EventTypeEnum

from common.logger import getJSONLogger
from monitoring.client.monitoring_client import MonitoringClient
from monitoring.proto import monitoring_pb2

# Module-level logger shared by everything in this file; it is obtained from
# the project's getJSONLogger helper under the name 'monitoringservice-server'.
LOGGER = getJSONLogger('monitoringservice-server')
# Log everything from DEBUG level upwards.
LOGGER.setLevel('DEBUG')

class EventsDeviceCollector:
    """Collect device events and create monitoring KPIs for newly created devices.

    A background thread copies every event received on the device-event stream
    (opened through the context client) into an internal queue.
    ``listen_events`` later drains that queue and, for each device-creation
    event, creates one KPI per device endpoint through the monitoring client.

    Typical life cycle: construct, ``start()``, call ``listen_events()``
    periodically, then ``stop()``.
    """

    def __init__(self, context_client_grpc : ContextClient, monitoring_client_grpc : MonitoringClient) -> None: # pylint: disable=redefined-outer-name
        """Open the device-event stream and prepare the collector thread.

        The thread is created here but only runs once ``start()`` is called.

        Args:
            context_client_grpc: client used to subscribe to device events and
                to fetch device details.
            monitoring_client_grpc: client used to create KPIs.
        """
        self._events_queue = Queue()

        self._device_stream = context_client_grpc.GetDeviceEvents(Empty())
        self._context_client = context_client_grpc
        self._monitoring_client = monitoring_client_grpc

        # Non-daemon thread: stop() must be called so the stream is cancelled
        # and the thread can finish.
        self._device_thread = threading.Thread(
            target=self._collect, args=(self._device_stream,), daemon=False)

    def _collect(self, events_stream) -> None:
        """Thread target: push every event read from *events_stream* into the queue.

        A gRPC error whose status code is CANCELLED ends the loop silently
        (this is what ``stop()`` triggers); any other gRPC error is re-raised.
        """
        try:
            for event in events_stream:
                self._events_queue.put_nowait(event)
        except grpc.RpcError as e:
            if e.code() != grpc.StatusCode.CANCELLED: # pylint: disable=no-member
                raise # pragma: no cover

    def start(self):
        """Start the background thread that feeds the event queue."""
        self._device_thread.start()

    def get_event(self, block : bool = True, timeout : float = 0.1):
        """Return the next queued event.

        Args:
            block: whether to wait for an event when the queue is empty.
            timeout: maximum number of seconds to wait when *block* is true.

        Raises:
            queue.Empty: if no event is available (within *timeout* when
                blocking).
        """
        return self._events_queue.get(block=block, timeout=timeout)

    def stop(self):
        """Cancel the device-event stream and wait for the collector thread to end."""
        self._device_stream.cancel()
        self._device_thread.join()

    def _create_endpoint_kpis(self, device, service_uuid : str) -> list:
        """Create one KPI per endpoint of *device* and return the resulting KPI ids."""
        kpi_ids = []
        for end_point in device.device_endpoints:
            kpi_descriptor = monitoring_pb2.KpiDescriptor()

            kpi_descriptor.kpi_description = device.device_type
            kpi_descriptor.kpi_sample_type = kpi_sample_types_pb2.KpiSampleType.KPISAMPLETYPE_PACKETS_TRANSMITTED
            kpi_descriptor.device_id.CopyFrom(device.device_id)
            kpi_descriptor.endpoint_id.CopyFrom(end_point.endpoint_id)
            kpi_descriptor.service_id.service_uuid.uuid = service_uuid

            kpi_ids.append(self._monitoring_client.CreateKpi(kpi_descriptor))
        return kpi_ids

    def listen_events(self):
        """Process the events queued so far and create KPIs for new devices.

        The queue size is sampled once and that many events are consumed. For
        every event of type ``EVENTTYPE_CREATE`` the device is fetched through
        the context client and one KPI is created per endpoint.

        Returns:
            The list of KPI ids returned by ``CreateKpi`` (empty when nothing
            was processed), or ``None`` when an exception occurred. Exceptions
            are logged and never propagated to the caller.
        """
        LOGGER.info('getting Kpi by KpiID')
        qsize = self._events_queue.qsize()
        try:
            kpi_id_list = []
            for i in range(qsize):
                LOGGER.debug('Queue size: ' + str(qsize))
                event = self.get_event(block=True)
                if event.event.event_type != EventTypeEnum.EVENTTYPE_CREATE:
                    continue

                device = self._context_client.GetDevice(event.device_id)
                LOGGER.debug('Endpoints value: ' + str(len(device.device_endpoints)))
                # The service UUID is derived from the event's position in
                # this batch (1-based).
                kpi_id_list.extend(
                    self._create_endpoint_kpis(device, 'SERV' + str(i + 1)))

            return kpi_id_list

        except ServiceException:
            LOGGER.exception('ListenEvents exception')

        except Exception:  # pragma: no cover
            LOGGER.exception('ListenEvents exception')

        # Reached only after a logged failure.
        return None