Skip to content
Snippets Groups Projects
ForecasterServiceServicerImpl.py 6.15 KiB
Newer Older
  • Learn to ignore specific revisions
  • # Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
    
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #      http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    
    from typing import Dict, List
    
    import grpc, logging
    
    from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
    
    from common.method_wrappers.ServiceExceptions import NotFoundException
    from common.proto.context_pb2 import LinkAttributes, LinkId
    
    from common.proto.forecaster_pb2 import (
        ForecastLinkCapacityReply, ForecastLinkCapacityRequest,
        ForecastTopologyCapacityReply, ForecastTopologyCapacityRequest
    )
    from common.proto.forecaster_pb2_grpc import ForecasterServiceServicer
    
    from common.proto.kpi_sample_types_pb2 import KpiSampleType
    from common.tools.context_queries.Link import get_link
    from common.tools.context_queries.Topology import get_topology_details
    
    from common.tools.timestamp.Converters import timestamp_utcnow_to_float
    
    from context.client.ContextClient import ContextClient
    
    from forecaster.Config import FORECAST_TO_HISTORY_RATIO
    from forecaster.service.Forecaster import compute_forecast
    from forecaster.service.KpiManager import KpiManager
    
    
    LOGGER = logging.getLogger(__name__)
    
    METRICS_POOL = MetricsPool('Forecaster', 'RPC')
    
    class ForecasterServiceServicerImpl(ForecasterServiceServicer):
        def __init__(self) -> None:
            LOGGER.debug('Creating Servicer...')
    
            self._kpi_manager = KpiManager()
    
            LOGGER.debug('Servicer Created')
    
        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def ForecastLinkCapacity(
            self, request : ForecastLinkCapacityRequest, context : grpc.ServicerContext
        ) -> ForecastLinkCapacityReply:
    
            forecast_window_seconds = request.forecast_window_seconds
    
    
            # history_window_seconds indicates the size of the train-set based on the
            # requested size of the test-set and the configured history ratio
            history_window_seconds = FORECAST_TO_HISTORY_RATIO * forecast_window_seconds
    
            link_id = request.link_id
    
            link_uuid = link_id.link_uuid.uuid
    
    
            context_client = ContextClient()
    
            link = get_link(context_client, link_uuid)
            if link is None: raise NotFoundException('Link', link_uuid)
    
            kpi_id_map = self._kpi_manager.get_kpi_ids_from_link_ids([link_id])
    
            link_uuid__to__kpi_id = {
                _link_uuid : _kpi_id
                for (_link_uuid, _kpi_sample_type), _kpi_id in kpi_id_map.items()
                if _kpi_sample_type == KpiSampleType.KPISAMPLETYPE_LINK_USED_CAPACITY_GBPS
    
            kpi_id = link_uuid__to__kpi_id[link_uuid]
    
            end_timestamp   = timestamp_utcnow_to_float()
    
            start_timestamp = end_timestamp - history_window_seconds
            df_historical_data = self._kpi_manager.get_kpi_id_samples([kpi_id], start_timestamp, end_timestamp)
            forecast_used_capacity_gbps = compute_forecast(df_historical_data, kpi_id)
    
            reply = ForecastLinkCapacityReply()
    
            reply.link_id.link_uuid.uuid      = link_uuid
    
            reply.total_capacity_gbps         = link.attributes.total_capacity_gbps
            reply.current_used_capacity_gbps  = link.attributes.used_capacity_gbps
            reply.forecast_used_capacity_gbps = forecast_used_capacity_gbps
            return reply
    
    
        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def ForecastTopologyCapacity(
            self, request : ForecastTopologyCapacityRequest, context : grpc.ServicerContext
        ) -> ForecastTopologyCapacityReply:
    
            forecast_window_seconds = request.forecast_window_seconds
    
    
            # history_window_seconds indicates the size of the train-set based on the
            # requested size of the test-set and the configured history ratio
            history_window_seconds = FORECAST_TO_HISTORY_RATIO * forecast_window_seconds
    
            context_uuid  = request.topology_id.context_id.context_uuid.uuid
            topology_uuid = request.topology_id.topology_uuid.uuid
            context_client = ContextClient()
            topology_details = get_topology_details(context_client, topology_uuid, context_uuid=context_uuid)
    
            if topology_details is None:
                topology_uuid = '{:s}/{:s}'.format(context_uuid, topology_uuid)
                raise NotFoundException('Topology', topology_uuid)
    
            link_ids        : List[LinkId]              = list()
            link_capacities : Dict[str, LinkAttributes] = dict()
    
            for link in topology_details.links:
                link_ids.append(link.link_id)
    
                link_capacities[link.link_id.link_uuid.uuid] = link.attributes
    
    
            kpi_id_map = self._kpi_manager.get_kpi_ids_from_link_ids(link_ids)
    
            link_uuid__to__kpi_id = {
                _link_id : _kpi_id
                for (_link_id, _kpi_sample_type), _kpi_id in kpi_id_map.items()
                if _kpi_sample_type == KpiSampleType.KPISAMPLETYPE_LINK_USED_CAPACITY_GBPS
    
            kpi_ids = list(link_uuid__to__kpi_id.values())
    
            end_timestamp   = timestamp_utcnow_to_float()
    
            start_timestamp = end_timestamp - history_window_seconds
            df_historical_data = self._kpi_manager.get_kpi_id_samples(kpi_ids, start_timestamp, end_timestamp)
    
            reply = ForecastTopologyCapacityReply()
    
            for link_uuid, kpi_id in link_uuid__to__kpi_id.items():
                link_attributes = link_capacities[link_uuid]
    
                forecast_used_capacity_gbps = compute_forecast(df_historical_data, kpi_id)
    
                link_capacity : ForecastLinkCapacityReply = reply.link_capacities.add() # pylint: disable=no-member
    
                link_capacity.link_id.link_uuid.uuid      = link_uuid
    
                link_capacity.total_capacity_gbps         = link_attributes.total_capacity_gbps
                link_capacity.current_used_capacity_gbps  = link_attributes.used_capacity_gbps
                link_capacity.forecast_used_capacity_gbps = forecast_used_capacity_gbps
            return reply