diff --git a/src/policy/src/main/java/org/etsi/tfs/policy/PolicyServiceImpl.java b/src/policy/src/main/java/org/etsi/tfs/policy/PolicyServiceImpl.java index c2f98e31efb351166a5ffaee57bbf455e34e48c8..c94aa37a92bdc5f7d0ad8f47e3b7a7c97e20f313 100644 --- a/src/policy/src/main/java/org/etsi/tfs/policy/PolicyServiceImpl.java +++ b/src/policy/src/main/java/org/etsi/tfs/policy/PolicyServiceImpl.java @@ -40,6 +40,7 @@ import org.etsi.tfs.policy.context.model.Constraint; import org.etsi.tfs.policy.context.model.ConstraintCustom; import org.etsi.tfs.policy.context.model.ConstraintTypeCustom; import org.etsi.tfs.policy.context.model.ServiceConfig; +import org.etsi.tfs.policy.context.model.ServiceId; import org.etsi.tfs.policy.device.DeviceService; import org.etsi.tfs.policy.model.BooleanOperator; import org.etsi.tfs.policy.model.PolicyRule; @@ -144,81 +145,122 @@ public class PolicyServiceImpl implements PolicyService { return isServiceValid .onItem() .transform( - isService -> { - if (!isService) { - var policyRuleState = - new PolicyRuleState( - PolicyRuleStateEnum.POLICY_FAILED, - String.format(INVALID_MESSAGE, serviceId)); - - return policyRuleState; - } + isService -> + constructPolicyStateBasedOnCriteria( + isService, serviceId, policyRuleService, policyRuleBasic)); + } - final var policyRuleTypeService = new PolicyRuleTypeService(policyRuleService); - final var policyRule = new PolicyRule(policyRuleTypeService); - final var alarmDescriptorList = createAlarmDescriptorList(policyRule); - - if (alarmDescriptorList.isEmpty()) { - var policyRuleState = - new PolicyRuleState( - PolicyRuleStateEnum.POLICY_FAILED, - String.format( - "Invalid PolicyRuleConditions in PolicyRule with ID: %s", - policyRuleBasic.getPolicyRuleId())); - return policyRuleState; - } else { - contextService - .setPolicyRule(policyRule) - .subscribe() - .with( - policyId -> { - setPolicyRuleServiceToContext( - policyRuleService, VALIDATED_POLICYRULE_STATE); - noAlarms = 0; - - // Create an alarmIds list that contains the promised ids returned from - // setKpiAlarm - List> alarmIds = new ArrayList>(); - for (AlarmDescriptor alarmDescriptor : alarmDescriptorList) { - LOGGER.infof("alarmDescriptor:"); - LOGGER.infof(alarmDescriptor.toString()); - alarmIds.add(monitoringService.setKpiAlarm(alarmDescriptor)); - } - // Transform the alarmIds into promised alarms returned from the - // getAlarmResponseStream - List> alarmResponseStreamList = new ArrayList<>(); - for (Uni alarmId : alarmIds) { - alarmResponseStreamList.add( - alarmId - .onItem() - .transformToMulti( - id -> { - alarmPolicyRuleServiceMap.put(id, policyRuleService); - - // TODO: Create infinite subscription - var alarmSubscription = - new AlarmSubscription(id, 259200, 5000); - return monitoringService.getAlarmResponseStream( - alarmSubscription); - })); - } - - // Merge the promised alarms into one stream (Multi Object) - final var multi = - Multi.createBy().merging().streams(alarmResponseStreamList); - setPolicyRuleServiceToContext( - policyRuleService, PROVISIONED_POLICYRULE_STATE); - - subscriptionList.put(policyId, monitorAlarmResponseForService(multi)); - - // TODO: Resubscribe to the stream, if it has ended - - // TODO: Redesign evaluation of action - // evaluateAction(policyRule, alarmDescriptorList, multi); - }); - return VALIDATED_POLICYRULE_STATE; - } - }); + private PolicyRuleState constructPolicyStateBasedOnCriteria( + Boolean isService, + ServiceId serviceId, + PolicyRuleService policyRuleService, + PolicyRuleBasic policyRuleBasic) { + + if (!isService) { + var policyRuleState = + new PolicyRuleState( + PolicyRuleStateEnum.POLICY_FAILED, String.format(INVALID_MESSAGE, serviceId)); + + return policyRuleState; + } + + final var policyRuleTypeService = new PolicyRuleTypeService(policyRuleService); + final var policyRule = new PolicyRule(policyRuleTypeService); + final var alarmDescriptorList = createAlarmDescriptorList(policyRule); + + if (alarmDescriptorList.isEmpty()) { + var policyRuleState = + new PolicyRuleState( + PolicyRuleStateEnum.POLICY_FAILED, + String.format( + "Invalid PolicyRuleConditions in PolicyRule with ID: %s", + policyRuleBasic.getPolicyRuleId())); + return policyRuleState; + } + + return setPolicyRuleOnContextAndReturnState(policyRule, policyRuleService, alarmDescriptorList); + } + + private PolicyRuleState setPolicyRuleOnContextAndReturnState( + PolicyRule policyRule, + PolicyRuleService policyRuleService, + List alarmDescriptorList) { + contextService + .setPolicyRule(policyRule) + .subscribe() + .with( + policyId -> + startMonitoringBasedOnAlarmDescriptors( + policyId, policyRuleService, alarmDescriptorList)); + return VALIDATED_POLICYRULE_STATE; + } + + private void startMonitoringBasedOnAlarmDescriptors( + String policyId, + PolicyRuleService policyRuleService, + List alarmDescriptorList) { + setPolicyRuleServiceToContext(policyRuleService, VALIDATED_POLICYRULE_STATE); + noAlarms = 0; + + List> alarmIds = + createAlarmList(alarmDescriptorList); // setAllarmtomonitoring get back alarmid + + List> alarmResponseStreamList = + transformAlarmIds(alarmIds, policyRuleService); + + // Merge the promised alarms into one stream (Multi Object) + final var multi = Multi.createBy().merging().streams(alarmResponseStreamList); + setPolicyRuleServiceToContext(policyRuleService, PROVISIONED_POLICYRULE_STATE); + + subscriptionList.put(policyId, monitorAlarmResponseForService(multi)); + + // TODO: Resubscribe to the stream, if it has ended + + // TODO: Redesign evaluation of action + // evaluateAction(policyRule, alarmDescriptorList, multi); + } + + /** + * Transform the alarmIds into promised alarms returned from the getAlarmResponseStream + * + * @param alarmIds the list of alarm ids + * @param policyRuleService the policy rule service + * @return + */ + private List> transformAlarmIds( + List> alarmIds, PolicyRuleService policyRuleService) { + List> alarmResponseStreamList = new ArrayList<>(); + for (Uni alarmId : alarmIds) { + Multi alarmResponseStream = + alarmId.onItem().transformToMulti(id -> setPolicyMonitor(policyRuleService, id)); + + alarmResponseStreamList.add(alarmResponseStream); + } + return alarmResponseStreamList; + } + + private Multi setPolicyMonitor(PolicyRuleService policyRuleService, String id) { + alarmPolicyRuleServiceMap.put(id, policyRuleService); + + // TODO: Create infinite subscription + var alarmSubscription = new AlarmSubscription(id, 259200, 5000); + return monitoringService.getAlarmResponseStream(alarmSubscription); + } + + /** + * Create an alarmIds list that contains the promised ids returned from setKpiAlarm + * + * @param alarmDescriptorList the list of alarm descriptors + * @return the list of alarm descriptors + */ + public List> createAlarmList(List alarmDescriptorList) { + List> alarmIds = new ArrayList>(); + for (AlarmDescriptor alarmDescriptor : alarmDescriptorList) { + LOGGER.infof("alarmDescriptor:"); + LOGGER.infof(alarmDescriptor.toString()); + alarmIds.add(monitoringService.setKpiAlarm(alarmDescriptor)); + } + return alarmIds; } @Override @@ -248,74 +290,102 @@ public class PolicyServiceImpl implements PolicyService { return areDevicesValid .onItem() - .transform( - areDevices -> { - if (areDevices.contains(false)) { - var policyRuleState = - new PolicyRuleState( - PolicyRuleStateEnum.POLICY_FAILED, - String.format( - INVALID_MESSAGE, - policyRuleDevice.getPolicyRuleBasic().getPolicyRuleId())); - - return policyRuleState; - } + .transform(areDevices -> areDeviceOnContext(areDevices, policyRuleDevice, policyRuleBasic)); + } - final var policyRuleTypeDevice = new PolicyRuleTypeDevice(policyRuleDevice); - final var policyRule = new PolicyRule(policyRuleTypeDevice); - - final var alarmDescriptorList = createAlarmDescriptorList(policyRule); - if (alarmDescriptorList.isEmpty()) { - var policyRuleState = - new PolicyRuleState( - PolicyRuleStateEnum.POLICY_FAILED, - String.format( - "Invalid PolicyRuleConditions in PolicyRule with ID: %s", - policyRuleBasic.getPolicyRuleId())); - return policyRuleState; - } + private PolicyRuleState areDeviceOnContext( + List areDevices, + PolicyRuleDevice policyRuleDevice, + PolicyRuleBasic policyRuleBasic) { + if (areDevices.contains(false)) { + var policyRuleState = + new PolicyRuleState( + PolicyRuleStateEnum.POLICY_FAILED, + String.format( + INVALID_MESSAGE, policyRuleDevice.getPolicyRuleBasic().getPolicyRuleId())); - contextService.setPolicyRule(policyRule).subscribe().with(x -> {}); - setPolicyRuleDeviceToContext(policyRuleDevice, VALIDATED_POLICYRULE_STATE); - noAlarms = 0; + return policyRuleState; + } - List> alarmIds = new ArrayList>(); - for (AlarmDescriptor alarmDescriptor : alarmDescriptorList) { - LOGGER.infof("alarmDescriptor:"); - LOGGER.infof(alarmDescriptor.toString()); - alarmIds.add(monitoringService.setKpiAlarm(alarmDescriptor)); - } + final var policyRuleTypeDevice = new PolicyRuleTypeDevice(policyRuleDevice); + final var policyRule = new PolicyRule(policyRuleTypeDevice); - // Transform the alarmIds into promised alarms returned from the - // getAlarmResponseStream - List> alarmResponseStreamList = new ArrayList<>(); - for (Uni alarmId : alarmIds) { - alarmResponseStreamList.add( - alarmId - .onItem() - .transformToMulti( - id -> { - alarmPolicyRuleDeviceMap.put(id, policyRuleDevice); - - // TODO: Create infinite subscription - var alarmSubscription = new AlarmSubscription(id, 259200, 5000); - return monitoringService.getAlarmResponseStream(alarmSubscription); - })); - } + final var alarmDescriptorList = createAlarmDescriptorList(policyRule); + if (alarmDescriptorList.isEmpty()) { + var policyRuleState = + new PolicyRuleState( + PolicyRuleStateEnum.POLICY_FAILED, + String.format( + "Invalid PolicyRuleConditions in PolicyRule with ID: %s", + policyRuleBasic.getPolicyRuleId())); + return policyRuleState; + } - // Merge the promised alarms into one stream (Multi Object) - final var multi = Multi.createBy().merging().streams(alarmResponseStreamList); - setPolicyRuleDeviceToContext(policyRuleDevice, PROVISIONED_POLICYRULE_STATE); + contextService + .setPolicyRule(policyRule) + .subscribe() + .with( + policyId -> { + startMonitoringBasedOnAlarmDescriptors( + policyId, policyRuleDevice, alarmDescriptorList); + }); - monitorAlarmResponseForDevice(multi); + return VALIDATED_POLICYRULE_STATE; + } - // TODO: Resubscribe to the stream, if it has ended + private void startMonitoringBasedOnAlarmDescriptors( + String policyId, + PolicyRuleDevice policyRuleDevice, + List alarmDescriptorList) { + setPolicyRuleDeviceToContext(policyRuleDevice, VALIDATED_POLICYRULE_STATE); + noAlarms = 0; - // TODO: Redesign evaluation of action - // evaluateAction(policyRule, alarmDescriptorList, multi); + List> alarmIds = getAlarmIds(alarmDescriptorList); - return VALIDATED_POLICYRULE_STATE; - }); + List> alarmResponseStreamList = + getAlarmResponse(alarmIds, policyRuleDevice); + + // Merge the promised alarms into one stream (Multi Object) + final var multi = Multi.createBy().merging().streams(alarmResponseStreamList); + setPolicyRuleDeviceToContext(policyRuleDevice, PROVISIONED_POLICYRULE_STATE); + + subscriptionList.put(policyId, monitorAlarmResponseForDevice(multi)); + + // TODO: Resubscribe to the stream, if it has ended + + // TODO: Redesign evaluation of action + // evaluateAction(policyRule, alarmDescriptorList, multi); + } + + private List> getAlarmResponse( + List> alarmIds, PolicyRuleDevice policyRuleDevice) { + // Transform the alarmIds into promised alarms returned from the + // getAlarmResponseStream + List> alarmResponseStreamList = new ArrayList<>(); + for (Uni alarmId : alarmIds) { + alarmResponseStreamList.add( + alarmId.onItem().transformToMulti(id -> setPolicyMonitoringDevice(policyRuleDevice, id))); + } + return alarmResponseStreamList; + } + + private Multi setPolicyMonitoringDevice( + PolicyRuleDevice policyRuleDevice, String id) { + alarmPolicyRuleDeviceMap.put(id, policyRuleDevice); + + // TODO: Create infinite subscription + var alarmSubscription = new AlarmSubscription(id, 259200, 5000); + return monitoringService.getAlarmResponseStream(alarmSubscription); + } + + private List> getAlarmIds(List alarmDescriptorList) { + List> alarmIds = new ArrayList>(); + for (AlarmDescriptor alarmDescriptor : alarmDescriptorList) { + LOGGER.infof("alarmDescriptor:"); + LOGGER.infof(alarmDescriptor.toString()); + alarmIds.add(monitoringService.setKpiAlarm(alarmDescriptor)); + } + return alarmIds; } @Override @@ -403,34 +473,32 @@ public class PolicyServiceImpl implements PolicyService { final var getPolicyRule = contextService.getPolicyRule(policyRuleId); - return getPolicyRule - .onItem() - .transform( - policyRule -> { - var policyRuleBasic = policyRule.getPolicyRuleType().getPolicyRuleBasic(); - String policyId = policyRuleBasic.getPolicyRuleId(); + return getPolicyRule.onItem().transform(policyRule -> removePolicyFromContext(policyRule)); + } - policyRule - .getPolicyRuleType() - .getPolicyRuleBasic() - .setPolicyRuleState(REMOVED_POLICYRULE_STATE); + private PolicyRuleState removePolicyFromContext(PolicyRule policyRule) { + var policyRuleBasic = policyRule.getPolicyRuleType().getPolicyRuleBasic(); + String policyId = policyRuleBasic.getPolicyRuleId(); - contextService - .setPolicyRule(policyRule) - .subscribe() - .with( - tmp -> - LOGGER.infof( - "DeletePolicy with id: " + VALID_MESSAGE, - policyRuleBasic.getPolicyRuleId())); + policyRule + .getPolicyRuleType() + .getPolicyRuleBasic() + .setPolicyRuleState(REMOVED_POLICYRULE_STATE); - contextService.removePolicyRule(policyId).subscribe().with(x -> {}); + contextService + .setPolicyRule(policyRule) + .subscribe() + .with( + tmp -> + LOGGER.infof( + "DeletePolicy with id: " + VALID_MESSAGE, policyRuleBasic.getPolicyRuleId())); - // TODO: When the Map doesn't contains the policyId we should throw an exception? - if (subscriptionList.contains(policyId)) subscriptionList.get(policyId).cancel(); + contextService.removePolicyRule(policyId).subscribe().with(x -> {}); - return policyRuleBasic.getPolicyRuleState(); - }); + // TODO: When the Map doesn't contains the policyId we should throw an exception? + if (subscriptionList.contains(policyId)) subscriptionList.get(policyId).cancel(); + + return policyRuleBasic.getPolicyRuleState(); } private Uni> returnInvalidDeviceIds(List deviceIds) { @@ -484,8 +552,8 @@ public class PolicyServiceImpl implements PolicyService { }); } - private void monitorAlarmResponseForDevice(Multi multi) { - multi + private Cancellable monitorAlarmResponseForDevice(Multi multi) { + return multi .subscribe() .with( alarmResponse -> { diff --git a/src/policy/src/test/java/org/etsi/tfs/policy/PolicyAddDeviceTest.java b/src/policy/src/test/java/org/etsi/tfs/policy/PolicyAddDeviceTest.java index 3c4a1577b550d56e853f6374594051c4e08838aa..7c7c6b1b5e096ac9422ec5209b213e4f4435410b 100644 --- a/src/policy/src/test/java/org/etsi/tfs/policy/PolicyAddDeviceTest.java +++ b/src/policy/src/test/java/org/etsi/tfs/policy/PolicyAddDeviceTest.java @@ -42,6 +42,7 @@ import org.etsi.tfs.policy.model.PolicyRuleCondition; import org.etsi.tfs.policy.model.PolicyRuleDevice; import org.etsi.tfs.policy.model.PolicyRuleState; import org.etsi.tfs.policy.model.PolicyRuleStateEnum; +import org.etsi.tfs.policy.monitoring.MonitoringService; import org.etsi.tfs.policy.monitoring.model.IntegerKpiValue; import org.etsi.tfs.policy.monitoring.model.KpiValue; import org.junit.jupiter.api.BeforeAll; @@ -57,6 +58,7 @@ class PolicyAddDeviceTest { @InjectMock ContextService contextService; + @InjectMock MonitoringService monitoringService; static PolicyRuleBasic policyRuleBasic; static PolicyRuleDevice policyRuleDevice; diff --git a/src/policy/src/test/java/org/etsi/tfs/policy/PolicyDeleteServiceTest.java b/src/policy/src/test/java/org/etsi/tfs/policy/PolicyDeleteServiceTest.java index a62c5dd3d6316b8a3a86a8f7745af08170404bd4..56e686bf6822a577439b020ea71d8c7f40b51c94 100644 --- a/src/policy/src/test/java/org/etsi/tfs/policy/PolicyDeleteServiceTest.java +++ b/src/policy/src/test/java/org/etsi/tfs/policy/PolicyDeleteServiceTest.java @@ -46,6 +46,7 @@ import org.etsi.tfs.policy.model.PolicyRuleState; import org.etsi.tfs.policy.model.PolicyRuleStateEnum; import org.etsi.tfs.policy.model.PolicyRuleType; import org.etsi.tfs.policy.model.PolicyRuleTypeService; +import org.etsi.tfs.policy.monitoring.MonitoringService; import org.etsi.tfs.policy.monitoring.model.IntegerKpiValue; import org.etsi.tfs.policy.monitoring.model.KpiValue; import org.junit.jupiter.api.BeforeAll; @@ -58,6 +59,8 @@ class PolicyDeleteServiceTest { @Inject PolicyServiceImpl policyService; @InjectMock ContextService contextService; + @InjectMock MonitoringService monitoringService; + static PolicyRuleBasic policyRuleBasic; static PolicyRuleService policyRuleService; diff --git a/src/policy/src/test/java/org/etsi/tfs/policy/PolicyUpdateDeviceTest.java b/src/policy/src/test/java/org/etsi/tfs/policy/PolicyUpdateDeviceTest.java index 0cc2d5a70d3a5070eda718f906145de2b468cc6f..ac8757508f2e0ccb5575fe210fe21819ab7e93aa 100644 --- a/src/policy/src/test/java/org/etsi/tfs/policy/PolicyUpdateDeviceTest.java +++ b/src/policy/src/test/java/org/etsi/tfs/policy/PolicyUpdateDeviceTest.java @@ -41,6 +41,7 @@ import org.etsi.tfs.policy.model.PolicyRuleCondition; import org.etsi.tfs.policy.model.PolicyRuleDevice; import org.etsi.tfs.policy.model.PolicyRuleState; import org.etsi.tfs.policy.model.PolicyRuleStateEnum; +import org.etsi.tfs.policy.monitoring.MonitoringService; import org.etsi.tfs.policy.monitoring.model.IntegerKpiValue; import org.etsi.tfs.policy.monitoring.model.KpiValue; import org.junit.jupiter.api.BeforeAll; @@ -54,6 +55,8 @@ class PolicyUpdateDeviceTest { @InjectMock PolicyRuleConditionValidator policyRuleConditionValidator; + @InjectMock MonitoringService monitoringService; + static PolicyRuleBasic policyRuleBasic; static PolicyRuleDevice policyRuleDevice;