Loading src/analytics/backend/service/Streamer.py +0 −2 Original line number Diff line number Diff line Loading @@ -143,8 +143,6 @@ class DaskStreamer(threading.Thread): future.add_done_callback( lambda fut: self.produce_result(fut.result(), KafkaTopic.ALARMS.value) ) # result = aggregation_handler(self.key, self.batch, self.input_kpis, self.output_kpis, self.thresholds) # self.produce_result(result, KafkaTopic.ALARMS.value) except Exception as e: logger.error( f"Failed to submit task to Dask client or unable to process future. See error for detail: {e}") Loading src/analytics/backend/tests/test_backend.py +4 −2 Original line number Diff line number Diff line Loading @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. import time import json import pytest import logging Loading Loading @@ -193,6 +192,7 @@ def test_produce_result(dask_streamer): result = [{"kpi_id": "kpi1", "value": 100}] with patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.delivery_report', return_value=None) as mock_delivery_report, \ patch.object(dask_streamer.producer, 'produce') as mock_produce: dask_streamer.output_kpis = ['kpi1'] dask_streamer.produce_result(result, "test_topic") mock_produce.assert_called_once_with( "test_topic", Loading Loading @@ -220,6 +220,8 @@ def test_run_with_valid_consumer(dask_streamer): with patch.object(dask_streamer.consumer, 'poll') as mock_poll, \ patch.object(dask_streamer, 'task_handler_selector') as mock_task_handler_selector: dask_streamer.input_kpis = ['kpi1', 'kpi2'] # Simulate valid messages without errors mock_message_1 = MagicMock() mock_message_1.value.return_value = b'{"kpi_id": "kpi1", "value": 100}' Loading Loading
src/analytics/backend/service/Streamer.py +0 −2 Original line number Diff line number Diff line Loading @@ -143,8 +143,6 @@ class DaskStreamer(threading.Thread): future.add_done_callback( lambda fut: self.produce_result(fut.result(), KafkaTopic.ALARMS.value) ) # result = aggregation_handler(self.key, self.batch, self.input_kpis, self.output_kpis, self.thresholds) # self.produce_result(result, KafkaTopic.ALARMS.value) except Exception as e: logger.error( f"Failed to submit task to Dask client or unable to process future. See error for detail: {e}") Loading
src/analytics/backend/tests/test_backend.py +4 −2 Original line number Diff line number Diff line Loading @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. import time import json import pytest import logging Loading Loading @@ -193,6 +192,7 @@ def test_produce_result(dask_streamer): result = [{"kpi_id": "kpi1", "value": 100}] with patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.delivery_report', return_value=None) as mock_delivery_report, \ patch.object(dask_streamer.producer, 'produce') as mock_produce: dask_streamer.output_kpis = ['kpi1'] dask_streamer.produce_result(result, "test_topic") mock_produce.assert_called_once_with( "test_topic", Loading Loading @@ -220,6 +220,8 @@ def test_run_with_valid_consumer(dask_streamer): with patch.object(dask_streamer.consumer, 'poll') as mock_poll, \ patch.object(dask_streamer, 'task_handler_selector') as mock_task_handler_selector: dask_streamer.input_kpis = ['kpi1', 'kpi2'] # Simulate valid messages without errors mock_message_1 = MagicMock() mock_message_1.value.return_value = b'{"kpi_id": "kpi1", "value": 100}' Loading