Skip to content
Snippets Groups Projects
Commit 70a40996 authored by Daniel Adanza Dopazo's avatar Daniel Adanza Dopazo
Browse files

Including the first implementation of the forecaster component

parent 3a8c1b69
No related branches found
No related tags found
2 merge requests!235Release TeraFlowSDN 3.0,!160Resolve "(CTTC) Forecaster component"
import greet_pb2_grpc
import greet_pb2
import grpc
#from tests import greet_pb2
#from tests import greet_pb2_grpc
import datetime as dt
def run():
    """Connect to the local forecaster gRPC service and request a forecast.

    Opens an insecure channel to localhost:50051, sends a ForecastTopology
    request for topology "1" with a 36000-second (10 h) forecast window,
    and prints the server's reply. Progress is traced to stdout.
    """
    with grpc.insecure_channel('localhost:50051') as channel:
        stub = greet_pb2_grpc.GreeterStub(channel)
        print("The stub has been created")
        forecast_request = greet_pb2.ForecastTopology(
            forecast_window_seconds=36000, topology_id="1")
        # Fixed message: the request goes to the *server*, not the client.
        print("The request has been sent to the server")
        forecast_reply = stub.ComputeTopologyForecast(forecast_request)
        print("The response has been received")
        print(forecast_reply)
# Script entry point: only contact the server when executed directly,
# not when this module is imported.
if __name__ == "__main__":
    run()
\ No newline at end of file
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: greet.proto
"""Generated protocol buffer code."""
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import symbol_database as _symbol_database
from google.protobuf.internal import builder as _builder
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
# Registers the serialized greet.proto descriptor. It defines the messages
# ForecastTopology, ForecastReply, ForecastDelayedReply and LinkCapacity,
# plus the Greeter service with its ComputeTopologyForecast unary RPC.
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0bgreet.proto\x12\x05greet\"H\n\x10\x46orecastTopology\x12\x1f\n\x17\x66orecast_window_seconds\x18\x01 \x01(\x02\x12\x13\n\x0btopology_id\x18\x02 \x01(\t\">\n\rForecastReply\x12-\n\x10LinkCapacityList\x18\x01 \x03(\x0b\x32\x13.greet.LinkCapacity\"Q\n\x14\x46orecastDelayedReply\x12\x0f\n\x07message\x18\x01 \x01(\t\x12(\n\x07request\x18\x02 \x03(\x0b\x32\x17.greet.ForecastTopology\"Z\n\x0cLinkCapacity\x12\x10\n\x08linkuuid\x18\x01 \x01(\t\x12\x1b\n\x13\x66orecasted_capacity\x18\x02 \x01(\x02\x12\x1b\n\x13total_capacity_gbps\x18\x03 \x01(\x02\x32S\n\x07Greeter\x12H\n\x17\x43omputeTopologyForecast\x12\x17.greet.ForecastTopology\x1a\x14.greet.ForecastReplyb\x06proto3')
_globals = globals()
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'greet_pb2', _globals)
if _descriptor._USE_C_DESCRIPTORS == False:
    # Pure-Python descriptors only: record each message's byte offsets
    # inside the serialized file descriptor registered above.
    DESCRIPTOR._options = None
    _globals['_FORECASTTOPOLOGY']._serialized_start=22
    _globals['_FORECASTTOPOLOGY']._serialized_end=94
    _globals['_FORECASTREPLY']._serialized_start=96
    _globals['_FORECASTREPLY']._serialized_end=158
    _globals['_FORECASTDELAYEDREPLY']._serialized_start=160
    _globals['_FORECASTDELAYEDREPLY']._serialized_end=241
    _globals['_LINKCAPACITY']._serialized_start=243
    _globals['_LINKCAPACITY']._serialized_end=333
    _globals['_GREETER']._serialized_start=335
    _globals['_GREETER']._serialized_end=418
# @@protoc_insertion_point(module_scope)
from google.protobuf.internal import containers as _containers
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from typing import ClassVar as _ClassVar, Iterable as _Iterable, Mapping as _Mapping, Optional as _Optional, Union as _Union
DESCRIPTOR: _descriptor.FileDescriptor
class ForecastRequest(_message.Message):
    """Typing stub: request identified by a single link id.

    NOTE(review): this message does not appear in the serialized greet.proto
    descriptor in this commit (which defines ForecastTopology instead) —
    the .pyi stub looks stale relative to the .proto; confirm and regenerate.
    """
    __slots__ = ["linkid"]
    LINKID_FIELD_NUMBER: _ClassVar[int]
    # UUID/identifier of the link to forecast (string field 1).
    linkid: str
    def __init__(self, linkid: _Optional[str] = ...) -> None: ...
