diff --git a/src/forecaster/greet_client.py b/src/forecaster/greet_client.py new file mode 100644 index 0000000000000000000000000000000000000000..e5c42439d86e574b562c9df7b78f8e5a89feda08 --- /dev/null +++ b/src/forecaster/greet_client.py @@ -0,0 +1,23 @@ +import greet_pb2_grpc +import greet_pb2 +import grpc + +#from tests import greet_pb2 +#from tests import greet_pb2_grpc + +import datetime as dt + +def run(): + with grpc.insecure_channel('localhost:50051') as channel: + stub = greet_pb2_grpc.GreeterStub(channel) + print("The stub has been created") + forecastRequest = greet_pb2.ForecastTopology(forecast_window_seconds = 36000, topology_id = "1" ) + print("The request has been sent to the client") + hello_reply = stub.ComputeTopologyForecast(forecastRequest) + print("The response has been received") + print(hello_reply) + + + +if __name__ == "__main__": + run() \ No newline at end of file diff --git a/src/forecaster/greet_pb2.py b/src/forecaster/greet_pb2.py new file mode 100644 index 0000000000000000000000000000000000000000..800b3edd823b8cd6eb938c62d76b70e45649bb3b --- /dev/null +++ b/src/forecaster/greet_pb2.py @@ -0,0 +1,33 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: greet.proto +"""Generated protocol buffer code.""" +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import symbol_database as _symbol_database +from google.protobuf.internal import builder as _builder +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0bgreet.proto\x12\x05greet\"H\n\x10\x46orecastTopology\x12\x1f\n\x17\x66orecast_window_seconds\x18\x01 \x01(\x02\x12\x13\n\x0btopology_id\x18\x02 \x01(\t\">\n\rForecastReply\x12-\n\x10LinkCapacityList\x18\x01 \x03(\x0b\x32\x13.greet.LinkCapacity\"Q\n\x14\x46orecastDelayedReply\x12\x0f\n\x07message\x18\x01 \x01(\t\x12(\n\x07request\x18\x02 \x03(\x0b\x32\x17.greet.ForecastTopology\"Z\n\x0cLinkCapacity\x12\x10\n\x08linkuuid\x18\x01 \x01(\t\x12\x1b\n\x13\x66orecasted_capacity\x18\x02 \x01(\x02\x12\x1b\n\x13total_capacity_gbps\x18\x03 \x01(\x02\x32S\n\x07Greeter\x12H\n\x17\x43omputeTopologyForecast\x12\x17.greet.ForecastTopology\x1a\x14.greet.ForecastReplyb\x06proto3') + +_globals = globals() +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'greet_pb2', _globals) +if _descriptor._USE_C_DESCRIPTORS == False: + DESCRIPTOR._options = None + _globals['_FORECASTTOPOLOGY']._serialized_start=22 + _globals['_FORECASTTOPOLOGY']._serialized_end=94 + _globals['_FORECASTREPLY']._serialized_start=96 + _globals['_FORECASTREPLY']._serialized_end=158 + _globals['_FORECASTDELAYEDREPLY']._serialized_start=160 + _globals['_FORECASTDELAYEDREPLY']._serialized_end=241 + _globals['_LINKCAPACITY']._serialized_start=243 + _globals['_LINKCAPACITY']._serialized_end=333 + _globals['_GREETER']._serialized_start=335 + _globals['_GREETER']._serialized_end=418 +# @@protoc_insertion_point(module_scope) diff --git a/src/forecaster/greet_pb2.pyi b/src/forecaster/greet_pb2.pyi new file mode 100644 index 0000000000000000000000000000000000000000..5197df5e81bb0d538a0013f4b5b85963cdf3924f --- /dev/null +++ b/src/forecaster/greet_pb2.pyi @@ -0,0 +1,30 @@ +from google.protobuf.internal import containers as _containers +from google.protobuf import descriptor as _descriptor +from google.protobuf import message as _message +from typing import ClassVar as _ClassVar, Iterable as _Iterable, Mapping as _Mapping, Optional as _Optional, Union as _Union + +DESCRIPTOR: _descriptor.FileDescriptor + +class ForecastRequest(_message.Message): + __slots__ = ["linkid"] + LINKID_FIELD_NUMBER: _ClassVar[int] + linkid: str + def __init__(self, linkid: _Optional[str] = ...) -> None: ... + +class ForecastReply(_message.Message): + __slots__ = ["message", "errorRate", "calculationTime"] + MESSAGE_FIELD_NUMBER: _ClassVar[int] + ERRORRATE_FIELD_NUMBER: _ClassVar[int] + CALCULATIONTIME_FIELD_NUMBER: _ClassVar[int] + message: str + errorRate: float + calculationTime: float + def __init__(self, message: _Optional[str] = ..., errorRate: _Optional[float] = ..., calculationTime: _Optional[float] = ...) -> None: ... + +class ForecastDelayedReply(_message.Message): + __slots__ = ["message", "request"] + MESSAGE_FIELD_NUMBER: _ClassVar[int] + REQUEST_FIELD_NUMBER: _ClassVar[int] + message: str + request: _containers.RepeatedCompositeFieldContainer[ForecastRequest] + def __init__(self, message: _Optional[str] = ..., request: _Optional[_Iterable[_Union[ForecastRequest, _Mapping]]] = ...) -> None: ... diff --git a/src/forecaster/greet_pb2_grpc.py b/src/forecaster/greet_pb2_grpc.py new file mode 100644 index 0000000000000000000000000000000000000000..f0a69cca320d518eb750e6a4d36c9e4ef5792d1a --- /dev/null +++ b/src/forecaster/greet_pb2_grpc.py @@ -0,0 +1,70 @@ +# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! +"""Client and server classes corresponding to protobuf-defined services.""" +import grpc + +import greet_pb2 as greet__pb2 + + +class GreeterStub(object): + """The greeting service definition. + """ + + def __init__(self, channel): + """Constructor. + + Args: + channel: A grpc.Channel. + """ + self.ComputeTopologyForecast = channel.unary_unary( + '/greet.Greeter/ComputeTopologyForecast', + request_serializer=greet__pb2.ForecastTopology.SerializeToString, + response_deserializer=greet__pb2.ForecastReply.FromString, + ) + + +class GreeterServicer(object): + """The greeting service definition. + """ + + def ComputeTopologyForecast(self, request, context): + """Unary + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + +def add_GreeterServicer_to_server(servicer, server): + rpc_method_handlers = { + 'ComputeTopologyForecast': grpc.unary_unary_rpc_method_handler( + servicer.ComputeTopologyForecast, + request_deserializer=greet__pb2.ForecastTopology.FromString, + response_serializer=greet__pb2.ForecastReply.SerializeToString, + ), + } + generic_handler = grpc.method_handlers_generic_handler( + 'greet.Greeter', rpc_method_handlers) + server.add_generic_rpc_handlers((generic_handler,)) + + + # This class is part of an EXPERIMENTAL API. +class Greeter(object): + """The greeting service definition. + """ + + @staticmethod + def ComputeTopologyForecast(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary(request, target, '/greet.Greeter/ComputeTopologyForecast', + greet__pb2.ForecastTopology.SerializeToString, + greet__pb2.ForecastReply.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) diff --git a/src/forecaster/greet_server.py b/src/forecaster/greet_server.py new file mode 100644 index 0000000000000000000000000000000000000000..362a258561995a541a4583084b80cb61f0ae1415 --- /dev/null +++ b/src/forecaster/greet_server.py @@ -0,0 +1,99 @@ +from concurrent import futures +import time + +import grpc +import greet_pb2 +import greet_pb2_grpc + +#imported by me +import pandas as pd +import numpy as np +from prophet import Prophet +import matplotlib.pyplot as plt +import time +from sklearn.ensemble import RandomForestRegressor +import datetime as dt +from statistics import mean + +class GreeterServicer(greet_pb2_grpc.GreeterServicer): + def ComputeTopologyForecast(self, request, context): + print("The following request has been received:") + print(request) + + #this value indicates the size of the trainset. + #For example a history ratio of 10 would imply that the trainset will be 10 times bigger than the testset. + forecast_to_history_ratio = 10 + #history_window_seconds indicates the size of the trainset based on the requested size of the testset and the predifined history ratio + history_window_seconds = forecast_to_history_ratio * request.forecast_window_seconds + + df = getTopology(request.topology_id) + + df["ds"] = pd.to_datetime(df["ds"]) - dt.datetime(1970, 1, 1) + df["ds"] = df["ds"].dt.total_seconds() + + maximum = df["ds"].max() + minimum = maximum - history_window_seconds + print(f"The dates of the trainset would have the following range of values: From {minimum} until {maximum} ") + #smallest = df["ds"].drop_duplicates().nsmallest(2) + #smallest = smallest.reset_index() + #sampleDifference = smallest["ds"][1] - smallest["ds"][0] + #calculating the maximum size of the dataset + #futureMaximum = sampleDifference * request.forecast_window_seconds + #futureMaximum = futureMaximum + maximum + #print(f"Sample frequency: {sampleDifference}") + #print(f"The testset would have the following range of values: From {maximum} until {futureMaximum} ") + + #filtering by linkId and parsing it into integers + link_capacity = greet_pb2.LinkCapacity() + forecast_reply = greet_pb2.ForecastReply() + + for linkid in df["linkid"].unique()[:10]: + print(linkid) + newdf = df[df.linkid == linkid ] + #converting linkid to float to avoid problems with machine learning, since there is only one option. It is converted to 1.0 + newdf.loc[:,"linkid"] = 1.0 + trainset = newdf[int(newdf.shape[0] / 10):newdf.shape[0]] + testset = newdf[0:int(newdf.shape[0] / 10)] + + rf = RandomForestRegressor(n_estimators = 600, random_state = 42) + rf.fit(trainset.drop(['y'], axis=1), trainset['y']) + + forecast = rf.predict(testset.drop(['y'], axis=1)) + averageResult = round( mean(testset['y']), 2 ) + maximumCapacity = round( trainset["y"].max(), 2) + + link_capacity.linkuuid = linkid + link_capacity.forecasted_capacity = averageResult + link_capacity.total_capacity_gbps = maximumCapacity + + forecast_reply.LinkCapacityList.append(link_capacity) + + + print("The results were completely generated") + + return forecast_reply + +def getTopology(topologyId): + if topologyId == "1": + df = pd.read_csv('dataset.csv') + elif topologyId == "2": + df = pd.read_csv('dataset2.csv') + print(df.columns) + #Adapting the dataframe to have the same structure + df = df.drop(['Unnamed: 0', 'source', 'target'], axis=1) + df.rename(columns = {'id':'linkid'}, inplace = True) + df.rename(columns = {'demandValue':'y'}, inplace = True) + else: + df = pd.read_csv('./PythonGrpc-main/dataset.csv') + return df + + +def serve(): + server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) + greet_pb2_grpc.add_GreeterServicer_to_server(GreeterServicer(), server) + server.add_insecure_port("localhost:50051") + server.start() + server.wait_for_termination() + +if __name__ == "__main__": + serve() \ No newline at end of file