Skip to content
Snippets Groups Projects
Commit a63b2cbb authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Forecaster component:

- Complete implementation of forecaster component
parent 3e81bea6
No related branches found
No related tags found
2 merge requests!235Release TeraFlowSDN 3.0,!160Resolve "(CTTC) Forecaster component"
...@@ -11,3 +11,8 @@ ...@@ -11,3 +11,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
#numpy==1.23.*
pandas==1.5.*
#prophet==1.1.*
scikit-learn==1.1.*
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import math, pandas
from statistics import mean
from sklearn.ensemble import RandomForestRegressor
from common.proto.monitoring_pb2 import KpiId
from forecaster.Config import FORECAST_TO_HISTORY_RATIO
def compute_forecast(samples : pandas.DataFrame, kpi_id : KpiId) -> float:
kpi_uuid = kpi_id.kpi_id.uuid
samples = samples[samples.kpi_id == kpi_uuid]
num_samples = samples.shape[0]
num_samples_test = math.ceil(num_samples / FORECAST_TO_HISTORY_RATIO)
num_samples_train = num_samples - num_samples_test
if num_samples_train <= 0: raise Exception('Wrong number of train samples: {:d}'.format(num_samples_train))
if num_samples_test <= 0: raise Exception('Wrong number of test samples: {:d}'.format(num_samples_test ))
train_set = samples[0:num_samples_train]
test_set = samples[num_samples_train:num_samples]
rfr = RandomForestRegressor(n_estimators=600, random_state=42)
rfr.fit(train_set.drop(['kpi_id', 'value'], axis=1), train_set['value'])
forecast = rfr.predict(test_set.drop(['value'], axis=1))
avg_forecast = round(mean(forecast), 2)
return avg_forecast
...@@ -12,113 +12,106 @@ ...@@ -12,113 +12,106 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import grpc, logging, numpy, pandas import grpc, logging
from datetime import datetime
from forecaster.Config import FORECAST_TO_HISTORY_RATIO
from prophet import Prophet
from sklearn.ensemble import RandomForestRegressor
from statistics import mean
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
from common.proto.forecaster_pb2 import ( from common.proto.forecaster_pb2 import (
ForecastLinkCapacityReply, ForecastLinkCapacityRequest, ForecastLinkCapacityReply, ForecastLinkCapacityRequest,
ForecastTopologyCapacityReply, ForecastTopologyCapacityRequest ForecastTopologyCapacityReply, ForecastTopologyCapacityRequest
) )
from common.proto.forecaster_pb2_grpc import ForecasterServiceServicer from common.proto.forecaster_pb2_grpc import ForecasterServiceServicer
from common.tools.grpc.Tools import grpc_message_to_json_string from common.proto.kpi_sample_types_pb2 import KpiSampleType
from common.tools.context_queries.Link import get_link
from common.tools.context_queries.Topology import get_topology_details
from context.client.ContextClient import ContextClient from context.client.ContextClient import ContextClient
from forecaster.Config import FORECAST_TO_HISTORY_RATIO
from forecaster.service.Forecaster import compute_forecast
from forecaster.service.KpiManager import KpiManager
from forecaster.service.Tools import time_utc_now_to_float
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
METRICS_POOL = MetricsPool('Forecaster', 'RPC') METRICS_POOL = MetricsPool('Forecaster', 'RPC')
def getTopology(topologyId):
if topologyId == "1":
df = pandas.read_csv('dataset.csv')
elif topologyId == "2":
df = pandas.read_csv('dataset2.csv')
print(df.columns)
#Adapting the dataframe to have the same structure
df = df.drop(['Unnamed: 0', 'source', 'target'], axis=1)
df.rename(columns = {'id':'linkid'}, inplace = True)
df.rename(columns = {'demandValue':'y'}, inplace = True)
else:
df = pandas.read_csv('./PythonGrpc-main/dataset.csv')
return df
def compute_forecast(df : pandas.DataFrame):
trainset = df[int(df.shape[0] / 10):df.shape[0]]
testset = df[0:int(df.shape[0] / 10)]
rf = RandomForestRegressor(n_estimators = 600, random_state = 42)
rf.fit(trainset.drop(['y'], axis=1), trainset['y'])
forecast = rf.predict(testset.drop(['y'], axis=1))
average_result = round(mean(testset['y']), 2)
class ForecasterServiceServicerImpl(ForecasterServiceServicer): class ForecasterServiceServicerImpl(ForecasterServiceServicer):
def __init__(self) -> None: def __init__(self) -> None:
LOGGER.debug('Creating Servicer...') LOGGER.debug('Creating Servicer...')
self._kpi_manager = KpiManager()
LOGGER.debug('Servicer Created') LOGGER.debug('Servicer Created')
def _forecast_link_capacity(self, forecast_window_seconds : float):
# history_window_seconds indicates the size of the train-set based on the
# requested size of the test-set and the configured history ratio
history_window_seconds = FORECAST_TO_HISTORY_RATIO * forecast_window_seconds
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def ForecastLinkCapacity( def ForecastLinkCapacity(
self, request : ForecastLinkCapacityRequest, context : grpc.ServicerContext self, request : ForecastLinkCapacityRequest, context : grpc.ServicerContext
) -> ForecastLinkCapacityReply: ) -> ForecastLinkCapacityReply:
forecast_window_seconds = request.forecast_window_seconds forecast_window_seconds = request.forecast_window_seconds
df = getLink(request.link_id)
self._forecast_link_capacity(forecast_window_seconds) # history_window_seconds indicates the size of the train-set based on the
return ForecastLinkCapacityReply() # requested size of the test-set and the configured history ratio
history_window_seconds = FORECAST_TO_HISTORY_RATIO * forecast_window_seconds
link_id = request.link_id
kpi_id_map = self._kpi_manager.get_kpi_ids_from_link_ids([link_id])
kpi_to_link_ids = {
link_id : kpi_id
for (link_id, kpi_sample_type), kpi_id in kpi_id_map.items()
if kpi_sample_type == KpiSampleType.KPISAMPLETYPE_LINK_USED_CAPACITY_GBPS
}
kpi_id = kpi_to_link_ids[link_id]
end_timestamp = time_utc_now_to_float()
start_timestamp = end_timestamp - history_window_seconds
df_historical_data = self._kpi_manager.get_kpi_id_samples([kpi_id], start_timestamp, end_timestamp)
forecast_used_capacity_gbps = compute_forecast(df_historical_data, kpi_id)
context_client = ContextClient()
link = get_link(context_client, link_id.link_uuid.uuid)
reply = ForecastLinkCapacityReply()
reply.link_id.CopyFrom(link_id)
reply.total_capacity_gbps = link.attributes.total_capacity_gbps
reply.current_used_capacity_gbps = link.attributes.used_capacity_gbps
reply.forecast_used_capacity_gbps = forecast_used_capacity_gbps
return reply
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def ForecastTopologyCapacity( def ForecastTopologyCapacity(
self, request : ForecastTopologyCapacityRequest, context : grpc.ServicerContext self, request : ForecastTopologyCapacityRequest, context : grpc.ServicerContext
) -> ForecastTopologyCapacityReply: ) -> ForecastTopologyCapacityReply:
forecast_window_seconds = request.forecast_window_seconds forecast_window_seconds = request.forecast_window_seconds
df = getTopology(request.topology_id)
self._forecast_link_capacity(forecast_window_seconds)
return ForecastTopologyCapacityReply()
# history_window_seconds indicates the size of the train-set based on the
# requested size of the test-set and the configured history ratio
history_window_seconds = FORECAST_TO_HISTORY_RATIO * forecast_window_seconds
context_uuid = request.topology_id.context_id.context_uuid.uuid
topology_uuid = request.topology_id.topology_uuid.uuid
context_client = ContextClient()
class GreeterServicer(greet_pb2_grpc.GreeterServicer): topology_details = get_topology_details(context_client, topology_uuid, context_uuid=context_uuid)
def ComputeTopologyForecast(self, request, context):
link_ids = list()
df['ds'] = pandas.to_datetime(df['ds']) - datetime(1970, 1, 1) link_capacities = dict()
df['ds'] = df['ds'].dt.total_seconds() for link in topology_details.links:
link_ids.append(link.link_id)
maximum = df['ds'].max() link_capacities[link.link_id] = link.attributes
minimum = maximum - history_window_seconds
print(f'The dates of the trainset would have the following range of values: From {minimum} until {maximum} ') kpi_id_map = self._kpi_manager.get_kpi_ids_from_link_ids(link_ids)
kpi_to_link_ids = {
forecast_reply = greet_pb2.ForecastReply() link_id : kpi_id
for (link_id, kpi_sample_type), kpi_id in kpi_id_map.items()
for linkid in df['linkid'].unique()[:10]: if kpi_sample_type == KpiSampleType.KPISAMPLETYPE_LINK_USED_CAPACITY_GBPS
print(linkid) }
newdf = df[df.linkid == linkid ]
#converting linkid to float to avoid problems with machine learning, since there is only one option. It is converted to 1.0 kpi_ids = list(kpi_to_link_ids.keys())
newdf.loc[:,'linkid'] = 1.0 end_timestamp = time_utc_now_to_float()
start_timestamp = end_timestamp - history_window_seconds
trainset = newdf[int(newdf.shape[0] / 10):newdf.shape[0]] df_historical_data = self._kpi_manager.get_kpi_id_samples(kpi_ids, start_timestamp, end_timestamp)
testset = newdf[0:int(newdf.shape[0] / 10)]
reply = ForecastTopologyCapacityReply()
rf = RandomForestRegressor(n_estimators = 600, random_state = 42) for link_id, kpi_id in kpi_to_link_ids.items():
rf.fit(trainset.drop(['y'], axis=1), trainset['y']) link_attributes = link_capacities[link_id]
forecast = rf.predict(testset.drop(['y'], axis=1)) forecast_used_capacity_gbps = compute_forecast(df_historical_data, kpi_id)
averageResult = round(mean(testset['y']), 2) link_capacity : ForecastLinkCapacityReply = reply.link_capacities.add()
link_capacity.link_id.CopyFrom(link_id)
link_capacity = greet_pb2.LinkCapacity() link_capacity.total_capacity_gbps = link_attributes.total_capacity_gbps
link_capacity.linkuuid = linkid link_capacity.current_used_capacity_gbps = link_attributes.used_capacity_gbps
link_capacity.forecasted_capacity = averageResult link_capacity.forecast_used_capacity_gbps = forecast_used_capacity_gbps
return reply
forecast_reply.LinkCapacityList.append(link_capacity)
return forecast_reply
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pandas
from typing import Dict, List, Tuple
from common.proto.context_pb2 import Empty, LinkId
from common.proto.kpi_sample_types_pb2 import KpiSampleType
from common.proto.monitoring_pb2 import KpiId, KpiQuery
from monitoring.client.MonitoringClient import MonitoringClient
class KpiManager:
def __init__(self) -> None:
self._monitoring_client = MonitoringClient()
def get_kpi_ids_from_link_ids(
self, link_ids : List[LinkId]
) -> Dict[Tuple[LinkId, KpiSampleType], KpiId]:
link_uuids = {link_id.link_uuid.uuid for link_id in link_ids}
kpi_descriptors = self._monitoring_client.GetKpiDescriptorList(Empty())
kpi_ids : Dict[Tuple[LinkId, KpiSampleType], KpiId] = {
(kpi_descriptor.link_id, kpi_descriptor.kpi_sample_type) : kpi_descriptor.kpi_id
for kpi_descriptor in kpi_descriptors
if kpi_descriptor.link_id.link_uuid.uuid in link_uuids
}
return kpi_ids
def get_kpi_id_samples(
self, kpi_ids : List[KpiId], start_timestamp : float, end_timestamp : float
) -> pandas.DataFrame:
kpi_query = KpiQuery()
kpi_query.kpi_ids.extend(kpi_ids) # pylint: disable=no-member
kpi_query.start_timestamp.timestamp = start_timestamp # pylint: disable=no-member
kpi_query.end_timestamp.timestamp = end_timestamp # pylint: disable=no-member
raw_kpi_table = self._monitoring_client.QueryKpiData(kpi_query)
data : List[Tuple[str, float, float]] = list()
for raw_kpi_list in raw_kpi_table.raw_kpi_lists:
kpi_uuid = raw_kpi_list.kpi_id.kpi_id.uuid
for raw_kpi in raw_kpi_list.raw_kpis:
timestamp = raw_kpi.timestamp.timestamp
value = float(getattr(raw_kpi.kpi_value, raw_kpi.kpi_value.WhichOneof('value')))
data.append((timestamp, kpi_uuid, value))
df = pandas.DataFrame(data, columns=['timestamp', 'kpi_id', 'value'])
df['timestamp'] = pandas.to_datetime(df['timestamp'].astype('int'), unit='s', utc=True)
df.sort_values('timestamp', ascending=True, inplace=True)
return df
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import calendar
from datetime import datetime, timezone
def time_datetime_to_int(dt_time : datetime) -> int:
return int(calendar.timegm(dt_time.timetuple()))
def time_datetime_to_float(dt_time : datetime) -> float:
return time_datetime_to_int(dt_time) + (dt_time.microsecond / 1.e6)
def time_utc_now_to_datetime() -> datetime:
return datetime.now(tz=timezone.utc)
def time_utc_now_to_float() -> float:
return time_datetime_to_float(time_utc_now_to_datetime())
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment