diff --git a/src/monitoring/service/AlarmManager.py b/src/monitoring/service/AlarmManager.py index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..45764150d965f1ec35a534d8455a0b607e52e34c 100644 --- a/src/monitoring/service/AlarmManager.py +++ b/src/monitoring/service/AlarmManager.py @@ -0,0 +1,32 @@ +from apscheduler.schedulers.background import BackgroundScheduler +from apscheduler.executors.pool import ProcessPoolExecutor +from datetime import datetime +import time + + +class AlarmManager(): + def __init__(self, metrics_db): + self.metrics_db = metrics_db + self.scheduler = BackgroundScheduler(executors={'processpool': ProcessPoolExecutor(max_workers=20)}) + self.scheduler.start() + + + def create_alarm(self, alarm_id, kpi_id, kpiMinValue, kpiMaxValue, inRange, includeMinValue, includeMaxValue, subscription_frequency_ms, subscription_timeout_s=None): + start_date=None + end_date=None + if subscription_timeout_s: + start_timestamp=time.time() + start_date=datetime.fromtimestamp(start_timestamp) + end_date=datetime.fromtimestamp(start_timestamp+subscription_timeout_s) + self.scheduler.add_job(self.metrics_db.get_alarm_data, args=(kpi_id, kpiMinValue, kpiMaxValue, inRange, includeMinValue, includeMaxValue, subscription_frequency_ms),trigger='interval', seconds=(subscription_frequency_ms/1000), start_date=start_date, end_date=end_date, id=alarm_id) + + def delete_alarm(self, alarm_id): + self.scheduler.remove_job(alarm_id) + +if __name__ == '__main__': + import MetricsDBTools + metrics_db=MetricsDBTools.MetricsDB("localhost",9009,9000,"monitoring",10) + alarm_manager=AlarmManager(metrics_db) + alarm_manager.create_alarm('1',1,0,1,True,True,True,1000) + time.sleep(100) + print("END") diff --git a/src/monitoring/service/ManagementDBTools.py b/src/monitoring/service/ManagementDBTools.py index 53430780e843526cdad2ddbfb030f75287d93154..be4420f9953bbc8b41a40860e058f8d97e5b1c3d 100644 --- a/src/monitoring/service/ManagementDBTools.py +++ b/src/monitoring/service/ManagementDBTools.py @@ -19,6 +19,7 @@ class ManagementDB(): self.client = sl.connect(database, check_same_thread=False) self.create_monitoring_table() self.create_subscription_table() + self.create_alarm_table() def create_monitoring_table(self): self.client.execute(""" @@ -45,6 +46,21 @@ class ManagementDB(): ); """) + def create_alarm_table(self): + self.client.execute(""" + CREATE TABLE IF NOT EXISTS alarm( + alarm_id INTEGER PRIMARY KEY AUTOINCREMENT, + alarm_description TEXT, + alarm_name TEXT, + kpi_id INTEGER, + kpi_min_value REAL, + kpi_max_value REAL, + in_range INTEGER, + include_min_value INTEGER, + include_max_value INTEGER + ); + """) + def insert_KPI(self,kpi_description,kpi_sample_type,device_id,endpoint_id,service_id): c = self.client.cursor() c.execute("SELECT kpi_id FROM kpi WHERE device_id is ? AND kpi_sample_type is ? AND endpoint_id is ?",(device_id,kpi_sample_type,endpoint_id)) @@ -68,14 +84,26 @@ class ManagementDB(): print("already exists") return data[0] - def delete_KPI(self,device_id,kpi_sample_type): + def insert_alarm(self,alarm_description,alarm_name,kpi_id,kpi_min_value,kpi_max_value,in_range,include_min_value,include_max_value): + c = self.client.cursor() + c.execute("SELECT alarm_id FROM alarm WHERE alarm_description is ? AND alarm_name is ? AND kpi_id is ? AND kpi_min_value is ? AND kpi_max_value is ? AND in_range is ? AND include_min_value is ? AND include_max_value is ?",(alarm_description,alarm_name,kpi_id,kpi_min_value,kpi_max_value,in_range,include_min_value,include_max_value)) + data=c.fetchone() + if data is None: + c.execute("INSERT INTO alarm (alarm_description, alarm_name, kpi_id, kpi_min_value, kpi_max_value, in_range, include_min_value, include_max_value) VALUES (?,?,?,?,?,?)", (alarm_description,alarm_name,kpi_id,kpi_min_value,kpi_max_value,in_range,include_min_value,include_max_value)) + self.client.commit() + return c.lastrowid + else: + print("already exists") + return data[0] + + def delete_KPI(self,kpi_id): c = self.client.cursor() - c.execute("SELECT kpi_id FROM kpi WHERE device_id is ? AND kpi_sample_type is ?",(device_id,kpi_sample_type)) + c.execute("SELECT * FROM kpi WHERE kpi_id is ?",(kpi_id,)) data=c.fetchone() if data is None: return False else: - c.execute("DELETE FROM kpi WHERE device_id is ? AND kpi_sample_type is ?",(device_id,kpi_sample_type)) + c.execute("DELETE FROM kpi WHERE kpi_id is ?",(kpi_id,)) self.client.commit() return True @@ -90,14 +118,14 @@ class ManagementDB(): self.client.commit() return True - def delete_kpid_id(self,kpi_id): + def delete_alarm(self,alarm_id): c = self.client.cursor() - c.execute("SELECT * FROM kpi WHERE kpi_id is ?",(kpi_id,)) + c.execute("SELECT * FROM alarm WHERE alarm_id is ?",(alarm_id,)) data=c.fetchone() if data is None: return False else: - c.execute("DELETE FROM kpi WHERE kpi_id is ?",(kpi_id,)) + c.execute("DELETE FROM alarm WHERE alarm_id is ?",(alarm_id,)) self.client.commit() return True @@ -108,17 +136,19 @@ class ManagementDB(): def get_subscription(self,subs_id): data = self.client.execute("SELECT * FROM subscription WHERE subs_id is ?",(subs_id,)) return data.fetchone() + + def get_alarm(self,alarm_id): + data = self.client.execute("SELECT * FROM alarm WHERE alarm_id is ?",(alarm_id,)) + return data.fetchone() def get_KPIS(self): data = self.client.execute("SELECT * FROM kpi") - #print("\n") - #for row in data: - # print(row) return data.fetchall() def get_subscriptions(self): data = self.client.execute("SELECT * FROM subscription") - #print("\n") - #for row in data: - # print(row) + return data.fetchall() + + def get_alarms(self): + data = self.client.execute("SELECT * FROM alarm") return data.fetchall() \ No newline at end of file diff --git a/src/monitoring/service/MetricsDBTools.py b/src/monitoring/service/MetricsDBTools.py index a60968df73a6fa3d6c7036d1fe4a61becb10c4e6..d9c92f957ac583d6334cb9cab6c06a3b1be10cfc 100644 --- a/src/monitoring/service/MetricsDBTools.py +++ b/src/monitoring/service/MetricsDBTools.py @@ -17,16 +17,19 @@ import requests import json import logging import datetime -from common.tools.timestamp.Converters import timestamp_float_to_string +# from common.tools.timestamp.Converters import timestamp_float_to_string +from Converters import timestamp_float_to_string, timestamp_utcnow_to_float + LOGGER = logging.getLogger(__name__) class MetricsDB(): - def __init__(self, host, ilp_port, rest_port, table): + def __init__(self, host, ilp_port, rest_port, table, commit_lag_ms): self.host=host self.ilp_port=int(ilp_port) self.rest_port=rest_port self.table=table + self.commit_lag_ms=commit_lag_ms self.create_table() def write_KPI(self,time,kpi_id,kpi_sample_type,device_id,endpoint_id,service_id,kpi_value): @@ -76,9 +79,73 @@ class MetricsDB(): self.run_query(query) LOGGER.info(f"Table {self.table} created") - def get_subscription_data(self, kpi_id, end_date, sampling_interval_s): - start_date = end_date-sampling_interval_s - query = f"SELECT kpi_id, timestamp, kpi_value FROM {self.table} WHERE kpi_id = '{kpi_id}' AND (timestamp BETWEEN '{timestamp_float_to_string(start_date)}' AND '{timestamp_float_to_string(end_date)}')" - response=self.run_query(query) - kpi_list=response['dataset'] + def get_subscription_data(self, kpi_id, sampling_interval_s): + end_date = timestamp_utcnow_to_float()-self.commit_lag_ms/1000 + start_date = end_date-sampling_interval_s + query = f"SELECT kpi_id, timestamp, kpi_value FROM {self.table} WHERE kpi_id = '{kpi_id}' AND (timestamp BETWEEN '{timestamp_float_to_string(start_date)}' AND '{timestamp_float_to_string(end_date)}')" + response=self.run_query(query) + kpi_list=response['dataset'] + print(kpi_list) + def get_alarm_data(self, kpi_id, kpiMinValue, kpiMaxValue, inRange, includeMinValue, includeMaxValue, subscription_frequency_ms): + end_date = timestamp_utcnow_to_float()-self.commit_lag_ms/1000 + start_date = end_date-subscription_frequency_ms/1000 + query = f"SELECT kpi_id, timestamp, kpi_value FROM {self.table} WHERE kpi_id = '{kpi_id}' AND (timestamp BETWEEN '{timestamp_float_to_string(start_date)}' AND '{timestamp_float_to_string(end_date)}')" + response=self.run_query(query) + kpi_list=response['dataset'] + for kpi in kpi_list: + alarm = False + kpi_value = kpi[2] + if (kpiMinValue == kpi_value and kpiMaxValue == kpi_value and inRange): + alarm = True + elif (inRange and kpiMinValue is not None and kpiMaxValue is not None and includeMinValue and includeMaxValue): + if (kpi_value >= kpiMinValue and kpi_value <= kpiMaxValue): + alarm = True + elif (inRange and kpiMinValue is not None and kpiMaxValue is not None and includeMinValue and not includeMaxValue): + if (kpi_value >= kpiMinValue and kpi_value < kpiMaxValue): + alarm = True + elif (inRange and kpiMinValue is not None and kpiMaxValue is not None and not includeMinValue and includeMaxValue): + if (kpi_value > kpiMinValue and kpi_value <= kpiMaxValue): + alarm = True + elif (inRange and kpiMinValue is not None and kpiMaxValue is not None and not includeMinValue and not includeMaxValue): + if (kpi_value > kpiMinValue and kpi_value < kpiMaxValue): + alarm = True + elif (not inRange and kpiMinValue is not None and kpiMaxValue is not None and includeMinValue and includeMaxValue): + if (kpi_value <= kpiMinValue or kpi_value >= kpiMaxValue): + alarm = True + elif (not inRange and kpiMinValue is not None and kpiMaxValue is not None and includeMinValue and not includeMaxValue): + if (kpi_value <= kpiMinValue or kpi_value > kpiMaxValue): + alarm = True + elif (not inRange and kpiMinValue is not None and kpiMaxValue is not None and not includeMinValue and includeMaxValue): + if (kpi_value < kpiMinValue or kpi_value >= kpiMaxValue): + alarm = True + elif (not inRange and kpiMinValue is not None and kpiMaxValue is not None and not includeMinValue and not includeMaxValue): + if (kpi_value < kpiMinValue or kpi_value > kpiMaxValue): + alarm = True + elif (inRange and kpiMinValue is not None and kpiMaxValue is None and includeMinValue): + if (kpi_value >= kpiMinValue): + alarm = True + elif (inRange and kpiMinValue is not None and kpiMaxValue is None and not includeMinValue): + if (kpi_value > kpiMinValue): + alarm = True + elif (not inRange and kpiMinValue is not None and kpiMaxValue is None and not includeMinValue): + if (kpi_value <= kpiMinValue): + alarm = True + elif (not inRange and kpiMinValue is not None and kpiMaxValue is None and not includeMinValue): + if (kpi_value <= kpiMinValue): + alarm = True + elif (inRange and kpiMinValue is None and kpiMaxValue is not None and includeMaxValue): + if (kpi_value <= kpiMaxValue): + alarm = True + elif (inRange and kpiMinValue is None and kpiMaxValue is not None and not includeMaxValue): + if (kpi_value < kpiMaxValue): + alarm = True + elif (not inRange and kpiMinValue is None and kpiMaxValue is not None and not includeMaxValue): + if (kpi_value >= kpiMaxValue): + alarm = True + elif (not inRange and kpiMinValue is None and kpiMaxValue is not None and not includeMaxValue): + if (kpi_value >= kpiMaxValue): + alarm = True + if alarm: + print(kpi) + # LOGGER.info(f"Alarm triggered -> kpi_value:{kpi[2]}, timestamp:{kpi[1]}") diff --git a/src/monitoring/service/SubscriptionManager.py b/src/monitoring/service/SubscriptionManager.py index eb46f3ec546862b4caee04c1caedd71c82392cb2..94e602bef6231cf0677d59087556d6a06f92e062 100644 --- a/src/monitoring/service/SubscriptionManager.py +++ b/src/monitoring/service/SubscriptionManager.py @@ -1,9 +1,6 @@ from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.executors.pool import ProcessPoolExecutor -from apscheduler.triggers.interval import IntervalTrigger -from apscheduler.triggers.cron import CronTrigger -from common.tools.timestamp.Converters import timestamp_utcnow_to_float -from datetime import datetime, timezone +from datetime import datetime import time @@ -12,6 +9,7 @@ class SubscriptionManager(): self.metrics_db = metrics_db self.scheduler = BackgroundScheduler(executors={'processpool': ProcessPoolExecutor(max_workers=20)}) self.scheduler.start() + print("Subscription Manager Initialized") def create_subscription(self, subscription_id, kpi_id, sampling_interval_s, sampling_duration_s=None, start_timestamp=None, end_timestamp=None): @@ -25,7 +23,15 @@ class SubscriptionManager(): start_date = datetime.fromtimestamp(start_timestamp) if end_timestamp: end_date = datetime.fromtimestamp(end_timestamp) - self.scheduler.add_job(self.metrics_db.get_subscription_data, args=(kpi_id, timestamp_utcnow_to_float(), sampling_interval_s),trigger='interval', seconds=sampling_interval_s, start_date=start_date, end_date=end_date, id=subscription_id) + self.scheduler.add_job(self.metrics_db.get_subscription_data, args=(kpi_id, sampling_interval_s),trigger='interval', seconds=sampling_interval_s, start_date=start_date, end_date=end_date, id=subscription_id) def delete_subscription(self, subscription_id): - self.scheduler.remove_job(subscription_id) \ No newline at end of file + self.scheduler.remove_job(subscription_id) + +if __name__ == '__main__': + + import MetricsDBTools + metrics_db=MetricsDBTools.MetricsDB("localhost",9009,9000,"monitoring",1000) + subscription_manager=SubscriptionManager(metrics_db) + subscription_manager.create_subscription('1',1,1) + time.sleep(100) \ No newline at end of file