diff --git a/src/policy/src/main/java/org/etsi/tfs/policy/PolicyServiceImpl.java b/src/policy/src/main/java/org/etsi/tfs/policy/PolicyServiceImpl.java index faf288cf637dcf8d567df46089c5eaf148a5921c..3ef1987dbd33aa1473f0dd04763f37b71fe99f9f 100644 --- a/src/policy/src/main/java/org/etsi/tfs/policy/PolicyServiceImpl.java +++ b/src/policy/src/main/java/org/etsi/tfs/policy/PolicyServiceImpl.java @@ -322,74 +322,91 @@ public class PolicyServiceImpl implements PolicyService { return areDevicesValid .onItem() - .transform( - areDevices -> { - if (areDevices.contains(false)) { - var policyRuleState = - new PolicyRuleState( - PolicyRuleStateEnum.POLICY_FAILED, - String.format( - INVALID_MESSAGE, - policyRuleDevice.getPolicyRuleBasic().getPolicyRuleId())); - - return policyRuleState; - } + .transform(areDevices -> addDeviceOnContext(areDevices, policyRuleDevice, policyRuleBasic)); + } - final var policyRuleTypeDevice = new PolicyRuleTypeDevice(policyRuleDevice); - final var policyRule = new PolicyRule(policyRuleTypeDevice); - - final var alarmDescriptorList = createAlarmDescriptorList(policyRule); - if (alarmDescriptorList.isEmpty()) { - var policyRuleState = - new PolicyRuleState( - PolicyRuleStateEnum.POLICY_FAILED, - String.format( - "Invalid PolicyRuleConditions in PolicyRule with ID: %s", - policyRuleBasic.getPolicyRuleId())); - return policyRuleState; - } + private PolicyRuleState addDeviceOnContext( + List<Boolean> areDevices, + PolicyRuleDevice policyRuleDevice, + PolicyRuleBasic policyRuleBasic) { + if (areDevices.contains(false)) { + var policyRuleState = + new PolicyRuleState( + PolicyRuleStateEnum.POLICY_FAILED, + String.format( + INVALID_MESSAGE, policyRuleDevice.getPolicyRuleBasic().getPolicyRuleId())); - contextService.setPolicyRule(policyRule).subscribe().with(x -> {}); - setPolicyRuleDeviceToContext(policyRuleDevice, VALIDATED_POLICYRULE_STATE); - noAlarms = 0; + return policyRuleState; + } - List<Uni<String>> alarmIds = new ArrayList<Uni<String>>(); - for (AlarmDescriptor alarmDescriptor : alarmDescriptorList) { - LOGGER.infof("alarmDescriptor:"); - LOGGER.infof(alarmDescriptor.toString()); - alarmIds.add(monitoringService.setKpiAlarm(alarmDescriptor)); - } + final var policyRuleTypeDevice = new PolicyRuleTypeDevice(policyRuleDevice); + final var policyRule = new PolicyRule(policyRuleTypeDevice); - // Transform the alarmIds into promised alarms returned from the - // getAlarmResponseStream - List<Multi<AlarmResponse>> alarmResponseStreamList = new ArrayList<>(); - for (Uni<String> alarmId : alarmIds) { - alarmResponseStreamList.add( - alarmId - .onItem() - .transformToMulti( - id -> { - alarmPolicyRuleDeviceMap.put(id, policyRuleDevice); - - // TODO: Create infinite subscription - var alarmSubscription = new AlarmSubscription(id, 259200, 5000); - return monitoringService.getAlarmResponseStream(alarmSubscription); - })); - } + final var alarmDescriptorList = createAlarmDescriptorList(policyRule); + if (alarmDescriptorList.isEmpty()) { + var policyRuleState = + new PolicyRuleState( + PolicyRuleStateEnum.POLICY_FAILED, + String.format( + "Invalid PolicyRuleConditions in PolicyRule with ID: %s", + policyRuleBasic.getPolicyRuleId())); + return policyRuleState; + } - // Merge the promised alarms into one stream (Multi Object) - final var multi = Multi.createBy().merging().streams(alarmResponseStreamList); - setPolicyRuleDeviceToContext(policyRuleDevice, PROVISIONED_POLICYRULE_STATE); + contextService.setPolicyRule(policyRule).subscribe().with(x -> {}); + setPolicyRuleDeviceToContext(policyRuleDevice, VALIDATED_POLICYRULE_STATE); + noAlarms = 0; - monitorAlarmResponseForDevice(multi); + List<Uni<String>> alarmIds = getAlarmIds(alarmDescriptorList); - // TODO: Resubscribe to the stream, if it has ended + List<Multi<AlarmResponse>> alarmResponseStreamList = + getAlarmResponse(alarmIds, policyRuleDevice); - // TODO: Redesign evaluation of action - // evaluateAction(policyRule, alarmDescriptorList, multi); + // Merge the promised alarms into one stream (Multi Object) + final var multi = Multi.createBy().merging().streams(alarmResponseStreamList); + setPolicyRuleDeviceToContext(policyRuleDevice, PROVISIONED_POLICYRULE_STATE); - return VALIDATED_POLICYRULE_STATE; - }); + monitorAlarmResponseForDevice(multi); + + // TODO: Resubscribe to the stream, if it has ended + + // TODO: Redesign evaluation of action + // evaluateAction(policyRule, alarmDescriptorList, multi); + + return VALIDATED_POLICYRULE_STATE; + } + + private List<Multi<AlarmResponse>> getAlarmResponse( + List<Uni<String>> alarmIds, PolicyRuleDevice policyRuleDevice) { + // Transform the alarmIds into promised alarms returned from the + // getAlarmResponseStream + List<Multi<AlarmResponse>> alarmResponseStreamList = new ArrayList<>(); + for (Uni<String> alarmId : alarmIds) { + alarmResponseStreamList.add( + alarmId + .onItem() + .transformToMulti( + id -> setPolicyMonitoringDevice(policyRuleDevice, id))); + } + return alarmResponseStreamList; + } + + private Multi<AlarmResponse> setPolicyMonitoringDevice(PolicyRuleDevice policyRuleDevice, String id){ + alarmPolicyRuleDeviceMap.put(id, policyRuleDevice); + + // TODO: Create infinite subscription + var alarmSubscription = new AlarmSubscription(id, 259200, 5000); + return monitoringService.getAlarmResponseStream(alarmSubscription); + } + + private List<Uni<String>> getAlarmIds(List<AlarmDescriptor> alarmDescriptorList) { + List<Uni<String>> alarmIds = new ArrayList<Uni<String>>(); + for (AlarmDescriptor alarmDescriptor : alarmDescriptorList) { + LOGGER.infof("alarmDescriptor:"); + LOGGER.infof(alarmDescriptor.toString()); + alarmIds.add(monitoringService.setKpiAlarm(alarmDescriptor)); + } + return alarmIds; } @Override @@ -477,32 +494,30 @@ public class PolicyServiceImpl implements PolicyService { final var getPolicyRule = contextService.getPolicyRule(policyRuleId); - return getPolicyRule - .onItem() - .transform( - policyRule -> { - var policyRuleBasic = policyRule.getPolicyRuleType().getPolicyRuleBasic(); - String policyId = policyRuleBasic.getPolicyRuleId(); + return getPolicyRule.onItem().transform(policyRule -> removePolicyFromContext(policyRule)); + } - policyRule - .getPolicyRuleType() - .getPolicyRuleBasic() - .setPolicyRuleState(REMOVED_POLICYRULE_STATE); + private PolicyRuleState removePolicyFromContext(PolicyRule policyRule) { + var policyRuleBasic = policyRule.getPolicyRuleType().getPolicyRuleBasic(); + String policyId = policyRuleBasic.getPolicyRuleId(); - contextService - .setPolicyRule(policyRule) - .subscribe() - .with( - tmp -> - LOGGER.infof( - "DeletePolicy with id: " + VALID_MESSAGE, - policyRuleBasic.getPolicyRuleId())); + policyRule + .getPolicyRuleType() + .getPolicyRuleBasic() + .setPolicyRuleState(REMOVED_POLICYRULE_STATE); + + contextService + .setPolicyRule(policyRule) + .subscribe() + .with( + tmp -> + LOGGER.infof( + "DeletePolicy with id: " + VALID_MESSAGE, policyRuleBasic.getPolicyRuleId())); - contextService.removePolicyRule(policyId).subscribe().with(x -> {}); - subscriptionList.get(policyId).cancel(); + contextService.removePolicyRule(policyId).subscribe().with(x -> {}); + subscriptionList.get(policyId).cancel(); - return policyRuleBasic.getPolicyRuleState(); - }); + return policyRuleBasic.getPolicyRuleState(); } private Uni<List<Boolean>> returnInvalidDeviceIds(List<String> deviceIds) {