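# NOTE(review): stray "Newer" / "Older" tokens were here; they look like repository-viewer navigation residue, not code.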
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import grpc, json, logging
from typing import Any, Dict, Iterator, List, Set
from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME
from common.proto.context_pb2 import (
Connection, ConnectionEvent, ConnectionId, ConnectionIdList, ConnectionList,
Context, ContextEvent, ContextId, ContextIdList, ContextList,
Device, DeviceEvent, DeviceFilter, DeviceId, DeviceIdList, DeviceList,
Empty, EventTypeEnum,
Link, LinkEvent, LinkId, LinkIdList, LinkList,
Service, ServiceEvent, ServiceFilter, ServiceId, ServiceIdList, ServiceList,
Slice, SliceEvent, SliceFilter, SliceId, SliceIdList, SliceList,
Topology, TopologyDetails, TopologyEvent, TopologyId, TopologyIdList, TopologyList)
from common.proto.context_pb2_grpc import ContextServiceServicer
from common.tests.MockMessageBroker import (
TOPIC_CONNECTION, TOPIC_CONTEXT, TOPIC_DEVICE, TOPIC_LINK, TOPIC_SERVICE, TOPIC_SLICE, TOPIC_TOPOLOGY,
MockMessageBroker, notify_event)
# NOTE(review): stray text "Lluis Gifre Renom committed" was here; it looks like repository-viewer blame residue, not code.
from common.tools.grpc.Tools import grpc_message_to_json, grpc_message_to_json_string
LOGGER = logging.getLogger(__name__)
def get_container(database : Dict[str, Dict[str, Any]], container_name : str) -> Dict[str, Any]:
    """Return the container registered under `container_name`.

    When the database has no such container yet, an empty one is created,
    stored in `database`, and returned, so callers always get the live dict.
    """
    if container_name not in database:
        database[container_name] = {}
    return database[container_name]
def get_entries(database : Dict[str, Dict[str, Any]], container_name : str) -> List[Any]:
    """Return every entry of the named container, ordered by entry UUID.

    The container is obtained through `get_container`, so asking for a
    container that does not exist yet creates it and yields an empty list.
    """
    container = get_container(database, container_name)
    ordered_keys = list(container)
    ordered_keys.sort()
    return [container[key] for key in ordered_keys]
def has_entry(database : Dict[str, Dict[str, Any]], container_name : str, entry_uuid : str) -> bool:
    """Tell whether `entry_uuid` is present in the named container.

    The container is obtained through `get_container`, so it is created
    (empty) when it does not exist yet.

    Returns:
        True when the container holds an entry under `entry_uuid`, else False.
    """
    # Stringifying the whole database is costly; only do it when the DEBUG
    # record will actually be emitted. The message text itself is unchanged.
    if LOGGER.isEnabledFor(logging.DEBUG):
        LOGGER.debug('[has_entry] BEFORE database={:s}'.format(str(database)))
    container = get_container(database, container_name)
    return entry_uuid in container
def get_entry(
    context : grpc.ServicerContext, database : Dict[str, Dict[str, Any]], container_name : str, entry_uuid : str
) -> Any:
    """Return the entry stored under `entry_uuid` in the named container.

    When the entry is missing, `context.abort` is called with
    `grpc.StatusCode.NOT_FOUND` and a "<container>(<uuid>) not found" message.
    If `abort` returns instead of raising, the lookup that follows raises
    `KeyError`.
    """
    # Stringifying the whole database is costly; only do it when the DEBUG
    # record will actually be emitted. The message text itself is unchanged.
    if LOGGER.isEnabledFor(logging.DEBUG):
        LOGGER.debug('[get_entry] BEFORE database={:s}'.format(str(database)))
    container = get_container(database, container_name)
    if entry_uuid not in container:
        context.abort(grpc.StatusCode.NOT_FOUND, '{:s}({:s}) not found'.format(container_name, entry_uuid))
    return container[entry_uuid]
def set_entry(database : Dict[str, Dict[str, Any]], container_name : str, entry_uuid : str, entry : Any) -> Any:
    """Store `entry` under `entry_uuid` in the named container and return it.

    An existing entry with the same UUID is overwritten. The container is
    created when it does not exist yet. The object is stored as given (no
    copy is made).
    """
    container = get_container(database, container_name)
    # Stringifying the whole database is costly; only do it when DEBUG records
    # will actually be emitted. The level is sampled once so the BEFORE/AFTER
    # pair is logged together or not at all.
    debug_enabled = LOGGER.isEnabledFor(logging.DEBUG)
    if debug_enabled:
        LOGGER.debug('[set_entry] BEFORE database={:s}'.format(str(database)))
    container[entry_uuid] = entry
    if debug_enabled:
        LOGGER.debug('[set_entry] AFTER database={:s}'.format(str(database)))
    return entry
def del_entry(
    context : grpc.ServicerContext, database : Dict[str, Dict[str, Any]], container_name : str, entry_uuid : str
) -> Any:
    """Remove the entry stored under `entry_uuid` from the named container.

    When the entry is missing, `context.abort` is called with
    `grpc.StatusCode.NOT_FOUND`. If `abort` returns instead of raising, the
    removal that follows raises `KeyError`.

    Returns:
        A fresh `Empty` message.
    """
    container = get_container(database, container_name)
    is_missing = entry_uuid not in container
    if is_missing:
        message = '{:s}({:s}) not found'.format(container_name, entry_uuid)
        context.abort(grpc.StatusCode.NOT_FOUND, message)
    container.pop(entry_uuid)
    return Empty()
def select_entries(database : Dict[str, Dict[str, Any]], container_name : str, entry_uuids : Set[str]) -> List[Any]:
    """Return the entries of the named container whose UUID is in `entry_uuids`.

    An empty `entry_uuids` means "no filter": every entry of the container is
    returned. In both cases the result is ordered by entry UUID, and UUIDs
    that are requested but not stored are silently skipped.
    """
    if len(entry_uuids) > 0:
        container = get_container(database, container_name)
        selected = []
        for key in sorted(container):
            if key in entry_uuids:
                selected.append(container[key])
        return selected
    return get_entries(database, container_name)
class MockServicerImpl_Context(ContextServiceServicer):
    """In-memory mock implementation of the Context gRPC servicer.

    Entities are kept in `self.database`, a dict of named containers, each of
    which maps an entry UUID to the stored protobuf message. Set/Remove
    operations publish an event on `self.msg_broker` through `notify_event`;
    the Get*Events methods stream those events back.
    """

    def __init__(self):
        LOGGER.info('[__init__] Creating Servicer...')
        # container name -> {entry uuid -> stored message}
        self.database : Dict[str, Dict[str, Any]] = {}
        self.msg_broker = MockMessageBroker()
        LOGGER.info('[__init__] Servicer Created')

    # ----- Common -----------------------------------------------------------------------------------------------------

    def _set(self, request, container_name, entry_uuid, entry_id_field_name, topic_name):
        """Store `request` in a container and publish a CREATE or UPDATE event.

        The event type is UPDATE when an entry with `entry_uuid` already
        existed in the container before the write, CREATE otherwise.

        Returns:
            The sub-message of the stored entry named `entry_id_field_name`.
        """
        exists = has_entry(self.database, container_name, entry_uuid)
        entry = set_entry(self.database, container_name, entry_uuid, request)
        event_type = EventTypeEnum.EVENTTYPE_UPDATE if exists else EventTypeEnum.EVENTTYPE_CREATE
        entry_id = getattr(entry, entry_id_field_name)
        dict_entry_id = grpc_message_to_json(entry_id)
        notify_event(self.msg_broker, topic_name, event_type, {entry_id_field_name: dict_entry_id})
        return entry_id

    def _del(self, request, container_name, entry_uuid, entry_id_field_name, topic_name, grpc_context):
        """Remove an entry from a container and publish a REMOVE event.

        `request` is the identifier message; its JSON form is used as the
        event payload under the key `entry_id_field_name`.

        Returns:
            The `Empty` message produced by `del_entry`.
        """
        empty = del_entry(grpc_context, self.database, container_name, entry_uuid)
        event_type = EventTypeEnum.EVENTTYPE_REMOVE
        dict_entry_id = grpc_message_to_json(request)
        notify_event(self.msg_broker, topic_name, event_type, {entry_id_field_name: dict_entry_id})
        return empty

    @staticmethod
    def _service_connections_container(context_uuid, service_uuid) -> str:
        """Build the name of the container indexing the connections of one service.

        Readers (ListConnectionIds/ListConnections) and writers
        (SetConnection/RemoveConnection) must use the same name, so it is
        built in exactly one place.
        """
        return 'service_connections[{:s}/{:s}]'.format(str(context_uuid), str(service_uuid))

    # ----- Context ----------------------------------------------------------------------------------------------------

    def ListContextIds(self, request: Empty, context : grpc.ServicerContext) -> ContextIdList:
        """Return the ids of all stored contexts, ordered by context UUID."""
        LOGGER.info('[ListContextIds] request={:s}'.format(grpc_message_to_json_string(request)))
        # Loop variable deliberately not named `context`, to avoid shadowing the gRPC context parameter.
        reply = ContextIdList(context_ids=[ctx.context_id for ctx in get_entries(self.database, 'context')])
        LOGGER.info('[ListContextIds] reply={:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    def ListContexts(self, request: Empty, context : grpc.ServicerContext) -> ContextList:
        """Return all stored contexts, ordered by context UUID."""
        LOGGER.info('[ListContexts] request={:s}'.format(grpc_message_to_json_string(request)))
        reply = ContextList(contexts=get_entries(self.database, 'context'))
        LOGGER.info('[ListContexts] reply={:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    def GetContext(self, request: ContextId, context : grpc.ServicerContext) -> Context:
        """Return the stored context; aborts with NOT_FOUND when it is missing."""
        LOGGER.info('[GetContext] request={:s}'.format(grpc_message_to_json_string(request)))
        reply = get_entry(context, self.database, 'context', request.context_uuid.uuid)
        LOGGER.info('[GetContext] reply={:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    def SetContext(self, request: Context, context : grpc.ServicerContext) -> ContextId:
        """Create or update a context and return its id."""
        LOGGER.info('[SetContext] request={:s}'.format(grpc_message_to_json_string(request)))
        reply = self._set(request, 'context', request.context_id.context_uuid.uuid, 'context_id', TOPIC_CONTEXT)
        LOGGER.info('[SetContext] reply={:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    def RemoveContext(self, request: ContextId, context : grpc.ServicerContext) -> Empty:
        """Remove a context; aborts with NOT_FOUND when it is missing."""
        LOGGER.info('[RemoveContext] request={:s}'.format(grpc_message_to_json_string(request)))
        reply = self._del(request, 'context', request.context_uuid.uuid, 'context_id', TOPIC_CONTEXT, context)
        LOGGER.info('[RemoveContext] reply={:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    def GetContextEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[ContextEvent]:
        """Yield a ContextEvent for each message consumed from TOPIC_CONTEXT."""
        LOGGER.info('[GetContextEvents] request={:s}'.format(grpc_message_to_json_string(request)))
        for message in self.msg_broker.consume({TOPIC_CONTEXT}):
            yield ContextEvent(**json.loads(message.content))

    # ----- Topology ---------------------------------------------------------------------------------------------------

    def ListTopologyIds(self, request: ContextId, context : grpc.ServicerContext) -> TopologyIdList:
        """Return the ids of the topologies stored for the given context."""
        LOGGER.info('[ListTopologyIds] request={:s}'.format(grpc_message_to_json_string(request)))
        topologies = get_entries(self.database, 'topology[{:s}]'.format(str(request.context_uuid.uuid)))
        reply = TopologyIdList(topology_ids=[topology.topology_id for topology in topologies])
        LOGGER.info('[ListTopologyIds] reply={:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    def ListTopologies(self, request: ContextId, context : grpc.ServicerContext) -> TopologyList:
        """Return the topologies stored for the given context."""
        LOGGER.info('[ListTopologies] request={:s}'.format(grpc_message_to_json_string(request)))
        topologies = get_entries(self.database, 'topology[{:s}]'.format(str(request.context_uuid.uuid)))
        reply = TopologyList(topologies=topologies)
        LOGGER.info('[ListTopologies] reply={:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    def GetTopology(self, request: TopologyId, context : grpc.ServicerContext) -> Topology:
        """Return the stored topology; aborts with NOT_FOUND when it is missing."""
        LOGGER.info('[GetTopology] request={:s}'.format(grpc_message_to_json_string(request)))
        container_name = 'topology[{:s}]'.format(str(request.context_id.context_uuid.uuid))
        reply = get_entry(context, self.database, container_name, request.topology_uuid.uuid)
        LOGGER.info('[GetTopology] reply={:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    def GetTopologyDetails(self, request : TopologyId, context : grpc.ServicerContext) -> TopologyDetails:
        """Return a topology together with its full device and link messages.

        For the default context/topology every stored device and link is
        returned; for any other topology only the devices/links referenced by
        the stored topology's `device_ids`/`link_ids` are looked up.
        """
        LOGGER.info('[GetTopologyDetails] request={:s}'.format(grpc_message_to_json_string(request)))
        context_uuid = request.context_id.context_uuid.uuid
        container_name = 'topology[{:s}]'.format(str(context_uuid))
        topology_uuid = request.topology_uuid.uuid
        _reply = get_entry(context, self.database, container_name, topology_uuid)
        reply = TopologyDetails()
        reply.topology_id.CopyFrom(_reply.topology_id)
        reply.name = _reply.name
        if context_uuid == DEFAULT_CONTEXT_NAME and topology_uuid == DEFAULT_TOPOLOGY_NAME:
            for device in get_entries(self.database, 'device'):
                reply.devices.append(device)
            for link in get_entries(self.database, 'link'):
                reply.links.append(link)
        else:
            # TODO: to be improved; Mock does not associate devices/links to topologies automatically
            for device_id in _reply.device_ids:
                device = get_entry(context, self.database, 'device', device_id.device_uuid.uuid)
                reply.devices.append(device)
            for link_id in _reply.link_ids:
                link = get_entry(context, self.database, 'link', link_id.link_uuid.uuid)
                reply.links.append(link)
        LOGGER.info('[GetTopologyDetails] reply={:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    def SetTopology(self, request: Topology, context : grpc.ServicerContext) -> TopologyId:
        """Create or update a topology in its context's container and return its id."""
        LOGGER.info('[SetTopology] request={:s}'.format(grpc_message_to_json_string(request)))
        container_name = 'topology[{:s}]'.format(str(request.topology_id.context_id.context_uuid.uuid))
        topology_uuid = request.topology_id.topology_uuid.uuid
        reply = self._set(request, container_name, topology_uuid, 'topology_id', TOPIC_TOPOLOGY)
        LOGGER.info('[SetTopology] reply={:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    def RemoveTopology(self, request: TopologyId, context : grpc.ServicerContext) -> Empty:
        """Remove a topology; aborts with NOT_FOUND when it is missing."""
        LOGGER.info('[RemoveTopology] request={:s}'.format(grpc_message_to_json_string(request)))
        container_name = 'topology[{:s}]'.format(str(request.context_id.context_uuid.uuid))
        topology_uuid = request.topology_uuid.uuid
        reply = self._del(request, container_name, topology_uuid, 'topology_id', TOPIC_TOPOLOGY, context)
        LOGGER.info('[RemoveTopology] reply={:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    def GetTopologyEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[TopologyEvent]:
        """Yield a TopologyEvent for each message consumed from TOPIC_TOPOLOGY."""
        LOGGER.info('[GetTopologyEvents] request={:s}'.format(grpc_message_to_json_string(request)))
        for message in self.msg_broker.consume({TOPIC_TOPOLOGY}):
            yield TopologyEvent(**json.loads(message.content))

    # ----- Device -----------------------------------------------------------------------------------------------------

    def ListDeviceIds(self, request: Empty, context : grpc.ServicerContext) -> DeviceIdList:
        """Return the ids of all stored devices, ordered by device UUID."""
        LOGGER.info('[ListDeviceIds] request={:s}'.format(grpc_message_to_json_string(request)))
        reply = DeviceIdList(device_ids=[device.device_id for device in get_entries(self.database, 'device')])
        LOGGER.info('[ListDeviceIds] reply={:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    def ListDevices(self, request: Empty, context : grpc.ServicerContext) -> DeviceList:
        """Return all stored devices, ordered by device UUID."""
        LOGGER.info('[ListDevices] request={:s}'.format(grpc_message_to_json_string(request)))
        reply = DeviceList(devices=get_entries(self.database, 'device'))
        LOGGER.info('[ListDevices] reply={:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    def GetDevice(self, request: DeviceId, context : grpc.ServicerContext) -> Device:
        """Return the stored device; aborts with NOT_FOUND when it is missing."""
        LOGGER.info('[GetDevice] request={:s}'.format(grpc_message_to_json_string(request)))
        reply = get_entry(context, self.database, 'device', request.device_uuid.uuid)
        LOGGER.info('[GetDevice] reply={:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    def SetDevice(self, request: Device, context : grpc.ServicerContext) -> DeviceId:
        """Create or update a device and return its id."""
        LOGGER.info('[SetDevice] request={:s}'.format(grpc_message_to_json_string(request)))
        reply = self._set(request, 'device', request.device_id.device_uuid.uuid, 'device_id', TOPIC_DEVICE)
        LOGGER.info('[SetDevice] reply={:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    def RemoveDevice(self, request: DeviceId, context : grpc.ServicerContext) -> Empty:
        """Remove a device; aborts with NOT_FOUND when it is missing."""
        LOGGER.info('[RemoveDevice] request={:s}'.format(grpc_message_to_json_string(request)))
        reply = self._del(request, 'device', request.device_uuid.uuid, 'device_id', TOPIC_DEVICE, context)
        LOGGER.info('[RemoveDevice] reply={:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    def GetDeviceEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[DeviceEvent]:
        """Yield a DeviceEvent for each message consumed from TOPIC_DEVICE."""
        LOGGER.info('[GetDeviceEvents] request={:s}'.format(grpc_message_to_json_string(request)))
        for message in self.msg_broker.consume({TOPIC_DEVICE}):
            yield DeviceEvent(**json.loads(message.content))

    def SelectDevice(self, request : DeviceFilter, context : grpc.ServicerContext) -> DeviceList:
        """Return copies of the devices named in the filter, optionally trimmed.

        Only devices listed in `request.device_ids` are considered. Endpoints,
        config rules and components are stripped from each returned copy
        unless the matching `include_*` flag is set. The stored messages are
        never modified.
        """
        LOGGER.info('[SelectDevice] request={:s}'.format(grpc_message_to_json_string(request)))
        container_entry_uuids : Dict[str, Set[str]] = {}
        # Collect the requested UUIDs; same pattern SelectService uses for request.service_ids.service_ids.
        for device_id in request.device_ids.device_ids:
            container_name = 'device'
            device_uuid = device_id.device_uuid.uuid
            container_entry_uuids.setdefault(container_name, set()).add(device_uuid)

        exclude_endpoints = not request.include_endpoints
        exclude_config_rules = not request.include_config_rules
        exclude_components = not request.include_components

        devices = list()
        for container_name in sorted(container_entry_uuids.keys()):
            entry_uuids = container_entry_uuids[container_name]
            for device in select_entries(self.database, container_name, entry_uuids):
                # Work on a copy so trimming the reply does not alter the database.
                reply_device = Device()
                reply_device.CopyFrom(device)
                if exclude_endpoints: del reply_device.device_endpoints [:] # pylint: disable=no-member
                if exclude_config_rules: del reply_device.device_config.config_rules[:] # pylint: disable=no-member
                # NOTE(review): field name `component` kept as found; confirm it matches the Device message definition.
                if exclude_components: del reply_device.component[:] # pylint: disable=no-member
                devices.append(reply_device)

        reply = DeviceList(devices=devices)
        LOGGER.info('[SelectDevice] reply={:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    # ----- Link -------------------------------------------------------------------------------------------------------

    def ListLinkIds(self, request: Empty, context : grpc.ServicerContext) -> LinkIdList:
        """Return the ids of all stored links, ordered by link UUID."""
        LOGGER.info('[ListLinkIds] request={:s}'.format(grpc_message_to_json_string(request)))
        reply = LinkIdList(link_ids=[link.link_id for link in get_entries(self.database, 'link')])
        LOGGER.info('[ListLinkIds] reply={:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    def ListLinks(self, request: Empty, context : grpc.ServicerContext) -> LinkList:
        """Return all stored links, ordered by link UUID."""
        LOGGER.info('[ListLinks] request={:s}'.format(grpc_message_to_json_string(request)))
        reply = LinkList(links=get_entries(self.database, 'link'))
        LOGGER.info('[ListLinks] reply={:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    def GetLink(self, request: LinkId, context : grpc.ServicerContext) -> Link:
        """Return the stored link; aborts with NOT_FOUND when it is missing."""
        LOGGER.info('[GetLink] request={:s}'.format(grpc_message_to_json_string(request)))
        reply = get_entry(context, self.database, 'link', request.link_uuid.uuid)
        LOGGER.info('[GetLink] reply={:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    def SetLink(self, request: Link, context : grpc.ServicerContext) -> LinkId:
        """Create or update a link and return its id."""
        LOGGER.info('[SetLink] request={:s}'.format(grpc_message_to_json_string(request)))
        reply = self._set(request, 'link', request.link_id.link_uuid.uuid, 'link_id', TOPIC_LINK)
        LOGGER.info('[SetLink] reply={:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    def RemoveLink(self, request: LinkId, context : grpc.ServicerContext) -> Empty:
        """Remove a link; aborts with NOT_FOUND when it is missing."""
        LOGGER.info('[RemoveLink] request={:s}'.format(grpc_message_to_json_string(request)))
        reply = self._del(request, 'link', request.link_uuid.uuid, 'link_id', TOPIC_LINK, context)
        LOGGER.info('[RemoveLink] reply={:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    def GetLinkEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[LinkEvent]:
        """Yield a LinkEvent for each message consumed from TOPIC_LINK."""
        LOGGER.info('[GetLinkEvents] request={:s}'.format(grpc_message_to_json_string(request)))
        for message in self.msg_broker.consume({TOPIC_LINK}):
            yield LinkEvent(**json.loads(message.content))

    # ----- Slice ------------------------------------------------------------------------------------------------------

    def ListSliceIds(self, request: ContextId, context : grpc.ServicerContext) -> SliceIdList:
        """Return the ids of the slices stored for the given context."""
        LOGGER.info('[ListSliceIds] request={:s}'.format(grpc_message_to_json_string(request)))
        slices = get_entries(self.database, 'slice[{:s}]'.format(str(request.context_uuid.uuid)))
        reply = SliceIdList(slice_ids=[slice_.slice_id for slice_ in slices])
        LOGGER.info('[ListSliceIds] reply={:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    def ListSlices(self, request: ContextId, context : grpc.ServicerContext) -> SliceList:
        """Return the slices stored for the given context."""
        LOGGER.info('[ListSlices] request={:s}'.format(grpc_message_to_json_string(request)))
        slices = get_entries(self.database, 'slice[{:s}]'.format(str(request.context_uuid.uuid)))
        reply = SliceList(slices=slices)
        LOGGER.info('[ListSlices] reply={:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    def GetSlice(self, request: SliceId, context : grpc.ServicerContext) -> Slice:
        """Return the stored slice; aborts with NOT_FOUND when it is missing."""
        LOGGER.info('[GetSlice] request={:s}'.format(grpc_message_to_json_string(request)))
        container_name = 'slice[{:s}]'.format(str(request.context_id.context_uuid.uuid))
        reply = get_entry(context, self.database, container_name, request.slice_uuid.uuid)
        LOGGER.info('[GetSlice] reply={:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    def SetSlice(self, request: Slice, context : grpc.ServicerContext) -> SliceId:
        """Create or update a slice in its context's container and return its id."""
        LOGGER.info('[SetSlice] request={:s}'.format(grpc_message_to_json_string(request)))
        container_name = 'slice[{:s}]'.format(str(request.slice_id.context_id.context_uuid.uuid))
        slice_uuid = request.slice_id.slice_uuid.uuid
        reply = self._set(request, container_name, slice_uuid, 'slice_id', TOPIC_SLICE)
        LOGGER.info('[SetSlice] reply={:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    def RemoveSlice(self, request: SliceId, context : grpc.ServicerContext) -> Empty:
        """Remove a slice; aborts with NOT_FOUND when it is missing."""
        LOGGER.info('[RemoveSlice] request={:s}'.format(grpc_message_to_json_string(request)))
        container_name = 'slice[{:s}]'.format(str(request.context_id.context_uuid.uuid))
        slice_uuid = request.slice_uuid.uuid
        reply = self._del(request, container_name, slice_uuid, 'slice_id', TOPIC_SLICE, context)
        LOGGER.info('[RemoveSlice] reply={:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    def GetSliceEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[SliceEvent]:
        """Yield a SliceEvent for each message consumed from TOPIC_SLICE."""
        LOGGER.info('[GetSliceEvents] request={:s}'.format(grpc_message_to_json_string(request)))
        for message in self.msg_broker.consume({TOPIC_SLICE}):
            yield SliceEvent(**json.loads(message.content))

    def SelectSlice(self, request : SliceFilter, context : grpc.ServicerContext) -> SliceList:
        """Return copies of the slices named in the filter, optionally trimmed.

        Only slices listed in `request.slice_ids` are considered; they are
        grouped by the per-context container they live in. Endpoint ids,
        constraints, service ids, subslice ids and config rules are stripped
        from each returned copy unless the matching `include_*` flag is set.
        The stored messages are never modified.
        """
        LOGGER.info('[SelectSlice] request={:s}'.format(grpc_message_to_json_string(request)))
        container_entry_uuids : Dict[str, Set[str]] = {}
        # Collect the requested UUIDs; same pattern SelectService uses for request.service_ids.service_ids.
        for slice_id in request.slice_ids.slice_ids:
            container_name = 'slice[{:s}]'.format(str(slice_id.context_id.context_uuid.uuid))
            slice_uuid = slice_id.slice_uuid.uuid
            container_entry_uuids.setdefault(container_name, set()).add(slice_uuid)

        exclude_endpoint_ids = not request.include_endpoint_ids
        exclude_constraints = not request.include_constraints
        exclude_service_ids = not request.include_service_ids
        exclude_subslice_ids = not request.include_subslice_ids
        exclude_config_rules = not request.include_config_rules

        slices = list()
        for container_name in sorted(container_entry_uuids.keys()):
            entry_uuids = container_entry_uuids[container_name]
            for eslice in select_entries(self.database, container_name, entry_uuids):
                # Work on a copy so trimming the reply does not alter the database.
                reply_slice = Slice()
                reply_slice.CopyFrom(eslice)
                # NOTE(review): these two previously addressed `service_endpoint_ids` / `service_constraints`,
                # which look copied from SelectService; the `slice_` prefix follows the sibling Slice fields
                # used below. Confirm against the Slice message definition.
                if exclude_endpoint_ids: del reply_slice.slice_endpoint_ids[:] # pylint: disable=no-member
                if exclude_constraints : del reply_slice.slice_constraints[:] # pylint: disable=no-member
                if exclude_service_ids : del reply_slice.slice_service_ids[:] # pylint: disable=no-member
                if exclude_subslice_ids : del reply_slice.slice_subslice_ids[:] # pylint: disable=no-member
                if exclude_config_rules: del reply_slice.slice_config .config_rules[:] # pylint: disable=no-member
                slices.append(reply_slice)

        reply = SliceList(slices=slices)
        LOGGER.info('[SelectSlice] reply={:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    # ----- Service ----------------------------------------------------------------------------------------------------

    def ListServiceIds(self, request: ContextId, context : grpc.ServicerContext) -> ServiceIdList:
        """Return the ids of the services stored for the given context."""
        LOGGER.info('[ListServiceIds] request={:s}'.format(grpc_message_to_json_string(request)))
        services = get_entries(self.database, 'service[{:s}]'.format(str(request.context_uuid.uuid)))
        reply = ServiceIdList(service_ids=[service.service_id for service in services])
        LOGGER.info('[ListServiceIds] reply={:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    def ListServices(self, request: ContextId, context : grpc.ServicerContext) -> ServiceList:
        """Return the services stored for the given context."""
        LOGGER.info('[ListServices] request={:s}'.format(grpc_message_to_json_string(request)))
        services = get_entries(self.database, 'service[{:s}]'.format(str(request.context_uuid.uuid)))
        reply = ServiceList(services=services)
        LOGGER.info('[ListServices] reply={:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    def GetService(self, request: ServiceId, context : grpc.ServicerContext) -> Service:
        """Return the stored service; aborts with NOT_FOUND when it is missing."""
        LOGGER.info('[GetService] request={:s}'.format(grpc_message_to_json_string(request)))
        container_name = 'service[{:s}]'.format(str(request.context_id.context_uuid.uuid))
        reply = get_entry(context, self.database, container_name, request.service_uuid.uuid)
        LOGGER.info('[GetService] reply={:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    def SetService(self, request: Service, context : grpc.ServicerContext) -> ServiceId:
        """Create or update a service in its context's container and return its id."""
        LOGGER.info('[SetService] request={:s}'.format(grpc_message_to_json_string(request)))
        container_name = 'service[{:s}]'.format(str(request.service_id.context_id.context_uuid.uuid))
        service_uuid = request.service_id.service_uuid.uuid
        reply = self._set(request, container_name, service_uuid, 'service_id', TOPIC_SERVICE)
        LOGGER.info('[SetService] reply={:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    def RemoveService(self, request: ServiceId, context : grpc.ServicerContext) -> Empty:
        """Remove a service; aborts with NOT_FOUND when it is missing."""
        LOGGER.info('[RemoveService] request={:s}'.format(grpc_message_to_json_string(request)))
        container_name = 'service[{:s}]'.format(str(request.context_id.context_uuid.uuid))
        service_uuid = request.service_uuid.uuid
        reply = self._del(request, container_name, service_uuid, 'service_id', TOPIC_SERVICE, context)
        LOGGER.info('[RemoveService] reply={:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    def GetServiceEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[ServiceEvent]:
        """Yield a ServiceEvent for each message consumed from TOPIC_SERVICE."""
        LOGGER.info('[GetServiceEvents] request={:s}'.format(grpc_message_to_json_string(request)))
        for message in self.msg_broker.consume({TOPIC_SERVICE}):
            yield ServiceEvent(**json.loads(message.content))

    def SelectService(self, request : ServiceFilter, context : grpc.ServicerContext) -> ServiceList:
        """Return copies of the services named in the filter, optionally trimmed.

        Only services listed in `request.service_ids` are considered; they are
        grouped by the per-context container they live in. Endpoint ids,
        constraints and config rules are stripped from each returned copy
        unless the matching `include_*` flag is set. The stored messages are
        never modified.
        """
        LOGGER.info('[SelectService] request={:s}'.format(grpc_message_to_json_string(request)))
        container_entry_uuids : Dict[str, Set[str]] = {}
        for service_id in request.service_ids.service_ids:
            container_name = 'service[{:s}]'.format(str(service_id.context_id.context_uuid.uuid))
            service_uuid = service_id.service_uuid.uuid
            container_entry_uuids.setdefault(container_name, set()).add(service_uuid)

        exclude_endpoint_ids = not request.include_endpoint_ids
        exclude_constraints = not request.include_constraints
        exclude_config_rules = not request.include_config_rules

        services = list()
        for container_name in sorted(container_entry_uuids.keys()):
            entry_uuids = container_entry_uuids[container_name]
            for service in select_entries(self.database, container_name, entry_uuids):
                # Work on a copy so trimming the reply does not alter the database.
                reply_service = Service()
                reply_service.CopyFrom(service)
                if exclude_endpoint_ids: del reply_service.service_endpoint_ids[:] # pylint: disable=no-member
                if exclude_constraints : del reply_service.service_constraints[:] # pylint: disable=no-member
                if exclude_config_rules: del reply_service.service_config.config_rules[:] # pylint: disable=no-member
                services.append(reply_service)

        reply = ServiceList(services=services)
        LOGGER.info('[SelectService] reply={:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    # ----- Connection -------------------------------------------------------------------------------------------------

    def ListConnectionIds(self, request: ServiceId, context : grpc.ServicerContext) -> ConnectionIdList:
        """Return the ids of the connections indexed under the given service."""
        LOGGER.info('[ListConnectionIds] request={:s}'.format(grpc_message_to_json_string(request)))
        container_name = self._service_connections_container(
            request.context_id.context_uuid.uuid, request.service_uuid.uuid)
        reply = ConnectionIdList(connection_ids=[c.connection_id for c in get_entries(self.database, container_name)])
        LOGGER.info('[ListConnectionIds] reply={:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    def ListConnections(self, request: ServiceId, context : grpc.ServicerContext) -> ConnectionList:
        """Return the connections indexed under the given service."""
        LOGGER.info('[ListConnections] request={:s}'.format(grpc_message_to_json_string(request)))
        container_name = self._service_connections_container(
            request.context_id.context_uuid.uuid, request.service_uuid.uuid)
        reply = ConnectionList(connections=get_entries(self.database, container_name))
        LOGGER.info('[ListConnections] reply={:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    def GetConnection(self, request: ConnectionId, context : grpc.ServicerContext) -> Connection:
        """Return the stored connection; aborts with NOT_FOUND when it is missing."""
        LOGGER.info('[GetConnection] request={:s}'.format(grpc_message_to_json_string(request)))
        reply = get_entry(context, self.database, 'connection', request.connection_uuid.uuid)
        LOGGER.info('[GetConnection] reply={:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    def SetConnection(self, request: Connection, context : grpc.ServicerContext) -> ConnectionId:
        """Create or update a connection and return its id.

        The connection is written twice: into the per-service index container
        (read by ListConnectionIds/ListConnections) and into the global
        'connection' container (read by GetConnection). Only the second write
        publishes an event.
        """
        LOGGER.info('[SetConnection] request={:s}'.format(grpc_message_to_json_string(request)))
        container_name = self._service_connections_container(
            request.service_id.context_id.context_uuid.uuid, request.service_id.service_uuid.uuid)
        connection_uuid = request.connection_id.connection_uuid.uuid
        set_entry(self.database, container_name, connection_uuid, request)
        reply = self._set(request, 'connection', connection_uuid, 'connection_id', TOPIC_CONNECTION)
        LOGGER.info('[SetConnection] reply={:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    def RemoveConnection(self, request: ConnectionId, context : grpc.ServicerContext) -> Empty:
        """Remove a connection from both the per-service index and the global container.

        The stored connection is looked up first (aborting with NOT_FOUND when
        missing) because the request only carries the connection id, while the
        per-service container name depends on the connection's service id.
        """
        LOGGER.info('[RemoveConnection] request={:s}'.format(grpc_message_to_json_string(request)))
        connection = get_entry(context, self.database, 'connection', request.connection_uuid.uuid)
        container_name = self._service_connections_container(
            connection.service_id.context_id.context_uuid.uuid, connection.service_id.service_uuid.uuid)
        connection_uuid = request.connection_uuid.uuid
        del_entry(context, self.database, container_name, connection_uuid)
        reply = self._del(request, 'connection', connection_uuid, 'connection_id', TOPIC_CONNECTION, context)
        LOGGER.info('[RemoveConnection] reply={:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    def GetConnectionEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[ConnectionEvent]:
        """Yield a ConnectionEvent for each message consumed from TOPIC_CONNECTION."""
        LOGGER.info('[GetConnectionEvents] request={:s}'.format(grpc_message_to_json_string(request)))
        for message in self.msg_broker.consume({TOPIC_CONNECTION}):
            yield ConnectionEvent(**json.loads(message.content))