From 13f6b4385da2103c70c7f77e74093d69042614b4 Mon Sep 17 00:00:00 2001
From: gifrerenom <lluis.gifre@cttc.es>
Date: Sat, 28 Jan 2023 16:19:37 +0000
Subject: [PATCH] Monitoring component:

- added log verbosity in CI/CD pipeline since it gets stuck for unknown reasons
---
 src/monitoring/service/EventTools.py | 60 ++++++++++++++--------------
 src/monitoring/service/__main__.py   |  6 ++-
 2 files changed, 34 insertions(+), 32 deletions(-)

diff --git a/src/monitoring/service/EventTools.py b/src/monitoring/service/EventTools.py
index 9ebe0774c..95350ae70 100644
--- a/src/monitoring/service/EventTools.py
+++ b/src/monitoring/service/EventTools.py
@@ -12,25 +12,19 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import threading
-from queue import Queue
-
-import grpc, logging
-
+import grpc, logging, queue, threading
 from common.method_wrappers.ServiceExceptions import ServiceException
-from context.client.ContextClient import ContextClient
-
+from common.proto import monitoring_pb2
 from common.proto.context_pb2 import Empty, EventTypeEnum
-
+from context.client.ContextClient import ContextClient
 from monitoring.client.MonitoringClient import MonitoringClient
 from monitoring.service.MonitoringServiceServicerImpl import LOGGER
-from common.proto import monitoring_pb2
 
 LOGGER = logging.getLogger(__name__)
 
 class EventsDeviceCollector:
     def __init__(self) -> None: # pylint: disable=redefined-outer-name
-        self._events_queue = Queue()
+        self._events_queue = queue.Queue()
 
         self._context_client_grpc = ContextClient()
         self._device_stream     = self._context_client_grpc.GetDeviceEvents(Empty())
@@ -79,28 +73,32 @@ class EventsDeviceCollector:
         try:
             kpi_id_list = []
 
-            while not self._events_queue.empty():
+            while True:
                 # LOGGER.info('getting Kpi by KpiID')
-                LOGGER.warning('[listen_events] waiting event')
-                event = self.get_event(block=True)
-                LOGGER.warning('[listen_events] event received')
-                if event.event.event_type == EventTypeEnum.EVENTTYPE_CREATE:
-                    device = self._context_client.GetDevice(event.device_id)
-                    for j,end_point in enumerate(device.device_endpoints):
-                        #for i, value in enumerate(kpi_sample_types_pb2.KpiSampleType.values()):
-                        for i, value in enumerate(end_point.kpi_sample_types):
-                            #if value == kpi_sample_types_pb2.KpiSampleType.KPISAMPLETYPE_UNKNOWN: continue
-
-                            kpi_descriptor = monitoring_pb2.KpiDescriptor()
-
-                            kpi_descriptor.kpi_description                      = device.device_type
-                            kpi_descriptor.kpi_sample_type                      = value
-                            #kpi_descriptor.service_id.service_uuid.uuid         = ""
-                            kpi_descriptor.device_id.CopyFrom(device.device_id)
-                            kpi_descriptor.endpoint_id.CopyFrom(end_point.endpoint_id)
-
-                            kpi_id = self._monitoring_client.SetKpi(kpi_descriptor)
-                            kpi_id_list.append(kpi_id)
+                try:
+                    LOGGER.warning('[listen_events] waiting event')
+                    event = self.get_event(block=True, timeout=0.5)
+                    LOGGER.warning('[listen_events] event received')
+
+                    if event.event.event_type == EventTypeEnum.EVENTTYPE_CREATE:
+                        device = self._context_client.GetDevice(event.device_id)
+                        for j,end_point in enumerate(device.device_endpoints):
+                            #for i, value in enumerate(kpi_sample_types_pb2.KpiSampleType.values()):
+                            for i, value in enumerate(end_point.kpi_sample_types):
+                                #if value == kpi_sample_types_pb2.KpiSampleType.KPISAMPLETYPE_UNKNOWN: continue
+
+                                kpi_descriptor = monitoring_pb2.KpiDescriptor()
+
+                                kpi_descriptor.kpi_description                      = device.device_type
+                                kpi_descriptor.kpi_sample_type                      = value
+                                #kpi_descriptor.service_id.service_uuid.uuid         = ""
+                                kpi_descriptor.device_id.CopyFrom(device.device_id)
+                                kpi_descriptor.endpoint_id.CopyFrom(end_point.endpoint_id)
+
+                                kpi_id = self._monitoring_client.SetKpi(kpi_descriptor)
+                                kpi_id_list.append(kpi_id)
+                except queue.Empty:
+                    break
 
             LOGGER.warning('[listen_events] return')
             return kpi_id_list
diff --git a/src/monitoring/service/__main__.py b/src/monitoring/service/__main__.py
index 3334a860c..78764ea64 100644
--- a/src/monitoring/service/__main__.py
+++ b/src/monitoring/service/__main__.py
@@ -12,7 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import logging, signal, sys, threading
+import logging, signal, sys, threading, time
 from prometheus_client import start_http_server
 from common.Constants import ServiceNameEnum
 from common.Settings import (
@@ -35,6 +35,8 @@ def start_monitoring():
     events_collector = EventsDeviceCollector()
     events_collector.start()
 
+    # TODO: redesign this method to be more clear and clean
+
     # Iterate while terminate is not set
     while not terminate.is_set():
         list_new_kpi_ids = events_collector.listen_events()
@@ -48,6 +50,8 @@ def start_monitoring():
                 monitor_kpi_request.monitoring_window_s = 86400
                 monitor_kpi_request.sampling_rate_s = 30
                 events_collector._monitoring_client.MonitorKpi(monitor_kpi_request)
+        
+        time.sleep(0.5) # let other tasks run; do not overload CPU
     else:
         # Terminate is set, looping terminates
         LOGGER.warning("Stopping execution...")
-- 
GitLab