Newer
Older

Waleed Akbar
committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
import numpy as np
import random
import threading
import time
import logging
import queue
LOGGER = logging.getLogger(__name__)
class NetworkMetricsEmulator(threading.Thread):
"""
This collector class will generate the emulated metrics for PKT_IN, PKT_OUT, BYTES_IN, BYTES_OUT and PKT_DROP as a list.
"""
def __init__(self, interval=1, duration=10, metric_queue=None, network_state="moderate"):
LOGGER.info("Initiaitng Emulator")
super().__init__()
self.interval = interval
self.duration = duration
self.metric_queue = metric_queue if metric_queue is not None else queue.Queue()
self.network_state = network_state
self.running = True
self.set_inital_parameter_values()
def set_inital_parameter_values(self):
self.bytes_per_pkt = random.uniform(65, 150)
self.states = ["good", "moderate", "poor"]
self.state_probabilities = {
"good" : [0.8, 0.2, 0.0],
"moderate": [0.2, 0.6, 0.2],
"poor" : [0.0, 0.4, 0.6]
}
if self.network_state == "good":
self.packet_in = random.uniform(700, 900)
elif self.network_state == "moderate":
self.packet_in = random.uniform(300, 700)
else:
self.packet_in = random.uniform(100, 300)
def generate_synthetic_data_point(self):
if self.network_state == "good":
packet_loss = random.uniform(0.01, 0.1)
random_noise = random.uniform(1,10)
elif self.network_state == "moderate":
packet_loss = random.uniform(0.1, 1)
random_noise = random.uniform(10, 40)
elif self.network_state == "poor":
packet_loss = random.uniform(1, 3)
random_noise = random.uniform(40, 100)
else:
raise ValueError("Invalid network state. Must be 'good', 'moderate', or 'poor'.")
# self.packet_in += random_noise
period = 60 * 60 * random.uniform(10, 100)
amplitude = random.uniform(50, 100)
sin_wave = amplitude * np.sin(2 * np.pi * 100 / period) + self.packet_in
packet_in = sin_wave + ((sin_wave/100) * random_noise)
packet_out = packet_in - ((packet_in / 100) * packet_loss)
bytes_in = packet_in * self.bytes_per_pkt
bytes_out = packet_out * self.bytes_per_pkt
state_prob = self.state_probabilities[self.network_state]
self.network_state = random.choices(self.states, state_prob)[0]
print (self.network_state)
return [float(packet_in), float(packet_out), float(bytes_in), float(bytes_out), float(packet_loss)]
# return packet_in
def run(self):
while self.running and (self.duration == -1 or self.duration > 0):
packet_in = self.generate_synthetic_data_point()
self.metric_queue.put(packet_in)
time.sleep(self.interval)
if self.duration > 0:
self.duration -= self.interval
if self.duration == -1:
self.duration = 0
LOGGER.debug("Emulator collector is stopped.")
self.stop()
def stop(self):
self.running = False
if not self.is_alive():
print("Thread is terminated.")