diff --git a/proto/monitoring.proto b/proto/monitoring.proto index 8b83afa47b49c130d37dcbcc1024f079ebc2a2fe..9be39db909d915b2a9b5d99b01841db028959543 100644 --- a/proto/monitoring.proto +++ b/proto/monitoring.proto @@ -19,24 +19,24 @@ import "context.proto"; import "kpi_sample_types.proto"; service MonitoringService { - rpc SetKpi (KpiDescriptor ) returns (KpiId ) {} - rpc DeleteKpi (KpiId ) returns (context.Empty ) {} - rpc GetKpiDescriptor (KpiId ) returns (KpiDescriptor ) {} - rpc GetKpiDescriptorList (context.Empty ) returns (KpiDescriptorList ) {} - rpc IncludeKpi (Kpi ) returns (context.Empty ) {} - rpc MonitorKpi (MonitorKpiRequest ) returns (context.Empty ) {} - rpc QueryKpiData (KpiQuery ) returns (KpiList ) {} - rpc SetKpiSubscription (SubsDescriptor ) returns (stream KpiList ) {} - rpc GetSubsDescriptor (SubscriptionID ) returns (SubsDescriptor ) {} - rpc GetSubscriptions (context.Empty ) returns (SubsIDList ) {} - rpc DeleteSubscription (SubscriptionID ) returns (context.Empty ) {} - rpc SetKpiAlarm (AlarmDescriptor ) returns (AlarmID ) {} - rpc GetAlarms (context.Empty ) returns (AlarmIDList ) {} - rpc GetAlarmDescriptor (AlarmID ) returns (AlarmDescriptor ) {} - rpc GetAlarmResponseStream(AlarmSubscription ) returns (stream AlarmResponse) {} - rpc DeleteAlarm (AlarmID ) returns (context.Empty ) {} - rpc GetStreamKpi (KpiId ) returns (stream Kpi ) {} - rpc GetInstantKpi (KpiId ) returns (KpiList ) {} + rpc SetKpi (KpiDescriptor ) returns (KpiId ) {} // Stable not final + rpc DeleteKpi (KpiId ) returns (context.Empty ) {} // Stable and final + rpc GetKpiDescriptor (KpiId ) returns (KpiDescriptor ) {} // Stable and final + rpc GetKpiDescriptorList (context.Empty ) returns (KpiDescriptorList ) {} // Stable and final + rpc IncludeKpi (Kpi ) returns (context.Empty ) {} // Stable and final + rpc MonitorKpi (MonitorKpiRequest ) returns (context.Empty ) {} // Stable and final + rpc QueryKpiData (KpiQuery ) returns (KpiList ) {} // Not implemented + rpc SetKpiSubscription (SubsDescriptor ) returns (stream SubsResponse ) {} // Stable not final + rpc GetSubsDescriptor (SubscriptionID ) returns (SubsDescriptor ) {} // Stable and final + rpc GetSubscriptions (context.Empty ) returns (SubsList ) {} // Stable and final + rpc DeleteSubscription (SubscriptionID ) returns (context.Empty ) {} // Stable and final + rpc SetKpiAlarm (AlarmDescriptor ) returns (AlarmID ) {} // Stable not final + rpc GetAlarms (context.Empty ) returns (AlarmList ) {} // Stable and final + rpc GetAlarmDescriptor (AlarmID ) returns (AlarmDescriptor ) {} // Stable and final + rpc GetAlarmResponseStream(AlarmSubscription ) returns (stream AlarmResponse) {} // Not Stable not final + rpc DeleteAlarm (AlarmID ) returns (context.Empty ) {} // Stable and final + rpc GetStreamKpi (KpiId ) returns (stream Kpi ) {} // Stable not final + rpc GetInstantKpi (KpiId ) returns (Kpi ) {} // Stable not final } message KpiDescriptor { @@ -58,7 +58,7 @@ message MonitorKpiRequest { } message KpiQuery { - repeated KpiId kpi_id = 1; + KpiId kpi_id = 1; float monitoring_window_s = 2; float sampling_rate_s = 3; uint32 last_n_samples = 4; // used when you want something like "get the last N many samples @@ -99,7 +99,7 @@ message KpiValue { message KpiList { - repeated Kpi kpi_list = 1; + repeated Kpi kpi = 1; } message KpiDescriptorList { @@ -122,19 +122,19 @@ message SubscriptionID { message SubsResponse { SubscriptionID subs_id = 1; - repeated KpiList kpi_list = 2; + KpiList kpi_list = 2; } -message SubsIDList { - repeated SubscriptionID subs_list = 1; +message SubsList { + repeated SubsDescriptor subs_descriptor = 1; } message AlarmDescriptor { AlarmID alarm_id = 1; string alarm_description = 2; string name = 3; - repeated KpiId kpi_id = 4; - repeated KpiValueRange kpi_value_range = 5; + KpiId kpi_id = 4; + KpiValueRange kpi_value_range = 5; context.Timestamp timestamp = 6; } @@ -143,7 +143,7 @@ message AlarmID{ } message AlarmSubscription{ - AlarmID alarmID = 1; + AlarmID alarm_id = 1; float subscription_timeout_s = 2; float subscription_frequency_ms = 3; } @@ -151,10 +151,9 @@ message AlarmSubscription{ message AlarmResponse { AlarmID alarm_id = 1; string text = 2; - KpiValue kpi_value = 3; - context.Timestamp timestamp = 4; + KpiList kpi_list = 3; } -message AlarmIDList { - repeated AlarmID alarm_list = 1; +message AlarmList { + repeated AlarmDescriptor alarm_descriptor = 1; } diff --git a/src/monitoring/client/MonitoringClient.py b/src/monitoring/client/MonitoringClient.py index f65072f19013b820312aa56b7f0062f9c95f712c..73607a081cd57e7c62b9c4e2c5e487868e72d189 100644 --- a/src/monitoring/client/MonitoringClient.py +++ b/src/monitoring/client/MonitoringClient.py @@ -21,8 +21,8 @@ from common.tools.client.RetryDecorator import retry, delay_exponential from common.tools.grpc.Tools import grpc_message_to_json_string from common.proto.context_pb2 import Empty from common.proto.monitoring_pb2 import Kpi, KpiDescriptor, KpiId, MonitorKpiRequest, \ - KpiDescriptorList, KpiQuery, KpiList, SubsDescriptor, SubscriptionID, SubsIDList, \ - AlarmDescriptor, AlarmID, AlarmIDList, AlarmResponse, AlarmSubscription + KpiDescriptorList, KpiQuery, KpiList, SubsDescriptor, SubscriptionID, SubsList, \ + SubsResponse, AlarmDescriptor, AlarmID, AlarmList, AlarmResponse, AlarmSubscription from common.proto.monitoring_pb2_grpc import MonitoringServiceStub LOGGER = logging.getLogger(__name__) @@ -100,10 +100,10 @@ class MonitoringClient: return response @RETRY_DECORATOR - def SubscribeKpi(self, request : SubsDescriptor) -> Iterator[KpiList]: - LOGGER.debug('SubscribeKpi: {:s}'.format(grpc_message_to_json_string(request))) - response = self.stub.SubscribeKpi(request) - LOGGER.debug('SubscribeKpi result: {:s}'.format(grpc_message_to_json_string(response))) + def SetKpiSubscription(self, request : SubsDescriptor) -> Iterator[SubsResponse]: + LOGGER.debug('SetKpiSubscription: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.SetKpiSubscription(request) + LOGGER.debug('SetKpiSubscription result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR @@ -114,7 +114,7 @@ class MonitoringClient: return response @RETRY_DECORATOR - def GetSubscriptions(self, request : Empty) -> SubsIDList: + def GetSubscriptions(self, request : Empty) -> SubsList: LOGGER.debug('GetSubscriptions: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.GetSubscriptions(request) LOGGER.debug('GetSubscriptions result: {:s}'.format(grpc_message_to_json_string(response))) @@ -135,7 +135,7 @@ class MonitoringClient: return response @RETRY_DECORATOR - def GetAlarms(self, request : Empty) -> AlarmIDList: + def GetAlarms(self, request : Empty) -> AlarmList: LOGGER.debug('GetAlarms: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.GetAlarms(request) LOGGER.debug('GetAlarms result: {:s}'.format(grpc_message_to_json_string(response))) diff --git a/src/monitoring/requirements.in b/src/monitoring/requirements.in index 50f283a1940ed99d16276857d2cab22220921879..c07f0c8f4079482a20a138d190004fa314fc9860 100644 --- a/src/monitoring/requirements.in +++ b/src/monitoring/requirements.in @@ -17,6 +17,7 @@ redis==4.1.2 requests==2.27.1 xmltodict==0.12.0 questdb==1.0.1 +psycopg2-binary==2.9.3 # pip's dependency resolver does not take into account installed packages. # p4runtime does not specify the version of grpcio/protobuf it needs, so it tries to install latest one diff --git a/src/monitoring/service/AlarmManager.py b/src/monitoring/service/AlarmManager.py new file mode 100644 index 0000000000000000000000000000000000000000..e5ac8915c3728c7894dc70ab901215dd5a7feb41 --- /dev/null +++ b/src/monitoring/service/AlarmManager.py @@ -0,0 +1,32 @@ +from apscheduler.schedulers.background import BackgroundScheduler +from apscheduler.executors.pool import ProcessPoolExecutor +from apscheduler.jobstores.base import JobLookupError +from datetime import datetime +import time +import logging + +LOGGER = logging.getLogger(__name__) + +class AlarmManager(): + def __init__(self, metrics_db): + self.metrics_db = metrics_db + self.scheduler = BackgroundScheduler(executors={'processpool': ProcessPoolExecutor(max_workers=20)}) + self.scheduler.start() + LOGGER.info("Alarm Manager Initialized") + + def create_alarm(self, alarm_queue,alarm_id, kpi_id, kpiMinValue, kpiMaxValue, inRange, includeMinValue, includeMaxValue, subscription_frequency_ms, subscription_timeout_s=None): + start_date=None + end_date=None + if subscription_timeout_s: + start_timestamp=time.time() + start_date=datetime.fromtimestamp(start_timestamp) + end_date=datetime.fromtimestamp(start_timestamp+subscription_timeout_s) + self.scheduler.add_job(self.metrics_db.get_alarm_data, args=(alarm_queue,kpi_id, kpiMinValue, kpiMaxValue, inRange, includeMinValue, includeMaxValue, subscription_frequency_ms),trigger='interval', seconds=(subscription_frequency_ms/1000), start_date=start_date, end_date=end_date, id=alarm_id) + LOGGER.debug(f"Alarm job {alarm_id} succesfully created") + + def delete_alarm(self, alarm_id): + try: + self.scheduler.remove_job(alarm_id) + LOGGER.debug(f"Alarm job {alarm_id} succesfully deleted") + except (Exception, JobLookupError) as e: + LOGGER.debug(f"Alarm job {alarm_id} does not exists") diff --git a/src/monitoring/service/ManagementDBTools.py b/src/monitoring/service/ManagementDBTools.py new file mode 100644 index 0000000000000000000000000000000000000000..2387ddde0ab9eecea6c8fc982ba97a94f1a88c98 --- /dev/null +++ b/src/monitoring/service/ManagementDBTools.py @@ -0,0 +1,248 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sqlite3 +import logging + +LOGGER = logging.getLogger(__name__) + +class ManagementDB(): + def __init__(self, database): + try: + self.client = sqlite3.connect(database, check_same_thread=False) + self.create_monitoring_table() + self.create_subscription_table() + self.create_alarm_table() + LOGGER.info("ManagementDB initialized") + except: + LOGGER.info("ManagementDB cannot be initialized") + raise Exception("Critical error in the monitoring component") + + def create_monitoring_table(self): + try: + result=self.client.execute(""" + CREATE TABLE IF NOT EXISTS kpi( + kpi_id INTEGER PRIMARY KEY AUTOINCREMENT, + kpi_description TEXT, + kpi_sample_type INTEGER, + device_id INTEGER, + endpoint_id INTEGER, + service_id INTEGER + ); + """) + LOGGER.debug("KPI table created in the ManagementDB") + except sqlite3.Error as e: + LOGGER.debug(f"KPI table cannot be created in the ManagementD. {e}") + raise Exception + + def create_subscription_table(self): + try: + result= self.client.execute(""" + CREATE TABLE IF NOT EXISTS subscription( + subs_id INTEGER PRIMARY KEY AUTOINCREMENT, + kpi_id INTEGER, + subscriber TEXT, + sampling_duration_s REAL, + sampling_interval_s REAL, + start_timestamp REAL, + end_timestamp REAL + ); + """) + LOGGER.info("Subscription table created in the ManagementDB") + except sqlite3.Error as e: + LOGGER.debug(f"Subscription table cannot be created in the ManagementDB. {e}") + raise Exception + + def create_alarm_table(self): + try: + result=self.client.execute(""" + CREATE TABLE IF NOT EXISTS alarm( + alarm_id INTEGER PRIMARY KEY AUTOINCREMENT, + alarm_description TEXT, + alarm_name TEXT, + kpi_id INTEGER, + kpi_min_value REAL, + kpi_max_value REAL, + in_range INTEGER, + include_min_value INTEGER, + include_max_value INTEGER + ); + """) + LOGGER.info("Alarm table created in the ManagementDB") + except sqlite3.Error as e: + LOGGER.debug(f"Alarm table cannot be created in the ManagementDB. {e}") + raise Exception + + def insert_KPI(self,kpi_description,kpi_sample_type,device_id,endpoint_id,service_id): + try: + c = self.client.cursor() + c.execute("SELECT kpi_id FROM kpi WHERE device_id is ? AND kpi_sample_type is ? AND endpoint_id is ? AND service_id is ?",(device_id,kpi_sample_type,endpoint_id,service_id)) + data=c.fetchone() + if data is None: + c.execute("INSERT INTO kpi (kpi_description,kpi_sample_type,device_id,endpoint_id,service_id) VALUES (?,?,?,?,?)", (kpi_description,kpi_sample_type,device_id,endpoint_id,service_id)) + self.client.commit() + kpi_id = c.lastrowid + LOGGER.debug(f"KPI {kpi_id} succesfully inserted in the ManagementDB") + return kpi_id + else: + kpi_id = data[0] + LOGGER.debug(f"KPI {kpi_id} already exists") + return kpi_id + except sqlite3.Error as e: + LOGGER.debug("KPI cannot be inserted in the ManagementDB: {e}") + + def insert_subscription(self,kpi_id,subscriber,sampling_duration_s,sampling_interval_s,start_timestamp, end_timestamp): + try: + c = self.client.cursor() + c.execute("SELECT subs_id FROM subscription WHERE kpi_id is ? AND subscriber is ? AND sampling_duration_s is ? AND sampling_interval_s is ? AND start_timestamp is ? AND end_timestamp is ?",(kpi_id,subscriber,sampling_duration_s,sampling_interval_s,start_timestamp, end_timestamp)) + data=c.fetchone() + if data is None: + c.execute("INSERT INTO subscription (kpi_id,subscriber,sampling_duration_s,sampling_interval_s,start_timestamp, end_timestamp) VALUES (?,?,?,?,?,?)", (kpi_id,subscriber,sampling_duration_s,sampling_interval_s,start_timestamp, end_timestamp)) + self.client.commit() + subs_id = c.lastrowid + LOGGER.debug(f"Subscription {subs_id} succesfully inserted in the ManagementDB") + return subs_id + else: + subs_id = data[0] + LOGGER.debug(f"Subscription {subs_id} already exists") + return subs_id + except sqlite3.Error as e: + LOGGER.debug("Subscription cannot be inserted in the ManagementDB: {e}") + + def insert_alarm(self,alarm_description,alarm_name,kpi_id,kpi_min_value,kpi_max_value,in_range,include_min_value,include_max_value): + try: + c = self.client.cursor() + c.execute("SELECT alarm_id FROM alarm WHERE alarm_description is ? AND alarm_name is ? AND kpi_id is ? AND kpi_min_value is ? AND kpi_max_value is ? AND in_range is ? AND include_min_value is ? AND include_max_value is ?",(alarm_description,alarm_name,kpi_id,kpi_min_value,kpi_max_value,in_range,include_min_value,include_max_value)) + data=c.fetchone() + if data is None: + c.execute("INSERT INTO alarm (alarm_description, alarm_name, kpi_id, kpi_min_value, kpi_max_value, in_range, include_min_value, include_max_value) VALUES (?,?,?,?,?,?,?,?)", (alarm_description,alarm_name,kpi_id,kpi_min_value,kpi_max_value,in_range,include_min_value,include_max_value)) + self.client.commit() + alarm_id=c.lastrowid + LOGGER.debug(f"Alarm {alarm_id} succesfully inserted in the ManagementDB") + return alarm_id + else: + alarm_id=data[0] + LOGGER.debug(f"Alarm {alarm_id} already exists") + return alarm_id + except sqlite3.Error as e: + LOGGER.debug(f"Alarm cannot be inserted in the ManagementDB: {e}") + + def delete_KPI(self,kpi_id): + try: + c = self.client.cursor() + c.execute("SELECT * FROM kpi WHERE kpi_id is ?",(kpi_id,)) + data=c.fetchone() + if data is None: + LOGGER.debug(f"KPI {kpi_id} does not exists") + return False + else: + c.execute("DELETE FROM kpi WHERE kpi_id is ?",(kpi_id,)) + self.client.commit() + LOGGER.debug(f"KPI {kpi_id} deleted from the ManagementDB") + return True + except sqlite3.Error as e: + LOGGER.debug(f"KPI cannot be deleted from the ManagementDB: {e}") + + def delete_subscription(self,subs_id): + try: + c = self.client.cursor() + c.execute("SELECT * FROM subscription WHERE subs_id is ?",(subs_id,)) + data=c.fetchone() + if data is None: + LOGGER.debug(f"Subscription {subs_id} does not exists") + return False + else: + c.execute("DELETE FROM subscription WHERE subs_id is ?",(subs_id,)) + self.client.commit() + LOGGER.debug(f"Subscription {subs_id} deleted from the ManagementDB") + return True + except sqlite3.Error as e: + LOGGER.debug(f"Subscription cannot be deleted from the ManagementDB: {e}") + + def delete_alarm(self,alarm_id): + try: + c = self.client.cursor() + c.execute("SELECT * FROM alarm WHERE alarm_id is ?",(alarm_id,)) + data=c.fetchone() + if data is None: + LOGGER.debug(f"Alarm {alarm_id} does not exists") + return False + else: + c.execute("DELETE FROM alarm WHERE alarm_id is ?",(alarm_id,)) + self.client.commit() + LOGGER.debug(f"Alarm {alarm_id} deleted from the ManagementDB") + return True + except sqlite3.Error as e: + LOGGER.debug(f"Alarm cannot be deleted from the ManagementDB: {e}") + + def get_KPI(self,kpi_id): + try: + data = self.client.execute("SELECT * FROM kpi WHERE kpi_id is ?",(kpi_id,)).fetchone() + if data: + LOGGER.debug(f"KPI {kpi_id} succesfully retrieved from the ManagementDB") + return data + else: + LOGGER.debug(f"KPI {kpi_id} does not exists") + return data + except sqlite3.Error as e: + LOGGER.debug(f"KPI {kpi_id} cannot be retrieved from the ManagementDB: {e}") + + def get_subscription(self,subs_id): + try: + data = self.client.execute("SELECT * FROM subscription WHERE subs_id is ?",(subs_id,)).fetchone() + if data: + LOGGER.debug(f"Subscription {subs_id} succesfully retrieved from the ManagementDB") + return data + else: + LOGGER.debug(f"Subscription {subs_id} does not exists") + return data + except sqlite3.Error as e: + LOGGER.debug(f"Subscription {subs_id} cannot be retrieved from the ManagementDB: {e}") + + def get_alarm(self,alarm_id): + try: + data = self.client.execute("SELECT * FROM alarm WHERE alarm_id is ?",(alarm_id,)).fetchone() + if data: + LOGGER.debug(f"Alarm {alarm_id} succesfully retrieved from the ManagementDB") + return data + else: + print(data) + LOGGER.debug(f"Alarm {alarm_id} does not exists") + return data + except sqlite3.Error as e: + LOGGER.debug(f"Alarm {alarm_id} cannot be retrieved from the ManagementDB: {e}") + + def get_KPIS(self): + try: + data = self.client.execute("SELECT * FROM kpi").fetchall() + LOGGER.debug(f"KPIs succesfully retrieved from the ManagementDB") + return data + except sqlite3.Error as e: + LOGGER.debug(f"KPIs cannot be retrieved from the ManagementDB: {e}") + + def get_subscriptions(self): + try: + data = self.client.execute("SELECT * FROM subscription").fetchall() + LOGGER.debug(f"Subscriptions succesfully retrieved from the ManagementDB") + return data + except sqlite3.Error as e: + LOGGER.debug(f"Subscriptions cannot be retrieved from the ManagementDB: {e}") + + def get_alarms(self): + try: + data = self.client.execute("SELECT * FROM alarm").fetchall() + LOGGER.debug(f"Alarms succesfully retrieved from the ManagementDB") + return data + except sqlite3.Error as e: + LOGGER.debug(f"Alarms cannot be retrieved from the ManagementDB: {e}") \ No newline at end of file diff --git a/src/monitoring/service/MetricsDBTools.py b/src/monitoring/service/MetricsDBTools.py index dc194c430c9700a2d89e0757c75c64025082ac29..16e6373f542656b4c172c8d619bf3f17ca5df404 100644 --- a/src/monitoring/service/MetricsDBTools.py +++ b/src/monitoring/service/MetricsDBTools.py @@ -12,64 +12,261 @@ # See the License for the specific language governing permissions and # limitations under the License. +import time +from random import random + from questdb.ingress import Sender, IngressError import requests import json import logging import datetime +from common.tools.timestamp.Converters import timestamp_float_to_string, timestamp_utcnow_to_float +import psycopg2 LOGGER = logging.getLogger(__name__) + class MetricsDB(): - def __init__(self, host, ilp_port, rest_port, table): - self.host=host - self.ilp_port=int(ilp_port) - self.rest_port=rest_port - self.table=table - self.create_table() - - def write_KPI(self,time,kpi_id,kpi_sample_type,device_id,endpoint_id,service_id,kpi_value): - counter=0 - number_of_retries=10 - while (counter= kpiMinValue and kpi_value <= kpiMaxValue): + alarm = True + elif ( + inRange and kpiMinValue is not None and kpiMaxValue is not None and includeMinValue and not includeMaxValue): + if (kpi_value >= kpiMinValue and kpi_value < kpiMaxValue): + alarm = True + elif ( + inRange and kpiMinValue is not None and kpiMaxValue is not None and not includeMinValue and includeMaxValue): + if (kpi_value > kpiMinValue and kpi_value <= kpiMaxValue): + alarm = True + elif ( + inRange and kpiMinValue is not None and kpiMaxValue is not None and not includeMinValue and not includeMaxValue): + if (kpi_value > kpiMinValue and kpi_value < kpiMaxValue): + alarm = True + elif ( + not inRange and kpiMinValue is not None and kpiMaxValue is not None and includeMinValue and includeMaxValue): + if (kpi_value <= kpiMinValue or kpi_value >= kpiMaxValue): + alarm = True + elif ( + not inRange and kpiMinValue is not None and kpiMaxValue is not None and includeMinValue and not includeMaxValue): + if (kpi_value <= kpiMinValue or kpi_value > kpiMaxValue): + alarm = True + elif ( + not inRange and kpiMinValue is not None and kpiMaxValue is not None and not includeMinValue and includeMaxValue): + if (kpi_value < kpiMinValue or kpi_value >= kpiMaxValue): + alarm = True + elif ( + not inRange and kpiMinValue is not None and kpiMaxValue is not None and not includeMinValue and not includeMaxValue): + if (kpi_value < kpiMinValue or kpi_value > kpiMaxValue): + alarm = True + elif (inRange and kpiMinValue is not None and kpiMaxValue is None and includeMinValue): + if (kpi_value >= kpiMinValue): + alarm = True + elif (inRange and kpiMinValue is not None and kpiMaxValue is None and not includeMinValue): + if (kpi_value > kpiMinValue): + alarm = True + elif (not inRange and kpiMinValue is not None and kpiMaxValue is None and not includeMinValue): + if (kpi_value <= kpiMinValue): + alarm = True + elif (not inRange and kpiMinValue is not None and kpiMaxValue is None and not includeMinValue): + if (kpi_value <= kpiMinValue): + alarm = True + elif (inRange and kpiMinValue is None and kpiMaxValue is not None and includeMaxValue): + if (kpi_value <= kpiMaxValue): + alarm = True + elif (inRange and kpiMinValue is None and kpiMaxValue is not None and not includeMaxValue): + if (kpi_value < kpiMaxValue): + alarm = True + elif (not inRange and kpiMinValue is None and kpiMaxValue is not None and not includeMaxValue): + if (kpi_value >= kpiMaxValue): + alarm = True + elif (not inRange and kpiMinValue is None and kpiMaxValue is not None and not includeMaxValue): + if (kpi_value >= kpiMaxValue): + alarm = True + if alarm: + # queue.append[kpi] + alarm_queue.put_nowait(kpi) + LOGGER.debug(f"Alarm of KPI {kpi_id} triggered -> kpi_value:{kpi[2]}, timestamp:{kpi[1]}") + else: + LOGGER.debug(f"No new data for the alarm of KPI {kpi_id}") + except (Exception) as e: + LOGGER.debug(f"Alarm data cannot be retrieved. {e}") \ No newline at end of file diff --git a/src/monitoring/service/MonitoringService.py b/src/monitoring/service/MonitoringService.py index 1a79ef9c131f8c24e50e62423a06181b4164753b..e2cbe2862894aec7b571ae857ad4c4fffa3c94c6 100644 --- a/src/monitoring/service/MonitoringService.py +++ b/src/monitoring/service/MonitoringService.py @@ -16,7 +16,7 @@ from common.Constants import ServiceNameEnum from common.Settings import get_service_port_grpc from common.proto.monitoring_pb2_grpc import add_MonitoringServiceServicer_to_server from common.tools.service.GenericGrpcService import GenericGrpcService -from .MonitoringServiceServicerImpl import MonitoringServiceServicerImpl +from monitoring.service.MonitoringServiceServicerImpl import MonitoringServiceServicerImpl class MonitoringService(GenericGrpcService): def __init__(self, cls_name: str = __name__) -> None: diff --git a/src/monitoring/service/MonitoringServiceServicerImpl.py b/src/monitoring/service/MonitoringServiceServicerImpl.py index df3b907415aabe0ed4c276ac6ac09582636ebe6b..7cd47f187986a0c32eea2ac8405183ac4418d100 100644 --- a/src/monitoring/service/MonitoringServiceServicerImpl.py +++ b/src/monitoring/service/MonitoringServiceServicerImpl.py @@ -12,7 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -import os, grpc, logging +import os, grpc +from queue import Queue from typing import Iterator @@ -23,16 +24,20 @@ from common.proto.context_pb2 import Empty from common.proto.device_pb2 import MonitoringSettings from common.proto.kpi_sample_types_pb2 import KpiSampleType from common.proto.monitoring_pb2_grpc import MonitoringServiceServicer -from common.proto.monitoring_pb2 import AlarmResponse, AlarmDescriptor, AlarmIDList, SubsIDList, KpiId, \ +from common.proto.monitoring_pb2 import AlarmResponse, AlarmDescriptor, AlarmList, SubsList, KpiId, \ KpiDescriptor, KpiList, KpiQuery, SubsDescriptor, SubscriptionID, AlarmID, KpiDescriptorList, \ - MonitorKpiRequest, Kpi, AlarmSubscription + MonitorKpiRequest, Kpi, AlarmSubscription, SubsResponse from common.rpc_method_wrapper.ServiceExceptions import ServiceException +from common.tools.timestamp.Converters import timestamp_string_to_float -from monitoring.service import SqliteTools, MetricsDBTools +from monitoring.service import ManagementDBTools, MetricsDBTools from device.client.DeviceClient import DeviceClient from prometheus_client import Counter, Summary +from monitoring.service.AlarmManager import AlarmManager +from monitoring.service.SubscriptionManager import SubscriptionManager + LOGGER = getJSONLogger('monitoringservice-server') LOGGER.setLevel('DEBUG') @@ -40,14 +45,14 @@ MONITORING_GETINSTANTKPI_REQUEST_TIME = Summary( 'monitoring_getinstantkpi_processing_seconds', 'Time spent processing monitoring instant kpi request') MONITORING_INCLUDEKPI_COUNTER = Counter('monitoring_includekpi_counter', 'Monitoring include kpi request counter') -METRICSDB_HOSTNAME = os.environ.get("METRICSDB_HOSTNAME") -METRICSDB_ILP_PORT = os.environ.get("METRICSDB_ILP_PORT") +METRICSDB_HOSTNAME = os.environ.get("METRICSDB_HOSTNAME") +METRICSDB_ILP_PORT = os.environ.get("METRICSDB_ILP_PORT") METRICSDB_REST_PORT = os.environ.get("METRICSDB_REST_PORT") -METRICSDB_TABLE = os.environ.get("METRICSDB_TABLE") - +METRICSDB_TABLE = os.environ.get("METRICSDB_TABLE") -DEVICESERVICE_SERVICE_HOST = get_setting('DEVICESERVICE_SERVICE_HOST', default=get_service_host(ServiceNameEnum.DEVICE) ) -DEVICESERVICE_SERVICE_PORT_GRPC = get_setting('DEVICESERVICE_SERVICE_PORT_GRPC', default=get_service_port_grpc(ServiceNameEnum.DEVICE)) +DEVICESERVICE_SERVICE_HOST = get_setting('DEVICESERVICE_SERVICE_HOST', default=get_service_host(ServiceNameEnum.DEVICE)) +DEVICESERVICE_SERVICE_PORT_GRPC = get_setting('DEVICESERVICE_SERVICE_PORT_GRPC', + default=get_service_port_grpc(ServiceNameEnum.DEVICE)) class MonitoringServiceServicerImpl(MonitoringServiceServicer): @@ -55,34 +60,41 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): LOGGER.info('Init monitoringService') # Init sqlite monitoring db - self.sql_db = SqliteTools.SQLite('monitoring.db') - self.deviceClient = DeviceClient(host=DEVICESERVICE_SERVICE_HOST, port=DEVICESERVICE_SERVICE_PORT_GRPC) # instantiate the client - - self.metrics_db = MetricsDBTools.MetricsDB(METRICSDB_HOSTNAME,METRICSDB_ILP_PORT,METRICSDB_REST_PORT,METRICSDB_TABLE) + self.management_db = ManagementDBTools.ManagementDB('monitoring.db') + self.deviceClient = DeviceClient(host=DEVICESERVICE_SERVICE_HOST, + port=DEVICESERVICE_SERVICE_PORT_GRPC) # instantiate the client + + self.metrics_db = MetricsDBTools.MetricsDB(METRICSDB_HOSTNAME, METRICSDB_ILP_PORT, METRICSDB_REST_PORT, + METRICSDB_TABLE) + self.subs_manager = SubscriptionManager(self.metrics_db) + self.alarm_manager = AlarmManager(self.metrics_db) LOGGER.info('MetricsDB initialized') # SetKpi (SetKpiRequest) returns (KpiId) {} def SetKpi( - self, request : KpiDescriptor, grpc_context : grpc.ServicerContext + self, request: KpiDescriptor, grpc_context: grpc.ServicerContext ) -> KpiId: # CREATEKPI_COUNTER_STARTED.inc() LOGGER.info('SetKpi') try: # Here the code to create a sqlite query to crete a KPI and return a KpiID - kpi_id = KpiId() + response = KpiId() kpi_description = request.kpi_description kpi_sample_type = request.kpi_sample_type - kpi_device_id = request.device_id.device_uuid.uuid + kpi_device_id = request.device_id.device_uuid.uuid kpi_endpoint_id = request.endpoint_id.endpoint_uuid.uuid - kpi_service_id = request.service_id.service_uuid.uuid + kpi_service_id = request.service_id.service_uuid.uuid - data = self.sql_db.insert_KPI( - kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id) + if request.kpi_id.kpi_id.uuid is not "": + response.kpi_id.uuid = request.kpi_id.kpi_id.uuid + # Here the code to modify an existing kpi + else: + data = self.management_db.insert_KPI( + kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id) + response.kpi_id.uuid = str(data) - kpi_id.kpi_id.uuid = str(data) - # CREATEKPI_COUNTER_COMPLETED.inc() - return kpi_id + return response except ServiceException as e: LOGGER.exception('SetKpi exception') # CREATEKPI_COUNTER_FAILED.inc() @@ -92,11 +104,17 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): # CREATEKPI_COUNTER_FAILED.inc() grpc_context.abort(grpc.StatusCode.INTERNAL, str(e)) - def DeleteKpi ( self, request : KpiId, grpc_context : grpc.ServicerContext) -> Empty: + def DeleteKpi(self, request: KpiId, grpc_context: grpc.ServicerContext) -> Empty: LOGGER.info('DeleteKpi') try: - # TBC + LOGGER.debug(f'DeleteKpi with KpiID: {request.kpi_id.uuid}') + kpi_id = int(request.kpi_id.uuid) + kpi = self.management_db.get_KPI(kpi_id) + if kpi: + self.management_db.delete_KPI(kpi_id) + else: + LOGGER.info('DeleteKpi error: KpiID({:s}): not found in database'.format(str(kpi_id))) return Empty() except ServiceException as e: LOGGER.exception('DeleteKpi exception') @@ -104,71 +122,77 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): except Exception as e: # pragma: no cover LOGGER.exception('DeleteKpi exception') - def GetKpiDescriptorList ( self, request : Empty, grpc_context : grpc.ServicerContext) -> KpiDescriptorList: - - LOGGER.info('GetKpiDescriptorList') + def GetKpiDescriptor(self, request: KpiId, grpc_context: grpc.ServicerContext) -> KpiDescriptor: + LOGGER.info('getting Kpi by KpiID') try: - # TBC - return KpiDescriptorList() + kpi_id = request.kpi_id.uuid + kpi_db = self.management_db.get_KPI(int(kpi_id)) + kpiDescriptor = KpiDescriptor() + if kpi_db is None: + LOGGER.info('GetKpiDescriptor error: KpiID({:s}): not found in database'.format(str(kpi_id))) + else: + kpiDescriptor.kpi_description = kpi_db[1] + kpiDescriptor.kpi_sample_type = kpi_db[2] + kpiDescriptor.device_id.device_uuid.uuid = str(kpi_db[3]) + kpiDescriptor.endpoint_id.endpoint_uuid.uuid = str(kpi_db[4]) + kpiDescriptor.service_id.service_uuid.uuid = str(kpi_db[5]) + return kpiDescriptor except ServiceException as e: - LOGGER.exception('GetKpiDescriptorList exception') + LOGGER.exception('GetKpiDescriptor exception') grpc_context.abort(e.code, e.details) - except Exception as e: # pragma: no cover - LOGGER.exception('GetKpiDescriptorList exception') + except Exception: # pragma: no cover + LOGGER.exception('GetKpiDescriptor exception') - # rpc MonitorKpi (MonitorKpiRequest) returns (context.Empty) {} - def MonitorKpi ( self, request : MonitorKpiRequest, grpc_context : grpc.ServicerContext) -> Empty: + def GetKpiDescriptorList(self, request: Empty, grpc_context: grpc.ServicerContext) -> KpiDescriptorList: - LOGGER.info('MonitorKpi') + LOGGER.info('GetKpiDescriptorList') try: - # Sets the request to send to the device service - monitor_device_request = MonitoringSettings() + kpi_descriptor_list = KpiDescriptorList() - kpiDescriptor = self.GetKpiDescriptor(request.kpi_id, grpc_context) + data = self.management_db.get_KPIS() + LOGGER.debug(f"data: {data}") - monitor_device_request.kpi_descriptor.CopyFrom(kpiDescriptor) - monitor_device_request.kpi_id.kpi_id.uuid = request.kpi_id.kpi_id.uuid - monitor_device_request.sampling_duration_s = request.monitoring_window_s - monitor_device_request.sampling_interval_s = request.sampling_rate_s + for item in data: + kpi_descriptor = KpiDescriptor() + kpi_descriptor.kpi_id.kpi_id.uuid = str(item[0]) + kpi_descriptor.kpi_description = item[1] + kpi_descriptor.kpi_sample_type = item[2] + kpi_descriptor.device_id.device_uuid.uuid = str(item[3]) + kpi_descriptor.endpoint_id.endpoint_uuid.uuid = str(item[4]) + kpi_descriptor.service_id.service_uuid.uuid = str(item[5]) - device_client = DeviceClient() - device_client.MonitorDeviceKpi(monitor_device_request) + kpi_descriptor_list.kpi_descriptor_list.append(kpi_descriptor) + return kpi_descriptor_list except ServiceException as e: - LOGGER.exception('MonitorKpi exception') - # CREATEKPI_COUNTER_FAILED.inc() + LOGGER.exception('GetKpiDescriptorList exception') grpc_context.abort(e.code, e.details) except Exception as e: # pragma: no cover - LOGGER.exception('MonitorKpi exception') - grpc_context.abort(grpc.StatusCode.INTERNAL, str(e)) - # CREATEKPI_COUNTER_FAILED.inc() - - return Empty() + LOGGER.exception('GetKpiDescriptorList exception') - # rpc IncludeKpi(IncludeKpiRequest) returns(context.Empty) {} - def IncludeKpi(self, request : Kpi, grpc_context : grpc.ServicerContext) -> Empty: + def IncludeKpi(self, request: Kpi, grpc_context: grpc.ServicerContext) -> Empty: LOGGER.info('IncludeKpi') try: + kpi_id = request.kpi_id.kpi_id.uuid kpiDescriptor = self.GetKpiDescriptor(request.kpi_id, grpc_context) - if kpiDescriptor is None: - LOGGER.warning('Ignoring sample with KPIId({:s}): not found in database'.format(str(request.kpi_id))) - return Empty() - - kpiSampleType = KpiSampleType.Name(kpiDescriptor.kpi_sample_type).upper().replace('KPISAMPLETYPE_', '') - kpiId = request.kpi_id.kpi_id.uuid - deviceId = kpiDescriptor.device_id.device_uuid.uuid - endpointId = kpiDescriptor.endpoint_id.endpoint_uuid.uuid - serviceId = kpiDescriptor.service_id.service_uuid.uuid - time_stamp = request.timestamp.timestamp - kpi_value = getattr(request.kpi_value, request.kpi_value.WhichOneof('value')) - # Build the structure to be included as point in the MetricsDB - self.metrics_db.write_KPI(time_stamp,kpiId,kpiSampleType,deviceId,endpointId,serviceId,kpi_value) - - #self.influx_db.read_KPI_points() + if kpiDescriptor is None: + LOGGER.info('IncludeKpi error: KpiID({:s}): not found in database'.format(str(kpi_id))) + else: + kpiSampleType = KpiSampleType.Name(kpiDescriptor.kpi_sample_type).upper().replace('KPISAMPLETYPE_', '') + kpiId = kpi_id + deviceId = kpiDescriptor.device_id.device_uuid.uuid + endpointId = kpiDescriptor.endpoint_id.endpoint_uuid.uuid + serviceId = kpiDescriptor.service_id.service_uuid.uuid + time_stamp = request.timestamp.timestamp + kpi_value = getattr(request.kpi_value, request.kpi_value.WhichOneof('value')) + + # Build the structure to be included as point in the MetricsDB + self.metrics_db.write_KPI(time_stamp, kpiId, kpiSampleType, deviceId, endpointId, serviceId, kpi_value) + return Empty() except ServiceException as e: LOGGER.exception('IncludeKpi exception') # CREATEKPI_COUNTER_FAILED.inc() @@ -176,98 +200,154 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): except Exception: # pragma: no cover LOGGER.exception('IncludeKpi exception') # CREATEKPI_COUNTER_FAILED.inc() - return Empty() - # def GetStreamKpi ( self, request, grpc_context : grpc.ServicerContext): - # - # LOGGER.info('GetStreamKpi') - # yield monitoring_pb2.Kpi() - # - # @MONITORING_GETINSTANTKPI_REQUEST_TIME.time() - # def GetInstantKpi ( self, request, grpc_context : grpc.ServicerContext): - # - # LOGGER.info('GetInstantKpi') - # return monitoring_pb2.Kpi() + def MonitorKpi(self, request: MonitorKpiRequest, grpc_context: grpc.ServicerContext) -> Empty: - - def GetKpiDescriptor(self, request : KpiId, grpc_context : grpc.ServicerContext) -> KpiDescriptor: - LOGGER.info('getting Kpi by KpiID') + LOGGER.info('MonitorKpi') try: - kpi_db = self.sql_db.get_KPI(int(request.kpi_id.uuid)) - #LOGGER.info('sql_db.get_KPIS={:s}'.format(str(self.sql_db.get_KPIS()))) - #LOGGER.info('kpi_db={:s}'.format(str(kpi_db))) - if kpi_db is None: return None - - kpiDescriptor = KpiDescriptor() - - kpiDescriptor.kpi_description = kpi_db[1] - kpiDescriptor.kpi_sample_type = kpi_db[2] - kpiDescriptor.device_id.device_uuid.uuid = str(kpi_db[3]) - kpiDescriptor.endpoint_id.endpoint_uuid.uuid = str(kpi_db[4]) - kpiDescriptor.service_id.service_uuid.uuid = str(kpi_db[5]) - - return kpiDescriptor + kpi_id = int(request.kpi_id.kpi_id.uuid) + kpi = self.management_db.get_KPI(kpi_id) + response = Empty() + + if kpi: + # Sets the request to send to the device service + monitor_device_request = MonitoringSettings() + + kpiDescriptor = self.GetKpiDescriptor(request.kpi_id, grpc_context) + + monitor_device_request.kpi_descriptor.CopyFrom(kpiDescriptor) + monitor_device_request.kpi_id.kpi_id.uuid = request.kpi_id.kpi_id.uuid + monitor_device_request.sampling_duration_s = request.monitoring_window_s + monitor_device_request.sampling_interval_s = request.sampling_rate_s + + device_client = DeviceClient() + device_client.MonitorDeviceKpi(monitor_device_request) + else: + LOGGER.info('MonitorKpi error: KpiID({:s}): not found in database'.format(str(kpi_id))) + return response except ServiceException as e: - LOGGER.exception('GetKpiDescriptor exception') + LOGGER.exception('MonitorKpi exception') + # CREATEKPI_COUNTER_FAILED.inc() grpc_context.abort(e.code, e.details) + except Exception as e: # pragma: no cover + LOGGER.exception('MonitorKpi exception') + grpc_context.abort(grpc.StatusCode.INTERNAL, str(e)) + # CREATEKPI_COUNTER_FAILED.inc() - except Exception: # pragma: no cover - LOGGER.exception('GetKpiDescriptor exception') - - def QueryKpiData ( self, request : KpiQuery, grpc_context : grpc.ServicerContext) -> KpiList: + def QueryKpiData(self, request: KpiQuery, grpc_context: grpc.ServicerContext) -> KpiList: LOGGER.info('QueryKpiData') try: - # TBC - return KpiQuery() + # TBC + return KpiList() except ServiceException as e: LOGGER.exception('QueryKpiData exception') grpc_context.abort(e.code, e.details) except Exception as e: # pragma: no cover LOGGER.exception('QueryKpiData exception') - def SubscribeKpi ( self, request : SubsDescriptor, grpc_context : grpc.ServicerContext) -> KpiList: + def SetKpiSubscription(self, request: SubsDescriptor, grpc_context: grpc.ServicerContext) -> SubsResponse: LOGGER.info('SubscribeKpi') try: - # TBC - yield KpiList() + + subs_queue = Queue() + subs_response = SubsResponse() + + kpi_id = request.kpi_id.kpi_id.uuid + sampling_duration_s = request.sampling_duration_s + sampling_interval_s = request.sampling_interval_s + start_timestamp = request.start_timestamp.timestamp + end_timestamp = request.end_timestamp.timestamp + + subscriber = "localhost" # Investigate how to get info from the requester + + subs_id = self.management_db.insert_subscription(kpi_id, subscriber, sampling_duration_s, + sampling_interval_s, start_timestamp, end_timestamp) + self.subs_manager.create_subscription(subs_queue, subs_id, kpi_id, sampling_interval_s, sampling_duration_s, + start_timestamp, end_timestamp) + + # parse queue to append kpis into the list + while not subs_queue.empty(): + list = subs_queue.get_nowait() + for item in list: + kpi = Kpi() + kpi.kpi_id.kpi_id.uuid = str(item[0]) + kpi.timestamp.timestamp = timestamp_string_to_float(item[1]) + kpi.kpi_value.floatVal = item[2] # This must be improved + subs_response.kpi_list.kpi.append(kpi) + + subs_response.subs_id.subs_id.uuid = str(subs_id) + + yield subs_response except ServiceException as e: LOGGER.exception('SubscribeKpi exception') grpc_context.abort(e.code, e.details) except Exception as e: # pragma: no cover LOGGER.exception('SubscribeKpi exception') - - def GetSubsDescriptor ( self, request : SubscriptionID, grpc_context : grpc.ServicerContext) -> SubsDescriptor: + def GetSubsDescriptor(self, request: SubscriptionID, grpc_context: grpc.ServicerContext) -> SubsDescriptor: LOGGER.info('GetSubsDescriptor') try: - # TBC - return SubsDescriptor() + subs_id = request.subs_id.uuid + subs_db = self.management_db.get_subscription(int(request.subs_id.uuid)) + response = SubsDescriptor() + if subs_db is None: + LOGGER.info('GetSubsDescriptor error: SubsID({:s}): not found in database'.format(str(subs_id))) + else: + LOGGER.debug(subs_db) + response.subs_id.subs_id.uuid = str(subs_db[0]) + response.kpi_id.kpi_id.uuid = str(subs_db[1]) + response.sampling_duration_s = subs_db[3] + response.sampling_interval_s = subs_db[4] + response.start_timestamp.timestamp = subs_db[5] + response.end_timestamp.timestamp = subs_db[6] + + return response except ServiceException as e: LOGGER.exception('GetSubsDescriptor exception') grpc_context.abort(e.code, e.details) except Exception as e: # pragma: no cover LOGGER.exception('GetSubsDescriptor exception') - def GetSubscriptions ( self, request : Empty, grpc_context : grpc.ServicerContext) -> SubsIDList: + def GetSubscriptions(self, request: Empty, grpc_context: grpc.ServicerContext) -> SubsList: LOGGER.info('GetSubscriptions') try: - # TBC - return SubsIDList() + response = SubsList() + data = self.management_db.get_subscriptions() + + for subs_db in data: + subs_descriptor = SubsDescriptor() + + subs_descriptor.subs_id.subs_id.uuid = str(subs_db[0]) + subs_descriptor.kpi_id.kpi_id.uuid = str(subs_db[1]) + subs_descriptor.sampling_duration_s = subs_db[3] + subs_descriptor.sampling_interval_s = subs_db[4] + subs_descriptor.start_timestamp.timestamp = subs_db[5] + subs_descriptor.end_timestamp.timestamp = subs_db[6] + + response.subs_descriptor.append(subs_descriptor) + + return response except ServiceException as e: LOGGER.exception('GetSubscriptions exception') grpc_context.abort(e.code, e.details) except Exception as e: # pragma: no cover LOGGER.exception('GetSubscriptions exception') - def DeleteSubscription ( self, request : SubscriptionID, grpc_context : grpc.ServicerContext) -> Empty: + def DeleteSubscription(self, request: SubscriptionID, grpc_context: grpc.ServicerContext) -> Empty: LOGGER.info('DeleteSubscription') try: - # TBC + LOGGER.debug(f'DeleteSubscription with SubsID: {request.subs_id.uuid}') + subs_id = int(request.subs_id.uuid) + subs_db = self.management_db.get_subscription(int(request.subs_id.uuid)) + if subs_db: + self.management_db.delete_subscription(subs_id) + else: + LOGGER.info('DeleteSubscription error: SubsID({:s}): not found in database'.format(str(subs_id))) return Empty() except ServiceException as e: LOGGER.exception('DeleteSubscription exception') @@ -275,63 +355,211 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): except Exception as e: # pragma: no cover LOGGER.exception('DeleteSubscription exception') - def SetKpiAlarm ( self, request : AlarmDescriptor, grpc_context : grpc.ServicerContext) -> AlarmResponse: + def SetKpiAlarm(self, request: AlarmDescriptor, grpc_context: grpc.ServicerContext) -> AlarmResponse: LOGGER.info('SetKpiAlarm') try: - # TBC - return AlarmResponse() + response = AlarmID() + + alarm_description = request.alarm_description + alarm_name = request.name + kpi_id = request.kpi_id.kpi_id.uuid + kpi_min_value = request.kpi_value_range.kpiMinValue.floatVal + kpi_max_value = request.kpi_value_range.kpiMaxValue.floatVal + in_range = request.kpi_value_range.inRange + include_min_value = request.kpi_value_range.includeMinValue + include_max_value = request.kpi_value_range.includeMaxValue + timestamp = request.timestamp.timestamp + + LOGGER.debug(f"request.AlarmID: {request.alarm_id.alarm_id.uuid}") + + if request.alarm_id.alarm_id.uuid is not "": + alarm_id = request.alarm_id.alarm_id.uuid + # Here the code to modify an existing alarm + else: + alarm_id = self.management_db.insert_alarm(alarm_description, alarm_name, kpi_id, kpi_min_value, + kpi_max_value, + in_range, include_min_value, include_max_value) + LOGGER.debug(f"AlarmID: {alarm_id}") + response.alarm_id.uuid = str(alarm_id) + + return response except ServiceException as e: LOGGER.exception('SetKpiAlarm exception') grpc_context.abort(e.code, e.details) except Exception as e: # pragma: no cover LOGGER.exception('SetKpiAlarm exception') - - def GetAlarms ( self, request : Empty, grpc_context : grpc.ServicerContext) -> AlarmIDList: + def GetAlarms(self, request: Empty, grpc_context: grpc.ServicerContext) -> AlarmList: LOGGER.info('GetAlarms') try: - # TBC - return AlarmIDList() + response = AlarmList() + data = self.management_db.get_alarms() + + for alarm in data: + alarm_descriptor = AlarmDescriptor() + + alarm_descriptor.alarm_id.alarm_id.uuid = str(alarm[0]) + alarm_descriptor.alarm_description = alarm[1] + alarm_descriptor.name = alarm[2] + alarm_descriptor.kpi_id.kpi_id.uuid = str(alarm[3]) + alarm_descriptor.kpi_value_range.kpiMinValue.floatVal = alarm[4] + alarm_descriptor.kpi_value_range.kpiMaxValue.floatVal = alarm[5] + alarm_descriptor.kpi_value_range.inRange = bool(alarm[6]) + alarm_descriptor.kpi_value_range.includeMinValue = bool(alarm[7]) + alarm_descriptor.kpi_value_range.includeMaxValue = bool(alarm[8]) + + response.alarm_descriptor.append(alarm_descriptor) + + return response except ServiceException as e: LOGGER.exception('GetAlarms exception') grpc_context.abort(e.code, e.details) except Exception as e: # pragma: no cover LOGGER.exception('GetAlarms exception') - def GetAlarmDescriptor ( self, request : AlarmID, grpc_context : grpc.ServicerContext) -> AlarmDescriptor: + def GetAlarmDescriptor(self, request: AlarmID, grpc_context: grpc.ServicerContext) -> AlarmDescriptor: LOGGER.info('GetAlarmDescriptor') try: - # TBC - return AlarmDescriptor() + alarm_id = request.alarm_id.uuid + alarm = self.management_db.get_alarm(alarm_id) + response = AlarmDescriptor() + + if alarm: + LOGGER.debug(f"{alarm}") + response.alarm_id.alarm_id.uuid = str(alarm_id) + response.alarm_description = alarm[1] + response.name = alarm[2] + response.kpi_id.kpi_id.uuid = str(alarm[3]) + response.kpi_value_range.kpiMinValue.floatVal = alarm[4] + response.kpi_value_range.kpiMaxValue.floatVal = alarm[5] + response.kpi_value_range.inRange = bool(alarm[6]) + response.kpi_value_range.includeMinValue = bool(alarm[7]) + response.kpi_value_range.includeMaxValue = bool(alarm[8]) + else: + LOGGER.info('GetAlarmDescriptor error: AlarmID({:s}): not found in database'.format(str(alarm_id))) + response.alarm_id.alarm_id.uuid = "NoID" + return response except ServiceException as e: LOGGER.exception('GetAlarmDescriptor exception') grpc_context.abort(e.code, e.details) except Exception as e: # pragma: no cover LOGGER.exception('GetAlarmDescriptor exception') - def GetAlarmResponseStream(self, request : AlarmSubscription, grpc_context : grpc.ServicerContext) -> Iterator[AlarmResponse]: + def GetAlarmResponseStream(self, request: AlarmSubscription, grpc_context: grpc.ServicerContext) -> Iterator[ + AlarmResponse]: LOGGER.info('GetAlarmResponseStream') try: - # TBC - yield AlarmResponse() + alarm_id = request.alarm_id.alarm_id.uuid + alarm = self.management_db.get_alarm(alarm_id) + alarm_response = AlarmResponse() + + if alarm: + + alarm_queue = Queue() + + alarm_data = self.management_db.get_alarm(alarm) + + alarm_id = request.alarm_id.alarm_id.uuid + kpi_id = alarm_data[3] + kpiMinValue = alarm_data[4] + kpiMaxValue = alarm_data[5] + inRange = alarm_data[6] + includeMinValue = alarm_data[7] + includeMaxValue = alarm_data[8] + subscription_frequency_ms = request.subscription_frequency_ms + subscription_timeout_s = request.subscription_timeout_s + + self.alarm_manager.create_alarm(alarm_queue, alarm_id, kpi_id, kpiMinValue, kpiMaxValue, inRange, + includeMinValue, includeMaxValue, subscription_frequency_ms, + subscription_timeout_s) + + while not alarm_queue.empty(): + list = alarm_queue.get_nowait() + for item in list: + kpi = Kpi() + kpi.kpi_id.kpi_id.uuid = str(item[0]) + kpi.timestamp.timestamp = timestamp_string_to_float(item[1]) + kpi.kpi_value.floatVal = item[2] # This must be improved + alarm_response.kpi_list.kpi.append(kpi) + + alarm_response.alarm_id.alarm_id.uuid = alarm_id + + yield alarm_response + else: + LOGGER.info('GetAlarmResponseStream error: AlarmID({:s}): not found in database'.format(str(alarm_id))) + alarm_response.alarm_id.alarm_id.uuid = "NoID" + return alarm_response except ServiceException as e: LOGGER.exception('GetAlarmResponseStream exception') grpc_context.abort(e.code, e.details) except Exception as e: # pragma: no cover LOGGER.exception('GetAlarmResponseStream exception') - def DeleteAlarm ( self, request : AlarmID, grpc_context : grpc.ServicerContext) -> Empty: + def DeleteAlarm(self, request: AlarmID, grpc_context: grpc.ServicerContext) -> Empty: LOGGER.info('DeleteAlarm') try: - # TBC - return Empty() + LOGGER.debug(f'DeleteAlarm with AlarmID: {request.alarm_id.uuid}') + alarm_id = int(request.alarm_id.uuid) + alarm = self.management_db.get_alarm(alarm_id) + response = Empty() + if alarm: + self.alarm_manager.delete_alarm(alarm_id) + self.management_db.delete_alarm(alarm_id) + else: + LOGGER.info('DeleteAlarm error: AlarmID({:s}): not found in database'.format(str(alarm_id))) + return response except ServiceException as e: LOGGER.exception('DeleteAlarm exception') grpc_context.abort(e.code, e.details) except Exception as e: # pragma: no cover LOGGER.exception('DeleteAlarm exception') + + def GetStreamKpi(self, request: KpiId, grpc_context: grpc.ServicerContext) -> Iterator[Kpi]: + + LOGGER.info('GetStreamKpi') + + kpi_id = request.kpi_id.uuid + kpi_db = self.management_db.get_KPI(int(kpi_id)) + response = Kpi() + if kpi_db is None: + LOGGER.info('GetInstantKpi error: KpiID({:s}): not found in database'.format(str(kpi_id))) + response.kpi_id.kpi_id.uuid = "NoID" + return response + else: + yield response + + @MONITORING_GETINSTANTKPI_REQUEST_TIME.time() + def GetInstantKpi(self, request: KpiId, grpc_context: grpc.ServicerContext) -> Kpi: + + LOGGER.info('GetInstantKpi') + try: + kpi_id = request.kpi_id.uuid + response = Kpi() + if kpi_id is "": + LOGGER.info('GetInstantKpi error: KpiID({:s}): not found in database'.format(str(kpi_id))) + response.kpi_id.kpi_id.uuid = "NoID" + else: + query = f"SELECT kpi_id, timestamp, kpi_value FROM monitoring WHERE kpi_id = '{kpi_id}' " \ + f"LATEST ON timestamp PARTITION BY kpi_id" + data = self.metrics_db.run_query(query)[0] + LOGGER.debug(data) + + response.kpi_id.kpi_id.uuid = str(data[0]) + response.timestamp.timestamp = timestamp_string_to_float(data[1]) + response.kpi_value.floatVal = data[2] # This must be improved + + return response + except ServiceException as e: + LOGGER.exception('SetKpi exception') + # CREATEKPI_COUNTER_FAILED.inc() + grpc_context.abort(e.code, e.details) + except Exception as e: # pragma: no cover + LOGGER.exception('SetKpi exception') + # CREATEKPI_COUNTER_FAILED.inc() + grpc_context.abort(grpc.StatusCode.INTERNAL, str(e)) + diff --git a/src/monitoring/service/SqliteTools.py b/src/monitoring/service/SqliteTools.py deleted file mode 100644 index 092d07e9b961e98a91bb244bcc992c701ad3cd72..0000000000000000000000000000000000000000 --- a/src/monitoring/service/SqliteTools.py +++ /dev/null @@ -1,73 +0,0 @@ -# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import sqlite3 as sl - -class SQLite(): - def __init__(self, database): - self.client = sl.connect(database, check_same_thread=False) - self.client.execute(""" - CREATE TABLE IF NOT EXISTS KPI( - kpi_id INTEGER PRIMARY KEY AUTOINCREMENT, - kpi_description TEXT, - kpi_sample_type INTEGER, - device_id INTEGER, - endpoint_id INTEGER, - service_id INTEGER - ); - """) - - def insert_KPI(self,kpi_description,kpi_sample_type,device_id,endpoint_id,service_id ): - c = self.client.cursor() - c.execute("SELECT kpi_id FROM KPI WHERE device_id is ? AND kpi_sample_type is ? AND endpoint_id is ?",(device_id,kpi_sample_type,endpoint_id)) - data=c.fetchone() - if data is None: - c.execute("INSERT INTO KPI (kpi_description,kpi_sample_type,device_id,endpoint_id,service_id) VALUES (?,?,?,?,?)", (kpi_description,kpi_sample_type,device_id,endpoint_id,service_id)) - self.client.commit() - return c.lastrowid - else: - return data[0] - - def delete_KPI(self,device_id,kpi_sample_type): - c = self.client.cursor() - c.execute("SELECT kpi_id FROM KPI WHERE device_id is ? AND kpi_sample_type is ?",(device_id,kpi_sample_type)) - data=c.fetchone() - if data is None: - return False - else: - c.execute("DELETE FROM KPI WHERE device_id is ? AND kpi_sample_type is ?",(device_id,kpi_sample_type)) - self.client.commit() - return True - - def delete_kpid_id(self,kpi_id): - c = self.client.cursor() - c.execute("SELECT * FROM KPI WHERE kpi_id is ?",(kpi_id,)) - data=c.fetchone() - if data is None: - return False - else: - c.execute("DELETE FROM KPI WHERE kpi_id is ?",(kpi_id,)) - self.client.commit() - return True - - def get_KPI(self,kpi_id): - data = self.client.execute("SELECT * FROM KPI WHERE kpi_id is ?",(kpi_id,)) - return data.fetchone() - - def get_KPIS(self): - data = self.client.execute("SELECT * FROM KPI") - #print("\n") - #for row in data: - # print(row) - return data.fetchall() \ No newline at end of file diff --git a/src/monitoring/service/SubscriptionManager.py b/src/monitoring/service/SubscriptionManager.py new file mode 100644 index 0000000000000000000000000000000000000000..fe27d6ee365676b05175b762a106621121e3b897 --- /dev/null +++ b/src/monitoring/service/SubscriptionManager.py @@ -0,0 +1,55 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +import pytz +from apscheduler.executors.pool import ProcessPoolExecutor +from apscheduler.schedulers.background import BackgroundScheduler + +from datetime import datetime +import time + + +LOGGER = logging.getLogger(__name__) + +class SubscriptionManager(): + def __init__(self, metrics_db): + self.metrics_db = metrics_db + self.scheduler = BackgroundScheduler(executors={'processpool': ProcessPoolExecutor(max_workers=20)}) + self.scheduler.start() + + def create_subscription(self,subs_queue, subscription_id, kpi_id, sampling_interval_s, sampling_duration_s=None, start_timestamp=None, end_timestamp=None): + start_date = None + end_date = None + if sampling_duration_s: + if not start_timestamp: + start_timestamp = time.time() + end_timestamp = start_timestamp + sampling_duration_s + if start_timestamp: + start_date = datetime.utcfromtimestamp(start_timestamp).isoformat() + if end_timestamp: + end_date = datetime.utcfromtimestamp(end_timestamp).isoformat() + + LOGGER.debug(f"kpi_id: {kpi_id}") + LOGGER.debug(f"sampling_interval_s: {sampling_interval_s}") + LOGGER.debug(f"subscription_id: {subscription_id}") + LOGGER.debug(f"start_date: {start_date}") + self.scheduler.add_job(self.metrics_db.get_subscription_data, args=(subs_queue,kpi_id, sampling_interval_s), + trigger='interval', seconds=sampling_interval_s, start_date=start_date, + end_date=end_date, timezone=pytz.utc, id=str(subscription_id)) + LOGGER.debug(f"Subscrition job {subscription_id} succesfully created") + + def delete_subscription(self, subscription_id): + self.scheduler.remove_job(subscription_id) \ No newline at end of file diff --git a/src/monitoring/tests/Messages.py b/src/monitoring/tests/Messages.py index cf81ceed1e134240415ec1aabe8796cd4486f75f..845153856c44cec0576bd6f11b045e3310558a97 100644 --- a/src/monitoring/tests/Messages.py +++ b/src/monitoring/tests/Messages.py @@ -12,11 +12,11 @@ # See the License for the specific language governing permissions and # limitations under the License. import datetime +from random import random from common.proto import monitoring_pb2 from common.proto.kpi_sample_types_pb2 import KpiSampleType -from common.tools.timestamp.Converters import timestamp_string_to_float, timestamp_utcnow_to_float - +from common.tools.timestamp.Converters import timestamp_utcnow_to_float def kpi_id(): _kpi_id = monitoring_pb2.KpiId() @@ -32,6 +32,24 @@ def create_kpi_request(): _create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END1' # pylint: disable=maybe-no-member return _create_kpi_request +def create_kpi_request_b(): + _create_kpi_request = monitoring_pb2.KpiDescriptor() + _create_kpi_request.kpi_description = 'KPI Description Test' + _create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED + _create_kpi_request.device_id.device_uuid.uuid = 'DEV2' # pylint: disable=maybe-no-member + _create_kpi_request.service_id.service_uuid.uuid = 'SERV2' # pylint: disable=maybe-no-member + _create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END2' # pylint: disable=maybe-no-member + return _create_kpi_request + +def create_kpi_request_c(): + _create_kpi_request = monitoring_pb2.KpiDescriptor() + _create_kpi_request.kpi_description = 'KPI Description Test' + _create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED + _create_kpi_request.device_id.device_uuid.uuid = 'DEV3' # pylint: disable=maybe-no-member + _create_kpi_request.service_id.service_uuid.uuid = 'SERV3' # pylint: disable=maybe-no-member + _create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END3' # pylint: disable=maybe-no-member + return _create_kpi_request + def monitor_kpi_request(kpi_uuid, monitoring_window_s, sampling_rate_s): _monitor_kpi_request = monitoring_pb2.MonitorKpiRequest() _monitor_kpi_request.kpi_id.kpi_id.uuid = kpi_uuid # pylint: disable=maybe-no-member @@ -43,5 +61,66 @@ def include_kpi_request(kpi_id): _include_kpi_request = monitoring_pb2.Kpi() _include_kpi_request.kpi_id.kpi_id.uuid = kpi_id.kpi_id.uuid _include_kpi_request.timestamp.timestamp = timestamp_utcnow_to_float() - _include_kpi_request.kpi_value.int32Val = 500 # pylint: disable=maybe-no-member + _include_kpi_request.kpi_value.floatVal = 500*random() # pylint: disable=maybe-no-member return _include_kpi_request + +def kpi_descriptor_list(): + _kpi_descriptor_list = monitoring_pb2.KpiDescriptorList() + + return _kpi_descriptor_list + +def kpi_query(): + _kpi_query = monitoring_pb2.KpiQuery() + + return _kpi_query + +def subs_descriptor(kpi_id): + _subs_descriptor = monitoring_pb2.SubsDescriptor() + + _subs_descriptor.subs_id.subs_id.uuid = "" + _subs_descriptor.kpi_id.kpi_id.uuid = kpi_id.kpi_id.uuid + _subs_descriptor.sampling_duration_s = 10 + _subs_descriptor.sampling_interval_s = 2 + _subs_descriptor.start_timestamp.timestamp = timestamp_utcnow_to_float() + _subs_descriptor.end_timestamp.timestamp = timestamp_utcnow_to_float() + 10 + + return _subs_descriptor + +def subs_id(): + _subs_id = monitoring_pb2.SubsDescriptor() + + return _subs_id + +def alarm_descriptor(): + _alarm_descriptor = monitoring_pb2.AlarmDescriptor() + + _alarm_descriptor.alarm_description = "Alarm Description" + _alarm_descriptor.name = "Alarm Name" + _alarm_descriptor.kpi_id.kpi_id.uuid = "1" + _alarm_descriptor.kpi_value_range.kpiMinValue.floatVal = 0.0 + _alarm_descriptor.kpi_value_range.kpiMaxValue.floatVal = 50.0 + _alarm_descriptor.kpi_value_range.inRange = True + _alarm_descriptor.kpi_value_range.includeMinValue = False + _alarm_descriptor.kpi_value_range.includeMaxValue = True + + return _alarm_descriptor + +def alarm_descriptor_b(): + _alarm_descriptor = monitoring_pb2.AlarmDescriptor() + + _alarm_descriptor.kpi_id.kpi_id.uuid = "2" + + return _alarm_descriptor + +def alarm_subscription(alarm_id): + _alarm_descriptor = monitoring_pb2.AlarmSubscription() + + _alarm_descriptor.alarm_id.alarm_id.uuid = str(alarm_id) + + return _alarm_descriptor + + +def alarm_id(): + _alarm_id = monitoring_pb2.AlarmID() + + return _alarm_id \ No newline at end of file diff --git a/src/monitoring/tests/test_unitary.py b/src/monitoring/tests/test_unitary.py index b62b5f97f965beb75ddaafa122ac8f026faab686..ee6a29e8a483fe53c58a6e6d2e3aa240f2456b81 100644 --- a/src/monitoring/tests/test_unitary.py +++ b/src/monitoring/tests/test_unitary.py @@ -13,8 +13,15 @@ # limitations under the License. import copy, os, pytest +import threading +import time from time import sleep from typing import Tuple + +from apscheduler.executors.pool import ProcessPoolExecutor +from apscheduler.schedulers.background import BackgroundScheduler +from grpc._channel import _MultiThreadedRendezvous + from common.Constants import ServiceNameEnum from common.Settings import ( ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name, get_service_port_grpc) @@ -24,7 +31,9 @@ from common.orm.Factory import get_database_backend, BackendEnum as DatabaseBack from common.message_broker.Factory import get_messagebroker_backend, BackendEnum as MessageBrokerBackendEnum from common.message_broker.MessageBroker import MessageBroker from common.proto import monitoring_pb2 -from common.proto.monitoring_pb2 import KpiId, KpiDescriptor +from common.proto.kpi_sample_types_pb2 import KpiSampleType +from common.proto.monitoring_pb2 import KpiId, KpiDescriptor, KpiList, SubsDescriptor, SubsList, AlarmID, \ + AlarmDescriptor, AlarmList, Kpi, KpiDescriptorList, SubsResponse, AlarmResponse from context.client.ContextClient import ContextClient from context.service.grpc_server.ContextService import ContextService @@ -38,19 +47,17 @@ from device.service.driver_api.DriverInstanceCache import DriverInstanceCache os.environ['DEVICE_EMULATED_ONLY'] = 'TRUE' from device.service.drivers import DRIVERS # pylint: disable=wrong-import-position -# pylint: disable=wrong-import-position from monitoring.client.MonitoringClient import MonitoringClient -from common.proto.kpi_sample_types_pb2 import KpiSampleType -from monitoring.service import SqliteTools, MetricsDBTools +from monitoring.service import ManagementDBTools, MetricsDBTools from monitoring.service.MonitoringService import MonitoringService from monitoring.service.EventTools import EventsDeviceCollector -from monitoring.tests.Messages import create_kpi_request, include_kpi_request, monitor_kpi_request +from monitoring.tests.Messages import create_kpi_request, include_kpi_request, monitor_kpi_request, \ + create_kpi_request_b, create_kpi_request_c, kpi_query, subs_descriptor, alarm_descriptor, \ + alarm_subscription from monitoring.tests.Objects import DEVICE_DEV1, DEVICE_DEV1_CONNECT_RULES, DEVICE_DEV1_UUID from monitoring.service.MonitoringServiceServicerImpl import LOGGER -# LOGGER = getJSONLogger('monitoringservice-server') -# LOGGER.setLevel('DEBUG') ########################### # Tests Setup @@ -151,9 +158,9 @@ def monitoring_client(monitoring_service : MonitoringService): # pylint: disable _client.close() @pytest.fixture(scope='session') -def sql_db(): - _sql_db = SqliteTools.SQLite('monitoring.db') - return _sql_db +def management_db(): + _management_db = ManagementDBTools.ManagementDB('monitoring.db') + return _management_db @pytest.fixture(scope='session') def metrics_db(): @@ -161,7 +168,21 @@ def metrics_db(): METRICSDB_HOSTNAME, METRICSDB_ILP_PORT, METRICSDB_REST_PORT, METRICSDB_TABLE) return _metrics_db +@pytest.fixture(scope='session') +def subs_scheduler(): + _scheduler = BackgroundScheduler(executors={'processpool': ProcessPoolExecutor(max_workers=20)}) + _scheduler.start() + + return _scheduler + +def ingestion_data(monitoring_client): + _kpi_id = monitoring_client.SetKpi(create_kpi_request_c()) + _include_kpi_request = include_kpi_request(_kpi_id) + for i in range(200): + _include_kpi_request = include_kpi_request(_kpi_id) + monitoring_client.IncludeKpi(_include_kpi_request) + time.sleep(0.01) ########################### # Tests Implementation @@ -173,8 +194,44 @@ def test_set_kpi(monitoring_client): # pylint: disable=redefined-outer-name LOGGER.warning('test_create_kpi requesting') response = monitoring_client.SetKpi(create_kpi_request()) LOGGER.debug(str(response)) + response = monitoring_client.SetKpi(create_kpi_request_b()) + LOGGER.debug(str(response)) assert isinstance(response, KpiId) + +# Test case that makes use of client fixture to test server's DeleteKpi method +def test_delete_kpi(monitoring_client): # pylint: disable=redefined-outer-name + # make call to server + LOGGER.warning('delete_kpi requesting') + response = monitoring_client.SetKpi(create_kpi_request_b()) + response = monitoring_client.DeleteKpi(response) + LOGGER.debug(str(response)) + assert isinstance(response, Empty) + +# Test case that makes use of client fixture to test server's GetKpiDescriptor method +def test_get_kpidescritor(monitoring_client): # pylint: disable=redefined-outer-name + LOGGER.warning('test_getkpidescritor_kpi begin') + response = monitoring_client.SetKpi(create_kpi_request_c()) + response = monitoring_client.GetKpiDescriptor(response) + LOGGER.debug(str(response)) + assert isinstance(response, KpiDescriptor) + +# Test case that makes use of client fixture to test server's GetKpiDescriptor method +def test_get_kpi_descriptor_list(monitoring_client): # pylint: disable=redefined-outer-name + LOGGER.warning('test_getkpidescritor_kpi begin') + response = monitoring_client.GetKpiDescriptorList(Empty()) + LOGGER.debug(str(response)) + assert isinstance(response, KpiDescriptorList) + +# Test case that makes use of client fixture to test server's IncludeKpi method +def test_include_kpi(monitoring_client): # pylint: disable=redefined-outer-name + # make call to server + LOGGER.warning('test_include_kpi requesting') + kpi_id = monitoring_client.SetKpi(create_kpi_request_c()) + response = monitoring_client.IncludeKpi(include_kpi_request(kpi_id)) + LOGGER.debug(str(response)) + assert isinstance(response, Empty) + # Test case that makes use of client fixture to test server's MonitorKpi method def test_monitor_kpi( context_client : ContextClient, # pylint: disable=redefined-outer-name @@ -210,13 +267,105 @@ def test_monitor_kpi( LOGGER.debug(str(response)) assert isinstance(response, Empty) - -# Test case that makes use of client fixture to test server's IncludeKpi method -def test_include_kpi(monitoring_client): # pylint: disable=redefined-outer-name - # make call to server - LOGGER.warning('test_include_kpi requesting') - kpi_id = monitoring_client.SetKpi(create_kpi_request()) - response = monitoring_client.IncludeKpi(include_kpi_request(kpi_id)) +# Test case that makes use of client fixture to test server's QueryKpiData method +def test_query_kpi_data(monitoring_client): # pylint: disable=redefined-outer-name + LOGGER.warning('test_query_kpi_data') + response = monitoring_client.QueryKpiData(kpi_query()) + LOGGER.debug(str(response)) + assert isinstance(response, KpiList) + +def test_ingestion_data(monitoring_client): + _kpi_id = monitoring_client.SetKpi(create_kpi_request_c()) + _include_kpi_request = include_kpi_request(_kpi_id) + + for i in range(100): + _include_kpi_request = include_kpi_request(_kpi_id) + monitoring_client.IncludeKpi(_include_kpi_request) + time.sleep(0.01) + +# def test_subscription_scheduler(monitoring_client,metrics_db,subs_scheduler): +# subs_scheduler.add_job(ingestion_data(monitoring_client),id="1") + +# Test case that makes use of client fixture to test server's SetKpiSubscription method +def test_set_kpi_subscription(monitoring_client,metrics_db): # pylint: disable=redefined-outer-name + LOGGER.warning('test_set_kpi_subscription') + kpi_id = monitoring_client.SetKpi(create_kpi_request_c()) + # thread = threading.Thread(target=test_ingestion_data, args=(monitoring_client,metrics_db)) + # thread.start() + monitoring_client.IncludeKpi(include_kpi_request(kpi_id)) + response = monitoring_client.SetKpiSubscription(subs_descriptor(kpi_id)) + assert isinstance(response, _MultiThreadedRendezvous) + LOGGER.debug(response) + for item in response: + LOGGER.debug(item) + assert isinstance(item, SubsResponse) + +# Test case that makes use of client fixture to test server's GetSubsDescriptor method +def test_get_subs_descriptor(monitoring_client): + LOGGER.warning('test_get_subs_descriptor') + kpi_id = monitoring_client.SetKpi(create_kpi_request_c()) + monitoring_client.IncludeKpi(include_kpi_request(kpi_id)) + response = monitoring_client.SetKpiSubscription(subs_descriptor(kpi_id)) + for item in response: + response = monitoring_client.GetSubsDescriptor(item.subs_id) + LOGGER.debug(response) + assert isinstance(response, SubsDescriptor) + +# Test case that makes use of client fixture to test server's GetSubscriptions method +def test_get_subscriptions(monitoring_client): + LOGGER.warning('test_get_subscriptions') + response = monitoring_client.GetSubscriptions(Empty()) + LOGGER.debug(response) + assert isinstance(response, SubsList) + +# Test case that makes use of client fixture to test server's DeleteSubscription method +def test_delete_subscription(monitoring_client): + LOGGER.warning('test_delete_subscription') + kpi_id = monitoring_client.SetKpi(create_kpi_request_c()) + monitoring_client.IncludeKpi(include_kpi_request(kpi_id)) + subs = monitoring_client.SetKpiSubscription(subs_descriptor(kpi_id)) + for item in subs: + response = monitoring_client.DeleteSubscription(item.subs_id) + assert isinstance(response, Empty) + +# Test case that makes use of client fixture to test server's SetKpiAlarm method +def test_set_kpi_alarm(monitoring_client): + LOGGER.warning('test_set_kpi_alarm') + response = monitoring_client.SetKpiAlarm(alarm_descriptor()) + LOGGER.debug(str(response)) + assert isinstance(response, AlarmID) + +# Test case that makes use of client fixture to test server's GetAlarms method +def test_get_alarms(monitoring_client): + LOGGER.warning('test_get_alarms') + response = monitoring_client.GetAlarms(Empty()) + LOGGER.debug(response) + assert isinstance(response, AlarmList) + +# Test case that makes use of client fixture to test server's GetAlarmDescriptor method +def test_get_alarm_descriptor(monitoring_client): + LOGGER.warning('test_get_alarm_descriptor') + alarm_id = monitoring_client.SetKpiAlarm(alarm_descriptor()) + response = monitoring_client.GetAlarmDescriptor(alarm_id) + LOGGER.debug(response) + assert isinstance(response, AlarmDescriptor) + +# Test case that makes use of client fixture to test server's GetAlarmResponseStream method +def test_get_alarm_response_stream(monitoring_client): + LOGGER.warning('test_get_alarm_descriptor') + alarm_id = monitoring_client.SetKpiAlarm(alarm_descriptor()) + response = monitoring_client.GetAlarmResponseStream(alarm_subscription(alarm_id)) + assert isinstance(response, _MultiThreadedRendezvous) + for item in response: + LOGGER.debug(response) + assert isinstance(item,AlarmResponse) + +# Test case that makes use of client fixture to test server's DeleteAlarm method +def test_delete_alarm(monitoring_client): + LOGGER.warning('test_delete_alarm') + alarm_id = monitoring_client.SetKpiAlarm(alarm_descriptor()) + response = monitoring_client.DeleteAlarm(alarm_id) + LOGGER.debug(type(response)) assert isinstance(response, Empty) # Test case that makes use of client fixture to test server's GetStreamKpi method @@ -224,26 +373,22 @@ def test_get_stream_kpi(monitoring_client): # pylint: disable=redefined-outer-na LOGGER.warning('test_getstream_kpi begin') response = monitoring_client.GetStreamKpi(monitoring_pb2.Kpi()) LOGGER.debug(str(response)) - #assert isinstance(response, Kpi) + assert isinstance(response, _MultiThreadedRendezvous) # Test case that makes use of client fixture to test server's GetInstantKpi method # def test_get_instant_kpi(monitoring_client): # pylint: disable=redefined-outer-name # LOGGER.warning('test_getinstant_kpi begin') -# response = monitoring_client.GetInstantKpi(kpi_id()) -# LOGGER.debug(str(response)) -# # assert isinstance(response, Kpi) - -# Test case that makes use of client fixture to test server's GetInstantKpi method -def test_get_kpidescritor_kpi(monitoring_client): # pylint: disable=redefined-outer-name - LOGGER.warning('test_getkpidescritor_kpi begin') - response = monitoring_client.SetKpi(create_kpi_request()) - # LOGGER.debug(str(response)) - response = monitoring_client.GetKpiDescriptor(response) - # LOGGER.debug(str(response)) - assert isinstance(response, KpiDescriptor) - -def test_sqlitedb_tools_insert_kpi(sql_db): # pylint: disable=redefined-outer-name - LOGGER.warning('test_sqlitedb_tools_insert_kpi begin') +# kpi_id = monitoring_client.SetKpi(KpiId()) +# monitoring_client.IncludeKpi(include_kpi_request(kpi_id)) +# sleep(0.3) +# response = monitoring_client.GetInstantKpi(kpi_id) +# LOGGER.debug(response) +# assert isinstance(response, Kpi) + # response = monitoring_client.GetInstantKpi(KpiId()) + # LOGGER.debug(type(response)) + # assert response.kpi_id.kpi_id.uuid == "NoID" +def test_managementdb_tools_insert_kpi(management_db): # pylint: disable=redefined-outer-name + LOGGER.warning('test_managementdb_tools_insert_kpi begin') _create_kpi_request = create_kpi_request() kpi_description = _create_kpi_request.kpi_description # pylint: disable=maybe-no-member kpi_sample_type = _create_kpi_request.kpi_sample_type # pylint: disable=maybe-no-member @@ -251,11 +396,11 @@ def test_sqlitedb_tools_insert_kpi(sql_db): # pylint: disable=redefined-outer-na kpi_endpoint_id = _create_kpi_request.endpoint_id.endpoint_uuid.uuid # pylint: disable=maybe-no-member kpi_service_id = _create_kpi_request.service_id.service_uuid.uuid # pylint: disable=maybe-no-member - response = sql_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id) + response = management_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id) assert isinstance(response, int) -def test_sqlitedb_tools_get_kpi(sql_db): # pylint: disable=redefined-outer-name - LOGGER.warning('test_sqlitedb_tools_get_kpi begin') +def test_managementdb_tools_get_kpi(management_db): # pylint: disable=redefined-outer-name + LOGGER.warning('test_managementdb_tools_get_kpi begin') _create_kpi_request = create_kpi_request() kpi_description = _create_kpi_request.kpi_description # pylint: disable=maybe-no-member kpi_sample_type = _create_kpi_request.kpi_sample_type # pylint: disable=maybe-no-member @@ -263,52 +408,32 @@ def test_sqlitedb_tools_get_kpi(sql_db): # pylint: disable=redefined-outer-name kpi_endpoint_id = _create_kpi_request.endpoint_id.endpoint_uuid.uuid # pylint: disable=maybe-no-member kpi_service_id = _create_kpi_request.service_id.service_uuid.uuid # pylint: disable=maybe-no-member - _kpi_id = sql_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id) - response = sql_db.get_KPI(_kpi_id) + _kpi_id = management_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id) + response = management_db.get_KPI(_kpi_id) assert isinstance(response, tuple) -def test_sqlitedb_tools_get_kpis(sql_db): # pylint: disable=redefined-outer-name - LOGGER.warning('test_sqlitedb_tools_get_kpis begin') - response = sql_db.get_KPIS() +def test_managementdb_tools_get_kpis(management_db): # pylint: disable=redefined-outer-name + LOGGER.warning('test_managementdb_tools_get_kpis begin') + response = management_db.get_KPIS() assert isinstance(response, list) -def test_sqlitedb_tools_delete_kpi(sql_db): # pylint: disable=redefined-outer-name - LOGGER.warning('test_sqlitedb_tools_get_kpi begin') - - response = sql_db.delete_KPI("DEV1",KpiSampleType.KPISAMPLETYPE_PACKETS_TRANSMITTED) - - if not response: - _create_kpi_request = create_kpi_request() - kpi_description = _create_kpi_request.kpi_description # pylint: disable=maybe-no-member - kpi_sample_type = _create_kpi_request.kpi_sample_type # pylint: disable=maybe-no-member - kpi_device_id = _create_kpi_request.device_id.device_uuid.uuid # pylint: disable=maybe-no-member - kpi_endpoint_id = _create_kpi_request.endpoint_id.endpoint_uuid.uuid # pylint: disable=maybe-no-member - kpi_service_id = _create_kpi_request.service_id.service_uuid.uuid # pylint: disable=maybe-no-member +def test_managementdb_tools_delete_kpi(management_db): # pylint: disable=redefined-outer-name + LOGGER.warning('test_managementdb_tools_get_kpi begin') - sql_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id) - response = sql_db.delete_KPI("DEV1", KpiSampleType.KPISAMPLETYPE_PACKETS_TRANSMITTED) - - assert response - -def test_sqlitedb_tools_delete_kpid_id(sql_db): # pylint: disable=redefined-outer-name - LOGGER.warning('test_sqlitedb_tools_delete_kpid_id begin') - - response = sql_db.delete_kpid_id(1) + _create_kpi_request = create_kpi_request() + kpi_description = _create_kpi_request.kpi_description # pylint: disable=maybe-no-member + kpi_sample_type = _create_kpi_request.kpi_sample_type # pylint: disable=maybe-no-member + kpi_device_id = _create_kpi_request.device_id.device_uuid.uuid # pylint: disable=maybe-no-member + kpi_endpoint_id = _create_kpi_request.endpoint_id.endpoint_uuid.uuid # pylint: disable=maybe-no-member + kpi_service_id = _create_kpi_request.service_id.service_uuid.uuid # pylint: disable=maybe-no-member - if not response: - _create_kpi_request = create_kpi_request() - kpi_description = _create_kpi_request.kpi_description # pylint: disable=maybe-no-member - kpi_sample_type = _create_kpi_request.kpi_sample_type # pylint: disable=maybe-no-member - kpi_device_id = _create_kpi_request.device_id.device_uuid.uuid # pylint: disable=maybe-no-member - kpi_endpoint_id = _create_kpi_request.endpoint_id.endpoint_uuid.uuid # pylint: disable=maybe-no-member - kpi_service_id = _create_kpi_request.service_id.service_uuid.uuid # pylint: disable=maybe-no-member + _kpi_id = management_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, + kpi_service_id) - _kpi_id = sql_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id) - response = sql_db.delete_kpid_id(_kpi_id) + response = management_db.delete_KPI(_kpi_id) assert response - def test_metrics_db_tools_write_kpi(metrics_db): # pylint: disable=redefined-outer-name LOGGER.warning('test_metric_sdb_tools_write_kpi begin')