diff --git a/hackfest/p4/setup.sh b/hackfest/p4/setup.sh index 07fe22e6aea2341c50462010b4bfb55c4a657a47..195327a03fedafdc64a2d0dc34577766eda72a4f 100755 --- a/hackfest/p4/setup.sh +++ b/hackfest/p4/setup.sh @@ -4,5 +4,5 @@ export POD_NAME=$(kubectl get pods -n=tfs | grep device | awk '{print $1}') kubectl exec ${POD_NAME} -n=tfs -- mkdir /root/p4 -kubectl cp src/tests/netx22-p4/p4/p4info.txt tfs/${POD_NAME}:/root/p4 -kubectl cp src/tests/netx22-p4/p4/bmv2.json tfs/${POD_NAME}:/root/p4 +kubectl cp hackfest/p4/p4/p4info.txt tfs/${POD_NAME}:/root/p4 +kubectl cp hackfest/p4/p4/bmv2.json tfs/${POD_NAME}:/root/p4 diff --git a/hackfest/p4/tests/Objects.py b/hackfest/p4/tests/Objects.py index 09b3aced843a198b7c963a34492a4fe2379c9123..4d8fb3352b2417afbdf4ffb56e32cf5720355f6e 100644 --- a/hackfest/p4/tests/Objects.py +++ b/hackfest/p4/tests/Objects.py @@ -1,4 +1,5 @@ # Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -42,6 +43,8 @@ PACKET_PORT_SAMPLE_TYPES = [ KpiSampleType.KPISAMPLETYPE_BYTES_RECEIVED, ] +# ----- Device Credentials and Settings -------------------------------------------------------------------------------- + # ----- Devices -------------------------------------------------------------------------------------------------------- @@ -78,9 +81,38 @@ DEVICE_SW1_CONNECT_RULES = json_device_connect_rules( } ) +DEVICE_SW2_UUID = 'SW2' +DEVICE_SW2_TIMEOUT = 60 +DEVICE_SW2_ID = json_device_id(DEVICE_SW2_UUID) +DEVICE_SW2 = json_device_p4_disabled(DEVICE_SW2_UUID) -################################## TABLE ENTRIES ################################## +DEVICE_SW2_DPID = 1 +DEVICE_SW2_NAME = DEVICE_SW2_UUID +DEVICE_SW2_IP_ADDR = '10.0.2.10' +DEVICE_SW2_PORT = '50002' +DEVICE_SW2_VENDOR = 'Open Networking Foundation' +DEVICE_SW2_HW_VER = 'BMv2 simple_switch' +DEVICE_SW2_SW_VER = 'Stratum' +DEVICE_SW2_BIN_PATH = '/root/p4/bmv2.json' +DEVICE_SW2_INFO_PATH = '/root/p4/p4info.txt' + +DEVICE_SW2_CONNECT_RULES = json_device_connect_rules( + DEVICE_SW2_IP_ADDR, + DEVICE_SW2_PORT, + { + 'id': DEVICE_SW2_DPID, + 'name': DEVICE_SW2_NAME, + 'vendor': DEVICE_SW2_VENDOR, + 'hw_ver': DEVICE_SW2_HW_VER, + 'sw_ver': DEVICE_SW2_SW_VER, + 'timeout': DEVICE_SW2_TIMEOUT, + 'p4bin': DEVICE_SW2_BIN_PATH, + 'p4info': DEVICE_SW2_INFO_PATH + } +) + +################################## TABLE ENTRIES ################################## DEVICE_SW1_CONFIG_TABLE_ENTRIES = [ json_config_rule_set( @@ -123,6 +155,8 @@ DEVICE_SW1_CONFIG_TABLE_ENTRIES = [ ) ] +DEVICE_SW2_CONFIG_TABLE_ENTRIES = DEVICE_SW1_CONFIG_TABLE_ENTRIES + """ DEVICE_SW1_CONFIG_TABLE_ENTRIES = [ @@ -171,7 +205,6 @@ DEVICE_SW1_CONFIG_TABLE_ENTRIES = [ ################################## TABLE DECONF ################################## - DEVICE_SW1_DECONF_TABLE_ENTRIES = [ json_config_rule_delete( 'table', @@ -213,6 +246,7 @@ DEVICE_SW1_DECONF_TABLE_ENTRIES = [ ) ] +DEVICE_SW2_DECONF_TABLE_ENTRIES = DEVICE_SW1_DECONF_TABLE_ENTRIES """ @@ -271,6 +305,7 @@ TOPOLOGIES = [TOPOLOGY] DEVICES = [ (DEVICE_SW1, DEVICE_SW1_CONNECT_RULES, DEVICE_SW1_CONFIG_TABLE_ENTRIES, DEVICE_SW1_DECONF_TABLE_ENTRIES), + (DEVICE_SW2, DEVICE_SW2_CONNECT_RULES, DEVICE_SW2_CONFIG_TABLE_ENTRIES, DEVICE_SW2_DECONF_TABLE_ENTRIES), ] LINKS = [] diff --git a/hackfest/p4/tests/test_functional_cleanup.py b/hackfest/p4/tests/test_functional_cleanup.py index 32f716f1c2287b11bae3610022d64659d82ba73d..ccbcb9843a03bbf095743af0753da3fe8af3bfce 100644 --- a/hackfest/p4/tests/test_functional_cleanup.py +++ b/hackfest/p4/tests/test_functional_cleanup.py @@ -54,8 +54,8 @@ def test_scenario_cleanup( device_client.DeleteDevice(DeviceId(**device_id)) #expected_events.append(('DeviceEvent', EVENT_REMOVE, json_device_id(device_uuid))) - response = context_client.ListDevices(Empty()) - assert len(response.devices) == 0 + response = context_client.ListDevices(Empty()) + assert len(response.devices) == 0 # ----- Delete Topologies and Validate Collected Events ------------------------------------------------------------ for topology in TOPOLOGIES: diff --git a/src/device/service/drivers/p4/p4_driver.py b/src/device/service/drivers/p4/p4_driver.py index 069c07ce40e43192b74519b2175e7e10c638cd20..60fa84a38b74c1c41a360d96e406cb04e7c213a7 100644 --- a/src/device/service/drivers/p4/p4_driver.py +++ b/src/device/service/drivers/p4/p4_driver.py @@ -28,9 +28,9 @@ from .p4_common import matches_ipv4, matches_ipv6, valid_port,\ P4_ATTR_DEV_P4BIN, P4_ATTR_DEV_P4INFO, P4_ATTR_DEV_TIMEOUT,\ P4_VAL_DEF_VENDOR, P4_VAL_DEF_HW_VER, P4_VAL_DEF_SW_VER,\ P4_VAL_DEF_TIMEOUT -from .p4_manager import P4Manager, get_api_version, KEY_TABLE,\ +from .p4_manager import P4Manager, KEY_TABLE,\ KEY_ACTION_PROFILE, KEY_COUNTER, KEY_DIR_COUNTER, KEY_METER, KEY_DIR_METER,\ - KEY_CTL_PKT_METADATA + KEY_CTL_PKT_METADATA#, get_api_version from .p4_client import WriteOperation try: @@ -127,8 +127,7 @@ class P4Driver(_Driver): except Exception as ex: # pylint: disable=broad-except raise Exception(ex) from ex - LOGGER.info("\tConnected via P4Runtime version %s", - get_api_version()) + LOGGER.info("\tConnected via P4Runtime version ")#,get_api_version()) self.__started.set() return True diff --git a/src/device/service/drivers/p4/p4_drv.py b/src/device/service/drivers/p4/p4_drv.py new file mode 100644 index 0000000000000000000000000000000000000000..e4ca70b7b79561f8c069a124c334670be8540a5c --- /dev/null +++ b/src/device/service/drivers/p4/p4_drv.py @@ -0,0 +1,599 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +P4 driver plugin for the TeraFlow SDN controller. +""" + +import os +import json +import logging +import threading +from typing import Any, Iterator, List, Optional, Tuple, Union +from common.type_checkers.Checkers import chk_type, chk_length, chk_string +from p4_common import matches_ipv4, matches_ipv6, valid_port,\ + P4_ATTR_DEV_ID, P4_ATTR_DEV_NAME, P4_ATTR_DEV_VENDOR,\ + P4_ATTR_DEV_HW_VER, P4_ATTR_DEV_SW_VER,\ + P4_ATTR_DEV_P4BIN, P4_ATTR_DEV_P4INFO, P4_ATTR_DEV_TIMEOUT,\ + P4_VAL_DEF_VENDOR, P4_VAL_DEF_HW_VER, P4_VAL_DEF_SW_VER,\ + P4_VAL_DEF_TIMEOUT +import p4_manager +from p4_manager import P4Manager, get_api_version +from p4_context import P4Type +from p4_client import WriteOperation + +LOGGER = logging.getLogger(__name__) + + +class P4Driver: + """ + P4Driver class inherits the abstract _Driver class to support P4 devices. + + Attributes + ---------- + address : str + IP address of the P4Runtime server running on the P4 device + port : int + transport port number of the P4Runtime server running on the P4 device + **settings : map + id : int + P4 device datapath ID (Mandatory) + name : str + P4 device name (Optional) + vendor : str + P4 device vendor (Optional) + hw_ver : str + Hardware version of the P4 device (Optional) + sw_ver : str + Software version of the P4 device (Optional) + p4bin : str + Path to P4 binary file (Optional, but must be combined with p4info) + p4info : str + Path to P4 info file (Optional, but must be combined with p4bin) + timeout : int + Device timeout in seconds (Optional) + """ + + def __init__(self, address: str, port: int, **settings) -> None: + # pylint: disable=super-init-not-called + self.__manager = None + self.__address = address + self.__port = int(port) + self.__endpoint = None + self.__settings = settings + self.__id = None + self.__name = None + self.__vendor = P4_VAL_DEF_VENDOR + self.__hw_version = P4_VAL_DEF_HW_VER + self.__sw_version = P4_VAL_DEF_SW_VER + self.__p4bin_path = None + self.__p4info_path = None + self.__timeout = P4_VAL_DEF_TIMEOUT + self.__lock = threading.Lock() + self.__started = threading.Event() + self.__terminate = threading.Event() + + self.__parse_and_validate_settings() + + LOGGER.info("Initializing P4 device at %s:%d with settings:", + self.__address, self.__port) + + for key, value in settings.items(): + LOGGER.info("\t%8s = %s", key, value) + + def Connect(self) -> bool: + """ + Establish a connection between the P4 device driver and a P4 device. + + :return: boolean connection status. + """ + LOGGER.info("Connecting to P4 device %s ...", self.__endpoint) + + with self.__lock: + # Skip if already connected + if self.__started.is_set(): + return True + + # Dynamically devise an election ID + election_id = (1, 0) + + # Spawn a P4 manager for this device + self.__manager = P4Manager( + device_id=self.__id, + ip_address=self.__address, + port=self.__port, + election_id=election_id) + assert self.__manager + + # Start the P4 manager + try: + self.__manager.start(self.__p4bin_path, self.__p4info_path) + except Exception as ex: # pylint: disable=broad-except + raise Exception(ex) from ex + + LOGGER.info("\tConnected via P4Runtime version %s", + get_api_version()) + self.__started.set() + + return True + + def Disconnect(self) -> bool: + """ + Terminate the connection between the P4 device driver and a P4 device. + + :return: boolean disconnection status. + """ + LOGGER.info("Disconnecting from P4 device %s ...", self.__endpoint) + + # If not started, assume it is already disconnected + if not self.__started.is_set(): + return True + + # P4 manager must already be instantiated + assert self.__manager + + # Trigger termination of loops and processes + self.__terminate.set() + + # Trigger connection tear down with the P4Runtime server + self.__manager.stop() + self.__manager = None + + LOGGER.info("\tDisconnected!") + + return True + + def GetInitialConfig(self) -> List[Tuple[str, Any]]: + """ + Retrieve the initial configuration of a P4 device. + + :return: list of initial configuration items. + """ + initial_conf = [] + + with self.__lock: + if not initial_conf: + LOGGER.warning("No initial configuration for P4 device %s ...", + self.__endpoint) + return [] + + def GetConfig(self, resource_keys: List[str] = [])\ + -> List[Tuple[str, Union[Any, None, Exception]]]: + """ + Retrieve the current configuration of a P4 device. + + :param resource_keys: P4 resource keys to retrieve. + :return: list of values associated with the requested resource keys or + None/Exception. + """ + LOGGER.info( + "Getting configuration from P4 device %s ...", self.__endpoint) + + # No resource keys means fetch all configuration + if len(resource_keys) == 0: + LOGGER.warning( + "GetConfig with no resource keys " + "implies getting all resource keys!") + resource_keys = [ + obj_name for obj_name, _ in self.__manager.p4_objects.items() + ] + + # Verify the input type + chk_type("resources", resource_keys, list) + + with self.__lock: + return self.__get_resources(resource_keys) + + def SetConfig(self, resources: List[Tuple[str, Any]])\ + -> List[Union[bool, Exception]]: + """ + Submit a new configuration to a P4 device. + + :param resources: P4 resources to set. + :return: list of boolean results or Exceptions for resource key + changes requested. + """ + LOGGER.info( + "Setting configuration to P4 device %s ...", self.__endpoint) + + if not resources or len(resources) == 0: + LOGGER.warning( + "SetConfig requires a list of resources to store " + "into the device. Nothing is provided though.") + return [] + + assert isinstance(resources, list) + + with self.__lock: + return self.__set_resources(resources) + + def DeleteConfig(self, resources: List[Tuple[str, Any]])\ + -> List[Union[bool, Exception]]: + """ + Revoke P4 device configuration. + + :param resources: list of tuples with resource keys to be deleted. + :return: list of boolean results or Exceptions for resource key + deletions requested. + """ + LOGGER.info( + "Deleting configuration from P4 device %s ...", self.__endpoint) + + if not resources or len(resources) == 0: + LOGGER.warning( + "DeleteConfig requires a list of resources to delete " + "from the device. Nothing is provided though.") + return [] + + with self.__lock: + return self.__delete_resources(resources) + + def GetResource(self, endpoint_uuid: str) -> Optional[str]: + """ + Retrieve a certain resource from a P4 device. + + :param endpoint_uuid: target endpoint UUID. + :return: The path of the endpoint or None if not found. + """ + LOGGER.warning("GetResource() RPC not yet implemented by the P4 driver") + return "" + + def GetState(self, + blocking=False, + terminate: Optional[threading.Event] = None) -> \ + Iterator[Tuple[str, Any]]: + """ + Retrieve the state of a P4 device. + + :param blocking: if non-blocking, the driver terminates the loop and + returns. + :param terminate: termination flag. + :return: sequences of state sample. + """ + LOGGER.warning("GetState() RPC not yet implemented by the P4 driver") + return [] + + def SubscribeState(self, subscriptions: List[Tuple[str, float, float]])\ + -> List[Union[bool, Exception]]: + """ + Subscribe to certain state information. + + :param subscriptions: list of tuples with resources to be subscribed. + :return: list of results for resource subscriptions requested. + """ + LOGGER.warning( + "SubscribeState() RPC not yet implemented by the P4 driver") + return [False for _ in subscriptions] + + def UnsubscribeState(self, subscriptions: List[Tuple[str, float, float]])\ + -> List[Union[bool, Exception]]: + """ + Unsubscribe from certain state information. + + :param subscriptions: list of tuples with resources to be unsubscribed. + :return: list of results for resource un-subscriptions requested. + """ + LOGGER.warning( + "UnsubscribeState() RPC not yet implemented by the P4 driver") + return [False for _ in subscriptions] + + def get_manager(self): + """ + Get an instance of the P4 manager. + + :return: P4 manager instance + """ + return self.__manager + + def __parse_and_validate_settings(self): + """ + Verify that the driver inputs comply to what is expected. + + :return: void or exception in case of validation error + """ + # Device endpoint information + assert matches_ipv4(self.__address) or (matches_ipv6(self.__address)),\ + f"{self.__address} not a valid IPv4 or IPv6 address" + assert valid_port(self.__port), \ + f"{self.__port} not a valid transport port" + self.__endpoint = f"{self.__address}:{self.__port}" + + # Device ID + try: + self.__id = self.__settings.get(P4_ATTR_DEV_ID) + except Exception as ex: + LOGGER.error("P4 device ID is a mandatory setting") + raise Exception from ex + + # Device name + if P4_ATTR_DEV_NAME in self.__settings: + self.__name = self.__settings.get(P4_ATTR_DEV_NAME) + else: + self.__name = str(self.__id) + LOGGER.warning( + "No device name is provided. Setting default name: %s", + self.__name) + + # Device vendor + if P4_ATTR_DEV_VENDOR in self.__settings: + self.__vendor = self.__settings.get(P4_ATTR_DEV_VENDOR) + else: + LOGGER.warning( + "No device vendor is provided. Setting default vendor: %s", + self.__vendor) + + # Device hardware version + if P4_ATTR_DEV_HW_VER in self.__settings: + self.__hw_version = self.__settings.get(P4_ATTR_DEV_HW_VER) + else: + LOGGER.warning( + "No HW version is provided. Setting default HW version: %s", + self.__hw_version) + + # Device software version + if P4_ATTR_DEV_SW_VER in self.__settings: + self.__sw_version = self.__settings.get(P4_ATTR_DEV_SW_VER) + else: + LOGGER.warning( + "No SW version is provided. Setting default SW version: %s", + self.__sw_version) + + # Path to P4 binary file + if P4_ATTR_DEV_P4BIN in self.__settings: + self.__p4bin_path = self.__settings.get(P4_ATTR_DEV_P4BIN) + assert os.path.exists(self.__p4bin_path),\ + "Invalid path to p4bin file" + assert P4_ATTR_DEV_P4INFO in self.__settings,\ + "p4info and p4bin settings must be provided together" + + # Path to P4 info file + if P4_ATTR_DEV_P4INFO in self.__settings: + self.__p4info_path = self.__settings.get(P4_ATTR_DEV_P4INFO) + assert os.path.exists(self.__p4info_path),\ + "Invalid path to p4info file" + assert P4_ATTR_DEV_P4BIN in self.__settings,\ + "p4info and p4bin settings must be provided together" + + if (not self.__p4bin_path) or (not self.__p4info_path): + LOGGER.warning( + "No P4 binary and info files are provided, hence " + "no pipeline will be installed on the whitebox device.\n" + "This driver will attempt to manage whatever pipeline " + "is available on the target device.") + + # Device timeout + if P4_ATTR_DEV_TIMEOUT in self.__settings: + self.__timeout = self.__settings.get(P4_ATTR_DEV_TIMEOUT) + assert self.__timeout > 0,\ + "Device timeout must be a positive integer" + else: + LOGGER.warning( + "No device timeout is provided. Setting default timeout: %s", + self.__timeout) + + def __get_resources(self, resource_keys): + """ + Retrieve the current configuration of a P4 device. + + :param resource_keys: P4 resource keys to retrieve. + :return: list of values associated with the requested resource keys or + None/Exception. + """ + resources = [] + + LOGGER.debug(f"GetConfig() -> Keys: {resource_keys}") + + for resource_key in resource_keys: + entries = [] + try: + if p4_manager.KEY_TABLE == resource_key: + for table_name in self.__manager.get_table_names(): + t_entries = self.__manager.table_entries_to_json( + table_name) + if t_entries: + entries.append(t_entries) + elif p4_manager.KEY_COUNTER == resource_key: + for cnt_name in self.__manager.get_counter_names(): + c_entries = self.__manager.counter_entries_to_json( + cnt_name) + if c_entries: + entries.append(c_entries) + elif p4_manager.KEY_DIR_COUNTER == resource_key: + for d_cnt_name in self.__manager.get_direct_counter_names(): + dc_entries = \ + self.__manager.direct_counter_entries_to_json( + d_cnt_name) + if dc_entries: + entries.append(dc_entries) + elif p4_manager.KEY_METER == resource_key: + for meter_name in self.__manager.get_meter_names(): + m_entries = self.__manager.meter_entries_to_json( + meter_name) + if m_entries: + entries.append(m_entries) + elif p4_manager.KEY_DIR_METER == resource_key: + for d_meter_name in self.__manager.get_direct_meter_names(): + dm_entries = \ + self.__manager.direct_meter_entries_to_json( + d_meter_name) + if dm_entries: + entries.append(dm_entries) + elif p4_manager.KEY_ACTION_PROFILE == resource_key: + for ap_name in self.__manager.get_action_profile_names(): + ap_entries = \ + self.__manager.action_prof_member_entries_to_json( + ap_name) + if ap_entries: + entries.append(ap_entries) + elif p4_manager.KEY_CTL_PKT_METADATA == resource_key: + msg = f"{resource_key.capitalize()} is not a " \ + f"retrievable resource" + raise Exception(msg) + else: + msg = f"GetConfig failed due to invalid " \ + f"resource key: {resource_key}" + raise Exception(msg) + resources.append( + (resource_key, entries if entries else None) + ) + except Exception as ex: + resources.append((resource_key, ex)) + + return resources + + def __set_resources(self, resources): + """ + Submit a new configuration to a P4 device. + + :param resources: P4 resources to set. + :return: list of boolean results or Exceptions for resource key + changes requested. + """ + results = [] + + for i, resource in enumerate(resources): + str_resource_name = "resources[#{:d}]".format(i) + resource_key = "" + try: + chk_type( + str_resource_name, resource, (list, tuple)) + chk_length( + str_resource_name, resource, min_length=2, max_length=2) + resource_key, resource_value = resource + chk_string( + str_resource_name, resource_key, allow_empty=False) + except Exception as e: # pylint: disable=broad-except + LOGGER.exception( + "Exception validating {:s}: {:s}".format( + str_resource_name, str(resource_key))) + results.append(e) # store the exception if validation fails + continue + + try: + resource_value = json.loads(resource_value) + except Exception: # pylint: disable=broad-except + pass + + LOGGER.info( + f"SetConfig() -> Key: {resource_key} - Value: {resource_value}") + + # Default operation is insert. + # P4 manager has internal logic to judge whether an entry + # to be inserted already exists, thus simply needs an update. + operation = WriteOperation.insert + + try: + self.__apply_operation(resource_key, resource_value, operation) + results.append(True) + except Exception as ex: + results.append(ex) + + print(results) + + return results + + def __delete_resources(self, resources): + """ + Revoke P4 device configuration. + + :param resources: list of tuples with resource keys to be deleted. + :return: list of boolean results or Exceptions for resource key + deletions requested. + """ + results = [] + + for i, resource in enumerate(resources): + str_resource_name = "resources[#{:d}]".format(i) + resource_key = "" + try: + chk_type( + str_resource_name, resource, (list, tuple)) + chk_length( + str_resource_name, resource, min_length=2, max_length=2) + resource_key, resource_value = resource + chk_string( + str_resource_name, resource_key, allow_empty=False) + except Exception as e: # pylint: disable=broad-except + LOGGER.exception( + "Exception validating {:s}: {:s}".format( + str_resource_name, str(resource_key))) + results.append(e) # store the exception if validation fails + continue + + try: + resource_value = json.loads(resource_value) + except Exception: # pylint: disable=broad-except + pass + + LOGGER.debug( + f"DeleteConfig() -> " + f"Key: {resource_key} - Value: {resource_value}") + + operation = WriteOperation.delete + + try: + self.__apply_operation(resource_key, resource_value, operation) + results.append(True) + except Exception as ex: + results.append(ex) + + print(results) + + return results + + def __apply_operation( + self, resource_key, resource_value, operation: WriteOperation): + """ + Apply a write operation to a P4 resource. + + :param resource_key: P4 resource key + :param resource_value: P4 resource value in JSON format + :param operation: write operation (i.e., insert, update, delete) + to apply + :return: True if operation is successfully applied or raise Exception + """ + + # Apply settings to the various tables + if p4_manager.KEY_TABLE == resource_key: + self.__manager.table_entry_operation_from_json( + resource_value, operation) + elif p4_manager.KEY_COUNTER == resource_key: + self.__manager.counter_entry_operation_from_json( + resource_value, operation) + elif p4_manager.KEY_DIR_COUNTER == resource_key: + self.__manager.direct_counter_entry_operation_from_json( + resource_value, operation) + elif p4_manager.KEY_METER == resource_key: + self.__manager.meter_entry_operation_from_json( + resource_value, operation) + elif p4_manager.KEY_DIR_METER == resource_key: + self.__manager.direct_meter_entry_operation_from_json( + resource_value, operation) + elif p4_manager.KEY_ACTION_PROFILE == resource_key: + self.__manager.action_prof_member_entry_operation_from_json( + resource_value, operation) + self.__manager.action_prof_group_entry_operation_from_json( + resource_value, operation) + elif p4_manager.KEY_CTL_PKT_METADATA == resource_key: + msg = f"{resource_key.capitalize()} is not a " \ + f"configurable resource" + raise Exception(msg) + else: + msg = f"{operation} on invalid key {resource_key}" + LOGGER.error(msg) + raise Exception(msg) + + LOGGER.debug(f"{resource_key.capitalize()} operation: {operation}") + + return True diff --git a/src/device/service/drivers/p4/p4_main.py b/src/device/service/drivers/p4/p4_main.py new file mode 100644 index 0000000000000000000000000000000000000000..56d17bcd975ad1d85b2b86fd8e1243680aee8b06 --- /dev/null +++ b/src/device/service/drivers/p4/p4_main.py @@ -0,0 +1,851 @@ +#!/usr/bin/env python3 +""" +Script to manage P4 switches. +""" + +import time + +import sys +import logging +from typing import Any, Tuple, Union +from p4_drv import P4Driver + +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s', + datefmt='%d-%b-%y %H:%M:%S' +) + +DEVICE_P4_DPID = 1 +DEVICE_P4_NAME = 'device:leaf1' +DEVICE_P4_IP_ADDR = 'localhost' +DEVICE_P4_PORT = 50001 +DEVICE_P4_GRPC_ADDR = f"{DEVICE_P4_IP_ADDR}:{DEVICE_P4_PORT}" +DEVICE_P4_ELECTION_ID = (0, 1) +DEVICE_P4_VENDOR = 'Open Networking Foundation' +DEVICE_P4_HW_VER = 'BMv2 simple_switch' +DEVICE_P4_SW_VER = 'Stratum' +# DEVICE_P4_BIN_PATH = '/home/katsikas/teraflow-h2020/setup/controller/src/device/tests/p4/test-bmv2.json' +# DEVICE_P4_INFO_PATH = '/home/katsikas/teraflow-h2020/setup/controller/src/device/tests/p4/test-p4info.txt' +# DEVICE_P4_BIN_PATH = '/home/kesnar/tfs-ctrl/src/tests/p4/tests/p4/bmv2.json' +# DEVICE_P4_INFO_PATH = '/home/kesnar/tfs-ctrl/src/tests/p4/tests/p4/p4info.txt' +DEVICE_P4_BIN_PATH = '/home/kesnar/ngsdn-tutorial/p4src/build/bmv2.json' +DEVICE_P4_INFO_PATH = '/home/kesnar/ngsdn-tutorial/p4src/build/p4info.txt' +DEVICE_P4_TIMEOUT = 60 +DEVICE_ELECTION_ID = (1, 0) + +P4_DEV_SETTINGS = { + 'id': DEVICE_P4_DPID, + 'name': DEVICE_P4_NAME, + 'vendor': DEVICE_P4_VENDOR, + 'hw_ver': DEVICE_P4_HW_VER, + 'sw_ver': DEVICE_P4_SW_VER, + 'p4bin': DEVICE_P4_BIN_PATH, + 'p4info': DEVICE_P4_INFO_PATH, + 'timeout': DEVICE_P4_TIMEOUT +} + +p4_driver = P4Driver( + address=DEVICE_P4_IP_ADDR, + port=DEVICE_P4_PORT, + id=DEVICE_P4_DPID, + name=DEVICE_P4_NAME, + vendor=DEVICE_P4_VENDOR, + hw_ver=DEVICE_P4_HW_VER, + sw_ver=DEVICE_P4_SW_VER, + p4bin=DEVICE_P4_BIN_PATH, + p4info=DEVICE_P4_INFO_PATH, + timeout=DEVICE_P4_TIMEOUT +) + +manager = None + + +def connect(): + return p4_driver.Connect() + + +def disconnect(): + return p4_driver.Disconnect() + + +def count_report(): + return p4_driver.get_manager().count_active_entries() + + +def count_report_2(): + tot_cnt = 0 + + logging.info("=========================== COUNT ==========================") + + t_entries = manager.get_table_entries( + table_name="IngressPipeImpl.acl_table") + logging.debug('Table {} entries: {}'.format( + "IngressPipeImpl.acl_table", t_entries)) + t_entries_nb = manager.count_table_entries("IngressPipeImpl.acl_table") + logging.debug('# of table {} entries: {}'.format( + "IngressPipeImpl.acl_table", t_entries_nb)) + tot_cnt += t_entries_nb if t_entries_nb >= 0 else 0 + + c_entries = manager.get_counter_entries(cnt_name="bla") + logging.debug('Counter {} entries: {}'.format("bla", c_entries)) + c_entries_nb = manager.count_counter_entries("bla") + logging.debug('# of counter {} entries: {}'.format("bla", c_entries_nb)) + tot_cnt += c_entries_nb if c_entries_nb >= 0 else 0 + + dc_entries = manager.get_direct_counter_entries( + d_cnt_name="acl_table_counter") + logging.debug('Direct counter {} entries: {}'.format( + "acl_table_counter", dc_entries)) + dc_entries_nb = manager.count_direct_counter_entries("acl_table_counter") + logging.debug('# of direct counter {} entries: {}'.format( + "bla", dc_entries_nb)) + tot_cnt += dc_entries_nb if dc_entries_nb >= 0 else 0 + + m_entries = manager.get_meter_entries(meter_name="bla") + logging.debug('Meter {} entries: {}'.format("bla", m_entries)) + m_entries_nb = manager.count_meter_entries("bla") + logging.debug('# of meter {} entries: {}'.format("bla", m_entries_nb)) + tot_cnt += m_entries_nb if m_entries_nb >= 0 else 0 + + dm_entries = manager.get_direct_meter_entries(d_meter_name="bla") + logging.debug('Direct meter {} entries: {}'.format("bla", dm_entries)) + dm_entries_nb = manager.count_direct_meter_entries("bla") + logging.debug('# of direct meter {} entries: {}'.format( + "bla", dm_entries_nb)) + tot_cnt += dm_entries_nb if dm_entries_nb >= 0 else 0 + + gid = 1 + mc_entries = manager.get_multicast_group_entry(group_id=gid) + logging.debug('Multicast group {}: {}'.format(gid, mc_entries)) + mc_groups_nb = manager.count_multicast_groups() + logging.debug('# of multicast groups: {}'.format(mc_groups_nb)) + tot_cnt += mc_groups_nb if mc_groups_nb >= 0 else 0 + + sid = 1 + cs_entries = manager.get_clone_session_entry(session_id=sid) + logging.debug('Clone session {}: {}'.format(sid, cs_entries)) + cs_nb = manager.count_clone_session_entries() + logging.debug('# of clone sessions: {}'.format(cs_nb)) + tot_cnt += cs_nb if cs_nb >= 0 else 0 + + ap_name = "bla" + ap_mem_entries = manager.get_action_prof_member_entries(ap_name=ap_name) + logging.debug('Action profile member {}: {}'.format( + ap_name, ap_mem_entries)) + ap_mems_nb = manager.count_action_prof_member_entries(ap_name=ap_name) + logging.debug('# of action profile members: {}'.format(ap_mems_nb)) + tot_cnt += ap_mems_nb if ap_mems_nb >= 0 else 0 + + ap_gr_entries = manager.get_action_prof_group_entries(ap_name=ap_name) + logging.debug('Action profile group {}: {}'.format(ap_name, ap_gr_entries)) + ap_grs_nb = manager.count_action_prof_group_entries(ap_name=ap_name) + logging.debug('# of action profile groups: {}'.format(ap_grs_nb)) + tot_cnt += ap_grs_nb if ap_grs_nb >= 0 else 0 + + logging.info("Total # of entries: {}".format(tot_cnt)) + logging.info("============================================================") + + return tot_cnt + + +def table_acl_insert_1(): + logging.info("======================= INSERT TABLE =======================") + te_match_map = {"hdr.ethernet.dst_addr": "aa:bb:cc:dd:ee:22 &&& ff:ff:ff:ff:ff:ff"} + te_action_name = "IngressPipeImpl.send_to_cpu" + te_action_params = {} + manager.insert_table_entry( + table_name="IngressPipeImpl.acl_table", + action_name=te_action_name, + match_map=te_match_map, + action_params=te_action_params, + priority=1, + cnt_pkt=0, + cnt_byte=0 + ) + logging.info("============================================================") + print("\n") + + +def table_acl_delete_1(): + logging.info("======================= DELETE TABLE =======================") + te_match_map = {"hdr.ethernet.dst_addr": "aa:bb:cc:dd:ee:22"} + te_action_name = "IngressPipeImpl.send_to_cpu" + te_action_params = {} + e = manager.delete_table_entry( + table_name="IngressPipeImpl.acl_table", + action_name=te_action_name, + match_map=te_match_map, + action_params=te_action_params, + priority=1 + ) + logging.info("============================================================") + print("\n") + + +def table_acl_insert_2(): + logging.info("======================= INSERT TABLE =======================") + te_match_map = { + # "standard_metadata.ingress_port": "0x01&&&0xff", + "hdr.ethernet.src_addr": "aa:bb:cc:dd:ee:11 &&& ff:ff:ff:ff:ff:ff", + "hdr.ethernet.dst_addr": "aa:bb:cc:dd:ee:22 &&& ff:ff:ff:ff:ff:ff", + "hdr.ethernet.ether_type": "0x86DD &&& 0xffff", + "local_metadata.ip_proto": "0x06 &&& 0xff" + } + te_action_name = "IngressPipeImpl.clone_to_cpu" + te_action_params = {} + manager.insert_table_entry( + table_name="IngressPipeImpl.acl_table", + action_name=te_action_name, + match_map=te_match_map, + action_params=te_action_params, + priority=1, + cnt_pkt=0, + cnt_byte=0 + ) + logging.info("============================================================") + print("\n") + + +def table_acl_delete_all(): + logging.info("======================= FLUSH TABLE ========================") + manager.delete_table_entries("IngressPipeImpl.acl_table") + logging.info("============================================================") + print("\n") + +def table_ternary_insert_1(): + logging.info("======================= INSERT TABLE =======================") + te_match_map = {"hdr.ethernet.dst_addr": "aa:bb:cc:dd:ee:22 &&& ff:ff:ff:ff:ff:ff"} + te_action_name = "IngressPipeImpl.set_egress_port1" + te_action_params = {} + manager.insert_table_entry( + table_name="IngressPipeImpl.l2_ternary_table", + action_name=te_action_name, + match_map=te_match_map, + action_params=te_action_params, + priority=1, + cnt_pkt=0, + cnt_byte=0 + ) + logging.info("============================================================") + print("\n") + + +def table_ternary_delete_1(): + logging.info("======================= DELETE TABLE =======================") + te_match_map = {"hdr.ethernet.dst_addr": "aa:bb:cc:dd:ee:22 &&& ff:ff:ff:ff:ff:ff"} + te_action_name = "IngressPipeImpl.set_egress_port" + te_action_params = {} + manager.delete_table_entry( + table_name="IngressPipeImpl.l2_ternary_table", + action_name=te_action_name, + match_map=te_match_map, + action_params=te_action_params, + priority=1 + ) + logging.info("============================================================") + print("\n") + +def table_ternary_insert_parameters1(): + logging.info("======================= INSERT TABLE =======================") + te_match_map = {"hdr.ethernet.dst_addr": "aa:bb:cc:dd:ee:11 &&& ff:ff:ff:ff:ff:ff"} + te_action_name = "IngressPipeImpl.set_egress_port" + te_action_params = {"port_num" : '1'} + manager.insert_table_entry( + table_name="IngressPipeImpl.l2_ternary_table", + action_name=te_action_name, + match_map=te_match_map, + action_params=te_action_params, + priority=1, + cnt_pkt=0, + cnt_byte=0 + ) + logging.info("============================================================") + print("\n") + + +def table_ternary_delete_parameters1(): + logging.info("======================= DELETE TABLE =======================") + te_match_map = {"hdr.ethernet.dst_addr": "aa:bb:cc:dd:ee:11 &&& ff:ff:ff:ff:ff:ff"} + te_action_name = "IngressPipeImpl.set_egress_port" + te_action_params = {"port_num" : '1'} + manager.delete_table_entry( + table_name="IngressPipeImpl.l2_ternary_table", + action_name=te_action_name, + match_map=te_match_map, + action_params=te_action_params, + priority=1 + ) + logging.info("============================================================") + print("\n") + +def table_ternary_insert_parameters2(): + logging.info("======================= INSERT TABLE =======================") + te_match_map = {"hdr.ethernet.dst_addr": "aa:bb:cc:dd:ee:22 &&& ff:ff:ff:ff:ff:ff"} + te_action_name = "IngressPipeImpl.set_egress_port" + te_action_params = {"port_num" : '2'} + manager.insert_table_entry( + table_name="IngressPipeImpl.l2_ternary_table", + action_name=te_action_name, + match_map=te_match_map, + action_params=te_action_params, + priority=1, + cnt_pkt=0, + cnt_byte=0 + ) + logging.info("============================================================") + print("\n") + + +def table_ternary_delete_parameters2(): + logging.info("======================= DELETE TABLE =======================") + te_match_map = {"hdr.ethernet.dst_addr": "aa:bb:cc:dd:ee:22 &&& ff:ff:ff:ff:ff:ff"} + te_action_name = "IngressPipeImpl.set_egress_port" + te_action_params = {"port_num" : '1'} + manager.delete_table_entry( + table_name="IngressPipeImpl.l2_ternary_table", + action_name=te_action_name, + match_map=te_match_map, + action_params=te_action_params, + priority=1 + ) + logging.info("============================================================") + print("\n") + +def dir_counter_acl_insert_1(): + logging.info("======================= INSERT DIR-C =======================") + dc_match_map = {"hdr.ethernet.dst_addr": "aa:bb:cc:dd:ee:22"} + manager.insert_direct_counter_entry( + d_cnt_name="acl_table_counter", + match_map=dc_match_map, + priority=1, + cnt_pkt=0, + cnt_byte=0 + ) + logging.info("============================================================") + print("\n") + + +def dir_counter_acl_flush(): + logging.info("======================= FLUSH DIR-C ========================") + manager.clear_direct_counter_entry( + d_cnt_name="acl_table_counter" + ) + logging.info("============================================================") + print("\n") + + +def test_multicast(): + logging.info("======================== MULTICAST =========================") + gid = 1 + ports = [1, 2, 3] + manager.insert_multicast_group_entry( + group_id=gid, + ports=ports + ) + + mc_groups_nb = manager.count_multicast_groups() + logging.info('# of multicast groups: {}'.format(mc_groups_nb)) + print("\n") + + manager.print_multicast_groups_summary() + + manager.delete_multicast_group_entry(group_id=gid) + manager.delete_multicast_group_entries() + + manager.print_multicast_groups_summary() + + mc_groups_nb = manager.count_multicast_groups() + logging.info('# of multicast groups: {}'.format(mc_groups_nb)) + logging.info("============================================================") + + +def test_clone_session(): + logging.info("======================== CLONE SES =========================") + gid = 1 + sid = 1 + ports = [1, 2, 3] + manager.insert_clone_session_entry( + session_id=gid, + ports=ports + ) + + cs_nb = manager.count_clone_session_entries() + logging.info('# of clone sessions: {}'.format(cs_nb)) + print("\n") + + manager.print_clone_sessions_summary() + + manager.delete_clone_session_entry(session_id=sid) + manager.delete_clone_session_entries() + + manager.print_clone_sessions_summary() + + cs_nb = manager.count_clone_session_entries() + logging.info('# of clone sessions: {}'.format(cs_nb)) + logging.info("============================================================") + + +def print_entries_summary(): + manager.print_table_entries_summary() + manager.print_counter_entries_summary() + manager.print_direct_counter_entries_summary() + manager.print_meter_entries_summary() + manager.print_direct_meter_entries_summary() + + +def print_specs(): + logging.info("Tables' specification") + + for t in manager.get_tables(): + manager.print_table_entries_spec(t.name) + + +def print_entries(): + logging.info("Tables' entries") + + for t in manager.get_tables(): + manager.print_table_entries(t.name) +################################################################################ + + +connect() + +manager = p4_driver.get_manager() + +print("\n") + +# manager.print_objects() + +print("\n") + +cnt = count_report() +assert cnt == 0 + +print("\n") + +table_acl_insert_1() +cnt = count_report() +assert cnt == 1 + +print("\n") + +table_acl_insert_2() +cnt = count_report() +assert cnt == 2 + +print("\n") + +dir_counter_acl_insert_1() +cnt = count_report() +assert cnt == 2 +print("\n") +dir_counter_acl_flush() + +print("\n") + +# print_entries_summary() + +print("\n") + +# print_specs() + +print("\n") + +# print_entries() + +print("\n") + +table_acl_delete_all() +cnt = count_report() +assert cnt == 0 + +#table_ternary_insert_1() +#cnt = count_report() +#assert cnt == 1 +#table_ternary_delete_1() + +table_ternary_insert_parameters1() +cnt = count_report() +assert cnt == 1 + +table_ternary_insert_parameters2() +cnt = count_report() +assert cnt == 2 + +#time.sleep(10) + +table_ternary_delete_parameters1() +cnt = count_report() +assert cnt == 1 + +table_ternary_delete_parameters2() +cnt = count_report() +assert cnt == 0 + + +print("\n") + +############################################################################### + + +print("\n") + +logging.info("================================================================") +logging.info("RPC SetConfig()") +logging.info("================================================================") + +resources = [ + ( + 'table', + { + 'table-name': 'IngressPipeImpl.l2_ternary_table', + 'match-fields': [ + { + 'match-field': 'hdr.ethernet.dst_addr', + 'match-value': 'aa:bb:cc:dd:ee:11 &&& ff:ff:ff:ff:ff:ff' + } + ], + 'action-name': 'IngressPipeImpl.set_egress_port', + 'action-params': [ + { + 'action-param': 'port_num', + 'action-value': '1' + } + ], + 'priority': 1 + } + ), + ( + 'table', + { + 'table-name': 'IngressPipeImpl.l2_ternary_table', + 'match-fields': [ + { + 'match-field': 'hdr.ethernet.dst_addr', + 'match-value': 'aa:bb:cc:dd:ee:22 &&& ff:ff:ff:ff:ff:ff' + } + ], + 'action-name': 'IngressPipeImpl.set_egress_port', + 'action-params': [ + { + 'action-param': 'port_num', + 'action-value': '2' + } + ], + 'priority': 1 + } + ) +] +result = p4_driver.SetConfig(resources) +assert [isinstance(res, bool) and res for res in result], \ + "Failed insertion(s)" +# real_cnt = 0 +# for res in result: +# if isinstance(res, Exception): +# continue +# real_cnt += 1 +cnt = count_report() +assert cnt == 2 +logging.info("================================================================") + +#time.sleep(100) + + + + + + +""" +############################################################################### + +logging.info("================================================================") +logging.info("RPC GetInitialConfig()") +logging.info("================================================================") + +initial_config = p4_driver.GetInitialConfig() +assert len(initial_config) == 0 +logging.info("================================================================") + +############################################################################### + +print("\n") + +logging.info("================================================================") +logging.info("RPC GetConfig()") +logging.info("================================================================") + +resource_keys = manager.get_resource_keys() +resources = p4_driver.GetConfig(resource_keys) +cnt = count_report() +assert cnt == 0 +for res in resources: + assert isinstance(res, Tuple) + assert isinstance(res[0], str) + if not isinstance(res[1], (type(None), Exception)): + pass +# assert p4_driver.count_supported_entities() == 6 +# assert p4_driver.get_manager().count_active_entries() == 0 +print(resources) +logging.info("================================================================") + +############################################################################### + +print("\n") + +logging.info("================================================================") +logging.info("RPC SetConfig()") +logging.info("================================================================") + +resources = [ + ( + "table", + { + "table-name": "IngressPipeImpl.acl_table", + "match-fields": [ + { + "match-field": "hdr.ethernet.src_addr", + "match-value": "aa:bb:cc:dd:ee:11 &&& ff:ff:ff:ff:ff:ff" + }, + { + "match-field": "hdr.ethernet.dst_addr", + "match-value": "aa:bb:cc:dd:ee:22 &&& ff:ff:ff:ff:ff:ff" + }, + { + "match-field": "hdr.ethernet.ether_type", + "match-value": "0x86DD &&& 0xffff" + }, + { + "match-field": "local_metadata.ip_proto", + "match-value": "0x06 &&& 0xff" + } + ], + "action-name": "IngressPipeImpl.clone_to_cpu", + "action-params": [], + "priority": 1 + } + ), + ( + "table", + { + "table-name": "IngressPipeImpl.acl_table", + "match-fields": [ + { + "match-field": "hdr.ethernet.src_addr", + "match-value": "aa:bb:cc:dd:ee:11 &&& ff:ff:ff:ff:ff:ff" + }, + { + "match-field": "hdr.ethernet.dst_addr", + "match-value": "aa:bb:cc:dd:ee:22 &&& ff:ff:ff:ff:ff:ff" + }, + { + "match-field": "hdr.ethernet.ether_type", + "match-value": "0x86DD &&& 0xffff" + }, + { + "match-field": "local_metadata.ip_proto", + "match-value": "0x06 &&& 0xff" + } + ], + "action-name": "IngressPipeImpl.clone_to_cpu", + "action-params": [], + "priority": 2 + } + ), + ( + "table", + { + "table-name": "IngressPipeImpl.acl_table", + "match-fields": [ + { + "match-field": "hdr.ethernet.src_addr", + "match-value": "aa:bb:cc:dd:ee:33" + } + ], + "action-name": "IngressPipeImpl.send_to_cpu", + "action-params": [], + "priority": 2 + } + ), + ( + "direct_counter", + { + "direct-counter-name": "acl_table_counter", + "match-fields": [ + { + "match-field": "hdr.ethernet.src_addr", + "match-value": "aa:bb:cc:dd:ee:11 &&& ff:ff:ff:ff:ff:ff" + }, + { + "match-field": "hdr.ethernet.dst_addr", + "match-value": "aa:bb:cc:dd:ee:22 &&& ff:ff:ff:ff:ff:ff" + }, + { + "match-field": "hdr.ethernet.ether_type", + "match-value": "0x86DD &&& 0xffff" + }, + { + "match-field": "local_metadata.ip_proto", + "match-value": "0x06 &&& 0xff" + } + ], + "priority": 1, + "packet-count": 10, + "byte-count": 1000 + } + ), + ( + 'table', + { + 'table-name': 'IngressPipeImpl.l2_ternary_table', + 'match-fields': [ + { + 'match-field': 'hdr.ethernet.dst_addr', + 'match-value': 'aa:bb:cc:dd:ee:11 &&& ff:ff:ff:ff:ff:ff' + } + ], + 'action-name': 'IngressPipeImpl.set_egress_port', + 'action-params': [ +# { +# 'action-param': 'port_num', +# 'action-value': '1' +# } + ], + 'priority': 1 + } + ) +] +result = p4_driver.SetConfig(resources) +assert [isinstance(res, bool) and res for res in result], \ + "Failed insertion(s)" +# real_cnt = 0 +# for res in result: +# if isinstance(res, Exception): +# continue +# real_cnt += 1 +cnt = count_report() +assert cnt == 3 +logging.info("================================================================") + + +############################################################################### + +print("\n") + +logging.info("================================================================") +logging.info("RPC GetConfig()") +logging.info("================================================================") + +resource_keys = manager.get_resource_keys() +resources = p4_driver.GetConfig(resource_keys) +cnt = count_report() +assert cnt == 3 +print(resources) +logging.info("================================================================") + + +############################################################################### + + +print("\n") + +logging.info("================================================================") +logging.info("RPC DeleteConfig()") +logging.info("================================================================") + +resources = [ + ( + "table", + { + "table-name": "IngressPipeImpl.acl_table", + "match-fields": [ + { + "match-field": "hdr.ethernet.src_addr", + "match-value": "aa:bb:cc:dd:ee:11 &&& ff:ff:ff:ff:ff:ff" + }, + { + "match-field": "hdr.ethernet.dst_addr", + "match-value": "aa:bb:cc:dd:ee:22 &&& ff:ff:ff:ff:ff:ff" + }, + { + "match-field": "hdr.ethernet.ether_type", + "match-value": "0x86DD &&& 0xffff" + }, + { + "match-field": "local_metadata.ip_proto", + "match-value": "0x06 &&& 0xff" + } + ], + "action-name": "IngressPipeImpl.clone_to_cpu", + "action-params": [], + "priority": 1 + } + ), + ( + "table", + { + "table-name": "IngressPipeImpl.acl_table", + "match-fields": [ + { + "match-field": "hdr.ethernet.src_addr", + "match-value": "aa:bb:cc:dd:ee:11 &&& ff:ff:ff:ff:ff:ff" + }, + { + "match-field": "hdr.ethernet.dst_addr", + "match-value": "aa:bb:cc:dd:ee:22 &&& ff:ff:ff:ff:ff:ff" + }, + { + "match-field": "hdr.ethernet.ether_type", + "match-value": "0x86DD &&& 0xffff" + }, + { + "match-field": "local_metadata.ip_proto", + "match-value": "0x06 &&& 0xff" + } + ], + "action-name": "IngressPipeImpl.clone_to_cpu", + "action-params": [], + "priority": 2 + } + ), + ( + "direct_counter", + { + "direct-counter-name": "acl_table_counter" + } + ), + ( + 'table', + { + 'table-name': 'IngressPipeImpl.l2_ternary_table', + 'match-fields': [ + { + 'match-field': 'hdr.ethernet.dst_addr', + 'match-value': 'aa:bb:cc:dd:ee:11 &&& ff:ff:ff:ff:ff:ff' + } + ], + 'action-name': 'IngressPipeImpl.set_egress_port1', + 'action-params': [ +# { +# 'action-param': 'port_num', +# 'action-value': '1' +# } + ], + 'priority': 1 + } + ) +] + +p4_driver.DeleteConfig(resources) +cnt = count_report() +assert cnt == 1 + +table_acl_delete_all() +cnt = count_report() +assert cnt == 0 +logging.info("================================================================") + +""" +disconnect() + +sys.exit(0) diff --git a/src/device/service/drivers/p4/p4_manager.py b/src/device/service/drivers/p4/p4_manager.py index 65f8602ea30fa2d8cd06b09655ee4ee63d045a97..803624a610a13758c280f0518e876b025a1bb375 100644 --- a/src/device/service/drivers/p4/p4_manager.py +++ b/src/device/service/drivers/p4/p4_manager.py @@ -55,7 +55,7 @@ LOGGER = logging.getLogger(__name__) CONTEXT = Context() # Global P4Runtime client -CLIENT = None +CLIENTS = {} # Constant P4 entities KEY_TABLE = "table" @@ -77,22 +77,22 @@ def get_context(): return CONTEXT -def get_client(): +#def get_client(): """ Return P4 client. :return: P4Runtime client object """ - return CLIENT +# return CLIENT -def get_api_version(): +#def get_api_version(): """ Get the supported P4Runtime API version. :return: API version """ - return CLIENT.api_version() +# return CLIENT.api_version() def get_table_type(table): @@ -136,171 +136,28 @@ def match_type_to_str(match_type): return None -def insert_table_entry_exact( - table_name, match_map, action_name, action_params, metadata, - cnt_pkt=-1, cnt_byte=-1): - """ - Insert an entry into an exact match table. - - :param table_name: P4 table name - :param match_map: Map of match operations - :param action_name: Action name - :param action_params: Map of action parameters - :param metadata: table metadata - :param cnt_pkt: packet count - :param cnt_byte: byte count - :return: inserted entry - """ - assert match_map, "Table entry without match operations is not accepted" - assert action_name, "Table entry without action is not accepted" - - table_entry = TableEntry(table_name)(action=action_name) - - for match_k, match_v in match_map.items(): - table_entry.match[match_k] = match_v - - for action_k, action_v in action_params.items(): - table_entry.action[action_k] = action_v - - if metadata: - table_entry.metadata = metadata - - if cnt_pkt > 0: - table_entry.counter_data.packet_count = cnt_pkt - - if cnt_byte > 0: - table_entry.counter_data.byte_count = cnt_byte - - ex_msg = "" - try: - table_entry.insert() - LOGGER.info("Inserted exact table entry: %s", table_entry) - except (P4RuntimeException, P4RuntimeWriteException) as ex: - raise P4RuntimeException from ex - - # Table entry exists, needs to be modified - if "ALREADY_EXISTS" in ex_msg: - table_entry.modify() - LOGGER.info("Updated exact table entry: %s", table_entry) - - return table_entry - - -def insert_table_entry_ternary( - table_name, match_map, action_name, action_params, metadata, - priority, cnt_pkt=-1, cnt_byte=-1): - """ - Insert an entry into a ternary match table. - - :param table_name: P4 table name - :param match_map: Map of match operations - :param action_name: Action name - :param action_params: Map of action parameters - :param metadata: table metadata - :param priority: entry priority - :param cnt_pkt: packet count - :param cnt_byte: byte count - :return: inserted entry - """ - assert match_map, "Table entry without match operations is not accepted" - assert action_name, "Table entry without action is not accepted" - - table_entry = TableEntry(table_name)(action=action_name) - - for match_k, match_v in match_map.items(): - table_entry.match[match_k] = match_v - - for action_k, action_v in action_params.items(): - table_entry.action[action_k] = action_v - - table_entry.priority = priority - - if metadata: - table_entry.metadata = metadata - - if cnt_pkt > 0: - table_entry.counter_data.packet_count = cnt_pkt - - if cnt_byte > 0: - table_entry.counter_data.byte_count = cnt_byte - - ex_msg = "" - try: - table_entry.insert() - LOGGER.info("Inserted ternary table entry: %s", table_entry) - except (P4RuntimeException, P4RuntimeWriteException) as ex: - raise P4RuntimeException from ex - - # Table entry exists, needs to be modified - if "ALREADY_EXISTS" in ex_msg: - table_entry.modify() - LOGGER.info("Updated ternary table entry: %s", table_entry) - - return table_entry - - -def insert_table_entry_range( - table_name, match_map, action_name, action_params, metadata, - priority, cnt_pkt=-1, cnt_byte=-1): # pylint: disable=unused-argument - """ - Insert an entry into a range match table. - - :param table_name: P4 table name - :param match_map: Map of match operations - :param action_name: Action name - :param action_params: Map of action parameters - :param metadata: table metadata - :param priority: entry priority - :param cnt_pkt: packet count - :param cnt_byte: byte count - :return: inserted entry - """ - assert match_map, "Table entry without match operations is not accepted" - assert action_name, "Table entry without action is not accepted" - - raise NotImplementedError( - "Range-based table insertion not implemented yet") - - -def insert_table_entry_optional( - table_name, match_map, action_name, action_params, metadata, - priority, cnt_pkt=-1, cnt_byte=-1): # pylint: disable=unused-argument - """ - Insert an entry into an optional match table. - - :param table_name: P4 table name - :param match_map: Map of match operations - :param action_name: Action name - :param action_params: Map of action parameters - :param metadata: table metadata - :param priority: entry priority - :param cnt_pkt: packet count - :param cnt_byte: byte count - :return: inserted entry - """ - assert match_map, "Table entry without match operations is not accepted" - assert action_name, "Table entry without action is not accepted" - - raise NotImplementedError( - "Optional-based table insertion not implemented yet") - class P4Manager: """ Class to manage the runtime entries of a P4 pipeline. """ + localCLIENT = None + key_id = None def __init__(self, device_id: int, ip_address: str, port: int, election_id: tuple, role_name=None, ssl_options=None): - global CLIENT + global CLIENTS self.__id = device_id self.__ip_address = ip_address self.__port = int(port) self.__endpoint = f"{self.__ip_address}:{self.__port}" - CLIENT = P4RuntimeClient( + self.key_id = ip_address+str(port) + CLIENTS[self.key_id] = P4RuntimeClient( self.__id, self.__endpoint, election_id, role_name, ssl_options) self.__p4info = None + + self.localCLIENT = CLIENTS[self.key_id] # Internal memory for whitebox management # | -> P4 entities @@ -339,27 +196,27 @@ class P4Manager: # Forwarding pipeline is only set iff both files are present if p4bin_path and p4info_path: try: - CLIENT.set_fwd_pipe_config(p4info_path, p4bin_path) + self.localCLIENT.set_fwd_pipe_config(p4info_path, p4bin_path) except FileNotFoundError as ex: LOGGER.critical(ex) - CLIENT.tear_down() + self.localCLIENT.tear_down() raise FileNotFoundError(ex) from ex except P4RuntimeException as ex: LOGGER.critical("Error when setting config") LOGGER.critical(ex) - CLIENT.tear_down() + self.localCLIENT.tear_down() raise P4RuntimeException(ex) from ex except Exception as ex: # pylint: disable=broad-except LOGGER.critical("Error when setting config") - CLIENT.tear_down() + self.localCLIENT.tear_down() raise Exception(ex) from ex try: - self.__p4info = CLIENT.get_p4info() + self.__p4info = self.localCLIENT.get_p4info() except P4RuntimeException as ex: LOGGER.critical("Error when retrieving P4Info") LOGGER.critical(ex) - CLIENT.tear_down() + self.localCLIENT.tear_down() raise P4RuntimeException(ex) from ex CONTEXT.set_p4info(self.__p4info) @@ -375,14 +232,15 @@ class P4Manager: :return: void """ - global CLIENT + global CLIENTS # gRPC client must already be instantiated - assert CLIENT + assert self.localCLIENT # Trigger connection tear down with the P4Runtime server - CLIENT.tear_down() - CLIENT = None + self.localCLIENT.tear_down() + # Remove client entry from global dictionary + CLIENTS.pop(self.key_id) self.__clear() LOGGER.info("P4Runtime manager stopped") @@ -723,7 +581,7 @@ class P4Manager: try: for count, table_entry in enumerate( - TableEntry(table_name)(action=action_name).read()): + TableEntry(self, table_name)(action=action_name).read()): LOGGER.debug( "Table %s - Entry %d\n%s", table_name, count, table_entry) self.table_entries[table_name].append(table_entry) @@ -856,6 +714,154 @@ class P4Manager: ) return None + def insert_table_entry_exact(self, + table_name, match_map, action_name, action_params, metadata, + cnt_pkt=-1, cnt_byte=-1): + """ + Insert an entry into an exact match table. + + :param table_name: P4 table name + :param match_map: Map of match operations + :param action_name: Action name + :param action_params: Map of action parameters + :param metadata: table metadata + :param cnt_pkt: packet count + :param cnt_byte: byte count + :return: inserted entry + """ + assert match_map, "Table entry without match operations is not accepted" + assert action_name, "Table entry without action is not accepted" + + table_entry = TableEntry(self, table_name)(action=action_name) + + for match_k, match_v in match_map.items(): + table_entry.match[match_k] = match_v + + for action_k, action_v in action_params.items(): + table_entry.action[action_k] = action_v + + if metadata: + table_entry.metadata = metadata + + if cnt_pkt > 0: + table_entry.counter_data.packet_count = cnt_pkt + + if cnt_byte > 0: + table_entry.counter_data.byte_count = cnt_byte + + ex_msg = "" + try: + table_entry.insert() + LOGGER.info("Inserted exact table entry: %s", table_entry) + except (P4RuntimeException, P4RuntimeWriteException) as ex: + raise P4RuntimeException from ex + + # Table entry exists, needs to be modified + if "ALREADY_EXISTS" in ex_msg: + table_entry.modify() + LOGGER.info("Updated exact table entry: %s", table_entry) + + return table_entry + + + def insert_table_entry_ternary(self, + table_name, match_map, action_name, action_params, metadata, + priority, cnt_pkt=-1, cnt_byte=-1): + """ + Insert an entry into a ternary match table. + + :param table_name: P4 table name + :param match_map: Map of match operations + :param action_name: Action name + :param action_params: Map of action parameters + :param metadata: table metadata + :param priority: entry priority + :param cnt_pkt: packet count + :param cnt_byte: byte count + :return: inserted entry + """ + assert match_map, "Table entry without match operations is not accepted" + assert action_name, "Table entry without action is not accepted" + + table_entry = TableEntry(self, table_name)(action=action_name) + + for match_k, match_v in match_map.items(): + table_entry.match[match_k] = match_v + + for action_k, action_v in action_params.items(): + table_entry.action[action_k] = action_v + + table_entry.priority = priority + + if metadata: + table_entry.metadata = metadata + + if cnt_pkt > 0: + table_entry.counter_data.packet_count = cnt_pkt + + if cnt_byte > 0: + table_entry.counter_data.byte_count = cnt_byte + + ex_msg = "" + try: + table_entry.insert() + LOGGER.info("Inserted ternary table entry: %s", table_entry) + except (P4RuntimeException, P4RuntimeWriteException) as ex: + raise P4RuntimeException from ex + + # Table entry exists, needs to be modified + if "ALREADY_EXISTS" in ex_msg: + table_entry.modify() + LOGGER.info("Updated ternary table entry: %s", table_entry) + + return table_entry + + + def insert_table_entry_range(self, + table_name, match_map, action_name, action_params, metadata, + priority, cnt_pkt=-1, cnt_byte=-1): # pylint: disable=unused-argument + """ + Insert an entry into a range match table. + + :param table_name: P4 table name + :param match_map: Map of match operations + :param action_name: Action name + :param action_params: Map of action parameters + :param metadata: table metadata + :param priority: entry priority + :param cnt_pkt: packet count + :param cnt_byte: byte count + :return: inserted entry + """ + assert match_map, "Table entry without match operations is not accepted" + assert action_name, "Table entry without action is not accepted" + + raise NotImplementedError( + "Range-based table insertion not implemented yet") + + + def insert_table_entry_optional(self, + table_name, match_map, action_name, action_params, metadata, + priority, cnt_pkt=-1, cnt_byte=-1): # pylint: disable=unused-argument + """ + Insert an entry into an optional match table. + + :param table_name: P4 table name + :param match_map: Map of match operations + :param action_name: Action name + :param action_params: Map of action parameters + :param metadata: table metadata + :param priority: entry priority + :param cnt_pkt: packet count + :param cnt_byte: byte count + :return: inserted entry + """ + assert match_map, "Table entry without match operations is not accepted" + assert action_name, "Table entry without action is not accepted" + + raise NotImplementedError( + "Optional-based table insertion not implemented yet") + def insert_table_entry(self, table_name, match_map, action_name, action_params, priority, metadata=None, cnt_pkt=-1, cnt_byte=-1): @@ -889,26 +895,26 @@ class P4Manager: # Exact match is supported if get_table_type(table) == p4info_pb2.MatchField.EXACT: - return insert_table_entry_exact( + return self.insert_table_entry_exact( table_name, match_map, action_name, action_params, metadata, cnt_pkt, cnt_byte) # Ternary and LPM matches are supported if get_table_type(table) in \ [p4info_pb2.MatchField.TERNARY, p4info_pb2.MatchField.LPM]: - return insert_table_entry_ternary( + return self.insert_table_entry_ternary( table_name, match_map, action_name, action_params, metadata, priority, cnt_pkt, cnt_byte) # TODO: Cover RANGE match # pylint: disable=W0511 if get_table_type(table) == p4info_pb2.MatchField.RANGE: - return insert_table_entry_range( + return self.insert_table_entry_range( table_name, match_map, action_name, action_params, metadata, priority, cnt_pkt, cnt_byte) # TODO: Cover OPTIONAL match # pylint: disable=W0511 if get_table_type(table) == p4info_pb2.MatchField.OPTIONAL: - return insert_table_entry_optional( + return self.insert_table_entry_optional( table_name, match_map, action_name, action_params, metadata, priority, cnt_pkt, cnt_byte) @@ -935,7 +941,7 @@ class P4Manager: LOGGER.error(msg) raise UserError(msg) - table_entry = TableEntry(table_name)(action=action_name) + table_entry = TableEntry(self, table_name)(action=action_name) for match_k, match_v in match_map.items(): table_entry.match[match_k] = match_v @@ -979,7 +985,7 @@ class P4Manager: LOGGER.error(msg) raise UserError(msg) - TableEntry(table_name).read(function=lambda x: x.delete()) + TableEntry(self, table_name).read(function=lambda x: x.delete()) LOGGER.info("Deleted all entries from table: %s", table_name) def print_table_entries_spec(self, table_name): @@ -1179,7 +1185,7 @@ class P4Manager: self.counter_entries[cnt_name] = [] try: - for count, cnt_entry in enumerate(CounterEntry(cnt_name).read()): + for count, cnt_entry in enumerate(CounterEntry(self, cnt_name).read()): LOGGER.debug( "Counter %s - Entry %d\n%s", cnt_name, count, cnt_entry) self.counter_entries[cnt_name].append(cnt_entry) @@ -1298,7 +1304,7 @@ class P4Manager: assert cnt, \ "P4 pipeline does not implement counter " + cnt_name - cnt_entry = CounterEntry(cnt_name) + cnt_entry = CounterEntry(self, cnt_name) if index: cnt_entry.index = index @@ -1325,7 +1331,7 @@ class P4Manager: assert cnt, \ "P4 pipeline does not implement counter " + cnt_name - cnt_entry = CounterEntry(cnt_name) + cnt_entry = CounterEntry(self, cnt_name) cnt_entry.clear_data() LOGGER.info("Cleared data of counter entry: %s", cnt_entry) @@ -1394,7 +1400,7 @@ class P4Manager: try: for count, d_cnt_entry in enumerate( - DirectCounterEntry(d_cnt_name).read()): + DirectCounterEntry(self, d_cnt_name).read()): LOGGER.debug( "Direct counter %s - Entry %d\n%s", d_cnt_name, count, d_cnt_entry) @@ -1530,7 +1536,7 @@ class P4Manager: assert match_map,\ "Direct counter entry without match operations is not accepted" - d_cnt_entry = DirectCounterEntry(d_cnt_name) + d_cnt_entry = DirectCounterEntry(self, d_cnt_name) for match_k, match_v in match_map.items(): d_cnt_entry.table_entry.match[match_k] = match_v @@ -1559,7 +1565,7 @@ class P4Manager: assert d_cnt, \ "P4 pipeline does not implement direct counter " + d_cnt_name - d_cnt_entry = DirectCounterEntry(d_cnt_name) + d_cnt_entry = DirectCounterEntry(self, d_cnt_name) d_cnt_entry.clear_data() LOGGER.info("Cleared direct counter entry: %s", d_cnt_entry) @@ -2100,7 +2106,7 @@ class P4Manager: try: for count, ap_entry in enumerate( - ActionProfileMember(ap_name).read()): + ActionProfileMember(self, ap_name).read()): LOGGER.debug( "Action profile member %s - Entry %d\n%s", ap_name, count, ap_entry) @@ -2230,7 +2236,7 @@ class P4Manager: assert act_p, \ "P4 pipeline does not implement action profile " + ap_name - ap_member_entry = ActionProfileMember(ap_name)( + ap_member_entry = ActionProfileMember(self, ap_name)( member_id=member_id, action=action_name) for action_k, action_v in action_params.items(): @@ -2267,7 +2273,7 @@ class P4Manager: assert act_p, \ "P4 pipeline does not implement action profile " + ap_name - ap_member_entry = ActionProfileMember(ap_name)( + ap_member_entry = ActionProfileMember(self, ap_name)( member_id=member_id, action=action_name) ap_member_entry.delete() LOGGER.info("Deleted action profile member entry: %s", ap_member_entry) @@ -2364,7 +2370,7 @@ class P4Manager: try: for count, ap_entry in enumerate( - ActionProfileGroup(ap_name).read()): + ActionProfileGroup(self, ap_name).read()): LOGGER.debug("Action profile group %s - Entry %d\n%s", ap_name, count, ap_entry) self.action_profile_groups[ap_name].append(ap_entry) @@ -2483,7 +2489,7 @@ class P4Manager: assert ap, \ "P4 pipeline does not implement action profile " + ap_name - ap_group_entry = ActionProfileGroup(ap_name)(group_id=group_id) + ap_group_entry = ActionProfileGroup(self, ap_name)(group_id=group_id) if members: for m in members: @@ -2519,7 +2525,7 @@ class P4Manager: assert ap, \ "P4 pipeline does not implement action profile " + ap_name - ap_group_entry = ActionProfileGroup(ap_name)(group_id=group_id) + ap_group_entry = ActionProfileGroup(self, ap_name)(group_id=group_id) ap_group_entry.delete() LOGGER.info("Deleted action profile group entry: %s", ap_group_entry) @@ -2537,7 +2543,7 @@ class P4Manager: assert ap, \ "P4 pipeline does not implement action profile " + ap_name - ap_group_entry = ActionProfileGroup(ap_name)(group_id=group_id) + ap_group_entry = ActionProfileGroup(self, ap_name)(group_id=group_id) ap_group_entry.clear() LOGGER.info("Cleared action profile group entry: %s", ap_group_entry) @@ -2631,7 +2637,7 @@ class P4Manager: self.multicast_groups[group_id] = None try: - mcast_group = MulticastGroupEntry(group_id).read() + mcast_group = MulticastGroupEntry(self, group_id).read() LOGGER.debug("Multicast group %d\n%s", group_id, mcast_group) self.multicast_groups[group_id] = mcast_group return self.multicast_groups[group_id] @@ -2724,7 +2730,7 @@ class P4Manager: assert ports, \ "No multicast group ports are provided" - mcast_group = MulticastGroupEntry(group_id) + mcast_group = MulticastGroupEntry(self, group_id) for p in ports: mcast_group.add(p, 1) @@ -2756,7 +2762,7 @@ class P4Manager: assert group_id > 0, \ "Multicast group " + group_id + " must be > 0" - mcast_group = MulticastGroupEntry(group_id) + mcast_group = MulticastGroupEntry(self, group_id) mcast_group.delete() if group_id in self.multicast_groups: @@ -2772,7 +2778,7 @@ class P4Manager: :return: void """ - for mcast_group in MulticastGroupEntry().read(): + for mcast_group in MulticastGroupEntry(self).read(): gid = mcast_group.group_id mcast_group.delete() del self.multicast_groups[gid] @@ -2828,7 +2834,7 @@ class P4Manager: self.clone_session_entries[session_id] = None try: - session = CloneSessionEntry(session_id).read() + session = CloneSessionEntry(self, session_id).read() LOGGER.debug("Clone session %d\n%s", session_id, session) self.clone_session_entries[session_id] = session return self.clone_session_entries[session_id] @@ -2923,7 +2929,7 @@ class P4Manager: assert ports, \ "No clone session ports are provided" - session = CloneSessionEntry(session_id) + session = CloneSessionEntry(self, session_id) for p in ports: session.add(p, 1) @@ -2955,7 +2961,7 @@ class P4Manager: assert session_id > 0, \ "Clone session " + session_id + " must be > 0" - session = CloneSessionEntry(session_id) + session = CloneSessionEntry(self, session_id) session.delete() if session_id in self.clone_session_entries: @@ -2971,7 +2977,7 @@ class P4Manager: :return: void """ - for e in CloneSessionEntry().read(): + for e in CloneSessionEntry(self).read(): sid = e.session_id e.delete() del self.clone_session_entries[sid] @@ -3052,7 +3058,7 @@ class P4Manager: "No controller packet metadata in the pipeline\n") return None - packet_in = PacketOut() + packet_in = PacketIn(self) packet_in.payload = payload if metadata: for name, value in metadata.items(): @@ -3090,7 +3096,7 @@ class P4Manager: _t = Thread(target=_sniff_packet, args=(captured_packet,)) _t.start() # P4Runtime client sends the packet to the switch - CLIENT.stream_in_q["packet"].put(packet_in) + self.localCLIENT.stream_in_q["packet"].put(packet_in) _t.join() LOGGER.info("Packet-in sent: %s", packet_in) @@ -3111,7 +3117,7 @@ class P4Manager: "No controller packet metadata in the pipeline\n") return None - packet_out = PacketOut() + packet_out = PacketOut(self) packet_out.payload = payload if metadata: for name, value in metadata.items(): @@ -3654,12 +3660,14 @@ class _EntityBase: """ Basic entity. """ + localCLIENT = None - def __init__(self, entity_type, p4runtime_cls, modify_only=False): + def __init__(self, p4_man, entity_type, p4runtime_cls, modify_only=False): self._init = False self._entity_type = entity_type self._entry = p4runtime_cls() self._modify_only = modify_only + self.localCLIENT = p4_man.localCLIENT def __dir__(self): d = ["msg", "read"] @@ -3696,7 +3704,7 @@ class _EntityBase: update = p4runtime_pb2.Update() update.type = type_ getattr(update.entity, self._entity_type.name).CopyFrom(self._entry) - CLIENT.write_update(update) + self.localCLIENT.write_update(update) def insert(self): """ @@ -3747,7 +3755,7 @@ class _EntityBase: entity = p4runtime_pb2.Entity() getattr(entity, self._entity_type.name).CopyFrom(self._entry) - iterator = CLIENT.read_one(entity) + iterator = self.localCLIENT.read_one(entity) # Cannot use a (simpler) generator here as we need to # decorate __next__ with @parse_p4runtime_error. @@ -3794,9 +3802,9 @@ class _P4EntityBase(_EntityBase): Basic P4 entity. """ - def __init__(self, p4_type, entity_type, p4runtime_cls, name=None, + def __init__(self, p4_man, p4_type, entity_type, p4runtime_cls, name=None, modify_only=False): - super().__init__(entity_type, p4runtime_cls, modify_only) + super().__init__(p4_man, entity_type, p4runtime_cls, modify_only) self._p4_type = p4_type if name is None: raise UserError( @@ -3825,8 +3833,8 @@ class ActionProfileMember(_P4EntityBase): P4 action profile member. """ - def __init__(self, action_profile_name=None): - super().__init__( + def __init__(self, p4_man, action_profile_name=None): + super().__init__( p4_man, P4Type.action_profile, P4RuntimeEntity.action_profile_member, p4runtime_pb2.ActionProfileMember, action_profile_name) self.member_id = 0 @@ -3991,8 +3999,8 @@ class ActionProfileGroup(_P4EntityBase): P4 action profile group. """ - def __init__(self, action_profile_name=None): - super().__init__( + def __init__(self, p4_man, action_profile_name=None): + super().__init__( p4_man, P4Type.action_profile, P4RuntimeEntity.action_profile_group, p4runtime_pb2.ActionProfileGroup, action_profile_name) self.group_id = 0 @@ -4554,8 +4562,8 @@ class TableEntry(_P4EntityBase): "oneshot": cls._ActionSpecType.ONESHOT, }.get(name, None) - def __init__(self, table_name=None): - super().__init__( + def __init__(self, p4_man, table_name=None): + super().__init__(p4_man, P4Type.table, P4RuntimeEntity.table_entry, p4runtime_pb2.TableEntry, table_name) self.match = MatchKey(table_name, self._info.match_fields) @@ -4996,8 +5004,8 @@ class _CounterEntryBase(_P4EntityBase): Basic P4 counter entry. """ - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) + def __init__(self, p4_man, *args, **kwargs): + super().__init__(p4_man, *args, **kwargs) self._counter_type = self._info.spec.unit self.packet_count = -1 self.byte_count = -1 @@ -5065,8 +5073,8 @@ class CounterEntry(_CounterEntryBase): P4 counter entry. """ - def __init__(self, counter_name=None): - super().__init__( + def __init__(self, p4_man, counter_name=None): + super().__init__( p4_man, P4Type.counter, P4RuntimeEntity.counter_entry, p4runtime_pb2.CounterEntry, counter_name, modify_only=True) @@ -5126,10 +5134,11 @@ To write to the counter, use <self>.modify class DirectCounterEntry(_CounterEntryBase): """ Direct P4 counter entry. - """ + """ + P4_MANAGER = None - def __init__(self, direct_counter_name=None): - super().__init__( + def __init__(self, p4_man, direct_counter_name=None): + super().__init__( p4_man, P4Type.direct_counter, P4RuntimeEntity.direct_counter_entry, p4runtime_pb2.DirectCounterEntry, direct_counter_name, modify_only=True) @@ -5140,7 +5149,8 @@ class DirectCounterEntry(_CounterEntryBase): except KeyError as ex: raise InvalidP4InfoError(f"direct_table_id {self._direct_table_id} " f"is not a valid table id") from ex - self._table_entry = TableEntry(self._direct_table_name) + self._table_entry = TableEntry(p4_man, self._direct_table_name) + self.P4_MANAGER = p4_man self.__doc__ = f""" An entry for direct counter '{direct_counter_name}' @@ -5167,7 +5177,7 @@ To write to the counter, use <self>.modify raise UserError("Direct counters are not index-based") if name == "table_entry": if value is None: - self._table_entry = TableEntry(self._direct_table_name) + self._table_entry = TableEntry(self.P4_MANAGER, self._direct_table_name) return if not isinstance(value, TableEntry): raise UserError("table_entry must be an instance of TableEntry") @@ -5356,6 +5366,7 @@ class DirectMeterEntry(_MeterEntryBase): """ Direct P4 meter entry. """ + P4_MANAGER = None def __init__(self, direct_meter_name=None): super().__init__( @@ -5369,7 +5380,8 @@ class DirectMeterEntry(_MeterEntryBase): except KeyError as ex: raise InvalidP4InfoError(f"direct_table_id {self._direct_table_id} " f"is not a valid table id") from ex - self._table_entry = TableEntry(self._direct_table_name) + self._table_entry = TableEntry(p4_man, self._direct_table_name) + self.P4_MANAGER = p4_man self.__doc__ = f""" An entry for direct meter '{direct_meter_name}' @@ -5399,7 +5411,7 @@ To write to the meter, use <self>.modify raise UserError("Direct meters are not index-based") if name == "table_entry": if value is None: - self._table_entry = TableEntry(self._direct_table_name) + self._table_entry = TableEntry(self.P4_MANAGER, self._direct_table_name) return if not isinstance(value, TableEntry): raise UserError("table_entry must be an instance of TableEntry") @@ -5531,8 +5543,8 @@ class MulticastGroupEntry(_EntityBase): P4 multicast group entry. """ - def __init__(self, group_id=0): - super().__init__( + def __init__(self, p4_man, group_id=0): + super().__init__(p4_man, P4RuntimeEntity.packet_replication_engine_entry, p4runtime_pb2.PacketReplicationEngineEntry) self.group_id = group_id @@ -5609,8 +5621,8 @@ class CloneSessionEntry(_EntityBase): P4 clone session entry. """ - def __init__(self, session_id=0): - super().__init__( + def __init__(self, p4_man, session_id=0): + super().__init__(p4_man, P4RuntimeEntity.packet_replication_engine_entry, p4runtime_pb2.PacketReplicationEngineEntry) self.session_id = session_id @@ -5779,8 +5791,9 @@ class PacketIn(): """ P4 packet in. """ + localCLIENT = None - def __init__(self): + def __init__(self, p4_man): ctrl_pkt_md = P4Objects(P4Type.controller_packet_metadata) self.md_info_list = {} if "packet_in" in ctrl_pkt_md: @@ -5788,10 +5801,11 @@ class PacketIn(): for md_info in self.p4_info.metadata: self.md_info_list[md_info.name] = md_info self.packet_in_queue = queue.Queue() + self.localCLIENT = p4_man.localCLIENT def _packet_in_recv_func(packet_in_queue): while True: - msg = CLIENT.get_stream_packet("packet", timeout=None) + msg = self.localCLIENT.get_stream_packet("packet", timeout=None) if not msg: break packet_in_queue.put(msg) @@ -5857,8 +5871,9 @@ class PacketOut: """ P4 packet out. """ + localCLIENT = None - def __init__(self, payload=b'', **kwargs): + def __init__(self, p4_man, payload=b'', **kwargs): self.p4_info = P4Objects(P4Type.controller_packet_metadata)[ "packet_out"] @@ -5868,6 +5883,7 @@ class PacketOut: if kwargs: for key, value in kwargs.items(): self.metadata[key] = value + self.localCLIENT = p4_man.localCLIENT def _update_msg(self): self._entry = p4runtime_pb2.PacketOut() @@ -5897,7 +5913,7 @@ class PacketOut: self._update_msg() msg = p4runtime_pb2.StreamMessageRequest() msg.packet.CopyFrom(self._entry) - CLIENT.stream_out_q.put(msg) + self.localCLIENT.stream_out_q.put(msg) def str(self): """ @@ -5913,13 +5929,16 @@ class IdleTimeoutNotification(): """ P4 idle timeout notification. """ + + localCLIENT = None - def __init__(self): + def __init__(self, p4_man): self.notification_queue = queue.Queue() + self.localCLIENT = p4_man.localCLIENT def _notification_recv_func(notification_queue): while True: - msg = CLIENT.get_stream_packet("idle_timeout_notification", + msg = self.localCLIENT.get_stream_packet("idle_timeout_notification", timeout=None) if not msg: break