Loading src/policy/src/main/java/eu/teraflow/policy/PolicyServiceImpl.java +66 −53 Original line number Diff line number Diff line Loading @@ -48,6 +48,7 @@ import eu.teraflow.policy.service.ServiceService; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; import io.smallrye.mutiny.groups.UniJoin; import io.smallrye.mutiny.subscription.Cancellable; import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; Loading Loading @@ -118,6 +119,7 @@ public class PolicyServiceImpl implements PolicyService { new ConcurrentHashMap<>(); private ConcurrentHashMap<String, PolicyRuleDevice> alarmPolicyRuleDeviceMap = new ConcurrentHashMap<>(); private ConcurrentHashMap<String, Cancellable> subscriptionList = new ConcurrentHashMap<>(); @Inject public PolicyServiceImpl( Loading Loading @@ -196,19 +198,26 @@ public class PolicyServiceImpl implements PolicyService { "Invalid PolicyRuleConditions in PolicyRule with ID: %s", policyRuleBasic.getPolicyRuleId())); return policyRuleState; } contextService.setPolicyRule(policyRule).subscribe().with(x -> {}); setPolicyRuleServiceToContext(policyRuleService, VALIDATED_POLICYRULE_STATE); } else { contextService .setPolicyRule(policyRule) .subscribe() .with( policyId -> { setPolicyRuleServiceToContext( policyRuleService, VALIDATED_POLICYRULE_STATE); noAlarms = 0; // Create an alarmIds list that contains the promised ids returned from setKpiAlarm // Create an alarmIds list that contains the promised ids returned from // setKpiAlarm List<Uni<String>> alarmIds = new ArrayList<Uni<String>>(); for (AlarmDescriptor alarmDescriptor : alarmDescriptorList) { LOGGER.infof("alarmDescriptor:"); LOGGER.infof(alarmDescriptor.toString()); alarmIds.add(monitoringService.setKpiAlarm(alarmDescriptor)); } //LOGGER.infof("THIS IS A TEST!"); //LOGGER.infof("%s", alarmIds); // Transform the alarmIds into promised alarms returned from the // getAlarmResponseStream List<Multi<AlarmResponse>> alarmResponseStreamList = new ArrayList<>(); Loading @@ -221,23 +230,28 @@ public class PolicyServiceImpl implements PolicyService { alarmPolicyRuleServiceMap.put(id, policyRuleService); // TODO: Create infinite subscription var alarmSubscription = new AlarmSubscription(id, 259200, 5000); return monitoringService.getAlarmResponseStream(alarmSubscription); var alarmSubscription = new AlarmSubscription(id, 259200, 5000); return monitoringService.getAlarmResponseStream( alarmSubscription); })); } // Merge the promised alarms into one stream (Multi Object) final var multi = Multi.createBy().merging().streams(alarmResponseStreamList); setPolicyRuleServiceToContext(policyRuleService, PROVISIONED_POLICYRULE_STATE); final var multi = Multi.createBy().merging().streams(alarmResponseStreamList); setPolicyRuleServiceToContext( policyRuleService, PROVISIONED_POLICYRULE_STATE); monitorAlarmResponseForService(multi); subscriptionList.put(policyId, monitorAlarmResponseForService(multi)); // TODO: Resubscribe to the stream, if it has ended // TODO: Redesign evaluation of action // evaluateAction(policyRule, alarmDescriptorList, multi); }); return VALIDATED_POLICYRULE_STATE; } }); } Loading Loading @@ -428,6 +442,7 @@ public class PolicyServiceImpl implements PolicyService { .transform( policyRule -> { var policyRuleBasic = policyRule.getPolicyRuleType().getPolicyRuleBasic(); String policyId = policyRuleBasic.getPolicyRuleId(); policyRule .getPolicyRuleType() Loading @@ -443,6 +458,9 @@ public class PolicyServiceImpl implements PolicyService { "DeletePolicy with id: " + VALID_MESSAGE, policyRuleBasic.getPolicyRuleId())); contextService.removePolicyRule(policyId).subscribe().with(x -> {}); subscriptionList.get(policyId).cancel(); return policyRuleBasic.getPolicyRuleState(); }); } Loading Loading @@ -485,8 +503,8 @@ public class PolicyServiceImpl implements PolicyService { return alarmDescriptorList; } private void monitorAlarmResponseForService(Multi<AlarmResponse> multi) { multi private Cancellable monitorAlarmResponseForService(Multi<AlarmResponse> multi) { return multi .subscribe() .with( alarmResponse -> { Loading Loading @@ -725,7 +743,7 @@ public class PolicyServiceImpl implements PolicyService { // TODO: Temp fix for AlarmDescriptor object AlarmDescriptor alarmDescriptor = new AlarmDescriptor( "alarmId-" + gen(), "", "alarmDescription", "alarmName-" + gen(), policyRuleCondition.getKpiId(), Loading Loading @@ -770,12 +788,7 @@ public class PolicyServiceImpl implements PolicyService { final var kpiValueRange = convertPolicyRuleConditionToKpiValueRange(policyRuleCondition); return new AlarmDescriptor( "alarmId-" + gen(), "alarmDescription", "alarmName-" + gen(), kpiId, kpiValueRange, getTimeStamp()); "", "alarmDescription", "alarmName-" + gen(), kpiId, kpiValueRange, getTimeStamp()); } private AlarmDescriptor createAlarmDescriptorWithRange( Loading @@ -800,7 +813,7 @@ public class PolicyServiceImpl implements PolicyService { } return new AlarmDescriptor( "alarmId-" + gen(), "", "alarmDescription", "alarmName-" + gen(), kpiId, Loading Loading @@ -899,7 +912,7 @@ public class PolicyServiceImpl implements PolicyService { final var policyRuleTypeService = new PolicyRuleTypeService(policyRuleService); final var policyRule = new PolicyRule(policyRuleTypeService); contextService.setPolicyRule(policyRule); contextService.setPolicyRule(policyRule).subscribe().with(x -> {}); } private void setPolicyRuleDeviceToContext( Loading @@ -912,6 +925,6 @@ public class PolicyServiceImpl implements PolicyService { final var policyRuleTypeService = new PolicyRuleTypeDevice(policyRuleDevice); final var policyRule = new PolicyRule(policyRuleTypeService); contextService.setPolicyRule(policyRule); contextService.setPolicyRule(policyRule).subscribe().with(x -> {}); } } src/policy/target/generated-sources/grpc/context/ContextOuterClass.java +1414 −565 File changed.Preview size limit exceeded, changes collapsed. Show changes src/policy/target/kubernetes/kubernetes.yml +6 −6 Original line number Diff line number Diff line Loading @@ -16,8 +16,8 @@ apiVersion: v1 kind: Service metadata: annotations: app.quarkus.io/commit-id: 8d0654b519e90fe0127e7d1419adce25fa3a179d app.quarkus.io/build-timestamp: 2023-07-10 - 09:41:26 +0000 app.quarkus.io/commit-id: 447bcf0c8224e0b15715f54d82a0936dd93f5542 app.quarkus.io/build-timestamp: 2023-11-07 - 12:10:37 +0000 prometheus.io/scrape: "true" prometheus.io/path: /q/metrics prometheus.io/port: "8080" Loading @@ -42,8 +42,8 @@ apiVersion: apps/v1 kind: Deployment metadata: annotations: app.quarkus.io/commit-id: 8d0654b519e90fe0127e7d1419adce25fa3a179d app.quarkus.io/build-timestamp: 2023-07-10 - 09:41:26 +0000 app.quarkus.io/commit-id: 447bcf0c8224e0b15715f54d82a0936dd93f5542 app.quarkus.io/build-timestamp: 2023-11-07 - 12:10:37 +0000 prometheus.io/scrape: "true" prometheus.io/path: /q/metrics prometheus.io/port: "8080" Loading @@ -60,8 +60,8 @@ spec: template: metadata: annotations: app.quarkus.io/commit-id: 8d0654b519e90fe0127e7d1419adce25fa3a179d app.quarkus.io/build-timestamp: 2023-07-10 - 09:41:26 +0000 app.quarkus.io/commit-id: 447bcf0c8224e0b15715f54d82a0936dd93f5542 app.quarkus.io/build-timestamp: 2023-11-07 - 12:10:37 +0000 prometheus.io/scrape: "true" prometheus.io/path: /q/metrics prometheus.io/port: "8080" Loading Loading
src/policy/src/main/java/eu/teraflow/policy/PolicyServiceImpl.java +66 −53 Original line number Diff line number Diff line Loading @@ -48,6 +48,7 @@ import eu.teraflow.policy.service.ServiceService; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; import io.smallrye.mutiny.groups.UniJoin; import io.smallrye.mutiny.subscription.Cancellable; import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; Loading Loading @@ -118,6 +119,7 @@ public class PolicyServiceImpl implements PolicyService { new ConcurrentHashMap<>(); private ConcurrentHashMap<String, PolicyRuleDevice> alarmPolicyRuleDeviceMap = new ConcurrentHashMap<>(); private ConcurrentHashMap<String, Cancellable> subscriptionList = new ConcurrentHashMap<>(); @Inject public PolicyServiceImpl( Loading Loading @@ -196,19 +198,26 @@ public class PolicyServiceImpl implements PolicyService { "Invalid PolicyRuleConditions in PolicyRule with ID: %s", policyRuleBasic.getPolicyRuleId())); return policyRuleState; } contextService.setPolicyRule(policyRule).subscribe().with(x -> {}); setPolicyRuleServiceToContext(policyRuleService, VALIDATED_POLICYRULE_STATE); } else { contextService .setPolicyRule(policyRule) .subscribe() .with( policyId -> { setPolicyRuleServiceToContext( policyRuleService, VALIDATED_POLICYRULE_STATE); noAlarms = 0; // Create an alarmIds list that contains the promised ids returned from setKpiAlarm // Create an alarmIds list that contains the promised ids returned from // setKpiAlarm List<Uni<String>> alarmIds = new ArrayList<Uni<String>>(); for (AlarmDescriptor alarmDescriptor : alarmDescriptorList) { LOGGER.infof("alarmDescriptor:"); LOGGER.infof(alarmDescriptor.toString()); alarmIds.add(monitoringService.setKpiAlarm(alarmDescriptor)); } //LOGGER.infof("THIS IS A TEST!"); //LOGGER.infof("%s", alarmIds); // Transform the alarmIds into promised alarms returned from the // getAlarmResponseStream List<Multi<AlarmResponse>> alarmResponseStreamList = new ArrayList<>(); Loading @@ -221,23 +230,28 @@ public class PolicyServiceImpl implements PolicyService { alarmPolicyRuleServiceMap.put(id, policyRuleService); // TODO: Create infinite subscription var alarmSubscription = new AlarmSubscription(id, 259200, 5000); return monitoringService.getAlarmResponseStream(alarmSubscription); var alarmSubscription = new AlarmSubscription(id, 259200, 5000); return monitoringService.getAlarmResponseStream( alarmSubscription); })); } // Merge the promised alarms into one stream (Multi Object) final var multi = Multi.createBy().merging().streams(alarmResponseStreamList); setPolicyRuleServiceToContext(policyRuleService, PROVISIONED_POLICYRULE_STATE); final var multi = Multi.createBy().merging().streams(alarmResponseStreamList); setPolicyRuleServiceToContext( policyRuleService, PROVISIONED_POLICYRULE_STATE); monitorAlarmResponseForService(multi); subscriptionList.put(policyId, monitorAlarmResponseForService(multi)); // TODO: Resubscribe to the stream, if it has ended // TODO: Redesign evaluation of action // evaluateAction(policyRule, alarmDescriptorList, multi); }); return VALIDATED_POLICYRULE_STATE; } }); } Loading Loading @@ -428,6 +442,7 @@ public class PolicyServiceImpl implements PolicyService { .transform( policyRule -> { var policyRuleBasic = policyRule.getPolicyRuleType().getPolicyRuleBasic(); String policyId = policyRuleBasic.getPolicyRuleId(); policyRule .getPolicyRuleType() Loading @@ -443,6 +458,9 @@ public class PolicyServiceImpl implements PolicyService { "DeletePolicy with id: " + VALID_MESSAGE, policyRuleBasic.getPolicyRuleId())); contextService.removePolicyRule(policyId).subscribe().with(x -> {}); subscriptionList.get(policyId).cancel(); return policyRuleBasic.getPolicyRuleState(); }); } Loading Loading @@ -485,8 +503,8 @@ public class PolicyServiceImpl implements PolicyService { return alarmDescriptorList; } private void monitorAlarmResponseForService(Multi<AlarmResponse> multi) { multi private Cancellable monitorAlarmResponseForService(Multi<AlarmResponse> multi) { return multi .subscribe() .with( alarmResponse -> { Loading Loading @@ -725,7 +743,7 @@ public class PolicyServiceImpl implements PolicyService { // TODO: Temp fix for AlarmDescriptor object AlarmDescriptor alarmDescriptor = new AlarmDescriptor( "alarmId-" + gen(), "", "alarmDescription", "alarmName-" + gen(), policyRuleCondition.getKpiId(), Loading Loading @@ -770,12 +788,7 @@ public class PolicyServiceImpl implements PolicyService { final var kpiValueRange = convertPolicyRuleConditionToKpiValueRange(policyRuleCondition); return new AlarmDescriptor( "alarmId-" + gen(), "alarmDescription", "alarmName-" + gen(), kpiId, kpiValueRange, getTimeStamp()); "", "alarmDescription", "alarmName-" + gen(), kpiId, kpiValueRange, getTimeStamp()); } private AlarmDescriptor createAlarmDescriptorWithRange( Loading @@ -800,7 +813,7 @@ public class PolicyServiceImpl implements PolicyService { } return new AlarmDescriptor( "alarmId-" + gen(), "", "alarmDescription", "alarmName-" + gen(), kpiId, Loading Loading @@ -899,7 +912,7 @@ public class PolicyServiceImpl implements PolicyService { final var policyRuleTypeService = new PolicyRuleTypeService(policyRuleService); final var policyRule = new PolicyRule(policyRuleTypeService); contextService.setPolicyRule(policyRule); contextService.setPolicyRule(policyRule).subscribe().with(x -> {}); } private void setPolicyRuleDeviceToContext( Loading @@ -912,6 +925,6 @@ public class PolicyServiceImpl implements PolicyService { final var policyRuleTypeService = new PolicyRuleTypeDevice(policyRuleDevice); final var policyRule = new PolicyRule(policyRuleTypeService); contextService.setPolicyRule(policyRule); contextService.setPolicyRule(policyRule).subscribe().with(x -> {}); } }
src/policy/target/generated-sources/grpc/context/ContextOuterClass.java +1414 −565 File changed.Preview size limit exceeded, changes collapsed. Show changes
src/policy/target/kubernetes/kubernetes.yml +6 −6 Original line number Diff line number Diff line Loading @@ -16,8 +16,8 @@ apiVersion: v1 kind: Service metadata: annotations: app.quarkus.io/commit-id: 8d0654b519e90fe0127e7d1419adce25fa3a179d app.quarkus.io/build-timestamp: 2023-07-10 - 09:41:26 +0000 app.quarkus.io/commit-id: 447bcf0c8224e0b15715f54d82a0936dd93f5542 app.quarkus.io/build-timestamp: 2023-11-07 - 12:10:37 +0000 prometheus.io/scrape: "true" prometheus.io/path: /q/metrics prometheus.io/port: "8080" Loading @@ -42,8 +42,8 @@ apiVersion: apps/v1 kind: Deployment metadata: annotations: app.quarkus.io/commit-id: 8d0654b519e90fe0127e7d1419adce25fa3a179d app.quarkus.io/build-timestamp: 2023-07-10 - 09:41:26 +0000 app.quarkus.io/commit-id: 447bcf0c8224e0b15715f54d82a0936dd93f5542 app.quarkus.io/build-timestamp: 2023-11-07 - 12:10:37 +0000 prometheus.io/scrape: "true" prometheus.io/path: /q/metrics prometheus.io/port: "8080" Loading @@ -60,8 +60,8 @@ spec: template: metadata: annotations: app.quarkus.io/commit-id: 8d0654b519e90fe0127e7d1419adce25fa3a179d app.quarkus.io/build-timestamp: 2023-07-10 - 09:41:26 +0000 app.quarkus.io/commit-id: 447bcf0c8224e0b15715f54d82a0936dd93f5542 app.quarkus.io/build-timestamp: 2023-11-07 - 12:10:37 +0000 prometheus.io/scrape: "true" prometheus.io/path: /q/metrics prometheus.io/port: "8080" Loading