#!/bin/bash
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
PROJECTDIR=`pwd`
cd $PROJECTDIR/src
RCFILE=$PROJECTDIR/coverage/.coveragerc
export KFK_SERVER_ADDRESS='127.0.0.1:9092'
CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace crdb -o jsonpath='{.spec.clusterIP}')
export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_analytics?sslmode=require"
python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \
analytics/backend/tests/test_backend.py
@@ -18,7 +18,8 @@ PROJECTDIR=`pwd`
cd $PROJECTDIR/src
RCFILE=$PROJECTDIR/coverage/.coveragerc
export KFK_SERVER_ADDRESS='127.0.0.1:9092'
CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace crdb -o jsonpath='{.spec.clusterIP}')
export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_kpi_mgmt?sslmode=require"
export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_analytics?sslmode=require"
python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \
analytics/frontend/tests/test_frontend.py
@@ -19,8 +19,9 @@ PROJECTDIR=`pwd`
cd $PROJECTDIR/src
RCFILE=$PROJECTDIR/coverage/.coveragerc
KAFKA_IP=$(docker inspect kafka --format "{{.NetworkSettings.Networks.teraflowbridge.IPAddress}}")
KFK_SERVER_ADDRESS=${KAFKA_IP}:9092
# KAFKA_IP=$(docker inspect kafka --format "{{.NetworkSettings.Networks.teraflowbridge.IPAddress}}")
# export KFK_SERVER_ADDRESS=${KAFKA_IP}:9092
export KFK_SERVER_ADDRESS='127.0.0.1:9092'
# helpful pytest flags: --log-level=INFO -o log_cli=true --verbose --maxfail=1 --durations=0
python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG -o log_cli=true --verbose \
kpi_value_api/tests/test_kpi_value_api.py
@@ -18,6 +18,7 @@ PROJECTDIR=`pwd`
cd $PROJECTDIR/src
export KFK_SERVER_ADDRESS='127.0.0.1:9092'
RCFILE=$PROJECTDIR/coverage/.coveragerc
python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \
kpi_value_writer/tests/test_kpi_value_writer.py
@@ -21,7 +21,7 @@ cd $PROJECTDIR/src
# coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \
# kpi_manager/tests/test_unitary.py
CRDB_SQL_ADDRESS=$(kubectl get service --namespace ${CRDB_NAMESPACE} cockroachdb-public -o 'jsonpath={.spec.clusterIP}')
export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_kpi_mgmt?sslmode=require"
export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_telemetry?sslmode=require"
RCFILE=$PROJECTDIR/coverage/.coveragerc
python3 -m pytest --log-level=DEBUG --log-cli-level=debug --verbose \
telemetry/tests/test_telemetryDB.py
@@ -19,7 +19,12 @@ PROJECTDIR=`pwd`
cd $PROJECTDIR/src
# RCFILE=$PROJECTDIR/coverage/.coveragerc
# coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \
# kpi_manager/tests/test_unitary.py
# python3 kpi_manager/tests/test_unitary.py
export KFK_SERVER_ADDRESS='127.0.0.1:9092'
CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace crdb -o jsonpath='{.spec.clusterIP}')
export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_telemetry?sslmode=require"
RCFILE=$PROJECTDIR/coverage/.coveragerc
@@ -18,9 +18,10 @@ PROJECTDIR=`pwd`
cd $PROJECTDIR/src
CRDB_SQL_ADDRESS=$(kubectl get service --namespace ${CRDB_NAMESPACE} cockroachdb-public -o 'jsonpath={.spec.clusterIP}')
export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_kpi_mgmt?sslmode=require"
# python3 kpi_manager/tests/test_unitary.py
export KFK_SERVER_ADDRESS='127.0.0.1:9092'
CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace crdb -o jsonpath='{.spec.clusterIP}')
export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_telemetry?sslmode=require"
RCFILE=$PROJECTDIR/coverage/.coveragerc
python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \
telemetry/frontend/tests/test_frontend.py
@@ -24,4 +24,4 @@ export TFS_K8S_NAMESPACE=${TFS_K8S_NAMESPACE:-"tfs"}
# Automated steps start here
########################################################################################################################
kubectl --namespace $TFS_K8S_NAMESPACE logs deployment/ztpservice
kubectl --namespace $TFS_K8S_NAMESPACE logs deployment/automationservice
#!/bin/bash
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
########################################################################################################################
# Define your deployment settings here
########################################################################################################################
# If not already set, set the name of the Kubernetes namespace to deploy to.
export TFS_K8S_NAMESPACE=${TFS_K8S_NAMESPACE:-"tfs"}
########################################################################################################################
# Automated steps start here
########################################################################################################################
kubectl --namespace $TFS_K8S_NAMESPACE logs deployment/qos_profileservice -c server
#!/bin/bash
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
########################################################################################################################
# Define your deployment settings here
########################################################################################################################
# If not already set, set the name of the Kubernetes namespace to deploy to.
export TFS_K8S_NAMESPACE=${TFS_K8S_NAMESPACE:-"tfs"}
########################################################################################################################
# Automated steps start here
########################################################################################################################
kubectl --namespace $TFS_K8S_NAMESPACE logs deployment/telemetryservice -c backend
#!/bin/bash
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
########################################################################################################################
# Define your deployment settings here
########################################################################################################################
# If not already set, set the name of the Kubernetes namespace to deploy to.
export TFS_K8S_NAMESPACE=${TFS_K8S_NAMESPACE:-"tfs"}
########################################################################################################################
# Automated steps start here
########################################################################################################################
kubectl --namespace $TFS_K8S_NAMESPACE logs deployment/telemetryservice -c frontend
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Build, tag, and push the Docker image to the GitLab Docker registry
build analytics:
variables:
IMAGE_NAME: 'analytics' # name of the microservice
IMAGE_TAG: 'latest' # tag of the container image (production, development, etc)
stage: build
before_script:
- docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY
script:
# This first build tags the builder resulting image to prevent being removed by dangling image removal command
# - docker buildx build -t "${IMAGE_NAME}-backend:${IMAGE_TAG}-builder" --target builder -f ./src/$IMAGE_NAME/backend/Dockerfile .
- docker buildx build -t "${IMAGE_NAME}-frontend:$IMAGE_TAG" -f ./src/$IMAGE_NAME/frontend/Dockerfile .
- docker buildx build -t "${IMAGE_NAME}-backend:$IMAGE_TAG" -f ./src/$IMAGE_NAME/backend/Dockerfile .
- docker tag "${IMAGE_NAME}-frontend:$IMAGE_TAG" "$CI_REGISTRY_IMAGE/${IMAGE_NAME}-frontend:$IMAGE_TAG"
- docker tag "${IMAGE_NAME}-backend:$IMAGE_TAG" "$CI_REGISTRY_IMAGE/${IMAGE_NAME}-backend:$IMAGE_TAG"
- docker push "$CI_REGISTRY_IMAGE/${IMAGE_NAME}-frontend:$IMAGE_TAG"
- docker push "$CI_REGISTRY_IMAGE/${IMAGE_NAME}-backend:$IMAGE_TAG"
after_script:
- docker images --filter="dangling=true" --quiet | xargs -r docker rmi
rules:
- if: '$CI_PIPELINE_SOURCE == "merge_request_event" && ($CI_MERGE_REQUEST_TARGET_BRANCH_NAME == "develop" || $CI_MERGE_REQUEST_TARGET_BRANCH_NAME == $CI_DEFAULT_BRANCH)'
- if: '$CI_PIPELINE_SOURCE == "push" && $CI_COMMIT_BRANCH == "develop"'
- changes:
- src/common/**/*.py
- proto/*.proto
- src/$IMAGE_NAME/.gitlab-ci.yml
- src/$IMAGE_NAME/frontend/**/*.{py,in,yml}
- src/$IMAGE_NAME/frontend/Dockerfile
- src/$IMAGE_NAME/frontend/tests/*.py
- src/$IMAGE_NAME/backend/Dockerfile
- src/$IMAGE_NAME/backend/**/*.{py,in,yml}
- src/$IMAGE_NAME/backend/tests/*.py
- manifests/${IMAGE_NAME}service.yaml
- .gitlab-ci.yml
# Apply unit test to the component
unit_test analytics-backend:
variables:
IMAGE_NAME: 'analytics' # name of the microservice
IMAGE_TAG: 'latest' # tag of the container image (production, development, etc)
stage: unit_test
needs:
- build analytics
before_script:
- docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY
- if docker network list | grep teraflowbridge; then echo "teraflowbridge is already created"; else docker network create -d bridge teraflowbridge; fi
- if docker container ls | grep kafka; then docker rm -f kafka; else echo "Kafka container is not in the system"; fi
- if docker container ls | grep zookeeper; then docker rm -f zookeeper; else echo "Zookeeper container is not in the system"; fi
# - if docker container ls | grep ${IMAGE_NAME}-frontend; then docker rm -f ${IMAGE_NAME}-frontend; else echo "${IMAGE_NAME}-frontend container is not in the system"; fi
- if docker container ls | grep ${IMAGE_NAME}-backend; then docker rm -f ${IMAGE_NAME}-backend; else echo "${IMAGE_NAME}-backend container is not in the system"; fi
- docker container prune -f
script:
- docker pull "$CI_REGISTRY_IMAGE/${IMAGE_NAME}-backend:$IMAGE_TAG"
- docker pull "bitnami/zookeeper:latest"
- docker pull "bitnami/kafka:latest"
- >
docker run --name zookeeper -d --network=teraflowbridge -p 2181:2181
--env ALLOW_ANONYMOUS_LOGIN=yes
bitnami/zookeeper:latest
- sleep 10 # Wait for Zookeeper to start
- >
docker run --name kafka -d --network=teraflowbridge -p 9092:9092
--env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
--env ALLOW_PLAINTEXT_LISTENER=yes
bitnami/kafka:latest
- sleep 20 # Wait for Kafka to start
- KAFKA_IP=$(docker inspect kafka --format "{{.NetworkSettings.Networks.teraflowbridge.IPAddress}}")
- echo $KAFKA_IP
- >
docker run --name $IMAGE_NAME-backend -d -p 30060:30060
--env "KFK_SERVER_ADDRESS=${KAFKA_IP}:9092"
--volume "$PWD/src/$IMAGE_NAME/backend/tests:/opt/results"
--network=teraflowbridge
$CI_REGISTRY_IMAGE/${IMAGE_NAME}-backend:$IMAGE_TAG
- docker ps -a
- sleep 5
- docker logs ${IMAGE_NAME}-backend
- >
docker exec -i ${IMAGE_NAME}-backend bash -c
"coverage run -m pytest --log-level=INFO --verbose --junitxml=/opt/results/${IMAGE_NAME}-backend_report.xml $IMAGE_NAME/backend/tests/test_*.py"
- docker exec -i ${IMAGE_NAME}-backend bash -c "coverage report --include='${IMAGE_NAME}/*' --show-missing"
coverage: '/TOTAL\s+\d+\s+\d+\s+(\d+%)/'
after_script:
- docker rm -f ${IMAGE_NAME}-backend
- docker rm -f kafka
- docker rm -f zookeeper
- docker network rm teraflowbridge
- docker volume prune --force
- docker image prune --force
rules:
- if: '$CI_PIPELINE_SOURCE == "merge_request_event" && ($CI_MERGE_REQUEST_TARGET_BRANCH_NAME == "develop" || $CI_MERGE_REQUEST_TARGET_BRANCH_NAME == $CI_DEFAULT_BRANCH)'
- if: '$CI_PIPELINE_SOURCE == "push" && $CI_COMMIT_BRANCH == "develop"'
- changes:
- src/common/**/*.py
- proto/*.proto
- src/$IMAGE_NAME/backend/**/*.{py,in,yml}
- src/$IMAGE_NAME/backend/Dockerfile
- src/$IMAGE_NAME/backend/tests/*.py
- manifests/${IMAGE_NAME}service.yaml
- .gitlab-ci.yml
artifacts:
when: always
reports:
junit: src/$IMAGE_NAME/backend/tests/${IMAGE_NAME}-backend_report.xml
# Apply unit test to the component
unit_test analytics-frontend:
variables:
IMAGE_NAME: 'analytics' # name of the microservice
IMAGE_TAG: 'latest' # tag of the container image (production, development, etc)
stage: unit_test
needs:
- build analytics
before_script:
- docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY
- if docker network list | grep teraflowbridge; then echo "teraflowbridge is already created"; else docker network create -d bridge teraflowbridge; fi
- if docker container ls | grep crdb; then docker rm -f crdb; else echo "CockroachDB container is not in the system"; fi
- if docker volume ls | grep crdb; then docker volume rm -f crdb; else echo "CockroachDB volume is not in the system"; fi
- if docker container ls | grep kafka; then docker rm -f kafka; else echo "Kafka container is not in the system"; fi
- if docker container ls | grep zookeeper; then docker rm -f zookeeper; else echo "Zookeeper container is not in the system"; fi
- if docker container ls | grep ${IMAGE_NAME}-frontend; then docker rm -f ${IMAGE_NAME}-frontend; else echo "${IMAGE_NAME}-frontend container is not in the system"; fi
- docker container prune -f
script:
- docker pull "$CI_REGISTRY_IMAGE/${IMAGE_NAME}-frontend:$IMAGE_TAG"
- docker pull "bitnami/zookeeper:latest"
- docker pull "bitnami/kafka:latest"
- docker pull "cockroachdb/cockroach:latest-v22.2"
- docker volume create crdb
- >
docker run --name crdb -d --network=teraflowbridge -p 26257:26257 -p 8080:8080
--env COCKROACH_DATABASE=tfs_test --env COCKROACH_USER=tfs --env COCKROACH_PASSWORD=tfs123
--volume "crdb:/cockroach/cockroach-data"
cockroachdb/cockroach:latest-v22.2 start-single-node
- echo "Waiting for initialization..."
- while ! docker logs crdb 2>&1 | grep -q 'finished creating default user \"tfs\"'; do sleep 1; done
# - docker logs crdb
# - docker ps -a
- CRDB_ADDRESS=$(docker inspect crdb --format "{{.NetworkSettings.Networks.teraflowbridge.IPAddress}}")
- echo $CRDB_ADDRESS
- >
docker run --name zookeeper -d --network=teraflowbridge -p 2181:2181
--env ALLOW_ANONYMOUS_LOGIN=yes
bitnami/zookeeper:latest
- sleep 10 # Wait for Zookeeper to start
- >
docker run --name kafka -d --network=teraflowbridge -p 9092:9092
--env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
--env ALLOW_PLAINTEXT_LISTENER=yes
bitnami/kafka:latest
- sleep 20 # Wait for Kafka to start
- KAFKA_IP=$(docker inspect kafka --format "{{.NetworkSettings.Networks.teraflowbridge.IPAddress}}")
- echo $KAFKA_IP
- docker logs zookeeper
- docker logs kafka
- >
docker run --name $IMAGE_NAME-frontend -d -p 30050:30050
--env "CRDB_URI=cockroachdb://tfs:tfs123@${CRDB_ADDRESS}:26257/tfs_test?sslmode=require"
--env "KFK_SERVER_ADDRESS=${KAFKA_IP}:9092"
--volume "$PWD/src/$IMAGE_NAME/frontend/tests:/opt/results"
--network=teraflowbridge
$CI_REGISTRY_IMAGE/${IMAGE_NAME}-frontend:$IMAGE_TAG
- docker ps -a
- sleep 5
- docker logs ${IMAGE_NAME}-frontend
- >
docker exec -i ${IMAGE_NAME}-frontend bash -c
"coverage run -m pytest --log-level=INFO --verbose --junitxml=/opt/results/${IMAGE_NAME}-frontend_report.xml $IMAGE_NAME/frontend/tests/test_*.py"
- docker exec -i ${IMAGE_NAME}-frontend bash -c "coverage report --include='${IMAGE_NAME}/*' --show-missing"
coverage: '/TOTAL\s+\d+\s+\d+\s+(\d+%)/'
after_script:
- docker rm -f ${IMAGE_NAME}-frontend
- docker rm -f zookeeper
- docker rm -f kafka
- docker volume rm -f crdb
- docker volume prune --force
- docker image prune --force
- docker network rm teraflowbridge
rules:
- if: '$CI_PIPELINE_SOURCE == "merge_request_event" && ($CI_MERGE_REQUEST_TARGET_BRANCH_NAME == "develop" || $CI_MERGE_REQUEST_TARGET_BRANCH_NAME == $CI_DEFAULT_BRANCH)'
- if: '$CI_PIPELINE_SOURCE == "push" && $CI_COMMIT_BRANCH == "develop"'
- changes:
- src/common/**/*.py
- proto/*.proto
- src/$IMAGE_NAME/frontend/**/*.{py,in,yml}
- src/$IMAGE_NAME/frontend/Dockerfile
- src/$IMAGE_NAME/frontend/tests/*.py
- manifests/${IMAGE_NAME}service.yaml
- .gitlab-ci.yml
artifacts:
when: always
reports:
junit: src/$IMAGE_NAME/frontend/tests/${IMAGE_NAME}-frontend_report.xml
# How to locally run and test Analytic service (To be added soon)
# How to Locally Run and Test Analytic Frontend Service
### Prerequisites
The following requirements should be fulfilled before the execution of the Telemetry service.
The following requirements should be fulfilled before the execution of the Analytics service.
1. A virtual environment exists with all the required packages listed in [requirements.in](https://labs.etsi.org/rep/tfs/controller/-/blob/develop/src/analytics/frontend/requirements.in) successfully installed.
2. Verify the creation of the required database and tables. The
[Analytics DB test](https://labs.etsi.org/rep/tfs/controller/-/blob/develop/src/analytics/tests/test_analytics_db.py) Python file lists the functions that create the database and tables.
3. The Analytics backend service should be running.
4. All required Kafka topics must exist. Call `create_all_topics` from the [Kafka class](https://labs.etsi.org/rep/tfs/controller/-/blob/develop/src/common/tools/kafka/Variables.py) to create any topics that do not already exist.
```python
from common.tools.kafka.Variables import KafkaTopic
KafkaTopic.create_all_topics()
```
5. There must be an input stream on the Kafka topic, which the Spark Streamer will consume and to which it will apply the defined thresholds (a producer sketch is given after this list).
- A JSON-encoded string should be generated in the following format:
```
'{"time_stamp": "2024-09-03T12:36:26Z", "kpi_id": "6e22f180-ba28-4641-b190-2287bf448888", "kpi_value": 44.22}'
```
- `kpi_value` should be a float or an int.
- The Kafka producer key should be the UUID of the Analyzer, assigned when the Analyzer was created.
- Use the following Kafka topic to generate the stream: `KafkaTopic.ANALYTICS_RESPONSE.value`.
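Below is a minimal producer sketch for generating that input stream, assuming the environment resolves the Kafka address via `KafkaConfig.get_kafka_address()` (e.g. `KFK_SERVER_ADDRESS` set as in the test scripts); the KPI ID and the analyzer UUID used as the message key are placeholders:
```python
import json, time
from confluent_kafka import Producer
from common.tools.kafka.Variables import KafkaConfig, KafkaTopic

producer = Producer({'bootstrap.servers': KafkaConfig.get_kafka_address()})
record = {
    "time_stamp": time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime()),  # current UTC time
    "kpi_id"    : "6e22f180-ba28-4641-b190-2287bf448888",              # placeholder KPI UUID
    "kpi_value" : 44.22,
}
producer.produce(
    KafkaTopic.ANALYTICS_RESPONSE.value,                # topic named in the bullet above
    key   = "efef4d95-1cf1-43c4-9742-95c283ddd7a6",     # placeholder Analyzer UUID (message key)
    value = json.dumps(record),
)
producer.flush()
```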
## Steps to create and start Analyzer
The analyzer can be declared as shown below, although there are many other ways to declare it:
The given object creation process for `_create_analyzer` involves defining an instance of the `Analyzer` message from the [gRPC definition](https://labs.etsi.org/rep/tfs/controller/-/blob/feat/194-unable-to-correctly-extract-the-aggregation-function-names-from-the-dictionary-received-as/proto/analytics_frontend.proto) and populating its fields.
```python
from common.proto.analytics_frontend_pb2 import AnalyzerId
_create_analyzer_id = AnalyzerId()
```
Here is a breakdown of how each field is populated:
### 1. **Analyzer ID**
- `analyzer_id`: This field uses a unique ID to identify the analyzer. In this case, the ID is a UUID.
```python
_create_analyzer.analyzer_id.analyzer_id.uuid = "efef4d95-1cf1-43c4-9742-95c283ddd7a6"
```
- The commented-out code shows how the UUID can be generated dynamically using Python's `uuid.uuid4()`. However, for now, a static UUID is used.
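For reference, the dynamic variant simply assigns a freshly generated UUID:
```python
import uuid
_create_analyzer.analyzer_id.analyzer_id.uuid = str(uuid.uuid4())
```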
### 2. **Algorithm Name**
- `algorithm_name`: Specifies the name of the algorithm to be executed by the analyzer.
```python
_create_analyzer.algorithm_name = "Test_Aggergate_and_Threshold"
```
### 3. **Operation Mode**
- `operation_mode`: Sets the mode in which the analyzer operates, in this case, it's set to `ANALYZEROPERATIONMODE_STREAMING`.
```python
_create_analyzer.operation_mode = AnalyzerOperationMode.ANALYZEROPERATIONMODE_STREAMING
```
### 4. **Input KPI IDs**
- `input_kpi_ids`: This is a list of KPI IDs that will be used as input for the analysis. KPI IDs are represented using `KpiId`, and UUIDs are assigned to each input. The Spark streamer assumes that the provided KPIs exist in the KPI Descriptor database.
```python
_kpi_id = KpiId()
_kpi_id.kpi_id.uuid = "6e22f180-ba28-4641-b190-2287bf448888"
_create_analyzer.input_kpi_ids.append(_kpi_id)
_kpi_id.kpi_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666"
_create_analyzer.input_kpi_ids.append(_kpi_id)
```
### 5. **Output KPI IDs**
- `output_kpi_ids`: A list of KPI IDs that are produced as output after analysis. Each one is generated and appended to the list.
```python
_kpi_id = KpiId()
_create_analyzer.output_kpi_ids.append(_kpi_id)
```
### 6. **Parameters**
- `parameters`: This is a dictionary containing key-value pairs of various parameters used by the analyzer. These values are often algorithm-specific.
- **Thresholds**: A dictionary of threshold entries whose keys follow the `<aggregation>_<any_name>` pattern, with the aggregation being one of min, max, avg, first, last, or stdev. For example: "min_latency", "max_bandwidth", "avg_datarate", etc.
```python
_threshold_dict = {
'min_latency' : (00, 10),
'max_bandwidth': (40, 50),
'avg_datarate': (00, 10)
}
_create_analyzer.parameters['thresholds'] = json.dumps(_threshold_dict)
```
- **Window Size**: Specifies the size of the time window (e.g., `60 seconds`).
```python
_create_analyzer.parameters['window_size'] = "60 seconds"
```
- **Window Slider**: Defines the sliding window interval (e.g., `30 seconds`).
```python
_create_analyzer.parameters['window_slider'] = "30 seconds"
```
### **Calling `StartAnalyzer` with an Analyzer Frontend Object**
- The following code demonstrates how to call `StartAnalyzer()` with an Analyzer object:
```python
from analytics.frontend.client.AnalyticsFrontendClient import AnalyticsFrontendClient
analytics_client_object = AnalyticsFrontendClient()
analytics_client_object.StartAnalyzer(_create_analyzer_id)
```
### **How to Receive Analyzer Responses**
- There is a non-gRPC method in the analyzer frontend called `StartResponseListener(<analyzer_uuid>)`. The `analyzer_uuid` is the UUID of the analyzer provided when calling `StartAnalyzer()`. The following code will log the responses:
```python
from analytics.frontend.service.AnalyticsFrontendServiceServicerImpl import AnalyticsFrontendServiceServicerImpl
analytic_frontend_service_object = AnalyticsFrontendServiceServicerImpl()
for response in analytic_frontend_service_object.StartResponseListener(<analyzer_uuid>):
LOGGER.debug(response)
```
### **Understanding the Output of the Analyzer**
- **Output Column Names**: The output JSON string will include two keys for each defined threshold. For example, the `min_latency` threshold will generate two keys: `min_latency_THRESHOLD_FAIL` and `min_latency_THRESHOLD_RAISE`.
- `min_latency_THRESHOLD_FAIL` is triggered if the aggregated latency calculated within the defined window falls below the lower bound (fail threshold) of the specified range.
- `min_latency_THRESHOLD_RAISE` is triggered if the aggregated latency calculated within the defined window exceeds the upper bound (raise threshold) of the specified range.
- The `min_latency_THRESHOLD_FAIL` and `min_latency_THRESHOLD_RAISE` keys hold `TRUE` when activated and `FALSE` otherwise (an illustrative record is shown below).
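For illustration only, a single output record for a `min_latency` threshold could look like the following (key names follow the convention above; the values and any additional fields, such as the aggregated value itself, are invented for this example):
```
{"kpi_id": "efef4d95-1cf1-43c4-9742-95c283ddd7a6", "window_start": "2024-09-03T12:36:00Z", "min_latency": 12.5, "min_latency_THRESHOLD_FAIL": false, "min_latency_THRESHOLD_RAISE": true}
```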
@@ -12,5 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
pyspark==3.5.2
dask==2024.1.0
distributed==2024.1.0
pandas==2.2.3
confluent-kafka==2.3.*
@@ -12,18 +12,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import time
import json
import logging
import threading
from common.tools.service.GenericGrpcService import GenericGrpcService
from analytics.backend.service.SparkStreaming import SparkStreamer
from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
from confluent_kafka import Consumer as KafkaConsumer
from confluent_kafka import KafkaError
from common.Constants import ServiceNameEnum
from common.Settings import get_service_port_grpc
from threading import Thread, Event
from .DaskStreaming import DaskStreamer
LOGGER = logging.getLogger(__name__)
@@ -40,58 +40,18 @@ class AnalyticsBackendService(GenericGrpcService):
'group.id' : 'analytics-frontend',
'auto.offset.reset' : 'latest'})
def StartSparkStreamer(self, analyzer_uuid, analyzer):
kpi_list = analyzer['input_kpis']
oper_list = [s.replace('_value', '') for s in list(analyzer["thresholds"].keys())] # TODO: update this line...
thresholds = analyzer['thresholds']
window_size = analyzer['window_size']
window_slider = analyzer['window_slider']
print ("Received parameters: {:} - {:} - {:} - {:} - {:}".format(
kpi_list, oper_list, thresholds, window_size, window_slider))
LOGGER.debug ("Received parameters: {:} - {:} - {:} - {:} - {:}".format(
kpi_list, oper_list, thresholds, window_size, window_slider))
try:
stop_event = threading.Event()
thread = threading.Thread(target=SparkStreamer,
args=(analyzer_uuid, kpi_list, oper_list, thresholds, stop_event,
window_size, window_slider, None ))
self.running_threads[analyzer_uuid] = (thread, stop_event)
thread.start()
print ("Initiated Analyzer backend: {:}".format(analyzer_uuid))
LOGGER.info("Initiated Analyzer backend: {:}".format(analyzer_uuid))
return True
except Exception as e:
print ("Failed to initiate Analyzer backend: {:}".format(e))
LOGGER.error("Failed to initiate Analyzer backend: {:}".format(e))
return False
def StopRequestListener(self, threadInfo: tuple):
try:
thread, stop_event = threadInfo
stop_event.set()
thread.join()
print ("Terminating Analytics backend RequestListener")
LOGGER.info("Terminating Analytics backend RequestListener")
return True
except Exception as e:
print ("Failed to terminate analytics backend {:}".format(e))
LOGGER.error("Failed to terminate analytics backend {:}".format(e))
return False
def install_servicers(self):
threading.Thread(target=self.RequestListener, args=()).start()
def install_services(self):
stop_event = threading.Event()
thread = threading.Thread(target=self.RequestListener,
args=(stop_event,) )
thread.start()
return (thread, stop_event)
def RequestListener(self, stop_event):
def RequestListener(self):
"""
listener for requests on Kafka topic.
"""
LOGGER.info("Request Listener is initiated ...")
# print ("Request Listener is initiated ...")
consumer = self.kafka_consumer
consumer.subscribe([KafkaTopic.ANALYTICS_REQUEST.value])
while not stop_event.is_set():
while True:
receive_msg = consumer.poll(2.0)
if receive_msg is None:
continue
@@ -99,34 +59,66 @@ class AnalyticsBackendService(GenericGrpcService):
if receive_msg.error().code() == KafkaError._PARTITION_EOF:
continue
else:
print("Consumer error: {}".format(receive_msg.error()))
LOGGER.error("Consumer error: {:}".format(receive_msg.error()))
# print ("Consumer error: {:}".format(receive_msg.error()))
break
analyzer = json.loads(receive_msg.value().decode('utf-8'))
analyzer_uuid = receive_msg.key().decode('utf-8')
LOGGER.debug('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer))
print ('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer))
try:
analyzer = json.loads(receive_msg.value().decode('utf-8'))
analyzer_uuid = receive_msg.key().decode('utf-8')
LOGGER.debug('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer))
# print ('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer))
if analyzer["algo_name"] is None and analyzer["oper_mode"] is None:
self.TerminateAnalyzerBackend(analyzer_uuid)
else:
self.StartSparkStreamer(analyzer_uuid, analyzer)
LOGGER.debug("Stop Event activated. Terminating...")
print ("Stop Event activated. Terminating...")
if analyzer["algo_name"] is None and analyzer["oper_mode"] is None:
self.StopDaskListener(analyzer_uuid)
else:
self.StartDaskListener(analyzer_uuid, analyzer)
except Exception as e:
LOGGER.warning("Unable to consume message from topic: {:}. ERROR: {:}".format(KafkaTopic.ANALYTICS_REQUEST.value, e))
# print ("Unable to consume message from topic: {:}. ERROR: {:}".format(KafkaTopic.ANALYTICS_REQUEST.value, e))
def StartDaskListener(self, analyzer_uuid, analyzer):
kpi_list = analyzer[ 'input_kpis' ]
thresholds = analyzer[ 'thresholds' ]
window_size = analyzer[ 'window_size' ]
window_slider = analyzer[ 'window_slider']
LOGGER.debug ("Received parameters: {:} - {:} - {:} - {:}".format(
kpi_list, thresholds, window_size, window_slider))
# print ("Received parameters: {:} - {:} - {:} - {:}".format(
# kpi_list, thresholds, window_size, window_slider))
try:
stop_event = Event()
thread = Thread(
target=DaskStreamer,
# args=(analyzer_uuid, kpi_list, oper_list, thresholds, stop_event),
args=(analyzer['output_kpis'][0] , kpi_list, thresholds, stop_event),
kwargs={
"window_size" : window_size,
}
)
thread.start()
self.running_threads[analyzer_uuid] = (thread, stop_event)
# print ("Initiated Analyzer backend: {:}".format(analyzer_uuid))
LOGGER.info("Initiated Analyzer backend: {:}".format(analyzer_uuid))
return True
except Exception as e:
# print ("Failed to initiate Analyzer backend: {:}".format(e))
LOGGER.error("Failed to initiate Analyzer backend: {:}".format(e))
return False
def TerminateAnalyzerBackend(self, analyzer_uuid):
def StopDaskListener(self, analyzer_uuid):
if analyzer_uuid in self.running_threads:
try:
thread, stop_event = self.running_threads[analyzer_uuid]
stop_event.set()
thread.join()
del self.running_threads[analyzer_uuid]
print ("Terminating backend (by TerminateBackend): Analyzer Id: {:}".format(analyzer_uuid))
# print ("Terminating backend (by TerminateBackend): Analyzer Id: {:}".format(analyzer_uuid))
LOGGER.info("Terminating backend (by TerminateBackend): Analyzer Id: {:}".format(analyzer_uuid))
return True
except Exception as e:
LOGGER.error("Failed to terminate. Analyzer Id: {:} - ERROR: {:}".format(analyzer_uuid, e))
return False
else:
print ("Analyzer not found in active collectors. Analyzer Id: {:}".format(analyzer_uuid))
LOGGER.warning("Analyzer not found in active collectors: Analyzer Id: {:}".format(analyzer_uuid))
# generate confirmation towards frontend
# print ("Analyzer not found in active collectors. Analyzer Id: {:}".format(analyzer_uuid))
LOGGER.warning("Analyzer not found in active collectors: Analyzer Id: {:}".format(analyzer_uuid))
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import time
import json
from confluent_kafka import Consumer, Producer, KafkaException, KafkaError
import pandas as pd
from dask.distributed import Client, LocalCluster
from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
logging.basicConfig(level=logging.INFO)
LOGGER = logging.getLogger(__name__)
def SettingKafkaConsumerParams():
return {'bootstrap.servers' : KafkaConfig.get_kafka_address(),
'group.id' : 'analytics-backend',
'auto.offset.reset' : 'latest'}
def GetAggregationMappings(thresholds):
agg_dict = {}
for threshold_key in thresholds.keys():
parts = threshold_key.split('_', 1)
if len(parts) != 2:
LOGGER.warning(f"Threshold key '{threshold_key}' does not follow the '<aggregation>_<metricName>' format. Skipping.")
continue
aggregation, metric_name = parts
# Ensure that the aggregation function is valid in pandas
if aggregation not in ['mean', 'min', 'max', 'first', 'last', 'std']:
LOGGER.warning(f"Unsupported aggregation '{aggregation}' in threshold key '{threshold_key}'. Skipping.")
continue
agg_dict[threshold_key] = ('kpi_value', aggregation)
return agg_dict
def ApplyThresholds(aggregated_df, thresholds):
"""
Apply thresholds (TH-Fall and TH-Raise) based on the thresholds dictionary
on the aggregated DataFrame.
Args: aggregated_df (pd.DataFrame): DataFrame with aggregated metrics.
thresholds (dict): Thresholds dictionary with keys in the format '<aggregation>_<metricName>'.
Returns: pd.DataFrame: DataFrame with additional threshold columns.
"""
for threshold_key, threshold_values in thresholds.items():
if threshold_key not in aggregated_df.columns:
LOGGER.warning(f"Threshold key '{threshold_key}' does not correspond to any aggregation result. Skipping threshold application.")
continue
if isinstance(threshold_values, (list, tuple)) and len(threshold_values) == 2:
fail_th, raise_th = threshold_values
aggregated_df["THRESHOLD_FALL"] = aggregated_df[threshold_key] < fail_th
aggregated_df["THRESHOLD_RAISE"] = aggregated_df[threshold_key] > raise_th
aggregated_df["value"] = aggregated_df[threshold_key]
else:
LOGGER.warning(f"Threshold values for '{threshold_key}' are not a list or tuple of length 2. Skipping threshold application.")
return aggregated_df
def initialize_dask_client():
"""
Initialize a local Dask cluster and client.
"""
cluster = LocalCluster(n_workers=2, threads_per_worker=2)
client = Client(cluster)
LOGGER.info(f"Dask Client Initialized: {client}")
return client, cluster
def initialize_kafka_producer():
return Producer({'bootstrap.servers': KafkaConfig.get_kafka_address()})
def delivery_report(err, msg):
if err is not None:
LOGGER.error(f"Message delivery failed: {err}")
else:
LOGGER.info(f"Message delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")
def process_batch(batch, agg_mappings, thresholds, key):
"""
Process a batch of data and apply thresholds.
Args: batch (list of dict): List of messages from Kafka.
agg_mappings (dict): Mapping from threshold key to aggregation function.
thresholds (dict): Thresholds dictionary.
Returns: list of dict: Processed records ready to be sent to Kafka.
"""
if not batch:
LOGGER.info("Empty batch received. Skipping processing.")
return []
df = pd.DataFrame(batch)
LOGGER.info(f"df {df} ")
df['time_stamp'] = pd.to_datetime(df['time_stamp'], errors='coerce')
df.dropna(subset=['time_stamp'], inplace=True)
LOGGER.info(f"df {df} ")
required_columns = {'time_stamp', 'kpi_id', 'kpi_value'}
if not required_columns.issubset(df.columns):
LOGGER.warning(f"Batch contains missing required columns. Required columns: {required_columns}. Skipping batch.")
return []
if df.empty:
LOGGER.info("No data after filtering by KPI IDs. Skipping processing.")
return []
# Perform aggregations using named aggregation
try:
agg_dict = {key: value for key, value in agg_mappings.items()}
df_agg_ = df.groupby(['window_start']).agg(**agg_dict).reset_index()
#example: agg_dict = {'min_latency_E2E': ('kpi_value', 'min')
#given that threshold has 1 value
second_value_tuple = next(iter(agg_dict.values()))[1]
#in case we have multiple thresholds!
#second_values_tuples = [value[1] for value in agg_dict.values()]
if second_value_tuple=="min":
df_agg = df_agg_.min(numeric_only=True).to_frame().T
elif second_value_tuple == "max":
df_agg = df_agg_.max(numeric_only=True).to_frame().T
elif second_value_tuple == "std":
df_agg = df_agg_.std(numeric_only=True).to_frame().T
else:
df_agg = df_agg_.mean(numeric_only=True).to_frame().T
# Assign the first value of window_start from the original aggregated data
df_agg['window_start'] = df_agg_['window_start'].iloc[0]
# Reorder columns to place 'window_start' first if needed
cols = ['window_start'] + [col for col in df_agg.columns if col != 'window_start']
df_agg = df_agg[cols]
except Exception as e:
LOGGER.error(f"Aggregation error: {e}")
return []
# Apply thresholds
df_thresholded = ApplyThresholds(df_agg, thresholds)
df_thresholded['kpi_id'] = key
df_thresholded['window_start'] = df_thresholded['window_start'].dt.strftime('%Y-%m-%dT%H:%M:%SZ')
# Convert aggregated DataFrame to list of dicts
result = df_thresholded.to_dict(orient='records')
LOGGER.info(f"Processed batch with {len(result)} records after aggregation and thresholding.")
return result
def produce_result(result, producer, destination_topic):
for record in result:
try:
producer.produce(
destination_topic,
key=str(record.get('kpi_id', '')),
value=json.dumps(record),
callback=delivery_report
)
except KafkaException as e:
LOGGER.error(f"Failed to produce message: {e}")
producer.flush()
LOGGER.info(f"Produced {len(result)} aggregated records to '{destination_topic}'.")
def DaskStreamer(key, kpi_list, thresholds, stop_event,
window_size="30s", time_stamp_col="time_stamp"):
client, cluster = initialize_dask_client()
consumer_conf = SettingKafkaConsumerParams()
consumer = Consumer(consumer_conf)
consumer.subscribe([KafkaTopic.VALUE.value])
producer = initialize_kafka_producer()
# Parse window_size to seconds
try:
window_size_td = pd.to_timedelta(window_size)
window_size_seconds = window_size_td.total_seconds()
except Exception as e:
LOGGER.error(f"Invalid window_size format: {window_size}. Error: {e}")
window_size_seconds = 30
LOGGER.info(f"Batch processing interval set to {window_size_seconds} seconds.")
# Extract aggregation mappings from thresholds
agg_mappings = GetAggregationMappings(thresholds)
if not agg_mappings:
LOGGER.error("No valid aggregation mappings extracted from thresholds. Exiting streamer.")
consumer.close()
producer.flush()
client.close()
cluster.close()
return
try:
batch = []
last_batch_time = time.time()
LOGGER.info("Starting to consume messages...")
while not stop_event.is_set():
msg = consumer.poll(1.0)
if msg is None:
current_time = time.time()
if (current_time - last_batch_time) >= window_size_seconds and batch:
LOGGER.info("Time-based batch threshold reached. Processing batch.")
future = client.submit(process_batch, batch, agg_mappings, thresholds, key)
future.add_done_callback(lambda fut: produce_result(fut.result(), producer, KafkaTopic.ALARMS.value))
batch = []
last_batch_time = current_time
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
LOGGER.warning(f"End of partition reached {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")
else:
LOGGER.error(f"Kafka error: {msg.error()}")
continue
try:
message_value = json.loads(msg.value().decode('utf-8'))
except json.JSONDecodeError as e:
LOGGER.error(f"JSON decode error: {e}")
continue
try:
message_timestamp = pd.to_datetime(message_value[time_stamp_col], errors='coerce')
LOGGER.warning(f"message_timestamp: {message_timestamp}. Skipping message.")
if pd.isna(message_timestamp):
LOGGER.warning(f"Invalid timestamp in message: {message_value}. Skipping message.")
continue
window_start = message_timestamp.floor(window_size)
LOGGER.warning(f"window_start: {window_start}. Skipping message.")
message_value['window_start'] = window_start
except Exception as e:
LOGGER.error(f"Error processing timestamp: {e}. Skipping message.")
continue
if message_value['kpi_id'] not in kpi_list:
LOGGER.debug(f"KPI ID '{message_value['kpi_id']}' not in kpi_list. Skipping message.")
continue
batch.append(message_value)
current_time = time.time()
if (current_time - last_batch_time) >= window_size_seconds and batch:
LOGGER.info("Time-based batch threshold reached. Processing batch.")
future = client.submit(process_batch, batch, agg_mappings, thresholds, key)
future.add_done_callback(lambda fut: produce_result(fut.result(), producer, KafkaTopic.ALARMS.value))
batch = []
last_batch_time = current_time
except Exception as e:
LOGGER.exception(f"Error in Dask streaming process: {e}")
finally:
# Process any remaining messages in the batch
if batch:
LOGGER.info("Processing remaining messages in the batch.")
future = client.submit(process_batch, batch, agg_mappings, thresholds, key)
future.add_done_callback(lambda fut: produce_result(fut.result(), producer, KafkaTopic.ALARMS.value))
consumer.close()
producer.flush()
LOGGER.info("Kafka consumer and producer closed.")
client.close()
cluster.close()
LOGGER.info("Dask client and cluster closed.")
@@ -33,7 +33,7 @@ def SettingKafkaConsumerParams(): # TODO: create get_kafka_consumer() in comm
return {
# "kafka.bootstrap.servers": '127.0.0.1:9092',
"kafka.bootstrap.servers": KafkaConfig.get_kafka_address(),
"subscribe" : KafkaTopic.VALUE.value,
"subscribe" : KafkaTopic.VALUE.value, # topic should have atleast one message before spark session
"startingOffsets" : 'latest',
"failOnDataLoss" : 'false' # Optional: Set to "true" to fail the query on data loss
}
@@ -64,7 +64,7 @@ def ApplyThresholds(aggregated_df, thresholds):
for col_name, (fail_th, raise_th) in thresholds.items():
# Apply TH-Fail condition (if column value is less than the fail threshold)
aggregated_df = aggregated_df.withColumn(
f"{col_name}_THRESHOLD_FAIL",
f"{col_name}_THRESHOLD_FALL",
when(col(col_name) < fail_th, True).otherwise(False)
)
# Apply TH-RAISE condition (if column value is greater than the raise threshold)
@@ -128,11 +128,11 @@ def SparkStreamer(key, kpi_list, oper_list, thresholds, stop_event,
# --- This will write output to Kafka: ACTUAL IMPLEMENTATION
query = thresholded_stream_data \
.selectExpr(f"'{key}' AS key", "to_json(struct(*)) AS value") \
.selectExpr(f"CAST(kpi_id AS STRING) AS key", "to_json(struct(*)) AS value") \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", KafkaConfig.get_kafka_address()) \
.option("topic", KafkaTopic.ANALYTICS_RESPONSE.value) \
.option("topic", KafkaTopic.ALARMS.value) \
.option("checkpointLocation", "analytics/.spark/checkpoint") \
.outputMode("update")
@@ -12,6 +12,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import uuid
import json
from common.proto.kpi_manager_pb2 import KpiId
from common.proto.analytics_frontend_pb2 import ( AnalyzerOperationMode,
Analyzer, AnalyzerId )
def get_kpi_id_list():
return ["6e22f180-ba28-4641-b190-2287bf448888", "1e22f180-ba28-4641-b190-2287bf446666"]
@@ -32,3 +37,78 @@ def get_threshold_dict():
return {
op + '_value': threshold_dict[op+'_value'] for op in get_operation_list() if op + '_value' in threshold_dict
}
def create_analyzer_id():
_create_analyzer_id = AnalyzerId()
# _create_analyzer_id.analyzer_id.uuid = str(uuid.uuid4())
# _create_analyzer_id.analyzer_id.uuid = "efef4d95-1cf1-43c4-9742-95c283ddd7a6"
_create_analyzer_id.analyzer_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666"
return _create_analyzer_id
def create_analyzer():
_create_analyzer = Analyzer()
# _create_analyzer.analyzer_id.analyzer_id.uuid = str(uuid.uuid4())
_create_analyzer.analyzer_id.analyzer_id.uuid = "20540c4f-6797-45e5-af70-6491b49283f9"
_create_analyzer.algorithm_name = "Test_Aggergate_and_Threshold"
_create_analyzer.operation_mode = AnalyzerOperationMode.ANALYZEROPERATIONMODE_STREAMING
_kpi_id = KpiId()
# input IDs to analyze
_kpi_id.kpi_id.uuid = str(uuid.uuid4())
_kpi_id.kpi_id.uuid = "5716c369-932b-4a02-b4c7-6a2e808b92d7"
_create_analyzer.input_kpi_ids.append(_kpi_id)
_kpi_id.kpi_id.uuid = str(uuid.uuid4())
_kpi_id.kpi_id.uuid = "8f70d908-cc48-48de-8664-dc9be2de0089"
_create_analyzer.input_kpi_ids.append(_kpi_id)
_kpi_id.kpi_id.uuid = str(uuid.uuid4())
_create_analyzer.input_kpi_ids.append(_kpi_id)
# output IDs after analysis
_kpi_id.kpi_id.uuid = str(uuid.uuid4())
_create_analyzer.output_kpi_ids.append(_kpi_id)
_kpi_id.kpi_id.uuid = str(uuid.uuid4())
_create_analyzer.output_kpi_ids.append(_kpi_id)
# parameter
_threshold_dict = {
# 'avg_value' :(20, 30), 'min_value' :(00, 10), 'max_value' :(45, 50),
'first_value' :(00, 10), 'last_value' :(40, 50), 'stdev_value':(00, 10)}
_create_analyzer.parameters['thresholds'] = json.dumps(_threshold_dict)
_create_analyzer.parameters['window_size'] = "60 seconds" # Such as "10 seconds", "2 minutes", "3 hours", "4 days" or "5 weeks"
_create_analyzer.parameters['window_slider'] = "30 seconds" # should be less than window size
_create_analyzer.parameters['store_aggregate'] = str(False) # TRUE to store. No implemented yet
return _create_analyzer
def create_analyzer_dask():
_create_analyzer = Analyzer()
_create_analyzer.analyzer_id.analyzer_id.uuid = str(uuid.uuid4())
# _create_analyzer.analyzer_id.analyzer_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666"
_create_analyzer.algorithm_name = "Test_Aggergate_and_Threshold"
_create_analyzer.operation_mode = AnalyzerOperationMode.ANALYZEROPERATIONMODE_STREAMING
_kpi_id = KpiId()
# input IDs to analyze
# _kpi_id.kpi_id.uuid = str(uuid.uuid4())
_kpi_id.kpi_id.uuid = "6e22f180-ba28-4641-b190-2287bf448888"
_create_analyzer.input_kpi_ids.append(_kpi_id)
# _kpi_id.kpi_id.uuid = str(uuid.uuid4())
_kpi_id.kpi_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666"
_create_analyzer.input_kpi_ids.append(_kpi_id)
# _kpi_id.kpi_id.uuid = str(uuid.uuid4())
_create_analyzer.input_kpi_ids.append(_kpi_id)
# output IDs after analysis
_kpi_id.kpi_id.uuid = str(uuid.uuid4())
_create_analyzer.output_kpi_ids.append(_kpi_id)
_kpi_id.kpi_id.uuid = str(uuid.uuid4())
_create_analyzer.output_kpi_ids.append(_kpi_id)
# parameter
_threshold_dict = {
'mean_latency' :(20, 30), 'min_latency' :(00, 10), 'max_latency' :(45, 50),#}
'first_value' :(00, 50), 'last_value' :(50, 100), 'std_value' :(0, 90)}
_create_analyzer.parameters['thresholds'] = json.dumps(_threshold_dict)
_create_analyzer.parameters['oper_list'] = json.dumps([key.split('_')[0] for key in _threshold_dict.keys()])
_create_analyzer.parameters['window_size'] = "10s" # Such as "10 seconds", "2 minutes", "3 hours", "4 days" or "5 weeks"
_create_analyzer.parameters['window_slider'] = "5s" # should be less than window size
_create_analyzer.parameters['store_aggregate'] = str(False) # TRUE to store. No implemented yet
return _create_analyzer
@@ -12,12 +12,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import time
import time, json
from typing import Dict
import logging
import threading
from threading import Event, Thread
from common.tools.kafka.Variables import KafkaTopic
from analytics.backend.service.AnalyticsBackendService import AnalyticsBackendService
from analytics.backend.tests.messages import get_kpi_id_list, get_operation_list, get_threshold_dict
from .messages import create_analyzer, create_analyzer_dask
from threading import Thread, Event
from ..service.DaskStreaming import DaskStreamer
LOGGER = logging.getLogger(__name__)
@@ -32,26 +36,117 @@ def test_validate_kafka_topics():
response = KafkaTopic.create_all_topics()
assert isinstance(response, bool)
# --- To test Dask Streamer functionality ---
# def test_StartDaskStreamer(): # Directly from the Streamer class
# LOGGER.debug(" >>> test_StartSparkStreamer: START <<< ")
# stop_event = Event()
# kpi_list = ["1e22f180-ba28-4641-b190-2287bf446666", "6e22f180-ba28-4641-b190-2287bf448888", 'kpi_3']
# oper_list = ['avg', 'min', 'max',]
# thresholds = {
# 'avg_value': (10.0, 90.0),
# 'min_value': (5.0, 95.0),
# 'max_value': (15.0, 85.0),
# 'latency' : (2.0, 10.0)
# }
# # Start the DaskStreamer in a separate thread
# streamer_thread = Thread(
# target=DaskStreamer,
# args=("analytics_stream", kpi_list, oper_list, thresholds, stop_event),
# kwargs={
# "window_size": "60s",
# "win_slide_duration": "30s",
# "time_stamp_col": "time_stamp"
# }
# )
# streamer_thread.start()
# try:
# while True:
# time.sleep(10)
# except KeyboardInterrupt:
# LOGGER.info("KeyboardInterrupt received. Stopping streamer...")
# stop_event.set()
# streamer_thread.join()
# LOGGER.info("Streamer stopped gracefully.")
# --- To test Start Streamer functionality ---
# def test_StartDaskStreamer():
# LOGGER.debug(" >>> test_StartBaskStreamer: START <<< ")
# analyzer_obj = create_analyzer_dask()
# # LOGGER.info("Created Analyzer Object: {:}".format(analyzer_obj))
# analyzer_uuid = analyzer_obj.analyzer_id.analyzer_id.uuid
# analyzer_to_generate : Dict = {
# "algo_name" : analyzer_obj.algorithm_name,
# "input_kpis" : [k.kpi_id.uuid for k in analyzer_obj.input_kpi_ids],
# "output_kpis" : [k.kpi_id.uuid for k in analyzer_obj.output_kpi_ids],
# "oper_mode" : analyzer_obj.operation_mode,
# "thresholds" : json.loads(analyzer_obj.parameters["thresholds"]),
# "oper_list" : json.loads(analyzer_obj.parameters["oper_list"]),
# # "oper_list" : analyzer_obj.parameters["oper_list"],
# "window_size" : analyzer_obj.parameters["window_size"],
# "window_slider" : analyzer_obj.parameters["window_slider"],
# # "store_aggregate" : analyzer_obj.parameters["store_aggregate"]
# }
# AnalyticsBackendServiceObj = AnalyticsBackendService()
# LOGGER.info("Analyzer to be generated: {:}".format((analyzer_to_generate)))
# response = AnalyticsBackendServiceObj.StartDaskListener(analyzer_uuid, analyzer_to_generate)
# assert isinstance(response, bool)
# time.sleep(100)
# LOGGER.info('Initiating StopRequestListener...')
# # AnalyticsBackendServiceObj = AnalyticsBackendService()
# response = AnalyticsBackendServiceObj.StopDaskListener(analyzer_uuid)
# LOGGER.debug(str(response))
# assert isinstance(response, bool)
# --- To test Start Streamer functionality ---
# def test_StartSparkStreamer():
# LOGGER.debug(" >>> test_StartSparkStreamer: START <<< ")
# analyzer_obj = create_analyzer()
# analyzer_uuid = analyzer_obj.analyzer_id.analyzer_id.uuid
# analyzer_to_generate : Dict = {
# "algo_name" : analyzer_obj.algorithm_name,
# "input_kpis" : [k.kpi_id.uuid for k in analyzer_obj.input_kpi_ids],
# "output_kpis" : [k.kpi_id.uuid for k in analyzer_obj.output_kpi_ids],
# "oper_mode" : analyzer_obj.operation_mode,
# "thresholds" : json.loads(analyzer_obj.parameters["thresholds"]),
# "window_size" : analyzer_obj.parameters["window_size"],
# "window_slider" : analyzer_obj.parameters["window_slider"],
# # "store_aggregate" : analyzer_obj.parameters["store_aggregate"]
# }
# AnalyticsBackendServiceObj = AnalyticsBackendService()
# response = AnalyticsBackendServiceObj.StartSparkStreamer(analyzer_uuid, analyzer_to_generate)
# assert isinstance(response, bool)
# --- To TEST StartRequestListenerFunctionality
# def test_StartRequestListener():
# LOGGER.info('test_RunRequestListener')
# AnalyticsBackendServiceObj = AnalyticsBackendService()
# response = AnalyticsBackendServiceObj.StartRequestListener() # response is Tuple (thread, stop_event)
# LOGGER.debug(str(response))
# assert isinstance(response, tuple)
# AnalyticsBackendServiceObj.stop_event = Event()
# listener_thread = Thread(target=AnalyticsBackendServiceObj.RequestListener, args=())
# listener_thread.start()
# time.sleep(100)
# AnalyticsBackendServiceObj.stop_event.set()
# LOGGER.info('Backend termination initiated. waiting for termination... 10 seconds')
# listener_thread.join(timeout=10)
# assert not listener_thread.is_alive(), "RequestListener thread did not terminate as expected."
# LOGGER.info('Completed test_RunRequestListener')
# To test START and STOP communication together
def test_StopRequestListener():
LOGGER.info('test_RunRequestListener')
LOGGER.info('Initiating StartRequestListener...')
AnalyticsBackendServiceObj = AnalyticsBackendService()
response_thread = AnalyticsBackendServiceObj.StartRequestListener() # response is Tuple (thread, stop_event)
# LOGGER.debug(str(response_thread))
time.sleep(10)
LOGGER.info('Initiating StopRequestListener...')
AnalyticsBackendServiceObj = AnalyticsBackendService()
response = AnalyticsBackendServiceObj.StopRequestListener(response_thread)
LOGGER.debug(str(response))
assert isinstance(response, bool)
# def test_StopRequestListener():
# LOGGER.info('test_RunRequestListener')
# LOGGER.info('Initiating StartRequestListener...')
# AnalyticsBackendServiceObj = AnalyticsBackendService()
# response_thread = AnalyticsBackendServiceObj.StartRequestListener() # response is Tuple (thread, stop_event)
# # LOGGER.debug(str(response_thread))
# time.sleep(10)
# LOGGER.info('Initiating StopRequestListener...')
# AnalyticsBackendServiceObj = AnalyticsBackendService()
# response = AnalyticsBackendServiceObj.StopRequestListener(response_thread)
# LOGGER.debug(str(response))
# assert isinstance(response, bool)
# To independently tests the SparkListener functionality
# def test_SparkListener():
@@ -28,8 +28,8 @@ RETRY_DECORATOR = retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION,
class AnalyticsFrontendClient:
def __init__(self, host=None, port=None):
if not host: host = get_service_host(ServiceNameEnum.ANALYTICSFRONTEND)
if not port: port = get_service_port_grpc(ServiceNameEnum.ANALYTICSFRONTEND)
if not host: host = get_service_host(ServiceNameEnum.ANALYTICS)
if not port: port = get_service_port_grpc(ServiceNameEnum.ANALYTICS)
self.endpoint = '{:s}:{:s}'.format(str(host), str(port))
LOGGER.debug('Creating channel to {:s}...'.format(str(self.endpoint)))
self.channel = None