Skip to content
Snippets Groups Projects
Commit f30a22a7 authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Intermediate backup of DLT connector (not functional)

parent 959136e5
No related branches found
No related tags found
2 merge requests!54Release 2.0.0,!16DLT component (and related) improvements
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json, logging, threading, time
from queue import Queue, Empty
from typing import Dict, Iterator, NamedTuple, Set
LOGGER = logging.getLogger(__name__)
CONSUME_TIMEOUT = 0.1 # seconds
class Message(NamedTuple):
topic: str
content: str
class MockMessageBroker:
def __init__(self):
self._terminate = threading.Event()
self._topic__to__queues : Dict[str, Set[Queue]] = {}
def publish(self, message : Message) -> None:
queues = self._topic__to__queues.get(message.topic, None)
if queues is None: return
for queue in queues: queue.put_nowait((message.topic, message.content))
def consume(
self, topic_names : Set[str], block : bool = True, consume_timeout : float = CONSUME_TIMEOUT
) -> Iterator[Message]:
queue = Queue()
for topic_name in topic_names:
self._topic__to__queues.setdefault(topic_name, set()).add(queue)
while not self._terminate.is_set():
try:
message = queue.get(block=block, timeout=consume_timeout)
except Empty:
continue
if message is None: continue
yield Message(*message)
for topic_name in topic_names:
self._topic__to__queues.get(topic_name, set()).discard(queue)
def terminate(self):
self._terminate.set()
def notify_event(messagebroker, topic_name, event_type, fields) -> None:
event = {'event': {'timestamp': time.time(), 'event_type': event_type}}
for field_name, field_value in fields.items():
event[field_name] = field_value
messagebroker.publish(Message(topic_name, json.dumps(event)))
...@@ -12,18 +12,31 @@ ...@@ -12,18 +12,31 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import grpc, logging import grpc, json, logging
from typing import Any, Dict, Iterator, List from typing import Any, Dict, Iterator, List
from common.tools.grpc.Tools import grpc_message_to_json_string from common.tests.MockMessageBroker import MockMessageBroker, notify_event
from common.tools.grpc.Tools import grpc_message_to_json, grpc_message_to_json_string
from context.proto.context_pb2 import ( from context.proto.context_pb2 import (
Connection, ConnectionEvent, ConnectionId, ConnectionIdList, ConnectionList, Context, ContextEvent, ContextId, Connection, ConnectionEvent, ConnectionId, ConnectionIdList, ConnectionList,
ContextIdList, ContextList, Device, DeviceEvent, DeviceId, DeviceIdList, DeviceList, Empty, Link, LinkEvent, Context, ContextEvent, ContextId, ContextIdList, ContextList,
LinkId, LinkIdList, LinkList, Service, ServiceEvent, ServiceId, ServiceIdList, ServiceList, Slice, SliceEvent, Device, DeviceEvent, DeviceId, DeviceIdList, DeviceList,
SliceId, SliceIdList, SliceList, Topology, TopologyEvent, TopologyId, TopologyIdList, TopologyList) Empty, EventTypeEnum,
Link, LinkEvent, LinkId, LinkIdList, LinkList,
Service, ServiceEvent, ServiceId, ServiceIdList, ServiceList,
Slice, SliceEvent, SliceId, SliceIdList, SliceList,
Topology, TopologyEvent, TopologyId, TopologyIdList, TopologyList)
from context.proto.context_pb2_grpc import ContextServiceServicer from context.proto.context_pb2_grpc import ContextServiceServicer
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
TOPIC_CONNECTION = 'connection'
TOPIC_CONTEXT = 'context'
TOPIC_TOPOLOGY = 'topology'
TOPIC_DEVICE = 'device'
TOPIC_LINK = 'link'
TOPIC_SERVICE = 'service'
TOPIC_SLICE = 'slice'
def get_container(database : Dict[str, Dict[str, Any]], container_name : str) -> Dict[str, Any]: def get_container(database : Dict[str, Dict[str, Any]], container_name : str) -> Dict[str, Any]:
return database.setdefault(container_name, {}) return database.setdefault(container_name, {})
...@@ -31,10 +44,15 @@ def get_entries(database : Dict[str, Dict[str, Any]], container_name : str) -> L ...@@ -31,10 +44,15 @@ def get_entries(database : Dict[str, Dict[str, Any]], container_name : str) -> L
container = get_container(database, container_name) container = get_container(database, container_name)
return [container[entry_uuid] for entry_uuid in sorted(container.keys())] return [container[entry_uuid] for entry_uuid in sorted(container.keys())]
def has_entry(database : Dict[str, Dict[str, Any]], container_name : str, entry_uuid : str) -> Any:
LOGGER.debug('[has_entry] BEFORE database={:s}'.format(str(database)))
container = get_container(database, container_name)
return entry_uuid in container
def get_entry( def get_entry(
context : grpc.ServicerContext, database : Dict[str, Dict[str, Any]], container_name : str, entry_uuid : str context : grpc.ServicerContext, database : Dict[str, Dict[str, Any]], container_name : str, entry_uuid : str
) -> Any: ) -> Any:
LOGGER.debug('[get_entry] AFTER database={:s}'.format(str(database))) LOGGER.debug('[get_entry] BEFORE database={:s}'.format(str(database)))
container = get_container(database, container_name) container = get_container(database, container_name)
if entry_uuid not in container: if entry_uuid not in container:
context.abort(grpc.StatusCode.NOT_FOUND, str('{:s}({:s}) not found'.format(container_name, entry_uuid))) context.abort(grpc.StatusCode.NOT_FOUND, str('{:s}({:s}) not found'.format(container_name, entry_uuid)))
...@@ -60,8 +78,27 @@ class MockServicerImpl_Context(ContextServiceServicer): ...@@ -60,8 +78,27 @@ class MockServicerImpl_Context(ContextServiceServicer):
def __init__(self): def __init__(self):
LOGGER.info('[__init__] Creating Servicer...') LOGGER.info('[__init__] Creating Servicer...')
self.database : Dict[str, Any] = {} self.database : Dict[str, Any] = {}
self.msg_broker = MockMessageBroker()
LOGGER.info('[__init__] Servicer Created') LOGGER.info('[__init__] Servicer Created')
# ----- Common -----------------------------------------------------------------------------------------------------
def _set(self, request, container_name, entry_uuid, entry_id_field_name, topic_name):
exists = has_entry(self.database, container_name, entry_uuid)
entry = set_entry(self.database, container_name, entry_uuid, request)
event_type = EventTypeEnum.EVENTTYPE_UPDATE if exists else EventTypeEnum.EVENTTYPE_CREATE
entry_id = getattr(entry, entry_id_field_name)
dict_entry_id = grpc_message_to_json(entry_id)
notify_event(self.msg_broker, topic_name, event_type, {entry_id_field_name: dict_entry_id})
return entry_id
def _del(self, request, container_name, entry_uuid, entry_id_field_name, topic_name, grpc_context):
empty = del_entry(grpc_context, self.database, container_name, entry_uuid)
event_type = EventTypeEnum.EVENTTYPE_REMOVE
dict_entry_id = grpc_message_to_json(request)
notify_event(self.msg_broker, topic_name, event_type, {entry_id_field_name: dict_entry_id})
return empty
# ----- Context ---------------------------------------------------------------------------------------------------- # ----- Context ----------------------------------------------------------------------------------------------------
def ListContextIds(self, request: Empty, context : grpc.ServicerContext) -> ContextIdList: def ListContextIds(self, request: Empty, context : grpc.ServicerContext) -> ContextIdList:
...@@ -78,14 +115,15 @@ class MockServicerImpl_Context(ContextServiceServicer): ...@@ -78,14 +115,15 @@ class MockServicerImpl_Context(ContextServiceServicer):
def SetContext(self, request: Context, context : grpc.ServicerContext) -> ContextId: def SetContext(self, request: Context, context : grpc.ServicerContext) -> ContextId:
LOGGER.info('[SetContext] request={:s}'.format(grpc_message_to_json_string(request))) LOGGER.info('[SetContext] request={:s}'.format(grpc_message_to_json_string(request)))
return set_entry(self.database, 'context', request.context_id.context_uuid.uuid, request).context_id return self._set(request, 'context', request.context_uuid.uuid, 'context_id', TOPIC_CONTEXT)
def RemoveContext(self, request: ContextId, context : grpc.ServicerContext) -> Empty: def RemoveContext(self, request: ContextId, context : grpc.ServicerContext) -> Empty:
LOGGER.info('[RemoveContext] request={:s}'.format(grpc_message_to_json_string(request))) LOGGER.info('[RemoveContext] request={:s}'.format(grpc_message_to_json_string(request)))
return del_entry(context, self.database, 'context', request.context_uuid.uuid) return self._del(request, 'context', request.context_uuid.uuid, 'context_id', TOPIC_CONTEXT, context)
def GetContextEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[ContextEvent]: def GetContextEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[ContextEvent]:
LOGGER.info('[GetContextEvents] request={:s}'.format(grpc_message_to_json_string(request))) LOGGER.info('[GetContextEvents] request={:s}'.format(grpc_message_to_json_string(request)))
for message in self.msg_broker.consume({TOPIC_CONTEXT}): yield ContextEvent(**json.loads(message.content))
# ----- Topology --------------------------------------------------------------------------------------------------- # ----- Topology ---------------------------------------------------------------------------------------------------
...@@ -108,15 +146,18 @@ class MockServicerImpl_Context(ContextServiceServicer): ...@@ -108,15 +146,18 @@ class MockServicerImpl_Context(ContextServiceServicer):
def SetTopology(self, request: Topology, context : grpc.ServicerContext) -> TopologyId: def SetTopology(self, request: Topology, context : grpc.ServicerContext) -> TopologyId:
LOGGER.info('[SetTopology] request={:s}'.format(grpc_message_to_json_string(request))) LOGGER.info('[SetTopology] request={:s}'.format(grpc_message_to_json_string(request)))
container_name = 'topology[{:s}]'.format(str(request.topology_id.context_id.context_uuid.uuid)) container_name = 'topology[{:s}]'.format(str(request.topology_id.context_id.context_uuid.uuid))
return set_entry(self.database, container_name, request.topology_id.topology_uuid.uuid, request).topology_id topology_uuid = request.topology_id.topology_uuid.uuid
return self._set(request, container_name, topology_uuid, 'topology_id', TOPIC_TOPOLOGY)
def RemoveTopology(self, request: TopologyId, context : grpc.ServicerContext) -> Empty: def RemoveTopology(self, request: TopologyId, context : grpc.ServicerContext) -> Empty:
LOGGER.info('[RemoveTopology] request={:s}'.format(grpc_message_to_json_string(request))) LOGGER.info('[RemoveTopology] request={:s}'.format(grpc_message_to_json_string(request)))
container_name = 'topology[{:s}]'.format(str(request.context_id.context_uuid.uuid)) container_name = 'topology[{:s}]'.format(str(request.context_id.context_uuid.uuid))
return del_entry(context, self.database, container_name, request.topology_uuid.uuid) topology_uuid = request.topology_uuid.uuid
return self._del(request, container_name, topology_uuid, 'topology_id', TOPIC_TOPOLOGY, context)
def GetTopologyEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[TopologyEvent]: def GetTopologyEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[TopologyEvent]:
LOGGER.info('[GetTopologyEvents] request={:s}'.format(grpc_message_to_json_string(request))) LOGGER.info('[GetTopologyEvents] request={:s}'.format(grpc_message_to_json_string(request)))
for message in self.msg_broker.consume({TOPIC_TOPOLOGY}): yield TopologyEvent(**json.loads(message.content))
# ----- Device ----------------------------------------------------------------------------------------------------- # ----- Device -----------------------------------------------------------------------------------------------------
...@@ -135,14 +176,15 @@ class MockServicerImpl_Context(ContextServiceServicer): ...@@ -135,14 +176,15 @@ class MockServicerImpl_Context(ContextServiceServicer):
def SetDevice(self, request: Context, context : grpc.ServicerContext) -> DeviceId: def SetDevice(self, request: Context, context : grpc.ServicerContext) -> DeviceId:
LOGGER.info('[SetDevice] request={:s}'.format(grpc_message_to_json_string(request))) LOGGER.info('[SetDevice] request={:s}'.format(grpc_message_to_json_string(request)))
return set_entry(self.database, 'device', request.device_id.device_uuid.uuid, request).device_id return self._set(request, 'device', request.device_id.device_uuid.uuid, 'device_id', TOPIC_DEVICE)
def RemoveDevice(self, request: DeviceId, context : grpc.ServicerContext) -> Empty: def RemoveDevice(self, request: DeviceId, context : grpc.ServicerContext) -> Empty:
LOGGER.info('[RemoveDevice] request={:s}'.format(grpc_message_to_json_string(request))) LOGGER.info('[RemoveDevice] request={:s}'.format(grpc_message_to_json_string(request)))
return del_entry(context, self.database, 'device', request.device_uuid.uuid) return self._del(request, 'device', request.device_uuid.uuid, 'device_id', TOPIC_DEVICE, context)
def GetDeviceEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[DeviceEvent]: def GetDeviceEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[DeviceEvent]:
LOGGER.info('[GetDeviceEvents] request={:s}'.format(grpc_message_to_json_string(request))) LOGGER.info('[GetDeviceEvents] request={:s}'.format(grpc_message_to_json_string(request)))
for message in self.msg_broker.consume({TOPIC_DEVICE}): yield DeviceEvent(**json.loads(message.content))
# ----- Link ------------------------------------------------------------------------------------------------------- # ----- Link -------------------------------------------------------------------------------------------------------
...@@ -161,14 +203,15 @@ class MockServicerImpl_Context(ContextServiceServicer): ...@@ -161,14 +203,15 @@ class MockServicerImpl_Context(ContextServiceServicer):
def SetLink(self, request: Context, context : grpc.ServicerContext) -> LinkId: def SetLink(self, request: Context, context : grpc.ServicerContext) -> LinkId:
LOGGER.info('[SetLink] request={:s}'.format(grpc_message_to_json_string(request))) LOGGER.info('[SetLink] request={:s}'.format(grpc_message_to_json_string(request)))
return set_entry(self.database, 'link', request.link_id.link_uuid.uuid, request).link_id return self._set(request, 'link', request.link_id.link_uuid.uuid, 'link_id', TOPIC_LINK)
def RemoveLink(self, request: LinkId, context : grpc.ServicerContext) -> Empty: def RemoveLink(self, request: LinkId, context : grpc.ServicerContext) -> Empty:
LOGGER.info('[RemoveLink] request={:s}'.format(grpc_message_to_json_string(request))) LOGGER.info('[RemoveLink] request={:s}'.format(grpc_message_to_json_string(request)))
return del_entry(context, self.database, 'link', request.link_uuid.uuid) return self._del(request, 'link', request.link_uuid.uuid, 'link_id', TOPIC_LINK, context)
def GetLinkEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[LinkEvent]: def GetLinkEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[LinkEvent]:
LOGGER.info('[GetLinkEvents] request={:s}'.format(grpc_message_to_json_string(request))) LOGGER.info('[GetLinkEvents] request={:s}'.format(grpc_message_to_json_string(request)))
for message in self.msg_broker.consume({TOPIC_LINK}): yield LinkEvent(**json.loads(message.content))
# ----- Slice ------------------------------------------------------------------------------------------------------ # ----- Slice ------------------------------------------------------------------------------------------------------
...@@ -222,17 +265,19 @@ class MockServicerImpl_Context(ContextServiceServicer): ...@@ -222,17 +265,19 @@ class MockServicerImpl_Context(ContextServiceServicer):
def SetService(self, request: Service, context : grpc.ServicerContext) -> ServiceId: def SetService(self, request: Service, context : grpc.ServicerContext) -> ServiceId:
LOGGER.info('[SetService] request={:s}'.format(grpc_message_to_json_string(request))) LOGGER.info('[SetService] request={:s}'.format(grpc_message_to_json_string(request)))
return set_entry( container_name = 'service[{:s}]'.format(str(request.service_id.context_id.context_uuid.uuid))
self.database, 'service[{:s}]'.format(str(request.service_id.context_id.context_uuid.uuid)), service_uuid = request.service_id.service_uuid.uuid
request.service_id.service_uuid.uuid, request).service_id return self._set(request, container_name, service_uuid, 'service_id', TOPIC_SERVICE)
def RemoveService(self, request: ServiceId, context : grpc.ServicerContext) -> Empty: def RemoveService(self, request: ServiceId, context : grpc.ServicerContext) -> Empty:
LOGGER.info('[RemoveService] request={:s}'.format(grpc_message_to_json_string(request))) LOGGER.info('[RemoveService] request={:s}'.format(grpc_message_to_json_string(request)))
container_name = 'service[{:s}]'.format(str(request.context_id.context_uuid.uuid)) container_name = 'service[{:s}]'.format(str(request.context_id.context_uuid.uuid))
return del_entry(context, self.database, container_name, request.service_uuid.uuid) service_uuid = request.service_id.service_uuid.uuid
return self._del(request, container_name, service_uuid, 'service_id', TOPIC_SERVICE, context)
def GetServiceEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[ServiceEvent]: def GetServiceEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[ServiceEvent]:
LOGGER.info('[GetServiceEvents] request={:s}'.format(grpc_message_to_json_string(request))) LOGGER.info('[GetServiceEvents] request={:s}'.format(grpc_message_to_json_string(request)))
for message in self.msg_broker.consume({TOPIC_SERVICE}): yield ServiceEvent(**json.loads(message.content))
# ----- Connection ------------------------------------------------------------------------------------------------- # ----- Connection -------------------------------------------------------------------------------------------------
...@@ -255,21 +300,21 @@ class MockServicerImpl_Context(ContextServiceServicer): ...@@ -255,21 +300,21 @@ class MockServicerImpl_Context(ContextServiceServicer):
def SetConnection(self, request: Connection, context : grpc.ServicerContext) -> ConnectionId: def SetConnection(self, request: Connection, context : grpc.ServicerContext) -> ConnectionId:
LOGGER.info('[SetConnection] request={:s}'.format(grpc_message_to_json_string(request))) LOGGER.info('[SetConnection] request={:s}'.format(grpc_message_to_json_string(request)))
service_connection__container_name = 'service_connection[{:s}/{:s}]'.format( container_name = 'service_connection[{:s}/{:s}]'.format(
str(request.service_id.context_id.context_uuid.uuid), str(request.service_id.service_uuid.uuid)) str(request.service_id.context_id.context_uuid.uuid), str(request.service_id.service_uuid.uuid))
set_entry( connection_uuid = request.connection_id.connection_uuid.uuid
self.database, service_connection__container_name, request.connection_id.connection_uuid.uuid, request) set_entry(self.database, container_name, connection_uuid, request)
return set_entry( return self._set(request, 'connection', connection_uuid, 'connection_id', TOPIC_CONNECTION)
self.database, 'connection', request.connection_id.connection_uuid.uuid, request).connection_id
def RemoveConnection(self, request: ConnectionId, context : grpc.ServicerContext) -> Empty: def RemoveConnection(self, request: ConnectionId, context : grpc.ServicerContext) -> Empty:
LOGGER.info('[RemoveConnection] request={:s}'.format(grpc_message_to_json_string(request))) LOGGER.info('[RemoveConnection] request={:s}'.format(grpc_message_to_json_string(request)))
connection = get_entry(context, self.database, 'connection', request.connection_uuid.uuid) connection = get_entry(context, self.database, 'connection', request.connection_uuid.uuid)
service_id = connection.service_id container_name = 'service_connection[{:s}/{:s}]'.format(
service_connection__container_name = 'service_connection[{:s}/{:s}]'.format( str(connection.service_id.context_id.context_uuid.uuid), str(connection.service_id.service_uuid.uuid))
str(service_id.context_id.context_uuid.uuid), str(service_id.service_uuid.uuid)) connection_uuid = request.connection_uuid.uuid
del_entry(context, self.database, service_connection__container_name, request.connection_uuid.uuid) del_entry(context, self.database, container_name, connection_uuid)
return del_entry(context, self.database, 'connection', request.connection_uuid.uuid) return self._del(request, 'connection', connection_uuid, 'connection_id', TOPIC_CONNECTION, context)
def GetConnectionEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[ConnectionEvent]: def GetConnectionEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[ConnectionEvent]:
LOGGER.info('[GetConnectionEvents] request={:s}'.format(grpc_message_to_json_string(request))) LOGGER.info('[GetConnectionEvents] request={:s}'.format(grpc_message_to_json_string(request)))
for message in self.msg_broker.consume({TOPIC_CONNECTION}): yield ConnectionEvent(**json.loads(message.content))
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Any, Dict, Iterator, NamedTuple, Tuple
import grpc, logging
from common.tests.MockMessageBroker import MockMessageBroker
from common.tools.grpc.Tools import grpc_message_to_json_string
from context.proto.context_pb2 import Empty, TeraFlowController
from dlt.connector.proto.dlt_pb2 import (
DltPeerStatus, DltPeerStatusList, DltRecord, DltRecordEvent, DltRecordId, DltRecordOperationEnum, DltRecordStatus, DltRecordSubscription, DltRecordTypeEnum)
from dlt.connector.proto.dlt_pb2_grpc import DltServiceServicer
LOGGER = logging.getLogger(__name__)
DltRecordKey = Tuple[str, DltRecordOperationEnum, str] # domain_uuid, operation, record_uuid
DltRecordDict = Dict[DltRecordKey, DltRecord] # dlt_record_key => dlt_record
class MockServicerImpl_Dlt(DltServiceServicer):
def __init__(self):
LOGGER.info('[__init__] Creating Servicer...')
self.records : DltRecordDict = {}
self.msg_broker = MockMessageBroker()
LOGGER.info('[__init__] Servicer Created')
def RecordToDlt(self, request : DltRecord, context : grpc.ServicerContext) -> DltRecordStatus:
LOGGER.info('[RecordToDlt] request={:s}'.format(grpc_message_to_json_string(request)))
operation = request.operation
domain_uuid = request.record_id.domain_uuid
record_uuid = request.record_id.record_uuid
#if operation ==
def GetFromDlt(self, request : DltRecordId, context : grpc.ServicerContext) -> DltRecord:
LOGGER.info('[GetFromDlt] request={:s}'.format(grpc_message_to_json_string(request)))
def SubscribeToDlt(self, request: DltRecordSubscription, context : grpc.ServicerContext) -> Iterator[DltRecordEvent]:
LOGGER.info('[SubscribeToDlt] request={:s}'.format(grpc_message_to_json_string(request)))
for message in self.msg_broker.consume({TOPIC_CONTEXT}): yield ContextEvent(**json.loads(message.content))
def GetDltStatus(self, request : TeraFlowController, context : grpc.ServicerContext) -> DltPeerStatus:
LOGGER.info('[GetDltStatus] request={:s}'.format(grpc_message_to_json_string(request)))
def GetDltPeers(self, request : Empty, context : grpc.ServicerContext) -> DltPeerStatusList:
LOGGER.info('[GetDltPeers] request={:s}'.format(grpc_message_to_json_string(request)))
LOGGER.info('[__init__] Servicer Created')
# ----- Common -----------------------------------------------------------------------------------------------------
def _set(self, request, container_name, entry_uuid, entry_id_field_name, topic_name):
exists = has_entry(self.database, container_name, entry_uuid)
entry = set_entry(self.database, container_name, entry_uuid, request)
event_type = EventTypeEnum.EVENTTYPE_UPDATE if exists else EventTypeEnum.EVENTTYPE_CREATE
entry_id = getattr(entry, entry_id_field_name)
dict_entry_id = grpc_message_to_json(entry_id)
notify_event(self.msg_broker, topic_name, event_type, {entry_id_field_name: dict_entry_id})
return entry_id
def _del(self, request, container_name, entry_uuid, entry_id_field_name, topic_name, grpc_context):
empty = del_entry(grpc_context, self.database, container_name, entry_uuid)
event_type = EventTypeEnum.EVENTTYPE_REMOVE
dict_entry_id = grpc_message_to_json(request)
notify_event(self.msg_broker, topic_name, event_type, {entry_id_field_name: dict_entry_id})
return empty
# ----- Context ----------------------------------------------------------------------------------------------------
def ListContextIds(self, request: Empty, context : grpc.ServicerContext) -> ContextIdList:
LOGGER.info('[ListContextIds] request={:s}'.format(grpc_message_to_json_string(request)))
return ContextIdList(context_ids=[context.context_id for context in get_entries(self.database, 'context')])
def ListContexts(self, request: Empty, context : grpc.ServicerContext) -> ContextList:
LOGGER.info('[ListContexts] request={:s}'.format(grpc_message_to_json_string(request)))
return ContextList(contexts=get_entries(self.database, 'context'))
def GetContext(self, request: ContextId, context : grpc.ServicerContext) -> Context:
LOGGER.info('[GetContext] request={:s}'.format(grpc_message_to_json_string(request)))
return get_entry(context, self.database, 'context', request.context_uuid.uuid)
def SetContext(self, request: Context, context : grpc.ServicerContext) -> ContextId:
LOGGER.info('[SetContext] request={:s}'.format(grpc_message_to_json_string(request)))
return self._set(request, 'context', request.context_uuid.uuid, 'context_id', TOPIC_CONTEXT)
def RemoveContext(self, request: ContextId, context : grpc.ServicerContext) -> Empty:
LOGGER.info('[RemoveContext] request={:s}'.format(grpc_message_to_json_string(request)))
return self._del(request, 'context', request.context_uuid.uuid, 'context_id', TOPIC_CONTEXT, context)
def GetContextEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[ContextEvent]:
LOGGER.info('[GetContextEvents] request={:s}'.format(grpc_message_to_json_string(request)))
for message in self.msg_broker.consume({TOPIC_CONTEXT}): yield ContextEvent(**json.loads(message.content))
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Build, tag, and push the Docker images to the GitLab Docker registry
build slice:
variables:
IMAGE_NAME: 'slice' # name of the microservice
IMAGE_NAME_TEST: 'slice-test' # name of the microservice
IMAGE_TAG: 'latest' # tag of the container image (production, development, etc)
stage: build
before_script:
- docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY
script:
- docker build -t "$IMAGE_NAME:$IMAGE_TAG" -f ./src/$IMAGE_NAME/Dockerfile ./src/
- docker tag "$IMAGE_NAME:$IMAGE_TAG" "$CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG"
- docker push "$CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG"
rules:
- changes:
- src/$IMAGE_NAME/**
- .gitlab-ci.yml
# Pull, execute, and run unitary tests for the Docker image from the GitLab registry
unit_test slice:
variables:
IMAGE_NAME: 'slice' # name of the microservice
IMAGE_NAME_TEST: 'slice-test' # name of the microservice
IMAGE_TAG: 'latest' # tag of the container image (production, development, etc)
stage: unit_test
needs:
- build slice
before_script:
- docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY
- if docker network list | grep teraflowbridge; then echo "teraflowbridge is already created"; else docker network create -d bridge teraflowbridge; fi
script:
- docker pull "$CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG"
- docker run -d -p 4040:4040 --name $IMAGE_NAME --network=teraflowbridge "$IMAGE_NAME:$IMAGE_TAG"
- docker ps -a
- sleep 5
- docker ps -a
- docker logs $IMAGE_NAME
- docker exec -i $IMAGE_NAME bash -c "pytest --log-level=DEBUG --verbose $IMAGE_NAME/tests/test_unitary.py"
after_script:
- docker stop $IMAGE_NAME
- docker rm $IMAGE_NAME
rules:
- changes:
- src/$IMAGE_NAME/**
- .gitlab-ci.yml
# Deployment of the service in Kubernetes Cluster
deploy slice:
stage: deploy
needs:
- build slice
- unit_test slice
- dependencies all
- integ_test execute
script:
- kubectl version
- kubectl get all
- kubectl apply -f "manifests/sliceservice.yaml"
- kubectl delete pods --selector app=sliceservice
- kubectl get all
FROM python:3-slim
# Install dependencies
RUN apt-get --yes --quiet --quiet update && \
apt-get --yes --quiet --quiet install wget g++ && \
rm -rf /var/lib/apt/lists/*
# Set Python to show logs as they occur
ENV PYTHONUNBUFFERED=0
# Download the gRPC health probe
RUN GRPC_HEALTH_PROBE_VERSION=v0.2.0 && \
wget -qO/bin/grpc_health_probe https://github.com/grpc-ecosystem/grpc-health-probe/releases/download/${GRPC_HEALTH_PROBE_VERSION}/grpc_health_probe-linux-amd64 && \
chmod +x /bin/grpc_health_probe
# Get generic Python packages
RUN python3 -m pip install --upgrade pip setuptools wheel pip-tools
# Set working directory
WORKDIR /var/teraflow
# Create module sub-folders
RUN mkdir -p /var/teraflow/slice
# Get Python packages per module
COPY slice/requirements.in slice/requirements.in
RUN pip-compile --output-file=slice/requirements.txt slice/requirements.in
RUN python3 -m pip install -r slice/requirements.in
# Add files into working directory
COPY common/. common
COPY context/. context
COPY interdomain/. interdomain
COPY service/. service
COPY slice/. slice
# Start slice service
ENTRYPOINT ["python", "-m", "slice.service"]
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import grpc, logging
from common.Constants import ServiceNameEnum
from common.Settings import get_service_host, get_service_port_grpc
from common.tools.client.RetryDecorator import retry, delay_exponential
#from common.tools.grpc.Tools import grpc_message_to_json_string
#from slice.proto.context_pb2 import Empty, Slice, SliceId
from dlt.connector.proto.dlt_pb2_grpc import DltServiceStub
LOGGER = logging.getLogger(__name__)
MAX_RETRIES = 15
DELAY_FUNCTION = delay_exponential(initial=0.01, increment=2.0, maximum=5.0)
RETRY_DECORATOR = retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, prepare_method_name='connect')
class DltClient:
def __init__(self, host=None, port=None):
if not host: host = get_service_host(ServiceNameEnum.DLT)
if not port: port = get_service_port_grpc(ServiceNameEnum.DLT)
self.endpoint = '{:s}:{:s}'.format(str(host), str(port))
LOGGER.debug('Creating channel to {:s}...'.format(self.endpoint))
self.channel = None
self.stub = None
self.connect()
LOGGER.debug('Channel created')
def connect(self):
self.channel = grpc.insecure_channel(self.endpoint)
self.stub = DltServiceStub(self.channel)
def close(self):
if self.channel is not None: self.channel.close()
self.channel = None
self.stub = None
# @RETRY_DECORATOR
# def CreateSlice(self, request : Slice) -> SliceId:
# LOGGER.debug('CreateSlice request: {:s}'.format(grpc_message_to_json_string(request)))
# response = self.stub.CreateSlice(request)
# LOGGER.debug('CreateSlice result: {:s}'.format(grpc_message_to_json_string(response)))
# return response
# @RETRY_DECORATOR
# def UpdateSlice(self, request : Slice) -> SliceId:
# LOGGER.debug('UpdateSlice request: {:s}'.format(grpc_message_to_json_string(request)))
# response = self.stub.UpdateSlice(request)
# LOGGER.debug('UpdateSlice result: {:s}'.format(grpc_message_to_json_string(response)))
# return response
# @RETRY_DECORATOR
# def DeleteSlice(self, request : SliceId) -> Empty:
# LOGGER.debug('DeleteSlice request: {:s}'.format(grpc_message_to_json_string(request)))
# response = self.stub.DeleteSlice(request)
# LOGGER.debug('DeleteSlice result: {:s}'.format(grpc_message_to_json_string(response)))
# return response
// Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto3";
package dlt;
import "context.proto";
service DltService {
rpc RecordToDlt (DltRecord ) returns ( DltRecordStatus ) {}
rpc GetFromDlt (DltRecordId ) returns ( DltRecord ) {}
rpc SubscribeToDlt(DltRecordSubscription ) returns (stream DltRecordEvent ) {}
rpc GetDltStatus (context.TeraFlowController) returns ( DltPeerStatus ) {} // NEC is checkig if it is possible
rpc GetDltPeers (context.Empty ) returns ( DltPeerStatusList) {} // NEC is checkig if it is possible
}
enum DltRecordTypeEnum {
DLTRECORDTYPE_UNDEFINED = 0;
DLTRECORDTYPE_CONTEXT = 1;
DLTRECORDTYPE_TOPOLOGY = 2;
DLTRECORDTYPE_DEVICE = 3;
DLTRECORDTYPE_LINK = 4;
DLTRECORDTYPE_SERVICE = 5;
DLTRECORDTYPE_SLICE = 6;
}
enum DltRecordOperationEnum {
DLTRECORDOPERATION_UNDEFINED = 0;
DLTRECORDOPERATION_ADD = 1;
DLTRECORDOPERATION_UPDATE = 2;
DLTRECORDOPERATION_DELETE = 3;
}
enum DltRecordStatusEnum {
DLTRECORDSTATUS_UNDEFINED = 0;
DLTRECORDSTATUS_SUCCEEDED = 1;
DLTRECORDSTATUS_FAILED = 2;
}
enum DltStatusEnum {
DLTSTATUS_UNDEFINED = 0;
DLTSTATUS_NOTAVAILABLE = 1;
DLTSTATUS_INITIALIZED = 2;
DLTSTATUS_AVAILABLE = 3;
DLTSTATUS_DEINIT = 4;
}
message DltRecordId {
context.Uuid domain_uuid = 1; // unique identifier of domain owning the record
DltRecordTypeEnum type = 2; // type of record
context.Uuid record_uuid = 3; // unique identifier of the record within the domain context_uuid/topology_uuid
}
message DltRecord {
DltRecordId record_id = 1; // record identifier
DltRecordOperationEnum operation = 2; // operation to be performed over the record
string data_json = 3; // record content: JSON-encoded record content
}
message DltRecordSubscription {
// retrieved events have to match ALL conditions.
// i.e., type in types requested, AND operation in operations requested
// TODO: consider adding a more sophisticated filtering
repeated DltRecordTypeEnum type = 1; // selected event types, empty=all
repeated DltRecordOperationEnum operation = 2; // selected event operations, empty=all
}
message DltRecordEvent {
context.Event event = 1; // common event data (timestamp & event_type)
DltRecordId record_id = 2; // record identifier associated with this event
}
message DltRecordStatus {
DltRecordId record_id = 1; // identifier of the associated record
DltRecordStatusEnum status = 2; // status of the record
string error_message = 3; // error message in case of failure, empty otherwise
}
message DltPeerStatus {
context.TeraFlowController controller = 1; // Identifier of the TeraFlow controller instance
DltStatusEnum status = 2; // Status of the TeraFlow controller instance
}
message DltPeerStatusList {
repeated DltPeerStatus peers = 1; // List of peers and their status
}
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging, threading
from common.tools.grpc.Tools import grpc_message_to_json_string
from context.client.ContextClient import ContextClient
from context.client.EventsCollector import EventsCollector
from dlt.connector.client.DltClient import DltClient
LOGGER = logging.getLogger(__name__)
class DltConnector:
def __init__(self) -> None:
LOGGER.debug('Creating connector...')
self._terminate = threading.Event()
self._thread = None
LOGGER.debug('Connector created')
def start(self):
self._terminate.clear()
self._thread = threading.Thread(target=self._run_events_collector)
self._thread.start()
def _run_events_collector(self) -> None:
dlt_client = DltClient()
context_client = ContextClient()
events_collector = EventsCollector(context_client)
events_collector.start()
while not self._terminate.is_set():
event = events_collector.get_event()
LOGGER.info('Event from Context Received: {:s}'.format(grpc_message_to_json_string(event)))
events_collector.stop()
context_client.close()
dlt_client.close()
def stop(self):
self._terminate.set()
self._thread.join()
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from common.Constants import ServiceNameEnum
from common.Settings import get_service_port_grpc
from common.tools.service.GenericGrpcService import GenericGrpcService
from ..proto.dlt_pb2_grpc import add_SliceServiceServicer_to_server
from .DltConnectorServiceServicerImpl import SliceServiceServicerImpl
class DltConnectorService(GenericGrpcService):
def __init__(self, cls_name: str = __name__) -> None:
port = get_service_port_grpc(ServiceNameEnum.SLICE)
super().__init__(port, cls_name=cls_name)
self.slice_servicer = SliceServiceServicerImpl()
def install_servicers(self):
add_SliceServiceServicer_to_server(self.slice_servicer, self.server)
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import grpc, logging
from common.rpc_method_wrapper.Decorator import create_metrics, safe_and_metered_rpc_method
from context.proto.context_pb2 import Slice, SliceId
from ..proto.dlt_pb2_grpc import DltServiceServicer
LOGGER = logging.getLogger(__name__)
SERVICE_NAME = 'DltConnector'
METHOD_NAMES = ['CreateSlice', 'UpdateSlice', 'DeleteSlice']
METRICS = create_metrics(SERVICE_NAME, METHOD_NAMES)
class DltConnectorServiceServicerImpl(DltServiceServicer):
def __init__(self):
LOGGER.debug('Creating Servicer...')
LOGGER.debug('Servicer Created')
@safe_and_metered_rpc_method(METRICS, LOGGER)
def CreateSlice(self, request : Slice, context : grpc.ServicerContext) -> SliceId:
return SliceId()
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging, signal, sys, threading
from prometheus_client import start_http_server
from common.Constants import ServiceNameEnum
from common.Settings import (
ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name, get_log_level, get_metrics_port,
wait_for_environment_variables)
from .DltConnectorService import DltConnectorService
terminate = threading.Event()
LOGGER : logging.Logger = None
def signal_handler(signal, frame): # pylint: disable=redefined-outer-name
LOGGER.warning('Terminate signal received')
terminate.set()
def main():
global LOGGER # pylint: disable=global-statement
log_level = get_log_level()
logging.basicConfig(level=log_level)
LOGGER = logging.getLogger(__name__)
wait_for_environment_variables([
get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST ),
get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
])
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
LOGGER.info('Starting...')
# Start metrics server
metrics_port = get_metrics_port()
start_http_server(metrics_port)
# Starting DLT connector service
grpc_service = DltConnectorService()
grpc_service.start()
# Wait for Ctrl+C or termination signal
while not terminate.wait(timeout=0.1): pass
LOGGER.info('Terminating...')
grpc_service.stop()
LOGGER.info('Bye')
return 0
if __name__ == '__main__':
sys.exit(main())
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import Tuple
from common.orm.Database import Database
from common.message_broker.MessageBroker import MessageBroker
from context.client.ContextClient import ContextClient
from context.client.EventsCollector import EventsCollector
from context.proto.context_pb2 import Context, ContextId, Device, DeviceId, Link, LinkId, Topology, TopologyId
from .PrepareTestScenario import context_db_mb, context_service, context_client # pylint: disable=unused-import
from .Objects import CONTEXTS, TOPOLOGIES, DEVICES, LINKS
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)
def test_create_events(
context_client : ContextClient, # pylint: disable=redefined-outer-name
context_db_mb : Tuple[Database, MessageBroker]): # pylint: disable=redefined-outer-name
context_database = context_db_mb[0]
context_database.clear_all()
events_collector = EventsCollector(context_client)
events_collector.start()
events = events_collector.get_events(block=True, count=4)
events_collector.stop()
for context in CONTEXTS : context_client.SetContext (Context (**context ))
for topology in TOPOLOGIES: context_client.SetTopology(Topology(**topology))
for device in DEVICES : context_client.SetDevice (Device (**device ))
for link in LINKS : context_client.SetLink (Link (**link ))
for link in LINKS : context_client.RemoveLink (LinkId (**link ['link_id' ]))
for device in DEVICES : context_client.RemoveDevice (DeviceId (**device ['device_id' ]))
for topology in TOPOLOGIES: context_client.RemoveTopology(TopologyId(**topology['topology_id']))
for context in CONTEXTS : context_client.RemoveContext (ContextId (**context ['context_id' ]))
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment