diff --git a/hackfest/p4/setup.sh b/hackfest/p4/setup.sh index 07fe22e6aea2341c50462010b4bfb55c4a657a47..195327a03fedafdc64a2d0dc34577766eda72a4f 100755 --- a/hackfest/p4/setup.sh +++ b/hackfest/p4/setup.sh @@ -4,5 +4,5 @@ export POD_NAME=$(kubectl get pods -n=tfs | grep device | awk '{print $1}') kubectl exec ${POD_NAME} -n=tfs -- mkdir /root/p4 -kubectl cp src/tests/netx22-p4/p4/p4info.txt tfs/${POD_NAME}:/root/p4 -kubectl cp src/tests/netx22-p4/p4/bmv2.json tfs/${POD_NAME}:/root/p4 +kubectl cp hackfest/p4/p4/p4info.txt tfs/${POD_NAME}:/root/p4 +kubectl cp hackfest/p4/p4/bmv2.json tfs/${POD_NAME}:/root/p4 diff --git a/hackfest/p4/tests/Objects.py b/hackfest/p4/tests/Objects.py index 09b3aced843a198b7c963a34492a4fe2379c9123..c8b172244d714cd699ccc587e54c3751485a9a2e 100644 --- a/hackfest/p4/tests/Objects.py +++ b/hackfest/p4/tests/Objects.py @@ -1,4 +1,5 @@ # Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -42,6 +43,8 @@ PACKET_PORT_SAMPLE_TYPES = [ KpiSampleType.KPISAMPLETYPE_BYTES_RECEIVED, ] +# ----- Device Credentials and Settings -------------------------------------------------------------------------------- + # ----- Devices -------------------------------------------------------------------------------------------------------- @@ -54,7 +57,7 @@ DEVICE_SW1 = json_device_p4_disabled(DEVICE_SW1_UUID) DEVICE_SW1_DPID = 1 DEVICE_SW1_NAME = DEVICE_SW1_UUID -DEVICE_SW1_IP_ADDR = '10.0.2.10' +DEVICE_SW1_IP_ADDR = 'localhost' DEVICE_SW1_PORT = '50001' DEVICE_SW1_VENDOR = 'Open Networking Foundation' DEVICE_SW1_HW_VER = 'BMv2 simple_switch' @@ -78,9 +81,38 @@ DEVICE_SW1_CONNECT_RULES = json_device_connect_rules( } ) +DEVICE_SW2_UUID = 'SW2' +DEVICE_SW2_TIMEOUT = 60 +DEVICE_SW2_ID = json_device_id(DEVICE_SW2_UUID) +DEVICE_SW2 = json_device_p4_disabled(DEVICE_SW2_UUID) -################################## TABLE ENTRIES ################################## +DEVICE_SW2_DPID = 1 +DEVICE_SW2_NAME = DEVICE_SW2_UUID +DEVICE_SW2_IP_ADDR = 'localhost' +DEVICE_SW2_PORT = '50002' +DEVICE_SW2_VENDOR = 'Open Networking Foundation' +DEVICE_SW2_HW_VER = 'BMv2 simple_switch' +DEVICE_SW2_SW_VER = 'Stratum' +DEVICE_SW2_BIN_PATH = '/root/p4/bmv2.json' +DEVICE_SW2_INFO_PATH = '/root/p4/p4info.txt' + +DEVICE_SW2_CONNECT_RULES = json_device_connect_rules( + DEVICE_SW2_IP_ADDR, + DEVICE_SW2_PORT, + { + 'id': DEVICE_SW2_DPID, + 'name': DEVICE_SW2_NAME, + 'vendor': DEVICE_SW2_VENDOR, + 'hw_ver': DEVICE_SW2_HW_VER, + 'sw_ver': DEVICE_SW2_SW_VER, + 'timeout': DEVICE_SW2_TIMEOUT, + 'p4bin': DEVICE_SW2_BIN_PATH, + 'p4info': DEVICE_SW2_INFO_PATH + } +) + +################################## TABLE ENTRIES ################################## DEVICE_SW1_CONFIG_TABLE_ENTRIES = [ json_config_rule_set( @@ -123,6 +155,8 @@ DEVICE_SW1_CONFIG_TABLE_ENTRIES = [ ) ] +DEVICE_SW2_CONFIG_TABLE_ENTRIES = DEVICE_SW1_CONFIG_TABLE_ENTRIES + """ DEVICE_SW1_CONFIG_TABLE_ENTRIES = [ @@ -171,7 +205,6 @@ DEVICE_SW1_CONFIG_TABLE_ENTRIES = [ ################################## TABLE DECONF ################################## - DEVICE_SW1_DECONF_TABLE_ENTRIES = [ json_config_rule_delete( 'table', @@ -213,6 +246,7 @@ DEVICE_SW1_DECONF_TABLE_ENTRIES = [ ) ] +DEVICE_SW2_DECONF_TABLE_ENTRIES = DEVICE_SW1_DECONF_TABLE_ENTRIES """ @@ -271,6 +305,7 @@ TOPOLOGIES = [TOPOLOGY] DEVICES = [ (DEVICE_SW1, DEVICE_SW1_CONNECT_RULES, DEVICE_SW1_CONFIG_TABLE_ENTRIES, DEVICE_SW1_DECONF_TABLE_ENTRIES), + (DEVICE_SW2, DEVICE_SW2_CONNECT_RULES, DEVICE_SW2_CONFIG_TABLE_ENTRIES, DEVICE_SW2_DECONF_TABLE_ENTRIES), ] LINKS = [] diff --git a/hackfest/p4/tests/test_functional_cleanup.py b/hackfest/p4/tests/test_functional_cleanup.py index 32f716f1c2287b11bae3610022d64659d82ba73d..ccbcb9843a03bbf095743af0753da3fe8af3bfce 100644 --- a/hackfest/p4/tests/test_functional_cleanup.py +++ b/hackfest/p4/tests/test_functional_cleanup.py @@ -54,8 +54,8 @@ def test_scenario_cleanup( device_client.DeleteDevice(DeviceId(**device_id)) #expected_events.append(('DeviceEvent', EVENT_REMOVE, json_device_id(device_uuid))) - response = context_client.ListDevices(Empty()) - assert len(response.devices) == 0 + response = context_client.ListDevices(Empty()) + assert len(response.devices) == 0 # ----- Delete Topologies and Validate Collected Events ------------------------------------------------------------ for topology in TOPOLOGIES: diff --git a/proto/kpi_sample_types.proto b/proto/kpi_sample_types.proto index 7445a0f25a57df9793bd8761da024581988cf9e6..4419a8df4a22047d8708c5cf2e2c3657148b5eeb 100644 --- a/proto/kpi_sample_types.proto +++ b/proto/kpi_sample_types.proto @@ -16,9 +16,19 @@ syntax = "proto3"; package kpi_sample_types; enum KpiSampleType { - KPISAMPLETYPE_UNKNOWN = 0; - KPISAMPLETYPE_PACKETS_TRANSMITTED = 101; - KPISAMPLETYPE_PACKETS_RECEIVED = 102; - KPISAMPLETYPE_BYTES_TRANSMITTED = 201; - KPISAMPLETYPE_BYTES_RECEIVED = 202; + KPISAMPLETYPE_UNKNOWN = 0; + KPISAMPLETYPE_PACKETS_TRANSMITTED = 101; + KPISAMPLETYPE_PACKETS_RECEIVED = 102; + KPISAMPLETYPE_PACKETS_DROPPED = 103; + KPISAMPLETYPE_BYTES_TRANSMITTED = 201; + KPISAMPLETYPE_BYTES_RECEIVED = 202; + KPISAMPLETYPE_BYTES_DROPPED = 203; + KPISAMPLETYPE_ML_CONFIDENCE = 401; //. can be used by both optical and L3 without any issue + KPISAMPLETYPE_OPTICAL_SECURITY_STATUS = 501; //. can be used by both optical and L3 without any issue + KPISAMPLETYPE_L3_UNIQUE_ATTACK_CONNS = 601; + KPISAMPLETYPE_L3_TOTAL_DROPPED_PACKTS = 602; + KPISAMPLETYPE_L3_UNIQUE_ATTACKERS = 603; + KPISAMPLETYPE_L3_UNIQUE_COMPROMISED_CLIENTS = 604; + KPISAMPLETYPE_L3_SECURITY_STATUS_CRYPTO = 605; + KPISAMPLETYPE_SERVICE_LATENCY_MS = 701; } diff --git a/proto/monitoring.proto b/proto/monitoring.proto index 9be39db909d915b2a9b5d99b01841db028959543..f9c408c96ced121f35cc1116bf64d013e7320e6a 100644 --- a/proto/monitoring.proto +++ b/proto/monitoring.proto @@ -25,7 +25,7 @@ service MonitoringService { rpc GetKpiDescriptorList (context.Empty ) returns (KpiDescriptorList ) {} // Stable and final rpc IncludeKpi (Kpi ) returns (context.Empty ) {} // Stable and final rpc MonitorKpi (MonitorKpiRequest ) returns (context.Empty ) {} // Stable and final - rpc QueryKpiData (KpiQuery ) returns (KpiList ) {} // Not implemented + rpc QueryKpiData (KpiQuery ) returns (RawKpiTable ) {} // Not implemented rpc SetKpiSubscription (SubsDescriptor ) returns (stream SubsResponse ) {} // Stable not final rpc GetSubsDescriptor (SubscriptionID ) returns (SubsDescriptor ) {} // Stable and final rpc GetSubscriptions (context.Empty ) returns (SubsList ) {} // Stable and final @@ -36,7 +36,7 @@ service MonitoringService { rpc GetAlarmResponseStream(AlarmSubscription ) returns (stream AlarmResponse) {} // Not Stable not final rpc DeleteAlarm (AlarmID ) returns (context.Empty ) {} // Stable and final rpc GetStreamKpi (KpiId ) returns (stream Kpi ) {} // Stable not final - rpc GetInstantKpi (KpiId ) returns (Kpi ) {} // Stable not final + rpc GetInstantKpi (KpiId ) returns (Kpi ) {} // Stable not final } message KpiDescriptor { @@ -48,6 +48,7 @@ message KpiDescriptor { context.EndPointId endpoint_id = 6; context.ServiceId service_id = 7; context.SliceId slice_id = 8; + context.ConnectionId connection_id = 9; } message MonitorKpiRequest { @@ -58,13 +59,26 @@ message MonitorKpiRequest { } message KpiQuery { - KpiId kpi_id = 1; + repeated KpiId kpi_ids = 1; float monitoring_window_s = 2; - float sampling_rate_s = 3; - uint32 last_n_samples = 4; // used when you want something like "get the last N many samples - context.Timestamp start_timestamp = 5; // used when you want something like "get the samples since X date/time" - context.Timestamp end_timestamp = 6; // used when you want something like "get the samples until X date/time" - // Pending add field to reflect Available Device Protocols + uint32 last_n_samples = 3; // used when you want something like "get the last N many samples + context.Timestamp start_timestamp = 4; // used when you want something like "get the samples since X date/time" + context.Timestamp end_timestamp = 5; // used when you want something like "get the samples until X date/time" +} + + +message RawKpi { // cell + context.Timestamp timestamp = 1; + KpiValue kpi_value = 2; +} + +message RawKpiList { // column + KpiId kpi_id = 1; + repeated RawKpi raw_kpis = 2; +} + +message RawKpiTable { // table + repeated RawKpiList raw_kpi_lists = 1; } message KpiId { diff --git a/src/device/service/drivers/p4/p4_driver.py b/src/device/service/drivers/p4/p4_driver.py index 069c07ce40e43192b74519b2175e7e10c638cd20..b8ff795fbd9466874b07f1f752fce682ea741111 100644 --- a/src/device/service/drivers/p4/p4_driver.py +++ b/src/device/service/drivers/p4/p4_driver.py @@ -28,7 +28,7 @@ from .p4_common import matches_ipv4, matches_ipv6, valid_port,\ P4_ATTR_DEV_P4BIN, P4_ATTR_DEV_P4INFO, P4_ATTR_DEV_TIMEOUT,\ P4_VAL_DEF_VENDOR, P4_VAL_DEF_HW_VER, P4_VAL_DEF_SW_VER,\ P4_VAL_DEF_TIMEOUT -from .p4_manager import P4Manager, get_api_version, KEY_TABLE,\ +from .p4_manager import P4Manager, KEY_TABLE,\ KEY_ACTION_PROFILE, KEY_COUNTER, KEY_DIR_COUNTER, KEY_METER, KEY_DIR_METER,\ KEY_CTL_PKT_METADATA from .p4_client import WriteOperation @@ -127,8 +127,7 @@ class P4Driver(_Driver): except Exception as ex: # pylint: disable=broad-except raise Exception(ex) from ex - LOGGER.info("\tConnected via P4Runtime version %s", - get_api_version()) + LOGGER.info("\tConnected via P4Runtime") self.__started.set() return True diff --git a/src/device/service/drivers/p4/p4_manager.py b/src/device/service/drivers/p4/p4_manager.py index 65f8602ea30fa2d8cd06b09655ee4ee63d045a97..178487250ea3a5652690fb39f1631a0133aec4e3 100644 --- a/src/device/service/drivers/p4/p4_manager.py +++ b/src/device/service/drivers/p4/p4_manager.py @@ -55,7 +55,7 @@ LOGGER = logging.getLogger(__name__) CONTEXT = Context() # Global P4Runtime client -CLIENT = None +CLIENTS = {} # Constant P4 entities KEY_TABLE = "table" @@ -76,25 +76,6 @@ def get_context(): """ return CONTEXT - -def get_client(): - """ - Return P4 client. - - :return: P4Runtime client object - """ - return CLIENT - - -def get_api_version(): - """ - Get the supported P4Runtime API version. - - :return: API version - """ - return CLIENT.api_version() - - def get_table_type(table): """ Assess the type of P4 table based upon the matching scheme. @@ -136,171 +117,28 @@ def match_type_to_str(match_type): return None -def insert_table_entry_exact( - table_name, match_map, action_name, action_params, metadata, - cnt_pkt=-1, cnt_byte=-1): - """ - Insert an entry into an exact match table. - - :param table_name: P4 table name - :param match_map: Map of match operations - :param action_name: Action name - :param action_params: Map of action parameters - :param metadata: table metadata - :param cnt_pkt: packet count - :param cnt_byte: byte count - :return: inserted entry - """ - assert match_map, "Table entry without match operations is not accepted" - assert action_name, "Table entry without action is not accepted" - - table_entry = TableEntry(table_name)(action=action_name) - - for match_k, match_v in match_map.items(): - table_entry.match[match_k] = match_v - - for action_k, action_v in action_params.items(): - table_entry.action[action_k] = action_v - - if metadata: - table_entry.metadata = metadata - - if cnt_pkt > 0: - table_entry.counter_data.packet_count = cnt_pkt - - if cnt_byte > 0: - table_entry.counter_data.byte_count = cnt_byte - - ex_msg = "" - try: - table_entry.insert() - LOGGER.info("Inserted exact table entry: %s", table_entry) - except (P4RuntimeException, P4RuntimeWriteException) as ex: - raise P4RuntimeException from ex - - # Table entry exists, needs to be modified - if "ALREADY_EXISTS" in ex_msg: - table_entry.modify() - LOGGER.info("Updated exact table entry: %s", table_entry) - - return table_entry - - -def insert_table_entry_ternary( - table_name, match_map, action_name, action_params, metadata, - priority, cnt_pkt=-1, cnt_byte=-1): - """ - Insert an entry into a ternary match table. - - :param table_name: P4 table name - :param match_map: Map of match operations - :param action_name: Action name - :param action_params: Map of action parameters - :param metadata: table metadata - :param priority: entry priority - :param cnt_pkt: packet count - :param cnt_byte: byte count - :return: inserted entry - """ - assert match_map, "Table entry without match operations is not accepted" - assert action_name, "Table entry without action is not accepted" - - table_entry = TableEntry(table_name)(action=action_name) - - for match_k, match_v in match_map.items(): - table_entry.match[match_k] = match_v - - for action_k, action_v in action_params.items(): - table_entry.action[action_k] = action_v - - table_entry.priority = priority - - if metadata: - table_entry.metadata = metadata - - if cnt_pkt > 0: - table_entry.counter_data.packet_count = cnt_pkt - - if cnt_byte > 0: - table_entry.counter_data.byte_count = cnt_byte - - ex_msg = "" - try: - table_entry.insert() - LOGGER.info("Inserted ternary table entry: %s", table_entry) - except (P4RuntimeException, P4RuntimeWriteException) as ex: - raise P4RuntimeException from ex - - # Table entry exists, needs to be modified - if "ALREADY_EXISTS" in ex_msg: - table_entry.modify() - LOGGER.info("Updated ternary table entry: %s", table_entry) - - return table_entry - - -def insert_table_entry_range( - table_name, match_map, action_name, action_params, metadata, - priority, cnt_pkt=-1, cnt_byte=-1): # pylint: disable=unused-argument - """ - Insert an entry into a range match table. - - :param table_name: P4 table name - :param match_map: Map of match operations - :param action_name: Action name - :param action_params: Map of action parameters - :param metadata: table metadata - :param priority: entry priority - :param cnt_pkt: packet count - :param cnt_byte: byte count - :return: inserted entry - """ - assert match_map, "Table entry without match operations is not accepted" - assert action_name, "Table entry without action is not accepted" - - raise NotImplementedError( - "Range-based table insertion not implemented yet") - - -def insert_table_entry_optional( - table_name, match_map, action_name, action_params, metadata, - priority, cnt_pkt=-1, cnt_byte=-1): # pylint: disable=unused-argument - """ - Insert an entry into an optional match table. - - :param table_name: P4 table name - :param match_map: Map of match operations - :param action_name: Action name - :param action_params: Map of action parameters - :param metadata: table metadata - :param priority: entry priority - :param cnt_pkt: packet count - :param cnt_byte: byte count - :return: inserted entry - """ - assert match_map, "Table entry without match operations is not accepted" - assert action_name, "Table entry without action is not accepted" - - raise NotImplementedError( - "Optional-based table insertion not implemented yet") - class P4Manager: """ Class to manage the runtime entries of a P4 pipeline. """ + local_client = None + key_id = None def __init__(self, device_id: int, ip_address: str, port: int, election_id: tuple, role_name=None, ssl_options=None): - global CLIENT + global CLIENTS self.__id = device_id self.__ip_address = ip_address self.__port = int(port) self.__endpoint = f"{self.__ip_address}:{self.__port}" - CLIENT = P4RuntimeClient( + self.key_id = ip_address+str(port) + CLIENTS[self.key_id] = P4RuntimeClient( self.__id, self.__endpoint, election_id, role_name, ssl_options) self.__p4info = None + + self.local_client = CLIENTS[self.key_id] # Internal memory for whitebox management # | -> P4 entities @@ -339,27 +177,27 @@ class P4Manager: # Forwarding pipeline is only set iff both files are present if p4bin_path and p4info_path: try: - CLIENT.set_fwd_pipe_config(p4info_path, p4bin_path) + self.local_client.set_fwd_pipe_config(p4info_path, p4bin_path) except FileNotFoundError as ex: LOGGER.critical(ex) - CLIENT.tear_down() + self.local_client.tear_down() raise FileNotFoundError(ex) from ex except P4RuntimeException as ex: LOGGER.critical("Error when setting config") LOGGER.critical(ex) - CLIENT.tear_down() + self.local_client.tear_down() raise P4RuntimeException(ex) from ex except Exception as ex: # pylint: disable=broad-except LOGGER.critical("Error when setting config") - CLIENT.tear_down() + self.local_client.tear_down() raise Exception(ex) from ex try: - self.__p4info = CLIENT.get_p4info() + self.__p4info = self.local_client.get_p4info() except P4RuntimeException as ex: LOGGER.critical("Error when retrieving P4Info") LOGGER.critical(ex) - CLIENT.tear_down() + self.local_client.tear_down() raise P4RuntimeException(ex) from ex CONTEXT.set_p4info(self.__p4info) @@ -375,14 +213,15 @@ class P4Manager: :return: void """ - global CLIENT + global CLIENTS # gRPC client must already be instantiated - assert CLIENT + assert self.local_client # Trigger connection tear down with the P4Runtime server - CLIENT.tear_down() - CLIENT = None + self.local_client.tear_down() + # Remove client entry from global dictionary + CLIENTS.pop(self.key_id) self.__clear() LOGGER.info("P4Runtime manager stopped") @@ -723,7 +562,7 @@ class P4Manager: try: for count, table_entry in enumerate( - TableEntry(table_name)(action=action_name).read()): + TableEntry(self.local_client, table_name)(action=action_name).read()): LOGGER.debug( "Table %s - Entry %d\n%s", table_name, count, table_entry) self.table_entries[table_name].append(table_entry) @@ -856,6 +695,154 @@ class P4Manager: ) return None + def insert_table_entry_exact(self, + table_name, match_map, action_name, action_params, metadata, + cnt_pkt=-1, cnt_byte=-1): + """ + Insert an entry into an exact match table. + + :param table_name: P4 table name + :param match_map: Map of match operations + :param action_name: Action name + :param action_params: Map of action parameters + :param metadata: table metadata + :param cnt_pkt: packet count + :param cnt_byte: byte count + :return: inserted entry + """ + assert match_map, "Table entry without match operations is not accepted" + assert action_name, "Table entry without action is not accepted" + + table_entry = TableEntry(self.local_client, table_name)(action=action_name) + + for match_k, match_v in match_map.items(): + table_entry.match[match_k] = match_v + + for action_k, action_v in action_params.items(): + table_entry.action[action_k] = action_v + + if metadata: + table_entry.metadata = metadata + + if cnt_pkt > 0: + table_entry.counter_data.packet_count = cnt_pkt + + if cnt_byte > 0: + table_entry.counter_data.byte_count = cnt_byte + + ex_msg = "" + try: + table_entry.insert() + LOGGER.info("Inserted exact table entry: %s", table_entry) + except (P4RuntimeException, P4RuntimeWriteException) as ex: + raise P4RuntimeException from ex + + # Table entry exists, needs to be modified + if "ALREADY_EXISTS" in ex_msg: + table_entry.modify() + LOGGER.info("Updated exact table entry: %s", table_entry) + + return table_entry + + + def insert_table_entry_ternary(self, + table_name, match_map, action_name, action_params, metadata, + priority, cnt_pkt=-1, cnt_byte=-1): + """ + Insert an entry into a ternary match table. + + :param table_name: P4 table name + :param match_map: Map of match operations + :param action_name: Action name + :param action_params: Map of action parameters + :param metadata: table metadata + :param priority: entry priority + :param cnt_pkt: packet count + :param cnt_byte: byte count + :return: inserted entry + """ + assert match_map, "Table entry without match operations is not accepted" + assert action_name, "Table entry without action is not accepted" + + table_entry = TableEntry(self.local_client, table_name)(action=action_name) + + for match_k, match_v in match_map.items(): + table_entry.match[match_k] = match_v + + for action_k, action_v in action_params.items(): + table_entry.action[action_k] = action_v + + table_entry.priority = priority + + if metadata: + table_entry.metadata = metadata + + if cnt_pkt > 0: + table_entry.counter_data.packet_count = cnt_pkt + + if cnt_byte > 0: + table_entry.counter_data.byte_count = cnt_byte + + ex_msg = "" + try: + table_entry.insert() + LOGGER.info("Inserted ternary table entry: %s", table_entry) + except (P4RuntimeException, P4RuntimeWriteException) as ex: + raise P4RuntimeException from ex + + # Table entry exists, needs to be modified + if "ALREADY_EXISTS" in ex_msg: + table_entry.modify() + LOGGER.info("Updated ternary table entry: %s", table_entry) + + return table_entry + + + def insert_table_entry_range(self, + table_name, match_map, action_name, action_params, metadata, + priority, cnt_pkt=-1, cnt_byte=-1): # pylint: disable=unused-argument + """ + Insert an entry into a range match table. + + :param table_name: P4 table name + :param match_map: Map of match operations + :param action_name: Action name + :param action_params: Map of action parameters + :param metadata: table metadata + :param priority: entry priority + :param cnt_pkt: packet count + :param cnt_byte: byte count + :return: inserted entry + """ + assert match_map, "Table entry without match operations is not accepted" + assert action_name, "Table entry without action is not accepted" + + raise NotImplementedError( + "Range-based table insertion not implemented yet") + + + def insert_table_entry_optional(self, + table_name, match_map, action_name, action_params, metadata, + priority, cnt_pkt=-1, cnt_byte=-1): # pylint: disable=unused-argument + """ + Insert an entry into an optional match table. + + :param table_name: P4 table name + :param match_map: Map of match operations + :param action_name: Action name + :param action_params: Map of action parameters + :param metadata: table metadata + :param priority: entry priority + :param cnt_pkt: packet count + :param cnt_byte: byte count + :return: inserted entry + """ + assert match_map, "Table entry without match operations is not accepted" + assert action_name, "Table entry without action is not accepted" + + raise NotImplementedError( + "Optional-based table insertion not implemented yet") + def insert_table_entry(self, table_name, match_map, action_name, action_params, priority, metadata=None, cnt_pkt=-1, cnt_byte=-1): @@ -889,26 +876,26 @@ class P4Manager: # Exact match is supported if get_table_type(table) == p4info_pb2.MatchField.EXACT: - return insert_table_entry_exact( + return self.insert_table_entry_exact( table_name, match_map, action_name, action_params, metadata, cnt_pkt, cnt_byte) # Ternary and LPM matches are supported if get_table_type(table) in \ [p4info_pb2.MatchField.TERNARY, p4info_pb2.MatchField.LPM]: - return insert_table_entry_ternary( + return self.insert_table_entry_ternary( table_name, match_map, action_name, action_params, metadata, priority, cnt_pkt, cnt_byte) # TODO: Cover RANGE match # pylint: disable=W0511 if get_table_type(table) == p4info_pb2.MatchField.RANGE: - return insert_table_entry_range( + return self.insert_table_entry_range( table_name, match_map, action_name, action_params, metadata, priority, cnt_pkt, cnt_byte) # TODO: Cover OPTIONAL match # pylint: disable=W0511 if get_table_type(table) == p4info_pb2.MatchField.OPTIONAL: - return insert_table_entry_optional( + return self.insert_table_entry_optional( table_name, match_map, action_name, action_params, metadata, priority, cnt_pkt, cnt_byte) @@ -935,7 +922,7 @@ class P4Manager: LOGGER.error(msg) raise UserError(msg) - table_entry = TableEntry(table_name)(action=action_name) + table_entry = TableEntry(self.local_client, table_name)(action=action_name) for match_k, match_v in match_map.items(): table_entry.match[match_k] = match_v @@ -979,7 +966,7 @@ class P4Manager: LOGGER.error(msg) raise UserError(msg) - TableEntry(table_name).read(function=lambda x: x.delete()) + TableEntry(self.local_client, table_name).read(function=lambda x: x.delete()) LOGGER.info("Deleted all entries from table: %s", table_name) def print_table_entries_spec(self, table_name): @@ -1179,7 +1166,7 @@ class P4Manager: self.counter_entries[cnt_name] = [] try: - for count, cnt_entry in enumerate(CounterEntry(cnt_name).read()): + for count, cnt_entry in enumerate(CounterEntry(self.local_client, cnt_name).read()): LOGGER.debug( "Counter %s - Entry %d\n%s", cnt_name, count, cnt_entry) self.counter_entries[cnt_name].append(cnt_entry) @@ -1298,7 +1285,7 @@ class P4Manager: assert cnt, \ "P4 pipeline does not implement counter " + cnt_name - cnt_entry = CounterEntry(cnt_name) + cnt_entry = CounterEntry(self.local_client, cnt_name) if index: cnt_entry.index = index @@ -1325,7 +1312,7 @@ class P4Manager: assert cnt, \ "P4 pipeline does not implement counter " + cnt_name - cnt_entry = CounterEntry(cnt_name) + cnt_entry = CounterEntry(self.local_client, cnt_name) cnt_entry.clear_data() LOGGER.info("Cleared data of counter entry: %s", cnt_entry) @@ -1394,7 +1381,7 @@ class P4Manager: try: for count, d_cnt_entry in enumerate( - DirectCounterEntry(d_cnt_name).read()): + DirectCounterEntry(self.local_client, d_cnt_name).read()): LOGGER.debug( "Direct counter %s - Entry %d\n%s", d_cnt_name, count, d_cnt_entry) @@ -1530,7 +1517,7 @@ class P4Manager: assert match_map,\ "Direct counter entry without match operations is not accepted" - d_cnt_entry = DirectCounterEntry(d_cnt_name) + d_cnt_entry = DirectCounterEntry(self.local_client, d_cnt_name) for match_k, match_v in match_map.items(): d_cnt_entry.table_entry.match[match_k] = match_v @@ -1559,7 +1546,7 @@ class P4Manager: assert d_cnt, \ "P4 pipeline does not implement direct counter " + d_cnt_name - d_cnt_entry = DirectCounterEntry(d_cnt_name) + d_cnt_entry = DirectCounterEntry(self.local_client, d_cnt_name) d_cnt_entry.clear_data() LOGGER.info("Cleared direct counter entry: %s", d_cnt_entry) @@ -1627,7 +1614,7 @@ class P4Manager: self.meter_entries[meter_name] = [] try: - for count, meter_entry in enumerate(MeterEntry(meter_name).read()): + for count, meter_entry in enumerate(MeterEntry(self.local_client, meter_name).read()): LOGGER.debug( "Meter %s - Entry %d\n%s", meter_name, count, meter_entry) self.meter_entries[meter_name].append(meter_entry) @@ -1756,7 +1743,7 @@ class P4Manager: assert meter, \ "P4 pipeline does not implement meter " + meter_name - meter_entry = MeterEntry(meter_name) + meter_entry = MeterEntry(self.local_client, meter_name) if index: meter_entry.index = index @@ -1789,7 +1776,7 @@ class P4Manager: assert meter, \ "P4 pipeline does not implement meter " + meter_name - meter_entry = MeterEntry(meter_name) + meter_entry = MeterEntry(self.local_client, meter_name) meter_entry.clear_config() LOGGER.info("Cleared meter entry: %s", meter_entry) @@ -1858,7 +1845,7 @@ class P4Manager: try: for count, d_meter_entry in enumerate( - MeterEntry(d_meter_name).read()): + MeterEntry(self.local_client, d_meter_name).read()): LOGGER.debug( "Direct meter %s - Entry %d\n%s", d_meter_name, count, d_meter_entry) @@ -1998,7 +1985,7 @@ class P4Manager: assert match_map,\ "Direct meter entry without match operations is not accepted" - d_meter_entry = DirectMeterEntry(d_meter_name) + d_meter_entry = DirectMeterEntry(self.local_client, d_meter_name) for match_k, match_v in match_map.items(): d_meter_entry.table_entry.match[match_k] = match_v @@ -2031,7 +2018,7 @@ class P4Manager: assert d_meter, \ "P4 pipeline does not implement direct meter " + d_meter_name - d_meter_entry = DirectMeterEntry(d_meter_name) + d_meter_entry = DirectMeterEntry(self.local_client, d_meter_name) d_meter_entry.clear_config() LOGGER.info("Cleared direct meter entry: %s", d_meter_entry) @@ -2100,7 +2087,7 @@ class P4Manager: try: for count, ap_entry in enumerate( - ActionProfileMember(ap_name).read()): + ActionProfileMember(self.local_client, ap_name).read()): LOGGER.debug( "Action profile member %s - Entry %d\n%s", ap_name, count, ap_entry) @@ -2230,7 +2217,7 @@ class P4Manager: assert act_p, \ "P4 pipeline does not implement action profile " + ap_name - ap_member_entry = ActionProfileMember(ap_name)( + ap_member_entry = ActionProfileMember(self.local_client, ap_name)( member_id=member_id, action=action_name) for action_k, action_v in action_params.items(): @@ -2267,7 +2254,7 @@ class P4Manager: assert act_p, \ "P4 pipeline does not implement action profile " + ap_name - ap_member_entry = ActionProfileMember(ap_name)( + ap_member_entry = ActionProfileMember(self.local_client, ap_name)( member_id=member_id, action=action_name) ap_member_entry.delete() LOGGER.info("Deleted action profile member entry: %s", ap_member_entry) @@ -2364,7 +2351,7 @@ class P4Manager: try: for count, ap_entry in enumerate( - ActionProfileGroup(ap_name).read()): + ActionProfileGroup(self.local_client, ap_name).read()): LOGGER.debug("Action profile group %s - Entry %d\n%s", ap_name, count, ap_entry) self.action_profile_groups[ap_name].append(ap_entry) @@ -2483,7 +2470,7 @@ class P4Manager: assert ap, \ "P4 pipeline does not implement action profile " + ap_name - ap_group_entry = ActionProfileGroup(ap_name)(group_id=group_id) + ap_group_entry = ActionProfileGroup(self.local_client, ap_name)(group_id=group_id) if members: for m in members: @@ -2519,7 +2506,7 @@ class P4Manager: assert ap, \ "P4 pipeline does not implement action profile " + ap_name - ap_group_entry = ActionProfileGroup(ap_name)(group_id=group_id) + ap_group_entry = ActionProfileGroup(self.local_client, ap_name)(group_id=group_id) ap_group_entry.delete() LOGGER.info("Deleted action profile group entry: %s", ap_group_entry) @@ -2537,7 +2524,7 @@ class P4Manager: assert ap, \ "P4 pipeline does not implement action profile " + ap_name - ap_group_entry = ActionProfileGroup(ap_name)(group_id=group_id) + ap_group_entry = ActionProfileGroup(self.local_client, ap_name)(group_id=group_id) ap_group_entry.clear() LOGGER.info("Cleared action profile group entry: %s", ap_group_entry) @@ -2631,7 +2618,7 @@ class P4Manager: self.multicast_groups[group_id] = None try: - mcast_group = MulticastGroupEntry(group_id).read() + mcast_group = MulticastGroupEntry(self.local_client, group_id).read() LOGGER.debug("Multicast group %d\n%s", group_id, mcast_group) self.multicast_groups[group_id] = mcast_group return self.multicast_groups[group_id] @@ -2724,7 +2711,7 @@ class P4Manager: assert ports, \ "No multicast group ports are provided" - mcast_group = MulticastGroupEntry(group_id) + mcast_group = MulticastGroupEntry(self.local_client, group_id) for p in ports: mcast_group.add(p, 1) @@ -2756,7 +2743,7 @@ class P4Manager: assert group_id > 0, \ "Multicast group " + group_id + " must be > 0" - mcast_group = MulticastGroupEntry(group_id) + mcast_group = MulticastGroupEntry(self.local_client, group_id) mcast_group.delete() if group_id in self.multicast_groups: @@ -2772,7 +2759,7 @@ class P4Manager: :return: void """ - for mcast_group in MulticastGroupEntry().read(): + for mcast_group in MulticastGroupEntry(self.local_client).read(): gid = mcast_group.group_id mcast_group.delete() del self.multicast_groups[gid] @@ -2828,7 +2815,7 @@ class P4Manager: self.clone_session_entries[session_id] = None try: - session = CloneSessionEntry(session_id).read() + session = CloneSessionEntry(self.local_client, session_id).read() LOGGER.debug("Clone session %d\n%s", session_id, session) self.clone_session_entries[session_id] = session return self.clone_session_entries[session_id] @@ -2923,7 +2910,7 @@ class P4Manager: assert ports, \ "No clone session ports are provided" - session = CloneSessionEntry(session_id) + session = CloneSessionEntry(self.local_client, session_id) for p in ports: session.add(p, 1) @@ -2955,7 +2942,7 @@ class P4Manager: assert session_id > 0, \ "Clone session " + session_id + " must be > 0" - session = CloneSessionEntry(session_id) + session = CloneSessionEntry(self.local_client, session_id) session.delete() if session_id in self.clone_session_entries: @@ -2971,7 +2958,7 @@ class P4Manager: :return: void """ - for e in CloneSessionEntry().read(): + for e in CloneSessionEntry(self.local_client).read(): sid = e.session_id e.delete() del self.clone_session_entries[sid] @@ -3052,7 +3039,7 @@ class P4Manager: "No controller packet metadata in the pipeline\n") return None - packet_in = PacketOut() + packet_in = PacketIn(self.local_client) packet_in.payload = payload if metadata: for name, value in metadata.items(): @@ -3090,7 +3077,7 @@ class P4Manager: _t = Thread(target=_sniff_packet, args=(captured_packet,)) _t.start() # P4Runtime client sends the packet to the switch - CLIENT.stream_in_q["packet"].put(packet_in) + self.local_client.stream_in_q["packet"].put(packet_in) _t.join() LOGGER.info("Packet-in sent: %s", packet_in) @@ -3111,7 +3098,7 @@ class P4Manager: "No controller packet metadata in the pipeline\n") return None - packet_out = PacketOut() + packet_out = PacketOut(self.local_client) packet_out.payload = payload if metadata: for name, value in metadata.items(): @@ -3654,12 +3641,14 @@ class _EntityBase: """ Basic entity. """ + local_client = None - def __init__(self, entity_type, p4runtime_cls, modify_only=False): + def __init__(self, p4_client, entity_type, p4runtime_cls, modify_only=False): self._init = False self._entity_type = entity_type self._entry = p4runtime_cls() self._modify_only = modify_only + self.local_client = p4_client def __dir__(self): d = ["msg", "read"] @@ -3696,7 +3685,7 @@ class _EntityBase: update = p4runtime_pb2.Update() update.type = type_ getattr(update.entity, self._entity_type.name).CopyFrom(self._entry) - CLIENT.write_update(update) + self.local_client.write_update(update) def insert(self): """ @@ -3747,7 +3736,7 @@ class _EntityBase: entity = p4runtime_pb2.Entity() getattr(entity, self._entity_type.name).CopyFrom(self._entry) - iterator = CLIENT.read_one(entity) + iterator = self.local_client.read_one(entity) # Cannot use a (simpler) generator here as we need to # decorate __next__ with @parse_p4runtime_error. @@ -3794,9 +3783,9 @@ class _P4EntityBase(_EntityBase): Basic P4 entity. """ - def __init__(self, p4_type, entity_type, p4runtime_cls, name=None, + def __init__(self, p4_client, p4_type, entity_type, p4runtime_cls, name=None, modify_only=False): - super().__init__(entity_type, p4runtime_cls, modify_only) + super().__init__(p4_client, entity_type, p4runtime_cls, modify_only) self._p4_type = p4_type if name is None: raise UserError( @@ -3825,8 +3814,8 @@ class ActionProfileMember(_P4EntityBase): P4 action profile member. """ - def __init__(self, action_profile_name=None): - super().__init__( + def __init__(self, p4_client, action_profile_name=None): + super().__init__( p4_client, P4Type.action_profile, P4RuntimeEntity.action_profile_member, p4runtime_pb2.ActionProfileMember, action_profile_name) self.member_id = 0 @@ -3991,8 +3980,8 @@ class ActionProfileGroup(_P4EntityBase): P4 action profile group. """ - def __init__(self, action_profile_name=None): - super().__init__( + def __init__(self, p4_client, action_profile_name=None): + super().__init__( p4_client, P4Type.action_profile, P4RuntimeEntity.action_profile_group, p4runtime_pb2.ActionProfileGroup, action_profile_name) self.group_id = 0 @@ -4554,8 +4543,8 @@ class TableEntry(_P4EntityBase): "oneshot": cls._ActionSpecType.ONESHOT, }.get(name, None) - def __init__(self, table_name=None): - super().__init__( + def __init__(self, p4_client, table_name=None): + super().__init__(p4_client, P4Type.table, P4RuntimeEntity.table_entry, p4runtime_pb2.TableEntry, table_name) self.match = MatchKey(table_name, self._info.match_fields) @@ -4996,8 +4985,8 @@ class _CounterEntryBase(_P4EntityBase): Basic P4 counter entry. """ - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) + def __init__(self, p4_client, *args, **kwargs): + super().__init__(p4_client, *args, **kwargs) self._counter_type = self._info.spec.unit self.packet_count = -1 self.byte_count = -1 @@ -5065,8 +5054,8 @@ class CounterEntry(_CounterEntryBase): P4 counter entry. """ - def __init__(self, counter_name=None): - super().__init__( + def __init__(self, p4_client, counter_name=None): + super().__init__( p4_client, P4Type.counter, P4RuntimeEntity.counter_entry, p4runtime_pb2.CounterEntry, counter_name, modify_only=True) @@ -5126,10 +5115,11 @@ To write to the counter, use <self>.modify class DirectCounterEntry(_CounterEntryBase): """ Direct P4 counter entry. - """ + """ + local_client = None - def __init__(self, direct_counter_name=None): - super().__init__( + def __init__(self, p4_client, direct_counter_name=None): + super().__init__( p4_client, P4Type.direct_counter, P4RuntimeEntity.direct_counter_entry, p4runtime_pb2.DirectCounterEntry, direct_counter_name, modify_only=True) @@ -5140,7 +5130,8 @@ class DirectCounterEntry(_CounterEntryBase): except KeyError as ex: raise InvalidP4InfoError(f"direct_table_id {self._direct_table_id} " f"is not a valid table id") from ex - self._table_entry = TableEntry(self._direct_table_name) + self._table_entry = TableEntry(p4_client, self._direct_table_name) + self.local_client = p4_client self.__doc__ = f""" An entry for direct counter '{direct_counter_name}' @@ -5167,7 +5158,7 @@ To write to the counter, use <self>.modify raise UserError("Direct counters are not index-based") if name == "table_entry": if value is None: - self._table_entry = TableEntry(self._direct_table_name) + self._table_entry = TableEntry(self.local_client, self._direct_table_name) return if not isinstance(value, TableEntry): raise UserError("table_entry must be an instance of TableEntry") @@ -5221,7 +5212,7 @@ class _MeterEntryBase(_P4EntityBase): Basic P4 meter entry. """ - def __init__(self, *args, **kwargs): + def __init__(self, p4_client, *args, **kwargs): super().__init__(*args, **kwargs) self._meter_type = self._info.spec.unit self.index = -1 @@ -5291,8 +5282,8 @@ class MeterEntry(_MeterEntryBase): P4 meter entry. """ - def __init__(self, meter_name=None): - super().__init__( + def __init__(self, p4_client, meter_name=None): + super().__init__(p4_client, P4Type.meter, P4RuntimeEntity.meter_entry, p4runtime_pb2.MeterEntry, meter_name, modify_only=True) @@ -5356,9 +5347,10 @@ class DirectMeterEntry(_MeterEntryBase): """ Direct P4 meter entry. """ + local_client = None - def __init__(self, direct_meter_name=None): - super().__init__( + def __init__(self, p4_client, direct_meter_name=None): + super().__init__(p4_client, P4Type.direct_meter, P4RuntimeEntity.direct_meter_entry, p4runtime_pb2.DirectMeterEntry, direct_meter_name, modify_only=True) @@ -5369,7 +5361,8 @@ class DirectMeterEntry(_MeterEntryBase): except KeyError as ex: raise InvalidP4InfoError(f"direct_table_id {self._direct_table_id} " f"is not a valid table id") from ex - self._table_entry = TableEntry(self._direct_table_name) + self._table_entry = TableEntry(p4_client, self._direct_table_name) + self.local_client = p4_client self.__doc__ = f""" An entry for direct meter '{direct_meter_name}' @@ -5399,7 +5392,7 @@ To write to the meter, use <self>.modify raise UserError("Direct meters are not index-based") if name == "table_entry": if value is None: - self._table_entry = TableEntry(self._direct_table_name) + self._table_entry = TableEntry(self.local_client, self._direct_table_name) return if not isinstance(value, TableEntry): raise UserError("table_entry must be an instance of TableEntry") @@ -5531,8 +5524,8 @@ class MulticastGroupEntry(_EntityBase): P4 multicast group entry. """ - def __init__(self, group_id=0): - super().__init__( + def __init__(self, p4_client, group_id=0): + super().__init__(p4_client, P4RuntimeEntity.packet_replication_engine_entry, p4runtime_pb2.PacketReplicationEngineEntry) self.group_id = group_id @@ -5609,8 +5602,8 @@ class CloneSessionEntry(_EntityBase): P4 clone session entry. """ - def __init__(self, session_id=0): - super().__init__( + def __init__(self, p4_client, session_id=0): + super().__init__(p4_client, P4RuntimeEntity.packet_replication_engine_entry, p4runtime_pb2.PacketReplicationEngineEntry) self.session_id = session_id @@ -5779,8 +5772,9 @@ class PacketIn(): """ P4 packet in. """ + local_client = None - def __init__(self): + def __init__(self, p4_client): ctrl_pkt_md = P4Objects(P4Type.controller_packet_metadata) self.md_info_list = {} if "packet_in" in ctrl_pkt_md: @@ -5788,10 +5782,11 @@ class PacketIn(): for md_info in self.p4_info.metadata: self.md_info_list[md_info.name] = md_info self.packet_in_queue = queue.Queue() + self.local_client = p4_client def _packet_in_recv_func(packet_in_queue): while True: - msg = CLIENT.get_stream_packet("packet", timeout=None) + msg = self.local_client.get_stream_packet("packet", timeout=None) if not msg: break packet_in_queue.put(msg) @@ -5857,8 +5852,9 @@ class PacketOut: """ P4 packet out. """ + local_client = None - def __init__(self, payload=b'', **kwargs): + def __init__(self, p4_client, payload=b'', **kwargs): self.p4_info = P4Objects(P4Type.controller_packet_metadata)[ "packet_out"] @@ -5868,6 +5864,7 @@ class PacketOut: if kwargs: for key, value in kwargs.items(): self.metadata[key] = value + self.local_client = p4_client def _update_msg(self): self._entry = p4runtime_pb2.PacketOut() @@ -5897,7 +5894,7 @@ class PacketOut: self._update_msg() msg = p4runtime_pb2.StreamMessageRequest() msg.packet.CopyFrom(self._entry) - CLIENT.stream_out_q.put(msg) + self.local_client.stream_out_q.put(msg) def str(self): """ @@ -5913,13 +5910,16 @@ class IdleTimeoutNotification(): """ P4 idle timeout notification. """ + + local_client = None - def __init__(self): + def __init__(self, p4_client): self.notification_queue = queue.Queue() + self.local_client = p4_client.local_client def _notification_recv_func(notification_queue): while True: - msg = CLIENT.get_stream_packet("idle_timeout_notification", + msg = self.local_client.get_stream_packet("idle_timeout_notification", timeout=None) if not msg: break diff --git a/src/monitoring/client/MonitoringClient.py b/src/monitoring/client/MonitoringClient.py index 73607a081cd57e7c62b9c4e2c5e487868e72d189..5641b9cf3236c5fecfa5c6efe3a03b899c342ea5 100644 --- a/src/monitoring/client/MonitoringClient.py +++ b/src/monitoring/client/MonitoringClient.py @@ -22,7 +22,7 @@ from common.tools.grpc.Tools import grpc_message_to_json_string from common.proto.context_pb2 import Empty from common.proto.monitoring_pb2 import Kpi, KpiDescriptor, KpiId, MonitorKpiRequest, \ KpiDescriptorList, KpiQuery, KpiList, SubsDescriptor, SubscriptionID, SubsList, \ - SubsResponse, AlarmDescriptor, AlarmID, AlarmList, AlarmResponse, AlarmSubscription + SubsResponse, AlarmDescriptor, AlarmID, AlarmList, AlarmResponse, AlarmSubscription, RawKpiTable from common.proto.monitoring_pb2_grpc import MonitoringServiceStub LOGGER = logging.getLogger(__name__) @@ -93,7 +93,7 @@ class MonitoringClient: return response @RETRY_DECORATOR - def QueryKpiData(self, request : KpiQuery) -> KpiList: + def QueryKpiData(self, request : KpiQuery) -> RawKpiTable: LOGGER.debug('QueryKpiData: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.QueryKpiData(request) LOGGER.debug('QueryKpiData result: {:s}'.format(grpc_message_to_json_string(response))) diff --git a/src/monitoring/service/AlarmManager.py b/src/monitoring/service/AlarmManager.py index e5ac8915c3728c7894dc70ab901215dd5a7feb41..873a65d2c8041e6378f84d979bb1fd98d4d61d6b 100644 --- a/src/monitoring/service/AlarmManager.py +++ b/src/monitoring/service/AlarmManager.py @@ -1,3 +1,4 @@ +import pytz from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.executors.pool import ProcessPoolExecutor from apscheduler.jobstores.base import JobLookupError @@ -19,10 +20,16 @@ class AlarmManager(): end_date=None if subscription_timeout_s: start_timestamp=time.time() - start_date=datetime.fromtimestamp(start_timestamp) - end_date=datetime.fromtimestamp(start_timestamp+subscription_timeout_s) - self.scheduler.add_job(self.metrics_db.get_alarm_data, args=(alarm_queue,kpi_id, kpiMinValue, kpiMaxValue, inRange, includeMinValue, includeMaxValue, subscription_frequency_ms),trigger='interval', seconds=(subscription_frequency_ms/1000), start_date=start_date, end_date=end_date, id=alarm_id) + end_timestamp = start_timestamp + subscription_timeout_s + start_date = datetime.utcfromtimestamp(start_timestamp).isoformat() + end_date = datetime.utcfromtimestamp(end_timestamp).isoformat() + + job = self.scheduler.add_job(self.metrics_db.get_alarm_data, + args=(alarm_queue,kpi_id, kpiMinValue, kpiMaxValue, inRange, includeMinValue, includeMaxValue, subscription_frequency_ms), + trigger='interval', seconds=(subscription_frequency_ms/1000), start_date=start_date, + end_date=end_date,timezone=pytz.utc, id=str(alarm_id)) LOGGER.debug(f"Alarm job {alarm_id} succesfully created") + #job.remove() def delete_alarm(self, alarm_id): try: diff --git a/src/monitoring/service/ManagementDBTools.py b/src/monitoring/service/ManagementDBTools.py index 2387ddde0ab9eecea6c8fc982ba97a94f1a88c98..2185a3986532ad1b8e629cdcdb66079f23995c8f 100644 --- a/src/monitoring/service/ManagementDBTools.py +++ b/src/monitoring/service/ManagementDBTools.py @@ -38,7 +38,10 @@ class ManagementDB(): kpi_sample_type INTEGER, device_id INTEGER, endpoint_id INTEGER, - service_id INTEGER + service_id INTEGER, + slice_id INTEGER, + connection_id INTEGER, + monitor_flag INTEGER ); """) LOGGER.debug("KPI table created in the ManagementDB") @@ -84,13 +87,13 @@ class ManagementDB(): LOGGER.debug(f"Alarm table cannot be created in the ManagementDB. {e}") raise Exception - def insert_KPI(self,kpi_description,kpi_sample_type,device_id,endpoint_id,service_id): + def insert_KPI(self,kpi_description,kpi_sample_type,device_id,endpoint_id,service_id,slice_id,connection_id): try: c = self.client.cursor() - c.execute("SELECT kpi_id FROM kpi WHERE device_id is ? AND kpi_sample_type is ? AND endpoint_id is ? AND service_id is ?",(device_id,kpi_sample_type,endpoint_id,service_id)) + c.execute("SELECT kpi_id FROM kpi WHERE device_id is ? AND kpi_sample_type is ? AND endpoint_id is ? AND service_id is ? AND slice_id is ? AND connection_id is ?",(device_id,kpi_sample_type,endpoint_id,service_id,slice_id,connection_id)) data=c.fetchone() if data is None: - c.execute("INSERT INTO kpi (kpi_description,kpi_sample_type,device_id,endpoint_id,service_id) VALUES (?,?,?,?,?)", (kpi_description,kpi_sample_type,device_id,endpoint_id,service_id)) + c.execute("INSERT INTO kpi (kpi_description,kpi_sample_type,device_id,endpoint_id,service_id,slice_id,connection_id) VALUES (?,?,?,?,?,?,?)", (kpi_description,kpi_sample_type,device_id,endpoint_id,service_id,slice_id,connection_id)) self.client.commit() kpi_id = c.lastrowid LOGGER.debug(f"KPI {kpi_id} succesfully inserted in the ManagementDB") @@ -245,4 +248,41 @@ class ManagementDB(): LOGGER.debug(f"Alarms succesfully retrieved from the ManagementDB") return data except sqlite3.Error as e: - LOGGER.debug(f"Alarms cannot be retrieved from the ManagementDB: {e}") \ No newline at end of file + LOGGER.debug(f"Alarms cannot be retrieved from the ManagementDB: {e}") + + def check_monitoring_flag(self,kpi_id): + try: + c = self.client.cursor() + c.execute("SELECT monitor_flag FROM kpi WHERE kpi_id is ?",(kpi_id,)) + data=c.fetchone() + if data is None: + LOGGER.debug(f"KPI {kpi_id} does not exists") + return None + else: + if data[0] == 1: + return True + elif data[0] == 0: + return False + else: + LOGGER.debug(f"KPI {kpi_id} is wrong") + return None + except sqlite3.Error as e: + LOGGER.debug(f"KPI {kpi_id} cannot be checked from the ManagementDB: {e}") + + + def set_monitoring_flag(self,kpi_id,flag): + try: + c = self.client.cursor() + data = c.execute("SELECT * FROM kpi WHERE kpi_id is ?",(kpi_id,)).fetchone() + if data is None: + LOGGER.debug(f"KPI {kpi_id} does not exists") + return None + else: + if flag : + value = 1 + else: + value = 0 + c.execute("UPDATE kpi SET monitor_flag = ? WHERE kpi_id is ?",(value,kpi_id)) + return True + except sqlite3.Error as e: + LOGGER.debug(f"KPI {kpi_id} cannot be checked from the ManagementDB: {e}") \ No newline at end of file diff --git a/src/monitoring/service/MetricsDBTools.py b/src/monitoring/service/MetricsDBTools.py index 16e6373f542656b4c172c8d619bf3f17ca5df404..1d3888d5348bdbe2995f077310ca448827290382 100644 --- a/src/monitoring/service/MetricsDBTools.py +++ b/src/monitoring/service/MetricsDBTools.py @@ -87,6 +87,8 @@ class MetricsDB(): 'device_id SYMBOL,' \ 'endpoint_id SYMBOL,' \ 'service_id SYMBOL,' \ + 'slice_id SYMBOL,' \ + 'connection_id SYMBOL,' \ 'timestamp TIMESTAMP,' \ 'kpi_value DOUBLE)' \ 'TIMESTAMP(timestamp);' @@ -97,7 +99,7 @@ class MetricsDB(): LOGGER.debug(f"Table {self.table} cannot be created. {e}") raise Exception - def write_KPI(self, time, kpi_id, kpi_sample_type, device_id, endpoint_id, service_id, kpi_value): + def write_KPI(self, time, kpi_id, kpi_sample_type, device_id, endpoint_id, service_id, slice_id, connection_id, kpi_value): counter = 0 while (counter < self.retries): try: @@ -109,7 +111,9 @@ class MetricsDB(): 'kpi_sample_type': kpi_sample_type, 'device_id': device_id, 'endpoint_id': endpoint_id, - 'service_id': service_id}, + 'service_id': service_id, + 'slice_id': slice_id, + 'connection_id': connection_id,}, columns={ 'kpi_value': kpi_value}, at=datetime.datetime.fromtimestamp(time)) @@ -170,11 +174,54 @@ class MetricsDB(): if connection: connection.close() + def get_raw_kpi_list(self, kpi_id, monitoring_window_s, last_n_samples, start_timestamp, end_timestamp): + try: + query_root = f"SELECT timestamp, kpi_value FROM {self.table} WHERE kpi_id = '{kpi_id}' " + query = query_root + start_date = float() + end_date = float() + if last_n_samples: + query = query + f"ORDER BY timestamp DESC limit {last_n_samples}" + elif monitoring_window_s or start_timestamp or end_timestamp: + if start_timestamp and end_timestamp: + start_date = start_timestamp + end_date = end_timestamp + elif monitoring_window_s: + if start_timestamp and not end_timestamp: + start_date = start_timestamp + end_date = start_date + monitoring_window_s + elif end_timestamp and not start_timestamp: + end_date = end_timestamp + start_date = end_date - monitoring_window_s + elif not start_timestamp and not end_timestamp: + end_date = timestamp_utcnow_to_float() + start_date = end_date - monitoring_window_s + query = query + f"AND (timestamp BETWEEN '{timestamp_float_to_string(start_date)}' AND '{timestamp_float_to_string(end_date)}')" + else: + LOGGER.debug(f"Wrong parameters settings") + + LOGGER.debug(query) + + if self.postgre: + kpi_list = self.run_query_postgre(query) + LOGGER.debug(f"kpi_list postgre: {kpi_list}") + else: + kpi_list = self.run_query(query) + LOGGER.debug(f"kpi_list influx: {kpi_list}") + if kpi_list: + LOGGER.debug(f"New data received for subscription to KPI {kpi_id}") + return kpi_list + else: + LOGGER.debug(f"No new data for the subscription to KPI {kpi_id}") + except (Exception) as e: + LOGGER.debug(f"Subscription data cannot be retrieved. {e}") + def get_subscription_data(self,subs_queue, kpi_id, sampling_interval_s=1): try: end_date = timestamp_utcnow_to_float() - self.commit_lag_ms / 1000 start_date = end_date - sampling_interval_s query = f"SELECT kpi_id, timestamp, kpi_value FROM {self.table} WHERE kpi_id = '{kpi_id}' AND (timestamp BETWEEN '{timestamp_float_to_string(start_date)}' AND '{timestamp_float_to_string(end_date)}')" + LOGGER.debug(query) if self.postgre: kpi_list = self.run_query_postgre(query) LOGGER.debug(f"kpi_list postgre: {kpi_list}") @@ -201,6 +248,8 @@ class MetricsDB(): kpi_list = self.run_query(query) if kpi_list: LOGGER.debug(f"New data received for alarm of KPI {kpi_id}") + LOGGER.info(kpi_list) + valid_kpi_list = [] for kpi in kpi_list: alarm = False kpi_value = kpi[2] @@ -263,10 +312,10 @@ class MetricsDB(): if (kpi_value >= kpiMaxValue): alarm = True if alarm: - # queue.append[kpi] - alarm_queue.put_nowait(kpi) - LOGGER.debug(f"Alarm of KPI {kpi_id} triggered -> kpi_value:{kpi[2]}, timestamp:{kpi[1]}") - else: - LOGGER.debug(f"No new data for the alarm of KPI {kpi_id}") + valid_kpi_list.append(kpi) + alarm_queue.put_nowait(valid_kpi_list) + LOGGER.debug(f"Alarm of KPI {kpi_id} triggered -> kpi_value:{kpi[2]}, timestamp:{kpi[1]}") + else: + LOGGER.debug(f"No new data for the alarm of KPI {kpi_id}") except (Exception) as e: LOGGER.debug(f"Alarm data cannot be retrieved. {e}") \ No newline at end of file diff --git a/src/monitoring/service/MonitoringServiceServicerImpl.py b/src/monitoring/service/MonitoringServiceServicerImpl.py index 7cd47f187986a0c32eea2ac8405183ac4418d100..6e927476bd4c3e9f61068efef06f77568ecc0961 100644 --- a/src/monitoring/service/MonitoringServiceServicerImpl.py +++ b/src/monitoring/service/MonitoringServiceServicerImpl.py @@ -26,9 +26,9 @@ from common.proto.kpi_sample_types_pb2 import KpiSampleType from common.proto.monitoring_pb2_grpc import MonitoringServiceServicer from common.proto.monitoring_pb2 import AlarmResponse, AlarmDescriptor, AlarmList, SubsList, KpiId, \ KpiDescriptor, KpiList, KpiQuery, SubsDescriptor, SubscriptionID, AlarmID, KpiDescriptorList, \ - MonitorKpiRequest, Kpi, AlarmSubscription, SubsResponse + MonitorKpiRequest, Kpi, AlarmSubscription, SubsResponse, RawKpiTable, RawKpi, RawKpiList from common.rpc_method_wrapper.ServiceExceptions import ServiceException -from common.tools.timestamp.Converters import timestamp_string_to_float +from common.tools.timestamp.Converters import timestamp_string_to_float, timestamp_utcnow_to_float from monitoring.service import ManagementDBTools, MetricsDBTools from device.client.DeviceClient import DeviceClient @@ -85,13 +85,16 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): kpi_device_id = request.device_id.device_uuid.uuid kpi_endpoint_id = request.endpoint_id.endpoint_uuid.uuid kpi_service_id = request.service_id.service_uuid.uuid + kpi_slice_id = request.slice_id.slice_uuid.uuid + kpi_connection_id = request.connection_id.connection_uuid.uuid - if request.kpi_id.kpi_id.uuid is not "": + + if request.kpi_id.kpi_id.uuid != "": response.kpi_id.uuid = request.kpi_id.kpi_id.uuid # Here the code to modify an existing kpi else: data = self.management_db.insert_KPI( - kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id) + kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id, kpi_slice_id, kpi_connection_id) response.kpi_id.uuid = str(data) return response @@ -131,11 +134,13 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): if kpi_db is None: LOGGER.info('GetKpiDescriptor error: KpiID({:s}): not found in database'.format(str(kpi_id))) else: - kpiDescriptor.kpi_description = kpi_db[1] - kpiDescriptor.kpi_sample_type = kpi_db[2] - kpiDescriptor.device_id.device_uuid.uuid = str(kpi_db[3]) - kpiDescriptor.endpoint_id.endpoint_uuid.uuid = str(kpi_db[4]) - kpiDescriptor.service_id.service_uuid.uuid = str(kpi_db[5]) + kpiDescriptor.kpi_description = kpi_db[1] + kpiDescriptor.kpi_sample_type = kpi_db[2] + kpiDescriptor.device_id.device_uuid.uuid = str(kpi_db[3]) + kpiDescriptor.endpoint_id.endpoint_uuid.uuid = str(kpi_db[4]) + kpiDescriptor.service_id.service_uuid.uuid = str(kpi_db[5]) + kpiDescriptor.slice_id.slice_uuid.uuid = str(kpi_db[6]) + kpiDescriptor.connection_id.connection_uuid.uuid = str(kpi_db[7]) return kpiDescriptor except ServiceException as e: LOGGER.exception('GetKpiDescriptor exception') @@ -154,12 +159,14 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): for item in data: kpi_descriptor = KpiDescriptor() - kpi_descriptor.kpi_id.kpi_id.uuid = str(item[0]) - kpi_descriptor.kpi_description = item[1] - kpi_descriptor.kpi_sample_type = item[2] - kpi_descriptor.device_id.device_uuid.uuid = str(item[3]) - kpi_descriptor.endpoint_id.endpoint_uuid.uuid = str(item[4]) - kpi_descriptor.service_id.service_uuid.uuid = str(item[5]) + kpi_descriptor.kpi_id.kpi_id.uuid = str(item[0]) + kpi_descriptor.kpi_description = item[1] + kpi_descriptor.kpi_sample_type = item[2] + kpi_descriptor.device_id.device_uuid.uuid = str(item[3]) + kpi_descriptor.endpoint_id.endpoint_uuid.uuid = str(item[4]) + kpi_descriptor.service_id.service_uuid.uuid = str(item[5]) + kpi_descriptor.slice_id.slice_uuid.uuid = str(item[6]) + kpi_descriptor.connection_id.connection_uuid.uuid = str(item[7]) kpi_descriptor_list.kpi_descriptor_list.append(kpi_descriptor) @@ -186,11 +193,13 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): deviceId = kpiDescriptor.device_id.device_uuid.uuid endpointId = kpiDescriptor.endpoint_id.endpoint_uuid.uuid serviceId = kpiDescriptor.service_id.service_uuid.uuid + sliceId = kpiDescriptor.slice_id.slice_uuid.uuid + connectionId = kpiDescriptor.connection_id.connection_uuid.uuid time_stamp = request.timestamp.timestamp kpi_value = getattr(request.kpi_value, request.kpi_value.WhichOneof('value')) # Build the structure to be included as point in the MetricsDB - self.metrics_db.write_KPI(time_stamp, kpiId, kpiSampleType, deviceId, endpointId, serviceId, kpi_value) + self.metrics_db.write_KPI(time_stamp, kpiId, kpiSampleType, deviceId, endpointId, serviceId, sliceId, connectionId, kpi_value) return Empty() except ServiceException as e: @@ -220,8 +229,13 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): monitor_device_request.sampling_duration_s = request.monitoring_window_s monitor_device_request.sampling_interval_s = request.sampling_rate_s - device_client = DeviceClient() - device_client.MonitorDeviceKpi(monitor_device_request) + if not self.management_db.check_monitoring_flag(kpi_id): + device_client = DeviceClient() + device_client.MonitorDeviceKpi(monitor_device_request) + self.management_db.set_monitoring_flag(kpi_id,True) + self.management_db.check_monitoring_flag(kpi_id) + else: + LOGGER.warning('MonitorKpi warning: KpiID({:s}) is currently being monitored'.format(str(kpi_id))) else: LOGGER.info('MonitorKpi error: KpiID({:s}): not found in database'.format(str(kpi_id))) return response @@ -234,12 +248,48 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): grpc_context.abort(grpc.StatusCode.INTERNAL, str(e)) # CREATEKPI_COUNTER_FAILED.inc() - def QueryKpiData(self, request: KpiQuery, grpc_context: grpc.ServicerContext) -> KpiList: + def QueryKpiData(self, request: KpiQuery, grpc_context: grpc.ServicerContext) -> RawKpiTable: LOGGER.info('QueryKpiData') try: - # TBC - return KpiList() + raw_kpi_table = RawKpiTable() + + LOGGER.debug(str(request)) + + kpi_id_list = request.kpi_ids + monitoring_window_s = request.monitoring_window_s + last_n_samples = request.last_n_samples + start_timestamp = request.start_timestamp.timestamp + end_timestamp = request.end_timestamp.timestamp + + # Check if all the Kpi_ids exist + for item in kpi_id_list: + kpi_id = item.kpi_id.uuid + + kpiDescriptor = self.GetKpiDescriptor(item, grpc_context) + if kpiDescriptor is None: + LOGGER.info('QueryKpiData error: KpiID({:s}): not found in database'.format(str(kpi_id))) + break + else: + # Execute query per Kpi_id and introduce their kpi_list in the table + kpi_list = self.metrics_db.get_raw_kpi_list(kpi_id,monitoring_window_s,last_n_samples,start_timestamp,end_timestamp) + raw_kpi_list = RawKpiList() + raw_kpi_list.kpi_id.kpi_id.uuid = kpi_id + + LOGGER.debug(str(kpi_list)) + + if kpi_list is None: + LOGGER.info('QueryKpiData error: KpiID({:s}): points not found in metrics database'.format(str(kpi_id))) + else: + for item in kpi_list: + raw_kpi = RawKpi() + raw_kpi.timestamp.timestamp = timestamp_string_to_float(item[0]) + raw_kpi.kpi_value.floatVal = item[1] + raw_kpi_list.raw_kpis.append(raw_kpi) + + raw_kpi_table.raw_kpi_lists.append(raw_kpi_list) + + return raw_kpi_table except ServiceException as e: LOGGER.exception('QueryKpiData exception') grpc_context.abort(e.code, e.details) @@ -250,9 +300,7 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): LOGGER.info('SubscribeKpi') try: - subs_queue = Queue() - subs_response = SubsResponse() kpi_id = request.kpi_id.kpi_id.uuid sampling_duration_s = request.sampling_duration_s @@ -268,18 +316,21 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): start_timestamp, end_timestamp) # parse queue to append kpis into the list - while not subs_queue.empty(): - list = subs_queue.get_nowait() - for item in list: - kpi = Kpi() - kpi.kpi_id.kpi_id.uuid = str(item[0]) - kpi.timestamp.timestamp = timestamp_string_to_float(item[1]) - kpi.kpi_value.floatVal = item[2] # This must be improved - subs_response.kpi_list.kpi.append(kpi) - - subs_response.subs_id.subs_id.uuid = str(subs_id) - - yield subs_response + while True: + while not subs_queue.empty(): + subs_response = SubsResponse() + list = subs_queue.get_nowait() + for item in list: + kpi = Kpi() + kpi.kpi_id.kpi_id.uuid = str(item[0]) + kpi.timestamp.timestamp = timestamp_string_to_float(item[1]) + kpi.kpi_value.floatVal = item[2] # This must be improved + subs_response.kpi_list.kpi.append(kpi) + subs_response.subs_id.subs_id.uuid = str(subs_id) + yield subs_response + if timestamp_utcnow_to_float() > end_timestamp: + break + # yield subs_response except ServiceException as e: LOGGER.exception('SubscribeKpi exception') grpc_context.abort(e.code, e.details) @@ -373,7 +424,7 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): LOGGER.debug(f"request.AlarmID: {request.alarm_id.alarm_id.uuid}") - if request.alarm_id.alarm_id.uuid is not "": + if request.alarm_id.alarm_id.uuid != "": alarm_id = request.alarm_id.alarm_id.uuid # Here the code to modify an existing alarm else: @@ -424,6 +475,7 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): LOGGER.info('GetAlarmDescriptor') try: alarm_id = request.alarm_id.uuid + LOGGER.debug(alarm_id) alarm = self.management_db.get_alarm(alarm_id) response = AlarmDescriptor() @@ -454,15 +506,13 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): LOGGER.info('GetAlarmResponseStream') try: alarm_id = request.alarm_id.alarm_id.uuid - alarm = self.management_db.get_alarm(alarm_id) - alarm_response = AlarmResponse() - - if alarm: + alarm_data = self.management_db.get_alarm(alarm_id) + real_start_time = timestamp_utcnow_to_float() + if alarm_data: + LOGGER.debug(f"{alarm_data}") alarm_queue = Queue() - alarm_data = self.management_db.get_alarm(alarm) - alarm_id = request.alarm_id.alarm_id.uuid kpi_id = alarm_data[3] kpiMinValue = alarm_data[4] @@ -473,24 +523,30 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): subscription_frequency_ms = request.subscription_frequency_ms subscription_timeout_s = request.subscription_timeout_s + end_timestamp = real_start_time + subscription_timeout_s + self.alarm_manager.create_alarm(alarm_queue, alarm_id, kpi_id, kpiMinValue, kpiMaxValue, inRange, includeMinValue, includeMaxValue, subscription_frequency_ms, subscription_timeout_s) - while not alarm_queue.empty(): - list = alarm_queue.get_nowait() - for item in list: - kpi = Kpi() - kpi.kpi_id.kpi_id.uuid = str(item[0]) - kpi.timestamp.timestamp = timestamp_string_to_float(item[1]) - kpi.kpi_value.floatVal = item[2] # This must be improved - alarm_response.kpi_list.kpi.append(kpi) - - alarm_response.alarm_id.alarm_id.uuid = alarm_id - - yield alarm_response + while True: + while not alarm_queue.empty(): + alarm_response = AlarmResponse() + list = alarm_queue.get_nowait() + size = len(list) + for item in list: + kpi = Kpi() + kpi.kpi_id.kpi_id.uuid = str(item[0]) + kpi.timestamp.timestamp = timestamp_string_to_float(item[1]) + kpi.kpi_value.floatVal = item[2] # This must be improved + alarm_response.kpi_list.kpi.append(kpi) + alarm_response.alarm_id.alarm_id.uuid = alarm_id + yield alarm_response + if timestamp_utcnow_to_float() > end_timestamp: + break else: LOGGER.info('GetAlarmResponseStream error: AlarmID({:s}): not found in database'.format(str(alarm_id))) + alarm_response = AlarmResponse() alarm_response.alarm_id.alarm_id.uuid = "NoID" return alarm_response except ServiceException as e: @@ -527,7 +583,7 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): kpi_db = self.management_db.get_KPI(int(kpi_id)) response = Kpi() if kpi_db is None: - LOGGER.info('GetInstantKpi error: KpiID({:s}): not found in database'.format(str(kpi_id))) + LOGGER.info('GetStreamKpi error: KpiID({:s}): not found in database'.format(str(kpi_id))) response.kpi_id.kpi_id.uuid = "NoID" return response else: @@ -540,7 +596,7 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): try: kpi_id = request.kpi_id.uuid response = Kpi() - if kpi_id is "": + if kpi_id == "": LOGGER.info('GetInstantKpi error: KpiID({:s}): not found in database'.format(str(kpi_id))) response.kpi_id.kpi_id.uuid = "NoID" else: @@ -548,10 +604,13 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): f"LATEST ON timestamp PARTITION BY kpi_id" data = self.metrics_db.run_query(query)[0] LOGGER.debug(data) - - response.kpi_id.kpi_id.uuid = str(data[0]) - response.timestamp.timestamp = timestamp_string_to_float(data[1]) - response.kpi_value.floatVal = data[2] # This must be improved + if len(data) == 0: + response.kpi_id.kpi_id.uuid = request.kpi_id.uuid + else: + data = data[0] + response.kpi_id.kpi_id.uuid = str(data[0]) + response.timestamp.timestamp = timestamp_string_to_float(data[1]) + response.kpi_value.floatVal = data[2] return response except ServiceException as e: diff --git a/src/monitoring/service/SubscriptionManager.py b/src/monitoring/service/SubscriptionManager.py index fe27d6ee365676b05175b762a106621121e3b897..3d1da36b7c5f66c28d3885a305660d6971f695b1 100644 --- a/src/monitoring/service/SubscriptionManager.py +++ b/src/monitoring/service/SubscriptionManager.py @@ -42,14 +42,12 @@ class SubscriptionManager(): if end_timestamp: end_date = datetime.utcfromtimestamp(end_timestamp).isoformat() - LOGGER.debug(f"kpi_id: {kpi_id}") - LOGGER.debug(f"sampling_interval_s: {sampling_interval_s}") - LOGGER.debug(f"subscription_id: {subscription_id}") - LOGGER.debug(f"start_date: {start_date}") - self.scheduler.add_job(self.metrics_db.get_subscription_data, args=(subs_queue,kpi_id, sampling_interval_s), + job = self.scheduler.add_job(self.metrics_db.get_subscription_data, args=(subs_queue,kpi_id, sampling_interval_s), trigger='interval', seconds=sampling_interval_s, start_date=start_date, end_date=end_date, timezone=pytz.utc, id=str(subscription_id)) LOGGER.debug(f"Subscrition job {subscription_id} succesfully created") + #job.remove() def delete_subscription(self, subscription_id): - self.scheduler.remove_job(subscription_id) \ No newline at end of file + self.scheduler.remove_job(subscription_id) + LOGGER.debug(f"Subscription job {subscription_id} succesfully deleted") \ No newline at end of file diff --git a/src/monitoring/tests/Messages.py b/src/monitoring/tests/Messages.py index 845153856c44cec0576bd6f11b045e3310558a97..f15cb5ec2c1d14ed95731cd37e54cb714b29e8b7 100644 --- a/src/monitoring/tests/Messages.py +++ b/src/monitoring/tests/Messages.py @@ -11,7 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import datetime from random import random from common.proto import monitoring_pb2 @@ -23,13 +22,15 @@ def kpi_id(): _kpi_id.kpi_id.uuid = str(1) # pylint: disable=maybe-no-member return _kpi_id -def create_kpi_request(): - _create_kpi_request = monitoring_pb2.KpiDescriptor() - _create_kpi_request.kpi_description = 'KPI Description Test' - _create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_TRANSMITTED - _create_kpi_request.device_id.device_uuid.uuid = 'DEV1' # pylint: disable=maybe-no-member - _create_kpi_request.service_id.service_uuid.uuid = 'SERV1' # pylint: disable=maybe-no-member - _create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END1' # pylint: disable=maybe-no-member +def create_kpi_request(kpi_id_str): + _create_kpi_request = monitoring_pb2.KpiDescriptor() + _create_kpi_request.kpi_description = 'KPI Description Test' + _create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED + _create_kpi_request.device_id.device_uuid.uuid = 'DEV' + str(kpi_id_str) + _create_kpi_request.service_id.service_uuid.uuid = 'SERV' + str(kpi_id_str) + _create_kpi_request.slice_id.slice_uuid.uuid = 'SLC' + str(kpi_id_str) + _create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END' + str(kpi_id_str) + _create_kpi_request.connection_id.connection_uuid.uuid = 'CON' + str(kpi_id_str) return _create_kpi_request def create_kpi_request_b(): @@ -38,7 +39,9 @@ def create_kpi_request_b(): _create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED _create_kpi_request.device_id.device_uuid.uuid = 'DEV2' # pylint: disable=maybe-no-member _create_kpi_request.service_id.service_uuid.uuid = 'SERV2' # pylint: disable=maybe-no-member + _create_kpi_request.slice_id.slice_uuid.uuid = 'SLC2' # pylint: disable=maybe-no-member _create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END2' # pylint: disable=maybe-no-member + _create_kpi_request.connection_id.connection_uuid.uuid = 'CON2' # pylint: disable=maybe-no-member return _create_kpi_request def create_kpi_request_c(): @@ -47,7 +50,9 @@ def create_kpi_request_c(): _create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED _create_kpi_request.device_id.device_uuid.uuid = 'DEV3' # pylint: disable=maybe-no-member _create_kpi_request.service_id.service_uuid.uuid = 'SERV3' # pylint: disable=maybe-no-member + _create_kpi_request.slice_id.slice_uuid.uuid = 'SLC3' # pylint: disable=maybe-no-member _create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END3' # pylint: disable=maybe-no-member + _create_kpi_request.connection_id.connection_uuid.uuid = 'CON3' # pylint: disable=maybe-no-member return _create_kpi_request def monitor_kpi_request(kpi_uuid, monitoring_window_s, sampling_rate_s): @@ -69,20 +74,32 @@ def kpi_descriptor_list(): return _kpi_descriptor_list -def kpi_query(): +def kpi_query(kpi_id_list): _kpi_query = monitoring_pb2.KpiQuery() + _kpi_query.kpi_ids.extend(kpi_id_list) + # _kpi_query.monitoring_window_s = 10 + # _kpi_query.last_n_samples = 2 + _kpi_query.start_timestamp.timestamp = timestamp_utcnow_to_float() - 10 + _kpi_query.end_timestamp.timestamp = timestamp_utcnow_to_float() + return _kpi_query def subs_descriptor(kpi_id): _subs_descriptor = monitoring_pb2.SubsDescriptor() + sampling_duration_s = 10 + sampling_interval_s = 3 + real_start_time = timestamp_utcnow_to_float() + start_timestamp = real_start_time + end_timestamp = start_timestamp + sampling_duration_s + _subs_descriptor.subs_id.subs_id.uuid = "" _subs_descriptor.kpi_id.kpi_id.uuid = kpi_id.kpi_id.uuid - _subs_descriptor.sampling_duration_s = 10 - _subs_descriptor.sampling_interval_s = 2 - _subs_descriptor.start_timestamp.timestamp = timestamp_utcnow_to_float() - _subs_descriptor.end_timestamp.timestamp = timestamp_utcnow_to_float() + 10 + _subs_descriptor.sampling_duration_s = sampling_duration_s + _subs_descriptor.sampling_interval_s = sampling_interval_s + _subs_descriptor.start_timestamp.timestamp = start_timestamp + _subs_descriptor.end_timestamp.timestamp = end_timestamp return _subs_descriptor @@ -91,14 +108,14 @@ def subs_id(): return _subs_id -def alarm_descriptor(): +def alarm_descriptor(kpi_id): _alarm_descriptor = monitoring_pb2.AlarmDescriptor() _alarm_descriptor.alarm_description = "Alarm Description" _alarm_descriptor.name = "Alarm Name" - _alarm_descriptor.kpi_id.kpi_id.uuid = "1" + _alarm_descriptor.kpi_id.kpi_id.uuid = kpi_id.kpi_id.uuid _alarm_descriptor.kpi_value_range.kpiMinValue.floatVal = 0.0 - _alarm_descriptor.kpi_value_range.kpiMaxValue.floatVal = 50.0 + _alarm_descriptor.kpi_value_range.kpiMaxValue.floatVal = 250.0 _alarm_descriptor.kpi_value_range.inRange = True _alarm_descriptor.kpi_value_range.includeMinValue = False _alarm_descriptor.kpi_value_range.includeMaxValue = True @@ -113,11 +130,16 @@ def alarm_descriptor_b(): return _alarm_descriptor def alarm_subscription(alarm_id): - _alarm_descriptor = monitoring_pb2.AlarmSubscription() + _alarm_subscription = monitoring_pb2.AlarmSubscription() - _alarm_descriptor.alarm_id.alarm_id.uuid = str(alarm_id) + subscription_timeout_s = 10 + subscription_frequency_ms = 1000 - return _alarm_descriptor + _alarm_subscription.alarm_id.alarm_id.uuid = str(alarm_id.alarm_id.uuid) + _alarm_subscription.subscription_timeout_s = subscription_timeout_s + _alarm_subscription.subscription_frequency_ms = subscription_frequency_ms + + return _alarm_subscription def alarm_id(): diff --git a/src/monitoring/tests/test_unitary.py b/src/monitoring/tests/test_unitary.py index ee6a29e8a483fe53c58a6e6d2e3aa240f2456b81..b113f5a7822841e17274300dc7102664bce1c409 100644 --- a/src/monitoring/tests/test_unitary.py +++ b/src/monitoring/tests/test_unitary.py @@ -15,11 +15,14 @@ import copy, os, pytest import threading import time +from queue import Queue +from random import random from time import sleep from typing import Tuple from apscheduler.executors.pool import ProcessPoolExecutor from apscheduler.schedulers.background import BackgroundScheduler +from apscheduler.schedulers.base import STATE_STOPPED from grpc._channel import _MultiThreadedRendezvous from common.Constants import ServiceNameEnum @@ -33,7 +36,8 @@ from common.message_broker.MessageBroker import MessageBroker from common.proto import monitoring_pb2 from common.proto.kpi_sample_types_pb2 import KpiSampleType from common.proto.monitoring_pb2 import KpiId, KpiDescriptor, KpiList, SubsDescriptor, SubsList, AlarmID, \ - AlarmDescriptor, AlarmList, Kpi, KpiDescriptorList, SubsResponse, AlarmResponse + AlarmDescriptor, AlarmList, Kpi, KpiDescriptorList, SubsResponse, AlarmResponse, RawKpiTable +from common.tools.timestamp.Converters import timestamp_utcnow_to_float, timestamp_string_to_float from context.client.ContextClient import ContextClient from context.service.grpc_server.ContextService import ContextService @@ -43,6 +47,9 @@ from device.client.DeviceClient import DeviceClient from device.service.DeviceService import DeviceService from device.service.driver_api.DriverFactory import DriverFactory from device.service.driver_api.DriverInstanceCache import DriverInstanceCache +from monitoring.service.AlarmManager import AlarmManager +from monitoring.service.MetricsDBTools import MetricsDB +from monitoring.service.SubscriptionManager import SubscriptionManager os.environ['DEVICE_EMULATED_ONLY'] = 'TRUE' from device.service.drivers import DRIVERS # pylint: disable=wrong-import-position @@ -175,14 +182,23 @@ def subs_scheduler(): return _scheduler -def ingestion_data(monitoring_client): - _kpi_id = monitoring_client.SetKpi(create_kpi_request_c()) - _include_kpi_request = include_kpi_request(_kpi_id) +def ingestion_data(kpi_id_int): + metrics_db = MetricsDB("localhost", "9009", "9000", "monitoring") + + for i in range(50): + kpiSampleType = KpiSampleType.Name(KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED).upper().replace('KPISAMPLETYPE_', '') + kpiId = kpi_id_int + deviceId = 'DEV'+ str(kpi_id_int) + endpointId = 'END' + str(kpi_id_int) + serviceId = 'SERV' + str(kpi_id_int) + sliceId = 'SLC' + str(kpi_id_int) + connectionId = 'CON' + str(kpi_id_int) + time_stamp = timestamp_utcnow_to_float() + kpi_value = 500*random() - for i in range(200): - _include_kpi_request = include_kpi_request(_kpi_id) - monitoring_client.IncludeKpi(_include_kpi_request) - time.sleep(0.01) + metrics_db.write_KPI(time_stamp, kpiId, kpiSampleType, deviceId, endpointId, serviceId, sliceId, connectionId, + kpi_value) + sleep(0.1) ########################### # Tests Implementation @@ -192,18 +208,17 @@ def ingestion_data(monitoring_client): def test_set_kpi(monitoring_client): # pylint: disable=redefined-outer-name # make call to server LOGGER.warning('test_create_kpi requesting') - response = monitoring_client.SetKpi(create_kpi_request()) - LOGGER.debug(str(response)) - response = monitoring_client.SetKpi(create_kpi_request_b()) - LOGGER.debug(str(response)) - assert isinstance(response, KpiId) + for i in range(3): + response = monitoring_client.SetKpi(create_kpi_request(str(i+1))) + LOGGER.debug(str(response)) + assert isinstance(response, KpiId) # Test case that makes use of client fixture to test server's DeleteKpi method def test_delete_kpi(monitoring_client): # pylint: disable=redefined-outer-name # make call to server LOGGER.warning('delete_kpi requesting') - response = monitoring_client.SetKpi(create_kpi_request_b()) + response = monitoring_client.SetKpi(create_kpi_request('4')) response = monitoring_client.DeleteKpi(response) LOGGER.debug(str(response)) assert isinstance(response, Empty) @@ -211,7 +226,7 @@ def test_delete_kpi(monitoring_client): # pylint: disable=redefined-outer-name # Test case that makes use of client fixture to test server's GetKpiDescriptor method def test_get_kpidescritor(monitoring_client): # pylint: disable=redefined-outer-name LOGGER.warning('test_getkpidescritor_kpi begin') - response = monitoring_client.SetKpi(create_kpi_request_c()) + response = monitoring_client.SetKpi(create_kpi_request('1')) response = monitoring_client.GetKpiDescriptor(response) LOGGER.debug(str(response)) assert isinstance(response, KpiDescriptor) @@ -227,7 +242,8 @@ def test_get_kpi_descriptor_list(monitoring_client): # pylint: disable=redefined def test_include_kpi(monitoring_client): # pylint: disable=redefined-outer-name # make call to server LOGGER.warning('test_include_kpi requesting') - kpi_id = monitoring_client.SetKpi(create_kpi_request_c()) + kpi_id = monitoring_client.SetKpi(create_kpi_request('1')) + LOGGER.debug(str(kpi_id)) response = monitoring_client.IncludeKpi(include_kpi_request(kpi_id)) LOGGER.debug(str(response)) assert isinstance(response, Empty) @@ -261,44 +277,40 @@ def test_monitor_kpi( response = device_client.AddDevice(Device(**device_with_connect_rules)) assert response.device_uuid.uuid == DEVICE_DEV1_UUID - response = monitoring_client.SetKpi(create_kpi_request()) + response = monitoring_client.SetKpi(create_kpi_request('1')) _monitor_kpi_request = monitor_kpi_request(response.kpi_id.uuid, 120, 5) # pylint: disable=maybe-no-member response = monitoring_client.MonitorKpi(_monitor_kpi_request) LOGGER.debug(str(response)) assert isinstance(response, Empty) # Test case that makes use of client fixture to test server's QueryKpiData method -def test_query_kpi_data(monitoring_client): # pylint: disable=redefined-outer-name +def test_query_kpi_data(monitoring_client,subs_scheduler): # pylint: disable=redefined-outer-name + + kpi_id_list = [] + for i in range(2): + kpi_id = monitoring_client.SetKpi(create_kpi_request(str(i+1))) + subs_scheduler.add_job(ingestion_data, args=[kpi_id.kpi_id.uuid]) + kpi_id_list.append(kpi_id) LOGGER.warning('test_query_kpi_data') - response = monitoring_client.QueryKpiData(kpi_query()) + sleep(5) + response = monitoring_client.QueryKpiData(kpi_query(kpi_id_list)) LOGGER.debug(str(response)) - assert isinstance(response, KpiList) - -def test_ingestion_data(monitoring_client): - _kpi_id = monitoring_client.SetKpi(create_kpi_request_c()) - _include_kpi_request = include_kpi_request(_kpi_id) - - for i in range(100): - _include_kpi_request = include_kpi_request(_kpi_id) - monitoring_client.IncludeKpi(_include_kpi_request) - time.sleep(0.01) - -# def test_subscription_scheduler(monitoring_client,metrics_db,subs_scheduler): -# subs_scheduler.add_job(ingestion_data(monitoring_client),id="1") + assert isinstance(response, RawKpiTable) + if (subs_scheduler.state != STATE_STOPPED): + subs_scheduler.shutdown() # Test case that makes use of client fixture to test server's SetKpiSubscription method -def test_set_kpi_subscription(monitoring_client,metrics_db): # pylint: disable=redefined-outer-name +def test_set_kpi_subscription(monitoring_client,subs_scheduler): # pylint: disable=redefined-outer-name LOGGER.warning('test_set_kpi_subscription') - kpi_id = monitoring_client.SetKpi(create_kpi_request_c()) - # thread = threading.Thread(target=test_ingestion_data, args=(monitoring_client,metrics_db)) - # thread.start() - monitoring_client.IncludeKpi(include_kpi_request(kpi_id)) + kpi_id = monitoring_client.SetKpi(create_kpi_request('1')) + subs_scheduler.add_job(ingestion_data, args=[kpi_id.kpi_id.uuid]) response = monitoring_client.SetKpiSubscription(subs_descriptor(kpi_id)) assert isinstance(response, _MultiThreadedRendezvous) - LOGGER.debug(response) for item in response: LOGGER.debug(item) assert isinstance(item, SubsResponse) + if (subs_scheduler.state != STATE_STOPPED): + subs_scheduler.shutdown() # Test case that makes use of client fixture to test server's GetSubsDescriptor method def test_get_subs_descriptor(monitoring_client): @@ -331,7 +343,8 @@ def test_delete_subscription(monitoring_client): # Test case that makes use of client fixture to test server's SetKpiAlarm method def test_set_kpi_alarm(monitoring_client): LOGGER.warning('test_set_kpi_alarm') - response = monitoring_client.SetKpiAlarm(alarm_descriptor()) + kpi_id = monitoring_client.SetKpi(create_kpi_request_c()) + response = monitoring_client.SetKpiAlarm(alarm_descriptor(kpi_id)) LOGGER.debug(str(response)) assert isinstance(response, AlarmID) @@ -345,28 +358,35 @@ def test_get_alarms(monitoring_client): # Test case that makes use of client fixture to test server's GetAlarmDescriptor method def test_get_alarm_descriptor(monitoring_client): LOGGER.warning('test_get_alarm_descriptor') - alarm_id = monitoring_client.SetKpiAlarm(alarm_descriptor()) - response = monitoring_client.GetAlarmDescriptor(alarm_id) - LOGGER.debug(response) - assert isinstance(response, AlarmDescriptor) + _kpi_id = monitoring_client.SetKpi(create_kpi_request_c()) + _alarm_id = monitoring_client.SetKpiAlarm(alarm_descriptor(_kpi_id)) + _response = monitoring_client.GetAlarmDescriptor(_alarm_id) + LOGGER.debug(_response) + assert isinstance(_response, AlarmDescriptor) # Test case that makes use of client fixture to test server's GetAlarmResponseStream method -def test_get_alarm_response_stream(monitoring_client): +def test_get_alarm_response_stream(monitoring_client,subs_scheduler): LOGGER.warning('test_get_alarm_descriptor') - alarm_id = monitoring_client.SetKpiAlarm(alarm_descriptor()) - response = monitoring_client.GetAlarmResponseStream(alarm_subscription(alarm_id)) - assert isinstance(response, _MultiThreadedRendezvous) - for item in response: - LOGGER.debug(response) + _kpi_id = monitoring_client.SetKpi(create_kpi_request('3')) + _alarm_id = monitoring_client.SetKpiAlarm(alarm_descriptor(_kpi_id)) + subs_scheduler.add_job(ingestion_data,args=[_kpi_id.kpi_id.uuid]) + _response = monitoring_client.GetAlarmResponseStream(alarm_subscription(_alarm_id)) + assert isinstance(_response, _MultiThreadedRendezvous) + for item in _response: + LOGGER.debug(item) assert isinstance(item,AlarmResponse) + if(subs_scheduler.state != STATE_STOPPED): + subs_scheduler.shutdown() + # Test case that makes use of client fixture to test server's DeleteAlarm method def test_delete_alarm(monitoring_client): LOGGER.warning('test_delete_alarm') - alarm_id = monitoring_client.SetKpiAlarm(alarm_descriptor()) - response = monitoring_client.DeleteAlarm(alarm_id) - LOGGER.debug(type(response)) - assert isinstance(response, Empty) + _kpi_id = monitoring_client.SetKpi(create_kpi_request_c()) + _alarm_id = monitoring_client.SetKpiAlarm(alarm_descriptor(_kpi_id)) + _response = monitoring_client.DeleteAlarm(_alarm_id) + LOGGER.debug(type(_response)) + assert isinstance(_response, Empty) # Test case that makes use of client fixture to test server's GetStreamKpi method def test_get_stream_kpi(monitoring_client): # pylint: disable=redefined-outer-name @@ -384,64 +404,117 @@ def test_get_stream_kpi(monitoring_client): # pylint: disable=redefined-outer-na # response = monitoring_client.GetInstantKpi(kpi_id) # LOGGER.debug(response) # assert isinstance(response, Kpi) - # response = monitoring_client.GetInstantKpi(KpiId()) - # LOGGER.debug(type(response)) - # assert response.kpi_id.kpi_id.uuid == "NoID" -def test_managementdb_tools_insert_kpi(management_db): # pylint: disable=redefined-outer-name - LOGGER.warning('test_managementdb_tools_insert_kpi begin') - _create_kpi_request = create_kpi_request() - kpi_description = _create_kpi_request.kpi_description # pylint: disable=maybe-no-member - kpi_sample_type = _create_kpi_request.kpi_sample_type # pylint: disable=maybe-no-member - kpi_device_id = _create_kpi_request.device_id.device_uuid.uuid # pylint: disable=maybe-no-member - kpi_endpoint_id = _create_kpi_request.endpoint_id.endpoint_uuid.uuid # pylint: disable=maybe-no-member - kpi_service_id = _create_kpi_request.service_id.service_uuid.uuid # pylint: disable=maybe-no-member - - response = management_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id) - assert isinstance(response, int) -def test_managementdb_tools_get_kpi(management_db): # pylint: disable=redefined-outer-name - LOGGER.warning('test_managementdb_tools_get_kpi begin') - _create_kpi_request = create_kpi_request() +def test_managementdb_tools_kpis(management_db): # pylint: disable=redefined-outer-name + LOGGER.warning('test_managementdb_tools_kpis begin') + _create_kpi_request = create_kpi_request('5') kpi_description = _create_kpi_request.kpi_description # pylint: disable=maybe-no-member kpi_sample_type = _create_kpi_request.kpi_sample_type # pylint: disable=maybe-no-member kpi_device_id = _create_kpi_request.device_id.device_uuid.uuid # pylint: disable=maybe-no-member kpi_endpoint_id = _create_kpi_request.endpoint_id.endpoint_uuid.uuid # pylint: disable=maybe-no-member kpi_service_id = _create_kpi_request.service_id.service_uuid.uuid # pylint: disable=maybe-no-member + kpi_slice_id = _create_kpi_request.slice_id.slice_uuid.uuid + kpi_connection_id = _create_kpi_request.connection_id.connection_uuid.uuid + + _kpi_id = management_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id,kpi_slice_id,kpi_connection_id) + assert isinstance(_kpi_id, int) - _kpi_id = management_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id) response = management_db.get_KPI(_kpi_id) assert isinstance(response, tuple) -def test_managementdb_tools_get_kpis(management_db): # pylint: disable=redefined-outer-name - LOGGER.warning('test_managementdb_tools_get_kpis begin') + response = management_db.set_monitoring_flag(_kpi_id,True) + assert response is True + response = management_db.check_monitoring_flag(_kpi_id) + assert response is True + management_db.set_monitoring_flag(_kpi_id, False) + response = management_db.check_monitoring_flag(_kpi_id) + assert response is False + response = management_db.get_KPIS() assert isinstance(response, list) -def test_managementdb_tools_delete_kpi(management_db): # pylint: disable=redefined-outer-name - LOGGER.warning('test_managementdb_tools_get_kpi begin') - - _create_kpi_request = create_kpi_request() - kpi_description = _create_kpi_request.kpi_description # pylint: disable=maybe-no-member - kpi_sample_type = _create_kpi_request.kpi_sample_type # pylint: disable=maybe-no-member - kpi_device_id = _create_kpi_request.device_id.device_uuid.uuid # pylint: disable=maybe-no-member - kpi_endpoint_id = _create_kpi_request.endpoint_id.endpoint_uuid.uuid # pylint: disable=maybe-no-member - kpi_service_id = _create_kpi_request.service_id.service_uuid.uuid # pylint: disable=maybe-no-member - - _kpi_id = management_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, - kpi_service_id) - response = management_db.delete_KPI(_kpi_id) - assert response -def test_metrics_db_tools_write_kpi(metrics_db): # pylint: disable=redefined-outer-name - LOGGER.warning('test_metric_sdb_tools_write_kpi begin') - -def test_metrics_db_tools_read_kpi_points(metrics_db): # pylint: disable=redefined-outer-name - LOGGER.warning('test_metrics_db_tools_read_kpi_points begin') +def test_managementdb_tools_insert_alarm(management_db): + LOGGER.warning('test_managementdb_tools_insert_alarm begin') + _alarm_description = "Alarm Description" + _alarm_name = "Alarm Name" + _kpi_id = "3" + _kpi_min_value = 0.0 + _kpi_max_value = 250.0 + _in_range = True + _include_min_value = False + _include_max_value = True + _alarm_id = management_db.insert_alarm(_alarm_description, _alarm_name, _kpi_id, _kpi_min_value, + _kpi_max_value, + _in_range, _include_min_value, _include_max_value) + LOGGER.debug(_alarm_id) + assert isinstance(_alarm_id,int) +# +# def test_metrics_db_tools(metrics_db): # pylint: disable=redefined-outer-name +# LOGGER.warning('test_metric_sdb_tools_write_kpi begin') +# _kpiId = "6" +# +# for i in range(50): +# _kpiSampleType = KpiSampleType.Name(KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED).upper().replace('KPISAMPLETYPE_', '') +# _deviceId = 'DEV4' +# _endpointId = 'END4' +# _serviceId = 'SERV4' +# _sliceId = 'SLC4' +# _connectionId = 'CON4' +# _time_stamp = timestamp_utcnow_to_float() +# _kpi_value = 500*random() +# +# metrics_db.write_KPI(_time_stamp, _kpiId, _kpiSampleType, _deviceId, _endpointId, _serviceId, _sliceId, _connectionId, +# _kpi_value) +# sleep(0.05) +# +# _query = f"SELECT * FROM monitoring WHERE kpi_id ='{_kpiId}'" +# _data = metrics_db.run_query(_query) +# assert len(_data) >= 50 +# +# def test_subscription_manager_create_subscription(management_db,metrics_db,subs_scheduler): +# LOGGER.warning('test_subscription_manager_create_subscription begin') +# subs_queue = Queue() +# +# subs_manager = SubscriptionManager(metrics_db) +# +# subs_scheduler.add_job(ingestion_data) +# +# kpi_id = "3" +# sampling_duration_s = 20 +# sampling_interval_s = 3 +# real_start_time = timestamp_utcnow_to_float() +# start_timestamp = real_start_time +# end_timestamp = start_timestamp + sampling_duration_s +# +# subs_id = management_db.insert_subscription(kpi_id, "localhost", sampling_duration_s, +# sampling_interval_s,start_timestamp,end_timestamp) +# subs_manager.create_subscription(subs_queue,subs_id,kpi_id,sampling_interval_s, +# sampling_duration_s,start_timestamp,end_timestamp) +# +# # This is here to simulate application activity (which keeps the main thread alive). +# total_points = 0 +# while True: +# while not subs_queue.empty(): +# list = subs_queue.get_nowait() +# kpi_list = KpiList() +# for item in list: +# kpi = Kpi() +# kpi.kpi_id.kpi_id.uuid = item[0] +# kpi.timestamp.timestamp = timestamp_string_to_float(item[1]) +# kpi.kpi_value.floatVal = item[2] +# kpi_list.kpi.append(kpi) +# total_points += 1 +# LOGGER.debug(kpi_list) +# if timestamp_utcnow_to_float() > end_timestamp: +# break +# +# assert total_points != 0 def test_events_tools( context_client : ContextClient, # pylint: disable=redefined-outer-name