diff --git a/src/monitoring/service/ManagementDBTools.py b/src/monitoring/service/ManagementDBTools.py index 04693d3ffe40acb04beba31111baa0be5b50ecdd..9b603917cf25cc18ee5adc9083f1f5651b205013 100644 --- a/src/monitoring/service/ManagementDBTools.py +++ b/src/monitoring/service/ManagementDBTools.py @@ -13,123 +13,237 @@ # limitations under the License. from common.tools.timestamp.Converters import timestamp_utcnow_to_float -import sqlite3 as sl -import os -ROOT_DIR = os.path.abspath(os.curdir) +import sqlite3 +import logging + +LOGGER = logging.getLogger(__name__) class ManagementDB(): def __init__(self, database): - self.client = sl.connect(database, check_same_thread=False) - self.create_monitoring_table() - self.create_subscription_table() + try: + self.client = sqlite3.connect(database, check_same_thread=False) + self.create_monitoring_table() + self.create_subscription_table() + self.create_alarm_table() + LOGGER.info("ManagementDB initialized") + except: + LOGGER.info("ManagementDB cannot be initialized") + raise Exception("Critical error in the monitoring component") def create_monitoring_table(self): - self.client.execute(""" - CREATE TABLE IF NOT EXISTS kpi( - kpi_id INTEGER PRIMARY KEY AUTOINCREMENT, - kpi_description TEXT, - kpi_sample_type INTEGER, - device_id INTEGER, - endpoint_id INTEGER, - service_id INTEGER - ); - """) + try: + result=self.client.execute(""" + CREATE TABLE IF NOT EXISTS kpi( + kpi_id INTEGER PRIMARY KEY AUTOINCREMENT, + kpi_description TEXT, + kpi_sample_type INTEGER, + device_id INTEGER, + endpoint_id INTEGER, + service_id INTEGER + ); + """) + LOGGER.debug("KPI table created in the ManagementDB") + except sqlite3.Error as e: + LOGGER.debug(f"KPI table cannot be created in the ManagementD. {e}") + raise Exception def create_subscription_table(self): - self.client.execute(""" - CREATE TABLE IF NOT EXISTS subscription( - subs_id INTEGER PRIMARY KEY AUTOINCREMENT, - kpi_id INTEGER, - subscriber TEXT, - sampling_duration_s REAL, - sampling_interval_s REAL, - start_timestamp REAL, - end_timestamp REAL - ); - """) + try: + result= self.client.execute(""" + CREATE TABLE IF NOT EXISTS subscription( + subs_id INTEGER PRIMARY KEY AUTOINCREMENT, + kpi_id INTEGER, + subscriber TEXT, + sampling_duration_s REAL, + sampling_interval_s REAL, + start_timestamp REAL, + end_timestamp REAL + ); + """) + LOGGER.info("Subscription table created in the ManagementDB") + except sqlite3.Error as e: + LOGGER.debug(f"Subscription table cannot be created in the ManagementDB. {e}") + raise Exception + + def create_alarm_table(self): + try: + result=self.client.execute(""" + CREATE TABLE IF NOT EXISTS alarm( + alarm_id INTEGER PRIMARY KEY AUTOINCREMENT, + alarm_description TEXT, + alarm_name TEXT, + kpi_id INTEGER, + kpi_min_value REAL, + kpi_max_value REAL, + in_range INTEGER, + include_min_value INTEGER, + include_max_value INTEGER + ); + """) + LOGGER.info("Alarm table created in the ManagementDB") + except sqlite3.Error as e: + LOGGER.debug(f"Alarm table cannot be created in the ManagementDB. {e}") + raise Exception def insert_KPI(self,kpi_description,kpi_sample_type,device_id,endpoint_id,service_id): - c = self.client.cursor() - c.execute("SELECT kpi_id FROM kpi WHERE device_id is ? AND kpi_sample_type is ? AND endpoint_id is ?",(device_id,kpi_sample_type,endpoint_id)) - data=c.fetchone() - if data is None: - c.execute("INSERT INTO kpi (kpi_description,kpi_sample_type,device_id,endpoint_id,service_id) VALUES (?,?,?,?,?)", (kpi_description,kpi_sample_type,device_id,endpoint_id,service_id)) - self.client.commit() - return c.lastrowid - else: - return data[0] + try: + c = self.client.cursor() + c.execute("SELECT kpi_id FROM kpi WHERE device_id is ? AND kpi_sample_type is ? AND endpoint_id is ? AND service_id is ?",(device_id,kpi_sample_type,endpoint_id,service_id)) + data=c.fetchone() + if data is None: + c.execute("INSERT INTO kpi (kpi_description,kpi_sample_type,device_id,endpoint_id,service_id) VALUES (?,?,?,?,?)", (kpi_description,kpi_sample_type,device_id,endpoint_id,service_id)) + self.client.commit() + kpi_id = c.lastrowid + LOGGER.debug(f"KPI {kpi_id} succesfully inserted in the ManagementDB") + return kpi_id + else: + kpi_id = data[0] + LOGGER.debug(f"KPI {kpi_id} already exists") + return kpi_id + except sqlite3.Error as e: + LOGGER.debug("KPI cannot be inserted in the ManagementDB: {e}") def insert_subscription(self,kpi_id,subscriber,sampling_duration_s,sampling_interval_s,start_timestamp, end_timestamp): - c = self.client.cursor() - c.execute("SELECT subs_id FROM subscription WHERE kpi_id is ? AND subscriber is ? AND sampling_duration_s is ? AND sampling_interval_s is ? AND start_timestamp is ? AND end_timestamp is ?",(kpi_id,subscriber,sampling_duration_s,sampling_interval_s,start_timestamp, end_timestamp)) - data=c.fetchone() - if data is None: - c.execute("INSERT INTO subscription (kpi_id,subscriber,sampling_duration_s,sampling_interval_s,start_timestamp, end_timestamp) VALUES (?,?,?,?,?,?)", (kpi_id,subscriber,sampling_duration_s,sampling_interval_s,start_timestamp, end_timestamp)) - self.client.commit() - return c.lastrowid - else: - print("already exists") - return data[0] + try: + c = self.client.cursor() + c.execute("SELECT subs_id FROM subscription WHERE kpi_id is ? AND subscriber is ? AND sampling_duration_s is ? AND sampling_interval_s is ? AND start_timestamp is ? AND end_timestamp is ?",(kpi_id,subscriber,sampling_duration_s,sampling_interval_s,start_timestamp, end_timestamp)) + data=c.fetchone() + if data is None: + c.execute("INSERT INTO subscription (kpi_id,subscriber,sampling_duration_s,sampling_interval_s,start_timestamp, end_timestamp) VALUES (?,?,?,?,?,?)", (kpi_id,subscriber,sampling_duration_s,sampling_interval_s,start_timestamp, end_timestamp)) + self.client.commit() + subs_id = c.lastrowid + LOGGER.debug(f"Subscription {subs_id} succesfully inserted in the ManagementDB") + return subs_id + else: + subs_id = data[0] + LOGGER.debug(f"Subscription {subs_id} already exists") + return subs_id + except sqlite3.Error as e: + LOGGER.debug("Subscription cannot be inserted in the ManagementDB: {e}") + + def insert_alarm(self,alarm_description,alarm_name,kpi_id,kpi_min_value,kpi_max_value,in_range,include_min_value,include_max_value): + try: + c = self.client.cursor() + c.execute("SELECT alarm_id FROM alarm WHERE alarm_description is ? AND alarm_name is ? AND kpi_id is ? AND kpi_min_value is ? AND kpi_max_value is ? AND in_range is ? AND include_min_value is ? AND include_max_value is ?",(alarm_description,alarm_name,kpi_id,kpi_min_value,kpi_max_value,in_range,include_min_value,include_max_value)) + data=c.fetchone() + if data is None: + c.execute("INSERT INTO alarm (alarm_description, alarm_name, kpi_id, kpi_min_value, kpi_max_value, in_range, include_min_value, include_max_value) VALUES (?,?,?,?,?,?,?,?)", (alarm_description,alarm_name,kpi_id,kpi_min_value,kpi_max_value,in_range,include_min_value,include_max_value)) + self.client.commit() + alarm_id=c.lastrowid + LOGGER.debug(f"Alarm {alarm_id} succesfully inserted in the ManagementDB") + return alarm_id + else: + alarm_id=data[0] + LOGGER.debug(f"Alarm {alarm_id} already exists") + return alarm_id + except sqlite3.Error as e: + LOGGER.debug(f"Alarm cannot be inserted in the ManagementDB: {e}") - def delete_KPI(self,device_id,kpi_sample_type): - c = self.client.cursor() - c.execute("SELECT kpi_id FROM kpi WHERE device_id is ? AND kpi_sample_type is ?",(device_id,kpi_sample_type)) - data=c.fetchone() - if data is None: - return False - else: - c.execute("DELETE FROM kpi WHERE device_id is ? AND kpi_sample_type is ?",(device_id,kpi_sample_type)) - self.client.commit() - return True + def delete_KPI(self,kpi_id): + try: + c = self.client.cursor() + c.execute("SELECT * FROM kpi WHERE kpi_id is ?",(kpi_id,)) + data=c.fetchone() + if data is None: + LOGGER.debug(f"KPI {kpi_id} does not exists") + return False + else: + c.execute("DELETE FROM kpi WHERE kpi_id is ?",(kpi_id,)) + self.client.commit() + LOGGER.debug(f"KPI {kpi_id} deleted from the ManagementDB") + return True + except sqlite3.Error as e: + LOGGER.debug(f"KPI cannot be deleted from the ManagementDB: {e}") def delete_subscription(self,subs_id): - c = self.client.cursor() - c.execute("SELECT * FROM subscription WHERE subs_id is ?",(subs_id,)) - data=c.fetchone() - if data is None: - return False - else: - c.execute("DELETE FROM subscription WHERE subs_id is ?",(subs_id,)) - self.client.commit() - return True + try: + c = self.client.cursor() + c.execute("SELECT * FROM subscription WHERE subs_id is ?",(subs_id,)) + data=c.fetchone() + if data is None: + LOGGER.debug(f"Subscription {subs_id} does not exists") + return False + else: + c.execute("DELETE FROM subscription WHERE subs_id is ?",(subs_id,)) + self.client.commit() + LOGGER.debug(f"Subscription {subs_id} deleted from the ManagementDB") + return True + except sqlite3.Error as e: + LOGGER.debug(f"Subscription cannot be deleted from the ManagementDB: {e}") - def delete_kpid_id(self,kpi_id): - c = self.client.cursor() - c.execute("SELECT * FROM kpi WHERE kpi_id is ?",(kpi_id,)) - data=c.fetchone() - if data is None: - return False - else: - c.execute("DELETE FROM kpi WHERE kpi_id is ?",(kpi_id,)) - self.client.commit() - return True + def delete_alarm(self,alarm_id): + try: + c = self.client.cursor() + c.execute("SELECT * FROM alarm WHERE alarm_id is ?",(alarm_id,)) + data=c.fetchone() + if data is None: + LOGGER.debug(f"Alarm {alarm_id} does not exists") + return False + else: + c.execute("DELETE FROM alarm WHERE alarm_id is ?",(alarm_id,)) + self.client.commit() + LOGGER.debug(f"Alarm {alarm_id} deleted from the ManagementDB") + return True + except sqlite3.Error as e: + LOGGER.debug(f"Alarm cannot be deleted from the ManagementDB: {e}") def get_KPI(self,kpi_id): - data = self.client.execute("SELECT * FROM kpi WHERE kpi_id is ?",(kpi_id,)) - return data.fetchone() + try: + data = self.client.execute("SELECT * FROM kpi WHERE kpi_id is ?",(kpi_id,)).fetchone() + if data: + LOGGER.debug(f"KPI {kpi_id} succesfully retrieved from the ManagementDB") + return data + else: + LOGGER.debug(f"KPI {kpi_id} does not exists") + return data + except sqlite3.Error as e: + LOGGER.debug(f"KPI {kpi_id} cannot be retrieved from the ManagementDB: {e}") def get_subscription(self,subs_id): - data = self.client.execute("SELECT * FROM subscription WHERE subs_id is ?",(subs_id,)) - return data.fetchone() + try: + data = self.client.execute("SELECT * FROM subscription WHERE subs_id is ?",(subs_id,)).fetchone() + if data: + LOGGER.debug(f"Subscription {subs_id} succesfully retrieved from the ManagementDB") + return data + else: + LOGGER.debug(f"Subscription {subs_id} does not exists") + return data + except sqlite3.Error as e: + LOGGER.debug(f"Subscription {subs_id} cannot be retrieved from the ManagementDB: {e}") + + def get_alarm(self,alarm_id): + try: + data = self.client.execute("SELECT * FROM alarm WHERE alarm_id is ?",(alarm_id,)).fetchone() + if data: + LOGGER.debug(f"Alarm {alarm_id} succesfully retrieved from the ManagementDB") + return data + else: + print(data) + LOGGER.debug(f"Alarm {alarm_id} does not exists") + return data + except sqlite3.Error as e: + LOGGER.debug(f"Alarm {alarm_id} cannot be retrieved from the ManagementDB: {e}") def get_KPIS(self): - data = self.client.execute("SELECT * FROM kpi") - #print("\n") - #for row in data: - # print(row) - return data.fetchall() + try: + data = self.client.execute("SELECT * FROM kpi").fetchall() + LOGGER.debug(f"KPIs succesfully retrieved from the ManagementDB") + return data + except sqlite3.Error as e: + LOGGER.debug(f"KPIs cannot be retrieved from the ManagementDB: {e}") def get_subscriptions(self): - data = self.client.execute("SELECT * FROM subscription") - #print("\n") - #for row in data: - # print(row) - return data.fetchall() - -def main(): - managementdb = ManagementDB("monitoring.db") - subs_id = managementdb.insert_subscription(2,"localhost",20,2,timestamp_utcnow_to_float(),timestamp_utcnow_to_float() + 10) - print("subs id: ", str(subs_id)) + try: + data = self.client.execute("SELECT * FROM subscription").fetchall() + LOGGER.debug(f"Subscriptions succesfully retrieved from the ManagementDB") + return data + except sqlite3.Error as e: + LOGGER.debug(f"Subscriptions cannot be retrieved from the ManagementDB: {e}") -if __name__ == '__main__': - main() + def get_alarms(self): + try: + data = self.client.execute("SELECT * FROM alarm").fetchall() + LOGGER.debug(f"Alarms succesfully retrieved from the ManagementDB") + return data + except sqlite3.Error as e: + LOGGER.debug(f"Alarms cannot be retrieved from the ManagementDB: {e}") diff --git a/src/monitoring/service/SubscriptionManager.py b/src/monitoring/service/SubscriptionManager.py index 2a5b05cfe8ef6a31416ebf1e5fb19044a94a9879..f2e10286af7b6dd588e72db76bcd04801c4d7b41 100644 --- a/src/monitoring/service/SubscriptionManager.py +++ b/src/monitoring/service/SubscriptionManager.py @@ -4,25 +4,19 @@ from random import random from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.executors.pool import ProcessPoolExecutor -from apscheduler.triggers.interval import IntervalTrigger -from apscheduler.triggers.cron import CronTrigger - -from common.proto.monitoring_pb2 import Kpi, KpiList -from common.tools.timestamp import Converters -from common.tools.timestamp.Converters import timestamp_utcnow_to_float, timestamp_float_to_string +from apscheduler.jobstores.base import JobLookupError from datetime import datetime import time +import logging -from monitoring.service import MetricsDBTools -from monitoring.service.ManagementDBTools import ManagementDB - +LOGGER = logging.getLogger(__name__) class SubscriptionManager(): def __init__(self, metrics_db): self.metrics_db = metrics_db self.scheduler = BackgroundScheduler(executors={'processpool': ProcessPoolExecutor(max_workers=20)}) self.scheduler.start() - + LOGGER.info("Subscription Manager Initialized") def create_subscription(self, subs_queue ,subscription_id, kpi_id, sampling_interval_s, sampling_duration_s=None, start_timestamp=None, end_timestamp=None): start_date=None @@ -168,72 +162,13 @@ if __name__ == '__main__': start_date = datetime.utcfromtimestamp(start_timestamp) print("start_date: " + str(start_date)) if end_timestamp: - end_date = datetime.utcfromtimestamp(end_timestamp) - print("end_date: " + str(end_date)) - self.scheduler.add_job(self.metrics_db.get_subscription_data, args=(subs_queue, kpi_id, start_timestamp, sampling_interval_s),trigger='interval', seconds=sampling_interval_s, start_date=start_date, end_date=end_date, id=subscription_id) - self.metrics_db.get_subscription_data(subs_queue,kpi_id,end_date,sampling_interval_s) - - + end_date = datetime.fromtimestamp(end_timestamp) + self.scheduler.add_job(self.metrics_db.get_subscription_data, args=(kpi_id, sampling_interval_s),trigger='interval', seconds=sampling_interval_s, start_date=start_date, end_date=end_date, id=subscription_id) + LOGGER.debug(f"Subscrition job {subscription_id} succesfully created") def delete_subscription(self, subscription_id): - self.scheduler.remove_job(subscription_id) - - -def main(): - - subs_queue = Queue() - - managementdb = ManagementDB("monitoring.db") - metrics_db = MetricsDBTools.MetricsDB("localhost", "9009", "9000", "monitoring") - subs_manager = SubscriptionManager(metrics_db) - - print("Here") - - kpi_id = "2" - sampling_duration_s = 10 - sampling_interval_s = 2 - start_timestamp = timestamp_utcnow_to_float() - end_timestamp = timestamp_utcnow_to_float() + 10 - - - - print("Before loop") - print("start_timestamp: " + timestamp_float_to_string(start_timestamp)) - print("end_timestamp: " + timestamp_float_to_string(end_timestamp)) - - - for i in range(10): - timestamp = timestamp_utcnow_to_float() - kpitype = "KPISAMPLETYPE_PACKETS_TRANSMITTED" - deviceId = "DEV01" - endpointId = "END01" - serviceId = "SERV01" - kpi_value = 50*random() - metrics_db.write_KPI(timestamp,kpi_id,kpitype,deviceId,endpointId,serviceId,kpi_value) - time.sleep(1) - - print("After loop") - - subs_id = managementdb.insert_subscription(kpi_id, "localhost", sampling_duration_s, sampling_interval_s, start_timestamp, - end_timestamp) - - subs_manager.create_subscription(subs_queue,str(subs_id),kpi_id,sampling_interval_s,sampling_duration_s,start_timestamp,end_timestamp) - - print("Queue empty: " + str(subs_queue.empty())) - print("Queue size: " + str(subs_queue.qsize())) - - while not subs_queue.empty(): - list = subs_queue.get_nowait() - print("List: " + str(list)) - kpi_list = KpiList() - for item in list: - kpi = Kpi() - kpi.kpi_id.kpi_id.uuid = item[0] - kpi.timestamp.timestamp = Converters.timestamp_string_to_float(item[1]) - kpi.kpi_value.floatVal = item[2] - kpi_list.kpi.append(kpi) - print("Kpi List: " + str(kpi_list)) - - -if __name__ == '__main__': - main() + try: + self.scheduler.remove_job(subscription_id) + LOGGER.debug(f"Subscription job {subscription_id} succesfully deleted") + except (Exception, JobLookupError) as e: + LOGGER.debug(f"Subscription job {subscription_id} does not exists")