# How to Locally Run and Test the Analytics Frontend Service

## Prerequisites

The following requirements should be fulfilled before executing the Analytics service:
- A virtual environment exists with all the required packages listed in `requirements.in` successfully installed.
- Verify that the required database and tables have been created. The Analytics DB test Python file lists the functions that create the database and tables.
- The Analytics backend service should be running.
- All required Kafka topics must exist. Call `create_all_topics` from the Kafka class to create any topics that do not already exist:

  ```python
  from common.tools.kafka.Variables import KafkaTopic

  KafkaTopic.create_all_topics()
  ```
- An input stream will arrive on the Kafka topic that the Spark streamer consumes, and the defined thresholds are applied to it.
  - Each message should be a JSON-encoded string in the following format:

    ```json
    {"time_stamp": "2024-09-03T12:36:26Z", "kpi_id": "6e22f180-ba28-4641-b190-2287bf448888", "kpi_value": 44.22}
    ```

  - `kpi_value` should be a float or an int.
  - The Kafka producer key should be the UUID of the Analyzer used when creating it.
  - Use the following Kafka topic to generate the stream: `KafkaTopic.ANALYTICS_RESPONSE.value`.
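To sanity-check the message format before producing it to Kafka, the snippet below builds one such record using only the standard library (the values are the sample ones above; the Kafka producer wiring is intentionally omitted):

```python
import json

# Sample values from the format above; in practice the kpi_id must already
# exist in the KPI Descriptor database.
message = {
    "time_stamp": "2024-09-03T12:36:26Z",
    "kpi_id"    : "6e22f180-ba28-4641-b190-2287bf448888",
    "kpi_value" : 44.22,   # must be a float or an int
}

payload = json.dumps(message)  # the JSON-encoded string sent as the Kafka message value

# The producer key should be the analyzer's UUID (example value):
producer_key = "efef4d95-1cf1-43c4-9742-95c283ddd7a6"

print(payload)
```

The resulting `payload` string matches the documented format and can be passed as the message value to any Kafka producer, with `producer_key` as the message key.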
## Steps to Create and Start an Analyzer

The analyzer can be declared as shown below, although there are many other ways to declare it. The object creation process for `_create_analyzer` involves defining an instance of the `Analyzer` message from the gRPC definition and populating its fields:

```python
from common.proto.analytics_frontend_pb2 import AnalyzerId

_create_analyzer_id = AnalyzerId()
```
Here is a breakdown of how each field is populated:

### 1. Analyzer ID

- `analyzer_id`: This field uses a unique ID to identify the analyzer; in this case, the ID is a UUID:

  ```python
  _create_analyzer.analyzer_id.analyzer_id.uuid = "efef4d95-1cf1-43c4-9742-95c283ddd7a6"
  ```

- The commented-out code shows how the UUID can be generated dynamically using Python's `uuid.uuid4()`. However, for now, a static UUID is used.
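For reference, generating the UUID dynamically would look like this (standard `uuid` module; the variable name is illustrative):

```python
import uuid

# A fresh random (version 4) UUID, usable in place of the static value above.
analyzer_uuid = str(uuid.uuid4())

print(analyzer_uuid)
```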
### 2. Algorithm Name

- `algorithm_name`: Specifies the name of the algorithm to be executed by the analyzer:

  ```python
  _create_analyzer.algorithm_name = "Test_Aggergate_and_Threshold"
  ```
### 3. Operation Mode

- `operation_mode`: Sets the mode in which the analyzer operates; in this case, it is set to `ANALYZEROPERATIONMODE_STREAMING`:

  ```python
  _create_analyzer.operation_mode = AnalyzerOperationMode.ANALYZEROPERATIONMODE_STREAMING
  ```
### 4. Input KPI IDs

- `input_kpi_ids`: A list of KPI IDs that will be used as input for the analysis. KPI IDs are represented using `KpiId`, and a UUID is assigned to each input. The Spark streamer assumes that the provided KPIs exist in the KPI Descriptor database:

  ```python
  _kpi_id = KpiId()

  _kpi_id.kpi_id.uuid = "6e22f180-ba28-4641-b190-2287bf448888"
  _create_analyzer.input_kpi_ids.append(_kpi_id)

  _kpi_id.kpi_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666"
  _create_analyzer.input_kpi_ids.append(_kpi_id)
  ```
### 5. Output KPI IDs

- `output_kpi_ids`: A list of KPI IDs that are produced as output after analysis. Each one is generated and appended to the list:

  ```python
  _kpi_id = KpiId()
  _create_analyzer.output_kpi_ids.append(_kpi_id)
  ```
### 6. Parameters

- `parameters`: A dictionary containing key-value pairs of the various parameters used by the analyzer. These values are often algorithm-specific.
  - Thresholds: A dictionary of threshold entries named `<aggregate>_<any_name>`, where the aggregate is one of `min`, `max`, `avg`, `first`, `last`, or `stdev`; for example, `"min_latency"`, `"max_bandwidth"`, `"avg_datarate"`, etc.:

    ```python
    _threshold_dict = {
        'min_latency'  : (00, 10),
        'max_bandwidth': (40, 50),
        'avg_datarate' : (00, 10),
    }
    _create_analyzer.parameters['thresholds'] = json.dumps(_threshold_dict)
    ```
  - Window Size: Specifies the size of the time window (e.g., `60 seconds`):

    ```python
    _create_analyzer.parameters['window_size'] = "60 seconds"
    ```

  - Window Slider: Defines the sliding window interval (e.g., `30 seconds`):

    ```python
    _create_analyzer.parameters['window_slider'] = "30 seconds"
    ```
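To make the window parameters concrete, here is a minimal pure-Python sketch of what a 60-second window with a 30-second slide means when averaging timestamped KPI values (illustration only; the actual aggregation is performed by the Spark streamer, and the function name is hypothetical):

```python
def sliding_window_averages(samples, window_size=60, slider=30):
    """Average (timestamp_seconds, value) samples over sliding windows.

    Windows start at 0, slider, 2*slider, ...; each covers window_size
    seconds. Returns a list of (window_start, average) pairs.
    """
    if not samples:
        return []
    last_ts = max(ts for ts, _ in samples)
    averages = []
    start = 0
    while start <= last_ts:
        in_window = [v for ts, v in samples if start <= ts < start + window_size]
        if in_window:
            averages.append((start, sum(in_window) / len(in_window)))
        start += slider
    return averages

# Four KPI samples spread over ~70 seconds; successive windows overlap
# because the slide (30s) is smaller than the window (60s).
samples = [(0, 4.0), (20, 6.0), (45, 10.0), (70, 2.0)]
print(sliding_window_averages(samples))
```

Because the slide is half the window size, each sample contributes to up to two consecutive windows, which is exactly the overlap configured by `window_size` and `window_slider` above.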
## Calling `StartAnalyzer` with an Analyzer Frontend Object

The following code demonstrates how to call `StartAnalyzer()` with an Analyzer object:

```python
from analytics.frontend.client.AnalyticsFrontendClient import AnalyticsFrontendClient

analytics_client_object = AnalyticsFrontendClient()
analytics_client_object.StartAnalyzer(_create_analyzer)
```
## How to Receive Analyzer Responses

There is a non-gRPC method in the analyzer frontend called `StartResponseListener(<analyzer_uuid>)`. The `analyzer_uuid` is the UUID of the analyzer provided when calling `StartAnalyzer()`. The following code will log the responses:

```python
import logging

from analytics.frontend.service.AnalyticsFrontendServiceServicerImpl import AnalyticsFrontendServiceServicerImpl

LOGGER = logging.getLogger(__name__)

analytic_frontend_service_object = AnalyticsFrontendServiceServicerImpl()
for response in analytic_frontend_service_object.StartResponseListener(<analyzer_uuid>):
    LOGGER.debug(response)
```
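Since `StartResponseListener` is consumed as an iterator, the pattern can be illustrated with a stand-in generator (the generator and its response fields below are hypothetical; only the iterate-and-log shape matches the real call):

```python
import logging

logging.basicConfig(level=logging.DEBUG)
LOGGER = logging.getLogger(__name__)

def fake_response_listener(analyzer_uuid):
    """Hypothetical stand-in for StartResponseListener: yields responses as they arrive."""
    yield {
        "analyzer_uuid": analyzer_uuid,
        "avg_datarate": 12.0,
        "avg_datarate_THRESHOLD_RAISE": True,   # 12.0 exceeds the (0, 10) range
        "avg_datarate_THRESHOLD_FAIL" : False,
    }

responses = []
for response in fake_response_listener("efef4d95-1cf1-43c4-9742-95c283ddd7a6"):
    LOGGER.debug(response)   # the real loop logs each response the same way
    responses.append(response)
```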
## Understanding the Output of the Analyzer

- Output Column Names: The output JSON string will include two keys for each defined threshold. For example, the `min_latency` threshold will generate two keys: `min_latency_THRESHOLD_FAIL` and `min_latency_THRESHOLD_RAISE`.
  - `min_latency_THRESHOLD_FAIL` is triggered if the average latency calculated within the defined window size falls below the specified threshold range.
  - `min_latency_THRESHOLD_RAISE` is triggered if the average latency calculated within the defined window size exceeds the specified threshold range.
- The thresholds `min_latency_THRESHOLD_FAIL` and `min_latency_THRESHOLD_RAISE` will have a value of `TRUE` if activated; otherwise, they will be set to `FALSE`.
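The FAIL/RAISE rule above can be sketched in plain Python (the helper below is illustrative only; the actual flags are computed by the Spark streamer):

```python
def threshold_flags(avg_value, low, high):
    """Return (fail, raise) flags for a windowed average against a (low, high) range.

    fail  -> the average fell below the range  (the *_THRESHOLD_FAIL key).
    raise -> the average exceeded the range    (the *_THRESHOLD_RAISE key).
    """
    fail_flag  = avg_value < low
    raise_flag = avg_value > high
    return fail_flag, raise_flag

# min_latency with threshold range (0, 10), as in the parameters example:
print(threshold_flags(12.5, 0, 10))   # (False, True): the average exceeded the range
```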