diff --git a/scripts/run_tests_locally-analytics-frontend.sh b/scripts/run_tests_locally-analytics-frontend.sh index 0cb4dc98da051ba27347be3534f070b80f51fd65..2c18296cf86ae4a00904fb684e2de5c56da9a2ea 100755 --- a/scripts/run_tests_locally-analytics-frontend.sh +++ b/scripts/run_tests_locally-analytics-frontend.sh @@ -18,8 +18,10 @@ PROJECTDIR=`pwd` cd $PROJECTDIR/src RCFILE=$PROJECTDIR/coverage/.coveragerc + export KFK_SERVER_ADDRESS='127.0.0.1:9092' CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace crdb -o jsonpath='{.spec.clusterIP}') export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_analytics?sslmode=require" + python3 -m pytest --log-level=DEBUG --log-cli-level=INFO --verbose \ analytics/frontend/tests/test_frontend.py diff --git a/src/analytics/backend/service/AnalyticsBackendService.py b/src/analytics/backend/service/AnalyticsBackendService.py index 10f1f75ea6e571548492272862890962ce5be9cd..5c924bfc5cc72dbde89faf1df8b17a90aba4f345 100755 --- a/src/analytics/backend/service/AnalyticsBackendService.py +++ b/src/analytics/backend/service/AnalyticsBackendService.py @@ -20,7 +20,7 @@ import threading from common.tools.service.GenericGrpcService import GenericGrpcService from common.tools.kafka.Variables import KafkaConfig, KafkaTopic from confluent_kafka import Consumer -from confluent_kafka import KafkaError +from confluent_kafka import KafkaError, KafkaException from common.Constants import ServiceNameEnum from common.Settings import get_service_port_grpc from analytics.backend.service.Streamer import DaskStreamer @@ -32,7 +32,7 @@ LOGGER = logging.getLogger(__name__) class AnalyticsBackendService(GenericGrpcService): """ AnalyticsBackendService class is responsible for handling the requests from the AnalyticsFrontendService. - It listens to the Kafka topic for the requests and starts/stops the DaskStreamer accordingly. + It listens to the Kafka topic for the requests to start and stop the Streamer accordingly. 
It also initializes the Kafka producer and Dask cluster for the streamer. """ def __init__(self, cls_name : str = __name__, n_workers=1, threads_per_worker=1 @@ -62,34 +62,36 @@ class AnalyticsBackendService(GenericGrpcService): listener for requests on Kafka topic. """ LOGGER.info("Request Listener is initiated ...") - # print ("Request Listener is initiated ...") consumer = self.request_consumer consumer.subscribe([KafkaTopic.ANALYTICS_REQUEST.value]) while True: - receive_msg = consumer.poll(2.0) - if receive_msg is None: + message = consumer.poll(2.0) + if message is None: continue - elif receive_msg.error(): - if receive_msg.error().code() == KafkaError._PARTITION_EOF: - continue - else: - LOGGER.error("Consumer error: {:}".format(receive_msg.error())) - break + elif message.error(): + if message.error().code() == KafkaError._PARTITION_EOF: + LOGGER.warning(f"Consumer reached end of topic {message.topic()}/{message.partition()}") + break + elif message.error().code() == KafkaError.UNKNOWN_TOPIC_OR_PART: + LOGGER.error(f"Subscribed topic {message.topic()} does not exist. Maybe the topic does not have any messages.") + continue + elif message.error(): + raise KafkaException(message.error()) try: - analyzer = json.loads(receive_msg.value().decode('utf-8')) - analyzer_uuid = receive_msg.key().decode('utf-8') + analyzer = json.loads(message.value().decode('utf-8')) + analyzer_uuid = message.key().decode('utf-8') LOGGER.info('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer)) if analyzer["algo_name"] is None and analyzer["oper_mode"] is None: if self.StopStreamer(analyzer_uuid): LOGGER.info("Dask Streamer stopped.") else: - LOGGER.error("Failed to stop Dask Streamer.") + LOGGER.warning("Failed to stop Dask Streamer. 
It may be already terminated...") else: if self.StartStreamer(analyzer_uuid, analyzer): LOGGER.info("Dask Streamer started.") else: - LOGGER.error("Failed to start Dask Streamer.") + LOGGER.warning("Failed to start Dask Streamer.") except Exception as e: LOGGER.warning("Unable to consume message from topic: {:}. ERROR: {:}".format(KafkaTopic.ANALYTICS_REQUEST.value, e)) @@ -117,14 +119,17 @@ class AnalyticsBackendService(GenericGrpcService): LOGGER.info(f"Streamer started with analyzer Id: {analyzer_uuid}") # Stop the streamer after the given duration - if analyzer['duration'] > 0: + duration = analyzer['duration'] + if duration > 0: def stop_after_duration(): - time.sleep(analyzer['duration']) - LOGGER.warning(f"Execution duration ({analyzer['duration']}) completed of Analyzer: {analyzer_uuid}") + time.sleep(duration) + LOGGER.warning(f"Execution duration ({duration}) completed for Analyzer: {analyzer_uuid}") if not self.StopStreamer(analyzer_uuid): LOGGER.warning("Failed to stop Dask Streamer. 
Streamer may be already terminated.") - duration_thread = threading.Thread(target=stop_after_duration, daemon=True) + duration_thread = threading.Thread( + target=stop_after_duration, daemon=True, name=f"stop_after_duration_{analyzer_uuid}" + ) duration_thread.start() self.active_streamers[analyzer_uuid] = streamer @@ -140,7 +145,7 @@ class AnalyticsBackendService(GenericGrpcService): try: if analyzer_uuid not in self.active_streamers: LOGGER.warning("Dask Streamer not found with the given analyzer_uuid: {:}".format(analyzer_uuid)) - return False + return True LOGGER.info(f"Terminating streamer with Analyzer Id: {analyzer_uuid}") streamer = self.active_streamers[analyzer_uuid] streamer.stop() diff --git a/src/analytics/backend/service/Streamer.py b/src/analytics/backend/service/Streamer.py index e1eaffc49b43c7849f4a1d399d22246dc05609fd..10917f002ed306ea408a20b92e94aed597ef1ea3 100644 --- a/src/analytics/backend/service/Streamer.py +++ b/src/analytics/backend/service/Streamer.py @@ -74,6 +74,9 @@ class DaskStreamer(threading.Thread): if message.error(): if message.error().code() == KafkaError._PARTITION_EOF: logger.warning(f"Consumer reached end of topic {message.topic()}/{message.partition()}") + elif message.error().code() == KafkaError.UNKNOWN_TOPIC_OR_PART: + logger.error(f"Subscribed topic {message.topic()} does not exist. 
May be topic does not have any messages.") + continue elif message.error(): raise KafkaException(message.error()) else: diff --git a/src/analytics/backend/tests/messages_analyzer.py b/src/analytics/backend/tests/messages_analyzer.py index 11210ded12cc993041c8f872b147213f70eb67a0..813b4f06cdbaf1dd1c00c4d0f0dfa4b78339e09c 100644 --- a/src/analytics/backend/tests/messages_analyzer.py +++ b/src/analytics/backend/tests/messages_analyzer.py @@ -34,6 +34,9 @@ def get_thresholds(): def get_duration(): return 90 +def get_batch_duration(): + return 30 + def get_windows_size(): return None diff --git a/src/analytics/backend/tests/test_backend.py b/src/analytics/backend/tests/test_backend.py index 91d53000d408981896607a0b4af974100a916d15..1bbfaee136942bb7f14e77438683e8460f0a4f0b 100644 --- a/src/analytics/backend/tests/test_backend.py +++ b/src/analytics/backend/tests/test_backend.py @@ -97,14 +97,15 @@ def analytics_service(mock_kafka_producer, mock_dask_cluster, mock_dask_client, @pytest.fixture def analyzer_data(): return { - 'algo_name' : 'test_algorithm', - 'oper_mode' : 'test_mode', - 'input_kpis' : get_input_kpi_list(), - 'output_kpis': get_output_kpi_list(), - 'thresholds' : get_thresholds(), - 'batch_size' : get_batch_size(), - 'window_size': get_windows_size(), - 'duration' : get_duration(), + 'algo_name' : 'test_algorithm', + 'oper_mode' : 'test_mode', + 'input_kpis' : get_input_kpi_list(), + 'output_kpis' : get_output_kpi_list(), + 'thresholds' : get_thresholds(), + 'duration' : get_duration(), + 'batch_size_min' : get_batch_size(), + 'window_size' : get_windows_size(), + 'batch_duration_min' : get_duration(), } def test_start_streamer(analytics_service, analyzer_data): @@ -122,7 +123,8 @@ def test_stop_streamer(analytics_service, analyzer_data): assert analyzer_uuid in analytics_service.active_streamers # Stop streamer - result = analytics_service.StopStreamer(analyzer_uuid) + with patch('time.sleep', return_value=None): + result = 
analytics_service.StopStreamer(analyzer_uuid) assert result is True assert analyzer_uuid not in analytics_service.active_streamers @@ -246,7 +248,7 @@ def test_run_with_valid_consumer(dask_streamer): assert len(dask_streamer.batch) == 0 # Batch should be cleared after processing mock_task_handler_selector.assert_called_once() # Task handler should be called once - mock_poll.assert_any_call(timeout=2.0) # Poll should have been called at least once + mock_poll.assert_any_call(timeout=1.0) # Poll should have been called at least once # # add a test to check the working of aggregation_handler function and threshold_handler from AnalyzerHandlers.py def test_aggregation_handler(): @@ -282,20 +284,35 @@ def test_threshold_handler(): ########################### # This is a local machine test to check the integration of the backend service with the Streamer -# --- "test_validate_kafka_topics" should be run before the functionality tests --- +# @pytest.fixture(scope='session') +# def analyticBackend_service(): +# logger.info('Initializing AnalyticsBackendService...') + +# _service = AnalyticsBackendService() +# _service.start() + +# logger.info('Yielding AnalyticsBackendService...') +# yield _service + +# logger.info('Terminating AnalyticsBackendService...') +# _service.stop() +# logger.info('Terminated AnalyticsBackendService...') + + +# # --- "test_validate_kafka_topics" should be run before the functionality tests --- # def test_validate_kafka_topics(): # logger.debug(" >>> test_validate_kafka_topics: START <<< ") # response = KafkaTopic.create_all_topics() # assert isinstance(response, bool) -# def test_backend_integration_with_analyzer(): -# backendServiceObject = AnalyticsBackendService() -# backendServiceObject.install_servicers() +# def test_backend_integration_with_frontend(analyticBackend_service: AnalyticsBackendService): +# # backendServiceObject = AnalyticsBackendService() +# # backendServiceObject.install_servicers() # logger.info(" waiting for 2 minutes for 
the backend service before termination ... ") -# time.sleep(150) +# time.sleep(300) # logger.info(" Initiating stop collector ... ") -# status = backendServiceObject.StopStreamer("efef4d95-1cf1-43c4-9742-95c283ddd666") -# backendServiceObject.close() +# status = analyticBackend_service.StopStreamer("efef4d95-1cf1-43c4-9742-95c283ddd666") +# analyticBackend_service.close() # assert isinstance(status, bool) # assert status == True # logger.info(" Backend service terminated successfully ... ") diff --git a/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py b/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py index fd5bcd185b1f9945eccf583c33af2a243fe729be..cd20503e7dbe1059b2209e4b0ccd29a229e7916e 100644 --- a/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py +++ b/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py @@ -46,7 +46,7 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer): @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def StartAnalyzer(self, - request : Analyzer, grpc_context: grpc.ServicerContext # type: ignore + request : Analyzer, context: grpc.ServicerContext # type: ignore ) -> AnalyzerId: # type: ignore LOGGER.info ("At Service gRPC message: {:}".format(request)) response = AnalyzerId() @@ -65,14 +65,18 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer): """ analyzer_uuid = analyzer_obj.analyzer_id.analyzer_id.uuid analyzer_to_generate : Dict = { - "algo_name" : analyzer_obj.algorithm_name, - "input_kpis" : [k.kpi_id.uuid for k in analyzer_obj.input_kpi_ids], - "output_kpis" : [k.kpi_id.uuid for k in analyzer_obj.output_kpi_ids], - "oper_mode" : analyzer_obj.operation_mode, - "thresholds" : json.loads(analyzer_obj.parameters["thresholds"]), - "window_size" : analyzer_obj.parameters["window_size"], - "window_slider" : analyzer_obj.parameters["window_slider"], - # "store_aggregate" : 
analyzer_obj.parameters["store_aggregate"] + "algo_name" : analyzer_obj.algorithm_name, + "input_kpis" : [k.kpi_id.uuid for k in analyzer_obj.input_kpi_ids], + "output_kpis" : [k.kpi_id.uuid for k in analyzer_obj.output_kpi_ids], + "oper_mode" : analyzer_obj.operation_mode, + "duration" : analyzer_obj.duration_s, + "thresholds" : json.loads(analyzer_obj.parameters["thresholds"]), + "window_size" : analyzer_obj.parameters["window_size"], # slider window size in seconds (single batch execution time) + "window_slider" : analyzer_obj.parameters["window_slider"], # slider shift in seconds + "batch_size_min" : analyzer_obj.batch_min_size, # currently implemented + "batch_size_max" : analyzer_obj.batch_max_size, + "batch_duration_min" : analyzer_obj.batch_min_duration_s, # currently implemented + "batch_interval_max" : analyzer_obj.batch_max_duration_s } self.kafka_producer.produce( KafkaTopic.ANALYTICS_REQUEST.value, @@ -137,7 +141,7 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer): @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def StopAnalyzer(self, - request : AnalyzerId, grpc_context: grpc.ServicerContext # type: ignore + request : AnalyzerId, context: grpc.ServicerContext # type: ignore ) -> Empty: # type: ignore LOGGER.info ("At Service gRPC message: {:}".format(request)) try: @@ -181,7 +185,7 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer): @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def SelectAnalyzers(self, - filter : AnalyzerFilter, contextgrpc_context: grpc.ServicerContext # type: ignore + filter : AnalyzerFilter, context: grpc.ServicerContext # type: ignore ) -> AnalyzerList: # type: ignore LOGGER.info("At Service gRPC message: {:}".format(filter)) response = AnalyzerList() @@ -202,7 +206,5 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer): def delivery_callback(self, err, msg): if err: LOGGER.debug('Message delivery failed: {:}'.format(err)) - # print 
('Message delivery failed: {:}'.format(err)) else: LOGGER.debug('Message delivered to topic {:}'.format(msg.topic())) - # print('Message delivered to topic {:}'.format(msg.topic())) diff --git a/src/analytics/frontend/tests/messages.py b/src/analytics/frontend/tests/messages.py index 4df6070bedffd91402953bbbbbec16ce0118008c..326bc0be22c0d0e01ccdd79b439b82a88d06e0ad 100644 --- a/src/analytics/frontend/tests/messages.py +++ b/src/analytics/frontend/tests/messages.py @@ -20,43 +20,77 @@ from common.proto.analytics_frontend_pb2 import ( AnalyzerOperationMode, Analyze def create_analyzer_id(): _create_analyzer_id = AnalyzerId() - # _create_analyzer_id.analyzer_id.uuid = str(uuid.uuid4()) + _create_analyzer_id.analyzer_id.uuid = str(uuid.uuid4()) # _create_analyzer_id.analyzer_id.uuid = "efef4d95-1cf1-43c4-9742-95c283ddd7a6" - _create_analyzer_id.analyzer_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666" + # _create_analyzer_id.analyzer_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666" return _create_analyzer_id def create_analyzer(): _create_analyzer = Analyzer() - # _create_analyzer.analyzer_id.analyzer_id.uuid = str(uuid.uuid4()) - _create_analyzer.analyzer_id.analyzer_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666" - _create_analyzer.algorithm_name = "Test_Aggergate_and_Threshold" + + _create_analyzer.analyzer_id.analyzer_id.uuid = str(uuid.uuid4()) + # _create_analyzer.analyzer_id.analyzer_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666" + _create_analyzer.algorithm_name = "Test_new_Threshold" _create_analyzer.operation_mode = AnalyzerOperationMode.ANALYZEROPERATIONMODE_STREAMING - _kpi_id = KpiId() # input IDs to analyze - _kpi_id.kpi_id.uuid = str(uuid.uuid4()) + _kpi_id = KpiId() + + # _kpi_id.kpi_id.uuid = str(uuid.uuid4()) _kpi_id.kpi_id.uuid = "6e22f180-ba28-4641-b190-2287bf448888" _create_analyzer.input_kpi_ids.append(_kpi_id) - _kpi_id.kpi_id.uuid = str(uuid.uuid4()) + + # _kpi_id.kpi_id.uuid = str(uuid.uuid4()) _kpi_id.kpi_id.uuid = 
"1e22f180-ba28-4641-b190-2287bf446666" _create_analyzer.input_kpi_ids.append(_kpi_id) + _kpi_id.kpi_id.uuid = str(uuid.uuid4()) _create_analyzer.input_kpi_ids.append(_kpi_id) + # output IDs after analysis - _kpi_id.kpi_id.uuid = str(uuid.uuid4()) + _kpi_id = KpiId() + + # _kpi_id.kpi_id.uuid = str(uuid.uuid4()) + _kpi_id.kpi_id.uuid = "6e22f180-ba28-4641-b190-2287bf181818" + _create_analyzer.output_kpi_ids.append(_kpi_id) + + # _kpi_id.kpi_id.uuid = str(uuid.uuid4()) + _kpi_id.kpi_id.uuid = "1e22f180-ba28-4641-b190-2287bf441616" _create_analyzer.output_kpi_ids.append(_kpi_id) + _kpi_id.kpi_id.uuid = str(uuid.uuid4()) _create_analyzer.output_kpi_ids.append(_kpi_id) + # parameter + # _threshold_dict = { + # 'mean_value' :[20, 30], 'min_value' :[00, 10], 'max_value' :[45, 50], + # 'first_value' :[00, 10], 'last_value' :[40, 50], 'std_value' :[00, 10] + # } _threshold_dict = { - 'mean_value' :(20, 30), 'min_value' :(00, 10), 'max_value' :(45, 50), - 'first_value' :(00, 10), 'last_value' :(40, 50), 'std_value':(00, 10) - } + "task_type": Handlers.AGGREGATION_HANDLER.value, + "task_parameter": [ + {"last": [40, 80], "variance": [300, 500]}, + {"count": [2, 4], "max": [70, 100]}, + {"min": [10, 20], "avg": [50, 70]}, + ], + } + _create_analyzer.parameters['thresholds'] = json.dumps(_threshold_dict) - _create_analyzer.parameters['window_size'] = "10s" # Such as "10 seconds", "2 minutes", "3 hours", "4 days" or "5 weeks" - _create_analyzer.parameters['window_slider'] = "5s" # should be less than window size - _create_analyzer.parameters['store_aggregate'] = str(False) # TRUE to store. No implemented yet + _create_analyzer.parameters['window_size'] = "0" # slider window size in seconds (Total time for aggeration processing) + _create_analyzer.parameters['window_slider'] = "0" # should be less than window size + _create_analyzer.parameters['store_aggregate'] = str(False) # TRUE to store. 
Not implemented yet + # duration of the analyzer + _create_analyzer.duration_s = 90 + + # batch window size + _create_analyzer.batch_min_duration_s = 20 + _create_analyzer.batch_max_duration_s = 50 + + # batch size + _create_analyzer.batch_min_size = 5 + _create_analyzer.batch_max_size = 10 + return _create_analyzer def create_analyzer_filter(): @@ -84,3 +118,10 @@ def create_analyzer_filter(): # _create_analyzer_filter.input_kpi_ids.append(_output_kpi_id_obj) return _create_analyzer_filter + + +# Added for testing to remove the dependency on the backend service +from enum import Enum +class Handlers(Enum): + AGGREGATION_HANDLER = "AggregationHandler" + UNSUPPORTED_HANDLER = "UnsupportedHandler" diff --git a/src/analytics/frontend/tests/test_frontend.py b/src/analytics/frontend/tests/test_frontend.py index 134871fb77719e4747b6fc3ae6cfd21dd317a31f..7d8a08d3ad2d82758b088a8f83342c2b3929eadd 100644 --- a/src/analytics/frontend/tests/test_frontend.py +++ b/src/analytics/frontend/tests/test_frontend.py @@ -78,6 +78,15 @@ def analyticsFrontend_client(analyticsFrontend_service : AnalyticsFrontendServic LOGGER.info('Closed AnalyticsFrontendClient...') +@pytest.fixture(autouse=True) +def log_all_methods(request): + ''' + This fixture logs messages before and after each test function runs, indicating the start and end of the test. + The autouse=True parameter ensures that this logging happens automatically for all tests in the module. 
+ ''' + LOGGER.info(f" >>>>> Starting test: {request.node.name} ") + yield + LOGGER.info(f" <<<<< Finished test: {request.node.name} ") ########################### # Tests Implementation of Analytics Frontend @@ -89,24 +98,17 @@ def test_validate_kafka_topics(): response = KafkaTopic.create_all_topics() assert isinstance(response, bool) -# ----- core funtionality test ----- -# def test_StartAnalytics(analyticsFrontend_client): -# LOGGER.info(' >>> test_StartAnalytic START: <<< ') -# response = analyticsFrontend_client.StartAnalyzer(create_analyzer()) -# LOGGER.debug(str(response)) -# assert isinstance(response, AnalyzerId) - # To test start and stop listener together def test_StartAnalyzers(analyticsFrontend_client): LOGGER.info(' >>> test_StartAnalyzers START: <<< ') added_analyzer_id = analyticsFrontend_client.StartAnalyzer(create_analyzer()) LOGGER.debug(str(added_analyzer_id)) - LOGGER.info(' --> Calling StartResponseListener... ') - class_obj = AnalyticsFrontendServiceServicerImpl() - response = class_obj.StartResponseListener(added_analyzer_id.analyzer_id.uuid) - LOGGER.debug(response) - LOGGER.info("waiting for timer to comlete ...") - time.sleep(3) + # LOGGER.info(' --> Calling StartResponseListener... ') + # class_obj = AnalyticsFrontendServiceServicerImpl() + # response = class_obj.StartResponseListener(added_analyzer_id.analyzer_id.uuid) + # LOGGER.debug(response) + LOGGER.info("waiting for timer to complete ...") + time.sleep(15) LOGGER.info('--> StopAnalyzer') response = analyticsFrontend_client.StopAnalyzer(added_analyzer_id) LOGGER.debug(str(response))