Newer
Older
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
from common.proto.forecaster_pb2 import (
ForecastLinkCapacityReply, ForecastLinkCapacityRequest,
ForecastTopologyCapacityReply, ForecastTopologyCapacityRequest
)
from common.proto.forecaster_pb2_grpc import ForecasterServiceServicer
from common.proto.kpi_sample_types_pb2 import KpiSampleType
from common.tools.context_queries.Link import get_link
from common.tools.context_queries.Topology import get_topology_details
from context.client.ContextClient import ContextClient
from forecaster.Config import FORECAST_TO_HISTORY_RATIO
from forecaster.service.Forecaster import compute_forecast
from forecaster.service.KpiManager import KpiManager
from forecaster.service.Tools import time_utc_now_to_float
# Module-level logger, named after this module; also handed to the RPC decorator below.
LOGGER = logging.getLogger(__name__)
# Metrics pool passed to safe_and_metered_rpc_method on each RPC handler of the servicer.
METRICS_POOL = MetricsPool('Forecaster', 'RPC')
class ForecasterServiceServicerImpl(ForecasterServiceServicer):
    """gRPC servicer that forecasts used capacity for links and topologies.

    Both RPCs follow the same pipeline: derive a history window from the
    requested forecast window, look up the "link used capacity" KPI of each
    link through the KPI manager, fetch the historical samples for that
    window, and feed them to ``compute_forecast``.
    """

    def __init__(self) -> None:
        LOGGER.debug('Creating Servicer...')
        # Both RPC handlers read self._kpi_manager, so it has to exist before
        # the first request is served.
        self._kpi_manager = KpiManager()
        LOGGER.debug('Servicer Created')

    def _get_used_capacity_kpi_ids(self, link_ids) -> dict:
        """Map each link id to the KPI id tracking its used capacity.

        Args:
            link_ids: list of link identifiers, forwarded unchanged to
                ``KpiManager.get_kpi_ids_from_link_ids``.

        Returns:
            A dict ``{link_id: kpi_id}`` containing only the entries whose
            sample type is ``KPISAMPLETYPE_LINK_USED_CAPACITY_GBPS``.
        """
        kpi_id_map = self._kpi_manager.get_kpi_ids_from_link_ids(link_ids)
        return {
            link_id: kpi_id
            for (link_id, kpi_sample_type), kpi_id in kpi_id_map.items()
            if kpi_sample_type == KpiSampleType.KPISAMPLETYPE_LINK_USED_CAPACITY_GBPS
        }

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def ForecastLinkCapacity(
        self, request : ForecastLinkCapacityRequest, context : grpc.ServicerContext
    ) -> ForecastLinkCapacityReply:
        """Forecast the used capacity of a single link.

        Args:
            request: carries ``link_id`` and ``forecast_window_seconds``.
            context: gRPC servicer context (not used by this handler).

        Returns:
            A ``ForecastLinkCapacityReply`` with the link id, the total and
            currently used capacity read from the link attributes, and the
            forecast used capacity.

        Raises:
            KeyError: inside the handler when no used-capacity KPI is found
                for the link. How this surfaces to the gRPC client depends on
                ``safe_and_metered_rpc_method`` -- TODO confirm.
        """
        forecast_window_seconds = request.forecast_window_seconds

        # history_window_seconds indicates the size of the train-set based on the
        # requested size of the test-set and the configured history ratio
        history_window_seconds = FORECAST_TO_HISTORY_RATIO * forecast_window_seconds

        link_id = request.link_id
        link_id_to_kpi_id = self._get_used_capacity_kpi_ids([link_id])
        kpi_id = link_id_to_kpi_id[link_id]

        # The training window ends "now" and spans history_window_seconds backwards.
        end_timestamp = time_utc_now_to_float()
        start_timestamp = end_timestamp - history_window_seconds
        df_historical_data = self._kpi_manager.get_kpi_id_samples(
            [kpi_id], start_timestamp, end_timestamp
        )
        forecast_used_capacity_gbps = compute_forecast(df_historical_data, kpi_id)

        # Total/current capacity come from the link as stored in Context.
        context_client = ContextClient()
        link = get_link(context_client, link_id.link_uuid.uuid)

        reply = ForecastLinkCapacityReply()
        reply.link_id.CopyFrom(link_id)
        reply.total_capacity_gbps = link.attributes.total_capacity_gbps
        reply.current_used_capacity_gbps = link.attributes.used_capacity_gbps
        reply.forecast_used_capacity_gbps = forecast_used_capacity_gbps
        return reply

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def ForecastTopologyCapacity(
        self, request : ForecastTopologyCapacityRequest, context : grpc.ServicerContext
    ) -> ForecastTopologyCapacityReply:
        """Forecast the used capacity of every link of a topology.

        Args:
            request: carries ``topology_id`` and ``forecast_window_seconds``.
            context: gRPC servicer context (not used by this handler).

        Returns:
            A ``ForecastTopologyCapacityReply`` with one ``link_capacities``
            entry per link that has a used-capacity KPI; links without such
            a KPI are omitted.
        """
        forecast_window_seconds = request.forecast_window_seconds

        # history_window_seconds indicates the size of the train-set based on the
        # requested size of the test-set and the configured history ratio
        history_window_seconds = FORECAST_TO_HISTORY_RATIO * forecast_window_seconds

        context_uuid = request.topology_id.context_id.context_uuid.uuid
        topology_uuid = request.topology_id.topology_uuid.uuid
        context_client = ContextClient()
        topology_details = get_topology_details(
            context_client, topology_uuid, context_uuid=context_uuid
        )

        # NOTE(review): link.link_id is a protobuf message and is used as a
        # dict key here; Python protobuf messages are normally unhashable, so
        # this may raise TypeError. Keying by link_id.link_uuid.uuid would
        # avoid it, but that must match the key type returned by
        # KpiManager.get_kpi_ids_from_link_ids -- confirm before changing.
        link_ids = list()
        link_capacities = dict()
        for link in topology_details.links:
            link_ids.append(link.link_id)
            link_capacities[link.link_id] = link.attributes

        link_id_to_kpi_id = self._get_used_capacity_kpi_ids(link_ids)

        # The mapping is link_id -> kpi_id, so the KPI ids to fetch samples
        # for are its values (the keys are link ids).
        kpi_ids = list(link_id_to_kpi_id.values())

        # The training window ends "now" and spans history_window_seconds backwards.
        end_timestamp = time_utc_now_to_float()
        start_timestamp = end_timestamp - history_window_seconds
        df_historical_data = self._kpi_manager.get_kpi_id_samples(
            kpi_ids, start_timestamp, end_timestamp
        )

        reply = ForecastTopologyCapacityReply()
        for link_id, kpi_id in link_id_to_kpi_id.items():
            link_attributes = link_capacities[link_id]
            forecast_used_capacity_gbps = compute_forecast(df_historical_data, kpi_id)

            link_capacity : ForecastLinkCapacityReply = reply.link_capacities.add()
            link_capacity.link_id.CopyFrom(link_id)
            link_capacity.total_capacity_gbps = link_attributes.total_capacity_gbps
            link_capacity.current_used_capacity_gbps = link_attributes.used_capacity_gbps
            link_capacity.forecast_used_capacity_gbps = forecast_used_capacity_gbps
        return reply