Commit 1e90b149 authored by Waleed Akbar

Initial Telemetry backend and new analytics integration test.

parent 26865f93
3 merge requests: !346 Draft: support for restconf protocol, !345 Draft: support ipinfusion devices via netconf, !317 Resolve "(CTTC) Analytics Module Enhancements"
@@ -24,5 +24,5 @@ export KFK_SERVER_ADDRESS='127.0.0.1:9092'
 CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace crdb -o jsonpath='{.spec.clusterIP}')
 export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_analytics?sslmode=require"
-python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \
+python3 -m pytest --log-level=DEBUG --log-cli-level=INFO --verbose \
     analytics/backend/tests/test_backend.py
@@ -21,5 +21,5 @@ RCFILE=$PROJECTDIR/coverage/.coveragerc
 export KFK_SERVER_ADDRESS='127.0.0.1:9092'
 CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace crdb -o jsonpath='{.spec.clusterIP}')
 export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_analytics?sslmode=require"
-python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \
+python3 -m pytest --log-level=DEBUG --log-cli-level=INFO --verbose \
     analytics/frontend/tests/test_frontend.py
@@ -12,10 +12,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import time
 import json
 import logging
 import threading
+import pytz
 from common.tools.service.GenericGrpcService import GenericGrpcService
 from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
 from confluent_kafka import Consumer as KafkaConsumer
@@ -23,7 +24,11 @@ from confluent_kafka import KafkaError
 from common.Constants import ServiceNameEnum
 from common.Settings import get_service_port_grpc
 from threading import Thread, Event
-# from .DaskStreaming import DaskStreamer
+from analytics.backend.service.Streamer import DaskStreamer
+from common.proto.analytics_frontend_pb2 import Analyzer
+from apscheduler.schedulers.background import BackgroundScheduler
+from datetime import datetime, timedelta

 LOGGER = logging.getLogger(__name__)
@@ -35,13 +40,18 @@ class AnalyticsBackendService(GenericGrpcService):
         LOGGER.info('Init AnalyticsBackendService')
         port = get_service_port_grpc(ServiceNameEnum.ANALYTICSBACKEND)
         super().__init__(port, cls_name=cls_name)
+        self.schedular = BackgroundScheduler(daemon=True)
+        self.schedular.start()
         self.running_threads = {}       # To keep track of all running analyzers
         self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(),
                                              'group.id'          : 'analytics-frontend',
                                              'auto.offset.reset' : 'latest'})

     def install_servicers(self):
-        threading.Thread(target=self.RequestListener, args=()).start()
+        threading.Thread(
+            target=self.RequestListener,
+            args=()
+        ).start()

     def RequestListener(self):
         """
@@ -69,56 +79,53 @@ class AnalyticsBackendService(GenericGrpcService):
                 # print ('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer))
                 if analyzer["algo_name"] is None and analyzer["oper_mode"] is None:
-                    self.StopDaskListener(analyzer_uuid)
+                    self.StopStreamer(analyzer_uuid)
                 else:
-                    self.StartDaskListener(analyzer_uuid, analyzer)
+                    self.StartStreamer(analyzer_uuid, analyzer)
             except Exception as e:
                 LOGGER.warning("Unable to consume message from topic: {:}. ERROR: {:}".format(KafkaTopic.ANALYTICS_REQUEST.value, e))
-                # print ("Unable to consume message from topic: {:}. ERROR: {:}".format(KafkaTopic.ANALYTICS_REQUEST.value, e))

-    def StartDaskListener(self, analyzer_uuid, analyzer):
-        kpi_list      = analyzer[ 'input_kpis'   ]
-        thresholds    = analyzer[ 'thresholds'   ]
-        window_size   = analyzer[ 'window_size'  ]
-        window_slider = analyzer[ 'window_slider']
-        LOGGER.debug ("Received parameters: {:} - {:} - {:} - {:}".format(
-            kpi_list, thresholds, window_size, window_slider))
-        # print ("Received parameters: {:} - {:} - {:} - {:}".format(
-        #     kpi_list, thresholds, window_size, window_slider))
-        try:
-            stop_event = Event()
-            thread = Thread(
-                target=None, # DaskStreamer,
-                # args=(analyzer_uuid, kpi_list, oper_list, thresholds, stop_event),
-                args=(analyzer['output_kpis'][0] , kpi_list, thresholds, stop_event),
-                kwargs={
-                    "window_size" : window_size,
-                }
-            )
-            thread.start()
-            self.running_threads[analyzer_uuid] = (thread, stop_event)
-            # print ("Initiated Analyzer backend: {:}".format(analyzer_uuid))
-            LOGGER.info("Initiated Analyzer backend: {:}".format(analyzer_uuid))
-            return True
-        except Exception as e:
-            # print ("Failed to initiate Analyzer backend: {:}".format(e))
-            LOGGER.error("Failed to initiate Analyzer backend: {:}".format(e))
-            return False
+    def StartStreamer(self, analyzer_uuid : str, analyzer : json):
+        """
+        Start the DaskStreamer with the given parameters.
+        """
+        try:
+            streamer = DaskStreamer(
+                analyzer_uuid,
+                analyzer['input_kpis' ],
+                analyzer['output_kpis'],
+                analyzer['thresholds' ],
+                analyzer['batch_size' ],
+                analyzer['window_size'],
+            )
+            self.schedular.add_job(
+                streamer.run,
+                'date',
+                run_date=datetime.now(pytz.utc),
+                id=analyzer_uuid,
+                replace_existing=True
+            )
+            LOGGER.info("Dask Streamer started.")
+            return True
+        except Exception as e:
+            LOGGER.error("Failed to start Dask Streamer. ERROR: {:}".format(e))
+            return False

-    def StopDaskListener(self, analyzer_uuid):
-        if analyzer_uuid in self.running_threads:
-            try:
-                thread, stop_event = self.running_threads[analyzer_uuid]
-                stop_event.set()
-                thread.join()
-                del self.running_threads[analyzer_uuid]
-                # print ("Terminating backend (by TerminateBackend): Analyzer Id: {:}".format(analyzer_uuid))
-                LOGGER.info("Terminating backend (by TerminateBackend): Analyzer Id: {:}".format(analyzer_uuid))
-                return True
-            except Exception as e:
-                LOGGER.error("Failed to terminate. Analyzer Id: {:} - ERROR: {:}".format(analyzer_uuid, e))
-                return False
-        else:
-            # print ("Analyzer not found in active collectors. Analyzer Id: {:}".format(analyzer_uuid))
-            LOGGER.warning("Analyzer not found in active collectors: Analyzer Id: {:}".format(analyzer_uuid))
+    def StopStreamer(self, analyzer_uuid : str):
+        """
+        Stop the DaskStreamer with the given analyzer_uuid.
+        """
+        try:
+            active_jobs = self.schedular.get_jobs()
+            LOGGER.debug("Active Jobs: {:}".format(active_jobs))
+            if analyzer_uuid not in [job.id for job in active_jobs]:
+                LOGGER.warning("Dask Streamer not found with the given analyzer_uuid: {:}".format(analyzer_uuid))
+                return False
+            self.schedular.remove_job(analyzer_uuid)
+            LOGGER.info("Dask Streamer stopped.")
+            return True
+        except Exception as e:
+            LOGGER.error("Failed to stop Dask Streamer. ERROR: {:}".format(e))
+            return False
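Note on the hunk above: the rewritten StartStreamer/StopStreamer pair replaces the hand-rolled Thread/Event bookkeeping with APScheduler one-shot jobs. The following is a minimal, self-contained sketch of that scheduling pattern, not the service code itself; the worker function and job id are hypothetical placeholders, and only the BackgroundScheduler calls (add_job with a 'date' trigger, get_jobs, remove_job) mirror the ones used above.

# Sketch of the APScheduler pattern used by StartStreamer/StopStreamer.
# 'run_worker' and the job id are hypothetical; only the scheduler calls match the diff.
import time
import logging
from datetime import datetime

import pytz
from apscheduler.schedulers.background import BackgroundScheduler

logging.basicConfig(level=logging.INFO)
LOGGER = logging.getLogger(__name__)

def run_worker():
    # Stand-in for DaskStreamer.run(): a long-running loop executed in a scheduler thread.
    for i in range(5):
        LOGGER.info("worker iteration %d", i)
        time.sleep(1)

if __name__ == '__main__':
    scheduler = BackgroundScheduler(daemon=True)
    scheduler.start()

    # One-shot 'date' trigger: the job fires once, immediately, in a background thread.
    scheduler.add_job(run_worker, 'date', run_date=datetime.now(pytz.utc),
                      id='example-analyzer-uuid', replace_existing=True)

    time.sleep(2)   # let the job fire; a fired 'date' job is dropped from the job store

    # Stopping mirrors StopStreamer: remove_job() on an unknown id raises JobLookupError,
    # so the job list is checked first.
    if 'example-analyzer-uuid' in [job.id for job in scheduler.get_jobs()]:
        scheduler.remove_job('example-analyzer-uuid')
    else:
        LOGGER.warning("job already fired; nothing to remove")

    time.sleep(5)   # let the worker loop finish before shutting the scheduler down
    scheduler.shutdown()

Because a fired 'date' job typically no longer appears in get_jobs(), the get_jobs() check in StopStreamer mainly guards against JobLookupError; removing the job does not by itself interrupt a run() that is already executing.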
@@ -18,7 +18,7 @@ import pandas as pd
 logger = logging.getLogger(__name__)
-logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(funcName)s - %(levelname)s - %(message)s')
+logging.basicConfig(level=logging.INFO, format='%(levelname)s - %(message)s')

 class AnalyzerHandlers(Enum):
     AGGREGATION_HANDLER = "AggregationHandler"
@@ -49,14 +49,14 @@ def threshold_handler(key, aggregated_df, thresholds):
            continue

        # Ensure the threshold values are valid (check for tuple specifically)
-        if isinstance(threshold_values, tuple) and len(threshold_values) == 2:
+        if isinstance(threshold_values, list) and len(threshold_values) == 2:
            fail_th, raise_th = threshold_values

            # Add threshold columns with updated naming
            aggregated_df[f"{metric_name}_TH_RAISE"] = aggregated_df[metric_name] > raise_th
            aggregated_df[f"{metric_name}_TH_FALL"]  = aggregated_df[metric_name] < fail_th
        else:
-            logger.warning(f"Threshold values for '{metric_name}' are not a tuple of length 2. Skipping threshold application.")
+            logger.warning(f"Threshold values for '{metric_name}' ({threshold_values}) are not a tuple of length 2. Skipping threshold application.")
    return aggregated_df

 def aggregation_handler(
@@ -71,7 +71,7 @@ def aggregation_handler(
        logger.info("Empty batch received. Skipping processing.")
        return []
    else:
-        logger.info(f"Processing {len(batch)} records for key: {key}")
+        logger.info(f" >>>>> Processing {len(batch)} records for key: {key}")

        # Convert data into a DataFrame
        df = pd.DataFrame(batch)
...
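As a side note on the isinstance change above (tuple -> list), here is a small self-contained illustration of the list-based threshold format the handler now expects. The KPI names and values are hypothetical; only the comparison logic mirrors threshold_handler.

# Illustration of the list-based thresholds; metric names and numbers are made up.
import pandas as pd

thresholds = {
    'latency_value': [10, 50],   # [fail_th, raise_th]
    'jitter_value' : (1, 5),     # a tuple is now skipped with a warning
}

aggregated_df = pd.DataFrame({'latency_value': [5, 20, 80], 'jitter_value': [0.5, 2, 6]})

for metric_name, threshold_values in thresholds.items():
    if isinstance(threshold_values, list) and len(threshold_values) == 2:
        fail_th, raise_th = threshold_values
        aggregated_df[f"{metric_name}_TH_RAISE"] = aggregated_df[metric_name] > raise_th
        aggregated_df[f"{metric_name}_TH_FALL"]  = aggregated_df[metric_name] < fail_th

print(aggregated_df)
# latency_value_TH_RAISE is True only for the 80 sample; latency_value_TH_FALL only for the 5 sample.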
@@ -24,7 +24,7 @@ from .AnalyzerHelper import AnalyzerHelper
 logging.basicConfig(level=logging.INFO)
 logger = logging.getLogger(__name__)
-logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(funcName)s - %(levelname)s - %(message)s')
+logging.basicConfig(level=logging.INFO, format=' %(levelname)s - %(message)s')

 class DaskStreamer:
@@ -59,7 +59,7 @@ class DaskStreamer:
            if not self.running:
                logger.warning("Dask Streamer is not running. Exiting loop.")
                break
-            message = self.consumer.poll(timeout=2.0)
+            message = self.consumer.poll(timeout=2.0)  # Poll for new messages after 2 seconds
            if message is None:
                # logger.info("No new messages received.")
                continue
...
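For context, the poll call annotated above sits in the usual confluent_kafka consume loop: poll blocks for up to the timeout, returns None when nothing arrived, and returns a message object whose error() must be checked before the value is used. A standalone sketch of that pattern follows; the topic name and broker address are placeholders, not values from this diff.

# Standalone sketch of a confluent_kafka poll loop with a 2-second timeout,
# mirroring the pattern in DaskStreamer.run(); topic and broker address are placeholders.
import logging
from confluent_kafka import Consumer, KafkaError

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

consumer = Consumer({
    'bootstrap.servers' : '127.0.0.1:9092',
    'group.id'          : 'example-consumer',
    'auto.offset.reset' : 'latest',
})
consumer.subscribe(['example_topic'])

try:
    while True:
        message = consumer.poll(timeout=2.0)   # block up to 2 seconds waiting for a record
        if message is None:
            continue                           # nothing arrived within the timeout
        if message.error():
            if message.error().code() == KafkaError._PARTITION_EOF:
                continue                       # end of partition, not a real error
            logger.warning("Consumer error: %s", message.error())
            continue
        logger.info("Received: %s", message.value().decode('utf-8'))
finally:
    consumer.close()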
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import time
 import pytest
 import logging
 import pandas as pd
@@ -21,19 +22,11 @@ from analytics.backend.service.Streamer import DaskStreamer
 from .messages_analyzer import get_batch, get_input_kpi_list, get_output_kpi_list, get_thresholds, \
                               get_windows_size, get_batch_size, get_agg_df
 from analytics.backend.service.AnalyzerHandlers import aggregation_handler, threshold_handler
+from analytics.backend.service.AnalyticsBackendService import AnalyticsBackendService

 logger = logging.getLogger(__name__)
 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(funcName)s - %(levelname)s - %(message)s')

-# --- "test_validate_kafka_topics" should be run before the functionality tests ---
-def test_validate_kafka_topics():
-    logger.debug(" >>> test_validate_kafka_topics: START <<< ")
-    response = KafkaTopic.create_all_topics()
-    assert isinstance(response, bool)
-
-###########################
-# Tests Implementation of Telemetry Backend
-###########################

 @pytest.fixture(autouse=True)
 def log_all_methods(request):
@@ -45,6 +38,30 @@ def log_all_methods(request):
     yield
     logger.info(f" <<< Finished test: {request.node.name} <<< ")

+# --- "test_validate_kafka_topics" should be run before the functionality tests ---
+def test_validate_kafka_topics():
+    logger.debug(" >>> test_validate_kafka_topics: START <<< ")
+    response = KafkaTopic.create_all_topics()
+    assert isinstance(response, bool)
+
+###########################
+# Integration test of Streamer with backend service
+###########################
+def test_backend_integration_with_analyzer():
+    backendServiceObject = AnalyticsBackendService()
+    backendServiceObject.install_servicers()
+    logger.info(" waiting for 2 minutes for the backend service before termination ... ")
+    time.sleep(120)
+    backendServiceObject.StopStreamer("efef4d95-1cf1-43c4-9742-95c283ddd666")
+    logger.info(" Backend service terminated successfully ... ")
+
+###########################
+# Functionality pytest for analyzer sub-methods
+###########################

 @pytest.fixture
 def dask_streamer():
     with patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_dask_client') as mock_dask_client, \
...
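The new integration test above only starts the backend, waits, and calls StopStreamer with a fixed UUID; it relies on an analyzer request having been published to the ANALYTICS_REQUEST topic by some other actor (normally the analytics frontend). Below is a hedged sketch of what such a trigger could look like. The field names come from the StartStreamer code in this commit, but the concrete values and the message framing (in particular, that the analyzer UUID travels as the Kafka message key) are assumptions, not shown in this diff.

# Hedged sketch of publishing an analyzer start request; field names are taken from
# StartStreamer above, while the values and the key/value framing are assumptions.
import json
from confluent_kafka import Producer
from common.tools.kafka.Variables import KafkaConfig, KafkaTopic

analyzer_uuid = "efef4d95-1cf1-43c4-9742-95c283ddd666"   # same UUID the test stops
analyzer = {
    "algo_name"  : "AggregationHandler",                 # hypothetical values
    "oper_mode"  : 1,
    "input_kpis" : ["kpi-1", "kpi-2"],
    "output_kpis": ["kpi-out"],
    "thresholds" : {"latency_value": [10, 50]},
    "batch_size" : 10,
    "window_size": "60s",
}

producer = Producer({'bootstrap.servers': KafkaConfig.get_kafka_address()})
producer.produce(
    KafkaTopic.ANALYTICS_REQUEST.value,
    key   = analyzer_uuid.encode('utf-8'),        # assumed: the listener derives the UUID from the key
    value = json.dumps(analyzer).encode('utf-8'),
)
producer.flush()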