Skip to content
Snippets Groups Projects
Commit 4988a93a authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Merge branch 'develop' of ssh://gifrerenom_labs.etsi.org/tfs/controller into...

Merge branch 'develop' of ssh://gifrerenom_labs.etsi.org/tfs/controller into feat/216-cttc-implement-integration-test-between-e2e-ip-optical-sdn-controllers
parents 36adfc01 c2a65dc4
No related branches found
No related tags found
2 merge requests!359Release TeraFlowSDN 5.0,!286Resolve "(CTTC) Implement integration test between E2E-IP-Optical SDN Controllers"
Showing
with 541 additions and 125 deletions
...@@ -179,5 +179,3 @@ libyang/ ...@@ -179,5 +179,3 @@ libyang/
# Other logs # Other logs
**/logs/*.log.* **/logs/*.log.*
# PySpark checkpoints
src/analytics/.spark/*
...@@ -215,6 +215,9 @@ export GRAF_EXT_PORT_HTTP=${GRAF_EXT_PORT_HTTP:-"3000"} ...@@ -215,6 +215,9 @@ export GRAF_EXT_PORT_HTTP=${GRAF_EXT_PORT_HTTP:-"3000"}
# Deploy Apache Kafka # Deploy Apache Kafka
./deploy/kafka.sh ./deploy/kafka.sh
#Deploy Monitoring (Prometheus, Mimir, Grafana)
./deploy/monitoring.sh
# Expose Dashboard # Expose Dashboard
./deploy/expose_dashboard.sh ./deploy/expose_dashboard.sh
......
#!/bin/bash
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
set -euo pipefail
# -----------------------------------------------------------
# Global namespace for all deployments
# -----------------------------------------------------------
NAMESPACE="monitoring"
VALUES_FILE_PATH="manifests/monitoring"
# -----------------------------------------------------------
# Prometheus Configuration
# -----------------------------------------------------------
RELEASE_NAME_PROM="mon-prometheus"
CHART_REPO_NAME_PROM="prometheus-community"
CHART_REPO_URL_PROM="https://prometheus-community.github.io/helm-charts"
CHART_NAME_PROM="prometheus"
VALUES_FILE_PROM="$VALUES_FILE_PATH/prometheus_values.yaml"
# -----------------------------------------------------------
# Mimir Configuration
# -----------------------------------------------------------
# RELEASE_NAME_MIMIR="mon-mimir"
# CHART_REPO_NAME_MIMIR="grafana"
# CHART_REPO_URL_MIMIR="https://grafana.github.io/helm-charts"
# CHART_NAME_MIMIR="mimir-distributed"
# VALUES_FILE_MIMIR="$VALUES_FILE_PATH/mimir_values.yaml"
# -----------------------------------------------------------
# Grafana Configuration
# -----------------------------------------------------------
# RELEASE_NAME_GRAFANA="mon-grafana"
# CHART_REPO_NAME_GRAFANA="grafana"
# CHART_REPO_URL_GRAFANA="https://grafana.github.io/helm-charts"
# CHART_NAME_GRAFANA="grafana"
# VALUES_FILE_GRAFANA="$VALUES_FILE_PATH/grafana_values.yaml"
# -----------------------------------------------------------
# Function to deploy or upgrade a Helm chart
# -----------------------------------------------------------
deploy_chart() {
local release_name="$1"
local chart_repo_name="$2"
local chart_repo_url="$3"
local chart_name="$4"
local values_file="$5"
local namespace="$6"
echo ">>> Deploying [${release_name}] from repo [${chart_repo_name}]..."
# Add or update the Helm repo
echo "Adding/updating Helm repo: $chart_repo_name -> $chart_repo_url"
helm repo add "$chart_repo_name" "$chart_repo_url" || true
helm repo update
# Create namespace if needed
echo "Creating namespace '$namespace' if it doesn't exist..."
kubectl get namespace "$namespace" >/dev/null 2>&1 || kubectl create namespace "$namespace"
# Install or upgrade the chart
if [ -n "$values_file" ] && [ -f "$values_file" ]; then
echo "Installing/Upgrading $release_name using custom values from $values_file..."
helm upgrade --install "$release_name" "$chart_repo_name/$chart_name" \
--namespace "$namespace" \
--values "$values_file"
else
echo "Installing/Upgrading $release_name with default chart values..."
helm upgrade --install "$release_name" "$chart_repo_name/$chart_name" \
--namespace "$namespace"
fi
echo "<<< Deployment initiated for [$release_name]."
echo
}
# -----------------------------------------------------------
# Actual Deployments
# -----------------------------------------------------------
# 1) Deploy Prometheus
deploy_chart "$RELEASE_NAME_PROM" \
"$CHART_REPO_NAME_PROM" \
"$CHART_REPO_URL_PROM" \
"$CHART_NAME_PROM" \
"$VALUES_FILE_PROM" \
"$NAMESPACE"
# Optionally wait for Prometheus server pod to become ready
kubectl rollout status deployment/"$RELEASE_NAME_PROM-server" -n "$NAMESPACE" || true
# 2) Deploy Mimir
# deploy_chart "$RELEASE_NAME_MIMIR" \
# "$CHART_REPO_NAME_MIMIR" \
# "$CHART_REPO_URL_MIMIR" \
# "$CHART_NAME_MIMIR" \
# "$VALUES_FILE_MIMIR" \
# "$NAMESPACE"
# Depending on how Mimir runs (StatefulSets, Deployments), you can wait for
# the correct resource to be ready. For example:
# kubectl rollout status statefulset/"$RELEASE_NAME_MIMIR-distributor" -n "$NAMESPACE" || true
# 3) Deploy Grafana
# deploy_chart "$RELEASE_NAME_GRAFANA" \
# "$CHART_REPO_NAME_GRAFANA" \
# "$CHART_REPO_URL_GRAFANA" \
# "$CHART_NAME_GRAFANA" \
# "$VALUES_FILE_GRAFANA" \
# "$NAMESPACE"
# kubectl rollout status deployment/"$RELEASE_NAME_GRAFANA" -n "$NAMESPACE" || true
# -----------------------------------------------------------
echo "All deployments completed!"
...@@ -39,6 +39,8 @@ spec: ...@@ -39,6 +39,8 @@ spec:
env: env:
- name: LOG_LEVEL - name: LOG_LEVEL
value: "INFO" value: "INFO"
- name: PUSHGATEWAY_URL
value: "http://mon-prometheus-prometheus-pushgateway.monitoring.svc.cluster.local:9091"
envFrom: envFrom:
- secretRef: - secretRef:
name: kfk-kpi-data name: kfk-kpi-data
......
rbac:
create: true
## Use an existing ClusterRole/Role (depending on rbac.namespaced false/true)
# useExistingRole: name-of-some-role
# useExistingClusterRole: name-of-some-clusterRole
pspEnabled: false
pspUseAppArmor: false
namespaced: false
serviceAccount:
create: true
name:
nameTest:
## ServiceAccount labels.
automountServiceAccountToken: false
replicas: 1
## Create a headless service for the deployment
headlessService: false
## Should the service account be auto mounted on the pod
automountServiceAccountToken: true
## Create HorizontalPodAutoscaler object for deployment type
#
autoscaling:
enabled: false
minReplicas: 1
maxReplicas: 3
targetCPU: "60"
targetMemory: ""
behavior: {}
deploymentStrategy:
type: RollingUpdate
readinessProbe:
httpGet:
path: /api/health
port: 3000
livenessProbe:
httpGet:
path: /api/health
port: 3000
initialDelaySeconds: 60
timeoutSeconds: 30
failureThreshold: 10
image:
registry: docker.io
repository: grafana/grafana
# Overrides the Grafana image tag whose default is the chart appVersion
tag: ""
sha: ""
pullPolicy: IfNotPresent
## Optionally specify an array of imagePullSecrets.
## Secrets must be manually created in the namespace.
## ref: https://kubernetes.io/docs/tasks/configure-pod-container/pull-image-private-registry/
## Can be templated.
##
pullSecrets: []
# - myRegistrKeySecretName
testFramework:
enabled: true
## The type of Helm hook used to run this test. Defaults to test.
## ref: https://helm.sh/docs/topics/charts_hooks/#the-available-hooks
##
# hookType: test
image:
# -- The Docker registry
registry: docker.io
repository: bats/bats
tag: "v1.4.1"
imagePullPolicy: IfNotPresent
# dns configuration for pod
dnsPolicy: ~
dnsConfig: {}
# nameservers:
# - 8.8.8.8
# options:
# - name: ndots
# value: "2"
# - name: edns0
securityContext:
runAsNonRoot: true
runAsUser: 472
runAsGroup: 472
fsGroup: 472
containerSecurityContext:
allowPrivilegeEscalation: false
capabilities:
drop:
- ALL
seccompProfile:
type: RuntimeDefault
# Enable creating the grafana configmap
createConfigmap: true
downloadDashboardsImage:
registry: docker.io
repository: curlimages/curl
tag: 8.9.1
sha: ""
pullPolicy: IfNotPresent
downloadDashboards:
env: {}
envFromSecret: ""
resources: {}
securityContext:
allowPrivilegeEscalation: false
capabilities:
drop:
- ALL
seccompProfile:
type: RuntimeDefault
envValueFrom: {}
# ENV_NAME:
# configMapKeyRef:
# name: configmap-name
# key: value_key
## Pod Annotations
# podAnnotations: {}
## ConfigMap Annotations
# configMapAnnotations: {}
# argocd.argoproj.io/sync-options: Replace=true
## Pod Labels
# podLabels: {}
podPortName: grafana
gossipPortName: gossip
## Deployment annotations
# annotations: {}
service:
enabled: true
type: NodePort
port: 80
targetPort: 3000
nodePort: 30080
portName: service
## Enable persistence using Persistent Volume Claims
## ref: https://kubernetes.io/docs/user-guide/persistent-volumes/
##
persistence:
type: pvc
enabled: true
# storageClassName: default
accessModes:
- ReadWriteOnce
size: 10Gi
# annotations: {}
finalizers:
- kubernetes.io/pvc-protection
disableWarning: false
## If 'lookupVolumeName' is set to true, Helm will attempt to retrieve
## the current value of 'spec.volumeName' and incorporate it into the template.
lookupVolumeName: true
# Administrator credentials when not using an existing secret (see below)
adminUser: admin
# adminPassword: strongpassword
# Use an existing secret for the admin user.
admin:
## Name of the secret. Can be templated.
existingSecret: ""
userKey: admin-user
passwordKey: admin-password
## Configure grafana datasources
## ref: http://docs.grafana.org/administration/provisioning/#datasources
##
datasources:
datasources.yaml:
apiVersion: 1
datasources:
- name: Prometheus
type: prometheus
url: http://mon-prometheus-server.monitoring.svc.cluster.local
access: proxy
isDefault: true
- name: Mimir
type: prometheus
url: http://mimir-nginx.mon-mimir.svc:80/prometheus
access: proxy
isDefault: false
## Grafana's primary configuration
## NOTE: values in map will be converted to ini format
## ref: http://docs.grafana.org/installation/configuration/
##
grafana.ini:
paths:
data: /var/lib/grafana/
logs: /var/log/grafana
plugins: /var/lib/grafana/plugins
provisioning: /etc/grafana/provisioning
analytics:
check_for_updates: true
log:
mode: console
grafana_net:
url: https://grafana.net
server:
domain: "{{ if (and .Values.ingress.enabled .Values.ingress.hosts) }}{{ tpl (.Values.ingress.hosts | first) . }}{{ else }}''{{ end }}"
## Number of old ReplicaSets to retain
##
revisionHistoryLimit: 5
# assertNoLeakedSecrets is a helper function defined in _helpers.tpl that checks if secret
# values are not exposed in the rendered grafana.ini configmap. It is enabled by default.
#
# To pass values into grafana.ini without exposing them in a configmap, use variable expansion:
# https://grafana.com/docs/grafana/latest/setup-grafana/configure-grafana/#variable-expansion
#
# Alternatively, if you wish to allow secret values to be exposed in the rendered grafana.ini configmap,
# you can disable this check by setting assertNoLeakedSecrets to false.
assertNoLeakedSecrets: true
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Configuration for Prometheus components and server settings
# Global Prometheus configuration
alertmanager:
enabled: false # Default is true
kube-state-metrics:
enabled: false # Default is true
prometheus-node-exporter:
enabled: false # Default is true
prometheus-pushgateway:
enabled: true # Default is true
# Prometheus server-specific configuration
server:
retention: "30d"
logLevel: "debug"
resources:
requests:
cpu: "250m"
memory: "256Mi"
limits:
cpu: "1"
memory: "1Gi"
# Expose the Prometheus server via a Kubernetes service
service:
type: NodePort
nodePort: 30090
extraScrapeConfigs:
- job_name: 'pushgateway'
static_configs:
- targets:
- 'prometheus-pushgateway.monitoring.svc.cluster.local:9091' # Push Gateway endpoint
# Global Prometheus settings:
global:
scrape_interval: 10s
evaluation_interval: 10s
...@@ -19,6 +19,7 @@ PROJECTDIR=`pwd` ...@@ -19,6 +19,7 @@ PROJECTDIR=`pwd`
cd $PROJECTDIR/src cd $PROJECTDIR/src
export KFK_SERVER_ADDRESS='127.0.0.1:9092' export KFK_SERVER_ADDRESS='127.0.0.1:9092'
RCFILE=$PROJECTDIR/coverage/.coveragerc RCFILE=$PROJECTDIR/coverage/.coveragerc
python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \ python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \
kpi_value_writer/tests/test_kpi_value_writer.py kpi_value_writer/tests/test_kpi_value_writer.py
...@@ -25,5 +25,5 @@ export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_telemetr ...@@ -25,5 +25,5 @@ export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_telemetr
RCFILE=$PROJECTDIR/coverage/.coveragerc RCFILE=$PROJECTDIR/coverage/.coveragerc
python3 -m pytest --log-level=debug --log-cli-level=debug --verbose \ python3 -m pytest --log-level=INFO --log-cli-level=INFO --verbose \
telemetry/backend/tests/test_backend.py telemetry/backend/tests/test_backend.py
...@@ -18,10 +18,11 @@ PROJECTDIR=`pwd` ...@@ -18,10 +18,11 @@ PROJECTDIR=`pwd`
cd $PROJECTDIR/src cd $PROJECTDIR/src
# python3 kpi_manager/tests/test_unitary.py
export KFK_SERVER_ADDRESS='127.0.0.1:9092' export KFK_SERVER_ADDRESS='127.0.0.1:9092'
CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace crdb -o jsonpath='{.spec.clusterIP}') CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace crdb -o jsonpath='{.spec.clusterIP}')
export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_telemetry?sslmode=require" export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_telemetry?sslmode=require"
RCFILE=$PROJECTDIR/coverage/.coveragerc RCFILE=$PROJECTDIR/coverage/.coveragerc
python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \
python3 -m pytest --log-level=INFO --log-cli-level=INFO --verbose \
telemetry/frontend/tests/test_frontend.py telemetry/frontend/tests/test_frontend.py
...@@ -16,8 +16,11 @@ import logging, signal, sys, threading ...@@ -16,8 +16,11 @@ import logging, signal, sys, threading
from prometheus_client import start_http_server from prometheus_client import start_http_server
from common.Settings import get_log_level, get_metrics_port from common.Settings import get_log_level, get_metrics_port
from .AnalyticsBackendService import AnalyticsBackendService from .AnalyticsBackendService import AnalyticsBackendService
from common.tools.kafka.Variables import KafkaTopic
terminate = threading.Event() terminate = threading.Event()
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
LOGGER = None LOGGER = None
def signal_handler(signal, frame): # pylint: disable=redefined-outer-name def signal_handler(signal, frame): # pylint: disable=redefined-outer-name
...@@ -36,6 +39,8 @@ def main(): ...@@ -36,6 +39,8 @@ def main():
LOGGER.info('Starting...') LOGGER.info('Starting...')
KafkaTopic.create_all_topics()
# Start metrics server # Start metrics server
metrics_port = get_metrics_port() metrics_port = get_metrics_port()
start_http_server(metrics_port) start_http_server(metrics_port)
......
...@@ -18,8 +18,11 @@ from common.Settings import get_log_level, get_metrics_port ...@@ -18,8 +18,11 @@ from common.Settings import get_log_level, get_metrics_port
from .AnalyticsFrontendService import AnalyticsFrontendService from .AnalyticsFrontendService import AnalyticsFrontendService
from analytics.database.AnalyzerModel import Analyzer as Model from analytics.database.AnalyzerModel import Analyzer as Model
from common.tools.database.GenericDatabase import Database from common.tools.database.GenericDatabase import Database
from common.tools.kafka.Variables import KafkaTopic
terminate = threading.Event() terminate = threading.Event()
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
LOGGER = None LOGGER = None
def signal_handler(signal, frame): # pylint: disable=redefined-outer-name def signal_handler(signal, frame): # pylint: disable=redefined-outer-name
...@@ -43,6 +46,8 @@ def main(): ...@@ -43,6 +46,8 @@ def main():
kpiDBobj.create_database() kpiDBobj.create_database()
kpiDBobj.create_tables() kpiDBobj.create_tables()
KafkaTopic.create_all_topics()
# Start metrics server # Start metrics server
metrics_port = get_metrics_port() metrics_port = get_metrics_port()
start_http_server(metrics_port) start_http_server(metrics_port)
......
...@@ -56,7 +56,7 @@ def delay_linear(initial=0, increment=0, maximum=None): ...@@ -56,7 +56,7 @@ def delay_linear(initial=0, increment=0, maximum=None):
return delay return delay
return compute return compute
def delay_exponential(initial=1, increment=1, maximum=None): def delay_exponential(initial=1.0, increment=1.0, maximum=None):
def compute(num_try): def compute(num_try):
delay = initial * pow(increment, (num_try - 1)) delay = initial * pow(increment, (num_try - 1))
if maximum is not None: if maximum is not None:
......
...@@ -33,8 +33,8 @@ class Engine: ...@@ -33,8 +33,8 @@ class Engine:
CRDB_USERNAME, CRDB_PASSWORD, CRDB_NAMESPACE, CRDB_SQL_PORT, CRDB_DATABASE, CRDB_SSLMODE) CRDB_USERNAME, CRDB_PASSWORD, CRDB_NAMESPACE, CRDB_SQL_PORT, CRDB_DATABASE, CRDB_SSLMODE)
try: try:
engine = sqlalchemy.create_engine(crdb_uri, echo=False) engine = sqlalchemy.create_engine(crdb_uri, echo=False)
LOGGER.info(' AnalyzerDB initalized with DB URL: {:}'.format(crdb_uri)) LOGGER.info(' Database initalized with DB URL: {:}'.format(crdb_uri))
return engine
except: # pylint: disable=bare-except # pragma: no cover except: # pylint: disable=bare-except # pragma: no cover
LOGGER.exception('Failed to connect to database: {:s}'.format(str(crdb_uri))) LOGGER.exception('Failed to connect to database: {:s}'.format(str(crdb_uri)))
return None # type: ignore return None # type: ignore
return engine
...@@ -22,13 +22,16 @@ from ._GrpcToEnum import grpc_to_enum ...@@ -22,13 +22,16 @@ from ._GrpcToEnum import grpc_to_enum
# BYTES_RECEIVED. If item name does not match, automatic mapping of # BYTES_RECEIVED. If item name does not match, automatic mapping of
# proto enums to database enums will fail. # proto enums to database enums will fail.
class ORM_KpiSampleTypeEnum(enum.Enum): class ORM_KpiSampleTypeEnum(enum.Enum):
UNKNOWN = KpiSampleType.KPISAMPLETYPE_UNKNOWN UNKNOWN = KpiSampleType.KPISAMPLETYPE_UNKNOWN # 0
PACKETS_TRANSMITTED = KpiSampleType.KPISAMPLETYPE_PACKETS_TRANSMITTED PACKETS_TRANSMITTED = KpiSampleType.KPISAMPLETYPE_PACKETS_TRANSMITTED # 101
PACKETS_RECEIVED = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED PACKETS_RECEIVED = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED # 102
BYTES_TRANSMITTED = KpiSampleType.KPISAMPLETYPE_BYTES_TRANSMITTED PACKETS_DROPPED = KpiSampleType.KPISAMPLETYPE_PACKETS_DROPPED # 103
BYTES_RECEIVED = KpiSampleType.KPISAMPLETYPE_BYTES_RECEIVED BYTES_TRANSMITTED = KpiSampleType.KPISAMPLETYPE_BYTES_TRANSMITTED # 201
LINK_TOTAL_CAPACITY_GBPS = KpiSampleType.KPISAMPLETYPE_LINK_TOTAL_CAPACITY_GBPS BYTES_RECEIVED = KpiSampleType.KPISAMPLETYPE_BYTES_RECEIVED # 202
LINK_USED_CAPACITY_GBPS = KpiSampleType.KPISAMPLETYPE_LINK_USED_CAPACITY_GBPS BYTES_DROPPED = KpiSampleType.KPISAMPLETYPE_BYTES_DROPPED # 203
LINK_TOTAL_CAPACITY_GBPS = KpiSampleType.KPISAMPLETYPE_LINK_TOTAL_CAPACITY_GBPS # 301
LINK_USED_CAPACITY_GBPS = KpiSampleType.KPISAMPLETYPE_LINK_USED_CAPACITY_GBPS # 302
grpc_to_enum__kpi_sample_type = functools.partial( grpc_to_enum__kpi_sample_type = functools.partial(
grpc_to_enum, KpiSampleType, ORM_KpiSampleTypeEnum) grpc_to_enum, KpiSampleType, ORM_KpiSampleTypeEnum)
...@@ -18,7 +18,7 @@ from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_m ...@@ -18,7 +18,7 @@ from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_m
from common.proto.context_pb2 import Empty from common.proto.context_pb2 import Empty
from common.proto.kpi_manager_pb2_grpc import KpiManagerServiceServicer from common.proto.kpi_manager_pb2_grpc import KpiManagerServiceServicer
from common.proto.kpi_manager_pb2 import KpiId, KpiDescriptor, KpiDescriptorFilter, KpiDescriptorList from common.proto.kpi_manager_pb2 import KpiId, KpiDescriptor, KpiDescriptorFilter, KpiDescriptorList
# from kpi_manager.database.Kpi_DB import KpiDB from common.method_wrappers.ServiceExceptions import NotFoundException
from kpi_manager.database.KpiDB import KpiDB from kpi_manager.database.KpiDB import KpiDB
from kpi_manager.database.KpiModel import Kpi as KpiModel from kpi_manager.database.KpiModel import Kpi as KpiModel
...@@ -31,65 +31,48 @@ class KpiManagerServiceServicerImpl(KpiManagerServiceServicer): ...@@ -31,65 +31,48 @@ class KpiManagerServiceServicerImpl(KpiManagerServiceServicer):
self.kpi_db_obj = KpiDB(KpiModel) self.kpi_db_obj = KpiDB(KpiModel)
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def SetKpiDescriptor(self, request: KpiDescriptor, grpc_context: grpc.ServicerContext # type: ignore def SetKpiDescriptor(
) -> KpiId: # type: ignore self, request: KpiDescriptor, grpc_context: grpc.ServicerContext # type: ignore
) -> KpiId: # type: ignore
response = KpiId() response = KpiId()
LOGGER.info("Received gRPC message object: {:}".format(request)) LOGGER.info("Received gRPC message object: {:}".format(request))
try: kpi_to_insert = KpiModel.convert_KpiDescriptor_to_row(request)
kpi_to_insert = KpiModel.convert_KpiDescriptor_to_row(request) if self.kpi_db_obj.add_row_to_db(kpi_to_insert):
if(self.kpi_db_obj.add_row_to_db(kpi_to_insert)): response.kpi_id.uuid = request.kpi_id.kpi_id.uuid
response.kpi_id.uuid = request.kpi_id.kpi_id.uuid # LOGGER.info("Added Row: {:}".format(response))
# LOGGER.info("Added Row: {:}".format(response)) return response
return response
except Exception as e:
LOGGER.info("Unable to create KpiModel class object. {:}".format(e))
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def GetKpiDescriptor(self, request: KpiId, grpc_context: grpc.ServicerContext # type: ignore def GetKpiDescriptor(
) -> KpiDescriptor: # type: ignore self, request: KpiId, grpc_context: grpc.ServicerContext # type: ignore
) -> KpiDescriptor: # type: ignore
response = KpiDescriptor() response = KpiDescriptor()
print("--> Received gRPC message object: {:}".format(request))
LOGGER.info("Received gRPC message object: {:}".format(request)) LOGGER.info("Received gRPC message object: {:}".format(request))
try: kpi_id_to_search = request.kpi_id.uuid
kpi_id_to_search = request.kpi_id.uuid row = self.kpi_db_obj.search_db_row_by_id(KpiModel, 'kpi_id', kpi_id_to_search)
row = self.kpi_db_obj.search_db_row_by_id(KpiModel, 'kpi_id', kpi_id_to_search) if row is None:
if row is None: LOGGER.info('No matching row found kpi id: {:}'.format(kpi_id_to_search))
print ('No matching row found for kpi id: {:}'.format(kpi_id_to_search)) raise NotFoundException('KpiDescriptor', kpi_id_to_search)
LOGGER.info('No matching row found kpi id: {:}'.format(kpi_id_to_search)) response = KpiModel.convert_row_to_KpiDescriptor(row)
return Empty() return response
else:
response = KpiModel.convert_row_to_KpiDescriptor(row)
return response
except Exception as e:
print ('Unable to search kpi id. {:}'.format(e))
LOGGER.info('Unable to search kpi id. {:}'.format(e))
raise e
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def DeleteKpiDescriptor(self, request: KpiId, grpc_context: grpc.ServicerContext # type: ignore def DeleteKpiDescriptor(
) -> Empty: # type: ignore self, request: KpiId, grpc_context: grpc.ServicerContext # type: ignore
) -> Empty: # type: ignore
LOGGER.info("Received gRPC message object: {:}".format(request)) LOGGER.info("Received gRPC message object: {:}".format(request))
try: kpi_id_to_search = request.kpi_id.uuid
kpi_id_to_search = request.kpi_id.uuid self.kpi_db_obj.delete_db_row_by_id(KpiModel, 'kpi_id', kpi_id_to_search)
self.kpi_db_obj.delete_db_row_by_id(KpiModel, 'kpi_id', kpi_id_to_search) return Empty()
except Exception as e:
LOGGER.info('Unable to search kpi id. {:}'.format(e))
finally:
return Empty()
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def SelectKpiDescriptor(self, filter: KpiDescriptorFilter, grpc_context: grpc.ServicerContext # type: ignore def SelectKpiDescriptor(
) -> KpiDescriptorList: # type: ignore self, filter: KpiDescriptorFilter, grpc_context: grpc.ServicerContext # type: ignore
) -> KpiDescriptorList: # type: ignore
LOGGER.info("Received gRPC message object: {:}".format(filter)) LOGGER.info("Received gRPC message object: {:}".format(filter))
response = KpiDescriptorList() response = KpiDescriptorList()
try: rows = self.kpi_db_obj.select_with_filter(KpiModel, filter)
rows = self.kpi_db_obj.select_with_filter(KpiModel, filter) for row in rows:
except Exception as e: kpiDescriptor_obj = KpiModel.convert_row_to_KpiDescriptor(row)
LOGGER.info('Unable to apply filter on kpi descriptor. {:}'.format(e)) response.kpi_descriptor_list.append(kpiDescriptor_obj)
try: return response
for row in rows:
kpiDescriptor_obj = KpiModel.convert_row_to_KpiDescriptor(row)
response.kpi_descriptor_list.append(kpiDescriptor_obj)
return response
except Exception as e:
LOGGER.info('Unable to process filter response {:}'.format(e))
...@@ -13,6 +13,7 @@ ...@@ -13,6 +13,7 @@
# limitations under the License. # limitations under the License.
import grpc
import os, pytest import os, pytest
import logging import logging
from typing import Union from typing import Union
...@@ -109,13 +110,19 @@ def test_DeleteKpiDescriptor(kpi_manager_client): ...@@ -109,13 +110,19 @@ def test_DeleteKpiDescriptor(kpi_manager_client):
LOGGER.info(" >>> test_DeleteKpiDescriptor: START <<< ") LOGGER.info(" >>> test_DeleteKpiDescriptor: START <<< ")
# adding KPI # adding KPI
response_id = kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request()) response_id = kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request())
# deleting KPI # deleting KPI
del_response = kpi_manager_client.DeleteKpiDescriptor(response_id) del_response = kpi_manager_client.DeleteKpiDescriptor(response_id)
# select KPI
kpi_manager_client.GetKpiDescriptor(response_id)
LOGGER.info("Response of delete method gRPC message object: {:}".format(del_response)) LOGGER.info("Response of delete method gRPC message object: {:}".format(del_response))
assert isinstance(del_response, Empty) assert isinstance(del_response, Empty)
# select KPI and check it does not exist
with pytest.raises(grpc.RpcError) as e:
kpi_manager_client.GetKpiDescriptor(response_id)
assert e.value.code() == grpc.StatusCode.NOT_FOUND
MSG = 'KpiDescriptor({:s}) not found'
assert e.value.details() == MSG.format(response_id.kpi_id.uuid)
def test_GetKpiDescriptor(kpi_manager_client): def test_GetKpiDescriptor(kpi_manager_client):
LOGGER.info(" >>> test_GetKpiDescriptor: START <<< ") LOGGER.info(" >>> test_GetKpiDescriptor: START <<< ")
# adding KPI # adding KPI
...@@ -123,11 +130,6 @@ def test_GetKpiDescriptor(kpi_manager_client): ...@@ -123,11 +130,6 @@ def test_GetKpiDescriptor(kpi_manager_client):
# get KPI # get KPI
response = kpi_manager_client.GetKpiDescriptor(response_id) response = kpi_manager_client.GetKpiDescriptor(response_id)
LOGGER.info("Response gRPC message object: {:}".format(response)) LOGGER.info("Response gRPC message object: {:}".format(response))
LOGGER.info(" >>> calling GetKpiDescriptor with random ID")
rand_response = kpi_manager_client.GetKpiDescriptor(create_kpi_id_request())
LOGGER.info("Response gRPC message object: {:}".format(rand_response))
assert isinstance(response, KpiDescriptor) assert isinstance(response, KpiDescriptor)
def test_SelectKpiDescriptor(kpi_manager_client): def test_SelectKpiDescriptor(kpi_manager_client):
......
...@@ -28,13 +28,13 @@ def create_kpi_descriptor_request(descriptor_name: str = "Test_name"): ...@@ -28,13 +28,13 @@ def create_kpi_descriptor_request(descriptor_name: str = "Test_name"):
_create_kpi_request = kpi_manager_pb2.KpiDescriptor() _create_kpi_request = kpi_manager_pb2.KpiDescriptor()
_create_kpi_request.kpi_id.kpi_id.uuid = str(uuid.uuid4()) _create_kpi_request.kpi_id.kpi_id.uuid = str(uuid.uuid4())
# _create_kpi_request.kpi_id.kpi_id.uuid = "6e22f180-ba28-4641-b190-2287bf448888" # _create_kpi_request.kpi_id.kpi_id.uuid = "6e22f180-ba28-4641-b190-2287bf448888"
# _create_kpi_request.kpi_id.kpi_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666" # _create_kpi_request.kpi_id.kpi_id.uuid = "f974b6cc-095f-4767-b8c1-3457b383fb99"
_create_kpi_request.kpi_description = descriptor_name _create_kpi_request.kpi_description = descriptor_name
_create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED _create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED
_create_kpi_request.device_id.device_uuid.uuid = 'DEV2' _create_kpi_request.device_id.device_uuid.uuid = str(uuid.uuid4())
_create_kpi_request.service_id.service_uuid.uuid = 'SERV2' _create_kpi_request.service_id.service_uuid.uuid = 'SERV2'
_create_kpi_request.slice_id.slice_uuid.uuid = 'SLC1' _create_kpi_request.slice_id.slice_uuid.uuid = 'SLC1'
_create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END1' _create_kpi_request.endpoint_id.endpoint_uuid.uuid = str(uuid.uuid4())
_create_kpi_request.connection_id.connection_uuid.uuid = 'CON1' _create_kpi_request.connection_id.connection_uuid.uuid = 'CON1'
_create_kpi_request.link_id.link_uuid.uuid = 'LNK1' _create_kpi_request.link_id.link_uuid.uuid = 'LNK1'
return _create_kpi_request return _create_kpi_request
...@@ -77,4 +77,4 @@ def create_kpi_filter_request(): ...@@ -77,4 +77,4 @@ def create_kpi_filter_request():
_create_kpi_filter_request.connection_id.append(connection_id_obj) _create_kpi_filter_request.connection_id.append(connection_id_obj)
_create_kpi_filter_request.link_id.append(link_id_obj) _create_kpi_filter_request.link_id.append(link_id_obj)
return _create_kpi_filter_request return _create_kpi_filter_request
\ No newline at end of file
...@@ -15,25 +15,21 @@ ...@@ -15,25 +15,21 @@
import json import json
import logging import logging
import threading import threading
from confluent_kafka import KafkaError
from confluent_kafka import Consumer as KafkaConsumer
from common.tools.kafka.Variables import KafkaConfig, KafkaTopic from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
from common.proto.kpi_value_api_pb2 import KpiValue
from common.proto.kpi_manager_pb2 import KpiDescriptor, KpiId from common.proto.kpi_manager_pb2 import KpiDescriptor, KpiId
from common.Settings import get_service_port_grpc from common.Settings import get_service_port_grpc
from common.Constants import ServiceNameEnum from common.Constants import ServiceNameEnum
from common.tools.service.GenericGrpcService import GenericGrpcService from common.tools.service.GenericGrpcService import GenericGrpcService
from confluent_kafka import KafkaError
from confluent_kafka import Consumer as KafkaConsumer
from kpi_manager.client.KpiManagerClient import KpiManagerClient from kpi_manager.client.KpiManagerClient import KpiManagerClient
# -- test import --
# from kpi_value_writer.tests.test_messages import create_kpi_descriptor_request
from .MetricWriterToPrometheus import MetricWriterToPrometheus from .MetricWriterToPrometheus import MetricWriterToPrometheus
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
ACTIVE_CONSUMERS = []
class KpiValueWriter(GenericGrpcService): class KpiValueWriter(GenericGrpcService):
def __init__(self, cls_name : str = __name__) -> None: def __init__(self, cls_name : str = __name__) -> None:
...@@ -43,9 +39,8 @@ class KpiValueWriter(GenericGrpcService): ...@@ -43,9 +39,8 @@ class KpiValueWriter(GenericGrpcService):
'group.id' : 'KpiValueWriter', 'group.id' : 'KpiValueWriter',
'auto.offset.reset' : 'latest'}) 'auto.offset.reset' : 'latest'})
def RunKafkaConsumer(self): def install_servicers(self):
thread = threading.Thread(target=self.KafkaKpiConsumer, args=()) thread = threading.Thread(target=self.KafkaKpiConsumer, args=())
ACTIVE_CONSUMERS.append(thread)
thread.start() thread.start()
def KafkaKpiConsumer(self): def KafkaKpiConsumer(self):
...@@ -55,44 +50,32 @@ class KpiValueWriter(GenericGrpcService): ...@@ -55,44 +50,32 @@ class KpiValueWriter(GenericGrpcService):
consumer = self.kafka_consumer consumer = self.kafka_consumer
consumer.subscribe([KafkaTopic.VALUE.value]) consumer.subscribe([KafkaTopic.VALUE.value])
LOGGER.debug("Kafka Consumer start listenng on topic: {:}".format(KafkaTopic.VALUE.value)) LOGGER.debug("Kafka Consumer start listenng on topic: {:}".format(KafkaTopic.VALUE.value))
print("Kafka Consumer start listenng on topic: {:}".format(KafkaTopic.VALUE.value))
while True: while True:
raw_kpi = consumer.poll(1.0) raw_kpi = consumer.poll(1.0)
if raw_kpi is None: if raw_kpi is None:
continue continue
elif raw_kpi.error(): elif raw_kpi.error():
if raw_kpi.error().code() == KafkaError._PARTITION_EOF: if raw_kpi.error().code() != KafkaError._PARTITION_EOF:
continue
else:
print("Consumer error: {}".format(raw_kpi.error())) print("Consumer error: {}".format(raw_kpi.error()))
continue continue
try: try:
kpi_value = json.loads(raw_kpi.value().decode('utf-8')) kpi_value = json.loads(raw_kpi.value().decode('utf-8'))
LOGGER.info("Received KPI : {:}".format(kpi_value)) LOGGER.info("Received KPI : {:}".format(kpi_value))
print("Received KPI : {:}".format(kpi_value))
self.get_kpi_descriptor(kpi_value, kpi_manager_client, metric_writer) self.get_kpi_descriptor(kpi_value, kpi_manager_client, metric_writer)
except Exception as e: except:
print("Error detail: {:}".format(e)) LOGGER.exception("Error detail: ")
continue continue
def get_kpi_descriptor(self, kpi_value: str, kpi_manager_client, metric_writer): def get_kpi_descriptor(self, kpi_value: str, kpi_manager_client, metric_writer):
print("--- START -----")
kpi_id = KpiId() kpi_id = KpiId()
kpi_id.kpi_id.uuid = kpi_value['kpi_uuid'] kpi_id.kpi_id.uuid = kpi_value['kpi_id'] # type: ignore
print("KpiId generated: {:}".format(kpi_id))
# print("Kpi manger client created: {:}".format(kpi_manager_client))
try: try:
kpi_descriptor_object = KpiDescriptor() kpi_descriptor_object = KpiDescriptor()
kpi_descriptor_object = kpi_manager_client.GetKpiDescriptor(kpi_id) kpi_descriptor_object = kpi_manager_client.GetKpiDescriptor(kpi_id)
# TODO: why kpi_descriptor_object recevies a KpiDescriptor type object not Empty type object???
if kpi_descriptor_object.kpi_id.kpi_id.uuid == kpi_id.kpi_id.uuid: if kpi_descriptor_object.kpi_id.kpi_id.uuid == kpi_id.kpi_id.uuid:
LOGGER.info("Extracted KpiDescriptor: {:}".format(kpi_descriptor_object)) LOGGER.info("Extracted KpiDescriptor: {:}".format(kpi_descriptor_object))
print("Extracted KpiDescriptor: {:}".format(kpi_descriptor_object))
metric_writer.create_and_expose_cooked_kpi(kpi_descriptor_object, kpi_value) metric_writer.create_and_expose_cooked_kpi(kpi_descriptor_object, kpi_value)
else: else:
LOGGER.info("No KPI Descriptor found in DB for Kpi ID: {:}".format(kpi_id)) LOGGER.info("No KPI Descriptor found in Database for Kpi ID: {:}".format(kpi_id))
print("No KPI Descriptor found in DB for Kpi ID: {:}".format(kpi_id)) except:
except Exception as e: LOGGER.exception("Unable to get KpiDescriptor")
LOGGER.info("Unable to get KpiDescriptor. Error: {:}".format(e))
print ("Unable to get KpiDescriptor. Error: {:}".format(e))
...@@ -14,15 +14,20 @@ ...@@ -14,15 +14,20 @@
# read Kafka stream from Kafka topic # read Kafka stream from Kafka topic
import os
import logging import logging
from prometheus_client import Gauge
from common.proto.kpi_sample_types_pb2 import KpiSampleType
from common.proto.kpi_value_api_pb2 import KpiValue from prometheus_client import Gauge
from common.proto.kpi_manager_pb2 import KpiDescriptor from prometheus_client.exposition import push_to_gateway
from prometheus_client.registry import CollectorRegistry
from common.proto.kpi_sample_types_pb2 import KpiSampleType
from common.proto.kpi_value_api_pb2 import KpiValue
from common.proto.kpi_manager_pb2 import KpiDescriptor
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
PROM_METRICS = {} PROM_METRICS = {}
GATEWAY_URL = os.getenv('PUSHGATEWAY_URL', 'prometheus-pushgateway.monitoring.svc.cluster.local:9091')
class MetricWriterToPrometheus: class MetricWriterToPrometheus:
''' '''
...@@ -30,7 +35,9 @@ class MetricWriterToPrometheus: ...@@ -30,7 +35,9 @@ class MetricWriterToPrometheus:
cooked KPI value = KpiDescriptor (gRPC message) + KpiValue (gRPC message) cooked KPI value = KpiDescriptor (gRPC message) + KpiValue (gRPC message)
''' '''
def __init__(self): def __init__(self):
pass self.job_name = 'kpivaluewriter'
self.registry = CollectorRegistry()
self.gateway_url = GATEWAY_URL
def merge_kpi_descriptor_and_kpi_value(self, kpi_descriptor, kpi_value): def merge_kpi_descriptor_and_kpi_value(self, kpi_descriptor, kpi_value):
# Creating a dictionary from the kpi_descriptor's attributes # Creating a dictionary from the kpi_descriptor's attributes
...@@ -45,25 +52,28 @@ class MetricWriterToPrometheus: ...@@ -45,25 +52,28 @@ class MetricWriterToPrometheus:
'connection_id' : kpi_descriptor.connection_id.connection_uuid.uuid, 'connection_id' : kpi_descriptor.connection_id.connection_uuid.uuid,
'link_id' : kpi_descriptor.link_id.link_uuid.uuid, 'link_id' : kpi_descriptor.link_id.link_uuid.uuid,
'time_stamp' : kpi_value.timestamp.timestamp, 'time_stamp' : kpi_value.timestamp.timestamp,
#'time_stamp' : kpi_value["time_stamp"],
'kpi_value' : kpi_value.kpi_value_type.floatVal 'kpi_value' : kpi_value.kpi_value_type.floatVal
#'kpi_value' : kpi_value["kpi_value"]
} }
LOGGER.debug("Cooked Kpi: {:}".format(cooked_kpi)) LOGGER.debug("Cooked Kpi: {:}".format(cooked_kpi))
return cooked_kpi return cooked_kpi
def create_and_expose_cooked_kpi(self, kpi_descriptor: KpiDescriptor, kpi_value: KpiValue): def create_and_expose_cooked_kpi(self, kpi_descriptor: KpiDescriptor, kpi_value: KpiValue):
# merge both gRPC messages into single varible. # merge both gRPC messages into single varible.
cooked_kpi = self.merge_kpi_descriptor_and_kpi_value(kpi_descriptor, kpi_value) cooked_kpi = self.merge_kpi_descriptor_and_kpi_value(kpi_descriptor, kpi_value)
tags_to_exclude = {'kpi_description', 'kpi_sample_type', 'kpi_value'} tags_to_exclude = {'kpi_description', 'kpi_sample_type', 'kpi_value'}
metric_tags = [tag for tag in cooked_kpi.keys() if tag not in tags_to_exclude] # These values will be used as metric tags metric_tags = [tag for tag in cooked_kpi.keys() if tag not in tags_to_exclude] # These values will be used as metric tags
metric_name = cooked_kpi['kpi_sample_type'] metric_name = cooked_kpi['kpi_sample_type']
try: try:
if metric_name not in PROM_METRICS: # Only register the metric, when it doesn't exists if metric_name not in PROM_METRICS: # Only register the metric, when it doesn't exists
PROM_METRICS[metric_name] = Gauge ( PROM_METRICS[metric_name] = Gauge (
metric_name, metric_name,
cooked_kpi['kpi_description'], cooked_kpi['kpi_description'],
metric_tags metric_tags,
registry=self.registry
) )
LOGGER.debug("Metric is created with labels: {:}".format(metric_tags)) LOGGER.debug("Metric is created with labels: {:}".format(metric_tags))
PROM_METRICS[metric_name].labels( PROM_METRICS[metric_name].labels(
kpi_id = cooked_kpi['kpi_id'], kpi_id = cooked_kpi['kpi_id'],
device_id = cooked_kpi['device_id'], device_id = cooked_kpi['device_id'],
...@@ -74,7 +84,11 @@ class MetricWriterToPrometheus: ...@@ -74,7 +84,11 @@ class MetricWriterToPrometheus:
link_id = cooked_kpi['link_id'], link_id = cooked_kpi['link_id'],
time_stamp = cooked_kpi['time_stamp'], time_stamp = cooked_kpi['time_stamp'],
).set(float(cooked_kpi['kpi_value'])) ).set(float(cooked_kpi['kpi_value']))
LOGGER.debug("Metric pushed to the endpoints: {:}".format(PROM_METRICS[metric_name])) LOGGER.debug("Metric is being pushed to the Gateway ... : {:}".format(PROM_METRICS[metric_name]))
# Push to the Prometheus Gateway, Prometheus is preconfigured to scrap the metrics from the gateway
push_to_gateway(self.gateway_url, job=self.job_name, registry=self.registry)
LOGGER.debug("Metric pushed to Prometheus Gateway.")
except ValueError as e: except ValueError as e:
if 'Duplicated timeseries' in str(e): if 'Duplicated timeseries' in str(e):
......
...@@ -13,7 +13,6 @@ ...@@ -13,7 +13,6 @@
# limitations under the License. # limitations under the License.
import logging, signal, sys, threading import logging, signal, sys, threading
from prometheus_client import start_http_server
from kpi_value_writer.service.KpiValueWriter import KpiValueWriter from kpi_value_writer.service.KpiValueWriter import KpiValueWriter
from common.Settings import get_log_level from common.Settings import get_log_level
...@@ -39,8 +38,6 @@ def main(): ...@@ -39,8 +38,6 @@ def main():
grpc_service = KpiValueWriter() grpc_service = KpiValueWriter()
grpc_service.start() grpc_service.start()
start_http_server(10808)
LOGGER.debug("Prometheus client is started on port 10808")
# Wait for Ctrl+C or termination signal # Wait for Ctrl+C or termination signal
while not terminate.wait(timeout=1.0): pass while not terminate.wait(timeout=1.0): pass
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment