Skip to content
Snippets Groups Projects

Resolve "(UBI) Refactor Policy"

Merged Konstantinos Poulakakis requested to merge feat/112-ubi-refactor-policy into develop
Files
2
@@ -38,6 +38,7 @@ import org.etsi.tfs.policy.context.model.Constraint;
import org.etsi.tfs.policy.context.model.ConstraintCustom;
import org.etsi.tfs.policy.context.model.ConstraintTypeCustom;
import org.etsi.tfs.policy.context.model.ServiceConfig;
import org.etsi.tfs.policy.context.model.ServiceId;
import org.etsi.tfs.policy.device.DeviceService;
import org.etsi.tfs.policy.model.BooleanOperator;
import org.etsi.tfs.policy.model.PolicyRule;
@@ -176,81 +177,122 @@ public class PolicyServiceImpl implements PolicyService {
return isServiceValid
.onItem()
.transform(
isService -> {
if (!isService) {
var policyRuleState =
new PolicyRuleState(
PolicyRuleStateEnum.POLICY_FAILED,
String.format(INVALID_MESSAGE, serviceId));
return policyRuleState;
}
isService ->
constructPolicyStateBasedOnCriteria(
isService, serviceId, policyRuleService, policyRuleBasic));
}
final var policyRuleTypeService = new PolicyRuleTypeService(policyRuleService);
final var policyRule = new PolicyRule(policyRuleTypeService);
final var alarmDescriptorList = createAlarmDescriptorList(policyRule);
if (alarmDescriptorList.isEmpty()) {
var policyRuleState =
new PolicyRuleState(
PolicyRuleStateEnum.POLICY_FAILED,
String.format(
"Invalid PolicyRuleConditions in PolicyRule with ID: %s",
policyRuleBasic.getPolicyRuleId()));
return policyRuleState;
} else {
contextService
.setPolicyRule(policyRule)
.subscribe()
.with(
policyId -> {
setPolicyRuleServiceToContext(
policyRuleService, VALIDATED_POLICYRULE_STATE);
noAlarms = 0;
// Create an alarmIds list that contains the promised ids returned from
// setKpiAlarm
List<Uni<String>> alarmIds = new ArrayList<Uni<String>>();
for (AlarmDescriptor alarmDescriptor : alarmDescriptorList) {
LOGGER.infof("alarmDescriptor:");
LOGGER.infof(alarmDescriptor.toString());
alarmIds.add(monitoringService.setKpiAlarm(alarmDescriptor));
}
// Transform the alarmIds into promised alarms returned from the
// getAlarmResponseStream
List<Multi<AlarmResponse>> alarmResponseStreamList = new ArrayList<>();
for (Uni<String> alarmId : alarmIds) {
alarmResponseStreamList.add(
alarmId
.onItem()
.transformToMulti(
id -> {
alarmPolicyRuleServiceMap.put(id, policyRuleService);
// TODO: Create infinite subscription
var alarmSubscription =
new AlarmSubscription(id, 259200, 5000);
return monitoringService.getAlarmResponseStream(
alarmSubscription);
}));
}
// Merge the promised alarms into one stream (Multi Object)
final var multi =
Multi.createBy().merging().streams(alarmResponseStreamList);
setPolicyRuleServiceToContext(
policyRuleService, PROVISIONED_POLICYRULE_STATE);
subscriptionList.put(policyId, monitorAlarmResponseForService(multi));
// TODO: Resubscribe to the stream, if it has ended
// TODO: Redesign evaluation of action
// evaluateAction(policyRule, alarmDescriptorList, multi);
});
return VALIDATED_POLICYRULE_STATE;
}
});
private PolicyRuleState constructPolicyStateBasedOnCriteria(
Boolean isService,
ServiceId serviceId,
PolicyRuleService policyRuleService,
PolicyRuleBasic policyRuleBasic) {
if (!isService) {
var policyRuleState =
new PolicyRuleState(
PolicyRuleStateEnum.POLICY_FAILED, String.format(INVALID_MESSAGE, serviceId));
return policyRuleState;
}
final var policyRuleTypeService = new PolicyRuleTypeService(policyRuleService);
final var policyRule = new PolicyRule(policyRuleTypeService);
final var alarmDescriptorList = createAlarmDescriptorList(policyRule);
if (alarmDescriptorList.isEmpty()) {
var policyRuleState =
new PolicyRuleState(
PolicyRuleStateEnum.POLICY_FAILED,
String.format(
"Invalid PolicyRuleConditions in PolicyRule with ID: %s",
policyRuleBasic.getPolicyRuleId()));
return policyRuleState;
}
return setPolicyRuleOnContextAndReturnState(policyRule, policyRuleService, alarmDescriptorList);
}
private PolicyRuleState setPolicyRuleOnContextAndReturnState(
PolicyRule policyRule,
PolicyRuleService policyRuleService,
List<AlarmDescriptor> alarmDescriptorList) {
contextService
.setPolicyRule(policyRule)
.subscribe()
.with(
policyId ->
startMonitoringBasedOnAlarmDescriptors(
policyId, policyRuleService, alarmDescriptorList));
return VALIDATED_POLICYRULE_STATE;
}
private void startMonitoringBasedOnAlarmDescriptors(
String policyId,
PolicyRuleService policyRuleService,
List<AlarmDescriptor> alarmDescriptorList) {
setPolicyRuleServiceToContext(policyRuleService, VALIDATED_POLICYRULE_STATE);
noAlarms = 0;
List<Uni<String>> alarmIds =
createAlarmList(alarmDescriptorList); // setAllarmtomonitoring get back alarmid
List<Multi<AlarmResponse>> alarmResponseStreamList =
transformAlarmIds(alarmIds, policyRuleService);
// Merge the promised alarms into one stream (Multi Object)
final var multi = Multi.createBy().merging().streams(alarmResponseStreamList);
setPolicyRuleServiceToContext(policyRuleService, PROVISIONED_POLICYRULE_STATE);
subscriptionList.put(policyId, monitorAlarmResponseForService(multi));
// TODO: Resubscribe to the stream, if it has ended
// TODO: Redesign evaluation of action
// evaluateAction(policyRule, alarmDescriptorList, multi);
}
/**
* Transform the alarmIds into promised alarms returned from the getAlarmResponseStream
*
* @param alarmIds the list of alarm ids
* @param policyRuleService the policy rule service
* @return
*/
private List<Multi<AlarmResponse>> transformAlarmIds(
List<Uni<String>> alarmIds, PolicyRuleService policyRuleService) {
List<Multi<AlarmResponse>> alarmResponseStreamList = new ArrayList<>();
for (Uni<String> alarmId : alarmIds) {
Multi<AlarmResponse> alarmResponseStream =
alarmId.onItem().transformToMulti(id -> setPolicyMonitor(policyRuleService, id));
alarmResponseStreamList.add(alarmResponseStream);
}
return alarmResponseStreamList;
}
private Multi<AlarmResponse> setPolicyMonitor(PolicyRuleService policyRuleService, String id) {
alarmPolicyRuleServiceMap.put(id, policyRuleService);
// TODO: Create infinite subscription
var alarmSubscription = new AlarmSubscription(id, 259200, 5000);
return monitoringService.getAlarmResponseStream(alarmSubscription);
}
/**
* Create an alarmIds list that contains the promised ids returned from setKpiAlarm
*
* @param alarmDescriptorList the list of alarm descriptors
* @return the list of alarm descriptors
*/
public List<Uni<String>> createAlarmList(List<AlarmDescriptor> alarmDescriptorList) {
List<Uni<String>> alarmIds = new ArrayList<Uni<String>>();
for (AlarmDescriptor alarmDescriptor : alarmDescriptorList) {
LOGGER.infof("alarmDescriptor:");
LOGGER.infof(alarmDescriptor.toString());
alarmIds.add(monitoringService.setKpiAlarm(alarmDescriptor));
}
return alarmIds;
}
@Override
@@ -280,74 +322,102 @@ public class PolicyServiceImpl implements PolicyService {
return areDevicesValid
.onItem()
.transform(
areDevices -> {
if (areDevices.contains(false)) {
var policyRuleState =
new PolicyRuleState(
PolicyRuleStateEnum.POLICY_FAILED,
String.format(
INVALID_MESSAGE,
policyRuleDevice.getPolicyRuleBasic().getPolicyRuleId()));
return policyRuleState;
}
.transform(areDevices -> areDeviceOnContext(areDevices, policyRuleDevice, policyRuleBasic));
}
final var policyRuleTypeDevice = new PolicyRuleTypeDevice(policyRuleDevice);
final var policyRule = new PolicyRule(policyRuleTypeDevice);
final var alarmDescriptorList = createAlarmDescriptorList(policyRule);
if (alarmDescriptorList.isEmpty()) {
var policyRuleState =
new PolicyRuleState(
PolicyRuleStateEnum.POLICY_FAILED,
String.format(
"Invalid PolicyRuleConditions in PolicyRule with ID: %s",
policyRuleBasic.getPolicyRuleId()));
return policyRuleState;
}
private PolicyRuleState areDeviceOnContext(
List<Boolean> areDevices,
PolicyRuleDevice policyRuleDevice,
PolicyRuleBasic policyRuleBasic) {
if (areDevices.contains(false)) {
var policyRuleState =
new PolicyRuleState(
PolicyRuleStateEnum.POLICY_FAILED,
String.format(
INVALID_MESSAGE, policyRuleDevice.getPolicyRuleBasic().getPolicyRuleId()));
contextService.setPolicyRule(policyRule).subscribe().with(x -> {});
setPolicyRuleDeviceToContext(policyRuleDevice, VALIDATED_POLICYRULE_STATE);
noAlarms = 0;
return policyRuleState;
}
List<Uni<String>> alarmIds = new ArrayList<Uni<String>>();
for (AlarmDescriptor alarmDescriptor : alarmDescriptorList) {
LOGGER.infof("alarmDescriptor:");
LOGGER.infof(alarmDescriptor.toString());
alarmIds.add(monitoringService.setKpiAlarm(alarmDescriptor));
}
final var policyRuleTypeDevice = new PolicyRuleTypeDevice(policyRuleDevice);
final var policyRule = new PolicyRule(policyRuleTypeDevice);
// Transform the alarmIds into promised alarms returned from the
// getAlarmResponseStream
List<Multi<AlarmResponse>> alarmResponseStreamList = new ArrayList<>();
for (Uni<String> alarmId : alarmIds) {
alarmResponseStreamList.add(
alarmId
.onItem()
.transformToMulti(
id -> {
alarmPolicyRuleDeviceMap.put(id, policyRuleDevice);
// TODO: Create infinite subscription
var alarmSubscription = new AlarmSubscription(id, 259200, 5000);
return monitoringService.getAlarmResponseStream(alarmSubscription);
}));
}
final var alarmDescriptorList = createAlarmDescriptorList(policyRule);
if (alarmDescriptorList.isEmpty()) {
var policyRuleState =
new PolicyRuleState(
PolicyRuleStateEnum.POLICY_FAILED,
String.format(
"Invalid PolicyRuleConditions in PolicyRule with ID: %s",
policyRuleBasic.getPolicyRuleId()));
return policyRuleState;
}
// Merge the promised alarms into one stream (Multi Object)
final var multi = Multi.createBy().merging().streams(alarmResponseStreamList);
setPolicyRuleDeviceToContext(policyRuleDevice, PROVISIONED_POLICYRULE_STATE);
contextService
.setPolicyRule(policyRule)
.subscribe()
.with(
policyId -> {
startMonitoringBasedOnAlarmDescriptors(
policyId, policyRuleDevice, alarmDescriptorList);
});
monitorAlarmResponseForDevice(multi);
return VALIDATED_POLICYRULE_STATE;
}
// TODO: Resubscribe to the stream, if it has ended
private void startMonitoringBasedOnAlarmDescriptors(
String policyId,
PolicyRuleDevice policyRuleDevice,
List<AlarmDescriptor> alarmDescriptorList) {
setPolicyRuleDeviceToContext(policyRuleDevice, VALIDATED_POLICYRULE_STATE);
noAlarms = 0;
// TODO: Redesign evaluation of action
// evaluateAction(policyRule, alarmDescriptorList, multi);
List<Uni<String>> alarmIds = getAlarmIds(alarmDescriptorList);
return VALIDATED_POLICYRULE_STATE;
});
List<Multi<AlarmResponse>> alarmResponseStreamList =
getAlarmResponse(alarmIds, policyRuleDevice);
// Merge the promised alarms into one stream (Multi Object)
final var multi = Multi.createBy().merging().streams(alarmResponseStreamList);
setPolicyRuleDeviceToContext(policyRuleDevice, PROVISIONED_POLICYRULE_STATE);
subscriptionList.put(policyId, monitorAlarmResponseForDevice(multi));
// TODO: Resubscribe to the stream, if it has ended
// TODO: Redesign evaluation of action
// evaluateAction(policyRule, alarmDescriptorList, multi);
}
private List<Multi<AlarmResponse>> getAlarmResponse(
List<Uni<String>> alarmIds, PolicyRuleDevice policyRuleDevice) {
// Transform the alarmIds into promised alarms returned from the
// getAlarmResponseStream
List<Multi<AlarmResponse>> alarmResponseStreamList = new ArrayList<>();
for (Uni<String> alarmId : alarmIds) {
alarmResponseStreamList.add(
alarmId.onItem().transformToMulti(id -> setPolicyMonitoringDevice(policyRuleDevice, id)));
}
return alarmResponseStreamList;
}
private Multi<AlarmResponse> setPolicyMonitoringDevice(
PolicyRuleDevice policyRuleDevice, String id) {
alarmPolicyRuleDeviceMap.put(id, policyRuleDevice);
// TODO: Create infinite subscription
var alarmSubscription = new AlarmSubscription(id, 259200, 5000);
return monitoringService.getAlarmResponseStream(alarmSubscription);
}
private List<Uni<String>> getAlarmIds(List<AlarmDescriptor> alarmDescriptorList) {
List<Uni<String>> alarmIds = new ArrayList<Uni<String>>();
for (AlarmDescriptor alarmDescriptor : alarmDescriptorList) {
LOGGER.infof("alarmDescriptor:");
LOGGER.infof(alarmDescriptor.toString());
alarmIds.add(monitoringService.setKpiAlarm(alarmDescriptor));
}
return alarmIds;
}
@Override
@@ -435,32 +505,30 @@ public class PolicyServiceImpl implements PolicyService {
final var getPolicyRule = contextService.getPolicyRule(policyRuleId);
return getPolicyRule
.onItem()
.transform(
policyRule -> {
var policyRuleBasic = policyRule.getPolicyRuleType().getPolicyRuleBasic();
String policyId = policyRuleBasic.getPolicyRuleId();
return getPolicyRule.onItem().transform(policyRule -> removePolicyFromContext(policyRule));
}
policyRule
.getPolicyRuleType()
.getPolicyRuleBasic()
.setPolicyRuleState(REMOVED_POLICYRULE_STATE);
private PolicyRuleState removePolicyFromContext(PolicyRule policyRule) {
var policyRuleBasic = policyRule.getPolicyRuleType().getPolicyRuleBasic();
String policyId = policyRuleBasic.getPolicyRuleId();
contextService
.setPolicyRule(policyRule)
.subscribe()
.with(
tmp ->
LOGGER.infof(
"DeletePolicy with id: " + VALID_MESSAGE,
policyRuleBasic.getPolicyRuleId()));
policyRule
.getPolicyRuleType()
.getPolicyRuleBasic()
.setPolicyRuleState(REMOVED_POLICYRULE_STATE);
contextService.removePolicyRule(policyId).subscribe().with(x -> {});
subscriptionList.get(policyId).cancel();
contextService
.setPolicyRule(policyRule)
.subscribe()
.with(
tmp ->
LOGGER.infof(
"DeletePolicy with id: " + VALID_MESSAGE, policyRuleBasic.getPolicyRuleId()));
return policyRuleBasic.getPolicyRuleState();
});
contextService.removePolicyRule(policyId).subscribe().with(x -> {});
subscriptionList.get(policyId).cancel();
return policyRuleBasic.getPolicyRuleState();
}
private Uni<List<Boolean>> returnInvalidDeviceIds(List<String> deviceIds) {
@@ -514,8 +582,8 @@ public class PolicyServiceImpl implements PolicyService {
});
}
private void monitorAlarmResponseForDevice(Multi<AlarmResponse> multi) {
multi
private Cancellable monitorAlarmResponseForDevice(Multi<AlarmResponse> multi) {
return multi
.subscribe()
.with(
alarmResponse -> {
Loading