Skip to content
Snippets Groups Projects
Commit 3b36d20e authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Forecaster component:

- Updated TODO.txt
- Corrected multiple bugs in KpiManager
- Corrected multiple bugs in ForecasterServicerImpl
- Corrected multiple bugs in unitary tests
- Added temporary test data files
parent 608e8ab6
No related branches found
No related tags found
2 merge requests!235Release TeraFlowSDN 3.0,!160Resolve "(CTTC) Forecaster component"
......@@ -12,8 +12,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Dict, List
import grpc, logging
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
from common.method_wrappers.ServiceExceptions import NotFoundException
from common.proto.context_pb2 import LinkAttributes, LinkId
from common.proto.forecaster_pb2 import (
ForecastLinkCapacityReply, ForecastLinkCapacityRequest,
ForecastTopologyCapacityReply, ForecastTopologyCapacityRequest
......@@ -49,6 +52,11 @@ class ForecasterServiceServicerImpl(ForecasterServiceServicer):
history_window_seconds = FORECAST_TO_HISTORY_RATIO * forecast_window_seconds
link_id = request.link_id
context_client = ContextClient()
link = get_link(context_client, link_id.link_uuid.uuid)
if link is None: raise NotFoundException('Link', link_id.link_uuid.uuid)
kpi_id_map = self._kpi_manager.get_kpi_ids_from_link_ids([link_id])
kpi_to_link_ids = {
link_id : kpi_id
......@@ -62,11 +70,8 @@ class ForecasterServiceServicerImpl(ForecasterServiceServicer):
df_historical_data = self._kpi_manager.get_kpi_id_samples([kpi_id], start_timestamp, end_timestamp)
forecast_used_capacity_gbps = compute_forecast(df_historical_data, kpi_id)
context_client = ContextClient()
link = get_link(context_client, link_id.link_uuid.uuid)
reply = ForecastLinkCapacityReply()
reply.link_id.CopyFrom(link_id)
reply.link_id.CopyFrom(link_id) # pylint: disable=no-member
reply.total_capacity_gbps = link.attributes.total_capacity_gbps
reply.current_used_capacity_gbps = link.attributes.used_capacity_gbps
reply.forecast_used_capacity_gbps = forecast_used_capacity_gbps
......@@ -86,12 +91,15 @@ class ForecasterServiceServicerImpl(ForecasterServiceServicer):
topology_uuid = request.topology_id.topology_uuid.uuid
context_client = ContextClient()
topology_details = get_topology_details(context_client, topology_uuid, context_uuid=context_uuid)
if topology_details is None:
topology_uuid = '{:s}/{:s}'.format(context_uuid, topology_uuid)
raise NotFoundException('Topology', topology_uuid)
link_ids = list()
link_capacities = dict()
link_ids : List[LinkId] = list()
link_capacities : Dict[str, LinkAttributes] = dict()
for link in topology_details.links:
link_ids.append(link.link_id)
link_capacities[link.link_id] = link.attributes
link_capacities[link.link_id.link_uuid.uuid] = link.attributes
kpi_id_map = self._kpi_manager.get_kpi_ids_from_link_ids(link_ids)
kpi_to_link_ids = {
......@@ -109,7 +117,7 @@ class ForecasterServiceServicerImpl(ForecasterServiceServicer):
for link_id, kpi_id in kpi_to_link_ids.items():
link_attributes = link_capacities[link_id]
forecast_used_capacity_gbps = compute_forecast(df_historical_data, kpi_id)
link_capacity : ForecastLinkCapacityReply = reply.link_capacities.add()
link_capacity : ForecastLinkCapacityReply = reply.link_capacities.add() # pylint: disable=no-member
link_capacity.link_id.CopyFrom(link_id)
link_capacity.total_capacity_gbps = link_attributes.total_capacity_gbps
link_capacity.current_used_capacity_gbps = link_attributes.used_capacity_gbps
......
......@@ -15,7 +15,6 @@
import pandas
from typing import Dict, List, Tuple
from common.proto.context_pb2 import Empty, LinkId
from common.proto.kpi_sample_types_pb2 import KpiSampleType
from common.proto.monitoring_pb2 import KpiId, KpiQuery
from monitoring.client.MonitoringClient import MonitoringClient
......@@ -25,21 +24,21 @@ class KpiManager:
def get_kpi_ids_from_link_ids(
self, link_ids : List[LinkId]
) -> Dict[Tuple[LinkId, int], KpiId]:
) -> Dict[Tuple[str, int], KpiId]:
link_uuids = {link_id.link_uuid.uuid for link_id in link_ids}
kpi_descriptors = self._monitoring_client.GetKpiDescriptorList(Empty())
kpi_ids : Dict[Tuple[LinkId, int], KpiId] = {
(kpi_descriptor.link_id, kpi_descriptor.kpi_sample_type) : kpi_descriptor.kpi_id
for kpi_descriptor in kpi_descriptors
kpi_ids : Dict[Tuple[str, int], KpiId] = {
(kpi_descriptor.link_id.link_uuid.uuid, kpi_descriptor.kpi_sample_type) : kpi_descriptor.kpi_id
for kpi_descriptor in kpi_descriptors.kpi_descriptor_list
if kpi_descriptor.link_id.link_uuid.uuid in link_uuids
}
return kpi_ids
def get_kpi_id_samples(
self, kpi_ids : List[KpiId], start_timestamp : float, end_timestamp : float
self, kpi_uuids : List[str], start_timestamp : float, end_timestamp : float
) -> pandas.DataFrame:
kpi_query = KpiQuery()
kpi_query.kpi_ids.extend(kpi_ids) # pylint: disable=no-member
for kpi_uuid in kpi_uuids: kpi_query.kpi_ids.add().kpi_id.uuid = kpi_uuid
kpi_query.start_timestamp.timestamp = start_timestamp # pylint: disable=no-member
kpi_query.end_timestamp.timestamp = end_timestamp # pylint: disable=no-member
raw_kpi_table = self._monitoring_client.QueryKpiData(kpi_query)
......
......@@ -3,13 +3,27 @@ Use a smaller network:
INFO forecaster.tests.Tools:Tools.py:75 Discovering Devices and Links...
INFO forecaster.tests.Tools:Tools.py:104 Found 22 devices and 462 links...
ERROR forecaster.service.ForecasterServiceServicerImpl:Decorator.py:233 ForecastTopologyCapacity exception
ERROR grpc._server:_server.py:453 Exception calling application: The truth value of a Series is ambiguous. Use a.empty, a.bool(), a.item(), a.any() or a.all().
Traceback (most recent call last):
File "/home/tfs/tfs-ctrl/src/common/method_wrappers/Decorator.py", line 220, in inner_wrapper
reply = func(self, request, grpc_context)
File "/home/tfs/tfs-ctrl/src/forecaster/service/ForecasterServiceServicerImpl.py", line 94, in ForecastTopologyCapacity
link_capacities[link.link_id] = link.attributes
TypeError: unhashable type: 'LinkId'
File "/home/tfs/.pyenv/versions/3.9.13/envs/tfs/lib/python3.9/site-packages/grpc/_server.py", line 443, in _call_behavior
response_or_iterator = behavior(argument, context)
File "/home/tfs/tfs-ctrl/src/common/tests/MockServicerImpl_Monitoring.py", line 99, in QueryKpiData
df_samples = self.ts_db.filter(kpi_uuids, start_timestamp=start_timestamp, end_timestamp=end_timestamp)
File "/home/tfs/tfs-ctrl/src/common/tests/InMemoryTimeSeriesDatabase.py", line 30, in filter
if len(kpi_uuids) > 0: data = data[data.kpi_uuid in kpi_uuids]
File "/home/tfs/.pyenv/versions/3.9.13/envs/tfs/lib/python3.9/site-packages/pandas/core/generic.py", line 1527, in __nonzero__
raise ValueError(
ValueError: The truth value of a Series is ambiguous. Use a.empty, a.bool(), a.item(), a.any() or a.all().
Seems mock_context does not add device and link ids to the topology
def test_forecast_link(
context_client : ContextClient,
forecaster_client : ForecasterClient,
): # pylint: disable=redefined-outer-name
topology = context_client.GetTopology(ADMIN_TOPOLOGY_ID)
> link_id = topology.link_ids[0]
E IndexError: list index (0) out of range
......
......@@ -45,7 +45,7 @@ def read_csv(csv_file : str) -> pandas.DataFrame:
LOGGER.info(' DONE')
LOGGER.info('Parsing and Adapting columns...')
if 'dataset.csv' in csv_file:
if 'dataset.csv' in csv_file or 'dataset-short.csv' in csv_file:
df.rename(columns={'linkid': 'link_id', 'ds': 'timestamp', 'y': 'used_capacity_gbps'}, inplace=True)
df[['source', 'destination']] = df['link_id'].str.split('_', expand=True)
elif 'dataset2.csv' in csv_file:
......@@ -94,7 +94,7 @@ def compose_descriptors(df : pandas.DataFrame) -> Dict:
if link_uuid not in links:
total_capacity_gbps = df[df.link_id==link_uuid]['used_capacity_gbps'].max()
total_capacity_gbps = math.ceil(total_capacity_gbps / 100) * 100 # round up in steps of 100
used_capacity_gbps = df[df.link_id==link_uuid]['used_capacity_gbps'].tail(1)
used_capacity_gbps = df[df.link_id==link_uuid].used_capacity_gbps.iat[-1] # get last value
links[link_uuid] = {
'id': link_uuid,
'src_dev': src_device_uuid, 'src_port': dst_device_uuid,
......
"linkid","ds","y"
"at1.at_be1.be","2005-07-29 11:30:00",20.254615
"at1.at_ch1.ch","2005-07-29 11:30:00",54.915569
"at1.at_de1.de","2005-07-29 11:30:00",35.440825
"at1.at_es1.es","2005-07-29 11:30:00",0.014227
"at1.at_fr1.fr","2005-07-29 11:30:00",1.371593
"at1.at_gr1.gr","2005-07-29 11:30:00",224.449018
"at1.at_hr1.hr","2005-07-29 11:30:00",652.82534
"at1.at_hu1.hu","2005-07-29 11:30:00",1887.08947
"at1.at_ie1.ie","2005-07-29 11:30:00",2.878366
"at1.at_il1.il","2005-07-29 11:30:00",379.481098
"at1.at_it1.it","2005-07-29 11:30:00",92.497118
"at1.at_lu1.lu","2005-07-29 11:30:00",0.163239
"at1.at_nl1.nl","2005-07-29 11:30:00",6.314097
"at1.at_ny1.ny","2005-07-29 11:30:00",2.282933
"at1.at_pt1.pt","2005-07-29 11:30:00",0.132256
"at1.at_se1.se","2005-07-29 11:30:00",1425.471977
"at1.at_si1.si","2005-07-29 11:30:00",1822.904739
"at1.at_uk1.uk","2005-07-29 11:30:00",1.661015
This diff is collapsed.
......@@ -17,25 +17,32 @@ from typing import Dict, Tuple
from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME
from common.proto.context_pb2 import ContextId, TopologyId
from common.proto.forecaster_pb2 import ForecastLinkCapacityRequest, ForecastTopologyCapacityRequest
from common.proto.kpi_sample_types_pb2 import KpiSampleType
from common.proto.monitoring_pb2 import KpiDescriptor
from common.tools.descriptor.Loader import DescriptorLoader, check_descriptor_load_results, validate_empty_scenario
from common.tools.grpc.Tools import grpc_message_to_json_string
from common.tools.object_factory.Context import json_context_id
from common.tools.object_factory.Topology import json_topology_id
from context.client.ContextClient import ContextClient
from forecaster.client.ForecasterClient import ForecasterClient
from forecaster.tests.Tools import compose_descriptors, read_csv
from monitoring.client.MonitoringClient import MonitoringClient
from .MockService_Dependencies import MockService_Dependencies
from .PrepareTestScenario import ( # pylint: disable=unused-import
# be careful, order of symbols is important here!
mock_service, forecaster_service, context_client, monitoring_client, forecaster_client)
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)
logging.getLogger('common.tests.MockServicerImpl_Context').setLevel(logging.INFO)
logging.getLogger('context.client.ContextClient').setLevel(logging.INFO)
JSON_ADMIN_CONTEXT_ID = json_context_id(DEFAULT_CONTEXT_NAME)
ADMIN_CONTEXT_ID = ContextId(**JSON_ADMIN_CONTEXT_ID)
ADMIN_TOPOLOGY_ID = TopologyId(**json_topology_id(DEFAULT_TOPOLOGY_NAME, context_id=JSON_ADMIN_CONTEXT_ID))
CSV_DATA_FILE = 'forecaster/tests/data/dataset.csv'
CSV_DATA_FILE = 'forecaster/tests/data/dataset-short.csv'
#CSV_DATA_FILE = 'forecaster/tests/data/dataset.csv'
#CSV_DATA_FILE = 'forecaster/tests/data/dataset2.csv'
DESC_DATS_FILE = 'forecaster/tests/data/descriptor.json'
......@@ -48,10 +55,12 @@ def scenario() -> Tuple[pandas.DataFrame, Dict]:
yield df, descriptors
def test_prepare_environment(
context_client : ContextClient, # pylint: disable=redefined-outer-name
scenario : Tuple[pandas.DataFrame, Dict]
context_client : ContextClient, # pylint: disable=redefined-outer-name
monitoring_client : MonitoringClient, # pylint: disable=redefined-outer-name
mock_service : MockService_Dependencies, # pylint: disable=redefined-outer-name
scenario : Tuple[pandas.DataFrame, Dict] # pylint: disable=redefined-outer-name
) -> None:
_, descriptors = scenario
df, descriptors = scenario
validate_empty_scenario(context_client)
descriptor_loader = DescriptorLoader(descriptors=descriptors, context_client=context_client)
......@@ -64,6 +73,20 @@ def test_prepare_environment(
assert len(response.service_ids) == 0
assert len(response.slice_ids) == 0
for link in descriptors['links']:
link_uuid = link['link_id']['link_uuid']['uuid']
kpi_descriptor = KpiDescriptor()
kpi_descriptor.kpi_id.kpi_id.uuid = link_uuid # pylint: disable=no-member
kpi_descriptor.kpi_description = 'Used Capacity in Link: {:s}'.format(link_uuid)
kpi_descriptor.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_LINK_USED_CAPACITY_GBPS
kpi_descriptor.link_id.link_uuid.uuid = link_uuid # pylint: disable=no-member
monitoring_client.SetKpi(kpi_descriptor)
mock_service.monitoring_servicer.ts_db._data = df.rename(columns={
'link_id': 'kpi_uuid',
'used_capacity_gbps': 'value'
})
def test_forecast_link(
context_client : ContextClient,
forecaster_client : ForecasterClient,
......@@ -97,8 +120,8 @@ def test_forecast_topology(
# TODO: validate forecasted values
def test_cleanup_environment(
context_client : ContextClient, # pylint: disable=redefined-outer-name
scenario : Tuple[pandas.DataFrame, Dict]
context_client : ContextClient, # pylint: disable=redefined-outer-name
scenario : Tuple[pandas.DataFrame, Dict] # pylint: disable=redefined-outer-name
) -> None:
_, descriptors = scenario
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment