Commit 49fef75e authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

PathComp component:

Frontend:
- added show logs script
- improved config settings to get address and port of the backend
- improved logging of _Algorithm
- implemented KDisjointPath logic

Backend:
- added show logs script
parent 06e90afc
Loading
Loading
Loading
Loading
+27 −0
Original line number Diff line number Diff line
#!/bin/bash
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

########################################################################################################################
# Define your deployment settings here
########################################################################################################################

# If not already set, set the name of the Kubernetes namespace to deploy to.
export TFS_K8S_NAMESPACE=${TFS_K8S_NAMESPACE:-"tfs"}

########################################################################################################################
# Automated steps start here
########################################################################################################################

kubectl --namespace $TFS_K8S_NAMESPACE logs deployment/pathcompservice -c backend
+27 −0
Original line number Diff line number Diff line
#!/bin/bash
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

########################################################################################################################
# Define your deployment settings here
########################################################################################################################

# If not already set, set the name of the Kubernetes namespace to deploy to.
export TFS_K8S_NAMESPACE=${TFS_K8S_NAMESPACE:-"tfs"}

########################################################################################################################
# Automated steps start here
########################################################################################################################

kubectl --namespace $TFS_K8S_NAMESPACE logs deployment/pathcompservice -c frontend
+13 −3
Original line number Diff line number Diff line
@@ -16,13 +16,23 @@ import os

DEFAULT_PATHCOMP_BACKEND_SCHEME  = 'http'
DEFAULT_PATHCOMP_BACKEND_HOST    = '127.0.0.1'
DEFAULT_PATHCOMP_BACKEND_PORT    = 8081
DEFAULT_PATHCOMP_BACKEND_PORT    = '8081'
DEFAULT_PATHCOMP_BACKEND_BASEURL = '/pathComp/api/v1/compRoute'

PATHCOMP_BACKEND_SCHEME  = str(os.environ.get('PATHCOMP_BACKEND_SCHEME',  DEFAULT_PATHCOMP_BACKEND_SCHEME ))
PATHCOMP_BACKEND_HOST    = str(os.environ.get('PATHCOMP_BACKEND_HOST',    DEFAULT_PATHCOMP_BACKEND_HOST   ))
PATHCOMP_BACKEND_PORT    = int(os.environ.get('PATHCOMP_BACKEND_PORT',    DEFAULT_PATHCOMP_BACKEND_PORT   ))
PATHCOMP_BACKEND_BASEURL = str(os.environ.get('PATHCOMP_BACKEND_BASEURL', DEFAULT_PATHCOMP_BACKEND_BASEURL))

# Find IP:port of backend container as follows:
# - first check env vars PATHCOMP_BACKEND_HOST & PATHCOMP_BACKEND_PORT
# - if not set, check env vars PATHCOMPSERVICE_SERVICE_HOST & PATHCOMPSERVICE_SERVICE_PORT_HTTP
# - if not set, use DEFAULT_PATHCOMP_BACKEND_HOST & DEFAULT_PATHCOMP_BACKEND_PORT
backend_host = DEFAULT_PATHCOMP_BACKEND_HOST
backend_host = os.environ.get('PATHCOMPSERVICE_SERVICE_HOST', backend_host)
PATHCOMP_BACKEND_HOST = str(os.environ.get('PATHCOMP_BACKEND_HOST', backend_host))

backend_port = DEFAULT_PATHCOMP_BACKEND_PORT
backend_port = os.environ.get('PATHCOMPSERVICE_SERVICE_PORT_HTTP', backend_port)
PATHCOMP_BACKEND_PORT = int(os.environ.get('PATHCOMP_BACKEND_PORT', backend_port))

BACKEND_URL = '{:s}://{:s}:{:d}{:s}'.format(
    PATHCOMP_BACKEND_SCHEME, PATHCOMP_BACKEND_HOST, PATHCOMP_BACKEND_PORT, PATHCOMP_BACKEND_BASEURL)
+101 −11
Original line number Diff line number Diff line
@@ -12,17 +12,73 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import copy
import operator
from typing import Dict, List, Optional, Set, Tuple
from common.proto.context_pb2 import Link
from common.proto.pathcomp_pb2 import Algorithm_KDisjointPath, Algorithm_KShortestPath, PathCompReply
from common.proto.pathcomp_pb2 import Algorithm_KDisjointPath, Algorithm_KShortestPath, PathCompReply, PathCompRequest
from common.tools.grpc.Tools import grpc_message_to_json_string
from ._Algorithm import _Algorithm
from .KShortestPathAlgorithm import KShortestPathAlgorithm

