diff --git a/src/common/Constants.py b/src/common/Constants.py index c9801621c2a172368bb51847074fd536eceb1f89..964d904da704324d6def548103675e815743d818 100644 --- a/src/common/Constants.py +++ b/src/common/Constants.py @@ -30,9 +30,9 @@ DEFAULT_HTTP_BIND_ADDRESS = '0.0.0.0' DEFAULT_METRICS_PORT = 9192 # Default context and topology UUIDs -DEFAULT_CONTEXT_UUID = 'admin' -DEFAULT_TOPOLOGY_UUID = 'admin' # contains the detailed local topology -DOMAINS_TOPOLOGY_UUID = 'domains' # contains the abstracted domains (abstracted local + abstracted remotes) +DEFAULT_CONTEXT_UUID = 'admin' +DEFAULT_TOPOLOGY_UUID = 'admin' # contains the detailed local topology +INTERDOMAIN_TOPOLOGY_UUID = 'inter' # contains the abstract inter-domain topology # Default service names class ServiceNameEnum(Enum): diff --git a/src/interdomain/service/topology_abstractor/AbstractDevice.py b/src/interdomain/service/topology_abstractor/AbstractDevice.py index a964989c8a30f8a14836fd8cf2a1e46090aa07f1..443fc18cddd450eefb493206b944049814466be7 100644 --- a/src/interdomain/service/topology_abstractor/AbstractDevice.py +++ b/src/interdomain/service/topology_abstractor/AbstractDevice.py @@ -13,178 +13,178 @@ # limitations under the License. import copy, logging -from typing import Dict, Optional, Tuple -from common.Constants import DEFAULT_TOPOLOGY_UUID, DOMAINS_TOPOLOGY_UUID +from typing import Dict, Optional +from common.Constants import DEFAULT_CONTEXT_UUID, INTERDOMAIN_TOPOLOGY_UUID from common.DeviceTypes import DeviceTypeEnum from common.proto.context_pb2 import ( ContextId, Device, DeviceDriverEnum, DeviceId, DeviceOperationalStatusEnum, EndPoint) +from common.tools.object_factory.Context import json_context_id from common.tools.object_factory.Device import json_device, json_device_id from context.client.ContextClient import ContextClient -from interdomain.service.topology_abstractor.Tools import ( - add_device_to_topology, create_missing_topologies, find_own_domain_uuid, get_devices_in_topology, +from .Tools import ( + add_device_to_topology, device_type_is_datacenter, device_type_is_network, endpoint_type_is_border, get_existing_device_uuids) LOGGER = logging.getLogger(__name__) class AbstractDevice: - def __init__(self): + def __init__(self, device_uuid : str, device_type : DeviceTypeEnum): self.__context_client = ContextClient() + self.__device_uuid : str = device_uuid + self.__device_type : DeviceTypeEnum = device_type + self.__device : Optional[Device] = None + self.__device_id : Optional[DeviceId] = None - self.__own_context_id : Optional[ContextId] = None - self.__own_domain_uuid : Optional[str] = None # uuid of own_context_id + # Dict[device_uuid, Dict[endpoint_uuid, abstract EndPoint]] + self.__device_endpoint_to_abstract : Dict[str, Dict[str, EndPoint]] = dict() - self.__own_abstract_device : Optional[Device] = None - self.__own_abstract_device_id : Optional[DeviceId] = None - - # Dict[device_uuid, Dict[endpoint_uuid, Tuple[interdomain_endpoint_uuid, abstract EndPoint]]] - self.__device_endpoint_to_abstract : Dict[str, Dict[str, Tuple[str, EndPoint]]] = dict() - - # Dict[interdomain_endpoint_uuid, Tuple[device_uuid, endpoint_uuid]] - self.__abstract_to_device_endpoint : Dict[str, Tuple[str, str]] = dict() + # Dict[endpoint_uuid, device_uuid] + self.__abstract_endpoint_to_device : Dict[str, str] = dict() @property - def own_context_id(self): return self.__own_context_id + def uuid(self) -> str: return self.__device_uuid @property - def own_domain_uuid(self): return self.__own_domain_uuid + def device_id(self) -> Optional[DeviceId]: return self.__device_id @property - def own_abstract_device_uuid(self): return self.__own_domain_uuid + def device(self) -> Optional[Device]: return self.__device - @property - def own_abstract_device_id(self): return self.__own_abstract_device_id + def get_endpoint(self, device_uuid : str, endpoint_uuid : str) -> Optional[EndPoint]: + return self.__device_endpoint_to_abstract.get(device_uuid, {}).get(endpoint_uuid) - @property - def own_abstract_device(self): return self.__own_abstract_device + def initialize(self) -> bool: + if self.__device is not None: return False - def _load_existing_abstract_device(self) -> None: - self.__device_endpoint_to_abstract = dict() - self.__abstract_to_device_endpoint = dict() - - self.__own_abstract_device_id = DeviceId(**json_device_id(self.__own_domain_uuid)) - self.__own_abstract_device = self.__context_client.GetDevice(self.__own_abstract_device_id) + existing_device_uuids = get_existing_device_uuids(self.__context_client) + create_abstract_device = self.__device_uuid not in existing_device_uuids - # for each endpoint in own_abstract_device, populate internal data structures and mappings - for interdomain_endpoint in self.__own_abstract_device.device_endpoints: - interdomain_endpoint_uuid : str = interdomain_endpoint.endpoint_id.endpoint_uuid.uuid - endpoint_uuid,device_uuid = interdomain_endpoint_uuid.split('@', maxsplit=1) + if create_abstract_device: + self._create_empty() + else: + self._load_existing() + + is_datacenter = device_type_is_datacenter(self.__device_type) + is_network = device_type_is_network(self.__device_type) + if is_datacenter or is_network: + # Add abstract device to topologies [INTERDOMAIN_TOPOLOGY_UUID] + context_id = ContextId(**json_context_id(DEFAULT_CONTEXT_UUID)) + topology_uuids = [INTERDOMAIN_TOPOLOGY_UUID] + for topology_uuid in topology_uuids: + add_device_to_topology(self.__context_client, context_id, topology_uuid, self.__device_uuid) + + # seems not needed; to be removed in future releases + #if is_datacenter and create_abstract_device: + # dc_device = self.__context_client.GetDevice(DeviceId(**json_device_id(self.__device_uuid))) + # if device_type_is_datacenter(dc_device.device_type): + # self.update_endpoints(dc_device) + #elif is_network: + # devices_in_admin_topology = get_devices_in_topology( + # self.__context_client, context_id, DEFAULT_TOPOLOGY_UUID) + # for device in devices_in_admin_topology: + # if device_type_is_datacenter(device.device_type): continue + # self.update_endpoints(device) - interdomain_endpoint_tuple = (interdomain_endpoint_uuid, interdomain_endpoint) - self.__device_endpoint_to_abstract\ - .setdefault(device_uuid, {}).setdefault(endpoint_uuid, interdomain_endpoint_tuple) - self.__abstract_to_device_endpoint\ - .setdefault(interdomain_endpoint_uuid, (device_uuid, endpoint_uuid)) + return True - def _create_empty_abstract_device(self) -> None: - own_abstract_device_uuid = self.__own_domain_uuid + def _create_empty(self) -> None: + device_uuid = self.__device_uuid - own_abstract_device = Device(**json_device( - own_abstract_device_uuid, DeviceTypeEnum.NETWORK.value, - DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_ENABLED, + device = Device(**json_device( + device_uuid, self.__device_type.value, DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_ENABLED, endpoints=[], config_rules=[], drivers=[DeviceDriverEnum.DEVICEDRIVER_UNDEFINED] )) - self.__context_client.SetDevice(own_abstract_device) - self.__own_abstract_device = own_abstract_device - self.__own_abstract_device_id = self.__own_abstract_device.device_id + self.__context_client.SetDevice(device) + self.__device = device + self.__device_id = self.__device.device_id - # Add own abstract device to topologies ["domains"] - topology_uuids = [DOMAINS_TOPOLOGY_UUID] - for topology_uuid in topology_uuids: - add_device_to_topology( - self.__context_client, self.__own_context_id, topology_uuid, own_abstract_device_uuid) + def _load_existing(self) -> None: + self.__device_endpoint_to_abstract = dict() + self.__abstract_endpoint_to_device = dict() - def _discover_or_create_abstract_device(self) -> bool: - # already discovered - if self.__own_abstract_device is not None: return False + self.__device_id = DeviceId(**json_device_id(self.__device_uuid)) + self.__device = self.__context_client.GetDevice(self.__device_id) + self.__device_type = self.__device.device_type + device_uuid = self.__device_id.device_uuid.uuid - # discover from existing devices; should have name of the own domain context - existing_device_uuids = get_existing_device_uuids(self.__context_client) - create_abstract_device = self.__own_domain_uuid not in existing_device_uuids - if create_abstract_device: - self._create_empty_abstract_device() - else: - self._load_existing_abstract_device() - return create_abstract_device + device_type = self.__device_type + is_datacenter = device_type_is_datacenter(device_type) + is_network = device_type_is_network(device_type) + if not is_datacenter and not is_network: + LOGGER.warning('Unsupported InterDomain Device Type: {:s}'.format(str(device_type))) + return + + # for each endpoint in abstract device, populate internal data structures and mappings + for interdomain_endpoint in self.__device.device_endpoints: + endpoint_uuid : str = interdomain_endpoint.endpoint_id.endpoint_uuid.uuid + + if is_network: + endpoint_uuid,device_uuid = endpoint_uuid.split('@', maxsplit=1) + + self.__device_endpoint_to_abstract\ + .setdefault(device_uuid, {}).setdefault(endpoint_uuid, interdomain_endpoint) + self.__abstract_endpoint_to_device\ + .setdefault(endpoint_uuid, device_uuid) def _update_endpoint_type(self, device_uuid : str, endpoint_uuid : str, endpoint_type : str) -> bool: device_endpoint_to_abstract = self.__device_endpoint_to_abstract.get(device_uuid, {}) - interdomain_endpoint_tuple = device_endpoint_to_abstract.get(endpoint_uuid) - _, interdomain_endpoint = interdomain_endpoint_tuple + interdomain_endpoint = device_endpoint_to_abstract.get(endpoint_uuid) interdomain_endpoint_type = interdomain_endpoint.endpoint_type if endpoint_type == interdomain_endpoint_type: return False interdomain_endpoint.endpoint_type = endpoint_type return True - def _add_interdomain_endpoint( - self, device_uuid : str, endpoint_uuid : str, endpoint_type : str, interdomain_endpoint_uuid : str - ) -> EndPoint: - interdomain_endpoint = self.__own_abstract_device.device_endpoints.add() - interdomain_endpoint.endpoint_id.device_id.CopyFrom(self.__own_abstract_device_id) - interdomain_endpoint.endpoint_id.endpoint_uuid.uuid = interdomain_endpoint_uuid + def _add_endpoint(self, device_uuid : str, endpoint_uuid : str, endpoint_type : str) -> EndPoint: + interdomain_endpoint = self.__device.device_endpoints.add() + interdomain_endpoint.endpoint_id.device_id.CopyFrom(self.__device_id) + interdomain_endpoint.endpoint_id.endpoint_uuid.uuid = endpoint_uuid interdomain_endpoint.endpoint_type = endpoint_type - interdomain_endpoint_tuple = (interdomain_endpoint_uuid, interdomain_endpoint) self.__device_endpoint_to_abstract\ - .setdefault(device_uuid, {}).setdefault(endpoint_uuid, interdomain_endpoint_tuple) - self.__abstract_to_device_endpoint\ - .setdefault(interdomain_endpoint_uuid, (device_uuid, endpoint_uuid)) + .setdefault(device_uuid, {}).setdefault(endpoint_uuid, interdomain_endpoint) + self.__abstract_endpoint_to_device\ + .setdefault(endpoint_uuid, device_uuid) return interdomain_endpoint - def _remove_interdomain_endpoint( - self, device_uuid : str, endpoint_uuid : str, interdomain_endpoint_tuple : Tuple[str, EndPoint] + def _remove_endpoint( + self, device_uuid : str, endpoint_uuid : str, interdomain_endpoint : EndPoint ) -> None: - interdomain_endpoint_uuid, interdomain_endpoint = interdomain_endpoint_tuple - self.__abstract_to_device_endpoint.pop(interdomain_endpoint_uuid, None) + self.__abstract_endpoint_to_device.pop(endpoint_uuid, None) device_endpoint_to_abstract = self.__device_endpoint_to_abstract.get(device_uuid, {}) device_endpoint_to_abstract.pop(endpoint_uuid, None) - self.__own_abstract_device.device_endpoints.remove(interdomain_endpoint) + self.__device.device_endpoints.remove(interdomain_endpoint) + + def update_endpoints(self, device : Device) -> bool: + if device_type_is_datacenter(self.__device.device_type): return False - def update_abstract_device_endpoints(self, device : Device) -> bool: device_uuid = device.device_id.device_uuid.uuid - LOGGER device_border_endpoint_uuids = { endpoint.endpoint_id.endpoint_uuid.uuid : endpoint.endpoint_type for endpoint in device.device_endpoints - if str(endpoint.endpoint_type).endswith('/border') + if endpoint_type_is_border(endpoint.endpoint_type) } updated = False - # for each border endpoint in own_abstract_device that is not in device; remove from own_abstract_device + # for each border endpoint in abstract device that is not in device; remove from abstract device device_endpoint_to_abstract = self.__device_endpoint_to_abstract.get(device_uuid, {}) _device_endpoint_to_abstract = copy.deepcopy(device_endpoint_to_abstract) - for endpoint_uuid, interdomain_endpoint_tuple in _device_endpoint_to_abstract.items(): + for endpoint_uuid, interdomain_endpoint in _device_endpoint_to_abstract.items(): if endpoint_uuid in device_border_endpoint_uuids: continue # remove interdomain endpoint that is not in device - self._remove_interdomain_endpoint(device_uuid, endpoint_uuid, interdomain_endpoint_tuple) + self._remove_endpoint(device_uuid, endpoint_uuid, interdomain_endpoint) updated = True - # for each border endpoint in device that is not in own_abstract_device; add to own_abstract_device + # for each border endpoint in device that is not in abstract device; add to abstract device for endpoint_uuid,endpoint_type in device_border_endpoint_uuids.items(): - # compose interdomain endpoint uuid - interdomain_endpoint_uuid = '{:s}@{:s}'.format(endpoint_uuid, device_uuid) - # if already added; just check endpoint type is not modified - if interdomain_endpoint_uuid in self.__abstract_to_device_endpoint: + if endpoint_uuid in self.__abstract_endpoint_to_device: updated = updated or self._update_endpoint_type(device_uuid, endpoint_uuid, endpoint_type) continue # otherwise, add it to the abstract device - self._add_interdomain_endpoint(device_uuid, endpoint_uuid, endpoint_type, interdomain_endpoint_uuid) + self._add_endpoint(device_uuid, endpoint_uuid, endpoint_type) updated = True return updated - - def initialize(self) -> Optional[bool]: - if self.__own_abstract_device is not None: return False - - # Discover or Create device representing abstract local domain - self._discover_or_create_abstract_device() - - devices_in_admin_topology = get_devices_in_topology( - self.__context_client, self.__own_context_id, DEFAULT_TOPOLOGY_UUID) - for device in devices_in_admin_topology: - self.update_abstract_device_endpoints(device) - - return True diff --git a/src/interdomain/service/topology_abstractor/AbstractLink.py b/src/interdomain/service/topology_abstractor/AbstractLink.py new file mode 100644 index 0000000000000000000000000000000000000000..2481f427a55dd284046db067f12061bc4d2263b5 --- /dev/null +++ b/src/interdomain/service/topology_abstractor/AbstractLink.py @@ -0,0 +1,126 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import copy, logging +from typing import Dict, List, Optional, Tuple +from common.Constants import DEFAULT_CONTEXT_UUID, INTERDOMAIN_TOPOLOGY_UUID +from common.proto.context_pb2 import ContextId, EndPointId, Link, LinkId +from common.tools.object_factory.Context import json_context_id +from common.tools.object_factory.Link import json_link, json_link_id +from context.client.ContextClient import ContextClient +from .Tools import add_link_to_topology, get_existing_link_uuids + +LOGGER = logging.getLogger(__name__) + +class AbstractLink: + def __init__(self, link_uuid : str): + self.__context_client = ContextClient() + self.__link_uuid : str = link_uuid + self.__link : Optional[Link] = None + self.__link_id : Optional[LinkId] = None + + # Dict[(device_uuid, endpoint_uuid), abstract EndPointId] + self.__device_endpoint_to_abstract : Dict[Tuple[str, str], EndPointId] = dict() + + @property + def uuid(self) -> str: return self.__link_uuid + + @property + def link_id(self) -> Optional[LinkId]: return self.__link_id + + @property + def link(self) -> Optional[Link]: return self.__link + + @staticmethod + def compose_uuid( + device_uuid_a : str, endpoint_uuid_a : str, device_uuid_z : str, endpoint_uuid_z : str + ) -> str: + # sort endpoints lexicographically to prevent duplicities + link_endpoint_uuids = sorted([ + (device_uuid_a, endpoint_uuid_a), + (device_uuid_z, endpoint_uuid_z) + ]) + link_uuid = '{:s}/{:s}=={:s}/{:s}'.format( + link_endpoint_uuids[0][0], link_endpoint_uuids[0][1], + link_endpoint_uuids[1][0], link_endpoint_uuids[1][1]) + return link_uuid + + def initialize(self) -> bool: + if self.__link is not None: return False + + existing_link_uuids = get_existing_link_uuids(self.__context_client) + + create = self.__link_uuid not in existing_link_uuids + if create: + self._create_empty() + else: + self._load_existing() + + # Add abstract link to topologies [INTERDOMAIN_TOPOLOGY_UUID] + context_id = ContextId(**json_context_id(DEFAULT_CONTEXT_UUID)) + topology_uuids = [INTERDOMAIN_TOPOLOGY_UUID] + for topology_uuid in topology_uuids: + add_link_to_topology(self.__context_client, context_id, topology_uuid, self.__link_uuid) + + return create + + def _create_empty(self) -> None: + link = Link(**json_link(self.__link_uuid, endpoint_ids=[])) + self.__context_client.SetLink(link) + self.__link = link + self.__link_id = self.__link.link_id + + def _load_existing(self) -> None: + self.__link_id = LinkId(**json_link_id(self.__link_uuid)) + self.__link = self.__context_client.GetLink(self.__link_id) + + self.__device_endpoint_to_abstract = dict() + + # for each endpoint in abstract link, populate internal data structures and mappings + for endpoint_id in self.__link.link_endpoint_ids: + device_uuid : str = endpoint_id.device_id.device_uuid.uuid + endpoint_uuid : str = endpoint_id.endpoint_uuid.uuid + self.__device_endpoint_to_abstract.setdefault((device_uuid, endpoint_uuid), endpoint_id) + + def _add_endpoint(self, device_uuid : str, endpoint_uuid : str) -> None: + endpoint_id = self.__link.link_endpoint_ids.add() + endpoint_id.device_id.device_uuid.uuid = device_uuid + endpoint_id.endpoint_uuid.uuid = endpoint_uuid + self.__device_endpoint_to_abstract.setdefault((device_uuid, endpoint_uuid), endpoint_id) + + def _remove_endpoint(self, device_uuid : str, endpoint_uuid : str) -> None: + device_endpoint_to_abstract = self.__device_endpoint_to_abstract.get(device_uuid, {}) + endpoint_id = device_endpoint_to_abstract.pop(endpoint_uuid, None) + if endpoint_id is not None: self.__link.link_endpoint_ids.remove(endpoint_id) + + def update_endpoints(self, link_endpoint_uuids : List[Tuple[str, str]] = []) -> bool: + updated = False + + # for each endpoint in abstract link that is not in link; remove from abstract link + device_endpoint_to_abstract = copy.deepcopy(self.__device_endpoint_to_abstract) + for device_uuid, endpoint_uuid in device_endpoint_to_abstract.keys(): + if (device_uuid, endpoint_uuid) in link_endpoint_uuids: continue + # remove endpoint_id that is not in link + self._remove_endpoint(device_uuid, endpoint_uuid) + updated = True + + # for each endpoint in link that is not in abstract link; add to abstract link + for device_uuid, endpoint_uuid in link_endpoint_uuids: + # if already added; just check endpoint type is not modified + if (device_uuid, endpoint_uuid) in self.__device_endpoint_to_abstract: continue + # otherwise, add it to the abstract device + self._add_endpoint(device_uuid, endpoint_uuid) + updated = True + + return updated diff --git a/src/interdomain/service/topology_abstractor/DltRecordSender.py b/src/interdomain/service/topology_abstractor/DltRecordSender.py new file mode 100644 index 0000000000000000000000000000000000000000..f7e3d81dded18c7406b54389cbe128c0fd27d7b4 --- /dev/null +++ b/src/interdomain/service/topology_abstractor/DltRecordSender.py @@ -0,0 +1,91 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +from typing import Dict, List, Tuple +from common.proto.context_pb2 import Device, Link, Service, Slice, TopologyId +from common.proto.dlt_connector_pb2 import DltDeviceId, DltLinkId, DltServiceId, DltSliceId +from context.client.ContextClient import ContextClient +from dlt.connector.client.DltConnectorClient import DltConnectorClient +from .Types import DltRecordTypes + +LOGGER = logging.getLogger(__name__) + +class DltRecordSender: + def __init__(self, context_client : ContextClient, dlt_connector_client : DltConnectorClient) -> None: + self.context_client = context_client + self.dlt_connector_client = dlt_connector_client + self.dlt_record_uuids : List[str] = list() + self.dlt_record_uuid_to_data : Dict[str, Tuple[TopologyId, DltRecordTypes]] = dict() + + def _add_record(self, record_uuid : str, data : Tuple[TopologyId, DltRecordTypes]) -> None: + if record_uuid in self.dlt_record_uuid_to_data: return + self.dlt_record_uuid_to_data[record_uuid] = data + self.dlt_record_uuids.append(record_uuid) + + def add_device(self, topology_id : TopologyId, device : Device) -> None: + topology_uuid = topology_id.topology_uuid.uuid + device_uuid = device.device_id.device_uuid.uuid + record_uuid = '{:s}:device:{:s}'.format(topology_uuid, device_uuid) + self._add_record(record_uuid, (topology_id, device)) + + def add_link(self, topology_id : TopologyId, link : Link) -> None: + topology_uuid = topology_id.topology_uuid.uuid + link_uuid = link.link_id.link_uuid.uuid + record_uuid = '{:s}:link:{:s}'.format(topology_uuid, link_uuid) + self._add_record(record_uuid, (topology_id, link)) + + def add_service(self, topology_id : TopologyId, service : Service) -> None: + topology_uuid = topology_id.topology_uuid.uuid + context_uuid = service.service_id.context_id.context_uuid.uuid + service_uuid = service.service_id.service_uuid.uuid + record_uuid = '{:s}:service:{:s}/{:s}'.format(topology_uuid, context_uuid, service_uuid) + self._add_record(record_uuid, (topology_id, service)) + + def add_slice(self, topology_id : TopologyId, slice_ : Slice) -> None: + topology_uuid = topology_id.topology_uuid.uuid + context_uuid = slice_.slice_id.context_id.context_uuid.uuid + slice_uuid = slice_.slice_id.slice_uuid.uuid + record_uuid = '{:s}:slice:{:s}/{:s}'.format(topology_uuid, context_uuid, slice_uuid) + self._add_record(record_uuid, (topology_id, slice_)) + + def commit(self) -> None: + for dlt_record_uuid in self.dlt_record_uuids: + topology_id,dlt_record = self.dlt_record_uuid_to_data[dlt_record_uuid] + if isinstance(dlt_record, Device): + device_id = self.context_client.SetDevice(dlt_record) + dlt_device_id = DltDeviceId() + dlt_device_id.topology_id.CopyFrom(topology_id) # pylint: disable=no-member + dlt_device_id.device_id.CopyFrom(device_id) # pylint: disable=no-member + self.dlt_connector_client.RecordDevice(dlt_device_id) + elif isinstance(dlt_record, Link): + link_id = self.context_client.SetLink(dlt_record) + dlt_link_id = DltLinkId() + dlt_link_id.topology_id.CopyFrom(topology_id) # pylint: disable=no-member + dlt_link_id.link_id.CopyFrom(link_id) # pylint: disable=no-member + self.dlt_connector_client.RecordLink(dlt_link_id) + elif isinstance(dlt_record, Service): + service_id = self.context_client.SetService(dlt_record) + dlt_service_id = DltServiceId() + dlt_service_id.topology_id.CopyFrom(topology_id) # pylint: disable=no-member + dlt_service_id.service_id.CopyFrom(service_id) # pylint: disable=no-member + self.dlt_connector_client.RecordService(dlt_service_id) + elif isinstance(dlt_record, Slice): + slice_id = self.context_client.SetSlice(dlt_record) + dlt_slice_id = DltSliceId() + dlt_slice_id.topology_id.CopyFrom(topology_id) # pylint: disable=no-member + dlt_slice_id.slice_id.CopyFrom(slice_id) # pylint: disable=no-member + self.dlt_connector_client.RecordSlice(dlt_slice_id) + else: + LOGGER.error('Unsupported Record({:s})'.format(str(dlt_record))) diff --git a/src/interdomain/service/topology_abstractor/OwnDomainFinder.py b/src/interdomain/service/topology_abstractor/OwnDomainFinder.py deleted file mode 100644 index e4e47bd6043637cea72ed8183434471d318dcb39..0000000000000000000000000000000000000000 --- a/src/interdomain/service/topology_abstractor/OwnDomainFinder.py +++ /dev/null @@ -1,56 +0,0 @@ -# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import copy, logging -from typing import Optional, Set -from common.Constants import DEFAULT_CONTEXT_UUID -from common.proto.context_pb2 import Empty -from context.client.ContextClient import ContextClient - -LOGGER = logging.getLogger(__name__) - -class OwnDomainFinder: - def __new__(cls): - if not hasattr(cls, 'instance'): - cls.instance = super(OwnDomainFinder, cls).__new__(cls) - return cls.instance - - def __init__(self) -> None: - self.__context_client = ContextClient() - self.__own_domain_uuid : Optional[str] = None - self.__existing_context_uuids : Optional[Set[str]] = None - - def __update(self) -> None: - existing_context_ids = self.__context_client.ListContextIds(Empty()) - existing_context_uuids = {context_id.context_uuid.uuid for context_id in existing_context_ids.context_ids} - - # Detect local context name (will be used as abstracted device name); exclude DEFAULT_CONTEXT_UUID - existing_non_admin_context_uuids = copy.deepcopy(existing_context_uuids) - existing_non_admin_context_uuids.discard(DEFAULT_CONTEXT_UUID) - if len(existing_non_admin_context_uuids) != 1: - MSG = 'Unable to identify own domain name. Existing Contexts({:s})' - raise Exception(MSG.format(str(existing_context_uuids))) - - self.__own_domain_uuid = existing_non_admin_context_uuids.pop() - self.__existing_context_uuids = existing_context_uuids - - @property - def own_domain_uuid(self) -> Optional[str]: - if self.__own_domain_uuid is None: self.__update() - return self.__own_domain_uuid - - @property - def existing_context_uuids(self) -> Optional[Set[str]]: - if self.__existing_context_uuids is None: self.__update() - return self.__existing_context_uuids diff --git a/src/interdomain/service/topology_abstractor/Tools.py b/src/interdomain/service/topology_abstractor/Tools.py index 0f089c38ff8dc46b76b73eb734bf50d31414c215..7d7885b9929b1e54747b232c70f915ed1dab851f 100644 --- a/src/interdomain/service/topology_abstractor/Tools.py +++ b/src/interdomain/service/topology_abstractor/Tools.py @@ -12,16 +12,33 @@ # See the License for the specific language governing permissions and # limitations under the License. -import copy, logging -from typing import List, Set, Tuple -from common.Constants import DEFAULT_CONTEXT_UUID -from common.proto.context_pb2 import ContextId, Device, DeviceId, Empty, EndPointId, Topology, TopologyId -from common.tools.object_factory.Device import json_device_id +import logging +from typing import List, Set, Union +from common.DeviceTypes import DeviceTypeEnum +from common.proto.context_pb2 import Context, ContextId, Device, Empty, Link, Topology, TopologyId +from common.tools.object_factory.Context import json_context, json_context_id from common.tools.object_factory.Topology import json_topology, json_topology_id from context.client.ContextClient import ContextClient LOGGER = logging.getLogger(__name__) +def create_context( + context_client : ContextClient, context_uuid : str +) -> None: + existing_context_ids = context_client.ListContextIds(Empty()) + existing_context_uuids = {context_id.context_uuid.uuid for context_id in existing_context_ids.context_ids} + if context_uuid in existing_context_uuids: return + context_client.SetContext(Context(**json_context(context_uuid))) + +def create_topology( + context_client : ContextClient, context_uuid : str, topology_uuid : str +) -> None: + context_id = ContextId(**json_context_id(context_uuid)) + existing_topology_ids = context_client.ListTopologyIds(context_id) + existing_topology_uuids = {topology_id.topology_uuid.uuid for topology_id in existing_topology_ids.topology_ids} + if topology_uuid in existing_topology_uuids: return + context_client.SetTopology(Topology(**json_topology(topology_uuid, context_id=context_id))) + def create_missing_topologies( context_client : ContextClient, context_id : ContextId, topology_uuids : List[str] ) -> None: @@ -40,6 +57,11 @@ def get_existing_device_uuids(context_client : ContextClient) -> Set[str]: existing_device_uuids = {device_id.device_uuid.uuid for device_id in existing_device_ids.device_ids} return existing_device_uuids +def get_existing_link_uuids(context_client : ContextClient) -> Set[str]: + existing_link_ids = context_client.ListLinkIds(Empty()) + existing_link_uuids = {link_id.link_uuid.uuid for link_id in existing_link_ids.link_ids} + return existing_link_uuids + def add_device_to_topology( context_client : ContextClient, context_id : ContextId, topology_uuid : str, device_uuid : str ) -> bool: @@ -54,6 +76,20 @@ def add_device_to_topology( context_client.SetTopology(topology_rw) return True +def add_link_to_topology( + context_client : ContextClient, context_id : ContextId, topology_uuid : str, link_uuid : str +) -> bool: + topology_id = TopologyId(**json_topology_id(topology_uuid, context_id=context_id)) + topology_ro = context_client.GetTopology(topology_id) + link_uuids = {link_id.link_uuid.uuid for link_id in topology_ro.link_ids} + if link_uuid in link_uuids: return False # already existed + + topology_rw = Topology() + topology_rw.CopyFrom(topology_ro) + topology_rw.link_ids.add().link_uuid.uuid = link_uuid + context_client.SetTopology(topology_rw) + return True + def get_uuids_of_devices_in_topology( context_client : ContextClient, context_id : ContextId, topology_uuid : str ) -> List[str]: @@ -62,6 +98,14 @@ def get_uuids_of_devices_in_topology( device_uuids = [device_id.device_uuid.uuid for device_id in topology.device_ids] return device_uuids +def get_uuids_of_links_in_topology( + context_client : ContextClient, context_id : ContextId, topology_uuid : str +) -> List[str]: + topology_id = TopologyId(**json_topology_id(topology_uuid, context_id=context_id)) + topology = context_client.GetTopology(topology_id) + link_uuids = [link_id.link_uuid.uuid for link_id in topology.link_ids] + return link_uuids + def get_devices_in_topology( context_client : ContextClient, context_id : ContextId, topology_uuid : str ) -> List[Device]: @@ -75,3 +119,29 @@ def get_devices_in_topology( devices_in_topology.append(device) return devices_in_topology + +def get_links_in_topology( + context_client : ContextClient, context_id : ContextId, topology_uuid : str +) -> List[Link]: + link_uuids = get_uuids_of_links_in_topology(context_client, context_id, topology_uuid) + + all_links = context_client.ListLinks(Empty()) + links_in_topology = list() + for link in all_links.links: + link_uuid = link.link_id.link_uuid.uuid + if link_uuid not in link_uuids: continue + links_in_topology.append(link) + + return links_in_topology + +def device_type_is_datacenter(device_type : Union[str, DeviceTypeEnum]) -> bool: + return device_type in { + DeviceTypeEnum.DATACENTER, DeviceTypeEnum.DATACENTER.value, + DeviceTypeEnum.EMULATED_DATACENTER, DeviceTypeEnum.EMULATED_DATACENTER.value + } + +def device_type_is_network(device_type : Union[str, DeviceTypeEnum]) -> bool: + return device_type in {DeviceTypeEnum.NETWORK, DeviceTypeEnum.NETWORK.value} + +def endpoint_type_is_border(endpoint_type : str) -> bool: + return str(endpoint_type).endswith('/border') diff --git a/src/interdomain/service/topology_abstractor/TopologyAbstractor.py b/src/interdomain/service/topology_abstractor/TopologyAbstractor.py index 6dcee61c7bf4dbae6e717d6c91f688fbd6236fb5..94abbfa52454909a4292af6a2cfec37182667a1e 100644 --- a/src/interdomain/service/topology_abstractor/TopologyAbstractor.py +++ b/src/interdomain/service/topology_abstractor/TopologyAbstractor.py @@ -12,23 +12,33 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging, networkx, threading -from typing import List, Optional, Union -from common.Constants import DEFAULT_CONTEXT_UUID, DEFAULT_TOPOLOGY_UUID, DOMAINS_TOPOLOGY_UUID -from common.proto.context_pb2 import ContextEvent, ContextId, DeviceEvent, DeviceId, ServiceId, SliceId, TopologyEvent +import logging, threading +from typing import Dict, Optional, Tuple +from common.Constants import DEFAULT_CONTEXT_UUID, DEFAULT_TOPOLOGY_UUID, INTERDOMAIN_TOPOLOGY_UUID +from common.DeviceTypes import DeviceTypeEnum +from common.proto.context_pb2 import ( + ContextEvent, ContextId, Device, DeviceEvent, DeviceId, EndPoint, EndPointId, Link, LinkEvent, TopologyId, + TopologyEvent) from common.tools.grpc.Tools import grpc_message_to_json_string from common.tools.object_factory.Context import json_context_id from common.tools.object_factory.Device import json_device_id +from common.tools.object_factory.Topology import json_topology_id from context.client.ContextClient import ContextClient from context.client.EventsCollector import EventsCollector from dlt.connector.client.DltConnectorClient import DltConnectorClient -from interdomain.service.topology_abstractor.Tools import create_missing_topologies, get_uuids_of_devices_in_topology -from interdomain.service.topology_abstractor.Types import DltRecordIdTypes, EventTypes from .AbstractDevice import AbstractDevice -from .OwnDomainFinder import OwnDomainFinder +from .AbstractLink import AbstractLink +from .DltRecordSender import DltRecordSender +from .Tools import ( + create_context, create_missing_topologies, device_type_is_datacenter, device_type_is_network, + endpoint_type_is_border, get_devices_in_topology, get_links_in_topology, get_uuids_of_devices_in_topology) +from .Types import EventTypes LOGGER = logging.getLogger(__name__) +ADMIN_CONTEXT_ID = ContextId(**json_context_id(DEFAULT_CONTEXT_UUID)) +INTERDOMAIN_TOPOLOGY_ID = TopologyId(**json_topology_id(INTERDOMAIN_TOPOLOGY_UUID, context_id=ADMIN_CONTEXT_ID)) + class TopologyAbstractor(threading.Thread): def __init__(self) -> None: super().__init__(daemon=True) @@ -38,139 +48,238 @@ class TopologyAbstractor(threading.Thread): self.dlt_connector_client = DltConnectorClient() self.context_event_collector = EventsCollector(self.context_client) - self.own_context_id : Optional[ContextId] = None - self.own_domain_filder = OwnDomainFinder() - self.abstract_topology = networkx.Graph() - self.abstract_device = AbstractDevice() + self.real_to_abstract_device_uuid : Dict[str, str] = dict() + self.real_to_abstract_link_uuid : Dict[str, str] = dict() + + self.abstract_device_to_topology_id : Dict[str, TopologyId] = dict() + self.abstract_link_to_topology_id : Dict[str, TopologyId] = dict() + + self.abstract_devices : Dict[str, AbstractDevice] = dict() + self.abstract_links : Dict[Tuple[str,str], AbstractLink] = dict() def stop(self): self.terminate.set() def run(self) -> None: self.context_client.connect() + create_context(self.context_client, DEFAULT_CONTEXT_UUID) + topology_uuids = [DEFAULT_TOPOLOGY_UUID, INTERDOMAIN_TOPOLOGY_UUID] + create_missing_topologies(self.context_client, ADMIN_CONTEXT_ID, topology_uuids) + self.dlt_connector_client.connect() self.context_event_collector.start() while not self.terminate.is_set(): event = self.context_event_collector.get_event(timeout=0.1) if event is None: continue - if self.ignore_event(event): continue - # TODO: filter events resulting from abstraction computation - # TODO: filter events resulting from updating remote abstractions + #if self.ignore_event(event): continue LOGGER.info('Processing Event({:s})...'.format(grpc_message_to_json_string(event))) - dlt_records = self.update_abstraction(event) - self.send_dlt_records(dlt_records) + self.update_abstraction(event) self.context_event_collector.stop() self.context_client.close() self.dlt_connector_client.close() - def ignore_event(self, event : EventTypes) -> List[DltRecordIdTypes]: - if self.own_context_id is None: return False - own_context_uuid = self.own_context_id.context_uuid.uuid + #def ignore_event(self, event : EventTypes) -> List[DltRecordIdTypes]: + # # TODO: filter events resulting from abstraction computation + # # TODO: filter events resulting from updating remote abstractions + # if self.own_context_id is None: return False + # own_context_uuid = self.own_context_id.context_uuid.uuid + # + # if isinstance(event, ContextEvent): + # context_uuid = event.context_id.context_uuid.uuid + # return context_uuid == own_context_uuid + # elif isinstance(event, TopologyEvent): + # context_uuid = event.topology_id.context_id.context_uuid.uuid + # if context_uuid != own_context_uuid: return True + # topology_uuid = event.topology_id.topology_uuid.uuid + # if topology_uuid in {INTERDOMAIN_TOPOLOGY_UUID}: return True + # + # return False - if isinstance(event, ContextEvent): - context_uuid = event.context_id.context_uuid.uuid - return context_uuid == own_context_uuid - elif isinstance(event, TopologyEvent): - context_uuid = event.topology_id.context_id.context_uuid.uuid - if context_uuid != own_context_uuid: return True - topology_uuid = event.topology_id.topology_uuid.uuid - if topology_uuid in {DOMAINS_TOPOLOGY_UUID}: return True - - return False - - def send_dlt_records(self, dlt_records : Union[DltRecordIdTypes, List[DltRecordIdTypes]]) -> None: - for dlt_record_id in dlt_records: - if isinstance(dlt_record_id, DeviceId): - self.dlt_connector_client.RecordDevice(dlt_record_id) - elif isinstance(dlt_record_id, ServiceId): - self.dlt_connector_client.RecordService(dlt_record_id) - elif isinstance(dlt_record_id, SliceId): - self.dlt_connector_client.RecordSlice(dlt_record_id) - else: - LOGGER.error('Unsupported Record({:s})'.format(str(dlt_record_id))) + def _get_or_create_abstract_device( + self, device_uuid : str, device_type : DeviceTypeEnum, dlt_record_sender : DltRecordSender, + abstract_topology_id : TopologyId + ) -> AbstractDevice: + abstract_device = self.abstract_devices.get(device_uuid) + changed = False + if abstract_device is None: + abstract_device = AbstractDevice(device_uuid, device_type) + changed = abstract_device.initialize() + if changed: dlt_record_sender.add_device(abstract_topology_id, abstract_device.device) + self.abstract_devices[device_uuid] = abstract_device + self.abstract_device_to_topology_id[device_uuid] = abstract_topology_id + return abstract_device + + def _update_abstract_device( + self, device : Device, dlt_record_sender : DltRecordSender, abstract_topology_id : TopologyId, + abstract_device_uuid : Optional[str] = None + ) -> None: + device_uuid = device.device_id.device_uuid.uuid + if device_type_is_datacenter(device.device_type): + abstract_device_uuid = device_uuid + abstract_device = self._get_or_create_abstract_device( + device_uuid, DeviceTypeEnum.EMULATED_DATACENTER, dlt_record_sender, abstract_topology_id) + elif device_type_is_network(device.device_type): + LOGGER.warning('device_type is network; not implemented') + return + else: + abstract_device = self._get_or_create_abstract_device( + abstract_device_uuid, DeviceTypeEnum.NETWORK, dlt_record_sender, abstract_topology_id) + self.real_to_abstract_device_uuid[device_uuid] = abstract_device_uuid + changed = abstract_device.update_endpoints(device) + if changed: dlt_record_sender.add_device(abstract_topology_id, abstract_device.device) - def _initialize_context_and_topologies(self) -> None: - if self.own_context_id is not None: return + def _get_or_create_abstract_link( + self, link_uuid : str, dlt_record_sender : DltRecordSender, abstract_topology_id : TopologyId + ) -> AbstractLink: + abstract_link = self.abstract_links.get(link_uuid) + changed = False + if abstract_link is None: + abstract_link = AbstractLink(link_uuid) + changed = abstract_link.initialize() + if changed: dlt_record_sender.add_link(abstract_topology_id, abstract_link.link) + self.abstract_links[link_uuid] = abstract_link + self.abstract_link_to_topology_id[link_uuid] = abstract_topology_id + return abstract_link - own_domain_uuid = self.own_domain_filder.own_domain_uuid - if own_domain_uuid is None: return + def _get_link_endpoint_data(self, endpoint_id : EndPointId) -> Optional[Tuple[AbstractDevice, EndPoint]]: + device_uuid : str = endpoint_id.device_id.device_uuid.uuid + endpoint_uuid : str = endpoint_id.endpoint_uuid.uuid + abstract_device_uuid = self.real_to_abstract_device_uuid.get(device_uuid) + if abstract_device_uuid is None: return None + abstract_device = self.abstract_devices.get(abstract_device_uuid) + if abstract_device is None: return None + endpoint = abstract_device.get_endpoint(device_uuid, endpoint_uuid) + if endpoint is None: return None + return abstract_device, endpoint - # Find own domain UUID and own ContextId - self.own_context_id = ContextId(**json_context_id(own_domain_uuid)) + def _compute_abstract_link(self, link : Link) -> Optional[str]: + if len(link.link_endpoint_ids) != 2: return None - # If "admin" context does not exist, create it; should exist - #if DEFAULT_CONTEXT_UUID not in existing_context_uuids: - # self.context_client.SetContext(Context(**json_context(DEFAULT_CONTEXT_UUID))) - #self.__admin_context_id = ContextId(**json_context_id(DEFAULT_CONTEXT_UUID)) + link_endpoint_data_A = self._get_link_endpoint_data(link.link_endpoint_ids[0]) + if link_endpoint_data_A is None: return None + abstract_device_A, endpoint_A = link_endpoint_data_A + if not endpoint_type_is_border(endpoint_A.endpoint_type): return None - # Create topologies "admin", "domains", and "aggregated" within own context - topology_uuids = [DEFAULT_TOPOLOGY_UUID, DOMAINS_TOPOLOGY_UUID] - create_missing_topologies(self.context_client, self.own_context_id, topology_uuids) + link_endpoint_data_Z = self._get_link_endpoint_data(link.link_endpoint_ids[-1]) + if link_endpoint_data_Z is None: return None + abstract_device_Z, endpoint_Z = link_endpoint_data_Z + if not endpoint_type_is_border(endpoint_Z.endpoint_type): return None - def update_abstraction(self, event : EventTypes) -> List[DltRecordIdTypes]: - dlt_record_ids_with_changes = [] - changed = False + link_uuid = AbstractLink.compose_uuid( + abstract_device_A.uuid, endpoint_A.endpoint_id.endpoint_uuid.uuid, + abstract_device_Z.uuid, endpoint_Z.endpoint_id.endpoint_uuid.uuid + ) - # TODO: identify changes from event and update endpoints accordingly - if event is None: - # just initializing, do nothing - pass + # sort endpoints lexicographically to prevent duplicities + link_endpoint_uuids = sorted([ + (abstract_device_A.uuid, endpoint_A.endpoint_id.endpoint_uuid.uuid), + (abstract_device_Z.uuid, endpoint_Z.endpoint_id.endpoint_uuid.uuid) + ]) - elif isinstance(event, ContextEvent): - context_id = event.context_id - context_uuid = context_id.context_uuid.uuid - if (context_uuid != DEFAULT_CONTEXT_UUID) and (self.own_context_id is None): - self._initialize_context_and_topologies() - - own_domain_uuid = self.own_domain_filder.own_domain_uuid - - if self.abstract_topology.has_node(own_domain_uuid): - abstract_device = self.abstract_topology.nodes[own_domain_uuid]['obj'] - else: - abstract_device = AbstractDevice() - self.abstract_topology.add_node(own_domain_uuid, obj=abstract_device) - - # if already initialized, does nothing and returns False - # if own context UUID cannot be identified, does nothing and returns None - # if own context UUID be identified, initialized the abstract device and returns True - _changed = abstract_device.initialize() - if _changed is None: return dlt_record_ids_with_changes - changed = changed or _changed - else: - LOGGER.warning('Ignoring Event({:s})'.format(grpc_message_to_json_string(event))) + return link_uuid, link_endpoint_uuids + + def _update_abstract_link( + self, link : Link, dlt_record_sender : DltRecordSender, abstract_topology_id : TopologyId + ) -> None: + abstract_link_specs = self._compute_abstract_link(link) + if abstract_link_specs is None: return + abstract_link_uuid, link_endpoint_uuids = abstract_link_specs + + abstract_link = self._get_or_create_abstract_link(abstract_link_uuid, dlt_record_sender, abstract_topology_id) + link_uuid = link.link_id.link_uuid.uuid + self.real_to_abstract_link_uuid[link_uuid] = abstract_link_uuid + changed = abstract_link.update_endpoints(link_endpoint_uuids) + if changed: dlt_record_sender.add_link(abstract_topology_id, abstract_link.link) + + def _infer_abstract_links(self, device : Device, dlt_record_sender : DltRecordSender) -> None: + device_uuid = device.device_id.device_uuid.uuid + + interdomain_device_uuids = get_uuids_of_devices_in_topology( + self.context_client, ADMIN_CONTEXT_ID, INTERDOMAIN_TOPOLOGY_UUID) + + for endpoint in device.device_endpoints: + if not endpoint_type_is_border(endpoint.endpoint_type): continue + endpoint_uuid = endpoint.endpoint_id.endpoint_uuid.uuid + + abstract_link_uuid = AbstractLink.compose_uuid(device_uuid, endpoint_uuid, endpoint_uuid, device_uuid) + if abstract_link_uuid in self.abstract_links: continue + + if endpoint_uuid not in interdomain_device_uuids: continue + remote_device = self.context_client.GetDevice(DeviceId(**json_device_id(endpoint_uuid))) + remote_device_border_endpoint_uuids = { + endpoint.endpoint_id.endpoint_uuid.uuid : endpoint.endpoint_type + for endpoint in remote_device.device_endpoints + if endpoint_type_is_border(endpoint.endpoint_type) + } + if device_uuid not in remote_device_border_endpoint_uuids: continue + + link_endpoint_uuids = sorted([(device_uuid, endpoint_uuid), (endpoint_uuid, device_uuid)]) + + abstract_link = self._get_or_create_abstract_link( + abstract_link_uuid, dlt_record_sender, INTERDOMAIN_TOPOLOGY_ID) + changed = abstract_link.update_endpoints(link_endpoint_uuids) + if changed: dlt_record_sender.add_link(INTERDOMAIN_TOPOLOGY_ID, abstract_link.link) + + def update_abstraction(self, event : EventTypes) -> None: + dlt_record_sender = DltRecordSender(self.context_client, self.dlt_connector_client) + + if isinstance(event, ContextEvent): + LOGGER.warning('Ignoring Event({:s})'.format(grpc_message_to_json_string(event))) elif isinstance(event, TopologyEvent): topology_id = event.topology_id topology_uuid = topology_id.topology_uuid.uuid context_id = topology_id.context_id context_uuid = context_id.context_uuid.uuid - if (context_uuid == self.own_domain_filder.own_domain_uuid) and (topology_uuid == DEFAULT_TOPOLOGY_UUID): - topology = self.context_client.GetTopology(event.topology_id) - for device_id in topology.device_ids: - device = self.context_client.GetDevice(device_id) - _changed = self.abstract_device.update_abstract_device_endpoints(device) - changed = changed or _changed + topology_uuids = {DEFAULT_TOPOLOGY_UUID, INTERDOMAIN_TOPOLOGY_UUID} + if (context_uuid == DEFAULT_CONTEXT_UUID) and (topology_uuid not in topology_uuids): + abstract_topology_id = TopologyId(**json_topology_id(topology_uuid, context_id=ADMIN_CONTEXT_ID)) + self._get_or_create_abstract_device( + topology_uuid, DeviceTypeEnum.NETWORK, dlt_record_sender, abstract_topology_id) + + devices = get_devices_in_topology(self.context_client, context_id, topology_uuid) + for device in devices: + self._update_abstract_device( + device, dlt_record_sender, abstract_topology_id, abstract_device_uuid=topology_uuid) + + links = get_links_in_topology(self.context_client, context_id, topology_uuid) + for link in links: + self._update_abstract_link(link, dlt_record_sender, abstract_topology_id) + + for device in devices: + self._infer_abstract_links(device, dlt_record_sender) + else: LOGGER.warning('Ignoring Event({:s})'.format(grpc_message_to_json_string(event))) elif isinstance(event, DeviceEvent): - admin_topology_device_uuids = get_uuids_of_devices_in_topology( - self.context_client, self.own_context_id, DEFAULT_TOPOLOGY_UUID) - device_uuid = event.device_id.device_uuid.uuid - if device_uuid in admin_topology_device_uuids: - device = self.context_client.GetDevice(event.device_id) - _changed = self.abstract_device.update_abstract_device_endpoints(device) - changed = changed or _changed + device_id = event.device_id + device_uuid = device_id.device_uuid.uuid + abstract_device_uuid = self.real_to_abstract_device_uuid.get(device_uuid) + device = self.context_client.GetDevice(device_id) + if abstract_device_uuid is None: + LOGGER.warning('Ignoring Event({:s})'.format(grpc_message_to_json_string(event))) else: + abstract_topology_id = self.abstract_device_to_topology_id[abstract_device_uuid] + self._update_abstract_device( + device, dlt_record_sender, abstract_topology_id, abstract_device_uuid=abstract_device_uuid) + + self._infer_abstract_links(device, dlt_record_sender) + + elif isinstance(event, LinkEvent): + link_id = event.link_id + link_uuid = link_id.link_uuid.uuid + abstract_link_uuid = self.real_to_abstract_link_uuid.get(link_uuid) + if abstract_link_uuid is None: LOGGER.warning('Ignoring Event({:s})'.format(grpc_message_to_json_string(event))) + else: + abstract_topology_id = self.abstract_link_to_topology_id[abstract_link_uuid] + link = self.context_client.GetLink(link_id) + self._update_abstract_link(link, dlt_record_sender, abstract_topology_id) else: LOGGER.warning('Unsupported Event({:s})'.format(grpc_message_to_json_string(event))) - if changed: - self.context_client.SetDevice(self.abstract_device.own_abstract_device) - dlt_record_ids_with_changes.append(self.abstract_device.own_abstract_device_id) - - return dlt_record_ids_with_changes + dlt_record_sender.commit() diff --git a/src/interdomain/service/topology_abstractor/Types.py b/src/interdomain/service/topology_abstractor/Types.py index 339ff0dba62b1a2e4a057ae3cff41a3db376d9d4..f6a0fa7a1d7a564045b6e850c2b46cf313da52b7 100644 --- a/src/interdomain/service/topology_abstractor/Types.py +++ b/src/interdomain/service/topology_abstractor/Types.py @@ -14,10 +14,12 @@ from typing import Union from common.proto.context_pb2 import ( - ConnectionEvent, ContextEvent, DeviceEvent, DeviceId, LinkEvent, ServiceEvent, ServiceId, SliceEvent, SliceId, - TopologyEvent) + ConnectionEvent, ContextEvent, Device, DeviceEvent, DeviceId, Link, LinkEvent, LinkId, Service, ServiceEvent, + ServiceId, Slice, SliceEvent, SliceId, TopologyEvent) + +DltRecordIdTypes = Union[DeviceId, LinkId, SliceId, ServiceId] +DltRecordTypes = Union[Device, Link, Slice, Service] -DltRecordIdTypes = Union[DeviceId, SliceId, ServiceId] EventTypes = Union[ ContextEvent, TopologyEvent, DeviceEvent, LinkEvent, ServiceEvent, SliceEvent, ConnectionEvent ]