Skip to content
Snippets Groups Projects
Commit 2b9be99a authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

DLT connector: intermediate backup

parent 23157444
No related branches found
No related tags found
2 merge requests!54Release 2.0.0,!16DLT component (and related) improvements
#!/bin/bash -eu
mkdir -p src/common/proto
rm -rf src/common/proto/*.py
touch src/common/proto/__init__.py
python3 -m grpc_tools.protoc -I=./proto --python_out=src/common/proto/ --grpc_python_out=src/common/proto/ proto/*.proto
find src/common/proto -type f -iname *.py -exec sed -i -E 's/(import\ .*)_pb2/from . \1_pb2/g' {} \;
...@@ -16,7 +16,6 @@ ...@@ -16,7 +16,6 @@
build slice: build slice:
variables: variables:
IMAGE_NAME: 'slice' # name of the microservice IMAGE_NAME: 'slice' # name of the microservice
IMAGE_NAME_TEST: 'slice-test' # name of the microservice
IMAGE_TAG: 'latest' # tag of the container image (production, development, etc) IMAGE_TAG: 'latest' # tag of the container image (production, development, etc)
stage: build stage: build
before_script: before_script:
...@@ -34,7 +33,6 @@ build slice: ...@@ -34,7 +33,6 @@ build slice:
unit_test slice: unit_test slice:
variables: variables:
IMAGE_NAME: 'slice' # name of the microservice IMAGE_NAME: 'slice' # name of the microservice
IMAGE_NAME_TEST: 'slice-test' # name of the microservice
IMAGE_TAG: 'latest' # tag of the container image (production, development, etc) IMAGE_TAG: 'latest' # tag of the container image (production, development, etc)
stage: unit_test stage: unit_test
needs: needs:
......
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import grpc, logging, queue, threading
from common.proto.dlt_gateway_pb2 import DltRecordSubscription
from common.tools.grpc.Tools import grpc_message_to_json_string
from dlt.connector.client.DltGatewayClient import DltGatewayClient
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)
class DltEventsCollector:
def __init__(
self, dltgateway_client : DltGatewayClient,
log_events_received : bool = False,
) -> None:
self._events_queue = queue.Queue()
self._log_events_received = log_events_received
subscription = DltRecordSubscription() # bu default subscribe to all
self._dltgateway_stream = dltgateway_client.SubscribeToDlt(subscription)
self._dltgateway_thread = self._create_collector_thread(self._dltgateway_stream)
def _create_collector_thread(self, stream, as_daemon : bool = False):
return threading.Thread(target=self._collect, args=(stream,), daemon=as_daemon)
def _collect(self, events_stream) -> None:
try:
for event in events_stream:
if self._log_events_received:
LOGGER.info('[_collect] event: {:s}'.format(grpc_message_to_json_string(event)))
self._events_queue.put_nowait(event)
except grpc.RpcError as e:
if e.code() != grpc.StatusCode.CANCELLED: # pylint: disable=no-member
raise # pragma: no cover
def start(self):
if self._dltgateway_thread is not None: self._dltgateway_thread.start()
def get_event(self, block : bool = True, timeout : float = 0.1):
try:
return self._events_queue.get(block=block, timeout=timeout)
except queue.Empty: # pylint: disable=catching-non-exception
return None
def get_events(self, block : bool = True, timeout : float = 0.1, count : int = None):
events = []
if count is None:
while True:
event = self.get_event(block=block, timeout=timeout)
if event is None: break
events.append(event)
else:
for _ in range(count):
event = self.get_event(block=block, timeout=timeout)
if event is None: continue
events.append(event)
return sorted(events, key=lambda e: e.event.timestamp.timestamp)
def stop(self):
if self._dltgateway_stream is not None: self._dltgateway_stream.cancel()
if self._dltgateway_thread is not None: self._dltgateway_thread.join()
import sys
from .client.DltGatewayClient import DltGatewayClient
from .client.DltEventsCollector import DltEventsCollector
def main():
dltgateway_client_1 = DltGatewayClient(host='', port=0)
dltgateway_client_2 = DltGatewayClient(host='', port=0)
dltgateway_client_3 = DltGatewayClient(host='', port=0)
dltgateway_collector_1 = DltEventsCollector(dltgateway_client_1, log_events_received=True)
dltgateway_collector_2 = DltEventsCollector(dltgateway_client_2, log_events_received=True)
dltgateway_collector_3 = DltEventsCollector(dltgateway_client_3, log_events_received=True)
if __name__ == '__main__':
sys.exit(main())
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from typing import Union
from common.Constants import ServiceNameEnum
from common.Settings import ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name
from common.proto.dlt_gateway_pb2_grpc import add_DltGatewayServiceServicer_to_server
from common.tests.MockServicerImpl_DltGateway import MockServicerImpl_DltGateway
from common.tools.service.GenericGrpcService import GenericGrpcService
LOCAL_HOST = '127.0.0.1'
SERVICE_DLT = ServiceNameEnum.DLT
class MockService_Dependencies(GenericGrpcService):
def __init__(self, bind_port: Union[str, int]) -> None:
super().__init__(bind_port, LOCAL_HOST, enable_health_servicer=False, cls_name='MockService')
# pylint: disable=attribute-defined-outside-init
def install_servicers(self):
self.dltgateway_servicer = MockServicerImpl_DltGateway()
add_DltGatewayServiceServicer_to_server(self.dltgateway_servicer, self.server)
def configure_env_vars(self):
os.environ[get_env_var_name(SERVICE_DLT, ENVVAR_SUFIX_SERVICE_HOST )] = str(self.bind_address)
os.environ[get_env_var_name(SERVICE_DLT, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(self.bind_port)
...@@ -23,28 +23,87 @@ from common.message_broker.Factory import get_messagebroker_backend, BackendEnum ...@@ -23,28 +23,87 @@ from common.message_broker.Factory import get_messagebroker_backend, BackendEnum
from common.message_broker.MessageBroker import MessageBroker from common.message_broker.MessageBroker import MessageBroker
from context.client.ContextClient import ContextClient from context.client.ContextClient import ContextClient
from context.service.grpc_server.ContextService import ContextService from context.service.grpc_server.ContextService import ContextService
from dlt.connector.client.DltConnectorClient import DltConnectorClient
from dlt.connector.service.DltConnectorService import DltConnectorService
from .MockService_Dependencies import MockService_Dependencies
LOCAL_HOST = '127.0.0.1' LOCAL_HOST = '127.0.0.1'
GRPC_PORT = 10000 + get_service_port_grpc(ServiceNameEnum.CONTEXT) # avoid privileged ports MOCKSERVICE_PORT = 10000
os.environ[get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST )] = str(LOCAL_HOST) #GRPC_PORT = 10000 + get_service_port_grpc(ServiceNameEnum.CONTEXT) # avoid privileged ports
os.environ[get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(GRPC_PORT) #os.environ[get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST )] = str(LOCAL_HOST)
#os.environ[get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(GRPC_PORT)
# ===== BlockChain Emulator (Mock DLT Gateway) =========================================================================
# A single gateway is used for all the domains
@pytest.fixture(scope='session')
def dltgateway_service():
_service = MockService_Dependencies(MOCKSERVICE_PORT)
_service.configure_env_vars()
_service.start()
yield _service
_service.stop()
# ===== Domain A (Real Context + Real DLT Connector) ===================================================================
@pytest.fixture(scope='session')
def context_service_a(): # pylint: disable=redefined-outer-name
_database = Database(get_database_backend(backend=DatabaseBackendEnum.INMEMORY))
_message_broker = MessageBroker(get_messagebroker_backend(backend=MessageBrokerBackendEnum.INMEMORY))
_service = ContextService(_database, _message_broker)
_service.start()
yield _service
_service.stop()
_message_broker.terminate()
@pytest.fixture(scope='session')
def context_client_a(context_service_a : ContextService): # pylint: disable=redefined-outer-name
_client = ContextClient(host=context_service_a.bind_address, port=context_service_a.bind_port)
yield _client
_client.close()
@pytest.fixture(scope='session')
def dltconnector_service_a():
_service = DltConnectorService()
_service.bind_port += 1
_service.start()
yield _service
_service.stop()
@pytest.fixture(scope='session')
def dltconnector_client_a(dltconnector_service_a : DltConnectorService): # pylint: disable=redefined-outer-name
_client = DltConnectorClient(host=dltconnector_service_a.bind_address, port=dltconnector_service_a.bind_port)
yield _client
_client.close()
# ===== Domain B (Real Context + Real DLT Connector) ===================================================================
@pytest.fixture(scope='session') @pytest.fixture(scope='session')
def context_db_mb() -> Tuple[Database, MessageBroker]: def context_service_b(): # pylint: disable=redefined-outer-name
_database = Database(get_database_backend(backend=DatabaseBackendEnum.INMEMORY)) _database = Database(get_database_backend(backend=DatabaseBackendEnum.INMEMORY))
_message_broker = MessageBroker(get_messagebroker_backend(backend=MessageBrokerBackendEnum.INMEMORY)) _message_broker = MessageBroker(get_messagebroker_backend(backend=MessageBrokerBackendEnum.INMEMORY))
yield _database, _message_broker _service = ContextService(_database, _message_broker)
_service.start()
yield _service
_service.stop()
_message_broker.terminate() _message_broker.terminate()
@pytest.fixture(scope='session') @pytest.fixture(scope='session')
def context_service(context_db_mb : Tuple[Database, MessageBroker]): # pylint: disable=redefined-outer-name def context_client_b(context_service_b : ContextService): # pylint: disable=redefined-outer-name
_service = ContextService(context_db_mb[0], context_db_mb[1]) _client = ContextClient(host=context_service_b.bind_address, port=context_service_b.bind_port)
yield _client
_client.close()
@pytest.fixture(scope='session')
def dltconnector_service_b():
_service = DltConnectorService()
_service.bind_port += 2
_service.start() _service.start()
yield _service yield _service
_service.stop() _service.stop()
@pytest.fixture(scope='session') @pytest.fixture(scope='session')
def context_client(context_service : ContextService): # pylint: disable=redefined-outer-name def dltconnector_client_b(dltconnector_service_b : DltConnectorService): # pylint: disable=redefined-outer-name
_client = ContextClient() _client = DltConnectorClient(host=dltconnector_service_b.bind_address, port=dltconnector_service_b.bind_port)
yield _client yield _client
_client.close() _client.close()
...@@ -16,9 +16,8 @@ import logging ...@@ -16,9 +16,8 @@ import logging
from typing import Tuple from typing import Tuple
from common.orm.Database import Database from common.orm.Database import Database
from common.message_broker.MessageBroker import MessageBroker from common.message_broker.MessageBroker import MessageBroker
from common.proto.context_pb2 import Context, ContextId, Device, DeviceId, Link, LinkId, Topology, TopologyId
from context.client.ContextClient import ContextClient from context.client.ContextClient import ContextClient
from context.client.EventsCollector import EventsCollector
from context.proto.context_pb2 import Context, ContextId, Device, DeviceId, Link, LinkId, Topology, TopologyId
from .PrepareTestScenario import context_db_mb, context_service, context_client # pylint: disable=unused-import from .PrepareTestScenario import context_db_mb, context_service, context_client # pylint: disable=unused-import
from .Objects import CONTEXTS, TOPOLOGIES, DEVICES, LINKS from .Objects import CONTEXTS, TOPOLOGIES, DEVICES, LINKS
...@@ -32,11 +31,6 @@ def test_create_events( ...@@ -32,11 +31,6 @@ def test_create_events(
context_database.clear_all() context_database.clear_all()
events_collector = EventsCollector(context_client)
events_collector.start()
events = events_collector.get_events(block=True, count=4)
events_collector.stop()
for context in CONTEXTS : context_client.SetContext (Context (**context )) for context in CONTEXTS : context_client.SetContext (Context (**context ))
for topology in TOPOLOGIES: context_client.SetTopology(Topology(**topology)) for topology in TOPOLOGIES: context_client.SetTopology(Topology(**topology))
for device in DEVICES : context_client.SetDevice (Device (**device )) for device in DEVICES : context_client.SetDevice (Device (**device ))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment