Commit 98039a9c authored by Waleed Akbar's avatar Waleed Akbar
Browse files

Kafka topic name changed from "labled" to "labeled"

parent fe6ced62
Loading
Loading
Loading
Loading
+9 −7
Original line number Diff line number Diff line
@@ -28,7 +28,7 @@ LOGGER = logging.getLogger(__name__)
KAFKA_SERVER_IP    = '127.0.0.1:9092'
# ADMIN_KAFKA_CLIENT = AdminClient({'bootstrap.servers': KAFKA_SERVER_IP})
KAFKA_TOPICS       = {'request' : 'topic_request', 'response': 'topic_response',
                      'raw'     : 'topic_raw'    , 'labeled' : 'topic_labled'}
                      'raw'     : 'topic_raw'    , 'labeled' : 'topic_labeled'}
PRODUCER_CONFIG    = {'bootstrap.servers': KAFKA_SERVER_IP,}
CONSUMER_CONFIG    = {'bootstrap.servers' : KAFKA_SERVER_IP,
                      'group.id'          : 'kpi_composer',
@@ -45,12 +45,12 @@ class KpiValueComposer:
    
    @staticmethod
    def compose_kpi():
        KpiValueComposer.run_kafka_listener()
    
    @staticmethod
    def run_kafka_listener():
        threading.Thread(target=KpiValueComposer.kafka_listener, args=()).start()
    
    # @staticmethod
    # def run_kafka_listener():
    #     threading.Thread(target=KpiValueComposer.kafka_listener, args=()).start()
    
    @staticmethod
    def kafka_listener():
        """
@@ -91,17 +91,19 @@ class KpiValueComposer:
                    if kpi_name.endswith('}'):
                        (kpi_name, sub_names) = kpi_name.replace('}','').split('{')
                    print("Extracted row that match the KPI {:}".format((kpi_name, sub_names, kpi_value)))
                    kpi_descriptor = KpiValueComposer.request_kpi_descriptor_from_db()
                    kpi_descriptor = KpiValueComposer.request_kpi_descriptor_from_db(kpi_name)
                    if kpi_descriptor is not None:
                        kpi_to_produce = KpiValueComposer.merge_kpi_descriptor_and_value(kpi_descriptor, kpi_value)
                        producerObj = KafkaProducer(PRODUCER_CONFIG)
                        producerObj.produce(KAFKA_TOPICS['labeled'], key="labeled", value= str(kpi_to_produce), callback=KpiValueComposer.delivery_callback)
                        producerObj.flush()
                    else:
                        print ("No matching of KPI ({:}) found in db".format(kpi_name))
            except Exception as e:
                print("Unable to extract kpi name and value from raw data: ERROR Info: {:}".format(e))
        
    @staticmethod
    def request_kpi_descriptor_from_db(kpi_name: str = KPIs_TO_SEARCH[0]):
    def request_kpi_descriptor_from_db(kpi_name: str = KPIs_TO_SEARCH[0]): #  = KPIs_TO_SEARCH[0] is added for testing
        col_name = "kpi_description"
        kpiDBobj = Kpi_DB()
        row = kpiDBobj.search_db_row_by_id(DB_TABLE_NAME, col_name, kpi_name)
+1 −1
Original line number Diff line number Diff line
@@ -34,7 +34,7 @@ KAFKA_SERVER_IP = '127.0.0.1:9092'
# KAFKA_SERVER_IP    = '10.152.183.175:30092'
ADMIN_KAFKA_CLIENT = AdminClient({'bootstrap.servers': KAFKA_SERVER_IP})
KAFKA_TOPICS       = {'request' : 'topic_request', 'response': 'topic_response',
                      'raw'     : 'topic_raw'    , 'labeled' : 'topic_labled'}
                      'raw'     : 'topic_raw'    , 'labeled' : 'topic_labeled'}
EXPORTER_ENDPOINT  = "http://10.152.183.2:9100/metrics"
PRODUCER_CONFIG    = {'bootstrap.servers': KAFKA_SERVER_IP,}