Commit f54e758c authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Merge branch 'feat/pathcomp-component-frontend' of...

Merge branch 'feat/pathcomp-component-frontend' of https://gitlab.com/teraflow-h2020/controller into feat/pathcomp-component
parents f37bddcb 8b914ba6
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -31,7 +31,7 @@ def main():
    global LOGGER # pylint: disable=global-statement

    log_level = get_log_level()
    logging.basicConfig(level=log_level)
    logging.basicConfig(level=log_level, format="[%(asctime)s] %(levelname)s:%(name)s:%(message)s")
    LOGGER = logging.getLogger(__name__)

    wait_for_environment_variables([
+45 −6
Original line number Diff line number Diff line
@@ -14,9 +14,11 @@

import operator
from typing import Dict, List, Optional, Set, Tuple
from common.proto.context_pb2 import Link
from common.proto.context_pb2 import Connection, Link, Service
from common.proto.pathcomp_pb2 import Algorithm_KDisjointPath, Algorithm_KShortestPath, PathCompReply, PathCompRequest
from common.tools.grpc.Tools import grpc_message_to_json_string
from pathcomp.frontend.service.algorithms.tools.ComputeSubServices import convert_explicit_path_hops_to_connections
from pathcomp.frontend.service.algorithms.tools.EroPathToHops import eropath_to_hops
from ._Algorithm import _Algorithm
from .KShortestPathAlgorithm import KShortestPathAlgorithm

@@ -184,10 +186,47 @@ class KDisjointPathAlgorithm(_Algorithm):

    def get_reply(self) -> PathCompReply:
        reply = PathCompReply()
        grpc_services : Dict[Tuple[str, str], Service] = {}
        grpc_connections : Dict[Tuple[int, str], Connection] = {}
        for service_key,paths in self.json_reply.items():
            grpc_service = self.add_service_to_reply(reply, service_key[0], service_key[1])
            for path_endpoints in paths:
                if path_endpoints is None: continue
                grpc_connection = self.add_connection_to_reply(reply, grpc_service)
                self.add_path_to_connection(grpc_connection, path_endpoints)
            context_uuid, service_uuid = service_key

            grpc_services[service_key] = self.add_service_to_reply(reply, context_uuid, service_uuid)

            for num_path,service_path_ero in enumerate(paths):
                if service_path_ero is None: continue
                path_hops = eropath_to_hops(service_path_ero, self.endpoint_to_link_dict)
                connections = convert_explicit_path_hops_to_connections(path_hops, self.device_dict, service_uuid)

                for connection in connections:
                    connection_uuid,device_layer,path_hops,_ = connection

                    service_key = (context_uuid, connection_uuid)
                    grpc_service = grpc_services.get(service_key)
                    if grpc_service is not None: continue
                    grpc_service = self.add_service_to_reply(
                        reply, context_uuid, connection_uuid, device_layer=device_layer, path_hops=path_hops)
                    grpc_services[service_key] = grpc_service

                for connection in connections:
                    connection_uuid,device_layer,path_hops,dependencies = connection

                    service_key = (context_uuid, connection_uuid)
                    grpc_service = grpc_services.get(service_key)
                    if grpc_service is None: raise Exception('Service({:s}) not found'.format(str(service_key)))

                    connection_uuid = '{:s}:{:d}'.format(connection_uuid, num_path)
                    grpc_connection = grpc_connections.get(connection_uuid)
                    if grpc_connection is not None: continue
                    grpc_connection = self.add_connection_to_reply(reply, connection_uuid, grpc_service, path_hops)
                    grpc_connections[connection_uuid] = grpc_connection

                    for service_uuid in dependencies:
                        sub_service_key = (context_uuid, service_uuid)
                        grpc_sub_service = grpc_services.get(sub_service_key)
                        if grpc_sub_service is None:
                            raise Exception('Service({:s}) not found'.format(str(sub_service_key)))
                        grpc_sub_service_id = grpc_connection.sub_service_ids.add()
                        grpc_sub_service_id.CopyFrom(grpc_sub_service.service_id)

        return reply
+96 −28
Original line number Diff line number Diff line
@@ -14,11 +14,14 @@

import json, logging, requests, uuid
from typing import Dict, List, Optional, Tuple
from common.proto.context_pb2 import Connection, Device, DeviceList, EndPointId, Link, LinkList, Service
from common.proto.context_pb2 import Connection, Device, DeviceList, EndPointId, Link, LinkList, Service, ServiceStatusEnum, ServiceTypeEnum
from common.proto.pathcomp_pb2 import PathCompReply, PathCompRequest
from common.tools.grpc.Tools import grpc_message_to_json_string
from pathcomp.frontend.Config import BACKEND_URL
from pathcomp.frontend.service.algorithms.tools.ConstantsMappings import DEVICE_LAYER_TO_SERVICE_TYPE, DeviceLayerEnum
from .tools.EroPathToHops import eropath_to_hops
from .tools.ComposeRequest import compose_device, compose_link, compose_service
from .tools.ComputeSubServices import convert_explicit_path_hops_to_connections

class _Algorithm:
    def __init__(self, algorithm_id : str, sync_paths : bool, class_name=__name__) -> None:
@@ -85,17 +88,17 @@ class _Algorithm:
    def execute(self, dump_request_filename : Optional[str] = None, dump_reply_filename : Optional[str] = None) -> None:
        request = {'serviceList': self.service_list, 'deviceList': self.device_list, 'linkList': self.link_list}

        self.logger.debug('[execute] request={:s}'.format(str(request)))
        self.logger.info('[execute] request={:s}'.format(str(request)))
        if dump_request_filename is not None:
            with open(dump_request_filename, 'w', encoding='UTF-8') as f:
                f.write(json.dumps(request, sort_keys=True, indent=4))

        self.logger.debug('[execute] BACKEND_URL: {:s}'.format(str(BACKEND_URL)))
        self.logger.info('[execute] BACKEND_URL: {:s}'.format(str(BACKEND_URL)))
        reply = requests.post(BACKEND_URL, json=request)
        self.status_code = reply.status_code
        self.raw_reply = reply.content.decode('UTF-8')

        self.logger.debug('[execute] status_code={:s} reply={:s}'.format(str(reply.status_code), str(self.raw_reply)))
        self.logger.info('[execute] status_code={:s} reply={:s}'.format(str(reply.status_code), str(self.raw_reply)))
        if dump_reply_filename is not None:
            with open(dump_reply_filename, 'w', encoding='UTF-8') as f:
                f.write('status_code={:s} reply={:s}'.format(str(self.status_code), str(self.raw_reply)))
@@ -106,41 +109,77 @@ class _Algorithm:
        
        self.json_reply = reply.json()

    def add_path_to_connection(self, connection : Connection, path_endpoints : List[Dict]) -> None:
        for endpoint in path_endpoints:
            device_uuid = endpoint['device_id']
            endpoint_uuid = endpoint['endpoint_uuid']
            endpoint_id = connection.path_hops_endpoint_ids.add()
            endpoint_id.CopyFrom(self.endpoint_dict[device_uuid][endpoint_uuid][1])

    def add_connection_to_reply(self, reply : PathCompReply, service : Service) -> Connection:
    def add_connection_to_reply(
        self, reply : PathCompReply, connection_uuid : str, service : Service, path_hops : List[Dict]
    ) -> Connection:
        connection = reply.connections.add()
        connection.connection_id.connection_uuid.uuid = str(uuid.uuid4())

        connection.connection_id.connection_uuid.uuid = connection_uuid
        connection.service_id.CopyFrom(service.service_id)
        return connection

    def add_service_to_reply(self, reply : PathCompReply, context_uuid : str, service_uuid : str) -> Service:
        service_key = (context_uuid, service_uuid)
        tuple_service = self.service_dict.get(service_key)
        if tuple_service is None: raise Exception('ServiceKey({:s}) not found'.format(str(service_key)))
        _, grpc_service = tuple_service
        for path_hop in path_hops:
            device_uuid = path_hop['device']

            ingress_endpoint_uuid = path_hop['ingress_ep']
            endpoint_id = connection.path_hops_endpoint_ids.add()
            endpoint_id.CopyFrom(self.endpoint_dict[device_uuid][ingress_endpoint_uuid][1])

            egress_endpoint_uuid = path_hop['egress_ep']
            endpoint_id = connection.path_hops_endpoint_ids.add()
            endpoint_id.CopyFrom(self.endpoint_dict[device_uuid][egress_endpoint_uuid][1])

        return connection

    def add_service_to_reply(
        self, reply : PathCompReply, context_uuid : str, service_uuid : str,
        device_layer : Optional[DeviceLayerEnum] = None, path_hops : List[Dict] = []
    ) -> Service:
        # TODO: implement support for multi-point services
        # Control deactivated to enable disjoint paths with multiple redundant endpoints on each side
        #service_endpoint_ids = grpc_service.service_endpoint_ids
        #service_endpoint_ids = service.service_endpoint_ids
        #if len(service_endpoint_ids) != 2: raise NotImplementedError('Service must have 2 endpoints')

        service_key = (context_uuid, service_uuid)
        tuple_service = self.service_dict.get(service_key)
        if tuple_service is not None:
            service = reply.services.add()
            service.CopyFrom(tuple_service[1])
        else:
            service = reply.services.add()
        service.CopyFrom(grpc_service)
            service.service_id.context_id.context_uuid.uuid = context_uuid
            service.service_id.service_uuid.uuid = service_uuid

        return grpc_service
            if device_layer is not None:
                service_type = DEVICE_LAYER_TO_SERVICE_TYPE.get(device_layer.value)
                if service_type is None:
                    MSG = 'Unable to map DeviceLayer({:s}) to ServiceType'
                    raise Exception(MSG.format(str(device_layer)))
                service.service_type = service_type

            service.service_status.service_status = ServiceStatusEnum.SERVICESTATUS_PLANNED

            if path_hops is not None and len(path_hops) > 0:
                ingress_endpoint_id = service.service_endpoint_ids.add()
                ingress_endpoint_id.device_id.device_uuid.uuid = path_hops[0]['device']
                ingress_endpoint_id.endpoint_uuid.uuid = path_hops[0]['ingress_ep']

                egress_endpoint_id = service.service_endpoint_ids.add()
                egress_endpoint_id.device_id.device_uuid.uuid = path_hops[-1]['device']
                egress_endpoint_id.endpoint_uuid.uuid = path_hops[-1]['egress_ep']

        return service

    def get_reply(self) -> PathCompReply:
        response_list = self.json_reply.get('response-list', [])
        reply = PathCompReply()
        grpc_services : Dict[Tuple[str, str], Service] = {}
        grpc_connections : Dict[str, Connection] = {}
        for response in response_list:
            service_id = response['serviceId']
            grpc_service = self.add_service_to_reply(reply, service_id['contextId'], service_id['service_uuid'])
            context_uuid = service_id['contextId']
            service_uuid = service_id['service_uuid']
            service_key = (context_uuid, service_uuid)
            grpc_services[service_key] = self.add_service_to_reply(reply, context_uuid, service_uuid)

            no_path_issue = response.get('noPath', {}).get('issue')
            if no_path_issue is not None:
@@ -148,9 +187,38 @@ class _Algorithm:
                # no_path_issue == 1 => no path due to a constraint
                continue

            for service_path in response['path']:
                grpc_connection = self.add_connection_to_reply(reply, grpc_service)
                self.add_path_to_connection(grpc_connection, service_path['devices'])
            for service_path_ero in response['path']:
                path_hops = eropath_to_hops(service_path_ero['devices'], self.endpoint_to_link_dict)
                connections = convert_explicit_path_hops_to_connections(path_hops, self.device_dict, service_uuid)

                for connection in connections:
                    connection_uuid,device_layer,path_hops,_ = connection
                    service_key = (context_uuid, connection_uuid)
                    grpc_service = grpc_services.get(service_key)
                    if grpc_service is None:
                        grpc_service = self.add_service_to_reply(
                            reply, context_uuid, connection_uuid, device_layer=device_layer, path_hops=path_hops)
                        grpc_services[service_key] = grpc_service

                for connection in connections:
                    connection_uuid,device_layer,path_hops,dependencies = connection

                    service_key = (context_uuid, connection_uuid)
                    grpc_service = grpc_services.get(service_key)
                    if grpc_service is None: raise Exception('Service({:s}) not found'.format(str(service_key)))
                        
                    grpc_connection = grpc_connections.get(connection_uuid)
                    if grpc_connection is not None: continue
                    grpc_connection = self.add_connection_to_reply(reply, connection_uuid, grpc_service, path_hops)
                    grpc_connections[connection_uuid] = grpc_connection

                    for service_uuid in dependencies:
                        sub_service_key = (context_uuid, service_uuid)
                        grpc_sub_service = grpc_services.get(sub_service_key)
                        if grpc_sub_service is None:
                            raise Exception('Service({:s}) not found'.format(str(sub_service_key)))
                        grpc_sub_service_id = grpc_connection.sub_service_ids.add()
                        grpc_sub_service_id.CopyFrom(grpc_sub_service.service_id)

                # ... "path-capacity": {"total-size": {"value": 200, "unit": 0}},
                # ... "path-latency": {"fixed-latency-characteristic": "10.000000"},
+4 −1
Original line number Diff line number Diff line
@@ -17,7 +17,10 @@ from typing import Dict
from common.Constants import DEFAULT_CONTEXT_UUID, DEFAULT_TOPOLOGY_UUID
from common.proto.context_pb2 import Constraint, Device, EndPointId, Link, Service, ServiceId, TopologyId
from common.tools.grpc.Tools import grpc_message_to_json_string
from .Constants import CapacityUnit, LinkForwardingDirection, LinkPortDirection, TerminationDirection, TerminationState
from .ConstantsMappings import (
    CapacityUnit, LinkForwardingDirection, LinkPortDirection, TerminationDirection, TerminationState)

LOGGER = logging.getLogger(__name__)

LOGGER = logging.getLogger(__name__)

+96 −0
Original line number Diff line number Diff line
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Convert the path defined as explicit hops with ingress and egress endpoints per device into a set of connections and
# compute the dependencies among them.
#
# Example:
# o-- int DC1 eth1 -- 10/1 CS1 1/2 -- 1/2 R2 2/1 -- a7.. OLS 60.. -- 2/1 R3 1/1 -- 1/1 CS2 10/1 -- eth1 DC2 int --o
#         APP              PKT            PKT            CTRL            PKT           PKT              APP
#
# path_hops = [
#     {'device': 'DC1-GW', 'ingress_ep': 'int', 'egress_ep': 'eth1'},
#     {'device': 'CS1-GW1', 'ingress_ep': '10/1', 'egress_ep': '1/2'},
#     {'device': 'TN-R2', 'ingress_ep': '1/2', 'egress_ep': '2/1'},
#     {'device': 'TN-OLS', 'ingress_ep': 'a7a80b23a703', 'egress_ep': '60519106029e'},
#     {'device': 'TN-R3', 'ingress_ep': '2/1', 'egress_ep': '1/1'},
#     {'device': 'CS2-GW1', 'ingress_ep': '1/1', 'egress_ep': '10/1'},
#     {'device': 'DC2-GW', 'ingress_ep': 'eth1', 'egress_ep': 'int'}
# ]
#
# connections=[
#     (UUID('7548edf7-ee7c-4adf-ac0f-c7a0c0dfba8e'), <DeviceLayerEnum.OPTICAL_CONTROLLER: 1>, [
#             {'device': 'TN-OLS', 'ingress_ep': '833760219d0f', 'egress_ep': 'cf176771a4b9'}
#         ], []),
#     (UUID('c2e57966-5d82-4705-a5fe-44cf6487219e'), <DeviceLayerEnum.PACKET_DEVICE: 30>, [
#             {'device': 'CS1-GW1', 'ingress_ep': '10/1', 'egress_ep': '1/2'},
#             {'device': 'TN-R2', 'ingress_ep': '1/2', 'egress_ep': '2/1'},
#             {'device': 'TN-R3', 'ingress_ep': '2/1', 'egress_ep': '1/1'},
#             {'device': 'CS2-GW1', 'ingress_ep': '1/1', 'egress_ep': '10/1'}
#         ], [UUID('7548edf7-ee7c-4adf-ac0f-c7a0c0dfba8e')]),
#     (UUID('1e205c82-f6ea-4977-9e97-dc27ef1f4802'), <DeviceLayerEnum.APPLICATION_DEVICE: 40>, [
#             {'device': 'DC1-GW', 'ingress_ep': 'int', 'egress_ep': 'eth1'},
#             {'device': 'DC2-GW', 'ingress_ep': 'eth1', 'egress_ep': 'int'}
#         ], [UUID('c2e57966-5d82-4705-a5fe-44cf6487219e')])
# ]

import queue, uuid
from typing import Dict, List, Tuple
from common.proto.context_pb2 import Device
from .ConstantsMappings import DEVICE_TYPE_TO_LAYER, DeviceLayerEnum

def convert_explicit_path_hops_to_connections(
    path_hops : List[Dict], device_dict : Dict[str, Tuple[Dict, Device]], main_connection_uuid : str
) -> List[Tuple[str, DeviceLayerEnum, List[str], List[str]]]:

    connection_stack = queue.LifoQueue()
    connections : List[Tuple[str, DeviceLayerEnum, List[str], List[str]]] = list()
    old_device_layer = None
    last_device_uuid = None
    for path_hop in path_hops:
        device_uuid = path_hop['device']
        if last_device_uuid == device_uuid: continue
        device_tuple = device_dict.get(device_uuid)
        if device_tuple is None: raise Exception('Device({:s}) not found'.format(str(device_uuid)))
        json_device,_ = device_tuple
        device_type = json_device['device_type']
        device_layer = DEVICE_TYPE_TO_LAYER.get(device_type)
        if device_layer is None: raise Exception('Undefined Layer for DeviceType({:s})'.format(str(device_type)))

        if old_device_layer is None:
            # path ingress
            connection_stack.put((main_connection_uuid, device_layer, [path_hop], []))
        elif old_device_layer > device_layer:
            # underlying connection begins
            connection_uuid = str(uuid.uuid4())
            connection_stack.put((connection_uuid, device_layer, [path_hop], []))
        elif old_device_layer == device_layer:
            # same connection continues
            connection_stack.queue[-1][2].append(path_hop)
        elif old_device_layer < device_layer:
            # underlying connection ended
            connection = connection_stack.get()
            connections.append(connection)
            connection_stack.queue[-1][3].append(connection[0])
            connection_stack.queue[-1][2].append(path_hop)
        else:
            raise Exception('Uncontrolled condition')

        old_device_layer = device_layer
        last_device_uuid = device_uuid

    # path egress
    connections.append(connection_stack.get())
    assert connection_stack.empty()
    return connections
Loading