From 9cc955c880cbbcda2c4b5cdf82ba33e379e472ac Mon Sep 17 00:00:00 2001
From: fjmmuro <francisco.moreno@atos.net>
Date: Fri, 18 Nov 2022 12:41:50 +0100
Subject: [PATCH] Updated QueryKpiData RPC (stable)

---
 proto/monitoring.proto                        | 19 +++++++-
 src/monitoring/client/MonitoringClient.py     |  4 +-
 src/monitoring/service/MetricsDBTools.py      | 21 +++++++++
 .../service/MonitoringServiceServicerImpl.py  | 45 +++++++++++++++++--
 src/monitoring/tests/Messages.py              | 11 ++++-
 src/monitoring/tests/test_unitary.py          | 15 ++++---
 6 files changed, 100 insertions(+), 15 deletions(-)

diff --git a/proto/monitoring.proto b/proto/monitoring.proto
index c0e2dd877..270ed9ccf 100644
--- a/proto/monitoring.proto
+++ b/proto/monitoring.proto
@@ -25,7 +25,7 @@ service MonitoringService {
   rpc GetKpiDescriptorList  (context.Empty      ) returns (KpiDescriptorList   ) {} // Stable and final
   rpc IncludeKpi            (Kpi                ) returns (context.Empty       ) {} // Stable and final
   rpc MonitorKpi            (MonitorKpiRequest  ) returns (context.Empty       ) {} // Stable and final
-  rpc QueryKpiData          (KpiQuery           ) returns (KpiList             ) {} // Not implemented
+  rpc QueryKpiData          (KpiQuery           ) returns (RawKpiTable         ) {} // Not implemented
   rpc SetKpiSubscription    (SubsDescriptor     ) returns (stream SubsResponse ) {} // Stable not final
   rpc GetSubsDescriptor     (SubscriptionID     ) returns (SubsDescriptor      ) {} // Stable and final
   rpc GetSubscriptions      (context.Empty      ) returns (SubsList            ) {} // Stable and final
@@ -59,7 +59,7 @@ message MonitorKpiRequest {
 }
 
 message KpiQuery {
-  KpiId    kpi_id              = 1;
+  repeated KpiId    kpi_id              = 1;
   float             monitoring_window_s = 2;
   float             sampling_rate_s     = 3;
   uint32            last_n_samples      = 4;  // used when you want something like "get the last N many samples
@@ -68,6 +68,21 @@ message KpiQuery {
   // Pending add field to reflect Available Device Protocols
 }
 
+
+message RawKpi { // cell
+  context.Timestamp timestamp = 1;
+  KpiValue          kpi_value = 2;
+}
+
+message RawKpiList { // column
+  KpiId           kpi_id        = 1;
+  repeated RawKpi raw_kpi_list  = 2;
+}
+
+message RawKpiTable { // table
+  repeated RawKpiList raw_kpi_table = 1;
+}
+
 message KpiId {
   context.Uuid kpi_id = 1;
 }
diff --git a/src/monitoring/client/MonitoringClient.py b/src/monitoring/client/MonitoringClient.py
index 73607a081..5641b9cf3 100644
--- a/src/monitoring/client/MonitoringClient.py
+++ b/src/monitoring/client/MonitoringClient.py
@@ -22,7 +22,7 @@ from common.tools.grpc.Tools import grpc_message_to_json_string
 from common.proto.context_pb2 import Empty
 from common.proto.monitoring_pb2 import Kpi, KpiDescriptor, KpiId, MonitorKpiRequest, \
     KpiDescriptorList, KpiQuery, KpiList, SubsDescriptor, SubscriptionID, SubsList, \
-    SubsResponse, AlarmDescriptor, AlarmID, AlarmList, AlarmResponse, AlarmSubscription
+    SubsResponse, AlarmDescriptor, AlarmID, AlarmList, AlarmResponse, AlarmSubscription, RawKpiTable
 from common.proto.monitoring_pb2_grpc import MonitoringServiceStub
 
 LOGGER = logging.getLogger(__name__)
@@ -93,7 +93,7 @@ class MonitoringClient:
         return response
 
     @RETRY_DECORATOR
-    def QueryKpiData(self, request : KpiQuery) -> KpiList:
+    def QueryKpiData(self, request : KpiQuery) -> RawKpiTable:
         LOGGER.debug('QueryKpiData: {:s}'.format(grpc_message_to_json_string(request)))
         response = self.stub.QueryKpiData(request)
         LOGGER.debug('QueryKpiData result: {:s}'.format(grpc_message_to_json_string(response)))
diff --git a/src/monitoring/service/MetricsDBTools.py b/src/monitoring/service/MetricsDBTools.py
index 0f41cfee1..aa374b340 100644
--- a/src/monitoring/service/MetricsDBTools.py
+++ b/src/monitoring/service/MetricsDBTools.py
@@ -174,6 +174,27 @@ class MetricsDB():
                 if connection:
                     connection.close()
 
+    def get_raw_kpi_list(self, kpi_id, monitoring_window_s,sampling_rate_s, last_n_samples, start_timestamp, end_timestamp):
+        try:
+            end_date = timestamp_utcnow_to_float() - self.commit_lag_ms / 1000
+            start_date = end_date - monitoring_window_s
+            query = f"SELECT timestamp, kpi_value FROM {self.table} WHERE kpi_id = '{kpi_id}' AND (timestamp BETWEEN '{timestamp_float_to_string(start_date)}' AND '{timestamp_float_to_string(end_date)}')"
+
+            if self.postgre:
+                kpi_list = self.run_query_postgre(query)
+                LOGGER.debug(f"kpi_list postgre: {kpi_list}")
+            else:
+                kpi_list = self.run_query(query)
+                LOGGER.debug(f"kpi_list influx: {kpi_list}")
+
+            if kpi_list:
+                LOGGER.debug(f"New data received for subscription to KPI {kpi_id}")
+                return kpi_list
+            else:
+                LOGGER.debug(f"No new data for the subscription to KPI {kpi_id}")
+        except (Exception) as e:
+            LOGGER.debug(f"Subscription data cannot be retrieved. {e}")
+
     def get_subscription_data(self,subs_queue, kpi_id, sampling_interval_s=1):
         try:
             end_date = timestamp_utcnow_to_float() - self.commit_lag_ms / 1000
diff --git a/src/monitoring/service/MonitoringServiceServicerImpl.py b/src/monitoring/service/MonitoringServiceServicerImpl.py
index 9c88ed311..e12d7ca73 100644
--- a/src/monitoring/service/MonitoringServiceServicerImpl.py
+++ b/src/monitoring/service/MonitoringServiceServicerImpl.py
@@ -26,7 +26,7 @@ from common.proto.kpi_sample_types_pb2 import KpiSampleType
 from common.proto.monitoring_pb2_grpc import MonitoringServiceServicer
 from common.proto.monitoring_pb2 import AlarmResponse, AlarmDescriptor, AlarmList, SubsList, KpiId, \
     KpiDescriptor, KpiList, KpiQuery, SubsDescriptor, SubscriptionID, AlarmID, KpiDescriptorList, \
-    MonitorKpiRequest, Kpi, AlarmSubscription, SubsResponse
+    MonitorKpiRequest, Kpi, AlarmSubscription, SubsResponse, RawKpiTable, RawKpi, RawKpiList
 from common.rpc_method_wrapper.ServiceExceptions import ServiceException
 from common.tools.timestamp.Converters import timestamp_string_to_float, timestamp_utcnow_to_float
 
@@ -243,12 +243,49 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer):
             grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))
             # CREATEKPI_COUNTER_FAILED.inc()
 
