Commit 92dfd5b4 authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Service component:

- Removed deprecated PathComputationElement class
- Migrated old L3NMOpenConfig Service Handler to the new framework
- Removed unneeded file in L3NMEmulated Service Handler
parent f2a1b9ea
Loading
Loading
Loading
Loading
+0 −37
Original line number Diff line number Diff line
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from enum import Enum

class NodeTypeEnum(Enum):
    DEVICE   = 'device'
    ENDPOINT = 'endpoint'

class EndPointTypeEnum(Enum):
    COPPER    = 'copper'
    OPTICAL   = 'optical'
    MICROWAVE = 'microwave'

class EdgeTypeEnum(Enum):
    INTERNAL  = 'internal'
    COPPER    = 'copper'
    OPTICAL   = 'optical'
    MICROWAVE = 'microwave'
    VIRTUAL   = 'virtual'
    OTHER     = 'other'

class LayerTypeEnum(Enum):
    COPPER    = 'copper'
    OPTICAL   = 'optical'
    MICROWAVE = 'microwave'
+0 −373
Original line number Diff line number Diff line
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import grpc, json, logging, networkx, uuid
from queue import Queue
from typing import Dict, List, Tuple
from networkx.drawing.nx_pydot import write_dot
from common.Constants import DEFAULT_CONTEXT_UUID
from common.DeviceTypes import DeviceTypeEnum
from common.proto.context_pb2 import (
    ConfigActionEnum, Connection, Device, Empty, EndPoint, EndPointId, Service, ServiceId, ServiceStatusEnum,
    ServiceTypeEnum)
from common.tools.grpc.Tools import (
    grpc_message_list_to_json, grpc_message_list_to_json_string, grpc_message_to_json, grpc_message_to_json_string)
from context.client.ContextClient import ContextClient
from .Enums import EdgeTypeEnum, NodeTypeEnum
from .Tools import get_device, get_device_key, get_edge_type, get_endpoint, get_endpoint_key, get_link_key

LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.INFO)

SUB_SERVICE_TYPES = {
    DeviceTypeEnum.EMULATED_PACKET_ROUTER.value   : ServiceTypeEnum.SERVICETYPE_L3NM,
    DeviceTypeEnum.EMULATED_OPEN_LINE_SYSTEM.value: ServiceTypeEnum.SERVICETYPE_TAPI_CONNECTIVITY_SERVICE,
    DeviceTypeEnum.PACKET_ROUTER.value            : ServiceTypeEnum.SERVICETYPE_L3NM,
    DeviceTypeEnum.OPEN_LINE_SYSTEM.value         : ServiceTypeEnum.SERVICETYPE_TAPI_CONNECTIVITY_SERVICE,
}
DEFAULT_SUB_SERVICE_TYPE = ServiceTypeEnum.SERVICETYPE_UNKNOWN

def dump_requirements(requirements):
    if requirements is None: return None
    return [
        {
            'sub_service': grpc_message_to_json(sub_service),
            'sub_connections': grpc_message_list_to_json(sub_connections),
        }
        for sub_service,sub_connections in requirements
    ]

def dump_connectivity(connectivity):
    if connectivity is None: return None
    return {
        'service': grpc_message_to_json(connectivity.get('service')),
        'connections': grpc_message_list_to_json(connectivity.get('connections', [])),
        'requirements': dump_requirements(connectivity.get('requirements', [])),
    }

def dump_hops(hops):
    if hops is None: return None
    return [
        'in_endpoint={:s}, device={:s}, out_endpoint={:s}'.format(
            grpc_message_to_json_string(in_endpoint), grpc_message_to_json_string(device),
            grpc_message_to_json_string(out_endpoint))
        for in_endpoint,device,out_endpoint in hops
    ]

class PathComputationElement:
    def __init__(self) -> None:
        self.__topology = networkx.Graph()
        self.__connectivity = {} # (a_ep_key, z_ep_key) => {service, connection, sub_services: [], sub_connections: []}
        self.__service_endpoints = {} # (context_uuid, service_uuid) => (a_ep_key, z_ep_key)

    def dump_topology_to_file(self, dot_filepath : str):
        write_dot(self.__topology, dot_filepath)

    def dump_connectivity_to_file(self, filepath : str):
        with open(filepath, 'w', encoding='UTF-8') as f:
            f.write(str(self.__connectivity)) # do not use json; it contains protobuf objects

    def load_topology(self, context_client : ContextClient):
        response_devices = context_client.ListDevices(Empty())
        devices = response_devices.devices
        LOGGER.debug('Devices[{:d}] = {:s}'.format(len(devices), grpc_message_to_json_string(response_devices)))
        assert len(devices) > 0

        response_links = context_client.ListLinks(Empty())
        links = response_links.links
        LOGGER.debug('Links[{:d}] = {:s}'.format(len(links), grpc_message_to_json_string(response_links)))
        assert len(links) > 0

        for device in response_devices.devices:
            device_uuid = get_device_key(device.device_id)
            self.__topology.add_node(device_uuid, node_type=NodeTypeEnum.DEVICE, device=device)
            for endpoint in device.device_endpoints:
                endpoint_key = get_endpoint_key(endpoint.endpoint_id, device_uuid=device_uuid)
                self.__topology.add_node(endpoint_key, node_type=NodeTypeEnum.ENDPOINT, endpoint=endpoint)
                self.__topology.add_edge(device_uuid, endpoint_key, edge_type=EdgeTypeEnum.INTERNAL)

        for link in response_links.links:
            link_uuid = get_link_key(link.link_id)
            if len(link.link_endpoint_ids) != 2:
                msg = 'Link({:s}) with {:d} != 2 endpoints'
                raise NotImplementedError(msg.format(link_uuid, len(link.link_endpoint_ids)))
            endpoint_keys = [get_endpoint_key(endpoint_id) for endpoint_id in link.link_endpoint_ids]
            edge_type = get_edge_type(self.__topology, endpoint_keys)
            self.__topology.add_edge(endpoint_keys[0], endpoint_keys[1], link=link, edge_type=edge_type)

    def load_connectivity(self, context_client : ContextClient, service_id : ServiceId):
        pending_service_ids = Queue()
        pending_service_ids.put((service_id, True))

        connectivity = {}
        requirements : List[Tuple[Service, List[Connection]]] = connectivity.setdefault('requirements', [])
        connections : List[Connection] = connectivity.setdefault('connections', [])

        while not pending_service_ids.empty():
            service_id,is_main = pending_service_ids.get()

            try:
                service = context_client.GetService(service_id)
                LOGGER.debug('[load_connectivity] GetService({:s}) = {:s}'.format(
                    grpc_message_to_json_string(service_id), grpc_message_to_json_string(service)))
            except grpc.RpcError as e:
                if is_main and e.code() == grpc.StatusCode.NOT_FOUND: continue # pylint: disable=no-member
                raise

            # TODO: implement support for services with more than 2 endpoints;
            # Right now, services with less than 2 endpoints are ignored; with more than 2 endpoints throws exception.
            # e.g., compute pairs such as:
            #   list(filter(lambda ep: ep[0] < ep[1], itertools.product(service_endpoint_ids, service_endpoint_ids)))
            service_endpoint_ids = service.service_endpoint_ids
            if len(service_endpoint_ids) < 2: continue
            if len(service_endpoint_ids) > 2: raise NotImplementedError('Service with more than 2 endpoints')

            service_connections = context_client.ListConnections(service_id)
            LOGGER.debug('[load_connectivity] ListConnections({:s}) = {:s}'.format(
                grpc_message_to_json_string(service_id), grpc_message_to_json_string(service_connections)))

            if is_main:
                connectivity['service'] = service
                a_endpoint_key = get_endpoint_key(service_endpoint_ids[0])
                z_endpoint_key = get_endpoint_key(service_endpoint_ids[-1])
                self.__connectivity[(a_endpoint_key, z_endpoint_key)] = connectivity
                self.__connectivity[(z_endpoint_key, a_endpoint_key)] = connectivity
                context_service_id = (service_id.context_id.context_uuid.uuid, service_id.service_uuid.uuid)
                self.__service_endpoints[context_service_id] = (a_endpoint_key, z_endpoint_key)
                connections.extend(service_connections.connections)
            else:
                requirements.append((service, service_connections.connections))

            for connection in service_connections.connections:
                for service_id in connection.sub_service_ids:
                    pending_service_ids.put((service_id, False))

    def get_connectivity_from_service_id(self, service_id : ServiceId) -> Dict:
        LOGGER.debug('[get_connectivity_from_service_id] service_id={:s}'.format(
            grpc_message_to_json_string(service_id)))

        context_uuid = service_id.context_id.context_uuid.uuid
        service_uuid = service_id.service_uuid.uuid
        context_service_id = (context_uuid, service_uuid)
        if context_service_id in self.__service_endpoints:
            a_endpoint_key, z_endpoint_key = self.__service_endpoints[context_service_id]
        else:
            return None

        if (a_endpoint_key, z_endpoint_key) in self.__connectivity:
            connectivity = self.__connectivity.get((a_endpoint_key, z_endpoint_key))
        elif (z_endpoint_key, a_endpoint_key) in self.__connectivity:
            connectivity = self.__connectivity.get((z_endpoint_key, a_endpoint_key))
        else:
            connectivity = None

        if connectivity is None or connectivity.get('connections') is None: return None
        LOGGER.debug('[get_connectivity_from_service_id] Connectivity: {:s}'.format(
            str(dump_connectivity(connectivity))))
        return connectivity

    def get_connectivity(self, service : Service) -> Tuple[Dict, str, str]:
        LOGGER.debug('[get_connectivity] service.service_id = {:s}'.format(
            grpc_message_to_json_string(service.service_id)))

        context_uuid = service.service_id.context_id.context_uuid.uuid
        service_uuid = service.service_id.service_uuid.uuid
        context_service_id = (context_uuid, service_uuid)

        if context_service_id in self.__service_endpoints:
            LOGGER.debug('[get_connectivity] exists')
            a_endpoint_key, z_endpoint_key = self.__service_endpoints[context_service_id]

            LOGGER.debug('[get_connectivity] a_endpoint_key={:s}'.format(str(a_endpoint_key)))
            LOGGER.debug('[get_connectivity] z_endpoint_key={:s}'.format(str(z_endpoint_key)))
        else:
            LOGGER.debug('[get_connectivity] new')
            # TODO: implement support for services with more than 2 endpoints;
            # Right now, less than 2 reports None, more than 2 endpoints throws an exception.
            # e.g., compute pairs such as:
            #   list(filter(lambda ep: ep[0] < ep[1], itertools.product(service_endpoint_ids, service_endpoint_ids)))
            service_endpoint_ids = service.service_endpoint_ids

            LOGGER.debug('[get_connectivity] service_endpoint_ids[{:d}] = {:s}'.format(
                len(service_endpoint_ids), grpc_message_list_to_json_string(service_endpoint_ids)))

            if len(service_endpoint_ids) < 2: return None
            if len(service_endpoint_ids) > 2: raise NotImplementedError('Service with more than 2 endpoints')

            a_endpoint_key = get_endpoint_key(service_endpoint_ids[0])
            z_endpoint_key = get_endpoint_key(service_endpoint_ids[-1])
            LOGGER.debug('[get_connectivity] a_endpoint_key={:s}'.format(str(a_endpoint_key)))
            LOGGER.debug('[get_connectivity] z_endpoint_key={:s}'.format(str(z_endpoint_key)))

        if (a_endpoint_key, z_endpoint_key) in self.__connectivity:
            connectivity = self.__connectivity.get((a_endpoint_key, z_endpoint_key))
        elif (z_endpoint_key, a_endpoint_key) in self.__connectivity:
            connectivity = self.__connectivity.get((z_endpoint_key, a_endpoint_key))
        else:
            connectivity = None

        LOGGER.debug('[get_connectivity] connectivity = {:s}'.format(str(dump_connectivity(connectivity))))

        if connectivity is None or connectivity.get('connections') is None: return None, a_endpoint_key, z_endpoint_key
        return connectivity, a_endpoint_key, z_endpoint_key

    def route_service(self, service : Service):
        if self.__topology is None: raise Exception('Topology has not been loaded')

        connectivity = self.get_connectivity(service)
        if connectivity is None:
            LOGGER.debug('[route_service] connectivity = None')
            return None
        _, a_endpoint_key, z_endpoint_key = connectivity

        LOGGER.debug('[route_service] a_endpoint_key={:s}'.format(str(a_endpoint_key)))
        LOGGER.debug('[route_service] z_endpoint_key={:s}'.format(str(z_endpoint_key)))

        # TODO: consider implementing something like a K-shortest path instead of a simple shortest path
        # pylint: disable=no-value-for-parameter,unexpected-keyword-arg
        #paths = networkx.all_shortest_paths(self.__topology, source=a_endpoint_key, target=z_endpoint_key)
        path_node_keys = networkx.shortest_path(
            self.__topology, source=a_endpoint_key, target=z_endpoint_key)
        LOGGER.debug('[route_service] Path[{:d}] = {:s}'.format(len(path_node_keys), str(path_node_keys)))

        if len(path_node_keys) % 3 != 0:
            msg = 'Weird path: length({:d}) mod 3 != 0. Path should be a sequence of endpoint-device-endpoint, '\
                  ' so it must be multiple of 3'
            raise Exception(msg.format(len(path_node_keys)))

        hops : List[Tuple[EndPoint, Device, EndPoint]] = []
        device_type__to__hops : Dict[str, List[int]] = {}
        for i in range(0, len(path_node_keys), 3):
            hop_device = get_device(self.__topology, path_node_keys[i+1])
            hop_a_endpoint = get_endpoint(self.__topology, path_node_keys[i+0])
            hop_z_endpoint = get_endpoint(self.__topology, path_node_keys[i+2])

            hop_device_key = get_device_key(hop_device.device_id)
            hop_a_endpoint_device_key = get_device_key(hop_a_endpoint.endpoint_id.device_id)
            hop_z_endpoint_device_key = get_device_key(hop_z_endpoint.endpoint_id.device_id)

            if hop_a_endpoint_device_key != hop_device_key:
                msg = 'Weird path: Hop[{:d}] a_endpoint.device({:s}) != device({:s})'
                raise Exception(msg.format(i/3, str(hop_a_endpoint_device_key), str(hop_device_key)))
            if hop_z_endpoint_device_key != hop_device_key:
                msg = 'Weird path: Hop[{:d}] z_endpoint.device({:s}) != device({:s})'
                raise Exception(msg.format(i/3, str(hop_z_endpoint_device_key), str(hop_device_key)))

            hops.append((hop_a_endpoint, hop_device, hop_z_endpoint))
            device_type__to__hops.setdefault(hop_device.device_type, []).append(len(hops) - 1)

        LOGGER.debug('[route_service] hops[{:d}] = {:s}'.format(
            len(hops), str(dump_hops(hops))))
        LOGGER.debug('[route_service] device_type__to__hops = {:s}'.format(str(device_type__to__hops)))

        context_uuid = service.service_id.context_id.context_uuid.uuid
        service_uuid = service.service_id.service_uuid.uuid

        # create main service's connection
        main_service_device_type = hops[0][1].device_type
        main_service_hop_indexes = device_type__to__hops.pop(main_service_device_type)

        # create sub-service and sub-services' connections
        sub_service_ids = []
        requirements : List[Tuple[Service, List[Connection]]] = []
        for sub_service_device_type, sub_service_hop_indexes in device_type__to__hops.items():
            LOGGER.debug('[route_service] sub_service_device_type = {:s}'.format(str(sub_service_device_type)))
            LOGGER.debug('[route_service] sub_service_hop_indexes = {:s}'.format(str(sub_service_hop_indexes)))

            # create sub-service
            sub_service_uuid = '{:s}:optical'.format(service_uuid)
            sub_service_id = {
                'context_id': {'context_uuid': {'uuid': context_uuid}},
                'service_uuid': {'uuid': sub_service_uuid},
            }
            sub_service = Service(**{
                'service_id': sub_service_id,
                'service_type' : SUB_SERVICE_TYPES.get(sub_service_device_type, DEFAULT_SUB_SERVICE_TYPE),
                'service_status': {'service_status': ServiceStatusEnum.SERVICESTATUS_PLANNED},
                'service_endpoint_ids': [
                    hops[sub_service_hop_indexes[ 0]][ 0].endpoint_id,
                    hops[sub_service_hop_indexes[-1]][-1].endpoint_id,
                ],
                'service_config': {'config_rules': [
                    {
                        'action': ConfigActionEnum.CONFIGACTION_SET,
                        'custom': {
                            'resource_key': 'settings',
                            'resource_value': json.dumps({
                                'capacity_value':   1,
                                'capacity_unit':    'GHz',
                                'layer_proto_name': 'PHOTONIC_MEDIA',
                                'layer_proto_qual': 'tapi-photonic-media:PHOTONIC_LAYER_QUALIFIER_NMC',
                                'direction':        'UNIDIRECTIONAL',
                            }, sort_keys=True),
                        }
                    }
                ]},
            })
            LOGGER.debug('[route_service] sub_service = {:s}'.format(grpc_message_to_json_string(sub_service)))

            # create sub-service's connection
            sub_connection_uuid = '{:s}:{:s}'.format(sub_service_uuid, sub_service_device_type)
            sub_conn_path_hops = []
            for i in sub_service_hop_indexes:
                sub_conn_path_hops.extend([hops[i][0].endpoint_id, hops[i][2].endpoint_id])
            sub_connection = Connection(**{
                'connection_id': {'connection_uuid': {'uuid': sub_connection_uuid}},
                'service_id': sub_service_id,
                'path_hops_endpoint_ids': sub_conn_path_hops,
            })

            LOGGER.debug('[route_service] sub_connection = {:s}'.format(grpc_message_to_json_string(sub_connection)))

            sub_service_ids.append(sub_service_id)
            requirements.append((sub_service, [sub_connection]))

        LOGGER.debug('[route_service] sub_service_ids = {:s}'.format(str(sub_service_ids)))
        LOGGER.debug('[route_service] requirements = {:s}'.format(str(dump_requirements(requirements))))
        LOGGER.debug('[route_service] requirements[{:d}] = {:s}'.format(
            len(requirements), str(dump_requirements(requirements))))

        connections : List[Connection] = []

        connection_uuid = '{:s}:{:s}'.format(service_uuid, main_service_device_type)
        connection_path_hops : List[EndPointId] = []
        for i in main_service_hop_indexes:
            connection_path_hops.extend([hops[i][0].endpoint_id, hops[i][2].endpoint_id])
        connection = Connection(**{
            'connection_id': {'connection_uuid': {'uuid': connection_uuid}},
            'service_id': grpc_message_to_json(service.service_id),
            'path_hops_endpoint_ids': connection_path_hops,
            'sub_service_ids': sub_service_ids,
        })
        LOGGER.debug('[route_service] connection = {:s}'.format(grpc_message_to_json_string(connection)))
        connections.append(connection)
            
        LOGGER.debug('[route_service] connections[{:d}] = {:s}'.format(
            len(connections), grpc_message_list_to_json_string(connections)))

        connectivity = {
            'service': service,
            'connections': connections,
            'requirements': requirements,
        }

        LOGGER.debug('[route_service] connectivity = {:s}'.format(str(dump_connectivity(connectivity))))

        self.__connectivity[(a_endpoint_key, z_endpoint_key)] = connectivity
        self.__connectivity[(z_endpoint_key, a_endpoint_key)] = connectivity

        context_service_id = (context_uuid, service_uuid)
        self.__service_endpoints[context_service_id] = (a_endpoint_key, z_endpoint_key)

        return connectivity
