diff --git a/proto/analytics_frontend.proto b/proto/analytics_frontend.proto index ace0581db816bee1d0d20746f2b864dce602567b..bb6afb2e424b646fd8d9a6810c38a8a6863c230f 100644 --- a/proto/analytics_frontend.proto +++ b/proto/analytics_frontend.proto @@ -20,7 +20,7 @@ import "kpi_manager.proto"; //import "kpi_sample_types.proto"; service AnalyticsFrontendService { - rpc StartAnalyzer (Analyzer ) returns (AnalyzerId ) {} + rpc StartAnalyzer (Analyzer ) returns (AnalyzerId) {} rpc StopAnalyzer (AnalyzerId ) returns (context.Empty) {} rpc SelectAnalyzers(AnalyzerFilter) returns (AnalyzerList ) {} } diff --git a/proto/kpi_value_api.proto b/proto/kpi_value_api.proto index dff96272e3d05756dd19a49ecaede7311b196540..4d3a1f216406841344d40712ea04ec82cedf04d0 100644 --- a/proto/kpi_value_api.proto +++ b/proto/kpi_value_api.proto @@ -19,8 +19,9 @@ import "context.proto"; import "kpi_manager.proto"; service KpiValueAPIService { - rpc StoreKpiValues (KpiValueList) returns (context.Empty) {} - rpc SelectKpiValues (KpiValueFilter) returns (KpiValueList) {} + rpc StoreKpiValues (KpiValueList) returns (context.Empty) {} + rpc SelectKpiValues (KpiValueFilter) returns (KpiValueList) {} + rpc GetKpiAlarms (kpi_manager.KpiId) returns (stream KpiAlarms) {} } message KpiValue { @@ -50,3 +51,10 @@ message KpiValueFilter { repeated context.Timestamp start_timestamp = 2; repeated context.Timestamp end_timestamp = 3; } + +message KpiAlarms { + context.Timestamp start_timestamp = 1; + context.Timestamp end_timestamp = 2; + kpi_manager.KpiId kpi_id = 3; + map<string, bool> alarms = 4; +} diff --git a/src/analytics/backend/service/AnalyticsBackendService.py b/src/analytics/backend/service/AnalyticsBackendService.py index d4a53745956f8007c7f7ad0859e44f046e5fe989..658d237956b4ed3addbbc295ef0d19dd4b977257 100755 --- a/src/analytics/backend/service/AnalyticsBackendService.py +++ b/src/analytics/backend/service/AnalyticsBackendService.py @@ -78,20 +78,17 @@ class AnalyticsBackendService(GenericGrpcService): LOGGER.error("Failed to terminate analytics backend {:}".format(e)) return False - def install_services(self): - stop_event = threading.Event() - thread = threading.Thread(target=self.RequestListener, - args=(stop_event,) ) - thread.start() - return (thread, stop_event) + def install_servicers(self): + threading.Thread(target=self.RequestListener, args=()).start() - def RequestListener(self, stop_event): + def RequestListener(self): """ listener for requests on Kafka topic. 
""" + LOGGER.info("Request Listener is initiated ...") consumer = self.kafka_consumer consumer.subscribe([KafkaTopic.ANALYTICS_REQUEST.value]) - while not stop_event.is_set(): + while True: receive_msg = consumer.poll(2.0) if receive_msg is None: continue @@ -101,7 +98,7 @@ class AnalyticsBackendService(GenericGrpcService): else: print("Consumer error: {}".format(receive_msg.error())) break - analyzer = json.loads(receive_msg.value().decode('utf-8')) + analyzer = json.loads(receive_msg.value().decode('utf-8')) analyzer_uuid = receive_msg.key().decode('utf-8') LOGGER.debug('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer)) print ('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer)) diff --git a/src/analytics/backend/service/SparkStreaming.py b/src/analytics/backend/service/SparkStreaming.py index 96e1aa05d898ffdd23c533b74ee87fbf03f54576..f204c6247436177cd032c777c048ecb165051ec2 100644 --- a/src/analytics/backend/service/SparkStreaming.py +++ b/src/analytics/backend/service/SparkStreaming.py @@ -33,7 +33,7 @@ def SettingKafkaConsumerParams(): # TODO: create get_kafka_consumer() in comm return { # "kafka.bootstrap.servers": '127.0.0.1:9092', "kafka.bootstrap.servers": KafkaConfig.get_kafka_address(), - "subscribe" : KafkaTopic.VALUE.value, + "subscribe" : KafkaTopic.VALUE.value, # topic should have atleast one message before spark session "startingOffsets" : 'latest', "failOnDataLoss" : 'false' # Optional: Set to "true" to fail the query on data loss } @@ -64,7 +64,7 @@ def ApplyThresholds(aggregated_df, thresholds): for col_name, (fail_th, raise_th) in thresholds.items(): # Apply TH-Fail condition (if column value is less than the fail threshold) aggregated_df = aggregated_df.withColumn( - f"{col_name}_THRESHOLD_FAIL", + f"{col_name}_THRESHOLD_FALL", when(col(col_name) < fail_th, True).otherwise(False) ) # Apply TH-RAISE condition (if column value is greater than the raise threshold) @@ -128,11 +128,11 @@ def SparkStreamer(key, kpi_list, oper_list, thresholds, stop_event, # --- This will write output to Kafka: ACTUAL IMPLEMENTATION query = thresholded_stream_data \ - .selectExpr(f"'{key}' AS key", "to_json(struct(*)) AS value") \ + .selectExpr(f"CAST(kpi_id AS STRING) AS key", "to_json(struct(*)) AS value") \ .writeStream \ .format("kafka") \ .option("kafka.bootstrap.servers", KafkaConfig.get_kafka_address()) \ - .option("topic", KafkaTopic.ANALYTICS_RESPONSE.value) \ + .option("topic", KafkaTopic.ALARMS.value) \ .option("checkpointLocation", "analytics/.spark/checkpoint") \ .outputMode("update") diff --git a/src/analytics/backend/tests/messages.py b/src/analytics/backend/tests/messages.py index 9acd6ad9dffe4a5b10b107a6923ed85170ee141f..c3b78967efe13eef9a60e19e50e56bdfca4a410d 100644 --- a/src/analytics/backend/tests/messages.py +++ b/src/analytics/backend/tests/messages.py @@ -12,6 +12,11 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+import uuid +import json +from common.proto.kpi_manager_pb2 import KpiId +from common.proto.analytics_frontend_pb2 import ( AnalyzerOperationMode, + Analyzer ) def get_kpi_id_list(): return ["6e22f180-ba28-4641-b190-2287bf448888", "1e22f180-ba28-4641-b190-2287bf446666"] @@ -32,3 +37,37 @@ def get_threshold_dict(): return { op + '_value': threshold_dict[op+'_value'] for op in get_operation_list() if op + '_value' in threshold_dict } + + +def create_analyzer(): + _create_analyzer = Analyzer() + # _create_analyzer.analyzer_id.analyzer_id.uuid = str(uuid.uuid4()) + _create_analyzer.analyzer_id.analyzer_id.uuid = "efef4d95-1cf1-43c4-9742-95c283ddd7a6" + _create_analyzer.algorithm_name = "Test_Aggergate_and_Threshold" + _create_analyzer.operation_mode = AnalyzerOperationMode.ANALYZEROPERATIONMODE_STREAMING + + _kpi_id = KpiId() + # input IDs to analyze + _kpi_id.kpi_id.uuid = str(uuid.uuid4()) + _kpi_id.kpi_id.uuid = "6e22f180-ba28-4641-b190-2287bf448888" + _create_analyzer.input_kpi_ids.append(_kpi_id) + _kpi_id.kpi_id.uuid = str(uuid.uuid4()) + _kpi_id.kpi_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666" + _create_analyzer.input_kpi_ids.append(_kpi_id) + _kpi_id.kpi_id.uuid = str(uuid.uuid4()) + _create_analyzer.input_kpi_ids.append(_kpi_id) + # output IDs after analysis + _kpi_id.kpi_id.uuid = str(uuid.uuid4()) + _create_analyzer.output_kpi_ids.append(_kpi_id) + _kpi_id.kpi_id.uuid = str(uuid.uuid4()) + _create_analyzer.output_kpi_ids.append(_kpi_id) + # parameter + _threshold_dict = { + # 'avg_value' :(20, 30), 'min_value' :(00, 10), 'max_value' :(45, 50), + 'first_value' :(00, 10), 'last_value' :(40, 50), 'stdev_value':(00, 10)} + _create_analyzer.parameters['thresholds'] = json.dumps(_threshold_dict) + _create_analyzer.parameters['window_size'] = "60 seconds" # Such as "10 seconds", "2 minutes", "3 hours", "4 days" or "5 weeks" + _create_analyzer.parameters['window_slider'] = "30 seconds" # should be less than window size + _create_analyzer.parameters['store_aggregate'] = str(False) # TRUE to store. No implemented yet + + return _create_analyzer \ No newline at end of file diff --git a/src/analytics/backend/tests/test_backend.py b/src/analytics/backend/tests/test_backend.py index 2f40faba94ef7081db609116e8fd869e3d119a24..9221bb23ee041da06a4c1f401c75d1906f6748b0 100644 --- a/src/analytics/backend/tests/test_backend.py +++ b/src/analytics/backend/tests/test_backend.py @@ -12,12 +12,14 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-import time +import time, json +from typing import Dict import logging import threading from common.tools.kafka.Variables import KafkaTopic from analytics.backend.service.AnalyticsBackendService import AnalyticsBackendService from analytics.backend.tests.messages import get_kpi_id_list, get_operation_list, get_threshold_dict +from .messages import create_analyzer LOGGER = logging.getLogger(__name__) @@ -32,6 +34,24 @@ def test_validate_kafka_topics(): response = KafkaTopic.create_all_topics() assert isinstance(response, bool) +def test_StartSparkStreamer(): + LOGGER.debug(" >>> test_StartSparkStreamer: START <<< ") + analyzer_obj = create_analyzer() + analyzer_uuid = analyzer_obj.analyzer_id.analyzer_id.uuid + analyzer_to_generate : Dict = { + "algo_name" : analyzer_obj.algorithm_name, + "input_kpis" : [k.kpi_id.uuid for k in analyzer_obj.input_kpi_ids], + "output_kpis" : [k.kpi_id.uuid for k in analyzer_obj.output_kpi_ids], + "oper_mode" : analyzer_obj.operation_mode, + "thresholds" : json.loads(analyzer_obj.parameters["thresholds"]), + "window_size" : analyzer_obj.parameters["window_size"], + "window_slider" : analyzer_obj.parameters["window_slider"], + # "store_aggregate" : analyzer_obj.parameters["store_aggregate"] + } + AnalyticsBackendServiceObj = AnalyticsBackendService() + response = AnalyticsBackendServiceObj.StartSparkStreamer(analyzer_uuid, analyzer_to_generate) + assert isinstance(response, bool) + # def test_StartRequestListener(): # LOGGER.info('test_RunRequestListener') # AnalyticsBackendServiceObj = AnalyticsBackendService() diff --git a/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py b/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py index 8bb6a17afb5b911e3652fdb8d1853b5b7bc6faf3..baa88a8b7de2a06b569cf61bb248b281ce5715c9 100644 --- a/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py +++ b/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py @@ -12,13 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. 
- -import logging, grpc, json, queue +import logging, grpc, json from typing import Dict -from confluent_kafka import Consumer as KafkaConsumer from confluent_kafka import Producer as KafkaProducer -from confluent_kafka import KafkaError from common.tools.kafka.Variables import KafkaConfig, KafkaTopic from common.proto.context_pb2 import Empty @@ -27,8 +24,7 @@ from common.proto.analytics_frontend_pb2 import Analyzer, AnalyzerId, Analy from common.proto.analytics_frontend_pb2_grpc import AnalyticsFrontendServiceServicer from analytics.database.Analyzer_DB import AnalyzerDB from analytics.database.AnalyzerModel import Analyzer as AnalyzerModel -from apscheduler.schedulers.background import BackgroundScheduler -from apscheduler.triggers.interval import IntervalTrigger + LOGGER = logging.getLogger(__name__) METRICS_POOL = MetricsPool('AnalyticsFrontend', 'NBIgRPC') @@ -36,19 +32,13 @@ METRICS_POOL = MetricsPool('AnalyticsFrontend', 'NBIgRPC') class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer): def __init__(self): LOGGER.info('Init AnalyticsFrontendService') - self.listener_topic = KafkaTopic.ANALYTICS_RESPONSE.value self.db_obj = AnalyzerDB() - self.result_queue = queue.Queue() - self.scheduler = BackgroundScheduler() self.kafka_producer = KafkaProducer({'bootstrap.servers' : KafkaConfig.get_kafka_address()}) - self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(), - 'group.id' : 'analytics-frontend', - 'auto.offset.reset' : 'latest'}) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def StartAnalyzer(self, request : Analyzer, grpc_context: grpc.ServicerContext # type: ignore - ) -> AnalyzerId: # type: ignore + ) -> AnalyzerAlarms: # type: ignore LOGGER.info ("At Service gRPC message: {:}".format(request)) response = AnalyzerId() @@ -56,7 +46,6 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer): AnalyzerModel.ConvertAnalyzerToRow(request) ) self.PublishStartRequestOnKafka(request) - response.analyzer_id.uuid = request.analyzer_id.analyzer_id.uuid return response @@ -83,63 +72,6 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer): ) LOGGER.info("Analyzer Start Request Generated: Analyzer Id: {:}, Value: {:}".format(analyzer_uuid, analyzer_to_generate)) self.kafka_producer.flush() - - # self.StartResponseListener(analyzer_uuid) - - def StartResponseListener(self, filter_key=None): - """ - Start the Kafka response listener with APScheduler and return key-value pairs periodically. - """ - LOGGER.info("Starting StartResponseListener") - # Schedule the ResponseListener at fixed intervals - self.scheduler.add_job( - self.response_listener, - trigger=IntervalTrigger(seconds=5), - args=[filter_key], - id=f"response_listener_{self.listener_topic}", - replace_existing=True - ) - self.scheduler.start() - LOGGER.info(f"Started Kafka listener for topic {self.listener_topic}...") - try: - while True: - LOGGER.info("entering while...") - key, value = self.result_queue.get() # Wait until a result is available - LOGGER.info("In while true ...") - yield key, value # Yield the result to the calling function - except KeyboardInterrupt: - LOGGER.warning("Listener stopped manually.") - finally: - self.StopListener() - - def response_listener(self, filter_key=None): - """ - Poll Kafka messages and put key-value pairs into the queue. 
- """ - LOGGER.info(f"Polling Kafka topic {self.listener_topic}...") - - consumer = self.kafka_consumer - consumer.subscribe([self.listener_topic]) - msg = consumer.poll(2.0) - if msg is None: - return - elif msg.error(): - if msg.error().code() != KafkaError._PARTITION_EOF: - LOGGER.error(f"Kafka error: {msg.error()}") - return - - try: - key = msg.key().decode('utf-8') if msg.key() else None - if filter_key is not None and key == filter_key: - value = json.loads(msg.value().decode('utf-8')) - LOGGER.info(f"Received key: {key}, value: {value}") - self.result_queue.put((key, value)) - else: - LOGGER.info(f"Skipping message with unmatched key: {key}") - # value = json.loads(msg.value().decode('utf-8')) # Added for debugging - # self.result_queue.put((filter_key, value)) # Added for debugging - except Exception as e: - LOGGER.error(f"Error processing Kafka message: {e}") @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def StopAnalyzer(self, @@ -175,15 +107,6 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer): ) LOGGER.info("Analyzer Stop Request Generated: Analyzer Id: {:}".format(analyzer_uuid)) self.kafka_producer.flush() - self.StopListener() - - def StopListener(self): - """ - Gracefully stop the Kafka listener and the scheduler. - """ - LOGGER.info("Stopping Kafka listener...") - self.scheduler.shutdown() - LOGGER.info("Kafka listener stopped.") @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def SelectAnalyzers(self, @@ -203,12 +126,11 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer): LOGGER.info('Unable to process filter response {:}'.format(e)) except Exception as e: LOGGER.error('Unable to apply filter on table {:}. ERROR: {:}'.format(AnalyzerModel.__name__, e)) - def delivery_callback(self, err, msg): if err: LOGGER.debug('Message delivery failed: {:}'.format(err)) print ('Message delivery failed: {:}'.format(err)) - # else: - # LOGGER.debug('Message delivered to topic {:}'.format(msg.topic())) - # print('Message delivered to topic {:}'.format(msg.topic())) + else: + LOGGER.debug('Message delivered to topic {:}'.format(msg.topic())) + print('Message delivered to topic {:}'.format(msg.topic())) diff --git a/src/analytics/frontend/tests/test_frontend.py b/src/analytics/frontend/tests/test_frontend.py index d2428c01fb021f71a884d9a99c446bfef6e66559..9f5c040f366b02a6fea27e8e8696c0c118ece05a 100644 --- a/src/analytics/frontend/tests/test_frontend.py +++ b/src/analytics/frontend/tests/test_frontend.py @@ -25,7 +25,8 @@ from common.Settings import ( get_service_port_grpc, get_env_var_name, ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC ) from common.tools.kafka.Variables import KafkaTopic -from common.proto.analytics_frontend_pb2 import AnalyzerId, AnalyzerList +from common.proto.kpi_value_api_pb2 import KpiValue +from common.proto.analytics_frontend_pb2 import AnalyzerAlarms from analytics.frontend.client.AnalyticsFrontendClient import AnalyticsFrontendClient from analytics.frontend.service.AnalyticsFrontendService import AnalyticsFrontendService from analytics.frontend.tests.messages import ( create_analyzer_id, create_analyzer, @@ -89,12 +90,13 @@ def test_validate_kafka_topics(): response = KafkaTopic.create_all_topics() assert isinstance(response, bool) -# ----- core funtionality test ----- -# def test_StartAnalytics(analyticsFrontend_client): -# LOGGER.info(' >>> test_StartAnalytic START: <<< ') -# response = analyticsFrontend_client.StartAnalyzer(create_analyzer()) -# LOGGER.debug(str(response)) -# assert 
isinstance(response, AnalyzerId) +# # ----- core funtionality test ----- +def test_StartAnalytics(analyticsFrontend_client): + LOGGER.info(' >>> test_StartAnalytic START: <<< ') + stream = analyticsFrontend_client.StartAnalyzer(create_analyzer()) + for response in stream: + LOGGER.debug(str(response)) + assert isinstance(response, KpiValue) # To test start and stop listener together def test_StartStopAnalyzers(analyticsFrontend_client): @@ -131,4 +133,4 @@ def test_StartStopAnalyzers(analyticsFrontend_client): # class_obj = AnalyticsFrontendServiceServicerImpl() # for response in class_obj.StartResponseListener(analyzer_id.analyzer_id.uuid): # LOGGER.debug(response) -# assert isinstance(response, tuple) \ No newline at end of file +# assert isinstance(response, tuple) diff --git a/src/kpi_value_api/client/KpiValueApiClient.py b/src/kpi_value_api/client/KpiValueApiClient.py index f432271cfb7c8136f72156330b25d0b82b934d99..dfc5f07254a30db34a20ee8d0eae931cfd0ce571 100644 --- a/src/kpi_value_api/client/KpiValueApiClient.py +++ b/src/kpi_value_api/client/KpiValueApiClient.py @@ -15,17 +15,18 @@ import grpc, logging from common.Constants import ServiceNameEnum -from common.Settings import get_service_host, get_service_port_grpc -from common.tools.client.RetryDecorator import retry, delay_exponential -from common.tools.grpc.Tools import grpc_message_to_json_string +from common.Settings import get_service_host, get_service_port_grpc -from common.proto.context_pb2 import Empty -from common.proto.kpi_value_api_pb2 import KpiValueList, KpiValueFilter +from common.tools.client.RetryDecorator import retry, delay_exponential +from common.tools.grpc.Tools import grpc_message_to_json_string +from common.proto.context_pb2 import Empty +from common.proto.kpi_manager_pb2 import KpiId +from common.proto.kpi_value_api_pb2 import KpiValueList, KpiValueFilter, KpiAlarms from common.proto.kpi_value_api_pb2_grpc import KpiValueAPIServiceStub -LOGGER = logging.getLogger(__name__) -MAX_RETRIES = 10 -DELAY_FUNCTION = delay_exponential(initial=0.01, increment=2.0, maximum=5.0) +LOGGER = logging.getLogger(__name__) +MAX_RETRIES = 10 +DELAY_FUNCTION = delay_exponential(initial=0.01, increment=2.0, maximum=5.0) RETRY_DECORATOR = retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, prepare_method_name='connect') class KpiValueApiClient: @@ -34,8 +35,8 @@ class KpiValueApiClient: if not port: port = get_service_port_grpc(ServiceNameEnum.KPIVALUEAPI) self.endpoint = '{:s}:{:s}'.format(str(host), str(port)) LOGGER.debug('Creating channel to {:s}...'.format(str(self.endpoint))) - self.channel = None - self.stub = None + self.channel = None + self.stub = None self.connect() LOGGER.debug('Channel created') @@ -46,18 +47,25 @@ class KpiValueApiClient: def close(self): if self.channel is not None: self.channel.close() self.channel = None - self.stub = None + self.stub = None @RETRY_DECORATOR - def StoreKpiValues(self, request: KpiValueList) -> Empty: + def StoreKpiValues(self, request: KpiValueList) -> Empty: # type: ignore LOGGER.debug('StoreKpiValues: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.StoreKpiValues(request) LOGGER.debug('StoreKpiValues result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR - def SelectKpiValues(self, request: KpiValueFilter) -> KpiValueList: + def SelectKpiValues(self, request: KpiValueFilter) -> KpiValueList: # type: ignore LOGGER.debug('SelectKpiValues: {:s}'.format(grpc_message_to_json_string(request))) response = 
self.stub.SelectKpiValues(request) LOGGER.debug('SelectKpiValues result: {:s}'.format(grpc_message_to_json_string(response))) return response + + @RETRY_DECORATOR + def GetKpiAlarms(self, request: KpiId) -> KpiAlarms: # type: ignore + LOGGER.debug('GetKpiAlarms: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.GetKpiAlarms(request) + LOGGER.debug('GetKpiAlarms result: {:s}'.format(grpc_message_to_json_string(response))) + return response diff --git a/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py b/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py index 4ea978fafc8d7454d41f64182d553d030215113a..0f57f88219a74108a555cf87e9bdb98999fd5da2 100644 --- a/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py +++ b/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py @@ -12,18 +12,22 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging, grpc, json +from datetime import datetime +import logging, grpc, json, queue from typing import Dict from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method from common.tools.kafka.Variables import KafkaConfig, KafkaTopic +from confluent_kafka import KafkaError from common.proto.context_pb2 import Empty from common.proto.kpi_sample_types_pb2 import KpiSampleType from common.proto.kpi_manager_pb2 import KpiDescriptor, KpiId from common.proto.kpi_value_api_pb2_grpc import KpiValueAPIServiceServicer -from common.proto.kpi_value_api_pb2 import KpiValueList, KpiValueFilter, KpiValue, KpiValueType - +from common.proto.kpi_value_api_pb2 import KpiAlarms, KpiValueList, KpiValueFilter, KpiValue, KpiValueType +from apscheduler.schedulers.background import BackgroundScheduler +from apscheduler.triggers.interval import IntervalTrigger from confluent_kafka import Producer as KafkaProducer +from confluent_kafka import Consumer as KafkaConsumer from prometheus_api_client import PrometheusConnect from prometheus_api_client.utils import parse_datetime @@ -37,8 +41,14 @@ PROM_URL = "http://prometheus-k8s.monitoring.svc.cluster.local:9090" # TO class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer): def __init__(self): LOGGER.debug('Init KpiValueApiService') + self.listener_topic = KafkaTopic.ALARMS.value + self.result_queue = queue.Queue() + self.scheduler = BackgroundScheduler() self.kafka_producer = KafkaProducer({'bootstrap.servers' : KafkaConfig.get_kafka_address()}) - + self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(), + 'group.id' : 'analytics-frontend', + 'auto.offset.reset' : 'latest'}) + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def StoreKpiValues(self, request: KpiValueList, grpc_context: grpc.ServicerContext ) -> Empty: @@ -109,17 +119,14 @@ class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer): kpi_value = KpiValue() kpi_value.kpi_id.kpi_id = record['metric']['__name__'], kpi_value.timestamp = value[0], - kpi_value.kpi_value_type = self.ConverValueToKpiValueType(value[1]) + kpi_value.kpi_value_type.CopyFrom(self.ConverValueToKpiValueType(value['kpi_value'])) response.kpi_value_list.append(kpi_value) return response def GetKpiSampleType(self, kpi_value: str, kpi_manager_client): - print("--- START -----") - kpi_id = KpiId() kpi_id.kpi_id.uuid = kpi_value.kpi_id.kpi_id.uuid # print("KpiId generated: {:}".format(kpi_id)) - try: kpi_descriptor_object = KpiDescriptor() kpi_descriptor_object = kpi_manager_client.GetKpiDescriptor(kpi_id) @@ -135,26 
+142,91 @@ class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer): LOGGER.info("Unable to get KpiDescriptor. Error: {:}".format(e)) print ("Unable to get KpiDescriptor. Error: {:}".format(e)) - def ConverValueToKpiValueType(self, value): - # Check if the value is an integer (int64) + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) + def GetKpiAlarms(self, request: KpiId, grpc_context: grpc.ServicerContext) -> KpiAlarms: # type: ignore + """ + Get Alarms from Kafka return Alrams periodically. + """ + LOGGER.debug('GetKpiAlarms: {:}'.format(request)) + response = KpiAlarms() + + for alarm_key, value in self.StartResponseListener(request.kpi_id.uuid): + response.start_timestamp.timestamp = datetime.strptime( + value["window"]["start"], "%Y-%m-%dT%H:%M:%S.%fZ").timestamp() + response.end_timestamp.timestamp = datetime.strptime( + value["window"]["end"], "%Y-%m-%dT%H:%M:%S.%fZ").timestamp() + response.kpi_id.kpi_id.uuid = value['kpi_id'] + for key, threshold in value.items(): + if "THRESHOLD_" in key: + response.alarms[key] = threshold + + yield response + + def StartResponseListener(self, filter_key=None): + """ + Start the Kafka response listener with APScheduler and return key-value pairs periodically. + """ + LOGGER.info("Starting StartResponseListener") + # Schedule the ResponseListener at fixed intervals + self.scheduler.add_job( + self.response_listener, + trigger=IntervalTrigger(seconds=5), + args=[filter_key], + id=f"response_listener_{self.listener_topic}", + replace_existing=True + ) + self.scheduler.start() + LOGGER.info(f"Started Kafka listener for topic {self.listener_topic}...") try: - int_value = int(value) - return KpiValueType(int64Val=int_value) - except (ValueError, TypeError): - pass - # Check if the value is a float + while True: + LOGGER.info("entering while...") + key, value = self.result_queue.get() # Wait until a result is available + LOGGER.info("In while true ...") + yield key, value # Yield the result to the calling function + except KeyboardInterrupt: + LOGGER.warning("Listener stopped manually.") + finally: + self.StopListener() + + def response_listener(self, filter_key=None): + """ + Poll Kafka messages and put key-value pairs into the queue. 
+ """ + LOGGER.info(f"Polling Kafka topic {self.listener_topic}...") + + consumer = self.kafka_consumer + consumer.subscribe([self.listener_topic]) + msg = consumer.poll(2.0) + if msg is None: + return + elif msg.error(): + if msg.error().code() != KafkaError._PARTITION_EOF: + LOGGER.error(f"Kafka error: {msg.error()}") + return try: - float_value = float(value) - return KpiValueType(floatVal=float_value) - except (ValueError, TypeError): - pass - # Check if the value is a boolean - if value.lower() in ['true', 'false']: - bool_value = value.lower() == 'true' - return KpiValueType(boolVal=bool_value) - # If none of the above, treat it as a string - return KpiValueType(stringVal=value) + key = msg.key().decode('utf-8') if msg.key() else None + if filter_key is not None and key == filter_key: + value = json.loads(msg.value().decode('utf-8')) + LOGGER.info(f"Received key: {key}, value: {value}") + self.result_queue.put((key, value)) + else: + LOGGER.warning(f"Skipping message with unmatched key: {key} - {filter_key}") + except Exception as e: + LOGGER.error(f"Error processing Kafka message: {e}") def delivery_callback(self, err, msg): if err: LOGGER.debug('Message delivery failed: {:}'.format(err)) else: LOGGER.debug('Message delivered to topic {:}'.format(msg.topic())) + + def ConverValueToKpiValueType(self, value): + kpi_value_type = KpiValueType() + if isinstance(value, int): + kpi_value_type.int32Val = value + elif isinstance(value, float): + kpi_value_type.floatVal = value + elif isinstance(value, str): + kpi_value_type.stringVal = value + elif isinstance(value, bool): + kpi_value_type.boolVal = value + # Add other checks for different types as needed + return kpi_value_type diff --git a/src/kpi_value_api/tests/messages.py b/src/kpi_value_api/tests/messages.py index d8ad14bd44eebc1e9412cfd5ff2973e6018c95e9..50240c0154deff33dfdbb797cd5e0fca9a05c8ab 100644 --- a/src/kpi_value_api/tests/messages.py +++ b/src/kpi_value_api/tests/messages.py @@ -13,9 +13,16 @@ # limitations under the License. import uuid, time +from common.proto import kpi_manager_pb2 from common.proto.kpi_value_api_pb2 import KpiValue, KpiValueList +def create_kpi_id_request(): + _create_kpi_id = kpi_manager_pb2.KpiId() + _create_kpi_id.kpi_id.uuid = "6e22f180-ba28-4641-b190-2287bf448888" + # _create_kpi_id.kpi_id.uuid = str(uuid.uuid4()) + return _create_kpi_id + def create_kpi_value_list(): _create_kpi_value_list = KpiValueList() # To run this experiment sucessfully, add an existing UUID of a KPI Descriptor from the KPI DB. 
diff --git a/src/kpi_value_api/tests/test_kpi_value_api.py b/src/kpi_value_api/tests/test_kpi_value_api.py index 307b5cdad4e6503a774e308f669fc44762f84bf1..ea6b22585a68d565c8162f1629a6a1ac4a4f6d6a 100644 --- a/src/kpi_value_api/tests/test_kpi_value_api.py +++ b/src/kpi_value_api/tests/test_kpi_value_api.py @@ -21,8 +21,8 @@ from common.Settings import ( ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name, get_service_port_grpc) from kpi_value_api.service.KpiValueApiService import KpiValueApiService from kpi_value_api.client.KpiValueApiClient import KpiValueApiClient -from kpi_value_api.tests.messages import create_kpi_value_list - +from kpi_value_api.tests.messages import create_kpi_value_list, create_kpi_id_request +from common.proto.kpi_value_api_pb2 import KpiAlarms LOCAL_HOST = '127.0.0.1' KPIVALUEAPI_SERVICE_PORT = get_service_port_grpc(ServiceNameEnum.KPIVALUEAPI) # type: ignore diff --git a/src/telemetry/backend/service/TelemetryBackendService.py b/src/telemetry/backend/service/TelemetryBackendService.py index 6ab841238f446a2895cd163fab4b7eb05eaa3176..078fa5896d5fb5033833e0e2ef2248613ef80c18 100755 --- a/src/telemetry/backend/service/TelemetryBackendService.py +++ b/src/telemetry/backend/service/TelemetryBackendService.py @@ -83,7 +83,7 @@ class TelemetryBackendService(GenericGrpcService): thread.join() print ("Terminating backend (by StopCollector): Collector Id: ", collector_id) del self.running_threads[collector_id] - self.GenerateCollectorResponse(collector_id, "-1", -1) # Termination confirmation to frontend. + self.GenerateCollectorTerminationSignal(collector_id, "-1", -1) # Termination confirmation to frontend. else: print ('Backend collector {:} not found'.format(collector_id)) @@ -103,11 +103,28 @@ class TelemetryBackendService(GenericGrpcService): while not stop_event.is_set(): if time.time() - start_time >= collector['duration']: # condition to terminate backend print("Execuation duration completed: Terminating backend: Collector Id: ", collector_id, " - ", time.time() - start_time) - self.GenerateCollectorResponse(collector_id, "-1", -1) # Termination confirmation to frontend. + self.GenerateCollectorTerminationSignal(collector_id, "-1", -1) # Termination confirmation to frontend. break self.ExtractKpiValue(collector_id, collector['kpi_id']) time.sleep(collector['interval']) + def GenerateCollectorTerminationSignal(self, collector_id: str, kpi_id: str, measured_kpi_value: Any): + """ + Method to write kpi Termination signat on RESPONSE Kafka topic + """ + producer = self.kafka_producer + kpi_value : Dict = { + "kpi_id" : kpi_id, + "kpi_value" : measured_kpi_value, + } + producer.produce( + KafkaTopic.RESPONSE.value, # TODO: to the topic ... + key = collector_id, + value = json.dumps(kpi_value), + callback = self.delivery_callback + ) + producer.flush() + def ExtractKpiValue(self, collector_id: str, kpi_id: str): """ Method to extract kpi value. @@ -123,117 +140,27 @@ class TelemetryBackendService(GenericGrpcService): """ producer = self.kafka_producer kpi_value : Dict = { + "time_stamp": str(time.time()), "kpi_id" : kpi_id, "kpi_value" : measured_kpi_value } producer.produce( - KafkaTopic.RESPONSE.value, + KafkaTopic.VALUE.value, # TODO: to the topic ... 
key = collector_id, value = json.dumps(kpi_value), callback = self.delivery_callback ) producer.flush() - def GenerateRawMetric(self, metrics: Any): - """ - Method writes raw metrics on VALUE Kafka topic - """ - producer = self.kafka_producer - some_metric : Dict = { - "some_id" : metrics - } - producer.produce( - KafkaTopic.VALUE.value, - key = 'raw', - value = json.dumps(some_metric), - callback = self.delivery_callback - ) - producer.flush() - def delivery_callback(self, err, msg): """ Callback function to handle message delivery status. Args: err (KafkaError): Kafka error object. msg (Message): Kafka message object. """ - if err: print(f'Message delivery failed: {err}') - # else: print(f'Message delivered to topic {msg.topic()}') - -# # ----------- BELOW: Actual Implementation of Kafka Producer with Node Exporter ----------- -# @staticmethod -# def fetch_single_node_exporter_metric(): -# """ -# Method to fetch metrics from Node Exporter. -# Returns: -# str: Metrics fetched from Node Exporter. -# """ -# KPI = "node_network_receive_packets_total" -# try: -# response = requests.get(EXPORTER_ENDPOINT) # type: ignore -# LOGGER.info("Request status {:}".format(response)) -# if response.status_code == 200: -# # print(f"Metrics fetched sucessfully...") -# metrics = response.text -# # Check if the desired metric is available in the response -# if KPI in metrics: -# KPI_VALUE = TelemetryBackendService.extract_metric_value(metrics, KPI) -# # Extract the metric value -# if KPI_VALUE is not None: -# LOGGER.info("Extracted value of {:} is {:}".format(KPI, KPI_VALUE)) -# print(f"Extracted value of {KPI} is: {KPI_VALUE}") -# return KPI_VALUE -# else: -# LOGGER.info("Failed to fetch metrics. Status code: {:}".format(response.status_code)) -# # print(f"Failed to fetch metrics. Status code: {response.status_code}") -# return None -# except Exception as e: -# LOGGER.info("Failed to fetch metrics. Status code: {:}".format(e)) -# # print(f"Failed to fetch metrics: {str(e)}") -# return None - -# @staticmethod -# def extract_metric_value(metrics, metric_name): -# """ -# Method to extract the value of a metric from the metrics string. -# Args: -# metrics (str): Metrics string fetched from Exporter. -# metric_name (str): Name of the metric to extract. -# Returns: -# float: Value of the extracted metric, or None if not found. -# """ -# try: -# # Find the metric line containing the desired metric name -# metric_line = next(line for line in metrics.split('\n') if line.startswith(metric_name)) -# # Split the line to extract the metric value -# metric_value = float(metric_line.split()[1]) -# return metric_value -# except StopIteration: -# print(f"Metric '{metric_name}' not found in the metrics.") -# return None - -# @staticmethod -# def stream_node_export_metrics_to_raw_topic(): -# try: -# while True: -# response = requests.get(EXPORTER_ENDPOINT) -# # print("Response Status {:} ".format(response)) -# # LOGGER.info("Response Status {:} ".format(response)) -# try: -# if response.status_code == 200: -# producerObj = KafkaProducer(PRODUCER_CONFIG) -# producerObj.produce(KAFKA_TOPICS['raw'], key="raw", value= str(response.text), callback=TelemetryBackendService.delivery_callback) -# producerObj.flush() -# LOGGER.info("Produce to topic") -# else: -# LOGGER.info("Didn't received expected response. Status code: {:}".format(response.status_code)) -# print(f"Didn't received expected response. 
Status code: {response.status_code}") -# return None -# time.sleep(15) -# except Exception as e: -# LOGGER.info("Failed to process response. Status code: {:}".format(e)) -# return None -# except Exception as e: -# LOGGER.info("Failed to fetch metrics. Status code: {:}".format(e)) -# print(f"Failed to fetch metrics: {str(e)}") -# return None -# # ----------- ABOVE: Actual Implementation of Kafka Producer with Node Exporter ----------- \ No newline at end of file + if err: + LOGGER.debug('Message delivery failed: {:}'.format(err)) + print(f'Message delivery failed: {err}') + else: + LOGGER.info('Message delivered to topic {:}'.format(msg.topic())) + print(f'Message delivered to topic {msg.topic()}') diff --git a/src/telemetry/backend/tests/test_TelemetryBackend.py b/src/telemetry/backend/tests/test_TelemetryBackend.py index a2bbee540c3ce348ef52eceb0e776f48a68d94b1..665fa825e3ee31b2e92351d9c5855f627ce40fa1 100644 --- a/src/telemetry/backend/tests/test_TelemetryBackend.py +++ b/src/telemetry/backend/tests/test_TelemetryBackend.py @@ -13,6 +13,7 @@ # limitations under the License. import logging +import threading from common.tools.kafka.Variables import KafkaTopic from telemetry.backend.service.TelemetryBackendService import TelemetryBackendService @@ -25,14 +26,13 @@ LOGGER = logging.getLogger(__name__) ########################### # --- "test_validate_kafka_topics" should be run before the functionality tests --- -def test_validate_kafka_topics(): - LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ") - response = KafkaTopic.create_all_topics() - assert isinstance(response, bool) +# def test_validate_kafka_topics(): +# LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ") +# response = KafkaTopic.create_all_topics() +# assert isinstance(response, bool) def test_RunRequestListener(): LOGGER.info('test_RunRequestListener') TelemetryBackendServiceObj = TelemetryBackendService() - response = TelemetryBackendServiceObj.RunRequestListener() + response = threading.Thread(target=TelemetryBackendServiceObj.RequestListener).start() LOGGER.debug(str(response)) - assert isinstance(response, bool) diff --git a/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py b/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py index b73d9fa952ee42aeb7adb8f3c0b2e4a3ba7f3e09..746790bf68fd3d843850fc96db526dcefad59283 100644 --- a/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py +++ b/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py @@ -161,9 +161,8 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer): # ---------- Independent Method --------------- # Listener method is independent of any method (same lifetime as service) # continously listens for responses - def RunResponseListener(self): + def install_servicers(self): threading.Thread(target=self.ResponseListener).start() - return True def ResponseListener(self): """
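
Note: taken together, these changes route alarms as follows: the Spark streamer consumes KPI samples from the VALUE topic, aggregates them per window, applies the configured thresholds, and writes one JSON record per window to the ALARMS topic keyed by kpi_id; GetKpiAlarms then polls that topic through the APScheduler-driven response_listener and re-emits each record whose key matches the requested KPI as a KpiAlarms message. A sketch of the record shape GetKpiAlarms expects, written as a Python dict with illustrative values (the aggregate/threshold key names depend on the analyzer's 'thresholds' parameter; 'first_value' is taken from the test analyzer above):

    # Illustrative payload; field names follow the parsing in GetKpiAlarms,
    # while the values and the exact set of *THRESHOLD_* keys are assumptions.
    alarm_record = {
        "window" : {"start": "2024-09-03T12:00:00.000Z",    # parsed with "%Y-%m-%dT%H:%M:%S.%fZ"
                    "end"  : "2024-09-03T12:01:00.000Z"},
        "kpi_id" : "6e22f180-ba28-4641-b190-2287bf448888",
        "first_value"                 : 12.3,
        "first_value_THRESHOLD_FALL"  : False,
        "first_value_THRESHOLD_RAISE" : True,
    }

Every key containing "THRESHOLD_" is copied into the alarms map of the response, and the window bounds are converted to epoch timestamps.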