From 0f7bede6a4db931b62d86859bbe7a90e57a7e733 Mon Sep 17 00:00:00 2001
From: Carlos Natalino <carlos.natalino@chalmers.se>
Date: Sat, 1 Oct 2022 17:04:00 +0200
Subject: [PATCH] Separating the monitor funcionality into a new module.

---
 src/opticalattackmanager/__init__.py         |  13 ++
 src/opticalattackmanager/service/__init__.py |  13 ++
 src/opticalattackmanager/service/__main__.py | 114 +++--------------
 src/opticalattackmanager/utils/__init__.py   |  13 ++
 src/opticalattackmanager/utils/monitor.py    | 127 +++++++++++++++++++
 5 files changed, 184 insertions(+), 96 deletions(-)
 create mode 100644 src/opticalattackmanager/utils/monitor.py

diff --git a/src/opticalattackmanager/__init__.py b/src/opticalattackmanager/__init__.py
index e69de29bb..9953c8205 100644
--- a/src/opticalattackmanager/__init__.py
+++ b/src/opticalattackmanager/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/src/opticalattackmanager/service/__init__.py b/src/opticalattackmanager/service/__init__.py
index e69de29bb..9953c8205 100644
--- a/src/opticalattackmanager/service/__init__.py
+++ b/src/opticalattackmanager/service/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/src/opticalattackmanager/service/__main__.py b/src/opticalattackmanager/service/__main__.py
index 3c907ecc2..c849a206b 100644
--- a/src/opticalattackmanager/service/__main__.py
+++ b/src/opticalattackmanager/service/__main__.py
@@ -1,3 +1,18 @@
+# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
 import asyncio
 import logging
 import signal
@@ -28,112 +43,19 @@ from common.tools.grpc.Tools import grpc_message_to_json_string
 from context.client.ContextClient import ContextClient
 from grpclib.client import Channel
 from monitoring.client.MonitoringClient import MonitoringClient
-from opticalattackmanager.Config import MONITORING_INTERVAL
 from opticalattackmanager.utils.EventsCollector import EventsCollector
-from prometheus_client import start_http_server, Histogram, Counter
+from opticalattackmanager.utils.monitor import monitor_services
+from prometheus_client import Counter, Histogram, start_http_server
 
 terminate = threading.Event()
 LOGGER = None
 
-# Create a metric to track time spent and requests made.
-# TODO: adjust histogram buckets to more realistic values
-LOOP_TIME = Histogram('optical_security_loop_seconds', 'Time taken by each security loop')
-DROP_COUNTER = Counter('optical_security_dropped_assessments', 'Dropped assessments due to detector timeout')
-
 
 def signal_handler(signal, frame):  # pylint: disable=redefined-outer-name
     LOGGER.warning("Terminate signal received")
     terminate.set()
 
 
-async def detect_attack(
-    host: str,
-    port: int,
-    context_id: str,
-    service_id: str,
-    kpi_id: str,
-    timeout: float = 10.0,
-) -> None:
-    try:
-        LOGGER.info("Sending request for {}...".format(service_id))
-        async with Channel(host, port) as channel:
-            stub = OpticalAttackDetectorServiceStub(channel)
-
-            request: DetectionRequest = DetectionRequest()
-            request.service_id.context_id.context_uuid.uuid = context_id
-            request.service_id.service_uuid.uuid = str(service_id)
-
-            request.kpi_id.kpi_id.uuid = kpi_id
-
-            await stub.DetectAttack(request, timeout=timeout)
-        LOGGER.info("Monitoring finished for {}".format(service_id))
-    except Exception as e:
-        LOGGER.warning("Exception while processing service_id {}".format(service_id))
-        LOGGER.exception(e)
-        DROP_COUNTER.inc()
-
-
-async def monitor_services(service_list: List):
-
-    monitoring_interval = int(
-        get_setting("MONITORING_INTERVAL", default=MONITORING_INTERVAL)
-    )
-
-    host = get_setting("OPTICALATTACKDETECTORSERVICE_SERVICE_HOST")
-    port = int(get_setting("OPTICALATTACKDETECTORSERVICE_SERVICE_PORT_GRPC"))
-
-    LOGGER.info("Starting execution of the async loop")
-
-    while not terminate.is_set():
-
-        if len(service_list) == 0:
-            LOGGER.debug("No services to monitor...")
-            time.sleep(monitoring_interval)
-            continue
-
-        LOGGER.info("Starting new monitoring cycle...")
-
-        start_time = time.time()
-
-        tasks = []
-        for service in service_list:
-            aw = detect_attack(
-                host,
-                port,
-                service["context"],
-                service["service"],
-                service["kpi"],
-                # allow at most 90% of the monitoring interval to succeed
-                monitoring_interval * 0.9,
-            )
-            tasks.append(aw)
-        [await aw for aw in tasks]
-
-        end_time = time.time()
-
-        time_taken = end_time - start_time
-        LOOP_TIME.observe(time_taken)
-        LOGGER.info(
-            "Monitoring loop with {} services took {:.3f} seconds ({:.2f}%)... "
-            "Waiting for {:.2f} seconds...".format(
-                len(service_list),
-                time_taken,
-                (time_taken / monitoring_interval) * 100,
-                monitoring_interval - time_taken,
-            )
-        )
-
-        if time_taken / monitoring_interval > 0.9:
-            LOGGER.warning(
-                "Monitoring loop is taking {} % of the desired time "
-                "({} seconds)".format(
-                    (time_taken / monitoring_interval) * 100, monitoring_interval
-                )
-            )
-        if monitoring_interval - time_taken > 0:
-            time.sleep(monitoring_interval - time_taken)
-
-
 def create_kpi(client: MonitoringClient, service_id):
     # create kpi
     kpi_description: KpiDescriptor = KpiDescriptor()
@@ -288,7 +210,7 @@ def main():
 
     # runs the async loop in the background
     loop = asyncio.get_event_loop()
-    loop.run_until_complete(monitor_services(service_list))
+    loop.run_until_complete(monitor_services(terminate, service_list))
     # asyncio.create_task(monitor_services(service_list))
 
     # Wait for Ctrl+C or termination signal
diff --git a/src/opticalattackmanager/utils/__init__.py b/src/opticalattackmanager/utils/__init__.py
index e69de29bb..9953c8205 100644
--- a/src/opticalattackmanager/utils/__init__.py
+++ b/src/opticalattackmanager/utils/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/src/opticalattackmanager/utils/monitor.py b/src/opticalattackmanager/utils/monitor.py
new file mode 100644
index 000000000..6f7907250
--- /dev/null
+++ b/src/opticalattackmanager/utils/monitor.py
@@ -0,0 +1,127 @@
+# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import time
+from typing import List
+
+from common.proto.asyncio.optical_attack_detector_grpc import (
+    OpticalAttackDetectorServiceStub,
+)
+from common.proto.asyncio.optical_attack_detector_pb2 import DetectionRequest
+from common.Settings import get_log_level, get_setting
+from grpclib.client import Channel
+from opticalattackmanager.Config import MONITORING_INTERVAL
+from prometheus_client import Counter, Histogram
+
+# Create a metric to track time spent and requests made.
+# TODO: adjust histogram buckets to more realistic values
+LOOP_TIME = Histogram(
+    "optical_security_loop_seconds", "Time taken by each security loop"
+)
+DROP_COUNTER = Counter(
+    "optical_security_dropped_assessments",
+    "Dropped assessments due to detector timeout",
+)
+log_level = get_log_level()
+logging.basicConfig(level=log_level)
+LOGGER = logging.getLogger(__name__)
+
+
+async def detect_attack(
+    host: str,
+    port: int,
+    context_id: str,
+    service_id: str,
+    kpi_id: str,
+    timeout: float = 10.0,
+) -> None:
+    try:
+        LOGGER.info("Sending request for {}...".format(service_id))
+        async with Channel(host, port) as channel:
+            stub = OpticalAttackDetectorServiceStub(channel)
+
+            request: DetectionRequest = DetectionRequest()
+            request.service_id.context_id.context_uuid.uuid = context_id
+            request.service_id.service_uuid.uuid = str(service_id)
+
+            request.kpi_id.kpi_id.uuid = kpi_id
+
+            await stub.DetectAttack(request, timeout=timeout)
+        LOGGER.info("Monitoring finished for {}".format(service_id))
+    except Exception as e:
+        LOGGER.warning("Exception while processing service_id {}".format(service_id))
+        LOGGER.exception(e)
+        DROP_COUNTER.inc()
+
+
+async def monitor_services(terminate, service_list: List):
+
+    monitoring_interval = int(
+        get_setting("MONITORING_INTERVAL", default=MONITORING_INTERVAL)
+    )
+
+    host = get_setting("OPTICALATTACKDETECTORSERVICE_SERVICE_HOST")
+    port = int(get_setting("OPTICALATTACKDETECTORSERVICE_SERVICE_PORT_GRPC"))
+
+    LOGGER.info("Starting execution of the async loop")
+
+    while not terminate.is_set():
+
+        if len(service_list) == 0:
+            LOGGER.debug("No services to monitor...")
+            time.sleep(monitoring_interval)
+            continue
+
+        LOGGER.info("Starting new monitoring cycle...")
+
+        start_time = time.time()
+
+        tasks = []
+        for service in service_list:
+            aw = detect_attack(
+                host,
+                port,
+                service["context"],
+                service["service"],
+                service["kpi"],
+                # allow at most 90% of the monitoring interval to succeed
+                monitoring_interval * 0.9,
+            )
+            tasks.append(aw)
+        [await aw for aw in tasks]
+
+        end_time = time.time()
+
+        time_taken = end_time - start_time
+        LOOP_TIME.observe(time_taken)
+        LOGGER.info(
+            "Monitoring loop with {} services took {:.3f} seconds ({:.2f}%)... "
+            "Waiting for {:.2f} seconds...".format(
+                len(service_list),
+                time_taken,
+                (time_taken / monitoring_interval) * 100,
+                monitoring_interval - time_taken,
+            )
+        )
+
+        if time_taken / monitoring_interval > 0.9:
+            LOGGER.warning(
+                "Monitoring loop is taking {} % of the desired time "
+                "({} seconds)".format(
+                    (time_taken / monitoring_interval) * 100, monitoring_interval
+                )
+            )
+        if monitoring_interval - time_taken > 0:
+            time.sleep(monitoring_interval - time_taken)
-- 
GitLab