Skip to content
ForecasterServiceServicerImpl.py 4.96 KiB
Newer Older
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
import grpc, logging, numpy, pandas
from datetime import datetime
from forecaster.Config import FORECAST_TO_HISTORY_RATIO
from prophet import Prophet
from sklearn.ensemble import RandomForestRegressor
from statistics import mean 

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
from common.proto.forecaster_pb2 import (
    ForecastLinkCapacityReply, ForecastLinkCapacityRequest,
    ForecastTopologyCapacityReply, ForecastTopologyCapacityRequest
)
from common.proto.forecaster_pb2_grpc import ForecasterServiceServicer
from common.tools.grpc.Tools import grpc_message_to_json_string
from context.client.ContextClient import ContextClient

LOGGER = logging.getLogger(__name__)

METRICS_POOL = MetricsPool('Forecaster', 'RPC')

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
def getTopology(topologyId):
    """Load the sample dataset associated with a topology identifier.

    All paths are relative to the current working directory.

    Args:
        topologyId: Dataset selector. ``"1"`` reads ``dataset.csv``; ``"2"``
            reads ``dataset2.csv`` and reshapes it; any other value reads
            ``./PythonGrpc-main/dataset.csv``.

    Returns:
        pandas.DataFrame: The loaded samples. For ``"2"`` the columns
        ``Unnamed: 0``, ``source`` and ``target`` are removed (when present)
        and ``id`` / ``demandValue`` are renamed to ``linkid`` / ``y``.

    Raises:
        FileNotFoundError: Propagated from ``pandas.read_csv`` when the
            selected file does not exist.
    """
    if topologyId == "1":
        return pandas.read_csv('dataset.csv')
    if topologyId != "2":
        return pandas.read_csv('./PythonGrpc-main/dataset.csv')

    df = pandas.read_csv('dataset2.csv')
    # Diagnostic output goes through logging instead of stdout so it can be
    # filtered by level like the rest of the service's messages.
    logging.getLogger(__name__).debug('dataset2.csv columns: %s', list(df.columns))

    # Adapting the dataframe to have the same structure.
    # errors='ignore' keeps this working when the CSV was written without its
    # index column, in which case there is no 'Unnamed: 0' column to drop.
    df = df.drop(columns=['Unnamed: 0', 'source', 'target'], errors='ignore')
    return df.rename(columns={'id': 'linkid', 'demandValue': 'y'})

def compute_forecast(df : pandas.DataFrame) -> float:
    """Fit a random-forest regressor on ``df`` and return the averaged result.

    The first tenth of the rows is held out as the test-set and the remaining
    rows form the train-set. Column ``y`` is the regression target; every
    other column is used as a feature.

    Args:
        df: Samples containing a ``y`` column plus feature columns.

    Returns:
        The mean of the test-set ``y`` values, rounded to two decimals.
        Previously this value was computed and then discarded, so the
        function always returned ``None``.
    """
    holdout_rows = int(df.shape[0] / 10)
    testset = df[:holdout_rows]
    trainset = df[holdout_rows:]

    rf = RandomForestRegressor(n_estimators = 600, random_state = 42)
    rf.fit(trainset.drop(['y'], axis=1), trainset['y'])
    # NOTE(review): the prediction is computed but the returned average is
    # taken over the observed test-set values, not over `forecast` - confirm
    # which of the two is the intended result.
    forecast = rf.predict(testset.drop(['y'], axis=1))
    return round(mean(testset['y']), 2)


Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
class ForecasterServiceServicerImpl(ForecasterServiceServicer):
    """gRPC servicer implementing the Forecaster service RPCs.

    Both RPC handlers are still skeletons: they read the forecast window from
    the request, load a dataset, derive the history window and return an
    empty reply message.
    """

    def __init__(self) -> None:
        LOGGER.debug('Creating Servicer...')
        LOGGER.debug('Servicer Created')

    def _forecast_link_capacity(self, forecast_window_seconds : float) -> None:
        """Derive the history window that matches a requested forecast window.

        Args:
            forecast_window_seconds: Size of the requested forecast window,
                in seconds.

        Returns:
            None. The derived window is computed but not used yet.
        """
        # history_window_seconds indicates the size of the train-set based on the
        # requested size of the test-set and the configured history ratio
        history_window_seconds = FORECAST_TO_HISTORY_RATIO * forecast_window_seconds

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def ForecastLinkCapacity(
        self, request : ForecastLinkCapacityRequest, context : grpc.ServicerContext
    ) -> ForecastLinkCapacityReply:
        """Handle a link-capacity forecast request.

        Args:
            request: Carries ``forecast_window_seconds`` and ``link_id``.
            context: gRPC context of the call (not used here).

        Returns:
            An empty ``ForecastLinkCapacityReply``.
        """
        forecast_window_seconds = request.forecast_window_seconds
        # NOTE(review): getLink is neither defined nor imported in the visible
        # part of this module - confirm it exists, otherwise this line raises
        # NameError on every call. The loaded frame is not used yet.
        df = getLink(request.link_id)
        self._forecast_link_capacity(forecast_window_seconds)
        return ForecastLinkCapacityReply()

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def ForecastTopologyCapacity(
        self, request : ForecastTopologyCapacityRequest, context : grpc.ServicerContext
    ) -> ForecastTopologyCapacityReply:
        """Handle a topology-capacity forecast request.

        Args:
            request: Carries ``forecast_window_seconds`` and ``topology_id``.
            context: gRPC context of the call (not used here).

        Returns:
            An empty ``ForecastTopologyCapacityReply``.
        """
        forecast_window_seconds = request.forecast_window_seconds
        # The dataset is loaded but not consumed yet.
        df = getTopology(request.topology_id)
        self._forecast_link_capacity(forecast_window_seconds)
        return ForecastTopologyCapacityReply()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed





class GreeterServicer(greet_pb2_grpc.GreeterServicer):
    """Servicer that builds a per-link capacity forecast reply.

    NOTE(review): ``greet_pb2`` and ``greet_pb2_grpc`` are not imported in the
    visible part of this module - confirm they are available, otherwise
    defining this class raises NameError when the module is imported.
    """

    def ComputeTopologyForecast(self, request, context):
        """Return a forecast reply with one capacity entry per processed link.

        The ``ds`` column is converted to seconds elapsed since 1970-01-01.
        For each of the first ten distinct ``linkid`` values, the rows of that
        link are split into a test-set (first tenth) and a train-set (the
        rest), a random-forest regressor is fitted, and the rounded mean of
        the test-set ``y`` values is stored as the link's forecasted capacity.

        Args:
            request: Incoming RPC request (not read by this method).
            context: gRPC context of the call (not used here).

        Returns:
            A ``greet_pb2.ForecastReply`` whose ``LinkCapacityList`` holds one
            ``LinkCapacity`` per processed link.
        """
        # NOTE(review): `df` and `history_window_seconds` are not bound inside
        # this method nor at module level in the visible source - confirm
        # where they are expected to come from; as written they raise
        # NameError.
        df['ds'] = pandas.to_datetime(df['ds']) - datetime(1970, 1, 1)
        df['ds'] = df['ds'].dt.total_seconds()

        maximum = df['ds'].max()
        minimum = maximum - history_window_seconds
        LOGGER.debug(
            'The dates of the trainset would have the following range of values: From %s until %s ',
            minimum, maximum
        )

        forecast_reply = greet_pb2.ForecastReply()

        for linkid in df['linkid'].unique()[:10]:
            LOGGER.debug('%s', linkid)
            # Each frame holds a single link, so its identifier carries no
            # information for the model; replace it with the constant 1.0 so
            # the feature is numeric. assign() returns a new frame, which
            # avoids writing through .loc into a filtered slice of `df`.
            newdf = df[df.linkid == linkid].assign(linkid=1.0)

            holdout_rows = int(newdf.shape[0] / 10)
            testset = newdf[:holdout_rows]
            trainset = newdf[holdout_rows:]

            rf = RandomForestRegressor(n_estimators = 600, random_state = 42)
            rf.fit(trainset.drop(['y'], axis=1), trainset['y'])
            # NOTE(review): the prediction is computed but the reported value
            # is the mean of the observed test-set values, not of `forecast`
            # - confirm which of the two is intended.
            forecast = rf.predict(testset.drop(['y'], axis=1))
            averageResult = round(mean(testset['y']), 2)

            link_capacity = greet_pb2.LinkCapacity()
            link_capacity.linkuuid = linkid
            link_capacity.forecasted_capacity = averageResult

            forecast_reply.LinkCapacityList.append(link_capacity)

        return forecast_reply