# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import grpc, json, logging from typing import Any, Dict, Iterator, List from common.proto.context_pb2 import ( Connection, ConnectionEvent, ConnectionId, ConnectionIdList, ConnectionList, Context, ContextEvent, ContextId, ContextIdList, ContextList, Device, DeviceEvent, DeviceId, DeviceIdList, DeviceList, Empty, EventTypeEnum, Link, LinkEvent, LinkId, LinkIdList, LinkList, Service, ServiceEvent, ServiceId, ServiceIdList, ServiceList, Slice, SliceEvent, SliceId, SliceIdList, SliceList, Topology, TopologyEvent, TopologyId, TopologyIdList, TopologyList) from common.proto.context_pb2_grpc import ContextServiceServicer from common.tests.MockMessageBroker import MockMessageBroker, notify_event from common.tools.grpc.Tools import grpc_message_to_json, grpc_message_to_json_string LOGGER = logging.getLogger(__name__) TOPIC_CONNECTION = 'connection' TOPIC_CONTEXT = 'context' TOPIC_TOPOLOGY = 'topology' TOPIC_DEVICE = 'device' TOPIC_LINK = 'link' TOPIC_SERVICE = 'service' TOPIC_SLICE = 'slice' def get_container(database : Dict[str, Dict[str, Any]], container_name : str) -> Dict[str, Any]: return database.setdefault(container_name, {}) def get_entries(database : Dict[str, Dict[str, Any]], container_name : str) -> List[Any]: container = get_container(database, container_name) return [container[entry_uuid] for entry_uuid in sorted(container.keys())] def has_entry(database : Dict[str, Dict[str, Any]], container_name : str, entry_uuid : str) -> Any: LOGGER.debug('[has_entry] BEFORE database={:s}'.format(str(database))) container = get_container(database, container_name) return entry_uuid in container def get_entry( context : grpc.ServicerContext, database : Dict[str, Dict[str, Any]], container_name : str, entry_uuid : str ) -> Any: LOGGER.debug('[get_entry] BEFORE database={:s}'.format(str(database))) container = get_container(database, container_name) if entry_uuid not in container: context.abort(grpc.StatusCode.NOT_FOUND, str('{:s}({:s}) not found'.format(container_name, entry_uuid))) return container[entry_uuid] def set_entry(database : Dict[str, Dict[str, Any]], container_name : str, entry_uuid : str, entry : Any) -> Any: container = get_container(database, container_name) LOGGER.debug('[set_entry] BEFORE database={:s}'.format(str(database))) container[entry_uuid] = entry LOGGER.debug('[set_entry] AFTER database={:s}'.format(str(database))) return entry def del_entry( context : grpc.ServicerContext, database : Dict[str, Dict[str, Any]], container_name : str, entry_uuid : str ) -> Any: container = get_container(database, container_name) if entry_uuid not in container: context.abort(grpc.StatusCode.NOT_FOUND, str('{:s}({:s}) not found'.format(container_name, entry_uuid))) del container[entry_uuid] return Empty() class MockServicerImpl_Context(ContextServiceServicer): def __init__(self): LOGGER.info('[__init__] Creating Servicer...') self.database : Dict[str, Any] = {} self.msg_broker = MockMessageBroker() LOGGER.info('[__init__] Servicer Created') # ----- Common ----------------------------------------------------------------------------------------------------- def _set(self, request, container_name, entry_uuid, entry_id_field_name, topic_name): exists = has_entry(self.database, container_name, entry_uuid) entry = set_entry(self.database, container_name, entry_uuid, request) event_type = EventTypeEnum.EVENTTYPE_UPDATE if exists else EventTypeEnum.EVENTTYPE_CREATE entry_id = getattr(entry, entry_id_field_name) dict_entry_id = grpc_message_to_json(entry_id) notify_event(self.msg_broker, topic_name, event_type, {entry_id_field_name: dict_entry_id}) return entry_id def _del(self, request, container_name, entry_uuid, entry_id_field_name, topic_name, grpc_context): empty = del_entry(grpc_context, self.database, container_name, entry_uuid) event_type = EventTypeEnum.EVENTTYPE_REMOVE dict_entry_id = grpc_message_to_json(request) notify_event(self.msg_broker, topic_name, event_type, {entry_id_field_name: dict_entry_id}) return empty # ----- Context ---------------------------------------------------------------------------------------------------- def ListContextIds(self, request: Empty, context : grpc.ServicerContext) -> ContextIdList: LOGGER.info('[ListContextIds] request={:s}'.format(grpc_message_to_json_string(request))) return ContextIdList(context_ids=[context.context_id for context in get_entries(self.database, 'context')]) def ListContexts(self, request: Empty, context : grpc.ServicerContext) -> ContextList: LOGGER.info('[ListContexts] request={:s}'.format(grpc_message_to_json_string(request))) return ContextList(contexts=get_entries(self.database, 'context')) def GetContext(self, request: ContextId, context : grpc.ServicerContext) -> Context: LOGGER.info('[GetContext] request={:s}'.format(grpc_message_to_json_string(request))) return get_entry(context, self.database, 'context', request.context_uuid.uuid) def SetContext(self, request: Context, context : grpc.ServicerContext) -> ContextId: LOGGER.info('[SetContext] request={:s}'.format(grpc_message_to_json_string(request))) return self._set(request, 'context', request.context_id.context_uuid.uuid, 'context_id', TOPIC_CONTEXT) def RemoveContext(self, request: ContextId, context : grpc.ServicerContext) -> Empty: LOGGER.info('[RemoveContext] request={:s}'.format(grpc_message_to_json_string(request))) return self._del(request, 'context', request.context_uuid.uuid, 'context_id', TOPIC_CONTEXT, context) def GetContextEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[ContextEvent]: LOGGER.info('[GetContextEvents] request={:s}'.format(grpc_message_to_json_string(request))) for message in self.msg_broker.consume({TOPIC_CONTEXT}): yield ContextEvent(**json.loads(message.content)) # ----- Topology --------------------------------------------------------------------------------------------------- def ListTopologyIds(self, request: ContextId, context : grpc.ServicerContext) -> TopologyIdList: LOGGER.info('[ListTopologyIds] request={:s}'.format(grpc_message_to_json_string(request))) topologies = get_entries(self.database, 'topology[{:s}]'.format(str(request.context_uuid.uuid))) return TopologyIdList(topology_ids=[topology.topology_id for topology in topologies]) def ListTopologies(self, request: ContextId, context : grpc.ServicerContext) -> TopologyList: LOGGER.info('[ListTopologies] request={:s}'.format(grpc_message_to_json_string(request))) topologies = get_entries(self.database, 'topology[{:s}]'.format(str(request.context_uuid.uuid))) return TopologyList(topologies=[topology for topology in topologies]) def GetTopology(self, request: TopologyId, context : grpc.ServicerContext) -> Topology: LOGGER.info('[GetTopology] request={:s}'.format(grpc_message_to_json_string(request))) container_name = 'topology[{:s}]'.format(str(request.context_id.context_uuid.uuid)) return get_entry(context, self.database, container_name, request.topology_uuid.uuid) def SetTopology(self, request: Topology, context : grpc.ServicerContext) -> TopologyId: LOGGER.info('[SetTopology] request={:s}'.format(grpc_message_to_json_string(request))) container_name = 'topology[{:s}]'.format(str(request.topology_id.context_id.context_uuid.uuid)) topology_uuid = request.topology_id.topology_uuid.uuid return self._set(request, container_name, topology_uuid, 'topology_id', TOPIC_TOPOLOGY) def RemoveTopology(self, request: TopologyId, context : grpc.ServicerContext) -> Empty: LOGGER.info('[RemoveTopology] request={:s}'.format(grpc_message_to_json_string(request))) container_name = 'topology[{:s}]'.format(str(request.context_id.context_uuid.uuid)) topology_uuid = request.topology_uuid.uuid return self._del(request, container_name, topology_uuid, 'topology_id', TOPIC_TOPOLOGY, context) def GetTopologyEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[TopologyEvent]: LOGGER.info('[GetTopologyEvents] request={:s}'.format(grpc_message_to_json_string(request))) for message in self.msg_broker.consume({TOPIC_TOPOLOGY}): yield TopologyEvent(**json.loads(message.content)) # ----- Device ----------------------------------------------------------------------------------------------------- def ListDeviceIds(self, request: Empty, context : grpc.ServicerContext) -> DeviceIdList: LOGGER.info('[ListDeviceIds] request={:s}'.format(grpc_message_to_json_string(request))) return DeviceIdList(device_ids=[device.device_id for device in get_entries(self.database, 'device')]) def ListDevices(self, request: Empty, context : grpc.ServicerContext) -> DeviceList: LOGGER.info('[ListDevices] request={:s}'.format(grpc_message_to_json_string(request))) return DeviceList(devices=get_entries(self.database, 'device')) def GetDevice(self, request: DeviceId, context : grpc.ServicerContext) -> Device: LOGGER.info('[GetDevice] request={:s}'.format(grpc_message_to_json_string(request))) return get_entry(context, self.database, 'device', request.device_uuid.uuid) def SetDevice(self, request: Context, context : grpc.ServicerContext) -> DeviceId: LOGGER.info('[SetDevice] request={:s}'.format(grpc_message_to_json_string(request))) return self._set(request, 'device', request.device_id.device_uuid.uuid, 'device_id', TOPIC_DEVICE) def RemoveDevice(self, request: DeviceId, context : grpc.ServicerContext) -> Empty: LOGGER.info('[RemoveDevice] request={:s}'.format(grpc_message_to_json_string(request))) return self._del(request, 'device', request.device_uuid.uuid, 'device_id', TOPIC_DEVICE, context) def GetDeviceEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[DeviceEvent]: LOGGER.info('[GetDeviceEvents] request={:s}'.format(grpc_message_to_json_string(request))) for message in self.msg_broker.consume({TOPIC_DEVICE}): yield DeviceEvent(**json.loads(message.content)) # ----- Link ------------------------------------------------------------------------------------------------------- def ListLinkIds(self, request: Empty, context : grpc.ServicerContext) -> LinkIdList: LOGGER.info('[ListLinkIds] request={:s}'.format(grpc_message_to_json_string(request))) return LinkIdList(link_ids=[link.link_id for link in get_entries(self.database, 'link')]) def ListLinks(self, request: Empty, context : grpc.ServicerContext) -> LinkList: LOGGER.info('[ListLinks] request={:s}'.format(grpc_message_to_json_string(request))) return LinkList(links=get_entries(self.database, 'link')) def GetLink(self, request: LinkId, context : grpc.ServicerContext) -> Link: LOGGER.info('[GetLink] request={:s}'.format(grpc_message_to_json_string(request))) return get_entry(context, self.database, 'link', request.link_uuid.uuid) def SetLink(self, request: Context, context : grpc.ServicerContext) -> LinkId: LOGGER.info('[SetLink] request={:s}'.format(grpc_message_to_json_string(request))) return self._set(request, 'link', request.link_id.link_uuid.uuid, 'link_id', TOPIC_LINK) def RemoveLink(self, request: LinkId, context : grpc.ServicerContext) -> Empty: LOGGER.info('[RemoveLink] request={:s}'.format(grpc_message_to_json_string(request))) return self._del(request, 'link', request.link_uuid.uuid, 'link_id', TOPIC_LINK, context) def GetLinkEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[LinkEvent]: LOGGER.info('[GetLinkEvents] request={:s}'.format(grpc_message_to_json_string(request))) for message in self.msg_broker.consume({TOPIC_LINK}): yield LinkEvent(**json.loads(message.content)) # ----- Slice ------------------------------------------------------------------------------------------------------ def ListSliceIds(self, request: ContextId, context : grpc.ServicerContext) -> SliceIdList: LOGGER.info('[ListSliceIds] request={:s}'.format(grpc_message_to_json_string(request))) slices = get_entries(self.database, 'slice[{:s}]'.format(str(request.context_uuid.uuid))) return SliceIdList(slice_ids=[slice.slice_id for slice in slices]) def ListSlices(self, request: ContextId, context : grpc.ServicerContext) -> SliceList: LOGGER.info('[ListSlices] request={:s}'.format(grpc_message_to_json_string(request))) slices = get_entries(self.database, 'slice[{:s}]'.format(str(request.context_uuid.uuid))) return SliceList(slices=[slice for slice in slices]) def GetSlice(self, request: SliceId, context : grpc.ServicerContext) -> Slice: LOGGER.info('[GetSlice] request={:s}'.format(grpc_message_to_json_string(request))) container_name = 'slice[{:s}]'.format(str(request.context_id.context_uuid.uuid)) return get_entry(context, self.database, container_name, request.slice_uuid.uuid) def SetSlice(self, request: Slice, context : grpc.ServicerContext) -> SliceId: LOGGER.info('[SetSlice] request={:s}'.format(grpc_message_to_json_string(request))) container_name = 'slice[{:s}]'.format(str(request.slice_id.context_id.context_uuid.uuid)) slice_uuid = request.slice_id.slice_uuid.uuid return self._set(request, container_name, slice_uuid, 'slice_id', TOPIC_SLICE) def RemoveSlice(self, request: SliceId, context : grpc.ServicerContext) -> Empty: LOGGER.info('[RemoveSlice] request={:s}'.format(grpc_message_to_json_string(request))) container_name = 'slice[{:s}]'.format(str(request.context_id.context_uuid.uuid)) slice_uuid = request.slice_uuid.uuid return self._del(request, container_name, slice_uuid, 'slice_id', TOPIC_SLICE, context) def GetSliceEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[SliceEvent]: LOGGER.info('[GetSliceEvents] request={:s}'.format(grpc_message_to_json_string(request))) for message in self.msg_broker.consume({TOPIC_SLICE}): yield SliceEvent(**json.loads(message.content)) # ----- Service ---------------------------------------------------------------------------------------------------- def ListServiceIds(self, request: ContextId, context : grpc.ServicerContext) -> ServiceIdList: LOGGER.info('[ListServiceIds] request={:s}'.format(grpc_message_to_json_string(request))) services = get_entries(self.database, 'service[{:s}]'.format(str(request.context_uuid.uuid))) return ServiceIdList(service_ids=[service.service_id for service in services]) def ListServices(self, request: ContextId, context : grpc.ServicerContext) -> ServiceList: LOGGER.info('[ListServices] request={:s}'.format(grpc_message_to_json_string(request))) services = get_entries(self.database, 'service[{:s}]'.format(str(request.context_uuid.uuid))) return ServiceList(services=[service for service in services]) def GetService(self, request: ServiceId, context : grpc.ServicerContext) -> Service: LOGGER.info('[GetService] request={:s}'.format(grpc_message_to_json_string(request))) container_name = 'service[{:s}]'.format(str(request.context_id.context_uuid.uuid)) return get_entry(context, self.database, container_name, request.service_uuid.uuid) def SetService(self, request: Service, context : grpc.ServicerContext) -> ServiceId: LOGGER.info('[SetService] request={:s}'.format(grpc_message_to_json_string(request))) container_name = 'service[{:s}]'.format(str(request.service_id.context_id.context_uuid.uuid)) service_uuid = request.service_id.service_uuid.uuid return self._set(request, container_name, service_uuid, 'service_id', TOPIC_SERVICE) def RemoveService(self, request: ServiceId, context : grpc.ServicerContext) -> Empty: LOGGER.info('[RemoveService] request={:s}'.format(grpc_message_to_json_string(request))) container_name = 'service[{:s}]'.format(str(request.context_id.context_uuid.uuid)) service_uuid = request.service_uuid.uuid return self._del(request, container_name, service_uuid, 'service_id', TOPIC_SERVICE, context) def GetServiceEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[ServiceEvent]: LOGGER.info('[GetServiceEvents] request={:s}'.format(grpc_message_to_json_string(request))) for message in self.msg_broker.consume({TOPIC_SERVICE}): yield ServiceEvent(**json.loads(message.content)) # ----- Connection ------------------------------------------------------------------------------------------------- def ListConnectionIds(self, request: ServiceId, context : grpc.ServicerContext) -> ConnectionIdList: LOGGER.info('[ListConnectionIds] request={:s}'.format(grpc_message_to_json_string(request))) container_name = 'service_connections[{:s}/{:s}]'.format( str(request.context_id.context_uuid.uuid), str(request.service_uuid.uuid)) return ConnectionIdList(connection_ids=[c.connection_id for c in get_entries(self.database, container_name)]) def ListConnections(self, request: ServiceId, context : grpc.ServicerContext) -> ConnectionList: LOGGER.info('[ListConnections] request={:s}'.format(grpc_message_to_json_string(request))) container_name = 'service_connections[{:s}/{:s}]'.format( str(request.context_id.context_uuid.uuid), str(request.service_uuid.uuid)) return ConnectionList(connections=get_entries(self.database, container_name)) def GetConnection(self, request: ConnectionId, context : grpc.ServicerContext) -> Connection: LOGGER.info('[GetConnection] request={:s}'.format(grpc_message_to_json_string(request))) return get_entry(context, self.database, 'connection', request.connection_uuid.uuid) def SetConnection(self, request: Connection, context : grpc.ServicerContext) -> ConnectionId: LOGGER.info('[SetConnection] request={:s}'.format(grpc_message_to_json_string(request))) container_name = 'service_connection[{:s}/{:s}]'.format( str(request.service_id.context_id.context_uuid.uuid), str(request.service_id.service_uuid.uuid)) connection_uuid = request.connection_id.connection_uuid.uuid set_entry(self.database, container_name, connection_uuid, request) return self._set(request, 'connection', connection_uuid, 'connection_id', TOPIC_CONNECTION) def RemoveConnection(self, request: ConnectionId, context : grpc.ServicerContext) -> Empty: LOGGER.info('[RemoveConnection] request={:s}'.format(grpc_message_to_json_string(request))) connection = get_entry(context, self.database, 'connection', request.connection_uuid.uuid) container_name = 'service_connection[{:s}/{:s}]'.format( str(connection.service_id.context_id.context_uuid.uuid), str(connection.service_id.service_uuid.uuid)) connection_uuid = request.connection_uuid.uuid del_entry(context, self.database, container_name, connection_uuid) return self._del(request, 'connection', connection_uuid, 'connection_id', TOPIC_CONNECTION, context) def GetConnectionEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[ConnectionEvent]: LOGGER.info('[GetConnectionEvents] request={:s}'.format(grpc_message_to_json_string(request))) for message in self.msg_broker.consume({TOPIC_CONNECTION}): yield ConnectionEvent(**json.loads(message.content))