Skip to content
Snippets Groups Projects
Commit c0059504 authored by Waleed Akbar's avatar Waleed Akbar
Browse files

Changes to Analytics Frontend for Managing Stream Response:

- Added `KpiValue` message type in `AnalyticsFrontend.proto`.
- Changed `StartAnalyzer` response type from message to stream.
- Added `StartResponseListener()` call and response handler in the `StartAnalyzer` method.
- Implemented `ConvertValueToKpiValueType` to handle `OneOf` type conversion.
- Added tests to handle stream responses.
parent b0108a9e
No related branches found
No related tags found
2 merge requests!294Release TeraFlowSDN 4.0,!266Resolve: "Unable to correctly extract the aggregation function names from the dictionary received as parameters in the Analyzer message"
...@@ -17,10 +17,11 @@ package analytics_frontend; ...@@ -17,10 +17,11 @@ package analytics_frontend;
import "context.proto"; import "context.proto";
import "kpi_manager.proto"; import "kpi_manager.proto";
import "kpi_value_api.proto";
//import "kpi_sample_types.proto"; //import "kpi_sample_types.proto";
service AnalyticsFrontendService { service AnalyticsFrontendService {
rpc StartAnalyzer (Analyzer ) returns (AnalyzerId ) {} rpc StartAnalyzer (Analyzer ) returns (stream kpi_value_api.KpiValue) {}
rpc StopAnalyzer (AnalyzerId ) returns (context.Empty) {} rpc StopAnalyzer (AnalyzerId ) returns (context.Empty) {}
rpc SelectAnalyzers(AnalyzerFilter) returns (AnalyzerList ) {} rpc SelectAnalyzers(AnalyzerFilter) returns (AnalyzerList ) {}
} }
......
...@@ -15,11 +15,13 @@ ...@@ -15,11 +15,13 @@
import logging, grpc, json, queue import logging, grpc, json, queue
from datetime import datetime
from typing import Dict from typing import Dict
from confluent_kafka import Consumer as KafkaConsumer from confluent_kafka import Consumer as KafkaConsumer
from confluent_kafka import Producer as KafkaProducer from confluent_kafka import Producer as KafkaProducer
from confluent_kafka import KafkaError from confluent_kafka import KafkaError
from common.proto.kpi_value_api_pb2 import KpiValueType, KpiValue
from common.tools.kafka.Variables import KafkaConfig, KafkaTopic from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
from common.proto.context_pb2 import Empty from common.proto.context_pb2 import Empty
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
...@@ -48,17 +50,21 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer): ...@@ -48,17 +50,21 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer):
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def StartAnalyzer(self, def StartAnalyzer(self,
request : Analyzer, grpc_context: grpc.ServicerContext # type: ignore request : Analyzer, grpc_context: grpc.ServicerContext # type: ignore
) -> AnalyzerId: # type: ignore ) -> KpiValue: # type: ignore
LOGGER.info ("At Service gRPC message: {:}".format(request)) LOGGER.info ("At Service gRPC message: {:}".format(request))
response = AnalyzerId() response = KpiValue()
self.db_obj.add_row_to_db( self.db_obj.add_row_to_db(
AnalyzerModel.ConvertAnalyzerToRow(request) AnalyzerModel.ConvertAnalyzerToRow(request)
) )
self.PublishStartRequestOnKafka(request) self.PublishStartRequestOnKafka(request)
for key, value in self.StartResponseListener(request.analyzer_id.analyzer_id.uuid):
response.analyzer_id.uuid = request.analyzer_id.analyzer_id.uuid # LOGGER.debug("Response from ---> {:}, {:}".format(key, value))
return response response.kpi_id.kpi_id.uuid = value['kpi_id']
response.timestamp.timestamp = datetime.strptime(value['time_stamp'], "%Y-%m-%dT%H:%M:%SZ").timestamp()
response.kpi_value_type.CopyFrom(self.ConverValueToKpiValueType(value['kpi_value']))
yield response
# response.analyzer_id.uuid = request.analyzer_id.analyzer_id.uuid
def PublishStartRequestOnKafka(self, analyzer_obj): def PublishStartRequestOnKafka(self, analyzer_obj):
""" """
...@@ -203,12 +209,24 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer): ...@@ -203,12 +209,24 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer):
LOGGER.info('Unable to process filter response {:}'.format(e)) LOGGER.info('Unable to process filter response {:}'.format(e))
except Exception as e: except Exception as e:
LOGGER.error('Unable to apply filter on table {:}. ERROR: {:}'.format(AnalyzerModel.__name__, e)) LOGGER.error('Unable to apply filter on table {:}. ERROR: {:}'.format(AnalyzerModel.__name__, e))
def ConverValueToKpiValueType(self, value):
    """
    Convert a raw Python value into a KpiValueType protobuf message.

    Maps the Python type of `value` onto the matching field of the
    `KpiValueType` OneOf (boolVal / int32Val / floatVal / stringVal).

    NOTE: the method name keeps the original (misspelled) form
    `ConverValueToKpiValueType` because callers reference it by this name.

    :param value: raw KPI value (bool, int, float or str).
    :return: a KpiValueType message with the matching OneOf field set;
             if `value` is of an unsupported type, the message is
             returned with no field set.
    """
    kpi_value_type = KpiValueType()
    # bool must be tested BEFORE int: in Python, bool is a subclass of
    # int, so isinstance(True, int) is True and booleans would otherwise
    # be stored in int32Val instead of boolVal.
    if isinstance(value, bool):
        kpi_value_type.boolVal = value
    elif isinstance(value, int):
        kpi_value_type.int32Val = value
    elif isinstance(value, float):
        kpi_value_type.floatVal = value
    elif isinstance(value, str):
        kpi_value_type.stringVal = value
    # Add other checks for different types as needed
    return kpi_value_type
