diff --git a/src/monitoring/client/monitoring_client.py b/src/monitoring/client/monitoring_client.py index dc21275482ac039c79ace8b956df24223afc7a23..c3ab508c947ae07835801ad724f4589af92379c8 100644 --- a/src/monitoring/client/monitoring_client.py +++ b/src/monitoring/client/monitoring_client.py @@ -21,19 +21,19 @@ class MonitoringClient: LOGGER.info('CreateKpi: {}'.format(request)) response = self.server.CreateKpi(request) LOGGER.info('CreateKpi result: {}'.format(response)) - return monitoring_pb2.KpiId() + return response def MonitorKpi(self, request): LOGGER.info('MonitorKpi: {}'.format(request)) response = self.server.MonitorKpi(request) LOGGER.info('MonitorKpi result: {}'.format(response)) - return context_pb2.Empty() + return response def IncludeKpi(self, request): LOGGER.info('IncludeKpi: {}'.format(request)) response = self.server.IncludeKpi(request) LOGGER.info('IncludeKpi result: {}'.format(response)) - return context_pb2.Empty() + return response def GetStreamKpi(self, request): LOGGER.info('GetStreamKpi: {}'.format(request)) @@ -51,13 +51,7 @@ class MonitoringClient: LOGGER.info('GetKpiDescriptor: {}'.format(request)) response = self.server.GetKpiDescriptor(request) LOGGER.info('GetKpiDescriptor result: {}'.format(response)) - return monitoring_pb2.KpiDescriptor() - - def ListenEvents(self, ): - LOGGER.info('ListenEvents: {}'.format()) - response = self.server.ListenEvents() - LOGGER.info('ListenEvents result: {}'.format(response)) - return monitoring_pb2.KpiDescriptor() + return response if __name__ == '__main__': # get port diff --git a/src/monitoring/service/EventTools.py b/src/monitoring/service/EventTools.py index 4bf5bffa9c93bd1b0a8fb5ac4a7a273d32ad9eaa..c54c5f57b7e45c9a71ea07e5b1f84b68334f86fa 100644 --- a/src/monitoring/service/EventTools.py +++ b/src/monitoring/service/EventTools.py @@ -47,27 +47,29 @@ class EventsDeviceCollector: def listen_events(self): LOGGER.info('getting Kpi by KpiID') - + qsize = self._events_queue.qsize() try: kpi_id_list = [] - for i in range(self._events_queue.qsize()): - event = self.get_event(block=True) - - if event.event.event_type == EventTypeEnum.EVENTTYPE_CREATE: - device = self._context_client.GetDevice(event.device_id) - - for j,end_point in enumerate(device.device_endpoints): - # for k,rule in enumerate(device.device_config.config_rules): - kpi_descriptor = monitoring_pb2.KpiDescriptor() - - kpi_descriptor.kpi_description = device.device_type - kpi_descriptor.kpi_sample_type = kpi_sample_types_pb2.KpiSampleType.PACKETS_TRANSMITTED - kpi_descriptor.device_id.CopyFrom(device.device_id) - kpi_descriptor.endpoint_id.CopyFrom(end_point.endpoint_id) - kpi_descriptor.service_id.service_uuid.uuid = "SERV"+str(i+1) - - kpi_id = self._monitoring_client.CreateKpi(kpi_descriptor) - kpi_id_list.append(kpi_id) + if qsize > 0: + for i in range(qsize): + print("Queue size: "+str(qsize)) + event = self.get_event(block=True) + if event.event.event_type == EventTypeEnum.EVENTTYPE_CREATE: + device = self._context_client.GetDevice(event.device_id) + print("Endpoints value: " + str(len(device.device_endpoints))) + for j,end_point in enumerate(device.device_endpoints): + + # for k,rule in enumerate(device.device_config.config_rules): + kpi_descriptor = monitoring_pb2.KpiDescriptor() + + kpi_descriptor.kpi_description = device.device_type + kpi_descriptor.kpi_sample_type = kpi_sample_types_pb2.KpiSampleType.PACKETS_TRANSMITTED + kpi_descriptor.device_id.CopyFrom(device.device_id) + kpi_descriptor.endpoint_id.CopyFrom(end_point.endpoint_id) + kpi_descriptor.service_id.service_uuid.uuid = "SERV"+str(i+1) + + kpi_id = self._monitoring_client.CreateKpi(kpi_descriptor) + kpi_id_list.append(kpi_id) return kpi_id_list diff --git a/src/monitoring/service/SqliteTools.py b/src/monitoring/service/SqliteTools.py index 5502ac5c984d47222e3c439680f0838334a770d0..121438c58e320b11b10fa41aa273992b8e25393c 100644 --- a/src/monitoring/service/SqliteTools.py +++ b/src/monitoring/service/SqliteTools.py @@ -16,7 +16,7 @@ class SQLite(): def insert_KPI(self,kpi_description,kpi_sample_type,device_id,endpoint_id,service_id ): c = self.client.cursor() - c.execute("SELECT kpi_id FROM KPI WHERE device_id is ? AND kpi_sample_type is ?",(device_id,kpi_sample_type)) + c.execute("SELECT kpi_id FROM KPI WHERE device_id is ? AND kpi_sample_type is ? AND endpoint_id is ?",(device_id,kpi_sample_type,endpoint_id)) data=c.fetchone() if data is None: c.execute("INSERT INTO KPI (kpi_description,kpi_sample_type,device_id,endpoint_id,service_id) VALUES (?,?,?,?,?)", (kpi_description,kpi_sample_type,device_id,endpoint_id,service_id)) diff --git a/src/monitoring/service/__main__.py b/src/monitoring/service/__main__.py index 4d001e286ae568198b9b47497d633cade524b864..ea189e1c905b07f92ef3e9cd100763eb7dffa08f 100644 --- a/src/monitoring/service/__main__.py +++ b/src/monitoring/service/__main__.py @@ -1,9 +1,13 @@ import logging, os, signal, sys, threading import time +from context.client.ContextClient import ContextClient from monitoring.Config import GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD, LOG_LEVEL, METRICS_PORT from common.logger import getJSONLogger +from monitoring.client.monitoring_client import MonitoringClient +from monitoring.proto import monitoring_pb2 +from monitoring.service.EventTools import EventsDeviceCollector from monitoring.service.MonitoringService import MonitoringService LOGGER = getJSONLogger('monitoringservice-server') @@ -16,9 +20,40 @@ logger = None def signal_handler(signal, frame): global terminate, logger - logger.warning('Terminate signal received') + LOGGER.warning('Terminate signal received') terminate.set() +def start_monitoring(): + LOGGER.info('Start Monitoring...') + context_client_grpc = ContextClient(address='localhost', port='2020') + monitoring_client = MonitoringClient(server='localhost', port='7070') # instantiate the client + + while True: + if terminate.is_set(): + LOGGER.warning("Stopping execution...") + + break + + # Start Listening Events + events_collector = EventsDeviceCollector(context_client_grpc, monitoring_client) + events_collector.start() + + list_new_kpi_ids = events_collector.listen_events() + + # Monitor Kpis + if bool(list_new_kpi_ids): + for kpi_id in list_new_kpi_ids: + # Create Monitor Kpi Requests + monitor_kpi_request = monitoring_pb2.MonitorKpiRequest() + monitor_kpi_request.kpi_id.CopyFrom(kpi_id) + monitor_kpi_request.sampling_duration_s = 120 + monitor_kpi_request.sampling_interval_s = 5 + + # MonitorKpi(monitor_kpi_request) + + + + def main(): global terminate, logger @@ -42,6 +77,8 @@ def main(): grpc_service = MonitoringService(port=service_port, max_workers=max_workers, grace_period=grace_period) grpc_service.start() + # start_monitoring() + # Wait for Ctrl+C or termination signal while not terminate.wait(timeout=0.1): pass diff --git a/src/monitoring/tests/test_unitary.py b/src/monitoring/tests/test_unitary.py index 145e2c718c12c4798c7955ade023267ffceaca4d..2750a6b421c0f12ae8e124d186baccbd01c72447 100644 --- a/src/monitoring/tests/test_unitary.py +++ b/src/monitoring/tests/test_unitary.py @@ -263,4 +263,7 @@ def test_listen_events(monitoring_client: MonitoringClient, populate('localhost', GRPC_PORT_CONTEXT) # place this call in the appropriate line, according to your tests - events_collector.listen_events() + kpi_id_list = events_collector.listen_events() + + assert bool(kpi_id_list) == True +