"""Periodic KPI sampling subscriptions backed by an APScheduler scheduler."""

from datetime import datetime
import time
from typing import Optional

from apscheduler.executors.pool import ProcessPoolExecutor
from apscheduler.schedulers.background import BackgroundScheduler


class SubscriptionManager:
    """Create and remove interval jobs that poll a metrics database.

    Each subscription is an APScheduler ``interval`` job that calls
    ``metrics_db.get_subscription_data(kpi_id, sampling_interval_s)``.
    """

    def __init__(self, metrics_db):
        """Store the metrics DB handle and start the background scheduler.

        Args:
            metrics_db: Object exposing ``get_subscription_data``; it is the
                callable target of every scheduled job.
        """
        self.metrics_db = metrics_db
        # NOTE(review): an executor named 'processpool' is registered here, but
        # create_subscription() does not pass ``executor='processpool'`` to
        # add_job(), so jobs presumably run on the scheduler's default
        # executor -- confirm whether that is intended.
        self.scheduler = BackgroundScheduler(
            executors={'processpool': ProcessPoolExecutor(max_workers=20)}
        )
        self.scheduler.start()
        print("Subscription Manager Initialized")

    def create_subscription(
        self,
        subscription_id,
        kpi_id,
        sampling_interval_s,
        sampling_duration_s: Optional[float] = None,
        start_timestamp: Optional[float] = None,
        end_timestamp: Optional[float] = None,
    ) -> None:
        """Schedule a recurring sampling job for one KPI.

        Args:
            subscription_id: Job id handed to the scheduler; also the handle
                later passed to :meth:`delete_subscription`.
            kpi_id: Forwarded as the first argument of
                ``metrics_db.get_subscription_data``.
            sampling_interval_s: Interval between runs, in seconds; also
                forwarded as the second job argument.
            sampling_duration_s: Optional run length in seconds. When truthy,
                the end time is computed as ``start + duration`` and overrides
                any ``end_timestamp``; ``None`` or ``0`` means no duration
                limit.
            start_timestamp: Optional POSIX timestamp of the first run.
            end_timestamp: Optional POSIX timestamp after which the job stops.
        """
        if sampling_duration_s:
            # A duration needs an anchor; default to "now" only when the caller
            # gave no start at all (0 is a valid timestamp, so test for None).
            if start_timestamp is None:
                start_timestamp = time.time()
            end_timestamp = start_timestamp + sampling_duration_s

        # Explicit None checks so that a timestamp of 0 (the epoch) is honored
        # instead of being silently treated as "not provided".
        start_date = (
            datetime.fromtimestamp(start_timestamp)
            if start_timestamp is not None
            else None
        )
        end_date = (
            datetime.fromtimestamp(end_timestamp)
            if end_timestamp is not None
            else None
        )

        self.scheduler.add_job(
            self.metrics_db.get_subscription_data,
            args=(kpi_id, sampling_interval_s),
            trigger='interval',
            seconds=sampling_interval_s,
            start_date=start_date,
            end_date=end_date,
            id=subscription_id,
        )

    def delete_subscription(self, subscription_id) -> None:
        """Remove the scheduled job registered under ``subscription_id``."""
        self.scheduler.remove_job(subscription_id)

    def shutdown(self, wait: bool = True) -> None:
        """Stop the underlying scheduler.

        Args:
            wait: Forwarded to the scheduler's ``shutdown``.
        """
        self.scheduler.shutdown(wait=wait)


if __name__ == '__main__':
    import MetricsDBTools

    metrics_db = MetricsDBTools.MetricsDB("localhost", 9009, 9000, "monitoring", 1000)
    subscription_manager = SubscriptionManager(metrics_db)
    try:
        subscription_manager.create_subscription('1', 1, 1)
        # Keep the main thread alive so the background scheduler can fire.
        time.sleep(100)
    finally:
        # Stop the scheduler even if the sleep is interrupted.
        subscription_manager.shutdown()