def delivery_callback(self, err, msg): def delivery_callback(self, err, msg):
if err: if err:
LOGGER.debug('Message delivery failed: {:}'.format(err)) LOGGER.debug('Message delivery failed: {:}'.format(err))
print ('Message delivery failed: {:}'.format(err)) print ('Message delivery failed: {:}'.format(err))
# else: else:
# LOGGER.debug('Message delivered to topic {:}'.format(msg.topic())) LOGGER.debug('Message delivered to topic {:}'.format(msg.topic()))
# print('Message delivered to topic {:}'.format(msg.topic())) print('Message delivered to topic {:}'.format(msg.topic()))
...@@ -25,7 +25,7 @@ from common.Settings import ( get_service_port_grpc, get_env_var_name, ...@@ -25,7 +25,7 @@ from common.Settings import ( get_service_port_grpc, get_env_var_name,
ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC ) ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC )
from common.tools.kafka.Variables import KafkaTopic from common.tools.kafka.Variables import KafkaTopic
from common.proto.analytics_frontend_pb2 import AnalyzerId, AnalyzerList from common.proto.kpi_value_api_pb2 import KpiValue
from analytics.frontend.client.AnalyticsFrontendClient import AnalyticsFrontendClient from analytics.frontend.client.AnalyticsFrontendClient import AnalyticsFrontendClient
from analytics.frontend.service.AnalyticsFrontendService import AnalyticsFrontendService from analytics.frontend.service.AnalyticsFrontendService import AnalyticsFrontendService
from analytics.frontend.tests.messages import ( create_analyzer_id, create_analyzer, from analytics.frontend.tests.messages import ( create_analyzer_id, create_analyzer,
...@@ -89,12 +89,13 @@ def test_validate_kafka_topics(): ...@@ -89,12 +89,13 @@ def test_validate_kafka_topics():
response = KafkaTopic.create_all_topics() response = KafkaTopic.create_all_topics()
assert isinstance(response, bool) assert isinstance(response, bool)
# ----- core functionality test ----- # # ----- core functionality test -----
# def test_StartAnalytics(analyticsFrontend_client): def test_StartAnalytics(analyticsFrontend_client):
# LOGGER.info(' >>> test_StartAnalytic START: <<< ') LOGGER.info(' >>> test_StartAnalytic START: <<< ')
# response = analyticsFrontend_client.StartAnalyzer(create_analyzer()) stream = analyticsFrontend_client.StartAnalyzer(create_analyzer())
# LOGGER.debug(str(response)) for response in stream:
# assert isinstance(response, AnalyzerId) LOGGER.debug(str(response))
assert isinstance(response, KpiValue)
# To test start and stop listener together # To test start and stop listener together
def test_StartStopAnalyzers(analyticsFrontend_client): def test_StartStopAnalyzers(analyticsFrontend_client):
...@@ -131,4 +132,4 @@ def test_StartStopAnalyzers(analyticsFrontend_client): ...@@ -131,4 +132,4 @@ def test_StartStopAnalyzers(analyticsFrontend_client):
# class_obj = AnalyticsFrontendServiceServicerImpl() # class_obj = AnalyticsFrontendServiceServicerImpl()
# for response in class_obj.StartResponseListener(analyzer_id.analyzer_id.uuid): # for response in class_obj.StartResponseListener(analyzer_id.analyzer_id.uuid):
# LOGGER.debug(response) # LOGGER.debug(response)
# assert isinstance(response, tuple) # assert isinstance(response, tuple)
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment