Skip to content
Snippets Groups Projects
Commit e3b44b47 authored by Francisco-Javier Moreno-Muro's avatar Francisco-Javier Moreno-Muro
Browse files

Initial commit of the new branch feat/monitoring

parent 07965010
No related branches found
No related tags found
2 merge requests!54Release 2.0.0,!14Feat/monitoring major functional updates
......@@ -19,24 +19,24 @@ import "context.proto";
import "kpi_sample_types.proto";
service MonitoringService {
rpc SetKpi (KpiDescriptor ) returns (KpiId ) {}
rpc DeleteKpi (KpiId ) returns (context.Empty ) {}
rpc GetKpiDescriptor (KpiId ) returns (KpiDescriptor ) {}
rpc GetKpiDescriptorList (context.Empty ) returns (KpiDescriptorList ) {}
rpc IncludeKpi (Kpi ) returns (context.Empty ) {}
rpc MonitorKpi (MonitorKpiRequest ) returns (context.Empty ) {}
rpc QueryKpiData (KpiQuery ) returns (KpiList ) {}
rpc SetKpiSubscription (SubsDescriptor ) returns (stream KpiList ) {}
rpc GetSubsDescriptor (SubscriptionID ) returns (SubsDescriptor ) {}
rpc GetSubscriptions (context.Empty ) returns (SubsIDList ) {}
rpc DeleteSubscription (SubscriptionID ) returns (context.Empty ) {}
rpc SetKpiAlarm (AlarmDescriptor ) returns (AlarmID ) {}
rpc GetAlarms (context.Empty ) returns (AlarmIDList ) {}
rpc GetAlarmDescriptor (AlarmID ) returns (AlarmDescriptor ) {}
rpc GetAlarmResponseStream(AlarmSubscription ) returns (stream AlarmResponse) {}
rpc DeleteAlarm (AlarmID ) returns (context.Empty ) {}
rpc GetStreamKpi (KpiId ) returns (stream Kpi ) {}
rpc GetInstantKpi (KpiId ) returns (KpiList ) {}
rpc SetKpi (KpiDescriptor ) returns (KpiId ) {} // Stable not final
rpc DeleteKpi (KpiId ) returns (context.Empty ) {} // Stable and final
rpc GetKpiDescriptor (KpiId ) returns (KpiDescriptor ) {} // Stable and final
rpc GetKpiDescriptorList (context.Empty ) returns (KpiDescriptorList ) {} // Stable and final
rpc IncludeKpi (Kpi ) returns (context.Empty ) {} // Stable and final
rpc MonitorKpi (MonitorKpiRequest ) returns (context.Empty ) {} // Stable and final
rpc QueryKpiData (KpiQuery ) returns (KpiList ) {} // Not implemented
rpc SetKpiSubscription (SubsDescriptor ) returns (stream SubsResponse ) {} // Stable not final
rpc GetSubsDescriptor (SubscriptionID ) returns (SubsDescriptor ) {} // Stable and final
rpc GetSubscriptions (context.Empty ) returns (SubsList ) {} // Stable and final
rpc DeleteSubscription (SubscriptionID ) returns (context.Empty ) {} // Stable and final
rpc SetKpiAlarm (AlarmDescriptor ) returns (AlarmID ) {} // Stable not final
rpc GetAlarms (context.Empty ) returns (AlarmList ) {} // Stable and final
rpc GetAlarmDescriptor (AlarmID ) returns (AlarmDescriptor ) {} // Stable and final
rpc GetAlarmResponseStream(AlarmSubscription ) returns (stream AlarmResponse) {} // Not Stable not final
rpc DeleteAlarm (AlarmID ) returns (context.Empty ) {} // Stable and final
rpc GetStreamKpi (KpiId ) returns (stream Kpi ) {} // Stable not final
rpc GetInstantKpi (KpiId ) returns (Kpi ) {} // Stable not final
}
message KpiDescriptor {
......@@ -58,7 +58,7 @@ message MonitorKpiRequest {
}
message KpiQuery {
repeated KpiId kpi_id = 1;
KpiId kpi_id = 1;
float monitoring_window_s = 2;
float sampling_rate_s = 3;
uint32 last_n_samples = 4; // used when you want something like "get the last N many samples
......@@ -99,7 +99,7 @@ message KpiValue {
message KpiList {
repeated Kpi kpi_list = 1;
repeated Kpi kpi = 1;
}
message KpiDescriptorList {
......@@ -122,19 +122,19 @@ message SubscriptionID {
message SubsResponse {
SubscriptionID subs_id = 1;
repeated KpiList kpi_list = 2;
KpiList kpi_list = 2;
}
message SubsIDList {
repeated SubscriptionID subs_list = 1;
message SubsList {
repeated SubsDescriptor subs_descriptor = 1;
}
message AlarmDescriptor {
AlarmID alarm_id = 1;
string alarm_description = 2;
string name = 3;
repeated KpiId kpi_id = 4;
repeated KpiValueRange kpi_value_range = 5;
KpiId kpi_id = 4;
KpiValueRange kpi_value_range = 5;
context.Timestamp timestamp = 6;
}
......@@ -143,7 +143,7 @@ message AlarmID{
}
message AlarmSubscription{
AlarmID alarmID = 1;
AlarmID alarm_id = 1;
float subscription_timeout_s = 2;
float subscription_frequency_ms = 3;
}
......@@ -151,10 +151,9 @@ message AlarmSubscription{
message AlarmResponse {
AlarmID alarm_id = 1;
string text = 2;
KpiValue kpi_value = 3;
context.Timestamp timestamp = 4;
KpiList kpi_list = 3;
}
message AlarmIDList {
repeated AlarmID alarm_list = 1;
message AlarmList {
repeated AlarmDescriptor alarm_descriptor = 1;
}
......@@ -21,8 +21,8 @@ from common.tools.client.RetryDecorator import retry, delay_exponential
from common.tools.grpc.Tools import grpc_message_to_json_string
from common.proto.context_pb2 import Empty
from common.proto.monitoring_pb2 import Kpi, KpiDescriptor, KpiId, MonitorKpiRequest, \
KpiDescriptorList, KpiQuery, KpiList, SubsDescriptor, SubscriptionID, SubsIDList, \
AlarmDescriptor, AlarmID, AlarmIDList, AlarmResponse, AlarmSubscription
KpiDescriptorList, KpiQuery, KpiList, SubsDescriptor, SubscriptionID, SubsList, \
SubsResponse, AlarmDescriptor, AlarmID, AlarmList, AlarmResponse, AlarmSubscription
from common.proto.monitoring_pb2_grpc import MonitoringServiceStub
LOGGER = logging.getLogger(__name__)
......@@ -100,10 +100,10 @@ class MonitoringClient:
return response
@RETRY_DECORATOR
def SubscribeKpi(self, request : SubsDescriptor) -> Iterator[KpiList]:
LOGGER.debug('SubscribeKpi: {:s}'.format(grpc_message_to_json_string(request)))
response = self.stub.SubscribeKpi(request)
LOGGER.debug('SubscribeKpi result: {:s}'.format(grpc_message_to_json_string(response)))
def SetKpiSubscription(self, request : SubsDescriptor) -> Iterator[SubsResponse]:
LOGGER.debug('SetKpiSubscription: {:s}'.format(grpc_message_to_json_string(request)))
response = self.stub.SetKpiSubscription(request)
LOGGER.debug('SetKpiSubscription result: {:s}'.format(grpc_message_to_json_string(response)))
return response
@RETRY_DECORATOR
......@@ -114,7 +114,7 @@ class MonitoringClient:
return response
@RETRY_DECORATOR
def GetSubscriptions(self, request : Empty) -> SubsIDList:
def GetSubscriptions(self, request : Empty) -> SubsList:
LOGGER.debug('GetSubscriptions: {:s}'.format(grpc_message_to_json_string(request)))
response = self.stub.GetSubscriptions(request)
LOGGER.debug('GetSubscriptions result: {:s}'.format(grpc_message_to_json_string(response)))
......@@ -135,7 +135,7 @@ class MonitoringClient:
return response
@RETRY_DECORATOR
def GetAlarms(self, request : Empty) -> AlarmIDList:
def GetAlarms(self, request : Empty) -> AlarmList:
LOGGER.debug('GetAlarms: {:s}'.format(grpc_message_to_json_string(request)))
response = self.stub.GetAlarms(request)
LOGGER.debug('GetAlarms result: {:s}'.format(grpc_message_to_json_string(response)))
......
......@@ -17,6 +17,7 @@ redis==4.1.2
requests==2.27.1
xmltodict==0.12.0
questdb==1.0.1
psycopg2-binary==2.9.3
# pip's dependency resolver does not take into account installed packages.
# p4runtime does not specify the version of grpcio/protobuf it needs, so it tries to install latest one
......
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ProcessPoolExecutor
from apscheduler.jobstores.base import JobLookupError
from datetime import datetime
import time
import logging
LOGGER = logging.getLogger(__name__)
class AlarmManager():
    """Schedules recurring alarm-evaluation jobs against the metrics database."""

    def __init__(self, metrics_db):
        # metrics_db must expose get_alarm_data(); it is polled by each job.
        self.metrics_db = metrics_db
        executors = {'processpool': ProcessPoolExecutor(max_workers=20)}
        self.scheduler = BackgroundScheduler(executors=executors)
        self.scheduler.start()
        LOGGER.info("Alarm Manager Initialized")

    def create_alarm(self, alarm_queue, alarm_id, kpi_id, kpiMinValue, kpiMaxValue, inRange, includeMinValue, includeMaxValue, subscription_frequency_ms, subscription_timeout_s=None):
        """Register an interval job that polls alarm data for one KPI.

        If subscription_timeout_s is given, the job only runs inside the
        [now, now + timeout] window; otherwise it runs until deleted.
        """
        window_start = None
        window_end = None
        if subscription_timeout_s:
            now = time.time()
            window_start = datetime.fromtimestamp(now)
            window_end = datetime.fromtimestamp(now + subscription_timeout_s)
        job_args = (alarm_queue, kpi_id, kpiMinValue, kpiMaxValue, inRange,
                    includeMinValue, includeMaxValue, subscription_frequency_ms)
        self.scheduler.add_job(
            self.metrics_db.get_alarm_data, args=job_args, trigger='interval',
            seconds=(subscription_frequency_ms/1000), start_date=window_start,
            end_date=window_end, id=alarm_id)
        LOGGER.debug(f"Alarm job {alarm_id} succesfully created")

    def delete_alarm(self, alarm_id):
        """Remove the scheduled job of an alarm; missing jobs are only logged."""
        try:
            self.scheduler.remove_job(alarm_id)
            LOGGER.debug(f"Alarm job {alarm_id} succesfully deleted")
        except (Exception, JobLookupError) as e:
            # Deliberate best-effort: deleting an unknown alarm is not an error.
            LOGGER.debug(f"Alarm job {alarm_id} does not exists")
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sqlite3
import logging
LOGGER = logging.getLogger(__name__)
class ManagementDB():
    """SQLite-backed registry for KPI descriptors, subscriptions and alarms.

    Query methods swallow sqlite3 errors, log them at DEBUG level and return
    None; callers must tolerate a missing return value.
    """

    def __init__(self, database):
        """Open (or create) the SQLite database and ensure all tables exist.

        Args:
            database: path of the SQLite file, or ':memory:'.

        Raises:
            Exception: if the database or any table cannot be created.
        """
        try:
            # check_same_thread=False: the handle is shared across worker threads.
            self.client = sqlite3.connect(database, check_same_thread=False)
            self.create_monitoring_table()
            self.create_subscription_table()
            self.create_alarm_table()
            LOGGER.info("ManagementDB initialized")
        except Exception as e:
            # Was a bare 'except:' which hid the root cause of the failure.
            LOGGER.info(f"ManagementDB cannot be initialized: {e}")
            raise Exception("Critical error in the monitoring component")

    def create_monitoring_table(self):
        """Create the 'kpi' table if missing; raise on SQL errors."""
        try:
            self.client.execute("""
                CREATE TABLE IF NOT EXISTS kpi(
                    kpi_id INTEGER PRIMARY KEY AUTOINCREMENT,
                    kpi_description TEXT,
                    kpi_sample_type INTEGER,
                    device_id INTEGER,
                    endpoint_id INTEGER,
                    service_id INTEGER
                );
            """)
            LOGGER.debug("KPI table created in the ManagementDB")
        except sqlite3.Error as e:
            # Fixed 'ManagementD' typo in the log message.
            LOGGER.debug(f"KPI table cannot be created in the ManagementDB. {e}")
            raise Exception

    def create_subscription_table(self):
        """Create the 'subscription' table if missing; raise on SQL errors."""
        try:
            self.client.execute("""
                CREATE TABLE IF NOT EXISTS subscription(
                    subs_id INTEGER PRIMARY KEY AUTOINCREMENT,
                    kpi_id INTEGER,
                    subscriber TEXT,
                    sampling_duration_s REAL,
                    sampling_interval_s REAL,
                    start_timestamp REAL,
                    end_timestamp REAL
                );
            """)
            LOGGER.info("Subscription table created in the ManagementDB")
        except sqlite3.Error as e:
            LOGGER.debug(f"Subscription table cannot be created in the ManagementDB. {e}")
            raise Exception

    def create_alarm_table(self):
        """Create the 'alarm' table if missing; raise on SQL errors."""
        try:
            self.client.execute("""
                CREATE TABLE IF NOT EXISTS alarm(
                    alarm_id INTEGER PRIMARY KEY AUTOINCREMENT,
                    alarm_description TEXT,
                    alarm_name TEXT,
                    kpi_id INTEGER,
                    kpi_min_value REAL,
                    kpi_max_value REAL,
                    in_range INTEGER,
                    include_min_value INTEGER,
                    include_max_value INTEGER
                );
            """)
            LOGGER.info("Alarm table created in the ManagementDB")
        except sqlite3.Error as e:
            LOGGER.debug(f"Alarm table cannot be created in the ManagementDB. {e}")
            raise Exception

    def insert_KPI(self, kpi_description, kpi_sample_type, device_id, endpoint_id, service_id):
        """Insert a KPI, deduplicated on (device, sample type, endpoint, service).

        Returns:
            kpi_id of the new or already-existing row, or None on error.
        """
        try:
            c = self.client.cursor()
            # 'IS' instead of '=' so NULL column values also match on dedup.
            c.execute("SELECT kpi_id FROM kpi WHERE device_id is ? AND kpi_sample_type is ? AND endpoint_id is ? AND service_id is ?",
                      (device_id, kpi_sample_type, endpoint_id, service_id))
            data = c.fetchone()
            if data is None:
                c.execute("INSERT INTO kpi (kpi_description,kpi_sample_type,device_id,endpoint_id,service_id) VALUES (?,?,?,?,?)",
                          (kpi_description, kpi_sample_type, device_id, endpoint_id, service_id))
                self.client.commit()
                kpi_id = c.lastrowid
                LOGGER.debug(f"KPI {kpi_id} succesfully inserted in the ManagementDB")
                return kpi_id
            else:
                kpi_id = data[0]
                LOGGER.debug(f"KPI {kpi_id} already exists")
                return kpi_id
        except sqlite3.Error as e:
            # Fixed: message was missing the f-prefix, so '{e}' was logged literally.
            LOGGER.debug(f"KPI cannot be inserted in the ManagementDB: {e}")

    def insert_subscription(self, kpi_id, subscriber, sampling_duration_s, sampling_interval_s, start_timestamp, end_timestamp):
        """Insert a subscription (deduplicated on all columns).

        Returns:
            subs_id of the new or already-existing row, or None on error.
        """
        try:
            c = self.client.cursor()
            c.execute("SELECT subs_id FROM subscription WHERE kpi_id is ? AND subscriber is ? AND sampling_duration_s is ? AND sampling_interval_s is ? AND start_timestamp is ? AND end_timestamp is ?",
                      (kpi_id, subscriber, sampling_duration_s, sampling_interval_s, start_timestamp, end_timestamp))
            data = c.fetchone()
            if data is None:
                c.execute("INSERT INTO subscription (kpi_id,subscriber,sampling_duration_s,sampling_interval_s,start_timestamp, end_timestamp) VALUES (?,?,?,?,?,?)",
                          (kpi_id, subscriber, sampling_duration_s, sampling_interval_s, start_timestamp, end_timestamp))
                self.client.commit()
                subs_id = c.lastrowid
                LOGGER.debug(f"Subscription {subs_id} succesfully inserted in the ManagementDB")
                return subs_id
            else:
                subs_id = data[0]
                LOGGER.debug(f"Subscription {subs_id} already exists")
                return subs_id
        except sqlite3.Error as e:
            # Fixed: message was missing the f-prefix, so '{e}' was logged literally.
            LOGGER.debug(f"Subscription cannot be inserted in the ManagementDB: {e}")

    def insert_alarm(self, alarm_description, alarm_name, kpi_id, kpi_min_value, kpi_max_value, in_range, include_min_value, include_max_value):
        """Insert an alarm (deduplicated on all columns).

        Returns:
            alarm_id of the new or already-existing row, or None on error.
        """
        try:
            c = self.client.cursor()
            c.execute("SELECT alarm_id FROM alarm WHERE alarm_description is ? AND alarm_name is ? AND kpi_id is ? AND kpi_min_value is ? AND kpi_max_value is ? AND in_range is ? AND include_min_value is ? AND include_max_value is ?",
                      (alarm_description, alarm_name, kpi_id, kpi_min_value, kpi_max_value, in_range, include_min_value, include_max_value))
            data = c.fetchone()
            if data is None:
                c.execute("INSERT INTO alarm (alarm_description, alarm_name, kpi_id, kpi_min_value, kpi_max_value, in_range, include_min_value, include_max_value) VALUES (?,?,?,?,?,?,?,?)",
                          (alarm_description, alarm_name, kpi_id, kpi_min_value, kpi_max_value, in_range, include_min_value, include_max_value))
                self.client.commit()
                alarm_id = c.lastrowid
                LOGGER.debug(f"Alarm {alarm_id} succesfully inserted in the ManagementDB")
                return alarm_id
            else:
                alarm_id = data[0]
                LOGGER.debug(f"Alarm {alarm_id} already exists")
                return alarm_id
        except sqlite3.Error as e:
            LOGGER.debug(f"Alarm cannot be inserted in the ManagementDB: {e}")

    def delete_KPI(self, kpi_id):
        """Delete a KPI row. Returns True if deleted, False if absent, None on error."""
        try:
            c = self.client.cursor()
            c.execute("SELECT * FROM kpi WHERE kpi_id is ?", (kpi_id,))
            if c.fetchone() is None:
                LOGGER.debug(f"KPI {kpi_id} does not exists")
                return False
            c.execute("DELETE FROM kpi WHERE kpi_id is ?", (kpi_id,))
            self.client.commit()
            LOGGER.debug(f"KPI {kpi_id} deleted from the ManagementDB")
            return True
        except sqlite3.Error as e:
            LOGGER.debug(f"KPI cannot be deleted from the ManagementDB: {e}")

    def delete_subscription(self, subs_id):
        """Delete a subscription row. Returns True/False/None (see delete_KPI)."""
        try:
            c = self.client.cursor()
            c.execute("SELECT * FROM subscription WHERE subs_id is ?", (subs_id,))
            if c.fetchone() is None:
                LOGGER.debug(f"Subscription {subs_id} does not exists")
                return False
            c.execute("DELETE FROM subscription WHERE subs_id is ?", (subs_id,))
            self.client.commit()
            LOGGER.debug(f"Subscription {subs_id} deleted from the ManagementDB")
            return True
        except sqlite3.Error as e:
            LOGGER.debug(f"Subscription cannot be deleted from the ManagementDB: {e}")

    def delete_alarm(self, alarm_id):
        """Delete an alarm row. Returns True/False/None (see delete_KPI)."""
        try:
            c = self.client.cursor()
            c.execute("SELECT * FROM alarm WHERE alarm_id is ?", (alarm_id,))
            if c.fetchone() is None:
                LOGGER.debug(f"Alarm {alarm_id} does not exists")
                return False
            c.execute("DELETE FROM alarm WHERE alarm_id is ?", (alarm_id,))
            self.client.commit()
            LOGGER.debug(f"Alarm {alarm_id} deleted from the ManagementDB")
            return True
        except sqlite3.Error as e:
            LOGGER.debug(f"Alarm cannot be deleted from the ManagementDB: {e}")

    def get_KPI(self, kpi_id):
        """Return the KPI row as a tuple, or None if absent or on error."""
        try:
            data = self.client.execute("SELECT * FROM kpi WHERE kpi_id is ?", (kpi_id,)).fetchone()
            if data:
                LOGGER.debug(f"KPI {kpi_id} succesfully retrieved from the ManagementDB")
            else:
                LOGGER.debug(f"KPI {kpi_id} does not exists")
            return data
        except sqlite3.Error as e:
            LOGGER.debug(f"KPI {kpi_id} cannot be retrieved from the ManagementDB: {e}")

    def get_subscription(self, subs_id):
        """Return the subscription row as a tuple, or None if absent or on error."""
        try:
            data = self.client.execute("SELECT * FROM subscription WHERE subs_id is ?", (subs_id,)).fetchone()
            if data:
                LOGGER.debug(f"Subscription {subs_id} succesfully retrieved from the ManagementDB")
            else:
                LOGGER.debug(f"Subscription {subs_id} does not exists")
            return data
        except sqlite3.Error as e:
            LOGGER.debug(f"Subscription {subs_id} cannot be retrieved from the ManagementDB: {e}")

    def get_alarm(self, alarm_id):
        """Return the alarm row as a tuple, or None if absent or on error."""
        try:
            data = self.client.execute("SELECT * FROM alarm WHERE alarm_id is ?", (alarm_id,)).fetchone()
            if data:
                LOGGER.debug(f"Alarm {alarm_id} succesfully retrieved from the ManagementDB")
            else:
                # Removed stray debug 'print(data)' left in the original.
                LOGGER.debug(f"Alarm {alarm_id} does not exists")
            return data
        except sqlite3.Error as e:
            LOGGER.debug(f"Alarm {alarm_id} cannot be retrieved from the ManagementDB: {e}")

    def get_KPIS(self):
        """Return all KPI rows as a list of tuples, or None on error."""
        try:
            data = self.client.execute("SELECT * FROM kpi").fetchall()
            LOGGER.debug(f"KPIs succesfully retrieved from the ManagementDB")
            return data
        except sqlite3.Error as e:
            LOGGER.debug(f"KPIs cannot be retrieved from the ManagementDB: {e}")

    def get_subscriptions(self):
        """Return all subscription rows as a list of tuples, or None on error."""
        try:
            data = self.client.execute("SELECT * FROM subscription").fetchall()
            LOGGER.debug(f"Subscriptions succesfully retrieved from the ManagementDB")
            return data
        except sqlite3.Error as e:
            LOGGER.debug(f"Subscriptions cannot be retrieved from the ManagementDB: {e}")

    def get_alarms(self):
        """Return all alarm rows as a list of tuples, or None on error."""
        try:
            data = self.client.execute("SELECT * FROM alarm").fetchall()
            LOGGER.debug(f"Alarms succesfully retrieved from the ManagementDB")
            return data
        except sqlite3.Error as e:
            LOGGER.debug(f"Alarms cannot be retrieved from the ManagementDB: {e}")
\ No newline at end of file
......@@ -12,64 +12,261 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import time
from random import random
from questdb.ingress import Sender, IngressError
import requests
import json
import logging
import datetime
from common.tools.timestamp.Converters import timestamp_float_to_string, timestamp_utcnow_to_float
import psycopg2
LOGGER = logging.getLogger(__name__)
class MetricsDB():
def __init__(self, host, ilp_port, rest_port, table):
self.host=host
self.ilp_port=int(ilp_port)
self.rest_port=rest_port
self.table=table
self.create_table()
def write_KPI(self,time,kpi_id,kpi_sample_type,device_id,endpoint_id,service_id,kpi_value):
counter=0
number_of_retries=10
while (counter<number_of_retries):
try:
with Sender(self.host, self.ilp_port) as sender:
sender.row(
self.table,
symbols={
'kpi_id': kpi_id,
'kpi_sample_type': kpi_sample_type,
'device_id': device_id,
'endpoint_id': endpoint_id,
'service_id': service_id},
columns={
'kpi_value': kpi_value},
at=datetime.datetime.fromtimestamp(time))
sender.flush()
counter=number_of_retries
LOGGER.info(f"KPI written")
except IngressError as ierr:
# LOGGER.info(ierr)
# LOGGER.info(f"Ingress Retry number {counter}")
counter=counter+1
def run_query(self, sql_query):
query_params = {'query': sql_query, 'fmt' : 'json'}
url = f"http://{self.host}:{self.rest_port}/exec"
response = requests.get(url, params=query_params)
json_response = json.loads(response.text)
LOGGER.info(f"Query executed, result:{json_response}")
def create_table(self):
query = f'CREATE TABLE IF NOT EXISTS {self.table}'\
'(kpi_id SYMBOL,'\
'kpi_sample_type SYMBOL,'\
'device_id SYMBOL,'\
'endpoint_id SYMBOL,'\
'service_id SYMBOL,'\
'timestamp TIMESTAMP,'\
'kpi_value DOUBLE)'\
'TIMESTAMP(timestamp);'
self.run_query(query)
LOGGER.info(f"Table {self.table} created")
def __init__(self, host, ilp_port=9009, rest_port=9000, table="monitoring", commit_lag_ms=1000, retries=10,
             postgre=False, postgre_port=8812, postgre_user='admin', postgre_password='quest'):
    """Configure the QuestDB connection and ensure the KPI table exists.

    Args:
        host: QuestDB hostname.
        ilp_port: ILP (line protocol) ingestion port.
        rest_port: HTTP REST query port.
        table: table name holding KPI samples (also used as PostgreSQL database name).
        commit_lag_ms: ingestion commit lag used to shift query windows back.
        retries: maximum attempts for writes and queries.
        postgre: if True, queries use the PostgreSQL wire protocol instead of REST.

    Raises:
        Exception: if the table cannot be created (e.g. QuestDB unreachable).
    """
    try:
        self.host = host
        self.ilp_port = int(ilp_port)
        self.rest_port = rest_port
        self.table = table
        self.commit_lag_ms = commit_lag_ms
        self.retries = retries
        self.postgre = postgre
        self.postgre_port = postgre_port
        self.postgre_user = postgre_user
        self.postgre_password = postgre_password
        self.create_table()
        LOGGER.info("MetricsDB initialized")
    except Exception as e:
        # Was a bare 'except:' that discarded the root cause; log it before failing.
        LOGGER.info(f"MetricsDB cannot be initialized: {e}")
        raise Exception("Critical error in the monitoring component")
def is_postgre_enabled(self):
    # True -> queries use the PostgreSQL wire protocol; False -> QuestDB REST API.
    LOGGER.info(f"PostgreSQL is {self.postgre}")
    return self.postgre

def get_retry_number(self):
    # Max attempts used by write_KPI/run_query before raising.
    LOGGER.info(f"Retry number is {self.retries}")
    return self.retries

def get_commit_lag(self):
    # Commit lag (ms) subtracted from 'now' when building query time windows.
    LOGGER.info(f"Commit lag of monitoring queries is {self.commit_lag_ms} ms")
    return self.commit_lag_ms

def enable_postgre_mode(self):
    # Switch subsequent queries to the PostgreSQL wire protocol.
    self.postgre = True
    LOGGER.info("MetricsDB PostgreSQL query mode enabled")

def disable_postgre_mode(self):
    # Switch subsequent queries back to the REST API.
    self.postgre = False
    LOGGER.info("MetricsDB REST query mode enabled")

def set_postgre_credentials(self, user, password):
    # Credentials take effect on the next run_query_postgre() connection.
    self.postgre_user = user
    self.postgre_password = password
    LOGGER.info("MetricsDB PostgreSQL credentials changed")

def set_retry_number(self, retries):
    # NOTE: 'Retriy' typo below is kept — runtime log strings are not altered here.
    self.retries = retries
    LOGGER.info(f"Retriy number changed to {retries}")

def set_commit_lag(self, commit_lag_ms):
    self.commit_lag_ms = commit_lag_ms
    LOGGER.info(f"Commit lag of monitoring queries changed to {commit_lag_ms} ms")
def create_table(self):
    """Create the KPI samples table in QuestDB if it does not exist (idempotent).

    Raises:
        Exception: if the REST query fails after all retries.
    """
    try:
        # 'TIMESTAMP(timestamp)' designates the column QuestDB orders/partitions by.
        query = f'CREATE TABLE IF NOT EXISTS {self.table}' \
                '(kpi_id SYMBOL,' \
                'kpi_sample_type SYMBOL,' \
                'device_id SYMBOL,' \
                'endpoint_id SYMBOL,' \
                'service_id SYMBOL,' \
                'timestamp TIMESTAMP,' \
                'kpi_value DOUBLE)' \
                'TIMESTAMP(timestamp);'
        # run_query returns True for DDL statements (QuestDB replies with a 'ddl' key).
        result = self.run_query(query)
        if (result == True):
            LOGGER.info(f"Table {self.table} created")
    except (Exception) as e:
        LOGGER.debug(f"Table {self.table} cannot be created. {e}")
        raise Exception
def write_KPI(self, time, kpi_id, kpi_sample_type, device_id, endpoint_id, service_id, kpi_value):
    """Push one KPI sample to QuestDB over ILP, retrying on failure.

    Note: the 'time' parameter (epoch seconds) shadows the stdlib 'time' module
    inside this method; the name is kept for interface compatibility.

    Raises:
        Exception: after self.retries consecutive failed attempts.
    """
    attempt = 0
    while attempt < self.retries:
        try:
            with Sender(self.host, self.ilp_port) as sender:
                sender.row(
                    self.table,
                    symbols={
                        'kpi_id': kpi_id,
                        'kpi_sample_type': kpi_sample_type,
                        'device_id': device_id,
                        'endpoint_id': endpoint_id,
                        'service_id': service_id},
                    columns={
                        'kpi_value': kpi_value},
                    at=datetime.datetime.fromtimestamp(time))
                sender.flush()
            # Success: force the loop to terminate.
            attempt = self.retries
            LOGGER.debug(f"KPI written in the MetricsDB")
        except (Exception, IngressError) as e:
            attempt = attempt + 1
            if attempt == self.retries:
                raise Exception(f"Maximum number of retries achieved: {self.retries}")
def run_query(self, sql_query):
    """Execute a query via the QuestDB REST API, with retries.

    Returns:
        True for DDL statements ('ddl' key in the reply), or the rows of a
        SELECT ('dataset' key in the reply).

    Raises:
        Exception: after self.retries consecutive failed attempts.
    """
    counter = 0
    while (counter < self.retries):
        try:
            query_params = {'query': sql_query, 'fmt': 'json'}
            url = f"http://{self.host}:{self.rest_port}/exec"
            response = requests.get(url, params=query_params)
            json_response = json.loads(response.text)
            if ('ddl' in json_response):
                LOGGER.debug(f"REST query executed succesfully, result: {json_response['ddl']}")
                counter = self.retries
                return True
            elif ('dataset' in json_response):
                LOGGER.debug(f"REST query executed, result: {json_response['dataset']}")
                counter = self.retries
                return json_response['dataset']
            else:
                # BUG FIX: QuestDB error payloads (e.g. an 'error' key) matched
                # neither branch, so 'counter' was never incremented and this
                # loop spun forever. Treat them as a failed attempt instead.
                raise Exception(f"Unexpected REST response: {json_response}")
        except (Exception, requests.exceptions.RequestException) as e:
            counter = counter + 1
            if counter == self.retries:
                raise Exception(f"Maximum number of retries achieved: {self.retries}")
def run_query_postgre(self, postgre_sql_query):
    """Execute a query through QuestDB's PostgreSQL wire protocol, with retries.

    Returns:
        The fetched rows as a list of tuples.

    Raises:
        Exception: after self.retries consecutive failed attempts.
    """
    connection = None
    cursor = None
    attempt = 0
    while attempt < self.retries:
        try:
            connection = psycopg2.connect(
                user=self.postgre_user,
                password=self.postgre_password,
                host=self.host,
                port=self.postgre_port,
                database=self.table)
            cursor = connection.cursor()
            cursor.execute(postgre_sql_query)
            rows = cursor.fetchall()
            LOGGER.debug(f"PostgreSQL query executed, result: {rows}")
            attempt = self.retries
            return rows
        except (Exception, psycopg2.Error) as e:
            attempt = attempt + 1
            if attempt == self.retries:
                raise Exception(f"Maximum number of retries achieved: {self.retries}")
        finally:
            # A fresh connection is opened per attempt; always release both handles.
            if cursor:
                cursor.close()
            if connection:
                connection.close()
def get_subscription_data(self, subs_queue, kpi_id, sampling_interval_s=1):
    """Fetch the KPI samples of the last sampling interval and enqueue them.

    Queries the window [now - lag - interval, now - lag] for kpi_id and, if any
    rows were returned, pushes the whole list onto subs_queue without blocking.
    Errors are logged and swallowed (this runs inside a scheduler job).
    """
    try:
        # Shift the window back by the commit lag so recently-ingested rows are visible.
        window_end = timestamp_utcnow_to_float() - self.commit_lag_ms / 1000
        window_start = window_end - sampling_interval_s
        sql = f"SELECT kpi_id, timestamp, kpi_value FROM {self.table} WHERE kpi_id = '{kpi_id}' AND (timestamp BETWEEN '{timestamp_float_to_string(window_start)}' AND '{timestamp_float_to_string(window_end)}')"
        if self.postgre:
            kpi_list = self.run_query_postgre(sql)
            LOGGER.debug(f"kpi_list postgre: {kpi_list}")
        else:
            kpi_list = self.run_query(sql)
            LOGGER.debug(f"kpi_list influx: {kpi_list}")
        if kpi_list:
            subs_queue.put_nowait(kpi_list)
            LOGGER.debug(f"New data received for subscription to KPI {kpi_id}")
        else:
            LOGGER.debug(f"No new data for the subscription to KPI {kpi_id}")
    except (Exception) as err:
        LOGGER.debug(f"Subscription data cannot be retrieved. {err}")
def get_alarm_data(self, alarm_queue, kpi_id, kpiMinValue, kpiMaxValue, inRange=True, includeMinValue=True, includeMaxValue=True,
subscription_frequency_ms=1000):
try:
end_date = timestamp_utcnow_to_float() - self.commit_lag_ms / 1000
start_date = end_date - subscription_frequency_ms / 1000
query = f"SELECT kpi_id, timestamp, kpi_value FROM {self.table} WHERE kpi_id = '{kpi_id}' AND (timestamp BETWEEN '{timestamp_float_to_string(start_date)}' AND '{timestamp_float_to_string(end_date)}')"
if self.postgre:
kpi_list = self.run_query_postgre(query)
else:
kpi_list = self.run_query(query)
if kpi_list:
LOGGER.debug(f"New data received for alarm of KPI {kpi_id}")
for kpi in kpi_list:
alarm = False
kpi_value = kpi[2]
if (kpiMinValue == kpi_value and kpiMaxValue == kpi_value and inRange):
alarm = True
elif (
inRange and kpiMinValue is not None and kpiMaxValue is not None and includeMinValue and includeMaxValue):
if (kpi_value >= kpiMinValue and kpi_value <= kpiMaxValue):
alarm = True
elif (
inRange and kpiMinValue is not None and kpiMaxValue is not None and includeMinValue and not includeMaxValue):
if (kpi_value >= kpiMinValue and kpi_value < kpiMaxValue):
alarm = True
elif (
inRange and kpiMinValue is not None and kpiMaxValue is not None and not includeMinValue and includeMaxValue):
if (kpi_value > kpiMinValue and kpi_value <= kpiMaxValue):
alarm = True
elif (
inRange and kpiMinValue is not None and kpiMaxValue is not None and not includeMinValue and not includeMaxValue):
if (kpi_value > kpiMinValue and kpi_value < kpiMaxValue):
alarm = True
elif (
not inRange and kpiMinValue is not None and kpiMaxValue is not None and includeMinValue and includeMaxValue):
if (kpi_value <= kpiMinValue or kpi_value >= kpiMaxValue):
alarm = True
elif (
not inRange and kpiMinValue is not None and kpiMaxValue is not None and includeMinValue and not includeMaxValue):
if (kpi_value <= kpiMinValue or kpi_value > kpiMaxValue):
alarm = True
elif (
not inRange and kpiMinValue is not None and kpiMaxValue is not None and not includeMinValue and includeMaxValue):
if (kpi_value < kpiMinValue or kpi_value >= kpiMaxValue):
alarm = True
elif (
not inRange and kpiMinValue is not None and kpiMaxValue is not None and not includeMinValue and not includeMaxValue):
if (kpi_value < kpiMinValue or kpi_value > kpiMaxValue):
alarm = True
elif (inRange and kpiMinValue is not None and kpiMaxValue is None and includeMinValue):
if (kpi_value >= kpiMinValue):
alarm = True
elif (inRange and kpiMinValue is not None and kpiMaxValue is None and not includeMinValue):
if (kpi_value > kpiMinValue):
alarm = True
elif (not inRange and kpiMinValue is not None and kpiMaxValue is None and not includeMinValue):
if (kpi_value <= kpiMinValue):
alarm = True
elif (not inRange and kpiMinValue is not None and kpiMaxValue is None and not includeMinValue):
if (kpi_value <= kpiMinValue):
alarm = True
elif (inRange and kpiMinValue is None and kpiMaxValue is not None and includeMaxValue):
if (kpi_value <= kpiMaxValue):
alarm = True
elif (inRange and kpiMinValue is None and kpiMaxValue is not None and not includeMaxValue):
if (kpi_value < kpiMaxValue):
alarm = True
elif (not inRange and kpiMinValue is None and kpiMaxValue is not None and not includeMaxValue):
if (kpi_value >= kpiMaxValue):
alarm = True
elif (not inRange and kpiMinValue is None and kpiMaxValue is not None and not includeMaxValue):
if (kpi_value >= kpiMaxValue):
alarm = True
if alarm:
# queue.append[kpi]
alarm_queue.put_nowait(kpi)
LOGGER.debug(f"Alarm of KPI {kpi_id} triggered -> kpi_value:{kpi[2]}, timestamp:{kpi[1]}")
else:
LOGGER.debug(f"No new data for the alarm of KPI {kpi_id}")
except (Exception) as e:
LOGGER.debug(f"Alarm data cannot be retrieved. {e}")
\ No newline at end of file
......@@ -16,7 +16,7 @@ from common.Constants import ServiceNameEnum
from common.Settings import get_service_port_grpc
from common.proto.monitoring_pb2_grpc import add_MonitoringServiceServicer_to_server
from common.tools.service.GenericGrpcService import GenericGrpcService
from .MonitoringServiceServicerImpl import MonitoringServiceServicerImpl
from monitoring.service.MonitoringServiceServicerImpl import MonitoringServiceServicerImpl
class MonitoringService(GenericGrpcService):
def __init__(self, cls_name: str = __name__) -> None:
......
This diff is collapsed.
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sqlite3 as sl
class SQLite():
    """Minimal legacy SQLite helper storing KPI descriptors in a single table."""

    def __init__(self, database):
        # check_same_thread=False: the handle may be used from several threads.
        self.client = sl.connect(database, check_same_thread=False)
        self.client.execute("""
            CREATE TABLE IF NOT EXISTS KPI(
                kpi_id INTEGER PRIMARY KEY AUTOINCREMENT,
                kpi_description TEXT,
                kpi_sample_type INTEGER,
                device_id INTEGER,
                endpoint_id INTEGER,
                service_id INTEGER
            );
        """)

    def insert_KPI(self, kpi_description, kpi_sample_type, device_id, endpoint_id, service_id):
        """Insert a KPI and return its id; reuse the existing row when one
        matches on (device_id, kpi_sample_type, endpoint_id)."""
        cursor = self.client.cursor()
        cursor.execute("SELECT kpi_id FROM KPI WHERE device_id is ? AND kpi_sample_type is ? AND endpoint_id is ?",
                       (device_id, kpi_sample_type, endpoint_id))
        existing = cursor.fetchone()
        if existing is not None:
            return existing[0]
        cursor.execute("INSERT INTO KPI (kpi_description,kpi_sample_type,device_id,endpoint_id,service_id) VALUES (?,?,?,?,?)",
                       (kpi_description, kpi_sample_type, device_id, endpoint_id, service_id))
        self.client.commit()
        return cursor.lastrowid

    def delete_KPI(self, device_id, kpi_sample_type):
        """Delete KPIs matching (device_id, kpi_sample_type); True if any existed."""
        cursor = self.client.cursor()
        cursor.execute("SELECT kpi_id FROM KPI WHERE device_id is ? AND kpi_sample_type is ?",
                       (device_id, kpi_sample_type))
        if cursor.fetchone() is None:
            return False
        cursor.execute("DELETE FROM KPI WHERE device_id is ? AND kpi_sample_type is ?",
                       (device_id, kpi_sample_type))
        self.client.commit()
        return True

    def delete_kpid_id(self, kpi_id):
        """Delete the KPI with the given id; True if it existed."""
        cursor = self.client.cursor()
        cursor.execute("SELECT * FROM KPI WHERE kpi_id is ?", (kpi_id,))
        if cursor.fetchone() is None:
            return False
        cursor.execute("DELETE FROM KPI WHERE kpi_id is ?", (kpi_id,))
        self.client.commit()
        return True

    def get_KPI(self, kpi_id):
        """Return the KPI row as a tuple, or None if absent."""
        return self.client.execute("SELECT * FROM KPI WHERE kpi_id is ?", (kpi_id,)).fetchone()

    def get_KPIS(self):
        """Return all KPI rows as a list of tuples."""
        return self.client.execute("SELECT * FROM KPI").fetchall()
\ No newline at end of file
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import pytz
from apscheduler.executors.pool import ProcessPoolExecutor
from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime
import time
LOGGER = logging.getLogger(__name__)
class SubscriptionManager():
    """Schedules recurring subscription-polling jobs against the metrics database."""

    def __init__(self, metrics_db):
        # metrics_db must expose get_subscription_data(); it is polled by each job.
        self.metrics_db = metrics_db
        self.scheduler = BackgroundScheduler(executors={'processpool': ProcessPoolExecutor(max_workers=20)})
        self.scheduler.start()

    def create_subscription(self, subs_queue, subscription_id, kpi_id, sampling_interval_s, sampling_duration_s=None, start_timestamp=None, end_timestamp=None):
        """Register an interval job that polls subscription data for one KPI.

        If sampling_duration_s is given, end_timestamp is derived from it
        (start defaults to 'now'). Timestamps are epoch seconds, treated as UTC.
        """
        start_date = None
        end_date = None
        if sampling_duration_s:
            if not start_timestamp:
                start_timestamp = time.time()
            # A duration always overrides any explicitly-passed end_timestamp.
            end_timestamp = start_timestamp + sampling_duration_s
        if start_timestamp:
            start_date = datetime.utcfromtimestamp(start_timestamp).isoformat()
        if end_timestamp:
            end_date = datetime.utcfromtimestamp(end_timestamp).isoformat()
        LOGGER.debug(f"kpi_id: {kpi_id}")
        LOGGER.debug(f"sampling_interval_s: {sampling_interval_s}")
        LOGGER.debug(f"subscription_id: {subscription_id}")
        LOGGER.debug(f"start_date: {start_date}")
        self.scheduler.add_job(self.metrics_db.get_subscription_data, args=(subs_queue, kpi_id, sampling_interval_s),
                               trigger='interval', seconds=sampling_interval_s, start_date=start_date,
                               end_date=end_date, timezone=pytz.utc, id=str(subscription_id))
        LOGGER.debug(f"Subscrition job {subscription_id} succesfully created")

    def delete_subscription(self, subscription_id):
        """Cancel the polling job of a subscription.

        BUG FIX: jobs are registered under id=str(subscription_id) in
        create_subscription, so the same conversion must be applied here or
        integer ids would never match and remove_job would raise JobLookupError.
        """
        self.scheduler.remove_job(str(subscription_id))
\ No newline at end of file
......@@ -12,11 +12,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
from random import random
from common.proto import monitoring_pb2
from common.proto.kpi_sample_types_pb2 import KpiSampleType
from common.tools.timestamp.Converters import timestamp_string_to_float, timestamp_utcnow_to_float
from common.tools.timestamp.Converters import timestamp_utcnow_to_float
def kpi_id():
_kpi_id = monitoring_pb2.KpiId()
......@@ -32,6 +32,24 @@ def create_kpi_request():
_create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END1' # pylint: disable=maybe-no-member
return _create_kpi_request
def create_kpi_request_b():
    """Build a KpiDescriptor test message bound to DEV2/SERV2/END2."""
    descriptor = monitoring_pb2.KpiDescriptor()
    descriptor.kpi_description = 'KPI Description Test'
    descriptor.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED
    descriptor.device_id.device_uuid.uuid = 'DEV2'        # pylint: disable=maybe-no-member
    descriptor.service_id.service_uuid.uuid = 'SERV2'     # pylint: disable=maybe-no-member
    descriptor.endpoint_id.endpoint_uuid.uuid = 'END2'    # pylint: disable=maybe-no-member
    return descriptor
def create_kpi_request_c():
    """Build a KpiDescriptor test message bound to DEV3/SERV3/END3."""
    descriptor = monitoring_pb2.KpiDescriptor()
    descriptor.kpi_description = 'KPI Description Test'
    descriptor.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED
    descriptor.device_id.device_uuid.uuid = 'DEV3'        # pylint: disable=maybe-no-member
    descriptor.service_id.service_uuid.uuid = 'SERV3'     # pylint: disable=maybe-no-member
    descriptor.endpoint_id.endpoint_uuid.uuid = 'END3'    # pylint: disable=maybe-no-member
    return descriptor
def monitor_kpi_request(kpi_uuid, monitoring_window_s, sampling_rate_s):
_monitor_kpi_request = monitoring_pb2.MonitorKpiRequest()
_monitor_kpi_request.kpi_id.kpi_id.uuid = kpi_uuid # pylint: disable=maybe-no-member
......@@ -43,5 +61,66 @@ def include_kpi_request(kpi_id):
_include_kpi_request = monitoring_pb2.Kpi()
_include_kpi_request.kpi_id.kpi_id.uuid = kpi_id.kpi_id.uuid
_include_kpi_request.timestamp.timestamp = timestamp_utcnow_to_float()
_include_kpi_request.kpi_value.int32Val = 500 # pylint: disable=maybe-no-member
_include_kpi_request.kpi_value.floatVal = 500*random() # pylint: disable=maybe-no-member
return _include_kpi_request
def kpi_descriptor_list():
    """Return an empty KpiDescriptorList message."""
    return monitoring_pb2.KpiDescriptorList()
def kpi_query():
    """Return an empty KpiQuery message."""
    return monitoring_pb2.KpiQuery()
def subs_descriptor(kpi_id):
    """Build a SubsDescriptor that samples kpi_id every 2s over a ~10s window."""
    subscription = monitoring_pb2.SubsDescriptor()
    subscription.subs_id.subs_id.uuid = ""
    subscription.kpi_id.kpi_id.uuid = kpi_id.kpi_id.uuid
    subscription.sampling_duration_s = 10
    subscription.sampling_interval_s = 2
    # Two separate "now" reads (as in the original): the end timestamp is
    # roughly, not exactly, 10 seconds after the start timestamp.
    subscription.start_timestamp.timestamp = timestamp_utcnow_to_float()
    subscription.end_timestamp.timestamp = timestamp_utcnow_to_float() + 10
    return subscription
def subs_id():
    # NOTE(review): despite the name, this returns a SubsDescriptor message,
    # not a subscription-ID message — confirm whether the SubscriptionID type
    # from monitoring.proto was intended here.
    _subs_id = monitoring_pb2.SubsDescriptor()
    return _subs_id
def alarm_descriptor():
    """Build an alarm on KPI "1" that triggers inside the range (0.0, 50.0]."""
    alarm = monitoring_pb2.AlarmDescriptor()
    alarm.alarm_description = "Alarm Description"
    alarm.name = "Alarm Name"
    alarm.kpi_id.kpi_id.uuid = "1"
    # Range semantics: min excluded, max included, trigger while in range.
    value_range = alarm.kpi_value_range
    value_range.kpiMinValue.floatVal = 0.0
    value_range.kpiMaxValue.floatVal = 50.0
    value_range.inRange = True
    value_range.includeMinValue = False
    value_range.includeMaxValue = True
    return alarm
def alarm_descriptor_b():
    """Build a minimal AlarmDescriptor that only references KPI "2"."""
    alarm = monitoring_pb2.AlarmDescriptor()
    alarm.kpi_id.kpi_id.uuid = "2"
    return alarm
def alarm_subscription(alarm_id):
    """Wrap alarm_id in an AlarmSubscription message."""
    # Renamed local: the original used '_alarm_descriptor' for what is
    # actually an AlarmSubscription.
    subscription = monitoring_pb2.AlarmSubscription()
    subscription.alarm_id.alarm_id.uuid = str(alarm_id)
    return subscription
def alarm_id():
    """Return an empty AlarmID message."""
    return monitoring_pb2.AlarmID()
\ No newline at end of file
......@@ -13,8 +13,15 @@
# limitations under the License.
import copy, os, pytest
import threading
import time
from time import sleep
from typing import Tuple
from apscheduler.executors.pool import ProcessPoolExecutor
from apscheduler.schedulers.background import BackgroundScheduler
from grpc._channel import _MultiThreadedRendezvous
from common.Constants import ServiceNameEnum
from common.Settings import (
ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name, get_service_port_grpc)
......@@ -24,7 +31,9 @@ from common.orm.Factory import get_database_backend, BackendEnum as DatabaseBack
from common.message_broker.Factory import get_messagebroker_backend, BackendEnum as MessageBrokerBackendEnum
from common.message_broker.MessageBroker import MessageBroker
from common.proto import monitoring_pb2
from common.proto.monitoring_pb2 import KpiId, KpiDescriptor
from common.proto.kpi_sample_types_pb2 import KpiSampleType
from common.proto.monitoring_pb2 import KpiId, KpiDescriptor, KpiList, SubsDescriptor, SubsList, AlarmID, \
AlarmDescriptor, AlarmList, Kpi, KpiDescriptorList, SubsResponse, AlarmResponse
from context.client.ContextClient import ContextClient
from context.service.grpc_server.ContextService import ContextService
......@@ -38,19 +47,17 @@ from device.service.driver_api.DriverInstanceCache import DriverInstanceCache
os.environ['DEVICE_EMULATED_ONLY'] = 'TRUE'
from device.service.drivers import DRIVERS # pylint: disable=wrong-import-position
# pylint: disable=wrong-import-position
from monitoring.client.MonitoringClient import MonitoringClient
from common.proto.kpi_sample_types_pb2 import KpiSampleType
from monitoring.service import SqliteTools, MetricsDBTools
from monitoring.service import ManagementDBTools, MetricsDBTools
from monitoring.service.MonitoringService import MonitoringService
from monitoring.service.EventTools import EventsDeviceCollector
from monitoring.tests.Messages import create_kpi_request, include_kpi_request, monitor_kpi_request
from monitoring.tests.Messages import create_kpi_request, include_kpi_request, monitor_kpi_request, \
create_kpi_request_b, create_kpi_request_c, kpi_query, subs_descriptor, alarm_descriptor, \
alarm_subscription
from monitoring.tests.Objects import DEVICE_DEV1, DEVICE_DEV1_CONNECT_RULES, DEVICE_DEV1_UUID
from monitoring.service.MonitoringServiceServicerImpl import LOGGER
# LOGGER = getJSONLogger('monitoringservice-server')
# LOGGER.setLevel('DEBUG')
###########################
# Tests Setup
......@@ -151,9 +158,9 @@ def monitoring_client(monitoring_service : MonitoringService): # pylint: disable
_client.close()
@pytest.fixture(scope='session')
def sql_db():
_sql_db = SqliteTools.SQLite('monitoring.db')
return _sql_db
def management_db():
_management_db = ManagementDBTools.ManagementDB('monitoring.db')
return _management_db
@pytest.fixture(scope='session')
def metrics_db():
......@@ -161,7 +168,21 @@ def metrics_db():
METRICSDB_HOSTNAME, METRICSDB_ILP_PORT, METRICSDB_REST_PORT, METRICSDB_TABLE)
return _metrics_db
@pytest.fixture(scope='session')
def subs_scheduler():
_scheduler = BackgroundScheduler(executors={'processpool': ProcessPoolExecutor(max_workers=20)})
_scheduler.start()
return _scheduler
def ingestion_data(monitoring_client):
_kpi_id = monitoring_client.SetKpi(create_kpi_request_c())
_include_kpi_request = include_kpi_request(_kpi_id)
for i in range(200):
_include_kpi_request = include_kpi_request(_kpi_id)
monitoring_client.IncludeKpi(_include_kpi_request)
time.sleep(0.01)
###########################
# Tests Implementation
......@@ -173,8 +194,44 @@ def test_set_kpi(monitoring_client): # pylint: disable=redefined-outer-name
LOGGER.warning('test_create_kpi requesting')
response = monitoring_client.SetKpi(create_kpi_request())
LOGGER.debug(str(response))
response = monitoring_client.SetKpi(create_kpi_request_b())
LOGGER.debug(str(response))
assert isinstance(response, KpiId)
# Test case that makes use of client fixture to test server's DeleteKpi method
def test_delete_kpi(monitoring_client): # pylint: disable=redefined-outer-name
    """Creating a KPI and deleting it must return an Empty reply."""
    LOGGER.warning('delete_kpi requesting')
    kpi_id_reply = monitoring_client.SetKpi(create_kpi_request_b())
    delete_reply = monitoring_client.DeleteKpi(kpi_id_reply)
    LOGGER.debug(str(delete_reply))
    assert isinstance(delete_reply, Empty)
# Test case that makes use of client fixture to test server's GetKpiDescriptor method
def test_get_kpidescritor(monitoring_client): # pylint: disable=redefined-outer-name
    """Round-trip: SetKpi then fetch the descriptor back by its ID."""
    LOGGER.warning('test_getkpidescritor_kpi begin')
    kpi_id_reply = monitoring_client.SetKpi(create_kpi_request_c())
    descriptor = monitoring_client.GetKpiDescriptor(kpi_id_reply)
    LOGGER.debug(str(descriptor))
    assert isinstance(descriptor, KpiDescriptor)
# Test case that makes use of client fixture to test server's GetKpiDescriptor method
def test_get_kpi_descriptor_list(monitoring_client): # pylint: disable=redefined-outer-name
    """Listing all KPI descriptors must return a KpiDescriptorList."""
    LOGGER.warning('test_getkpidescritor_kpi begin')
    descriptor_list = monitoring_client.GetKpiDescriptorList(Empty())
    LOGGER.debug(str(descriptor_list))
    assert isinstance(descriptor_list, KpiDescriptorList)
# Test case that makes use of client fixture to test server's IncludeKpi method
def test_include_kpi(monitoring_client): # pylint: disable=redefined-outer-name
    """Pushing a sample for a freshly created KPI must return Empty."""
    LOGGER.warning('test_include_kpi requesting')
    new_kpi_id = monitoring_client.SetKpi(create_kpi_request_c())
    include_reply = monitoring_client.IncludeKpi(include_kpi_request(new_kpi_id))
    LOGGER.debug(str(include_reply))
    assert isinstance(include_reply, Empty)
# Test case that makes use of client fixture to test server's MonitorKpi method
def test_monitor_kpi(
context_client : ContextClient, # pylint: disable=redefined-outer-name
......@@ -210,13 +267,105 @@ def test_monitor_kpi(
LOGGER.debug(str(response))
assert isinstance(response, Empty)
# Test case that makes use of client fixture to test server's IncludeKpi method
def test_include_kpi(monitoring_client): # pylint: disable=redefined-outer-name
# make call to server
LOGGER.warning('test_include_kpi requesting')
kpi_id = monitoring_client.SetKpi(create_kpi_request())
response = monitoring_client.IncludeKpi(include_kpi_request(kpi_id))
# Test case that makes use of client fixture to test server's QueryKpiData method
def test_query_kpi_data(monitoring_client): # pylint: disable=redefined-outer-name
    """QueryKpiData must answer with a KpiList."""
    LOGGER.warning('test_query_kpi_data')
    kpi_list = monitoring_client.QueryKpiData(kpi_query())
    LOGGER.debug(str(kpi_list))
    assert isinstance(kpi_list, KpiList)
def test_ingestion_data(monitoring_client):
    """Push 100 samples for a fresh KPI, paced 10 ms apart.

    Fix: the original built one include request before the loop and discarded
    it — a redundant call that also consumed values from the random generator.
    """
    _kpi_id = monitoring_client.SetKpi(create_kpi_request_c())
    for _ in range(100):
        # A fresh request per iteration gives each sample its own timestamp
        # and value.
        monitoring_client.IncludeKpi(include_kpi_request(_kpi_id))
        time.sleep(0.01)
# def test_subscription_scheduler(monitoring_client,metrics_db,subs_scheduler):
# subs_scheduler.add_job(ingestion_data(monitoring_client),id="1")
# Test case that makes use of client fixture to test server's SetKpiSubscription method
def test_set_kpi_subscription(monitoring_client, metrics_db): # pylint: disable=redefined-outer-name
    """Subscribing to a KPI must yield a gRPC stream of SubsResponse items."""
    LOGGER.warning('test_set_kpi_subscription')
    kpi_id = monitoring_client.SetKpi(create_kpi_request_c())
    # thread = threading.Thread(target=test_ingestion_data, args=(monitoring_client,metrics_db))
    # thread.start()
    monitoring_client.IncludeKpi(include_kpi_request(kpi_id))
    subscription_stream = monitoring_client.SetKpiSubscription(subs_descriptor(kpi_id))
    assert isinstance(subscription_stream, _MultiThreadedRendezvous)
    LOGGER.debug(subscription_stream)
    for item in subscription_stream:
        LOGGER.debug(item)
        assert isinstance(item, SubsResponse)
# Test case that makes use of client fixture to test server's GetSubsDescriptor method
def test_get_subs_descriptor(monitoring_client):
    """Each streamed item's subs_id must resolve back to a SubsDescriptor."""
    LOGGER.warning('test_get_subs_descriptor')
    kpi_id = monitoring_client.SetKpi(create_kpi_request_c())
    monitoring_client.IncludeKpi(include_kpi_request(kpi_id))
    # Distinct names for the stream and the per-item lookup result (the
    # original reused 'response' for both).
    subscription_stream = monitoring_client.SetKpiSubscription(subs_descriptor(kpi_id))
    for item in subscription_stream:
        descriptor = monitoring_client.GetSubsDescriptor(item.subs_id)
        LOGGER.debug(descriptor)
        assert isinstance(descriptor, SubsDescriptor)
# Test case that makes use of client fixture to test server's GetSubscriptions method
def test_get_subscriptions(monitoring_client):
    """GetSubscriptions must return a SubsList."""
    LOGGER.warning('test_get_subscriptions')
    subscriptions = monitoring_client.GetSubscriptions(Empty())
    LOGGER.debug(subscriptions)
    assert isinstance(subscriptions, SubsList)
# Test case that makes use of client fixture to test server's DeleteSubscription method
def test_delete_subscription(monitoring_client):
    """Deleting every subscription seen on the stream must return Empty."""
    LOGGER.warning('test_delete_subscription')
    kpi_id = monitoring_client.SetKpi(create_kpi_request_c())
    monitoring_client.IncludeKpi(include_kpi_request(kpi_id))
    subscription_stream = monitoring_client.SetKpiSubscription(subs_descriptor(kpi_id))
    for item in subscription_stream:
        delete_reply = monitoring_client.DeleteSubscription(item.subs_id)
        assert isinstance(delete_reply, Empty)
# Test case that makes use of client fixture to test server's SetKpiAlarm method
def test_set_kpi_alarm(monitoring_client):
    """SetKpiAlarm must hand back an AlarmID."""
    LOGGER.warning('test_set_kpi_alarm')
    alarm_id_reply = monitoring_client.SetKpiAlarm(alarm_descriptor())
    LOGGER.debug(str(alarm_id_reply))
    assert isinstance(alarm_id_reply, AlarmID)
# Test case that makes use of client fixture to test server's GetAlarms method
def test_get_alarms(monitoring_client):
    """GetAlarms must return an AlarmList."""
    LOGGER.warning('test_get_alarms')
    alarms = monitoring_client.GetAlarms(Empty())
    LOGGER.debug(alarms)
    assert isinstance(alarms, AlarmList)
# Test case that makes use of client fixture to test server's GetAlarmDescriptor method
def test_get_alarm_descriptor(monitoring_client):
    """A freshly created alarm's ID must resolve to its AlarmDescriptor."""
    LOGGER.warning('test_get_alarm_descriptor')
    created_alarm_id = monitoring_client.SetKpiAlarm(alarm_descriptor())
    descriptor = monitoring_client.GetAlarmDescriptor(created_alarm_id)
    LOGGER.debug(descriptor)
    assert isinstance(descriptor, AlarmDescriptor)
# Test case that makes use of client fixture to test server's GetAlarmResponseStream method
def test_get_alarm_response_stream(monitoring_client):
    """GetAlarmResponseStream must stream AlarmResponse items.

    Fixes: the warning used a copy-pasted tag ('test_get_alarm_descriptor'),
    and the loop logged the stream object instead of each streamed item.
    """
    LOGGER.warning('test_get_alarm_response_stream')
    alarm_id = monitoring_client.SetKpiAlarm(alarm_descriptor())
    response_stream = monitoring_client.GetAlarmResponseStream(alarm_subscription(alarm_id))
    assert isinstance(response_stream, _MultiThreadedRendezvous)
    for item in response_stream:
        LOGGER.debug(item)
        assert isinstance(item, AlarmResponse)
# Test case that makes use of client fixture to test server's DeleteAlarm method
def test_delete_alarm(monitoring_client):
    """Deleting an existing alarm must return Empty."""
    LOGGER.warning('test_delete_alarm')
    created_alarm_id = monitoring_client.SetKpiAlarm(alarm_descriptor())
    delete_reply = monitoring_client.DeleteAlarm(created_alarm_id)
    LOGGER.debug(type(delete_reply))
    assert isinstance(delete_reply, Empty)
# Test case that makes use of client fixture to test server's GetStreamKpi method
......@@ -224,26 +373,22 @@ def test_get_stream_kpi(monitoring_client): # pylint: disable=redefined-outer-na
LOGGER.warning('test_getstream_kpi begin')
response = monitoring_client.GetStreamKpi(monitoring_pb2.Kpi())
LOGGER.debug(str(response))
#assert isinstance(response, Kpi)
# Test case that makes use of client fixture to test server's GetInstantKpi method
# def test_get_instant_kpi(monitoring_client): # pylint: disable=redefined-outer-name
# LOGGER.warning('test_getinstant_kpi begin')
# response = monitoring_client.GetInstantKpi(kpi_id())
# LOGGER.debug(str(response))
# # assert isinstance(response, Kpi)
assert isinstance(response, _MultiThreadedRendezvous)
# Test case that makes use of client fixture to test server's GetInstantKpi method
def test_get_kpidescritor_kpi(monitoring_client): # pylint: disable=redefined-outer-name
LOGGER.warning('test_getkpidescritor_kpi begin')
response = monitoring_client.SetKpi(create_kpi_request())
# LOGGER.debug(str(response))
response = monitoring_client.GetKpiDescriptor(response)
# LOGGER.debug(str(response))
assert isinstance(response, KpiDescriptor)
def test_sqlitedb_tools_insert_kpi(sql_db): # pylint: disable=redefined-outer-name
LOGGER.warning('test_sqlitedb_tools_insert_kpi begin')
def test_get_instant_kpi(monitoring_client): # pylint: disable=redefined-outer-name
LOGGER.warning('test_getinstant_kpi begin')
kpi_id = monitoring_client.SetKpi(KpiId())
monitoring_client.IncludeKpi(include_kpi_request(kpi_id))
sleep(0.3)
response = monitoring_client.GetInstantKpi(kpi_id)
LOGGER.debug(response)
assert isinstance(response, Kpi)
response = monitoring_client.GetInstantKpi(KpiId())
LOGGER.debug(type(response))
assert response.kpi_id.kpi_id.uuid == "NoID"
def test_managementdb_tools_insert_kpi(management_db): # pylint: disable=redefined-outer-name
LOGGER.warning('test_managementdb_tools_insert_kpi begin')
_create_kpi_request = create_kpi_request()
kpi_description = _create_kpi_request.kpi_description # pylint: disable=maybe-no-member
kpi_sample_type = _create_kpi_request.kpi_sample_type # pylint: disable=maybe-no-member
......@@ -251,11 +396,11 @@ def test_sqlitedb_tools_insert_kpi(sql_db): # pylint: disable=redefined-outer-na
kpi_endpoint_id = _create_kpi_request.endpoint_id.endpoint_uuid.uuid # pylint: disable=maybe-no-member
kpi_service_id = _create_kpi_request.service_id.service_uuid.uuid # pylint: disable=maybe-no-member
response = sql_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id)
response = management_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id)
assert isinstance(response, int)
def test_sqlitedb_tools_get_kpi(sql_db): # pylint: disable=redefined-outer-name
LOGGER.warning('test_sqlitedb_tools_get_kpi begin')
def test_managementdb_tools_get_kpi(management_db): # pylint: disable=redefined-outer-name
LOGGER.warning('test_managementdb_tools_get_kpi begin')
_create_kpi_request = create_kpi_request()
kpi_description = _create_kpi_request.kpi_description # pylint: disable=maybe-no-member
kpi_sample_type = _create_kpi_request.kpi_sample_type # pylint: disable=maybe-no-member
......@@ -263,52 +408,32 @@ def test_sqlitedb_tools_get_kpi(sql_db): # pylint: disable=redefined-outer-name
kpi_endpoint_id = _create_kpi_request.endpoint_id.endpoint_uuid.uuid # pylint: disable=maybe-no-member
kpi_service_id = _create_kpi_request.service_id.service_uuid.uuid # pylint: disable=maybe-no-member
_kpi_id = sql_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id)
response = sql_db.get_KPI(_kpi_id)
_kpi_id = management_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id)
response = management_db.get_KPI(_kpi_id)
assert isinstance(response, tuple)
def test_sqlitedb_tools_get_kpis(sql_db): # pylint: disable=redefined-outer-name
LOGGER.warning('test_sqlitedb_tools_get_kpis begin')
response = sql_db.get_KPIS()
def test_managementdb_tools_get_kpis(management_db): # pylint: disable=redefined-outer-name
    """Fetching all KPIs from the management DB must return a list."""
    LOGGER.warning('test_managementdb_tools_get_kpis begin')
    kpi_rows = management_db.get_KPIS()
    assert isinstance(kpi_rows, list)
def test_sqlitedb_tools_delete_kpi(sql_db): # pylint: disable=redefined-outer-name
LOGGER.warning('test_sqlitedb_tools_get_kpi begin')
response = sql_db.delete_KPI("DEV1",KpiSampleType.KPISAMPLETYPE_PACKETS_TRANSMITTED)
if not response:
_create_kpi_request = create_kpi_request()
kpi_description = _create_kpi_request.kpi_description # pylint: disable=maybe-no-member
kpi_sample_type = _create_kpi_request.kpi_sample_type # pylint: disable=maybe-no-member
kpi_device_id = _create_kpi_request.device_id.device_uuid.uuid # pylint: disable=maybe-no-member
kpi_endpoint_id = _create_kpi_request.endpoint_id.endpoint_uuid.uuid # pylint: disable=maybe-no-member
kpi_service_id = _create_kpi_request.service_id.service_uuid.uuid # pylint: disable=maybe-no-member
def test_managementdb_tools_delete_kpi(management_db): # pylint: disable=redefined-outer-name
LOGGER.warning('test_managementdb_tools_get_kpi begin')
sql_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id)
response = sql_db.delete_KPI("DEV1", KpiSampleType.KPISAMPLETYPE_PACKETS_TRANSMITTED)
assert response
def test_sqlitedb_tools_delete_kpid_id(sql_db): # pylint: disable=redefined-outer-name
LOGGER.warning('test_sqlitedb_tools_delete_kpid_id begin')
response = sql_db.delete_kpid_id(1)
_create_kpi_request = create_kpi_request()
kpi_description = _create_kpi_request.kpi_description # pylint: disable=maybe-no-member
kpi_sample_type = _create_kpi_request.kpi_sample_type # pylint: disable=maybe-no-member
kpi_device_id = _create_kpi_request.device_id.device_uuid.uuid # pylint: disable=maybe-no-member
kpi_endpoint_id = _create_kpi_request.endpoint_id.endpoint_uuid.uuid # pylint: disable=maybe-no-member
kpi_service_id = _create_kpi_request.service_id.service_uuid.uuid # pylint: disable=maybe-no-member
if not response:
_create_kpi_request = create_kpi_request()
kpi_description = _create_kpi_request.kpi_description # pylint: disable=maybe-no-member
kpi_sample_type = _create_kpi_request.kpi_sample_type # pylint: disable=maybe-no-member
kpi_device_id = _create_kpi_request.device_id.device_uuid.uuid # pylint: disable=maybe-no-member
kpi_endpoint_id = _create_kpi_request.endpoint_id.endpoint_uuid.uuid # pylint: disable=maybe-no-member
kpi_service_id = _create_kpi_request.service_id.service_uuid.uuid # pylint: disable=maybe-no-member
_kpi_id = management_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id,
kpi_service_id)
_kpi_id = sql_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id)
response = sql_db.delete_kpid_id(_kpi_id)
response = management_db.delete_KPI(_kpi_id)
assert response
def test_metrics_db_tools_write_kpi(metrics_db): # pylint: disable=redefined-outer-name
LOGGER.warning('test_metric_sdb_tools_write_kpi begin')
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment