Newer
Older
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import pytest
import logging
# from common.proto.context_pb2 import Empty
from common.Constants import ServiceNameEnum
from common.proto.telemetry_frontend_pb2 import CollectorId, CollectorList
from common.proto.context_pb2 import Empty
from common.Settings import (
get_service_port_grpc, get_env_var_name, ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC)
from telemetry.frontend.client.TelemetryFrontendClient import TelemetryFrontendClient
from telemetry.frontend.service.TelemetryFrontendService import TelemetryFrontendService
from telemetry.frontend.tests.Messages import (
create_collector_request, create_collector_id, create_collector_filter)
from telemetry.frontend.service.TelemetryFrontendServiceServicerImpl import TelemetryFrontendServiceServicerImpl
###########################
# Tests Setup
###########################
LOCAL_HOST = '127.0.0.1'
TELEMETRY_FRONTEND_PORT = str(get_service_port_grpc(ServiceNameEnum.TELEMETRYFRONTEND))
os.environ[get_env_var_name(ServiceNameEnum.TELEMETRYFRONTEND, ENVVAR_SUFIX_SERVICE_HOST )] = str(LOCAL_HOST)
os.environ[get_env_var_name(ServiceNameEnum.TELEMETRYFRONTEND, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(TELEMETRY_FRONTEND_PORT)
LOGGER = logging.getLogger(__name__)
@pytest.fixture(scope='session')
LOGGER.info('Initializing TelemetryFrontendService...')
_service = TelemetryFrontendService()
_service.start()
# yield the server, when test finishes, execution will resume to stop it
LOGGER.info('Yielding TelemetryFrontendService...')
yield _service
LOGGER.info('Terminating TelemetryFrontendService...')
_service.stop()
LOGGER.info('Terminated TelemetryFrontendService...')
@pytest.fixture(scope='session')
def telemetryFrontend_client(
telemetryFrontend_service : TelemetryFrontendService
):
LOGGER.info('Initializing TelemetryFrontendClient...')
_client = TelemetryFrontendClient()
# yield the server, when test finishes, execution will resume to stop it
LOGGER.info('Yielding TelemetryFrontendClient...')
yield _client
LOGGER.info('Closing TelemetryFrontendClient...')
_client.close()
LOGGER.info('Closed TelemetryFrontendClient...')
###########################
# Tests Implementation of Telemetry Frontend
###########################
# ------- Re-structuring Test ---------
def test_StartCollector(telemetryFrontend_client):
LOGGER.info(' >>> test_StartCollector START: <<< ')
response = telemetryFrontend_client.StartCollector(create_collector_request())
LOGGER.debug(str(response))
assert isinstance(response, CollectorId)
def test_StopCollector(telemetryFrontend_client):
LOGGER.info(' >>> test_StopCollector START: <<< ')
response = telemetryFrontend_client.StopCollector(create_collector_id())
LOGGER.debug(str(response))
assert isinstance(response, Empty)
def test_SelectCollectors(telemetryFrontend_client):
LOGGER.info(' >>> test_SelectCollectors START: <<< ')
response = telemetryFrontend_client.SelectCollectors(create_collector_filter())
LOGGER.debug(str(response))
assert isinstance(response, CollectorList)
def test_RunResponseListener():
LOGGER.info(' >>> test_RunResponseListener START: <<< ')
TelemetryFrontendServiceObj = TelemetryFrontendServiceServicerImpl()
response = TelemetryFrontendServiceObj.RunResponseListener() # becasue Method "run_kafka_listener" is not define in frontend.proto
LOGGER.debug(str(response))
assert isinstance(response, bool)
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
# ------- previous test ----------------
# def test_verify_db_and_table():
# LOGGER.info(' >>> test_verify_database_and_tables START: <<< ')
# _engine = TelemetryEngine.get_engine()
# managementDB.create_database(_engine)
# managementDB.create_tables(_engine)
# def test_StartCollector(telemetryFrontend_client):
# LOGGER.info(' >>> test_StartCollector START: <<< ')
# response = telemetryFrontend_client.StartCollector(create_collector_request())
# LOGGER.debug(str(response))
# assert isinstance(response, CollectorId)
# def test_run_kafka_listener():
# LOGGER.info(' >>> test_run_kafka_listener START: <<< ')
# name_mapping = NameMapping()
# TelemetryFrontendServiceObj = TelemetryFrontendServiceServicerImpl(name_mapping)
# response = TelemetryFrontendServiceObj.run_kafka_listener() # Method "run_kafka_listener" is not define in frontend.proto
# LOGGER.debug(str(response))
# assert isinstance(response, bool)
# def test_StopCollector(telemetryFrontend_client):
# LOGGER.info(' >>> test_StopCollector START: <<< ')
# _collector_id = telemetryFrontend_client.StartCollector(create_collector_request())
# time.sleep(3) # wait for small amount before call the stopCollecter()
# response = telemetryFrontend_client.StopCollector(_collector_id)
# LOGGER.debug(str(response))
# assert isinstance(response, Empty)
# def test_select_collectors(telemetryFrontend_client):
# LOGGER.info(' >>> test_select_collector requesting <<< ')
# response = telemetryFrontend_client.SelectCollectors(create_collector_filter())
# LOGGER.info('Received Rows after applying Filter: {:} '.format(response))
# LOGGER.debug(str(response))
# assert isinstance(response, CollectorList)