import os from queue import Queue from random import random from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.executors.pool import ProcessPoolExecutor from apscheduler.triggers.interval import IntervalTrigger from apscheduler.triggers.cron import CronTrigger from common.proto.monitoring_pb2 import Kpi, KpiList from common.tools.timestamp import Converters from common.tools.timestamp.Converters import timestamp_utcnow_to_float, timestamp_float_to_string from datetime import datetime import time from monitoring.service import MetricsDBTools from monitoring.service.ManagementDBTools import ManagementDB class SubscriptionManager(): def __init__(self, metrics_db): self.metrics_db = metrics_db self.scheduler = BackgroundScheduler(executors={'processpool': ProcessPoolExecutor(max_workers=20)}) self.scheduler.start() def create_subscription(self, subs_queue ,subscription_id, kpi_id, sampling_interval_s, sampling_duration_s=None, start_timestamp=None, end_timestamp=None): start_date=None end_date=None print("Inside create subscription") if sampling_duration_s: if not start_timestamp: start_timestamp=time.time() end_timestamp=start_timestamp+sampling_duration_s print("end_timestamp: " + timestamp_float_to_string(end_timestamp)) if start_timestamp: start_date = datetime.utcfromtimestamp(start_timestamp) print("start_date: " + str(start_date)) if end_timestamp: end_date = datetime.utcfromtimestamp(end_timestamp) print("end_date: " + str(end_date)) self.scheduler.add_job(self.metrics_db.get_subscription_data, args=(subs_queue, kpi_id, start_timestamp, sampling_interval_s),trigger='interval', seconds=sampling_interval_s, start_date=start_date, end_date=end_date, id=subscription_id) self.metrics_db.get_subscription_data(subs_queue,kpi_id,end_date,sampling_interval_s) def delete_subscription(self, subscription_id): self.scheduler.remove_job(subscription_id) def main(): subs_queue = Queue() managementdb = ManagementDB("monitoring.db") metrics_db = MetricsDBTools.MetricsDB("localhost", "9009", "9000", "monitoring") subs_manager = SubscriptionManager(metrics_db) print("Here") kpi_id = "2" sampling_duration_s = 10 sampling_interval_s = 2 start_timestamp = timestamp_utcnow_to_float() end_timestamp = timestamp_utcnow_to_float() + 10 print("Before loop") print("start_timestamp: " + timestamp_float_to_string(start_timestamp)) print("end_timestamp: " + timestamp_float_to_string(end_timestamp)) for i in range(10): timestamp = timestamp_utcnow_to_float() kpitype = "KPISAMPLETYPE_PACKETS_TRANSMITTED" deviceId = "DEV01" endpointId = "END01" serviceId = "SERV01" kpi_value = 50*random() metrics_db.write_KPI(timestamp,kpi_id,kpitype,deviceId,endpointId,serviceId,kpi_value) time.sleep(1) print("After loop") subs_id = managementdb.insert_subscription(kpi_id, "localhost", sampling_duration_s, sampling_interval_s, start_timestamp, end_timestamp) subs_manager.create_subscription(subs_queue,str(subs_id),kpi_id,sampling_interval_s,sampling_duration_s,start_timestamp,end_timestamp) print("Queue empty: " + str(subs_queue.empty())) print("Queue size: " + str(subs_queue.qsize())) while not subs_queue.empty(): list = subs_queue.get_nowait() print("List: " + str(list)) kpi_list = KpiList() for item in list: kpi = Kpi() kpi.kpi_id.kpi_id.uuid = item[0] kpi.timestamp.timestamp = Converters.timestamp_string_to_float(item[1]) kpi.kpi_value.floatVal = item[2] kpi_list.kpi.append(kpi) print("Kpi List: " + str(kpi_list)) if __name__ == '__main__': main()