Commit 0f5789bd authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Merge branch 'feat/pathcomp-component' into 'develop'

PathComp component improvements

See merge request teraflow-h2020/controller!174
parents 2a093fe6 f54e758c
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -120,7 +120,7 @@ struct map_nodes_t {
    gint numMapNodes;
};

#define MAX_NUM_VERTICES				10 // 100 # LGR: reduced from 100 to 10 to divide by 10 the memory used
#define MAX_NUM_VERTICES				20 // 100 # LGR: reduced from 100 to 20 to divide by 5 the memory used
#define MAX_NUM_EDGES					10 // 100 # LGR: reduced from 100 to 10 to divide by 10 the memory used
// Structures for the graph composition
struct targetNodes_t {
+1 −1
Original line number Diff line number Diff line
@@ -31,7 +31,7 @@ def main():
    global LOGGER # pylint: disable=global-statement

    log_level = get_log_level()
    logging.basicConfig(level=log_level)
    logging.basicConfig(level=log_level, format="[%(asctime)s] %(levelname)s:%(name)s:%(message)s")
    LOGGER = logging.getLogger(__name__)

    wait_for_environment_variables([
+45 −6
Original line number Diff line number Diff line
@@ -14,9 +14,11 @@

import operator
from typing import Dict, List, Optional, Set, Tuple
from common.proto.context_pb2 import Link
from common.proto.context_pb2 import Connection, Link, Service
from common.proto.pathcomp_pb2 import Algorithm_KDisjointPath, Algorithm_KShortestPath, PathCompReply, PathCompRequest
from common.tools.grpc.Tools import grpc_message_to_json_string
from pathcomp.frontend.service.algorithms.tools.ComputeSubServices import convert_explicit_path_hops_to_connections
from pathcomp.frontend.service.algorithms.tools.EroPathToHops import eropath_to_hops
from ._Algorithm import _Algorithm
from .KShortestPathAlgorithm import KShortestPathAlgorithm

@@ -184,10 +186,47 @@ class KDisjointPathAlgorithm(_Algorithm):

    def get_reply(self) -> PathCompReply:
        reply = PathCompReply()
        grpc_services : Dict[Tuple[str, str], Service] = {}
        grpc_connections : Dict[Tuple[int, str], Connection] = {}
        for service_key,paths in self.json_reply.items():
            grpc_service = self.add_service_to_reply(reply, service_key[0], service_key[1])
            for path_endpoints in paths:
                if path_endpoints is None: continue
                grpc_connection = self.add_connection_to_reply(reply, grpc_service)
                self.add_path_to_connection(grpc_connection, path_endpoints)
            context_uuid, service_uuid = service_key

            grpc_services[service_key] = self.add_service_to_reply(reply, context_uuid, service_uuid)

            for num_path,service_path_ero in enumerate(paths):
                if service_path_ero is None: continue
                path_hops = eropath_to_hops(service_path_ero, self.endpoint_to_link_dict)
                connections = convert_explicit_path_hops_to_connections(path_hops, self.device_dict, service_uuid)

                for connection in connections:
                    connection_uuid,device_layer,path_hops,_ = connection

                    service_key = (context_uuid, connection_uuid)
                    grpc_service = grpc_services.get(service_key)
                    if grpc_service is not None: continue
                    grpc_service = self.add_service_to_reply(
                        reply, context_uuid, connection_uuid, device_layer=device_layer, path_hops=path_hops)
                    grpc_services[service_key] = grpc_service

                for connection in connections:
                    connection_uuid,device_layer,path_hops,dependencies = connection

                    service_key = (context_uuid, connection_uuid)
                    grpc_service = grpc_services.get(service_key)
                    if grpc_service is None: raise Exception('Service({:s}) not found'.format(str(service_key)))

                    connection_uuid = '{:s}:{:d}'.format(connection_uuid, num_path)
                    grpc_connection = grpc_connections.get(connection_uuid)
                    if grpc_connection is not None: continue
                    grpc_connection = self.add_connection_to_reply(reply, connection_uuid, grpc_service, path_hops)
                    grpc_connections[connection_uuid] = grpc_connection

                    for service_uuid in dependencies:
                        sub_service_key = (context_uuid, service_uuid)
                        grpc_sub_service = grpc_services.get(sub_service_key)
                        if grpc_sub_service is None:
                            raise Exception('Service({:s}) not found'.format(str(sub_service_key)))
                        grpc_sub_service_id = grpc_connection.sub_service_ids.add()
                        grpc_sub_service_id.CopyFrom(grpc_sub_service.service_id)

        return reply
+96 −28
Original line number Diff line number Diff line
@@ -14,11 +14,14 @@

import json, logging, requests, uuid
from typing import Dict, List, Optional, Tuple
from common.proto.context_pb2 import Connection, Device, DeviceList, EndPointId, Link, LinkList, Service
from common.proto.context_pb2 import Connection, Device, DeviceList, EndPointId, Link, LinkList, Service, ServiceStatusEnum, ServiceTypeEnum
from common.proto.pathcomp_pb2 import PathCompReply, PathCompRequest
from common.tools.grpc.Tools import grpc_message_to_json_string
from pathcomp.frontend.Config import BACKEND_URL
from pathcomp.frontend.service.algorithms.tools.ConstantsMappings import DEVICE_LAYER_TO_SERVICE_TYPE, DeviceLayerEnum
from .tools.EroPathToHops import eropath_to_hops
from .tools.ComposeRequest import compose_device, compose_link, compose_service
from .tools.ComputeSubServices import convert_explicit_path_hops_to_connections

class _Algorithm:
    def __init__(self, algorithm_id : str, sync_paths : bool, class_name=__name__) -> None:
@@ -85,17 +88,17 @@ class _Algorithm:
    def execute(self, dump_request_filename : Optional[str] = None, dump_reply_filename : Optional[str] = None) -> None:
        request = {'serviceList': self.service_list, 'deviceList': self.device_list, 'linkList': self.link_list}

        self.logger.debug('[execute] request={:s}'.format(str(request)))
        self.logger.info('[execute] request={:s}'.format(str(request)))
        if dump_request_filename is not None:
            with open(dump_request_filename, 'w', encoding='UTF-8') as f:
                f.write(json.dumps(request, sort_keys=True, indent=4))

        self.logger.debug('[execute] BACKEND_URL: {:s}'.format(str(BACKEND_URL)))
        self.logger.info('[execute] BACKEND_URL: {:s}'.format(str(BACKEND_URL)))
        reply = requests.post(BACKEND_URL, json=request)
        self.status_code = reply.status_code
        self.raw_reply = reply.content.decode('UTF-8')

        self.logger.debug('[execute] status_code={:s} reply={:s}'.format(str(reply.status_code), str(self.raw_reply)))
        self.logger.info('[execute] status_code={:s} reply={:s}'.format(str(reply.status_code), str(self.raw_reply)))
        if dump_reply_filename is not None:
            with open(dump_reply_filename, 'w', encoding='UTF-8') as f:
                f.write('status_code={:s} reply={:s}'.format(str(self.status_code), str(self.raw_reply)))
@@ -106,41 +109,77 @@ class _Algorithm:
        
        self.json_reply = reply.json()

    def add_path_to_connection(self, connection : Connection, path_endpoints : List[Dict]) -> None:
        for endpoint in path_endpoints:
            device_uuid = endpoint['device_id']
            endpoint_uuid = endpoint['endpoint_uuid']
            endpoint_id = connection.path_hops_endpoint_ids.add()
            endpoint_id.CopyFrom(self.endpoint_dict[device_uuid][endpoint_uuid][1])

    def add_connection_to_reply(self, reply : PathCompReply, service : Service) -> Connection:
    def add_connection_to_reply(
        self, reply : PathCompReply, connection_uuid : str, service : Service, path_hops : List[Dict]
    ) -> Connection:
        connection = reply.connections.add()
        connection.connection_id.connection_uuid.uuid = str(uuid.uuid4())

        connection.connection_id.connection_uuid.uuid = connection_uuid
        connection.service_id.CopyFrom(service.service_id)
        return connection

    def add_service_to_reply(self, reply : PathCompReply, context_uuid : str, service_uuid : str) -> Service:
        service_key = (context_uuid, service_uuid)
        tuple_service = self.service_dict.get(service_key)
        if tuple_service is None: raise Exception('ServiceKey({:s}) not found'.format(str(service_key)))
        _, grpc_service = tuple_service
        for path_hop in path_hops:
            device_uuid = path_hop['device']

            ingress_endpoint_uuid = path_hop['ingress_ep']
            endpoint_id = connection.path_hops_endpoint_ids.add()
            endpoint_id.CopyFrom(self.endpoint_dict[device_uuid][ingress_endpoint_uuid][1])

            egress_endpoint_uuid = path_hop['egress_ep']
            endpoint_id = connection.path_hops_endpoint_ids.add()
            endpoint_id.CopyFrom(self.endpoint_dict[device_uuid][egress_endpoint_uuid][1])

        return connection

    def add_service_to_reply(
        self, reply : PathCompReply, context_uuid : str, service_uuid : str,
        device_layer : Optional[DeviceLayerEnum] = None, path_hops : List[Dict] = []
    ) -> Service:
        # TODO: implement support for multi-point services
        # Control deactivated to enable disjoint paths with multiple redundant endpoints on each side
        #service_endpoint_ids = grpc_service.service_endpoint_ids
        #service_endpoint_ids = service.service_endpoint_ids
        #if len(service_endpoint_ids) != 2: raise NotImplementedError('Service must have 2 endpoints')

        service_key = (context_uuid, service_uuid)
        tuple_service = self.service_dict.get(service_key)
        if tuple_service is not None:
            service = reply.services.add()
            service.CopyFrom(tuple_service[1])
        else:
            service = reply.services.add()
        service.CopyFrom(grpc_service)
            service.service_id.context_id.context_uuid.uuid = context_uuid
            service.service_id.service_uuid.uuid = service_uuid

        return grpc_service
            if device_layer is not None:
                service_type = DEVICE_LAYER_TO_SERVICE_TYPE.get(device_layer.value)
                if service_type is None:
                    MSG = 'Unable to map DeviceLayer({:s}) to ServiceType'
                    raise Exception(MSG.format(str(device_layer)))
                service.service_type = service_type

            service.service_status.service_status = ServiceStatusEnum.SERVICESTATUS_PLANNED

            if path_hops is not None and len(path_hops) > 0:
                ingress_endpoint_id = service.service_endpoint_ids.add()
                ingress_endpoint_id.device_id.device_uuid.uuid = path_hops[0]['device']
                ingress_endpoint_id.endpoint_uuid.uuid = path_hops[0]['ingress_ep']

                egress_endpoint_id = service.service_endpoint_ids.add()
                egress_endpoint_id.device_id.device_uuid.uuid = path_hops[-1]['device']
                egress_endpoint_id.endpoint_uuid.uuid = path_hops[-1]['egress_ep']

        return service

    def get_reply(self) -> PathCompReply:
        response_list = self.json_reply.get('response-list', [])
        reply = PathCompReply()
        grpc_services : Dict[Tuple[str, str], Service] = {}
        grpc_connections : Dict[str, Connection] = {}
        for response in response_list:
            service_id = response['serviceId']
            grpc_service = self.add_service_to_reply(reply, service_id['contextId'], service_id['service_uuid'])
            context_uuid = service_id['contextId']
            service_uuid = service_id['service_uuid']
            service_key = (context_uuid, service_uuid)
            grpc_services[service_key] = self.add_service_to_reply(reply, context_uuid, service_uuid)

            no_path_issue = response.get('noPath', {}).get('issue')
            if no_path_issue is not None:
@@ -148,9 +187,38 @@ class _Algorithm:
                # no_path_issue == 1 => no path due to a constraint
                continue

            for service_path in response['path']:
                grpc_connection = self.add_connection_to_reply(reply, grpc_service)
                self.add_path_to_connection(grpc_connection, service_path['devices'])
            for service_path_ero in response['path']:
                path_hops = eropath_to_hops(service_path_ero['devices'], self.endpoint_to_link_dict)
                connections = convert_explicit_path_hops_to_connections(path_hops, self.device_dict, service_uuid)

                for connection in connections:
                    connection_uuid,device_layer,path_hops,_ = connection
                    service_key = (context_uuid, connection_uuid)
                    grpc_service = grpc_services.get(service_key)
                    if grpc_service is None:
                        grpc_service = self.add_service_to_reply(
                            reply, context_uuid, connection_uuid, device_layer=device_layer, path_hops=path_hops)
                        grpc_services[service_key] = grpc_service

                for connection in connections:
                    connection_uuid,device_layer,path_hops,dependencies = connection

                    service_key = (context_uuid, connection_uuid)
                    grpc_service = grpc_services.get(service_key)
                    if grpc_service is None: raise Exception('Service({:s}) not found'.format(str(service_key)))
                        
                    grpc_connection = grpc_connections.get(connection_uuid)
                    if grpc_connection is not None: continue
                    grpc_connection = self.add_connection_to_reply(reply, connection_uuid, grpc_service, path_hops)
                    grpc_connections[connection_uuid] = grpc_connection

                    for service_uuid in dependencies:
                        sub_service_key = (context_uuid, service_uuid)
                        grpc_sub_service = grpc_services.get(sub_service_key)
                        if grpc_sub_service is None:
                            raise Exception('Service({:s}) not found'.format(str(sub_service_key)))
                        grpc_sub_service_id = grpc_connection.sub_service_ids.add()
                        grpc_sub_service_id.CopyFrom(grpc_sub_service.service_id)

                # ... "path-capacity": {"total-size": {"value": 200, "unit": 0}},
                # ... "path-latency": {"fixed-latency-characteristic": "10.000000"},
+4 −1
Original line number Diff line number Diff line
@@ -17,7 +17,10 @@ from typing import Dict
from common.Constants import DEFAULT_CONTEXT_UUID, DEFAULT_TOPOLOGY_UUID
from common.proto.context_pb2 import Constraint, Device, EndPointId, Link, Service, ServiceId, TopologyId
from common.tools.grpc.Tools import grpc_message_to_json_string
from .Constants import CapacityUnit, LinkForwardingDirection, LinkPortDirection, TerminationDirection, TerminationState
from .ConstantsMappings import (
    CapacityUnit, LinkForwardingDirection, LinkPortDirection, TerminationDirection, TerminationState)

LOGGER = logging.getLogger(__name__)

LOGGER = logging.getLogger(__name__)

Loading