Commit 8b914ba6 authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

PathComp-Frontend component:

- Added timestamp to logs
- Improved algorithm execution logs
- Added basic support for multi-layer path computation
- Improved KDisjointPath Algorithm with basic multi-layer support
- Minor improvements in backend request composition
- Removed unneeded code for unitary tests
- Added OLS-flavoured unitary test scenario
parent 06e90afc
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -31,7 +31,7 @@ def main():
    global LOGGER # pylint: disable=global-statement

    log_level = get_log_level()
    logging.basicConfig(level=log_level)
    logging.basicConfig(level=log_level, format="[%(asctime)s] %(levelname)s:%(name)s:%(message)s")
    LOGGER = logging.getLogger(__name__)

    wait_for_environment_variables([
+145 −16
Original line number Diff line number Diff line
@@ -12,17 +12,75 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import copy
import operator
from typing import Dict, List, Optional, Set, Tuple
from common.proto.context_pb2 import Link
from common.proto.pathcomp_pb2 import Algorithm_KDisjointPath, Algorithm_KShortestPath, PathCompReply
from common.proto.context_pb2 import Connection, Link, Service
from common.proto.pathcomp_pb2 import Algorithm_KDisjointPath, Algorithm_KShortestPath, PathCompReply, PathCompRequest
from common.tools.grpc.Tools import grpc_message_to_json_string
from pathcomp.frontend.service.algorithms.tools.ComputeSubServices import convert_explicit_path_hops_to_connections
from pathcomp.frontend.service.algorithms.tools.EroPathToHops import eropath_to_hops
from ._Algorithm import _Algorithm
from .KShortestPathAlgorithm import KShortestPathAlgorithm

Service_Id          = Tuple[str, str]   # (context_uuid, service_uuid)
Service_Constraints = Dict[str, str]    # {constraint_type => constraint_value}
Endpoint_Id         = Tuple[str, str]   # (device_uuid, endpoint_uuid)
Endpoint_Details    = Tuple[str, int]   # (site_id, priority)
Service_Endpoints   = Dict[Endpoint_Id, Endpoint_Details]
Service_Details     = Tuple[int, Service_Constraints, Service_Endpoints]
Services_Details    = Dict[Service_Id, Service_Details]

CUSTOM_CONSTRAINTS = {'bandwidth[gbps]', 'latency[ms]', 'jitter[us]'}

DUMP_EXECUTION_STEPS = False

class KDisjointPathAlgorithm(_Algorithm):
    def __init__(self, algorithm : Algorithm_KDisjointPath, class_name=__name__) -> None:
        super().__init__('KDP', False, class_name=class_name)
        self.num_disjoint = algorithm.num_disjoint
        self.services_details : Services_Details = dict()

    def add_service_requests(self, request: PathCompRequest) -> None:
        super().add_service_requests(request)
        for service in request.services:
            service_id = service.service_id
            context_uuid = service_id.context_id.context_uuid.uuid
            service_uuid = service_id.service_uuid.uuid
            service_key = (context_uuid, service_uuid)

            constraints = dict()
            endpoints = dict()
            service_details = (int(service.service_type), constraints, endpoints)
            self.services_details.setdefault(service_key, service_details)

            for constraint in service.service_constraints:
                if constraint.WhichOneof('constraint') == 'custom':
                    constraint_type = constraint.custom.constraint_type
                    if constraint_type not in CUSTOM_CONSTRAINTS: continue
                    constraint_value = constraint.custom.constraint_value
                    constraints[constraint_type] = constraint_value

                if constraint.WhichOneof('constraint') == 'endpoint_location':
                    endpoint_id = constraint.endpoint_location.endpoint_id
                    device_uuid = endpoint_id.device_id.device_uuid.uuid
                    endpoint_uuid = endpoint_id.endpoint_uuid.uuid
                    location_kind = constraint.endpoint_location.location.WhichOneof('location')
                    if location_kind != 'region':
                        MSG = 'Unsupported LocationType({:s}) in Constraint({:s})'
                        raise Exception(MSG.format(location_kind, grpc_message_to_json_string(constraint)))
                    site_id = constraint.endpoint_location.location.region
                    endpoints.setdefault((device_uuid, endpoint_uuid), dict())['site_id'] = site_id

                if constraint.WhichOneof('constraint') == 'endpoint_priority':
                    endpoint_id = constraint.endpoint_priority.endpoint_id
                    device_uuid = endpoint_id.device_id.device_uuid.uuid
                    endpoint_uuid = endpoint_id.endpoint_uuid.uuid
                    priority = constraint.endpoint_priority.priority
                    endpoints.setdefault((device_uuid, endpoint_uuid), dict())['priority'] = priority

            # TODO: ensure these constraints are provided in the request
            if 'bandwidth[gbps]' not in constraints: constraints['bandwidth[gbps]'] = '20.0'
            if 'latency[ms]' not in constraints: constraints['latency[ms]'] = '20.0'

    def get_link_from_endpoint(self, endpoint : Dict) -> Tuple[Dict, Link]:
        device_uuid = endpoint['device_id']
@@ -50,8 +108,8 @@ class KDisjointPathAlgorithm(_Algorithm):
    def remove_traversed_links(self, link_list : List[Dict], path_endpoints : List[Dict]):
        _, path_link_ids = self.path_to_links(path_endpoints)
        new_link_list = list(filter(lambda l: l['link_Id'] not in path_link_ids, link_list))
        self.logger.info('cur_link_list = {:s}'.format(str(link_list)))
        self.logger.info('new_link_list = {:s}'.format(str(new_link_list)))
        #self.logger.info('cur_link_list = {:s}'.format(str(link_list)))
        #self.logger.info('new_link_list = {:s}'.format(str(new_link_list)))
        return new_link_list

    def execute(self, dump_request_filename: Optional[str] = None, dump_reply_filename: Optional[str] = None) -> None:
@@ -63,18 +121,52 @@ class KDisjointPathAlgorithm(_Algorithm):
        algorithm.link_list = self.link_list
        algorithm.link_dict = self.link_dict
        algorithm.endpoint_to_link_dict = self.endpoint_to_link_dict
        algorithm.service_list = self.service_list
        algorithm.service_dict = self.service_dict

        Path = List[Dict]
        Path_NoPath = Optional[Path] # None = no path, list = path
        self.json_reply : Dict[Tuple[str, str], List[Path_NoPath]] = dict()

        for num_path in range(self.num_disjoint):
            #dump_request_filename = 'ksp-{:d}-request.json'.format(num_path)
            #dump_reply_filename   = 'ksp-{:d}-reply.txt'.format(num_path)
            #algorithm.execute(dump_request_filename, dump_reply_filename)
            algorithm.execute()
            algorithm.service_list = list()
            algorithm.service_dict = dict()

            #self.logger.warning('services_details = {:s}'.format(str(self.services_details)))

            _request = PathCompRequest()
            for service_key, service_details in self.services_details.items():
                service_type, constraints, endpoints = service_details
                _service = _request.services.add()
                _service.service_id.context_id.context_uuid.uuid = service_key[0]
                _service.service_id.service_uuid.uuid = service_key[1]
                _service.service_type = service_type
                for constraint_type, constraint_value in constraints.items():
                    constraint = _service.service_constraints.add()
                    constraint.custom.constraint_type = constraint_type
                    constraint.custom.constraint_value = constraint_value

                site_to_endpoints : Dict[str, List[Tuple[Endpoint_Id, int]]] = {}
                for endpoint_key,endpoint_details in endpoints.items():
                    site_id = endpoint_details.get('site_id')
                    if site_id is None: continue
                    priority = endpoint_details.get('priority', 999)
                    site_to_endpoints.setdefault(site_id, list()).append((endpoint_key, priority))

                for site_id,site_endpoints in site_to_endpoints.items():
                    pending_endpoints = sorted(site_endpoints, key=operator.itemgetter(1))
                    if len(pending_endpoints) == 0: continue
                    endpoint_key, _ = pending_endpoints[0]
                    device_uuid, endpoint_uuid = endpoint_key
                    endpoint_id = _service.service_endpoint_ids.add()
                    endpoint_id.device_id.device_uuid.uuid = device_uuid
                    endpoint_id.endpoint_uuid.uuid = endpoint_uuid
                    endpoints.pop(endpoint_key)

            algorithm.add_service_requests(_request)

            dump_request_filename = 'ksp-{:d}-request.json'.format(num_path) if DUMP_EXECUTION_STEPS else None
            dump_reply_filename   = 'ksp-{:d}-reply.txt'.format(num_path)    if DUMP_EXECUTION_STEPS else None
            algorithm.execute(dump_request_filename, dump_reply_filename)

            response_list = algorithm.json_reply.get('response-list', [])
            for response in response_list:
                service_id = response['serviceId']
@@ -94,10 +186,47 @@ class KDisjointPathAlgorithm(_Algorithm):

    def get_reply(self) -> PathCompReply:
        reply = PathCompReply()
        grpc_services : Dict[Tuple[str, str], Service] = {}
        grpc_connections : Dict[Tuple[int, str], Connection] = {}
        for service_key,paths in self.json_reply.items():
            grpc_service = self.add_service_to_reply(reply, service_key[0], service_key[1])
            for path_endpoints in paths:
                if path_endpoints is None: continue
                grpc_connection = self.add_connection_to_reply(reply, grpc_service)
                self.add_path_to_connection(grpc_connection, path_endpoints)
            context_uuid, service_uuid = service_key

            grpc_services[service_key] = self.add_service_to_reply(reply, context_uuid, service_uuid)

            for num_path,service_path_ero in enumerate(paths):
                if service_path_ero is None: continue
                path_hops = eropath_to_hops(service_path_ero, self.endpoint_to_link_dict)
                connections = convert_explicit_path_hops_to_connections(path_hops, self.device_dict, service_uuid)

                for connection in connections:
                    connection_uuid,device_layer,path_hops,_ = connection

                    service_key = (context_uuid, connection_uuid)
                    grpc_service = grpc_services.get(service_key)
                    if grpc_service is not None: continue
                    grpc_service = self.add_service_to_reply(
                        reply, context_uuid, connection_uuid, device_layer=device_layer, path_hops=path_hops)
                    grpc_services[service_key] = grpc_service

                for connection in connections:
                    connection_uuid,device_layer,path_hops,dependencies = connection

                    service_key = (context_uuid, connection_uuid)
                    grpc_service = grpc_services.get(service_key)
                    if grpc_service is None: raise Exception('Service({:s}) not found'.format(str(service_key)))

                    connection_uuid = '{:s}:{:d}'.format(connection_uuid, num_path)
                    grpc_connection = grpc_connections.get(connection_uuid)
                    if grpc_connection is not None: continue
                    grpc_connection = self.add_connection_to_reply(reply, connection_uuid, grpc_service, path_hops)
                    grpc_connections[connection_uuid] = grpc_connection

                    for service_uuid in dependencies:
                        sub_service_key = (context_uuid, service_uuid)
                        grpc_sub_service = grpc_services.get(sub_service_key)
                        if grpc_sub_service is None:
                            raise Exception('Service({:s}) not found'.format(str(sub_service_key)))
                        grpc_sub_service_id = grpc_connection.sub_service_ids.add()
                        grpc_sub_service_id.CopyFrom(grpc_sub_service.service_id)

        return reply
+98 −28
Original line number Diff line number Diff line
@@ -14,11 +14,14 @@

import json, logging, requests, uuid
from typing import Dict, List, Optional, Tuple
from common.proto.context_pb2 import Connection, Device, DeviceList, EndPointId, Link, LinkList, Service
from common.proto.context_pb2 import Connection, Device, DeviceList, EndPointId, Link, LinkList, Service, ServiceStatusEnum, ServiceTypeEnum
from common.proto.pathcomp_pb2 import PathCompReply, PathCompRequest
from common.tools.grpc.Tools import grpc_message_to_json_string
from pathcomp.frontend.Config import BACKEND_URL
from pathcomp.frontend.service.algorithms.tools.ConstantsMappings import DEVICE_LAYER_TO_SERVICE_TYPE, DeviceLayerEnum
from .tools.EroPathToHops import eropath_to_hops
from .tools.ComposeRequest import compose_device, compose_link, compose_service
from .tools.ComputeSubServices import convert_explicit_path_hops_to_connections

class _Algorithm:
    def __init__(self, algorithm_id : str, sync_paths : bool, class_name=__name__) -> None:
@@ -85,60 +88,98 @@ class _Algorithm:
    def execute(self, dump_request_filename : Optional[str] = None, dump_reply_filename : Optional[str] = None) -> None:
        request = {'serviceList': self.service_list, 'deviceList': self.device_list, 'linkList': self.link_list}

        self.logger.info('[execute] request={:s}'.format(str(request)))
        if dump_request_filename is not None:
            with open(dump_request_filename, 'w', encoding='UTF-8') as f:
                f.write(json.dumps(request, sort_keys=True, indent=4))

        self.logger.info('[execute] BACKEND_URL: {:s}'.format(str(BACKEND_URL)))
        reply = requests.post(BACKEND_URL, json=request)
        self.status_code = reply.status_code
        self.raw_reply = reply.content.decode('UTF-8')

        self.logger.info('[execute] status_code={:s} reply={:s}'.format(str(reply.status_code), str(self.raw_reply)))
        if dump_reply_filename is not None:
            with open(dump_reply_filename, 'w', encoding='UTF-8') as f:
                f.write('status_code={:s} reply={:s}'.format(str(self.status_code), str(self.raw_reply)))

        self.logger.info('status_code={:s} reply={:s}'.format(str(reply.status_code), str(self.raw_reply)))

        if reply.status_code not in {requests.codes.ok}:
            raise Exception('Backend error({:s}) for request({:s})'.format(
                str(self.raw_reply), json.dumps(request, sort_keys=True)))
        
        self.json_reply = reply.json()

    def add_path_to_connection(self, connection : Connection, path_endpoints : List[Dict]) -> None:
        for endpoint in path_endpoints:
            device_uuid = endpoint['device_id']
            endpoint_uuid = endpoint['endpoint_uuid']
            endpoint_id = connection.path_hops_endpoint_ids.add()
            endpoint_id.CopyFrom(self.endpoint_dict[device_uuid][endpoint_uuid][1])

    def add_connection_to_reply(self, reply : PathCompReply, service : Service) -> Connection:
    def add_connection_to_reply(
        self, reply : PathCompReply, connection_uuid : str, service : Service, path_hops : List[Dict]
    ) -> Connection:
        connection = reply.connections.add()
        connection.connection_id.connection_uuid.uuid = str(uuid.uuid4())

        connection.connection_id.connection_uuid.uuid = connection_uuid
        connection.service_id.CopyFrom(service.service_id)

        for path_hop in path_hops:
            device_uuid = path_hop['device']

            ingress_endpoint_uuid = path_hop['ingress_ep']
            endpoint_id = connection.path_hops_endpoint_ids.add()
            endpoint_id.CopyFrom(self.endpoint_dict[device_uuid][ingress_endpoint_uuid][1])

            egress_endpoint_uuid = path_hop['egress_ep']
            endpoint_id = connection.path_hops_endpoint_ids.add()
            endpoint_id.CopyFrom(self.endpoint_dict[device_uuid][egress_endpoint_uuid][1])

        return connection

    def add_service_to_reply(self, reply : PathCompReply, context_uuid : str, service_uuid : str) -> Service:
    def add_service_to_reply(
        self, reply : PathCompReply, context_uuid : str, service_uuid : str,
        device_layer : Optional[DeviceLayerEnum] = None, path_hops : List[Dict] = []
    ) -> Service:
        # TODO: implement support for multi-point services
        # Control deactivated to enable disjoint paths with multiple redundant endpoints on each side
        #service_endpoint_ids = service.service_endpoint_ids
        #if len(service_endpoint_ids) != 2: raise NotImplementedError('Service must have 2 endpoints')

        service_key = (context_uuid, service_uuid)
        tuple_service = self.service_dict.get(service_key)
        if tuple_service is None: raise Exception('ServiceKey({:s}) not found'.format(str(service_key)))
        _, grpc_service = tuple_service
        if tuple_service is not None:
            service = reply.services.add()
            service.CopyFrom(tuple_service[1])
        else:
            service = reply.services.add()
            service.service_id.context_id.context_uuid.uuid = context_uuid
            service.service_id.service_uuid.uuid = service_uuid

        # TODO: implement support for multi-point services
        service_endpoint_ids = grpc_service.service_endpoint_ids
        if len(service_endpoint_ids) != 2: raise NotImplementedError('Service must have 2 endpoints')
            if device_layer is not None:
                service_type = DEVICE_LAYER_TO_SERVICE_TYPE.get(device_layer.value)
                if service_type is None:
                    MSG = 'Unable to map DeviceLayer({:s}) to ServiceType'
                    raise Exception(MSG.format(str(device_layer)))
                service.service_type = service_type

        service = reply.services.add()
        service.CopyFrom(grpc_service)
            service.service_status.service_status = ServiceStatusEnum.SERVICESTATUS_PLANNED

            if path_hops is not None and len(path_hops) > 0:
                ingress_endpoint_id = service.service_endpoint_ids.add()
                ingress_endpoint_id.device_id.device_uuid.uuid = path_hops[0]['device']
                ingress_endpoint_id.endpoint_uuid.uuid = path_hops[0]['ingress_ep']

        return grpc_service
                egress_endpoint_id = service.service_endpoint_ids.add()
                egress_endpoint_id.device_id.device_uuid.uuid = path_hops[-1]['device']
                egress_endpoint_id.endpoint_uuid.uuid = path_hops[-1]['egress_ep']

        return service

    def get_reply(self) -> PathCompReply:
        response_list = self.json_reply.get('response-list', [])
        reply = PathCompReply()
        grpc_services : Dict[Tuple[str, str], Service] = {}
        grpc_connections : Dict[str, Connection] = {}
        for response in response_list:
            service_id = response['serviceId']
            grpc_service = self.add_service_to_reply(reply, service_id['contextId'], service_id['service_uuid'])
            context_uuid = service_id['contextId']
            service_uuid = service_id['service_uuid']
            service_key = (context_uuid, service_uuid)
            grpc_services[service_key] = self.add_service_to_reply(reply, context_uuid, service_uuid)

            no_path_issue = response.get('noPath', {}).get('issue')
            if no_path_issue is not None:
@@ -146,9 +187,38 @@ class _Algorithm:
                # no_path_issue == 1 => no path due to a constraint
                continue

            for service_path in response['path']:
                grpc_connection = self.add_connection_to_reply(reply, grpc_service)
                self.add_path_to_connection(grpc_connection, service_path['devices'])
            for service_path_ero in response['path']:
                path_hops = eropath_to_hops(service_path_ero['devices'], self.endpoint_to_link_dict)
                connections = convert_explicit_path_hops_to_connections(path_hops, self.device_dict, service_uuid)

                for connection in connections:
                    connection_uuid,device_layer,path_hops,_ = connection
                    service_key = (context_uuid, connection_uuid)
                    grpc_service = grpc_services.get(service_key)
                    if grpc_service is None:
                        grpc_service = self.add_service_to_reply(
                            reply, context_uuid, connection_uuid, device_layer=device_layer, path_hops=path_hops)
                        grpc_services[service_key] = grpc_service

                for connection in connections:
                    connection_uuid,device_layer,path_hops,dependencies = connection

                    service_key = (context_uuid, connection_uuid)
                    grpc_service = grpc_services.get(service_key)
                    if grpc_service is None: raise Exception('Service({:s}) not found'.format(str(service_key)))
                        
                    grpc_connection = grpc_connections.get(connection_uuid)
                    if grpc_connection is not None: continue
                    grpc_connection = self.add_connection_to_reply(reply, connection_uuid, grpc_service, path_hops)
                    grpc_connections[connection_uuid] = grpc_connection

                    for service_uuid in dependencies:
                        sub_service_key = (context_uuid, service_uuid)
                        grpc_sub_service = grpc_services.get(sub_service_key)
                        if grpc_sub_service is None:
                            raise Exception('Service({:s}) not found'.format(str(sub_service_key)))
                        grpc_sub_service_id = grpc_connection.sub_service_ids.add()
                        grpc_sub_service_id.CopyFrom(grpc_sub_service.service_id)

                # ... "path-capacity": {"total-size": {"value": 200, "unit": 0}},
                # ... "path-latency": {"fixed-latency-characteristic": "10.000000"},
+17 −5
Original line number Diff line number Diff line
@@ -12,18 +12,30 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
from typing import Dict
from common.Constants import DEFAULT_CONTEXT_UUID, DEFAULT_TOPOLOGY_UUID
from common.proto.context_pb2 import Constraint, Device, EndPointId, Link, Service, ServiceId, TopologyId
from common.tools.grpc.Tools import grpc_message_to_json_string
from .Constants import CapacityUnit, LinkForwardingDirection, LinkPortDirection, TerminationDirection, TerminationState
from .ConstantsMappings import (
    CapacityUnit, LinkForwardingDirection, LinkPortDirection, TerminationDirection, TerminationState)

LOGGER = logging.getLogger(__name__)

def compose_topology_id(topology_id : TopologyId) -> Dict:
    context_uuid = topology_id.context_id.context_uuid.uuid
    topology_uuid = topology_id.topology_uuid.uuid

    if len(context_uuid) == 0: context_uuid = DEFAULT_CONTEXT_UUID
    if len(topology_uuid) == 0: topology_uuid = DEFAULT_TOPOLOGY_UUID

    return {'contextId': context_uuid, 'topology_uuid': topology_uuid}

def compose_service_id(service_id : ServiceId) -> Dict:
    context_uuid = service_id.context_id.context_uuid.uuid

    if len(context_uuid) == 0: context_uuid = DEFAULT_CONTEXT_UUID

    service_uuid = service_id.service_uuid.uuid
    return {'contextId': context_uuid, 'service_uuid': service_uuid}

@@ -54,9 +66,9 @@ def compose_latency_characteristics(fixed_latency_characteristic : str) -> Dict:

def compose_constraint(constraint : Constraint) -> Dict:
    if constraint.WhichOneof('constraint') != 'custom':
        MSG = 'Constraint({:s}) not supported'
        str_constraint = grpc_message_to_json_string(constraint)
        raise NotImplementedError(MSG.format(str_constraint))
        LOGGER.warning('Ignoring unsupported Constraint({:s})'.format(str_constraint))
        return None
    constraint_type = constraint.custom.constraint_type
    constraint_value = constraint.custom.constraint_value
    return {'constraint_type': constraint_type, 'constraint_value': constraint_value}
@@ -110,10 +122,10 @@ def compose_service(grpc_service : Service) -> Dict:
        for service_endpoint_id in grpc_service.service_endpoint_ids
    ]

    constraints = [
    constraints = list(filter(lambda constraint: constraint is not None, [
        compose_constraint(service_constraint)
        for service_constraint in grpc_service.service_constraints
    ]
    ]))

    return {
        'serviceId': service_id,
+96 −0

File added.

Preview size limit exceeded, changes collapsed.

Loading