Commit d3b14a1d authored by Francisco-Javier Moreno-Muro's avatar Francisco-Javier Moreno-Muro
Browse files

Issues with subscription and alarm managers solved

parent 9a69ab50
Loading
Loading
Loading
Loading
+9 −3
Original line number Diff line number Diff line
import pytz
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ProcessPoolExecutor
from apscheduler.jobstores.base import JobLookupError
@@ -19,9 +20,14 @@ class AlarmManager():
        end_date=None
        if subscription_timeout_s:
            start_timestamp=time.time()
            start_date=datetime.fromtimestamp(start_timestamp)
            end_date=datetime.fromtimestamp(start_timestamp+subscription_timeout_s)
        self.scheduler.add_job(self.metrics_db.get_alarm_data, args=(alarm_queue,kpi_id, kpiMinValue, kpiMaxValue, inRange, includeMinValue, includeMaxValue, subscription_frequency_ms),trigger='interval', seconds=(subscription_frequency_ms/1000), start_date=start_date, end_date=end_date, id=alarm_id)
            end_timestamp = start_timestamp + subscription_timeout_s
            start_date = datetime.utcfromtimestamp(start_timestamp).isoformat()
            end_date = datetime.utcfromtimestamp(end_timestamp).isoformat()

        self.scheduler.add_job(self.metrics_db.get_alarm_data,
                               args=(alarm_queue,kpi_id, kpiMinValue, kpiMaxValue, inRange, includeMinValue, includeMaxValue, subscription_frequency_ms),
                               trigger='interval', seconds=(subscription_frequency_ms/1000), start_date=start_date,
                               end_date=end_date,timezone=pytz.utc, id=str(alarm_id))
        LOGGER.debug(f"Alarm job {alarm_id} succesfully created")

    def delete_alarm(self, alarm_id):
+7 −5
Original line number Diff line number Diff line
@@ -205,6 +205,8 @@ class MetricsDB():
                kpi_list = self.run_query(query)
            if kpi_list:
                LOGGER.debug(f"New data received for alarm of KPI {kpi_id}")
                LOGGER.info(kpi_list)
                valid_kpi_list = []
                for kpi in kpi_list:
                    alarm = False
                    kpi_value = kpi[2]
@@ -267,8 +269,8 @@ class MetricsDB():
                        if (kpi_value >= kpiMaxValue):
                            alarm = True
                    if alarm:
                        # queue.append[kpi]
                        alarm_queue.put_nowait(kpi)
                        valid_kpi_list.append(kpi)
                alarm_queue.put_nowait(valid_kpi_list)
                LOGGER.debug(f"Alarm of KPI {kpi_id} triggered -> kpi_value:{kpi[2]}, timestamp:{kpi[1]}")
            else:
                LOGGER.debug(f"No new data for the alarm of KPI {kpi_id}")
+23 −19
Original line number Diff line number Diff line
@@ -259,9 +259,7 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer):

        LOGGER.info('SubscribeKpi')
        try:

            subs_queue = Queue()
            subs_response = SubsResponse()

            kpi_id = request.kpi_id.kpi_id.uuid
            sampling_duration_s = request.sampling_duration_s
@@ -279,6 +277,7 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer):
            # parse queue to append kpis into the list
            while True:
                while not subs_queue.empty():
                    subs_response = SubsResponse()
                    list = subs_queue.get_nowait()
                    for item in list:
                        kpi = Kpi()
@@ -286,12 +285,11 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer):
                        kpi.timestamp.timestamp = timestamp_string_to_float(item[1])
                        kpi.kpi_value.floatVal = item[2]  # This must be improved
                        subs_response.kpi_list.kpi.append(kpi)
                if timestamp_utcnow_to_float() > end_timestamp:
                    break

                    subs_response.subs_id.subs_id.uuid = str(subs_id)

                    yield subs_response
                if timestamp_utcnow_to_float() > end_timestamp:
                    break
            # yield subs_response
        except ServiceException as e:
            LOGGER.exception('SubscribeKpi exception')
            grpc_context.abort(e.code, e.details)
