diff --git a/src/monitoring/requirements.in b/src/monitoring/requirements.in index e0176e0266ad6239dabb3aeedc273ddc0b638ded..f0f29dcd6be1b864803a208b186c9c6fb263344e 100644 --- a/src/monitoring/requirements.in +++ b/src/monitoring/requirements.in @@ -17,6 +17,7 @@ redis==4.1.2 requests==2.27.1 xmltodict==0.12.0 questdb==1.0.1 +psycopg2-binary==2.9.3 # pip's dependency resolver does not take into account installed packages. # p4runtime does not specify the version of grpcio/protobuf it needs, so it tries to install latest one diff --git a/src/monitoring/service/AlarmManager.py b/src/monitoring/service/AlarmManager.py index 45764150d965f1ec35a534d8455a0b607e52e34c..dc1b02b03079b6791b05a6f6380694adf71a9b1a 100644 --- a/src/monitoring/service/AlarmManager.py +++ b/src/monitoring/service/AlarmManager.py @@ -1,15 +1,18 @@ from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.executors.pool import ProcessPoolExecutor +from apscheduler.jobstores.base import JobLookupError from datetime import datetime import time +import logging +LOGGER = logging.getLogger(__name__) class AlarmManager(): def __init__(self, metrics_db): self.metrics_db = metrics_db self.scheduler = BackgroundScheduler(executors={'processpool': ProcessPoolExecutor(max_workers=20)}) self.scheduler.start() - + LOGGER.info("Alarm Manager Initialized") def create_alarm(self, alarm_id, kpi_id, kpiMinValue, kpiMaxValue, inRange, includeMinValue, includeMaxValue, subscription_frequency_ms, subscription_timeout_s=None): start_date=None @@ -19,14 +22,11 @@ class AlarmManager(): start_date=datetime.fromtimestamp(start_timestamp) end_date=datetime.fromtimestamp(start_timestamp+subscription_timeout_s) self.scheduler.add_job(self.metrics_db.get_alarm_data, args=(kpi_id, kpiMinValue, kpiMaxValue, inRange, includeMinValue, includeMaxValue, subscription_frequency_ms),trigger='interval', seconds=(subscription_frequency_ms/1000), start_date=start_date, end_date=end_date, id=alarm_id) + LOGGER.debug(f"Alarm job {alarm_id} succesfully created") def delete_alarm(self, alarm_id): - self.scheduler.remove_job(alarm_id) - -if __name__ == '__main__': - import MetricsDBTools - metrics_db=MetricsDBTools.MetricsDB("localhost",9009,9000,"monitoring",10) - alarm_manager=AlarmManager(metrics_db) - alarm_manager.create_alarm('1',1,0,1,True,True,True,1000) - time.sleep(100) - print("END") + try: + self.scheduler.remove_job(alarm_id) + LOGGER.debug(f"Alarm job {alarm_id} succesfully deleted") + except (Exception, JobLookupError) as e: + LOGGER.debug(f"Alarm job {alarm_id} does not exists") diff --git a/src/monitoring/service/ManagementDBTools.py b/src/monitoring/service/ManagementDBTools.py index 00f8b5aa234451e6f610a59d2c319cab8888c88e..2387ddde0ab9eecea6c8fc982ba97a94f1a88c98 100644 --- a/src/monitoring/service/ManagementDBTools.py +++ b/src/monitoring/service/ManagementDBTools.py @@ -12,143 +12,237 @@ # See the License for the specific language governing permissions and # limitations under the License. -import sqlite3 as sl +import sqlite3 +import logging + +LOGGER = logging.getLogger(__name__) class ManagementDB(): def __init__(self, database): - self.client = sl.connect(database, check_same_thread=False) - self.create_monitoring_table() - self.create_subscription_table() - self.create_alarm_table() + try: + self.client = sqlite3.connect(database, check_same_thread=False) + self.create_monitoring_table() + self.create_subscription_table() + self.create_alarm_table() + LOGGER.info("ManagementDB initialized") + except: + LOGGER.info("ManagementDB cannot be initialized") + raise Exception("Critical error in the monitoring component") def create_monitoring_table(self): - self.client.execute(""" - CREATE TABLE IF NOT EXISTS kpi( - kpi_id INTEGER PRIMARY KEY AUTOINCREMENT, - kpi_description TEXT, - kpi_sample_type INTEGER, - device_id INTEGER, - endpoint_id INTEGER, - service_id INTEGER - ); - """) + try: + result=self.client.execute(""" + CREATE TABLE IF NOT EXISTS kpi( + kpi_id INTEGER PRIMARY KEY AUTOINCREMENT, + kpi_description TEXT, + kpi_sample_type INTEGER, + device_id INTEGER, + endpoint_id INTEGER, + service_id INTEGER + ); + """) + LOGGER.debug("KPI table created in the ManagementDB") + except sqlite3.Error as e: + LOGGER.debug(f"KPI table cannot be created in the ManagementD. {e}") + raise Exception def create_subscription_table(self): - self.client.execute(""" - CREATE TABLE IF NOT EXISTS subscription( - subs_id INTEGER PRIMARY KEY AUTOINCREMENT, - kpi_id INTEGER, - subscriber TEXT, - sampling_duration_s REAL, - sampling_interval_s REAL, - start_timestamp REAL, - end_timestamp REAL - ); - """) + try: + result= self.client.execute(""" + CREATE TABLE IF NOT EXISTS subscription( + subs_id INTEGER PRIMARY KEY AUTOINCREMENT, + kpi_id INTEGER, + subscriber TEXT, + sampling_duration_s REAL, + sampling_interval_s REAL, + start_timestamp REAL, + end_timestamp REAL + ); + """) + LOGGER.info("Subscription table created in the ManagementDB") + except sqlite3.Error as e: + LOGGER.debug(f"Subscription table cannot be created in the ManagementDB. {e}") + raise Exception def create_alarm_table(self): - self.client.execute(""" - CREATE TABLE IF NOT EXISTS alarm( - alarm_id INTEGER PRIMARY KEY AUTOINCREMENT, - alarm_description TEXT, - alarm_name TEXT, - kpi_id INTEGER, - kpi_min_value REAL, - kpi_max_value REAL, - in_range INTEGER, - include_min_value INTEGER, - include_max_value INTEGER - ); - """) + try: + result=self.client.execute(""" + CREATE TABLE IF NOT EXISTS alarm( + alarm_id INTEGER PRIMARY KEY AUTOINCREMENT, + alarm_description TEXT, + alarm_name TEXT, + kpi_id INTEGER, + kpi_min_value REAL, + kpi_max_value REAL, + in_range INTEGER, + include_min_value INTEGER, + include_max_value INTEGER + ); + """) + LOGGER.info("Alarm table created in the ManagementDB") + except sqlite3.Error as e: + LOGGER.debug(f"Alarm table cannot be created in the ManagementDB. {e}") + raise Exception def insert_KPI(self,kpi_description,kpi_sample_type,device_id,endpoint_id,service_id): - c = self.client.cursor() - c.execute("SELECT kpi_id FROM kpi WHERE device_id is ? AND kpi_sample_type is ? AND endpoint_id is ? AND service_id is ?",(device_id,kpi_sample_type,endpoint_id,service_id)) - data=c.fetchone() - if data is None: - c.execute("INSERT INTO kpi (kpi_description,kpi_sample_type,device_id,endpoint_id,service_id) VALUES (?,?,?,?,?)", (kpi_description,kpi_sample_type,device_id,endpoint_id,service_id)) - self.client.commit() - return c.lastrowid - else: - return data[0] + try: + c = self.client.cursor() + c.execute("SELECT kpi_id FROM kpi WHERE device_id is ? AND kpi_sample_type is ? AND endpoint_id is ? AND service_id is ?",(device_id,kpi_sample_type,endpoint_id,service_id)) + data=c.fetchone() + if data is None: + c.execute("INSERT INTO kpi (kpi_description,kpi_sample_type,device_id,endpoint_id,service_id) VALUES (?,?,?,?,?)", (kpi_description,kpi_sample_type,device_id,endpoint_id,service_id)) + self.client.commit() + kpi_id = c.lastrowid + LOGGER.debug(f"KPI {kpi_id} succesfully inserted in the ManagementDB") + return kpi_id + else: + kpi_id = data[0] + LOGGER.debug(f"KPI {kpi_id} already exists") + return kpi_id + except sqlite3.Error as e: + LOGGER.debug("KPI cannot be inserted in the ManagementDB: {e}") def insert_subscription(self,kpi_id,subscriber,sampling_duration_s,sampling_interval_s,start_timestamp, end_timestamp): - c = self.client.cursor() - c.execute("SELECT subs_id FROM subscription WHERE kpi_id is ? AND subscriber is ? AND sampling_duration_s is ? AND sampling_interval_s is ? AND start_timestamp is ? AND end_timestamp is ?",(kpi_id,subscriber,sampling_duration_s,sampling_interval_s,start_timestamp, end_timestamp)) - data=c.fetchone() - if data is None: - c.execute("INSERT INTO subscription (kpi_id,subscriber,sampling_duration_s,sampling_interval_s,start_timestamp, end_timestamp) VALUES (?,?,?,?,?,?)", (kpi_id,subscriber,sampling_duration_s,sampling_interval_s,start_timestamp, end_timestamp)) - self.client.commit() - return c.lastrowid - else: - print("already exists") - return data[0] + try: + c = self.client.cursor() + c.execute("SELECT subs_id FROM subscription WHERE kpi_id is ? AND subscriber is ? AND sampling_duration_s is ? AND sampling_interval_s is ? AND start_timestamp is ? AND end_timestamp is ?",(kpi_id,subscriber,sampling_duration_s,sampling_interval_s,start_timestamp, end_timestamp)) + data=c.fetchone() + if data is None: + c.execute("INSERT INTO subscription (kpi_id,subscriber,sampling_duration_s,sampling_interval_s,start_timestamp, end_timestamp) VALUES (?,?,?,?,?,?)", (kpi_id,subscriber,sampling_duration_s,sampling_interval_s,start_timestamp, end_timestamp)) + self.client.commit() + subs_id = c.lastrowid + LOGGER.debug(f"Subscription {subs_id} succesfully inserted in the ManagementDB") + return subs_id + else: + subs_id = data[0] + LOGGER.debug(f"Subscription {subs_id} already exists") + return subs_id + except sqlite3.Error as e: + LOGGER.debug("Subscription cannot be inserted in the ManagementDB: {e}") def insert_alarm(self,alarm_description,alarm_name,kpi_id,kpi_min_value,kpi_max_value,in_range,include_min_value,include_max_value): - c = self.client.cursor() - c.execute("SELECT alarm_id FROM alarm WHERE alarm_description is ? AND alarm_name is ? AND kpi_id is ? AND kpi_min_value is ? AND kpi_max_value is ? AND in_range is ? AND include_min_value is ? AND include_max_value is ?",(alarm_description,alarm_name,kpi_id,kpi_min_value,kpi_max_value,in_range,include_min_value,include_max_value)) - data=c.fetchone() - if data is None: - c.execute("INSERT INTO alarm (alarm_description, alarm_name, kpi_id, kpi_min_value, kpi_max_value, in_range, include_min_value, include_max_value) VALUES (?,?,?,?,?,?,?,?)", (alarm_description,alarm_name,kpi_id,kpi_min_value,kpi_max_value,in_range,include_min_value,include_max_value)) - self.client.commit() - return c.lastrowid - else: - print("already exists") - return data[0] + try: + c = self.client.cursor() + c.execute("SELECT alarm_id FROM alarm WHERE alarm_description is ? AND alarm_name is ? AND kpi_id is ? AND kpi_min_value is ? AND kpi_max_value is ? AND in_range is ? AND include_min_value is ? AND include_max_value is ?",(alarm_description,alarm_name,kpi_id,kpi_min_value,kpi_max_value,in_range,include_min_value,include_max_value)) + data=c.fetchone() + if data is None: + c.execute("INSERT INTO alarm (alarm_description, alarm_name, kpi_id, kpi_min_value, kpi_max_value, in_range, include_min_value, include_max_value) VALUES (?,?,?,?,?,?,?,?)", (alarm_description,alarm_name,kpi_id,kpi_min_value,kpi_max_value,in_range,include_min_value,include_max_value)) + self.client.commit() + alarm_id=c.lastrowid + LOGGER.debug(f"Alarm {alarm_id} succesfully inserted in the ManagementDB") + return alarm_id + else: + alarm_id=data[0] + LOGGER.debug(f"Alarm {alarm_id} already exists") + return alarm_id + except sqlite3.Error as e: + LOGGER.debug(f"Alarm cannot be inserted in the ManagementDB: {e}") def delete_KPI(self,kpi_id): - c = self.client.cursor() - c.execute("SELECT * FROM kpi WHERE kpi_id is ?",(kpi_id,)) - data=c.fetchone() - if data is None: - return False - else: - c.execute("DELETE FROM kpi WHERE kpi_id is ?",(kpi_id,)) - self.client.commit() - return True + try: + c = self.client.cursor() + c.execute("SELECT * FROM kpi WHERE kpi_id is ?",(kpi_id,)) + data=c.fetchone() + if data is None: + LOGGER.debug(f"KPI {kpi_id} does not exists") + return False + else: + c.execute("DELETE FROM kpi WHERE kpi_id is ?",(kpi_id,)) + self.client.commit() + LOGGER.debug(f"KPI {kpi_id} deleted from the ManagementDB") + return True + except sqlite3.Error as e: + LOGGER.debug(f"KPI cannot be deleted from the ManagementDB: {e}") def delete_subscription(self,subs_id): - c = self.client.cursor() - c.execute("SELECT * FROM subscription WHERE subs_id is ?",(subs_id,)) - data=c.fetchone() - if data is None: - return False - else: - c.execute("DELETE FROM subscription WHERE subs_id is ?",(subs_id,)) - self.client.commit() - return True + try: + c = self.client.cursor() + c.execute("SELECT * FROM subscription WHERE subs_id is ?",(subs_id,)) + data=c.fetchone() + if data is None: + LOGGER.debug(f"Subscription {subs_id} does not exists") + return False + else: + c.execute("DELETE FROM subscription WHERE subs_id is ?",(subs_id,)) + self.client.commit() + LOGGER.debug(f"Subscription {subs_id} deleted from the ManagementDB") + return True + except sqlite3.Error as e: + LOGGER.debug(f"Subscription cannot be deleted from the ManagementDB: {e}") def delete_alarm(self,alarm_id): - c = self.client.cursor() - c.execute("SELECT * FROM alarm WHERE alarm_id is ?",(alarm_id,)) - data=c.fetchone() - if data is None: - return False - else: - c.execute("DELETE FROM alarm WHERE alarm_id is ?",(alarm_id,)) - self.client.commit() - return True + try: + c = self.client.cursor() + c.execute("SELECT * FROM alarm WHERE alarm_id is ?",(alarm_id,)) + data=c.fetchone() + if data is None: + LOGGER.debug(f"Alarm {alarm_id} does not exists") + return False + else: + c.execute("DELETE FROM alarm WHERE alarm_id is ?",(alarm_id,)) + self.client.commit() + LOGGER.debug(f"Alarm {alarm_id} deleted from the ManagementDB") + return True + except sqlite3.Error as e: + LOGGER.debug(f"Alarm cannot be deleted from the ManagementDB: {e}") def get_KPI(self,kpi_id): - data = self.client.execute("SELECT * FROM kpi WHERE kpi_id is ?",(kpi_id,)) - return data.fetchone() + try: + data = self.client.execute("SELECT * FROM kpi WHERE kpi_id is ?",(kpi_id,)).fetchone() + if data: + LOGGER.debug(f"KPI {kpi_id} succesfully retrieved from the ManagementDB") + return data + else: + LOGGER.debug(f"KPI {kpi_id} does not exists") + return data + except sqlite3.Error as e: + LOGGER.debug(f"KPI {kpi_id} cannot be retrieved from the ManagementDB: {e}") def get_subscription(self,subs_id): - data = self.client.execute("SELECT * FROM subscription WHERE subs_id is ?",(subs_id,)) - return data.fetchone() + try: + data = self.client.execute("SELECT * FROM subscription WHERE subs_id is ?",(subs_id,)).fetchone() + if data: + LOGGER.debug(f"Subscription {subs_id} succesfully retrieved from the ManagementDB") + return data + else: + LOGGER.debug(f"Subscription {subs_id} does not exists") + return data + except sqlite3.Error as e: + LOGGER.debug(f"Subscription {subs_id} cannot be retrieved from the ManagementDB: {e}") def get_alarm(self,alarm_id): - data = self.client.execute("SELECT * FROM alarm WHERE alarm_id is ?",(alarm_id,)) - return data.fetchone() + try: + data = self.client.execute("SELECT * FROM alarm WHERE alarm_id is ?",(alarm_id,)).fetchone() + if data: + LOGGER.debug(f"Alarm {alarm_id} succesfully retrieved from the ManagementDB") + return data + else: + print(data) + LOGGER.debug(f"Alarm {alarm_id} does not exists") + return data + except sqlite3.Error as e: + LOGGER.debug(f"Alarm {alarm_id} cannot be retrieved from the ManagementDB: {e}") def get_KPIS(self): - data = self.client.execute("SELECT * FROM kpi") - return data.fetchall() + try: + data = self.client.execute("SELECT * FROM kpi").fetchall() + LOGGER.debug(f"KPIs succesfully retrieved from the ManagementDB") + return data + except sqlite3.Error as e: + LOGGER.debug(f"KPIs cannot be retrieved from the ManagementDB: {e}") def get_subscriptions(self): - data = self.client.execute("SELECT * FROM subscription") - return data.fetchall() + try: + data = self.client.execute("SELECT * FROM subscription").fetchall() + LOGGER.debug(f"Subscriptions succesfully retrieved from the ManagementDB") + return data + except sqlite3.Error as e: + LOGGER.debug(f"Subscriptions cannot be retrieved from the ManagementDB: {e}") def get_alarms(self): - data = self.client.execute("SELECT * FROM alarm") - return data.fetchall() \ No newline at end of file + try: + data = self.client.execute("SELECT * FROM alarm").fetchall() + LOGGER.debug(f"Alarms succesfully retrieved from the ManagementDB") + return data + except sqlite3.Error as e: + LOGGER.debug(f"Alarms cannot be retrieved from the ManagementDB: {e}") \ No newline at end of file diff --git a/src/monitoring/service/MetricsDBTools.py b/src/monitoring/service/MetricsDBTools.py index 24335ca72757461e6308944c8de54dce4a191415..1d2f1257c32c0e131e1ce0257f8485b0a7405e28 100644 --- a/src/monitoring/service/MetricsDBTools.py +++ b/src/monitoring/service/MetricsDBTools.py @@ -18,133 +18,238 @@ import json import logging import datetime from common.tools.timestamp.Converters import timestamp_float_to_string, timestamp_utcnow_to_float - +import psycopg2 LOGGER = logging.getLogger(__name__) class MetricsDB(): - def __init__(self, host, ilp_port, rest_port, table, commit_lag_ms): - self.host=host - self.ilp_port=int(ilp_port) - self.rest_port=rest_port - self.table=table + def __init__(self, host, ilp_port=9009, rest_port=9000, table="monitoring", commit_lag_ms=1000, retries=10, postgre=False, postgre_port=8812, postgre_user='admin',postgre_password='quest'): + try: + self.host=host + self.ilp_port=int(ilp_port) + self.rest_port=rest_port + self.table=table + self.commit_lag_ms=commit_lag_ms + self.retries=retries + self.postgre=postgre + self.postgre_port=postgre_port + self.postgre_user=postgre_user + self.postgre_password=postgre_password + self.create_table() + LOGGER.info("MetricsDB initialized") + except: + LOGGER.info("MetricsDB cannot be initialized") + raise Exception("Critical error in the monitoring component") + + def is_postgre_enabled(self): + LOGGER.info(f"PostgreSQL is {self.postgre}") + return self.postgre + + def get_retry_number(self): + LOGGER.info(f"Retry number is {self.retries}") + return self.retries + + def get_commit_lag(self): + LOGGER.info(f"Commit lag of monitoring queries is {self.commit_lag_ms} ms") + return self.commit_lag_ms + + def enable_postgre_mode(self): + self.postgre=True + LOGGER.info("MetricsDB PostgreSQL query mode enabled") + + def disable_postgre_mode(self): + self.postgre=False + LOGGER.info("MetricsDB REST query mode enabled") + + def set_postgre_credentials(self, user, password): + self.postgre_user=user + self.postgre_password=password + LOGGER.info("MetricsDB PostgreSQL credentials changed") + + def set_retry_number(self, retries): + self.retries=retries + LOGGER.info(f"Retriy number changed to {retries}") + + def set_commit_lag(self, commit_lag_ms): self.commit_lag_ms=commit_lag_ms - self.create_table() + LOGGER.info(f"Commit lag of monitoring queries changed to {commit_lag_ms} ms") + + def create_table(self): + try: + query = f'CREATE TABLE IF NOT EXISTS {self.table}'\ + '(kpi_id SYMBOL,'\ + 'kpi_sample_type SYMBOL,'\ + 'device_id SYMBOL,'\ + 'endpoint_id SYMBOL,'\ + 'service_id SYMBOL,'\ + 'timestamp TIMESTAMP,'\ + 'kpi_value DOUBLE)'\ + 'TIMESTAMP(timestamp);' + result=self.run_query(query) + if(result==True): + LOGGER.info(f"Table {self.table} created") + except (Exception) as e: + LOGGER.debug(f"Table {self.table} cannot be created. {e}") + raise Exception def write_KPI(self,time,kpi_id,kpi_sample_type,device_id,endpoint_id,service_id,kpi_value): counter=0 - number_of_retries=10 - while (counter<number_of_retries): + while (counter<self.retries): try: with Sender(self.host, self.ilp_port) as sender: sender.row( self.table, symbols={ - 'kpi_id': kpi_id, - 'kpi_sample_type': kpi_sample_type, - 'device_id': device_id, - 'endpoint_id': endpoint_id, - 'service_id': service_id}, + 'kpi_id': kpi_id, + 'kpi_sample_type': kpi_sample_type, + 'device_id': device_id, + 'endpoint_id': endpoint_id, + 'service_id': service_id}, columns={ - 'kpi_value': kpi_value}, + 'kpi_value': kpi_value}, at=datetime.datetime.fromtimestamp(time)) sender.flush() - counter=number_of_retries - LOGGER.info(f"KPI written") - except IngressError as ierr: - # LOGGER.info(ierr) - # LOGGER.info(f"Retry number {counter}") + counter=self.retries + LOGGER.debug(f"KPI written in the MetricsDB") + except (Exception, IngressError) as e: counter=counter+1 - + if counter==self.retries: + raise Exception(f"Maximum number of retries achieved: {self.retries}") def run_query(self, sql_query): - query_params = {'query': sql_query, 'fmt' : 'json'} - url = f"http://{self.host}:{self.rest_port}/exec" - response = requests.get(url, params=query_params) - json_response = json.loads(response.text) - LOGGER.info(f"Query executed, result:{json_response}") - return json_response - - def create_table(self): - query = f'CREATE TABLE IF NOT EXISTS {self.table}'\ - '(kpi_id SYMBOL,'\ - 'kpi_sample_type SYMBOL,'\ - 'device_id SYMBOL,'\ - 'endpoint_id SYMBOL,'\ - 'service_id SYMBOL,'\ - 'timestamp TIMESTAMP,'\ - 'kpi_value DOUBLE)'\ - 'TIMESTAMP(timestamp);' - self.run_query(query) - LOGGER.info(f"Table {self.table} created") + counter=0 + while (counter<self.retries): + try: + query_params = {'query': sql_query, 'fmt' : 'json'} + url = f"http://{self.host}:{self.rest_port}/exec" + response = requests.get(url, params=query_params) + json_response = json.loads(response.text) + if('ddl' in json_response): + LOGGER.debug(f"REST query executed succesfully, result: {json_response['ddl']}") + counter=self.retries + return True + elif('dataset' in json_response): + LOGGER.debug(f"REST query executed, result: {json_response['dataset']}") + counter=self.retries + return json_response['dataset'] + except (Exception, requests.exceptions.RequestException) as e: + counter=counter+1 + if counter==self.retries: + raise Exception(f"Maximum number of retries achieved: {self.retries}") + + def run_query_postgre(self, postgre_sql_query): + connection = None + cursor = None + counter=0 + while (counter<self.retries): + try: + connection = psycopg2.connect( + user=self.postgre_user, + password=self.postgre_password, + host=self.host, + port=self.postgre_port, + database=self.table) + cursor = connection.cursor() + cursor.execute(postgre_sql_query) + result = cursor.fetchall() + LOGGER.debug(f"PostgreSQL query executed, result: {result}") + counter=self.retries + return result + except (Exception, psycopg2.Error) as e: + counter=counter+1 + if counter==self.retries: + raise Exception(f"Maximum number of retries achieved: {self.retries}") + finally: + if cursor: + cursor.close() + if connection: + connection.close() def get_subscription_data(self, kpi_id, sampling_interval_s=1): - end_date = timestamp_utcnow_to_float()-self.commit_lag_ms/1000 - start_date = end_date-sampling_interval_s - query = f"SELECT kpi_id, timestamp, kpi_value FROM {self.table} WHERE kpi_id = '{kpi_id}' AND (timestamp BETWEEN '{timestamp_float_to_string(start_date)}' AND '{timestamp_float_to_string(end_date)}')" - response=self.run_query(query) - kpi_list=response['dataset'] - print(len(kpi_list)) - + try: + end_date = timestamp_utcnow_to_float()-self.commit_lag_ms/1000 + start_date = end_date-sampling_interval_s + query = f"SELECT kpi_id, timestamp, kpi_value FROM {self.table} WHERE kpi_id = '{kpi_id}' AND (timestamp BETWEEN '{timestamp_float_to_string(start_date)}' AND '{timestamp_float_to_string(end_date)}')" + if self.postgre: + kpi_list=self.run_query_postgre(query) + else: + kpi_list=self.run_query(query) + if kpi_list: + #queue.append[kpi] + LOGGER.debug(f"New data received for subscription to KPI {kpi_id}") + else: + LOGGER.debug(f"No new data for the subscription to KPI {kpi_id}") + except (Exception) as e: + LOGGER.debug(f"Subscription data cannot be retrieved. {e}") + def get_alarm_data(self, kpi_id, kpiMinValue, kpiMaxValue, inRange=True, includeMinValue=True, includeMaxValue=True, subscription_frequency_ms=1000): - end_date = timestamp_utcnow_to_float()-self.commit_lag_ms/1000 - start_date = end_date-subscription_frequency_ms/1000 - query = f"SELECT kpi_id, timestamp, kpi_value FROM {self.table} WHERE kpi_id = '{kpi_id}' AND (timestamp BETWEEN '{timestamp_float_to_string(start_date)}' AND '{timestamp_float_to_string(end_date)}')" - response=self.run_query(query) - kpi_list=response['dataset'] - for kpi in kpi_list: - alarm = False - kpi_value = kpi[2] - if (kpiMinValue == kpi_value and kpiMaxValue == kpi_value and inRange): - alarm = True - elif (inRange and kpiMinValue is not None and kpiMaxValue is not None and includeMinValue and includeMaxValue): - if (kpi_value >= kpiMinValue and kpi_value <= kpiMaxValue): - alarm = True - elif (inRange and kpiMinValue is not None and kpiMaxValue is not None and includeMinValue and not includeMaxValue): - if (kpi_value >= kpiMinValue and kpi_value < kpiMaxValue): - alarm = True - elif (inRange and kpiMinValue is not None and kpiMaxValue is not None and not includeMinValue and includeMaxValue): - if (kpi_value > kpiMinValue and kpi_value <= kpiMaxValue): - alarm = True - elif (inRange and kpiMinValue is not None and kpiMaxValue is not None and not includeMinValue and not includeMaxValue): - if (kpi_value > kpiMinValue and kpi_value < kpiMaxValue): - alarm = True - elif (not inRange and kpiMinValue is not None and kpiMaxValue is not None and includeMinValue and includeMaxValue): - if (kpi_value <= kpiMinValue or kpi_value >= kpiMaxValue): - alarm = True - elif (not inRange and kpiMinValue is not None and kpiMaxValue is not None and includeMinValue and not includeMaxValue): - if (kpi_value <= kpiMinValue or kpi_value > kpiMaxValue): - alarm = True - elif (not inRange and kpiMinValue is not None and kpiMaxValue is not None and not includeMinValue and includeMaxValue): - if (kpi_value < kpiMinValue or kpi_value >= kpiMaxValue): - alarm = True - elif (not inRange and kpiMinValue is not None and kpiMaxValue is not None and not includeMinValue and not includeMaxValue): - if (kpi_value < kpiMinValue or kpi_value > kpiMaxValue): - alarm = True - elif (inRange and kpiMinValue is not None and kpiMaxValue is None and includeMinValue): - if (kpi_value >= kpiMinValue): - alarm = True - elif (inRange and kpiMinValue is not None and kpiMaxValue is None and not includeMinValue): - if (kpi_value > kpiMinValue): - alarm = True - elif (not inRange and kpiMinValue is not None and kpiMaxValue is None and not includeMinValue): - if (kpi_value <= kpiMinValue): - alarm = True - elif (not inRange and kpiMinValue is not None and kpiMaxValue is None and not includeMinValue): - if (kpi_value <= kpiMinValue): - alarm = True - elif (inRange and kpiMinValue is None and kpiMaxValue is not None and includeMaxValue): - if (kpi_value <= kpiMaxValue): - alarm = True - elif (inRange and kpiMinValue is None and kpiMaxValue is not None and not includeMaxValue): - if (kpi_value < kpiMaxValue): - alarm = True - elif (not inRange and kpiMinValue is None and kpiMaxValue is not None and not includeMaxValue): - if (kpi_value >= kpiMaxValue): - alarm = True - elif (not inRange and kpiMinValue is None and kpiMaxValue is not None and not includeMaxValue): - if (kpi_value >= kpiMaxValue): - alarm = True - if alarm: - print(kpi) - # LOGGER.info(f"Alarm triggered -> kpi_value:{kpi[2]}, timestamp:{kpi[1]}") + try: + end_date = timestamp_utcnow_to_float()-self.commit_lag_ms/1000 + start_date = end_date-subscription_frequency_ms/1000 + query = f"SELECT kpi_id, timestamp, kpi_value FROM {self.table} WHERE kpi_id = '{kpi_id}' AND (timestamp BETWEEN '{timestamp_float_to_string(start_date)}' AND '{timestamp_float_to_string(end_date)}')" + if self.postgre: + kpi_list=self.run_query_postgre(query) + else: + kpi_list=self.run_query(query) + if kpi_list: + LOGGER.debug(f"New data received for alarm of KPI {kpi_id}") + for kpi in kpi_list: + alarm = False + kpi_value = kpi[2] + if (kpiMinValue == kpi_value and kpiMaxValue == kpi_value and inRange): + alarm = True + elif (inRange and kpiMinValue is not None and kpiMaxValue is not None and includeMinValue and includeMaxValue): + if (kpi_value >= kpiMinValue and kpi_value <= kpiMaxValue): + alarm = True + elif (inRange and kpiMinValue is not None and kpiMaxValue is not None and includeMinValue and not includeMaxValue): + if (kpi_value >= kpiMinValue and kpi_value < kpiMaxValue): + alarm = True + elif (inRange and kpiMinValue is not None and kpiMaxValue is not None and not includeMinValue and includeMaxValue): + if (kpi_value > kpiMinValue and kpi_value <= kpiMaxValue): + alarm = True + elif (inRange and kpiMinValue is not None and kpiMaxValue is not None and not includeMinValue and not includeMaxValue): + if (kpi_value > kpiMinValue and kpi_value < kpiMaxValue): + alarm = True + elif (not inRange and kpiMinValue is not None and kpiMaxValue is not None and includeMinValue and includeMaxValue): + if (kpi_value <= kpiMinValue or kpi_value >= kpiMaxValue): + alarm = True + elif (not inRange and kpiMinValue is not None and kpiMaxValue is not None and includeMinValue and not includeMaxValue): + if (kpi_value <= kpiMinValue or kpi_value > kpiMaxValue): + alarm = True + elif (not inRange and kpiMinValue is not None and kpiMaxValue is not None and not includeMinValue and includeMaxValue): + if (kpi_value < kpiMinValue or kpi_value >= kpiMaxValue): + alarm = True + elif (not inRange and kpiMinValue is not None and kpiMaxValue is not None and not includeMinValue and not includeMaxValue): + if (kpi_value < kpiMinValue or kpi_value > kpiMaxValue): + alarm = True + elif (inRange and kpiMinValue is not None and kpiMaxValue is None and includeMinValue): + if (kpi_value >= kpiMinValue): + alarm = True + elif (inRange and kpiMinValue is not None and kpiMaxValue is None and not includeMinValue): + if (kpi_value > kpiMinValue): + alarm = True + elif (not inRange and kpiMinValue is not None and kpiMaxValue is None and not includeMinValue): + if (kpi_value <= kpiMinValue): + alarm = True + elif (not inRange and kpiMinValue is not None and kpiMaxValue is None and not includeMinValue): + if (kpi_value <= kpiMinValue): + alarm = True + elif (inRange and kpiMinValue is None and kpiMaxValue is not None and includeMaxValue): + if (kpi_value <= kpiMaxValue): + alarm = True + elif (inRange and kpiMinValue is None and kpiMaxValue is not None and not includeMaxValue): + if (kpi_value < kpiMaxValue): + alarm = True + elif (not inRange and kpiMinValue is None and kpiMaxValue is not None and not includeMaxValue): + if (kpi_value >= kpiMaxValue): + alarm = True + elif (not inRange and kpiMinValue is None and kpiMaxValue is not None and not includeMaxValue): + if (kpi_value >= kpiMaxValue): + alarm = True + if alarm: + #queue.append[kpi] + LOGGER.debug(f"Alarm of KPI {kpi_id} triggered -> kpi_value:{kpi[2]}, timestamp:{kpi[1]}") + else: + LOGGER.debug(f"No new data for the alarm of KPI {kpi_id}") + except (Exception) as e: + LOGGER.debug(f"Alarm data cannot be retrieved. {e}") \ No newline at end of file diff --git a/src/monitoring/service/SubscriptionManager.py b/src/monitoring/service/SubscriptionManager.py index 94e602bef6231cf0677d59087556d6a06f92e062..a07d40b550faa64eef3a4e4a24b825e60c1d2b4b 100644 --- a/src/monitoring/service/SubscriptionManager.py +++ b/src/monitoring/service/SubscriptionManager.py @@ -1,16 +1,18 @@ from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.executors.pool import ProcessPoolExecutor +from apscheduler.jobstores.base import JobLookupError from datetime import datetime import time +import logging +LOGGER = logging.getLogger(__name__) class SubscriptionManager(): def __init__(self, metrics_db): self.metrics_db = metrics_db self.scheduler = BackgroundScheduler(executors={'processpool': ProcessPoolExecutor(max_workers=20)}) self.scheduler.start() - print("Subscription Manager Initialized") - + LOGGER.info("Subscription Manager Initialized") def create_subscription(self, subscription_id, kpi_id, sampling_interval_s, sampling_duration_s=None, start_timestamp=None, end_timestamp=None): start_date=None @@ -24,14 +26,11 @@ class SubscriptionManager(): if end_timestamp: end_date = datetime.fromtimestamp(end_timestamp) self.scheduler.add_job(self.metrics_db.get_subscription_data, args=(kpi_id, sampling_interval_s),trigger='interval', seconds=sampling_interval_s, start_date=start_date, end_date=end_date, id=subscription_id) + LOGGER.debug(f"Subscrition job {subscription_id} succesfully created") def delete_subscription(self, subscription_id): - self.scheduler.remove_job(subscription_id) - -if __name__ == '__main__': - - import MetricsDBTools - metrics_db=MetricsDBTools.MetricsDB("localhost",9009,9000,"monitoring",1000) - subscription_manager=SubscriptionManager(metrics_db) - subscription_manager.create_subscription('1',1,1) - time.sleep(100) \ No newline at end of file + try: + self.scheduler.remove_job(subscription_id) + LOGGER.debug(f"Subscription job {subscription_id} succesfully deleted") + except (Exception, JobLookupError) as e: + LOGGER.debug(f"Subscription job {subscription_id} does not exists") \ No newline at end of file