Service_Id          = Tuple[str, str]   # (context_uuid, service_uuid)
Service_Constraints = Dict[str, str]    # {constraint_type => constraint_value}
Endpoint_Id         = Tuple[str, str]   # (device_uuid, endpoint_uuid)
Endpoint_Details    = Tuple[str, int]   # (site_id, priority)
Service_Endpoints   = Dict[Endpoint_Id, Endpoint_Details]
Service_Details     = Tuple[int, Service_Constraints, Service_Endpoints]
Services_Details    = Dict[Service_Id, Service_Details]

CUSTOM_CONSTRAINTS = {'bandwidth[gbps]', 'latency[ms]'}

DUMP_EXECUTION_STEPS = False

class KDisjointPathAlgorithm(_Algorithm):
    def __init__(self, algorithm : Algorithm_KDisjointPath, class_name=__name__) -> None:
        super().__init__('KDP', False, class_name=class_name)
        self.num_disjoint = algorithm.num_disjoint
        self.services_details : Services_Details = dict()

    def add_service_requests(self, request: PathCompRequest) -> None:
        super().add_service_requests(request)
        for service in request.services:
            service_id = service.service_id
            context_uuid = service_id.context_id.context_uuid.uuid
            service_uuid = service_id.service_uuid.uuid
            service_key = (context_uuid, service_uuid)

            constraints = dict()
            endpoints = dict()
            service_details = (int(service.service_type), constraints, endpoints)
            self.services_details.setdefault(service_key, service_details)

            for constraint in service.service_constraints:
                if constraint.WhichOneof('constraint') == 'custom':
                    constraint_type = constraint.custom.constraint_type
                    if constraint_type not in CUSTOM_CONSTRAINTS: continue
                    constraint_value = constraint.custom.constraint_value
                    constraints[constraint_type] = constraint_value

                if constraint.WhichOneof('constraint') == 'endpoint_location':
                    endpoint_id = constraint.endpoint_location.endpoint_id
                    device_uuid = endpoint_id.device_id.device_uuid.uuid
                    endpoint_uuid = endpoint_id.endpoint_uuid.uuid
                    location_kind = constraint.endpoint_location.location.WhichOneof('location')
                    if location_kind != 'region':
                        MSG = 'Unsupported LocationType({:s}) in Constraint({:s})'
                        raise Exception(MSG.format(location_kind, grpc_message_to_json_string(constraint)))
                    site_id = constraint.endpoint_location.location.region
                    endpoints.setdefault((device_uuid, endpoint_uuid), dict())['site_id'] = site_id

                if constraint.WhichOneof('constraint') == 'endpoint_priority':
                    endpoint_id = constraint.endpoint_priority.endpoint_id
                    device_uuid = endpoint_id.device_id.device_uuid.uuid
                    endpoint_uuid = endpoint_id.endpoint_uuid.uuid
                    priority = constraint.endpoint_priority.priority
                    endpoints.setdefault((device_uuid, endpoint_uuid), dict())['priority'] = priority

            # TODO: ensure these constraints are provided in the request
            if 'bandwidth[gbps]' not in constraints: constraints['bandwidth[gbps]'] = '20.0'
            if 'latency[ms]' not in constraints: constraints['latency[ms]'] = '20.0'

    def get_link_from_endpoint(self, endpoint : Dict) -> Tuple[Dict, Link]:
        device_uuid = endpoint['device_id']
@@ -50,8 +106,8 @@ class KDisjointPathAlgorithm(_Algorithm):
    def remove_traversed_links(self, link_list : List[Dict], path_endpoints : List[Dict]):
        _, path_link_ids = self.path_to_links(path_endpoints)
        new_link_list = list(filter(lambda l: l['link_Id'] not in path_link_ids, link_list))
        self.logger.info('cur_link_list = {:s}'.format(str(link_list)))
        self.logger.info('new_link_list = {:s}'.format(str(new_link_list)))
        #self.logger.info('cur_link_list = {:s}'.format(str(link_list)))
        #self.logger.info('new_link_list = {:s}'.format(str(new_link_list)))
        return new_link_list

    def execute(self, dump_request_filename: Optional[str] = None, dump_reply_filename: Optional[str] = None) -> None:
