# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import pytest
import logging
# from common.proto.context_pb2 import Empty
from common.Constants import ServiceNameEnum
from common.proto.telemetry_frontend_pb2 import CollectorId, CollectorList
from common.Settings import (
get_service_port_grpc, get_env_var_name, ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC)
from telemetry.frontend.client.TelemetryFrontendClient import TelemetryFrontendClient
from telemetry.frontend.service.TelemetryFrontendService import TelemetryFrontendService
from telemetry.frontend.tests.Messages import (
create_collector_request, create_collector_filter)
###########################
# Tests Setup
###########################
# Address and gRPC port under which the telemetry frontend is reached during the tests.
LOCAL_HOST = '127.0.0.1'
TELEMETRY_FRONTEND_PORT = str(get_service_port_grpc(ServiceNameEnum.TELEMETRYFRONTEND))

# Export host and port through the service's environment variables
# (both values are already strings at this point).
os.environ.update({
    get_env_var_name(ServiceNameEnum.TELEMETRYFRONTEND, ENVVAR_SUFIX_SERVICE_HOST     ): LOCAL_HOST,
    get_env_var_name(ServiceNameEnum.TELEMETRYFRONTEND, ENVVAR_SUFIX_SERVICE_PORT_GRPC): TELEMETRY_FRONTEND_PORT,
})

# Module-level logger shared by the fixtures and tests below.
LOGGER = logging.getLogger(__name__)
@pytest.fixture(scope='session')
def telemetryFrontend_service():
    """Session-scoped fixture providing a started TelemetryFrontendService.

    The service is created and started once, handed to the dependent
    fixtures/tests, and stopped when the test session finishes.

    Yields:
        TelemetryFrontendService: the started service instance.
    """
    LOGGER.info('Initializing TelemetryFrontendService...')
    _service = TelemetryFrontendService()
    _service.start()

    # yield the server, when test finishes, execution will resume to stop it
    LOGGER.info('Yielding TelemetryFrontendService...')
    yield _service

    LOGGER.info('Terminating TelemetryFrontendService...')
    _service.stop()

    LOGGER.info('Terminated TelemetryFrontendService...')
@pytest.fixture(scope='session')
def telemetryFrontend_client(
    telemetryFrontend_service : TelemetryFrontendService
):
    """Session-scoped fixture providing a TelemetryFrontendClient.

    The ``telemetryFrontend_service`` argument is not used in the body; it is
    requested so that pytest sets up the service fixture before this one.

    Yields:
        TelemetryFrontendClient: the client instance, closed at teardown.
    """
    LOGGER.info('Initializing TelemetryFrontendClient...')
    frontend_client = TelemetryFrontendClient()

    # Hand the client to the tests; execution resumes here afterwards to close it.
    LOGGER.info('Yielding TelemetryFrontendClient...')
    yield frontend_client

    LOGGER.info('Closing TelemetryFrontendClient...')
    frontend_client.close()

    LOGGER.info('Closed TelemetryFrontendClient...')
###########################
# Tests Implementation of Telemetry Frontend
###########################
# ------- Re-structuring Test ---------
def test_StartCollector(telemetryFrontend_client):
    """Check that StartCollector returns a CollectorId for a collector request."""
    LOGGER.info(' >>> test_StartCollector START: <<< ')
    collector_request = create_collector_request()
    collector_id = telemetryFrontend_client.StartCollector(collector_request)
    LOGGER.debug(str(collector_id))
    assert isinstance(collector_id, CollectorId)
# ------- previous test ----------------
# def test_verify_db_and_table():
# LOGGER.info(' >>> test_verify_database_and_tables START: <<< ')
# _engine = TelemetryEngine.get_engine()
# managementDB.create_database(_engine)
# managementDB.create_tables(_engine)
# def test_StartCollector(telemetryFrontend_client):
# LOGGER.info(' >>> test_StartCollector START: <<< ')
# response = telemetryFrontend_client.StartCollector(create_collector_request())
# LOGGER.debug(str(response))
# assert isinstance(response, CollectorId)
# def test_run_kafka_listener():
# LOGGER.info(' >>> test_run_kafka_listener START: <<< ')
# name_mapping = NameMapping()
# TelemetryFrontendServiceObj = TelemetryFrontendServiceServicerImpl(name_mapping)
# response = TelemetryFrontendServiceObj.run_kafka_listener() # Method "run_kafka_listener" is not defined in frontend.proto
# LOGGER.debug(str(response))
# assert isinstance(response, bool)
# def test_StopCollector(telemetryFrontend_client):
# LOGGER.info(' >>> test_StopCollector START: <<< ')
# _collector_id = telemetryFrontend_client.StartCollector(create_collector_request())
# time.sleep(3) # wait a short time before calling StopCollector()
# response = telemetryFrontend_client.StopCollector(_collector_id)
# LOGGER.debug(str(response))
# assert isinstance(response, Empty)
# def test_select_collectors(telemetryFrontend_client):
# LOGGER.info(' >>> test_select_collector requesting <<< ')
# response = telemetryFrontend_client.SelectCollectors(create_collector_filter())
# LOGGER.info('Received Rows after applying Filter: {:} '.format(response))
# LOGGER.debug(str(response))
# assert isinstance(response, CollectorList)