Commit dafc8e87 authored by Waleed Akbar
Browse files

Updates to Streamer Classes in Backend Analytics:

- Replaced `SparkStreamer` class with `DaskStreamer` class.
- Updated methods to call the `DaskStreamer` class.
- Improved messages and test files.
- Added new package requirements.
- Made minor changes to the frontend `requirements` file.
parent 87802cc7
Loading
Loading
Loading
Loading
+3 −1
Original line number Diff line number Diff line
@@ -12,5 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

pyspark==3.5.2
dask==2024.9.0
distributed==2024.9.0
pandas==2.2.3
confluent-kafka==2.3.*
+40 −48
Original line number Diff line number Diff line
@@ -12,18 +12,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.


import time
import json
import logging
import threading
from common.tools.service.GenericGrpcService import GenericGrpcService
from analytics.backend.service.SparkStreaming import SparkStreamer
from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
from confluent_kafka import Consumer as KafkaConsumer
from confluent_kafka import KafkaError
from common.Constants import ServiceNameEnum
from common.Settings import get_service_port_grpc

from threading import Thread, Event
from .DaskStreaming import DaskStreamer

LOGGER = logging.getLogger(__name__)

@@ -40,44 +40,6 @@ class AnalyticsBackendService(GenericGrpcService):
                                            'group.id'           : 'analytics-frontend',
                                            'auto.offset.reset'  : 'latest'})

    def StartSparkStreamer(self, analyzer_uuid, analyzer):
        """
        Launch a SparkStreamer thread for the given analyzer request.
        Args:       analyzer_uuid: Key under which the thread is registered in self.running_threads.
                    analyzer (dict): Request payload; must contain 'input_kpis', 'thresholds',
                                     'window_size' and 'window_slider'.
        Returns:    bool: True when the thread was created and started, False on any error.
        """
        kpi_list      = analyzer['input_kpis']
        # TODO: update this line...
        # The operation name is the text before the first '_' of each threshold key.
        oper_list     = [th_key.partition('_')[0] for th_key in analyzer["thresholds"].keys()]
        thresholds    = analyzer['thresholds']
        window_size   = analyzer['window_size']
        window_slider = analyzer['window_slider']
        try:
            stop_event    = threading.Event()
            streamer_args = (analyzer_uuid, kpi_list, oper_list, thresholds, stop_event,
                             window_size, window_slider, None)
            thread        = threading.Thread(target=SparkStreamer, args=streamer_args)
            # Register before starting so the thread can be stopped as soon as it runs.
            self.running_threads[analyzer_uuid] = (thread, stop_event)
            thread.start()
            started_msg = "Initiated Analyzer backend: {:}".format(analyzer_uuid)
            print(started_msg)
            LOGGER.info(started_msg)
            return True
        except Exception as e:
            failed_msg = "Failed to initiate Analyzer backend: {:}".format(e)
            print(failed_msg)
            LOGGER.error(failed_msg)
            return False

    def StopRequestListener(self, threadInfo: tuple):
        """
        Signal a listener thread to stop and wait for it to finish.
        Args:       threadInfo (tuple): Pair of (thread, stop_event).
        Returns:    bool: True when the thread was joined, False if anything raised.
        """
        try:
            listener_thread, stop_signal = threadInfo
            stop_signal.set()
            listener_thread.join()
            done_msg = "Terminating Analytics backend RequestListener"
            print(done_msg)
            LOGGER.info(done_msg)
            return True
        except Exception as e:
            error_msg = "Failed to terminate analytics backend {:}".format(e)
            print(error_msg)
            LOGGER.error(error_msg)
            return False

    def install_servicers(self):
        """Start self.RequestListener on its own thread."""
        listener_thread = threading.Thread(target=self.RequestListener, args=())
        listener_thread.start()

@@ -97,8 +59,9 @@ class AnalyticsBackendService(GenericGrpcService):
                if receive_msg.error().code() == KafkaError._PARTITION_EOF:
                    continue
                else:
                    print("Consumer error: {}".format(receive_msg.error()))
                    break
                    LOGGER.error("Consumer error: {:}".format(receive_msg.error()))
                    print       ("Consumer error: {:}".format(receive_msg.error()))
                    continue
            try:
                analyzer      = json.loads(receive_msg.value().decode('utf-8'))
                analyzer_uuid = receive_msg.key().decode('utf-8')
