From 692fc03ee9658b51f57f3c8004232101cbc7f18a Mon Sep 17 00:00:00 2001 From: gifrerenom <lluis.gifre@cttc.es> Date: Fri, 13 Jan 2023 15:29:44 +0000 Subject: [PATCH] Common: - Added backend for NATS message broker - removed unneeded test script --- src/common/message_broker/Factory.py | 2 + .../message_broker/backend/BackendEnum.py | 3 +- .../backend/nats/NatsBackend.py | 49 +++++++++++++++ .../backend/nats/NatsBackendThread.py | 61 +++++++++++++++++++ .../message_broker/backend/nats/__init__.py | 14 +++++ test-context.sh | 58 ------------------ 6 files changed, 128 insertions(+), 59 deletions(-) create mode 100644 src/common/message_broker/backend/nats/NatsBackend.py create mode 100644 src/common/message_broker/backend/nats/NatsBackendThread.py create mode 100644 src/common/message_broker/backend/nats/__init__.py delete mode 100755 test-context.sh diff --git a/src/common/message_broker/Factory.py b/src/common/message_broker/Factory.py index c5d48f9e1..e60118706 100644 --- a/src/common/message_broker/Factory.py +++ b/src/common/message_broker/Factory.py @@ -17,12 +17,14 @@ from typing import Optional, Union from .backend._Backend import _Backend from .backend.BackendEnum import BackendEnum from .backend.inmemory.InMemoryBackend import InMemoryBackend +from .backend.nats.NatsBackend import NatsBackend #from .backend.redis.RedisBackend import RedisBackend LOGGER = logging.getLogger(__name__) BACKENDS = { BackendEnum.INMEMORY.value: InMemoryBackend, + BackendEnum.NATS.value: NatsBackend, #BackendEnum.REDIS.value: RedisBackend, #BackendEnum.KAFKA.value: KafkaBackend, #BackendEnum.RABBITMQ.value: RabbitMQBackend, diff --git a/src/common/message_broker/backend/BackendEnum.py b/src/common/message_broker/backend/BackendEnum.py index bf95f1764..05dde8197 100644 --- a/src/common/message_broker/backend/BackendEnum.py +++ b/src/common/message_broker/backend/BackendEnum.py @@ -16,7 +16,8 @@ from enum import Enum class BackendEnum(Enum): INMEMORY = 'inmemory' - REDIS = 'redis' + NATS = 'nats' + #REDIS = 'redis' #KAFKA = 'kafka' #RABBITMQ = 'rabbitmq' #ZEROMQ = 'zeromq' diff --git a/src/common/message_broker/backend/nats/NatsBackend.py b/src/common/message_broker/backend/nats/NatsBackend.py new file mode 100644 index 000000000..0825095eb --- /dev/null +++ b/src/common/message_broker/backend/nats/NatsBackend.py @@ -0,0 +1,49 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import queue, threading +from typing import Iterator, Set, Tuple +from common.Settings import get_setting +from common.message_broker.Message import Message +from .._Backend import _Backend +from .NatsBackendThread import NatsBackendThread + +DEFAULT_NATS_URI = 'nats://127.0.0.1:4222' + +class NatsBackend(_Backend): + def __init__(self, **settings) -> None: # pylint: disable=super-init-not-called + nats_uri = get_setting('NATS_URI', settings=settings, default=DEFAULT_NATS_URI) + self._terminate = threading.Event() + self._nats_backend_thread = NatsBackendThread(nats_uri) + self._nats_backend_thread.start() + + def terminate(self) -> None: + self._terminate.set() + self._nats_backend_thread.terminate() + self._nats_backend_thread.join() + + def publish(self, topic_name : str, message_content : str) -> None: + self._nats_backend_thread.publish(topic_name, message_content) + + def consume(self, topic_names : Set[str], consume_timeout : float) -> Iterator[Tuple[str, str]]: + out_queue = queue.Queue[Message]() + unsubscribe = threading.Event() + for topic_name in topic_names: + self._nats_backend_thread.subscribe(topic_name, consume_timeout, out_queue, unsubscribe) + while not self._terminate.is_set(): + try: + yield out_queue.get(block=True, timeout=consume_timeout) + except queue.Empty: + continue + unsubscribe.set() diff --git a/src/common/message_broker/backend/nats/NatsBackendThread.py b/src/common/message_broker/backend/nats/NatsBackendThread.py new file mode 100644 index 000000000..e11ab7c04 --- /dev/null +++ b/src/common/message_broker/backend/nats/NatsBackendThread.py @@ -0,0 +1,61 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import asyncio, nats, nats.errors, queue, threading +from common.message_broker.Message import Message + +class NatsBackendThread(threading.Thread): + def __init__(self, nats_uri : str) -> None: + self._nats_uri = nats_uri + self._event_loop = asyncio.get_event_loop() + self._terminate = asyncio.Event() + self._publish_queue = asyncio.Queue[Message]() + super().__init__() + + def terminate(self) -> None: + self._terminate.set() + + async def _run_publisher(self) -> None: + client = await nats.connect(servers=[self._nats_uri]) + while not self._terminate.is_set(): + message : Message = await self._publish_queue.get() + await client.publish(message.topic, message.content.encode('UTF-8')) + await client.drain() + + def publish(self, topic_name : str, message_content : str) -> None: + self._publish_queue.put_nowait(Message(topic_name, message_content)) + + async def _run_subscriber( + self, topic_name : str, timeout : float, out_queue : queue.Queue[Message], unsubscribe : threading.Event + ) -> None: + client = await nats.connect(servers=[self._nats_uri]) + subscription = await client.subscribe(topic_name) + while not self._terminate.is_set() and not unsubscribe.is_set(): + try: + message = await subscription.next_msg(timeout) + except nats.errors.TimeoutError: + continue + out_queue.put(Message(message.subject, message.data.decode('UTF-8'))) + await subscription.unsubscribe() + await client.drain() + + def subscribe( + self, topic_name : str, timeout : float, out_queue : queue.Queue[Message], unsubscribe : threading.Event + ) -> None: + self._event_loop.create_task(self._run_subscriber(topic_name, timeout, out_queue, unsubscribe)) + + def run(self) -> None: + asyncio.set_event_loop(self._event_loop) + self._event_loop.create_task(self._run_publisher()) + self._event_loop.run_until_complete(self._terminate.wait()) diff --git a/src/common/message_broker/backend/nats/__init__.py b/src/common/message_broker/backend/nats/__init__.py new file mode 100644 index 000000000..70a332512 --- /dev/null +++ b/src/common/message_broker/backend/nats/__init__.py @@ -0,0 +1,14 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + diff --git a/test-context.sh b/test-context.sh deleted file mode 100755 index 212ce5bbe..000000000 --- a/test-context.sh +++ /dev/null @@ -1,58 +0,0 @@ -#!/bin/bash -# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -######################################################################################################################## -# Define your deployment settings here -######################################################################################################################## - -# If not already set, set the name of the Kubernetes namespace to deploy to. -export TFS_K8S_NAMESPACE=${TFS_K8S_NAMESPACE:-"tfs"} - -######################################################################################################################## -# Automated steps start here -######################################################################################################################## - -PROJECTDIR=`pwd` - -cd $PROJECTDIR/src -RCFILE=$PROJECTDIR/coverage/.coveragerc -COVERAGEFILE=$PROJECTDIR/coverage/.coverage - -# Destroy old coverage file and configure the correct folder on the .coveragerc file -rm -f $COVERAGEFILE -cat $PROJECTDIR/coverage/.coveragerc.template | sed s+~/tfs-ctrl+$PROJECTDIR+g > $RCFILE - -#export CRDB_URI="cockroachdb://tfs:tfs123@127.0.0.1:26257/tfs_test?sslmode=require" -export CRDB_URI="cockroachdb://tfs:tfs123@10.1.7.195:26257/tfs_test?sslmode=require" -export PYTHONPATH=/home/tfs/tfs-ctrl/src - -# Run unitary tests and analyze coverage of code at same time -# helpful pytest flags: --log-level=INFO -o log_cli=true --verbose --maxfail=1 --durations=0 -coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose --maxfail=1 \ - context/tests/test_hasher.py \ - context/tests/test_context.py \ - context/tests/test_topology.py \ - context/tests/test_device.py \ - context/tests/test_link.py \ - context/tests/test_service.py \ - context/tests/test_slice.py \ - context/tests/test_connection.py \ - context/tests/test_policy.py - -echo -echo "Coverage report:" -echo "----------------" -#coverage report --rcfile=$RCFILE --sort cover --show-missing --skip-covered | grep --color -E -i "^context/.*$|$" -coverage report --rcfile=$RCFILE --sort cover --show-missing --skip-covered --include="context/*" -- GitLab