Loading src/monitoring/service/EventTools.py +29 −31 Original line number Diff line number Diff line Loading @@ -12,25 +12,19 @@ # See the License for the specific language governing permissions and # limitations under the License. import threading from queue import Queue import grpc, logging import grpc, logging, queue, threading from common.method_wrappers.ServiceExceptions import ServiceException from context.client.ContextClient import ContextClient from common.proto import monitoring_pb2 from common.proto.context_pb2 import Empty, EventTypeEnum from context.client.ContextClient import ContextClient from monitoring.client.MonitoringClient import MonitoringClient from monitoring.service.MonitoringServiceServicerImpl import LOGGER from common.proto import monitoring_pb2 LOGGER = logging.getLogger(__name__) class EventsDeviceCollector: def __init__(self) -> None: # pylint: disable=redefined-outer-name self._events_queue = Queue() self._events_queue = queue.Queue() self._context_client_grpc = ContextClient() self._device_stream = self._context_client_grpc.GetDeviceEvents(Empty()) Loading Loading @@ -79,11 +73,13 @@ class EventsDeviceCollector: try: kpi_id_list = [] while not self._events_queue.empty(): while True: # LOGGER.info('getting Kpi by KpiID') try: LOGGER.warning('[listen_events] waiting event') event = self.get_event(block=True) event = self.get_event(block=True, timeout=0.5) LOGGER.warning('[listen_events] event received') if event.event.event_type == EventTypeEnum.EVENTTYPE_CREATE: device = self._context_client.GetDevice(event.device_id) for j,end_point in enumerate(device.device_endpoints): Loading @@ -101,6 +97,8 @@ class EventsDeviceCollector: kpi_id = self._monitoring_client.SetKpi(kpi_descriptor) kpi_id_list.append(kpi_id) except queue.Empty: break LOGGER.warning('[listen_events] return') return kpi_id_list Loading src/monitoring/service/__main__.py +5 −1 Original line number Diff line number Diff line Loading @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging, signal, sys, threading import logging, signal, sys, threading, time from prometheus_client import start_http_server from common.Constants import ServiceNameEnum from common.Settings import ( Loading @@ -35,6 +35,8 @@ def start_monitoring(): events_collector = EventsDeviceCollector() events_collector.start() # TODO: redesign this method to be more clear and clean # Iterate while terminate is not set while not terminate.is_set(): list_new_kpi_ids = events_collector.listen_events() Loading @@ -48,6 +50,8 @@ def start_monitoring(): monitor_kpi_request.monitoring_window_s = 86400 monitor_kpi_request.sampling_rate_s = 30 events_collector._monitoring_client.MonitorKpi(monitor_kpi_request) time.sleep(0.5) # let other tasks run; do not overload CPU else: # Terminate is set, looping terminates LOGGER.warning("Stopping execution...") Loading Loading
src/monitoring/service/EventTools.py +29 −31 Original line number Diff line number Diff line Loading @@ -12,25 +12,19 @@ # See the License for the specific language governing permissions and # limitations under the License. import threading from queue import Queue import grpc, logging import grpc, logging, queue, threading from common.method_wrappers.ServiceExceptions import ServiceException from context.client.ContextClient import ContextClient from common.proto import monitoring_pb2 from common.proto.context_pb2 import Empty, EventTypeEnum from context.client.ContextClient import ContextClient from monitoring.client.MonitoringClient import MonitoringClient from monitoring.service.MonitoringServiceServicerImpl import LOGGER from common.proto import monitoring_pb2 LOGGER = logging.getLogger(__name__) class EventsDeviceCollector: def __init__(self) -> None: # pylint: disable=redefined-outer-name self._events_queue = Queue() self._events_queue = queue.Queue() self._context_client_grpc = ContextClient() self._device_stream = self._context_client_grpc.GetDeviceEvents(Empty()) Loading Loading @@ -79,11 +73,13 @@ class EventsDeviceCollector: try: kpi_id_list = [] while not self._events_queue.empty(): while True: # LOGGER.info('getting Kpi by KpiID') try: LOGGER.warning('[listen_events] waiting event') event = self.get_event(block=True) event = self.get_event(block=True, timeout=0.5) LOGGER.warning('[listen_events] event received') if event.event.event_type == EventTypeEnum.EVENTTYPE_CREATE: device = self._context_client.GetDevice(event.device_id) for j,end_point in enumerate(device.device_endpoints): Loading @@ -101,6 +97,8 @@ class EventsDeviceCollector: kpi_id = self._monitoring_client.SetKpi(kpi_descriptor) kpi_id_list.append(kpi_id) except queue.Empty: break LOGGER.warning('[listen_events] return') return kpi_id_list Loading
src/monitoring/service/__main__.py +5 −1 Original line number Diff line number Diff line Loading @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging, signal, sys, threading import logging, signal, sys, threading, time from prometheus_client import start_http_server from common.Constants import ServiceNameEnum from common.Settings import ( Loading @@ -35,6 +35,8 @@ def start_monitoring(): events_collector = EventsDeviceCollector() events_collector.start() # TODO: redesign this method to be more clear and clean # Iterate while terminate is not set while not terminate.is_set(): list_new_kpi_ids = events_collector.listen_events() Loading @@ -48,6 +50,8 @@ def start_monitoring(): monitor_kpi_request.monitoring_window_s = 86400 monitor_kpi_request.sampling_rate_s = 30 events_collector._monitoring_client.MonitorKpi(monitor_kpi_request) time.sleep(0.5) # let other tasks run; do not overload CPU else: # Terminate is set, looping terminates LOGGER.warning("Stopping execution...") Loading