Skip to content
Snippets Groups Projects
Commit f93b59f4 authored by Waleed Akbar's avatar Waleed Akbar
Browse files

Changes in Telemetry backend emulated.

- Refactor emulated telemetry tests
- remove unused collector class.
- Improved SyntheticMetricsGenerator class.
parent 192573a4
No related branches found
No related tags found
2 merge requests!359Release TeraFlowSDN 5.0,!287Resolve "(CTTC) Implement Telemetry Backend Collector Emulated"
......@@ -93,12 +93,12 @@ class SyntheticMetricsGenerator():
"bytes_out" : float(bytes_out), "packet_loss": float(packet_loss), "packet_drop_count" : int(packet_drop_count),
"latency" : float(latency), "byte_drop_count": float(byte_drop_count)
}
requested_metrics = self.metric_id_mapper(sample_type_ids)
generated_samples = {metric: generated_samples[metric] for metric in requested_metrics}
requested_metrics = self.metric_id_mapper(sample_type_ids, generated_samples)
# generated_samples = {metric: generated_samples[metric] for metric in requested_metrics}
return (time.time(), resource_key, generated_samples)
return (time.time(), resource_key, requested_metrics)
def metric_id_mapper(self, sample_type_ids):
def metric_id_mapper(self, sample_type_ids, metric_dict):
"""
Maps the sample type IDs to the corresponding metric names.
......@@ -111,19 +111,19 @@ class SyntheticMetricsGenerator():
metric_names = []
for sample_type_id in sample_type_ids:
if sample_type_id == 102:
metric_names.append("packet_in")
metric_names.append(metric_dict["packet_in"])
elif sample_type_id == 101:
metric_names.append("packet_out")
metric_names.append(metric_dict["packet_out"])
elif sample_type_id == 103:
metric_names.append("packet_drop_count")
metric_names.append(metric_dict["packet_drop_count"])
elif sample_type_id == 202:
metric_names.append("bytes_in")
metric_names.append(metric_dict["bytes_in"])
elif sample_type_id == 201:
metric_names.append("bytes_out")
metric_names.append(metric_dict["bytes_out"])
elif sample_type_id == 203:
metric_names.append("byte_drop_count")
metric_names.append(metric_dict["byte_drop_count"])
elif sample_type_id == 701:
metric_names.append("latency")
metric_names.append(metric_dict["latency"])
else:
raise ValueError(f"Invalid sample type ID: {sample_type_id}")
return metric_names
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy as np
import random
import threading
import time
import logging
import queue
LOGGER = logging.getLogger(__name__)
class NetworkMetricsEmulator(threading.Thread):
"""
This collector class will generate a single emulated metric value.
"""
def __init__(self, interval=1, duration=10, metric_queue=None, network_state="moderate"):
LOGGER.info("Initiaitng Emulator")
super().__init__()
self.interval = interval
self.duration = duration
self.metric_queue = metric_queue if metric_queue is not None else queue.Queue()
self.network_state = network_state
self.running = True
self.base_utilization = None
self.states = None
self.state_probabilities = None
self.set_inital_parameter_values()
def set_inital_parameter_values(self):
self.states = ["good", "moderate", "poor"]
self.state_probabilities = {
"good" : [0.9, 0.1, 0.0],
"moderate": [0.2, 0.7, 0.1],
"poor" : [0.0, 0.3, 0.7]
}
if self.network_state == "good":
self.base_utilization = random.uniform(700, 900)
elif self.network_state == "moderate":
self.base_utilization = random.uniform(300, 700)
else:
self.base_utilization = random.uniform(100, 300)
def generate_synthetic_data_point(self):
if self.network_state == "good":
variance = random.uniform(-5, 5)
elif self.network_state == "moderate":
variance = random.uniform(-50, 50)
elif self.network_state == "poor":
variance = random.uniform(-100, 100)
else:
raise ValueError("Invalid network state. Must be 'good', 'moderate', or 'poor'.")
self.base_utilization += variance
period = 60 * 60 * random.uniform(10, 100)
amplitude = random.uniform(50, 100)
sin_wave = amplitude * np.sin(2 * np.pi * 100 / period) + self.base_utilization
random_noise = random.uniform(-10, 10)
utilization = sin_wave + random_noise
state_prob = self.state_probabilities[self.network_state]
self.network_state = random.choices(self.states, state_prob)[0]
return utilization
def run(self):
while self.running and (self.duration == -1 or self.duration > 0):
utilization = self.generate_synthetic_data_point()
self.metric_queue.put(round(utilization,3))
time.sleep(self.interval)
if self.duration > 0:
self.duration -= self.interval
if self.duration == -1:
self.duration = 0
LOGGER.debug("Emulator collector is stopped.")
self.stop()
def stop(self):
self.running = False
if not self.is_alive():
LOGGER.debug("Emulator Collector is Termintated.")
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy as np
import random
import threading
import time
import logging
import queue
LOGGER = logging.getLogger(__name__)
class NetworkMetricsEmulator(threading.Thread):
"""
This collector class will generate the emulated metrics for PKT_IN, PKT_OUT, BYTES_IN, BYTES_OUT and PKT_DROP as a list.
"""
def __init__(self, interval=1, duration=10, metric_queue=None, network_state="moderate"):
LOGGER.info("Initiaitng Emulator")
super().__init__()
self.interval = interval
self.duration = duration
self.metric_queue = metric_queue if metric_queue is not None else queue.Queue()
self.network_state = network_state
self.running = True
self.set_inital_parameter_values()
def set_inital_parameter_values(self):
self.bytes_per_pkt = random.uniform(65, 150)
self.states = ["good", "moderate", "poor"]
self.state_probabilities = {
"good" : [0.8, 0.2, 0.0],
"moderate": [0.2, 0.6, 0.2],
"poor" : [0.0, 0.4, 0.6]
}
if self.network_state == "good":
self.packet_in = random.uniform(700, 900)
elif self.network_state == "moderate":
self.packet_in = random.uniform(300, 700)
else:
self.packet_in = random.uniform(100, 300)
def generate_synthetic_data_point(self):
if self.network_state == "good":
packet_loss = random.uniform(0.01, 0.1)
random_noise = random.uniform(1,10)
elif self.network_state == "moderate":
packet_loss = random.uniform(0.1, 1)
random_noise = random.uniform(10, 40)
elif self.network_state == "poor":
packet_loss = random.uniform(1, 3)
random_noise = random.uniform(40, 100)
else:
raise ValueError("Invalid network state. Must be 'good', 'moderate', or 'poor'.")
# self.packet_in += random_noise
period = 60 * 60 * random.uniform(10, 100)
amplitude = random.uniform(50, 100)
sin_wave = amplitude * np.sin(2 * np.pi * 100 / period) + self.packet_in
packet_in = sin_wave + ((sin_wave/100) * random_noise)
packet_out = packet_in - ((packet_in / 100) * packet_loss)
bytes_in = packet_in * self.bytes_per_pkt
bytes_out = packet_out * self.bytes_per_pkt
state_prob = self.state_probabilities[self.network_state]
self.network_state = random.choices(self.states, state_prob)[0]
print (self.network_state)
return [float(packet_in), float(packet_out), float(bytes_in), float(bytes_out), float(packet_loss)]
# return packet_in
def run(self):
while self.running and (self.duration == -1 or self.duration > 0):
packet_in = self.generate_synthetic_data_point()
self.metric_queue.put(packet_in)
time.sleep(self.interval)
if self.duration > 0:
self.duration -= self.interval
if self.duration == -1:
self.duration = 0
LOGGER.debug("Emulator collector is stopped.")
self.stop()
def stop(self):
self.running = False
if not self.is_alive():
print("Thread is terminated.")
......@@ -26,7 +26,7 @@ def create_test_configuration():
{"action": 1, "custom": {"resource_key": "_connect/settings", "resource_value": {
"endpoints": [
{"uuid": "eth0", "type": "ethernet", "sample_types": [101, 102]},
{"uuid": "eth1", "type": "ethernet", "sample_types": [201, 202]},
{"uuid": "eth1", "type": "ethernet", "sample_types": []},
{"uuid": "13/1/2", "type": "copper", "sample_types": [101, 102, 201, 202]}
]
}}},
......@@ -42,19 +42,14 @@ def create_test_configuration():
]
}
# This method is used to create a specific configuration to be used in the test case test_get_config in the test_EmulatedDriver.py file
def create_specific_config_keys():
# config = create_test_configuration()
keys_to_return = ["_connect/settings/endpoints/eth1", "/interface/[13/1/2]/settings", "_connect/address"]
return keys_to_return
# return {rule["custom"]["resource_key"]: rule["custom"]["resource_value"] for rule in config["config_rules"] if rule["custom"]["resource_key"] in keys_to_return}
# write a method to create a specific configuration to be used in the test case test_delete_config in the test_EmulatedDriver1.py file
def create_config_for_delete():
keys_to_delete = ["_connect/settings/endpoints/eth0", "/interface/[eth1]", "_connect/port"]
return keys_to_delete
# write a method to generate subscription for specific endpoints.
def create_test_subscriptions():
return [("_connect/settings/endpoints/eth1", 10, 2),
("_connect/settings/endpoints/13/1/2", 15, 3),
......@@ -63,4 +58,4 @@ def create_test_subscriptions():
def create_unscubscribe_subscriptions():
return [("_connect/settings/endpoints/eth1", 10, 2),
("_connect/settings/endpoints/13/1/2", 15, 3),
("_connect/settings/endpoints/eth0", 8, 2)]
\ No newline at end of file
("_connect/settings/endpoints/eth0", 8, 2)]
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment