Commit 3b02043c authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Merge branch 'feat/monitoring' into 'develop'

Feat/monitoring major functional updates

See merge request !14
parents 07965010 35fd8b25
Loading
Loading
Loading
Loading
+29 −30
Original line number Diff line number Diff line
@@ -19,24 +19,24 @@ import "context.proto";
import "kpi_sample_types.proto";

service MonitoringService {
  rpc SetKpi                (KpiDescriptor      ) returns (KpiId               ) {}
  rpc DeleteKpi             (KpiId              ) returns (context.Empty       ) {}
  rpc GetKpiDescriptor      (KpiId              ) returns (KpiDescriptor       ) {}
  rpc GetKpiDescriptorList  (context.Empty      ) returns (KpiDescriptorList   ) {}
  rpc IncludeKpi            (Kpi                ) returns (context.Empty       ) {}
  rpc MonitorKpi            (MonitorKpiRequest  ) returns (context.Empty       ) {}
  rpc QueryKpiData          (KpiQuery           ) returns (KpiList             ) {}
  rpc SetKpiSubscription    (SubsDescriptor     ) returns (stream KpiList      ) {}
  rpc GetSubsDescriptor     (SubscriptionID     ) returns (SubsDescriptor      ) {}
  rpc GetSubscriptions      (context.Empty      ) returns (SubsIDList          ) {}
  rpc DeleteSubscription    (SubscriptionID     ) returns (context.Empty       ) {}
  rpc SetKpiAlarm           (AlarmDescriptor    ) returns (AlarmID             ) {}
  rpc GetAlarms             (context.Empty      ) returns (AlarmIDList         ) {}
  rpc GetAlarmDescriptor    (AlarmID            ) returns (AlarmDescriptor     ) {}
  rpc GetAlarmResponseStream(AlarmSubscription  ) returns (stream AlarmResponse) {}
  rpc DeleteAlarm           (AlarmID            ) returns (context.Empty       ) {}
  rpc GetStreamKpi          (KpiId              ) returns (stream Kpi          ) {}
  rpc GetInstantKpi         (KpiId              ) returns (KpiList             ) {}
  rpc SetKpi                (KpiDescriptor      ) returns (KpiId               ) {} // Stable not final
  rpc DeleteKpi             (KpiId              ) returns (context.Empty       ) {} // Stable and final
  rpc GetKpiDescriptor      (KpiId              ) returns (KpiDescriptor       ) {} // Stable and final
  rpc GetKpiDescriptorList  (context.Empty      ) returns (KpiDescriptorList   ) {} // Stable and final
  rpc IncludeKpi            (Kpi                ) returns (context.Empty       ) {} // Stable and final
  rpc MonitorKpi            (MonitorKpiRequest  ) returns (context.Empty       ) {} // Stable and final
  rpc QueryKpiData          (KpiQuery           ) returns (KpiList             ) {} // Not implemented
  rpc SetKpiSubscription    (SubsDescriptor     ) returns (stream SubsResponse ) {} // Stable not final
  rpc GetSubsDescriptor     (SubscriptionID     ) returns (SubsDescriptor      ) {} // Stable and final
  rpc GetSubscriptions      (context.Empty      ) returns (SubsList            ) {} // Stable and final
  rpc DeleteSubscription    (SubscriptionID     ) returns (context.Empty       ) {} // Stable and final
  rpc SetKpiAlarm           (AlarmDescriptor    ) returns (AlarmID             ) {} // Stable not final
  rpc GetAlarms             (context.Empty      ) returns (AlarmList           ) {} // Stable and final
  rpc GetAlarmDescriptor    (AlarmID            ) returns (AlarmDescriptor     ) {} // Stable and final
  rpc GetAlarmResponseStream(AlarmSubscription  ) returns (stream AlarmResponse) {} // Not Stable not final
  rpc DeleteAlarm           (AlarmID            ) returns (context.Empty       ) {} // Stable and final
  rpc GetStreamKpi          (KpiId              ) returns (stream Kpi          ) {} // Stable not final
  rpc GetInstantKpi         (KpiId              ) returns (Kpi             ) {} // Stable not final
}

