Skip to content
Snippets Groups Projects
EventTools.py 4.17 KiB
Newer Older
  • Learn to ignore specific revisions
  • # Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #      http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    
    import threading
    from queue import Queue
    
    import grpc
    
    from common.rpc_method_wrapper.ServiceExceptions import ServiceException
    from context.client.ContextClient import ContextClient
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    #from context.proto import kpi_sample_types_pb2
    
    from context.proto.context_pb2 import Empty, EventTypeEnum
    
    from common.logger import getJSONLogger
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    from monitoring.client.MonitoringClient import MonitoringClient
    
    from monitoring.proto import monitoring_pb2
    
    # Module-wide logger shared by everything in this file.
    # NOTE(review): presumably emits JSON-formatted records (getJSONLogger) — confirm in common.logger.
    LOGGER = getJSONLogger('monitoringservice-server')
    # Emit every record from DEBUG level upwards.
    LOGGER.setLevel('DEBUG')
    
    class EventsDeviceCollector:
        """Collect device events from the Context service and create KPIs for new devices.

        A background thread drains the gRPC device-event stream into an internal
        queue. ``listen_events`` consumes that queue and, for every device-creation
        event, asks the Monitoring service to create one KPI per
        (endpoint, KPI sample type) pair of the device.
        """

        def __init__(self) -> None: # pylint: disable=redefined-outer-name
            """Open the device-event stream and prepare (but do not start) the collector thread."""
            # Buffer shared by the collector thread (producer) and by
            # get_event()/listen_events() (consumers). It has to exist before any
            # of those methods run, otherwise they fail with AttributeError.
            self._events_queue = Queue()

            self._context_client_grpc = ContextClient()
            self._device_stream     = self._context_client_grpc.GetDeviceEvents(Empty())
            self._context_client    = self._context_client_grpc
            self._channel           = self._context_client_grpc.channel
            # NOTE(review): the Monitoring service host is hard-coded to localhost.
            self._monitoring_client = MonitoringClient(host='127.0.0.1')

            # Non-daemon thread: the process will not exit until stop() cancels the stream.
            self._device_thread   = threading.Thread(target=self._collect, args=(self._device_stream,), daemon=False)

        def grpc_server_on(self, timeout : float = 15):
            """Tell whether the Context gRPC channel becomes ready in time.

            Args:
                timeout: maximum number of seconds to wait for the channel.

            Returns:
                True if the channel became ready, False if the wait timed out.
            """
            try:
                grpc.channel_ready_future(self._channel).result(timeout=timeout)
                return True
            except grpc.FutureTimeoutError:
                return False

        def _collect(self, events_stream):
            """Copy every event from ``events_stream`` into the internal queue.

            Runs in the collector thread. A CANCELLED gRPC error (raised when
            stop() cancels the stream) ends the loop silently; any other gRPC
            error is re-raised.
            """
            try:
                for event in events_stream:
                    self._events_queue.put_nowait(event)
            except grpc.RpcError as e:
                if e.code() != grpc.StatusCode.CANCELLED: # pylint: disable=no-member
                    raise # pragma: no cover

        def start(self):
            """Start the collector thread; a failure to start is logged, not raised."""
            try:
                self._device_thread.start()
            except RuntimeError:
                # Thread.start() raises RuntimeError when called more than once.
                LOGGER.exception('Start EventTools exception')

        def get_event(self, block : bool = True, timeout : float = 0.1):
            """Return the next queued event.

            Args:
                block: whether to wait for an event when the queue is empty.
                timeout: seconds to wait when ``block`` is True.

            Raises:
                queue.Empty: if no event is available (within ``timeout``).
            """
            return self._events_queue.get(block=block, timeout=timeout)

        def stop(self):
            """Cancel the event stream and wait for the collector thread to finish."""
            self._device_stream.cancel()
            # join() raises RuntimeError on a thread that was never started (e.g.
            # start() was not called or failed), so only join a running thread.
            if self._device_thread.is_alive():
                self._device_thread.join()

        def _build_kpi_descriptor(self, device, end_point, sample_type):
            """Build the KpiDescriptor for one (endpoint, sample type) pair of ``device``."""
            kpi_descriptor = monitoring_pb2.KpiDescriptor()
            kpi_descriptor.kpi_description = device.device_type
            kpi_descriptor.kpi_sample_type = sample_type
            # service_id is intentionally not set here; it keeps its protobuf default.
            kpi_descriptor.device_id.CopyFrom(device.device_id)
            kpi_descriptor.endpoint_id.CopyFrom(end_point.endpoint_id)
            return kpi_descriptor

        def listen_events(self):
            """Process all currently queued events and create KPIs for new devices.

            Only events of type EVENTTYPE_CREATE are handled; for each of them the
            device is fetched from the Context service and one KPI is created per
            KPI sample type of each of its endpoints.

            Returns:
                The list of KPI ids created during this call. If an error
                interrupts processing, the error is logged and the ids created
                up to that point are returned (possibly an empty list).
            """
            kpi_id_list = []

            try:
                while not self._events_queue.empty():
                    LOGGER.info('getting Kpi by KpiID')
                    event = self.get_event(block=True)
                    if event.event.event_type != EventTypeEnum.EVENTTYPE_CREATE:
                        continue

                    device = self._context_client.GetDevice(event.device_id)
                    for end_point in device.device_endpoints:
                        for sample_type in end_point.kpi_sample_types:
                            kpi_descriptor = self._build_kpi_descriptor(device, end_point, sample_type)
                            kpi_id = self._monitoring_client.CreateKpi(kpi_descriptor)
                            kpi_id_list.append(kpi_id)

            except ServiceException:
                LOGGER.exception('ListenEvents exception')

            except Exception:  # pragma: no cover
                LOGGER.exception('ListenEvents exception')

            # Returned on the error path too, so KPIs that were already created are not lost.
            return kpi_id_list