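# NOTE(review): stray repository-viewer navigation text ("Newer" / "Older") was here; it is not part of the module.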
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import grpc, json, logging
from typing import Any, Dict, Iterator, List, Set
from common.proto.context_pb2 import (
Connection, ConnectionEvent, ConnectionId, ConnectionIdList, ConnectionList,
Context, ContextEvent, ContextId, ContextIdList, ContextList,
Device, DeviceEvent, DeviceFilter, DeviceId, DeviceIdList, DeviceList, Empty,
EventTypeEnum, Link, LinkEvent, LinkId, LinkIdList, LinkList,
Service, ServiceEvent, ServiceFilter, ServiceId, ServiceIdList, ServiceList,
Slice, SliceEvent, SliceFilter, SliceId, SliceIdList, SliceList,
Topology, TopologyEvent, TopologyId, TopologyIdList, TopologyList)
from common.proto.context_pb2_grpc import ContextServiceServicer
from common.tests.MockMessageBroker import (
TOPIC_CONNECTION, TOPIC_CONTEXT, TOPIC_DEVICE, TOPIC_LINK, TOPIC_SERVICE, TOPIC_SLICE, TOPIC_TOPOLOGY,
MockMessageBroker, notify_event)
# NOTE(review): stray repository-viewer text ("Lluis Gifre Renom committed") was here; it is not part of the module.
from common.tools.grpc.Tools import grpc_message_to_json, grpc_message_to_json_string
LOGGER = logging.getLogger(__name__)
def get_container(database : Dict[str, Dict[str, Any]], container_name : str) -> Dict[str, Any]:
    """Return the container named `container_name`, creating it empty on first access.

    The returned dict is the one stored in `database`, so changes made to it are
    visible through `database` as well.
    """
    try:
        return database[container_name]
    except KeyError:
        # First access: register a fresh, empty container under this name.
        container : Dict[str, Any] = {}
        database[container_name] = container
        return container
def get_entries(database : Dict[str, Dict[str, Any]], container_name : str) -> List[Any]:
    """Return every entry stored in `container_name`, ordered by entry uuid.

    The container is created (empty) if it does not exist yet, in which case an
    empty list is returned.
    """
    container = get_container(database, container_name)
    ordered_uuids = list(container.keys())
    ordered_uuids.sort()
    entries = []
    for entry_uuid in ordered_uuids:
        entries.append(container[entry_uuid])
    return entries
def has_entry(database : Dict[str, Dict[str, Any]], container_name : str, entry_uuid : str) -> bool:
    """Tell whether `entry_uuid` is present in the container `container_name`.

    The container is created (empty) if it does not exist yet.

    Returns:
        True when the entry exists, False otherwise.
    """
    # Rendering the whole database as text is costly; only do it when the DEBUG record is really emitted.
    if LOGGER.isEnabledFor(logging.DEBUG):
        LOGGER.debug('[has_entry] BEFORE database={:s}'.format(str(database)))
    container = get_container(database, container_name)
    return entry_uuid in container
def get_entry(
    context : grpc.ServicerContext, database : Dict[str, Dict[str, Any]], container_name : str, entry_uuid : str
) -> Any:
    """Return the entry `entry_uuid` stored in the container `container_name`.

    The container is created (empty) if it does not exist yet. When the entry is
    missing, the RPC is aborted through `context.abort` with status NOT_FOUND and
    the message '<container_name>(<entry_uuid>) not found'.
    """
    # Rendering the whole database as text is costly; only do it when the DEBUG record is really emitted.
    if LOGGER.isEnabledFor(logging.DEBUG):
        LOGGER.debug('[get_entry] BEFORE database={:s}'.format(str(database)))
    container = get_container(database, container_name)
    if entry_uuid not in container:
        context.abort(grpc.StatusCode.NOT_FOUND, '{:s}({:s}) not found'.format(container_name, entry_uuid))
    return container[entry_uuid]
def set_entry(database : Dict[str, Dict[str, Any]], container_name : str, entry_uuid : str, entry : Any) -> Any:
    """Store `entry` under `entry_uuid` in the container `container_name`, replacing any previous value.

    The container is created if it does not exist yet.

    Returns:
        The same `entry` object that was stored.
    """
    container = get_container(database, container_name)
    # Rendering the whole database as text is costly; only do it when DEBUG records are really emitted.
    debug_enabled = LOGGER.isEnabledFor(logging.DEBUG)
    if debug_enabled:
        LOGGER.debug('[set_entry] BEFORE database={:s}'.format(str(database)))
    container[entry_uuid] = entry
    if debug_enabled:
        LOGGER.debug('[set_entry] AFTER database={:s}'.format(str(database)))
    return entry
def del_entry(
    context : grpc.ServicerContext, database : Dict[str, Dict[str, Any]], container_name : str, entry_uuid : str
) -> Any:
    """Remove the entry `entry_uuid` from the container `container_name`.

    When the entry is missing, the RPC is aborted through `context.abort` with
    status NOT_FOUND and the message '<container_name>(<entry_uuid>) not found'.

    Returns:
        A new `Empty` message.
    """
    container = get_container(database, container_name)
    is_missing = entry_uuid not in container
    if is_missing:
        not_found_msg = '{:s}({:s}) not found'.format(container_name, entry_uuid)
        context.abort(grpc.StatusCode.NOT_FOUND, not_found_msg)
    container.pop(entry_uuid)
    return Empty()
def select_entries(database : Dict[str, Dict[str, Any]], container_name : str, entry_uuids : Set[str]) -> List[Any]:
    """Return the entries of `container_name` whose uuid is listed in `entry_uuids`, ordered by uuid.

    An empty `entry_uuids` means "no filter": every entry of the container is
    returned. Requested uuids that are not in the container are ignored.
    """
    if len(entry_uuids) > 0:
        container = get_container(database, container_name)
        selected = []
        for entry_uuid in sorted(container.keys()):
            if entry_uuid not in entry_uuids: continue
            selected.append(container[entry_uuid])
        return selected
    return get_entries(database, container_name)
class MockServicerImpl_Context(ContextServiceServicer):
    """In-memory mock implementation of the Context service servicer.

    Entities are kept in `self.database`, a dictionary of named containers, each
    mapping an entry uuid to the stored message:
      - 'context', 'device', 'link' and 'connection' are single, global containers;
      - 'topology[<context_uuid>]', 'slice[<context_uuid>]' and 'service[<context_uuid>]'
        hold the entities of one context;
      - 'service_connection[<context_uuid>/<service_uuid>]' holds the connections of one service.

    Every Set*/Remove* method calls `notify_event` on `self.msg_broker` with the
    topic of the entity; the Get*Events methods consume that topic and yield the
    corresponding event messages.
    """

    def __init__(self):
        """Create an empty database and a fresh mock message broker."""
        LOGGER.info('[__init__] Creating Servicer...')
        self.database : Dict[str, Dict[str, Any]] = {}
        self.msg_broker = MockMessageBroker()
        LOGGER.info('[__init__] Servicer Created')

    # ----- Common -----------------------------------------------------------------------------------------------------

    def _set(self, request, container_name, entry_uuid, entry_id_field_name, topic_name):
        """Store `request` as entry `entry_uuid` of `container_name` and notify the change.

        The event type is UPDATE when the entry already existed and CREATE otherwise.

        Returns:
            The attribute `entry_id_field_name` of the stored entry (its id message).
        """
        # Existence has to be checked before storing, otherwise every write would look like an update.
        exists = has_entry(self.database, container_name, entry_uuid)
        entry = set_entry(self.database, container_name, entry_uuid, request)
        event_type = EventTypeEnum.EVENTTYPE_UPDATE if exists else EventTypeEnum.EVENTTYPE_CREATE
        entry_id = getattr(entry, entry_id_field_name)
        dict_entry_id = grpc_message_to_json(entry_id)
        notify_event(self.msg_broker, topic_name, event_type, {entry_id_field_name: dict_entry_id})
        return entry_id

    def _del(self, request, container_name, entry_uuid, entry_id_field_name, topic_name, grpc_context):
        """Remove entry `entry_uuid` from `container_name` and notify a REMOVE event.

        `request` is the id message of the removed entity; it is what gets published
        in the event. A missing entry aborts the RPC with NOT_FOUND (see `del_entry`).

        Returns:
            The `Empty` message produced by `del_entry`.
        """
        empty = del_entry(grpc_context, self.database, container_name, entry_uuid)
        event_type = EventTypeEnum.EVENTTYPE_REMOVE
        dict_entry_id = grpc_message_to_json(request)
        notify_event(self.msg_broker, topic_name, event_type, {entry_id_field_name: dict_entry_id})
        return empty

    # ----- Context ----------------------------------------------------------------------------------------------------

    def ListContextIds(self, request: Empty, context : grpc.ServicerContext) -> ContextIdList:
        """Return the ids of all stored contexts, ordered by context uuid."""
        LOGGER.info('[ListContextIds] request={:s}'.format(grpc_message_to_json_string(request)))
        # The loop variable is deliberately not called `context` to avoid confusion with the gRPC context.
        reply = ContextIdList(context_ids=[ctx.context_id for ctx in get_entries(self.database, 'context')])
        LOGGER.info('[ListContextIds] reply={:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    def ListContexts(self, request: Empty, context : grpc.ServicerContext) -> ContextList:
        """Return all stored contexts, ordered by context uuid."""
        LOGGER.info('[ListContexts] request={:s}'.format(grpc_message_to_json_string(request)))
        reply = ContextList(contexts=get_entries(self.database, 'context'))
        LOGGER.info('[ListContexts] reply={:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    def GetContext(self, request: ContextId, context : grpc.ServicerContext) -> Context:
        """Return the context identified by `request`; aborts with NOT_FOUND if it is not stored."""
        LOGGER.info('[GetContext] request={:s}'.format(grpc_message_to_json_string(request)))
        reply = get_entry(context, self.database, 'context', request.context_uuid.uuid)
        LOGGER.info('[GetContext] reply={:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    def SetContext(self, request: Context, context : grpc.ServicerContext) -> ContextId:
        """Create or replace a context and return its id."""
        LOGGER.info('[SetContext] request={:s}'.format(grpc_message_to_json_string(request)))
        reply = self._set(request, 'context', request.context_id.context_uuid.uuid, 'context_id', TOPIC_CONTEXT)
        LOGGER.info('[SetContext] reply={:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    def RemoveContext(self, request: ContextId, context : grpc.ServicerContext) -> Empty:
        """Remove the context identified by `request`; aborts with NOT_FOUND if it is not stored."""
        LOGGER.info('[RemoveContext] request={:s}'.format(grpc_message_to_json_string(request)))
        reply = self._del(request, 'context', request.context_uuid.uuid, 'context_id', TOPIC_CONTEXT, context)
        LOGGER.info('[RemoveContext] reply={:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    def GetContextEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[ContextEvent]:
        """Yield a ContextEvent for each message consumed from the context topic."""
        LOGGER.info('[GetContextEvents] request={:s}'.format(grpc_message_to_json_string(request)))
        for message in self.msg_broker.consume({TOPIC_CONTEXT}):
            yield ContextEvent(**json.loads(message.content))

    # ----- Topology ---------------------------------------------------------------------------------------------------

    def ListTopologyIds(self, request: ContextId, context : grpc.ServicerContext) -> TopologyIdList:
        """Return the ids of the topologies stored for the context `request`, ordered by topology uuid."""
        LOGGER.info('[ListTopologyIds] request={:s}'.format(grpc_message_to_json_string(request)))
        topologies = get_entries(self.database, 'topology[{:s}]'.format(str(request.context_uuid.uuid)))
        reply = TopologyIdList(topology_ids=[topology.topology_id for topology in topologies])
        LOGGER.info('[ListTopologyIds] reply={:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    def ListTopologies(self, request: ContextId, context : grpc.ServicerContext) -> TopologyList:
        """Return the topologies stored for the context `request`, ordered by topology uuid."""
        LOGGER.info('[ListTopologies] request={:s}'.format(grpc_message_to_json_string(request)))
        topologies = get_entries(self.database, 'topology[{:s}]'.format(str(request.context_uuid.uuid)))
        reply = TopologyList(topologies=topologies)
        LOGGER.info('[ListTopologies] reply={:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    def GetTopology(self, request: TopologyId, context : grpc.ServicerContext) -> Topology:
        """Return the topology identified by `request`; aborts with NOT_FOUND if it is not stored."""
        LOGGER.info('[GetTopology] request={:s}'.format(grpc_message_to_json_string(request)))
        container_name = 'topology[{:s}]'.format(str(request.context_id.context_uuid.uuid))
        reply = get_entry(context, self.database, container_name, request.topology_uuid.uuid)
        LOGGER.info('[GetTopology] reply={:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    def SetTopology(self, request: Topology, context : grpc.ServicerContext) -> TopologyId:
        """Create or replace a topology inside its context container and return its id."""
        LOGGER.info('[SetTopology] request={:s}'.format(grpc_message_to_json_string(request)))
        container_name = 'topology[{:s}]'.format(str(request.topology_id.context_id.context_uuid.uuid))
        topology_uuid = request.topology_id.topology_uuid.uuid
        reply = self._set(request, container_name, topology_uuid, 'topology_id', TOPIC_TOPOLOGY)
        LOGGER.info('[SetTopology] reply={:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    def RemoveTopology(self, request: TopologyId, context : grpc.ServicerContext) -> Empty:
        """Remove the topology identified by `request`; aborts with NOT_FOUND if it is not stored."""
        LOGGER.info('[RemoveTopology] request={:s}'.format(grpc_message_to_json_string(request)))
        container_name = 'topology[{:s}]'.format(str(request.context_id.context_uuid.uuid))
        topology_uuid = request.topology_uuid.uuid
        reply = self._del(request, container_name, topology_uuid, 'topology_id', TOPIC_TOPOLOGY, context)
        LOGGER.info('[RemoveTopology] reply={:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    def GetTopologyEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[TopologyEvent]:
        """Yield a TopologyEvent for each message consumed from the topology topic."""
        LOGGER.info('[GetTopologyEvents] request={:s}'.format(grpc_message_to_json_string(request)))
        for message in self.msg_broker.consume({TOPIC_TOPOLOGY}):
            yield TopologyEvent(**json.loads(message.content))

    # ----- Device -----------------------------------------------------------------------------------------------------

    def ListDeviceIds(self, request: Empty, context : grpc.ServicerContext) -> DeviceIdList:
        """Return the ids of all stored devices, ordered by device uuid."""
        LOGGER.info('[ListDeviceIds] request={:s}'.format(grpc_message_to_json_string(request)))
        reply = DeviceIdList(device_ids=[device.device_id for device in get_entries(self.database, 'device')])
        LOGGER.info('[ListDeviceIds] reply={:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    def ListDevices(self, request: Empty, context : grpc.ServicerContext) -> DeviceList:
        """Return all stored devices, ordered by device uuid."""
        LOGGER.info('[ListDevices] request={:s}'.format(grpc_message_to_json_string(request)))
        reply = DeviceList(devices=get_entries(self.database, 'device'))
        LOGGER.info('[ListDevices] reply={:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    def GetDevice(self, request: DeviceId, context : grpc.ServicerContext) -> Device:
        """Return the device identified by `request`; aborts with NOT_FOUND if it is not stored."""
        LOGGER.info('[GetDevice] request={:s}'.format(grpc_message_to_json_string(request)))
        reply = get_entry(context, self.database, 'device', request.device_uuid.uuid)
        LOGGER.info('[GetDevice] reply={:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    def SetDevice(self, request: Device, context : grpc.ServicerContext) -> DeviceId:
        """Create or replace a device and return its id."""
        LOGGER.info('[SetDevice] request={:s}'.format(grpc_message_to_json_string(request)))
        reply = self._set(request, 'device', request.device_id.device_uuid.uuid, 'device_id', TOPIC_DEVICE)
        LOGGER.info('[SetDevice] reply={:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    def RemoveDevice(self, request: DeviceId, context : grpc.ServicerContext) -> Empty:
        """Remove the device identified by `request`; aborts with NOT_FOUND if it is not stored."""
        LOGGER.info('[RemoveDevice] request={:s}'.format(grpc_message_to_json_string(request)))
        reply = self._del(request, 'device', request.device_uuid.uuid, 'device_id', TOPIC_DEVICE, context)
        LOGGER.info('[RemoveDevice] reply={:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    def GetDeviceEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[DeviceEvent]:
        """Yield a DeviceEvent for each message consumed from the device topic."""
        LOGGER.info('[GetDeviceEvents] request={:s}'.format(grpc_message_to_json_string(request)))
        for message in self.msg_broker.consume({TOPIC_DEVICE}):
            yield DeviceEvent(**json.loads(message.content))

    def SelectDevice(self, request : DeviceFilter, context : grpc.ServicerContext) -> DeviceList:
        """Return copies of the devices listed in the filter, stripped of the parts not requested.

        Endpoints, config rules and components are removed from each returned copy
        unless the matching `include_*` flag of the filter is set. The stored
        devices are never modified. If the filter lists no device ids, no container
        is visited and the reply is empty.
        """
        LOGGER.info('[SelectDevice] request={:s}'.format(grpc_message_to_json_string(request)))
        container_entry_uuids : Dict[str, Set[str]] = {}
        container_name = 'device'
        # The filter wraps the ids in an id-list message, so the ids are one level down
        # (same access pattern as SelectService uses for its filter).
        for device_id in request.device_ids.device_ids:
            device_uuid = device_id.device_uuid.uuid
            container_entry_uuids.setdefault(container_name, set()).add(device_uuid)

        exclude_endpoints = not request.include_endpoints
        exclude_config_rules = not request.include_config_rules
        exclude_components = not request.include_components

        devices = list()
        for container_name in sorted(container_entry_uuids.keys()):
            entry_uuids = container_entry_uuids[container_name]
            for device in select_entries(self.database, container_name, entry_uuids):
                # Work on a copy so that stripping fields does not alter the stored device.
                reply_device = Device()
                reply_device.CopyFrom(device)
                if exclude_endpoints: del reply_device.device_endpoints[:] # pylint: disable=no-member
                if exclude_config_rules: del reply_device.device_config.config_rules[:] # pylint: disable=no-member
                # NOTE(review): field name `component` must match the Device message definition - confirm.
                if exclude_components: del reply_device.component[:] # pylint: disable=no-member
                devices.append(reply_device)

        reply = DeviceList(devices=devices)
        LOGGER.info('[SelectDevice] reply={:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    # ----- Link -------------------------------------------------------------------------------------------------------

    def ListLinkIds(self, request: Empty, context : grpc.ServicerContext) -> LinkIdList:
        """Return the ids of all stored links, ordered by link uuid."""
        LOGGER.info('[ListLinkIds] request={:s}'.format(grpc_message_to_json_string(request)))
        reply = LinkIdList(link_ids=[link.link_id for link in get_entries(self.database, 'link')])
        LOGGER.info('[ListLinkIds] reply={:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    def ListLinks(self, request: Empty, context : grpc.ServicerContext) -> LinkList:
        """Return all stored links, ordered by link uuid."""
        LOGGER.info('[ListLinks] request={:s}'.format(grpc_message_to_json_string(request)))
        reply = LinkList(links=get_entries(self.database, 'link'))
        LOGGER.info('[ListLinks] reply={:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    def GetLink(self, request: LinkId, context : grpc.ServicerContext) -> Link:
        """Return the link identified by `request`; aborts with NOT_FOUND if it is not stored."""
        LOGGER.info('[GetLink] request={:s}'.format(grpc_message_to_json_string(request)))
        reply = get_entry(context, self.database, 'link', request.link_uuid.uuid)
        LOGGER.info('[GetLink] reply={:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    def SetLink(self, request: Link, context : grpc.ServicerContext) -> LinkId:
        """Create or replace a link and return its id."""
        LOGGER.info('[SetLink] request={:s}'.format(grpc_message_to_json_string(request)))
        reply = self._set(request, 'link', request.link_id.link_uuid.uuid, 'link_id', TOPIC_LINK)
        LOGGER.info('[SetLink] reply={:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    def RemoveLink(self, request: LinkId, context : grpc.ServicerContext) -> Empty:
        """Remove the link identified by `request`; aborts with NOT_FOUND if it is not stored."""
        LOGGER.info('[RemoveLink] request={:s}'.format(grpc_message_to_json_string(request)))
        reply = self._del(request, 'link', request.link_uuid.uuid, 'link_id', TOPIC_LINK, context)
        LOGGER.info('[RemoveLink] reply={:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    def GetLinkEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[LinkEvent]:
        """Yield a LinkEvent for each message consumed from the link topic."""
        LOGGER.info('[GetLinkEvents] request={:s}'.format(grpc_message_to_json_string(request)))
        for message in self.msg_broker.consume({TOPIC_LINK}):
            yield LinkEvent(**json.loads(message.content))

    # ----- Slice ------------------------------------------------------------------------------------------------------

    def ListSliceIds(self, request: ContextId, context : grpc.ServicerContext) -> SliceIdList:
        """Return the ids of the slices stored for the context `request`, ordered by slice uuid."""
        LOGGER.info('[ListSliceIds] request={:s}'.format(grpc_message_to_json_string(request)))
        slices = get_entries(self.database, 'slice[{:s}]'.format(str(request.context_uuid.uuid)))
        reply = SliceIdList(slice_ids=[slice_.slice_id for slice_ in slices])
        LOGGER.info('[ListSliceIds] reply={:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    def ListSlices(self, request: ContextId, context : grpc.ServicerContext) -> SliceList:
        """Return the slices stored for the context `request`, ordered by slice uuid."""
        LOGGER.info('[ListSlices] request={:s}'.format(grpc_message_to_json_string(request)))
        slices = get_entries(self.database, 'slice[{:s}]'.format(str(request.context_uuid.uuid)))
        reply = SliceList(slices=slices)
        LOGGER.info('[ListSlices] reply={:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    def GetSlice(self, request: SliceId, context : grpc.ServicerContext) -> Slice:
        """Return the slice identified by `request`; aborts with NOT_FOUND if it is not stored."""
        LOGGER.info('[GetSlice] request={:s}'.format(grpc_message_to_json_string(request)))
        container_name = 'slice[{:s}]'.format(str(request.context_id.context_uuid.uuid))
        reply = get_entry(context, self.database, container_name, request.slice_uuid.uuid)
        LOGGER.info('[GetSlice] reply={:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    def SetSlice(self, request: Slice, context : grpc.ServicerContext) -> SliceId:
        """Create or replace a slice inside its context container and return its id."""
        LOGGER.info('[SetSlice] request={:s}'.format(grpc_message_to_json_string(request)))
        container_name = 'slice[{:s}]'.format(str(request.slice_id.context_id.context_uuid.uuid))
        slice_uuid = request.slice_id.slice_uuid.uuid
        reply = self._set(request, container_name, slice_uuid, 'slice_id', TOPIC_SLICE)
        LOGGER.info('[SetSlice] reply={:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    def RemoveSlice(self, request: SliceId, context : grpc.ServicerContext) -> Empty:
        """Remove the slice identified by `request`; aborts with NOT_FOUND if it is not stored."""
        LOGGER.info('[RemoveSlice] request={:s}'.format(grpc_message_to_json_string(request)))
        container_name = 'slice[{:s}]'.format(str(request.context_id.context_uuid.uuid))
        slice_uuid = request.slice_uuid.uuid
        reply = self._del(request, container_name, slice_uuid, 'slice_id', TOPIC_SLICE, context)
        LOGGER.info('[RemoveSlice] reply={:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    def GetSliceEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[SliceEvent]:
        """Yield a SliceEvent for each message consumed from the slice topic."""
        LOGGER.info('[GetSliceEvents] request={:s}'.format(grpc_message_to_json_string(request)))
        for message in self.msg_broker.consume({TOPIC_SLICE}):
            yield SliceEvent(**json.loads(message.content))

    def SelectSlice(self, request : SliceFilter, context : grpc.ServicerContext) -> SliceList:
        """Return copies of the slices listed in the filter, stripped of the parts not requested.

        Endpoint ids, constraints, service ids, subslice ids and config rules are
        removed from each returned copy unless the matching `include_*` flag of the
        filter is set. The stored slices are never modified. If the filter lists no
        slice ids, no container is visited and the reply is empty.
        """
        LOGGER.info('[SelectSlice] request={:s}'.format(grpc_message_to_json_string(request)))
        container_entry_uuids : Dict[str, Set[str]] = {}
        # The filter wraps the ids in an id-list message, so the ids are one level down
        # (same access pattern as SelectService uses for its filter).
        for slice_id in request.slice_ids.slice_ids:
            container_name = 'slice[{:s}]'.format(str(slice_id.context_id.context_uuid.uuid))
            slice_uuid = slice_id.slice_uuid.uuid
            container_entry_uuids.setdefault(container_name, set()).add(slice_uuid)

        exclude_endpoint_ids = not request.include_endpoint_ids
        exclude_constraints = not request.include_constraints
        exclude_service_ids = not request.include_service_ids
        exclude_subslice_ids = not request.include_subslice_ids
        exclude_config_rules = not request.include_config_rules

        slices = list()
        for container_name in sorted(container_entry_uuids.keys()):
            entry_uuids = container_entry_uuids[container_name]
            for eslice in select_entries(self.database, container_name, entry_uuids):
                # Work on a copy so that stripping fields does not alter the stored slice.
                reply_slice = Slice()
                reply_slice.CopyFrom(eslice)
                # Slice fields carry the `slice_` prefix; the `service_` names belong to Service messages.
                if exclude_endpoint_ids: del reply_slice.slice_endpoint_ids[:] # pylint: disable=no-member
                if exclude_constraints : del reply_slice.slice_constraints[:] # pylint: disable=no-member
                if exclude_service_ids : del reply_slice.slice_service_ids[:] # pylint: disable=no-member
                if exclude_subslice_ids: del reply_slice.slice_subslice_ids[:] # pylint: disable=no-member
                if exclude_config_rules: del reply_slice.slice_config.config_rules[:] # pylint: disable=no-member
                slices.append(reply_slice)

        reply = SliceList(slices=slices)
        LOGGER.info('[SelectSlice] reply={:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    # ----- Service ----------------------------------------------------------------------------------------------------

    def ListServiceIds(self, request: ContextId, context : grpc.ServicerContext) -> ServiceIdList:
        """Return the ids of the services stored for the context `request`, ordered by service uuid."""
        LOGGER.info('[ListServiceIds] request={:s}'.format(grpc_message_to_json_string(request)))
        services = get_entries(self.database, 'service[{:s}]'.format(str(request.context_uuid.uuid)))
        reply = ServiceIdList(service_ids=[service.service_id for service in services])
        LOGGER.info('[ListServiceIds] reply={:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    def ListServices(self, request: ContextId, context : grpc.ServicerContext) -> ServiceList:
        """Return the services stored for the context `request`, ordered by service uuid."""
        LOGGER.info('[ListServices] request={:s}'.format(grpc_message_to_json_string(request)))
        services = get_entries(self.database, 'service[{:s}]'.format(str(request.context_uuid.uuid)))
        reply = ServiceList(services=services)
        LOGGER.info('[ListServices] reply={:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    def GetService(self, request: ServiceId, context : grpc.ServicerContext) -> Service:
        """Return the service identified by `request`; aborts with NOT_FOUND if it is not stored."""
        LOGGER.info('[GetService] request={:s}'.format(grpc_message_to_json_string(request)))
        container_name = 'service[{:s}]'.format(str(request.context_id.context_uuid.uuid))
        reply = get_entry(context, self.database, container_name, request.service_uuid.uuid)
        LOGGER.info('[GetService] reply={:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    def SetService(self, request: Service, context : grpc.ServicerContext) -> ServiceId:
        """Create or replace a service inside its context container and return its id."""
        LOGGER.info('[SetService] request={:s}'.format(grpc_message_to_json_string(request)))
        container_name = 'service[{:s}]'.format(str(request.service_id.context_id.context_uuid.uuid))
        service_uuid = request.service_id.service_uuid.uuid
        reply = self._set(request, container_name, service_uuid, 'service_id', TOPIC_SERVICE)
        LOGGER.info('[SetService] reply={:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    def RemoveService(self, request: ServiceId, context : grpc.ServicerContext) -> Empty:
        """Remove the service identified by `request`; aborts with NOT_FOUND if it is not stored."""
        LOGGER.info('[RemoveService] request={:s}'.format(grpc_message_to_json_string(request)))
        container_name = 'service[{:s}]'.format(str(request.context_id.context_uuid.uuid))
        service_uuid = request.service_uuid.uuid
        reply = self._del(request, container_name, service_uuid, 'service_id', TOPIC_SERVICE, context)
        LOGGER.info('[RemoveService] reply={:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    def GetServiceEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[ServiceEvent]:
        """Yield a ServiceEvent for each message consumed from the service topic."""
        LOGGER.info('[GetServiceEvents] request={:s}'.format(grpc_message_to_json_string(request)))
        for message in self.msg_broker.consume({TOPIC_SERVICE}):
            yield ServiceEvent(**json.loads(message.content))

    def SelectService(self, request : ServiceFilter, context : grpc.ServicerContext) -> ServiceList:
        """Return copies of the services listed in the filter, stripped of the parts not requested.

        Endpoint ids, constraints and config rules are removed from each returned
        copy unless the matching `include_*` flag of the filter is set. The stored
        services are never modified. If the filter lists no service ids, no
        container is visited and the reply is empty.
        """
        LOGGER.info('[SelectService] request={:s}'.format(grpc_message_to_json_string(request)))
        # NOTE(review): looks like a leftover debugging trace emitted at WARNING level - confirm it is still wanted.
        LOGGER.warning('type: {}'.format(type(request)))
        container_entry_uuids : Dict[str, Set[str]] = {}
        for service_id in request.service_ids.service_ids:
            container_name = 'service[{:s}]'.format(str(service_id.context_id.context_uuid.uuid))
            service_uuid = service_id.service_uuid.uuid
            container_entry_uuids.setdefault(container_name, set()).add(service_uuid)

        exclude_endpoint_ids = not request.include_endpoint_ids
        exclude_constraints = not request.include_constraints
        exclude_config_rules = not request.include_config_rules

        services = list()
        for container_name in sorted(container_entry_uuids.keys()):
            entry_uuids = container_entry_uuids[container_name]
            for service in select_entries(self.database, container_name, entry_uuids):
                # Work on a copy so that stripping fields does not alter the stored service.
                reply_service = Service()
                reply_service.CopyFrom(service)
                if exclude_endpoint_ids: del reply_service.service_endpoint_ids[:] # pylint: disable=no-member
                if exclude_constraints : del reply_service.service_constraints[:] # pylint: disable=no-member
                if exclude_config_rules: del reply_service.service_config.config_rules[:] # pylint: disable=no-member
                services.append(reply_service)

        reply = ServiceList(services=services)
        LOGGER.info('[SelectService] reply={:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    # ----- Connection -------------------------------------------------------------------------------------------------

    def ListConnectionIds(self, request: ServiceId, context : grpc.ServicerContext) -> ConnectionIdList:
        """Return the ids of the connections stored for the service `request`, ordered by connection uuid."""
        LOGGER.info('[ListConnectionIds] request={:s}'.format(grpc_message_to_json_string(request)))
        # Must be the same container name SetConnection/RemoveConnection use, or nothing is ever listed.
        container_name = 'service_connection[{:s}/{:s}]'.format(
            str(request.context_id.context_uuid.uuid), str(request.service_uuid.uuid))
        reply = ConnectionIdList(connection_ids=[c.connection_id for c in get_entries(self.database, container_name)])
        LOGGER.info('[ListConnectionIds] reply={:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    def ListConnections(self, request: ServiceId, context : grpc.ServicerContext) -> ConnectionList:
        """Return the connections stored for the service `request`, ordered by connection uuid."""
        LOGGER.info('[ListConnections] request={:s}'.format(grpc_message_to_json_string(request)))
        # Must be the same container name SetConnection/RemoveConnection use, or nothing is ever listed.
        container_name = 'service_connection[{:s}/{:s}]'.format(
            str(request.context_id.context_uuid.uuid), str(request.service_uuid.uuid))
        reply = ConnectionList(connections=get_entries(self.database, container_name))
        LOGGER.info('[ListConnections] reply={:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    def GetConnection(self, request: ConnectionId, context : grpc.ServicerContext) -> Connection:
        """Return the connection identified by `request`; aborts with NOT_FOUND if it is not stored."""
        LOGGER.info('[GetConnection] request={:s}'.format(grpc_message_to_json_string(request)))
        reply = get_entry(context, self.database, 'connection', request.connection_uuid.uuid)
        LOGGER.info('[GetConnection] reply={:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    def SetConnection(self, request: Connection, context : grpc.ServicerContext) -> ConnectionId:
        """Create or replace a connection and return its id.

        The connection is stored twice: in the container of its owning service (used
        by the List* methods) and in the global 'connection' container (used by
        GetConnection). Only the global store triggers the event notification.
        """
        LOGGER.info('[SetConnection] request={:s}'.format(grpc_message_to_json_string(request)))
        container_name = 'service_connection[{:s}/{:s}]'.format(
            str(request.service_id.context_id.context_uuid.uuid), str(request.service_id.service_uuid.uuid))
        connection_uuid = request.connection_id.connection_uuid.uuid
        set_entry(self.database, container_name, connection_uuid, request)
        reply = self._set(request, 'connection', connection_uuid, 'connection_id', TOPIC_CONNECTION)
        LOGGER.info('[SetConnection] reply={:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    def RemoveConnection(self, request: ConnectionId, context : grpc.ServicerContext) -> Empty:
        """Remove the connection identified by `request` from both places where it is stored.

        The stored connection is fetched first because the request only carries the
        connection uuid, while the per-service container name needs the owning
        service id. Aborts with NOT_FOUND if the connection is not stored.
        """
        LOGGER.info('[RemoveConnection] request={:s}'.format(grpc_message_to_json_string(request)))
        connection = get_entry(context, self.database, 'connection', request.connection_uuid.uuid)
        container_name = 'service_connection[{:s}/{:s}]'.format(
            str(connection.service_id.context_id.context_uuid.uuid), str(connection.service_id.service_uuid.uuid))
        connection_uuid = request.connection_uuid.uuid
        del_entry(context, self.database, container_name, connection_uuid)
        reply = self._del(request, 'connection', connection_uuid, 'connection_id', TOPIC_CONNECTION, context)
        LOGGER.info('[RemoveConnection] reply={:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    def GetConnectionEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[ConnectionEvent]:
        """Yield a ConnectionEvent for each message consumed from the connection topic."""
        LOGGER.info('[GetConnectionEvents] request={:s}'.format(grpc_message_to_json_string(request)))
        for message in self.msg_broker.consume({TOPIC_CONNECTION}):
            yield ConnectionEvent(**json.loads(message.content))