From 7b86b36eb0eff567a720926505c5c761e56414b7 Mon Sep 17 00:00:00 2001 From: "Georgios P. Katsikas" <gkatsikas@ubitech.eu> Date: Fri, 29 Nov 2024 13:12:16 +0000 Subject: [PATCH] feat: P4 endpoints support and several bug fixes --- src/device/service/driver_api/_Driver.py | 1 + src/device/service/drivers/p4/p4_common.py | 102 +++++++- src/device/service/drivers/p4/p4_context.py | 11 +- src/device/service/drivers/p4/p4_driver.py | 240 ++++++++++++----- src/device/service/drivers/p4/p4_manager.py | 276 ++++++++++++-------- 5 files changed, 451 insertions(+), 179 deletions(-) diff --git a/src/device/service/driver_api/_Driver.py b/src/device/service/driver_api/_Driver.py index 2580c3e78..bbb78cd43 100644 --- a/src/device/service/driver_api/_Driver.py +++ b/src/device/service/driver_api/_Driver.py @@ -25,6 +25,7 @@ RESOURCE_ROUTING_POLICIES = '__routing_policies__' RESOURCE_SERVICES = '__services__' RESOURCE_ACL = '__acl__' RESOURCE_INVENTORY = '__inventory__' +RESOURCE_RULES = "__rules__" class _Driver: diff --git a/src/device/service/drivers/p4/p4_common.py b/src/device/service/drivers/p4/p4_common.py index ec8514937..b55296a65 100644 --- a/src/device/service/drivers/p4/p4_common.py +++ b/src/device/service/drivers/p4/p4_common.py @@ -27,10 +27,12 @@ import math import re import socket import ipaddress +from typing import Any, Dict, List, Optional, Tuple from ctypes import c_uint16, sizeof import macaddress -from common.type_checkers.Checkers import chk_type +from common.type_checkers.Checkers import \ + chk_attribute, chk_string, chk_type, chk_issubclass try: from .p4_exception import UserBadValueError except ImportError: @@ -38,6 +40,7 @@ except ImportError: P4_ATTR_DEV_ID = "id" P4_ATTR_DEV_NAME = "name" +P4_ATTR_DEV_ENDPOINTS = "endpoints" P4_ATTR_DEV_VENDOR = "vendor" P4_ATTR_DEV_HW_VER = "hw_ver" P4_ATTR_DEV_SW_VER = "sw_ver" @@ -50,6 +53,7 @@ P4_VAL_DEF_HW_VER = "BMv2 simple_switch" P4_VAL_DEF_SW_VER = "Stratum" P4_VAL_DEF_TIMEOUT = 60 +RESOURCE_ENDPOINTS_ROOT_PATH = "/endpoints" # Logger instance LOGGER = logging.getLogger(__name__) @@ -422,6 +426,28 @@ def parse_action_parameters_from_json(resource): return action_params +def parse_replicas_from_json(resource): + """ + Parse the session replicas within a JSON-based object. + + :param resource: JSON-based object + :return: map of replicas + """ + if not resource or ("replicas" not in resource): + LOGGER.warning( + "JSON entry misses 'replicas' list of attributes") + return None + chk_type("replicas", resource["replicas"], list) + + replicas = {} + for rep in resource["replicas"]: + chk_type("egress-port", rep["egress-port"], int) + chk_type("instance", rep["instance"], int) + replicas[rep["egress-port"]] = rep["instance"] + + return replicas + + def parse_integer_list_from_json(resource, resource_list, resource_item): """ Parse the list of integers within a JSON-based object. @@ -443,3 +469,77 @@ def parse_integer_list_from_json(resource, resource_list, resource_item): integers_list.append(item[resource_item]) return integers_list + +def process_optional_string_field( + #TODO: Consider adding this in common methdos as it is taken by the Emulated driver + endpoint_data : Dict[str, Any], field_name : str, endpoint_resource_value : Dict[str, Any] +) -> None: + field_value = chk_attribute(field_name, endpoint_data, 'endpoint_data', default=None) + if field_value is None: return + chk_string('endpoint_data.{:s}'.format(field_name), field_value) + if len(field_value) > 0: endpoint_resource_value[field_name] = field_value + +def compose_resource_endpoints(endpoints_list : List[Tuple[str, Any]]): + #TODO: Consider adding this in common methods; currently taken by the Emulated driver + endpoint_resources = [] + for i, endpoint in enumerate(endpoints_list): + LOGGER.debug("P4 endpoint {}: {}".format(i, endpoint)) + endpoint_resource = compose_resource_endpoint(endpoint) + if endpoint_resource is None: continue + endpoint_resources.append(endpoint_resource) + return endpoint_resources + +def compose_resource_endpoint(endpoint_data : Dict[str, Any]) -> Optional[Tuple[str, Dict]]: + #TODO: Consider adding this in common methods; currently taken by the Emulated driver + try: + endpoint_uuid = chk_attribute('uuid', endpoint_data, 'endpoint_data') + chk_string('endpoint_data.uuid', endpoint_uuid, min_length=1) + endpoint_resource_path = RESOURCE_ENDPOINTS_ROOT_PATH + endpoint_resource_key = '{:s}/endpoint[{:s}]'.format(endpoint_resource_path, endpoint_uuid) + endpoint_resource_value = {'uuid': endpoint_uuid} + + # Check endpoint's optional string fields + process_optional_string_field(endpoint_data, 'name', endpoint_resource_value) + process_optional_string_field(endpoint_data, 'type', endpoint_resource_value) + process_optional_string_field(endpoint_data, 'context_uuid', endpoint_resource_value) + process_optional_string_field(endpoint_data, 'topology_uuid', endpoint_resource_value) + + return endpoint_resource_key, endpoint_resource_value + except: # pylint: disable=bare-except + LOGGER.error('Problem composing endpoint({:s})'.format(str(endpoint_data))) + return None + +def compose_resource_rules(rules_list : List[Tuple[str, Any]]): + rule_resources = [] + for i, rule in enumerate(rules_list): + rule_resource = compose_resource_rule(rule_data=rule, rule_cnt=i) + if rule_resource is None: continue + rule_resources.append(rule_resource) + return rule_resources + +def compose_resource_rule(rule_data : Dict[str, Any], rule_cnt : int) -> Optional[Tuple[str, Dict]]: + try: + LOGGER.info("Rule: {}".format(rule_data)) + + rule_resource_key = chk_attribute('resource_key', rule_data, 'rule_data') + chk_string('rule_data.resource_key', rule_resource_key, min_length=1) + + rule_resource_value = chk_attribute('resource_value', rule_data, 'rule_data') + chk_issubclass('rule_data.resource_value', rule_resource_value, dict) + + rule_key_unique = "" + + if "table" == rule_resource_key: + table_name = parse_resource_string_from_json(rule_resource_value, "table-name") + assert table_name, "Invalid table name in rule" + rule_key_unique = '/{0}s/{0}/{1}[{2}]'.format(rule_resource_key, table_name, rule_cnt) + else: + msg = f"Parsed an invalid key {rule_resource_key}" + LOGGER.error(msg) + raise Exception(msg) + + assert rule_key_unique, "Invalid unique resource key" + return rule_key_unique, rule_resource_value + except: # pylint: disable=bare-except + LOGGER.error('Problem composing rule({:s})'.format(str(rule_data))) + return None diff --git a/src/device/service/drivers/p4/p4_context.py b/src/device/service/drivers/p4/p4_context.py index ca8f0c19e..ce8e308e8 100644 --- a/src/device/service/drivers/p4/p4_context.py +++ b/src/device/service/drivers/p4/p4_context.py @@ -34,6 +34,7 @@ class P4Type(enum.Enum): meter = 6 direct_meter = 7 controller_packet_metadata = 8 + digest = 9 P4Type.table.p4info_name = "tables" @@ -44,6 +45,7 @@ P4Type.direct_counter.p4info_name = "direct_counters" P4Type.meter.p4info_name = "meters" P4Type.direct_meter.p4info_name = "direct_meters" P4Type.controller_packet_metadata.p4info_name = "controller_packet_metadata" +P4Type.digest.p4info_name = "digests" for object_type in P4Type: object_type.pretty_name = object_type.name.replace('_', ' ') @@ -58,11 +60,12 @@ class P4RuntimeEntity(enum.Enum): table_entry = 1 action_profile_member = 2 action_profile_group = 3 - meter_entry = 4 - direct_meter_entry = 5 - counter_entry = 6 - direct_counter_entry = 7 + counter_entry = 4 + direct_counter_entry = 5 + meter_entry = 6 + direct_meter_entry = 7 packet_replication_engine_entry = 8 + digest_entry = 9 class Context: diff --git a/src/device/service/drivers/p4/p4_driver.py b/src/device/service/drivers/p4/p4_driver.py index d31fa4673..c89a42bad 100644 --- a/src/device/service/drivers/p4/p4_driver.py +++ b/src/device/service/drivers/p4/p4_driver.py @@ -23,15 +23,19 @@ import threading from typing import Any, Iterator, List, Optional, Tuple, Union from common.method_wrappers.Decorator import MetricsPool, metered_subclass_method from common.type_checkers.Checkers import chk_type, chk_length, chk_string +from device.service.driver_api._Driver import RESOURCE_ENDPOINTS, RESOURCE_RULES from .p4_common import matches_ipv4, matches_ipv6, valid_port,\ - P4_ATTR_DEV_ID, P4_ATTR_DEV_NAME, P4_ATTR_DEV_VENDOR,\ - P4_ATTR_DEV_HW_VER, P4_ATTR_DEV_SW_VER,\ + compose_resource_endpoints, parse_resource_string_from_json,\ + P4_ATTR_DEV_ID, P4_ATTR_DEV_NAME, P4_ATTR_DEV_ENDPOINTS,\ + P4_ATTR_DEV_VENDOR, P4_ATTR_DEV_HW_VER, P4_ATTR_DEV_SW_VER,\ P4_ATTR_DEV_P4BIN, P4_ATTR_DEV_P4INFO, P4_ATTR_DEV_TIMEOUT,\ P4_VAL_DEF_VENDOR, P4_VAL_DEF_HW_VER, P4_VAL_DEF_SW_VER,\ P4_VAL_DEF_TIMEOUT -from .p4_manager import P4Manager, KEY_TABLE, KEY_ACTION, \ - KEY_ACTION_PROFILE, KEY_COUNTER, KEY_DIR_COUNTER, KEY_METER, KEY_DIR_METER,\ - KEY_CTL_PKT_METADATA +from .p4_manager import P4Manager, \ + KEY_TABLE, KEY_ACTION, KEY_ACTION_PROFILE, \ + KEY_COUNTER, KEY_DIR_COUNTER, KEY_METER, KEY_DIR_METER,\ + KEY_CTL_PKT_METADATA, KEY_DIGEST, KEY_CLONE_SESSION,\ + KEY_ENDPOINT from .p4_client import WriteOperation try: @@ -59,6 +63,8 @@ class P4Driver(_Driver): P4 device datapath ID (Mandatory) name : str P4 device name (Optional) + endpoints : list + List of P4 device endpoints, i.e., ports (Optional) vendor : str P4 device vendor (Optional) hw_ver : str @@ -70,17 +76,22 @@ class P4Driver(_Driver): p4info : str Path to P4 info file (Optional, but must be combined with p4bin) timeout : int - Device timeout in seconds (Optional) + P4 device timeout in seconds (Optional) + rules : list + List of rules to configure the P4 device's pipeline """ def __init__(self, address: str, port: int, **settings) -> None: - super().__init__(settings.pop('name', DRIVER_NAME), address, port, **settings) + super().__init__(name=DRIVER_NAME, address=address, port=port, setting=settings) self.__manager = None self.__address = address self.__port = int(port) - self.__endpoint = None + self.__grpc_endpoint = None self.__settings = settings self.__id = None + self.__name = None + self.__endpoints = [] + self.__rules = {} self.__vendor = P4_VAL_DEF_VENDOR self.__hw_version = P4_VAL_DEF_HW_VER self.__sw_version = P4_VAL_DEF_SW_VER @@ -97,7 +108,7 @@ class P4Driver(_Driver): self.__address, self.__port) for key, value in settings.items(): - LOGGER.info("\t%8s = %s", key, value) + LOGGER.info("\t%9s = %s", key, value) def Connect(self) -> bool: """ @@ -105,14 +116,14 @@ class P4Driver(_Driver): :return: boolean connection status. """ - LOGGER.info("Connecting to P4 device %s ...", self.__endpoint) + LOGGER.info("Connecting to P4 device %s ...", self.__grpc_endpoint) with self.__lock: # Skip if already connected if self.__started.is_set(): return True - # Dynamically devise an election ID + # TODO: Dynamically devise an election ID election_id = (1, 0) # Spawn a P4 manager for this device @@ -140,7 +151,7 @@ class P4Driver(_Driver): :return: boolean disconnection status. """ - LOGGER.info("Disconnecting from P4 device %s ...", self.__endpoint) + LOGGER.info("Disconnecting from P4 device %s ...", self.__grpc_endpoint) # If not started, assume it is already disconnected if not self.__started.is_set(): @@ -167,13 +178,15 @@ class P4Driver(_Driver): :return: list of initial configuration items. """ - initial_conf = [] + + resource_keys = [RESOURCE_ENDPOINTS] if self.__endpoints else [] with self.__lock: - if not initial_conf: - LOGGER.warning("No initial configuration for P4 device %s ...", - self.__endpoint) - return [] + if not resource_keys: + LOGGER.warning("No initial configuration for P4 device {} ...".format(self.__grpc_endpoint)) + return [] + LOGGER.info("Initial configuration for P4 device {}:".format(self.__grpc_endpoint)) + return self.GetConfig(resource_keys) @metered_subclass_method(METRICS_POOL) def GetConfig(self, resource_keys: List[str] = [])\ @@ -186,7 +199,7 @@ class P4Driver(_Driver): None/Exception. """ LOGGER.info( - "Getting configuration from P4 device %s ...", self.__endpoint) + "Getting configuration from P4 device %s ...", self.__grpc_endpoint) # No resource keys means fetch all configuration if len(resource_keys) == 0: @@ -195,7 +208,7 @@ class P4Driver(_Driver): "implies getting all resource keys!") resource_keys = [ obj_name for obj_name, _ in self.__manager.p4_objects.items() - ] + ] + [RESOURCE_ENDPOINTS] + [RESOURCE_RULES] # Verify the input type chk_type("resources", resource_keys, list) @@ -214,7 +227,7 @@ class P4Driver(_Driver): changes requested. """ LOGGER.info( - "Setting configuration to P4 device %s ...", self.__endpoint) + "Setting configuration to P4 device %s ...", self.__grpc_endpoint) if not resources or len(resources) == 0: LOGGER.warning( @@ -238,7 +251,7 @@ class P4Driver(_Driver): deletions requested. """ LOGGER.info( - "Deleting configuration from P4 device %s ...", self.__endpoint) + "Deleting configuration from P4 device %s ...", self.__grpc_endpoint) if not resources or len(resources) == 0: LOGGER.warning( @@ -308,6 +321,14 @@ class P4Driver(_Driver): """ return self.__manager + def is_started(self): + """ + Check if an instance of the P4 manager is started. + + :return: boolean P4 manager instance status + """ + return self.__started.is_set() + def __parse_and_validate_settings(self): """ Verify that the driver inputs comply to what is expected. @@ -319,7 +340,7 @@ class P4Driver(_Driver): f"{self.__address} not a valid IPv4 or IPv6 address" assert valid_port(self.__port), \ f"{self.__port} not a valid transport port" - self.__endpoint = f"{self.__address}:{self.__port}" + self.__grpc_endpoint = f"{self.__address}:{self.__port}" # Device ID try: @@ -337,6 +358,16 @@ class P4Driver(_Driver): "No device name is provided. Setting default name: %s", self.__name) + # Device endpoints + if P4_ATTR_DEV_ENDPOINTS in self.__settings: + endpoints = self.__settings.get(P4_ATTR_DEV_ENDPOINTS, []) + endpoint_resources = compose_resource_endpoints(endpoints) + if endpoint_resources: + LOGGER.info("Setting endpoints: {}".format(endpoint_resources)) + self.SetConfig(endpoint_resources) + else: + LOGGER.warning("No device endpoints are provided.") + # Device vendor if P4_ATTR_DEV_VENDOR in self.__settings: self.__vendor = self.__settings.get(P4_ATTR_DEV_VENDOR) @@ -365,7 +396,7 @@ class P4Driver(_Driver): if P4_ATTR_DEV_P4BIN in self.__settings: self.__p4bin_path = self.__settings.get(P4_ATTR_DEV_P4BIN) assert os.path.exists(self.__p4bin_path),\ - "Invalid path to p4bin file" + "Invalid path to p4bin file: {}".format(self.__p4bin_path) assert P4_ATTR_DEV_P4INFO in self.__settings,\ "p4info and p4bin settings must be provided together" @@ -373,7 +404,7 @@ class P4Driver(_Driver): if P4_ATTR_DEV_P4INFO in self.__settings: self.__p4info_path = self.__settings.get(P4_ATTR_DEV_P4INFO) assert os.path.exists(self.__p4info_path),\ - "Invalid path to p4info file" + "Invalid path to p4info file: {}".format(self.__p4info_path) assert P4_ATTR_DEV_P4BIN in self.__settings,\ "p4info and p4bin settings must be provided together" @@ -404,7 +435,7 @@ class P4Driver(_Driver): """ resources = [] - LOGGER.debug("GetConfig() -> Keys: %s", resource_keys) + LOGGER.info("GetConfig() -> Keys: {}".format(resource_keys)) for resource_key in resource_keys: entries = [] @@ -423,8 +454,7 @@ class P4Driver(_Driver): entries.append(c_entries) elif KEY_DIR_COUNTER == resource_key: for d_cnt_name in self.__manager.get_direct_counter_names(): - dc_entries = \ - self.__manager.direct_counter_entries_to_json( + dc_entries = self.__manager.direct_counter_entries_to_json( d_cnt_name) if dc_entries: entries.append(dc_entries) @@ -436,28 +466,35 @@ class P4Driver(_Driver): entries.append(m_entries) elif KEY_DIR_METER == resource_key: for d_meter_name in self.__manager.get_direct_meter_names(): - dm_entries = \ - self.__manager.direct_meter_entries_to_json( + dm_entries = self.__manager.direct_meter_entries_to_json( d_meter_name) if dm_entries: entries.append(dm_entries) elif KEY_ACTION_PROFILE == resource_key: for ap_name in self.__manager.get_action_profile_names(): - ap_entries = \ - self.__manager.action_prof_member_entries_to_json( + ap_entries = self.__manager.action_prof_member_entries_to_json( ap_name) if ap_entries: entries.append(ap_entries) elif KEY_ACTION == resource_key: - #To be implemented or deprecated - pass - elif '__endpoints__' == resource_key: - #Not Supported for P4 devices + # To be implemented or deprecated pass elif KEY_CTL_PKT_METADATA == resource_key: + #TODO: Handle controller packet metadata msg = f"{resource_key.capitalize()} is not a " \ f"retrievable resource" - raise Exception(msg) + LOGGER.warning("%s", msg) + elif KEY_DIGEST == resource_key: + #TODO: Handle digests + msg = f"{resource_key.capitalize()} is not a " \ + f"retrievable resource" + LOGGER.warning("%s", msg) + elif RESOURCE_ENDPOINTS == resource_key: + resources += self.__endpoints + continue + elif RESOURCE_RULES == resource_key: + resources = self.__rules_into_resources() + continue else: msg = f"GetConfig failed due to invalid " \ f"resource key: {resource_key}" @@ -465,8 +502,10 @@ class P4Driver(_Driver): resources.append( (resource_key, entries if entries else None) ) - except Exception as ex: # pylint: disable=broad-except - resources.append((resource_key, ex)) + except Exception as e: # pylint: disable=broad-except + resources.append((resource_key, e)) + + LOGGER.info("GetConfig() -> Results: %s", resources) return resources @@ -480,6 +519,8 @@ class P4Driver(_Driver): """ results = [] + LOGGER.info("SetConfig -> Resources {}".format(resources)) + for i, resource in enumerate(resources): str_resource_name = f"resources[#{i}]" resource_key = "" @@ -499,11 +540,15 @@ class P4Driver(_Driver): continue try: - resource_value = json.loads(resource_value) - except Exception: # pylint: disable=broad-except - pass + # Rules are JSON-based, endpoints are not + if "endpoint" not in resource_key: + resource_value = json.loads(resource_value) + except Exception as e: # pylint: disable=broad-except + LOGGER.exception("Exception validating resource value {}".format(resource_value)) + results.append(e) + continue - LOGGER.debug( + LOGGER.info( "SetConfig() -> Key: %s - Value: %s", resource_key, resource_value) @@ -512,13 +557,22 @@ class P4Driver(_Driver): # to be inserted already exists, thus simply needs an update. operation = WriteOperation.insert + # Dataplane and cache rule insertion process try: - self.__apply_operation(resource_key, resource_value, operation) - results.append(True) - except Exception as ex: # pylint: disable=broad-except - results.append(ex) + r2, r3 = False, True + r1 = self.__cache_rule_insert(resource_key, resource_value, operation) + # Cache insertion succeeded, proceed to dataplane + if r1: + r2 = self.__apply_operation(resource_key, resource_value, operation) + # Dataplane insertion did not succeed --> Revert caching + if not r2 and r1: + r3 = self.__cache_rule_remove(resource_key) + results.append(r1 & r2 & r3) + except Exception as e: # pylint: disable=broad-except + results.append(e) + continue - print(results) + LOGGER.info("SetConfig() -> Results: {}".format(results)) return results @@ -552,21 +606,31 @@ class P4Driver(_Driver): try: resource_value = json.loads(resource_value) - except Exception: # pylint: disable=broad-except - pass + except Exception as e: # pylint: disable=broad-except + results.append(e) + continue - LOGGER.debug("DeleteConfig() -> Key: %s - Value: %s", + LOGGER.info("DeleteConfig() -> Key: %s - Value: %s", resource_key, resource_value) operation = WriteOperation.delete + # Dataplane and cache rule removal process try: - self.__apply_operation(resource_key, resource_value, operation) - results.append(True) - except Exception as ex: # pylint: disable=broad-except - results.append(ex) + r2, r3 = False, True + r1 = self.__cache_rule_remove(resource_key) + # Cache removal succeeded, proceed to dataplane + if r1: + r2 = self.__apply_operation(resource_key, resource_value, operation) + # Dataplane removal did not succeed --> Revert caching + if not r2 and r1: + r3 = self.__cache_rule_insert(resource_key, resource_value, WriteOperation.insert) + results.append(r1 & r2 & r3) + except Exception as e: # pylint: disable=broad-except + results.append(e) + continue - print(results) + LOGGER.info("DeleteConfig() -> Results: {}".format(results)) return results @@ -583,35 +647,85 @@ class P4Driver(_Driver): """ # Apply settings to the various tables - if KEY_TABLE == resource_key: + if KEY_TABLE in resource_key: self.__manager.table_entry_operation_from_json( resource_value, operation) - elif KEY_COUNTER == resource_key: + elif KEY_COUNTER in resource_key: self.__manager.counter_entry_operation_from_json( resource_value, operation) - elif KEY_DIR_COUNTER == resource_key: + elif KEY_DIR_COUNTER in resource_key: self.__manager.direct_counter_entry_operation_from_json( resource_value, operation) - elif KEY_METER == resource_key: + elif KEY_METER in resource_key: self.__manager.meter_entry_operation_from_json( resource_value, operation) - elif KEY_DIR_METER == resource_key: + elif KEY_DIR_METER in resource_key: self.__manager.direct_meter_entry_operation_from_json( resource_value, operation) - elif KEY_ACTION_PROFILE == resource_key: + elif KEY_ACTION_PROFILE in resource_key: self.__manager.action_prof_member_entry_operation_from_json( resource_value, operation) self.__manager.action_prof_group_entry_operation_from_json( resource_value, operation) - elif KEY_CTL_PKT_METADATA == resource_key: + elif KEY_CLONE_SESSION in resource_key: + self.__manager.clone_session_entry_operation_from_json( + resource_value, operation) + elif KEY_CTL_PKT_METADATA in resource_key: msg = f"{resource_key.capitalize()} is not a " \ f"configurable resource" raise Exception(msg) + elif KEY_DIGEST in resource_key: + msg = f"{resource_key.capitalize()} is not a " \ + f"configurable resource" + raise Exception(msg) + elif KEY_ENDPOINT in resource_key: + self.__endpoints.append((resource_key, resource_value)) else: msg = f"{operation} on invalid key {resource_key}" LOGGER.error(msg) raise Exception(msg) - LOGGER.debug("%s operation: %s", resource_key.capitalize(), operation) + return True + + def __cache_rule_insert(self, resource_key, resource_value, operation): + """ + Insert a new rule into the rule cache or update an existing one. + + :param resource_key: P4 resource key + :param resource_value: P4 resource value in JSON format + :param operation: write operation (i.e., insert, update) to apply + :return: True if new rule is inserted or existing is updated, otherwise False + """ + if (resource_key in self.__rules.keys()) and (operation == WriteOperation.insert): + LOGGER.error("Attempting to insert an existing rule key: {}".format(resource_key)) + return False + elif (resource_key not in self.__rules.keys()) and (operation == WriteOperation.update): + LOGGER.error("Attempting to update a non-existing rule key: {}".format(resource_key)) + return False + elif (resource_key in self.__rules.keys()) and (operation == WriteOperation.update): + LOGGER.warning("Updating an existing rule key: {}".format(resource_key)) + self.__rules[resource_key] = resource_value + return True + + def __cache_rule_remove(self, resource_key): + """ + Remove an existing rule from the rule cache. + :param resource_key: P4 resource key + :return: True if existing rule is removed, otherwise False + """ + if resource_key not in self.__rules.keys(): + LOGGER.error("Attempting to remove a non-existing rule key: {}".format(resource_key)) + return False + self.__rules.pop(resource_key) return True + + def __rules_into_resources(self): + """ + Transform rules from the driver's rule map into + resources exposed through the SBI API. + """ + resource_list = [] + for rule_key, rule_val in self.__rules.items(): + resource_list.append((rule_key, rule_val)) + return resource_list diff --git a/src/device/service/drivers/p4/p4_manager.py b/src/device/service/drivers/p4/p4_manager.py index f6684412a..210422ed8 100644 --- a/src/device/service/drivers/p4/p4_manager.py +++ b/src/device/service/drivers/p4/p4_manager.py @@ -35,7 +35,8 @@ try: from .p4_common import encode,\ parse_resource_string_from_json, parse_resource_integer_from_json,\ parse_resource_bytes_from_json, parse_match_operations_from_json,\ - parse_action_parameters_from_json, parse_integer_list_from_json + parse_action_parameters_from_json, parse_integer_list_from_json,\ + parse_replicas_from_json from .p4_exception import UserError, InvalidP4InfoError except ImportError: from p4_client import P4RuntimeClient, P4RuntimeException,\ @@ -58,6 +59,7 @@ CONTEXT = Context() CLIENTS = {} # Constant P4 entities +KEYS_P4 = [] KEY_TABLE = "table" KEY_ACTION = "action" KEY_ACTION_PROFILE = "action_profile" @@ -66,6 +68,11 @@ KEY_DIR_COUNTER = "direct_counter" KEY_METER = "meter" KEY_DIR_METER = "direct_meter" KEY_CTL_PKT_METADATA = "controller_packet_metadata" +KEY_DIGEST = "digest" + +# Extra resource keys +KEY_CLONE_SESSION = "clone_session" +KEY_ENDPOINT = "endpoint" def get_context(): @@ -83,19 +90,20 @@ def get_table_type(table): :param table: P4 table :return: P4 table type """ - for m_f in table.match_fields: - if m_f.match_type == p4info_pb2.MatchField.EXACT: - return p4info_pb2.MatchField.EXACT - if m_f.match_type == p4info_pb2.MatchField.LPM: - return p4info_pb2.MatchField.LPM - if m_f.match_type == p4info_pb2.MatchField.TERNARY: - return p4info_pb2.MatchField.TERNARY - if m_f.match_type == p4info_pb2.MatchField.RANGE: - return p4info_pb2.MatchField.RANGE - if m_f.match_type == p4info_pb2.MatchField.OPTIONAL: - return p4info_pb2.MatchField.OPTIONAL - return None + is_ternary = False + for m_f in table.match_fields: + # LPM and range are special forms of ternary + if m_f.match_type in [ + p4info_pb2.MatchField.TERNARY, + p4info_pb2.MatchField.LPM, + p4info_pb2.MatchField.RANGE + ]: + is_ternary = True + + if is_ternary: + return p4info_pb2.MatchField.TERNARY + return p4info_pb2.MatchField.EXACT def match_type_to_str(match_type): """ @@ -132,12 +140,12 @@ class P4Manager: self.__id = device_id self.__ip_address = ip_address self.__port = int(port) - self.__endpoint = f"{self.__ip_address}:{self.__port}" + self.__grpc_endpoint = f"{self.__ip_address}:{self.__port}" self.key_id = ip_address+str(port) CLIENTS[self.key_id] = P4RuntimeClient( - self.__id, self.__endpoint, election_id, role_name, ssl_options) + self.__id, self.__grpc_endpoint, election_id, role_name, ssl_options) self.__p4info = None - + self.local_client = CLIENTS[self.key_id] # Internal memory for whitebox management @@ -146,14 +154,14 @@ class P4Manager: # | -> P4 entities self.table_entries = {} + self.action_profile_members = {} + self.action_profile_groups = {} self.counter_entries = {} self.direct_counter_entries = {} self.meter_entries = {} self.direct_meter_entries = {} - self.multicast_groups = {} self.clone_session_entries = {} - self.action_profile_members = {} - self.action_profile_groups = {} + self.multicast_groups = {} def start(self, p4bin_path, p4info_path): """ @@ -234,7 +242,7 @@ class P4Manager: self.__id = None self.__ip_address = None self.__port = None - self.__endpoint = None + self.__grpc_endpoint = None self.__clear_state() def __clear_state(self): @@ -244,14 +252,14 @@ class P4Manager: :return: void """ self.table_entries.clear() + self.action_profile_members.clear() + self.action_profile_groups.clear() self.counter_entries.clear() self.direct_counter_entries.clear() self.meter_entries.clear() self.direct_meter_entries.clear() - self.multicast_groups.clear() self.clone_session_entries.clear() - self.action_profile_members.clear() - self.action_profile_groups.clear() + self.multicast_groups.clear() self.p4_objects.clear() def __init_objects(self): @@ -264,7 +272,7 @@ class P4Manager: global KEY_TABLE, KEY_ACTION, KEY_ACTION_PROFILE, \ KEY_COUNTER, KEY_DIR_COUNTER, \ KEY_METER, KEY_DIR_METER, \ - KEY_CTL_PKT_METADATA + KEY_CTL_PKT_METADATA, KEY_DIGEST, KEYS_P4 KEY_TABLE = P4Type.table.name KEY_ACTION = P4Type.action.name @@ -274,12 +282,15 @@ class P4Manager: KEY_METER = P4Type.meter.name KEY_DIR_METER = P4Type.direct_meter.name KEY_CTL_PKT_METADATA = P4Type.controller_packet_metadata.name - assert (k for k in [ + KEY_DIGEST = P4Type.digest.name + + KEYS_P4 = [ KEY_TABLE, KEY_ACTION, KEY_ACTION_PROFILE, KEY_COUNTER, KEY_DIR_COUNTER, KEY_METER, KEY_DIR_METER, - KEY_CTL_PKT_METADATA - ]) + KEY_CTL_PKT_METADATA, KEY_DIGEST + ] + assert (k for k in KEYS_P4) if not self.p4_objects: LOGGER.warning( @@ -292,6 +303,11 @@ class P4Manager: for table in self.p4_objects[KEY_TABLE]: self.table_entries[table.name] = [] + if KEY_ACTION_PROFILE in self.p4_objects: + for act_prof in self.p4_objects[KEY_ACTION_PROFILE]: + self.action_profile_members[act_prof.name] = [] + self.action_profile_groups[act_prof.name] = [] + if KEY_COUNTER in self.p4_objects: for cnt in self.p4_objects[KEY_COUNTER]: self.counter_entries[cnt.name] = [] @@ -308,11 +324,6 @@ class P4Manager: for d_meter in self.p4_objects[KEY_DIR_METER]: self.direct_meter_entries[d_meter.name] = [] - if KEY_ACTION_PROFILE in self.p4_objects: - for act_prof in self.p4_objects[KEY_ACTION_PROFILE]: - self.action_profile_members[act_prof.name] = [] - self.action_profile_groups[act_prof.name] = [] - def __discover_objects(self): """ Discover and store all P4 objects. @@ -509,6 +520,20 @@ class P4Manager: return pkt_meta return None + def get_digest(self, digest_name): + """ + Get a digest object by name. + + :param digest_name: name of a digest object + :return: digest object or None + """ + if KEY_DIGEST not in self.p4_objects: + return None + for dg in self.p4_objects[KEY_DIGEST]: + if dg == digest_name.name: + return digest_name + return None + def get_resource_keys(self): """ Retrieve the available P4 resource keys. @@ -561,15 +586,15 @@ class P4Manager: self.table_entries[table_name] = [] try: - for count, table_entry in enumerate( - TableEntry(self.local_client, table_name)(action=action_name).read()): - LOGGER.debug( - "Table %s - Entry %d\n%s", table_name, count, table_entry) + entries = TableEntry(self.local_client, table_name).read() + assert self.local_client + for table_entry in entries: self.table_entries[table_name].append(table_entry) return self.table_entries[table_name] except P4RuntimeException as ex: - LOGGER.error(ex) - return [] + LOGGER.error("Failed to get table %s entries: %s", + table_name, str(ex)) + return [] def table_entries_to_json(self, table_name): """ @@ -634,10 +659,14 @@ class P4Manager: :return: number of P4 table entries or negative integer upon missing table """ - entries = self.get_table_entries(table_name, action_name) - if entries is None: - return -1 - return len(entries) + count = 0 + try: + entries = TableEntry(self.local_client, table_name).read() + count = sum(1 for _ in entries) + except Exception as e: # pylint: disable=broad-except + LOGGER.error("Failed to read entries of table: %s", table_name) + + return count def count_table_entries_all(self): """ @@ -675,7 +704,7 @@ class P4Manager: metadata = parse_resource_bytes_from_json(json_resource, "metadata") if operation in [WriteOperation.insert, WriteOperation.update]: - LOGGER.debug("Table entry to insert/update: %s", json_resource) + LOGGER.info("Table entry to insert/update: %s", json_resource) return self.insert_table_entry( table_name=table_name, match_map=match_map, @@ -685,7 +714,7 @@ class P4Manager: metadata=metadata if metadata else None ) if operation == WriteOperation.delete: - LOGGER.debug("Table entry to delete: %s", json_resource) + LOGGER.info("Table entry to delete: %s", json_resource) return self.delete_table_entry( table_name=table_name, match_map=match_map, @@ -700,7 +729,7 @@ class P4Manager: cnt_pkt=-1, cnt_byte=-1): """ Insert an entry into an exact match table. - + :param table_name: P4 table name :param match_map: Map of match operations :param action_name: Action name @@ -712,45 +741,45 @@ class P4Manager: """ assert match_map, "Table entry without match operations is not accepted" assert action_name, "Table entry without action is not accepted" - + table_entry = TableEntry(self.local_client, table_name)(action=action_name) - + for match_k, match_v in match_map.items(): table_entry.match[match_k] = match_v - + for action_k, action_v in action_params.items(): table_entry.action[action_k] = action_v - + if metadata: table_entry.metadata = metadata - + if cnt_pkt > 0: table_entry.counter_data.packet_count = cnt_pkt - + if cnt_byte > 0: table_entry.counter_data.byte_count = cnt_byte - + ex_msg = "" try: table_entry.insert() LOGGER.info("Inserted exact table entry: %s", table_entry) except (P4RuntimeException, P4RuntimeWriteException) as ex: - raise P4RuntimeException from ex - + ex_msg = str(ex) + LOGGER.warning(ex) + # Table entry exists, needs to be modified if "ALREADY_EXISTS" in ex_msg: table_entry.modify() LOGGER.info("Updated exact table entry: %s", table_entry) - + return table_entry - - + def insert_table_entry_ternary(self, table_name, match_map, action_name, action_params, metadata, priority, cnt_pkt=-1, cnt_byte=-1): """ Insert an entry into a ternary match table. - + :param table_name: P4 table name :param match_map: Map of match operations :param action_name: Action name @@ -763,47 +792,47 @@ class P4Manager: """ assert match_map, "Table entry without match operations is not accepted" assert action_name, "Table entry without action is not accepted" - + table_entry = TableEntry(self.local_client, table_name)(action=action_name) - + for match_k, match_v in match_map.items(): table_entry.match[match_k] = match_v - + for action_k, action_v in action_params.items(): table_entry.action[action_k] = action_v - + table_entry.priority = priority - + if metadata: table_entry.metadata = metadata - + if cnt_pkt > 0: table_entry.counter_data.packet_count = cnt_pkt - + if cnt_byte > 0: table_entry.counter_data.byte_count = cnt_byte - + ex_msg = "" try: table_entry.insert() LOGGER.info("Inserted ternary table entry: %s", table_entry) except (P4RuntimeException, P4RuntimeWriteException) as ex: - raise P4RuntimeException from ex - + ex_msg = str(ex) + LOGGER.error(ex) + # Table entry exists, needs to be modified if "ALREADY_EXISTS" in ex_msg: table_entry.modify() LOGGER.info("Updated ternary table entry: %s", table_entry) - + return table_entry - - + def insert_table_entry_range(self, table_name, match_map, action_name, action_params, metadata, priority, cnt_pkt=-1, cnt_byte=-1): # pylint: disable=unused-argument """ Insert an entry into a range match table. - + :param table_name: P4 table name :param match_map: Map of match operations :param action_name: Action name @@ -816,17 +845,16 @@ class P4Manager: """ assert match_map, "Table entry without match operations is not accepted" assert action_name, "Table entry without action is not accepted" - + raise NotImplementedError( "Range-based table insertion not implemented yet") - - + def insert_table_entry_optional(self, table_name, match_map, action_name, action_params, metadata, priority, cnt_pkt=-1, cnt_byte=-1): # pylint: disable=unused-argument """ Insert an entry into an optional match table. - + :param table_name: P4 table name :param match_map: Map of match operations :param action_name: Action name @@ -839,7 +867,7 @@ class P4Manager: """ assert match_map, "Table entry without match operations is not accepted" assert action_name, "Table entry without action is not accepted" - + raise NotImplementedError( "Optional-based table insertion not implemented yet") @@ -869,32 +897,36 @@ class P4Manager: assert table, \ "P4 pipeline does not implement table " + table_name - if not get_table_type(table): + table_type = get_table_type(table) + + if not table_type: msg = f"Table {table_name} is undefined, cannot insert entry" LOGGER.error(msg) raise UserError(msg) + LOGGER.debug("Table {}: {}".format(table_name, match_type_to_str(table_type))) + # Exact match is supported - if get_table_type(table) == p4info_pb2.MatchField.EXACT: + if table_type == p4info_pb2.MatchField.EXACT: return self.insert_table_entry_exact( table_name, match_map, action_name, action_params, metadata, cnt_pkt, cnt_byte) # Ternary and LPM matches are supported - if get_table_type(table) in \ + if table_type in \ [p4info_pb2.MatchField.TERNARY, p4info_pb2.MatchField.LPM]: return self.insert_table_entry_ternary( table_name, match_map, action_name, action_params, metadata, priority, cnt_pkt, cnt_byte) # TODO: Cover RANGE match # pylint: disable=W0511 - if get_table_type(table) == p4info_pb2.MatchField.RANGE: + if table_type == p4info_pb2.MatchField.RANGE: return self.insert_table_entry_range( table_name, match_map, action_name, action_params, metadata, priority, cnt_pkt, cnt_byte) # TODO: Cover OPTIONAL match # pylint: disable=W0511 - if get_table_type(table) == p4info_pb2.MatchField.OPTIONAL: + if table_type == p4info_pb2.MatchField.OPTIONAL: return self.insert_table_entry_optional( table_name, match_map, action_name, action_params, metadata, priority, cnt_pkt, cnt_byte) @@ -917,7 +949,9 @@ class P4Manager: assert table, \ "P4 pipeline does not implement table " + table_name - if not get_table_type(table): + table_type = get_table_type(table) + + if not table_type: msg = f"Table {table_name} is undefined, cannot delete entry" LOGGER.error(msg) raise UserError(msg) @@ -930,7 +964,7 @@ class P4Manager: for action_k, action_v in action_params.items(): table_entry.action[action_k] = action_v - if get_table_type(table) in \ + if table_type in \ [p4info_pb2.MatchField.TERNARY, p4info_pb2.MatchField.LPM]: if priority == 0: msg = f"Table {table_name} is ternary, priority must be != 0" @@ -938,15 +972,25 @@ class P4Manager: raise UserError(msg) # TODO: Ensure correctness of RANGE & OPTIONAL # pylint: disable=W0511 - if get_table_type(table) in \ + if table_type in \ [p4info_pb2.MatchField.RANGE, p4info_pb2.MatchField.OPTIONAL]: raise NotImplementedError( "Range and optional-based table deletion not implemented yet") table_entry.priority = priority - table_entry.delete() - LOGGER.info("Deleted entry %s from table: %s", table_entry, table_name) + ex_msg = "" + try: + table_entry.delete() + LOGGER.info("Deleted entry %s from table: %s", table_entry, table_name) + except (P4RuntimeException, P4RuntimeWriteException) as ex: + ex_msg = str(ex) + LOGGER.warning(ex) + + # Table entry exists, needs to be modified + if "NOT_FOUND" in ex_msg: + # TODO: No way to discriminate between a modified entry and an actual table miss + LOGGER.warning("Table entry was initially modified, thus cannot be removed: %s", table_entry) return table_entry @@ -1172,7 +1216,8 @@ class P4Manager: self.counter_entries[cnt_name].append(cnt_entry) return self.counter_entries[cnt_name] except P4RuntimeException as ex: - LOGGER.error(ex) + LOGGER.error("Failed to get counter %s entries: %s", + cnt_name, str(ex)) return [] def counter_entries_to_json(self, cnt_name): @@ -1620,7 +1665,8 @@ class P4Manager: self.meter_entries[meter_name].append(meter_entry) return self.meter_entries[meter_name] except P4RuntimeException as ex: - LOGGER.error(ex) + LOGGER.error("Failed to get meter %s entries: %s", + meter_name, str(ex)) return [] def meter_entries_to_json(self, meter_name): @@ -1852,7 +1898,8 @@ class P4Manager: self.direct_meter_entries[d_meter_name].append(d_meter_entry) return self.direct_meter_entries[d_meter_name] except P4RuntimeException as ex: - LOGGER.error(ex) + LOGGER.error("Failed to get direct meter %s entries: %s", + d_meter_name, str(ex)) return [] def direct_meter_entries_to_json(self, d_meter_name): @@ -2094,7 +2141,8 @@ class P4Manager: self.action_profile_members[ap_name].append(ap_entry) return self.action_profile_members[ap_name] except P4RuntimeException as ex: - LOGGER.error(ex) + LOGGER.error("Failed to get action profile member %s entries: %s", + ap_name, str(ex)) return [] def action_prof_member_entries_to_json(self, ap_name): @@ -2357,7 +2405,8 @@ class P4Manager: self.action_profile_groups[ap_name].append(ap_entry) return self.action_profile_groups[ap_name] except P4RuntimeException as ex: - LOGGER.error(ex) + LOGGER.error("Failed to get action profile group %s entries: %s", + ap_name, str(ex)) return [] def count_action_prof_group_entries(self, ap_name): @@ -2880,14 +2929,13 @@ class P4Manager: json_resource, "session-id") if operation in [WriteOperation.insert, WriteOperation.update]: - ports = parse_integer_list_from_json( - json_resource, "ports", "port") + replicas = parse_replicas_from_json(json_resource) LOGGER.debug( "Clone session entry to insert/update: %s", json_resource) return self.insert_clone_session_entry( session_id=session_id, - ports=ports + replicas=replicas ) if operation == WriteOperation.delete: LOGGER.debug( @@ -2897,22 +2945,24 @@ class P4Manager: ) return None - def insert_clone_session_entry(self, session_id, ports): + def insert_clone_session_entry(self, session_id, replicas): """ Insert a new clone session. :param session_id: id of a clone session - :param ports: list of egress ports to clone session + :param replicas: list of egress ports to clone session :return: inserted clone session """ assert session_id > 0, \ "Clone session " + session_id + " must be > 0" - assert ports, \ - "No clone session ports are provided" + assert replicas, \ + "No clone session replicas are provided" + assert isinstance(replicas, dict), \ + "Clone session replicas must be a dictionary" session = CloneSessionEntry(self.local_client, session_id) - for p in ports: - session.add(p, 1) + for eg_port,instance in replicas.items(): + session.add(eg_port, instance) ex_msg = "" try: @@ -2943,12 +2993,15 @@ class P4Manager: "Clone session " + session_id + " must be > 0" session = CloneSessionEntry(self.local_client, session_id) - session.delete() + + try: + session.delete() + LOGGER.info("Deleted clone session %d", session_id) + except (P4RuntimeException, P4RuntimeWriteException) as ex: + LOGGER.error(ex) if session_id in self.clone_session_entries: del self.clone_session_entries[session_id] - LOGGER.info( - "Deleted clone session %d", session_id) return session @@ -3786,6 +3839,7 @@ class _P4EntityBase(_EntityBase): def __init__(self, p4_client, p4_type, entity_type, p4runtime_cls, name=None, modify_only=False): super().__init__(p4_client, entity_type, p4runtime_cls, modify_only) + assert self.local_client, "No local P4 client instance" self._p4_type = p4_type if name is None: raise UserError( @@ -3815,7 +3869,7 @@ class ActionProfileMember(_P4EntityBase): """ def __init__(self, p4_client, action_profile_name=None): - super().__init__( p4_client, + super().__init__(p4_client, P4Type.action_profile, P4RuntimeEntity.action_profile_member, p4runtime_pb2.ActionProfileMember, action_profile_name) self.member_id = 0 @@ -3981,7 +4035,7 @@ class ActionProfileGroup(_P4EntityBase): """ def __init__(self, p4_client, action_profile_name=None): - super().__init__( p4_client, + super().__init__(p4_client, P4Type.action_profile, P4RuntimeEntity.action_profile_group, p4runtime_pb2.ActionProfileGroup, action_profile_name) self.group_id = 0 @@ -5055,7 +5109,7 @@ class CounterEntry(_CounterEntryBase): """ def __init__(self, p4_client, counter_name=None): - super().__init__( p4_client, + super().__init__(p4_client, P4Type.counter, P4RuntimeEntity.counter_entry, p4runtime_pb2.CounterEntry, counter_name, modify_only=True) @@ -5115,11 +5169,11 @@ To write to the counter, use <self>.modify class DirectCounterEntry(_CounterEntryBase): """ Direct P4 counter entry. - """ + """ local_client = None def __init__(self, p4_client, direct_counter_name=None): - super().__init__( p4_client, + super().__init__(p4_client, P4Type.direct_counter, P4RuntimeEntity.direct_counter_entry, p4runtime_pb2.DirectCounterEntry, direct_counter_name, modify_only=True) @@ -5213,7 +5267,7 @@ class _MeterEntryBase(_P4EntityBase): """ def __init__(self, p4_client, *args, **kwargs): - super().__init__(*args, **kwargs) + super().__init__(p4_client, *args, **kwargs) self._meter_type = self._info.spec.unit self.index = -1 self.cir = -1 @@ -5910,7 +5964,7 @@ class IdleTimeoutNotification(): """ P4 idle timeout notification. """ - + local_client = None def __init__(self, p4_client): -- GitLab