+0 −43
Original line number Diff line number Diff line
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import networkx
from typing import List, Optional
from common.proto.context_pb2 import Device, DeviceId, EndPoint, EndPointId, LinkId
from .Enums import EdgeTypeEnum

def get_device_key(device_id : DeviceId) -> str:
    return device_id.device_uuid.uuid # pylint: disable=no-member

def get_endpoint_key(endpoint_id : EndPointId, device_uuid : Optional[str] = None) -> str:
    if device_uuid is None: device_uuid = endpoint_id.device_id.device_uuid.uuid # pylint: disable=no-member
    endpoint_uuid = endpoint_id.endpoint_uuid.uuid # pylint: disable=no-member
    return '{:s}/{:s}'.format(device_uuid, endpoint_uuid)

def get_link_key(link_id : LinkId) -> str:
    return link_id.link_uuid.uuid # pylint: disable=no-member

def get_device(topology : networkx.Graph, device_key : str) -> Device:
    return topology.nodes[device_key]['device']

def get_endpoint(topology : networkx.Graph, endpoint_key : str) -> EndPoint:
    return topology.nodes[endpoint_key]['endpoint']

def get_edge_type(topology : networkx.Graph, endpoint_keys : List[str]) -> EdgeTypeEnum:
    # pylint: disable=no-member,protected-access
    endpoint_types = {get_endpoint(topology, endpoint_key).endpoint_type for endpoint_key in endpoint_keys}
    edge_type = None if len(endpoint_types) > 1 else \
        EdgeTypeEnum._value2member_map_.get(endpoint_types.pop())
    if edge_type is None: edge_type = EdgeTypeEnum.OTHER
    return edge_type
+0 −57

File deleted.

Preview size limit exceeded, changes collapsed.

+0 −14
Original line number Diff line number Diff line
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Loading