diff --git a/src/analytics/backend/Dockerfile b/src/analytics/backend/Dockerfile index ef49657cdc270153b5de416cf46bef35bf2c04e6..17adcd3ab1df5704cc7ef0c5a19b3cfb1539ee22 100644 --- a/src/analytics/backend/Dockerfile +++ b/src/analytics/backend/Dockerfile @@ -53,15 +53,6 @@ RUN python3 -m grpc_tools.protoc -I=. --python_out=. --grpc_python_out=. *.proto RUN rm *.proto RUN find . -type f -exec sed -i -E 's/(import\ .*)_pb2/from . \1_pb2/g' {} \; -# Install Java (required for PySpark) -RUN apt-get update && \ - apt-get install -y default-jdk && \ - apt-get clean - -# Set JAVA_HOME environment variable -ENV JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64 -ENV PATH=$JAVA_HOME/bin:$PATH - # Create component sub-folders, get specific Python packages RUN mkdir -p /var/teraflow/analytics/backend WORKDIR /var/teraflow/analytics/backend diff --git a/src/analytics/backend/requirements.in b/src/analytics/backend/requirements.in index 04ab95c2df65c09e67ce7386ab2277da884b39f3..360d94f4668b19feba305df76a65ef70b26e091f 100644 --- a/src/analytics/backend/requirements.in +++ b/src/analytics/backend/requirements.in @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -dask==2024.9.0 -distributed==2024.9.0 +dask==2024.1.0 +distributed==2024.1.0 pandas==2.2.3 confluent-kafka==2.3.* diff --git a/src/analytics/backend/tests/messages.py b/src/analytics/backend/tests/messages.py index a1d4d0629ab7c8729fdfaf9a9253cb5f698186d7..cdc6c34428a72e5fcf90db3c5656a33c2bb29008 100644 --- a/src/analytics/backend/tests/messages.py +++ b/src/analytics/backend/tests/messages.py @@ -81,20 +81,20 @@ def create_analyzer(): def create_analyzer_dask(): _create_analyzer = Analyzer() - # _create_analyzer.analyzer_id.analyzer_id.uuid = str(uuid.uuid4()) - _create_analyzer.analyzer_id.analyzer_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666" + _create_analyzer.analyzer_id.analyzer_id.uuid = str(uuid.uuid4()) + # _create_analyzer.analyzer_id.analyzer_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666" _create_analyzer.algorithm_name = "Test_Aggergate_and_Threshold" _create_analyzer.operation_mode = AnalyzerOperationMode.ANALYZEROPERATIONMODE_STREAMING _kpi_id = KpiId() # input IDs to analyze - _kpi_id.kpi_id.uuid = str(uuid.uuid4()) + # _kpi_id.kpi_id.uuid = str(uuid.uuid4()) _kpi_id.kpi_id.uuid = "6e22f180-ba28-4641-b190-2287bf448888" _create_analyzer.input_kpi_ids.append(_kpi_id) - _kpi_id.kpi_id.uuid = str(uuid.uuid4()) + # _kpi_id.kpi_id.uuid = str(uuid.uuid4()) _kpi_id.kpi_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666" _create_analyzer.input_kpi_ids.append(_kpi_id) - _kpi_id.kpi_id.uuid = str(uuid.uuid4()) + # _kpi_id.kpi_id.uuid = str(uuid.uuid4()) _create_analyzer.input_kpi_ids.append(_kpi_id) # output IDs after analysis _kpi_id.kpi_id.uuid = str(uuid.uuid4()) @@ -104,8 +104,8 @@ def create_analyzer_dask(): # parameter _threshold_dict = { - 'mean_PKS_TX' :(20, 30), 'min_value' :(00, 10), 'max_value' :(45, 50),#} - 'first_value' :(00, 50), 'last_value' :(50, 100), 'std_value' :(0, 90)} + 'mean_latency' :(20, 30), 'min_latency' :(00, 10), 'max_latency' :(45, 50),#} + 'first_value' :(00, 50), 'last_value' :(50, 100), 'std_value' :(0, 90)} _create_analyzer.parameters['thresholds'] = json.dumps(_threshold_dict) _create_analyzer.parameters['oper_list'] = json.dumps([key.split('_')[0] for key in _threshold_dict.keys()]) _create_analyzer.parameters['window_size'] = "10s" # Such as "10 seconds", "2 minutes", "3 hours", "4 days" or "5 weeks" diff --git a/src/analytics/backend/tests/test_backend.py b/src/analytics/backend/tests/test_backend.py index 48ce867478b6f40b6942ee8df13deef2117b5d55..470729160c75fd7491e58191f534db9f4da61806 100644 --- a/src/analytics/backend/tests/test_backend.py +++ b/src/analytics/backend/tests/test_backend.py @@ -119,20 +119,20 @@ LOGGER = logging.getLogger(__name__) # assert isinstance(response, bool) # --- To TEST StartRequestListenerFunctionality -# def test_StartRequestListener(): -# LOGGER.info('test_RunRequestListener') -# AnalyticsBackendServiceObj = AnalyticsBackendService() -# AnalyticsBackendServiceObj.stop_event = Event() -# listener_thread = Thread(target=AnalyticsBackendServiceObj.RequestListener, args=()) -# listener_thread.start() +def test_StartRequestListener(): + LOGGER.info('test_RunRequestListener') + AnalyticsBackendServiceObj = AnalyticsBackendService() + AnalyticsBackendServiceObj.stop_event = Event() + listener_thread = Thread(target=AnalyticsBackendServiceObj.RequestListener, args=()) + listener_thread.start() -# time.sleep(100) + time.sleep(100) -# # AnalyticsBackendServiceObj.stop_event.set() -# # LOGGER.info('Backend termination initiated. waiting for termination... 10 seconds') -# # listener_thread.join(timeout=10) -# # assert not listener_thread.is_alive(), "RequestListener thread did not terminate as expected." -# LOGGER.info('Completed test_RunRequestListener') + # AnalyticsBackendServiceObj.stop_event.set() + # LOGGER.info('Backend termination initiated. waiting for termination... 10 seconds') + # listener_thread.join(timeout=10) + # assert not listener_thread.is_alive(), "RequestListener thread did not terminate as expected." + LOGGER.info('Completed test_RunRequestListener') # To test START and STOP communication together # def test_StopRequestListener():