# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import grpc, itertools, json, logging, time from typing import Dict, Iterator, Optional, Tuple from common.tests.MockMessageBroker import Message, MockMessageBroker from common.tools.grpc.Tools import grpc_message_to_json_string from common.proto.context_pb2 import EVENTTYPE_CREATE, EVENTTYPE_REMOVE, EVENTTYPE_UPDATE, Empty, TeraFlowController from common.proto.dlt_gateway_pb2 import ( DLTRECORDOPERATION_ADD, DLTRECORDOPERATION_DELETE, DLTRECORDOPERATION_UNDEFINED, DLTRECORDOPERATION_UPDATE, DLTRECORDSTATUS_FAILED, DLTRECORDSTATUS_SUCCEEDED, DLTRECORDTYPE_CONTEXT, DLTRECORDTYPE_DEVICE, DLTRECORDTYPE_LINK, DLTRECORDTYPE_SERVICE, DLTRECORDTYPE_SLICE, DLTRECORDTYPE_TOPOLOGY, DLTRECORDTYPE_UNDEFINED, DltPeerStatus, DltPeerStatusList, DltRecord, DltRecordEvent, DltRecordId, DltRecordOperationEnum, DltRecordStatus, DltRecordSubscription, DltRecordTypeEnum) from common.proto.dlt_gateway_pb2_grpc import DltGatewayServiceServicer LOGGER = logging.getLogger(__name__) DltRecordKey = Tuple[str, DltRecordOperationEnum, str] # domain_uuid, operation, record_uuid DltRecordDict = Dict[DltRecordKey, DltRecord] # dlt_record_key => dlt_record class AlreadyExistsException(Exception): pass class DoesNotExistException(Exception): pass class MockServicerImpl_DltGateway(DltGatewayServiceServicer): def __init__(self): LOGGER.info('[__init__] Creating Servicer...') self.records : DltRecordDict = {} self.msg_broker = MockMessageBroker() LOGGER.info('[__init__] Servicer Created') def __get_record(self, record_id : DltRecordId, should_exist : bool) -> Optional[Dict]: domain_uuid, record_uuid = record_id.domain_uuid.uuid, record_id.record_uuid.uuid str_type = DltRecordTypeEnum.Name(record_id.type).upper().replace('DLTRECORDTYPE_', '') records_domain : Dict[str, Dict] = self.records.setdefault(domain_uuid, {}) records_type : Dict[str, Dict] = records_domain.setdefault(str_type, {}) record : Optional[Dict] = records_type.get(record_uuid) if should_exist and record is None: raise DoesNotExistException('RecordId({:s}, {:s}, {:s})'.format(domain_uuid, str_type, record_uuid)) elif not should_exist and record is not None: raise AlreadyExistsException('RecordId({:s}, {:s}, {:s})'.format(domain_uuid, str_type, record_uuid)) return record def __set_record(self, record_id : DltRecordId, should_exist : bool, data_json : str) -> None: domain_uuid, record_uuid = record_id.domain_uuid.uuid, record_id.record_uuid.uuid str_type = DltRecordTypeEnum.Name(record_id.type).upper().replace('DLTRECORDTYPE_', '') records_domain : Dict[str, Dict] = self.records.setdefault(domain_uuid, {}) records_type : Dict[str, Dict] = records_domain.setdefault(str_type, {}) record : Optional[Dict] = records_type.get(record_uuid) if should_exist and record is None: raise DoesNotExistException('RecordId({:s}, {:s}, {:s})'.format(domain_uuid, str_type, record_uuid)) elif not should_exist and record is not None: raise AlreadyExistsException('RecordId({:s}, {:s}, {:s})'.format(domain_uuid, str_type, record_uuid)) records_type[record_uuid] = json.loads(data_json) def __del_record(self, record_id : DltRecordId) -> None: domain_uuid, record_uuid = record_id.domain_uuid.uuid, record_id.record_uuid.uuid str_type = DltRecordTypeEnum.Name(record_id.type).upper().replace('DLTRECORDTYPE_', '') records_domain : Dict[str, Dict] = self.records.setdefault(domain_uuid, {}) records_type : Dict[str, Dict] = records_domain.setdefault(str_type, {}) record : Optional[Dict] = records_type.get(record_uuid) if record is None: raise DoesNotExistException('RecordId({:s}, {:s}, {:s})'.format(domain_uuid, str_type, record_uuid)) records_type.discard(record_uuid) def __publish(self, operation : DltRecordOperationEnum, record_id : DltRecordId) -> None: str_operation = DltRecordOperationEnum.Name(operation).upper().replace('DLTRECORDOPERATION_', '') str_type = DltRecordTypeEnum.Name(record_id.type).upper().replace('DLTRECORDTYPE_', '') topic = '{:s}:{:s}'.format(str_type, str_operation) event = DltRecordEvent() event.event.timestamp.timestamp = time.time() # pylint: disable=no-member event.event.event_type = { # pylint: disable=no-member DLTRECORDOPERATION_ADD : EVENTTYPE_CREATE, DLTRECORDOPERATION_UPDATE: EVENTTYPE_UPDATE, DLTRECORDOPERATION_DELETE: EVENTTYPE_REMOVE, }.get(operation) event.record_id.CopyFrom(record_id) # pylint: disable=no-member self.msg_broker.publish(Message(topic=topic, content=grpc_message_to_json_string(event))) def RecordToDlt(self, request : DltRecord, context : grpc.ServicerContext) -> DltRecordStatus: LOGGER.info('[RecordToDlt] request={:s}'.format(grpc_message_to_json_string(request))) record_id = request.record_id response = DltRecordStatus() response.record_id.CopyFrom(record_id) # pylint: disable=no-member try: operation : DltRecordOperationEnum = request.operation if operation == DLTRECORDOPERATION_ADD: self.__set_record(record_id, False, request.data_json) elif operation == DLTRECORDOPERATION_UPDATE: self.__set_record(record_id, True, request.data_json) elif operation == DLTRECORDOPERATION_DELETE: self.__del_record(record_id) else: str_operation = DltRecordOperationEnum.Name(operation).upper().replace('DLTRECORDOPERATION_', '') raise NotImplementedError('DltRecordOperationEnum({:s})'.format(str_operation)) self.__publish(operation, record_id) response.status = DLTRECORDSTATUS_SUCCEEDED except Exception as e: # pylint: disable=broad-except response.status = DLTRECORDSTATUS_FAILED response.error_message = str(e) LOGGER.info('[RecordToDlt] response={:s}'.format(grpc_message_to_json_string(response))) return response def GetFromDlt(self, request : DltRecordId, context : grpc.ServicerContext) -> DltRecord: LOGGER.info('[GetFromDlt] request={:s}'.format(grpc_message_to_json_string(request))) record = self.__get_record(request, True) response = DltRecord() response.record_id.CopyFrom(request) # pylint: disable=no-member response.operation = DLTRECORDOPERATION_UNDEFINED response.data_json = json.dumps(record, sort_keys=True) LOGGER.info('[GetFromDlt] response={:s}'.format(grpc_message_to_json_string(response))) return response def SubscribeToDlt( self, request: DltRecordSubscription, context : grpc.ServicerContext ) -> Iterator[DltRecordEvent]: LOGGER.info('[SubscribeToDlt] request={:s}'.format(grpc_message_to_json_string(request))) types = request.type if len(types) == 0: types = [ DLTRECORDTYPE_UNDEFINED, DLTRECORDTYPE_CONTEXT, DLTRECORDTYPE_TOPOLOGY, DLTRECORDTYPE_DEVICE, DLTRECORDTYPE_LINK, DLTRECORDTYPE_SERVICE, DLTRECORDTYPE_SLICE ] str_types = [ DltRecordTypeEnum.Name(_type).upper().replace('DLTRECORDTYPE_', '') for _type in types ] operations = request.operation if len(operations) == 0: operations = [ DLTRECORDOPERATION_UNDEFINED, DLTRECORDOPERATION_ADD, DLTRECORDOPERATION_UPDATE, DLTRECORDOPERATION_DELETE ] str_operations = [ DltRecordOperationEnum.Name(_operation).upper().replace('DLTRECORDOPERATION_', '') for _operation in operations ] topics = { '{:s}:{:s}'.format(*type_operation) for type_operation in itertools.product(str_types, str_operations) } for message in self.msg_broker.consume(topics): yield DltRecordEvent(**json.loads(message.content)) def GetDltStatus(self, request : TeraFlowController, context : grpc.ServicerContext) -> DltPeerStatus: LOGGER.info('[GetDltStatus] request={:s}'.format(grpc_message_to_json_string(request))) raise NotImplementedError() def GetDltPeers(self, request : Empty, context : grpc.ServicerContext) -> DltPeerStatusList: LOGGER.info('[GetDltPeers] request={:s}'.format(grpc_message_to_json_string(request))) raise NotImplementedError()