Loading src/policy/src/main/java/org/etsi/tfs/policy/PolicyServiceImpl.java +96 −81 Original line number Diff line number Diff line Loading @@ -322,15 +322,19 @@ public class PolicyServiceImpl implements PolicyService { return areDevicesValid .onItem() .transform( areDevices -> { .transform(areDevices -> addDeviceOnContext(areDevices, policyRuleDevice, policyRuleBasic)); } private PolicyRuleState addDeviceOnContext( List<Boolean> areDevices, PolicyRuleDevice policyRuleDevice, PolicyRuleBasic policyRuleBasic) { if (areDevices.contains(false)) { var policyRuleState = new PolicyRuleState( PolicyRuleStateEnum.POLICY_FAILED, String.format( INVALID_MESSAGE, policyRuleDevice.getPolicyRuleBasic().getPolicyRuleId())); INVALID_MESSAGE, policyRuleDevice.getPolicyRuleBasic().getPolicyRuleId())); return policyRuleState; } Loading @@ -353,13 +357,27 @@ public class PolicyServiceImpl implements PolicyService { setPolicyRuleDeviceToContext(policyRuleDevice, VALIDATED_POLICYRULE_STATE); noAlarms = 0; List<Uni<String>> alarmIds = new ArrayList<Uni<String>>(); for (AlarmDescriptor alarmDescriptor : alarmDescriptorList) { LOGGER.infof("alarmDescriptor:"); LOGGER.infof(alarmDescriptor.toString()); alarmIds.add(monitoringService.setKpiAlarm(alarmDescriptor)); List<Uni<String>> alarmIds = getAlarmIds(alarmDescriptorList); List<Multi<AlarmResponse>> alarmResponseStreamList = getAlarmResponse(alarmIds, policyRuleDevice); // Merge the promised alarms into one stream (Multi Object) final var multi = Multi.createBy().merging().streams(alarmResponseStreamList); setPolicyRuleDeviceToContext(policyRuleDevice, PROVISIONED_POLICYRULE_STATE); monitorAlarmResponseForDevice(multi); // TODO: Resubscribe to the stream, if it has ended // TODO: Redesign evaluation of action // evaluateAction(policyRule, alarmDescriptorList, multi); return VALIDATED_POLICYRULE_STATE; } private List<Multi<AlarmResponse>> getAlarmResponse( List<Uni<String>> alarmIds, PolicyRuleDevice policyRuleDevice) { // Transform the alarmIds into promised alarms returned from the // getAlarmResponseStream List<Multi<AlarmResponse>> alarmResponseStreamList = new ArrayList<>(); Loading @@ -368,28 +386,27 @@ public class PolicyServiceImpl implements PolicyService { alarmId .onItem() .transformToMulti( id -> { id -> setPolicyMonitoringDevice(policyRuleDevice, id))); } return alarmResponseStreamList; } private Multi<AlarmResponse> setPolicyMonitoringDevice(PolicyRuleDevice policyRuleDevice, String id){ alarmPolicyRuleDeviceMap.put(id, policyRuleDevice); // TODO: Create infinite subscription var alarmSubscription = new AlarmSubscription(id, 259200, 5000); return monitoringService.getAlarmResponseStream(alarmSubscription); })); } // Merge the promised alarms into one stream (Multi Object) final var multi = Multi.createBy().merging().streams(alarmResponseStreamList); setPolicyRuleDeviceToContext(policyRuleDevice, PROVISIONED_POLICYRULE_STATE); monitorAlarmResponseForDevice(multi); // TODO: Resubscribe to the stream, if it has ended // TODO: Redesign evaluation of action // evaluateAction(policyRule, alarmDescriptorList, multi); return VALIDATED_POLICYRULE_STATE; }); private List<Uni<String>> getAlarmIds(List<AlarmDescriptor> alarmDescriptorList) { List<Uni<String>> alarmIds = new ArrayList<Uni<String>>(); for (AlarmDescriptor alarmDescriptor : alarmDescriptorList) { LOGGER.infof("alarmDescriptor:"); LOGGER.infof(alarmDescriptor.toString()); alarmIds.add(monitoringService.setKpiAlarm(alarmDescriptor)); } return alarmIds; } @Override Loading Loading @@ -477,10 +494,10 @@ public class PolicyServiceImpl implements PolicyService { final var getPolicyRule = contextService.getPolicyRule(policyRuleId); return getPolicyRule .onItem() .transform( policyRule -> { return getPolicyRule.onItem().transform(policyRule -> removePolicyFromContext(policyRule)); } private PolicyRuleState removePolicyFromContext(PolicyRule policyRule) { var policyRuleBasic = policyRule.getPolicyRuleType().getPolicyRuleBasic(); String policyId = policyRuleBasic.getPolicyRuleId(); Loading @@ -495,14 +512,12 @@ public class PolicyServiceImpl implements PolicyService { .with( tmp -> LOGGER.infof( "DeletePolicy with id: " + VALID_MESSAGE, policyRuleBasic.getPolicyRuleId())); "DeletePolicy with id: " + VALID_MESSAGE, policyRuleBasic.getPolicyRuleId())); contextService.removePolicyRule(policyId).subscribe().with(x -> {}); subscriptionList.get(policyId).cancel(); return policyRuleBasic.getPolicyRuleState(); }); } private Uni<List<Boolean>> returnInvalidDeviceIds(List<String> deviceIds) { Loading Loading
src/policy/src/main/java/org/etsi/tfs/policy/PolicyServiceImpl.java +96 −81 Original line number Diff line number Diff line Loading @@ -322,15 +322,19 @@ public class PolicyServiceImpl implements PolicyService { return areDevicesValid .onItem() .transform( areDevices -> { .transform(areDevices -> addDeviceOnContext(areDevices, policyRuleDevice, policyRuleBasic)); } private PolicyRuleState addDeviceOnContext( List<Boolean> areDevices, PolicyRuleDevice policyRuleDevice, PolicyRuleBasic policyRuleBasic) { if (areDevices.contains(false)) { var policyRuleState = new PolicyRuleState( PolicyRuleStateEnum.POLICY_FAILED, String.format( INVALID_MESSAGE, policyRuleDevice.getPolicyRuleBasic().getPolicyRuleId())); INVALID_MESSAGE, policyRuleDevice.getPolicyRuleBasic().getPolicyRuleId())); return policyRuleState; } Loading @@ -353,13 +357,27 @@ public class PolicyServiceImpl implements PolicyService { setPolicyRuleDeviceToContext(policyRuleDevice, VALIDATED_POLICYRULE_STATE); noAlarms = 0; List<Uni<String>> alarmIds = new ArrayList<Uni<String>>(); for (AlarmDescriptor alarmDescriptor : alarmDescriptorList) { LOGGER.infof("alarmDescriptor:"); LOGGER.infof(alarmDescriptor.toString()); alarmIds.add(monitoringService.setKpiAlarm(alarmDescriptor)); List<Uni<String>> alarmIds = getAlarmIds(alarmDescriptorList); List<Multi<AlarmResponse>> alarmResponseStreamList = getAlarmResponse(alarmIds, policyRuleDevice); // Merge the promised alarms into one stream (Multi Object) final var multi = Multi.createBy().merging().streams(alarmResponseStreamList); setPolicyRuleDeviceToContext(policyRuleDevice, PROVISIONED_POLICYRULE_STATE); monitorAlarmResponseForDevice(multi); // TODO: Resubscribe to the stream, if it has ended // TODO: Redesign evaluation of action // evaluateAction(policyRule, alarmDescriptorList, multi); return VALIDATED_POLICYRULE_STATE; } private List<Multi<AlarmResponse>> getAlarmResponse( List<Uni<String>> alarmIds, PolicyRuleDevice policyRuleDevice) { // Transform the alarmIds into promised alarms returned from the // getAlarmResponseStream List<Multi<AlarmResponse>> alarmResponseStreamList = new ArrayList<>(); Loading @@ -368,28 +386,27 @@ public class PolicyServiceImpl implements PolicyService { alarmId .onItem() .transformToMulti( id -> { id -> setPolicyMonitoringDevice(policyRuleDevice, id))); } return alarmResponseStreamList; } private Multi<AlarmResponse> setPolicyMonitoringDevice(PolicyRuleDevice policyRuleDevice, String id){ alarmPolicyRuleDeviceMap.put(id, policyRuleDevice); // TODO: Create infinite subscription var alarmSubscription = new AlarmSubscription(id, 259200, 5000); return monitoringService.getAlarmResponseStream(alarmSubscription); })); } // Merge the promised alarms into one stream (Multi Object) final var multi = Multi.createBy().merging().streams(alarmResponseStreamList); setPolicyRuleDeviceToContext(policyRuleDevice, PROVISIONED_POLICYRULE_STATE); monitorAlarmResponseForDevice(multi); // TODO: Resubscribe to the stream, if it has ended // TODO: Redesign evaluation of action // evaluateAction(policyRule, alarmDescriptorList, multi); return VALIDATED_POLICYRULE_STATE; }); private List<Uni<String>> getAlarmIds(List<AlarmDescriptor> alarmDescriptorList) { List<Uni<String>> alarmIds = new ArrayList<Uni<String>>(); for (AlarmDescriptor alarmDescriptor : alarmDescriptorList) { LOGGER.infof("alarmDescriptor:"); LOGGER.infof(alarmDescriptor.toString()); alarmIds.add(monitoringService.setKpiAlarm(alarmDescriptor)); } return alarmIds; } @Override Loading Loading @@ -477,10 +494,10 @@ public class PolicyServiceImpl implements PolicyService { final var getPolicyRule = contextService.getPolicyRule(policyRuleId); return getPolicyRule .onItem() .transform( policyRule -> { return getPolicyRule.onItem().transform(policyRule -> removePolicyFromContext(policyRule)); } private PolicyRuleState removePolicyFromContext(PolicyRule policyRule) { var policyRuleBasic = policyRule.getPolicyRuleType().getPolicyRuleBasic(); String policyId = policyRuleBasic.getPolicyRuleId(); Loading @@ -495,14 +512,12 @@ public class PolicyServiceImpl implements PolicyService { .with( tmp -> LOGGER.infof( "DeletePolicy with id: " + VALID_MESSAGE, policyRuleBasic.getPolicyRuleId())); "DeletePolicy with id: " + VALID_MESSAGE, policyRuleBasic.getPolicyRuleId())); contextService.removePolicyRule(policyId).subscribe().with(x -> {}); subscriptionList.get(policyId).cancel(); return policyRuleBasic.getPolicyRuleState(); }); } private Uni<List<Boolean>> returnInvalidDeviceIds(List<String> deviceIds) { Loading