Loading src/common/tests/MockMessageBroker.py +20 −3 Original line number Diff line number Diff line Loading @@ -15,9 +15,24 @@ import json, logging, threading, time from queue import Queue, Empty from typing import Dict, Iterator, NamedTuple, Set from common.proto.context_pb2 import EventTypeEnum LOGGER = logging.getLogger(__name__) CONSUME_TIMEOUT = 0.1 # seconds TOPIC_CONNECTION = 'connection' TOPIC_CONTEXT = 'context' TOPIC_DEVICE = 'device' TOPIC_LINK = 'link' TOPIC_POLICY = 'policy' TOPIC_SERVICE = 'service' TOPIC_SLICE = 'slice' TOPIC_TOPOLOGY = 'topology' TOPICS = { TOPIC_CONNECTION, TOPIC_CONTEXT, TOPIC_DEVICE, TOPIC_LINK, TOPIC_POLICY, TOPIC_SERVICE, TOPIC_SLICE, TOPIC_TOPOLOGY } CONSUME_TIMEOUT = 0.5 # seconds class Message(NamedTuple): topic: str Loading Loading @@ -54,8 +69,10 @@ class MockMessageBroker: def terminate(self): self._terminate.set() def notify_event(messagebroker, topic_name, event_type, fields) -> None: event = {'event': {'timestamp': time.time(), 'event_type': event_type}} def notify_event( messagebroker : MockMessageBroker, topic_name : str, event_type : EventTypeEnum, fields : Dict[str, str] ) -> None: event = {'event': {'timestamp': {'timestamp': time.time()}, 'event_type': event_type}} for field_name, field_value in fields.items(): event[field_name] = field_value messagebroker.publish(Message(topic_name, json.dumps(event))) src/common/tests/MockServicerImpl_Context.py +3 −9 Original line number Diff line number Diff line Loading @@ -24,19 +24,13 @@ from common.proto.context_pb2 import ( Slice, SliceEvent, SliceId, SliceIdList, SliceList, Topology, TopologyEvent, TopologyId, TopologyIdList, TopologyList) from common.proto.context_pb2_grpc import ContextServiceServicer from common.tests.MockMessageBroker import MockMessageBroker, notify_event from common.tests.MockMessageBroker import ( TOPIC_CONNECTION, TOPIC_CONTEXT, TOPIC_DEVICE, TOPIC_LINK, TOPIC_SERVICE, TOPIC_SLICE, TOPIC_TOPOLOGY, MockMessageBroker, notify_event) from common.tools.grpc.Tools import grpc_message_to_json, grpc_message_to_json_string LOGGER = logging.getLogger(__name__) TOPIC_CONNECTION = 'connection' TOPIC_CONTEXT = 'context' TOPIC_TOPOLOGY = 'topology' TOPIC_DEVICE = 'device' TOPIC_LINK = 'link' TOPIC_SERVICE = 'service' TOPIC_SLICE = 'slice' def get_container(database : Dict[str, Dict[str, Any]], container_name : str) -> Dict[str, Any]: return database.setdefault(container_name, {}) Loading Loading
src/common/tests/MockMessageBroker.py +20 −3 Original line number Diff line number Diff line Loading @@ -15,9 +15,24 @@ import json, logging, threading, time from queue import Queue, Empty from typing import Dict, Iterator, NamedTuple, Set from common.proto.context_pb2 import EventTypeEnum LOGGER = logging.getLogger(__name__) CONSUME_TIMEOUT = 0.1 # seconds TOPIC_CONNECTION = 'connection' TOPIC_CONTEXT = 'context' TOPIC_DEVICE = 'device' TOPIC_LINK = 'link' TOPIC_POLICY = 'policy' TOPIC_SERVICE = 'service' TOPIC_SLICE = 'slice' TOPIC_TOPOLOGY = 'topology' TOPICS = { TOPIC_CONNECTION, TOPIC_CONTEXT, TOPIC_DEVICE, TOPIC_LINK, TOPIC_POLICY, TOPIC_SERVICE, TOPIC_SLICE, TOPIC_TOPOLOGY } CONSUME_TIMEOUT = 0.5 # seconds class Message(NamedTuple): topic: str Loading Loading @@ -54,8 +69,10 @@ class MockMessageBroker: def terminate(self): self._terminate.set() def notify_event(messagebroker, topic_name, event_type, fields) -> None: event = {'event': {'timestamp': time.time(), 'event_type': event_type}} def notify_event( messagebroker : MockMessageBroker, topic_name : str, event_type : EventTypeEnum, fields : Dict[str, str] ) -> None: event = {'event': {'timestamp': {'timestamp': time.time()}, 'event_type': event_type}} for field_name, field_value in fields.items(): event[field_name] = field_value messagebroker.publish(Message(topic_name, json.dumps(event)))
src/common/tests/MockServicerImpl_Context.py +3 −9 Original line number Diff line number Diff line Loading @@ -24,19 +24,13 @@ from common.proto.context_pb2 import ( Slice, SliceEvent, SliceId, SliceIdList, SliceList, Topology, TopologyEvent, TopologyId, TopologyIdList, TopologyList) from common.proto.context_pb2_grpc import ContextServiceServicer from common.tests.MockMessageBroker import MockMessageBroker, notify_event from common.tests.MockMessageBroker import ( TOPIC_CONNECTION, TOPIC_CONTEXT, TOPIC_DEVICE, TOPIC_LINK, TOPIC_SERVICE, TOPIC_SLICE, TOPIC_TOPOLOGY, MockMessageBroker, notify_event) from common.tools.grpc.Tools import grpc_message_to_json, grpc_message_to_json_string LOGGER = logging.getLogger(__name__) TOPIC_CONNECTION = 'connection' TOPIC_CONTEXT = 'context' TOPIC_TOPOLOGY = 'topology' TOPIC_DEVICE = 'device' TOPIC_LINK = 'link' TOPIC_SERVICE = 'service' TOPIC_SLICE = 'slice' def get_container(database : Dict[str, Dict[str, Any]], container_name : str) -> Dict[str, Any]: return database.setdefault(container_name, {}) Loading