Commit 03fa01a5 authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Merge branch 'feat/monitoring' into 'develop'

Added new KPI Sample Types

See merge request !18
parents 5aff102d 9e82e3c7
Loading
Loading
Loading
Loading
+15 −5
Original line number Original line Diff line number Diff line
@@ -19,6 +19,16 @@ enum KpiSampleType {
    KPISAMPLETYPE_UNKNOWN                       = 0;
    KPISAMPLETYPE_UNKNOWN                       = 0;
    KPISAMPLETYPE_PACKETS_TRANSMITTED           = 101;
    KPISAMPLETYPE_PACKETS_TRANSMITTED           = 101;
    KPISAMPLETYPE_PACKETS_RECEIVED              = 102;
    KPISAMPLETYPE_PACKETS_RECEIVED              = 102;
    KPISAMPLETYPE_PACKETS_DROPPED               = 103;
    KPISAMPLETYPE_BYTES_TRANSMITTED             = 201;
    KPISAMPLETYPE_BYTES_TRANSMITTED             = 201;
    KPISAMPLETYPE_BYTES_RECEIVED                = 202;
    KPISAMPLETYPE_BYTES_RECEIVED                = 202;
    KPISAMPLETYPE_BYTES_DROPPED                 = 203;
    KPISAMPLETYPE_ML_CONFIDENCE                 = 401;  //. can be used by both optical and L3 without any issue
    KPISAMPLETYPE_OPTICAL_SECURITY_STATUS       = 501;  //. can be used by both optical and L3 without any issue
    KPISAMPLETYPE_L3_UNIQUE_ATTACK_CONNS        = 601;
    KPISAMPLETYPE_L3_TOTAL_DROPPED_PACKTS       = 602;
    KPISAMPLETYPE_L3_UNIQUE_ATTACKERS           = 603;
    KPISAMPLETYPE_L3_UNIQUE_COMPROMISED_CLIENTS = 604;
    KPISAMPLETYPE_L3_SECURITY_STATUS_CRYPTO     = 605;
    KPISAMPLETYPE_SERVICE_LATENCY_MS            = 701;
}
}
+22 −8
Original line number Original line Diff line number Diff line
@@ -25,7 +25,7 @@ service MonitoringService {
  rpc GetKpiDescriptorList  (context.Empty      ) returns (KpiDescriptorList   ) {} // Stable and final
  rpc GetKpiDescriptorList  (context.Empty      ) returns (KpiDescriptorList   ) {} // Stable and final
  rpc IncludeKpi            (Kpi                ) returns (context.Empty       ) {} // Stable and final
  rpc IncludeKpi            (Kpi                ) returns (context.Empty       ) {} // Stable and final
  rpc MonitorKpi            (MonitorKpiRequest  ) returns (context.Empty       ) {} // Stable and final
  rpc MonitorKpi            (MonitorKpiRequest  ) returns (context.Empty       ) {} // Stable and final
  rpc QueryKpiData          (KpiQuery           ) returns (KpiList             ) {} // Not implemented
  rpc QueryKpiData          (KpiQuery           ) returns (RawKpiTable         ) {} // Not implemented
  rpc SetKpiSubscription    (SubsDescriptor     ) returns (stream SubsResponse ) {} // Stable not final
  rpc SetKpiSubscription    (SubsDescriptor     ) returns (stream SubsResponse ) {} // Stable not final
  rpc GetSubsDescriptor     (SubscriptionID     ) returns (SubsDescriptor      ) {} // Stable and final
  rpc GetSubsDescriptor     (SubscriptionID     ) returns (SubsDescriptor      ) {} // Stable and final
  rpc GetSubscriptions      (context.Empty      ) returns (SubsList            ) {} // Stable and final
  rpc GetSubscriptions      (context.Empty      ) returns (SubsList            ) {} // Stable and final
@@ -48,6 +48,7 @@ message KpiDescriptor {
  context.EndPointId             endpoint_id     = 6;
  context.EndPointId             endpoint_id     = 6;
  context.ServiceId              service_id      = 7;
  context.ServiceId              service_id      = 7;
  context.SliceId                slice_id        = 8;
  context.SliceId                slice_id        = 8;
  context.ConnectionId           connection_id   = 9;
}
}


