Skip to content
Snippets Groups Projects
Commit 692fc03e authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Common:

- Added backend for NATS message broker
- removed unneeded test script
parent 359705e3
No related branches found
No related tags found
2 merge requests!54Release 2.0.0,!34Context Scalability extensions using CockroachDB + Removal of Stateful database inside Device + other
......@@ -17,12 +17,14 @@ from typing import Optional, Union
from .backend._Backend import _Backend
from .backend.BackendEnum import BackendEnum
from .backend.inmemory.InMemoryBackend import InMemoryBackend
from .backend.nats.NatsBackend import NatsBackend
#from .backend.redis.RedisBackend import RedisBackend
LOGGER = logging.getLogger(__name__)
BACKENDS = {
BackendEnum.INMEMORY.value: InMemoryBackend,
BackendEnum.NATS.value: NatsBackend,
#BackendEnum.REDIS.value: RedisBackend,
#BackendEnum.KAFKA.value: KafkaBackend,
#BackendEnum.RABBITMQ.value: RabbitMQBackend,
......
......@@ -16,7 +16,8 @@ from enum import Enum
class BackendEnum(Enum):
INMEMORY = 'inmemory'
REDIS = 'redis'
NATS = 'nats'
#REDIS = 'redis'
#KAFKA = 'kafka'
#RABBITMQ = 'rabbitmq'
#ZEROMQ = 'zeromq'
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import queue, threading
from typing import Iterator, Set, Tuple
from common.Settings import get_setting
from common.message_broker.Message import Message
from .._Backend import _Backend
from .NatsBackendThread import NatsBackendThread
DEFAULT_NATS_URI = 'nats://127.0.0.1:4222'
class NatsBackend(_Backend):
def __init__(self, **settings) -> None: # pylint: disable=super-init-not-called
nats_uri = get_setting('NATS_URI', settings=settings, default=DEFAULT_NATS_URI)
self._terminate = threading.Event()
self._nats_backend_thread = NatsBackendThread(nats_uri)
self._nats_backend_thread.start()
def terminate(self) -> None:
self._terminate.set()
self._nats_backend_thread.terminate()
self._nats_backend_thread.join()
def publish(self, topic_name : str, message_content : str) -> None:
self._nats_backend_thread.publish(topic_name, message_content)
def consume(self, topic_names : Set[str], consume_timeout : float) -> Iterator[Tuple[str, str]]:
out_queue = queue.Queue[Message]()
unsubscribe = threading.Event()
for topic_name in topic_names:
self._nats_backend_thread.subscribe(topic_name, consume_timeout, out_queue, unsubscribe)
while not self._terminate.is_set():
try:
yield out_queue.get(block=True, timeout=consume_timeout)
except queue.Empty:
continue
unsubscribe.set()
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio, nats, nats.errors, queue, threading
from common.message_broker.Message import Message
class NatsBackendThread(threading.Thread):
def __init__(self, nats_uri : str) -> None:
self._nats_uri = nats_uri
self._event_loop = asyncio.get_event_loop()
self._terminate = asyncio.Event()
self._publish_queue = asyncio.Queue[Message]()
super().__init__()
def terminate(self) -> None:
self._terminate.set()
async def _run_publisher(self) -> None:
client = await nats.connect(servers=[self._nats_uri])
while not self._terminate.is_set():
message : Message = await self._publish_queue.get()
await client.publish(message.topic, message.content.encode('UTF-8'))
await client.drain()
def publish(self, topic_name : str, message_content : str) -> None:
self._publish_queue.put_nowait(Message(topic_name, message_content))
async def _run_subscriber(
self, topic_name : str, timeout : float, out_queue : queue.Queue[Message], unsubscribe : threading.Event
) -> None:
client = await nats.connect(servers=[self._nats_uri])
subscription = await client.subscribe(topic_name)
while not self._terminate.is_set() and not unsubscribe.is_set():
try:
message = await subscription.next_msg(timeout)
except nats.errors.TimeoutError:
continue
out_queue.put(Message(message.subject, message.data.decode('UTF-8')))
await subscription.unsubscribe()
await client.drain()
def subscribe(
self, topic_name : str, timeout : float, out_queue : queue.Queue[Message], unsubscribe : threading.Event
) -> None:
self._event_loop.create_task(self._run_subscriber(topic_name, timeout, out_queue, unsubscribe))
def run(self) -> None:
asyncio.set_event_loop(self._event_loop)
self._event_loop.create_task(self._run_publisher())
self._event_loop.run_until_complete(self._terminate.wait())
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#!/bin/bash
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
########################################################################################################################
# Define your deployment settings here
########################################################################################################################
# If not already set, set the name of the Kubernetes namespace to deploy to.
export TFS_K8S_NAMESPACE=${TFS_K8S_NAMESPACE:-"tfs"}
########################################################################################################################
# Automated steps start here
########################################################################################################################
PROJECTDIR=`pwd`
cd $PROJECTDIR/src
RCFILE=$PROJECTDIR/coverage/.coveragerc
COVERAGEFILE=$PROJECTDIR/coverage/.coverage
# Destroy old coverage file and configure the correct folder on the .coveragerc file
rm -f $COVERAGEFILE
cat $PROJECTDIR/coverage/.coveragerc.template | sed s+~/tfs-ctrl+$PROJECTDIR+g > $RCFILE
#export CRDB_URI="cockroachdb://tfs:tfs123@127.0.0.1:26257/tfs_test?sslmode=require"
export CRDB_URI="cockroachdb://tfs:tfs123@10.1.7.195:26257/tfs_test?sslmode=require"
export PYTHONPATH=/home/tfs/tfs-ctrl/src
# Run unitary tests and analyze coverage of code at same time
# helpful pytest flags: --log-level=INFO -o log_cli=true --verbose --maxfail=1 --durations=0
coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose --maxfail=1 \
context/tests/test_hasher.py \
context/tests/test_context.py \
context/tests/test_topology.py \
context/tests/test_device.py \
context/tests/test_link.py \
context/tests/test_service.py \
context/tests/test_slice.py \
context/tests/test_connection.py \
context/tests/test_policy.py
echo
echo "Coverage report:"
echo "----------------"
#coverage report --rcfile=$RCFILE --sort cover --show-missing --skip-covered | grep --color -E -i "^context/.*$|$"
coverage report --rcfile=$RCFILE --sort cover --show-missing --skip-covered --include="context/*"
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment