Newer
Older
# Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
from common.method_wrappers.ServiceExceptions import NotFoundException
from common.proto.context_pb2 import LinkAttributes, LinkId
from common.proto.forecaster_pb2 import (
ForecastLinkCapacityReply, ForecastLinkCapacityRequest,
ForecastTopologyCapacityReply, ForecastTopologyCapacityRequest
)
from common.proto.forecaster_pb2_grpc import ForecasterServiceServicer
from common.proto.kpi_sample_types_pb2 import KpiSampleType
from common.tools.context_queries.Link import get_link
from common.tools.context_queries.Topology import get_topology_details
from common.tools.timestamp.Converters import timestamp_utcnow_to_float
from context.client.ContextClient import ContextClient
from forecaster.Config import FORECAST_TO_HISTORY_RATIO
from forecaster.service.Forecaster import compute_forecast
from forecaster.service.KpiManager import KpiManager
LOGGER = logging.getLogger(__name__)
METRICS_POOL = MetricsPool('Forecaster', 'RPC')
class ForecasterServiceServicerImpl(ForecasterServiceServicer):
def __init__(self) -> None:
LOGGER.debug('Creating Servicer...')
LOGGER.debug('Servicer Created')
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def ForecastLinkCapacity(
self, request : ForecastLinkCapacityRequest, context : grpc.ServicerContext
) -> ForecastLinkCapacityReply:
forecast_window_seconds = request.forecast_window_seconds
# history_window_seconds indicates the size of the train-set based on the
# requested size of the test-set and the configured history ratio
history_window_seconds = FORECAST_TO_HISTORY_RATIO * forecast_window_seconds
link_id = request.link_id
link = get_link(context_client, link_uuid)
if link is None: raise NotFoundException('Link', link_uuid)
kpi_id_map = self._kpi_manager.get_kpi_ids_from_link_ids([link_id])
link_uuid__to__kpi_id = {
_link_uuid : _kpi_id
for (_link_uuid, _kpi_sample_type), _kpi_id in kpi_id_map.items()
if _kpi_sample_type == KpiSampleType.KPISAMPLETYPE_LINK_USED_CAPACITY_GBPS
start_timestamp = end_timestamp - history_window_seconds
df_historical_data = self._kpi_manager.get_kpi_id_samples([kpi_id], start_timestamp, end_timestamp)
forecast_used_capacity_gbps = compute_forecast(df_historical_data, kpi_id)
reply = ForecastLinkCapacityReply()
reply.total_capacity_gbps = link.attributes.total_capacity_gbps
reply.current_used_capacity_gbps = link.attributes.used_capacity_gbps
reply.forecast_used_capacity_gbps = forecast_used_capacity_gbps
return reply
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def ForecastTopologyCapacity(
self, request : ForecastTopologyCapacityRequest, context : grpc.ServicerContext
) -> ForecastTopologyCapacityReply:
forecast_window_seconds = request.forecast_window_seconds
# history_window_seconds indicates the size of the train-set based on the
# requested size of the test-set and the configured history ratio
history_window_seconds = FORECAST_TO_HISTORY_RATIO * forecast_window_seconds
context_uuid = request.topology_id.context_id.context_uuid.uuid
topology_uuid = request.topology_id.topology_uuid.uuid
context_client = ContextClient()
topology_details = get_topology_details(context_client, topology_uuid, context_uuid=context_uuid)
if topology_details is None:
topology_uuid = '{:s}/{:s}'.format(context_uuid, topology_uuid)
raise NotFoundException('Topology', topology_uuid)
link_ids : List[LinkId] = list()
link_capacities : Dict[str, LinkAttributes] = dict()
for link in topology_details.links:
link_ids.append(link.link_id)
link_capacities[link.link_id.link_uuid.uuid] = link.attributes
kpi_id_map = self._kpi_manager.get_kpi_ids_from_link_ids(link_ids)
link_uuid__to__kpi_id = {
_link_id : _kpi_id
for (_link_id, _kpi_sample_type), _kpi_id in kpi_id_map.items()
if _kpi_sample_type == KpiSampleType.KPISAMPLETYPE_LINK_USED_CAPACITY_GBPS
kpi_ids = list(link_uuid__to__kpi_id.values())
start_timestamp = end_timestamp - history_window_seconds
df_historical_data = self._kpi_manager.get_kpi_id_samples(kpi_ids, start_timestamp, end_timestamp)
reply = ForecastTopologyCapacityReply()
for link_uuid, kpi_id in link_uuid__to__kpi_id.items():
link_attributes = link_capacities[link_uuid]
forecast_used_capacity_gbps = compute_forecast(df_historical_data, kpi_id)
link_capacity : ForecastLinkCapacityReply = reply.link_capacities.add() # pylint: disable=no-member
link_capacity.link_id.link_uuid.uuid = link_uuid
link_capacity.total_capacity_gbps = link_attributes.total_capacity_gbps
link_capacity.current_used_capacity_gbps = link_attributes.used_capacity_gbps
link_capacity.forecast_used_capacity_gbps = forecast_used_capacity_gbps
return reply