SubscriptionManager.py

# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
from queue import Queue
from random import random

import pytz
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ProcessPoolExecutor
from apscheduler.jobstores.base import JobLookupError

from common.proto.monitoring_pb2 import Kpi, KpiList
from common.tools.timestamp import Converters
from common.tools.timestamp.Converters import timestamp_utcnow_to_float, timestamp_float_to_string
from datetime import datetime
import time

from monitoring.service import MetricsDBTools
from monitoring.service.ManagementDBTools import ManagementDB
from monitoring.service.MetricsDBTools import MetricsDB

LOGGER = logging.getLogger(__name__)

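# SubscriptionManager wraps an APScheduler BackgroundScheduler: each subscription is
# registered as a recurring job that calls metrics_db.get_subscription_data() at the
# requested sampling interval and pushes the retrieved KPI rows onto the subscriber queue.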
class SubscriptionManager():
    def __init__(self, metrics_db):
        self.metrics_db = metrics_db
        self.scheduler = BackgroundScheduler(executors={'processpool': ProcessPoolExecutor(max_workers=20)})
        self.scheduler.start()
        LOGGER.info("Subscription Manager Initialized")

    def create_subscription(self, subs_queue, subscription_id, kpi_id, sampling_interval_s,
                            sampling_duration_s=None, start_timestamp=None, end_timestamp=None):
        start_date = None
        end_date = None
        if sampling_duration_s:
            if not start_timestamp:
                start_timestamp = time.time()
            end_timestamp = start_timestamp + sampling_duration_s
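        # APScheduler accepts ISO-8601 strings for start_date/end_date; convert the epoch
        # timestamps to UTC so the sampling window is unambiguous.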
        if start_timestamp:
            start_date = datetime.utcfromtimestamp(start_timestamp).isoformat()
        if end_timestamp:
            end_date = datetime.utcfromtimestamp(end_timestamp).isoformat()

        LOGGER.debug(f"kpi_id: {kpi_id}")
        LOGGER.debug(f"sampling_interval_s: {sampling_interval_s}")
        LOGGER.debug(f"subscription_id: {subscription_id}")
        LOGGER.debug(f"start_date: {start_date}")
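        # Schedule a recurring job keyed by the subscription id; it fires every
        # sampling_interval_s seconds within the optional [start_date, end_date] window.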
        self.scheduler.add_job(self.metrics_db.get_subscription_data, args=(subs_queue, kpi_id, sampling_interval_s),
                               trigger='interval', seconds=sampling_interval_s, start_date=start_date,
                               end_date=end_date, timezone=pytz.utc, id=str(subscription_id))
        LOGGER.debug(f"Subscription job {subscription_id} successfully created")

    def delete_subscription(self, subscription_id):
        try:
            self.scheduler.remove_job(subscription_id)
            LOGGER.debug(f"Subscription job {subscription_id} successfully deleted")
        except JobLookupError:
            LOGGER.debug(f"Subscription job {subscription_id} does not exist")


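# Standalone test harness: ingest_data() writes synthetic KPI samples into a local MetricsDB
# and scheduler_test() drives a full subscribe/collect cycle using that data.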
def ingest_data():

    metrics_db = MetricsDB("localhost", "9009", "9000", "monitoring")

    kpi_id = "2"

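    # Write 200 synthetic PACKETS_TRANSMITTED samples (one roughly every 10 ms) for KPI "2".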
    for i in range(200):
        timestamp = timestamp_utcnow_to_float()
        kpitype = "KPISAMPLETYPE_PACKETS_TRANSMITTED"
        deviceId = "DEV01"
        endpointId = "END01"
        serviceId = "SERV01"
        kpi_value = 50 * random()
        metrics_db.write_KPI(timestamp, kpi_id, kpitype, deviceId, endpointId, serviceId, kpi_value)
        time.sleep(0.01)

def scheduler_test():
    scheduler = BackgroundScheduler()

    subs_queue = Queue()

    managementdb = ManagementDB("monitoring.db")
    metrics_db = MetricsDB("localhost", "9009", "9000", "monitoring")
    subs_manager = SubscriptionManager(metrics_db)

    scheduler.start()
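    # Run ingest_data() once in the background so the subscription has fresh samples to pick up.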
    scheduler.add_job(ingest_data)

    kpi_id = "2"
    sampling_duration_s = 20
    sampling_interval_s = 3
    real_start_time     = timestamp_utcnow_to_float()
    start_timestamp     = real_start_time
    end_timestamp       = start_timestamp + sampling_duration_s

    subs_id = managementdb.insert_subscription(kpi_id, "localhost", sampling_duration_s,
                                               sampling_interval_s, start_timestamp, end_timestamp)
    subs_manager.create_subscription(subs_queue, subs_id, kpi_id, sampling_interval_s,
                                     sampling_duration_s, start_timestamp, end_timestamp)

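    # Drain the subscription queue as the scheduler fills it, converting each row into a
    # protobuf Kpi, until the sampling window has elapsed.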
    try:
        # This is here to simulate application activity (which keeps the main thread alive).
        i = 0
        while True:
            # time.sleep(sampling_interval_s)
            while not subs_queue.empty():
                kpi_rows = subs_queue.get_nowait()
                # print(f"List: {kpi_rows}")
                kpi_list = KpiList()
                for item in kpi_rows:
                    kpi = Kpi()
                    kpi.kpi_id.kpi_id.uuid = item[0]
                    kpi.timestamp.timestamp = Converters.timestamp_string_to_float(item[1])
                    kpi.kpi_value.floatVal = item[2]
                    kpi_list.kpi.append(kpi)
                    i += 1
                    print("Kpi List: " + str(kpi_list) + " " + str(i))
            if timestamp_utcnow_to_float() > end_timestamp:
                print(f"Total metrics: {i}")
                break

    except (KeyboardInterrupt, SystemExit):
        # Not strictly necessary if daemonic mode is enabled but should be done if possible
        scheduler.shutdown()


if __name__ == '__main__':
    scheduler_test()