diff --git a/run_tests_locally.sh b/run_tests_locally.sh index 7885567a2d85f689b7eb6f6251cef276e48c2619..6e2f3afa293d9df2ab3b47b7461834fc1745236c 100755 --- a/run_tests_locally.sh +++ b/run_tests_locally.sh @@ -18,8 +18,12 @@ K8S_HOSTNAME="kubernetes-master" #kubectl delete namespace $K8S_NAMESPACE #kubectl create namespace $K8S_NAMESPACE #kubectl --namespace $K8S_NAMESPACE apply -f ../manifests/contextservice.yaml +#kubectl --namespace $K8S_NAMESPACE apply -f ../manifests/monitoringservice.yaml +#kubectl create secret generic influxdb-secrets --namespace=$K8S_NAMESPACE --from-literal=INFLUXDB_DB="monitoring" --from-literal=INFLUXDB_ADMIN_USER="teraflow" --from-literal=INFLUXDB_ADMIN_PASSWORD="teraflow" --from-literal=INFLUXDB_HTTP_AUTH_ENABLED="True" +#kubectl create secret generic monitoring-secrets --namespace=$K8S_NAMESPACE --from-literal=INFLUXDB_DATABASE="monitoring" --from-literal=INFLUXDB_USER="teraflow" --from-literal=INFLUXDB_PASSWORD="teraflow" --from-literal=INFLUXDB_HOSTNAME="localhost" #kubectl --namespace $K8S_NAMESPACE expose deployment contextservice --port=6379 --type=NodePort --name=redis-tests -#echo "Waiting 10 seconds for Redis to start..." +#kubectl --namespace $K8S_NAMESPACE expose deployment monitoringservice --port=8086 --type=NodePort --name=influx-tests +#echo "Waiting 10 seconds for Redis/Influx to start..." #sleep 10 export REDIS_SERVICE_HOST=$(kubectl get node $K8S_HOSTNAME -o 'jsonpath={.status.addresses[?(@.type=="InternalIP")].address}') export REDIS_SERVICE_PORT=$(kubectl get service redis-tests --namespace $K8S_NAMESPACE -o 'jsonpath={.spec.ports[?(@.port==6379)].nodePort}') diff --git a/src/device/service/DeviceServiceServicerImpl.py b/src/device/service/DeviceServiceServicerImpl.py index 44a343de475c049dccc46f98a0c8625f0f19db9a..b449b05476ae5388a353642b9394430c97026c95 100644 --- a/src/device/service/DeviceServiceServicerImpl.py +++ b/src/device/service/DeviceServiceServicerImpl.py @@ -373,7 +373,8 @@ class DeviceServiceServicerImpl(DeviceServiceServicer): msg = 'EndPointMonitor({:s}) not found.'.format(str(str_endpoint_monitor_key)) raise OperationFailedException('MonitorDeviceKpi', extra_details=msg) - str_endpoint_monitor_kpi_key = key_to_str([device_uuid, db_endpoint_monitor.resource_key], separator=':') + endpoint_monitor_resource_key = re.sub('[^A-Za-z0-9]', '.', db_endpoint_monitor.resource_key) + str_endpoint_monitor_kpi_key = key_to_str([device_uuid, endpoint_monitor_resource_key], separator=':') db_endpoint_monitor_kpi : EndPointMonitorKpiModel = get_object( self.database, EndPointMonitorKpiModel, str_endpoint_monitor_kpi_key, raise_if_not_found=False) if db_endpoint_monitor_kpi is None: diff --git a/src/device/service/MonitoringLoops.py b/src/device/service/MonitoringLoops.py index c0682ddc4ff6171fe6ec89752188b79222a49cef..64bb122055411bd1a2eea442a0a5f06917020d71 100644 --- a/src/device/service/MonitoringLoops.py +++ b/src/device/service/MonitoringLoops.py @@ -20,7 +20,7 @@ class MonitoringLoop: self._samples_queue = samples_queue self._running = threading.Event() self._terminate = threading.Event() - self._samples_stream = self._driver.GetState(blocking=True) + self._samples_stream = self._driver.GetState(blocking=True, terminate=self._terminate) self._collector_thread = threading.Thread(target=self._collect, daemon=True) def _collect(self) -> None: @@ -38,7 +38,6 @@ class MonitoringLoop: def stop(self): self._terminate.set() - #self._samples_stream.close() # leave the work to the garbage collector by now self._collector_thread.join() class MonitoringLoops: diff --git a/src/device/service/driver_api/_Driver.py b/src/device/service/driver_api/_Driver.py index 73174ba53ac6e0b357d3bd790c53f429bea36d87..b52cf6498a5c7169929b7c72f3552702f602c8c7 100644 --- a/src/device/service/driver_api/_Driver.py +++ b/src/device/service/driver_api/_Driver.py @@ -1,4 +1,5 @@ -from typing import Any, Iterator, List, Tuple, Union +import threading +from typing import Any, Iterator, List, Optional, Tuple, Union # Special resource names to request to the driver to retrieve the specified configuration/structural resources. # These resource names should be used with GetConfig() method. @@ -129,21 +130,30 @@ class _Driver: """ raise NotImplementedError() - def GetState(self, blocking=False) -> Iterator[Tuple[float, str, Any]]: + def GetState( + self, blocking=False, terminate : Optional[threading.Event] = None + ) -> Iterator[Tuple[float, str, Any]]: """ Retrieve last collected values for subscribed resources. Operates as a generator, so this method should be called once and will block until values are available. When values are available, it should yield each of them and block again until new values are available. When the driver is destroyed, GetState() can return - instead of yield to terminate the loop. + instead of yield to terminate the loop. Terminate enables to request interruption of the generation. Examples: # keep looping waiting for extra samples (generator loop) - for timestamp,resource_key,resource_value in my_driver.GetState(blocking=True): + terminate = threading.Event() + i = 0 + for timestamp,resource_key,resource_value in my_driver.GetState(blocking=True, terminate=terminate): process(timestamp, resource_key, resource_value) + i += 1 + if i == 10: terminate.set() # just retrieve accumulated samples - samples = my_driver.GetState(blocking=False) + samples = my_driver.GetState(blocking=False, terminate=terminate) # or (as classical loop) - for timestamp,resource_key,resource_value in my_driver.GetState(blocking=False): + i = 0 + for timestamp,resource_key,resource_value in my_driver.GetState(blocking=False, terminate=terminate): process(timestamp, resource_key, resource_value) + i += 1 + if i == 10: terminate.set() Parameters: blocking : bool Select the driver behaviour. In both cases, the driver will first retrieve the samples accumulated @@ -152,6 +162,8 @@ class _Driver: terminates the loop and returns. Non-blocking behaviour can be used for periodically polling the driver, while blocking can be used when a separate thread is in charge of collecting the samples produced by the driver. + terminate : threading.Event + Signals the interruption of the GetState method as soon as possible. Returns: results : Iterator[Tuple[float, str, Any]] Sequences of state sample. Each State sample contains a float Unix-like timestamps of the samples in diff --git a/src/device/service/drivers/emulated/EmulatedDriver.py b/src/device/service/drivers/emulated/EmulatedDriver.py index a1af85da4e442bf7ae491f8b22b5125445a9681a..1154e4047ace1def8abfd29c327558f4fb84b75f 100644 --- a/src/device/service/drivers/emulated/EmulatedDriver.py +++ b/src/device/service/drivers/emulated/EmulatedDriver.py @@ -332,8 +332,10 @@ class EmulatedDriver(_Driver): results.append(True) return results - def GetState(self, blocking=False) -> Iterator[Tuple[str, Any]]: - while not self.__terminate.is_set(): + def GetState(self, blocking=False, terminate : Optional[threading.Event] = None) -> Iterator[Tuple[str, Any]]: + while True: + if self.__terminate.is_set(): break + if terminate is not None and terminate.is_set(): break try: sample = self.__out_samples.get(block=blocking, timeout=0.1) except queue.Empty: diff --git a/src/device/service/drivers/p4/p4_driver.py b/src/device/service/drivers/p4/p4_driver.py index 3d3abf236f016608ef93e3d63ab04ac86830da7d..0acd1647595234b3e2396b957ece8eb2e7b2f283 100644 --- a/src/device/service/drivers/p4/p4_driver.py +++ b/src/device/service/drivers/p4/p4_driver.py @@ -212,7 +212,7 @@ class P4Driver(_Driver): LOGGER.info('P4 GetResource()') return "" - def GetState(self, blocking=False) -> Iterator[Tuple[str, Any]]: + def GetState(self, blocking=False, terminate : Optional[threading.Event] = None) -> Iterator[Tuple[str, Any]]: """ Retrieves the state of a P4 device. diff --git a/src/device/service/drivers/transport_api/TransportApiDriver.py b/src/device/service/drivers/transport_api/TransportApiDriver.py index b3e5f4fa33f20836629c06968261fb1ceac8f075..794a17b99a3c3c773f04e9964b774ef9fc54cba0 100644 --- a/src/device/service/drivers/transport_api/TransportApiDriver.py +++ b/src/device/service/drivers/transport_api/TransportApiDriver.py @@ -1,5 +1,5 @@ import logging, requests, threading -from typing import Any, Iterator, List, Tuple, Union +from typing import Any, Iterator, List, Optional, Tuple, Union from common.type_checkers.Checkers import chk_string, chk_type from device.service.driver_api._Driver import _Driver from . import ALL_RESOURCE_KEYS @@ -92,6 +92,8 @@ class TransportApiDriver(_Driver): # TODO: TAPI does not support monitoring by now return [False for _ in subscriptions] - def GetState(self, blocking=False) -> Iterator[Tuple[float, str, Any]]: + def GetState( + self, blocking=False, terminate : Optional[threading.Event] = None + ) -> Iterator[Tuple[float, str, Any]]: # TODO: TAPI does not support monitoring by now return [] diff --git a/src/monitoring/.gitlab-ci.yml b/src/monitoring/.gitlab-ci.yml index 3fad3a185ed44c657af4b96ce3f026a8e0b5192b..af81abe83b84c58a558fd6b450e53fb45707e1f7 100644 --- a/src/monitoring/.gitlab-ci.yml +++ b/src/monitoring/.gitlab-ci.yml @@ -39,7 +39,7 @@ unit test monitoring: - docker pull "$CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG" - docker run --name influxdb -d -p 8086:8086 -e INFLUXDB_DB=$INFLUXDB_DATABASE -e INFLUXDB_ADMIN_USER=$INFLUXDB_USER -e INFLUXDB_ADMIN_PASSWORD=$INFLUXDB_PASSWORD -e INFLUXDB_HTTP_AUTH_ENABLED=True --network=teraflowbridge --rm influxdb:1.8 - sleep 10 - - docker run --name $IMAGE_NAME -d -p 7070:7070 --env INFLUXDB_USER=$INFLUXDB_USER --env INFLUXDB_PASSWORD=$INFLUXDB_PASSWORD --env INFLUXDB_DATABASE=$INFLUXDB_DATABASE --env INFLUXDB_HOSTNAME=influxdb -v "$PWD/src/$IMAGE_NAME/tests:/opt/results" --network=teraflowbridge --rm $CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG + - docker run --name $IMAGE_NAME -d -p 7070:7070 --env INFLUXDB_USER=$INFLUXDB_USER --env INFLUXDB_PASSWORD=$INFLUXDB_PASSWORD --env INFLUXDB_DATABASE=$INFLUXDB_DATABASE --env INFLUXDB_HOSTNAME=influxdb --env INFLUXDB_PORT=8086 -v "$PWD/src/$IMAGE_NAME/tests:/opt/results" --network=teraflowbridge --rm $CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG - sleep 30 - docker ps -a - docker logs $IMAGE_NAME diff --git a/src/service/tests/test_unitary.py b/src/service/tests/test_unitary.py index 298318a0e2ff4cd8fdf41157863cd4661d8a982c..be1c2a69fa7e1e24804d69cc6e85f136b95f73f2 100644 --- a/src/service/tests/test_unitary.py +++ b/src/service/tests/test_unitary.py @@ -92,8 +92,9 @@ class TestServiceHandlers: def test_prepare_environment( self, service_id, service_descriptor, service_endpoint_ids, service_config_rules, service_constraints, contexts, topologies, devices, links, - context_client : ContextClient, # pylint: disable=redefined-outer-name - device_client : DeviceClient): # pylint: disable=redefined-outer-name + context_client : ContextClient, # pylint: disable=redefined-outer-name + device_client : DeviceClient, # pylint: disable=redefined-outer-name + service_client : ServiceClient): # pylint: disable=redefined-outer-name for context in contexts: context_client.SetContext(Context(**context)) for topology in topologies: context_client.SetTopology(Topology(**topology)) @@ -104,6 +105,8 @@ class TestServiceHandlers: def test_service_create_error_cases( self, service_id, service_descriptor, service_endpoint_ids, service_config_rules, service_constraints, contexts, topologies, devices, links, + context_client : ContextClient, # pylint: disable=redefined-outer-name + device_client : DeviceClient, # pylint: disable=redefined-outer-name service_client : ServiceClient): # pylint: disable=redefined-outer-name with pytest.raises(grpc.RpcError) as e: @@ -143,6 +146,8 @@ class TestServiceHandlers: def test_service_create_correct( self, service_id, service_descriptor, service_endpoint_ids, service_config_rules, service_constraints, contexts, topologies, devices, links, + context_client : ContextClient, # pylint: disable=redefined-outer-name + device_client : DeviceClient, # pylint: disable=redefined-outer-name service_client : ServiceClient): # pylint: disable=redefined-outer-name service_client.CreateService(Service(**service_descriptor)) @@ -151,7 +156,9 @@ class TestServiceHandlers: def test_service_get_created( self, service_id, service_descriptor, service_endpoint_ids, service_config_rules, service_constraints, contexts, topologies, devices, links, - context_client : ContextClient): # pylint: disable=redefined-outer-name + context_client : ContextClient, # pylint: disable=redefined-outer-name + device_client : DeviceClient, # pylint: disable=redefined-outer-name + service_client : ServiceClient): # pylint: disable=redefined-outer-name service_data = context_client.GetService(ServiceId(**service_id)) LOGGER.info('service_data = {:s}'.format(grpc_message_to_json_string(service_data))) @@ -161,6 +168,7 @@ class TestServiceHandlers: self, service_id, service_descriptor, service_endpoint_ids, service_config_rules, service_constraints, contexts, topologies, devices, links, context_client : ContextClient, # pylint: disable=redefined-outer-name + device_client : DeviceClient, # pylint: disable=redefined-outer-name service_client : ServiceClient): # pylint: disable=redefined-outer-name service_with_settings = copy.deepcopy(service_descriptor) @@ -181,6 +189,7 @@ class TestServiceHandlers: self, service_id, service_descriptor, service_endpoint_ids, service_config_rules, service_constraints, contexts, topologies, devices, links, context_client : ContextClient, # pylint: disable=redefined-outer-name + device_client : DeviceClient, # pylint: disable=redefined-outer-name service_client : ServiceClient): # pylint: disable=redefined-outer-name service_with_settings = copy.deepcopy(service_descriptor) @@ -198,7 +207,9 @@ class TestServiceHandlers: def test_service_get_updated( self, service_id, service_descriptor, service_endpoint_ids, service_config_rules, service_constraints, contexts, topologies, devices, links, - context_client : ContextClient): # pylint: disable=redefined-outer-name + context_client : ContextClient, # pylint: disable=redefined-outer-name + device_client : DeviceClient, # pylint: disable=redefined-outer-name + service_client : ServiceClient): # pylint: disable=redefined-outer-name service_data = context_client.GetService(ServiceId(**service_id)) LOGGER.info('service_data = {:s}'.format(grpc_message_to_json_string(service_data))) @@ -207,6 +218,8 @@ class TestServiceHandlers: def test_service_delete( self, service_id, service_descriptor, service_endpoint_ids, service_config_rules, service_constraints, contexts, topologies, devices, links, + context_client : ContextClient, # pylint: disable=redefined-outer-name + device_client : DeviceClient, # pylint: disable=redefined-outer-name service_client : ServiceClient): # pylint: disable=redefined-outer-name service_client.DeleteService(ServiceId(**service_id)) @@ -215,8 +228,9 @@ class TestServiceHandlers: def test_cleanup_environment( self, service_id, service_descriptor, service_endpoint_ids, service_config_rules, service_constraints, contexts, topologies, devices, links, - context_client : ContextClient, # pylint: disable=redefined-outer-name - device_client : DeviceClient): # pylint: disable=redefined-outer-name + context_client : ContextClient, # pylint: disable=redefined-outer-name + device_client : DeviceClient, # pylint: disable=redefined-outer-name + service_client : ServiceClient): # pylint: disable=redefined-outer-name for link in links: context_client.RemoveLink(LinkId(**link['link_id'])) for device in devices: device_client.DeleteDevice(DeviceId(**device['device_id']))