# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import time, json from typing import Dict import logging import threading from common.tools.kafka.Variables import KafkaTopic from analytics.backend.service.AnalyticsBackendService import AnalyticsBackendService from analytics.backend.tests.messages import get_kpi_id_list, get_operation_list, get_threshold_dict from .messages import create_analyzer LOGGER = logging.getLogger(__name__) ########################### # Tests Implementation of Telemetry Backend ########################### # --- "test_validate_kafka_topics" should be run before the functionality tests --- def test_validate_kafka_topics(): LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ") response = KafkaTopic.create_all_topics() assert isinstance(response, bool) def test_StartSparkStreamer(): LOGGER.debug(" >>> test_StartSparkStreamer: START <<< ") analyzer_obj = create_analyzer() analyzer_uuid = analyzer_obj.analyzer_id.analyzer_id.uuid analyzer_to_generate : Dict = { "algo_name" : analyzer_obj.algorithm_name, "input_kpis" : [k.kpi_id.uuid for k in analyzer_obj.input_kpi_ids], "output_kpis" : [k.kpi_id.uuid for k in analyzer_obj.output_kpi_ids], "oper_mode" : analyzer_obj.operation_mode, "thresholds" : json.loads(analyzer_obj.parameters["thresholds"]), "window_size" : analyzer_obj.parameters["window_size"], "window_slider" : analyzer_obj.parameters["window_slider"], # "store_aggregate" : analyzer_obj.parameters["store_aggregate"] } AnalyticsBackendServiceObj = AnalyticsBackendService() response = AnalyticsBackendServiceObj.StartSparkStreamer(analyzer_uuid, analyzer_to_generate) assert isinstance(response, bool) # def test_StartRequestListener(): # LOGGER.info('test_RunRequestListener') # AnalyticsBackendServiceObj = AnalyticsBackendService() # response = AnalyticsBackendServiceObj.StartRequestListener() # response is Tuple (thread, stop_event) # LOGGER.debug(str(response)) # assert isinstance(response, tuple) # To test START and STOP communication together def test_StopRequestListener(): LOGGER.info('test_RunRequestListener') LOGGER.info('Initiating StartRequestListener...') AnalyticsBackendServiceObj = AnalyticsBackendService() response_thread = AnalyticsBackendServiceObj.StartRequestListener() # response is Tuple (thread, stop_event) # LOGGER.debug(str(response_thread)) time.sleep(10) LOGGER.info('Initiating StopRequestListener...') AnalyticsBackendServiceObj = AnalyticsBackendService() response = AnalyticsBackendServiceObj.StopRequestListener(response_thread) LOGGER.debug(str(response)) assert isinstance(response, bool) # To independently tests the SparkListener functionality # def test_SparkListener(): # LOGGER.info('test_RunRequestListener') # AnalyticsBackendServiceObj = AnalyticsBackendService() # response = AnalyticsBackendServiceObj.RunSparkStreamer( # get_kpi_id_list(), get_operation_list(), get_threshold_dict() # ) # LOGGER.debug(str(response)) # assert isinstance(response, bool)