# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
from typing import List, Set, Union
from common.DeviceTypes import DeviceTypeEnum
from common.proto.context_pb2 import Context, ContextId, Device, Empty, Link, Topology, TopologyId
from common.tools.object_factory.Context import json_context, json_context_id
from common.tools.object_factory.Topology import json_topology, json_topology_id
from context.client.ContextClient import ContextClient

LOGGER = logging.getLogger(__name__)


def create_context(
    context_client : ContextClient, context_uuid : str
) -> None:
    """Create the context identified by `context_uuid` unless it is already listed.

    The existing context identifiers are retrieved through `ListContextIds`; when
    `context_uuid` is among them nothing is done, otherwise `SetContext` is called.
    """
    existing_context_ids = context_client.ListContextIds(Empty())
    existing_context_uuids = {context_id.context_uuid.uuid for context_id in existing_context_ids.context_ids}
    if context_uuid in existing_context_uuids: return
    context_client.SetContext(Context(**json_context(context_uuid)))


def create_topology(
    context_client : ContextClient, context_uuid : str, topology_uuid : str
) -> None:
    """Create topology `topology_uuid` in context `context_uuid` unless already listed.

    The existing topology identifiers of the context are retrieved through
    `ListTopologyIds`; when `topology_uuid` is among them nothing is done,
    otherwise `SetTopology` is called.
    """
    context_id = ContextId(**json_context_id(context_uuid))
    existing_topology_ids = context_client.ListTopologyIds(context_id)
    existing_topology_uuids = {topology_id.topology_uuid.uuid for topology_id in existing_topology_ids.topology_ids}
    if topology_uuid in existing_topology_uuids: return
    context_client.SetTopology(Topology(**json_topology(topology_uuid, context_id=context_id)))


def create_missing_topologies(
    context_client : ContextClient, context_id : ContextId, topology_uuids : List[str]
) -> None:
    """Create every topology in `topology_uuids` that is not yet listed in `context_id`.

    The set of existing topologies is read once, before any creation; UUIDs found
    in that snapshot are skipped and the others are created through `SetTopology`.
    """
    # Find existing topologies within own context
    existing_topology_ids = context_client.ListTopologyIds(context_id)
    existing_topology_uuids = {topology_id.topology_uuid.uuid for topology_id in existing_topology_ids.topology_ids}

    # Create topologies within provided context
    for topology_uuid in topology_uuids:
        if topology_uuid in existing_topology_uuids: continue
        grpc_topology = Topology(**json_topology(topology_uuid, context_id=context_id))
        context_client.SetTopology(grpc_topology)


def get_existing_device_uuids(context_client : ContextClient) -> Set[str]:
    """Return the set of device UUIDs reported by `ListDeviceIds`."""
    existing_device_ids = context_client.ListDeviceIds(Empty())
    existing_device_uuids = {device_id.device_uuid.uuid for device_id in existing_device_ids.device_ids}
    return existing_device_uuids


def get_existing_link_uuids(context_client : ContextClient) -> Set[str]:
    """Return the set of link UUIDs reported by `ListLinkIds`."""
    existing_link_ids = context_client.ListLinkIds(Empty())
    existing_link_uuids = {link_id.link_uuid.uuid for link_id in existing_link_ids.link_ids}
    return existing_link_uuids


def add_device_to_topology(
    context_client : ContextClient, context_id : ContextId, topology_uuid : str, device_uuid : str
) -> bool:
    """Add `device_uuid` to the device list of topology `topology_uuid`.

    Returns:
        False when the device was already referenced by the topology (nothing is
        written); True after the updated topology has been sent with `SetTopology`.
    """
    topology_id = TopologyId(**json_topology_id(topology_uuid, context_id=context_id))
    topology_ro = context_client.GetTopology(topology_id)

    device_uuids = {device_id.device_uuid.uuid for device_id in topology_ro.device_ids}
    if device_uuid in device_uuids: return False # already existed

    # Work on a copy of the retrieved message and append the new device to it.
    topology_rw = Topology()
    topology_rw.CopyFrom(topology_ro)
    topology_rw.device_ids.add().device_uuid.uuid = device_uuid
    context_client.SetTopology(topology_rw)
    return True


def add_link_to_topology(
    context_client : ContextClient, context_id : ContextId, topology_uuid : str, link_uuid : str
) -> bool:
    """Add `link_uuid` to the link list of topology `topology_uuid`.

    Returns:
        False when the link was already referenced by the topology (nothing is
        written); True after the updated topology has been sent with `SetTopology`.
    """
    topology_id = TopologyId(**json_topology_id(topology_uuid, context_id=context_id))
    topology_ro = context_client.GetTopology(topology_id)

    link_uuids = {link_id.link_uuid.uuid for link_id in topology_ro.link_ids}
    if link_uuid in link_uuids: return False # already existed

    # Work on a copy of the retrieved message and append the new link to it.
    topology_rw = Topology()
    topology_rw.CopyFrom(topology_ro)
    topology_rw.link_ids.add().link_uuid.uuid = link_uuid
    context_client.SetTopology(topology_rw)
    return True


def get_uuids_of_devices_in_topology(
    context_client : ContextClient, context_id : ContextId, topology_uuid : str
) -> List[str]:
    """Return the device UUIDs referenced by topology `topology_uuid`, in stored order."""
    topology_id = TopologyId(**json_topology_id(topology_uuid, context_id=context_id))
    topology = context_client.GetTopology(topology_id)
    device_uuids = [device_id.device_uuid.uuid for device_id in topology.device_ids]
    return device_uuids


def get_uuids_of_links_in_topology(
    context_client : ContextClient, context_id : ContextId, topology_uuid : str
) -> List[str]:
    """Return the link UUIDs referenced by topology `topology_uuid`, in stored order."""
    topology_id = TopologyId(**json_topology_id(topology_uuid, context_id=context_id))
    topology = context_client.GetTopology(topology_id)
    link_uuids = [link_id.link_uuid.uuid for link_id in topology.link_ids]
    return link_uuids


def get_devices_in_topology(
    context_client : ContextClient, context_id : ContextId, topology_uuid : str
) -> List[Device]:
    """Return the devices from `ListDevices` whose UUID is referenced by the topology.

    The result keeps the order in which `ListDevices` reports the devices.
    """
    # A set gives constant-time membership tests while filtering the full device list.
    device_uuids = set(get_uuids_of_devices_in_topology(context_client, context_id, topology_uuid))

    all_devices = context_client.ListDevices(Empty())
    return [
        device
        for device in all_devices.devices
        if device.device_id.device_uuid.uuid in device_uuids
    ]


def get_links_in_topology(
    context_client : ContextClient, context_id : ContextId, topology_uuid : str
) -> List[Link]:
    """Return the links from `ListLinks` whose UUID is referenced by the topology.

    The result keeps the order in which `ListLinks` reports the links.
    """
    # A set gives constant-time membership tests while filtering the full link list.
    link_uuids = set(get_uuids_of_links_in_topology(context_client, context_id, topology_uuid))

    all_links = context_client.ListLinks(Empty())
    return [
        link
        for link in all_links.links
        if link.link_id.link_uuid.uuid in link_uuids
    ]


def device_type_is_datacenter(device_type : Union[str, DeviceTypeEnum]) -> bool:
    """Return True when `device_type` is a (possibly emulated) datacenter type.

    Both the `DeviceTypeEnum` members and their `.value` are accepted.
    """
    return device_type in {
        DeviceTypeEnum.DATACENTER, DeviceTypeEnum.DATACENTER.value,
        DeviceTypeEnum.EMULATED_DATACENTER, DeviceTypeEnum.EMULATED_DATACENTER.value
    }


def device_type_is_network(device_type : Union[str, DeviceTypeEnum]) -> bool:
    """Return True when `device_type` is `DeviceTypeEnum.NETWORK` or its `.value`."""
    return device_type in {DeviceTypeEnum.NETWORK, DeviceTypeEnum.NETWORK.value}


def endpoint_type_is_border(endpoint_type : str) -> bool:
    """Return True when the string form of `endpoint_type` ends with '/border'."""
    return str(endpoint_type).endswith('/border')