# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import datetime
import json
import logging

import psycopg2
import requests
from questdb.ingress import Sender, IngressError

from common.tools.timestamp.Converters import timestamp_float_to_string, timestamp_utcnow_to_float

LOGGER = logging.getLogger(__name__)


class MetricsDB:
    """Wrapper around a QuestDB instance used to store and query monitoring KPIs.

    Writes go through the InfluxDB Line Protocol (ILP) ingress port; queries go
    either through the REST endpoint or, when PostgreSQL mode is enabled,
    through the PostgreSQL wire protocol.
    """

    def __init__(self, host, ilp_port=9009, rest_port=9000, table="monitoring", commit_lag_ms=1000, retries=10,
                 postgre=False, postgre_port=8812, postgre_user='admin', postgre_password='quest'):
        try:
            self.host = host
            self.ilp_port = int(ilp_port)
            self.rest_port = rest_port
            self.table = table
            self.commit_lag_ms = commit_lag_ms
            self.retries = retries
            self.postgre = postgre
            self.postgre_port = postgre_port
            self.postgre_user = postgre_user
            self.postgre_password = postgre_password
            self.create_table()
            LOGGER.info("MetricsDB initialized")
        except Exception:
            LOGGER.error("MetricsDB cannot be initialized")
            raise Exception("Critical error in the monitoring component")

    def is_postgre_enabled(self):
        LOGGER.info(f"PostgreSQL is {self.postgre}")
        return self.postgre

    def get_retry_number(self):
        LOGGER.info(f"Retry number is {self.retries}")
        return self.retries

    def get_commit_lag(self):
        LOGGER.info(f"Commit lag of monitoring queries is {self.commit_lag_ms} ms")
        return self.commit_lag_ms

    def enable_postgre_mode(self):
        self.postgre = True
        LOGGER.info("MetricsDB PostgreSQL query mode enabled")

    def disable_postgre_mode(self):
        self.postgre = False
        LOGGER.info("MetricsDB REST query mode enabled")

    def set_postgre_credentials(self, user, password):
        self.postgre_user = user
        self.postgre_password = password
        LOGGER.info("MetricsDB PostgreSQL credentials changed")

    def set_retry_number(self, retries):
        self.retries = retries
        LOGGER.info(f"Retry number changed to {retries}")

    def set_commit_lag(self, commit_lag_ms):
        self.commit_lag_ms = commit_lag_ms
        LOGGER.info(f"Commit lag of monitoring queries changed to {commit_lag_ms} ms")

    def create_table(self):
        try:
            query = f'CREATE TABLE IF NOT EXISTS {self.table} ' \
                    '(kpi_id SYMBOL, ' \
                    'kpi_sample_type SYMBOL, ' \
                    'device_id SYMBOL, ' \
                    'endpoint_id SYMBOL, ' \
                    'service_id SYMBOL, ' \
                    'timestamp TIMESTAMP, ' \
                    'kpi_value DOUBLE) ' \
                    'TIMESTAMP(timestamp);'
            result = self.run_query(query)
            if result is True:
                LOGGER.info(f"Table {self.table} created")
        except Exception as e:
            LOGGER.error(f"Table {self.table} cannot be created: {e}")
            raise
{e}") raise Exception def write_KPI(self, time, kpi_id, kpi_sample_type, device_id, endpoint_id, service_id, kpi_value): counter = 0 while (counter < self.retries): try: with Sender(self.host, self.ilp_port) as sender: sender.row( self.table, symbols={ 'kpi_id': kpi_id, 'kpi_sample_type': kpi_sample_type, 'device_id': device_id, 'endpoint_id': endpoint_id, 'service_id': service_id}, columns={ 'kpi_value': kpi_value}, at=datetime.datetime.fromtimestamp(time)) sender.flush() counter = self.retries LOGGER.debug(f"KPI written in the MetricsDB") except (Exception, IngressError) as e: counter = counter + 1 if counter == self.retries: raise Exception(f"Maximum number of retries achieved: {self.retries}") def run_query(self, sql_query): counter = 0 while (counter < self.retries): try: query_params = {'query': sql_query, 'fmt': 'json'} url = f"http://{self.host}:{self.rest_port}/exec" response = requests.get(url, params=query_params) json_response = json.loads(response.text) if ('ddl' in json_response): LOGGER.debug(f"REST query executed succesfully, result: {json_response['ddl']}") counter = self.retries return True elif ('dataset' in json_response): LOGGER.debug(f"REST query executed, result: {json_response['dataset']}") counter = self.retries return json_response['dataset'] except (Exception, requests.exceptions.RequestException) as e: counter = counter + 1 if counter == self.retries: raise Exception(f"Maximum number of retries achieved: {self.retries}") def run_query_postgre(self, postgre_sql_query): connection = None cursor = None counter = 0 while (counter < self.retries): try: connection = psycopg2.connect( user=self.postgre_user, password=self.postgre_password, host=self.host, port=self.postgre_port, database=self.table) cursor = connection.cursor() cursor.execute(postgre_sql_query) result = cursor.fetchall() LOGGER.debug(f"PostgreSQL query executed, result: {result}") counter = self.retries return result except (Exception, psycopg2.Error) as e: counter = counter + 1 if counter == self.retries: raise Exception(f"Maximum number of retries achieved: {self.retries}") finally: if cursor: cursor.close() if connection: connection.close() def get_subscription_data(self,subs_queue, kpi_id, sampling_interval_s=1): try: end_date = timestamp_utcnow_to_float() - self.commit_lag_ms / 1000 start_date = end_date - sampling_interval_s query = f"SELECT kpi_id, timestamp, kpi_value FROM {self.table} WHERE kpi_id = '{kpi_id}' AND (timestamp BETWEEN '{timestamp_float_to_string(start_date)}' AND '{timestamp_float_to_string(end_date)}')" if self.postgre: kpi_list = self.run_query_postgre(query) LOGGER.debug(f"kpi_list postgre: {kpi_list}") else: kpi_list = self.run_query(query) LOGGER.debug(f"kpi_list influx: {kpi_list}") if kpi_list: subs_queue.put_nowait(kpi_list) LOGGER.debug(f"New data received for subscription to KPI {kpi_id}") else: LOGGER.debug(f"No new data for the subscription to KPI {kpi_id}") except (Exception) as e: LOGGER.debug(f"Subscription data cannot be retrieved. 
{e}") def get_alarm_data(self, alarm_queue, kpi_id, kpiMinValue, kpiMaxValue, inRange=True, includeMinValue=True, includeMaxValue=True, subscription_frequency_ms=1000): try: end_date = timestamp_utcnow_to_float() - self.commit_lag_ms / 1000 start_date = end_date - subscription_frequency_ms / 1000 query = f"SELECT kpi_id, timestamp, kpi_value FROM {self.table} WHERE kpi_id = '{kpi_id}' AND (timestamp BETWEEN '{timestamp_float_to_string(start_date)}' AND '{timestamp_float_to_string(end_date)}')" if self.postgre: kpi_list = self.run_query_postgre(query) else: kpi_list = self.run_query(query) if kpi_list: LOGGER.debug(f"New data received for alarm of KPI {kpi_id}") for kpi in kpi_list: alarm = False kpi_value = kpi[2] if (kpiMinValue == kpi_value and kpiMaxValue == kpi_value and inRange): alarm = True elif ( inRange and kpiMinValue is not None and kpiMaxValue is not None and includeMinValue and includeMaxValue): if (kpi_value >= kpiMinValue and kpi_value <= kpiMaxValue): alarm = True elif ( inRange and kpiMinValue is not None and kpiMaxValue is not None and includeMinValue and not includeMaxValue): if (kpi_value >= kpiMinValue and kpi_value < kpiMaxValue): alarm = True elif ( inRange and kpiMinValue is not None and kpiMaxValue is not None and not includeMinValue and includeMaxValue): if (kpi_value > kpiMinValue and kpi_value <= kpiMaxValue): alarm = True elif ( inRange and kpiMinValue is not None and kpiMaxValue is not None and not includeMinValue and not includeMaxValue): if (kpi_value > kpiMinValue and kpi_value < kpiMaxValue): alarm = True elif ( not inRange and kpiMinValue is not None and kpiMaxValue is not None and includeMinValue and includeMaxValue): if (kpi_value <= kpiMinValue or kpi_value >= kpiMaxValue): alarm = True elif ( not inRange and kpiMinValue is not None and kpiMaxValue is not None and includeMinValue and not includeMaxValue): if (kpi_value <= kpiMinValue or kpi_value > kpiMaxValue): alarm = True elif ( not inRange and kpiMinValue is not None and kpiMaxValue is not None and not includeMinValue and includeMaxValue): if (kpi_value < kpiMinValue or kpi_value >= kpiMaxValue): alarm = True elif ( not inRange and kpiMinValue is not None and kpiMaxValue is not None and not includeMinValue and not includeMaxValue): if (kpi_value < kpiMinValue or kpi_value > kpiMaxValue): alarm = True elif (inRange and kpiMinValue is not None and kpiMaxValue is None and includeMinValue): if (kpi_value >= kpiMinValue): alarm = True elif (inRange and kpiMinValue is not None and kpiMaxValue is None and not includeMinValue): if (kpi_value > kpiMinValue): alarm = True elif (not inRange and kpiMinValue is not None and kpiMaxValue is None and not includeMinValue): if (kpi_value <= kpiMinValue): alarm = True elif (not inRange and kpiMinValue is not None and kpiMaxValue is None and not includeMinValue): if (kpi_value <= kpiMinValue): alarm = True elif (inRange and kpiMinValue is None and kpiMaxValue is not None and includeMaxValue): if (kpi_value <= kpiMaxValue): alarm = True elif (inRange and kpiMinValue is None and kpiMaxValue is not None and not includeMaxValue): if (kpi_value < kpiMaxValue): alarm = True elif (not inRange and kpiMinValue is None and kpiMaxValue is not None and not includeMaxValue): if (kpi_value >= kpiMaxValue): alarm = True elif (not inRange and kpiMinValue is None and kpiMaxValue is not None and not includeMaxValue): if (kpi_value >= kpiMaxValue): alarm = True if alarm: # queue.append[kpi] alarm_queue.put_nowait(kpi) LOGGER.debug(f"Alarm of KPI {kpi_id} triggered -> kpi_value:{kpi[2]}, 
            else:
                LOGGER.debug(f"No new data for the alarm of KPI {kpi_id}")
        except Exception as e:
            LOGGER.error(f"Alarm data cannot be retrieved: {e}")
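
# Minimal usage sketch (illustrative only): it assumes a QuestDB instance is
# reachable on localhost with the default ILP (9009) and REST (9000) ports,
# and the KPI identifiers below are made up. In the TeraFlow monitoring
# component this class is instantiated by the service itself, not run standalone.
if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    metrics_db = MetricsDB('localhost')
    # Write one sample stamped with the current UTC time
    metrics_db.write_KPI(
        timestamp_utcnow_to_float(), kpi_id='kpi_1', kpi_sample_type='PACKETS_TRANSMITTED',
        device_id='dev_1', endpoint_id='ep_1', service_id='svc_1', kpi_value=42.0)
    # Read a few rows back through the REST endpoint
    rows = metrics_db.run_query(f"SELECT kpi_id, timestamp, kpi_value FROM {metrics_db.table} LIMIT 5;")
    print(rows)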