diff --git a/.gitignore b/.gitignore index db47387c8c8ff9900a59107642221960134aa1f1..235d7768a8c1d23bf9ed32cd3288a94e338e0eca 100644 --- a/.gitignore +++ b/.gitignore @@ -179,5 +179,3 @@ libyang/ # Other logs **/logs/*.log.* -# PySpark checkpoints -src/analytics/.spark/* diff --git a/deploy/all.sh b/deploy/all.sh index 97f4db37d53f4a7fcca850c51a5bfe6cc7653cb4..f3075949e036c5fee17969f20199c20ed7d983d3 100755 --- a/deploy/all.sh +++ b/deploy/all.sh @@ -215,6 +215,9 @@ export GRAF_EXT_PORT_HTTP=${GRAF_EXT_PORT_HTTP:-"3000"} # Deploy Apache Kafka ./deploy/kafka.sh +#Deploy Monitoring (Prometheus, Mimir, Grafana) +./deploy/monitoring.sh + # Expose Dashboard ./deploy/expose_dashboard.sh diff --git a/deploy/monitoring.sh b/deploy/monitoring.sh new file mode 100755 index 0000000000000000000000000000000000000000..6fa633a378d133d258b96a4b41dbd7de833690d9 --- /dev/null +++ b/deploy/monitoring.sh @@ -0,0 +1,132 @@ +#!/bin/bash +# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -euo pipefail + +# ----------------------------------------------------------- +# Global namespace for all deployments +# ----------------------------------------------------------- +NAMESPACE="monitoring" +VALUES_FILE_PATH="manifests/monitoring" + +# ----------------------------------------------------------- +# Prometheus Configuration +# ----------------------------------------------------------- +RELEASE_NAME_PROM="mon-prometheus" +CHART_REPO_NAME_PROM="prometheus-community" +CHART_REPO_URL_PROM="https://prometheus-community.github.io/helm-charts" +CHART_NAME_PROM="prometheus" +VALUES_FILE_PROM="$VALUES_FILE_PATH/prometheus_values.yaml" + +# ----------------------------------------------------------- +# Mimir Configuration +# ----------------------------------------------------------- +# RELEASE_NAME_MIMIR="mon-mimir" +# CHART_REPO_NAME_MIMIR="grafana" +# CHART_REPO_URL_MIMIR="https://grafana.github.io/helm-charts" +# CHART_NAME_MIMIR="mimir-distributed" +# VALUES_FILE_MIMIR="$VALUES_FILE_PATH/mimir_values.yaml" + +# ----------------------------------------------------------- +# Grafana Configuration +# ----------------------------------------------------------- +# RELEASE_NAME_GRAFANA="mon-grafana" +# CHART_REPO_NAME_GRAFANA="grafana" +# CHART_REPO_URL_GRAFANA="https://grafana.github.io/helm-charts" +# CHART_NAME_GRAFANA="grafana" +# VALUES_FILE_GRAFANA="$VALUES_FILE_PATH/grafana_values.yaml" + + +# ----------------------------------------------------------- +# Function to deploy or upgrade a Helm chart +# ----------------------------------------------------------- +deploy_chart() { + local release_name="$1" + local chart_repo_name="$2" + local chart_repo_url="$3" + local chart_name="$4" + local values_file="$5" + local namespace="$6" + + echo ">>> Deploying [${release_name}] from repo [${chart_repo_name}]..." + + # Add or update the Helm repo + echo "Adding/updating Helm repo: $chart_repo_name -> $chart_repo_url" + helm repo add "$chart_repo_name" "$chart_repo_url" || true + helm repo update + + # Create namespace if needed + echo "Creating namespace '$namespace' if it doesn't exist..." + kubectl get namespace "$namespace" >/dev/null 2>&1 || kubectl create namespace "$namespace" + + # Install or upgrade the chart + if [ -n "$values_file" ] && [ -f "$values_file" ]; then + echo "Installing/Upgrading $release_name using custom values from $values_file..." + helm upgrade --install "$release_name" "$chart_repo_name/$chart_name" \ + --namespace "$namespace" \ + --values "$values_file" + else + echo "Installing/Upgrading $release_name with default chart values..." + helm upgrade --install "$release_name" "$chart_repo_name/$chart_name" \ + --namespace "$namespace" + fi + + echo "<<< Deployment initiated for [$release_name]." + echo +} + + +# ----------------------------------------------------------- +# Actual Deployments +# ----------------------------------------------------------- + +# 1) Deploy Prometheus +deploy_chart "$RELEASE_NAME_PROM" \ + "$CHART_REPO_NAME_PROM" \ + "$CHART_REPO_URL_PROM" \ + "$CHART_NAME_PROM" \ + "$VALUES_FILE_PROM" \ + "$NAMESPACE" + +# Optionally wait for Prometheus server pod to become ready +kubectl rollout status deployment/"$RELEASE_NAME_PROM-server" -n "$NAMESPACE" || true + + +# 2) Deploy Mimir +# deploy_chart "$RELEASE_NAME_MIMIR" \ +# "$CHART_REPO_NAME_MIMIR" \ +# "$CHART_REPO_URL_MIMIR" \ +# "$CHART_NAME_MIMIR" \ +# "$VALUES_FILE_MIMIR" \ +# "$NAMESPACE" + +# Depending on how Mimir runs (StatefulSets, Deployments), you can wait for +# the correct resource to be ready. For example: +# kubectl rollout status statefulset/"$RELEASE_NAME_MIMIR-distributor" -n "$NAMESPACE" || true + + +# 3) Deploy Grafana +# deploy_chart "$RELEASE_NAME_GRAFANA" \ +# "$CHART_REPO_NAME_GRAFANA" \ +# "$CHART_REPO_URL_GRAFANA" \ +# "$CHART_NAME_GRAFANA" \ +# "$VALUES_FILE_GRAFANA" \ +# "$NAMESPACE" + +# kubectl rollout status deployment/"$RELEASE_NAME_GRAFANA" -n "$NAMESPACE" || true + +# ----------------------------------------------------------- +echo "All deployments completed!" + diff --git a/manifests/kpi_value_writerservice.yaml b/manifests/kpi_value_writerservice.yaml index f98be462990fff4d678e41144511a284e2dd4f6c..27c61c9331606cc3e404a5b928696674a53356c0 100644 --- a/manifests/kpi_value_writerservice.yaml +++ b/manifests/kpi_value_writerservice.yaml @@ -39,6 +39,8 @@ spec: env: - name: LOG_LEVEL value: "INFO" + - name: PUSHGATEWAY_URL + value: "http://mon-prometheus-prometheus-pushgateway.monitoring.svc.cluster.local:9091" envFrom: - secretRef: name: kfk-kpi-data diff --git a/manifests/monitoring/grafana_values.yaml b/manifests/monitoring/grafana_values.yaml new file mode 100644 index 0000000000000000000000000000000000000000..a2dbd7971159fa2b506e1f875eb8484f96850f38 --- /dev/null +++ b/manifests/monitoring/grafana_values.yaml @@ -0,0 +1,235 @@ +rbac: + create: true + ## Use an existing ClusterRole/Role (depending on rbac.namespaced false/true) + # useExistingRole: name-of-some-role + # useExistingClusterRole: name-of-some-clusterRole + pspEnabled: false + pspUseAppArmor: false + namespaced: false + +serviceAccount: + create: true + name: + nameTest: + ## ServiceAccount labels. + automountServiceAccountToken: false + +replicas: 1 + +## Create a headless service for the deployment +headlessService: false + +## Should the service account be auto mounted on the pod +automountServiceAccountToken: true + +## Create HorizontalPodAutoscaler object for deployment type +# +autoscaling: + enabled: false + minReplicas: 1 + maxReplicas: 3 + targetCPU: "60" + targetMemory: "" + behavior: {} + +deploymentStrategy: + type: RollingUpdate + +readinessProbe: + httpGet: + path: /api/health + port: 3000 + +livenessProbe: + httpGet: + path: /api/health + port: 3000 + initialDelaySeconds: 60 + timeoutSeconds: 30 + failureThreshold: 10 + +image: + registry: docker.io + repository: grafana/grafana + # Overrides the Grafana image tag whose default is the chart appVersion + tag: "" + sha: "" + pullPolicy: IfNotPresent + + ## Optionally specify an array of imagePullSecrets. + ## Secrets must be manually created in the namespace. + ## ref: https://kubernetes.io/docs/tasks/configure-pod-container/pull-image-private-registry/ + ## Can be templated. + ## + pullSecrets: [] + # - myRegistrKeySecretName + +testFramework: + enabled: true + ## The type of Helm hook used to run this test. Defaults to test. + ## ref: https://helm.sh/docs/topics/charts_hooks/#the-available-hooks + ## + # hookType: test + image: + # -- The Docker registry + registry: docker.io + repository: bats/bats + tag: "v1.4.1" + imagePullPolicy: IfNotPresent + +# dns configuration for pod +dnsPolicy: ~ +dnsConfig: {} + # nameservers: + # - 8.8.8.8 + # options: + # - name: ndots + # value: "2" + # - name: edns0 + +securityContext: + runAsNonRoot: true + runAsUser: 472 + runAsGroup: 472 + fsGroup: 472 + +containerSecurityContext: + allowPrivilegeEscalation: false + capabilities: + drop: + - ALL + seccompProfile: + type: RuntimeDefault + +# Enable creating the grafana configmap +createConfigmap: true + +downloadDashboardsImage: + registry: docker.io + repository: curlimages/curl + tag: 8.9.1 + sha: "" + pullPolicy: IfNotPresent + +downloadDashboards: + env: {} + envFromSecret: "" + resources: {} + securityContext: + allowPrivilegeEscalation: false + capabilities: + drop: + - ALL + seccompProfile: + type: RuntimeDefault + envValueFrom: {} + # ENV_NAME: + # configMapKeyRef: + # name: configmap-name + # key: value_key + +## Pod Annotations +# podAnnotations: {} + +## ConfigMap Annotations +# configMapAnnotations: {} + # argocd.argoproj.io/sync-options: Replace=true + +## Pod Labels +# podLabels: {} + +podPortName: grafana +gossipPortName: gossip +## Deployment annotations +# annotations: {} + +service: + enabled: true + type: NodePort + port: 80 + targetPort: 3000 + nodePort: 30080 + portName: service + +## Enable persistence using Persistent Volume Claims +## ref: https://kubernetes.io/docs/user-guide/persistent-volumes/ +## +persistence: + type: pvc + enabled: true + # storageClassName: default + accessModes: + - ReadWriteOnce + size: 10Gi + # annotations: {} + finalizers: + - kubernetes.io/pvc-protection + + disableWarning: false + + ## If 'lookupVolumeName' is set to true, Helm will attempt to retrieve + ## the current value of 'spec.volumeName' and incorporate it into the template. + lookupVolumeName: true + +# Administrator credentials when not using an existing secret (see below) +adminUser: admin +# adminPassword: strongpassword + +# Use an existing secret for the admin user. +admin: + ## Name of the secret. Can be templated. + existingSecret: "" + userKey: admin-user + passwordKey: admin-password + +## Configure grafana datasources +## ref: http://docs.grafana.org/administration/provisioning/#datasources +## +datasources: + datasources.yaml: + apiVersion: 1 + datasources: + - name: Prometheus + type: prometheus + url: http://mon-prometheus-server.monitoring.svc.cluster.local + access: proxy + isDefault: true + - name: Mimir + type: prometheus + url: http://mimir-nginx.mon-mimir.svc:80/prometheus + access: proxy + isDefault: false + +## Grafana's primary configuration +## NOTE: values in map will be converted to ini format +## ref: http://docs.grafana.org/installation/configuration/ +## +grafana.ini: + paths: + data: /var/lib/grafana/ + logs: /var/log/grafana + plugins: /var/lib/grafana/plugins + provisioning: /etc/grafana/provisioning + analytics: + check_for_updates: true + log: + mode: console + grafana_net: + url: https://grafana.net + server: + domain: "{{ if (and .Values.ingress.enabled .Values.ingress.hosts) }}{{ tpl (.Values.ingress.hosts | first) . }}{{ else }}''{{ end }}" + +## Number of old ReplicaSets to retain +## +revisionHistoryLimit: 5 + +# assertNoLeakedSecrets is a helper function defined in _helpers.tpl that checks if secret +# values are not exposed in the rendered grafana.ini configmap. It is enabled by default. +# +# To pass values into grafana.ini without exposing them in a configmap, use variable expansion: +# https://grafana.com/docs/grafana/latest/setup-grafana/configure-grafana/#variable-expansion +# +# Alternatively, if you wish to allow secret values to be exposed in the rendered grafana.ini configmap, +# you can disable this check by setting assertNoLeakedSecrets to false. +assertNoLeakedSecrets: true + diff --git a/manifests/monitoring/prometheus_values.yaml b/manifests/monitoring/prometheus_values.yaml new file mode 100644 index 0000000000000000000000000000000000000000..fabc97c4a371ea47aff82a4bb310e56500aab991 --- /dev/null +++ b/manifests/monitoring/prometheus_values.yaml @@ -0,0 +1,52 @@ +# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Configuration for Prometheus components and server settings +# Global Prometheus configuration +alertmanager: + enabled: false # Default is true +kube-state-metrics: + enabled: false # Default is true +prometheus-node-exporter: + enabled: false # Default is true +prometheus-pushgateway: + enabled: true # Default is true + +# Prometheus server-specific configuration +server: + retention: "30d" + logLevel: "debug" + resources: + requests: + cpu: "250m" + memory: "256Mi" + limits: + cpu: "1" + memory: "1Gi" + + # Expose the Prometheus server via a Kubernetes service + service: + type: NodePort + nodePort: 30090 + + extraScrapeConfigs: + - job_name: 'pushgateway' + static_configs: + - targets: + - 'prometheus-pushgateway.monitoring.svc.cluster.local:9091' # Push Gateway endpoint + + # Global Prometheus settings: + global: + scrape_interval: 10s + evaluation_interval: 10s diff --git a/scripts/run_tests_locally-kpi-value-writer.sh b/scripts/run_tests_locally-kpi-value-writer.sh index cbeed3b784a2316a3261ee7950bb5e6cffbb7fbf..e3d9c7c6a419483cf0ce9e066ba67e5d4ccefed4 100755 --- a/scripts/run_tests_locally-kpi-value-writer.sh +++ b/scripts/run_tests_locally-kpi-value-writer.sh @@ -19,6 +19,7 @@ PROJECTDIR=`pwd` cd $PROJECTDIR/src export KFK_SERVER_ADDRESS='127.0.0.1:9092' + RCFILE=$PROJECTDIR/coverage/.coveragerc python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \ kpi_value_writer/tests/test_kpi_value_writer.py diff --git a/scripts/run_tests_locally-telemetry-backend.sh b/scripts/run_tests_locally-telemetry-backend.sh index f648a62520f2f7b23f30edb19bf54735f5d13e12..1b4915d7476311d1ceb1693a0934278b44516f22 100755 --- a/scripts/run_tests_locally-telemetry-backend.sh +++ b/scripts/run_tests_locally-telemetry-backend.sh @@ -25,5 +25,5 @@ export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_telemetr RCFILE=$PROJECTDIR/coverage/.coveragerc -python3 -m pytest --log-level=debug --log-cli-level=debug --verbose \ +python3 -m pytest --log-level=INFO --log-cli-level=INFO --verbose \ telemetry/backend/tests/test_backend.py diff --git a/scripts/run_tests_locally-telemetry-frontend.sh b/scripts/run_tests_locally-telemetry-frontend.sh index 38822330ec3837ac1a101e2a7d46f4928c4b31e6..e70818377ed4c7021da0222a831c6f7d319398c7 100755 --- a/scripts/run_tests_locally-telemetry-frontend.sh +++ b/scripts/run_tests_locally-telemetry-frontend.sh @@ -18,10 +18,11 @@ PROJECTDIR=`pwd` cd $PROJECTDIR/src -# python3 kpi_manager/tests/test_unitary.py export KFK_SERVER_ADDRESS='127.0.0.1:9092' CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace crdb -o jsonpath='{.spec.clusterIP}') + export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_telemetry?sslmode=require" RCFILE=$PROJECTDIR/coverage/.coveragerc -python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \ + +python3 -m pytest --log-level=INFO --log-cli-level=INFO --verbose \ telemetry/frontend/tests/test_frontend.py diff --git a/src/analytics/backend/service/__main__.py b/src/analytics/backend/service/__main__.py index 533761bab2ed225e3f8d82f5df7d9290f7fa01b8..55bcb53e4c1c404bd7203f7c7ecc6d0c260d5a54 100644 --- a/src/analytics/backend/service/__main__.py +++ b/src/analytics/backend/service/__main__.py @@ -16,8 +16,11 @@ import logging, signal, sys, threading from prometheus_client import start_http_server from common.Settings import get_log_level, get_metrics_port from .AnalyticsBackendService import AnalyticsBackendService +from common.tools.kafka.Variables import KafkaTopic + terminate = threading.Event() +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') LOGGER = None def signal_handler(signal, frame): # pylint: disable=redefined-outer-name @@ -36,6 +39,8 @@ def main(): LOGGER.info('Starting...') + KafkaTopic.create_all_topics() + # Start metrics server metrics_port = get_metrics_port() start_http_server(metrics_port) diff --git a/src/analytics/frontend/service/__main__.py b/src/analytics/frontend/service/__main__.py index edf94c4fdd828c9a195d8695b0ba52b544b6a863..a79b2bbc63b5543e11ee8bda92e1b8d9fdda967d 100644 --- a/src/analytics/frontend/service/__main__.py +++ b/src/analytics/frontend/service/__main__.py @@ -18,8 +18,11 @@ from common.Settings import get_log_level, get_metrics_port from .AnalyticsFrontendService import AnalyticsFrontendService from analytics.database.AnalyzerModel import Analyzer as Model from common.tools.database.GenericDatabase import Database +from common.tools.kafka.Variables import KafkaTopic + terminate = threading.Event() +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') LOGGER = None def signal_handler(signal, frame): # pylint: disable=redefined-outer-name @@ -43,6 +46,8 @@ def main(): kpiDBobj.create_database() kpiDBobj.create_tables() + KafkaTopic.create_all_topics() + # Start metrics server metrics_port = get_metrics_port() start_http_server(metrics_port) diff --git a/src/common/tools/client/RetryDecorator.py b/src/common/tools/client/RetryDecorator.py index 4750ff73ae4342ce2eb2a31941ff48b46e5be281..efc8b52348e6becdec13d4929e1ae9b4f3ad428f 100644 --- a/src/common/tools/client/RetryDecorator.py +++ b/src/common/tools/client/RetryDecorator.py @@ -56,7 +56,7 @@ def delay_linear(initial=0, increment=0, maximum=None): return delay return compute -def delay_exponential(initial=1, increment=1, maximum=None): +def delay_exponential(initial=1.0, increment=1.0, maximum=None): def compute(num_try): delay = initial * pow(increment, (num_try - 1)) if maximum is not None: diff --git a/src/common/tools/database/GenericEngine.py b/src/common/tools/database/GenericEngine.py index 1d38a1f440af643c253d5d17bda8ca9e6c3fdc44..89b6c2b6dd1d4c94dc2d1082ce0051a82078ddd8 100644 --- a/src/common/tools/database/GenericEngine.py +++ b/src/common/tools/database/GenericEngine.py @@ -33,8 +33,8 @@ class Engine: CRDB_USERNAME, CRDB_PASSWORD, CRDB_NAMESPACE, CRDB_SQL_PORT, CRDB_DATABASE, CRDB_SSLMODE) try: engine = sqlalchemy.create_engine(crdb_uri, echo=False) - LOGGER.info(' AnalyzerDB initalized with DB URL: {:}'.format(crdb_uri)) + LOGGER.info(' Database initalized with DB URL: {:}'.format(crdb_uri)) + return engine except: # pylint: disable=bare-except # pragma: no cover LOGGER.exception('Failed to connect to database: {:s}'.format(str(crdb_uri))) return None # type: ignore - return engine diff --git a/src/context/service/database/models/enums/KpiSampleType.py b/src/context/service/database/models/enums/KpiSampleType.py index 77b568dcfc809447851bd39fc5093ab60ca67892..66afdb710720f7bd272a8764a4c624fb7a563ab7 100644 --- a/src/context/service/database/models/enums/KpiSampleType.py +++ b/src/context/service/database/models/enums/KpiSampleType.py @@ -22,13 +22,16 @@ from ._GrpcToEnum import grpc_to_enum # BYTES_RECEIVED. If item name does not match, automatic mapping of # proto enums to database enums will fail. class ORM_KpiSampleTypeEnum(enum.Enum): - UNKNOWN = KpiSampleType.KPISAMPLETYPE_UNKNOWN - PACKETS_TRANSMITTED = KpiSampleType.KPISAMPLETYPE_PACKETS_TRANSMITTED - PACKETS_RECEIVED = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED - BYTES_TRANSMITTED = KpiSampleType.KPISAMPLETYPE_BYTES_TRANSMITTED - BYTES_RECEIVED = KpiSampleType.KPISAMPLETYPE_BYTES_RECEIVED - LINK_TOTAL_CAPACITY_GBPS = KpiSampleType.KPISAMPLETYPE_LINK_TOTAL_CAPACITY_GBPS - LINK_USED_CAPACITY_GBPS = KpiSampleType.KPISAMPLETYPE_LINK_USED_CAPACITY_GBPS + UNKNOWN = KpiSampleType.KPISAMPLETYPE_UNKNOWN # 0 + PACKETS_TRANSMITTED = KpiSampleType.KPISAMPLETYPE_PACKETS_TRANSMITTED # 101 + PACKETS_RECEIVED = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED # 102 + PACKETS_DROPPED = KpiSampleType.KPISAMPLETYPE_PACKETS_DROPPED # 103 + BYTES_TRANSMITTED = KpiSampleType.KPISAMPLETYPE_BYTES_TRANSMITTED # 201 + BYTES_RECEIVED = KpiSampleType.KPISAMPLETYPE_BYTES_RECEIVED # 202 + BYTES_DROPPED = KpiSampleType.KPISAMPLETYPE_BYTES_DROPPED # 203 + LINK_TOTAL_CAPACITY_GBPS = KpiSampleType.KPISAMPLETYPE_LINK_TOTAL_CAPACITY_GBPS # 301 + LINK_USED_CAPACITY_GBPS = KpiSampleType.KPISAMPLETYPE_LINK_USED_CAPACITY_GBPS # 302 + grpc_to_enum__kpi_sample_type = functools.partial( grpc_to_enum, KpiSampleType, ORM_KpiSampleTypeEnum) diff --git a/src/kpi_manager/service/KpiManagerServiceServicerImpl.py b/src/kpi_manager/service/KpiManagerServiceServicerImpl.py index 1dd214506339ccb257830c28fc43d0d80cdee9e7..38e6a1fe16f9ed89ee24934bc14040f2d60b5cdc 100644 --- a/src/kpi_manager/service/KpiManagerServiceServicerImpl.py +++ b/src/kpi_manager/service/KpiManagerServiceServicerImpl.py @@ -18,7 +18,7 @@ from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_m from common.proto.context_pb2 import Empty from common.proto.kpi_manager_pb2_grpc import KpiManagerServiceServicer from common.proto.kpi_manager_pb2 import KpiId, KpiDescriptor, KpiDescriptorFilter, KpiDescriptorList -# from kpi_manager.database.Kpi_DB import KpiDB +from common.method_wrappers.ServiceExceptions import NotFoundException from kpi_manager.database.KpiDB import KpiDB from kpi_manager.database.KpiModel import Kpi as KpiModel @@ -31,65 +31,48 @@ class KpiManagerServiceServicerImpl(KpiManagerServiceServicer): self.kpi_db_obj = KpiDB(KpiModel) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) - def SetKpiDescriptor(self, request: KpiDescriptor, grpc_context: grpc.ServicerContext # type: ignore - ) -> KpiId: # type: ignore + def SetKpiDescriptor( + self, request: KpiDescriptor, grpc_context: grpc.ServicerContext # type: ignore + ) -> KpiId: # type: ignore response = KpiId() LOGGER.info("Received gRPC message object: {:}".format(request)) - try: - kpi_to_insert = KpiModel.convert_KpiDescriptor_to_row(request) - if(self.kpi_db_obj.add_row_to_db(kpi_to_insert)): - response.kpi_id.uuid = request.kpi_id.kpi_id.uuid - # LOGGER.info("Added Row: {:}".format(response)) - return response - except Exception as e: - LOGGER.info("Unable to create KpiModel class object. {:}".format(e)) - + kpi_to_insert = KpiModel.convert_KpiDescriptor_to_row(request) + if self.kpi_db_obj.add_row_to_db(kpi_to_insert): + response.kpi_id.uuid = request.kpi_id.kpi_id.uuid + # LOGGER.info("Added Row: {:}".format(response)) + return response + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) - def GetKpiDescriptor(self, request: KpiId, grpc_context: grpc.ServicerContext # type: ignore - ) -> KpiDescriptor: # type: ignore + def GetKpiDescriptor( + self, request: KpiId, grpc_context: grpc.ServicerContext # type: ignore + ) -> KpiDescriptor: # type: ignore response = KpiDescriptor() - print("--> Received gRPC message object: {:}".format(request)) LOGGER.info("Received gRPC message object: {:}".format(request)) - try: - kpi_id_to_search = request.kpi_id.uuid - row = self.kpi_db_obj.search_db_row_by_id(KpiModel, 'kpi_id', kpi_id_to_search) - if row is None: - print ('No matching row found for kpi id: {:}'.format(kpi_id_to_search)) - LOGGER.info('No matching row found kpi id: {:}'.format(kpi_id_to_search)) - return Empty() - else: - response = KpiModel.convert_row_to_KpiDescriptor(row) - return response - except Exception as e: - print ('Unable to search kpi id. {:}'.format(e)) - LOGGER.info('Unable to search kpi id. {:}'.format(e)) - raise e + kpi_id_to_search = request.kpi_id.uuid + row = self.kpi_db_obj.search_db_row_by_id(KpiModel, 'kpi_id', kpi_id_to_search) + if row is None: + LOGGER.info('No matching row found kpi id: {:}'.format(kpi_id_to_search)) + raise NotFoundException('KpiDescriptor', kpi_id_to_search) + response = KpiModel.convert_row_to_KpiDescriptor(row) + return response @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) - def DeleteKpiDescriptor(self, request: KpiId, grpc_context: grpc.ServicerContext # type: ignore - ) -> Empty: # type: ignore + def DeleteKpiDescriptor( + self, request: KpiId, grpc_context: grpc.ServicerContext # type: ignore + ) -> Empty: # type: ignore LOGGER.info("Received gRPC message object: {:}".format(request)) - try: - kpi_id_to_search = request.kpi_id.uuid - self.kpi_db_obj.delete_db_row_by_id(KpiModel, 'kpi_id', kpi_id_to_search) - except Exception as e: - LOGGER.info('Unable to search kpi id. {:}'.format(e)) - finally: - return Empty() + kpi_id_to_search = request.kpi_id.uuid + self.kpi_db_obj.delete_db_row_by_id(KpiModel, 'kpi_id', kpi_id_to_search) + return Empty() @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) - def SelectKpiDescriptor(self, filter: KpiDescriptorFilter, grpc_context: grpc.ServicerContext # type: ignore - ) -> KpiDescriptorList: # type: ignore + def SelectKpiDescriptor( + self, filter: KpiDescriptorFilter, grpc_context: grpc.ServicerContext # type: ignore + ) -> KpiDescriptorList: # type: ignore LOGGER.info("Received gRPC message object: {:}".format(filter)) response = KpiDescriptorList() - try: - rows = self.kpi_db_obj.select_with_filter(KpiModel, filter) - except Exception as e: - LOGGER.info('Unable to apply filter on kpi descriptor. {:}'.format(e)) - try: - for row in rows: - kpiDescriptor_obj = KpiModel.convert_row_to_KpiDescriptor(row) - response.kpi_descriptor_list.append(kpiDescriptor_obj) - return response - except Exception as e: - LOGGER.info('Unable to process filter response {:}'.format(e)) + rows = self.kpi_db_obj.select_with_filter(KpiModel, filter) + for row in rows: + kpiDescriptor_obj = KpiModel.convert_row_to_KpiDescriptor(row) + response.kpi_descriptor_list.append(kpiDescriptor_obj) + return response diff --git a/src/kpi_manager/tests/test_kpi_manager.py b/src/kpi_manager/tests/test_kpi_manager.py index fedc3f94cee5be08301cd6241779966f3111f9c1..17a1c8d77d975c35fec82928f46c52b791816837 100755 --- a/src/kpi_manager/tests/test_kpi_manager.py +++ b/src/kpi_manager/tests/test_kpi_manager.py @@ -13,6 +13,7 @@ # limitations under the License. +import grpc import os, pytest import logging from typing import Union @@ -109,13 +110,19 @@ def test_DeleteKpiDescriptor(kpi_manager_client): LOGGER.info(" >>> test_DeleteKpiDescriptor: START <<< ") # adding KPI response_id = kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request()) + # deleting KPI del_response = kpi_manager_client.DeleteKpiDescriptor(response_id) - # select KPI - kpi_manager_client.GetKpiDescriptor(response_id) LOGGER.info("Response of delete method gRPC message object: {:}".format(del_response)) assert isinstance(del_response, Empty) + # select KPI and check it does not exist + with pytest.raises(grpc.RpcError) as e: + kpi_manager_client.GetKpiDescriptor(response_id) + assert e.value.code() == grpc.StatusCode.NOT_FOUND + MSG = 'KpiDescriptor({:s}) not found' + assert e.value.details() == MSG.format(response_id.kpi_id.uuid) + def test_GetKpiDescriptor(kpi_manager_client): LOGGER.info(" >>> test_GetKpiDescriptor: START <<< ") # adding KPI @@ -123,11 +130,6 @@ def test_GetKpiDescriptor(kpi_manager_client): # get KPI response = kpi_manager_client.GetKpiDescriptor(response_id) LOGGER.info("Response gRPC message object: {:}".format(response)) - - LOGGER.info(" >>> calling GetKpiDescriptor with random ID") - rand_response = kpi_manager_client.GetKpiDescriptor(create_kpi_id_request()) - LOGGER.info("Response gRPC message object: {:}".format(rand_response)) - assert isinstance(response, KpiDescriptor) def test_SelectKpiDescriptor(kpi_manager_client): diff --git a/src/kpi_manager/tests/test_messages.py b/src/kpi_manager/tests/test_messages.py index 5f55c2cfcfd3c5c65aa317d02376dd6971fba384..094c56df8d1056175526498081fdc790645f4233 100644 --- a/src/kpi_manager/tests/test_messages.py +++ b/src/kpi_manager/tests/test_messages.py @@ -28,13 +28,13 @@ def create_kpi_descriptor_request(descriptor_name: str = "Test_name"): _create_kpi_request = kpi_manager_pb2.KpiDescriptor() _create_kpi_request.kpi_id.kpi_id.uuid = str(uuid.uuid4()) # _create_kpi_request.kpi_id.kpi_id.uuid = "6e22f180-ba28-4641-b190-2287bf448888" - # _create_kpi_request.kpi_id.kpi_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666" + # _create_kpi_request.kpi_id.kpi_id.uuid = "f974b6cc-095f-4767-b8c1-3457b383fb99" _create_kpi_request.kpi_description = descriptor_name _create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED - _create_kpi_request.device_id.device_uuid.uuid = 'DEV2' + _create_kpi_request.device_id.device_uuid.uuid = str(uuid.uuid4()) _create_kpi_request.service_id.service_uuid.uuid = 'SERV2' - _create_kpi_request.slice_id.slice_uuid.uuid = 'SLC1' - _create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END1' + _create_kpi_request.slice_id.slice_uuid.uuid = 'SLC1' + _create_kpi_request.endpoint_id.endpoint_uuid.uuid = str(uuid.uuid4()) _create_kpi_request.connection_id.connection_uuid.uuid = 'CON1' _create_kpi_request.link_id.link_uuid.uuid = 'LNK1' return _create_kpi_request @@ -77,4 +77,4 @@ def create_kpi_filter_request(): _create_kpi_filter_request.connection_id.append(connection_id_obj) _create_kpi_filter_request.link_id.append(link_id_obj) - return _create_kpi_filter_request \ No newline at end of file + return _create_kpi_filter_request diff --git a/src/kpi_value_writer/service/KpiValueWriter.py b/src/kpi_value_writer/service/KpiValueWriter.py index 0bc95355e35e6deab8ba79eeeb87e278b1b2ecd2..74291bba34ff85556ffe55e2c57ad7877846dc34 100644 --- a/src/kpi_value_writer/service/KpiValueWriter.py +++ b/src/kpi_value_writer/service/KpiValueWriter.py @@ -15,25 +15,21 @@ import json import logging import threading + +from confluent_kafka import KafkaError +from confluent_kafka import Consumer as KafkaConsumer + from common.tools.kafka.Variables import KafkaConfig, KafkaTopic -from common.proto.kpi_value_api_pb2 import KpiValue from common.proto.kpi_manager_pb2 import KpiDescriptor, KpiId from common.Settings import get_service_port_grpc from common.Constants import ServiceNameEnum from common.tools.service.GenericGrpcService import GenericGrpcService - -from confluent_kafka import KafkaError -from confluent_kafka import Consumer as KafkaConsumer - from kpi_manager.client.KpiManagerClient import KpiManagerClient -# -- test import -- -# from kpi_value_writer.tests.test_messages import create_kpi_descriptor_request from .MetricWriterToPrometheus import MetricWriterToPrometheus -LOGGER = logging.getLogger(__name__) -ACTIVE_CONSUMERS = [] +LOGGER = logging.getLogger(__name__) class KpiValueWriter(GenericGrpcService): def __init__(self, cls_name : str = __name__) -> None: @@ -43,9 +39,8 @@ class KpiValueWriter(GenericGrpcService): 'group.id' : 'KpiValueWriter', 'auto.offset.reset' : 'latest'}) - def RunKafkaConsumer(self): + def install_servicers(self): thread = threading.Thread(target=self.KafkaKpiConsumer, args=()) - ACTIVE_CONSUMERS.append(thread) thread.start() def KafkaKpiConsumer(self): @@ -55,44 +50,32 @@ class KpiValueWriter(GenericGrpcService): consumer = self.kafka_consumer consumer.subscribe([KafkaTopic.VALUE.value]) LOGGER.debug("Kafka Consumer start listenng on topic: {:}".format(KafkaTopic.VALUE.value)) - print("Kafka Consumer start listenng on topic: {:}".format(KafkaTopic.VALUE.value)) while True: raw_kpi = consumer.poll(1.0) if raw_kpi is None: continue elif raw_kpi.error(): - if raw_kpi.error().code() == KafkaError._PARTITION_EOF: - continue - else: + if raw_kpi.error().code() != KafkaError._PARTITION_EOF: print("Consumer error: {}".format(raw_kpi.error())) - continue + continue try: kpi_value = json.loads(raw_kpi.value().decode('utf-8')) LOGGER.info("Received KPI : {:}".format(kpi_value)) - print("Received KPI : {:}".format(kpi_value)) self.get_kpi_descriptor(kpi_value, kpi_manager_client, metric_writer) - except Exception as e: - print("Error detail: {:}".format(e)) + except: + LOGGER.exception("Error detail: ") continue def get_kpi_descriptor(self, kpi_value: str, kpi_manager_client, metric_writer): - print("--- START -----") - kpi_id = KpiId() - kpi_id.kpi_id.uuid = kpi_value['kpi_uuid'] - print("KpiId generated: {:}".format(kpi_id)) - # print("Kpi manger client created: {:}".format(kpi_manager_client)) + kpi_id.kpi_id.uuid = kpi_value['kpi_id'] # type: ignore try: kpi_descriptor_object = KpiDescriptor() kpi_descriptor_object = kpi_manager_client.GetKpiDescriptor(kpi_id) - # TODO: why kpi_descriptor_object recevies a KpiDescriptor type object not Empty type object??? if kpi_descriptor_object.kpi_id.kpi_id.uuid == kpi_id.kpi_id.uuid: LOGGER.info("Extracted KpiDescriptor: {:}".format(kpi_descriptor_object)) - print("Extracted KpiDescriptor: {:}".format(kpi_descriptor_object)) metric_writer.create_and_expose_cooked_kpi(kpi_descriptor_object, kpi_value) else: - LOGGER.info("No KPI Descriptor found in DB for Kpi ID: {:}".format(kpi_id)) - print("No KPI Descriptor found in DB for Kpi ID: {:}".format(kpi_id)) - except Exception as e: - LOGGER.info("Unable to get KpiDescriptor. Error: {:}".format(e)) - print ("Unable to get KpiDescriptor. Error: {:}".format(e)) + LOGGER.info("No KPI Descriptor found in Database for Kpi ID: {:}".format(kpi_id)) + except: + LOGGER.exception("Unable to get KpiDescriptor") diff --git a/src/kpi_value_writer/service/MetricWriterToPrometheus.py b/src/kpi_value_writer/service/MetricWriterToPrometheus.py index bfbb6e3bab9770719f2fc23b3fab00e2805b074a..595d025b3de512453b9591b0e9d52d21a80de67a 100644 --- a/src/kpi_value_writer/service/MetricWriterToPrometheus.py +++ b/src/kpi_value_writer/service/MetricWriterToPrometheus.py @@ -14,15 +14,20 @@ # read Kafka stream from Kafka topic +import os import logging -from prometheus_client import Gauge -from common.proto.kpi_sample_types_pb2 import KpiSampleType -from common.proto.kpi_value_api_pb2 import KpiValue -from common.proto.kpi_manager_pb2 import KpiDescriptor +from prometheus_client import Gauge +from prometheus_client.exposition import push_to_gateway +from prometheus_client.registry import CollectorRegistry + +from common.proto.kpi_sample_types_pb2 import KpiSampleType +from common.proto.kpi_value_api_pb2 import KpiValue +from common.proto.kpi_manager_pb2 import KpiDescriptor -LOGGER = logging.getLogger(__name__) -PROM_METRICS = {} +LOGGER = logging.getLogger(__name__) +PROM_METRICS = {} +GATEWAY_URL = os.getenv('PUSHGATEWAY_URL', 'prometheus-pushgateway.monitoring.svc.cluster.local:9091') class MetricWriterToPrometheus: ''' @@ -30,7 +35,9 @@ class MetricWriterToPrometheus: cooked KPI value = KpiDescriptor (gRPC message) + KpiValue (gRPC message) ''' def __init__(self): - pass + self.job_name = 'kpivaluewriter' + self.registry = CollectorRegistry() + self.gateway_url = GATEWAY_URL def merge_kpi_descriptor_and_kpi_value(self, kpi_descriptor, kpi_value): # Creating a dictionary from the kpi_descriptor's attributes @@ -45,25 +52,28 @@ class MetricWriterToPrometheus: 'connection_id' : kpi_descriptor.connection_id.connection_uuid.uuid, 'link_id' : kpi_descriptor.link_id.link_uuid.uuid, 'time_stamp' : kpi_value.timestamp.timestamp, + #'time_stamp' : kpi_value["time_stamp"], 'kpi_value' : kpi_value.kpi_value_type.floatVal + #'kpi_value' : kpi_value["kpi_value"] } LOGGER.debug("Cooked Kpi: {:}".format(cooked_kpi)) return cooked_kpi def create_and_expose_cooked_kpi(self, kpi_descriptor: KpiDescriptor, kpi_value: KpiValue): # merge both gRPC messages into single varible. - cooked_kpi = self.merge_kpi_descriptor_and_kpi_value(kpi_descriptor, kpi_value) + cooked_kpi = self.merge_kpi_descriptor_and_kpi_value(kpi_descriptor, kpi_value) tags_to_exclude = {'kpi_description', 'kpi_sample_type', 'kpi_value'} - metric_tags = [tag for tag in cooked_kpi.keys() if tag not in tags_to_exclude] # These values will be used as metric tags - metric_name = cooked_kpi['kpi_sample_type'] + metric_tags = [tag for tag in cooked_kpi.keys() if tag not in tags_to_exclude] # These values will be used as metric tags + metric_name = cooked_kpi['kpi_sample_type'] try: if metric_name not in PROM_METRICS: # Only register the metric, when it doesn't exists PROM_METRICS[metric_name] = Gauge ( metric_name, cooked_kpi['kpi_description'], - metric_tags + metric_tags, + registry=self.registry ) - LOGGER.debug("Metric is created with labels: {:}".format(metric_tags)) + LOGGER.debug("Metric is created with labels: {:}".format(metric_tags)) PROM_METRICS[metric_name].labels( kpi_id = cooked_kpi['kpi_id'], device_id = cooked_kpi['device_id'], @@ -74,7 +84,11 @@ class MetricWriterToPrometheus: link_id = cooked_kpi['link_id'], time_stamp = cooked_kpi['time_stamp'], ).set(float(cooked_kpi['kpi_value'])) - LOGGER.debug("Metric pushed to the endpoints: {:}".format(PROM_METRICS[metric_name])) + LOGGER.debug("Metric is being pushed to the Gateway ... : {:}".format(PROM_METRICS[metric_name])) + + # Push to the Prometheus Gateway, Prometheus is preconfigured to scrap the metrics from the gateway + push_to_gateway(self.gateway_url, job=self.job_name, registry=self.registry) + LOGGER.debug("Metric pushed to Prometheus Gateway.") except ValueError as e: if 'Duplicated timeseries' in str(e): diff --git a/src/kpi_value_writer/service/__main__.py b/src/kpi_value_writer/service/__main__.py index 28ba2ac90f1e9ed28dfeeeda6b6da17568a124e7..56fc6100d391eec5953b24d882397c3ef7a2f130 100644 --- a/src/kpi_value_writer/service/__main__.py +++ b/src/kpi_value_writer/service/__main__.py @@ -13,7 +13,6 @@ # limitations under the License. import logging, signal, sys, threading -from prometheus_client import start_http_server from kpi_value_writer.service.KpiValueWriter import KpiValueWriter from common.Settings import get_log_level @@ -39,8 +38,6 @@ def main(): grpc_service = KpiValueWriter() grpc_service.start() - start_http_server(10808) - LOGGER.debug("Prometheus client is started on port 10808") # Wait for Ctrl+C or termination signal while not terminate.wait(timeout=1.0): pass diff --git a/src/kpi_value_writer/tests/test_kpi_value_writer.py b/src/kpi_value_writer/tests/test_kpi_value_writer.py index 0d3f9e683db5430fe9214cbf4131dcc38912da85..29e81d28ae8da9f9a2602be8b36cee368ee3d87b 100755 --- a/src/kpi_value_writer/tests/test_kpi_value_writer.py +++ b/src/kpi_value_writer/tests/test_kpi_value_writer.py @@ -12,14 +12,35 @@ # See the License for the specific language governing permissions and # limitations under the License. +import pytest +import time import logging from kpi_value_writer.service.KpiValueWriter import KpiValueWriter +from kpi_manager.client.KpiManagerClient import KpiManagerClient from common.tools.kafka.Variables import KafkaTopic +from test_messages import create_kpi_descriptor_request +LOGGER = logging.getLogger(__name__) +# -------- Fixtures ---------------- + +@pytest.fixture(autouse=True) +def log_all_methods(request): + ''' + This fixture logs messages before and after each test function runs, indicating the start and end of the test. + The autouse=True parameter ensures that this logging happens automatically for all tests in the module. + ''' + LOGGER.info(f" >>>>> Starting test: {request.node.name} ") + yield + LOGGER.info(f" <<<<< Finished test: {request.node.name} ") + +# @pytest.fixture(scope='module') +# def kpi_manager_client(): +# LOGGER.debug("Yielding KpiManagerClient ...") +# yield KpiManagerClient(host="10.152.183.203") +# LOGGER.debug("KpiManagerClient is terminated.") -LOGGER = logging.getLogger(__name__) # -------- Initial Test ---------------- def test_validate_kafka_topics(): @@ -27,7 +48,15 @@ def test_validate_kafka_topics(): response = KafkaTopic.create_all_topics() assert isinstance(response, bool) -def test_KafkaConsumer(): - LOGGER.debug(" --->>> test_kafka_consumer: START <<<--- ") - # kpi_value_writer = KpiValueWriter() - # kpi_value_writer.RunKafkaConsumer() +# -------------- +# NOT FOR GITHUB PIPELINE (Local testing only) +# -------------- +# def test_KafkaConsumer(kpi_manager_client): + +# # kpidescriptor = create_kpi_descriptor_request() +# # kpi_manager_client.SetKpiDescriptor(kpidescriptor) + +# kpi_value_writer = KpiValueWriter() +# kpi_value_writer.KafkaKpiConsumer() +# LOGGER.debug(" waiting for timer to finish ") +# time.sleep(300) diff --git a/src/kpi_value_writer/tests/test_messages.py b/src/kpi_value_writer/tests/test_messages.py index ffc6b398c4ff6405fe1ac8eec086553fa6fbe193..4cd901b2c8b28e13f6ff0f373d3c0de6201a4c96 100755 --- a/src/kpi_value_writer/tests/test_messages.py +++ b/src/kpi_value_writer/tests/test_messages.py @@ -25,7 +25,8 @@ def create_kpi_id_request(): def create_kpi_descriptor_request(description: str = "Test Description"): _create_kpi_request = kpi_manager_pb2.KpiDescriptor() - _create_kpi_request.kpi_id.kpi_id.uuid = str(uuid.uuid4()) + # _create_kpi_request.kpi_id.kpi_id.uuid = str(uuid.uuid4()) + _create_kpi_request.kpi_id.kpi_id.uuid = "efef4d95-1cf1-43c4-9742-95c283dddddd" _create_kpi_request.kpi_description = description _create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED _create_kpi_request.device_id.device_uuid.uuid = 'DEV4' diff --git a/src/telemetry/backend/Dockerfile b/src/telemetry/backend/Dockerfile index 4bc5605d59bdb2e2b24cf580ce8e2a6978caa2e8..07459986d7efbcc1e0997239e327760093384a2e 100644 --- a/src/telemetry/backend/Dockerfile +++ b/src/telemetry/backend/Dockerfile @@ -62,6 +62,16 @@ RUN python3 -m pip install -r requirements.txt # Add component files into working directory WORKDIR /var/teraflow +COPY src/context/__init__.py context/__init__.py +COPY src/context/client/. context/client/ +COPY src/device/__init__.py device/__init__.py +COPY src/device/client/. device/client/ +COPY src/kpi_manager/client/. kpi_manager/client/ +COPY src/kpi_manager/__init__.py kpi_manager/__init__.py +COPY src/service/__init__.py service/__init__.py +COPY src/service/client/. service/client/ +COPY src/slice/__init__.py slice/__init__.py +COPY src/slice/client/. slice/client/ COPY src/telemetry/__init__.py telemetry/__init__.py COPY src/telemetry/backend/. telemetry/backend/ diff --git a/src/telemetry/backend/collector_api/_Collector.py b/src/telemetry/backend/collector_api/_Collector.py index ec4ba943c90de8a8d683d1e7a9dd9d48865b5edf..a4bd7f17f254873cb5fc6d3302d257e7f1e35f12 100644 --- a/src/telemetry/backend/collector_api/_Collector.py +++ b/src/telemetry/backend/collector_api/_Collector.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -import threading +import queue from typing import Any, Iterator, List, Optional, Tuple, Union # Special resource names to request to the collector to retrieve the specified @@ -71,95 +71,89 @@ class _Collector: """ raise NotImplementedError() - def GetInitialConfig(self) -> List[Tuple[str, Any]]: - """ Retrieve initial configuration of entire device. - Returns: - values : List[Tuple[str, Any]] - List of tuples (resource key, resource value) for - resource keys. - """ - raise NotImplementedError() - - def GetConfig(self, resource_keys: List[str] = []) -> \ - List[Tuple[str, Union[Any, None, Exception]]]: - """ Retrieve running configuration of entire device or - selected resource keys. - Parameters: - resource_keys : List[str] - List of keys pointing to the resources to be retrieved. - Returns: - values : List[Tuple[str, Union[Any, None, Exception]]] - List of tuples (resource key, resource value) for - resource keys requested. If a resource is found, - the appropriate value type must be retrieved. - If a resource is not found, None must be retrieved as - value for that resource. In case of Exception, - the Exception must be retrieved as value. - """ - raise NotImplementedError() - - def SetConfig(self, resources: List[Tuple[str, Any]]) -> \ - List[Union[bool, Exception]]: - """ Create/Update configuration for a list of resources. - Parameters: - resources : List[Tuple[str, Any]] - List of tuples, each containing a resource_key pointing the - resource to be modified, and a resource_value containing - the new value to be set. - Returns: - results : List[Union[bool, Exception]] - List of results for resource key changes requested. - Return values must be in the same order as the - resource keys requested. If a resource is properly set, - True must be retrieved; otherwise, the Exception that is - raised during the processing must be retrieved. - """ - raise NotImplementedError() - - def DeleteConfig(self, resources: List[Tuple[str, Any]]) -> \ - List[Union[bool, Exception]]: - """ Delete configuration for a list of resources. - Parameters: - resources : List[Tuple[str, Any]] - List of tuples, each containing a resource_key pointing the - resource to be modified, and a resource_value containing - possible additionally required values to locate - the value to be removed. - Returns: - results : List[Union[bool, Exception]] - List of results for resource key deletions requested. - Return values must be in the same order as the resource keys - requested. If a resource is properly deleted, True must be - retrieved; otherwise, the Exception that is raised during - the processing must be retrieved. - """ - raise NotImplementedError() - - def SubscribeState(self, subscriptions: List[Tuple[str, float, float]]) -> \ + # def GetInitialConfig(self) -> List[Tuple[str, Any]]: + # """ Retrieve initial configuration of entire device. + # Returns: + # values : List[Tuple[str, Any]] + # List of tuples (resource key, resource value) for + # resource keys. + # """ + # raise NotImplementedError() + + # def GetConfig(self, resource_keys: List[str] = []) -> \ + # List[Tuple[str, Union[Any, None, Exception]]]: + # """ Retrieve running configuration of entire device or + # selected resource keys. + # Parameters: + # resource_keys : List[str] + # List of keys pointing to the resources to be retrieved. + # Returns: + # values : List[Tuple[str, Union[Any, None, Exception]]] + # List of tuples (resource key, resource value) for + # resource keys requested. If a resource is found, + # the appropriate value type must be retrieved. + # If a resource is not found, None must be retrieved as + # value for that resource. In case of Exception, + # the Exception must be retrieved as value. + # """ + # raise NotImplementedError() + + # def SetConfig(self, resources: List[Tuple[str, Any]]) -> \ + # List[Union[bool, Exception]]: + # """ Create/Update configuration for a list of resources. + # Parameters: + # resources : List[Tuple[str, Any]] + # List of tuples, each containing a resource_key pointing the + # resource to be modified, and a resource_value containing + # the new value to be set. + # Returns: + # results : List[Union[bool, Exception]] + # List of results for resource key changes requested. + # Return values must be in the same order as the + # resource keys requested. If a resource is properly set, + # True must be retrieved; otherwise, the Exception that is + # raised during the processing must be retrieved. + # """ + # raise NotImplementedError() + + # def DeleteConfig(self, resources: List[Tuple[str, Any]]) -> \ + # List[Union[bool, Exception]]: + # """ Delete configuration for a list of resources. + # Parameters: + # resources : List[Tuple[str, Any]] + # List of tuples, each containing a resource_key pointing the + # resource to be modified, and a resource_value containing + # possible additionally required values to locate + # the value to be removed. + # Returns: + # results : List[Union[bool, Exception]] + # List of results for resource key deletions requested. + # Return values must be in the same order as the resource keys + # requested. If a resource is properly deleted, True must be + # retrieved; otherwise, the Exception that is raised during + # the processing must be retrieved. + # """ + # raise NotImplementedError() + + def SubscribeState(self, subscriptions: List[Tuple[str, dict, float, float]]) -> \ + bool: + """ Subscribe to state information of the entire device or selected resources. + Subscriptions are incremental, and the collector should keep track of requested resources. + List of tuples, each containing: + - resource_id (str): Identifier pointing to the resource to be subscribed. + - resource_dict (dict): Dictionary containing resource name, KPI to be subscribed, and type. + - sampling_duration (float): Duration (in seconds) for how long monitoring should last. + - sampling_interval (float): Desired monitoring interval (in seconds) for the specified resource. + List of results for the requested resource key subscriptions. + The return values are in the same order as the requested resource keys. + - True if a resource is successfully subscribed. + - Exception if an error occurs during the subscription process. List[Union[bool, Exception]]: - """ Subscribe to state information of entire device or - selected resources. Subscriptions are incremental. - Collector should keep track of requested resources. - Parameters: - subscriptions : List[Tuple[str, float, float]] - List of tuples, each containing a resource_key pointing the - resource to be subscribed, a sampling_duration, and a - sampling_interval (both in seconds with float - representation) defining, respectively, for how long - monitoring should last, and the desired monitoring interval - for the resource specified. - Returns: - results : List[Union[bool, Exception]] - List of results for resource key subscriptions requested. - Return values must be in the same order as the resource keys - requested. If a resource is properly subscribed, - True must be retrieved; otherwise, the Exception that is - raised during the processing must be retrieved. - """ + """ raise NotImplementedError() - def UnsubscribeState(self, subscriptions: List[Tuple[str, float, float]]) \ - -> List[Union[bool, Exception]]: + def UnsubscribeState(self, resource_key: str) \ + -> bool: """ Unsubscribe from state information of entire device or selected resources. Subscriptions are incremental. Collector should keep track of requested resources. @@ -182,7 +176,7 @@ class _Collector: raise NotImplementedError() def GetState( - self, blocking=False, terminate : Optional[threading.Event] = None + self, duration : int, blocking=False, terminate: Optional[queue.Queue] = None ) -> Iterator[Tuple[float, str, Any]]: """ Retrieve last collected values for subscribed resources. Operates as a generator, so this method should be called once and will diff --git a/src/telemetry/backend/collectors/emulated/EmulatedCollector.py b/src/telemetry/backend/collectors/emulated/EmulatedCollector.py index 90be013368c5aa80dcb52c2394e8b74f9d74b6f4..48102a943f54eead9b0119b2839faaa123e1cb51 100644 --- a/src/telemetry/backend/collectors/emulated/EmulatedCollector.py +++ b/src/telemetry/backend/collectors/emulated/EmulatedCollector.py @@ -15,10 +15,7 @@ import pytz import queue import logging -import uuid -import json from anytree import Node, Resolver -from apscheduler.events import EVENT_JOB_ADDED, EVENT_JOB_REMOVED from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.jobstores.memory import MemoryJobStore from apscheduler.executors.pool import ThreadPoolExecutor @@ -36,10 +33,6 @@ class EmulatedCollector(_Collector): """ def __init__(self, address: str, port: int, **settings): super().__init__('emulated_collector', address, port, **settings) - self._initial_config = Node('root') # Tree structure for initial config - self._running_config = Node('root') # Tree structure for running config - self._subscriptions = Node('subscriptions') # Tree for state subscriptions - self._resolver = Resolver() # For path resolution in tree structures self._out_samples = queue.Queue() # Queue to hold synthetic state samples self._synthetic_data = SyntheticMetricsGenerator(metric_queue=self._out_samples) # Placeholder for synthetic data generator self._scheduler = BackgroundScheduler(daemon=True) @@ -48,8 +41,8 @@ class EmulatedCollector(_Collector): executors = {'default': ThreadPoolExecutor(max_workers=1)}, timezone = pytz.utc ) - self._scheduler.add_listener(self._listener_job_added_to_subscription_tree, EVENT_JOB_ADDED) - self._scheduler.add_listener(self._listener_job_removed_from_subscription_tree, EVENT_JOB_REMOVED) + # self._scheduler.add_listener(self._listener_job_added_to_subscription_tree, EVENT_JOB_ADDED) + # self._scheduler.add_listener(self._listener_job_removed_from_subscription_tree, EVENT_JOB_REMOVED) self._helper_methods = EmulatedCollectorHelper() self.logger = logging.getLogger(__name__) @@ -77,73 +70,56 @@ class EmulatedCollector(_Collector): if not self.connected: raise RuntimeError("Collector is not connected. Please connect before performing operations.") - def SubscribeState(self, subscriptions: List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]: + def SubscribeState(self, subscriptions: List[Tuple[str, dict, float, float]]) -> bool: self._require_connection() - results = [] - for resource_key, duration, interval in subscriptions: - resource_key = self._helper_methods.validate_resource_key(resource_key) # Validate the endpoint name - self.logger.info(f"1. Subscribing to {resource_key} with duration {duration}s and interval {interval}s") + try: + job_id, endpoint, duration, interval = subscriptions + except: + self.logger.exception(f"Invalid subscription format: {subscriptions}") + return False + if endpoint: + self.logger.info(f"Subscribing to {endpoint} with duration {duration}s and interval {interval}s") try: - self._resolver.get(self._running_config, resource_key) # Verify if the resource key exists in the running configuration - self.logger.info(f"Resource key {resource_key} exists in the configuration.") - resource_value = json.loads(self._resolver.get(self._running_config, resource_key).value) - if resource_value is not None: - sample_type_ids = resource_value['sample_types'] - self.logger.info(f"Sample type IDs for {resource_key}: {sample_type_ids}") - if len(sample_type_ids) == 0: - self.logger.warning(f"No sample types found for {resource_key}. Skipping subscription.") - results.append(False) - continue - else: - self.logger.warning(f"No sample types found for {resource_key}. Skipping subscription.") - results.append(False) - continue + sample_type_ids = endpoint['sample_types'] # type: ignore + resource_name = endpoint['name'] # type: ignore # Add the job to the scheduler - job_id = f"{resource_key}-{uuid.uuid4()}" self._scheduler.add_job( self._generate_sample, 'interval', seconds=interval, - args=[resource_key, sample_type_ids], - id=job_id, + args=[resource_name, sample_type_ids], + id=f"{job_id}", replace_existing=True, end_date=datetime.now(pytz.utc) + timedelta(seconds=duration) ) - self.logger.info(f"Job added to scheduler for resource key {resource_key} with duration {duration}s and interval {interval}s") - results.append(True) - except Exception as e: - self.logger.error(f"Failed to verify resource key or add job: {e}") - results.append(e) - return results + self.logger.info(f"Job added to scheduler for resource key {resource_name} with duration {duration}s and interval {interval}s") + return True + except: + self.logger.exception(f"Failed to verify resource key or add job:") + return False + else: + self.logger.warning(f"No sample types found for {endpoint}. Skipping subscription.") + return False - def UnsubscribeState(self, subscriptions: List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]: + def UnsubscribeState(self, resource_key: str) -> bool: self._require_connection() - results = [] - for resource_key, _, _ in subscriptions: - resource_key = self._helper_methods.validate_resource_key(resource_key) - try: - # Check if job exists - job_ids = [job.id for job in self._scheduler.get_jobs() if resource_key in job.id] - if not job_ids: - self.logger.warning(f"No active jobs found for {resource_key}. It might have already terminated.") - results.append(False) - continue - # Remove jobs - for job_id in job_ids: - self._scheduler.remove_job(job_id) - - self.logger.info(f"Unsubscribed from {resource_key} with job IDs: {job_ids}") - results.append(True) - except Exception as e: - self.logger.exception(f"Failed to unsubscribe from {resource_key}") - results.append(e) - return results + try: + # Check if job exists + job_ids = [job.id for job in self._scheduler.get_jobs() if resource_key in job.id] + if not job_ids: + self.logger.warning(f"No active jobs found for {resource_key}. It might have already terminated.") + return False + for job_id in job_ids: + self._scheduler.remove_job(job_id) + self.logger.info(f"Unsubscribed from {resource_key} with job IDs: {job_ids}") + return True + except: + self.logger.exception(f"Failed to unsubscribe from {resource_key}") + return False - def GetState(self, blocking: bool = False, terminate: Optional[queue.Queue] = None) -> Iterator[Tuple[float, str, Any]]: + def GetState(self, duration : int, blocking: bool = False, terminate: Optional[queue.Queue] = None) -> Iterator[Tuple[float, str, Any]]: self._require_connection() start_time = datetime.now(pytz.utc) - duration = 10 # Duration of the subscription in seconds (as an example) - while True: try: if terminate and not terminate.empty(): @@ -168,283 +144,3 @@ class EmulatedCollector(_Collector): self.logger.debug(f"Executing _generate_sample for resource: {resource_key}") sample = self._synthetic_data.generate_synthetic_data_point(resource_key, sample_type_ids) self._out_samples.put(sample) - -# ------------- Event Listeners (START)----------------- - - def _listener_job_removed_from_subscription_tree(self, event): - if event.job_id: - # Extract the resource key from the job ID - resource_key = event.job_id.split('-')[0] - resource_key = self._helper_methods.validate_resource_key(resource_key) - - # Remove the subscription from the tree - try: - subscription_path = resource_key.split('/') - parent = self._subscriptions - for part in subscription_path: - parent = next((child for child in parent.children if child.name == part), None) - if not parent: - raise ValueError(f"Subscription path '{resource_key}' not found in tree.") - if parent: - parent.parent.children = tuple(child for child in parent.parent.children if child != parent) - self.logger.warning(f"Automatically removed subscription from subscription_tree for {resource_key} after job termination by listener. Maybe due to timeout.") - except Exception as e: - self.logger.warning(f"Failed to remove subscription for {resource_key}: {e}") - - def _listener_job_added_to_subscription_tree(self, event): - try: - job_id = event.job_id - if job_id: - resource_key = job_id.split('-')[0] # Extract resource key from job ID - resource_key = self._helper_methods.validate_resource_key(resource_key) - subscription_path = resource_key.split('/') - parent = self._subscriptions - for part in subscription_path: - node = next((child for child in parent.children if child.name == part), None) - if not node: - node = Node(part, parent=parent) - parent = node - parent.value = { - "job_id": job_id - } - self.logger.info(f"Automatically added subscription for {resource_key} to the subscription_tree by listener.") - except Exception as e: - self.logger.exception("Failed to add subscription to the tree") - -# ------------- Event Listeners (END)----------------- - -#------------------------------------------------------------------------------------- -# ------- The below methods are kept for debugging purposes (test-case) only --------- -#------------------------------------------------------------------------------------- - -# This method can be commented but this will arise an error in the test-case (@pytest.fixture --> connected_configured_collector()). - def SetConfig(self, resources: dict) -> List[Union[bool, Exception]]: # For debugging purposes. - self._require_connection() - results = [] - - # if not isinstance(resources, dict): - # self.logger.error("Invalid configuration format: resources must be a dictionary.") - # raise ValueError("Invalid configuration format. Must be a dictionary.") - if 'config_rules' not in resources or not isinstance(resources['config_rules'], list): - self.logger.error("Invalid configuration format: 'config_rules' key missing or not a list.") - raise ValueError("Invalid configuration format. Must contain a 'config_rules' key with a list of rules.") - - for rule in resources['config_rules']: - try: - if 'action' not in rule or 'custom' not in rule: - raise ValueError(f"Invalid rule format: {rule}") - - action = rule['action'] - custom = rule['custom'] - resource_key = custom.get('resource_key') - resource_value = custom.get('resource_value') - - if not resource_key: - raise ValueError(f"Resource key is missing in rule: {rule}") - - if resource_value is None: - raise ValueError(f"Resource value is None for key: {resource_key}") - if not resource_key: - raise ValueError(f"Resource key is missing in rule: {rule}") - - if action == 1: # Set action - resource_path = self._helper_methods._parse_resource_key(resource_key) - # self.logger.info(f"1. Setting configuration for resource key {resource_key} and resource_path: {resource_path}") - parent = self._running_config - - for part in resource_path[:-1]: - if '[' in part and ']' in part: - base, index = part.split('[', 1) - index = index.rstrip(']') - parent = self._helper_methods._find_or_create_node(index, self._helper_methods._find_or_create_node(base, parent)) - # self.logger.info(f"2a. Creating node: {base}, {index}, {parent}") - elif resource_path[-1] != 'settings': - # self.logger.info(f"2b. Creating node: {part}") - parent = self._helper_methods._find_or_create_node(part, parent) - - final_part = resource_path[-1] - if final_part in ['address', 'port']: - self._helper_methods._create_or_update_node(final_part, parent, resource_value) - self.logger.info(f"Configured: {resource_key} = {resource_value}") - - if resource_key.startswith("_connect/settings"): - parent = self._helper_methods._find_or_create_node("_connect", self._running_config) - settings_node = self._helper_methods._find_or_create_node("settings", parent) - settings_node.value = None # Ensure settings node has None value - endpoints_node = self._helper_methods._find_or_create_node("endpoints", settings_node) - - for endpoint in resource_value.get("endpoints", []): - uuid = endpoint.get("uuid") - uuid = uuid.replace('/', '_') if uuid else None - if uuid: - # self.logger.info(f"3. Creating endpoint: {uuid}, {endpoint}, {endpoints_node}") - self._helper_methods._create_or_update_node(uuid, endpoints_node, endpoint) - self.logger.info(f"Configured endpoint: {uuid} : {endpoint}") - - elif resource_key.startswith("/interface"): - interface_parent = self._helper_methods._find_or_create_node("interface", self._running_config) - name = resource_value.get("name") - name = name.replace('/', '_') if name else None - if name: - self._helper_methods._create_or_update_node(name, interface_parent, resource_value) - self.logger.info(f"Configured interface: {name} : {resource_value}") - # self.logger.info(f"4. Configured interface: {name}") - - results.append(True) - else: - raise ValueError(f"Unsupported action '{action}' in rule: {rule}") - - if resource_value is None: - raise ValueError(f"Resource value is None for key: {resource_key}") - - except Exception as e: - self.logger.exception(f"Failed to apply rule: {rule}") - results.append(e) - - return results - -#----------------------------------- -# ------- EXTRA Methods ------------ -#----------------------------------- - - # def log_active_jobs(self): # For debugging purposes. - # """ - # Logs the IDs of all active jobs. - # This method retrieves the list of active jobs from the scheduler and logs their IDs using the logger. - # """ - # self._require_connection() - # jobs = self._scheduler.get_jobs() - # self.logger.info(f"Active jobs: {[job.id for job in jobs]}") - - # def print_config_tree(self): # For debugging purposes. - # """ - # Reads the configuration using GetConfig and prints it as a hierarchical tree structure. - # """ - # self._require_connection() - - # def print_tree(node, indent=""): - # """ - # Recursively prints the configuration tree. - - # Args: - # node (Node): The current node to print. - # indent (str): The current indentation level. - # """ - # if node.name != "root": # Skip the root node's name - # value = getattr(node, "value", None) - # print(f"{indent}- {node.name}: {json.loads(value) if value else ''}") - - # for child in node.children: - # print_tree(child, indent + " ") - - # print("Configuration Tree:") - # print_tree(self._running_config) - - - # def GetInitialConfig(self) -> List[Tuple[str, Any]]: # comment - # self._require_connection() - # results = [] - # for node in self._initial_config.descendants: - # value = getattr(node, "value", None) - # results.append((node.name, json.loads(value) if value else None)) - # self.logger.info("Retrieved initial configurations") - # return results - - # def GetConfig(self, resource_keys: List[str] = []) -> List[Tuple[str, Union[Any, dict, Exception]]]: # comment - # """ - # Retrieves the configuration for the specified resource keys. - # If no keys are provided, returns the full configuration tree. - - # Args: - # resource_keys (List[str]): A list of keys specifying the configuration to retrieve. - - # Returns: - # List[Tuple[str, Union[Any, dict, Exception]]]: A list of tuples with the resource key and its value, - # subtree, or an exception. - # """ - # self._require_connection() - # results = [] - - # try: - # if not resource_keys: - # # If no specific keys are provided, return the full configuration tree - - # full_tree = self._helper_methods._generate_subtree(self._running_config) - # # full_tree = self._generate_subtree(self._running_config) - # return [("full_configuration", full_tree)] - - # for key in resource_keys: - # try: - # # Parse the resource key - # resource_path = self._helper_methods.(key) - # self.logger.info(f"1. Retrieving configuration for resource path : {resource_path}") - - # # Navigate to the node corresponding to the key - # parent = self._running_config - # for part in resource_path: - # parent = self._find_or_raise_node(part, parent) - - # # Check if the node has a value - # value = getattr(parent, "value", None) - # if value: - # # If a value exists, return it - # results.append((key, json.loads(value))) - # else: - # # If no value, return the subtree of this node - # subtree = self._helper_methods._generate_subtree(parent) - # # subtree = self._generate_subtree(parent) - # results.append((key, subtree)) - - # except Exception as e: - # self.logger.exception(f"Failed to retrieve configuration for key: {key}") - # results.append((key, e)) - - # except Exception as e: - # self.logger.exception("Failed to retrieve configurations") - # results.append(("Error", e)) - - # return results - - # def DeleteConfig(self, resources: List[Tuple[str, Any]]) -> List[Union[bool, Exception]]: # comment - # self._require_connection() - # results = [] - - # for key in resources: - # try: - # # Parse resource key into parts, handling brackets correctly - # resource_path = self._helper_methods.(key) - - # parent = self._running_config - # for part in resource_path: - # parent = self._find_or_raise_node(part, parent) - - # # Delete the final node - # node_to_delete = parent - # parent = node_to_delete.parent - # parent.children = tuple(child for child in parent.children if child != node_to_delete) - # self.logger.info(f"Deleted configuration for key: {key}") - - # # Handle endpoints structure - # if "interface" in key and "settings" in key: - # interface_name = key.split('[')[-1].split(']')[0] - # endpoints_parent = self._find_or_raise_node("_connect", self._running_config) - # endpoints_node = self._find_or_raise_node("endpoints", endpoints_parent) - # endpoint_to_delete = next((child for child in endpoints_node.children if child.name == interface_name), None) - # if endpoint_to_delete: - # endpoints_node.children = tuple(child for child in endpoints_node.children if child != endpoint_to_delete) - # self.logger.info(f"Removed endpoint entry for interface '{interface_name}'") - - # # Check if parent has no more children and is not the root - # while parent and parent.name != "root" and not parent.children: - # node_to_delete = parent - # parent = node_to_delete.parent - # parent.children = tuple(child for child in parent.children if child != node_to_delete) - # self.logger.info(f"Deleted empty parent node: {node_to_delete.name}") - - # results.append(True) - # except Exception as e: - # self.logger.exception(f"Failed to delete configuration for key: {key}") - # results.append(e) - - # return results - diff --git a/src/telemetry/backend/collectors/emulated/SyntheticMetricsGenerator.py b/src/telemetry/backend/collectors/emulated/SyntheticMetricsGenerator.py index a01e2c0e659f1eea6383030daeafef11c83d7a45..77d99843247c342a76d8dd7187a151fca83ae955 100644 --- a/src/telemetry/backend/collectors/emulated/SyntheticMetricsGenerator.py +++ b/src/telemetry/backend/collectors/emulated/SyntheticMetricsGenerator.py @@ -98,7 +98,7 @@ class SyntheticMetricsGenerator(): return (time.time(), resource_key, requested_metrics) - def metric_id_mapper(self, sample_type_ids, metric_dict): + def metric_id_mapper(self, sample_type_ids, metric_dict): # TODO: Add a dynamic mappper from kpi_sample_type ID to name... """ Maps the sample type IDs to the corresponding metric names. diff --git a/src/telemetry/backend/service/TelemetryBackendService.py b/src/telemetry/backend/service/TelemetryBackendService.py index a1f17df3cb65a6bd13ffb8e96a6a07b536200825..3aeee8238d3fa47d511f1b520d44bf0712fe10e7 100755 --- a/src/telemetry/backend/service/TelemetryBackendService.py +++ b/src/telemetry/backend/service/TelemetryBackendService.py @@ -12,12 +12,11 @@ # See the License for the specific language governing permissions and # limitations under the License. -import queue import json import time import logging import threading -from typing import Any, Dict +from typing import Any, Dict, Tuple from datetime import datetime, timezone from confluent_kafka import Producer as KafkaProducer from confluent_kafka import Consumer as KafkaConsumer @@ -27,9 +26,15 @@ from common.Settings import get_service_port_grpc from common.method_wrappers.Decorator import MetricsPool from common.tools.kafka.Variables import KafkaConfig, KafkaTopic from common.tools.service.GenericGrpcService import GenericGrpcService +from common.tools.context_queries.Device import get_device +from common.proto.kpi_manager_pb2 import KpiId -LOGGER = logging.getLogger(__name__) -METRICS_POOL = MetricsPool('TelemetryBackend', 'backendService') +from kpi_manager.client.KpiManagerClient import KpiManagerClient +from context.client.ContextClient import ContextClient +from telemetry.backend.collectors.emulated.EmulatedCollector import EmulatedCollector + +LOGGER = logging.getLogger(__name__) +METRICS_POOL = MetricsPool('TelemetryBackend', 'backendService') class TelemetryBackendService(GenericGrpcService): """ @@ -44,9 +49,10 @@ class TelemetryBackendService(GenericGrpcService): self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(), 'group.id' : 'backend', 'auto.offset.reset' : 'latest'}) - self.running_threads = {} - self.emulatorCollector = None - self.metric_queue = queue.Queue() + self.collector = None + self.context_client = ContextClient() + self.kpi_manager_client = KpiManagerClient() + self.active_jobs = {} def install_servicers(self): threading.Thread(target=self.RequestListener).start() @@ -60,49 +66,91 @@ class TelemetryBackendService(GenericGrpcService): consumer = self.kafka_consumer consumer.subscribe([KafkaTopic.TELEMETRY_REQUEST.value]) while True: - receive_msg = consumer.poll(2.0) + receive_msg = consumer.poll(1.0) if receive_msg is None: continue elif receive_msg.error(): if receive_msg.error().code() == KafkaError._PARTITION_EOF: continue + elif receive_msg.error().code() == KafkaError.UNKNOWN_TOPIC_OR_PART: + LOGGER.warning(f"Subscribed topic {receive_msg.topic()} does not exist or topic does not have any messages.") + continue else: LOGGER.error("Consumer error: {}".format(receive_msg.error())) break try: - collector = json.loads(receive_msg.value().decode('utf-8')) + collector = json.loads( + receive_msg.value().decode('utf-8') + ) collector_id = receive_msg.key().decode('utf-8') LOGGER.debug('Recevied Collector: {:} - {:}'.format(collector_id, collector)) - if collector['duration'] == -1 and collector['interval'] == -1: - self.TerminateCollectorBackend(collector_id) + duration = collector.get('duration', 0) + if duration == -1 and collector['interval'] == -1: + self.TerminateCollector(collector_id) else: - threading.Thread(target=self.InitiateCollectorBackend, - args=(collector_id, collector)).start() + LOGGER.info("Received Collector ID: {:} - Scheduling...".format(collector_id)) + if collector_id not in self.active_jobs: + stop_event = threading.Event() + self.active_jobs[collector_id] = stop_event + threading.Thread(target = self.CollectorHandler, + args=( + collector_id, + collector['kpi_id'], + duration, + collector['interval'], + stop_event + )).start() + # Stop the Collector after the given duration + if duration > 0: + def stop_after_duration(completion_time, stop_event): + time.sleep(completion_time) + if not stop_event.is_set(): + LOGGER.warning(f"Execution duration ({completion_time}) completed of Collector: {collector_id}") + self.TerminateCollector(collector_id) + + duration_thread = threading.Thread( + target=stop_after_duration, daemon=True, name=f"stop_after_duration_{collector_id}", + args=(duration, stop_event) + ) + duration_thread.start() + else: + LOGGER.warning("Collector ID: {:} - Already scheduled or running".format(collector_id)) except Exception as e: LOGGER.warning("Unable to consumer message from topic: {:}. ERROR: {:}".format(KafkaTopic.TELEMETRY_REQUEST.value, e)) - def InitiateCollectorBackend(self, collector_id, collector): + def CollectorHandler(self, collector_id, kpi_id, duration, interval, stop_event): """ - Method receives collector request and initiates collecter backend. + Method to handle collector request. """ - LOGGER.info("Initiating backend for collector: (Not Implemented... In progress ) {:s}".format(str(collector_id))) - # start_time = time.time() - # self.emulatorCollector = NetworkMetricsEmulator( - # duration = collector['duration'], - # interval = collector['interval'], - # metric_queue = self.metric_queue - # ) - # self.emulatorCollector.start() - # self.running_threads[collector_id] = self.emulatorCollector - - # while self.emulatorCollector.is_alive(): - # if not self.metric_queue.empty(): - # metric_value = self.metric_queue.get() - # LOGGER.debug("Metric: {:} - Value : {:}".format(collector['kpi_id'], metric_value)) - # self.GenerateKpiValue(collector_id, collector['kpi_id'] , metric_value) - # time.sleep(1) - # self.TerminateCollectorBackend(collector_id) + device_type, end_points = self.get_endpoint_detail(kpi_id) + + if end_points is None: + LOGGER.warning("KPI ID: {:} - Endpoints not found. Skipping...".format(kpi_id)) + return + + if device_type and "emu" in device_type: + LOGGER.info("KPI ID: {:} - Device Type: {:} - Endpoints: {:}".format(kpi_id, device_type, end_points)) + subscription = [collector_id, end_points, duration, interval] + self.EmulatedCollectorHandler(subscription, duration, collector_id, kpi_id, stop_event) + else: + LOGGER.warning("KPI ID: {:} - Device Type: {:} - Not Supported".format(kpi_id, device_type)) + + def EmulatedCollectorHandler(self, subscription, duration, collector_id, kpi_id, stop_event): + # EmulatedCollector + + self.collector = EmulatedCollector(address="127.0.0.1", port=8000) + self.collector.Connect() + if not self.collector.SubscribeState(subscription): + LOGGER.warning("KPI ID: {:} - Subscription failed. Skipping...".format(kpi_id)) + else: + while not stop_event.is_set(): + samples = list(self.collector.GetState(duration=duration, blocking=True)) + LOGGER.info("KPI: {:} - Value: {:}".format(kpi_id, samples)) + self.GenerateKpiValue(collector_id, kpi_id, samples) + time.sleep(1) + self.collector.Disconnect() + # self.TerminateCollector(collector_id) # No need to terminate, automatically terminated after duration. def GenerateKpiValue(self, collector_id: str, kpi_id: str, measured_kpi_value: Any): """ @@ -122,38 +170,62 @@ class TelemetryBackendService(GenericGrpcService): ) producer.flush() - def TerminateCollectorBackend(self, collector_id): + def TerminateCollector(self, job_id): LOGGER.debug("Terminating collector backend...") - if collector_id in self.running_threads: - thread = self.running_threads[collector_id] - thread.stop() - del self.running_threads[collector_id] - LOGGER.debug("Collector backend terminated. Collector ID: {:}".format(collector_id)) - self.GenerateCollectorTerminationSignal(collector_id, "-1", -1) # Termination confirmation to frontend. - else: - LOGGER.warning('Backend collector {:} not found'.format(collector_id)) + try: + if job_id not in self.active_jobs: # not job_ids: + # self.logger.warning(f"Active jobs: {self.active_jobs}") + self.logger.warning(f"No active jobs found for {job_id}. It might have already terminated.") + else: + LOGGER.info(f"Terminating job: {job_id}") + stop_event = self.active_jobs.pop(job_id, None) + if stop_event: + stop_event.set() + LOGGER.info(f"Job {job_id} terminated.") + if self.collector.UnsubscribeState(job_id): + LOGGER.info(f"Unsubscribed from collector: {job_id}") + else: + LOGGER.warning(f"Failed to unsubscribe from collector: {job_id}") + else: + LOGGER.warning(f"Job {job_id} not found in active jobs.") + except: + LOGGER.exception("Error terminating job: {:}".format(job_id)) - def GenerateCollectorTerminationSignal(self, collector_id: str, kpi_id: str, measured_kpi_value: Any): + def get_endpoint_detail(self, kpi_id: str): """ - Method to write kpi Termination signat on TELEMETRY_RESPONSE Kafka topic + Method to get device_type and endpoint detail based on device_uuid. """ - producer = self.kafka_producer - kpi_value : Dict = { - "kpi_id" : kpi_id, - "kpi_value" : measured_kpi_value, - } - producer.produce( - KafkaTopic.TELEMETRY_RESPONSE.value, - key = collector_id, - value = json.dumps(kpi_value), - callback = self.delivery_callback - ) - producer.flush() + kpi_id_obj = KpiId() + kpi_id_obj.kpi_id.uuid = kpi_id + kpi_descriptor = self.kpi_manager_client.GetKpiDescriptor(kpi_id_obj) + if not kpi_descriptor: + LOGGER.warning(f"KPI ID: {kpi_id} - Descriptor not found. Skipping...") + return (None, None) + + device_id = kpi_descriptor.device_id.device_uuid.uuid + endpoint_id = kpi_descriptor.endpoint_id.endpoint_uuid.uuid + device = get_device( context_client = self.context_client, + device_uuid = device_id, + include_config_rules = False, + include_components = False, + ) + if device: + for endpoint in device.device_endpoints: + if endpoint.endpoint_id.endpoint_uuid.uuid == endpoint_id: + endpoint_dict = {} + kpi_sample_types = [] + endpoint_dict["uuid"] = endpoint.endpoint_id.endpoint_uuid.uuid + endpoint_dict["name"] = endpoint.name + endpoint_dict["type"] = endpoint.endpoint_type + for sample_type in endpoint.kpi_sample_types: + kpi_sample_types.append(sample_type) + endpoint_dict["sample_types"] = kpi_sample_types + + return (device.device_type, endpoint_dict) + + LOGGER.warning(f"Device ID: {device_id} - Endpoint ID: {endpoint_id} - Not Found") + return (None, None) def delivery_callback(self, err, msg): if err: LOGGER.error('Message delivery failed: {:s}'.format(str(err))) - # print(f'Message delivery failed: {err}') - # else: - # LOGGER.info('Message delivered to topic {:}'.format(msg.topic())) - # print(f'Message delivered to topic {msg.topic()}') diff --git a/src/telemetry/backend/service/__main__.py b/src/telemetry/backend/service/__main__.py index 61ff397214fa21ff2d767c2eda4b3a7ee1f796b5..6e77d5d6cc7e31792d737bca04fb1af0e7baa2bb 100644 --- a/src/telemetry/backend/service/__main__.py +++ b/src/telemetry/backend/service/__main__.py @@ -16,6 +16,7 @@ import logging, signal, sys, threading from prometheus_client import start_http_server from common.Settings import get_log_level, get_metrics_port from .TelemetryBackendService import TelemetryBackendService +from common.tools.kafka.Variables import KafkaTopic terminate = threading.Event() LOGGER = None @@ -34,6 +35,8 @@ def main(): signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) + KafkaTopic.create_all_topics() + LOGGER.info('Starting...') # Start metrics server diff --git a/src/telemetry/backend/tests/Fixtures.py b/src/telemetry/backend/tests/Fixtures.py new file mode 100644 index 0000000000000000000000000000000000000000..59f1b761ca40caf2013471c7f6fdbaa781759a0a --- /dev/null +++ b/src/telemetry/backend/tests/Fixtures.py @@ -0,0 +1,58 @@ +# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest +import logging + +from context.client.ContextClient import ContextClient +from device.client.DeviceClient import DeviceClient +from service.client.ServiceClient import ServiceClient +from kpi_manager.client.KpiManagerClient import KpiManagerClient + + +LOGGER = logging.getLogger(__name__) +LOGGER.setLevel(logging.DEBUG) + + +@pytest.fixture(scope='session') +def context_client(): + _client = ContextClient(host="10.152.183.234") + _client.connect() + LOGGER.info('Yielding Connected ContextClient...') + yield _client + _client.close() + +@pytest.fixture(scope='session') +def device_client(): + _client = DeviceClient(host="10.152.183.95") + _client.connect() + LOGGER.info('Yielding Connected DeviceClient...') + yield _client + _client.close() + +@pytest.fixture(scope='session') +def service_client(): + _client = ServiceClient(host="10.152.183.47") + _client.connect() + LOGGER.info('Yielding Connected DeviceClient...') + yield _client + _client.close() + +@pytest.fixture(scope='session') +def kpi_manager_client(): + _client = KpiManagerClient(host="10.152.183.118") + LOGGER.info('Yielding Connected KpiManagerClient...') + yield _client + _client.close() + LOGGER.info('Closed KpiManagerClient...') diff --git a/src/telemetry/backend/tests/add_devices.py b/src/telemetry/backend/tests/add_devices.py new file mode 100644 index 0000000000000000000000000000000000000000..9fe02a953ec8a789ea5faa2aa49f3829b33aa721 --- /dev/null +++ b/src/telemetry/backend/tests/add_devices.py @@ -0,0 +1,78 @@ +# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging, os, time +from common.Constants import DEFAULT_CONTEXT_NAME +from common.proto.context_pb2 import ContextId, DeviceOperationalStatusEnum, Empty +from common.tools.descriptor.Loader import DescriptorLoader, check_descriptor_load_results, validate_empty_scenario +from common.tools.grpc.Tools import grpc_message_to_json, grpc_message_to_json_string +from common.tools.object_factory.Context import json_context_id +from context.client.ContextClient import ContextClient +from device.client.DeviceClient import DeviceClient +from .Fixtures import context_client, device_client # pylint: disable=unused-import + +LOGGER = logging.getLogger(__name__) +LOGGER.setLevel(logging.DEBUG) + +DESCRIPTOR_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'topology.json') +ADMIN_CONTEXT_ID = ContextId(**json_context_id(DEFAULT_CONTEXT_NAME)) + +def load_topology( + context_client : ContextClient, # pylint: disable=redefined-outer-name + device_client : DeviceClient, # pylint: disable=redefined-outer-name +) -> None: + LOGGER.info('Loading Topology...') + validate_empty_scenario(context_client) + descriptor_loader = DescriptorLoader( + descriptors_file=DESCRIPTOR_FILE, context_client=context_client, device_client=device_client) + LOGGER.info('Descriptor Loader Created') + results = descriptor_loader.process() + # LOGGER.info('Descriptor Load Results: {:s}'.format(str(results))) + check_descriptor_load_results(results, descriptor_loader) + # descriptor_loader.validate() + + # Verify the scenario has no services/slices + response = context_client.GetContext(ADMIN_CONTEXT_ID) + assert len(response.service_ids) == 0 + assert len(response.slice_ids) == 0 + +def test_scenario_devices_enabled( + context_client : ContextClient, # pylint: disable=redefined-outer-name +) -> None: + """ + This test validates that the devices are enabled. + """ + DEVICE_OP_STATUS_ENABLED = DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_ENABLED + + disabled_devices = list() + response = None + num_devices = -1 + num_devices_enabled, num_retry = 0, 0 + while (num_devices != num_devices_enabled) and (num_retry < 10): + time.sleep(1.0) + response = context_client.ListDevices(Empty()) + num_devices = len(response.devices) + num_devices_enabled = 0 + disabled_devices = list() + for device in response.devices: + if device.device_operational_status == DEVICE_OP_STATUS_ENABLED: + num_devices_enabled += 1 + else: + disabled_devices.append(grpc_message_to_json(device)) + LOGGER.info('Num Devices enabled: {:d}/{:d}'.format(num_devices_enabled, num_devices)) + num_retry += 1 + if num_devices_enabled != num_devices: + LOGGER.info('Disabled Devices: {:s}'.format(str(disabled_devices))) + LOGGER.info('Devices: {:s}'.format(grpc_message_to_json_string(response))) + assert num_devices_enabled == num_devices diff --git a/src/telemetry/backend/tests/messages.py b/src/telemetry/backend/tests/messages.py index f6a2bb247f28d10654746e0c75b6ed1973382e38..0d31cd15f2038c3065d679e34b1b772d37aaaf45 100644 --- a/src/telemetry/backend/tests/messages.py +++ b/src/telemetry/backend/tests/messages.py @@ -15,8 +15,8 @@ import uuid import random from common.proto import telemetry_frontend_pb2 -# from common.proto.kpi_sample_types_pb2 import KpiSampleType -# from common.proto.kpi_manager_pb2 import KpiId +from common.proto.kpi_sample_types_pb2 import KpiSampleType +from common.proto.kpi_manager_pb2 import KpiDescriptor, KpiId def create_collector_request(): _create_collector_request = telemetry_frontend_pb2.Collector() @@ -24,8 +24,25 @@ def create_collector_request(): # _create_collector_request.collector_id.collector_id.uuid = "efef4d95-1cf1-43c4-9742-95c283dddddd" _create_collector_request.kpi_id.kpi_id.uuid = str(uuid.uuid4()) # _create_collector_request.kpi_id.kpi_id.uuid = "6e22f180-ba28-4641-b190-2287bf448888" - _create_collector_request.duration_s = float(random.randint(8, 16)) + _create_collector_request.duration_s = float(random.randint(30, 50)) # _create_collector_request.duration_s = -1 _create_collector_request.interval_s = float(random.randint(2, 4)) return _create_collector_request +def _create_kpi_descriptor(device_id : str = ""): + _create_kpi_request = KpiDescriptor() + _create_kpi_request.kpi_id.kpi_id.uuid = str(uuid.uuid4()) + _create_kpi_request.kpi_description = "Test Description" + _create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED + _create_kpi_request.device_id.device_uuid.uuid = device_id + _create_kpi_request.service_id.service_uuid.uuid = 'SERV3' + _create_kpi_request.slice_id.slice_uuid.uuid = 'SLC3' + _create_kpi_request.endpoint_id.endpoint_uuid.uuid = '36571df2-bac1-5909-a27d-5f42491d2ff0' + _create_kpi_request.connection_id.connection_uuid.uuid = 'CON2' + _create_kpi_request.link_id.link_uuid.uuid = 'LNK2' + return _create_kpi_request + +def _create_kpi_id(kpi_id : str = "fc046641-0c9a-4750-b4d9-9f98401714e2"): + _create_kpi_id_request = KpiId() + _create_kpi_id_request.kpi_id.uuid = kpi_id + return _create_kpi_id_request diff --git a/src/telemetry/backend/tests/test_backend.py b/src/telemetry/backend/tests/test_backend.py index e75b33ca58c6bf27c5d2e1c2012dc31de5274ad3..1329aa969a4fed5baa887dd12d120f41eb56c2fa 100644 --- a/src/telemetry/backend/tests/test_backend.py +++ b/src/telemetry/backend/tests/test_backend.py @@ -12,42 +12,170 @@ # See the License for the specific language governing permissions and # limitations under the License. +import pytest import logging import time -from typing import Dict -from common.tools.kafka.Variables import KafkaTopic from telemetry.backend.service.TelemetryBackendService import TelemetryBackendService -from .messages import create_collector_request +from .messages import create_collector_request, _create_kpi_descriptor, _create_kpi_id +from .Fixtures import context_client, device_client, service_client, kpi_manager_client +from .add_devices import load_topology +from common.tools.context_queries.Topology import get_topology +from common.Constants import DEFAULT_CONTEXT_NAME +from common.tools.context_queries.Device import get_device, add_device_to_topology +# from common.tools.context_queries.EndPoint import get_endpoint_names +from .EndPoint import get_endpoint_names # modofied version of get_endpoint_names +from common.proto.context_pb2 import EndPointId, DeviceId, TopologyId, ContextId , Empty +from common.proto.kpi_manager_pb2 import KpiId LOGGER = logging.getLogger(__name__) +LOGGER.setLevel(logging.DEBUG) ########################### # Tests Implementation of Telemetry Backend ########################### -# --- "test_validate_kafka_topics" should be run before the functionality tests --- -def test_validate_kafka_topics(): - LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ") - response = KafkaTopic.create_all_topics() - assert isinstance(response, bool) - -# def test_RunRequestListener(): -# LOGGER.info('test_RunRequestListener') -# TelemetryBackendServiceObj = TelemetryBackendService() -# threading.Thread(target=TelemetryBackendServiceObj.RequestListener).start() - -def test_RunInitiateCollectorBackend(): - LOGGER.debug(">>> RunInitiateCollectorBackend <<<") - collector_obj = create_collector_request() - collector_id = collector_obj.collector_id.collector_id.uuid - collector_dict : Dict = { - "kpi_id" : collector_obj.kpi_id.kpi_id.uuid, - "duration": collector_obj.duration_s, - "interval": collector_obj.interval_s - } - TeleObj = TelemetryBackendService() - TeleObj.InitiateCollectorBackend(collector_id, collector_dict) - time.sleep(20) - - LOGGER.debug("--- Execution Finished Sucessfully---") +@pytest.fixture(autouse=True) +def log_all_methods(request): + ''' + This fixture logs messages before and after each test function runs, indicating the start and end of the test. + The autouse=True parameter ensures that this logging happens automatically for all tests in the module. + ''' + LOGGER.info(f" >>>>> Starting test: {request.node.name} ") + yield + LOGGER.info(f" <<<<< Finished test: {request.node.name} ") + +# # ----- Add Topology ----- +# def test_add_to_topology(context_client, device_client, service_client): +# load_topology(context_client, device_client) + +# # ----- Add Device to Topology ------ +# def test_add_device_to_topology(context_client): +# context_id = ContextId() +# context_id.context_uuid.uuid = "43813baf-195e-5da6-af20-b3d0922e71a7" +# topology_uuid = "c76135e3-24a8-5e92-9bed-c3c9139359c8" +# device_uuid = "69a3a3f0-5237-5f9e-bc96-d450d0c6c03a" +# response = add_device_to_topology( context_client = context_client, +# context_id = context_id, +# topology_uuid = topology_uuid, +# device_uuid = device_uuid +# ) +# LOGGER.info(f"Device added to topology: {response}") +# assert response is True + +# # ----- Get Topology ----- +# def test_get_topology(context_client, device_client): +# response = get_topology(context_client = context_client, topology_uuid = "test1", context_uuid = "test1") +# LOGGER.info(f"Topology: {response}") +# assert response is not None + +# def test_set_kpi_descriptor_and_get_device_id(kpi_manager_client): +# kpi_descriptor = _create_kpi_descriptor("1290fb71-bf15-5528-8b69-2d2fabe1fa18") +# kpi_id = kpi_manager_client.SetKpiDescriptor(kpi_descriptor) +# LOGGER.info(f"KPI Descriptor set: {kpi_id}") +# assert kpi_id is not None + +# response = kpi_manager_client.GetKpiDescriptor(kpi_id) +# # response = kpi_manager_client.GetKpiDescriptor(_create_kpi_id()) + +# assert response is not None +# LOGGER.info(f"KPI Descriptor: {response}") +# LOGGER.info(f"Device Id: {response.device_id.device_uuid.uuid}") +# LOGGER.info(f"Endpoint Id: {response.endpoint_id.endpoint_uuid.uuid}") + +# # ----- Get endpoint detail using device ID ----- +# def test_get_device_details(context_client): +# response = get_device(context_client = context_client, device_uuid = "1290fb71-bf15-5528-8b69-2d2fabe1fa18", include_config_rules = False, include_components = False) +# if response: +# LOGGER.info(f"Device type: {response.device_type}") +# for endpoint in response.device_endpoints: +# if endpoint.endpoint_id.endpoint_uuid.uuid == '36571df2-bac1-5909-a27d-5f42491d2ff0': +# endpoint_dict = {} +# kpi_sample_types = [] +# # LOGGER.info(f"Endpoint: {endpoint}") +# # LOGGER.info(f"Enpoint_uuid: {endpoint.endpoint_id.endpoint_uuid.uuid}") +# endpoint_dict["uuid"] = endpoint.endpoint_id.endpoint_uuid.uuid +# # LOGGER.info(f"Enpoint_name: {endpoint.name}") +# endpoint_dict["name"] = endpoint.name +# # LOGGER.info(f"Enpoint_type: {endpoint.endpoint_type}") +# endpoint_dict["type"] = endpoint.endpoint_type +# for sample_type in endpoint.kpi_sample_types: +# # LOGGER.info(f"Enpoint_sample_types: {sample_type}") +# kpi_sample_types.append(sample_type) +# endpoint_dict["sample_types"] = kpi_sample_types +# LOGGER.info(f"Extracted endpoint dict: {endpoint_dict}") +# else: +# LOGGER.info(f"Endpoint not matched") +# LOGGER.info(f"Device Type: {type(response)}") +# assert response is not None + +# # ----- List Conetxts ----- +# def test_list_contextIds(context_client): +# empty = Empty() +# response = context_client.ListContexts(empty) +# LOGGER.info(f"Contexts: {response}") +# assert response + +# # ----- List Devices ----- +# def test_list_devices(context_client): +# empty = Empty() +# response = context_client.ListDeviceIds(empty) +# LOGGER.info(f"Devices: {response}") +# assert response + +# ----- Get Endpoints ----- TODO: get_endpoint_names method doesn't return KPI samples types +# def test_get_endpoints(context_client): +# device_id = DeviceId() +# device_id.device_uuid.uuid = "1290fb71-bf15-5528-8b69-2d2fabe1fa18" +# endpoint_id = EndPointId() +# endpoint_id.endpoint_uuid.uuid = "43b817fa-246f-5e0a-a2e3-2aad0b3e16ca" +# endpoint_id.device_id.CopyFrom(device_id) +# response = get_endpoint_names(context_client = context_client, endpoint_ids = [endpoint_id]) +# LOGGER.info(f"Endpoints: {response}") +# assert response is not None + +# # ----- List Topologies ----- +# def test_list_topologies(context_client): +# context_id = ContextId() +# context_id.context_uuid.uuid = "e7d46baa-d38d-5b72-a082-f344274b63ef" +# respone = context_client.ListTopologies(context_id) +# LOGGER.info(f"Topologies: {respone}") + +# # ----- Remove Topology ----- +# def test_remove_topology(context_client): +# context_id = ContextId() +# context_id.context_uuid.uuid = "e7d46baa-d38d-5b72-a082-f344274b63ef" +# topology_id = TopologyId() +# topology_id.topology_uuid.uuid = "9ef0118c-4bca-5e81-808b-dc8f60e2cda4" +# topology_id.context_id.CopyFrom(context_id) + +# response = context_client.RemoveTopology(topology_id) +# LOGGER.info(f"Topology removed: {response}") + +# # ----- Remove context ----- +# def test_remove_context(context_client): +# context_id = ContextId() +# context_id.context_uuid.uuid = "e7d46baa-d38d-5b72-a082-f344274b63ef" +# response = context_client.RemoveContext(context_id) +# LOGGER.info(f"Context removed: {response}") + +@pytest.fixture +def telemetryBackend_service(): + LOGGER.info('Initializing TelemetryBackendService...') + + _service = TelemetryBackendService() + _service.start() + + LOGGER.info('Yielding TelemetryBackendService...') + yield _service + + LOGGER.info('Terminating TelemetryBackendService...') + _service.stop() + LOGGER.info('Terminated TelemetryBackendService...') + + +def test_InitiateCollectorBackend(telemetryBackend_service): + LOGGER.info(" Backend Initiated Successfully. Waiting for timer to finish ...") + time.sleep(30) + LOGGER.info(" Backend Timer Finished Successfully. ") + diff --git a/src/telemetry/backend/tests/topology.json b/src/telemetry/backend/tests/topology.json new file mode 100644 index 0000000000000000000000000000000000000000..6416130b924441e959fcdb7001b7c1b51df172d8 --- /dev/null +++ b/src/telemetry/backend/tests/topology.json @@ -0,0 +1,148 @@ +{ + "contexts": [ + {"context_id": {"context_uuid": {"uuid": "admin"}}} + ], + "topologies": [ + {"topology_id": {"context_id": {"context_uuid": {"uuid": "admin"}}, "topology_uuid": {"uuid": "admin"}}} + ], + "devices": [ + { + "device_id": {"device_uuid": {"uuid": "DE1"}}, "device_type": "emu-packet-router", "device_drivers": [0], + "device_endpoints": [], "device_operational_status": 0, "device_config": {"config_rules": [ + {"action": 1, "custom": {"resource_key": "_connect/address", "resource_value": "127.0.0.1"}}, + {"action": 1, "custom": {"resource_key": "_connect/port", "resource_value": "0"}}, + {"action": 1, "custom": {"resource_key": "_connect/settings", "resource_value": {"endpoints": [ + {"sample_types": [101, 102], "type": "copper/internal", "uuid": "1/1"}, + {"sample_types": [103, 102], "type": "copper/internal", "uuid": "1/2"}, + {"sample_types": [201, 202], "type": "copper/internal", "uuid": "2/1"}, + {"sample_types": [202, 203], "type": "copper/internal", "uuid": "2/2"}, + {"sample_types": [201, 203], "type": "copper/internal", "uuid": "2/3"}, + {"sample_types": [101, 103], "type": "copper/internal", "uuid": "2/4"} + ]}}} + ]} + }, + { + "device_id": {"device_uuid": {"uuid": "DE2"}}, "device_type": "emu-packet-router", "device_drivers": [0], + "device_endpoints": [], "device_operational_status": 0, "device_config": {"config_rules": [ + {"action": 1, "custom": {"resource_key": "_connect/address", "resource_value": "127.0.0.1"}}, + {"action": 1, "custom": {"resource_key": "_connect/port", "resource_value": "0"}}, + {"action": 1, "custom": {"resource_key": "_connect/settings", "resource_value": {"endpoints": [ + {"sample_types": [101, 103], "type": "copper/internal", "uuid": "1/1"}, + {"sample_types": [103, 101], "type": "copper/internal", "uuid": "1/2"}, + {"sample_types": [202, 201], "type": "copper/internal", "uuid": "2/1"}, + {"sample_types": [203, 201], "type": "copper/internal", "uuid": "2/2"}, + {"sample_types": [203, 202], "type": "copper/internal", "uuid": "2/3"}, + {"sample_types": [102 ], "type": "copper/internal", "uuid": "2/4"} + ]}}} + ]} + }, + { + "device_id": {"device_uuid": {"uuid": "DE3"}}, "device_type": "emu-packet-router", "device_drivers": [0], + "device_endpoints": [], "device_operational_status": 0, "device_config": {"config_rules": [ + {"action": 1, "custom": {"resource_key": "_connect/address", "resource_value": "127.0.0.1"}}, + {"action": 1, "custom": {"resource_key": "_connect/port", "resource_value": "0"}}, + {"action": 1, "custom": {"resource_key": "_connect/settings", "resource_value": {"endpoints": [ + {"sample_types": [], "type": "copper/internal", "uuid": "1/1"}, + {"sample_types": [], "type": "copper/internal", "uuid": "1/2"}, + {"sample_types": [], "type": "copper/internal", "uuid": "2/1"}, + {"sample_types": [], "type": "copper/internal", "uuid": "2/2"}, + {"sample_types": [], "type": "copper/internal", "uuid": "2/3"}, + {"sample_types": [], "type": "copper/internal", "uuid": "2/4"} + ]}}} + ]} + }, + { + "device_id": {"device_uuid": {"uuid": "DE4"}}, "device_type": "emu-packet-router", "device_drivers": [0], + "device_endpoints": [], "device_operational_status": 0, "device_config": {"config_rules": [ + {"action": 1, "custom": {"resource_key": "_connect/address", "resource_value": "127.0.0.1"}}, + {"action": 1, "custom": {"resource_key": "_connect/port", "resource_value": "0"}}, + {"action": 1, "custom": {"resource_key": "_connect/settings", "resource_value": {"endpoints": [ + {"sample_types": [], "type": "copper/internal", "uuid": "1/1"}, + {"sample_types": [], "type": "copper/internal", "uuid": "1/2"}, + {"sample_types": [], "type": "copper/internal", "uuid": "2/1"}, + {"sample_types": [], "type": "copper/internal", "uuid": "2/2"}, + {"sample_types": [], "type": "copper/internal", "uuid": "2/3"}, + {"sample_types": [], "type": "copper/internal", "uuid": "2/4"} + ]}}} + ]} + } + ], + "links": [ + + { + "link_id": {"link_uuid": {"uuid": "DE1/2/2==DE2/2/1"}}, "link_endpoint_ids": [ + {"device_id": {"device_uuid": {"uuid": "DE1"}}, "endpoint_uuid": {"uuid": "2/2"}}, + {"device_id": {"device_uuid": {"uuid": "DE2"}}, "endpoint_uuid": {"uuid": "2/1"}} + ] + }, + { + "link_id": {"link_uuid": {"uuid": "DE1/2/3==DE3/2/1"}}, "link_endpoint_ids": [ + {"device_id": {"device_uuid": {"uuid": "DE1"}}, "endpoint_uuid": {"uuid": "2/3"}}, + {"device_id": {"device_uuid": {"uuid": "DE3"}}, "endpoint_uuid": {"uuid": "2/1"}} + ] + }, + { + "link_id": {"link_uuid": {"uuid": "DE1/2/4==DE4/2/1"}}, "link_endpoint_ids": [ + {"device_id": {"device_uuid": {"uuid": "DE1"}}, "endpoint_uuid": {"uuid": "2/4"}}, + {"device_id": {"device_uuid": {"uuid": "DE4"}}, "endpoint_uuid": {"uuid": "2/1"}} + ] + }, + + { + "link_id": {"link_uuid": {"uuid": "DE2/2/1==DE1/2/2"}}, "link_endpoint_ids": [ + {"device_id": {"device_uuid": {"uuid": "DE2"}}, "endpoint_uuid": {"uuid": "2/1"}}, + {"device_id": {"device_uuid": {"uuid": "DE1"}}, "endpoint_uuid": {"uuid": "2/2"}} + ] + }, + { + "link_id": {"link_uuid": {"uuid": "DE2/2/3==DE3/2/2"}}, "link_endpoint_ids": [ + {"device_id": {"device_uuid": {"uuid": "DE2"}}, "endpoint_uuid": {"uuid": "2/3"}}, + {"device_id": {"device_uuid": {"uuid": "DE3"}}, "endpoint_uuid": {"uuid": "2/2"}} + ] + }, + { + "link_id": {"link_uuid": {"uuid": "DE2/2/4==DE4/2/2"}}, "link_endpoint_ids": [ + {"device_id": {"device_uuid": {"uuid": "DE2"}}, "endpoint_uuid": {"uuid": "2/4"}}, + {"device_id": {"device_uuid": {"uuid": "DE4"}}, "endpoint_uuid": {"uuid": "2/2"}} + ] + }, + + { + "link_id": {"link_uuid": {"uuid": "DE3/2/1==DE1/2/3"}}, "link_endpoint_ids": [ + {"device_id": {"device_uuid": {"uuid": "DE3"}}, "endpoint_uuid": {"uuid": "2/1"}}, + {"device_id": {"device_uuid": {"uuid": "DE1"}}, "endpoint_uuid": {"uuid": "2/3"}} + ] + }, + { + "link_id": {"link_uuid": {"uuid": "DE3/2/2==DE2/2/3"}}, "link_endpoint_ids": [ + {"device_id": {"device_uuid": {"uuid": "DE3"}}, "endpoint_uuid": {"uuid": "2/2"}}, + {"device_id": {"device_uuid": {"uuid": "DE2"}}, "endpoint_uuid": {"uuid": "2/3"}} + ] + }, + { + "link_id": {"link_uuid": {"uuid": "DE4/2/2==DE2/2/4"}}, "link_endpoint_ids": [ + {"device_id": {"device_uuid": {"uuid": "DE4"}}, "endpoint_uuid": {"uuid": "2/2"}}, + {"device_id": {"device_uuid": {"uuid": "DE2"}}, "endpoint_uuid": {"uuid": "2/4"}} + ] + }, + + { + "link_id": {"link_uuid": {"uuid": "DE4/2/1==DE1/2/4"}}, "link_endpoint_ids": [ + {"device_id": {"device_uuid": {"uuid": "DE4"}}, "endpoint_uuid": {"uuid": "2/1"}}, + {"device_id": {"device_uuid": {"uuid": "DE1"}}, "endpoint_uuid": {"uuid": "2/4"}} + ] + }, + { + "link_id": {"link_uuid": {"uuid": "DE4/2/2==DE2/2/4"}}, "link_endpoint_ids": [ + {"device_id": {"device_uuid": {"uuid": "DE4"}}, "endpoint_uuid": {"uuid": "2/2"}}, + {"device_id": {"device_uuid": {"uuid": "DE2"}}, "endpoint_uuid": {"uuid": "2/4"}} + ] + }, + { + "link_id": {"link_uuid": {"uuid": "DE4/2/3==DE3/2/4"}}, "link_endpoint_ids": [ + {"device_id": {"device_uuid": {"uuid": "DE4"}}, "endpoint_uuid": {"uuid": "2/3"}}, + {"device_id": {"device_uuid": {"uuid": "DE3"}}, "endpoint_uuid": {"uuid": "2/4"}} + ] + } + ] +} diff --git a/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py b/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py index f74e97ffd4998ca0b3255ca4e1ebe496ebc6737b..955036495f670dc8d126a0682917dfc90acba185 100644 --- a/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py +++ b/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py @@ -13,7 +13,6 @@ # limitations under the License. import json -import threading from typing import Any, Dict import grpc import logging @@ -29,7 +28,6 @@ from telemetry.database.Telemetry_DB import TelemetryDB from confluent_kafka import Consumer as KafkaConsumer from confluent_kafka import Producer as KafkaProducer -from confluent_kafka import KafkaError LOGGER = logging.getLogger(__name__) @@ -49,7 +47,7 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer): @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def StartCollector(self, - request : Collector, grpc_context: grpc.ServicerContext # type: ignore + request : Collector, context: grpc.ServicerContext # type: ignore ) -> CollectorId: # type: ignore LOGGER.info ("gRPC message: {:}".format(request)) response = CollectorId() @@ -86,7 +84,7 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer): @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def StopCollector(self, - request : CollectorId, grpc_context: grpc.ServicerContext # type: ignore + request : CollectorId, context: grpc.ServicerContext # type: ignore ) -> Empty: # type: ignore LOGGER.info ("gRPC message: {:}".format(request)) try: @@ -125,7 +123,7 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer): @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def SelectCollectors(self, - request : CollectorFilter, contextgrpc_context: grpc.ServicerContext # type: ignore + request : CollectorFilter, context: grpc.ServicerContext # type: ignore ) -> CollectorList: # type: ignore LOGGER.info("gRPC message: {:}".format(request)) response = CollectorList() @@ -145,58 +143,6 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer): @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def delivery_callback(self, err, msg): - """ - Callback function to handle message delivery status. - Args: - err (KafkaError): Kafka error object. - msg (Message): Kafka message object. - """ if err: LOGGER.debug('Message delivery failed: {:}'.format(err)) - # print('Message delivery failed: {:}'.format(err)) - # else: - # LOGGER.debug('Message delivered to topic {:}'.format(msg.topic())) - # print('Message delivered to topic {:}'.format(msg.topic())) - - # ---------- Independent Method --------------- - # Listener method is independent of any method (same lifetime as service) - # continously listens for responses - def install_servicers(self): - threading.Thread(target=self.ResponseListener).start() - - def ResponseListener(self): - """ - listener for response on Kafka topic. - """ - self.kafka_consumer.subscribe([KafkaTopic.TELEMETRY_RESPONSE.value]) - while True: - receive_msg = self.kafka_consumer.poll(2.0) - if receive_msg is None: - continue - elif receive_msg.error(): - if receive_msg.error().code() == KafkaError._PARTITION_EOF: - continue - else: - # print("Consumer error: {:}".format(receive_msg.error())) - LOGGER.error("Consumer error: {:}".format(receive_msg.error())) - break - try: - collector_id = receive_msg.key().decode('utf-8') - if collector_id in ACTIVE_COLLECTORS: - kpi_value = json.loads(receive_msg.value().decode('utf-8')) - self.process_response(collector_id, kpi_value['kpi_id'], kpi_value['kpi_value']) - else: - # print(f"collector id does not match.\nRespone ID: '{collector_id}' --- Active IDs: '{ACTIVE_COLLECTORS}' ") - LOGGER.info("collector id does not match.\nRespone ID: {:} --- Active IDs: {:}".format(collector_id, ACTIVE_COLLECTORS)) - except Exception as e: - # print(f"Error extarcting msg key or value: {str(e)}") - LOGGER.info("Error extarcting msg key or value: {:}".format(e)) - continue - - def process_response(self, collector_id: str, kpi_id: str, kpi_value: Any): - if kpi_id == "-1" and kpi_value == -1: - # print ("Backend termination confirmation for collector id: ", collector_id) - LOGGER.info("Backend termination confirmation for collector id: {:}".format(collector_id)) - else: - LOGGER.info("Backend termination confirmation for collector id: {:}".format(collector_id)) - # print ("KPI Value: Collector Id:", collector_id, ", Kpi Id:", kpi_id, ", Value:", kpi_value) + diff --git a/src/telemetry/frontend/service/__main__.py b/src/telemetry/frontend/service/__main__.py index e1b9dba4e97fe30b962a1deb9050c67671cbe976..874b34b8c7ae7800b323d427e9347798b22cf7bc 100644 --- a/src/telemetry/frontend/service/__main__.py +++ b/src/telemetry/frontend/service/__main__.py @@ -18,6 +18,8 @@ from common.Settings import get_log_level, get_metrics_port from .TelemetryFrontendService import TelemetryFrontendService from telemetry.database.TelemetryModel import Collector as Model from common.tools.database.GenericDatabase import Database +from common.tools.kafka.Variables import KafkaTopic + terminate = threading.Event() LOGGER = None @@ -43,6 +45,8 @@ def main(): kpiDBobj.create_database() kpiDBobj.create_tables() + KafkaTopic.create_all_topics() + # Start metrics server metrics_port = get_metrics_port() start_http_server(metrics_port) diff --git a/src/telemetry/frontend/tests/Messages.py b/src/telemetry/frontend/tests/Messages.py index 177bcc0b7e3829d2cdfd54c404618af9ebe43161..d766f68fac4fdf978543cc94a151fbca81d9b0de 100644 --- a/src/telemetry/frontend/tests/Messages.py +++ b/src/telemetry/frontend/tests/Messages.py @@ -30,16 +30,17 @@ def create_collector_request(): # _create_collector_request.collector_id.collector_id.uuid = str(uuid.uuid4()) _create_collector_request.collector_id.collector_id.uuid = "efef4d95-1cf1-43c4-9742-95c283dddddd" # _create_collector_request.kpi_id.kpi_id.uuid = str(uuid.uuid4()) - _create_collector_request.kpi_id.kpi_id.uuid = "6e22f180-ba28-4641-b190-2287bf448888" + # _create_collector_request.kpi_id.kpi_id.uuid = "6e22f180-ba28-4641-b190-2287bf448888" + _create_collector_request.kpi_id.kpi_id.uuid = "8c5ca114-cdc7-4081-b128-b667fd159832" # _create_collector_request.duration_s = float(random.randint(8, 16)) - _create_collector_request.duration_s = -1 - _create_collector_request.interval_s = float(random.randint(3, 5)) + _create_collector_request.duration_s = float(random.randint(40, 60)) + _create_collector_request.interval_s = float(random.randint(5, 7)) return _create_collector_request def create_collector_filter(): _create_collector_filter = telemetry_frontend_pb2.CollectorFilter() kpi_id_obj = KpiId() # kpi_id_obj.kpi_id.uuid = str(uuid.uuid4()) - kpi_id_obj.kpi_id.uuid = "a7237fa3-caf4-479d-84b6-4d9f9738fb7f" + kpi_id_obj.kpi_id.uuid = "8c5ca114-cdc7-4081-b128-b667fd159832" _create_collector_filter.kpi_id.append(kpi_id_obj) return _create_collector_filter diff --git a/src/telemetry/frontend/tests/test_frontend.py b/src/telemetry/frontend/tests/test_frontend.py index 067925a285f6e6d69b89b518e6a96c1ed495b7e0..767a1f73f2ebd73f88c71ac44e8ce87efb37bebd 100644 --- a/src/telemetry/frontend/tests/test_frontend.py +++ b/src/telemetry/frontend/tests/test_frontend.py @@ -15,6 +15,7 @@ import os import pytest import logging +import time from common.Constants import ServiceNameEnum from common.proto.telemetry_frontend_pb2 import CollectorId, CollectorList @@ -42,6 +43,16 @@ os.environ[get_env_var_name(ServiceNameEnum.TELEMETRY, ENVVAR_SUFIX_SERVICE_PORT LOGGER = logging.getLogger(__name__) +@pytest.fixture(autouse=True) +def log_all_methods(request): + ''' + This fixture logs messages before and after each test function runs, indicating the start and end of the test. + The autouse=True parameter ensures that this logging happens automatically for all tests in the module. + ''' + LOGGER.info(f" >>>>> Starting test: {request.node.name} ") + yield + LOGGER.info(f" <<<<< Finished test: {request.node.name} ") + @pytest.fixture(scope='session') def telemetryFrontend_service(): LOGGER.info('Initializing TelemetryFrontendService...') @@ -79,36 +90,24 @@ def telemetryFrontend_client( # Tests Implementation of Telemetry Frontend ########################### -# ------- Re-structuring Test --------- -# --- "test_validate_kafka_topics" should be run before the functionality tests --- -def test_validate_kafka_topics(): - LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ") - response = KafkaTopic.create_all_topics() - assert isinstance(response, bool) - # ----- core funtionality test ----- def test_StartCollector(telemetryFrontend_client): - LOGGER.info(' >>> test_StartCollector START: <<< ') + # LOGGER.info(' >>> test_StartCollector START: <<< ') response = telemetryFrontend_client.StartCollector(create_collector_request()) LOGGER.debug(str(response)) assert isinstance(response, CollectorId) -def test_StopCollector(telemetryFrontend_client): - LOGGER.info(' >>> test_StopCollector START: <<< ') - response = telemetryFrontend_client.StopCollector(create_collector_id()) - LOGGER.debug(str(response)) - assert isinstance(response, Empty) - def test_SelectCollectors(telemetryFrontend_client): LOGGER.info(' >>> test_SelectCollectors START: <<< ') response = telemetryFrontend_client.SelectCollectors(create_collector_filter()) LOGGER.debug(str(response)) assert isinstance(response, CollectorList) -# # ----- Non-gRPC method tests ----- -# def test_RunResponseListener(): -# LOGGER.info(' >>> test_RunResponseListener START: <<< ') -# TelemetryFrontendServiceObj = TelemetryFrontendServiceServicerImpl() -# response = TelemetryFrontendServiceObj.RunResponseListener() # becasue Method "run_kafka_listener" is not define in frontend.proto -# LOGGER.debug(str(response)) -# assert isinstance(response, bool) +def test_StopCollector(telemetryFrontend_client): + # LOGGER.info(' >>> test_StopCollector START: <<< ') + # LOGGER.info("Waiting before termination...") + # time.sleep(30) + response = telemetryFrontend_client.StopCollector(create_collector_id()) + LOGGER.debug(str(response)) + assert isinstance(response, Empty) +