from datetime import datetime, timezone
import time

from apscheduler.executors.pool import ProcessPoolExecutor
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger

from common.tools.timestamp.Converters import timestamp_utcnow_to_float


class SubscriptionManager:
    """Schedule periodic KPI subscription queries on a background scheduler.

    Each subscription is an interval job that calls
    ``metrics_db.get_subscription_data``; the job is keyed by the
    subscription identifier so that it can be removed later.
    """

    def __init__(self, metrics_db):
        """Store the metrics backend and start the background scheduler.

        Args:
            metrics_db: Object exposing ``get_subscription_data``, the callable
                that every scheduled job invokes.
        """
        self.metrics_db = metrics_db
        # NOTE(review): a 'processpool' executor is registered here, but
        # add_job() below does not pass executor='processpool', so jobs
        # presumably run on the scheduler's default executor -- confirm intent.
        self.scheduler = BackgroundScheduler(
            executors={'processpool': ProcessPoolExecutor(max_workers=20)}
        )
        self.scheduler.start()

    @staticmethod
    def _job_id(subscription_id):
        """Return *subscription_id* in the string form used as the job id.

        ``None`` is passed through unchanged so the scheduler keeps its own
        handling of a missing id; any other value is converted with ``str``
        so that non-string identifiers (e.g. integers) are usable as job ids.
        """
        if subscription_id is None:
            return None
        return str(subscription_id)

    def create_subscription(self, subscription_id, kpi_id, sampling_interval_s,
                            sampling_duration_s=None, start_timestamp=None,
                            end_timestamp=None):
        """Register an interval job that polls data for one KPI.

        Args:
            subscription_id: Identifier of the subscription; its string form
                is used as the scheduler job id.
            kpi_id: KPI identifier forwarded to ``get_subscription_data``.
            sampling_interval_s: Seconds between consecutive job runs; also
                forwarded to ``get_subscription_data``.
            sampling_duration_s: Optional duration in seconds. When truthy,
                the end of the window is computed as start + duration and any
                ``end_timestamp`` passed in is overridden.
            start_timestamp: Optional POSIX timestamp for the first run.
                Falsy values (``None`` or ``0``) mean "not provided".
            end_timestamp: Optional POSIX timestamp after which the job stops.
                Falsy values (``None`` or ``0``) mean "not provided".
        """
        start_date = None
        end_date = None

        if sampling_duration_s:
            # A duration needs an anchor: default the start to "now".
            if not start_timestamp:
                start_timestamp = time.time()
            end_timestamp = start_timestamp + sampling_duration_s

        # NOTE(review): fromtimestamp() without tz yields naive local-time
        # datetimes; this presumably relies on the scheduler using the same
        # local timezone -- confirm.
        if start_timestamp:
            start_date = datetime.fromtimestamp(start_timestamp)
        if end_timestamp:
            end_date = datetime.fromtimestamp(end_timestamp)

        # NOTE(review): timestamp_utcnow_to_float() is evaluated once, here,
        # so every run of the job receives the same value -- confirm that the
        # callee expects the subscription creation time and not the run time.
        self.scheduler.add_job(
            self.metrics_db.get_subscription_data,
            args=(kpi_id, timestamp_utcnow_to_float(), sampling_interval_s),
            trigger='interval',
            seconds=sampling_interval_s,
            start_date=start_date,
            end_date=end_date,
            id=self._job_id(subscription_id),
        )

    def delete_subscription(self, subscription_id):
        """Remove the scheduled job that belongs to *subscription_id*.

        The identifier is normalised the same way as in
        ``create_subscription`` so that the lookup matches the stored job id.
        Errors raised by the scheduler's ``remove_job`` are not caught here.
        """
        self.scheduler.remove_job(self._job_id(subscription_id))