diff --git a/proto/kpi_sample_types.proto b/proto/kpi_sample_types.proto index 7445a0f25a57df9793bd8761da024581988cf9e6..4419a8df4a22047d8708c5cf2e2c3657148b5eeb 100644 --- a/proto/kpi_sample_types.proto +++ b/proto/kpi_sample_types.proto @@ -16,9 +16,19 @@ syntax = "proto3"; package kpi_sample_types; enum KpiSampleType { - KPISAMPLETYPE_UNKNOWN = 0; - KPISAMPLETYPE_PACKETS_TRANSMITTED = 101; - KPISAMPLETYPE_PACKETS_RECEIVED = 102; - KPISAMPLETYPE_BYTES_TRANSMITTED = 201; - KPISAMPLETYPE_BYTES_RECEIVED = 202; + KPISAMPLETYPE_UNKNOWN = 0; + KPISAMPLETYPE_PACKETS_TRANSMITTED = 101; + KPISAMPLETYPE_PACKETS_RECEIVED = 102; + KPISAMPLETYPE_PACKETS_DROPPED = 103; + KPISAMPLETYPE_BYTES_TRANSMITTED = 201; + KPISAMPLETYPE_BYTES_RECEIVED = 202; + KPISAMPLETYPE_BYTES_DROPPED = 203; + KPISAMPLETYPE_ML_CONFIDENCE = 401; //. can be used by both optical and L3 without any issue + KPISAMPLETYPE_OPTICAL_SECURITY_STATUS = 501; //. can be used by both optical and L3 without any issue + KPISAMPLETYPE_L3_UNIQUE_ATTACK_CONNS = 601; + KPISAMPLETYPE_L3_TOTAL_DROPPED_PACKTS = 602; + KPISAMPLETYPE_L3_UNIQUE_ATTACKERS = 603; + KPISAMPLETYPE_L3_UNIQUE_COMPROMISED_CLIENTS = 604; + KPISAMPLETYPE_L3_SECURITY_STATUS_CRYPTO = 605; + KPISAMPLETYPE_SERVICE_LATENCY_MS = 701; } diff --git a/proto/monitoring.proto b/proto/monitoring.proto index 9be39db909d915b2a9b5d99b01841db028959543..f9c408c96ced121f35cc1116bf64d013e7320e6a 100644 --- a/proto/monitoring.proto +++ b/proto/monitoring.proto @@ -25,7 +25,7 @@ service MonitoringService { rpc GetKpiDescriptorList (context.Empty ) returns (KpiDescriptorList ) {} // Stable and final rpc IncludeKpi (Kpi ) returns (context.Empty ) {} // Stable and final rpc MonitorKpi (MonitorKpiRequest ) returns (context.Empty ) {} // Stable and final - rpc QueryKpiData (KpiQuery ) returns (KpiList ) {} // Not implemented + rpc QueryKpiData (KpiQuery ) returns (RawKpiTable ) {} // Not implemented rpc SetKpiSubscription (SubsDescriptor ) returns (stream SubsResponse ) {} // Stable not final rpc GetSubsDescriptor (SubscriptionID ) returns (SubsDescriptor ) {} // Stable and final rpc GetSubscriptions (context.Empty ) returns (SubsList ) {} // Stable and final @@ -36,7 +36,7 @@ service MonitoringService { rpc GetAlarmResponseStream(AlarmSubscription ) returns (stream AlarmResponse) {} // Not Stable not final rpc DeleteAlarm (AlarmID ) returns (context.Empty ) {} // Stable and final rpc GetStreamKpi (KpiId ) returns (stream Kpi ) {} // Stable not final - rpc GetInstantKpi (KpiId ) returns (Kpi ) {} // Stable not final + rpc GetInstantKpi (KpiId ) returns (Kpi ) {} // Stable not final } message KpiDescriptor { @@ -48,6 +48,7 @@ message KpiDescriptor { context.EndPointId endpoint_id = 6; context.ServiceId service_id = 7; context.SliceId slice_id = 8; + context.ConnectionId connection_id = 9; } message MonitorKpiRequest { @@ -58,13 +59,26 @@ message MonitorKpiRequest { } message KpiQuery { - KpiId kpi_id = 1; + repeated KpiId kpi_ids = 1; float monitoring_window_s = 2; - float sampling_rate_s = 3; - uint32 last_n_samples = 4; // used when you want something like "get the last N many samples - context.Timestamp start_timestamp = 5; // used when you want something like "get the samples since X date/time" - context.Timestamp end_timestamp = 6; // used when you want something like "get the samples until X date/time" - // Pending add field to reflect Available Device Protocols + uint32 last_n_samples = 3; // used when you want something like "get the last N many samples + context.Timestamp start_timestamp = 4; // used when you want something like "get the samples since X date/time" + context.Timestamp end_timestamp = 5; // used when you want something like "get the samples until X date/time" +} + + +message RawKpi { // cell + context.Timestamp timestamp = 1; + KpiValue kpi_value = 2; +} + +message RawKpiList { // column + KpiId kpi_id = 1; + repeated RawKpi raw_kpis = 2; +} + +message RawKpiTable { // table + repeated RawKpiList raw_kpi_lists = 1; } message KpiId { diff --git a/src/monitoring/client/MonitoringClient.py b/src/monitoring/client/MonitoringClient.py index 73607a081cd57e7c62b9c4e2c5e487868e72d189..5641b9cf3236c5fecfa5c6efe3a03b899c342ea5 100644 --- a/src/monitoring/client/MonitoringClient.py +++ b/src/monitoring/client/MonitoringClient.py @@ -22,7 +22,7 @@ from common.tools.grpc.Tools import grpc_message_to_json_string from common.proto.context_pb2 import Empty from common.proto.monitoring_pb2 import Kpi, KpiDescriptor, KpiId, MonitorKpiRequest, \ KpiDescriptorList, KpiQuery, KpiList, SubsDescriptor, SubscriptionID, SubsList, \ - SubsResponse, AlarmDescriptor, AlarmID, AlarmList, AlarmResponse, AlarmSubscription + SubsResponse, AlarmDescriptor, AlarmID, AlarmList, AlarmResponse, AlarmSubscription, RawKpiTable from common.proto.monitoring_pb2_grpc import MonitoringServiceStub LOGGER = logging.getLogger(__name__) @@ -93,7 +93,7 @@ class MonitoringClient: return response @RETRY_DECORATOR - def QueryKpiData(self, request : KpiQuery) -> KpiList: + def QueryKpiData(self, request : KpiQuery) -> RawKpiTable: LOGGER.debug('QueryKpiData: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.QueryKpiData(request) LOGGER.debug('QueryKpiData result: {:s}'.format(grpc_message_to_json_string(response))) diff --git a/src/monitoring/service/AlarmManager.py b/src/monitoring/service/AlarmManager.py index e5ac8915c3728c7894dc70ab901215dd5a7feb41..873a65d2c8041e6378f84d979bb1fd98d4d61d6b 100644 --- a/src/monitoring/service/AlarmManager.py +++ b/src/monitoring/service/AlarmManager.py @@ -1,3 +1,4 @@ +import pytz from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.executors.pool import ProcessPoolExecutor from apscheduler.jobstores.base import JobLookupError @@ -19,10 +20,16 @@ class AlarmManager(): end_date=None if subscription_timeout_s: start_timestamp=time.time() - start_date=datetime.fromtimestamp(start_timestamp) - end_date=datetime.fromtimestamp(start_timestamp+subscription_timeout_s) - self.scheduler.add_job(self.metrics_db.get_alarm_data, args=(alarm_queue,kpi_id, kpiMinValue, kpiMaxValue, inRange, includeMinValue, includeMaxValue, subscription_frequency_ms),trigger='interval', seconds=(subscription_frequency_ms/1000), start_date=start_date, end_date=end_date, id=alarm_id) + end_timestamp = start_timestamp + subscription_timeout_s + start_date = datetime.utcfromtimestamp(start_timestamp).isoformat() + end_date = datetime.utcfromtimestamp(end_timestamp).isoformat() + + job = self.scheduler.add_job(self.metrics_db.get_alarm_data, + args=(alarm_queue,kpi_id, kpiMinValue, kpiMaxValue, inRange, includeMinValue, includeMaxValue, subscription_frequency_ms), + trigger='interval', seconds=(subscription_frequency_ms/1000), start_date=start_date, + end_date=end_date,timezone=pytz.utc, id=str(alarm_id)) LOGGER.debug(f"Alarm job {alarm_id} succesfully created") + #job.remove() def delete_alarm(self, alarm_id): try: diff --git a/src/monitoring/service/ManagementDBTools.py b/src/monitoring/service/ManagementDBTools.py index 2387ddde0ab9eecea6c8fc982ba97a94f1a88c98..2185a3986532ad1b8e629cdcdb66079f23995c8f 100644 --- a/src/monitoring/service/ManagementDBTools.py +++ b/src/monitoring/service/ManagementDBTools.py @@ -38,7 +38,10 @@ class ManagementDB(): kpi_sample_type INTEGER, device_id INTEGER, endpoint_id INTEGER, - service_id INTEGER + service_id INTEGER, + slice_id INTEGER, + connection_id INTEGER, + monitor_flag INTEGER ); """) LOGGER.debug("KPI table created in the ManagementDB") @@ -84,13 +87,13 @@ class ManagementDB(): LOGGER.debug(f"Alarm table cannot be created in the ManagementDB. {e}") raise Exception - def insert_KPI(self,kpi_description,kpi_sample_type,device_id,endpoint_id,service_id): + def insert_KPI(self,kpi_description,kpi_sample_type,device_id,endpoint_id,service_id,slice_id,connection_id): try: c = self.client.cursor() - c.execute("SELECT kpi_id FROM kpi WHERE device_id is ? AND kpi_sample_type is ? AND endpoint_id is ? AND service_id is ?",(device_id,kpi_sample_type,endpoint_id,service_id)) + c.execute("SELECT kpi_id FROM kpi WHERE device_id is ? AND kpi_sample_type is ? AND endpoint_id is ? AND service_id is ? AND slice_id is ? AND connection_id is ?",(device_id,kpi_sample_type,endpoint_id,service_id,slice_id,connection_id)) data=c.fetchone() if data is None: - c.execute("INSERT INTO kpi (kpi_description,kpi_sample_type,device_id,endpoint_id,service_id) VALUES (?,?,?,?,?)", (kpi_description,kpi_sample_type,device_id,endpoint_id,service_id)) + c.execute("INSERT INTO kpi (kpi_description,kpi_sample_type,device_id,endpoint_id,service_id,slice_id,connection_id) VALUES (?,?,?,?,?,?,?)", (kpi_description,kpi_sample_type,device_id,endpoint_id,service_id,slice_id,connection_id)) self.client.commit() kpi_id = c.lastrowid LOGGER.debug(f"KPI {kpi_id} succesfully inserted in the ManagementDB") @@ -245,4 +248,41 @@ class ManagementDB(): LOGGER.debug(f"Alarms succesfully retrieved from the ManagementDB") return data except sqlite3.Error as e: - LOGGER.debug(f"Alarms cannot be retrieved from the ManagementDB: {e}") \ No newline at end of file + LOGGER.debug(f"Alarms cannot be retrieved from the ManagementDB: {e}") + + def check_monitoring_flag(self,kpi_id): + try: + c = self.client.cursor() + c.execute("SELECT monitor_flag FROM kpi WHERE kpi_id is ?",(kpi_id,)) + data=c.fetchone() + if data is None: + LOGGER.debug(f"KPI {kpi_id} does not exists") + return None + else: + if data[0] == 1: + return True + elif data[0] == 0: + return False + else: + LOGGER.debug(f"KPI {kpi_id} is wrong") + return None + except sqlite3.Error as e: + LOGGER.debug(f"KPI {kpi_id} cannot be checked from the ManagementDB: {e}") + + + def set_monitoring_flag(self,kpi_id,flag): + try: + c = self.client.cursor() + data = c.execute("SELECT * FROM kpi WHERE kpi_id is ?",(kpi_id,)).fetchone() + if data is None: + LOGGER.debug(f"KPI {kpi_id} does not exists") + return None + else: + if flag : + value = 1 + else: + value = 0 + c.execute("UPDATE kpi SET monitor_flag = ? WHERE kpi_id is ?",(value,kpi_id)) + return True + except sqlite3.Error as e: + LOGGER.debug(f"KPI {kpi_id} cannot be checked from the ManagementDB: {e}") \ No newline at end of file diff --git a/src/monitoring/service/MetricsDBTools.py b/src/monitoring/service/MetricsDBTools.py index 16e6373f542656b4c172c8d619bf3f17ca5df404..1d3888d5348bdbe2995f077310ca448827290382 100644 --- a/src/monitoring/service/MetricsDBTools.py +++ b/src/monitoring/service/MetricsDBTools.py @@ -87,6 +87,8 @@ class MetricsDB(): 'device_id SYMBOL,' \ 'endpoint_id SYMBOL,' \ 'service_id SYMBOL,' \ + 'slice_id SYMBOL,' \ + 'connection_id SYMBOL,' \ 'timestamp TIMESTAMP,' \ 'kpi_value DOUBLE)' \ 'TIMESTAMP(timestamp);' @@ -97,7 +99,7 @@ class MetricsDB(): LOGGER.debug(f"Table {self.table} cannot be created. {e}") raise Exception - def write_KPI(self, time, kpi_id, kpi_sample_type, device_id, endpoint_id, service_id, kpi_value): + def write_KPI(self, time, kpi_id, kpi_sample_type, device_id, endpoint_id, service_id, slice_id, connection_id, kpi_value): counter = 0 while (counter < self.retries): try: @@ -109,7 +111,9 @@ class MetricsDB(): 'kpi_sample_type': kpi_sample_type, 'device_id': device_id, 'endpoint_id': endpoint_id, - 'service_id': service_id}, + 'service_id': service_id, + 'slice_id': slice_id, + 'connection_id': connection_id,}, columns={ 'kpi_value': kpi_value}, at=datetime.datetime.fromtimestamp(time)) @@ -170,11 +174,54 @@ class MetricsDB(): if connection: connection.close() + def get_raw_kpi_list(self, kpi_id, monitoring_window_s, last_n_samples, start_timestamp, end_timestamp): + try: + query_root = f"SELECT timestamp, kpi_value FROM {self.table} WHERE kpi_id = '{kpi_id}' " + query = query_root + start_date = float() + end_date = float() + if last_n_samples: + query = query + f"ORDER BY timestamp DESC limit {last_n_samples}" + elif monitoring_window_s or start_timestamp or end_timestamp: + if start_timestamp and end_timestamp: + start_date = start_timestamp + end_date = end_timestamp + elif monitoring_window_s: + if start_timestamp and not end_timestamp: + start_date = start_timestamp + end_date = start_date + monitoring_window_s + elif end_timestamp and not start_timestamp: + end_date = end_timestamp + start_date = end_date - monitoring_window_s + elif not start_timestamp and not end_timestamp: + end_date = timestamp_utcnow_to_float() + start_date = end_date - monitoring_window_s + query = query + f"AND (timestamp BETWEEN '{timestamp_float_to_string(start_date)}' AND '{timestamp_float_to_string(end_date)}')" + else: + LOGGER.debug(f"Wrong parameters settings") + + LOGGER.debug(query) + + if self.postgre: + kpi_list = self.run_query_postgre(query) + LOGGER.debug(f"kpi_list postgre: {kpi_list}") + else: + kpi_list = self.run_query(query) + LOGGER.debug(f"kpi_list influx: {kpi_list}") + if kpi_list: + LOGGER.debug(f"New data received for subscription to KPI {kpi_id}") + return kpi_list + else: + LOGGER.debug(f"No new data for the subscription to KPI {kpi_id}") + except (Exception) as e: + LOGGER.debug(f"Subscription data cannot be retrieved. {e}") + def get_subscription_data(self,subs_queue, kpi_id, sampling_interval_s=1): try: end_date = timestamp_utcnow_to_float() - self.commit_lag_ms / 1000 start_date = end_date - sampling_interval_s query = f"SELECT kpi_id, timestamp, kpi_value FROM {self.table} WHERE kpi_id = '{kpi_id}' AND (timestamp BETWEEN '{timestamp_float_to_string(start_date)}' AND '{timestamp_float_to_string(end_date)}')" + LOGGER.debug(query) if self.postgre: kpi_list = self.run_query_postgre(query) LOGGER.debug(f"kpi_list postgre: {kpi_list}") @@ -201,6 +248,8 @@ class MetricsDB(): kpi_list = self.run_query(query) if kpi_list: LOGGER.debug(f"New data received for alarm of KPI {kpi_id}") + LOGGER.info(kpi_list) + valid_kpi_list = [] for kpi in kpi_list: alarm = False kpi_value = kpi[2] @@ -263,10 +312,10 @@ class MetricsDB(): if (kpi_value >= kpiMaxValue): alarm = True if alarm: - # queue.append[kpi] - alarm_queue.put_nowait(kpi) - LOGGER.debug(f"Alarm of KPI {kpi_id} triggered -> kpi_value:{kpi[2]}, timestamp:{kpi[1]}") - else: - LOGGER.debug(f"No new data for the alarm of KPI {kpi_id}") + valid_kpi_list.append(kpi) + alarm_queue.put_nowait(valid_kpi_list) + LOGGER.debug(f"Alarm of KPI {kpi_id} triggered -> kpi_value:{kpi[2]}, timestamp:{kpi[1]}") + else: + LOGGER.debug(f"No new data for the alarm of KPI {kpi_id}") except (Exception) as e: LOGGER.debug(f"Alarm data cannot be retrieved. {e}") \ No newline at end of file diff --git a/src/monitoring/service/MonitoringServiceServicerImpl.py b/src/monitoring/service/MonitoringServiceServicerImpl.py index 7cd47f187986a0c32eea2ac8405183ac4418d100..6e927476bd4c3e9f61068efef06f77568ecc0961 100644 --- a/src/monitoring/service/MonitoringServiceServicerImpl.py +++ b/src/monitoring/service/MonitoringServiceServicerImpl.py @@ -26,9 +26,9 @@ from common.proto.kpi_sample_types_pb2 import KpiSampleType from common.proto.monitoring_pb2_grpc import MonitoringServiceServicer from common.proto.monitoring_pb2 import AlarmResponse, AlarmDescriptor, AlarmList, SubsList, KpiId, \ KpiDescriptor, KpiList, KpiQuery, SubsDescriptor, SubscriptionID, AlarmID, KpiDescriptorList, \ - MonitorKpiRequest, Kpi, AlarmSubscription, SubsResponse + MonitorKpiRequest, Kpi, AlarmSubscription, SubsResponse, RawKpiTable, RawKpi, RawKpiList from common.rpc_method_wrapper.ServiceExceptions import ServiceException -from common.tools.timestamp.Converters import timestamp_string_to_float +from common.tools.timestamp.Converters import timestamp_string_to_float, timestamp_utcnow_to_float from monitoring.service import ManagementDBTools, MetricsDBTools from device.client.DeviceClient import DeviceClient @@ -85,13 +85,16 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): kpi_device_id = request.device_id.device_uuid.uuid kpi_endpoint_id = request.endpoint_id.endpoint_uuid.uuid kpi_service_id = request.service_id.service_uuid.uuid + kpi_slice_id = request.slice_id.slice_uuid.uuid + kpi_connection_id = request.connection_id.connection_uuid.uuid - if request.kpi_id.kpi_id.uuid is not "": + + if request.kpi_id.kpi_id.uuid != "": response.kpi_id.uuid = request.kpi_id.kpi_id.uuid # Here the code to modify an existing kpi else: data = self.management_db.insert_KPI( - kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id) + kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id, kpi_slice_id, kpi_connection_id) response.kpi_id.uuid = str(data) return response @@ -131,11 +134,13 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): if kpi_db is None: LOGGER.info('GetKpiDescriptor error: KpiID({:s}): not found in database'.format(str(kpi_id))) else: - kpiDescriptor.kpi_description = kpi_db[1] - kpiDescriptor.kpi_sample_type = kpi_db[2] - kpiDescriptor.device_id.device_uuid.uuid = str(kpi_db[3]) - kpiDescriptor.endpoint_id.endpoint_uuid.uuid = str(kpi_db[4]) - kpiDescriptor.service_id.service_uuid.uuid = str(kpi_db[5]) + kpiDescriptor.kpi_description = kpi_db[1] + kpiDescriptor.kpi_sample_type = kpi_db[2] + kpiDescriptor.device_id.device_uuid.uuid = str(kpi_db[3]) + kpiDescriptor.endpoint_id.endpoint_uuid.uuid = str(kpi_db[4]) + kpiDescriptor.service_id.service_uuid.uuid = str(kpi_db[5]) + kpiDescriptor.slice_id.slice_uuid.uuid = str(kpi_db[6]) + kpiDescriptor.connection_id.connection_uuid.uuid = str(kpi_db[7]) return kpiDescriptor except ServiceException as e: LOGGER.exception('GetKpiDescriptor exception') @@ -154,12 +159,14 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): for item in data: kpi_descriptor = KpiDescriptor() - kpi_descriptor.kpi_id.kpi_id.uuid = str(item[0]) - kpi_descriptor.kpi_description = item[1] - kpi_descriptor.kpi_sample_type = item[2] - kpi_descriptor.device_id.device_uuid.uuid = str(item[3]) - kpi_descriptor.endpoint_id.endpoint_uuid.uuid = str(item[4]) - kpi_descriptor.service_id.service_uuid.uuid = str(item[5]) + kpi_descriptor.kpi_id.kpi_id.uuid = str(item[0]) + kpi_descriptor.kpi_description = item[1] + kpi_descriptor.kpi_sample_type = item[2] + kpi_descriptor.device_id.device_uuid.uuid = str(item[3]) + kpi_descriptor.endpoint_id.endpoint_uuid.uuid = str(item[4]) + kpi_descriptor.service_id.service_uuid.uuid = str(item[5]) + kpi_descriptor.slice_id.slice_uuid.uuid = str(item[6]) + kpi_descriptor.connection_id.connection_uuid.uuid = str(item[7]) kpi_descriptor_list.kpi_descriptor_list.append(kpi_descriptor) @@ -186,11 +193,13 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): deviceId = kpiDescriptor.device_id.device_uuid.uuid endpointId = kpiDescriptor.endpoint_id.endpoint_uuid.uuid serviceId = kpiDescriptor.service_id.service_uuid.uuid + sliceId = kpiDescriptor.slice_id.slice_uuid.uuid + connectionId = kpiDescriptor.connection_id.connection_uuid.uuid time_stamp = request.timestamp.timestamp kpi_value = getattr(request.kpi_value, request.kpi_value.WhichOneof('value')) # Build the structure to be included as point in the MetricsDB - self.metrics_db.write_KPI(time_stamp, kpiId, kpiSampleType, deviceId, endpointId, serviceId, kpi_value) + self.metrics_db.write_KPI(time_stamp, kpiId, kpiSampleType, deviceId, endpointId, serviceId, sliceId, connectionId, kpi_value) return Empty() except ServiceException as e: @@ -220,8 +229,13 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): monitor_device_request.sampling_duration_s = request.monitoring_window_s monitor_device_request.sampling_interval_s = request.sampling_rate_s - device_client = DeviceClient() - device_client.MonitorDeviceKpi(monitor_device_request) + if not self.management_db.check_monitoring_flag(kpi_id): + device_client = DeviceClient() + device_client.MonitorDeviceKpi(monitor_device_request) + self.management_db.set_monitoring_flag(kpi_id,True) + self.management_db.check_monitoring_flag(kpi_id) + else: + LOGGER.warning('MonitorKpi warning: KpiID({:s}) is currently being monitored'.format(str(kpi_id))) else: LOGGER.info('MonitorKpi error: KpiID({:s}): not found in database'.format(str(kpi_id))) return response @@ -234,12 +248,48 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): grpc_context.abort(grpc.StatusCode.INTERNAL, str(e)) # CREATEKPI_COUNTER_FAILED.inc() - def QueryKpiData(self, request: KpiQuery, grpc_context: grpc.ServicerContext) -> KpiList: + def QueryKpiData(self, request: KpiQuery, grpc_context: grpc.ServicerContext) -> RawKpiTable: LOGGER.info('QueryKpiData') try: - # TBC - return KpiList() + raw_kpi_table = RawKpiTable() + + LOGGER.debug(str(request)) + + kpi_id_list = request.kpi_ids + monitoring_window_s = request.monitoring_window_s + last_n_samples = request.last_n_samples + start_timestamp = request.start_timestamp.timestamp + end_timestamp = request.end_timestamp.timestamp + + # Check if all the Kpi_ids exist + for item in kpi_id_list: + kpi_id = item.kpi_id.uuid + + kpiDescriptor = self.GetKpiDescriptor(item, grpc_context) + if kpiDescriptor is None: + LOGGER.info('QueryKpiData error: KpiID({:s}): not found in database'.format(str(kpi_id))) + break + else: + # Execute query per Kpi_id and introduce their kpi_list in the table + kpi_list = self.metrics_db.get_raw_kpi_list(kpi_id,monitoring_window_s,last_n_samples,start_timestamp,end_timestamp) + raw_kpi_list = RawKpiList() + raw_kpi_list.kpi_id.kpi_id.uuid = kpi_id + + LOGGER.debug(str(kpi_list)) + + if kpi_list is None: + LOGGER.info('QueryKpiData error: KpiID({:s}): points not found in metrics database'.format(str(kpi_id))) + else: + for item in kpi_list: + raw_kpi = RawKpi() + raw_kpi.timestamp.timestamp = timestamp_string_to_float(item[0]) + raw_kpi.kpi_value.floatVal = item[1] + raw_kpi_list.raw_kpis.append(raw_kpi) + + raw_kpi_table.raw_kpi_lists.append(raw_kpi_list) + + return raw_kpi_table except ServiceException as e: LOGGER.exception('QueryKpiData exception') grpc_context.abort(e.code, e.details) @@ -250,9 +300,7 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): LOGGER.info('SubscribeKpi') try: - subs_queue = Queue() - subs_response = SubsResponse() kpi_id = request.kpi_id.kpi_id.uuid sampling_duration_s = request.sampling_duration_s @@ -268,18 +316,21 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): start_timestamp, end_timestamp) # parse queue to append kpis into the list - while not subs_queue.empty(): - list = subs_queue.get_nowait() - for item in list: - kpi = Kpi() - kpi.kpi_id.kpi_id.uuid = str(item[0]) - kpi.timestamp.timestamp = timestamp_string_to_float(item[1]) - kpi.kpi_value.floatVal = item[2] # This must be improved - subs_response.kpi_list.kpi.append(kpi) - - subs_response.subs_id.subs_id.uuid = str(subs_id) - - yield subs_response + while True: + while not subs_queue.empty(): + subs_response = SubsResponse() + list = subs_queue.get_nowait() + for item in list: + kpi = Kpi() + kpi.kpi_id.kpi_id.uuid = str(item[0]) + kpi.timestamp.timestamp = timestamp_string_to_float(item[1]) + kpi.kpi_value.floatVal = item[2] # This must be improved + subs_response.kpi_list.kpi.append(kpi) + subs_response.subs_id.subs_id.uuid = str(subs_id) + yield subs_response + if timestamp_utcnow_to_float() > end_timestamp: + break + # yield subs_response except ServiceException as e: LOGGER.exception('SubscribeKpi exception') grpc_context.abort(e.code, e.details) @@ -373,7 +424,7 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): LOGGER.debug(f"request.AlarmID: {request.alarm_id.alarm_id.uuid}") - if request.alarm_id.alarm_id.uuid is not "": + if request.alarm_id.alarm_id.uuid != "": alarm_id = request.alarm_id.alarm_id.uuid # Here the code to modify an existing alarm else: @@ -424,6 +475,7 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): LOGGER.info('GetAlarmDescriptor') try: alarm_id = request.alarm_id.uuid + LOGGER.debug(alarm_id) alarm = self.management_db.get_alarm(alarm_id) response = AlarmDescriptor() @@ -454,15 +506,13 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): LOGGER.info('GetAlarmResponseStream') try: alarm_id = request.alarm_id.alarm_id.uuid - alarm = self.management_db.get_alarm(alarm_id) - alarm_response = AlarmResponse() - - if alarm: + alarm_data = self.management_db.get_alarm(alarm_id) + real_start_time = timestamp_utcnow_to_float() + if alarm_data: + LOGGER.debug(f"{alarm_data}") alarm_queue = Queue() - alarm_data = self.management_db.get_alarm(alarm) - alarm_id = request.alarm_id.alarm_id.uuid kpi_id = alarm_data[3] kpiMinValue = alarm_data[4] @@ -473,24 +523,30 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): subscription_frequency_ms = request.subscription_frequency_ms subscription_timeout_s = request.subscription_timeout_s + end_timestamp = real_start_time + subscription_timeout_s + self.alarm_manager.create_alarm(alarm_queue, alarm_id, kpi_id, kpiMinValue, kpiMaxValue, inRange, includeMinValue, includeMaxValue, subscription_frequency_ms, subscription_timeout_s) - while not alarm_queue.empty(): - list = alarm_queue.get_nowait() - for item in list: - kpi = Kpi() - kpi.kpi_id.kpi_id.uuid = str(item[0]) - kpi.timestamp.timestamp = timestamp_string_to_float(item[1]) - kpi.kpi_value.floatVal = item[2] # This must be improved - alarm_response.kpi_list.kpi.append(kpi) - - alarm_response.alarm_id.alarm_id.uuid = alarm_id - - yield alarm_response + while True: + while not alarm_queue.empty(): + alarm_response = AlarmResponse() + list = alarm_queue.get_nowait() + size = len(list) + for item in list: + kpi = Kpi() + kpi.kpi_id.kpi_id.uuid = str(item[0]) + kpi.timestamp.timestamp = timestamp_string_to_float(item[1]) + kpi.kpi_value.floatVal = item[2] # This must be improved + alarm_response.kpi_list.kpi.append(kpi) + alarm_response.alarm_id.alarm_id.uuid = alarm_id + yield alarm_response + if timestamp_utcnow_to_float() > end_timestamp: + break else: LOGGER.info('GetAlarmResponseStream error: AlarmID({:s}): not found in database'.format(str(alarm_id))) + alarm_response = AlarmResponse() alarm_response.alarm_id.alarm_id.uuid = "NoID" return alarm_response except ServiceException as e: @@ -527,7 +583,7 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): kpi_db = self.management_db.get_KPI(int(kpi_id)) response = Kpi() if kpi_db is None: - LOGGER.info('GetInstantKpi error: KpiID({:s}): not found in database'.format(str(kpi_id))) + LOGGER.info('GetStreamKpi error: KpiID({:s}): not found in database'.format(str(kpi_id))) response.kpi_id.kpi_id.uuid = "NoID" return response else: @@ -540,7 +596,7 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): try: kpi_id = request.kpi_id.uuid response = Kpi() - if kpi_id is "": + if kpi_id == "": LOGGER.info('GetInstantKpi error: KpiID({:s}): not found in database'.format(str(kpi_id))) response.kpi_id.kpi_id.uuid = "NoID" else: @@ -548,10 +604,13 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): f"LATEST ON timestamp PARTITION BY kpi_id" data = self.metrics_db.run_query(query)[0] LOGGER.debug(data) - - response.kpi_id.kpi_id.uuid = str(data[0]) - response.timestamp.timestamp = timestamp_string_to_float(data[1]) - response.kpi_value.floatVal = data[2] # This must be improved + if len(data) == 0: + response.kpi_id.kpi_id.uuid = request.kpi_id.uuid + else: + data = data[0] + response.kpi_id.kpi_id.uuid = str(data[0]) + response.timestamp.timestamp = timestamp_string_to_float(data[1]) + response.kpi_value.floatVal = data[2] return response except ServiceException as e: diff --git a/src/monitoring/service/SubscriptionManager.py b/src/monitoring/service/SubscriptionManager.py index fe27d6ee365676b05175b762a106621121e3b897..3d1da36b7c5f66c28d3885a305660d6971f695b1 100644 --- a/src/monitoring/service/SubscriptionManager.py +++ b/src/monitoring/service/SubscriptionManager.py @@ -42,14 +42,12 @@ class SubscriptionManager(): if end_timestamp: end_date = datetime.utcfromtimestamp(end_timestamp).isoformat() - LOGGER.debug(f"kpi_id: {kpi_id}") - LOGGER.debug(f"sampling_interval_s: {sampling_interval_s}") - LOGGER.debug(f"subscription_id: {subscription_id}") - LOGGER.debug(f"start_date: {start_date}") - self.scheduler.add_job(self.metrics_db.get_subscription_data, args=(subs_queue,kpi_id, sampling_interval_s), + job = self.scheduler.add_job(self.metrics_db.get_subscription_data, args=(subs_queue,kpi_id, sampling_interval_s), trigger='interval', seconds=sampling_interval_s, start_date=start_date, end_date=end_date, timezone=pytz.utc, id=str(subscription_id)) LOGGER.debug(f"Subscrition job {subscription_id} succesfully created") + #job.remove() def delete_subscription(self, subscription_id): - self.scheduler.remove_job(subscription_id) \ No newline at end of file + self.scheduler.remove_job(subscription_id) + LOGGER.debug(f"Subscription job {subscription_id} succesfully deleted") \ No newline at end of file diff --git a/src/monitoring/tests/Messages.py b/src/monitoring/tests/Messages.py index 845153856c44cec0576bd6f11b045e3310558a97..f15cb5ec2c1d14ed95731cd37e54cb714b29e8b7 100644 --- a/src/monitoring/tests/Messages.py +++ b/src/monitoring/tests/Messages.py @@ -11,7 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import datetime from random import random from common.proto import monitoring_pb2 @@ -23,13 +22,15 @@ def kpi_id(): _kpi_id.kpi_id.uuid = str(1) # pylint: disable=maybe-no-member return _kpi_id -def create_kpi_request(): - _create_kpi_request = monitoring_pb2.KpiDescriptor() - _create_kpi_request.kpi_description = 'KPI Description Test' - _create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_TRANSMITTED - _create_kpi_request.device_id.device_uuid.uuid = 'DEV1' # pylint: disable=maybe-no-member - _create_kpi_request.service_id.service_uuid.uuid = 'SERV1' # pylint: disable=maybe-no-member - _create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END1' # pylint: disable=maybe-no-member +def create_kpi_request(kpi_id_str): + _create_kpi_request = monitoring_pb2.KpiDescriptor() + _create_kpi_request.kpi_description = 'KPI Description Test' + _create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED + _create_kpi_request.device_id.device_uuid.uuid = 'DEV' + str(kpi_id_str) + _create_kpi_request.service_id.service_uuid.uuid = 'SERV' + str(kpi_id_str) + _create_kpi_request.slice_id.slice_uuid.uuid = 'SLC' + str(kpi_id_str) + _create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END' + str(kpi_id_str) + _create_kpi_request.connection_id.connection_uuid.uuid = 'CON' + str(kpi_id_str) return _create_kpi_request def create_kpi_request_b(): @@ -38,7 +39,9 @@ def create_kpi_request_b(): _create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED _create_kpi_request.device_id.device_uuid.uuid = 'DEV2' # pylint: disable=maybe-no-member _create_kpi_request.service_id.service_uuid.uuid = 'SERV2' # pylint: disable=maybe-no-member + _create_kpi_request.slice_id.slice_uuid.uuid = 'SLC2' # pylint: disable=maybe-no-member _create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END2' # pylint: disable=maybe-no-member + _create_kpi_request.connection_id.connection_uuid.uuid = 'CON2' # pylint: disable=maybe-no-member return _create_kpi_request def create_kpi_request_c(): @@ -47,7 +50,9 @@ def create_kpi_request_c(): _create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED _create_kpi_request.device_id.device_uuid.uuid = 'DEV3' # pylint: disable=maybe-no-member _create_kpi_request.service_id.service_uuid.uuid = 'SERV3' # pylint: disable=maybe-no-member + _create_kpi_request.slice_id.slice_uuid.uuid = 'SLC3' # pylint: disable=maybe-no-member _create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END3' # pylint: disable=maybe-no-member + _create_kpi_request.connection_id.connection_uuid.uuid = 'CON3' # pylint: disable=maybe-no-member return _create_kpi_request def monitor_kpi_request(kpi_uuid, monitoring_window_s, sampling_rate_s): @@ -69,20 +74,32 @@ def kpi_descriptor_list(): return _kpi_descriptor_list -def kpi_query(): +def kpi_query(kpi_id_list): _kpi_query = monitoring_pb2.KpiQuery() + _kpi_query.kpi_ids.extend(kpi_id_list) + # _kpi_query.monitoring_window_s = 10 + # _kpi_query.last_n_samples = 2 + _kpi_query.start_timestamp.timestamp = timestamp_utcnow_to_float() - 10 + _kpi_query.end_timestamp.timestamp = timestamp_utcnow_to_float() + return _kpi_query def subs_descriptor(kpi_id): _subs_descriptor = monitoring_pb2.SubsDescriptor() + sampling_duration_s = 10 + sampling_interval_s = 3 + real_start_time = timestamp_utcnow_to_float() + start_timestamp = real_start_time + end_timestamp = start_timestamp + sampling_duration_s + _subs_descriptor.subs_id.subs_id.uuid = "" _subs_descriptor.kpi_id.kpi_id.uuid = kpi_id.kpi_id.uuid - _subs_descriptor.sampling_duration_s = 10 - _subs_descriptor.sampling_interval_s = 2 - _subs_descriptor.start_timestamp.timestamp = timestamp_utcnow_to_float() - _subs_descriptor.end_timestamp.timestamp = timestamp_utcnow_to_float() + 10 + _subs_descriptor.sampling_duration_s = sampling_duration_s + _subs_descriptor.sampling_interval_s = sampling_interval_s + _subs_descriptor.start_timestamp.timestamp = start_timestamp + _subs_descriptor.end_timestamp.timestamp = end_timestamp return _subs_descriptor @@ -91,14 +108,14 @@ def subs_id(): return _subs_id -def alarm_descriptor(): +def alarm_descriptor(kpi_id): _alarm_descriptor = monitoring_pb2.AlarmDescriptor() _alarm_descriptor.alarm_description = "Alarm Description" _alarm_descriptor.name = "Alarm Name" - _alarm_descriptor.kpi_id.kpi_id.uuid = "1" + _alarm_descriptor.kpi_id.kpi_id.uuid = kpi_id.kpi_id.uuid _alarm_descriptor.kpi_value_range.kpiMinValue.floatVal = 0.0 - _alarm_descriptor.kpi_value_range.kpiMaxValue.floatVal = 50.0 + _alarm_descriptor.kpi_value_range.kpiMaxValue.floatVal = 250.0 _alarm_descriptor.kpi_value_range.inRange = True _alarm_descriptor.kpi_value_range.includeMinValue = False _alarm_descriptor.kpi_value_range.includeMaxValue = True @@ -113,11 +130,16 @@ def alarm_descriptor_b(): return _alarm_descriptor def alarm_subscription(alarm_id): - _alarm_descriptor = monitoring_pb2.AlarmSubscription() + _alarm_subscription = monitoring_pb2.AlarmSubscription() - _alarm_descriptor.alarm_id.alarm_id.uuid = str(alarm_id) + subscription_timeout_s = 10 + subscription_frequency_ms = 1000 - return _alarm_descriptor + _alarm_subscription.alarm_id.alarm_id.uuid = str(alarm_id.alarm_id.uuid) + _alarm_subscription.subscription_timeout_s = subscription_timeout_s + _alarm_subscription.subscription_frequency_ms = subscription_frequency_ms + + return _alarm_subscription def alarm_id(): diff --git a/src/monitoring/tests/test_unitary.py b/src/monitoring/tests/test_unitary.py index ee6a29e8a483fe53c58a6e6d2e3aa240f2456b81..b113f5a7822841e17274300dc7102664bce1c409 100644 --- a/src/monitoring/tests/test_unitary.py +++ b/src/monitoring/tests/test_unitary.py @@ -15,11 +15,14 @@ import copy, os, pytest import threading import time +from queue import Queue +from random import random from time import sleep from typing import Tuple from apscheduler.executors.pool import ProcessPoolExecutor from apscheduler.schedulers.background import BackgroundScheduler +from apscheduler.schedulers.base import STATE_STOPPED from grpc._channel import _MultiThreadedRendezvous from common.Constants import ServiceNameEnum @@ -33,7 +36,8 @@ from common.message_broker.MessageBroker import MessageBroker from common.proto import monitoring_pb2 from common.proto.kpi_sample_types_pb2 import KpiSampleType from common.proto.monitoring_pb2 import KpiId, KpiDescriptor, KpiList, SubsDescriptor, SubsList, AlarmID, \ - AlarmDescriptor, AlarmList, Kpi, KpiDescriptorList, SubsResponse, AlarmResponse + AlarmDescriptor, AlarmList, Kpi, KpiDescriptorList, SubsResponse, AlarmResponse, RawKpiTable +from common.tools.timestamp.Converters import timestamp_utcnow_to_float, timestamp_string_to_float from context.client.ContextClient import ContextClient from context.service.grpc_server.ContextService import ContextService @@ -43,6 +47,9 @@ from device.client.DeviceClient import DeviceClient from device.service.DeviceService import DeviceService from device.service.driver_api.DriverFactory import DriverFactory from device.service.driver_api.DriverInstanceCache import DriverInstanceCache +from monitoring.service.AlarmManager import AlarmManager +from monitoring.service.MetricsDBTools import MetricsDB +from monitoring.service.SubscriptionManager import SubscriptionManager os.environ['DEVICE_EMULATED_ONLY'] = 'TRUE' from device.service.drivers import DRIVERS # pylint: disable=wrong-import-position @@ -175,14 +182,23 @@ def subs_scheduler(): return _scheduler -def ingestion_data(monitoring_client): - _kpi_id = monitoring_client.SetKpi(create_kpi_request_c()) - _include_kpi_request = include_kpi_request(_kpi_id) +def ingestion_data(kpi_id_int): + metrics_db = MetricsDB("localhost", "9009", "9000", "monitoring") + + for i in range(50): + kpiSampleType = KpiSampleType.Name(KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED).upper().replace('KPISAMPLETYPE_', '') + kpiId = kpi_id_int + deviceId = 'DEV'+ str(kpi_id_int) + endpointId = 'END' + str(kpi_id_int) + serviceId = 'SERV' + str(kpi_id_int) + sliceId = 'SLC' + str(kpi_id_int) + connectionId = 'CON' + str(kpi_id_int) + time_stamp = timestamp_utcnow_to_float() + kpi_value = 500*random() - for i in range(200): - _include_kpi_request = include_kpi_request(_kpi_id) - monitoring_client.IncludeKpi(_include_kpi_request) - time.sleep(0.01) + metrics_db.write_KPI(time_stamp, kpiId, kpiSampleType, deviceId, endpointId, serviceId, sliceId, connectionId, + kpi_value) + sleep(0.1) ########################### # Tests Implementation @@ -192,18 +208,17 @@ def ingestion_data(monitoring_client): def test_set_kpi(monitoring_client): # pylint: disable=redefined-outer-name # make call to server LOGGER.warning('test_create_kpi requesting') - response = monitoring_client.SetKpi(create_kpi_request()) - LOGGER.debug(str(response)) - response = monitoring_client.SetKpi(create_kpi_request_b()) - LOGGER.debug(str(response)) - assert isinstance(response, KpiId) + for i in range(3): + response = monitoring_client.SetKpi(create_kpi_request(str(i+1))) + LOGGER.debug(str(response)) + assert isinstance(response, KpiId) # Test case that makes use of client fixture to test server's DeleteKpi method def test_delete_kpi(monitoring_client): # pylint: disable=redefined-outer-name # make call to server LOGGER.warning('delete_kpi requesting') - response = monitoring_client.SetKpi(create_kpi_request_b()) + response = monitoring_client.SetKpi(create_kpi_request('4')) response = monitoring_client.DeleteKpi(response) LOGGER.debug(str(response)) assert isinstance(response, Empty) @@ -211,7 +226,7 @@ def test_delete_kpi(monitoring_client): # pylint: disable=redefined-outer-name # Test case that makes use of client fixture to test server's GetKpiDescriptor method def test_get_kpidescritor(monitoring_client): # pylint: disable=redefined-outer-name LOGGER.warning('test_getkpidescritor_kpi begin') - response = monitoring_client.SetKpi(create_kpi_request_c()) + response = monitoring_client.SetKpi(create_kpi_request('1')) response = monitoring_client.GetKpiDescriptor(response) LOGGER.debug(str(response)) assert isinstance(response, KpiDescriptor) @@ -227,7 +242,8 @@ def test_get_kpi_descriptor_list(monitoring_client): # pylint: disable=redefined def test_include_kpi(monitoring_client): # pylint: disable=redefined-outer-name # make call to server LOGGER.warning('test_include_kpi requesting') - kpi_id = monitoring_client.SetKpi(create_kpi_request_c()) + kpi_id = monitoring_client.SetKpi(create_kpi_request('1')) + LOGGER.debug(str(kpi_id)) response = monitoring_client.IncludeKpi(include_kpi_request(kpi_id)) LOGGER.debug(str(response)) assert isinstance(response, Empty) @@ -261,44 +277,40 @@ def test_monitor_kpi( response = device_client.AddDevice(Device(**device_with_connect_rules)) assert response.device_uuid.uuid == DEVICE_DEV1_UUID - response = monitoring_client.SetKpi(create_kpi_request()) + response = monitoring_client.SetKpi(create_kpi_request('1')) _monitor_kpi_request = monitor_kpi_request(response.kpi_id.uuid, 120, 5) # pylint: disable=maybe-no-member response = monitoring_client.MonitorKpi(_monitor_kpi_request) LOGGER.debug(str(response)) assert isinstance(response, Empty) # Test case that makes use of client fixture to test server's QueryKpiData method -def test_query_kpi_data(monitoring_client): # pylint: disable=redefined-outer-name +def test_query_kpi_data(monitoring_client,subs_scheduler): # pylint: disable=redefined-outer-name + + kpi_id_list = [] + for i in range(2): + kpi_id = monitoring_client.SetKpi(create_kpi_request(str(i+1))) + subs_scheduler.add_job(ingestion_data, args=[kpi_id.kpi_id.uuid]) + kpi_id_list.append(kpi_id) LOGGER.warning('test_query_kpi_data') - response = monitoring_client.QueryKpiData(kpi_query()) + sleep(5) + response = monitoring_client.QueryKpiData(kpi_query(kpi_id_list)) LOGGER.debug(str(response)) - assert isinstance(response, KpiList) - -def test_ingestion_data(monitoring_client): - _kpi_id = monitoring_client.SetKpi(create_kpi_request_c()) - _include_kpi_request = include_kpi_request(_kpi_id) - - for i in range(100): - _include_kpi_request = include_kpi_request(_kpi_id) - monitoring_client.IncludeKpi(_include_kpi_request) - time.sleep(0.01) - -# def test_subscription_scheduler(monitoring_client,metrics_db,subs_scheduler): -# subs_scheduler.add_job(ingestion_data(monitoring_client),id="1") + assert isinstance(response, RawKpiTable) + if (subs_scheduler.state != STATE_STOPPED): + subs_scheduler.shutdown() # Test case that makes use of client fixture to test server's SetKpiSubscription method -def test_set_kpi_subscription(monitoring_client,metrics_db): # pylint: disable=redefined-outer-name +def test_set_kpi_subscription(monitoring_client,subs_scheduler): # pylint: disable=redefined-outer-name LOGGER.warning('test_set_kpi_subscription') - kpi_id = monitoring_client.SetKpi(create_kpi_request_c()) - # thread = threading.Thread(target=test_ingestion_data, args=(monitoring_client,metrics_db)) - # thread.start() - monitoring_client.IncludeKpi(include_kpi_request(kpi_id)) + kpi_id = monitoring_client.SetKpi(create_kpi_request('1')) + subs_scheduler.add_job(ingestion_data, args=[kpi_id.kpi_id.uuid]) response = monitoring_client.SetKpiSubscription(subs_descriptor(kpi_id)) assert isinstance(response, _MultiThreadedRendezvous) - LOGGER.debug(response) for item in response: LOGGER.debug(item) assert isinstance(item, SubsResponse) + if (subs_scheduler.state != STATE_STOPPED): + subs_scheduler.shutdown() # Test case that makes use of client fixture to test server's GetSubsDescriptor method def test_get_subs_descriptor(monitoring_client): @@ -331,7 +343,8 @@ def test_delete_subscription(monitoring_client): # Test case that makes use of client fixture to test server's SetKpiAlarm method def test_set_kpi_alarm(monitoring_client): LOGGER.warning('test_set_kpi_alarm') - response = monitoring_client.SetKpiAlarm(alarm_descriptor()) + kpi_id = monitoring_client.SetKpi(create_kpi_request_c()) + response = monitoring_client.SetKpiAlarm(alarm_descriptor(kpi_id)) LOGGER.debug(str(response)) assert isinstance(response, AlarmID) @@ -345,28 +358,35 @@ def test_get_alarms(monitoring_client): # Test case that makes use of client fixture to test server's GetAlarmDescriptor method def test_get_alarm_descriptor(monitoring_client): LOGGER.warning('test_get_alarm_descriptor') - alarm_id = monitoring_client.SetKpiAlarm(alarm_descriptor()) - response = monitoring_client.GetAlarmDescriptor(alarm_id) - LOGGER.debug(response) - assert isinstance(response, AlarmDescriptor) + _kpi_id = monitoring_client.SetKpi(create_kpi_request_c()) + _alarm_id = monitoring_client.SetKpiAlarm(alarm_descriptor(_kpi_id)) + _response = monitoring_client.GetAlarmDescriptor(_alarm_id) + LOGGER.debug(_response) + assert isinstance(_response, AlarmDescriptor) # Test case that makes use of client fixture to test server's GetAlarmResponseStream method -def test_get_alarm_response_stream(monitoring_client): +def test_get_alarm_response_stream(monitoring_client,subs_scheduler): LOGGER.warning('test_get_alarm_descriptor') - alarm_id = monitoring_client.SetKpiAlarm(alarm_descriptor()) - response = monitoring_client.GetAlarmResponseStream(alarm_subscription(alarm_id)) - assert isinstance(response, _MultiThreadedRendezvous) - for item in response: - LOGGER.debug(response) + _kpi_id = monitoring_client.SetKpi(create_kpi_request('3')) + _alarm_id = monitoring_client.SetKpiAlarm(alarm_descriptor(_kpi_id)) + subs_scheduler.add_job(ingestion_data,args=[_kpi_id.kpi_id.uuid]) + _response = monitoring_client.GetAlarmResponseStream(alarm_subscription(_alarm_id)) + assert isinstance(_response, _MultiThreadedRendezvous) + for item in _response: + LOGGER.debug(item) assert isinstance(item,AlarmResponse) + if(subs_scheduler.state != STATE_STOPPED): + subs_scheduler.shutdown() + # Test case that makes use of client fixture to test server's DeleteAlarm method def test_delete_alarm(monitoring_client): LOGGER.warning('test_delete_alarm') - alarm_id = monitoring_client.SetKpiAlarm(alarm_descriptor()) - response = monitoring_client.DeleteAlarm(alarm_id) - LOGGER.debug(type(response)) - assert isinstance(response, Empty) + _kpi_id = monitoring_client.SetKpi(create_kpi_request_c()) + _alarm_id = monitoring_client.SetKpiAlarm(alarm_descriptor(_kpi_id)) + _response = monitoring_client.DeleteAlarm(_alarm_id) + LOGGER.debug(type(_response)) + assert isinstance(_response, Empty) # Test case that makes use of client fixture to test server's GetStreamKpi method def test_get_stream_kpi(monitoring_client): # pylint: disable=redefined-outer-name @@ -384,64 +404,117 @@ def test_get_stream_kpi(monitoring_client): # pylint: disable=redefined-outer-na # response = monitoring_client.GetInstantKpi(kpi_id) # LOGGER.debug(response) # assert isinstance(response, Kpi) - # response = monitoring_client.GetInstantKpi(KpiId()) - # LOGGER.debug(type(response)) - # assert response.kpi_id.kpi_id.uuid == "NoID" -def test_managementdb_tools_insert_kpi(management_db): # pylint: disable=redefined-outer-name - LOGGER.warning('test_managementdb_tools_insert_kpi begin') - _create_kpi_request = create_kpi_request() - kpi_description = _create_kpi_request.kpi_description # pylint: disable=maybe-no-member - kpi_sample_type = _create_kpi_request.kpi_sample_type # pylint: disable=maybe-no-member - kpi_device_id = _create_kpi_request.device_id.device_uuid.uuid # pylint: disable=maybe-no-member - kpi_endpoint_id = _create_kpi_request.endpoint_id.endpoint_uuid.uuid # pylint: disable=maybe-no-member - kpi_service_id = _create_kpi_request.service_id.service_uuid.uuid # pylint: disable=maybe-no-member - - response = management_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id) - assert isinstance(response, int) -def test_managementdb_tools_get_kpi(management_db): # pylint: disable=redefined-outer-name - LOGGER.warning('test_managementdb_tools_get_kpi begin') - _create_kpi_request = create_kpi_request() +def test_managementdb_tools_kpis(management_db): # pylint: disable=redefined-outer-name + LOGGER.warning('test_managementdb_tools_kpis begin') + _create_kpi_request = create_kpi_request('5') kpi_description = _create_kpi_request.kpi_description # pylint: disable=maybe-no-member kpi_sample_type = _create_kpi_request.kpi_sample_type # pylint: disable=maybe-no-member kpi_device_id = _create_kpi_request.device_id.device_uuid.uuid # pylint: disable=maybe-no-member kpi_endpoint_id = _create_kpi_request.endpoint_id.endpoint_uuid.uuid # pylint: disable=maybe-no-member kpi_service_id = _create_kpi_request.service_id.service_uuid.uuid # pylint: disable=maybe-no-member + kpi_slice_id = _create_kpi_request.slice_id.slice_uuid.uuid + kpi_connection_id = _create_kpi_request.connection_id.connection_uuid.uuid + + _kpi_id = management_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id,kpi_slice_id,kpi_connection_id) + assert isinstance(_kpi_id, int) - _kpi_id = management_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id) response = management_db.get_KPI(_kpi_id) assert isinstance(response, tuple) -def test_managementdb_tools_get_kpis(management_db): # pylint: disable=redefined-outer-name - LOGGER.warning('test_managementdb_tools_get_kpis begin') + response = management_db.set_monitoring_flag(_kpi_id,True) + assert response is True + response = management_db.check_monitoring_flag(_kpi_id) + assert response is True + management_db.set_monitoring_flag(_kpi_id, False) + response = management_db.check_monitoring_flag(_kpi_id) + assert response is False + response = management_db.get_KPIS() assert isinstance(response, list) -def test_managementdb_tools_delete_kpi(management_db): # pylint: disable=redefined-outer-name - LOGGER.warning('test_managementdb_tools_get_kpi begin') - - _create_kpi_request = create_kpi_request() - kpi_description = _create_kpi_request.kpi_description # pylint: disable=maybe-no-member - kpi_sample_type = _create_kpi_request.kpi_sample_type # pylint: disable=maybe-no-member - kpi_device_id = _create_kpi_request.device_id.device_uuid.uuid # pylint: disable=maybe-no-member - kpi_endpoint_id = _create_kpi_request.endpoint_id.endpoint_uuid.uuid # pylint: disable=maybe-no-member - kpi_service_id = _create_kpi_request.service_id.service_uuid.uuid # pylint: disable=maybe-no-member - - _kpi_id = management_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, - kpi_service_id) - response = management_db.delete_KPI(_kpi_id) - assert response -def test_metrics_db_tools_write_kpi(metrics_db): # pylint: disable=redefined-outer-name - LOGGER.warning('test_metric_sdb_tools_write_kpi begin') - -def test_metrics_db_tools_read_kpi_points(metrics_db): # pylint: disable=redefined-outer-name - LOGGER.warning('test_metrics_db_tools_read_kpi_points begin') +def test_managementdb_tools_insert_alarm(management_db): + LOGGER.warning('test_managementdb_tools_insert_alarm begin') + _alarm_description = "Alarm Description" + _alarm_name = "Alarm Name" + _kpi_id = "3" + _kpi_min_value = 0.0 + _kpi_max_value = 250.0 + _in_range = True + _include_min_value = False + _include_max_value = True + _alarm_id = management_db.insert_alarm(_alarm_description, _alarm_name, _kpi_id, _kpi_min_value, + _kpi_max_value, + _in_range, _include_min_value, _include_max_value) + LOGGER.debug(_alarm_id) + assert isinstance(_alarm_id,int) +# +# def test_metrics_db_tools(metrics_db): # pylint: disable=redefined-outer-name +# LOGGER.warning('test_metric_sdb_tools_write_kpi begin') +# _kpiId = "6" +# +# for i in range(50): +# _kpiSampleType = KpiSampleType.Name(KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED).upper().replace('KPISAMPLETYPE_', '') +# _deviceId = 'DEV4' +# _endpointId = 'END4' +# _serviceId = 'SERV4' +# _sliceId = 'SLC4' +# _connectionId = 'CON4' +# _time_stamp = timestamp_utcnow_to_float() +# _kpi_value = 500*random() +# +# metrics_db.write_KPI(_time_stamp, _kpiId, _kpiSampleType, _deviceId, _endpointId, _serviceId, _sliceId, _connectionId, +# _kpi_value) +# sleep(0.05) +# +# _query = f"SELECT * FROM monitoring WHERE kpi_id ='{_kpiId}'" +# _data = metrics_db.run_query(_query) +# assert len(_data) >= 50 +# +# def test_subscription_manager_create_subscription(management_db,metrics_db,subs_scheduler): +# LOGGER.warning('test_subscription_manager_create_subscription begin') +# subs_queue = Queue() +# +# subs_manager = SubscriptionManager(metrics_db) +# +# subs_scheduler.add_job(ingestion_data) +# +# kpi_id = "3" +# sampling_duration_s = 20 +# sampling_interval_s = 3 +# real_start_time = timestamp_utcnow_to_float() +# start_timestamp = real_start_time +# end_timestamp = start_timestamp + sampling_duration_s +# +# subs_id = management_db.insert_subscription(kpi_id, "localhost", sampling_duration_s, +# sampling_interval_s,start_timestamp,end_timestamp) +# subs_manager.create_subscription(subs_queue,subs_id,kpi_id,sampling_interval_s, +# sampling_duration_s,start_timestamp,end_timestamp) +# +# # This is here to simulate application activity (which keeps the main thread alive). +# total_points = 0 +# while True: +# while not subs_queue.empty(): +# list = subs_queue.get_nowait() +# kpi_list = KpiList() +# for item in list: +# kpi = Kpi() +# kpi.kpi_id.kpi_id.uuid = item[0] +# kpi.timestamp.timestamp = timestamp_string_to_float(item[1]) +# kpi.kpi_value.floatVal = item[2] +# kpi_list.kpi.append(kpi) +# total_points += 1 +# LOGGER.debug(kpi_list) +# if timestamp_utcnow_to_float() > end_timestamp: +# break +# +# assert total_points != 0 def test_events_tools( context_client : ContextClient, # pylint: disable=redefined-outer-name