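# NOTE: stray "Newer" / "Older" page-navigation text from a web export was here; it is not part of this module.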
import os
import time
from datetime import datetime
from queue import Queue
from random import random

from apscheduler.executors.pool import ProcessPoolExecutor
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger

from common.proto.monitoring_pb2 import Kpi, KpiList
from common.tools.timestamp import Converters
from common.tools.timestamp.Converters import timestamp_utcnow_to_float, timestamp_float_to_string
from monitoring.service import MetricsDBTools
from monitoring.service.ManagementDBTools import ManagementDB
class SubscriptionManager():
    """Schedule periodic KPI sampling jobs on a background APScheduler instance.

    Each subscription is one interval job, keyed by its subscription id, that
    calls ``metrics_db.get_subscription_data`` with the caller's queue.
    """

    def __init__(self, metrics_db):
        """Keep the metrics backend and start the background scheduler.

        Args:
            metrics_db: Object exposing ``get_subscription_data``; every
                scheduled job calls that method.
        """
        self.metrics_db = metrics_db
        # A 20-worker process pool is registered under the alias 'processpool'.
        # NOTE(review): add_job() below never passes executor='processpool',
        # so confirm whether this pool is meant to be used.
        self.scheduler = BackgroundScheduler(executors={'processpool': ProcessPoolExecutor(max_workers=20)})
        self.scheduler.start()

    def create_subscription(self, subs_queue, subscription_id, kpi_id, sampling_interval_s, sampling_duration_s=None, start_timestamp=None, end_timestamp=None):
        """Register an interval job that samples ``kpi_id`` for a subscription.

        Args:
            subs_queue: Passed through unchanged to
                ``metrics_db.get_subscription_data``.
            subscription_id: Identifier of the subscription; used (as a
                string) as the scheduler job id.
            kpi_id: KPI identifier passed through to the metrics backend.
            sampling_interval_s: Seconds between two runs of the job.
            sampling_duration_s: Optional length of the sampling window in
                seconds. When truthy, ``end_timestamp`` is recomputed as
                ``start_timestamp + sampling_duration_s`` and any
                ``end_timestamp`` argument is ignored.
            start_timestamp: Optional POSIX timestamp for the first run. A
                falsy value (``None`` or ``0``) means "not provided"; with a
                duration it then defaults to the current time.
            end_timestamp: Optional POSIX timestamp after which the job stops.
                A falsy value means "no end".
        """
        start_date = None
        end_date = None

        if sampling_duration_s:
            if not start_timestamp:
                start_timestamp = time.time()
            end_timestamp = start_timestamp + sampling_duration_s
            print("end_timestamp: " + timestamp_float_to_string(end_timestamp))

        # Convert only the bounds that were actually supplied: calling
        # utcfromtimestamp(None) raises TypeError, and an open-ended
        # subscription must reach the scheduler with start/end left as None.
        if start_timestamp:
            start_date = datetime.utcfromtimestamp(start_timestamp)
            print("start_date: " + str(start_date))
        if end_timestamp:
            end_date = datetime.utcfromtimestamp(end_timestamp)
            print("end_date: " + str(end_date))

        # utcfromtimestamp() yields naive datetimes expressed in UTC, while
        # APScheduler interprets naive datetimes in the trigger's timezone
        # (the scheduler's local zone by default). Pin the trigger to UTC so
        # the sampling window is not shifted on hosts that do not run in UTC.
        # APScheduler requires job ids to be strings, hence the str() coercion.
        self.scheduler.add_job(
            self.metrics_db.get_subscription_data,
            args=(subs_queue, kpi_id, start_timestamp, sampling_interval_s),
            trigger='interval',
            seconds=sampling_interval_s,
            start_date=start_date,
            end_date=end_date,
            timezone='UTC',
            id=str(subscription_id),
        )

        # Immediate one-off fetch in addition to the scheduled runs.
        # NOTE(review): this call passes end_date (a datetime or None) in the
        # position where the scheduled job passes start_timestamp (a float);
        # confirm which one get_subscription_data actually expects.
        self.metrics_db.get_subscription_data(subs_queue, kpi_id, end_date, sampling_interval_s)

    def delete_subscription(self, subscription_id):
        """Remove the scheduler job that belongs to ``subscription_id``.

        The id is coerced to ``str`` to match how create_subscription
        registers the job.
        """
        self.scheduler.remove_job(str(subscription_id))
def main():
    """Manual smoke test: write sample KPI values, subscribe, dump the queue.

    Builds a ManagementDB on "monitoring.db" and a MetricsDB with the
    hard-coded connection arguments below, writes ten random samples for one
    KPI (one per second), registers a subscription for that KPI and prints
    whatever the subscription put on the queue.
    """
    subs_queue = Queue()
    managementdb = ManagementDB("monitoring.db")
    metrics_db = MetricsDBTools.MetricsDB("localhost", "9009", "9000", "monitoring")
    subs_manager = SubscriptionManager(metrics_db)
    print("Here")

    kpi_id = "2"
    sampling_duration_s = 10
    sampling_interval_s = 2
    start_timestamp = timestamp_utcnow_to_float()
    end_timestamp = timestamp_utcnow_to_float() + 10

    print("Before loop")
    print("start_timestamp: " + timestamp_float_to_string(start_timestamp))
    print("end_timestamp: " + timestamp_float_to_string(end_timestamp))

    # Descriptor fields are the same for every sample, so set them once.
    kpitype = "KPISAMPLETYPE_PACKETS_TRANSMITTED"
    deviceId = "DEV01"
    endpointId = "END01"
    serviceId = "SERV01"
    for _ in range(10):
        timestamp = timestamp_utcnow_to_float()
        kpi_value = 50 * random()
        metrics_db.write_KPI(timestamp, kpi_id, kpitype, deviceId, endpointId, serviceId, kpi_value)
        time.sleep(1)
    print("After loop")

    subs_id = managementdb.insert_subscription(
        kpi_id, "localhost", sampling_duration_s, sampling_interval_s, start_timestamp, end_timestamp
    )
    subs_manager.create_subscription(
        subs_queue, str(subs_id), kpi_id, sampling_interval_s, sampling_duration_s, start_timestamp, end_timestamp
    )

    print("Queue empty: " + str(subs_queue.empty()))
    print("Queue size: " + str(subs_queue.qsize()))

    # Drain what is on the queue right now; each entry is a sequence of rows.
    while not subs_queue.empty():
        samples = subs_queue.get_nowait()  # renamed from `list` to stop shadowing the builtin
        print("List: " + str(samples))
        kpi_list = KpiList()
        for item in samples:
            # Row layout as consumed here: [0] KPI id, [1] timestamp string, [2] value.
            kpi = Kpi()
            kpi.kpi_id.kpi_id.uuid = item[0]
            kpi.timestamp.timestamp = Converters.timestamp_string_to_float(item[1])
            kpi.kpi_value.floatVal = item[2]
            kpi_list.kpi.append(kpi)
        print("Kpi List: " + str(kpi_list))
if __name__ == '__main__':
    # Run the manual smoke test only when this file is executed as a script,
    # never as a side effect of importing the module.
    main()