import logging
import threading
from typing import Any, Iterator, List, Optional, Tuple, Union

import anytree
import grpc

from .P4Util import (
    P4RuntimeClient,
    P4_ATTR_DEV_ID,
    P4_ATTR_DEV_NAME,
    P4_ATTR_DEV_VENDOR,
    P4_ATTR_DEV_HW_VER,
    P4_ATTR_DEV_SW_VER,
    P4_ATTR_DEV_PIPECONF,
)

try:
    from Checkers import chk_float, chk_length, chk_string, chk_type
    from _Driver import _Driver
    from AnyTreeTools import TreeNode, dump_subtree, get_subnode,\
        set_subnode_value
except ImportError:
    from common.type_checkers.Checkers import chk_float, chk_length, chk_string, chk_type
    from device.service.driver_api._Driver import _Driver
    from device.service.driver_api.AnyTreeTools import TreeNode, dump_subtree, get_subnode, set_subnode_value

LOGGER = logging.getLogger(__name__)


class P4Driver(_Driver):
    """Driver that connects to a P4 device through a ``P4RuntimeClient``.

    Only ``Connect`` and ``Disconnect`` do real work; the configuration,
    resource and state methods are stubs that log the call and return an
    empty result.
    """

    def __init__(self, address: str, port: int, **settings) -> None:
        """Store the connection parameters and log the received settings.

        Args:
            address: Address of the P4 device.
            port: Port of the P4 device; converted with ``int()``.
            **settings: Optional device attributes, looked up by the
                ``P4_ATTR_DEV_*`` keys imported from ``P4Util``.
        """
        # pylint: disable=super-init-not-called
        self.__client = None
        self.__address = address
        self.__port = int(port)
        self.__settings = settings

        # Always define every device attribute (None when the setting is
        # absent) so later reads, e.g. of the device ID in Connect(), cannot
        # fail with AttributeError.
        self.__dev_id = settings.get(P4_ATTR_DEV_ID)
        self.__dev_name = settings.get(P4_ATTR_DEV_NAME)
        self.__dev_vendor = settings.get(P4_ATTR_DEV_VENDOR)
        self.__dev_hw_version = settings.get(P4_ATTR_DEV_HW_VER)
        self.__dev_sw_version = settings.get(P4_ATTR_DEV_SW_VER)
        self.__dev_pipeconf = settings.get(P4_ATTR_DEV_PIPECONF)

        LOGGER.info(
            'Initializing P4 device at %s:%s with settings:',
            self.__address, self.__port)

        self.__lock = threading.Lock()
        self.__started = threading.Event()
        self.__terminate = threading.Event()

        for key, value in settings.items():
            LOGGER.info('\t%8s = %s', key, value)

    def Connect(self) -> bool:
        """Create the P4Runtime client unless the driver is already started.

        Returns:
            Always True.
        """
        LOGGER.info(
            'Connecting to P4 device %s:%s...', self.__address, self.__port)

        with self.__lock:
            # Skip if already connected
            if self.__started.is_set():
                return True

            # Reset the termination flag raised by a previous Disconnect()
            self.__terminate.clear()

            # Instantiate a gRPC channel with the P4 device
            grpc_address = '{}:{}'.format(self.__address, self.__port)
            election_id = (1, 0)
            self.__client = P4RuntimeClient(
                self.__dev_id, grpc_address, election_id)
            LOGGER.info('\tConnected!')
            self.__started.set()
            return True

    def Disconnect(self) -> bool:
        """Tear down the P4Runtime client if the driver is started.

        The started flag is cleared afterwards, so calling this method
        again is a no-op and a later ``Connect`` creates a new client.

        Returns:
            Always True.
        """
        LOGGER.info(
            'Disconnecting from P4 device %s:%s...',
            self.__address, self.__port)

        # Same lock as Connect(), so the started flag and the client are
        # always updated together.
        with self.__lock:
            # If not started, assume it is already disconnected
            if not self.__started.is_set():
                return True

            # gRPC client must already be instantiated
            assert self.__client

            # Trigger termination of loops and processes
            self.__terminate.set()

            # Trigger connection tear down with the P4Runtime server
            self.__client.tear_down()
            self.__client = None

            # Mark the driver as stopped; without this a second Disconnect()
            # would trip the assertion above and Connect() would be skipped.
            self.__started.clear()

        LOGGER.info('\tDisconnected!')
        return True

    def GetInitialConfig(self) -> List[Tuple[str, Any]]:
        """Stub: log the call and return an empty list."""
        LOGGER.info('P4 GetInitialConfig()')
        return []

    def GetConfig(self, resource_keys: Optional[List[str]] = None)\
            -> List[Tuple[str, Union[Any, None, Exception]]]:
        """Stub: log the call and return an empty list.

        Args:
            resource_keys: Currently ignored.
        """
        LOGGER.info('P4 GetConfig()')
        return []

    def SetConfig(self, resources: List[Tuple[str, Any]])\
            -> List[Union[bool, Exception]]:
        """Stub: log the call and return an empty list.

        Args:
            resources: Currently ignored.
        """
        LOGGER.info('P4 SetConfig()')
        return []

    def DeleteConfig(self, resources: List[Tuple[str, Any]])\
            -> List[Union[bool, Exception]]:
        """Stub: log the call and return an empty list.

        Args:
            resources: Currently ignored.
        """
        LOGGER.info('P4 DeleteConfig()')
        return []

    def GetResource(self, endpoint_uuid: str) -> Optional[str]:
        """Stub: log the call and return an empty string.

        Args:
            endpoint_uuid: Currently ignored.
        """
        LOGGER.info('P4 GetResource()')
        return ""

    def GetState(self, blocking=False) -> Iterator[Tuple[str, Any]]:
        """Stub: log the call and return an empty list.

        Args:
            blocking: Currently ignored.
        """
        # NOTE(review): annotated as Iterator but returns a list; kept as is
        # because callers may rely on it - confirm before changing.
        LOGGER.info('P4 GetState()')
        return []

    def SubscribeState(self, subscriptions: List[Tuple[str, float, float]])\
            -> List[Union[bool, Exception]]:
        """Stub: log the call and return an empty list.

        Args:
            subscriptions: Currently ignored.
        """
        LOGGER.info('P4 SubscribeState()')
        return []

    def UnsubscribeState(self, subscriptions: List[Tuple[str, float, float]])\
            -> List[Union[bool, Exception]]:
        """Stub: log the call and return an empty list.

        Args:
            subscriptions: Currently ignored.
        """
        LOGGER.info('P4 UnsubscribeState()')
        return []