diff --git a/src/common/tests/MockServicerImpl_DltGateway.py b/src/common/tests/MockServicerImpl_DltGateway.py new file mode 100644 index 0000000000000000000000000000000000000000..16eae7a34686a90b1ac08010ce134c8f47d79146 --- /dev/null +++ b/src/common/tests/MockServicerImpl_DltGateway.py @@ -0,0 +1,165 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import grpc, itertools, json, logging, time +from typing import Dict, Iterator, Optional, Tuple +from common.tests.MockMessageBroker import Message, MockMessageBroker +from common.tools.grpc.Tools import grpc_message_to_json_string +from common.proto.context_pb2 import EVENTTYPE_CREATE, EVENTTYPE_REMOVE, EVENTTYPE_UPDATE, Empty, TeraFlowController +from common.proto.dlt_gateway_pb2 import ( + DLTRECORDOPERATION_ADD, DLTRECORDOPERATION_DELETE, DLTRECORDOPERATION_UNDEFINED, DLTRECORDOPERATION_UPDATE, + DLTRECORDSTATUS_FAILED, DLTRECORDSTATUS_SUCCEEDED, DLTRECORDTYPE_CONTEXT, DLTRECORDTYPE_DEVICE, DLTRECORDTYPE_LINK, + DLTRECORDTYPE_SERVICE, DLTRECORDTYPE_SLICE, DLTRECORDTYPE_TOPOLOGY, DLTRECORDTYPE_UNDEFINED, + DltPeerStatus, DltPeerStatusList, DltRecord, DltRecordEvent, DltRecordId, DltRecordOperationEnum, DltRecordStatus, + DltRecordSubscription, DltRecordTypeEnum) +from common.proto.dlt_gateway_pb2_grpc import DltGatewayServiceServicer + +LOGGER = logging.getLogger(__name__) + +DltRecordKey = Tuple[str, DltRecordOperationEnum, str] # domain_uuid, operation, record_uuid +DltRecordDict = Dict[DltRecordKey, DltRecord] # dlt_record_key => dlt_record + +class AlreadyExistsException(Exception): + pass + +class DoesNotExistException(Exception): + pass + +class MockServicerImpl_DltGateway(DltGatewayServiceServicer): + def __init__(self): + LOGGER.info('[__init__] Creating Servicer...') + self.records : DltRecordDict = {} + self.msg_broker = MockMessageBroker() + LOGGER.info('[__init__] Servicer Created') + + def __get_record(self, record_id : DltRecordId, should_exist : bool) -> Optional[Dict]: + domain_uuid, record_uuid = record_id.domain_uuid.uuid, record_id.record_uuid.uuid + str_type = DltRecordTypeEnum.Name(record_id.type).upper().replace('DLTRECORDTYPE_', '') + records_domain : Dict[str, Dict] = self.records.setdefault(domain_uuid, {}) + records_type : Dict[str, Dict] = records_domain.setdefault(str_type, {}) + record : Optional[Dict] = records_type.get(record_uuid) + if should_exist and record is None: + raise DoesNotExistException('RecordId({:s}, {:s}, {:s})'.format(domain_uuid, str_type, record_uuid)) + elif not should_exist and record is not None: + raise AlreadyExistsException('RecordId({:s}, {:s}, {:s})'.format(domain_uuid, str_type, record_uuid)) + return record + + def __set_record(self, record_id : DltRecordId, should_exist : bool, data_json : str) -> None: + domain_uuid, record_uuid = record_id.domain_uuid.uuid, record_id.record_uuid.uuid + str_type = DltRecordTypeEnum.Name(record_id.type).upper().replace('DLTRECORDTYPE_', '') + records_domain : Dict[str, Dict] = self.records.setdefault(domain_uuid, {}) + records_type : Dict[str, Dict] = records_domain.setdefault(str_type, {}) + record : Optional[Dict] = records_type.get(record_uuid) + if should_exist and record is None: + raise DoesNotExistException('RecordId({:s}, {:s}, {:s})'.format(domain_uuid, str_type, record_uuid)) + elif not should_exist and record is not None: + raise AlreadyExistsException('RecordId({:s}, {:s}, {:s})'.format(domain_uuid, str_type, record_uuid)) + records_type[record_uuid] = json.loads(data_json) + + def __del_record(self, record_id : DltRecordId) -> None: + domain_uuid, record_uuid = record_id.domain_uuid.uuid, record_id.record_uuid.uuid + str_type = DltRecordTypeEnum.Name(record_id.type).upper().replace('DLTRECORDTYPE_', '') + records_domain : Dict[str, Dict] = self.records.setdefault(domain_uuid, {}) + records_type : Dict[str, Dict] = records_domain.setdefault(str_type, {}) + record : Optional[Dict] = records_type.get(record_uuid) + if record is None: + raise DoesNotExistException('RecordId({:s}, {:s}, {:s})'.format(domain_uuid, str_type, record_uuid)) + records_type.discard(record_uuid) + + def __publish(self, operation : DltRecordOperationEnum, record_id : DltRecordId) -> None: + str_operation = DltRecordOperationEnum.Name(operation).upper().replace('DLTRECORDOPERATION_', '') + str_type = DltRecordTypeEnum.Name(record_id.type).upper().replace('DLTRECORDTYPE_', '') + topic = '{:s}:{:s}'.format(str_type, str_operation) + event = DltRecordEvent() + event.event.timestamp.timestamp = time.time() # pylint: disable=no-member + event.event.event_type = { # pylint: disable=no-member + DLTRECORDOPERATION_ADD : EVENTTYPE_CREATE, + DLTRECORDOPERATION_UPDATE: EVENTTYPE_UPDATE, + DLTRECORDOPERATION_DELETE: EVENTTYPE_REMOVE, + }.get(operation) + event.record_id.CopyFrom(record_id) # pylint: disable=no-member + self.msg_broker.publish(Message(topic=topic, content=grpc_message_to_json_string(event))) + + def RecordToDlt(self, request : DltRecord, context : grpc.ServicerContext) -> DltRecordStatus: + LOGGER.info('[RecordToDlt] request={:s}'.format(grpc_message_to_json_string(request))) + record_id = request.record_id + response = DltRecordStatus() + response.record_id.CopyFrom(record_id) # pylint: disable=no-member + try: + operation : DltRecordOperationEnum = request.operation + if operation == DLTRECORDOPERATION_ADD: + self.__set_record(record_id, False, request.data_json) + elif operation == DLTRECORDOPERATION_UPDATE: + self.__set_record(record_id, True, request.data_json) + elif operation == DLTRECORDOPERATION_DELETE: + self.__del_record(record_id) + else: + str_operation = DltRecordOperationEnum.Name(operation).upper().replace('DLTRECORDOPERATION_', '') + raise NotImplementedError('DltRecordOperationEnum({:s})'.format(str_operation)) + self.__publish(operation, record_id) + response.status = DLTRECORDSTATUS_SUCCEEDED + except Exception as e: # pylint: disable=broad-except + response.status = DLTRECORDSTATUS_FAILED + response.error_message = str(e) + LOGGER.info('[RecordToDlt] response={:s}'.format(grpc_message_to_json_string(response))) + return response + + def GetFromDlt(self, request : DltRecordId, context : grpc.ServicerContext) -> DltRecord: + LOGGER.info('[GetFromDlt] request={:s}'.format(grpc_message_to_json_string(request))) + record = self.__get_record(request, True) + response = DltRecord() + response.record_id.CopyFrom(request) # pylint: disable=no-member + response.operation = DLTRECORDOPERATION_UNDEFINED + response.data_json = json.dumps(record, sort_keys=True) + LOGGER.info('[GetFromDlt] response={:s}'.format(grpc_message_to_json_string(response))) + return response + + def SubscribeToDlt( + self, request: DltRecordSubscription, context : grpc.ServicerContext + ) -> Iterator[DltRecordEvent]: + LOGGER.info('[SubscribeToDlt] request={:s}'.format(grpc_message_to_json_string(request))) + types = request.type + if len(types) == 0: + types = [ + DLTRECORDTYPE_UNDEFINED, DLTRECORDTYPE_CONTEXT, DLTRECORDTYPE_TOPOLOGY, DLTRECORDTYPE_DEVICE, + DLTRECORDTYPE_LINK, DLTRECORDTYPE_SERVICE, DLTRECORDTYPE_SLICE + ] + str_types = [ + DltRecordTypeEnum.Name(_type).upper().replace('DLTRECORDTYPE_', '') + for _type in types + ] + operations = request.operation + if len(operations) == 0: + operations = [ + DLTRECORDOPERATION_UNDEFINED, DLTRECORDOPERATION_ADD, DLTRECORDOPERATION_UPDATE, + DLTRECORDOPERATION_DELETE + ] + str_operations = [ + DltRecordOperationEnum.Name(_operation).upper().replace('DLTRECORDOPERATION_', '') + for _operation in operations + ] + topics = { + '{:s}:{:s}'.format(*type_operation) + for type_operation in itertools.product(str_types, str_operations) + } + for message in self.msg_broker.consume(topics): + yield DltRecordEvent(**json.loads(message.content)) + + def GetDltStatus(self, request : TeraFlowController, context : grpc.ServicerContext) -> DltPeerStatus: + LOGGER.info('[GetDltStatus] request={:s}'.format(grpc_message_to_json_string(request))) + raise NotImplementedError() + + def GetDltPeers(self, request : Empty, context : grpc.ServicerContext) -> DltPeerStatusList: + LOGGER.info('[GetDltPeers] request={:s}'.format(grpc_message_to_json_string(request))) + raise NotImplementedError() diff --git a/src/common/tests/MockServicerImpl_Service copy.py b/src/common/tests/MockServicerImpl_Service copy.py deleted file mode 100644 index 3b5c769dd23553a29ba0f0b13925a442007e013c..0000000000000000000000000000000000000000 --- a/src/common/tests/MockServicerImpl_Service copy.py +++ /dev/null @@ -1,108 +0,0 @@ -# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from typing import Any, Dict, Iterator, NamedTuple, Tuple -import grpc, logging -from common.tests.MockMessageBroker import MockMessageBroker -from common.tools.grpc.Tools import grpc_message_to_json_string -from context.proto.context_pb2 import Empty, TeraFlowController -from dlt.connector.proto.dlt_pb2 import ( - DltPeerStatus, DltPeerStatusList, DltRecord, DltRecordEvent, DltRecordId, DltRecordOperationEnum, DltRecordStatus, DltRecordSubscription, DltRecordTypeEnum) -from dlt.connector.proto.dlt_pb2_grpc import DltServiceServicer - -LOGGER = logging.getLogger(__name__) - -DltRecordKey = Tuple[str, DltRecordOperationEnum, str] # domain_uuid, operation, record_uuid -DltRecordDict = Dict[DltRecordKey, DltRecord] # dlt_record_key => dlt_record - -class MockServicerImpl_Dlt(DltServiceServicer): - def __init__(self): - LOGGER.info('[__init__] Creating Servicer...') - self.records : DltRecordDict = {} - self.msg_broker = MockMessageBroker() - LOGGER.info('[__init__] Servicer Created') - - def RecordToDlt(self, request : DltRecord, context : grpc.ServicerContext) -> DltRecordStatus: - LOGGER.info('[RecordToDlt] request={:s}'.format(grpc_message_to_json_string(request))) - operation = request.operation - domain_uuid = request.record_id.domain_uuid - record_uuid = request.record_id.record_uuid - - #if operation == - - - def GetFromDlt(self, request : DltRecordId, context : grpc.ServicerContext) -> DltRecord: - LOGGER.info('[GetFromDlt] request={:s}'.format(grpc_message_to_json_string(request))) - - def SubscribeToDlt(self, request: DltRecordSubscription, context : grpc.ServicerContext) -> Iterator[DltRecordEvent]: - LOGGER.info('[SubscribeToDlt] request={:s}'.format(grpc_message_to_json_string(request))) - for message in self.msg_broker.consume({TOPIC_CONTEXT}): yield ContextEvent(**json.loads(message.content)) - - def GetDltStatus(self, request : TeraFlowController, context : grpc.ServicerContext) -> DltPeerStatus: - LOGGER.info('[GetDltStatus] request={:s}'.format(grpc_message_to_json_string(request))) - - def GetDltPeers(self, request : Empty, context : grpc.ServicerContext) -> DltPeerStatusList: - LOGGER.info('[GetDltPeers] request={:s}'.format(grpc_message_to_json_string(request))) - - - - - - - - LOGGER.info('[__init__] Servicer Created') - - # ----- Common ----------------------------------------------------------------------------------------------------- - - def _set(self, request, container_name, entry_uuid, entry_id_field_name, topic_name): - exists = has_entry(self.database, container_name, entry_uuid) - entry = set_entry(self.database, container_name, entry_uuid, request) - event_type = EventTypeEnum.EVENTTYPE_UPDATE if exists else EventTypeEnum.EVENTTYPE_CREATE - entry_id = getattr(entry, entry_id_field_name) - dict_entry_id = grpc_message_to_json(entry_id) - notify_event(self.msg_broker, topic_name, event_type, {entry_id_field_name: dict_entry_id}) - return entry_id - - def _del(self, request, container_name, entry_uuid, entry_id_field_name, topic_name, grpc_context): - empty = del_entry(grpc_context, self.database, container_name, entry_uuid) - event_type = EventTypeEnum.EVENTTYPE_REMOVE - dict_entry_id = grpc_message_to_json(request) - notify_event(self.msg_broker, topic_name, event_type, {entry_id_field_name: dict_entry_id}) - return empty - - # ----- Context ---------------------------------------------------------------------------------------------------- - - def ListContextIds(self, request: Empty, context : grpc.ServicerContext) -> ContextIdList: - LOGGER.info('[ListContextIds] request={:s}'.format(grpc_message_to_json_string(request))) - return ContextIdList(context_ids=[context.context_id for context in get_entries(self.database, 'context')]) - - def ListContexts(self, request: Empty, context : grpc.ServicerContext) -> ContextList: - LOGGER.info('[ListContexts] request={:s}'.format(grpc_message_to_json_string(request))) - return ContextList(contexts=get_entries(self.database, 'context')) - - def GetContext(self, request: ContextId, context : grpc.ServicerContext) -> Context: - LOGGER.info('[GetContext] request={:s}'.format(grpc_message_to_json_string(request))) - return get_entry(context, self.database, 'context', request.context_uuid.uuid) - - def SetContext(self, request: Context, context : grpc.ServicerContext) -> ContextId: - LOGGER.info('[SetContext] request={:s}'.format(grpc_message_to_json_string(request))) - return self._set(request, 'context', request.context_uuid.uuid, 'context_id', TOPIC_CONTEXT) - - def RemoveContext(self, request: ContextId, context : grpc.ServicerContext) -> Empty: - LOGGER.info('[RemoveContext] request={:s}'.format(grpc_message_to_json_string(request))) - return self._del(request, 'context', request.context_uuid.uuid, 'context_id', TOPIC_CONTEXT, context) - - def GetContextEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[ContextEvent]: - LOGGER.info('[GetContextEvents] request={:s}'.format(grpc_message_to_json_string(request))) - for message in self.msg_broker.consume({TOPIC_CONTEXT}): yield ContextEvent(**json.loads(message.content)) diff --git a/src/dlt/connector/tests/Objects.py b/src/dlt/connector/tests/Objects.py index cb3baf9c9553a5f99c4a1e1b45d24a0ea1297736..f797e93e6f2f4f6597a667fff61b2b8ba1cbd72a 100644 --- a/src/dlt/connector/tests/Objects.py +++ b/src/dlt/connector/tests/Objects.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -from common.Constants import DEFAULT_CONTEXT_UUID, DEFAULT_TOPOLOGY_UUID from common.tools.object_factory.Context import json_context, json_context_id from common.tools.object_factory.Device import json_device_emulated_packet_router_disabled, json_device_id from common.tools.object_factory.EndPoint import json_endpoints @@ -28,26 +27,55 @@ def compose_device( device = json_device_emulated_packet_router_disabled(device_uuid, endpoints=endpoints) return device_id, endpoints, device +# ===== Domain A ======================================================================================================= + +# ----- Context -------------------------------------------------------------------------------------------------------- +DA_CONTEXT_ADMIN_ID = json_context_id('A') +DA_CONTEXT_ADMIN = json_context('A') + +# ----- Topology ------------------------------------------------------------------------------------------------------- +DA_TOPOLOGY_ADMIN_ID = json_topology_id('A', context_id=DA_CONTEXT_ADMIN_ID) +DA_TOPOLOGY_ADMIN = json_topology('A', context_id=DA_CONTEXT_ADMIN_ID) + +# ----- Devices -------------------------------------------------------------------------------------------------------- +DA_DEVICE_DEV1_ID, DA_DEVICE_DEV1_ENDPOINTS, DA_DEVICE_DEV1 = compose_device('DEV1@A', ['1', '2']) +DA_DEVICE_DEV2_ID, DA_DEVICE_DEV2_ENDPOINTS, DA_DEVICE_DEV2 = compose_device('DEV2@A', ['1', '2']) +DA_DEVICE_DEV3_ID, DA_DEVICE_DEV3_ENDPOINTS, DA_DEVICE_DEV3 = compose_device('DEV3@A', ['1', '2']) + +# ----- Links ---------------------------------------------------------------------------------------------------------- +DA_LINK_DEV1_DEV2_ID, DA_LINK_DEV1_DEV2 = compose_link(DA_DEVICE_DEV1_ENDPOINTS[0], DA_DEVICE_DEV2_ENDPOINTS[0]) +DA_LINK_DEV1_DEV3_ID, DA_LINK_DEV1_DEV3 = compose_link(DA_DEVICE_DEV1_ENDPOINTS[1], DA_DEVICE_DEV3_ENDPOINTS[0]) +DA_LINK_DEV2_DEV3_ID, DA_LINK_DEV2_DEV3 = compose_link(DA_DEVICE_DEV2_ENDPOINTS[1], DA_DEVICE_DEV3_ENDPOINTS[1]) + +# ----- Containers ----------------------------------------------------------------------------------------------------- +DA_CONTEXTS = [DA_CONTEXT_ADMIN] +DA_TOPOLOGIES = [DA_TOPOLOGY_ADMIN] +DA_DEVICES = [DA_DEVICE_DEV1, DA_DEVICE_DEV2, DA_DEVICE_DEV3] +DA_LINKS = [DA_LINK_DEV1_DEV2, DA_LINK_DEV1_DEV3, DA_LINK_DEV2_DEV3] + + +# ===== Domain B ======================================================================================================= + # ----- Context -------------------------------------------------------------------------------------------------------- -CONTEXT_ADMIN_ID = json_context_id(DEFAULT_CONTEXT_UUID) -CONTEXT_ADMIN = json_context(DEFAULT_CONTEXT_UUID) +DB_CONTEXT_ADMIN_ID = json_context_id('B') +DB_CONTEXT_ADMIN = json_context('B') # ----- Topology ------------------------------------------------------------------------------------------------------- -TOPOLOGY_ADMIN_ID = json_topology_id(DEFAULT_TOPOLOGY_UUID, context_id=CONTEXT_ADMIN_ID) -TOPOLOGY_ADMIN = json_topology(DEFAULT_TOPOLOGY_UUID, context_id=CONTEXT_ADMIN_ID) +DB_TOPOLOGY_ADMIN_ID = json_topology_id('B', context_id=DB_CONTEXT_ADMIN_ID) +DB_TOPOLOGY_ADMIN = json_topology('B', context_id=DB_CONTEXT_ADMIN_ID) # ----- Devices -------------------------------------------------------------------------------------------------------- -DEVICE_DEV1_ID, DEVICE_DEV1_ENDPOINTS, DEVICE_DEV1 = compose_device('DEV1', ['1', '2']) -DEVICE_DEV2_ID, DEVICE_DEV2_ENDPOINTS, DEVICE_DEV2 = compose_device('DEV2', ['1', '2']) -DEVICE_DEV3_ID, DEVICE_DEV3_ENDPOINTS, DEVICE_DEV3 = compose_device('DEV3', ['1', '2']) +DB_DEVICE_DEV1_ID, DB_DEVICE_DEV1_ENDPOINTS, DB_DEVICE_DEV1 = compose_device('DEV1@B', ['1', '2']) +DB_DEVICE_DEV2_ID, DB_DEVICE_DEV2_ENDPOINTS, DB_DEVICE_DEV2 = compose_device('DEV2@B', ['1', '2']) +DB_DEVICE_DEV3_ID, DB_DEVICE_DEV3_ENDPOINTS, DB_DEVICE_DEV3 = compose_device('DEV3@B', ['1', '2']) # ----- Links ---------------------------------------------------------------------------------------------------------- -LINK_DEV1_DEV2_ID, LINK_DEV1_DEV2 = compose_link(DEVICE_DEV1_ENDPOINTS[0], DEVICE_DEV2_ENDPOINTS[0]) -LINK_DEV1_DEV3_ID, LINK_DEV1_DEV3 = compose_link(DEVICE_DEV1_ENDPOINTS[1], DEVICE_DEV3_ENDPOINTS[0]) -LINK_DEV2_DEV3_ID, LINK_DEV2_DEV3 = compose_link(DEVICE_DEV2_ENDPOINTS[1], DEVICE_DEV3_ENDPOINTS[1]) +DB_LINK_DEV1_DEV2_ID, DB_LINK_DEV1_DEV2 = compose_link(DB_DEVICE_DEV1_ENDPOINTS[0], DB_DEVICE_DEV2_ENDPOINTS[0]) +DB_LINK_DEV1_DEV3_ID, DB_LINK_DEV1_DEV3 = compose_link(DB_DEVICE_DEV1_ENDPOINTS[1], DB_DEVICE_DEV3_ENDPOINTS[0]) +DB_LINK_DEV2_DEV3_ID, DB_LINK_DEV2_DEV3 = compose_link(DB_DEVICE_DEV2_ENDPOINTS[1], DB_DEVICE_DEV3_ENDPOINTS[1]) # ----- Containers ----------------------------------------------------------------------------------------------------- -CONTEXTS = [CONTEXT_ADMIN] -TOPOLOGIES = [TOPOLOGY_ADMIN] -DEVICES = [DEVICE_DEV1, DEVICE_DEV2, DEVICE_DEV3] -LINKS = [LINK_DEV1_DEV2, LINK_DEV1_DEV3, LINK_DEV2_DEV3] +DB_CONTEXTS = [DB_CONTEXT_ADMIN] +DB_TOPOLOGIES = [DB_TOPOLOGY_ADMIN] +DB_DEVICES = [DB_DEVICE_DEV1, DB_DEVICE_DEV2, DB_DEVICE_DEV3] +DB_LINKS = [DB_LINK_DEV1_DEV2, DB_LINK_DEV1_DEV3, DB_LINK_DEV2_DEV3] diff --git a/src/dlt/connector/tests/__init__.py b/src/dlt/connector/tests/__init__.py index 70a33251242c51f49140e596b8208a19dd5245f7..9953c820575d42fa88351cc8de022d880ba96e6a 100644 --- a/src/dlt/connector/tests/__init__.py +++ b/src/dlt/connector/tests/__init__.py @@ -11,4 +11,3 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -