Skip to content
Snippets Groups Projects
MetricsDBTools.py 13.2 KiB
Newer Older
  • Learn to ignore specific revisions
  • # Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #      http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    
    import time
    from random import random
    
    
    from questdb.ingress import Sender, IngressError
    
    import requests
    import json
    
    import logging
    
    from common.tools.timestamp.Converters import timestamp_float_to_string, timestamp_utcnow_to_float
    import psycopg2
    
    
    LOGGER = logging.getLogger(__name__)
    
    class MetricsDB():
    
        def __init__(self, host, ilp_port=9009, rest_port=9000, table="monitoring", commit_lag_ms=1000, retries=10,
                     postgre=False, postgre_port=8812, postgre_user='admin', postgre_password='quest'):
            """Store the connection settings and make sure the metrics table exists.

            Args:
                host: Host used for the ILP sender, the REST endpoint and PostgreSQL connections.
                ilp_port: Port handed to the ILP ``Sender``; coerced to ``int``.
                rest_port: Port of the REST ``/exec`` endpoint.
                table: Name of the table KPIs are written to and queried from.
                commit_lag_ms: Milliseconds subtracted from the current time when the
                    subscription/alarm queries compute their time window.
                retries: Maximum number of attempts for each database operation.
                postgre: When true, subscription/alarm queries use PostgreSQL instead of REST.
                postgre_port: Port used for PostgreSQL connections.
                postgre_user: User name used for PostgreSQL connections.
                postgre_password: Password used for PostgreSQL connections.

            Raises:
                Exception: If a setting cannot be stored or the table cannot be created.
                    The underlying error is attached as ``__cause__``.
            """
            try:
                self.host = host
                self.ilp_port = int(ilp_port)
                self.rest_port = rest_port
                self.table = table
                self.commit_lag_ms = commit_lag_ms
                self.retries = retries
                self.postgre = postgre
                self.postgre_port = postgre_port
                self.postgre_user = postgre_user
                self.postgre_password = postgre_password
                self.create_table()
                LOGGER.info("MetricsDB initialized")
            except Exception as e:
                # ``except Exception`` (not a bare ``except``) lets KeyboardInterrupt and
                # SystemExit propagate untouched; the traceback is logged so the root
                # cause is not lost behind the generic error raised below.
                LOGGER.exception("MetricsDB cannot be initialized")
                raise Exception("Critical error in the monitoring component") from e
    
        def is_postgre_enabled(self):
            """Log and return the current value of the PostgreSQL query-mode flag."""
            LOGGER.info(f"PostgreSQL is {self.postgre}")
            postgre_flag = self.postgre
            return postgre_flag
    
        def get_retry_number(self):
            """Log and return the configured maximum number of attempts per operation."""
            LOGGER.info(f"Retry number is {self.retries}")
            max_attempts = self.retries
            return max_attempts
    
        def get_commit_lag(self):
            """Log and return the commit lag, in milliseconds, applied to monitoring queries."""
            LOGGER.info(f"Commit lag of monitoring queries is {self.commit_lag_ms} ms")
            lag_ms = self.commit_lag_ms
            return lag_ms
    
        def enable_postgre_mode(self):
            """Switch subscription/alarm queries to the PostgreSQL path and log the change."""
            # The flag is read by the query methods to choose between PostgreSQL and REST.
            self.postgre = True
            LOGGER.info("MetricsDB PostgreSQL query mode enabled")
    
        def disable_postgre_mode(self):
            """Switch subscription/alarm queries back to the REST path and log the change."""
            # Clearing the flag makes the query methods fall back to the REST endpoint.
            self.postgre = False
            LOGGER.info("MetricsDB REST query mode enabled")
    
        def set_postgre_credentials(self, user, password):
            """Replace the user name and password used for PostgreSQL connections.

            Args:
                user: New PostgreSQL user name.
                password: New PostgreSQL password.
            """
            self.postgre_user, self.postgre_password = user, password
            LOGGER.info("MetricsDB PostgreSQL credentials changed")
    
        def set_retry_number(self, retries):
            """Set the maximum number of attempts used by the write and query methods.

            Args:
                retries: New maximum number of attempts per database operation.
            """
            self.retries = retries
            # Log message typo fixed ("Retriy" -> "Retry").
            LOGGER.info(f"Retry number changed to {retries}")
    
        def set_commit_lag(self, commit_lag_ms):
            """Set the commit lag applied to the time window of monitoring queries.

            Args:
                commit_lag_ms: New commit lag in milliseconds.
            """
            # Stored as given; the query methods divide it by 1000 to get seconds.
            self.commit_lag_ms = commit_lag_ms
            LOGGER.info(f"Commit lag of monitoring queries changed to {commit_lag_ms} ms")
    
        def create_table(self):
            """Create the KPI table through the REST endpoint if it does not exist yet.

            The statement is ``CREATE TABLE IF NOT EXISTS``, so the success message is
            logged whenever the statement is accepted, including when the table was
            already present.

            Raises:
                Exception: If the statement cannot be executed. The underlying error is
                    attached as ``__cause__``.
            """
            query = (
                f'CREATE TABLE IF NOT EXISTS {self.table}'
                '(kpi_id SYMBOL,'
                'kpi_sample_type SYMBOL,'
                'device_id SYMBOL,'
                'endpoint_id SYMBOL,'
                'service_id SYMBOL,'
                'timestamp TIMESTAMP,'
                'kpi_value DOUBLE)'
                'TIMESTAMP(timestamp);'
            )
            try:
                result = self.run_query(query)
            except Exception as e:
                # Logged at error level and chained so the failure reason is visible
                # to whoever handles the exception.
                LOGGER.error(f"Table {self.table} cannot be created. {e}")
                raise Exception(f"Table {self.table} cannot be created") from e
            # run_query returns True only for DDL-style responses.
            if result is True:
                LOGGER.info(f"Table {self.table} created")
    
        def write_KPI(self, time, kpi_id, kpi_sample_type, device_id, endpoint_id, service_id, kpi_value):
            """Write one KPI sample to the table through the ILP sender, retrying on failure.

            Args:
                time: Sample time, passed to ``datetime.datetime.fromtimestamp``
                    (presumably a POSIX timestamp in seconds; confirm against callers).
                kpi_id: Value stored in the ``kpi_id`` symbol column.
                kpi_sample_type: Value stored in the ``kpi_sample_type`` symbol column.
                device_id: Value stored in the ``device_id`` symbol column.
                endpoint_id: Value stored in the ``endpoint_id`` symbol column.
                service_id: Value stored in the ``service_id`` symbol column.
                kpi_value: Value stored in the ``kpi_value`` column.

            Raises:
                Exception: When every one of the ``self.retries`` attempts fails. The
                    last underlying error is attached as ``__cause__``.
            """
            # Imported locally because the module never imports ``datetime``; without
            # this every attempt failed with NameError, which the retry loop then
            # reported as "Maximum number of retries achieved".
            import datetime

            attempt = 0
            while attempt < self.retries:
                attempt += 1
                try:
                    with Sender(self.host, self.ilp_port) as sender:
                        sender.row(
                            self.table,
                            symbols={
                                'kpi_id': kpi_id,
                                'kpi_sample_type': kpi_sample_type,
                                'device_id': device_id,
                                'endpoint_id': endpoint_id,
                                'service_id': service_id},
                            columns={
                                'kpi_value': kpi_value},
                            at=datetime.datetime.fromtimestamp(time))
                        sender.flush()
                    LOGGER.debug("KPI written in the MetricsDB")
                    return
                except Exception as e:
                    # Record why the attempt failed instead of silently retrying.
                    LOGGER.debug(f"KPI write attempt {attempt} of {self.retries} failed. {e}")
                    if attempt >= self.retries:
                        raise Exception(f"Maximum number of retries achieved: {self.retries}") from e
    
        def run_query(self, sql_query):
            """Run a SQL statement through the REST ``/exec`` endpoint, retrying on failure.

            Args:
                sql_query: SQL text sent as the ``query`` request parameter.

            Returns:
                ``True`` when the JSON response contains a ``ddl`` key, the value of the
                ``dataset`` key when the response contains one, or ``None`` when
                ``self.retries`` is not positive (no attempt is made).

            Raises:
                Exception: When every one of the ``self.retries`` attempts fails. The
                    last underlying error is attached as ``__cause__``.
            """
            # Request parameters do not change between attempts.
            query_params = {'query': sql_query, 'fmt': 'json'}
            url = f"http://{self.host}:{self.rest_port}/exec"
            attempt = 0
            while attempt < self.retries:
                attempt += 1
                try:
                    response = requests.get(url, params=query_params)
                    json_response = json.loads(response.text)
                    if 'ddl' in json_response:
                        LOGGER.debug(f"REST query executed succesfully, result: {json_response['ddl']}")
                        return True
                    if 'dataset' in json_response:
                        LOGGER.debug(f"REST query executed, result: {json_response['dataset']}")
                        return json_response['dataset']
                    # A response with neither key (e.g. an error payload) used to leave
                    # the counter untouched and loop forever; treat it as a failed attempt.
                    raise ValueError(f"Unexpected REST response: {json_response}")
                except Exception as e:
                    LOGGER.debug(f"REST query attempt {attempt} of {self.retries} failed. {e}")
                    if attempt >= self.retries:
                        raise Exception(f"Maximum number of retries achieved: {self.retries}") from e
    
        def run_query_postgre(self, postgre_sql_query):
            """Run a SQL query over a PostgreSQL connection, retrying on failure.

            A new connection is opened for every attempt and closed again before the
            method returns or retries.

            Args:
                postgre_sql_query: SQL text passed to ``cursor.execute``.

            Returns:
                The rows returned by ``cursor.fetchall()``, or ``None`` when
                ``self.retries`` is not positive (no attempt is made).

            Raises:
                Exception: When every one of the ``self.retries`` attempts fails. The
                    last underlying error is attached as ``__cause__``.
            """
            attempt = 0
            while attempt < self.retries:
                attempt += 1
                # Reset per attempt so the cleanup below never touches handles that
                # belong to a previous, already closed attempt.
                connection = None
                cursor = None
                try:
                    connection = psycopg2.connect(
                        user=self.postgre_user,
                        password=self.postgre_password,
                        host=self.host,
                        port=self.postgre_port,
                        database=self.table)
                    cursor = connection.cursor()
                    cursor.execute(postgre_sql_query)
                    result = cursor.fetchall()
                    LOGGER.debug(f"PostgreSQL query executed, result: {result}")
                    return result
                except Exception as e:
                    # Record why the attempt failed instead of silently retrying.
                    LOGGER.debug(f"PostgreSQL query attempt {attempt} of {self.retries} failed. {e}")
                    if attempt >= self.retries:
                        raise Exception(f"Maximum number of retries achieved: {self.retries}") from e
                finally:
                    if cursor is not None:
                        cursor.close()
                    if connection is not None:
                        connection.close()
    
        def get_subscription_data(self,subs_queue, kpi_id, sampling_interval_s=1):
            """Fetch the latest samples of a KPI and push them to a subscription queue.

            The queried window ends ``commit_lag_ms`` before the current time and spans
            ``sampling_interval_s`` seconds. Any error is logged at debug level and not
            propagated.

            Args:
                subs_queue: Queue-like object; receives the whole result via ``put_nowait``.
                kpi_id: Identifier matched against the ``kpi_id`` column.
                sampling_interval_s: Length of the queried window in seconds.
            """
            try:
                end_date = timestamp_utcnow_to_float() - self.commit_lag_ms / 1000
                start_date = end_date - sampling_interval_s
                query = f"SELECT kpi_id, timestamp, kpi_value FROM {self.table} WHERE kpi_id = '{kpi_id}' AND (timestamp BETWEEN '{timestamp_float_to_string(start_date)}' AND '{timestamp_float_to_string(end_date)}')"
                # Pick the backend according to the configured query mode.
                if not self.postgre:
                    kpi_list = self.run_query(query)
                    LOGGER.debug(f"kpi_list influx: {kpi_list}")
                else:
                    kpi_list = self.run_query_postgre(query)
                    LOGGER.debug(f"kpi_list postgre: {kpi_list}")
                # Nothing to deliver: log and leave.
                if not kpi_list:
                    LOGGER.debug(f"No new data for the subscription to KPI {kpi_id}")
                    return
                subs_queue.put_nowait(kpi_list)
                LOGGER.debug(f"New data received for subscription to KPI {kpi_id}")
            except (Exception) as e:
                LOGGER.debug(f"Subscription data cannot be retrieved. {e}")
    
        def get_alarm_data(self, alarm_queue, kpi_id, kpiMinValue, kpiMaxValue, inRange=True, includeMinValue=True, includeMaxValue=True,
                           subscription_frequency_ms=1000):
            """Fetch the latest samples of a KPI and enqueue those that trigger the alarm.

            The queried window ends ``commit_lag_ms`` before the current time and spans
            ``subscription_frequency_ms`` milliseconds. Each returned row is
            ``(kpi_id, timestamp, kpi_value)``; rows whose value satisfies the alarm
            condition are pushed individually to ``alarm_queue``. Any error is logged at
            debug level and not propagated.

            Args:
                alarm_queue: Queue-like object; receives triggering rows via ``put_nowait``.
                kpi_id: Identifier matched against the ``kpi_id`` column.
                kpiMinValue: Lower bound of the alarm range, or ``None`` for no lower bound.
                kpiMaxValue: Upper bound of the alarm range, or ``None`` for no upper bound.
                inRange: If true the alarm fires for values inside the range, otherwise
                    for values outside it.
                includeMinValue: Whether a value equal to ``kpiMinValue`` fires the alarm.
                includeMaxValue: Whether a value equal to ``kpiMaxValue`` fires the alarm.
                subscription_frequency_ms: Length of the queried window in milliseconds.
            """
            try:
                end_date = timestamp_utcnow_to_float() - self.commit_lag_ms / 1000
                start_date = end_date - subscription_frequency_ms / 1000
                query = f"SELECT kpi_id, timestamp, kpi_value FROM {self.table} WHERE kpi_id = '{kpi_id}' AND (timestamp BETWEEN '{timestamp_float_to_string(start_date)}' AND '{timestamp_float_to_string(end_date)}')"
                if self.postgre:
                    kpi_list = self.run_query_postgre(query)
                else:
                    kpi_list = self.run_query(query)
                # The "no data" message used to hang off the ``for`` loop (for/else), so
                # it was logged after every processed batch and never for an empty one.
                if not kpi_list:
                    LOGGER.debug(f"No new data for the alarm of KPI {kpi_id}")
                    return
                LOGGER.debug(f"New data received for alarm of KPI {kpi_id}")
                for kpi in kpi_list:
                    if self._is_alarm_triggered(kpi[2], kpiMinValue, kpiMaxValue,
                                                inRange, includeMinValue, includeMaxValue):
                        alarm_queue.put_nowait(kpi)
                        LOGGER.debug(f"Alarm of KPI {kpi_id} triggered -> kpi_value:{kpi[2]}, timestamp:{kpi[1]}")
            except (Exception) as e:
                LOGGER.debug(f"Alarm data cannot be retrieved. {e}")

        @staticmethod
        def _is_alarm_triggered(kpi_value, kpiMinValue, kpiMaxValue, inRange, includeMinValue, includeMaxValue):
            """Return True when ``kpi_value`` satisfies the alarm condition.

            In-range mode fires when the value lies between the configured bounds;
            out-of-range mode fires when it lies below the minimum or above the
            maximum. In both modes the ``include*`` flags decide whether a value equal
            to a bound fires the alarm. A bound set to ``None`` is ignored; with both
            bounds ``None`` the alarm does not fire, except for the degenerate
            in-range case checked first.
            """
            # Degenerate single-point range: fires regardless of the include flags.
            if kpiMinValue == kpi_value and kpiMaxValue == kpi_value and inRange:
                return True
            if kpiMinValue is None and kpiMaxValue is None:
                return False
            if inRange:
                above_min = kpiMinValue is None or (
                    kpi_value >= kpiMinValue if includeMinValue else kpi_value > kpiMinValue)
                below_max = kpiMaxValue is None or (
                    kpi_value <= kpiMaxValue if includeMaxValue else kpi_value < kpiMaxValue)
                return above_min and below_max
            # Out-of-range mode. The single-bound cases were duplicated branches that
            # ignored the inclusive variant; they now mirror the two-bound logic.
            under_min = kpiMinValue is not None and (
                kpi_value <= kpiMinValue if includeMinValue else kpi_value < kpiMinValue)
            over_max = kpiMaxValue is not None and (
                kpi_value >= kpiMaxValue if includeMaxValue else kpi_value > kpiMaxValue)
            return under_min or over_max