diff --git a/src/interdomain/service/topology_abstractor/AbstractDevice.py b/src/interdomain/service/topology_abstractor/AbstractDevice.py index 47832acc02c78b5cfc095fb3ecceccfb6b9a774f..3341dec323311e66fc95917fb21a9eaab87991e8 100644 --- a/src/interdomain/service/topology_abstractor/AbstractDevice.py +++ b/src/interdomain/service/topology_abstractor/AbstractDevice.py @@ -16,78 +16,81 @@ import copy, logging from typing import Dict, Optional from common.Constants import DEFAULT_CONTEXT_NAME, INTERDOMAIN_TOPOLOGY_NAME from common.DeviceTypes import DeviceTypeEnum -from common.proto.context_pb2 import ( - ContextId, Device, DeviceDriverEnum, DeviceId, DeviceOperationalStatusEnum, EndPoint) +from common.proto.context_pb2 import Device, DeviceDriverEnum, DeviceOperationalStatusEnum, EndPoint from common.tools.context_queries.CheckType import ( device_type_is_datacenter, device_type_is_network, endpoint_type_is_border) -from common.tools.context_queries.Device import add_device_to_topology, get_existing_device_uuids -from common.tools.object_factory.Context import json_context_id -from common.tools.object_factory.Device import json_device, json_device_id +from common.tools.context_queries.Device import get_device +from common.tools.grpc.Tools import grpc_message_to_json +from common.tools.object_factory.Device import json_device from context.client.ContextClient import ContextClient -from context.service.database.uuids.EndPoint import endpoint_get_uuid +from .Tools import replace_device_uuids_by_names + +# Remark on UUIDs: +# TopologyAbstractor, AbstractDevice and AbstractLink are used +# to compose network reporesentations to be forwarded to remote +# instances. Constraining it to use UUIDs is pointless given +# these UUIDs, to be unique, need to be bound to the local +# context/topology UUIDs, which might be different than that for +# the remote TeraFlowSDN instances. For this very reason, we use +# the device/link/endpoint/topology/context names as UUIDs, to +# prevent UUID-related issues. LOGGER = logging.getLogger(__name__) class AbstractDevice: - def __init__(self, device_uuid : str, device_name : str, device_type : DeviceTypeEnum): + def __init__(self, device_name : str, device_type : DeviceTypeEnum): self.__context_client = ContextClient() - self.__device_uuid : str = device_uuid self.__device_name : str = device_name self.__device_type : DeviceTypeEnum = device_type self.__device : Optional[Device] = None - self.__device_id : Optional[DeviceId] = None - # Dict[device_uuid, Dict[endpoint_uuid, abstract EndPoint]] + # Dict[device_uuid, Dict[endpoint_name, abstract EndPoint]] self.__device_endpoint_to_abstract : Dict[str, Dict[str, EndPoint]] = dict() - # Dict[endpoint_uuid, device_uuid] + # Dict[endpoint_name, device_name] self.__abstract_endpoint_to_device : Dict[str, str] = dict() def to_json(self) -> Dict: return { - 'device_uuid' : self.__device_uuid, 'device_name' : self.__device_name, 'device_type' : self.__device_type, - 'device' : self.__device, - 'device_id' : self.__device_id, + 'device' : grpc_message_to_json(self.__device), 'device_endpoint_to_abstract' : self.__device_endpoint_to_abstract, 'abstract_endpoint_to_device' : self.__abstract_endpoint_to_device, } - @property - def uuid(self) -> str: return self.__device_uuid - @property def name(self) -> str: return self.__device_name - @property - def device_id(self) -> Optional[DeviceId]: return self.__device_id - @property def device(self) -> Optional[Device]: return self.__device - def get_endpoint(self, device_uuid : str, endpoint_uuid : str) -> Optional[EndPoint]: - return self.__device_endpoint_to_abstract.get(device_uuid, {}).get(endpoint_uuid) + def get_endpoint(self, device_name : str, endpoint_name : str) -> Optional[EndPoint]: + return self.__device_endpoint_to_abstract.get(device_name, {}).get(endpoint_name) def initialize(self) -> bool: if self.__device is not None: return False - existing_device_uuids = get_existing_device_uuids(self.__context_client) - create_abstract_device = self.__device_uuid not in existing_device_uuids + local_interdomain_device = get_device( + self.__context_client, self.__device_name, rw_copy=True, include_endpoints=True, + include_config_rules=False, include_components=False) + create_abstract_device = local_interdomain_device is None if create_abstract_device: self._create_empty() else: - self._load_existing() - - is_datacenter = device_type_is_datacenter(self.__device_type) - is_network = device_type_is_network(self.__device_type) - if is_datacenter or is_network: - # Add abstract device to topologies [INTERDOMAIN_TOPOLOGY_NAME] - context_id = ContextId(**json_context_id(DEFAULT_CONTEXT_NAME)) - topology_uuids = [INTERDOMAIN_TOPOLOGY_NAME] - for topology_uuid in topology_uuids: - add_device_to_topology(self.__context_client, context_id, topology_uuid, self.__device_uuid) + self._load_existing(local_interdomain_device) + + #is_datacenter = device_type_is_datacenter(self.__device_type) + #is_network = device_type_is_network(self.__device_type) + #if is_datacenter or is_network: + # # Add abstract device to topologies [INTERDOMAIN_TOPOLOGY_NAME] + # admin_context_id = ContextId(**json_context_id(DEFAULT_CONTEXT_NAME)) + # topology_uuids = [INTERDOMAIN_TOPOLOGY_NAME] + # for topology_uuid in topology_uuids: + # # This action is done automatically; commented out by now. + # add_device_to_topology( + # self.__context_client, admin_context_id, topology_uuid, self.__device_name) # seems not needed; to be removed in future releases #if is_datacenter and create_abstract_device: @@ -101,27 +104,25 @@ class AbstractDevice: # if device_type_is_datacenter(device.device_type): continue # self.update_endpoints(device) - return True + return create_abstract_device def _create_empty(self) -> None: - device_uuid = self.__device_uuid - + device_name = self.__device_name device = Device(**json_device( - device_uuid, self.__device_type.value, DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_ENABLED, - name=self.__device_name, endpoints=[], config_rules=[], drivers=[DeviceDriverEnum.DEVICEDRIVER_UNDEFINED] + device_name, self.__device_type.value, DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_ENABLED, + name=device_name, endpoints=[], config_rules=[], drivers=[DeviceDriverEnum.DEVICEDRIVER_UNDEFINED] )) self.__context_client.SetDevice(device) - self.__device = device - self.__device_id = self.__device.device_id + self.__device = device # Store copy with names as UUIDs - def _load_existing(self) -> None: + def _load_existing(self, local_interdomain_device : Device) -> None: self.__device_endpoint_to_abstract = dict() self.__abstract_endpoint_to_device = dict() - self.__device_id = DeviceId(**json_device_id(self.__device_uuid)) - self.__device = self.__context_client.GetDevice(self.__device_id) + replace_device_uuids_by_names(self.__context_client, local_interdomain_device) + + self.__device = local_interdomain_device self.__device_type = self.__device.device_type - device_uuid = self.__device_id.device_uuid.uuid device_type = self.__device_type is_datacenter = device_type_is_datacenter(device_type) @@ -132,67 +133,63 @@ class AbstractDevice: # for each endpoint in abstract device, populate internal data structures and mappings for interdomain_endpoint in self.__device.device_endpoints: - endpoint_uuid : str = interdomain_endpoint.endpoint_id.endpoint_uuid.uuid + endpoint_name : str = interdomain_endpoint.name if is_network: - endpoint_uuid,device_uuid = endpoint_uuid.split('@', maxsplit=1) + endpoint_name,device_name = endpoint_name.split('@', maxsplit=1) + else: + device_name = self.__device_name self.__device_endpoint_to_abstract\ - .setdefault(device_uuid, {}).setdefault(endpoint_uuid, interdomain_endpoint) + .setdefault(device_name, {}).setdefault(endpoint_name, interdomain_endpoint) self.__abstract_endpoint_to_device\ - .setdefault(endpoint_uuid, device_uuid) - - def _update_endpoint_name(self, device_uuid : str, endpoint_uuid : str, endpoint_name : str) -> bool: - device_endpoint_to_abstract = self.__device_endpoint_to_abstract.get(device_uuid, {}) - interdomain_endpoint = device_endpoint_to_abstract.get(endpoint_uuid) - interdomain_endpoint_name = interdomain_endpoint.name - if endpoint_name == interdomain_endpoint_name: return False - interdomain_endpoint.name = endpoint_name - return True - - def _update_endpoint_type(self, device_uuid : str, endpoint_uuid : str, endpoint_type : str) -> bool: - device_endpoint_to_abstract = self.__device_endpoint_to_abstract.get(device_uuid, {}) - interdomain_endpoint = device_endpoint_to_abstract.get(endpoint_uuid) + .setdefault(endpoint_name, device_name) + + # This method becomes useless; the endpoint name is considered a primary key; cannot be updated + #def _update_endpoint_name(self, device_uuid : str, endpoint_uuid : str, endpoint_name : str) -> bool: + # device_endpoint_to_abstract = self.__device_endpoint_to_abstract.get(device_uuid, {}) + # interdomain_endpoint = device_endpoint_to_abstract.get(endpoint_uuid) + # interdomain_endpoint_name = interdomain_endpoint.name + # if endpoint_name == interdomain_endpoint_name: return False + # interdomain_endpoint.name = endpoint_name + # return True + + def _update_endpoint_type(self, device_name : str, endpoint_name : str, endpoint_type : str) -> bool: + device_endpoint_to_abstract = self.__device_endpoint_to_abstract.get(device_name, {}) + interdomain_endpoint = device_endpoint_to_abstract.get(endpoint_name) interdomain_endpoint_type = interdomain_endpoint.endpoint_type if endpoint_type == interdomain_endpoint_type: return False interdomain_endpoint.endpoint_type = endpoint_type return True - def _add_endpoint( - self, device_uuid : str, endpoint_uuid : str, endpoint_name : str, endpoint_type : str - ) -> EndPoint: + def _add_endpoint(self, device_name : str, endpoint_name : str, endpoint_type : str) -> EndPoint: interdomain_endpoint = self.__device.device_endpoints.add() interdomain_endpoint.endpoint_id.topology_id.topology_uuid.uuid = INTERDOMAIN_TOPOLOGY_NAME interdomain_endpoint.endpoint_id.topology_id.context_id.context_uuid.uuid = DEFAULT_CONTEXT_NAME - interdomain_endpoint.endpoint_id.device_id.CopyFrom(self.__device_id) + interdomain_endpoint.endpoint_id.device_id.device_uuid.uuid = self.__device.name interdomain_endpoint.endpoint_id.endpoint_uuid.uuid = endpoint_name interdomain_endpoint.name = endpoint_name interdomain_endpoint.endpoint_type = endpoint_type - uuids = endpoint_get_uuid(interdomain_endpoint.endpoint_id, endpoint_name=endpoint_name, allow_random=False) - _, _, interdomain_endpoint_uuid = uuids - self.__device_endpoint_to_abstract\ - .setdefault(device_uuid, {}).setdefault(endpoint_uuid, interdomain_endpoint) + .setdefault(device_name, {}).setdefault(endpoint_name, interdomain_endpoint) self.__abstract_endpoint_to_device\ - .setdefault(interdomain_endpoint_uuid, device_uuid) + .setdefault(endpoint_name, device_name) return interdomain_endpoint - def _remove_endpoint( - self, device_uuid : str, endpoint_uuid : str, interdomain_endpoint : EndPoint - ) -> None: - self.__abstract_endpoint_to_device.pop(endpoint_uuid, None) - device_endpoint_to_abstract = self.__device_endpoint_to_abstract.get(device_uuid, {}) - device_endpoint_to_abstract.pop(endpoint_uuid, None) + def _remove_endpoint(self, device_name : str, endpoint_name : str, interdomain_endpoint : EndPoint) -> None: + self.__abstract_endpoint_to_device.pop(endpoint_name, None) + device_endpoint_to_abstract = self.__device_endpoint_to_abstract.get(device_name, {}) + device_endpoint_to_abstract.pop(endpoint_name, None) self.__device.device_endpoints.remove(interdomain_endpoint) def update_endpoints(self, device : Device) -> bool: if device_type_is_datacenter(self.__device.device_type): return False - device_uuid = device.device_id.device_uuid.uuid - device_border_endpoint_uuids = { - endpoint.endpoint_id.endpoint_uuid.uuid : (endpoint.name, endpoint.endpoint_type) + device_name = device.name + device_border_endpoint_names = { + endpoint.name : endpoint.endpoint_type for endpoint in device.device_endpoints if endpoint_type_is_border(endpoint.endpoint_type) } @@ -200,24 +197,27 @@ class AbstractDevice: updated = False # for each border endpoint in abstract device that is not in device; remove from abstract device - device_endpoint_to_abstract = self.__device_endpoint_to_abstract.get(device_uuid, {}) + device_endpoint_to_abstract = self.__device_endpoint_to_abstract.get(device_name, {}) _device_endpoint_to_abstract = copy.deepcopy(device_endpoint_to_abstract) - for endpoint_uuid, interdomain_endpoint in _device_endpoint_to_abstract.items(): - if endpoint_uuid in device_border_endpoint_uuids: continue + for endpoint_name, interdomain_endpoint in _device_endpoint_to_abstract.items(): + if endpoint_name in device_border_endpoint_names: continue # remove interdomain endpoint that is not in device - self._remove_endpoint(device_uuid, endpoint_uuid, interdomain_endpoint) + self._remove_endpoint(device_name, endpoint_name, interdomain_endpoint) updated = True # for each border endpoint in device that is not in abstract device; add to abstract device - for endpoint_uuid,(endpoint_name, endpoint_type) in device_border_endpoint_uuids.items(): - # if already added; just check endpoint name and type are not modified - if endpoint_uuid in self.__abstract_endpoint_to_device: - updated = updated or self._update_endpoint_name(device_uuid, endpoint_uuid, endpoint_name) - updated = updated or self._update_endpoint_type(device_uuid, endpoint_uuid, endpoint_type) + for endpoint_name,endpoint_type in device_border_endpoint_names.items(): + abstract_endpoint = self.__device_endpoint_to_abstract.get(device_name, {}).get(endpoint_name) + abstract_endpoint_name = None if abstract_endpoint is None else abstract_endpoint.name + + # if already added; just check endpoint type is not modified + if abstract_endpoint_name in self.__abstract_endpoint_to_device: + #updated = updated or self._update_endpoint_name(device_name, endpoint_name, endpoint_name) + updated = updated or self._update_endpoint_type(device_name, endpoint_name, endpoint_type) continue # otherwise, add it to the abstract device - self._add_endpoint(device_uuid, endpoint_uuid, endpoint_name, endpoint_type) + self._add_endpoint(device_name, endpoint_name, endpoint_type) updated = True return updated diff --git a/src/interdomain/service/topology_abstractor/AbstractLink.py b/src/interdomain/service/topology_abstractor/AbstractLink.py index 76b2a0311b2213d35c1b5461e06c324f9304b934..baeb0f94dcba330ea4d815ba811bd6addad4e5d4 100644 --- a/src/interdomain/service/topology_abstractor/AbstractLink.py +++ b/src/interdomain/service/topology_abstractor/AbstractLink.py @@ -15,122 +15,129 @@ import copy, logging from typing import Dict, List, Optional, Tuple from common.Constants import DEFAULT_CONTEXT_NAME, INTERDOMAIN_TOPOLOGY_NAME -from common.proto.context_pb2 import ContextId, EndPointId, Link, LinkId -from common.tools.context_queries.Link import add_link_to_topology, get_existing_link_uuids -from common.tools.object_factory.Context import json_context_id -from common.tools.object_factory.Link import json_link, json_link_id +from common.proto.context_pb2 import EndPointId, Link +from common.tools.context_queries.Link import get_link +from common.tools.object_factory.Link import json_link from context.client.ContextClient import ContextClient +from .Tools import replace_link_uuids_by_names + +# Remark on UUIDs: +# TopologyAbstractor, AbstractDevice and AbstractLink are used +# to compose network reporesentations to be forwarded to remote +# instances. Constraining it to use UUIDs is pointless given +# these UUIDs, to be unique, need to be bound to the local +# context/topology UUIDs, which might be different than that for +# the remote TeraFlowSDN instances. For this very reason, we use +# the device/link/endpoint/topology/context names as UUIDs, to +# prevent UUID-related issues. LOGGER = logging.getLogger(__name__) class AbstractLink: - def __init__(self, link_uuid : str): + def __init__(self, link_name : str): self.__context_client = ContextClient() - self.__link_uuid : str = link_uuid + self.__link_name : str = link_name self.__link : Optional[Link] = None - self.__link_id : Optional[LinkId] = None - # Dict[(device_uuid, endpoint_uuid), abstract EndPointId] + # Dict[(device_name, endpoint_name), abstract EndPointId] self.__device_endpoint_to_abstract : Dict[Tuple[str, str], EndPointId] = dict() def to_json(self) -> Dict: return { - 'link_uuid' : self.__link_uuid, + 'link_name' : self.__link_name, 'link' : self.__link, - 'link_id' : self.__link_id, 'device_endpoint_to_abstract' : self.__device_endpoint_to_abstract, } @property - def uuid(self) -> str: return self.__link_uuid - - @property - def link_id(self) -> Optional[LinkId]: return self.__link_id + def name(self) -> str: return self.__link_name @property def link(self) -> Optional[Link]: return self.__link @staticmethod - def compose_uuid( - device_uuid_a : str, endpoint_uuid_a : str, device_uuid_z : str, endpoint_uuid_z : str + def compose_name( + device_name_a : str, endpoint_name_a : str, device_name_z : str, endpoint_name_z : str ) -> str: # sort endpoints lexicographically to prevent duplicities - link_endpoint_uuids = sorted([ - (device_uuid_a, endpoint_uuid_a), - (device_uuid_z, endpoint_uuid_z) + link_endpoint_names = sorted([ + (device_name_a, endpoint_name_a), + (device_name_z, endpoint_name_z) ]) - link_uuid = '{:s}/{:s}=={:s}/{:s}'.format( - link_endpoint_uuids[0][0], link_endpoint_uuids[0][1], - link_endpoint_uuids[1][0], link_endpoint_uuids[1][1]) - return link_uuid + link_name = '{:s}/{:s}=={:s}/{:s}'.format( + link_endpoint_names[0][0], link_endpoint_names[0][1], + link_endpoint_names[1][0], link_endpoint_names[1][1]) + return link_name def initialize(self) -> bool: if self.__link is not None: return False - existing_link_uuids = get_existing_link_uuids(self.__context_client) + local_interdomain_link = get_link(self.__context_client, self.__link_name, rw_copy=False) + create_abstract_link = local_interdomain_link is None - create = self.__link_uuid not in existing_link_uuids - if create: + if create_abstract_link: self._create_empty() else: - self._load_existing() + self._load_existing(local_interdomain_link) - # Add abstract link to topologies [INTERDOMAIN_TOPOLOGY_NAME] - context_id = ContextId(**json_context_id(DEFAULT_CONTEXT_NAME)) - topology_uuids = [INTERDOMAIN_TOPOLOGY_NAME] - for topology_uuid in topology_uuids: - add_link_to_topology(self.__context_client, context_id, topology_uuid, self.__link_uuid) + ## Add abstract link to topologies [INTERDOMAIN_TOPOLOGY_NAME] + #admin_context_id = ContextId(**json_context_id(DEFAULT_CONTEXT_NAME)) + #topology_uuids = [INTERDOMAIN_TOPOLOGY_NAME] + #for topology_uuid in topology_uuids: + # # This action is done automatically; commented out by now. + # add_link_to_topology( + # self.__context_client, admin_context_id, topology_uuid, self.__link_name) - return create + return create_abstract_link def _create_empty(self) -> None: - link = Link(**json_link(self.__link_uuid, endpoint_ids=[])) + link_name = self.__link_name + link = Link(**json_link(link_name, name=link_name, endpoint_ids=[])) self.__context_client.SetLink(link) - self.__link = link - self.__link_id = self.__link.link_id - - def _load_existing(self) -> None: - self.__link_id = LinkId(**json_link_id(self.__link_uuid)) - self.__link = self.__context_client.GetLink(self.__link_id) + self.__link = link # Store copy with names as UUIDs + def _load_existing(self, local_interdomain_link : Link) -> None: self.__device_endpoint_to_abstract = dict() + self.__link = local_interdomain_link + replace_link_uuids_by_names(self.__context_client, local_interdomain_link) + # for each endpoint in abstract link, populate internal data structures and mappings for endpoint_id in self.__link.link_endpoint_ids: - device_uuid : str = endpoint_id.device_id.device_uuid.uuid - endpoint_uuid : str = endpoint_id.endpoint_uuid.uuid - self.__device_endpoint_to_abstract.setdefault((device_uuid, endpoint_uuid), endpoint_id) + device_name = endpoint_id.device_id.device_uuid.uuid + endpoint_name = endpoint_id.endpoint_uuid.uuid + self.__device_endpoint_to_abstract.setdefault((device_name, endpoint_name), endpoint_id) - def _add_endpoint(self, device_uuid : str, endpoint_uuid : str) -> None: + def _add_endpoint(self, device_name : str, endpoint_name : str) -> None: endpoint_id = self.__link.link_endpoint_ids.add() endpoint_id.topology_id.topology_uuid.uuid = INTERDOMAIN_TOPOLOGY_NAME endpoint_id.topology_id.context_id.context_uuid.uuid = DEFAULT_CONTEXT_NAME - endpoint_id.device_id.device_uuid.uuid = device_uuid - endpoint_id.endpoint_uuid.uuid = endpoint_uuid - self.__device_endpoint_to_abstract.setdefault((device_uuid, endpoint_uuid), endpoint_id) + endpoint_id.device_id.device_uuid.uuid = device_name + endpoint_id.endpoint_uuid.uuid = endpoint_name + self.__device_endpoint_to_abstract.setdefault((device_name, endpoint_name), endpoint_id) - def _remove_endpoint(self, device_uuid : str, endpoint_uuid : str) -> None: - device_endpoint_to_abstract = self.__device_endpoint_to_abstract.get(device_uuid, {}) - endpoint_id = device_endpoint_to_abstract.pop(endpoint_uuid, None) + def _remove_endpoint(self, device_name : str, endpoint_name : str) -> None: + device_endpoint_to_abstract = self.__device_endpoint_to_abstract.get(device_name, {}) + endpoint_id = device_endpoint_to_abstract.pop(endpoint_name, None) if endpoint_id is not None: self.__link.link_endpoint_ids.remove(endpoint_id) - def update_endpoints(self, link_endpoint_uuids : List[Tuple[str, str]] = []) -> bool: + def update_endpoints(self, link_endpoint_names : List[Tuple[str, str]] = []) -> bool: updated = False # for each endpoint in abstract link that is not in link; remove from abstract link device_endpoint_to_abstract = copy.deepcopy(self.__device_endpoint_to_abstract) - for device_uuid, endpoint_uuid in device_endpoint_to_abstract.keys(): - if (device_uuid, endpoint_uuid) in link_endpoint_uuids: continue + for device_name, endpoint_name in device_endpoint_to_abstract.keys(): + if (device_name, endpoint_name) in link_endpoint_names: continue # remove endpoint_id that is not in link - self._remove_endpoint(device_uuid, endpoint_uuid) + self._remove_endpoint(device_name, endpoint_name) updated = True # for each endpoint in link that is not in abstract link; add to abstract link - for device_uuid, endpoint_uuid in link_endpoint_uuids: + for device_name, endpoint_name in link_endpoint_names: # if already added; just check endpoint type is not modified - if (device_uuid, endpoint_uuid) in self.__device_endpoint_to_abstract: continue + if (device_name, endpoint_name) in self.__device_endpoint_to_abstract: continue # otherwise, add it to the abstract device - self._add_endpoint(device_uuid, endpoint_uuid) + self._add_endpoint(device_name, endpoint_name) updated = True return updated diff --git a/src/interdomain/service/topology_abstractor/Tools.py b/src/interdomain/service/topology_abstractor/Tools.py new file mode 100644 index 0000000000000000000000000000000000000000..e3d640f3cc0150a46c5d122ed822268017806493 --- /dev/null +++ b/src/interdomain/service/topology_abstractor/Tools.py @@ -0,0 +1,86 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Dict, Tuple +from common.proto.context_pb2 import Device, Link +from common.tools.context_queries.Context import get_context +from common.tools.context_queries.EndPoint import get_endpoint_names +from common.tools.context_queries.Topology import get_topology +from context.client.ContextClient import ContextClient + +def get_context_name( + context_client : ContextClient, context_names_cache : Dict[str, str], context_uuid : str +) -> str: + if context_uuid not in context_names_cache: + context = get_context(context_client, context_uuid) + context_name = context.name + context_names_cache[context_uuid] = context_name + context_names_cache[context_name] = context_name + else: + context_name = context_names_cache[context_uuid] + return context_name + +def get_topology_name( + context_client : ContextClient, topology_names_cache : Dict[Tuple[str, str], str], + context_name : str, topology_uuid : str +) -> str: + topology_key = (context_name, topology_uuid) + if topology_key not in topology_names_cache: + topology = get_topology(context_client, topology_uuid, context_uuid=context_name) + topology_name = topology.name + topology_names_cache[(context_name, topology_uuid)] = topology_name + topology_names_cache[(context_name, topology_name)] = topology_name + else: + topology_name = topology_names_cache[topology_key] + return topology_name + +def replace_device_uuids_by_names( + context_client : ContextClient, device : Device, context_names_cache : Dict[str, str] = dict(), + topology_names_cache : Dict[Tuple[str, str], str] = dict() +) -> None: + device_name = device.name + device.device_id.device_uuid.uuid = device_name + + for endpoint in device.device_endpoints: + endpoint.endpoint_id.endpoint_uuid.uuid = endpoint.name + endpoint.endpoint_id.device_id.device_uuid.uuid = device_name + + context_uuid = endpoint.endpoint_id.topology_id.context_id.context_uuid.uuid + context_name = get_context_name(context_client, context_names_cache, context_uuid) + endpoint.endpoint_id.topology_id.context_id.context_uuid.uuid = context_name + + topology_uuid = endpoint.endpoint_id.topology_id.topology_uuid.uuid + topology_name = get_topology_name(context_client, topology_names_cache, context_name, topology_uuid) + endpoint.endpoint_id.topology_id.topology_uuid.uuid = topology_name + +def replace_link_uuids_by_names( + context_client : ContextClient, link : Link, context_names_cache : Dict[str, str] = dict(), + topology_names_cache : Dict[Tuple[str, str], str] = dict() +) -> None: + link.link_id.link_uuid.uuid = link.name + + device_names, endpoints_data = get_endpoint_names(context_client, link.link_endpoint_ids) + + # for each endpoint in abstract link, populate internal data structures and mappings + for endpoint_id in link.link_endpoint_ids: + endpoint_id.device_id.device_uuid.uuid = device_names[endpoint_id.device_id.device_uuid.uuid] + endpoint_id.endpoint_uuid.uuid = endpoints_data[endpoint_id.endpoint_uuid.uuid][0] + + context_uuid = endpoint_id.topology_id.context_id.context_uuid.uuid + context_name = get_context_name(context_client, context_names_cache, context_uuid) + endpoint_id.topology_id.context_id.context_uuid.uuid = context_name + + topology_uuid = endpoint_id.topology_id.topology_uuid.uuid + topology_name = get_topology_name(context_client, topology_names_cache, context_name, topology_uuid) + endpoint_id.topology_id.topology_uuid.uuid = topology_name diff --git a/src/interdomain/service/topology_abstractor/TopologyAbstractor.py b/src/interdomain/service/topology_abstractor/TopologyAbstractor.py index 0d9faa0408fe77dceaf5652b144590f9beb4a88d..eddbaab440b7efeb59966b920cf7f674edb944c7 100644 --- a/src/interdomain/service/topology_abstractor/TopologyAbstractor.py +++ b/src/interdomain/service/topology_abstractor/TopologyAbstractor.py @@ -13,23 +13,21 @@ # limitations under the License. import logging, threading -from typing import Dict, Optional, Tuple +from typing import Dict, List, Optional, Tuple from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME, ServiceNameEnum from common.DeviceTypes import DeviceTypeEnum from common.Settings import ( ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, find_environment_variables, get_env_var_name) from common.proto.context_pb2 import ( - ContextEvent, ContextId, Device, DeviceEvent, DeviceId, EndPoint, EndPointId, Link, LinkEvent, TopologyId, - TopologyEvent) + ContextId, Device, DeviceEvent, EndPoint, EndPointId, Link, LinkEvent, TopologyId, TopologyEvent) from common.tools.context_queries.CheckType import ( device_type_is_datacenter, device_type_is_network, endpoint_type_is_border) -from common.tools.context_queries.Context import create_context -from common.tools.context_queries.Device import get_uuids_of_devices_in_topology #, get_devices_in_topology -#from common.tools.context_queries.Link import get_links_in_topology -from common.tools.context_queries.Topology import create_missing_topologies +from common.tools.context_queries.Context import create_context, get_context +from common.tools.context_queries.Device import get_device, get_devices_in_topology +from common.tools.context_queries.Link import get_link +from common.tools.context_queries.Topology import create_missing_topologies, get_topology_details from common.tools.grpc.Tools import grpc_message_to_json_string from common.tools.object_factory.Context import json_context_id -from common.tools.object_factory.Device import json_device_id from common.tools.object_factory.Topology import json_topology_id from context.client.ContextClient import ContextClient from context.client.EventsCollector import EventsCollector @@ -37,8 +35,19 @@ from dlt.connector.client.DltConnectorClient import DltConnectorClient from .AbstractDevice import AbstractDevice from .AbstractLink import AbstractLink from .DltRecordSender import DltRecordSender +from .Tools import replace_device_uuids_by_names, replace_link_uuids_by_names from .Types import EventTypes +# Remark on UUIDs: +# TopologyAbstractor, AbstractDevice and AbstractLink are used +# to compose network reporesentations to be forwarded to remote +# instances. Constraining it to use UUIDs is pointless given +# these UUIDs, to be unique, need to be bound to the local +# context/topology UUIDs, which might be different than that for +# the remote TeraFlowSDN instances. For this very reason, we use +# the device/link/endpoint/topology/context names as UUIDs, to +# prevent UUID-related issues. + LOGGER = logging.getLogger(__name__) ADMIN_CONTEXT_ID = ContextId(**json_context_id(DEFAULT_CONTEXT_NAME)) @@ -50,10 +59,14 @@ class TopologyAbstractor(threading.Thread): self.terminate = threading.Event() self.context_client = ContextClient() - self.context_event_collector = EventsCollector(self.context_client) + self.context_event_collector = EventsCollector( + self.context_client, activate_context_collector=False, activate_topology_collector=True, + activate_device_collector=True, activate_link_collector=True, activate_service_collector=False, + activate_slice_collector=False, activate_connection_collector=False + ) - self.real_to_abstract_device_uuid : Dict[str, str] = dict() - self.real_to_abstract_link_uuid : Dict[str, str] = dict() + self.real_to_abstract_device : Dict[str, str] = dict() + self.real_to_abstract_link : Dict[str, str] = dict() self.abstract_device_to_topology_id : Dict[str, TopologyId] = dict() self.abstract_link_to_topology_id : Dict[str, TopologyId] = dict() @@ -61,17 +74,22 @@ class TopologyAbstractor(threading.Thread): self.abstract_devices : Dict[str, AbstractDevice] = dict() self.abstract_links : Dict[Tuple[str,str], AbstractLink] = dict() + self.context_names_cache : Dict[str, str] = dict() + self.topology_names_cache : Dict[Tuple[str, str], str] = dict() + def stop(self): self.terminate.set() def run(self) -> None: + LOGGER.info('Starting...') self.context_client.connect() create_context(self.context_client, DEFAULT_CONTEXT_NAME) - topology_uuids = [DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME] - create_missing_topologies(self.context_client, ADMIN_CONTEXT_ID, topology_uuids) + topology_names = [DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME] + create_missing_topologies(self.context_client, ADMIN_CONTEXT_ID, topology_names) self.context_event_collector.start() + LOGGER.info('Running...') while not self.terminate.is_set(): event = self.context_event_collector.get_event(timeout=0.1) if event is None: continue @@ -79,6 +97,7 @@ class TopologyAbstractor(threading.Thread): LOGGER.info('Processing Event({:s})...'.format(grpc_message_to_json_string(event))) self.update_abstraction(event) + LOGGER.info('Stopping...') self.context_event_collector.stop() self.context_client.close() @@ -100,129 +119,174 @@ class TopologyAbstractor(threading.Thread): # return False def _get_or_create_abstract_device( - self, device_uuid : str, device_name : str, device_type : DeviceTypeEnum, dlt_record_sender : DltRecordSender, + self, device_name : str, device_type : DeviceTypeEnum, dlt_record_sender : DltRecordSender, abstract_topology_id : TopologyId ) -> AbstractDevice: - abstract_device = self.abstract_devices.get(device_uuid) + abstract_device = self.abstract_devices.get(device_name) changed = False if abstract_device is None: - abstract_device = AbstractDevice(device_uuid, device_name, device_type) + abstract_device = AbstractDevice(device_name, device_type) changed = abstract_device.initialize() if changed: dlt_record_sender.add_device(abstract_topology_id, abstract_device.device) - self.abstract_devices[device_uuid] = abstract_device - self.abstract_device_to_topology_id[device_uuid] = abstract_topology_id + self.abstract_devices[device_name] = abstract_device + self.abstract_device_to_topology_id[device_name] = abstract_topology_id return abstract_device def _update_abstract_device( self, device : Device, dlt_record_sender : DltRecordSender, abstract_topology_id : TopologyId, - abstract_device_uuid : Optional[str] = None + abstract_device_name : Optional[str] = None ) -> None: - device_uuid = device.device_id.device_uuid.uuid device_name = device.name if device_type_is_datacenter(device.device_type): - abstract_device_uuid = device_uuid + abstract_device_name = device_name abstract_device = self._get_or_create_abstract_device( - device_uuid, device_name, DeviceTypeEnum.EMULATED_DATACENTER, dlt_record_sender, abstract_topology_id) + device_name, DeviceTypeEnum.EMULATED_DATACENTER, dlt_record_sender, abstract_topology_id) elif device_type_is_network(device.device_type): LOGGER.warning('device_type is network; not implemented') return else: abstract_device = self._get_or_create_abstract_device( - abstract_device_uuid, None, DeviceTypeEnum.NETWORK, dlt_record_sender, abstract_topology_id) - self.real_to_abstract_device_uuid[device_uuid] = abstract_device_uuid + abstract_device_name, DeviceTypeEnum.NETWORK, dlt_record_sender, abstract_topology_id) + self.real_to_abstract_device[device_name] = abstract_device_name changed = abstract_device.update_endpoints(device) if changed: dlt_record_sender.add_device(abstract_topology_id, abstract_device.device) def _get_or_create_abstract_link( - self, link_uuid : str, dlt_record_sender : DltRecordSender, abstract_topology_id : TopologyId + self, link_name : str, dlt_record_sender : DltRecordSender, abstract_topology_id : TopologyId ) -> AbstractLink: - abstract_link = self.abstract_links.get(link_uuid) + abstract_link = self.abstract_links.get(link_name) changed = False if abstract_link is None: - abstract_link = AbstractLink(link_uuid) + abstract_link = AbstractLink(link_name) changed = abstract_link.initialize() if changed: dlt_record_sender.add_link(abstract_topology_id, abstract_link.link) - self.abstract_links[link_uuid] = abstract_link - self.abstract_link_to_topology_id[link_uuid] = abstract_topology_id + self.abstract_links[link_name] = abstract_link + self.abstract_link_to_topology_id[link_name] = abstract_topology_id return abstract_link def _get_link_endpoint_data(self, endpoint_id : EndPointId) -> Optional[Tuple[AbstractDevice, EndPoint]]: - device_uuid : str = endpoint_id.device_id.device_uuid.uuid - endpoint_uuid : str = endpoint_id.endpoint_uuid.uuid - abstract_device_uuid = self.real_to_abstract_device_uuid.get(device_uuid) - if abstract_device_uuid is None: return None - abstract_device = self.abstract_devices.get(abstract_device_uuid) + device_name : str = endpoint_id.device_id.device_uuid.uuid + endpoint_name : str = endpoint_id.endpoint_uuid.uuid + abstract_device_name = self.real_to_abstract_device.get(device_name) + if abstract_device_name is None: return None + abstract_device = self.abstract_devices.get(abstract_device_name) if abstract_device is None: return None - endpoint = abstract_device.get_endpoint(device_uuid, endpoint_uuid) + endpoint = abstract_device.get_endpoint(device_name, endpoint_name) if endpoint is None: return None return abstract_device, endpoint - def _compute_abstract_link(self, link : Link) -> Optional[str]: - if len(link.link_endpoint_ids) != 2: return None + def _compute_abstract_link(self, link : Link) -> Optional[Tuple[str, List[Tuple[str, str]]]]: + LOGGER.info('[_compute_abstract_link] link={:s}'.format(grpc_message_to_json_string(link))) + if len(link.link_endpoint_ids) != 2: + LOGGER.warning('[_compute_abstract_link] !=2 eps') + return None link_endpoint_data_A = self._get_link_endpoint_data(link.link_endpoint_ids[0]) - if link_endpoint_data_A is None: return None + if link_endpoint_data_A is None: + LOGGER.warning('[_compute_abstract_link] link_endpoint_data_A is None') + return None abstract_device_A, endpoint_A = link_endpoint_data_A - if not endpoint_type_is_border(endpoint_A.endpoint_type): return None + LOGGER.info('[_compute_abstract_link] abstract_device_A={:s}'.format(str(abstract_device_A.to_json()))) + LOGGER.info('[_compute_abstract_link] endpoint_A={:s}'.format(grpc_message_to_json_string(endpoint_A))) + if not endpoint_type_is_border(endpoint_A.endpoint_type): + LOGGER.warning('[_compute_abstract_link] endpoint_A not border') + return None link_endpoint_data_Z = self._get_link_endpoint_data(link.link_endpoint_ids[-1]) - if link_endpoint_data_Z is None: return None + if link_endpoint_data_Z is None: + LOGGER.warning('[_compute_abstract_link] link_endpoint_data_Z is None') + return None abstract_device_Z, endpoint_Z = link_endpoint_data_Z - if not endpoint_type_is_border(endpoint_Z.endpoint_type): return None - - link_uuid = AbstractLink.compose_uuid( - abstract_device_A.uuid, endpoint_A.endpoint_id.endpoint_uuid.uuid, - abstract_device_Z.uuid, endpoint_Z.endpoint_id.endpoint_uuid.uuid + LOGGER.info('[_compute_abstract_link] abstract_device_Z={:s}'.format(str(abstract_device_Z.to_json()))) + LOGGER.info('[_compute_abstract_link] endpoint_Z={:s}'.format(grpc_message_to_json_string(endpoint_Z))) + if not endpoint_type_is_border(endpoint_Z.endpoint_type): + LOGGER.warning('[_compute_abstract_link] endpoint_Z not border') + return None + + link_name = AbstractLink.compose_name( + abstract_device_A.name, endpoint_A.name, + abstract_device_Z.name, endpoint_Z.name ) + LOGGER.info('[_compute_abstract_link] link_name={:s}'.format(str(link_name))) # sort endpoints lexicographically to prevent duplicities - link_endpoint_uuids = sorted([ - (abstract_device_A.uuid, endpoint_A.endpoint_id.endpoint_uuid.uuid), - (abstract_device_Z.uuid, endpoint_Z.endpoint_id.endpoint_uuid.uuid) + link_endpoint_names = sorted([ + (abstract_device_A.name, endpoint_A.name), + (abstract_device_Z.name, endpoint_Z.name) ]) + LOGGER.info('[_compute_abstract_link] link_endpoint_names={:s}'.format(str(link_endpoint_names))) - return link_uuid, link_endpoint_uuids + return link_name, link_endpoint_names def _update_abstract_link( self, link : Link, dlt_record_sender : DltRecordSender, abstract_topology_id : TopologyId ) -> None: - abstract_link_specs = self._compute_abstract_link(link) - if abstract_link_specs is None: return - abstract_link_uuid, link_endpoint_uuids = abstract_link_specs + LOGGER.info('[_update_abstract_link] link={:s}'.format(grpc_message_to_json_string(link))) + LOGGER.info('[_update_abstract_link] abstract_topology_id={:s}'.format(grpc_message_to_json_string(abstract_topology_id))) - abstract_link = self._get_or_create_abstract_link(abstract_link_uuid, dlt_record_sender, abstract_topology_id) - link_uuid = link.link_id.link_uuid.uuid - self.real_to_abstract_link_uuid[link_uuid] = abstract_link_uuid - changed = abstract_link.update_endpoints(link_endpoint_uuids) + abstract_link_specs = self._compute_abstract_link(link) + if abstract_link_specs is None: + LOGGER.warning('[_update_abstract_link] abstract_link_specs is None') + return + abstract_link_name, link_endpoint_names = abstract_link_specs + LOGGER.info('[_update_abstract_link] abstract_link_name={:s}'.format(str(abstract_link_name))) + LOGGER.info('[_update_abstract_link] link_endpoint_names={:s}'.format(str(link_endpoint_names))) + + abstract_link = self._get_or_create_abstract_link(abstract_link_name, dlt_record_sender, abstract_topology_id) + LOGGER.info('[_update_abstract_link] abstract_link={:s}'.format(str(abstract_link.to_json()))) + link_name = link.name + LOGGER.info('[_update_abstract_link] link_name={:s}'.format(str(link_name))) + self.real_to_abstract_link[link_name] = abstract_link_name + changed = abstract_link.update_endpoints(link_endpoint_names) + LOGGER.info('[_update_abstract_link] changed={:s}'.format(str(changed))) if changed: dlt_record_sender.add_link(abstract_topology_id, abstract_link.link) def _infer_abstract_links(self, device : Device, dlt_record_sender : DltRecordSender) -> None: - device_uuid = device.device_id.device_uuid.uuid + device_name = device.name + LOGGER.info('[_infer_abstract_links] device_name={:s}'.format(str(device_name))) - interdomain_device_uuids = get_uuids_of_devices_in_topology( + interdomain_devices = get_devices_in_topology( self.context_client, ADMIN_CONTEXT_ID, INTERDOMAIN_TOPOLOGY_NAME) + interdomain_devices = { + interdomain_device.name : interdomain_device + for interdomain_device in interdomain_devices + } + LOGGER.info('[_infer_abstract_links] interdomain_devices={:s}'.format(str(interdomain_devices))) for endpoint in device.device_endpoints: + LOGGER.info('[_infer_abstract_links] endpoint={:s}'.format(grpc_message_to_json_string(endpoint))) if not endpoint_type_is_border(endpoint.endpoint_type): continue - endpoint_uuid = endpoint.endpoint_id.endpoint_uuid.uuid - - abstract_link_uuid = AbstractLink.compose_uuid(device_uuid, endpoint_uuid, endpoint_uuid, device_uuid) - if abstract_link_uuid in self.abstract_links: continue - - if endpoint_uuid not in interdomain_device_uuids: continue - remote_device = self.context_client.GetDevice(DeviceId(**json_device_id(endpoint_uuid))) - remote_device_border_endpoint_uuids = { - endpoint.endpoint_id.endpoint_uuid.uuid : endpoint.endpoint_type + endpoint_name = endpoint.name + LOGGER.info('[_infer_abstract_links] endpoint_name={:s}'.format(str(endpoint_name))) + + abstract_link_name = AbstractLink.compose_name(device_name, endpoint_name, endpoint_name, device_name) + LOGGER.info('[_infer_abstract_links] abstract_link_name={:s}'.format(str(abstract_link_name))) + LOGGER.info('[_infer_abstract_links] abstract_links={:s}'.format(str({ + abstract_link_name:abstract_link_obj.to_json() + for abstract_link_name,abstract_link_obj in self.abstract_links.items() + }))) + if abstract_link_name in self.abstract_links: continue + + remote_device = interdomain_devices.get(endpoint_name) + LOGGER.info('[_infer_abstract_links] remote_device={:s}'.format(grpc_message_to_json_string(remote_device))) + if remote_device is None: continue + + remote_device_border_endpoint_names = { + endpoint.name : endpoint.endpoint_type for endpoint in remote_device.device_endpoints if endpoint_type_is_border(endpoint.endpoint_type) } - if device_uuid not in remote_device_border_endpoint_uuids: continue + LOGGER.info('[_infer_abstract_links] remote_device_border_endpoint_names={:s}'.format( + str(remote_device_border_endpoint_names))) + if device_name not in remote_device_border_endpoint_names: continue - link_endpoint_uuids = sorted([(device_uuid, endpoint_uuid), (endpoint_uuid, device_uuid)]) + link_endpoint_names = sorted([(device_name, endpoint_name), (endpoint_name, device_name)]) + LOGGER.info('[_infer_abstract_links] link_endpoint_names={:s}'.format(str(link_endpoint_names))) abstract_link = self._get_or_create_abstract_link( - abstract_link_uuid, dlt_record_sender, INTERDOMAIN_TOPOLOGY_ID) - changed = abstract_link.update_endpoints(link_endpoint_uuids) + abstract_link_name, dlt_record_sender, INTERDOMAIN_TOPOLOGY_ID) + LOGGER.info('[_infer_abstract_links] abstract_link={:s}'.format(str(abstract_link.to_json()))) + changed = abstract_link.update_endpoints(link_endpoint_names) if changed: dlt_record_sender.add_link(INTERDOMAIN_TOPOLOGY_ID, abstract_link.link) def update_abstraction(self, event : EventTypes) -> None: @@ -239,77 +303,134 @@ class TopologyAbstractor(threading.Thread): dlt_record_sender = DltRecordSender(self.context_client, dlt_connector_client) - if isinstance(event, ContextEvent): - LOGGER.debug('Processing ContextEvent({:s})'.format(grpc_message_to_json_string(event))) - LOGGER.warning('Ignoring ContextEvent({:s})'.format(grpc_message_to_json_string(event))) + if isinstance(event, TopologyEvent): + self._process_event_topology(event, dlt_record_sender) + elif isinstance(event, DeviceEvent): + self._process_event_device(event, dlt_record_sender) + elif isinstance(event, LinkEvent): + self._process_event_link(event, dlt_record_sender) + else: + LOGGER.warning('Unsupported Event({:s})'.format(grpc_message_to_json_string(event))) - elif isinstance(event, TopologyEvent): - LOGGER.debug('Processing TopologyEvent({:s})'.format(grpc_message_to_json_string(event))) - topology_id = event.topology_id - topology_uuid = topology_id.topology_uuid.uuid - context_id = topology_id.context_id - context_uuid = context_id.context_uuid.uuid - topology_uuids = {DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME} + dlt_record_sender.commit() + if dlt_connector_client is not None: dlt_connector_client.close() - context = self.context_client.GetContext(context_id) - context_name = context.name + def _process_event_topology(self, event : TopologyEvent, dlt_record_sender : DltRecordSender) -> None: + LOGGER.debug('Processing TopologyEvent({:s})'.format(grpc_message_to_json_string(event))) + topology_id = event.topology_id + topology_uuid = topology_id.topology_uuid.uuid + context_uuid = topology_id.context_id.context_uuid.uuid + topology_uuids = {DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME} + + context = get_context(self.context_client, context_uuid, rw_copy=True) + if context is None: + MSG = 'Ignoring TopologyEvent({:s})... Context not found' + LOGGER.warning(MSG.format(grpc_message_to_json_string(event))) + return - topology_details = self.context_client.GetTopologyDetails(topology_id) - topology_name = topology_details.name + context_name = context.name + context.context_id.context_uuid.uuid = context_name - if ((context_uuid == DEFAULT_CONTEXT_NAME) or (context_name == DEFAULT_CONTEXT_NAME)) and \ - (topology_uuid not in topology_uuids) and (topology_name not in topology_uuids): + topology_details = get_topology_details( + self.context_client, topology_uuid, context_uuid=context_uuid, rw_copy=True) + if topology_details is None: + MSG = 'Ignoring TopologyEvent({:s})... TopologyDetails not found' + LOGGER.warning(MSG.format(grpc_message_to_json_string(event))) + return - abstract_topology_id = TopologyId(**json_topology_id(topology_uuid, context_id=ADMIN_CONTEXT_ID)) - self._get_or_create_abstract_device( - topology_uuid, topology_name, DeviceTypeEnum.NETWORK, dlt_record_sender, abstract_topology_id) + topology_name = topology_details.name + topology_details.topology_id.topology_uuid.uuid = topology_name + topology_details.topology_id.context_id.context_uuid.uuid = context_name + + LOGGER.info(' context_uuid={:s}'.format(str(context_uuid))) + LOGGER.info(' context_name={:s}'.format(str(context_name))) + LOGGER.info(' DEFAULT_CONTEXT_NAME={:s}'.format(str(DEFAULT_CONTEXT_NAME))) + if (context_uuid != DEFAULT_CONTEXT_NAME) and (context_name != DEFAULT_CONTEXT_NAME): + MSG = 'Ignoring ({:s}/{:s})({:s}/{:s}) TopologyEvent({:s})... Wrong context name' + args = context_uuid, context_name, topology_uuid, topology_name, grpc_message_to_json_string(event) + LOGGER.warning(MSG.format(*args)) + + LOGGER.info(' topology_uuid={:s}'.format(str(topology_uuid))) + LOGGER.info(' topology_name={:s}'.format(str(topology_name))) + LOGGER.info(' topology_uuids={:s}'.format(str(topology_uuids))) + if (topology_uuid in topology_uuids) or (topology_name in topology_uuids): + MSG = 'Ignoring ({:s}/{:s})({:s}/{:s}) TopologyEvent({:s})... Not a domain topology' + args = context_uuid, context_name, topology_uuid, topology_name, grpc_message_to_json_string(event) + LOGGER.warning(MSG.format(*args)) + + abstract_topology_id = TopologyId(**json_topology_id(topology_name, context_id=ADMIN_CONTEXT_ID)) + self._get_or_create_abstract_device( + topology_name, DeviceTypeEnum.NETWORK, dlt_record_sender, abstract_topology_id) + + for device in topology_details.devices: + replace_device_uuids_by_names( + self.context_client, device, context_names_cache=self.context_names_cache, + topology_names_cache=self.topology_names_cache) + self._update_abstract_device( + device, dlt_record_sender, abstract_topology_id, abstract_device_name=topology_name) + + for link in topology_details.links: + replace_link_uuids_by_names( + self.context_client, link, context_names_cache=self.context_names_cache, + topology_names_cache=self.topology_names_cache) + self._update_abstract_link(link, dlt_record_sender, abstract_topology_id) + + for device in topology_details.devices: + self._infer_abstract_links(device, dlt_record_sender) - #devices = get_devices_in_topology(self.context_client, context_id, topology_uuid) - for device in topology_details.devices: - self._update_abstract_device( - device, dlt_record_sender, abstract_topology_id, abstract_device_uuid=topology_uuid) + def _process_event_device(self, event : DeviceEvent, dlt_record_sender : DltRecordSender) -> None: + LOGGER.debug('Processing DeviceEvent({:s})'.format(grpc_message_to_json_string(event))) - #links = get_links_in_topology(self.context_client, context_id, topology_uuid) - for link in topology_details.links: - self._update_abstract_link(link, dlt_record_sender, abstract_topology_id) - - for device in topology_details.devices: - self._infer_abstract_links(device, dlt_record_sender) + device_uuid = event.device_id.device_uuid.uuid + device = get_device( + self.context_client, device_uuid, rw_copy=True, include_endpoints=True, + include_config_rules=False, include_components=False) + if device is None: + MSG = 'Ignoring DeviceEvent({:s})... Device not found' + LOGGER.warning(MSG.format(grpc_message_to_json_string(event))) + return - else: - MSG = 'Ignoring ({:s}/{:s})({:s}/{:s}) TopologyEvent({:s})' - args = context_uuid, context_name, topology_uuid, topology_name, grpc_message_to_json_string(event) - LOGGER.warning(MSG.format(*args)) + replace_device_uuids_by_names( + self.context_client, device, context_names_cache=self.context_names_cache, + topology_names_cache=self.topology_names_cache) - elif isinstance(event, DeviceEvent): - LOGGER.debug('Processing DeviceEvent({:s})'.format(grpc_message_to_json_string(event))) - device_id = event.device_id - device_uuid = device_id.device_uuid.uuid - abstract_device_uuid = self.real_to_abstract_device_uuid.get(device_uuid) - device = self.context_client.GetDevice(device_id) - if abstract_device_uuid is None: - LOGGER.warning('Ignoring DeviceEvent({:s})'.format(grpc_message_to_json_string(event))) - else: - abstract_topology_id = self.abstract_device_to_topology_id[abstract_device_uuid] - self._update_abstract_device( - device, dlt_record_sender, abstract_topology_id, abstract_device_uuid=abstract_device_uuid) + device_name = device.name + LOGGER.info(' device_name={:s}'.format(str(device_name))) + LOGGER.info(' real_to_abstract_device={:s}'.format(str(self.real_to_abstract_device))) + abstract_device_name = self.real_to_abstract_device.get(device_name) + if abstract_device_name is None: + MSG = 'Ignoring DeviceEvent({:s}). Abstract Device not found' + LOGGER.warning(MSG.format(grpc_message_to_json_string(event))) + return - self._infer_abstract_links(device, dlt_record_sender) + abstract_topology_id = self.abstract_device_to_topology_id[abstract_device_name] + self._update_abstract_device( + device, dlt_record_sender, abstract_topology_id, abstract_device_name=abstract_device_name) - elif isinstance(event, LinkEvent): - LOGGER.debug('Processing LinkEvent({:s})'.format(grpc_message_to_json_string(event))) - link_id = event.link_id - link_uuid = link_id.link_uuid.uuid - abstract_link_uuid = self.real_to_abstract_link_uuid.get(link_uuid) - if abstract_link_uuid is None: - LOGGER.warning('Ignoring LinkEvent({:s})'.format(grpc_message_to_json_string(event))) - else: - abstract_topology_id = self.abstract_link_to_topology_id[abstract_link_uuid] - link = self.context_client.GetLink(link_id) - self._update_abstract_link(link, dlt_record_sender, abstract_topology_id) + self._infer_abstract_links(device, dlt_record_sender) - else: - LOGGER.warning('Unsupported Event({:s})'.format(grpc_message_to_json_string(event))) + def _process_event_link(self, event : LinkEvent, dlt_record_sender : DltRecordSender) -> None: + LOGGER.debug('Processing LinkEvent({:s})'.format(grpc_message_to_json_string(event))) - dlt_record_sender.commit() - if dlt_connector_client is not None: dlt_connector_client.close() + link_uuid = event.link_id.link_uuid.uuid + link = get_link(self.context_client, link_uuid, rw_copy=True) + if link is None: + MSG = ' Ignoring LinkEvent({:s})... Link not found' + LOGGER.warning(MSG.format(grpc_message_to_json_string(event))) + return + + replace_link_uuids_by_names( + self.context_client, link, context_names_cache=self.context_names_cache, + topology_names_cache=self.topology_names_cache) + + link_name = link.name + LOGGER.info(' link_name={:s}'.format(str(link_name))) + LOGGER.info(' real_to_abstract_link={:s}'.format(str(self.real_to_abstract_link))) + abstract_link_name = self.real_to_abstract_link.get(link_name) + if abstract_link_name is None: + MSG = ' Ignoring LinkEvent({:s}). Abstract Link not found' + LOGGER.warning(MSG.format(grpc_message_to_json_string(event))) + return + + abstract_topology_id = self.abstract_link_to_topology_id[abstract_link_name] + self._update_abstract_link(link, dlt_record_sender, abstract_topology_id)