From bb8344033c6afaaec1bbce62dbea7d04f130d8e4 Mon Sep 17 00:00:00 2001
From: kpoulakakis <kpoulakakis@ubitech.eu>
Date: Fri, 9 Feb 2024 15:47:12 +0200
Subject: [PATCH] refactor: refactor add policy device two add the policyId to
 the subscription list.

---
 .../etsi/tfs/policy/PolicyServiceImpl.java    | 29 ++++++++++++++-----
 1 file changed, 21 insertions(+), 8 deletions(-)

diff --git a/src/policy/src/main/java/org/etsi/tfs/policy/PolicyServiceImpl.java b/src/policy/src/main/java/org/etsi/tfs/policy/PolicyServiceImpl.java
index 716949e65..e276694a7 100644
--- a/src/policy/src/main/java/org/etsi/tfs/policy/PolicyServiceImpl.java
+++ b/src/policy/src/main/java/org/etsi/tfs/policy/PolicyServiceImpl.java
@@ -322,10 +322,10 @@ public class PolicyServiceImpl implements PolicyService {
 
         return areDevicesValid
                 .onItem()
-                .transform(areDevices -> addDeviceOnContext(areDevices, policyRuleDevice, policyRuleBasic));
+                .transform(areDevices -> areDeviceOnContext(areDevices, policyRuleDevice, policyRuleBasic));
     }
 
-    private PolicyRuleState addDeviceOnContext(
+    private PolicyRuleState areDeviceOnContext(
             List<Boolean> areDevices,
             PolicyRuleDevice policyRuleDevice,
             PolicyRuleBasic policyRuleBasic) {
@@ -353,7 +353,22 @@ public class PolicyServiceImpl implements PolicyService {
             return policyRuleState;
         }
 
-        contextService.setPolicyRule(policyRule).subscribe().with(x -> {});
+        contextService
+                .setPolicyRule(policyRule)
+                .subscribe()
+                .with(
+                        policyId -> {
+                            startMonitoringBasedOnAlarmDescriptors(
+                                    policyId, policyRuleDevice, alarmDescriptorList);
+                        });
+
+        return VALIDATED_POLICYRULE_STATE;
+    }
+
+    private void startMonitoringBasedOnAlarmDescriptors(
+            String policyId,
+            PolicyRuleDevice policyRuleDevice,
+            List<AlarmDescriptor> alarmDescriptorList) {
         setPolicyRuleDeviceToContext(policyRuleDevice, VALIDATED_POLICYRULE_STATE);
         noAlarms = 0;
 
@@ -366,14 +381,12 @@ public class PolicyServiceImpl implements PolicyService {
         final var multi = Multi.createBy().merging().streams(alarmResponseStreamList);
         setPolicyRuleDeviceToContext(policyRuleDevice, PROVISIONED_POLICYRULE_STATE);
 
-        monitorAlarmResponseForDevice(multi);
+        subscriptionList.put(policyId, monitorAlarmResponseForDevice(multi));
 
         // TODO: Resubscribe to the stream, if it has ended
 
         // TODO: Redesign evaluation of action
         // evaluateAction(policyRule, alarmDescriptorList, multi);
-
-        return VALIDATED_POLICYRULE_STATE;
     }
 
     private List<Multi<AlarmResponse>> getAlarmResponse(
@@ -569,8 +582,8 @@ public class PolicyServiceImpl implements PolicyService {
                         });
     }
 
-    private void monitorAlarmResponseForDevice(Multi<AlarmResponse> multi) {
-        multi
+    private Cancellable monitorAlarmResponseForDevice(Multi<AlarmResponse> multi) {
+        return multi
                 .subscribe()
                 .with(
                         alarmResponse -> {
-- 
GitLab