# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from questdb.ingress import Sender
import requests
import json
import logging
import datetime
from common.tools.timestamp.Converters import timestamp_float_to_string, timestamp_utcnow_to_float
import psycopg2
LOGGER = logging.getLogger(__name__)
class MetricsDB:
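    """Wrapper around a QuestDB instance acting as the Monitoring metrics database.

    KPI samples are ingested through the InfluxDB Line Protocol (ILP) port and
    read back either through the REST API or, when PostgreSQL mode is enabled,
    through QuestDB's PostgreSQL wire protocol.
    """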
def __init__(self, host, ilp_port=9009, rest_port=9000, table="monitoring", commit_lag_ms=1000, retries=10,
postgre=False, postgre_port=8812, postgre_user='admin', postgre_password='quest'):
try:
self.host = host
self.ilp_port = int(ilp_port)
self.rest_port = rest_port
self.table = table
self.commit_lag_ms = commit_lag_ms
self.retries = retries
self.postgre = postgre
self.postgre_port = postgre_port
self.postgre_user = postgre_user
self.postgre_password = postgre_password
self.create_table()
LOGGER.info("MetricsDB initialized")
        except Exception as e:
            LOGGER.error(f"MetricsDB cannot be initialized: {e}")
            raise Exception("Critical error in the monitoring component") from e
def is_postgre_enabled(self):
LOGGER.info(f"PostgreSQL is {self.postgre}")
return self.postgre
def get_retry_number(self):
LOGGER.info(f"Retry number is {self.retries}")
return self.retries
def get_commit_lag(self):
LOGGER.info(f"Commit lag of monitoring queries is {self.commit_lag_ms} ms")
return self.commit_lag_ms
def enable_postgre_mode(self):
self.postgre = True
LOGGER.info("MetricsDB PostgreSQL query mode enabled")
def disable_postgre_mode(self):
self.postgre = False
LOGGER.info("MetricsDB REST query mode enabled")
def set_postgre_credentials(self, user, password):
self.postgre_user = user
self.postgre_password = password
LOGGER.info("MetricsDB PostgreSQL credentials changed")
def set_retry_number(self, retries):
self.retries = retries
LOGGER.info(f"Retriy number changed to {retries}")
def set_commit_lag(self, commit_lag_ms):
self.commit_lag_ms = commit_lag_ms
LOGGER.info(f"Commit lag of monitoring queries changed to {commit_lag_ms} ms")
def create_table(self):
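        """Create the monitoring table in QuestDB if it does not already exist.

        SYMBOL columns store repeated identifiers efficiently, and the trailing
        TIMESTAMP(timestamp) clause marks 'timestamp' as the designated
        timestamp column used for time-ordered storage and queries.
        """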
try:
query = f'CREATE TABLE IF NOT EXISTS {self.table}' \
'(kpi_id SYMBOL,' \
'kpi_sample_type SYMBOL,' \
'device_id SYMBOL,' \
'endpoint_id SYMBOL,' \
'service_id SYMBOL,' \
'slice_id SYMBOL,' \
'connection_id SYMBOL,' \
'timestamp TIMESTAMP,' \
                'kpi_value DOUBLE) ' \
                'TIMESTAMP(timestamp);'
result = self.run_query(query)
            if (result is True):
                LOGGER.info(f"Table {self.table} created")
        except Exception as e:
            LOGGER.error(f"Table {self.table} cannot be created. {e}")
            raise Exception(f"Table {self.table} cannot be created") from e
    def write_KPI(self, timestamp, kpi_id, kpi_sample_type, device_id, endpoint_id, service_id, slice_id, connection_id, kpi_value):
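        """Write a single KPI sample to QuestDB through the ILP ingestion port, retrying up to self.retries times."""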
counter = 0
while (counter < self.retries):
try:
with Sender(self.host, self.ilp_port) as sender:
sender.row(
self.table,
symbols={
'kpi_id': kpi_id,
'kpi_sample_type': kpi_sample_type,
'device_id': device_id,
'endpoint_id': endpoint_id,
'service_id': service_id,
'slice_id': slice_id,
'connection_id': connection_id,},
columns={
'kpi_value': kpi_value},
                        # KPI timestamps are UTC epoch floats, so build a timezone-aware datetime
                        at=datetime.datetime.fromtimestamp(timestamp, tz=datetime.timezone.utc))
sender.flush()
                counter = self.retries
                LOGGER.debug("KPI written to MetricsDB")
            except Exception as e:  # broad catch; also covers questdb IngressError
                counter = counter + 1
                LOGGER.debug(f"Write attempt {counter}/{self.retries} failed: {e}")
                if counter == self.retries:
                    raise Exception(f"Maximum number of retries achieved: {self.retries}") from e
def run_query(self, sql_query):
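        """Run a SQL query against the QuestDB REST /exec endpoint.

        Returns True for DDL statements and the list of result rows for
        SELECT statements; raises after self.retries failed attempts.
        """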
counter = 0
        while (counter < self.retries):
            try:
                query_params = {'query': sql_query, 'fmt': 'json'}
                url = f"http://{self.host}:{self.rest_port}/exec"
                response = requests.get(url, params=query_params)
                json_response = json.loads(response.text)
                if ('ddl' in json_response):
                    LOGGER.debug(f"REST query executed successfully, result: {json_response['ddl']}")
                    return True
                elif ('dataset' in json_response):
                    LOGGER.debug(f"REST query executed, result: {json_response['dataset']}")
                    return json_response['dataset']
                else:
                    # QuestDB reports failures in an 'error' field; without this branch the loop
                    # would never terminate, since counter is only incremented on exceptions.
                    raise Exception(json_response.get('error', 'unexpected REST response'))
            except Exception as e:  # also covers requests.exceptions.RequestException
                counter = counter + 1
                LOGGER.debug(f"Query attempt {counter}/{self.retries} failed: {e}")
                if counter == self.retries:
                    raise Exception(f"Maximum number of retries achieved: {self.retries}") from e
def run_query_postgre(self, postgre_sql_query):
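        """Run a SQL query through QuestDB's PostgreSQL wire protocol and return the fetched rows."""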
connection = None
cursor = None
counter = 0
while (counter < self.retries):
try:
                connection = psycopg2.connect(
                    user=self.postgre_user,
                    password=self.postgre_password,
                    host=self.host,
                    port=self.postgre_port,
                    # QuestDB exposes a single database over PGWire; the name is not used to select a table
                    database=self.table)
cursor = connection.cursor()
cursor.execute(postgre_sql_query)
result = cursor.fetchall()
LOGGER.debug(f"PostgreSQL query executed, result: {result}")
counter = self.retries
return result
            except Exception as e:  # also covers psycopg2.Error
                counter = counter + 1
                LOGGER.debug(f"Query attempt {counter}/{self.retries} failed: {e}")
                if counter == self.retries:
                    raise Exception(f"Maximum number of retries achieved: {self.retries}") from e
finally:
if cursor:
cursor.close()
if connection:
connection.close()
    def get_subscription_data(self, subs_queue, kpi_id, sampling_interval_s=1):
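        """Query the samples of a KPI received during the last sampling interval and push them to the subscription queue.

        The time window ends commit_lag_ms in the past so that only rows
        already committed by QuestDB are considered.
        """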
try:
end_date = timestamp_utcnow_to_float() - self.commit_lag_ms / 1000
start_date = end_date - sampling_interval_s
query = f"SELECT kpi_id, timestamp, kpi_value FROM {self.table} WHERE kpi_id = '{kpi_id}' AND (timestamp BETWEEN '{timestamp_float_to_string(start_date)}' AND '{timestamp_float_to_string(end_date)}')"
if self.postgre:
kpi_list = self.run_query_postgre(query)
LOGGER.debug(f"kpi_list postgre: {kpi_list}")
else:
kpi_list = self.run_query(query)
LOGGER.debug(f"kpi_list influx: {kpi_list}")
if kpi_list:
subs_queue.put_nowait(kpi_list)
LOGGER.debug(f"New data received for subscription to KPI {kpi_id}")
else:
LOGGER.debug(f"No new data for the subscription to KPI {kpi_id}")
        except Exception as e:
            LOGGER.error(f"Subscription data cannot be retrieved. {e}")
def get_alarm_data(self, alarm_queue, kpi_id, kpiMinValue, kpiMaxValue, inRange=True, includeMinValue=True, includeMaxValue=True,
subscription_frequency_ms=1000):
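        """Query the latest samples of a KPI and push those matching the alarm condition to the alarm queue.

        With inRange=True the alarm fires for values inside
        [kpiMinValue, kpiMaxValue]; with inRange=False, for values outside it.
        includeMinValue/includeMaxValue control whether each boundary itself
        triggers the alarm, and a None bound leaves that side unconstrained.
        """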
try:
end_date = timestamp_utcnow_to_float() - self.commit_lag_ms / 1000
start_date = end_date - subscription_frequency_ms / 1000
query = f"SELECT kpi_id, timestamp, kpi_value FROM {self.table} WHERE kpi_id = '{kpi_id}' AND (timestamp BETWEEN '{timestamp_float_to_string(start_date)}' AND '{timestamp_float_to_string(end_date)}')"
if self.postgre:
kpi_list = self.run_query_postgre(query)
else:
kpi_list = self.run_query(query)
if kpi_list:
LOGGER.debug(f"New data received for alarm of KPI {kpi_id}")
LOGGER.info(kpi_list)
valid_kpi_list = []
for kpi in kpi_list:
alarm = False
kpi_value = kpi[2]
                    # Exact-threshold alarm: min and max collapse onto a single value
                    if (kpiMinValue == kpi_value and kpiMaxValue == kpi_value and inRange):
alarm = True
elif (
inRange and kpiMinValue is not None and kpiMaxValue is not None and includeMinValue and includeMaxValue):
if (kpi_value >= kpiMinValue and kpi_value <= kpiMaxValue):
alarm = True
elif (
inRange and kpiMinValue is not None and kpiMaxValue is not None and includeMinValue and not includeMaxValue):
if (kpi_value >= kpiMinValue and kpi_value < kpiMaxValue):
alarm = True
elif (
inRange and kpiMinValue is not None and kpiMaxValue is not None and not includeMinValue and includeMaxValue):
if (kpi_value > kpiMinValue and kpi_value <= kpiMaxValue):
alarm = True
elif (
inRange and kpiMinValue is not None and kpiMaxValue is not None and not includeMinValue and not includeMaxValue):
if (kpi_value > kpiMinValue and kpi_value < kpiMaxValue):
alarm = True
elif (
not inRange and kpiMinValue is not None and kpiMaxValue is not None and includeMinValue and includeMaxValue):
if (kpi_value <= kpiMinValue or kpi_value >= kpiMaxValue):
alarm = True
elif (
not inRange and kpiMinValue is not None and kpiMaxValue is not None and includeMinValue and not includeMaxValue):
if (kpi_value <= kpiMinValue or kpi_value > kpiMaxValue):
alarm = True
elif (
not inRange and kpiMinValue is not None and kpiMaxValue is not None and not includeMinValue and includeMaxValue):
if (kpi_value < kpiMinValue or kpi_value >= kpiMaxValue):
alarm = True
elif (
not inRange and kpiMinValue is not None and kpiMaxValue is not None and not includeMinValue and not includeMaxValue):
if (kpi_value < kpiMinValue or kpi_value > kpiMaxValue):
alarm = True
elif (inRange and kpiMinValue is not None and kpiMaxValue is None and includeMinValue):
if (kpi_value >= kpiMinValue):
alarm = True
elif (inRange and kpiMinValue is not None and kpiMaxValue is None and not includeMinValue):
if (kpi_value > kpiMinValue):
alarm = True
                    elif (not inRange and kpiMinValue is not None and kpiMaxValue is None and includeMinValue):
                        if (kpi_value <= kpiMinValue):
                            alarm = True
                    elif (not inRange and kpiMinValue is not None and kpiMaxValue is None and not includeMinValue):
                        if (kpi_value < kpiMinValue):
                            alarm = True
elif (inRange and kpiMinValue is None and kpiMaxValue is not None and includeMaxValue):
if (kpi_value <= kpiMaxValue):
alarm = True
elif (inRange and kpiMinValue is None and kpiMaxValue is not None and not includeMaxValue):
if (kpi_value < kpiMaxValue):
alarm = True
                    elif (not inRange and kpiMinValue is None and kpiMaxValue is not None and includeMaxValue):
                        if (kpi_value >= kpiMaxValue):
                            alarm = True
                    elif (not inRange and kpiMinValue is None and kpiMaxValue is not None and not includeMaxValue):
                        if (kpi_value > kpiMaxValue):
                            alarm = True
                    if alarm:
                        valid_kpi_list.append(kpi)
                if valid_kpi_list:
                    alarm_queue.put_nowait(valid_kpi_list)
                    LOGGER.debug(f"Alarm of KPI {kpi_id} triggered -> {valid_kpi_list}")
else:
LOGGER.debug(f"No new data for the alarm of KPI {kpi_id}")
        except Exception as e:
            LOGGER.error(f"Alarm data cannot be retrieved. {e}")