Commit 013799d6 authored by Waleed Akbar

Merge branch 'feat/249-cttc-telemetry-enhancement' into feat/192-cttc-implement-telemetry-backend-collector-gnmi-openconfig
parents 161e2f94 87685c9c
1 merge request: !289 Draft: Resolve "(CTTC) Implement Telemetry Backend Collector gNMI/OpenConfig"
Showing changes with 531 additions and 70 deletions
...@@ -179,5 +179,3 @@ libyang/
# Other logs
**/logs/*.log.*
# PySpark checkpoints
src/analytics/.spark/*
...@@ -215,6 +215,9 @@ export GRAF_EXT_PORT_HTTP=${GRAF_EXT_PORT_HTTP:-"3000"}
# Deploy Apache Kafka
./deploy/kafka.sh
#Deploy Monitoring (Prometheus, Mimir, Grafana)
./deploy/monitoring.sh
# Expose Dashboard
./deploy/expose_dashboard.sh
...
#!/bin/bash
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
set -euo pipefail
# -----------------------------------------------------------
# Global namespace for all deployments
# -----------------------------------------------------------
NAMESPACE="monitoring"
VALUES_FILE_PATH="manifests/monitoring"
# -----------------------------------------------------------
# Prometheus Configuration
# -----------------------------------------------------------
RELEASE_NAME_PROM="mon-prometheus"
CHART_REPO_NAME_PROM="prometheus-community"
CHART_REPO_URL_PROM="https://prometheus-community.github.io/helm-charts"
CHART_NAME_PROM="prometheus"
VALUES_FILE_PROM="$VALUES_FILE_PATH/prometheus_values.yaml"
# -----------------------------------------------------------
# Mimir Configuration
# -----------------------------------------------------------
RELEASE_NAME_MIMIR="mon-mimir"
CHART_REPO_NAME_MIMIR="grafana"
CHART_REPO_URL_MIMIR="https://grafana.github.io/helm-charts"
CHART_NAME_MIMIR="mimir-distributed"
VALUES_FILE_MIMIR="$VALUES_FILE_PATH/mimir_values.yaml"
# -----------------------------------------------------------
# Grafana Configuration
# -----------------------------------------------------------
# RELEASE_NAME_GRAFANA="mon-grafana"
# CHART_REPO_NAME_GRAFANA="grafana"
# CHART_REPO_URL_GRAFANA="https://grafana.github.io/helm-charts"
# CHART_NAME_GRAFANA="grafana"
# VALUES_FILE_GRAFANA="$VALUES_FILE_PATH/grafana_values.yaml"
# -----------------------------------------------------------
# Function to deploy or upgrade a Helm chart
# -----------------------------------------------------------
deploy_chart() {
local release_name="$1"
local chart_repo_name="$2"
local chart_repo_url="$3"
local chart_name="$4"
local values_file="$5"
local namespace="$6"
echo ">>> Deploying [${release_name}] from repo [${chart_repo_name}]..."
# Add or update the Helm repo
echo "Adding/updating Helm repo: $chart_repo_name -> $chart_repo_url"
helm repo add "$chart_repo_name" "$chart_repo_url" || true
helm repo update
# Create namespace if needed
echo "Creating namespace '$namespace' if it doesn't exist..."
kubectl get namespace "$namespace" >/dev/null 2>&1 || kubectl create namespace "$namespace"
# Install or upgrade the chart
if [ -n "$values_file" ] && [ -f "$values_file" ]; then
echo "Installing/Upgrading $release_name using custom values from $values_file..."
helm upgrade --install "$release_name" "$chart_repo_name/$chart_name" \
--namespace "$namespace" \
--values "$values_file"
else
echo "Installing/Upgrading $release_name with default chart values..."
helm upgrade --install "$release_name" "$chart_repo_name/$chart_name" \
--namespace "$namespace"
fi
echo "<<< Deployment initiated for [$release_name]."
echo
}
# -----------------------------------------------------------
# Actual Deployments
# -----------------------------------------------------------
# 1) Deploy Prometheus
deploy_chart "$RELEASE_NAME_PROM" \
"$CHART_REPO_NAME_PROM" \
"$CHART_REPO_URL_PROM" \
"$CHART_NAME_PROM" \
"$VALUES_FILE_PROM" \
"$NAMESPACE"
# Optionally wait for Prometheus server pod to become ready
kubectl rollout status deployment/"$RELEASE_NAME_PROM-server" -n "$NAMESPACE" || true
# 2) Deploy Mimir
deploy_chart "$RELEASE_NAME_MIMIR" \
"$CHART_REPO_NAME_MIMIR" \
"$CHART_REPO_URL_MIMIR" \
"$CHART_NAME_MIMIR" \
"$VALUES_FILE_MIMIR" \
"$NAMESPACE"
# Depending on how Mimir runs (StatefulSets, Deployments), you can wait for
# the correct resource to be ready. For example:
# kubectl rollout status statefulset/"$RELEASE_NAME_MIMIR-distributor" -n "$NAMESPACE" || true
# 3) Deploy Grafana
# deploy_chart "$RELEASE_NAME_GRAFANA" \
# "$CHART_REPO_NAME_GRAFANA" \
# "$CHART_REPO_URL_GRAFANA" \
# "$CHART_NAME_GRAFANA" \
# "$VALUES_FILE_GRAFANA" \
# "$NAMESPACE"
# kubectl rollout status deployment/"$RELEASE_NAME_GRAFANA" -n "$NAMESPACE" || true
# -----------------------------------------------------------
echo "All deployments completed!"
...@@ -39,6 +39,8 @@ spec:
env:
- name: LOG_LEVEL
value: "INFO"
- name: PUSHGATEWAY_URL
value: "http://mon-prometheus-prometheus-pushgateway.monitoring.svc.cluster.local:9091"
envFrom:
- secretRef:
name: kfk-kpi-data
...
rbac:
create: true
## Use an existing ClusterRole/Role (depending on rbac.namespaced false/true)
# useExistingRole: name-of-some-role
# useExistingClusterRole: name-of-some-clusterRole
pspEnabled: false
pspUseAppArmor: false
namespaced: false
serviceAccount:
create: true
name:
nameTest:
## ServiceAccount labels.
automountServiceAccountToken: false
replicas: 1
## Create a headless service for the deployment
headlessService: false
## Should the service account be auto mounted on the pod
automountServiceAccountToken: true
## Create HorizontalPodAutoscaler object for deployment type
#
autoscaling:
enabled: false
minReplicas: 1
maxReplicas: 3
targetCPU: "60"
targetMemory: ""
behavior: {}
deploymentStrategy:
type: RollingUpdate
readinessProbe:
httpGet:
path: /api/health
port: 3000
livenessProbe:
httpGet:
path: /api/health
port: 3000
initialDelaySeconds: 60
timeoutSeconds: 30
failureThreshold: 10
image:
registry: docker.io
repository: grafana/grafana
# Overrides the Grafana image tag whose default is the chart appVersion
tag: ""
sha: ""
pullPolicy: IfNotPresent
## Optionally specify an array of imagePullSecrets.
## Secrets must be manually created in the namespace.
## ref: https://kubernetes.io/docs/tasks/configure-pod-container/pull-image-private-registry/
## Can be templated.
##
pullSecrets: []
# - myRegistrKeySecretName
testFramework:
enabled: true
## The type of Helm hook used to run this test. Defaults to test.
## ref: https://helm.sh/docs/topics/charts_hooks/#the-available-hooks
##
# hookType: test
image:
# -- The Docker registry
registry: docker.io
repository: bats/bats
tag: "v1.4.1"
imagePullPolicy: IfNotPresent
# dns configuration for pod
dnsPolicy: ~
dnsConfig: {}
# nameservers:
# - 8.8.8.8
# options:
# - name: ndots
# value: "2"
# - name: edns0
securityContext:
runAsNonRoot: true
runAsUser: 472
runAsGroup: 472
fsGroup: 472
containerSecurityContext:
allowPrivilegeEscalation: false
capabilities:
drop:
- ALL
seccompProfile:
type: RuntimeDefault
# Enable creating the grafana configmap
createConfigmap: true
downloadDashboardsImage:
registry: docker.io
repository: curlimages/curl
tag: 8.9.1
sha: ""
pullPolicy: IfNotPresent
downloadDashboards:
env: {}
envFromSecret: ""
resources: {}
securityContext:
allowPrivilegeEscalation: false
capabilities:
drop:
- ALL
seccompProfile:
type: RuntimeDefault
envValueFrom: {}
# ENV_NAME:
# configMapKeyRef:
# name: configmap-name
# key: value_key
## Pod Annotations
# podAnnotations: {}
## ConfigMap Annotations
# configMapAnnotations: {}
# argocd.argoproj.io/sync-options: Replace=true
## Pod Labels
# podLabels: {}
podPortName: grafana
gossipPortName: gossip
## Deployment annotations
# annotations: {}
service:
enabled: true
type: NodePort
port: 80
targetPort: 3000
nodePort: 30080
portName: service
## Enable persistence using Persistent Volume Claims
## ref: https://kubernetes.io/docs/user-guide/persistent-volumes/
##
persistence:
type: pvc
enabled: true
# storageClassName: default
accessModes:
- ReadWriteOnce
size: 10Gi
# annotations: {}
finalizers:
- kubernetes.io/pvc-protection
disableWarning: false
## If 'lookupVolumeName' is set to true, Helm will attempt to retrieve
## the current value of 'spec.volumeName' and incorporate it into the template.
lookupVolumeName: true
# Administrator credentials when not using an existing secret (see below)
adminUser: admin
# adminPassword: strongpassword
# Use an existing secret for the admin user.
admin:
## Name of the secret. Can be templated.
existingSecret: ""
userKey: admin-user
passwordKey: admin-password
## Configure grafana datasources
## ref: http://docs.grafana.org/administration/provisioning/#datasources
##
datasources:
datasources.yaml:
apiVersion: 1
datasources:
- name: Prometheus
type: prometheus
url: http://mon-prometheus-server.monitoring.svc.cluster.local
access: proxy
isDefault: true
- name: Mimir
type: prometheus
url: http://mimir-nginx.mon-mimir.svc:80/prometheus
access: proxy
isDefault: false
## Grafana's primary configuration
## NOTE: values in map will be converted to ini format
## ref: http://docs.grafana.org/installation/configuration/
##
grafana.ini:
paths:
data: /var/lib/grafana/
logs: /var/log/grafana
plugins: /var/lib/grafana/plugins
provisioning: /etc/grafana/provisioning
analytics:
check_for_updates: true
log:
mode: console
grafana_net:
url: https://grafana.net
server:
domain: "{{ if (and .Values.ingress.enabled .Values.ingress.hosts) }}{{ tpl (.Values.ingress.hosts | first) . }}{{ else }}''{{ end }}"
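# Illustrative note (not part of the chart values): per the NOTE above, the grafana.ini map is
# rendered by the chart into INI sections, so keys such as 'log' and 'grafana_net' become, e.g.:
#   [log]
#   mode = console
#   [grafana_net]
#   url = https://grafana.net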
## Number of old ReplicaSets to retain
##
revisionHistoryLimit: 5
# assertNoLeakedSecrets is a helper function defined in _helpers.tpl that checks if secret
# values are not exposed in the rendered grafana.ini configmap. It is enabled by default.
#
# To pass values into grafana.ini without exposing them in a configmap, use variable expansion:
# https://grafana.com/docs/grafana/latest/setup-grafana/configure-grafana/#variable-expansion
#
# Alternatively, if you wish to allow secret values to be exposed in the rendered grafana.ini configmap,
# you can disable this check by setting assertNoLeakedSecrets to false.
assertNoLeakedSecrets: true
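# Hedged example (not part of this commit): to keep secrets out of the rendered grafana.ini
# ConfigMap, Grafana's variable expansion can reference environment variables or files, e.g.:
#   grafana.ini:
#     database:
#       password: $__env{GF_DATABASE_PASSWORD}   # or $__file{/etc/secrets/db_password}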
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Configuration for Prometheus components and server settings
# Global Prometheus configuration
alertmanager:
enabled: false # Default is true
kube-state-metrics:
enabled: false # Default is true
prometheus-node-exporter:
enabled: false # Default is true
prometheus-pushgateway:
enabled: true # Default is true
# Prometheus server-specific configuration
server:
retention: "30d"
logLevel: "debug"
resources:
requests:
cpu: "250m"
memory: "256Mi"
limits:
cpu: "1"
memory: "1Gi"
# Expose the Prometheus server via a Kubernetes service
service:
type: NodePort
nodePort: 30090
extraScrapeConfigs:
- job_name: 'pushgateway'
static_configs:
- targets:
- 'prometheus-pushgateway.monitoring.svc.cluster.local:9091' # Push Gateway endpoint
# Global Prometheus settings:
global:
scrape_interval: 10s
evaluation_interval: 10s
...@@ -19,6 +19,7 @@ PROJECTDIR=`pwd`
cd $PROJECTDIR/src
export KFK_SERVER_ADDRESS='127.0.0.1:9092'
RCFILE=$PROJECTDIR/coverage/.coveragerc
python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \
kpi_value_writer/tests/test_kpi_value_writer.py
...@@ -25,5 +25,5 @@ export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_telemetr
RCFILE=$PROJECTDIR/coverage/.coveragerc
python3 -m pytest --log-level=debug --log-cli-level=debug --verbose \
python3 -m pytest --log-level=INFO --log-cli-level=INFO --verbose \
telemetry/backend/tests/test_backend.py
...@@ -18,10 +18,11 @@ PROJECTDIR=`pwd`
cd $PROJECTDIR/src
# python3 kpi_manager/tests/test_unitary.py
export KFK_SERVER_ADDRESS='127.0.0.1:9092'
CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace crdb -o jsonpath='{.spec.clusterIP}')
export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_telemetry?sslmode=require"
RCFILE=$PROJECTDIR/coverage/.coveragerc
python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \
python3 -m pytest --log-level=INFO --log-cli-level=INFO --verbose \
telemetry/frontend/tests/test_frontend.py
...@@ -16,8 +16,11 @@ import logging, signal, sys, threading
from prometheus_client import start_http_server
from common.Settings import get_log_level, get_metrics_port
from .AnalyticsBackendService import AnalyticsBackendService
from common.tools.kafka.Variables import KafkaTopic
terminate = threading.Event()
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
LOGGER = None
def signal_handler(signal, frame): # pylint: disable=redefined-outer-name
...@@ -36,6 +39,8 @@ def main():
LOGGER.info('Starting...')
KafkaTopic.create_all_topics()
# Start metrics server
metrics_port = get_metrics_port()
start_http_server(metrics_port)
...
...@@ -18,8 +18,11 @@ from common.Settings import get_log_level, get_metrics_port
from .AnalyticsFrontendService import AnalyticsFrontendService
from analytics.database.AnalyzerModel import Analyzer as Model
from common.tools.database.GenericDatabase import Database
from common.tools.kafka.Variables import KafkaTopic
terminate = threading.Event()
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
LOGGER = None
def signal_handler(signal, frame): # pylint: disable=redefined-outer-name
...@@ -43,6 +46,8 @@ def main():
kpiDBobj.create_database()
kpiDBobj.create_tables()
KafkaTopic.create_all_topics()
# Start metrics server
metrics_port = get_metrics_port()
start_http_server(metrics_port)
...
...@@ -56,7 +56,7 @@ def delay_linear(initial=0, increment=0, maximum=None):
return delay
return compute
def delay_exponential(initial=1, increment=1, maximum=None):
def delay_exponential(initial=1.0, increment=1.0, maximum=None):
def compute(num_try):
delay = initial * pow(increment, (num_try - 1))
if maximum is not None:
...
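For context, delay_exponential (as changed above) returns a per-attempt delay function computing initial * increment**(num_try - 1). A minimal usage sketch with illustrative values (the clamping to 'maximum' is assumed from the branch elided in this hunk):

compute = delay_exponential(initial=1.0, increment=2.0, maximum=10.0)
delays = [compute(n) for n in range(1, 6)]   # assumed to yield [1.0, 2.0, 4.0, 8.0, 10.0] seconds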
...@@ -33,8 +33,8 @@ class Engine:
CRDB_USERNAME, CRDB_PASSWORD, CRDB_NAMESPACE, CRDB_SQL_PORT, CRDB_DATABASE, CRDB_SSLMODE)
try:
engine = sqlalchemy.create_engine(crdb_uri, echo=False)
LOGGER.info(' AnalyzerDB initalized with DB URL: {:}'.format(crdb_uri))
LOGGER.info(' Database initalized with DB URL: {:}'.format(crdb_uri))
return engine
except: # pylint: disable=bare-except # pragma: no cover
LOGGER.exception('Failed to connect to database: {:s}'.format(str(crdb_uri)))
return None # type: ignore
return engine
...@@ -22,13 +22,16 @@ from ._GrpcToEnum import grpc_to_enum
# BYTES_RECEIVED. If item name does not match, automatic mapping of
# proto enums to database enums will fail.
class ORM_KpiSampleTypeEnum(enum.Enum):
UNKNOWN = KpiSampleType.KPISAMPLETYPE_UNKNOWN
PACKETS_TRANSMITTED = KpiSampleType.KPISAMPLETYPE_PACKETS_TRANSMITTED
PACKETS_RECEIVED = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED
BYTES_TRANSMITTED = KpiSampleType.KPISAMPLETYPE_BYTES_TRANSMITTED
BYTES_RECEIVED = KpiSampleType.KPISAMPLETYPE_BYTES_RECEIVED
LINK_TOTAL_CAPACITY_GBPS = KpiSampleType.KPISAMPLETYPE_LINK_TOTAL_CAPACITY_GBPS
LINK_USED_CAPACITY_GBPS = KpiSampleType.KPISAMPLETYPE_LINK_USED_CAPACITY_GBPS
UNKNOWN = KpiSampleType.KPISAMPLETYPE_UNKNOWN # 0
PACKETS_TRANSMITTED = KpiSampleType.KPISAMPLETYPE_PACKETS_TRANSMITTED # 101
PACKETS_RECEIVED = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED # 102
PACKETS_DROPPED = KpiSampleType.KPISAMPLETYPE_PACKETS_DROPPED # 103
BYTES_TRANSMITTED = KpiSampleType.KPISAMPLETYPE_BYTES_TRANSMITTED # 201
BYTES_RECEIVED = KpiSampleType.KPISAMPLETYPE_BYTES_RECEIVED # 202
BYTES_DROPPED = KpiSampleType.KPISAMPLETYPE_BYTES_DROPPED # 203
LINK_TOTAL_CAPACITY_GBPS = KpiSampleType.KPISAMPLETYPE_LINK_TOTAL_CAPACITY_GBPS # 301
LINK_USED_CAPACITY_GBPS = KpiSampleType.KPISAMPLETYPE_LINK_USED_CAPACITY_GBPS # 302
grpc_to_enum__kpi_sample_type = functools.partial(
grpc_to_enum, KpiSampleType, ORM_KpiSampleTypeEnum)
...@@ -26,15 +26,15 @@ def create_kpi_id_request():
def create_kpi_descriptor_request(descriptor_name: str = "Test_name"):
_create_kpi_request = kpi_manager_pb2.KpiDescriptor()
_create_kpi_request.kpi_id.kpi_id.uuid = str(uuid.uuid4())
# _create_kpi_request.kpi_id.kpi_id.uuid = str(uuid.uuid4())
# _create_kpi_request.kpi_id.kpi_id.uuid = "6e22f180-ba28-4641-b190-2287bf448888"
# _create_kpi_request.kpi_id.kpi_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666"
_create_kpi_request.kpi_id.kpi_id.uuid = "f974b6cc-095f-4767-b8c1-3457b383fb99"
_create_kpi_request.kpi_description = descriptor_name
_create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED
_create_kpi_request.device_id.device_uuid.uuid = 'DEV2'
_create_kpi_request.device_id.device_uuid.uuid = str(uuid.uuid4())
_create_kpi_request.service_id.service_uuid.uuid = 'SERV2'
_create_kpi_request.slice_id.slice_uuid.uuid = 'SLC1'
_create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END1'
_create_kpi_request.endpoint_id.endpoint_uuid.uuid = str(uuid.uuid4())
_create_kpi_request.connection_id.connection_uuid.uuid = 'CON1'
_create_kpi_request.link_id.link_uuid.uuid = 'LNK1'
return _create_kpi_request
...@@ -77,4 +77,4 @@ def create_kpi_filter_request():
_create_kpi_filter_request.connection_id.append(connection_id_obj)
_create_kpi_filter_request.link_id.append(link_id_obj)
return _create_kpi_filter_request
\ No newline at end of file
...@@ -15,25 +15,21 @@
import json
import logging
import threading
from confluent_kafka import KafkaError
from confluent_kafka import Consumer as KafkaConsumer
from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
from common.proto.kpi_value_api_pb2 import KpiValue
from common.proto.kpi_manager_pb2 import KpiDescriptor, KpiId
from common.Settings import get_service_port_grpc
from common.Constants import ServiceNameEnum
from common.tools.service.GenericGrpcService import GenericGrpcService
from confluent_kafka import KafkaError
from confluent_kafka import Consumer as KafkaConsumer
from kpi_manager.client.KpiManagerClient import KpiManagerClient
# -- test import --
# from kpi_value_writer.tests.test_messages import create_kpi_descriptor_request
from .MetricWriterToPrometheus import MetricWriterToPrometheus
LOGGER = logging.getLogger(__name__)
ACTIVE_CONSUMERS = []
class KpiValueWriter(GenericGrpcService):
def __init__(self, cls_name : str = __name__) -> None:
...@@ -43,9 +39,8 @@ class KpiValueWriter(GenericGrpcService):
'group.id' : 'KpiValueWriter',
'auto.offset.reset' : 'latest'})
def RunKafkaConsumer(self):
def install_servicers(self):
thread = threading.Thread(target=self.KafkaKpiConsumer, args=())
ACTIVE_CONSUMERS.append(thread)
thread.start()
def KafkaKpiConsumer(self):
...@@ -55,7 +50,6 @@ class KpiValueWriter(GenericGrpcService):
consumer = self.kafka_consumer
consumer.subscribe([KafkaTopic.VALUE.value])
LOGGER.debug("Kafka Consumer start listenng on topic: {:}".format(KafkaTopic.VALUE.value))
print("Kafka Consumer start listenng on topic: {:}".format(KafkaTopic.VALUE.value))
while True:
raw_kpi = consumer.poll(1.0)
if raw_kpi is None:
...@@ -69,30 +63,21 @@
try:
kpi_value = json.loads(raw_kpi.value().decode('utf-8'))
LOGGER.info("Received KPI : {:}".format(kpi_value))
print("Received KPI : {:}".format(kpi_value))
self.get_kpi_descriptor(kpi_value, kpi_manager_client, metric_writer)
except Exception as e:
print("Error detail: {:}".format(e))
except:
LOGGER.exception("Error detail: ")
continue
def get_kpi_descriptor(self, kpi_value: str, kpi_manager_client, metric_writer):
print("--- START -----")
kpi_id = KpiId()
kpi_id.kpi_id.uuid = kpi_value['kpi_uuid']
kpi_id.kpi_id.uuid = kpi_value['kpi_id'] # type: ignore
print("KpiId generated: {:}".format(kpi_id))
# print("Kpi manger client created: {:}".format(kpi_manager_client))
try:
kpi_descriptor_object = KpiDescriptor()
kpi_descriptor_object = kpi_manager_client.GetKpiDescriptor(kpi_id)
# TODO: why kpi_descriptor_object recevies a KpiDescriptor type object not Empty type object???
if kpi_descriptor_object.kpi_id.kpi_id.uuid == kpi_id.kpi_id.uuid:
LOGGER.info("Extracted KpiDescriptor: {:}".format(kpi_descriptor_object))
print("Extracted KpiDescriptor: {:}".format(kpi_descriptor_object))
metric_writer.create_and_expose_cooked_kpi(kpi_descriptor_object, kpi_value)
else:
LOGGER.info("No KPI Descriptor found in DB for Kpi ID: {:}".format(kpi_id))
LOGGER.info("No KPI Descriptor found in Database for Kpi ID: {:}".format(kpi_id))
print("No KPI Descriptor found in DB for Kpi ID: {:}".format(kpi_id))
except Exception as e:
LOGGER.info("Unable to get KpiDescriptor. Error: {:}".format(e))
print ("Unable to get KpiDescriptor. Error: {:}".format(e))
...@@ -14,15 +14,20 @@
# read Kafka stream from Kafka topic
import os
import logging
from prometheus_client import Gauge
from common.proto.kpi_sample_types_pb2 import KpiSampleType
from common.proto.kpi_value_api_pb2 import KpiValue
from common.proto.kpi_manager_pb2 import KpiDescriptor
from prometheus_client import Gauge
from prometheus_client.exposition import push_to_gateway
from prometheus_client.registry import CollectorRegistry
from common.proto.kpi_sample_types_pb2 import KpiSampleType
from common.proto.kpi_value_api_pb2 import KpiValue
from common.proto.kpi_manager_pb2 import KpiDescriptor
LOGGER = logging.getLogger(__name__)
PROM_METRICS = {}
GATEWAY_URL = os.getenv('PUSHGATEWAY_URL', 'prometheus-pushgateway.monitoring.svc.cluster.local:9091')
class MetricWriterToPrometheus:
'''
...@@ -30,7 +35,9 @@ class MetricWriterToPrometheus:
cooked KPI value = KpiDescriptor (gRPC message) + KpiValue (gRPC message)
'''
def __init__(self):
pass
self.job_name = 'kpivaluewriter'
self.registry = CollectorRegistry()
self.gateway_url = GATEWAY_URL
def merge_kpi_descriptor_and_kpi_value(self, kpi_descriptor, kpi_value):
# Creating a dictionary from the kpi_descriptor's attributes
...@@ -44,26 +51,27 @@
'slice_id' : kpi_descriptor.slice_id.slice_uuid.uuid,
'connection_id' : kpi_descriptor.connection_id.connection_uuid.uuid,
'link_id' : kpi_descriptor.link_id.link_uuid.uuid,
'time_stamp' : kpi_value.timestamp.timestamp,
'kpi_value' : kpi_value.kpi_value_type.floatVal
'time_stamp' : kpi_value["time_stamp"],
'kpi_value' : kpi_value["kpi_value"]
}
LOGGER.debug("Cooked Kpi: {:}".format(cooked_kpi))
return cooked_kpi
def create_and_expose_cooked_kpi(self, kpi_descriptor: KpiDescriptor, kpi_value: KpiValue):
# merge both gRPC messages into single varible.
cooked_kpi = self.merge_kpi_descriptor_and_kpi_value(kpi_descriptor, kpi_value)
tags_to_exclude = {'kpi_description', 'kpi_sample_type', 'kpi_value'}
metric_tags = [tag for tag in cooked_kpi.keys() if tag not in tags_to_exclude] # These values will be used as metric tags
metric_name = cooked_kpi['kpi_sample_type']
try:
if metric_name not in PROM_METRICS: # Only register the metric, when it doesn't exists
PROM_METRICS[metric_name] = Gauge (
metric_name,
cooked_kpi['kpi_description'],
metric_tags
metric_tags,
registry=self.registry
)
LOGGER.debug("Metric is created with labels: {:}".format(metric_tags))
PROM_METRICS[metric_name].labels(
kpi_id = cooked_kpi['kpi_id'],
device_id = cooked_kpi['device_id'],
...@@ -74,7 +82,11 @@
link_id = cooked_kpi['link_id'],
time_stamp = cooked_kpi['time_stamp'],
).set(float(cooked_kpi['kpi_value']))
LOGGER.debug("Metric pushed to the endpoints: {:}".format(PROM_METRICS[metric_name]))
LOGGER.debug("Metric is being pushed to the Gateway ... : {:}".format(PROM_METRICS[metric_name]))
# Push to the Prometheus Gateway, Prometheus is preconfigured to scrap the metrics from the gateway
push_to_gateway(self.gateway_url, job=self.job_name, registry=self.registry)
LOGGER.debug("Metric pushed to Prometheus Gateway.")
except ValueError as e:
if 'Duplicated timeseries' in str(e):
...
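For reference, a minimal sketch of the Pushgateway flow used above (the gateway address is the GATEWAY_URL default from this diff; the metric name, labels, and values are illustrative only):

from prometheus_client import CollectorRegistry, Gauge
from prometheus_client.exposition import push_to_gateway

registry = CollectorRegistry()
g = Gauge('PACKETS_RECEIVED', 'example KPI gauge', ['kpi_id', 'device_id'], registry=registry)
g.labels(kpi_id='f974b6cc-095f-4767-b8c1-3457b383fb99', device_id='DEV2').set(123.0)
# Push the registry to the Pushgateway; Prometheus scrapes the gateway (see extraScrapeConfigs above).
push_to_gateway('prometheus-pushgateway.monitoring.svc.cluster.local:9091', job='kpivaluewriter', registry=registry)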
...@@ -13,7 +13,6 @@
# limitations under the License.
import logging, signal, sys, threading
from prometheus_client import start_http_server
from kpi_value_writer.service.KpiValueWriter import KpiValueWriter
from common.Settings import get_log_level
...@@ -39,8 +38,6 @@ def main():
grpc_service = KpiValueWriter()
grpc_service.start()
start_http_server(10808)
LOGGER.debug("Prometheus client is started on port 10808")
# Wait for Ctrl+C or termination signal
while not terminate.wait(timeout=1.0): pass
...
...@@ -12,14 +12,35 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import pytest
import time
import logging
from kpi_value_writer.service.KpiValueWriter import KpiValueWriter
from kpi_manager.client.KpiManagerClient import KpiManagerClient
from common.tools.kafka.Variables import KafkaTopic
from test_messages import create_kpi_descriptor_request
LOGGER = logging.getLogger(__name__)
# -------- Fixtures ----------------
@pytest.fixture(autouse=True)
def log_all_methods(request):
'''
This fixture logs messages before and after each test function runs, indicating the start and end of the test.
The autouse=True parameter ensures that this logging happens automatically for all tests in the module.
'''
LOGGER.info(f" >>>>> Starting test: {request.node.name} ")
yield
LOGGER.info(f" <<<<< Finished test: {request.node.name} ")
# @pytest.fixture(scope='module')
# def kpi_manager_client():
# LOGGER.debug("Yielding KpiManagerClient ...")
# yield KpiManagerClient(host="10.152.183.203")
# LOGGER.debug("KpiManagerClient is terminated.")
LOGGER = logging.getLogger(__name__)
# -------- Initial Test ----------------
def test_validate_kafka_topics():
...@@ -27,7 +48,15 @@ def test_validate_kafka_topics():
response = KafkaTopic.create_all_topics()
assert isinstance(response, bool)
def test_KafkaConsumer():
LOGGER.debug(" --->>> test_kafka_consumer: START <<<--- ")
# kpi_value_writer = KpiValueWriter()
# kpi_value_writer.RunKafkaConsumer()
# --------------
# NOT FOR GITHUB PIPELINE (Local testing only)
# --------------
# def test_KafkaConsumer(kpi_manager_client):
# # kpidescriptor = create_kpi_descriptor_request()
# # kpi_manager_client.SetKpiDescriptor(kpidescriptor)
# kpi_value_writer = KpiValueWriter()
# kpi_value_writer.KafkaKpiConsumer()
# LOGGER.debug(" waiting for timer to finish ")
# time.sleep(300)
...@@ -25,7 +25,8 @@ def create_kpi_id_request():
def create_kpi_descriptor_request(description: str = "Test Description"):
_create_kpi_request = kpi_manager_pb2.KpiDescriptor()
_create_kpi_request.kpi_id.kpi_id.uuid = str(uuid.uuid4())
# _create_kpi_request.kpi_id.kpi_id.uuid = str(uuid.uuid4())
_create_kpi_request.kpi_id.kpi_id.uuid = "efef4d95-1cf1-43c4-9742-95c283dddddd"
_create_kpi_request.kpi_description = description
_create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED
_create_kpi_request.device_id.device_uuid.uuid = 'DEV4'
...