Skip to content
ForecasterServiceServicerImpl.py 6.15 KiB
Newer Older
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from typing import Dict, List
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
import grpc, logging
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.method_wrappers.ServiceExceptions import NotFoundException
from common.proto.context_pb2 import LinkAttributes, LinkId
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.proto.forecaster_pb2 import (
    ForecastLinkCapacityReply, ForecastLinkCapacityRequest,
    ForecastTopologyCapacityReply, ForecastTopologyCapacityRequest
)
from common.proto.forecaster_pb2_grpc import ForecasterServiceServicer
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.proto.kpi_sample_types_pb2 import KpiSampleType
from common.tools.context_queries.Link import get_link
from common.tools.context_queries.Topology import get_topology_details
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.tools.timestamp.Converters import timestamp_utcnow_to_float
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from context.client.ContextClient import ContextClient
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from forecaster.Config import FORECAST_TO_HISTORY_RATIO
from forecaster.service.Forecaster import compute_forecast
from forecaster.service.KpiManager import KpiManager
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

LOGGER = logging.getLogger(__name__)

METRICS_POOL = MetricsPool('Forecaster', 'RPC')

class ForecasterServiceServicerImpl(ForecasterServiceServicer):
    def __init__(self) -> None:
        LOGGER.debug('Creating Servicer...')
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        self._kpi_manager = KpiManager()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        LOGGER.debug('Servicer Created')

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def ForecastLinkCapacity(
        self, request : ForecastLinkCapacityRequest, context : grpc.ServicerContext
    ) -> ForecastLinkCapacityReply:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        forecast_window_seconds = request.forecast_window_seconds
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

        # history_window_seconds indicates the size of the train-set based on the
        # requested size of the test-set and the configured history ratio
        history_window_seconds = FORECAST_TO_HISTORY_RATIO * forecast_window_seconds

        link_id = request.link_id
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        link_uuid = link_id.link_uuid.uuid
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

        context_client = ContextClient()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        link = get_link(context_client, link_uuid)
        if link is None: raise NotFoundException('Link', link_uuid)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        kpi_id_map = self._kpi_manager.get_kpi_ids_from_link_ids([link_id])
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        link_uuid__to__kpi_id = {
            _link_uuid : _kpi_id
            for (_link_uuid, _kpi_sample_type), _kpi_id in kpi_id_map.items()
            if _kpi_sample_type == KpiSampleType.KPISAMPLETYPE_LINK_USED_CAPACITY_GBPS
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        kpi_id = link_uuid__to__kpi_id[link_uuid]
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        end_timestamp   = timestamp_utcnow_to_float()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        start_timestamp = end_timestamp - history_window_seconds
        df_historical_data = self._kpi_manager.get_kpi_id_samples([kpi_id], start_timestamp, end_timestamp)
        forecast_used_capacity_gbps = compute_forecast(df_historical_data, kpi_id)

        reply = ForecastLinkCapacityReply()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        reply.link_id.link_uuid.uuid      = link_uuid
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        reply.total_capacity_gbps         = link.attributes.total_capacity_gbps
        reply.current_used_capacity_gbps  = link.attributes.used_capacity_gbps
        reply.forecast_used_capacity_gbps = forecast_used_capacity_gbps
        return reply
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def ForecastTopologyCapacity(
        self, request : ForecastTopologyCapacityRequest, context : grpc.ServicerContext
    ) -> ForecastTopologyCapacityReply:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        forecast_window_seconds = request.forecast_window_seconds

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        # history_window_seconds indicates the size of the train-set based on the
        # requested size of the test-set and the configured history ratio
        history_window_seconds = FORECAST_TO_HISTORY_RATIO * forecast_window_seconds
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        context_uuid  = request.topology_id.context_id.context_uuid.uuid
        topology_uuid = request.topology_id.topology_uuid.uuid
        context_client = ContextClient()
        topology_details = get_topology_details(context_client, topology_uuid, context_uuid=context_uuid)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        if topology_details is None:
            topology_uuid = '{:s}/{:s}'.format(context_uuid, topology_uuid)
            raise NotFoundException('Topology', topology_uuid)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        link_ids        : List[LinkId]              = list()
        link_capacities : Dict[str, LinkAttributes] = dict()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        for link in topology_details.links:
            link_ids.append(link.link_id)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            link_capacities[link.link_id.link_uuid.uuid] = link.attributes
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

        kpi_id_map = self._kpi_manager.get_kpi_ids_from_link_ids(link_ids)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        link_uuid__to__kpi_id = {
            _link_id : _kpi_id
            for (_link_id, _kpi_sample_type), _kpi_id in kpi_id_map.items()
            if _kpi_sample_type == KpiSampleType.KPISAMPLETYPE_LINK_USED_CAPACITY_GBPS
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        kpi_ids = list(link_uuid__to__kpi_id.values())
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        end_timestamp   = timestamp_utcnow_to_float()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        start_timestamp = end_timestamp - history_window_seconds
        df_historical_data = self._kpi_manager.get_kpi_id_samples(kpi_ids, start_timestamp, end_timestamp)

        reply = ForecastTopologyCapacityReply()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        for link_uuid, kpi_id in link_uuid__to__kpi_id.items():
            link_attributes = link_capacities[link_uuid]
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            forecast_used_capacity_gbps = compute_forecast(df_historical_data, kpi_id)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            link_capacity : ForecastLinkCapacityReply = reply.link_capacities.add() # pylint: disable=no-member
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            link_capacity.link_id.link_uuid.uuid      = link_uuid
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            link_capacity.total_capacity_gbps         = link_attributes.total_capacity_gbps
            link_capacity.current_used_capacity_gbps  = link_attributes.used_capacity_gbps
            link_capacity.forecast_used_capacity_gbps = forecast_used_capacity_gbps
        return reply