From 430382ca6b231d11910932f2fccb6444df0d4f15 Mon Sep 17 00:00:00 2001 From: gifrerenom Date: Fri, 28 Oct 2022 19:33:16 +0000 Subject: [PATCH] Interdomain component: - removed unneeded code in Dockerfile - created first partial version of TopologyAbstractor --- src/interdomain/Dockerfile | 7 +- src/interdomain/service/__main__.py | 12 +- .../topology_abstractor/TopologyAbstractor.py | 111 ++++++++++++++++++ .../service/topology_abstractor/__init__.py | 14 +++ .../tools/ContextMethods.py | 71 +++++++++++ .../topology_abstractor/tools/__init__.py | 14 +++ 6 files changed, 224 insertions(+), 5 deletions(-) create mode 100644 src/interdomain/service/topology_abstractor/TopologyAbstractor.py create mode 100644 src/interdomain/service/topology_abstractor/__init__.py create mode 100644 src/interdomain/service/topology_abstractor/tools/ContextMethods.py create mode 100644 src/interdomain/service/topology_abstractor/tools/__init__.py diff --git a/src/interdomain/Dockerfile b/src/interdomain/Dockerfile index 388fcb76d..036890dc4 100644 --- a/src/interdomain/Dockerfile +++ b/src/interdomain/Dockerfile @@ -63,10 +63,11 @@ RUN python3 -m pip install -r requirements.txt # Add component files into working directory WORKDIR /var/teraflow COPY src/context/. context/ -COPY src/device/. device/ +#COPY src/device/. device/ +COPY src/dlt/. dlt/ COPY src/interdomain/. interdomain/ -COPY src/monitoring/. monitoring/ -COPY src/service/. service/ +#COPY src/monitoring/. monitoring/ +#COPY src/service/. service/ COPY src/slice/. slice/ # Start the service diff --git a/src/interdomain/service/__main__.py b/src/interdomain/service/__main__.py index c0a078f4d..b5463d7d8 100644 --- a/src/interdomain/service/__main__.py +++ b/src/interdomain/service/__main__.py @@ -18,6 +18,7 @@ from common.Constants import ServiceNameEnum from common.Settings import ( ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name, get_log_level, get_metrics_port, get_service_port_grpc, wait_for_environment_variables) +from .topology_abstractor.TopologyAbstractor import TopologyAbstractor from .InterdomainService import InterdomainService from .RemoteDomainClients import RemoteDomainClients @@ -40,6 +41,8 @@ def main(): get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC), get_env_var_name(ServiceNameEnum.SLICE, ENVVAR_SUFIX_SERVICE_HOST ), get_env_var_name(ServiceNameEnum.SLICE, ENVVAR_SUFIX_SERVICE_PORT_GRPC), + get_env_var_name(ServiceNameEnum.DLT, ENVVAR_SUFIX_SERVICE_HOST ), + get_env_var_name(ServiceNameEnum.DLT, ENVVAR_SUFIX_SERVICE_PORT_GRPC), ]) signal.signal(signal.SIGINT, signal_handler) @@ -58,14 +61,19 @@ def main(): grpc_service = InterdomainService(remote_domain_clients) grpc_service.start() + # Subscribe to Context Events + topology_abstractor = TopologyAbstractor() + topology_abstractor.start() + # TODO: improve with configuration the definition of the remote peers - interdomain_service_port_grpc = get_service_port_grpc(ServiceNameEnum.INTERDOMAIN) - remote_domain_clients.add_peer('remote-teraflow', 'remote-teraflow', interdomain_service_port_grpc) + #interdomain_service_port_grpc = get_service_port_grpc(ServiceNameEnum.INTERDOMAIN) + #remote_domain_clients.add_peer('remote-teraflow', 'remote-teraflow', interdomain_service_port_grpc) # Wait for Ctrl+C or termination signal while not terminate.wait(timeout=0.1): pass LOGGER.info('Terminating...') + topology_abstractor.stop() grpc_service.stop() LOGGER.info('Bye') diff --git a/src/interdomain/service/topology_abstractor/TopologyAbstractor.py b/src/interdomain/service/topology_abstractor/TopologyAbstractor.py new file mode 100644 index 000000000..6773ded4a --- /dev/null +++ b/src/interdomain/service/topology_abstractor/TopologyAbstractor.py @@ -0,0 +1,111 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import threading +import logging, threading +from typing import List, Optional, Union +from common.Constants import AGGREGATED_TOPOLOGY_UUID, DOMAINS_TOPOLOGY_UUID +from common.proto.context_pb2 import ( + ConnectionEvent, ContextEvent, ContextId, DeviceEvent, DeviceId, LinkEvent, ServiceEvent, ServiceId, + SliceEvent, SliceId, TopologyEvent) +from context.client.ContextClient import ContextClient +from context.client.EventsCollector import EventsCollector +from dlt.connector.client.DltConnectorClient import DltConnectorClient +from .tools.ContextMethods import create_abstracted_device_if_not_exists, create_interdomain_entities + +LOGGER = logging.getLogger(__name__) + +DltRecordIdTypes = Union[DeviceId, SliceId, ServiceId] +EventTypes = Union[ + ContextEvent, TopologyEvent, DeviceEvent, LinkEvent, ServiceEvent, SliceEvent, ConnectionEvent +] + +class TopologyAbstractor(threading.Thread): + def __init__(self) -> None: + super().__init__(daemon=True) + self.terminate = threading.Event() + + self.context_client = ContextClient() + self.dlt_connector_client = DltConnectorClient() + self.context_event_collector = EventsCollector(self.context_client) + + self.own_context_id : Optional[ContextId] = None + self.own_abstract_device : Optional[ContextId] = None + + def stop(self): + self.terminate.set() + + def run(self) -> None: + self.context_client.connect() + self.dlt_connector_client.connect() + self.context_event_collector.start() + + while not self.terminate.is_set(): + event = self.context_event_collector.get_event(timeout=0.1) + if event is None: continue + if self.ignore_event(event): continue + # TODO: filter events resulting from abstraction computation + # TODO: filter events resulting from updating remote abstractions + LOGGER.info('Processing Event({:s})...'.format(str(event))) + dlt_records = self.update_abstraction(event) + self.send_dlt_records(dlt_records) + + self.context_event_collector.stop() + self.context_client.close() + self.dlt_connector_client.close() + + def ignore_event(self, event : EventTypes) -> List[DltRecordIdTypes]: + if isinstance(event, ContextEvent): + context_uuid = event.context_id.context_uuid.uuid + if self.own_context_id is None: return False + own_context_uuid = self.own_context_id.context_uuid.uuid + return context_uuid == own_context_uuid + elif isinstance(event, TopologyEvent): + context_uuid = event.topology_id.context_id.context_uuid.uuid + if self.own_context_id is None: return False + own_context_uuid = self.own_context_id.context_uuid.uuid + if context_uuid != own_context_uuid: return True + topology_uuid = event.topology_id.topology_uuid.uuid + if topology_uuid in {DOMAINS_TOPOLOGY_UUID, AGGREGATED_TOPOLOGY_UUID}: return True + return False + + def send_dlt_records(self, dlt_records : Union[DltRecordIdTypes, List[DltRecordIdTypes]]) -> None: + for dlt_record_id in dlt_records: + if isinstance(dlt_record_id, DeviceId): + self.dlt_connector_client.RecordDevice(dlt_record_id) + elif isinstance(dlt_record_id, ServiceId): + self.dlt_connector_client.RecordService(dlt_record_id) + elif isinstance(dlt_record_id, SliceId): + self.dlt_connector_client.RecordSlice(dlt_record_id) + else: + LOGGER.error('Unsupported Record({:s})'.format(str(dlt_record_id))) + + def update_abstraction(self, event : Optional[EventTypes] = None) -> List[DltRecordIdTypes]: + dlt_record_ids_with_changes = [] + + if self.own_context_id is None: + self.own_context_id = create_interdomain_entities(self.context_client) + + if self.own_abstract_device is None: + self.own_abstract_device = create_abstracted_device_if_not_exists(self.context_client, self.own_context_id) + dlt_record_ids_with_changes.append(self.own_abstract_device.device_id) + + if event is None: + # TODO: identify initial status from topology and update endpoints accordingly + pass + else: + # TODO: identify changes from event and update endpoints accordingly + pass + + return dlt_record_ids_with_changes diff --git a/src/interdomain/service/topology_abstractor/__init__.py b/src/interdomain/service/topology_abstractor/__init__.py new file mode 100644 index 000000000..70a332512 --- /dev/null +++ b/src/interdomain/service/topology_abstractor/__init__.py @@ -0,0 +1,14 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + diff --git a/src/interdomain/service/topology_abstractor/tools/ContextMethods.py b/src/interdomain/service/topology_abstractor/tools/ContextMethods.py new file mode 100644 index 000000000..7b8e2eada --- /dev/null +++ b/src/interdomain/service/topology_abstractor/tools/ContextMethods.py @@ -0,0 +1,71 @@ +import copy +from common.Constants import ( + AGGREGATED_TOPOLOGY_UUID, DEFAULT_CONTEXT_UUID, DEFAULT_TOPOLOGY_UUID, DOMAINS_TOPOLOGY_UUID) +from common.DeviceTypes import DeviceTypeEnum +from common.proto.context_pb2 import ( + Context, ContextId, Device, DeviceDriverEnum, DeviceId, DeviceOperationalStatusEnum, Empty, Topology, TopologyId) +from common.tools.object_factory.Context import json_context, json_context_id +from common.tools.object_factory.Device import json_device, json_device_id +from common.tools.object_factory.Topology import json_topology, json_topology_id +from context.client.ContextClient import ContextClient + +def create_interdomain_entities(context_client : ContextClient) -> ContextId: + existing_context_ids = context_client.ListContextIds(Empty()) + existing_context_uuids = {context_id.context_uuid.uuid for context_id in existing_context_ids.context_ids} + + # Detect local context name (will be used as abstracted device name); exclude DEFAULT_CONTEXT_UUID + existing_non_admin_context_uuids = copy.deepcopy(existing_context_uuids) + existing_non_admin_context_uuids.discard(DEFAULT_CONTEXT_UUID) + if len(existing_non_admin_context_uuids) != 1: + MSG = 'Unable to identify own domain name. Existing Contexts({:s})' + raise Exception(MSG.format(str(existing_context_uuids))) + own_domain_uuid = existing_non_admin_context_uuids.pop() + own_context_id = ContextId(**json_context_id(own_domain_uuid)) + + #if DEFAULT_CONTEXT_UUID not in existing_context_uuids: + # context_client.SetContext(Context(**json_context(DEFAULT_CONTEXT_UUID))) + #admin_context_id = ContextId(**json_context_id(DEFAULT_CONTEXT_UUID)) + + # Create topologies "admin", "domains", and "aggregated" + existing_topology_ids = context_client.ListTopologyIds(own_context_id) + existing_topology_uuids = {topology_id.topology_uuid.uuid for topology_id in existing_topology_ids.topology_ids} + + topology_uuids = [DEFAULT_TOPOLOGY_UUID, DOMAINS_TOPOLOGY_UUID, AGGREGATED_TOPOLOGY_UUID] + for topology_uuid in topology_uuids: + if topology_uuid in existing_topology_uuids: continue + context_client.SetTopology(Topology(**json_topology(topology_uuid, context_id=own_context_id))) + + return own_context_id + +def create_abstracted_device_if_not_exists(context_client : ContextClient, own_context_id : ContextId) -> Device: + own_domain_uuid = own_context_id.context_uuid.uuid + + # Create device representing abstracted local domain + existing_device_ids = context_client.ListDeviceIds(Empty()) + existing_device_uuids = {device_id.device_uuid.uuid for device_id in existing_device_ids.device_ids} + own_abstract_device_id = DeviceId(**json_device_id(own_domain_uuid)) + if own_domain_uuid in existing_device_uuids: + own_abstract_device = context_client.GetDevice(own_abstract_device_id) + else: + own_abstracted_device_uuid = own_domain_uuid + own_abstract_device = Device(**json_device( + own_abstracted_device_uuid, DeviceTypeEnum.NETWORK.value, + DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_ENABLED, + endpoints=[], config_rules=[], drivers=[DeviceDriverEnum.DEVICEDRIVER_UNDEFINED] + )) + context_client.SetDevice(own_abstract_device) + + # Add own abstracted device to topologies ["domains"] + topology_uuids = [DOMAINS_TOPOLOGY_UUID] + for topology_uuid in topology_uuids: + topology_id = TopologyId(**json_topology_id(topology_uuid, own_context_id)) + topology_ro = context_client.GetTopology(topology_id) + device_uuids = {device_id.device_uuid.uuid for device_id in topology_ro.device_ids} + if own_abstracted_device_uuid in device_uuids: continue + + topology_rw = Topology() + topology_rw.CopyFrom(topology_ro) + topology_rw.device_ids.add().device_uuid.uuid = own_abstracted_device_uuid + context_client.SetTopology(topology_rw) + + return own_abstract_device diff --git a/src/interdomain/service/topology_abstractor/tools/__init__.py b/src/interdomain/service/topology_abstractor/tools/__init__.py new file mode 100644 index 000000000..70a332512 --- /dev/null +++ b/src/interdomain/service/topology_abstractor/tools/__init__.py @@ -0,0 +1,14 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + -- GitLab