From 15915e3b889dc8b11c0e1c9b7bd0c0ac7ff5791f Mon Sep 17 00:00:00 2001
From: Fotis Soldatos <fsoldatos@ubitech.eu>
Date: Wed, 27 Jul 2022 14:42:58 +0300
Subject: [PATCH] feat(policy): introduce MonitoringService & MonitoringGateway
 interfaces

---
 .../policy/monitoring/MonitoringGateway.java  |  48 +++++++
 .../monitoring/MonitoringGatewayImpl.java     | 136 ++++++++++++++++++
 .../policy/monitoring/MonitoringService.java  |  48 +++++++
 .../monitoring/MonitoringServiceImpl.java     |  85 +++++++++++
 src/policy/target/kubernetes/kubernetes.yml   |  16 +--
 5 files changed, 325 insertions(+), 8 deletions(-)
 create mode 100644 src/policy/src/main/java/eu/teraflow/policy/monitoring/MonitoringGateway.java
 create mode 100644 src/policy/src/main/java/eu/teraflow/policy/monitoring/MonitoringGatewayImpl.java
 create mode 100644 src/policy/src/main/java/eu/teraflow/policy/monitoring/MonitoringService.java
 create mode 100644 src/policy/src/main/java/eu/teraflow/policy/monitoring/MonitoringServiceImpl.java

