Skip to content
Snippets Groups Projects
test_frontend.py 5.83 KiB
Newer Older
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import pytest
import logging

# from common.proto.context_pb2 import Empty
from common.Constants import ServiceNameEnum
from common.proto.telemetry_frontend_pb2 import CollectorId, CollectorList
from common.proto.context_pb2 import Empty
from common.Settings import ( 
    get_service_port_grpc, get_env_var_name, ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC)

from telemetry.frontend.client.TelemetryFrontendClient import TelemetryFrontendClient
from telemetry.frontend.service.TelemetryFrontendService import TelemetryFrontendService
from telemetry.frontend.tests.Messages import (
     create_collector_request, create_collector_id, create_collector_filter)
from telemetry.frontend.service.TelemetryFrontendServiceServicerImpl import TelemetryFrontendServiceServicerImpl


###########################
# Tests Setup
###########################

LOCAL_HOST = '127.0.0.1'

TELEMETRY_FRONTEND_PORT = str(get_service_port_grpc(ServiceNameEnum.TELEMETRYFRONTEND))
os.environ[get_env_var_name(ServiceNameEnum.TELEMETRYFRONTEND, ENVVAR_SUFIX_SERVICE_HOST     )] = str(LOCAL_HOST)
os.environ[get_env_var_name(ServiceNameEnum.TELEMETRYFRONTEND, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(TELEMETRY_FRONTEND_PORT)

LOGGER = logging.getLogger(__name__)

@pytest.fixture(scope='session')
def telemetryFrontend_service():
    LOGGER.info('Initializing TelemetryFrontendService...')

    _service = TelemetryFrontendService()
    _service.start()

    # yield the server, when test finishes, execution will resume to stop it
    LOGGER.info('Yielding TelemetryFrontendService...')
    yield _service

    LOGGER.info('Terminating TelemetryFrontendService...')
    _service.stop()

    LOGGER.info('Terminated TelemetryFrontendService...')

@pytest.fixture(scope='session')
def telemetryFrontend_client(
        telemetryFrontend_service : TelemetryFrontendService
    ):
    LOGGER.info('Initializing TelemetryFrontendClient...')
    _client = TelemetryFrontendClient()

    # yield the server, when test finishes, execution will resume to stop it
    LOGGER.info('Yielding TelemetryFrontendClient...')
    yield _client

    LOGGER.info('Closing TelemetryFrontendClient...')
    _client.close()

    LOGGER.info('Closed TelemetryFrontendClient...')


###########################
# Tests Implementation of Telemetry Frontend
###########################

# ------- Re-structuring Test ---------
def test_StartCollector(telemetryFrontend_client):
    LOGGER.info(' >>> test_StartCollector START: <<< ')
    response = telemetryFrontend_client.StartCollector(create_collector_request())
    LOGGER.debug(str(response))
    assert isinstance(response, CollectorId)

def test_StopCollector(telemetryFrontend_client):
    LOGGER.info(' >>> test_StopCollector START: <<< ')
    response = telemetryFrontend_client.StopCollector(create_collector_id())
    LOGGER.debug(str(response))
    assert isinstance(response, Empty)

def test_SelectCollectors(telemetryFrontend_client):
    LOGGER.info(' >>> test_SelectCollectors START: <<< ')
    response = telemetryFrontend_client.SelectCollectors(create_collector_filter())
    LOGGER.debug(str(response))
    assert isinstance(response, CollectorList)

def test_RunResponseListener():
    LOGGER.info(' >>> test_RunResponseListener START: <<< ')
    TelemetryFrontendServiceObj = TelemetryFrontendServiceServicerImpl()
    response = TelemetryFrontendServiceObj.RunResponseListener()     # becasue Method "run_kafka_listener" is not define in frontend.proto
    LOGGER.debug(str(response))
    assert isinstance(response, bool)

# ------- previous test ----------------

# def test_verify_db_and_table():
#     LOGGER.info(' >>> test_verify_database_and_tables START: <<< ')
#     _engine = TelemetryEngine.get_engine()
#     managementDB.create_database(_engine)
#     managementDB.create_tables(_engine)

# def test_StartCollector(telemetryFrontend_client):
#     LOGGER.info(' >>> test_StartCollector START: <<< ')
#     response = telemetryFrontend_client.StartCollector(create_collector_request())
#     LOGGER.debug(str(response))
#     assert isinstance(response, CollectorId)

# def test_run_kafka_listener():
#     LOGGER.info(' >>> test_run_kafka_listener START: <<< ')
#     name_mapping = NameMapping()
#     TelemetryFrontendServiceObj = TelemetryFrontendServiceServicerImpl(name_mapping)
#     response = TelemetryFrontendServiceObj.run_kafka_listener()     # Method "run_kafka_listener" is not define in frontend.proto
#     LOGGER.debug(str(response))
#     assert isinstance(response, bool)

# def test_StopCollector(telemetryFrontend_client):
#     LOGGER.info(' >>> test_StopCollector START: <<< ')
#     _collector_id = telemetryFrontend_client.StartCollector(create_collector_request())
#     time.sleep(3)   # wait for small amount before call the stopCollecter()
#     response = telemetryFrontend_client.StopCollector(_collector_id)
#     LOGGER.debug(str(response))
#     assert isinstance(response, Empty)

# def test_select_collectors(telemetryFrontend_client):
#     LOGGER.info(' >>> test_select_collector requesting <<< ')
#     response = telemetryFrontend_client.SelectCollectors(create_collector_filter())
#     LOGGER.info('Received Rows after applying Filter: {:} '.format(response))
#     LOGGER.debug(str(response))
#     assert isinstance(response, CollectorList)