Skip to content
Snippets Groups Projects
ForecasterServiceServicerImpl.py 6.15 KiB
Newer Older
# Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Dict, List
import grpc, logging
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
from common.method_wrappers.ServiceExceptions import NotFoundException
from common.proto.context_pb2 import LinkAttributes, LinkId
from common.proto.forecaster_pb2 import (
    ForecastLinkCapacityReply, ForecastLinkCapacityRequest,
    ForecastTopologyCapacityReply, ForecastTopologyCapacityRequest
)
from common.proto.forecaster_pb2_grpc import ForecasterServiceServicer
from common.proto.kpi_sample_types_pb2 import KpiSampleType
from common.tools.context_queries.Link import get_link
from common.tools.context_queries.Topology import get_topology_details
from common.tools.timestamp.Converters import timestamp_utcnow_to_float
from context.client.ContextClient import ContextClient
from forecaster.Config import FORECAST_TO_HISTORY_RATIO
from forecaster.service.Forecaster import compute_forecast
from forecaster.service.KpiManager import KpiManager

LOGGER = logging.getLogger(__name__)

METRICS_POOL = MetricsPool('Forecaster', 'RPC')

class ForecasterServiceServicerImpl(ForecasterServiceServicer):
    def __init__(self) -> None:
        LOGGER.debug('Creating Servicer...')
        self._kpi_manager = KpiManager()
        LOGGER.debug('Servicer Created')

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def ForecastLinkCapacity(
        self, request : ForecastLinkCapacityRequest, context : grpc.ServicerContext
    ) -> ForecastLinkCapacityReply:
        forecast_window_seconds = request.forecast_window_seconds

        # history_window_seconds indicates the size of the train-set based on the
        # requested size of the test-set and the configured history ratio
        history_window_seconds = FORECAST_TO_HISTORY_RATIO * forecast_window_seconds

        link_id = request.link_id
        link_uuid = link_id.link_uuid.uuid

        context_client = ContextClient()
        link = get_link(context_client, link_uuid)
        if link is None: raise NotFoundException('Link', link_uuid)
        kpi_id_map = self._kpi_manager.get_kpi_ids_from_link_ids([link_id])
        link_uuid__to__kpi_id = {
            _link_uuid : _kpi_id
            for (_link_uuid, _kpi_sample_type), _kpi_id in kpi_id_map.items()
            if _kpi_sample_type == KpiSampleType.KPISAMPLETYPE_LINK_USED_CAPACITY_GBPS
        kpi_id = link_uuid__to__kpi_id[link_uuid]
        end_timestamp   = timestamp_utcnow_to_float()
        start_timestamp = end_timestamp - history_window_seconds
        df_historical_data = self._kpi_manager.get_kpi_id_samples([kpi_id], start_timestamp, end_timestamp)
        forecast_used_capacity_gbps = compute_forecast(df_historical_data, kpi_id)

        reply = ForecastLinkCapacityReply()
        reply.link_id.link_uuid.uuid      = link_uuid
        reply.total_capacity_gbps         = link.attributes.total_capacity_gbps
        reply.current_used_capacity_gbps  = link.attributes.used_capacity_gbps
        reply.forecast_used_capacity_gbps = forecast_used_capacity_gbps
        return reply

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def ForecastTopologyCapacity(
        self, request : ForecastTopologyCapacityRequest, context : grpc.ServicerContext
    ) -> ForecastTopologyCapacityReply:
        forecast_window_seconds = request.forecast_window_seconds

        # history_window_seconds indicates the size of the train-set based on the
        # requested size of the test-set and the configured history ratio
        history_window_seconds = FORECAST_TO_HISTORY_RATIO * forecast_window_seconds
        context_uuid  = request.topology_id.context_id.context_uuid.uuid
        topology_uuid = request.topology_id.topology_uuid.uuid
        context_client = ContextClient()
        topology_details = get_topology_details(context_client, topology_uuid, context_uuid=context_uuid)
        if topology_details is None:
            topology_uuid = '{:s}/{:s}'.format(context_uuid, topology_uuid)
            raise NotFoundException('Topology', topology_uuid)
        link_ids        : List[LinkId]              = list()
        link_capacities : Dict[str, LinkAttributes] = dict()
        for link in topology_details.links:
            link_ids.append(link.link_id)
            link_capacities[link.link_id.link_uuid.uuid] = link.attributes

        kpi_id_map = self._kpi_manager.get_kpi_ids_from_link_ids(link_ids)
        link_uuid__to__kpi_id = {
            _link_id : _kpi_id
            for (_link_id, _kpi_sample_type), _kpi_id in kpi_id_map.items()
            if _kpi_sample_type == KpiSampleType.KPISAMPLETYPE_LINK_USED_CAPACITY_GBPS
        kpi_ids = list(link_uuid__to__kpi_id.values())
        end_timestamp   = timestamp_utcnow_to_float()
        start_timestamp = end_timestamp - history_window_seconds
        df_historical_data = self._kpi_manager.get_kpi_id_samples(kpi_ids, start_timestamp, end_timestamp)

        reply = ForecastTopologyCapacityReply()
        for link_uuid, kpi_id in link_uuid__to__kpi_id.items():
            link_attributes = link_capacities[link_uuid]
            forecast_used_capacity_gbps = compute_forecast(df_historical_data, kpi_id)
            link_capacity : ForecastLinkCapacityReply = reply.link_capacities.add() # pylint: disable=no-member
            link_capacity.link_id.link_uuid.uuid      = link_uuid
            link_capacity.total_capacity_gbps         = link_attributes.total_capacity_gbps
            link_capacity.current_used_capacity_gbps  = link_attributes.used_capacity_gbps
            link_capacity.forecast_used_capacity_gbps = forecast_used_capacity_gbps
        return reply