diff --git a/src/telemetry/backend/drivers/emulated/SyntheticMetricsGenerator.py b/src/telemetry/backend/drivers/emulated/SyntheticMetricsGenerator.py index 5471fef198604bf87589c350827f49da217f177f..1af542eb498ec31d021181d64b55f52b5d3fbc42 100644 --- a/src/telemetry/backend/drivers/emulated/SyntheticMetricsGenerator.py +++ b/src/telemetry/backend/drivers/emulated/SyntheticMetricsGenerator.py @@ -93,12 +93,12 @@ class SyntheticMetricsGenerator(): "bytes_out" : float(bytes_out), "packet_loss": float(packet_loss), "packet_drop_count" : int(packet_drop_count), "latency" : float(latency), "byte_drop_count": float(byte_drop_count) } - requested_metrics = self.metric_id_mapper(sample_type_ids) - generated_samples = {metric: generated_samples[metric] for metric in requested_metrics} + requested_metrics = self.metric_id_mapper(sample_type_ids, generated_samples) + # generated_samples = {metric: generated_samples[metric] for metric in requested_metrics} - return (time.time(), resource_key, generated_samples) + return (time.time(), resource_key, requested_metrics) - def metric_id_mapper(self, sample_type_ids): + def metric_id_mapper(self, sample_type_ids, metric_dict): """ Maps the sample type IDs to the corresponding metric names. @@ -111,19 +111,19 @@ class SyntheticMetricsGenerator(): metric_names = [] for sample_type_id in sample_type_ids: if sample_type_id == 102: - metric_names.append("packet_in") + metric_names.append(metric_dict["packet_in"]) elif sample_type_id == 101: - metric_names.append("packet_out") + metric_names.append(metric_dict["packet_out"]) elif sample_type_id == 103: - metric_names.append("packet_drop_count") + metric_names.append(metric_dict["packet_drop_count"]) elif sample_type_id == 202: - metric_names.append("bytes_in") + metric_names.append(metric_dict["bytes_in"]) elif sample_type_id == 201: - metric_names.append("bytes_out") + metric_names.append(metric_dict["bytes_out"]) elif sample_type_id == 203: - metric_names.append("byte_drop_count") + metric_names.append(metric_dict["byte_drop_count"]) elif sample_type_id == 701: - metric_names.append("latency") + metric_names.append(metric_dict["latency"]) else: raise ValueError(f"Invalid sample type ID: {sample_type_id}") return metric_names diff --git a/src/telemetry/backend/service/EmulatedCollector.py b/src/telemetry/backend/service/EmulatedCollector.py deleted file mode 100644 index cffff516f00ddb338739df0a3538a52900f07601..0000000000000000000000000000000000000000 --- a/src/telemetry/backend/service/EmulatedCollector.py +++ /dev/null @@ -1,92 +0,0 @@ -# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import numpy as np -import random -import threading -import time -import logging -import queue - -LOGGER = logging.getLogger(__name__) - -class NetworkMetricsEmulator(threading.Thread): - """ - This collector class will generate a single emulated metric value. - """ - def __init__(self, interval=1, duration=10, metric_queue=None, network_state="moderate"): - LOGGER.info("Initiaitng Emulator") - super().__init__() - self.interval = interval - self.duration = duration - self.metric_queue = metric_queue if metric_queue is not None else queue.Queue() - self.network_state = network_state - self.running = True - self.base_utilization = None - self.states = None - self.state_probabilities = None - self.set_inital_parameter_values() - - def set_inital_parameter_values(self): - self.states = ["good", "moderate", "poor"] - self.state_probabilities = { - "good" : [0.9, 0.1, 0.0], - "moderate": [0.2, 0.7, 0.1], - "poor" : [0.0, 0.3, 0.7] - } - if self.network_state == "good": - self.base_utilization = random.uniform(700, 900) - elif self.network_state == "moderate": - self.base_utilization = random.uniform(300, 700) - else: - self.base_utilization = random.uniform(100, 300) - - def generate_synthetic_data_point(self): - if self.network_state == "good": - variance = random.uniform(-5, 5) - elif self.network_state == "moderate": - variance = random.uniform(-50, 50) - elif self.network_state == "poor": - variance = random.uniform(-100, 100) - else: - raise ValueError("Invalid network state. Must be 'good', 'moderate', or 'poor'.") - self.base_utilization += variance - - period = 60 * 60 * random.uniform(10, 100) - amplitude = random.uniform(50, 100) - sin_wave = amplitude * np.sin(2 * np.pi * 100 / period) + self.base_utilization - random_noise = random.uniform(-10, 10) - utilization = sin_wave + random_noise - - state_prob = self.state_probabilities[self.network_state] - self.network_state = random.choices(self.states, state_prob)[0] - - return utilization - - def run(self): - while self.running and (self.duration == -1 or self.duration > 0): - utilization = self.generate_synthetic_data_point() - self.metric_queue.put(round(utilization,3)) - time.sleep(self.interval) - if self.duration > 0: - self.duration -= self.interval - if self.duration == -1: - self.duration = 0 - LOGGER.debug("Emulator collector is stopped.") - self.stop() - - def stop(self): - self.running = False - if not self.is_alive(): - LOGGER.debug("Emulator Collector is Termintated.") diff --git a/src/telemetry/backend/service/EmulatedCollectorMultiple.py b/src/telemetry/backend/service/EmulatedCollectorMultiple.py deleted file mode 100644 index 5be634deac941fb3c17ed1b5c031da0bd2357600..0000000000000000000000000000000000000000 --- a/src/telemetry/backend/service/EmulatedCollectorMultiple.py +++ /dev/null @@ -1,97 +0,0 @@ -# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import numpy as np -import random -import threading -import time -import logging -import queue - -LOGGER = logging.getLogger(__name__) - -class NetworkMetricsEmulator(threading.Thread): - """ - This collector class will generate the emulated metrics for PKT_IN, PKT_OUT, BYTES_IN, BYTES_OUT and PKT_DROP as a list. - """ - def __init__(self, interval=1, duration=10, metric_queue=None, network_state="moderate"): - LOGGER.info("Initiaitng Emulator") - super().__init__() - self.interval = interval - self.duration = duration - self.metric_queue = metric_queue if metric_queue is not None else queue.Queue() - self.network_state = network_state - self.running = True - self.set_inital_parameter_values() - - def set_inital_parameter_values(self): - self.bytes_per_pkt = random.uniform(65, 150) - self.states = ["good", "moderate", "poor"] - self.state_probabilities = { - "good" : [0.8, 0.2, 0.0], - "moderate": [0.2, 0.6, 0.2], - "poor" : [0.0, 0.4, 0.6] - } - if self.network_state == "good": - self.packet_in = random.uniform(700, 900) - elif self.network_state == "moderate": - self.packet_in = random.uniform(300, 700) - else: - self.packet_in = random.uniform(100, 300) - - def generate_synthetic_data_point(self): - if self.network_state == "good": - packet_loss = random.uniform(0.01, 0.1) - random_noise = random.uniform(1,10) - elif self.network_state == "moderate": - packet_loss = random.uniform(0.1, 1) - random_noise = random.uniform(10, 40) - elif self.network_state == "poor": - packet_loss = random.uniform(1, 3) - random_noise = random.uniform(40, 100) - else: - raise ValueError("Invalid network state. Must be 'good', 'moderate', or 'poor'.") - # self.packet_in += random_noise - - period = 60 * 60 * random.uniform(10, 100) - amplitude = random.uniform(50, 100) - sin_wave = amplitude * np.sin(2 * np.pi * 100 / period) + self.packet_in - packet_in = sin_wave + ((sin_wave/100) * random_noise) - packet_out = packet_in - ((packet_in / 100) * packet_loss) - bytes_in = packet_in * self.bytes_per_pkt - bytes_out = packet_out * self.bytes_per_pkt - - state_prob = self.state_probabilities[self.network_state] - self.network_state = random.choices(self.states, state_prob)[0] - print (self.network_state) - - return [float(packet_in), float(packet_out), float(bytes_in), float(bytes_out), float(packet_loss)] - # return packet_in - - def run(self): - while self.running and (self.duration == -1 or self.duration > 0): - packet_in = self.generate_synthetic_data_point() - self.metric_queue.put(packet_in) - time.sleep(self.interval) - if self.duration > 0: - self.duration -= self.interval - if self.duration == -1: - self.duration = 0 - LOGGER.debug("Emulator collector is stopped.") - self.stop() - - def stop(self): - self.running = False - if not self.is_alive(): - print("Thread is terminated.") diff --git a/src/telemetry/backend/tests/messages_emulated.py b/src/telemetry/backend/tests/messages_emulated.py index 451792327eb17a5cc4581cf436099c7730864039..fa4ddba1b72f790c22ae214453e4b8a090286141 100644 --- a/src/telemetry/backend/tests/messages_emulated.py +++ b/src/telemetry/backend/tests/messages_emulated.py @@ -26,7 +26,7 @@ def create_test_configuration(): {"action": 1, "custom": {"resource_key": "_connect/settings", "resource_value": { "endpoints": [ {"uuid": "eth0", "type": "ethernet", "sample_types": [101, 102]}, - {"uuid": "eth1", "type": "ethernet", "sample_types": [201, 202]}, + {"uuid": "eth1", "type": "ethernet", "sample_types": []}, {"uuid": "13/1/2", "type": "copper", "sample_types": [101, 102, 201, 202]} ] }}}, @@ -42,19 +42,14 @@ def create_test_configuration(): ] } -# This method is used to create a specific configuration to be used in the test case test_get_config in the test_EmulatedDriver.py file def create_specific_config_keys(): - # config = create_test_configuration() keys_to_return = ["_connect/settings/endpoints/eth1", "/interface/[13/1/2]/settings", "_connect/address"] return keys_to_return - # return {rule["custom"]["resource_key"]: rule["custom"]["resource_value"] for rule in config["config_rules"] if rule["custom"]["resource_key"] in keys_to_return} -# write a method to create a specific configuration to be used in the test case test_delete_config in the test_EmulatedDriver1.py file def create_config_for_delete(): keys_to_delete = ["_connect/settings/endpoints/eth0", "/interface/[eth1]", "_connect/port"] return keys_to_delete -# write a method to generate subscription for specific endpoints. def create_test_subscriptions(): return [("_connect/settings/endpoints/eth1", 10, 2), ("_connect/settings/endpoints/13/1/2", 15, 3), @@ -63,4 +58,4 @@ def create_test_subscriptions(): def create_unscubscribe_subscriptions(): return [("_connect/settings/endpoints/eth1", 10, 2), ("_connect/settings/endpoints/13/1/2", 15, 3), - ("_connect/settings/endpoints/eth0", 8, 2)] \ No newline at end of file + ("_connect/settings/endpoints/eth0", 8, 2)]