Skip to content
Snippets Groups Projects
Commit 13b78103 authored by Sergio Gonzalez Diaz's avatar Sergio Gonzalez Diaz
Browse files

Initial implementation of SubscriptionManager

parent caa59013
No related branches found
No related tags found
1 merge request!5Feat/monitoring subscriptions
...@@ -17,6 +17,7 @@ redis==4.1.2 ...@@ -17,6 +17,7 @@ redis==4.1.2
requests==2.27.1 requests==2.27.1
xmltodict==0.12.0 xmltodict==0.12.0
questdb==1.0.1 questdb==1.0.1
apscheduler=3.9.1
# pip's dependency resolver does not take into account installed packages. # pip's dependency resolver does not take into account installed packages.
# p4runtime does not specify the version of grpcio/protobuf it needs, so it tries to install latest one # p4runtime does not specify the version of grpcio/protobuf it needs, so it tries to install latest one
......
...@@ -17,6 +17,7 @@ import requests ...@@ -17,6 +17,7 @@ import requests
import json import json
import logging import logging
import datetime import datetime
from common.tools.timestamp.Converters import timestamp_float_to_string
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
...@@ -60,6 +61,7 @@ class MetricsDB(): ...@@ -60,6 +61,7 @@ class MetricsDB():
response = requests.get(url, params=query_params) response = requests.get(url, params=query_params)
json_response = json.loads(response.text) json_response = json.loads(response.text)
LOGGER.info(f"Query executed, result:{json_response}") LOGGER.info(f"Query executed, result:{json_response}")
return json_response
def create_table(self): def create_table(self):
query = f'CREATE TABLE IF NOT EXISTS {self.table}'\ query = f'CREATE TABLE IF NOT EXISTS {self.table}'\
...@@ -73,3 +75,10 @@ class MetricsDB(): ...@@ -73,3 +75,10 @@ class MetricsDB():
'TIMESTAMP(timestamp);' 'TIMESTAMP(timestamp);'
self.run_query(query) self.run_query(query)
LOGGER.info(f"Table {self.table} created") LOGGER.info(f"Table {self.table} created")
def get_subscription_data(self, kpi_id, end_date, sampling_interval_s):
start_date = end_date-sampling_interval_s
query = f"SELECT kpi_id, timestamp, kpi_value FROM {self.table} WHERE kpi_id = '{kpi_id}' AND (timestamp BETWEEN '{timestamp_float_to_string(start_date)}' AND '{timestamp_float_to_string(end_date)}')"
response=self.run_query(query)
kpi_list=response['dataset']
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ProcessPoolExecutor
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.triggers.cron import CronTrigger
from common.tools.timestamp.Converters import timestamp_utcnow_to_float
from datetime import datetime, timezone
import time
class SubscriptionManager():
def __init__(self, metrics_db):
self.metrics_db = metrics_db
self.scheduler = BackgroundScheduler(executors={'processpool': ProcessPoolExecutor(max_workers=20)})
self.scheduler.start()
def create_subscription(self, subscription_id, kpi_id, sampling_interval_s, sampling_duration_s=None, start_timestamp=None, end_timestamp=None):
start_date=None
end_date=None
if sampling_duration_s:
if not start_timestamp:
start_timestamp=time.time()
end_timestamp=start_timestamp+sampling_duration_s
if start_timestamp:
start_date = datetime.fromtimestamp(start_timestamp)
if end_timestamp:
end_date = datetime.fromtimestamp(end_timestamp)
self.scheduler.add_job(self.metrics_db.get_subscription_data, args=(kpi_id, timestamp_utcnow_to_float(), sampling_interval_s),trigger='interval', seconds=sampling_interval_s, start_date=start_date, end_date=end_date, id=subscription_id)
def delete_subscription(self, subscription_id):
self.scheduler.remove_job(subscription_id)
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment