diff --git a/genproto.sh b/genproto.sh new file mode 100755 index 0000000000000000000000000000000000000000..d7ffe50e6068266583b61b627431714970703a4b --- /dev/null +++ b/genproto.sh @@ -0,0 +1,7 @@ +#!/bin/bash -eu + +mkdir -p src/common/proto +rm -rf src/common/proto/*.py +touch src/common/proto/__init__.py +python3 -m grpc_tools.protoc -I=./proto --python_out=src/common/proto/ --grpc_python_out=src/common/proto/ proto/*.proto +find src/common/proto -type f -iname *.py -exec sed -i -E 's/(import\ .*)_pb2/from . \1_pb2/g' {} \; diff --git a/src/dlt/connector/.gitlab-ci.yml b/src/dlt/connector/.gitlab-ci.yml index d62e8edad3ce8ceb1fa6e67f3213d761e36df012..08c58ae4a935da7d8929b45923eedfcaa3053f75 100644 --- a/src/dlt/connector/.gitlab-ci.yml +++ b/src/dlt/connector/.gitlab-ci.yml @@ -16,7 +16,6 @@ build slice: variables: IMAGE_NAME: 'slice' # name of the microservice - IMAGE_NAME_TEST: 'slice-test' # name of the microservice IMAGE_TAG: 'latest' # tag of the container image (production, development, etc) stage: build before_script: @@ -34,7 +33,6 @@ build slice: unit_test slice: variables: IMAGE_NAME: 'slice' # name of the microservice - IMAGE_NAME_TEST: 'slice-test' # name of the microservice IMAGE_TAG: 'latest' # tag of the container image (production, development, etc) stage: unit_test needs: diff --git a/src/dlt/connector/client/DltEventsCollector.py b/src/dlt/connector/client/DltEventsCollector.py new file mode 100644 index 0000000000000000000000000000000000000000..6fe2474cead37094c507a8a612181dc7f7243544 --- /dev/null +++ b/src/dlt/connector/client/DltEventsCollector.py @@ -0,0 +1,72 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import grpc, logging, queue, threading +from common.proto.dlt_gateway_pb2 import DltRecordSubscription +from common.tools.grpc.Tools import grpc_message_to_json_string +from dlt.connector.client.DltGatewayClient import DltGatewayClient + +LOGGER = logging.getLogger(__name__) +LOGGER.setLevel(logging.DEBUG) + +class DltEventsCollector: + def __init__( + self, dltgateway_client : DltGatewayClient, + log_events_received : bool = False, + ) -> None: + self._events_queue = queue.Queue() + self._log_events_received = log_events_received + subscription = DltRecordSubscription() # bu default subscribe to all + self._dltgateway_stream = dltgateway_client.SubscribeToDlt(subscription) + self._dltgateway_thread = self._create_collector_thread(self._dltgateway_stream) + + def _create_collector_thread(self, stream, as_daemon : bool = False): + return threading.Thread(target=self._collect, args=(stream,), daemon=as_daemon) + + def _collect(self, events_stream) -> None: + try: + for event in events_stream: + if self._log_events_received: + LOGGER.info('[_collect] event: {:s}'.format(grpc_message_to_json_string(event))) + self._events_queue.put_nowait(event) + except grpc.RpcError as e: + if e.code() != grpc.StatusCode.CANCELLED: # pylint: disable=no-member + raise # pragma: no cover + + def start(self): + if self._dltgateway_thread is not None: self._dltgateway_thread.start() + + def get_event(self, block : bool = True, timeout : float = 0.1): + try: + return self._events_queue.get(block=block, timeout=timeout) + except queue.Empty: # pylint: disable=catching-non-exception + return None + + def get_events(self, block : bool = True, timeout : float = 0.1, count : int = None): + events = [] + if count is None: + while True: + event = self.get_event(block=block, timeout=timeout) + if event is None: break + events.append(event) + else: + for _ in range(count): + event = self.get_event(block=block, timeout=timeout) + if event is None: continue + events.append(event) + return sorted(events, key=lambda e: e.event.timestamp.timestamp) + + def stop(self): + if self._dltgateway_stream is not None: self._dltgateway_stream.cancel() + if self._dltgateway_thread is not None: self._dltgateway_thread.join() diff --git a/src/dlt/connector/main_test.py b/src/dlt/connector/main_test.py new file mode 100644 index 0000000000000000000000000000000000000000..a5ad9c33c0c1d7f218a01e2b83d4c1e6dfc6df6a --- /dev/null +++ b/src/dlt/connector/main_test.py @@ -0,0 +1,15 @@ +import sys +from .client.DltGatewayClient import DltGatewayClient +from .client.DltEventsCollector import DltEventsCollector + +def main(): + dltgateway_client_1 = DltGatewayClient(host='', port=0) + dltgateway_client_2 = DltGatewayClient(host='', port=0) + dltgateway_client_3 = DltGatewayClient(host='', port=0) + + dltgateway_collector_1 = DltEventsCollector(dltgateway_client_1, log_events_received=True) + dltgateway_collector_2 = DltEventsCollector(dltgateway_client_2, log_events_received=True) + dltgateway_collector_3 = DltEventsCollector(dltgateway_client_3, log_events_received=True) + +if __name__ == '__main__': + sys.exit(main()) diff --git a/src/dlt/connector/tests/MockService_Dependencies.py b/src/dlt/connector/tests/MockService_Dependencies.py new file mode 100644 index 0000000000000000000000000000000000000000..65ddc3cb48cb878b2ab5ba8b5ec44479b0b71451 --- /dev/null +++ b/src/dlt/connector/tests/MockService_Dependencies.py @@ -0,0 +1,38 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +from typing import Union +from common.Constants import ServiceNameEnum +from common.Settings import ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name +from common.proto.dlt_gateway_pb2_grpc import add_DltGatewayServiceServicer_to_server +from common.tests.MockServicerImpl_DltGateway import MockServicerImpl_DltGateway +from common.tools.service.GenericGrpcService import GenericGrpcService + +LOCAL_HOST = '127.0.0.1' + +SERVICE_DLT = ServiceNameEnum.DLT + +class MockService_Dependencies(GenericGrpcService): + def __init__(self, bind_port: Union[str, int]) -> None: + super().__init__(bind_port, LOCAL_HOST, enable_health_servicer=False, cls_name='MockService') + + # pylint: disable=attribute-defined-outside-init + def install_servicers(self): + self.dltgateway_servicer = MockServicerImpl_DltGateway() + add_DltGatewayServiceServicer_to_server(self.dltgateway_servicer, self.server) + + def configure_env_vars(self): + os.environ[get_env_var_name(SERVICE_DLT, ENVVAR_SUFIX_SERVICE_HOST )] = str(self.bind_address) + os.environ[get_env_var_name(SERVICE_DLT, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(self.bind_port) diff --git a/src/dlt/connector/tests/PrepareTestScenario.py b/src/dlt/connector/tests/PrepareTestScenario.py index 271deb72a8e302737e366c8607b70443a7478048..5c5d1cb5cc1c6868a5b47d929f026deecbe52f52 100644 --- a/src/dlt/connector/tests/PrepareTestScenario.py +++ b/src/dlt/connector/tests/PrepareTestScenario.py @@ -23,28 +23,87 @@ from common.message_broker.Factory import get_messagebroker_backend, BackendEnum from common.message_broker.MessageBroker import MessageBroker from context.client.ContextClient import ContextClient from context.service.grpc_server.ContextService import ContextService +from dlt.connector.client.DltConnectorClient import DltConnectorClient +from dlt.connector.service.DltConnectorService import DltConnectorService +from .MockService_Dependencies import MockService_Dependencies LOCAL_HOST = '127.0.0.1' -GRPC_PORT = 10000 + get_service_port_grpc(ServiceNameEnum.CONTEXT) # avoid privileged ports -os.environ[get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST )] = str(LOCAL_HOST) -os.environ[get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(GRPC_PORT) +MOCKSERVICE_PORT = 10000 +#GRPC_PORT = 10000 + get_service_port_grpc(ServiceNameEnum.CONTEXT) # avoid privileged ports +#os.environ[get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST )] = str(LOCAL_HOST) +#os.environ[get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(GRPC_PORT) + +# ===== BlockChain Emulator (Mock DLT Gateway) ========================================================================= +# A single gateway is used for all the domains + +@pytest.fixture(scope='session') +def dltgateway_service(): + _service = MockService_Dependencies(MOCKSERVICE_PORT) + _service.configure_env_vars() + _service.start() + yield _service + _service.stop() + +# ===== Domain A (Real Context + Real DLT Connector) =================================================================== + +@pytest.fixture(scope='session') +def context_service_a(): # pylint: disable=redefined-outer-name + _database = Database(get_database_backend(backend=DatabaseBackendEnum.INMEMORY)) + _message_broker = MessageBroker(get_messagebroker_backend(backend=MessageBrokerBackendEnum.INMEMORY)) + _service = ContextService(_database, _message_broker) + _service.start() + yield _service + _service.stop() + _message_broker.terminate() + +@pytest.fixture(scope='session') +def context_client_a(context_service_a : ContextService): # pylint: disable=redefined-outer-name + _client = ContextClient(host=context_service_a.bind_address, port=context_service_a.bind_port) + yield _client + _client.close() + +@pytest.fixture(scope='session') +def dltconnector_service_a(): + _service = DltConnectorService() + _service.bind_port += 1 + _service.start() + yield _service + _service.stop() + +@pytest.fixture(scope='session') +def dltconnector_client_a(dltconnector_service_a : DltConnectorService): # pylint: disable=redefined-outer-name + _client = DltConnectorClient(host=dltconnector_service_a.bind_address, port=dltconnector_service_a.bind_port) + yield _client + _client.close() + +# ===== Domain B (Real Context + Real DLT Connector) =================================================================== @pytest.fixture(scope='session') -def context_db_mb() -> Tuple[Database, MessageBroker]: +def context_service_b(): # pylint: disable=redefined-outer-name _database = Database(get_database_backend(backend=DatabaseBackendEnum.INMEMORY)) _message_broker = MessageBroker(get_messagebroker_backend(backend=MessageBrokerBackendEnum.INMEMORY)) - yield _database, _message_broker + _service = ContextService(_database, _message_broker) + _service.start() + yield _service + _service.stop() _message_broker.terminate() @pytest.fixture(scope='session') -def context_service(context_db_mb : Tuple[Database, MessageBroker]): # pylint: disable=redefined-outer-name - _service = ContextService(context_db_mb[0], context_db_mb[1]) +def context_client_b(context_service_b : ContextService): # pylint: disable=redefined-outer-name + _client = ContextClient(host=context_service_b.bind_address, port=context_service_b.bind_port) + yield _client + _client.close() + +@pytest.fixture(scope='session') +def dltconnector_service_b(): + _service = DltConnectorService() + _service.bind_port += 2 _service.start() yield _service _service.stop() @pytest.fixture(scope='session') -def context_client(context_service : ContextService): # pylint: disable=redefined-outer-name - _client = ContextClient() +def dltconnector_client_b(dltconnector_service_b : DltConnectorService): # pylint: disable=redefined-outer-name + _client = DltConnectorClient(host=dltconnector_service_b.bind_address, port=dltconnector_service_b.bind_port) yield _client _client.close() diff --git a/src/dlt/connector/tests/test_unitary.py b/src/dlt/connector/tests/test_unitary.py index 0f1fcb3eb28805fc207b3ea0dd84b161b6261f24..f5f54798ffe96de3e4c7e71c4083effceaa25cae 100644 --- a/src/dlt/connector/tests/test_unitary.py +++ b/src/dlt/connector/tests/test_unitary.py @@ -16,9 +16,8 @@ import logging from typing import Tuple from common.orm.Database import Database from common.message_broker.MessageBroker import MessageBroker +from common.proto.context_pb2 import Context, ContextId, Device, DeviceId, Link, LinkId, Topology, TopologyId from context.client.ContextClient import ContextClient -from context.client.EventsCollector import EventsCollector -from context.proto.context_pb2 import Context, ContextId, Device, DeviceId, Link, LinkId, Topology, TopologyId from .PrepareTestScenario import context_db_mb, context_service, context_client # pylint: disable=unused-import from .Objects import CONTEXTS, TOPOLOGIES, DEVICES, LINKS @@ -32,11 +31,6 @@ def test_create_events( context_database.clear_all() - events_collector = EventsCollector(context_client) - events_collector.start() - events = events_collector.get_events(block=True, count=4) - events_collector.stop() - for context in CONTEXTS : context_client.SetContext (Context (**context )) for topology in TOPOLOGIES: context_client.SetTopology(Topology(**topology)) for device in DEVICES : context_client.SetDevice (Device (**device ))