# P4Driver.py
import grpc, anytree, logging, threading
from typing import Any, Iterator, List, Optional, Tuple, Union
from .P4Util import P4RuntimeClient,\
    P4_ATTR_DEV_ID, P4_ATTR_DEV_NAME, P4_ATTR_DEV_VENDOR,\
    P4_ATTR_DEV_HW_VER, P4_ATTR_DEV_SW_VER, P4_ATTR_DEV_PIPECONF

try:
    from Checkers import chk_float, chk_length, chk_string, chk_type
    from _Driver import _Driver
    from AnyTreeTools import TreeNode, dump_subtree, get_subnode,\
        set_subnode_value
except ImportError:
    from common.type_checkers.Checkers import chk_float, chk_length, chk_string, chk_type
    from device.service.driver_api._Driver import _Driver
    from device.service.driver_api.AnyTreeTools import TreeNode, dump_subtree, get_subnode, set_subnode_value

LOGGER = logging.getLogger(__name__)


class P4Driver(_Driver):
    """Driver that manages a connection to a P4 device.

    The driver keeps a single ``P4RuntimeClient`` instance that is created by
    ``Connect()`` and torn down by ``Disconnect()``. Every configuration and
    state method below is currently a placeholder that only logs the call and
    returns an empty result.
    """

    def __init__(self, address: str, port: int, **settings) -> None:
        """Store the connection parameters and device settings.

        No connection is opened here; call ``Connect()`` for that.

        :param address: address of the P4 device.
        :param port: port of the P4 device; converted with ``int()``.
        :param settings: optional device attributes, looked up through the
            ``P4_ATTR_DEV_*`` keys.
        """
        # pylint: disable=super-init-not-called
        self.__client = None
        self.__address = address
        self.__port = int(port)
        self.__settings = settings

        # Always define every device attribute (None when the setting is not
        # supplied) so that later reads never fail with an AttributeError.
        self.__dev_id = settings.get(P4_ATTR_DEV_ID)
        self.__dev_name = settings.get(P4_ATTR_DEV_NAME)
        self.__dev_vendor = settings.get(P4_ATTR_DEV_VENDOR)
        self.__dev_hw_version = settings.get(P4_ATTR_DEV_HW_VER)
        self.__dev_sw_version = settings.get(P4_ATTR_DEV_SW_VER)
        self.__dev_pipeconf = settings.get(P4_ATTR_DEV_PIPECONF)

        # The lock serializes Connect()/Disconnect(); the events record
        # whether the driver is started and whether termination was requested.
        self.__lock = threading.Lock()
        self.__started = threading.Event()
        self.__terminate = threading.Event()

        LOGGER.info('Initializing P4 device at %s:%s with settings:',
                    self.__address, self.__port)
        for key, value in settings.items():
            LOGGER.info('\t%8s = %s', key, value)

    def Connect(self) -> bool:
        """Create the P4Runtime client for this device.

        Calling this method while already connected is a no-op.

        :return: True once the client has been instantiated (or already was).
        :raises ValueError: if the device ID setting was not provided.
        """
        LOGGER.info('Connecting to P4 device %s:%s...',
                    self.__address, self.__port)
        with self.__lock:
            # Skip if already connected
            if self.__started.is_set():
                return True

            # The client cannot be created without a device ID; fail with an
            # explicit message instead of an opaque attribute error.
            if P4_ATTR_DEV_ID not in self.__settings:
                raise ValueError(
                    'Cannot connect to P4 device {}:{}: setting "{}" is '
                    'missing'.format(
                        self.__address, self.__port, P4_ATTR_DEV_ID))

            # Instantiate a gRPC channel with the P4 device
            grpc_address = '{}:{}'.format(self.__address, self.__port)
            election_id = (1, 0)
            self.__client = P4RuntimeClient(
                self.__dev_id, grpc_address, election_id)
            LOGGER.info('\tConnected!')

            # A new session starts: drop any termination request left over
            # from a previous Disconnect().
            self.__terminate.clear()
            self.__started.set()

            return True

    def Disconnect(self) -> bool:
        """Tear down the P4Runtime client, if one is active.

        Calling this method while not connected is a no-op.

        :return: True once the driver is disconnected (or already was).
        """
        LOGGER.info('Disconnecting from P4 device %s:%s...',
                    self.__address, self.__port)

        # Use the same lock as Connect() so both never interleave.
        with self.__lock:
            # If not started, assume it is already disconnected
            if not self.__started.is_set():
                return True

            # gRPC client must already be instantiated
            assert self.__client

            # Trigger termination of loops and processes
            self.__terminate.set()

            # Trigger connection tear down with the P4Runtime server
            self.__client.tear_down()
            self.__client = None

            # Mark the driver as stopped so that a later Connect() really
            # reconnects instead of returning early with no client.
            self.__started.clear()

            LOGGER.info('\tDisconnected!')

            return True

    def GetInitialConfig(self) -> List[Tuple[str, Any]]:
        """Placeholder: log the call and return an empty list."""
        LOGGER.info('P4 GetInitialConfig()')
        return []

    def GetConfig(self, resource_keys : List[str] = [])\
            -> List[Tuple[str, Union[Any, None, Exception]]]:
        """Placeholder: log the call and return an empty list.

        :param resource_keys: currently ignored (and never mutated).
        """
        LOGGER.info('P4 GetConfig()')
        return []

    def SetConfig(self, resources : List[Tuple[str, Any]])\
            -> List[Union[bool, Exception]]:
        """Placeholder: log the call and return an empty list.

        :param resources: currently ignored.
        """
        LOGGER.info('P4 SetConfig()')
        return []

    def DeleteConfig(self, resources : List[Tuple[str, Any]]) -> List[Union[bool, Exception]]:
        """Placeholder: log the call and return an empty list.

        :param resources: currently ignored.
        """
        LOGGER.info('P4 DeleteConfig()')
        return []

    def GetResource(self, endpoint_uuid : str) -> Optional[str]:
        """Placeholder: log the call and return an empty string.

        :param endpoint_uuid: currently ignored.
        """
        LOGGER.info('P4 GetResource()')
        return ""

    def GetState(self, blocking=False) -> Iterator[Tuple[str, Any]]:
        """Placeholder: log the call and return an empty list.

        :param blocking: currently ignored.
        """
        LOGGER.info('P4 GetState()')
        return []

    def SubscribeState(self, subscriptions : List[Tuple[str, float, float]])\
            -> List[Union[bool, Exception]]:
        """Placeholder: log the call and return an empty list.

        :param subscriptions: currently ignored.
        """
        LOGGER.info('P4 SubscribeState()')
        return []

    def UnsubscribeState(self, subscriptions : List[Tuple[str, float, float]])\
            -> List[Union[bool, Exception]]:
        """Placeholder: log the call and return an empty list.

        :param subscriptions: currently ignored.
        """
        LOGGER.info('P4 UnsubscribeState()')
        return []