Skip to content
Snippets Groups Projects
Commit 8ac130c1 authored by Waleed Akbar's avatar Waleed Akbar
Browse files

Changes in Analytics

Proto:
- Added `Duration_s` field to the proto file.

Frontend:
- Added `SelectAnalyzer` logic.
- Improved message formatting in the `create_analyzer_filter()` function.
- Added a test case: `test_SelectAnalytics`.

Backend:
- Renamed the `RunSparkStreamer` method to `StartSparkStreamer`.
- Updated the `StartRequestListener` method to return `(thread, stop_event)`.
- Added a `StopRequestListener` method to stop the listener.

Database:
- Added the `select_with_filter` method with actual logic implementation.
- Updated the `ConvertRowToAnalyzer` method to correctly read the `operation_mode` ENUM value.
parent 54e0014b
No related branches found
No related tags found
2 merge requests!294Release TeraFlowSDN 4.0,!261(CTTC) New Analytics Component
...@@ -35,18 +35,20 @@ enum AnalyzerOperationMode { ...@@ -35,18 +35,20 @@ enum AnalyzerOperationMode {
ANALYZEROPERATIONMODE_STREAMING = 2; ANALYZEROPERATIONMODE_STREAMING = 2;
} }
// duration field may be added in analyzer...
message Analyzer { message Analyzer {
AnalyzerId analyzer_id = 1; AnalyzerId analyzer_id = 1;
string algorithm_name = 2; // The algorithm to be executed string algorithm_name = 2; // The algorithm to be executed
repeated kpi_manager.KpiId input_kpi_ids = 3; // The KPI Ids to be processed by the analyzer float duration_s = 3; // Termiate the data analytics thread after duration (seconds); 0 = infinity time
repeated kpi_manager.KpiId output_kpi_ids = 4; // The KPI Ids produced by the analyzer repeated kpi_manager.KpiId input_kpi_ids = 4; // The KPI Ids to be processed by the analyzer
AnalyzerOperationMode operation_mode = 5; // Operation mode of the analyzer repeated kpi_manager.KpiId output_kpi_ids = 5; // The KPI Ids produced by the analyzer
map<string, string> parameters = 6; AnalyzerOperationMode operation_mode = 6; // Operation mode of the analyzer
map<string, string> parameters = 7; // Add dictionary of (key, value) pairs such as (window_size, 10) etc.
// In batch mode... // In batch mode...
float batch_min_duration_s = 7; // ..., min duration to collect before executing batch float batch_min_duration_s = 8; // ..., min duration to collect before executing batch
float batch_max_duration_s = 8; // ..., max duration collected to execute the batch float batch_max_duration_s = 9; // ..., max duration collected to execute the batch
uint64 batch_min_size = 9; // ..., min number of samples to collect before executing batch uint64 batch_min_size = 10; // ..., min number of samples to collect before executing batch
uint64 batch_max_size = 10; // ..., max number of samples collected to execute the batch uint64 batch_max_size = 11; // ..., max number of samples collected to execute the batch
} }
message AnalyzerFilter { message AnalyzerFilter {
......
...@@ -34,7 +34,7 @@ class AnalyticsBackendService(GenericGrpcService): ...@@ -34,7 +34,7 @@ class AnalyticsBackendService(GenericGrpcService):
'group.id' : 'analytics-frontend', 'group.id' : 'analytics-frontend',
'auto.offset.reset' : 'latest'}) 'auto.offset.reset' : 'latest'})
def RunSparkStreamer(self, analyzer_id, analyzer): def StartSparkStreamer(self, analyzer_id, analyzer):
kpi_list = analyzer['input_kpis'] kpi_list = analyzer['input_kpis']
oper_list = [s.replace('_value', '') for s in list(analyzer["thresholds"].keys())] # TODO: update this line... oper_list = [s.replace('_value', '') for s in list(analyzer["thresholds"].keys())] # TODO: update this line...
thresholds = analyzer['thresholds'] thresholds = analyzer['thresholds']
...@@ -59,17 +59,33 @@ class AnalyticsBackendService(GenericGrpcService): ...@@ -59,17 +59,33 @@ class AnalyticsBackendService(GenericGrpcService):
LOGGER.error("Failed to initiate Analyzer backend: {:}".format(e)) LOGGER.error("Failed to initiate Analyzer backend: {:}".format(e))
return False return False
def RunRequestListener(self)->bool: def StopRequestListener(self, threadInfo: tuple):
threading.Thread(target=self.RequestListener).start() try:
return True thread, stop_event = threadInfo
stop_event.set()
thread.join()
print ("Terminating Analytics backend RequestListener")
LOGGER.info("Terminating Analytics backend RequestListener")
return True
except Exception as e:
print ("Failed to terminate analytics backend {:}".format(e))
LOGGER.error("Failed to terminate analytics backend {:}".format(e))
return False
def StartRequestListener(self)->tuple:
stop_event = threading.Event()
thread = threading.Thread(target=self.RequestListener,
args=(stop_event,) )
thread.start()
return (thread, stop_event)
def RequestListener(self): def RequestListener(self, stop_event):
""" """
listener for requests on Kafka topic. listener for requests on Kafka topic.
""" """
consumer = self.kafka_consumer consumer = self.kafka_consumer
consumer.subscribe([KafkaTopic.ANALYTICS_REQUEST.value]) consumer.subscribe([KafkaTopic.ANALYTICS_REQUEST.value])
while True: while not stop_event.is_set():
receive_msg = consumer.poll(2.0) receive_msg = consumer.poll(2.0)
if receive_msg is None: if receive_msg is None:
continue continue
...@@ -87,7 +103,9 @@ class AnalyticsBackendService(GenericGrpcService): ...@@ -87,7 +103,9 @@ class AnalyticsBackendService(GenericGrpcService):
if analyzer["algo_name"] is None and analyzer["oper_mode"] is None: if analyzer["algo_name"] is None and analyzer["oper_mode"] is None:
self.TerminateAnalyzerBackend(analyzer_uuid) self.TerminateAnalyzerBackend(analyzer_uuid)
else: else:
self.RunSparkStreamer(analyzer_uuid, analyzer) self.StartSparkStreamer(analyzer_uuid, analyzer)
LOGGER.debug("Stop Event activated. Terminating...")
print ("Stop Event activated. Terminating...")
def TerminateAnalyzerBackend(self, analyzer_uuid): def TerminateAnalyzerBackend(self, analyzer_uuid):
if analyzer_uuid in self.running_threads: if analyzer_uuid in self.running_threads:
...@@ -104,5 +122,5 @@ class AnalyticsBackendService(GenericGrpcService): ...@@ -104,5 +122,5 @@ class AnalyticsBackendService(GenericGrpcService):
return False return False
else: else:
print ("Analyzer not found in active collectors. Analyzer Id: {:}".format(analyzer_uuid)) print ("Analyzer not found in active collectors. Analyzer Id: {:}".format(analyzer_uuid))
# LOGGER.warning("Analyzer not found in active collectors: Analyzer Id: {:}".format(analyzer_uuid)) LOGGER.warning("Analyzer not found in active collectors: Analyzer Id: {:}".format(analyzer_uuid))
# generate confirmation towards frontend # generate confirmation towards frontend
...@@ -142,7 +142,8 @@ def SparkStreamer(kpi_list, oper_list, thresholds, stop_event, ...@@ -142,7 +142,8 @@ def SparkStreamer(kpi_list, oper_list, thresholds, stop_event,
# Loop to check for stop event flag. To be set by stop collector method. # Loop to check for stop event flag. To be set by stop collector method.
while True: while True:
if stop_event.is_set(): if stop_event.is_set():
print ("Stop Event activated. Terminating in 5 seconds...") LOGGER.debug("Stop Event activated. Terminating in 5 seconds...")
print ("Stop Event activated. Terminating in 5 seconds...")
time.sleep(5) time.sleep(5)
queryHandler.stop() queryHandler.stop()
break break
......
...@@ -12,7 +12,9 @@ ...@@ -12,7 +12,9 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import time
import logging import logging
import threading
from common.tools.kafka.Variables import KafkaTopic from common.tools.kafka.Variables import KafkaTopic
from analytics.backend.service.AnalyticsBackendService import AnalyticsBackendService from analytics.backend.service.AnalyticsBackendService import AnalyticsBackendService
from analytics.backend.tests.messages import get_kpi_id_list, get_operation_list, get_threshold_dict from analytics.backend.tests.messages import get_kpi_id_list, get_operation_list, get_threshold_dict
...@@ -30,12 +32,25 @@ def test_validate_kafka_topics(): ...@@ -30,12 +32,25 @@ def test_validate_kafka_topics():
response = KafkaTopic.create_all_topics() response = KafkaTopic.create_all_topics()
assert isinstance(response, bool) assert isinstance(response, bool)
def test_RunRequestListener(): def test_StartRequestListener():
LOGGER.info('test_RunRequestListener') LOGGER.info('test_RunRequestListener')
AnalyticsBackendServiceObj = AnalyticsBackendService() AnalyticsBackendServiceObj = AnalyticsBackendService()
response = AnalyticsBackendServiceObj.RunRequestListener() response = AnalyticsBackendServiceObj.StartRequestListener() # response is Tuple (thread, stop_event)
LOGGER.debug(str(response)) LOGGER.debug(str(response))
assert isinstance(response, tuple)
def test_StopRequestListener():
LOGGER.info('test_RunRequestListener')
LOGGER.info('Initiating StartRequestListener...')
AnalyticsBackendServiceObj = AnalyticsBackendService()
response_thread = AnalyticsBackendServiceObj.StartRequestListener() # response is Tuple (thread, stop_event)
# LOGGER.debug(str(response_thread))
time.sleep(10)
LOGGER.info('Initiating StopRequestListener...')
AnalyticsBackendServiceObj = AnalyticsBackendService()
response = AnalyticsBackendServiceObj.StopRequestListener(response_thread)
LOGGER.debug(str(response))
assert isinstance(response, bool)
def test_SparkListener(): def test_SparkListener():
LOGGER.info('test_RunRequestListener') LOGGER.info('test_RunRequestListener')
......
...@@ -87,7 +87,7 @@ class Analyzer(Base): ...@@ -87,7 +87,7 @@ class Analyzer(Base):
response = analytics_frontend_pb2.Analyzer() response = analytics_frontend_pb2.Analyzer()
response.analyzer_id.analyzer_id.uuid = row.analyzer_id response.analyzer_id.analyzer_id.uuid = row.analyzer_id
response.algorithm_name = row.algorithm_name response.algorithm_name = row.algorithm_name
response.operation_mode = row.operation_mode response.operation_mode = row.operation_mode.value
response.parameters.update(row.parameters) response.parameters.update(row.parameters)
for input_kpi_id in row.input_kpi_ids: for input_kpi_id in row.input_kpi_ids:
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
import logging import logging
import sqlalchemy_utils import sqlalchemy_utils
from sqlalchemy import inspect from sqlalchemy import inspect, or_
from sqlalchemy.orm import sessionmaker from sqlalchemy.orm import sessionmaker
from analytics.database.AnalyzerModel import Analyzer as AnalyzerModel from analytics.database.AnalyzerModel import Analyzer as AnalyzerModel
...@@ -120,9 +120,22 @@ class AnalyzerDB: ...@@ -120,9 +120,22 @@ class AnalyzerDB:
session = self.Session() session = self.Session()
try: try:
query = session.query(AnalyzerModel) query = session.query(AnalyzerModel)
# Apply filters based on the filter_object # Apply filters based on the filter_object
if filter_object.kpi_id: if filter_object.analyzer_id:
query = query.filter(AnalyzerModel.kpi_id.in_([k.kpi_id.uuid for k in filter_object.kpi_id])) # Need to be updated query = query.filter(AnalyzerModel.analyzer_id.in_([a.analyzer_id.uuid for a in filter_object.analyzer_id]))
if filter_object.algorithm_names:
query = query.filter(AnalyzerModel.algorithm_name.in_(filter_object.algorithm_names))
if filter_object.input_kpi_ids:
input_kpi_uuids = [k.kpi_id.uuid for k in filter_object.input_kpi_ids]
query = query.filter(AnalyzerModel.input_kpi_ids.op('&&')(input_kpi_uuids))
if filter_object.output_kpi_ids:
output_kpi_uuids = [k.kpi_id.uuid for k in filter_object.output_kpi_ids]
query = query.filter(AnalyzerModel.output_kpi_ids.op('&&')(output_kpi_uuids))
result = query.all() result = query.all()
# query should be added to return all rows # query should be added to return all rows
if result: if result:
......
...@@ -126,12 +126,23 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer): ...@@ -126,12 +126,23 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer):
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def SelectAnalyzers(self, def SelectAnalyzers(self,
request : AnalyzerFilter, contextgrpc_context: grpc.ServicerContext # type: ignore filter : AnalyzerFilter, contextgrpc_context: grpc.ServicerContext # type: ignore
) -> AnalyzerList: # type: ignore ) -> AnalyzerList: # type: ignore
LOGGER.info("At Service gRPC message: {:}".format(request)) LOGGER.info("At Service gRPC message: {:}".format(filter))
response = AnalyzerList() response = AnalyzerList()
try:
return response rows = self.db_obj.select_with_filter(AnalyzerModel, filter)
try:
for row in rows:
response.analyzer_list.append(
AnalyzerModel.ConvertRowToAnalyzer(row)
)
return response
except Exception as e:
LOGGER.info('Unable to process filter response {:}'.format(e))
except Exception as e:
LOGGER.error('Unable to apply filter on table {:}. ERROR: {:}'.format(AnalyzerModel.__name__, e))
def delivery_callback(self, err, msg): def delivery_callback(self, err, msg):
if err: if err:
......
...@@ -21,12 +21,13 @@ from common.proto.analytics_frontend_pb2 import ( AnalyzerOperationMode, Analyze ...@@ -21,12 +21,13 @@ from common.proto.analytics_frontend_pb2 import ( AnalyzerOperationMode, Analyze
def create_analyzer_id(): def create_analyzer_id():
_create_analyzer_id = AnalyzerId() _create_analyzer_id = AnalyzerId()
# _create_analyzer_id.analyzer_id.uuid = str(uuid.uuid4()) # _create_analyzer_id.analyzer_id.uuid = str(uuid.uuid4())
_create_analyzer_id.analyzer_id.uuid = "9baa306d-d91c-48c2-a92f-76a21bab35b6" _create_analyzer_id.analyzer_id.uuid = "efef4d95-1cf1-43c4-9742-95c283ddd7a6"
return _create_analyzer_id return _create_analyzer_id
def create_analyzer(): def create_analyzer():
_create_analyzer = Analyzer() _create_analyzer = Analyzer()
_create_analyzer.analyzer_id.analyzer_id.uuid = str(uuid.uuid4()) # _create_analyzer.analyzer_id.analyzer_id.uuid = str(uuid.uuid4())
_create_analyzer.analyzer_id.analyzer_id.uuid = "efef4d95-1cf1-43c4-9742-95c283ddd7a6"
_create_analyzer.algorithm_name = "Test_Aggergate_and_Threshold" _create_analyzer.algorithm_name = "Test_Aggergate_and_Threshold"
_create_analyzer.operation_mode = AnalyzerOperationMode.ANALYZEROPERATIONMODE_STREAMING _create_analyzer.operation_mode = AnalyzerOperationMode.ANALYZEROPERATIONMODE_STREAMING
...@@ -60,24 +61,24 @@ def create_analyzer_filter(): ...@@ -60,24 +61,24 @@ def create_analyzer_filter():
_create_analyzer_filter = AnalyzerFilter() _create_analyzer_filter = AnalyzerFilter()
_analyzer_id_obj = AnalyzerId() _analyzer_id_obj = AnalyzerId()
_analyzer_id_obj.analyzer_id.uuid = str(uuid.uuid4()) # _analyzer_id_obj.analyzer_id.uuid = str(uuid.uuid4())
_analyzer_id_obj.analyzer_id.uuid = "efef4d95-1cf1-43c4-9742-95c283ddd7a6"
_create_analyzer_filter.analyzer_id.append(_analyzer_id_obj) _create_analyzer_filter.analyzer_id.append(_analyzer_id_obj)
_create_analyzer_filter.algorithm_names.append('Algorithum1') _create_analyzer_filter.algorithm_names.append('Test_Aggergate_and_Threshold')
_input_kpi_id_obj = KpiId() # _input_kpi_id_obj = KpiId()
_input_kpi_id_obj.kpi_id.uuid = str(uuid.uuid4()) # _input_kpi_id_obj.kpi_id.uuid = str(uuid.uuid4())
_create_analyzer_filter.input_kpi_ids.append(_input_kpi_id_obj) # _create_analyzer_filter.input_kpi_ids.append(_input_kpi_id_obj)
# another input kpi Id # another input kpi Id
# _input_kpi_id_obj.kpi_id.uuid = str(uuid.uuid4()) # _input_kpi_id_obj.kpi_id.uuid = str(uuid.uuid4())
# _create_analyzer_filter.input_kpi_ids.append(_input_kpi_id_obj) # _create_analyzer_filter.input_kpi_ids.append(_input_kpi_id_obj)
_output_kpi_id_obj = KpiId() # _output_kpi_id_obj = KpiId()
_output_kpi_id_obj.kpi_id.uuid = str(uuid.uuid4()) # _output_kpi_id_obj.kpi_id.uuid = str(uuid.uuid4())
_create_analyzer_filter.output_kpi_ids.append(_output_kpi_id_obj) # _create_analyzer_filter.output_kpi_ids.append(_output_kpi_id_obj)
# another output kpi Id # # another output kpi Id
# _output_kpi_id_obj.kpi_id.uuid = str(uuid.uuid4()) # _output_kpi_id_obj.kpi_id.uuid = str(uuid.uuid4())
# _create_analyzer_filter.input_kpi_ids.append(_output_kpi_id_obj) # _create_analyzer_filter.input_kpi_ids.append(_output_kpi_id_obj)
return _create_analyzer_filter return _create_analyzer_filter
...@@ -21,10 +21,11 @@ from common.proto.context_pb2 import Empty ...@@ -21,10 +21,11 @@ from common.proto.context_pb2 import Empty
from common.Settings import ( get_service_port_grpc, get_env_var_name, from common.Settings import ( get_service_port_grpc, get_env_var_name,
ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC ) ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC )
from common.tools.kafka.Variables import KafkaTopic
from common.proto.analytics_frontend_pb2 import AnalyzerId, AnalyzerList from common.proto.analytics_frontend_pb2 import AnalyzerId, AnalyzerList
from analytics.frontend.client.AnalyticsFrontendClient import AnalyticsFrontendClient from analytics.frontend.client.AnalyticsFrontendClient import AnalyticsFrontendClient
from analytics.frontend.service.AnalyticsFrontendService import AnalyticsFrontendService from analytics.frontend.service.AnalyticsFrontendService import AnalyticsFrontendService
from analytics.frontend.tests.messages import ( create_analyzer_id, create_analyzer, from analytics.frontend.tests.messages import ( create_analyzer_id, create_analyzer,
create_analyzer_filter ) create_analyzer_filter )
########################### ###########################
...@@ -75,6 +76,12 @@ def analyticsFrontend_client(analyticsFrontend_service : AnalyticsFrontendServic ...@@ -75,6 +76,12 @@ def analyticsFrontend_client(analyticsFrontend_service : AnalyticsFrontendServic
# Tests Implementation of Analytics Frontend # Tests Implementation of Analytics Frontend
########################### ###########################
# --- "test_validate_kafka_topics" should be executed before the functionality tests ---
def test_validate_kafka_topics():
LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ")
response = KafkaTopic.create_all_topics()
assert isinstance(response, bool)
# ----- core funtionality test ----- # ----- core funtionality test -----
def test_StartAnalytics(analyticsFrontend_client): def test_StartAnalytics(analyticsFrontend_client):
LOGGER.info(' >>> test_StartAnalytic START: <<< ') LOGGER.info(' >>> test_StartAnalytic START: <<< ')
...@@ -82,14 +89,14 @@ def test_StartAnalytics(analyticsFrontend_client): ...@@ -82,14 +89,14 @@ def test_StartAnalytics(analyticsFrontend_client):
LOGGER.debug(str(response)) LOGGER.debug(str(response))
assert isinstance(response, AnalyzerId) assert isinstance(response, AnalyzerId)
def test_SelectAnalytics(analyticsFrontend_client):
LOGGER.info(' >>> test_SelectAnalytics START: <<< ')
response = analyticsFrontend_client.SelectAnalyzers(create_analyzer_filter())
LOGGER.debug(str(response))
assert isinstance(response, AnalyzerList)
def test_StopAnalytic(analyticsFrontend_client): def test_StopAnalytic(analyticsFrontend_client):
LOGGER.info(' >>> test_StopAnalytic START: <<< ') LOGGER.info(' >>> test_StopAnalytic START: <<< ')
response = analyticsFrontend_client.StopAnalyzer(create_analyzer_id()) response = analyticsFrontend_client.StopAnalyzer(create_analyzer_id())
LOGGER.debug(str(response)) LOGGER.debug(str(response))
assert isinstance(response, Empty) assert isinstance(response, Empty)
def test_SelectAnalytics(analyticsFrontend_client):
LOGGER.info(' >>> test_SelectAnalytics START: <<< ')
response = analyticsFrontend_client.SelectAnalyzers(create_analyzer_filter())
LOGGER.debug(str(response))
assert isinstance(response, AnalyzerList)
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment