From 13b78103338bcb4152c13cc78c3e9c72a23f4d38 Mon Sep 17 00:00:00 2001 From: Sergio Gonzalez <sergio.gonzalez.diaz@atos.net> Date: Fri, 23 Sep 2022 14:11:55 +0200 Subject: [PATCH] Initial implementation of SubscriptionManager --- src/monitoring/requirements.in | 1 + src/monitoring/service/MetricsDBTools.py | 9 ++++++ src/monitoring/service/SubscriptionManager.py | 31 +++++++++++++++++++ 3 files changed, 41 insertions(+) diff --git a/src/monitoring/requirements.in b/src/monitoring/requirements.in index e0176e026..8ceddf216 100644 --- a/src/monitoring/requirements.in +++ b/src/monitoring/requirements.in @@ -17,6 +17,7 @@ redis==4.1.2 requests==2.27.1 xmltodict==0.12.0 questdb==1.0.1 +apscheduler=3.9.1 # pip's dependency resolver does not take into account installed packages. # p4runtime does not specify the version of grpcio/protobuf it needs, so it tries to install latest one diff --git a/src/monitoring/service/MetricsDBTools.py b/src/monitoring/service/MetricsDBTools.py index f04b2a7ba..a60968df7 100644 --- a/src/monitoring/service/MetricsDBTools.py +++ b/src/monitoring/service/MetricsDBTools.py @@ -17,6 +17,7 @@ import requests import json import logging import datetime +from common.tools.timestamp.Converters import timestamp_float_to_string LOGGER = logging.getLogger(__name__) @@ -60,6 +61,7 @@ class MetricsDB(): response = requests.get(url, params=query_params) json_response = json.loads(response.text) LOGGER.info(f"Query executed, result:{json_response}") + return json_response def create_table(self): query = f'CREATE TABLE IF NOT EXISTS {self.table}'\ @@ -73,3 +75,10 @@ class MetricsDB(): 'TIMESTAMP(timestamp);' self.run_query(query) LOGGER.info(f"Table {self.table} created") + + def get_subscription_data(self, kpi_id, end_date, sampling_interval_s): + start_date = end_date-sampling_interval_s + query = f"SELECT kpi_id, timestamp, kpi_value FROM {self.table} WHERE kpi_id = '{kpi_id}' AND (timestamp BETWEEN '{timestamp_float_to_string(start_date)}' AND '{timestamp_float_to_string(end_date)}')" + response=self.run_query(query) + kpi_list=response['dataset'] + diff --git a/src/monitoring/service/SubscriptionManager.py b/src/monitoring/service/SubscriptionManager.py index e69de29bb..eb46f3ec5 100644 --- a/src/monitoring/service/SubscriptionManager.py +++ b/src/monitoring/service/SubscriptionManager.py @@ -0,0 +1,31 @@ +from apscheduler.schedulers.background import BackgroundScheduler +from apscheduler.executors.pool import ProcessPoolExecutor +from apscheduler.triggers.interval import IntervalTrigger +from apscheduler.triggers.cron import CronTrigger +from common.tools.timestamp.Converters import timestamp_utcnow_to_float +from datetime import datetime, timezone +import time + + +class SubscriptionManager(): + def __init__(self, metrics_db): + self.metrics_db = metrics_db + self.scheduler = BackgroundScheduler(executors={'processpool': ProcessPoolExecutor(max_workers=20)}) + self.scheduler.start() + + + def create_subscription(self, subscription_id, kpi_id, sampling_interval_s, sampling_duration_s=None, start_timestamp=None, end_timestamp=None): + start_date=None + end_date=None + if sampling_duration_s: + if not start_timestamp: + start_timestamp=time.time() + end_timestamp=start_timestamp+sampling_duration_s + if start_timestamp: + start_date = datetime.fromtimestamp(start_timestamp) + if end_timestamp: + end_date = datetime.fromtimestamp(end_timestamp) + self.scheduler.add_job(self.metrics_db.get_subscription_data, args=(kpi_id, timestamp_utcnow_to_float(), sampling_interval_s),trigger='interval', seconds=sampling_interval_s, start_date=start_date, end_date=end_date, id=subscription_id) + + def delete_subscription(self, subscription_id): + self.scheduler.remove_job(subscription_id) \ No newline at end of file -- GitLab