@@ -106,14 +69,44 @@ class AnalyticsBackendService(GenericGrpcService):
                print       ('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer))

                if analyzer["algo_name"] is None and analyzer["oper_mode"] is None:
                    self.TerminateAnalyzerBackend(analyzer_uuid)
                    self.StopDaskListener(analyzer_uuid)
                else:
                    self.StartSparkStreamer(analyzer_uuid, analyzer)
                    self.StartDaskListener(analyzer_uuid, analyzer)
            except Exception as e:
                LOGGER.warning("Unable to consume message from topic: {:}. ERROR: {:}".format(KafkaTopic.ANALYTICS_REQUEST.value, e))
                print         ("Unable to consume message from topic: {:}. ERROR: {:}".format(KafkaTopic.ANALYTICS_REQUEST.value, e))

    def TerminateAnalyzerBackend(self, analyzer_uuid):
    def StartDaskListener(self, analyzer_uuid, analyzer):
        """
        Launch a DaskStreamer thread for the given analyzer request.
        Args:       analyzer_uuid: Key under which the thread is registered in self.running_threads.
                    analyzer (dict): Request payload; must contain 'input_kpis', 'thresholds',
                                     'window_size' and 'window_slider'.
        Returns:    bool: True when the thread was created and started, False on any error.
        """
        kpi_list      = analyzer[ 'input_kpis'   ]
        thresholds    = analyzer[ 'thresholds'   ]
        window_size   = analyzer[ 'window_size'  ]
        # Read and logged for traceability; it is not forwarded to DaskStreamer.
        window_slider = analyzer[ 'window_slider']

        LOGGER.debug ("Received parameters: {:} - {:} - {:} - {:}".format(
            kpi_list, thresholds, window_size, window_slider))
        print        ("Received parameters: {:} - {:} - {:} - {:}".format(
            kpi_list, thresholds, window_size, window_slider))
        try:
            # Registering a new entry for an analyzer that is already running would
            # drop the only reference to its stop event and leave that thread
            # unstoppable, so signal the previous streamer before replacing it.
            if analyzer_uuid in self.running_threads:
                _, previous_stop_event = self.running_threads[analyzer_uuid]
                previous_stop_event.set()
                LOGGER.warning("Analyzer already running, stopping previous streamer. Analyzer Id: {:}".format(analyzer_uuid))

            stop_event = Event()
            thread     = Thread(
                target=DaskStreamer,
                args=(analyzer_uuid, kpi_list, thresholds, stop_event),
                kwargs={
                    "window_size"       : window_size,
                }
            )
            thread.start()
            self.running_threads[analyzer_uuid] = (thread, stop_event)
            print      ("Initiated Analyzer backend: {:}".format(analyzer_uuid))
            LOGGER.info("Initiated Analyzer backend: {:}".format(analyzer_uuid))
            return True
        except Exception as e:
            print       ("Failed to initiate Analyzer backend: {:}".format(e))
            LOGGER.error("Failed to initiate Analyzer backend: {:}".format(e))
            return False

    def StopDaskListener(self, analyzer_uuid):
        if analyzer_uuid in self.running_threads:
            try:
                thread, stop_event = self.running_threads[analyzer_uuid]
@@ -129,4 +122,3 @@ class AnalyticsBackendService(GenericGrpcService):
        else:
            print         ("Analyzer not found in active collectors. Analyzer Id: {:}".format(analyzer_uuid))
            LOGGER.warning("Analyzer not found in active collectors: Analyzer Id: {:}".format(analyzer_uuid))
            # generate confirmation towards frontend
+233 −0
Original line number Diff line number Diff line
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import time
import json
from confluent_kafka import Consumer, Producer, KafkaException, KafkaError
import pandas as pd
from dask.distributed import Client, LocalCluster
from common.tools.kafka.Variables import KafkaConfig, KafkaTopic

logging.basicConfig(level=logging.INFO)
LOGGER = logging.getLogger(__name__)

def SettingKafkaConsumerParams():
    """
    Build the configuration dictionary passed to the Kafka Consumer.
    Returns:    dict: Broker address (from KafkaConfig), consumer group id and offset-reset policy.
    """
    consumer_settings = {}
    consumer_settings['bootstrap.servers'] = KafkaConfig.get_kafka_address()
    consumer_settings['group.id']          = 'analytics-backend'
    consumer_settings['auto.offset.reset'] = 'latest'
    return consumer_settings

def GetAggregationMappings(thresholds):
    """
    Derive pandas named-aggregation specs from the threshold keys.
    Args:       thresholds (dict): Keys in the format '<aggregation>_<metricName>'.
    Returns:    dict: Maps each accepted threshold key to ('kpi_value', <aggregation>).
                      Keys without an underscore or with an unsupported aggregation
                      are logged and left out.
    """
    supported_aggregations = ('mean', 'min', 'max', 'first', 'last', 'std')
    mappings = {}
    for threshold_key in thresholds:
        # Only the first underscore separates the aggregation from the metric name.
        aggregation, separator, _metric_name = threshold_key.partition('_')
        if not separator:
            LOGGER.warning(f"Threshold key '{threshold_key}' does not follow the '<aggregation>_<metricName>' format. Skipping.")
            continue
        if aggregation in supported_aggregations:
            mappings[threshold_key] = ('kpi_value', aggregation)
        else:
            LOGGER.warning(f"Unsupported aggregation '{aggregation}' in threshold key '{threshold_key}'. Skipping.")
    return mappings

def ApplyThresholds(aggregated_df, thresholds):
    """
    Add boolean fall/raise flag columns to the aggregated DataFrame.
    For every threshold key that is also a column, '<key>_THRESHOLD_FALL' marks rows whose
    value is below the first bound and '<key>_THRESHOLD_RAISE' marks rows above the second.
    Args:       aggregated_df (pd.DataFrame): DataFrame with aggregated metrics (modified in place).
                thresholds (dict): Maps '<aggregation>_<metricName>' to a (fall, raise) pair.
    Returns:    pd.DataFrame: The same DataFrame object with the flag columns added.
    """
    known_shapes = (list, tuple)
    for threshold_key, threshold_values in thresholds.items():
        if threshold_key not in aggregated_df.columns:
            LOGGER.warning(f"Threshold key '{threshold_key}' does not correspond to any aggregation result. Skipping threshold application.")
            continue

        is_pair = isinstance(threshold_values, known_shapes) and len(threshold_values) == 2
        if not is_pair:
            LOGGER.warning(f"Threshold values for '{threshold_key}' are not a list or tuple of length 2. Skipping threshold application.")
            continue

        lower_bound, upper_bound = threshold_values
        fall_column  = f"{threshold_key}_THRESHOLD_FALL"
        raise_column = f"{threshold_key}_THRESHOLD_RAISE"
        aggregated_df[fall_column]  = aggregated_df[threshold_key] < lower_bound
        aggregated_df[raise_column] = aggregated_df[threshold_key] > upper_bound
    return aggregated_df

def initialize_dask_client():
    """
    Start a LocalCluster with 2 workers of 2 threads each and connect a Client to it.
    Returns:    tuple: (client, cluster); the caller is responsible for closing both.
    """
    worker_layout = {"n_workers": 2, "threads_per_worker": 2}
    cluster = LocalCluster(**worker_layout)
    client = Client(cluster)
    LOGGER.info(f"Dask Client Initialized: {client}")
    return (client, cluster)

def initialize_kafka_producer():
    """Create a Kafka Producer pointed at the broker address given by KafkaConfig."""
    producer_settings = {'bootstrap.servers': KafkaConfig.get_kafka_address()}
    return Producer(producer_settings)

def delivery_report(err, msg):
    """
    Producer delivery callback: log the outcome of one produced message.
    Args:       err: Delivery error, or None when the message was delivered.
                msg: The message; its topic, partition and offset are logged on success.
    """
    if err is None:
        LOGGER.info(f"Message delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")
        return
    LOGGER.error(f"Message delivery failed: {err}")

def process_batch(batch, agg_mappings, thresholds):
    """
    Process a batch of data and apply thresholds.
    The batch is grouped by ('window_start', 'kpi_id'), aggregated according to
    agg_mappings, flagged by ApplyThresholds and converted to plain records.
    Args:       batch (list of dict): List of messages from Kafka. Each message needs
                                      'time_stamp', 'kpi_id', 'kpi_value' and 'window_start'.
                agg_mappings (dict): Mapping from threshold key to aggregation function.
                thresholds (dict): Thresholds dictionary.
    Returns:    list of dict: Processed records ready to be sent to Kafka
                              (empty when the batch cannot be processed).
    """
    if not batch:
        LOGGER.info("Empty batch received. Skipping processing.")
        return []

    df = pd.DataFrame(batch)

    # Validate the schema before touching any column: reading df['time_stamp']
    # on a batch that lacks it would raise KeyError instead of skipping the batch.
    required_columns = {'time_stamp', 'kpi_id', 'kpi_value'}
    if not required_columns.issubset(df.columns):
        LOGGER.warning(f"Batch contains missing required columns. Required columns: {required_columns}. Skipping batch.")
        return []

    # Unparseable timestamps become NaT and their rows are discarded.
    df['time_stamp'] = pd.to_datetime(df['time_stamp'], errors='coerce')
    df.dropna(subset=['time_stamp'], inplace=True)
    if df.empty:
        LOGGER.info("No rows with a valid timestamp in batch. Skipping processing.")
        return []

    # Perform aggregations using named aggregation
    try:
        df_agg = df.groupby(['window_start', 'kpi_id']).agg(**agg_mappings).reset_index()
    except Exception as e:
        # Also covers a missing 'window_start' column and empty/invalid mappings.
        LOGGER.error(f"Aggregation error: {e}")
        return []

    # Apply thresholds
    df_thresholded = ApplyThresholds(df_agg, thresholds)
    # Timestamps are rendered as strings so the records can be JSON-serialized.
    df_thresholded['window_start'] = df_thresholded['window_start'].dt.strftime('%Y-%m-%dT%H:%M:%SZ')
    # Convert aggregated DataFrame to list of dicts
    result = df_thresholded.to_dict(orient='records')
    LOGGER.info(f"Processed batch with {len(result)} records after aggregation and thresholding.")

    return result

def produce_result(result, producer, destination_topic):
    """
    Publish processed records to Kafka, one message per record.
    Each record is keyed by its 'kpi_id' (empty string when absent) and sent as JSON.
    A record that cannot be serialized or enqueued is logged and skipped so that
    the remaining records are still published.
    Args:       result (list of dict): Records returned by process_batch.
                producer: Kafka Producer used to publish the records.
                destination_topic (str): Topic the records are published to.
    """
    produced_count = 0
    for record in result:
        record_key = str(record.get('kpi_id', ''))
        try:
            payload = json.dumps(record)
        except (TypeError, ValueError) as e:
            LOGGER.error(f"Failed to serialize record: {e}")
            continue

        try:
            try:
                producer.produce(destination_topic, key=record_key,
                                 value=payload, callback=delivery_report)
            except BufferError:
                # The local producer queue is full: serve delivery callbacks to
                # free space, then retry this record once.
                producer.poll(1.0)
                producer.produce(destination_topic, key=record_key,
                                 value=payload, callback=delivery_report)
            produced_count += 1
        except (KafkaException, BufferError) as e:
            LOGGER.error(f"Failed to produce message: {e}")
    producer.flush()
    # Report what was actually enqueued, not the size of the input.
    LOGGER.info(f"Produced {produced_count} aggregated records to '{destination_topic}'.")

def _publish_batch_result(future, producer):
    """Send the records of a finished batch future to the alarms topic; log task failures."""
    try:
        records = future.result()
    except Exception as e:
        LOGGER.error(f"Batch processing task failed: {e}")
        return
    produce_result(records, producer, KafkaTopic.ALARMS.value)


def _submit_batch(client, producer, batch, agg_mappings, thresholds):
    """Submit one batch to Dask and publish its result asynchronously when it completes."""
    future = client.submit(process_batch, batch, agg_mappings, thresholds)
    future.add_done_callback(lambda fut: _publish_batch_result(fut, producer))
    return future


def _parse_kpi_message(msg, kpi_list, time_stamp_col, window_freq):
    """Decode one Kafka message into a dict tagged with 'window_start', or None to skip it."""
    try:
        message_value = json.loads(msg.value().decode('utf-8'))
    except (ValueError, AttributeError) as e:
        # ValueError covers invalid JSON and invalid UTF-8; AttributeError an empty payload.
        LOGGER.error(f"JSON decode error: {e}")
        return None

    if not isinstance(message_value, dict):
        LOGGER.warning(f"Message payload is not a JSON object: {message_value}. Skipping message.")
        return None

    kpi_id = message_value.get('kpi_id')
    if kpi_id not in kpi_list:
        LOGGER.debug(f"KPI ID '{kpi_id}' not in kpi_list. Skipping message.")
        return None

    try:
        message_timestamp = pd.to_datetime(message_value[time_stamp_col], errors='coerce')
        if pd.isna(message_timestamp):
            LOGGER.warning(f"Invalid timestamp in message: {message_value}. Skipping message.")
            return None
        message_value['window_start'] = message_timestamp.floor(window_freq)
    except Exception as e:
        LOGGER.error(f"Error processing timestamp: {e}. Skipping message.")
        return None
    return message_value


def DaskStreamer(key, kpi_list, thresholds, stop_event,
                window_size="30s", time_stamp_col="time_stamp"):
    """
    Consume KPI values from Kafka, aggregate them in time-based batches on a local
    Dask cluster and publish the thresholded aggregates to the alarms topic.
    Runs until stop_event is set, then processes the remaining batch and releases
    the Kafka and Dask resources.
    Args:       key: Analyzer identifier supplied by the caller (not used by the current implementation).
                kpi_list (list): KPI ids to keep; messages for other KPIs are skipped.
                thresholds (dict): Thresholds dictionary with keys in the format '<aggregation>_<metricName>'.
                stop_event (threading.Event): Set by the caller to stop the streamer.
                window_size (str): Batch interval / window length parseable by pd.to_timedelta.
                                   Falls back to 30 seconds when invalid or not positive.
                time_stamp_col (str): Name of the message field that carries the timestamp.
    """
    from pandas.tseries.frequencies import to_offset

    client, cluster = initialize_dask_client()
    consumer     = None
    producer     = None
    batch        = []
    agg_mappings = {}
    try:
        # NOTE(review): every streamer uses the consumer group returned by
        # SettingKafkaConsumerParams(); with several analyzers running at once the
        # group may split partitions between them - confirm this is intended.
        consumer = Consumer(SettingKafkaConsumerParams())
        consumer.subscribe([KafkaTopic.VALUE.value])
        producer = initialize_kafka_producer()

        # Parse window_size once; the same value drives both the flush interval and
        # the window flooring, so a bad value cannot make every message fail to floor.
        try:
            window_size_td = pd.to_timedelta(window_size)
            if pd.isna(window_size_td) or window_size_td <= pd.Timedelta(0):
                raise ValueError("window size must be a positive duration")
        except Exception as e:
            LOGGER.error(f"Invalid window_size format: {window_size}. Error: {e}")
            window_size_td = pd.Timedelta(seconds=30)
        window_size_seconds = window_size_td.total_seconds()
        window_freq         = to_offset(window_size_td)
        LOGGER.info(f"Batch processing interval set to {window_size_seconds} seconds.")

        # Extract aggregation mappings from thresholds
        agg_mappings = GetAggregationMappings(thresholds)
        if not agg_mappings:
            LOGGER.error("No valid aggregation mappings extracted from thresholds. Exiting streamer.")
            return  # resources are released in the finally block

        last_batch_time = time.time()
        LOGGER.info("Starting to consume messages...")

        while not stop_event.is_set():
            msg = consumer.poll(1.0)

            if msg is not None:
                if msg.error():
                    if msg.error().code() == KafkaError._PARTITION_EOF:
                        LOGGER.warning(f"End of partition reached {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")
                    else:
                        LOGGER.error(f"Kafka error: {msg.error()}")
                else:
                    message_value = _parse_kpi_message(msg, kpi_list, time_stamp_col, window_freq)
                    if message_value is not None:
                        batch.append(message_value)

            # Single flush check per iteration, whether or not a message arrived.
            current_time = time.time()
            if batch and (current_time - last_batch_time) >= window_size_seconds:
                LOGGER.info("Time-based batch threshold reached. Processing batch.")
                _submit_batch(client, producer, batch, agg_mappings, thresholds)
                batch = []
                last_batch_time = current_time

    except Exception as e:
        LOGGER.exception(f"Error in Dask streaming process: {e}")
    finally:
        try:
            # Process any remaining messages in the batch. This is done synchronously:
            # the client is closed right below, so an asynchronous callback could
            # never deliver the result.
            if batch and producer is not None:
                LOGGER.info("Processing remaining messages in the batch.")
                try:
                    records = client.submit(process_batch, batch, agg_mappings, thresholds).result()
                    produce_result(records, producer, KafkaTopic.ALARMS.value)
                except Exception as e:
                    LOGGER.error(f"Failed to process remaining batch: {e}")
            if consumer is not None:
                consumer.close()
            if producer is not None:
                producer.flush()
            LOGGER.info("Kafka consumer and producer closed.")
        finally:
            # Always release the Dask resources, even if Kafka cleanup failed.
            client.close()
            cluster.close()
            LOGGER.info("Dask client and cluster closed.")
+43 −2
Original line number Diff line number Diff line
@@ -16,7 +16,7 @@ import uuid
import json
from common.proto.kpi_manager_pb2        import KpiId
from common.proto.analytics_frontend_pb2 import ( AnalyzerOperationMode,
                                                Analyzer )
                                                Analyzer, AnalyzerId )

def get_kpi_id_list():
    return ["6e22f180-ba28-4641-b190-2287bf448888", "1e22f180-ba28-4641-b190-2287bf446666"]
@@ -38,6 +38,13 @@ def get_threshold_dict():
        op + '_value': threshold_dict[op+'_value'] for op in get_operation_list() if op + '_value' in threshold_dict
    }

def create_analyzer_id():
    """Return an AnalyzerId message carrying a fixed, hard-coded UUID."""
    analyzer_id_msg = AnalyzerId()
    # Swap in str(uuid.uuid4()) here when a random identifier is needed instead.
    analyzer_id_msg.analyzer_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666"
    return analyzer_id_msg


def create_analyzer():
    _create_analyzer                              = Analyzer()
@@ -71,3 +78,37 @@ def create_analyzer():
    _create_analyzer.parameters['store_aggregate'] = str(False)       # TRUE to store. No implemented yet

    return _create_analyzer

def create_analyzer_dask():
    """
    Build an Analyzer message for exercising the Dask-based analytics backend.
    The analyzer has a fixed UUID, streaming operation mode, three input KPI ids
    (two fixed, one random), two random output KPI ids and JSON-encoded
    threshold / window parameters.
    """
    _create_analyzer                              = Analyzer()
    _create_analyzer.analyzer_id.analyzer_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666"
    _create_analyzer.algorithm_name               = "Test_Aggergate_and_Threshold"
    _create_analyzer.operation_mode               = AnalyzerOperationMode.ANALYZEROPERATIONMODE_STREAMING

    # input IDs to analyze: two fixed ids followed by one random id
    input_kpi_uuids = [
        "6e22f180-ba28-4641-b190-2287bf448888",
        "1e22f180-ba28-4641-b190-2287bf446666",
        str(uuid.uuid4()),
    ]
    for kpi_uuid in input_kpi_uuids:
        _kpi_id             = KpiId()
        _kpi_id.kpi_id.uuid = kpi_uuid
        _create_analyzer.input_kpi_ids.append(_kpi_id)

    # output IDs after analysis: two random ids
    for _ in range(2):
        _kpi_id             = KpiId()
        _kpi_id.kpi_id.uuid = str(uuid.uuid4())
        _create_analyzer.output_kpi_ids.append(_kpi_id)

    # parameters; threshold keys follow '<aggregation>_<metricName>'
    _threshold_dict = {
        'mean_PKS_TX'   :(20, 30), 'min_value'   :(0, 10),   'max_value' :(45, 50),
        'first_value'   :(0, 50),  'last_value'  :(50, 100), 'std_value' :(0, 90)}
    _create_analyzer.parameters['thresholds']      = json.dumps(_threshold_dict)
    _create_analyzer.parameters['oper_list']       = json.dumps([key.split('_')[0] for key in _threshold_dict.keys()])
    _create_analyzer.parameters['window_size']     = "10s"      # any duration string accepted by pandas.to_timedelta
    _create_analyzer.parameters['window_slider']   = "5s"       # should be less than the window size
    _create_analyzer.parameters['store_aggregate'] = str(False) # TRUE to store. Not implemented yet
    return _create_analyzer
+99 −24

File changed.

Preview size limit exceeded, changes collapsed.

Loading