Commit dd85c94c authored by Konstantinos Poulakakis's avatar Konstantinos Poulakakis
Browse files

Adapt Policy service with new Monitoring system. Add a kafka topic that...

Adapt Policy service with new Monitoring system. Add a kafka topic that consumes alarms from monitoring system. Redirect the flow to that kafka topic.
parent cd3a4525
Loading
Loading
Loading
Loading
+7 −30
Original line number Diff line number Diff line
@@ -22,13 +22,10 @@ import static org.etsi.tfs.policy.common.ApplicationProperties.VALIDATED_POLICYR
import io.smallrye.mutiny.Uni;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import java.util.List;
import org.etsi.tfs.policy.context.ContextService;
import org.etsi.tfs.policy.context.model.ServiceId;
import org.etsi.tfs.policy.exception.ExternalServiceFailureException;
import org.etsi.tfs.policy.monitoring.model.AlarmDescriptor;
import org.etsi.tfs.policy.policy.model.PolicyRule;
import org.etsi.tfs.policy.policy.model.PolicyRuleBasic;
import org.etsi.tfs.policy.policy.model.PolicyRuleService;
import org.etsi.tfs.policy.policy.model.PolicyRuleState;
import org.etsi.tfs.policy.policy.model.PolicyRuleStateEnum;
@@ -38,14 +35,10 @@ import org.etsi.tfs.policy.policy.model.PolicyRuleTypeService;
public class AddPolicyServiceImpl {

    @Inject private CommonPolicyServiceImpl commonPolicyService;
    @Inject private CommonAlarmService commonAlarmService;
    @Inject private ContextService contextService;

