Compare revisions: tfs/controller
Showing with 969 additions and 406 deletions
#!/bin/bash
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
########################################################################################################################
# Define your deployment settings here
########################################################################################################################
# If not already set, set the name of the Kubernetes namespace to deploy to.
export TFS_K8S_NAMESPACE=${TFS_K8S_NAMESPACE:-"tfs"}
########################################################################################################################
# Automated steps start here
########################################################################################################################
kubectl --namespace $TFS_K8S_NAMESPACE logs deployment/telemetryservice -c frontend
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Build, tag, and push the Docker image to the GitLab Docker registry
build analytics:
variables:
IMAGE_NAME: 'analytics' # name of the microservice
IMAGE_TAG: 'latest' # tag of the container image (production, development, etc)
stage: build
before_script:
- docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY
script:
# This first build tags the builder resulting image to prevent being removed by dangling image removal command
# - docker buildx build -t "${IMAGE_NAME}-backend:${IMAGE_TAG}-builder" --target builder -f ./src/$IMAGE_NAME/backend/Dockerfile .
- docker buildx build -t "${IMAGE_NAME}-frontend:$IMAGE_TAG" -f ./src/$IMAGE_NAME/frontend/Dockerfile .
- docker buildx build -t "${IMAGE_NAME}-backend:$IMAGE_TAG" -f ./src/$IMAGE_NAME/backend/Dockerfile .
- docker tag "${IMAGE_NAME}-frontend:$IMAGE_TAG" "$CI_REGISTRY_IMAGE/${IMAGE_NAME}-frontend:$IMAGE_TAG"
- docker tag "${IMAGE_NAME}-backend:$IMAGE_TAG" "$CI_REGISTRY_IMAGE/${IMAGE_NAME}-backend:$IMAGE_TAG"
- docker push "$CI_REGISTRY_IMAGE/${IMAGE_NAME}-frontend:$IMAGE_TAG"
- docker push "$CI_REGISTRY_IMAGE/${IMAGE_NAME}-backend:$IMAGE_TAG"
after_script:
- docker images --filter="dangling=true" --quiet | xargs -r docker rmi
rules:
- if: '$CI_PIPELINE_SOURCE == "merge_request_event" && ($CI_MERGE_REQUEST_TARGET_BRANCH_NAME == "develop" || $CI_MERGE_REQUEST_TARGET_BRANCH_NAME == $CI_DEFAULT_BRANCH)'
- if: '$CI_PIPELINE_SOURCE == "push" && $CI_COMMIT_BRANCH == "develop"'
- changes:
- src/common/**/*.py
- proto/*.proto
- src/$IMAGE_NAME/.gitlab-ci.yml
- src/$IMAGE_NAME/frontend/**/*.{py,in,yml}
- src/$IMAGE_NAME/frontend/Dockerfile
- src/$IMAGE_NAME/frontend/tests/*.py
- src/$IMAGE_NAME/backend/Dockerfile
- src/$IMAGE_NAME/backend/**/*.{py,in,yml}
- src/$IMAGE_NAME/backend/tests/*.py
- manifests/${IMAGE_NAME}service.yaml
- .gitlab-ci.yml
# Apply unit test to the component
unit_test analytics-backend:
variables:
IMAGE_NAME: 'analytics' # name of the microservice
IMAGE_TAG: 'latest' # tag of the container image (production, development, etc)
stage: unit_test
needs:
- build analytics
before_script:
- docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY
- if docker network list | grep teraflowbridge; then echo "teraflowbridge is already created"; else docker network create -d bridge teraflowbridge; fi
- if docker container ls | grep kafka; then docker rm -f kafka; else echo "Kafka container is not in the system"; fi
- if docker container ls | grep zookeeper; then docker rm -f zookeeper; else echo "Zookeeper container is not in the system"; fi
- if docker container ls | grep ${IMAGE_NAME}-backend; then docker rm -f ${IMAGE_NAME}-backend; else echo "${IMAGE_NAME}-backend container is not in the system"; fi
- docker container prune -f
script:
- docker pull "$CI_REGISTRY_IMAGE/${IMAGE_NAME}-backend:$IMAGE_TAG"
- docker pull "bitnami/zookeeper:latest"
- docker pull "bitnami/kafka:latest"
- >
docker run --name zookeeper -d --network=teraflowbridge -p 2181:2181
bitnami/zookeeper:latest
- sleep 10 # Wait for Zookeeper to start
- >
docker run --name kafka -d --network=teraflowbridge -p 9092:9092
--env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
--env ALLOW_PLAINTEXT_LISTENER=yes
bitnami/kafka:latest
- sleep 10 # Wait for Kafka to start
- KAFKA_IP=$(docker inspect kafka --format "{{.NetworkSettings.Networks.teraflowbridge.IPAddress}}")
- echo $KAFKA_IP
- >
docker run --name $IMAGE_NAME-backend -d -p 30060:30060
--env "KFK_SERVER_ADDRESS=${KAFKA_IP}:9092"
--volume "$PWD/src/$IMAGE_NAME/backend/tests:/opt/results"
--network=teraflowbridge
$CI_REGISTRY_IMAGE/${IMAGE_NAME}-backend:$IMAGE_TAG
- docker ps -a
- sleep 5
- docker logs ${IMAGE_NAME}-backend
- >
docker exec -i ${IMAGE_NAME}-backend bash -c
"coverage run -m pytest --log-level=INFO --verbose --junitxml=/opt/results/${IMAGE_NAME}-backend_report.xml $IMAGE_NAME/backend/tests/test_*.py"
- docker exec -i ${IMAGE_NAME}-backend bash -c "coverage report --include='${IMAGE_NAME}/*' --show-missing"
coverage: '/TOTAL\s+\d+\s+\d+\s+(\d+%)/'
after_script:
- docker network rm teraflowbridge
- docker volume prune --force
- docker image prune --force
- docker rm -f ${IMAGE_NAME}-backend
- docker rm -f zookeeper
- docker rm -f kafka
rules:
- if: '$CI_PIPELINE_SOURCE == "merge_request_event" && ($CI_MERGE_REQUEST_TARGET_BRANCH_NAME == "develop" || $CI_MERGE_REQUEST_TARGET_BRANCH_NAME == $CI_DEFAULT_BRANCH)'
- if: '$CI_PIPELINE_SOURCE == "push" && $CI_COMMIT_BRANCH == "develop"'
- changes:
- src/common/**/*.py
- proto/*.proto
- src/$IMAGE_NAME/backend/**/*.{py,in,yml}
- src/$IMAGE_NAME/backend/Dockerfile
- src/$IMAGE_NAME/backend/tests/*.py
- manifests/${IMAGE_NAME}service.yaml
- .gitlab-ci.yml
artifacts:
when: always
reports:
junit: src/$IMAGE_NAME/backend/tests/${IMAGE_NAME}-backend_report.xml
# Apply unit test to the component
unit_test analytics-frontend:
variables:
IMAGE_NAME: 'analytics' # name of the microservice
IMAGE_TAG: 'latest' # tag of the container image (production, development, etc)
stage: unit_test
needs:
- build analytics
before_script:
- docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY
- if docker network list | grep teraflowbridge; then echo "teraflowbridge is already created"; else docker network create -d bridge teraflowbridge; fi
- if docker container ls | grep crdb; then docker rm -f crdb; else echo "CockroachDB container is not in the system"; fi
- if docker volume ls | grep crdb; then docker volume rm -f crdb; else echo "CockroachDB volume is not in the system"; fi
- if docker container ls | grep kafka; then docker rm -f kafka; else echo "Kafka container is not in the system"; fi
- if docker container ls | grep zookeeper; then docker rm -f zookeeper; else echo "Zookeeper container is not in the system"; fi
- if docker container ls | grep ${IMAGE_NAME}-frontend; then docker rm -f ${IMAGE_NAME}-frontend; else echo "${IMAGE_NAME}-frontend container is not in the system"; fi
- docker container prune -f
script:
- docker pull "$CI_REGISTRY_IMAGE/${IMAGE_NAME}-frontend:$IMAGE_TAG"
- docker pull "bitnami/zookeeper:latest"
- docker pull "bitnami/kafka:latest"
- docker pull "cockroachdb/cockroach:latest-v22.2"
- docker volume create crdb
- >
docker run --name crdb -d --network=teraflowbridge -p 26257:26257 -p 8080:8080
--env COCKROACH_DATABASE=tfs_test --env COCKROACH_USER=tfs --env COCKROACH_PASSWORD=tfs123
--volume "crdb:/cockroach/cockroach-data"
cockroachdb/cockroach:latest-v22.2 start-single-node
- echo "Waiting for initialization..."
- while ! docker logs crdb 2>&1 | grep -q 'finished creating default user \"tfs\"'; do sleep 1; done
# - docker logs crdb
# - docker ps -a
- CRDB_ADDRESS=$(docker inspect crdb --format "{{.NetworkSettings.Networks.teraflowbridge.IPAddress}}")
- echo $CRDB_ADDRESS
- >
docker run --name zookeeper -d --network=teraflowbridge -p 2181:2181 \
-e ALLOW_ANONYMOUS_LOGIN=yes \
bitnami/zookeeper:latest
- sleep 10 # Wait for Zookeeper to start
- docker run --name kafka -d --network=teraflowbridge -p 9092:9092
--env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
--env ALLOW_PLAINTEXT_LISTENER=yes
bitnami/kafka:latest
- sleep 10 # Wait for Kafka to start
- KAFKA_IP=$(docker inspect kafka --format "{{.NetworkSettings.Networks.teraflowbridge.IPAddress}}")
- echo $KAFKA_IP
# - docker logs zookeeper
# - docker logs kafka
- >
docker run --name $IMAGE_NAME-frontend -d -p 30050:30050
--env "CRDB_URI=cockroachdb://tfs:tfs123@${CRDB_ADDRESS}:26257/tfs_test?sslmode=require"
--env "KFK_SERVER_ADDRESS=${KAFKA_IP}:9092"
--volume "$PWD/src/$IMAGE_NAME/frontend/tests:/opt/results"
--network=teraflowbridge
$CI_REGISTRY_IMAGE/${IMAGE_NAME}-frontend:$IMAGE_TAG
- docker ps -a
- sleep 5
- docker logs ${IMAGE_NAME}-frontend
- >
docker exec -i ${IMAGE_NAME}-frontend bash -c
"coverage run -m pytest --log-level=INFO --verbose --junitxml=/opt/results/${IMAGE_NAME}-frontend_report.xml $IMAGE_NAME/frontend/tests/test_*.py"
- docker exec -i ${IMAGE_NAME}-frontend bash -c "coverage report --include='${IMAGE_NAME}/*' --show-missing"
coverage: '/TOTAL\s+\d+\s+\d+\s+(\d+%)/'
after_script:
- docker volume rm -f crdb
- docker network rm teraflowbridge
- docker volume prune --force
- docker image prune --force
- docker rm -f ${IMAGE_NAME}-frontend
- docker rm -f zookeeper
- docker rm -f kafka
rules:
- if: '$CI_PIPELINE_SOURCE == "merge_request_event" && ($CI_MERGE_REQUEST_TARGET_BRANCH_NAME == "develop" || $CI_MERGE_REQUEST_TARGET_BRANCH_NAME == $CI_DEFAULT_BRANCH)'
- if: '$CI_PIPELINE_SOURCE == "push" && $CI_COMMIT_BRANCH == "develop"'
- changes:
- src/common/**/*.py
- proto/*.proto
- src/$IMAGE_NAME/frontend/**/*.{py,in,yml}
- src/$IMAGE_NAME/frontend/Dockerfile
- src/$IMAGE_NAME/frontend/tests/*.py
- manifests/${IMAGE_NAME}service.yaml
- .gitlab-ci.yml
artifacts:
when: always
reports:
junit: src/$IMAGE_NAME/frontend/tests/${IMAGE_NAME}-frontend_report.xml
\ No newline at end of file
# How to Locally Run and Test the Analytics Frontend Service
### Prerequisites
The following requirements should be fulfilled before executing the Analytics service.
1. A virtual environment exists with all the required packages listed in [requirements.in](https://labs.etsi.org/rep/tfs/controller/-/blob/develop/src/analytics/frontend/requirements.in) successfully installed.
2. Verify the creation of the required database and tables. The
[Analytics DB test](https://labs.etsi.org/rep/tfs/controller/-/blob/develop/src/analytics/tests/test_analytics_db.py) Python file lists the functions to create the database and tables.
3. The Analytics backend service should be running.
4. All required Kafka topics must exist. Call `create_all_topics` from the [Kafka class](https://labs.etsi.org/rep/tfs/controller/-/blob/develop/src/common/tools/kafka/Variables.py) to create any topics that do not already exist.
```python
from common.tools.kafka.Variables import KafkaTopic
KafkaTopic.create_all_topics()
```
5. An input stream must be available on the Kafka topic; the Spark Streamer consumes it and applies the defined thresholds.
- A JSON-encoded string should be generated in the following format:
```
'{"time_stamp": "2024-09-03T12:36:26Z", "kpi_id": "6e22f180-ba28-4641-b190-2287bf448888", "kpi_value": 44.22}'
```
- `kpi_value` should be a float or an int.
- The Kafka producer key should be the UUID of the Analyzer, as assigned when the Analyzer was created.
- Use the following Kafka topic to generate the stream: `KafkaTopic.ANALYTICS_RESPONSE.value` (a producer sketch follows this list).
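A minimal producer sketch, assuming the `KafkaConfig`/`KafkaTopic` helpers from `common.tools.kafka.Variables` and a hypothetical Analyzer UUID (replace it with the UUID of your own Analyzer):
```python
import json, time
from confluent_kafka import Producer
from common.tools.kafka.Variables import KafkaConfig, KafkaTopic

ANALYZER_UUID = "efef4d95-1cf1-43c4-9742-95c283ddd7a6"   # hypothetical; use your Analyzer's UUID

producer = Producer({'bootstrap.servers': KafkaConfig.get_kafka_address()})
record = {
    "time_stamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),  # ISO-8601 UTC timestamp
    "kpi_id"    : "6e22f180-ba28-4641-b190-2287bf448888",              # KPI present in the KPI Descriptor database
    "kpi_value" : 44.22,                                                # float or int
}
# Key the message with the Analyzer UUID and send the JSON-encoded KPI sample.
producer.produce(KafkaTopic.ANALYTICS_RESPONSE.value, key=ANALYZER_UUID, value=json.dumps(record))
producer.flush()
```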
## Steps to Create and Start an Analyzer
The analyzer can be declared as shown below, although there are many other ways to declare it:
The object creation process in `_create_analyzer` involves defining an instance of the `Analyzer` message from the [gRPC definition](https://labs.etsi.org/rep/tfs/controller/-/blob/feat/194-unable-to-correctly-extract-the-aggregation-function-names-from-the-dictionary-received-as/proto/analytics_frontend.proto) and populating its fields.
```python
from common.proto.analytics_frontend_pb2 import AnalyzerId
_create_analyzer_id = AnalyzerId()
```
Here is a breakdown of how each field is populated:
### 1. **Analyzer ID**
- `analyzer_id`: This field uses a unique ID to identify the analyzer. In this case, the ID is a UUID.
```python
_create_analyzer.analyzer_id.analyzer_id.uuid = "efef4d95-1cf1-43c4-9742-95c283ddd7a6"
```
- The commented-out code shows how the UUID can be generated dynamically using Python's `uuid.uuid4()`. However, for now, a static UUID is used.
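A one-line sketch of the dynamic alternative, mirroring the commented-out code in the repository's test messages:
```python
import uuid
_create_analyzer.analyzer_id.analyzer_id.uuid = str(uuid.uuid4())  # generate the Analyzer UUID dynamically
```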
### 2. **Algorithm Name**
- `algorithm_name`: Specifies the name of the algorithm to be executed by the analyzer.
```python
_create_analyzer.algorithm_name = "Test_Aggergate_and_Threshold"
```
### 3. **Operation Mode**
- `operation_mode`: Sets the mode in which the analyzer operates, in this case, it's set to `ANALYZEROPERATIONMODE_STREAMING`.
```python
_create_analyzer.operation_mode = AnalyzerOperationMode.ANALYZEROPERATIONMODE_STREAMING
```
### 4. **Input KPI IDs**
- `input_kpi_ids`: This is a list of KPI IDs that will be used as input for the analysis. KPI IDs are represented using `KpiId`, and UUIDs are assigned to each input. The Spark streamer assumes that the provided KPIs exist in the KPI Descriptor database.
```python
_kpi_id = KpiId()
_kpi_id.kpi_id.uuid = "6e22f180-ba28-4641-b190-2287bf448888"
_create_analyzer.input_kpi_ids.append(_kpi_id)
_kpi_id.kpi_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666"
_create_analyzer.input_kpi_ids.append(_kpi_id)
```
### 5. **Output KPI IDs**
- `output_kpi_ids`: A list of KPI IDs that are produced as output after analysis. Each one is generated and appended to the list.
```python
_kpi_id = KpiId()
_create_analyzer.output_kpi_ids.append(_kpi_id)
```
### 6. **Parameters**
- `parameters`: This is a dictionary containing key-value pairs of various parameters used by the analyzer. These values are often algorithm-specific.
- **Thresholds**: A dictionary whose keys follow the `<aggregation>_<anyName>` format, where the aggregation is one of (min, max, avg, first, last, stdev). For example: `min_latency`, `max_bandwidth`, `avg_datarate`, etc. Each value is a pair of bounds used for the FAIL and RAISE checks.
```python
_threshold_dict = {
'min_latency' : (00, 10),
'max_bandwidth': (40, 50),
'avg_datarate': (00, 10)
}
_create_analyzer.parameters['thresholds'] = json.dumps(_threshold_dict)
```
- **Window Size**: Specifies the size of the time window (e.g., `60 seconds`).
```python
_create_analyzer.parameters['window_size'] = "60 seconds"
```
- **Window Slider**: Defines the sliding window interval (e.g., `30 seconds`).
```python
_create_analyzer.parameters['window_slider'] = "30 seconds"
```
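Putting the pieces together, a minimal sketch of the full `Analyzer` construction (imports follow the test messages in the repository; all field values are illustrative):
```python
import json, uuid
from common.proto.kpi_manager_pb2        import KpiId
from common.proto.analytics_frontend_pb2 import Analyzer, AnalyzerOperationMode

_create_analyzer = Analyzer()
_create_analyzer.analyzer_id.analyzer_id.uuid = "efef4d95-1cf1-43c4-9742-95c283ddd7a6"
_create_analyzer.algorithm_name               = "Test_Aggergate_and_Threshold"
_create_analyzer.operation_mode               = AnalyzerOperationMode.ANALYZEROPERATIONMODE_STREAMING

# Input KPIs (assumed to exist in the KPI Descriptor database)
_kpi_id = KpiId()
_kpi_id.kpi_id.uuid = "6e22f180-ba28-4641-b190-2287bf448888"
_create_analyzer.input_kpi_ids.append(_kpi_id)
_kpi_id.kpi_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666"
_create_analyzer.input_kpi_ids.append(_kpi_id)

# Output KPIs produced by the analysis
_kpi_id.kpi_id.uuid = str(uuid.uuid4())
_create_analyzer.output_kpi_ids.append(_kpi_id)

# Parameters
_threshold_dict = {'min_latency': (0, 10), 'max_bandwidth': (40, 50), 'avg_datarate': (0, 10)}
_create_analyzer.parameters['thresholds']    = json.dumps(_threshold_dict)
_create_analyzer.parameters['window_size']   = "60 seconds"
_create_analyzer.parameters['window_slider'] = "30 seconds"
```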
### **Calling `StartAnalyzer` with an Analyzer Frontend Object**
- The following code demonstrates how to call `StartAnalyzer()` with an Analyzer object:
```python
from analytics.frontend.client.AnalyticsFrontendClient import AnalyticsFrontendClient
analytics_client_object = AnalyticsFrontendClient()
analytics_client_object.StartAnalyzer(_create_analyzer)
```
### **How to Receive Analyzer Responses**
- There is a non-gRPC method in the analyzer frontend called `StartResponseListener(<analyzer_uuid>)`. The `analyzer_uuid` is the UUID of the analyzer provided when calling `StartAnalyzer()`. The following code will log the responses:
```python
from analytics.frontend.service.AnalyticsFrontendServiceServicerImpl import AnalyticsFrontendServiceServicerImpl
analytic_frontend_service_object = AnalyticsFrontendServiceServicerImpl()
for response in analytic_frontend_service_object.StartResponseListener(<analyzer_uuid>):
LOGGER.debug(response)
```
### **Understanding the Output of the Analyzer**
- **Output Column Names**: The output JSON string includes two keys for each defined threshold. For example, the `min_latency` threshold generates two keys: `min_latency_THRESHOLD_FAIL` and `min_latency_THRESHOLD_RAISE`.
- `min_latency_THRESHOLD_FAIL` is triggered if the average latency calculated within the defined window falls below the lower bound of the specified threshold range.
- `min_latency_THRESHOLD_RAISE` is triggered if the average latency calculated within the defined window exceeds the upper bound of the specified threshold range.
- `min_latency_THRESHOLD_FAIL` and `min_latency_THRESHOLD_RAISE` hold `TRUE` when the corresponding condition is met; otherwise, they are set to `FALSE` (an illustrative record is shown below).
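For illustration only (the exact keys depend on the configured thresholds and window), an output record for the `min_latency` threshold might look like:
```
'{"window_start": "2024-09-03T12:36:00Z", "kpi_id": "efef4d95-1cf1-43c4-9742-95c283ddd7a6", "min_latency": 5.2, "min_latency_THRESHOLD_FAIL": true, "min_latency_THRESHOLD_RAISE": false}'
```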
...@@ -12,5 +12,7 @@ ...@@ -12,5 +12,7 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
pyspark==3.5.2 dask==2024.1.0
distributed==2024.1.0
pandas==2.2.3
confluent-kafka==2.3.* confluent-kafka==2.3.*
...@@ -12,18 +12,18 @@ ...@@ -12,18 +12,18 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import time
import json import json
import logging import logging
import threading import threading
from common.tools.service.GenericGrpcService import GenericGrpcService from common.tools.service.GenericGrpcService import GenericGrpcService
from analytics.backend.service.SparkStreaming import SparkStreamer
from common.tools.kafka.Variables import KafkaConfig, KafkaTopic from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
from confluent_kafka import Consumer as KafkaConsumer from confluent_kafka import Consumer as KafkaConsumer
from confluent_kafka import KafkaError from confluent_kafka import KafkaError
from common.Constants import ServiceNameEnum from common.Constants import ServiceNameEnum
from common.Settings import get_service_port_grpc from common.Settings import get_service_port_grpc
from threading import Thread, Event
from .DaskStreaming import DaskStreamer
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
...@@ -36,62 +36,22 @@ class AnalyticsBackendService(GenericGrpcService): ...@@ -36,62 +36,22 @@ class AnalyticsBackendService(GenericGrpcService):
port = get_service_port_grpc(ServiceNameEnum.ANALYTICSBACKEND) port = get_service_port_grpc(ServiceNameEnum.ANALYTICSBACKEND)
super().__init__(port, cls_name=cls_name) super().__init__(port, cls_name=cls_name)
self.running_threads = {} # To keep track of all running analyzers self.running_threads = {} # To keep track of all running analyzers
self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(), self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : '10.152.183.186:9092',
'group.id' : 'analytics-frontend', 'group.id' : 'analytics-frontend',
'auto.offset.reset' : 'latest'}) 'auto.offset.reset' : 'latest'})
def StartSparkStreamer(self, analyzer_uuid, analyzer): def install_servicers(self):
kpi_list = analyzer['input_kpis'] threading.Thread(target=self.RequestListener, args=()).start()
oper_list = [s.replace('_value', '') for s in list(analyzer["thresholds"].keys())] # TODO: update this line...
thresholds = analyzer['thresholds']
window_size = analyzer['window_size']
window_slider = analyzer['window_slider']
print ("Received parameters: {:} - {:} - {:} - {:} - {:}".format(
kpi_list, oper_list, thresholds, window_size, window_slider))
LOGGER.debug ("Received parameters: {:} - {:} - {:} - {:} - {:}".format(
kpi_list, oper_list, thresholds, window_size, window_slider))
try:
stop_event = threading.Event()
thread = threading.Thread(target=SparkStreamer,
args=(analyzer_uuid, kpi_list, oper_list, thresholds, stop_event,
window_size, window_slider, None ))
self.running_threads[analyzer_uuid] = (thread, stop_event)
thread.start()
print ("Initiated Analyzer backend: {:}".format(analyzer_uuid))
LOGGER.info("Initiated Analyzer backend: {:}".format(analyzer_uuid))
return True
except Exception as e:
print ("Failed to initiate Analyzer backend: {:}".format(e))
LOGGER.error("Failed to initiate Analyzer backend: {:}".format(e))
return False
def StopRequestListener(self, threadInfo: tuple):
try:
thread, stop_event = threadInfo
stop_event.set()
thread.join()
print ("Terminating Analytics backend RequestListener")
LOGGER.info("Terminating Analytics backend RequestListener")
return True
except Exception as e:
print ("Failed to terminate analytics backend {:}".format(e))
LOGGER.error("Failed to terminate analytics backend {:}".format(e))
return False
def install_services(self): def RequestListener(self):
stop_event = threading.Event()
thread = threading.Thread(target=self.RequestListener,
args=(stop_event,) )
thread.start()
return (thread, stop_event)
def RequestListener(self, stop_event):
""" """
listener for requests on Kafka topic. listener for requests on Kafka topic.
""" """
LOGGER.info("Request Listener is initiated ...")
print ("Request Listener is initiated ...")
consumer = self.kafka_consumer consumer = self.kafka_consumer
consumer.subscribe([KafkaTopic.ANALYTICS_REQUEST.value]) consumer.subscribe([KafkaTopic.ANALYTICS_REQUEST.value])
while not stop_event.is_set(): while True:
receive_msg = consumer.poll(2.0) receive_msg = consumer.poll(2.0)
if receive_msg is None: if receive_msg is None:
continue continue
...@@ -99,21 +59,54 @@ class AnalyticsBackendService(GenericGrpcService): ...@@ -99,21 +59,54 @@ class AnalyticsBackendService(GenericGrpcService):
if receive_msg.error().code() == KafkaError._PARTITION_EOF: if receive_msg.error().code() == KafkaError._PARTITION_EOF:
continue continue
else: else:
print("Consumer error: {}".format(receive_msg.error())) LOGGER.error("Consumer error: {:}".format(receive_msg.error()))
print ("Consumer error: {:}".format(receive_msg.error()))
break break
analyzer = json.loads(receive_msg.value().decode('utf-8')) try:
analyzer_uuid = receive_msg.key().decode('utf-8') analyzer = json.loads(receive_msg.value().decode('utf-8'))
LOGGER.debug('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer)) analyzer_uuid = receive_msg.key().decode('utf-8')
print ('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer)) LOGGER.debug('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer))
print ('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer))
if analyzer["algo_name"] is None and analyzer["oper_mode"] is None:
self.StopDaskListener(analyzer_uuid)
else:
self.StartDaskListener(analyzer_uuid, analyzer)
except Exception as e:
LOGGER.warning("Unable to consume message from topic: {:}. ERROR: {:}".format(KafkaTopic.ANALYTICS_REQUEST.value, e))
print ("Unable to consume message from topic: {:}. ERROR: {:}".format(KafkaTopic.ANALYTICS_REQUEST.value, e))
def StartDaskListener(self, analyzer_uuid, analyzer):
kpi_list = analyzer[ 'input_kpis' ]
thresholds = analyzer[ 'thresholds' ]
window_size = analyzer[ 'window_size' ]
window_slider = analyzer[ 'window_slider']
if analyzer["algo_name"] is None and analyzer["oper_mode"] is None: LOGGER.debug ("Received parameters: {:} - {:} - {:} - {:}".format(
self.TerminateAnalyzerBackend(analyzer_uuid) kpi_list, thresholds, window_size, window_slider))
else: print ("Received parameters: {:} - {:} - {:} - {:}".format(
self.StartSparkStreamer(analyzer_uuid, analyzer) kpi_list, thresholds, window_size, window_slider))
LOGGER.debug("Stop Event activated. Terminating...") try:
print ("Stop Event activated. Terminating...") stop_event = Event()
thread = Thread(
target=DaskStreamer,
# args=(analyzer_uuid, kpi_list, oper_list, thresholds, stop_event),
args=(analyzer['output_kpis'][0] , kpi_list, thresholds, stop_event),
kwargs={
"window_size" : window_size,
}
)
thread.start()
self.running_threads[analyzer_uuid] = (thread, stop_event)
print ("Initiated Analyzer backend: {:}".format(analyzer_uuid))
LOGGER.info("Initiated Analyzer backend: {:}".format(analyzer_uuid))
return True
except Exception as e:
print ("Failed to initiate Analyzer backend: {:}".format(e))
LOGGER.error("Failed to initiate Analyzer backend: {:}".format(e))
return False
def TerminateAnalyzerBackend(self, analyzer_uuid): def StopDaskListener(self, analyzer_uuid):
if analyzer_uuid in self.running_threads: if analyzer_uuid in self.running_threads:
try: try:
thread, stop_event = self.running_threads[analyzer_uuid] thread, stop_event = self.running_threads[analyzer_uuid]
...@@ -128,5 +121,4 @@ class AnalyticsBackendService(GenericGrpcService): ...@@ -128,5 +121,4 @@ class AnalyticsBackendService(GenericGrpcService):
return False return False
else: else:
print ("Analyzer not found in active collectors. Analyzer Id: {:}".format(analyzer_uuid)) print ("Analyzer not found in active collectors. Analyzer Id: {:}".format(analyzer_uuid))
LOGGER.warning("Analyzer not found in active collectors: Analyzer Id: {:}".format(analyzer_uuid)) LOGGER.warning("Analyzer not found in active collectors: Analyzer Id: {:}".format(analyzer_uuid))
# generate confirmation towards frontend
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import time
import json
from confluent_kafka import Consumer, Producer, KafkaException, KafkaError
import pandas as pd
from dask.distributed import Client, LocalCluster
from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
logging.basicConfig(level=logging.INFO)
LOGGER = logging.getLogger(__name__)
def SettingKafkaConsumerParams():
return {'bootstrap.servers' : KafkaConfig.get_kafka_address(),
'group.id' : 'analytics-backend',
'auto.offset.reset' : 'latest'}
def GetAggregationMappings(thresholds):
agg_dict = {}
for threshold_key in thresholds.keys():
parts = threshold_key.split('_', 1)
if len(parts) != 2:
LOGGER.warning(f"Threshold key '{threshold_key}' does not follow the '<aggregation>_<metricName>' format. Skipping.")
continue
aggregation, metric_name = parts
# Ensure that the aggregation function is valid in pandas
if aggregation not in ['mean', 'min', 'max', 'first', 'last', 'std']:
LOGGER.warning(f"Unsupported aggregation '{aggregation}' in threshold key '{threshold_key}'. Skipping.")
continue
agg_dict[threshold_key] = ('kpi_value', aggregation)
return agg_dict
def ApplyThresholds(aggregated_df, thresholds):
"""
Apply thresholds (TH-Fall and TH-Raise) based on the thresholds dictionary
on the aggregated DataFrame.
Args: aggregated_df (pd.DataFrame): DataFrame with aggregated metrics.
thresholds (dict): Thresholds dictionary with keys in the format '<aggregation>_<metricName>'.
Returns: pd.DataFrame: DataFrame with additional threshold columns.
"""
for threshold_key, threshold_values in thresholds.items():
if threshold_key not in aggregated_df.columns:
LOGGER.warning(f"Threshold key '{threshold_key}' does not correspond to any aggregation result. Skipping threshold application.")
continue
if isinstance(threshold_values, (list, tuple)) and len(threshold_values) == 2:
fail_th, raise_th = threshold_values
aggregated_df[f"{threshold_key}_THRESHOLD_FALL"] = aggregated_df[threshold_key] < fail_th
aggregated_df[f"{threshold_key}_THRESHOLD_RAISE"] = aggregated_df[threshold_key] > raise_th
else:
LOGGER.warning(f"Threshold values for '{threshold_key}' are not a list or tuple of length 2. Skipping threshold application.")
return aggregated_df
def initialize_dask_client():
"""
Initialize a local Dask cluster and client.
"""
cluster = LocalCluster(n_workers=2, threads_per_worker=2)
client = Client(cluster)
LOGGER.info(f"Dask Client Initialized: {client}")
return client, cluster
def initialize_kafka_producer():
return Producer({'bootstrap.servers': KafkaConfig.get_kafka_address()})
def delivery_report(err, msg):
if err is not None:
LOGGER.error(f"Message delivery failed: {err}")
else:
LOGGER.info(f"Message delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")
def process_batch(batch, agg_mappings, thresholds, key):
"""
Process a batch of data and apply thresholds.
Args: batch (list of dict): List of messages from Kafka.
agg_mappings (dict): Mapping from threshold key to aggregation function.
thresholds (dict): Thresholds dictionary.
Returns: list of dict: Processed records ready to be sent to Kafka.
"""
if not batch:
LOGGER.info("Empty batch received. Skipping processing.")
return []
df = pd.DataFrame(batch)
LOGGER.info(f"df {df} ")
df['time_stamp'] = pd.to_datetime(df['time_stamp'], errors='coerce',unit='s')
df.dropna(subset=['time_stamp'], inplace=True)
LOGGER.info(f"df {df} ")
required_columns = {'time_stamp', 'kpi_id', 'kpi_value'}
if not required_columns.issubset(df.columns):
LOGGER.warning(f"Batch contains missing required columns. Required columns: {required_columns}. Skipping batch.")
return []
if df.empty:
LOGGER.info("No data after filtering by KPI IDs. Skipping processing.")
return []
# Perform aggregations using named aggregation
try:
agg_dict = {key: value for key, value in agg_mappings.items()}
df_agg = df.groupby(['window_start']).agg(**agg_dict).reset_index()
except Exception as e:
LOGGER.error(f"Aggregation error: {e}")
return []
# Apply thresholds
df_thresholded = ApplyThresholds(df_agg, thresholds)
df_thresholded['kpi_id'] = key
df_thresholded['window_start'] = df_thresholded['window_start'].dt.strftime('%Y-%m-%dT%H:%M:%SZ')
# Convert aggregated DataFrame to list of dicts
result = df_thresholded.to_dict(orient='records')
LOGGER.info(f"Processed batch with {len(result)} records after aggregation and thresholding.")
return result
def produce_result(result, producer, destination_topic):
for record in result:
try:
producer.produce(
destination_topic,
key=str(record.get('kpi_id', '')),
value=json.dumps(record),
callback=delivery_report
)
except KafkaException as e:
LOGGER.error(f"Failed to produce message: {e}")
producer.flush()
LOGGER.info(f"Produced {len(result)} aggregated records to '{destination_topic}'.")
def DaskStreamer(key, kpi_list, thresholds, stop_event,
window_size="30s", time_stamp_col="time_stamp"):
client, cluster = initialize_dask_client()
consumer_conf = SettingKafkaConsumerParams()
consumer = Consumer(consumer_conf)
consumer.subscribe([KafkaTopic.VALUE.value])
producer = initialize_kafka_producer()
# Parse window_size to seconds
try:
window_size_td = pd.to_timedelta(window_size)
window_size_seconds = window_size_td.total_seconds()
except Exception as e:
LOGGER.error(f"Invalid window_size format: {window_size}. Error: {e}")
window_size_seconds = 30
LOGGER.info(f"Batch processing interval set to {window_size_seconds} seconds.")
# Extract aggregation mappings from thresholds
agg_mappings = GetAggregationMappings(thresholds)
if not agg_mappings:
LOGGER.error("No valid aggregation mappings extracted from thresholds. Exiting streamer.")
consumer.close()
producer.flush()
client.close()
cluster.close()
return
try:
batch = []
last_batch_time = time.time()
LOGGER.info("Starting to consume messages...")
while not stop_event.is_set():
msg = consumer.poll(1.0)
if msg is None:
current_time = time.time()
if (current_time - last_batch_time) >= window_size_seconds and batch:
LOGGER.info("Time-based batch threshold reached. Processing batch.")
future = client.submit(process_batch, batch, agg_mappings, thresholds)
future.add_done_callback(lambda fut: produce_result(fut.result(), producer, KafkaTopic.ALARMS.value))
batch = []
last_batch_time = current_time
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
LOGGER.warning(f"End of partition reached {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")
else:
LOGGER.error(f"Kafka error: {msg.error()}")
continue
try:
message_value = json.loads(msg.value().decode('utf-8'))
except json.JSONDecodeError as e:
LOGGER.error(f"JSON decode error: {e}")
continue
try:
message_timestamp = pd.to_datetime(message_value[time_stamp_col], errors='coerce',unit='s')
LOGGER.warning(f"message_timestamp: {message_timestamp}. Skipping message.")
if pd.isna(message_timestamp):
LOGGER.warning(f"Invalid timestamp in message: {message_value}. Skipping message.")
continue
window_start = message_timestamp.floor(window_size)
LOGGER.warning(f"window_start: {window_start}. Skipping message.")
message_value['window_start'] = window_start
except Exception as e:
LOGGER.error(f"Error processing timestamp: {e}. Skipping message.")
continue
if message_value['kpi_id'] not in kpi_list:
LOGGER.debug(f"KPI ID '{message_value['kpi_id']}' not in kpi_list. Skipping message.")
continue
batch.append(message_value)
current_time = time.time()
if (current_time - last_batch_time) >= window_size_seconds and batch:
LOGGER.info("Time-based batch threshold reached. Processing batch.")
future = client.submit(process_batch, batch, agg_mappings, thresholds, key)
future.add_done_callback(lambda fut: produce_result(fut.result(), producer, KafkaTopic.ALARMS.value))
batch = []
last_batch_time = current_time
except Exception as e:
LOGGER.exception(f"Error in Dask streaming process: {e}")
finally:
# Process any remaining messages in the batch
if batch:
LOGGER.info("Processing remaining messages in the batch.")
future = client.submit(process_batch, batch, agg_mappings, thresholds)
future.add_done_callback(lambda fut: produce_result(fut.result(), producer, KafkaTopic.ALARMS.value))
consumer.close()
producer.flush()
LOGGER.info("Kafka consumer and producer closed.")
client.close()
cluster.close()
LOGGER.info("Dask client and cluster closed.")
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging, time
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType
from pyspark.sql.functions import from_json, col, window, avg, min, max, first, last, stddev, when, round
from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
LOGGER = logging.getLogger(__name__)
def DefiningSparkSession():
# Create a Spark session with a specific Spark version (3.5.0)
return SparkSession.builder \
.appName("Analytics") \
.config("spark.sql.streaming.forceDeleteTempCheckpointLocation", "true") \
.config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0") \
.getOrCreate()
def SettingKafkaConsumerParams(): # TODO: create get_kafka_consumer() in common with inputs (bootstrap server, subscribe, startingOffset and failOnDataLoss with default values)
return {
# "kafka.bootstrap.servers": '127.0.0.1:9092',
"kafka.bootstrap.servers": KafkaConfig.get_kafka_address(),
"subscribe" : KafkaTopic.VALUE.value,
"startingOffsets" : 'latest',
"failOnDataLoss" : 'false' # Optional: Set to "true" to fail the query on data loss
}
def DefiningRequestSchema():
return StructType([
StructField("time_stamp" , StringType() , True),
StructField("kpi_id" , StringType() , True),
StructField("kpi_value" , DoubleType() , True)
])
def GetAggregations(oper_list):
# Define the possible aggregation functions
agg_functions = {
'avg' : round(avg ("kpi_value"), 3) .alias("avg_value"),
'min' : round(min ("kpi_value"), 3) .alias("min_value"),
'max' : round(max ("kpi_value"), 3) .alias("max_value"),
'first': round(first ("kpi_value"), 3) .alias("first_value"),
'last' : round(last ("kpi_value"), 3) .alias("last_value"),
'stdev': round(stddev ("kpi_value"), 3) .alias("stdev_value")
}
return [agg_functions[op] for op in oper_list if op in agg_functions] # Filter and return only the selected aggregations
def ApplyThresholds(aggregated_df, thresholds):
# Apply thresholds (TH-Fail and TH-RAISE) based on the thresholds dictionary on the aggregated DataFrame.
# Loop through each column name and its associated thresholds
for col_name, (fail_th, raise_th) in thresholds.items():
# Apply TH-Fail condition (if column value is less than the fail threshold)
aggregated_df = aggregated_df.withColumn(
f"{col_name}_THRESHOLD_FAIL",
when(col(col_name) < fail_th, True).otherwise(False)
)
# Apply TH-RAISE condition (if column value is greater than the raise threshold)
aggregated_df = aggregated_df.withColumn(
f"{col_name}_THRESHOLD_RAISE",
when(col(col_name) > raise_th, True).otherwise(False)
)
return aggregated_df
def SparkStreamer(key, kpi_list, oper_list, thresholds, stop_event,
window_size=None, win_slide_duration=None, time_stamp_col=None):
"""
Method to perform the Spark operation on a Kafka stream.
NOTE: The Kafka topic to be processed should have at least one row before initiating the Spark session.
"""
kafka_consumer_params = SettingKafkaConsumerParams() # Define the Kafka consumer parameters
schema = DefiningRequestSchema() # Define the schema for the incoming JSON data
spark = DefiningSparkSession() # Define the spark session with app name and spark version
# extra options default assignment
if window_size is None: window_size = "60 seconds" # default
if win_slide_duration is None: win_slide_duration = "30 seconds" # default
if time_stamp_col is None: time_stamp_col = "time_stamp" # default
try:
# Read data from Kafka
raw_stream_data = spark \
.readStream \
.format("kafka") \
.options(**kafka_consumer_params) \
.load()
# Convert the value column from Kafka to a string
stream_data = raw_stream_data.selectExpr("CAST(value AS STRING)")
# Parse the JSON string into a DataFrame with the defined schema
parsed_stream_data = stream_data.withColumn("parsed_value", from_json(col("value"), schema))
# Select the parsed fields
final_stream_data = parsed_stream_data.select("parsed_value.*")
# Convert the time_stamp to proper timestamp (assuming it's in ISO format)
final_stream_data = final_stream_data.withColumn(time_stamp_col, col(time_stamp_col).cast(TimestampType()))
# Filter the stream to only include rows where the kpi_id is in the kpi_list
filtered_stream_data = final_stream_data.filter(col("kpi_id").isin(kpi_list))
# Define a window for aggregation
windowed_stream_data = filtered_stream_data \
.groupBy(
window( col(time_stamp_col),
window_size, slideDuration=win_slide_duration
),
col("kpi_id")
) \
.agg(*GetAggregations(oper_list))
# Apply thresholds to the aggregated data
thresholded_stream_data = ApplyThresholds(windowed_stream_data, thresholds)
# --- This will write output on console: FOR TESTING PURPOSES
# Start the Spark streaming query
# query = thresholded_stream_data \
# .writeStream \
# .outputMode("update") \
# .format("console")
# --- This will write output to Kafka: ACTUAL IMPLEMENTATION
query = thresholded_stream_data \
.selectExpr(f"'{key}' AS key", "to_json(struct(*)) AS value") \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", KafkaConfig.get_kafka_address()) \
.option("topic", KafkaTopic.ANALYTICS_RESPONSE.value) \
.option("checkpointLocation", "analytics/.spark/checkpoint") \
.outputMode("update")
# Start the query execution
queryHandler = query.start()
# Loop to check for stop event flag. To be set by stop collector method.
while True:
if stop_event.is_set():
LOGGER.debug("Stop Event activated. Terminating in 5 seconds...")
print ("Stop Event activated. Terminating in 5 seconds...")
time.sleep(5)
queryHandler.stop()
break
time.sleep(5)
except Exception as e:
print("Error in Spark streaming process: {:}".format(e))
LOGGER.debug("Error in Spark streaming process: {:}".format(e))
...@@ -37,8 +37,8 @@ def main(): ...@@ -37,8 +37,8 @@ def main():
LOGGER.info('Starting...') LOGGER.info('Starting...')
# Start metrics server # Start metrics server
metrics_port = get_metrics_port() # metrics_port = get_metrics_port()
start_http_server(metrics_port) # start_http_server(metrics_port)
grpc_service = AnalyticsBackendService() grpc_service = AnalyticsBackendService()
grpc_service.start() grpc_service.start()
......
...@@ -12,6 +12,11 @@ ...@@ -12,6 +12,11 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import uuid
import json
from common.proto.kpi_manager_pb2 import KpiId
from common.proto.analytics_frontend_pb2 import ( AnalyzerOperationMode,
Analyzer, AnalyzerId )
def get_kpi_id_list(): def get_kpi_id_list():
return ["6e22f180-ba28-4641-b190-2287bf448888", "1e22f180-ba28-4641-b190-2287bf446666"] return ["6e22f180-ba28-4641-b190-2287bf448888", "1e22f180-ba28-4641-b190-2287bf446666"]
...@@ -32,3 +37,78 @@ def get_threshold_dict(): ...@@ -32,3 +37,78 @@ def get_threshold_dict():
return { return {
op + '_value': threshold_dict[op+'_value'] for op in get_operation_list() if op + '_value' in threshold_dict op + '_value': threshold_dict[op+'_value'] for op in get_operation_list() if op + '_value' in threshold_dict
} }
def create_analyzer_id():
_create_analyzer_id = AnalyzerId()
# _create_analyzer_id.analyzer_id.uuid = str(uuid.uuid4())
# _create_analyzer_id.analyzer_id.uuid = "efef4d95-1cf1-43c4-9742-95c283ddd7a6"
_create_analyzer_id.analyzer_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666"
return _create_analyzer_id
def create_analyzer():
_create_analyzer = Analyzer()
# _create_analyzer.analyzer_id.analyzer_id.uuid = str(uuid.uuid4())
_create_analyzer.analyzer_id.analyzer_id.uuid = "20540c4f-6797-45e5-af70-6491b49283f9"
_create_analyzer.algorithm_name = "Test_Aggergate_and_Threshold"
_create_analyzer.operation_mode = AnalyzerOperationMode.ANALYZEROPERATIONMODE_STREAMING
_kpi_id = KpiId()
# input IDs to analyze
_kpi_id.kpi_id.uuid = str(uuid.uuid4())
_kpi_id.kpi_id.uuid = "5716c369-932b-4a02-b4c7-6a2e808b92d7"
_create_analyzer.input_kpi_ids.append(_kpi_id)
_kpi_id.kpi_id.uuid = str(uuid.uuid4())
_kpi_id.kpi_id.uuid = "8f70d908-cc48-48de-8664-dc9be2de0089"
_create_analyzer.input_kpi_ids.append(_kpi_id)
_kpi_id.kpi_id.uuid = str(uuid.uuid4())
_create_analyzer.input_kpi_ids.append(_kpi_id)
# output IDs after analysis
_kpi_id.kpi_id.uuid = str(uuid.uuid4())
_create_analyzer.output_kpi_ids.append(_kpi_id)
_kpi_id.kpi_id.uuid = str(uuid.uuid4())
_create_analyzer.output_kpi_ids.append(_kpi_id)
# parameter
_threshold_dict = {
# 'avg_value' :(20, 30), 'min_value' :(00, 10), 'max_value' :(45, 50),
'first_value' :(00, 10), 'last_value' :(40, 50), 'stdev_value':(00, 10)}
_create_analyzer.parameters['thresholds'] = json.dumps(_threshold_dict)
_create_analyzer.parameters['window_size'] = "60 seconds" # Such as "10 seconds", "2 minutes", "3 hours", "4 days" or "5 weeks"
_create_analyzer.parameters['window_slider'] = "30 seconds" # should be less than window size
_create_analyzer.parameters['store_aggregate'] = str(False) # TRUE to store. Not implemented yet
return _create_analyzer
def create_analyzer_dask():
_create_analyzer = Analyzer()
_create_analyzer.analyzer_id.analyzer_id.uuid = str(uuid.uuid4())
# _create_analyzer.analyzer_id.analyzer_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666"
_create_analyzer.algorithm_name = "Test_Aggergate_and_Threshold"
_create_analyzer.operation_mode = AnalyzerOperationMode.ANALYZEROPERATIONMODE_STREAMING
_kpi_id = KpiId()
# input IDs to analyze
# _kpi_id.kpi_id.uuid = str(uuid.uuid4())
_kpi_id.kpi_id.uuid = "6e22f180-ba28-4641-b190-2287bf448888"
_create_analyzer.input_kpi_ids.append(_kpi_id)
# _kpi_id.kpi_id.uuid = str(uuid.uuid4())
_kpi_id.kpi_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666"
_create_analyzer.input_kpi_ids.append(_kpi_id)
# _kpi_id.kpi_id.uuid = str(uuid.uuid4())
_create_analyzer.input_kpi_ids.append(_kpi_id)
# output IDs after analysis
_kpi_id.kpi_id.uuid = str(uuid.uuid4())
_create_analyzer.output_kpi_ids.append(_kpi_id)
_kpi_id.kpi_id.uuid = str(uuid.uuid4())
_create_analyzer.output_kpi_ids.append(_kpi_id)
# parameter
_threshold_dict = {
'mean_latency' :(20, 30), 'min_latency' :(00, 10), 'max_latency' :(45, 50),#}
'first_value' :(00, 50), 'last_value' :(50, 100), 'std_value' :(0, 90)}
_create_analyzer.parameters['thresholds'] = json.dumps(_threshold_dict)
_create_analyzer.parameters['oper_list'] = json.dumps([key.split('_')[0] for key in _threshold_dict.keys()])
_create_analyzer.parameters['window_size'] = "10s" # Such as "10 seconds", "2 minutes", "3 hours", "4 days" or "5 weeks"
_create_analyzer.parameters['window_slider'] = "5s" # should be less than window size
_create_analyzer.parameters['store_aggregate'] = str(False) # TRUE to store. Not implemented yet
return _create_analyzer
...@@ -12,12 +12,16 @@ ...@@ -12,12 +12,16 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import time import time, json
from typing import Dict
import logging import logging
import threading import threading
from common.tools.kafka.Variables import KafkaTopic from common.tools.kafka.Variables import KafkaTopic
from analytics.backend.service.AnalyticsBackendService import AnalyticsBackendService from analytics.backend.service.AnalyticsBackendService import AnalyticsBackendService
from analytics.backend.tests.messages import get_kpi_id_list, get_operation_list, get_threshold_dict from analytics.backend.tests.messages import get_kpi_id_list, get_operation_list, get_threshold_dict
from .messages import create_analyzer, create_analyzer_dask
from threading import Thread, Event
from ..service.DaskStreaming import DaskStreamer
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
...@@ -27,31 +31,122 @@ LOGGER = logging.getLogger(__name__) ...@@ -27,31 +31,122 @@ LOGGER = logging.getLogger(__name__)
########################### ###########################
# --- "test_validate_kafka_topics" should be run before the functionality tests --- # --- "test_validate_kafka_topics" should be run before the functionality tests ---
def test_validate_kafka_topics(): # def test_validate_kafka_topics():
LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ") # LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ")
response = KafkaTopic.create_all_topics() # response = KafkaTopic.create_all_topics()
assert isinstance(response, bool) # assert isinstance(response, bool)
# def test_StartRequestListener():
# LOGGER.info('test_RunRequestListener') # --- To test Dask Streamer functionality ---
# def test_StartDaskStreamer(): # Directly from the Streamer class
# LOGGER.debug(" >>> test_StartSparkStreamer: START <<< ")
# stop_event = Event()
# kpi_list = ["1e22f180-ba28-4641-b190-2287bf446666", "6e22f180-ba28-4641-b190-2287bf448888", 'kpi_3']
# oper_list = ['avg', 'min', 'max',]
# thresholds = {
# 'avg_value': (10.0, 90.0),
# 'min_value': (5.0, 95.0),
# 'max_value': (15.0, 85.0),
# 'latency' : (2.0, 10.0)
# }
# # Start the DaskStreamer in a separate thread
# streamer_thread = Thread(
# target=DaskStreamer,
# args=("analytics_stream", kpi_list, oper_list, thresholds, stop_event),
# kwargs={
# "window_size": "60s",
# "win_slide_duration": "30s",
# "time_stamp_col": "time_stamp"
# }
# )
# streamer_thread.start()
# try:
# while True:
# time.sleep(10)
# except KeyboardInterrupt:
# LOGGER.info("KeyboardInterrupt received. Stopping streamer...")
# stop_event.set()
# streamer_thread.join()
# LOGGER.info("Streamer stopped gracefully.")
# --- To test Start Streamer functionality ---
# def test_StartDaskStreamer():
# LOGGER.debug(" >>> test_StartBaskStreamer: START <<< ")
# analyzer_obj = create_analyzer_dask()
# # LOGGER.info("Created Analyzer Object: {:}".format(analyzer_obj))
# analyzer_uuid = analyzer_obj.analyzer_id.analyzer_id.uuid
# analyzer_to_generate : Dict = {
# "algo_name" : analyzer_obj.algorithm_name,
# "input_kpis" : [k.kpi_id.uuid for k in analyzer_obj.input_kpi_ids],
# "output_kpis" : [k.kpi_id.uuid for k in analyzer_obj.output_kpi_ids],
# "oper_mode" : analyzer_obj.operation_mode,
# "thresholds" : json.loads(analyzer_obj.parameters["thresholds"]),
# "oper_list" : json.loads(analyzer_obj.parameters["oper_list"]),
# # "oper_list" : analyzer_obj.parameters["oper_list"],
# "window_size" : analyzer_obj.parameters["window_size"],
# "window_slider" : analyzer_obj.parameters["window_slider"],
# # "store_aggregate" : analyzer_obj.parameters["store_aggregate"]
# }
# AnalyticsBackendServiceObj = AnalyticsBackendService() # AnalyticsBackendServiceObj = AnalyticsBackendService()
# response = AnalyticsBackendServiceObj.StartRequestListener() # response is Tuple (thread, stop_event) # LOGGER.info("Analyzer to be generated: {:}".format((analyzer_to_generate)))
# response = AnalyticsBackendServiceObj.StartDaskListener(analyzer_uuid, analyzer_to_generate)
# assert isinstance(response, bool)
# time.sleep(100)
# LOGGER.info('Initiating StopRequestListener...')
# # AnalyticsBackendServiceObj = AnalyticsBackendService()
# response = AnalyticsBackendServiceObj.StopDaskListener(analyzer_uuid)
# LOGGER.debug(str(response)) # LOGGER.debug(str(response))
# assert isinstance(response, tuple) # assert isinstance(response, bool)
# To test START and STOP communication together # --- To test Start Streamer functionality ---
def test_StopRequestListener(): # def test_StartSparkStreamer():
# LOGGER.debug(" >>> test_StartSparkStreamer: START <<< ")
# analyzer_obj = create_analyzer()
# analyzer_uuid = analyzer_obj.analyzer_id.analyzer_id.uuid
# analyzer_to_generate : Dict = {
# "algo_name" : analyzer_obj.algorithm_name,
# "input_kpis" : [k.kpi_id.uuid for k in analyzer_obj.input_kpi_ids],
# "output_kpis" : [k.kpi_id.uuid for k in analyzer_obj.output_kpi_ids],
# "oper_mode" : analyzer_obj.operation_mode,
# "thresholds" : json.loads(analyzer_obj.parameters["thresholds"]),
# "window_size" : analyzer_obj.parameters["window_size"],
# "window_slider" : analyzer_obj.parameters["window_slider"],
# # "store_aggregate" : analyzer_obj.parameters["store_aggregate"]
# }
# AnalyticsBackendServiceObj = AnalyticsBackendService()
# response = AnalyticsBackendServiceObj.StartSparkStreamer(analyzer_uuid, analyzer_to_generate)
# assert isinstance(response, bool)
# --- To TEST StartRequestListenerFunctionality
def test_StartRequestListener():
LOGGER.info('test_RunRequestListener') LOGGER.info('test_RunRequestListener')
LOGGER.info('Initiating StartRequestListener...')
AnalyticsBackendServiceObj = AnalyticsBackendService() AnalyticsBackendServiceObj = AnalyticsBackendService()
response_thread = AnalyticsBackendServiceObj.StartRequestListener() # response is Tuple (thread, stop_event) AnalyticsBackendServiceObj.stop_event = Event()
# LOGGER.debug(str(response_thread)) listener_thread = Thread(target=AnalyticsBackendServiceObj.RequestListener, args=())
time.sleep(10) listener_thread.start()
LOGGER.info('Initiating StopRequestListener...')
AnalyticsBackendServiceObj = AnalyticsBackendService() time.sleep(100)
response = AnalyticsBackendServiceObj.StopRequestListener(response_thread)
LOGGER.debug(str(response)) # AnalyticsBackendServiceObj.stop_event.set()
assert isinstance(response, bool) # LOGGER.info('Backend termination initiated. waiting for termination... 10 seconds')
# listener_thread.join(timeout=10)
# assert not listener_thread.is_alive(), "RequestListener thread did not terminate as expected."
LOGGER.info('Completed test_RunRequestListener')
# To test START and STOP communication together
# def test_StopRequestListener():
# LOGGER.info('test_RunRequestListener')
# LOGGER.info('Initiating StartRequestListener...')
# AnalyticsBackendServiceObj = AnalyticsBackendService()
# response_thread = AnalyticsBackendServiceObj.StartRequestListener() # response is Tuple (thread, stop_event)
# # LOGGER.debug(str(response_thread))
# time.sleep(10)
# LOGGER.info('Initiating StopRequestListener...')
# AnalyticsBackendServiceObj = AnalyticsBackendService()
# response = AnalyticsBackendServiceObj.StopRequestListener(response_thread)
# LOGGER.debug(str(response))
# assert isinstance(response, bool)
# To independently tests the SparkListener functionality # To independently tests the SparkListener functionality
# def test_SparkListener(): # def test_SparkListener():
......
...@@ -62,9 +62,9 @@ RUN python3 -m pip install -r requirements.txt ...@@ -62,9 +62,9 @@ RUN python3 -m pip install -r requirements.txt
# Add component files into working directory # Add component files into working directory
WORKDIR /var/teraflow WORKDIR /var/teraflow
COPY src/analytics/__init__.py analytics/__init__.py COPY ./src/analytics/__init__.py analytics/__init__.py
COPY src/analytics/frontend/. analytics/frontend/ COPY ./src/analytics/frontend/. analytics/frontend/
COPY src/analytics/database/. analytics/database/ COPY ./src/analytics/database/. analytics/database/
# Start the service # Start the service
ENTRYPOINT ["python", "-m", "analytics.frontend.service"] ENTRYPOINT ["python", "-m", "analytics.frontend.service"]
...@@ -12,7 +12,7 @@ ...@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
apscheduler==3.10.4 apscheduler==3.10.1
confluent-kafka==2.3.* confluent-kafka==2.3.*
psycopg2-binary==2.9.* psycopg2-binary==2.9.*
SQLAlchemy==1.4.* SQLAlchemy==1.4.*
......
...@@ -12,13 +12,10 @@ ...@@ -12,13 +12,10 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import logging, grpc, json
import logging, grpc, json, queue
from typing import Dict from typing import Dict
from confluent_kafka import Consumer as KafkaConsumer
from confluent_kafka import Producer as KafkaProducer from confluent_kafka import Producer as KafkaProducer
from confluent_kafka import KafkaError
from common.tools.kafka.Variables import KafkaConfig, KafkaTopic from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
from common.proto.context_pb2 import Empty from common.proto.context_pb2 import Empty
...@@ -27,8 +24,7 @@ from common.proto.analytics_frontend_pb2 import Analyzer, AnalyzerId, Analy ...@@ -27,8 +24,7 @@ from common.proto.analytics_frontend_pb2 import Analyzer, AnalyzerId, Analy
from common.proto.analytics_frontend_pb2_grpc import AnalyticsFrontendServiceServicer from common.proto.analytics_frontend_pb2_grpc import AnalyticsFrontendServiceServicer
from analytics.database.Analyzer_DB import AnalyzerDB from analytics.database.Analyzer_DB import AnalyzerDB
from analytics.database.AnalyzerModel import Analyzer as AnalyzerModel from analytics.database.AnalyzerModel import Analyzer as AnalyzerModel
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger
LOGGER = logging.getLogger(__name__)
METRICS_POOL = MetricsPool('AnalyticsFrontend', 'NBIgRPC')
@@ -36,14 +32,8 @@ METRICS_POOL = MetricsPool('AnalyticsFrontend', 'NBIgRPC')
class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer):
def __init__(self):
LOGGER.info('Init AnalyticsFrontendService')
self.listener_topic = KafkaTopic.ANALYTICS_RESPONSE.value
self.db_obj = AnalyzerDB()
self.result_queue = queue.Queue()
self.scheduler = BackgroundScheduler()
self.kafka_producer = KafkaProducer({'bootstrap.servers' : KafkaConfig.get_kafka_address()})
self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(),
'group.id' : 'analytics-frontend',
'auto.offset.reset' : 'latest'})
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def StartAnalyzer(self,
@@ -56,7 +46,6 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer):
AnalyzerModel.ConvertAnalyzerToRow(request)
)
self.PublishStartRequestOnKafka(request)
response.analyzer_id.uuid = request.analyzer_id.analyzer_id.uuid
return response
@@ -83,63 +72,6 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer):
)
LOGGER.info("Analyzer Start Request Generated: Analyzer Id: {:}, Value: {:}".format(analyzer_uuid, analyzer_to_generate))
self.kafka_producer.flush()
# self.StartResponseListener(analyzer_uuid)
def StartResponseListener(self, filter_key=None):
"""
Start the Kafka response listener with APScheduler and return key-value pairs periodically.
"""
LOGGER.info("Starting StartResponseListener")
# Schedule the ResponseListener at fixed intervals
self.scheduler.add_job(
self.response_listener,
trigger=IntervalTrigger(seconds=5),
args=[filter_key],
id=f"response_listener_{self.listener_topic}",
replace_existing=True
)
self.scheduler.start()
LOGGER.info(f"Started Kafka listener for topic {self.listener_topic}...")
try:
while True:
LOGGER.info("entering while...")
key, value = self.result_queue.get() # Wait until a result is available
LOGGER.info("In while true ...")
yield key, value # Yield the result to the calling function
except KeyboardInterrupt:
LOGGER.warning("Listener stopped manually.")
finally:
self.StopListener()
def response_listener(self, filter_key=None):
"""
Poll Kafka messages and put key-value pairs into the queue.
"""
LOGGER.info(f"Polling Kafka topic {self.listener_topic}...")
consumer = self.kafka_consumer
consumer.subscribe([self.listener_topic])
msg = consumer.poll(2.0)
if msg is None:
return
elif msg.error():
if msg.error().code() != KafkaError._PARTITION_EOF:
LOGGER.error(f"Kafka error: {msg.error()}")
return
try:
key = msg.key().decode('utf-8') if msg.key() else None
if filter_key is not None and key == filter_key:
value = json.loads(msg.value().decode('utf-8'))
LOGGER.info(f"Received key: {key}, value: {value}")
self.result_queue.put((key, value))
else:
LOGGER.info(f"Skipping message with unmatched key: {key}")
# value = json.loads(msg.value().decode('utf-8')) # Added for debugging
# self.result_queue.put((filter_key, value)) # Added for debugging
except Exception as e:
LOGGER.error(f"Error processing Kafka message: {e}")
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def StopAnalyzer(self,
@@ -175,15 +107,6 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer):
)
LOGGER.info("Analyzer Stop Request Generated: Analyzer Id: {:}".format(analyzer_uuid))
self.kafka_producer.flush()
self.StopListener()
def StopListener(self):
"""
Gracefully stop the Kafka listener and the scheduler.
"""
LOGGER.info("Stopping Kafka listener...")
self.scheduler.shutdown()
LOGGER.info("Kafka listener stopped.")
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def SelectAnalyzers(self,
@@ -203,12 +126,11 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer):
LOGGER.info('Unable to process filter response {:}'.format(e))
except Exception as e:
LOGGER.error('Unable to apply filter on table {:}. ERROR: {:}'.format(AnalyzerModel.__name__, e))
def delivery_callback(self, err, msg):
if err:
LOGGER.debug('Message delivery failed: {:}'.format(err))
print ('Message delivery failed: {:}'.format(err))
# else:
# LOGGER.debug('Message delivered to topic {:}'.format(msg.topic()))
# print('Message delivered to topic {:}'.format(msg.topic()))
else:
LOGGER.debug('Message delivered to topic {:}'.format(msg.topic()))
print('Message delivered to topic {:}'.format(msg.topic()))
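For reference, delivery_callback above follows the standard confluent-kafka pattern of passing a callback to produce() and then calling flush(). A self-contained sketch of that pattern follows; the broker address, topic name and payload are illustrative only (the real topic names come from KafkaTopic).

import json, logging
from confluent_kafka import Producer

LOGGER = logging.getLogger(__name__)

def delivery_callback(err, msg):
    # Invoked once per message when flush()/poll() processes the delivery reports.
    if err:
        LOGGER.debug('Message delivery failed: {:}'.format(err))
    else:
        LOGGER.debug('Message delivered to topic {:}'.format(msg.topic()))

producer = Producer({'bootstrap.servers': 'localhost:9092'})    # illustrative broker
producer.produce(
    'analytics_request',                                        # hypothetical topic name
    key      = 'analyzer-uuid',
    value    = json.dumps({'algorithm_name': 'example'}),
    callback = delivery_callback
)
producer.flush()                                                # wait for delivery reports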
@@ -21,13 +21,14 @@ from common.proto.analytics_frontend_pb2 import ( AnalyzerOperationMode, Analyze
def create_analyzer_id():
_create_analyzer_id = AnalyzerId()
# _create_analyzer_id.analyzer_id.uuid = str(uuid.uuid4())
_create_analyzer_id.analyzer_id.uuid = "efef4d95-1cf1-43c4-9742-95c283ddd7a6"
# _create_analyzer_id.analyzer_id.uuid = "efef4d95-1cf1-43c4-9742-95c283ddd7a6"
_create_analyzer_id.analyzer_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666"
return _create_analyzer_id
def create_analyzer():
_create_analyzer = Analyzer()
# _create_analyzer.analyzer_id.analyzer_id.uuid = str(uuid.uuid4())
_create_analyzer.analyzer_id.analyzer_id.uuid = "efef4d95-1cf1-43c4-9742-95c283ddd7a6"
_create_analyzer.analyzer_id.analyzer_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666"
_create_analyzer.algorithm_name = "Test_Aggergate_and_Threshold"
_create_analyzer.operation_mode = AnalyzerOperationMode.ANALYZEROPERATIONMODE_STREAMING
@@ -48,11 +49,12 @@ def create_analyzer():
_create_analyzer.output_kpi_ids.append(_kpi_id)
# parameter
_threshold_dict = {
# 'avg_value' :(20, 30), 'min_value' :(00, 10), 'max_value' :(45, 50),
'first_value' :(00, 10), 'last_value' :(40, 50), 'stdev_value':(00, 10)}
'mean_value' :(20, 30), 'min_value' :(00, 10), 'max_value' :(45, 50),
'first_value' :(00, 10), 'last_value' :(40, 50), 'std_value':(00, 10)
}
_create_analyzer.parameters['thresholds'] = json.dumps(_threshold_dict)
_create_analyzer.parameters['window_size'] = "60 seconds" # Such as "10 seconds", "2 minutes", "3 hours", "4 days" or "5 weeks"
_create_analyzer.parameters['window_slider'] = "30 seconds" # should be less than window size
_create_analyzer.parameters['window_size'] = "10s" # Such as "10 seconds", "2 minutes", "3 hours", "4 days" or "5 weeks"
_create_analyzer.parameters['window_slider'] = "5s" # should be less than window size
_create_analyzer.parameters['store_aggregate'] = str(False) # TRUE to store. Not implemented yet
return _create_analyzer
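create_analyzer() stores its thresholds as a JSON string inside the parameters map. The snippet below sketches how a consumer of that message might decode them; a plain dict stands in for the protobuf map so the snippet runs on its own, and the key names simply mirror the test message above.

import json

parameters = {                                             # stand-in for Analyzer.parameters
    'thresholds'   : json.dumps({'mean_value': (20, 30), 'min_value': (0, 10)}),
    'window_size'  : '10s',
    'window_slider': '5s',
}

thresholds = json.loads(parameters['thresholds'])          # tuples come back as lists
for metric, (low, high) in thresholds.items():
    print('{:s}: alarm when outside [{:}, {:}]'.format(metric, low, high))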
@@ -25,7 +25,7 @@ from common.Settings import ( get_service_port_grpc, get_env_var_name,
ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC )
from common.tools.kafka.Variables import KafkaTopic
from common.proto.analytics_frontend_pb2 import AnalyzerId, AnalyzerList
from common.proto.kpi_value_api_pb2 import KpiValue
from analytics.frontend.client.AnalyticsFrontendClient import AnalyticsFrontendClient
from analytics.frontend.service.AnalyticsFrontendService import AnalyticsFrontendService
from analytics.frontend.tests.messages import ( create_analyzer_id, create_analyzer,
@@ -33,7 +33,7 @@ from analytics.frontend.tests.messages import ( create_analyze
from analytics.frontend.service.AnalyticsFrontendServiceServicerImpl import AnalyticsFrontendServiceServicerImpl
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger
from common.proto.analytics_frontend_pb2 import Analyzer, AnalyzerId, AnalyzerFilter, AnalyzerList
###########################
# Tests Setup
@@ -84,45 +84,23 @@ def analyticsFrontend_client(analyticsFrontend_service : AnalyticsFrontendServic
###########################
# --- "test_validate_kafka_topics" should be executed before the functionality tests ---
def test_validate_kafka_topics():
LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ")
response = KafkaTopic.create_all_topics()
assert isinstance(response, bool)
# ----- core functionality test -----
# def test_StartAnalytics(analyticsFrontend_client):
# LOGGER.info(' >>> test_StartAnalytic START: <<< ')
# response = analyticsFrontend_client.StartAnalyzer(create_analyzer())
# LOGGER.debug(str(response))
# assert isinstance(response, AnalyzerId)
# To test start and stop listener together
def test_StartStopAnalyzers(analyticsFrontend_client):
LOGGER.info(' >>> test_StartStopAnalyzers START: <<< ')
LOGGER.info(' >>> test_StartAnalyzers START: <<< ')
LOGGER.info('--> StartAnalyzer')
added_analyzer_id = analyticsFrontend_client.StartAnalyzer(create_analyzer())
LOGGER.debug(str(added_analyzer_id))
assert isinstance(added_analyzer_id, AnalyzerId)
LOGGER.info(' --> Calling StartResponseListener... ')
class_obj = AnalyticsFrontendServiceServicerImpl()
response = class_obj.StartResponseListener(added_analyzer_id.analyzer_id._uuid)
LOGGER.debug(response)
LOGGER.info("waiting for timer to comlete ...")
time.sleep(3)
LOGGER.info('--> StopAnalyzer')
response = analyticsFrontend_client.StopAnalyzer(added_analyzer_id)
LOGGER.debug(str(response))
# def test_SelectAnalytics(analyticsFrontend_client):
# LOGGER.info(' >>> test_SelectAnalytics START: <<< ')
# response = analyticsFrontend_client.SelectAnalyzers(create_analyzer_filter())
# LOGGER.debug(str(response))
# assert isinstance(response, AnalyzerList)
def test_StopAnalytic(analyticsFrontend_client):
LOGGER.info(' >>> test_StopAnalytic START: <<< ')
response = analyticsFrontend_client.StopAnalyzer(create_analyzer_id())
LOGGER.debug(str(response))
assert isinstance(response, Empty)
# def test_StopAnalytic(analyticsFrontend_client):
# LOGGER.info(' >>> test_StopAnalytic START: <<< ')
# response = analyticsFrontend_client.StopAnalyzer(create_analyzer_id())
# LOGGER.debug(str(response))
# assert isinstance(response, Empty)
def test_SelectAnalytics(analyticsFrontend_client):
LOGGER.info(' >>> test_SelectAnalytics START: <<< ')
response = analyticsFrontend_client.SelectAnalyzers(create_analyzer_filter())
LOGGER.debug(str(response))
assert isinstance(response, AnalyzerList)
# def test_ResponseListener():
# LOGGER.info(' >>> test_ResponseListener START <<< ')
@@ -131,4 +109,4 @@ def test_StartStopAnalyzers(analyticsFrontend_client):
# class_obj = AnalyticsFrontendServiceServicerImpl()
# for response in class_obj.StartResponseListener(analyzer_id.analyzer_id.uuid):
# LOGGER.debug(response)
# assert isinstance(response, tuple)
\ No newline at end of file
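The commented-out test_ResponseListener relies on the BackgroundScheduler/IntervalTrigger pattern imported above: a scheduled job polls periodically and pushes results onto a queue that the caller drains. Below is a minimal sketch of that pattern with the Kafka poll replaced by a stub.

import queue, time
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger

result_queue = queue.Queue()

def poll_once(filter_key):
    # The real listener would poll Kafka here and enqueue matching (key, value) pairs.
    result_queue.put((filter_key, time.time()))

scheduler = BackgroundScheduler()
scheduler.add_job(poll_once, trigger=IntervalTrigger(seconds=1),
                  args=['analyzer-uuid'], id='response_listener', replace_existing=True)
scheduler.start()
try:
    key, value = result_queue.get(timeout=5)   # block until the first poll delivers
    print(key, value)
finally:
    scheduler.shutdown()                       # stop the background polling job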
@@ -12,8 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
java==11.0.*
pyspark==3.5.2
confluent-kafka==2.3.*
psycopg2-binary==2.9.*
SQLAlchemy==1.4.*
@@ -64,12 +64,24 @@ RUN python3 -m pip install -r requirements.txt
WORKDIR /var/teraflow
COPY src/telemetry/frontend/__init__.py telemetry/frontend/__init__.py
COPY src/telemetry/frontend/client/. telemetry/frontend/client/
COPY src/analytics/frontend/client/. analytics/frontend/client/
COPY src/analytics/frontend/service/. analytics/frontend/service/
COPY src/analytics/database/. analytics/database/
COPY src/analytics/frontend/__init__.py analytics/frontend/__init__.py
COPY src/context/__init__.py context/__init__.py
COPY src/context/client/. context/client/
COPY src/kpi_value_api/__init__.py kpi_value_api/__init__.py
COPY src/kpi_value_api/client/. kpi_value_api/client/
COPY src/kpi_manager/__init__.py kpi_manager/__init__.py
COPY src/kpi_manager/client/. kpi_manager/client/
COPY src/monitoring/__init__.py monitoring/__init__.py
COPY src/monitoring/client/. monitoring/client/
COPY src/automation/. automation/
# Start the service
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
apscheduler==3.10.4
confluent-kafka==2.3.*
psycopg2-binary==2.9.*
SQLAlchemy==1.4.*
sqlalchemy-cockroachdb==1.4.*
SQLAlchemy-Utils==0.38.*
@@ -27,11 +27,19 @@ from kpi_manager.client.KpiManagerClient import KpiManagerClient
from common.proto.context_pb2 import ( Service )
from common.proto.kpi_manager_pb2 import (KpiId, KpiDescriptor)
from common.proto.kpi_value_api_pb2 import (KpiAlarms)
from common.proto.policy_pb2 import PolicyRuleService, PolicyRuleState
from common.proto.policy_action_pb2 import PolicyRuleAction , PolicyRuleActionConfig
from common.proto.policy_condition_pb2 import PolicyRuleCondition
from uuid import uuid4
import json
from analytics.frontend.service.AnalyticsFrontendServiceServicerImpl import AnalyticsFrontendServiceServicerImpl
from analytics.frontend.client.AnalyticsFrontendClient import AnalyticsFrontendClient
from common.proto.analytics_frontend_pb2 import Analyzer, AnalyzerId
from kpi_value_api.client.KpiValueApiClient import KpiValueApiClient
from common.method_wrappers.ServiceExceptions import InvalidArgumentException
LOGGER = logging.getLogger(__name__)
@@ -49,6 +57,8 @@ class AutomationServiceServicerImpl(AutomationServiceServicer):
kpi_manager_client = KpiManagerClient()
policy_client = PolicyClient()
telemetry_frontend_client = TelemetryFrontendClient()
analytics_frontend_client = AnalyticsFrontendClient()
analytic_frontend_service = AnalyticsFrontendServiceServicerImpl()
LOGGER.info('Trying to get the service ')
LOGGER.info('request.serviceId.service_uuid.uuid({:s})'.format(str(request.serviceId.service_uuid.uuid)))
@@ -98,16 +108,31 @@ class AutomationServiceServicerImpl(AutomationServiceServicer):
LOGGER.info('kpi_id_rx({:s})'.format(str(kpi_id_rx)))
###########################################
####### START Analyzer LAT ################
# analyzer = Analyzer()
# analyzer.algorithm_name = '' # static
# analyzer.operation_mode = ''
# analyzer.input_kpi_ids[] = [kpi_id_rx,kpi_id_tx]
# analyzer.output_kpi_ids[] = [kpi_id_lat]
#
# analyzer_id_lat: AnalyzerId = analyzer_client.StartAnalyzer(analyzer)
# LOGGER.info('analyzer_id_lat({:s})'.format(str(analyzer_id_lat)))
####### START Collector TX #################
collect_tx = Collector()
collect_tx.collector_id.collector_id.uuid = str(uuid4())
collect_tx.kpi_id.kpi_id.uuid = kpi_id_tx.kpi_id.uuid
collect_tx.duration_s = 2000 # static
collect_tx.interval_s = 1 # static
LOGGER.info('Start Collector TX'.format(str(collect_tx)))
###########################################
collect_id_tx: CollectorId = telemetry_frontend_client.StartCollector(collect_tx)
LOGGER.info('collect_id_tx({:s})'.format(str(collect_id_tx)))
#############################################
####### START Collector RX ##################
collect_rx = Collector()
collect_rx.collector_id.collector_id.uuid = str(uuid4())
collect_rx.kpi_id.kpi_id.uuid = kpi_id_rx.kpi_id.uuid
collect_rx.duration_s = 2000 # static
collect_rx.interval_s = 1 # static
LOGGER.info('Start Collector RX'.format(str(collect_rx)))
collect_id_rx: CollectorId = telemetry_frontend_client.StartCollector(collect_rx)
LOGGER.info('collect_id_tx({:s})'.format(str(collect_id_rx)))
###############################################
####### SET Policy LAT ################
policy_lat = PolicyRuleService()
@@ -144,29 +169,35 @@ class AutomationServiceServicerImpl(AutomationServiceServicer):
LOGGER.info('policy_rule_state({:s})'.format(str(policy_rule_state)))
####### START Collector TX #################
collect_tx = Collector()
collect_tx.collector_id.collector_id.uuid = str(uuid4())
collect_tx.kpi_id.kpi_id.uuid = kpi_id_tx.kpi_id.uuid
collect_tx.duration_s = 0 # static
collect_tx.interval_s = 1 # static
LOGGER.info('Start Collector TX'.format(str(collect_tx)))
collect_id_tx: CollectorId = telemetry_frontend_client.StartCollector(collect_tx)
LOGGER.info('collect_id_tx({:s})'.format(str(collect_id_tx)))
#############################################
####### START Analyzer LAT ################
analyzer = Analyzer()
analyzer.analyzer_id.analyzer_id.uuid = str(uuid4())
analyzer.algorithm_name = 'Test_Aggergate_and_Threshold' # static
analyzer.operation_mode = 2
analyzer.input_kpi_ids.append(kpi_id_rx)
analyzer.input_kpi_ids.append(kpi_id_tx)
analyzer.output_kpi_ids.append(kpi_id_lat)
_threshold_dict = {'min_latency_E2E': (2, 105)}
analyzer.parameters['thresholds'] = json.dumps(_threshold_dict)
analyzer.parameters['window_size'] = "60s"
analyzer.parameters['window_slider'] = "30s"
analyzer_id_lat: AnalyzerId = analytics_frontend_client.StartAnalyzer(analyzer)
LOGGER.info('analyzer_id_lat({:s})'.format(str(analyzer_id_lat)))
kpi_value_api_client = KpiValueApiClient()
stream: KpiAlarms = kpi_value_api_client.GetKpiAlarms(kpi_id_lat.kpi_id.uuid)
for response in stream:
if response is None:
LOGGER.debug('NO message')
else:
LOGGER.debug(str(response))
###########################################
# for response in analytic_frontend_service.StartResponseListener( analyzer_id_lat.analyzer_id.uuid):
# LOGGER.info("response.value {:s}",response)
####### START Collector RX ##################
collect_rx = Collector()
collect_rx.collector_id.collector_id.uuid = str(uuid4())
collect_rx.kpi_id.kpi_id.uuid = kpi_id_rx.kpi_id.uuid
collect_rx.duration_s = 0 # static
collect_rx.interval_s = 1 # static
LOGGER.info('Start Collector RX'.format(str(collect_rx)))
collect_id_rx: CollectorId = telemetry_frontend_client.StartCollector(collect_rx)
LOGGER.info('collect_id_tx({:s})'.format(str(collect_id_rx)))
###############################################
except grpc.RpcError as e:
if e.code() != grpc.StatusCode.NOT_FOUND: raise # pylint: disable=no-member
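The new automation flow iterates the GetKpiAlarms stream inside the same try/except that guards the other gRPC calls. The helper below sketches that consumption shape; it only assumes the stream is iterable and that failures surface as grpc.RpcError, and is not the component's actual code.

import logging
import grpc

LOGGER = logging.getLogger(__name__)

def consume_alarm_stream(stream):
    # Drain a server-streaming RPC response, logging each message as it arrives.
    try:
        for response in stream:
            if response is None:
                LOGGER.debug('NO message')
            else:
                LOGGER.debug(str(response))
    except grpc.RpcError as e:
        if e.code() != grpc.StatusCode.NOT_FOUND:   # pylint: disable=no-member
            raise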
@@ -47,6 +47,7 @@ class DeviceTypeEnum(Enum):
PACKET_ROUTER = 'packet-router'
PACKET_SWITCH = 'packet-switch'
XR_CONSTELLATION = 'xr-constellation'
QKD_NODE = 'qkd-node'
# ETSI TeraFlowSDN controller
TERAFLOWSDN_CONTROLLER = 'teraflowsdn'
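The new QKD_NODE member resolves by value like any other entry of this string-valued enum. A trimmed illustration (only a few members reproduced here):

from enum import Enum

class DeviceTypeEnum(Enum):
    PACKET_ROUTER    = 'packet-router'
    PACKET_SWITCH    = 'packet-switch'
    XR_CONSTELLATION = 'xr-constellation'
    QKD_NODE         = 'qkd-node'

# Value lookup returns the new member, e.g. when parsing a device descriptor.
assert DeviceTypeEnum('qkd-node') is DeviceTypeEnum.QKD_NODE
assert DeviceTypeEnum.QKD_NODE.value == 'qkd-node'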