diff --git a/proto/analytics_frontend.proto b/proto/analytics_frontend.proto index ace0581db816bee1d0d20746f2b864dce602567b..bb6afb2e424b646fd8d9a6810c38a8a6863c230f 100644 --- a/proto/analytics_frontend.proto +++ b/proto/analytics_frontend.proto @@ -20,7 +20,7 @@ import "kpi_manager.proto"; //import "kpi_sample_types.proto"; service AnalyticsFrontendService { - rpc StartAnalyzer (Analyzer ) returns (AnalyzerId ) {} + rpc StartAnalyzer (Analyzer ) returns (AnalyzerId) {} rpc StopAnalyzer (AnalyzerId ) returns (context.Empty) {} rpc SelectAnalyzers(AnalyzerFilter) returns (AnalyzerList ) {} } diff --git a/proto/kpi_value_api.proto b/proto/kpi_value_api.proto index dff96272e3d05756dd19a49ecaede7311b196540..4d3a1f216406841344d40712ea04ec82cedf04d0 100644 --- a/proto/kpi_value_api.proto +++ b/proto/kpi_value_api.proto @@ -19,8 +19,9 @@ import "context.proto"; import "kpi_manager.proto"; service KpiValueAPIService { - rpc StoreKpiValues (KpiValueList) returns (context.Empty) {} - rpc SelectKpiValues (KpiValueFilter) returns (KpiValueList) {} + rpc StoreKpiValues (KpiValueList) returns (context.Empty) {} + rpc SelectKpiValues (KpiValueFilter) returns (KpiValueList) {} + rpc GetKpiAlarms (kpi_manager.KpiId) returns (stream KpiAlarms) {} } message KpiValue { @@ -50,3 +51,10 @@ message KpiValueFilter { repeated context.Timestamp start_timestamp = 2; repeated context.Timestamp end_timestamp = 3; } + +message KpiAlarms { + context.Timestamp start_timestamp = 1; + context.Timestamp end_timestamp = 2; + kpi_manager.KpiId kpi_id = 3; + map alarms = 4; +} diff --git a/src/analytics/README.md b/src/analytics/README.md index 9663e5321ace6866491b90553553d9ccbf5793a1..ece11ea969caf1cf92fb474ae1371c9979231ac2 100644 --- a/src/analytics/README.md +++ b/src/analytics/README.md @@ -1,4 +1,118 @@ -# How to locally run and test Analytic service (To be added soon) +# How to Locally Run and Test Analytic Frontend Service ### Pre-requisets -The following requirements should be fulfilled before the execuation of Telemetry service. +The following requirements should be fulfilled before the execuation of Analytics service. + +1. A virtual enviornment exist with all the required packages listed in [requirements.in](https://labs.etsi.org/rep/tfs/controller/-/blob/develop/src/analytics/frontend/requirements.in) sucessfully installed. +2. Verify the creation of required database and table. The +[Analytics DB test](https://labs.etsi.org/rep/tfs/controller/-/blob/develop/src/analytics/tests/test_analytics_db.py) python file lists the functions to create tables and the database. +3. The Analytics backend service should be running. + +4. All required Kafka topics must exist. Call `create_all_topics` from the [Kafka class](https://labs.etsi.org/rep/tfs/controller/-/blob/develop/src/common/tools/kafka/Variables.py) to create any topics that do not already exist. +``` +from common.tools.kafka.Variables import KafkaTopic +KafkaTopic.create_all_topics() +``` + +5. There will be an input stream on the Kafka topic that the Spark Streamer will consume and apply a defined thresholds. +- A JSON encoded string should be generated in the following format: +``` +'{"time_stamp": "2024-09-03T12:36:26Z", "kpi_id": "6e22f180-ba28-4641-b190-2287bf448888", "kpi_value": 44.22}' +``` +- `kpi_value` should be float or int. +- The Kafka producer key should be the UUID of the Analyzer used when creating it. +- Use the following Kafka topic to generate the stream: `KafkaTopic.ANALYTICS_RESPONSE.value`. + +## Steps to create and start Analyzer +The analyzer can be declared as below but there are many other ways to declare: + +The given object creation process for `_create_analyzer` involves defining an instance of the `Analyzer` message from the [gRPC definition](https://labs.etsi.org/rep/tfs/controller/-/blob/feat/194-unable-to-correctly-extract-the-aggregation-function-names-from-the-dictionary-received-as/proto/analytics_frontend.proto) and populating its fields. + +``` +from common.proto.analytics_frontend_pb2 import AnalyzerId +_create_analyzer_id = AnalyzerId() +``` + +Here is a breakdown of how each field is populated: + +### 1. **Analyzer ID** + - `analyzer_id`: This field uses a unique ID to identify the analyzer. In this case, the ID is a UUID. + ```python + _create_analyzer.analyzer_id.analyzer_id.uuid = "efef4d95-1cf1-43c4-9742-95c283ddd7a6" + ``` + - The commented-out code shows how the UUID can be generated dynamically using Python's `uuid.uuid4()`. However, for now, a static UUID is used. + +### 2. **Algorithm Name** + - `algorithm_name`: Specifies the name of the algorithm to be executed by the analyzer. + ```python + _create_analyzer.algorithm_name = "Test_Aggergate_and_Threshold" + ``` + +### 3. **Operation Mode** + - `operation_mode`: Sets the mode in which the analyzer operates, in this case, it's set to `ANALYZEROPERATIONMODE_STREAMING`. + ```python + _create_analyzer.operation_mode = AnalyzerOperationMode.ANALYZEROPERATIONMODE_STREAMING + ``` + +### 4. **Input KPI IDs** + - `input_kpi_ids`: This is a list of KPI IDs that will be used as input for the analysis. KPI IDs are represented using `KpiId`, and UUIDs are assigned to each input. The Spark streamer assume that the provided KPIs exists in the KPI Descriptor database. + ```python + _kpi_id = KpiId() + _kpi_id.kpi_id.uuid = "6e22f180-ba28-4641-b190-2287bf448888" + _create_analyzer.input_kpi_ids.append(_kpi_id) + + _kpi_id.kpi_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666" + _create_analyzer.input_kpi_ids.append(_kpi_id) + ``` + +### 5. **Output KPI IDs** + - `output_kpi_ids`: A list of KPI IDs that are produced as output after analysis. Each one is generated and appended to the list. + ```python + _kpi_id = KpiId() + _create_analyzer.output_kpi_ids.append(_kpi_id) + ``` + +### 6. **Parameters** + - `parameters`: This is a dictionary containing key-value pairs of various parameters used by the analyzer. These values are often algorithm-specific. + - **Thresholds**: A dictionary containing threshold possible values (min, max, avg, first, last, stdev)_. For example: "min_latency", "max_bandwidth", "avg_datarate" etc. + ```python + _threshold_dict = { + 'min_latency' : (00, 10), + 'max_bandwidth': (40, 50), + 'avg_datarate': (00, 10) + } + _create_analyzer.parameters['thresholds'] = json.dumps(_threshold_dict) + ``` + - **Window Size**: Specifies the size of the time window (e.g., `60 seconds`). + ```python + _create_analyzer.parameters['window_size'] = "60 seconds" + ``` + - **Window Slider**: Defines the sliding window interval (e.g., `30 seconds`). + ```python + _create_analyzer.parameters['window_slider'] = "30 seconds" + ``` + +### **Calling `StartAnalyzer` with an Analyzer Frontend Object** + - The following code demonstrates how to call `StartAnalyzer()` with an Analyzer object: +```python +from analytics.frontend.client.AnalyticsFrontendClient import AnalyticsFrontendClient + +analytics_client_object = AnalyticsFrontendClient() +analytics_client_object.StartAnalyzer(_create_analyzer_id) +``` + +### **How to Receive Analyzer Responses** + - There is a non-gRPC method in the analyzer frontend called `StartResponseListener()`. The `analyzer_uuid` is the UUID of the analyzer provided when calling `StartAnalyzer()`. The following code will log the responses: +```python +from analytics.frontend.service.AnalyticsFrontendServiceServicerImpl import AnalyticsFrontendServiceServicerImpl + +analytic_frontend_service_object = AnalyticsFrontendServiceServicerImpl() +for response in analytic_frontend_service_object.StartResponseListener(): + LOGGER.debug(response) +``` + +### **Understanding the Output of the Analyzer** +- **Output Column Names**: The output JSON string will include two keys for each defined threshold. For example, the `min_latency` threshold will generate two keys: `min_latency_THRESHOLD_FAIL` and `min_latency_THRESHOLD_RAISE`. + - `min_latency_THRESHOLD_FAIL` is triggered if the average latency calculated within the defined window size is less than the specified threshold range. + - `min_latency_THRESHOLD_RAISE` is triggered if the average latency calculated within the defined window size exceeds the specified threshold range. +- The thresholds `min_latency_THRESHOLD_FAIL` and `min_latency_THRESHOLD_RAISE` will have a value of `TRUE` if activated; otherwise, they will be set to `FALSE`. diff --git a/src/analytics/backend/Dockerfile b/src/analytics/backend/Dockerfile index 17adcd3ab1df5704cc7ef0c5a19b3cfb1539ee22..df5cd7fbde6dc45780fdb3a333594ab11c1ab146 100644 --- a/src/analytics/backend/Dockerfile +++ b/src/analytics/backend/Dockerfile @@ -53,6 +53,15 @@ RUN python3 -m grpc_tools.protoc -I=. --python_out=. --grpc_python_out=. *.proto RUN rm *.proto RUN find . -type f -exec sed -i -E 's/(import\ .*)_pb2/from . \1_pb2/g' {} \; +# Install Java (required for PySpark) +RUN apt-get update && \ + apt-get install -y default-jdk && \ + apt-get clean + +# Set JAVA_HOME environment variable +ENV JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64 +ENV PATH=$JAVA_HOME/bin:$PATH + # Create component sub-folders, get specific Python packages RUN mkdir -p /var/teraflow/analytics/backend WORKDIR /var/teraflow/analytics/backend diff --git a/src/analytics/backend/service/AnalyticsBackendService.py b/src/analytics/backend/service/AnalyticsBackendService.py index 595603567fe537d9f7b33224cba0fe016a439631..658d237956b4ed3addbbc295ef0d19dd4b977257 100755 --- a/src/analytics/backend/service/AnalyticsBackendService.py +++ b/src/analytics/backend/service/AnalyticsBackendService.py @@ -42,7 +42,7 @@ class AnalyticsBackendService(GenericGrpcService): def StartSparkStreamer(self, analyzer_uuid, analyzer): kpi_list = analyzer['input_kpis'] - oper_list = [s.replace('_value', '') for s in list(analyzer["thresholds"].keys())] # TODO: update this line... + oper_list = [s.split('_', 1)[0] for s in list(analyzer["thresholds"].keys())] # TODO: update this line... thresholds = analyzer['thresholds'] window_size = analyzer['window_size'] window_slider = analyzer['window_slider'] @@ -78,20 +78,17 @@ class AnalyticsBackendService(GenericGrpcService): LOGGER.error("Failed to terminate analytics backend {:}".format(e)) return False - def install_services(self): - stop_event = threading.Event() - thread = threading.Thread(target=self.RequestListener, - args=(stop_event,) ) - thread.start() - return (thread, stop_event) + def install_servicers(self): + threading.Thread(target=self.RequestListener, args=()).start() - def RequestListener(self, stop_event): + def RequestListener(self): """ listener for requests on Kafka topic. """ + LOGGER.info("Request Listener is initiated ...") consumer = self.kafka_consumer consumer.subscribe([KafkaTopic.ANALYTICS_REQUEST.value]) - while not stop_event.is_set(): + while True: receive_msg = consumer.poll(2.0) if receive_msg is None: continue @@ -101,7 +98,7 @@ class AnalyticsBackendService(GenericGrpcService): else: print("Consumer error: {}".format(receive_msg.error())) break - analyzer = json.loads(receive_msg.value().decode('utf-8')) + analyzer = json.loads(receive_msg.value().decode('utf-8')) analyzer_uuid = receive_msg.key().decode('utf-8') LOGGER.debug('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer)) print ('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer)) diff --git a/src/analytics/backend/service/SparkStreaming.py b/src/analytics/backend/service/SparkStreaming.py index 96e1aa05d898ffdd23c533b74ee87fbf03f54576..f204c6247436177cd032c777c048ecb165051ec2 100644 --- a/src/analytics/backend/service/SparkStreaming.py +++ b/src/analytics/backend/service/SparkStreaming.py @@ -33,7 +33,7 @@ def SettingKafkaConsumerParams(): # TODO: create get_kafka_consumer() in comm return { # "kafka.bootstrap.servers": '127.0.0.1:9092', "kafka.bootstrap.servers": KafkaConfig.get_kafka_address(), - "subscribe" : KafkaTopic.VALUE.value, + "subscribe" : KafkaTopic.VALUE.value, # topic should have atleast one message before spark session "startingOffsets" : 'latest', "failOnDataLoss" : 'false' # Optional: Set to "true" to fail the query on data loss } @@ -64,7 +64,7 @@ def ApplyThresholds(aggregated_df, thresholds): for col_name, (fail_th, raise_th) in thresholds.items(): # Apply TH-Fail condition (if column value is less than the fail threshold) aggregated_df = aggregated_df.withColumn( - f"{col_name}_THRESHOLD_FAIL", + f"{col_name}_THRESHOLD_FALL", when(col(col_name) < fail_th, True).otherwise(False) ) # Apply TH-RAISE condition (if column value is greater than the raise threshold) @@ -128,11 +128,11 @@ def SparkStreamer(key, kpi_list, oper_list, thresholds, stop_event, # --- This will write output to Kafka: ACTUAL IMPLEMENTATION query = thresholded_stream_data \ - .selectExpr(f"'{key}' AS key", "to_json(struct(*)) AS value") \ + .selectExpr(f"CAST(kpi_id AS STRING) AS key", "to_json(struct(*)) AS value") \ .writeStream \ .format("kafka") \ .option("kafka.bootstrap.servers", KafkaConfig.get_kafka_address()) \ - .option("topic", KafkaTopic.ANALYTICS_RESPONSE.value) \ + .option("topic", KafkaTopic.ALARMS.value) \ .option("checkpointLocation", "analytics/.spark/checkpoint") \ .outputMode("update") diff --git a/src/analytics/backend/tests/messages.py b/src/analytics/backend/tests/messages.py index 9acd6ad9dffe4a5b10b107a6923ed85170ee141f..c3b78967efe13eef9a60e19e50e56bdfca4a410d 100644 --- a/src/analytics/backend/tests/messages.py +++ b/src/analytics/backend/tests/messages.py @@ -12,6 +12,11 @@ # See the License for the specific language governing permissions and # limitations under the License. +import uuid +import json +from common.proto.kpi_manager_pb2 import KpiId +from common.proto.analytics_frontend_pb2 import ( AnalyzerOperationMode, + Analyzer ) def get_kpi_id_list(): return ["6e22f180-ba28-4641-b190-2287bf448888", "1e22f180-ba28-4641-b190-2287bf446666"] @@ -32,3 +37,37 @@ def get_threshold_dict(): return { op + '_value': threshold_dict[op+'_value'] for op in get_operation_list() if op + '_value' in threshold_dict } + + +def create_analyzer(): + _create_analyzer = Analyzer() + # _create_analyzer.analyzer_id.analyzer_id.uuid = str(uuid.uuid4()) + _create_analyzer.analyzer_id.analyzer_id.uuid = "efef4d95-1cf1-43c4-9742-95c283ddd7a6" + _create_analyzer.algorithm_name = "Test_Aggergate_and_Threshold" + _create_analyzer.operation_mode = AnalyzerOperationMode.ANALYZEROPERATIONMODE_STREAMING + + _kpi_id = KpiId() + # input IDs to analyze + _kpi_id.kpi_id.uuid = str(uuid.uuid4()) + _kpi_id.kpi_id.uuid = "6e22f180-ba28-4641-b190-2287bf448888" + _create_analyzer.input_kpi_ids.append(_kpi_id) + _kpi_id.kpi_id.uuid = str(uuid.uuid4()) + _kpi_id.kpi_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666" + _create_analyzer.input_kpi_ids.append(_kpi_id) + _kpi_id.kpi_id.uuid = str(uuid.uuid4()) + _create_analyzer.input_kpi_ids.append(_kpi_id) + # output IDs after analysis + _kpi_id.kpi_id.uuid = str(uuid.uuid4()) + _create_analyzer.output_kpi_ids.append(_kpi_id) + _kpi_id.kpi_id.uuid = str(uuid.uuid4()) + _create_analyzer.output_kpi_ids.append(_kpi_id) + # parameter + _threshold_dict = { + # 'avg_value' :(20, 30), 'min_value' :(00, 10), 'max_value' :(45, 50), + 'first_value' :(00, 10), 'last_value' :(40, 50), 'stdev_value':(00, 10)} + _create_analyzer.parameters['thresholds'] = json.dumps(_threshold_dict) + _create_analyzer.parameters['window_size'] = "60 seconds" # Such as "10 seconds", "2 minutes", "3 hours", "4 days" or "5 weeks" + _create_analyzer.parameters['window_slider'] = "30 seconds" # should be less than window size + _create_analyzer.parameters['store_aggregate'] = str(False) # TRUE to store. No implemented yet + + return _create_analyzer \ No newline at end of file diff --git a/src/analytics/backend/tests/test_backend.py b/src/analytics/backend/tests/test_backend.py index 2f40faba94ef7081db609116e8fd869e3d119a24..bc0f47eb213f705a71f46de6a45d38b1c6f37e96 100644 --- a/src/analytics/backend/tests/test_backend.py +++ b/src/analytics/backend/tests/test_backend.py @@ -12,12 +12,14 @@ # See the License for the specific language governing permissions and # limitations under the License. -import time +import time, json +from typing import Dict import logging -import threading +from threading import Event, Thread from common.tools.kafka.Variables import KafkaTopic from analytics.backend.service.AnalyticsBackendService import AnalyticsBackendService from analytics.backend.tests.messages import get_kpi_id_list, get_operation_list, get_threshold_dict +from .messages import create_analyzer LOGGER = logging.getLogger(__name__) @@ -32,6 +34,24 @@ def test_validate_kafka_topics(): response = KafkaTopic.create_all_topics() assert isinstance(response, bool) +def test_StartSparkStreamer(): + LOGGER.debug(" >>> test_StartSparkStreamer: START <<< ") + analyzer_obj = create_analyzer() + analyzer_uuid = analyzer_obj.analyzer_id.analyzer_id.uuid + analyzer_to_generate : Dict = { + "algo_name" : analyzer_obj.algorithm_name, + "input_kpis" : [k.kpi_id.uuid for k in analyzer_obj.input_kpi_ids], + "output_kpis" : [k.kpi_id.uuid for k in analyzer_obj.output_kpi_ids], + "oper_mode" : analyzer_obj.operation_mode, + "thresholds" : json.loads(analyzer_obj.parameters["thresholds"]), + "window_size" : analyzer_obj.parameters["window_size"], + "window_slider" : analyzer_obj.parameters["window_slider"], + # "store_aggregate" : analyzer_obj.parameters["store_aggregate"] + } + AnalyticsBackendServiceObj = AnalyticsBackendService() + response = AnalyticsBackendServiceObj.StartSparkStreamer(analyzer_uuid, analyzer_to_generate) + assert isinstance(response, bool) + # def test_StartRequestListener(): # LOGGER.info('test_RunRequestListener') # AnalyticsBackendServiceObj = AnalyticsBackendService() @@ -49,9 +69,31 @@ def test_StopRequestListener(): time.sleep(10) LOGGER.info('Initiating StopRequestListener...') AnalyticsBackendServiceObj = AnalyticsBackendService() - response = AnalyticsBackendServiceObj.StopRequestListener(response_thread) - LOGGER.debug(str(response)) - assert isinstance(response, bool) + AnalyticsBackendServiceObj.stop_event = Event() + listener_thread = Thread(target=AnalyticsBackendServiceObj.RequestListener, args=()) + listener_thread.start() + + time.sleep(2000) + + # AnalyticsBackendServiceObj.stop_event.set() + # LOGGER.info('Backend termination initiated. waiting for termination... 10 seconds') + # listener_thread.join(timeout=10) + # assert not listener_thread.is_alive(), "RequestListener thread did not terminate as expected." + LOGGER.info('Completed test_RunRequestListener') + +# To test START and STOP communication together +# def test_StopRequestListener(): +# LOGGER.info('test_RunRequestListener') +# LOGGER.info('Initiating StartRequestListener...') +# AnalyticsBackendServiceObj = AnalyticsBackendService() +# response_thread = AnalyticsBackendServiceObj.StartRequestListener() # response is Tuple (thread, stop_event) +# # LOGGER.debug(str(response_thread)) +# time.sleep(10) +# LOGGER.info('Initiating StopRequestListener...') +# AnalyticsBackendServiceObj = AnalyticsBackendService() +# response = AnalyticsBackendServiceObj.StopRequestListener(response_thread) +# LOGGER.debug(str(response)) +# assert isinstance(response, bool) # To independently tests the SparkListener functionality # def test_SparkListener(): diff --git a/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py b/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py index a7fc8d49248ff01a860accac1b64a29d5533069f..9ffacecc30fac40bb4899b8889386bc23a7609ac 100644 --- a/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py +++ b/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py @@ -12,13 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. - -import logging, grpc, json, queue +import logging, grpc, json from typing import Dict -from confluent_kafka import Consumer as KafkaConsumer from confluent_kafka import Producer as KafkaProducer -from confluent_kafka import KafkaError from common.tools.kafka.Variables import KafkaConfig, KafkaTopic from common.proto.context_pb2 import Empty @@ -27,8 +24,7 @@ from common.proto.analytics_frontend_pb2 import Analyzer, AnalyzerId, Analy from common.proto.analytics_frontend_pb2_grpc import AnalyticsFrontendServiceServicer from analytics.database.Analyzer_DB import AnalyzerDB from analytics.database.AnalyzerModel import Analyzer as AnalyzerModel -from apscheduler.schedulers.background import BackgroundScheduler -from apscheduler.triggers.interval import IntervalTrigger + LOGGER = logging.getLogger(__name__) METRICS_POOL = MetricsPool('AnalyticsFrontend', 'NBIgRPC') @@ -41,9 +37,6 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer): self.result_queue = queue.Queue() self.scheduler = BackgroundScheduler() self.kafka_producer = KafkaProducer({'bootstrap.servers' : KafkaConfig.get_kafka_address()}) - self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(), - 'group.id' : 'analytics-frontend', - 'auto.offset.reset' : 'latest'}) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def StartAnalyzer(self, @@ -56,7 +49,6 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer): AnalyzerModel.ConvertAnalyzerToRow(request) ) self.PublishStartRequestOnKafka(request) - response.analyzer_id.uuid = request.analyzer_id.analyzer_id.uuid return response @@ -174,15 +166,6 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer): ) LOGGER.info("Analyzer Stop Request Generated: Analyzer Id: {:}".format(analyzer_uuid)) self.kafka_producer.flush() - self.StopListener() - - def StopListener(self): - """ - Gracefully stop the Kafka listener and the scheduler. - """ - LOGGER.info("Stopping Kafka listener...") - self.scheduler.shutdown() - LOGGER.info("Kafka listener stopped.") @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def SelectAnalyzers(self, @@ -202,7 +185,6 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer): LOGGER.info('Unable to process filter response {:}'.format(e)) except Exception as e: LOGGER.error('Unable to apply filter on table {:}. ERROR: {:}'.format(AnalyzerModel.__name__, e)) - def delivery_callback(self, err, msg): if err: diff --git a/src/analytics/frontend/tests/test_frontend.py b/src/analytics/frontend/tests/test_frontend.py index 44e84e4683bcdcec72e572b8e4deea903bf0de65..4583a45ac12acde9da81c4b7a15165db99fb38bb 100644 --- a/src/analytics/frontend/tests/test_frontend.py +++ b/src/analytics/frontend/tests/test_frontend.py @@ -25,7 +25,8 @@ from common.Settings import ( get_service_port_grpc, get_env_var_name, ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC ) from common.tools.kafka.Variables import KafkaTopic -from common.proto.analytics_frontend_pb2 import AnalyzerId, AnalyzerList +from common.proto.kpi_value_api_pb2 import KpiValue +from common.proto.analytics_frontend_pb2 import AnalyzerAlarms from analytics.frontend.client.AnalyticsFrontendClient import AnalyticsFrontendClient from analytics.frontend.service.AnalyticsFrontendService import AnalyticsFrontendService from analytics.frontend.tests.messages import ( create_analyzer_id, create_analyzer, @@ -89,12 +90,13 @@ def analyticsFrontend_client(analyticsFrontend_service : AnalyticsFrontendServic # response = KafkaTopic.create_all_topics() # assert isinstance(response, bool) -# ----- core funtionality test ----- -# def test_StartAnalytics(analyticsFrontend_client): -# LOGGER.info(' >>> test_StartAnalytic START: <<< ') -# response = analyticsFrontend_client.StartAnalyzer(create_analyzer()) -# LOGGER.debug(str(response)) -# assert isinstance(response, AnalyzerId) +# # ----- core funtionality test ----- +def test_StartAnalytics(analyticsFrontend_client): + LOGGER.info(' >>> test_StartAnalytic START: <<< ') + stream = analyticsFrontend_client.StartAnalyzer(create_analyzer()) + for response in stream: + LOGGER.debug(str(response)) + assert isinstance(response, KpiValue) # To test start and stop listener together def test_StartStopAnalyzers(analyticsFrontend_client): diff --git a/src/common/tools/kafka/Variables.py b/src/common/tools/kafka/Variables.py index 73b633e23cd55aefeed9b8075f2ad35348fc83ef..cadeec6ed331f599411d6480769985673f8d584d 100644 --- a/src/common/tools/kafka/Variables.py +++ b/src/common/tools/kafka/Variables.py @@ -46,6 +46,7 @@ class KafkaTopic(Enum): RAW = 'topic_raw' LABELED = 'topic_labeled' VALUE = 'topic_value' + ALARMS = 'topic_alarms' ANALYTICS_REQUEST = 'topic_request_analytics' ANALYTICS_RESPONSE = 'topic_response_analytics' @@ -77,8 +78,8 @@ class KafkaTopic(Enum): # LOGGER.debug("Existing topic list: {:}".format(topic_metadata.topics)) if topic not in topic_metadata.topics: # If the topic does not exist, create a new topic - print("Topic {:} does not exist. Creating...".format(topic)) - LOGGER.debug("Topic {:} does not exist. Creating...".format(topic)) + # print("Topic {:} does not exist. Creating...".format(topic)) + # LOGGER.debug("Topic {:} does not exist. Creating...".format(topic)) new_topic = NewTopic(topic, num_partitions=1, replication_factor=1) KafkaConfig.get_admin_client().create_topics([new_topic]) else: diff --git a/src/kpi_value_api/client/KpiValueApiClient.py b/src/kpi_value_api/client/KpiValueApiClient.py index f432271cfb7c8136f72156330b25d0b82b934d99..dfc5f07254a30db34a20ee8d0eae931cfd0ce571 100644 --- a/src/kpi_value_api/client/KpiValueApiClient.py +++ b/src/kpi_value_api/client/KpiValueApiClient.py @@ -15,17 +15,18 @@ import grpc, logging from common.Constants import ServiceNameEnum -from common.Settings import get_service_host, get_service_port_grpc -from common.tools.client.RetryDecorator import retry, delay_exponential -from common.tools.grpc.Tools import grpc_message_to_json_string +from common.Settings import get_service_host, get_service_port_grpc -from common.proto.context_pb2 import Empty -from common.proto.kpi_value_api_pb2 import KpiValueList, KpiValueFilter +from common.tools.client.RetryDecorator import retry, delay_exponential +from common.tools.grpc.Tools import grpc_message_to_json_string +from common.proto.context_pb2 import Empty +from common.proto.kpi_manager_pb2 import KpiId +from common.proto.kpi_value_api_pb2 import KpiValueList, KpiValueFilter, KpiAlarms from common.proto.kpi_value_api_pb2_grpc import KpiValueAPIServiceStub -LOGGER = logging.getLogger(__name__) -MAX_RETRIES = 10 -DELAY_FUNCTION = delay_exponential(initial=0.01, increment=2.0, maximum=5.0) +LOGGER = logging.getLogger(__name__) +MAX_RETRIES = 10 +DELAY_FUNCTION = delay_exponential(initial=0.01, increment=2.0, maximum=5.0) RETRY_DECORATOR = retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, prepare_method_name='connect') class KpiValueApiClient: @@ -34,8 +35,8 @@ class KpiValueApiClient: if not port: port = get_service_port_grpc(ServiceNameEnum.KPIVALUEAPI) self.endpoint = '{:s}:{:s}'.format(str(host), str(port)) LOGGER.debug('Creating channel to {:s}...'.format(str(self.endpoint))) - self.channel = None - self.stub = None + self.channel = None + self.stub = None self.connect() LOGGER.debug('Channel created') @@ -46,18 +47,25 @@ class KpiValueApiClient: def close(self): if self.channel is not None: self.channel.close() self.channel = None - self.stub = None + self.stub = None @RETRY_DECORATOR - def StoreKpiValues(self, request: KpiValueList) -> Empty: + def StoreKpiValues(self, request: KpiValueList) -> Empty: # type: ignore LOGGER.debug('StoreKpiValues: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.StoreKpiValues(request) LOGGER.debug('StoreKpiValues result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR - def SelectKpiValues(self, request: KpiValueFilter) -> KpiValueList: + def SelectKpiValues(self, request: KpiValueFilter) -> KpiValueList: # type: ignore LOGGER.debug('SelectKpiValues: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.SelectKpiValues(request) LOGGER.debug('SelectKpiValues result: {:s}'.format(grpc_message_to_json_string(response))) return response + + @RETRY_DECORATOR + def GetKpiAlarms(self, request: KpiId) -> KpiAlarms: # type: ignore + LOGGER.debug('GetKpiAlarms: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.GetKpiAlarms(request) + LOGGER.debug('GetKpiAlarms result: {:s}'.format(grpc_message_to_json_string(response))) + return response diff --git a/src/kpi_value_api/requirements.in b/src/kpi_value_api/requirements.in index f5695906a8d02d55e15960a76986b8d03f02dba1..e95d6d8bbd81abca7eb2622e1faf6af473fcdb12 100644 --- a/src/kpi_value_api/requirements.in +++ b/src/kpi_value_api/requirements.in @@ -14,4 +14,5 @@ confluent-kafka==2.3.* requests==2.27.* -prometheus-api-client==0.5.3 \ No newline at end of file +prometheus-api-client==0.5.3 +apscheduler==3.10.4 diff --git a/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py b/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py index 4ea978fafc8d7454d41f64182d553d030215113a..0f57f88219a74108a555cf87e9bdb98999fd5da2 100644 --- a/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py +++ b/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py @@ -12,18 +12,22 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging, grpc, json +from datetime import datetime +import logging, grpc, json, queue from typing import Dict from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method from common.tools.kafka.Variables import KafkaConfig, KafkaTopic +from confluent_kafka import KafkaError from common.proto.context_pb2 import Empty from common.proto.kpi_sample_types_pb2 import KpiSampleType from common.proto.kpi_manager_pb2 import KpiDescriptor, KpiId from common.proto.kpi_value_api_pb2_grpc import KpiValueAPIServiceServicer -from common.proto.kpi_value_api_pb2 import KpiValueList, KpiValueFilter, KpiValue, KpiValueType - +from common.proto.kpi_value_api_pb2 import KpiAlarms, KpiValueList, KpiValueFilter, KpiValue, KpiValueType +from apscheduler.schedulers.background import BackgroundScheduler +from apscheduler.triggers.interval import IntervalTrigger from confluent_kafka import Producer as KafkaProducer +from confluent_kafka import Consumer as KafkaConsumer from prometheus_api_client import PrometheusConnect from prometheus_api_client.utils import parse_datetime @@ -37,8 +41,14 @@ PROM_URL = "http://prometheus-k8s.monitoring.svc.cluster.local:9090" # TO class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer): def __init__(self): LOGGER.debug('Init KpiValueApiService') + self.listener_topic = KafkaTopic.ALARMS.value + self.result_queue = queue.Queue() + self.scheduler = BackgroundScheduler() self.kafka_producer = KafkaProducer({'bootstrap.servers' : KafkaConfig.get_kafka_address()}) - + self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(), + 'group.id' : 'analytics-frontend', + 'auto.offset.reset' : 'latest'}) + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def StoreKpiValues(self, request: KpiValueList, grpc_context: grpc.ServicerContext ) -> Empty: @@ -109,17 +119,14 @@ class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer): kpi_value = KpiValue() kpi_value.kpi_id.kpi_id = record['metric']['__name__'], kpi_value.timestamp = value[0], - kpi_value.kpi_value_type = self.ConverValueToKpiValueType(value[1]) + kpi_value.kpi_value_type.CopyFrom(self.ConverValueToKpiValueType(value['kpi_value'])) response.kpi_value_list.append(kpi_value) return response def GetKpiSampleType(self, kpi_value: str, kpi_manager_client): - print("--- START -----") - kpi_id = KpiId() kpi_id.kpi_id.uuid = kpi_value.kpi_id.kpi_id.uuid # print("KpiId generated: {:}".format(kpi_id)) - try: kpi_descriptor_object = KpiDescriptor() kpi_descriptor_object = kpi_manager_client.GetKpiDescriptor(kpi_id) @@ -135,26 +142,91 @@ class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer): LOGGER.info("Unable to get KpiDescriptor. Error: {:}".format(e)) print ("Unable to get KpiDescriptor. Error: {:}".format(e)) - def ConverValueToKpiValueType(self, value): - # Check if the value is an integer (int64) + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) + def GetKpiAlarms(self, request: KpiId, grpc_context: grpc.ServicerContext) -> KpiAlarms: # type: ignore + """ + Get Alarms from Kafka return Alrams periodically. + """ + LOGGER.debug('GetKpiAlarms: {:}'.format(request)) + response = KpiAlarms() + + for alarm_key, value in self.StartResponseListener(request.kpi_id.uuid): + response.start_timestamp.timestamp = datetime.strptime( + value["window"]["start"], "%Y-%m-%dT%H:%M:%S.%fZ").timestamp() + response.end_timestamp.timestamp = datetime.strptime( + value["window"]["end"], "%Y-%m-%dT%H:%M:%S.%fZ").timestamp() + response.kpi_id.kpi_id.uuid = value['kpi_id'] + for key, threshold in value.items(): + if "THRESHOLD_" in key: + response.alarms[key] = threshold + + yield response + + def StartResponseListener(self, filter_key=None): + """ + Start the Kafka response listener with APScheduler and return key-value pairs periodically. + """ + LOGGER.info("Starting StartResponseListener") + # Schedule the ResponseListener at fixed intervals + self.scheduler.add_job( + self.response_listener, + trigger=IntervalTrigger(seconds=5), + args=[filter_key], + id=f"response_listener_{self.listener_topic}", + replace_existing=True + ) + self.scheduler.start() + LOGGER.info(f"Started Kafka listener for topic {self.listener_topic}...") try: - int_value = int(value) - return KpiValueType(int64Val=int_value) - except (ValueError, TypeError): - pass - # Check if the value is a float + while True: + LOGGER.info("entering while...") + key, value = self.result_queue.get() # Wait until a result is available + LOGGER.info("In while true ...") + yield key, value # Yield the result to the calling function + except KeyboardInterrupt: + LOGGER.warning("Listener stopped manually.") + finally: + self.StopListener() + + def response_listener(self, filter_key=None): + """ + Poll Kafka messages and put key-value pairs into the queue. + """ + LOGGER.info(f"Polling Kafka topic {self.listener_topic}...") + + consumer = self.kafka_consumer + consumer.subscribe([self.listener_topic]) + msg = consumer.poll(2.0) + if msg is None: + return + elif msg.error(): + if msg.error().code() != KafkaError._PARTITION_EOF: + LOGGER.error(f"Kafka error: {msg.error()}") + return try: - float_value = float(value) - return KpiValueType(floatVal=float_value) - except (ValueError, TypeError): - pass - # Check if the value is a boolean - if value.lower() in ['true', 'false']: - bool_value = value.lower() == 'true' - return KpiValueType(boolVal=bool_value) - # If none of the above, treat it as a string - return KpiValueType(stringVal=value) + key = msg.key().decode('utf-8') if msg.key() else None + if filter_key is not None and key == filter_key: + value = json.loads(msg.value().decode('utf-8')) + LOGGER.info(f"Received key: {key}, value: {value}") + self.result_queue.put((key, value)) + else: + LOGGER.warning(f"Skipping message with unmatched key: {key} - {filter_key}") + except Exception as e: + LOGGER.error(f"Error processing Kafka message: {e}") def delivery_callback(self, err, msg): if err: LOGGER.debug('Message delivery failed: {:}'.format(err)) else: LOGGER.debug('Message delivered to topic {:}'.format(msg.topic())) + + def ConverValueToKpiValueType(self, value): + kpi_value_type = KpiValueType() + if isinstance(value, int): + kpi_value_type.int32Val = value + elif isinstance(value, float): + kpi_value_type.floatVal = value + elif isinstance(value, str): + kpi_value_type.stringVal = value + elif isinstance(value, bool): + kpi_value_type.boolVal = value + # Add other checks for different types as needed + return kpi_value_type diff --git a/src/kpi_value_api/tests/messages.py b/src/kpi_value_api/tests/messages.py index d8ad14bd44eebc1e9412cfd5ff2973e6018c95e9..50240c0154deff33dfdbb797cd5e0fca9a05c8ab 100644 --- a/src/kpi_value_api/tests/messages.py +++ b/src/kpi_value_api/tests/messages.py @@ -13,9 +13,16 @@ # limitations under the License. import uuid, time +from common.proto import kpi_manager_pb2 from common.proto.kpi_value_api_pb2 import KpiValue, KpiValueList +def create_kpi_id_request(): + _create_kpi_id = kpi_manager_pb2.KpiId() + _create_kpi_id.kpi_id.uuid = "6e22f180-ba28-4641-b190-2287bf448888" + # _create_kpi_id.kpi_id.uuid = str(uuid.uuid4()) + return _create_kpi_id + def create_kpi_value_list(): _create_kpi_value_list = KpiValueList() # To run this experiment sucessfully, add an existing UUID of a KPI Descriptor from the KPI DB. diff --git a/src/kpi_value_api/tests/test_kpi_value_api.py b/src/kpi_value_api/tests/test_kpi_value_api.py index 307b5cdad4e6503a774e308f669fc44762f84bf1..ac17f6f987d437ee6dacd7dfdc7a1de7a8965343 100644 --- a/src/kpi_value_api/tests/test_kpi_value_api.py +++ b/src/kpi_value_api/tests/test_kpi_value_api.py @@ -21,8 +21,8 @@ from common.Settings import ( ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name, get_service_port_grpc) from kpi_value_api.service.KpiValueApiService import KpiValueApiService from kpi_value_api.client.KpiValueApiClient import KpiValueApiClient -from kpi_value_api.tests.messages import create_kpi_value_list - +from kpi_value_api.tests.messages import create_kpi_value_list, create_kpi_id_request +from common.proto.kpi_value_api_pb2 import KpiAlarms LOCAL_HOST = '127.0.0.1' KPIVALUEAPI_SERVICE_PORT = get_service_port_grpc(ServiceNameEnum.KPIVALUEAPI) # type: ignore @@ -78,7 +78,14 @@ def test_validate_kafka_topics(): response = KafkaTopic.create_all_topics() assert isinstance(response, bool) -def test_store_kpi_values(kpi_value_api_client): - LOGGER.debug(" >>> test_set_list_of_KPIs: START <<< ") - response = kpi_value_api_client.StoreKpiValues(create_kpi_value_list()) - assert isinstance(response, Empty) +def test_GetKpiAlarms(kpi_value_api_client): + LOGGER.debug(" >>> test_GetKpiAlarms") + stream = kpi_value_api_client.GetKpiAlarms(create_kpi_id_request()) + for response in stream: + LOGGER.debug(str(response)) + assert isinstance(response, KpiAlarms) + +# def test_store_kpi_values(kpi_value_api_client): +# LOGGER.debug(" >>> test_set_list_of_KPIs: START <<< ") +# response = kpi_value_api_client.StoreKpiValues(create_kpi_value_list()) +# assert isinstance(response, Empty) diff --git a/src/telemetry/backend/service/TelemetryBackendService.py b/src/telemetry/backend/service/TelemetryBackendService.py index 6ab841238f446a2895cd163fab4b7eb05eaa3176..078fa5896d5fb5033833e0e2ef2248613ef80c18 100755 --- a/src/telemetry/backend/service/TelemetryBackendService.py +++ b/src/telemetry/backend/service/TelemetryBackendService.py @@ -83,7 +83,7 @@ class TelemetryBackendService(GenericGrpcService): thread.join() print ("Terminating backend (by StopCollector): Collector Id: ", collector_id) del self.running_threads[collector_id] - self.GenerateCollectorResponse(collector_id, "-1", -1) # Termination confirmation to frontend. + self.GenerateCollectorTerminationSignal(collector_id, "-1", -1) # Termination confirmation to frontend. else: print ('Backend collector {:} not found'.format(collector_id)) @@ -103,11 +103,28 @@ class TelemetryBackendService(GenericGrpcService): while not stop_event.is_set(): if time.time() - start_time >= collector['duration']: # condition to terminate backend print("Execuation duration completed: Terminating backend: Collector Id: ", collector_id, " - ", time.time() - start_time) - self.GenerateCollectorResponse(collector_id, "-1", -1) # Termination confirmation to frontend. + self.GenerateCollectorTerminationSignal(collector_id, "-1", -1) # Termination confirmation to frontend. break self.ExtractKpiValue(collector_id, collector['kpi_id']) time.sleep(collector['interval']) + def GenerateCollectorTerminationSignal(self, collector_id: str, kpi_id: str, measured_kpi_value: Any): + """ + Method to write kpi Termination signat on RESPONSE Kafka topic + """ + producer = self.kafka_producer + kpi_value : Dict = { + "kpi_id" : kpi_id, + "kpi_value" : measured_kpi_value, + } + producer.produce( + KafkaTopic.RESPONSE.value, # TODO: to the topic ... + key = collector_id, + value = json.dumps(kpi_value), + callback = self.delivery_callback + ) + producer.flush() + def ExtractKpiValue(self, collector_id: str, kpi_id: str): """ Method to extract kpi value. @@ -123,117 +140,27 @@ class TelemetryBackendService(GenericGrpcService): """ producer = self.kafka_producer kpi_value : Dict = { + "time_stamp": str(time.time()), "kpi_id" : kpi_id, "kpi_value" : measured_kpi_value } producer.produce( - KafkaTopic.RESPONSE.value, + KafkaTopic.VALUE.value, # TODO: to the topic ... key = collector_id, value = json.dumps(kpi_value), callback = self.delivery_callback ) producer.flush() - def GenerateRawMetric(self, metrics: Any): - """ - Method writes raw metrics on VALUE Kafka topic - """ - producer = self.kafka_producer - some_metric : Dict = { - "some_id" : metrics - } - producer.produce( - KafkaTopic.VALUE.value, - key = 'raw', - value = json.dumps(some_metric), - callback = self.delivery_callback - ) - producer.flush() - def delivery_callback(self, err, msg): """ Callback function to handle message delivery status. Args: err (KafkaError): Kafka error object. msg (Message): Kafka message object. """ - if err: print(f'Message delivery failed: {err}') - # else: print(f'Message delivered to topic {msg.topic()}') - -# # ----------- BELOW: Actual Implementation of Kafka Producer with Node Exporter ----------- -# @staticmethod -# def fetch_single_node_exporter_metric(): -# """ -# Method to fetch metrics from Node Exporter. -# Returns: -# str: Metrics fetched from Node Exporter. -# """ -# KPI = "node_network_receive_packets_total" -# try: -# response = requests.get(EXPORTER_ENDPOINT) # type: ignore -# LOGGER.info("Request status {:}".format(response)) -# if response.status_code == 200: -# # print(f"Metrics fetched sucessfully...") -# metrics = response.text -# # Check if the desired metric is available in the response -# if KPI in metrics: -# KPI_VALUE = TelemetryBackendService.extract_metric_value(metrics, KPI) -# # Extract the metric value -# if KPI_VALUE is not None: -# LOGGER.info("Extracted value of {:} is {:}".format(KPI, KPI_VALUE)) -# print(f"Extracted value of {KPI} is: {KPI_VALUE}") -# return KPI_VALUE -# else: -# LOGGER.info("Failed to fetch metrics. Status code: {:}".format(response.status_code)) -# # print(f"Failed to fetch metrics. Status code: {response.status_code}") -# return None -# except Exception as e: -# LOGGER.info("Failed to fetch metrics. Status code: {:}".format(e)) -# # print(f"Failed to fetch metrics: {str(e)}") -# return None - -# @staticmethod -# def extract_metric_value(metrics, metric_name): -# """ -# Method to extract the value of a metric from the metrics string. -# Args: -# metrics (str): Metrics string fetched from Exporter. -# metric_name (str): Name of the metric to extract. -# Returns: -# float: Value of the extracted metric, or None if not found. -# """ -# try: -# # Find the metric line containing the desired metric name -# metric_line = next(line for line in metrics.split('\n') if line.startswith(metric_name)) -# # Split the line to extract the metric value -# metric_value = float(metric_line.split()[1]) -# return metric_value -# except StopIteration: -# print(f"Metric '{metric_name}' not found in the metrics.") -# return None - -# @staticmethod -# def stream_node_export_metrics_to_raw_topic(): -# try: -# while True: -# response = requests.get(EXPORTER_ENDPOINT) -# # print("Response Status {:} ".format(response)) -# # LOGGER.info("Response Status {:} ".format(response)) -# try: -# if response.status_code == 200: -# producerObj = KafkaProducer(PRODUCER_CONFIG) -# producerObj.produce(KAFKA_TOPICS['raw'], key="raw", value= str(response.text), callback=TelemetryBackendService.delivery_callback) -# producerObj.flush() -# LOGGER.info("Produce to topic") -# else: -# LOGGER.info("Didn't received expected response. Status code: {:}".format(response.status_code)) -# print(f"Didn't received expected response. Status code: {response.status_code}") -# return None -# time.sleep(15) -# except Exception as e: -# LOGGER.info("Failed to process response. Status code: {:}".format(e)) -# return None -# except Exception as e: -# LOGGER.info("Failed to fetch metrics. Status code: {:}".format(e)) -# print(f"Failed to fetch metrics: {str(e)}") -# return None -# # ----------- ABOVE: Actual Implementation of Kafka Producer with Node Exporter ----------- \ No newline at end of file + if err: + LOGGER.debug('Message delivery failed: {:}'.format(err)) + print(f'Message delivery failed: {err}') + else: + LOGGER.info('Message delivered to topic {:}'.format(msg.topic())) + print(f'Message delivered to topic {msg.topic()}') diff --git a/src/telemetry/backend/tests/test_TelemetryBackend.py b/src/telemetry/backend/tests/test_TelemetryBackend.py index a2bbee540c3ce348ef52eceb0e776f48a68d94b1..665fa825e3ee31b2e92351d9c5855f627ce40fa1 100644 --- a/src/telemetry/backend/tests/test_TelemetryBackend.py +++ b/src/telemetry/backend/tests/test_TelemetryBackend.py @@ -13,6 +13,7 @@ # limitations under the License. import logging +import threading from common.tools.kafka.Variables import KafkaTopic from telemetry.backend.service.TelemetryBackendService import TelemetryBackendService @@ -25,14 +26,13 @@ LOGGER = logging.getLogger(__name__) ########################### # --- "test_validate_kafka_topics" should be run before the functionality tests --- -def test_validate_kafka_topics(): - LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ") - response = KafkaTopic.create_all_topics() - assert isinstance(response, bool) +# def test_validate_kafka_topics(): +# LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ") +# response = KafkaTopic.create_all_topics() +# assert isinstance(response, bool) def test_RunRequestListener(): LOGGER.info('test_RunRequestListener') TelemetryBackendServiceObj = TelemetryBackendService() - response = TelemetryBackendServiceObj.RunRequestListener() + response = threading.Thread(target=TelemetryBackendServiceObj.RequestListener).start() LOGGER.debug(str(response)) - assert isinstance(response, bool) diff --git a/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py b/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py index c72e66bdd53f165ebae131e07f51d23e609dd8be..ad99dff12dc641232972f8cff8226878caefd71b 100644 --- a/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py +++ b/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py @@ -161,9 +161,8 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer): # ---------- Independent Method --------------- # Listener method is independent of any method (same lifetime as service) # continously listens for responses - def RunResponseListener(self): + def install_servicers(self): threading.Thread(target=self.ResponseListener).start() - return True def ResponseListener(self): """