Commit 7150a413 authored by Georgios Katsikas's avatar Georgios Katsikas
Browse files

Merge branch 'policy/refactor_monitoring_proto' into 'develop'

refactor(policy): add monitoring proto changes

See merge request !21
parents 790270c2 ad5de17a
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -201,7 +201,7 @@
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                    <compilerArgs>
                        <arg>-Xlint:deprecation</arg>
                        <arg>-Xlint:unchecked</arg>
                    </compilerArgs>
                </configuration>
            </plugin>
+139 −94
Original line number Diff line number Diff line
@@ -27,7 +27,6 @@ import eu.teraflow.policy.context.model.ConstraintTypeCustom;
import eu.teraflow.policy.context.model.ServiceConfig;
import eu.teraflow.policy.device.DeviceService;
import eu.teraflow.policy.model.BooleanOperator;
import eu.teraflow.policy.model.NumericalOperator;
import eu.teraflow.policy.model.PolicyRule;
import eu.teraflow.policy.model.PolicyRuleAction;
import eu.teraflow.policy.model.PolicyRuleActionConfig;
@@ -45,6 +44,7 @@ import eu.teraflow.policy.monitoring.model.AlarmDescriptor;
import eu.teraflow.policy.monitoring.model.AlarmResponse;
import eu.teraflow.policy.monitoring.model.AlarmSubscription;
import eu.teraflow.policy.monitoring.model.KpiValueRange;
import eu.teraflow.policy.monitoring.model.MonitorKpiRequest;
import eu.teraflow.policy.service.ServiceService;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
@@ -53,10 +53,8 @@ import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import javax.enterprise.context.ApplicationScoped;
@@ -71,6 +69,8 @@ public class PolicyServiceImpl implements PolicyService {
    private static final String VALID_MESSAGE = "%s is valid.";
    private static final int POLICY_EVALUATION_TIMEOUT = 5;
    private static final int ACCEPTABLE_NUMBER_OF_ALARMS = 3;
    private static final int MONITORING_WINDOW_IN_SECONDS = 5;
    private static final int SAMPLING_RATE_PER_SECOND = 1;

    private static final PolicyRuleState INSERTED_POLICYRULE_STATE =
            new PolicyRuleState(
@@ -148,8 +148,18 @@ public class PolicyServiceImpl implements PolicyService {
    public Uni<PolicyRuleState> addPolicyService(PolicyRuleService policyRuleService) {
        LOGGER.infof("Received %s", policyRuleService);

        if (!policyRuleService.areArgumentsValid()) {
            LOGGER.error(policyRuleService.getExeceptionMessage());
            final var policyRuleState =
                    new PolicyRuleState(
                            PolicyRuleStateEnum.POLICY_FAILED, policyRuleService.getExeceptionMessage());

            return Uni.createFrom().item(policyRuleState);
        }

        final var policyRuleBasic = policyRuleService.getPolicyRuleBasic();
        if (!policyRuleBasic.areArgumentsValid() || !policyRuleService.areArgumentsValid()) {
        if (!policyRuleBasic.areArgumentsValid()) {
            LOGGER.error(policyRuleService.getExeceptionMessage());
            setPolicyRuleServiceToContext(
                    policyRuleService,
                    new PolicyRuleState(
@@ -166,7 +176,6 @@ public class PolicyServiceImpl implements PolicyService {
                .setPolicyRule(policyRule)
                .subscribe()
                .with(id -> validateService(policyRuleService));

        return Uni.createFrom().item(policyRuleBasic.getPolicyRuleState());
    }

@@ -174,8 +183,18 @@ public class PolicyServiceImpl implements PolicyService {
    public Uni<PolicyRuleState> updatePolicyService(PolicyRuleService policyRuleService) {
        LOGGER.infof("Received %s", policyRuleService);

        if (!policyRuleService.areArgumentsValid()) {
            LOGGER.error(policyRuleService.getExeceptionMessage());
            final var policyRuleState =
                    new PolicyRuleState(
                            PolicyRuleStateEnum.POLICY_FAILED, policyRuleService.getExeceptionMessage());

            return Uni.createFrom().item(policyRuleState);
        }

        final var policyRuleBasic = policyRuleService.getPolicyRuleBasic();
        if (!policyRuleBasic.areArgumentsValid() || !policyRuleService.areArgumentsValid()) {
        if (!policyRuleBasic.areArgumentsValid()) {
            LOGGER.error(policyRuleService.getExeceptionMessage());
            setPolicyRuleServiceToContext(
                    policyRuleService,
                    new PolicyRuleState(
@@ -200,8 +219,18 @@ public class PolicyServiceImpl implements PolicyService {
    public Uni<PolicyRuleState> addPolicyDevice(PolicyRuleDevice policyRuleDevice) {
        LOGGER.infof("Received %s", policyRuleDevice);

        if (!policyRuleDevice.areArgumentsValid()) {
            LOGGER.error(policyRuleDevice.getExeceptionMessage());
            final var policyRuleState =
                    new PolicyRuleState(
                            PolicyRuleStateEnum.POLICY_FAILED, policyRuleDevice.getExeceptionMessage());

            return Uni.createFrom().item(policyRuleState);
        }

        final var policyRuleBasic = policyRuleDevice.getPolicyRuleBasic();
        if (!policyRuleBasic.areArgumentsValid() || !policyRuleDevice.areArgumentsValid()) {
        if (!policyRuleBasic.areArgumentsValid()) {
            LOGGER.error(policyRuleDevice.getExeceptionMessage());
            setPolicyRuleDeviceToContext(
                    policyRuleDevice,
                    new PolicyRuleState(
@@ -225,8 +254,18 @@ public class PolicyServiceImpl implements PolicyService {
    public Uni<PolicyRuleState> updatePolicyDevice(PolicyRuleDevice policyRuleDevice) {
        LOGGER.infof("Received %s", policyRuleDevice);

        if (!policyRuleDevice.areArgumentsValid()) {
            LOGGER.error(policyRuleDevice.getExeceptionMessage());
            final var policyRuleState =
                    new PolicyRuleState(
                            PolicyRuleStateEnum.POLICY_FAILED, policyRuleDevice.getExeceptionMessage());

            return Uni.createFrom().item(policyRuleState);
        }

        final var policyRuleBasic = policyRuleDevice.getPolicyRuleBasic();
        if (!policyRuleBasic.areArgumentsValid() || !policyRuleDevice.areArgumentsValid()) {
        if (!policyRuleBasic.areArgumentsValid()) {
            LOGGER.error(policyRuleDevice.getExeceptionMessage());
            setPolicyRuleDeviceToContext(
                    policyRuleDevice,
                    new PolicyRuleState(
@@ -272,6 +311,22 @@ public class PolicyServiceImpl implements PolicyService {
        return Uni.createFrom().item(policyRuleBasic.getPolicyRuleState());
    }

    private void monitorKpi(List<AlarmDescriptor> alarmDescriptorList) {

        for (AlarmDescriptor alarmDescriptor : alarmDescriptorList) {
            var monitorKpiRequest =
                    new MonitorKpiRequest(
                            alarmDescriptor.getKpiId(), MONITORING_WINDOW_IN_SECONDS, SAMPLING_RATE_PER_SECOND);
            monitoringService
                    .monitorKpi(monitorKpiRequest)
                    .subscribe()
                    .with(
                            emptyMessage ->
                                    LOGGER.infof(
                                            "Kpi [%s] has started to be monitored.\n", alarmDescriptor.getKpiId()));
        }
    }

    private void provisionAlarm(
            PolicyRule policyRule, List<AlarmDescriptor> alarmDescriptorList, Boolean isService) {

@@ -309,8 +364,14 @@ public class PolicyServiceImpl implements PolicyService {
                        alarmResponse -> {
                            LOGGER.info(alarmResponse);
                            if (isService) {
                                if (!alarmPolicyRuleServiceMap.containsKey(alarmResponse.getAlarmId())) {
                                    return;
                                }
                                applyActionService(alarmResponse.getAlarmId());
                            } else {
                                if (!alarmPolicyRuleDeviceMap.containsKey(alarmResponse.getAlarmId())) {
                                    return;
                                }
                                applyActionDevice(alarmResponse.getAlarmId());
                            }
                        });
@@ -528,6 +589,7 @@ public class PolicyServiceImpl implements PolicyService {

        final var policyRuleTypeService = new PolicyRuleTypeDevice(policyRuleDevice);
        final var policyRule = new PolicyRule(policyRuleTypeService);
        monitorKpi(alarmDescriptorList);
        provisionAlarm(policyRule, alarmDescriptorList, false);
        return;
    }
@@ -542,7 +604,7 @@ public class PolicyServiceImpl implements PolicyService {
                .subscribe()
                .with(
                        policyRuleBoolean -> {
                            if (Boolean.FALSE.equals(isUpdatedPolicyRuleValid)) {
                            if (Boolean.FALSE.equals(policyRuleBoolean)) {

                                String message =
                                        String.format(
@@ -568,7 +630,7 @@ public class PolicyServiceImpl implements PolicyService {
                .subscribe()
                .with(
                        policyRuleBoolean -> {
                            if (Boolean.FALSE.equals(isUpdatedPolicyRuleValid)) {
                            if (Boolean.FALSE.equals(policyRuleBoolean)) {
                                String message =
                                        String.format(
                                                "PolicyRule with ID: %s was not found. PolicyUpdateDevice failed",
@@ -667,18 +729,16 @@ public class PolicyServiceImpl implements PolicyService {
        List<AlarmDescriptor> alarmDescriptorList = new ArrayList<>();

        for (PolicyRuleCondition policyRuleCondition : policyRuleConditions) {
            var kpiValueRange = convertPolicyRuleConditionToKpiValueRange(policyRuleCondition);

            var kpiIdList = Arrays.asList(policyRuleCondition.getKpiId());
            var kpiValueRange = convertPolicyRuleConditionToAlarmDescriptor(policyRuleCondition);
            var kpiValueRangeList = Arrays.asList(kpiValueRange);

            // TODO: Temp fix for AlarmDescriptor object
            AlarmDescriptor alarmDescriptor =
                    new AlarmDescriptor(
                            "alarmId-" + gen(),
                            "alarmDescription",
                            "alarmName-" + gen(),
                            kpiIdList,
                            kpiValueRangeList,
                            policyRuleCondition.getKpiId(),
                            kpiValueRange,
                            getTimeStamp());

            alarmDescriptorList.add(alarmDescriptor);
@@ -696,120 +756,68 @@ public class PolicyServiceImpl implements PolicyService {

    private AlarmDescriptor parsePolicyRuleConditionAnd(PolicyRuleBasic policyRuleBasic) {

        // TODO: KpiIds should be the same. Add check.

        List<PolicyRuleCondition> policyRuleConditionList = policyRuleBasic.getPolicyRuleConditions();
        List<String> kpisList = new ArrayList<String>();

        for (PolicyRuleCondition policyRuleCondition : policyRuleConditionList) {
            kpisList.add(policyRuleCondition.getKpiId());
        }
        Set<String> kpisSet = new HashSet<String>(kpisList);

        if (kpisSet.size() == kpisList.size()) {
            return createAlarmDescriptorWithoutRange(policyRuleConditionList, kpisList);
        if (policyRuleConditionList.size() > 1) {
            return createAlarmDescriptorWithRange(policyRuleConditionList);
        }

        return createAlarmDescriptorWithRange(policyRuleConditionList, kpisList);
        return createAlarmDescriptorWithoutRange(policyRuleConditionList.get(0));
    }

    private AlarmDescriptor createAlarmDescriptorWithoutRange(
            List<PolicyRuleCondition> policyRuleConditionList, List<String> kpisList) {

        List<String> kpiIdList = new ArrayList<>();
        List<KpiValueRange> kpiValueRangeList = new ArrayList<>();
            PolicyRuleCondition policyRuleCondition) {

        for (PolicyRuleCondition policyRuleCondition : policyRuleConditionList) {
            kpisList.add(policyRuleCondition.getKpiId());
            kpiValueRangeList.add(convertPolicyRuleConditionToAlarmDescriptor(policyRuleCondition));
        }
        final var kpiId = policyRuleCondition.getKpiId();
        final var kpiValueRange = convertPolicyRuleConditionToKpiValueRange(policyRuleCondition);

        return new AlarmDescriptor(
                "alarmId-" + gen(),
                "alarmDescription",
                "alarmName-" + gen(),
                kpiIdList,
                kpiValueRangeList,
                kpiId,
                kpiValueRange,
                getTimeStamp());
    }

    private AlarmDescriptor createAlarmDescriptorWithRange(
            List<PolicyRuleCondition> policyRuleConditionList, List<String> kpisList) {
            List<PolicyRuleCondition> policyRuleConditionList) {

        final var kpiId = policyRuleConditionList.get(0).getKpiId();

        HashMap<String, KpiValueRange> KpiValueRangeMap = new HashMap<>();
        for (PolicyRuleCondition policyRuleCondition : policyRuleConditionList) {

            if (KpiValueRangeMap.containsKey(policyRuleCondition.getKpiId())) {
                var kpiValueRange = KpiValueRangeMap.get(policyRuleCondition.getKpiId());

                if (kpiValueRange.getInRange() == true) {
                    LOGGER.errorf("KpiId: %s, has already range values", policyRuleCondition.getKpiId());
                    return null;
                }

                if ((kpiValueRange.getKpiMaxValue() != null) && (kpiValueRange.getKpiMinValue() != null)) {
                    LOGGER.errorf(
                            "KpiId: %s, has already min and max values", policyRuleCondition.getKpiId());
                    return null;
                }

                var kpiMinValue = kpiValueRange.getKpiMinValue();
                var kpiMaxValue = kpiValueRange.getKpiMaxValue();
                boolean inRange = false;
                boolean includeMinValue = kpiValueRange.getIncludeMinValue();
                boolean includeMaxValue = kpiValueRange.getIncludeMaxValue();

                if (policyRuleCondition.getNumericalOperator()
                                == NumericalOperator.POLICY_RULE_CONDITION_NUMERICAL_GREATER_THAN
                        && kpiValueRange.getKpiMinValue() == null) {

                    kpiMinValue = policyRuleCondition.getKpiValue();
                    inRange = true;
                    includeMinValue = false;

                } else if (policyRuleCondition.getNumericalOperator()
                                == NumericalOperator.POLICY_RULE_CONDITION_NUMERICAL_GREATER_THAN_EQUAL
                        && kpiValueRange.getKpiMinValue() == null) {

                    kpiMinValue = policyRuleCondition.getKpiValue();
                    inRange = true;
                    includeMinValue = true;
                } else if (policyRuleCondition.getNumericalOperator()
                                == NumericalOperator.POLICY_RULE_CONDITION_NUMERICAL_LESS_THAN
                        && kpiValueRange.getKpiMaxValue() == null) {

                    kpiMaxValue = policyRuleCondition.getKpiValue();
                    inRange = true;
                    includeMaxValue = false;
                } else if (policyRuleCondition.getNumericalOperator()
                                == NumericalOperator.POLICY_RULE_CONDITION_NUMERICAL_LESS_THAN_EQUAL
                        && kpiValueRange.getKpiMaxValue() == null) {

                    kpiMaxValue = policyRuleCondition.getKpiValue();
                    inRange = true;
                    includeMaxValue = true;
                } else {
                    return null;
            if (!KpiValueRangeMap.containsKey(kpiId)) {
                var kpiValueRange = convertPolicyRuleConditionToKpiValueRange(policyRuleCondition);
                KpiValueRangeMap.put(kpiId, kpiValueRange);
                continue;
            }

                KpiValueRangeMap.put(
                        policyRuleCondition.getKpiId(),
                        new KpiValueRange(kpiMinValue, kpiMaxValue, inRange, includeMinValue, includeMaxValue));
            }
            var kpiValueRange = convertPolicyRuleConditionToKpiValueRange(policyRuleCondition);
            // TODO: Handle combineKpiValueRanges exceptions
            var combinedKpiValueRange =
                    combineKpiValueRanges(kpiId, KpiValueRangeMap.get(kpiId), kpiValueRange);
            KpiValueRangeMap.put(kpiId, combinedKpiValueRange);
        }

        List<String> kpiIdList = new ArrayList<>();
        kpiIdList.addAll(KpiValueRangeMap.keySet());
        List<KpiValueRange> kpiValueRangeList = new ArrayList<>(KpiValueRangeMap.values());

        return new AlarmDescriptor(
                "alarmId-" + gen(),
                "alarmDescription",
                "alarmName-" + gen(),
                kpiIdList,
                kpiValueRangeList,
                kpiId,
                KpiValueRangeMap.get(kpiId),
                getTimeStamp());
    }

    private KpiValueRange convertPolicyRuleConditionToAlarmDescriptor(
    private KpiValueRange convertPolicyRuleConditionToKpiValueRange(
            PolicyRuleCondition policyRuleCondition) {

        switch (policyRuleCondition.getNumericalOperator()) {
@@ -840,6 +848,43 @@ public class PolicyServiceImpl implements PolicyService {
        }
    }

    private KpiValueRange combineKpiValueRanges(
            String kpiId, KpiValueRange firstKpiValueRange, KpiValueRange secondKpiValueRange) {
        if (secondKpiValueRange.getInRange() == true) {
            LOGGER.errorf("KpiId: %s, has already range values", kpiId);
            return null;
        }

        if ((firstKpiValueRange.getKpiMinValue() != null)
                && (secondKpiValueRange.getKpiMinValue() != null)) {
            LOGGER.errorf("KpiId: %s, has already min value", kpiId);
            return null;
        }

        if ((firstKpiValueRange.getKpiMaxValue() != null)
                && (secondKpiValueRange.getKpiMinValue() != null)) {
            LOGGER.errorf("KpiId: %s, has already max value", kpiId);
            return null;
        }

        // Objects.nonNull(secondKpiValueRange);

        var kpiMinValue =
                firstKpiValueRange.getKpiMinValue() != null
                        ? firstKpiValueRange.getKpiMinValue()
                        : secondKpiValueRange.getKpiMinValue();
        var kpiMaxValue =
                firstKpiValueRange.getKpiMaxValue() != null
                        ? firstKpiValueRange.getKpiMaxValue()
                        : secondKpiValueRange.getKpiMaxValue();
        boolean includeMinValue =
                firstKpiValueRange.getIncludeMinValue() || secondKpiValueRange.getIncludeMinValue();
        boolean includeMaxValue =
                firstKpiValueRange.getIncludeMaxValue() || secondKpiValueRange.getIncludeMaxValue();

        return new KpiValueRange(kpiMinValue, kpiMaxValue, true, includeMinValue, includeMaxValue);
    }

    private List<String> returnInvalidDeviceIds(List<String> deviceIds) {
        var invalidDeviceIds = new ArrayList<String>();

+78 −26

File changed.

Preview size limit exceeded, changes collapsed.

+2 −1
Original line number Diff line number Diff line
@@ -40,7 +40,8 @@ public class PolicyRuleService {
                    !serviceId.getContextId().isBlank(), "Context Id of Service Id must not be empty.");
            checkArgument(!serviceId.getId().isBlank(), "Service Id must not be empty.");
            this.serviceId = serviceId;
            checkArgument(!deviceIds.isEmpty(), "Device Ids must not be empty.");
            // TODO If device list not empty
            // checkArgument(!deviceIds.isEmpty(), "Device Ids must not be empty.");
            this.deviceIds = deviceIds;
            this.isValid = true;
            this.exceptionMessage = "";
+5 −3
Original line number Diff line number Diff line
@@ -20,12 +20,12 @@ import eu.teraflow.policy.context.model.Empty;
import eu.teraflow.policy.monitoring.model.AlarmDescriptor;
import eu.teraflow.policy.monitoring.model.AlarmResponse;
import eu.teraflow.policy.monitoring.model.AlarmSubscription;
import eu.teraflow.policy.monitoring.model.Kpi;
import eu.teraflow.policy.monitoring.model.KpiDescriptor;
import eu.teraflow.policy.monitoring.model.MonitorKpiRequest;
import eu.teraflow.policy.monitoring.model.SubsDescriptor;
import eu.teraflow.policy.monitoring.model.SubsResponse;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import java.util.List;

public interface MonitoringGateway {

@@ -33,7 +33,9 @@ public interface MonitoringGateway {

    Uni<KpiDescriptor> getKpiDescriptor(String kpiId);

    Multi<List<Kpi>> setKpiSubscription(SubsDescriptor subsDescriptor);
    Uni<Empty> monitorKpi(MonitorKpiRequest monitorKpiRequest);

    Multi<SubsResponse> setKpiSubscription(SubsDescriptor subsDescriptor);

    Uni<SubsDescriptor> getSubsDescriptor(String subscriptionId);

Loading