Commit cf640e65 authored by Waleed Akbar

Updated Analytics backend streamer class.

- Enhance AnalyticsBackendService.
- Improve DaskStreamer logging and threading management.
- Update duration handling in tests.
parent 1e90b149
2 merge requests: !359 Release TeraFlowSDN 5.0, !317 Resolve "(CTTC) Analytics Module Enhancements"
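In summary, the commit replaces the APScheduler-driven job handling with plain threading: each analyzer gets a DaskStreamer that is now itself a threading.Thread, the backend tracks the running instances in an active_streamers dict, and an optional daemon thread stops a streamer once its configured duration elapses. The following is a minimal, self-contained sketch of that lifecycle pattern; the class and function names here are illustrative stand-ins, not the project's exact API.

import time
import threading

class Streamer(threading.Thread):
    """Illustrative stand-in for DaskStreamer: a run loop guarded by a flag."""
    def __init__(self, key):
        super().__init__()
        self.key     = key
        self.running = True

    def run(self):                          # executed in a new thread after .start()
        while self.running:
            time.sleep(1)                   # placeholder for consume/aggregate/produce work

    def stop(self):                         # called from another thread
        self.running = False

active_streamers = {}

def start_streamer(key, duration=None):
    if key in active_streamers:
        return False                        # refuse a second streamer for the same key
    streamer = Streamer(key)
    streamer.start()
    if duration is not None:
        # stop the streamer after `duration` seconds without blocking the caller
        threading.Thread(
            target=lambda: (time.sleep(duration), streamer.stop()),
            daemon=True
        ).start()
    active_streamers[key] = streamer
    return True

def stop_streamer(key):
    if key not in active_streamers:
        return False
    streamer = active_streamers.pop(key)
    streamer.stop()
    streamer.join()                         # wait for run() to return
    return True

if __name__ == '__main__':
    start_streamer('analyzer-1', duration=3)
    time.sleep(4)                           # the auto-stop fires while we wait
    stop_streamer('analyzer-1')             # join returns immediately: run() already exited

Tracking streamers by key makes duplicate starts cheap to reject and gives the stop path a direct handle on which to call stop() and join().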
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import time
 import json
 import logging
 import threading
@@ -26,11 +27,11 @@ from common.Settings import get_service_port_grpc
 from threading import Thread, Event
 from analytics.backend.service.Streamer import DaskStreamer
 from common.proto.analytics_frontend_pb2 import Analyzer
-from apscheduler.schedulers.background import BackgroundScheduler
 from datetime import datetime, timedelta

 LOGGER = logging.getLogger(__name__)
+logging.basicConfig(level=logging.INFO, format=' %(levelname)s - %(message)s')

 class AnalyticsBackendService(GenericGrpcService):
     """
@@ -40,9 +41,7 @@ class AnalyticsBackendService(GenericGrpcService):
         LOGGER.info('Init AnalyticsBackendService')
         port = get_service_port_grpc(ServiceNameEnum.ANALYTICSBACKEND)
         super().__init__(port, cls_name=cls_name)
-        self.schedular = BackgroundScheduler(daemon=True)
-        self.schedular.start()
-        self.running_threads = {} # To keep track of all running analyzers
+        self.active_streamers = {}
         self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(),
                                              'group.id'          : 'analytics-frontend',
                                              'auto.offset.reset' : 'latest'})
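For context on how such a consumer is typically driven, below is a minimal confluent-kafka consume loop of the kind the request dispatcher appears to run; the bootstrap address and topic name are placeholders, not values taken from this commit.

from confluent_kafka import Consumer, KafkaError

consumer = Consumer({'bootstrap.servers': 'localhost:9092',      # placeholder address
                     'group.id'         : 'analytics-frontend',
                     'auto.offset.reset': 'latest'})
consumer.subscribe(['analytics_requests'])                        # placeholder topic name

while True:
    msg = consumer.poll(1.0)               # wait up to one second for a message
    if msg is None:
        continue
    if msg.error():
        if msg.error().code() == KafkaError._PARTITION_EOF:
            continue                        # reached end of partition: keep polling
        break                               # any other error: stop consuming
    analyzer_uuid = msg.key().decode('utf-8')
    analyzer_json = msg.value().decode('utf-8')                   # JSON analyzer descriptor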
@@ -75,7 +74,7 @@ class AnalyticsBackendService(GenericGrpcService):
             try:
                 analyzer      = json.loads(receive_msg.value().decode('utf-8'))
                 analyzer_uuid = receive_msg.key().decode('utf-8')
-                LOGGER.debug('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer))
+                LOGGER.info('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer))
                 # print ('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer))
                 if analyzer["algo_name"] is None and analyzer["oper_mode"] is None:
@@ -90,6 +89,9 @@
         """
         Start the DaskStreamer with the given parameters.
         """
+        if analyzer_uuid in self.active_streamers:
+            LOGGER.warning("Dask Streamer already running with the given analyzer_uuid: {:}".format(analyzer_uuid))
+            return False
         try:
             streamer = DaskStreamer(
                 analyzer_uuid,
@@ -99,13 +101,20 @@
                 analyzer['batch_size' ],
                 analyzer['window_size'],
             )
-            self.schedular.add_job(
-                streamer.run,
-                'date',
-                run_date=datetime.now(pytz.utc),
-                id=analyzer_uuid,
-                replace_existing=True
-            )
+            streamer.start()
+            logging.info(f"Streamer started with analyzer Id: {analyzer_uuid}")
+
+            # Stop the streamer after the given duration
+            if analyzer['duration'] is not None:
+                def stop_after_duration():
+                    time.sleep(analyzer['duration'])
+                    logging.info(f"Stopping streamer with analyzer: {analyzer_uuid}")
+                    streamer.stop()
+
+                duration_thread = threading.Thread(target=stop_after_duration, daemon=True)
+                duration_thread.start()
+
+            self.active_streamers[analyzer_uuid] = streamer
             LOGGER.info("Dask Streamer started.")
             return True
         except Exception as e:
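The duration handling above starts a daemon thread that sleeps and then calls streamer.stop(). An alternative with the same effect, shown here only as a sketch and not as the committed code, is threading.Timer, which also allows the pending stop to be cancelled if the analyzer is torn down early.

import threading

def schedule_stop(streamer, duration_seconds):
    # Run streamer.stop() once, after the delay, in its own thread.
    timer = threading.Timer(duration_seconds, streamer.stop)
    timer.daemon = True                     # do not keep the process alive for a pending stop
    timer.start()
    return timer                            # call timer.cancel() to abort an early shutdown

A Timer kept alongside the streamer in active_streamers could be cancel()ed from the stop path, which the sleep-based helper cannot do without extra signalling.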
@@ -117,13 +126,15 @@
         Stop the DaskStreamer with the given analyzer_uuid.
         """
         try:
-            active_jobs = self.schedular.get_jobs()
-            logger.debug("Active Jobs: {:}".format(active_jobs))
-            if analyzer_uuid not in [job.id for job in active_jobs]:
+            if analyzer_uuid not in self.active_streamers:
                 LOGGER.warning("Dask Streamer not found with the given analyzer_uuid: {:}".format(analyzer_uuid))
                 return False
-            self.schedular.remove_job(analyzer_uuid)
-            LOGGER.info("Dask Streamer stopped.")
+            LOGGER.info(f"Stopping streamer with key: {analyzer_uuid}")
+            streamer = self.active_streamers[analyzer_uuid]
+            streamer.stop()
+            streamer.join()
+            del self.active_streamers[analyzer_uuid]
+            LOGGER.info(f"Streamer with analyzer_uuid '{analyzer_uuid}' has been stopped.")
             return True
         except Exception as e:
             LOGGER.error("Failed to stop Dask Streamer. ERROR: {:}".format(e))
...
@@ -119,9 +119,13 @@ def aggregation_handler(
                 agg_df['kpi_id'] = output_kpi_list[kpi_index]
+                # if agg_df.empty:
+                #     logger.warning(f"No data available for KPI: {kpi_id}. Skipping threshold application.")
+                #     continue
+                # logger.info(f"4. Applying thresholds for df: {agg_df['kpi_id']}")
                 result = threshold_handler(key, agg_df, kpi_task_parameters)
                 return result.to_dict(orient='records')
             else:
-                logger.debug(f"No data available for KPI: {kpi_id}. Skipping aggregation.")
+                logger.warning(f"No data available for KPIs: {kpi_id}. Skipping aggregation.")
                 continue
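The commented-out lines added above hint at a guard that skips threshold application when aggregation produced no rows. As a standalone illustration of that guard, assuming pandas DataFrames and using hypothetical helper names (apply_thresholds_if_any, threshold_fn):

import logging
import pandas as pd

logger = logging.getLogger(__name__)

def apply_thresholds_if_any(kpi_id, agg_df: pd.DataFrame, threshold_fn, parameters):
    # Skip threshold evaluation when aggregation produced no rows for this KPI.
    if agg_df.empty:
        logger.warning(f"No data available for KPI: {kpi_id}. Skipping threshold application.")
        return []
    result = threshold_fn(kpi_id, agg_df, parameters)   # e.g. threshold_handler(key, agg_df, ...)
    return result.to_dict(orient='records')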
@@ -16,20 +16,19 @@ import logging
 import time
 import json
 from confluent_kafka import KafkaException, KafkaError
-# import pandas as pd
 from common.tools.kafka.Variables import KafkaTopic
 from .AnalyzerHandlers import AnalyzerHandlers, aggregation_handler
 from .AnalyzerHelper import AnalyzerHelper
+import threading

-logging.basicConfig(level=logging.INFO)
 logger = logging.getLogger(__name__)
 logging.basicConfig(level=logging.INFO, format=' %(levelname)s - %(message)s')

-class DaskStreamer:
+class DaskStreamer(threading.Thread):
     def __init__(self, key, input_kpis, output_kpis, thresholds, batch_size=5,
-                 window_size=None, n_workers=5, threads_per_worker=2):
+                 window_size=None, n_workers=1, threads_per_worker=1):
+        super().__init__()
         self.key = key
         self.input_kpis = input_kpis
         self.output_kpis = output_kpis
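The default Dask sizing drops from five workers with two threads each to a single worker with a single thread. Assuming AnalyzerHelper wires these values into a dask.distributed LocalCluster, as the client/cluster shutdown code later in this file suggests, the new defaults correspond to the lightest possible local deployment; a standalone illustration of what they mean:

from dask.distributed import Client, LocalCluster

# One worker process with one thread: the smallest local Dask deployment,
# which is enough for a single analyzer's aggregation stream.
cluster = LocalCluster(n_workers=1, threads_per_worker=1)
client  = Client(cluster)

print(client.scheduler_info()['workers'])   # inspect the single worker

client.close()
cluster.close(timeout=5)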
@@ -96,8 +95,6 @@ class DaskStreamer:
             logger.exception(f"Error in Dask streaming process: {e}")
         finally:
             logger.info(">>> Exiting Dask Streamer...")
-            self.cleanup()
-            logger.info(">>> Dask Streamer Cleanup Completed.")

     def task_handler_selector(self):
         """Select the task handler based on the task type."""
@@ -126,7 +123,7 @@
             self.producer.flush()
             logger.info(f"Produced {len(result)} aggregated records to '{destination_topic}'.")

-    def cleanup(self):
+    def stop(self):
         """Clean up Kafka and Dask resources."""
         logger.info("Shutting down resources...")
         self.running = False
@@ -144,16 +141,18 @@
         except Exception as e:
             logger.error(f"Error closing Kafka producer: {e}")

-        if self.client and hasattr(self.client, 'status') and self.client.status == 'running':
+        if self.client is not None and hasattr(self.client, 'status') and self.client.status == 'running':
             try:
                 self.client.close()
                 logger.info("Dask client closed.")
             except Exception as e:
                 logger.error(f"Error closing Dask client: {e}")

-        if self.cluster and hasattr(self.cluster, 'close'):
+        if self.cluster is not None and hasattr(self.cluster, 'close'):
             try:
                 self.cluster.close(timeout=5)
                 logger.info("Dask cluster closed.")
             except Exception as e:
-                logger.error(f"May be timeout. Error closing Dask cluster: {e}")
+                logger.error(f"Timeout error while closing Dask cluster: {e}")
@@ -32,7 +32,7 @@ def get_thresholds():
     }

 def get_duration():
-    return 30
+    return 60

 def get_windows_size():
     return None
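The test helper now reports a 60-second duration, which presumably flows into the analyzer descriptor's duration field so the backend's auto-stop thread ends the streamer during tests. A hypothetical descriptor built from these helpers might look like the sketch below; the field names mirror those read in the diff, while the values are purely illustrative.

# Hypothetical analyzer descriptor built from the test helpers; field names
# mirror those read by the backend (algo_name, oper_mode, batch_size,
# window_size, duration), values are illustrative.
def get_analyzer_descriptor():
    return {
        'algo_name'  : 'Trend_Analyzer',      # illustrative algorithm name
        'oper_mode'  : 1,                      # illustrative operation mode
        'batch_size' : 10,
        'window_size': get_windows_size(),     # None -> no windowing
        'duration'   : get_duration(),         # 60 s -> the auto-stop thread fires after a minute
    }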
...