Newer
Older
import pytz
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ProcessPoolExecutor
from apscheduler.jobstores.base import JobLookupError
from datetime import datetime
import time
import logging
LOGGER = logging.getLogger(__name__)
class AlarmManager():
def __init__(self, metrics_db):
self.metrics_db = metrics_db
self.scheduler = BackgroundScheduler(executors={'processpool': ProcessPoolExecutor(max_workers=20)})
self.scheduler.start()
LOGGER.info("Alarm Manager Initialized")
def create_alarm(self, alarm_queue,alarm_id, kpi_id, kpiMinValue, kpiMaxValue, inRange, includeMinValue, includeMaxValue, subscription_frequency_ms, subscription_timeout_s=None):
start_date=None
end_date=None
if subscription_timeout_s:
start_timestamp=time.time()
end_timestamp = start_timestamp + subscription_timeout_s
start_date = datetime.utcfromtimestamp(start_timestamp).isoformat()
end_date = datetime.utcfromtimestamp(end_timestamp).isoformat()
job = self.scheduler.add_job(self.metrics_db.get_alarm_data,
args=(alarm_queue,kpi_id, kpiMinValue, kpiMaxValue, inRange, includeMinValue, includeMaxValue, subscription_frequency_ms),
trigger='interval', seconds=(subscription_frequency_ms/1000), start_date=start_date,
end_date=end_date,timezone=pytz.utc, id=str(alarm_id))
LOGGER.debug(f"Alarm job {alarm_id} succesfully created")
def delete_alarm(self, alarm_id):
try:
self.scheduler.remove_job(alarm_id)
LOGGER.debug(f"Alarm job {alarm_id} succesfully deleted")
except (Exception, JobLookupError) as e:
LOGGER.debug(f"Alarm job {alarm_id} does not exists")