@@ -468,7 +466,7 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer):
        try:
            alarm_id = request.alarm_id.alarm_id.uuid
            alarm_data = self.management_db.get_alarm(alarm_id)
            alarm_response = AlarmResponse()
            real_start_time = timestamp_utcnow_to_float()

            if alarm_data:
                LOGGER.debug(f"{alarm_data}")
@@ -484,24 +482,30 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer):
                subscription_frequency_ms = request.subscription_frequency_ms
                subscription_timeout_s = request.subscription_timeout_s

                end_timestamp = real_start_time + subscription_timeout_s

                self.alarm_manager.create_alarm(alarm_queue, alarm_id, kpi_id, kpiMinValue, kpiMaxValue, inRange,
                                                includeMinValue, includeMaxValue, subscription_frequency_ms,
                                                subscription_timeout_s)

                while True:
                    while not alarm_queue.empty():
                        alarm_response = AlarmResponse()
                        list = alarm_queue.get_nowait()
                        size = len(list)
                        for item in list:
                            kpi = Kpi()
                            kpi.kpi_id.kpi_id.uuid = str(item[0])
                            kpi.timestamp.timestamp = timestamp_string_to_float(item[1])
                            kpi.kpi_value.floatVal = item[2]  # This must be improved
                            alarm_response.kpi_list.kpi.append(kpi)

                        alarm_response.alarm_id.alarm_id.uuid = alarm_id

                        yield alarm_response
                    if timestamp_utcnow_to_float() > end_timestamp:
                        break
            else:
                LOGGER.info('GetAlarmResponseStream error: AlarmID({:s}): not found in database'.format(str(alarm_id)))
                alarm_response = AlarmResponse()
                alarm_response.alarm_id.alarm_id.uuid = "NoID"
                return alarm_response
        except ServiceException as e:
+2 −5
Original line number Diff line number Diff line
@@ -42,10 +42,6 @@ class SubscriptionManager():
        if end_timestamp:
            end_date = datetime.utcfromtimestamp(end_timestamp).isoformat()

        LOGGER.debug(f"kpi_id: {kpi_id}")
        LOGGER.debug(f"sampling_interval_s: {sampling_interval_s}")
        LOGGER.debug(f"subscription_id: {subscription_id}")
        LOGGER.debug(f"start_date: {start_date}")
        self.scheduler.add_job(self.metrics_db.get_subscription_data, args=(subs_queue,kpi_id, sampling_interval_s),
                               trigger='interval', seconds=sampling_interval_s, start_date=start_date,
                               end_date=end_date, timezone=pytz.utc, id=str(subscription_id))
@@ -53,3 +49,4 @@ class SubscriptionManager():

    def delete_subscription(self, subscription_id):
        self.scheduler.remove_job(subscription_id)
        LOGGER.debug(f"Subscription job {subscription_id} succesfully deleted")
 No newline at end of file
+8 −3
Original line number Diff line number Diff line
@@ -125,11 +125,16 @@ def alarm_descriptor_b():
    return _alarm_descriptor

def alarm_subscription(alarm_id):
    _alarm_descriptor = monitoring_pb2.AlarmSubscription()
    _alarm_subscription = monitoring_pb2.AlarmSubscription()

    _alarm_descriptor.alarm_id.alarm_id.uuid = str(alarm_id.alarm_id.uuid)
    subscription_timeout_s = 20
    subscription_frequency_ms = 3000

    return _alarm_descriptor
    _alarm_subscription.alarm_id.alarm_id.uuid      = str(alarm_id.alarm_id.uuid)
    _alarm_subscription.subscription_timeout_s      = subscription_timeout_s
    _alarm_subscription.subscription_frequency_ms   = subscription_frequency_ms

    return _alarm_subscription


def alarm_id():
Loading