# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import threading
from queue import Queue

import grpc

from common.rpc_method_wrapper.ServiceExceptions import ServiceException
from context.client.ContextClient import ContextClient
from context.proto import kpi_sample_types_pb2
from context.proto.context_pb2 import Empty, EventTypeEnum

from common.logger import getJSONLogger
from monitoring.client.monitoring_client import MonitoringClient
from monitoring.proto import monitoring_pb2

LOGGER = getJSONLogger('monitoringservice-server')
LOGGER.setLevel('DEBUG')


class EventsDeviceCollector:
    """Collect device events from the Context service and create KPIs for them.

    A background thread drains the device-event stream into an internal queue;
    ``listen_events`` consumes that queue and registers one KPI per
    (endpoint, sample type) of every newly created device.
    """

    def __init__(self, context_client_grpc : ContextClient, monitoring_client_grpc : MonitoringClient) -> None: # pylint: disable=redefined-outer-name
        """Subscribe to device events and prepare (but do not start) the collector thread.

        Args:
            context_client_grpc: client used to open the device-event stream
                and to fetch device details.
            monitoring_client_grpc: client used to create the KPIs.
        """
        self._events_queue = Queue()

        # The stream is opened here; it is only consumed once start() runs the thread.
        self._device_stream = context_client_grpc.GetDeviceEvents(Empty())
        self._context_client = context_client_grpc
        self._channel = context_client_grpc.channel
        self._monitoring_client = monitoring_client_grpc

        self._device_thread = threading.Thread(
            target=self._collect, args=(self._device_stream,), daemon=False)

    def grpc_server_on(self) -> bool:
        """Return True if the Context channel becomes ready within 15 seconds, else False."""
        try:
            grpc.channel_ready_future(self._channel).result(timeout=15)
            return True
        except grpc.FutureTimeoutError:
            return False

    def _collect(self, events_stream) -> None:
        """Thread target: push every event read from *events_stream* onto the queue.

        An ``RpcError`` whose code is ``CANCELLED`` ends the loop silently
        (stop() cancels the stream); any other ``RpcError`` is re-raised.
        """
        try:
            for event in events_stream:
                self._events_queue.put_nowait(event)
        except grpc.RpcError as e:
            if e.code() != grpc.StatusCode.CANCELLED: # pylint: disable=no-member
                raise # pragma: no cover

    def start(self) -> None:
        """Start the collector thread; a ``RuntimeError`` from the start is logged, not raised."""
        try:
            self._device_thread.start()
        except RuntimeError:
            LOGGER.exception('Start EventTools exception')

    def get_event(self, block : bool = True, timeout : float = 0.1):
        """Pop the next queued event.

        Args:
            block: whether to wait for an event when the queue is empty.
            timeout: maximum wait in seconds when *block* is true.

        Raises:
            queue.Empty: if no event is available (within *timeout* when blocking).
        """
        return self._events_queue.get(block=block, timeout=timeout)

    def stop(self) -> None:
        """Cancel the event stream and wait for the collector thread to finish."""
        self._device_stream.cancel()
        # Thread.join() raises RuntimeError on a thread that was never started
        # (e.g. stop() without start()); only join a thread that is running.
        if self._device_thread.is_alive():
            self._device_thread.join()

    def listen_events(self) -> list:
        """Drain the queued events and create KPIs for newly created devices.

        For every ``EVENTTYPE_CREATE`` event the device is fetched from the
        Context service and one KPI is created per sample type of each of its
        endpoints. Other event types are consumed and ignored.

        Returns:
            The KPI ids returned by ``CreateKpi``. If an error interrupts
            processing it is logged and the ids created so far are returned
            (possibly an empty list), so the result is always a list.
        """
        # Initialised outside the try so that it can be returned on every path.
        kpi_id_list = []
        try:
            while not self._events_queue.empty():
                LOGGER.info('getting Kpi by KpiID')
                event = self.get_event(block=True)
                if event.event.event_type != EventTypeEnum.EVENTTYPE_CREATE:
                    continue

                device = self._context_client.GetDevice(event.device_id)
                for end_point in device.device_endpoints:
                    # Only the sample types listed on the endpoint itself are
                    # registered, not every known KpiSampleType value.
                    for sample_type in end_point.kpi_sample_types:
                        kpi_descriptor = monitoring_pb2.KpiDescriptor()
                        kpi_descriptor.kpi_description = device.device_type
                        kpi_descriptor.kpi_sample_type = sample_type
                        kpi_descriptor.device_id.CopyFrom(device.device_id)
                        kpi_descriptor.endpoint_id.CopyFrom(end_point.endpoint_id)

                        kpi_id = self._monitoring_client.CreateKpi(kpi_descriptor)
                        kpi_id_list.append(kpi_id)
        except ServiceException:
            LOGGER.exception('ListenEvents exception')
        except Exception: # pragma: no cover  # pylint: disable=broad-except
            LOGGER.exception('ListenEvents exception')
        return kpi_id_list