# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import json
import logging
import threading
from common.Constants import ServiceNameEnum
from common.Settings import get_service_port_grpc
from common.tools.service.GenericGrpcService import GenericGrpcService
from analytics.backend.service.SparkStreaming import SparkStreamer
from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
from confluent_kafka import Consumer as KafkaConsumer
from confluent_kafka import KafkaError

LOGGER = logging.getLogger(__name__)

class AnalyticsBackendService(GenericGrpcService):
    """
    Class listens for ...
    """
    def __init__(self, cls_name : str = __name__) -> None:
        # NOTE: the base class requires the gRPC bind port; this assumes the Analytics
        # Backend port is registered as ServiceNameEnum.ANALYTICSBACKEND in common.Settings.
        port = get_service_port_grpc(ServiceNameEnum.ANALYTICSBACKEND)
        super().__init__(port, cls_name=cls_name)
        self.running_threads = {}       # To keep track of all running analyzers
        self.kafka_consumer  = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(),
                                              'group.id'          : 'analytics-frontend',
                                              'auto.offset.reset' : 'latest'})

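    # Illustrative shape of the analyzer descriptor consumed by StartSparkStreamer()
    # (field names are taken from the accesses below; the example values are only
    # placeholders, the real ones come from the Analytics Frontend):
    #   {
    #       "algo_name"     : "threshold_alert",
    #       "oper_mode"     : 1,
    #       "input_kpis"    : ["<kpi-uuid-1>", "<kpi-uuid-2>"],
    #       "thresholds"    : {"avg_value": (0, 50), "max_value": (45, 100)},
    #       "window_size"   : "60 seconds",
    #       "window_slider" : "30 seconds"
    #   }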
    def StartSparkStreamer(self, analyzer_uuid, analyzer):
        """
        Spawn a SparkStreamer worker thread for the given analyzer descriptor; returns True on success.
        """
        kpi_list      = analyzer['input_kpis'] 
        oper_list     = [s.replace('_value', '') for s in list(analyzer["thresholds"].keys())]  # TODO: update this line...
        thresholds    = analyzer['thresholds']
        window_size   = analyzer['window_size']
        window_slider = analyzer['window_slider']
        print ("Received parameters: {:} - {:} - {:} - {:} - {:}".format(
            kpi_list, oper_list, thresholds, window_size, window_slider))
        LOGGER.debug ("Received parameters: {:} - {:} - {:} - {:} - {:}".format(
            kpi_list, oper_list, thresholds, window_size, window_slider))
        try:
            stop_event = threading.Event()
            thread = threading.Thread(target=SparkStreamer, 
                            args=(analyzer_uuid, kpi_list, oper_list, thresholds, stop_event,
                                  window_size, window_slider, None ))
            self.running_threads[analyzer_uuid] = (thread, stop_event)
            thread.start()
            print      ("Initiated Analyzer backend: {:}".format(analyzer_uuid))
            LOGGER.info("Initiated Analyzer backend: {:}".format(analyzer_uuid))
            return True
        except Exception as e:
            print       ("Failed to initiate Analyzer backend: {:}".format(e))
            LOGGER.error("Failed to initiate Analyzer backend: {:}".format(e))
            return False

    def StopRequestListener(self, threadInfo: tuple):
        """
        Stop the RequestListener thread; threadInfo is the (thread, stop_event) tuple
        returned by StartRequestListener().
        """
        try:
            thread, stop_event = threadInfo
            stop_event.set()
            thread.join()
            print      ("Terminating Analytics backend RequestListener")
            LOGGER.info("Terminating Analytics backend RequestListener")
            return True
        except Exception as e:
            print       ("Failed to terminate analytics backend {:}".format(e))
            LOGGER.error("Failed to terminate analytics backend {:}".format(e))
            return False

    def StartRequestListener(self) -> tuple:
        """
        Start the Kafka RequestListener in a background thread; returns (thread, stop_event).
        """
        stop_event = threading.Event()
        thread = threading.Thread(target=self.RequestListener,
                                  args=(stop_event,) )
        thread.start()
        return (thread, stop_event)

    def RequestListener(self, stop_event):
        """
        listener for requests on Kafka topic.
        """
        consumer = self.kafka_consumer
        consumer.subscribe([KafkaTopic.ANALYTICS_REQUEST.value])
        while not stop_event.is_set():
            receive_msg = consumer.poll(2.0)
            if receive_msg is None:
                continue
            elif receive_msg.error():
                if receive_msg.error().code() == KafkaError._PARTITION_EOF:
                    continue
                else:
                    print("Consumer error: {}".format(receive_msg.error()))
                    break
            analyzer    = json.loads(receive_msg.value().decode('utf-8'))
            analyzer_uuid = receive_msg.key().decode('utf-8')
            LOGGER.debug('Received Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer))
            print       ('Received Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer))

            if analyzer["algo_name"] is None and analyzer["oper_mode"] is None:
                self.TerminateAnalyzerBackend(analyzer_uuid)
            else:
                self.StartSparkStreamer(analyzer_uuid, analyzer)
        LOGGER.debug("Stop Event activated. Terminating...")
        print       ("Stop Event activated. Terminating...")

    def TerminateAnalyzerBackend(self, analyzer_uuid):
        """
        Stop the SparkStreamer thread of the given analyzer, if it is running.
        """
        if analyzer_uuid in self.running_threads:
            try:
                thread, stop_event = self.running_threads[analyzer_uuid]
                stop_event.set()
                thread.join()
                del self.running_threads[analyzer_uuid]
                print      ("Terminating backend (by TerminateBackend): Analyzer Id: {:}".format(analyzer_uuid))
                LOGGER.info("Terminating backend (by TerminateBackend): Analyzer Id: {:}".format(analyzer_uuid))
                return True
            except Exception as e:
                LOGGER.error("Failed to terminate. Analyzer Id: {:} - ERROR: {:}".format(analyzer_uuid, e))
                return False
        else:
            print         ("Analyzer not found in active collectors. Analyzer Id: {:}".format(analyzer_uuid))
            LOGGER.warning("Analyzer not found in active collectors: Analyzer Id: {:}".format(analyzer_uuid))           
            # generate confirmation towards frontend
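
if __name__ == '__main__':
    # Minimal local-run sketch (illustrative only; the regular entry point is the
    # component's main script). It assumes the TFS service-port environment variables
    # and the Kafka broker address resolved by KafkaConfig are available.
    logging.basicConfig(level=logging.DEBUG)
    backend_service = AnalyticsBackendService()
    listener_info   = backend_service.StartRequestListener()   # (thread, stop_event)
    try:
        threading.Event().wait()                                # keep the main thread alive
    except KeyboardInterrupt:
        backend_service.StopRequestListener(listener_info)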