From 500a0a232ec361b669b48cc432e9bd9c332eae8b Mon Sep 17 00:00:00 2001
From: Lluis Gifre <lluis.gifre@cttc.es>
Date: Fri, 11 Feb 2022 09:14:02 +0100
Subject: [PATCH] Multiple issue resolutions:

Scripts:
- added missing commands to script run_tests_locally.sh

Device:
- added resource key formatting when unsubscribing from monitoring
- added terminate parameter to GetState to properly interrupt monitoring loops (see the usage sketch below)

Service:
- fixed service component tests by adding the missing client fixtures

Monitoring:
- added missing environment variable INFLUXDB_PORT
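
Usage sketch (illustrative only): the snippet below shows how a collector can
use the new terminate parameter to interrupt a driver GetState() generator, and
how resource keys are sanitized before unsubscribing. FakeDriver, the sample
resource key and the timestamp are hypothetical; only the
GetState(blocking, terminate) signature and the re.sub('[^A-Za-z0-9]', '.', ...)
pattern come from this patch.

    import queue, re, threading
    from typing import Any, Iterator, Optional, Tuple

    class FakeDriver:
        # Hypothetical driver; it mirrors only the GetState(blocking, terminate) API.
        def __init__(self):
            self.out_samples = queue.Queue()
            self._terminate = threading.Event()   # driver-side termination (destroy)

        def GetState(
            self, blocking=False, terminate : Optional[threading.Event] = None
        ) -> Iterator[Tuple[float, str, Any]]:
            while True:
                if self._terminate.is_set(): break                      # driver destroyed
                if terminate is not None and terminate.is_set(): break  # caller interrupt
                try:
                    yield self.out_samples.get(block=blocking, timeout=0.1)
                except queue.Empty:
                    if not blocking: return

    # Collector side: setting the event stops the generator loop.
    terminate = threading.Event()
    driver = FakeDriver()
    sample = (1644566042.0, '/endpoints/endpoint[13/1/2]/state/packets_received', 42)
    driver.out_samples.put(sample)
    for timestamp, resource_key, value in driver.GetState(blocking=False, terminate=terminate):
        print(timestamp, resource_key, value)
        terminate.set()  # stop after the first sample

    # Key sanitization applied before unsubscribing: non-alphanumerics become dots.
    print(re.sub('[^A-Za-z0-9]', '.', '/endpoints/endpoint[13/1/2]/state/packets_received'))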
---
 run_tests_locally.sh                          |  6 ++++-
 .../service/DeviceServiceServicerImpl.py      |  3 ++-
 src/device/service/MonitoringLoops.py         |  3 +--
 src/device/service/driver_api/_Driver.py      | 24 ++++++++++++-----
 .../drivers/emulated/EmulatedDriver.py        |  6 +++--
 src/device/service/drivers/p4/p4_driver.py    |  2 +-
 .../transport_api/TransportApiDriver.py       |  6 +++--
 src/monitoring/.gitlab-ci.yml                 |  2 +-
 src/service/tests/test_unitary.py             | 26 ++++++++++++++-----
 9 files changed, 56 insertions(+), 22 deletions(-)

diff --git a/run_tests_locally.sh b/run_tests_locally.sh
index 7885567a2..6e2f3afa2 100755
--- a/run_tests_locally.sh
+++ b/run_tests_locally.sh
@@ -18,8 +18,12 @@ K8S_HOSTNAME="kubernetes-master"
 #kubectl delete namespace $K8S_NAMESPACE
 #kubectl create namespace $K8S_NAMESPACE
 #kubectl --namespace $K8S_NAMESPACE apply -f ../manifests/contextservice.yaml
+#kubectl --namespace $K8S_NAMESPACE apply -f ../manifests/monitoringservice.yaml
+#kubectl create secret generic influxdb-secrets --namespace=$K8S_NAMESPACE --from-literal=INFLUXDB_DB="monitoring" --from-literal=INFLUXDB_ADMIN_USER="teraflow" --from-literal=INFLUXDB_ADMIN_PASSWORD="teraflow" --from-literal=INFLUXDB_HTTP_AUTH_ENABLED="True"
+#kubectl create secret generic monitoring-secrets --namespace=$K8S_NAMESPACE --from-literal=INFLUXDB_DATABASE="monitoring" --from-literal=INFLUXDB_USER="teraflow" --from-literal=INFLUXDB_PASSWORD="teraflow" --from-literal=INFLUXDB_HOSTNAME="localhost"
 #kubectl --namespace $K8S_NAMESPACE expose deployment contextservice --port=6379 --type=NodePort --name=redis-tests
-#echo "Waiting 10 seconds for Redis to start..."
+#kubectl --namespace $K8S_NAMESPACE expose deployment monitoringservice --port=8086 --type=NodePort --name=influx-tests
+#echo "Waiting 10 seconds for Redis/Influx to start..."
 #sleep 10
 export REDIS_SERVICE_HOST=$(kubectl get node $K8S_HOSTNAME -o 'jsonpath={.status.addresses[?(@.type=="InternalIP")].address}')
 export REDIS_SERVICE_PORT=$(kubectl get service redis-tests --namespace $K8S_NAMESPACE -o 'jsonpath={.spec.ports[?(@.port==6379)].nodePort}')
diff --git a/src/device/service/DeviceServiceServicerImpl.py b/src/device/service/DeviceServiceServicerImpl.py
index 44a343de4..b449b0547 100644
--- a/src/device/service/DeviceServiceServicerImpl.py
+++ b/src/device/service/DeviceServiceServicerImpl.py
@@ -373,7 +373,8 @@ class DeviceServiceServicerImpl(DeviceServiceServicer):
                 msg = 'EndPointMonitor({:s}) not found.'.format(str(str_endpoint_monitor_key))
                 raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)
 
-            str_endpoint_monitor_kpi_key = key_to_str([device_uuid, db_endpoint_monitor.resource_key], separator=':')
+            endpoint_monitor_resource_key = re.sub('[^A-Za-z0-9]', '.', db_endpoint_monitor.resource_key)
+            str_endpoint_monitor_kpi_key = key_to_str([device_uuid, endpoint_monitor_resource_key], separator=':')
             db_endpoint_monitor_kpi : EndPointMonitorKpiModel = get_object(
                 self.database, EndPointMonitorKpiModel, str_endpoint_monitor_kpi_key, raise_if_not_found=False)
             if db_endpoint_monitor_kpi is None:
diff --git a/src/device/service/MonitoringLoops.py b/src/device/service/MonitoringLoops.py
index c0682ddc4..64bb12205 100644
--- a/src/device/service/MonitoringLoops.py
+++ b/src/device/service/MonitoringLoops.py
@@ -20,7 +20,7 @@ class MonitoringLoop:
         self._samples_queue = samples_queue
         self._running = threading.Event()
         self._terminate = threading.Event()
-        self._samples_stream = self._driver.GetState(blocking=True)
+        self._samples_stream = self._driver.GetState(blocking=True, terminate=self._terminate)
         self._collector_thread = threading.Thread(target=self._collect, daemon=True)
 
     def _collect(self) -> None:
@@ -38,7 +38,6 @@ class MonitoringLoop:
 
     def stop(self):
         self._terminate.set()
-        #self._samples_stream.close() # leave the work to the garbage collector by now
         self._collector_thread.join()
 
 class MonitoringLoops:
diff --git a/src/device/service/driver_api/_Driver.py b/src/device/service/driver_api/_Driver.py
index 73174ba53..b52cf6498 100644
--- a/src/device/service/driver_api/_Driver.py
+++ b/src/device/service/driver_api/_Driver.py
@@ -1,4 +1,5 @@
-from typing import Any, Iterator, List, Tuple, Union
+import threading
+from typing import Any, Iterator, List, Optional, Tuple, Union
 
 # Special resource names to request to the driver to retrieve the specified configuration/structural resources.
 # These resource names should be used with GetConfig() method.
@@ -129,21 +130,30 @@ class _Driver:
         """
         raise NotImplementedError()
 
-    def GetState(self, blocking=False) -> Iterator[Tuple[float, str, Any]]:
+    def GetState(
+        self, blocking=False, terminate : Optional[threading.Event] = None
+    ) -> Iterator[Tuple[float, str, Any]]:
         """ Retrieve last collected values for subscribed resources. Operates as a generator, so this method should be
             called once and will block until values are available. When values are available, it should yield each of
             them and block again until new values are available. When the driver is destroyed, GetState() can return
-            instead of yield to terminate the loop.
+            instead of yield to terminate the loop. The terminate event can be set to request interruption of the generator.
             Examples:
                 # keep looping waiting for extra samples (generator loop)
-                for timestamp,resource_key,resource_value in my_driver.GetState(blocking=True):
+                terminate = threading.Event()
+                i = 0
+                for timestamp,resource_key,resource_value in my_driver.GetState(blocking=True, terminate=terminate):
                     process(timestamp, resource_key, resource_value)
+                    i += 1
+                    if i == 10: terminate.set()
 
                 # just retrieve accumulated samples
-                samples = my_driver.GetState(blocking=False)
+                samples = my_driver.GetState(blocking=False, terminate=terminate)
                 # or (as classical loop)
-                for timestamp,resource_key,resource_value in my_driver.GetState(blocking=False):
+                i = 0
+                for timestamp,resource_key,resource_value in my_driver.GetState(blocking=False, terminate=terminate):
                     process(timestamp, resource_key, resource_value)
+                    i += 1
+                    if i == 10: terminate.set()
             Parameters:
                 blocking : bool
                     Select the driver behaviour. In both cases, the driver will first retrieve the samples accumulated
@@ -152,6 +162,8 @@ class _Driver:
                     terminates the loop and returns. Non-blocking behaviour can be used for periodically polling the
                     driver, while blocking can be used when a separate thread is in charge of collecting the samples
                     produced by the driver.
+                terminate : threading.Event
+                    When set, signals GetState to interrupt the sample generation and return as soon as possible.
             Returns:
                 results : Iterator[Tuple[float, str, Any]]
                     Sequences of state sample. Each State sample contains a float Unix-like timestamps of the samples in
diff --git a/src/device/service/drivers/emulated/EmulatedDriver.py b/src/device/service/drivers/emulated/EmulatedDriver.py
index a1af85da4..1154e4047 100644
--- a/src/device/service/drivers/emulated/EmulatedDriver.py
+++ b/src/device/service/drivers/emulated/EmulatedDriver.py
@@ -332,8 +332,10 @@ class EmulatedDriver(_Driver):
                 results.append(True)
         return results
 
-    def GetState(self, blocking=False) -> Iterator[Tuple[str, Any]]:
-        while not self.__terminate.is_set():
+    def GetState(self, blocking=False, terminate : Optional[threading.Event] = None) -> Iterator[Tuple[str, Any]]:
+        while True:
+            if self.__terminate.is_set(): break
+            if terminate is not None and terminate.is_set(): break
             try:
                 sample = self.__out_samples.get(block=blocking, timeout=0.1)
             except queue.Empty:
diff --git a/src/device/service/drivers/p4/p4_driver.py b/src/device/service/drivers/p4/p4_driver.py
index 3d3abf236..0acd16475 100644
--- a/src/device/service/drivers/p4/p4_driver.py
+++ b/src/device/service/drivers/p4/p4_driver.py
@@ -212,7 +212,7 @@ class P4Driver(_Driver):
         LOGGER.info('P4 GetResource()')
         return ""
 
-    def GetState(self, blocking=False) -> Iterator[Tuple[str, Any]]:
+    def GetState(self, blocking=False, terminate : Optional[threading.Event] = None) -> Iterator[Tuple[str, Any]]:
         """
         Retrieves the state of a P4 device.
 
diff --git a/src/device/service/drivers/transport_api/TransportApiDriver.py b/src/device/service/drivers/transport_api/TransportApiDriver.py
index b3e5f4fa3..794a17b99 100644
--- a/src/device/service/drivers/transport_api/TransportApiDriver.py
+++ b/src/device/service/drivers/transport_api/TransportApiDriver.py
@@ -1,5 +1,5 @@
 import logging, requests, threading
-from typing import Any, Iterator, List, Tuple, Union
+from typing import Any, Iterator, List, Optional, Tuple, Union
 from common.type_checkers.Checkers import chk_string, chk_type
 from device.service.driver_api._Driver import _Driver
 from . import ALL_RESOURCE_KEYS
@@ -92,6 +92,8 @@ class TransportApiDriver(_Driver):
         # TODO: TAPI does not support monitoring by now
         return [False for _ in subscriptions]
 
-    def GetState(self, blocking=False) -> Iterator[Tuple[float, str, Any]]:
+    def GetState(
+        self, blocking=False, terminate : Optional[threading.Event] = None
+    ) -> Iterator[Tuple[float, str, Any]]:
         # TODO: TAPI does not support monitoring by now
         return []
diff --git a/src/monitoring/.gitlab-ci.yml b/src/monitoring/.gitlab-ci.yml
index 3fad3a185..af81abe83 100644
--- a/src/monitoring/.gitlab-ci.yml
+++ b/src/monitoring/.gitlab-ci.yml
@@ -39,7 +39,7 @@ unit test monitoring:
     - docker pull "$CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG"
     - docker run --name influxdb -d -p 8086:8086 -e INFLUXDB_DB=$INFLUXDB_DATABASE -e INFLUXDB_ADMIN_USER=$INFLUXDB_USER -e INFLUXDB_ADMIN_PASSWORD=$INFLUXDB_PASSWORD -e INFLUXDB_HTTP_AUTH_ENABLED=True --network=teraflowbridge --rm influxdb:1.8
     - sleep 10
-    - docker run --name $IMAGE_NAME -d -p 7070:7070 --env INFLUXDB_USER=$INFLUXDB_USER --env INFLUXDB_PASSWORD=$INFLUXDB_PASSWORD --env INFLUXDB_DATABASE=$INFLUXDB_DATABASE --env INFLUXDB_HOSTNAME=influxdb -v "$PWD/src/$IMAGE_NAME/tests:/opt/results" --network=teraflowbridge --rm $CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG
+    - docker run --name $IMAGE_NAME -d -p 7070:7070 --env INFLUXDB_USER=$INFLUXDB_USER --env INFLUXDB_PASSWORD=$INFLUXDB_PASSWORD --env INFLUXDB_DATABASE=$INFLUXDB_DATABASE --env INFLUXDB_HOSTNAME=influxdb --env INFLUXDB_PORT=8086 -v "$PWD/src/$IMAGE_NAME/tests:/opt/results" --network=teraflowbridge --rm $CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG
     - sleep 30
     - docker ps -a
     - docker logs $IMAGE_NAME
diff --git a/src/service/tests/test_unitary.py b/src/service/tests/test_unitary.py
index 298318a0e..be1c2a69f 100644
--- a/src/service/tests/test_unitary.py
+++ b/src/service/tests/test_unitary.py
@@ -92,8 +92,9 @@ class TestServiceHandlers:
     def test_prepare_environment(
         self, service_id, service_descriptor, service_endpoint_ids, service_config_rules, service_constraints,
         contexts, topologies, devices, links,
-        context_client : ContextClient, # pylint: disable=redefined-outer-name
-        device_client : DeviceClient):  # pylint: disable=redefined-outer-name
+        context_client : ContextClient,     # pylint: disable=redefined-outer-name
+        device_client : DeviceClient,       # pylint: disable=redefined-outer-name
+        service_client : ServiceClient):    # pylint: disable=redefined-outer-name
 
         for context in contexts: context_client.SetContext(Context(**context))
         for topology in topologies: context_client.SetTopology(Topology(**topology))
@@ -104,6 +105,8 @@ class TestServiceHandlers:
     def test_service_create_error_cases(
         self, service_id, service_descriptor, service_endpoint_ids, service_config_rules, service_constraints,
         contexts, topologies, devices, links,
+        context_client : ContextClient,     # pylint: disable=redefined-outer-name
+        device_client : DeviceClient,       # pylint: disable=redefined-outer-name
         service_client : ServiceClient):    # pylint: disable=redefined-outer-name
 
         with pytest.raises(grpc.RpcError) as e:
@@ -143,6 +146,8 @@ class TestServiceHandlers:
     def test_service_create_correct(
         self, service_id, service_descriptor, service_endpoint_ids, service_config_rules, service_constraints,
         contexts, topologies, devices, links,
+        context_client : ContextClient,     # pylint: disable=redefined-outer-name
+        device_client : DeviceClient,       # pylint: disable=redefined-outer-name
         service_client : ServiceClient):    # pylint: disable=redefined-outer-name
 
         service_client.CreateService(Service(**service_descriptor))
@@ -151,7 +156,9 @@ class TestServiceHandlers:
     def test_service_get_created(
         self, service_id, service_descriptor, service_endpoint_ids, service_config_rules, service_constraints,
         contexts, topologies, devices, links,
-        context_client : ContextClient):    # pylint: disable=redefined-outer-name
+        context_client : ContextClient,     # pylint: disable=redefined-outer-name
+        device_client : DeviceClient,       # pylint: disable=redefined-outer-name
+        service_client : ServiceClient):    # pylint: disable=redefined-outer-name
 
         service_data = context_client.GetService(ServiceId(**service_id))
         LOGGER.info('service_data = {:s}'.format(grpc_message_to_json_string(service_data)))
@@ -161,6 +168,7 @@ class TestServiceHandlers:
         self, service_id, service_descriptor, service_endpoint_ids, service_config_rules, service_constraints,
         contexts, topologies, devices, links,
         context_client : ContextClient,     # pylint: disable=redefined-outer-name
+        device_client : DeviceClient,       # pylint: disable=redefined-outer-name
         service_client : ServiceClient):    # pylint: disable=redefined-outer-name
 
         service_with_settings = copy.deepcopy(service_descriptor)
@@ -181,6 +189,7 @@ class TestServiceHandlers:
         self, service_id, service_descriptor, service_endpoint_ids, service_config_rules, service_constraints,
         contexts, topologies, devices, links,
         context_client : ContextClient,     # pylint: disable=redefined-outer-name
+        device_client : DeviceClient,       # pylint: disable=redefined-outer-name
         service_client : ServiceClient):    # pylint: disable=redefined-outer-name
 
         service_with_settings = copy.deepcopy(service_descriptor)
@@ -198,7 +207,9 @@ class TestServiceHandlers:
     def test_service_get_updated(
         self, service_id, service_descriptor, service_endpoint_ids, service_config_rules, service_constraints,
         contexts, topologies, devices, links,
-        context_client : ContextClient):    # pylint: disable=redefined-outer-name
+        context_client : ContextClient,     # pylint: disable=redefined-outer-name
+        device_client : DeviceClient,       # pylint: disable=redefined-outer-name
+        service_client : ServiceClient):    # pylint: disable=redefined-outer-name
 
         service_data = context_client.GetService(ServiceId(**service_id))
         LOGGER.info('service_data = {:s}'.format(grpc_message_to_json_string(service_data)))
@@ -207,6 +218,8 @@ class TestServiceHandlers:
     def test_service_delete(
         self, service_id, service_descriptor, service_endpoint_ids, service_config_rules, service_constraints,
         contexts, topologies, devices, links,
+        context_client : ContextClient,     # pylint: disable=redefined-outer-name
+        device_client : DeviceClient,       # pylint: disable=redefined-outer-name
         service_client : ServiceClient):    # pylint: disable=redefined-outer-name
 
         service_client.DeleteService(ServiceId(**service_id))
@@ -215,8 +228,9 @@ class TestServiceHandlers:
     def test_cleanup_environment(
         self, service_id, service_descriptor, service_endpoint_ids, service_config_rules, service_constraints,
         contexts, topologies, devices, links,
-        context_client : ContextClient, # pylint: disable=redefined-outer-name
-        device_client : DeviceClient):  # pylint: disable=redefined-outer-name
+        context_client : ContextClient,     # pylint: disable=redefined-outer-name
+        device_client : DeviceClient,       # pylint: disable=redefined-outer-name
+        service_client : ServiceClient):    # pylint: disable=redefined-outer-name
 
         for link in links: context_client.RemoveLink(LinkId(**link['link_id']))
         for device in devices: device_client.DeleteDevice(DeviceId(**device['device_id']))
-- 
GitLab