Newer
Older
import numpy as np
import random
import threading
import time
import logging
import queue
LOGGER = logging.getLogger(__name__)
class NetworkMetricsEmulator(threading.Thread):

Waleed Akbar
committed
"""
This collector class will generate a single emulated metric value.
"""
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
def __init__(self, interval=1, duration=10, metric_queue=None, network_state="moderate"):
LOGGER.info("Initiaitng Emulator")
super().__init__()
self.interval = interval
self.duration = duration
self.metric_queue = metric_queue if metric_queue is not None else queue.Queue()
self.network_state = network_state
self.running = True
self.base_utilization = None
self.states = None
self.state_probabilities = None
self.set_inital_parameter_values()
def set_inital_parameter_values(self):
self.states = ["good", "moderate", "poor"]
self.state_probabilities = {
"good" : [0.9, 0.1, 0.0],
"moderate": [0.2, 0.7, 0.1],
"poor" : [0.0, 0.3, 0.7]
}
if self.network_state == "good":
self.base_utilization = random.uniform(700, 900)
elif self.network_state == "moderate":
self.base_utilization = random.uniform(300, 700)
else:
self.base_utilization = random.uniform(100, 300)
def generate_synthetic_data_point(self):
if self.network_state == "good":
variance = random.uniform(-5, 5)
elif self.network_state == "moderate":
variance = random.uniform(-50, 50)
elif self.network_state == "poor":
variance = random.uniform(-100, 100)
else:
raise ValueError("Invalid network state. Must be 'good', 'moderate', or 'poor'.")
self.base_utilization += variance
period = 60 * 60 * random.uniform(10, 100)
amplitude = random.uniform(50, 100)
sin_wave = amplitude * np.sin(2 * np.pi * 100 / period) + self.base_utilization
random_noise = random.uniform(-10, 10)
utilization = sin_wave + random_noise
state_prob = self.state_probabilities[self.network_state]
self.network_state = random.choices(self.states, state_prob)[0]
return utilization
def run(self):
while self.running and (self.duration == -1 or self.duration > 0):
utilization = self.generate_synthetic_data_point()
self.metric_queue.put(round(utilization,3))
time.sleep(self.interval)
if self.duration > 0:
self.duration -= self.interval
if self.duration == -1:
self.duration = 0
LOGGER.debug("Emulator collector is stopped.")
self.stop()
def stop(self):
self.running = False
if not self.is_alive():
LOGGER.debug("Emulator Collector is Termintated.")