diff --git a/src/common/tests/MockMessageBroker.py b/src/common/tests/MockMessageBroker.py new file mode 100644 index 0000000000000000000000000000000000000000..851c06766fd705bee746840f3d4ce9c4f4ac404d --- /dev/null +++ b/src/common/tests/MockMessageBroker.py @@ -0,0 +1,61 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json, logging, threading, time +from queue import Queue, Empty +from typing import Dict, Iterator, NamedTuple, Set + +LOGGER = logging.getLogger(__name__) +CONSUME_TIMEOUT = 0.1 # seconds + +class Message(NamedTuple): + topic: str + content: str + +class MockMessageBroker: + def __init__(self): + self._terminate = threading.Event() + self._topic__to__queues : Dict[str, Set[Queue]] = {} + + def publish(self, message : Message) -> None: + queues = self._topic__to__queues.get(message.topic, None) + if queues is None: return + for queue in queues: queue.put_nowait((message.topic, message.content)) + + def consume( + self, topic_names : Set[str], block : bool = True, consume_timeout : float = CONSUME_TIMEOUT + ) -> Iterator[Message]: + queue = Queue() + for topic_name in topic_names: + self._topic__to__queues.setdefault(topic_name, set()).add(queue) + + while not self._terminate.is_set(): + try: + message = queue.get(block=block, timeout=consume_timeout) + except Empty: + continue + if message is None: continue + yield Message(*message) + + for topic_name in topic_names: + self._topic__to__queues.get(topic_name, set()).discard(queue) + + def terminate(self): + self._terminate.set() + +def notify_event(messagebroker, topic_name, event_type, fields) -> None: + event = {'event': {'timestamp': time.time(), 'event_type': event_type}} + for field_name, field_value in fields.items(): + event[field_name] = field_value + messagebroker.publish(Message(topic_name, json.dumps(event))) diff --git a/src/common/tests/MockServicerImpl_Context.py b/src/common/tests/MockServicerImpl_Context.py index 8b4560016bc2cfa54d5ac30e7147b4df59e04e72..adb8ae36037d0b8c46862aa979b9bde4058b0dcf 100644 --- a/src/common/tests/MockServicerImpl_Context.py +++ b/src/common/tests/MockServicerImpl_Context.py @@ -12,18 +12,31 @@ # See the License for the specific language governing permissions and # limitations under the License. -import grpc, logging +import grpc, json, logging from typing import Any, Dict, Iterator, List -from common.tools.grpc.Tools import grpc_message_to_json_string +from common.tests.MockMessageBroker import MockMessageBroker, notify_event +from common.tools.grpc.Tools import grpc_message_to_json, grpc_message_to_json_string from context.proto.context_pb2 import ( - Connection, ConnectionEvent, ConnectionId, ConnectionIdList, ConnectionList, Context, ContextEvent, ContextId, - ContextIdList, ContextList, Device, DeviceEvent, DeviceId, DeviceIdList, DeviceList, Empty, Link, LinkEvent, - LinkId, LinkIdList, LinkList, Service, ServiceEvent, ServiceId, ServiceIdList, ServiceList, Slice, SliceEvent, - SliceId, SliceIdList, SliceList, Topology, TopologyEvent, TopologyId, TopologyIdList, TopologyList) + Connection, ConnectionEvent, ConnectionId, ConnectionIdList, ConnectionList, + Context, ContextEvent, ContextId, ContextIdList, ContextList, + Device, DeviceEvent, DeviceId, DeviceIdList, DeviceList, + Empty, EventTypeEnum, + Link, LinkEvent, LinkId, LinkIdList, LinkList, + Service, ServiceEvent, ServiceId, ServiceIdList, ServiceList, + Slice, SliceEvent, SliceId, SliceIdList, SliceList, + Topology, TopologyEvent, TopologyId, TopologyIdList, TopologyList) from context.proto.context_pb2_grpc import ContextServiceServicer LOGGER = logging.getLogger(__name__) +TOPIC_CONNECTION = 'connection' +TOPIC_CONTEXT = 'context' +TOPIC_TOPOLOGY = 'topology' +TOPIC_DEVICE = 'device' +TOPIC_LINK = 'link' +TOPIC_SERVICE = 'service' +TOPIC_SLICE = 'slice' + def get_container(database : Dict[str, Dict[str, Any]], container_name : str) -> Dict[str, Any]: return database.setdefault(container_name, {}) @@ -31,10 +44,15 @@ def get_entries(database : Dict[str, Dict[str, Any]], container_name : str) -> L container = get_container(database, container_name) return [container[entry_uuid] for entry_uuid in sorted(container.keys())] +def has_entry(database : Dict[str, Dict[str, Any]], container_name : str, entry_uuid : str) -> Any: + LOGGER.debug('[has_entry] BEFORE database={:s}'.format(str(database))) + container = get_container(database, container_name) + return entry_uuid in container + def get_entry( context : grpc.ServicerContext, database : Dict[str, Dict[str, Any]], container_name : str, entry_uuid : str ) -> Any: - LOGGER.debug('[get_entry] AFTER database={:s}'.format(str(database))) + LOGGER.debug('[get_entry] BEFORE database={:s}'.format(str(database))) container = get_container(database, container_name) if entry_uuid not in container: context.abort(grpc.StatusCode.NOT_FOUND, str('{:s}({:s}) not found'.format(container_name, entry_uuid))) @@ -60,8 +78,27 @@ class MockServicerImpl_Context(ContextServiceServicer): def __init__(self): LOGGER.info('[__init__] Creating Servicer...') self.database : Dict[str, Any] = {} + self.msg_broker = MockMessageBroker() LOGGER.info('[__init__] Servicer Created') + # ----- Common ----------------------------------------------------------------------------------------------------- + + def _set(self, request, container_name, entry_uuid, entry_id_field_name, topic_name): + exists = has_entry(self.database, container_name, entry_uuid) + entry = set_entry(self.database, container_name, entry_uuid, request) + event_type = EventTypeEnum.EVENTTYPE_UPDATE if exists else EventTypeEnum.EVENTTYPE_CREATE + entry_id = getattr(entry, entry_id_field_name) + dict_entry_id = grpc_message_to_json(entry_id) + notify_event(self.msg_broker, topic_name, event_type, {entry_id_field_name: dict_entry_id}) + return entry_id + + def _del(self, request, container_name, entry_uuid, entry_id_field_name, topic_name, grpc_context): + empty = del_entry(grpc_context, self.database, container_name, entry_uuid) + event_type = EventTypeEnum.EVENTTYPE_REMOVE + dict_entry_id = grpc_message_to_json(request) + notify_event(self.msg_broker, topic_name, event_type, {entry_id_field_name: dict_entry_id}) + return empty + # ----- Context ---------------------------------------------------------------------------------------------------- def ListContextIds(self, request: Empty, context : grpc.ServicerContext) -> ContextIdList: @@ -78,14 +115,15 @@ class MockServicerImpl_Context(ContextServiceServicer): def SetContext(self, request: Context, context : grpc.ServicerContext) -> ContextId: LOGGER.info('[SetContext] request={:s}'.format(grpc_message_to_json_string(request))) - return set_entry(self.database, 'context', request.context_id.context_uuid.uuid, request).context_id + return self._set(request, 'context', request.context_uuid.uuid, 'context_id', TOPIC_CONTEXT) def RemoveContext(self, request: ContextId, context : grpc.ServicerContext) -> Empty: LOGGER.info('[RemoveContext] request={:s}'.format(grpc_message_to_json_string(request))) - return del_entry(context, self.database, 'context', request.context_uuid.uuid) + return self._del(request, 'context', request.context_uuid.uuid, 'context_id', TOPIC_CONTEXT, context) def GetContextEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[ContextEvent]: LOGGER.info('[GetContextEvents] request={:s}'.format(grpc_message_to_json_string(request))) + for message in self.msg_broker.consume({TOPIC_CONTEXT}): yield ContextEvent(**json.loads(message.content)) # ----- Topology --------------------------------------------------------------------------------------------------- @@ -108,15 +146,18 @@ class MockServicerImpl_Context(ContextServiceServicer): def SetTopology(self, request: Topology, context : grpc.ServicerContext) -> TopologyId: LOGGER.info('[SetTopology] request={:s}'.format(grpc_message_to_json_string(request))) container_name = 'topology[{:s}]'.format(str(request.topology_id.context_id.context_uuid.uuid)) - return set_entry(self.database, container_name, request.topology_id.topology_uuid.uuid, request).topology_id + topology_uuid = request.topology_id.topology_uuid.uuid + return self._set(request, container_name, topology_uuid, 'topology_id', TOPIC_TOPOLOGY) def RemoveTopology(self, request: TopologyId, context : grpc.ServicerContext) -> Empty: LOGGER.info('[RemoveTopology] request={:s}'.format(grpc_message_to_json_string(request))) container_name = 'topology[{:s}]'.format(str(request.context_id.context_uuid.uuid)) - return del_entry(context, self.database, container_name, request.topology_uuid.uuid) + topology_uuid = request.topology_uuid.uuid + return self._del(request, container_name, topology_uuid, 'topology_id', TOPIC_TOPOLOGY, context) def GetTopologyEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[TopologyEvent]: LOGGER.info('[GetTopologyEvents] request={:s}'.format(grpc_message_to_json_string(request))) + for message in self.msg_broker.consume({TOPIC_TOPOLOGY}): yield TopologyEvent(**json.loads(message.content)) # ----- Device ----------------------------------------------------------------------------------------------------- @@ -135,14 +176,15 @@ class MockServicerImpl_Context(ContextServiceServicer): def SetDevice(self, request: Context, context : grpc.ServicerContext) -> DeviceId: LOGGER.info('[SetDevice] request={:s}'.format(grpc_message_to_json_string(request))) - return set_entry(self.database, 'device', request.device_id.device_uuid.uuid, request).device_id + return self._set(request, 'device', request.device_id.device_uuid.uuid, 'device_id', TOPIC_DEVICE) def RemoveDevice(self, request: DeviceId, context : grpc.ServicerContext) -> Empty: LOGGER.info('[RemoveDevice] request={:s}'.format(grpc_message_to_json_string(request))) - return del_entry(context, self.database, 'device', request.device_uuid.uuid) + return self._del(request, 'device', request.device_uuid.uuid, 'device_id', TOPIC_DEVICE, context) def GetDeviceEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[DeviceEvent]: LOGGER.info('[GetDeviceEvents] request={:s}'.format(grpc_message_to_json_string(request))) + for message in self.msg_broker.consume({TOPIC_DEVICE}): yield DeviceEvent(**json.loads(message.content)) # ----- Link ------------------------------------------------------------------------------------------------------- @@ -161,14 +203,15 @@ class MockServicerImpl_Context(ContextServiceServicer): def SetLink(self, request: Context, context : grpc.ServicerContext) -> LinkId: LOGGER.info('[SetLink] request={:s}'.format(grpc_message_to_json_string(request))) - return set_entry(self.database, 'link', request.link_id.link_uuid.uuid, request).link_id + return self._set(request, 'link', request.link_id.link_uuid.uuid, 'link_id', TOPIC_LINK) def RemoveLink(self, request: LinkId, context : grpc.ServicerContext) -> Empty: LOGGER.info('[RemoveLink] request={:s}'.format(grpc_message_to_json_string(request))) - return del_entry(context, self.database, 'link', request.link_uuid.uuid) + return self._del(request, 'link', request.link_uuid.uuid, 'link_id', TOPIC_LINK, context) def GetLinkEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[LinkEvent]: LOGGER.info('[GetLinkEvents] request={:s}'.format(grpc_message_to_json_string(request))) + for message in self.msg_broker.consume({TOPIC_LINK}): yield LinkEvent(**json.loads(message.content)) # ----- Slice ------------------------------------------------------------------------------------------------------ @@ -222,17 +265,19 @@ class MockServicerImpl_Context(ContextServiceServicer): def SetService(self, request: Service, context : grpc.ServicerContext) -> ServiceId: LOGGER.info('[SetService] request={:s}'.format(grpc_message_to_json_string(request))) - return set_entry( - self.database, 'service[{:s}]'.format(str(request.service_id.context_id.context_uuid.uuid)), - request.service_id.service_uuid.uuid, request).service_id + container_name = 'service[{:s}]'.format(str(request.service_id.context_id.context_uuid.uuid)) + service_uuid = request.service_id.service_uuid.uuid + return self._set(request, container_name, service_uuid, 'service_id', TOPIC_SERVICE) def RemoveService(self, request: ServiceId, context : grpc.ServicerContext) -> Empty: LOGGER.info('[RemoveService] request={:s}'.format(grpc_message_to_json_string(request))) container_name = 'service[{:s}]'.format(str(request.context_id.context_uuid.uuid)) - return del_entry(context, self.database, container_name, request.service_uuid.uuid) + service_uuid = request.service_id.service_uuid.uuid + return self._del(request, container_name, service_uuid, 'service_id', TOPIC_SERVICE, context) def GetServiceEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[ServiceEvent]: LOGGER.info('[GetServiceEvents] request={:s}'.format(grpc_message_to_json_string(request))) + for message in self.msg_broker.consume({TOPIC_SERVICE}): yield ServiceEvent(**json.loads(message.content)) # ----- Connection ------------------------------------------------------------------------------------------------- @@ -255,21 +300,21 @@ class MockServicerImpl_Context(ContextServiceServicer): def SetConnection(self, request: Connection, context : grpc.ServicerContext) -> ConnectionId: LOGGER.info('[SetConnection] request={:s}'.format(grpc_message_to_json_string(request))) - service_connection__container_name = 'service_connection[{:s}/{:s}]'.format( + container_name = 'service_connection[{:s}/{:s}]'.format( str(request.service_id.context_id.context_uuid.uuid), str(request.service_id.service_uuid.uuid)) - set_entry( - self.database, service_connection__container_name, request.connection_id.connection_uuid.uuid, request) - return set_entry( - self.database, 'connection', request.connection_id.connection_uuid.uuid, request).connection_id + connection_uuid = request.connection_id.connection_uuid.uuid + set_entry(self.database, container_name, connection_uuid, request) + return self._set(request, 'connection', connection_uuid, 'connection_id', TOPIC_CONNECTION) def RemoveConnection(self, request: ConnectionId, context : grpc.ServicerContext) -> Empty: LOGGER.info('[RemoveConnection] request={:s}'.format(grpc_message_to_json_string(request))) connection = get_entry(context, self.database, 'connection', request.connection_uuid.uuid) - service_id = connection.service_id - service_connection__container_name = 'service_connection[{:s}/{:s}]'.format( - str(service_id.context_id.context_uuid.uuid), str(service_id.service_uuid.uuid)) - del_entry(context, self.database, service_connection__container_name, request.connection_uuid.uuid) - return del_entry(context, self.database, 'connection', request.connection_uuid.uuid) + container_name = 'service_connection[{:s}/{:s}]'.format( + str(connection.service_id.context_id.context_uuid.uuid), str(connection.service_id.service_uuid.uuid)) + connection_uuid = request.connection_uuid.uuid + del_entry(context, self.database, container_name, connection_uuid) + return self._del(request, 'connection', connection_uuid, 'connection_id', TOPIC_CONNECTION, context) def GetConnectionEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[ConnectionEvent]: LOGGER.info('[GetConnectionEvents] request={:s}'.format(grpc_message_to_json_string(request))) + for message in self.msg_broker.consume({TOPIC_CONNECTION}): yield ConnectionEvent(**json.loads(message.content)) diff --git a/src/common/tests/MockServicerImpl_Service copy.py b/src/common/tests/MockServicerImpl_Service copy.py new file mode 100644 index 0000000000000000000000000000000000000000..3b5c769dd23553a29ba0f0b13925a442007e013c --- /dev/null +++ b/src/common/tests/MockServicerImpl_Service copy.py @@ -0,0 +1,108 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Any, Dict, Iterator, NamedTuple, Tuple +import grpc, logging +from common.tests.MockMessageBroker import MockMessageBroker +from common.tools.grpc.Tools import grpc_message_to_json_string +from context.proto.context_pb2 import Empty, TeraFlowController +from dlt.connector.proto.dlt_pb2 import ( + DltPeerStatus, DltPeerStatusList, DltRecord, DltRecordEvent, DltRecordId, DltRecordOperationEnum, DltRecordStatus, DltRecordSubscription, DltRecordTypeEnum) +from dlt.connector.proto.dlt_pb2_grpc import DltServiceServicer + +LOGGER = logging.getLogger(__name__) + +DltRecordKey = Tuple[str, DltRecordOperationEnum, str] # domain_uuid, operation, record_uuid +DltRecordDict = Dict[DltRecordKey, DltRecord] # dlt_record_key => dlt_record + +class MockServicerImpl_Dlt(DltServiceServicer): + def __init__(self): + LOGGER.info('[__init__] Creating Servicer...') + self.records : DltRecordDict = {} + self.msg_broker = MockMessageBroker() + LOGGER.info('[__init__] Servicer Created') + + def RecordToDlt(self, request : DltRecord, context : grpc.ServicerContext) -> DltRecordStatus: + LOGGER.info('[RecordToDlt] request={:s}'.format(grpc_message_to_json_string(request))) + operation = request.operation + domain_uuid = request.record_id.domain_uuid + record_uuid = request.record_id.record_uuid + + #if operation == + + + def GetFromDlt(self, request : DltRecordId, context : grpc.ServicerContext) -> DltRecord: + LOGGER.info('[GetFromDlt] request={:s}'.format(grpc_message_to_json_string(request))) + + def SubscribeToDlt(self, request: DltRecordSubscription, context : grpc.ServicerContext) -> Iterator[DltRecordEvent]: + LOGGER.info('[SubscribeToDlt] request={:s}'.format(grpc_message_to_json_string(request))) + for message in self.msg_broker.consume({TOPIC_CONTEXT}): yield ContextEvent(**json.loads(message.content)) + + def GetDltStatus(self, request : TeraFlowController, context : grpc.ServicerContext) -> DltPeerStatus: + LOGGER.info('[GetDltStatus] request={:s}'.format(grpc_message_to_json_string(request))) + + def GetDltPeers(self, request : Empty, context : grpc.ServicerContext) -> DltPeerStatusList: + LOGGER.info('[GetDltPeers] request={:s}'.format(grpc_message_to_json_string(request))) + + + + + + + + LOGGER.info('[__init__] Servicer Created') + + # ----- Common ----------------------------------------------------------------------------------------------------- + + def _set(self, request, container_name, entry_uuid, entry_id_field_name, topic_name): + exists = has_entry(self.database, container_name, entry_uuid) + entry = set_entry(self.database, container_name, entry_uuid, request) + event_type = EventTypeEnum.EVENTTYPE_UPDATE if exists else EventTypeEnum.EVENTTYPE_CREATE + entry_id = getattr(entry, entry_id_field_name) + dict_entry_id = grpc_message_to_json(entry_id) + notify_event(self.msg_broker, topic_name, event_type, {entry_id_field_name: dict_entry_id}) + return entry_id + + def _del(self, request, container_name, entry_uuid, entry_id_field_name, topic_name, grpc_context): + empty = del_entry(grpc_context, self.database, container_name, entry_uuid) + event_type = EventTypeEnum.EVENTTYPE_REMOVE + dict_entry_id = grpc_message_to_json(request) + notify_event(self.msg_broker, topic_name, event_type, {entry_id_field_name: dict_entry_id}) + return empty + + # ----- Context ---------------------------------------------------------------------------------------------------- + + def ListContextIds(self, request: Empty, context : grpc.ServicerContext) -> ContextIdList: + LOGGER.info('[ListContextIds] request={:s}'.format(grpc_message_to_json_string(request))) + return ContextIdList(context_ids=[context.context_id for context in get_entries(self.database, 'context')]) + + def ListContexts(self, request: Empty, context : grpc.ServicerContext) -> ContextList: + LOGGER.info('[ListContexts] request={:s}'.format(grpc_message_to_json_string(request))) + return ContextList(contexts=get_entries(self.database, 'context')) + + def GetContext(self, request: ContextId, context : grpc.ServicerContext) -> Context: + LOGGER.info('[GetContext] request={:s}'.format(grpc_message_to_json_string(request))) + return get_entry(context, self.database, 'context', request.context_uuid.uuid) + + def SetContext(self, request: Context, context : grpc.ServicerContext) -> ContextId: + LOGGER.info('[SetContext] request={:s}'.format(grpc_message_to_json_string(request))) + return self._set(request, 'context', request.context_uuid.uuid, 'context_id', TOPIC_CONTEXT) + + def RemoveContext(self, request: ContextId, context : grpc.ServicerContext) -> Empty: + LOGGER.info('[RemoveContext] request={:s}'.format(grpc_message_to_json_string(request))) + return self._del(request, 'context', request.context_uuid.uuid, 'context_id', TOPIC_CONTEXT, context) + + def GetContextEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[ContextEvent]: + LOGGER.info('[GetContextEvents] request={:s}'.format(grpc_message_to_json_string(request))) + for message in self.msg_broker.consume({TOPIC_CONTEXT}): yield ContextEvent(**json.loads(message.content)) diff --git a/src/dlt/connector/.gitlab-ci.yml b/src/dlt/connector/.gitlab-ci.yml new file mode 100644 index 0000000000000000000000000000000000000000..d62e8edad3ce8ceb1fa6e67f3213d761e36df012 --- /dev/null +++ b/src/dlt/connector/.gitlab-ci.yml @@ -0,0 +1,74 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Build, tag, and push the Docker images to the GitLab Docker registry +build slice: + variables: + IMAGE_NAME: 'slice' # name of the microservice + IMAGE_NAME_TEST: 'slice-test' # name of the microservice + IMAGE_TAG: 'latest' # tag of the container image (production, development, etc) + stage: build + before_script: + - docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY + script: + - docker build -t "$IMAGE_NAME:$IMAGE_TAG" -f ./src/$IMAGE_NAME/Dockerfile ./src/ + - docker tag "$IMAGE_NAME:$IMAGE_TAG" "$CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG" + - docker push "$CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG" + rules: + - changes: + - src/$IMAGE_NAME/** + - .gitlab-ci.yml + +# Pull, execute, and run unitary tests for the Docker image from the GitLab registry +unit_test slice: + variables: + IMAGE_NAME: 'slice' # name of the microservice + IMAGE_NAME_TEST: 'slice-test' # name of the microservice + IMAGE_TAG: 'latest' # tag of the container image (production, development, etc) + stage: unit_test + needs: + - build slice + before_script: + - docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY + - if docker network list | grep teraflowbridge; then echo "teraflowbridge is already created"; else docker network create -d bridge teraflowbridge; fi + script: + - docker pull "$CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG" + - docker run -d -p 4040:4040 --name $IMAGE_NAME --network=teraflowbridge "$IMAGE_NAME:$IMAGE_TAG" + - docker ps -a + - sleep 5 + - docker ps -a + - docker logs $IMAGE_NAME + - docker exec -i $IMAGE_NAME bash -c "pytest --log-level=DEBUG --verbose $IMAGE_NAME/tests/test_unitary.py" + after_script: + - docker stop $IMAGE_NAME + - docker rm $IMAGE_NAME + rules: + - changes: + - src/$IMAGE_NAME/** + - .gitlab-ci.yml + +# Deployment of the service in Kubernetes Cluster +deploy slice: + stage: deploy + needs: + - build slice + - unit_test slice + - dependencies all + - integ_test execute + script: + - kubectl version + - kubectl get all + - kubectl apply -f "manifests/sliceservice.yaml" + - kubectl delete pods --selector app=sliceservice + - kubectl get all diff --git a/src/dlt/connector/Dockerfile b/src/dlt/connector/Dockerfile new file mode 100644 index 0000000000000000000000000000000000000000..d653bb21778adbbd09407c1ca54f0afdc7ae5d81 --- /dev/null +++ b/src/dlt/connector/Dockerfile @@ -0,0 +1,38 @@ +FROM python:3-slim + +# Install dependencies +RUN apt-get --yes --quiet --quiet update && \ + apt-get --yes --quiet --quiet install wget g++ && \ + rm -rf /var/lib/apt/lists/* + +# Set Python to show logs as they occur +ENV PYTHONUNBUFFERED=0 + +# Download the gRPC health probe +RUN GRPC_HEALTH_PROBE_VERSION=v0.2.0 && \ + wget -qO/bin/grpc_health_probe https://github.com/grpc-ecosystem/grpc-health-probe/releases/download/${GRPC_HEALTH_PROBE_VERSION}/grpc_health_probe-linux-amd64 && \ + chmod +x /bin/grpc_health_probe + +# Get generic Python packages +RUN python3 -m pip install --upgrade pip setuptools wheel pip-tools + +# Set working directory +WORKDIR /var/teraflow + +# Create module sub-folders +RUN mkdir -p /var/teraflow/slice + +# Get Python packages per module +COPY slice/requirements.in slice/requirements.in +RUN pip-compile --output-file=slice/requirements.txt slice/requirements.in +RUN python3 -m pip install -r slice/requirements.in + +# Add files into working directory +COPY common/. common +COPY context/. context +COPY interdomain/. interdomain +COPY service/. service +COPY slice/. slice + +# Start slice service +ENTRYPOINT ["python", "-m", "slice.service"] diff --git a/src/dlt/connector/client/DltClient.py b/src/dlt/connector/client/DltClient.py new file mode 100644 index 0000000000000000000000000000000000000000..79d38d12d345357561dd8ea3a64de726c5a9c77e --- /dev/null +++ b/src/dlt/connector/client/DltClient.py @@ -0,0 +1,171 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import grpc, logging +from common.Constants import ServiceNameEnum +from common.Settings import get_service_host, get_service_port_grpc +from common.tools.client.RetryDecorator import retry, delay_exponential +#from common.tools.grpc.Tools import grpc_message_to_json_string +#from slice.proto.context_pb2 import Empty, Slice, SliceId +from dlt.connector.proto.dlt_pb2_grpc import DltServiceStub + +LOGGER = logging.getLogger(__name__) +MAX_RETRIES = 15 +DELAY_FUNCTION = delay_exponential(initial=0.01, increment=2.0, maximum=5.0) +RETRY_DECORATOR = retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, prepare_method_name='connect') + +class DltClient: + def __init__(self, host=None, port=None): + if not host: host = get_service_host(ServiceNameEnum.DLT) + if not port: port = get_service_port_grpc(ServiceNameEnum.DLT) + self.endpoint = '{:s}:{:s}'.format(str(host), str(port)) + LOGGER.debug('Creating channel to {:s}...'.format(self.endpoint)) + self.channel = None + self.stub = None + self.connect() + LOGGER.debug('Channel created') + + def connect(self): + self.channel = grpc.insecure_channel(self.endpoint) + self.stub = DltServiceStub(self.channel) + + def close(self): + if self.channel is not None: self.channel.close() + self.channel = None + self.stub = None + +# @RETRY_DECORATOR +# def CreateSlice(self, request : Slice) -> SliceId: +# LOGGER.debug('CreateSlice request: {:s}'.format(grpc_message_to_json_string(request))) +# response = self.stub.CreateSlice(request) +# LOGGER.debug('CreateSlice result: {:s}'.format(grpc_message_to_json_string(response))) +# return response + +# @RETRY_DECORATOR +# def UpdateSlice(self, request : Slice) -> SliceId: +# LOGGER.debug('UpdateSlice request: {:s}'.format(grpc_message_to_json_string(request))) +# response = self.stub.UpdateSlice(request) +# LOGGER.debug('UpdateSlice result: {:s}'.format(grpc_message_to_json_string(response))) +# return response + +# @RETRY_DECORATOR +# def DeleteSlice(self, request : SliceId) -> Empty: +# LOGGER.debug('DeleteSlice request: {:s}'.format(grpc_message_to_json_string(request))) +# response = self.stub.DeleteSlice(request) +# LOGGER.debug('DeleteSlice result: {:s}'.format(grpc_message_to_json_string(response))) +# return response + + + + + + + +// Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; +package dlt; + +import "context.proto"; + +service DltService { + rpc RecordToDlt (DltRecord ) returns ( DltRecordStatus ) {} + rpc GetFromDlt (DltRecordId ) returns ( DltRecord ) {} + rpc SubscribeToDlt(DltRecordSubscription ) returns (stream DltRecordEvent ) {} + rpc GetDltStatus (context.TeraFlowController) returns ( DltPeerStatus ) {} // NEC is checkig if it is possible + rpc GetDltPeers (context.Empty ) returns ( DltPeerStatusList) {} // NEC is checkig if it is possible +} + +enum DltRecordTypeEnum { + DLTRECORDTYPE_UNDEFINED = 0; + DLTRECORDTYPE_CONTEXT = 1; + DLTRECORDTYPE_TOPOLOGY = 2; + DLTRECORDTYPE_DEVICE = 3; + DLTRECORDTYPE_LINK = 4; + DLTRECORDTYPE_SERVICE = 5; + DLTRECORDTYPE_SLICE = 6; +} + +enum DltRecordOperationEnum { + DLTRECORDOPERATION_UNDEFINED = 0; + DLTRECORDOPERATION_ADD = 1; + DLTRECORDOPERATION_UPDATE = 2; + DLTRECORDOPERATION_DELETE = 3; +} + +enum DltRecordStatusEnum { + DLTRECORDSTATUS_UNDEFINED = 0; + DLTRECORDSTATUS_SUCCEEDED = 1; + DLTRECORDSTATUS_FAILED = 2; +} + +enum DltStatusEnum { + DLTSTATUS_UNDEFINED = 0; + DLTSTATUS_NOTAVAILABLE = 1; + DLTSTATUS_INITIALIZED = 2; + DLTSTATUS_AVAILABLE = 3; + DLTSTATUS_DEINIT = 4; +} + +message DltRecordId { + context.Uuid domain_uuid = 1; // unique identifier of domain owning the record + DltRecordTypeEnum type = 2; // type of record + context.Uuid record_uuid = 3; // unique identifier of the record within the domain context_uuid/topology_uuid +} + +message DltRecord { + DltRecordId record_id = 1; // record identifier + DltRecordOperationEnum operation = 2; // operation to be performed over the record + string data_json = 3; // record content: JSON-encoded record content +} + +message DltRecordSubscription { + // retrieved events have to match ALL conditions. + // i.e., type in types requested, AND operation in operations requested + // TODO: consider adding a more sophisticated filtering + repeated DltRecordTypeEnum type = 1; // selected event types, empty=all + repeated DltRecordOperationEnum operation = 2; // selected event operations, empty=all +} + +message DltRecordEvent { + context.Event event = 1; // common event data (timestamp & event_type) + DltRecordId record_id = 2; // record identifier associated with this event +} + +message DltRecordStatus { + DltRecordId record_id = 1; // identifier of the associated record + DltRecordStatusEnum status = 2; // status of the record + string error_message = 3; // error message in case of failure, empty otherwise +} + +message DltPeerStatus { + context.TeraFlowController controller = 1; // Identifier of the TeraFlow controller instance + DltStatusEnum status = 2; // Status of the TeraFlow controller instance +} + +message DltPeerStatusList { + repeated DltPeerStatus peers = 1; // List of peers and their status +} diff --git a/src/dlt/connector/service/DltConnector.py b/src/dlt/connector/service/DltConnector.py new file mode 100644 index 0000000000000000000000000000000000000000..b8f7e5311dcff4013c76ffe370076acd34e46521 --- /dev/null +++ b/src/dlt/connector/service/DltConnector.py @@ -0,0 +1,51 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging, threading +from common.tools.grpc.Tools import grpc_message_to_json_string +from context.client.ContextClient import ContextClient +from context.client.EventsCollector import EventsCollector +from dlt.connector.client.DltClient import DltClient + +LOGGER = logging.getLogger(__name__) + +class DltConnector: + def __init__(self) -> None: + LOGGER.debug('Creating connector...') + self._terminate = threading.Event() + self._thread = None + LOGGER.debug('Connector created') + + def start(self): + self._terminate.clear() + self._thread = threading.Thread(target=self._run_events_collector) + self._thread.start() + + def _run_events_collector(self) -> None: + dlt_client = DltClient() + context_client = ContextClient() + events_collector = EventsCollector(context_client) + events_collector.start() + + while not self._terminate.is_set(): + event = events_collector.get_event() + LOGGER.info('Event from Context Received: {:s}'.format(grpc_message_to_json_string(event))) + + events_collector.stop() + context_client.close() + dlt_client.close() + + def stop(self): + self._terminate.set() + self._thread.join() diff --git a/src/dlt/connector/service/DltConnectorService.py b/src/dlt/connector/service/DltConnectorService.py new file mode 100644 index 0000000000000000000000000000000000000000..de696e88c3bc9b72b733e3763e0fec79de199068 --- /dev/null +++ b/src/dlt/connector/service/DltConnectorService.py @@ -0,0 +1,28 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from common.Constants import ServiceNameEnum +from common.Settings import get_service_port_grpc +from common.tools.service.GenericGrpcService import GenericGrpcService +from ..proto.dlt_pb2_grpc import add_SliceServiceServicer_to_server +from .DltConnectorServiceServicerImpl import SliceServiceServicerImpl + +class DltConnectorService(GenericGrpcService): + def __init__(self, cls_name: str = __name__) -> None: + port = get_service_port_grpc(ServiceNameEnum.SLICE) + super().__init__(port, cls_name=cls_name) + self.slice_servicer = SliceServiceServicerImpl() + + def install_servicers(self): + add_SliceServiceServicer_to_server(self.slice_servicer, self.server) diff --git a/src/dlt/connector/service/DltConnectorServiceServicerImpl.py b/src/dlt/connector/service/DltConnectorServiceServicerImpl.py new file mode 100644 index 0000000000000000000000000000000000000000..a19c797eadebb2fa0f03e3c33c91e62781807ccb --- /dev/null +++ b/src/dlt/connector/service/DltConnectorServiceServicerImpl.py @@ -0,0 +1,33 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import grpc, logging +from common.rpc_method_wrapper.Decorator import create_metrics, safe_and_metered_rpc_method +from context.proto.context_pb2 import Slice, SliceId +from ..proto.dlt_pb2_grpc import DltServiceServicer + +LOGGER = logging.getLogger(__name__) + +SERVICE_NAME = 'DltConnector' +METHOD_NAMES = ['CreateSlice', 'UpdateSlice', 'DeleteSlice'] +METRICS = create_metrics(SERVICE_NAME, METHOD_NAMES) + +class DltConnectorServiceServicerImpl(DltServiceServicer): + def __init__(self): + LOGGER.debug('Creating Servicer...') + LOGGER.debug('Servicer Created') + + @safe_and_metered_rpc_method(METRICS, LOGGER) + def CreateSlice(self, request : Slice, context : grpc.ServicerContext) -> SliceId: + return SliceId() diff --git a/src/dlt/connector/service/__main__.py b/src/dlt/connector/service/__main__.py new file mode 100644 index 0000000000000000000000000000000000000000..435a93f61bf934a17d9c044756648176e9cb2d2d --- /dev/null +++ b/src/dlt/connector/service/__main__.py @@ -0,0 +1,65 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging, signal, sys, threading +from prometheus_client import start_http_server +from common.Constants import ServiceNameEnum +from common.Settings import ( + ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name, get_log_level, get_metrics_port, + wait_for_environment_variables) +from .DltConnectorService import DltConnectorService + +terminate = threading.Event() +LOGGER : logging.Logger = None + +def signal_handler(signal, frame): # pylint: disable=redefined-outer-name + LOGGER.warning('Terminate signal received') + terminate.set() + +def main(): + global LOGGER # pylint: disable=global-statement + + log_level = get_log_level() + logging.basicConfig(level=log_level) + LOGGER = logging.getLogger(__name__) + + wait_for_environment_variables([ + get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST ), + get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC), + ]) + + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + LOGGER.info('Starting...') + + # Start metrics server + metrics_port = get_metrics_port() + start_http_server(metrics_port) + + # Starting DLT connector service + grpc_service = DltConnectorService() + grpc_service.start() + + # Wait for Ctrl+C or termination signal + while not terminate.wait(timeout=0.1): pass + + LOGGER.info('Terminating...') + grpc_service.stop() + + LOGGER.info('Bye') + return 0 + +if __name__ == '__main__': + sys.exit(main()) diff --git a/src/dlt/connector/tests/test_unitary.py b/src/dlt/connector/tests/test_unitary.py new file mode 100644 index 0000000000000000000000000000000000000000..0f1fcb3eb28805fc207b3ea0dd84b161b6261f24 --- /dev/null +++ b/src/dlt/connector/tests/test_unitary.py @@ -0,0 +1,49 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +from typing import Tuple +from common.orm.Database import Database +from common.message_broker.MessageBroker import MessageBroker +from context.client.ContextClient import ContextClient +from context.client.EventsCollector import EventsCollector +from context.proto.context_pb2 import Context, ContextId, Device, DeviceId, Link, LinkId, Topology, TopologyId +from .PrepareTestScenario import context_db_mb, context_service, context_client # pylint: disable=unused-import +from .Objects import CONTEXTS, TOPOLOGIES, DEVICES, LINKS + +LOGGER = logging.getLogger(__name__) +LOGGER.setLevel(logging.DEBUG) + +def test_create_events( + context_client : ContextClient, # pylint: disable=redefined-outer-name + context_db_mb : Tuple[Database, MessageBroker]): # pylint: disable=redefined-outer-name + context_database = context_db_mb[0] + + context_database.clear_all() + + events_collector = EventsCollector(context_client) + events_collector.start() + events = events_collector.get_events(block=True, count=4) + events_collector.stop() + + for context in CONTEXTS : context_client.SetContext (Context (**context )) + for topology in TOPOLOGIES: context_client.SetTopology(Topology(**topology)) + for device in DEVICES : context_client.SetDevice (Device (**device )) + for link in LINKS : context_client.SetLink (Link (**link )) + + + for link in LINKS : context_client.RemoveLink (LinkId (**link ['link_id' ])) + for device in DEVICES : context_client.RemoveDevice (DeviceId (**device ['device_id' ])) + for topology in TOPOLOGIES: context_client.RemoveTopology(TopologyId(**topology['topology_id'])) + for context in CONTEXTS : context_client.RemoveContext (ContextId (**context ['context_id' ]))