diff --git a/src/forecaster/requirements.in b/src/forecaster/requirements.in index 38d04994fb0fa1951fb465bc127eb72659dc2eaf..3ed37c5998a550427f987d881e5ce4455b5e1649 100644 --- a/src/forecaster/requirements.in +++ b/src/forecaster/requirements.in @@ -11,3 +11,8 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + +#numpy==1.23.* +pandas==1.5.* +#prophet==1.1.* +scikit-learn==1.1.* diff --git a/src/forecaster/service/Forecaster.py b/src/forecaster/service/Forecaster.py new file mode 100644 index 0000000000000000000000000000000000000000..16b6d02da693665bcb1509a70e24d7d7f224554d --- /dev/null +++ b/src/forecaster/service/Forecaster.py @@ -0,0 +1,38 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
import math, pandas
from statistics import mean
from sklearn.ensemble import RandomForestRegressor
from common.proto.monitoring_pb2 import KpiId
from forecaster.Config import FORECAST_TO_HISTORY_RATIO

def compute_forecast(samples : pandas.DataFrame, kpi_id : KpiId) -> float:
    """Forecast the average value of one KPI from a table of samples.

    The rows belonging to `kpi_id` are split, in row order, into a leading
    train-set and a trailing test-set. The test-set holds
    ceil(num_samples / FORECAST_TO_HISTORY_RATIO) rows. A random forest is
    fitted on the train-set and the mean of its predictions over the test-set
    is returned.

    Args:
        samples: table with the columns 'kpi_id' and 'value'; every other
            column is used as a feature of the model.
            NOTE(review): rows are presumably sorted by time by the caller.
        kpi_id: identifier of the KPI to forecast; rows of other KPIs are ignored.

    Returns:
        Mean of the predicted values, rounded to 2 decimals.

    Raises:
        Exception: if the train-set or the test-set would be empty.
    """
    kpi_uuid = kpi_id.kpi_id.uuid
    samples = samples[samples.kpi_id == kpi_uuid]

    num_samples = samples.shape[0]
    num_samples_test = math.ceil(num_samples / FORECAST_TO_HISTORY_RATIO)
    num_samples_train = num_samples - num_samples_test
    if num_samples_train <= 0: raise Exception('Wrong number of train samples: {:d}'.format(num_samples_train))
    if num_samples_test  <= 0: raise Exception('Wrong number of test samples: {:d}'.format(num_samples_test ))

    # Build the feature table once so that training and prediction always use
    # exactly the same columns ('kpi_id' is an identifier, 'value' is the target).
    features = samples.drop(['kpi_id', 'value'], axis=1)
    targets = samples['value']

    # The regressor works on numeric features; express datetime columns as
    # seconds since the UNIX epoch (other columns are left untouched).
    for column_name in features.columns:
        column = features[column_name]
        if not pandas.api.types.is_datetime64_any_dtype(column): continue
        epoch = pandas.Timestamp(0, unit='s', tz=None if column.dt.tz is None else 'UTC')
        features[column_name] = (column - epoch).dt.total_seconds()

    train_features = features.iloc[:num_samples_train]
    train_targets  = targets.iloc[:num_samples_train]
    test_features  = features.iloc[num_samples_train:num_samples]

    rfr = RandomForestRegressor(n_estimators=600, random_state=42)
    rfr.fit(train_features, train_targets)
    forecast = rfr.predict(test_features)
    avg_forecast = round(mean(forecast), 2)
    return avg_forecast
-import grpc, logging, numpy, pandas -from datetime import datetime -from forecaster.Config import FORECAST_TO_HISTORY_RATIO -from prophet import Prophet -from sklearn.ensemble import RandomForestRegressor -from statistics import mean - +import grpc, logging from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method from common.proto.forecaster_pb2 import ( ForecastLinkCapacityReply, ForecastLinkCapacityRequest, ForecastTopologyCapacityReply, ForecastTopologyCapacityRequest ) from common.proto.forecaster_pb2_grpc import ForecasterServiceServicer -from common.tools.grpc.Tools import grpc_message_to_json_string +from common.proto.kpi_sample_types_pb2 import KpiSampleType +from common.tools.context_queries.Link import get_link +from common.tools.context_queries.Topology import get_topology_details from context.client.ContextClient import ContextClient +from forecaster.Config import FORECAST_TO_HISTORY_RATIO +from forecaster.service.Forecaster import compute_forecast +from forecaster.service.KpiManager import KpiManager +from forecaster.service.Tools import time_utc_now_to_float LOGGER = logging.getLogger(__name__) METRICS_POOL = MetricsPool('Forecaster', 'RPC') -def getTopology(topologyId): - if topologyId == "1": - df = pandas.read_csv('dataset.csv') - elif topologyId == "2": - df = pandas.read_csv('dataset2.csv') - print(df.columns) - #Adapting the dataframe to have the same structure - df = df.drop(['Unnamed: 0', 'source', 'target'], axis=1) - df.rename(columns = {'id':'linkid'}, inplace = True) - df.rename(columns = {'demandValue':'y'}, inplace = True) - else: - df = pandas.read_csv('./PythonGrpc-main/dataset.csv') - return df - -def compute_forecast(df : pandas.DataFrame): - trainset = df[int(df.shape[0] / 10):df.shape[0]] - testset = df[0:int(df.shape[0] / 10)] - - rf = RandomForestRegressor(n_estimators = 600, random_state = 42) - rf.fit(trainset.drop(['y'], axis=1), trainset['y']) - forecast = rf.predict(testset.drop(['y'], axis=1)) 
class ForecasterServiceServicerImpl(ForecasterServiceServicer):
    """gRPC servicer that forecasts the used capacity of links and topologies."""

    def __init__(self) -> None:
        LOGGER.debug('Creating Servicer...')
        self._kpi_manager = KpiManager()
        LOGGER.debug('Servicer Created')

    @staticmethod
    def _get_link_uuid(link_key) -> str:
        """Return the link UUID for a key that is either the UUID string or a LinkId message."""
        return link_key if isinstance(link_key, str) else link_key.link_uuid.uuid

    def _get_used_capacity_kpi_ids(self, link_ids : list) -> dict:
        """Map link UUID => KpiId of the link's used-capacity KPI, for the given LinkIds."""
        kpi_id_map = self._kpi_manager.get_kpi_ids_from_link_ids(link_ids)
        # Key by UUID string: protobuf messages cannot be used as dictionary keys.
        return {
            self._get_link_uuid(link_key) : kpi_id
            for (link_key, kpi_sample_type), kpi_id in kpi_id_map.items()
            if kpi_sample_type == KpiSampleType.KPISAMPLETYPE_LINK_USED_CAPACITY_GBPS
        }

    def _get_historical_data(self, kpi_ids : list, forecast_window_seconds : float):
        """Retrieve the samples of `kpi_ids` for the history window ending now."""
        # history_window_seconds indicates the size of the train-set based on the
        # requested size of the test-set and the configured history ratio
        history_window_seconds = FORECAST_TO_HISTORY_RATIO * forecast_window_seconds
        end_timestamp = time_utc_now_to_float()
        start_timestamp = end_timestamp - history_window_seconds
        return self._kpi_manager.get_kpi_id_samples(kpi_ids, start_timestamp, end_timestamp)

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def ForecastLinkCapacity(
        self, request : ForecastLinkCapacityRequest, context : grpc.ServicerContext
    ) -> ForecastLinkCapacityReply:
        """Forecast the used capacity of the link identified by `request.link_id`.

        Raises:
            KeyError: if no used-capacity KPI is found for the link.
        """
        forecast_window_seconds = request.forecast_window_seconds

        link_id = request.link_id
        link_uuid = link_id.link_uuid.uuid
        kpi_id_per_link_uuid = self._get_used_capacity_kpi_ids([link_id])
        if link_uuid not in kpi_id_per_link_uuid:
            raise KeyError('No used-capacity KPI found for Link({:s})'.format(str(link_uuid)))
        kpi_id = kpi_id_per_link_uuid[link_uuid]

        df_historical_data = self._get_historical_data([kpi_id], forecast_window_seconds)
        forecast_used_capacity_gbps = compute_forecast(df_historical_data, kpi_id)

        context_client = ContextClient()
        link = get_link(context_client, link_uuid)

        reply = ForecastLinkCapacityReply()
        reply.link_id.CopyFrom(link_id)
        reply.total_capacity_gbps         = link.attributes.total_capacity_gbps
        reply.current_used_capacity_gbps  = link.attributes.used_capacity_gbps
        reply.forecast_used_capacity_gbps = forecast_used_capacity_gbps
        return reply

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def ForecastTopologyCapacity(
        self, request : ForecastTopologyCapacityRequest, context : grpc.ServicerContext
    ) -> ForecastTopologyCapacityReply:
        """Forecast the used capacity of every link of `request.topology_id` that has a used-capacity KPI."""
        forecast_window_seconds = request.forecast_window_seconds

        context_uuid = request.topology_id.context_id.context_uuid.uuid
        topology_uuid = request.topology_id.topology_uuid.uuid
        context_client = ContextClient()
        topology_details = get_topology_details(context_client, topology_uuid, context_uuid=context_uuid)

        links_by_uuid = {link.link_id.link_uuid.uuid : link for link in topology_details.links}
        link_ids = [link.link_id for link in topology_details.links]

        kpi_id_per_link_uuid = self._get_used_capacity_kpi_ids(link_ids)
        reply = ForecastTopologyCapacityReply()
        # Nothing to forecast: do not issue a query with an empty list of KPIs.
        if not kpi_id_per_link_uuid: return reply

        # The samples are requested by KPI id (the dictionary values), not by link.
        kpi_ids = list(kpi_id_per_link_uuid.values())
        df_historical_data = self._get_historical_data(kpi_ids, forecast_window_seconds)

        for link_uuid, kpi_id in kpi_id_per_link_uuid.items():
            link = links_by_uuid[link_uuid]
            forecast_used_capacity_gbps = compute_forecast(df_historical_data, kpi_id)
            link_capacity : ForecastLinkCapacityReply = reply.link_capacities.add()
            link_capacity.link_id.CopyFrom(link.link_id)
            link_capacity.total_capacity_gbps         = link.attributes.total_capacity_gbps
            link_capacity.current_used_capacity_gbps  = link.attributes.used_capacity_gbps
            link_capacity.forecast_used_capacity_gbps = forecast_used_capacity_gbps
        return reply
import pandas
from typing import Dict, List, Tuple
from common.proto.context_pb2 import Empty, LinkId
from common.proto.kpi_sample_types_pb2 import KpiSampleType
from common.proto.monitoring_pb2 import KpiId, KpiQuery
from monitoring.client.MonitoringClient import MonitoringClient

class KpiManager:
    """Helper to look up KPIs and retrieve their samples through the Monitoring client."""

    def __init__(self) -> None:
        self._monitoring_client = MonitoringClient()

    def get_kpi_ids_from_link_ids(
        self, link_ids : List[LinkId]
    ) -> Dict[Tuple[str, int], KpiId]:
        """Find the KPIs attached to the given links.

        Args:
            link_ids: identifiers of the links of interest.

        Returns:
            Mapping (link UUID, KPI sample type) => KpiId for every KPI descriptor
            whose link is in `link_ids`. The link is identified by its UUID string
            because protobuf messages cannot be used as dictionary keys.
        """
        link_uuids = {link_id.link_uuid.uuid for link_id in link_ids}
        reply = self._monitoring_client.GetKpiDescriptorList(Empty())
        # NOTE(review): the reply is expected to carry the descriptors in its
        # repeated field 'kpi_descriptor_list'; fall back to iterating the reply
        # itself if that field does not exist -- confirm against monitoring.proto.
        kpi_descriptors = getattr(reply, 'kpi_descriptor_list', reply)
        kpi_ids : Dict[Tuple[str, int], KpiId] = {
            (kpi_descriptor.link_id.link_uuid.uuid, kpi_descriptor.kpi_sample_type) : kpi_descriptor.kpi_id
            for kpi_descriptor in kpi_descriptors
            if kpi_descriptor.link_id.link_uuid.uuid in link_uuids
        }
        return kpi_ids

    def get_kpi_id_samples(
        self, kpi_ids : List[KpiId], start_timestamp : float, end_timestamp : float
    ) -> pandas.DataFrame:
        """Retrieve the samples of some KPIs within a time interval.

        Args:
            kpi_ids: identifiers of the KPIs to query.
            start_timestamp: beginning of the interval, as set in the query.
            end_timestamp: end of the interval, as set in the query.

        Returns:
            Table with the columns 'timestamp' (UTC datetime, truncated to whole
            seconds), 'kpi_id' (KPI UUID) and 'value' (float), sorted by ascending
            timestamp.
        """
        kpi_query = KpiQuery()
        kpi_query.kpi_ids.extend(kpi_ids)                       # pylint: disable=no-member
        kpi_query.start_timestamp.timestamp = start_timestamp   # pylint: disable=no-member
        kpi_query.end_timestamp.timestamp   = end_timestamp     # pylint: disable=no-member
        raw_kpi_table = self._monitoring_client.QueryKpiData(kpi_query)

        data : List[Tuple[float, str, float]] = list()
        for raw_kpi_list in raw_kpi_table.raw_kpi_lists:
            kpi_uuid = raw_kpi_list.kpi_id.kpi_id.uuid
            for raw_kpi in raw_kpi_list.raw_kpis:
                value_field = raw_kpi.kpi_value.WhichOneof('value')
                # A sample without any value set cannot be converted; skip it
                if value_field is None: continue
                timestamp = raw_kpi.timestamp.timestamp
                value = float(getattr(raw_kpi.kpi_value, value_field))
                data.append((timestamp, kpi_uuid, value))

        df = pandas.DataFrame(data, columns=['timestamp', 'kpi_id', 'value'])
        df['timestamp'] = pandas.to_datetime(df['timestamp'].astype('int'), unit='s', utc=True)
        df.sort_values('timestamp', ascending=True, inplace=True)
        return df
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import calendar
from datetime import datetime, timezone

def time_datetime_to_int(dt_time : datetime) -> int:
    """Return the UNIX timestamp of `dt_time` in whole seconds.

    Aware datetimes are converted to UTC first, so any timezone yields the
    right instant; naive datetimes are interpreted as already being in UTC.
    """
    # utctimetuple() normalizes aware datetimes to UTC; timetuple() would keep
    # the wall-clock fields of the datetime's own timezone.
    return int(calendar.timegm(dt_time.utctimetuple()))

def time_datetime_to_float(dt_time : datetime) -> float:
    """Return the UNIX timestamp of `dt_time` in seconds, including microseconds."""
    return time_datetime_to_int(dt_time) + (dt_time.microsecond / 1.e6)

def time_utc_now_to_datetime() -> datetime:
    """Return the current time as a timezone-aware UTC datetime."""
    return datetime.now(tz=timezone.utc)

def time_utc_now_to_float() -> float:
    """Return the current UTC time as a UNIX timestamp in seconds."""
    return time_datetime_to_float(time_utc_now_to_datetime())