Skip to content
Snippets Groups Projects
Commit 35923f74 authored by Sergio Gonzalez Diaz's avatar Sergio Gonzalez Diaz
Browse files

Update SubscriptionManager.py

parent 9722e333
No related branches found
No related tags found
1 merge request!5Feat/monitoring subscriptions
import os # Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
from queue import Queue
from random import random
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ProcessPoolExecutor
from apscheduler.jobstores.base import JobLookupError
from datetime import datetime
import time
import logging
LOGGER = logging.getLogger(__name__)
class SubscriptionManager():
def __init__(self, metrics_db):
self.metrics_db = metrics_db
self.scheduler = BackgroundScheduler(executors={'processpool': ProcessPoolExecutor(max_workers=20)})
self.scheduler.start()
LOGGER.info("Subscription Manager Initialized")
def create_subscription(self, subs_queue ,subscription_id, kpi_id, sampling_interval_s, sampling_duration_s=None, start_timestamp=None, end_timestamp=None):
start_date=None
end_date=None
print("Inside create subscription")
if sampling_duration_s:
if not start_timestamp:# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
# #
# Licensed under the Apache License, Version 2.0 (the "License"); # Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. # you may not use this file except in compliance with the License.
...@@ -84,91 +59,4 @@ class SubscriptionManager(): ...@@ -84,91 +59,4 @@ class SubscriptionManager():
LOGGER.debug(f"Subscrition job {subscription_id} succesfully created") LOGGER.debug(f"Subscrition job {subscription_id} succesfully created")
def delete_subscription(self, subscription_id): def delete_subscription(self, subscription_id):
self.scheduler.remove_job(subscription_id) self.scheduler.remove_job(subscription_id)
\ No newline at end of file
def ingest_data():
metrics_db = MetricsDB("localhost", "9009", "9000", "monitoring")
kpi_id = "2"
for i in range(200):
timestamp = timestamp_utcnow_to_float()
kpitype = "KPISAMPLETYPE_PACKETS_TRANSMITTED"
deviceId = "DEV01"
endpointId = "END01"
serviceId = "SERV01"
kpi_value = 50 * random()
metrics_db.write_KPI(timestamp, kpi_id, kpitype, deviceId, endpointId, serviceId, kpi_value)
time.sleep(0.01)
def scheduler_test():
scheduler = BackgroundScheduler()
subs_queue = Queue()
managementdb = ManagementDB("monitoring.db")
metrics_db = MetricsDB("localhost", "9009", "9000", "monitoring")
subs_manager = SubscriptionManager(metrics_db)
scheduler.start()
scheduler.add_job(ingest_data)
kpi_id = "2"
sampling_duration_s = 20
sampling_interval_s = 3
real_start_time = timestamp_utcnow_to_float()
start_timestamp = real_start_time
end_timestamp = start_timestamp + sampling_duration_s
subs_id = managementdb.insert_subscription(kpi_id, "localhost", sampling_duration_s,
sampling_interval_s,start_timestamp,end_timestamp)
subs_manager.create_subscription(subs_queue,subs_id,kpi_id,sampling_interval_s,
sampling_duration_s,start_timestamp,end_timestamp)
try:
# This is here to simulate application activity (which keeps the main thread alive).
i = 0
while True:
# time.sleep(sampling_interval_s)
while not subs_queue.empty():
list = subs_queue.get_nowait()
# print(f"List: {list}")
kpi_list = KpiList()
for item in list:
kpi = Kpi()
kpi.kpi_id.kpi_id.uuid = item[0]
kpi.timestamp.timestamp = Converters.timestamp_string_to_float(item[1])
kpi.kpi_value.floatVal = item[2]
kpi_list.kpi.append(kpi)
i += 1
print("Kpi List: " + str(kpi_list) + " " + str(i))
if timestamp_utcnow_to_float() > end_timestamp:
print(f"Total metrics: {i}")
break
except (KeyboardInterrupt, SystemExit):
# Not strictly necessary if daemonic mode is enabled but should be done if possible
scheduler.shutdown()
if __name__ == '__main__':
scheduler_test()
start_timestamp=time.time()
end_timestamp=start_timestamp+sampling_duration_s
print("end_timestamp: " + timestamp_float_to_string(end_timestamp))
if start_timestamp:
start_date = datetime.utcfromtimestamp(start_timestamp)
print("start_date: " + str(start_date))
if end_timestamp:
end_date = datetime.fromtimestamp(end_timestamp)
self.scheduler.add_job(self.metrics_db.get_subscription_data, args=(kpi_id, sampling_interval_s),trigger='interval', seconds=sampling_interval_s, start_date=start_date, end_date=end_date, id=subscription_id)
LOGGER.debug(f"Subscrition job {subscription_id} succesfully created")
def delete_subscription(self, subscription_id):
try:
self.scheduler.remove_job(subscription_id)
LOGGER.debug(f"Subscription job {subscription_id} succesfully deleted")
except (Exception, JobLookupError) as e:
LOGGER.debug(f"Subscription job {subscription_id} does not exists")
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment