diff --git a/src/telemetry/backend/service/EmulatedCollector.py b/src/telemetry/backend/service/EmulatedCollector.py index 716292ae0a8794089847e589718e9d21fe942594..5f546d043653f4cad6371abe15fb69099ff3389f 100644 --- a/src/telemetry/backend/service/EmulatedCollector.py +++ b/src/telemetry/backend/service/EmulatedCollector.py @@ -8,6 +8,9 @@ import queue LOGGER = logging.getLogger(__name__) class NetworkMetricsEmulator(threading.Thread): + """ + This collector class will generate a single emulated metric value. + """ def __init__(self, interval=1, duration=10, metric_queue=None, network_state="moderate"): LOGGER.info("Initiaitng Emulator") super().__init__() diff --git a/src/telemetry/backend/service/EmulatedCollectorMultiple.py b/src/telemetry/backend/service/EmulatedCollectorMultiple.py new file mode 100644 index 0000000000000000000000000000000000000000..54886e5db865ecb36cf4dd17c99c1203628940b7 --- /dev/null +++ b/src/telemetry/backend/service/EmulatedCollectorMultiple.py @@ -0,0 +1,83 @@ +import numpy as np +import random +import threading +import time +import logging +import queue + +LOGGER = logging.getLogger(__name__) + +class NetworkMetricsEmulator(threading.Thread): + """ + This collector class will generate the emulated metrics for PKT_IN, PKT_OUT, BYTES_IN, BYTES_OUT and PKT_DROP as a list. + """ + def __init__(self, interval=1, duration=10, metric_queue=None, network_state="moderate"): + LOGGER.info("Initiaitng Emulator") + super().__init__() + self.interval = interval + self.duration = duration + self.metric_queue = metric_queue if metric_queue is not None else queue.Queue() + self.network_state = network_state + self.running = True + self.set_inital_parameter_values() + + def set_inital_parameter_values(self): + self.bytes_per_pkt = random.uniform(65, 150) + self.states = ["good", "moderate", "poor"] + self.state_probabilities = { + "good" : [0.8, 0.2, 0.0], + "moderate": [0.2, 0.6, 0.2], + "poor" : [0.0, 0.4, 0.6] + } + if self.network_state == "good": + self.packet_in = random.uniform(700, 900) + elif self.network_state == "moderate": + self.packet_in = random.uniform(300, 700) + else: + self.packet_in = random.uniform(100, 300) + + def generate_synthetic_data_point(self): + if self.network_state == "good": + packet_loss = random.uniform(0.01, 0.1) + random_noise = random.uniform(1,10) + elif self.network_state == "moderate": + packet_loss = random.uniform(0.1, 1) + random_noise = random.uniform(10, 40) + elif self.network_state == "poor": + packet_loss = random.uniform(1, 3) + random_noise = random.uniform(40, 100) + else: + raise ValueError("Invalid network state. Must be 'good', 'moderate', or 'poor'.") + # self.packet_in += random_noise + + period = 60 * 60 * random.uniform(10, 100) + amplitude = random.uniform(50, 100) + sin_wave = amplitude * np.sin(2 * np.pi * 100 / period) + self.packet_in + packet_in = sin_wave + ((sin_wave/100) * random_noise) + packet_out = packet_in - ((packet_in / 100) * packet_loss) + bytes_in = packet_in * self.bytes_per_pkt + bytes_out = packet_out * self.bytes_per_pkt + + state_prob = self.state_probabilities[self.network_state] + self.network_state = random.choices(self.states, state_prob)[0] + print (self.network_state) + + return [float(packet_in), float(packet_out), float(bytes_in), float(bytes_out), float(packet_loss)] + # return packet_in + + def run(self): + while self.running and (self.duration == -1 or self.duration > 0): + packet_in = self.generate_synthetic_data_point() + self.metric_queue.put(packet_in) + time.sleep(self.interval) + if self.duration > 0: + self.duration -= self.interval + if self.duration == -1: + self.duration = 0 + LOGGER.debug("Emulator collector is stopped.") + self.stop() + + def stop(self): + self.running = False + if not self.is_alive(): + print("Thread is terminated.")