Skip to content
Snippets Groups Projects
SubscriptionManager.py 1.47 KiB
Newer Older
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ProcessPoolExecutor
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.triggers.cron import CronTrigger
from common.tools.timestamp.Converters import timestamp_utcnow_to_float
from datetime import datetime, timezone
import time


class SubscriptionManager():
    def __init__(self, metrics_db):
        self.metrics_db = metrics_db
        self.scheduler = BackgroundScheduler(executors={'processpool': ProcessPoolExecutor(max_workers=20)})
        self.scheduler.start()
        

    def create_subscription(self, subscription_id, kpi_id, sampling_interval_s, sampling_duration_s=None, start_timestamp=None, end_timestamp=None):
        start_date=None
        end_date=None
        if sampling_duration_s:
            if not start_timestamp:
                start_timestamp=time.time()
            end_timestamp=start_timestamp+sampling_duration_s
        if start_timestamp:
            start_date = datetime.fromtimestamp(start_timestamp)
        if end_timestamp:
            end_date = datetime.fromtimestamp(end_timestamp)
        self.scheduler.add_job(self.metrics_db.get_subscription_data, args=(kpi_id, timestamp_utcnow_to_float(), sampling_interval_s),trigger='interval', seconds=sampling_interval_s, start_date=start_date, end_date=end_date, id=subscription_id)

    def delete_subscription(self, subscription_id):
        self.scheduler.remove_job(subscription_id)