@@ -18,6 +18,9 @@
# Read deployment settings
########################################################################################################################
# ----- TeraFlowSDN ------------------------------------------------------------
# If not already set, set the URL of the Docker registry where the images will be uploaded to.
# By default, assume internal MicroK8s registry is used.
export TFS_REGISTRY_IMAGES=${TFS_REGISTRY_IMAGES:-"http://localhost:32000/tfs/"}
@@ -42,6 +45,9 @@ export TFS_GRAFANA_PASSWORD=${TFS_GRAFANA_PASSWORD:-"admin123+"}
# If TFS_SKIP_BUILD is "YES", the containers are not rebuilt-retagged-repushed and existing ones are used.
export TFS_SKIP_BUILD=${TFS_SKIP_BUILD:-""}
# ----- CockroachDB ------------------------------------------------------------
# If not already set, set the namespace where CockroachDB will be deployed.
export CRDB_NAMESPACE=${CRDB_NAMESPACE:-"crdb"}
@@ -84,6 +90,9 @@ export CRDB_DROP_DATABASE_IF_EXISTS=${CRDB_DROP_DATABASE_IF_EXISTS:-""}
# If CRDB_REDEPLOY is "YES", the database will be dropped while checking/deploying CockroachDB.
export CRDB_REDEPLOY=${CRDB_REDEPLOY:-""}
# ----- NATS -------------------------------------------------------------------
# If not already set, set the namespace where NATS will be deployed.
export NATS_NAMESPACE=${NATS_NAMESPACE:-"nats"}
@@ -99,6 +108,32 @@ export NATS_SECRET_NAMESPACE=${NATS_SECRET_NAMESPACE:-${TFS_K8S_NAMESPACE}}
export NATS_REDEPLOY=${NATS_REDEPLOY:-""}
# ----- QuestDB ----------------------------------------------------------------
# If not already set, set the namespace where QuestDB will be deployed.
export QDB_NAMESPACE=${QDB_NAMESPACE:-"qdb"}
# If not already set, set the database username to be used by Monitoring.
export QDB_USERNAME=${QDB_USERNAME:-"admin"}
# If not already set, set the database user's password to be used by Monitoring.
export QDB_PASSWORD=${QDB_PASSWORD:-"quest"}
# If not already set, set the table name to be used by Monitoring.
export QDB_TABLE=${QDB_TABLE:-"tfs_monitoring"}
## If not already set, disable flag for dropping table if exists.
## WARNING: ACTIVATING THIS FLAG IMPLIES LOSING THE TABLE INFORMATION!
## If QDB_DROP_TABLE_IF_EXISTS is "YES", the table pointed to by variable QDB_TABLE will be dropped while
## checking/deploying QuestDB.
#export QDB_DROP_TABLE_IF_EXISTS=${QDB_DROP_TABLE_IF_EXISTS:-""}
# If not already set, disable flag for re-deploying QuestDB from scratch.
# WARNING: ACTIVATING THIS FLAG IMPLIES LOSING THE DATABASE INFORMATION!
# If QDB_REDEPLOY is "YES", the database will be dropped while checking/deploying QuestDB.
export QDB_REDEPLOY=${QDB_REDEPLOY:-""}
########################################################################################################################
# Automated steps start here
########################################################################################################################
@@ -109,7 +144,10 @@ export NATS_REDEPLOY=${NATS_REDEPLOY:-""}
# Deploy NATS
./deploy/nats.sh
# Deploy QuestDB
./deploy/qdb.sh
# Deploy TeraFlowSDN
./deploy/tfs.sh
# Show deploy summary
...
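For reference, the new QuestDB knobs introduced above are driven purely by environment variables; a minimal sketch of a customized run (the top-level deploy.sh entry point wrapping the per-component scripts is an assumption):

export QDB_NAMESPACE="qdb"          # namespace where QuestDB is deployed
export QDB_TABLE="tfs_monitoring"   # monitoring table name
export QDB_REDEPLOY="YES"           # WARNING: drops any stored monitoring data
./deploy.sh                         # assumed entry point calling deploy/qdb.sh, deploy/tfs.sh, etc.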
@@ -44,7 +44,7 @@ export CRDB_DEPLOY_MODE=${CRDB_DEPLOY_MODE:-"single"}
# If not already set, disable flag for dropping database if exists.
# WARNING: ACTIVATING THIS FLAG IMPLIES LOSING THE DATABASE INFORMATION!
# If CRDB_DROP_DATABASE_IF_EXISTS is "YES", the database pointed to by variable CRDB_DATABASE will be dropped while
# checking/deploying CockroachDB.
export CRDB_DROP_DATABASE_IF_EXISTS=${CRDB_DROP_DATABASE_IF_EXISTS:-""}
...
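Analogously, a hedged usage sketch for the corrected CockroachDB flag (the ./deploy/crdb.sh script name is an assumption based on the ./deploy/nats.sh and ./deploy/qdb.sh pattern above):

# Drops the database selected by CRDB_DATABASE (not CRDB_NAMESPACE) on the next deploy run.
CRDB_DATABASE="tfs" CRDB_DROP_DATABASE_IF_EXISTS="YES" ./deploy/crdb.sh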
#!/bin/bash
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
########################################################################################################################
# Read deployment settings
########################################################################################################################
# If not already set, set the namespace where QuestDB will be deployed.
export QDB_NAMESPACE=${QDB_NAMESPACE:-"qdb"}
# If not already set, set the database username to be used by Monitoring.
export QDB_USERNAME=${QDB_USERNAME:-"admin"}
# If not already set, set the database user's password to be used by Monitoring.
export QDB_PASSWORD=${QDB_PASSWORD:-"quest"}
# If not already set, set the table name to be used by Monitoring.
export QDB_TABLE=${QDB_TABLE:-"tfs_monitoring"}
## If not already set, disable flag for dropping table if exists.
## WARNING: ACTIVATING THIS FLAG IMPLIES LOSING THE TABLE INFORMATION!
## If QDB_DROP_TABLE_IF_EXISTS is "YES", the table pointed to by variable QDB_TABLE will be dropped while
## checking/deploying QuestDB.
#export QDB_DROP_TABLE_IF_EXISTS=${QDB_DROP_TABLE_IF_EXISTS:-""}
# If not already set, disable flag for re-deploying QuestDB from scratch.
# WARNING: ACTIVATING THIS FLAG IMPLIES LOSING THE DATABASE INFORMATION!
# If QDB_REDEPLOY is "YES", the database will be dropped while checking/deploying QuestDB.
export QDB_REDEPLOY=${QDB_REDEPLOY:-""}
########################################################################################################################
# Automated steps start here
########################################################################################################################
# Constants
TMP_FOLDER="./tmp"
QDB_MANIFESTS_PATH="manifests/questdb"
# Create tmp folders for files modified during the deployment
# (the manifests folder is written to below, so create it together with the logs folder)
TMP_MANIFESTS_FOLDER="$TMP_FOLDER/manifests"
TMP_LOGS_FOLDER="$TMP_FOLDER/logs"
QDB_LOG_FILE="$TMP_LOGS_FOLDER/qdb_deploy.log"
mkdir -p $TMP_MANIFESTS_FOLDER $TMP_LOGS_FOLDER
function qdb_deploy() {
    echo "QuestDB Namespace"
    echo ">>> Create QuestDB Namespace (if missing)"
    kubectl create namespace ${QDB_NAMESPACE}
    echo

    echo "QuestDB"
    echo ">>> Checking if QuestDB is deployed..."
    if kubectl get --namespace ${QDB_NAMESPACE} statefulset/questdb &> /dev/null; then
        echo ">>> QuestDB is present; skipping step."
    else
        echo ">>> Deploy QuestDB"
        cp "${QDB_MANIFESTS_PATH}/manifest.yaml" "${TMP_MANIFESTS_FOLDER}/qdb_manifest.yaml"
        kubectl apply --namespace ${QDB_NAMESPACE} -f "${TMP_MANIFESTS_FOLDER}/qdb_manifest.yaml"

        echo ">>> Waiting for QuestDB statefulset to be created..."
        while ! kubectl get --namespace ${QDB_NAMESPACE} statefulset/questdb &> /dev/null; do
            printf "%c" "."
            sleep 1
        done

        # Waiting for statefulset condition "Available=True" does not work.
        # Waiting for condition "jsonpath='{.status.readyReplicas}'=3" throws error:
        #   "error: readyReplicas is not found"
        # Workaround: wait until the pods are ready.
        #echo ">>> QuestDB statefulset created. Waiting for readiness condition..."
        #kubectl wait --namespace ${QDB_NAMESPACE} --for=condition=Available=True --timeout=300s statefulset/questdb
        #kubectl wait --namespace ${QDB_NAMESPACE} --for=jsonpath='{.status.readyReplicas}'=3 --timeout=300s \
        #    statefulset/questdb
        echo ">>> QuestDB statefulset created. Waiting for QuestDB pods to be created..."
        while ! kubectl get --namespace ${QDB_NAMESPACE} pod/questdb-0 &> /dev/null; do
            printf "%c" "."
            sleep 1
        done
        kubectl wait --namespace ${QDB_NAMESPACE} --for=condition=Ready --timeout=300s pod/questdb-0
    fi
    echo
echo "QuestDB Port Mapping"
echo ">>> Expose QuestDB SQL port (8812->8812)"
QDB_SQL_PORT=$(kubectl --namespace ${QDB_NAMESPACE} get service questdb-public -o 'jsonpath={.spec.ports[?(@.name=="sql")].port}')
PATCH='{"data": {"'${QDB_SQL_PORT}'": "'${QDB_NAMESPACE}'/questdb-public:'${QDB_SQL_PORT}'"}}'
kubectl patch configmap nginx-ingress-tcp-microk8s-conf --namespace ingress --patch "${PATCH}"
PORT_MAP='{"containerPort": '${QDB_SQL_PORT}', "hostPort": '${QDB_SQL_PORT}'}'
CONTAINER='{"name": "nginx-ingress-microk8s", "ports": ['${PORT_MAP}']}'
PATCH='{"spec": {"template": {"spec": {"containers": ['${CONTAINER}']}}}}'
kubectl patch daemonset nginx-ingress-microk8s-controller --namespace ingress --patch "${PATCH}"
echo
echo ">>> Expose QuestDB Influx Line Protocol port (9009->9009)"
QDB_ILP_PORT=$(kubectl --namespace ${QDB_NAMESPACE} get service questdb-public -o 'jsonpath={.spec.ports[?(@.name=="ilp")].port}')
PATCH='{"data": {"'${QDB_ILP_PORT}'": "'${QDB_NAMESPACE}'/questdb-public:'${QDB_ILP_PORT}'"}}'
kubectl patch configmap nginx-ingress-tcp-microk8s-conf --namespace ingress --patch "${PATCH}"
PORT_MAP='{"containerPort": '${QDB_ILP_PORT}', "hostPort": '${QDB_ILP_PORT}'}'
CONTAINER='{"name": "nginx-ingress-microk8s", "ports": ['${PORT_MAP}']}'
PATCH='{"spec": {"template": {"spec": {"containers": ['${CONTAINER}']}}}}'
kubectl patch daemonset nginx-ingress-microk8s-controller --namespace ingress --patch "${PATCH}"
echo
echo ">>> Expose QuestDB HTTP Mgmt GUI port (9000->9000)"
QDB_GUI_PORT=$(kubectl --namespace ${QDB_NAMESPACE} get service questdb-public -o 'jsonpath={.spec.ports[?(@.name=="http")].port}')
PATCH='{"data": {"'${QDB_GUI_PORT}'": "'${QDB_NAMESPACE}'/questdb-public:'${QDB_GUI_PORT}'"}}'
kubectl patch configmap nginx-ingress-tcp-microk8s-conf --namespace ingress --patch "${PATCH}"
PORT_MAP='{"containerPort": '${QDB_GUI_PORT}', "hostPort": '${QDB_GUI_PORT}'}'
CONTAINER='{"name": "nginx-ingress-microk8s", "ports": ['${PORT_MAP}']}'
PATCH='{"spec": {"template": {"spec": {"containers": ['${CONTAINER}']}}}}'
kubectl patch daemonset nginx-ingress-microk8s-controller --namespace ingress --patch "${PATCH}"
echo
}
function qdb_undeploy() {
    echo "QuestDB"
    echo ">>> Checking if QuestDB is deployed..."
    if kubectl get --namespace ${QDB_NAMESPACE} statefulset/questdb &> /dev/null; then
        echo ">>> Undeploy QuestDB"
        kubectl delete --namespace ${QDB_NAMESPACE} -f "${TMP_MANIFESTS_FOLDER}/qdb_manifest.yaml" --ignore-not-found
    else
        echo ">>> QuestDB is not present; skipping step."
    fi
    echo

    echo "QuestDB Namespace"
    echo ">>> Delete QuestDB Namespace (if exists)"
    echo "NOTE: this step might take a few minutes to complete!"
    kubectl delete namespace ${QDB_NAMESPACE} --ignore-not-found
    echo
}
# TODO: implement method to drop table
# NOTE: the commented-out code below was adapted from the CockroachDB script and does not work against QuestDB as-is.
#function qdb_drop_table() {
#    echo "Drop table if exists"
#    QDB_CLIENT_URL="postgresql://${QDB_USERNAME}:${QDB_PASSWORD}@questdb-0:${QDB_SQL_PORT}/defaultdb?sslmode=require"
#    kubectl exec -it --namespace ${QDB_NAMESPACE} questdb-0 -- \
#        ./qdb sql --certs-dir=/qdb/qdb-certs --url=${QDB_CLIENT_URL} \
#        --execute "DROP TABLE IF EXISTS ${QDB_TABLE};"
#    echo
#}
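As a stop-gap for the TODO above, a minimal sketch using QuestDB's REST endpoint, which accepts SQL statements via HTTP GET /exec; reaching port 9000 from localhost assumes the ingress/host-port exposure performed in qdb_deploy:

function qdb_drop_table() {
    echo "Drop table if exists"
    # QuestDB executes SQL statements submitted to /exec on its HTTP port (9000).
    curl --fail -G "http://localhost:9000/exec" \
        --data-urlencode "query=DROP TABLE IF EXISTS ${QDB_TABLE};"
    echo
}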
if [ "$QDB_REDEPLOY" == "YES" ]; then
qdb_undeploy
#elif [ "$QDB_DROP_TABLE_IF_EXISTS" == "YES" ]; then
# qdb_drop_table
fi
qdb_deploy
@@ -57,6 +57,18 @@ export CRDB_DATABASE=${CRDB_DATABASE:-"tfs"}
# If not already set, set the namespace where NATS will be deployed.
export NATS_NAMESPACE=${NATS_NAMESPACE:-"nats"}

# If not already set, set the namespace where QuestDB will be deployed.
export QDB_NAMESPACE=${QDB_NAMESPACE:-"qdb"}

# If not already set, set the database username to be used by Monitoring.
export QDB_USERNAME=${QDB_USERNAME:-"admin"}

# If not already set, set the database user's password to be used by Monitoring.
export QDB_PASSWORD=${QDB_PASSWORD:-"quest"}

# If not already set, set the table name to be used by Monitoring.
export QDB_TABLE=${QDB_TABLE:-"tfs_monitoring"}

########################################################################################################################
# Automated steps start here
@@ -95,6 +107,22 @@ kubectl create secret generic nats-data --namespace ${TFS_K8S_NAMESPACE} --type=
    --from-literal=NATS_CLIENT_PORT=${NATS_CLIENT_PORT}
printf "\n"
echo "Create secret with QuestDB data"
QDB_HTTP_PORT=$(kubectl --namespace ${QDB_NAMESPACE} get service questdb-public -o 'jsonpath={.spec.ports[?(@.name=="http")].port}')
QDB_ILP_PORT=$(kubectl --namespace ${QDB_NAMESPACE} get service questdb-public -o 'jsonpath={.spec.ports[?(@.name=="ilp")].port}')
QDB_SQL_PORT=$(kubectl --namespace ${QDB_NAMESPACE} get service questdb-public -o 'jsonpath={.spec.ports[?(@.name=="sql")].port}')
METRICSDB_HOSTNAME="questdb-public.${QDB_NAMESPACE}.svc.cluster.local"
kubectl create secret generic qdb-data --namespace ${TFS_K8S_NAMESPACE} --type='Opaque' \
--from-literal=QDB_NAMESPACE=${QDB_NAMESPACE} \
--from-literal=METRICSDB_HOSTNAME=${METRICSDB_HOSTNAME} \
--from-literal=METRICSDB_REST_PORT=${QDB_HTTP_PORT} \
--from-literal=METRICSDB_ILP_PORT=${QDB_ILP_PORT} \
--from-literal=METRICSDB_SQL_PORT=${QDB_SQL_PORT} \
--from-literal=METRICSDB_TABLE=${QDB_TABLE} \
--from-literal=METRICSDB_USERNAME=${QDB_USERNAME} \
--from-literal=METRICSDB_PASSWORD=${QDB_PASSWORD}
printf "\n"
echo "Deploying components and collecting environment variables..." echo "Deploying components and collecting environment variables..."
ENV_VARS_SCRIPT=tfs_runtime_env_vars.sh ENV_VARS_SCRIPT=tfs_runtime_env_vars.sh
echo "# Environment variables for TeraFlowSDN deployment" > $ENV_VARS_SCRIPT echo "# Environment variables for TeraFlowSDN deployment" > $ENV_VARS_SCRIPT
...@@ -251,17 +279,6 @@ for EXTRA_MANIFEST in $TFS_EXTRA_MANIFESTS; do ...@@ -251,17 +279,6 @@ for EXTRA_MANIFEST in $TFS_EXTRA_MANIFESTS; do
done done
printf "\n" printf "\n"
# For now, leave these controls here. Some component dependencies are not well handled.
if [[ "$TFS_COMPONENTS" == *"monitoring"* ]]; then
    echo "Waiting for 'MonitoringDB' component..."
    # Kubernetes does not implement --for='condition=available' for statefulsets.
    # For now, we assume a single replica of monitoringdb. To be updated in future releases.
    kubectl wait --namespace $TFS_K8S_NAMESPACE \
        --for=jsonpath='{.status.readyReplicas}'=1 --timeout=300s statefulset/monitoringdb
    printf "\n"
fi
for COMPONENT in $TFS_COMPONENTS; do
    echo "Waiting for '$COMPONENT' component..."
    COMPONENT_OBJNAME=$(echo "${COMPONENT}" | sed "s/\_/-/")
@@ -272,36 +289,19 @@ done
if [[ "$TFS_COMPONENTS" == *"webui"* ]] && [[ "$TFS_COMPONENTS" == *"monitoring"* ]]; then
    echo "Configuring WebUI DataStores and Dashboards..."
    sleep 5

    # INFLUXDB_HOST="monitoringservice"
    # INFLUXDB_PORT=$(kubectl --namespace $TFS_K8S_NAMESPACE get service/monitoringservice -o jsonpath='{.spec.ports[?(@.name=="influxdb")].port}')
    # INFLUXDB_URL="http://${INFLUXDB_HOST}:${INFLUXDB_PORT}"
    # INFLUXDB_USER=$(kubectl --namespace $TFS_K8S_NAMESPACE get secrets influxdb-secrets -o jsonpath='{.data.INFLUXDB_ADMIN_USER}' | base64 --decode)
    # INFLUXDB_PASSWORD=$(kubectl --namespace $TFS_K8S_NAMESPACE get secrets influxdb-secrets -o jsonpath='{.data.INFLUXDB_ADMIN_PASSWORD}' | base64 --decode)
    # INFLUXDB_DATABASE=$(kubectl --namespace $TFS_K8S_NAMESPACE get secrets influxdb-secrets -o jsonpath='{.data.INFLUXDB_DB}' | base64 --decode)

    # Exposed through the ingress controller "tfs-ingress"
    GRAFANA_URL="127.0.0.1:80/grafana"

    # Default Grafana credentials
    GRAFANA_USERNAME="admin"
    GRAFANA_PASSWORD="admin"

    # Configure Grafana Admin Password
    # Ref: https://grafana.com/docs/grafana/latest/http_api/user/#change-password
    GRAFANA_URL_DEFAULT="http://${GRAFANA_USERNAME}:${GRAFANA_PASSWORD}@${GRAFANA_URL}"

    echo "Connecting to Grafana at URL: ${GRAFANA_URL_DEFAULT}..."
    curl -X PUT -H "Content-Type: application/json" -d '{
        "oldPassword": "'${GRAFANA_PASSWORD}'",
        "newPassword": "'${TFS_GRAFANA_PASSWORD}'",
@@ -309,16 +309,21 @@ if [[ "$TFS_COMPONENTS" == *"webui"* ]] && [[ "$TFS_COMPONENTS" == *"monitoring"
    }' ${GRAFANA_URL_DEFAULT}/api/user/password
    echo

    # Updated Grafana API URL
    GRAFANA_URL_UPDATED="http://${GRAFANA_USERNAME}:${TFS_GRAFANA_PASSWORD}@${GRAFANA_URL}"
    echo "export GRAFANA_URL_UPDATED=${GRAFANA_URL_UPDATED}" >> $ENV_VARS_SCRIPT
    # Ref: https://grafana.com/docs/grafana/latest/http_api/data_source/
    # TODO: replace user, password and database by variables to be saved
    QDB_HOST_PORT="${METRICSDB_HOSTNAME}:${QDB_SQL_PORT}"
    echo "Creating a datasource..."
    curl -X POST -H "Content-Type: application/json" -H "Accept: application/json" -d '{
        "access" : "proxy",
        "type" : "postgres",
        "name" : "questdb",
        "url" : "'${QDB_HOST_PORT}'",
        "database" : "'${QDB_TABLE}'",
        "user" : "'${QDB_USERNAME}'",
        "basicAuth": false,
        "isDefault": true,
        "jsonData" : {
@@ -333,9 +338,7 @@ if [[ "$TFS_COMPONENTS" == *"webui"* ]] && [[ "$TFS_COMPONENTS" == *"monitoring"
            "tlsConfigurationMethod": "file-path",
            "tlsSkipVerify" : true
        },
        "secureJsonData": {"password": "'${QDB_PASSWORD}'"}
    }' ${GRAFANA_URL_UPDATED}/api/datasources
    echo
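A quick way to confirm the datasource was registered (GET /api/datasources/name/:name is part of the Grafana HTTP API; GRAFANA_URL_UPDATED is the credentialed URL exported above):

    curl "${GRAFANA_URL_UPDATED}/api/datasources/name/questdb"   # returns the datasource JSON if it exists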
@@ -352,4 +355,3 @@ if [[ "$TFS_COMPONENTS" == *"webui"* ]] && [[ "$TFS_COMPONENTS" == *"monitoring"
    printf "\n\n"
fi
@@ -12,42 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: monitoringdb
spec:
  selector:
    matchLabels:
      app: monitoringservice
  serviceName: "monitoringservice"
  replicas: 1
  template:
    metadata:
      labels:
        app: monitoringservice
    spec:
      terminationGracePeriodSeconds: 5
      restartPolicy: Always
      containers:
      - name: metricsdb
        image: questdb/questdb
        ports:
        - name: http
          containerPort: 9000
          protocol: TCP
        - name: influxdb
          containerPort: 9009
          protocol: TCP
        - name: postgre
          containerPort: 8812
          protocol: TCP
        env:
        - name: QDB_CAIRO_COMMIT_LAG
          value: "1000"
        - name: QDB_CAIRO_MAX_UNCOMMITTED_ROWS
          value: "100000"
---
apiVersion: apps/v1
kind: Deployment
metadata:
@@ -63,29 +27,19 @@ spec:
        app: monitoringservice
    spec:
      terminationGracePeriodSeconds: 5
      containers:
      - name: server
        image: labs.etsi.org:5050/tfs/controller/monitoring:latest
        imagePullPolicy: Always
        ports:
        - containerPort: 7070
        - containerPort: 9192
        env:
        - name: LOG_LEVEL
          value: "INFO"
        envFrom:
        - secretRef:
            name: qdb-data
        readinessProbe:
          exec:
            command: ["/bin/grpc_health_probe", "-addr=:7070"]
@@ -94,11 +48,11 @@ spec:
            command: ["/bin/grpc_health_probe", "-addr=:7070"]
        resources:
          requests:
            cpu: 50m
            memory: 64Mi
          limits:
            cpu: 500m
            memory: 512Mi
---
apiVersion: v1
kind: Service
@@ -115,41 +69,7 @@ spec:
    protocol: TCP
    port: 7070
    targetPort: 7070
  - name: http
    protocol: TCP
    port: 9000
    targetPort: 9000
  - name: influxdb
    protocol: TCP
    port: 9009
    targetPort: 9009
  - name: postgre
    protocol: TCP
    port: 8812
    targetPort: 8812
  - name: metrics
    protocol: TCP
    port: 9192
    targetPort: 9192
---
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: access-monitoring
spec:
  podSelector:
    matchLabels:
      app: monitoringservice
  ingress:
  - from: []
    ports:
    - port: 7070
    - port: 8812
  - from:
    - podSelector:
        matchLabels:
          app: monitoringservice
    ports:
    - port: 9009
    - port: 9000
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: questdb
spec:
  selector:
    matchLabels:
      app: questdb
  serviceName: "questdb-public"
  replicas: 1
  template:
    metadata:
      labels:
        app: questdb
    spec:
      terminationGracePeriodSeconds: 5
      restartPolicy: Always
      containers:
      - name: metricsdb
        image: questdb/questdb
        ports:
        - containerPort: 9000
        - containerPort: 9009
        - containerPort: 8812
        env:
        - name: QDB_CAIRO_COMMIT_LAG
          value: "1000"
        - name: QDB_CAIRO_MAX_UNCOMMITTED_ROWS
          value: "100000"
---
apiVersion: v1
kind: Service
metadata:
  name: questdb-public
  labels:
    app: questdb
spec:
  type: ClusterIP
  selector:
    app: questdb
  ports:
  - name: http
    protocol: TCP
    port: 9000
    targetPort: 9000
  - name: ilp
    protocol: TCP
    port: 9009
    targetPort: 9009
  - name: sql
    protocol: TCP
    port: 8812
    targetPort: 8812
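For local inspection without the ingress host-port mappings, the service can also be reached with a plain port-forward (a sketch assuming the default "qdb" namespace; the web console then answers on http://localhost:9000):

kubectl --namespace qdb port-forward service/questdb-public 9000:9000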
@@ -37,7 +37,9 @@ export TFS_GRAFANA_PASSWORD="admin123+"
# Disable skip-build flag to rebuild the Docker images.
export TFS_SKIP_BUILD=""

# ----- CockroachDB ------------------------------------------------------------
# Set the namespace where CockroachDB will be deployed.
export CRDB_NAMESPACE="crdb"
@@ -60,9 +62,32 @@ export CRDB_DROP_DATABASE_IF_EXISTS=""
# Disable flag for re-deploying CockroachDB from scratch.
export CRDB_REDEPLOY=""

# ----- NATS -------------------------------------------------------------------
# Set the namespace where NATS will be deployed.
export NATS_NAMESPACE="nats"
# Disable flag for re-deploying NATS from scratch.
export NATS_REDEPLOY=""

# ----- QuestDB ----------------------------------------------------------------
# If not already set, set the namespace where QuestDB will be deployed.
export QDB_NAMESPACE=${QDB_NAMESPACE:-"qdb"}
# If not already set, set the database username to be used by Monitoring.
export QDB_USERNAME=${QDB_USERNAME:-"admin"}
# If not already set, set the database user's password to be used by Monitoring.
export QDB_PASSWORD=${QDB_PASSWORD:-"quest"}
# If not already set, set the table name to be used by Monitoring.
export QDB_TABLE=${QDB_TABLE:-"tfs_monitoring"}
## If not already set, disable flag for dropping table if exists.
#export QDB_DROP_TABLE_IF_EXISTS=${QDB_DROP_TABLE_IF_EXISTS:-""}
# If not already set, disable flag for re-deploying QuestDB from scratch.
export QDB_REDEPLOY=${QDB_REDEPLOY:-""}
@@ -37,7 +37,6 @@ FROM registry.access.redhat.com/ubi8/ubi-minimal:8.4 AS release
ARG JAVA_PACKAGE=java-11-openjdk-headless
ARG RUN_JAVA_VERSION=1.3.8
ENV LANG='en_US.UTF-8' LANGUAGE='en_US:en'
ENV QUARKUS_LAUNCH_DEVMODE="true"
# Install java and the run-java script
# Also set up permissions for user `1001`
RUN microdnf install curl ca-certificates ${JAVA_PACKAGE} \
...
@@ -17,9 +17,6 @@ automation:
quarkus:
  package:
    type: mutable-jar
  live-reload:
    password: 1234
    url: http://0.0.0.0:8080
  banner:
    path: teraflow-automation-banner.txt
  grpc:
...
@@ -19,9 +19,10 @@ from sqlalchemy.orm import Session, sessionmaker
from sqlalchemy_cockroachdb import run_transaction
from typing import Dict, List, Optional, Set, Tuple
from common.method_wrappers.ServiceExceptions import InvalidArgumentException, NotFoundException
from common.proto.context_pb2 import Device, DeviceId, TopologyId
from common.tools.grpc.Tools import grpc_message_to_json_string
from common.tools.object_factory.Device import json_device_id
from context.service.database.uuids.Topology import topology_get_uuid
from .models.DeviceModel import DeviceModel
from .models.EndPointModel import EndPointModel
from .models.TopologyModel import TopologyDeviceModel
@@ -73,6 +74,15 @@ def device_set(db_engine : Engine, request : Device) -> Tuple[Dict, bool]:
    topology_uuids : Set[str] = set()
    related_topologies : List[Dict] = list()

    # By default, always add device to default Context/Topology
    _,topology_uuid = topology_get_uuid(TopologyId(), allow_random=False, allow_default=True)
    related_topologies.append({
        'topology_uuid': topology_uuid,
        'device_uuid'  : device_uuid,
    })
    topology_uuids.add(topology_uuid)

    endpoints_data : List[Dict] = list()
    for i, endpoint in enumerate(request.device_endpoints):
        endpoint_device_uuid = endpoint.endpoint_id.device_id.device_uuid.uuid
...
@@ -27,7 +27,7 @@ from .driver_api.DriverInstanceCache import DriverInstanceCache, get_driver
from .monitoring.MonitoringLoops import MonitoringLoops
from .Tools import (
    check_connect_rules, check_no_endpoints, compute_rules_to_add_delete, configure_rules, deconfigure_rules,
    populate_config_rules, populate_endpoint_monitoring_resources, populate_endpoints, populate_initial_config_rules,
    subscribe_kpi, unsubscribe_kpi)

LOGGER = logging.getLogger(__name__)
@@ -86,6 +86,11 @@ class DeviceServiceServicerImpl(DeviceServiceServicer):
                raise OperationFailedException('AddDevice', extra_details=errors)

            device_id = context_client.SetDevice(device)

            # Update endpoint monitoring resources with UUIDs
            device_with_uuids = context_client.GetDevice(device_id)
            populate_endpoint_monitoring_resources(device_with_uuids, self.monitoring_loops)

            return device_id
        finally:
            self.mutex_queues.signal_done(device_uuid)
...
@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import json, logging
from typing import Any, Dict, List, Tuple, Union
from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME
from common.method_wrappers.ServiceExceptions import InvalidArgumentException
@@ -27,6 +27,8 @@ from .Errors import (
    ERROR_BAD_ENDPOINT, ERROR_DELETE, ERROR_GET, ERROR_GET_INIT, ERROR_MISSING_KPI, ERROR_SAMPLETYPE, ERROR_SET,
    ERROR_SUBSCRIBE, ERROR_UNSUBSCRIBE)

LOGGER = logging.getLogger(__name__)

def check_connect_rules(device_config : DeviceConfig) -> Dict[str, Any]:
    connection_config_rules = dict()
    unexpected_config_rules = list()
@@ -107,6 +109,21 @@ def populate_endpoints(device : Device, driver : _Driver, monitoring_loops : Mon
    return errors

def populate_endpoint_monitoring_resources(device_with_uuids : Device, monitoring_loops : MonitoringLoops) -> None:
    device_uuid = device_with_uuids.device_id.device_uuid.uuid

    for endpoint in device_with_uuids.device_endpoints:
        endpoint_uuid = endpoint.endpoint_id.endpoint_uuid.uuid
        endpoint_name = endpoint.name
        kpi_sample_types = endpoint.kpi_sample_types

        for kpi_sample_type in kpi_sample_types:
            monitor_resource_key = monitoring_loops.get_resource_key(device_uuid, endpoint_uuid, kpi_sample_type)
            if monitor_resource_key is not None: continue

            monitor_resource_key = monitoring_loops.get_resource_key(device_uuid, endpoint_name, kpi_sample_type)
            if monitor_resource_key is None: continue

            monitoring_loops.add_resource_key(device_uuid, endpoint_uuid, kpi_sample_type, monitor_resource_key)

def _raw_config_rules_to_grpc(
    device_uuid : str, device_config : DeviceConfig, error_template : str, config_action : ConfigActionEnum,
    raw_config_rules : List[Tuple[str, Union[Any, Exception, None]]]
@@ -118,6 +135,7 @@ def _raw_config_rules_to_grpc(
            errors.append(error_template.format(device_uuid, str(resource_key), str(resource_value)))
            continue

        if resource_value is None: continue
        resource_value = json.loads(resource_value) if isinstance(resource_value, str) else resource_value
        resource_value = {field_name : (field_value, False) for field_name,field_value in resource_value.items()}
        update_config_rule_custom(device_config.config_rules, resource_key, resource_value, new_action=config_action)
@@ -202,11 +220,12 @@ def subscribe_kpi(request : MonitoringSettings, driver : _Driver, monitoring_loo
    resource_key = monitoring_loops.get_resource_key(device_uuid, endpoint_uuid, kpi_sample_type)
    if resource_key is None:
        kpi_sample_type_name = KpiSampleType.Name(kpi_sample_type).upper().replace('KPISAMPLETYPE_', '')
        MSG = ERROR_SAMPLETYPE.format(
            str(device_uuid), str(endpoint_uuid), str(kpi_sample_type), str(kpi_sample_type_name)
        )
        LOGGER.warning('{:s} Supported Device-Endpoint-KpiSampleType items: {:s}'.format(
            MSG, str(monitoring_loops.get_all_resource_keys())))
        return [MSG]

    sampling_duration = request.sampling_duration_s # seconds
    sampling_interval = request.sampling_interval_s # seconds
...
@@ -29,7 +29,7 @@ from .Tools import compose_resource_endpoint

LOGGER = logging.getLogger(__name__)

RE_GET_ENDPOINT_FROM_INTERFACE = re.compile(r'^\/interface\[([^\]]+)\].*')

HISTOGRAM_BUCKETS = (
    # .005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, INF
...
@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import copy, logging, queue, threading
from typing import Dict, Optional, Tuple, Union
from common.proto.kpi_sample_types_pb2 import KpiSampleType
from common.proto.monitoring_pb2 import Kpi
@@ -93,6 +93,10 @@ class MonitoringLoops:
            key = (device_uuid, endpoint_uuid, kpi_sample_type)
            return self._device_endpoint_sampletype__to__resource_key.get(key)

    def get_all_resource_keys(self) -> Dict[Tuple[str, str, int], str]:
        with self._lock_device_endpoint:
            return copy.deepcopy(self._device_endpoint_sampletype__to__resource_key)

    def remove_resource_key(
        self, device_uuid : str, endpoint_uuid : str, kpi_sample_type : KpiSampleType
    ) -> None:
...
@@ -12,18 +12,27 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Dict
import grpc, logging, queue, threading
from common.method_wrappers.ServiceExceptions import ServiceException
from common.proto import monitoring_pb2
from common.proto.context_pb2 import DeviceOperationalStatusEnum, Empty, EventTypeEnum
from common.proto.kpi_sample_types_pb2 import KpiSampleType
from context.client.ContextClient import ContextClient
from monitoring.client.MonitoringClient import MonitoringClient
from monitoring.service.MonitoringServiceServicerImpl import LOGGER
from monitoring.service.NameMapping import NameMapping

LOGGER = logging.getLogger(__name__)

DEVICE_OP_STATUS_UNDEFINED   = DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_UNDEFINED
DEVICE_OP_STATUS_DISABLED    = DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_DISABLED
DEVICE_OP_STATUS_ENABLED     = DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_ENABLED
DEVICE_OP_STATUS_NOT_ENABLED = {DEVICE_OP_STATUS_UNDEFINED, DEVICE_OP_STATUS_DISABLED}
KPISAMPLETYPE_UNKNOWN        = KpiSampleType.KPISAMPLETYPE_UNKNOWN

class EventsDeviceCollector:
    def __init__(self, name_mapping : NameMapping) -> None: # pylint: disable=redefined-outer-name
        self._events_queue = queue.Queue()
        self._context_client_grpc = ContextClient()
@@ -34,6 +43,9 @@ class EventsDeviceCollector:
        self._device_thread = threading.Thread(target=self._collect, args=(self._device_stream,), daemon=False)

        self._device_to_state : Dict[str, DeviceOperationalStatusEnum] = dict()
        self._name_mapping = name_mapping

    def grpc_server_on(self):
        try:
            grpc.channel_ready_future(self._channel).result(timeout=15)
@@ -52,7 +64,7 @@ class EventsDeviceCollector:
    def start(self):
        try:
            self._device_thread.start()
        except RuntimeError:
            LOGGER.exception('Start EventTools exception')

    def get_event(self, block : bool = True, timeout : float = 0.1):
@@ -71,29 +83,51 @@ class EventsDeviceCollector:
                try:
                    event = self.get_event(block=True, timeout=0.5)

                    event_type = event.event.event_type
                    device_uuid = event.device_id.device_uuid.uuid
                    if event_type in {EventTypeEnum.EVENTTYPE_REMOVE}:
                        self._device_to_state.pop(device_uuid, None)
                        continue

                    if event_type not in {EventTypeEnum.EVENTTYPE_CREATE, EventTypeEnum.EVENTTYPE_UPDATE}:
                        # Unknown event type
                        continue

                    device = self._context_client.GetDevice(event.device_id)
                    self._name_mapping.set_device_name(device_uuid, device.name)

                    old_operational_status = self._device_to_state.get(device_uuid, DEVICE_OP_STATUS_UNDEFINED)
                    device_was_not_enabled = (old_operational_status in DEVICE_OP_STATUS_NOT_ENABLED)

                    new_operational_status = device.device_operational_status
                    device_is_enabled = (new_operational_status == DEVICE_OP_STATUS_ENABLED)
                    self._device_to_state[device_uuid] = new_operational_status

                    activate_monitoring = device_was_not_enabled and device_is_enabled
                    if not activate_monitoring:
                        # device is not ready for monitoring
                        continue

                    for endpoint in device.device_endpoints:
                        endpoint_uuid = endpoint.endpoint_id.endpoint_uuid.uuid
                        self._name_mapping.set_endpoint_name(endpoint_uuid, endpoint.name)

                        for value in endpoint.kpi_sample_types:
                            if value == KPISAMPLETYPE_UNKNOWN: continue

                            kpi_descriptor = monitoring_pb2.KpiDescriptor()
                            kpi_descriptor.kpi_description = device.device_type
                            kpi_descriptor.kpi_sample_type = value
                            kpi_descriptor.device_id.CopyFrom(device.device_id)       # pylint: disable=no-member
                            kpi_descriptor.endpoint_id.CopyFrom(endpoint.endpoint_id) # pylint: disable=no-member

                            kpi_id = self._monitoring_client.SetKpi(kpi_descriptor)
                            kpi_id_list.append(kpi_id)
                except queue.Empty:
                    break

            return kpi_id_list
        except ServiceException:
            LOGGER.exception('ListenEvents exception')
        except Exception: # pragma: no cover # pylint: disable=broad-except
            LOGGER.exception('ListenEvents exception')
@@ -23,14 +23,18 @@ import datetime
from common.tools.timestamp.Converters import timestamp_float_to_string, timestamp_utcnow_to_float
import psycopg2

from monitoring.service.NameMapping import NameMapping

LOGGER = logging.getLogger(__name__)

class MetricsDB():
    def __init__(self, host, name_mapping : NameMapping, ilp_port=9009, rest_port=9000, table="monitoring",
                 commit_lag_ms=1000, retries=10, postgre=False, postgre_port=8812, postgre_user='admin',
                 postgre_password='quest'):
        try:
            self.host = host
            self.name_mapping = name_mapping
            self.ilp_port = int(ilp_port)
            self.rest_port = rest_port
            self.table = table
@@ -85,7 +89,9 @@ class MetricsDB():
                '(kpi_id SYMBOL,' \
                'kpi_sample_type SYMBOL,' \
                'device_id SYMBOL,' \
                'device_name SYMBOL,' \
                'endpoint_id SYMBOL,' \
                'endpoint_name SYMBOL,' \
                'service_id SYMBOL,' \
                'slice_id SYMBOL,' \
                'connection_id SYMBOL,' \
@@ -100,6 +106,9 @@ class MetricsDB():
            raise Exception

    def write_KPI(self, time, kpi_id, kpi_sample_type, device_id, endpoint_id, service_id, slice_id, connection_id, kpi_value):
        device_name = self.name_mapping.get_device_name(device_id) or ''
        endpoint_name = self.name_mapping.get_endpoint_name(endpoint_id) or ''

        counter = 0
        while (counter < self.retries):
            try:
@@ -110,7 +119,9 @@ class MetricsDB():
                    'kpi_id': kpi_id,
                    'kpi_sample_type': kpi_sample_type,
                    'device_id': device_id,
                    'device_name': device_name,
                    'endpoint_id': endpoint_id,
                    'endpoint_name': endpoint_name,
                    'service_id': service_id,
                    'slice_id': slice_id,
                    'connection_id': connection_id,},
...
@@ -17,12 +17,13 @@ from common.Settings import get_service_port_grpc
from common.proto.monitoring_pb2_grpc import add_MonitoringServiceServicer_to_server
from common.tools.service.GenericGrpcService import GenericGrpcService
from monitoring.service.MonitoringServiceServicerImpl import MonitoringServiceServicerImpl
from monitoring.service.NameMapping import NameMapping

class MonitoringService(GenericGrpcService):
    def __init__(self, name_mapping : NameMapping, cls_name: str = __name__) -> None:
        port = get_service_port_grpc(ServiceNameEnum.MONITORING)
        super().__init__(port, cls_name=cls_name)
        self.monitoring_servicer = MonitoringServiceServicerImpl(name_mapping)

    def install_servicers(self):
        add_MonitoringServiceServicer_to_server(self.monitoring_servicer, self.server)
@@ -17,8 +17,6 @@ from queue import Queue
from typing import Iterator

from common.Constants import ServiceNameEnum
from common.Settings import get_setting, get_service_port_grpc, get_service_host
from common.logger import getJSONLogger
from common.proto.context_pb2 import Empty
from common.proto.device_pb2 import MonitoringSettings
@@ -36,6 +34,7 @@ from device.client.DeviceClient import DeviceClient
from prometheus_client import Counter, Summary
from monitoring.service.AlarmManager import AlarmManager
from monitoring.service.NameMapping import NameMapping
from monitoring.service.SubscriptionManager import SubscriptionManager

LOGGER = getJSONLogger('monitoringservice-server')
@@ -50,22 +49,15 @@ METRICSDB_ILP_PORT = os.environ.get("METRICSDB_ILP_PORT")
METRICSDB_REST_PORT = os.environ.get("METRICSDB_REST_PORT")
METRICSDB_TABLE = os.environ.get("METRICSDB_TABLE")

DEVICESERVICE_SERVICE_HOST = get_setting('DEVICESERVICE_SERVICE_HOST', default=get_service_host(ServiceNameEnum.DEVICE))
DEVICESERVICE_SERVICE_PORT_GRPC = get_setting('DEVICESERVICE_SERVICE_PORT_GRPC',
                                              default=get_service_port_grpc(ServiceNameEnum.DEVICE))

class MonitoringServiceServicerImpl(MonitoringServiceServicer):
    def __init__(self, name_mapping : NameMapping):
        LOGGER.info('Init monitoringService')
        # Init sqlite monitoring db
        self.management_db = ManagementDBTools.ManagementDB('monitoring.db')
        self.deviceClient = DeviceClient()
        self.metrics_db = MetricsDBTools.MetricsDB(
            METRICSDB_HOSTNAME, name_mapping, METRICSDB_ILP_PORT, METRICSDB_REST_PORT, METRICSDB_TABLE)
        self.subs_manager = SubscriptionManager(self.metrics_db)
        self.alarm_manager = AlarmManager(self.metrics_db)
        LOGGER.info('MetricsDB initialized')
...
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import threading
from typing import Dict, Optional

class NameMapping:
    def __init__(self) -> None:
        self.__lock = threading.Lock()
        self.__device_to_name   : Dict[str, str] = dict()
        self.__endpoint_to_name : Dict[str, str] = dict()

    def get_device_name(self, device_uuid : str) -> Optional[str]:
        with self.__lock:
            return self.__device_to_name.get(device_uuid)

    def get_endpoint_name(self, endpoint_uuid : str) -> Optional[str]:
        with self.__lock:
            return self.__endpoint_to_name.get(endpoint_uuid)

    def set_device_name(self, device_uuid : str, device_name : str) -> None:
        with self.__lock:
            self.__device_to_name[device_uuid] = device_name

    def set_endpoint_name(self, endpoint_uuid : str, endpoint_name : str) -> None:
        with self.__lock:
            self.__endpoint_to_name[endpoint_uuid] = endpoint_name

    def delete_device_name(self, device_uuid : str) -> None:
        with self.__lock:
            self.__device_to_name.pop(device_uuid, None)

    def delete_endpoint_name(self, endpoint_uuid : str) -> None:
        with self.__lock:
            self.__endpoint_to_name.pop(endpoint_uuid, None)
@@ -21,6 +21,7 @@ from common.Settings import (
from common.proto import monitoring_pb2
from .EventTools import EventsDeviceCollector
from .MonitoringService import MonitoringService
from .NameMapping import NameMapping

terminate = threading.Event()
LOGGER = None
@@ -29,10 +30,10 @@ def signal_handler(signal, frame): # pylint: disable=redefined-outer-name
    LOGGER.warning('Terminate signal received')
    terminate.set()

def start_monitoring(name_mapping : NameMapping):
    LOGGER.info('Start Monitoring...',)

    events_collector = EventsDeviceCollector(name_mapping)
    events_collector.start()

    # TODO: redesign this method to be more clear and clean
@@ -79,11 +80,13 @@ def main():
    metrics_port = get_metrics_port()
    start_http_server(metrics_port)

    name_mapping = NameMapping()

    # Starting monitoring service
    grpc_service = MonitoringService(name_mapping)
    grpc_service.start()

    start_monitoring(name_mapping)

    # Wait for Ctrl+C or termination signal
    while not terminate.wait(timeout=0.1): pass
...