Loading src/service/service/service_handlers/p4_fabric_tna_int/p4_fabric_tna_int_config.py +6 −0 Original line number Diff line number Diff line Loading @@ -36,6 +36,8 @@ LOGGER = logging.getLogger(__name__) INT_COLLECTOR_INFO = "int_collector_info" INT_REPORT_MIRROR_ID_LIST = "int_report_mirror_id_list" PORT_INT = "int_port" # In-band Network Telemetry transport port (of the collector) DURATION_SEC = "duration_sec" INTERVAL_SEC = "interval_sec" # INT tables TABLE_INT_WATCHLIST = "FabricIngress.int_watchlist.watchlist" Loading @@ -51,6 +53,10 @@ INT_REPORT_TYPE_FLOW = 1 INT_REPORT_TYPE_QUEUE = 2 INT_REPORT_TYPE_DROP = 4 # INT collection timings DEF_DURATION_SEC = 3000 DEF_INTERVAL_SEC = 1 def rules_set_up_int_watchlist(action : ConfigActionEnum) -> List [Tuple]: # type: ignore rule_no = cache_rule(TABLE_INT_WATCHLIST, action) Loading src/service/service/service_handlers/p4_fabric_tna_int/p4_fabric_tna_int_service_handler.py +166 −39 Original line number Diff line number Diff line Loading @@ -22,8 +22,12 @@ https://p4.org/p4-spec/docs/INT_v0_5.pdf import logging from typing import Any, List, Optional, Tuple, Union from uuid import uuid4 from common.method_wrappers.Decorator import MetricsPool, metered_subclass_method from common.proto.context_pb2 import ConfigActionEnum, DeviceId, Service, Device from common.proto.context_pb2 import ConfigActionEnum, ContextIdList, DeviceId, Service, Device, Empty from common.proto.kpi_manager_pb2 import KpiId, KpiDescriptor from common.proto.kpi_sample_types_pb2 import KpiSampleType from common.proto.telemetry_frontend_pb2 import Collector, CollectorId from common.tools.object_factory.Device import json_device_id from common.type_checkers.Checkers import chk_type, chk_address_mac, chk_address_ipv4,\ chk_transport_port, chk_vlan_id Loading @@ -32,6 +36,10 @@ from service.service.service_handler_api.SettingsHandler import SettingsHandler from service.service.service_handlers.p4_fabric_tna_commons.p4_fabric_tna_commons import * from 
service.service.task_scheduler.TaskExecutor import TaskExecutor from context.client.ContextClient import ContextClient from kpi_manager.client.KpiManagerClient import KpiManagerClient from telemetry.frontend.client.TelemetryFrontendClient import TelemetryFrontendClient from .p4_fabric_tna_int_config import * LOGGER = logging.getLogger(__name__) Loading Loading @@ -63,6 +71,9 @@ class P4FabricINTServiceHandler(_ServiceHandler): self._parse_settings() self._print_settings() # TODO: Check whether the Telemetry service is up before issuing this call self._start_collector() @metered_subclass_method(METRICS_POOL) def SetEndpoint( self, endpoints : List[Tuple[str, str, Optional[str]]], connection_uuid : Optional[str] = None Loading Loading @@ -296,10 +307,13 @@ class P4FabricINTServiceHandler(_ServiceHandler): def _init_settings(self): self.__switch_info = {} self.__int_collector_info = {} self.__int_collector_iface = "" self.__int_collector_mac = "" self.__int_collector_ip = "" self.__int_collector_port = -1 self.__int_vlan_id = -1 self.__int_vlan_id = DEF_VLAN self.__int_collector_duration_s = DEF_DURATION_SEC self.__int_collector_interval_s = DEF_INTERVAL_SEC try: self.__settings = self.__settings_handler.get('/settings') Loading @@ -311,13 +325,14 @@ class P4FabricINTServiceHandler(_ServiceHandler): def _parse_settings(self): try: switch_info = self.__settings.value[SWITCH_INFO] assert isinstance(switch_info, list), "Switch info object must be a list" except Exception as ex: LOGGER.error("Failed to parse service settings: {}".format(ex)) raise Exception(ex) assert isinstance(switch_info, list), "Switch info object must be a list" for switch in switch_info: for switch_name, sw_info in switch.items(): try: assert switch_name, "Invalid P4 switch name" assert isinstance(sw_info, dict), "Switch {} info must be a map with arch, dpid, mac, ip, and int_port items)" assert sw_info[ARCH] in SUPPORTED_TARGET_ARCH_LIST, \ Loading @@ -336,11 +351,18 @@ class 
P4FabricINTServiceHandler(_ServiceHandler): sw_info[RECIRCULATION_PORT_LIST] = RECIRCULATION_PORTS_V1MODEL sw_info[INT_REPORT_MIRROR_ID_LIST] = INT_REPORT_MIRROR_ID_LIST_V1MODEL assert isinstance(sw_info[RECIRCULATION_PORT_LIST], list), "Switch {} - Recirculation ports must be described as a list".format(switch_name) except Exception as ex: LOGGER.error("Failed to parse switch {} information".format(switch_name)) return self.__switch_info[switch_name] = sw_info try: self.__int_collector_info = self.__settings.value[INT_COLLECTOR_INFO] assert isinstance(self.__int_collector_info, dict), "INT collector info object must be a map with mac, ip, port, and vlan_id keys)" self.__int_collector_iface = self.__int_collector_info[IFACE] assert self.__int_collector_iface, "Invalid P4 INT collector network interface" self.__int_collector_mac = self.__int_collector_info[MAC] assert chk_address_mac(self.__int_collector_mac), "Invalid P4 INT collector MAC address" Loading @@ -350,8 +372,27 @@ class P4FabricINTServiceHandler(_ServiceHandler): self.__int_collector_port = self.__int_collector_info[PORT] assert chk_transport_port(self.__int_collector_port), "Invalid P4 INT collector transport port" if self.__int_collector_info[VLAN_ID] > 0: self.__int_vlan_id = self.__int_collector_info[VLAN_ID] assert chk_vlan_id(self.__int_vlan_id), "Invalid VLAN ID" assert chk_vlan_id(self.__int_vlan_id), "Invalid VLAN ID for INT" else: LOGGER.warning("No or invalid INT VLAN ID is provided. Default VLAN ID is set to {} (No VLAN)".\ format(self.__int_vlan_id)) if self.__int_collector_info[DURATION_SEC] > 0: self.__int_collector_duration_s = self.__int_collector_info[DURATION_SEC] else: LOGGER.warning("No or invalid INT collection duration is provided. 
Default duration is set to {} seconds".\ format(self.__int_collector_duration_s)) if self.__int_collector_info[INTERVAL_SEC] > 0: self.__int_collector_interval_s = self.__int_collector_info[INTERVAL_SEC] else: LOGGER.warning("No or invalid INT collection interval is provided. Default interval is set to {} seconds".\ format(self.__int_collector_interval_s)) except Exception as ex: LOGGER.error("Failed to parse INT collector information") return def _print_settings(self): LOGGER.info("-------------------- {} settings --------------------".format(self.__service.name)) Loading @@ -366,10 +407,13 @@ class P4FabricINTServiceHandler(_ServiceHandler): LOGGER.info("\t\t| INT port type: {}".format(switch_info[PORT_INT][PORT_TYPE])) LOGGER.info("\t\t| Recirculation port list: {}".format(switch_info[RECIRCULATION_PORT_LIST])) LOGGER.info("\t\t| Report mirror ID list: {}".format(switch_info[INT_REPORT_MIRROR_ID_LIST])) LOGGER.info("--- INT collector interface: {}".format(self.__int_collector_iface)) LOGGER.info("--- INT collector MAC: {}".format(self.__int_collector_mac)) LOGGER.info("--- INT collector IP: {}".format(self.__int_collector_ip)) LOGGER.info("--- INT collector port: {}".format(self.__int_collector_port)) LOGGER.info("--- INT VLAN ID: {}".format(self.__int_vlan_id)) LOGGER.info("--- INT collector duration: {} sec".format(self.__int_collector_duration_s)) LOGGER.info("--- INT collector interval: {} sec".format(self.__int_collector_interval_s)) LOGGER.info("-----------------------------------------------------------------") def _create_rules(self, device_obj : Device, action : ConfigActionEnum): # type: ignore Loading Loading @@ -474,3 +518,86 @@ class P4FabricINTServiceHandler(_ServiceHandler): raise Exception(ex) return rules def _retrieve_context_for_int_collector(self): ctx_id = service_id = dev_id = ep_id = None try: context_client = ContextClient() response : ContextIdList = context_client.ListContextIds(Empty()) # type: ignore # Get the context ctx_id = 
response.context_ids[0].context_uuid.uuid assert ctx_id, "Cannot create INT collector with invalid context ID" LOGGER.debug("Context ID: {}".format(ctx_id)) service_id = self.__service.service_id.service_uuid.uuid assert service_id, "Cannot create INT collector with invalid service ID" LOGGER.debug("Service ID: {}".format(service_id)) # Get a service endpoint svc_endpoints = self.__service.service_endpoint_ids[0] assert svc_endpoints, "Cannot create INT collector: No service endpoints are established" # Get a P4 device associated with this endpoint dev_id = svc_endpoints.device_id.device_uuid.uuid assert dev_id, "Cannot create INT collector with invalid device ID" LOGGER.debug("Device ID: {}".format(dev_id)) # Get the endpoint ID ep_id = svc_endpoints.endpoint_uuid.uuid assert ep_id, "Cannot create INT collector with invalid endpoint ID" LOGGER.debug("Endpoint ID: {}".format(ep_id)) except Exception as ex: LOGGER.error("Failed to retrieve context for starting the INT collector: {}".format(ex)) raise ex return ctx_id, service_id, dev_id, ep_id def _start_collector(self): ctx_id = service_id = dev_id = ep_id = None try: ctx_id, service_id, dev_id, ep_id = self._retrieve_context_for_int_collector() except Exception: LOGGER.error("INT collector cannot be initialized due to missing information") return # Create a "virtual" INT KPI associated with this context and P4 dataplane kpi_id_int = None try: kpi_descriptor_int = KpiDescriptor() kpi_descriptor_int.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_UNKNOWN kpi_descriptor_int.service_id.service_uuid.uuid = service_id kpi_descriptor_int.device_id.device_uuid.uuid = dev_id kpi_descriptor_int.endpoint_id.endpoint_uuid.uuid = ep_id kpi_descriptor_int.kpi_id.kpi_id.uuid = str(uuid4()) # Set this new KPI kpi_manager_client = KpiManagerClient() kpi_id_int: KpiId = kpi_manager_client.SetKpiDescriptor(kpi_descriptor_int) # type: ignore LOGGER.debug("INT KPI ID: {}".format(kpi_id_int)) except Exception: LOGGER.error("INT collector 
cannot be initialized due to failed KPI initialization") return # Initialize an INT collector object try: collect_int = Collector() collect_int.collector_id.collector_id.uuid = str(uuid4()) collect_int.kpi_id.kpi_id.uuid = kpi_id_int.kpi_id.uuid collect_int.duration_s = self.__int_collector_duration_s collect_int.interval_s = self.__int_collector_interval_s collect_int.int_collector.interface = self.__int_collector_iface collect_int.int_collector.transport_port = self.__int_collector_port collect_int.int_collector.service_id = service_id collect_int.int_collector.context_id = ctx_id LOGGER.info("INT Collector: {}".format(str(collect_int))) telemetry_frontend_client = TelemetryFrontendClient() collect_id: CollectorId = telemetry_frontend_client.StartCollector(collect_int) # type: ignore assert collect_id.uuid, "INT collector failed to start" except Exception: LOGGER.error("INT collector cannot be initialized") return LOGGER.info("INT collector with ID {} is successfully invoked".format(collect_id)) src/tests/p4-fabric-tna/descriptors/service-p4-int.json +3 −1 Original line number Diff line number Diff line Loading @@ -42,7 +42,9 @@ "mac": "3e:87:de:3d:6d:33", "ip": "192.168.5.131", "port": 12345, "vlan_id": 4094 "vlan_id": 4094, "duration_sec": 2000, "interval_sec": 1 } } } Loading Loading
src/service/service/service_handlers/p4_fabric_tna_int/p4_fabric_tna_int_config.py +6 −0 Original line number Diff line number Diff line Loading @@ -36,6 +36,8 @@ LOGGER = logging.getLogger(__name__) INT_COLLECTOR_INFO = "int_collector_info" INT_REPORT_MIRROR_ID_LIST = "int_report_mirror_id_list" PORT_INT = "int_port" # In-band Network Telemetry transport port (of the collector) DURATION_SEC = "duration_sec" INTERVAL_SEC = "interval_sec" # INT tables TABLE_INT_WATCHLIST = "FabricIngress.int_watchlist.watchlist" Loading @@ -51,6 +53,10 @@ INT_REPORT_TYPE_FLOW = 1 INT_REPORT_TYPE_QUEUE = 2 INT_REPORT_TYPE_DROP = 4 # INT collection timings DEF_DURATION_SEC = 3000 DEF_INTERVAL_SEC = 1 def rules_set_up_int_watchlist(action : ConfigActionEnum) -> List [Tuple]: # type: ignore rule_no = cache_rule(TABLE_INT_WATCHLIST, action) Loading
src/service/service/service_handlers/p4_fabric_tna_int/p4_fabric_tna_int_service_handler.py +166 −39 Original line number Diff line number Diff line Loading @@ -22,8 +22,12 @@ https://p4.org/p4-spec/docs/INT_v0_5.pdf import logging from typing import Any, List, Optional, Tuple, Union from uuid import uuid4 from common.method_wrappers.Decorator import MetricsPool, metered_subclass_method from common.proto.context_pb2 import ConfigActionEnum, DeviceId, Service, Device from common.proto.context_pb2 import ConfigActionEnum, ContextIdList, DeviceId, Service, Device, Empty from common.proto.kpi_manager_pb2 import KpiId, KpiDescriptor from common.proto.kpi_sample_types_pb2 import KpiSampleType from common.proto.telemetry_frontend_pb2 import Collector, CollectorId from common.tools.object_factory.Device import json_device_id from common.type_checkers.Checkers import chk_type, chk_address_mac, chk_address_ipv4,\ chk_transport_port, chk_vlan_id Loading @@ -32,6 +36,10 @@ from service.service.service_handler_api.SettingsHandler import SettingsHandler from service.service.service_handlers.p4_fabric_tna_commons.p4_fabric_tna_commons import * from service.service.task_scheduler.TaskExecutor import TaskExecutor from context.client.ContextClient import ContextClient from kpi_manager.client.KpiManagerClient import KpiManagerClient from telemetry.frontend.client.TelemetryFrontendClient import TelemetryFrontendClient from .p4_fabric_tna_int_config import * LOGGER = logging.getLogger(__name__) Loading Loading @@ -63,6 +71,9 @@ class P4FabricINTServiceHandler(_ServiceHandler): self._parse_settings() self._print_settings() # TODO: Check whether the Telemetry service is up before issuing this call self._start_collector() @metered_subclass_method(METRICS_POOL) def SetEndpoint( self, endpoints : List[Tuple[str, str, Optional[str]]], connection_uuid : Optional[str] = None Loading Loading @@ -296,10 +307,13 @@ class P4FabricINTServiceHandler(_ServiceHandler): def _init_settings(self): 
self.__switch_info = {} self.__int_collector_info = {} self.__int_collector_iface = "" self.__int_collector_mac = "" self.__int_collector_ip = "" self.__int_collector_port = -1 self.__int_vlan_id = -1 self.__int_vlan_id = DEF_VLAN self.__int_collector_duration_s = DEF_DURATION_SEC self.__int_collector_interval_s = DEF_INTERVAL_SEC try: self.__settings = self.__settings_handler.get('/settings') Loading @@ -311,13 +325,14 @@ class P4FabricINTServiceHandler(_ServiceHandler): def _parse_settings(self): try: switch_info = self.__settings.value[SWITCH_INFO] assert isinstance(switch_info, list), "Switch info object must be a list" except Exception as ex: LOGGER.error("Failed to parse service settings: {}".format(ex)) raise Exception(ex) assert isinstance(switch_info, list), "Switch info object must be a list" for switch in switch_info: for switch_name, sw_info in switch.items(): try: assert switch_name, "Invalid P4 switch name" assert isinstance(sw_info, dict), "Switch {} info must be a map with arch, dpid, mac, ip, and int_port items)" assert sw_info[ARCH] in SUPPORTED_TARGET_ARCH_LIST, \ Loading @@ -336,11 +351,18 @@ class P4FabricINTServiceHandler(_ServiceHandler): sw_info[RECIRCULATION_PORT_LIST] = RECIRCULATION_PORTS_V1MODEL sw_info[INT_REPORT_MIRROR_ID_LIST] = INT_REPORT_MIRROR_ID_LIST_V1MODEL assert isinstance(sw_info[RECIRCULATION_PORT_LIST], list), "Switch {} - Recirculation ports must be described as a list".format(switch_name) except Exception as ex: LOGGER.error("Failed to parse switch {} information".format(switch_name)) return self.__switch_info[switch_name] = sw_info try: self.__int_collector_info = self.__settings.value[INT_COLLECTOR_INFO] assert isinstance(self.__int_collector_info, dict), "INT collector info object must be a map with mac, ip, port, and vlan_id keys)" self.__int_collector_iface = self.__int_collector_info[IFACE] assert self.__int_collector_iface, "Invalid P4 INT collector network interface" self.__int_collector_mac = 
self.__int_collector_info[MAC] assert chk_address_mac(self.__int_collector_mac), "Invalid P4 INT collector MAC address" Loading @@ -350,8 +372,27 @@ class P4FabricINTServiceHandler(_ServiceHandler): self.__int_collector_port = self.__int_collector_info[PORT] assert chk_transport_port(self.__int_collector_port), "Invalid P4 INT collector transport port" if self.__int_collector_info[VLAN_ID] > 0: self.__int_vlan_id = self.__int_collector_info[VLAN_ID] assert chk_vlan_id(self.__int_vlan_id), "Invalid VLAN ID" assert chk_vlan_id(self.__int_vlan_id), "Invalid VLAN ID for INT" else: LOGGER.warning("No or invalid INT VLAN ID is provided. Default VLAN ID is set to {} (No VLAN)".\ format(self.__int_vlan_id)) if self.__int_collector_info[DURATION_SEC] > 0: self.__int_collector_duration_s = self.__int_collector_info[DURATION_SEC] else: LOGGER.warning("No or invalid INT collection duration is provided. Default duration is set to {} seconds".\ format(self.__int_collector_duration_s)) if self.__int_collector_info[INTERVAL_SEC] > 0: self.__int_collector_interval_s = self.__int_collector_info[INTERVAL_SEC] else: LOGGER.warning("No or invalid INT collection interval is provided. 
# Methods of P4FabricINTServiceHandler (the class header is outside this span).
def _retrieve_context_for_int_collector(self):
    """Gather the identifiers required to register an INT collector.

    Asks the Context service for its context IDs and reads the service,
    device and endpoint UUIDs from the service object held by this handler.
    The first listed context and the first service endpoint are used.

    Returns:
        Tuple ``(ctx_id, service_id, dev_id, ep_id)``; every item is non-empty.

    Raises:
        ValueError: If a required identifier is missing or empty.
        Exception: Any other failure (e.g., raised by the Context client) is
            logged and re-raised unchanged.
    """
    try:
        context_client = ContextClient()
        response : ContextIdList = context_client.ListContextIds(Empty()) # type: ignore

        # Get the context; check for emptiness first so that a missing context
        # is reported explicitly instead of as an opaque IndexError
        if not response.context_ids:
            raise ValueError("Cannot create INT collector: No contexts are available")
        ctx_id = response.context_ids[0].context_uuid.uuid
        if not ctx_id:
            raise ValueError("Cannot create INT collector with invalid context ID")
        LOGGER.debug("Context ID: {}".format(ctx_id))

        service_id = self.__service.service_id.service_uuid.uuid
        if not service_id:
            raise ValueError("Cannot create INT collector with invalid service ID")
        LOGGER.debug("Service ID: {}".format(service_id))

        # Get a service endpoint; the emptiness check must precede the indexing,
        # otherwise this error message can never be produced
        if not self.__service.service_endpoint_ids:
            raise ValueError("Cannot create INT collector: No service endpoints are established")
        svc_endpoint = self.__service.service_endpoint_ids[0]

        # Get a P4 device associated with this endpoint
        dev_id = svc_endpoint.device_id.device_uuid.uuid
        if not dev_id:
            raise ValueError("Cannot create INT collector with invalid device ID")
        LOGGER.debug("Device ID: {}".format(dev_id))

        # Get the endpoint ID
        ep_id = svc_endpoint.endpoint_uuid.uuid
        if not ep_id:
            raise ValueError("Cannot create INT collector with invalid endpoint ID")
        LOGGER.debug("Endpoint ID: {}".format(ep_id))
    except Exception as ex:
        LOGGER.error("Failed to retrieve context for starting the INT collector: {}".format(ex))
        raise  # bare raise keeps the original traceback

    return ctx_id, service_id, dev_id, ep_id


def _start_collector(self):
    """Register an INT KPI and request the Telemetry frontend to start a collector.

    The method is best-effort: each of its three stages (context retrieval,
    KPI registration, collector start) logs its own failure and makes the
    method return early without raising.
    """
    try:
        ctx_id, service_id, dev_id, ep_id = self._retrieve_context_for_int_collector()
    except Exception:
        # The cause has already been logged by the helper above
        LOGGER.error("INT collector cannot be initialized due to missing information")
        return

    # Create a "virtual" INT KPI associated with this context and P4 dataplane
    try:
        kpi_descriptor_int = KpiDescriptor()
        kpi_descriptor_int.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_UNKNOWN
        kpi_descriptor_int.service_id.service_uuid.uuid = service_id
        kpi_descriptor_int.device_id.device_uuid.uuid = dev_id
        kpi_descriptor_int.endpoint_id.endpoint_uuid.uuid = ep_id
        kpi_descriptor_int.kpi_id.kpi_id.uuid = str(uuid4())

        # Set this new KPI
        kpi_manager_client = KpiManagerClient()
        kpi_id_int: KpiId = kpi_manager_client.SetKpiDescriptor(kpi_descriptor_int) # type: ignore
        LOGGER.debug("INT KPI ID: {}".format(kpi_id_int))
    except Exception:
        # LOGGER.exception keeps the message and appends the traceback of the cause
        LOGGER.exception("INT collector cannot be initialized due to failed KPI initialization")
        return

    # Initialize an INT collector object
    try:
        collect_int = Collector()
        collect_int.collector_id.collector_id.uuid = str(uuid4())
        collect_int.kpi_id.kpi_id.uuid = kpi_id_int.kpi_id.uuid
        collect_int.duration_s = self.__int_collector_duration_s
        collect_int.interval_s = self.__int_collector_interval_s
        collect_int.int_collector.interface = self.__int_collector_iface
        collect_int.int_collector.transport_port = self.__int_collector_port
        collect_int.int_collector.service_id = service_id
        collect_int.int_collector.context_id = ctx_id
        LOGGER.info("INT Collector: {}".format(str(collect_int)))

        telemetry_frontend_client = TelemetryFrontendClient()
        collect_id: CollectorId = telemetry_frontend_client.StartCollector(collect_int) # type: ignore
        # The UUID is read through the same `collector_id.uuid` path that is used
        # to fill `collect_int.collector_id` above.
        # NOTE(review): assumes Collector.collector_id is a CollectorId message;
        # confirm against telemetry_frontend.proto
        if not collect_id.collector_id.uuid:
            raise ValueError("INT collector failed to start")
    except Exception:
        LOGGER.exception("INT collector cannot be initialized")
        return

    LOGGER.info("INT collector with ID {} is successfully invoked".format(collect_id))
src/tests/p4-fabric-tna/descriptors/service-p4-int.json +3 −1 Original line number Diff line number Diff line Loading @@ -42,7 +42,9 @@ "mac": "3e:87:de:3d:6d:33", "ip": "192.168.5.131", "port": 12345, "vlan_id": 4094 "vlan_id": 4094, "duration_sec": 2000, "interval_sec": 1 } } } Loading