@@ -63,18 +119,52 @@ class KDisjointPathAlgorithm(_Algorithm):
        algorithm.link_list = self.link_list
        algorithm.link_dict = self.link_dict
        algorithm.endpoint_to_link_dict = self.endpoint_to_link_dict
        algorithm.service_list = self.service_list
        algorithm.service_dict = self.service_dict

        Path = List[Dict]
        Path_NoPath = Optional[Path] # None = no path, list = path
        self.json_reply : Dict[Tuple[str, str], List[Path_NoPath]] = dict()

        for num_path in range(self.num_disjoint):
            #dump_request_filename = 'ksp-{:d}-request.json'.format(num_path)
            #dump_reply_filename   = 'ksp-{:d}-reply.txt'.format(num_path)
            #algorithm.execute(dump_request_filename, dump_reply_filename)
            algorithm.execute()
            algorithm.service_list = list()
            algorithm.service_dict = dict()

            #self.logger.warning('services_details = {:s}'.format(str(self.services_details)))

            _request = PathCompRequest()
            for service_key, service_details in self.services_details.items():
                service_type, constraints, endpoints = service_details
                _service = _request.services.add()
                _service.service_id.context_id.context_uuid.uuid = service_key[0]
                _service.service_id.service_uuid.uuid = service_key[1]
                _service.service_type = service_type
                for constraint_type, constraint_value in constraints.items():
                    constraint = _service.service_constraints.add()
                    constraint.custom.constraint_type = constraint_type
                    constraint.custom.constraint_value = constraint_value

                site_to_endpoints : Dict[str, List[Tuple[Endpoint_Id, int]]] = {}
                for endpoint_key,endpoint_details in endpoints.items():
                    site_id = endpoint_details.get('site_id')
                    if site_id is None: continue
                    priority = endpoint_details.get('priority', 999)
                    site_to_endpoints.setdefault(site_id, list()).append((endpoint_key, priority))

                for site_id,site_endpoints in site_to_endpoints.items():
                    pending_endpoints = sorted(site_endpoints, key=operator.itemgetter(1))
                    if len(pending_endpoints) == 0: continue
                    endpoint_key, _ = pending_endpoints[0]
                    device_uuid, endpoint_uuid = endpoint_key
                    endpoint_id = _service.service_endpoint_ids.add()
                    endpoint_id.device_id.device_uuid.uuid = device_uuid
                    endpoint_id.endpoint_uuid.uuid = endpoint_uuid
                    endpoints.pop(endpoint_key)

            algorithm.add_service_requests(_request)

            dump_request_filename = 'ksp-{:d}-request.json'.format(num_path) if DUMP_EXECUTION_STEPS else None
            dump_reply_filename   = 'ksp-{:d}-reply.txt'.format(num_path)    if DUMP_EXECUTION_STEPS else None
            algorithm.execute(dump_request_filename, dump_reply_filename)

            response_list = algorithm.json_reply.get('response-list', [])
            for response in response_list:
                service_id = response['serviceId']
@@ -90,7 +180,7 @@ class KDisjointPathAlgorithm(_Algorithm):
                json_reply_service.append(path_endpoints)
                algorithm.link_list = self.remove_traversed_links(algorithm.link_list, path_endpoints)

        self.logger.info('self.json_reply = {:s}'.format(str(self.json_reply)))
        self.logger.debug('self.json_reply = {:s}'.format(str(self.json_reply)))

    def get_reply(self) -> PathCompReply:
        reply = PathCompReply()
+6 −4
Original line number Diff line number Diff line
@@ -85,20 +85,21 @@ class _Algorithm:
    def execute(self, dump_request_filename : Optional[str] = None, dump_reply_filename : Optional[str] = None) -> None:
        request = {'serviceList': self.service_list, 'deviceList': self.device_list, 'linkList': self.link_list}

        self.logger.debug('[execute] request={:s}'.format(str(request)))
        if dump_request_filename is not None:
            with open(dump_request_filename, 'w', encoding='UTF-8') as f:
                f.write(json.dumps(request, sort_keys=True, indent=4))

        self.logger.debug('[execute] BACKEND_URL: {:s}'.format(str(BACKEND_URL)))
        reply = requests.post(BACKEND_URL, json=request)
        self.status_code = reply.status_code
        self.raw_reply = reply.content.decode('UTF-8')

        self.logger.debug('[execute] status_code={:s} reply={:s}'.format(str(reply.status_code), str(self.raw_reply)))
        if dump_reply_filename is not None:
            with open(dump_reply_filename, 'w', encoding='UTF-8') as f:
                f.write('status_code={:s} reply={:s}'.format(str(self.status_code), str(self.raw_reply)))

        self.logger.info('status_code={:s} reply={:s}'.format(str(reply.status_code), str(self.raw_reply)))

        if reply.status_code not in {requests.codes.ok}:
            raise Exception('Backend error({:s}) for request({:s})'.format(
                str(self.raw_reply), json.dumps(request, sort_keys=True)))
@@ -125,8 +126,9 @@ class _Algorithm:
        _, grpc_service = tuple_service

        # TODO: implement support for multi-point services
        service_endpoint_ids = grpc_service.service_endpoint_ids
        if len(service_endpoint_ids) != 2: raise NotImplementedError('Service must have 2 endpoints')
        # Control deactivated to enable disjoint paths with multiple redundant endpoints on each side
        #service_endpoint_ids = grpc_service.service_endpoint_ids
        #if len(service_endpoint_ids) != 2: raise NotImplementedError('Service must have 2 endpoints')

        service = reply.services.add()
        service.CopyFrom(grpc_service)