Loading src/policy/src/main/java/eu/teraflow/policy/PolicyServiceImpl.java +92 −5 Original line number Diff line number Diff line Loading @@ -284,11 +284,98 @@ public class PolicyServiceImpl implements PolicyService { final var policyRuleTypeDevice = new PolicyRuleTypeDevice(policyRuleDevice); final var policyRule = new PolicyRule(policyRuleTypeDevice); contextService .setPolicyRule(policyRule) .subscribe() .with(id -> validateDevice(policyRuleDevice)); return Uni.createFrom().item(policyRuleBasic.getPolicyRuleState()); final var alarmDescriptorList = createAlarmDescriptorList(policyRule); if (alarmDescriptorList.isEmpty()) { var policyRuleState = new PolicyRuleState( PolicyRuleStateEnum.POLICY_FAILED, String.format( "Invalid PolicyRuleConditions in PolicyRule with ID: %s", policyRuleBasic.getPolicyRuleId())); return policyRuleState; } contextService.setPolicyRule(policyRule).subscribe().with(x -> {}); setPolicyRuleDeviceToContext(policyRuleDevice, VALIDATED_POLICYRULE_STATE); noAlarms = 0; List<Uni<String>> alarmIds = new ArrayList<Uni<String>>(); for (AlarmDescriptor alarmDescriptor : alarmDescriptorList) { LOGGER.infof("alarmDescriptor:"); LOGGER.infof(alarmDescriptor.toString()); alarmIds.add(monitoringService.setKpiAlarm(alarmDescriptor)); } // Transform the alarmIds into promised alarms returned from the // getAlarmResponseStream List<Multi<AlarmResponse>> alarmResponseStreamList = new ArrayList<>(); for (Uni<String> alarmId : alarmIds) { alarmResponseStreamList.add( alarmId .onItem() .transformToMulti( id -> { alarmPolicyRuleDeviceMap.put(id, policyRuleDevice); // TODO: Create infinite subscription var alarmSubscription = new AlarmSubscription(id, 259200, 5000); return monitoringService.getAlarmResponseStream(alarmSubscription); })); } // Merge the promised alarms into one stream (Multi Object) final var multi = Multi.createBy().merging().streams(alarmResponseStreamList); setPolicyRuleDeviceToContext(policyRuleDevice, PROVISIONED_POLICYRULE_STATE); monitorAlarmResponseForDevice(multi); // TODO: Resubscribe to the stream, if it has ended // TODO: Redesign evaluation of action // evaluateAction(policyRule, alarmDescriptorList, multi); return VALIDATED_POLICYRULE_STATE; }); } @Override public Uni<PolicyRuleState> updatePolicyService(PolicyRuleService policyRuleService) { LOGGER.infof("Received %s", policyRuleService); if (!policyRuleService.areArgumentsValid()) { LOGGER.error(policyRuleService.getExeceptionMessage()); final var policyRuleState = new PolicyRuleState( PolicyRuleStateEnum.POLICY_FAILED, policyRuleService.getExeceptionMessage()); return Uni.createFrom().item(policyRuleState); } final var policyRuleBasic = policyRuleService.getPolicyRuleBasic(); if (!policyRuleBasic.areArgumentsValid()) { LOGGER.error(policyRuleService.getExeceptionMessage()); final var policyRuleState = new PolicyRuleState( PolicyRuleStateEnum.POLICY_FAILED, policyRuleBasic.getExeceptionMessage()); return Uni.createFrom().item(policyRuleState); } final var serviceId = policyRuleService.getServiceId(); final var policyRuleId = policyRuleBasic.getPolicyRuleId(); final var isPolicyRuleServiceValid = policyRuleConditionValidator.isPolicyRuleServiceValid(policyRuleId, serviceId); return isPolicyRuleServiceValid .onItem() .transform( isPolicyRuleService -> { if (!isPolicyRuleService) { return new PolicyRuleState( PolicyRuleStateEnum.POLICY_FAILED, String.format(INVALID_MESSAGE, serviceId)); } return VALIDATED_POLICYRULE_STATE; }); } @Override Loading Loading
src/policy/src/main/java/eu/teraflow/policy/PolicyServiceImpl.java +92 −5 Original line number Diff line number Diff line Loading @@ -284,11 +284,98 @@ public class PolicyServiceImpl implements PolicyService { final var policyRuleTypeDevice = new PolicyRuleTypeDevice(policyRuleDevice); final var policyRule = new PolicyRule(policyRuleTypeDevice); contextService .setPolicyRule(policyRule) .subscribe() .with(id -> validateDevice(policyRuleDevice)); return Uni.createFrom().item(policyRuleBasic.getPolicyRuleState()); final var alarmDescriptorList = createAlarmDescriptorList(policyRule); if (alarmDescriptorList.isEmpty()) { var policyRuleState = new PolicyRuleState( PolicyRuleStateEnum.POLICY_FAILED, String.format( "Invalid PolicyRuleConditions in PolicyRule with ID: %s", policyRuleBasic.getPolicyRuleId())); return policyRuleState; } contextService.setPolicyRule(policyRule).subscribe().with(x -> {}); setPolicyRuleDeviceToContext(policyRuleDevice, VALIDATED_POLICYRULE_STATE); noAlarms = 0; List<Uni<String>> alarmIds = new ArrayList<Uni<String>>(); for (AlarmDescriptor alarmDescriptor : alarmDescriptorList) { LOGGER.infof("alarmDescriptor:"); LOGGER.infof(alarmDescriptor.toString()); alarmIds.add(monitoringService.setKpiAlarm(alarmDescriptor)); } // Transform the alarmIds into promised alarms returned from the // getAlarmResponseStream List<Multi<AlarmResponse>> alarmResponseStreamList = new ArrayList<>(); for (Uni<String> alarmId : alarmIds) { alarmResponseStreamList.add( alarmId .onItem() .transformToMulti( id -> { alarmPolicyRuleDeviceMap.put(id, policyRuleDevice); // TODO: Create infinite subscription var alarmSubscription = new AlarmSubscription(id, 259200, 5000); return monitoringService.getAlarmResponseStream(alarmSubscription); })); } // Merge the promised alarms into one stream (Multi Object) final var multi = Multi.createBy().merging().streams(alarmResponseStreamList); setPolicyRuleDeviceToContext(policyRuleDevice, PROVISIONED_POLICYRULE_STATE); monitorAlarmResponseForDevice(multi); // TODO: Resubscribe to the stream, if it has ended // TODO: Redesign evaluation of action // evaluateAction(policyRule, alarmDescriptorList, multi); return VALIDATED_POLICYRULE_STATE; }); } @Override public Uni<PolicyRuleState> updatePolicyService(PolicyRuleService policyRuleService) { LOGGER.infof("Received %s", policyRuleService); if (!policyRuleService.areArgumentsValid()) { LOGGER.error(policyRuleService.getExeceptionMessage()); final var policyRuleState = new PolicyRuleState( PolicyRuleStateEnum.POLICY_FAILED, policyRuleService.getExeceptionMessage()); return Uni.createFrom().item(policyRuleState); } final var policyRuleBasic = policyRuleService.getPolicyRuleBasic(); if (!policyRuleBasic.areArgumentsValid()) { LOGGER.error(policyRuleService.getExeceptionMessage()); final var policyRuleState = new PolicyRuleState( PolicyRuleStateEnum.POLICY_FAILED, policyRuleBasic.getExeceptionMessage()); return Uni.createFrom().item(policyRuleState); } final var serviceId = policyRuleService.getServiceId(); final var policyRuleId = policyRuleBasic.getPolicyRuleId(); final var isPolicyRuleServiceValid = policyRuleConditionValidator.isPolicyRuleServiceValid(policyRuleId, serviceId); return isPolicyRuleServiceValid .onItem() .transform( isPolicyRuleService -> { if (!isPolicyRuleService) { return new PolicyRuleState( PolicyRuleStateEnum.POLICY_FAILED, String.format(INVALID_MESSAGE, serviceId)); } return VALIDATED_POLICYRULE_STATE; }); } @Override Loading