From 7b149e7d69bbe7fa5180d484300be8c0b0972756 Mon Sep 17 00:00:00 2001 From: fjmmuro Date: Wed, 2 Nov 2022 09:31:47 +0100 Subject: [PATCH 01/20] Added new KPI Sample Types --- proto/kpi_sample_types.proto | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/proto/kpi_sample_types.proto b/proto/kpi_sample_types.proto index 7445a0f25..f381dc95a 100644 --- a/proto/kpi_sample_types.proto +++ b/proto/kpi_sample_types.proto @@ -16,9 +16,16 @@ syntax = "proto3"; package kpi_sample_types; enum KpiSampleType { - KPISAMPLETYPE_UNKNOWN = 0; - KPISAMPLETYPE_PACKETS_TRANSMITTED = 101; - KPISAMPLETYPE_PACKETS_RECEIVED = 102; - KPISAMPLETYPE_BYTES_TRANSMITTED = 201; - KPISAMPLETYPE_BYTES_RECEIVED = 202; + KPISAMPLETYPE_UNKNOWN = 0; + KPISAMPLETYPE_PACKETS_TRANSMITTED = 101; + KPISAMPLETYPE_PACKETS_RECEIVED = 102; + KPISAMPLETYPE_PACKETS_DROPPED = 103; + KPISAMPLETYPE_BYTES_TRANSMITTED = 201; + KPISAMPLETYPE_BYTES_RECEIVED = 202; + KPISAMPLETYPE_BYTES_DROPPED = 203; + KPISAMPLETYPE_ML_CONFIDENCE = 401; //. can be used by both optical and L3 without any issue + KPISAMPLETYPE_SECURITY_STATUS_SERVICE = 501; //. can be used by both optical and L3 without any issue + KPISAMPLETYPE_L3_UNIQUE_ATTACK_CONNS = 601; + KPISAMPLETYPE_L3_TOTAL_DROPPED_PACKTS = 602; + KPISAMPLETYPE_SERVICE_LATENCY_MS = 701; } -- GitLab From 0f6606e26f89bd498252c29050bf22cbbcef4b52 Mon Sep 17 00:00:00 2001 From: fjmmuro Date: Wed, 2 Nov 2022 17:32:52 +0100 Subject: [PATCH 02/20] Added new KPI Sample Types for Scenario 3 --- proto/kpi_sample_types.proto | 29 ++++++++++++++++------------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/proto/kpi_sample_types.proto b/proto/kpi_sample_types.proto index f381dc95a..81492bbdb 100644 --- a/proto/kpi_sample_types.proto +++ b/proto/kpi_sample_types.proto @@ -16,16 +16,19 @@ syntax = "proto3"; package kpi_sample_types; enum KpiSampleType { - KPISAMPLETYPE_UNKNOWN = 0; - KPISAMPLETYPE_PACKETS_TRANSMITTED = 101; - KPISAMPLETYPE_PACKETS_RECEIVED = 102; - KPISAMPLETYPE_PACKETS_DROPPED = 103; - KPISAMPLETYPE_BYTES_TRANSMITTED = 201; - KPISAMPLETYPE_BYTES_RECEIVED = 202; - KPISAMPLETYPE_BYTES_DROPPED = 203; - KPISAMPLETYPE_ML_CONFIDENCE = 401; //. can be used by both optical and L3 without any issue - KPISAMPLETYPE_SECURITY_STATUS_SERVICE = 501; //. can be used by both optical and L3 without any issue - KPISAMPLETYPE_L3_UNIQUE_ATTACK_CONNS = 601; - KPISAMPLETYPE_L3_TOTAL_DROPPED_PACKTS = 602; - KPISAMPLETYPE_SERVICE_LATENCY_MS = 701; -} + KPISAMPLETYPE_UNKNOWN = 0; + KPISAMPLETYPE_PACKETS_TRANSMITTED = 101; + KPISAMPLETYPE_PACKETS_RECEIVED = 102; + KPISAMPLETYPE_PACKETS_DROPPED = 103; + KPISAMPLETYPE_BYTES_TRANSMITTED = 201; + KPISAMPLETYPE_BYTES_RECEIVED = 202; + KPISAMPLETYPE_BYTES_DROPPED = 203; + KPISAMPLETYPE_ML_CONFIDENCE = 401; //. can be used by both optical and L3 without any issue + KPISAMPLETYPE_OPTICAL_SECURITY_STATUS = 501; //. can be used by both optical and L3 without any issue + KPISAMPLETYPE_L3_UNIQUE_ATTACK_CONNS = 601; + KPISAMPLETYPE_L3_TOTAL_DROPPED_PACKTS = 602; + KPISAMPLETYPE_L3_UNIQUE_ATTACKERS = 603; + KPISAMPLETYPE_L3_UNIQUE_COMPROMISED_CLIENTS = 604; + KPISAMPLETYPE_L3_SECURITY_STATUS_CRYPTO = 605; + KPISAMPLETYPE_SERVICE_LATENCY_MS = 701; +} º -- GitLab From b2bc69eab4a340cf4e9e4cb76b919c000edaf070 Mon Sep 17 00:00:00 2001 From: fjmmuro Date: Wed, 2 Nov 2022 17:34:29 +0100 Subject: [PATCH 03/20] Minor typo --- proto/kpi_sample_types.proto | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/proto/kpi_sample_types.proto b/proto/kpi_sample_types.proto index 81492bbdb..3494d9849 100644 --- a/proto/kpi_sample_types.proto +++ b/proto/kpi_sample_types.proto @@ -31,4 +31,4 @@ enum KpiSampleType { KPISAMPLETYPE_L3_UNIQUE_COMPROMISED_CLIENTS = 604; KPISAMPLETYPE_L3_SECURITY_STATUS_CRYPTO = 605; KPISAMPLETYPE_SERVICE_LATENCY_MS = 701; -} º +} -- GitLab From b7396f378bdbf2bf298001f412efb919dcb54794 Mon Sep 17 00:00:00 2001 From: fjmmuro Date: Thu, 3 Nov 2022 12:30:13 +0100 Subject: [PATCH 04/20] Added ConnectionID to the KpiDescriptor --- proto/monitoring.proto | 1 + src/monitoring/service/ManagementDBTools.py | 9 +++++---- src/monitoring/service/MetricsDBTools.py | 6 ++++-- src/monitoring/service/MonitoringServiceServicerImpl.py | 8 ++++++-- src/monitoring/tests/Messages.py | 3 +++ src/monitoring/tests/test_unitary.py | 9 ++++++--- 6 files changed, 25 insertions(+), 11 deletions(-) diff --git a/proto/monitoring.proto b/proto/monitoring.proto index 9be39db90..c0e2dd877 100644 --- a/proto/monitoring.proto +++ b/proto/monitoring.proto @@ -48,6 +48,7 @@ message KpiDescriptor { context.EndPointId endpoint_id = 6; context.ServiceId service_id = 7; context.SliceId slice_id = 8; + context.ConnectionId connection_id = 9; } message MonitorKpiRequest { diff --git a/src/monitoring/service/ManagementDBTools.py b/src/monitoring/service/ManagementDBTools.py index 2387ddde0..2533707a6 100644 --- a/src/monitoring/service/ManagementDBTools.py +++ b/src/monitoring/service/ManagementDBTools.py @@ -38,7 +38,8 @@ class ManagementDB(): kpi_sample_type INTEGER, device_id INTEGER, endpoint_id INTEGER, - service_id INTEGER + service_id INTEGER, + connection_id INTEGER ); """) LOGGER.debug("KPI table created in the ManagementDB") @@ -84,13 +85,13 @@ class ManagementDB(): LOGGER.debug(f"Alarm table cannot be created in the ManagementDB. {e}") raise Exception - def insert_KPI(self,kpi_description,kpi_sample_type,device_id,endpoint_id,service_id): + def insert_KPI(self,kpi_description,kpi_sample_type,device_id,endpoint_id,service_id, connection_id): try: c = self.client.cursor() - c.execute("SELECT kpi_id FROM kpi WHERE device_id is ? AND kpi_sample_type is ? AND endpoint_id is ? AND service_id is ?",(device_id,kpi_sample_type,endpoint_id,service_id)) + c.execute("SELECT kpi_id FROM kpi WHERE device_id is ? AND kpi_sample_type is ? AND endpoint_id is ? AND service_id is ?",(device_id,kpi_sample_type,endpoint_id,service_id, connection_id)) data=c.fetchone() if data is None: - c.execute("INSERT INTO kpi (kpi_description,kpi_sample_type,device_id,endpoint_id,service_id) VALUES (?,?,?,?,?)", (kpi_description,kpi_sample_type,device_id,endpoint_id,service_id)) + c.execute("INSERT INTO kpi (kpi_description,kpi_sample_type,device_id,endpoint_id,service_id) VALUES (?,?,?,?,?)", (kpi_description,kpi_sample_type,device_id,endpoint_id,service_id, connection_id)) self.client.commit() kpi_id = c.lastrowid LOGGER.debug(f"KPI {kpi_id} succesfully inserted in the ManagementDB") diff --git a/src/monitoring/service/MetricsDBTools.py b/src/monitoring/service/MetricsDBTools.py index 16e6373f5..f5dd3ad2a 100644 --- a/src/monitoring/service/MetricsDBTools.py +++ b/src/monitoring/service/MetricsDBTools.py @@ -87,6 +87,7 @@ class MetricsDB(): 'device_id SYMBOL,' \ 'endpoint_id SYMBOL,' \ 'service_id SYMBOL,' \ + 'connection_id SYMBOL,' \ 'timestamp TIMESTAMP,' \ 'kpi_value DOUBLE)' \ 'TIMESTAMP(timestamp);' @@ -97,7 +98,7 @@ class MetricsDB(): LOGGER.debug(f"Table {self.table} cannot be created. {e}") raise Exception - def write_KPI(self, time, kpi_id, kpi_sample_type, device_id, endpoint_id, service_id, kpi_value): + def write_KPI(self, time, kpi_id, kpi_sample_type, device_id, endpoint_id, service_id,connection_id, kpi_value): counter = 0 while (counter < self.retries): try: @@ -109,7 +110,8 @@ class MetricsDB(): 'kpi_sample_type': kpi_sample_type, 'device_id': device_id, 'endpoint_id': endpoint_id, - 'service_id': service_id}, + 'service_id': service_id, + 'connection_id': connection_id,}, columns={ 'kpi_value': kpi_value}, at=datetime.datetime.fromtimestamp(time)) diff --git a/src/monitoring/service/MonitoringServiceServicerImpl.py b/src/monitoring/service/MonitoringServiceServicerImpl.py index 7cd47f187..48dfeb823 100644 --- a/src/monitoring/service/MonitoringServiceServicerImpl.py +++ b/src/monitoring/service/MonitoringServiceServicerImpl.py @@ -85,13 +85,14 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): kpi_device_id = request.device_id.device_uuid.uuid kpi_endpoint_id = request.endpoint_id.endpoint_uuid.uuid kpi_service_id = request.service_id.service_uuid.uuid + kpi_connection_id = request.connection_id.connection_id.uuid if request.kpi_id.kpi_id.uuid is not "": response.kpi_id.uuid = request.kpi_id.kpi_id.uuid # Here the code to modify an existing kpi else: data = self.management_db.insert_KPI( - kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id) + kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id, kpi_connection_id) response.kpi_id.uuid = str(data) return response @@ -136,6 +137,7 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): kpiDescriptor.device_id.device_uuid.uuid = str(kpi_db[3]) kpiDescriptor.endpoint_id.endpoint_uuid.uuid = str(kpi_db[4]) kpiDescriptor.service_id.service_uuid.uuid = str(kpi_db[5]) + kpiDescriptor.connection_id.connection_id.uuid = str(kpi_db[6]) return kpiDescriptor except ServiceException as e: LOGGER.exception('GetKpiDescriptor exception') @@ -160,6 +162,7 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): kpi_descriptor.device_id.device_uuid.uuid = str(item[3]) kpi_descriptor.endpoint_id.endpoint_uuid.uuid = str(item[4]) kpi_descriptor.service_id.service_uuid.uuid = str(item[5]) + kpi_descriptor.connection_id.connection_id.uuid = str(item[6]) kpi_descriptor_list.kpi_descriptor_list.append(kpi_descriptor) @@ -186,11 +189,12 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): deviceId = kpiDescriptor.device_id.device_uuid.uuid endpointId = kpiDescriptor.endpoint_id.endpoint_uuid.uuid serviceId = kpiDescriptor.service_id.service_uuid.uuid + connectionId = kpiDescriptor.connection_id.connection_id.uuid time_stamp = request.timestamp.timestamp kpi_value = getattr(request.kpi_value, request.kpi_value.WhichOneof('value')) # Build the structure to be included as point in the MetricsDB - self.metrics_db.write_KPI(time_stamp, kpiId, kpiSampleType, deviceId, endpointId, serviceId, kpi_value) + self.metrics_db.write_KPI(time_stamp, kpiId, kpiSampleType, deviceId, endpointId, serviceId, connectionId, kpi_value) return Empty() except ServiceException as e: diff --git a/src/monitoring/tests/Messages.py b/src/monitoring/tests/Messages.py index 845153856..7c10ea44a 100644 --- a/src/monitoring/tests/Messages.py +++ b/src/monitoring/tests/Messages.py @@ -30,6 +30,7 @@ def create_kpi_request(): _create_kpi_request.device_id.device_uuid.uuid = 'DEV1' # pylint: disable=maybe-no-member _create_kpi_request.service_id.service_uuid.uuid = 'SERV1' # pylint: disable=maybe-no-member _create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END1' # pylint: disable=maybe-no-member + _create_kpi_request.connection_id.connection_id.uuid = 'CON1' # pylint: disable=maybe-no-member return _create_kpi_request def create_kpi_request_b(): @@ -39,6 +40,7 @@ def create_kpi_request_b(): _create_kpi_request.device_id.device_uuid.uuid = 'DEV2' # pylint: disable=maybe-no-member _create_kpi_request.service_id.service_uuid.uuid = 'SERV2' # pylint: disable=maybe-no-member _create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END2' # pylint: disable=maybe-no-member + _create_kpi_request.connection_id.connection_id.uuid = 'CON2' # pylint: disable=maybe-no-member return _create_kpi_request def create_kpi_request_c(): @@ -48,6 +50,7 @@ def create_kpi_request_c(): _create_kpi_request.device_id.device_uuid.uuid = 'DEV3' # pylint: disable=maybe-no-member _create_kpi_request.service_id.service_uuid.uuid = 'SERV3' # pylint: disable=maybe-no-member _create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END3' # pylint: disable=maybe-no-member + _create_kpi_request.connection_id.connection_id.uuid = 'CON3' # pylint: disable=maybe-no-member return _create_kpi_request def monitor_kpi_request(kpi_uuid, monitoring_window_s, sampling_rate_s): diff --git a/src/monitoring/tests/test_unitary.py b/src/monitoring/tests/test_unitary.py index ee6a29e8a..290431af5 100644 --- a/src/monitoring/tests/test_unitary.py +++ b/src/monitoring/tests/test_unitary.py @@ -395,8 +395,9 @@ def test_managementdb_tools_insert_kpi(management_db): # pylint: disable=redefin kpi_device_id = _create_kpi_request.device_id.device_uuid.uuid # pylint: disable=maybe-no-member kpi_endpoint_id = _create_kpi_request.endpoint_id.endpoint_uuid.uuid # pylint: disable=maybe-no-member kpi_service_id = _create_kpi_request.service_id.service_uuid.uuid # pylint: disable=maybe-no-member + kpi_connection_id = _create_kpi_request.connection_id.connection_id.uuid - response = management_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id) + response = management_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id,kpi_connection_id) assert isinstance(response, int) def test_managementdb_tools_get_kpi(management_db): # pylint: disable=redefined-outer-name @@ -407,8 +408,9 @@ def test_managementdb_tools_get_kpi(management_db): # pylint: disable=redefined- kpi_device_id = _create_kpi_request.device_id.device_uuid.uuid # pylint: disable=maybe-no-member kpi_endpoint_id = _create_kpi_request.endpoint_id.endpoint_uuid.uuid # pylint: disable=maybe-no-member kpi_service_id = _create_kpi_request.service_id.service_uuid.uuid # pylint: disable=maybe-no-member + kpi_connection_id = _create_kpi_request.connection_id.connection_id.uuid - _kpi_id = management_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id) + _kpi_id = management_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id,kpi_connection_id) response = management_db.get_KPI(_kpi_id) assert isinstance(response, tuple) @@ -426,9 +428,10 @@ def test_managementdb_tools_delete_kpi(management_db): # pylint: disable=redefin kpi_device_id = _create_kpi_request.device_id.device_uuid.uuid # pylint: disable=maybe-no-member kpi_endpoint_id = _create_kpi_request.endpoint_id.endpoint_uuid.uuid # pylint: disable=maybe-no-member kpi_service_id = _create_kpi_request.service_id.service_uuid.uuid # pylint: disable=maybe-no-member + kpi_connection_id = _create_kpi_request.connection_id.connection_id.uuid _kpi_id = management_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, - kpi_service_id) + kpi_service_id, kpi_connection_id) response = management_db.delete_KPI(_kpi_id) -- GitLab From b61629b3ef18b7832d128208da49fe7ce6b4fa96 Mon Sep 17 00:00:00 2001 From: fjmmuro Date: Thu, 3 Nov 2022 13:02:59 +0100 Subject: [PATCH 05/20] ConnectionID issues solved --- src/monitoring/service/ManagementDBTools.py | 6 +++--- .../service/MonitoringServiceServicerImpl.py | 10 ++++++---- src/monitoring/tests/Messages.py | 6 +++--- src/monitoring/tests/test_unitary.py | 6 +++--- 4 files changed, 15 insertions(+), 13 deletions(-) diff --git a/src/monitoring/service/ManagementDBTools.py b/src/monitoring/service/ManagementDBTools.py index 2533707a6..bf36d3517 100644 --- a/src/monitoring/service/ManagementDBTools.py +++ b/src/monitoring/service/ManagementDBTools.py @@ -85,13 +85,13 @@ class ManagementDB(): LOGGER.debug(f"Alarm table cannot be created in the ManagementDB. {e}") raise Exception - def insert_KPI(self,kpi_description,kpi_sample_type,device_id,endpoint_id,service_id, connection_id): + def insert_KPI(self,kpi_description,kpi_sample_type,device_id,endpoint_id,service_id,connection_id): try: c = self.client.cursor() - c.execute("SELECT kpi_id FROM kpi WHERE device_id is ? AND kpi_sample_type is ? AND endpoint_id is ? AND service_id is ?",(device_id,kpi_sample_type,endpoint_id,service_id, connection_id)) + c.execute("SELECT kpi_id FROM kpi WHERE device_id is ? AND kpi_sample_type is ? AND endpoint_id is ? AND service_id is ? AND connection_id is ?",(device_id,kpi_sample_type,endpoint_id,service_id, connection_id)) data=c.fetchone() if data is None: - c.execute("INSERT INTO kpi (kpi_description,kpi_sample_type,device_id,endpoint_id,service_id) VALUES (?,?,?,?,?)", (kpi_description,kpi_sample_type,device_id,endpoint_id,service_id, connection_id)) + c.execute("INSERT INTO kpi (kpi_description,kpi_sample_type,device_id,endpoint_id,service_id,connection_id) VALUES (?,?,?,?,?,?)", (kpi_description,kpi_sample_type,device_id,endpoint_id,service_id, connection_id)) self.client.commit() kpi_id = c.lastrowid LOGGER.debug(f"KPI {kpi_id} succesfully inserted in the ManagementDB") diff --git a/src/monitoring/service/MonitoringServiceServicerImpl.py b/src/monitoring/service/MonitoringServiceServicerImpl.py index 48dfeb823..f26819063 100644 --- a/src/monitoring/service/MonitoringServiceServicerImpl.py +++ b/src/monitoring/service/MonitoringServiceServicerImpl.py @@ -85,7 +85,9 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): kpi_device_id = request.device_id.device_uuid.uuid kpi_endpoint_id = request.endpoint_id.endpoint_uuid.uuid kpi_service_id = request.service_id.service_uuid.uuid - kpi_connection_id = request.connection_id.connection_id.uuid + kpi_connection_id = request.connection_id.connection_uuid.uuid + + LOGGER.debug(kpi_connection_id) if request.kpi_id.kpi_id.uuid is not "": response.kpi_id.uuid = request.kpi_id.kpi_id.uuid @@ -137,7 +139,7 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): kpiDescriptor.device_id.device_uuid.uuid = str(kpi_db[3]) kpiDescriptor.endpoint_id.endpoint_uuid.uuid = str(kpi_db[4]) kpiDescriptor.service_id.service_uuid.uuid = str(kpi_db[5]) - kpiDescriptor.connection_id.connection_id.uuid = str(kpi_db[6]) + kpiDescriptor.connection_id.connection_uuid.uuid = str(kpi_db[6]) return kpiDescriptor except ServiceException as e: LOGGER.exception('GetKpiDescriptor exception') @@ -162,7 +164,7 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): kpi_descriptor.device_id.device_uuid.uuid = str(item[3]) kpi_descriptor.endpoint_id.endpoint_uuid.uuid = str(item[4]) kpi_descriptor.service_id.service_uuid.uuid = str(item[5]) - kpi_descriptor.connection_id.connection_id.uuid = str(item[6]) + kpi_descriptor.connection_id.connection_uuid.uuid = str(item[6]) kpi_descriptor_list.kpi_descriptor_list.append(kpi_descriptor) @@ -189,7 +191,7 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): deviceId = kpiDescriptor.device_id.device_uuid.uuid endpointId = kpiDescriptor.endpoint_id.endpoint_uuid.uuid serviceId = kpiDescriptor.service_id.service_uuid.uuid - connectionId = kpiDescriptor.connection_id.connection_id.uuid + connectionId = kpiDescriptor.connection_id.connection_uuid.uuid time_stamp = request.timestamp.timestamp kpi_value = getattr(request.kpi_value, request.kpi_value.WhichOneof('value')) diff --git a/src/monitoring/tests/Messages.py b/src/monitoring/tests/Messages.py index 7c10ea44a..534810f4b 100644 --- a/src/monitoring/tests/Messages.py +++ b/src/monitoring/tests/Messages.py @@ -30,7 +30,7 @@ def create_kpi_request(): _create_kpi_request.device_id.device_uuid.uuid = 'DEV1' # pylint: disable=maybe-no-member _create_kpi_request.service_id.service_uuid.uuid = 'SERV1' # pylint: disable=maybe-no-member _create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END1' # pylint: disable=maybe-no-member - _create_kpi_request.connection_id.connection_id.uuid = 'CON1' # pylint: disable=maybe-no-member + _create_kpi_request.connection_id.connection_uuid.uuid = 'CON1' # pylint: disable=maybe-no-member return _create_kpi_request def create_kpi_request_b(): @@ -40,7 +40,7 @@ def create_kpi_request_b(): _create_kpi_request.device_id.device_uuid.uuid = 'DEV2' # pylint: disable=maybe-no-member _create_kpi_request.service_id.service_uuid.uuid = 'SERV2' # pylint: disable=maybe-no-member _create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END2' # pylint: disable=maybe-no-member - _create_kpi_request.connection_id.connection_id.uuid = 'CON2' # pylint: disable=maybe-no-member + _create_kpi_request.connection_id.connection_uuid.uuid = 'CON2' # pylint: disable=maybe-no-member return _create_kpi_request def create_kpi_request_c(): @@ -50,7 +50,7 @@ def create_kpi_request_c(): _create_kpi_request.device_id.device_uuid.uuid = 'DEV3' # pylint: disable=maybe-no-member _create_kpi_request.service_id.service_uuid.uuid = 'SERV3' # pylint: disable=maybe-no-member _create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END3' # pylint: disable=maybe-no-member - _create_kpi_request.connection_id.connection_id.uuid = 'CON3' # pylint: disable=maybe-no-member + _create_kpi_request.connection_id.connection_uuid.uuid = 'CON3' # pylint: disable=maybe-no-member return _create_kpi_request def monitor_kpi_request(kpi_uuid, monitoring_window_s, sampling_rate_s): diff --git a/src/monitoring/tests/test_unitary.py b/src/monitoring/tests/test_unitary.py index 290431af5..acbb10fd8 100644 --- a/src/monitoring/tests/test_unitary.py +++ b/src/monitoring/tests/test_unitary.py @@ -395,7 +395,7 @@ def test_managementdb_tools_insert_kpi(management_db): # pylint: disable=redefin kpi_device_id = _create_kpi_request.device_id.device_uuid.uuid # pylint: disable=maybe-no-member kpi_endpoint_id = _create_kpi_request.endpoint_id.endpoint_uuid.uuid # pylint: disable=maybe-no-member kpi_service_id = _create_kpi_request.service_id.service_uuid.uuid # pylint: disable=maybe-no-member - kpi_connection_id = _create_kpi_request.connection_id.connection_id.uuid + kpi_connection_id = _create_kpi_request.connection_id.connection_uuid.uuid response = management_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id,kpi_connection_id) assert isinstance(response, int) @@ -408,7 +408,7 @@ def test_managementdb_tools_get_kpi(management_db): # pylint: disable=redefined- kpi_device_id = _create_kpi_request.device_id.device_uuid.uuid # pylint: disable=maybe-no-member kpi_endpoint_id = _create_kpi_request.endpoint_id.endpoint_uuid.uuid # pylint: disable=maybe-no-member kpi_service_id = _create_kpi_request.service_id.service_uuid.uuid # pylint: disable=maybe-no-member - kpi_connection_id = _create_kpi_request.connection_id.connection_id.uuid + kpi_connection_id = _create_kpi_request.connection_id.connection_uuid.uuid _kpi_id = management_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id,kpi_connection_id) response = management_db.get_KPI(_kpi_id) @@ -428,7 +428,7 @@ def test_managementdb_tools_delete_kpi(management_db): # pylint: disable=redefin kpi_device_id = _create_kpi_request.device_id.device_uuid.uuid # pylint: disable=maybe-no-member kpi_endpoint_id = _create_kpi_request.endpoint_id.endpoint_uuid.uuid # pylint: disable=maybe-no-member kpi_service_id = _create_kpi_request.service_id.service_uuid.uuid # pylint: disable=maybe-no-member - kpi_connection_id = _create_kpi_request.connection_id.connection_id.uuid + kpi_connection_id = _create_kpi_request.connection_id.connection_uuid.uuid _kpi_id = management_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id, kpi_connection_id) -- GitLab From e0bd3455c4d14db75cad8cdb5cf61c18d25f8e53 Mon Sep 17 00:00:00 2001 From: fjmmuro Date: Thu, 3 Nov 2022 13:43:44 +0100 Subject: [PATCH 06/20] Added sliceID field to the DBs --- src/monitoring/service/ManagementDBTools.py | 7 ++-- src/monitoring/service/MetricsDBTools.py | 4 ++- .../service/MonitoringServiceServicerImpl.py | 35 ++++++++++--------- src/monitoring/tests/Messages.py | 3 ++ src/monitoring/tests/test_unitary.py | 9 +++-- 5 files changed, 35 insertions(+), 23 deletions(-) diff --git a/src/monitoring/service/ManagementDBTools.py b/src/monitoring/service/ManagementDBTools.py index bf36d3517..ae58ffe85 100644 --- a/src/monitoring/service/ManagementDBTools.py +++ b/src/monitoring/service/ManagementDBTools.py @@ -39,6 +39,7 @@ class ManagementDB(): device_id INTEGER, endpoint_id INTEGER, service_id INTEGER, + slice_id INTEGER, connection_id INTEGER ); """) @@ -85,13 +86,13 @@ class ManagementDB(): LOGGER.debug(f"Alarm table cannot be created in the ManagementDB. {e}") raise Exception - def insert_KPI(self,kpi_description,kpi_sample_type,device_id,endpoint_id,service_id,connection_id): + def insert_KPI(self,kpi_description,kpi_sample_type,device_id,endpoint_id,service_id,slice_id,connection_id): try: c = self.client.cursor() - c.execute("SELECT kpi_id FROM kpi WHERE device_id is ? AND kpi_sample_type is ? AND endpoint_id is ? AND service_id is ? AND connection_id is ?",(device_id,kpi_sample_type,endpoint_id,service_id, connection_id)) + c.execute("SELECT kpi_id FROM kpi WHERE device_id is ? AND kpi_sample_type is ? AND endpoint_id is ? AND service_id is ? AND slice_id is ? AND connection_id is ?",(device_id,kpi_sample_type,endpoint_id,service_id,slice_id,connection_id)) data=c.fetchone() if data is None: - c.execute("INSERT INTO kpi (kpi_description,kpi_sample_type,device_id,endpoint_id,service_id,connection_id) VALUES (?,?,?,?,?,?)", (kpi_description,kpi_sample_type,device_id,endpoint_id,service_id, connection_id)) + c.execute("INSERT INTO kpi (kpi_description,kpi_sample_type,device_id,endpoint_id,service_id,slice_id,connection_id) VALUES (?,?,?,?,?,?,?)", (kpi_description,kpi_sample_type,device_id,endpoint_id,service_id,slice_id,connection_id)) self.client.commit() kpi_id = c.lastrowid LOGGER.debug(f"KPI {kpi_id} succesfully inserted in the ManagementDB") diff --git a/src/monitoring/service/MetricsDBTools.py b/src/monitoring/service/MetricsDBTools.py index f5dd3ad2a..76ffc7815 100644 --- a/src/monitoring/service/MetricsDBTools.py +++ b/src/monitoring/service/MetricsDBTools.py @@ -87,6 +87,7 @@ class MetricsDB(): 'device_id SYMBOL,' \ 'endpoint_id SYMBOL,' \ 'service_id SYMBOL,' \ + 'slice_id SYMBOL,' \ 'connection_id SYMBOL,' \ 'timestamp TIMESTAMP,' \ 'kpi_value DOUBLE)' \ @@ -98,7 +99,7 @@ class MetricsDB(): LOGGER.debug(f"Table {self.table} cannot be created. {e}") raise Exception - def write_KPI(self, time, kpi_id, kpi_sample_type, device_id, endpoint_id, service_id,connection_id, kpi_value): + def write_KPI(self, time, kpi_id, kpi_sample_type, device_id, endpoint_id, service_id, slice_id, connection_id, kpi_value): counter = 0 while (counter < self.retries): try: @@ -111,6 +112,7 @@ class MetricsDB(): 'device_id': device_id, 'endpoint_id': endpoint_id, 'service_id': service_id, + 'slice_id': slice_id, 'connection_id': connection_id,}, columns={ 'kpi_value': kpi_value}, diff --git a/src/monitoring/service/MonitoringServiceServicerImpl.py b/src/monitoring/service/MonitoringServiceServicerImpl.py index f26819063..757fee8ae 100644 --- a/src/monitoring/service/MonitoringServiceServicerImpl.py +++ b/src/monitoring/service/MonitoringServiceServicerImpl.py @@ -85,16 +85,16 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): kpi_device_id = request.device_id.device_uuid.uuid kpi_endpoint_id = request.endpoint_id.endpoint_uuid.uuid kpi_service_id = request.service_id.service_uuid.uuid + kpi_slice_id = request.slice_id.slice_uuid.uuid kpi_connection_id = request.connection_id.connection_uuid.uuid - LOGGER.debug(kpi_connection_id) if request.kpi_id.kpi_id.uuid is not "": response.kpi_id.uuid = request.kpi_id.kpi_id.uuid # Here the code to modify an existing kpi else: data = self.management_db.insert_KPI( - kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id, kpi_connection_id) + kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id, kpi_slice_id, kpi_connection_id) response.kpi_id.uuid = str(data) return response @@ -134,12 +134,13 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): if kpi_db is None: LOGGER.info('GetKpiDescriptor error: KpiID({:s}): not found in database'.format(str(kpi_id))) else: - kpiDescriptor.kpi_description = kpi_db[1] - kpiDescriptor.kpi_sample_type = kpi_db[2] - kpiDescriptor.device_id.device_uuid.uuid = str(kpi_db[3]) - kpiDescriptor.endpoint_id.endpoint_uuid.uuid = str(kpi_db[4]) - kpiDescriptor.service_id.service_uuid.uuid = str(kpi_db[5]) - kpiDescriptor.connection_id.connection_uuid.uuid = str(kpi_db[6]) + kpiDescriptor.kpi_description = kpi_db[1] + kpiDescriptor.kpi_sample_type = kpi_db[2] + kpiDescriptor.device_id.device_uuid.uuid = str(kpi_db[3]) + kpiDescriptor.endpoint_id.endpoint_uuid.uuid = str(kpi_db[4]) + kpiDescriptor.service_id.service_uuid.uuid = str(kpi_db[5]) + kpiDescriptor.slice_id.slice_uuid.uuid = str(kpi_db[6]) + kpiDescriptor.connection_id.connection_uuid.uuid = str(kpi_db[7]) return kpiDescriptor except ServiceException as e: LOGGER.exception('GetKpiDescriptor exception') @@ -158,13 +159,14 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): for item in data: kpi_descriptor = KpiDescriptor() - kpi_descriptor.kpi_id.kpi_id.uuid = str(item[0]) - kpi_descriptor.kpi_description = item[1] - kpi_descriptor.kpi_sample_type = item[2] - kpi_descriptor.device_id.device_uuid.uuid = str(item[3]) - kpi_descriptor.endpoint_id.endpoint_uuid.uuid = str(item[4]) - kpi_descriptor.service_id.service_uuid.uuid = str(item[5]) - kpi_descriptor.connection_id.connection_uuid.uuid = str(item[6]) + kpi_descriptor.kpi_id.kpi_id.uuid = str(item[0]) + kpi_descriptor.kpi_description = item[1] + kpi_descriptor.kpi_sample_type = item[2] + kpi_descriptor.device_id.device_uuid.uuid = str(item[3]) + kpi_descriptor.endpoint_id.endpoint_uuid.uuid = str(item[4]) + kpi_descriptor.service_id.service_uuid.uuid = str(item[5]) + kpi_descriptor.slice_id.slice_uuid.uuid = str(item[6]) + kpi_descriptor.connection_id.connection_uuid.uuid = str(item[7]) kpi_descriptor_list.kpi_descriptor_list.append(kpi_descriptor) @@ -191,12 +193,13 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): deviceId = kpiDescriptor.device_id.device_uuid.uuid endpointId = kpiDescriptor.endpoint_id.endpoint_uuid.uuid serviceId = kpiDescriptor.service_id.service_uuid.uuid + sliceId = kpiDescriptor.slice_id.slice_uuid.uuid connectionId = kpiDescriptor.connection_id.connection_uuid.uuid time_stamp = request.timestamp.timestamp kpi_value = getattr(request.kpi_value, request.kpi_value.WhichOneof('value')) # Build the structure to be included as point in the MetricsDB - self.metrics_db.write_KPI(time_stamp, kpiId, kpiSampleType, deviceId, endpointId, serviceId, connectionId, kpi_value) + self.metrics_db.write_KPI(time_stamp, kpiId, kpiSampleType, deviceId, endpointId, serviceId, sliceId, connectionId, kpi_value) return Empty() except ServiceException as e: diff --git a/src/monitoring/tests/Messages.py b/src/monitoring/tests/Messages.py index 534810f4b..92e388ae8 100644 --- a/src/monitoring/tests/Messages.py +++ b/src/monitoring/tests/Messages.py @@ -29,6 +29,7 @@ def create_kpi_request(): _create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_TRANSMITTED _create_kpi_request.device_id.device_uuid.uuid = 'DEV1' # pylint: disable=maybe-no-member _create_kpi_request.service_id.service_uuid.uuid = 'SERV1' # pylint: disable=maybe-no-member + _create_kpi_request.slice_id.slice_uuid.uuid = 'SLC1' # pylint: disable=maybe-no-member _create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END1' # pylint: disable=maybe-no-member _create_kpi_request.connection_id.connection_uuid.uuid = 'CON1' # pylint: disable=maybe-no-member return _create_kpi_request @@ -39,6 +40,7 @@ def create_kpi_request_b(): _create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED _create_kpi_request.device_id.device_uuid.uuid = 'DEV2' # pylint: disable=maybe-no-member _create_kpi_request.service_id.service_uuid.uuid = 'SERV2' # pylint: disable=maybe-no-member + _create_kpi_request.slice_id.slice_uuid.uuid = 'SLC2' # pylint: disable=maybe-no-member _create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END2' # pylint: disable=maybe-no-member _create_kpi_request.connection_id.connection_uuid.uuid = 'CON2' # pylint: disable=maybe-no-member return _create_kpi_request @@ -49,6 +51,7 @@ def create_kpi_request_c(): _create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED _create_kpi_request.device_id.device_uuid.uuid = 'DEV3' # pylint: disable=maybe-no-member _create_kpi_request.service_id.service_uuid.uuid = 'SERV3' # pylint: disable=maybe-no-member + _create_kpi_request.slice_id.slice_uuid.uuid = 'SLC3' # pylint: disable=maybe-no-member _create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END3' # pylint: disable=maybe-no-member _create_kpi_request.connection_id.connection_uuid.uuid = 'CON3' # pylint: disable=maybe-no-member return _create_kpi_request diff --git a/src/monitoring/tests/test_unitary.py b/src/monitoring/tests/test_unitary.py index acbb10fd8..ba41511d2 100644 --- a/src/monitoring/tests/test_unitary.py +++ b/src/monitoring/tests/test_unitary.py @@ -395,9 +395,10 @@ def test_managementdb_tools_insert_kpi(management_db): # pylint: disable=redefin kpi_device_id = _create_kpi_request.device_id.device_uuid.uuid # pylint: disable=maybe-no-member kpi_endpoint_id = _create_kpi_request.endpoint_id.endpoint_uuid.uuid # pylint: disable=maybe-no-member kpi_service_id = _create_kpi_request.service_id.service_uuid.uuid # pylint: disable=maybe-no-member + kpi_slice_id = _create_kpi_request.slice_id.slice_uuid.uuid kpi_connection_id = _create_kpi_request.connection_id.connection_uuid.uuid - response = management_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id,kpi_connection_id) + response = management_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id,kpi_slice_id,kpi_connection_id) assert isinstance(response, int) def test_managementdb_tools_get_kpi(management_db): # pylint: disable=redefined-outer-name @@ -408,9 +409,10 @@ def test_managementdb_tools_get_kpi(management_db): # pylint: disable=redefined- kpi_device_id = _create_kpi_request.device_id.device_uuid.uuid # pylint: disable=maybe-no-member kpi_endpoint_id = _create_kpi_request.endpoint_id.endpoint_uuid.uuid # pylint: disable=maybe-no-member kpi_service_id = _create_kpi_request.service_id.service_uuid.uuid # pylint: disable=maybe-no-member + kpi_slice_id = _create_kpi_request.slice_id.slice_uuid.uuid kpi_connection_id = _create_kpi_request.connection_id.connection_uuid.uuid - _kpi_id = management_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id,kpi_connection_id) + _kpi_id = management_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id,kpi_slice_id,kpi_connection_id) response = management_db.get_KPI(_kpi_id) assert isinstance(response, tuple) @@ -428,10 +430,11 @@ def test_managementdb_tools_delete_kpi(management_db): # pylint: disable=redefin kpi_device_id = _create_kpi_request.device_id.device_uuid.uuid # pylint: disable=maybe-no-member kpi_endpoint_id = _create_kpi_request.endpoint_id.endpoint_uuid.uuid # pylint: disable=maybe-no-member kpi_service_id = _create_kpi_request.service_id.service_uuid.uuid # pylint: disable=maybe-no-member + kpi_slice_id = _create_kpi_request.slice_id.slice_uuid.uuid kpi_connection_id = _create_kpi_request.connection_id.connection_uuid.uuid _kpi_id = management_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, - kpi_service_id, kpi_connection_id) + kpi_service_id, kpi_slice_id, kpi_connection_id) response = management_db.delete_KPI(_kpi_id) -- GitLab From 9a69ab50f115d2c127b804e394a6f3b9e129c635 Mon Sep 17 00:00:00 2001 From: fjmmuro Date: Mon, 7 Nov 2022 12:51:05 +0100 Subject: [PATCH 07/20] Improving unit tests and solving issues with Subscription and Alarm Managers (not finished) --- .../service/MonitoringServiceServicerImpl.py | 30 +-- src/monitoring/tests/Messages.py | 22 ++- src/monitoring/tests/test_unitary.py | 180 ++++++++++++++---- 3 files changed, 173 insertions(+), 59 deletions(-) diff --git a/src/monitoring/service/MonitoringServiceServicerImpl.py b/src/monitoring/service/MonitoringServiceServicerImpl.py index 757fee8ae..927eedf51 100644 --- a/src/monitoring/service/MonitoringServiceServicerImpl.py +++ b/src/monitoring/service/MonitoringServiceServicerImpl.py @@ -28,7 +28,7 @@ from common.proto.monitoring_pb2 import AlarmResponse, AlarmDescriptor, AlarmLis KpiDescriptor, KpiList, KpiQuery, SubsDescriptor, SubscriptionID, AlarmID, KpiDescriptorList, \ MonitorKpiRequest, Kpi, AlarmSubscription, SubsResponse from common.rpc_method_wrapper.ServiceExceptions import ServiceException -from common.tools.timestamp.Converters import timestamp_string_to_float +from common.tools.timestamp.Converters import timestamp_string_to_float, timestamp_utcnow_to_float from monitoring.service import ManagementDBTools, MetricsDBTools from device.client.DeviceClient import DeviceClient @@ -277,14 +277,17 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): start_timestamp, end_timestamp) # parse queue to append kpis into the list - while not subs_queue.empty(): - list = subs_queue.get_nowait() - for item in list: - kpi = Kpi() - kpi.kpi_id.kpi_id.uuid = str(item[0]) - kpi.timestamp.timestamp = timestamp_string_to_float(item[1]) - kpi.kpi_value.floatVal = item[2] # This must be improved - subs_response.kpi_list.kpi.append(kpi) + while True: + while not subs_queue.empty(): + list = subs_queue.get_nowait() + for item in list: + kpi = Kpi() + kpi.kpi_id.kpi_id.uuid = str(item[0]) + kpi.timestamp.timestamp = timestamp_string_to_float(item[1]) + kpi.kpi_value.floatVal = item[2] # This must be improved + subs_response.kpi_list.kpi.append(kpi) + if timestamp_utcnow_to_float() > end_timestamp: + break subs_response.subs_id.subs_id.uuid = str(subs_id) @@ -433,6 +436,7 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): LOGGER.info('GetAlarmDescriptor') try: alarm_id = request.alarm_id.uuid + LOGGER.debug(alarm_id) alarm = self.management_db.get_alarm(alarm_id) response = AlarmDescriptor() @@ -463,15 +467,13 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): LOGGER.info('GetAlarmResponseStream') try: alarm_id = request.alarm_id.alarm_id.uuid - alarm = self.management_db.get_alarm(alarm_id) + alarm_data = self.management_db.get_alarm(alarm_id) alarm_response = AlarmResponse() - if alarm: - + if alarm_data: + LOGGER.debug(f"{alarm_data}") alarm_queue = Queue() - alarm_data = self.management_db.get_alarm(alarm) - alarm_id = request.alarm_id.alarm_id.uuid kpi_id = alarm_data[3] kpiMinValue = alarm_data[4] diff --git a/src/monitoring/tests/Messages.py b/src/monitoring/tests/Messages.py index 92e388ae8..23e0867c1 100644 --- a/src/monitoring/tests/Messages.py +++ b/src/monitoring/tests/Messages.py @@ -83,12 +83,18 @@ def kpi_query(): def subs_descriptor(kpi_id): _subs_descriptor = monitoring_pb2.SubsDescriptor() + sampling_duration_s = 20 + sampling_interval_s = 3 + real_start_time = timestamp_utcnow_to_float() + start_timestamp = real_start_time + end_timestamp = start_timestamp + sampling_duration_s + _subs_descriptor.subs_id.subs_id.uuid = "" _subs_descriptor.kpi_id.kpi_id.uuid = kpi_id.kpi_id.uuid - _subs_descriptor.sampling_duration_s = 10 - _subs_descriptor.sampling_interval_s = 2 - _subs_descriptor.start_timestamp.timestamp = timestamp_utcnow_to_float() - _subs_descriptor.end_timestamp.timestamp = timestamp_utcnow_to_float() + 10 + _subs_descriptor.sampling_duration_s = sampling_duration_s + _subs_descriptor.sampling_interval_s = sampling_interval_s + _subs_descriptor.start_timestamp.timestamp = start_timestamp + _subs_descriptor.end_timestamp.timestamp = end_timestamp return _subs_descriptor @@ -97,14 +103,14 @@ def subs_id(): return _subs_id -def alarm_descriptor(): +def alarm_descriptor(kpi_id): _alarm_descriptor = monitoring_pb2.AlarmDescriptor() _alarm_descriptor.alarm_description = "Alarm Description" _alarm_descriptor.name = "Alarm Name" - _alarm_descriptor.kpi_id.kpi_id.uuid = "1" + _alarm_descriptor.kpi_id.kpi_id.uuid = kpi_id.kpi_id.uuid _alarm_descriptor.kpi_value_range.kpiMinValue.floatVal = 0.0 - _alarm_descriptor.kpi_value_range.kpiMaxValue.floatVal = 50.0 + _alarm_descriptor.kpi_value_range.kpiMaxValue.floatVal = 250.0 _alarm_descriptor.kpi_value_range.inRange = True _alarm_descriptor.kpi_value_range.includeMinValue = False _alarm_descriptor.kpi_value_range.includeMaxValue = True @@ -121,7 +127,7 @@ def alarm_descriptor_b(): def alarm_subscription(alarm_id): _alarm_descriptor = monitoring_pb2.AlarmSubscription() - _alarm_descriptor.alarm_id.alarm_id.uuid = str(alarm_id) + _alarm_descriptor.alarm_id.alarm_id.uuid = str(alarm_id.alarm_id.uuid) return _alarm_descriptor diff --git a/src/monitoring/tests/test_unitary.py b/src/monitoring/tests/test_unitary.py index ba41511d2..33e9d3224 100644 --- a/src/monitoring/tests/test_unitary.py +++ b/src/monitoring/tests/test_unitary.py @@ -15,6 +15,8 @@ import copy, os, pytest import threading import time +from queue import Queue +from random import random from time import sleep from typing import Tuple @@ -34,6 +36,7 @@ from common.proto import monitoring_pb2 from common.proto.kpi_sample_types_pb2 import KpiSampleType from common.proto.monitoring_pb2 import KpiId, KpiDescriptor, KpiList, SubsDescriptor, SubsList, AlarmID, \ AlarmDescriptor, AlarmList, Kpi, KpiDescriptorList, SubsResponse, AlarmResponse +from common.tools.timestamp.Converters import timestamp_utcnow_to_float, timestamp_string_to_float from context.client.ContextClient import ContextClient from context.service.grpc_server.ContextService import ContextService @@ -43,6 +46,9 @@ from device.client.DeviceClient import DeviceClient from device.service.DeviceService import DeviceService from device.service.driver_api.DriverFactory import DriverFactory from device.service.driver_api.DriverInstanceCache import DriverInstanceCache +from monitoring.service.AlarmManager import AlarmManager +from monitoring.service.MetricsDBTools import MetricsDB +from monitoring.service.SubscriptionManager import SubscriptionManager os.environ['DEVICE_EMULATED_ONLY'] = 'TRUE' from device.service.drivers import DRIVERS # pylint: disable=wrong-import-position @@ -175,14 +181,23 @@ def subs_scheduler(): return _scheduler -def ingestion_data(monitoring_client): - _kpi_id = monitoring_client.SetKpi(create_kpi_request_c()) - _include_kpi_request = include_kpi_request(_kpi_id) +def ingestion_data(): + metrics_db = MetricsDB("localhost", "9009", "9000", "monitoring") for i in range(200): - _include_kpi_request = include_kpi_request(_kpi_id) - monitoring_client.IncludeKpi(_include_kpi_request) - time.sleep(0.01) + kpiSampleType = KpiSampleType.Name(KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED).upper().replace('KPISAMPLETYPE_', '') + kpiId = "3" + deviceId = 'DEV3' + endpointId = 'END3' + serviceId = 'SERV3' + sliceId = 'SLC3' + connectionId = 'CON3' + time_stamp = timestamp_utcnow_to_float() + kpi_value = 500*random() + + metrics_db.write_KPI(time_stamp, kpiId, kpiSampleType, deviceId, endpointId, serviceId, sliceId, connectionId, + kpi_value) + sleep(0.05) ########################### # Tests Implementation @@ -274,28 +289,13 @@ def test_query_kpi_data(monitoring_client): # pylint: disable=redefined-outer-na LOGGER.debug(str(response)) assert isinstance(response, KpiList) -def test_ingestion_data(monitoring_client): - _kpi_id = monitoring_client.SetKpi(create_kpi_request_c()) - _include_kpi_request = include_kpi_request(_kpi_id) - - for i in range(100): - _include_kpi_request = include_kpi_request(_kpi_id) - monitoring_client.IncludeKpi(_include_kpi_request) - time.sleep(0.01) - -# def test_subscription_scheduler(monitoring_client,metrics_db,subs_scheduler): -# subs_scheduler.add_job(ingestion_data(monitoring_client),id="1") - # Test case that makes use of client fixture to test server's SetKpiSubscription method -def test_set_kpi_subscription(monitoring_client,metrics_db): # pylint: disable=redefined-outer-name +def test_set_kpi_subscription(monitoring_client,metrics_db,subs_scheduler): # pylint: disable=redefined-outer-name LOGGER.warning('test_set_kpi_subscription') kpi_id = monitoring_client.SetKpi(create_kpi_request_c()) - # thread = threading.Thread(target=test_ingestion_data, args=(monitoring_client,metrics_db)) - # thread.start() - monitoring_client.IncludeKpi(include_kpi_request(kpi_id)) + subs_scheduler.add_job(ingestion_data) response = monitoring_client.SetKpiSubscription(subs_descriptor(kpi_id)) assert isinstance(response, _MultiThreadedRendezvous) - LOGGER.debug(response) for item in response: LOGGER.debug(item) assert isinstance(item, SubsResponse) @@ -331,7 +331,8 @@ def test_delete_subscription(monitoring_client): # Test case that makes use of client fixture to test server's SetKpiAlarm method def test_set_kpi_alarm(monitoring_client): LOGGER.warning('test_set_kpi_alarm') - response = monitoring_client.SetKpiAlarm(alarm_descriptor()) + kpi_id = monitoring_client.SetKpi(create_kpi_request_c()) + response = monitoring_client.SetKpiAlarm(alarm_descriptor(kpi_id)) LOGGER.debug(str(response)) assert isinstance(response, AlarmID) @@ -345,28 +346,31 @@ def test_get_alarms(monitoring_client): # Test case that makes use of client fixture to test server's GetAlarmDescriptor method def test_get_alarm_descriptor(monitoring_client): LOGGER.warning('test_get_alarm_descriptor') - alarm_id = monitoring_client.SetKpiAlarm(alarm_descriptor()) - response = monitoring_client.GetAlarmDescriptor(alarm_id) - LOGGER.debug(response) - assert isinstance(response, AlarmDescriptor) + _kpi_id = monitoring_client.SetKpi(create_kpi_request_c()) + _alarm_id = monitoring_client.SetKpiAlarm(alarm_descriptor(_kpi_id)) + _response = monitoring_client.GetAlarmDescriptor(_alarm_id) + LOGGER.debug(_response) + assert isinstance(_response, AlarmDescriptor) # Test case that makes use of client fixture to test server's GetAlarmResponseStream method def test_get_alarm_response_stream(monitoring_client): LOGGER.warning('test_get_alarm_descriptor') - alarm_id = monitoring_client.SetKpiAlarm(alarm_descriptor()) - response = monitoring_client.GetAlarmResponseStream(alarm_subscription(alarm_id)) - assert isinstance(response, _MultiThreadedRendezvous) - for item in response: - LOGGER.debug(response) + _kpi_id = monitoring_client.SetKpi(create_kpi_request_c()) + _alarm_id = monitoring_client.SetKpiAlarm(alarm_descriptor(_kpi_id)) + _response = monitoring_client.GetAlarmResponseStream(alarm_subscription(_alarm_id)) + assert isinstance(_response, _MultiThreadedRendezvous) + for item in _response: + LOGGER.debug(_response) assert isinstance(item,AlarmResponse) # Test case that makes use of client fixture to test server's DeleteAlarm method def test_delete_alarm(monitoring_client): LOGGER.warning('test_delete_alarm') - alarm_id = monitoring_client.SetKpiAlarm(alarm_descriptor()) - response = monitoring_client.DeleteAlarm(alarm_id) - LOGGER.debug(type(response)) - assert isinstance(response, Empty) + _kpi_id = monitoring_client.SetKpi(create_kpi_request_c()) + _alarm_id = monitoring_client.SetKpiAlarm(alarm_descriptor(_kpi_id)) + _response = monitoring_client.DeleteAlarm(_alarm_id) + LOGGER.debug(type(_response)) + assert isinstance(_response, Empty) # Test case that makes use of client fixture to test server's GetStreamKpi method def test_get_stream_kpi(monitoring_client): # pylint: disable=redefined-outer-name @@ -440,14 +444,116 @@ def test_managementdb_tools_delete_kpi(management_db): # pylint: disable=redefin assert response +def test_managementdb_tools_insert_alarm(management_db): + LOGGER.warning('test_managementdb_tools_insert_alarm begin') + + _alarm_description = "Alarm Description" + _alarm_name = "Alarm Name" + _kpi_id = "3" + _kpi_min_value = 0.0 + _kpi_max_value = 250.0 + _in_range = True + _include_min_value = False + _include_max_value = True + + _alarm_id = management_db.insert_alarm(_alarm_description, _alarm_name, _kpi_id, _kpi_min_value, + _kpi_max_value, + _in_range, _include_min_value, _include_max_value) + LOGGER.debug(_alarm_id) + assert isinstance(_alarm_id,int) + def test_metrics_db_tools_write_kpi(metrics_db): # pylint: disable=redefined-outer-name LOGGER.warning('test_metric_sdb_tools_write_kpi begin') + for i in range(200): + kpiSampleType = KpiSampleType.Name(KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED).upper().replace('KPISAMPLETYPE_', '') + kpiId = "3" + deviceId = 'DEV3' + endpointId = 'END3' + serviceId = 'SERV3' + sliceId = 'SLC3' + connectionId = 'CON3' + time_stamp = timestamp_utcnow_to_float() + kpi_value = 500*random() + + metrics_db.write_KPI(time_stamp, kpiId, kpiSampleType, deviceId, endpointId, serviceId, sliceId, connectionId, + kpi_value) + sleep(0.05) def test_metrics_db_tools_read_kpi_points(metrics_db): # pylint: disable=redefined-outer-name LOGGER.warning('test_metrics_db_tools_read_kpi_points begin') +def test_subscription_manager_create_subscription(management_db,metrics_db,subs_scheduler): + LOGGER.warning('test_subscription_manager_create_subscription begin') + subs_queue = Queue() + + subs_manager = SubscriptionManager(metrics_db) + + subs_scheduler.add_job(ingestion_data) + + kpi_id = "3" + sampling_duration_s = 20 + sampling_interval_s = 3 + real_start_time = timestamp_utcnow_to_float() + start_timestamp = real_start_time + end_timestamp = start_timestamp + sampling_duration_s + + subs_id = management_db.insert_subscription(kpi_id, "localhost", sampling_duration_s, + sampling_interval_s,start_timestamp,end_timestamp) + subs_manager.create_subscription(subs_queue,subs_id,kpi_id,sampling_interval_s, + sampling_duration_s,start_timestamp,end_timestamp) + + # This is here to simulate application activity (which keeps the main thread alive). + total_points = 0 + while True: + while not subs_queue.empty(): + list = subs_queue.get_nowait() + kpi_list = KpiList() + for item in list: + kpi = Kpi() + kpi.kpi_id.kpi_id.uuid = item[0] + kpi.timestamp.timestamp = timestamp_string_to_float(item[1]) + kpi.kpi_value.floatVal = item[2] + kpi_list.kpi.append(kpi) + total_points += 1 + LOGGER.debug(kpi_list) + if timestamp_utcnow_to_float() > end_timestamp: + break + + assert total_points != 0 + +def test_alarm_manager_create_alarm(management_db,metrics_db): + LOGGER.warning('test_alarm_manager_create_alarm begin') + + _alarm_description = "Alarm Description" + _alarm_name = "Alarm Name" + _kpi_id = "3" + _kpi_min_value = 0.0 + _kpi_max_value = 250.0 + _in_range = True + _include_min_value = False + _include_max_value = True + _subscription_frequency_ms = 10 + + _alarm_id = management_db.insert_alarm(_alarm_description, _alarm_name, _kpi_id, _kpi_min_value, + _kpi_max_value, + _in_range, _include_min_value, _include_max_value) + + LOGGER.debug(f"alarm_id: {_alarm_id}") + _alarm_queue = Queue() + _alarm_manager = AlarmManager(metrics_db) + + _alarm_manager.create_alarm(_alarm_queue,str(_alarm_id),_kpi_id,_kpi_min_value,_kpi_max_value,_in_range,_include_min_value,_include_max_value,_subscription_frequency_ms) + + LOGGER.debug(_alarm_queue) + + while not _alarm_queue.empty(): + list = _alarm_queue.get_nowait() + LOGGER.debug(list) + for item in list: + LOGGER.debug(item) + def test_events_tools( context_client : ContextClient, # pylint: disable=redefined-outer-name -- GitLab From d3b14a1dc674387a750f1a6bbe1048199fcf5a5c Mon Sep 17 00:00:00 2001 From: fjmmuro Date: Mon, 7 Nov 2022 15:15:13 +0100 Subject: [PATCH 08/20] Issues with subscription and alarm managers solved --- src/monitoring/service/AlarmManager.py | 12 ++++-- src/monitoring/service/MetricsDBTools.py | 12 +++--- .../service/MonitoringServiceServicerImpl.py | 42 ++++++++++--------- src/monitoring/service/SubscriptionManager.py | 7 +--- src/monitoring/tests/Messages.py | 11 +++-- src/monitoring/tests/test_unitary.py | 7 ++-- 6 files changed, 53 insertions(+), 38 deletions(-) diff --git a/src/monitoring/service/AlarmManager.py b/src/monitoring/service/AlarmManager.py index e5ac8915c..27b169b16 100644 --- a/src/monitoring/service/AlarmManager.py +++ b/src/monitoring/service/AlarmManager.py @@ -1,3 +1,4 @@ +import pytz from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.executors.pool import ProcessPoolExecutor from apscheduler.jobstores.base import JobLookupError @@ -19,9 +20,14 @@ class AlarmManager(): end_date=None if subscription_timeout_s: start_timestamp=time.time() - start_date=datetime.fromtimestamp(start_timestamp) - end_date=datetime.fromtimestamp(start_timestamp+subscription_timeout_s) - self.scheduler.add_job(self.metrics_db.get_alarm_data, args=(alarm_queue,kpi_id, kpiMinValue, kpiMaxValue, inRange, includeMinValue, includeMaxValue, subscription_frequency_ms),trigger='interval', seconds=(subscription_frequency_ms/1000), start_date=start_date, end_date=end_date, id=alarm_id) + end_timestamp = start_timestamp + subscription_timeout_s + start_date = datetime.utcfromtimestamp(start_timestamp).isoformat() + end_date = datetime.utcfromtimestamp(end_timestamp).isoformat() + + self.scheduler.add_job(self.metrics_db.get_alarm_data, + args=(alarm_queue,kpi_id, kpiMinValue, kpiMaxValue, inRange, includeMinValue, includeMaxValue, subscription_frequency_ms), + trigger='interval', seconds=(subscription_frequency_ms/1000), start_date=start_date, + end_date=end_date,timezone=pytz.utc, id=str(alarm_id)) LOGGER.debug(f"Alarm job {alarm_id} succesfully created") def delete_alarm(self, alarm_id): diff --git a/src/monitoring/service/MetricsDBTools.py b/src/monitoring/service/MetricsDBTools.py index 76ffc7815..0f41cfee1 100644 --- a/src/monitoring/service/MetricsDBTools.py +++ b/src/monitoring/service/MetricsDBTools.py @@ -205,6 +205,8 @@ class MetricsDB(): kpi_list = self.run_query(query) if kpi_list: LOGGER.debug(f"New data received for alarm of KPI {kpi_id}") + LOGGER.info(kpi_list) + valid_kpi_list = [] for kpi in kpi_list: alarm = False kpi_value = kpi[2] @@ -267,10 +269,10 @@ class MetricsDB(): if (kpi_value >= kpiMaxValue): alarm = True if alarm: - # queue.append[kpi] - alarm_queue.put_nowait(kpi) - LOGGER.debug(f"Alarm of KPI {kpi_id} triggered -> kpi_value:{kpi[2]}, timestamp:{kpi[1]}") - else: - LOGGER.debug(f"No new data for the alarm of KPI {kpi_id}") + valid_kpi_list.append(kpi) + alarm_queue.put_nowait(valid_kpi_list) + LOGGER.debug(f"Alarm of KPI {kpi_id} triggered -> kpi_value:{kpi[2]}, timestamp:{kpi[1]}") + else: + LOGGER.debug(f"No new data for the alarm of KPI {kpi_id}") except (Exception) as e: LOGGER.debug(f"Alarm data cannot be retrieved. {e}") \ No newline at end of file diff --git a/src/monitoring/service/MonitoringServiceServicerImpl.py b/src/monitoring/service/MonitoringServiceServicerImpl.py index 927eedf51..9f007ae86 100644 --- a/src/monitoring/service/MonitoringServiceServicerImpl.py +++ b/src/monitoring/service/MonitoringServiceServicerImpl.py @@ -259,9 +259,7 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): LOGGER.info('SubscribeKpi') try: - subs_queue = Queue() - subs_response = SubsResponse() kpi_id = request.kpi_id.kpi_id.uuid sampling_duration_s = request.sampling_duration_s @@ -279,6 +277,7 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): # parse queue to append kpis into the list while True: while not subs_queue.empty(): + subs_response = SubsResponse() list = subs_queue.get_nowait() for item in list: kpi = Kpi() @@ -286,12 +285,11 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): kpi.timestamp.timestamp = timestamp_string_to_float(item[1]) kpi.kpi_value.floatVal = item[2] # This must be improved subs_response.kpi_list.kpi.append(kpi) + subs_response.subs_id.subs_id.uuid = str(subs_id) + yield subs_response if timestamp_utcnow_to_float() > end_timestamp: break - - subs_response.subs_id.subs_id.uuid = str(subs_id) - - yield subs_response + # yield subs_response except ServiceException as e: LOGGER.exception('SubscribeKpi exception') grpc_context.abort(e.code, e.details) @@ -468,7 +466,7 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): try: alarm_id = request.alarm_id.alarm_id.uuid alarm_data = self.management_db.get_alarm(alarm_id) - alarm_response = AlarmResponse() + real_start_time = timestamp_utcnow_to_float() if alarm_data: LOGGER.debug(f"{alarm_data}") @@ -484,24 +482,30 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): subscription_frequency_ms = request.subscription_frequency_ms subscription_timeout_s = request.subscription_timeout_s + end_timestamp = real_start_time + subscription_timeout_s + self.alarm_manager.create_alarm(alarm_queue, alarm_id, kpi_id, kpiMinValue, kpiMaxValue, inRange, includeMinValue, includeMaxValue, subscription_frequency_ms, subscription_timeout_s) - while not alarm_queue.empty(): - list = alarm_queue.get_nowait() - for item in list: - kpi = Kpi() - kpi.kpi_id.kpi_id.uuid = str(item[0]) - kpi.timestamp.timestamp = timestamp_string_to_float(item[1]) - kpi.kpi_value.floatVal = item[2] # This must be improved - alarm_response.kpi_list.kpi.append(kpi) - - alarm_response.alarm_id.alarm_id.uuid = alarm_id - - yield alarm_response + while True: + while not alarm_queue.empty(): + alarm_response = AlarmResponse() + list = alarm_queue.get_nowait() + size = len(list) + for item in list: + kpi = Kpi() + kpi.kpi_id.kpi_id.uuid = str(item[0]) + kpi.timestamp.timestamp = timestamp_string_to_float(item[1]) + kpi.kpi_value.floatVal = item[2] # This must be improved + alarm_response.kpi_list.kpi.append(kpi) + alarm_response.alarm_id.alarm_id.uuid = alarm_id + yield alarm_response + if timestamp_utcnow_to_float() > end_timestamp: + break else: LOGGER.info('GetAlarmResponseStream error: AlarmID({:s}): not found in database'.format(str(alarm_id))) + alarm_response = AlarmResponse() alarm_response.alarm_id.alarm_id.uuid = "NoID" return alarm_response except ServiceException as e: diff --git a/src/monitoring/service/SubscriptionManager.py b/src/monitoring/service/SubscriptionManager.py index fe27d6ee3..f76cf8c39 100644 --- a/src/monitoring/service/SubscriptionManager.py +++ b/src/monitoring/service/SubscriptionManager.py @@ -42,14 +42,11 @@ class SubscriptionManager(): if end_timestamp: end_date = datetime.utcfromtimestamp(end_timestamp).isoformat() - LOGGER.debug(f"kpi_id: {kpi_id}") - LOGGER.debug(f"sampling_interval_s: {sampling_interval_s}") - LOGGER.debug(f"subscription_id: {subscription_id}") - LOGGER.debug(f"start_date: {start_date}") self.scheduler.add_job(self.metrics_db.get_subscription_data, args=(subs_queue,kpi_id, sampling_interval_s), trigger='interval', seconds=sampling_interval_s, start_date=start_date, end_date=end_date, timezone=pytz.utc, id=str(subscription_id)) LOGGER.debug(f"Subscrition job {subscription_id} succesfully created") def delete_subscription(self, subscription_id): - self.scheduler.remove_job(subscription_id) \ No newline at end of file + self.scheduler.remove_job(subscription_id) + LOGGER.debug(f"Subscription job {subscription_id} succesfully deleted") \ No newline at end of file diff --git a/src/monitoring/tests/Messages.py b/src/monitoring/tests/Messages.py index 23e0867c1..a4c210b61 100644 --- a/src/monitoring/tests/Messages.py +++ b/src/monitoring/tests/Messages.py @@ -125,11 +125,16 @@ def alarm_descriptor_b(): return _alarm_descriptor def alarm_subscription(alarm_id): - _alarm_descriptor = monitoring_pb2.AlarmSubscription() + _alarm_subscription = monitoring_pb2.AlarmSubscription() - _alarm_descriptor.alarm_id.alarm_id.uuid = str(alarm_id.alarm_id.uuid) + subscription_timeout_s = 20 + subscription_frequency_ms = 3000 - return _alarm_descriptor + _alarm_subscription.alarm_id.alarm_id.uuid = str(alarm_id.alarm_id.uuid) + _alarm_subscription.subscription_timeout_s = subscription_timeout_s + _alarm_subscription.subscription_frequency_ms = subscription_frequency_ms + + return _alarm_subscription def alarm_id(): diff --git a/src/monitoring/tests/test_unitary.py b/src/monitoring/tests/test_unitary.py index 33e9d3224..9eff2e5c9 100644 --- a/src/monitoring/tests/test_unitary.py +++ b/src/monitoring/tests/test_unitary.py @@ -197,7 +197,7 @@ def ingestion_data(): metrics_db.write_KPI(time_stamp, kpiId, kpiSampleType, deviceId, endpointId, serviceId, sliceId, connectionId, kpi_value) - sleep(0.05) + sleep(0.1) ########################### # Tests Implementation @@ -353,14 +353,15 @@ def test_get_alarm_descriptor(monitoring_client): assert isinstance(_response, AlarmDescriptor) # Test case that makes use of client fixture to test server's GetAlarmResponseStream method -def test_get_alarm_response_stream(monitoring_client): +def test_get_alarm_response_stream(monitoring_client,subs_scheduler): LOGGER.warning('test_get_alarm_descriptor') _kpi_id = monitoring_client.SetKpi(create_kpi_request_c()) _alarm_id = monitoring_client.SetKpiAlarm(alarm_descriptor(_kpi_id)) + subs_scheduler.add_job(ingestion_data) _response = monitoring_client.GetAlarmResponseStream(alarm_subscription(_alarm_id)) assert isinstance(_response, _MultiThreadedRendezvous) for item in _response: - LOGGER.debug(_response) + LOGGER.debug(item) assert isinstance(item,AlarmResponse) # Test case that makes use of client fixture to test server's DeleteAlarm method -- GitLab From 44b4f388b7cfbd8b7df5dc03d01848a8783d36c6 Mon Sep 17 00:00:00 2001 From: fjmmuro Date: Mon, 7 Nov 2022 15:44:51 +0100 Subject: [PATCH 09/20] Improving some minor tests --- .../service/MonitoringServiceServicerImpl.py | 2 +- src/monitoring/tests/test_unitary.py | 57 +++++++++---------- 2 files changed, 29 insertions(+), 30 deletions(-) diff --git a/src/monitoring/service/MonitoringServiceServicerImpl.py b/src/monitoring/service/MonitoringServiceServicerImpl.py index 9f007ae86..9c88ed311 100644 --- a/src/monitoring/service/MonitoringServiceServicerImpl.py +++ b/src/monitoring/service/MonitoringServiceServicerImpl.py @@ -542,7 +542,7 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): kpi_db = self.management_db.get_KPI(int(kpi_id)) response = Kpi() if kpi_db is None: - LOGGER.info('GetInstantKpi error: KpiID({:s}): not found in database'.format(str(kpi_id))) + LOGGER.info('GetStreamKpi error: KpiID({:s}): not found in database'.format(str(kpi_id))) response.kpi_id.kpi_id.uuid = "NoID" return response else: diff --git a/src/monitoring/tests/test_unitary.py b/src/monitoring/tests/test_unitary.py index 9eff2e5c9..e5e8d310d 100644 --- a/src/monitoring/tests/test_unitary.py +++ b/src/monitoring/tests/test_unitary.py @@ -381,17 +381,16 @@ def test_get_stream_kpi(monitoring_client): # pylint: disable=redefined-outer-na assert isinstance(response, _MultiThreadedRendezvous) # Test case that makes use of client fixture to test server's GetInstantKpi method -# def test_get_instant_kpi(monitoring_client): # pylint: disable=redefined-outer-name -# LOGGER.warning('test_getinstant_kpi begin') -# kpi_id = monitoring_client.SetKpi(KpiId()) -# monitoring_client.IncludeKpi(include_kpi_request(kpi_id)) -# sleep(0.3) -# response = monitoring_client.GetInstantKpi(kpi_id) -# LOGGER.debug(response) -# assert isinstance(response, Kpi) - # response = monitoring_client.GetInstantKpi(KpiId()) - # LOGGER.debug(type(response)) - # assert response.kpi_id.kpi_id.uuid == "NoID" +def test_get_instant_kpi(monitoring_client): # pylint: disable=redefined-outer-name + LOGGER.warning('test_getinstant_kpi begin') + kpi_id = monitoring_client.SetKpi(KpiId()) + monitoring_client.IncludeKpi(include_kpi_request(kpi_id)) + sleep(0.3) + response = monitoring_client.GetInstantKpi(kpi_id) + LOGGER.debug(response) + assert isinstance(response, Kpi) + + def test_managementdb_tools_insert_kpi(management_db): # pylint: disable=redefined-outer-name LOGGER.warning('test_managementdb_tools_insert_kpi begin') _create_kpi_request = create_kpi_request() @@ -463,27 +462,27 @@ def test_managementdb_tools_insert_alarm(management_db): LOGGER.debug(_alarm_id) assert isinstance(_alarm_id,int) -def test_metrics_db_tools_write_kpi(metrics_db): # pylint: disable=redefined-outer-name +def test_metrics_db_tools(metrics_db): # pylint: disable=redefined-outer-name LOGGER.warning('test_metric_sdb_tools_write_kpi begin') - - for i in range(200): - kpiSampleType = KpiSampleType.Name(KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED).upper().replace('KPISAMPLETYPE_', '') - kpiId = "3" - deviceId = 'DEV3' - endpointId = 'END3' - serviceId = 'SERV3' - sliceId = 'SLC3' - connectionId = 'CON3' - time_stamp = timestamp_utcnow_to_float() - kpi_value = 500*random() - - metrics_db.write_KPI(time_stamp, kpiId, kpiSampleType, deviceId, endpointId, serviceId, sliceId, connectionId, - kpi_value) + _kpiId = "6" + + for i in range(50): + _kpiSampleType = KpiSampleType.Name(KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED).upper().replace('KPISAMPLETYPE_', '') + _deviceId = 'DEV4' + _endpointId = 'END4' + _serviceId = 'SERV4' + _sliceId = 'SLC4' + _connectionId = 'CON4' + _time_stamp = timestamp_utcnow_to_float() + _kpi_value = 500*random() + + metrics_db.write_KPI(_time_stamp, _kpiId, _kpiSampleType, _deviceId, _endpointId, _serviceId, _sliceId, _connectionId, + _kpi_value) sleep(0.05) -def test_metrics_db_tools_read_kpi_points(metrics_db): # pylint: disable=redefined-outer-name - LOGGER.warning('test_metrics_db_tools_read_kpi_points begin') - + _query = f"SELECT * FROM monitoring WHERE kpi_id ='{_kpiId}'" + _data = metrics_db.run_query(_query) + assert len(_data) >= 50 def test_subscription_manager_create_subscription(management_db,metrics_db,subs_scheduler): LOGGER.warning('test_subscription_manager_create_subscription begin') -- GitLab From b85035e5da6556aeea76a3808a1a60134bbc00fe Mon Sep 17 00:00:00 2001 From: fjmmuro Date: Mon, 7 Nov 2022 16:19:36 +0100 Subject: [PATCH 10/20] Cleaning code --- src/monitoring/tests/Messages.py | 4 ++-- src/monitoring/tests/test_unitary.py | 32 ---------------------------- 2 files changed, 2 insertions(+), 34 deletions(-) diff --git a/src/monitoring/tests/Messages.py b/src/monitoring/tests/Messages.py index a4c210b61..228b1ce42 100644 --- a/src/monitoring/tests/Messages.py +++ b/src/monitoring/tests/Messages.py @@ -127,8 +127,8 @@ def alarm_descriptor_b(): def alarm_subscription(alarm_id): _alarm_subscription = monitoring_pb2.AlarmSubscription() - subscription_timeout_s = 20 - subscription_frequency_ms = 3000 + subscription_timeout_s = 10 + subscription_frequency_ms = 1000 _alarm_subscription.alarm_id.alarm_id.uuid = str(alarm_id.alarm_id.uuid) _alarm_subscription.subscription_timeout_s = subscription_timeout_s diff --git a/src/monitoring/tests/test_unitary.py b/src/monitoring/tests/test_unitary.py index e5e8d310d..d9c83aa5d 100644 --- a/src/monitoring/tests/test_unitary.py +++ b/src/monitoring/tests/test_unitary.py @@ -523,38 +523,6 @@ def test_subscription_manager_create_subscription(management_db,metrics_db,subs_ assert total_points != 0 -def test_alarm_manager_create_alarm(management_db,metrics_db): - LOGGER.warning('test_alarm_manager_create_alarm begin') - - _alarm_description = "Alarm Description" - _alarm_name = "Alarm Name" - _kpi_id = "3" - _kpi_min_value = 0.0 - _kpi_max_value = 250.0 - _in_range = True - _include_min_value = False - _include_max_value = True - _subscription_frequency_ms = 10 - - _alarm_id = management_db.insert_alarm(_alarm_description, _alarm_name, _kpi_id, _kpi_min_value, - _kpi_max_value, - _in_range, _include_min_value, _include_max_value) - - LOGGER.debug(f"alarm_id: {_alarm_id}") - _alarm_queue = Queue() - _alarm_manager = AlarmManager(metrics_db) - - _alarm_manager.create_alarm(_alarm_queue,str(_alarm_id),_kpi_id,_kpi_min_value,_kpi_max_value,_in_range,_include_min_value,_include_max_value,_subscription_frequency_ms) - - LOGGER.debug(_alarm_queue) - - while not _alarm_queue.empty(): - list = _alarm_queue.get_nowait() - LOGGER.debug(list) - for item in list: - LOGGER.debug(item) - - def test_events_tools( context_client : ContextClient, # pylint: disable=redefined-outer-name device_client : DeviceClient, # pylint: disable=redefined-outer-name -- GitLab From 6ae69eb20401bd03a5d5ebb4907d047f04ff4d0f Mon Sep 17 00:00:00 2001 From: fjmmuro Date: Mon, 7 Nov 2022 16:30:39 +0100 Subject: [PATCH 11/20] Last minor changes --- src/monitoring/tests/test_unitary.py | 138 +++++++++++++-------------- 1 file changed, 69 insertions(+), 69 deletions(-) diff --git a/src/monitoring/tests/test_unitary.py b/src/monitoring/tests/test_unitary.py index d9c83aa5d..823998899 100644 --- a/src/monitoring/tests/test_unitary.py +++ b/src/monitoring/tests/test_unitary.py @@ -381,14 +381,14 @@ def test_get_stream_kpi(monitoring_client): # pylint: disable=redefined-outer-na assert isinstance(response, _MultiThreadedRendezvous) # Test case that makes use of client fixture to test server's GetInstantKpi method -def test_get_instant_kpi(monitoring_client): # pylint: disable=redefined-outer-name - LOGGER.warning('test_getinstant_kpi begin') - kpi_id = monitoring_client.SetKpi(KpiId()) - monitoring_client.IncludeKpi(include_kpi_request(kpi_id)) - sleep(0.3) - response = monitoring_client.GetInstantKpi(kpi_id) - LOGGER.debug(response) - assert isinstance(response, Kpi) +# def test_get_instant_kpi(monitoring_client): # pylint: disable=redefined-outer-name +# LOGGER.warning('test_getinstant_kpi begin') +# kpi_id = monitoring_client.SetKpi(KpiId()) +# monitoring_client.IncludeKpi(include_kpi_request(kpi_id)) +# sleep(0.3) +# response = monitoring_client.GetInstantKpi(kpi_id) +# LOGGER.debug(response) +# assert isinstance(response, Kpi) def test_managementdb_tools_insert_kpi(management_db): # pylint: disable=redefined-outer-name @@ -461,67 +461,67 @@ def test_managementdb_tools_insert_alarm(management_db): _in_range, _include_min_value, _include_max_value) LOGGER.debug(_alarm_id) assert isinstance(_alarm_id,int) - -def test_metrics_db_tools(metrics_db): # pylint: disable=redefined-outer-name - LOGGER.warning('test_metric_sdb_tools_write_kpi begin') - _kpiId = "6" - - for i in range(50): - _kpiSampleType = KpiSampleType.Name(KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED).upper().replace('KPISAMPLETYPE_', '') - _deviceId = 'DEV4' - _endpointId = 'END4' - _serviceId = 'SERV4' - _sliceId = 'SLC4' - _connectionId = 'CON4' - _time_stamp = timestamp_utcnow_to_float() - _kpi_value = 500*random() - - metrics_db.write_KPI(_time_stamp, _kpiId, _kpiSampleType, _deviceId, _endpointId, _serviceId, _sliceId, _connectionId, - _kpi_value) - sleep(0.05) - - _query = f"SELECT * FROM monitoring WHERE kpi_id ='{_kpiId}'" - _data = metrics_db.run_query(_query) - assert len(_data) >= 50 - -def test_subscription_manager_create_subscription(management_db,metrics_db,subs_scheduler): - LOGGER.warning('test_subscription_manager_create_subscription begin') - subs_queue = Queue() - - subs_manager = SubscriptionManager(metrics_db) - - subs_scheduler.add_job(ingestion_data) - - kpi_id = "3" - sampling_duration_s = 20 - sampling_interval_s = 3 - real_start_time = timestamp_utcnow_to_float() - start_timestamp = real_start_time - end_timestamp = start_timestamp + sampling_duration_s - - subs_id = management_db.insert_subscription(kpi_id, "localhost", sampling_duration_s, - sampling_interval_s,start_timestamp,end_timestamp) - subs_manager.create_subscription(subs_queue,subs_id,kpi_id,sampling_interval_s, - sampling_duration_s,start_timestamp,end_timestamp) - - # This is here to simulate application activity (which keeps the main thread alive). - total_points = 0 - while True: - while not subs_queue.empty(): - list = subs_queue.get_nowait() - kpi_list = KpiList() - for item in list: - kpi = Kpi() - kpi.kpi_id.kpi_id.uuid = item[0] - kpi.timestamp.timestamp = timestamp_string_to_float(item[1]) - kpi.kpi_value.floatVal = item[2] - kpi_list.kpi.append(kpi) - total_points += 1 - LOGGER.debug(kpi_list) - if timestamp_utcnow_to_float() > end_timestamp: - break - - assert total_points != 0 +# +# def test_metrics_db_tools(metrics_db): # pylint: disable=redefined-outer-name +# LOGGER.warning('test_metric_sdb_tools_write_kpi begin') +# _kpiId = "6" +# +# for i in range(50): +# _kpiSampleType = KpiSampleType.Name(KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED).upper().replace('KPISAMPLETYPE_', '') +# _deviceId = 'DEV4' +# _endpointId = 'END4' +# _serviceId = 'SERV4' +# _sliceId = 'SLC4' +# _connectionId = 'CON4' +# _time_stamp = timestamp_utcnow_to_float() +# _kpi_value = 500*random() +# +# metrics_db.write_KPI(_time_stamp, _kpiId, _kpiSampleType, _deviceId, _endpointId, _serviceId, _sliceId, _connectionId, +# _kpi_value) +# sleep(0.05) +# +# _query = f"SELECT * FROM monitoring WHERE kpi_id ='{_kpiId}'" +# _data = metrics_db.run_query(_query) +# assert len(_data) >= 50 +# +# def test_subscription_manager_create_subscription(management_db,metrics_db,subs_scheduler): +# LOGGER.warning('test_subscription_manager_create_subscription begin') +# subs_queue = Queue() +# +# subs_manager = SubscriptionManager(metrics_db) +# +# subs_scheduler.add_job(ingestion_data) +# +# kpi_id = "3" +# sampling_duration_s = 20 +# sampling_interval_s = 3 +# real_start_time = timestamp_utcnow_to_float() +# start_timestamp = real_start_time +# end_timestamp = start_timestamp + sampling_duration_s +# +# subs_id = management_db.insert_subscription(kpi_id, "localhost", sampling_duration_s, +# sampling_interval_s,start_timestamp,end_timestamp) +# subs_manager.create_subscription(subs_queue,subs_id,kpi_id,sampling_interval_s, +# sampling_duration_s,start_timestamp,end_timestamp) +# +# # This is here to simulate application activity (which keeps the main thread alive). +# total_points = 0 +# while True: +# while not subs_queue.empty(): +# list = subs_queue.get_nowait() +# kpi_list = KpiList() +# for item in list: +# kpi = Kpi() +# kpi.kpi_id.kpi_id.uuid = item[0] +# kpi.timestamp.timestamp = timestamp_string_to_float(item[1]) +# kpi.kpi_value.floatVal = item[2] +# kpi_list.kpi.append(kpi) +# total_points += 1 +# LOGGER.debug(kpi_list) +# if timestamp_utcnow_to_float() > end_timestamp: +# break +# +# assert total_points != 0 def test_events_tools( context_client : ContextClient, # pylint: disable=redefined-outer-name -- GitLab From ae0398a5de60fbdf55e735641dda554dc8057d4a Mon Sep 17 00:00:00 2001 From: fjmmuro Date: Mon, 7 Nov 2022 17:15:57 +0100 Subject: [PATCH 12/20] Clean code --- src/monitoring/service/AlarmManager.py | 3 ++- src/monitoring/service/SubscriptionManager.py | 3 ++- src/monitoring/tests/test_unitary.py | 7 ++++++- 3 files changed, 10 insertions(+), 3 deletions(-) diff --git a/src/monitoring/service/AlarmManager.py b/src/monitoring/service/AlarmManager.py index 27b169b16..d80d815fe 100644 --- a/src/monitoring/service/AlarmManager.py +++ b/src/monitoring/service/AlarmManager.py @@ -24,11 +24,12 @@ class AlarmManager(): start_date = datetime.utcfromtimestamp(start_timestamp).isoformat() end_date = datetime.utcfromtimestamp(end_timestamp).isoformat() - self.scheduler.add_job(self.metrics_db.get_alarm_data, + job = self.scheduler.add_job(self.metrics_db.get_alarm_data, args=(alarm_queue,kpi_id, kpiMinValue, kpiMaxValue, inRange, includeMinValue, includeMaxValue, subscription_frequency_ms), trigger='interval', seconds=(subscription_frequency_ms/1000), start_date=start_date, end_date=end_date,timezone=pytz.utc, id=str(alarm_id)) LOGGER.debug(f"Alarm job {alarm_id} succesfully created") + job.remove() def delete_alarm(self, alarm_id): try: diff --git a/src/monitoring/service/SubscriptionManager.py b/src/monitoring/service/SubscriptionManager.py index f76cf8c39..6ff922c52 100644 --- a/src/monitoring/service/SubscriptionManager.py +++ b/src/monitoring/service/SubscriptionManager.py @@ -42,10 +42,11 @@ class SubscriptionManager(): if end_timestamp: end_date = datetime.utcfromtimestamp(end_timestamp).isoformat() - self.scheduler.add_job(self.metrics_db.get_subscription_data, args=(subs_queue,kpi_id, sampling_interval_s), + job = self.scheduler.add_job(self.metrics_db.get_subscription_data, args=(subs_queue,kpi_id, sampling_interval_s), trigger='interval', seconds=sampling_interval_s, start_date=start_date, end_date=end_date, timezone=pytz.utc, id=str(subscription_id)) LOGGER.debug(f"Subscrition job {subscription_id} succesfully created") + job.remove() def delete_subscription(self, subscription_id): self.scheduler.remove_job(subscription_id) diff --git a/src/monitoring/tests/test_unitary.py b/src/monitoring/tests/test_unitary.py index 823998899..55ac9a18b 100644 --- a/src/monitoring/tests/test_unitary.py +++ b/src/monitoring/tests/test_unitary.py @@ -22,6 +22,7 @@ from typing import Tuple from apscheduler.executors.pool import ProcessPoolExecutor from apscheduler.schedulers.background import BackgroundScheduler +from apscheduler.schedulers.base import STATE_STOPPED from grpc._channel import _MultiThreadedRendezvous from common.Constants import ServiceNameEnum @@ -299,6 +300,8 @@ def test_set_kpi_subscription(monitoring_client,metrics_db,subs_scheduler): # py for item in response: LOGGER.debug(item) assert isinstance(item, SubsResponse) + if (subs_scheduler.state != STATE_STOPPED): + subs_scheduler.shutdown() # Test case that makes use of client fixture to test server's GetSubsDescriptor method def test_get_subs_descriptor(monitoring_client): @@ -364,6 +367,9 @@ def test_get_alarm_response_stream(monitoring_client,subs_scheduler): LOGGER.debug(item) assert isinstance(item,AlarmResponse) + if(subs_scheduler.state != STATE_STOPPED): + subs_scheduler.shutdown() + # Test case that makes use of client fixture to test server's DeleteAlarm method def test_delete_alarm(monitoring_client): LOGGER.warning('test_delete_alarm') @@ -390,7 +396,6 @@ def test_get_stream_kpi(monitoring_client): # pylint: disable=redefined-outer-na # LOGGER.debug(response) # assert isinstance(response, Kpi) - def test_managementdb_tools_insert_kpi(management_db): # pylint: disable=redefined-outer-name LOGGER.warning('test_managementdb_tools_insert_kpi begin') _create_kpi_request = create_kpi_request() -- GitLab From c85e486fe92df215ca8f2282107694af43cdad43 Mon Sep 17 00:00:00 2001 From: fjmmuro Date: Fri, 18 Nov 2022 12:41:50 +0100 Subject: [PATCH 13/20] Updated QueryKpiData RPC (stable) --- proto/monitoring.proto | 19 +++++++- src/monitoring/client/MonitoringClient.py | 4 +- src/monitoring/service/MetricsDBTools.py | 21 +++++++++ .../service/MonitoringServiceServicerImpl.py | 45 +++++++++++++++++-- src/monitoring/tests/Messages.py | 11 ++++- src/monitoring/tests/test_unitary.py | 15 ++++--- 6 files changed, 100 insertions(+), 15 deletions(-) diff --git a/proto/monitoring.proto b/proto/monitoring.proto index c0e2dd877..270ed9ccf 100644 --- a/proto/monitoring.proto +++ b/proto/monitoring.proto @@ -25,7 +25,7 @@ service MonitoringService { rpc GetKpiDescriptorList (context.Empty ) returns (KpiDescriptorList ) {} // Stable and final rpc IncludeKpi (Kpi ) returns (context.Empty ) {} // Stable and final rpc MonitorKpi (MonitorKpiRequest ) returns (context.Empty ) {} // Stable and final - rpc QueryKpiData (KpiQuery ) returns (KpiList ) {} // Not implemented + rpc QueryKpiData (KpiQuery ) returns (RawKpiTable ) {} // Not implemented rpc SetKpiSubscription (SubsDescriptor ) returns (stream SubsResponse ) {} // Stable not final rpc GetSubsDescriptor (SubscriptionID ) returns (SubsDescriptor ) {} // Stable and final rpc GetSubscriptions (context.Empty ) returns (SubsList ) {} // Stable and final @@ -59,7 +59,7 @@ message MonitorKpiRequest { } message KpiQuery { - KpiId kpi_id = 1; + repeated KpiId kpi_id = 1; float monitoring_window_s = 2; float sampling_rate_s = 3; uint32 last_n_samples = 4; // used when you want something like "get the last N many samples @@ -68,6 +68,21 @@ message KpiQuery { // Pending add field to reflect Available Device Protocols } + +message RawKpi { // cell + context.Timestamp timestamp = 1; + KpiValue kpi_value = 2; +} + +message RawKpiList { // column + KpiId kpi_id = 1; + repeated RawKpi raw_kpi_list = 2; +} + +message RawKpiTable { // table + repeated RawKpiList raw_kpi_table = 1; +} + message KpiId { context.Uuid kpi_id = 1; } diff --git a/src/monitoring/client/MonitoringClient.py b/src/monitoring/client/MonitoringClient.py index 73607a081..5641b9cf3 100644 --- a/src/monitoring/client/MonitoringClient.py +++ b/src/monitoring/client/MonitoringClient.py @@ -22,7 +22,7 @@ from common.tools.grpc.Tools import grpc_message_to_json_string from common.proto.context_pb2 import Empty from common.proto.monitoring_pb2 import Kpi, KpiDescriptor, KpiId, MonitorKpiRequest, \ KpiDescriptorList, KpiQuery, KpiList, SubsDescriptor, SubscriptionID, SubsList, \ - SubsResponse, AlarmDescriptor, AlarmID, AlarmList, AlarmResponse, AlarmSubscription + SubsResponse, AlarmDescriptor, AlarmID, AlarmList, AlarmResponse, AlarmSubscription, RawKpiTable from common.proto.monitoring_pb2_grpc import MonitoringServiceStub LOGGER = logging.getLogger(__name__) @@ -93,7 +93,7 @@ class MonitoringClient: return response @RETRY_DECORATOR - def QueryKpiData(self, request : KpiQuery) -> KpiList: + def QueryKpiData(self, request : KpiQuery) -> RawKpiTable: LOGGER.debug('QueryKpiData: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.QueryKpiData(request) LOGGER.debug('QueryKpiData result: {:s}'.format(grpc_message_to_json_string(response))) diff --git a/src/monitoring/service/MetricsDBTools.py b/src/monitoring/service/MetricsDBTools.py index 0f41cfee1..aa374b340 100644 --- a/src/monitoring/service/MetricsDBTools.py +++ b/src/monitoring/service/MetricsDBTools.py @@ -174,6 +174,27 @@ class MetricsDB(): if connection: connection.close() + def get_raw_kpi_list(self, kpi_id, monitoring_window_s,sampling_rate_s, last_n_samples, start_timestamp, end_timestamp): + try: + end_date = timestamp_utcnow_to_float() - self.commit_lag_ms / 1000 + start_date = end_date - monitoring_window_s + query = f"SELECT timestamp, kpi_value FROM {self.table} WHERE kpi_id = '{kpi_id}' AND (timestamp BETWEEN '{timestamp_float_to_string(start_date)}' AND '{timestamp_float_to_string(end_date)}')" + + if self.postgre: + kpi_list = self.run_query_postgre(query) + LOGGER.debug(f"kpi_list postgre: {kpi_list}") + else: + kpi_list = self.run_query(query) + LOGGER.debug(f"kpi_list influx: {kpi_list}") + + if kpi_list: + LOGGER.debug(f"New data received for subscription to KPI {kpi_id}") + return kpi_list + else: + LOGGER.debug(f"No new data for the subscription to KPI {kpi_id}") + except (Exception) as e: + LOGGER.debug(f"Subscription data cannot be retrieved. {e}") + def get_subscription_data(self,subs_queue, kpi_id, sampling_interval_s=1): try: end_date = timestamp_utcnow_to_float() - self.commit_lag_ms / 1000 diff --git a/src/monitoring/service/MonitoringServiceServicerImpl.py b/src/monitoring/service/MonitoringServiceServicerImpl.py index 9c88ed311..e12d7ca73 100644 --- a/src/monitoring/service/MonitoringServiceServicerImpl.py +++ b/src/monitoring/service/MonitoringServiceServicerImpl.py @@ -26,7 +26,7 @@ from common.proto.kpi_sample_types_pb2 import KpiSampleType from common.proto.monitoring_pb2_grpc import MonitoringServiceServicer from common.proto.monitoring_pb2 import AlarmResponse, AlarmDescriptor, AlarmList, SubsList, KpiId, \ KpiDescriptor, KpiList, KpiQuery, SubsDescriptor, SubscriptionID, AlarmID, KpiDescriptorList, \ - MonitorKpiRequest, Kpi, AlarmSubscription, SubsResponse + MonitorKpiRequest, Kpi, AlarmSubscription, SubsResponse, RawKpiTable, RawKpi, RawKpiList from common.rpc_method_wrapper.ServiceExceptions import ServiceException from common.tools.timestamp.Converters import timestamp_string_to_float, timestamp_utcnow_to_float @@ -243,12 +243,49 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): grpc_context.abort(grpc.StatusCode.INTERNAL, str(e)) # CREATEKPI_COUNTER_FAILED.inc() - def QueryKpiData(self, request: KpiQuery, grpc_context: grpc.ServicerContext) -> KpiList: + def QueryKpiData(self, request: KpiQuery, grpc_context: grpc.ServicerContext) -> RawKpiTable: LOGGER.info('QueryKpiData') try: - # TBC - return KpiList() + raw_kpi_table = RawKpiTable() + + LOGGER.debug(str(request)) + + kpi_id_list = request.kpi_id + monitoring_window_s = request.monitoring_window_s + sampling_rate_s = request.sampling_rate_s + last_n_samples = request.last_n_samples + start_timestamp = request.start_timestamp.timestamp + end_timestamp = request.end_timestamp.timestamp + + # Check if all the Kpi_ids exist + for item in kpi_id_list: + kpi_id = item.kpi_id.uuid + + kpiDescriptor = self.GetKpiDescriptor(item, grpc_context) + if kpiDescriptor is None: + LOGGER.info('QueryKpiData error: KpiID({:s}): not found in database'.format(str(kpi_id))) + break + else: + # Execute query per Kpi_id and introduce their kpi_list in the table + kpi_list = self.metrics_db.get_raw_kpi_list(kpi_id,monitoring_window_s,sampling_rate_s,last_n_samples,start_timestamp,end_timestamp) + raw_kpi_list = RawKpiList() + raw_kpi_list.kpi_id.kpi_id.uuid = kpi_id + + LOGGER.debug(str(kpi_list)) + + if kpi_list is None: + LOGGER.info('QueryKpiData error: KpiID({:s}): points not found in metrics database'.format(str(kpi_id))) + else: + for item in kpi_list: + raw_kpi = RawKpi() + raw_kpi.timestamp.timestamp = timestamp_string_to_float(item[1]) + raw_kpi.kpi_value.floatVal = item[2] + raw_kpi_list.raw_kpi_list.append(raw_kpi) + + raw_kpi_table.raw_kpi_table.append(raw_kpi_list) + + return raw_kpi_table except ServiceException as e: LOGGER.exception('QueryKpiData exception') grpc_context.abort(e.code, e.details) diff --git a/src/monitoring/tests/Messages.py b/src/monitoring/tests/Messages.py index 228b1ce42..a2fc421dd 100644 --- a/src/monitoring/tests/Messages.py +++ b/src/monitoring/tests/Messages.py @@ -11,7 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import datetime from random import random from common.proto import monitoring_pb2 @@ -75,9 +74,17 @@ def kpi_descriptor_list(): return _kpi_descriptor_list -def kpi_query(): +def kpi_query(kpi_id_list): _kpi_query = monitoring_pb2.KpiQuery() + _kpi_query.kpi_id.extend(kpi_id_list) + _kpi_query.monitoring_window_s = 20 + _kpi_query.sampling_rate_s = 3 + # _kpi_query.last_n_samples = 10 + _kpi_query.start_timestamp.timestamp = timestamp_utcnow_to_float() - 20 + _kpi_query.end_timestamp.timestamp = timestamp_utcnow_to_float() + + return _kpi_query def subs_descriptor(kpi_id): diff --git a/src/monitoring/tests/test_unitary.py b/src/monitoring/tests/test_unitary.py index 55ac9a18b..68bd6685b 100644 --- a/src/monitoring/tests/test_unitary.py +++ b/src/monitoring/tests/test_unitary.py @@ -36,7 +36,7 @@ from common.message_broker.MessageBroker import MessageBroker from common.proto import monitoring_pb2 from common.proto.kpi_sample_types_pb2 import KpiSampleType from common.proto.monitoring_pb2 import KpiId, KpiDescriptor, KpiList, SubsDescriptor, SubsList, AlarmID, \ - AlarmDescriptor, AlarmList, Kpi, KpiDescriptorList, SubsResponse, AlarmResponse + AlarmDescriptor, AlarmList, Kpi, KpiDescriptorList, SubsResponse, AlarmResponse, RawKpiTable from common.tools.timestamp.Converters import timestamp_utcnow_to_float, timestamp_string_to_float from context.client.ContextClient import ContextClient @@ -284,14 +284,19 @@ def test_monitor_kpi( assert isinstance(response, Empty) # Test case that makes use of client fixture to test server's QueryKpiData method -def test_query_kpi_data(monitoring_client): # pylint: disable=redefined-outer-name +def test_query_kpi_data(monitoring_client,subs_scheduler): # pylint: disable=redefined-outer-name + + kpi_id_list = [] + kpi_id = monitoring_client.SetKpi(create_kpi_request_c()) + subs_scheduler.add_job(ingestion_data) + kpi_id_list.append(kpi_id) LOGGER.warning('test_query_kpi_data') - response = monitoring_client.QueryKpiData(kpi_query()) + response = monitoring_client.QueryKpiData(kpi_query(kpi_id_list)) LOGGER.debug(str(response)) - assert isinstance(response, KpiList) + assert isinstance(response, RawKpiTable) # Test case that makes use of client fixture to test server's SetKpiSubscription method -def test_set_kpi_subscription(monitoring_client,metrics_db,subs_scheduler): # pylint: disable=redefined-outer-name +def test_set_kpi_subscription(monitoring_client,subs_scheduler): # pylint: disable=redefined-outer-name LOGGER.warning('test_set_kpi_subscription') kpi_id = monitoring_client.SetKpi(create_kpi_request_c()) subs_scheduler.add_job(ingestion_data) -- GitLab From 5c138dad0fe68eabd51aba87270432fe506a35f0 Mon Sep 17 00:00:00 2001 From: fjmmuro Date: Tue, 22 Nov 2022 17:57:11 +0100 Subject: [PATCH 14/20] Improve QueryKpiData RPC --- proto/monitoring.proto | 16 ++++----- src/monitoring/service/MetricsDBTools.py | 34 +++++++++++++++---- .../service/MonitoringServiceServicerImpl.py | 13 ++++--- src/monitoring/tests/Messages.py | 30 ++++++++-------- 4 files changed, 55 insertions(+), 38 deletions(-) diff --git a/proto/monitoring.proto b/proto/monitoring.proto index 270ed9ccf..027dcb022 100644 --- a/proto/monitoring.proto +++ b/proto/monitoring.proto @@ -59,13 +59,11 @@ message MonitorKpiRequest { } message KpiQuery { - repeated KpiId kpi_id = 1; + repeated KpiId kpi_ids = 1; float monitoring_window_s = 2; - float sampling_rate_s = 3; - uint32 last_n_samples = 4; // used when you want something like "get the last N many samples - context.Timestamp start_timestamp = 5; // used when you want something like "get the samples since X date/time" - context.Timestamp end_timestamp = 6; // used when you want something like "get the samples until X date/time" - // Pending add field to reflect Available Device Protocols + uint32 last_n_samples = 3; // used when you want something like "get the last N many samples + context.Timestamp start_timestamp = 4; // used when you want something like "get the samples since X date/time" + context.Timestamp end_timestamp = 5; // used when you want something like "get the samples until X date/time" } @@ -75,12 +73,12 @@ message RawKpi { // cell } message RawKpiList { // column - KpiId kpi_id = 1; - repeated RawKpi raw_kpi_list = 2; + KpiId kpi_id = 1; + repeated RawKpi raw_kpis = 2; } message RawKpiTable { // table - repeated RawKpiList raw_kpi_table = 1; + repeated RawKpiList raw_kpi_lists = 1; } message KpiId { diff --git a/src/monitoring/service/MetricsDBTools.py b/src/monitoring/service/MetricsDBTools.py index aa374b340..5848ce545 100644 --- a/src/monitoring/service/MetricsDBTools.py +++ b/src/monitoring/service/MetricsDBTools.py @@ -23,7 +23,7 @@ import datetime from common.tools.timestamp.Converters import timestamp_float_to_string, timestamp_utcnow_to_float import psycopg2 -LOGGER = logging.getLogger(__name__) +from monitoring.service.MonitoringServiceServicerImpl import LOGGER class MetricsDB(): @@ -174,11 +174,33 @@ class MetricsDB(): if connection: connection.close() - def get_raw_kpi_list(self, kpi_id, monitoring_window_s,sampling_rate_s, last_n_samples, start_timestamp, end_timestamp): + def get_raw_kpi_list(self, kpi_id, monitoring_window_s, last_n_samples, start_timestamp, end_timestamp): try: - end_date = timestamp_utcnow_to_float() - self.commit_lag_ms / 1000 - start_date = end_date - monitoring_window_s - query = f"SELECT timestamp, kpi_value FROM {self.table} WHERE kpi_id = '{kpi_id}' AND (timestamp BETWEEN '{timestamp_float_to_string(start_date)}' AND '{timestamp_float_to_string(end_date)}')" + query_root = f"SELECT timestamp, kpi_value FROM {self.table} WHERE kpi_id = '{kpi_id}' " + query = query_root + start_date = float() + end_date = float() + if last_n_samples: + query = query + f"ORDER BY timestamp DESC limit {last_n_samples}" + elif monitoring_window_s or start_timestamp or end_timestamp: + if start_timestamp and end_timestamp: + start_date = start_timestamp + end_date = end_timestamp + elif monitoring_window_s: + if start_timestamp and not end_timestamp: + start_date = start_timestamp + end_date = start_date + monitoring_window_s + elif end_timestamp and not start_timestamp: + end_date = end_timestamp + start_date = end_date - monitoring_window_s + elif not start_timestamp and not end_timestamp: + end_date = timestamp_utcnow_to_float() + start_date = end_date - monitoring_window_s + query = query + f"AND (timestamp BETWEEN '{timestamp_float_to_string(start_date)}' AND '{timestamp_float_to_string(end_date)}')" + else: + LOGGER.debug(f"Wrong parameters settings") + + LOGGER.debug(query) if self.postgre: kpi_list = self.run_query_postgre(query) @@ -186,7 +208,6 @@ class MetricsDB(): else: kpi_list = self.run_query(query) LOGGER.debug(f"kpi_list influx: {kpi_list}") - if kpi_list: LOGGER.debug(f"New data received for subscription to KPI {kpi_id}") return kpi_list @@ -200,6 +221,7 @@ class MetricsDB(): end_date = timestamp_utcnow_to_float() - self.commit_lag_ms / 1000 start_date = end_date - sampling_interval_s query = f"SELECT kpi_id, timestamp, kpi_value FROM {self.table} WHERE kpi_id = '{kpi_id}' AND (timestamp BETWEEN '{timestamp_float_to_string(start_date)}' AND '{timestamp_float_to_string(end_date)}')" + LOGGER.debug(query) if self.postgre: kpi_list = self.run_query_postgre(query) LOGGER.debug(f"kpi_list postgre: {kpi_list}") diff --git a/src/monitoring/service/MonitoringServiceServicerImpl.py b/src/monitoring/service/MonitoringServiceServicerImpl.py index e12d7ca73..2d7d01e3e 100644 --- a/src/monitoring/service/MonitoringServiceServicerImpl.py +++ b/src/monitoring/service/MonitoringServiceServicerImpl.py @@ -251,9 +251,8 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): LOGGER.debug(str(request)) - kpi_id_list = request.kpi_id + kpi_id_list = request.kpi_ids monitoring_window_s = request.monitoring_window_s - sampling_rate_s = request.sampling_rate_s last_n_samples = request.last_n_samples start_timestamp = request.start_timestamp.timestamp end_timestamp = request.end_timestamp.timestamp @@ -268,7 +267,7 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): break else: # Execute query per Kpi_id and introduce their kpi_list in the table - kpi_list = self.metrics_db.get_raw_kpi_list(kpi_id,monitoring_window_s,sampling_rate_s,last_n_samples,start_timestamp,end_timestamp) + kpi_list = self.metrics_db.get_raw_kpi_list(kpi_id,monitoring_window_s,last_n_samples,start_timestamp,end_timestamp) raw_kpi_list = RawKpiList() raw_kpi_list.kpi_id.kpi_id.uuid = kpi_id @@ -279,11 +278,11 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): else: for item in kpi_list: raw_kpi = RawKpi() - raw_kpi.timestamp.timestamp = timestamp_string_to_float(item[1]) - raw_kpi.kpi_value.floatVal = item[2] - raw_kpi_list.raw_kpi_list.append(raw_kpi) + raw_kpi.timestamp.timestamp = timestamp_string_to_float(item[0]) + raw_kpi.kpi_value.floatVal = item[1] + raw_kpi_list.raw_kpis.append(raw_kpi) - raw_kpi_table.raw_kpi_table.append(raw_kpi_list) + raw_kpi_table.raw_kpi_lists.append(raw_kpi_list) return raw_kpi_table except ServiceException as e: diff --git a/src/monitoring/tests/Messages.py b/src/monitoring/tests/Messages.py index a2fc421dd..f15cb5ec2 100644 --- a/src/monitoring/tests/Messages.py +++ b/src/monitoring/tests/Messages.py @@ -22,15 +22,15 @@ def kpi_id(): _kpi_id.kpi_id.uuid = str(1) # pylint: disable=maybe-no-member return _kpi_id -def create_kpi_request(): - _create_kpi_request = monitoring_pb2.KpiDescriptor() - _create_kpi_request.kpi_description = 'KPI Description Test' - _create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_TRANSMITTED - _create_kpi_request.device_id.device_uuid.uuid = 'DEV1' # pylint: disable=maybe-no-member - _create_kpi_request.service_id.service_uuid.uuid = 'SERV1' # pylint: disable=maybe-no-member - _create_kpi_request.slice_id.slice_uuid.uuid = 'SLC1' # pylint: disable=maybe-no-member - _create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END1' # pylint: disable=maybe-no-member - _create_kpi_request.connection_id.connection_uuid.uuid = 'CON1' # pylint: disable=maybe-no-member +def create_kpi_request(kpi_id_str): + _create_kpi_request = monitoring_pb2.KpiDescriptor() + _create_kpi_request.kpi_description = 'KPI Description Test' + _create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED + _create_kpi_request.device_id.device_uuid.uuid = 'DEV' + str(kpi_id_str) + _create_kpi_request.service_id.service_uuid.uuid = 'SERV' + str(kpi_id_str) + _create_kpi_request.slice_id.slice_uuid.uuid = 'SLC' + str(kpi_id_str) + _create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END' + str(kpi_id_str) + _create_kpi_request.connection_id.connection_uuid.uuid = 'CON' + str(kpi_id_str) return _create_kpi_request def create_kpi_request_b(): @@ -77,20 +77,18 @@ def kpi_descriptor_list(): def kpi_query(kpi_id_list): _kpi_query = monitoring_pb2.KpiQuery() - _kpi_query.kpi_id.extend(kpi_id_list) - _kpi_query.monitoring_window_s = 20 - _kpi_query.sampling_rate_s = 3 - # _kpi_query.last_n_samples = 10 - _kpi_query.start_timestamp.timestamp = timestamp_utcnow_to_float() - 20 + _kpi_query.kpi_ids.extend(kpi_id_list) + # _kpi_query.monitoring_window_s = 10 + # _kpi_query.last_n_samples = 2 + _kpi_query.start_timestamp.timestamp = timestamp_utcnow_to_float() - 10 _kpi_query.end_timestamp.timestamp = timestamp_utcnow_to_float() - return _kpi_query def subs_descriptor(kpi_id): _subs_descriptor = monitoring_pb2.SubsDescriptor() - sampling_duration_s = 20 + sampling_duration_s = 10 sampling_interval_s = 3 real_start_time = timestamp_utcnow_to_float() start_timestamp = real_start_time -- GitLab From 8f705ca26d77979a4dd55ac3c58f7c88decc25f2 Mon Sep 17 00:00:00 2001 From: fjmmuro Date: Tue, 22 Nov 2022 17:57:20 +0100 Subject: [PATCH 15/20] Improve QueryKpiData RPC --- src/monitoring/tests/test_unitary.py | 64 +++++++++++++++------------- 1 file changed, 34 insertions(+), 30 deletions(-) diff --git a/src/monitoring/tests/test_unitary.py b/src/monitoring/tests/test_unitary.py index 68bd6685b..92d6a6f3f 100644 --- a/src/monitoring/tests/test_unitary.py +++ b/src/monitoring/tests/test_unitary.py @@ -182,19 +182,19 @@ def subs_scheduler(): return _scheduler -def ingestion_data(): +def ingestion_data(kpi_id_int): metrics_db = MetricsDB("localhost", "9009", "9000", "monitoring") - for i in range(200): - kpiSampleType = KpiSampleType.Name(KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED).upper().replace('KPISAMPLETYPE_', '') - kpiId = "3" - deviceId = 'DEV3' - endpointId = 'END3' - serviceId = 'SERV3' - sliceId = 'SLC3' - connectionId = 'CON3' - time_stamp = timestamp_utcnow_to_float() - kpi_value = 500*random() + for i in range(50): + kpiSampleType = KpiSampleType.Name(KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED).upper().replace('KPISAMPLETYPE_', '') + kpiId = kpi_id_int + deviceId = 'DEV'+ str(kpi_id_int) + endpointId = 'END' + str(kpi_id_int) + serviceId = 'SERV' + str(kpi_id_int) + sliceId = 'SLC' + str(kpi_id_int) + connectionId = 'CON' + str(kpi_id_int) + time_stamp = timestamp_utcnow_to_float() + kpi_value = 500*random() metrics_db.write_KPI(time_stamp, kpiId, kpiSampleType, deviceId, endpointId, serviceId, sliceId, connectionId, kpi_value) @@ -208,18 +208,17 @@ def ingestion_data(): def test_set_kpi(monitoring_client): # pylint: disable=redefined-outer-name # make call to server LOGGER.warning('test_create_kpi requesting') - response = monitoring_client.SetKpi(create_kpi_request()) - LOGGER.debug(str(response)) - response = monitoring_client.SetKpi(create_kpi_request_b()) - LOGGER.debug(str(response)) - assert isinstance(response, KpiId) + for i in range(3): + response = monitoring_client.SetKpi(create_kpi_request(str(i+1))) + LOGGER.debug(str(response)) + assert isinstance(response, KpiId) # Test case that makes use of client fixture to test server's DeleteKpi method def test_delete_kpi(monitoring_client): # pylint: disable=redefined-outer-name # make call to server LOGGER.warning('delete_kpi requesting') - response = monitoring_client.SetKpi(create_kpi_request_b()) + response = monitoring_client.SetKpi(create_kpi_request('4')) response = monitoring_client.DeleteKpi(response) LOGGER.debug(str(response)) assert isinstance(response, Empty) @@ -227,7 +226,7 @@ def test_delete_kpi(monitoring_client): # pylint: disable=redefined-outer-name # Test case that makes use of client fixture to test server's GetKpiDescriptor method def test_get_kpidescritor(monitoring_client): # pylint: disable=redefined-outer-name LOGGER.warning('test_getkpidescritor_kpi begin') - response = monitoring_client.SetKpi(create_kpi_request_c()) + response = monitoring_client.SetKpi(create_kpi_request('1')) response = monitoring_client.GetKpiDescriptor(response) LOGGER.debug(str(response)) assert isinstance(response, KpiDescriptor) @@ -243,7 +242,8 @@ def test_get_kpi_descriptor_list(monitoring_client): # pylint: disable=redefined def test_include_kpi(monitoring_client): # pylint: disable=redefined-outer-name # make call to server LOGGER.warning('test_include_kpi requesting') - kpi_id = monitoring_client.SetKpi(create_kpi_request_c()) + kpi_id = monitoring_client.SetKpi(create_kpi_request('1')) + LOGGER.debug(str(kpi_id)) response = monitoring_client.IncludeKpi(include_kpi_request(kpi_id)) LOGGER.debug(str(response)) assert isinstance(response, Empty) @@ -277,7 +277,7 @@ def test_monitor_kpi( response = device_client.AddDevice(Device(**device_with_connect_rules)) assert response.device_uuid.uuid == DEVICE_DEV1_UUID - response = monitoring_client.SetKpi(create_kpi_request()) + response = monitoring_client.SetKpi(create_kpi_request('1')) _monitor_kpi_request = monitor_kpi_request(response.kpi_id.uuid, 120, 5) # pylint: disable=maybe-no-member response = monitoring_client.MonitorKpi(_monitor_kpi_request) LOGGER.debug(str(response)) @@ -287,19 +287,23 @@ def test_monitor_kpi( def test_query_kpi_data(monitoring_client,subs_scheduler): # pylint: disable=redefined-outer-name kpi_id_list = [] - kpi_id = monitoring_client.SetKpi(create_kpi_request_c()) - subs_scheduler.add_job(ingestion_data) - kpi_id_list.append(kpi_id) + for i in range(2): + kpi_id = monitoring_client.SetKpi(create_kpi_request(str(i+1))) + subs_scheduler.add_job(ingestion_data, args=[kpi_id.kpi_id.uuid]) + kpi_id_list.append(kpi_id) LOGGER.warning('test_query_kpi_data') + sleep(5) response = monitoring_client.QueryKpiData(kpi_query(kpi_id_list)) LOGGER.debug(str(response)) assert isinstance(response, RawKpiTable) + if (subs_scheduler.state != STATE_STOPPED): + subs_scheduler.shutdown() # Test case that makes use of client fixture to test server's SetKpiSubscription method def test_set_kpi_subscription(monitoring_client,subs_scheduler): # pylint: disable=redefined-outer-name LOGGER.warning('test_set_kpi_subscription') - kpi_id = monitoring_client.SetKpi(create_kpi_request_c()) - subs_scheduler.add_job(ingestion_data) + kpi_id = monitoring_client.SetKpi(create_kpi_request('1')) + subs_scheduler.add_job(ingestion_data, args=[kpi_id.kpi_id.uuid]) response = monitoring_client.SetKpiSubscription(subs_descriptor(kpi_id)) assert isinstance(response, _MultiThreadedRendezvous) for item in response: @@ -363,9 +367,9 @@ def test_get_alarm_descriptor(monitoring_client): # Test case that makes use of client fixture to test server's GetAlarmResponseStream method def test_get_alarm_response_stream(monitoring_client,subs_scheduler): LOGGER.warning('test_get_alarm_descriptor') - _kpi_id = monitoring_client.SetKpi(create_kpi_request_c()) + _kpi_id = monitoring_client.SetKpi(create_kpi_request('3')) _alarm_id = monitoring_client.SetKpiAlarm(alarm_descriptor(_kpi_id)) - subs_scheduler.add_job(ingestion_data) + subs_scheduler.add_job(ingestion_data,args=[_kpi_id.kpi_id.uuid]) _response = monitoring_client.GetAlarmResponseStream(alarm_subscription(_alarm_id)) assert isinstance(_response, _MultiThreadedRendezvous) for item in _response: @@ -403,7 +407,7 @@ def test_get_stream_kpi(monitoring_client): # pylint: disable=redefined-outer-na def test_managementdb_tools_insert_kpi(management_db): # pylint: disable=redefined-outer-name LOGGER.warning('test_managementdb_tools_insert_kpi begin') - _create_kpi_request = create_kpi_request() + _create_kpi_request = create_kpi_request('5') kpi_description = _create_kpi_request.kpi_description # pylint: disable=maybe-no-member kpi_sample_type = _create_kpi_request.kpi_sample_type # pylint: disable=maybe-no-member kpi_device_id = _create_kpi_request.device_id.device_uuid.uuid # pylint: disable=maybe-no-member @@ -417,7 +421,7 @@ def test_managementdb_tools_insert_kpi(management_db): # pylint: disable=redefin def test_managementdb_tools_get_kpi(management_db): # pylint: disable=redefined-outer-name LOGGER.warning('test_managementdb_tools_get_kpi begin') - _create_kpi_request = create_kpi_request() + _create_kpi_request = create_kpi_request('5') kpi_description = _create_kpi_request.kpi_description # pylint: disable=maybe-no-member kpi_sample_type = _create_kpi_request.kpi_sample_type # pylint: disable=maybe-no-member kpi_device_id = _create_kpi_request.device_id.device_uuid.uuid # pylint: disable=maybe-no-member @@ -438,7 +442,7 @@ def test_managementdb_tools_get_kpis(management_db): # pylint: disable=redefined def test_managementdb_tools_delete_kpi(management_db): # pylint: disable=redefined-outer-name LOGGER.warning('test_managementdb_tools_get_kpi begin') - _create_kpi_request = create_kpi_request() + _create_kpi_request = create_kpi_request('5') kpi_description = _create_kpi_request.kpi_description # pylint: disable=maybe-no-member kpi_sample_type = _create_kpi_request.kpi_sample_type # pylint: disable=maybe-no-member kpi_device_id = _create_kpi_request.device_id.device_uuid.uuid # pylint: disable=maybe-no-member -- GitLab From f29a336ef37aba513a2d4d01ca737a09f5d230a3 Mon Sep 17 00:00:00 2001 From: fjmmuro Date: Fri, 25 Nov 2022 09:23:21 +0100 Subject: [PATCH 16/20] Add monitoring flag when a kpi is started to be monitored --- src/monitoring/service/ManagementDBTools.py | 42 ++++++++++++++++- .../service/MonitoringServiceServicerImpl.py | 9 +++- src/monitoring/tests/test_unitary.py | 47 +++++-------------- 3 files changed, 60 insertions(+), 38 deletions(-) diff --git a/src/monitoring/service/ManagementDBTools.py b/src/monitoring/service/ManagementDBTools.py index ae58ffe85..2185a3986 100644 --- a/src/monitoring/service/ManagementDBTools.py +++ b/src/monitoring/service/ManagementDBTools.py @@ -40,7 +40,8 @@ class ManagementDB(): endpoint_id INTEGER, service_id INTEGER, slice_id INTEGER, - connection_id INTEGER + connection_id INTEGER, + monitor_flag INTEGER ); """) LOGGER.debug("KPI table created in the ManagementDB") @@ -247,4 +248,41 @@ class ManagementDB(): LOGGER.debug(f"Alarms succesfully retrieved from the ManagementDB") return data except sqlite3.Error as e: - LOGGER.debug(f"Alarms cannot be retrieved from the ManagementDB: {e}") \ No newline at end of file + LOGGER.debug(f"Alarms cannot be retrieved from the ManagementDB: {e}") + + def check_monitoring_flag(self,kpi_id): + try: + c = self.client.cursor() + c.execute("SELECT monitor_flag FROM kpi WHERE kpi_id is ?",(kpi_id,)) + data=c.fetchone() + if data is None: + LOGGER.debug(f"KPI {kpi_id} does not exists") + return None + else: + if data[0] == 1: + return True + elif data[0] == 0: + return False + else: + LOGGER.debug(f"KPI {kpi_id} is wrong") + return None + except sqlite3.Error as e: + LOGGER.debug(f"KPI {kpi_id} cannot be checked from the ManagementDB: {e}") + + + def set_monitoring_flag(self,kpi_id,flag): + try: + c = self.client.cursor() + data = c.execute("SELECT * FROM kpi WHERE kpi_id is ?",(kpi_id,)).fetchone() + if data is None: + LOGGER.debug(f"KPI {kpi_id} does not exists") + return None + else: + if flag : + value = 1 + else: + value = 0 + c.execute("UPDATE kpi SET monitor_flag = ? WHERE kpi_id is ?",(value,kpi_id)) + return True + except sqlite3.Error as e: + LOGGER.debug(f"KPI {kpi_id} cannot be checked from the ManagementDB: {e}") \ No newline at end of file diff --git a/src/monitoring/service/MonitoringServiceServicerImpl.py b/src/monitoring/service/MonitoringServiceServicerImpl.py index 2d7d01e3e..c265d2c9d 100644 --- a/src/monitoring/service/MonitoringServiceServicerImpl.py +++ b/src/monitoring/service/MonitoringServiceServicerImpl.py @@ -229,8 +229,13 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): monitor_device_request.sampling_duration_s = request.monitoring_window_s monitor_device_request.sampling_interval_s = request.sampling_rate_s - device_client = DeviceClient() - device_client.MonitorDeviceKpi(monitor_device_request) + if not self.management_db.check_monitoring_flag(kpi_id): + device_client = DeviceClient() + device_client.MonitorDeviceKpi(monitor_device_request) + self.management_db.set_monitoring_flag(kpi_id,True) + self.management_db.check_monitoring_flag(kpi_id) + else: + LOGGER.warning('MonitorKpi warning: KpiID({:s}) is currently being monitored'.format(str(kpi_id))) else: LOGGER.info('MonitorKpi error: KpiID({:s}): not found in database'.format(str(kpi_id))) return response diff --git a/src/monitoring/tests/test_unitary.py b/src/monitoring/tests/test_unitary.py index 92d6a6f3f..b113f5a78 100644 --- a/src/monitoring/tests/test_unitary.py +++ b/src/monitoring/tests/test_unitary.py @@ -405,22 +405,8 @@ def test_get_stream_kpi(monitoring_client): # pylint: disable=redefined-outer-na # LOGGER.debug(response) # assert isinstance(response, Kpi) -def test_managementdb_tools_insert_kpi(management_db): # pylint: disable=redefined-outer-name - LOGGER.warning('test_managementdb_tools_insert_kpi begin') - _create_kpi_request = create_kpi_request('5') - kpi_description = _create_kpi_request.kpi_description # pylint: disable=maybe-no-member - kpi_sample_type = _create_kpi_request.kpi_sample_type # pylint: disable=maybe-no-member - kpi_device_id = _create_kpi_request.device_id.device_uuid.uuid # pylint: disable=maybe-no-member - kpi_endpoint_id = _create_kpi_request.endpoint_id.endpoint_uuid.uuid # pylint: disable=maybe-no-member - kpi_service_id = _create_kpi_request.service_id.service_uuid.uuid # pylint: disable=maybe-no-member - kpi_slice_id = _create_kpi_request.slice_id.slice_uuid.uuid - kpi_connection_id = _create_kpi_request.connection_id.connection_uuid.uuid - - response = management_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id,kpi_slice_id,kpi_connection_id) - assert isinstance(response, int) - -def test_managementdb_tools_get_kpi(management_db): # pylint: disable=redefined-outer-name - LOGGER.warning('test_managementdb_tools_get_kpi begin') +def test_managementdb_tools_kpis(management_db): # pylint: disable=redefined-outer-name + LOGGER.warning('test_managementdb_tools_kpis begin') _create_kpi_request = create_kpi_request('5') kpi_description = _create_kpi_request.kpi_description # pylint: disable=maybe-no-member kpi_sample_type = _create_kpi_request.kpi_sample_type # pylint: disable=maybe-no-member @@ -431,33 +417,26 @@ def test_managementdb_tools_get_kpi(management_db): # pylint: disable=redefined- kpi_connection_id = _create_kpi_request.connection_id.connection_uuid.uuid _kpi_id = management_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id,kpi_slice_id,kpi_connection_id) + assert isinstance(_kpi_id, int) + response = management_db.get_KPI(_kpi_id) assert isinstance(response, tuple) -def test_managementdb_tools_get_kpis(management_db): # pylint: disable=redefined-outer-name - LOGGER.warning('test_managementdb_tools_get_kpis begin') + response = management_db.set_monitoring_flag(_kpi_id,True) + assert response is True + response = management_db.check_monitoring_flag(_kpi_id) + assert response is True + management_db.set_monitoring_flag(_kpi_id, False) + response = management_db.check_monitoring_flag(_kpi_id) + assert response is False + response = management_db.get_KPIS() assert isinstance(response, list) -def test_managementdb_tools_delete_kpi(management_db): # pylint: disable=redefined-outer-name - LOGGER.warning('test_managementdb_tools_get_kpi begin') - - _create_kpi_request = create_kpi_request('5') - kpi_description = _create_kpi_request.kpi_description # pylint: disable=maybe-no-member - kpi_sample_type = _create_kpi_request.kpi_sample_type # pylint: disable=maybe-no-member - kpi_device_id = _create_kpi_request.device_id.device_uuid.uuid # pylint: disable=maybe-no-member - kpi_endpoint_id = _create_kpi_request.endpoint_id.endpoint_uuid.uuid # pylint: disable=maybe-no-member - kpi_service_id = _create_kpi_request.service_id.service_uuid.uuid # pylint: disable=maybe-no-member - kpi_slice_id = _create_kpi_request.slice_id.slice_uuid.uuid - kpi_connection_id = _create_kpi_request.connection_id.connection_uuid.uuid - - _kpi_id = management_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, - kpi_service_id, kpi_slice_id, kpi_connection_id) - response = management_db.delete_KPI(_kpi_id) - assert response + def test_managementdb_tools_insert_alarm(management_db): LOGGER.warning('test_managementdb_tools_insert_alarm begin') -- GitLab From 8bb76aafbaf26d2bb56758267852edf81ac6be24 Mon Sep 17 00:00:00 2001 From: fjmmuro Date: Fri, 25 Nov 2022 09:59:30 +0100 Subject: [PATCH 17/20] Minor import issue --- src/monitoring/service/MetricsDBTools.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/monitoring/service/MetricsDBTools.py b/src/monitoring/service/MetricsDBTools.py index 5848ce545..1d3888d53 100644 --- a/src/monitoring/service/MetricsDBTools.py +++ b/src/monitoring/service/MetricsDBTools.py @@ -23,7 +23,7 @@ import datetime from common.tools.timestamp.Converters import timestamp_float_to_string, timestamp_utcnow_to_float import psycopg2 -from monitoring.service.MonitoringServiceServicerImpl import LOGGER +LOGGER = logging.getLogger(__name__) class MetricsDB(): -- GitLab From 0d16676a1228f073bd16e658c83a808681d4b904 Mon Sep 17 00:00:00 2001 From: fjmmuro Date: Mon, 28 Nov 2022 13:58:10 +0100 Subject: [PATCH 18/20] Fix issue in GetInstantKpi --- .../service/MonitoringServiceServicerImpl.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/monitoring/service/MonitoringServiceServicerImpl.py b/src/monitoring/service/MonitoringServiceServicerImpl.py index c265d2c9d..20a536b4a 100644 --- a/src/monitoring/service/MonitoringServiceServicerImpl.py +++ b/src/monitoring/service/MonitoringServiceServicerImpl.py @@ -604,10 +604,13 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): f"LATEST ON timestamp PARTITION BY kpi_id" data = self.metrics_db.run_query(query)[0] LOGGER.debug(data) - - response.kpi_id.kpi_id.uuid = str(data[0]) - response.timestamp.timestamp = timestamp_string_to_float(data[1]) - response.kpi_value.floatVal = data[2] # This must be improved + if len(data) == 0: + response.kpi_id.kpi_id.uuid = request.kpi_id.uuid + else: + data = data[0] + response.kpi_id.kpi_id.uuid = str(data[0]) + response.timestamp.timestamp = timestamp_string_to_float(data[1]) + response.kpi_value.floatVal = data[2] return response except ServiceException as e: -- GitLab From 4fc4317dfe0d8c8fcf7e63189dcd6fa675bd0e86 Mon Sep 17 00:00:00 2001 From: gifrerenom Date: Mon, 28 Nov 2022 14:14:02 +0000 Subject: [PATCH 19/20] Proto Monitoring cleanup --- proto/kpi_sample_types.proto | 4 ++-- proto/monitoring.proto | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/proto/kpi_sample_types.proto b/proto/kpi_sample_types.proto index 3494d9849..4419a8df4 100644 --- a/proto/kpi_sample_types.proto +++ b/proto/kpi_sample_types.proto @@ -23,8 +23,8 @@ enum KpiSampleType { KPISAMPLETYPE_BYTES_TRANSMITTED = 201; KPISAMPLETYPE_BYTES_RECEIVED = 202; KPISAMPLETYPE_BYTES_DROPPED = 203; - KPISAMPLETYPE_ML_CONFIDENCE = 401; //. can be used by both optical and L3 without any issue - KPISAMPLETYPE_OPTICAL_SECURITY_STATUS = 501; //. can be used by both optical and L3 without any issue + KPISAMPLETYPE_ML_CONFIDENCE = 401; //. can be used by both optical and L3 without any issue + KPISAMPLETYPE_OPTICAL_SECURITY_STATUS = 501; //. can be used by both optical and L3 without any issue KPISAMPLETYPE_L3_UNIQUE_ATTACK_CONNS = 601; KPISAMPLETYPE_L3_TOTAL_DROPPED_PACKTS = 602; KPISAMPLETYPE_L3_UNIQUE_ATTACKERS = 603; diff --git a/proto/monitoring.proto b/proto/monitoring.proto index 027dcb022..f9c408c96 100644 --- a/proto/monitoring.proto +++ b/proto/monitoring.proto @@ -36,7 +36,7 @@ service MonitoringService { rpc GetAlarmResponseStream(AlarmSubscription ) returns (stream AlarmResponse) {} // Not Stable not final rpc DeleteAlarm (AlarmID ) returns (context.Empty ) {} // Stable and final rpc GetStreamKpi (KpiId ) returns (stream Kpi ) {} // Stable not final - rpc GetInstantKpi (KpiId ) returns (Kpi ) {} // Stable not final + rpc GetInstantKpi (KpiId ) returns (Kpi ) {} // Stable not final } message KpiDescriptor { @@ -59,7 +59,7 @@ message MonitorKpiRequest { } message KpiQuery { - repeated KpiId kpi_ids = 1; + repeated KpiId kpi_ids = 1; float monitoring_window_s = 2; uint32 last_n_samples = 3; // used when you want something like "get the last N many samples context.Timestamp start_timestamp = 4; // used when you want something like "get the samples since X date/time" -- GitLab From 9e82e3c77f18f06cfe90e3758fe999224188c129 Mon Sep 17 00:00:00 2001 From: gifrerenom Date: Mon, 28 Nov 2022 14:49:05 +0000 Subject: [PATCH 20/20] Monitoring component: - pre-merge fixes --- src/monitoring/service/AlarmManager.py | 2 +- src/monitoring/service/MonitoringServiceServicerImpl.py | 6 +++--- src/monitoring/service/SubscriptionManager.py | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/monitoring/service/AlarmManager.py b/src/monitoring/service/AlarmManager.py index d80d815fe..873a65d2c 100644 --- a/src/monitoring/service/AlarmManager.py +++ b/src/monitoring/service/AlarmManager.py @@ -29,7 +29,7 @@ class AlarmManager(): trigger='interval', seconds=(subscription_frequency_ms/1000), start_date=start_date, end_date=end_date,timezone=pytz.utc, id=str(alarm_id)) LOGGER.debug(f"Alarm job {alarm_id} succesfully created") - job.remove() + #job.remove() def delete_alarm(self, alarm_id): try: diff --git a/src/monitoring/service/MonitoringServiceServicerImpl.py b/src/monitoring/service/MonitoringServiceServicerImpl.py index 20a536b4a..6e927476b 100644 --- a/src/monitoring/service/MonitoringServiceServicerImpl.py +++ b/src/monitoring/service/MonitoringServiceServicerImpl.py @@ -89,7 +89,7 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): kpi_connection_id = request.connection_id.connection_uuid.uuid - if request.kpi_id.kpi_id.uuid is not "": + if request.kpi_id.kpi_id.uuid != "": response.kpi_id.uuid = request.kpi_id.kpi_id.uuid # Here the code to modify an existing kpi else: @@ -424,7 +424,7 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): LOGGER.debug(f"request.AlarmID: {request.alarm_id.alarm_id.uuid}") - if request.alarm_id.alarm_id.uuid is not "": + if request.alarm_id.alarm_id.uuid != "": alarm_id = request.alarm_id.alarm_id.uuid # Here the code to modify an existing alarm else: @@ -596,7 +596,7 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): try: kpi_id = request.kpi_id.uuid response = Kpi() - if kpi_id is "": + if kpi_id == "": LOGGER.info('GetInstantKpi error: KpiID({:s}): not found in database'.format(str(kpi_id))) response.kpi_id.kpi_id.uuid = "NoID" else: diff --git a/src/monitoring/service/SubscriptionManager.py b/src/monitoring/service/SubscriptionManager.py index 6ff922c52..3d1da36b7 100644 --- a/src/monitoring/service/SubscriptionManager.py +++ b/src/monitoring/service/SubscriptionManager.py @@ -46,7 +46,7 @@ class SubscriptionManager(): trigger='interval', seconds=sampling_interval_s, start_date=start_date, end_date=end_date, timezone=pytz.utc, id=str(subscription_id)) LOGGER.debug(f"Subscrition job {subscription_id} succesfully created") - job.remove() + #job.remove() def delete_subscription(self, subscription_id): self.scheduler.remove_job(subscription_id) -- GitLab