Newer
Older
import grpc, logging
from prometheus_client import Counter, Histogram
from common.database.api.context.Constants import DEFAULT_CONTEXT_ID, DEFAULT_TOPOLOGY_ID
from common.database.api.Database import Database
from common.exceptions.ServiceException import ServiceException
from context.proto.context_pb2 import Empty, Link, LinkId, Topology
from context.proto.context_pb2_grpc import ContextServiceServicer
# Module-level logger shared by the servicer implementation below.
LOGGER = logging.getLogger(__name__)

# One metric family per RPC: three counters (started / completed / failed)
# plus one histogram of request duration. Metric names and descriptions are
# part of the exported monitoring interface and must not be altered.

# ----- GetTopology ----------------------------------------------------------
GETTOPOLOGY_COUNTER_STARTED = Counter(
    'context_gettopology_counter_started',
    'Context:GetTopology counter of requests started',
)
GETTOPOLOGY_COUNTER_COMPLETED = Counter(
    'context_gettopology_counter_completed',
    'Context:GetTopology counter of requests completed',
)
GETTOPOLOGY_COUNTER_FAILED = Counter(
    'context_gettopology_counter_failed',
    'Context:GetTopology counter of requests failed',
)
GETTOPOLOGY_HISTOGRAM_DURATION = Histogram(
    'context_gettopology_histogram_duration',
    'Context:GetTopology histogram of request duration',
)

# ----- AddLink --------------------------------------------------------------
ADDLINK_COUNTER_STARTED = Counter(
    'context_addlink_counter_started',
    'Context:AddLink counter of requests started',
)
ADDLINK_COUNTER_COMPLETED = Counter(
    'context_addlink_counter_completed',
    'Context:AddLink counter of requests completed',
)
ADDLINK_COUNTER_FAILED = Counter(
    'context_addlink_counter_failed',
    'Context:AddLink counter of requests failed',
)
ADDLINK_HISTOGRAM_DURATION = Histogram(
    'context_addlink_histogram_duration',
    'Context:AddLink histogram of request duration',
)

