diff --git a/src/monitoring/requirements.in b/src/monitoring/requirements.in index e0176e0266ad6239dabb3aeedc273ddc0b638ded..8ceddf2168809ec6407f717f4a197daee16b104f 100644 --- a/src/monitoring/requirements.in +++ b/src/monitoring/requirements.in @@ -17,6 +17,7 @@ redis==4.1.2 requests==2.27.1 xmltodict==0.12.0 questdb==1.0.1 +apscheduler=3.9.1 # pip's dependency resolver does not take into account installed packages. # p4runtime does not specify the version of grpcio/protobuf it needs, so it tries to install latest one diff --git a/src/monitoring/service/MetricsDBTools.py b/src/monitoring/service/MetricsDBTools.py index f04b2a7ba17ec87d1f3eb0f33989e59555077687..a60968df73a6fa3d6c7036d1fe4a61becb10c4e6 100644 --- a/src/monitoring/service/MetricsDBTools.py +++ b/src/monitoring/service/MetricsDBTools.py @@ -17,6 +17,7 @@ import requests import json import logging import datetime +from common.tools.timestamp.Converters import timestamp_float_to_string LOGGER = logging.getLogger(__name__) @@ -60,6 +61,7 @@ class MetricsDB(): response = requests.get(url, params=query_params) json_response = json.loads(response.text) LOGGER.info(f"Query executed, result:{json_response}") + return json_response def create_table(self): query = f'CREATE TABLE IF NOT EXISTS {self.table}'\ @@ -73,3 +75,10 @@ class MetricsDB(): 'TIMESTAMP(timestamp);' self.run_query(query) LOGGER.info(f"Table {self.table} created") + + def get_subscription_data(self, kpi_id, end_date, sampling_interval_s): + start_date = end_date-sampling_interval_s + query = f"SELECT kpi_id, timestamp, kpi_value FROM {self.table} WHERE kpi_id = '{kpi_id}' AND (timestamp BETWEEN '{timestamp_float_to_string(start_date)}' AND '{timestamp_float_to_string(end_date)}')" + response=self.run_query(query) + kpi_list=response['dataset'] + diff --git a/src/monitoring/service/SubscriptionManager.py b/src/monitoring/service/SubscriptionManager.py index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..eb46f3ec546862b4caee04c1caedd71c82392cb2 100644 --- a/src/monitoring/service/SubscriptionManager.py +++ b/src/monitoring/service/SubscriptionManager.py @@ -0,0 +1,31 @@ +from apscheduler.schedulers.background import BackgroundScheduler +from apscheduler.executors.pool import ProcessPoolExecutor +from apscheduler.triggers.interval import IntervalTrigger +from apscheduler.triggers.cron import CronTrigger +from common.tools.timestamp.Converters import timestamp_utcnow_to_float +from datetime import datetime, timezone +import time + + +class SubscriptionManager(): + def __init__(self, metrics_db): + self.metrics_db = metrics_db + self.scheduler = BackgroundScheduler(executors={'processpool': ProcessPoolExecutor(max_workers=20)}) + self.scheduler.start() + + + def create_subscription(self, subscription_id, kpi_id, sampling_interval_s, sampling_duration_s=None, start_timestamp=None, end_timestamp=None): + start_date=None + end_date=None + if sampling_duration_s: + if not start_timestamp: + start_timestamp=time.time() + end_timestamp=start_timestamp+sampling_duration_s + if start_timestamp: + start_date = datetime.fromtimestamp(start_timestamp) + if end_timestamp: + end_date = datetime.fromtimestamp(end_timestamp) + self.scheduler.add_job(self.metrics_db.get_subscription_data, args=(kpi_id, timestamp_utcnow_to_float(), sampling_interval_s),trigger='interval', seconds=sampling_interval_s, start_date=start_date, end_date=end_date, id=subscription_id) + + def delete_subscription(self, subscription_id): + self.scheduler.remove_job(subscription_id) \ No newline at end of file