Commit 102810ea authored by kesnar's avatar kesnar
Browse files

fix: various policy fixes

parent 9cfdce17
Loading
Loading
Loading
Loading
+59 −135
Original line number Diff line number Diff line
@@ -55,7 +55,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import org.jboss.logging.Logger;
@@ -70,6 +69,7 @@ public class PolicyServiceImpl implements PolicyService {
    private static final int ACCEPTABLE_NUMBER_OF_ALARMS = 3;
    private static final int MONITORING_WINDOW_IN_SECONDS = 5;
    private static final int SAMPLING_RATE_PER_SECOND = 1;
    // TODO: Find a better way to disregard alarms while reconfiguring path
    // Temporary solution for not calling the same rpc more than it's needed
    private static int noAlarms = 0;

@@ -157,7 +157,6 @@ public class PolicyServiceImpl implements PolicyService {

            return Uni.createFrom().item(policyRuleState);
        }
        LOGGER.infof("Passed 1st");

        final var policyRuleBasic = policyRuleService.getPolicyRuleBasic();
        if (!policyRuleBasic.areArgumentsValid()) {
@@ -167,14 +166,11 @@ public class PolicyServiceImpl implements PolicyService {
                            PolicyRuleStateEnum.POLICY_FAILED, policyRuleBasic.getExeceptionMessage());
            return Uni.createFrom().item(policyRuleState);
        }
        LOGGER.infof("Passed 2nd");

        final var serviceId = policyRuleService.getServiceId();
        final var deviceIds = policyRuleService.getDeviceIds();
        final var isServiceValid = policyRuleConditionValidator.isServiceIdValid(serviceId, deviceIds);

        LOGGER.infof("Passed 3rd %s", isServiceValid);

        return isServiceValid
                .onItem()
                .transform(
@@ -188,14 +184,10 @@ public class PolicyServiceImpl implements PolicyService {
                                return policyRuleState;
                            }

                            LOGGER.infof("Passed 4th %s", isService);

                            final var policyRuleTypeService = new PolicyRuleTypeService(policyRuleService);
                            final var policyRule = new PolicyRule(policyRuleTypeService);
                            final var alarmDescriptorList = createAlarmDescriptorList(policyRule);

                            LOGGER.infof("Passed 5th %s", alarmDescriptorList);

                            if (alarmDescriptorList.isEmpty()) {
                                var policyRuleState =
                                        new PolicyRuleState(
@@ -205,9 +197,9 @@ public class PolicyServiceImpl implements PolicyService {
                                                        policyRuleBasic.getPolicyRuleId()));
                                return policyRuleState;
                            }
                            contextService.setPolicyRule(policyRule);

                            LOGGER.infof("Passed 6th");
                            contextService.setPolicyRule(policyRule).subscribe().with(x -> {});
                            setPolicyRuleServiceToContext(policyRuleService, VALIDATED_POLICYRULE_STATE);
                            noAlarms = 0;

                            // Create an alarmIds list that contains the promised ids returned from setKpiAlarm
                            List<Uni<String>> alarmIds = new ArrayList<Uni<String>>();
@@ -215,12 +207,8 @@ public class PolicyServiceImpl implements PolicyService {
                                LOGGER.infof("alarmDescriptor:");
                                LOGGER.infof(alarmDescriptor.toString());
                                alarmIds.add(monitoringService.setKpiAlarm(alarmDescriptor));
                                noAlarms = 0;
                            }

                            LOGGER.infof("Passed 7th");
                            LOGGER.infof("%s", alarmIds);

                            // Transform the alarmIds into promised alarms returned from the
                            // getAlarmResponseStream
                            List<Multi<AlarmResponse>> alarmResponseStreamList = new ArrayList<>();
@@ -232,21 +220,21 @@ public class PolicyServiceImpl implements PolicyService {
                                                        id -> {
                                                            alarmPolicyRuleServiceMap.put(id, policyRuleService);

                                                            var alarmSubscription = new AlarmSubscription(id, 60, 5000);
                                                            // TODO: Create infinite subscription
                                                            var alarmSubscription = new AlarmSubscription(id, 259200, 5000);
                                                            return monitoringService.getAlarmResponseStream(alarmSubscription);
                                                        }));
                            }

                            // Merge the promised alarms into one stream (Multi Object)
                            final var multi = Multi.createBy().merging().streams(alarmResponseStreamList);
                            setPolicyRuleServiceToContext(policyRuleService, PROVISIONED_POLICYRULE_STATE);

                            monitorAlarmResponseForService(multi);

                            // Check that the stream has ended and in that case resubscribe
                            // multi.onCompletion().invoke(() -> LOGGER.infof("************************ THE STREAM
                            // HAS ENDED *****************"));
                            // TODO: Resubscribe to the stream, if it has ended

                            LOGGER.infof("Passed 8th");
                            // TODO: Redesign evaluation of action
                            // evaluateAction(policyRule, alarmDescriptorList, multi);

                            return VALIDATED_POLICYRULE_STATE;
