diff --git a/src/policy/src/main/java/org/etsi/tfs/policy/policy/AddPolicyServiceImpl.java b/src/policy/src/main/java/org/etsi/tfs/policy/policy/AddPolicyServiceImpl.java index cf94760d66daf59867cd418f7fd7d3e96b2195af..db9bb35d257c53b6254c2cd951f132970e2ab80d 100644 --- a/src/policy/src/main/java/org/etsi/tfs/policy/policy/AddPolicyServiceImpl.java +++ b/src/policy/src/main/java/org/etsi/tfs/policy/policy/AddPolicyServiceImpl.java @@ -22,13 +22,10 @@ import static org.etsi.tfs.policy.common.ApplicationProperties.VALIDATED_POLICYR import io.smallrye.mutiny.Uni; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; -import java.util.List; import org.etsi.tfs.policy.context.ContextService; import org.etsi.tfs.policy.context.model.ServiceId; import org.etsi.tfs.policy.exception.ExternalServiceFailureException; -import org.etsi.tfs.policy.monitoring.model.AlarmDescriptor; import org.etsi.tfs.policy.policy.model.PolicyRule; -import org.etsi.tfs.policy.policy.model.PolicyRuleBasic; import org.etsi.tfs.policy.policy.model.PolicyRuleService; import org.etsi.tfs.policy.policy.model.PolicyRuleState; import org.etsi.tfs.policy.policy.model.PolicyRuleStateEnum; @@ -38,14 +35,10 @@ import org.etsi.tfs.policy.policy.model.PolicyRuleTypeService; public class AddPolicyServiceImpl { @Inject private CommonPolicyServiceImpl commonPolicyService; - @Inject private CommonAlarmService commonAlarmService; @Inject private ContextService contextService; public Uni<PolicyRuleState> constructPolicyStateBasedOnCriteria( - Boolean isService, - ServiceId serviceId, - PolicyRuleService policyRuleService, - PolicyRuleBasic policyRuleBasic) { + Boolean isService, ServiceId serviceId, PolicyRuleService policyRuleService) { if (!isService) { var policyRuleState = @@ -57,36 +50,20 @@ public class AddPolicyServiceImpl { final var policyRuleTypeService = new PolicyRuleTypeService(policyRuleService); final var policyRule = new PolicyRule(policyRuleTypeService); - final var alarmDescriptorList = commonPolicyService.createAlarmDescriptorList(policyRule); - if (alarmDescriptorList.isEmpty()) { - var policyRuleState = - new PolicyRuleState( - PolicyRuleStateEnum.POLICY_FAILED, - String.format( - "Invalid PolicyRuleConditions in PolicyRule with ID: %s", - policyRuleBasic.getPolicyRuleId())); - return Uni.createFrom().item(policyRuleState); - } + final String kpiId = + policyRuleService.getPolicyRuleBasic().getPolicyRuleConditions().get(0).getKpiId(); + commonPolicyService.getKpiPolicyRuleServiceMap().put(kpiId, policyRuleService); - return setPolicyRuleOnContextAndReturnState(policyRule, policyRuleService, alarmDescriptorList); + return setPolicyRuleOnContextAndReturnState(policyRule); } - private Uni<PolicyRuleState> setPolicyRuleOnContextAndReturnState( - PolicyRule policyRule, - PolicyRuleService policyRuleService, - List<AlarmDescriptor> alarmDescriptorList) { + private Uni<PolicyRuleState> setPolicyRuleOnContextAndReturnState(PolicyRule policyRule) { return contextService .setPolicyRule(policyRule) .onFailure() .transform(failure -> new ExternalServiceFailureException(failure.getMessage())) .onItem() - .transform( - policyId -> { - commonAlarmService.startMonitoringBasedOnAlarmDescriptors( - policyId, policyRuleService, alarmDescriptorList); - - return VALIDATED_POLICYRULE_STATE; - }); + .transform(policyId -> VALIDATED_POLICYRULE_STATE); } } diff --git a/src/policy/src/main/java/org/etsi/tfs/policy/policy/CommonPolicyServiceImpl.java b/src/policy/src/main/java/org/etsi/tfs/policy/policy/CommonPolicyServiceImpl.java index 23b75dbdfb393107dd5e2ac209d042b88fedfa88..3ce13574dfe16572731fda07303885ff5e097377 100644 --- a/src/policy/src/main/java/org/etsi/tfs/policy/policy/CommonPolicyServiceImpl.java +++ b/src/policy/src/main/java/org/etsi/tfs/policy/policy/CommonPolicyServiceImpl.java @@ -77,7 +77,8 @@ public class CommonPolicyServiceImpl { // TODO: Find a better way to disregard alarms while reconfiguring path // Temporary solution for not calling the same rpc more than it's needed public static int noAlarms = 0; - + private ConcurrentHashMap<String, PolicyRuleService> kpiPolicyRuleServiceMap = + new ConcurrentHashMap<>(); private ConcurrentHashMap<String, PolicyRuleService> alarmPolicyRuleServiceMap = new ConcurrentHashMap<>(); private ConcurrentHashMap<String, PolicyRuleDevice> alarmPolicyRuleDeviceMap = @@ -89,6 +90,10 @@ public class CommonPolicyServiceImpl { return subscriptionList; } + public ConcurrentHashMap<String, PolicyRuleService> getKpiPolicyRuleServiceMap() { + return kpiPolicyRuleServiceMap; + } + public ConcurrentHashMap<String, PolicyRuleService> getAlarmPolicyRuleServiceMap() { return alarmPolicyRuleServiceMap; } @@ -111,6 +116,31 @@ public class CommonPolicyServiceImpl { return Long.valueOf(now).doubleValue(); } + public void applyActionServiceBasedOnKpiId(String kpiId) { + if (!kpiPolicyRuleServiceMap.contains(kpiId)) { + LOGGER.info("No Policy for KpiId"); + return; + } + + PolicyRuleService policyRuleService = kpiPolicyRuleServiceMap.get(kpiId); + PolicyRuleAction policyRuleAction = + policyRuleService.getPolicyRuleBasic().getPolicyRuleActions().get(0); + + setPolicyRuleServiceToContext(policyRuleService, ACTIVE_POLICYRULE_STATE); + + switch (policyRuleAction.getPolicyRuleActionEnum()) { + case POLICY_RULE_ACTION_ADD_SERVICE_CONSTRAINT: + addServiceConstraint(policyRuleService, policyRuleAction); + case POLICY_RULE_ACTION_ADD_SERVICE_CONFIGRULE: + addServiceConfigRule(policyRuleService, policyRuleAction); + case POLICY_RULE_ACTION_RECALCULATE_PATH: + callRecalculatePathRPC(policyRuleService, policyRuleAction); + default: + LOGGER.errorf(INVALID_MESSAGE, policyRuleAction.getPolicyRuleActionEnum()); + return; + } + } + public void applyActionService(String alarmId) { PolicyRuleService policyRuleService = alarmPolicyRuleServiceMap.get(alarmId); PolicyRuleAction policyRuleAction = diff --git a/src/policy/src/main/java/org/etsi/tfs/policy/policy/PolicyServiceImpl.java b/src/policy/src/main/java/org/etsi/tfs/policy/policy/PolicyServiceImpl.java index e85ab8fe12be29a46f493949ffc8cdd3062f5bc1..c2476e11b5b370c499c4bd8af725e4d71361fd9a 100644 --- a/src/policy/src/main/java/org/etsi/tfs/policy/policy/PolicyServiceImpl.java +++ b/src/policy/src/main/java/org/etsi/tfs/policy/policy/PolicyServiceImpl.java @@ -90,7 +90,7 @@ public class PolicyServiceImpl implements PolicyService { .transform( isService -> addPolicyServiceImpl.constructPolicyStateBasedOnCriteria( - isService, serviceId, policyRuleService, policyRuleBasic)) + isService, serviceId, policyRuleService)) .flatMap(Function.identity()); } diff --git a/src/policy/src/main/java/org/etsi/tfs/policy/policy/kafka/AlarmListener.java b/src/policy/src/main/java/org/etsi/tfs/policy/policy/kafka/AlarmListener.java index 33fcfb71119e80733343ee63dd7cec5c9070a075..81cd6df4d9699f1ce5c7fda8956d336bec7a7c13 100644 --- a/src/policy/src/main/java/org/etsi/tfs/policy/policy/kafka/AlarmListener.java +++ b/src/policy/src/main/java/org/etsi/tfs/policy/policy/kafka/AlarmListener.java @@ -14,14 +14,13 @@ * limitations under the License. */ -package org.etsi.tfs.policy.policy; - -import static org.etsi.tfs.policy.common.ApplicationProperties.*; +package org.etsi.tfs.policy.policy.kafka; import io.smallrye.reactive.messaging.annotations.Blocking; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.etsi.tfs.policy.policy.CommonPolicyServiceImpl; import org.etsi.tfs.policy.policy.model.AlarmTopicDTO; import org.jboss.logging.Logger; @@ -40,25 +39,12 @@ public class AlarmListener { @Incoming(ALARM_TOPIC) @Blocking public void receiveAlarm(AlarmTopicDTO alarmTopicDto) { - logger.infof("Received Alarm for analytic service backend :\n %s", alarmTopicDto.toString()); - - if (!alarmTopicDto.getKpiId().isEmpty()) { - alarmTopicDto - .getAlarms() - .forEach( - (key, value) -> { - if (value) { - logger.infof( - "**************************Received Alarm!**************************"); - logger.infof("alarmTopicDto:"); - logger.info(alarmTopicDto.toString()); - logger.infof( - "Received Alarm for analytic service backend with id:\n %s", key.toString()); - // - // commonPolicyServiceImpl.applyActionService(alarmResponse.getAlarmId()); - - } - }); + logger.infof("Received message for analytic service backend :\n %s", alarmTopicDto.toString()); + if (alarmTopicDto.isThresholdRaise() || alarmTopicDto.isThresholdFall()) { + logger.infof("**************************Received Alarm!**************************"); + logger.infof( + "Received Alarm for analytic service backend with kpiId: %s", alarmTopicDto.getKpiId()); + commonPolicyServiceImpl.applyActionServiceBasedOnKpiId(alarmTopicDto.getKpiId()); } } } diff --git a/src/policy/src/main/java/org/etsi/tfs/policy/policy/model/AlarmTopicDTO.java b/src/policy/src/main/java/org/etsi/tfs/policy/policy/model/AlarmTopicDTO.java index 69bdd47bb2f97c5c500a948054e88f0c5a61709a..b26d53484d02495391367a5dbc45619ddac9c2e1 100644 --- a/src/policy/src/main/java/org/etsi/tfs/policy/policy/model/AlarmTopicDTO.java +++ b/src/policy/src/main/java/org/etsi/tfs/policy/policy/model/AlarmTopicDTO.java @@ -16,14 +16,23 @@ package org.etsi.tfs.policy.policy.model; -import java.util.Map; +import com.fasterxml.jackson.annotation.JsonProperty; import lombok.Data; @Data public class AlarmTopicDTO { - private String startTimestamp; - private String endTimestamp; + @JsonProperty("window_start") + private String windowStart; + + @JsonProperty("THRESHOLD_FALL") + private boolean thresholdFall; + + @JsonProperty("THRESHOLD_RAISE") + private boolean thresholdRaise; + + private String value; + + @JsonProperty("kpi_id") private String kpiId; - private Map<String, Boolean> alarms; } diff --git a/src/policy/src/main/resources/application.yml b/src/policy/src/main/resources/application.yml index 4086e22df7bff45f7f874ed1ba3f12edfe933676..e6bf8d1e1e2c48c884b70c04b51b5ea45dc2278a 100644 --- a/src/policy/src/main/resources/application.yml +++ b/src/policy/src/main/resources/application.yml @@ -76,10 +76,11 @@ mp: messaging: incoming: topic-alarms: + failure-strategy: ignore connector: smallrye-kafka topic: topic-alarms value: deserializer: org.etsi.tfs.policy.policy.kafka.TopicAlarmDeserializer kafka: bootstrap: - servers: ${quarkus.kubernetes.env.vars.kafka-broker-host}:9092 \ No newline at end of file + servers: ${quarkus.kubernetes.env.vars.kafka-broker-host}:9092