# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging, socketio, threading
from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
from kafka import KafkaConsumer
from .Constants import SIO_NAMESPACE, SIO_ROOM

# Raise the level of the kafka client library loggers to WARNING so that only
# warnings and errors from these loggers reach the application log.
logging.getLogger('kafka.client').setLevel(logging.WARNING)
logging.getLogger('kafka.cluster').setLevel(logging.WARNING)
logging.getLogger('kafka.conn').setLevel(logging.WARNING)
logging.getLogger('kafka.consumer.fetcher').setLevel(logging.WARNING)
logging.getLogger('kafka.consumer.group').setLevel(logging.WARNING)
logging.getLogger('kafka.consumer.subscription_state').setLevel(logging.WARNING)
logging.getLogger('kafka.metrics.metrics').setLevel(logging.WARNING)
logging.getLogger('kafka.protocol.parser').setLevel(logging.WARNING)

LOGGER = logging.getLogger(__name__)


class VntRecommThread(threading.Thread):
    """Daemon thread polling the ``KafkaTopic.VNTMANAGER_REQUEST`` topic.

    The thread subscribes a ``KafkaConsumer`` to the topic and polls it until
    ``stop()`` is called. Record handling is not implemented yet: receiving
    any record raises ``NotImplementedError``, which is logged and ends the
    thread.
    """

    def __init__(self, namespace : socketio.Namespace):
        """Create the thread.

        Args:
            namespace: Socket.IO namespace object kept for later use; the
                commented-out sketch in ``run()`` reads its ``server``
                attribute to emit events.
        """
        super().__init__(daemon=True)
        self._terminate = threading.Event()
        self._namespace = namespace

    def start(self):
        """Clear the termination flag and start the thread."""
        self._terminate.clear()
        return super().start()

    def stop(self) -> None:
        """Ask the polling loop to exit; it checks the flag once per poll."""
        self._terminate.set()

    def run(self):
        """Poll Kafka until termination is requested or an error occurs.

        Exceptions are logged rather than propagated. The consumer, when it
        was successfully created, is always closed before returning.
        """
        LOGGER.info('[run] Starting...')
        kafka_consumer = None
        try:
            kafka_consumer = KafkaConsumer(
                bootstrap_servers = KafkaConfig.get_kafka_address(),
                group_id          = None, # consumer dispatch all messages sent to subscribed topics
                auto_offset_reset = 'latest',
            )

            LOGGER.info('[run] Subscribing...')
            kafka_consumer.subscribe(topics=[KafkaTopic.VNTMANAGER_REQUEST.value])
            LOGGER.info('[run] Subscribed')

            while not self._terminate.is_set():
                # The 1 s poll timeout bounds how long stop() takes to be noticed.
                records = kafka_consumer.poll(timeout_ms=1000, max_records=1)
                if len(records) == 0: continue  # no pending messages... continuing

                MSG = '[run] records={:s}'
                LOGGER.debug(MSG.format(str(records)))
                raise NotImplementedError('parse kafka records and extract recommendation')

                # NOTE(review): the sketch below appears to follow a
                # confluent-kafka style message API (error()/key()/value());
                # kafka-python's poll() returns a dict mapping TopicPartition
                # to a list of ConsumerRecord -- adapt when implementing.
                #if vntm_request.error():
                #    if vntm_request.error().code() == KafkaError._PARTITION_EOF: continue
                #    MSG = '[run] Consumer error: {:s}'
                #    LOGGER.error(MSG.format(str(vntm_request.error())))
                #    break
                #message_key = vntm_request.key().decode('utf-8')
                #message_value = vntm_request.value().decode('utf-8')
                #MSG = '[run] Recommendation: key={:s} value={:s}'
                #LOGGER.debug(MSG.format(str(message_key), str(message_value)))
                #
                #LOGGER.debug('[run] checking server namespace...')
                #server : socketio.Server = self._namespace.server
                #if server is None: continue
                #LOGGER.debug('[run] emitting recommendation...')
                #server.emit('recommendation', message_value, namespace=SIO_NAMESPACE, to=SIO_ROOM)
                #LOGGER.debug('[run] emitted')
        except Exception: # pylint: disable=broad-except
            LOGGER.exception('[run] Unexpected Thread Exception')
        finally:
            # Close on every exit path; previously an exception skipped the
            # close() call and left the consumer open.
            if kafka_consumer is not None:
                LOGGER.info('[run] Closing...')
                try:
                    kafka_consumer.close()
                except Exception: # pylint: disable=broad-except
                    LOGGER.exception('[run] Error closing Kafka consumer')
        LOGGER.info('[run] Terminated')