Commit 9cc955c8 authored by Francisco-Javier Moreno-Muro's avatar Francisco-Javier Moreno-Muro
Browse files

Updated QueryKpiData RPC (stable)

parent cc78e3b4
Loading
Loading
Loading
Loading
+17 −2
Original line number Original line Diff line number Diff line
@@ -25,7 +25,7 @@ service MonitoringService {
  rpc GetKpiDescriptorList  (context.Empty      ) returns (KpiDescriptorList   ) {} // Stable and final
  rpc GetKpiDescriptorList  (context.Empty      ) returns (KpiDescriptorList   ) {} // Stable and final
  rpc IncludeKpi            (Kpi                ) returns (context.Empty       ) {} // Stable and final
  rpc IncludeKpi            (Kpi                ) returns (context.Empty       ) {} // Stable and final
  rpc MonitorKpi            (MonitorKpiRequest  ) returns (context.Empty       ) {} // Stable and final
  rpc MonitorKpi            (MonitorKpiRequest  ) returns (context.Empty       ) {} // Stable and final
  rpc QueryKpiData          (KpiQuery           ) returns (KpiList             ) {} // Not implemented
  rpc QueryKpiData          (KpiQuery           ) returns (RawKpiTable         ) {} // Not implemented
  rpc SetKpiSubscription    (SubsDescriptor     ) returns (stream SubsResponse ) {} // Stable not final
  rpc SetKpiSubscription    (SubsDescriptor     ) returns (stream SubsResponse ) {} // Stable not final
  rpc GetSubsDescriptor     (SubscriptionID     ) returns (SubsDescriptor      ) {} // Stable and final
  rpc GetSubsDescriptor     (SubscriptionID     ) returns (SubsDescriptor      ) {} // Stable and final
  rpc GetSubscriptions      (context.Empty      ) returns (SubsList            ) {} // Stable and final
  rpc GetSubscriptions      (context.Empty      ) returns (SubsList            ) {} // Stable and final
@@ -59,7 +59,7 @@ message MonitorKpiRequest {
}
}


message KpiQuery {
message KpiQuery {
  KpiId    kpi_id              = 1;
  repeated KpiId    kpi_id              = 1;
  float             monitoring_window_s = 2;
  float             monitoring_window_s = 2;
  float             sampling_rate_s     = 3;
  float             sampling_rate_s     = 3;
  uint32            last_n_samples      = 4;  // used when you want something like "get the last N many samples
  uint32            last_n_samples      = 4;  // used when you want something like "get the last N many samples
@@ -68,6 +68,21 @@ message KpiQuery {
  // Pending add field to reflect Available Device Protocols
  // Pending add field to reflect Available Device Protocols
}
}



message RawKpi { // cell
  context.Timestamp timestamp = 1;
  KpiValue          kpi_value = 2;
}

message RawKpiList { // column
  KpiId           kpi_id        = 1;
  repeated RawKpi raw_kpi_list  = 2;
}

message RawKpiTable { // table
  repeated RawKpiList raw_kpi_table = 1;
}

message KpiId {
message KpiId {
  context.Uuid kpi_id = 1;
  context.Uuid kpi_id = 1;
}
}
+2 −2
Original line number Original line Diff line number Diff line
@@ -22,7 +22,7 @@ from common.tools.grpc.Tools import grpc_message_to_json_string
from common.proto.context_pb2 import Empty
from common.proto.context_pb2 import Empty
from common.proto.monitoring_pb2 import Kpi, KpiDescriptor, KpiId, MonitorKpiRequest, \
from common.proto.monitoring_pb2 import Kpi, KpiDescriptor, KpiId, MonitorKpiRequest, \
    KpiDescriptorList, KpiQuery, KpiList, SubsDescriptor, SubscriptionID, SubsList, \
    KpiDescriptorList, KpiQuery, KpiList, SubsDescriptor, SubscriptionID, SubsList, \
    SubsResponse, AlarmDescriptor, AlarmID, AlarmList, AlarmResponse, AlarmSubscription
    SubsResponse, AlarmDescriptor, AlarmID, AlarmList, AlarmResponse, AlarmSubscription, RawKpiTable
from common.proto.monitoring_pb2_grpc import MonitoringServiceStub
from common.proto.monitoring_pb2_grpc import MonitoringServiceStub


LOGGER = logging.getLogger(__name__)
LOGGER = logging.getLogger(__name__)
@@ -93,7 +93,7 @@ class MonitoringClient:
        return response
        return response


    @RETRY_DECORATOR
    @RETRY_DECORATOR
    def QueryKpiData(self, request : KpiQuery) -> KpiList:
    def QueryKpiData(self, request : KpiQuery) -> RawKpiTable:
        LOGGER.debug('QueryKpiData: {:s}'.format(grpc_message_to_json_string(request)))
        LOGGER.debug('QueryKpiData: {:s}'.format(grpc_message_to_json_string(request)))
        response = self.stub.QueryKpiData(request)
        response = self.stub.QueryKpiData(request)
        LOGGER.debug('QueryKpiData result: {:s}'.format(grpc_message_to_json_string(response)))
        LOGGER.debug('QueryKpiData result: {:s}'.format(grpc_message_to_json_string(response)))
+21 −0
Original line number Original line Diff line number Diff line
@@ -174,6 +174,27 @@ class MetricsDB():
                if connection:
                if connection:
                    connection.close()
                    connection.close()


    def get_raw_kpi_list(self, kpi_id, monitoring_window_s,sampling_rate_s, last_n_samples, start_timestamp, end_timestamp):
        try:
            end_date = timestamp_utcnow_to_float() - self.commit_lag_ms / 1000
            start_date = end_date - monitoring_window_s
            query = f"SELECT timestamp, kpi_value FROM {self.table} WHERE kpi_id = '{kpi_id}' AND (timestamp BETWEEN '{timestamp_float_to_string(start_date)}' AND '{timestamp_float_to_string(end_date)}')"

            if self.postgre:
                kpi_list = self.run_query_postgre(query)
                LOGGER.debug(f"kpi_list postgre: {kpi_list}")
            else:
                kpi_list = self.run_query(query)
                LOGGER.debug(f"kpi_list influx: {kpi_list}")

            if kpi_list:
                LOGGER.debug(f"New data received for subscription to KPI {kpi_id}")
                return kpi_list
            else:
                LOGGER.debug(f"No new data for the subscription to KPI {kpi_id}")
        except (Exception) as e:
            LOGGER.debug(f"Subscription data cannot be retrieved. {e}")

    def get_subscription_data(self,subs_queue, kpi_id, sampling_interval_s=1):
    def get_subscription_data(self,subs_queue, kpi_id, sampling_interval_s=1):
        try:
        try:
            end_date = timestamp_utcnow_to_float() - self.commit_lag_ms / 1000
            end_date = timestamp_utcnow_to_float() - self.commit_lag_ms / 1000
+41 −4
Original line number Original line Diff line number Diff line
@@ -26,7 +26,7 @@ from common.proto.kpi_sample_types_pb2 import KpiSampleType
from common.proto.monitoring_pb2_grpc import MonitoringServiceServicer
from common.proto.monitoring_pb2_grpc import MonitoringServiceServicer
from common.proto.monitoring_pb2 import AlarmResponse, AlarmDescriptor, AlarmList, SubsList, KpiId, \
from common.proto.monitoring_pb2 import AlarmResponse, AlarmDescriptor, AlarmList, SubsList, KpiId, \
    KpiDescriptor, KpiList, KpiQuery, SubsDescriptor, SubscriptionID, AlarmID, KpiDescriptorList, \
    KpiDescriptor, KpiList, KpiQuery, SubsDescriptor, SubscriptionID, AlarmID, KpiDescriptorList, \
    MonitorKpiRequest, Kpi, AlarmSubscription, SubsResponse
    MonitorKpiRequest, Kpi, AlarmSubscription, SubsResponse, RawKpiTable, RawKpi, RawKpiList
from common.rpc_method_wrapper.ServiceExceptions import ServiceException
from common.rpc_method_wrapper.ServiceExceptions import ServiceException
from common.tools.timestamp.Converters import timestamp_string_to_float, timestamp_utcnow_to_float
from common.tools.timestamp.Converters import timestamp_string_to_float, timestamp_utcnow_to_float


@@ -243,12 +243,49 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer):
            grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))
            grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))
            # CREATEKPI_COUNTER_FAILED.inc()
            # CREATEKPI_COUNTER_FAILED.inc()


    def QueryKpiData(self, request: KpiQuery, grpc_context: grpc.ServicerContext) -> KpiList:
    def QueryKpiData(self, request: KpiQuery, grpc_context: grpc.ServicerContext) -> RawKpiTable:


        LOGGER.info('QueryKpiData')
        LOGGER.info('QueryKpiData')
        try:
        try:
            # TBC
            raw_kpi_table = RawKpiTable()
            return KpiList()

            LOGGER.debug(str(request))

            kpi_id_list             = request.kpi_id
            monitoring_window_s     = request.monitoring_window_s
            sampling_rate_s         = request.sampling_rate_s
            last_n_samples          = request.last_n_samples
            start_timestamp         = request.start_timestamp.timestamp
            end_timestamp           = request.end_timestamp.timestamp

            # Check if all the Kpi_ids exist
            for item in kpi_id_list:
                kpi_id = item.kpi_id.uuid

                kpiDescriptor = self.GetKpiDescriptor(item, grpc_context)
                if kpiDescriptor is None:
                    LOGGER.info('QueryKpiData error: KpiID({:s}): not found in database'.format(str(kpi_id)))
                    break
                else:
                    # Execute query per Kpi_id and introduce their kpi_list in the table
                    kpi_list = self.metrics_db.get_raw_kpi_list(kpi_id,monitoring_window_s,sampling_rate_s,last_n_samples,start_timestamp,end_timestamp)
                    raw_kpi_list = RawKpiList()
                    raw_kpi_list.kpi_id.kpi_id.uuid = kpi_id

                    LOGGER.debug(str(kpi_list))

                    if kpi_list is None:
                        LOGGER.info('QueryKpiData error: KpiID({:s}): points not found in metrics database'.format(str(kpi_id)))
                    else:
                        for item in kpi_list:
                            raw_kpi = RawKpi()
                            raw_kpi.timestamp.timestamp = timestamp_string_to_float(item[1])
                            raw_kpi.kpi_value.floatVal  = item[2]
                            raw_kpi_list.raw_kpi_list.append(raw_kpi)

                    raw_kpi_table.raw_kpi_table.append(raw_kpi_list)

            return raw_kpi_table
        except ServiceException as e:
        except ServiceException as e:
            LOGGER.exception('QueryKpiData exception')
            LOGGER.exception('QueryKpiData exception')
            grpc_context.abort(e.code, e.details)
            grpc_context.abort(e.code, e.details)
+9 −2
Original line number Original line Diff line number Diff line
@@ -11,7 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# See the License for the specific language governing permissions and
# limitations under the License.
# limitations under the License.
import datetime
from random import random
from random import random


from common.proto import monitoring_pb2
from common.proto import monitoring_pb2
@@ -75,9 +74,17 @@ def kpi_descriptor_list():


    return _kpi_descriptor_list
    return _kpi_descriptor_list


def kpi_query():
def kpi_query(kpi_id_list):
    _kpi_query = monitoring_pb2.KpiQuery()
    _kpi_query = monitoring_pb2.KpiQuery()


    _kpi_query.kpi_id.extend(kpi_id_list)
    _kpi_query.monitoring_window_s          = 20
    _kpi_query.sampling_rate_s              = 3
    # _kpi_query.last_n_samples               = 10
    _kpi_query.start_timestamp.timestamp    = timestamp_utcnow_to_float() - 20
    _kpi_query.end_timestamp.timestamp      = timestamp_utcnow_to_float()


    return _kpi_query
    return _kpi_query


def subs_descriptor(kpi_id):
def subs_descriptor(kpi_id):
Loading