message KpiDescriptor {
@@ -58,7 +58,7 @@ message MonitorKpiRequest {
}

message KpiQuery {
  repeated KpiId    kpi_id              = 1;
  KpiId    kpi_id              = 1;
  float             monitoring_window_s = 2;
  float             sampling_rate_s     = 3;
  uint32            last_n_samples      = 4;  // used when you want something like "get the last N many samples
@@ -99,7 +99,7 @@ message KpiValue {


message KpiList {
  repeated Kpi kpi_list = 1;
  repeated Kpi kpi = 1;
}

message KpiDescriptorList {
@@ -122,19 +122,19 @@ message SubscriptionID {

message SubsResponse {
  SubscriptionID   subs_id  = 1;
  repeated KpiList kpi_list = 2;
  KpiList          kpi_list = 2;
}

message SubsIDList {
  repeated SubscriptionID subs_list = 1;
message SubsList {
  repeated SubsDescriptor subs_descriptor = 1;
}

message AlarmDescriptor {
  AlarmID                     alarm_id              = 1;
  string                      alarm_description     = 2;
  string                      name                  = 3;
  repeated KpiId              kpi_id                = 4;
  repeated KpiValueRange      kpi_value_range       = 5;
  KpiId                       kpi_id                = 4;
  KpiValueRange               kpi_value_range       = 5;
  context.Timestamp           timestamp             = 6;
}

@@ -143,7 +143,7 @@ message AlarmID{
}

message AlarmSubscription{
  AlarmID alarmID                   = 1;
  AlarmID alarm_id                  = 1;
  float   subscription_timeout_s    = 2;
  float   subscription_frequency_ms = 3;
}
@@ -151,10 +151,9 @@ message AlarmSubscription{
message AlarmResponse {
  AlarmID           alarm_id  = 1;
  string            text      = 2;
  KpiValue          kpi_value = 3;
  context.Timestamp timestamp = 4;
  KpiList           kpi_list  = 3;
}

message AlarmIDList {
    repeated AlarmID alarm_list = 1;
message AlarmList {
    repeated AlarmDescriptor alarm_descriptor = 1;
}
+8 −8
Original line number Diff line number Diff line
@@ -21,8 +21,8 @@ from common.tools.client.RetryDecorator import retry, delay_exponential
from common.tools.grpc.Tools import grpc_message_to_json_string
from common.proto.context_pb2 import Empty
from common.proto.monitoring_pb2 import Kpi, KpiDescriptor, KpiId, MonitorKpiRequest, \
    KpiDescriptorList, KpiQuery, KpiList, SubsDescriptor, SubscriptionID, SubsIDList, \
    AlarmDescriptor, AlarmID, AlarmIDList, AlarmResponse, AlarmSubscription
    KpiDescriptorList, KpiQuery, KpiList, SubsDescriptor, SubscriptionID, SubsList, \
    SubsResponse, AlarmDescriptor, AlarmID, AlarmList, AlarmResponse, AlarmSubscription
from common.proto.monitoring_pb2_grpc import MonitoringServiceStub

LOGGER = logging.getLogger(__name__)
@@ -100,10 +100,10 @@ class MonitoringClient:
        return response

    @RETRY_DECORATOR
    def SubscribeKpi(self, request : SubsDescriptor) -> Iterator[KpiList]:
        LOGGER.debug('SubscribeKpi: {:s}'.format(grpc_message_to_json_string(request)))
        response = self.stub.SubscribeKpi(request)
        LOGGER.debug('SubscribeKpi result: {:s}'.format(grpc_message_to_json_string(response)))
    def SetKpiSubscription(self, request : SubsDescriptor) -> Iterator[SubsResponse]:
        LOGGER.debug('SetKpiSubscription: {:s}'.format(grpc_message_to_json_string(request)))
        response = self.stub.SetKpiSubscription(request)
        LOGGER.debug('SetKpiSubscription result: {:s}'.format(grpc_message_to_json_string(response)))
        return response

    @RETRY_DECORATOR
@@ -114,7 +114,7 @@ class MonitoringClient:
        return response

    @RETRY_DECORATOR
    def GetSubscriptions(self, request : Empty) -> SubsIDList:
    def GetSubscriptions(self, request : Empty) -> SubsList:
        LOGGER.debug('GetSubscriptions: {:s}'.format(grpc_message_to_json_string(request)))
        response = self.stub.GetSubscriptions(request)
        LOGGER.debug('GetSubscriptions result: {:s}'.format(grpc_message_to_json_string(response)))
@@ -135,7 +135,7 @@ class MonitoringClient:
        return response

    @RETRY_DECORATOR
    def GetAlarms(self, request : Empty) -> AlarmIDList:
    def GetAlarms(self, request : Empty) -> AlarmList:
        LOGGER.debug('GetAlarms: {:s}'.format(grpc_message_to_json_string(request)))
        response = self.stub.GetAlarms(request)
        LOGGER.debug('GetAlarms result: {:s}'.format(grpc_message_to_json_string(response)))
+1 −0
Original line number Diff line number Diff line
@@ -17,6 +17,7 @@ redis==4.1.2
requests==2.27.1
xmltodict==0.12.0
questdb==1.0.1
psycopg2-binary==2.9.3

# pip's dependency resolver does not take into account installed packages.
# p4runtime does not specify the version of grpcio/protobuf it needs, so it tries to install latest one
+32 −0
Original line number Diff line number Diff line
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ProcessPoolExecutor
from apscheduler.jobstores.base import JobLookupError
from datetime import datetime
import time
import logging

LOGGER = logging.getLogger(__name__)

class AlarmManager():
    def __init__(self, metrics_db):
        self.metrics_db = metrics_db
        self.scheduler = BackgroundScheduler(executors={'processpool': ProcessPoolExecutor(max_workers=20)})
        self.scheduler.start()
        LOGGER.info("Alarm Manager Initialized")

    def create_alarm(self, alarm_queue,alarm_id, kpi_id, kpiMinValue, kpiMaxValue, inRange, includeMinValue, includeMaxValue, subscription_frequency_ms, subscription_timeout_s=None):
        start_date=None
        end_date=None
        if subscription_timeout_s:
            start_timestamp=time.time()
            start_date=datetime.fromtimestamp(start_timestamp)
            end_date=datetime.fromtimestamp(start_timestamp+subscription_timeout_s)
        self.scheduler.add_job(self.metrics_db.get_alarm_data, args=(alarm_queue,kpi_id, kpiMinValue, kpiMaxValue, inRange, includeMinValue, includeMaxValue, subscription_frequency_ms),trigger='interval', seconds=(subscription_frequency_ms/1000), start_date=start_date, end_date=end_date, id=alarm_id)
        LOGGER.debug(f"Alarm job {alarm_id} succesfully created")

    def delete_alarm(self, alarm_id):
        try:
            self.scheduler.remove_job(alarm_id)
            LOGGER.debug(f"Alarm job {alarm_id} succesfully deleted")
        except (Exception, JobLookupError) as e:
            LOGGER.debug(f"Alarm job {alarm_id} does not exists")
+248 −0
Original line number Diff line number Diff line
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import sqlite3
import logging

LOGGER = logging.getLogger(__name__)

class ManagementDB():
    def __init__(self, database):
        try:
            self.client = sqlite3.connect(database, check_same_thread=False)
            self.create_monitoring_table()
            self.create_subscription_table()
            self.create_alarm_table()
            LOGGER.info("ManagementDB initialized")
        except:
            LOGGER.info("ManagementDB cannot be initialized")
            raise Exception("Critical error in the monitoring component")
        
    def create_monitoring_table(self):
        try:
            result=self.client.execute("""
                CREATE TABLE IF NOT EXISTS kpi(
                    kpi_id INTEGER PRIMARY KEY AUTOINCREMENT,
                    kpi_description TEXT,
                    kpi_sample_type INTEGER,
                    device_id INTEGER,
                    endpoint_id INTEGER,
                    service_id INTEGER
                );
            """)
            LOGGER.debug("KPI table created in the ManagementDB")
        except sqlite3.Error as e:
            LOGGER.debug(f"KPI table cannot be created in the ManagementD. {e}")
            raise Exception
    
    def create_subscription_table(self):
        try:
            result= self.client.execute("""
                CREATE TABLE IF NOT EXISTS subscription(
                    subs_id INTEGER PRIMARY KEY AUTOINCREMENT,
                    kpi_id INTEGER,
                    subscriber TEXT,
                    sampling_duration_s REAL,
                    sampling_interval_s REAL,
                    start_timestamp REAL,
                    end_timestamp REAL
                );
            """)
            LOGGER.info("Subscription table created in the ManagementDB")
        except sqlite3.Error as e:
            LOGGER.debug(f"Subscription table cannot be created in the ManagementDB. {e}")
            raise Exception

    def create_alarm_table(self):
        try:
            result=self.client.execute("""
                CREATE TABLE IF NOT EXISTS alarm(
                    alarm_id INTEGER PRIMARY KEY AUTOINCREMENT,
                    alarm_description TEXT,
                    alarm_name TEXT,
                    kpi_id INTEGER,
                    kpi_min_value REAL,
                    kpi_max_value REAL,
                    in_range INTEGER,
                    include_min_value INTEGER,
                    include_max_value INTEGER
                );
            """)
            LOGGER.info("Alarm table created in the ManagementDB")
        except sqlite3.Error as e:
            LOGGER.debug(f"Alarm table cannot be created in the ManagementDB. {e}")
            raise Exception

    def insert_KPI(self,kpi_description,kpi_sample_type,device_id,endpoint_id,service_id):
        try:
            c = self.client.cursor()
            c.execute("SELECT kpi_id FROM kpi WHERE device_id is ? AND kpi_sample_type is ? AND endpoint_id is ? AND service_id is ?",(device_id,kpi_sample_type,endpoint_id,service_id))
            data=c.fetchone()
            if data is None:
                c.execute("INSERT INTO kpi (kpi_description,kpi_sample_type,device_id,endpoint_id,service_id) VALUES (?,?,?,?,?)", (kpi_description,kpi_sample_type,device_id,endpoint_id,service_id))
                self.client.commit()
                kpi_id = c.lastrowid
                LOGGER.debug(f"KPI {kpi_id} succesfully inserted in the ManagementDB")
                return kpi_id
            else:
                kpi_id = data[0]
                LOGGER.debug(f"KPI {kpi_id} already exists")
                return kpi_id
        except sqlite3.Error as e:
            LOGGER.debug("KPI cannot be inserted in the ManagementDB: {e}")
            
    def insert_subscription(self,kpi_id,subscriber,sampling_duration_s,sampling_interval_s,start_timestamp, end_timestamp):
        try:
            c = self.client.cursor()
            c.execute("SELECT subs_id FROM subscription WHERE kpi_id is ? AND subscriber is ? AND sampling_duration_s is ? AND sampling_interval_s is ? AND start_timestamp is ? AND end_timestamp is ?",(kpi_id,subscriber,sampling_duration_s,sampling_interval_s,start_timestamp, end_timestamp))
            data=c.fetchone()
            if data is None:
                c.execute("INSERT INTO subscription (kpi_id,subscriber,sampling_duration_s,sampling_interval_s,start_timestamp, end_timestamp) VALUES (?,?,?,?,?,?)", (kpi_id,subscriber,sampling_duration_s,sampling_interval_s,start_timestamp, end_timestamp))
                self.client.commit()
                subs_id = c.lastrowid
                LOGGER.debug(f"Subscription {subs_id} succesfully inserted in the ManagementDB")
                return subs_id
            else:
                subs_id = data[0]
                LOGGER.debug(f"Subscription {subs_id} already exists")
                return subs_id
        except sqlite3.Error as e:
            LOGGER.debug("Subscription cannot be inserted in the ManagementDB: {e}")

    def insert_alarm(self,alarm_description,alarm_name,kpi_id,kpi_min_value,kpi_max_value,in_range,include_min_value,include_max_value):
        try:
            c = self.client.cursor()
            c.execute("SELECT alarm_id FROM alarm WHERE alarm_description is ? AND alarm_name is ? AND kpi_id is ? AND kpi_min_value is ? AND kpi_max_value is ? AND in_range is ? AND include_min_value is ? AND include_max_value is ?",(alarm_description,alarm_name,kpi_id,kpi_min_value,kpi_max_value,in_range,include_min_value,include_max_value))
            data=c.fetchone()
            if data is None:
                c.execute("INSERT INTO alarm (alarm_description, alarm_name, kpi_id, kpi_min_value, kpi_max_value, in_range, include_min_value, include_max_value) VALUES (?,?,?,?,?,?,?,?)", (alarm_description,alarm_name,kpi_id,kpi_min_value,kpi_max_value,in_range,include_min_value,include_max_value))
                self.client.commit()
                alarm_id=c.lastrowid
                LOGGER.debug(f"Alarm {alarm_id} succesfully inserted in the ManagementDB")
                return alarm_id
            else:
                alarm_id=data[0]
                LOGGER.debug(f"Alarm {alarm_id} already exists")
                return alarm_id
        except sqlite3.Error as e:
            LOGGER.debug(f"Alarm cannot be inserted in the ManagementDB: {e}")

    def delete_KPI(self,kpi_id):
        try:
            c = self.client.cursor()
            c.execute("SELECT * FROM kpi WHERE kpi_id is ?",(kpi_id,))       
            data=c.fetchone()
            if data is None:
                LOGGER.debug(f"KPI {kpi_id} does not exists")
                return False
            else:
                c.execute("DELETE FROM kpi WHERE kpi_id is ?",(kpi_id,))
                self.client.commit()
                LOGGER.debug(f"KPI {kpi_id} deleted from the ManagementDB")
                return True
        except sqlite3.Error as e:
            LOGGER.debug(f"KPI cannot be deleted from the ManagementDB: {e}")

    def delete_subscription(self,subs_id):
        try:
            c = self.client.cursor()
            c.execute("SELECT * FROM subscription WHERE subs_id is ?",(subs_id,))       
            data=c.fetchone()
            if data is None:
                LOGGER.debug(f"Subscription {subs_id} does not exists")
                return False
            else:
                c.execute("DELETE FROM subscription WHERE subs_id is ?",(subs_id,))
                self.client.commit()
                LOGGER.debug(f"Subscription {subs_id} deleted from the ManagementDB")
                return True
        except sqlite3.Error as e:
            LOGGER.debug(f"Subscription cannot be deleted from the ManagementDB: {e}")

    def delete_alarm(self,alarm_id):
        try:
            c = self.client.cursor()
            c.execute("SELECT * FROM alarm WHERE alarm_id is ?",(alarm_id,))       
            data=c.fetchone()
            if data is None:
                LOGGER.debug(f"Alarm {alarm_id} does not exists")
                return False
            else:
                c.execute("DELETE FROM alarm WHERE alarm_id is ?",(alarm_id,))
                self.client.commit()
                LOGGER.debug(f"Alarm {alarm_id} deleted from the ManagementDB")
                return True
        except sqlite3.Error as e:
            LOGGER.debug(f"Alarm cannot be deleted from the ManagementDB: {e}")

    def get_KPI(self,kpi_id):
        try:
            data = self.client.execute("SELECT * FROM kpi WHERE kpi_id is ?",(kpi_id,)).fetchone()
            if data:
                LOGGER.debug(f"KPI {kpi_id} succesfully retrieved from the ManagementDB")
                return data
            else:
                LOGGER.debug(f"KPI {kpi_id} does not exists")
                return data
        except sqlite3.Error as e:
            LOGGER.debug(f"KPI {kpi_id} cannot be retrieved from the ManagementDB: {e}")

    def get_subscription(self,subs_id):
        try:
            data = self.client.execute("SELECT * FROM subscription WHERE subs_id is ?",(subs_id,)).fetchone()
            if data:
                LOGGER.debug(f"Subscription {subs_id} succesfully retrieved from the ManagementDB")
                return data
            else:
                LOGGER.debug(f"Subscription {subs_id} does not exists")
                return data
        except sqlite3.Error as e:
            LOGGER.debug(f"Subscription {subs_id} cannot be retrieved from the ManagementDB: {e}")

    def get_alarm(self,alarm_id):
        try:
            data = self.client.execute("SELECT * FROM alarm WHERE alarm_id is ?",(alarm_id,)).fetchone()
            if data:
                LOGGER.debug(f"Alarm {alarm_id} succesfully retrieved from the ManagementDB")
                return data
            else:
                print(data)
                LOGGER.debug(f"Alarm {alarm_id} does not exists")
                return data
        except sqlite3.Error as e:
            LOGGER.debug(f"Alarm {alarm_id} cannot be retrieved from the ManagementDB: {e}")
        
    def get_KPIS(self):
        try:
            data = self.client.execute("SELECT * FROM kpi").fetchall()
            LOGGER.debug(f"KPIs succesfully retrieved from the ManagementDB")
            return data
        except sqlite3.Error as e:
            LOGGER.debug(f"KPIs cannot be retrieved from the ManagementDB: {e}")

    def get_subscriptions(self):
        try:
            data = self.client.execute("SELECT * FROM subscription").fetchall()
            LOGGER.debug(f"Subscriptions succesfully retrieved from the ManagementDB")
            return data
        except sqlite3.Error as e:
            LOGGER.debug(f"Subscriptions cannot be retrieved from the ManagementDB: {e}")

    def get_alarms(self):
        try:
            data = self.client.execute("SELECT * FROM alarm").fetchall()
            LOGGER.debug(f"Alarms succesfully retrieved from the ManagementDB")
            return data
        except sqlite3.Error as e:
            LOGGER.debug(f"Alarms cannot be retrieved from the ManagementDB: {e}")
 No newline at end of file
Loading