Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import queue
from google.rpc import code_pb2
from p4.v1 import p4runtime_pb2, p4runtime_pb2_grpc
from p4.config.v1 import p4info_pb2
class MockP4RuntimeServicerImpl(p4runtime_pb2_grpc.P4RuntimeServicer):
def __init__(self):
self.p4info = p4info_pb2.P4Info()
self.p4runtime_api_version = "1.3.0"
self.stored_packet_out = queue.Queue()
def GetForwardingPipelineConfig(self, request, context):
rep = p4runtime_pb2.GetForwardingPipelineConfigResponse()
if self.p4info is not None:
rep.config.p4info.CopyFrom(self.p4info)
return rep
def SetForwardingPipelineConfig(self, request, context):
self.p4info.CopyFrom(request.config.p4info)
return p4runtime_pb2.SetForwardingPipelineConfigResponse()
def Write(self, request, context):
return p4runtime_pb2.WriteResponse()
def Read(self, request, context):
yield p4runtime_pb2.ReadResponse()
def StreamChannel(self, request_iterator, context):
for req in request_iterator:
if req.HasField('arbitration'):
rep = p4runtime_pb2.StreamMessageResponse()
rep.arbitration.CopyFrom(req.arbitration)
rep.arbitration.status.code = code_pb2.OK
yield rep
elif req.HasField('packet'):
self.stored_packet_out.put(req)
def Capabilities(self, request, context):
rep = p4runtime_pb2.CapabilitiesResponse()
rep.p4runtime_api_version = self.p4runtime_api_version
return rep