
Compare revisions

Changes are shown as if the source revision was being merged into the target revision.

Target project: tfs/controller
Commits on Source (6)
Showing 531 additions and 70 deletions
......@@ -179,5 +179,3 @@ libyang/
# Other logs
**/logs/*.log.*
# PySpark checkpoints
src/analytics/.spark/*
......@@ -215,6 +215,9 @@ export GRAF_EXT_PORT_HTTP=${GRAF_EXT_PORT_HTTP:-"3000"}
# Deploy Apache Kafka
./deploy/kafka.sh
# Deploy Monitoring (Prometheus, Mimir, Grafana)
./deploy/monitoring.sh
# Expose Dashboard
./deploy/expose_dashboard.sh
......
#!/bin/bash
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
set -euo pipefail
# -----------------------------------------------------------
# Global namespace for all deployments
# -----------------------------------------------------------
NAMESPACE="monitoring"
VALUES_FILE_PATH="manifests/monitoring"
# -----------------------------------------------------------
# Prometheus Configuration
# -----------------------------------------------------------
RELEASE_NAME_PROM="mon-prometheus"
CHART_REPO_NAME_PROM="prometheus-community"
CHART_REPO_URL_PROM="https://prometheus-community.github.io/helm-charts"
CHART_NAME_PROM="prometheus"
VALUES_FILE_PROM="$VALUES_FILE_PATH/prometheus_values.yaml"
# -----------------------------------------------------------
# Mimir Configuration
# -----------------------------------------------------------
RELEASE_NAME_MIMIR="mon-mimir"
CHART_REPO_NAME_MIMIR="grafana"
CHART_REPO_URL_MIMIR="https://grafana.github.io/helm-charts"
CHART_NAME_MIMIR="mimir-distributed"
VALUES_FILE_MIMIR="$VALUES_FILE_PATH/mimir_values.yaml"
# -----------------------------------------------------------
# Grafana Configuration
# -----------------------------------------------------------
# RELEASE_NAME_GRAFANA="mon-grafana"
# CHART_REPO_NAME_GRAFANA="grafana"
# CHART_REPO_URL_GRAFANA="https://grafana.github.io/helm-charts"
# CHART_NAME_GRAFANA="grafana"
# VALUES_FILE_GRAFANA="$VALUES_FILE_PATH/grafana_values.yaml"
# -----------------------------------------------------------
# Function to deploy or upgrade a Helm chart
# -----------------------------------------------------------
deploy_chart() {
    local release_name="$1"
    local chart_repo_name="$2"
    local chart_repo_url="$3"
    local chart_name="$4"
    local values_file="$5"
    local namespace="$6"

    echo ">>> Deploying [${release_name}] from repo [${chart_repo_name}]..."

    # Add or update the Helm repo
    echo "Adding/updating Helm repo: $chart_repo_name -> $chart_repo_url"
    helm repo add "$chart_repo_name" "$chart_repo_url" || true
    helm repo update

    # Create namespace if needed
    echo "Creating namespace '$namespace' if it doesn't exist..."
    kubectl get namespace "$namespace" >/dev/null 2>&1 || kubectl create namespace "$namespace"

    # Install or upgrade the chart
    if [ -n "$values_file" ] && [ -f "$values_file" ]; then
        echo "Installing/Upgrading $release_name using custom values from $values_file..."
        helm upgrade --install "$release_name" "$chart_repo_name/$chart_name" \
            --namespace "$namespace" \
            --values "$values_file"
    else
        echo "Installing/Upgrading $release_name with default chart values..."
        helm upgrade --install "$release_name" "$chart_repo_name/$chart_name" \
            --namespace "$namespace"
    fi

    echo "<<< Deployment initiated for [$release_name]."
    echo
}
# -----------------------------------------------------------
# Actual Deployments
# -----------------------------------------------------------
# 1) Deploy Prometheus
deploy_chart "$RELEASE_NAME_PROM" \
"$CHART_REPO_NAME_PROM" \
"$CHART_REPO_URL_PROM" \
"$CHART_NAME_PROM" \
"$VALUES_FILE_PROM" \
"$NAMESPACE"
# Optionally wait for Prometheus server pod to become ready
kubectl rollout status deployment/"$RELEASE_NAME_PROM-server" -n "$NAMESPACE" || true
# 2) Deploy Mimir
deploy_chart "$RELEASE_NAME_MIMIR" \
"$CHART_REPO_NAME_MIMIR" \
"$CHART_REPO_URL_MIMIR" \
"$CHART_NAME_MIMIR" \
"$VALUES_FILE_MIMIR" \
"$NAMESPACE"
# Depending on how Mimir runs (StatefulSets, Deployments), you can wait for
# the correct resource to be ready. For example:
# kubectl rollout status statefulset/"$RELEASE_NAME_MIMIR-distributor" -n "$NAMESPACE" || true
# 3) Deploy Grafana
# deploy_chart "$RELEASE_NAME_GRAFANA" \
# "$CHART_REPO_NAME_GRAFANA" \
# "$CHART_REPO_URL_GRAFANA" \
# "$CHART_NAME_GRAFANA" \
# "$VALUES_FILE_GRAFANA" \
# "$NAMESPACE"
# kubectl rollout status deployment/"$RELEASE_NAME_GRAFANA" -n "$NAMESPACE" || true
# -----------------------------------------------------------
echo "All deployments completed!"
......@@ -39,6 +39,8 @@ spec:
env:
  - name: LOG_LEVEL
    value: "INFO"
  - name: PUSHGATEWAY_URL
    value: "http://mon-prometheus-prometheus-pushgateway.monitoring.svc.cluster.local:9091"
envFrom:
  - secretRef:
      name: kfk-kpi-data
......
rbac:
  create: true
  ## Use an existing ClusterRole/Role (depending on rbac.namespaced false/true)
  # useExistingRole: name-of-some-role
  # useExistingClusterRole: name-of-some-clusterRole
  pspEnabled: false
  pspUseAppArmor: false
  namespaced: false

serviceAccount:
  create: true
  name:
  nameTest:
  ## ServiceAccount labels.
  automountServiceAccountToken: false

replicas: 1

## Create a headless service for the deployment
headlessService: false

## Should the service account be auto mounted on the pod
automountServiceAccountToken: true

## Create HorizontalPodAutoscaler object for deployment type
#
autoscaling:
  enabled: false
  minReplicas: 1
  maxReplicas: 3
  targetCPU: "60"
  targetMemory: ""
  behavior: {}

deploymentStrategy:
  type: RollingUpdate

readinessProbe:
  httpGet:
    path: /api/health
    port: 3000

livenessProbe:
  httpGet:
    path: /api/health
    port: 3000
  initialDelaySeconds: 60
  timeoutSeconds: 30
  failureThreshold: 10

image:
  registry: docker.io
  repository: grafana/grafana
  # Overrides the Grafana image tag whose default is the chart appVersion
  tag: ""
  sha: ""
  pullPolicy: IfNotPresent

  ## Optionally specify an array of imagePullSecrets.
  ## Secrets must be manually created in the namespace.
  ## ref: https://kubernetes.io/docs/tasks/configure-pod-container/pull-image-private-registry/
  ## Can be templated.
  ##
  pullSecrets: []
  #   - myRegistrKeySecretName

testFramework:
  enabled: true
  ## The type of Helm hook used to run this test. Defaults to test.
  ## ref: https://helm.sh/docs/topics/charts_hooks/#the-available-hooks
  ##
  # hookType: test
  image:
    # -- The Docker registry
    registry: docker.io
    repository: bats/bats
    tag: "v1.4.1"
  imagePullPolicy: IfNotPresent

# dns configuration for pod
dnsPolicy: ~
dnsConfig: {}
  # nameservers:
  #   - 8.8.8.8
  # options:
  #   - name: ndots
  #     value: "2"
  #   - name: edns0

securityContext:
  runAsNonRoot: true
  runAsUser: 472
  runAsGroup: 472
  fsGroup: 472

containerSecurityContext:
  allowPrivilegeEscalation: false
  capabilities:
    drop:
      - ALL
  seccompProfile:
    type: RuntimeDefault

# Enable creating the grafana configmap
createConfigmap: true

downloadDashboardsImage:
  registry: docker.io
  repository: curlimages/curl
  tag: 8.9.1
  sha: ""
  pullPolicy: IfNotPresent

downloadDashboards:
  env: {}
  envFromSecret: ""
  resources: {}
  securityContext:
    allowPrivilegeEscalation: false
    capabilities:
      drop:
        - ALL
    seccompProfile:
      type: RuntimeDefault
  envValueFrom: {}
  #  ENV_NAME:
  #    configMapKeyRef:
  #      name: configmap-name
  #      key: value_key

## Pod Annotations
# podAnnotations: {}

## ConfigMap Annotations
# configMapAnnotations: {}
  # argocd.argoproj.io/sync-options: Replace=true

## Pod Labels
# podLabels: {}

podPortName: grafana
gossipPortName: gossip

## Deployment annotations
# annotations: {}

service:
  enabled: true
  type: NodePort
  port: 80
  targetPort: 3000
  nodePort: 30080
  portName: service

## Enable persistence using Persistent Volume Claims
## ref: https://kubernetes.io/docs/user-guide/persistent-volumes/
##
persistence:
  type: pvc
  enabled: true
  # storageClassName: default
  accessModes:
    - ReadWriteOnce
  size: 10Gi
  # annotations: {}
  finalizers:
    - kubernetes.io/pvc-protection
  disableWarning: false
  ## If 'lookupVolumeName' is set to true, Helm will attempt to retrieve
  ## the current value of 'spec.volumeName' and incorporate it into the template.
  lookupVolumeName: true

# Administrator credentials when not using an existing secret (see below)
adminUser: admin
# adminPassword: strongpassword

# Use an existing secret for the admin user.
admin:
  ## Name of the secret. Can be templated.
  existingSecret: ""
  userKey: admin-user
  passwordKey: admin-password

## Configure grafana datasources
## ref: http://docs.grafana.org/administration/provisioning/#datasources
##
datasources:
  datasources.yaml:
    apiVersion: 1
    datasources:
      - name: Prometheus
        type: prometheus
        url: http://mon-prometheus-server.monitoring.svc.cluster.local
        access: proxy
        isDefault: true
      - name: Mimir
        type: prometheus
        url: http://mimir-nginx.mon-mimir.svc:80/prometheus
        access: proxy
        isDefault: false

## Grafana's primary configuration
## NOTE: values in map will be converted to ini format
## ref: http://docs.grafana.org/installation/configuration/
##
grafana.ini:
  paths:
    data: /var/lib/grafana/
    logs: /var/log/grafana
    plugins: /var/lib/grafana/plugins
    provisioning: /etc/grafana/provisioning
  analytics:
    check_for_updates: true
  log:
    mode: console
  grafana_net:
    url: https://grafana.net
  server:
    domain: "{{ if (and .Values.ingress.enabled .Values.ingress.hosts) }}{{ tpl (.Values.ingress.hosts | first) . }}{{ else }}''{{ end }}"

## Number of old ReplicaSets to retain
##
revisionHistoryLimit: 5

# assertNoLeakedSecrets is a helper function defined in _helpers.tpl that checks if secret
# values are not exposed in the rendered grafana.ini configmap. It is enabled by default.
#
# To pass values into grafana.ini without exposing them in a configmap, use variable expansion:
# https://grafana.com/docs/grafana/latest/setup-grafana/configure-grafana/#variable-expansion
#
# Alternatively, if you wish to allow secret values to be exposed in the rendered grafana.ini configmap,
# you can disable this check by setting assertNoLeakedSecrets to false.
assertNoLeakedSecrets: true
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Configuration for Prometheus components and server settings

# Global Prometheus configuration
alertmanager:
  enabled: false              # Default is true
kube-state-metrics:
  enabled: false              # Default is true
prometheus-node-exporter:
  enabled: false              # Default is true
prometheus-pushgateway:
  enabled: true               # Default is true

# Prometheus server-specific configuration
server:
  retention: "30d"
  logLevel: "debug"

  resources:
    requests:
      cpu: "250m"
      memory: "256Mi"
    limits:
      cpu: "1"
      memory: "1Gi"

  # Expose the Prometheus server via a Kubernetes service
  service:
    type: NodePort
    nodePort: 30090

extraScrapeConfigs:
  - job_name: 'pushgateway'
    static_configs:
      - targets:
          - 'prometheus-pushgateway.monitoring.svc.cluster.local:9091'  # Push Gateway endpoint

# Global Prometheus settings:
global:
  scrape_interval: 10s
  evaluation_interval: 10s
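
For reference, a minimal sketch (illustration only, not part of the changeset) of how KPIs pushed through the Pushgateway can be read back once Prometheus has scraped the 'pushgateway' job configured above; the host address and the PromQL expression are placeholders.

# Hypothetical end-to-end check: query Prometheus' HTTP API for a series scraped from the
# Pushgateway job defined in extraScrapeConfigs. Host/port and the query are examples only.
import requests

PROMETHEUS_URL = 'http://127.0.0.1:30090'   # NodePort 30090 from the values above; the host is a placeholder

response = requests.get(
    f'{PROMETHEUS_URL}/api/v1/query',
    params={'query': 'up{job="pushgateway"}'},   # any PromQL expression can be used here
    timeout=5,
)
response.raise_for_status()
for result in response.json().get('data', {}).get('result', []):
    print(result['metric'], result['value'])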
......@@ -19,6 +19,7 @@ PROJECTDIR=`pwd`
cd $PROJECTDIR/src
export KFK_SERVER_ADDRESS='127.0.0.1:9092'
RCFILE=$PROJECTDIR/coverage/.coveragerc
python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \
kpi_value_writer/tests/test_kpi_value_writer.py
......@@ -25,5 +25,5 @@ export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_telemetr
RCFILE=$PROJECTDIR/coverage/.coveragerc
python3 -m pytest --log-level=debug --log-cli-level=debug --verbose \
python3 -m pytest --log-level=INFO --log-cli-level=INFO --verbose \
telemetry/backend/tests/test_backend.py
......@@ -18,10 +18,11 @@ PROJECTDIR=`pwd`
cd $PROJECTDIR/src
# python3 kpi_manager/tests/test_unitary.py
export KFK_SERVER_ADDRESS='127.0.0.1:9092'
CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace crdb -o jsonpath='{.spec.clusterIP}')
export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_telemetry?sslmode=require"
RCFILE=$PROJECTDIR/coverage/.coveragerc
python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \
python3 -m pytest --log-level=INFO --log-cli-level=INFO --verbose \
telemetry/frontend/tests/test_frontend.py
......@@ -16,8 +16,11 @@ import logging, signal, sys, threading
from prometheus_client import start_http_server
from common.Settings import get_log_level, get_metrics_port
from .AnalyticsBackendService import AnalyticsBackendService
from common.tools.kafka.Variables import KafkaTopic
terminate = threading.Event()
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
LOGGER = None
def signal_handler(signal, frame): # pylint: disable=redefined-outer-name
......@@ -36,6 +39,8 @@ def main():
    LOGGER.info('Starting...')
    KafkaTopic.create_all_topics()
    # Start metrics server
    metrics_port = get_metrics_port()
    start_http_server(metrics_port)
......
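
The metrics-server lines added above use prometheus_client's start_http_server(); a self-contained sketch of that exposure pattern follows (the port and metric name are illustrative, not TFS settings).

# start_http_server() serves a /metrics endpoint on the given port; every metric created in
# the process through prometheus_client is published there. Port and metric name are examples.
import time
from prometheus_client import Counter, start_http_server

requests_total = Counter('example_requests_total', 'Example counter exposed at /metrics')

if __name__ == '__main__':
    start_http_server(9192)   # metrics now reachable at http://localhost:9192/metrics
    while True:
        requests_total.inc()
        time.sleep(1.0)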
......@@ -18,8 +18,11 @@ from common.Settings import get_log_level, get_metrics_port
from .AnalyticsFrontendService import AnalyticsFrontendService
from analytics.database.AnalyzerModel import Analyzer as Model
from common.tools.database.GenericDatabase import Database
from common.tools.kafka.Variables import KafkaTopic
terminate = threading.Event()
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
LOGGER = None
def signal_handler(signal, frame): # pylint: disable=redefined-outer-name
......@@ -43,6 +46,8 @@ def main():
    kpiDBobj.create_database()
    kpiDBobj.create_tables()
    KafkaTopic.create_all_topics()
    # Start metrics server
    metrics_port = get_metrics_port()
    start_http_server(metrics_port)
......
......@@ -56,7 +56,7 @@ def delay_linear(initial=0, increment=0, maximum=None):
        return delay
    return compute

def delay_exponential(initial=1, increment=1, maximum=None):
def delay_exponential(initial=1.0, increment=1.0, maximum=None):
    def compute(num_try):
        delay = initial * pow(increment, (num_try - 1))
        if maximum is not None:
......
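
For context on the signature change above, a small usage sketch of how the exponential delay grows and is capped; the completion of the body after the truncated "if maximum is not None:" line is assumed, and the sample values are arbitrary.

# Assumed completion of the compute() helper shown above, plus a usage example.
def delay_exponential(initial=1.0, increment=1.0, maximum=None):
    def compute(num_try):
        delay = initial * pow(increment, (num_try - 1))
        if maximum is not None:
            delay = min(delay, maximum)   # assumed: cap the delay at 'maximum'
        return delay
    return compute

compute = delay_exponential(initial=1.0, increment=2.0, maximum=30.0)
print([compute(n) for n in range(1, 7)])   # [1.0, 2.0, 4.0, 8.0, 16.0, 30.0]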
......@@ -33,8 +33,8 @@ class Engine:
            CRDB_USERNAME, CRDB_PASSWORD, CRDB_NAMESPACE, CRDB_SQL_PORT, CRDB_DATABASE, CRDB_SSLMODE)
        try:
            engine = sqlalchemy.create_engine(crdb_uri, echo=False)
            LOGGER.info(' AnalyzerDB initialized with DB URL: {:}'.format(crdb_uri))
            LOGGER.info(' Database initialized with DB URL: {:}'.format(crdb_uri))
            return engine
        except: # pylint: disable=bare-except # pragma: no cover
            LOGGER.exception('Failed to connect to database: {:s}'.format(str(crdb_uri)))
            return None # type: ignore
        return engine
......@@ -22,13 +22,16 @@ from ._GrpcToEnum import grpc_to_enum
# BYTES_RECEIVED. If item name does not match, automatic mapping of
# proto enums to database enums will fail.
class ORM_KpiSampleTypeEnum(enum.Enum):
    UNKNOWN                  = KpiSampleType.KPISAMPLETYPE_UNKNOWN
    PACKETS_TRANSMITTED      = KpiSampleType.KPISAMPLETYPE_PACKETS_TRANSMITTED
    PACKETS_RECEIVED         = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED
    BYTES_TRANSMITTED        = KpiSampleType.KPISAMPLETYPE_BYTES_TRANSMITTED
    BYTES_RECEIVED           = KpiSampleType.KPISAMPLETYPE_BYTES_RECEIVED
    LINK_TOTAL_CAPACITY_GBPS = KpiSampleType.KPISAMPLETYPE_LINK_TOTAL_CAPACITY_GBPS
    LINK_USED_CAPACITY_GBPS  = KpiSampleType.KPISAMPLETYPE_LINK_USED_CAPACITY_GBPS
    UNKNOWN                  = KpiSampleType.KPISAMPLETYPE_UNKNOWN                  # 0
    PACKETS_TRANSMITTED      = KpiSampleType.KPISAMPLETYPE_PACKETS_TRANSMITTED      # 101
    PACKETS_RECEIVED         = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED         # 102
    PACKETS_DROPPED          = KpiSampleType.KPISAMPLETYPE_PACKETS_DROPPED          # 103
    BYTES_TRANSMITTED        = KpiSampleType.KPISAMPLETYPE_BYTES_TRANSMITTED        # 201
    BYTES_RECEIVED           = KpiSampleType.KPISAMPLETYPE_BYTES_RECEIVED           # 202
    BYTES_DROPPED            = KpiSampleType.KPISAMPLETYPE_BYTES_DROPPED            # 203
    LINK_TOTAL_CAPACITY_GBPS = KpiSampleType.KPISAMPLETYPE_LINK_TOTAL_CAPACITY_GBPS # 301
    LINK_USED_CAPACITY_GBPS  = KpiSampleType.KPISAMPLETYPE_LINK_USED_CAPACITY_GBPS  # 302

grpc_to_enum__kpi_sample_type = functools.partial(
    grpc_to_enum, KpiSampleType, ORM_KpiSampleTypeEnum)
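
The comment above warns that the ORM member names must match the proto enum item names; below is a standalone illustration of that name-based lookup, using stand-in enums and an assumed prefix-stripping behaviour (the real grpc_to_enum helper may differ in detail).

# Illustration only: the mapping is assumed to resolve a proto enum value to the ORM enum
# by name, so a missing or misspelled ORM member makes the lookup fail.
import enum

class FakeProtoKpiSampleType(enum.IntEnum):     # stand-in for the generated proto enum
    KPISAMPLETYPE_UNKNOWN        = 0
    KPISAMPLETYPE_BYTES_RECEIVED = 202

class FakeOrmEnum(enum.Enum):                   # stand-in for ORM_KpiSampleTypeEnum
    UNKNOWN        = FakeProtoKpiSampleType.KPISAMPLETYPE_UNKNOWN
    BYTES_RECEIVED = FakeProtoKpiSampleType.KPISAMPLETYPE_BYTES_RECEIVED

def map_by_name(proto_value: FakeProtoKpiSampleType) -> FakeOrmEnum:
    item_name = proto_value.name.replace('KPISAMPLETYPE_', '')   # e.g. 'BYTES_RECEIVED'
    return FakeOrmEnum[item_name]                                # KeyError if names diverge

assert map_by_name(FakeProtoKpiSampleType.KPISAMPLETYPE_BYTES_RECEIVED) is FakeOrmEnum.BYTES_RECEIVED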
......@@ -26,15 +26,15 @@ def create_kpi_id_request():
def create_kpi_descriptor_request(descriptor_name: str = "Test_name"):
    _create_kpi_request = kpi_manager_pb2.KpiDescriptor()
    _create_kpi_request.kpi_id.kpi_id.uuid = str(uuid.uuid4())
    # _create_kpi_request.kpi_id.kpi_id.uuid = str(uuid.uuid4())
    # _create_kpi_request.kpi_id.kpi_id.uuid = "6e22f180-ba28-4641-b190-2287bf448888"
    # _create_kpi_request.kpi_id.kpi_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666"
    _create_kpi_request.kpi_id.kpi_id.uuid = "f974b6cc-095f-4767-b8c1-3457b383fb99"
    _create_kpi_request.kpi_description = descriptor_name
    _create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED
    _create_kpi_request.device_id.device_uuid.uuid = 'DEV2'
    _create_kpi_request.device_id.device_uuid.uuid = str(uuid.uuid4())
    _create_kpi_request.service_id.service_uuid.uuid = 'SERV2'
    _create_kpi_request.slice_id.slice_uuid.uuid = 'SLC1'
    _create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END1'
    _create_kpi_request.slice_id.slice_uuid.uuid = 'SLC1'
    _create_kpi_request.endpoint_id.endpoint_uuid.uuid = str(uuid.uuid4())
    _create_kpi_request.connection_id.connection_uuid.uuid = 'CON1'
    _create_kpi_request.link_id.link_uuid.uuid = 'LNK1'
    return _create_kpi_request
......@@ -77,4 +77,4 @@ def create_kpi_filter_request():
    _create_kpi_filter_request.connection_id.append(connection_id_obj)
    _create_kpi_filter_request.link_id.append(link_id_obj)
    return _create_kpi_filter_request
\ No newline at end of file
    return _create_kpi_filter_request
......@@ -15,25 +15,21 @@
import json
import logging
import threading
from confluent_kafka import KafkaError
from confluent_kafka import Consumer as KafkaConsumer
from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
from common.proto.kpi_value_api_pb2 import KpiValue
from common.proto.kpi_manager_pb2 import KpiDescriptor, KpiId
from common.Settings import get_service_port_grpc
from common.Constants import ServiceNameEnum
from common.tools.service.GenericGrpcService import GenericGrpcService
from confluent_kafka import KafkaError
from confluent_kafka import Consumer as KafkaConsumer
from kpi_manager.client.KpiManagerClient import KpiManagerClient
# -- test import --
# from kpi_value_writer.tests.test_messages import create_kpi_descriptor_request
from .MetricWriterToPrometheus import MetricWriterToPrometheus
LOGGER = logging.getLogger(__name__)
ACTIVE_CONSUMERS = []
LOGGER = logging.getLogger(__name__)
class KpiValueWriter(GenericGrpcService):
    def __init__(self, cls_name : str = __name__) -> None:
......@@ -43,9 +39,8 @@ class KpiValueWriter(GenericGrpcService):
                                             'group.id'          : 'KpiValueWriter',
                                             'auto.offset.reset' : 'latest'})

    def install_servicers(self):
        thread = threading.Thread(target=self.KafkaKpiConsumer, args=())
        thread.start()

    def KafkaKpiConsumer(self):
......@@ -55,7 +50,6 @@ class KpiValueWriter(GenericGrpcService):
        consumer = self.kafka_consumer
        consumer.subscribe([KafkaTopic.VALUE.value])
        LOGGER.debug("Kafka Consumer started listening on topic: {:}".format(KafkaTopic.VALUE.value))
        print("Kafka Consumer started listening on topic: {:}".format(KafkaTopic.VALUE.value))
        while True:
            raw_kpi = consumer.poll(1.0)
            if raw_kpi is None:
......@@ -69,30 +63,21 @@ class KpiValueWriter(GenericGrpcService):
                try:
                    kpi_value = json.loads(raw_kpi.value().decode('utf-8'))
                    LOGGER.info("Received KPI : {:}".format(kpi_value))
                    print("Received KPI : {:}".format(kpi_value))
                    self.get_kpi_descriptor(kpi_value, kpi_manager_client, metric_writer)
                except Exception as e:
                    print("Error detail: {:}".format(e))
                except:
                    LOGGER.exception("Error detail: ")
                    continue
    def get_kpi_descriptor(self, kpi_value: str, kpi_manager_client, metric_writer):
        print("--- START -----")
        kpi_id = KpiId()
        kpi_id.kpi_id.uuid = kpi_value['kpi_uuid']
        print("KpiId generated: {:}".format(kpi_id))
        # print("Kpi manager client created: {:}".format(kpi_manager_client))
        kpi_id.kpi_id.uuid = kpi_value['kpi_id'] # type: ignore
        try:
            kpi_descriptor_object = KpiDescriptor()
            kpi_descriptor_object = kpi_manager_client.GetKpiDescriptor(kpi_id)
            # TODO: why does kpi_descriptor_object receive a KpiDescriptor type object, not an Empty type object???
            if kpi_descriptor_object.kpi_id.kpi_id.uuid == kpi_id.kpi_id.uuid:
                LOGGER.info("Extracted KpiDescriptor: {:}".format(kpi_descriptor_object))
                print("Extracted KpiDescriptor: {:}".format(kpi_descriptor_object))
                metric_writer.create_and_expose_cooked_kpi(kpi_descriptor_object, kpi_value)
            else:
                LOGGER.info("No KPI Descriptor found in DB for Kpi ID: {:}".format(kpi_id))
                print("No KPI Descriptor found in DB for Kpi ID: {:}".format(kpi_id))
                LOGGER.info("No KPI Descriptor found in Database for Kpi ID: {:}".format(kpi_id))
        except Exception as e:
            LOGGER.info("Unable to get KpiDescriptor. Error: {:}".format(e))
            print ("Unable to get KpiDescriptor. Error: {:}".format(e))
......@@ -14,15 +14,20 @@
# read Kafka stream from Kafka topic
import os
import logging
from prometheus_client import Gauge
from common.proto.kpi_sample_types_pb2 import KpiSampleType
from common.proto.kpi_value_api_pb2 import KpiValue
from common.proto.kpi_manager_pb2 import KpiDescriptor
from prometheus_client import Gauge
from prometheus_client.exposition import push_to_gateway
from prometheus_client.registry import CollectorRegistry
from common.proto.kpi_sample_types_pb2 import KpiSampleType
from common.proto.kpi_value_api_pb2 import KpiValue
from common.proto.kpi_manager_pb2 import KpiDescriptor
LOGGER = logging.getLogger(__name__)
PROM_METRICS = {}
LOGGER = logging.getLogger(__name__)
PROM_METRICS = {}
GATEWAY_URL = os.getenv('PUSHGATEWAY_URL', 'prometheus-pushgateway.monitoring.svc.cluster.local:9091')
class MetricWriterToPrometheus:
    '''
......@@ -30,7 +35,9 @@ class MetricWriterToPrometheus:
    cooked KPI value = KpiDescriptor (gRPC message) + KpiValue (gRPC message)
    '''
    def __init__(self):
        pass
        self.job_name    = 'kpivaluewriter'
        self.registry    = CollectorRegistry()
        self.gateway_url = GATEWAY_URL

    def merge_kpi_descriptor_and_kpi_value(self, kpi_descriptor, kpi_value):
        # Creating a dictionary from the kpi_descriptor's attributes
......@@ -44,26 +51,27 @@ class MetricWriterToPrometheus:
            'slice_id'      : kpi_descriptor.slice_id.slice_uuid.uuid,
            'connection_id' : kpi_descriptor.connection_id.connection_uuid.uuid,
            'link_id'       : kpi_descriptor.link_id.link_uuid.uuid,
            'time_stamp'    : kpi_value.timestamp.timestamp,
            'kpi_value'     : kpi_value.kpi_value_type.floatVal
            'time_stamp'    : kpi_value["time_stamp"],
            'kpi_value'     : kpi_value["kpi_value"]
        }
        LOGGER.debug("Cooked Kpi: {:}".format(cooked_kpi))
        return cooked_kpi

    def create_and_expose_cooked_kpi(self, kpi_descriptor: KpiDescriptor, kpi_value: KpiValue):
        # merge both gRPC messages into a single variable.
        cooked_kpi = self.merge_kpi_descriptor_and_kpi_value(kpi_descriptor, kpi_value)
        cooked_kpi = self.merge_kpi_descriptor_and_kpi_value(kpi_descriptor, kpi_value)
        tags_to_exclude = {'kpi_description', 'kpi_sample_type', 'kpi_value'}
        metric_tags = [tag for tag in cooked_kpi.keys() if tag not in tags_to_exclude]   # These values will be used as metric tags
        metric_name = cooked_kpi['kpi_sample_type']
        metric_tags = [tag for tag in cooked_kpi.keys() if tag not in tags_to_exclude]   # These values will be used as metric tags
        metric_name = cooked_kpi['kpi_sample_type']
        try:
            if metric_name not in PROM_METRICS:   # Only register the metric when it doesn't exist
                PROM_METRICS[metric_name] = Gauge (
                    metric_name,
                    cooked_kpi['kpi_description'],
                    metric_tags
                    metric_tags,
                    registry=self.registry
                )
            LOGGER.debug("Metric is created with labels: {:}".format(metric_tags))
            LOGGER.debug("Metric is created with labels: {:}".format(metric_tags))
            PROM_METRICS[metric_name].labels(
                kpi_id          = cooked_kpi['kpi_id'],
                device_id       = cooked_kpi['device_id'],
......@@ -74,7 +82,11 @@ class MetricWriterToPrometheus:
                link_id         = cooked_kpi['link_id'],
                time_stamp      = cooked_kpi['time_stamp'],
            ).set(float(cooked_kpi['kpi_value']))
            LOGGER.debug("Metric pushed to the endpoints: {:}".format(PROM_METRICS[metric_name]))
            LOGGER.debug("Metric is being pushed to the Gateway ... : {:}".format(PROM_METRICS[metric_name]))
            # Push to the Prometheus Gateway; Prometheus is preconfigured to scrape the metrics from the gateway
            push_to_gateway(self.gateway_url, job=self.job_name, registry=self.registry)
            LOGGER.debug("Metric pushed to Prometheus Gateway.")
        except ValueError as e:
            if 'Duplicated timeseries' in str(e):
......
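
A minimal, self-contained sketch of the Gauge + CollectorRegistry + push_to_gateway pattern introduced above; the metric name, labels, job name and default gateway address are examples, not values taken from this changeset.

# Register a Gauge in a private CollectorRegistry and push it to a Prometheus Pushgateway.
import os
from prometheus_client import CollectorRegistry, Gauge
from prometheus_client.exposition import push_to_gateway

registry = CollectorRegistry()
gauge = Gauge(
    'example_packets_received',        # metric name (hypothetical)
    'Example KPI gauge',               # help text
    ['kpi_id', 'device_id'],           # label names become Prometheus labels
    registry=registry,
)
gauge.labels(kpi_id='1234', device_id='DEV1').set(42.0)

# PUSHGATEWAY_URL mirrors the environment variable added in the manifest earlier in this changeset.
gateway = os.getenv('PUSHGATEWAY_URL', 'localhost:9091')
push_to_gateway(gateway, job='example-job', registry=registry)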
......@@ -13,7 +13,6 @@
# limitations under the License.
import logging, signal, sys, threading
from prometheus_client import start_http_server
from kpi_value_writer.service.KpiValueWriter import KpiValueWriter
from common.Settings import get_log_level
......@@ -39,8 +38,6 @@ def main():
    grpc_service = KpiValueWriter()
    grpc_service.start()
    start_http_server(10808)
    LOGGER.debug("Prometheus client is started on port 10808")
    # Wait for Ctrl+C or termination signal
    while not terminate.wait(timeout=1.0): pass
......
......@@ -12,14 +12,35 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import pytest
import time
import logging
from kpi_value_writer.service.KpiValueWriter import KpiValueWriter
from kpi_manager.client.KpiManagerClient import KpiManagerClient
from common.tools.kafka.Variables import KafkaTopic
from test_messages import create_kpi_descriptor_request
LOGGER = logging.getLogger(__name__)
# -------- Fixtures ----------------
@pytest.fixture(autouse=True)
def log_all_methods(request):
    '''
    This fixture logs messages before and after each test function runs, indicating the start and end of the test.
    The autouse=True parameter ensures that this logging happens automatically for all tests in the module.
    '''
    LOGGER.info(f" >>>>> Starting test: {request.node.name} ")
    yield
    LOGGER.info(f" <<<<< Finished test: {request.node.name} ")
# @pytest.fixture(scope='module')
# def kpi_manager_client():
# LOGGER.debug("Yielding KpiManagerClient ...")
# yield KpiManagerClient(host="10.152.183.203")
# LOGGER.debug("KpiManagerClient is terminated.")
LOGGER = logging.getLogger(__name__)
# -------- Initial Test ----------------
def test_validate_kafka_topics():
......@@ -27,7 +48,15 @@ def test_validate_kafka_topics():
    response = KafkaTopic.create_all_topics()
    assert isinstance(response, bool)
def test_KafkaConsumer():
    LOGGER.debug(" --->>> test_kafka_consumer: START <<<--- ")
    # kpi_value_writer = KpiValueWriter()
    # kpi_value_writer.RunKafkaConsumer()
# --------------
# NOT FOR GITHUB PIPELINE (Local testing only)
# --------------
# def test_KafkaConsumer(kpi_manager_client):
# # kpidescriptor = create_kpi_descriptor_request()
# # kpi_manager_client.SetKpiDescriptor(kpidescriptor)
# kpi_value_writer = KpiValueWriter()
# kpi_value_writer.KafkaKpiConsumer()
# LOGGER.debug(" waiting for timer to finish ")
# time.sleep(300)
......@@ -25,7 +25,8 @@ def create_kpi_id_request():
def create_kpi_descriptor_request(description: str = "Test Description"):
    _create_kpi_request = kpi_manager_pb2.KpiDescriptor()
    _create_kpi_request.kpi_id.kpi_id.uuid = str(uuid.uuid4())
    # _create_kpi_request.kpi_id.kpi_id.uuid = str(uuid.uuid4())
    _create_kpi_request.kpi_id.kpi_id.uuid = "efef4d95-1cf1-43c4-9742-95c283dddddd"
    _create_kpi_request.kpi_description = description
    _create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED
    _create_kpi_request.device_id.device_uuid.uuid = 'DEV4'
......