From 3d96ecc65e3a1e978c77f4ebd7f36980892387b2 Mon Sep 17 00:00:00 2001
From: sergonzalezdiaz <sergio.gonzalez.diaz@atos.net>
Date: Fri, 7 Oct 2022 13:17:19 +0200
Subject: [PATCH] Last commit

---
 src/monitoring/requirements.in                |   1 +
 src/monitoring/service/AlarmManager.py        |  32 ++
 src/monitoring/service/MetricsDBTools.py      | 319 ++++++++++++++----
 src/monitoring/service/SubscriptionManager.py | 131 +++++++
 src/monitoring/tests/test_unitary.py          |   7 +-
 5 files changed, 418 insertions(+), 72 deletions(-)

diff --git a/src/monitoring/requirements.in b/src/monitoring/requirements.in
index 50f283a19..c07f0c8f4 100644
--- a/src/monitoring/requirements.in
+++ b/src/monitoring/requirements.in
@@ -17,6 +17,7 @@ redis==4.1.2
 requests==2.27.1
 xmltodict==0.12.0
 questdb==1.0.1
+psycopg2-binary==2.9.3

 # pip's dependency resolver does not take into account installed packages.
 # p4runtime does not specify the version of grpcio/protobuf it needs, so it tries to install latest one
diff --git a/src/monitoring/service/AlarmManager.py b/src/monitoring/service/AlarmManager.py
index e69de29bb..dc1b02b03 100644
--- a/src/monitoring/service/AlarmManager.py
+++ b/src/monitoring/service/AlarmManager.py
@@ -0,0 +1,32 @@
+from apscheduler.schedulers.background import BackgroundScheduler
+from apscheduler.executors.pool import ProcessPoolExecutor
+from apscheduler.jobstores.base import JobLookupError
+from datetime import datetime
+import time
+import logging
+
+LOGGER = logging.getLogger(__name__)
+
+class AlarmManager():
+    def __init__(self, metrics_db):
+        self.metrics_db = metrics_db
+        self.scheduler = BackgroundScheduler(executors={'processpool': ProcessPoolExecutor(max_workers=20)})
+        self.scheduler.start()
+        LOGGER.info("Alarm Manager Initialized")
+
+    def create_alarm(self, alarm_id, kpi_id, kpiMinValue, kpiMaxValue, inRange, includeMinValue, includeMaxValue, subscription_frequency_ms, subscription_timeout_s=None):
+        # An optional timeout bounds the job: APScheduler stops firing the interval trigger once end_date is reached.
+        start_date = None
+        end_date = None
+        if subscription_timeout_s:
+            start_timestamp = time.time()
+            start_date = datetime.fromtimestamp(start_timestamp)
+            end_date = datetime.fromtimestamp(start_timestamp + subscription_timeout_s)
+        self.scheduler.add_job(self.metrics_db.get_alarm_data, args=(kpi_id, kpiMinValue, kpiMaxValue, inRange, includeMinValue, includeMaxValue, subscription_frequency_ms), trigger='interval', seconds=(subscription_frequency_ms / 1000), start_date=start_date, end_date=end_date, id=alarm_id)
+        LOGGER.debug(f"Alarm job {alarm_id} successfully created")
+
+    def delete_alarm(self, alarm_id):
+        try:
+            self.scheduler.remove_job(alarm_id)
+            LOGGER.debug(f"Alarm job {alarm_id} successfully deleted")
+        except JobLookupError:
+            LOGGER.debug(f"Alarm job {alarm_id} does not exist")
diff --git a/src/monitoring/service/MetricsDBTools.py b/src/monitoring/service/MetricsDBTools.py
index 522a7c211..00a57e1e7 100644
--- a/src/monitoring/service/MetricsDBTools.py
+++ b/src/monitoring/service/MetricsDBTools.py
@@ -12,83 +12,260 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
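+
+# Thin QuestDB access layer for the Monitoring service: KPIs are ingested
+# over the ILP socket and read back either through the HTTP REST endpoint
+# or through the PostgreSQL wire protocol.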
 from questdb.ingress import Sender, IngressError
 import requests
 import json
 import logging
 import datetime
-from common.tools.timestamp.Converters import timestamp_float_to_string
+from common.tools.timestamp.Converters import timestamp_float_to_string, timestamp_utcnow_to_float
+import psycopg2

 LOGGER = logging.getLogger(__name__)

+
 class MetricsDB():
-    def __init__(self, host, ilp_port, rest_port, table):
-        self.host=host
-        self.ilp_port=int(ilp_port)
-        self.rest_port=rest_port
-        self.table=table
-        self.create_table()
-
-    def write_KPI(self,time,kpi_id,kpi_sample_type,device_id,endpoint_id,service_id,kpi_value):
-        counter=0
-        number_of_retries=10
-        while (counter<number_of_retries):
-            try:
-                with Sender(self.host, self.ilp_port) as sender:
-                    sender.row(
-                        self.table,
-                        symbols={
-                            'kpi_id': kpi_id,
-                            'kpi_sample_type': kpi_sample_type,
-                            'device_id': device_id,
-                            'endpoint_id': endpoint_id,
-                            'service_id': service_id},
-                        columns={
-                            'kpi_value': kpi_value},
-                        at=datetime.datetime.fromtimestamp(time))
-                    sender.flush()
-                    counter=number_of_retries
-                    LOGGER.info(f"KPI written")
-            except IngressError as ierr:
-                # LOGGER.info(ierr)
-                # LOGGER.info(f"Retry number {counter}")
-                counter=counter+1
-
-
-    def run_query(self, sql_query):
-        query_params = {'query': sql_query, 'fmt' : 'json'}
-        url = f"http://{self.host}:{self.rest_port}/exec"
-        response = requests.get(url, params=query_params)
-        json_response = json.loads(response.text)
-        LOGGER.info(f"Query executed, result:{json_response}")
-        return json_response
-
-    def create_table(self):
-        query = f'CREATE TABLE IF NOT EXISTS {self.table}'\
-        '(kpi_id SYMBOL,'\
-        'kpi_sample_type SYMBOL,'\
-        'device_id SYMBOL,'\
-        'endpoint_id SYMBOL,'\
-        'service_id SYMBOL,'\
-        'timestamp TIMESTAMP,'\
-        'kpi_value DOUBLE)'\
-        'TIMESTAMP(timestamp);'
-        self.run_query(query)
-        LOGGER.info(f"Table {self.table} created")
-
-    def get_subscription_data(self, subs_queue, kpi_id, end_date, sampling_interval_s, callback_return):
-        str_end_date = str(end_date.isoformat()) + 'Z'
-        print("str_end_date: " + str_end_date)
-        start_date = end_date - datetime.timedelta(seconds=sampling_interval_s)
-        str_start_date = str(start_date.isoformat()) + 'Z'
-        print("str_start_date: " + str(str_start_date))
-        # query = f"SELECT kpi_id, timestamp, kpi_value FROM {self.table} WHERE kpi_id = '{kpi_id}' AND (timestamp BETWEEN '2022-09-28T07:21:26.595586Z' AND '2022-09-28T07:32:34.197792Z')"
-        query = f"SELECT kpi_id, timestamp, kpi_value FROM {self.table} WHERE kpi_id = '{kpi_id}' AND (timestamp BETWEEN '{str_start_date}' AND '{str_end_date}')"
-        response=self.run_query(query)
-        kpi_list=response['dataset']
-        # print(kpi_list)
-        subs_queue.put_nowait(kpi_list)
-        # return kpi_list
-        if callback_return:
-            callback_return(kpi_list)
-
+    def __init__(self, host, ilp_port=9009, rest_port=9000, table="monitoring", commit_lag_ms=1000, retries=10,
+                 postgre=False, postgre_port=8812, postgre_user='admin', postgre_password='quest'):
+        # Defaults match QuestDB's stock ports: 9009 (ILP), 9000 (REST) and 8812 (pgwire).
+        try:
+            self.host = host
+            self.ilp_port = int(ilp_port)
+            self.rest_port = rest_port
+            self.table = table
+            self.commit_lag_ms = commit_lag_ms
+            self.retries = retries
+            self.postgre = postgre
+            self.postgre_port = postgre_port
+            self.postgre_user = postgre_user
+            self.postgre_password = postgre_password
+            self.create_table()
+            LOGGER.info("MetricsDB initialized")
+        except Exception:
+            LOGGER.error("MetricsDB cannot be initialized")
+            raise Exception("Critical error in the monitoring component")
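+
+    # Queries can be served either through QuestDB's HTTP REST endpoint
+    # (the default) or through its PostgreSQL wire protocol; the helpers
+    # below expose and toggle that choice at runtime.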
LOGGER.info(f"PostgreSQL is {self.postgre}") + return self.postgre + + def get_retry_number(self): + LOGGER.info(f"Retry number is {self.retries}") + return self.retries + + def get_commit_lag(self): + LOGGER.info(f"Commit lag of monitoring queries is {self.commit_lag_ms} ms") + return self.commit_lag_ms + + def enable_postgre_mode(self): + self.postgre = True + LOGGER.info("MetricsDB PostgreSQL query mode enabled") + + def disable_postgre_mode(self): + self.postgre = False + LOGGER.info("MetricsDB REST query mode enabled") + + def set_postgre_credentials(self, user, password): + self.postgre_user = user + self.postgre_password = password + LOGGER.info("MetricsDB PostgreSQL credentials changed") + + def set_retry_number(self, retries): + self.retries = retries + LOGGER.info(f"Retriy number changed to {retries}") + + def set_commit_lag(self, commit_lag_ms): + self.commit_lag_ms = commit_lag_ms + LOGGER.info(f"Commit lag of monitoring queries changed to {commit_lag_ms} ms") + + def create_table(self): + try: + query = f'CREATE TABLE IF NOT EXISTS {self.table}' \ + '(kpi_id SYMBOL,' \ + 'kpi_sample_type SYMBOL,' \ + 'device_id SYMBOL,' \ + 'endpoint_id SYMBOL,' \ + 'service_id SYMBOL,' \ + 'timestamp TIMESTAMP,' \ + 'kpi_value DOUBLE)' \ + 'TIMESTAMP(timestamp);' + result = self.run_query(query) + if (result == True): + LOGGER.info(f"Table {self.table} created") + except (Exception) as e: + LOGGER.debug(f"Table {self.table} cannot be created. {e}") + raise Exception + + def write_KPI(self, time, kpi_id, kpi_sample_type, device_id, endpoint_id, service_id, kpi_value): + counter = 0 + while (counter < self.retries): + try: + with Sender(self.host, self.ilp_port) as sender: + sender.row( + self.table, + symbols={ + 'kpi_id': kpi_id, + 'kpi_sample_type': kpi_sample_type, + 'device_id': device_id, + 'endpoint_id': endpoint_id, + 'service_id': service_id}, + columns={ + 'kpi_value': kpi_value}, + at=datetime.datetime.fromtimestamp(time)) + sender.flush() + counter = self.retries + LOGGER.debug(f"KPI written in the MetricsDB") + except (Exception, IngressError) as e: + counter = counter + 1 + if counter == self.retries: + raise Exception(f"Maximum number of retries achieved: {self.retries}") + + def run_query(self, sql_query): + counter = 0 + while (counter < self.retries): + try: + query_params = {'query': sql_query, 'fmt': 'json'} + url = f"http://{self.host}:{self.rest_port}/exec" + response = requests.get(url, params=query_params) + json_response = json.loads(response.text) + if ('ddl' in json_response): + LOGGER.debug(f"REST query executed succesfully, result: {json_response['ddl']}") + counter = self.retries + return True + elif ('dataset' in json_response): + LOGGER.debug(f"REST query executed, result: {json_response['dataset']}") + counter = self.retries + return json_response['dataset'] + except (Exception, requests.exceptions.RequestException) as e: + counter = counter + 1 + if counter == self.retries: + raise Exception(f"Maximum number of retries achieved: {self.retries}") + + def run_query_postgre(self, postgre_sql_query): + connection = None + cursor = None + counter = 0 + while (counter < self.retries): + try: + connection = psycopg2.connect( + user=self.postgre_user, + password=self.postgre_password, + host=self.host, + port=self.postgre_port, + database=self.table) + cursor = connection.cursor() + cursor.execute(postgre_sql_query) + result = cursor.fetchall() + LOGGER.debug(f"PostgreSQL query executed, result: {result}") + counter = self.retries + return result + except (Exception, 
+    def run_query_postgre(self, postgre_sql_query):
+        connection = None
+        cursor = None
+        counter = 0
+        while counter < self.retries:
+            try:
+                connection = psycopg2.connect(
+                    user=self.postgre_user,
+                    password=self.postgre_password,
+                    host=self.host,
+                    port=self.postgre_port,
+                    database=self.table)
+                cursor = connection.cursor()
+                cursor.execute(postgre_sql_query)
+                result = cursor.fetchall()
+                LOGGER.debug(f"PostgreSQL query executed, result: {result}")
+                return result
+            except Exception as e:
+                counter += 1
+                if counter == self.retries:
+                    raise Exception(f"Maximum number of retries achieved: {self.retries}") from e
+            finally:
+                if cursor:
+                    cursor.close()
+                if connection:
+                    connection.close()
+
+    def get_subscription_data(self, subs_queue, kpi_id, sampling_interval_s=1):
+        try:
+            end_date = timestamp_utcnow_to_float() - self.commit_lag_ms / 1000
+            start_date = end_date - sampling_interval_s
+            query = f"SELECT kpi_id, timestamp, kpi_value FROM {self.table} WHERE kpi_id = '{kpi_id}' AND (timestamp BETWEEN '{timestamp_float_to_string(start_date)}' AND '{timestamp_float_to_string(end_date)}')"
+            if self.postgre:
+                kpi_list = self.run_query_postgre(query)
+                LOGGER.debug(f"kpi_list postgre: {kpi_list}")
+            else:
+                kpi_list = self.run_query(query)
+                LOGGER.debug(f"kpi_list rest: {kpi_list}")
+            if kpi_list:
+                subs_queue.put_nowait(kpi_list)
+                LOGGER.debug(f"New data received for subscription to KPI {kpi_id}")
+            else:
+                LOGGER.debug(f"No new data for the subscription to KPI {kpi_id}")
+        except Exception as e:
+            LOGGER.debug(f"Subscription data cannot be retrieved. {e}")
+
+    def get_alarm_data(self, kpi_id, kpiMinValue, kpiMaxValue, inRange=True, includeMinValue=True, includeMaxValue=True,
+                       subscription_frequency_ms=1000):
+        try:
+            end_date = timestamp_utcnow_to_float() - self.commit_lag_ms / 1000
+            start_date = end_date - subscription_frequency_ms / 1000
+            query = f"SELECT kpi_id, timestamp, kpi_value FROM {self.table} WHERE kpi_id = '{kpi_id}' AND (timestamp BETWEEN '{timestamp_float_to_string(start_date)}' AND '{timestamp_float_to_string(end_date)}')"
+            if self.postgre:
+                kpi_list = self.run_query_postgre(query)
+            else:
+                kpi_list = self.run_query(query)
+            if kpi_list:
+                LOGGER.debug(f"New data received for alarm of KPI {kpi_id}")
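+                # Threshold semantics implemented by the branches below:
+                #   inRange=True  -> alarm when kpi_value lies INSIDE [kpiMinValue, kpiMaxValue]
+                #   inRange=False -> alarm when kpi_value lies OUTSIDE that range
+                #   includeMinValue / includeMaxValue -> the bound itself also triggers
+                #   a None bound degenerates into a one-sided comparison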
+                for kpi in kpi_list:
+                    alarm = False
+                    kpi_value = kpi[2]
+                    if (kpiMinValue == kpi_value and kpiMaxValue == kpi_value and inRange):
+                        alarm = True
+                    elif (inRange and kpiMinValue is not None and kpiMaxValue is not None and includeMinValue and includeMaxValue):
+                        if (kpi_value >= kpiMinValue and kpi_value <= kpiMaxValue):
+                            alarm = True
+                    elif (inRange and kpiMinValue is not None and kpiMaxValue is not None and includeMinValue and not includeMaxValue):
+                        if (kpi_value >= kpiMinValue and kpi_value < kpiMaxValue):
+                            alarm = True
+                    elif (inRange and kpiMinValue is not None and kpiMaxValue is not None and not includeMinValue and includeMaxValue):
+                        if (kpi_value > kpiMinValue and kpi_value <= kpiMaxValue):
+                            alarm = True
+                    elif (inRange and kpiMinValue is not None and kpiMaxValue is not None and not includeMinValue and not includeMaxValue):
+                        if (kpi_value > kpiMinValue and kpi_value < kpiMaxValue):
+                            alarm = True
+                    elif (not inRange and kpiMinValue is not None and kpiMaxValue is not None and includeMinValue and includeMaxValue):
+                        if (kpi_value <= kpiMinValue or kpi_value >= kpiMaxValue):
+                            alarm = True
+                    elif (not inRange and kpiMinValue is not None and kpiMaxValue is not None and includeMinValue and not includeMaxValue):
+                        if (kpi_value <= kpiMinValue or kpi_value > kpiMaxValue):
+                            alarm = True
+                    elif (not inRange and kpiMinValue is not None and kpiMaxValue is not None and not includeMinValue and includeMaxValue):
+                        if (kpi_value < kpiMinValue or kpi_value >= kpiMaxValue):
+                            alarm = True
+                    elif (not inRange and kpiMinValue is not None and kpiMaxValue is not None and not includeMinValue and not includeMaxValue):
+                        if (kpi_value < kpiMinValue or kpi_value > kpiMaxValue):
+                            alarm = True
+                    elif (inRange and kpiMinValue is not None and kpiMaxValue is None and includeMinValue):
+                        if (kpi_value >= kpiMinValue):
+                            alarm = True
+                    elif (inRange and kpiMinValue is not None and kpiMaxValue is None and not includeMinValue):
+                        if (kpi_value > kpiMinValue):
+                            alarm = True
+                    elif (not inRange and kpiMinValue is not None and kpiMaxValue is None and includeMinValue):
+                        if (kpi_value <= kpiMinValue):
+                            alarm = True
+                    elif (not inRange and kpiMinValue is not None and kpiMaxValue is None and not includeMinValue):
+                        if (kpi_value < kpiMinValue):
+                            alarm = True
+                    elif (inRange and kpiMinValue is None and kpiMaxValue is not None and includeMaxValue):
+                        if (kpi_value <= kpiMaxValue):
+                            alarm = True
+                    elif (inRange and kpiMinValue is None and kpiMaxValue is not None and not includeMaxValue):
+                        if (kpi_value < kpiMaxValue):
+                            alarm = True
+                    elif (not inRange and kpiMinValue is None and kpiMaxValue is not None and includeMaxValue):
+                        if (kpi_value >= kpiMaxValue):
+                            alarm = True
+                    elif (not inRange and kpiMinValue is None and kpiMaxValue is not None and not includeMaxValue):
+                        if (kpi_value > kpiMaxValue):
+                            alarm = True
+                    if alarm:
+                        # TODO: push the triggered KPI to the alarm response queue
+                        LOGGER.debug(f"Alarm of KPI {kpi_id} triggered -> kpi_value:{kpi[2]}, timestamp:{kpi[1]}")
+            else:
+                LOGGER.debug(f"No new data for the alarm of KPI {kpi_id}")
+        except Exception as e:
+            LOGGER.debug(f"Alarm data cannot be retrieved. {e}")
\ No newline at end of file
diff --git a/src/monitoring/service/SubscriptionManager.py b/src/monitoring/service/SubscriptionManager.py
index e133cd914..2a5b05cfe 100644
--- a/src/monitoring/service/SubscriptionManager.py
+++ b/src/monitoring/service/SubscriptionManager.py
@@ -28,8 +28,139 @@ class SubscriptionManager():
         start_date=None
         end_date=None
         print("Inside create subscription")
+# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+from queue import Queue
+from random import random
+
+import pytz
+from apscheduler.schedulers.background import BackgroundScheduler
+from apscheduler.executors.pool import ProcessPoolExecutor
+
+from common.proto.monitoring_pb2 import Kpi, KpiList
+from common.tools.timestamp import Converters
+from common.tools.timestamp.Converters import timestamp_utcnow_to_float, timestamp_float_to_string
+from datetime import datetime
+import time
+
+from monitoring.service.ManagementDBTools import ManagementDB
+from monitoring.service.MetricsDBTools import MetricsDB
+
+LOGGER = logging.getLogger(__name__)
+
+class SubscriptionManager():
+    def __init__(self, metrics_db):
+        self.metrics_db = metrics_db
+        self.scheduler = BackgroundScheduler(executors={'processpool': ProcessPoolExecutor(max_workers=20)})
+        self.scheduler.start()
+
+    def create_subscription(self, subs_queue, subscription_id, kpi_id, sampling_interval_s, sampling_duration_s=None, start_timestamp=None, end_timestamp=None):
+        start_date = None
+        end_date = None
         if sampling_duration_s:
             if not start_timestamp:
+                start_timestamp = time.time()
+            end_timestamp = start_timestamp + sampling_duration_s
+        if start_timestamp:
+            start_date = datetime.utcfromtimestamp(start_timestamp).isoformat()
+        if end_timestamp:
+            end_date = datetime.utcfromtimestamp(end_timestamp).isoformat()
+
+        LOGGER.debug(f"kpi_id: {kpi_id}")
+        LOGGER.debug(f"sampling_interval_s: {sampling_interval_s}")
+        LOGGER.debug(f"subscription_id: {subscription_id}")
+        LOGGER.debug(f"start_date: {start_date}")
+        self.scheduler.add_job(self.metrics_db.get_subscription_data, args=(subs_queue, kpi_id, sampling_interval_s),
+                               trigger='interval', seconds=sampling_interval_s, start_date=start_date,
+                               end_date=end_date, timezone=pytz.utc, id=str(subscription_id))
+        LOGGER.debug(f"Subscription job {subscription_id} successfully created")
+
+    def delete_subscription(self, subscription_id):
+        self.scheduler.remove_job(subscription_id)
+
+
+# Ad-hoc helpers below exercise the subscription pipeline when this module is run directly.
+def ingest_data():
+
+    metrics_db = MetricsDB("localhost", "9009", "9000", "monitoring")
+
+    kpi_id = "2"
+
+    for i in range(200):
+        timestamp = timestamp_utcnow_to_float()
+        kpitype = "KPISAMPLETYPE_PACKETS_TRANSMITTED"
+        deviceId = "DEV01"
+        endpointId = "END01"
+        serviceId = "SERV01"
+        kpi_value = 50 * random()
+        metrics_db.write_KPI(timestamp, kpi_id, kpitype, deviceId, endpointId, serviceId, kpi_value)
+        time.sleep(0.01)
+
+def scheduler_test():
+    scheduler = BackgroundScheduler()
+
+    subs_queue = Queue()
+
+    managementdb = ManagementDB("monitoring.db")
+    metrics_db = MetricsDB("localhost", "9009", "9000", "monitoring")
+    subs_manager = SubscriptionManager(metrics_db)
+
+    scheduler.start()
+    scheduler.add_job(ingest_data)
+
+    kpi_id = "2"
+    sampling_duration_s = 20
+    sampling_interval_s = 3
+    real_start_time = timestamp_utcnow_to_float()
+    start_timestamp = real_start_time
+    end_timestamp = start_timestamp + sampling_duration_s
+
+    subs_id = managementdb.insert_subscription(kpi_id, "localhost", sampling_duration_s,
+                                               sampling_interval_s, start_timestamp, end_timestamp)
+    subs_manager.create_subscription(subs_queue, subs_id, kpi_id, sampling_interval_s,
+                                     sampling_duration_s, start_timestamp, end_timestamp)
+
+    try:
+        # This is here to simulate application activity (which keeps the main thread alive).
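+        # Drain the subscription queue and repackage each raw row
+        # (kpi_id, timestamp, kpi_value) into Kpi protos until the
+        # sampling window has elapsed.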
+        i = 0
+        while True:
+            # time.sleep(sampling_interval_s)
+            while not subs_queue.empty():
+                kpi_rows = subs_queue.get_nowait()
+                # print(f"Rows: {kpi_rows}")
+                kpi_list = KpiList()
+                for item in kpi_rows:
+                    kpi = Kpi()
+                    kpi.kpi_id.kpi_id.uuid = item[0]
+                    kpi.timestamp.timestamp = Converters.timestamp_string_to_float(item[1])
+                    kpi.kpi_value.floatVal = item[2]
+                    kpi_list.kpi.append(kpi)
+                    i += 1
+                print("Kpi List: " + str(kpi_list) + " " + str(i))
+            if timestamp_utcnow_to_float() > end_timestamp:
+                print(f"Total metrics: {i}")
+                break
+
+    except (KeyboardInterrupt, SystemExit):
+        # Not strictly necessary if daemonic mode is enabled but should be done if possible
+        scheduler.shutdown()
+
+
+if __name__ == '__main__':
+    scheduler_test()
         start_timestamp=time.time()
         end_timestamp=start_timestamp+sampling_duration_s
         print("end_timestamp: " + timestamp_float_to_string(end_timestamp))
diff --git a/src/monitoring/tests/test_unitary.py b/src/monitoring/tests/test_unitary.py
index 529fd4335..c0bbae534 100644
--- a/src/monitoring/tests/test_unitary.py
+++ b/src/monitoring/tests/test_unitary.py
@@ -28,7 +28,7 @@ from common.message_broker.Factory import get_messagebroker_backend, BackendEnum
 from common.message_broker.MessageBroker import MessageBroker
 from common.proto import monitoring_pb2
 from common.proto.monitoring_pb2 import KpiId, KpiDescriptor, KpiList, SubsDescriptor, SubsList, AlarmID, \
-    AlarmDescriptor, AlarmList, Kpi, KpiDescriptorList, SubsResponse
+    AlarmDescriptor, AlarmList, Kpi, KpiDescriptorList, SubsResponse, AlarmResponse

 from context.client.ContextClient import ContextClient
 from context.service.grpc_server.ContextService import ContextService
@@ -268,6 +268,9 @@ def test_set_kpi_subscription(monitoring_client): # pylint: disable=redefined-ou
     response = monitoring_client.SetKpiSubscription(subs_descriptor(kpi_id))
     LOGGER.debug(type(response))
     assert isinstance(response, _MultiThreadedRendezvous)
+    # for item in response:
+    #     assert isinstance(item, SubsResponse)
+

 # Test case that makes use of client fixture to test server's GetSubsDescriptor method
 def test_get_subs_descriptor(monitoring_client):
@@ -318,6 +321,8 @@ def test_get_alarm_response_stream(monitoring_client):
     response = monitoring_client.GetAlarmResponseStream(alarm_descriptor())
     LOGGER.debug(type(response))
     assert isinstance(response, _MultiThreadedRendezvous)
+    # for item in response:
+    #     assert isinstance(item, AlarmResponse)

 # Test case that makes use of client fixture to test server's DeleteAlarm method
--
GitLab