Skip to content
Snippets Groups Projects
Commit c6e8a259 authored by Javier Moreno's avatar Javier Moreno
Browse files

Merge from develop (WIP)

parent 1ca95b17
No related branches found
No related tags found
2 merge requests!5Feat/monitoring subscriptions,!4Compute component:
......@@ -11,8 +11,11 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from monitoring.service.MonitoringServiceServicerImpl import LOGGER
from common.tools.timestamp.Converters import timestamp_utcnow_to_float
import sqlite3 as sl
import os
ROOT_DIR = os.path.abspath(os.curdir)
class ManagementDB():
def __init__(self, database):
......@@ -121,4 +124,12 @@ class ManagementDB():
#print("\n")
#for row in data:
# print(row)
return data.fetchall()
\ No newline at end of file
return data.fetchall()
def main():
managementdb = ManagementDB("monitoring.db")
subs_id = managementdb.insert_subscription(2,"localhost",20,2,timestamp_utcnow_to_float(),timestamp_utcnow_to_float() + 10)
print("subs id: ", str(subs_id))
if __name__ == '__main__':
main()
......@@ -76,10 +76,19 @@ class MetricsDB():
self.run_query(query)
LOGGER.info(f"Table {self.table} created")
def get_subscription_data(self, subs_queue, kpi_id, end_date, sampling_interval_s):
start_date = end_date-sampling_interval_s
query = f"SELECT kpi_id, timestamp, kpi_value FROM {self.table} WHERE kpi_id = '{kpi_id}' AND (timestamp BETWEEN '{timestamp_float_to_string(start_date)}' AND '{timestamp_float_to_string(end_date)}')"
def get_subscription_data(self, subs_queue, kpi_id, end_date, sampling_interval_s, callback_return):
str_end_date = str(end_date.isoformat()) + 'Z'
print("str_end_date: " + str_end_date)
start_date = end_date - datetime.timedelta(seconds=sampling_interval_s)
str_start_date = str(start_date.isoformat()) + 'Z'
print("str_start_date: " + str(str_start_date))
# query = f"SELECT kpi_id, timestamp, kpi_value FROM {self.table} WHERE kpi_id = '{kpi_id}' AND (timestamp BETWEEN '2022-09-28T07:21:26.595586Z' AND '2022-09-28T07:32:34.197792Z')"
query = f"SELECT kpi_id, timestamp, kpi_value FROM {self.table} WHERE kpi_id = '{kpi_id}' AND (timestamp BETWEEN '{str_start_date}' AND '{str_end_date}')"
response=self.run_query(query)
kpi_list=response['dataset']
# subs_queue.append(kpi_list)
# print(kpi_list)
subs_queue.put_nowait(kpi_list)
# return kpi_list
if callback_return:
callback_return(kpi_list)
import os
from queue import Queue
from random import random
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ProcessPoolExecutor
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.triggers.cron import CronTrigger
from common.tools.timestamp.Converters import timestamp_utcnow_to_float
from datetime import datetime, timezone
from common.proto.monitoring_pb2 import Kpi, KpiList
from common.tools.timestamp import Converters
from common.tools.timestamp.Converters import timestamp_utcnow_to_float, timestamp_float_to_string
from datetime import datetime
import time
from monitoring.service import MetricsDBTools
from monitoring.service.ManagementDBTools import ManagementDB
class SubscriptionManager():
def __init__(self, metrics_db):
......@@ -17,15 +27,82 @@ class SubscriptionManager():
def create_subscription(self, subs_queue ,subscription_id, kpi_id, sampling_interval_s, sampling_duration_s=None, start_timestamp=None, end_timestamp=None):
start_date=None
end_date=None
print("Inside create subscription")
if sampling_duration_s:
if not start_timestamp:
start_timestamp=time.time()
end_timestamp=start_timestamp+sampling_duration_s
print("end_timestamp: " + timestamp_float_to_string(end_timestamp))
if start_timestamp:
start_date = datetime.fromtimestamp(start_timestamp)
start_date = datetime.utcfromtimestamp(start_timestamp)
print("start_date: " + str(start_date))
if end_timestamp:
end_date = datetime.fromtimestamp(end_timestamp)
self.scheduler.add_job(self.metrics_db.get_subscription_data, args=(subs_queue, kpi_id, timestamp_utcnow_to_float(), sampling_interval_s),trigger='interval', seconds=sampling_interval_s, start_date=start_date, end_date=end_date, id=subscription_id)
end_date = datetime.utcfromtimestamp(end_timestamp)
print("end_date: " + str(end_date))
self.scheduler.add_job(self.metrics_db.get_subscription_data, args=(subs_queue, kpi_id, start_timestamp, sampling_interval_s),trigger='interval', seconds=sampling_interval_s, start_date=start_date, end_date=end_date, id=subscription_id)
self.metrics_db.get_subscription_data(subs_queue,kpi_id,end_date,sampling_interval_s)
def delete_subscription(self, subscription_id):
self.scheduler.remove_job(subscription_id)
\ No newline at end of file
self.scheduler.remove_job(subscription_id)
def main():
subs_queue = Queue()
managementdb = ManagementDB("monitoring.db")
metrics_db = MetricsDBTools.MetricsDB("localhost", "9009", "9000", "monitoring")
subs_manager = SubscriptionManager(metrics_db)
print("Here")
kpi_id = "2"
sampling_duration_s = 10
sampling_interval_s = 2
start_timestamp = timestamp_utcnow_to_float()
end_timestamp = timestamp_utcnow_to_float() + 10
print("Before loop")
print("start_timestamp: " + timestamp_float_to_string(start_timestamp))
print("end_timestamp: " + timestamp_float_to_string(end_timestamp))
for i in range(10):
timestamp = timestamp_utcnow_to_float()
kpitype = "KPISAMPLETYPE_PACKETS_TRANSMITTED"
deviceId = "DEV01"
endpointId = "END01"
serviceId = "SERV01"
kpi_value = 50*random()
metrics_db.write_KPI(timestamp,kpi_id,kpitype,deviceId,endpointId,serviceId,kpi_value)
time.sleep(1)
print("After loop")
subs_id = managementdb.insert_subscription(kpi_id, "localhost", sampling_duration_s, sampling_interval_s, start_timestamp,
end_timestamp)
subs_manager.create_subscription(subs_queue,str(subs_id),kpi_id,sampling_interval_s,sampling_duration_s,start_timestamp,end_timestamp)
print("Queue empty: " + str(subs_queue.empty()))
print("Queue size: " + str(subs_queue.qsize()))
while not subs_queue.empty():
list = subs_queue.get_nowait()
print("List: " + str(list))
kpi_list = KpiList()
for item in list:
kpi = Kpi()
kpi.kpi_id.kpi_id.uuid = item[0]
kpi.timestamp.timestamp = Converters.timestamp_string_to_float(item[1])
kpi.kpi_value.floatVal = item[2]
kpi_list.kpi.append(kpi)
print("Kpi List: " + str(kpi_list))
if __name__ == '__main__':
main()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment