import os from queue import Queue from random import random from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.executors.pool import ProcessPoolExecutor from apscheduler.triggers.interval import IntervalTrigger from apscheduler.triggers.cron import CronTrigger from common.proto.monitoring_pb2 import Kpi, KpiList from common.tools.timestamp import Converters from common.tools.timestamp.Converters import timestamp_utcnow_to_float, timestamp_float_to_string from datetime import datetime import time from monitoring.service import MetricsDBTools from monitoring.service.ManagementDBTools import ManagementDB class SubscriptionManager(): def __init__(self, metrics_db): self.metrics_db = metrics_db self.scheduler = BackgroundScheduler(executors={'processpool': ProcessPoolExecutor(max_workers=20)}) self.scheduler.start() def create_subscription(self, subs_queue ,subscription_id, kpi_id, sampling_interval_s, sampling_duration_s=None, start_timestamp=None, end_timestamp=None): start_date=None end_date=None print("Inside create subscription") if sampling_duration_s: if not start_timestamp:# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import logging from queue import Queue from random import random import pytz from apscheduler.schedulers.background import BackgroundScheduler from common.proto.monitoring_pb2 import Kpi, KpiList from common.tools.timestamp import Converters from common.tools.timestamp.Converters import timestamp_utcnow_to_float, timestamp_float_to_string from datetime import datetime import time from monitoring.service import MetricsDBTools from monitoring.service.ManagementDBTools import ManagementDB from monitoring.service.MetricsDBTools import MetricsDB LOGGER = logging.getLogger(__name__) class SubscriptionManager(): def __init__(self, metrics_db): self.metrics_db = metrics_db self.scheduler = BackgroundScheduler(executors={'processpool': ProcessPoolExecutor(max_workers=20)}) self.scheduler.start() def create_subscription(self,subs_queue, subscription_id, kpi_id, sampling_interval_s, sampling_duration_s=None, start_timestamp=None, end_timestamp=None): start_date = None end_date = None if sampling_duration_s: if not start_timestamp: start_timestamp = time.time() end_timestamp = start_timestamp + sampling_duration_s if start_timestamp: start_date = datetime.utcfromtimestamp(start_timestamp).isoformat() if end_timestamp: end_date = datetime.utcfromtimestamp(end_timestamp).isoformat() LOGGER.debug(f"kpi_id: {kpi_id}") LOGGER.debug(f"sampling_interval_s: {sampling_interval_s}") LOGGER.debug(f"subscription_id: {subscription_id}") LOGGER.debug(f"start_date: {start_date}") self.scheduler.add_job(self.metrics_db.get_subscription_data, args=(subs_queue,kpi_id, sampling_interval_s), trigger='interval', seconds=sampling_interval_s, start_date=start_date, end_date=end_date, timezone=pytz.utc, id=str(subscription_id)) LOGGER.debug(f"Subscrition job {subscription_id} succesfully created") def delete_subscription(self, subscription_id): self.scheduler.remove_job(subscription_id) def ingest_data(): metrics_db = MetricsDB("localhost", "9009", "9000", "monitoring") kpi_id = "2" for i in range(200): timestamp = timestamp_utcnow_to_float() kpitype = "KPISAMPLETYPE_PACKETS_TRANSMITTED" deviceId = "DEV01" endpointId = "END01" serviceId = "SERV01" kpi_value = 50 * random() metrics_db.write_KPI(timestamp, kpi_id, kpitype, deviceId, endpointId, serviceId, kpi_value) time.sleep(0.01) def scheduler_test(): scheduler = BackgroundScheduler() subs_queue = Queue() managementdb = ManagementDB("monitoring.db") metrics_db = MetricsDB("localhost", "9009", "9000", "monitoring") subs_manager = SubscriptionManager(metrics_db) scheduler.start() scheduler.add_job(ingest_data) kpi_id = "2" sampling_duration_s = 20 sampling_interval_s = 3 real_start_time = timestamp_utcnow_to_float() start_timestamp = real_start_time end_timestamp = start_timestamp + sampling_duration_s subs_id = managementdb.insert_subscription(kpi_id, "localhost", sampling_duration_s, sampling_interval_s,start_timestamp,end_timestamp) subs_manager.create_subscription(subs_queue,subs_id,kpi_id,sampling_interval_s, sampling_duration_s,start_timestamp,end_timestamp) try: # This is here to simulate application activity (which keeps the main thread alive). i = 0 while True: # time.sleep(sampling_interval_s) while not subs_queue.empty(): list = subs_queue.get_nowait() # print(f"List: {list}") kpi_list = KpiList() for item in list: kpi = Kpi() kpi.kpi_id.kpi_id.uuid = item[0] kpi.timestamp.timestamp = Converters.timestamp_string_to_float(item[1]) kpi.kpi_value.floatVal = item[2] kpi_list.kpi.append(kpi) i += 1 print("Kpi List: " + str(kpi_list) + " " + str(i)) if timestamp_utcnow_to_float() > end_timestamp: print(f"Total metrics: {i}") break except (KeyboardInterrupt, SystemExit): # Not strictly necessary if daemonic mode is enabled but should be done if possible scheduler.shutdown() if __name__ == '__main__': scheduler_test() start_timestamp=time.time() end_timestamp=start_timestamp+sampling_duration_s print("end_timestamp: " + timestamp_float_to_string(end_timestamp)) if start_timestamp: start_date = datetime.utcfromtimestamp(start_timestamp) print("start_date: " + str(start_date)) if end_timestamp: end_date = datetime.utcfromtimestamp(end_timestamp) print("end_date: " + str(end_date)) self.scheduler.add_job(self.metrics_db.get_subscription_data, args=(subs_queue, kpi_id, start_timestamp, sampling_interval_s),trigger='interval', seconds=sampling_interval_s, start_date=start_date, end_date=end_date, id=subscription_id) self.metrics_db.get_subscription_data(subs_queue,kpi_id,end_date,sampling_interval_s) def delete_subscription(self, subscription_id): self.scheduler.remove_job(subscription_id) def main(): subs_queue = Queue() managementdb = ManagementDB("monitoring.db") metrics_db = MetricsDBTools.MetricsDB("localhost", "9009", "9000", "monitoring") subs_manager = SubscriptionManager(metrics_db) print("Here") kpi_id = "2" sampling_duration_s = 10 sampling_interval_s = 2 start_timestamp = timestamp_utcnow_to_float() end_timestamp = timestamp_utcnow_to_float() + 10 print("Before loop") print("start_timestamp: " + timestamp_float_to_string(start_timestamp)) print("end_timestamp: " + timestamp_float_to_string(end_timestamp)) for i in range(10): timestamp = timestamp_utcnow_to_float() kpitype = "KPISAMPLETYPE_PACKETS_TRANSMITTED" deviceId = "DEV01" endpointId = "END01" serviceId = "SERV01" kpi_value = 50*random() metrics_db.write_KPI(timestamp,kpi_id,kpitype,deviceId,endpointId,serviceId,kpi_value) time.sleep(1) print("After loop") subs_id = managementdb.insert_subscription(kpi_id, "localhost", sampling_duration_s, sampling_interval_s, start_timestamp, end_timestamp) subs_manager.create_subscription(subs_queue,str(subs_id),kpi_id,sampling_interval_s,sampling_duration_s,start_timestamp,end_timestamp) print("Queue empty: " + str(subs_queue.empty())) print("Queue size: " + str(subs_queue.qsize())) while not subs_queue.empty(): list = subs_queue.get_nowait() print("List: " + str(list)) kpi_list = KpiList() for item in list: kpi = Kpi() kpi.kpi_id.kpi_id.uuid = item[0] kpi.timestamp.timestamp = Converters.timestamp_string_to_float(item[1]) kpi.kpi_value.floatVal = item[2] kpi_list.kpi.append(kpi) print("Kpi List: " + str(kpi_list)) if __name__ == '__main__': main()