Loading src/telemetry/backend/tests/test_backend.py +81 −32 Original line number Diff line number Diff line Loading @@ -12,28 +12,38 @@ # See the License for the specific language governing permissions and # limitations under the License. import pytest import logging import pytest import time from telemetry.backend.service.TelemetryBackendService import TelemetryBackendService from .messages import create_collector_request, _create_kpi_descriptor, _create_kpi_id from .Fixtures import context_client, device_client, service_client, kpi_manager_client from .add_devices import load_topology from common.tools.context_queries.Topology import get_topology from common.Constants import DEFAULT_CONTEXT_NAME from common.tools.context_queries.Device import get_device, add_device_to_topology # from common.tools.context_queries.EndPoint import get_endpoint_names #from .EndPoint import get_endpoint_names # modofied version of get_endpoint_names from common.tools.kafka.Variables import KafkaTopic from .add_devices import load_topology from .Fixtures import context_client, device_client, service_client, kpi_manager_client from .messages import create_collector_request, _create_kpi_descriptor, _create_kpi_id from common.Constants import DEFAULT_CONTEXT_NAME from common.proto.context_pb2 import EndPointId, DeviceId, TopologyId, ContextId , Empty from common.proto.kpi_manager_pb2 import KpiId from common.tools.context_queries.Device import get_device, add_device_to_topology from common.tools.context_queries.Topology import get_topology from telemetry.backend.service.HelperMethods import get_subscription_parameters from telemetry.backend.service.TelemetryBackendService import TelemetryBackendService from telemetry.backend.service.HelperMethods import get_collector_by_kpi_id from telemetry.backend.service.collector_api.DriverFactory import DriverFactory from telemetry.backend.service.collector_api.DriverInstanceCache import DriverInstanceCache, preload_drivers from telemetry.backend.service.collectors import COLLECTORS from telemetry.backend.service.collectors.gnmi_oc.GnmiOpenConfigCollector import GNMIOpenConfigCollector LOGGER = logging.getLogger(__name__) LOGGER.setLevel(logging.DEBUG) ########################### # Tests Implementation of Telemetry Backend ########################### # -------------------------------------------------------------- # -------------------- FIXTURES -------------------------------- # -------------------------------------------------------------- @pytest.fixture(autouse=True) def log_all_methods(request): Loading @@ -45,6 +55,66 @@ def log_all_methods(request): yield LOGGER.info(f" <<<<< Finished test: {request.node.name} ") @pytest.fixture def telemetry_backend_service(): LOGGER.info('Initializing TelemetryBackendService...') KafkaTopic.create_all_topics() driver_factory = DriverFactory(COLLECTORS) driver_instance_cache = DriverInstanceCache(driver_factory) _service = TelemetryBackendService(driver_instance_cache) _service.start() LOGGER.info('Preloading collectors...') preload_drivers(driver_instance_cache) LOGGER.info('Yielding TelemetryBackendService...') yield _service LOGGER.info('Terminating TelemetryBackendService...') _service.stop() LOGGER.info('Terminated TelemetryBackendService...') # -------------------------------------------------------------- # -------------------- TESTS ----------------------------------- # -------------------------------------------------------------- # The following conditions to completly test this method (not for Gitlab CI/CD) # 1. A KPI Descriptor must be added in KPI DB with correct device_id. (gNMI OpenConfig driver) # 2. A collector must be created for the KPI ID. # 3. A container lab should be running with the gNMI OpenConfig device. # def test_generic_collector_handler( # telemetry_backend_service, kpi_manager_client, context_client # ): # telemetry_backend_service.GenericCollectorHandler( # collector_id="xxx", # kpi_id="6e22f180-ba28-4641-b190-2287bf447777", # duration=10.0, # interval=5.0, # stop_event="" # ) # def test_helper_get_subscription_parameters(kpi_manager_client, context_client): # """ # This test validates the correct retrieval of subscription parameters based on a KPI ID. # Preconditions: # - A KPI Descriptor must be added in KPI DB with correct device_id. # """ # kpi_id = "6e22f180-ba28-4641-b190-2287bf447777" # Example KPI ID # subscription_params = get_subscription_parameters( # kpi_id, kpi_manager_client, context_client, 10.0, 5.0 # ) # LOGGER.info(f"Subscription Parameters: {subscription_params}") # assert subscription_params is not None # ------------------------------------------------------------------- # ---------------------------- OLD TESTS ---------------------------- # -------------------------------------------------------------------- # # ----- Add Topology ----- # def test_add_to_topology(context_client, device_client, service_client): # load_topology(context_client, device_client) Loading Loading @@ -158,24 +228,3 @@ def log_all_methods(request): # context_id.context_uuid.uuid = "e7d46baa-d38d-5b72-a082-f344274b63ef" # response = context_client.RemoveContext(context_id) # LOGGER.info(f"Context removed: {response}") @pytest.fixture def telemetryBackend_service(): LOGGER.info('Initializing TelemetryBackendService...') _service = TelemetryBackendService() _service.start() LOGGER.info('Yielding TelemetryBackendService...') yield _service LOGGER.info('Terminating TelemetryBackendService...') _service.stop() LOGGER.info('Terminated TelemetryBackendService...') def test_InitiateCollectorBackend(telemetryBackend_service): LOGGER.info(" Backend Initiated Successfully. Waiting for timer to finish ...") time.sleep(30) LOGGER.info(" Backend Timer Finished Successfully. ") Loading
src/telemetry/backend/tests/test_backend.py +81 −32 Original line number Diff line number Diff line Loading @@ -12,28 +12,38 @@ # See the License for the specific language governing permissions and # limitations under the License. import pytest import logging import pytest import time from telemetry.backend.service.TelemetryBackendService import TelemetryBackendService from .messages import create_collector_request, _create_kpi_descriptor, _create_kpi_id from .Fixtures import context_client, device_client, service_client, kpi_manager_client from .add_devices import load_topology from common.tools.context_queries.Topology import get_topology from common.Constants import DEFAULT_CONTEXT_NAME from common.tools.context_queries.Device import get_device, add_device_to_topology # from common.tools.context_queries.EndPoint import get_endpoint_names #from .EndPoint import get_endpoint_names # modofied version of get_endpoint_names from common.tools.kafka.Variables import KafkaTopic from .add_devices import load_topology from .Fixtures import context_client, device_client, service_client, kpi_manager_client from .messages import create_collector_request, _create_kpi_descriptor, _create_kpi_id from common.Constants import DEFAULT_CONTEXT_NAME from common.proto.context_pb2 import EndPointId, DeviceId, TopologyId, ContextId , Empty from common.proto.kpi_manager_pb2 import KpiId from common.tools.context_queries.Device import get_device, add_device_to_topology from common.tools.context_queries.Topology import get_topology from telemetry.backend.service.HelperMethods import get_subscription_parameters from telemetry.backend.service.TelemetryBackendService import TelemetryBackendService from telemetry.backend.service.HelperMethods import get_collector_by_kpi_id from telemetry.backend.service.collector_api.DriverFactory import DriverFactory from telemetry.backend.service.collector_api.DriverInstanceCache import DriverInstanceCache, preload_drivers from telemetry.backend.service.collectors import COLLECTORS from telemetry.backend.service.collectors.gnmi_oc.GnmiOpenConfigCollector import GNMIOpenConfigCollector LOGGER = logging.getLogger(__name__) LOGGER.setLevel(logging.DEBUG) ########################### # Tests Implementation of Telemetry Backend ########################### # -------------------------------------------------------------- # -------------------- FIXTURES -------------------------------- # -------------------------------------------------------------- @pytest.fixture(autouse=True) def log_all_methods(request): Loading @@ -45,6 +55,66 @@ def log_all_methods(request): yield LOGGER.info(f" <<<<< Finished test: {request.node.name} ") @pytest.fixture def telemetry_backend_service(): LOGGER.info('Initializing TelemetryBackendService...') KafkaTopic.create_all_topics() driver_factory = DriverFactory(COLLECTORS) driver_instance_cache = DriverInstanceCache(driver_factory) _service = TelemetryBackendService(driver_instance_cache) _service.start() LOGGER.info('Preloading collectors...') preload_drivers(driver_instance_cache) LOGGER.info('Yielding TelemetryBackendService...') yield _service LOGGER.info('Terminating TelemetryBackendService...') _service.stop() LOGGER.info('Terminated TelemetryBackendService...') # -------------------------------------------------------------- # -------------------- TESTS ----------------------------------- # -------------------------------------------------------------- # The following conditions to completly test this method (not for Gitlab CI/CD) # 1. A KPI Descriptor must be added in KPI DB with correct device_id. (gNMI OpenConfig driver) # 2. A collector must be created for the KPI ID. # 3. A container lab should be running with the gNMI OpenConfig device. # def test_generic_collector_handler( # telemetry_backend_service, kpi_manager_client, context_client # ): # telemetry_backend_service.GenericCollectorHandler( # collector_id="xxx", # kpi_id="6e22f180-ba28-4641-b190-2287bf447777", # duration=10.0, # interval=5.0, # stop_event="" # ) # def test_helper_get_subscription_parameters(kpi_manager_client, context_client): # """ # This test validates the correct retrieval of subscription parameters based on a KPI ID. # Preconditions: # - A KPI Descriptor must be added in KPI DB with correct device_id. # """ # kpi_id = "6e22f180-ba28-4641-b190-2287bf447777" # Example KPI ID # subscription_params = get_subscription_parameters( # kpi_id, kpi_manager_client, context_client, 10.0, 5.0 # ) # LOGGER.info(f"Subscription Parameters: {subscription_params}") # assert subscription_params is not None # ------------------------------------------------------------------- # ---------------------------- OLD TESTS ---------------------------- # -------------------------------------------------------------------- # # ----- Add Topology ----- # def test_add_to_topology(context_client, device_client, service_client): # load_topology(context_client, device_client) Loading Loading @@ -158,24 +228,3 @@ def log_all_methods(request): # context_id.context_uuid.uuid = "e7d46baa-d38d-5b72-a082-f344274b63ef" # response = context_client.RemoveContext(context_id) # LOGGER.info(f"Context removed: {response}") @pytest.fixture def telemetryBackend_service(): LOGGER.info('Initializing TelemetryBackendService...') _service = TelemetryBackendService() _service.start() LOGGER.info('Yielding TelemetryBackendService...') yield _service LOGGER.info('Terminating TelemetryBackendService...') _service.stop() LOGGER.info('Terminated TelemetryBackendService...') def test_InitiateCollectorBackend(telemetryBackend_service): LOGGER.info(" Backend Initiated Successfully. Waiting for timer to finish ...") time.sleep(30) LOGGER.info(" Backend Timer Finished Successfully. ")