@@ -307,22 +295,43 @@ public class PolicyServiceImpl implements PolicyService {
                                return policyRuleState;
                            }

                            List<AlarmSubscription> alarmSubscriptionList = new ArrayList<>();
                            contextService.setPolicyRule(policyRule).subscribe().with(x -> {});
                            setPolicyRuleDeviceToContext(policyRuleDevice, VALIDATED_POLICYRULE_STATE);
                            noAlarms = 0;

                            List<Uni<String>> alarmIds = new ArrayList<Uni<String>>();
                            for (AlarmDescriptor alarmDescriptor : alarmDescriptorList) {
                                monitoringService
                                        .setKpiAlarm(alarmDescriptor)
                                        .subscribe()
                                        .with(
                                                alarmId -> {
                                                    alarmSubscriptionList.add(new AlarmSubscription(alarmId, 30, 4000));
                                                });
                                LOGGER.infof("alarmDescriptor:");
                                LOGGER.infof(alarmDescriptor.toString());
                                alarmIds.add(monitoringService.setKpiAlarm(alarmDescriptor));
                            }

                            //                          final var multi =
                            //                                  setAlarmResponseStream(
                            //                                          policyRule, alarmDescriptorList,
                            // alarmSubscriptionList, false);
                            //                          monitorAlarmResponseForDevice(multi);
                            // Transform the alarmIds into promised alarms returned from the
                            // getAlarmResponseStream
                            List<Multi<AlarmResponse>> alarmResponseStreamList = new ArrayList<>();
                            for (Uni<String> alarmId : alarmIds) {
                                alarmResponseStreamList.add(
                                        alarmId
                                                .onItem()
                                                .transformToMulti(
                                                        id -> {
                                                            alarmPolicyRuleDeviceMap.put(id, policyRuleDevice);

                                                            // TODO: Create infinite subscription
                                                            var alarmSubscription = new AlarmSubscription(id, 259200, 5000);
                                                            return monitoringService.getAlarmResponseStream(alarmSubscription);
                                                        }));
                            }

                            // Merge the promised alarms into one stream (Multi Object)
                            final var multi = Multi.createBy().merging().streams(alarmResponseStreamList);
                            setPolicyRuleDeviceToContext(policyRuleDevice, PROVISIONED_POLICYRULE_STATE);

                            monitorAlarmResponseForDevice(multi);

                            // TODO: Resubscribe to the stream, if it has ended

                            // TODO: Redesign evaluation of action
                            // evaluateAction(policyRule, alarmDescriptorList, multi);

                            return VALIDATED_POLICYRULE_STATE;
