Skip to content
Snippets Groups Projects
VntRecommThread.py 3.69 KiB
Newer Older
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json, logging, socketio, threading
from typing import Dict, List
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
from kafka import KafkaConsumer, TopicPartition
from kafka.consumer.fetcher import ConsumerRecord
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from .Constants import SIO_NAMESPACE, SIO_ROOM

LOGGER = logging.getLogger(__name__)

class VntRecommThread(threading.Thread):
    def __init__(self, namespace : socketio.Namespace):
        super().__init__(daemon=True)
        self._terminate = threading.Event()
        self._namespace = namespace

    def start(self):
        self._terminate.clear()
        return super().start()

    def stop(self) -> None:
        self._terminate.set()

    def run(self):
        LOGGER.info('[run] Starting...')
        try:
            kafka_consumer = KafkaConsumer(
                bootstrap_servers = KafkaConfig.get_kafka_address(),
                group_id          = None, # consumer dispatch all messages sent to subscribed topics
                auto_offset_reset = 'latest',
            )

            LOGGER.info('[run] Subscribing...')
            kafka_consumer.subscribe(topics=[KafkaTopic.VNTMANAGER_REQUEST.value])
            LOGGER.info('[run] Subscribed')

            while not self._terminate.is_set():
                topic_records : Dict[TopicPartition, List[ConsumerRecord]] = \
                    kafka_consumer.poll(timeout_ms=1000, max_records=1)
                if len(topic_records) == 0: continue    # no pending records
                self.process_topic_records(topic_records)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

            LOGGER.info('[run] Closing...')
            kafka_consumer.close()
        except: # pylint: disable=bare-except
            LOGGER.exception('[run] Unexpected Thread Exception')
        LOGGER.info('[run] Terminated')

    def process_topic_records(
        self, topic_records : Dict[TopicPartition, List[ConsumerRecord]]
    ) -> None:
        MSG = '[process_topic_records] topic_records={:s}'
        LOGGER.debug(MSG.format(str(topic_records)))
        for topic, records in topic_records.items():
            if topic.topic == KafkaTopic.VNTMANAGER_REQUEST.value:
                for record in records: self.emit_recommendation(record)

    def emit_recommendation(self, record : ConsumerRecord) -> None:
        message_key   = record.key.decode('utf-8')
        message_value = record.value.decode('utf-8')
        message_value = json.loads(message_value)
        message_event = message_value.pop('event')
        message_data  = json.loads(message_value['data'])
        message_data['_request_key'] = message_key
        message_data = json.dumps(message_data)
        MSG = '[emit_recommendation] Recommendation: event={:s} data={:s}'
        LOGGER.debug(MSG.format(str(message_event), str(message_data)))

        LOGGER.debug('[emit_recommendation] checking server namespace...')
        server : socketio.Server = self._namespace.server
        if server is None: return
        LOGGER.debug('[emit_recommendation] emitting recommendation...')
        server.emit(message_event, message_data, namespace=SIO_NAMESPACE, to=SIO_ROOM)
        LOGGER.debug('[emit_recommendation] emitted')