Commit c0c517f0 authored by Waleed Akbar's avatar Waleed Akbar
Browse files

Minor Changes in New Monitoring Services to Improve Overall Functionality

- Added new tests in the `scripts` folder.
- Improved messages and test files across all services.
Telemetry and Analytics Backend:
- Added a `try` statement to handle Kafka message decoding errors.
Telemetry Frontend and KPI Value API:
- Downgraded `apscheduler` from `3.10.4` to `3.10.1` in the requirements file.
Kafka:
- Added a method to retrieve `KAFKA_SERVER_ADDRESS` from environment variables.
Telemetry Backend:
- Added a condition to allow the collector backend to run indefinitely.
- Changed the extracted `KpiValue` timestamp format to `"%Y-%m-%dT%H:%M:%SZ"` to meet Apache Spark requirements.
- Removed some unused packages from the `requirements.in` file.
Deployment Script:
- Added `"telemetry"` and `"analytics"` component names to the `TFS_COMPONENTS` variable in the `my_deploy.sh` file.
parent 6b136318
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -26,7 +26,7 @@ export TFS_COMPONENTS="context device pathcomp service slice nbi webui load_gene
#export TFS_COMPONENTS="${TFS_COMPONENTS} monitoring"

# Uncomment to activate Monitoring Framework (new)
#export TFS_COMPONENTS="${TFS_COMPONENTS} kpi_manager kpi_value_writer kpi_value_api"
#export TFS_COMPONENTS="${TFS_COMPONENTS} kpi_manager kpi_value_writer kpi_value_api telemetry analytics"

# Uncomment to activate BGP-LS Speaker
#export TFS_COMPONENTS="${TFS_COMPONENTS} bgpls_speaker"
+1 −1
Original line number Diff line number Diff line
@@ -24,7 +24,7 @@ cd $PROJECTDIR/src
# python3 kpi_manager/tests/test_unitary.py

RCFILE=$PROJECTDIR/coverage/.coveragerc
CRDB_SQL_ADDRESS=$(kubectl --namespace ${CRDB_NAMESPACE} get service cockroachdb-public -o 'jsonpath={.spec.clusterIP}')
CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace ${CRDB_NAMESPACE} -o 'jsonpath={.spec.clusterIP}')
export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_kpi_mgmt?sslmode=require"
python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \
    kpi_manager/tests/test_kpi_db.py
+2 −1
Original line number Diff line number Diff line
@@ -20,7 +20,8 @@ cd $PROJECTDIR/src
# RCFILE=$PROJECTDIR/coverage/.coveragerc
# coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \
#     kpi_manager/tests/test_unitary.py

CRDB_SQL_ADDRESS=$(kubectl --namespace ${CRDB_NAMESPACE} get service cockroachdb-public -o 'jsonpath={.spec.clusterIP}')
export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_kpi_mgmt?sslmode=require"
RCFILE=$PROJECTDIR/coverage/.coveragerc
python3 -m pytest --log-level=DEBUG --log-cli-level=debug --verbose \
    telemetry/tests/test_telemetryDB.py
+1 −1
Original line number Diff line number Diff line
@@ -26,4 +26,4 @@ CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace crdb -o js
export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_kpi_mgmt?sslmode=require"
RCFILE=$PROJECTDIR/coverage/.coveragerc
python3 -m pytest --log-level=INFO --log-cli-level=debug --verbose \
    telemetry/backend/tests/test_TelemetryBackend.py
    telemetry/backend/tests/test_backend.py
+17 −14
Original line number Diff line number Diff line
@@ -46,10 +46,10 @@ class AnalyticsBackendService(GenericGrpcService):
        thresholds    = analyzer['thresholds']
        window_size   = analyzer['window_size']
        window_slider = analyzer['window_slider']
        print ("Received parameters: {:} - {:} - {:} - {:} - {:}".format(
            kpi_list, oper_list, thresholds, window_size, window_slider))
        LOGGER.debug ("Received parameters: {:} - {:} - {:} - {:} - {:}".format(
            kpi_list, oper_list, thresholds, window_size, window_slider))
        # print ("Received parameters: {:} - {:} - {:} - {:} - {:}".format(
        #     kpi_list, oper_list, thresholds, window_size, window_slider))
        # LOGGER.debug ("Received parameters: {:} - {:} - {:} - {:} - {:}".format(
        #     kpi_list, oper_list, thresholds, window_size, window_slider))
        try:
            stop_event = threading.Event()
            thread = threading.Thread(target=SparkStreamer, 
@@ -86,6 +86,7 @@ class AnalyticsBackendService(GenericGrpcService):
        listener for requests on Kafka topic.
        """
        LOGGER.info("Request Listener is initiated ...")
        print      ("Request Listener is initiated ...")
        consumer = self.kafka_consumer
        consumer.subscribe([KafkaTopic.ANALYTICS_REQUEST.value])
        while True:
@@ -98,6 +99,7 @@ class AnalyticsBackendService(GenericGrpcService):
                else:
                    print("Consumer error: {}".format(receive_msg.error()))
                    break
            try:
                analyzer      = json.loads(receive_msg.value().decode('utf-8'))
                analyzer_uuid = receive_msg.key().decode('utf-8')
                LOGGER.debug('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer))
@@ -107,8 +109,9 @@ class AnalyticsBackendService(GenericGrpcService):
                    self.TerminateAnalyzerBackend(analyzer_uuid)
                else:
                    self.StartSparkStreamer(analyzer_uuid, analyzer)
        LOGGER.debug("Stop Event activated. Terminating...")
        print       ("Stop Event activated. Terminating...")
            except Exception as e:
                LOGGER.warning("Unable to consume message from topic: {:}. ERROR: {:}".format(KafkaTopic.ANALYTICS_REQUEST.value, e))
                print         ("Unable to consume message from topic: {:}. ERROR: {:}".format(KafkaTopic.ANALYTICS_REQUEST.value, e))

    def TerminateAnalyzerBackend(self, analyzer_uuid):
        if analyzer_uuid in self.running_threads:
Loading