diff --git a/src/monitoring/service/SubscriptionManager.py b/src/monitoring/service/SubscriptionManager.py index f2e10286af7b6dd588e72db76bcd04801c4d7b41..47f73e7f9ead2016060dee735010590bf79a7b32 100644 --- a/src/monitoring/service/SubscriptionManager.py +++ b/src/monitoring/service/SubscriptionManager.py @@ -1,29 +1,4 @@ -import os -from queue import Queue -from random import random - -from apscheduler.schedulers.background import BackgroundScheduler -from apscheduler.executors.pool import ProcessPoolExecutor -from apscheduler.jobstores.base import JobLookupError -from datetime import datetime -import time -import logging - -LOGGER = logging.getLogger(__name__) - -class SubscriptionManager(): - def __init__(self, metrics_db): - self.metrics_db = metrics_db - self.scheduler = BackgroundScheduler(executors={'processpool': ProcessPoolExecutor(max_workers=20)}) - self.scheduler.start() - LOGGER.info("Subscription Manager Initialized") - - def create_subscription(self, subs_queue ,subscription_id, kpi_id, sampling_interval_s, sampling_duration_s=None, start_timestamp=None, end_timestamp=None): - start_date=None - end_date=None - print("Inside create subscription") - if sampling_duration_s: - if not start_timestamp:# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -84,91 +59,4 @@ class SubscriptionManager(): LOGGER.debug(f"Subscrition job {subscription_id} succesfully created") def delete_subscription(self, subscription_id): - self.scheduler.remove_job(subscription_id) - - -def ingest_data(): - - metrics_db = MetricsDB("localhost", "9009", "9000", "monitoring") - - kpi_id = "2" - - for i in range(200): - timestamp = timestamp_utcnow_to_float() - kpitype = "KPISAMPLETYPE_PACKETS_TRANSMITTED" - deviceId = "DEV01" - endpointId = "END01" - serviceId = "SERV01" - kpi_value = 50 * random() - metrics_db.write_KPI(timestamp, kpi_id, kpitype, deviceId, endpointId, serviceId, kpi_value) - time.sleep(0.01) - -def scheduler_test(): - scheduler = BackgroundScheduler() - - subs_queue = Queue() - - managementdb = ManagementDB("monitoring.db") - metrics_db = MetricsDB("localhost", "9009", "9000", "monitoring") - subs_manager = SubscriptionManager(metrics_db) - - scheduler.start() - scheduler.add_job(ingest_data) - - kpi_id = "2" - sampling_duration_s = 20 - sampling_interval_s = 3 - real_start_time = timestamp_utcnow_to_float() - start_timestamp = real_start_time - end_timestamp = start_timestamp + sampling_duration_s - - subs_id = managementdb.insert_subscription(kpi_id, "localhost", sampling_duration_s, - sampling_interval_s,start_timestamp,end_timestamp) - subs_manager.create_subscription(subs_queue,subs_id,kpi_id,sampling_interval_s, - sampling_duration_s,start_timestamp,end_timestamp) - - try: - # This is here to simulate application activity (which keeps the main thread alive). - i = 0 - while True: - # time.sleep(sampling_interval_s) - while not subs_queue.empty(): - list = subs_queue.get_nowait() - # print(f"List: {list}") - kpi_list = KpiList() - for item in list: - kpi = Kpi() - kpi.kpi_id.kpi_id.uuid = item[0] - kpi.timestamp.timestamp = Converters.timestamp_string_to_float(item[1]) - kpi.kpi_value.floatVal = item[2] - kpi_list.kpi.append(kpi) - i += 1 - print("Kpi List: " + str(kpi_list) + " " + str(i)) - if timestamp_utcnow_to_float() > end_timestamp: - print(f"Total metrics: {i}") - break - - except (KeyboardInterrupt, SystemExit): - # Not strictly necessary if daemonic mode is enabled but should be done if possible - scheduler.shutdown() - - -if __name__ == '__main__': - scheduler_test() - start_timestamp=time.time() - end_timestamp=start_timestamp+sampling_duration_s - print("end_timestamp: " + timestamp_float_to_string(end_timestamp)) - if start_timestamp: - start_date = datetime.utcfromtimestamp(start_timestamp) - print("start_date: " + str(start_date)) - if end_timestamp: - end_date = datetime.fromtimestamp(end_timestamp) - self.scheduler.add_job(self.metrics_db.get_subscription_data, args=(kpi_id, sampling_interval_s),trigger='interval', seconds=sampling_interval_s, start_date=start_date, end_date=end_date, id=subscription_id) - LOGGER.debug(f"Subscrition job {subscription_id} succesfully created") - - def delete_subscription(self, subscription_id): - try: - self.scheduler.remove_job(subscription_id) - LOGGER.debug(f"Subscription job {subscription_id} succesfully deleted") - except (Exception, JobLookupError) as e: - LOGGER.debug(f"Subscription job {subscription_id} does not exists") + self.scheduler.remove_job(subscription_id) \ No newline at end of file