Commit fba6d749 authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Merge branch...

Merge branch 'feat/194-unable-to-correctly-extract-the-aggregation-function-names-from-the-dictionary-received-as' into 'develop'

Resolve: "Unable to correctly extract the aggregation function names from the dictionary received as parameters in the Analyzer message"

See merge request !266
parents af298c82 447bb2e2
Loading
Loading
Loading
Loading
+10 −2
Original line number Diff line number Diff line
@@ -21,6 +21,7 @@ import "kpi_manager.proto";
service KpiValueAPIService {
	rpc StoreKpiValues  (KpiValueList)      returns (context.Empty)    {}
	rpc SelectKpiValues (KpiValueFilter)    returns (KpiValueList)     {}
  rpc GetKpiAlarms    (kpi_manager.KpiId) returns (stream KpiAlarms) {}
}

message KpiValue {
@@ -50,3 +51,10 @@ message KpiValueFilter {
	repeated context.Timestamp start_timestamp = 2;
	repeated context.Timestamp end_timestamp   = 3;
}

message KpiAlarms {
  context.Timestamp start_timestamp = 1;
  context.Timestamp end_timestamp   = 2;
  kpi_manager.KpiId kpi_id          = 3;
  map<string, bool> alarms          = 4;
}
+116 −2
Original line number Diff line number Diff line
# How to locally run and test Analytic service (To be added soon)
# How to Locally Run and Test Analytic Frontend Service

### Pre-requisets 
The following requirements should be fulfilled before the execuation of Telemetry service.
The following requirements should be fulfilled before the execuation of Analytics service.

1. A virtual enviornment exist with all the required packages listed in [requirements.in](https://labs.etsi.org/rep/tfs/controller/-/blob/develop/src/analytics/frontend/requirements.in) sucessfully  installed.
2. Verify the creation of required database and table. The 
[Analytics DB test](https://labs.etsi.org/rep/tfs/controller/-/blob/develop/src/analytics/tests/test_analytics_db.py) python file lists the functions to create tables and the database.
3. The Analytics backend service should be running.

4. All required Kafka topics must exist. Call `create_all_topics` from the [Kafka class](https://labs.etsi.org/rep/tfs/controller/-/blob/develop/src/common/tools/kafka/Variables.py) to create any topics that do not already exist.
```
from common.tools.kafka.Variables import KafkaTopic
KafkaTopic.create_all_topics()
```

5. There will be an input stream on the Kafka topic that the Spark Streamer will consume and apply a defined thresholds.
- A JSON encoded string should be generated in the following format:
```
'{"time_stamp": "2024-09-03T12:36:26Z", "kpi_id": "6e22f180-ba28-4641-b190-2287bf448888", "kpi_value": 44.22}'
```
- `kpi_value` should be float or int.
- The Kafka producer key should be the UUID of the Analyzer used when creating it.
- Use the following Kafka topic to generate the stream: `KafkaTopic.ANALYTICS_RESPONSE.value`.

## Steps to create and start Analyzer
The analyzer can be declared as below but there are many other ways to declare:

The given object creation process for `_create_analyzer` involves defining an instance of the `Analyzer` message from the [gRPC definition](https://labs.etsi.org/rep/tfs/controller/-/blob/feat/194-unable-to-correctly-extract-the-aggregation-function-names-from-the-dictionary-received-as/proto/analytics_frontend.proto) and populating its fields.

```
from common.proto.analytics_frontend_pb2 import AnalyzerId
_create_analyzer_id = AnalyzerId()
```

Here is a breakdown of how each field is populated:

### 1. **Analyzer ID**
   - `analyzer_id`: This field uses a unique ID to identify the analyzer. In this case, the ID is a UUID.
     ```python
     _create_analyzer.analyzer_id.analyzer_id.uuid = "efef4d95-1cf1-43c4-9742-95c283ddd7a6"
     ```
   - The commented-out code shows how the UUID can be generated dynamically using Python's `uuid.uuid4()`. However, for now, a static UUID is used.

### 2. **Algorithm Name**
   - `algorithm_name`: Specifies the name of the algorithm to be executed by the analyzer.
     ```python
     _create_analyzer.algorithm_name = "Test_Aggergate_and_Threshold"
     ```

### 3. **Operation Mode**
   - `operation_mode`: Sets the mode in which the analyzer operates, in this case, it's set to `ANALYZEROPERATIONMODE_STREAMING`.
     ```python
     _create_analyzer.operation_mode = AnalyzerOperationMode.ANALYZEROPERATIONMODE_STREAMING
     ```

### 4. **Input KPI IDs**
   - `input_kpi_ids`: This is a list of KPI IDs that will be used as input for the analysis. KPI IDs are represented using `KpiId`, and UUIDs are assigned to each input. The Spark streamer assume that the provided KPIs exists in the KPI Descriptor database.
     ```python
     _kpi_id = KpiId()
     _kpi_id.kpi_id.uuid = "6e22f180-ba28-4641-b190-2287bf448888"
     _create_analyzer.input_kpi_ids.append(_kpi_id)
     
     _kpi_id.kpi_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666"
     _create_analyzer.input_kpi_ids.append(_kpi_id)
     ```

### 5. **Output KPI IDs**
   - `output_kpi_ids`: A list of KPI IDs that are produced as output after analysis. Each one is generated and appended to the list.
     ```python
     _kpi_id = KpiId()
     _create_analyzer.output_kpi_ids.append(_kpi_id)
     ```

### 6. **Parameters**
   - `parameters`: This is a dictionary containing key-value pairs of various parameters used by the analyzer. These values are often algorithm-specific.
     - **Thresholds**: A dictionary containing threshold possible values (min, max, avg, first, last, stdev)_<any_name>. For example: "min_latency", "max_bandwidth", "avg_datarate" etc. 
       ```python
       _threshold_dict = {
           'min_latency' : (00, 10), 
           'max_bandwidth': (40, 50), 
           'avg_datarate': (00, 10)
       }
       _create_analyzer.parameters['thresholds'] = json.dumps(_threshold_dict)
       ```
     - **Window Size**: Specifies the size of the time window (e.g., `60 seconds`).
       ```python
       _create_analyzer.parameters['window_size'] = "60 seconds"
       ```
     - **Window Slider**: Defines the sliding window interval (e.g., `30 seconds`).
       ```python
       _create_analyzer.parameters['window_slider'] = "30 seconds"
       ```

###  **Calling `StartAnalyzer` with an Analyzer Frontend Object**
   - The following code demonstrates how to call `StartAnalyzer()` with an Analyzer object:
```python
from analytics.frontend.client.AnalyticsFrontendClient import AnalyticsFrontendClient

analytics_client_object = AnalyticsFrontendClient()
analytics_client_object.StartAnalyzer(_create_analyzer_id)
```

### **How to Receive Analyzer Responses**
   - There is a non-gRPC method in the analyzer frontend called `StartResponseListener(<analyzer_uuid>)`. The `analyzer_uuid` is the UUID of the analyzer provided when calling `StartAnalyzer()`. The following code will log the responses:
```python
from analytics.frontend.service.AnalyticsFrontendServiceServicerImpl import AnalyticsFrontendServiceServicerImpl

analytic_frontend_service_object = AnalyticsFrontendServiceServicerImpl()
for response in analytic_frontend_service_object.StartResponseListener(<analyzer_uuid>):
    LOGGER.debug(response)
```

### **Understanding the Output of the Analyzer**
- **Output Column Names**: The output JSON string will include two keys for each defined threshold. For example, the `min_latency` threshold will generate two keys: `min_latency_THRESHOLD_FAIL` and `min_latency_THRESHOLD_RAISE`.
  - `min_latency_THRESHOLD_FAIL` is triggered if the average latency calculated within the defined window size is less than the specified threshold range.
  - `min_latency_THRESHOLD_RAISE` is triggered if the average latency calculated within the defined window size exceeds the specified threshold range.
- The thresholds `min_latency_THRESHOLD_FAIL` and `min_latency_THRESHOLD_RAISE` will have a value of `TRUE` if activated; otherwise, they will be set to `FALSE`.
+9 −0
Original line number Diff line number Diff line
@@ -53,6 +53,15 @@ RUN python3 -m grpc_tools.protoc -I=. --python_out=. --grpc_python_out=. *.proto
RUN rm *.proto
RUN find . -type f -exec sed -i -E 's/(import\ .*)_pb2/from . \1_pb2/g' {} \;

# Install Java (required for PySpark)
RUN apt-get update && \
    apt-get install -y default-jdk && \
    apt-get clean

# Set JAVA_HOME environment variable
ENV JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
ENV PATH=$JAVA_HOME/bin:$PATH

# Create component sub-folders, get specific Python packages
RUN mkdir -p /var/teraflow/analytics/backend
WORKDIR /var/teraflow/analytics/backend
+7 −10
Original line number Diff line number Diff line
@@ -42,7 +42,7 @@ class AnalyticsBackendService(GenericGrpcService):

    def StartSparkStreamer(self, analyzer_uuid, analyzer):
        kpi_list      = analyzer['input_kpis'] 
        oper_list     = [s.replace('_value', '') for s in list(analyzer["thresholds"].keys())]  # TODO: update this line...
        oper_list     = [s.split('_', 1)[0] for s in list(analyzer["thresholds"].keys())]  # TODO: update this line...
        thresholds    = analyzer['thresholds']
        window_size   = analyzer['window_size']
        window_slider = analyzer['window_slider']
@@ -78,20 +78,17 @@ class AnalyticsBackendService(GenericGrpcService):
            LOGGER.error("Failed to terminate analytics backend {:}".format(e))
            return False

    def install_services(self):
        stop_event = threading.Event()
        thread = threading.Thread(target=self.RequestListener,
                                  args=(stop_event,) )
        thread.start()
        return (thread, stop_event)
    def install_servicers(self):
        threading.Thread(target=self.RequestListener, args=()).start()

    def RequestListener(self, stop_event):
    def RequestListener(self):
        """
        listener for requests on Kafka topic.
        """
        LOGGER.info("Request Listener is initiated ...")
        consumer = self.kafka_consumer
        consumer.subscribe([KafkaTopic.ANALYTICS_REQUEST.value])
        while not stop_event.is_set():
        while True:
            receive_msg = consumer.poll(2.0)
            if receive_msg is None:
                continue
+4 −4
Original line number Diff line number Diff line
@@ -33,7 +33,7 @@ def SettingKafkaConsumerParams(): # TODO: create get_kafka_consumer() in comm
    return {
            # "kafka.bootstrap.servers": '127.0.0.1:9092',
            "kafka.bootstrap.servers": KafkaConfig.get_kafka_address(),
            "subscribe"              : KafkaTopic.VALUE.value,
            "subscribe"              : KafkaTopic.VALUE.value,         # topic should have atleast one message before spark session 
            "startingOffsets"        : 'latest',
            "failOnDataLoss"         : 'false'              # Optional: Set to "true" to fail the query on data loss
        }
@@ -64,7 +64,7 @@ def ApplyThresholds(aggregated_df, thresholds):
    for col_name, (fail_th, raise_th) in thresholds.items():
        # Apply TH-Fail condition (if column value is less than the fail threshold)
        aggregated_df = aggregated_df.withColumn(
            f"{col_name}_THRESHOLD_FAIL", 
            f"{col_name}_THRESHOLD_FALL", 
            when(col(col_name) < fail_th, True).otherwise(False)
        )
        # Apply TH-RAISE condition (if column value is greater than the raise threshold)
@@ -128,11 +128,11 @@ def SparkStreamer(key, kpi_list, oper_list, thresholds, stop_event,

        # --- This will write output to Kafka: ACTUAL IMPLEMENTATION
        query = thresholded_stream_data \
            .selectExpr(f"'{key}' AS key", "to_json(struct(*)) AS value") \
            .selectExpr(f"CAST(kpi_id AS STRING) AS key", "to_json(struct(*)) AS value") \
            .writeStream \
            .format("kafka") \
            .option("kafka.bootstrap.servers", KafkaConfig.get_kafka_address()) \
            .option("topic",                   KafkaTopic.ANALYTICS_RESPONSE.value) \
            .option("topic",                   KafkaTopic.ALARMS.value) \
            .option("checkpointLocation",      "analytics/.spark/checkpoint") \
            .outputMode("update")

Loading