diff --git a/proto/analytics_frontend.proto b/proto/analytics_frontend.proto index bc7420d540e1e2353faf6fe999f8c539693b80fc..ace0581db816bee1d0d20746f2b864dce602567b 100644 --- a/proto/analytics_frontend.proto +++ b/proto/analytics_frontend.proto @@ -35,18 +35,20 @@ enum AnalyzerOperationMode { ANALYZEROPERATIONMODE_STREAMING = 2; } +// duration field may be added in analyzer... message Analyzer { AnalyzerId analyzer_id = 1; string algorithm_name = 2; // The algorithm to be executed - repeated kpi_manager.KpiId input_kpi_ids = 3; // The KPI Ids to be processed by the analyzer - repeated kpi_manager.KpiId output_kpi_ids = 4; // The KPI Ids produced by the analyzer - AnalyzerOperationMode operation_mode = 5; // Operation mode of the analyzer - map<string, string> parameters = 6; + float duration_s = 3; // Termiate the data analytics thread after duration (seconds); 0 = infinity time + repeated kpi_manager.KpiId input_kpi_ids = 4; // The KPI Ids to be processed by the analyzer + repeated kpi_manager.KpiId output_kpi_ids = 5; // The KPI Ids produced by the analyzer + AnalyzerOperationMode operation_mode = 6; // Operation mode of the analyzer + map<string, string> parameters = 7; // Add dictionary of (key, value) pairs such as (window_size, 10) etc. // In batch mode... - float batch_min_duration_s = 7; // ..., min duration to collect before executing batch - float batch_max_duration_s = 8; // ..., max duration collected to execute the batch - uint64 batch_min_size = 9; // ..., min number of samples to collect before executing batch - uint64 batch_max_size = 10; // ..., max number of samples collected to execute the batch + float batch_min_duration_s = 8; // ..., min duration to collect before executing batch + float batch_max_duration_s = 9; // ..., max duration collected to execute the batch + uint64 batch_min_size = 10; // ..., min number of samples to collect before executing batch + uint64 batch_max_size = 11; // ..., max number of samples collected to execute the batch } message AnalyzerFilter { diff --git a/src/analytics/backend/service/AnalyticsBackendService.py b/src/analytics/backend/service/AnalyticsBackendService.py index 899525fc6e3e89cf40409ace155d91a03437f4d1..463442f82bd2c6a53786581e8bd89553c499edc8 100755 --- a/src/analytics/backend/service/AnalyticsBackendService.py +++ b/src/analytics/backend/service/AnalyticsBackendService.py @@ -34,7 +34,7 @@ class AnalyticsBackendService(GenericGrpcService): 'group.id' : 'analytics-frontend', 'auto.offset.reset' : 'latest'}) - def RunSparkStreamer(self, analyzer_id, analyzer): + def StartSparkStreamer(self, analyzer_id, analyzer): kpi_list = analyzer['input_kpis'] oper_list = [s.replace('_value', '') for s in list(analyzer["thresholds"].keys())] # TODO: update this line... thresholds = analyzer['thresholds'] @@ -59,17 +59,33 @@ class AnalyticsBackendService(GenericGrpcService): LOGGER.error("Failed to initiate Analyzer backend: {:}".format(e)) return False - def RunRequestListener(self)->bool: - threading.Thread(target=self.RequestListener).start() - return True + def StopRequestListener(self, threadInfo: tuple): + try: + thread, stop_event = threadInfo + stop_event.set() + thread.join() + print ("Terminating Analytics backend RequestListener") + LOGGER.info("Terminating Analytics backend RequestListener") + return True + except Exception as e: + print ("Failed to terminate analytics backend {:}".format(e)) + LOGGER.error("Failed to terminate analytics backend {:}".format(e)) + return False + + def StartRequestListener(self)->tuple: + stop_event = threading.Event() + thread = threading.Thread(target=self.RequestListener, + args=(stop_event,) ) + thread.start() + return (thread, stop_event) - def RequestListener(self): + def RequestListener(self, stop_event): """ listener for requests on Kafka topic. """ consumer = self.kafka_consumer consumer.subscribe([KafkaTopic.ANALYTICS_REQUEST.value]) - while True: + while not stop_event.is_set(): receive_msg = consumer.poll(2.0) if receive_msg is None: continue @@ -87,7 +103,9 @@ class AnalyticsBackendService(GenericGrpcService): if analyzer["algo_name"] is None and analyzer["oper_mode"] is None: self.TerminateAnalyzerBackend(analyzer_uuid) else: - self.RunSparkStreamer(analyzer_uuid, analyzer) + self.StartSparkStreamer(analyzer_uuid, analyzer) + LOGGER.debug("Stop Event activated. Terminating...") + print ("Stop Event activated. Terminating...") def TerminateAnalyzerBackend(self, analyzer_uuid): if analyzer_uuid in self.running_threads: @@ -104,5 +122,5 @@ class AnalyticsBackendService(GenericGrpcService): return False else: print ("Analyzer not found in active collectors. Analyzer Id: {:}".format(analyzer_uuid)) - # LOGGER.warning("Analyzer not found in active collectors: Analyzer Id: {:}".format(analyzer_uuid)) + LOGGER.warning("Analyzer not found in active collectors: Analyzer Id: {:}".format(analyzer_uuid)) # generate confirmation towards frontend diff --git a/src/analytics/backend/service/SparkStreaming.py b/src/analytics/backend/service/SparkStreaming.py index 202076ed52f61dae1ac397fa162d3517919f2ce8..eaabcfed22d37053cc1289ab9a488e34369cbc92 100644 --- a/src/analytics/backend/service/SparkStreaming.py +++ b/src/analytics/backend/service/SparkStreaming.py @@ -142,7 +142,8 @@ def SparkStreamer(kpi_list, oper_list, thresholds, stop_event, # Loop to check for stop event flag. To be set by stop collector method. while True: if stop_event.is_set(): - print ("Stop Event activated. Terminating in 5 seconds...") + LOGGER.debug("Stop Event activated. Terminating in 5 seconds...") + print ("Stop Event activated. Terminating in 5 seconds...") time.sleep(5) queryHandler.stop() break diff --git a/src/analytics/backend/tests/test_backend.py b/src/analytics/backend/tests/test_backend.py index 9e8a0832dde18270422071771473a9b5478b0b5f..c3e00df35a9c9e476afcd4c797e4875c94043443 100644 --- a/src/analytics/backend/tests/test_backend.py +++ b/src/analytics/backend/tests/test_backend.py @@ -12,7 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. +import time import logging +import threading from common.tools.kafka.Variables import KafkaTopic from analytics.backend.service.AnalyticsBackendService import AnalyticsBackendService from analytics.backend.tests.messages import get_kpi_id_list, get_operation_list, get_threshold_dict @@ -30,12 +32,25 @@ def test_validate_kafka_topics(): response = KafkaTopic.create_all_topics() assert isinstance(response, bool) -def test_RunRequestListener(): +def test_StartRequestListener(): LOGGER.info('test_RunRequestListener') AnalyticsBackendServiceObj = AnalyticsBackendService() - response = AnalyticsBackendServiceObj.RunRequestListener() - LOGGER.debug(str(response)) + response = AnalyticsBackendServiceObj.StartRequestListener() # response is Tuple (thread, stop_event) + LOGGER.debug(str(response)) + assert isinstance(response, tuple) +def test_StopRequestListener(): + LOGGER.info('test_RunRequestListener') + LOGGER.info('Initiating StartRequestListener...') + AnalyticsBackendServiceObj = AnalyticsBackendService() + response_thread = AnalyticsBackendServiceObj.StartRequestListener() # response is Tuple (thread, stop_event) + # LOGGER.debug(str(response_thread)) + time.sleep(10) + LOGGER.info('Initiating StopRequestListener...') + AnalyticsBackendServiceObj = AnalyticsBackendService() + response = AnalyticsBackendServiceObj.StopRequestListener(response_thread) + LOGGER.debug(str(response)) + assert isinstance(response, bool) def test_SparkListener(): LOGGER.info('test_RunRequestListener') diff --git a/src/analytics/database/AnalyzerModel.py b/src/analytics/database/AnalyzerModel.py index 7832052690c3dd5bc25617ef9143f263f0747843..c33e396e06a8dce96a86951a64aa59b510931dfe 100644 --- a/src/analytics/database/AnalyzerModel.py +++ b/src/analytics/database/AnalyzerModel.py @@ -87,7 +87,7 @@ class Analyzer(Base): response = analytics_frontend_pb2.Analyzer() response.analyzer_id.analyzer_id.uuid = row.analyzer_id response.algorithm_name = row.algorithm_name - response.operation_mode = row.operation_mode + response.operation_mode = row.operation_mode.value response.parameters.update(row.parameters) for input_kpi_id in row.input_kpi_ids: diff --git a/src/analytics/database/Analyzer_DB.py b/src/analytics/database/Analyzer_DB.py index 896ba110027b6e8db6bcb3db6add334f1eb017dd..1ba68989a066e4638adc12e65289ed50b740731d 100644 --- a/src/analytics/database/Analyzer_DB.py +++ b/src/analytics/database/Analyzer_DB.py @@ -15,7 +15,7 @@ import logging import sqlalchemy_utils -from sqlalchemy import inspect +from sqlalchemy import inspect, or_ from sqlalchemy.orm import sessionmaker from analytics.database.AnalyzerModel import Analyzer as AnalyzerModel @@ -120,9 +120,22 @@ class AnalyzerDB: session = self.Session() try: query = session.query(AnalyzerModel) + # Apply filters based on the filter_object - if filter_object.kpi_id: - query = query.filter(AnalyzerModel.kpi_id.in_([k.kpi_id.uuid for k in filter_object.kpi_id])) # Need to be updated + if filter_object.analyzer_id: + query = query.filter(AnalyzerModel.analyzer_id.in_([a.analyzer_id.uuid for a in filter_object.analyzer_id])) + + if filter_object.algorithm_names: + query = query.filter(AnalyzerModel.algorithm_name.in_(filter_object.algorithm_names)) + + if filter_object.input_kpi_ids: + input_kpi_uuids = [k.kpi_id.uuid for k in filter_object.input_kpi_ids] + query = query.filter(AnalyzerModel.input_kpi_ids.op('&&')(input_kpi_uuids)) + + if filter_object.output_kpi_ids: + output_kpi_uuids = [k.kpi_id.uuid for k in filter_object.output_kpi_ids] + query = query.filter(AnalyzerModel.output_kpi_ids.op('&&')(output_kpi_uuids)) + result = query.all() # query should be added to return all rows if result: diff --git a/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py b/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py index 9c438761c295405a4f4459700aa3056b2f0a1ecc..f35f035e2fc004f0553c0a471bfcb8cd3d7699be 100644 --- a/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py +++ b/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py @@ -126,12 +126,23 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer): @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def SelectAnalyzers(self, - request : AnalyzerFilter, contextgrpc_context: grpc.ServicerContext # type: ignore + filter : AnalyzerFilter, contextgrpc_context: grpc.ServicerContext # type: ignore ) -> AnalyzerList: # type: ignore - LOGGER.info("At Service gRPC message: {:}".format(request)) + LOGGER.info("At Service gRPC message: {:}".format(filter)) response = AnalyzerList() - - return response + try: + rows = self.db_obj.select_with_filter(AnalyzerModel, filter) + try: + for row in rows: + response.analyzer_list.append( + AnalyzerModel.ConvertRowToAnalyzer(row) + ) + return response + except Exception as e: + LOGGER.info('Unable to process filter response {:}'.format(e)) + except Exception as e: + LOGGER.error('Unable to apply filter on table {:}. ERROR: {:}'.format(AnalyzerModel.__name__, e)) + def delivery_callback(self, err, msg): if err: diff --git a/src/analytics/frontend/tests/messages.py b/src/analytics/frontend/tests/messages.py index 180fac1f8d752c61ef9f08c5d9ddb4da3359cee1..646de962e8a213582fdb7cd1446ab57bda561a96 100644 --- a/src/analytics/frontend/tests/messages.py +++ b/src/analytics/frontend/tests/messages.py @@ -21,12 +21,13 @@ from common.proto.analytics_frontend_pb2 import ( AnalyzerOperationMode, Analyze def create_analyzer_id(): _create_analyzer_id = AnalyzerId() # _create_analyzer_id.analyzer_id.uuid = str(uuid.uuid4()) - _create_analyzer_id.analyzer_id.uuid = "9baa306d-d91c-48c2-a92f-76a21bab35b6" + _create_analyzer_id.analyzer_id.uuid = "efef4d95-1cf1-43c4-9742-95c283ddd7a6" return _create_analyzer_id def create_analyzer(): _create_analyzer = Analyzer() - _create_analyzer.analyzer_id.analyzer_id.uuid = str(uuid.uuid4()) + # _create_analyzer.analyzer_id.analyzer_id.uuid = str(uuid.uuid4()) + _create_analyzer.analyzer_id.analyzer_id.uuid = "efef4d95-1cf1-43c4-9742-95c283ddd7a6" _create_analyzer.algorithm_name = "Test_Aggergate_and_Threshold" _create_analyzer.operation_mode = AnalyzerOperationMode.ANALYZEROPERATIONMODE_STREAMING @@ -60,24 +61,24 @@ def create_analyzer_filter(): _create_analyzer_filter = AnalyzerFilter() _analyzer_id_obj = AnalyzerId() - _analyzer_id_obj.analyzer_id.uuid = str(uuid.uuid4()) + # _analyzer_id_obj.analyzer_id.uuid = str(uuid.uuid4()) + _analyzer_id_obj.analyzer_id.uuid = "efef4d95-1cf1-43c4-9742-95c283ddd7a6" _create_analyzer_filter.analyzer_id.append(_analyzer_id_obj) - _create_analyzer_filter.algorithm_names.append('Algorithum1') + _create_analyzer_filter.algorithm_names.append('Test_Aggergate_and_Threshold') - _input_kpi_id_obj = KpiId() - _input_kpi_id_obj.kpi_id.uuid = str(uuid.uuid4()) - _create_analyzer_filter.input_kpi_ids.append(_input_kpi_id_obj) + # _input_kpi_id_obj = KpiId() + # _input_kpi_id_obj.kpi_id.uuid = str(uuid.uuid4()) + # _create_analyzer_filter.input_kpi_ids.append(_input_kpi_id_obj) # another input kpi Id # _input_kpi_id_obj.kpi_id.uuid = str(uuid.uuid4()) # _create_analyzer_filter.input_kpi_ids.append(_input_kpi_id_obj) - _output_kpi_id_obj = KpiId() - _output_kpi_id_obj.kpi_id.uuid = str(uuid.uuid4()) - _create_analyzer_filter.output_kpi_ids.append(_output_kpi_id_obj) - # another output kpi Id + # _output_kpi_id_obj = KpiId() + # _output_kpi_id_obj.kpi_id.uuid = str(uuid.uuid4()) + # _create_analyzer_filter.output_kpi_ids.append(_output_kpi_id_obj) + # # another output kpi Id # _output_kpi_id_obj.kpi_id.uuid = str(uuid.uuid4()) # _create_analyzer_filter.input_kpi_ids.append(_output_kpi_id_obj) return _create_analyzer_filter - diff --git a/src/analytics/frontend/tests/test_frontend.py b/src/analytics/frontend/tests/test_frontend.py index df6ce165e4ae16444c2d0472121677f899356ba4..b96116d29a16a8c327242a50cd6d8dabd106bfeb 100644 --- a/src/analytics/frontend/tests/test_frontend.py +++ b/src/analytics/frontend/tests/test_frontend.py @@ -21,10 +21,11 @@ from common.proto.context_pb2 import Empty from common.Settings import ( get_service_port_grpc, get_env_var_name, ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC ) +from common.tools.kafka.Variables import KafkaTopic from common.proto.analytics_frontend_pb2 import AnalyzerId, AnalyzerList from analytics.frontend.client.AnalyticsFrontendClient import AnalyticsFrontendClient from analytics.frontend.service.AnalyticsFrontendService import AnalyticsFrontendService -from analytics.frontend.tests.messages import ( create_analyzer_id, create_analyzer, +from analytics.frontend.tests.messages import ( create_analyzer_id, create_analyzer, create_analyzer_filter ) ########################### @@ -75,6 +76,12 @@ def analyticsFrontend_client(analyticsFrontend_service : AnalyticsFrontendServic # Tests Implementation of Analytics Frontend ########################### +# --- "test_validate_kafka_topics" should be executed before the functionality tests --- +def test_validate_kafka_topics(): + LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ") + response = KafkaTopic.create_all_topics() + assert isinstance(response, bool) + # ----- core funtionality test ----- def test_StartAnalytics(analyticsFrontend_client): LOGGER.info(' >>> test_StartAnalytic START: <<< ') @@ -82,14 +89,14 @@ def test_StartAnalytics(analyticsFrontend_client): LOGGER.debug(str(response)) assert isinstance(response, AnalyzerId) +def test_SelectAnalytics(analyticsFrontend_client): + LOGGER.info(' >>> test_SelectAnalytics START: <<< ') + response = analyticsFrontend_client.SelectAnalyzers(create_analyzer_filter()) + LOGGER.debug(str(response)) + assert isinstance(response, AnalyzerList) + def test_StopAnalytic(analyticsFrontend_client): LOGGER.info(' >>> test_StopAnalytic START: <<< ') response = analyticsFrontend_client.StopAnalyzer(create_analyzer_id()) LOGGER.debug(str(response)) assert isinstance(response, Empty) - -def test_SelectAnalytics(analyticsFrontend_client): - LOGGER.info(' >>> test_SelectAnalytics START: <<< ') - response = analyticsFrontend_client.SelectAnalyzers(create_analyzer_filter()) - LOGGER.debug(str(response)) - assert isinstance(response, AnalyzerList) \ No newline at end of file