diff --git a/src/policy/src/main/java/eu/teraflow/policy/PolicyServiceImpl.java b/src/policy/src/main/java/eu/teraflow/policy/PolicyServiceImpl.java index 4f62b1ea421579ec5eb1c135e76188c4a071e6c1..8462e0b9d7ec875004cea0a1b2836dc92d3b6f15 100644 --- a/src/policy/src/main/java/eu/teraflow/policy/PolicyServiceImpl.java +++ b/src/policy/src/main/java/eu/teraflow/policy/PolicyServiceImpl.java @@ -284,11 +284,98 @@ public class PolicyServiceImpl implements PolicyService { final var policyRuleTypeDevice = new PolicyRuleTypeDevice(policyRuleDevice); final var policyRule = new PolicyRule(policyRuleTypeDevice); - contextService - .setPolicyRule(policyRule) - .subscribe() - .with(id -> validateDevice(policyRuleDevice)); - return Uni.createFrom().item(policyRuleBasic.getPolicyRuleState()); + final var alarmDescriptorList = createAlarmDescriptorList(policyRule); + if (alarmDescriptorList.isEmpty()) { + var policyRuleState = + new PolicyRuleState( + PolicyRuleStateEnum.POLICY_FAILED, + String.format( + "Invalid PolicyRuleConditions in PolicyRule with ID: %s", + policyRuleBasic.getPolicyRuleId())); + return policyRuleState; + } + + contextService.setPolicyRule(policyRule).subscribe().with(x -> {}); + setPolicyRuleDeviceToContext(policyRuleDevice, VALIDATED_POLICYRULE_STATE); + noAlarms = 0; + + List<Uni<String>> alarmIds = new ArrayList<Uni<String>>(); + for (AlarmDescriptor alarmDescriptor : alarmDescriptorList) { + LOGGER.infof("alarmDescriptor:"); + LOGGER.infof(alarmDescriptor.toString()); + alarmIds.add(monitoringService.setKpiAlarm(alarmDescriptor)); + } + + // Transform the alarmIds into promised alarms returned from the + // getAlarmResponseStream + List<Multi<AlarmResponse>> alarmResponseStreamList = new ArrayList<>(); + for (Uni<String> alarmId : alarmIds) { + alarmResponseStreamList.add( + alarmId + .onItem() + .transformToMulti( + id -> { + alarmPolicyRuleDeviceMap.put(id, policyRuleDevice); + + // TODO: Create infinite subscription + var alarmSubscription = new AlarmSubscription(id, 259200, 5000); + return monitoringService.getAlarmResponseStream(alarmSubscription); + })); + } + + // Merge the promised alarms into one stream (Multi Object) + final var multi = Multi.createBy().merging().streams(alarmResponseStreamList); + setPolicyRuleDeviceToContext(policyRuleDevice, PROVISIONED_POLICYRULE_STATE); + + monitorAlarmResponseForDevice(multi); + + // TODO: Resubscribe to the stream, if it has ended + + // TODO: Redesign evaluation of action + // evaluateAction(policyRule, alarmDescriptorList, multi); + + return VALIDATED_POLICYRULE_STATE; + }); + } + + @Override + public Uni<PolicyRuleState> updatePolicyService(PolicyRuleService policyRuleService) { + LOGGER.infof("Received %s", policyRuleService); + + if (!policyRuleService.areArgumentsValid()) { + LOGGER.error(policyRuleService.getExeceptionMessage()); + final var policyRuleState = + new PolicyRuleState( + PolicyRuleStateEnum.POLICY_FAILED, policyRuleService.getExeceptionMessage()); + + return Uni.createFrom().item(policyRuleState); + } + + final var policyRuleBasic = policyRuleService.getPolicyRuleBasic(); + if (!policyRuleBasic.areArgumentsValid()) { + LOGGER.error(policyRuleService.getExeceptionMessage()); + final var policyRuleState = + new PolicyRuleState( + PolicyRuleStateEnum.POLICY_FAILED, policyRuleBasic.getExeceptionMessage()); + return Uni.createFrom().item(policyRuleState); + } + + final var serviceId = policyRuleService.getServiceId(); + final var policyRuleId = policyRuleBasic.getPolicyRuleId(); + final var isPolicyRuleServiceValid = + policyRuleConditionValidator.isPolicyRuleServiceValid(policyRuleId, serviceId); + + return isPolicyRuleServiceValid + .onItem() + .transform( + isPolicyRuleService -> { + if (!isPolicyRuleService) { + return new PolicyRuleState( + PolicyRuleStateEnum.POLICY_FAILED, String.format(INVALID_MESSAGE, serviceId)); + } + + return VALIDATED_POLICYRULE_STATE; + }); } @Override