Commit 2f9bdc38 authored by Waleed Akbar's avatar Waleed Akbar
Browse files

Telemetry Start,Stop and select Collector complete implementation

parent 4f2374f6
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -24,5 +24,5 @@ cd $PROJECTDIR/src
# python3 kpi_manager/tests/test_unitary.py

RCFILE=$PROJECTDIR/coverage/.coveragerc
python3 -m pytest --log-level=INFO --verbose \
python3 -m pytest --log-level=INFO --log-cli-level=INFO --verbose \
    telemetry/backend/tests/testTelemetryBackend.py
 No newline at end of file
+26 −9
Original line number Diff line number Diff line
@@ -33,7 +33,6 @@ LOGGER = logging.getLogger(__name__)
METRICS_POOL       = MetricsPool('Telemetry', 'TelemetryBackend')
KAFKA_SERVER_IP    = '127.0.0.1:9092'
ADMIN_KAFKA_CLIENT = AdminClient({'bootstrap.servers': KAFKA_SERVER_IP})
ACTIVE_COLLECTORS  = []
KAFKA_TOPICS       = {'request' : 'topic_request', 
                      'response': 'topic_response'}
EXPORTER_ENDPOINT  = "http://node-exporter-7465c69b87-b6ks5.telebackend:9100/metrics"
@@ -45,6 +44,7 @@ class TelemetryBackendService:

    def __init__(self):
        LOGGER.info('Init TelemetryBackendService')
        self.running_threads = {}
    
    def run_kafka_listener(self)->bool:
        threading.Thread(target=self.kafka_listener).start()
@@ -68,7 +68,7 @@ class TelemetryBackendService:
            receive_msg = consumerObj.poll(2.0)
            if receive_msg is None:
                # print (time.time(), " - Telemetry backend is listening on Kafka Topic: ", topic_request)     # added for debugging purposes
                print (time.time(), " - Telemetry backend is listening on Kafka Topic: ", KAFKA_TOPICS['request'])     # added for debugging purposes
                # print (time.time(), " - Telemetry backend is listening on Kafka Topic: ", KAFKA_TOPICS['request'])     # added for debugging purposes
                continue
            elif receive_msg.error():
                if receive_msg.error().code() == KafkaError._PARTITION_EOF:
