diff --git a/src/monitoring/service/EventTools.py b/src/monitoring/service/EventTools.py index 9ebe0774cab5bde8207b7a75a9177007d48d62e4..95350ae706105edf2b7bad49969b853adf947a84 100644 --- a/src/monitoring/service/EventTools.py +++ b/src/monitoring/service/EventTools.py @@ -12,25 +12,19 @@ # See the License for the specific language governing permissions and # limitations under the License. -import threading -from queue import Queue - -import grpc, logging - +import grpc, logging, queue, threading from common.method_wrappers.ServiceExceptions import ServiceException -from context.client.ContextClient import ContextClient - +from common.proto import monitoring_pb2 from common.proto.context_pb2 import Empty, EventTypeEnum - +from context.client.ContextClient import ContextClient from monitoring.client.MonitoringClient import MonitoringClient from monitoring.service.MonitoringServiceServicerImpl import LOGGER -from common.proto import monitoring_pb2 LOGGER = logging.getLogger(__name__) class EventsDeviceCollector: def __init__(self) -> None: # pylint: disable=redefined-outer-name - self._events_queue = Queue() + self._events_queue = queue.Queue() self._context_client_grpc = ContextClient() self._device_stream = self._context_client_grpc.GetDeviceEvents(Empty()) @@ -79,28 +73,32 @@ class EventsDeviceCollector: try: kpi_id_list = [] - while not self._events_queue.empty(): + while True: # LOGGER.info('getting Kpi by KpiID') - LOGGER.warning('[listen_events] waiting event') - event = self.get_event(block=True) - LOGGER.warning('[listen_events] event received') - if event.event.event_type == EventTypeEnum.EVENTTYPE_CREATE: - device = self._context_client.GetDevice(event.device_id) - for j,end_point in enumerate(device.device_endpoints): - #for i, value in enumerate(kpi_sample_types_pb2.KpiSampleType.values()): - for i, value in enumerate(end_point.kpi_sample_types): - #if value == kpi_sample_types_pb2.KpiSampleType.KPISAMPLETYPE_UNKNOWN: continue - - kpi_descriptor = monitoring_pb2.KpiDescriptor() - - kpi_descriptor.kpi_description = device.device_type - kpi_descriptor.kpi_sample_type = value - #kpi_descriptor.service_id.service_uuid.uuid = "" - kpi_descriptor.device_id.CopyFrom(device.device_id) - kpi_descriptor.endpoint_id.CopyFrom(end_point.endpoint_id) - - kpi_id = self._monitoring_client.SetKpi(kpi_descriptor) - kpi_id_list.append(kpi_id) + try: + LOGGER.warning('[listen_events] waiting event') + event = self.get_event(block=True, timeout=0.5) + LOGGER.warning('[listen_events] event received') + + if event.event.event_type == EventTypeEnum.EVENTTYPE_CREATE: + device = self._context_client.GetDevice(event.device_id) + for j,end_point in enumerate(device.device_endpoints): + #for i, value in enumerate(kpi_sample_types_pb2.KpiSampleType.values()): + for i, value in enumerate(end_point.kpi_sample_types): + #if value == kpi_sample_types_pb2.KpiSampleType.KPISAMPLETYPE_UNKNOWN: continue + + kpi_descriptor = monitoring_pb2.KpiDescriptor() + + kpi_descriptor.kpi_description = device.device_type + kpi_descriptor.kpi_sample_type = value + #kpi_descriptor.service_id.service_uuid.uuid = "" + kpi_descriptor.device_id.CopyFrom(device.device_id) + kpi_descriptor.endpoint_id.CopyFrom(end_point.endpoint_id) + + kpi_id = self._monitoring_client.SetKpi(kpi_descriptor) + kpi_id_list.append(kpi_id) + except queue.Empty: + break LOGGER.warning('[listen_events] return') return kpi_id_list diff --git a/src/monitoring/service/__main__.py b/src/monitoring/service/__main__.py index 3334a860ccd94d51390ab5f5869d25e2475084ee..78764ea64e39c48d927901ad88e7cff569e7447b 100644 --- a/src/monitoring/service/__main__.py +++ b/src/monitoring/service/__main__.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging, signal, sys, threading +import logging, signal, sys, threading, time from prometheus_client import start_http_server from common.Constants import ServiceNameEnum from common.Settings import ( @@ -35,6 +35,8 @@ def start_monitoring(): events_collector = EventsDeviceCollector() events_collector.start() + # TODO: redesign this method to be more clear and clean + # Iterate while terminate is not set while not terminate.is_set(): list_new_kpi_ids = events_collector.listen_events() @@ -48,6 +50,8 @@ def start_monitoring(): monitor_kpi_request.monitoring_window_s = 86400 monitor_kpi_request.sampling_rate_s = 30 events_collector._monitoring_client.MonitorKpi(monitor_kpi_request) + + time.sleep(0.5) # let other tasks run; do not overload CPU else: # Terminate is set, looping terminates LOGGER.warning("Stopping execution...")