diff --git a/my_deploy.sh b/my_deploy.sh index ee17cb3679f6390115917a005cfaf670585c28c7..e5f2ff904e74412a9a22364c894f3cd691c38f4a 100755 --- a/my_deploy.sh +++ b/my_deploy.sh @@ -26,7 +26,7 @@ export TFS_COMPONENTS="context device pathcomp service slice nbi webui load_gene #export TFS_COMPONENTS="${TFS_COMPONENTS} monitoring" # Uncomment to activate Monitoring Framework (new) -#export TFS_COMPONENTS="${TFS_COMPONENTS} kpi_manager kpi_value_writer kpi_value_api" +#export TFS_COMPONENTS="${TFS_COMPONENTS} kpi_manager kpi_value_writer kpi_value_api telemetry analytics" # Uncomment to activate BGP-LS Speaker #export TFS_COMPONENTS="${TFS_COMPONENTS} bgpls_speaker" diff --git a/proto/kpi_value_api.proto b/proto/kpi_value_api.proto index 4d3a1f216406841344d40712ea04ec82cedf04d0..a97b0ae2b023dfe0a535aa3cb1ba63b00b418371 100644 --- a/proto/kpi_value_api.proto +++ b/proto/kpi_value_api.proto @@ -19,19 +19,19 @@ import "context.proto"; import "kpi_manager.proto"; service KpiValueAPIService { - rpc StoreKpiValues (KpiValueList) returns (context.Empty) {} - rpc SelectKpiValues (KpiValueFilter) returns (KpiValueList) {} + rpc StoreKpiValues (KpiValueList ) returns (context.Empty ) {} + rpc SelectKpiValues (KpiValueFilter ) returns (KpiValueList ) {} rpc GetKpiAlarms (kpi_manager.KpiId) returns (stream KpiAlarms) {} } message KpiValue { - kpi_manager.KpiId kpi_id = 1; - context.Timestamp timestamp = 2; - KpiValueType kpi_value_type = 3; + kpi_manager.KpiId kpi_id = 1; + context.Timestamp timestamp = 2; + KpiValueType kpi_value_type = 3; } message KpiValueList { - repeated KpiValue kpi_value_list = 1; + repeated KpiValue kpi_value_list = 1; } message KpiValueType { @@ -47,9 +47,9 @@ message KpiValueType { } message KpiValueFilter { - repeated kpi_manager.KpiId kpi_id = 1; - repeated context.Timestamp start_timestamp = 2; - repeated context.Timestamp end_timestamp = 3; + repeated kpi_manager.KpiId kpi_id = 1; + repeated context.Timestamp start_timestamp = 2; + repeated context.Timestamp end_timestamp = 3; } message KpiAlarms { diff --git a/scripts/run_tests_locally-analytics-backend.sh b/scripts/run_tests_locally-analytics-backend.sh new file mode 100755 index 0000000000000000000000000000000000000000..722779824a8c91b5d7cff0337c10c4ea1327a1b6 --- /dev/null +++ b/scripts/run_tests_locally-analytics-backend.sh @@ -0,0 +1,25 @@ +#!/bin/bash +# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +PROJECTDIR=`pwd` + +cd $PROJECTDIR/src +RCFILE=$PROJECTDIR/coverage/.coveragerc +export KFK_SERVER_ADDRESS='127.0.0.1:9092' +CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace crdb -o jsonpath='{.spec.clusterIP}') +export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_kpi_mgmt?sslmode=require" +python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \ + analytics/backend/tests/test_backend.py diff --git a/scripts/run_tests_locally-analytics-frontend.sh b/scripts/run_tests_locally-analytics-frontend.sh index e30d30da623b2d0eee3d925d69a846b4b1f516a3..e74eb4ec198d77688f62004931c69eac31e60f0c 100755 --- a/scripts/run_tests_locally-analytics-frontend.sh +++ b/scripts/run_tests_locally-analytics-frontend.sh @@ -18,6 +18,7 @@ PROJECTDIR=`pwd` cd $PROJECTDIR/src RCFILE=$PROJECTDIR/coverage/.coveragerc +export KFK_SERVER_ADDRESS='127.0.0.1:9092' CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace crdb -o jsonpath='{.spec.clusterIP}') export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_kpi_mgmt?sslmode=require" python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \ diff --git a/scripts/run_tests_locally-kpi-value-API.sh b/scripts/run_tests_locally-kpi-value-API.sh index 3953d2a89c6fbe2bd3546e648246b9b018e5fdb0..96ac558bad5f0bf6bc6f5ee90a26cd11fda69273 100755 --- a/scripts/run_tests_locally-kpi-value-API.sh +++ b/scripts/run_tests_locally-kpi-value-API.sh @@ -19,8 +19,9 @@ PROJECTDIR=`pwd` cd $PROJECTDIR/src RCFILE=$PROJECTDIR/coverage/.coveragerc -KAFKA_IP=$(docker inspect kafka --format "{{.NetworkSettings.Networks.teraflowbridge.IPAddress}}") -KFK_SERVER_ADDRESS=${KAFKA_IP}:9092 +# KAFKA_IP=$(docker inspect kafka --format "{{.NetworkSettings.Networks.teraflowbridge.IPAddress}}") +# export KFK_SERVER_ADDRESS=${KAFKA_IP}:9092 +export KFK_SERVER_ADDRESS='127.0.0.1:9092' # helpful pytest flags: --log-level=INFO -o log_cli=true --verbose --maxfail=1 --durations=0 python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG -o log_cli=true --verbose \ kpi_value_api/tests/test_kpi_value_api.py diff --git a/scripts/run_tests_locally-telemetry-backend.sh b/scripts/run_tests_locally-telemetry-backend.sh index 97a06a0d6c16daf94e3e6b30bfc70eca3e7ce3a3..745d77c62849fb946e37e0d3177c94e162bdc6af 100755 --- a/scripts/run_tests_locally-telemetry-backend.sh +++ b/scripts/run_tests_locally-telemetry-backend.sh @@ -19,9 +19,14 @@ PROJECTDIR=`pwd` cd $PROJECTDIR/src # RCFILE=$PROJECTDIR/coverage/.coveragerc # coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \ +# kpi_manager/tests/test_unitary.py +# python3 kpi_manager/tests/test_unitary.py +export KFK_SERVER_ADDRESS='127.0.0.1:9092' +CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace crdb -o jsonpath='{.spec.clusterIP}') +export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_kpi_mgmt?sslmode=require" RCFILE=$PROJECTDIR/coverage/.coveragerc python3 -m pytest --log-level=INFO --log-cli-level=debug --verbose \ - telemetry/backend/tests/test_TelemetryBackend.py + telemetry/backend/tests/test_backend.py diff --git a/scripts/run_tests_locally-telemetry-frontend.sh b/scripts/run_tests_locally-telemetry-frontend.sh index 7506be5e0750b44e37368e86dbbfd00131c0d270..a6447cb4c6bfaa6d80fefac8417df28a960b1943 100755 --- a/scripts/run_tests_locally-telemetry-frontend.sh +++ b/scripts/run_tests_locally-telemetry-frontend.sh @@ -18,9 +18,10 @@ PROJECTDIR=`pwd` cd $PROJECTDIR/src -CRDB_SQL_ADDRESS=$(kubectl get service --namespace ${CRDB_NAMESPACE} cockroachdb-public -o 'jsonpath={.spec.clusterIP}') +# python3 kpi_manager/tests/test_unitary.py +export KFK_SERVER_ADDRESS='127.0.0.1:9092' +CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace crdb -o jsonpath='{.spec.clusterIP}') export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_kpi_mgmt?sslmode=require" - RCFILE=$PROJECTDIR/coverage/.coveragerc python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \ telemetry/frontend/tests/test_frontend.py diff --git a/scripts/show_logs_telemetry-backend.sh b/scripts/show_logs_telemetry-backend.sh new file mode 100755 index 0000000000000000000000000000000000000000..c28083dcbf5c7056145d1a0696116da66b5e9828 --- /dev/null +++ b/scripts/show_logs_telemetry-backend.sh @@ -0,0 +1,27 @@ +#!/bin/bash +# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +######################################################################################################################## +# Define your deployment settings here +######################################################################################################################## + +# If not already set, set the name of the Kubernetes namespace to deploy to. +export TFS_K8S_NAMESPACE=${TFS_K8S_NAMESPACE:-"tfs"} + +######################################################################################################################## +# Automated steps start here +######################################################################################################################## + +kubectl --namespace $TFS_K8S_NAMESPACE logs deployment/telemetryservice -c backend diff --git a/scripts/show_logs_telemetry-frontend.sh b/scripts/show_logs_telemetry-frontend.sh new file mode 100755 index 0000000000000000000000000000000000000000..821dc275b22ebf7ffc63d2e8c41dfab684407895 --- /dev/null +++ b/scripts/show_logs_telemetry-frontend.sh @@ -0,0 +1,27 @@ +#!/bin/bash +# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +######################################################################################################################## +# Define your deployment settings here +######################################################################################################################## + +# If not already set, set the name of the Kubernetes namespace to deploy to. +export TFS_K8S_NAMESPACE=${TFS_K8S_NAMESPACE:-"tfs"} + +######################################################################################################################## +# Automated steps start here +######################################################################################################################## + +kubectl --namespace $TFS_K8S_NAMESPACE logs deployment/telemetryservice -c frontend diff --git a/src/analytics/backend/Dockerfile b/src/analytics/backend/Dockerfile index df5cd7fbde6dc45780fdb3a333594ab11c1ab146..17adcd3ab1df5704cc7ef0c5a19b3cfb1539ee22 100644 --- a/src/analytics/backend/Dockerfile +++ b/src/analytics/backend/Dockerfile @@ -53,15 +53,6 @@ RUN python3 -m grpc_tools.protoc -I=. --python_out=. --grpc_python_out=. *.proto RUN rm *.proto RUN find . -type f -exec sed -i -E 's/(import\ .*)_pb2/from . \1_pb2/g' {} \; -# Install Java (required for PySpark) -RUN apt-get update && \ - apt-get install -y default-jdk && \ - apt-get clean - -# Set JAVA_HOME environment variable -ENV JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64 -ENV PATH=$JAVA_HOME/bin:$PATH - # Create component sub-folders, get specific Python packages RUN mkdir -p /var/teraflow/analytics/backend WORKDIR /var/teraflow/analytics/backend diff --git a/src/analytics/backend/requirements.in b/src/analytics/backend/requirements.in index 9df678fe819f33d479b8f5090ca9ac4eb1f4047c..360d94f4668b19feba305df76a65ef70b26e091f 100644 --- a/src/analytics/backend/requirements.in +++ b/src/analytics/backend/requirements.in @@ -12,5 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -pyspark==3.5.2 +dask==2024.1.0 +distributed==2024.1.0 +pandas==2.2.3 confluent-kafka==2.3.* diff --git a/src/analytics/backend/service/AnalyticsBackendService.py b/src/analytics/backend/service/AnalyticsBackendService.py index 658d237956b4ed3addbbc295ef0d19dd4b977257..a8c790e786f2768091ee3ab8d8d61860f18c2f77 100755 --- a/src/analytics/backend/service/AnalyticsBackendService.py +++ b/src/analytics/backend/service/AnalyticsBackendService.py @@ -12,18 +12,18 @@ # See the License for the specific language governing permissions and # limitations under the License. - +import time import json import logging import threading from common.tools.service.GenericGrpcService import GenericGrpcService -from analytics.backend.service.SparkStreaming import SparkStreamer from common.tools.kafka.Variables import KafkaConfig, KafkaTopic from confluent_kafka import Consumer as KafkaConsumer from confluent_kafka import KafkaError from common.Constants import ServiceNameEnum from common.Settings import get_service_port_grpc - +from threading import Thread, Event +from .DaskStreaming import DaskStreamer LOGGER = logging.getLogger(__name__) @@ -40,44 +40,6 @@ class AnalyticsBackendService(GenericGrpcService): 'group.id' : 'analytics-frontend', 'auto.offset.reset' : 'latest'}) - def StartSparkStreamer(self, analyzer_uuid, analyzer): - kpi_list = analyzer['input_kpis'] - oper_list = [s.split('_', 1)[0] for s in list(analyzer["thresholds"].keys())] # TODO: update this line... - thresholds = analyzer['thresholds'] - window_size = analyzer['window_size'] - window_slider = analyzer['window_slider'] - print ("Received parameters: {:} - {:} - {:} - {:} - {:}".format( - kpi_list, oper_list, thresholds, window_size, window_slider)) - LOGGER.debug ("Received parameters: {:} - {:} - {:} - {:} - {:}".format( - kpi_list, oper_list, thresholds, window_size, window_slider)) - try: - stop_event = threading.Event() - thread = threading.Thread(target=SparkStreamer, - args=(analyzer_uuid, kpi_list, oper_list, thresholds, stop_event, - window_size, window_slider, None )) - self.running_threads[analyzer_uuid] = (thread, stop_event) - thread.start() - print ("Initiated Analyzer backend: {:}".format(analyzer_uuid)) - LOGGER.info("Initiated Analyzer backend: {:}".format(analyzer_uuid)) - return True - except Exception as e: - print ("Failed to initiate Analyzer backend: {:}".format(e)) - LOGGER.error("Failed to initiate Analyzer backend: {:}".format(e)) - return False - - def StopRequestListener(self, threadInfo: tuple): - try: - thread, stop_event = threadInfo - stop_event.set() - thread.join() - print ("Terminating Analytics backend RequestListener") - LOGGER.info("Terminating Analytics backend RequestListener") - return True - except Exception as e: - print ("Failed to terminate analytics backend {:}".format(e)) - LOGGER.error("Failed to terminate analytics backend {:}".format(e)) - return False - def install_servicers(self): threading.Thread(target=self.RequestListener, args=()).start() @@ -86,6 +48,7 @@ class AnalyticsBackendService(GenericGrpcService): listener for requests on Kafka topic. """ LOGGER.info("Request Listener is initiated ...") + # print ("Request Listener is initiated ...") consumer = self.kafka_consumer consumer.subscribe([KafkaTopic.ANALYTICS_REQUEST.value]) while True: @@ -96,34 +59,66 @@ class AnalyticsBackendService(GenericGrpcService): if receive_msg.error().code() == KafkaError._PARTITION_EOF: continue else: - print("Consumer error: {}".format(receive_msg.error())) + LOGGER.error("Consumer error: {:}".format(receive_msg.error())) + # print ("Consumer error: {:}".format(receive_msg.error())) break - analyzer = json.loads(receive_msg.value().decode('utf-8')) - analyzer_uuid = receive_msg.key().decode('utf-8') - LOGGER.debug('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer)) - print ('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer)) + try: + analyzer = json.loads(receive_msg.value().decode('utf-8')) + analyzer_uuid = receive_msg.key().decode('utf-8') + LOGGER.debug('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer)) + # print ('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer)) - if analyzer["algo_name"] is None and analyzer["oper_mode"] is None: - self.TerminateAnalyzerBackend(analyzer_uuid) - else: - self.StartSparkStreamer(analyzer_uuid, analyzer) - LOGGER.debug("Stop Event activated. Terminating...") - print ("Stop Event activated. Terminating...") + if analyzer["algo_name"] is None and analyzer["oper_mode"] is None: + self.StopDaskListener(analyzer_uuid) + else: + self.StartDaskListener(analyzer_uuid, analyzer) + except Exception as e: + LOGGER.warning("Unable to consume message from topic: {:}. ERROR: {:}".format(KafkaTopic.ANALYTICS_REQUEST.value, e)) + # print ("Unable to consume message from topic: {:}. ERROR: {:}".format(KafkaTopic.ANALYTICS_REQUEST.value, e)) + + def StartDaskListener(self, analyzer_uuid, analyzer): + kpi_list = analyzer[ 'input_kpis' ] + thresholds = analyzer[ 'thresholds' ] + window_size = analyzer[ 'window_size' ] + window_slider = analyzer[ 'window_slider'] + + LOGGER.debug ("Received parameters: {:} - {:} - {:} - {:}".format( + kpi_list, thresholds, window_size, window_slider)) + # print ("Received parameters: {:} - {:} - {:} - {:}".format( + # kpi_list, thresholds, window_size, window_slider)) + try: + stop_event = Event() + thread = Thread( + target=DaskStreamer, + # args=(analyzer_uuid, kpi_list, oper_list, thresholds, stop_event), + args=(analyzer_uuid, kpi_list, thresholds, stop_event), + kwargs={ + "window_size" : window_size, + } + ) + thread.start() + self.running_threads[analyzer_uuid] = (thread, stop_event) + # print ("Initiated Analyzer backend: {:}".format(analyzer_uuid)) + LOGGER.info("Initiated Analyzer backend: {:}".format(analyzer_uuid)) + return True + except Exception as e: + # print ("Failed to initiate Analyzer backend: {:}".format(e)) + LOGGER.error("Failed to initiate Analyzer backend: {:}".format(e)) + return False - def TerminateAnalyzerBackend(self, analyzer_uuid): + def StopDaskListener(self, analyzer_uuid): if analyzer_uuid in self.running_threads: try: thread, stop_event = self.running_threads[analyzer_uuid] stop_event.set() thread.join() del self.running_threads[analyzer_uuid] - print ("Terminating backend (by TerminateBackend): Analyzer Id: {:}".format(analyzer_uuid)) + # print ("Terminating backend (by TerminateBackend): Analyzer Id: {:}".format(analyzer_uuid)) LOGGER.info("Terminating backend (by TerminateBackend): Analyzer Id: {:}".format(analyzer_uuid)) return True except Exception as e: LOGGER.error("Failed to terminate. Analyzer Id: {:} - ERROR: {:}".format(analyzer_uuid, e)) return False else: - print ("Analyzer not found in active collectors. Analyzer Id: {:}".format(analyzer_uuid)) - LOGGER.warning("Analyzer not found in active collectors: Analyzer Id: {:}".format(analyzer_uuid)) - # generate confirmation towards frontend + # print ("Analyzer not found in active collectors. Analyzer Id: {:}".format(analyzer_uuid)) + LOGGER.warning("Analyzer not found in active collectors: Analyzer Id: {:}".format(analyzer_uuid)) diff --git a/src/analytics/backend/service/DaskStreaming.py b/src/analytics/backend/service/DaskStreaming.py new file mode 100644 index 0000000000000000000000000000000000000000..f09da9949fd0f745f80f782273024f9175db820c --- /dev/null +++ b/src/analytics/backend/service/DaskStreaming.py @@ -0,0 +1,233 @@ +# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import time +import json +from confluent_kafka import Consumer, Producer, KafkaException, KafkaError +import pandas as pd +from dask.distributed import Client, LocalCluster +from common.tools.kafka.Variables import KafkaConfig, KafkaTopic + +logging.basicConfig(level=logging.INFO) +LOGGER = logging.getLogger(__name__) + +def SettingKafkaConsumerParams(): + return {'bootstrap.servers' : KafkaConfig.get_kafka_address(), + 'group.id' : 'analytics-backend', + 'auto.offset.reset' : 'latest'} + +def GetAggregationMappings(thresholds): + agg_dict = {} + for threshold_key in thresholds.keys(): + parts = threshold_key.split('_', 1) + if len(parts) != 2: + LOGGER.warning(f"Threshold key '{threshold_key}' does not follow the '_' format. Skipping.") + continue + aggregation, metric_name = parts + # Ensure that the aggregation function is valid in pandas + if aggregation not in ['mean', 'min', 'max', 'first', 'last', 'std']: + LOGGER.warning(f"Unsupported aggregation '{aggregation}' in threshold key '{threshold_key}'. Skipping.") + continue + agg_dict[threshold_key] = ('kpi_value', aggregation) + return agg_dict + +def ApplyThresholds(aggregated_df, thresholds): + """ + Apply thresholds (TH-Fall and TH-Raise) based on the thresholds dictionary + on the aggregated DataFrame. + Args: aggregated_df (pd.DataFrame): DataFrame with aggregated metrics. + thresholds (dict): Thresholds dictionary with keys in the format '_'. + Returns: pd.DataFrame: DataFrame with additional threshold columns. + """ + for threshold_key, threshold_values in thresholds.items(): + if threshold_key not in aggregated_df.columns: + LOGGER.warning(f"Threshold key '{threshold_key}' does not correspond to any aggregation result. Skipping threshold application.") + continue + if isinstance(threshold_values, (list, tuple)) and len(threshold_values) == 2: + fail_th, raise_th = threshold_values + aggregated_df[f"{threshold_key}_THRESHOLD_FALL"] = aggregated_df[threshold_key] < fail_th + aggregated_df[f"{threshold_key}_THRESHOLD_RAISE"] = aggregated_df[threshold_key] > raise_th + else: + LOGGER.warning(f"Threshold values for '{threshold_key}' are not a list or tuple of length 2. Skipping threshold application.") + return aggregated_df + +def initialize_dask_client(): + """ + Initialize a local Dask cluster and client. + """ + cluster = LocalCluster(n_workers=2, threads_per_worker=2) + client = Client(cluster) + LOGGER.info(f"Dask Client Initialized: {client}") + return client, cluster + +def initialize_kafka_producer(): + return Producer({'bootstrap.servers': KafkaConfig.get_kafka_address()}) + +def delivery_report(err, msg): + if err is not None: + LOGGER.error(f"Message delivery failed: {err}") + else: + LOGGER.info(f"Message delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}") + +def process_batch(batch, agg_mappings, thresholds): + """ + Process a batch of data and apply thresholds. + Args: batch (list of dict): List of messages from Kafka. + agg_mappings (dict): Mapping from threshold key to aggregation function. + thresholds (dict): Thresholds dictionary. + Returns: list of dict: Processed records ready to be sent to Kafka. + """ + if not batch: + LOGGER.info("Empty batch received. Skipping processing.") + return [] + + df = pd.DataFrame(batch) + df['time_stamp'] = pd.to_datetime(df['time_stamp'], errors='coerce') + df.dropna(subset=['time_stamp'], inplace=True) + required_columns = {'time_stamp', 'kpi_id', 'kpi_value'} + if not required_columns.issubset(df.columns): + LOGGER.warning(f"Batch contains missing required columns. Required columns: {required_columns}. Skipping batch.") + return [] + if df.empty: + LOGGER.info("No data after filtering by KPI IDs. Skipping processing.") + return [] + + # Perform aggregations using named aggregation + try: + agg_dict = {key: value for key, value in agg_mappings.items()} + df_agg = df.groupby(['window_start', 'kpi_id']).agg(**agg_dict).reset_index() + except Exception as e: + LOGGER.error(f"Aggregation error: {e}") + return [] + + # Apply thresholds + df_thresholded = ApplyThresholds(df_agg, thresholds) + df_thresholded['window_start'] = df_thresholded['window_start'].dt.strftime('%Y-%m-%dT%H:%M:%SZ') + # Convert aggregated DataFrame to list of dicts + result = df_thresholded.to_dict(orient='records') + LOGGER.info(f"Processed batch with {len(result)} records after aggregation and thresholding.") + + return result + +def produce_result(result, producer, destination_topic): + for record in result: + try: + producer.produce( + destination_topic, + key=str(record.get('kpi_id', '')), + value=json.dumps(record), + callback=delivery_report + ) + except KafkaException as e: + LOGGER.error(f"Failed to produce message: {e}") + producer.flush() + LOGGER.info(f"Produced {len(result)} aggregated records to '{destination_topic}'.") + +def DaskStreamer(key, kpi_list, thresholds, stop_event, + window_size="30s", time_stamp_col="time_stamp"): + client, cluster = initialize_dask_client() + consumer_conf = SettingKafkaConsumerParams() + consumer = Consumer(consumer_conf) + consumer.subscribe([KafkaTopic.VALUE.value]) + producer = initialize_kafka_producer() + + # Parse window_size to seconds + try: + window_size_td = pd.to_timedelta(window_size) + window_size_seconds = window_size_td.total_seconds() + except Exception as e: + LOGGER.error(f"Invalid window_size format: {window_size}. Error: {e}") + window_size_seconds = 30 + LOGGER.info(f"Batch processing interval set to {window_size_seconds} seconds.") + + # Extract aggregation mappings from thresholds + agg_mappings = GetAggregationMappings(thresholds) + if not agg_mappings: + LOGGER.error("No valid aggregation mappings extracted from thresholds. Exiting streamer.") + consumer.close() + producer.flush() + client.close() + cluster.close() + return + try: + batch = [] + last_batch_time = time.time() + LOGGER.info("Starting to consume messages...") + + while not stop_event.is_set(): + msg = consumer.poll(1.0) + + if msg is None: + current_time = time.time() + if (current_time - last_batch_time) >= window_size_seconds and batch: + LOGGER.info("Time-based batch threshold reached. Processing batch.") + future = client.submit(process_batch, batch, agg_mappings, thresholds) + future.add_done_callback(lambda fut: produce_result(fut.result(), producer, KafkaTopic.ALARMS.value)) + batch = [] + last_batch_time = current_time + continue + + if msg.error(): + if msg.error().code() == KafkaError._PARTITION_EOF: + LOGGER.warning(f"End of partition reached {msg.topic()} [{msg.partition()}] at offset {msg.offset()}") + else: + LOGGER.error(f"Kafka error: {msg.error()}") + continue + + try: + message_value = json.loads(msg.value().decode('utf-8')) + except json.JSONDecodeError as e: + LOGGER.error(f"JSON decode error: {e}") + continue + + try: + message_timestamp = pd.to_datetime(message_value[time_stamp_col], errors='coerce') + if pd.isna(message_timestamp): + LOGGER.warning(f"Invalid timestamp in message: {message_value}. Skipping message.") + continue + window_start = message_timestamp.floor(window_size) + message_value['window_start'] = window_start + except Exception as e: + LOGGER.error(f"Error processing timestamp: {e}. Skipping message.") + continue + + if message_value['kpi_id'] not in kpi_list: + LOGGER.debug(f"KPI ID '{message_value['kpi_id']}' not in kpi_list. Skipping message.") + continue + + batch.append(message_value) + + current_time = time.time() + if (current_time - last_batch_time) >= window_size_seconds and batch: + LOGGER.info("Time-based batch threshold reached. Processing batch.") + future = client.submit(process_batch, batch, agg_mappings, thresholds) + future.add_done_callback(lambda fut: produce_result(fut.result(), producer, KafkaTopic.ALARMS.value)) + batch = [] + last_batch_time = current_time + + except Exception as e: + LOGGER.exception(f"Error in Dask streaming process: {e}") + finally: + # Process any remaining messages in the batch + if batch: + LOGGER.info("Processing remaining messages in the batch.") + future = client.submit(process_batch, batch, agg_mappings, thresholds) + future.add_done_callback(lambda fut: produce_result(fut.result(), producer, KafkaTopic.ALARMS.value)) + consumer.close() + producer.flush() + LOGGER.info("Kafka consumer and producer closed.") + client.close() + cluster.close() + LOGGER.info("Dask client and cluster closed.") diff --git a/src/analytics/backend/tests/messages.py b/src/analytics/backend/tests/messages.py index c3b78967efe13eef9a60e19e50e56bdfca4a410d..cdc6c34428a72e5fcf90db3c5656a33c2bb29008 100644 --- a/src/analytics/backend/tests/messages.py +++ b/src/analytics/backend/tests/messages.py @@ -16,7 +16,7 @@ import uuid import json from common.proto.kpi_manager_pb2 import KpiId from common.proto.analytics_frontend_pb2 import ( AnalyzerOperationMode, - Analyzer ) + Analyzer, AnalyzerId ) def get_kpi_id_list(): return ["6e22f180-ba28-4641-b190-2287bf448888", "1e22f180-ba28-4641-b190-2287bf446666"] @@ -38,18 +38,25 @@ def get_threshold_dict(): op + '_value': threshold_dict[op+'_value'] for op in get_operation_list() if op + '_value' in threshold_dict } +def create_analyzer_id(): + _create_analyzer_id = AnalyzerId() + # _create_analyzer_id.analyzer_id.uuid = str(uuid.uuid4()) + # _create_analyzer_id.analyzer_id.uuid = "efef4d95-1cf1-43c4-9742-95c283ddd7a6" + _create_analyzer_id.analyzer_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666" + return _create_analyzer_id + def create_analyzer(): _create_analyzer = Analyzer() # _create_analyzer.analyzer_id.analyzer_id.uuid = str(uuid.uuid4()) - _create_analyzer.analyzer_id.analyzer_id.uuid = "efef4d95-1cf1-43c4-9742-95c283ddd7a6" + _create_analyzer.analyzer_id.analyzer_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666" _create_analyzer.algorithm_name = "Test_Aggergate_and_Threshold" _create_analyzer.operation_mode = AnalyzerOperationMode.ANALYZEROPERATIONMODE_STREAMING _kpi_id = KpiId() # input IDs to analyze _kpi_id.kpi_id.uuid = str(uuid.uuid4()) - _kpi_id.kpi_id.uuid = "6e22f180-ba28-4641-b190-2287bf448888" + _kpi_id.kpi_id.uuid = "6e22f180-ba28-4641-b190-2287bf448888" _create_analyzer.input_kpi_ids.append(_kpi_id) _kpi_id.kpi_id.uuid = str(uuid.uuid4()) _kpi_id.kpi_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666" @@ -70,4 +77,38 @@ def create_analyzer(): _create_analyzer.parameters['window_slider'] = "30 seconds" # should be less than window size _create_analyzer.parameters['store_aggregate'] = str(False) # TRUE to store. No implemented yet - return _create_analyzer \ No newline at end of file + return _create_analyzer + +def create_analyzer_dask(): + _create_analyzer = Analyzer() + _create_analyzer.analyzer_id.analyzer_id.uuid = str(uuid.uuid4()) + # _create_analyzer.analyzer_id.analyzer_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666" + _create_analyzer.algorithm_name = "Test_Aggergate_and_Threshold" + _create_analyzer.operation_mode = AnalyzerOperationMode.ANALYZEROPERATIONMODE_STREAMING + + _kpi_id = KpiId() + # input IDs to analyze + # _kpi_id.kpi_id.uuid = str(uuid.uuid4()) + _kpi_id.kpi_id.uuid = "6e22f180-ba28-4641-b190-2287bf448888" + _create_analyzer.input_kpi_ids.append(_kpi_id) + # _kpi_id.kpi_id.uuid = str(uuid.uuid4()) + _kpi_id.kpi_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666" + _create_analyzer.input_kpi_ids.append(_kpi_id) + # _kpi_id.kpi_id.uuid = str(uuid.uuid4()) + _create_analyzer.input_kpi_ids.append(_kpi_id) + # output IDs after analysis + _kpi_id.kpi_id.uuid = str(uuid.uuid4()) + _create_analyzer.output_kpi_ids.append(_kpi_id) + _kpi_id.kpi_id.uuid = str(uuid.uuid4()) + _create_analyzer.output_kpi_ids.append(_kpi_id) + # parameter + + _threshold_dict = { + 'mean_latency' :(20, 30), 'min_latency' :(00, 10), 'max_latency' :(45, 50),#} + 'first_value' :(00, 50), 'last_value' :(50, 100), 'std_value' :(0, 90)} + _create_analyzer.parameters['thresholds'] = json.dumps(_threshold_dict) + _create_analyzer.parameters['oper_list'] = json.dumps([key.split('_')[0] for key in _threshold_dict.keys()]) + _create_analyzer.parameters['window_size'] = "10s" # Such as "10 seconds", "2 minutes", "3 hours", "4 days" or "5 weeks" + _create_analyzer.parameters['window_slider'] = "5s" # should be less than window size + _create_analyzer.parameters['store_aggregate'] = str(False) # TRUE to store. No implemented yet + return _create_analyzer diff --git a/src/analytics/backend/tests/test_backend.py b/src/analytics/backend/tests/test_backend.py index bc0f47eb213f705a71f46de6a45d38b1c6f37e96..86de220a21b4c2c1c38d518c01ae13f33ee200d5 100644 --- a/src/analytics/backend/tests/test_backend.py +++ b/src/analytics/backend/tests/test_backend.py @@ -19,7 +19,9 @@ from threading import Event, Thread from common.tools.kafka.Variables import KafkaTopic from analytics.backend.service.AnalyticsBackendService import AnalyticsBackendService from analytics.backend.tests.messages import get_kpi_id_list, get_operation_list, get_threshold_dict -from .messages import create_analyzer +from .messages import create_analyzer, create_analyzer_dask +from threading import Thread, Event +from ..service.DaskStreaming import DaskStreamer LOGGER = logging.getLogger(__name__) @@ -34,52 +36,103 @@ def test_validate_kafka_topics(): response = KafkaTopic.create_all_topics() assert isinstance(response, bool) -def test_StartSparkStreamer(): - LOGGER.debug(" >>> test_StartSparkStreamer: START <<< ") - analyzer_obj = create_analyzer() - analyzer_uuid = analyzer_obj.analyzer_id.analyzer_id.uuid - analyzer_to_generate : Dict = { - "algo_name" : analyzer_obj.algorithm_name, - "input_kpis" : [k.kpi_id.uuid for k in analyzer_obj.input_kpi_ids], - "output_kpis" : [k.kpi_id.uuid for k in analyzer_obj.output_kpi_ids], - "oper_mode" : analyzer_obj.operation_mode, - "thresholds" : json.loads(analyzer_obj.parameters["thresholds"]), - "window_size" : analyzer_obj.parameters["window_size"], - "window_slider" : analyzer_obj.parameters["window_slider"], - # "store_aggregate" : analyzer_obj.parameters["store_aggregate"] - } - AnalyticsBackendServiceObj = AnalyticsBackendService() - response = AnalyticsBackendServiceObj.StartSparkStreamer(analyzer_uuid, analyzer_to_generate) - assert isinstance(response, bool) -# def test_StartRequestListener(): -# LOGGER.info('test_RunRequestListener') +# --- To test Dask Streamer functionality --- +# def test_StartDaskStreamer(): # Directly from the Streamer class +# LOGGER.debug(" >>> test_StartSparkStreamer: START <<< ") +# stop_event = Event() +# kpi_list = ["1e22f180-ba28-4641-b190-2287bf446666", "6e22f180-ba28-4641-b190-2287bf448888", 'kpi_3'] +# oper_list = ['avg', 'min', 'max',] +# thresholds = { +# 'avg_value': (10.0, 90.0), +# 'min_value': (5.0, 95.0), +# 'max_value': (15.0, 85.0), +# 'latency' : (2.0, 10.0) +# } + +# # Start the DaskStreamer in a separate thread +# streamer_thread = Thread( +# target=DaskStreamer, +# args=("analytics_stream", kpi_list, oper_list, thresholds, stop_event), +# kwargs={ +# "window_size": "60s", +# "win_slide_duration": "30s", +# "time_stamp_col": "time_stamp" +# } +# ) +# streamer_thread.start() +# try: +# while True: +# time.sleep(10) +# except KeyboardInterrupt: +# LOGGER.info("KeyboardInterrupt received. Stopping streamer...") +# stop_event.set() +# streamer_thread.join() +# LOGGER.info("Streamer stopped gracefully.") + +# --- To test Start Streamer functionality --- +# def test_StartDaskStreamer(): +# LOGGER.debug(" >>> test_StartBaskStreamer: START <<< ") +# analyzer_obj = create_analyzer_dask() +# # LOGGER.info("Created Analyzer Object: {:}".format(analyzer_obj)) +# analyzer_uuid = analyzer_obj.analyzer_id.analyzer_id.uuid +# analyzer_to_generate : Dict = { +# "algo_name" : analyzer_obj.algorithm_name, +# "input_kpis" : [k.kpi_id.uuid for k in analyzer_obj.input_kpi_ids], +# "output_kpis" : [k.kpi_id.uuid for k in analyzer_obj.output_kpi_ids], +# "oper_mode" : analyzer_obj.operation_mode, +# "thresholds" : json.loads(analyzer_obj.parameters["thresholds"]), +# "oper_list" : json.loads(analyzer_obj.parameters["oper_list"]), +# # "oper_list" : analyzer_obj.parameters["oper_list"], +# "window_size" : analyzer_obj.parameters["window_size"], +# "window_slider" : analyzer_obj.parameters["window_slider"], +# # "store_aggregate" : analyzer_obj.parameters["store_aggregate"] +# } # AnalyticsBackendServiceObj = AnalyticsBackendService() -# response = AnalyticsBackendServiceObj.StartRequestListener() # response is Tuple (thread, stop_event) +# LOGGER.info("Analyzer to be generated: {:}".format((analyzer_to_generate))) +# response = AnalyticsBackendServiceObj.StartDaskListener(analyzer_uuid, analyzer_to_generate) +# assert isinstance(response, bool) +# time.sleep(100) +# LOGGER.info('Initiating StopRequestListener...') +# # AnalyticsBackendServiceObj = AnalyticsBackendService() +# response = AnalyticsBackendServiceObj.StopDaskListener(analyzer_uuid) # LOGGER.debug(str(response)) -# assert isinstance(response, tuple) +# assert isinstance(response, bool) -# To test START and STOP communication together -def test_StopRequestListener(): - LOGGER.info('test_RunRequestListener') - LOGGER.info('Initiating StartRequestListener...') - AnalyticsBackendServiceObj = AnalyticsBackendService() - response_thread = AnalyticsBackendServiceObj.StartRequestListener() # response is Tuple (thread, stop_event) - # LOGGER.debug(str(response_thread)) - time.sleep(10) - LOGGER.info('Initiating StopRequestListener...') - AnalyticsBackendServiceObj = AnalyticsBackendService() - AnalyticsBackendServiceObj.stop_event = Event() - listener_thread = Thread(target=AnalyticsBackendServiceObj.RequestListener, args=()) - listener_thread.start() +# --- To test Start Streamer functionality --- +# def test_StartSparkStreamer(): +# LOGGER.debug(" >>> test_StartSparkStreamer: START <<< ") +# analyzer_obj = create_analyzer() +# analyzer_uuid = analyzer_obj.analyzer_id.analyzer_id.uuid +# analyzer_to_generate : Dict = { +# "algo_name" : analyzer_obj.algorithm_name, +# "input_kpis" : [k.kpi_id.uuid for k in analyzer_obj.input_kpi_ids], +# "output_kpis" : [k.kpi_id.uuid for k in analyzer_obj.output_kpi_ids], +# "oper_mode" : analyzer_obj.operation_mode, +# "thresholds" : json.loads(analyzer_obj.parameters["thresholds"]), +# "window_size" : analyzer_obj.parameters["window_size"], +# "window_slider" : analyzer_obj.parameters["window_slider"], +# # "store_aggregate" : analyzer_obj.parameters["store_aggregate"] +# } +# AnalyticsBackendServiceObj = AnalyticsBackendService() +# response = AnalyticsBackendServiceObj.StartSparkStreamer(analyzer_uuid, analyzer_to_generate) +# assert isinstance(response, bool) + +# --- To TEST StartRequestListenerFunctionality +# def test_StartRequestListener(): +# LOGGER.info('test_RunRequestListener') +# AnalyticsBackendServiceObj = AnalyticsBackendService() +# AnalyticsBackendServiceObj.stop_event = Event() +# listener_thread = Thread(target=AnalyticsBackendServiceObj.RequestListener, args=()) +# listener_thread.start() - time.sleep(2000) +# time.sleep(100) # AnalyticsBackendServiceObj.stop_event.set() # LOGGER.info('Backend termination initiated. waiting for termination... 10 seconds') # listener_thread.join(timeout=10) # assert not listener_thread.is_alive(), "RequestListener thread did not terminate as expected." - LOGGER.info('Completed test_RunRequestListener') + # LOGGER.info('Completed test_RunRequestListener') # To test START and STOP communication together # def test_StopRequestListener(): diff --git a/src/analytics/frontend/requirements.in b/src/analytics/frontend/requirements.in index d81b9ddbeafeff94c830d48ca5594e775b9ce240..0b1ec921b8bb77c0d26e8240585a19ef165f0eec 100644 --- a/src/analytics/frontend/requirements.in +++ b/src/analytics/frontend/requirements.in @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -apscheduler==3.10.4 +apscheduler==3.10.1 confluent-kafka==2.3.* psycopg2-binary==2.9.* SQLAlchemy==1.4.* diff --git a/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py b/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py index 9ffacecc30fac40bb4899b8889386bc23a7609ac..323113bb0d8234f41961d05a049986296167b96b 100644 --- a/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py +++ b/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py @@ -12,10 +12,11 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging, grpc, json +import logging, grpc, json, queue from typing import Dict from confluent_kafka import Producer as KafkaProducer +from confluent_kafka import KafkaError from common.tools.kafka.Variables import KafkaConfig, KafkaTopic from common.proto.context_pb2 import Empty @@ -24,6 +25,8 @@ from common.proto.analytics_frontend_pb2 import Analyzer, AnalyzerId, Analy from common.proto.analytics_frontend_pb2_grpc import AnalyticsFrontendServiceServicer from analytics.database.Analyzer_DB import AnalyzerDB from analytics.database.AnalyzerModel import Analyzer as AnalyzerModel +from apscheduler.schedulers.background import BackgroundScheduler +from apscheduler.triggers.interval import IntervalTrigger LOGGER = logging.getLogger(__name__) @@ -94,10 +97,8 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer): LOGGER.info(f"Started Kafka listener for topic {self.listener_topic}...") try: while True: - LOGGER.info("entering while...") - key, value = self.result_queue.get() # Wait until a result is available - LOGGER.info("In while true ...") - yield key, value # Yield the result to the calling function + key, value = self.result_queue.get() + yield key, value except KeyboardInterrupt: LOGGER.warning("Listener stopped manually.") finally: @@ -127,8 +128,6 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer): self.result_queue.put((key, value)) else: LOGGER.info(f"Skipping message with unmatched key: {key}") - # value = json.loads(msg.value().decode('utf-8')) # Added for debugging - # self.result_queue.put((filter_key, value)) # Added for debugging except Exception as e: LOGGER.error(f"Error processing Kafka message: {e}") @@ -189,7 +188,7 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer): def delivery_callback(self, err, msg): if err: LOGGER.debug('Message delivery failed: {:}'.format(err)) - print ('Message delivery failed: {:}'.format(err)) + # print ('Message delivery failed: {:}'.format(err)) else: LOGGER.debug('Message delivered to topic {:}'.format(msg.topic())) - print('Message delivered to topic {:}'.format(msg.topic())) + # print('Message delivered to topic {:}'.format(msg.topic())) diff --git a/src/analytics/frontend/tests/messages.py b/src/analytics/frontend/tests/messages.py index 646de962e8a213582fdb7cd1446ab57bda561a96..e2d39585e434b58c0d48d0061e105a5ebaabe6b9 100644 --- a/src/analytics/frontend/tests/messages.py +++ b/src/analytics/frontend/tests/messages.py @@ -21,13 +21,14 @@ from common.proto.analytics_frontend_pb2 import ( AnalyzerOperationMode, Analyze def create_analyzer_id(): _create_analyzer_id = AnalyzerId() # _create_analyzer_id.analyzer_id.uuid = str(uuid.uuid4()) - _create_analyzer_id.analyzer_id.uuid = "efef4d95-1cf1-43c4-9742-95c283ddd7a6" + # _create_analyzer_id.analyzer_id.uuid = "efef4d95-1cf1-43c4-9742-95c283ddd7a6" + _create_analyzer_id.analyzer_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666" return _create_analyzer_id def create_analyzer(): _create_analyzer = Analyzer() # _create_analyzer.analyzer_id.analyzer_id.uuid = str(uuid.uuid4()) - _create_analyzer.analyzer_id.analyzer_id.uuid = "efef4d95-1cf1-43c4-9742-95c283ddd7a6" + _create_analyzer.analyzer_id.analyzer_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666" _create_analyzer.algorithm_name = "Test_Aggergate_and_Threshold" _create_analyzer.operation_mode = AnalyzerOperationMode.ANALYZEROPERATIONMODE_STREAMING @@ -48,11 +49,12 @@ def create_analyzer(): _create_analyzer.output_kpi_ids.append(_kpi_id) # parameter _threshold_dict = { - # 'avg_value' :(20, 30), 'min_value' :(00, 10), 'max_value' :(45, 50), - 'first_value' :(00, 10), 'last_value' :(40, 50), 'stdev_value':(00, 10)} + 'mean_value' :(20, 30), 'min_value' :(00, 10), 'max_value' :(45, 50), + 'first_value' :(00, 10), 'last_value' :(40, 50), 'std_value':(00, 10) + } _create_analyzer.parameters['thresholds'] = json.dumps(_threshold_dict) - _create_analyzer.parameters['window_size'] = "60 seconds" # Such as "10 seconds", "2 minutes", "3 hours", "4 days" or "5 weeks" - _create_analyzer.parameters['window_slider'] = "30 seconds" # should be less than window size + _create_analyzer.parameters['window_size'] = "10s" # Such as "10 seconds", "2 minutes", "3 hours", "4 days" or "5 weeks" + _create_analyzer.parameters['window_slider'] = "5s" # should be less than window size _create_analyzer.parameters['store_aggregate'] = str(False) # TRUE to store. No implemented yet return _create_analyzer diff --git a/src/analytics/frontend/tests/test_frontend.py b/src/analytics/frontend/tests/test_frontend.py index 4583a45ac12acde9da81c4b7a15165db99fb38bb..526c32eb8af98afde6b89e784f62f0a2d0f7f432 100644 --- a/src/analytics/frontend/tests/test_frontend.py +++ b/src/analytics/frontend/tests/test_frontend.py @@ -26,7 +26,6 @@ from common.Settings import ( get_service_port_grpc, get_env_var_name, from common.tools.kafka.Variables import KafkaTopic from common.proto.kpi_value_api_pb2 import KpiValue -from common.proto.analytics_frontend_pb2 import AnalyzerAlarms from analytics.frontend.client.AnalyticsFrontendClient import AnalyticsFrontendClient from analytics.frontend.service.AnalyticsFrontendService import AnalyticsFrontendService from analytics.frontend.tests.messages import ( create_analyzer_id, create_analyzer, @@ -34,7 +33,7 @@ from analytics.frontend.tests.messages import ( create_analyze from analytics.frontend.service.AnalyticsFrontendServiceServicerImpl import AnalyticsFrontendServiceServicerImpl from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.triggers.interval import IntervalTrigger - +from common.proto.analytics_frontend_pb2 import Analyzer, AnalyzerId, AnalyzerFilter, AnalyzerList ########################### # Tests Setup @@ -85,23 +84,21 @@ def analyticsFrontend_client(analyticsFrontend_service : AnalyticsFrontendServic ########################### # --- "test_validate_kafka_topics" should be executed before the functionality tests --- -# def test_validate_kafka_topics(): -# LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ") -# response = KafkaTopic.create_all_topics() -# assert isinstance(response, bool) - -# # ----- core funtionality test ----- -def test_StartAnalytics(analyticsFrontend_client): - LOGGER.info(' >>> test_StartAnalytic START: <<< ') - stream = analyticsFrontend_client.StartAnalyzer(create_analyzer()) - for response in stream: - LOGGER.debug(str(response)) - assert isinstance(response, KpiValue) +def test_validate_kafka_topics(): + LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ") + response = KafkaTopic.create_all_topics() + assert isinstance(response, bool) + +# ----- core funtionality test ----- +# def test_StartAnalytics(analyticsFrontend_client): +# LOGGER.info(' >>> test_StartAnalytic START: <<< ') +# response = analyticsFrontend_client.StartAnalyzer(create_analyzer()) +# LOGGER.debug(str(response)) +# assert isinstance(response, AnalyzerId) # To test start and stop listener together -def test_StartStopAnalyzers(analyticsFrontend_client): - LOGGER.info(' >>> test_StartStopAnalyzers START: <<< ') - LOGGER.info('--> StartAnalyzer') +def test_StartAnalyzers(analyticsFrontend_client): + LOGGER.info(' >>> test_StartAnalyzers START: <<< ') added_analyzer_id = analyticsFrontend_client.StartAnalyzer(create_analyzer()) LOGGER.debug(str(added_analyzer_id)) assert isinstance(added_analyzer_id, AnalyzerId) diff --git a/src/analytics/requirements.in b/src/analytics/requirements.in index 8ff30ddaad25c39713f2e6f68c8d9aebed74dad0..231dc04e820387c95ffea72cbe67b9f0a9a0865a 100644 --- a/src/analytics/requirements.in +++ b/src/analytics/requirements.in @@ -12,8 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -java==11.0.* -pyspark==3.5.2 confluent-kafka==2.3.* psycopg2-binary==2.9.* SQLAlchemy==1.4.* diff --git a/src/analytics/tests/test_analytics_db.py b/src/analytics/tests/test_analytics_db.py index 58e7d0167044bb461e66b053dcb3999641ea8419..2794edb4a051b38d4cef902fd09aaad5db966179 100644 --- a/src/analytics/tests/test_analytics_db.py +++ b/src/analytics/tests/test_analytics_db.py @@ -15,12 +15,13 @@ import logging from analytics.database.Analyzer_DB import AnalyzerDB +from analytics.database.AnalyzerModel import Analyzer LOGGER = logging.getLogger(__name__) def test_verify_databases_and_tables(): LOGGER.info('>>> test_verify_databases_and_tables : START <<< ') - AnalyzerDBobj = AnalyzerDB() + AnalyzerDBobj = AnalyzerDB(Analyzer) # AnalyzerDBobj.drop_database() # AnalyzerDBobj.verify_tables() AnalyzerDBobj.create_database() diff --git a/src/common/tools/kafka/Variables.py b/src/common/tools/kafka/Variables.py index cadeec6ed331f599411d6480769985673f8d584d..d1acff7e6dbad64086064a7e2344914cbba114f1 100644 --- a/src/common/tools/kafka/Variables.py +++ b/src/common/tools/kafka/Variables.py @@ -90,4 +90,4 @@ class KafkaTopic(Enum): return False return True -# create all topics after the deployments (Telemetry and Analytics) +# TODO: create all topics after the deployments (Telemetry and Analytics) diff --git a/src/kpi_manager/tests/test_messages.py b/src/kpi_manager/tests/test_messages.py index 7b5c45859b6c10056211f9f33df950d9668c11ea..08a2dbf7334c3e4e68c3cfa6c27ce08532521342 100644 --- a/src/kpi_manager/tests/test_messages.py +++ b/src/kpi_manager/tests/test_messages.py @@ -27,6 +27,8 @@ def create_kpi_id_request(): def create_kpi_descriptor_request(descriptor_name: str = "Test_name"): _create_kpi_request = kpi_manager_pb2.KpiDescriptor() _create_kpi_request.kpi_id.kpi_id.uuid = str(uuid.uuid4()) + # _create_kpi_request.kpi_id.kpi_id.uuid = "6e22f180-ba28-4641-b190-2287bf448888" + # _create_kpi_request.kpi_id.kpi_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666" _create_kpi_request.kpi_description = descriptor_name _create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED _create_kpi_request.device_id.device_uuid.uuid = 'DEV2' diff --git a/src/kpi_value_api/.gitlab-ci.yml b/src/kpi_value_api/.gitlab-ci.yml index 3db0538dd071e72a5b66f8489036ff83245ce337..14c8df299b1a4970ec0a4733bcd918bf1485b00d 100644 --- a/src/kpi_value_api/.gitlab-ci.yml +++ b/src/kpi_value_api/.gitlab-ci.yml @@ -62,7 +62,8 @@ unit_test kpi-value-api: --env ALLOW_ANONYMOUS_LOGIN=yes bitnami/zookeeper:latest - sleep 10 # Wait for Zookeeper to start - - docker run --name kafka -d --network=teraflowbridge -p 9092:9092 + - > + docker run --name kafka -d --network=teraflowbridge -p 9092:9092 --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --env ALLOW_PLAINTEXT_LISTENER=yes bitnami/kafka:latest diff --git a/src/kpi_value_api/requirements.in b/src/kpi_value_api/requirements.in index e95d6d8bbd81abca7eb2622e1faf6af473fcdb12..0615fa833f255bf91fd72fc484e40842face7a44 100644 --- a/src/kpi_value_api/requirements.in +++ b/src/kpi_value_api/requirements.in @@ -15,4 +15,4 @@ confluent-kafka==2.3.* requests==2.27.* prometheus-api-client==0.5.3 -apscheduler==3.10.4 +apscheduler==3.10.1 diff --git a/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py b/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py index 0f57f88219a74108a555cf87e9bdb98999fd5da2..706e180d5cf65107f5899315cdf75867beb81608 100644 --- a/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py +++ b/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py @@ -46,7 +46,7 @@ class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer): self.scheduler = BackgroundScheduler() self.kafka_producer = KafkaProducer({'bootstrap.servers' : KafkaConfig.get_kafka_address()}) self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(), - 'group.id' : 'analytics-frontend', + 'group.id' : 'kpi-value-api-frontend', 'auto.offset.reset' : 'latest'}) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) @@ -152,12 +152,10 @@ class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer): for alarm_key, value in self.StartResponseListener(request.kpi_id.uuid): response.start_timestamp.timestamp = datetime.strptime( - value["window"]["start"], "%Y-%m-%dT%H:%M:%S.%fZ").timestamp() - response.end_timestamp.timestamp = datetime.strptime( - value["window"]["end"], "%Y-%m-%dT%H:%M:%S.%fZ").timestamp() + value["window_start"], "%Y-%m-%dT%H:%M:%S.%fZ").timestamp() response.kpi_id.kpi_id.uuid = value['kpi_id'] for key, threshold in value.items(): - if "THRESHOLD_" in key: + if key not in ['kpi_id', 'window']: response.alarms[key] = threshold yield response @@ -183,10 +181,10 @@ class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer): key, value = self.result_queue.get() # Wait until a result is available LOGGER.info("In while true ...") yield key, value # Yield the result to the calling function - except KeyboardInterrupt: - LOGGER.warning("Listener stopped manually.") + except Exception as e: + LOGGER.warning("Listener stopped. Error: {:}".format(e)) finally: - self.StopListener() + self.scheduler.shutdown() def response_listener(self, filter_key=None): """ @@ -196,23 +194,24 @@ class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer): consumer = self.kafka_consumer consumer.subscribe([self.listener_topic]) - msg = consumer.poll(2.0) - if msg is None: - return - elif msg.error(): - if msg.error().code() != KafkaError._PARTITION_EOF: - LOGGER.error(f"Kafka error: {msg.error()}") - return - try: - key = msg.key().decode('utf-8') if msg.key() else None - if filter_key is not None and key == filter_key: - value = json.loads(msg.value().decode('utf-8')) - LOGGER.info(f"Received key: {key}, value: {value}") - self.result_queue.put((key, value)) - else: - LOGGER.warning(f"Skipping message with unmatched key: {key} - {filter_key}") - except Exception as e: - LOGGER.error(f"Error processing Kafka message: {e}") + while True: + msg = consumer.poll(1.0) + if msg is None: + continue + elif msg.error(): + if msg.error().code() != KafkaError._PARTITION_EOF: + LOGGER.error(f"Kafka error: {msg.error()}") + break + try: + key = msg.key().decode('utf-8') if msg.key() else None + if filter_key is not None and key == filter_key: + value = json.loads(msg.value().decode('utf-8')) + LOGGER.info(f"Received key: {key}, value: {value}") + self.result_queue.put((key, value)) + else: + LOGGER.warning(f"Skipping message with unmatched key: {key} - {filter_key}") + except Exception as e: + LOGGER.error(f"Error processing Kafka message: {e}") def delivery_callback(self, err, msg): if err: LOGGER.debug('Message delivery failed: {:}'.format(err)) diff --git a/src/kpi_value_api/tests/test_kpi_value_api.py b/src/kpi_value_api/tests/test_kpi_value_api.py index ac17f6f987d437ee6dacd7dfdc7a1de7a8965343..c245bb9ef64eaa29dc4d51955ff94adeeeeb8dda 100644 --- a/src/kpi_value_api/tests/test_kpi_value_api.py +++ b/src/kpi_value_api/tests/test_kpi_value_api.py @@ -78,12 +78,12 @@ def test_validate_kafka_topics(): response = KafkaTopic.create_all_topics() assert isinstance(response, bool) -def test_GetKpiAlarms(kpi_value_api_client): - LOGGER.debug(" >>> test_GetKpiAlarms") - stream = kpi_value_api_client.GetKpiAlarms(create_kpi_id_request()) - for response in stream: - LOGGER.debug(str(response)) - assert isinstance(response, KpiAlarms) +# def test_GetKpiAlarms(kpi_value_api_client): +# LOGGER.debug(" >>> test_GetKpiAlarms") +# stream = kpi_value_api_client.GetKpiAlarms(create_kpi_id_request()) +# for response in stream: +# LOGGER.debug(str(response)) +# assert isinstance(response, KpiAlarms) # def test_store_kpi_values(kpi_value_api_client): # LOGGER.debug(" >>> test_set_list_of_KPIs: START <<< ") diff --git a/src/telemetry/backend/service/TelemetryBackendService.py b/src/telemetry/backend/service/TelemetryBackendService.py index 078fa5896d5fb5033833e0e2ef2248613ef80c18..79a35d343860d19992518c0e8b29e427e5cbbef4 100755 --- a/src/telemetry/backend/service/TelemetryBackendService.py +++ b/src/telemetry/backend/service/TelemetryBackendService.py @@ -17,7 +17,8 @@ import time import random import logging import threading -from typing import Any, Dict +from typing import Any, Dict +from datetime import datetime, timezone # from common.proto.context_pb2 import Empty from confluent_kafka import Producer as KafkaProducer from confluent_kafka import Consumer as KafkaConsumer @@ -53,6 +54,8 @@ class TelemetryBackendService(GenericGrpcService): """ listener for requests on Kafka topic. """ + LOGGER.info('Telemetry backend request listener is running ...') + # print ('Telemetry backend request listener is running ...') consumer = self.kafka_consumer consumer.subscribe([KafkaTopic.REQUEST.value]) while True: @@ -63,29 +66,33 @@ class TelemetryBackendService(GenericGrpcService): if receive_msg.error().code() == KafkaError._PARTITION_EOF: continue else: - print("Consumer error: {}".format(receive_msg.error())) + # print("Consumer error: {}".format(receive_msg.error())) break - - collector = json.loads(receive_msg.value().decode('utf-8')) - collector_id = receive_msg.key().decode('utf-8') - LOGGER.debug('Recevied Collector: {:} - {:}'.format(collector_id, collector)) - print('Recevied Collector: {:} - {:}'.format(collector_id, collector)) + try: + collector = json.loads(receive_msg.value().decode('utf-8')) + collector_id = receive_msg.key().decode('utf-8') + LOGGER.debug('Recevied Collector: {:} - {:}'.format(collector_id, collector)) + # print('Recevied Collector: {:} - {:}'.format(collector_id, collector)) - if collector['duration'] == -1 and collector['interval'] == -1: - self.TerminateCollectorBackend(collector_id) - else: - self.RunInitiateCollectorBackend(collector_id, collector) + if collector['duration'] == -1 and collector['interval'] == -1: + self.TerminateCollectorBackend(collector_id) + else: + self.RunInitiateCollectorBackend(collector_id, collector) + except Exception as e: + LOGGER.warning("Unable to consumer message from topic: {:}. ERROR: {:}".format(KafkaTopic.REQUEST.value, e)) + # print ("Unable to consumer message from topic: {:}. ERROR: {:}".format(KafkaTopic.REQUEST.value, e)) def TerminateCollectorBackend(self, collector_id): if collector_id in self.running_threads: thread, stop_event = self.running_threads[collector_id] stop_event.set() thread.join() - print ("Terminating backend (by StopCollector): Collector Id: ", collector_id) + # print ("Terminating backend (by StopCollector): Collector Id: ", collector_id) del self.running_threads[collector_id] self.GenerateCollectorTerminationSignal(collector_id, "-1", -1) # Termination confirmation to frontend. else: - print ('Backend collector {:} not found'.format(collector_id)) + # print ('Backend collector {:} not found'.format(collector_id)) + LOGGER.warning('Backend collector {:} not found'.format(collector_id)) def RunInitiateCollectorBackend(self, collector_id: str, collector: str): stop_event = threading.Event() @@ -98,10 +105,11 @@ class TelemetryBackendService(GenericGrpcService): """ Method receives collector request and initiates collecter backend. """ - print("Initiating backend for collector: ", collector_id) + # print("Initiating backend for collector: ", collector_id) + LOGGER.info("Initiating backend for collector: ", collector_id) start_time = time.time() while not stop_event.is_set(): - if time.time() - start_time >= collector['duration']: # condition to terminate backend + if int(collector['duration']) != -1 and time.time() - start_time >= collector['duration']: # condition to terminate backend print("Execuation duration completed: Terminating backend: Collector Id: ", collector_id, " - ", time.time() - start_time) self.GenerateCollectorTerminationSignal(collector_id, "-1", -1) # Termination confirmation to frontend. break @@ -130,8 +138,7 @@ class TelemetryBackendService(GenericGrpcService): Method to extract kpi value. """ measured_kpi_value = random.randint(1,100) # TODO: To be extracted from a device - print ("Measured Kpi value: {:}".format(measured_kpi_value)) - # measured_kpi_value = self.fetch_node_exporter_metrics() # exporter extracted metric value against default KPI + # print ("Measured Kpi value: {:}".format(measured_kpi_value)) self.GenerateCollectorResponse(collector_id, kpi_id , measured_kpi_value) def GenerateCollectorResponse(self, collector_id: str, kpi_id: str, measured_kpi_value: Any): @@ -140,7 +147,7 @@ class TelemetryBackendService(GenericGrpcService): """ producer = self.kafka_producer kpi_value : Dict = { - "time_stamp": str(time.time()), + "time_stamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"), "kpi_id" : kpi_id, "kpi_value" : measured_kpi_value } @@ -160,7 +167,7 @@ class TelemetryBackendService(GenericGrpcService): """ if err: LOGGER.debug('Message delivery failed: {:}'.format(err)) - print(f'Message delivery failed: {err}') + # print(f'Message delivery failed: {err}') else: LOGGER.info('Message delivered to topic {:}'.format(msg.topic())) - print(f'Message delivered to topic {msg.topic()}') + # print(f'Message delivered to topic {msg.topic()}') diff --git a/src/telemetry/backend/tests/messagesBackend.py b/src/telemetry/backend/tests/messages.py similarity index 100% rename from src/telemetry/backend/tests/messagesBackend.py rename to src/telemetry/backend/tests/messages.py diff --git a/src/telemetry/backend/tests/test_TelemetryBackend.py b/src/telemetry/backend/tests/test_backend.py similarity index 69% rename from src/telemetry/backend/tests/test_TelemetryBackend.py rename to src/telemetry/backend/tests/test_backend.py index 665fa825e3ee31b2e92351d9c5855f627ce40fa1..8bbde9769ae1dfb16a33ef528f74031d2ba94c01 100644 --- a/src/telemetry/backend/tests/test_TelemetryBackend.py +++ b/src/telemetry/backend/tests/test_backend.py @@ -26,13 +26,12 @@ LOGGER = logging.getLogger(__name__) ########################### # --- "test_validate_kafka_topics" should be run before the functionality tests --- -# def test_validate_kafka_topics(): -# LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ") -# response = KafkaTopic.create_all_topics() -# assert isinstance(response, bool) +def test_validate_kafka_topics(): + LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ") + response = KafkaTopic.create_all_topics() + assert isinstance(response, bool) -def test_RunRequestListener(): - LOGGER.info('test_RunRequestListener') - TelemetryBackendServiceObj = TelemetryBackendService() - response = threading.Thread(target=TelemetryBackendServiceObj.RequestListener).start() - LOGGER.debug(str(response)) +# def test_RunRequestListener(): +# LOGGER.info('test_RunRequestListener') +# TelemetryBackendServiceObj = TelemetryBackendService() +# threading.Thread(target=TelemetryBackendServiceObj.RequestListener).start() \ No newline at end of file diff --git a/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py b/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py index ad99dff12dc641232972f8cff8226878caefd71b..5c569e2ddd1d75dd89f88fe9ae08517330470254 100644 --- a/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py +++ b/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py @@ -153,7 +153,7 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer): """ if err: LOGGER.debug('Message delivery failed: {:}'.format(err)) - print('Message delivery failed: {:}'.format(err)) + # print('Message delivery failed: {:}'.format(err)) # else: # LOGGER.debug('Message delivered to topic {:}'.format(msg.topic())) # print('Message delivered to topic {:}'.format(msg.topic())) @@ -177,7 +177,8 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer): if receive_msg.error().code() == KafkaError._PARTITION_EOF: continue else: - print("Consumer error: {}".format(receive_msg.error())) + # print("Consumer error: {:}".format(receive_msg.error())) + LOGGER.error("Consumer error: {:}".format(receive_msg.error())) break try: collector_id = receive_msg.key().decode('utf-8') @@ -185,13 +186,17 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer): kpi_value = json.loads(receive_msg.value().decode('utf-8')) self.process_response(collector_id, kpi_value['kpi_id'], kpi_value['kpi_value']) else: - print(f"collector id does not match.\nRespone ID: '{collector_id}' --- Active IDs: '{ACTIVE_COLLECTORS}' ") + # print(f"collector id does not match.\nRespone ID: '{collector_id}' --- Active IDs: '{ACTIVE_COLLECTORS}' ") + LOGGER.info("collector id does not match.\nRespone ID: {:} --- Active IDs: {:}".format(collector_id, ACTIVE_COLLECTORS)) except Exception as e: - print(f"Error extarcting msg key or value: {str(e)}") + # print(f"Error extarcting msg key or value: {str(e)}") + LOGGER.info("Error extarcting msg key or value: {:}".format(e)) continue def process_response(self, collector_id: str, kpi_id: str, kpi_value: Any): if kpi_id == "-1" and kpi_value == -1: - print ("Backend termination confirmation for collector id: ", collector_id) + # print ("Backend termination confirmation for collector id: ", collector_id) + LOGGER.info("Backend termination confirmation for collector id: ", collector_id) else: - print ("KPI Value: Collector Id:", collector_id, ", Kpi Id:", kpi_id, ", Value:", kpi_value) + LOGGER.info("Backend termination confirmation for collector id: ", collector_id) + # print ("KPI Value: Collector Id:", collector_id, ", Kpi Id:", kpi_id, ", Value:", kpi_value) diff --git a/src/telemetry/frontend/tests/Messages.py b/src/telemetry/frontend/tests/Messages.py index a0e93e8a121b9efaac83f7169419911c8ee6e3ea..e6d8ef439f4ad4764c5a6f8b5f36ec68cbb10867 100644 --- a/src/telemetry/frontend/tests/Messages.py +++ b/src/telemetry/frontend/tests/Messages.py @@ -22,15 +22,18 @@ from common.proto.kpi_manager_pb2 import KpiId def create_collector_id(): _collector_id = telemetry_frontend_pb2.CollectorId() # _collector_id.collector_id.uuid = str(uuid.uuid4()) - _collector_id.collector_id.uuid = "5d45f53f-d567-429f-9427-9196ac72ff0c" + _collector_id.collector_id.uuid = "efef4d95-1cf1-43c4-9742-95c283dddddd" return _collector_id def create_collector_request(): _create_collector_request = telemetry_frontend_pb2.Collector() - _create_collector_request.collector_id.collector_id.uuid = str(uuid.uuid4()) - _create_collector_request.kpi_id.kpi_id.uuid = str(uuid.uuid4()) - _create_collector_request.duration_s = float(random.randint(8, 16)) - _create_collector_request.interval_s = float(random.randint(2, 4)) + # _create_collector_request.collector_id.collector_id.uuid = str(uuid.uuid4()) + _create_collector_request.collector_id.collector_id.uuid = "efef4d95-1cf1-43c4-9742-95c283dddddd" + # _create_collector_request.kpi_id.kpi_id.uuid = str(uuid.uuid4()) + _create_collector_request.kpi_id.kpi_id.uuid = "6e22f180-ba28-4641-b190-2287bf448888" + # _create_collector_request.duration_s = float(random.randint(8, 16)) + _create_collector_request.duration_s = -1 + _create_collector_request.interval_s = float(random.randint(3, 5)) return _create_collector_request def create_collector_filter(): diff --git a/src/telemetry/frontend/tests/test_frontend.py b/src/telemetry/frontend/tests/test_frontend.py index 9c3f9d3a8f545792eb2bb3a371c6c20664d24f69..c3f8091c83f56fd4a134ec092b1e22723040595d 100644 --- a/src/telemetry/frontend/tests/test_frontend.py +++ b/src/telemetry/frontend/tests/test_frontend.py @@ -105,47 +105,10 @@ def test_SelectCollectors(telemetryFrontend_client): LOGGER.debug(str(response)) assert isinstance(response, CollectorList) -# ----- Non-gRPC method tests ----- -def test_RunResponseListener(): - LOGGER.info(' >>> test_RunResponseListener START: <<< ') - TelemetryFrontendServiceObj = TelemetryFrontendServiceServicerImpl() - response = TelemetryFrontendServiceObj.RunResponseListener() # becasue Method "run_kafka_listener" is not define in frontend.proto - LOGGER.debug(str(response)) - assert isinstance(response, bool) - -# ------- previous test ---------------- - -# def test_verify_db_and_table(): -# LOGGER.info(' >>> test_verify_database_and_tables START: <<< ') -# _engine = TelemetryEngine.get_engine() -# managementDB.create_database(_engine) -# managementDB.create_tables(_engine) - -# def test_StartCollector(telemetryFrontend_client): -# LOGGER.info(' >>> test_StartCollector START: <<< ') -# response = telemetryFrontend_client.StartCollector(create_collector_request()) -# LOGGER.debug(str(response)) -# assert isinstance(response, CollectorId) - -# def test_run_kafka_listener(): -# LOGGER.info(' >>> test_run_kafka_listener START: <<< ') -# name_mapping = NameMapping() -# TelemetryFrontendServiceObj = TelemetryFrontendServiceServicerImpl(name_mapping) -# response = TelemetryFrontendServiceObj.run_kafka_listener() # Method "run_kafka_listener" is not define in frontend.proto +# # ----- Non-gRPC method tests ----- +# def test_RunResponseListener(): +# LOGGER.info(' >>> test_RunResponseListener START: <<< ') +# TelemetryFrontendServiceObj = TelemetryFrontendServiceServicerImpl() +# response = TelemetryFrontendServiceObj.RunResponseListener() # becasue Method "run_kafka_listener" is not define in frontend.proto # LOGGER.debug(str(response)) # assert isinstance(response, bool) - -# def test_StopCollector(telemetryFrontend_client): -# LOGGER.info(' >>> test_StopCollector START: <<< ') -# _collector_id = telemetryFrontend_client.StartCollector(create_collector_request()) -# time.sleep(3) # wait for small amount before call the stopCollecter() -# response = telemetryFrontend_client.StopCollector(_collector_id) -# LOGGER.debug(str(response)) -# assert isinstance(response, Empty) - -# def test_select_collectors(telemetryFrontend_client): -# LOGGER.info(' >>> test_select_collector requesting <<< ') -# response = telemetryFrontend_client.SelectCollectors(create_collector_filter()) -# LOGGER.info('Received Rows after applying Filter: {:} '.format(response)) -# LOGGER.debug(str(response)) -# assert isinstance(response, CollectorList) \ No newline at end of file diff --git a/src/telemetry/requirements.in b/src/telemetry/requirements.in index a0e78d2bfb7270b9664ad5ba810e2f213d887bf7..503468a662599f0225b293d0ef4c4e4313fa3e0f 100644 --- a/src/telemetry/requirements.in +++ b/src/telemetry/requirements.in @@ -12,13 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. -anytree==2.8.0 APScheduler==3.10.1 -influx-line-protocol==0.1.4 psycopg2-binary==2.9.3 python-dateutil==2.8.2 python-json-logger==2.0.2 pytz==2024.1 -questdb==1.0.1 requests==2.27.1 -xmltodict==0.12.0 \ No newline at end of file diff --git a/src/telemetry/tests/test_telemetryDB.py b/src/telemetry/tests/test_telemetryDB.py index 1b122e4bca266018c01044e2eb8a1ab277b3e3c3..bbc02a2a22fbbae3a1064fc5f9606ec8b29ff0f9 100644 --- a/src/telemetry/tests/test_telemetryDB.py +++ b/src/telemetry/tests/test_telemetryDB.py @@ -15,12 +15,13 @@ import logging from telemetry.database.Telemetry_DB import TelemetryDB +from telemetry.database.TelemetryModel import Collector as CollectorModel LOGGER = logging.getLogger(__name__) def test_verify_databases_and_tables(): LOGGER.info('>>> test_verify_databases_and_tables : START <<< ') - TelemetryDBobj = TelemetryDB() + TelemetryDBobj = TelemetryDB(CollectorModel) # TelemetryDBobj.drop_database() # TelemetryDBobj.verify_tables() TelemetryDBobj.create_database()