Select Git revision
test_frontend.py

Lluis Gifre Renom authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
test_frontend.py 5.52 KiB
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import time
import json
import pytest
import logging
import threading
from common.Constants import ServiceNameEnum
from common.proto.context_pb2 import Empty
from common.Settings import ( get_service_port_grpc, get_env_var_name,
ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC )
from common.tools.kafka.Variables import KafkaTopic
from common.proto.analytics_frontend_pb2 import AnalyzerId, AnalyzerList
from analytics.frontend.client.AnalyticsFrontendClient import AnalyticsFrontendClient
from analytics.frontend.service.AnalyticsFrontendService import AnalyticsFrontendService
from analytics.frontend.tests.messages import ( create_analyzer_id, create_analyzer,
create_analyzer_filter )
from analytics.frontend.service.AnalyticsFrontendServiceServicerImpl import AnalyticsFrontendServiceServicerImpl
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger
###########################
# Tests Setup
###########################
LOCAL_HOST = '127.0.0.1'
ANALYTICS_FRONTEND_PORT = str(get_service_port_grpc(ServiceNameEnum.ANALYTICSFRONTEND))
os.environ[get_env_var_name(ServiceNameEnum.ANALYTICSFRONTEND, ENVVAR_SUFIX_SERVICE_HOST )] = str(LOCAL_HOST)
os.environ[get_env_var_name(ServiceNameEnum.ANALYTICSFRONTEND, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(ANALYTICS_FRONTEND_PORT)
LOGGER = logging.getLogger(__name__)
@pytest.fixture(scope='session')
def analyticsFrontend_service():
LOGGER.info('Initializing AnalyticsFrontendService...')
_service = AnalyticsFrontendService()
_service.start()
# yield the server, when test finishes, execution will resume to stop it
LOGGER.info('Yielding AnalyticsFrontendService...')
yield _service
LOGGER.info('Terminating AnalyticsFrontendService...')
_service.stop()
LOGGER.info('Terminated AnalyticsFrontendService...')
@pytest.fixture(scope='session')
def analyticsFrontend_client(analyticsFrontend_service : AnalyticsFrontendService):
LOGGER.info('Initializing AnalyticsFrontendClient...')
_client = AnalyticsFrontendClient()
# yield the server, when test finishes, execution will resume to stop it
LOGGER.info('Yielding AnalyticsFrontendClient...')
yield _client
LOGGER.info('Closing AnalyticsFrontendClient...')
_client.close()
LOGGER.info('Closed AnalyticsFrontendClient...')
###########################
# Tests Implementation of Analytics Frontend
###########################
# --- "test_validate_kafka_topics" should be executed before the functionality tests ---
def test_validate_kafka_topics():
LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ")
response = KafkaTopic.create_all_topics()
assert isinstance(response, bool)
# ----- core funtionality test -----
# def test_StartAnalytics(analyticsFrontend_client):
# LOGGER.info(' >>> test_StartAnalytic START: <<< ')
# response = analyticsFrontend_client.StartAnalyzer(create_analyzer())
# LOGGER.debug(str(response))
# assert isinstance(response, AnalyzerId)
# To test start and stop listener together
def test_StartAnalyzers(analyticsFrontend_client):
LOGGER.info(' >>> test_StartAnalyzers START: <<< ')
added_analyzer_id = analyticsFrontend_client.StartAnalyzer(create_analyzer())
LOGGER.debug(str(added_analyzer_id))
LOGGER.info(' --> Calling StartResponseListener... ')
class_obj = AnalyticsFrontendServiceServicerImpl()
response = class_obj.StartResponseListener(added_analyzer_id.analyzer_id.uuid)
LOGGER.debug(response)
LOGGER.info("waiting for timer to comlete ...")
time.sleep(3)
LOGGER.info('--> StopAnalyzer')
response = analyticsFrontend_client.StopAnalyzer(added_analyzer_id)
LOGGER.debug(str(response))
# def test_SelectAnalytics(analyticsFrontend_client):
# LOGGER.info(' >>> test_SelectAnalytics START: <<< ')
# response = analyticsFrontend_client.SelectAnalyzers(create_analyzer_filter())
# LOGGER.debug(str(response))
# assert isinstance(response, AnalyzerList)
# def test_StopAnalytic(analyticsFrontend_client):
# LOGGER.info(' >>> test_StopAnalytic START: <<< ')
# response = analyticsFrontend_client.StopAnalyzer(create_analyzer_id())
# LOGGER.debug(str(response))
# assert isinstance(response, Empty)
# def test_ResponseListener():
# LOGGER.info(' >>> test_ResponseListener START <<< ')
# analyzer_id = create_analyzer_id()
# LOGGER.debug("Starting Response Listener for Analyzer ID: {:}".format(analyzer_id.analyzer_id.uuid))
# class_obj = AnalyticsFrontendServiceServicerImpl()
# for response in class_obj.StartResponseListener(analyzer_id.analyzer_id.uuid):
# LOGGER.debug(response)
# assert isinstance(response, tuple)