-    def QueryKpiData(self, request: KpiQuery, grpc_context: grpc.ServicerContext) -> KpiList:
+    def QueryKpiData(self, request: KpiQuery, grpc_context: grpc.ServicerContext) -> RawKpiTable:
 
         LOGGER.info('QueryKpiData')
         try:
-            # TBC
-            return KpiList()
+            raw_kpi_table = RawKpiTable()
+
+            LOGGER.debug(str(request))
+
+            kpi_id_list             = request.kpi_id
+            monitoring_window_s     = request.monitoring_window_s
+            sampling_rate_s         = request.sampling_rate_s
+            last_n_samples          = request.last_n_samples
+            start_timestamp         = request.start_timestamp.timestamp
+            end_timestamp           = request.end_timestamp.timestamp
+
+            # Check if all the Kpi_ids exist
+            for item in kpi_id_list:
+                kpi_id = item.kpi_id.uuid
+
+                kpiDescriptor = self.GetKpiDescriptor(item, grpc_context)
+                if kpiDescriptor is None:
+                    LOGGER.info('QueryKpiData error: KpiID({:s}): not found in database'.format(str(kpi_id)))
+                    break
+                else:
+                    # Execute query per Kpi_id and introduce their kpi_list in the table
+                    kpi_list = self.metrics_db.get_raw_kpi_list(kpi_id,monitoring_window_s,sampling_rate_s,last_n_samples,start_timestamp,end_timestamp)
+                    raw_kpi_list = RawKpiList()
+                    raw_kpi_list.kpi_id.kpi_id.uuid = kpi_id
+
+                    LOGGER.debug(str(kpi_list))
+
+                    if kpi_list is None:
+                        LOGGER.info('QueryKpiData error: KpiID({:s}): points not found in metrics database'.format(str(kpi_id)))
+                    else:
+                        for item in kpi_list:
+                            raw_kpi = RawKpi()
+                            raw_kpi.timestamp.timestamp = timestamp_string_to_float(item[1])
+                            raw_kpi.kpi_value.floatVal  = item[2]
+                            raw_kpi_list.raw_kpi_list.append(raw_kpi)
+
+                    raw_kpi_table.raw_kpi_table.append(raw_kpi_list)
+
+            return raw_kpi_table
         except ServiceException as e:
             LOGGER.exception('QueryKpiData exception')
             grpc_context.abort(e.code, e.details)