    public Uni<PolicyRuleState> constructPolicyStateBasedOnCriteria(
            Boolean isService,
            ServiceId serviceId,
            PolicyRuleService policyRuleService,
            PolicyRuleBasic policyRuleBasic) {
            Boolean isService, ServiceId serviceId, PolicyRuleService policyRuleService) {

        if (!isService) {
            var policyRuleState =
@@ -57,36 +50,20 @@ public class AddPolicyServiceImpl {

        final var policyRuleTypeService = new PolicyRuleTypeService(policyRuleService);
        final var policyRule = new PolicyRule(policyRuleTypeService);
        final var alarmDescriptorList = commonPolicyService.createAlarmDescriptorList(policyRule);

        if (alarmDescriptorList.isEmpty()) {
            var policyRuleState =
                    new PolicyRuleState(
                            PolicyRuleStateEnum.POLICY_FAILED,
                            String.format(
                                    "Invalid PolicyRuleConditions in PolicyRule with ID: %s",
                                    policyRuleBasic.getPolicyRuleId()));
            return Uni.createFrom().item(policyRuleState);
        }
        final String kpiId =
                policyRuleService.getPolicyRuleBasic().getPolicyRuleConditions().get(0).getKpiId();
        commonPolicyService.getKpiPolicyRuleServiceMap().put(kpiId, policyRuleService);

        return setPolicyRuleOnContextAndReturnState(policyRule, policyRuleService, alarmDescriptorList);
        return setPolicyRuleOnContextAndReturnState(policyRule);
    }

    private Uni<PolicyRuleState> setPolicyRuleOnContextAndReturnState(
            PolicyRule policyRule,
            PolicyRuleService policyRuleService,
            List<AlarmDescriptor> alarmDescriptorList) {
    private Uni<PolicyRuleState> setPolicyRuleOnContextAndReturnState(PolicyRule policyRule) {
        return contextService
                .setPolicyRule(policyRule)
                .onFailure()
                .transform(failure -> new ExternalServiceFailureException(failure.getMessage()))
                .onItem()
                .transform(
                        policyId -> {
                            commonAlarmService.startMonitoringBasedOnAlarmDescriptors(
                                    policyId, policyRuleService, alarmDescriptorList);

                            return VALIDATED_POLICYRULE_STATE;
                        });
                .transform(policyId -> VALIDATED_POLICYRULE_STATE);
    }
}
+31 −1
Original line number Diff line number Diff line
@@ -77,7 +77,8 @@ public class CommonPolicyServiceImpl {
    // TODO: Find a better way to disregard alarms while reconfiguring path
    // Temporary solution for not calling the same rpc more than it's needed
    public static int noAlarms = 0;

    private ConcurrentHashMap<String, PolicyRuleService> kpiPolicyRuleServiceMap =
            new ConcurrentHashMap<>();
    private ConcurrentHashMap<String, PolicyRuleService> alarmPolicyRuleServiceMap =
            new ConcurrentHashMap<>();
    private ConcurrentHashMap<String, PolicyRuleDevice> alarmPolicyRuleDeviceMap =
@@ -89,6 +90,10 @@ public class CommonPolicyServiceImpl {
        return subscriptionList;
    }

    public ConcurrentHashMap<String, PolicyRuleService> getKpiPolicyRuleServiceMap() {
        return kpiPolicyRuleServiceMap;
    }

    public ConcurrentHashMap<String, PolicyRuleService> getAlarmPolicyRuleServiceMap() {
        return alarmPolicyRuleServiceMap;
    }
@@ -111,6 +116,31 @@ public class CommonPolicyServiceImpl {
        return Long.valueOf(now).doubleValue();
    }

    public void applyActionServiceBasedOnKpiId(String kpiId) {
        if (!kpiPolicyRuleServiceMap.contains(kpiId)) {
            LOGGER.info("No Policy for KpiId");
            return;
        }

        PolicyRuleService policyRuleService = kpiPolicyRuleServiceMap.get(kpiId);
        PolicyRuleAction policyRuleAction =
                policyRuleService.getPolicyRuleBasic().getPolicyRuleActions().get(0);

        setPolicyRuleServiceToContext(policyRuleService, ACTIVE_POLICYRULE_STATE);

        switch (policyRuleAction.getPolicyRuleActionEnum()) {
            case POLICY_RULE_ACTION_ADD_SERVICE_CONSTRAINT:
                addServiceConstraint(policyRuleService, policyRuleAction);
            case POLICY_RULE_ACTION_ADD_SERVICE_CONFIGRULE:
                addServiceConfigRule(policyRuleService, policyRuleAction);
            case POLICY_RULE_ACTION_RECALCULATE_PATH:
                callRecalculatePathRPC(policyRuleService, policyRuleAction);
            default:
                LOGGER.errorf(INVALID_MESSAGE, policyRuleAction.getPolicyRuleActionEnum());
                return;
        }
    }

    public void applyActionService(String alarmId) {
        PolicyRuleService policyRuleService = alarmPolicyRuleServiceMap.get(alarmId);
        PolicyRuleAction policyRuleAction =
+1 −1
Original line number Diff line number Diff line
@@ -90,7 +90,7 @@ public class PolicyServiceImpl implements PolicyService {
                .transform(
                        isService ->
                                addPolicyServiceImpl.constructPolicyStateBasedOnCriteria(
                                        isService, serviceId, policyRuleService, policyRuleBasic))
                                        isService, serviceId, policyRuleService))
                .flatMap(Function.identity());
    }

+8 −22
Original line number Diff line number Diff line
@@ -14,14 +14,13 @@
 * limitations under the License.
 */

package org.etsi.tfs.policy.policy;

import static org.etsi.tfs.policy.common.ApplicationProperties.*;
package org.etsi.tfs.policy.policy.kafka;

import io.smallrye.reactive.messaging.annotations.Blocking;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.etsi.tfs.policy.policy.CommonPolicyServiceImpl;
import org.etsi.tfs.policy.policy.model.AlarmTopicDTO;
import org.jboss.logging.Logger;

@@ -40,25 +39,12 @@ public class AlarmListener {
    @Incoming(ALARM_TOPIC)
    @Blocking
    public void receiveAlarm(AlarmTopicDTO alarmTopicDto) {
        logger.infof("Received Alarm for analytic service backend :\n %s", alarmTopicDto.toString());

        if (!alarmTopicDto.getKpiId().isEmpty()) {
            alarmTopicDto
                    .getAlarms()
                    .forEach(
                            (key, value) -> {
                                if (value) {
                                    logger.infof(
                                            "**************************Received Alarm!**************************");
                                    logger.infof("alarmTopicDto:");
                                    logger.info(alarmTopicDto.toString());
        logger.infof("Received message for analytic service backend :\n %s", alarmTopicDto.toString());
        if (alarmTopicDto.isThresholdRaise() || alarmTopicDto.isThresholdFall()) {
            logger.infof("**************************Received Alarm!**************************");
            logger.infof(
                                            "Received Alarm for analytic service backend with id:\n %s", key.toString());
                                    //
                                    // commonPolicyServiceImpl.applyActionService(alarmResponse.getAlarmId());

                                }
                            });
                    "Received Alarm for analytic service backend with kpiId: %s", alarmTopicDto.getKpiId());
            commonPolicyServiceImpl.applyActionServiceBasedOnKpiId(alarmTopicDto.getKpiId());
        }
    }
}
+13 −4
Original line number Diff line number Diff line
@@ -16,14 +16,23 @@

package org.etsi.tfs.policy.policy.model;

import java.util.Map;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;

@Data
public class AlarmTopicDTO {

    private String startTimestamp;
    private String endTimestamp;
    @JsonProperty("window_start")
    private String windowStart;

    @JsonProperty("THRESHOLD_FALL")
    private boolean thresholdFall;

    @JsonProperty("THRESHOLD_RAISE")
    private boolean thresholdRaise;

    private String value;

    @JsonProperty("kpi_id")
    private String kpiId;
    private Map<String, Boolean> alarms;
}
Loading