Skip to content
Snippets Groups Projects
Commit 5ac25d5f authored by Georgios Katsikas's avatar Georgios Katsikas Committed by Lluis Gifre Renom
Browse files

feat: basic P4 device driver

parent 59398a94
Branches
Tags
1 merge request!54Release 2.0.0
......@@ -13,3 +13,4 @@ pytz
redis
requests
xmltodict
p4runtime==1.3.0
......@@ -8,6 +8,7 @@ class DeviceTypeFilterFieldEnum(Enum):
OPTICAL_LINE_SYSTEM = 'optical-line-system'
PACKET_ROUTER = 'packet-router'
PACKET_SWITCH = 'packet-switch'
P4_SWITCH = 'p4-switch'
class FilterFieldEnum(Enum):
DEVICE_TYPE = 'device_type'
......
......@@ -2,6 +2,7 @@ from ..driver_api.FilterFields import FilterFieldEnum, DeviceTypeFilterFieldEnum
from .emulated.EmulatedDriver import EmulatedDriver
from .openconfig.OpenConfigDriver import OpenConfigDriver
from .transport_api.TransportApiDriver import TransportApiDriver
from .p4.p4_driver import P4Driver
DRIVERS = [
(EmulatedDriver, [
......@@ -22,4 +23,10 @@ DRIVERS = [
FilterFieldEnum.DRIVER : ORM_DeviceDriverEnum.TRANSPORT_API,
}
]),
(P4Driver, [
{
FilterFieldEnum.DEVICE_TYPE: DeviceTypeFilterFieldEnum.P4_SWITCH,
FilterFieldEnum.DRIVER : ORM_DeviceDriverEnum.P4,
}
]),
]
"""
P4 driver plugin for the TeraFlow SDN controller.
"""
import logging
import threading
from typing import Any, Iterator, List, Optional, Tuple, Union
from .p4_util import P4RuntimeClient,\
P4_ATTR_DEV_ID, P4_ATTR_DEV_NAME, P4_ATTR_DEV_VENDOR,\
P4_ATTR_DEV_HW_VER, P4_ATTR_DEV_SW_VER, P4_ATTR_DEV_PIPECONF,\
P4_VAL_DEF_VENDOR, P4_VAL_DEF_HW_VER, P4_VAL_DEF_SW_VER, P4_VAL_DEF_PIPECONF
try:
from _Driver import _Driver
except ImportError:
from device.service.driver_api._Driver import _Driver
LOGGER = logging.getLogger(__name__)
class P4Driver(_Driver):
"""
P4Driver class inherits the abstract _Driver class to support P4 devices.
Attributes
----------
address : str
IP address of the P4Runtime server running on the P4 device
port : int
transport port number of the P4Runtime server running on the P4 device
**settings : map
id : int
P4 device ID (Mandatory)
name : str
P4 device name (Optional)
vendor : str
P4 device vendor (Optional)
hw_ver : str
Hardware version of the P4 device (Optional)
sw_ver : str
Software version of the P4 device (Optional)
pipeconf : str
P4 device table configuration (Optional)
"""
def __init__(self, address: str, port: int, **settings) -> None:
# pylint: disable=super-init-not-called
self.__client = None
self.__address = address
self.__port = int(port)
self.__settings = settings
try:
self.__dev_id = self.__settings.get(P4_ATTR_DEV_ID)
except Exception as ex:
LOGGER.error('P4 device ID is a mandatory setting')
raise Exception from ex
if P4_ATTR_DEV_NAME in self.__settings:
self.__dev_name = self.__settings.get(P4_ATTR_DEV_NAME)
else:
self.__dev_name = str(self.__dev_id)
LOGGER.warning(
'No device name is provided. Setting default name: %s',
self.__dev_name)
if P4_ATTR_DEV_VENDOR in self.__settings:
self.__dev_vendor = self.__settings.get(P4_ATTR_DEV_VENDOR)
else:
self.__dev_vendor = P4_VAL_DEF_VENDOR
LOGGER.warning(
'No vendor is provided. Setting default vendor: %s',
self.__dev_vendor)
if P4_ATTR_DEV_HW_VER in self.__settings:
self.__dev_hw_version = self.__settings.get(P4_ATTR_DEV_HW_VER)
else:
self.__dev_hw_version = P4_VAL_DEF_HW_VER
LOGGER.warning(
'No HW version is provided. Setting default HW version: %s',
self.__dev_hw_version)
if P4_ATTR_DEV_SW_VER in self.__settings:
self.__dev_sw_version = self.__settings.get(P4_ATTR_DEV_SW_VER)
else:
self.__dev_sw_version = P4_VAL_DEF_SW_VER
LOGGER.warning(
'No SW version is provided. Setting default SW version: %s',
self.__dev_sw_version)
if P4_ATTR_DEV_PIPECONF in self.__settings:
self.__dev_pipeconf = self.__settings.get(P4_ATTR_DEV_PIPECONF)
else:
self.__dev_pipeconf = P4_VAL_DEF_PIPECONF
LOGGER.warning(
'No P4 pipeconf is provided. Setting default P4 pipeconf: %s',
self.__dev_pipeconf)
self.__lock = threading.Lock()
self.__started = threading.Event()
self.__terminate = threading.Event()
LOGGER.info('Initializing P4 device at %s:%d with settings:',
self.__address, self.__port)
for key, value in settings.items():
LOGGER.info('\t%8s = %s', key, value)
def Connect(self) -> bool:
"""
Establishes a connection between the P4 device driver and a P4 device.
:return: boolean connection status.
"""
LOGGER.info(
'Connecting to P4 device %s:%d ...',
self.__address, self.__port)
with self.__lock:
# Skip if already connected
if self.__started.is_set():
return True
# Instantiate a gRPC channel with the P4 device
grpc_address = f'{self.__address}:{self.__port}'
election_id = (1, 0)
self.__client = P4RuntimeClient(
self.__dev_id, grpc_address, election_id)
LOGGER.info('\tConnected!')
self.__started.set()
return True
def Disconnect(self) -> bool:
"""
Terminates the connection between the P4 device driver and a P4 device.
:return: boolean disconnection status.
"""
LOGGER.info(
'Disconnecting from P4 device %s:%d ...',
self.__address, self.__port)
# If not started, assume it is already disconnected
if not self.__started.is_set():
return True
# gRPC client must already be instantiated
assert self.__client
# Trigger termination of loops and processes
self.__terminate.set()
# Trigger connection tear down with the P4Runtime server
self.__client.tear_down()
self.__client = None
LOGGER.info('\tDisconnected!')
return True
def GetInitialConfig(self) -> List[Tuple[str, Any]]:
"""
Retrieves the initial configuration of a P4 device.
:return: list of initial configuration items.
"""
LOGGER.info('P4 GetInitialConfig()')
return []
def GetConfig(self, resource_keys : List[str] = [])\
-> List[Tuple[str, Union[Any, None, Exception]]]:
"""
Retrieves the current configuration of a P4 device.
:param resource_keys: configuration parameters to retrieve.
:return: list of values associated with the requested resource keys.
"""
LOGGER.info('P4 GetConfig()')
return []
def SetConfig(self, resources : List[Tuple[str, Any]])\
-> List[Union[bool, Exception]]:
"""
Submits a new configuration to a P4 device.
:param resources: configuration parameters to set.
:return: list of results for resource key changes requested.
"""
LOGGER.info('P4 SetConfig()')
return []
def DeleteConfig(self, resources : List[Tuple[str, Any]])\
-> List[Union[bool, Exception]]:
"""
Revokes P4 device configuration.
:param resources: list of tuples with resource keys to be deleted.
:return: list of results for resource key deletions requested.
"""
LOGGER.info('P4 DeleteConfig()')
return []
def GetResource(self, endpoint_uuid : str) -> Optional[str]:
"""
Retrieves a certain resource from a P4 device.
:param endpoint_uuid: target endpoint UUID.
:return: The path of the endpoint or None if not found.
"""
LOGGER.info('P4 GetResource()')
return ""
def GetState(self, blocking=False) -> Iterator[Tuple[str, Any]]:
"""
Retrieves the state of a P4 device.
:param blocking: if non-blocking, the driver terminates the loop and
returns.
:return: sequences of state sample.
"""
LOGGER.info('P4 GetState()')
return []
def SubscribeState(self, subscriptions : List[Tuple[str, float, float]])\
-> List[Union[bool, Exception]]:
"""
Subscribes to certain state information.
:param subscriptions: list of tuples with resources to be subscribed.
:return: list of results for resource subscriptions requested.
"""
LOGGER.info('P4 SubscribeState()')
return []
def UnsubscribeState(self, subscriptions : List[Tuple[str, float, float]])\
-> List[Union[bool, Exception]]:
"""
Unsubscribes from certain state information.
:param subscriptions: list of tuples with resources to be unsubscribed.
:return: list of results for resource un-subscriptions requested.
"""
LOGGER.info('P4 UnsubscribeState()')
return []
"""
P4 driver utilities.
"""
import logging
import queue
import sys
import threading
from functools import wraps
import grpc
import google.protobuf.text_format
from google.rpc import code_pb2
from p4.v1 import p4runtime_pb2
from p4.v1 import p4runtime_pb2_grpc
P4_ATTR_DEV_ID = 'id'
P4_ATTR_DEV_NAME = 'name'
P4_ATTR_DEV_VENDOR = 'vendor'
P4_ATTR_DEV_HW_VER = 'hw_ver'
P4_ATTR_DEV_SW_VER = 'sw_ver'
P4_ATTR_DEV_PIPECONF = 'pipeconf'
P4_VAL_DEF_VENDOR = 'Unknown'
P4_VAL_DEF_HW_VER = 'BMv2 simple_switch'
P4_VAL_DEF_SW_VER = 'Stratum'
P4_VAL_DEF_PIPECONF = 'org.onosproject.pipelines.fabric'
STREAM_ATTR_ARBITRATION = 'arbitration'
STREAM_ATTR_PACKET = 'packet'
STREAM_ATTR_DIGEST = 'digest'
STREAM_ATTR_UNKNOWN = 'unknown'
LOGGER = logging.getLogger(__name__)
class P4RuntimeException(Exception):
"""
P4Runtime exception handler.
Attributes
----------
grpc_error : object
gRPC error
"""
def __init__(self, grpc_error):
super().__init__()
self.grpc_error = grpc_error
def __str__(self):
return str('P4Runtime RPC error (%s): %s',
self.grpc_error.code().name(), self.grpc_error.details())
def parse_p4runtime_error(fun):
"""
Parse P4Runtime error.
:param fun: function
:return: parsed error
"""
@wraps(fun)
def handle(*args, **kwargs):
try:
return fun(*args, **kwargs)
except grpc.RpcError as rpc_ex:
raise P4RuntimeException(rpc_ex) from None
except Exception as ex:
raise Exception(ex) from None
return handle
class P4RuntimeClient:
"""
P4Runtime client.
Attributes
----------
device_id : int
P4 device ID
grpc_address : str
IP address and port
election_id : tuple
Mastership election ID
role_name : str
Role name (optional)
"""
def __init__(self, device_id, grpc_address, election_id, role_name=None):
self.device_id = device_id
self.election_id = election_id
self.role_name = role_name
self.stream_in_q = None
self.stream_out_q = None
self.stream = None
self.stream_recv_thread = None
LOGGER.debug(
'Connecting to device %d at %s', device_id, grpc_address)
self.channel = grpc.insecure_channel(grpc_address)
self.stub = p4runtime_pb2_grpc.P4RuntimeStub(self.channel)
try:
self.set_up_stream()
except P4RuntimeException:
LOGGER.critical('Failed to connect to P4Runtime server')
sys.exit(1)
def set_up_stream(self):
"""
Set up a gRPC stream.
"""
self.stream_out_q = queue.Queue()
# queues for different messages
self.stream_in_q = {
STREAM_ATTR_ARBITRATION: queue.Queue(),
STREAM_ATTR_PACKET: queue.Queue(),
STREAM_ATTR_DIGEST: queue.Queue(),
STREAM_ATTR_UNKNOWN: queue.Queue(),
}
def stream_req_iterator():
while True:
st_p = self.stream_out_q.get()
if st_p is None:
break
yield st_p
def stream_recv_wrapper(stream):
@parse_p4runtime_error
def stream_recv():
for st_p in stream:
if st_p.HasField(STREAM_ATTR_ARBITRATION):
self.stream_in_q[STREAM_ATTR_ARBITRATION].put(st_p)
elif st_p.HasField(STREAM_ATTR_PACKET):
self.stream_in_q[STREAM_ATTR_PACKET].put(st_p)
elif st_p.HasField(STREAM_ATTR_DIGEST):
self.stream_in_q[STREAM_ATTR_DIGEST].put(st_p)
else:
self.stream_in_q[STREAM_ATTR_UNKNOWN].put(st_p)
try:
stream_recv()
except P4RuntimeException as ex:
LOGGER.critical('StreamChannel error, closing stream')
LOGGER.critical(ex)
for k in self.stream_in_q:
self.stream_in_q[k].put(None)
self.stream = self.stub.StreamChannel(stream_req_iterator())
self.stream_recv_thread = threading.Thread(
target=stream_recv_wrapper, args=(self.stream,))
self.stream_recv_thread.start()
self.handshake()
def handshake(self):
"""
Handshake with gRPC server.
"""
req = p4runtime_pb2.StreamMessageRequest()
arbitration = req.arbitration
arbitration.device_id = self.device_id
election_id = arbitration.election_id
election_id.high = self.election_id[0]
election_id.low = self.election_id[1]
if self.role_name is not None:
arbitration.role.name = self.role_name
self.stream_out_q.put(req)
rep = self.get_stream_packet(STREAM_ATTR_ARBITRATION, timeout=2)
if rep is None:
LOGGER.critical('Failed to establish session with server')
sys.exit(1)
is_primary = (rep.arbitration.status.code == code_pb2.OK)
LOGGER.debug('Session established, client is %s',
'primary' if is_primary else 'backup')
if not is_primary:
LOGGER.warning(
'You are not the primary client, '
'you only have read access to the server')
def get_stream_packet(self, type_, timeout=1):
"""
Get a new message from the stream.
:param type_: stream type.
:param timeout: time to wait.
:return: message or None
"""
if type_ not in self.stream_in_q:
LOGGER.critical('Unknown stream type %s', type_)
return None
try:
msg = self.stream_in_q[type_].get(timeout=timeout)
return msg
except queue.Empty: # timeout expired
return None
@parse_p4runtime_error
def get_p4info(self):
"""
Retrieve P4Info content.
:return: P4Info object.
"""
LOGGER.debug('Retrieving P4Info file')
req = p4runtime_pb2.GetForwardingPipelineConfigRequest()
req.device_id = self.device_id
req.response_type =\
p4runtime_pb2.GetForwardingPipelineConfigRequest.P4INFO_AND_COOKIE
rep = self.stub.GetForwardingPipelineConfig(req)
return rep.config.p4info
@parse_p4runtime_error
def set_fwd_pipe_config(self, p4info_path, bin_path):
"""
Configure the pipeline.
:param p4info_path: path to the P4Info file
:param bin_path: path to the binary file
:return:
"""
LOGGER.debug('Setting forwarding pipeline config')
req = p4runtime_pb2.SetForwardingPipelineConfigRequest()
req.device_id = self.device_id
if self.role_name is not None:
req.role = self.role_name
election_id = req.election_id
election_id.high = self.election_id[0]
election_id.low = self.election_id[1]
req.action =\
p4runtime_pb2.SetForwardingPipelineConfigRequest.VERIFY_AND_COMMIT
with open(p4info_path, 'r', encoding='utf8') as f_1:
with open(bin_path, 'rb', encoding='utf8') as f_2:
try:
google.protobuf.text_format.Merge(
f_1.read(), req.config.p4info)
except google.protobuf.text_format.ParseError:
LOGGER.error('Error when parsing P4Info')
raise
req.config.p4_device_config = f_2.read()
return self.stub.SetForwardingPipelineConfig(req)
def tear_down(self):
"""
Tear connection with the gRPC server down.
"""
if self.stream_out_q:
LOGGER.debug('Cleaning up stream')
self.stream_out_q.put(None)
if self.stream_in_q:
for k in self.stream_in_q:
self.stream_in_q[k].put(None)
if self.stream_recv_thread:
self.stream_recv_thread.join()
self.channel.close()
del self.channel
"""
P4 device example configuration.
"""
from copy import deepcopy
try:
from .context_pb2 import DeviceDriverEnum, DeviceOperationalStatusEnum
except ImportError:
from device.proto.context_pb2 import DeviceDriverEnum, DeviceOperationalStatusEnum
from .Tools import config_rule_set
DEVICE_P4_ID = 0
DEVICE_P4_NAME = 'device:leaf1'
DEVICE_P4_TYPE = 'p4-switch'
DEVICE_P4_ADDRESS = '127.0.0.1'
DEVICE_P4_PORT = '50101'
DEVICE_P4_DRIVERS = [DeviceDriverEnum.DEVICEDRIVER_P4]
DEVICE_P4_VENDOR = 'Open Networking Foundation'
DEVICE_P4_HW_VER = 'BMv2 simple_switch'
DEVICE_P4_SW_VER = 'Stratum'
DEVICE_P4_PIPECONF = 'org.onosproject.pipelines.fabric'
DEVICE_P4_WORKERS = 2
DEVICE_P4_GRACE_PERIOD = 60
DEVICE_P4_UUID = {'device_uuid': {'uuid': DEVICE_P4_NAME}}
DEVICE_P4 = {
'device_id': deepcopy(DEVICE_P4_UUID),
'device_type': DEVICE_P4_TYPE,
'device_config': {'config_rules': []},
'device_operational_status': DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_DISABLED,
'device_drivers': DEVICE_P4_DRIVERS,
'device_endpoints': [],
}
DEVICE_P4_CONNECT_RULES = [
config_rule_set('_connect/address', DEVICE_P4_ADDRESS),
config_rule_set('_connect/port', DEVICE_P4_PORT),
config_rule_set('_connect/settings', {
'id': int(DEVICE_P4_ID),
'name': DEVICE_P4_NAME,
'hw-ver': DEVICE_P4_HW_VER,
'sw-ver': DEVICE_P4_SW_VER,
'pipeconf': DEVICE_P4_PIPECONF
}),
]
DEVICE_P4_CONFIG_RULES = [
config_rule_set('key1', 'value1'),
]
"""
A mock P4Runtime server.
"""
import logging
from concurrent import futures
import grpc
from p4.v1 import p4runtime_pb2_grpc
from .device_p4 import(
DEVICE_P4_ADDRESS, DEVICE_P4_PORT,
DEVICE_P4_WORKERS, DEVICE_P4_GRACE_PERIOD)
from .mock_p4runtime_servicer_impl import MockP4RuntimeServicerImpl
LOGGER = logging.getLogger(__name__)
class MockP4RuntimeService:
"""
P4Runtime server for testing purposes.
"""
def __init__(
self, address=DEVICE_P4_ADDRESS, port=DEVICE_P4_PORT,
max_workers=DEVICE_P4_WORKERS,
grace_period=DEVICE_P4_GRACE_PERIOD):
self.address = address
self.port = port
self.endpoint = f'{self.address}:{self.port}'
self.max_workers = max_workers
self.grace_period = grace_period
self.server = None
self.servicer = None
def start(self):
"""
Start the P4Runtime server.
"""
LOGGER.info(
'Starting P4Runtime service on %s with max_workers: %s',
str(self.endpoint), str(self.max_workers))
self.server = grpc.server(
futures.ThreadPoolExecutor(max_workers=self.max_workers))
self.servicer = MockP4RuntimeServicerImpl()
p4runtime_pb2_grpc.add_P4RuntimeServicer_to_server(
self.servicer, self.server)
_ = self.server.add_insecure_port(self.endpoint)
LOGGER.info('Listening on %s...', str(self.endpoint))
self.server.start()
LOGGER.debug('P4Runtime service started')
def stop(self):
"""
Stop the P4Runtime server.
"""
LOGGER.debug(
'Stopping P4Runtime service (grace period %d seconds...',
self.grace_period)
self.server.stop(self.grace_period)
LOGGER.debug('P4Runtime service stopped')
"""
A mock P4Runtime service implementation.
"""
import queue
from google.rpc import code_pb2
from p4.v1 import p4runtime_pb2, p4runtime_pb2_grpc
from p4.config.v1 import p4info_pb2
try:
from p4_util import STREAM_ATTR_ARBITRATION, STREAM_ATTR_PACKET
except ImportError:
from device.service.drivers.p4.p4_util import STREAM_ATTR_ARBITRATION,\
STREAM_ATTR_PACKET
class MockP4RuntimeServicerImpl(p4runtime_pb2_grpc.P4RuntimeServicer):
"""
A P4Runtime service implementation for testing purposes.
"""
def __init__(self):
self.p4info = p4info_pb2.P4Info()
self.p4runtime_api_version = "1.3.0"
self.stored_packet_out = queue.Queue()
def GetForwardingPipelineConfig(self, request, context):
rep = p4runtime_pb2.GetForwardingPipelineConfigResponse()
if self.p4info is not None:
rep.config.p4info.CopyFrom(self.p4info)
return rep
def SetForwardingPipelineConfig(self, request, context):
self.p4info.CopyFrom(request.config.p4info)
return p4runtime_pb2.SetForwardingPipelineConfigResponse()
def Write(self, request, context):
return p4runtime_pb2.WriteResponse()
def Read(self, request, context):
yield p4runtime_pb2.ReadResponse()
def StreamChannel(self, request_iterator, context):
for req in request_iterator:
if req.HasField(STREAM_ATTR_ARBITRATION):
rep = p4runtime_pb2.StreamMessageResponse()
rep.arbitration.CopyFrom(req.arbitration)
rep.arbitration.status.code = code_pb2.OK
yield rep
elif req.HasField(STREAM_ATTR_PACKET):
self.stored_packet_out.put(req)
def Capabilities(self, request, context):
rep = p4runtime_pb2.CapabilitiesResponse()
rep.p4runtime_api_version = self.p4runtime_api_version
return rep
import pytest
from device.service.drivers.p4.p4_driver import P4Driver
from .device_p4 import(
DEVICE_P4_ADDRESS, DEVICE_P4_PORT, DEVICE_P4_ID, DEVICE_P4_NAME,
DEVICE_P4_VENDOR, DEVICE_P4_HW_VER, DEVICE_P4_SW_VER,
DEVICE_P4_PIPECONF, DEVICE_P4_WORKERS, DEVICE_P4_GRACE_PERIOD)
from .mock_p4runtime_service import MockP4RuntimeService
@pytest.fixture(scope='session')
def p4runtime_service():
_service = MockP4RuntimeService(
address=DEVICE_P4_ADDRESS, port=DEVICE_P4_PORT,
max_workers=DEVICE_P4_WORKERS,
grace_period=DEVICE_P4_GRACE_PERIOD)
_service.start()
yield _service
_service.stop()
@pytest.fixture(scope='session')
def device_driverapi_p4():
_driver = P4Driver(
address=DEVICE_P4_ADDRESS,
port=DEVICE_P4_PORT,
id=DEVICE_P4_ID,
name=DEVICE_P4_NAME,
vendor=DEVICE_P4_VENDOR,
hw_ver=DEVICE_P4_HW_VER,
sw_ver=DEVICE_P4_SW_VER,
pipeconf=DEVICE_P4_PIPECONF)
_driver.Connect()
yield _driver
_driver.Disconnect()
def test_device_driverapi_p4_setconfig(
p4runtime_service: MockP4RuntimeService,
device_driverapi_p4: P4Driver): # pylint: disable=redefined-outer-name
device_driverapi_p4.SetConfig([])
return
def test_device_driverapi_p4_getconfig(
p4runtime_service: MockP4RuntimeService,
device_driverapi_p4: P4Driver): # pylint: disable=redefined-outer-name
device_driverapi_p4.GetConfig()
return
def test_device_driverapi_p4_getresource(
p4runtime_service: MockP4RuntimeService,
device_driverapi_p4: P4Driver): # pylint: disable=redefined-outer-name
device_driverapi_p4.GetResource("")
return
def test_device_driverapi_p4_getstate(
p4runtime_service: MockP4RuntimeService,
device_driverapi_p4: P4Driver): # pylint: disable=redefined-outer-name
device_driverapi_p4.GetState()
return
def test_device_driverapi_p4_deleteconfig(
p4runtime_service: MockP4RuntimeService,
device_driverapi_p4: P4Driver): # pylint: disable=redefined-outer-name
device_driverapi_p4.DeleteConfig([])
return
def test_device_driverapi_p4_subscribe_state(
p4runtime_service: MockP4RuntimeService,
device_driverapi_p4: P4Driver): # pylint: disable=redefined-outer-name
device_driverapi_p4.SubscribeState([])
return
def test_device_driverapi_p4_unsubscribe_state(
p4runtime_service: MockP4RuntimeService,
device_driverapi_p4: P4Driver): # pylint: disable=redefined-outer-name
device_driverapi_p4.UnsubscribeState([])
return
......@@ -65,6 +65,16 @@ except ImportError:
#ENABLE_OPENCONFIG = False
#ENABLE_TAPI = False
from .mock_p4runtime_service import MockP4RuntimeService
try:
from .device_p4 import(
DEVICE_P4, DEVICE_P4_ID, DEVICE_P4_UUID, DEVICE_P4_NAME,
DEVICE_P4_ADDRESS, DEVICE_P4_PORT, DEVICE_P4_WORKERS,
DEVICE_P4_GRACE_PERIOD, DEVICE_P4_CONNECT_RULES,
DEVICE_P4_CONFIG_RULES)
except ImportError:
raise ImportError("Test configuration for P4 devices not found")
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)
......@@ -147,6 +157,16 @@ def device_client(device_service : DeviceService): # pylint: disable=redefined-o
yield _client
_client.close()
@pytest.fixture(scope='session')
def p4runtime_service():
_service = MockP4RuntimeService(
address=DEVICE_P4_ADDRESS, port=DEVICE_P4_PORT,
max_workers=DEVICE_P4_WORKERS,
grace_period=DEVICE_P4_GRACE_PERIOD)
_service.start()
yield _service
_service.stop()
def grpc_message_to_json_string(message):
return str(MessageToDict(
message, including_default_value_fields=True, preserving_proto_field_name=True, use_integers_for_enums=False))
......@@ -161,7 +181,7 @@ def test_prepare_environment(
context_client.SetTopology(Topology(**TOPOLOGY))
# ----- Test Device Driver Emulated ------------------------------------------------------------------------------------
# ----- Test Device Driver Emulated --------------------------------------------
# Device Driver Emulated tests are used to validate Driver API as well as Emulated Device Driver. Note that other
# Drivers might support a different set of resource paths, and attributes/values per resource; however, they must
# implement the Driver API.
......@@ -471,7 +491,7 @@ def test_device_emulated_delete(
assert driver is None
# ----- Test Device Driver OpenConfig ----------------------------------------------------------------------------------
# ----- Test Device Driver OpenConfig ------------------------------------------
def test_device_openconfig_add_error_cases(
context_client : ContextClient, # pylint: disable=redefined-outer-name
......@@ -603,7 +623,7 @@ def test_device_openconfig_delete(
assert driver is None
# ----- Test Device Driver TAPI ----------------------------------------------------------------------------------
# ----- Test Device Driver TAPI ------------------------------------------------
def test_device_tapi_add_error_cases(
device_client : DeviceClient): # pylint: disable=redefined-outer-name
......@@ -728,3 +748,81 @@ def test_device_tapi_delete(
device_client.DeleteDevice(DeviceId(**DEVICE_TAPI_ID))
driver : _Driver = device_service.driver_instance_cache.get(DEVICE_TAPI_UUID, {})
assert driver is None
# ----- Test Device Driver P4 --------------------------------------------------
def test_device_p4_add_error_cases(
context_client: ContextClient, # pylint: disable=redefined-outer-name
device_client: DeviceClient, # pylint: disable=redefined-outer-name
device_service: DeviceService): # pylint: disable=redefined-outer-name
with pytest.raises(grpc.RpcError) as e:
device_p4_with_extra_rules = copy.deepcopy(DEVICE_P4)
device_p4_with_extra_rules['device_config']['config_rules'].extend(
DEVICE_P4_CONNECT_RULES)
device_p4_with_extra_rules['device_config']['config_rules'].extend(
DEVICE_P4_CONFIG_RULES)
device_client.AddDevice(Device(**device_p4_with_extra_rules))
assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
msg_head = 'device.device_config.config_rules(['
msg_tail = ']) is invalid; RPC method AddDevice only accepts connection Config Rules that should start '\
'with "_connect/" tag. Others should be configured after adding the device.'
except_msg = str(e.value.details())
assert except_msg.startswith(msg_head) and except_msg.endswith(msg_tail)
def test_device_p4_add_correct(
context_client: ContextClient, # pylint: disable=redefined-outer-name
device_client: DeviceClient, # pylint: disable=redefined-outer-name
device_service: DeviceService, # pylint: disable=redefined-outer-name
p4runtime_service: MockP4RuntimeService):
device_p4_with_connect_rules = copy.deepcopy(DEVICE_P4)
device_p4_with_connect_rules['device_config']['config_rules'].extend(
DEVICE_P4_CONNECT_RULES)
device_client.AddDevice(Device(**device_p4_with_connect_rules))
driver : _Driver = device_service.driver_instance_cache.get(DEVICE_P4_NAME)
assert driver is not None
def test_device_p4_get(
context_client: ContextClient, # pylint: disable=redefined-outer-name
device_client: DeviceClient, # pylint: disable=redefined-outer-name
device_service: DeviceService, # pylint: disable=redefined-outer-name
p4runtime_service: MockP4RuntimeService):
initial_config = device_client.GetInitialConfig(DeviceId(**DEVICE_P4_UUID))
LOGGER.info('initial_config = {:s}'.format(
grpc_message_to_json_string(initial_config)))
device_data = context_client.GetDevice(DeviceId(**DEVICE_P4_UUID))
LOGGER.info('device_data = {:s}'.format(
grpc_message_to_json_string(device_data)))
def test_device_p4_configure(
context_client: ContextClient, # pylint: disable=redefined-outer-name
device_client: DeviceClient, # pylint: disable=redefined-outer-name
device_service: DeviceService, # pylint: disable=redefined-outer-name
p4runtime_service: MockP4RuntimeService):
pytest.skip("Skipping test for unimplemented method")
def test_device_p4_deconfigure(
context_client: ContextClient, # pylint: disable=redefined-outer-name
device_client: DeviceClient, # pylint: disable=redefined-outer-name
device_service: DeviceService, # pylint: disable=redefined-outer-name
p4runtime_service: MockP4RuntimeService):
pytest.skip("Skipping test for unimplemented method")
def test_device_p4_delete(
context_client: ContextClient, # pylint: disable=redefined-outer-name
device_client: DeviceClient, # pylint: disable=redefined-outer-name
device_service: DeviceService, # pylint: disable=redefined-outer-name
p4runtime_service: MockP4RuntimeService):
device_client.DeleteDevice(DeviceId(**DEVICE_P4_UUID))
driver : _Driver = device_service.driver_instance_cache.get(DEVICE_P4_NAME)
assert driver is None
......@@ -14,3 +14,4 @@ pytz
redis
requests
xmltodict
p4runtime==1.3.0
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment