diff --git a/src/forecaster/service/ForecasterServiceServicerImpl.py b/src/forecaster/service/ForecasterServiceServicerImpl.py
index 8466198a4cea6bca7c3b093349158361d911ab2d..fcc24866532f920cb6feb09109f0ce0f17ed821a 100644
--- a/src/forecaster/service/ForecasterServiceServicerImpl.py
+++ b/src/forecaster/service/ForecasterServiceServicerImpl.py
@@ -12,7 +12,13 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import grpc, logging
+import grpc, logging, numpy, pandas
+from datetime import datetime
+from forecaster.Config import FORECAST_TO_HISTORY_RATIO
+from prophet import Prophet
+from sklearn.ensemble import RandomForestRegressor
+from statistics import mean
+
 from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
 from common.proto.forecaster_pb2 import (
     ForecastLinkCapacityReply, ForecastLinkCapacityRequest,
@@ -26,19 +32,94 @@
 LOGGER = logging.getLogger(__name__)
 
 METRICS_POOL = MetricsPool('Forecaster', 'RPC')
 
+def getTopology(topologyId):
+    if topologyId == "1":
+        df = pandas.read_csv('dataset.csv')
+    elif topologyId == "2":
+        df = pandas.read_csv('dataset2.csv')
+        print(df.columns)
+        #Adapting the dataframe to have the same structure
+        df = df.drop(['Unnamed: 0', 'source', 'target'], axis=1)
+        df.rename(columns = {'id':'linkid'}, inplace = True)
+        df.rename(columns = {'demandValue':'y'}, inplace = True)
+    else:
+        df = pandas.read_csv('./PythonGrpc-main/dataset.csv')
+    return df
+
+def compute_forecast(df : pandas.DataFrame):
+    trainset = df[int(df.shape[0] / 10):df.shape[0]]
+    testset = df[0:int(df.shape[0] / 10)]
+
+    rf = RandomForestRegressor(n_estimators = 600, random_state = 42)
+    rf.fit(trainset.drop(['y'], axis=1), trainset['y'])
+    forecast = rf.predict(testset.drop(['y'], axis=1))
+    average_result = round(mean(testset['y']), 2)
+    return average_result
+
+
 class ForecasterServiceServicerImpl(ForecasterServiceServicer):
     def __init__(self) -> None:
         LOGGER.debug('Creating Servicer...')
         LOGGER.debug('Servicer Created')
 
+    def _forecast_link_capacity(self, forecast_window_seconds : float):
+        # history_window_seconds indicates the size of the train-set based on the
+        # requested size of the test-set and the configured history ratio
+        history_window_seconds = FORECAST_TO_HISTORY_RATIO * forecast_window_seconds
+
+
     @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
     def ForecastLinkCapacity(
         self, request : ForecastLinkCapacityRequest, context : grpc.ServicerContext
     ) -> ForecastLinkCapacityReply:
+        forecast_window_seconds = request.forecast_window_seconds
+        df = getLink(request.link_id)
+        self._forecast_link_capacity(forecast_window_seconds)
         return ForecastLinkCapacityReply()
 
     @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
     def ForecastTopologyCapacity(
         self, request : ForecastTopologyCapacityRequest, context : grpc.ServicerContext
     ) -> ForecastTopologyCapacityReply:
+        forecast_window_seconds = request.forecast_window_seconds
+        df = getTopology(request.topology_id)
+        self._forecast_link_capacity(forecast_window_seconds)
         return ForecastTopologyCapacityReply()
+
+
+
+
+
+class GreeterServicer(greet_pb2_grpc.GreeterServicer):
+    def ComputeTopologyForecast(self, request, context):
+
+        df['ds'] = pandas.to_datetime(df['ds']) - datetime(1970, 1, 1)
+        df['ds'] = df['ds'].dt.total_seconds()
+
+        maximum = df['ds'].max()
+        minimum = maximum - history_window_seconds
+        print(f'The dates of the trainset would have the following range of values: From {minimum} until {maximum} ')
+
+        forecast_reply = greet_pb2.ForecastReply()
+
+        for linkid in df['linkid'].unique()[:10]:
+            print(linkid)
+            newdf = df[df.linkid == linkid ]
+            #converting linkid to float to avoid problems with machine learning, since there is only one option. It is converted to 1.0
+            newdf.loc[:,'linkid'] = 1.0
+
+            trainset = newdf[int(newdf.shape[0] / 10):newdf.shape[0]]
+            testset = newdf[0:int(newdf.shape[0] / 10)]
+
+            rf = RandomForestRegressor(n_estimators = 600, random_state = 42)
+            rf.fit(trainset.drop(['y'], axis=1), trainset['y'])
+            forecast = rf.predict(testset.drop(['y'], axis=1))
+            averageResult = round(mean(testset['y']), 2)
+
+            link_capacity = greet_pb2.LinkCapacity()
+            link_capacity.linkuuid = linkid
+            link_capacity.forecasted_capacity = averageResult
+
+            forecast_reply.LinkCapacityList.append(link_capacity)
+
+        return forecast_reply
diff --git a/src/forecaster/service/TODO.txt b/src/forecaster/service/TODO.txt
new file mode 100644
index 0000000000000000000000000000000000000000..eb3cf3652e62c383a8f9c1e440d4ba1267740940
--- /dev/null
+++ b/src/forecaster/service/TODO.txt
@@ -0,0 +1,51 @@
+Pseudocode for RPC method `ComputeTopologyForecast`:
+```python
+    # Setting to configure the ratio between requested forecast and amount of historical data to be used for the forecast.
+    # E.g., if forecast window is 1 week, compute forecast based on 10 weeks of historical data.
+    FORECAST_TO_HISTORY_RATIO = 10
+
+    history_window_seconds = FORECAST_TO_HISTORY_RATIO * request.forecast_window_seconds
+
+    forecast_reply = ForecastTopologyCapacityReply()
+
+    topology = context_client.GetTopology(topology_id)
+    for link_id in topology.link_ids:
+        link = context_client.GetLink(link_id)
+
+        used_capacity_history_gbps = monitoring_client.GetKPIValue(link_id, KPI.LinkUsedCapacity, window=history_window_seconds)
+        forecast_used_capacity_gbps = compute_forecast(used_capacity_history_gbps, request.forecast_window_seconds)
+
+        forecast_reply.link_capacities.append(ForecastLinkCapacityReply(
+            link_id=link_id,
+            total_capacity_gbps=link.total_capacity_gbps,
+            current_used_capacity_gbps=link.used_capacity_gbps,
+            forecast_used_capacity_gbps=forecast_used_capacity_gbps
+        ))
+
+    return forecast_reply
+```
+
+## PathComp Impact
+After retrieving the topology, if the service has a duration constraint configured, the PathComp component should interrogate the Forecaster and request a topology forecast according to the requested duration of the service. The computed link capacity forecast should be used as link capacity in path computations.
+
+```
+    link_capacities : Dict[str, float] = dict() # link_uuid => available_capacity_gbps
+    service_duration = get_service_duration(request.service)
+    if service_duration is None:
+        topology = context_client.GetTopologyDetails(topology_id)
+        for link in topology.links:
+            link_capacities[link.link_id.uuid] = link.available_capacity_gbps
+    else:
+        forecast_request = ForecastTopology(
+            topology_id = TopologyId(service.context_id, admin),
+            forecast_window_seconds = service_duration
+        )
+        forecast_reply = forecaster_client.ComputeTopologyForecast(forecast_request)
+        for link_capacity in forecast_reply.link_capacities:
+            total_capacity_gbps = link_capacity.total_capacity_gbps
+            forecast_used_capacity_gbps = link_capacity.forecast_used_capacity_gbps
+            forecast_available_capacity_gbps = total_capacity_gbps - forecast_used_capacity_gbps
+            link_capacities[link_capacity.link_id.uuid] = forecast_available_capacity_gbps
+
+    # Continue path computation passing link_capacities to the path comp algorithms.
+```
\ No newline at end of file