diff --git a/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py b/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py index b981c038bd95318f81a0536743e7a7fd2a176fae..071890105411f18960c39ecd426af3c8e96f4f37 100644 --- a/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py +++ b/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py @@ -13,22 +13,31 @@ # limitations under the License. -import logging, grpc +import logging, grpc, json +from typing import Dict +from confluent_kafka import Consumer as KafkaConsumer +from confluent_kafka import Producer as KafkaProducer +from confluent_kafka import KafkaError + +from common.tools.kafka.Variables import KafkaConfig, KafkaTopic from common.proto.context_pb2 import Empty from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method - from common.proto.analytics_frontend_pb2 import Analyzer, AnalyzerId, AnalyzerFilter, AnalyzerList from common.proto.analytics_frontend_pb2_grpc import AnalyticsFrontendServiceServicer -LOGGER = logging.getLogger(__name__) -METRICS_POOL = MetricsPool('AnalyticsFrontend', 'NBIgRPC') - +LOGGER = logging.getLogger(__name__) +METRICS_POOL = MetricsPool('AnalyticsFrontend', 'NBIgRPC') +ACTIVE_ANALYZERS = [] # In case of sevice restarts, the list can be populated from the DB. class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer): def __init__(self): LOGGER.info('Init AnalyticsFrontendService') + self.kafka_producer = KafkaProducer({'bootstrap.servers' : KafkaConfig.get_kafka_address()}) + self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(), + 'group.id' : 'analytics-frontend', + 'auto.offset.reset' : 'latest'}) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) @@ -37,17 +46,64 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer): ) -> AnalyzerId: # type: ignore LOGGER.info ("At Service gRPC message: {:}".format(request)) response = AnalyzerId() - + self.PublishStartRequestOnKafka(request) return response - + + def PublishStartRequestOnKafka(self, analyzer_obj): + """ + Method to generate analyzer request on Kafka. + """ + analyzer_uuid = analyzer_obj.analyzer_id.analyzer_id.uuid + analyzer_to_generate : Dict = { + "algo_name" : analyzer_obj.algorithm_name, + "input_kpis" : [k.kpi_id.uuid for k in analyzer_obj.input_kpi_ids], + "output_kpis" : [k.kpi_id.uuid for k in analyzer_obj.output_kpi_ids], + "oper_mode" : analyzer_obj.operation_mode + } + self.kafka_producer.produce( + KafkaTopic.VALUE.value, + key = analyzer_uuid, + value = json.dumps(analyzer_to_generate), + callback = self.delivery_callback + ) + LOGGER.info("Analyzer Start Request Generated: Analyzer Id: {:}, Value: {:}".format(analyzer_uuid, analyzer_to_generate)) + ACTIVE_ANALYZERS.append(analyzer_uuid) + self.kafka_producer.flush() + + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def StopAnalyzer(self, request : AnalyzerId, grpc_context: grpc.ServicerContext # type: ignore ) -> Empty: # type: ignore LOGGER.info ("At Service gRPC message: {:}".format(request)) - + self.PublishStopRequestOnKafka(request) return Empty() + def PublishStopRequestOnKafka(self, analyzer_id): + """ + Method to generate stop analyzer request on Kafka. + """ + analyzer_uuid = analyzer_id.analyzer_id.uuid + analyzer_to_stop : Dict = { + "algo_name" : -1, + "input_kpis" : [], + "output_kpis" : [], + "oper_mode" : -1 + } + self.kafka_producer.produce( + KafkaTopic.VALUE.value, + key = analyzer_uuid, + value = json.dumps(analyzer_to_stop), + callback = self.delivery_callback + ) + LOGGER.info("Analyzer Stop Request Generated: Analyzer Id: {:}".format(analyzer_uuid)) + self.kafka_producer.flush() + try: + ACTIVE_ANALYZERS.remove(analyzer_uuid) + except ValueError: + LOGGER.warning('Analyzer ID {:} not found in active analyzers'.format(analyzer_uuid)) + + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def SelectAnalyzers(self, request : AnalyzerFilter, contextgrpc_context: grpc.ServicerContext # type: ignore @@ -56,3 +112,11 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer): response = AnalyzerList() return response + + def delivery_callback(self, err, msg): + if err: + LOGGER.debug('Message delivery failed: {:}'.format(err)) + print('Message delivery failed: {:}'.format(err)) + # else: + # LOGGER.debug('Message delivered to topic {:}'.format(msg.topic())) + # print('Message delivered to topic {:}'.format(msg.topic())) diff --git a/src/analytics/frontend/tests/messages.py b/src/analytics/frontend/tests/messages.py index 1aaf8dd47503f74864b70fc58687035518d37788..04653857de39e42c996a2fa63783b3f8db97eacf 100644 --- a/src/analytics/frontend/tests/messages.py +++ b/src/analytics/frontend/tests/messages.py @@ -28,9 +28,15 @@ def create_analyzer(): _create_analyzer.algorithm_name = "some_algo_name" _kpi_id = KpiId() - _kpi_id.kpi_id.uuid = str(uuid.uuid4()) # input IDs to analyze + # input IDs to analyze + _kpi_id.kpi_id.uuid = str(uuid.uuid4()) _create_analyzer.input_kpi_ids.append(_kpi_id) - _kpi_id.kpi_id.uuid = str(uuid.uuid4()) # output IDs after analysis + _kpi_id.kpi_id.uuid = str(uuid.uuid4()) + _create_analyzer.input_kpi_ids.append(_kpi_id) + # output IDs after analysis + _kpi_id.kpi_id.uuid = str(uuid.uuid4()) + _create_analyzer.output_kpi_ids.append(_kpi_id) + _kpi_id.kpi_id.uuid = str(uuid.uuid4()) _create_analyzer.output_kpi_ids.append(_kpi_id) _create_analyzer.operation_mode = AnalyzerOperationMode.ANALYZEROPERATIONMODE_STREAMING @@ -49,10 +55,16 @@ def create_analyzer_filter(): _input_kpi_id_obj = KpiId() _input_kpi_id_obj.kpi_id.uuid = str(uuid.uuid4()) _create_analyzer_filter.input_kpi_ids.append(_input_kpi_id_obj) + # another input kpi Id + # _input_kpi_id_obj.kpi_id.uuid = str(uuid.uuid4()) + # _create_analyzer_filter.input_kpi_ids.append(_input_kpi_id_obj) _output_kpi_id_obj = KpiId() _output_kpi_id_obj.kpi_id.uuid = str(uuid.uuid4()) _create_analyzer_filter.output_kpi_ids.append(_output_kpi_id_obj) + # another output kpi Id + # _output_kpi_id_obj.kpi_id.uuid = str(uuid.uuid4()) + # _create_analyzer_filter.input_kpi_ids.append(_output_kpi_id_obj) return _create_analyzer_filter