# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import grpc, logging
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
from common.proto.forecaster_pb2 import (
    ForecastLinkCapacityReply, ForecastLinkCapacityRequest,
    ForecastTopologyCapacityReply, ForecastTopologyCapacityRequest
)
from common.proto.forecaster_pb2_grpc import ForecasterServiceServicer
from common.proto.kpi_sample_types_pb2 import KpiSampleType
from common.tools.context_queries.Link import get_link
from common.tools.context_queries.Topology import get_topology_details
from context.client.ContextClient import ContextClient
from forecaster.Config import FORECAST_TO_HISTORY_RATIO
from forecaster.service.Forecaster import compute_forecast
from forecaster.service.KpiManager import KpiManager
from forecaster.service.Tools import time_utc_now_to_float

LOGGER = logging.getLogger(__name__)

METRICS_POOL = MetricsPool('Forecaster', 'RPC')

class ForecasterServiceServicerImpl(ForecasterServiceServicer):
    """Implementation of the Forecaster gRPC service.

    Exposes two RPCs that report, per link, the total capacity, the currently
    used capacity and a forecast of the used capacity. The forecast is the
    value returned by ``compute_forecast`` over KPI samples fetched through
    the ``KpiManager``.
    """

    def __init__(self) -> None:
        """Create the servicer and the KpiManager it uses to query KPIs."""
        LOGGER.debug('Creating Servicer...')
        self._kpi_manager = KpiManager()
        LOGGER.debug('Servicer Created')

    def _get_used_capacity_kpi_ids(self, link_ids) -> dict:
        """Map each link id to the id of its used-capacity KPI.

        Queries the KpiManager for the KPIs of ``link_ids`` and keeps only the
        entries whose sample type is KPISAMPLETYPE_LINK_USED_CAPACITY_GBPS.

        Args:
            link_ids: link identifiers, forwarded untouched to
                ``KpiManager.get_kpi_ids_from_link_ids``.

        Returns:
            A dict ``{link_id: kpi_id}``; links without a used-capacity KPI in
            the KpiManager reply are absent.
        """
        kpi_id_map = self._kpi_manager.get_kpi_ids_from_link_ids(link_ids)
        return {
            link_id : kpi_id
            for (link_id, kpi_sample_type), kpi_id in kpi_id_map.items()
            if kpi_sample_type == KpiSampleType.KPISAMPLETYPE_LINK_USED_CAPACITY_GBPS
        }

    @staticmethod
    def _get_history_window(forecast_window_seconds) -> tuple:
        """Return ``(start_timestamp, end_timestamp)`` of the history window.

        The window ends at the value returned by ``time_utc_now_to_float()``
        and spans ``FORECAST_TO_HISTORY_RATIO * forecast_window_seconds``.
        """
        # history_window_seconds indicates the size of the train-set based on the
        # requested size of the test-set and the configured history ratio
        history_window_seconds = FORECAST_TO_HISTORY_RATIO * forecast_window_seconds
        end_timestamp = time_utc_now_to_float()
        start_timestamp = end_timestamp - history_window_seconds
        return start_timestamp, end_timestamp

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def ForecastLinkCapacity(
        self, request : ForecastLinkCapacityRequest, context : grpc.ServicerContext
    ) -> ForecastLinkCapacityReply:
        """Forecast the used capacity of a single link.

        Args:
            request: carries ``link_id`` and ``forecast_window_seconds``.
            context: gRPC servicer context (not used directly here).

        Returns:
            A reply with the link id, the total and current used capacity read
            from the link attributes, and the forecast used capacity.

        Raises:
            KeyError: if no used-capacity KPI is found for the requested link.
        """
        forecast_window_seconds = request.forecast_window_seconds

        link_id = request.link_id
        link_id_to_kpi_id = self._get_used_capacity_kpi_ids([link_id])
        # NOTE(review): link_id is a protobuf message used as a dict key here;
        # protobuf messages are normally unhashable. Confirm the key type that
        # KpiManager.get_kpi_ids_from_link_ids returns (e.g. link UUID strings).
        kpi_id = link_id_to_kpi_id[link_id]

        start_timestamp, end_timestamp = self._get_history_window(forecast_window_seconds)
        df_historical_data = self._kpi_manager.get_kpi_id_samples(
            [kpi_id], start_timestamp, end_timestamp
        )
        forecast_used_capacity_gbps = compute_forecast(df_historical_data, kpi_id)

        context_client = ContextClient()
        link = get_link(context_client, link_id.link_uuid.uuid)

        reply = ForecastLinkCapacityReply()
        reply.link_id.CopyFrom(link_id)
        reply.total_capacity_gbps         = link.attributes.total_capacity_gbps
        reply.current_used_capacity_gbps  = link.attributes.used_capacity_gbps
        reply.forecast_used_capacity_gbps = forecast_used_capacity_gbps
        return reply

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def ForecastTopologyCapacity(
        self, request : ForecastTopologyCapacityRequest, context : grpc.ServicerContext
    ) -> ForecastTopologyCapacityReply:
        """Forecast the used capacity of every link of a topology.

        Args:
            request: carries ``topology_id`` and ``forecast_window_seconds``.
            context: gRPC servicer context (not used directly here).

        Returns:
            A reply with one ``link_capacities`` entry per link that has a
            used-capacity KPI; links without such a KPI are not reported.
        """
        forecast_window_seconds = request.forecast_window_seconds

        context_uuid = request.topology_id.context_id.context_uuid.uuid
        topology_uuid = request.topology_id.topology_uuid.uuid
        context_client = ContextClient()
        topology_details = get_topology_details(context_client, topology_uuid, context_uuid=context_uuid)

        # NOTE(review): link.link_id is a protobuf message used as a dict key;
        # protobuf messages are normally unhashable. Confirm against the key
        # type used by KpiManager.get_kpi_ids_from_link_ids.
        link_ids = list()
        link_capacities = dict()
        for link in topology_details.links:
            link_ids.append(link.link_id)
            link_capacities[link.link_id] = link.attributes

        link_id_to_kpi_id = self._get_used_capacity_kpi_ids(link_ids)

        # The samples must be requested by KPI id, i.e. the values of the
        # link-id -> kpi-id map (its keys are link ids).
        kpi_ids = list(link_id_to_kpi_id.values())

        start_timestamp, end_timestamp = self._get_history_window(forecast_window_seconds)
        df_historical_data = self._kpi_manager.get_kpi_id_samples(kpi_ids, start_timestamp, end_timestamp)

        reply = ForecastTopologyCapacityReply()
        for link_id, kpi_id in link_id_to_kpi_id.items():
            link_attributes = link_capacities[link_id]
            forecast_used_capacity_gbps = compute_forecast(df_historical_data, kpi_id)

            link_capacity : ForecastLinkCapacityReply = reply.link_capacities.add()
            link_capacity.link_id.CopyFrom(link_id)
            link_capacity.total_capacity_gbps         = link_attributes.total_capacity_gbps
            link_capacity.current_used_capacity_gbps  = link_attributes.used_capacity_gbps
            link_capacity.forecast_used_capacity_gbps = forecast_used_capacity_gbps
        return reply