diff --git a/src/policy/src/main/java/org/etsi/tfs/policy/PolicyServiceImpl.java b/src/policy/src/main/java/org/etsi/tfs/policy/PolicyServiceImpl.java index 716949e65752a2934fbb1c93254d6e6285eb3814..e276694a7cbfaf3a9e22352c13b73054454994dd 100644 --- a/src/policy/src/main/java/org/etsi/tfs/policy/PolicyServiceImpl.java +++ b/src/policy/src/main/java/org/etsi/tfs/policy/PolicyServiceImpl.java @@ -322,10 +322,10 @@ public class PolicyServiceImpl implements PolicyService { return areDevicesValid .onItem() - .transform(areDevices -> addDeviceOnContext(areDevices, policyRuleDevice, policyRuleBasic)); + .transform(areDevices -> areDeviceOnContext(areDevices, policyRuleDevice, policyRuleBasic)); } - private PolicyRuleState addDeviceOnContext( + private PolicyRuleState areDeviceOnContext( List<Boolean> areDevices, PolicyRuleDevice policyRuleDevice, PolicyRuleBasic policyRuleBasic) { @@ -353,7 +353,22 @@ public class PolicyServiceImpl implements PolicyService { return policyRuleState; } - contextService.setPolicyRule(policyRule).subscribe().with(x -> {}); + contextService + .setPolicyRule(policyRule) + .subscribe() + .with( + policyId -> { + startMonitoringBasedOnAlarmDescriptors( + policyId, policyRuleDevice, alarmDescriptorList); + }); + + return VALIDATED_POLICYRULE_STATE; + } + + private void startMonitoringBasedOnAlarmDescriptors( + String policyId, + PolicyRuleDevice policyRuleDevice, + List<AlarmDescriptor> alarmDescriptorList) { setPolicyRuleDeviceToContext(policyRuleDevice, VALIDATED_POLICYRULE_STATE); noAlarms = 0; @@ -366,14 +381,12 @@ public class PolicyServiceImpl implements PolicyService { final var multi = Multi.createBy().merging().streams(alarmResponseStreamList); setPolicyRuleDeviceToContext(policyRuleDevice, PROVISIONED_POLICYRULE_STATE); - monitorAlarmResponseForDevice(multi); + subscriptionList.put(policyId, monitorAlarmResponseForDevice(multi)); // TODO: Resubscribe to the stream, if it has ended // TODO: Redesign evaluation of action // evaluateAction(policyRule, alarmDescriptorList, multi); - - return VALIDATED_POLICYRULE_STATE; } private List<Multi<AlarmResponse>> getAlarmResponse( @@ -569,8 +582,8 @@ public class PolicyServiceImpl implements PolicyService { }); } - private void monitorAlarmResponseForDevice(Multi<AlarmResponse> multi) { - multi + private Cancellable monitorAlarmResponseForDevice(Multi<AlarmResponse> multi) { + return multi .subscribe() .with( alarmResponse -> {