# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import os import time import pytest import logging from typing import Union from common.proto.context_pb2 import Empty from common.Constants import ServiceNameEnum from common.proto.telemetry_frontend_pb2 import CollectorId, CollectorList from common.proto.context_pb2_grpc import add_ContextServiceServicer_to_server from context.client.ContextClient import ContextClient from common.tools.service.GenericGrpcService import GenericGrpcService from common.tests.MockServicerImpl_Context import MockServicerImpl_Context from common.Settings import ( get_service_port_grpc, get_env_var_name, ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC) from telemetry.frontend.client.TelemetryFrontendClient import TelemetryFrontendClient from telemetry.frontend.service.TelemetryFrontendService import TelemetryFrontendService from telemetry.frontend.service.TelemetryFrontendServiceServicerImpl import TelemetryFrontendServiceServicerImpl from telemetry.frontend.tests.Messages import ( create_collector_request, create_collector_filter) from telemetry.database.managementDB import managementDB from telemetry.database.TelemetryEngine import TelemetryEngine from device.client.DeviceClient import DeviceClient from device.service.DeviceService import DeviceService from device.service.driver_api.DriverFactory import DriverFactory from device.service.driver_api.DriverInstanceCache import DriverInstanceCache from monitoring.service.NameMapping import NameMapping os.environ['DEVICE_EMULATED_ONLY'] = 'TRUE' from device.service.drivers import DRIVERS ########################### # Tests Setup ########################### LOCAL_HOST = '127.0.0.1' MOCKSERVICE_PORT = 10000 TELEMETRY_FRONTEND_PORT = str(MOCKSERVICE_PORT) + str(get_service_port_grpc(ServiceNameEnum.TELEMETRYFRONTEND)) os.environ[get_env_var_name(ServiceNameEnum.TELEMETRYFRONTEND, ENVVAR_SUFIX_SERVICE_HOST )] = str(LOCAL_HOST) os.environ[get_env_var_name(ServiceNameEnum.TELEMETRYFRONTEND, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(TELEMETRY_FRONTEND_PORT) LOGGER = logging.getLogger(__name__) class MockContextService(GenericGrpcService): # Mock Service implementing Context to simplify unitary tests of Monitoring def __init__(self, bind_port: Union[str, int]) -> None: super().__init__(bind_port, LOCAL_HOST, enable_health_servicer=False, cls_name='MockService') # pylint: disable=attribute-defined-outside-init def install_servicers(self): self.context_servicer = MockServicerImpl_Context() add_ContextServiceServicer_to_server(self.context_servicer, self.server) @pytest.fixture(scope='session') def context_service(): LOGGER.info('Initializing MockContextService...') _service = MockContextService(MOCKSERVICE_PORT) _service.start() LOGGER.info('Yielding MockContextService...') yield _service LOGGER.info('Terminating MockContextService...') _service.context_servicer.msg_broker.terminate() _service.stop() LOGGER.info('Terminated MockContextService...') @pytest.fixture(scope='session') def context_client(context_service : MockContextService): # pylint: disable=redefined-outer-name,unused-argument LOGGER.info('Initializing ContextClient...') _client = ContextClient() LOGGER.info('Yielding ContextClient...') yield _client LOGGER.info('Closing ContextClient...') _client.close() LOGGER.info('Closed ContextClient...') @pytest.fixture(scope='session') def device_service(context_service : MockContextService): # pylint: disable=redefined-outer-name,unused-argument LOGGER.info('Initializing DeviceService...') driver_factory = DriverFactory(DRIVERS) driver_instance_cache = DriverInstanceCache(driver_factory) _service = DeviceService(driver_instance_cache) _service.start() # yield the server, when test finishes, execution will resume to stop it LOGGER.info('Yielding DeviceService...') yield _service LOGGER.info('Terminating DeviceService...') _service.stop() LOGGER.info('Terminated DeviceService...') @pytest.fixture(scope='session') def device_client(device_service : DeviceService): # pylint: disable=redefined-outer-name,unused-argument LOGGER.info('Initializing DeviceClient...') _client = DeviceClient() LOGGER.info('Yielding DeviceClient...') yield _client LOGGER.info('Closing DeviceClient...') _client.close() LOGGER.info('Closed DeviceClient...') @pytest.fixture(scope='session') def telemetryFrontend_service( context_service : MockContextService, device_service : DeviceService ): LOGGER.info('Initializing TelemetryFrontendService...') name_mapping = NameMapping() _service = TelemetryFrontendService(name_mapping) _service.start() # yield the server, when test finishes, execution will resume to stop it LOGGER.info('Yielding TelemetryFrontendService...') yield _service LOGGER.info('Terminating TelemetryFrontendService...') _service.stop() LOGGER.info('Terminated TelemetryFrontendService...') @pytest.fixture(scope='session') def telemetryFrontend_client( telemetryFrontend_service : TelemetryFrontendService ): LOGGER.info('Initializing TelemetryFrontendClient...') _client = TelemetryFrontendClient() # yield the server, when test finishes, execution will resume to stop it LOGGER.info('Yielding TelemetryFrontendClient...') yield _client LOGGER.info('Closing TelemetryFrontendClient...') _client.close() LOGGER.info('Closed TelemetryFrontendClient...') ########################### # Tests Implementation of Telemetry Frontend ########################### def test_verify_db_and_table(): LOGGER.info(' >>> test_verify_database_and_tables START: <<< ') _engine = TelemetryEngine.get_engine() managementDB.create_database(_engine) managementDB.create_tables(_engine) def test_StartCollector(telemetryFrontend_client): LOGGER.info(' >>> test_StartCollector START: <<< ') response = telemetryFrontend_client.StartCollector(create_collector_request()) LOGGER.debug(str(response)) assert isinstance(response, CollectorId) def test_run_kafka_listener(): LOGGER.info(' >>> test_run_kafka_listener START: <<< ') name_mapping = NameMapping() TelemetryFrontendServiceObj = TelemetryFrontendServiceServicerImpl(name_mapping) response = TelemetryFrontendServiceObj.run_kafka_listener() # Method "run_kafka_listener" is not define in frontend.proto LOGGER.debug(str(response)) assert isinstance(response, bool) def test_StopCollector(telemetryFrontend_client): LOGGER.info(' >>> test_StopCollector START: <<< ') _collector_id = telemetryFrontend_client.StartCollector(create_collector_request()) time.sleep(3) # wait for small amount before call the stopCollecter() response = telemetryFrontend_client.StopCollector(_collector_id) LOGGER.debug(str(response)) assert isinstance(response, Empty) def test_select_collectors(telemetryFrontend_client): LOGGER.info(' >>> test_select_collector requesting <<< ') response = telemetryFrontend_client.SelectCollectors(create_collector_filter()) LOGGER.info('Received Rows after applying Filter: {:} '.format(response)) LOGGER.debug(str(response)) assert isinstance(response, CollectorList)