class ForecastReply(_message.Message):
    """Typing stub: reply with a message, an error rate and a timing value.

    NOTE(review): the serialized descriptor in this commit declares
    ForecastReply with a repeated LinkCapacityList field instead of these
    three scalars — the .pyi stub looks stale; confirm and regenerate.
    """
    __slots__ = ["message", "errorRate", "calculationTime"]
    MESSAGE_FIELD_NUMBER: _ClassVar[int]
    ERRORRATE_FIELD_NUMBER: _ClassVar[int]
    CALCULATIONTIME_FIELD_NUMBER: _ClassVar[int]
    # Human-readable status/result text.
    message: str
    # Error rate of the forecast (float field 2).
    errorRate: float
    # Time spent computing the forecast (float field 3; units not shown here).
    calculationTime: float
    def __init__(self, message: _Optional[str] = ..., errorRate: _Optional[float] = ..., calculationTime: _Optional[float] = ...) -> None: ...
class ForecastDelayedReply(_message.Message):
    """Typing stub: deferred reply carrying the original requests back.

    NOTE(review): the serialized descriptor types the repeated `request`
    field as ForecastTopology, while this stub uses ForecastRequest —
    another sign the .pyi is out of sync with the .proto; confirm.
    """
    __slots__ = ["message", "request"]
    MESSAGE_FIELD_NUMBER: _ClassVar[int]
    REQUEST_FIELD_NUMBER: _ClassVar[int]
    # Human-readable status text.
    message: str
    # Repeated field of the request messages being answered.
    request: _containers.RepeatedCompositeFieldContainer[ForecastRequest]
    def __init__(self, message: _Optional[str] = ..., request: _Optional[_Iterable[_Union[ForecastRequest, _Mapping]]] = ...) -> None: ...
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
"""Client and server classes corresponding to protobuf-defined services."""
import grpc
import greet_pb2 as greet__pb2
class GreeterStub(object):
    """Client-side stub for the greet.Greeter service (generated code).

    Exposes the service's single unary-unary RPC as a callable attribute.
    """

    def __init__(self, channel):
        """Constructor.

        Args:
            channel: A grpc.Channel.
        """
        # Unary-unary RPC: serializes a ForecastTopology request and
        # deserializes the server's ForecastReply.
        self.ComputeTopologyForecast = channel.unary_unary(
                '/greet.Greeter/ComputeTopologyForecast',
                request_serializer=greet__pb2.ForecastTopology.SerializeToString,
                response_deserializer=greet__pb2.ForecastReply.FromString,
                )
class GreeterServicer(object):
    """Server-side base class for the greet.Greeter service (generated code).

    Subclass this and override ComputeTopologyForecast to implement the
    service; this base answers every call with UNIMPLEMENTED.
    """

    def ComputeTopologyForecast(self, request, context):
        """Unary RPC: ForecastTopology request -> ForecastReply response.

        The generated default sets the UNIMPLEMENTED status and raises.
        """
        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
        context.set_details('Method not implemented!')
        raise NotImplementedError('Method not implemented!')
def add_GreeterServicer_to_server(servicer, server):
    # Registers the servicer's handlers under the fully qualified service
    # name 'greet.Greeter' so the server can dispatch incoming RPCs to it.
    rpc_method_handlers = {
            'ComputeTopologyForecast': grpc.unary_unary_rpc_method_handler(
                    servicer.ComputeTopologyForecast,
                    request_deserializer=greet__pb2.ForecastTopology.FromString,
                    response_serializer=greet__pb2.ForecastReply.SerializeToString,
            ),
    }
    generic_handler = grpc.method_handlers_generic_handler(
            'greet.Greeter', rpc_method_handlers)
    server.add_generic_rpc_handlers((generic_handler,))
