# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging

from confluent_kafka import Consumer, Producer
from dask.distributed import Client, LocalCluster

from common.tools.kafka.Variables import KafkaConfig, KafkaTopic

# Configure logging once for the module; a second basicConfig() call would be a no-op.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(funcName)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


class AnalyzerHelper:
    def __init__(self):
        pass

    @staticmethod
    def initialize_dask_client(n_workers=1, threads_per_worker=1):
        """Initialize a local Dask cluster and return its client together with the cluster."""
        cluster = LocalCluster(n_workers=n_workers, threads_per_worker=threads_per_worker)
        client = Client(cluster)
        logger.info(f"Dask Client Initialized: {client}")
        return client, cluster

    @staticmethod
    def initialize_kafka_consumer():
        """Initialize the Kafka consumer and subscribe it to the analytics topic."""
        consumer_conf = {
            'bootstrap.servers' : KafkaConfig.get_kafka_address(),
            'group.id'          : 'analytics-backend',
            'auto.offset.reset' : 'latest'
        }
        consumer = Consumer(consumer_conf)
        consumer.subscribe([KafkaTopic.VALUE.value])
        return consumer

    @staticmethod
    def initialize_kafka_producer():
        """Initialize the Kafka producer."""
        return Producer({'bootstrap.servers': KafkaConfig.get_kafka_address()})

    @staticmethod
    def delivery_report(err, msg):
        """Kafka delivery callback: log a failed delivery as an error, a successful one at debug level."""
        if err is not None:
            logger.error(f"Message delivery failed: {err}")
        else:
            logger.debug(f"Message delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")
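

# A minimal usage sketch, assuming the helpers are driven from a simple poll loop;
# the choice of KafkaTopic.VALUE as the output topic and the echo-style publish
# below are illustrative assumptions, not the backend's actual control flow.
if __name__ == '__main__':
    dask_client, dask_cluster = AnalyzerHelper.initialize_dask_client(n_workers=1)
    kafka_consumer = AnalyzerHelper.initialize_kafka_consumer()
    kafka_producer = AnalyzerHelper.initialize_kafka_producer()
    try:
        while True:
            message = kafka_consumer.poll(timeout=1.0)
            if message is None or message.error():
                continue
            # Placeholder processing step: echo the consumed payload back to Kafka.
            # A real analyzer would hand the payload to the Dask client for processing instead.
            kafka_producer.produce(
                KafkaTopic.VALUE.value,
                message.value(),
                callback=AnalyzerHelper.delivery_report
            )
            kafka_producer.flush()
    except KeyboardInterrupt:
        logger.info("Stopping AnalyzerHelper usage sketch")
    finally:
        kafka_consumer.close()
        dask_client.close()
        dask_cluster.close()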