Commit 500a0a23 authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Multiple issue resolutions:

Scripts:
- added missing commands to script run_tests_locally.sh

Device:
- added key formatting inside unsibscribe from monitoring
- added termination attribute in GetState to properly interrupt monitoring loops

Service:
- arranged service component testing, missing fixtures

Monitoring:
- added missing environment variable INFLUX_PORT
parent f71a63fd
Loading
Loading
Loading
Loading
+5 −1
Original line number Diff line number Diff line
@@ -18,8 +18,12 @@ K8S_HOSTNAME="kubernetes-master"
#kubectl delete namespace $K8S_NAMESPACE
#kubectl create namespace $K8S_NAMESPACE
#kubectl --namespace $K8S_NAMESPACE apply -f ../manifests/contextservice.yaml
#kubectl --namespace $K8S_NAMESPACE apply -f ../manifests/monitoringservice.yaml
#kubectl create secret generic influxdb-secrets --namespace=$K8S_NAMESPACE --from-literal=INFLUXDB_DB="monitoring" --from-literal=INFLUXDB_ADMIN_USER="teraflow" --from-literal=INFLUXDB_ADMIN_PASSWORD="teraflow" --from-literal=INFLUXDB_HTTP_AUTH_ENABLED="True"
#kubectl create secret generic monitoring-secrets --namespace=$K8S_NAMESPACE --from-literal=INFLUXDB_DATABASE="monitoring" --from-literal=INFLUXDB_USER="teraflow" --from-literal=INFLUXDB_PASSWORD="teraflow" --from-literal=INFLUXDB_HOSTNAME="localhost"
#kubectl --namespace $K8S_NAMESPACE expose deployment contextservice --port=6379 --type=NodePort --name=redis-tests
#echo "Waiting 10 seconds for Redis to start..."
#kubectl --namespace $K8S_NAMESPACE expose deployment monitoringservice --port=8086 --type=NodePort --name=influx-tests
#echo "Waiting 10 seconds for Redis/Influx to start..."
#sleep 10
export REDIS_SERVICE_HOST=$(kubectl get node $K8S_HOSTNAME -o 'jsonpath={.status.addresses[?(@.type=="InternalIP")].address}')
export REDIS_SERVICE_PORT=$(kubectl get service redis-tests --namespace $K8S_NAMESPACE -o 'jsonpath={.spec.ports[?(@.port==6379)].nodePort}')
+2 −1
Original line number Diff line number Diff line
@@ -373,7 +373,8 @@ class DeviceServiceServicerImpl(DeviceServiceServicer):
                msg = 'EndPointMonitor({:s}) not found.'.format(str(str_endpoint_monitor_key))
                raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)

            str_endpoint_monitor_kpi_key = key_to_str([device_uuid, db_endpoint_monitor.resource_key], separator=':')
            endpoint_monitor_resource_key = re.sub('[^A-Za-z0-9]', '.', db_endpoint_monitor.resource_key)
            str_endpoint_monitor_kpi_key = key_to_str([device_uuid, endpoint_monitor_resource_key], separator=':')
            db_endpoint_monitor_kpi : EndPointMonitorKpiModel = get_object(
                self.database, EndPointMonitorKpiModel, str_endpoint_monitor_kpi_key, raise_if_not_found=False)
            if db_endpoint_monitor_kpi is None:
+1 −2
Original line number Diff line number Diff line
@@ -20,7 +20,7 @@ class MonitoringLoop:
        self._samples_queue = samples_queue
        self._running = threading.Event()
        self._terminate = threading.Event()
        self._samples_stream = self._driver.GetState(blocking=True)
        self._samples_stream = self._driver.GetState(blocking=True, terminate=self._terminate)
        self._collector_thread = threading.Thread(target=self._collect, daemon=True)

    def _collect(self) -> None:
@@ -38,7 +38,6 @@ class MonitoringLoop:

    def stop(self):
        self._terminate.set()
        #self._samples_stream.close() # leave the work to the garbage collector by now
        self._collector_thread.join()

class MonitoringLoops:
+18 −6
Original line number Diff line number Diff line
from typing import Any, Iterator, List, Tuple, Union
import threading
from typing import Any, Iterator, List, Optional, Tuple, Union

# Special resource names to request to the driver to retrieve the specified configuration/structural resources.
# These resource names should be used with GetConfig() method.
@@ -129,21 +130,30 @@ class _Driver:
        """
        raise NotImplementedError()

    def GetState(self, blocking=False) -> Iterator[Tuple[float, str, Any]]:
    def GetState(
        self, blocking=False, terminate : Optional[threading.Event] = None
    ) -> Iterator[Tuple[float, str, Any]]:
        """ Retrieve last collected values for subscribed resources. Operates as a generator, so this method should be
            called once and will block until values are available. When values are available, it should yield each of
            them and block again until new values are available. When the driver is destroyed, GetState() can return
            instead of yield to terminate the loop.
            instead of yield to terminate the loop. Terminate enables to request interruption of the generation.
            Examples:
                # keep looping waiting for extra samples (generator loop)
                for timestamp,resource_key,resource_value in my_driver.GetState(blocking=True):
                terminate = threading.Event()
                i = 0
                for timestamp,resource_key,resource_value in my_driver.GetState(blocking=True, terminate=terminate):
                    process(timestamp, resource_key, resource_value)
                    i += 1
                    if i == 10: terminate.set()

                # just retrieve accumulated samples
                samples = my_driver.GetState(blocking=False)
                samples = my_driver.GetState(blocking=False, terminate=terminate)
                # or (as classical loop)
                for timestamp,resource_key,resource_value in my_driver.GetState(blocking=False):
                i = 0
                for timestamp,resource_key,resource_value in my_driver.GetState(blocking=False, terminate=terminate):
                    process(timestamp, resource_key, resource_value)
                    i += 1
                    if i == 10: terminate.set()
            Parameters:
                blocking : bool
                    Select the driver behaviour. In both cases, the driver will first retrieve the samples accumulated
@@ -152,6 +162,8 @@ class _Driver:
                    terminates the loop and returns. Non-blocking behaviour can be used for periodically polling the
                    driver, while blocking can be used when a separate thread is in charge of collecting the samples
                    produced by the driver.
                terminate : threading.Event
                    Signals the interruption of the GetState method as soon as possible.
            Returns:
                results : Iterator[Tuple[float, str, Any]]
                    Sequences of state sample. Each State sample contains a float Unix-like timestamps of the samples in
+4 −2
Original line number Diff line number Diff line
@@ -332,8 +332,10 @@ class EmulatedDriver(_Driver):
                results.append(True)
        return results

    def GetState(self, blocking=False) -> Iterator[Tuple[str, Any]]:
        while not self.__terminate.is_set():
    def GetState(self, blocking=False, terminate : Optional[threading.Event] = None) -> Iterator[Tuple[str, Any]]:
        while True:
            if self.__terminate.is_set(): break
            if terminate is not None and terminate.is_set(): break
            try:
                sample = self.__out_samples.get(block=blocking, timeout=0.1)
            except queue.Empty:
Loading