diff --git a/.gitignore b/.gitignore index 20b98c30c5b3edb0983578b0a5f74fb1c1f3025e..e1f87cfd3842c264bd219237e9afe113d61c35bc 100644 --- a/.gitignore +++ b/.gitignore @@ -176,3 +176,6 @@ libyang/ # Other logs **/logs/*.log.* + +# PySpark checkpoints +src/analytics/.spark/* diff --git a/deploy/all.sh b/deploy/all.sh index e9b33b469b7cad1547ab0dcb63e326672f51971e..06b8ee701530f56381080879d0e2941b664e5197 100755 --- a/deploy/all.sh +++ b/deploy/all.sh @@ -33,7 +33,7 @@ export TFS_COMPONENTS=${TFS_COMPONENTS:-"context device pathcomp service slice n #export TFS_COMPONENTS="${TFS_COMPONENTS} monitoring" # Uncomment to activate Monitoring Framework (new) -#export TFS_COMPONENTS="${TFS_COMPONENTS} kpi_manager kpi_value_writer kpi_value_api" +#export TFS_COMPONENTS="${TFS_COMPONENTS} kpi_manager kpi_value_writer kpi_value_api telemetry analytics" # Uncomment to activate BGP-LS Speaker #export TFS_COMPONENTS="${TFS_COMPONENTS} bgpls_speaker" diff --git a/deploy/tfs.sh b/deploy/tfs.sh index e7201441815c7cc08c46cce3714f33f43401c2eb..1dceae1c1b4ee3e2a36816557b54df48b224eba1 100755 --- a/deploy/tfs.sh +++ b/deploy/tfs.sh @@ -182,7 +182,19 @@ kubectl create secret generic crdb-telemetry --namespace ${TFS_K8S_NAMESPACE} -- --from-literal=CRDB_SSLMODE=require printf "\n" -echo "Create secret with Apache Kafka data for KPI and Telemetry microservices" +echo "Create secret with CockroachDB data for Analytics microservices" +CRDB_SQL_PORT=$(kubectl --namespace ${CRDB_NAMESPACE} get service cockroachdb-public -o 'jsonpath={.spec.ports[?(@.name=="sql")].port}') +CRDB_DATABASE_ANALYTICS="tfs_analytics" # TODO: change by specific configurable environment variable +kubectl create secret generic crdb-analytics --namespace ${TFS_K8S_NAMESPACE} --type='Opaque' \ + --from-literal=CRDB_NAMESPACE=${CRDB_NAMESPACE} \ + --from-literal=CRDB_SQL_PORT=${CRDB_SQL_PORT} \ + --from-literal=CRDB_DATABASE=${CRDB_DATABASE_ANALYTICS} \ + --from-literal=CRDB_USERNAME=${CRDB_USERNAME} \ + --from-literal=CRDB_PASSWORD=${CRDB_PASSWORD} \ + --from-literal=CRDB_SSLMODE=require +printf "\n" + +echo "Create secret with Apache Kafka data for KPI, Telemetry and Analytics microservices" KFK_SERVER_PORT=$(kubectl --namespace ${KFK_NAMESPACE} get service kafka-service -o 'jsonpath={.spec.ports[0].port}') kubectl create secret generic kfk-kpi-data --namespace ${TFS_K8S_NAMESPACE} --type='Opaque' \ --from-literal=KFK_NAMESPACE=${KFK_NAMESPACE} \ @@ -264,7 +276,7 @@ for COMPONENT in $TFS_COMPONENTS; do if [ "$COMPONENT" == "ztp" ] || [ "$COMPONENT" == "policy" ]; then $DOCKER_BUILD -t "$COMPONENT:$TFS_IMAGE_TAG" -f ./src/"$COMPONENT"/Dockerfile ./src/"$COMPONENT"/ > "$BUILD_LOG" - elif [ "$COMPONENT" == "pathcomp" ] || [ "$COMPONENT" == "telemetry" ]; then + elif [ "$COMPONENT" == "pathcomp" ] || [ "$COMPONENT" == "telemetry" ] || [ "$COMPONENT" == "analytics" ]; then BUILD_LOG="$TMP_LOGS_FOLDER/build_${COMPONENT}-frontend.log" $DOCKER_BUILD -t "$COMPONENT-frontend:$TFS_IMAGE_TAG" -f ./src/"$COMPONENT"/frontend/Dockerfile . > "$BUILD_LOG" @@ -287,7 +299,7 @@ for COMPONENT in $TFS_COMPONENTS; do echo " Pushing Docker image to '$TFS_REGISTRY_IMAGES'..." 
- if [ "$COMPONENT" == "pathcomp" ] || [ "$COMPONENT" == "telemetry" ]; then + if [ "$COMPONENT" == "pathcomp" ] || [ "$COMPONENT" == "telemetry" ] || [ "$COMPONENT" == "analytics" ] ; then IMAGE_URL=$(echo "$TFS_REGISTRY_IMAGES/$COMPONENT-frontend:$TFS_IMAGE_TAG" | sed 's,//,/,g' | sed 's,http:/,,g') TAG_LOG="$TMP_LOGS_FOLDER/tag_${COMPONENT}-frontend.log" @@ -338,7 +350,7 @@ for COMPONENT in $TFS_COMPONENTS; do cp ./manifests/"${COMPONENT}"service.yaml "$MANIFEST" fi - if [ "$COMPONENT" == "pathcomp" ] || [ "$COMPONENT" == "telemetry" ]; then + if [ "$COMPONENT" == "pathcomp" ] || [ "$COMPONENT" == "telemetry" ] || [ "$COMPONENT" == "analytics" ]; then IMAGE_URL=$(echo "$TFS_REGISTRY_IMAGES/$COMPONENT-frontend:$TFS_IMAGE_TAG" | sed 's,//,/,g' | sed 's,http:/,,g') VERSION=$(grep -i "${GITLAB_REPO_URL}/${COMPONENT}-frontend:" "$MANIFEST" | cut -d ":" -f4) sed -E -i "s#image: $GITLAB_REPO_URL/$COMPONENT-frontend:${VERSION}#image: $IMAGE_URL#g" "$MANIFEST" diff --git a/manifests/analyticsservice.yaml b/manifests/analyticsservice.yaml new file mode 100644 index 0000000000000000000000000000000000000000..0fa3ed0be6eda8cf944e199543e3c2cd59cc98d6 --- /dev/null +++ b/manifests/analyticsservice.yaml @@ -0,0 +1,128 @@ +# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: analyticsservice +spec: + selector: + matchLabels: + app: analyticsservice + #replicas: 1 + template: + metadata: + labels: + app: analyticsservice + spec: + terminationGracePeriodSeconds: 5 + containers: + - name: frontend + image: labs.etsi.org:5050/tfs/controller/analytics-frontend:latest + imagePullPolicy: Always + ports: + - containerPort: 30080 + - containerPort: 9192 + env: + - name: LOG_LEVEL + value: "INFO" + envFrom: + - secretRef: + name: crdb-analytics + - secretRef: + name: kfk-kpi-data + readinessProbe: + exec: + command: ["/bin/grpc_health_probe", "-addr=:30080"] + livenessProbe: + exec: + command: ["/bin/grpc_health_probe", "-addr=:30080"] + resources: + requests: + cpu: 250m + memory: 128Mi + limits: + cpu: 1000m + memory: 1024Mi + - name: backend + image: labs.etsi.org:5050/tfs/controller/analytics-backend:latest + imagePullPolicy: Always + ports: + - containerPort: 30090 + - containerPort: 9192 + env: + - name: LOG_LEVEL + value: "INFO" + envFrom: + - secretRef: + name: kfk-kpi-data + readinessProbe: + exec: + command: ["/bin/grpc_health_probe", "-addr=:30090"] + livenessProbe: + exec: + command: ["/bin/grpc_health_probe", "-addr=:30090"] + resources: + requests: + cpu: 250m + memory: 128Mi + limits: + cpu: 1000m + memory: 1024Mi +--- +apiVersion: v1 +kind: Service +metadata: + name: analyticsservice + labels: + app: analyticsservice +spec: + type: ClusterIP + selector: + app: analyticsservice + ports: + - name: frontend-grpc + protocol: TCP + port: 30080 + targetPort: 30080 + - name: backend-grpc + protocol: TCP + port: 30090 + targetPort: 30090 + - name: metrics + protocol: TCP + port: 9192 + targetPort: 9192 +--- +apiVersion: autoscaling/v2 +kind: HorizontalPodAutoscaler +metadata: + name: analyticsservice-hpa +spec: + scaleTargetRef: + apiVersion: apps/v1 + kind: Deployment + name: analyticsservice + minReplicas: 1 + maxReplicas: 20 + metrics: + - type: Resource + resource: + name: cpu + target: + type: Utilization + averageUtilization: 80 + #behavior: + # scaleDown: + # stabilizationWindowSeconds: 30 diff --git a/manifests/kafka/02-kafka.yaml b/manifests/kafka/02-kafka.yaml index 8e4562e6eabec34bf3b87912310479bd98022aeb..8400f5944193458ccdad8be5dbc189f8f40cdd7b 100644 --- a/manifests/kafka/02-kafka.yaml +++ b/manifests/kafka/02-kafka.yaml @@ -53,9 +53,9 @@ spec: - name: KAFKA_LISTENERS value: PLAINTEXT://:9092 - name: KAFKA_ADVERTISED_LISTENERS - value: PLAINTEXT://localhost:9092 + value: PLAINTEXT://kafka-service.kafka.svc.cluster.local:9092 image: wurstmeister/kafka imagePullPolicy: IfNotPresent name: kafka-broker ports: - - containerPort: 9092 \ No newline at end of file + - containerPort: 9092 diff --git a/proto/analytics_frontend.proto b/proto/analytics_frontend.proto index 096c1ee035ae663359d9f4df1e071d3997a0d351..ace0581db816bee1d0d20746f2b864dce602567b 100644 --- a/proto/analytics_frontend.proto +++ b/proto/analytics_frontend.proto @@ -30,21 +30,25 @@ message AnalyzerId { } enum AnalyzerOperationMode { - ANALYZEROPERATIONMODE_BATCH = 0; - ANALYZEROPERATIONMODE_STREAMING = 1; + ANALYZEROPERATIONMODE_UNSPECIFIED = 0; + ANALYZEROPERATIONMODE_BATCH = 1; + ANALYZEROPERATIONMODE_STREAMING = 2; } +// duration field may be added in analyzer... 
message Analyzer {
-  string algorithm_name = 1;                      // The algorithm to be executed
-  repeated kpi_manager.KpiId input_kpi_ids = 2;   // The KPI Ids to be processed by the analyzer
-  repeated kpi_manager.KpiId output_kpi_ids = 3;  // The KPI Ids produced by the analyzer
-  AnalyzerOperationMode operation_mode = 4;       // Operation mode of the analyzer
-
-  // In batch mode...
-  float batch_min_duration_s = 5;                 // ..., min duration to collect before executing batch
-  float batch_max_duration_s = 6;                 // ..., max duration collected to execute the batch
-  uint64 batch_min_size = 7;                      // ..., min number of samples to collect before executing batch
-  uint64 batch_max_size = 8;                      // ..., max number of samples collected to execute the batch
+  AnalyzerId analyzer_id = 1;
+  string algorithm_name = 2;                      // The algorithm to be executed
+  float duration_s = 3;                           // Terminate the data analytics thread after duration (seconds); 0 = run indefinitely
+  repeated kpi_manager.KpiId input_kpi_ids = 4;   // The KPI Ids to be processed by the analyzer
+  repeated kpi_manager.KpiId output_kpi_ids = 5;  // The KPI Ids produced by the analyzer
+  AnalyzerOperationMode operation_mode = 6;       // Operation mode of the analyzer
+  map<string, string> parameters = 7;             // Dictionary of (key, value) pairs such as (window_size, 10) etc.
+  // In batch mode...
+  float batch_min_duration_s = 8;                 // ..., min duration to collect before executing batch
+  float batch_max_duration_s = 9;                 // ..., max duration collected to execute the batch
+  uint64 batch_min_size = 10;                     // ..., min number of samples to collect before executing batch
+  uint64 batch_max_size = 11;                     // ..., max number of samples collected to execute the batch
 }

 message AnalyzerFilter {
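The `parameters` map above is free-form. For reference, a minimal sketch of how a client could populate this message through the generated Python bindings; the algorithm name and KPI UUIDs are illustrative, not part of this patch:

```python
# Minimal sketch: building an Analyzer request via the generated pb2 bindings.
import uuid
from common.proto import analytics_frontend_pb2

analyzer = analytics_frontend_pb2.Analyzer()
analyzer.analyzer_id.analyzer_id.uuid = str(uuid.uuid4())
analyzer.algorithm_name = 'aggregate_and_threshold'          # hypothetical algorithm name
analyzer.duration_s     = 0                                  # 0 = run indefinitely
analyzer.operation_mode = analytics_frontend_pb2.AnalyzerOperationMode.ANALYZEROPERATIONMODE_STREAMING
analyzer.parameters['window_size']   = '60 seconds'          # free-form (key, value) pairs
analyzer.parameters['window_slider'] = '30 seconds'

input_kpi = analyzer.input_kpi_ids.add()                     # repeated KpiId field
input_kpi.kpi_id.uuid = str(uuid.uuid4())
```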
diff --git a/scripts/run_tests_locally-analytics-DB.sh b/scripts/run_tests_locally-analytics-DB.sh
new file mode 100755
index 0000000000000000000000000000000000000000..9df5068d6bde361a4a1e73b96990c0d407c88cb4
--- /dev/null
+++ b/scripts/run_tests_locally-analytics-DB.sh
@@ -0,0 +1,24 @@
+#!/bin/bash
+# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+PROJECTDIR=`pwd`
+
+cd $PROJECTDIR/src
+RCFILE=$PROJECTDIR/coverage/.coveragerc
+CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace crdb -o jsonpath='{.spec.clusterIP}')
+export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_kpi_mgmt?sslmode=require"
+python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \
+    analytics/tests/test_analytics_db.py
diff --git a/scripts/run_tests_locally-analytics-frontend.sh b/scripts/run_tests_locally-analytics-frontend.sh
new file mode 100755
index 0000000000000000000000000000000000000000..e30d30da623b2d0eee3d925d69a846b4b1f516a3
--- /dev/null
+++ b/scripts/run_tests_locally-analytics-frontend.sh
@@ -0,0 +1,24 @@
+#!/bin/bash
+# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+PROJECTDIR=`pwd`
+
+cd $PROJECTDIR/src
+RCFILE=$PROJECTDIR/coverage/.coveragerc
+CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace crdb -o jsonpath='{.spec.clusterIP}')
+export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_kpi_mgmt?sslmode=require"
+python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \
+    analytics/frontend/tests/test_frontend.py
diff --git a/scripts/run_tests_locally-telemetry-backend.sh b/scripts/run_tests_locally-telemetry-backend.sh
index 9cf404ffcef6c99b261f81eb0c6b910dd60845e5..79db05fcf1259365e8a909ee99395eb59dfb9437 100755
--- a/scripts/run_tests_locally-telemetry-backend.sh
+++ b/scripts/run_tests_locally-telemetry-backend.sh
@@ -24,5 +24,5 @@ cd $PROJECTDIR/src
 # python3 kpi_manager/tests/test_unitary.py
 RCFILE=$PROJECTDIR/coverage/.coveragerc
-python3 -m pytest --log-level=INFO --log-cli-level=INFO --verbose \
-    telemetry/backend/tests/testTelemetryBackend.py
+python3 -m pytest --log-level=INFO --log-cli-level=DEBUG --verbose \
+    telemetry/backend/tests/test_TelemetryBackend.py
diff --git a/scripts/show_logs_analytics_backend.sh b/scripts/show_logs_analytics_backend.sh
new file mode 100755
index 0000000000000000000000000000000000000000..afb58567ca5ab250da48d2cfffa2c56abdff2db2
--- /dev/null
+++ b/scripts/show_logs_analytics_backend.sh
@@ -0,0 +1,27 @@
+#!/bin/bash
+# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+########################################################################################################################
+# Define your deployment settings here
+########################################################################################################################
+
+# If not already set, set the name of the Kubernetes namespace to deploy to.
+export TFS_K8S_NAMESPACE=${TFS_K8S_NAMESPACE:-"tfs"}
+
+########################################################################################################################
+# Automated steps start here
+########################################################################################################################
+
+kubectl --namespace $TFS_K8S_NAMESPACE logs deployment/analyticsservice -c backend
diff --git a/scripts/show_logs_analytics_frontend.sh b/scripts/show_logs_analytics_frontend.sh
new file mode 100755
index 0000000000000000000000000000000000000000..6d3fae10b366f0082d3a393c224e8f1cb7830721
--- /dev/null
+++ b/scripts/show_logs_analytics_frontend.sh
@@ -0,0 +1,27 @@
+#!/bin/bash
+# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+########################################################################################################################
+# Define your deployment settings here
+########################################################################################################################
+
+# If not already set, set the name of the Kubernetes namespace to deploy to.
+export TFS_K8S_NAMESPACE=${TFS_K8S_NAMESPACE:-"tfs"}
+
+########################################################################################################################
+# Automated steps start here
+########################################################################################################################
+
+kubectl --namespace $TFS_K8S_NAMESPACE logs deployment/analyticsservice -c frontend
diff --git a/src/analytics/README.md b/src/analytics/README.md
new file mode 100644
index 0000000000000000000000000000000000000000..9663e5321ace6866491b90553553d9ccbf5793a1
--- /dev/null
+++ b/src/analytics/README.md
@@ -0,0 +1,4 @@
+# How to locally run and test the Analytics service (To be added soon)
+
+### Pre-requisites
+The following requirements should be fulfilled before the execution of the Analytics service.
diff --git a/src/analytics/__init__.py b/src/analytics/__init__.py
new file mode 100644
index 0000000000000000000000000000000000000000..bbfc943b68af13a11e562abbc8680ade71db8f02
--- /dev/null
+++ b/src/analytics/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/src/analytics/backend/Dockerfile b/src/analytics/backend/Dockerfile new file mode 100644 index 0000000000000000000000000000000000000000..17adcd3ab1df5704cc7ef0c5a19b3cfb1539ee22 --- /dev/null +++ b/src/analytics/backend/Dockerfile @@ -0,0 +1,69 @@ +# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +FROM python:3.9-slim + +# Install dependencies +RUN apt-get --yes --quiet --quiet update && \ + apt-get --yes --quiet --quiet install wget g++ git && \ + rm -rf /var/lib/apt/lists/* + +# Set Python to show logs as they occur +ENV PYTHONUNBUFFERED=0 + +# Download the gRPC health probe +RUN GRPC_HEALTH_PROBE_VERSION=v0.2.0 && \ + wget -qO/bin/grpc_health_probe https://github.com/grpc-ecosystem/grpc-health-probe/releases/download/${GRPC_HEALTH_PROBE_VERSION}/grpc_health_probe-linux-amd64 && \ + chmod +x /bin/grpc_health_probe + +# Get generic Python packages +RUN python3 -m pip install --upgrade pip +RUN python3 -m pip install --upgrade setuptools wheel +RUN python3 -m pip install --upgrade pip-tools + +# Get common Python packages +# Note: this step enables sharing the previous Docker build steps among all the Python components +WORKDIR /var/teraflow +COPY common_requirements.in common_requirements.in +RUN pip-compile --quiet --output-file=common_requirements.txt common_requirements.in +RUN python3 -m pip install -r common_requirements.txt + +# Add common files into working directory +WORKDIR /var/teraflow/common +COPY src/common/. ./ +RUN rm -rf proto + +# Create proto sub-folder, copy .proto files, and generate Python code +RUN mkdir -p /var/teraflow/common/proto +WORKDIR /var/teraflow/common/proto +RUN touch __init__.py +COPY proto/*.proto ./ +RUN python3 -m grpc_tools.protoc -I=. --python_out=. --grpc_python_out=. *.proto +RUN rm *.proto +RUN find . -type f -exec sed -i -E 's/(import\ .*)_pb2/from . \1_pb2/g' {} \; + +# Create component sub-folders, get specific Python packages +RUN mkdir -p /var/teraflow/analytics/backend +WORKDIR /var/teraflow/analytics/backend +COPY src/analytics/backend/requirements.in requirements.in +RUN pip-compile --quiet --output-file=requirements.txt requirements.in +RUN python3 -m pip install -r requirements.txt + +# Add component files into working directory +WORKDIR /var/teraflow +COPY src/analytics/__init__.py analytics/__init__.py +COPY src/analytics/backend/. 
analytics/backend/
+
+# Start the service
+ENTRYPOINT ["python", "-m", "analytics.backend.service"]
diff --git a/src/analytics/backend/__init__.py b/src/analytics/backend/__init__.py
new file mode 100644
index 0000000000000000000000000000000000000000..bbfc943b68af13a11e562abbc8680ade71db8f02
--- /dev/null
+++ b/src/analytics/backend/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/src/analytics/backend/requirements.in b/src/analytics/backend/requirements.in
new file mode 100644
index 0000000000000000000000000000000000000000..9df678fe819f33d479b8f5090ca9ac4eb1f4047c
--- /dev/null
+++ b/src/analytics/backend/requirements.in
@@ -0,0 +1,16 @@
+# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+pyspark==3.5.2
+confluent-kafka==2.3.*
diff --git a/src/analytics/backend/service/AnalyticsBackendService.py b/src/analytics/backend/service/AnalyticsBackendService.py
new file mode 100755
index 0000000000000000000000000000000000000000..595603567fe537d9f7b33224cba0fe016a439631
--- /dev/null
+++ b/src/analytics/backend/service/AnalyticsBackendService.py
@@ -0,0 +1,132 @@
+# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import json
+import logging
+import threading
+from common.tools.service.GenericGrpcService import GenericGrpcService
+from analytics.backend.service.SparkStreaming import SparkStreamer
+from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
+from confluent_kafka import Consumer as KafkaConsumer
+from confluent_kafka import KafkaError
+from common.Constants import ServiceNameEnum
+from common.Settings import get_service_port_grpc
+
+
+LOGGER = logging.getLogger(__name__)
+
+class AnalyticsBackendService(GenericGrpcService):
+    """
+    Listens for analyzer requests on the Kafka ANALYTICS_REQUEST topic and starts/stops the corresponding Spark streaming threads.
+ """ + def __init__(self, cls_name : str = __name__) -> None: + LOGGER.info('Init AnalyticsBackendService') + port = get_service_port_grpc(ServiceNameEnum.ANALYTICSBACKEND) + super().__init__(port, cls_name=cls_name) + self.running_threads = {} # To keep track of all running analyzers + self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(), + 'group.id' : 'analytics-frontend', + 'auto.offset.reset' : 'latest'}) + + def StartSparkStreamer(self, analyzer_uuid, analyzer): + kpi_list = analyzer['input_kpis'] + oper_list = [s.replace('_value', '') for s in list(analyzer["thresholds"].keys())] # TODO: update this line... + thresholds = analyzer['thresholds'] + window_size = analyzer['window_size'] + window_slider = analyzer['window_slider'] + print ("Received parameters: {:} - {:} - {:} - {:} - {:}".format( + kpi_list, oper_list, thresholds, window_size, window_slider)) + LOGGER.debug ("Received parameters: {:} - {:} - {:} - {:} - {:}".format( + kpi_list, oper_list, thresholds, window_size, window_slider)) + try: + stop_event = threading.Event() + thread = threading.Thread(target=SparkStreamer, + args=(analyzer_uuid, kpi_list, oper_list, thresholds, stop_event, + window_size, window_slider, None )) + self.running_threads[analyzer_uuid] = (thread, stop_event) + thread.start() + print ("Initiated Analyzer backend: {:}".format(analyzer_uuid)) + LOGGER.info("Initiated Analyzer backend: {:}".format(analyzer_uuid)) + return True + except Exception as e: + print ("Failed to initiate Analyzer backend: {:}".format(e)) + LOGGER.error("Failed to initiate Analyzer backend: {:}".format(e)) + return False + + def StopRequestListener(self, threadInfo: tuple): + try: + thread, stop_event = threadInfo + stop_event.set() + thread.join() + print ("Terminating Analytics backend RequestListener") + LOGGER.info("Terminating Analytics backend RequestListener") + return True + except Exception as e: + print ("Failed to terminate analytics backend {:}".format(e)) + LOGGER.error("Failed to terminate analytics backend {:}".format(e)) + return False + + def install_services(self): + stop_event = threading.Event() + thread = threading.Thread(target=self.RequestListener, + args=(stop_event,) ) + thread.start() + return (thread, stop_event) + + def RequestListener(self, stop_event): + """ + listener for requests on Kafka topic. + """ + consumer = self.kafka_consumer + consumer.subscribe([KafkaTopic.ANALYTICS_REQUEST.value]) + while not stop_event.is_set(): + receive_msg = consumer.poll(2.0) + if receive_msg is None: + continue + elif receive_msg.error(): + if receive_msg.error().code() == KafkaError._PARTITION_EOF: + continue + else: + print("Consumer error: {}".format(receive_msg.error())) + break + analyzer = json.loads(receive_msg.value().decode('utf-8')) + analyzer_uuid = receive_msg.key().decode('utf-8') + LOGGER.debug('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer)) + print ('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer)) + + if analyzer["algo_name"] is None and analyzer["oper_mode"] is None: + self.TerminateAnalyzerBackend(analyzer_uuid) + else: + self.StartSparkStreamer(analyzer_uuid, analyzer) + LOGGER.debug("Stop Event activated. Terminating...") + print ("Stop Event activated. 
Terminating...") + + def TerminateAnalyzerBackend(self, analyzer_uuid): + if analyzer_uuid in self.running_threads: + try: + thread, stop_event = self.running_threads[analyzer_uuid] + stop_event.set() + thread.join() + del self.running_threads[analyzer_uuid] + print ("Terminating backend (by TerminateBackend): Analyzer Id: {:}".format(analyzer_uuid)) + LOGGER.info("Terminating backend (by TerminateBackend): Analyzer Id: {:}".format(analyzer_uuid)) + return True + except Exception as e: + LOGGER.error("Failed to terminate. Analyzer Id: {:} - ERROR: {:}".format(analyzer_uuid, e)) + return False + else: + print ("Analyzer not found in active collectors. Analyzer Id: {:}".format(analyzer_uuid)) + LOGGER.warning("Analyzer not found in active collectors: Analyzer Id: {:}".format(analyzer_uuid)) + # generate confirmation towards frontend diff --git a/src/analytics/backend/service/SparkStreaming.py b/src/analytics/backend/service/SparkStreaming.py new file mode 100644 index 0000000000000000000000000000000000000000..96e1aa05d898ffdd23c533b74ee87fbf03f54576 --- /dev/null +++ b/src/analytics/backend/service/SparkStreaming.py @@ -0,0 +1,154 @@ +# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
diff --git a/src/analytics/backend/service/SparkStreaming.py b/src/analytics/backend/service/SparkStreaming.py
new file mode 100644
index 0000000000000000000000000000000000000000..96e1aa05d898ffdd23c533b74ee87fbf03f54576
--- /dev/null
+++ b/src/analytics/backend/service/SparkStreaming.py
@@ -0,0 +1,154 @@
+# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import logging, time
+from pyspark.sql import SparkSession
+from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType
+from pyspark.sql.functions import from_json, col, window, avg, min, max, first, last, stddev, when, round
+from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
+
+LOGGER = logging.getLogger(__name__)
+
+def DefiningSparkSession():
+    # Create a Spark session with a specific Spark version (3.5.0)
+    return SparkSession.builder \
+        .appName("Analytics") \
+        .config("spark.sql.streaming.forceDeleteTempCheckpointLocation", "true") \
+        .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0") \
+        .getOrCreate()
+
+def SettingKafkaConsumerParams():   # TODO: create get_kafka_consumer() in common with inputs (bootstrap server, subscribe, startingOffset and failOnDataLoss with default values)
+    return {
+        # "kafka.bootstrap.servers": '127.0.0.1:9092',
+        "kafka.bootstrap.servers": KafkaConfig.get_kafka_address(),
+        "subscribe"              : KafkaTopic.VALUE.value,
+        "startingOffsets"        : 'latest',
+        "failOnDataLoss"         : 'false'   # Optional: Set to "true" to fail the query on data loss
+    }
+
+def DefiningRequestSchema():
+    return StructType([
+        StructField("time_stamp" , StringType() , True),
+        StructField("kpi_id"     , StringType() , True),
+        StructField("kpi_value"  , DoubleType() , True)
+    ])
+
+def GetAggregations(oper_list):
+    # Define the possible aggregation functions
+    agg_functions = {
+        'avg'  : round(avg    ("kpi_value"), 3) .alias("avg_value"),
+        'min'  : round(min    ("kpi_value"), 3) .alias("min_value"),
+        'max'  : round(max    ("kpi_value"), 3) .alias("max_value"),
+        'first': round(first  ("kpi_value"), 3) .alias("first_value"),
+        'last' : round(last   ("kpi_value"), 3) .alias("last_value"),
+        'stdev': round(stddev ("kpi_value"), 3) .alias("stdev_value")
+    }
+    return [agg_functions[op] for op in oper_list if op in agg_functions]   # Filter and return only the selected aggregations
+
+def ApplyThresholds(aggregated_df, thresholds):
+    # Apply thresholds (TH-Fail and TH-RAISE) based on the thresholds dictionary on the aggregated DataFrame.
+
+    # Loop through each column name and its associated thresholds
+    for col_name, (fail_th, raise_th) in thresholds.items():
+        # Apply TH-Fail condition (if column value is less than the fail threshold)
+        aggregated_df = aggregated_df.withColumn(
+            f"{col_name}_THRESHOLD_FAIL",
+            when(col(col_name) < fail_th, True).otherwise(False)
+        )
+        # Apply TH-RAISE condition (if column value is greater than the raise threshold)
+        aggregated_df = aggregated_df.withColumn(
+            f"{col_name}_THRESHOLD_RAISE",
+            when(col(col_name) > raise_th, True).otherwise(False)
+        )
+    return aggregated_df
+
+def SparkStreamer(key, kpi_list, oper_list, thresholds, stop_event,
+                  window_size=None, win_slide_duration=None, time_stamp_col=None):
+    """
+    Method to perform Spark operations on a Kafka stream.
+    NOTE: The Kafka topic to be processed should have at least one row before initiating the Spark session.
+ """ + kafka_consumer_params = SettingKafkaConsumerParams() # Define the Kafka consumer parameters + schema = DefiningRequestSchema() # Define the schema for the incoming JSON data + spark = DefiningSparkSession() # Define the spark session with app name and spark version + + # extra options default assignment + if window_size is None: window_size = "60 seconds" # default + if win_slide_duration is None: win_slide_duration = "30 seconds" # default + if time_stamp_col is None: time_stamp_col = "time_stamp" # default + + try: + # Read data from Kafka + raw_stream_data = spark \ + .readStream \ + .format("kafka") \ + .options(**kafka_consumer_params) \ + .load() + + # Convert the value column from Kafka to a string + stream_data = raw_stream_data.selectExpr("CAST(value AS STRING)") + # Parse the JSON string into a DataFrame with the defined schema + parsed_stream_data = stream_data.withColumn("parsed_value", from_json(col("value"), schema)) + # Select the parsed fields + final_stream_data = parsed_stream_data.select("parsed_value.*") + # Convert the time_stamp to proper timestamp (assuming it's in ISO format) + final_stream_data = final_stream_data.withColumn(time_stamp_col, col(time_stamp_col).cast(TimestampType())) + # Filter the stream to only include rows where the kpi_id is in the kpi_list + filtered_stream_data = final_stream_data.filter(col("kpi_id").isin(kpi_list)) + # Define a window for aggregation + windowed_stream_data = filtered_stream_data \ + .groupBy( + window( col(time_stamp_col), + window_size, slideDuration=win_slide_duration + ), + col("kpi_id") + ) \ + .agg(*GetAggregations(oper_list)) + # Apply thresholds to the aggregated data + thresholded_stream_data = ApplyThresholds(windowed_stream_data, thresholds) + + # --- This will write output on console: FOR TESTING PURPOSES + # Start the Spark streaming query + # query = thresholded_stream_data \ + # .writeStream \ + # .outputMode("update") \ + # .format("console") + + # --- This will write output to Kafka: ACTUAL IMPLEMENTATION + query = thresholded_stream_data \ + .selectExpr(f"'{key}' AS key", "to_json(struct(*)) AS value") \ + .writeStream \ + .format("kafka") \ + .option("kafka.bootstrap.servers", KafkaConfig.get_kafka_address()) \ + .option("topic", KafkaTopic.ANALYTICS_RESPONSE.value) \ + .option("checkpointLocation", "analytics/.spark/checkpoint") \ + .outputMode("update") + + # Start the query execution + queryHandler = query.start() + + # Loop to check for stop event flag. To be set by stop collector method. + while True: + if stop_event.is_set(): + LOGGER.debug("Stop Event activated. Terminating in 5 seconds...") + print ("Stop Event activated. Terminating in 5 seconds...") + time.sleep(5) + queryHandler.stop() + break + time.sleep(5) + + except Exception as e: + print("Error in Spark streaming process: {:}".format(e)) + LOGGER.debug("Error in Spark streaming process: {:}".format(e)) diff --git a/src/analytics/backend/service/__init__.py b/src/analytics/backend/service/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..bbfc943b68af13a11e562abbc8680ade71db8f02 --- /dev/null +++ b/src/analytics/backend/service/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
diff --git a/src/analytics/backend/service/__init__.py b/src/analytics/backend/service/__init__.py
new file mode 100644
index 0000000000000000000000000000000000000000..bbfc943b68af13a11e562abbc8680ade71db8f02
--- /dev/null
+++ b/src/analytics/backend/service/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/src/analytics/backend/service/__main__.py b/src/analytics/backend/service/__main__.py
new file mode 100644
index 0000000000000000000000000000000000000000..3c4c36b7c7bd952164bf9e48a45e22fb00575564
--- /dev/null
+++ b/src/analytics/backend/service/__main__.py
@@ -0,0 +1,56 @@
+# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging, signal, sys, threading
+from prometheus_client import start_http_server
+from common.Settings import get_log_level, get_metrics_port
+from .AnalyticsBackendService import AnalyticsBackendService
+
+terminate = threading.Event()
+LOGGER = None
+
+def signal_handler(signal, frame): # pylint: disable=redefined-outer-name
+    LOGGER.warning('Terminate signal received')
+    terminate.set()
+
+def main():
+    global LOGGER # pylint: disable=global-statement
+
+    log_level = get_log_level()
+    logging.basicConfig(level=log_level, format="[%(asctime)s] %(levelname)s:%(name)s:%(message)s")
+    LOGGER = logging.getLogger(__name__)
+
+    signal.signal(signal.SIGINT,  signal_handler)
+    signal.signal(signal.SIGTERM, signal_handler)
+
+    LOGGER.info('Starting...')
+
+    # Start metrics server
+    metrics_port = get_metrics_port()
+    start_http_server(metrics_port)
+
+    grpc_service = AnalyticsBackendService()
+    grpc_service.start()
+
+    # Wait for Ctrl+C or termination signal
+    while not terminate.wait(timeout=1.0): pass
+
+    LOGGER.info('Terminating...')
+    grpc_service.stop()
+
+    LOGGER.info('Bye')
+    return 0
+
+if __name__ == '__main__':
+    sys.exit(main())
diff --git a/src/analytics/backend/tests/__init__.py b/src/analytics/backend/tests/__init__.py
new file mode 100644
index 0000000000000000000000000000000000000000..bbfc943b68af13a11e562abbc8680ade71db8f02
--- /dev/null
+++ b/src/analytics/backend/tests/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/src/analytics/backend/tests/messages.py b/src/analytics/backend/tests/messages.py new file mode 100644 index 0000000000000000000000000000000000000000..9acd6ad9dffe4a5b10b107a6923ed85170ee141f --- /dev/null +++ b/src/analytics/backend/tests/messages.py @@ -0,0 +1,34 @@ +# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +def get_kpi_id_list(): + return ["6e22f180-ba28-4641-b190-2287bf448888", "1e22f180-ba28-4641-b190-2287bf446666"] + +def get_operation_list(): + return [ 'avg', 'max' ] # possibilities ['avg', 'min', 'max', 'first', 'last', 'stdev'] + +def get_threshold_dict(): + threshold_dict = { + 'avg_value' : (20, 30), + 'min_value' : (00, 10), + 'max_value' : (45, 50), + 'first_value' : (00, 10), + 'last_value' : (40, 50), + 'stdev_value' : (00, 10), + } + # Filter threshold_dict based on the operation_list + return { + op + '_value': threshold_dict[op+'_value'] for op in get_operation_list() if op + '_value' in threshold_dict + } diff --git a/src/analytics/backend/tests/test_backend.py b/src/analytics/backend/tests/test_backend.py new file mode 100644 index 0000000000000000000000000000000000000000..2f40faba94ef7081db609116e8fd869e3d119a24 --- /dev/null +++ b/src/analytics/backend/tests/test_backend.py @@ -0,0 +1,64 @@ +# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+
+import time
+import logging
+import threading
+from common.tools.kafka.Variables import KafkaTopic
+from analytics.backend.service.AnalyticsBackendService import AnalyticsBackendService
+from analytics.backend.tests.messages import get_kpi_id_list, get_operation_list, get_threshold_dict
+
+LOGGER = logging.getLogger(__name__)
+
+
+###########################
+# Tests Implementation of Analytics Backend
+###########################
+
+# --- "test_validate_kafka_topics" should be run before the functionality tests ---
+def test_validate_kafka_topics():
+    LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ")
+    response = KafkaTopic.create_all_topics()
+    assert isinstance(response, bool)
+
+# def test_StartRequestListener():
+#     LOGGER.info('test_RunRequestListener')
+#     AnalyticsBackendServiceObj = AnalyticsBackendService()
+#     response = AnalyticsBackendServiceObj.StartRequestListener() # response is Tuple (thread, stop_event)
+#     LOGGER.debug(str(response))
+#     assert isinstance(response, tuple)
+
+# To test START and STOP communication together
+def test_StopRequestListener():
+    LOGGER.info('test_StopRequestListener')
+    LOGGER.info('Initiating StartRequestListener...')
+    AnalyticsBackendServiceObj = AnalyticsBackendService()
+    response_thread = AnalyticsBackendServiceObj.StartRequestListener() # response is Tuple (thread, stop_event)
+    # LOGGER.debug(str(response_thread))
+    time.sleep(10)
+    LOGGER.info('Initiating StopRequestListener...')
+    response = AnalyticsBackendServiceObj.StopRequestListener(response_thread)
+    LOGGER.debug(str(response))
+    assert isinstance(response, bool)
+
+# To independently test the SparkListener functionality
+# def test_SparkListener():
+#     LOGGER.info('test_RunRequestListener')
+#     AnalyticsBackendServiceObj = AnalyticsBackendService()
+#     response = AnalyticsBackendServiceObj.RunSparkStreamer(
+#         get_kpi_id_list(), get_operation_list(), get_threshold_dict()
+#         )
+#     LOGGER.debug(str(response))
+#     assert isinstance(response, bool)
diff --git a/src/analytics/database/AnalyzerEngine.py b/src/analytics/database/AnalyzerEngine.py
new file mode 100644
index 0000000000000000000000000000000000000000..9294e09966ef9e13c9cfa3cab590e5d0c8b6a80e
--- /dev/null
+++ b/src/analytics/database/AnalyzerEngine.py
@@ -0,0 +1,40 @@
+# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging, sqlalchemy
+from common.Settings import get_setting
+
+LOGGER = logging.getLogger(__name__)
+CRDB_URI_TEMPLATE = 'cockroachdb://{:s}:{:s}@cockroachdb-public.{:s}.svc.cluster.local:{:s}/{:s}?sslmode={:s}'
+
+class AnalyzerEngine:
+    @staticmethod
+    def get_engine() -> sqlalchemy.engine.Engine:
+        crdb_uri = get_setting('CRDB_URI', default=None)
+        if crdb_uri is None:
+            CRDB_NAMESPACE = get_setting('CRDB_NAMESPACE')
+            CRDB_SQL_PORT  = get_setting('CRDB_SQL_PORT')
+            CRDB_DATABASE  = "tfs-analyzer"   # TODO: read from get_setting('CRDB_DATABASE'), populated by the crdb-analytics secret
+            CRDB_USERNAME  = get_setting('CRDB_USERNAME')
+            CRDB_PASSWORD  = get_setting('CRDB_PASSWORD')
+            CRDB_SSLMODE   = get_setting('CRDB_SSLMODE')
+            crdb_uri = CRDB_URI_TEMPLATE.format(
+                CRDB_USERNAME, CRDB_PASSWORD, CRDB_NAMESPACE, CRDB_SQL_PORT, CRDB_DATABASE, CRDB_SSLMODE)
+        try:
+            engine = sqlalchemy.create_engine(crdb_uri, echo=False)
+            LOGGER.info(' AnalyzerDB initialized with DB URL: {:}'.format(crdb_uri))
+        except: # pylint: disable=bare-except # pragma: no cover
+            LOGGER.exception('Failed to connect to database: {:s}'.format(str(crdb_uri)))
+            return None # type: ignore
+        return engine
diff --git a/src/analytics/database/AnalyzerModel.py b/src/analytics/database/AnalyzerModel.py
new file mode 100644
index 0000000000000000000000000000000000000000..c33e396e06a8dce96a86951a64aa59b510931dfe
--- /dev/null
+++ b/src/analytics/database/AnalyzerModel.py
@@ -0,0 +1,106 @@
+# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import enum
+
+from sqlalchemy import Column, String, Float, Enum, BigInteger, JSON
+from sqlalchemy.orm import registry
+from common.proto import analytics_frontend_pb2
+from common.proto import kpi_manager_pb2
+
+from sqlalchemy.dialects.postgresql import UUID, ARRAY
+
+
+logging.basicConfig(level=logging.INFO)
+LOGGER = logging.getLogger(__name__)
+
+# Create a base class for declarative models
+Base = registry().generate_base()
+
+class AnalyzerOperationMode (enum.Enum):
+    BATCH     = analytics_frontend_pb2.AnalyzerOperationMode.ANALYZEROPERATIONMODE_BATCH
+    STREAMING = analytics_frontend_pb2.AnalyzerOperationMode.ANALYZEROPERATIONMODE_STREAMING
+
+class Analyzer(Base):
+    __tablename__ = 'analyzer'
+
+    analyzer_id          = Column( UUID(as_uuid=False)        , primary_key=True)
+    algorithm_name       = Column( String                     , nullable=False  )
+    input_kpi_ids        = Column( ARRAY(UUID(as_uuid=False)) , nullable=False  )
+    output_kpi_ids       = Column( ARRAY(UUID(as_uuid=False)) , nullable=False  )
+    operation_mode       = Column( Enum(AnalyzerOperationMode), nullable=False  )
+    parameters           = Column( JSON                       , nullable=True   )
+    batch_min_duration_s = Column( Float                      , nullable=False  )
+    batch_max_duration_s = Column( Float                      , nullable=False  )
+    batch_min_size       = Column( BigInteger                 , nullable=False  )
+    batch_max_size       = Column( BigInteger                 , nullable=False  )
+
+    # helps in logging the information
+    def __repr__(self):
+        return (f"<Analyzer(analyzer_id='{self.analyzer_id}', algorithm_name='{self.algorithm_name}', "
+                f"input_kpi_ids={self.input_kpi_ids}, output_kpi_ids={self.output_kpi_ids}, "
+                f"operation_mode='{self.operation_mode}', parameters={self.parameters}, "
+                f"batch_min_duration_s={self.batch_min_duration_s}, batch_max_duration_s={self.batch_max_duration_s}, "
+                f"batch_min_size={self.batch_min_size}, batch_max_size={self.batch_max_size})>")
+
+
+    @classmethod
+    def ConvertAnalyzerToRow(cls, request):
+        """
+        Create an instance of Analyzer table rows from a request object.
+        Args:    request: The request object containing analyzer gRPC message.
+        Returns: A row (an instance of Analyzer table) initialized with content of the request.
+        """
+        return cls(
+            analyzer_id          = request.analyzer_id.analyzer_id.uuid,
+            algorithm_name       = request.algorithm_name,
+            input_kpi_ids        = [k.kpi_id.uuid for k in request.input_kpi_ids],
+            output_kpi_ids       = [k.kpi_id.uuid for k in request.output_kpi_ids],
+            operation_mode       = AnalyzerOperationMode(request.operation_mode),   # converts integer to corresponding Enum class member
+            parameters           = dict(request.parameters),
+            batch_min_duration_s = request.batch_min_duration_s,
+            batch_max_duration_s = request.batch_max_duration_s,
+            batch_min_size       = request.batch_min_size,
+            batch_max_size       = request.batch_max_size
+        )
+
+    @classmethod
+    def ConvertRowToAnalyzer(cls, row):
+        """
+        Create and return an Analyzer gRPC message initialized with the content of a row.
+        Args:    row: The Analyzer table instance (row) containing the data.
+        Returns: An Analyzer gRPC message initialized with the content of the row.
+ """ + # Create an instance of the Analyzer message + response = analytics_frontend_pb2.Analyzer() + response.analyzer_id.analyzer_id.uuid = row.analyzer_id + response.algorithm_name = row.algorithm_name + response.operation_mode = row.operation_mode.value + response.parameters.update(row.parameters) + + for input_kpi_id in row.input_kpi_ids: + _kpi_id = kpi_manager_pb2.KpiId() + _kpi_id.kpi_id.uuid = input_kpi_id + response.input_kpi_ids.append(_kpi_id) + for output_kpi_id in row.output_kpi_ids: + _kpi_id = kpi_manager_pb2.KpiId() + _kpi_id.kpi_id.uuid = output_kpi_id + response.output_kpi_ids.append(_kpi_id) + + response.batch_min_duration_s = row.batch_min_duration_s + response.batch_max_duration_s = row.batch_max_duration_s + response.batch_min_size = row.batch_min_size + response.batch_max_size = row.batch_max_size + return response diff --git a/src/analytics/database/Analyzer_DB.py b/src/analytics/database/Analyzer_DB.py new file mode 100644 index 0000000000000000000000000000000000000000..1ba68989a066e4638adc12e65289ed50b740731d --- /dev/null +++ b/src/analytics/database/Analyzer_DB.py @@ -0,0 +1,150 @@ +# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import sqlalchemy_utils + +from sqlalchemy import inspect, or_ +from sqlalchemy.orm import sessionmaker + +from analytics.database.AnalyzerModel import Analyzer as AnalyzerModel +from analytics.database.AnalyzerEngine import AnalyzerEngine +from common.method_wrappers.ServiceExceptions import (OperationFailedException, AlreadyExistsException) + +LOGGER = logging.getLogger(__name__) +DB_NAME = "tfs_analyzer" # TODO: export name from enviornment variable + +class AnalyzerDB: + def __init__(self): + self.db_engine = AnalyzerEngine.get_engine() + if self.db_engine is None: + LOGGER.error('Unable to get SQLAlchemy DB Engine...') + return False + self.db_name = DB_NAME + self.Session = sessionmaker(bind=self.db_engine) + + def create_database(self): + if not sqlalchemy_utils.database_exists(self.db_engine.url): + LOGGER.debug("Database created. {:}".format(self.db_engine.url)) + sqlalchemy_utils.create_database(self.db_engine.url) + + def drop_database(self) -> None: + if sqlalchemy_utils.database_exists(self.db_engine.url): + sqlalchemy_utils.drop_database(self.db_engine.url) + + def create_tables(self): + try: + AnalyzerModel.metadata.create_all(self.db_engine) # type: ignore + LOGGER.debug("Tables created in the database: {:}".format(self.db_name)) + except Exception as e: + LOGGER.debug("Tables cannot be created in the database. {:s}".format(str(e))) + raise OperationFailedException ("Tables can't be created", extra_details=["unable to create table {:}".format(e)]) + + def verify_tables(self): + try: + inspect_object = inspect(self.db_engine) + if(inspect_object.has_table('analyzer', None)): + LOGGER.info("Table exists in DB: {:}".format(self.db_name)) + except Exception as e: + LOGGER.info("Unable to fetch Table names. 
{:s}".format(str(e))) + +# ----------------- CURD OPERATIONS --------------------- + + def add_row_to_db(self, row): + session = self.Session() + try: + session.add(row) + session.commit() + LOGGER.debug(f"Row inserted into {row.__class__.__name__} table.") + return True + except Exception as e: + session.rollback() + if "psycopg2.errors.UniqueViolation" in str(e): + LOGGER.error(f"Unique key voilation: {row.__class__.__name__} table. {str(e)}") + raise AlreadyExistsException(row.__class__.__name__, row, + extra_details=["Unique key voilation: {:}".format(e)] ) + else: + LOGGER.error(f"Failed to insert new row into {row.__class__.__name__} table. {str(e)}") + raise OperationFailedException ("Deletion by column id", extra_details=["unable to delete row {:}".format(e)]) + finally: + session.close() + + def search_db_row_by_id(self, model, col_name, id_to_search): + session = self.Session() + try: + entity = session.query(model).filter_by(**{col_name: id_to_search}).first() + if entity: + # LOGGER.debug(f"{model.__name__} ID found: {str(entity)}") + return entity + else: + LOGGER.debug(f"{model.__name__} ID not found, No matching row: {str(id_to_search)}") + print("{:} ID not found, No matching row: {:}".format(model.__name__, id_to_search)) + return None + except Exception as e: + session.rollback() + LOGGER.debug(f"Failed to retrieve {model.__name__} ID. {str(e)}") + raise OperationFailedException ("search by column id", extra_details=["unable to search row {:}".format(e)]) + finally: + session.close() + + def delete_db_row_by_id(self, model, col_name, id_to_search): + session = self.Session() + try: + record = session.query(model).filter_by(**{col_name: id_to_search}).first() + if record: + session.delete(record) + session.commit() + LOGGER.debug("Deleted %s with %s: %s", model.__name__, col_name, id_to_search) + else: + LOGGER.debug("%s with %s %s not found", model.__name__, col_name, id_to_search) + return None + except Exception as e: + session.rollback() + LOGGER.error("Error deleting %s with %s %s: %s", model.__name__, col_name, id_to_search, e) + raise OperationFailedException ("Deletion by column id", extra_details=["unable to delete row {:}".format(e)]) + finally: + session.close() + + def select_with_filter(self, model, filter_object): + session = self.Session() + try: + query = session.query(AnalyzerModel) + + # Apply filters based on the filter_object + if filter_object.analyzer_id: + query = query.filter(AnalyzerModel.analyzer_id.in_([a.analyzer_id.uuid for a in filter_object.analyzer_id])) + + if filter_object.algorithm_names: + query = query.filter(AnalyzerModel.algorithm_name.in_(filter_object.algorithm_names)) + + if filter_object.input_kpi_ids: + input_kpi_uuids = [k.kpi_id.uuid for k in filter_object.input_kpi_ids] + query = query.filter(AnalyzerModel.input_kpi_ids.op('&&')(input_kpi_uuids)) + + if filter_object.output_kpi_ids: + output_kpi_uuids = [k.kpi_id.uuid for k in filter_object.output_kpi_ids] + query = query.filter(AnalyzerModel.output_kpi_ids.op('&&')(output_kpi_uuids)) + + result = query.all() + # query should be added to return all rows + if result: + LOGGER.debug(f"Fetched filtered rows from {model.__name__} table with filters: {filter_object}") # - Results: {result} + else: + LOGGER.warning(f"No matching row found in {model.__name__} table with filters: {filter_object}") + return result + except Exception as e: + LOGGER.error(f"Error fetching filtered rows from {model.__name__} table with filters {filter_object} ::: {e}") + raise OperationFailedException 
("Select by filter", extra_details=["unable to apply the filter {:}".format(e)]) + finally: + session.close() diff --git a/src/analytics/database/__init__.py b/src/analytics/database/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..3ee6f7071f145e06c3aeaefc09a43ccd88e619e3 --- /dev/null +++ b/src/analytics/database/__init__.py @@ -0,0 +1,14 @@ +# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + diff --git a/src/analytics/frontend/Dockerfile b/src/analytics/frontend/Dockerfile new file mode 100644 index 0000000000000000000000000000000000000000..10499713f318a23e1aeab49c96e8163a5ec147fa --- /dev/null +++ b/src/analytics/frontend/Dockerfile @@ -0,0 +1,70 @@ +# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +FROM python:3.9-slim + +# Install dependencies +RUN apt-get --yes --quiet --quiet update && \ + apt-get --yes --quiet --quiet install wget g++ git && \ + rm -rf /var/lib/apt/lists/* + +# Set Python to show logs as they occur +ENV PYTHONUNBUFFERED=0 + +# Download the gRPC health probe +RUN GRPC_HEALTH_PROBE_VERSION=v0.2.0 && \ + wget -qO/bin/grpc_health_probe https://github.com/grpc-ecosystem/grpc-health-probe/releases/download/${GRPC_HEALTH_PROBE_VERSION}/grpc_health_probe-linux-amd64 && \ + chmod +x /bin/grpc_health_probe + +# Get generic Python packages +RUN python3 -m pip install --upgrade pip +RUN python3 -m pip install --upgrade setuptools wheel +RUN python3 -m pip install --upgrade pip-tools + +# Get common Python packages +# Note: this step enables sharing the previous Docker build steps among all the Python components +WORKDIR /var/teraflow +COPY common_requirements.in common_requirements.in +RUN pip-compile --quiet --output-file=common_requirements.txt common_requirements.in +RUN python3 -m pip install -r common_requirements.txt + +# Add common files into working directory +WORKDIR /var/teraflow/common +COPY src/common/. ./ +RUN rm -rf proto + +# Create proto sub-folder, copy .proto files, and generate Python code +RUN mkdir -p /var/teraflow/common/proto +WORKDIR /var/teraflow/common/proto +RUN touch __init__.py +COPY proto/*.proto ./ +RUN python3 -m grpc_tools.protoc -I=. --python_out=. --grpc_python_out=. *.proto +RUN rm *.proto +RUN find . -type f -exec sed -i -E 's/(import\ .*)_pb2/from . 
\1_pb2/g' {} \; + +# Create component sub-folders, get specific Python packages +RUN mkdir -p /var/teraflow/analytics/frontend +WORKDIR /var/teraflow/analytics/frontend +COPY src/analytics/frontend/requirements.in requirements.in +RUN pip-compile --quiet --output-file=requirements.txt requirements.in +RUN python3 -m pip install -r requirements.txt + +# Add component files into working directory +WORKDIR /var/teraflow +COPY src/analytics/__init__.py analytics/__init__.py +COPY src/analytics/frontend/. analytics/frontend/ +COPY src/analytics/database/. analytics/database/ + +# Start the service +ENTRYPOINT ["python", "-m", "analytics.frontend.service"] diff --git a/src/analytics/frontend/__init__.py b/src/analytics/frontend/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..3ee6f7071f145e06c3aeaefc09a43ccd88e619e3 --- /dev/null +++ b/src/analytics/frontend/__init__.py @@ -0,0 +1,14 @@ +# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + diff --git a/src/analytics/frontend/client/AnalyticsFrontendClient.py b/src/analytics/frontend/client/AnalyticsFrontendClient.py new file mode 100644 index 0000000000000000000000000000000000000000..90e95d661d46f24ae5ffaeb7bcfa19b7e1f36526 --- /dev/null +++ b/src/analytics/frontend/client/AnalyticsFrontendClient.py @@ -0,0 +1,68 @@ +# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
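+ +# AnalyticsFrontendClient is a thin wrapper around the AnalyticsFrontend gRPC stub: each RPC +# is wrapped with RETRY_DECORATOR, which re-opens the channel (via connect()) and retries with +# exponential backoff while the service is temporarily unreachable.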
+ +import grpc, logging +from common.Constants import ServiceNameEnum +from common.proto.context_pb2 import Empty +from common.proto.analytics_frontend_pb2_grpc import AnalyticsFrontendServiceStub +from common.proto.analytics_frontend_pb2 import AnalyzerId, Analyzer, AnalyzerFilter, AnalyzerList +from common.Settings import get_service_host, get_service_port_grpc +from common.tools.grpc.Tools import grpc_message_to_json_string +from common.tools.client.RetryDecorator import retry, delay_exponential + +LOGGER = logging.getLogger(__name__) +MAX_RETRIES = 10 +DELAY_FUNCTION = delay_exponential(initial=0.01, increment=2.0, maximum=5.0) +RETRY_DECORATOR = retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, prepare_method_name='connect') + +class AnalyticsFrontendClient: + def __init__(self, host=None, port=None): + if not host: host = get_service_host(ServiceNameEnum.ANALYTICSFRONTEND) + if not port: port = get_service_port_grpc(ServiceNameEnum.ANALYTICSFRONTEND) + self.endpoint = '{:s}:{:s}'.format(str(host), str(port)) + LOGGER.debug('Creating channel to {:s}...'.format(str(self.endpoint))) + self.channel = None + self.stub = None + self.connect() + LOGGER.debug('Channel created') + + def connect(self): + self.channel = grpc.insecure_channel(self.endpoint) + self.stub = AnalyticsFrontendServiceStub(self.channel) + + def close(self): + if self.channel is not None: self.channel.close() + self.channel = None + self.stub = None + + @RETRY_DECORATOR + def StartAnalyzer(self, request : Analyzer) -> AnalyzerId: # type: ignore + LOGGER.debug('StartAnalyzer: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.StartAnalyzer(request) + LOGGER.debug('StartAnalyzer result: {:s}'.format(grpc_message_to_json_string(response))) + return response + + @RETRY_DECORATOR + def StopAnalyzer(self, request : AnalyzerId) -> Empty: # type: ignore + LOGGER.debug('StopAnalyzer: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.StopAnalyzer(request) + LOGGER.debug('StopAnalyzer result: {:s}'.format(grpc_message_to_json_string(response))) + return response + + @RETRY_DECORATOR + def SelectAnalyzers(self, request : AnalyzerFilter) -> AnalyzerList: # type: ignore + LOGGER.debug('SelectAnalyzers: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.SelectAnalyzers(request) + LOGGER.debug('SelectAnalyzers result: {:s}'.format(grpc_message_to_json_string(response))) + return response diff --git a/src/analytics/frontend/client/__init__.py b/src/analytics/frontend/client/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..3ee6f7071f145e06c3aeaefc09a43ccd88e619e3 --- /dev/null +++ b/src/analytics/frontend/client/__init__.py @@ -0,0 +1,14 @@ +# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License.
+ diff --git a/src/analytics/frontend/requirements.in b/src/analytics/frontend/requirements.in new file mode 100644 index 0000000000000000000000000000000000000000..d81b9ddbeafeff94c830d48ca5594e775b9ce240 --- /dev/null +++ b/src/analytics/frontend/requirements.in @@ -0,0 +1,20 @@ +# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apscheduler==3.10.4 +confluent-kafka==2.3.* +psycopg2-binary==2.9.* +SQLAlchemy==1.4.* +sqlalchemy-cockroachdb==1.4.* +SQLAlchemy-Utils==0.38.* diff --git a/src/analytics/frontend/service/AnalyticsFrontendService.py b/src/analytics/frontend/service/AnalyticsFrontendService.py new file mode 100644 index 0000000000000000000000000000000000000000..42a7fc9b60418c1c0fc5af6f320ae5c330ce8871 --- /dev/null +++ b/src/analytics/frontend/service/AnalyticsFrontendService.py @@ -0,0 +1,28 @@ +# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from common.Constants import ServiceNameEnum +from common.Settings import get_service_port_grpc +from common.tools.service.GenericGrpcService import GenericGrpcService +from common.proto.analytics_frontend_pb2_grpc import add_AnalyticsFrontendServiceServicer_to_server +from analytics.frontend.service.AnalyticsFrontendServiceServicerImpl import AnalyticsFrontendServiceServicerImpl + +class AnalyticsFrontendService(GenericGrpcService): + def __init__(self, cls_name: str = __name__): + port = get_service_port_grpc(ServiceNameEnum.ANALYTICSFRONTEND) + super().__init__(port, cls_name=cls_name) + self.analytics_frontend_servicer = AnalyticsFrontendServiceServicerImpl() + + def install_servicers(self): + add_AnalyticsFrontendServiceServicer_to_server(self.analytics_frontend_servicer, self.server) diff --git a/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py b/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py new file mode 100644 index 0000000000000000000000000000000000000000..8bb6a17afb5b911e3652fdb8d1853b5b7bc6faf3 --- /dev/null +++ b/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py @@ -0,0 +1,214 @@ +# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import logging, grpc, json, queue + +from typing import Dict +from confluent_kafka import Consumer as KafkaConsumer +from confluent_kafka import Producer as KafkaProducer +from confluent_kafka import KafkaError + +from common.tools.kafka.Variables import KafkaConfig, KafkaTopic +from common.proto.context_pb2 import Empty +from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method +from common.proto.analytics_frontend_pb2 import Analyzer, AnalyzerId, AnalyzerFilter, AnalyzerList +from common.proto.analytics_frontend_pb2_grpc import AnalyticsFrontendServiceServicer +from analytics.database.Analyzer_DB import AnalyzerDB +from analytics.database.AnalyzerModel import Analyzer as AnalyzerModel +from apscheduler.schedulers.background import BackgroundScheduler +from apscheduler.triggers.interval import IntervalTrigger + +LOGGER = logging.getLogger(__name__) +METRICS_POOL = MetricsPool('AnalyticsFrontend', 'NBIgRPC') + +class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer): + def __init__(self): + LOGGER.info('Init AnalyticsFrontendService') + self.listener_topic = KafkaTopic.ANALYTICS_RESPONSE.value + self.db_obj = AnalyzerDB() + self.result_queue = queue.Queue() + self.scheduler = BackgroundScheduler() + self.kafka_producer = KafkaProducer({'bootstrap.servers' : KafkaConfig.get_kafka_address()}) + self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(), + 'group.id' : 'analytics-frontend', + 'auto.offset.reset' : 'latest'}) + + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) + def StartAnalyzer(self, + request : Analyzer, grpc_context: grpc.ServicerContext # type: ignore + ) -> AnalyzerId: # type: ignore + LOGGER.info("Received gRPC StartAnalyzer request: {:}".format(request)) + response = AnalyzerId() + + self.db_obj.add_row_to_db( + AnalyzerModel.ConvertAnalyzerToRow(request) + ) + self.PublishStartRequestOnKafka(request) + + response.analyzer_id.uuid = request.analyzer_id.analyzer_id.uuid + return response + + def PublishStartRequestOnKafka(self, analyzer_obj): + """ + Publish an analyzer start request on Kafka.
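+ Serializes the analyzer settings (algorithm name, input/output KPI ids, operation mode, + thresholds and window parameters) to JSON and produces the message on the + ANALYTICS_REQUEST topic, keyed by the analyzer UUID, for the analytics backend to consume.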
+ """ + analyzer_uuid = analyzer_obj.analyzer_id.analyzer_id.uuid + analyzer_to_generate : Dict = { + "algo_name" : analyzer_obj.algorithm_name, + "input_kpis" : [k.kpi_id.uuid for k in analyzer_obj.input_kpi_ids], + "output_kpis" : [k.kpi_id.uuid for k in analyzer_obj.output_kpi_ids], + "oper_mode" : analyzer_obj.operation_mode, + "thresholds" : json.loads(analyzer_obj.parameters["thresholds"]), + "window_size" : analyzer_obj.parameters["window_size"], + "window_slider" : analyzer_obj.parameters["window_slider"], + # "store_aggregate" : analyzer_obj.parameters["store_aggregate"] + } + self.kafka_producer.produce( + KafkaTopic.ANALYTICS_REQUEST.value, + key = analyzer_uuid, + value = json.dumps(analyzer_to_generate), + callback = self.delivery_callback + ) + LOGGER.info("Analyzer Start Request Generated: Analyzer Id: {:}, Value: {:}".format(analyzer_uuid, analyzer_to_generate)) + self.kafka_producer.flush() + + # self.StartResponseListener(analyzer_uuid) + + def StartResponseListener(self, filter_key=None): + """ + Start the Kafka response listener with APScheduler and return key-value pairs periodically. + """ + LOGGER.info("Starting StartResponseListener") + # Schedule the ResponseListener at fixed intervals + self.scheduler.add_job( + self.response_listener, + trigger=IntervalTrigger(seconds=5), + args=[filter_key], + id=f"response_listener_{self.listener_topic}", + replace_existing=True + ) + self.scheduler.start() + LOGGER.info(f"Started Kafka listener for topic {self.listener_topic}...") + try: + while True: + LOGGER.info("entering while...") + key, value = self.result_queue.get() # Wait until a result is available + LOGGER.info("In while true ...") + yield key, value # Yield the result to the calling function + except KeyboardInterrupt: + LOGGER.warning("Listener stopped manually.") + finally: + self.StopListener() + + def response_listener(self, filter_key=None): + """ + Poll Kafka messages and put key-value pairs into the queue. + """ + LOGGER.info(f"Polling Kafka topic {self.listener_topic}...") + + consumer = self.kafka_consumer + consumer.subscribe([self.listener_topic]) + msg = consumer.poll(2.0) + if msg is None: + return + elif msg.error(): + if msg.error().code() != KafkaError._PARTITION_EOF: + LOGGER.error(f"Kafka error: {msg.error()}") + return + + try: + key = msg.key().decode('utf-8') if msg.key() else None + if filter_key is not None and key == filter_key: + value = json.loads(msg.value().decode('utf-8')) + LOGGER.info(f"Received key: {key}, value: {value}") + self.result_queue.put((key, value)) + else: + LOGGER.info(f"Skipping message with unmatched key: {key}") + # value = json.loads(msg.value().decode('utf-8')) # Added for debugging + # self.result_queue.put((filter_key, value)) # Added for debugging + except Exception as e: + LOGGER.error(f"Error processing Kafka message: {e}") + + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) + def StopAnalyzer(self, + request : AnalyzerId, grpc_context: grpc.ServicerContext # type: ignore + ) -> Empty: # type: ignore + LOGGER.info ("At Service gRPC message: {:}".format(request)) + try: + analyzer_id_to_delete = request.analyzer_id.uuid + self.db_obj.delete_db_row_by_id( + AnalyzerModel, "analyzer_id", analyzer_id_to_delete + ) + self.PublishStopRequestOnKafka(analyzer_id_to_delete) + except Exception as e: + LOGGER.error('Unable to delete analyzer. Error: {:}'.format(e)) + return Empty() + + def PublishStopRequestOnKafka(self, analyzer_uuid): + """ + Method to generate stop analyzer request on Kafka. 
+ """ + # analyzer_uuid = analyzer_id.analyzer_id.uuid + analyzer_to_stop : Dict = { + "algo_name" : None, + "input_kpis" : [], + "output_kpis" : [], + "oper_mode" : None + } + self.kafka_producer.produce( + KafkaTopic.ANALYTICS_REQUEST.value, + key = analyzer_uuid, + value = json.dumps(analyzer_to_stop), + callback = self.delivery_callback + ) + LOGGER.info("Analyzer Stop Request Generated: Analyzer Id: {:}".format(analyzer_uuid)) + self.kafka_producer.flush() + self.StopListener() + + def StopListener(self): + """ + Gracefully stop the Kafka listener and the scheduler. + """ + LOGGER.info("Stopping Kafka listener...") + self.scheduler.shutdown() + LOGGER.info("Kafka listener stopped.") + + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) + def SelectAnalyzers(self, + filter : AnalyzerFilter, contextgrpc_context: grpc.ServicerContext # type: ignore + ) -> AnalyzerList: # type: ignore + LOGGER.info("At Service gRPC message: {:}".format(filter)) + response = AnalyzerList() + try: + rows = self.db_obj.select_with_filter(AnalyzerModel, filter) + try: + for row in rows: + response.analyzer_list.append( + AnalyzerModel.ConvertRowToAnalyzer(row) + ) + return response + except Exception as e: + LOGGER.info('Unable to process filter response {:}'.format(e)) + except Exception as e: + LOGGER.error('Unable to apply filter on table {:}. ERROR: {:}'.format(AnalyzerModel.__name__, e)) + + + def delivery_callback(self, err, msg): + if err: + LOGGER.debug('Message delivery failed: {:}'.format(err)) + print ('Message delivery failed: {:}'.format(err)) + # else: + # LOGGER.debug('Message delivered to topic {:}'.format(msg.topic())) + # print('Message delivered to topic {:}'.format(msg.topic())) diff --git a/src/analytics/frontend/service/__init__.py b/src/analytics/frontend/service/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..3ee6f7071f145e06c3aeaefc09a43ccd88e619e3 --- /dev/null +++ b/src/analytics/frontend/service/__init__.py @@ -0,0 +1,14 @@ +# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + diff --git a/src/analytics/frontend/service/__main__.py b/src/analytics/frontend/service/__main__.py new file mode 100644 index 0000000000000000000000000000000000000000..6c331844f45d98095ef98951f3db43a0e2f0c69c --- /dev/null +++ b/src/analytics/frontend/service/__main__.py @@ -0,0 +1,56 @@ +# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +import logging, signal, sys, threading +from prometheus_client import start_http_server +from common.Settings import get_log_level, get_metrics_port +from .AnalyticsFrontendService import AnalyticsFrontendService + +terminate = threading.Event() +LOGGER = None + +def signal_handler(signal, frame): # pylint: disable=redefined-outer-name + LOGGER.warning('Terminate signal received') + terminate.set() + +def main(): + global LOGGER # pylint: disable=global-statement + + log_level = get_log_level() + logging.basicConfig(level=log_level, format="[%(asctime)s] %(levelname)s:%(name)s:%(message)s") + LOGGER = logging.getLogger(__name__) + + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + LOGGER.info('Starting...') + + # Start metrics server + metrics_port = get_metrics_port() + start_http_server(metrics_port) + + grpc_service = AnalyticsFrontendService() + grpc_service.start() + + # Wait for Ctrl+C or termination signal + while not terminate.wait(timeout=1.0): pass + + LOGGER.info('Terminating...') + grpc_service.stop() + + LOGGER.info('Bye') + return 0 + +if __name__ == '__main__': + sys.exit(main()) diff --git a/src/analytics/frontend/tests/__init__.py b/src/analytics/frontend/tests/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..3ee6f7071f145e06c3aeaefc09a43ccd88e619e3 --- /dev/null +++ b/src/analytics/frontend/tests/__init__.py @@ -0,0 +1,14 @@ +# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + diff --git a/src/analytics/frontend/tests/messages.py b/src/analytics/frontend/tests/messages.py new file mode 100644 index 0000000000000000000000000000000000000000..646de962e8a213582fdb7cd1446ab57bda561a96 --- /dev/null +++ b/src/analytics/frontend/tests/messages.py @@ -0,0 +1,84 @@ +# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
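+ +# Builders for the protobuf messages used by the Analytics Frontend tests. Fixed UUIDs are +# used (random alternatives are left commented) so that start/stop tests address the same +# analyzer across calls.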
+ +import uuid +import json +from common.proto.kpi_manager_pb2 import KpiId +from common.proto.analytics_frontend_pb2 import ( AnalyzerOperationMode, AnalyzerId, + Analyzer, AnalyzerFilter ) + +def create_analyzer_id(): + _create_analyzer_id = AnalyzerId() + # _create_analyzer_id.analyzer_id.uuid = str(uuid.uuid4()) + _create_analyzer_id.analyzer_id.uuid = "efef4d95-1cf1-43c4-9742-95c283ddd7a6" + return _create_analyzer_id + +def create_analyzer(): + _create_analyzer = Analyzer() + # _create_analyzer.analyzer_id.analyzer_id.uuid = str(uuid.uuid4()) + _create_analyzer.analyzer_id.analyzer_id.uuid = "efef4d95-1cf1-43c4-9742-95c283ddd7a6" + _create_analyzer.algorithm_name = "Test_Aggregate_and_Threshold" + _create_analyzer.operation_mode = AnalyzerOperationMode.ANALYZEROPERATIONMODE_STREAMING + + _kpi_id = KpiId() + # input KPI IDs to analyze (protobuf 'append' copies the message, so _kpi_id can be reused) + _kpi_id.kpi_id.uuid = "6e22f180-ba28-4641-b190-2287bf448888" + _create_analyzer.input_kpi_ids.append(_kpi_id) + _kpi_id.kpi_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666" + _create_analyzer.input_kpi_ids.append(_kpi_id) + _kpi_id.kpi_id.uuid = str(uuid.uuid4()) + _create_analyzer.input_kpi_ids.append(_kpi_id) + # output KPI IDs produced by the analysis + _kpi_id.kpi_id.uuid = str(uuid.uuid4()) + _create_analyzer.output_kpi_ids.append(_kpi_id) + _kpi_id.kpi_id.uuid = str(uuid.uuid4()) + _create_analyzer.output_kpi_ids.append(_kpi_id) + # parameters + _threshold_dict = { + # 'avg_value' :(20, 30), 'min_value' :(00, 10), 'max_value' :(45, 50), + 'first_value' :(00, 10), 'last_value' :(40, 50), 'stdev_value':(00, 10)} + _create_analyzer.parameters['thresholds'] = json.dumps(_threshold_dict) + _create_analyzer.parameters['window_size'] = "60 seconds" # such as "10 seconds", "2 minutes", "3 hours", "4 days" or "5 weeks" + _create_analyzer.parameters['window_slider'] = "30 seconds" # should be less than window size + _create_analyzer.parameters['store_aggregate'] = str(False) # "True" to store aggregates;
not implemented yet. + + return _create_analyzer + +def create_analyzer_filter(): + _create_analyzer_filter = AnalyzerFilter() + + _analyzer_id_obj = AnalyzerId() + # _analyzer_id_obj.analyzer_id.uuid = str(uuid.uuid4()) + _analyzer_id_obj.analyzer_id.uuid = "efef4d95-1cf1-43c4-9742-95c283ddd7a6" + _create_analyzer_filter.analyzer_id.append(_analyzer_id_obj) + + _create_analyzer_filter.algorithm_names.append('Test_Aggregate_and_Threshold') + + # _input_kpi_id_obj = KpiId() + # _input_kpi_id_obj.kpi_id.uuid = str(uuid.uuid4()) + # _create_analyzer_filter.input_kpi_ids.append(_input_kpi_id_obj) + # another input kpi Id + # _input_kpi_id_obj.kpi_id.uuid = str(uuid.uuid4()) + # _create_analyzer_filter.input_kpi_ids.append(_input_kpi_id_obj) + + # _output_kpi_id_obj = KpiId() + # _output_kpi_id_obj.kpi_id.uuid = str(uuid.uuid4()) + # _create_analyzer_filter.output_kpi_ids.append(_output_kpi_id_obj) + # # another output kpi Id + # _output_kpi_id_obj.kpi_id.uuid = str(uuid.uuid4()) + # _create_analyzer_filter.output_kpi_ids.append(_output_kpi_id_obj) + + return _create_analyzer_filter diff --git a/src/analytics/frontend/tests/test_frontend.py b/src/analytics/frontend/tests/test_frontend.py new file mode 100644 index 0000000000000000000000000000000000000000..d2428c01fb021f71a884d9a99c446bfef6e66559 --- /dev/null +++ b/src/analytics/frontend/tests/test_frontend.py @@ -0,0 +1,134 @@ +# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License.
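+ +# These tests spin up a real AnalyticsFrontendService gRPC server (session-scoped fixture) and +# exercise it through AnalyticsFrontendClient; the Kafka topics are validated first so the +# functionality tests can rely on them.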
+ +import os +import time +import json +import pytest +import logging +import threading + +from common.Constants import ServiceNameEnum +from common.proto.context_pb2 import Empty +from common.Settings import ( get_service_port_grpc, get_env_var_name, + ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC ) + +from common.tools.kafka.Variables import KafkaTopic +from common.proto.analytics_frontend_pb2 import AnalyzerId, AnalyzerList +from analytics.frontend.client.AnalyticsFrontendClient import AnalyticsFrontendClient +from analytics.frontend.service.AnalyticsFrontendService import AnalyticsFrontendService +from analytics.frontend.tests.messages import ( create_analyzer_id, create_analyzer, + create_analyzer_filter ) +from analytics.frontend.service.AnalyticsFrontendServiceServicerImpl import AnalyticsFrontendServiceServicerImpl +from apscheduler.schedulers.background import BackgroundScheduler +from apscheduler.triggers.interval import IntervalTrigger + + +########################### +# Tests Setup +########################### + +LOCAL_HOST = '127.0.0.1' + +ANALYTICS_FRONTEND_PORT = str(get_service_port_grpc(ServiceNameEnum.ANALYTICSFRONTEND)) +os.environ[get_env_var_name(ServiceNameEnum.ANALYTICSFRONTEND, ENVVAR_SUFIX_SERVICE_HOST )] = str(LOCAL_HOST) +os.environ[get_env_var_name(ServiceNameEnum.ANALYTICSFRONTEND, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(ANALYTICS_FRONTEND_PORT) + +LOGGER = logging.getLogger(__name__) + +@pytest.fixture(scope='session') +def analyticsFrontend_service(): + LOGGER.info('Initializing AnalyticsFrontendService...') + + _service = AnalyticsFrontendService() + _service.start() + + # yield the server; when the test session finishes, execution resumes to stop it + LOGGER.info('Yielding AnalyticsFrontendService...') + yield _service + + LOGGER.info('Terminating AnalyticsFrontendService...') + _service.stop() + + LOGGER.info('Terminated AnalyticsFrontendService...') + +@pytest.fixture(scope='session') +def analyticsFrontend_client(analyticsFrontend_service : AnalyticsFrontendService): + LOGGER.info('Initializing AnalyticsFrontendClient...') + + _client = AnalyticsFrontendClient() + + # yield the client; when the test session finishes, execution resumes to close it + LOGGER.info('Yielding AnalyticsFrontendClient...') + yield _client + + LOGGER.info('Closing AnalyticsFrontendClient...') + _client.close() + + LOGGER.info('Closed AnalyticsFrontendClient...') + + +########################### +# Tests Implementation of Analytics Frontend +########################### + +# --- "test_validate_kafka_topics" should be executed before the functionality tests --- +def test_validate_kafka_topics(): + LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ") + response = KafkaTopic.create_all_topics() + assert isinstance(response, bool) + +# ----- core functionality tests ----- +# def test_StartAnalytics(analyticsFrontend_client): +# LOGGER.info(' >>> test_StartAnalytic START: <<< ') +# response = analyticsFrontend_client.StartAnalyzer(create_analyzer()) +# LOGGER.debug(str(response)) +# assert isinstance(response, AnalyzerId) + +# To test start and stop listener together +def test_StartStopAnalyzers(analyticsFrontend_client): + LOGGER.info(' >>> test_StartStopAnalyzers START: <<< ') + LOGGER.info('--> StartAnalyzer') + added_analyzer_id = analyticsFrontend_client.StartAnalyzer(create_analyzer()) + LOGGER.debug(str(added_analyzer_id)) + LOGGER.info(' --> Calling StartResponseListener...')
+ class_obj = AnalyticsFrontendServiceServicerImpl() + response = class_obj.StartResponseListener(added_analyzer_id.analyzer_id.uuid) + LOGGER.debug(response) + LOGGER.info("waiting for timer to complete ...") + time.sleep(3) + LOGGER.info('--> StopAnalyzer') + response = analyticsFrontend_client.StopAnalyzer(added_analyzer_id) + LOGGER.debug(str(response)) + +# def test_SelectAnalytics(analyticsFrontend_client): +# LOGGER.info(' >>> test_SelectAnalytics START: <<< ') +# response = analyticsFrontend_client.SelectAnalyzers(create_analyzer_filter()) +# LOGGER.debug(str(response)) +# assert isinstance(response, AnalyzerList) + +# def test_StopAnalytic(analyticsFrontend_client): +# LOGGER.info(' >>> test_StopAnalytic START: <<< ') +# response = analyticsFrontend_client.StopAnalyzer(create_analyzer_id()) +# LOGGER.debug(str(response)) +# assert isinstance(response, Empty) + +# def test_ResponseListener(): +# LOGGER.info(' >>> test_ResponseListener START <<< ') +# analyzer_id = create_analyzer_id() +# LOGGER.debug("Starting Response Listener for Analyzer ID: {:}".format(analyzer_id.analyzer_id.uuid)) +# class_obj = AnalyticsFrontendServiceServicerImpl() +# for response in class_obj.StartResponseListener(analyzer_id.analyzer_id.uuid): +# LOGGER.debug(response) +# assert isinstance(response, tuple) \ No newline at end of file diff --git a/src/analytics/requirements.in b/src/analytics/requirements.in new file mode 100644 index 0000000000000000000000000000000000000000..8ff30ddaad25c39713f2e6f68c8d9aebed74dad0 --- /dev/null +++ b/src/analytics/requirements.in @@ -0,0 +1,21 @@ +# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# NOTE: PySpark requires a Java runtime (11+) installed in the image; Java is a system package, not a pip-installable requirement. +pyspark==3.5.2 +confluent-kafka==2.3.* +psycopg2-binary==2.9.* +SQLAlchemy==1.4.* +sqlalchemy-cockroachdb==1.4.* +SQLAlchemy-Utils==0.38.* diff --git a/src/analytics/tests/__init__.py b/src/analytics/tests/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..3ee6f7071f145e06c3aeaefc09a43ccd88e619e3 --- /dev/null +++ b/src/analytics/tests/__init__.py @@ -0,0 +1,14 @@ +# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License.
+ diff --git a/src/analytics/tests/test_analytics_db.py b/src/analytics/tests/test_analytics_db.py new file mode 100644 index 0000000000000000000000000000000000000000..58e7d0167044bb461e66b053dcb3999641ea8419 --- /dev/null +++ b/src/analytics/tests/test_analytics_db.py @@ -0,0 +1,28 @@ +# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import logging +from analytics.database.Analyzer_DB import AnalyzerDB + +LOGGER = logging.getLogger(__name__) + +def test_verify_databases_and_tables(): + LOGGER.info('>>> test_verify_databases_and_tables : START <<< ') + AnalyzerDBobj = AnalyzerDB() + # AnalyzerDBobj.drop_database() + # AnalyzerDBobj.verify_tables() + AnalyzerDBobj.create_database() + AnalyzerDBobj.create_tables() + AnalyzerDBobj.verify_tables() diff --git a/src/common/Constants.py b/src/common/Constants.py index 4b2bced95fca28abdfd729492acc1117cdf3e8d9..74490321f9c8ec016fa4b48b583e2217c61710ec 100644 --- a/src/common/Constants.py +++ b/src/common/Constants.py @@ -66,6 +66,8 @@ class ServiceNameEnum(Enum): KPIVALUEWRITER = 'kpi-value-writer' TELEMETRYFRONTEND = 'telemetry-frontend' TELEMETRYBACKEND = 'telemetry-backend' + ANALYTICSFRONTEND = 'analytics-frontend' + ANALYTICSBACKEND = 'analytics-backend' # Used for test and debugging only DLT_GATEWAY = 'dltgateway' @@ -100,6 +102,8 @@ DEFAULT_SERVICE_GRPC_PORTS = { ServiceNameEnum.KPIVALUEWRITER .value : 30030, ServiceNameEnum.TELEMETRYFRONTEND .value : 30050, ServiceNameEnum.TELEMETRYBACKEND .value : 30060, + ServiceNameEnum.ANALYTICSFRONTEND .value : 30080, + ServiceNameEnum.ANALYTICSBACKEND .value : 30090, # Used for test and debugging only ServiceNameEnum.DLT_GATEWAY .value : 50051, diff --git a/src/common/tools/kafka/Variables.py b/src/common/tools/kafka/Variables.py index 5ada88a1ea0a7eae31eda741d81757fa624521de..fc43c315114e7b51c4e2604afbb14e165796e7c5 100644 --- a/src/common/tools/kafka/Variables.py +++ b/src/common/tools/kafka/Variables.py @@ -14,7 +14,6 @@ import logging from enum import Enum -from confluent_kafka import KafkaException from confluent_kafka.admin import AdminClient, NewTopic from common.Settings import get_setting @@ -26,11 +25,11 @@ class KafkaConfig(Enum): @staticmethod def get_kafka_address() -> str: - kafka_server_address = get_setting('KFK_SERVER_ADDRESS', default=None) - if kafka_server_address is None: - KFK_NAMESPACE = get_setting('KFK_NAMESPACE') - KFK_PORT = get_setting('KFK_SERVER_PORT') - kafka_server_address = KFK_SERVER_ADDRESS_TEMPLATE.format(KFK_NAMESPACE, KFK_PORT) + # kafka_server_address = get_setting('KFK_SERVER_ADDRESS', default=None) + # if kafka_server_address is None: + KFK_NAMESPACE = get_setting('KFK_NAMESPACE') + KFK_PORT = get_setting('KFK_SERVER_PORT') + kafka_server_address = KFK_SERVER_ADDRESS_TEMPLATE.format(KFK_NAMESPACE, KFK_PORT) return kafka_server_address @staticmethod @@ -41,11 +40,14 @@ class KafkaConfig(Enum): class KafkaTopic(Enum): - REQUEST = 'topic_request' - RESPONSE = 'topic_response' - RAW = 
'topic_raw' - LABELED = 'topic_labeled' - VALUE = 'topic_value' + # TODO: Later to be populated from ENV variable. + REQUEST = 'topic_request' + RESPONSE = 'topic_response' + RAW = 'topic_raw' + LABELED = 'topic_labeled' + VALUE = 'topic_value' + ANALYTICS_REQUEST = 'topic_request_analytics' + ANALYTICS_RESPONSE = 'topic_response_analytics' @staticmethod def create_all_topics() -> bool: diff --git a/src/kpi_manager/database/KpiEngine.py b/src/kpi_manager/database/KpiEngine.py index dff406de666b5f68539b8897fa26e0b3ad51286b..0fce7e3d36cf2f03a18f311c815719a4f17b2869 100644 --- a/src/kpi_manager/database/KpiEngine.py +++ b/src/kpi_manager/database/KpiEngine.py @@ -16,8 +16,6 @@ import logging, sqlalchemy from common.Settings import get_setting LOGGER = logging.getLogger(__name__) - -# CRDB_URI_TEMPLATE = 'cockroachdb://{:s}:{:s}@127.0.0.1:{:s}/{:s}?sslmode={:s}' CRDB_URI_TEMPLATE = 'cockroachdb://{:s}:{:s}@cockroachdb-public.{:s}.svc.cluster.local:{:s}/{:s}?sslmode={:s}' class KpiEngine: @@ -33,12 +31,10 @@ class KpiEngine: CRDB_SSLMODE = get_setting('CRDB_SSLMODE') crdb_uri = CRDB_URI_TEMPLATE.format( CRDB_USERNAME, CRDB_PASSWORD, CRDB_NAMESPACE, CRDB_SQL_PORT, CRDB_DATABASE, CRDB_SSLMODE) - # crdb_uri = CRDB_URI_TEMPLATE.format( - # CRDB_USERNAME, CRDB_PASSWORD, CRDB_SQL_PORT, CRDB_DATABASE, CRDB_SSLMODE) try: engine = sqlalchemy.create_engine(crdb_uri, echo=False) LOGGER.info(' KpiDBmanager initalized with DB URL: {:}'.format(crdb_uri)) except: # pylint: disable=bare-except # pragma: no cover LOGGER.exception('Failed to connect to database: {:s}'.format(str(crdb_uri))) return None # type: ignore - return engine + return engine diff --git a/src/telemetry/backend/service/TelemetryBackendService.py b/src/telemetry/backend/service/TelemetryBackendService.py index 95662969be4f9191e5f3748490a6bc47167e6243..6ab841238f446a2895cd163fab4b7eb05eaa3176 100755 --- a/src/telemetry/backend/service/TelemetryBackendService.py +++ b/src/telemetry/backend/service/TelemetryBackendService.py @@ -28,12 +28,8 @@ from common.tools.kafka.Variables import KafkaConfig, KafkaTopic from common.method_wrappers.Decorator import MetricsPool from common.tools.service.GenericGrpcService import GenericGrpcService - - LOGGER = logging.getLogger(__name__) METRICS_POOL = MetricsPool('TelemetryBackend', 'backendService') -# EXPORTER_ENDPOINT = "http://10.152.183.2:9100/metrics" - class TelemetryBackendService(GenericGrpcService): """ diff --git a/src/telemetry/backend/service/__main__.py b/src/telemetry/backend/service/__main__.py index 4ad86733141966894070b78b3ac227890293fa7c..9ec9e191fd22e07da46f80214ade0ac516032433 100644 --- a/src/telemetry/backend/service/__main__.py +++ b/src/telemetry/backend/service/__main__.py @@ -13,7 +13,8 @@ # limitations under the License. 
import logging, signal, sys, threading -from common.Settings import get_log_level +from prometheus_client import start_http_server +from common.Settings import get_log_level, get_metrics_port from .TelemetryBackendService import TelemetryBackendService terminate = threading.Event() @@ -27,13 +28,17 @@ def main(): global LOGGER # pylint: disable=global-statement log_level = get_log_level() - logging.basicConfig(level=log_level) + logging.basicConfig(level=log_level, format="[%(asctime)s] %(levelname)s:%(name)s:%(message)s") LOGGER = logging.getLogger(__name__) signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) - LOGGER.debug('Starting...') + LOGGER.info('Starting...') + + # Start metrics server + metrics_port = get_metrics_port() + start_http_server(metrics_port) grpc_service = TelemetryBackendService() grpc_service.start() @@ -41,10 +46,10 @@ def main(): # Wait for Ctrl+C or termination signal while not terminate.wait(timeout=1.0): pass - LOGGER.debug('Terminating...') + LOGGER.info('Terminating...') grpc_service.stop() - LOGGER.debug('Bye') + LOGGER.info('Bye') return 0 if __name__ == '__main__': diff --git a/src/telemetry/database/TelemetryEngine.py b/src/telemetry/database/TelemetryEngine.py index 18ec2ddbc671302b642db04b673038659da7acde..7c8620faf25e695e7f971bce78be9ad208a7701b 100644 --- a/src/telemetry/database/TelemetryEngine.py +++ b/src/telemetry/database/TelemetryEngine.py @@ -16,27 +16,25 @@ import logging, sqlalchemy from common.Settings import get_setting LOGGER = logging.getLogger(__name__) - -# CRDB_URI_TEMPLATE = 'cockroachdb://{:s}:{:s}@127.0.0.1:{:s}/{:s}?sslmode={:s}' CRDB_URI_TEMPLATE = 'cockroachdb://{:s}:{:s}@cockroachdb-public.{:s}.svc.cluster.local:{:s}/{:s}?sslmode={:s}' class TelemetryEngine: @staticmethod def get_engine() -> sqlalchemy.engine.Engine: crdb_uri = get_setting('CRDB_URI', default=None) - if crdb_uri is None: - CRDB_NAMESPACE = "crdb" - CRDB_SQL_PORT = "26257" - CRDB_DATABASE = "tfs-telemetry" - CRDB_USERNAME = "tfs" - CRDB_PASSWORD = "tfs123" - CRDB_SSLMODE = "require" + if crdb_uri is None: + CRDB_NAMESPACE = get_setting('CRDB_NAMESPACE') + CRDB_SQL_PORT = get_setting('CRDB_SQL_PORT') + CRDB_DATABASE = "tfs-telemetry" # TODO: read from a specific environment variable, e.g. get_setting('CRDB_DATABASE_TELEMETRY') + CRDB_USERNAME = get_setting('CRDB_USERNAME') + CRDB_PASSWORD = get_setting('CRDB_PASSWORD') + CRDB_SSLMODE = get_setting('CRDB_SSLMODE') crdb_uri = CRDB_URI_TEMPLATE.format( - CRDB_USERNAME, CRDB_PASSWORD, CRDB_NAMESPACE, CRDB_SQL_PORT, CRDB_DATABASE, CRDB_SSLMODE) + CRDB_USERNAME, CRDB_PASSWORD, CRDB_NAMESPACE, CRDB_SQL_PORT, CRDB_DATABASE, CRDB_SSLMODE) try: engine = sqlalchemy.create_engine(crdb_uri, echo=False) LOGGER.info(' TelemetryDB initalized with DB URL: {:}'.format(crdb_uri)) except: # pylint: disable=bare-except # pragma: no cover LOGGER.exception('Failed to connect to database: {:s}'.format(str(crdb_uri))) return None # type: ignore - return engine # type: ignore + return engine diff --git a/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py b/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py index 2b872dba33bbe1434b68d5b5d2449e0b228312f7..b73d9fa952ee42aeb7adb8f3c0b2e4a3ba7f3e09 100644 --- a/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py +++ b/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py @@ -89,7 +89,14 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer): request : CollectorId, grpc_context:
grpc.ServicerContext # type: ignore ) -> Empty: # type: ignore LOGGER.info ("gRPC message: {:}".format(request)) - self.PublishStopRequestOnKafka(request) + try: + collector_to_delete = request.collector_id.uuid + self.tele_db_obj.delete_db_row_by_id( + CollectorModel, "collector_id", collector_to_delete + ) + self.PublishStopRequestOnKafka(request) + except Exception as e: + LOGGER.error('Unable to delete collector. Error: {:}'.format(e)) return Empty() def PublishStopRequestOnKafka(self, collector_id):