Commit 84da908d authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

PathComp - FrontEnd component:

- Moved topology retrieval to TopologyTools::get_pathcomp_topology_details
- Added forecasting capacity to get_pathcomp_topology_details
- Updated backedn request composition to use link attributes
- Added parameter to explicitly enable/disable forecaster
parent f0f25bfa
Loading
Loading
Loading
Loading
+11 −0
Original line number Diff line number Diff line
@@ -13,6 +13,7 @@
# limitations under the License.

import os
from common.Settings import get_setting

DEFAULT_PATHCOMP_BACKEND_SCHEME  = 'http'
DEFAULT_PATHCOMP_BACKEND_HOST    = '127.0.0.1'
@@ -37,3 +38,13 @@ PATHCOMP_BACKEND_PORT = int(os.environ.get('PATHCOMP_BACKEND_PORT', backend_port

BACKEND_URL = '{:s}://{:s}:{:d}{:s}'.format(
    PATHCOMP_BACKEND_SCHEME, PATHCOMP_BACKEND_HOST, PATHCOMP_BACKEND_PORT, PATHCOMP_BACKEND_BASEURL)


SETTING_NAME_ENABLE_FORECASTER = 'ENABLE_FORECASTER'
TRUE_VALUES = {'Y', 'YES', 'TRUE', 'T', 'E', 'ENABLE', 'ENABLED'}

def is_forecaster_enabled() -> bool:
    is_enabled = get_setting(SETTING_NAME_ENABLE_FORECASTER, default=None)
    if is_enabled is None: return False
    str_is_enabled = str(is_enabled).upper()
    return str_is_enabled in TRUE_VALUES
+14 −13
Original line number Diff line number Diff line
@@ -13,18 +13,20 @@
# limitations under the License.

import grpc, logging, threading
from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME
#from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
from common.proto.context_pb2 import ContextId, Empty, TopologyId
#from common.proto.context_pb2 import ContextId, Empty, TopologyId
from common.proto.pathcomp_pb2 import PathCompReply, PathCompRequest
from common.proto.pathcomp_pb2_grpc import PathCompServiceServicer
from common.tools.context_queries.Device import get_devices_in_topology
from common.tools.context_queries.Link import get_links_in_topology
from common.tools.context_queries.InterDomain import is_inter_domain
#from common.tools.context_queries.Device import get_devices_in_topology
#from common.tools.context_queries.Link import get_links_in_topology
#from common.tools.context_queries.InterDomain import is_inter_domain
from common.tools.grpc.Tools import grpc_message_to_json_string
from common.tools.object_factory.Context import json_context_id
from common.tools.object_factory.Topology import json_topology_id
from context.client.ContextClient import ContextClient
from pathcomp.frontend.Config import is_forecaster_enabled
#from common.tools.object_factory.Context import json_context_id
#from common.tools.object_factory.Topology import json_topology_id
#from context.client.ContextClient import ContextClient
from pathcomp.frontend.service.TopologyTools import get_pathcomp_topology_details
from pathcomp.frontend.service.algorithms.Factory import get_algorithm

LOGGER = logging.getLogger(__name__)
@@ -43,9 +45,7 @@ class PathCompServiceServicerImpl(PathCompServiceServicer):
    def Compute(self, request : PathCompRequest, context : grpc.ServicerContext) -> PathCompReply:
        LOGGER.debug('[Compute] begin ; request = {:s}'.format(grpc_message_to_json_string(request)))

        context_client = ContextClient()

        context_id = json_context_id(DEFAULT_CONTEXT_NAME)
        #context_client = ContextClient()
        # TODO: improve definition of topologies; for interdomain the current topology design might be not convenient
        #if (len(request.services) == 1) and is_inter_domain(context_client, request.services[0].service_endpoint_ids):
        #    #devices = get_devices_in_topology(context_client, ADMIN_CONTEXT_ID, INTERDOMAIN_TOPOLOGY_NAME)
@@ -56,10 +56,11 @@ class PathCompServiceServicerImpl(PathCompServiceServicer):
        #    # TODO: add contexts, topologies, and membership of devices/links in topologies
        #    #devices = context_client.ListDevices(Empty())
        #    #links = context_client.ListLinks(Empty())
        #    context_id = json_context_id(DEFAULT_CONTEXT_NAME)
        #    topology_id = json_topology_id(DEFAULT_TOPOLOGY_NAME, context_id)
        topology_id = json_topology_id(DEFAULT_TOPOLOGY_NAME, context_id)

        topology_details = context_client.GetTopologyDetails(TopologyId(**topology_id))
        allow_forecasting = is_forecaster_enabled()
        topology_details = get_pathcomp_topology_details(request, allow_forecasting=allow_forecasting)

        algorithm = get_algorithm(request)
        algorithm.add_devices(topology_details.devices)
+96 −0
Original line number Diff line number Diff line
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import math
from typing import Dict, Optional
from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME, ServiceNameEnum
from common.Settings import ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, find_environment_variables, get_env_var_name
from common.method_wrappers.ServiceExceptions import InvalidArgumentException
from common.proto.context_pb2 import Constraint_Schedule, Service, TopologyDetails
from common.proto.forecaster_pb2 import ForecastLinkCapacityReply, ForecastTopologyCapacityRequest
from common.proto.pathcomp_pb2 import PathCompRequest
from common.tools.context_queries.Topology import get_topology_details
from common.tools.grpc.Tools import grpc_message_to_json_string
from context.client.ContextClient import ContextClient
from forecaster.client.ForecasterClient import ForecasterClient

def get_service_schedule(service : Service) -> Optional[Constraint_Schedule]:
    for constraint in service.service_constraints:
        if constraint.WhichOneof('constraint') != 'schedule': continue
        return constraint.schedule
    return None

def get_pathcomp_topology_details(request : PathCompRequest, allow_forecasting : bool = False) -> TopologyDetails:
    context_client = ContextClient()
    topology_details = get_topology_details(
        context_client, DEFAULT_TOPOLOGY_NAME, context_uuid=DEFAULT_CONTEXT_NAME, rw_copy=True
    )

    if len(request.services) == 0:
        raise InvalidArgumentException('services', grpc_message_to_json_string(request), 'must not be empty')

    if len(request.services) > 1:
        # Forecaster does not support multiple services
        return topology_details

    if not allow_forecasting:
        # Forecaster explicitly disabled
        return topology_details

    env_vars = find_environment_variables([
        get_env_var_name(ServiceNameEnum.FORECASTER, ENVVAR_SUFIX_SERVICE_HOST     ),
        get_env_var_name(ServiceNameEnum.FORECASTER, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
    ])
    if len(env_vars) != 2:
        # Forecaster not available
        return topology_details

    service = request.services[0]
    service_schedule = get_service_schedule(service)
    if service_schedule is None:
        # Service provides no schedule constraint, so forecast cannot be computed
        return topology_details

    #start_timestamp = service_schedule.start_timestamp
    duration_days = service_schedule.duration_days
    if float(duration_days) > 0.0:
        # Service provides no scheduled duration, so forecast cannot be computed
        return topology_details

    forecaster_client = ForecasterClient()
    forecaster_client.connect()

    forecast_request = ForecastTopologyCapacityRequest(
        topology_id=topology_details.topology_id,
        forecast_window_seconds = duration_days * 24 * 60 * 60
    )

    forecast_reply = forecaster_client.ForecastTopologyCapacity(forecast_request)

    forecasted_link_capacities : Dict[str, ForecastLinkCapacityReply] = {
        link_capacity.link_id.link_uuid.uuid : link_capacity
        for link_capacity in forecast_reply.link_capacities
    }

    for link in topology_details.links:
        link_uuid = link.link_id.link_uuid.uuid
        forecasted_link_capacity = forecasted_link_capacities.get(link_uuid)
        if forecasted_link_capacity is None: continue
        link.attributes.used_capacity_gbps = forecasted_link_capacity.forecast_used_capacity_gbps
        if link.attributes.total_capacity_gbps < link.attributes.used_capacity_gbps:
            total_capacity_gbps = link.attributes.used_capacity_gbps
            total_capacity_gbps = math.ceil(total_capacity_gbps / 100) * 100 # round up in steps of 100
            link.attributes.total_capacity_gbps = total_capacity_gbps

    return topology_details
+18 −2
Original line number Diff line number Diff line
@@ -118,9 +118,25 @@ def compose_link(grpc_link : Link) -> Dict:
        for link_endpoint_id in grpc_link.link_endpoint_ids
    ]

    total_capacity_gbps, used_capacity_gbps = None, None
    if grpc_link.HasField('attributes'):
        attributes = grpc_link.attributes
        # In proto3, HasField() does not work for scalar fields, using ListFields() instead.
        attribute_names = set([field.name for field,_ in attributes.ListFields()])
        if 'total_capacity_gbps' in attribute_names:
            total_capacity_gbps = attributes.total_capacity_gbps
        if 'used_capacity_gbps' in attribute_names:
            used_capacity_gbps = attributes.used_capacity_gbps
        elif total_capacity_gbps is not None:
            used_capacity_gbps = total_capacity_gbps

    if total_capacity_gbps is None: total_capacity_gbps = 100
    if used_capacity_gbps  is None: used_capacity_gbps = 0
    available_capacity_gbps = total_capacity_gbps - used_capacity_gbps

    forwarding_direction = LinkForwardingDirection.UNIDIRECTIONAL.value
    total_potential_capacity = compose_capacity(200, CapacityUnit.MBPS.value)
    available_capacity = compose_capacity(200, CapacityUnit.MBPS.value)
    total_potential_capacity = compose_capacity(total_capacity_gbps, CapacityUnit.GBPS.value)
    available_capacity = compose_capacity(available_capacity_gbps, CapacityUnit.GBPS.value)
    cost_characteristics = compose_cost_characteristics('linkcost', '1', '0')
    latency_characteristics = compose_latency_characteristics('1')