message MonitorKpiRequest {
message MonitorKpiRequest {
@@ -58,13 +59,26 @@ message MonitorKpiRequest {
}
}


message KpiQuery {
message KpiQuery {
  KpiId    kpi_id              = 1;
  repeated KpiId    kpi_ids             = 1;
  float             monitoring_window_s = 2;
  float             monitoring_window_s = 2;
  float             sampling_rate_s     = 3;
  uint32            last_n_samples      = 3;  // used when you want something like "get the last N many samples
  uint32            last_n_samples      = 4;  // used when you want something like "get the last N many samples
  context.Timestamp start_timestamp     = 4;  // used when you want something like "get the samples since X date/time"
  context.Timestamp start_timestamp     = 5;  // used when you want something like "get the samples since X date/time"
  context.Timestamp end_timestamp       = 5;  // used when you want something like "get the samples until X date/time"
  context.Timestamp end_timestamp       = 6;  // used when you want something like "get the samples until X date/time"
}
  // Pending add field to reflect Available Device Protocols


message RawKpi { // cell
  context.Timestamp timestamp = 1;
  KpiValue          kpi_value = 2;
}

message RawKpiList { // column
  KpiId           kpi_id    = 1;
  repeated RawKpi raw_kpis  = 2;
}

message RawKpiTable { // table
  repeated RawKpiList raw_kpi_lists = 1;
}
}


message KpiId {
message KpiId {
+2 −2
Original line number Original line Diff line number Diff line
@@ -22,7 +22,7 @@ from common.tools.grpc.Tools import grpc_message_to_json_string
from common.proto.context_pb2 import Empty
from common.proto.context_pb2 import Empty
from common.proto.monitoring_pb2 import Kpi, KpiDescriptor, KpiId, MonitorKpiRequest, \
from common.proto.monitoring_pb2 import Kpi, KpiDescriptor, KpiId, MonitorKpiRequest, \
    KpiDescriptorList, KpiQuery, KpiList, SubsDescriptor, SubscriptionID, SubsList, \
    KpiDescriptorList, KpiQuery, KpiList, SubsDescriptor, SubscriptionID, SubsList, \
    SubsResponse, AlarmDescriptor, AlarmID, AlarmList, AlarmResponse, AlarmSubscription
    SubsResponse, AlarmDescriptor, AlarmID, AlarmList, AlarmResponse, AlarmSubscription, RawKpiTable
from common.proto.monitoring_pb2_grpc import MonitoringServiceStub
from common.proto.monitoring_pb2_grpc import MonitoringServiceStub


LOGGER = logging.getLogger(__name__)
LOGGER = logging.getLogger(__name__)
@@ -93,7 +93,7 @@ class MonitoringClient:
        return response
        return response


    @RETRY_DECORATOR
    @RETRY_DECORATOR
    def QueryKpiData(self, request : KpiQuery) -> KpiList:
    def QueryKpiData(self, request : KpiQuery) -> RawKpiTable:
        LOGGER.debug('QueryKpiData: {:s}'.format(grpc_message_to_json_string(request)))
        LOGGER.debug('QueryKpiData: {:s}'.format(grpc_message_to_json_string(request)))
        response = self.stub.QueryKpiData(request)
        response = self.stub.QueryKpiData(request)
        LOGGER.debug('QueryKpiData result: {:s}'.format(grpc_message_to_json_string(response)))
        LOGGER.debug('QueryKpiData result: {:s}'.format(grpc_message_to_json_string(response)))
+10 −3
Original line number Original line Diff line number Diff line
import pytz
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ProcessPoolExecutor
from apscheduler.executors.pool import ProcessPoolExecutor
from apscheduler.jobstores.base import JobLookupError
from apscheduler.jobstores.base import JobLookupError
@@ -19,10 +20,16 @@ class AlarmManager():
        end_date=None
        end_date=None
        if subscription_timeout_s:
        if subscription_timeout_s:
            start_timestamp=time.time()
            start_timestamp=time.time()
            start_date=datetime.fromtimestamp(start_timestamp)
            end_timestamp = start_timestamp + subscription_timeout_s
            end_date=datetime.fromtimestamp(start_timestamp+subscription_timeout_s)
            start_date = datetime.utcfromtimestamp(start_timestamp).isoformat()
        self.scheduler.add_job(self.metrics_db.get_alarm_data, args=(alarm_queue,kpi_id, kpiMinValue, kpiMaxValue, inRange, includeMinValue, includeMaxValue, subscription_frequency_ms),trigger='interval', seconds=(subscription_frequency_ms/1000), start_date=start_date, end_date=end_date, id=alarm_id)
            end_date = datetime.utcfromtimestamp(end_timestamp).isoformat()

        job = self.scheduler.add_job(self.metrics_db.get_alarm_data,
                               args=(alarm_queue,kpi_id, kpiMinValue, kpiMaxValue, inRange, includeMinValue, includeMaxValue, subscription_frequency_ms),
                               trigger='interval', seconds=(subscription_frequency_ms/1000), start_date=start_date,
                               end_date=end_date,timezone=pytz.utc, id=str(alarm_id))
        LOGGER.debug(f"Alarm job {alarm_id} succesfully created")
        LOGGER.debug(f"Alarm job {alarm_id} succesfully created")
        #job.remove()


    def delete_alarm(self, alarm_id):
    def delete_alarm(self, alarm_id):
        try:
        try:
+45 −5
Original line number Original line Diff line number Diff line
@@ -38,7 +38,10 @@ class ManagementDB():
                    kpi_sample_type INTEGER,
                    kpi_sample_type INTEGER,
                    device_id INTEGER,
                    device_id INTEGER,
                    endpoint_id INTEGER,
                    endpoint_id INTEGER,
                    service_id INTEGER
                    service_id INTEGER,
                    slice_id INTEGER,
                    connection_id INTEGER,
                    monitor_flag INTEGER
                );
                );
            """)
            """)
            LOGGER.debug("KPI table created in the ManagementDB")
            LOGGER.debug("KPI table created in the ManagementDB")
