Skip to content
Snippets Groups Projects
Commit 23157444 authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

DltConnector component:

- Implemented MockServicerImpl_DltGateway
- Updated Objects for unitary tests
parent 28cceac3
No related branches found
No related tags found
2 merge requests!54Release 2.0.0,!16DLT component (and related) improvements
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import grpc, itertools, json, logging, time
from typing import Dict, Iterator, Optional, Tuple
from common.tests.MockMessageBroker import Message, MockMessageBroker
from common.tools.grpc.Tools import grpc_message_to_json_string
from common.proto.context_pb2 import EVENTTYPE_CREATE, EVENTTYPE_REMOVE, EVENTTYPE_UPDATE, Empty, TeraFlowController
from common.proto.dlt_gateway_pb2 import (
DLTRECORDOPERATION_ADD, DLTRECORDOPERATION_DELETE, DLTRECORDOPERATION_UNDEFINED, DLTRECORDOPERATION_UPDATE,
DLTRECORDSTATUS_FAILED, DLTRECORDSTATUS_SUCCEEDED, DLTRECORDTYPE_CONTEXT, DLTRECORDTYPE_DEVICE, DLTRECORDTYPE_LINK,
DLTRECORDTYPE_SERVICE, DLTRECORDTYPE_SLICE, DLTRECORDTYPE_TOPOLOGY, DLTRECORDTYPE_UNDEFINED,
DltPeerStatus, DltPeerStatusList, DltRecord, DltRecordEvent, DltRecordId, DltRecordOperationEnum, DltRecordStatus,
DltRecordSubscription, DltRecordTypeEnum)
from common.proto.dlt_gateway_pb2_grpc import DltGatewayServiceServicer
LOGGER = logging.getLogger(__name__)
DltRecordKey = Tuple[str, DltRecordOperationEnum, str] # domain_uuid, operation, record_uuid
DltRecordDict = Dict[DltRecordKey, DltRecord] # dlt_record_key => dlt_record
class AlreadyExistsException(Exception):
pass
class DoesNotExistException(Exception):
pass
class MockServicerImpl_DltGateway(DltGatewayServiceServicer):
def __init__(self):
LOGGER.info('[__init__] Creating Servicer...')
self.records : DltRecordDict = {}
self.msg_broker = MockMessageBroker()
LOGGER.info('[__init__] Servicer Created')
def __get_record(self, record_id : DltRecordId, should_exist : bool) -> Optional[Dict]:
domain_uuid, record_uuid = record_id.domain_uuid.uuid, record_id.record_uuid.uuid
str_type = DltRecordTypeEnum.Name(record_id.type).upper().replace('DLTRECORDTYPE_', '')
records_domain : Dict[str, Dict] = self.records.setdefault(domain_uuid, {})
records_type : Dict[str, Dict] = records_domain.setdefault(str_type, {})
record : Optional[Dict] = records_type.get(record_uuid)
if should_exist and record is None:
raise DoesNotExistException('RecordId({:s}, {:s}, {:s})'.format(domain_uuid, str_type, record_uuid))
elif not should_exist and record is not None:
raise AlreadyExistsException('RecordId({:s}, {:s}, {:s})'.format(domain_uuid, str_type, record_uuid))
return record
def __set_record(self, record_id : DltRecordId, should_exist : bool, data_json : str) -> None:
domain_uuid, record_uuid = record_id.domain_uuid.uuid, record_id.record_uuid.uuid
str_type = DltRecordTypeEnum.Name(record_id.type).upper().replace('DLTRECORDTYPE_', '')
records_domain : Dict[str, Dict] = self.records.setdefault(domain_uuid, {})
records_type : Dict[str, Dict] = records_domain.setdefault(str_type, {})
record : Optional[Dict] = records_type.get(record_uuid)
if should_exist and record is None:
raise DoesNotExistException('RecordId({:s}, {:s}, {:s})'.format(domain_uuid, str_type, record_uuid))
elif not should_exist and record is not None:
raise AlreadyExistsException('RecordId({:s}, {:s}, {:s})'.format(domain_uuid, str_type, record_uuid))
records_type[record_uuid] = json.loads(data_json)
def __del_record(self, record_id : DltRecordId) -> None:
domain_uuid, record_uuid = record_id.domain_uuid.uuid, record_id.record_uuid.uuid
str_type = DltRecordTypeEnum.Name(record_id.type).upper().replace('DLTRECORDTYPE_', '')
records_domain : Dict[str, Dict] = self.records.setdefault(domain_uuid, {})
records_type : Dict[str, Dict] = records_domain.setdefault(str_type, {})
record : Optional[Dict] = records_type.get(record_uuid)
if record is None:
raise DoesNotExistException('RecordId({:s}, {:s}, {:s})'.format(domain_uuid, str_type, record_uuid))
records_type.discard(record_uuid)
def __publish(self, operation : DltRecordOperationEnum, record_id : DltRecordId) -> None:
str_operation = DltRecordOperationEnum.Name(operation).upper().replace('DLTRECORDOPERATION_', '')
str_type = DltRecordTypeEnum.Name(record_id.type).upper().replace('DLTRECORDTYPE_', '')
topic = '{:s}:{:s}'.format(str_type, str_operation)
event = DltRecordEvent()
event.event.timestamp.timestamp = time.time() # pylint: disable=no-member
event.event.event_type = { # pylint: disable=no-member
DLTRECORDOPERATION_ADD : EVENTTYPE_CREATE,
DLTRECORDOPERATION_UPDATE: EVENTTYPE_UPDATE,
DLTRECORDOPERATION_DELETE: EVENTTYPE_REMOVE,
}.get(operation)
event.record_id.CopyFrom(record_id) # pylint: disable=no-member
self.msg_broker.publish(Message(topic=topic, content=grpc_message_to_json_string(event)))
def RecordToDlt(self, request : DltRecord, context : grpc.ServicerContext) -> DltRecordStatus:
LOGGER.info('[RecordToDlt] request={:s}'.format(grpc_message_to_json_string(request)))
record_id = request.record_id
response = DltRecordStatus()
response.record_id.CopyFrom(record_id) # pylint: disable=no-member
try:
operation : DltRecordOperationEnum = request.operation
if operation == DLTRECORDOPERATION_ADD:
self.__set_record(record_id, False, request.data_json)
elif operation == DLTRECORDOPERATION_UPDATE:
self.__set_record(record_id, True, request.data_json)
elif operation == DLTRECORDOPERATION_DELETE:
self.__del_record(record_id)
else:
str_operation = DltRecordOperationEnum.Name(operation).upper().replace('DLTRECORDOPERATION_', '')
raise NotImplementedError('DltRecordOperationEnum({:s})'.format(str_operation))
self.__publish(operation, record_id)
response.status = DLTRECORDSTATUS_SUCCEEDED
except Exception as e: # pylint: disable=broad-except
response.status = DLTRECORDSTATUS_FAILED
response.error_message = str(e)
LOGGER.info('[RecordToDlt] response={:s}'.format(grpc_message_to_json_string(response)))
return response
def GetFromDlt(self, request : DltRecordId, context : grpc.ServicerContext) -> DltRecord:
LOGGER.info('[GetFromDlt] request={:s}'.format(grpc_message_to_json_string(request)))
record = self.__get_record(request, True)
response = DltRecord()
response.record_id.CopyFrom(request) # pylint: disable=no-member
response.operation = DLTRECORDOPERATION_UNDEFINED
response.data_json = json.dumps(record, sort_keys=True)
LOGGER.info('[GetFromDlt] response={:s}'.format(grpc_message_to_json_string(response)))
return response
def SubscribeToDlt(
self, request: DltRecordSubscription, context : grpc.ServicerContext
) -> Iterator[DltRecordEvent]:
LOGGER.info('[SubscribeToDlt] request={:s}'.format(grpc_message_to_json_string(request)))
types = request.type
if len(types) == 0:
types = [
DLTRECORDTYPE_UNDEFINED, DLTRECORDTYPE_CONTEXT, DLTRECORDTYPE_TOPOLOGY, DLTRECORDTYPE_DEVICE,
DLTRECORDTYPE_LINK, DLTRECORDTYPE_SERVICE, DLTRECORDTYPE_SLICE
]
str_types = [
DltRecordTypeEnum.Name(_type).upper().replace('DLTRECORDTYPE_', '')
for _type in types
]
operations = request.operation
if len(operations) == 0:
operations = [
DLTRECORDOPERATION_UNDEFINED, DLTRECORDOPERATION_ADD, DLTRECORDOPERATION_UPDATE,
DLTRECORDOPERATION_DELETE
]
str_operations = [
DltRecordOperationEnum.Name(_operation).upper().replace('DLTRECORDOPERATION_', '')
for _operation in operations
]
topics = {
'{:s}:{:s}'.format(*type_operation)
for type_operation in itertools.product(str_types, str_operations)
}
for message in self.msg_broker.consume(topics):
yield DltRecordEvent(**json.loads(message.content))
def GetDltStatus(self, request : TeraFlowController, context : grpc.ServicerContext) -> DltPeerStatus:
LOGGER.info('[GetDltStatus] request={:s}'.format(grpc_message_to_json_string(request)))
raise NotImplementedError()
def GetDltPeers(self, request : Empty, context : grpc.ServicerContext) -> DltPeerStatusList:
LOGGER.info('[GetDltPeers] request={:s}'.format(grpc_message_to_json_string(request)))
raise NotImplementedError()
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Any, Dict, Iterator, NamedTuple, Tuple
import grpc, logging
from common.tests.MockMessageBroker import MockMessageBroker
from common.tools.grpc.Tools import grpc_message_to_json_string
from context.proto.context_pb2 import Empty, TeraFlowController
from dlt.connector.proto.dlt_pb2 import (
DltPeerStatus, DltPeerStatusList, DltRecord, DltRecordEvent, DltRecordId, DltRecordOperationEnum, DltRecordStatus, DltRecordSubscription, DltRecordTypeEnum)
from dlt.connector.proto.dlt_pb2_grpc import DltServiceServicer
LOGGER = logging.getLogger(__name__)
DltRecordKey = Tuple[str, DltRecordOperationEnum, str] # domain_uuid, operation, record_uuid
DltRecordDict = Dict[DltRecordKey, DltRecord] # dlt_record_key => dlt_record
class MockServicerImpl_Dlt(DltServiceServicer):
def __init__(self):
LOGGER.info('[__init__] Creating Servicer...')
self.records : DltRecordDict = {}
self.msg_broker = MockMessageBroker()
LOGGER.info('[__init__] Servicer Created')
def RecordToDlt(self, request : DltRecord, context : grpc.ServicerContext) -> DltRecordStatus:
LOGGER.info('[RecordToDlt] request={:s}'.format(grpc_message_to_json_string(request)))
operation = request.operation
domain_uuid = request.record_id.domain_uuid
record_uuid = request.record_id.record_uuid
#if operation ==
def GetFromDlt(self, request : DltRecordId, context : grpc.ServicerContext) -> DltRecord:
LOGGER.info('[GetFromDlt] request={:s}'.format(grpc_message_to_json_string(request)))
def SubscribeToDlt(self, request: DltRecordSubscription, context : grpc.ServicerContext) -> Iterator[DltRecordEvent]:
LOGGER.info('[SubscribeToDlt] request={:s}'.format(grpc_message_to_json_string(request)))
for message in self.msg_broker.consume({TOPIC_CONTEXT}): yield ContextEvent(**json.loads(message.content))
def GetDltStatus(self, request : TeraFlowController, context : grpc.ServicerContext) -> DltPeerStatus:
LOGGER.info('[GetDltStatus] request={:s}'.format(grpc_message_to_json_string(request)))
def GetDltPeers(self, request : Empty, context : grpc.ServicerContext) -> DltPeerStatusList:
LOGGER.info('[GetDltPeers] request={:s}'.format(grpc_message_to_json_string(request)))
LOGGER.info('[__init__] Servicer Created')
# ----- Common -----------------------------------------------------------------------------------------------------
def _set(self, request, container_name, entry_uuid, entry_id_field_name, topic_name):
exists = has_entry(self.database, container_name, entry_uuid)
entry = set_entry(self.database, container_name, entry_uuid, request)
event_type = EventTypeEnum.EVENTTYPE_UPDATE if exists else EventTypeEnum.EVENTTYPE_CREATE
entry_id = getattr(entry, entry_id_field_name)
dict_entry_id = grpc_message_to_json(entry_id)
notify_event(self.msg_broker, topic_name, event_type, {entry_id_field_name: dict_entry_id})
return entry_id
def _del(self, request, container_name, entry_uuid, entry_id_field_name, topic_name, grpc_context):
empty = del_entry(grpc_context, self.database, container_name, entry_uuid)
event_type = EventTypeEnum.EVENTTYPE_REMOVE
dict_entry_id = grpc_message_to_json(request)
notify_event(self.msg_broker, topic_name, event_type, {entry_id_field_name: dict_entry_id})
return empty
# ----- Context ----------------------------------------------------------------------------------------------------
def ListContextIds(self, request: Empty, context : grpc.ServicerContext) -> ContextIdList:
LOGGER.info('[ListContextIds] request={:s}'.format(grpc_message_to_json_string(request)))
return ContextIdList(context_ids=[context.context_id for context in get_entries(self.database, 'context')])
def ListContexts(self, request: Empty, context : grpc.ServicerContext) -> ContextList:
LOGGER.info('[ListContexts] request={:s}'.format(grpc_message_to_json_string(request)))
return ContextList(contexts=get_entries(self.database, 'context'))
def GetContext(self, request: ContextId, context : grpc.ServicerContext) -> Context:
LOGGER.info('[GetContext] request={:s}'.format(grpc_message_to_json_string(request)))
return get_entry(context, self.database, 'context', request.context_uuid.uuid)
def SetContext(self, request: Context, context : grpc.ServicerContext) -> ContextId:
LOGGER.info('[SetContext] request={:s}'.format(grpc_message_to_json_string(request)))
return self._set(request, 'context', request.context_uuid.uuid, 'context_id', TOPIC_CONTEXT)
def RemoveContext(self, request: ContextId, context : grpc.ServicerContext) -> Empty:
LOGGER.info('[RemoveContext] request={:s}'.format(grpc_message_to_json_string(request)))
return self._del(request, 'context', request.context_uuid.uuid, 'context_id', TOPIC_CONTEXT, context)
def GetContextEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[ContextEvent]:
LOGGER.info('[GetContextEvents] request={:s}'.format(grpc_message_to_json_string(request)))
for message in self.msg_broker.consume({TOPIC_CONTEXT}): yield ContextEvent(**json.loads(message.content))
...@@ -12,7 +12,6 @@ ...@@ -12,7 +12,6 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from common.Constants import DEFAULT_CONTEXT_UUID, DEFAULT_TOPOLOGY_UUID
from common.tools.object_factory.Context import json_context, json_context_id from common.tools.object_factory.Context import json_context, json_context_id
from common.tools.object_factory.Device import json_device_emulated_packet_router_disabled, json_device_id from common.tools.object_factory.Device import json_device_emulated_packet_router_disabled, json_device_id
from common.tools.object_factory.EndPoint import json_endpoints from common.tools.object_factory.EndPoint import json_endpoints
...@@ -28,26 +27,55 @@ def compose_device( ...@@ -28,26 +27,55 @@ def compose_device(
device = json_device_emulated_packet_router_disabled(device_uuid, endpoints=endpoints) device = json_device_emulated_packet_router_disabled(device_uuid, endpoints=endpoints)
return device_id, endpoints, device return device_id, endpoints, device
# ===== Domain A =======================================================================================================
# ----- Context --------------------------------------------------------------------------------------------------------
DA_CONTEXT_ADMIN_ID = json_context_id('A')
DA_CONTEXT_ADMIN = json_context('A')
# ----- Topology -------------------------------------------------------------------------------------------------------
DA_TOPOLOGY_ADMIN_ID = json_topology_id('A', context_id=DA_CONTEXT_ADMIN_ID)
DA_TOPOLOGY_ADMIN = json_topology('A', context_id=DA_CONTEXT_ADMIN_ID)
# ----- Devices --------------------------------------------------------------------------------------------------------
DA_DEVICE_DEV1_ID, DA_DEVICE_DEV1_ENDPOINTS, DA_DEVICE_DEV1 = compose_device('DEV1@A', ['1', '2'])
DA_DEVICE_DEV2_ID, DA_DEVICE_DEV2_ENDPOINTS, DA_DEVICE_DEV2 = compose_device('DEV2@A', ['1', '2'])
DA_DEVICE_DEV3_ID, DA_DEVICE_DEV3_ENDPOINTS, DA_DEVICE_DEV3 = compose_device('DEV3@A', ['1', '2'])
# ----- Links ----------------------------------------------------------------------------------------------------------
DA_LINK_DEV1_DEV2_ID, DA_LINK_DEV1_DEV2 = compose_link(DA_DEVICE_DEV1_ENDPOINTS[0], DA_DEVICE_DEV2_ENDPOINTS[0])
DA_LINK_DEV1_DEV3_ID, DA_LINK_DEV1_DEV3 = compose_link(DA_DEVICE_DEV1_ENDPOINTS[1], DA_DEVICE_DEV3_ENDPOINTS[0])
DA_LINK_DEV2_DEV3_ID, DA_LINK_DEV2_DEV3 = compose_link(DA_DEVICE_DEV2_ENDPOINTS[1], DA_DEVICE_DEV3_ENDPOINTS[1])
# ----- Containers -----------------------------------------------------------------------------------------------------
DA_CONTEXTS = [DA_CONTEXT_ADMIN]
DA_TOPOLOGIES = [DA_TOPOLOGY_ADMIN]
DA_DEVICES = [DA_DEVICE_DEV1, DA_DEVICE_DEV2, DA_DEVICE_DEV3]
DA_LINKS = [DA_LINK_DEV1_DEV2, DA_LINK_DEV1_DEV3, DA_LINK_DEV2_DEV3]
# ===== Domain B =======================================================================================================
# ----- Context -------------------------------------------------------------------------------------------------------- # ----- Context --------------------------------------------------------------------------------------------------------
CONTEXT_ADMIN_ID = json_context_id(DEFAULT_CONTEXT_UUID) DB_CONTEXT_ADMIN_ID = json_context_id('B')
CONTEXT_ADMIN = json_context(DEFAULT_CONTEXT_UUID) DB_CONTEXT_ADMIN = json_context('B')
# ----- Topology ------------------------------------------------------------------------------------------------------- # ----- Topology -------------------------------------------------------------------------------------------------------
TOPOLOGY_ADMIN_ID = json_topology_id(DEFAULT_TOPOLOGY_UUID, context_id=CONTEXT_ADMIN_ID) DB_TOPOLOGY_ADMIN_ID = json_topology_id('B', context_id=DB_CONTEXT_ADMIN_ID)
TOPOLOGY_ADMIN = json_topology(DEFAULT_TOPOLOGY_UUID, context_id=CONTEXT_ADMIN_ID) DB_TOPOLOGY_ADMIN = json_topology('B', context_id=DB_CONTEXT_ADMIN_ID)
# ----- Devices -------------------------------------------------------------------------------------------------------- # ----- Devices --------------------------------------------------------------------------------------------------------
DEVICE_DEV1_ID, DEVICE_DEV1_ENDPOINTS, DEVICE_DEV1 = compose_device('DEV1', ['1', '2']) DB_DEVICE_DEV1_ID, DB_DEVICE_DEV1_ENDPOINTS, DB_DEVICE_DEV1 = compose_device('DEV1@B', ['1', '2'])
DEVICE_DEV2_ID, DEVICE_DEV2_ENDPOINTS, DEVICE_DEV2 = compose_device('DEV2', ['1', '2']) DB_DEVICE_DEV2_ID, DB_DEVICE_DEV2_ENDPOINTS, DB_DEVICE_DEV2 = compose_device('DEV2@B', ['1', '2'])
DEVICE_DEV3_ID, DEVICE_DEV3_ENDPOINTS, DEVICE_DEV3 = compose_device('DEV3', ['1', '2']) DB_DEVICE_DEV3_ID, DB_DEVICE_DEV3_ENDPOINTS, DB_DEVICE_DEV3 = compose_device('DEV3@B', ['1', '2'])
# ----- Links ---------------------------------------------------------------------------------------------------------- # ----- Links ----------------------------------------------------------------------------------------------------------
LINK_DEV1_DEV2_ID, LINK_DEV1_DEV2 = compose_link(DEVICE_DEV1_ENDPOINTS[0], DEVICE_DEV2_ENDPOINTS[0]) DB_LINK_DEV1_DEV2_ID, DB_LINK_DEV1_DEV2 = compose_link(DB_DEVICE_DEV1_ENDPOINTS[0], DB_DEVICE_DEV2_ENDPOINTS[0])
LINK_DEV1_DEV3_ID, LINK_DEV1_DEV3 = compose_link(DEVICE_DEV1_ENDPOINTS[1], DEVICE_DEV3_ENDPOINTS[0]) DB_LINK_DEV1_DEV3_ID, DB_LINK_DEV1_DEV3 = compose_link(DB_DEVICE_DEV1_ENDPOINTS[1], DB_DEVICE_DEV3_ENDPOINTS[0])
LINK_DEV2_DEV3_ID, LINK_DEV2_DEV3 = compose_link(DEVICE_DEV2_ENDPOINTS[1], DEVICE_DEV3_ENDPOINTS[1]) DB_LINK_DEV2_DEV3_ID, DB_LINK_DEV2_DEV3 = compose_link(DB_DEVICE_DEV2_ENDPOINTS[1], DB_DEVICE_DEV3_ENDPOINTS[1])
# ----- Containers ----------------------------------------------------------------------------------------------------- # ----- Containers -----------------------------------------------------------------------------------------------------
CONTEXTS = [CONTEXT_ADMIN] DB_CONTEXTS = [DB_CONTEXT_ADMIN]
TOPOLOGIES = [TOPOLOGY_ADMIN] DB_TOPOLOGIES = [DB_TOPOLOGY_ADMIN]
DEVICES = [DEVICE_DEV1, DEVICE_DEV2, DEVICE_DEV3] DB_DEVICES = [DB_DEVICE_DEV1, DB_DEVICE_DEV2, DB_DEVICE_DEV3]
LINKS = [LINK_DEV1_DEV2, LINK_DEV1_DEV3, LINK_DEV2_DEV3] DB_LINKS = [DB_LINK_DEV1_DEV2, DB_LINK_DEV1_DEV3, DB_LINK_DEV2_DEV3]
...@@ -11,4 +11,3 @@ ...@@ -11,4 +11,3 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment