Skip to content
Snippets Groups Projects
test_unitary.py 6.62 KiB
Newer Older
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import os
from typing import Union

import pytest
from common.proto.context_pb2 import Empty
from common.Constants import ServiceNameEnum
from common.proto.telemetry_frontend_pb2 import CollectorId, CollectorList
from common.proto.context_pb2_grpc import add_ContextServiceServicer_to_server
from context.client.ContextClient import ContextClient
from common.tools.service.GenericGrpcService import GenericGrpcService
from common.tests.MockServicerImpl_Context import MockServicerImpl_Context
from common.Settings import ( 
    get_service_port_grpc, get_env_var_name, ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC)

from telemetry_frontend.client.TelemetryFrontendClient import TelemetryFrontendClient
from telemetry_frontend.service.TelemetryFrontendService import TelemetryFrontendService
from telemetry_frontend.tests.Messages import create_collector_request

from device.client.DeviceClient import DeviceClient
from device.service.DeviceService import DeviceService
from device.service.driver_api.DriverFactory import DriverFactory
from device.service.driver_api.DriverInstanceCache import DriverInstanceCache

from monitoring.service.NameMapping import NameMapping

os.environ['DEVICE_EMULATED_ONLY'] = 'TRUE'
from device.service.drivers import DRIVERS

###########################
# Tests Setup
###########################

# Loopback address used for the mock Context service and for the telemetry
# frontend endpoint exported through the environment below.
LOCAL_HOST = '127.0.0.1'
# Port the mock Context service binds to; also the offset added to the
# telemetry frontend gRPC port.
MOCKSERVICE_PORT = 10000

# Publish the telemetry frontend host/port as environment variables at import
# time, i.e. before any fixture instantiates a service or a client.
# NOTE(review): presumably TelemetryFrontendService/Client read these variables
# to find their endpoint - confirm against common.Settings.
TELEMETRY_FRONTEND_PORT = MOCKSERVICE_PORT + get_service_port_grpc(ServiceNameEnum.TELEMETRYFRONTEND)
os.environ[get_env_var_name(ServiceNameEnum.TELEMETRYFRONTEND, ENVVAR_SUFIX_SERVICE_HOST     )] = str(LOCAL_HOST)
os.environ[get_env_var_name(ServiceNameEnum.TELEMETRYFRONTEND, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(TELEMETRY_FRONTEND_PORT)

# Module-level logger shared by the fixtures and tests of this file.
# Requires the stdlib `logging` module to be imported at the top of the file;
# without that import this line raises NameError when the module is loaded.
LOGGER = logging.getLogger(__name__)

class MockContextService(GenericGrpcService):
    """Mock gRPC service exposing the Context API via MockServicerImpl_Context.

    Lets the unit tests in this module run without a real Context component.
    """

    def __init__(self, bind_port: Union[str, int]) -> None:
        # Bind to LOCAL_HOST on the given port, with the health servicer disabled.
        super().__init__(
            bind_port,
            LOCAL_HOST,
            enable_health_servicer=False,
            cls_name='MockService',
        )

    # pylint: disable=attribute-defined-outside-init
    def install_servicers(self):
        """Create the mock Context servicer and register it on the gRPC server."""
        servicer = MockServicerImpl_Context()
        # Kept as an attribute so fixtures can reach the servicer on teardown.
        self.context_servicer = servicer
        add_ContextServiceServicer_to_server(servicer, self.server)

@pytest.fixture(scope='session')
def context_service():
    """Session fixture: start a MockContextService, yield it, then shut it down."""
    LOGGER.info('Initializing MockContextService...')
    mock_service = MockContextService(MOCKSERVICE_PORT)
    mock_service.start()

    LOGGER.info('Yielding MockContextService...')
    yield mock_service

    # Teardown: terminate the servicer's message broker, then stop the server.
    LOGGER.info('Terminating MockContextService...')
    broker = mock_service.context_servicer.msg_broker
    broker.terminate()
    mock_service.stop()

    LOGGER.info('Terminated MockContextService...')

@pytest.fixture(scope='session')
def context_client(context_service : MockContextService): # pylint: disable=redefined-outer-name,unused-argument
    """Session fixture: yield a ContextClient and close it on teardown.

    Requesting `context_service` only ensures that fixture is set up first;
    its value is not used here.
    """
    LOGGER.info('Initializing ContextClient...')
    client = ContextClient()

    LOGGER.info('Yielding ContextClient...')
    yield client

    # Teardown: runs when the test session finishes.
    LOGGER.info('Closing ContextClient...')
    client.close()

    LOGGER.info('Closed ContextClient...')

@pytest.fixture(scope='session')
def device_service(context_service : MockContextService): # pylint: disable=redefined-outer-name,unused-argument
    """Session fixture: start a DeviceService, yield it, then stop it.

    The service is built on a DriverInstanceCache wrapping a DriverFactory
    created from the DRIVERS imported at module level.
    """
    LOGGER.info('Initializing DeviceService...')
    service = DeviceService(DriverInstanceCache(DriverFactory(DRIVERS)))
    service.start()

    # Hand the running service to the tests; execution resumes below on teardown.
    LOGGER.info('Yielding DeviceService...')
    yield service

    LOGGER.info('Terminating DeviceService...')
    service.stop()

    LOGGER.info('Terminated DeviceService...')

@pytest.fixture(scope='session')
def device_client(device_service : DeviceService): # pylint: disable=redefined-outer-name,unused-argument
    """Session fixture: yield a DeviceClient and close it on teardown.

    Requesting `device_service` only ensures that fixture is set up first;
    its value is not used here.
    """
    LOGGER.info('Initializing DeviceClient...')
    client = DeviceClient()

    LOGGER.info('Yielding DeviceClient...')
    yield client

    # Teardown: runs when the test session finishes.
    LOGGER.info('Closing DeviceClient...')
    client.close()

    LOGGER.info('Closed DeviceClient...')

@pytest.fixture(scope='session')
def telemetryFrontend_service(
        context_service : MockContextService,
        device_service  : DeviceService
    ):
    """Session fixture: start a TelemetryFrontendService, yield it, then stop it.

    `context_service` and `device_service` are requested so that those fixtures
    are set up first; their values are not used here.
    """
    LOGGER.info('Initializing TelemetryFrontendService...')
    frontend = TelemetryFrontendService(NameMapping())
    frontend.start()

    # Hand the running service to the tests; execution resumes below on teardown.
    LOGGER.info('Yielding TelemetryFrontendService...')
    yield frontend

    LOGGER.info('Terminating TelemetryFrontendService...')
    frontend.stop()

    LOGGER.info('Terminated TelemetryFrontendService...')

@pytest.fixture(scope='session')
def telemetryFrontend_client(
        telemetryFrontend_service : TelemetryFrontendService
    ):
    """Session fixture: yield a TelemetryFrontendClient and close it on teardown.

    Requesting `telemetryFrontend_service` only ensures that fixture is set up
    first; its value is not used here.
    """
    LOGGER.info('Initializing TelemetryFrontendClient...')
    client = TelemetryFrontendClient()

    # Hand the client to the tests; execution resumes below to close it.
    LOGGER.info('Yielding TelemetryFrontendClient...')
    yield client

    LOGGER.info('Closing TelemetryFrontendClient...')
    client.close()

    LOGGER.info('Closed TelemetryFrontendClient...')


###########################
# Tests Implementation of Telemetry Frontend
###########################
def test_start_collector(telemetryFrontend_client):
    """StartCollector must reply with a CollectorId message."""
    LOGGER.warning('test_start_collector requesting')
    request = create_collector_request('1')
    reply = telemetryFrontend_client.StartCollector(request)
    LOGGER.debug(str(reply))
    assert isinstance(reply, CollectorId)

def test_stop_collector(telemetryFrontend_client):
    """StopCollector must reply with an Empty message."""
    LOGGER.warning('test_stop_collector requesting')
    # NOTE(review): a plain string is passed here, whereas test_start_collector
    # sends a request message - confirm the client accepts a bare '1'.
    collector_ref = '1'
    reply = telemetryFrontend_client.StopCollector(collector_ref)
    LOGGER.debug(str(reply))
    assert isinstance(reply, Empty)

def test_select_collectors(telemetryFrontend_client):
    """SelectCollectors must reply with a CollectorList message."""
    LOGGER.warning('test_select_collector requesting')
    reply = telemetryFrontend_client.SelectCollectors()
    LOGGER.debug(str(reply))
    assert isinstance(reply, CollectorList)