Loading src/monitoring/service/MetricsDBTools.py +29 −30 Original line number Diff line number Diff line Loading @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import time, math import time from random import random from questdb.ingress import Sender, IngressError Loading Loading @@ -264,69 +264,68 @@ class MetricsDB(): for kpi in kpi_list: alarm = False kpi_value = kpi[2] kpiMinIsNone = ((kpiMinValue is None) or math.isnan(kpiMinValue)) kpiMaxIsNone = ((kpiMaxValue is None) or math.isnan(kpiMaxValue)) LOGGER.info(kpiMinIsNone) LOGGER.info(kpiMaxIsNone) if (kpiMinValue == kpi_value and kpiMaxValue == kpi_value and inRange): alarm = True elif (inRange and not kpiMinIsNone and not kpiMaxIsNone and includeMinValue and includeMaxValue): elif ( inRange and kpiMinValue is not None and kpiMaxValue is not None and includeMinValue and includeMaxValue): if (kpi_value >= kpiMinValue and kpi_value <= kpiMaxValue): alarm = True elif (inRange and not kpiMinIsNone and not kpiMaxIsNone and includeMinValue and not includeMaxValue): elif ( inRange and kpiMinValue is not None and kpiMaxValue is not None and includeMinValue and not includeMaxValue): if (kpi_value >= kpiMinValue and kpi_value < kpiMaxValue): alarm = True elif (inRange and not kpiMinIsNone and not kpiMaxIsNone and not includeMinValue and includeMaxValue): elif ( inRange and kpiMinValue is not None and kpiMaxValue is not None and not includeMinValue and includeMaxValue): if (kpi_value > kpiMinValue and kpi_value <= kpiMaxValue): alarm = True elif (inRange and not kpiMinIsNone and not kpiMaxIsNone and not includeMinValue and not includeMaxValue): elif ( inRange and kpiMinValue is not None and kpiMaxValue is not None and not includeMinValue and not includeMaxValue): if (kpi_value > kpiMinValue and kpi_value < kpiMaxValue): alarm = True elif (not inRange and not kpiMinIsNone and not kpiMaxIsNone and includeMinValue and includeMaxValue): elif ( not inRange and kpiMinValue is not None and kpiMaxValue is not None and includeMinValue and includeMaxValue): if (kpi_value <= kpiMinValue or kpi_value >= kpiMaxValue): alarm = True elif (not inRange and not kpiMinIsNone and not kpiMaxIsNone and includeMinValue and not includeMaxValue): elif ( not inRange and kpiMinValue is not None and kpiMaxValue is not None and includeMinValue and not includeMaxValue): if (kpi_value <= kpiMinValue or kpi_value > kpiMaxValue): alarm = True elif (not inRange and not kpiMinIsNone and not kpiMaxIsNone and not includeMinValue and includeMaxValue): elif ( not inRange and kpiMinValue is not None and kpiMaxValue is not None and not includeMinValue and includeMaxValue): if (kpi_value < kpiMinValue or kpi_value >= kpiMaxValue): alarm = True elif (not inRange and not kpiMinIsNone and not kpiMaxIsNone and not includeMinValue and not includeMaxValue): elif ( not inRange and kpiMinValue is not None and kpiMaxValue is not None and not includeMinValue and not includeMaxValue): if (kpi_value < kpiMinValue or kpi_value > kpiMaxValue): alarm = True elif (inRange and not kpiMinIsNone and kpiMaxIsNone and includeMinValue): elif (inRange and kpiMinValue is not None and kpiMaxValue is None and includeMinValue): if (kpi_value >= kpiMinValue): alarm = True elif (inRange and not kpiMinIsNone and kpiMaxIsNone and not includeMinValue): elif (inRange and kpiMinValue is not None and kpiMaxValue is None and not includeMinValue): if (kpi_value > kpiMinValue): alarm = True elif (not inRange and not kpiMinIsNone and kpiMaxIsNone and includeMinValue): elif (not inRange and kpiMinValue is not None and kpiMaxValue is None and not includeMinValue): if (kpi_value <= kpiMinValue): alarm = True elif (not inRange and not kpiMinIsNone and kpiMaxIsNone and not includeMinValue): if (kpi_value < kpiMinValue): elif (not inRange and kpiMinValue is not None and kpiMaxValue is None and not includeMinValue): if (kpi_value <= kpiMinValue): alarm = True elif (inRange and kpiMinIsNone and not kpiMaxIsNone and includeMaxValue): elif (inRange and kpiMinValue is None and kpiMaxValue is not None and includeMaxValue): if (kpi_value <= kpiMaxValue): alarm = True elif (inRange and kpiMinIsNone and not kpiMaxIsNone and not includeMaxValue): elif (inRange and kpiMinValue is None and kpiMaxValue is not None and not includeMaxValue): if (kpi_value < kpiMaxValue): alarm = True elif (not inRange and kpiMinIsNone and not kpiMaxIsNone and includeMaxValue): elif (not inRange and kpiMinValue is None and kpiMaxValue is not None and not includeMaxValue): if (kpi_value >= kpiMaxValue): alarm = True elif (not inRange and kpiMinIsNone and not kpiMaxIsNone and not includeMaxValue): if (kpi_value > kpiMaxValue): elif (not inRange and kpiMinValue is None and kpiMaxValue is not None and not includeMaxValue): if (kpi_value >= kpiMaxValue): alarm = True if alarm: valid_kpi_list.append(kpi) if valid_kpi_list: alarm_queue.put_nowait(valid_kpi_list) LOGGER.debug(f"Alarm of KPI {kpi_id} triggered -> kpi_value:{kpi[2]}, timestamp:{kpi[1]}") else: LOGGER.debug(f"No new alarms triggered for the alarm of KPI {kpi_id}") else: LOGGER.debug(f"No new data for the alarm of KPI {kpi_id}") except (Exception) as e: Loading src/monitoring/service/MonitoringServiceServicerImpl.py +2 −8 Original line number Diff line number Diff line Loading @@ -407,19 +407,13 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): alarm_description = request.alarm_description alarm_name = request.name kpi_id = request.kpi_id.kpi_id.uuid kpi_min_value = float(request.kpi_value_range.kpiMinValue.floatVal) kpi_max_value = float(request.kpi_value_range.kpiMaxValue.floatVal) kpi_min_value = request.kpi_value_range.kpiMinValue.floatVal kpi_max_value = request.kpi_value_range.kpiMaxValue.floatVal in_range = request.kpi_value_range.inRange include_min_value = request.kpi_value_range.includeMinValue include_max_value = request.kpi_value_range.includeMaxValue timestamp = request.timestamp.timestamp LOGGER.info(f"kpi_min_value: {kpi_min_value}") LOGGER.info(f"kpi_max_value: {kpi_max_value}") LOGGER.info(f"in_range: {in_range}") LOGGER.info(f"include_min_value: {include_min_value}") LOGGER.info(f"include_max_value: {include_max_value}") LOGGER.debug(f"request.AlarmID: {request.alarm_id.alarm_id.uuid}") if request.alarm_id.alarm_id.uuid != "": Loading src/policy/src/main/java/eu/teraflow/policy/PolicyServiceImpl.java +80 −48 Original line number Diff line number Diff line Loading @@ -206,7 +206,7 @@ public class PolicyServiceImpl implements PolicyService { LOGGER.infof("Passed 6th"); // SetKpiAlarms and then create the Alarm subscription list based on the returned id. // Create an alarmIds list that contains the promised ids returned from setKpiAlarm List<Uni<String>> alarmIds = new ArrayList<Uni<String>>(); for (AlarmDescriptor alarmDescriptor : alarmDescriptorList) { LOGGER.infof("alarmDescriptor:"); Loading @@ -217,7 +217,31 @@ public class PolicyServiceImpl implements PolicyService { LOGGER.infof("Passed 7th"); LOGGER.infof("%s", alarmIds); final var multi = setAlarmResponseStream(policyRule, alarmIds, true); // Transform the alarmIds into promised alarms returned from the getAlarmResponseStream List<Multi<AlarmResponse>> alarmResponseStreamList = new ArrayList<>(); for (Uni<String> alarmId : alarmIds) { alarmResponseStreamList.add( alarmId.onItem().transformToMulti(id -> { alarmPolicyRuleServiceMap.put(id, policyRuleService); var alarmSubscription = new AlarmSubscription(id, 60, 500); LOGGER.infof("Creating Alarm Subscription with id: %s", id); return monitoringService.getAlarmResponseStream(alarmSubscription); }) ); } // Merge the promised alarms into one stream (Multi Object) final var multi = Multi.createBy().merging().streams(alarmResponseStreamList); //multi // .subscribe() // .with( // x -> { // LOGGER.info(x); // } // ); monitorAlarmResponseForService(multi); LOGGER.infof("Passed 8th"); //evaluateAction(policyRule, alarmDescriptorList, multi); Loading Loading @@ -432,9 +456,10 @@ public class PolicyServiceImpl implements PolicyService { if (alarmDescriptorList.isEmpty()) { return List.of(); } for (AlarmDescriptor alarmDescriptor : alarmDescriptorList) { alarmPolicyRuleServiceMap.put(alarmDescriptor.getAlarmId(), policyRuleService); } // Moved in subscription, as we need the returned value //for (AlarmDescriptor alarmDescriptor : alarmDescriptorList) { // alarmPolicyRuleServiceMap.put(alarmDescriptor.getAlarmId(), policyRuleService); //} } else { final var policyRuleDevice = (PolicyRuleDevice) policyRuleTypeSpecificType; final var policyRuleBasic = policyRuleDevice.getPolicyRuleBasic(); Loading @@ -451,41 +476,42 @@ public class PolicyServiceImpl implements PolicyService { return alarmDescriptorList; } private Multi<AlarmResponse> setAlarmResponseStream( PolicyRule policyRule, List<Uni<String>> alarmIds, Boolean isService) { LOGGER.infof("Just entered setAlarmResponseStream"); List<Multi<AlarmResponse>> alarmResponseStreamList = new ArrayList<>(); for (Uni<String> alarmId : alarmIds) { alarmId .subscribe() .with( id -> { var alarmSubscription = new AlarmSubscription(id, 60, 500); LOGGER.infof("Creating Alarm Subscription with id: %s", id); //alarmResponseStreamList.add(monitoringService.getAlarmResponseStream(alarmSubscription)); monitoringService.getAlarmResponseStream(alarmSubscription).subscribe().with(x -> {LOGGER.info(x);}); LOGGER.infof("Created Alarm Subscription with id: %s", id); }); } LOGGER.info("*****************************************************************************"); for (Multi<AlarmResponse> a: alarmResponseStreamList) { LOGGER.infof("Begin subscribing to the multi object"); a .subscribe() .with( b -> { LOGGER.infof("b"); } ); } return Multi.createBy().merging().streams(alarmResponseStreamList); } // Deprecated // private Multi<AlarmResponse> setAlarmResponseStream( // PolicyRule policyRule, // List<Uni<String>> alarmIds, // Boolean isService) { // // LOGGER.infof("Just entered setAlarmResponseStream"); // // List<Multi<AlarmResponse>> alarmResponseStreamList = new ArrayList<>(); // for (Uni<String> alarmId : alarmIds) { // alarmId // .subscribe() // .with( // id -> { // var alarmSubscription = new AlarmSubscription(id, 60, 500); // LOGGER.infof("Creating Alarm Subscription with id: %s", id); // //alarmResponseStreamList.add(monitoringService.getAlarmResponseStream(alarmSubscription)); // monitoringService.getAlarmResponseStream(alarmSubscription).subscribe().with(x -> {LOGGER.info(x);}); // LOGGER.infof("Created Alarm Subscription with id: %s", id); // }); // } // LOGGER.info("*****************************************************************************"); // // for (Multi<AlarmResponse> a: alarmResponseStreamList) { // LOGGER.infof("Begin subscribing to the multi object"); // a // .subscribe() // .with( // b -> { // LOGGER.infof("b"); // } // ); // } // // return Multi.createBy().merging().streams(alarmResponseStreamList); // } private void monitorAlarmResponseForService(Multi<AlarmResponse> multi) { LOGGER.infof("Just entered monitorAlarmResponseForService"); Loading @@ -495,10 +521,10 @@ public class PolicyServiceImpl implements PolicyService { alarmResponse -> { LOGGER.infof("alarmResponse:"); LOGGER.info(alarmResponse); if (!alarmPolicyRuleServiceMap.containsKey(alarmResponse.getAlarmId())) { return; } LOGGER.info(alarmResponse.getAlarmId()); if (alarmPolicyRuleServiceMap.containsKey(alarmResponse.getAlarmId())) { applyActionService(alarmResponse.getAlarmId()); } }); } Loading Loading @@ -692,8 +718,14 @@ public class PolicyServiceImpl implements PolicyService { } private void applyActionService(String alarmId) { LOGGER.info("Inside applyActionService"); PolicyRuleService policyRuleService = alarmPolicyRuleServiceMap.get(alarmId); PolicyRuleAction policyRuleAction = policyRuleActionMap.get(alarmId); LOGGER.info(policyRuleService); PolicyRuleAction policyRuleAction = policyRuleService.getPolicyRuleBasic().getPolicyRuleActions().get(0); LOGGER.info(policyRuleAction); PolicyRuleAction test = policyRuleActionMap.get(alarmId); LOGGER.info(test); setPolicyRuleServiceToContext(policyRuleService, ACTIVE_POLICYRULE_STATE); Loading Loading @@ -829,16 +861,16 @@ public class PolicyServiceImpl implements PolicyService { false); case POLICY_RULE_CONDITION_NUMERICAL_GREATER_THAN: return new KpiValueRange(policyRuleCondition.getKpiValue(), null, true, false, false); return new KpiValueRange(null, policyRuleCondition.getKpiValue(), true, false, false); case POLICY_RULE_CONDITION_NUMERICAL_GREATER_THAN_EQUAL: return new KpiValueRange(policyRuleCondition.getKpiValue(), null, true, true, false); return new KpiValueRange(null, policyRuleCondition.getKpiValue(), true, true, false); case POLICY_RULE_CONDITION_NUMERICAL_LESS_THAN: return new KpiValueRange(null, policyRuleCondition.getKpiValue(), true, false, false); return new KpiValueRange(policyRuleCondition.getKpiValue(), null, true, false, false); case POLICY_RULE_CONDITION_NUMERICAL_LESS_THAN_EQUAL: return new KpiValueRange(null, policyRuleCondition.getKpiValue(), true, false, true); return new KpiValueRange(policyRuleCondition.getKpiValue(), null, true, false, true); default: return null; } Loading src/policy/target/kubernetes/kubernetes.yml +14 −14 Original line number Diff line number Diff line Loading @@ -3,20 +3,20 @@ apiVersion: v1 kind: Service metadata: annotations: app.quarkus.io/commit-id: e31aec70f91db11fc3b56083861409c505ed1bff app.quarkus.io/build-timestamp: 2023-04-24 - 10:48:40 +0000 app.quarkus.io/commit-id: 9a700e35a479619e999e223cae9ba67482aa367e app.quarkus.io/build-timestamp: 2023-04-28 - 08:21:07 +0000 labels: app.kubernetes.io/name: policyservice app: policyservice name: policyservice spec: ports: - name: grpc-server port: 6060 targetPort: 6060 - name: http port: 8080 targetPort: 8080 - name: grpc-server port: 6060 targetPort: 6060 - name: grpc port: 6060 targetPort: 6060 Loading @@ -28,8 +28,8 @@ apiVersion: apps/v1 kind: Deployment metadata: annotations: app.quarkus.io/commit-id: e31aec70f91db11fc3b56083861409c505ed1bff app.quarkus.io/build-timestamp: 2023-04-24 - 10:48:40 +0000 app.quarkus.io/commit-id: 9a700e35a479619e999e223cae9ba67482aa367e app.quarkus.io/build-timestamp: 2023-04-28 - 08:21:07 +0000 labels: app: policyservice app.kubernetes.io/name: policyservice Loading @@ -42,8 +42,8 @@ spec: template: metadata: annotations: app.quarkus.io/commit-id: e31aec70f91db11fc3b56083861409c505ed1bff app.quarkus.io/build-timestamp: 2023-04-24 - 10:48:40 +0000 app.quarkus.io/commit-id: 9a700e35a479619e999e223cae9ba67482aa367e app.quarkus.io/build-timestamp: 2023-04-28 - 08:21:07 +0000 labels: app: policyservice app.kubernetes.io/name: policyservice Loading @@ -54,10 +54,10 @@ spec: valueFrom: fieldRef: fieldPath: metadata.namespace - name: CONTEXT_SERVICE_HOST value: contextservice - name: SERVICE_SERVICE_HOST value: serviceservice - name: CONTEXT_SERVICE_HOST value: contextservice - name: MONITORING_SERVICE_HOST value: monitoringservice image: registry.gitlab.com/teraflow-h2020/controller/policy:0.1.0 Loading @@ -74,12 +74,12 @@ spec: timeoutSeconds: 10 name: policyservice ports: - containerPort: 6060 name: grpc-server protocol: TCP - containerPort: 8080 name: http protocol: TCP - containerPort: 6060 name: grpc-server protocol: TCP - containerPort: 6060 name: grpc protocol: TCP Loading Loading
src/monitoring/service/MetricsDBTools.py +29 −30 Original line number Diff line number Diff line Loading @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import time, math import time from random import random from questdb.ingress import Sender, IngressError Loading Loading @@ -264,69 +264,68 @@ class MetricsDB(): for kpi in kpi_list: alarm = False kpi_value = kpi[2] kpiMinIsNone = ((kpiMinValue is None) or math.isnan(kpiMinValue)) kpiMaxIsNone = ((kpiMaxValue is None) or math.isnan(kpiMaxValue)) LOGGER.info(kpiMinIsNone) LOGGER.info(kpiMaxIsNone) if (kpiMinValue == kpi_value and kpiMaxValue == kpi_value and inRange): alarm = True elif (inRange and not kpiMinIsNone and not kpiMaxIsNone and includeMinValue and includeMaxValue): elif ( inRange and kpiMinValue is not None and kpiMaxValue is not None and includeMinValue and includeMaxValue): if (kpi_value >= kpiMinValue and kpi_value <= kpiMaxValue): alarm = True elif (inRange and not kpiMinIsNone and not kpiMaxIsNone and includeMinValue and not includeMaxValue): elif ( inRange and kpiMinValue is not None and kpiMaxValue is not None and includeMinValue and not includeMaxValue): if (kpi_value >= kpiMinValue and kpi_value < kpiMaxValue): alarm = True elif (inRange and not kpiMinIsNone and not kpiMaxIsNone and not includeMinValue and includeMaxValue): elif ( inRange and kpiMinValue is not None and kpiMaxValue is not None and not includeMinValue and includeMaxValue): if (kpi_value > kpiMinValue and kpi_value <= kpiMaxValue): alarm = True elif (inRange and not kpiMinIsNone and not kpiMaxIsNone and not includeMinValue and not includeMaxValue): elif ( inRange and kpiMinValue is not None and kpiMaxValue is not None and not includeMinValue and not includeMaxValue): if (kpi_value > kpiMinValue and kpi_value < kpiMaxValue): alarm = True elif (not inRange and not kpiMinIsNone and not kpiMaxIsNone and includeMinValue and includeMaxValue): elif ( not inRange and kpiMinValue is not None and kpiMaxValue is not None and includeMinValue and includeMaxValue): if (kpi_value <= kpiMinValue or kpi_value >= kpiMaxValue): alarm = True elif (not inRange and not kpiMinIsNone and not kpiMaxIsNone and includeMinValue and not includeMaxValue): elif ( not inRange and kpiMinValue is not None and kpiMaxValue is not None and includeMinValue and not includeMaxValue): if (kpi_value <= kpiMinValue or kpi_value > kpiMaxValue): alarm = True elif (not inRange and not kpiMinIsNone and not kpiMaxIsNone and not includeMinValue and includeMaxValue): elif ( not inRange and kpiMinValue is not None and kpiMaxValue is not None and not includeMinValue and includeMaxValue): if (kpi_value < kpiMinValue or kpi_value >= kpiMaxValue): alarm = True elif (not inRange and not kpiMinIsNone and not kpiMaxIsNone and not includeMinValue and not includeMaxValue): elif ( not inRange and kpiMinValue is not None and kpiMaxValue is not None and not includeMinValue and not includeMaxValue): if (kpi_value < kpiMinValue or kpi_value > kpiMaxValue): alarm = True elif (inRange and not kpiMinIsNone and kpiMaxIsNone and includeMinValue): elif (inRange and kpiMinValue is not None and kpiMaxValue is None and includeMinValue): if (kpi_value >= kpiMinValue): alarm = True elif (inRange and not kpiMinIsNone and kpiMaxIsNone and not includeMinValue): elif (inRange and kpiMinValue is not None and kpiMaxValue is None and not includeMinValue): if (kpi_value > kpiMinValue): alarm = True elif (not inRange and not kpiMinIsNone and kpiMaxIsNone and includeMinValue): elif (not inRange and kpiMinValue is not None and kpiMaxValue is None and not includeMinValue): if (kpi_value <= kpiMinValue): alarm = True elif (not inRange and not kpiMinIsNone and kpiMaxIsNone and not includeMinValue): if (kpi_value < kpiMinValue): elif (not inRange and kpiMinValue is not None and kpiMaxValue is None and not includeMinValue): if (kpi_value <= kpiMinValue): alarm = True elif (inRange and kpiMinIsNone and not kpiMaxIsNone and includeMaxValue): elif (inRange and kpiMinValue is None and kpiMaxValue is not None and includeMaxValue): if (kpi_value <= kpiMaxValue): alarm = True elif (inRange and kpiMinIsNone and not kpiMaxIsNone and not includeMaxValue): elif (inRange and kpiMinValue is None and kpiMaxValue is not None and not includeMaxValue): if (kpi_value < kpiMaxValue): alarm = True elif (not inRange and kpiMinIsNone and not kpiMaxIsNone and includeMaxValue): elif (not inRange and kpiMinValue is None and kpiMaxValue is not None and not includeMaxValue): if (kpi_value >= kpiMaxValue): alarm = True elif (not inRange and kpiMinIsNone and not kpiMaxIsNone and not includeMaxValue): if (kpi_value > kpiMaxValue): elif (not inRange and kpiMinValue is None and kpiMaxValue is not None and not includeMaxValue): if (kpi_value >= kpiMaxValue): alarm = True if alarm: valid_kpi_list.append(kpi) if valid_kpi_list: alarm_queue.put_nowait(valid_kpi_list) LOGGER.debug(f"Alarm of KPI {kpi_id} triggered -> kpi_value:{kpi[2]}, timestamp:{kpi[1]}") else: LOGGER.debug(f"No new alarms triggered for the alarm of KPI {kpi_id}") else: LOGGER.debug(f"No new data for the alarm of KPI {kpi_id}") except (Exception) as e: Loading
src/monitoring/service/MonitoringServiceServicerImpl.py +2 −8 Original line number Diff line number Diff line Loading @@ -407,19 +407,13 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): alarm_description = request.alarm_description alarm_name = request.name kpi_id = request.kpi_id.kpi_id.uuid kpi_min_value = float(request.kpi_value_range.kpiMinValue.floatVal) kpi_max_value = float(request.kpi_value_range.kpiMaxValue.floatVal) kpi_min_value = request.kpi_value_range.kpiMinValue.floatVal kpi_max_value = request.kpi_value_range.kpiMaxValue.floatVal in_range = request.kpi_value_range.inRange include_min_value = request.kpi_value_range.includeMinValue include_max_value = request.kpi_value_range.includeMaxValue timestamp = request.timestamp.timestamp LOGGER.info(f"kpi_min_value: {kpi_min_value}") LOGGER.info(f"kpi_max_value: {kpi_max_value}") LOGGER.info(f"in_range: {in_range}") LOGGER.info(f"include_min_value: {include_min_value}") LOGGER.info(f"include_max_value: {include_max_value}") LOGGER.debug(f"request.AlarmID: {request.alarm_id.alarm_id.uuid}") if request.alarm_id.alarm_id.uuid != "": Loading
src/policy/src/main/java/eu/teraflow/policy/PolicyServiceImpl.java +80 −48 Original line number Diff line number Diff line Loading @@ -206,7 +206,7 @@ public class PolicyServiceImpl implements PolicyService { LOGGER.infof("Passed 6th"); // SetKpiAlarms and then create the Alarm subscription list based on the returned id. // Create an alarmIds list that contains the promised ids returned from setKpiAlarm List<Uni<String>> alarmIds = new ArrayList<Uni<String>>(); for (AlarmDescriptor alarmDescriptor : alarmDescriptorList) { LOGGER.infof("alarmDescriptor:"); Loading @@ -217,7 +217,31 @@ public class PolicyServiceImpl implements PolicyService { LOGGER.infof("Passed 7th"); LOGGER.infof("%s", alarmIds); final var multi = setAlarmResponseStream(policyRule, alarmIds, true); // Transform the alarmIds into promised alarms returned from the getAlarmResponseStream List<Multi<AlarmResponse>> alarmResponseStreamList = new ArrayList<>(); for (Uni<String> alarmId : alarmIds) { alarmResponseStreamList.add( alarmId.onItem().transformToMulti(id -> { alarmPolicyRuleServiceMap.put(id, policyRuleService); var alarmSubscription = new AlarmSubscription(id, 60, 500); LOGGER.infof("Creating Alarm Subscription with id: %s", id); return monitoringService.getAlarmResponseStream(alarmSubscription); }) ); } // Merge the promised alarms into one stream (Multi Object) final var multi = Multi.createBy().merging().streams(alarmResponseStreamList); //multi // .subscribe() // .with( // x -> { // LOGGER.info(x); // } // ); monitorAlarmResponseForService(multi); LOGGER.infof("Passed 8th"); //evaluateAction(policyRule, alarmDescriptorList, multi); Loading Loading @@ -432,9 +456,10 @@ public class PolicyServiceImpl implements PolicyService { if (alarmDescriptorList.isEmpty()) { return List.of(); } for (AlarmDescriptor alarmDescriptor : alarmDescriptorList) { alarmPolicyRuleServiceMap.put(alarmDescriptor.getAlarmId(), policyRuleService); } // Moved in subscription, as we need the returned value //for (AlarmDescriptor alarmDescriptor : alarmDescriptorList) { // alarmPolicyRuleServiceMap.put(alarmDescriptor.getAlarmId(), policyRuleService); //} } else { final var policyRuleDevice = (PolicyRuleDevice) policyRuleTypeSpecificType; final var policyRuleBasic = policyRuleDevice.getPolicyRuleBasic(); Loading @@ -451,41 +476,42 @@ public class PolicyServiceImpl implements PolicyService { return alarmDescriptorList; } private Multi<AlarmResponse> setAlarmResponseStream( PolicyRule policyRule, List<Uni<String>> alarmIds, Boolean isService) { LOGGER.infof("Just entered setAlarmResponseStream"); List<Multi<AlarmResponse>> alarmResponseStreamList = new ArrayList<>(); for (Uni<String> alarmId : alarmIds) { alarmId .subscribe() .with( id -> { var alarmSubscription = new AlarmSubscription(id, 60, 500); LOGGER.infof("Creating Alarm Subscription with id: %s", id); //alarmResponseStreamList.add(monitoringService.getAlarmResponseStream(alarmSubscription)); monitoringService.getAlarmResponseStream(alarmSubscription).subscribe().with(x -> {LOGGER.info(x);}); LOGGER.infof("Created Alarm Subscription with id: %s", id); }); } LOGGER.info("*****************************************************************************"); for (Multi<AlarmResponse> a: alarmResponseStreamList) { LOGGER.infof("Begin subscribing to the multi object"); a .subscribe() .with( b -> { LOGGER.infof("b"); } ); } return Multi.createBy().merging().streams(alarmResponseStreamList); } // Deprecated // private Multi<AlarmResponse> setAlarmResponseStream( // PolicyRule policyRule, // List<Uni<String>> alarmIds, // Boolean isService) { // // LOGGER.infof("Just entered setAlarmResponseStream"); // // List<Multi<AlarmResponse>> alarmResponseStreamList = new ArrayList<>(); // for (Uni<String> alarmId : alarmIds) { // alarmId // .subscribe() // .with( // id -> { // var alarmSubscription = new AlarmSubscription(id, 60, 500); // LOGGER.infof("Creating Alarm Subscription with id: %s", id); // //alarmResponseStreamList.add(monitoringService.getAlarmResponseStream(alarmSubscription)); // monitoringService.getAlarmResponseStream(alarmSubscription).subscribe().with(x -> {LOGGER.info(x);}); // LOGGER.infof("Created Alarm Subscription with id: %s", id); // }); // } // LOGGER.info("*****************************************************************************"); // // for (Multi<AlarmResponse> a: alarmResponseStreamList) { // LOGGER.infof("Begin subscribing to the multi object"); // a // .subscribe() // .with( // b -> { // LOGGER.infof("b"); // } // ); // } // // return Multi.createBy().merging().streams(alarmResponseStreamList); // } private void monitorAlarmResponseForService(Multi<AlarmResponse> multi) { LOGGER.infof("Just entered monitorAlarmResponseForService"); Loading @@ -495,10 +521,10 @@ public class PolicyServiceImpl implements PolicyService { alarmResponse -> { LOGGER.infof("alarmResponse:"); LOGGER.info(alarmResponse); if (!alarmPolicyRuleServiceMap.containsKey(alarmResponse.getAlarmId())) { return; } LOGGER.info(alarmResponse.getAlarmId()); if (alarmPolicyRuleServiceMap.containsKey(alarmResponse.getAlarmId())) { applyActionService(alarmResponse.getAlarmId()); } }); } Loading Loading @@ -692,8 +718,14 @@ public class PolicyServiceImpl implements PolicyService { } private void applyActionService(String alarmId) { LOGGER.info("Inside applyActionService"); PolicyRuleService policyRuleService = alarmPolicyRuleServiceMap.get(alarmId); PolicyRuleAction policyRuleAction = policyRuleActionMap.get(alarmId); LOGGER.info(policyRuleService); PolicyRuleAction policyRuleAction = policyRuleService.getPolicyRuleBasic().getPolicyRuleActions().get(0); LOGGER.info(policyRuleAction); PolicyRuleAction test = policyRuleActionMap.get(alarmId); LOGGER.info(test); setPolicyRuleServiceToContext(policyRuleService, ACTIVE_POLICYRULE_STATE); Loading Loading @@ -829,16 +861,16 @@ public class PolicyServiceImpl implements PolicyService { false); case POLICY_RULE_CONDITION_NUMERICAL_GREATER_THAN: return new KpiValueRange(policyRuleCondition.getKpiValue(), null, true, false, false); return new KpiValueRange(null, policyRuleCondition.getKpiValue(), true, false, false); case POLICY_RULE_CONDITION_NUMERICAL_GREATER_THAN_EQUAL: return new KpiValueRange(policyRuleCondition.getKpiValue(), null, true, true, false); return new KpiValueRange(null, policyRuleCondition.getKpiValue(), true, true, false); case POLICY_RULE_CONDITION_NUMERICAL_LESS_THAN: return new KpiValueRange(null, policyRuleCondition.getKpiValue(), true, false, false); return new KpiValueRange(policyRuleCondition.getKpiValue(), null, true, false, false); case POLICY_RULE_CONDITION_NUMERICAL_LESS_THAN_EQUAL: return new KpiValueRange(null, policyRuleCondition.getKpiValue(), true, false, true); return new KpiValueRange(policyRuleCondition.getKpiValue(), null, true, false, true); default: return null; } Loading
src/policy/target/kubernetes/kubernetes.yml +14 −14 Original line number Diff line number Diff line Loading @@ -3,20 +3,20 @@ apiVersion: v1 kind: Service metadata: annotations: app.quarkus.io/commit-id: e31aec70f91db11fc3b56083861409c505ed1bff app.quarkus.io/build-timestamp: 2023-04-24 - 10:48:40 +0000 app.quarkus.io/commit-id: 9a700e35a479619e999e223cae9ba67482aa367e app.quarkus.io/build-timestamp: 2023-04-28 - 08:21:07 +0000 labels: app.kubernetes.io/name: policyservice app: policyservice name: policyservice spec: ports: - name: grpc-server port: 6060 targetPort: 6060 - name: http port: 8080 targetPort: 8080 - name: grpc-server port: 6060 targetPort: 6060 - name: grpc port: 6060 targetPort: 6060 Loading @@ -28,8 +28,8 @@ apiVersion: apps/v1 kind: Deployment metadata: annotations: app.quarkus.io/commit-id: e31aec70f91db11fc3b56083861409c505ed1bff app.quarkus.io/build-timestamp: 2023-04-24 - 10:48:40 +0000 app.quarkus.io/commit-id: 9a700e35a479619e999e223cae9ba67482aa367e app.quarkus.io/build-timestamp: 2023-04-28 - 08:21:07 +0000 labels: app: policyservice app.kubernetes.io/name: policyservice Loading @@ -42,8 +42,8 @@ spec: template: metadata: annotations: app.quarkus.io/commit-id: e31aec70f91db11fc3b56083861409c505ed1bff app.quarkus.io/build-timestamp: 2023-04-24 - 10:48:40 +0000 app.quarkus.io/commit-id: 9a700e35a479619e999e223cae9ba67482aa367e app.quarkus.io/build-timestamp: 2023-04-28 - 08:21:07 +0000 labels: app: policyservice app.kubernetes.io/name: policyservice Loading @@ -54,10 +54,10 @@ spec: valueFrom: fieldRef: fieldPath: metadata.namespace - name: CONTEXT_SERVICE_HOST value: contextservice - name: SERVICE_SERVICE_HOST value: serviceservice - name: CONTEXT_SERVICE_HOST value: contextservice - name: MONITORING_SERVICE_HOST value: monitoringservice image: registry.gitlab.com/teraflow-h2020/controller/policy:0.1.0 Loading @@ -74,12 +74,12 @@ spec: timeoutSeconds: 10 name: policyservice ports: - containerPort: 6060 name: grpc-server protocol: TCP - containerPort: 8080 name: http protocol: TCP - containerPort: 6060 name: grpc-server protocol: TCP - containerPort: 6060 name: grpc protocol: TCP Loading