# File: VntRecommThread.py (last committed by Lluis Gifre Renom)
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging, socketio, threading
from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
from kafka import KafkaConsumer
from .Constants import SIO_NAMESPACE, SIO_ROOM

# Loggers of the Kafka client library whose output is restricted to WARNING
# and above, so that their INFO/DEBUG records do not flood this module's logs.
_KAFKA_LOGGER_NAMES = (
    'kafka.client',
    'kafka.cluster',
    'kafka.conn',
    'kafka.consumer.fetcher',
    'kafka.consumer.group',
    'kafka.consumer.subscription_state',
    'kafka.metrics.metrics',
    'kafka.protocol.parser',
)
for _kafka_logger_name in _KAFKA_LOGGER_NAMES:
    logging.getLogger(_kafka_logger_name).setLevel(logging.WARNING)

# Logger for this module.
LOGGER = logging.getLogger(__name__)

class VntRecommThread(threading.Thread):
    """Daemon thread that forwards Kafka recommendation messages to Socket.IO.

    The thread polls the ``KafkaTopic.VNTMANAGER_REQUEST`` topic and emits the
    value of every record received as a ``recommendation`` event on the
    Socket.IO server attached to the namespace given at construction time
    (namespace ``SIO_NAMESPACE``, room ``SIO_ROOM``).

    Call ``start()`` to launch the thread and ``stop()`` to ask it to finish;
    the request is honoured once the current poll (at most 1 second) and the
    records it returned have been handled.
    """

    def __init__(self, namespace : socketio.Namespace):
        """Create the thread.

        Args:
            namespace: Socket.IO namespace whose ``server`` attribute is used
                to emit the recommendations.
        """
        super().__init__(daemon=True)
        self._terminate = threading.Event()
        self._namespace = namespace

    def start(self):
        """Reset the termination flag and start the thread."""
        self._terminate.clear()
        return super().start()

    def stop(self) -> None:
        """Request the polling loop to finish; does not wait for the thread."""
        self._terminate.set()

    def run(self):
        """Poll Kafka until ``stop()`` is called, dispatching every record.

        Any exception is logged and ends the thread; the Kafka consumer is
        closed in every case.
        """
        LOGGER.info('[run] Starting...')
        kafka_consumer = None
        try:
            kafka_consumer = KafkaConsumer(
                bootstrap_servers = KafkaConfig.get_kafka_address(),
                group_id          = None, # consumer dispatch all messages sent to subscribed topics
                auto_offset_reset = 'latest',
            )

            LOGGER.info('[run] Subscribing...')
            kafka_consumer.subscribe(topics=[KafkaTopic.VNTMANAGER_REQUEST.value])
            LOGGER.info('[run] Subscribed')

            while not self._terminate.is_set():
                # The 1-second timeout bounds how long stop() takes to be noticed.
                records = kafka_consumer.poll(timeout_ms=1000, max_records=1)
                if len(records) == 0: continue  # no pending messages... continuing

                MSG = '[run] records={:s}'
                LOGGER.debug(MSG.format(str(records)))
                self._dispatch_records(records)
        except Exception: # pylint: disable=broad-except
            LOGGER.exception('[run] Unexpected Thread Exception')
        finally:
            # Release the consumer even when the loop was aborted by an error.
            if kafka_consumer is not None:
                LOGGER.info('[run] Closing...')
                try:
                    kafka_consumer.close()
                except Exception: # pylint: disable=broad-except
                    LOGGER.exception('[run] Exception closing Kafka consumer')
            LOGGER.info('[run] Terminated')

    def _dispatch_records(self, records : dict) -> None:
        """Emit every record contained in the result of a consumer poll.

        Args:
            records: mapping from topic partition to the list of records
                fetched for it, as returned by ``KafkaConsumer.poll()``.
        """
        for partition_records in records.values():
            for record in partition_records:
                try:
                    self._emit_recommendation(record)
                except Exception: # pylint: disable=broad-except
                    # A single faulty record must not stop the forwarding loop.
                    LOGGER.exception('[run] Unable to process record; skipping')

    def _emit_recommendation(self, record) -> None:
        """Decode one Kafka record and emit its value as a recommendation."""
        message_key   = self._decode(record.key)
        message_value = self._decode(record.value)
        MSG = '[run] Recommendation: key={:s} value={:s}'
        LOGGER.debug(MSG.format(str(message_key), str(message_value)))

        if message_value is None:
            LOGGER.warning('[run] Record without value; skipping')
            return

        LOGGER.debug('[run] checking server namespace...')
        server : socketio.Server = self._namespace.server
        if server is None: return  # no server to emit through

        LOGGER.debug('[run] emitting recommendation...')
        server.emit('recommendation', message_value, namespace=SIO_NAMESPACE, to=SIO_ROOM)
        LOGGER.debug('[run] emitted')

    @staticmethod
    def _decode(raw):
        """Return ``raw`` decoded as UTF-8 when it is bytes, unchanged otherwise."""
        if isinstance(raw, (bytes, bytearray)):
            return raw.decode('utf-8')
        return raw