From c6e8a2595d2c3fee175b94256a6d715dc2747718 Mon Sep 17 00:00:00 2001
From: Javier Moreno <francisco.moreno.external@atos.net>
Date: Tue, 4 Oct 2022 17:35:23 +0200
Subject: [PATCH] Merge from develop (WIP)

---
 src/monitoring/service/ManagementDBTools.py   | 15 +++-
 src/monitoring/service/MetricsDBTools.py      | 17 +++-
 src/monitoring/service/SubscriptionManager.py | 89 +++++++++++++++++--
 3 files changed, 109 insertions(+), 12 deletions(-)

diff --git a/src/monitoring/service/ManagementDBTools.py b/src/monitoring/service/ManagementDBTools.py
index 09fb3049f..04693d3ff 100644
--- a/src/monitoring/service/ManagementDBTools.py
+++ b/src/monitoring/service/ManagementDBTools.py
@@ -11,8 +11,11 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from monitoring.service.MonitoringServiceServicerImpl import LOGGER
+from common.tools.timestamp.Converters import timestamp_utcnow_to_float
+
 import sqlite3 as sl
+import os
+ROOT_DIR = os.path.abspath(os.curdir)
 
 class ManagementDB():
     def __init__(self, database):
@@ -121,4 +124,12 @@ class ManagementDB():
         #print("\n")
         #for row in data:
         #    print(row)
-        return data.fetchall()
\ No newline at end of file
+        return data.fetchall()
+
+def main():
+    managementdb = ManagementDB("monitoring.db")
+    subs_id = managementdb.insert_subscription(2,"localhost",20,2,timestamp_utcnow_to_float(),timestamp_utcnow_to_float() + 10)
+    print("subs id: ", str(subs_id))
+
+if __name__ == '__main__':
+    main()
diff --git a/src/monitoring/service/MetricsDBTools.py b/src/monitoring/service/MetricsDBTools.py
index d8a7d5060..522a7c211 100644
--- a/src/monitoring/service/MetricsDBTools.py
+++ b/src/monitoring/service/MetricsDBTools.py
@@ -76,10 +76,19 @@ class MetricsDB():
     self.run_query(query)
     LOGGER.info(f"Table {self.table} created")
 
-  def get_subscription_data(self, subs_queue, kpi_id, end_date, sampling_interval_s):
-      start_date = end_date-sampling_interval_s
-      query = f"SELECT kpi_id, timestamp, kpi_value FROM {self.table} WHERE kpi_id = '{kpi_id}' AND (timestamp BETWEEN '{timestamp_float_to_string(start_date)}' AND '{timestamp_float_to_string(end_date)}')"
+  def get_subscription_data(self, subs_queue, kpi_id, end_date, sampling_interval_s, callback_return):
+      str_end_date = str(end_date.isoformat()) + 'Z'
+      print("str_end_date: " + str_end_date)
+      start_date = end_date - datetime.timedelta(seconds=sampling_interval_s)
+      str_start_date = str(start_date.isoformat()) + 'Z'
+      print("str_start_date: " + str(str_start_date))
+      # query = f"SELECT kpi_id, timestamp, kpi_value FROM {self.table} WHERE kpi_id = '{kpi_id}' AND (timestamp BETWEEN '2022-09-28T07:21:26.595586Z' AND '2022-09-28T07:32:34.197792Z')"
+      query = f"SELECT kpi_id, timestamp, kpi_value FROM {self.table} WHERE kpi_id = '{kpi_id}' AND (timestamp BETWEEN '{str_start_date}' AND '{str_end_date}')"
       response=self.run_query(query)
       kpi_list=response['dataset']
-      # subs_queue.append(kpi_list)
+      # print(kpi_list)
+      subs_queue.put_nowait(kpi_list)
+      # return kpi_list
+      if callback_return:
+          callback_return(kpi_list)
       
diff --git a/src/monitoring/service/SubscriptionManager.py b/src/monitoring/service/SubscriptionManager.py
index 0b6717337..e133cd914 100644
--- a/src/monitoring/service/SubscriptionManager.py
+++ b/src/monitoring/service/SubscriptionManager.py
@@ -1,11 +1,21 @@
+import os
+from queue import Queue
+from random import random
+
 from apscheduler.schedulers.background import BackgroundScheduler
 from apscheduler.executors.pool import ProcessPoolExecutor
 from apscheduler.triggers.interval import IntervalTrigger
 from apscheduler.triggers.cron import CronTrigger
