Skip to content
Snippets Groups Projects
Commit 16bdb723 authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

DLT - Connector component:

- updated proto definition to include the domain that originates the record
- updated proto definition to include RecordLink and RecordAllLinks methods
- updated proto definition to provide the domain for which RecordAll* methods should be executed
- updated DltConnectorClient accordingly
- implemented RecordDevice and RecordLink in DltConnectorServicer
- removed unused OwnDomainFinder
- first functional version of DltEventDispatcher (only processes Devices and Links, by now)
parent 7f5b969a
No related branches found
No related tags found
2 merge requests!54Release 2.0.0,!24Integrate NFV-SDN'22 demo
...@@ -18,14 +18,37 @@ package dlt; ...@@ -18,14 +18,37 @@ package dlt;
import "context.proto"; import "context.proto";
service DltConnectorService { service DltConnectorService {
rpc RecordAll (context.Empty ) returns (context.Empty) {} rpc RecordAll (context.TopologyId) returns (context.Empty) {}
rpc RecordAllDevices (context.Empty ) returns (context.Empty) {} rpc RecordAllDevices (context.TopologyId) returns (context.Empty) {}
rpc RecordDevice (context.DeviceId ) returns (context.Empty) {} rpc RecordDevice (DltDeviceId ) returns (context.Empty) {}
rpc RecordAllServices(context.Empty ) returns (context.Empty) {} rpc RecordAllLinks (context.TopologyId) returns (context.Empty) {}
rpc RecordService (context.ServiceId) returns (context.Empty) {} rpc RecordLink (DltLinkId ) returns (context.Empty) {}
rpc RecordAllSlices (context.Empty ) returns (context.Empty) {} rpc RecordAllServices(context.TopologyId) returns (context.Empty) {}
rpc RecordSlice (context.SliceId ) returns (context.Empty) {} rpc RecordService (DltServiceId ) returns (context.Empty) {}
rpc RecordAllSlices (context.TopologyId) returns (context.Empty) {}
rpc RecordSlice (DltSliceId ) returns (context.Empty) {}
}
message DltDeviceId {
context.TopologyId topology_id = 1;
context.DeviceId device_id = 2;
}
message DltLinkId {
context.TopologyId topology_id = 1;
context.LinkId link_id = 2;
}
message DltServiceId {
context.TopologyId topology_id = 1;
context.ServiceId service_id = 2;
}
message DltSliceId {
context.TopologyId topology_id = 1;
context.SliceId slice_id = 2;
} }
...@@ -15,7 +15,8 @@ ...@@ -15,7 +15,8 @@
import grpc, logging import grpc, logging
from common.Constants import ServiceNameEnum from common.Constants import ServiceNameEnum
from common.Settings import get_service_host, get_service_port_grpc from common.Settings import get_service_host, get_service_port_grpc
from common.proto.context_pb2 import DeviceId, Empty, ServiceId, SliceId from common.proto.context_pb2 import Empty, TopologyId
from common.proto.dlt_connector_pb2 import DltDeviceId, DltLinkId, DltServiceId, DltSliceId
from common.proto.dlt_connector_pb2_grpc import DltConnectorServiceStub from common.proto.dlt_connector_pb2_grpc import DltConnectorServiceStub
from common.tools.client.RetryDecorator import retry, delay_exponential from common.tools.client.RetryDecorator import retry, delay_exponential
from common.tools.grpc.Tools import grpc_message_to_json_string from common.tools.grpc.Tools import grpc_message_to_json_string
...@@ -46,49 +47,63 @@ class DltConnectorClient: ...@@ -46,49 +47,63 @@ class DltConnectorClient:
self.stub = None self.stub = None
@RETRY_DECORATOR @RETRY_DECORATOR
def RecordAll(self, request : Empty) -> Empty: def RecordAll(self, request : TopologyId) -> Empty:
LOGGER.debug('RecordAll request: {:s}'.format(grpc_message_to_json_string(request))) LOGGER.debug('RecordAll request: {:s}'.format(grpc_message_to_json_string(request)))
response = self.stub.RecordAll(request) response = self.stub.RecordAll(request)
LOGGER.debug('RecordAll result: {:s}'.format(grpc_message_to_json_string(response))) LOGGER.debug('RecordAll result: {:s}'.format(grpc_message_to_json_string(response)))
return response return response
@RETRY_DECORATOR @RETRY_DECORATOR
def RecordAllDevices(self, request : Empty) -> Empty: def RecordAllDevices(self, request : TopologyId) -> Empty:
LOGGER.debug('RecordAllDevices request: {:s}'.format(grpc_message_to_json_string(request))) LOGGER.debug('RecordAllDevices request: {:s}'.format(grpc_message_to_json_string(request)))
response = self.stub.RecordAllDevices(request) response = self.stub.RecordAllDevices(request)
LOGGER.debug('RecordAllDevices result: {:s}'.format(grpc_message_to_json_string(response))) LOGGER.debug('RecordAllDevices result: {:s}'.format(grpc_message_to_json_string(response)))
return response return response
@RETRY_DECORATOR @RETRY_DECORATOR
def RecordDevice(self, request : DeviceId) -> Empty: def RecordDevice(self, request : DltDeviceId) -> Empty:
LOGGER.debug('RecordDevice request: {:s}'.format(grpc_message_to_json_string(request))) LOGGER.debug('RecordDevice request: {:s}'.format(grpc_message_to_json_string(request)))
response = self.stub.RecordDevice(request) response = self.stub.RecordDevice(request)
LOGGER.debug('RecordDevice result: {:s}'.format(grpc_message_to_json_string(response))) LOGGER.debug('RecordDevice result: {:s}'.format(grpc_message_to_json_string(response)))
return response return response
@RETRY_DECORATOR @RETRY_DECORATOR
def RecordAllServices(self, request : Empty) -> Empty: def RecordAllLinks(self, request : TopologyId) -> Empty:
LOGGER.debug('RecordAllLinks request: {:s}'.format(grpc_message_to_json_string(request)))
response = self.stub.RecordAllLinks(request)
LOGGER.debug('RecordAllLinks result: {:s}'.format(grpc_message_to_json_string(response)))
return response
@RETRY_DECORATOR
def RecordLink(self, request : DltLinkId) -> Empty:
LOGGER.debug('RecordLink request: {:s}'.format(grpc_message_to_json_string(request)))
response = self.stub.RecordLink(request)
LOGGER.debug('RecordLink result: {:s}'.format(grpc_message_to_json_string(response)))
return response
@RETRY_DECORATOR
def RecordAllServices(self, request : TopologyId) -> Empty:
LOGGER.debug('RecordAllServices request: {:s}'.format(grpc_message_to_json_string(request))) LOGGER.debug('RecordAllServices request: {:s}'.format(grpc_message_to_json_string(request)))
response = self.stub.RecordAllServices(request) response = self.stub.RecordAllServices(request)
LOGGER.debug('RecordAllServices result: {:s}'.format(grpc_message_to_json_string(response))) LOGGER.debug('RecordAllServices result: {:s}'.format(grpc_message_to_json_string(response)))
return response return response
@RETRY_DECORATOR @RETRY_DECORATOR
def RecordService(self, request : ServiceId) -> Empty: def RecordService(self, request : DltServiceId) -> Empty:
LOGGER.debug('RecordService request: {:s}'.format(grpc_message_to_json_string(request))) LOGGER.debug('RecordService request: {:s}'.format(grpc_message_to_json_string(request)))
response = self.stub.RecordService(request) response = self.stub.RecordService(request)
LOGGER.debug('RecordService result: {:s}'.format(grpc_message_to_json_string(response))) LOGGER.debug('RecordService result: {:s}'.format(grpc_message_to_json_string(response)))
return response return response
@RETRY_DECORATOR @RETRY_DECORATOR
def RecordAllSlices(self, request : Empty) -> Empty: def RecordAllSlices(self, request : TopologyId) -> Empty:
LOGGER.debug('RecordAllSlices request: {:s}'.format(grpc_message_to_json_string(request))) LOGGER.debug('RecordAllSlices request: {:s}'.format(grpc_message_to_json_string(request)))
response = self.stub.RecordAllSlices(request) response = self.stub.RecordAllSlices(request)
LOGGER.debug('RecordAllSlices result: {:s}'.format(grpc_message_to_json_string(response))) LOGGER.debug('RecordAllSlices result: {:s}'.format(grpc_message_to_json_string(response)))
return response return response
@RETRY_DECORATOR @RETRY_DECORATOR
def RecordSlice(self, request : SliceId) -> Empty: def RecordSlice(self, request : DltSliceId) -> Empty:
LOGGER.debug('RecordSlice request: {:s}'.format(grpc_message_to_json_string(request))) LOGGER.debug('RecordSlice request: {:s}'.format(grpc_message_to_json_string(request)))
response = self.stub.RecordSlice(request) response = self.stub.RecordSlice(request)
LOGGER.debug('RecordSlice result: {:s}'.format(grpc_message_to_json_string(response))) LOGGER.debug('RecordSlice result: {:s}'.format(grpc_message_to_json_string(response)))
......
...@@ -12,18 +12,16 @@ ...@@ -12,18 +12,16 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import copy, grpc, logging import grpc, logging
from typing import Optional from common.proto.context_pb2 import DeviceId, Empty, LinkId, ServiceId, SliceId, TopologyId
from common.Constants import DEFAULT_CONTEXT_UUID from common.proto.dlt_connector_pb2 import DltDeviceId, DltLinkId, DltServiceId, DltSliceId
from common.proto.dlt_connector_pb2_grpc import DltConnectorServiceServicer
from common.proto.dlt_gateway_pb2 import DltRecord, DltRecordId, DltRecordOperationEnum, DltRecordTypeEnum from common.proto.dlt_gateway_pb2 import DltRecord, DltRecordId, DltRecordOperationEnum, DltRecordTypeEnum
from common.rpc_method_wrapper.Decorator import create_metrics, safe_and_metered_rpc_method from common.rpc_method_wrapper.Decorator import create_metrics, safe_and_metered_rpc_method
from common.proto.context_pb2 import DeviceId, Empty, ServiceId, SliceId
from common.proto.dlt_connector_pb2_grpc import DltConnectorServiceServicer
from common.tools.grpc.Tools import grpc_message_to_json_string from common.tools.grpc.Tools import grpc_message_to_json_string
from context.client.ContextClient import ContextClient from context.client.ContextClient import ContextClient
from dlt.connector.client.DltGatewayClient import DltGatewayClient from dlt.connector.client.DltGatewayClient import DltGatewayClient
from .tools.Checkers import record_exists from .tools.Checkers import record_exists
from .tools.OwnDomainFinder import OwnDomainFinder
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
...@@ -31,6 +29,7 @@ SERVICE_NAME = 'DltConnector' ...@@ -31,6 +29,7 @@ SERVICE_NAME = 'DltConnector'
METHOD_NAMES = [ METHOD_NAMES = [
'RecordAll', 'RecordAll',
'RecordAllDevices', 'RecordDevice', 'RecordAllDevices', 'RecordDevice',
'RecordAllLinks', 'RecordLink',
'RecordAllServices', 'RecordService', 'RecordAllServices', 'RecordService',
'RecordAllSlices', 'RecordSlice', 'RecordAllSlices', 'RecordSlice',
] ]
...@@ -39,26 +38,25 @@ METRICS = create_metrics(SERVICE_NAME, METHOD_NAMES) ...@@ -39,26 +38,25 @@ METRICS = create_metrics(SERVICE_NAME, METHOD_NAMES)
class DltConnectorServiceServicerImpl(DltConnectorServiceServicer): class DltConnectorServiceServicerImpl(DltConnectorServiceServicer):
def __init__(self): def __init__(self):
LOGGER.debug('Creating Servicer...') LOGGER.debug('Creating Servicer...')
self._own_domain_finder = OwnDomainFinder()
LOGGER.debug('Servicer Created') LOGGER.debug('Servicer Created')
@safe_and_metered_rpc_method(METRICS, LOGGER) @safe_and_metered_rpc_method(METRICS, LOGGER)
def RecordAll(self, request : Empty, context : grpc.ServicerContext) -> Empty: def RecordAll(self, request : TopologyId, context : grpc.ServicerContext) -> Empty:
return Empty() return Empty()
@safe_and_metered_rpc_method(METRICS, LOGGER) @safe_and_metered_rpc_method(METRICS, LOGGER)
def RecordAllDevices(self, request : Empty, context : grpc.ServicerContext) -> Empty: def RecordAllDevices(self, request : TopologyId, context : grpc.ServicerContext) -> Empty:
return Empty() return Empty()
@safe_and_metered_rpc_method(METRICS, LOGGER) @safe_and_metered_rpc_method(METRICS, LOGGER)
def RecordDevice(self, request : DeviceId, context : grpc.ServicerContext) -> Empty: def RecordDevice(self, request : DltDeviceId, context : grpc.ServicerContext) -> Empty:
context_client = ContextClient() context_client = ContextClient()
device = context_client.GetDevice(request) device = context_client.GetDevice(request.device_id)
dltgateway_client = DltGatewayClient() dltgateway_client = DltGatewayClient()
dlt_record_id = DltRecordId() dlt_record_id = DltRecordId()
dlt_record_id.domain_uuid.uuid = self._own_domain_finder.own_domain_uuid dlt_record_id.domain_uuid.uuid = request.topology_id.topology_uuid.uuid
dlt_record_id.type = DltRecordTypeEnum.DLTRECORDTYPE_DEVICE dlt_record_id.type = DltRecordTypeEnum.DLTRECORDTYPE_DEVICE
dlt_record_id.record_uuid.uuid = device.device_id.device_uuid.uuid dlt_record_id.record_uuid.uuid = device.device_id.device_uuid.uuid
...@@ -83,17 +81,53 @@ class DltConnectorServiceServicerImpl(DltConnectorServiceServicer): ...@@ -83,17 +81,53 @@ class DltConnectorServiceServicerImpl(DltConnectorServiceServicer):
return Empty() return Empty()
@safe_and_metered_rpc_method(METRICS, LOGGER) @safe_and_metered_rpc_method(METRICS, LOGGER)
def RecordAllServices(self, request : Empty, context : grpc.ServicerContext) -> Empty: def RecordAllLinks(self, request : TopologyId, context : grpc.ServicerContext) -> Empty:
return Empty()
@safe_and_metered_rpc_method(METRICS, LOGGER)
def RecordLink(self, request : DltLinkId, context : grpc.ServicerContext) -> Empty:
context_client = ContextClient()
link = context_client.GetLink(request.link_id)
dltgateway_client = DltGatewayClient()
dlt_record_id = DltRecordId()
dlt_record_id.domain_uuid.uuid = request.topology_id.topology_uuid.uuid
dlt_record_id.type = DltRecordTypeEnum.DLTRECORDTYPE_LINK
dlt_record_id.record_uuid.uuid = link.link_id.link_uuid.uuid
LOGGER.info('[RecordLink] sent dlt_record_id = {:s}'.format(grpc_message_to_json_string(dlt_record_id)))
dlt_record = dltgateway_client.GetFromDlt(dlt_record_id)
LOGGER.info('[RecordLink] recv dlt_record = {:s}'.format(grpc_message_to_json_string(dlt_record)))
exists = record_exists(dlt_record)
LOGGER.info('[RecordLink] exists = {:s}'.format(str(exists)))
dlt_record = DltRecord()
dlt_record.record_id.CopyFrom(dlt_record_id)
dlt_record.operation = \
DltRecordOperationEnum.DLTRECORDOPERATION_UPDATE \
if exists else \
DltRecordOperationEnum.DLTRECORDOPERATION_ADD
dlt_record.data_json = grpc_message_to_json_string(link)
LOGGER.info('[RecordLink] sent dlt_record = {:s}'.format(grpc_message_to_json_string(dlt_record)))
dlt_record_status = dltgateway_client.RecordToDlt(dlt_record)
LOGGER.info('[RecordLink] recv dlt_record_status = {:s}'.format(grpc_message_to_json_string(dlt_record_status)))
return Empty()
@safe_and_metered_rpc_method(METRICS, LOGGER)
def RecordAllServices(self, request : TopologyId, context : grpc.ServicerContext) -> Empty:
return Empty() return Empty()
@safe_and_metered_rpc_method(METRICS, LOGGER) @safe_and_metered_rpc_method(METRICS, LOGGER)
def RecordService(self, request : ServiceId, context : grpc.ServicerContext) -> Empty: def RecordService(self, request : DltServiceId, context : grpc.ServicerContext) -> Empty:
return Empty() return Empty()
@safe_and_metered_rpc_method(METRICS, LOGGER) @safe_and_metered_rpc_method(METRICS, LOGGER)
def RecordAllSlices(self, request : Empty, context : grpc.ServicerContext) -> Empty: def RecordAllSlices(self, request : TopologyId, context : grpc.ServicerContext) -> Empty:
return Empty() return Empty()
@safe_and_metered_rpc_method(METRICS, LOGGER) @safe_and_metered_rpc_method(METRICS, LOGGER)
def RecordSlice(self, request : SliceId, context : grpc.ServicerContext) -> Empty: def RecordSlice(self, request : DltSliceId, context : grpc.ServicerContext) -> Empty:
return Empty() return Empty()
...@@ -12,15 +12,17 @@ ...@@ -12,15 +12,17 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import logging, threading import json, logging, threading
from typing import Any from typing import Any
from common.proto.context_pb2 import EventTypeEnum from common.Constants import DEFAULT_CONTEXT_UUID, DEFAULT_TOPOLOGY_UUID, INTERDOMAIN_TOPOLOGY_UUID
from common.proto.context_pb2 import ContextId, Device, EventTypeEnum, Link
from common.proto.dlt_gateway_pb2 import DltRecordTypeEnum from common.proto.dlt_gateway_pb2 import DltRecordTypeEnum
from common.tools.grpc.Tools import grpc_message_to_json_string from common.tools.grpc.Tools import grpc_message_to_json_string
from common.tools.object_factory.Context import json_context_id
from context.client.ContextClient import ContextClient from context.client.ContextClient import ContextClient
from dlt.connector.client.DltEventsCollector import DltEventsCollector from dlt.connector.client.DltEventsCollector import DltEventsCollector
from dlt.connector.client.DltGatewayClient import DltGatewayClient from dlt.connector.client.DltGatewayClient import DltGatewayClient
from dlt.connector.service.tools.OwnDomainFinder import OwnDomainFinder from .Tools import add_device_to_topology, add_link_to_topology, create_context, create_topology
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
...@@ -29,6 +31,7 @@ GET_EVENT_TIMEOUT = 0.5 ...@@ -29,6 +31,7 @@ GET_EVENT_TIMEOUT = 0.5
class DltEventDispatcher(threading.Thread): class DltEventDispatcher(threading.Thread):
def __init__(self) -> None: def __init__(self) -> None:
LOGGER.debug('Creating connector...') LOGGER.debug('Creating connector...')
super().__init__(name='DltEventDispatcher', daemon=True)
self._terminate = threading.Event() self._terminate = threading.Event()
LOGGER.debug('Connector created') LOGGER.debug('Connector created')
...@@ -40,17 +43,27 @@ class DltEventDispatcher(threading.Thread): ...@@ -40,17 +43,27 @@ class DltEventDispatcher(threading.Thread):
self._terminate.set() self._terminate.set()
def run(self) -> None: def run(self) -> None:
own_domain_finder = OwnDomainFinder()
context_client = ContextClient() context_client = ContextClient()
create_context(context_client, DEFAULT_CONTEXT_UUID)
create_topology(context_client, DEFAULT_CONTEXT_UUID, DEFAULT_TOPOLOGY_UUID)
create_topology(context_client, DEFAULT_CONTEXT_UUID, INTERDOMAIN_TOPOLOGY_UUID)
dlt_gateway_client = DltGatewayClient() dlt_gateway_client = DltGatewayClient()
dlt_events_collector = DltEventsCollector(dlt_gateway_client, log_events_received=True) dlt_events_collector = DltEventsCollector(dlt_gateway_client, log_events_received=True)
dlt_events_collector.start() dlt_events_collector.start()
while not self._terminate.is_set(): while not self._terminate.is_set():
event = dlt_events_collector.get_event(block=False, timeout=GET_EVENT_TIMEOUT) event = dlt_events_collector.get_event(block=True, timeout=GET_EVENT_TIMEOUT)
if event is None: continue if event is None: continue
if event.record_id.domain_uuid.uuid == own_domain_finder.own_domain_id:
existing_topology_ids = context_client.ListTopologyIds(ContextId(**json_context_id(DEFAULT_CONTEXT_UUID)))
existing_topology_uuids = {
topology_id.topology_uuid.uuid for topology_id in existing_topology_ids.topology_ids
}
existing_topology_uuids.discard(DEFAULT_TOPOLOGY_UUID)
existing_topology_uuids.discard(INTERDOMAIN_TOPOLOGY_UUID)
if event.record_id.domain_uuid.uuid in existing_topology_uuids:
LOGGER.info('Ignoring DLT event received (local): {:s}'.format(grpc_message_to_json_string(event))) LOGGER.info('Ignoring DLT event received (local): {:s}'.format(grpc_message_to_json_string(event)))
else: else:
LOGGER.info('DLT event received (remote): {:s}'.format(grpc_message_to_json_string(event))) LOGGER.info('DLT event received (remote): {:s}'.format(grpc_message_to_json_string(event)))
...@@ -60,13 +73,31 @@ class DltEventDispatcher(threading.Thread): ...@@ -60,13 +73,31 @@ class DltEventDispatcher(threading.Thread):
dlt_gateway_client.close() dlt_gateway_client.close()
context_client.close() context_client.close()
def dispatch_event(self, context_client : ContextClient, dlt_gateway_client : DltGatewayClient, event : Any): def dispatch_event(
timestamp : float = event.event.timestamp.timestamp self, context_client : ContextClient, dlt_gateway_client : DltGatewayClient, event : Any
) -> None:
event_type : EventTypeEnum = event.event.event_type # {UNDEFINED/CREATE/UPDATE/REMOVE} event_type : EventTypeEnum = event.event.event_type # {UNDEFINED/CREATE/UPDATE/REMOVE}
domain_uuid : str = event.record_id.domain_uuid.uuid
record_type : DltRecordTypeEnum = event.record_id.type # {UNDEFINED/CONTEXT/TOPOLOGY/DEVICE/LINK/SERVICE/SLICE} record_type : DltRecordTypeEnum = event.record_id.type # {UNDEFINED/CONTEXT/TOPOLOGY/DEVICE/LINK/SERVICE/SLICE}
record_uuid : str = event.record_id.record_uuid.uuid
LOGGER.info('[dispatch_event] event.record_id={:s}'.format(grpc_message_to_json_string(event.record_id))) LOGGER.info('[dispatch_event] event.record_id={:s}'.format(grpc_message_to_json_string(event.record_id)))
record = dlt_gateway_client.GetFromDlt(event.record_id) record = dlt_gateway_client.GetFromDlt(event.record_id)
LOGGER.info('[dispatch_event] record={:s}'.format(grpc_message_to_json_string(record))) LOGGER.info('[dispatch_event] record={:s}'.format(grpc_message_to_json_string(record)))
if record_type == DltRecordTypeEnum.DLTRECORDTYPE_DEVICE:
if event_type in {EventTypeEnum.EVENTTYPE_CREATE, EventTypeEnum.EVENTTYPE_UPDATE}:
device = Device(**json.loads(record.data_json))
context_client.SetDevice(device)
device_uuid = device.device_id.device_uuid.uuid # pylint: disable=no-member
add_device_to_topology(context_client, DEFAULT_CONTEXT_UUID, INTERDOMAIN_TOPOLOGY_UUID, device_uuid)
elif event_type in {EventTypeEnum.EVENTTYPE_DELETE}:
raise NotImplementedError('Delete Device')
elif record_type == DltRecordTypeEnum.DLTRECORDTYPE_LINK:
if event_type in {EventTypeEnum.EVENTTYPE_CREATE, EventTypeEnum.EVENTTYPE_UPDATE}:
link = Link(**json.loads(record.data_json))
context_client.SetLink(link)
link_uuid = link.link_id.link_uuid.uuid # pylint: disable=no-member
add_link_to_topology(context_client, DEFAULT_CONTEXT_UUID, INTERDOMAIN_TOPOLOGY_UUID, link_uuid)
elif event_type in {EventTypeEnum.EVENTTYPE_DELETE}:
raise NotImplementedError('Delete Link')
else:
raise NotImplementedError('EventType: {:s}'.format(grpc_message_to_json_string(event)))
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from common.tools.object_factory.Context import json_context, json_context_id
from common.tools.object_factory.Topology import json_topology, json_topology_id
from common.proto.context_pb2 import Context, ContextId, Empty, Topology, TopologyId
from context.client.ContextClient import ContextClient
def create_context(
context_client : ContextClient, context_uuid : str
) -> None:
existing_context_ids = context_client.ListContextIds(Empty())
existing_context_uuids = {context_id.context_uuid.uuid for context_id in existing_context_ids.context_ids}
if context_uuid in existing_context_uuids: return
context_client.SetContext(Context(**json_context(context_uuid)))
def create_topology(
context_client : ContextClient, context_uuid : str, topology_uuid : str
) -> None:
context_id = ContextId(**json_context_id(context_uuid))
existing_topology_ids = context_client.ListTopologyIds(context_id)
existing_topology_uuids = {topology_id.topology_uuid.uuid for topology_id in existing_topology_ids.topology_ids}
if topology_uuid in existing_topology_uuids: return
context_client.SetTopology(Topology(**json_topology(topology_uuid, context_id=context_id)))
def add_topology_to_context(
context_client : ContextClient, context_uuid : str, topology_uuid : str
) -> None:
context_id = ContextId(**json_context_id(context_uuid))
context_ro = context_client.GetContext(context_id)
topology_uuids = {topology_id.topology_uuid.uuid for topology_id in context_ro.topology_ids}
if topology_uuid in topology_uuids: return
context_rw = Context()
context_rw.CopyFrom(context_ro)
context_rw.topology_ids.add().topology_uuid.uuid = topology_uuid # pylint: disable=no-member
context_client.SetContext(context_rw)
def add_device_to_topology(
context_client : ContextClient, context_uuid : str, topology_uuid : str, device_uuid : str
) -> None:
context_id = ContextId(**json_context_id(context_uuid))
topology_id = TopologyId(**json_topology_id(topology_uuid, context_id=context_id))
topology_ro = context_client.GetTopology(topology_id)
device_uuids = {device_id.device_uuid.uuid for device_id in topology_ro.device_ids}
if device_uuid in device_uuids: return
topology_rw = Topology()
topology_rw.CopyFrom(topology_ro)
topology_rw.device_ids.add().device_uuid.uuid = device_uuid # pylint: disable=no-member
context_client.SetTopology(topology_rw)
def add_link_to_topology(
context_client : ContextClient, context_uuid : str, topology_uuid : str, link_uuid : str
) -> None:
context_id = ContextId(**json_context_id(context_uuid))
topology_id = TopologyId(**json_topology_id(topology_uuid, context_id=context_id))
topology_ro = context_client.GetTopology(topology_id)
link_uuids = {link_id.link_uuid.uuid for link_id in topology_ro.link_ids}
if link_uuid in link_uuids: return
topology_rw = Topology()
topology_rw.CopyFrom(topology_ro)
topology_rw.link_ids.add().link_uuid.uuid = link_uuid # pylint: disable=no-member
context_client.SetTopology(topology_rw)
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy, logging
from typing import Optional
from common.Constants import DEFAULT_CONTEXT_UUID
from common.proto.context_pb2 import Empty
from context.client.ContextClient import ContextClient
LOGGER = logging.getLogger(__name__)
class OwnDomainFinder:
def __new__(cls):
if not hasattr(cls, 'instance'):
cls.instance = super(OwnDomainFinder, cls).__new__(cls)
return cls.instance
def __init__(self) -> None:
self.__own_domain_uuid : Optional[str] = None
@property
def own_domain_uuid(self) -> str:
if self.__own_domain_uuid is not None: return self.__own_domain_uuid
context_client = ContextClient()
existing_context_ids = context_client.ListContextIds(Empty())
existing_context_uuids = {context_id.context_uuid.uuid for context_id in existing_context_ids.context_ids}
# Detect local context name (will be used as abstracted device name); exclude DEFAULT_CONTEXT_UUID
existing_non_admin_context_uuids = copy.deepcopy(existing_context_uuids)
existing_non_admin_context_uuids.discard(DEFAULT_CONTEXT_UUID)
if len(existing_non_admin_context_uuids) != 1:
MSG = 'Unable to identify own domain name. Existing Contexts({:s})'
raise Exception(MSG.format(str(existing_context_uuids)))
self.__own_domain_uuid = existing_non_admin_context_uuids.pop()
return self.__own_domain_uuid
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment