# EventTools.py
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import threading
from queue import Queue

import grpc

from common.method_wrappers.ServiceExceptions import ServiceException
from context.client.ContextClient import ContextClient
from common.proto.context_pb2 import Empty, EventTypeEnum
from monitoring.client.MonitoringClient import MonitoringClient
from monitoring.service.MonitoringServiceServicerImpl import LOGGER
from common.proto import monitoring_pb2

LOGGER = logging.getLogger(__name__)

    def __init__(self) -> None: # pylint: disable=redefined-outer-name
        """Create the clients, the event queue and the collector thread.

        A device-event stream is opened through ``ContextClient`` and a
        thread targeting ``_collect`` is prepared for it. The thread is only
        constructed here; it is not started.
        """
        # Queue filled by _collect() and read through get_event(). It has to
        # exist before either of them runs, otherwise the first access raises
        # AttributeError.
        self._events_queue = Queue()

        self._context_client_grpc = ContextClient()
        self._device_stream     = self._context_client_grpc.GetDeviceEvents(Empty())
        # Second name bound to the very same client object.
        self._context_client    = self._context_client_grpc
        self._channel           = self._context_client_grpc.channel
        # NOTE(review): the monitoring host is hard-coded to loopback — confirm
        # this is intended outside of local deployments.
        self._monitoring_client = MonitoringClient(host='127.0.0.1')

        # Non-daemon thread: once started it keeps the interpreter alive until
        # the stream iteration in _collect() finishes.
        self._device_thread   = threading.Thread(
            target=self._collect, args=(self._device_stream,), daemon=False)

    def grpc_server_on(self):
        """Report whether the gRPC channel becomes ready within 15 seconds.

        Returns:
            True if the channel-ready future completes in time, False if
            waiting on it raises ``grpc.FutureTimeoutError``.
        """
        try:
            ready_future = grpc.channel_ready_future(self._channel)
            ready_future.result(timeout=15)
        except grpc.FutureTimeoutError:
            return False
        else:
            return True

    def _collect(self, events_stream):
        """Move every event from ``events_stream`` into the events queue.

        This is the target of the collector thread. Iteration ends when the
        stream is exhausted. A gRPC error with status ``CANCELLED`` is treated
        as a normal end of the stream; any other ``grpc.RpcError`` is logged
        and re-raised.

        Args:
            events_stream: iterable of events to enqueue.

        Raises:
            grpc.RpcError: when the stream fails with a status other than
                ``CANCELLED``.
        """
        LOGGER.warning('[_collect] begin')
        try:
            for event in events_stream:
                self._events_queue.put_nowait(event)
        except grpc.RpcError as e:
            if e.code() != grpc.StatusCode.CANCELLED: # pylint: disable=no-member
                LOGGER.warning('[_collect] raise')
                # Propagate unexpected stream failures instead of silently
                # finishing as if the stream had ended normally.
                raise # pragma: no cover
        LOGGER.warning('[_collect] end')

    def start(self):
        """Start the collector thread.

        Kept separate from ``_collect``: that method runs inside the thread,
        so starting the thread from there could only ever fail. A
        ``RuntimeError`` from ``Thread.start()`` (e.g. the thread was already
        started) is logged and not propagated.
        """
        try:
            self._device_thread.start()
        except RuntimeError:
            LOGGER.exception('Start EventTools exception')

    def get_event(self, block : bool = True, timeout : float = 0.1):
        """Take the next event from the events queue.

        Args:
            block: forwarded to the queue's ``get`` as ``block``.
            timeout: forwarded to the queue's ``get`` as ``timeout``.

        Returns:
            The item returned by the queue's ``get`` call.
        """
        pending_events = self._events_queue
        next_event = pending_events.get(block=block, timeout=timeout)
        return next_event

    def stop(self):
        """Cancel the device-event stream and wait for the collector thread.

        Cancelling the stream is what lets the iteration in ``_collect`` end
        (that method treats the ``CANCELLED`` status as a normal stop). The
        thread is then joined so that it is no longer running when this
        method returns.
        """
        LOGGER.warning('[stop] begin')
        self._device_stream.cancel()
        LOGGER.warning('[stop] joining')
        # Thread.join() raises RuntimeError on a thread that was never
        # started, so only wait when the collector is actually running.
        if self._device_thread.is_alive():
            self._device_thread.join()
        LOGGER.warning('[stop] end')

            while not self._events_queue.empty():
                # LOGGER.info('getting Kpi by KpiID')
                event = self.get_event(block=True)
                if event.event.event_type == EventTypeEnum.EVENTTYPE_CREATE:
                    device = self._context_client.GetDevice(event.device_id)
                    for j,end_point in enumerate(device.device_endpoints):
                        #for i, value in enumerate(kpi_sample_types_pb2.KpiSampleType.values()):
                        for i, value in enumerate(end_point.kpi_sample_types):
                            #if value == kpi_sample_types_pb2.KpiSampleType.KPISAMPLETYPE_UNKNOWN: continue

                            kpi_descriptor = monitoring_pb2.KpiDescriptor()

                            kpi_descriptor.kpi_description                      = device.device_type
                            kpi_descriptor.kpi_sample_type                      = value
                            #kpi_descriptor.service_id.service_uuid.uuid         = ""
                            kpi_descriptor.device_id.CopyFrom(device.device_id)
                            kpi_descriptor.endpoint_id.CopyFrom(end_point.endpoint_id)

                            kpi_id = self._monitoring_client.SetKpi(kpi_descriptor)
                            kpi_id_list.append(kpi_id)
            return kpi_id_list
        except ServiceException as e:
            LOGGER.exception('ListenEvents exception')
        except Exception as e:  # pragma: no cover
            LOGGER.exception('ListenEvents exception')