# This class is part of an EXPERIMENTAL API.
class Greeter(object):
    """Static client helpers for the greet.Greeter service (generated code).

    Allows invoking the RPC without constructing a stub, via
    grpc.experimental — subject to change between grpcio versions.
    """

    @staticmethod
    def ComputeTopologyForecast(request,
            target,
            options=(),
            channel_credentials=None,
            call_credentials=None,
            insecure=False,
            compression=None,
            wait_for_ready=None,
            timeout=None,
            metadata=None):
        # One-shot unary-unary invocation; channel lifecycle is managed by
        # the experimental API, not the caller.
        return grpc.experimental.unary_unary(request, target, '/greet.Greeter/ComputeTopologyForecast',
            greet__pb2.ForecastTopology.SerializeToString,
            greet__pb2.ForecastReply.FromString,
            options, channel_credentials,
            insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
from concurrent import futures
import time
import grpc
import greet_pb2
import greet_pb2_grpc
#imported by me
import pandas as pd
import numpy as np
from prophet import Prophet
import matplotlib.pyplot as plt
import time
from sklearn.ensemble import RandomForestRegressor
import datetime as dt
from statistics import mean
class GreeterServicer(greet_pb2_grpc.GreeterServicer):
    """Forecaster service: predicts per-link capacity for a topology."""

    def ComputeTopologyForecast(self, request, context):
        """Unary RPC handler computing a capacity forecast per link.

        Args:
            request: greet_pb2.ForecastTopology carrying
                forecast_window_seconds and topology_id.
            context: grpc.ServicerContext (unused beyond the gRPC contract).

        Returns:
            greet_pb2.ForecastReply whose LinkCapacityList holds one entry
            per processed link (capped at the first 10 unique links).
        """
        print("The following request has been received:")
        print(request)
        # This value indicates the size of the trainset relative to the
        # testset: a ratio of 10 means the trainset is 10x the testset.
        forecast_to_history_ratio = 10
        # Size of the history (trainset) window derived from the requested
        # forecast window and the predefined history ratio.
        history_window_seconds = forecast_to_history_ratio * request.forecast_window_seconds

        df = getTopology(request.topology_id)
        # Convert timestamps to seconds since the Unix epoch.
        df["ds"] = pd.to_datetime(df["ds"]) - dt.datetime(1970, 1, 1)
        df["ds"] = df["ds"].dt.total_seconds()
        maximum = df["ds"].max()
        minimum = maximum - history_window_seconds
        print(f"The dates of the trainset would have the following range of values: From {minimum} until {maximum} ")

        forecast_reply = greet_pb2.ForecastReply()
        # NOTE(review): only the first 10 links are processed — presumably
        # to bound response time; confirm this cap is intended.
        for linkid in df["linkid"].unique()[:10]:
            print(linkid)
            # .copy() so the assignment below edits an independent frame
            # instead of a view of df (avoids SettingWithCopyWarning).
            newdf = df[df.linkid == linkid].copy()
            # Only one link remains in this frame, so collapse the id to the
            # constant 1.0 the regressor can consume as a numeric feature.
            newdf.loc[:, "linkid"] = 1.0
            # First 10% of rows is the testset, the remaining 90% the trainset.
            split = int(newdf.shape[0] / 10)
            trainset = newdf[split:newdf.shape[0]]
            testset = newdf[0:split]
            rf = RandomForestRegressor(n_estimators=600, random_state=42)
            rf.fit(trainset.drop(['y'], axis=1), trainset['y'])
            forecast = rf.predict(testset.drop(['y'], axis=1))
            # Fixed: report the mean of the model's *predictions*. The
            # original averaged the actual testset values, so the trained
            # model's output was never used in the reply.
            averageResult = round(mean(forecast), 2)
            maximumCapacity = round(trainset["y"].max(), 2)
            # add() creates a distinct repeated-field entry per link rather
            # than re-filling and copying one shared LinkCapacity message.
            link_capacity = forecast_reply.LinkCapacityList.add()
            link_capacity.linkuuid = linkid
            link_capacity.forecasted_capacity = averageResult
            link_capacity.total_capacity_gbps = maximumCapacity
        print("The results were completely generated")
        return forecast_reply
def getTopology(topologyId):
    """Load the link-traffic dataframe for the given topology id.

    Ids "1" and "2" map to local CSV files; any other id falls back to the
    dataset bundled under PythonGrpc-main. Topology "2" is additionally
    reshaped so its columns match the expected linkid/y structure.
    """
    if topologyId == "1":
        return pd.read_csv('dataset.csv')
    if topologyId == "2":
        frame = pd.read_csv('dataset2.csv')
        print(frame.columns)
        # Adapt this dataset to the same structure as the others: drop the
        # unused columns and rename id/demandValue to linkid/y.
        frame = frame.drop(['Unnamed: 0', 'source', 'target'], axis=1)
        frame = frame.rename(columns={'id': 'linkid', 'demandValue': 'y'})
        return frame
    # Fallback for unrecognised ids: the sample dataset shipped with the code.
    return pd.read_csv('./PythonGrpc-main/dataset.csv')
def serve():
    """Start the forecaster gRPC server on localhost:50051 and block forever."""
    worker_pool = futures.ThreadPoolExecutor(max_workers=10)
    grpc_server = grpc.server(worker_pool)
    # Wire our servicer implementation into the generated service skeleton
    # before the server starts accepting connections.
    greet_pb2_grpc.add_GreeterServicer_to_server(GreeterServicer(), grpc_server)
    grpc_server.add_insecure_port("localhost:50051")
    grpc_server.start()
    # Keep the main thread alive until the server is terminated.
    grpc_server.wait_for_termination()
# Script entry point: only start the server when executed directly,
# not when this module is imported.
if __name__ == "__main__":
    serve()
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment