diff --git a/src/device/requirements.in b/src/device/requirements.in index 78abc96ad994c5d2acdb92ba9108b0527d61ecca..b7ee5dacdec0dfd7b493ce51741b704987b9e4b5 100644 --- a/src/device/requirements.in +++ b/src/device/requirements.in @@ -13,3 +13,4 @@ pytz redis requests xmltodict +p4runtime==1.3.0 diff --git a/src/device/service/driver_api/FilterFields.py b/src/device/service/driver_api/FilterFields.py index c7de05f92a743c826d54897930b10013bd09c2b7..892e7f72056cd3342ce04190710d492ec83a02d3 100644 --- a/src/device/service/driver_api/FilterFields.py +++ b/src/device/service/driver_api/FilterFields.py @@ -8,6 +8,7 @@ class DeviceTypeFilterFieldEnum(Enum): OPTICAL_LINE_SYSTEM = 'optical-line-system' PACKET_ROUTER = 'packet-router' PACKET_SWITCH = 'packet-switch' + P4_SWITCH = 'p4-switch' class FilterFieldEnum(Enum): DEVICE_TYPE = 'device_type' diff --git a/src/device/service/drivers/__init__.py b/src/device/service/drivers/__init__.py index e59bae207e4cd14f238aabcb7e373bb973374005..54c944bab8183d57478a07ea46a9aa55b439f0b1 100644 --- a/src/device/service/drivers/__init__.py +++ b/src/device/service/drivers/__init__.py @@ -2,6 +2,7 @@ from ..driver_api.FilterFields import FilterFieldEnum, DeviceTypeFilterFieldEnum from .emulated.EmulatedDriver import EmulatedDriver from .openconfig.OpenConfigDriver import OpenConfigDriver from .transport_api.TransportApiDriver import TransportApiDriver +from .p4.p4_driver import P4Driver DRIVERS = [ (EmulatedDriver, [ @@ -22,4 +23,10 @@ DRIVERS = [ FilterFieldEnum.DRIVER : ORM_DeviceDriverEnum.TRANSPORT_API, } ]), + (P4Driver, [ + { + FilterFieldEnum.DEVICE_TYPE: DeviceTypeFilterFieldEnum.P4_SWITCH, + FilterFieldEnum.DRIVER : ORM_DeviceDriverEnum.P4, + } + ]), ] diff --git a/src/device/service/drivers/p4/__init__.py b/src/device/service/drivers/p4/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/src/device/service/drivers/p4/p4_driver.py b/src/device/service/drivers/p4/p4_driver.py new file mode 100644 index 0000000000000000000000000000000000000000..3d3abf236f016608ef93e3d63ab04ac86830da7d --- /dev/null +++ b/src/device/service/drivers/p4/p4_driver.py @@ -0,0 +1,246 @@ +""" +P4 driver plugin for the TeraFlow SDN controller. +""" + +import logging +import threading +from typing import Any, Iterator, List, Optional, Tuple, Union +from .p4_util import P4RuntimeClient,\ + P4_ATTR_DEV_ID, P4_ATTR_DEV_NAME, P4_ATTR_DEV_VENDOR,\ + P4_ATTR_DEV_HW_VER, P4_ATTR_DEV_SW_VER, P4_ATTR_DEV_PIPECONF,\ + P4_VAL_DEF_VENDOR, P4_VAL_DEF_HW_VER, P4_VAL_DEF_SW_VER, P4_VAL_DEF_PIPECONF + +try: + from _Driver import _Driver +except ImportError: + from device.service.driver_api._Driver import _Driver + +LOGGER = logging.getLogger(__name__) + + +class P4Driver(_Driver): + """ + P4Driver class inherits the abstract _Driver class to support P4 devices. + + Attributes + ---------- + address : str + IP address of the P4Runtime server running on the P4 device + port : int + transport port number of the P4Runtime server running on the P4 device + **settings : map + id : int + P4 device ID (Mandatory) + name : str + P4 device name (Optional) + vendor : str + P4 device vendor (Optional) + hw_ver : str + Hardware version of the P4 device (Optional) + sw_ver : str + Software version of the P4 device (Optional) + pipeconf : str + P4 device table configuration (Optional) + """ + + def __init__(self, address: str, port: int, **settings) -> None: + # pylint: disable=super-init-not-called + self.__client = None + self.__address = address + self.__port = int(port) + self.__settings = settings + + try: + self.__dev_id = self.__settings.get(P4_ATTR_DEV_ID) + except Exception as ex: + LOGGER.error('P4 device ID is a mandatory setting') + raise Exception from ex + + if P4_ATTR_DEV_NAME in self.__settings: + self.__dev_name = self.__settings.get(P4_ATTR_DEV_NAME) + else: + self.__dev_name = str(self.__dev_id) + LOGGER.warning( + 'No device name is provided. Setting default name: %s', + self.__dev_name) + + if P4_ATTR_DEV_VENDOR in self.__settings: + self.__dev_vendor = self.__settings.get(P4_ATTR_DEV_VENDOR) + else: + self.__dev_vendor = P4_VAL_DEF_VENDOR + LOGGER.warning( + 'No vendor is provided. Setting default vendor: %s', + self.__dev_vendor) + + if P4_ATTR_DEV_HW_VER in self.__settings: + self.__dev_hw_version = self.__settings.get(P4_ATTR_DEV_HW_VER) + else: + self.__dev_hw_version = P4_VAL_DEF_HW_VER + LOGGER.warning( + 'No HW version is provided. Setting default HW version: %s', + self.__dev_hw_version) + + if P4_ATTR_DEV_SW_VER in self.__settings: + self.__dev_sw_version = self.__settings.get(P4_ATTR_DEV_SW_VER) + else: + self.__dev_sw_version = P4_VAL_DEF_SW_VER + LOGGER.warning( + 'No SW version is provided. Setting default SW version: %s', + self.__dev_sw_version) + + if P4_ATTR_DEV_PIPECONF in self.__settings: + self.__dev_pipeconf = self.__settings.get(P4_ATTR_DEV_PIPECONF) + else: + self.__dev_pipeconf = P4_VAL_DEF_PIPECONF + LOGGER.warning( + 'No P4 pipeconf is provided. Setting default P4 pipeconf: %s', + self.__dev_pipeconf) + + self.__lock = threading.Lock() + self.__started = threading.Event() + self.__terminate = threading.Event() + + LOGGER.info('Initializing P4 device at %s:%d with settings:', + self.__address, self.__port) + + for key, value in settings.items(): + LOGGER.info('\t%8s = %s', key, value) + + def Connect(self) -> bool: + """ + Establishes a connection between the P4 device driver and a P4 device. + + :return: boolean connection status. + """ + LOGGER.info( + 'Connecting to P4 device %s:%d ...', + self.__address, self.__port) + + with self.__lock: + # Skip if already connected + if self.__started.is_set(): + return True + + # Instantiate a gRPC channel with the P4 device + grpc_address = f'{self.__address}:{self.__port}' + election_id = (1, 0) + self.__client = P4RuntimeClient( + self.__dev_id, grpc_address, election_id) + LOGGER.info('\tConnected!') + self.__started.set() + + return True + + def Disconnect(self) -> bool: + """ + Terminates the connection between the P4 device driver and a P4 device. + + :return: boolean disconnection status. + """ + LOGGER.info( + 'Disconnecting from P4 device %s:%d ...', + self.__address, self.__port) + + # If not started, assume it is already disconnected + if not self.__started.is_set(): + return True + + # gRPC client must already be instantiated + assert self.__client + + # Trigger termination of loops and processes + self.__terminate.set() + + # Trigger connection tear down with the P4Runtime server + self.__client.tear_down() + self.__client = None + + LOGGER.info('\tDisconnected!') + + return True + + def GetInitialConfig(self) -> List[Tuple[str, Any]]: + """ + Retrieves the initial configuration of a P4 device. + + :return: list of initial configuration items. + """ + LOGGER.info('P4 GetInitialConfig()') + return [] + + def GetConfig(self, resource_keys : List[str] = [])\ + -> List[Tuple[str, Union[Any, None, Exception]]]: + """ + Retrieves the current configuration of a P4 device. + + :param resource_keys: configuration parameters to retrieve. + :return: list of values associated with the requested resource keys. + """ + + LOGGER.info('P4 GetConfig()') + return [] + + def SetConfig(self, resources : List[Tuple[str, Any]])\ + -> List[Union[bool, Exception]]: + """ + Submits a new configuration to a P4 device. + + :param resources: configuration parameters to set. + :return: list of results for resource key changes requested. + """ + LOGGER.info('P4 SetConfig()') + return [] + + def DeleteConfig(self, resources : List[Tuple[str, Any]])\ + -> List[Union[bool, Exception]]: + """ + Revokes P4 device configuration. + + :param resources: list of tuples with resource keys to be deleted. + :return: list of results for resource key deletions requested. + """ + LOGGER.info('P4 DeleteConfig()') + return [] + + def GetResource(self, endpoint_uuid : str) -> Optional[str]: + """ + Retrieves a certain resource from a P4 device. + + :param endpoint_uuid: target endpoint UUID. + :return: The path of the endpoint or None if not found. + """ + LOGGER.info('P4 GetResource()') + return "" + + def GetState(self, blocking=False) -> Iterator[Tuple[str, Any]]: + """ + Retrieves the state of a P4 device. + + :param blocking: if non-blocking, the driver terminates the loop and + returns. + :return: sequences of state sample. + """ + LOGGER.info('P4 GetState()') + return [] + + def SubscribeState(self, subscriptions : List[Tuple[str, float, float]])\ + -> List[Union[bool, Exception]]: + """ + Subscribes to certain state information. + + :param subscriptions: list of tuples with resources to be subscribed. + :return: list of results for resource subscriptions requested. + """ + LOGGER.info('P4 SubscribeState()') + return [] + + def UnsubscribeState(self, subscriptions : List[Tuple[str, float, float]])\ + -> List[Union[bool, Exception]]: + """ + Unsubscribes from certain state information. + + :param subscriptions: list of tuples with resources to be unsubscribed. + :return: list of results for resource un-subscriptions requested. + """ + LOGGER.info('P4 UnsubscribeState()') + return [] diff --git a/src/device/service/drivers/p4/p4_util.py b/src/device/service/drivers/p4/p4_util.py new file mode 100644 index 0000000000000000000000000000000000000000..8d6f258ddb0285a5fa4ee5bd11811d02df380345 --- /dev/null +++ b/src/device/service/drivers/p4/p4_util.py @@ -0,0 +1,257 @@ +""" +P4 driver utilities. +""" + +import logging +import queue +import sys +import threading +from functools import wraps +import grpc +import google.protobuf.text_format +from google.rpc import code_pb2 + +from p4.v1 import p4runtime_pb2 +from p4.v1 import p4runtime_pb2_grpc + +P4_ATTR_DEV_ID = 'id' +P4_ATTR_DEV_NAME = 'name' +P4_ATTR_DEV_VENDOR = 'vendor' +P4_ATTR_DEV_HW_VER = 'hw_ver' +P4_ATTR_DEV_SW_VER = 'sw_ver' +P4_ATTR_DEV_PIPECONF = 'pipeconf' + +P4_VAL_DEF_VENDOR = 'Unknown' +P4_VAL_DEF_HW_VER = 'BMv2 simple_switch' +P4_VAL_DEF_SW_VER = 'Stratum' +P4_VAL_DEF_PIPECONF = 'org.onosproject.pipelines.fabric' + +STREAM_ATTR_ARBITRATION = 'arbitration' +STREAM_ATTR_PACKET = 'packet' +STREAM_ATTR_DIGEST = 'digest' +STREAM_ATTR_UNKNOWN = 'unknown' + +LOGGER = logging.getLogger(__name__) + + +class P4RuntimeException(Exception): + """ + P4Runtime exception handler. + + Attributes + ---------- + grpc_error : object + gRPC error + """ + + def __init__(self, grpc_error): + super().__init__() + self.grpc_error = grpc_error + + def __str__(self): + return str('P4Runtime RPC error (%s): %s', + self.grpc_error.code().name(), self.grpc_error.details()) + + +def parse_p4runtime_error(fun): + """ + Parse P4Runtime error. + + :param fun: function + :return: parsed error + """ + @wraps(fun) + def handle(*args, **kwargs): + try: + return fun(*args, **kwargs) + except grpc.RpcError as rpc_ex: + raise P4RuntimeException(rpc_ex) from None + except Exception as ex: + raise Exception(ex) from None + return handle + + +class P4RuntimeClient: + """ + P4Runtime client. + + Attributes + ---------- + device_id : int + P4 device ID + grpc_address : str + IP address and port + election_id : tuple + Mastership election ID + role_name : str + Role name (optional) + """ + def __init__(self, device_id, grpc_address, election_id, role_name=None): + self.device_id = device_id + self.election_id = election_id + self.role_name = role_name + self.stream_in_q = None + self.stream_out_q = None + self.stream = None + self.stream_recv_thread = None + LOGGER.debug( + 'Connecting to device %d at %s', device_id, grpc_address) + self.channel = grpc.insecure_channel(grpc_address) + self.stub = p4runtime_pb2_grpc.P4RuntimeStub(self.channel) + try: + self.set_up_stream() + except P4RuntimeException: + LOGGER.critical('Failed to connect to P4Runtime server') + sys.exit(1) + + def set_up_stream(self): + """ + Set up a gRPC stream. + """ + self.stream_out_q = queue.Queue() + # queues for different messages + self.stream_in_q = { + STREAM_ATTR_ARBITRATION: queue.Queue(), + STREAM_ATTR_PACKET: queue.Queue(), + STREAM_ATTR_DIGEST: queue.Queue(), + STREAM_ATTR_UNKNOWN: queue.Queue(), + } + + def stream_req_iterator(): + while True: + st_p = self.stream_out_q.get() + if st_p is None: + break + yield st_p + + def stream_recv_wrapper(stream): + @parse_p4runtime_error + def stream_recv(): + for st_p in stream: + if st_p.HasField(STREAM_ATTR_ARBITRATION): + self.stream_in_q[STREAM_ATTR_ARBITRATION].put(st_p) + elif st_p.HasField(STREAM_ATTR_PACKET): + self.stream_in_q[STREAM_ATTR_PACKET].put(st_p) + elif st_p.HasField(STREAM_ATTR_DIGEST): + self.stream_in_q[STREAM_ATTR_DIGEST].put(st_p) + else: + self.stream_in_q[STREAM_ATTR_UNKNOWN].put(st_p) + try: + stream_recv() + except P4RuntimeException as ex: + LOGGER.critical('StreamChannel error, closing stream') + LOGGER.critical(ex) + for k in self.stream_in_q: + self.stream_in_q[k].put(None) + self.stream = self.stub.StreamChannel(stream_req_iterator()) + self.stream_recv_thread = threading.Thread( + target=stream_recv_wrapper, args=(self.stream,)) + self.stream_recv_thread.start() + self.handshake() + + def handshake(self): + """ + Handshake with gRPC server. + """ + + req = p4runtime_pb2.StreamMessageRequest() + arbitration = req.arbitration + arbitration.device_id = self.device_id + election_id = arbitration.election_id + election_id.high = self.election_id[0] + election_id.low = self.election_id[1] + if self.role_name is not None: + arbitration.role.name = self.role_name + self.stream_out_q.put(req) + + rep = self.get_stream_packet(STREAM_ATTR_ARBITRATION, timeout=2) + if rep is None: + LOGGER.critical('Failed to establish session with server') + sys.exit(1) + is_primary = (rep.arbitration.status.code == code_pb2.OK) + LOGGER.debug('Session established, client is %s', + 'primary' if is_primary else 'backup') + if not is_primary: + LOGGER.warning( + 'You are not the primary client, ' + 'you only have read access to the server') + + def get_stream_packet(self, type_, timeout=1): + """ + Get a new message from the stream. + + :param type_: stream type. + :param timeout: time to wait. + :return: message or None + """ + if type_ not in self.stream_in_q: + LOGGER.critical('Unknown stream type %s', type_) + return None + try: + msg = self.stream_in_q[type_].get(timeout=timeout) + return msg + except queue.Empty: # timeout expired + return None + + @parse_p4runtime_error + def get_p4info(self): + """ + Retrieve P4Info content. + + :return: P4Info object. + """ + + LOGGER.debug('Retrieving P4Info file') + req = p4runtime_pb2.GetForwardingPipelineConfigRequest() + req.device_id = self.device_id + req.response_type =\ + p4runtime_pb2.GetForwardingPipelineConfigRequest.P4INFO_AND_COOKIE + rep = self.stub.GetForwardingPipelineConfig(req) + return rep.config.p4info + + @parse_p4runtime_error + def set_fwd_pipe_config(self, p4info_path, bin_path): + """ + Configure the pipeline. + + :param p4info_path: path to the P4Info file + :param bin_path: path to the binary file + :return: + """ + + LOGGER.debug('Setting forwarding pipeline config') + req = p4runtime_pb2.SetForwardingPipelineConfigRequest() + req.device_id = self.device_id + if self.role_name is not None: + req.role = self.role_name + election_id = req.election_id + election_id.high = self.election_id[0] + election_id.low = self.election_id[1] + req.action =\ + p4runtime_pb2.SetForwardingPipelineConfigRequest.VERIFY_AND_COMMIT + with open(p4info_path, 'r', encoding='utf8') as f_1: + with open(bin_path, 'rb', encoding='utf8') as f_2: + try: + google.protobuf.text_format.Merge( + f_1.read(), req.config.p4info) + except google.protobuf.text_format.ParseError: + LOGGER.error('Error when parsing P4Info') + raise + req.config.p4_device_config = f_2.read() + return self.stub.SetForwardingPipelineConfig(req) + + def tear_down(self): + """ + Tear connection with the gRPC server down. + """ + + if self.stream_out_q: + LOGGER.debug('Cleaning up stream') + self.stream_out_q.put(None) + if self.stream_in_q: + for k in self.stream_in_q: + self.stream_in_q[k].put(None) + if self.stream_recv_thread: + self.stream_recv_thread.join() + self.channel.close() + del self.channel diff --git a/src/device/tests/device_p4.py b/src/device/tests/device_p4.py new file mode 100644 index 0000000000000000000000000000000000000000..55c1025861ad1a00fff6b692eaeaaadcd639a519 --- /dev/null +++ b/src/device/tests/device_p4.py @@ -0,0 +1,49 @@ +""" +P4 device example configuration. +""" + +from copy import deepcopy +try: + from .context_pb2 import DeviceDriverEnum, DeviceOperationalStatusEnum +except ImportError: + from device.proto.context_pb2 import DeviceDriverEnum, DeviceOperationalStatusEnum +from .Tools import config_rule_set + +DEVICE_P4_ID = 0 +DEVICE_P4_NAME = 'device:leaf1' +DEVICE_P4_TYPE = 'p4-switch' +DEVICE_P4_ADDRESS = '127.0.0.1' +DEVICE_P4_PORT = '50101' +DEVICE_P4_DRIVERS = [DeviceDriverEnum.DEVICEDRIVER_P4] +DEVICE_P4_VENDOR = 'Open Networking Foundation' +DEVICE_P4_HW_VER = 'BMv2 simple_switch' +DEVICE_P4_SW_VER = 'Stratum' +DEVICE_P4_PIPECONF = 'org.onosproject.pipelines.fabric' +DEVICE_P4_WORKERS = 2 +DEVICE_P4_GRACE_PERIOD = 60 + +DEVICE_P4_UUID = {'device_uuid': {'uuid': DEVICE_P4_NAME}} +DEVICE_P4 = { + 'device_id': deepcopy(DEVICE_P4_UUID), + 'device_type': DEVICE_P4_TYPE, + 'device_config': {'config_rules': []}, + 'device_operational_status': DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_DISABLED, + 'device_drivers': DEVICE_P4_DRIVERS, + 'device_endpoints': [], +} + +DEVICE_P4_CONNECT_RULES = [ + config_rule_set('_connect/address', DEVICE_P4_ADDRESS), + config_rule_set('_connect/port', DEVICE_P4_PORT), + config_rule_set('_connect/settings', { + 'id': int(DEVICE_P4_ID), + 'name': DEVICE_P4_NAME, + 'hw-ver': DEVICE_P4_HW_VER, + 'sw-ver': DEVICE_P4_SW_VER, + 'pipeconf': DEVICE_P4_PIPECONF + }), +] + +DEVICE_P4_CONFIG_RULES = [ + config_rule_set('key1', 'value1'), +] diff --git a/src/device/tests/mock_p4runtime_service.py b/src/device/tests/mock_p4runtime_service.py new file mode 100644 index 0000000000000000000000000000000000000000..f1cf673e34a1dd604a17589ad39c93f0ca1375b8 --- /dev/null +++ b/src/device/tests/mock_p4runtime_service.py @@ -0,0 +1,66 @@ +""" +A mock P4Runtime server. +""" + +import logging +from concurrent import futures +import grpc +from p4.v1 import p4runtime_pb2_grpc + +from .device_p4 import( + DEVICE_P4_ADDRESS, DEVICE_P4_PORT, + DEVICE_P4_WORKERS, DEVICE_P4_GRACE_PERIOD) +from .mock_p4runtime_servicer_impl import MockP4RuntimeServicerImpl + +LOGGER = logging.getLogger(__name__) + + +class MockP4RuntimeService: + """ + P4Runtime server for testing purposes. + """ + + def __init__( + self, address=DEVICE_P4_ADDRESS, port=DEVICE_P4_PORT, + max_workers=DEVICE_P4_WORKERS, + grace_period=DEVICE_P4_GRACE_PERIOD): + self.address = address + self.port = port + self.endpoint = f'{self.address}:{self.port}' + self.max_workers = max_workers + self.grace_period = grace_period + self.server = None + self.servicer = None + + def start(self): + """ + Start the P4Runtime server. + """ + + LOGGER.info( + 'Starting P4Runtime service on %s with max_workers: %s', + str(self.endpoint), str(self.max_workers)) + + self.server = grpc.server( + futures.ThreadPoolExecutor(max_workers=self.max_workers)) + + self.servicer = MockP4RuntimeServicerImpl() + p4runtime_pb2_grpc.add_P4RuntimeServicer_to_server( + self.servicer, self.server) + + _ = self.server.add_insecure_port(self.endpoint) + LOGGER.info('Listening on %s...', str(self.endpoint)) + + self.server.start() + LOGGER.debug('P4Runtime service started') + + def stop(self): + """ + Stop the P4Runtime server. + """ + + LOGGER.debug( + 'Stopping P4Runtime service (grace period %d seconds...', + self.grace_period) + self.server.stop(self.grace_period) + LOGGER.debug('P4Runtime service stopped') diff --git a/src/device/tests/mock_p4runtime_servicer_impl.py b/src/device/tests/mock_p4runtime_servicer_impl.py new file mode 100644 index 0000000000000000000000000000000000000000..f29f05ba2d504794a085c4499ebfefba08604811 --- /dev/null +++ b/src/device/tests/mock_p4runtime_servicer_impl.py @@ -0,0 +1,55 @@ +""" +A mock P4Runtime service implementation. +""" + +import queue +from google.rpc import code_pb2 +from p4.v1 import p4runtime_pb2, p4runtime_pb2_grpc +from p4.config.v1 import p4info_pb2 + +try: + from p4_util import STREAM_ATTR_ARBITRATION, STREAM_ATTR_PACKET +except ImportError: + from device.service.drivers.p4.p4_util import STREAM_ATTR_ARBITRATION,\ + STREAM_ATTR_PACKET + +class MockP4RuntimeServicerImpl(p4runtime_pb2_grpc.P4RuntimeServicer): + """ + A P4Runtime service implementation for testing purposes. + """ + + def __init__(self): + self.p4info = p4info_pb2.P4Info() + self.p4runtime_api_version = "1.3.0" + self.stored_packet_out = queue.Queue() + + def GetForwardingPipelineConfig(self, request, context): + rep = p4runtime_pb2.GetForwardingPipelineConfigResponse() + if self.p4info is not None: + rep.config.p4info.CopyFrom(self.p4info) + return rep + + def SetForwardingPipelineConfig(self, request, context): + self.p4info.CopyFrom(request.config.p4info) + return p4runtime_pb2.SetForwardingPipelineConfigResponse() + + def Write(self, request, context): + return p4runtime_pb2.WriteResponse() + + def Read(self, request, context): + yield p4runtime_pb2.ReadResponse() + + def StreamChannel(self, request_iterator, context): + for req in request_iterator: + if req.HasField(STREAM_ATTR_ARBITRATION): + rep = p4runtime_pb2.StreamMessageResponse() + rep.arbitration.CopyFrom(req.arbitration) + rep.arbitration.status.code = code_pb2.OK + yield rep + elif req.HasField(STREAM_ATTR_PACKET): + self.stored_packet_out.put(req) + + def Capabilities(self, request, context): + rep = p4runtime_pb2.CapabilitiesResponse() + rep.p4runtime_api_version = self.p4runtime_api_version + return rep diff --git a/src/device/tests/test_unit_p4.py b/src/device/tests/test_unit_p4.py new file mode 100644 index 0000000000000000000000000000000000000000..8f2f89f05f7cbee1e5263464a4c59e77ccde2092 --- /dev/null +++ b/src/device/tests/test_unit_p4.py @@ -0,0 +1,83 @@ +import pytest +from device.service.drivers.p4.p4_driver import P4Driver +from .device_p4 import( + DEVICE_P4_ADDRESS, DEVICE_P4_PORT, DEVICE_P4_ID, DEVICE_P4_NAME, + DEVICE_P4_VENDOR, DEVICE_P4_HW_VER, DEVICE_P4_SW_VER, + DEVICE_P4_PIPECONF, DEVICE_P4_WORKERS, DEVICE_P4_GRACE_PERIOD) +from .mock_p4runtime_service import MockP4RuntimeService + + +@pytest.fixture(scope='session') +def p4runtime_service(): + _service = MockP4RuntimeService( + address=DEVICE_P4_ADDRESS, port=DEVICE_P4_PORT, + max_workers=DEVICE_P4_WORKERS, + grace_period=DEVICE_P4_GRACE_PERIOD) + _service.start() + yield _service + _service.stop() + + +@pytest.fixture(scope='session') +def device_driverapi_p4(): + _driver = P4Driver( + address=DEVICE_P4_ADDRESS, + port=DEVICE_P4_PORT, + id=DEVICE_P4_ID, + name=DEVICE_P4_NAME, + vendor=DEVICE_P4_VENDOR, + hw_ver=DEVICE_P4_HW_VER, + sw_ver=DEVICE_P4_SW_VER, + pipeconf=DEVICE_P4_PIPECONF) + _driver.Connect() + yield _driver + _driver.Disconnect() + + +def test_device_driverapi_p4_setconfig( + p4runtime_service: MockP4RuntimeService, + device_driverapi_p4: P4Driver): # pylint: disable=redefined-outer-name + device_driverapi_p4.SetConfig([]) + return + + +def test_device_driverapi_p4_getconfig( + p4runtime_service: MockP4RuntimeService, + device_driverapi_p4: P4Driver): # pylint: disable=redefined-outer-name + device_driverapi_p4.GetConfig() + return + + +def test_device_driverapi_p4_getresource( + p4runtime_service: MockP4RuntimeService, + device_driverapi_p4: P4Driver): # pylint: disable=redefined-outer-name + device_driverapi_p4.GetResource("") + return + + +def test_device_driverapi_p4_getstate( + p4runtime_service: MockP4RuntimeService, + device_driverapi_p4: P4Driver): # pylint: disable=redefined-outer-name + device_driverapi_p4.GetState() + return + + +def test_device_driverapi_p4_deleteconfig( + p4runtime_service: MockP4RuntimeService, + device_driverapi_p4: P4Driver): # pylint: disable=redefined-outer-name + device_driverapi_p4.DeleteConfig([]) + return + + +def test_device_driverapi_p4_subscribe_state( + p4runtime_service: MockP4RuntimeService, + device_driverapi_p4: P4Driver): # pylint: disable=redefined-outer-name + device_driverapi_p4.SubscribeState([]) + return + + +def test_device_driverapi_p4_unsubscribe_state( + p4runtime_service: MockP4RuntimeService, + device_driverapi_p4: P4Driver): # pylint: disable=redefined-outer-name + device_driverapi_p4.UnsubscribeState([]) + return diff --git a/src/device/tests/test_unitary.py b/src/device/tests/test_unitary.py index f5c43309e1682cd12925d853793d8dd0982e245b..7eaee6e9fe232998e9f7a56aaaaf1c1ed136420a 100644 --- a/src/device/tests/test_unitary.py +++ b/src/device/tests/test_unitary.py @@ -65,6 +65,16 @@ except ImportError: #ENABLE_OPENCONFIG = False #ENABLE_TAPI = False +from .mock_p4runtime_service import MockP4RuntimeService +try: + from .device_p4 import( + DEVICE_P4, DEVICE_P4_ID, DEVICE_P4_UUID, DEVICE_P4_NAME, + DEVICE_P4_ADDRESS, DEVICE_P4_PORT, DEVICE_P4_WORKERS, + DEVICE_P4_GRACE_PERIOD, DEVICE_P4_CONNECT_RULES, + DEVICE_P4_CONFIG_RULES) +except ImportError: + raise ImportError("Test configuration for P4 devices not found") + LOGGER = logging.getLogger(__name__) LOGGER.setLevel(logging.DEBUG) @@ -147,6 +157,16 @@ def device_client(device_service : DeviceService): # pylint: disable=redefined-o yield _client _client.close() +@pytest.fixture(scope='session') +def p4runtime_service(): + _service = MockP4RuntimeService( + address=DEVICE_P4_ADDRESS, port=DEVICE_P4_PORT, + max_workers=DEVICE_P4_WORKERS, + grace_period=DEVICE_P4_GRACE_PERIOD) + _service.start() + yield _service + _service.stop() + def grpc_message_to_json_string(message): return str(MessageToDict( message, including_default_value_fields=True, preserving_proto_field_name=True, use_integers_for_enums=False)) @@ -161,7 +181,7 @@ def test_prepare_environment( context_client.SetTopology(Topology(**TOPOLOGY)) -# ----- Test Device Driver Emulated ------------------------------------------------------------------------------------ +# ----- Test Device Driver Emulated -------------------------------------------- # Device Driver Emulated tests are used to validate Driver API as well as Emulated Device Driver. Note that other # Drivers might support a different set of resource paths, and attributes/values per resource; however, they must # implement the Driver API. @@ -471,7 +491,7 @@ def test_device_emulated_delete( assert driver is None -# ----- Test Device Driver OpenConfig ---------------------------------------------------------------------------------- +# ----- Test Device Driver OpenConfig ------------------------------------------ def test_device_openconfig_add_error_cases( context_client : ContextClient, # pylint: disable=redefined-outer-name @@ -603,7 +623,7 @@ def test_device_openconfig_delete( assert driver is None -# ----- Test Device Driver TAPI ---------------------------------------------------------------------------------- +# ----- Test Device Driver TAPI ------------------------------------------------ def test_device_tapi_add_error_cases( device_client : DeviceClient): # pylint: disable=redefined-outer-name @@ -728,3 +748,81 @@ def test_device_tapi_delete( device_client.DeleteDevice(DeviceId(**DEVICE_TAPI_ID)) driver : _Driver = device_service.driver_instance_cache.get(DEVICE_TAPI_UUID, {}) assert driver is None + + +# ----- Test Device Driver P4 -------------------------------------------------- + +def test_device_p4_add_error_cases( + context_client: ContextClient, # pylint: disable=redefined-outer-name + device_client: DeviceClient, # pylint: disable=redefined-outer-name + device_service: DeviceService): # pylint: disable=redefined-outer-name + + with pytest.raises(grpc.RpcError) as e: + device_p4_with_extra_rules = copy.deepcopy(DEVICE_P4) + device_p4_with_extra_rules['device_config']['config_rules'].extend( + DEVICE_P4_CONNECT_RULES) + device_p4_with_extra_rules['device_config']['config_rules'].extend( + DEVICE_P4_CONFIG_RULES) + device_client.AddDevice(Device(**device_p4_with_extra_rules)) + assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT + msg_head = 'device.device_config.config_rules([' + msg_tail = ']) is invalid; RPC method AddDevice only accepts connection Config Rules that should start '\ + 'with "_connect/" tag. Others should be configured after adding the device.' + except_msg = str(e.value.details()) + assert except_msg.startswith(msg_head) and except_msg.endswith(msg_tail) + + +def test_device_p4_add_correct( + context_client: ContextClient, # pylint: disable=redefined-outer-name + device_client: DeviceClient, # pylint: disable=redefined-outer-name + device_service: DeviceService, # pylint: disable=redefined-outer-name + p4runtime_service: MockP4RuntimeService): + + device_p4_with_connect_rules = copy.deepcopy(DEVICE_P4) + device_p4_with_connect_rules['device_config']['config_rules'].extend( + DEVICE_P4_CONNECT_RULES) + device_client.AddDevice(Device(**device_p4_with_connect_rules)) + driver : _Driver = device_service.driver_instance_cache.get(DEVICE_P4_NAME) + assert driver is not None + + +def test_device_p4_get( + context_client: ContextClient, # pylint: disable=redefined-outer-name + device_client: DeviceClient, # pylint: disable=redefined-outer-name + device_service: DeviceService, # pylint: disable=redefined-outer-name + p4runtime_service: MockP4RuntimeService): + + initial_config = device_client.GetInitialConfig(DeviceId(**DEVICE_P4_UUID)) + LOGGER.info('initial_config = {:s}'.format( + grpc_message_to_json_string(initial_config))) + + device_data = context_client.GetDevice(DeviceId(**DEVICE_P4_UUID)) + LOGGER.info('device_data = {:s}'.format( + grpc_message_to_json_string(device_data))) + + +def test_device_p4_configure( + context_client: ContextClient, # pylint: disable=redefined-outer-name + device_client: DeviceClient, # pylint: disable=redefined-outer-name + device_service: DeviceService, # pylint: disable=redefined-outer-name + p4runtime_service: MockP4RuntimeService): + pytest.skip("Skipping test for unimplemented method") + + +def test_device_p4_deconfigure( + context_client: ContextClient, # pylint: disable=redefined-outer-name + device_client: DeviceClient, # pylint: disable=redefined-outer-name + device_service: DeviceService, # pylint: disable=redefined-outer-name + p4runtime_service: MockP4RuntimeService): + pytest.skip("Skipping test for unimplemented method") + + +def test_device_p4_delete( + context_client: ContextClient, # pylint: disable=redefined-outer-name + device_client: DeviceClient, # pylint: disable=redefined-outer-name + device_service: DeviceService, # pylint: disable=redefined-outer-name + p4runtime_service: MockP4RuntimeService): + + device_client.DeleteDevice(DeviceId(**DEVICE_P4_UUID)) + driver : _Driver = device_service.driver_instance_cache.get(DEVICE_P4_NAME) + assert driver is None diff --git a/src/service/requirements.in b/src/service/requirements.in index 55d2eb25b8619d86d0a80f3682ba9c9524b8d38d..72e54cd195bbfb68c93bec6ebde7247392e6c674 100644 --- a/src/service/requirements.in +++ b/src/service/requirements.in @@ -14,3 +14,4 @@ pytz redis requests xmltodict +p4runtime==1.3.0