@@ -78,23 +78,31 @@ class TelemetryBackendService:
                    break
            (kpi_id, duration, interval) = ast.literal_eval(receive_msg.value().decode('utf-8'))
            collector_id = receive_msg.key().decode('utf-8')
            if duration == -1 and interval == -1:
                self.terminate_collector_backend(collector_id)
                # threading.Thread(target=self.terminate_collector_backend, args=(collector_id))
            else:
                self.run_initiate_collector_backend(collector_id, kpi_id, duration, interval)


    def run_initiate_collector_backend(self, collector_id: str, kpi_id: str, duration: int, interval: int):
        threading.Thread(target=self.initiate_collector_backend, args=(collector_id, kpi_id, duration, interval)).start()
        stop_event = threading.Event()
        thread = threading.Thread(target=self.initiate_collector_backend, 
                                  args=(collector_id, kpi_id, duration, interval, stop_event))
        self.running_threads[collector_id] = (thread, stop_event)
        thread.start()

    def initiate_collector_backend(self, collector_id, kpi_id, duration, interval
    def initiate_collector_backend(self, collector_id, kpi_id, duration, interval, stop_event
                        ): # type: ignore
        """
        Method to receive collector request attribues and initiates collecter backend.
        """
        print("Initiating backend for collector: ", collector_id)
        start_time = time.time()
        while True:
            ACTIVE_COLLECTORS.append(collector_id)
        while not stop_event.is_set():
            if time.time() - start_time >= duration:            # condition to terminate backend
                print("Execution Time Completed: \n --- Consumer terminating: KPI ID: ", kpi_id, " - ", time.time() - start_time)
                self.generate_kafka_response(collector_id, "NULL", False)
                print("Execuation duration completed: Terminating backend: Collector Id: ", collector_id, " - ", time.time() - start_time)
                self.generate_kafka_response(collector_id, "-1", -1)
                # write to Kafka to send the termination confirmation.
                break
            # print ("Received KPI: ", kpi_id, ", Duration: ", duration, ", Fetch Interval: ", interval)
@@ -125,6 +133,15 @@ class TelemetryBackendService:
        producerObj.produce(KAFKA_TOPICS['response'], key=msg_key, value= str(msg_value), callback=self.delivery_callback)
        producerObj.flush()

    def terminate_collector_backend(self, collector_id):
        if collector_id in self.running_threads:
            thread, stop_event = self.running_threads[collector_id]
            stop_event.set()
            thread.join()
            print ("Terminating backend (by StopCollector): Collector Id: ", collector_id)
            del self.running_threads[collector_id]
            self.generate_kafka_response(collector_id, "-1", -1)

    def create_topic_if_not_exists(self, new_topics: list) -> bool:
        """
        Method to create Kafka topic if it does not exist.
+2 −4
Original line number Diff line number Diff line
@@ -20,8 +20,6 @@ from typing import Tuple
from common.proto.context_pb2 import Empty
from src.telemetry.backend.service.TelemetryBackendService import TelemetryBackendService



LOGGER = logging.getLogger(__name__)


@@ -30,7 +28,7 @@ LOGGER = logging.getLogger(__name__)
###########################

def test_verify_kafka_topics():
    LOGGER.warning('test_receive_kafka_request requesting')
    LOGGER.info('test_verify_kafka_topics requesting')
    TelemetryBackendServiceObj = TelemetryBackendService()
    KafkaTopics = ['topic_request', 'topic_response']
    response = TelemetryBackendServiceObj.create_topic_if_not_exists(KafkaTopics)
@@ -38,7 +36,7 @@ def test_verify_kafka_topics():
    assert isinstance(response, bool)

def test_run_kafka_listener():
    LOGGER.warning('test_receive_kafka_request requesting')
    LOGGER.info('test_receive_kafka_request requesting')
    TelemetryBackendServiceObj = TelemetryBackendService()
    response = TelemetryBackendServiceObj.run_kafka_listener()
    LOGGER.debug(str(response))
+65 −40
Original line number Diff line number Diff line
@@ -44,9 +44,12 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer):
    def __init__(self, name_mapping : NameMapping):
        LOGGER.info('Init TelemetryFrontendService')
        self.managementDBobj = managementDB()
        self.kafka_producer = KafkaProducer({'bootstrap.servers': KAFKA_SERVER_IP,})
        self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KAFKA_SERVER_IP,
                                            'group.id'          : 'frontend',
                                            'auto.offset.reset' : 'latest'})


    def add_collector_to_db(self, request: Collector ):
    def add_collector_to_db(self, request: Collector ): # type: ignore
        try:
            # Create a new Collector instance
            collector_to_insert                     = CollectorModel()
@@ -66,6 +69,7 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer):
                       request : Collector, grpc_context: grpc.ServicerContext # type: ignore
                      ) -> CollectorId: # type: ignore
        # push info to frontend db
        LOGGER.info ("gRPC message: {:}".format(request))
        response = CollectorId()
        _collector_id       = str(request.collector_id.collector_id.uuid)
        _collector_kpi_id   = str(request.kpi_id.kpi_id.uuid)
@@ -73,37 +77,36 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer):
        _collector_interval = int(request.interval_s)
        # pushing Collector to DB
        self.add_collector_to_db(request)
        self.generate_kafka_request(_collector_id, _collector_kpi_id, _collector_duration, _collector_interval)
        # self.run_generate_kafka_request(_collector_id, _collector_kpi_id, _collector_duration, _collector_interval)
        self.publish_to_kafka_request_topic(_collector_id, _collector_kpi_id, _collector_duration, _collector_interval)
        # self.run_publish_to_kafka_request_topic(_collector_id, _collector_kpi_id, _collector_duration, _collector_interval)
        response.collector_id.uuid = request.collector_id.collector_id.uuid # type: ignore
        return response
    
    def run_generate_kafka_request(self, msg_key: str, kpi: str, duration : int, interval: int):
        threading.Thread(target=self.generate_kafka_request, args=(msg_key, kpi, duration, interval)).start()
    def run_publish_to_kafka_request_topic(self, msg_key: str, kpi: str, duration : int, interval: int):
        # Add threading.Thread() response to dictonary and call start() in the next statement
        threading.Thread(target=self.publish_to_kafka_request_topic, args=(msg_key, kpi, duration, interval)).start()

    # @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def generate_kafka_request(self, 
                             msg_key: str, kpi: str, duration : int, interval: int
                             ) -> KafkaProducer:
    def publish_to_kafka_request_topic(self, 
                             collector_id: str, kpi: str, duration : int, interval: int
                             ):
        """
        Method to generate collector request to Kafka topic.
        """
        # time.sleep(5)
        producer_configs = {
            'bootstrap.servers': KAFKA_SERVER_IP,
        }
        # producer_configs = {
        #     'bootstrap.servers': KAFKA_SERVER_IP,
        # }
        # topic_request = "topic_request"
        msg_value   = Tuple [str, int, int]
        msg_value   = (kpi, duration, interval)
        # print ("Request generated: ", "Colletcor Id: ", msg_key, \
        msg_value : Tuple [str, int, int] = (kpi, duration, interval)
        # print ("Request generated: ", "Colletcor Id: ", collector_id, \
        #         ", \nKPI: ", kpi, ", Duration: ", duration, ", Interval: ", interval)
        producerObj = KafkaProducer(producer_configs)
        producerObj.produce(KAFKA_TOPICS['request'], key=msg_key, value= str(msg_value), callback=self.delivery_callback)
        LOGGER.info("Collector Request Generated: {:} -- {:} -- {:} -- {:}".format(msg_key, kpi, duration, interval))
        # producerObj.produce(topic_request, key=msg_key, value= str(msg_value), callback=self.delivery_callback)
        ACTIVE_COLLECTORS.append(msg_key)
        producerObj.flush()
        return producerObj
        # producerObj = KafkaProducer(producer_configs)
        self.kafka_producer.produce(KAFKA_TOPICS['request'], key=collector_id, value= str(msg_value), callback=self.delivery_callback)
        # producerObj.produce(KAFKA_TOPICS['request'], key=collector_id, value= str(msg_value), callback=self.delivery_callback)
        LOGGER.info("Collector Request Generated: {:}, {:}, {:}, {:}".format(collector_id, kpi, duration, interval))
        # producerObj.produce(topic_request, key=collector_id, value= str(msg_value), callback=self.delivery_callback)
        ACTIVE_COLLECTORS.append(collector_id)
        self.kafka_producer.flush()

    def run_kafka_listener(self):
        # print ("--- STARTED: run_kafka_listener ---")
@@ -114,21 +117,21 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer):
            """
            listener for response on Kafka topic.
            """
            # print ("--- STARTED: kafka_listener ---")
            conusmer_configs = {
                'bootstrap.servers' : KAFKA_SERVER_IP,
                'group.id'          : 'frontend',
                'auto.offset.reset' : 'latest'
            }
            # topic_response = "topic_response"

            consumerObj = KafkaConsumer(conusmer_configs)
            consumerObj.subscribe([KAFKA_TOPICS['response']])
            # # print ("--- STARTED: kafka_listener ---")
            # conusmer_configs = {
            #     'bootstrap.servers' : KAFKA_SERVER_IP,
            #     'group.id'          : 'frontend',
            #     'auto.offset.reset' : 'latest'
            # }
            # # topic_response = "topic_response"

            # consumerObj = KafkaConsumer(conusmer_configs)
            self.kafka_consumer.subscribe([KAFKA_TOPICS['response']])
            # print (time.time())
            while True:
                receive_msg = consumerObj.poll(2.0)
                receive_msg = self.kafka_consumer.poll(2.0)
                if receive_msg is None:
                    print (" - Telemetry frontend listening on Kafka Topic: ", KAFKA_TOPICS['response'])     # added for debugging purposes
                    # print (" - Telemetry frontend listening on Kafka Topic: ", KAFKA_TOPICS['response'])     # added for debugging purposes
                    continue
                elif receive_msg.error():
                    if receive_msg.error().code() == KafkaError._PARTITION_EOF:
@@ -140,7 +143,7 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer):
                    collector_id = receive_msg.key().decode('utf-8')
                    if collector_id in ACTIVE_COLLECTORS:
                        (kpi_id, kpi_value) = ast.literal_eval(receive_msg.value().decode('utf-8'))
                        self.process_response(kpi_id, kpi_value)
                        self.process_response(collector_id, kpi_id, kpi_value)
                    else:
                        print(f"collector id does not match.\nRespone ID: '{collector_id}' --- Active IDs: '{ACTIVE_COLLECTORS}' ")
                except Exception as e:
@@ -148,8 +151,12 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer):
                    continue
                    # return None

    def process_response(self, kpi_id: str, kpi_value: Any):
        print ("Frontend - KPI: ", kpi_id, ", VALUE: ", kpi_value)
    def process_response(self, collector_id: str, kpi_id: str, kpi_value: Any):
        if kpi_id == "-1" and kpi_value == -1:
            # LOGGER.info("Sucessfully terminated Collector: {:}".format(collector_id))
            print ("Sucessfully terminated Collector: ", collector_id)
        else:
            print ("Frontend-Received values Collector Id:", collector_id, "-KPI:", kpi_id, "-VALUE:", kpi_value)

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def delivery_callback(self, err, msg):
@@ -168,12 +175,30 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer):
    def StopCollector(self, 
                      request : CollectorId, grpc_context: grpc.ServicerContext # type: ignore
                     ) -> Empty:  # type: ignore
        request.collector_id.uuid = ""
        LOGGER.info ("gRPC message: {:}".format(request))
        _collector_id = request.collector_id.uuid
        self.publish_to_kafka_request_topic(_collector_id, "", -1, -1)
        return Empty()

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def SelectCollectors(self, 
                         request : CollectorFilter, contextgrpc_context: grpc.ServicerContext # type: ignore
                        ) -> CollectorList:  # type: ignore
        LOGGER.info("gRPC message: {:}".format(request))
        response = CollectorList()
        filter_to_apply = dict()
        filter_to_apply['kpi_id']       = request.kpi_id[0].kpi_id.uuid
        # filter_to_apply['duration_s'] = request.duration_s[0]
        try:
            rows = self.managementDBobj.select_with_filter(CollectorModel, **filter_to_apply)
        except Exception as e:
            LOGGER.info('Unable to apply filter on kpi descriptor. {:}'.format(e))
        try:
            if len(rows) != 0:
                for row in rows:
                    collector_obj = Collector()
                    collector_obj.collector_id.collector_id.uuid = row.collector_id
                    response.collector_list.append(collector_obj)
            return response
        except Exception as e:
            LOGGER.info('Unable to process response {:}'.format(e))
 No newline at end of file
+41 −35
Original line number Diff line number Diff line
@@ -22,10 +22,10 @@ def create_collector_id():
    _collector_id.collector_id.uuid = uuid.uuid4()
    return _collector_id

def create_collector_id_a(coll_id_str : str):
    _collector_id                   = telemetry_frontend_pb2.CollectorId()
    _collector_id.collector_id.uuid = str(coll_id_str)
    return _collector_id
# def create_collector_id_a(coll_id_str : str):
#     _collector_id                   = telemetry_frontend_pb2.CollectorId()
#     _collector_id.collector_id.uuid = str(coll_id_str)
#     return _collector_id

def create_collector_request():
    _create_collector_request                                = telemetry_frontend_pb2.Collector()
@@ -35,39 +35,45 @@ def create_collector_request():
    _create_collector_request.interval_s                     = float(random.randint(2, 4))
    return _create_collector_request

def create_collector_request_a():
    _create_collector_request_a                                = telemetry_frontend_pb2.Collector()
    _create_collector_request_a.collector_id.collector_id.uuid = "-1"
    return _create_collector_request_a

def create_collector_request_b(str_kpi_id, coll_duration_s, coll_interval_s
                               ) -> telemetry_frontend_pb2.Collector:
    _create_collector_request_b                                = telemetry_frontend_pb2.Collector()
    _create_collector_request_b.collector_id.collector_id.uuid = '1'
    _create_collector_request_b.kpi_id.kpi_id.uuid             = str_kpi_id
    _create_collector_request_b.duration_s                     = coll_duration_s
    _create_collector_request_b.interval_s                     = coll_interval_s
    return _create_collector_request_b

def create_collector_filter():
    _create_collector_filter = telemetry_frontend_pb2.CollectorFilter()
    new_collector_id                       = _create_collector_filter.collector_id.add()
    new_collector_id.collector_id.uuid     = "COLL1"
    new_kpi_id                             = _create_collector_filter.kpi_id.add()
    new_kpi_id.kpi_id.uuid                 = "KPI1"
    new_device_id                          = _create_collector_filter.device_id.add()
    new_device_id.device_uuid.uuid         = 'DEV1'
    new_service_id                         = _create_collector_filter.service_id.add()
    new_service_id.service_uuid.uuid       = 'SERV1'
    new_slice_id                           = _create_collector_filter.slice_id.add()
    new_slice_id.slice_uuid.uuid           = 'SLC1'
    new_endpoint_id                        = _create_collector_filter.endpoint_id.add()
    new_endpoint_id.endpoint_uuid.uuid     = 'END1'
    new_connection_id                      = _create_collector_filter.connection_id.add()
    new_connection_id.connection_uuid.uuid = 'CON1'
    _create_collector_filter.kpi_sample_type.append(KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED)
    new_kpi_id.kpi_id.uuid                 = "165d20c5-a446-42fa-812f-e2b7ed283c6f"
    return _create_collector_filter

def create_collector_list():
    _create_collector_list = telemetry_frontend_pb2.CollectorList()
    return _create_collector_list
 No newline at end of file
# def create_collector_request_a():
#     _create_collector_request_a                                = telemetry_frontend_pb2.Collector()
#     _create_collector_request_a.collector_id.collector_id.uuid = "-1"
#     return _create_collector_request_a

# def create_collector_request_b(str_kpi_id, coll_duration_s, coll_interval_s
#                                ) -> telemetry_frontend_pb2.Collector:
#     _create_collector_request_b                                = telemetry_frontend_pb2.Collector()
#     _create_collector_request_b.collector_id.collector_id.uuid = '1'
#     _create_collector_request_b.kpi_id.kpi_id.uuid             = str_kpi_id
#     _create_collector_request_b.duration_s                     = coll_duration_s
#     _create_collector_request_b.interval_s                     = coll_interval_s
#     return _create_collector_request_b

# def create_collector_filter():
#     _create_collector_filter = telemetry_frontend_pb2.CollectorFilter()
#     new_collector_id                       = _create_collector_filter.collector_id.add()
#     new_collector_id.collector_id.uuid     = "COLL1"
#     new_kpi_id                             = _create_collector_filter.kpi_id.add()
#     new_kpi_id.kpi_id.uuid                 = "KPI1"
#     new_device_id                          = _create_collector_filter.device_id.add()
#     new_device_id.device_uuid.uuid         = 'DEV1'
#     new_service_id                         = _create_collector_filter.service_id.add()
#     new_service_id.service_uuid.uuid       = 'SERV1'
#     new_slice_id                           = _create_collector_filter.slice_id.add()
#     new_slice_id.slice_uuid.uuid           = 'SLC1'
#     new_endpoint_id                        = _create_collector_filter.endpoint_id.add()
#     new_endpoint_id.endpoint_uuid.uuid     = 'END1'
#     new_connection_id                      = _create_collector_filter.connection_id.add()
#     new_connection_id.connection_uuid.uuid = 'CON1'
#     _create_collector_filter.kpi_sample_type.append(KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED)
#     return _create_collector_filter

# def create_collector_list():
#     _create_collector_list = telemetry_frontend_pb2.CollectorList()
#     return _create_collector_list
 No newline at end of file
Loading