Loading src/policy/src/main/docker/Dockerfile.multistage.jvm +1 −1 Original line number Diff line number Diff line Loading @@ -29,7 +29,7 @@ RUN mvn --errors --batch-mode package -Dmaven.test.skip=true # Stage 2 FROM builder AS unit-test RUN mvn --errors --batch-mode -Pgenerate-consolidated-coverage verify #RUN ./mvnw --errors --batch-mode -Pgenerate-consolidated-coverage verify # Stage 3 FROM registry.access.redhat.com/ubi8/ubi-minimal:8.4 AS release Loading src/policy/src/main/java/eu/teraflow/policy/PolicyServiceImpl.java +161 −67 Original line number Diff line number Diff line Loading @@ -48,7 +48,6 @@ import eu.teraflow.policy.service.ServiceService; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; import io.smallrye.mutiny.groups.UniJoin; import java.time.Duration; import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; Loading Loading @@ -156,6 +155,7 @@ public class PolicyServiceImpl implements PolicyService { return Uni.createFrom().item(policyRuleState); } LOGGER.infof("Passed 1st"); final var policyRuleBasic = policyRuleService.getPolicyRuleBasic(); if (!policyRuleBasic.areArgumentsValid()) { Loading @@ -165,11 +165,14 @@ public class PolicyServiceImpl implements PolicyService { PolicyRuleStateEnum.POLICY_FAILED, policyRuleBasic.getExeceptionMessage()); return Uni.createFrom().item(policyRuleState); } LOGGER.infof("Passed 2nd"); final var serviceId = policyRuleService.getServiceId(); final var deviceIds = policyRuleService.getDeviceIds(); final var isServiceValid = policyRuleConditionValidator.isServiceIdValid(serviceId, deviceIds); LOGGER.infof("Passed 3rd %s", isServiceValid); return isServiceValid .onItem() .transform( Loading @@ -183,9 +186,14 @@ public class PolicyServiceImpl implements PolicyService { return policyRuleState; } LOGGER.infof("Passed 4th %s", isService); final var policyRuleTypeService = new PolicyRuleTypeService(policyRuleService); final var policyRule = new PolicyRule(policyRuleTypeService); final var alarmDescriptorList = createAlarmDescriptorList(policyRule); LOGGER.infof("Passed 5th %s", alarmDescriptorList); if (alarmDescriptorList.isEmpty()) { var policyRuleState = new PolicyRuleState( Loading @@ -196,22 +204,47 @@ public class PolicyServiceImpl implements PolicyService { return policyRuleState; } List<AlarmSubscription> alarmSubscriptionList = new ArrayList<>(); LOGGER.infof("Passed 6th"); // Create an alarmIds list that contains the promised ids returned from setKpiAlarm List<Uni<String>> alarmIds = new ArrayList<Uni<String>>(); for (AlarmDescriptor alarmDescriptor : alarmDescriptorList) { monitoringService .setKpiAlarm(alarmDescriptor) .subscribe() .with( alarmId -> { alarmSubscriptionList.add(new AlarmSubscription(alarmId, 0, 0)); }); LOGGER.infof("alarmDescriptor:"); LOGGER.infof(alarmDescriptor.toString()); alarmIds.add(monitoringService.setKpiAlarm(alarmDescriptor)); } LOGGER.infof("Passed 7th"); LOGGER.infof("%s", alarmIds); // Transform the alarmIds into promised alarms returned from the getAlarmResponseStream List<Multi<AlarmResponse>> alarmResponseStreamList = new ArrayList<>(); for (Uni<String> alarmId : alarmIds) { alarmResponseStreamList.add( alarmId.onItem().transformToMulti(id -> { alarmPolicyRuleServiceMap.put(id, policyRuleService); var alarmSubscription = new AlarmSubscription(id, 60, 500); LOGGER.infof("Creating Alarm Subscription with id: %s", id); return monitoringService.getAlarmResponseStream(alarmSubscription); }) ); } final var multi = setAlarmResponseStream( policyRule, alarmDescriptorList, alarmSubscriptionList, true); // Merge the promised alarms into one stream (Multi Object) final var multi = Multi.createBy().merging().streams(alarmResponseStreamList); //multi // .subscribe() // .with( // x -> { // LOGGER.info(x); // } // ); monitorAlarmResponseForService(multi); evaluateAction(policyRule, alarmDescriptorList, multi); LOGGER.infof("Passed 8th"); //evaluateAction(policyRule, alarmDescriptorList, multi); return VALIDATED_POLICYRULE_STATE; }); Loading Loading @@ -282,11 +315,11 @@ public class PolicyServiceImpl implements PolicyService { }); } final var multi = setAlarmResponseStream( policyRule, alarmDescriptorList, alarmSubscriptionList, false); monitorAlarmResponseForDevice(multi); evaluateAction(policyRule, alarmDescriptorList, multi); // final var multi = // setAlarmResponseStream( // policyRule, alarmDescriptorList, alarmSubscriptionList, false); // monitorAlarmResponseForDevice(multi); // evaluateAction(policyRule, alarmDescriptorList, multi); return VALIDATED_POLICYRULE_STATE; }); Loading Loading @@ -423,9 +456,10 @@ public class PolicyServiceImpl implements PolicyService { if (alarmDescriptorList.isEmpty()) { return List.of(); } for (AlarmDescriptor alarmDescriptor : alarmDescriptorList) { alarmPolicyRuleServiceMap.put(alarmDescriptor.getAlarmId(), policyRuleService); } // Moved in subscription, as we need the returned value //for (AlarmDescriptor alarmDescriptor : alarmDescriptorList) { // alarmPolicyRuleServiceMap.put(alarmDescriptor.getAlarmId(), policyRuleService); //} } else { final var policyRuleDevice = (PolicyRuleDevice) policyRuleTypeSpecificType; final var policyRuleBasic = policyRuleDevice.getPolicyRuleBasic(); Loading @@ -442,32 +476,55 @@ public class PolicyServiceImpl implements PolicyService { return alarmDescriptorList; } private Multi<AlarmResponse> setAlarmResponseStream( PolicyRule policyRule, List<AlarmDescriptor> alarmDescriptorList, List<AlarmSubscription> alarmSubscriptionList, Boolean isService) { List<Multi<AlarmResponse>> alarmResponseStreamList = new ArrayList<>(); for (AlarmSubscription alarmSubscription : alarmSubscriptionList) { alarmResponseStreamList.add(monitoringService.getAlarmResponseStream(alarmSubscription)); } return Multi.createBy().merging().streams(alarmResponseStreamList); } // Deprecated // private Multi<AlarmResponse> setAlarmResponseStream( // PolicyRule policyRule, // List<Uni<String>> alarmIds, // Boolean isService) { // // LOGGER.infof("Just entered setAlarmResponseStream"); // // List<Multi<AlarmResponse>> alarmResponseStreamList = new ArrayList<>(); // for (Uni<String> alarmId : alarmIds) { // alarmId // .subscribe() // .with( // id -> { // var alarmSubscription = new AlarmSubscription(id, 60, 500); // LOGGER.infof("Creating Alarm Subscription with id: %s", id); // //alarmResponseStreamList.add(monitoringService.getAlarmResponseStream(alarmSubscription)); // monitoringService.getAlarmResponseStream(alarmSubscription).subscribe().with(x -> {LOGGER.info(x);}); // LOGGER.infof("Created Alarm Subscription with id: %s", id); // }); // } // LOGGER.info("*****************************************************************************"); // // for (Multi<AlarmResponse> a: alarmResponseStreamList) { // LOGGER.infof("Begin subscribing to the multi object"); // a // .subscribe() // .with( // b -> { // LOGGER.infof("b"); // } // ); // } // // return Multi.createBy().merging().streams(alarmResponseStreamList); // } private void monitorAlarmResponseForService(Multi<AlarmResponse> multi) { LOGGER.infof("Just entered monitorAlarmResponseForService"); multi .select() .first() .subscribe() .with( alarmResponse -> { LOGGER.infof("alarmResponse:"); LOGGER.info(alarmResponse); if (!alarmPolicyRuleServiceMap.containsKey(alarmResponse.getAlarmId())) { return; } LOGGER.info(alarmResponse.getAlarmId()); if (alarmPolicyRuleServiceMap.containsKey(alarmResponse.getAlarmId())) { applyActionService(alarmResponse.getAlarmId()); } }); } Loading @@ -491,13 +548,14 @@ public class PolicyServiceImpl implements PolicyService { List<AlarmDescriptor> alarmDescriptorList, Multi<AlarmResponse> multi) { Long count = multi .collect() .with(Collectors.counting()) .await() .atMost(Duration.ofMinutes(POLICY_EVALUATION_TIMEOUT)); .subscribe() .with( count -> { LOGGER.infof("Inside evaluateAction"); LOGGER.infof(count.toString(count)); if (count > ACCEPTABLE_NUMBER_OF_ALARMS) { for (AlarmDescriptor alarmDescriptor : alarmDescriptorList) { monitoringService Loading @@ -513,8 +571,38 @@ public class PolicyServiceImpl implements PolicyService { } else { setPolicyRuleToContext(policyRule, EFFECTIVE_POLICYRULE_STATE); } }); } // private void evaluateAction( // PolicyRule policyRule, // List<AlarmDescriptor> alarmDescriptorList, // Multi<AlarmResponse> multi) { // // Long count = // multi // .collect() // .with(Collectors.counting()) // .await() // .atMost(Duration.ofMinutes(POLICY_EVALUATION_TIMEOUT)); // // if (count > ACCEPTABLE_NUMBER_OF_ALARMS) { // for (AlarmDescriptor alarmDescriptor : alarmDescriptorList) { // monitoringService // .deleteAlarm(alarmDescriptor.getAlarmId()) // .subscribe() // .with( // emptyMessage -> // LOGGER.infof( // "Alarm [%s] has been deleted as ineffective.\n", // alarmDescriptor.getAlarmId())); // } // setPolicyRuleToContext(policyRule, INEFFECTIVE_POLICYRULE_STATE); // } else { // setPolicyRuleToContext(policyRule, EFFECTIVE_POLICYRULE_STATE); // } // } private void applyActionDevice(String alarmId) { PolicyRuleDevice policyRuleDevice = alarmPolicyRuleDeviceMap.get(alarmId); Loading Loading @@ -630,8 +718,14 @@ public class PolicyServiceImpl implements PolicyService { } private void applyActionService(String alarmId) { LOGGER.info("Inside applyActionService"); PolicyRuleService policyRuleService = alarmPolicyRuleServiceMap.get(alarmId); PolicyRuleAction policyRuleAction = policyRuleActionMap.get(alarmId); LOGGER.info(policyRuleService); PolicyRuleAction policyRuleAction = policyRuleService.getPolicyRuleBasic().getPolicyRuleActions().get(0); LOGGER.info(policyRuleAction); PolicyRuleAction test = policyRuleActionMap.get(alarmId); LOGGER.info(test); setPolicyRuleServiceToContext(policyRuleService, ACTIVE_POLICYRULE_STATE); Loading Loading @@ -767,16 +861,16 @@ public class PolicyServiceImpl implements PolicyService { false); case POLICY_RULE_CONDITION_NUMERICAL_GREATER_THAN: return new KpiValueRange(policyRuleCondition.getKpiValue(), null, false, false, false); return new KpiValueRange(null, policyRuleCondition.getKpiValue(), true, false, false); case POLICY_RULE_CONDITION_NUMERICAL_GREATER_THAN_EQUAL: return new KpiValueRange(policyRuleCondition.getKpiValue(), null, false, true, false); return new KpiValueRange(null, policyRuleCondition.getKpiValue(), true, true, false); case POLICY_RULE_CONDITION_NUMERICAL_LESS_THAN: return new KpiValueRange(null, policyRuleCondition.getKpiValue(), false, false, false); return new KpiValueRange(policyRuleCondition.getKpiValue(), null, true, false, false); case POLICY_RULE_CONDITION_NUMERICAL_LESS_THAN_EQUAL: return new KpiValueRange(null, policyRuleCondition.getKpiValue(), false, false, true); return new KpiValueRange(policyRuleCondition.getKpiValue(), null, true, false, true); default: return null; } Loading src/policy/src/main/java/eu/teraflow/policy/Serializer.java +23 −18 Original line number Diff line number Diff line Loading @@ -1549,26 +1549,29 @@ public class Serializer { public Monitoring.KpiValueRange serialize(KpiValueRange kpiValueRange) { final var builder = Monitoring.KpiValueRange.newBuilder(); LOGGER.infof("inside KpiValueRange %s", kpiValueRange); final var kpiValueMin = kpiValueRange.getKpiMinValue(); final var kpiValueMax = kpiValueRange.getKpiMaxValue(); Monitoring.KpiValue serializedKpiValueMin; Monitoring.KpiValue serializedKpiValueMax; if (kpiValueMin == null && kpiValueMax == null) { throw new IllegalStateException("KPI value max and min cannot be both null"); } else if (kpiValueMax == null) { final var serializedKpiValueMin = serialize(kpiValueMin); builder.setKpiMinValue(serializedKpiValueMin); builder.setInRange(false); serializedKpiValueMin = serialize(kpiValueMin); serializedKpiValueMax = serialize(new StringKpiValue("NaN")); } else if (kpiValueMin == null) { final var serializedKpiValueMax = serialize(kpiValueMax); builder.setKpiMaxValue(serializedKpiValueMax); builder.setInRange(false); serializedKpiValueMin = serialize(new StringKpiValue("NaN")); serializedKpiValueMax = serialize(kpiValueMax); } else { final var serializedKpiValueMin = serialize(kpiValueMin); final var serializedKpiValueMax = serialize(kpiValueMax); serializedKpiValueMin = serialize(kpiValueMin); serializedKpiValueMax = serialize(kpiValueMax); } builder.setKpiMaxValue(serializedKpiValueMax); builder.setKpiMinValue(serializedKpiValueMin); builder.setInRange(true); } return builder.build(); } Loading Loading @@ -1603,27 +1606,29 @@ public class Serializer { } public Monitoring.AlarmDescriptor serialize(AlarmDescriptor alarmDescriptor) { LOGGER.infof("inside serialize %s", alarmDescriptor); final var builder = Monitoring.AlarmDescriptor.newBuilder(); final var alarmId = alarmDescriptor.getAlarmId(); // final var alarmId = alarmDescriptor.getAlarmId(); final var alarmDescription = alarmDescriptor.getAlarmDescription(); final var name = alarmDescriptor.getName(); final var kpiId = alarmDescriptor.getKpiId(); final var kpiValueRange = alarmDescriptor.getKpiValueRange(); final var timestamp = alarmDescriptor.getTimestamp(); // final var timestamp = alarmDescriptor.getTimestamp(); final var serializedAlarmId = serializeAlarmId(alarmId); // final var serializedAlarmId = serializeAlarmId(alarmId); final var serializedKpiId = serializeKpiId(kpiId); final var serializedKpiValueRange = serialize(kpiValueRange); final var serializedTimestamp = serialize(timestamp); // final var serializedTimestamp = serialize(timestamp); builder.setAlarmId(serializedAlarmId); // builder.setAlarmId(serializedAlarmId); builder.setAlarmDescription(alarmDescription); builder.setName(name); builder.setKpiId(serializedKpiId); builder.setKpiValueRange(serializedKpiValueRange); builder.setTimestamp(serializedTimestamp); // builder.setTimestamp(serializedTimestamp); LOGGER.infof("just before leaving serialize"); return builder.build(); } Loading src/policy/src/main/java/eu/teraflow/policy/context/ContextGatewayImpl.java +18 −5 Original line number Diff line number Diff line Loading @@ -28,10 +28,13 @@ import io.quarkus.grpc.GrpcClient; import io.smallrye.mutiny.Uni; import javax.enterprise.context.ApplicationScoped; import javax.inject.Inject; import org.jboss.logging.Logger; @ApplicationScoped public class ContextGatewayImpl implements ContextGateway { private static final Logger LOGGER = Logger.getLogger(ContextGatewayImpl.class); @GrpcClient("context") MutinyContextServiceStub streamingDelegateContext; Loading @@ -52,6 +55,8 @@ public class ContextGatewayImpl implements ContextGateway { final var serializedServiceId = serializer.serialize(serviceId); LOGGER.infof("getService = %s", serializedServiceId); return streamingDelegateContext .getService(serializedServiceId) .onItem() Loading Loading @@ -92,11 +97,19 @@ public class ContextGatewayImpl implements ContextGateway { public Uni<String> setPolicyRule(PolicyRule policyRule) { // return Uni.createFrom().item("571eabc1-0f59-48da-b608-c45876c3fa8a"); final var serializedPolicyRuleBasic = serializer.serialize(policyRule); LOGGER.infof("Inside setPolicyRule"); return streamingDelegateContextPolicy var ret = streamingDelegateContextPolicy .setPolicyRule(serializedPolicyRuleBasic) .onItem() .transform(serializer::deserialize); ret.subscribe() .with( x -> { LOGGER.infof(x); }); return ret; } @Override Loading src/policy/src/main/java/eu/teraflow/policy/monitoring/MonitoringGatewayImpl.java +9 −0 Original line number Diff line number Diff line Loading @@ -31,10 +31,13 @@ import io.smallrye.mutiny.Uni; import javax.enterprise.context.ApplicationScoped; import javax.inject.Inject; import monitoring.MutinyMonitoringServiceGrpc.MutinyMonitoringServiceStub; import org.jboss.logging.Logger; @ApplicationScoped public class MonitoringGatewayImpl implements MonitoringGateway { private static final Logger LOGGER = Logger.getLogger(MonitoringGatewayImpl.class); @GrpcClient("monitoring") MutinyMonitoringServiceStub streamingDelegateMonitoring; Loading Loading @@ -99,6 +102,9 @@ public class MonitoringGatewayImpl implements MonitoringGateway { public Uni<String> setKpiAlarm(AlarmDescriptor alarmDescriptor) { final var serializedAlarmDescriptor = serializer.serialize(alarmDescriptor); LOGGER.infof("inside setKpiAlarm"); LOGGER.infof("AlarmDescriptor = %s", serializedAlarmDescriptor); return streamingDelegateMonitoring .setKpiAlarm(serializedAlarmDescriptor) .onItem() Loading @@ -119,6 +125,9 @@ public class MonitoringGatewayImpl implements MonitoringGateway { public Multi<AlarmResponse> getAlarmResponseStream(AlarmSubscription alarmSubscription) { final var serializedAlarmSubscription = serializer.serialize(alarmSubscription); LOGGER.infof("inside getAlarmResponseStream"); LOGGER.infof("AlarmSubscription = %s", serializedAlarmSubscription); return streamingDelegateMonitoring .getAlarmResponseStream(serializedAlarmSubscription) .onItem() Loading Loading
src/policy/src/main/docker/Dockerfile.multistage.jvm +1 −1 Original line number Diff line number Diff line Loading @@ -29,7 +29,7 @@ RUN mvn --errors --batch-mode package -Dmaven.test.skip=true # Stage 2 FROM builder AS unit-test RUN mvn --errors --batch-mode -Pgenerate-consolidated-coverage verify #RUN ./mvnw --errors --batch-mode -Pgenerate-consolidated-coverage verify # Stage 3 FROM registry.access.redhat.com/ubi8/ubi-minimal:8.4 AS release Loading
src/policy/src/main/java/eu/teraflow/policy/PolicyServiceImpl.java +161 −67 Original line number Diff line number Diff line Loading @@ -48,7 +48,6 @@ import eu.teraflow.policy.service.ServiceService; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; import io.smallrye.mutiny.groups.UniJoin; import java.time.Duration; import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; Loading Loading @@ -156,6 +155,7 @@ public class PolicyServiceImpl implements PolicyService { return Uni.createFrom().item(policyRuleState); } LOGGER.infof("Passed 1st"); final var policyRuleBasic = policyRuleService.getPolicyRuleBasic(); if (!policyRuleBasic.areArgumentsValid()) { Loading @@ -165,11 +165,14 @@ public class PolicyServiceImpl implements PolicyService { PolicyRuleStateEnum.POLICY_FAILED, policyRuleBasic.getExeceptionMessage()); return Uni.createFrom().item(policyRuleState); } LOGGER.infof("Passed 2nd"); final var serviceId = policyRuleService.getServiceId(); final var deviceIds = policyRuleService.getDeviceIds(); final var isServiceValid = policyRuleConditionValidator.isServiceIdValid(serviceId, deviceIds); LOGGER.infof("Passed 3rd %s", isServiceValid); return isServiceValid .onItem() .transform( Loading @@ -183,9 +186,14 @@ public class PolicyServiceImpl implements PolicyService { return policyRuleState; } LOGGER.infof("Passed 4th %s", isService); final var policyRuleTypeService = new PolicyRuleTypeService(policyRuleService); final var policyRule = new PolicyRule(policyRuleTypeService); final var alarmDescriptorList = createAlarmDescriptorList(policyRule); LOGGER.infof("Passed 5th %s", alarmDescriptorList); if (alarmDescriptorList.isEmpty()) { var policyRuleState = new PolicyRuleState( Loading @@ -196,22 +204,47 @@ public class PolicyServiceImpl implements PolicyService { return policyRuleState; } List<AlarmSubscription> alarmSubscriptionList = new ArrayList<>(); LOGGER.infof("Passed 6th"); // Create an alarmIds list that contains the promised ids returned from setKpiAlarm List<Uni<String>> alarmIds = new ArrayList<Uni<String>>(); for (AlarmDescriptor alarmDescriptor : alarmDescriptorList) { monitoringService .setKpiAlarm(alarmDescriptor) .subscribe() .with( alarmId -> { alarmSubscriptionList.add(new AlarmSubscription(alarmId, 0, 0)); }); LOGGER.infof("alarmDescriptor:"); LOGGER.infof(alarmDescriptor.toString()); alarmIds.add(monitoringService.setKpiAlarm(alarmDescriptor)); } LOGGER.infof("Passed 7th"); LOGGER.infof("%s", alarmIds); // Transform the alarmIds into promised alarms returned from the getAlarmResponseStream List<Multi<AlarmResponse>> alarmResponseStreamList = new ArrayList<>(); for (Uni<String> alarmId : alarmIds) { alarmResponseStreamList.add( alarmId.onItem().transformToMulti(id -> { alarmPolicyRuleServiceMap.put(id, policyRuleService); var alarmSubscription = new AlarmSubscription(id, 60, 500); LOGGER.infof("Creating Alarm Subscription with id: %s", id); return monitoringService.getAlarmResponseStream(alarmSubscription); }) ); } final var multi = setAlarmResponseStream( policyRule, alarmDescriptorList, alarmSubscriptionList, true); // Merge the promised alarms into one stream (Multi Object) final var multi = Multi.createBy().merging().streams(alarmResponseStreamList); //multi // .subscribe() // .with( // x -> { // LOGGER.info(x); // } // ); monitorAlarmResponseForService(multi); evaluateAction(policyRule, alarmDescriptorList, multi); LOGGER.infof("Passed 8th"); //evaluateAction(policyRule, alarmDescriptorList, multi); return VALIDATED_POLICYRULE_STATE; }); Loading Loading @@ -282,11 +315,11 @@ public class PolicyServiceImpl implements PolicyService { }); } final var multi = setAlarmResponseStream( policyRule, alarmDescriptorList, alarmSubscriptionList, false); monitorAlarmResponseForDevice(multi); evaluateAction(policyRule, alarmDescriptorList, multi); // final var multi = // setAlarmResponseStream( // policyRule, alarmDescriptorList, alarmSubscriptionList, false); // monitorAlarmResponseForDevice(multi); // evaluateAction(policyRule, alarmDescriptorList, multi); return VALIDATED_POLICYRULE_STATE; }); Loading Loading @@ -423,9 +456,10 @@ public class PolicyServiceImpl implements PolicyService { if (alarmDescriptorList.isEmpty()) { return List.of(); } for (AlarmDescriptor alarmDescriptor : alarmDescriptorList) { alarmPolicyRuleServiceMap.put(alarmDescriptor.getAlarmId(), policyRuleService); } // Moved in subscription, as we need the returned value //for (AlarmDescriptor alarmDescriptor : alarmDescriptorList) { // alarmPolicyRuleServiceMap.put(alarmDescriptor.getAlarmId(), policyRuleService); //} } else { final var policyRuleDevice = (PolicyRuleDevice) policyRuleTypeSpecificType; final var policyRuleBasic = policyRuleDevice.getPolicyRuleBasic(); Loading @@ -442,32 +476,55 @@ public class PolicyServiceImpl implements PolicyService { return alarmDescriptorList; } private Multi<AlarmResponse> setAlarmResponseStream( PolicyRule policyRule, List<AlarmDescriptor> alarmDescriptorList, List<AlarmSubscription> alarmSubscriptionList, Boolean isService) { List<Multi<AlarmResponse>> alarmResponseStreamList = new ArrayList<>(); for (AlarmSubscription alarmSubscription : alarmSubscriptionList) { alarmResponseStreamList.add(monitoringService.getAlarmResponseStream(alarmSubscription)); } return Multi.createBy().merging().streams(alarmResponseStreamList); } // Deprecated // private Multi<AlarmResponse> setAlarmResponseStream( // PolicyRule policyRule, // List<Uni<String>> alarmIds, // Boolean isService) { // // LOGGER.infof("Just entered setAlarmResponseStream"); // // List<Multi<AlarmResponse>> alarmResponseStreamList = new ArrayList<>(); // for (Uni<String> alarmId : alarmIds) { // alarmId // .subscribe() // .with( // id -> { // var alarmSubscription = new AlarmSubscription(id, 60, 500); // LOGGER.infof("Creating Alarm Subscription with id: %s", id); // //alarmResponseStreamList.add(monitoringService.getAlarmResponseStream(alarmSubscription)); // monitoringService.getAlarmResponseStream(alarmSubscription).subscribe().with(x -> {LOGGER.info(x);}); // LOGGER.infof("Created Alarm Subscription with id: %s", id); // }); // } // LOGGER.info("*****************************************************************************"); // // for (Multi<AlarmResponse> a: alarmResponseStreamList) { // LOGGER.infof("Begin subscribing to the multi object"); // a // .subscribe() // .with( // b -> { // LOGGER.infof("b"); // } // ); // } // // return Multi.createBy().merging().streams(alarmResponseStreamList); // } private void monitorAlarmResponseForService(Multi<AlarmResponse> multi) { LOGGER.infof("Just entered monitorAlarmResponseForService"); multi .select() .first() .subscribe() .with( alarmResponse -> { LOGGER.infof("alarmResponse:"); LOGGER.info(alarmResponse); if (!alarmPolicyRuleServiceMap.containsKey(alarmResponse.getAlarmId())) { return; } LOGGER.info(alarmResponse.getAlarmId()); if (alarmPolicyRuleServiceMap.containsKey(alarmResponse.getAlarmId())) { applyActionService(alarmResponse.getAlarmId()); } }); } Loading @@ -491,13 +548,14 @@ public class PolicyServiceImpl implements PolicyService { List<AlarmDescriptor> alarmDescriptorList, Multi<AlarmResponse> multi) { Long count = multi .collect() .with(Collectors.counting()) .await() .atMost(Duration.ofMinutes(POLICY_EVALUATION_TIMEOUT)); .subscribe() .with( count -> { LOGGER.infof("Inside evaluateAction"); LOGGER.infof(count.toString(count)); if (count > ACCEPTABLE_NUMBER_OF_ALARMS) { for (AlarmDescriptor alarmDescriptor : alarmDescriptorList) { monitoringService Loading @@ -513,8 +571,38 @@ public class PolicyServiceImpl implements PolicyService { } else { setPolicyRuleToContext(policyRule, EFFECTIVE_POLICYRULE_STATE); } }); } // private void evaluateAction( // PolicyRule policyRule, // List<AlarmDescriptor> alarmDescriptorList, // Multi<AlarmResponse> multi) { // // Long count = // multi // .collect() // .with(Collectors.counting()) // .await() // .atMost(Duration.ofMinutes(POLICY_EVALUATION_TIMEOUT)); // // if (count > ACCEPTABLE_NUMBER_OF_ALARMS) { // for (AlarmDescriptor alarmDescriptor : alarmDescriptorList) { // monitoringService // .deleteAlarm(alarmDescriptor.getAlarmId()) // .subscribe() // .with( // emptyMessage -> // LOGGER.infof( // "Alarm [%s] has been deleted as ineffective.\n", // alarmDescriptor.getAlarmId())); // } // setPolicyRuleToContext(policyRule, INEFFECTIVE_POLICYRULE_STATE); // } else { // setPolicyRuleToContext(policyRule, EFFECTIVE_POLICYRULE_STATE); // } // } private void applyActionDevice(String alarmId) { PolicyRuleDevice policyRuleDevice = alarmPolicyRuleDeviceMap.get(alarmId); Loading Loading @@ -630,8 +718,14 @@ public class PolicyServiceImpl implements PolicyService { } private void applyActionService(String alarmId) { LOGGER.info("Inside applyActionService"); PolicyRuleService policyRuleService = alarmPolicyRuleServiceMap.get(alarmId); PolicyRuleAction policyRuleAction = policyRuleActionMap.get(alarmId); LOGGER.info(policyRuleService); PolicyRuleAction policyRuleAction = policyRuleService.getPolicyRuleBasic().getPolicyRuleActions().get(0); LOGGER.info(policyRuleAction); PolicyRuleAction test = policyRuleActionMap.get(alarmId); LOGGER.info(test); setPolicyRuleServiceToContext(policyRuleService, ACTIVE_POLICYRULE_STATE); Loading Loading @@ -767,16 +861,16 @@ public class PolicyServiceImpl implements PolicyService { false); case POLICY_RULE_CONDITION_NUMERICAL_GREATER_THAN: return new KpiValueRange(policyRuleCondition.getKpiValue(), null, false, false, false); return new KpiValueRange(null, policyRuleCondition.getKpiValue(), true, false, false); case POLICY_RULE_CONDITION_NUMERICAL_GREATER_THAN_EQUAL: return new KpiValueRange(policyRuleCondition.getKpiValue(), null, false, true, false); return new KpiValueRange(null, policyRuleCondition.getKpiValue(), true, true, false); case POLICY_RULE_CONDITION_NUMERICAL_LESS_THAN: return new KpiValueRange(null, policyRuleCondition.getKpiValue(), false, false, false); return new KpiValueRange(policyRuleCondition.getKpiValue(), null, true, false, false); case POLICY_RULE_CONDITION_NUMERICAL_LESS_THAN_EQUAL: return new KpiValueRange(null, policyRuleCondition.getKpiValue(), false, false, true); return new KpiValueRange(policyRuleCondition.getKpiValue(), null, true, false, true); default: return null; } Loading
src/policy/src/main/java/eu/teraflow/policy/Serializer.java +23 −18 Original line number Diff line number Diff line Loading @@ -1549,26 +1549,29 @@ public class Serializer { public Monitoring.KpiValueRange serialize(KpiValueRange kpiValueRange) { final var builder = Monitoring.KpiValueRange.newBuilder(); LOGGER.infof("inside KpiValueRange %s", kpiValueRange); final var kpiValueMin = kpiValueRange.getKpiMinValue(); final var kpiValueMax = kpiValueRange.getKpiMaxValue(); Monitoring.KpiValue serializedKpiValueMin; Monitoring.KpiValue serializedKpiValueMax; if (kpiValueMin == null && kpiValueMax == null) { throw new IllegalStateException("KPI value max and min cannot be both null"); } else if (kpiValueMax == null) { final var serializedKpiValueMin = serialize(kpiValueMin); builder.setKpiMinValue(serializedKpiValueMin); builder.setInRange(false); serializedKpiValueMin = serialize(kpiValueMin); serializedKpiValueMax = serialize(new StringKpiValue("NaN")); } else if (kpiValueMin == null) { final var serializedKpiValueMax = serialize(kpiValueMax); builder.setKpiMaxValue(serializedKpiValueMax); builder.setInRange(false); serializedKpiValueMin = serialize(new StringKpiValue("NaN")); serializedKpiValueMax = serialize(kpiValueMax); } else { final var serializedKpiValueMin = serialize(kpiValueMin); final var serializedKpiValueMax = serialize(kpiValueMax); serializedKpiValueMin = serialize(kpiValueMin); serializedKpiValueMax = serialize(kpiValueMax); } builder.setKpiMaxValue(serializedKpiValueMax); builder.setKpiMinValue(serializedKpiValueMin); builder.setInRange(true); } return builder.build(); } Loading Loading @@ -1603,27 +1606,29 @@ public class Serializer { } public Monitoring.AlarmDescriptor serialize(AlarmDescriptor alarmDescriptor) { LOGGER.infof("inside serialize %s", alarmDescriptor); final var builder = Monitoring.AlarmDescriptor.newBuilder(); final var alarmId = alarmDescriptor.getAlarmId(); // final var alarmId = alarmDescriptor.getAlarmId(); final var alarmDescription = alarmDescriptor.getAlarmDescription(); final var name = alarmDescriptor.getName(); final var kpiId = alarmDescriptor.getKpiId(); final var kpiValueRange = alarmDescriptor.getKpiValueRange(); final var timestamp = alarmDescriptor.getTimestamp(); // final var timestamp = alarmDescriptor.getTimestamp(); final var serializedAlarmId = serializeAlarmId(alarmId); // final var serializedAlarmId = serializeAlarmId(alarmId); final var serializedKpiId = serializeKpiId(kpiId); final var serializedKpiValueRange = serialize(kpiValueRange); final var serializedTimestamp = serialize(timestamp); // final var serializedTimestamp = serialize(timestamp); builder.setAlarmId(serializedAlarmId); // builder.setAlarmId(serializedAlarmId); builder.setAlarmDescription(alarmDescription); builder.setName(name); builder.setKpiId(serializedKpiId); builder.setKpiValueRange(serializedKpiValueRange); builder.setTimestamp(serializedTimestamp); // builder.setTimestamp(serializedTimestamp); LOGGER.infof("just before leaving serialize"); return builder.build(); } Loading
src/policy/src/main/java/eu/teraflow/policy/context/ContextGatewayImpl.java +18 −5 Original line number Diff line number Diff line Loading @@ -28,10 +28,13 @@ import io.quarkus.grpc.GrpcClient; import io.smallrye.mutiny.Uni; import javax.enterprise.context.ApplicationScoped; import javax.inject.Inject; import org.jboss.logging.Logger; @ApplicationScoped public class ContextGatewayImpl implements ContextGateway { private static final Logger LOGGER = Logger.getLogger(ContextGatewayImpl.class); @GrpcClient("context") MutinyContextServiceStub streamingDelegateContext; Loading @@ -52,6 +55,8 @@ public class ContextGatewayImpl implements ContextGateway { final var serializedServiceId = serializer.serialize(serviceId); LOGGER.infof("getService = %s", serializedServiceId); return streamingDelegateContext .getService(serializedServiceId) .onItem() Loading Loading @@ -92,11 +97,19 @@ public class ContextGatewayImpl implements ContextGateway { public Uni<String> setPolicyRule(PolicyRule policyRule) { // return Uni.createFrom().item("571eabc1-0f59-48da-b608-c45876c3fa8a"); final var serializedPolicyRuleBasic = serializer.serialize(policyRule); LOGGER.infof("Inside setPolicyRule"); return streamingDelegateContextPolicy var ret = streamingDelegateContextPolicy .setPolicyRule(serializedPolicyRuleBasic) .onItem() .transform(serializer::deserialize); ret.subscribe() .with( x -> { LOGGER.infof(x); }); return ret; } @Override Loading
src/policy/src/main/java/eu/teraflow/policy/monitoring/MonitoringGatewayImpl.java +9 −0 Original line number Diff line number Diff line Loading @@ -31,10 +31,13 @@ import io.smallrye.mutiny.Uni; import javax.enterprise.context.ApplicationScoped; import javax.inject.Inject; import monitoring.MutinyMonitoringServiceGrpc.MutinyMonitoringServiceStub; import org.jboss.logging.Logger; @ApplicationScoped public class MonitoringGatewayImpl implements MonitoringGateway { private static final Logger LOGGER = Logger.getLogger(MonitoringGatewayImpl.class); @GrpcClient("monitoring") MutinyMonitoringServiceStub streamingDelegateMonitoring; Loading Loading @@ -99,6 +102,9 @@ public class MonitoringGatewayImpl implements MonitoringGateway { public Uni<String> setKpiAlarm(AlarmDescriptor alarmDescriptor) { final var serializedAlarmDescriptor = serializer.serialize(alarmDescriptor); LOGGER.infof("inside setKpiAlarm"); LOGGER.infof("AlarmDescriptor = %s", serializedAlarmDescriptor); return streamingDelegateMonitoring .setKpiAlarm(serializedAlarmDescriptor) .onItem() Loading @@ -119,6 +125,9 @@ public class MonitoringGatewayImpl implements MonitoringGateway { public Multi<AlarmResponse> getAlarmResponseStream(AlarmSubscription alarmSubscription) { final var serializedAlarmSubscription = serializer.serialize(alarmSubscription); LOGGER.infof("inside getAlarmResponseStream"); LOGGER.infof("AlarmSubscription = %s", serializedAlarmSubscription); return streamingDelegateMonitoring .getAlarmResponseStream(serializedAlarmSubscription) .onItem() Loading