Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import grpc, anytree, logging, threading
from typing import Any, Iterator, List, Optional, Tuple, Union
from .P4Util import P4RuntimeClient,\
P4_ATTR_DEV_ID, P4_ATTR_DEV_NAME, P4_ATTR_DEV_VENDOR,\
P4_ATTR_DEV_HW_VER, P4_ATTR_DEV_SW_VER, P4_ATTR_DEV_PIPECONF
try:
from Checkers import chk_float, chk_length, chk_string, chk_type
from _Driver import _Driver
from AnyTreeTools import TreeNode, dump_subtree, get_subnode,\
set_subnode_value
except ImportError:
from common.type_checkers.Checkers import chk_float, chk_length, chk_string, chk_type
from device.service.driver_api._Driver import _Driver
from device.service.driver_api.AnyTreeTools import TreeNode, dump_subtree, get_subnode, set_subnode_value
# Module-level logger, named after this module, used by the driver's log calls.
LOGGER = logging.getLogger(__name__)
class P4Driver(_Driver):
    """
    Driver that manages a P4RuntimeClient connection to a single P4 device.

    Connect() and Disconnect() handle the client lifecycle and serialise on
    an internal lock. All configuration/state methods below are currently
    placeholders: they log the call and return an empty result.
    """

    def __init__(self, address: str, port: int, **settings) -> None:
        """
        Store the connection parameters; no connection is opened here.

        :param address: host part of the device's gRPC endpoint
        :param port: port of the device's gRPC endpoint (coerced with int())
        :param settings: optional device attributes, looked up under the
            P4_ATTR_DEV_* keys; any other keys are stored and logged only
        """
        # pylint: disable=super-init-not-called
        self.__client = None
        self.__address = address
        self.__port = int(port)
        self.__settings = settings

        # Always define every device attribute (None when not supplied) so
        # that later reads, e.g. of the device id in Connect(), cannot fail
        # with an AttributeError on a missing attribute.
        self.__dev_id = settings.get(P4_ATTR_DEV_ID)
        self.__dev_name = settings.get(P4_ATTR_DEV_NAME)
        self.__dev_vendor = settings.get(P4_ATTR_DEV_VENDOR)
        self.__dev_hw_version = settings.get(P4_ATTR_DEV_HW_VER)
        self.__dev_sw_version = settings.get(P4_ATTR_DEV_SW_VER)
        self.__dev_pipeconf = settings.get(P4_ATTR_DEV_PIPECONF)

        self.__lock = threading.Lock()
        self.__started = threading.Event()
        self.__terminate = threading.Event()

        LOGGER.info('Initializing P4 device at %s:%s with settings:',
                    self.__address, self.__port)
        for key, value in settings.items():
            LOGGER.info('\t%8s = %s', key, value)

    def Connect(self) -> bool:
        """
        Create the P4RuntimeClient for this device if not already connected.

        :return: True, both on a fresh connection and when already connected
        """
        LOGGER.info('Connecting to P4 device %s:%s...',
                    self.__address, self.__port)

        with self.__lock:
            # Skip if already connected
            if self.__started.is_set():
                return True

            # Instantiate a gRPC channel with the P4 device
            grpc_address = '{}:{}'.format(self.__address, self.__port)
            # NOTE(review): presumably the (high, low) P4Runtime election id
            # -- confirm against P4RuntimeClient.
            election_id = (1, 0)
            # NOTE(review): the device id is None when P4_ATTR_DEV_ID was not
            # given in settings -- confirm how P4RuntimeClient treats that.
            self.__client = P4RuntimeClient(
                self.__dev_id, grpc_address, election_id)

            # Re-arm the termination flag in case this is a reconnection
            # after an earlier Disconnect().
            self.__terminate.clear()
            LOGGER.info('\tConnected!')
            self.__started.set()

        return True

    def Disconnect(self) -> bool:
        """
        Tear down the P4RuntimeClient, if any, and mark the driver stopped.

        :return: True, both after a tear down and when already disconnected
        """
        LOGGER.info('Disconnecting from P4 device %s:%s...',
                    self.__address, self.__port)

        # Same lock as Connect() so the two cannot interleave
        with self.__lock:
            # If not started, assume it is already disconnected
            if not self.__started.is_set():
                return True

            # Trigger termination of loops and processes
            self.__terminate.set()

            # Trigger connection tear down with the P4Runtime server
            if self.__client is not None:
                self.__client.tear_down()
                self.__client = None

            # Without this a later Connect() would return early and never
            # re-create the client.
            self.__started.clear()

        LOGGER.info('\tDisconnected!')
        return True

    def GetInitialConfig(self) -> List[Tuple[str, Any]]:
        """Placeholder: log the call and return an empty list."""
        LOGGER.info('P4 GetInitialConfig()')
        return []

    # The default list below is shared between calls; it is safe only
    # because this method never mutates resource_keys.
    def GetConfig(self, resource_keys : List[str] = [])\
            -> List[Tuple[str, Union[Any, None, Exception]]]:
        """Placeholder: log the call and return an empty list."""
        LOGGER.info('P4 GetConfig()')
        return []

    def SetConfig(self, resources : List[Tuple[str, Any]])\
            -> List[Union[bool, Exception]]:
        """Placeholder: log the call and return an empty list."""
        LOGGER.info('P4 SetConfig()')
        return []

    def DeleteConfig(self, resources : List[Tuple[str, Any]])\
            -> List[Union[bool, Exception]]:
        """Placeholder: log the call and return an empty list."""
        LOGGER.info('P4 DeleteConfig()')
        return []

    def GetResource(self, endpoint_uuid : str) -> Optional[str]:
        """Placeholder: log the call and return an empty string."""
        LOGGER.info('P4 GetResource()')
        return ""

    def GetState(self, blocking=False) -> Iterator[Tuple[str, Any]]:
        """
        Placeholder: log the call and return an empty list.

        NOTE(review): the annotation promises an Iterator but a list is
        returned; kept as-is so existing callers see the same value.
        """
        LOGGER.info('P4 GetState()')
        return []

    def SubscribeState(self, subscriptions : List[Tuple[str, float, float]])\
            -> List[Union[bool, Exception]]:
        """Placeholder: log the call and return an empty list."""
        LOGGER.info('P4 SubscribeState()')
        return []

    def UnsubscribeState(self, subscriptions : List[Tuple[str, float, float]])\
            -> List[Union[bool, Exception]]:
        """Placeholder: log the call and return an empty list."""
        LOGGER.info('P4 UnsubscribeState()')
        return []