Skip to content
Snippets Groups Projects
SubscriptionManager.py 3.95 KiB
Newer Older
Javier Moreno's avatar
Javier Moreno committed
import os
from queue import Queue
from random import random

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ProcessPoolExecutor
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.triggers.cron import CronTrigger
Javier Moreno's avatar
Javier Moreno committed

from common.proto.monitoring_pb2 import Kpi, KpiList
from common.tools.timestamp import Converters
from common.tools.timestamp.Converters import timestamp_utcnow_to_float, timestamp_float_to_string
from datetime import datetime
Javier Moreno's avatar
Javier Moreno committed
from monitoring.service import MetricsDBTools
from monitoring.service.ManagementDBTools import ManagementDB


class SubscriptionManager():
    def __init__(self, metrics_db):
        self.metrics_db = metrics_db
        self.scheduler = BackgroundScheduler(executors={'processpool': ProcessPoolExecutor(max_workers=20)})
        self.scheduler.start()
        

    def create_subscription(self, subs_queue ,subscription_id, kpi_id, sampling_interval_s, sampling_duration_s=None, start_timestamp=None, end_timestamp=None):
        start_date=None
        end_date=None
Javier Moreno's avatar
Javier Moreno committed
        print("Inside create subscription")
        if sampling_duration_s:
            if not start_timestamp:
                start_timestamp=time.time()
            end_timestamp=start_timestamp+sampling_duration_s
Javier Moreno's avatar
Javier Moreno committed
            print("end_timestamp: " + timestamp_float_to_string(end_timestamp))
Javier Moreno's avatar
Javier Moreno committed
            start_date = datetime.utcfromtimestamp(start_timestamp)
            print("start_date: " + str(start_date))
Javier Moreno's avatar
Javier Moreno committed
            end_date = datetime.utcfromtimestamp(end_timestamp)
            print("end_date: " + str(end_date))
        self.scheduler.add_job(self.metrics_db.get_subscription_data, args=(subs_queue, kpi_id, start_timestamp, sampling_interval_s),trigger='interval', seconds=sampling_interval_s, start_date=start_date, end_date=end_date, id=subscription_id)
        self.metrics_db.get_subscription_data(subs_queue,kpi_id,end_date,sampling_interval_s)



    def delete_subscription(self, subscription_id):
Javier Moreno's avatar
Javier Moreno committed
        self.scheduler.remove_job(subscription_id)


def main():

    subs_queue = Queue()

    managementdb = ManagementDB("monitoring.db")
    metrics_db = MetricsDBTools.MetricsDB("localhost", "9009", "9000", "monitoring")
    subs_manager = SubscriptionManager(metrics_db)

    print("Here")

    kpi_id = "2"
    sampling_duration_s = 10
    sampling_interval_s = 2
    start_timestamp = timestamp_utcnow_to_float()
    end_timestamp = timestamp_utcnow_to_float() + 10



    print("Before loop")
    print("start_timestamp: " + timestamp_float_to_string(start_timestamp))
    print("end_timestamp: " + timestamp_float_to_string(end_timestamp))


    for i in range(10):
        timestamp = timestamp_utcnow_to_float()
        kpitype = "KPISAMPLETYPE_PACKETS_TRANSMITTED"
        deviceId = "DEV01"
        endpointId = "END01"
        serviceId = "SERV01"
        kpi_value = 50*random()
        metrics_db.write_KPI(timestamp,kpi_id,kpitype,deviceId,endpointId,serviceId,kpi_value)
        time.sleep(1)

    print("After loop")

    subs_id = managementdb.insert_subscription(kpi_id, "localhost", sampling_duration_s, sampling_interval_s, start_timestamp,
                                               end_timestamp)

    subs_manager.create_subscription(subs_queue,str(subs_id),kpi_id,sampling_interval_s,sampling_duration_s,start_timestamp,end_timestamp)

    print("Queue empty: " + str(subs_queue.empty()))
    print("Queue size: " + str(subs_queue.qsize()))

    while not subs_queue.empty():
        list = subs_queue.get_nowait()
        print("List: " + str(list))
        kpi_list = KpiList()
        for item in list:
            kpi = Kpi()
            kpi.kpi_id.kpi_id.uuid = item[0]
            kpi.timestamp.timestamp = Converters.timestamp_string_to_float(item[1])
            kpi.kpi_value.floatVal = item[2]
            kpi_list.kpi.append(kpi)
            print("Kpi List: " + str(kpi_list))


if __name__ == '__main__':
    main()