-from common.tools.timestamp.Converters import timestamp_utcnow_to_float
-from datetime import datetime, timezone
+
+from common.proto.monitoring_pb2 import Kpi, KpiList
+from common.tools.timestamp import Converters
+from common.tools.timestamp.Converters import timestamp_utcnow_to_float, timestamp_float_to_string
+from datetime import datetime
 import time
 
+from monitoring.service import MetricsDBTools
+from monitoring.service.ManagementDBTools import ManagementDB
+
 
 class SubscriptionManager():
     def __init__(self, metrics_db):
@@ -17,15 +27,82 @@ class SubscriptionManager():
     def create_subscription(self, subs_queue ,subscription_id, kpi_id, sampling_interval_s, sampling_duration_s=None, start_timestamp=None, end_timestamp=None):
         start_date=None
         end_date=None
+        print("Inside create subscription")
         if sampling_duration_s:
             if not start_timestamp:
                 start_timestamp=time.time()
             end_timestamp=start_timestamp+sampling_duration_s
+            print("end_timestamp: " + timestamp_float_to_string(end_timestamp))
         if start_timestamp:
-            start_date = datetime.fromtimestamp(start_timestamp)
+            start_date = datetime.utcfromtimestamp(start_timestamp)
+            print("start_date: " + str(start_date))
         if end_timestamp:
-            end_date = datetime.fromtimestamp(end_timestamp)
-        self.scheduler.add_job(self.metrics_db.get_subscription_data, args=(subs_queue, kpi_id, timestamp_utcnow_to_float(), sampling_interval_s),trigger='interval', seconds=sampling_interval_s, start_date=start_date, end_date=end_date, id=subscription_id)
+            end_date = datetime.utcfromtimestamp(end_timestamp)
+            print("end_date: " + str(end_date))
+        self.scheduler.add_job(self.metrics_db.get_subscription_data, args=(subs_queue, kpi_id, start_timestamp, sampling_interval_s),trigger='interval', seconds=sampling_interval_s, start_date=start_date, end_date=end_date, id=subscription_id)
+        self.metrics_db.get_subscription_data(subs_queue,kpi_id,end_date,sampling_interval_s)
+
+
 
     def delete_subscription(self, subscription_id):
-        self.scheduler.remove_job(subscription_id)
\ No newline at end of file
+        self.scheduler.remove_job(subscription_id)
+
+
+def main():
+
+    subs_queue = Queue()
+
+    managementdb = ManagementDB("monitoring.db")
+    metrics_db = MetricsDBTools.MetricsDB("localhost", "9009", "9000", "monitoring")
+    subs_manager = SubscriptionManager(metrics_db)
+
+    print("Here")
+
+    kpi_id = "2"
+    sampling_duration_s = 10
+    sampling_interval_s = 2
+    start_timestamp = timestamp_utcnow_to_float()
+    end_timestamp = timestamp_utcnow_to_float() + 10
+
+
+
+    print("Before loop")
+    print("start_timestamp: " + timestamp_float_to_string(start_timestamp))
+    print("end_timestamp: " + timestamp_float_to_string(end_timestamp))
+
+
+    for i in range(10):
+        timestamp = timestamp_utcnow_to_float()
+        kpitype = "KPISAMPLETYPE_PACKETS_TRANSMITTED"
+        deviceId = "DEV01"
+        endpointId = "END01"
+        serviceId = "SERV01"
+        kpi_value = 50*random()
+        metrics_db.write_KPI(timestamp,kpi_id,kpitype,deviceId,endpointId,serviceId,kpi_value)
+        time.sleep(1)
+
+    print("After loop")
+
+    subs_id = managementdb.insert_subscription(kpi_id, "localhost", sampling_duration_s, sampling_interval_s, start_timestamp,
+                                               end_timestamp)
+
+    subs_manager.create_subscription(subs_queue,str(subs_id),kpi_id,sampling_interval_s,sampling_duration_s,start_timestamp,end_timestamp)
+
+    print("Queue empty: " + str(subs_queue.empty()))
+    print("Queue size: " + str(subs_queue.qsize()))
+
+    while not subs_queue.empty():
+        list = subs_queue.get_nowait()
+        print("List: " + str(list))
+        kpi_list = KpiList()
+        for item in list:
+            kpi = Kpi()
+            kpi.kpi_id.kpi_id.uuid = item[0]
+            kpi.timestamp.timestamp = Converters.timestamp_string_to_float(item[1])
+            kpi.kpi_value.floatVal = item[2]
+            kpi_list.kpi.append(kpi)
+            print("Kpi List: " + str(kpi_list))
+
+
+if __name__ == '__main__':
+    main()
-- 
GitLab