Commit bba35aa9 authored by Waleed Akbar's avatar Waleed Akbar
Browse files

Changes in Telemetry backend and frontend

- (Backend) Refore code to improved logging and error handling
- (Backend) Improved tests
- (Frontend) Refector code to improve Analyzer handling
- (Frontend) improve tests and messages format to handle new backend enhancements.
parent eaffff44
Loading
Loading
Loading
Loading
+2 −0
Original line number Diff line number Diff line
@@ -18,8 +18,10 @@ PROJECTDIR=`pwd`

cd $PROJECTDIR/src
RCFILE=$PROJECTDIR/coverage/.coveragerc

export KFK_SERVER_ADDRESS='127.0.0.1:9092'
CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace crdb -o jsonpath='{.spec.clusterIP}')
export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_analytics?sslmode=require"

python3 -m pytest --log-level=DEBUG --log-cli-level=INFO --verbose \
    analytics/frontend/tests/test_frontend.py
+25 −20
Original line number Diff line number Diff line
@@ -20,7 +20,7 @@ import threading
from common.tools.service.GenericGrpcService import GenericGrpcService
from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
from confluent_kafka import Consumer
from confluent_kafka import KafkaError
from confluent_kafka import KafkaError, KafkaException
from common.Constants import ServiceNameEnum
from common.Settings import get_service_port_grpc
from analytics.backend.service.Streamer import DaskStreamer
@@ -32,7 +32,7 @@ LOGGER = logging.getLogger(__name__)
class AnalyticsBackendService(GenericGrpcService):
    """
    AnalyticsBackendService class is responsible for handling the requests from the AnalyticsFrontendService.
    It listens to the Kafka topic for the requests and starts/stops the DaskStreamer accordingly.
    It listens to the Kafka topic for the requests to start and stop the Streamer accordingly.
    It also initializes the Kafka producer and Dask cluster for the streamer.
    """
    def __init__(self, cls_name : str = __name__, n_workers=1, threads_per_worker=1
@@ -62,34 +62,36 @@ class AnalyticsBackendService(GenericGrpcService):
        listener for requests on Kafka topic.
        """
        LOGGER.info("Request Listener is initiated ...")
        # print      ("Request Listener is initiated ...")
        consumer = self.request_consumer
        consumer.subscribe([KafkaTopic.ANALYTICS_REQUEST.value])
        while True:
            receive_msg = consumer.poll(2.0)
            if receive_msg is None:
            message = consumer.poll(2.0)
            if message is None:
                continue
            elif receive_msg.error():
                if receive_msg.error().code() == KafkaError._PARTITION_EOF:
                    continue
                else:
                    LOGGER.error("Consumer error: {:}".format(receive_msg.error()))
            elif message.error():
                    if message.error().code() == KafkaError._PARTITION_EOF:
                        LOGGER.warning(f"Consumer reached end of topic {message.topic()}/{message.partition()}")
                        break
                    elif message.error().code() == KafkaError.UNKNOWN_TOPIC_OR_PART:
                        LOGGER.error(f"Subscribed topic {message.topic()} does not exist. May be topic does not have any messages.")
                        continue
                    elif message.error():
                        raise KafkaException(message.error())
            try:
                analyzer      = json.loads(receive_msg.value().decode('utf-8'))
                analyzer_uuid = receive_msg.key().decode('utf-8')
                analyzer      = json.loads(message.value().decode('utf-8'))
                analyzer_uuid = message.key().decode('utf-8')
                LOGGER.info('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer))

                if analyzer["algo_name"] is None and analyzer["oper_mode"] is None:
                    if self.StopStreamer(analyzer_uuid):
                        LOGGER.info("Dask Streamer stopped.")
                    else:
                        LOGGER.error("Failed to stop Dask Streamer.")
                        LOGGER.warning("Failed to stop Dask Streamer. May be already terminated...")
                else:
                    if self.StartStreamer(analyzer_uuid, analyzer):
                        LOGGER.info("Dask Streamer started.")
                    else:
                        LOGGER.error("Failed to start Dask Streamer.")
                        LOGGER.warning("Failed to start Dask Streamer.")
            except Exception as e:
                LOGGER.warning("Unable to consume message from topic: {:}. ERROR: {:}".format(KafkaTopic.ANALYTICS_REQUEST.value, e))

@@ -117,14 +119,17 @@ class AnalyticsBackendService(GenericGrpcService):
            LOGGER.info(f"Streamer started with analyzer Id: {analyzer_uuid}")

            # Stop the streamer after the given duration
            if analyzer['duration'] > 0:
            duration = analyzer['duration']
            if duration > 0:
                def stop_after_duration():
                    time.sleep(analyzer['duration'])
                    LOGGER.warning(f"Execution duration ({analyzer['duration']}) completed of Analyzer: {analyzer_uuid}")
                    time.sleep(duration)
                    LOGGER.warning(f"Execution duration ({duration}) completed of Analyzer: {analyzer_uuid}")
                    if not self.StopStreamer(analyzer_uuid):
                        LOGGER.warning("Failed to stop Dask Streamer. Streamer may be already terminated.")

                duration_thread = threading.Thread(target=stop_after_duration, daemon=True)
                duration_thread = threading.Thread(
                    target=stop_after_duration, daemon=True, name=f"stop_after_duration_{analyzer_uuid}"
                    )
                duration_thread.start()

            self.active_streamers[analyzer_uuid] = streamer
@@ -140,7 +145,7 @@ class AnalyticsBackendService(GenericGrpcService):
        try:
            if analyzer_uuid not in self.active_streamers:
                LOGGER.warning("Dask Streamer not found with the given analyzer_uuid: {:}".format(analyzer_uuid))
                return False
                return True
            LOGGER.info(f"Terminating streamer with Analyzer Id: {analyzer_uuid}")
            streamer = self.active_streamers[analyzer_uuid]
            streamer.stop()
+3 −0
Original line number Diff line number Diff line
@@ -74,6 +74,9 @@ class DaskStreamer(threading.Thread):
                if message.error():
                    if message.error().code() == KafkaError._PARTITION_EOF:
                        logger.warning(f"Consumer reached end of topic {message.topic()}/{message.partition()}")
                    elif message.error().code() == KafkaError.UNKNOWN_TOPIC_OR_PART:
                        logger.error(f"Subscribed topic {message.topic()} does not exist. May be topic does not have any messages.")
                        continue
                    elif message.error():
                        raise KafkaException(message.error())
                else:
+3 −0
Original line number Diff line number Diff line
@@ -34,6 +34,9 @@ def get_thresholds():
def get_duration():
    return 90

def get_batch_duration():
    return 30

def get_windows_size():
    return None

+34 −17
Original line number Diff line number Diff line
@@ -102,9 +102,10 @@ def analyzer_data():
        'input_kpis'         : get_input_kpi_list(),
        'output_kpis'        : get_output_kpi_list(),
        'thresholds'         : get_thresholds(),
        'batch_size' : get_batch_size(),
        'window_size': get_windows_size(),
        'duration'           : get_duration(),
        'batch_size_min'     : get_batch_size(),
        'window_size'        : get_windows_size(),
        'batch_duration_min' : get_duration(),
    }

def test_start_streamer(analytics_service, analyzer_data):
@@ -122,6 +123,7 @@ def test_stop_streamer(analytics_service, analyzer_data):
    assert analyzer_uuid in analytics_service.active_streamers

    # Stop streamer
    with patch('time.sleep', return_value=None):
        result = analytics_service.StopStreamer(analyzer_uuid)
    assert result is True
    assert analyzer_uuid not in analytics_service.active_streamers
@@ -246,7 +248,7 @@ def test_run_with_valid_consumer(dask_streamer):
        
        assert len(dask_streamer.batch) == 0  # Batch should be cleared after processing
        mock_task_handler_selector.assert_called_once()  # Task handler should be called once
        mock_poll.assert_any_call(timeout=2.0)  # Poll should have been called at least once
        mock_poll.assert_any_call(timeout=1.0)  # Poll should have been called at least once

# # add a test to check the working of aggregation_handler function and threshold_handler from AnalyzerHandlers.py
def test_aggregation_handler():
@@ -282,20 +284,35 @@ def test_threshold_handler():
###########################
# This is a local machine test to check the integration of the backend service with the Streamer

# --- "test_validate_kafka_topics" should be run before the functionality tests ---
# @pytest.fixture(scope='session')
# def analyticBackend_service():
#     logger.info('Initializing AnalyticsBackendService...')

#     _service = AnalyticsBackendService()
#     _service.start()

#     logger.info('Yielding AnalyticsBackendService...')
#     yield _service

#     logger.info('Terminating AnalyticsBackendService...')
#     _service.stop()
#     logger.info('Terminated AnalyticsBackendService...')


# # --- "test_validate_kafka_topics" should be run before the functionality tests ---
# def test_validate_kafka_topics():
#     logger.debug(" >>> test_validate_kafka_topics: START <<< ")
#     response = KafkaTopic.create_all_topics()
#     assert isinstance(response, bool)

# def test_backend_integration_with_analyzer():
#     backendServiceObject = AnalyticsBackendService()
#     backendServiceObject.install_servicers()
# def test_backend_integration_with_frontend(analyticBackend_service: AnalyticsBackendService):
#     # backendServiceObject = AnalyticsBackendService()
#     # backendServiceObject.install_servicers()
#     logger.info(" waiting for 2 minutes for the backend service before termination  ... ")
#     time.sleep(150)
#     time.sleep(300)
#     logger.info(" Initiating stop collector ... ")
#     status = backendServiceObject.StopStreamer("efef4d95-1cf1-43c4-9742-95c283ddd666")
#     backendServiceObject.close()
#     status = analyticBackend_service.StopStreamer("efef4d95-1cf1-43c4-9742-95c283ddd666")
#     analyticBackend_service.close()
#     assert isinstance(status, bool)
#     assert status == True
#     logger.info(" Backend service terminated successfully ... ")
Loading