@@ -84,13 +87,13 @@ class ManagementDB():
            LOGGER.debug(f"Alarm table cannot be created in the ManagementDB. {e}")
            LOGGER.debug(f"Alarm table cannot be created in the ManagementDB. {e}")
            raise Exception
            raise Exception


    def insert_KPI(self,kpi_description,kpi_sample_type,device_id,endpoint_id,service_id):
    def insert_KPI(self,kpi_description,kpi_sample_type,device_id,endpoint_id,service_id,slice_id,connection_id):
        try:
        try:
            c = self.client.cursor()
            c = self.client.cursor()
            c.execute("SELECT kpi_id FROM kpi WHERE device_id is ? AND kpi_sample_type is ? AND endpoint_id is ? AND service_id is ?",(device_id,kpi_sample_type,endpoint_id,service_id))
            c.execute("SELECT kpi_id FROM kpi WHERE device_id is ? AND kpi_sample_type is ? AND endpoint_id is ? AND service_id is ? AND slice_id is ? AND connection_id is ?",(device_id,kpi_sample_type,endpoint_id,service_id,slice_id,connection_id))
            data=c.fetchone()
            data=c.fetchone()
            if data is None:
            if data is None:
                c.execute("INSERT INTO kpi (kpi_description,kpi_sample_type,device_id,endpoint_id,service_id) VALUES (?,?,?,?,?)", (kpi_description,kpi_sample_type,device_id,endpoint_id,service_id))
                c.execute("INSERT INTO kpi (kpi_description,kpi_sample_type,device_id,endpoint_id,service_id,slice_id,connection_id) VALUES (?,?,?,?,?,?,?)", (kpi_description,kpi_sample_type,device_id,endpoint_id,service_id,slice_id,connection_id))
                self.client.commit()
                self.client.commit()
                kpi_id = c.lastrowid
                kpi_id = c.lastrowid
                LOGGER.debug(f"KPI {kpi_id} succesfully inserted in the ManagementDB")
                LOGGER.debug(f"KPI {kpi_id} succesfully inserted in the ManagementDB")
@@ -246,3 +249,40 @@ class ManagementDB():
            return data
            return data
        except sqlite3.Error as e:
        except sqlite3.Error as e:
            LOGGER.debug(f"Alarms cannot be retrieved from the ManagementDB: {e}")
            LOGGER.debug(f"Alarms cannot be retrieved from the ManagementDB: {e}")

    def check_monitoring_flag(self,kpi_id):
        try:
            c = self.client.cursor()
            c.execute("SELECT monitor_flag FROM kpi WHERE kpi_id is ?",(kpi_id,))
            data=c.fetchone()
            if data is None:
                LOGGER.debug(f"KPI {kpi_id} does not exists")
                return None
            else:
                if data[0] == 1:
                    return True
                elif data[0] == 0:
                    return False
                else:
                    LOGGER.debug(f"KPI {kpi_id} is wrong")
                    return None
        except sqlite3.Error as e:
            LOGGER.debug(f"KPI {kpi_id} cannot be checked from the ManagementDB: {e}")


    def set_monitoring_flag(self,kpi_id,flag):
        try:
            c = self.client.cursor()
            data = c.execute("SELECT * FROM kpi WHERE kpi_id is ?",(kpi_id,)).fetchone()
            if data is None:
                LOGGER.debug(f"KPI {kpi_id} does not exists")
                return None
            else:
                if flag :
                    value = 1
                else:
                    value = 0
                c.execute("UPDATE kpi SET monitor_flag = ? WHERE kpi_id is ?",(value,kpi_id))
                return True
        except sqlite3.Error as e:
            LOGGER.debug(f"KPI {kpi_id} cannot be checked from the ManagementDB: {e}")
 No newline at end of file
Loading