Commit 798c1f47 authored by Waleed Akbar

Changes to Analytics Frontend for Receiving Stream Response:

- Added the `AnalyzerAlarms` message type to the proto file.
- Changed the return type of `StartAnalyzer` to a stream of `AnalyzerAlarms`.
parent 821e14f8
2 merge requests: !294 "Release TeraFlowSDN 4.0", !266 "Resolve: Unable to correctly extract the aggregation function names from the dictionary received as parameters in the Analyzer message"
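With this change, `StartAnalyzer` becomes a server-streaming RPC that yields alarm batches instead of raw KPI values. A minimal client-side sketch of consuming the stream (illustrative, not part of this commit; the endpoint is a placeholder and the `create_analyzer()` helper from the test messages is assumed):

import grpc
from common.proto.analytics_frontend_pb2_grpc import AnalyticsFrontendServiceStub
from analytics.frontend.tests.messages import create_analyzer

def consume_alarms(endpoint: str = 'localhost:30050') -> None:  # placeholder address
    # StartAnalyzer now returns an iterator of AnalyzerAlarms messages;
    # iterate until the server closes the stream.
    with grpc.insecure_channel(endpoint) as channel:
        stub = AnalyticsFrontendServiceStub(channel)
        for alarms in stub.StartAnalyzer(create_analyzer()):
            print(alarms.kpi_id.kpi_id.uuid, dict(alarms.alarms))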
@@ -17,11 +17,10 @@ package analytics_frontend;
 import "context.proto";
 import "kpi_manager.proto";
-import "kpi_value_api.proto";
 //import "kpi_sample_types.proto";

 service AnalyticsFrontendService {
-  rpc StartAnalyzer  (Analyzer      ) returns (stream kpi_value_api.KpiValue) {}
+  rpc StartAnalyzer  (Analyzer      ) returns (stream AnalyzerAlarms) {}
   rpc StopAnalyzer   (AnalyzerId    ) returns (context.Empty) {}
   rpc SelectAnalyzers(AnalyzerFilter) returns (AnalyzerList ) {}
 }
@@ -52,6 +51,13 @@ message Analyzer {
   uint64 batch_max_size = 11; // ..., max number of samples collected to execute the batch
 }

+message AnalyzerAlarms {
+  context.Timestamp start_timestamp = 1;
+  context.Timestamp end_timestamp   = 2;
+  kpi_manager.KpiId kpi_id          = 3;
+  map<string, bool> alarms          = 4;
+}
+
 message AnalyzerFilter {
   // Analyzer that fulfill the filter are those that match ALL the following fields.
   // An empty list means: any value is accepted.
...
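For reference, a sketch of populating the new message from Python (field values are made up; the `context.Timestamp` double field and the `kpi_id` nesting match what the servicer below assigns):

from common.proto.analytics_frontend_pb2 import AnalyzerAlarms

alarms = AnalyzerAlarms()
alarms.start_timestamp.timestamp = 1724316000.0   # epoch seconds (illustrative)
alarms.end_timestamp.timestamp   = 1724316030.0
alarms.kpi_id.kpi_id.uuid        = '1e22f180-ba28-4641-b190-2287bf446666'  # made-up UUID
alarms.alarms['THRESHOLD_RAISE'] = True           # hypothetical key; only the map<string, bool> type is fixed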
@@ -21,11 +21,10 @@
 from confluent_kafka import Consumer as KafkaConsumer
 from confluent_kafka import Producer as KafkaProducer
 from confluent_kafka import KafkaError

-from common.proto.kpi_value_api_pb2 import KpiValueType, KpiValue
 from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
 from common.proto.context_pb2 import Empty
 from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
-from common.proto.analytics_frontend_pb2 import Analyzer, AnalyzerId, AnalyzerFilter, AnalyzerList
+from common.proto.analytics_frontend_pb2 import Analyzer, AnalyzerId, AnalyzerFilter, AnalyzerList, AnalyzerAlarms
 from common.proto.analytics_frontend_pb2_grpc import AnalyticsFrontendServiceServicer
 from analytics.database.Analyzer_DB import AnalyzerDB
 from analytics.database.AnalyzerModel import Analyzer as AnalyzerModel
...
@@ -38,7 +37,7 @@ METRICS_POOL = MetricsPool('AnalyticsFrontend', 'NBIgRPC')
 class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer):
     def __init__(self):
         LOGGER.info('Init AnalyticsFrontendService')
-        self.listener_topic = KafkaTopic.ANALYTICS_RESPONSE.value
+        self.listener_topic = KafkaTopic.ALARMS.value
         self.db_obj         = AnalyzerDB()
         self.result_queue   = queue.Queue()
         self.scheduler      = BackgroundScheduler()
@@ -50,21 +49,26 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer):
     @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
     def StartAnalyzer(self,
                       request : Analyzer, grpc_context: grpc.ServicerContext # type: ignore
-                     ) -> KpiValue: # type: ignore
+                     ) -> AnalyzerAlarms: # type: ignore
         LOGGER.info("At Service gRPC message: {:}".format(request))
-        response = KpiValue()
+        response = AnalyzerAlarms()
         self.db_obj.add_row_to_db(
             AnalyzerModel.ConvertAnalyzerToRow(request)
         )
         self.PublishStartRequestOnKafka(request)
-        for key, value in self.StartResponseListener(request.analyzer_id.analyzer_id.uuid):
-            # LOGGER.debug("Response from ---> {:}, {:}".format(key, value))
+        for alarm_key, value in self.StartResponseListener(request.analyzer_id.analyzer_id.uuid):
+            response.start_timestamp.timestamp = datetime.strptime(
+                value["window"]["start"], "%Y-%m-%dT%H:%M:%S.%fZ").timestamp()
+            response.end_timestamp.timestamp = datetime.strptime(
+                value["window"]["end"], "%Y-%m-%dT%H:%M:%S.%fZ").timestamp()
             response.kpi_id.kpi_id.uuid = value['kpi_id']
-            response.timestamp.timestamp = datetime.strptime(value['time_stamp'], "%Y-%m-%dT%H:%M:%SZ").timestamp()
-            response.kpi_value_type.CopyFrom(self.ConverValueToKpiValueType(value['kpi_value']))
+            for key, threshold in value.items():
+                if "THRESHOLD_" in key:
+                    LOGGER.debug("-- {:} - {:}".format(key, threshold))
+                    response.alarms[key] = threshold
             yield response
-        # response.analyzer_id.uuid = request.analyzer_id.analyzer_id.uuid
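The parsing above implies the shape of the JSON value that the response listener yields from the alarms topic. A sketch of one such payload (only the "window", "kpi_id" and "THRESHOLD_"-prefixed keys are implied by the code; the values are illustrative):

example_value = {
    "window": {
        "start": "2024-08-22T10:00:00.000Z",  # parsed with "%Y-%m-%dT%H:%M:%S.%fZ"
        "end"  : "2024-08-22T10:00:30.000Z",
    },
    "kpi_id": "1e22f180-ba28-4641-b190-2287bf446666",
    "THRESHOLD_RAISE": True,   # every key containing "THRESHOLD_" becomes an entry in response.alarms
    "THRESHOLD_FALL" : False,
}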
     def PublishStartRequestOnKafka(self, analyzer_obj):
         """
...
@@ -89,8 +93,6 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer):
         )
         LOGGER.info("Analyzer Start Request Generated: Analyzer Id: {:}, Value: {:}".format(analyzer_uuid, analyzer_to_generate))
         self.kafka_producer.flush()
-        # self.StartResponseListener(analyzer_uuid)

     def StartResponseListener(self, filter_key=None):
         """
...
@@ -141,9 +143,7 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer):
                     LOGGER.info(f"Received key: {key}, value: {value}")
                     self.result_queue.put((key, value))
                 else:
-                    LOGGER.info(f"Skipping message with unmatched key: {key}")
-                    # value = json.loads(msg.value().decode('utf-8')) # Added for debugging
-                    # self.result_queue.put((filter_key, value))      # Added for debugging
+                    LOGGER.warning(f"Skipping message with unmatched key: {key} - {filter_key}")
         except Exception as e:
             LOGGER.error(f"Error processing Kafka message: {e}")
@@ -210,19 +210,6 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer):
         except Exception as e:
             LOGGER.error('Unable to apply filter on table {:}. ERROR: {:}'.format(AnalyzerModel.__name__, e))

-    def ConverValueToKpiValueType(self, value):
-        kpi_value_type = KpiValueType()
-        if isinstance(value, int):     # Check for integer type
-            kpi_value_type.int32Val  = value # Assuming int32Val for example
-        elif isinstance(value, float): # Check for float type
-            kpi_value_type.floatVal  = value
-        elif isinstance(value, str):   # Check for string type
-            kpi_value_type.stringVal = value
-        elif isinstance(value, bool):  # Check for boolean type
-            kpi_value_type.boolVal   = value
-        # Add other checks for different types as needed
-        return kpi_value_type

     def delivery_callback(self, err, msg):
         if err:
             LOGGER.debug('Message delivery failed: {:}'.format(err))
...
@@ -26,6 +26,7 @@ from common.Settings import ( get_service_port_grpc, get_env_var_name,
 from common.tools.kafka.Variables import KafkaTopic
 from common.proto.kpi_value_api_pb2 import KpiValue
+from common.proto.analytics_frontend_pb2 import AnalyzerAlarms
 from analytics.frontend.client.AnalyticsFrontendClient import AnalyticsFrontendClient
 from analytics.frontend.service.AnalyticsFrontendService import AnalyticsFrontendService
 from analytics.frontend.tests.messages import ( create_analyzer_id, create_analyzer,
...
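A minimal test-style sketch of exercising the new stream (assuming a client fixture along the lines of the ones used elsewhere in these tests; the fixture name is hypothetical):

def test_StartAnalyzer_streams_alarms(analyticsFrontend_client):
    # Each item streamed back should now be an AnalyzerAlarms message
    # carrying a window, a KPI id, and a map of threshold flags.
    for response in analyticsFrontend_client.StartAnalyzer(create_analyzer()):
        assert isinstance(response, AnalyzerAlarms)
        assert all('THRESHOLD_' in key for key in response.alarms)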