diff --git a/src/monitoring/service/ManagementDBTools.py b/src/monitoring/service/ManagementDBTools.py index 09fb3049f758628d260fee6bb5210358fd099692..04693d3ffe40acb04beba31111baa0be5b50ecdd 100644 --- a/src/monitoring/service/ManagementDBTools.py +++ b/src/monitoring/service/ManagementDBTools.py @@ -11,8 +11,11 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from monitoring.service.MonitoringServiceServicerImpl import LOGGER +from common.tools.timestamp.Converters import timestamp_utcnow_to_float + import sqlite3 as sl +import os +ROOT_DIR = os.path.abspath(os.curdir) class ManagementDB(): def __init__(self, database): @@ -121,4 +124,12 @@ class ManagementDB(): #print("\n") #for row in data: # print(row) - return data.fetchall() \ No newline at end of file + return data.fetchall() + +def main(): + managementdb = ManagementDB("monitoring.db") + subs_id = managementdb.insert_subscription(2,"localhost",20,2,timestamp_utcnow_to_float(),timestamp_utcnow_to_float() + 10) + print("subs id: ", str(subs_id)) + +if __name__ == '__main__': + main() diff --git a/src/monitoring/service/MetricsDBTools.py b/src/monitoring/service/MetricsDBTools.py index d8a7d5060e44694e734fd82508466c2dc1ecbe97..522a7c211cf75315320160fe4a0a90c4efba2c83 100644 --- a/src/monitoring/service/MetricsDBTools.py +++ b/src/monitoring/service/MetricsDBTools.py @@ -76,10 +76,19 @@ class MetricsDB(): self.run_query(query) LOGGER.info(f"Table {self.table} created") - def get_subscription_data(self, subs_queue, kpi_id, end_date, sampling_interval_s): - start_date = end_date-sampling_interval_s - query = f"SELECT kpi_id, timestamp, kpi_value FROM {self.table} WHERE kpi_id = '{kpi_id}' AND (timestamp BETWEEN '{timestamp_float_to_string(start_date)}' AND '{timestamp_float_to_string(end_date)}')" + def get_subscription_data(self, subs_queue, kpi_id, end_date, sampling_interval_s, callback_return): + str_end_date = str(end_date.isoformat()) + 'Z' + print("str_end_date: " + str_end_date) + start_date = end_date - datetime.timedelta(seconds=sampling_interval_s) + str_start_date = str(start_date.isoformat()) + 'Z' + print("str_start_date: " + str(str_start_date)) + # query = f"SELECT kpi_id, timestamp, kpi_value FROM {self.table} WHERE kpi_id = '{kpi_id}' AND (timestamp BETWEEN '2022-09-28T07:21:26.595586Z' AND '2022-09-28T07:32:34.197792Z')" + query = f"SELECT kpi_id, timestamp, kpi_value FROM {self.table} WHERE kpi_id = '{kpi_id}' AND (timestamp BETWEEN '{str_start_date}' AND '{str_end_date}')" response=self.run_query(query) kpi_list=response['dataset'] - # subs_queue.append(kpi_list) + # print(kpi_list) + subs_queue.put_nowait(kpi_list) + # return kpi_list + if callback_return: + callback_return(kpi_list) diff --git a/src/monitoring/service/SubscriptionManager.py b/src/monitoring/service/SubscriptionManager.py index 0b6717337d96915b126dfb457241e30b69ec80b8..e133cd914f6a5cf22066a44c644121d408ada028 100644 --- a/src/monitoring/service/SubscriptionManager.py +++ b/src/monitoring/service/SubscriptionManager.py @@ -1,11 +1,21 @@ +import os +from queue import Queue +from random import random + from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.executors.pool import ProcessPoolExecutor from apscheduler.triggers.interval import IntervalTrigger from apscheduler.triggers.cron import CronTrigger -from common.tools.timestamp.Converters import timestamp_utcnow_to_float -from datetime import datetime, timezone + +from common.proto.monitoring_pb2 import Kpi, KpiList +from common.tools.timestamp import Converters +from common.tools.timestamp.Converters import timestamp_utcnow_to_float, timestamp_float_to_string +from datetime import datetime import time +from monitoring.service import MetricsDBTools +from monitoring.service.ManagementDBTools import ManagementDB + class SubscriptionManager(): def __init__(self, metrics_db): @@ -17,15 +27,82 @@ class SubscriptionManager(): def create_subscription(self, subs_queue ,subscription_id, kpi_id, sampling_interval_s, sampling_duration_s=None, start_timestamp=None, end_timestamp=None): start_date=None end_date=None + print("Inside create subscription") if sampling_duration_s: if not start_timestamp: start_timestamp=time.time() end_timestamp=start_timestamp+sampling_duration_s + print("end_timestamp: " + timestamp_float_to_string(end_timestamp)) if start_timestamp: - start_date = datetime.fromtimestamp(start_timestamp) + start_date = datetime.utcfromtimestamp(start_timestamp) + print("start_date: " + str(start_date)) if end_timestamp: - end_date = datetime.fromtimestamp(end_timestamp) - self.scheduler.add_job(self.metrics_db.get_subscription_data, args=(subs_queue, kpi_id, timestamp_utcnow_to_float(), sampling_interval_s),trigger='interval', seconds=sampling_interval_s, start_date=start_date, end_date=end_date, id=subscription_id) + end_date = datetime.utcfromtimestamp(end_timestamp) + print("end_date: " + str(end_date)) + self.scheduler.add_job(self.metrics_db.get_subscription_data, args=(subs_queue, kpi_id, start_timestamp, sampling_interval_s),trigger='interval', seconds=sampling_interval_s, start_date=start_date, end_date=end_date, id=subscription_id) + self.metrics_db.get_subscription_data(subs_queue,kpi_id,end_date,sampling_interval_s) + + def delete_subscription(self, subscription_id): - self.scheduler.remove_job(subscription_id) \ No newline at end of file + self.scheduler.remove_job(subscription_id) + + +def main(): + + subs_queue = Queue() + + managementdb = ManagementDB("monitoring.db") + metrics_db = MetricsDBTools.MetricsDB("localhost", "9009", "9000", "monitoring") + subs_manager = SubscriptionManager(metrics_db) + + print("Here") + + kpi_id = "2" + sampling_duration_s = 10 + sampling_interval_s = 2 + start_timestamp = timestamp_utcnow_to_float() + end_timestamp = timestamp_utcnow_to_float() + 10 + + + + print("Before loop") + print("start_timestamp: " + timestamp_float_to_string(start_timestamp)) + print("end_timestamp: " + timestamp_float_to_string(end_timestamp)) + + + for i in range(10): + timestamp = timestamp_utcnow_to_float() + kpitype = "KPISAMPLETYPE_PACKETS_TRANSMITTED" + deviceId = "DEV01" + endpointId = "END01" + serviceId = "SERV01" + kpi_value = 50*random() + metrics_db.write_KPI(timestamp,kpi_id,kpitype,deviceId,endpointId,serviceId,kpi_value) + time.sleep(1) + + print("After loop") + + subs_id = managementdb.insert_subscription(kpi_id, "localhost", sampling_duration_s, sampling_interval_s, start_timestamp, + end_timestamp) + + subs_manager.create_subscription(subs_queue,str(subs_id),kpi_id,sampling_interval_s,sampling_duration_s,start_timestamp,end_timestamp) + + print("Queue empty: " + str(subs_queue.empty())) + print("Queue size: " + str(subs_queue.qsize())) + + while not subs_queue.empty(): + list = subs_queue.get_nowait() + print("List: " + str(list)) + kpi_list = KpiList() + for item in list: + kpi = Kpi() + kpi.kpi_id.kpi_id.uuid = item[0] + kpi.timestamp.timestamp = Converters.timestamp_string_to_float(item[1]) + kpi.kpi_value.floatVal = item[2] + kpi_list.kpi.append(kpi) + print("Kpi List: " + str(kpi_list)) + + +if __name__ == '__main__': + main()