Loading src/analytics/backend/Dockerfile +0 −9 Original line number Diff line number Diff line Loading @@ -53,15 +53,6 @@ RUN python3 -m grpc_tools.protoc -I=. --python_out=. --grpc_python_out=. *.proto RUN rm *.proto RUN find . -type f -exec sed -i -E 's/(import\ .*)_pb2/from . \1_pb2/g' {} \; # Install Java (required for PySpark) RUN apt-get update && \ apt-get install -y default-jdk && \ apt-get clean # Set JAVA_HOME environment variable ENV JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64 ENV PATH=$JAVA_HOME/bin:$PATH # Create component sub-folders, get specific Python packages RUN mkdir -p /var/teraflow/analytics/backend WORKDIR /var/teraflow/analytics/backend Loading src/analytics/backend/requirements.in +2 −2 Original line number Diff line number Diff line Loading @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. dask==2024.9.0 distributed==2024.9.0 dask==2024.1.0 distributed==2024.1.0 pandas==2.2.3 confluent-kafka==2.3.* src/analytics/backend/tests/messages.py +7 −7 Original line number Diff line number Diff line Loading @@ -81,20 +81,20 @@ def create_analyzer(): def create_analyzer_dask(): _create_analyzer = Analyzer() # _create_analyzer.analyzer_id.analyzer_id.uuid = str(uuid.uuid4()) _create_analyzer.analyzer_id.analyzer_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666" _create_analyzer.analyzer_id.analyzer_id.uuid = str(uuid.uuid4()) # _create_analyzer.analyzer_id.analyzer_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666" _create_analyzer.algorithm_name = "Test_Aggergate_and_Threshold" _create_analyzer.operation_mode = AnalyzerOperationMode.ANALYZEROPERATIONMODE_STREAMING _kpi_id = KpiId() # input IDs to analyze _kpi_id.kpi_id.uuid = str(uuid.uuid4()) # _kpi_id.kpi_id.uuid = str(uuid.uuid4()) _kpi_id.kpi_id.uuid = "6e22f180-ba28-4641-b190-2287bf448888" _create_analyzer.input_kpi_ids.append(_kpi_id) _kpi_id.kpi_id.uuid = str(uuid.uuid4()) # _kpi_id.kpi_id.uuid = str(uuid.uuid4()) _kpi_id.kpi_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666" _create_analyzer.input_kpi_ids.append(_kpi_id) _kpi_id.kpi_id.uuid = str(uuid.uuid4()) # _kpi_id.kpi_id.uuid = str(uuid.uuid4()) _create_analyzer.input_kpi_ids.append(_kpi_id) # output IDs after analysis _kpi_id.kpi_id.uuid = str(uuid.uuid4()) Loading @@ -104,7 +104,7 @@ def create_analyzer_dask(): # parameter _threshold_dict = { 'mean_PKS_TX' :(20, 30), 'min_value' :(00, 10), 'max_value' :(45, 50),#} 'mean_latency' :(20, 30), 'min_latency' :(00, 10), 'max_latency' :(45, 50),#} 'first_value' :(00, 50), 'last_value' :(50, 100), 'std_value' :(0, 90)} _create_analyzer.parameters['thresholds'] = json.dumps(_threshold_dict) _create_analyzer.parameters['oper_list'] = json.dumps([key.split('_')[0] for key in _threshold_dict.keys()]) Loading src/analytics/backend/tests/test_backend.py +12 −12 Original line number Diff line number Diff line Loading @@ -119,20 +119,20 @@ LOGGER = logging.getLogger(__name__) # assert isinstance(response, bool) # --- To TEST StartRequestListenerFunctionality # def test_StartRequestListener(): # LOGGER.info('test_RunRequestListener') # AnalyticsBackendServiceObj = AnalyticsBackendService() # AnalyticsBackendServiceObj.stop_event = Event() # listener_thread = Thread(target=AnalyticsBackendServiceObj.RequestListener, args=()) # listener_thread.start() def test_StartRequestListener(): LOGGER.info('test_RunRequestListener') AnalyticsBackendServiceObj = AnalyticsBackendService() AnalyticsBackendServiceObj.stop_event = Event() listener_thread = Thread(target=AnalyticsBackendServiceObj.RequestListener, args=()) listener_thread.start() # time.sleep(100) time.sleep(100) # # AnalyticsBackendServiceObj.stop_event.set() # # LOGGER.info('Backend termination initiated. waiting for termination... 10 seconds') # # listener_thread.join(timeout=10) # # assert not listener_thread.is_alive(), "RequestListener thread did not terminate as expected." # LOGGER.info('Completed test_RunRequestListener') # AnalyticsBackendServiceObj.stop_event.set() # LOGGER.info('Backend termination initiated. waiting for termination... 10 seconds') # listener_thread.join(timeout=10) # assert not listener_thread.is_alive(), "RequestListener thread did not terminate as expected." LOGGER.info('Completed test_RunRequestListener') # To test START and STOP communication together # def test_StopRequestListener(): Loading Loading
src/analytics/backend/Dockerfile +0 −9 Original line number Diff line number Diff line Loading @@ -53,15 +53,6 @@ RUN python3 -m grpc_tools.protoc -I=. --python_out=. --grpc_python_out=. *.proto RUN rm *.proto RUN find . -type f -exec sed -i -E 's/(import\ .*)_pb2/from . \1_pb2/g' {} \; # Install Java (required for PySpark) RUN apt-get update && \ apt-get install -y default-jdk && \ apt-get clean # Set JAVA_HOME environment variable ENV JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64 ENV PATH=$JAVA_HOME/bin:$PATH # Create component sub-folders, get specific Python packages RUN mkdir -p /var/teraflow/analytics/backend WORKDIR /var/teraflow/analytics/backend Loading
src/analytics/backend/requirements.in +2 −2 Original line number Diff line number Diff line Loading @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. dask==2024.9.0 distributed==2024.9.0 dask==2024.1.0 distributed==2024.1.0 pandas==2.2.3 confluent-kafka==2.3.*
src/analytics/backend/tests/messages.py +7 −7 Original line number Diff line number Diff line Loading @@ -81,20 +81,20 @@ def create_analyzer(): def create_analyzer_dask(): _create_analyzer = Analyzer() # _create_analyzer.analyzer_id.analyzer_id.uuid = str(uuid.uuid4()) _create_analyzer.analyzer_id.analyzer_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666" _create_analyzer.analyzer_id.analyzer_id.uuid = str(uuid.uuid4()) # _create_analyzer.analyzer_id.analyzer_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666" _create_analyzer.algorithm_name = "Test_Aggergate_and_Threshold" _create_analyzer.operation_mode = AnalyzerOperationMode.ANALYZEROPERATIONMODE_STREAMING _kpi_id = KpiId() # input IDs to analyze _kpi_id.kpi_id.uuid = str(uuid.uuid4()) # _kpi_id.kpi_id.uuid = str(uuid.uuid4()) _kpi_id.kpi_id.uuid = "6e22f180-ba28-4641-b190-2287bf448888" _create_analyzer.input_kpi_ids.append(_kpi_id) _kpi_id.kpi_id.uuid = str(uuid.uuid4()) # _kpi_id.kpi_id.uuid = str(uuid.uuid4()) _kpi_id.kpi_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666" _create_analyzer.input_kpi_ids.append(_kpi_id) _kpi_id.kpi_id.uuid = str(uuid.uuid4()) # _kpi_id.kpi_id.uuid = str(uuid.uuid4()) _create_analyzer.input_kpi_ids.append(_kpi_id) # output IDs after analysis _kpi_id.kpi_id.uuid = str(uuid.uuid4()) Loading @@ -104,7 +104,7 @@ def create_analyzer_dask(): # parameter _threshold_dict = { 'mean_PKS_TX' :(20, 30), 'min_value' :(00, 10), 'max_value' :(45, 50),#} 'mean_latency' :(20, 30), 'min_latency' :(00, 10), 'max_latency' :(45, 50),#} 'first_value' :(00, 50), 'last_value' :(50, 100), 'std_value' :(0, 90)} _create_analyzer.parameters['thresholds'] = json.dumps(_threshold_dict) _create_analyzer.parameters['oper_list'] = json.dumps([key.split('_')[0] for key in _threshold_dict.keys()]) Loading
src/analytics/backend/tests/test_backend.py +12 −12 Original line number Diff line number Diff line Loading @@ -119,20 +119,20 @@ LOGGER = logging.getLogger(__name__) # assert isinstance(response, bool) # --- To TEST StartRequestListenerFunctionality # def test_StartRequestListener(): # LOGGER.info('test_RunRequestListener') # AnalyticsBackendServiceObj = AnalyticsBackendService() # AnalyticsBackendServiceObj.stop_event = Event() # listener_thread = Thread(target=AnalyticsBackendServiceObj.RequestListener, args=()) # listener_thread.start() def test_StartRequestListener(): LOGGER.info('test_RunRequestListener') AnalyticsBackendServiceObj = AnalyticsBackendService() AnalyticsBackendServiceObj.stop_event = Event() listener_thread = Thread(target=AnalyticsBackendServiceObj.RequestListener, args=()) listener_thread.start() # time.sleep(100) time.sleep(100) # # AnalyticsBackendServiceObj.stop_event.set() # # LOGGER.info('Backend termination initiated. waiting for termination... 10 seconds') # # listener_thread.join(timeout=10) # # assert not listener_thread.is_alive(), "RequestListener thread did not terminate as expected." # LOGGER.info('Completed test_RunRequestListener') # AnalyticsBackendServiceObj.stop_event.set() # LOGGER.info('Backend termination initiated. waiting for termination... 10 seconds') # listener_thread.join(timeout=10) # assert not listener_thread.is_alive(), "RequestListener thread did not terminate as expected." LOGGER.info('Completed test_RunRequestListener') # To test START and STOP communication together # def test_StopRequestListener(): Loading