# ----- DeleteLink -----------------------------------------------------------
DELETELINK_COUNTER_STARTED = Counter(
    'context_deletelink_counter_started',
    'Context:DeleteLink counter of requests started',
)
DELETELINK_COUNTER_COMPLETED = Counter(
    'context_deletelink_counter_completed',
    'Context:DeleteLink counter of requests completed',
)
DELETELINK_COUNTER_FAILED = Counter(
    'context_deletelink_counter_failed',
    'Context:DeleteLink counter of requests failed',
)
DELETELINK_HISTOGRAM_DURATION = Histogram(
    'context_deletelink_histogram_duration',
    'Context:DeleteLink histogram of request duration',
)
class ContextServiceServicerImpl(ContextServiceServicer):
    """gRPC servicer implementing the GetTopology, AddLink and DeleteLink RPCs.

    Every RPC works on the default context/topology (DEFAULT_CONTEXT_ID /
    DEFAULT_TOPOLOGY_ID) of the database handed to the constructor, and
    updates its own started/completed/failed counters and duration histogram.

    Error handling is identical in all RPCs:
      * a ServiceException aborts the call with the status code and details it
        carries (the "failed" counter is intentionally left untouched, exactly
        as in the original implementation);
      * any other exception is logged, counted as failed and reported to the
        client as grpc.StatusCode.INTERNAL.

    NOTE(review): chk_string and the typing names (Dict, List, Set, Tuple)
    used below are expected to be imported at module level -- confirm that the
    file's import section provides them.
    """

    def __init__(self, database : Database):
        """Store the database object that backs every RPC of this servicer."""
        LOGGER.debug('Creating Servicer...')
        self.database = database
        LOGGER.debug('Servicer Created')

    def _get_default_topology(self):
        """Return the default topology object of the default context.

        Both objects are obtained through their ``create()`` call, in the same
        order the RPCs originally did it inline.
        """
        db_context = self.database.context(DEFAULT_CONTEXT_ID).create()
        return db_context.topology(DEFAULT_TOPOLOGY_ID).create()

    @GETTOPOLOGY_HISTOGRAM_DURATION.time()
    def GetTopology(self, request : Empty, grpc_context : grpc.ServicerContext) -> Topology:
        """Return the default topology as a Topology message.

        Args:
            request: empty request message (only logged).
            grpc_context: gRPC context, used to abort the call on errors.

        Returns:
            Topology built from the dump of the default topology.
        """
        GETTOPOLOGY_COUNTER_STARTED.inc()
        try:
            # Lazy %-formatting: the message is only serialised if DEBUG is on.
            LOGGER.debug('GetTopology request: %s', request)

            # ----- Validate request data and pre-conditions -----------------
            db_topology = self._get_default_topology()

            # ----- Retrieve data from the database --------------------------
            json_topology = db_topology.dump()

            # ----- Compose reply --------------------------------------------
            reply = Topology(**json_topology)
            LOGGER.debug('GetTopology reply: %s', reply)
            GETTOPOLOGY_COUNTER_COMPLETED.inc()
            return reply
        except ServiceException as e:                               # pragma: no cover (ServiceException not thrown)
            grpc_context.abort(e.code, e.details)                   # pragma: no cover (ServiceException not thrown)
        except Exception as e:                                      # pragma: no cover
            LOGGER.exception('GetTopology exception')               # pragma: no cover
            GETTOPOLOGY_COUNTER_FAILED.inc()                        # pragma: no cover
            grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))    # pragma: no cover

    @ADDLINK_HISTOGRAM_DURATION.time()
    def AddLink(self, request : Link, grpc_context : grpc.ServicerContext) -> LinkId:
        """Validate a Link request and store the link in the default topology.

        All endpoints are validated before anything is written, so a rejected
        request does not create the link.

        Args:
            request: Link message carrying the link id and its endpoint list.
            grpc_context: gRPC context, used to abort the call on errors.

        Returns:
            LinkId built from the identifier dump of the created link.

        The call is aborted through ``grpc_context.abort`` with:
            INVALID_ARGUMENT: a field fails chk_string validation, an endpoint
                names a non-default context/topology, or a device appears in
                more than one endpoint of the link.
            ALREADY_EXISTS: the link id is already present in the topology.
            NOT_FOUND: a referenced device or device port is not in the
                topology.
            INTERNAL: any unexpected exception.
        """
        ADDLINK_COUNTER_STARTED.inc()
        try:
            # Lazy %-formatting: the message is only serialised if DEBUG is on.
            LOGGER.debug('AddLink request: %s', request)

            # ----- Validate request data and pre-conditions -----------------
            try:
                link_id = chk_string('link.link_id.link_id.uuid',
                                     request.link_id.link_id.uuid,
                                     allow_empty=False)
            except Exception as e:
                LOGGER.exception('Invalid arguments:')
                raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, str(e)) from e

            db_topology = self._get_default_topology()

            if db_topology.links.contains(link_id):
                msg = 'Link({}) already exists in the database.'
                msg = msg.format(link_id)
                raise ServiceException(grpc.StatusCode.ALREADY_EXISTS, msg)

            # device id -> port ids already accepted for this link
            added_devices_and_endpoints : Dict[str, Set[str]] = {}
            # (device id, port id) pairs in request order, written after validation
            device_endpoint_pairs : List[Tuple[str, str]] = []
            for i,endpoint in enumerate(request.endpointList):
                try:
                    ep_context_id  = chk_string('endpoint[#{}].topoId.contextId.contextUuid.uuid'.format(i),
                                                endpoint.topoId.contextId.contextUuid.uuid,
                                                allow_empty=True)
                    ep_topology_id = chk_string('endpoint[#{}].topoId.topoId.uuid'.format(i),
                                                endpoint.topoId.topoId.uuid,
                                                allow_empty=True)
                    ep_device_id   = chk_string('endpoint[#{}].dev_id.device_id.uuid'.format(i),
                                                endpoint.dev_id.device_id.uuid,
                                                allow_empty=False)
                    ep_port_id     = chk_string('endpoint[#{}].port_id.uuid'.format(i),
                                                endpoint.port_id.uuid,
                                                allow_empty=False)
                except Exception as e:
                    LOGGER.exception('Invalid arguments:')
                    raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, str(e)) from e

                # An empty context id means "use the default"; any other
                # non-default value is rejected.
                if (len(ep_context_id) > 0) and (ep_context_id != DEFAULT_CONTEXT_ID):
                    msg = ' '.join([
                        'Unsupported Context({}) in Endpoint(#{}) of Link({}).',
                        'Only default Context({}) is currently supported.',
                        'Optionally, leave field empty to use default Context.',
                    ])
                    msg = msg.format(ep_context_id, i, link_id, DEFAULT_CONTEXT_ID)
                    raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, msg)
                elif len(ep_context_id) == 0:
                    ep_context_id = DEFAULT_CONTEXT_ID

                # Same rule for the topology id.
                if (len(ep_topology_id) > 0) and (ep_topology_id != DEFAULT_TOPOLOGY_ID):
                    msg = ' '.join([
                        'Unsupported Topology({}) in Endpoint(#{}) of Link({}).',
                        'Only default Topology({}) is currently supported.',
                        'Optionally, leave field empty to use default Topology.',
                    ])
                    msg = msg.format(ep_topology_id, i, link_id, DEFAULT_TOPOLOGY_ID)
                    raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, msg)
                elif len(ep_topology_id) == 0:
                    ep_topology_id = DEFAULT_TOPOLOGY_ID

                if ep_device_id in added_devices_and_endpoints:
                    msg = 'Duplicated Device({}) in Endpoint(#{}) of Link({}).'
                    msg = msg.format(ep_device_id, i, link_id)
                    raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, msg)

                if not db_topology.devices.contains(ep_device_id):
                    msg = ' '.join([
                        'Context({})/Topology({})/Device({}) in Endpoint(#{}) of Link({})',
                        'does not exist in the database.',
                    ])
                    # Five placeholders -> five arguments. The port id is not
                    # part of this message; passing it shifted the endpoint
                    # index and link id into the wrong placeholders.
                    msg = msg.format(ep_context_id, ep_topology_id, ep_device_id, i, link_id)
                    raise ServiceException(grpc.StatusCode.NOT_FOUND, msg)

                added_device_and_endpoints = added_devices_and_endpoints.setdefault(ep_device_id, set())

                # should never happen since same device cannot appear 2 times in the link
                if ep_port_id in added_device_and_endpoints:                                # pragma: no cover
                    msg = 'Duplicated Device({})/Port({}) in Endpoint(#{}) of Link({}).'    # pragma: no cover
                    msg = msg.format(ep_device_id, ep_port_id, i, link_id)                  # pragma: no cover
                    raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, msg)           # pragma: no cover

                if not db_topology.device(ep_device_id).endpoints.contains(ep_port_id):
                    msg = ' '.join([
                        'Context({})/Topology({})/Device({})/Port({}) in Endpoint(#{}) of Link({})',
                        'does not exist in the database.',
                    ])
                    msg = msg.format(ep_context_id, ep_topology_id, ep_device_id, ep_port_id, i, link_id)
                    raise ServiceException(grpc.StatusCode.NOT_FOUND, msg)

                added_device_and_endpoints.add(ep_port_id)
                device_endpoint_pairs.append((ep_device_id, ep_port_id))

            # ----- Implement changes in the database ------------------------
            db_link = db_topology.link(link_id).create()
            for device_id,endpoint_id in device_endpoint_pairs:
                # Link endpoints are keyed as '<device id>/<port id>'.
                link_endpoint_id = '{}/{}'.format(device_id, endpoint_id)
                db_endpoint = db_topology.device(device_id).endpoint(endpoint_id)
                db_link.endpoint(link_endpoint_id).create(db_endpoint)

            # ----- Compose reply --------------------------------------------
            reply = LinkId(**db_link.dump_id())
            LOGGER.debug('AddLink reply: %s', reply)
            ADDLINK_COUNTER_COMPLETED.inc()
            return reply
        except ServiceException as e:
            grpc_context.abort(e.code, e.details)
        except Exception as e:                                      # pragma: no cover
            LOGGER.exception('AddLink exception')                   # pragma: no cover
            ADDLINK_COUNTER_FAILED.inc()                            # pragma: no cover
            grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))    # pragma: no cover

    @DELETELINK_HISTOGRAM_DURATION.time()
    def DeleteLink(self, request : LinkId, grpc_context : grpc.ServicerContext) -> Empty:
        """Delete an existing link from the default topology.

        Args:
            request: LinkId message identifying the link to delete.
            grpc_context: gRPC context, used to abort the call on errors.

        Returns:
            An Empty message once the link has been deleted.

        The call is aborted through ``grpc_context.abort`` with:
            INVALID_ARGUMENT: the link id fails chk_string validation.
            NOT_FOUND: the link id is not present in the topology.
            INTERNAL: any unexpected exception.
        """
        DELETELINK_COUNTER_STARTED.inc()
        try:
            # Lazy %-formatting: the message is only serialised if DEBUG is on.
            LOGGER.debug('DeleteLink request: %s', request)

            # ----- Validate request data and pre-conditions -----------------
            try:
                link_id = chk_string('link_id.link_id.uuid',
                                     request.link_id.uuid,
                                     allow_empty=False)
            except Exception as e:
                LOGGER.exception('Invalid arguments:')
                raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, str(e)) from e

            db_topology = self._get_default_topology()

            if not db_topology.links.contains(link_id):
                msg = 'Link({}) does not exist in the database.'
                msg = msg.format(link_id)
                raise ServiceException(grpc.StatusCode.NOT_FOUND, msg)

            # ----- Implement changes in the database ------------------------
            db_topology.link(link_id).delete()

            # ----- Compose reply --------------------------------------------
            reply = Empty()
            LOGGER.debug('DeleteLink reply: %s', reply)
            DELETELINK_COUNTER_COMPLETED.inc()
            return reply
        except ServiceException as e:
            grpc_context.abort(e.code, e.details)
        except Exception as e:                                      # pragma: no cover
            LOGGER.exception('DeleteLink exception')                # pragma: no cover
            DELETELINK_COUNTER_FAILED.inc()                         # pragma: no cover
            grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))    # pragma: no cover