@@ -460,10 +469,6 @@ public class PolicyServiceImpl implements PolicyService {
            if (alarmDescriptorList.isEmpty()) {
                return List.of();
            }
            // Moved in subscription, as we need the returned value
            // for (AlarmDescriptor alarmDescriptor : alarmDescriptorList) {
            //    alarmPolicyRuleServiceMap.put(alarmDescriptor.getAlarmId(), policyRuleService);
            // }
        } else {
            final var policyRuleDevice = (PolicyRuleDevice) policyRuleTypeSpecificType;
            final var policyRuleBasic = policyRuleDevice.getPolicyRuleBasic();
@@ -480,48 +485,7 @@ public class PolicyServiceImpl implements PolicyService {
        return alarmDescriptorList;
    }

    //    Deprecated
    //    private Multi<AlarmResponse> setAlarmResponseStream(
    //            PolicyRule policyRule,
    //            List<Uni<String>> alarmIds,
    //            Boolean isService) {
    //
    //        LOGGER.infof("Just entered setAlarmResponseStream");
    //
    //        List<Multi<AlarmResponse>> alarmResponseStreamList = new ArrayList<>();
    //        for (Uni<String> alarmId : alarmIds) {
    //            alarmId
    //                .subscribe()
    //                .with(
    //                        id -> {
    //                            var alarmSubscription = new AlarmSubscription(id, 60, 500);
    //                            LOGGER.infof("Creating Alarm Subscription with id: %s", id);
    //
    // //alarmResponseStreamList.add(monitoringService.getAlarmResponseStream(alarmSubscription));
    //
    // monitoringService.getAlarmResponseStream(alarmSubscription).subscribe().with(x ->
    // {LOGGER.info(x);});
    //                            LOGGER.infof("Created Alarm Subscription with id: %s", id);
    //                        });
    //        }
    //
    // LOGGER.info("*****************************************************************************");
    //
    //        for (Multi<AlarmResponse> a: alarmResponseStreamList) {
    //            LOGGER.infof("Begin subscribing to the multi object");
    //            a
    //                .subscribe()
    //                .with(
    //                    b -> {
    //                        LOGGER.infof("b");
    //                    }
    //                );
    //
    //        return Multi.createBy().merging().streams(alarmResponseStreamList);
    //    }

    private void monitorAlarmResponseForService(Multi<AlarmResponse> multi) {
        LOGGER.infof("Just entered monitorAlarmResponseForService");
        multi
                .subscribe()
                .with(
@@ -530,58 +494,24 @@ public class PolicyServiceImpl implements PolicyService {
                            LOGGER.infof("alarmResponse:");
                            LOGGER.info(alarmResponse);
                            LOGGER.info(alarmResponse.getAlarmId());
                            if (alarmPolicyRuleServiceMap.containsKey(alarmResponse.getAlarmId())) {
                            applyActionService(alarmResponse.getAlarmId());
                            }
                        });
    }

    private void monitorAlarmResponseForDevice(Multi<AlarmResponse> multi) {
        multi
                .select()
                .first()
                .subscribe()
                .with(
                        alarmResponse -> {
                            LOGGER.infof("**************************Received Alarm!**************************");
                            LOGGER.infof("alarmResponse:");
                            LOGGER.info(alarmResponse);
                            if (!alarmPolicyRuleDeviceMap.containsKey(alarmResponse.getAlarmId())) {
                                return;
                            }
                            LOGGER.info(alarmResponse.getAlarmId());
                            applyActionDevice(alarmResponse.getAlarmId());
                        });
    }

    private void evaluateAction(
            PolicyRule policyRule,
            List<AlarmDescriptor> alarmDescriptorList,
            Multi<AlarmResponse> multi) {

        multi
                .collect()
                .with(Collectors.counting())
                .subscribe()
                .with(
                        count -> {
                            LOGGER.infof("Inside evaluateAction");
                            LOGGER.infof(count.toString(count));
                            if (count > ACCEPTABLE_NUMBER_OF_ALARMS) {
                                for (AlarmDescriptor alarmDescriptor : alarmDescriptorList) {
                                    monitoringService
                                            .deleteAlarm(alarmDescriptor.getAlarmId())
                                            .subscribe()
                                            .with(
                                                    emptyMessage ->
                                                            LOGGER.infof(
                                                                    "Alarm [%s] has been deleted as ineffective.\n",
                                                                    alarmDescriptor.getAlarmId()));
                                }
                                setPolicyRuleToContext(policyRule, INEFFECTIVE_POLICYRULE_STATE);
                            } else {
                                setPolicyRuleToContext(policyRule, EFFECTIVE_POLICYRULE_STATE);
                            }
                        });
    }

    // TODO: To be refactored or deprecated
    //    private void evaluateAction(
    //            PolicyRule policyRule,
    //            List<AlarmDescriptor> alarmDescriptorList,
@@ -728,17 +658,17 @@ public class PolicyServiceImpl implements PolicyService {
    private void callRecalculatePathRPC(
            PolicyRuleService policyRuleService, PolicyRuleAction policyRuleAction) {

        LOGGER.info("Inside callRecalculatePathRPC");

        final var deserializedServiceUni = contextService.getService(policyRuleService.getServiceId());

        deserializedServiceUni
                .subscribe()
                .with(
                        deserializedService -> {
                            serviceService.recomputeConnections(deserializedService)
                            serviceService
                                    .recomputeConnections(deserializedService)
                                    .subscribe()
                            .with( x -> {
                                    .with(
                                            x -> {
                                                LOGGER.info("called recomputeConnections with:");
                                                LOGGER.info(deserializedService);
                                                setPolicyRuleServiceToContext(policyRuleService, ENFORCED_POLICYRULE_STATE);
@@ -747,15 +677,9 @@ public class PolicyServiceImpl implements PolicyService {
    }

    private void applyActionService(String alarmId) {
        LOGGER.info("Inside applyActionService");
        PolicyRuleService policyRuleService = alarmPolicyRuleServiceMap.get(alarmId);
        LOGGER.info(policyRuleService);
        PolicyRuleAction policyRuleAction =
                policyRuleService.getPolicyRuleBasic().getPolicyRuleActions().get(0);
        LOGGER.info(policyRuleAction);

        PolicyRuleAction test = policyRuleActionMap.get(alarmId);
        LOGGER.info(test);

        if (noAlarms == 0) {
            noAlarms++;
@@ -772,7 +696,7 @@ public class PolicyServiceImpl implements PolicyService {
                    LOGGER.errorf(INVALID_MESSAGE, policyRuleAction.getPolicyRuleActionEnum());
                    return;
            }
        } else if (noAlarms == 5) {
        } else if (noAlarms == 2) {
            noAlarms = 0;
        } else {
            noAlarms++;
+0 −5
Original line number Diff line number Diff line
@@ -1557,8 +1557,6 @@ public class Serializer {
    public Monitoring.KpiValueRange serialize(KpiValueRange kpiValueRange) {
        final var builder = Monitoring.KpiValueRange.newBuilder();

        LOGGER.infof("inside KpiValueRange %s", kpiValueRange);

        final var kpiValueMin = kpiValueRange.getKpiMinValue();
        final var kpiValueMax = kpiValueRange.getKpiMaxValue();

@@ -1614,7 +1612,6 @@ public class Serializer {
    }

    public Monitoring.AlarmDescriptor serialize(AlarmDescriptor alarmDescriptor) {
        LOGGER.infof("inside serialize %s", alarmDescriptor);
        final var builder = Monitoring.AlarmDescriptor.newBuilder();

        // final var alarmId = alarmDescriptor.getAlarmId();
@@ -1636,8 +1633,6 @@ public class Serializer {
        builder.setKpiValueRange(serializedKpiValueRange);
        // builder.setTimestamp(serializedTimestamp);

        LOGGER.infof("just before leaving serialize");

        return builder.build();
    }

+0 −1
Original line number Diff line number Diff line
@@ -97,7 +97,6 @@ public class ContextGatewayImpl implements ContextGateway {
    public Uni<String> setPolicyRule(PolicyRule policyRule) {
        // return Uni.createFrom().item("571eabc1-0f59-48da-b608-c45876c3fa8a");
        final var serializedPolicyRuleBasic = serializer.serialize(policyRule);
        LOGGER.infof("Inside setPolicyRule");

        var ret =
                streamingDelegateContextPolicy
+0 −6
Original line number Diff line number Diff line
@@ -102,9 +102,6 @@ public class MonitoringGatewayImpl implements MonitoringGateway {
    public Uni<String> setKpiAlarm(AlarmDescriptor alarmDescriptor) {
        final var serializedAlarmDescriptor = serializer.serialize(alarmDescriptor);

        LOGGER.infof("inside setKpiAlarm");
        LOGGER.infof("AlarmDescriptor = %s", serializedAlarmDescriptor);

        return streamingDelegateMonitoring
                .setKpiAlarm(serializedAlarmDescriptor)
                .onItem()
@@ -125,9 +122,6 @@ public class MonitoringGatewayImpl implements MonitoringGateway {
    public Multi<AlarmResponse> getAlarmResponseStream(AlarmSubscription alarmSubscription) {
        final var serializedAlarmSubscription = serializer.serialize(alarmSubscription);

        LOGGER.infof("inside getAlarmResponseStream");
        LOGGER.infof("AlarmSubscription = %s", serializedAlarmSubscription);

        return streamingDelegateMonitoring
                .getAlarmResponseStream(serializedAlarmSubscription)
                .onItem()
+1 −0
Original line number Diff line number Diff line
@@ -22,6 +22,7 @@ public class AlarmSubscription {
    private final float subscriptionTimeoutS;
    private final float subscriptionFrequencyMs;

    // TODO: Refactor the AlarmSubscription constructor to allow infinite subscriptionTimeoutS
    public AlarmSubscription(
            String alarmId, float subscriptionTimeoutS, float subscriptionFrequencyMs) {
        this.alarmId = alarmId;
Loading