diff --git a/src/monitoring/service/AlarmManager.py b/src/monitoring/service/AlarmManager.py index e5ac8915c3728c7894dc70ab901215dd5a7feb41..27b169b164f0e2ca6534caf72bb0cf1ad71df8dd 100644 --- a/src/monitoring/service/AlarmManager.py +++ b/src/monitoring/service/AlarmManager.py @@ -1,3 +1,4 @@ +import pytz from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.executors.pool import ProcessPoolExecutor from apscheduler.jobstores.base import JobLookupError @@ -19,9 +20,14 @@ class AlarmManager(): end_date=None if subscription_timeout_s: start_timestamp=time.time() - start_date=datetime.fromtimestamp(start_timestamp) - end_date=datetime.fromtimestamp(start_timestamp+subscription_timeout_s) - self.scheduler.add_job(self.metrics_db.get_alarm_data, args=(alarm_queue,kpi_id, kpiMinValue, kpiMaxValue, inRange, includeMinValue, includeMaxValue, subscription_frequency_ms),trigger='interval', seconds=(subscription_frequency_ms/1000), start_date=start_date, end_date=end_date, id=alarm_id) + end_timestamp = start_timestamp + subscription_timeout_s + start_date = datetime.utcfromtimestamp(start_timestamp).isoformat() + end_date = datetime.utcfromtimestamp(end_timestamp).isoformat() + + self.scheduler.add_job(self.metrics_db.get_alarm_data, + args=(alarm_queue,kpi_id, kpiMinValue, kpiMaxValue, inRange, includeMinValue, includeMaxValue, subscription_frequency_ms), + trigger='interval', seconds=(subscription_frequency_ms/1000), start_date=start_date, + end_date=end_date,timezone=pytz.utc, id=str(alarm_id)) LOGGER.debug(f"Alarm job {alarm_id} succesfully created") def delete_alarm(self, alarm_id): diff --git a/src/monitoring/service/MetricsDBTools.py b/src/monitoring/service/MetricsDBTools.py index 76ffc781536c7fa6efc4de7610db9f9c8b4d6833..0f41cfee1a5461a2d70a61ec23093496bd43e1c4 100644 --- a/src/monitoring/service/MetricsDBTools.py +++ b/src/monitoring/service/MetricsDBTools.py @@ -205,6 +205,8 @@ class MetricsDB(): kpi_list = self.run_query(query) if kpi_list: LOGGER.debug(f"New data received for alarm of KPI {kpi_id}") + LOGGER.info(kpi_list) + valid_kpi_list = [] for kpi in kpi_list: alarm = False kpi_value = kpi[2] @@ -267,10 +269,10 @@ class MetricsDB(): if (kpi_value >= kpiMaxValue): alarm = True if alarm: - # queue.append[kpi] - alarm_queue.put_nowait(kpi) - LOGGER.debug(f"Alarm of KPI {kpi_id} triggered -> kpi_value:{kpi[2]}, timestamp:{kpi[1]}") - else: - LOGGER.debug(f"No new data for the alarm of KPI {kpi_id}") + valid_kpi_list.append(kpi) + alarm_queue.put_nowait(valid_kpi_list) + LOGGER.debug(f"Alarm of KPI {kpi_id} triggered -> kpi_value:{kpi[2]}, timestamp:{kpi[1]}") + else: + LOGGER.debug(f"No new data for the alarm of KPI {kpi_id}") except (Exception) as e: LOGGER.debug(f"Alarm data cannot be retrieved. {e}") \ No newline at end of file diff --git a/src/monitoring/service/MonitoringServiceServicerImpl.py b/src/monitoring/service/MonitoringServiceServicerImpl.py index 927eedf5175594577d72b34820dacbe6647fd596..9f007ae8601e2c8977aec5fb93884bef8f82ccc7 100644 --- a/src/monitoring/service/MonitoringServiceServicerImpl.py +++ b/src/monitoring/service/MonitoringServiceServicerImpl.py @@ -259,9 +259,7 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): LOGGER.info('SubscribeKpi') try: - subs_queue = Queue() - subs_response = SubsResponse() kpi_id = request.kpi_id.kpi_id.uuid sampling_duration_s = request.sampling_duration_s @@ -279,6 +277,7 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): # parse queue to append kpis into the list while True: while not subs_queue.empty(): + subs_response = SubsResponse() list = subs_queue.get_nowait() for item in list: kpi = Kpi() @@ -286,12 +285,11 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): kpi.timestamp.timestamp = timestamp_string_to_float(item[1]) kpi.kpi_value.floatVal = item[2] # This must be improved subs_response.kpi_list.kpi.append(kpi) + subs_response.subs_id.subs_id.uuid = str(subs_id) + yield subs_response if timestamp_utcnow_to_float() > end_timestamp: break - - subs_response.subs_id.subs_id.uuid = str(subs_id) - - yield subs_response + # yield subs_response except ServiceException as e: LOGGER.exception('SubscribeKpi exception') grpc_context.abort(e.code, e.details) @@ -468,7 +466,7 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): try: alarm_id = request.alarm_id.alarm_id.uuid alarm_data = self.management_db.get_alarm(alarm_id) - alarm_response = AlarmResponse() + real_start_time = timestamp_utcnow_to_float() if alarm_data: LOGGER.debug(f"{alarm_data}") @@ -484,24 +482,30 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): subscription_frequency_ms = request.subscription_frequency_ms subscription_timeout_s = request.subscription_timeout_s + end_timestamp = real_start_time + subscription_timeout_s + self.alarm_manager.create_alarm(alarm_queue, alarm_id, kpi_id, kpiMinValue, kpiMaxValue, inRange, includeMinValue, includeMaxValue, subscription_frequency_ms, subscription_timeout_s) - while not alarm_queue.empty(): - list = alarm_queue.get_nowait() - for item in list: - kpi = Kpi() - kpi.kpi_id.kpi_id.uuid = str(item[0]) - kpi.timestamp.timestamp = timestamp_string_to_float(item[1]) - kpi.kpi_value.floatVal = item[2] # This must be improved - alarm_response.kpi_list.kpi.append(kpi) - - alarm_response.alarm_id.alarm_id.uuid = alarm_id - - yield alarm_response + while True: + while not alarm_queue.empty(): + alarm_response = AlarmResponse() + list = alarm_queue.get_nowait() + size = len(list) + for item in list: + kpi = Kpi() + kpi.kpi_id.kpi_id.uuid = str(item[0]) + kpi.timestamp.timestamp = timestamp_string_to_float(item[1]) + kpi.kpi_value.floatVal = item[2] # This must be improved + alarm_response.kpi_list.kpi.append(kpi) + alarm_response.alarm_id.alarm_id.uuid = alarm_id + yield alarm_response + if timestamp_utcnow_to_float() > end_timestamp: + break else: LOGGER.info('GetAlarmResponseStream error: AlarmID({:s}): not found in database'.format(str(alarm_id))) + alarm_response = AlarmResponse() alarm_response.alarm_id.alarm_id.uuid = "NoID" return alarm_response except ServiceException as e: diff --git a/src/monitoring/service/SubscriptionManager.py b/src/monitoring/service/SubscriptionManager.py index fe27d6ee365676b05175b762a106621121e3b897..f76cf8c394b26c552757acec3a6185af8cd69114 100644 --- a/src/monitoring/service/SubscriptionManager.py +++ b/src/monitoring/service/SubscriptionManager.py @@ -42,14 +42,11 @@ class SubscriptionManager(): if end_timestamp: end_date = datetime.utcfromtimestamp(end_timestamp).isoformat() - LOGGER.debug(f"kpi_id: {kpi_id}") - LOGGER.debug(f"sampling_interval_s: {sampling_interval_s}") - LOGGER.debug(f"subscription_id: {subscription_id}") - LOGGER.debug(f"start_date: {start_date}") self.scheduler.add_job(self.metrics_db.get_subscription_data, args=(subs_queue,kpi_id, sampling_interval_s), trigger='interval', seconds=sampling_interval_s, start_date=start_date, end_date=end_date, timezone=pytz.utc, id=str(subscription_id)) LOGGER.debug(f"Subscrition job {subscription_id} succesfully created") def delete_subscription(self, subscription_id): - self.scheduler.remove_job(subscription_id) \ No newline at end of file + self.scheduler.remove_job(subscription_id) + LOGGER.debug(f"Subscription job {subscription_id} succesfully deleted") \ No newline at end of file diff --git a/src/monitoring/tests/Messages.py b/src/monitoring/tests/Messages.py index 23e0867c1533b747f978ba087172fccffddc5300..a4c210b615dd54f747530febb21c24cd95ca13fc 100644 --- a/src/monitoring/tests/Messages.py +++ b/src/monitoring/tests/Messages.py @@ -125,11 +125,16 @@ def alarm_descriptor_b(): return _alarm_descriptor def alarm_subscription(alarm_id): - _alarm_descriptor = monitoring_pb2.AlarmSubscription() + _alarm_subscription = monitoring_pb2.AlarmSubscription() - _alarm_descriptor.alarm_id.alarm_id.uuid = str(alarm_id.alarm_id.uuid) + subscription_timeout_s = 20 + subscription_frequency_ms = 3000 - return _alarm_descriptor + _alarm_subscription.alarm_id.alarm_id.uuid = str(alarm_id.alarm_id.uuid) + _alarm_subscription.subscription_timeout_s = subscription_timeout_s + _alarm_subscription.subscription_frequency_ms = subscription_frequency_ms + + return _alarm_subscription def alarm_id(): diff --git a/src/monitoring/tests/test_unitary.py b/src/monitoring/tests/test_unitary.py index 33e9d3224baa80263af88f9434eac6de0955b5da..9eff2e5c9240bdc77fc3b31df3ce4b78cabd7bd1 100644 --- a/src/monitoring/tests/test_unitary.py +++ b/src/monitoring/tests/test_unitary.py @@ -197,7 +197,7 @@ def ingestion_data(): metrics_db.write_KPI(time_stamp, kpiId, kpiSampleType, deviceId, endpointId, serviceId, sliceId, connectionId, kpi_value) - sleep(0.05) + sleep(0.1) ########################### # Tests Implementation @@ -353,14 +353,15 @@ def test_get_alarm_descriptor(monitoring_client): assert isinstance(_response, AlarmDescriptor) # Test case that makes use of client fixture to test server's GetAlarmResponseStream method -def test_get_alarm_response_stream(monitoring_client): +def test_get_alarm_response_stream(monitoring_client,subs_scheduler): LOGGER.warning('test_get_alarm_descriptor') _kpi_id = monitoring_client.SetKpi(create_kpi_request_c()) _alarm_id = monitoring_client.SetKpiAlarm(alarm_descriptor(_kpi_id)) + subs_scheduler.add_job(ingestion_data) _response = monitoring_client.GetAlarmResponseStream(alarm_subscription(_alarm_id)) assert isinstance(_response, _MultiThreadedRendezvous) for item in _response: - LOGGER.debug(_response) + LOGGER.debug(item) assert isinstance(item,AlarmResponse) # Test case that makes use of client fixture to test server's DeleteAlarm method