Streamer.py
    # Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #      http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    import time
    import json
    import threading
    import logging
    
    from confluent_kafka                            import KafkaException, KafkaError
    from common.tools.kafka.Variables               import KafkaTopic
    from analytics.backend.service.AnalyzerHandlers import Handlers, aggregation_handler
    from analytics.backend.service.AnalyzerHelper   import AnalyzerHelper
    
    
    logger = logging.getLogger(__name__)
    
    
    class DaskStreamer(threading.Thread):
        def __init__(self, key, input_kpis, output_kpis, thresholds,
                     batch_size        = 5,
                     batch_duration    = None,
                     window_size       = None,
                     cluster_instance  = None,
                     producer_instance = None   # a default call here would run once at class-definition time and be shared by all instances
                     ):
            super().__init__()
            self.key            = key
            self.input_kpis     = input_kpis
            self.output_kpis    = output_kpis
            self.thresholds     = thresholds
            self.window_size    = window_size      # TODO: Not implemented
            self.batch_size     = batch_size
            self.batch_duration = batch_duration
            self.running        = True
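            # Buffer of decoded messages, cleared each time a batch is dispatched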
            self.batch          = []
    
            # Initialize Kafka and Dask components
            self.client   = AnalyzerHelper.initialize_dask_client(cluster_instance)
            self.consumer = AnalyzerHelper.initialize_kafka_consumer()      # Single-threaded consumer
            # Default arguments are evaluated once at definition time, so the Kafka
            # producer is created here, per instance, unless one is injected.
            if producer_instance is None:
                producer_instance = AnalyzerHelper.initialize_kafka_producer()
            self.producer = producer_instance
    
            logger.info("Dask Streamer initialized.")
    
        def run(self):
            """Main method to start the DaskStreamer."""
            try:
                logger.info("Starting Dask Streamer")
                last_batch_time = time.time()
                while True:
                    if not self.consumer:
                        logger.warning("Kafka consumer is not initialized or stopped. Exiting loop.")
                        break
                    if not self.running:
                        logger.warning("Dask Streamer instance has been terminated. Exiting loop.")
                        break
                    if not self.client:
                        logger.warning("Dask client is not running. Exiting loop.")
                        break
                    message = self.consumer.poll(timeout=1.0)
                    if message is None:
                        # logger.info("No new messages received.")
                        continue
                    if message.error():
                        if message.error().code() == KafkaError._PARTITION_EOF:
                            logger.warning(f"Consumer reached end of topic {message.topic()}/{message.partition()}")
                        elif message.error().code() == KafkaError.UNKNOWN_TOPIC_OR_PART:
                            logger.error(f"Subscribed topic {message.topic()} does not exist or has no messages yet.")
                            continue
                        else:
                            raise KafkaException(message.error())
                    else:
                        try:
                            value = json.loads(message.value())
                        except json.JSONDecodeError:
                            logger.error(f"Failed to decode message: {message.value()}")
                            continue
                        self.batch.append(value)
    
                    # Batch duration, when set, takes precedence over batch size
                    if self.batch_duration is None:
                        if len(self.batch) >= self.batch_size:  # Process once the batch reaches the configured size (default: 5)
                            logger.info(f"Processing based on batch size {self.batch_size}.")
                            self.task_handler_selector()
                            self.batch = []
                    else:
                        # Process once batch_duration seconds have elapsed and the batch is non-empty
                        current_time = time.time()
                        if (current_time - last_batch_time) >= self.batch_duration and self.batch:
                            logger.info(f"Processing based on batch duration {self.batch_duration}s.")
                            self.task_handler_selector()
                            self.batch = []
                            last_batch_time = current_time
    
            except Exception as e:
                logger.exception(f"Error in Dask streaming process: {e}")
            finally:
                self.stop()
                logger.info(">>> Exiting Dask Streamer...")
    
        def task_handler_selector(self):
            """Select the task handler based on the task type."""
            logger.info(f"Batch to be processed: {self.batch}")
            if Handlers.is_valid_handler(self.thresholds["task_type"]):
                if self.client is not None and self.client.status == 'running':
                    try:
                        future = self.client.submit(aggregation_handler, "batch size", self.key,
                                                        self.batch, self.input_kpis, self.output_kpis, self.thresholds)
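                        # The done-callback runs once the task finishes; fut.result()
                        # re-raises any exception raised on the Dask worker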
                        future.add_done_callback(lambda fut: self.produce_result(fut.result(), KafkaTopic.ALARMS.value))
                    except Exception as e:
                        logger.error(f"Failed to submit task to Dask client or unable to process future. See error for detail: {e}")
                else:
                    logger.warning("Dask client is not running. Skipping processing.")
            else:
                logger.warning(f"Unknown task type: {self.thresholds['task_type']}. Skipping processing.")
    
        def produce_result(self, result, destination_topic):
            """Produce results to the Kafka topic."""
            if not result:
                logger.warning("Nothing to produce. Skipping.")
                return
            for record in result:
                try:
                    self.producer.produce(
                        destination_topic,
                        key=str(record.get('kpi_id', '')),
                        value=json.dumps(record),
                        callback=AnalyzerHelper.delivery_report
                    )
                except KafkaException as e:
                    logger.error(f"Failed to produce message: {e}")
            self.producer.flush()
            logger.info(f"Produced {len(result)} aggregated records to '{destination_topic}'.")
    
        def stop(self):
            """Clean up Kafka and Dask thread resources."""
            if not self.running:
                logger.info("Dask Streamer is already stopped.")
                return
            self.running = False
            logger.info("Streamer running status is set to False. Waiting 5 seconds before stopping...")
            time.sleep(5)       # Waiting time for running tasks to complete
            if self.consumer:
                try:
                    self.consumer.close()
                    logger.info("Kafka consumer closed.")
                except Exception as e:
                    logger.error(f"Error closing Kafka consumer: {e}")
    
            if self.client is not None and hasattr(self.client, 'status') and self.client.status == 'running':
                try:
                    self.client.close()
                    logger.info("Dask client closed.")
                except Exception as e:
                    logger.error(f"Error closing Dask client: {e}")
    
    # TODO: Maybe use a single streamer for all analyzers?
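
    # Hypothetical usage sketch (illustrative only; the analyzer key, KPI ids, and
    # threshold contents below are placeholders, not values from this repository):
    #
    #     streamer = DaskStreamer(
    #         key            = "analyzer-1",
    #         input_kpis     = ["in-kpi-uuid"],
    #         output_kpis    = ["out-kpi-uuid"],
    #         thresholds     = {"task_type": "<valid-handler-type>"},  # must pass Handlers.is_valid_handler()
    #         batch_duration = 10.0,  # dispatch a batch roughly every 10 seconds
    #     )
    #     streamer.start()
    #     ...                         # consumes KPIs, aggregates them, produces alarms
    #     streamer.stop()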