diff --git a/src/policy/src/main/java/eu/teraflow/policy/monitoring/MonitoringGateway.java b/src/policy/src/main/java/eu/teraflow/policy/monitoring/MonitoringGateway.java
new file mode 100644
index 000000000..4b9849a76
--- /dev/null
+++ b/src/policy/src/main/java/eu/teraflow/policy/monitoring/MonitoringGateway.java
@@ -0,0 +1,48 @@
+/*
+* Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package eu.teraflow.policy.monitoring;
+
+import eu.teraflow.policy.context.model.Empty;
+import eu.teraflow.policy.monitoring.model.AlarmDescriptor;
+import eu.teraflow.policy.monitoring.model.AlarmResponse;
+import eu.teraflow.policy.monitoring.model.Kpi;
+import eu.teraflow.policy.monitoring.model.KpiDescriptor;
+import eu.teraflow.policy.monitoring.model.SubsDescriptor;
+import io.smallrye.mutiny.Multi;
+import io.smallrye.mutiny.Uni;
+import java.util.List;
+
+public interface MonitoringGateway {
+
+    Uni<String> createKpi(KpiDescriptor kpiDescriptor);
+
+    Uni<KpiDescriptor> getKpiDescriptor(String kpiId);
+
+    Multi<List<Kpi>> subscribeKpi(SubsDescriptor subsDescriptor);
+
+    Uni<SubsDescriptor> getSubsDescriptor(String subscriptionId);
+
+    Uni<Empty> editKpiSubscription(SubsDescriptor subsDescriptor);
+
+    Uni<String> createKpiAlarm(AlarmDescriptor alarmDescriptor);
+
+    Uni<Empty> editKpiAlarm(AlarmDescriptor alarmDescriptor);
+
+    Uni<AlarmDescriptor> getAlarmDescriptor(String alarmId);
+
+    Multi<AlarmResponse> getAlarmResponseStream(String alarmId);
+}
diff --git a/src/policy/src/main/java/eu/teraflow/policy/monitoring/MonitoringGatewayImpl.java b/src/policy/src/main/java/eu/teraflow/policy/monitoring/MonitoringGatewayImpl.java
new file mode 100644
index 000000000..e0b4e088a
--- /dev/null
+++ b/src/policy/src/main/java/eu/teraflow/policy/monitoring/MonitoringGatewayImpl.java
@@ -0,0 +1,136 @@
+/*
+* Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package eu.teraflow.policy.monitoring;
+
+import eu.teraflow.policy.Serializer;
+import eu.teraflow.policy.context.model.Empty;
+import eu.teraflow.policy.monitoring.model.AlarmDescriptor;
+import eu.teraflow.policy.monitoring.model.AlarmResponse;
+import eu.teraflow.policy.monitoring.model.Kpi;
+import eu.teraflow.policy.monitoring.model.KpiDescriptor;
+import eu.teraflow.policy.monitoring.model.SubsDescriptor;
+import io.quarkus.grpc.GrpcClient;
+import io.smallrye.mutiny.Multi;
+import io.smallrye.mutiny.Uni;
+import java.util.List;
+import javax.enterprise.context.ApplicationScoped;
+import javax.inject.Inject;
+import monitoring.MutinyMonitoringServiceGrpc.MutinyMonitoringServiceStub;
+
+@ApplicationScoped
+public class MonitoringGatewayImpl implements MonitoringGateway {
+
+    @GrpcClient("monitoring")
+    MutinyMonitoringServiceStub streamingDelegateMonitoring;
+
+    private final Serializer serializer;
+
+    @Inject
+    public MonitoringGatewayImpl(Serializer serializer) {
+        this.serializer = serializer;
+    }
+
+    @Override
+    public Uni<String> createKpi(KpiDescriptor kpiDescriptor) {
+        final var serializedKpiDescriptor = serializer.serialize(kpiDescriptor);
+
+        return streamingDelegateMonitoring
+                .createKpi(serializedKpiDescriptor)
+                .onItem()
+                .transform(serializer::deserialize);
+    }
+
+    @Override
+    public Uni<KpiDescriptor> getKpiDescriptor(String kpiId) {
+        final var serializedKpiId = serializer.serializeKpiId(kpiId);
+
+        return streamingDelegateMonitoring
+                .getKpiDescriptor(serializedKpiId)
+                .onItem()
+                .transform(serializer::deserialize);
+    }
+
+    @Override
+    public Multi<List<Kpi>> subscribeKpi(SubsDescriptor subsDescriptor) {
+        final var serializedSubsDescriptor = serializer.serialize(subsDescriptor);
+
+        return streamingDelegateMonitoring
+                .subscribeKpi(serializedSubsDescriptor)
+                .onItem()
+                .transform(kpiList -> serializer.deserialize(kpiList.getKpiListList()));
+    }
+
+    @Override
+    public Uni<SubsDescriptor> getSubsDescriptor(String subscriptionId) {
+        final var serializedSubscriptionId = serializer.serializeSubscriptionIdId(subscriptionId);
+
+        return streamingDelegateMonitoring
+                .getSubsDescriptor(serializedSubscriptionId)
+                .onItem()
+                .transform(serializer::deserialize);
+    }
+
+    @Override
+    public Uni<Empty> editKpiSubscription(SubsDescriptor subsDescriptor) {
+        final var serializedSubsDescriptor = serializer.serialize(subsDescriptor);
+
+        return streamingDelegateMonitoring
+                .editKpiSubscription(serializedSubsDescriptor)
+                .onItem()
+                .transform(serializer::deserializeEmpty);
+    }
+
+    @Override
+    public Uni<String> createKpiAlarm(AlarmDescriptor alarmDescriptor) {
+        final var serializedAlarmDescriptor = serializer.serialize(alarmDescriptor);
+
+        return streamingDelegateMonitoring
+                .createKpiAlarm(serializedAlarmDescriptor)
+                .onItem()
+                .transform(serializer::deserialize);
+    }
+
+    @Override
+    public Uni<Empty> editKpiAlarm(AlarmDescriptor alarmDescriptor) {
+        final var serializedAlarmDescriptor = serializer.serialize(alarmDescriptor);
+
+        return streamingDelegateMonitoring
+                .editKpiAlarm(serializedAlarmDescriptor)
+                .onItem()
+                .transform(serializer::deserializeEmpty);
+    }
+
+    @Override
+    public Uni<AlarmDescriptor> getAlarmDescriptor(String alarmId) {
+        final var serializedAlarmId = serializer.serializeAlarmId(alarmId);
+
+        return streamingDelegateMonitoring
+                .getAlarmDescriptor(serializedAlarmId)
+                .onItem()
+                .transform(serializer::deserialize);
+    }
+
+    @Override
+    public Multi<AlarmResponse> getAlarmResponseStream(String alarmId) {
+        final var serializedAlarmId = serializer.serializeAlarmId(alarmId);
+
+        return streamingDelegateMonitoring
+                .getAlarmResponseStream(serializedAlarmId)
+                .onItem()
+                .transform(serializer::deserialize);
+    }
+}
diff --git a/src/policy/src/main/java/eu/teraflow/policy/monitoring/MonitoringService.java b/src/policy/src/main/java/eu/teraflow/policy/monitoring/MonitoringService.java
new file mode 100644
index 000000000..276a9d363
--- /dev/null
+++ b/src/policy/src/main/java/eu/teraflow/policy/monitoring/MonitoringService.java
@@ -0,0 +1,48 @@
+/*
+* Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package eu.teraflow.policy.monitoring;
+
+import eu.teraflow.policy.context.model.Empty;
+import eu.teraflow.policy.monitoring.model.AlarmDescriptor;
+import eu.teraflow.policy.monitoring.model.AlarmResponse;
+import eu.teraflow.policy.monitoring.model.Kpi;
+import eu.teraflow.policy.monitoring.model.KpiDescriptor;
+import eu.teraflow.policy.monitoring.model.SubsDescriptor;
+import io.smallrye.mutiny.Multi;
+import io.smallrye.mutiny.Uni;
+import java.util.List;
+
+public interface MonitoringService {
+
+    Uni<String> createKpi(KpiDescriptor kpiDescriptor);
+
+    Uni<KpiDescriptor> getKpiDescriptor(String kpiId);
+
+    Multi<List<Kpi>> subscribeKpi(SubsDescriptor subsDescriptor);
+
+    Uni<SubsDescriptor> getSubsDescriptor(String subscriptionId);
+
+    Uni<Empty> editKpiSubscription(SubsDescriptor subsDescriptor);
+
+    Uni<String> createKpiAlarm(AlarmDescriptor alarmDescriptor);
+
+    Uni<Empty> editKpiAlarm(AlarmDescriptor alarmDescriptor);
+
+    Uni<AlarmDescriptor> getAlarmDescriptor(String alarmId);
+
+    Multi<AlarmResponse> getAlarmResponseStream(String alarmId);
+}
diff --git a/src/policy/src/main/java/eu/teraflow/policy/monitoring/MonitoringServiceImpl.java b/src/policy/src/main/java/eu/teraflow/policy/monitoring/MonitoringServiceImpl.java
new file mode 100644
index 000000000..e1e79af75
--- /dev/null
+++ b/src/policy/src/main/java/eu/teraflow/policy/monitoring/MonitoringServiceImpl.java
@@ -0,0 +1,85 @@
+/*
+* Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package eu.teraflow.policy.monitoring;
+
+import eu.teraflow.policy.context.model.Empty;
+import eu.teraflow.policy.monitoring.model.AlarmDescriptor;
+import eu.teraflow.policy.monitoring.model.AlarmResponse;
+import eu.teraflow.policy.monitoring.model.Kpi;
+import eu.teraflow.policy.monitoring.model.KpiDescriptor;
+import eu.teraflow.policy.monitoring.model.SubsDescriptor;
+import io.smallrye.mutiny.Multi;
+import io.smallrye.mutiny.Uni;
+import java.util.List;
+import javax.enterprise.context.ApplicationScoped;
+import javax.inject.Inject;
+
+@ApplicationScoped
+public class MonitoringServiceImpl implements MonitoringService {
+
+    private final MonitoringGateway monitoringGateway;
+
+    @Inject
+    public MonitoringServiceImpl(MonitoringGateway monitoringGateway) {
+        this.monitoringGateway = monitoringGateway;
+    }
+
+    @Override
+    public Uni<String> createKpi(KpiDescriptor kpiDescriptor) {
+        return monitoringGateway.createKpi(kpiDescriptor);
+    }
+
+    @Override
+    public Uni<KpiDescriptor> getKpiDescriptor(String kpiId) {
+        return monitoringGateway.getKpiDescriptor(kpiId);
+    }
+
+    @Override
+    public Multi<List<Kpi>> subscribeKpi(SubsDescriptor subsDescriptor) {
+        return monitoringGateway.subscribeKpi(subsDescriptor);
+    }
+
+    @Override
+    public Uni<SubsDescriptor> getSubsDescriptor(String subscriptionId) {
+        return monitoringGateway.getSubsDescriptor(subscriptionId);
+    }
+
+    @Override
+    public Uni<Empty> editKpiSubscription(SubsDescriptor subsDescriptor) {
+        return monitoringGateway.editKpiSubscription(subsDescriptor);
+    }
+
+    @Override
+    public Uni<String> createKpiAlarm(AlarmDescriptor alarmDescriptor) {
+        return monitoringGateway.createKpiAlarm(alarmDescriptor);
+    }
+
+    @Override
+    public Uni<Empty> editKpiAlarm(AlarmDescriptor alarmDescriptor) {
+        return monitoringGateway.editKpiAlarm(alarmDescriptor);
+    }
+
+    @Override
+    public Uni<AlarmDescriptor> getAlarmDescriptor(String alarmId) {
+        return monitoringGateway.getAlarmDescriptor(alarmId);
+    }
+
+    @Override
+    public Multi<AlarmResponse> getAlarmResponseStream(String alarmId) {
+        return monitoringGateway.getAlarmResponseStream(alarmId);
+    }
+}
diff --git a/src/policy/target/kubernetes/kubernetes.yml b/src/policy/target/kubernetes/kubernetes.yml
index 06068f0f5..273778787 100644
--- a/src/policy/target/kubernetes/kubernetes.yml
+++ b/src/policy/target/kubernetes/kubernetes.yml
@@ -3,8 +3,8 @@ apiVersion: v1
 kind: Service
 metadata:
   annotations:
-    app.quarkus.io/commit-id: 1d77cb00ae8f577885de32f01f4740f865853863
-    app.quarkus.io/build-timestamp: 2022-07-26 - 10:46:55 +0000
+    app.quarkus.io/commit-id: 57e16fed85037f2415bac3b1a55997ac4967fd99
+    app.quarkus.io/build-timestamp: 2022-07-27 - 11:40:31 +0000
   labels:
     app.kubernetes.io/name: policyservice
     app: policyservice
@@ -25,8 +25,8 @@ apiVersion: apps/v1
 kind: Deployment
 metadata:
   annotations:
-    app.quarkus.io/commit-id: 1d77cb00ae8f577885de32f01f4740f865853863
-    app.quarkus.io/build-timestamp: 2022-07-26 - 10:46:55 +0000
+    app.quarkus.io/commit-id: 57e16fed85037f2415bac3b1a55997ac4967fd99
+    app.quarkus.io/build-timestamp: 2022-07-27 - 11:40:31 +0000
   labels:
     app: policyservice
     app.kubernetes.io/name: policyservice
@@ -39,8 +39,8 @@ spec:
   template:
     metadata:
       annotations:
-        app.quarkus.io/commit-id: 1d77cb00ae8f577885de32f01f4740f865853863
-        app.quarkus.io/build-timestamp: 2022-07-26 - 10:46:55 +0000
+        app.quarkus.io/commit-id: 57e16fed85037f2415bac3b1a55997ac4967fd99
+        app.quarkus.io/build-timestamp: 2022-07-27 - 11:40:31 +0000
       labels:
         app: policyservice
         app.kubernetes.io/name: policyservice
@@ -51,12 +51,12 @@ spec:
               valueFrom:
                 fieldRef:
                   fieldPath: metadata.namespace
-            - name: SERVICE_SERVICE_HOST
-              value: serviceservice
             - name: MONITORING_SERVICE_HOST
               value: monitoringservice
             - name: CONTEXT_SERVICE_HOST
               value: contextservice
+            - name: SERVICE_SERVICE_HOST
+              value: serviceservice
           image: registry.gitlab.com/teraflow-h2020/controller/policy:0.1.0
           imagePullPolicy: Always
           livenessProbe:
-- 
GitLab