diff --git a/src/monitoring/tests/Messages.py b/src/monitoring/tests/Messages.py
index 228b1ce42..a2fc421dd 100644
--- a/src/monitoring/tests/Messages.py
+++ b/src/monitoring/tests/Messages.py
@@ -11,7 +11,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import datetime
 from random import random
 
 from common.proto import monitoring_pb2
@@ -75,9 +74,17 @@ def kpi_descriptor_list():
 
     return _kpi_descriptor_list
 
-def kpi_query():
+def kpi_query(kpi_id_list):
     _kpi_query = monitoring_pb2.KpiQuery()
 
+    _kpi_query.kpi_id.extend(kpi_id_list)
+    _kpi_query.monitoring_window_s          = 20
+    _kpi_query.sampling_rate_s              = 3
+    # _kpi_query.last_n_samples               = 10
+    _kpi_query.start_timestamp.timestamp    = timestamp_utcnow_to_float() - 20
+    _kpi_query.end_timestamp.timestamp      = timestamp_utcnow_to_float()
+
+
     return _kpi_query
 
 def subs_descriptor(kpi_id):
diff --git a/src/monitoring/tests/test_unitary.py b/src/monitoring/tests/test_unitary.py
index 55ac9a18b..68bd6685b 100644
--- a/src/monitoring/tests/test_unitary.py
+++ b/src/monitoring/tests/test_unitary.py
@@ -36,7 +36,7 @@ from common.message_broker.MessageBroker import MessageBroker
 from common.proto import monitoring_pb2
 from common.proto.kpi_sample_types_pb2 import KpiSampleType
 from common.proto.monitoring_pb2 import KpiId, KpiDescriptor, KpiList, SubsDescriptor, SubsList, AlarmID, \
-    AlarmDescriptor, AlarmList, Kpi, KpiDescriptorList, SubsResponse, AlarmResponse
+    AlarmDescriptor, AlarmList, Kpi, KpiDescriptorList, SubsResponse, AlarmResponse, RawKpiTable
 from common.tools.timestamp.Converters import timestamp_utcnow_to_float, timestamp_string_to_float
 
 from context.client.ContextClient import ContextClient
@@ -284,14 +284,19 @@ def test_monitor_kpi(
     assert isinstance(response, Empty)
 
 # Test case that makes use of client fixture to test server's QueryKpiData method
-def test_query_kpi_data(monitoring_client): # pylint: disable=redefined-outer-name
+def test_query_kpi_data(monitoring_client,subs_scheduler): # pylint: disable=redefined-outer-name
+
+    kpi_id_list = []
+    kpi_id = monitoring_client.SetKpi(create_kpi_request_c())
+    subs_scheduler.add_job(ingestion_data)
+    kpi_id_list.append(kpi_id)
     LOGGER.warning('test_query_kpi_data')
-    response = monitoring_client.QueryKpiData(kpi_query())
+    response = monitoring_client.QueryKpiData(kpi_query(kpi_id_list))
     LOGGER.debug(str(response))
-    assert isinstance(response, KpiList)
+    assert isinstance(response, RawKpiTable)
 
 # Test case that makes use of client fixture to test server's SetKpiSubscription method
-def test_set_kpi_subscription(monitoring_client,metrics_db,subs_scheduler): # pylint: disable=redefined-outer-name
+def test_set_kpi_subscription(monitoring_client,subs_scheduler): # pylint: disable=redefined-outer-name
     LOGGER.warning('test_set_kpi_subscription')
     kpi_id = monitoring_client.SetKpi(create_kpi_request_c())
     subs_scheduler.add_job(ingestion_data)
-- 
GitLab