Commit 5f72599e authored by Waleed Akbar's avatar Waleed Akbar
Browse files

Changes in Analytics bacnkend and KPI value API

- Analytics backend Request listener will terminated incase topic doesn't exists.
- The GetAlrams() in KpiValueApi is updated to process the new resposne from daskstreamer.
parent 10d86c76
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -18,6 +18,7 @@ PROJECTDIR=`pwd`

cd $PROJECTDIR/src
RCFILE=$PROJECTDIR/coverage/.coveragerc
export KFK_SERVER_ADDRESS='127.0.0.1:9092'
CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace crdb -o jsonpath='{.spec.clusterIP}')
export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_kpi_mgmt?sslmode=require"
python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \
+1 −0
Original line number Diff line number Diff line
@@ -18,6 +18,7 @@ PROJECTDIR=`pwd`

cd $PROJECTDIR/src
RCFILE=$PROJECTDIR/coverage/.coveragerc
export KFK_SERVER_ADDRESS='127.0.0.1:9092'
CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace crdb -o jsonpath='{.spec.clusterIP}')
export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_kpi_mgmt?sslmode=require"
python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \
+3 −2
Original line number Diff line number Diff line
@@ -19,8 +19,9 @@ PROJECTDIR=`pwd`
cd $PROJECTDIR/src

RCFILE=$PROJECTDIR/coverage/.coveragerc
KAFKA_IP=$(docker inspect kafka --format "{{.NetworkSettings.Networks.teraflowbridge.IPAddress}}")
KFK_SERVER_ADDRESS=${KAFKA_IP}:9092
# KAFKA_IP=$(docker inspect kafka --format "{{.NetworkSettings.Networks.teraflowbridge.IPAddress}}")
# export KFK_SERVER_ADDRESS=${KAFKA_IP}:9092
export KFK_SERVER_ADDRESS='127.0.0.1:9092'
# helpful pytest flags: --log-level=INFO -o log_cli=true --verbose --maxfail=1 --durations=0
python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG -o log_cli=true --verbose \
    kpi_value_api/tests/test_kpi_value_api.py
+1 −1
Original line number Diff line number Diff line
@@ -61,7 +61,7 @@ class AnalyticsBackendService(GenericGrpcService):
                else:
                    LOGGER.error("Consumer error: {:}".format(receive_msg.error()))
                    print       ("Consumer error: {:}".format(receive_msg.error()))
                    continue
                    break
            try:
                analyzer      = json.loads(receive_msg.value().decode('utf-8'))
                analyzer_uuid = receive_msg.key().decode('utf-8')
+23 −24
Original line number Diff line number Diff line
@@ -46,7 +46,7 @@ class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer):
        self.scheduler      = BackgroundScheduler()
        self.kafka_producer = KafkaProducer({'bootstrap.servers' : KafkaConfig.get_kafka_address()})
        self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(),
                                            'group.id'           : 'analytics-frontend',
                                            'group.id'           : 'kpi-value-api-frontend',
                                            'auto.offset.reset'  : 'latest'})
        
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
@@ -152,9 +152,7 @@ class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer):

        for alarm_key, value in self.StartResponseListener(request.kpi_id.uuid):
            response.start_timestamp.timestamp = datetime.strptime(
                value["window"]["start"], "%Y-%m-%dT%H:%M:%S.%fZ").timestamp()
            response.end_timestamp.timestamp = datetime.strptime(
                value["window"]["end"], "%Y-%m-%dT%H:%M:%S.%fZ").timestamp()
                value["window_start"], "%Y-%m-%dT%H:%M:%S.%fZ").timestamp()
            response.kpi_id.kpi_id.uuid  = value['kpi_id']
            for key, threshold in value.items():
                if key not in ['kpi_id', 'window']:
@@ -183,10 +181,10 @@ class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer):
                key, value = self.result_queue.get()  # Wait until a result is available
                LOGGER.info("In while true ...")
                yield key, value  # Yield the result to the calling function
        except KeyboardInterrupt:
            LOGGER.warning("Listener stopped manually.")
        except Exception as e:
            LOGGER.warning("Listener stopped. Error: {:}".format(e))
        finally:
            self.StopListener()
            self.scheduler.shutdown()

    def response_listener(self, filter_key=None):
        """
@@ -196,13 +194,14 @@ class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer):

        consumer = self.kafka_consumer
        consumer.subscribe([self.listener_topic])
        msg = consumer.poll(2.0)
        while True:
            msg = consumer.poll(1.0)
            if msg is None:
            return
                continue
            elif msg.error():
                if msg.error().code() != KafkaError._PARTITION_EOF:
                    LOGGER.error(f"Kafka error: {msg.error()}")
            return
                break
            try:
                key = msg.key().decode('utf-8') if msg.key() else None
                if filter_key is not None and key == filter_key: