# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pytest
import logging
import time
from telemetry.backend.service.TelemetryBackendService import TelemetryBackendService
from .messages import create_collector_request, _create_kpi_descriptor, _create_kpi_id
from .Fixtures import context_client, device_client, service_client, kpi_manager_client
from .add_devices import load_topology
from common.tools.context_queries.Topology import get_topology
from common.Constants import DEFAULT_CONTEXT_NAME
from common.tools.context_queries.Device import get_device, add_device_to_topology
# from common.tools.context_queries.EndPoint import get_endpoint_names
#from .EndPoint import get_endpoint_names        # modified version of get_endpoint_names
from common.proto.context_pb2 import EndPointId, DeviceId, TopologyId, ContextId, Empty
from common.proto.kpi_manager_pb2 import KpiId

LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)


###########################
# Tests Implementation of Telemetry Backend
###########################

@pytest.fixture(autouse=True)
def log_all_methods(request):
    """Log a banner before and after every test in this module.

    Declared with ``autouse=True`` so it wraps each test function without
    being requested explicitly.

    Args:
        request: pytest's built-in ``request`` fixture; only
            ``request.node.name`` (the name of the running test) is read.
    """
    LOGGER.info(f" >>>>> Starting test: {request.node.name} ")
    # Everything after the yield runs as teardown, once the test has finished.
    yield
    LOGGER.info(f" <<<<< Finished test: {request.node.name} ")


# # ----- Add Topology -----
# def test_add_to_topology(context_client, device_client, service_client):
#     load_topology(context_client, device_client)

# # ----- Add Device to Topology ------
# def test_add_device_to_topology(context_client):
#     context_id = ContextId()
#     context_id.context_uuid.uuid = "43813baf-195e-5da6-af20-b3d0922e71a7"
#     topology_uuid = "c76135e3-24a8-5e92-9bed-c3c9139359c8"
#     device_uuid = "69a3a3f0-5237-5f9e-bc96-d450d0c6c03a"
#     response = add_device_to_topology( context_client = context_client,
#                                        context_id = context_id,
#                                        topology_uuid = topology_uuid,
#                                        device_uuid = device_uuid
#                                      )
#     LOGGER.info(f"Device added to topology: {response}")
#     assert response is True

# # ----- Get Topology -----
# def test_get_topology(context_client, device_client):
#     response = get_topology(context_client = context_client, topology_uuid = "test1", context_uuid = "test1")
#     LOGGER.info(f"Topology: {response}")
#     assert response is not None

# def test_set_kpi_descriptor_and_get_device_id(kpi_manager_client):
#     kpi_descriptor = _create_kpi_descriptor("1290fb71-bf15-5528-8b69-2d2fabe1fa18")
#     kpi_id = kpi_manager_client.SetKpiDescriptor(kpi_descriptor)
#     LOGGER.info(f"KPI Descriptor set: {kpi_id}")
#     assert kpi_id is not None

#     response = kpi_manager_client.GetKpiDescriptor(kpi_id)
#     # response = kpi_manager_client.GetKpiDescriptor(_create_kpi_id())

#     assert response is not None
#     LOGGER.info(f"KPI Descriptor: {response}")
#     LOGGER.info(f"Device Id: {response.device_id.device_uuid.uuid}")
#     LOGGER.info(f"Endpoint Id: {response.endpoint_id.endpoint_uuid.uuid}")

# # ----- Get endpoint detail using device ID -----
# def test_get_device_details(context_client):
#     response = get_device(context_client = context_client, device_uuid = "1290fb71-bf15-5528-8b69-2d2fabe1fa18", include_config_rules = False, include_components = False)
#     if response:
#         LOGGER.info(f"Device type: {response.device_type}")
#         for endpoint in response.device_endpoints:
#             if endpoint.endpoint_id.endpoint_uuid.uuid == '36571df2-bac1-5909-a27d-5f42491d2ff0':
#                 endpoint_dict = {}
#                 kpi_sample_types = []
#                 # LOGGER.info(f"Endpoint: {endpoint}")
#                 # LOGGER.info(f"Enpoint_uuid: {endpoint.endpoint_id.endpoint_uuid.uuid}")
#                 endpoint_dict["uuid"] = endpoint.endpoint_id.endpoint_uuid.uuid
#                 # LOGGER.info(f"Enpoint_name: {endpoint.name}")
#                 endpoint_dict["name"] = endpoint.name
#                 # LOGGER.info(f"Enpoint_type: {endpoint.endpoint_type}")
#                 endpoint_dict["type"] = endpoint.endpoint_type
#                 for sample_type in endpoint.kpi_sample_types:
#                     # LOGGER.info(f"Enpoint_sample_types: {sample_type}")
#                     kpi_sample_types.append(sample_type)
#                 endpoint_dict["sample_types"] = kpi_sample_types
#                 LOGGER.info(f"Extracted endpoint dict: {endpoint_dict}")
#             else:
#                 LOGGER.info(f"Endpoint not matched")
#     LOGGER.info(f"Device Type: {type(response)}")
#     assert response is not None

# # ----- List Contexts -----
# def test_list_contextIds(context_client):
#     empty = Empty()
#     response = context_client.ListContexts(empty)
#     LOGGER.info(f"Contexts: {response}")
#     assert response

# # ----- List Devices -----
# def test_list_devices(context_client):
#     empty = Empty()
#     response = context_client.ListDeviceIds(empty)
#     LOGGER.info(f"Devices: {response}")
#     assert response

# ----- Get Endpoints ----- TODO: get_endpoint_names method doesn't return KPI samples types
# def test_get_endpoints(context_client):
#     device_id = DeviceId()
#     device_id.device_uuid.uuid = "1290fb71-bf15-5528-8b69-2d2fabe1fa18"
#     endpoint_id = EndPointId()
#     endpoint_id.endpoint_uuid.uuid = "43b817fa-246f-5e0a-a2e3-2aad0b3e16ca"
#     endpoint_id.device_id.CopyFrom(device_id)
#     response = get_endpoint_names(context_client = context_client, endpoint_ids = [endpoint_id])
#     LOGGER.info(f"Endpoints: {response}")
#     assert response is not None

# # ----- List Topologies -----
# def test_list_topologies(context_client):
#     context_id = ContextId()
#     context_id.context_uuid.uuid = "e7d46baa-d38d-5b72-a082-f344274b63ef"
#     respone = context_client.ListTopologies(context_id)
#     LOGGER.info(f"Topologies: {respone}")

# # ----- Remove Topology -----
# def test_remove_topology(context_client):
#     context_id = ContextId()
#     context_id.context_uuid.uuid = "e7d46baa-d38d-5b72-a082-f344274b63ef"
#     topology_id = TopologyId()
#     topology_id.topology_uuid.uuid = "9ef0118c-4bca-5e81-808b-dc8f60e2cda4"
#     topology_id.context_id.CopyFrom(context_id)

#     response = context_client.RemoveTopology(topology_id)
#     LOGGER.info(f"Topology removed: {response}")

# # ----- Remove context -----
# def test_remove_context(context_client):
#     context_id = ContextId()
#     context_id.context_uuid.uuid = "e7d46baa-d38d-5b72-a082-f344274b63ef"
#     response = context_client.RemoveContext(context_id)
#     LOGGER.info(f"Context removed: {response}")


@pytest.fixture
def telemetryBackend_service():
    """Provide a started TelemetryBackendService and stop it after the test.

    Yields:
        The ``TelemetryBackendService`` instance, after ``start()`` has been
        called on it. ``stop()`` is called during fixture teardown.
    """
    LOGGER.info('Initializing TelemetryBackendService...')

    _service = TelemetryBackendService()
    _service.start()

    LOGGER.info('Yielding TelemetryBackendService...')
    yield _service

    # Teardown: executed by pytest once the requesting test has finished.
    LOGGER.info('Terminating TelemetryBackendService...')
    _service.stop()

    LOGGER.info('Terminated TelemetryBackendService...')


def test_InitiateCollectorBackend(telemetryBackend_service):
    """Start the backend through the fixture and keep it alive for 30 seconds.

    The test makes no assertions; it passes as long as the fixture starts and
    stops the service without raising.
    """
    LOGGER.info(" Backend Initiated Successfully. Waiting for timer to finish ...")
    # NOTE(review): presumably this window lets the backend's own timer run
    # to completion, as the log message suggests -- confirm against the service.
    time.sleep(30)
    LOGGER.info(" Backend Timer Finished Successfully. ")