diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 0c5ff9325944d1a5a54d941d32d6a45782257970..115b336761dd94902597c3b6e21e7d3dcf225af1 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -48,6 +48,6 @@ include: - local: '/src/kpi_manager/.gitlab-ci.yml' - local: '/src/kpi_value_api/.gitlab-ci.yml' - local: '/src/kpi_value_writer/.gitlab-ci.yml' - + - local: '/src/telemetry/.gitlab-ci.yml' # This should be last one: end-to-end integration tests - local: '/src/tests/.gitlab-ci.yml' diff --git a/deploy/kafka.sh b/deploy/kafka.sh index 21ba89408e6bdadd92a2a96c59d6d24cd580952e..0483bce153b457800c6f7db2ef66685e90118111 100755 --- a/deploy/kafka.sh +++ b/deploy/kafka.sh @@ -79,10 +79,12 @@ function kafka_deploy() { echo "Apache Kafka" echo ">>> Checking if Apache Kafka is deployed ... " if [ "$KFK_REDEPLOY" == "YES" ]; then + echo ">>> Redeploying kafka namespace" kafka_deploy -elif kubectl get --namespace ${KFK_NAMESPACE} deployments.apps &> /dev/null; then - echo ">>> Apache Kafka already present; skipping step." +elif kubectl get namespace "${KFK_NAMESPACE}" &> /dev/null; then + echo ">>> Apache Kafka already present; skipping step." else + echo ">>> Kafka namespace doesn't exists. Deploying kafka namespace" kafka_deploy fi echo diff --git a/deploy/tfs.sh b/deploy/tfs.sh index 4ecfaae9972ec136b2b7e74c6071392ba288f0d3..e7201441815c7cc08c46cce3714f33f43401c2eb 100755 --- a/deploy/tfs.sh +++ b/deploy/tfs.sh @@ -170,7 +170,19 @@ kubectl create secret generic crdb-kpi-data --namespace ${TFS_K8S_NAMESPACE} --t --from-literal=CRDB_SSLMODE=require printf "\n" -echo "Create secret with Apache Kafka kfk-kpi-data for KPI and Telemetry microservices" +echo "Create secret with CockroachDB data for Telemetry microservices" +CRDB_SQL_PORT=$(kubectl --namespace ${CRDB_NAMESPACE} get service cockroachdb-public -o 'jsonpath={.spec.ports[?(@.name=="sql")].port}') +CRDB_DATABASE_TELEMETRY="tfs_telemetry" # TODO: change by specific configurable environment variable +kubectl create secret generic crdb-telemetry --namespace ${TFS_K8S_NAMESPACE} --type='Opaque' \ + --from-literal=CRDB_NAMESPACE=${CRDB_NAMESPACE} \ + --from-literal=CRDB_SQL_PORT=${CRDB_SQL_PORT} \ + --from-literal=CRDB_DATABASE=${CRDB_DATABASE_TELEMETRY} \ + --from-literal=CRDB_USERNAME=${CRDB_USERNAME} \ + --from-literal=CRDB_PASSWORD=${CRDB_PASSWORD} \ + --from-literal=CRDB_SSLMODE=require +printf "\n" + +echo "Create secret with Apache Kafka data for KPI and Telemetry microservices" KFK_SERVER_PORT=$(kubectl --namespace ${KFK_NAMESPACE} get service kafka-service -o 'jsonpath={.spec.ports[0].port}') kubectl create secret generic kfk-kpi-data --namespace ${TFS_K8S_NAMESPACE} --type='Opaque' \ --from-literal=KFK_NAMESPACE=${KFK_NAMESPACE} \ @@ -252,15 +264,17 @@ for COMPONENT in $TFS_COMPONENTS; do if [ "$COMPONENT" == "ztp" ] || [ "$COMPONENT" == "policy" ]; then $DOCKER_BUILD -t "$COMPONENT:$TFS_IMAGE_TAG" -f ./src/"$COMPONENT"/Dockerfile ./src/"$COMPONENT"/ > "$BUILD_LOG" - elif [ "$COMPONENT" == "pathcomp" ]; then + elif [ "$COMPONENT" == "pathcomp" ] || [ "$COMPONENT" == "telemetry" ]; then BUILD_LOG="$TMP_LOGS_FOLDER/build_${COMPONENT}-frontend.log" $DOCKER_BUILD -t "$COMPONENT-frontend:$TFS_IMAGE_TAG" -f ./src/"$COMPONENT"/frontend/Dockerfile . > "$BUILD_LOG" BUILD_LOG="$TMP_LOGS_FOLDER/build_${COMPONENT}-backend.log" $DOCKER_BUILD -t "$COMPONENT-backend:$TFS_IMAGE_TAG" -f ./src/"$COMPONENT"/backend/Dockerfile . > "$BUILD_LOG" - # next command is redundant, but helpful to keep cache updated between rebuilds - IMAGE_NAME="$COMPONENT-backend:$TFS_IMAGE_TAG-builder" - $DOCKER_BUILD -t "$IMAGE_NAME" --target builder -f ./src/"$COMPONENT"/backend/Dockerfile . >> "$BUILD_LOG" + if [ "$COMPONENT" == "pathcomp" ]; then + # next command is redundant, but helpful to keep cache updated between rebuilds + IMAGE_NAME="$COMPONENT-backend:$TFS_IMAGE_TAG-builder" + $DOCKER_BUILD -t "$IMAGE_NAME" --target builder -f ./src/"$COMPONENT"/backend/Dockerfile . >> "$BUILD_LOG" + fi elif [ "$COMPONENT" == "dlt" ]; then BUILD_LOG="$TMP_LOGS_FOLDER/build_${COMPONENT}-connector.log" $DOCKER_BUILD -t "$COMPONENT-connector:$TFS_IMAGE_TAG" -f ./src/"$COMPONENT"/connector/Dockerfile . > "$BUILD_LOG" @@ -273,7 +287,7 @@ for COMPONENT in $TFS_COMPONENTS; do echo " Pushing Docker image to '$TFS_REGISTRY_IMAGES'..." - if [ "$COMPONENT" == "pathcomp" ]; then + if [ "$COMPONENT" == "pathcomp" ] || [ "$COMPONENT" == "telemetry" ]; then IMAGE_URL=$(echo "$TFS_REGISTRY_IMAGES/$COMPONENT-frontend:$TFS_IMAGE_TAG" | sed 's,//,/,g' | sed 's,http:/,,g') TAG_LOG="$TMP_LOGS_FOLDER/tag_${COMPONENT}-frontend.log" @@ -324,7 +338,7 @@ for COMPONENT in $TFS_COMPONENTS; do cp ./manifests/"${COMPONENT}"service.yaml "$MANIFEST" fi - if [ "$COMPONENT" == "pathcomp" ]; then + if [ "$COMPONENT" == "pathcomp" ] || [ "$COMPONENT" == "telemetry" ]; then IMAGE_URL=$(echo "$TFS_REGISTRY_IMAGES/$COMPONENT-frontend:$TFS_IMAGE_TAG" | sed 's,//,/,g' | sed 's,http:/,,g') VERSION=$(grep -i "${GITLAB_REPO_URL}/${COMPONENT}-frontend:" "$MANIFEST" | cut -d ":" -f4) sed -E -i "s#image: $GITLAB_REPO_URL/$COMPONENT-frontend:${VERSION}#image: $IMAGE_URL#g" "$MANIFEST" diff --git a/manifests/telemetryservice.yaml b/manifests/telemetryservice.yaml new file mode 100644 index 0000000000000000000000000000000000000000..2f9917499a425b95d436ffa8cdb311d29483d2ca --- /dev/null +++ b/manifests/telemetryservice.yaml @@ -0,0 +1,128 @@ +# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: apps/v1 +kind: Deployment +metadata: + name: telemetryservice +spec: + selector: + matchLabels: + app: telemetryservice + #replicas: 1 + template: + metadata: + labels: + app: telemetryservice + spec: + terminationGracePeriodSeconds: 5 + containers: + - name: frontend + image: labs.etsi.org:5050/tfs/controller/telemetry-frontend:latest + imagePullPolicy: Always + ports: + - containerPort: 30050 + - containerPort: 9192 + env: + - name: LOG_LEVEL + value: "INFO" + envFrom: + - secretRef: + name: crdb-telemetry + - secretRef: + name: kfk-kpi-data + readinessProbe: + exec: + command: ["/bin/grpc_health_probe", "-addr=:30050"] + livenessProbe: + exec: + command: ["/bin/grpc_health_probe", "-addr=:30050"] + resources: + requests: + cpu: 250m + memory: 128Mi + limits: + cpu: 1000m + memory: 1024Mi + - name: backend + image: labs.etsi.org:5050/tfs/controller/telemetry-backend:latest + imagePullPolicy: Always + ports: + - containerPort: 30060 + - containerPort: 9192 + env: + - name: LOG_LEVEL + value: "INFO" + envFrom: + - secretRef: + name: kfk-kpi-data + readinessProbe: + exec: + command: ["/bin/grpc_health_probe", "-addr=:30060"] + livenessProbe: + exec: + command: ["/bin/grpc_health_probe", "-addr=:30060"] + resources: + requests: + cpu: 250m + memory: 128Mi + limits: + cpu: 1000m + memory: 1024Mi +--- +apiVersion: v1 +kind: Service +metadata: + name: telemetryservice + labels: + app: telemetryservice +spec: + type: ClusterIP + selector: + app: telemetryservice + ports: + - name: frontend-grpc + protocol: TCP + port: 30050 + targetPort: 30050 + - name: backend-grpc + protocol: TCP + port: 30060 + targetPort: 30060 + - name: metrics + protocol: TCP + port: 9192 + targetPort: 9192 +--- +apiVersion: autoscaling/v2 +kind: HorizontalPodAutoscaler +metadata: + name: telemetryservice-hpa +spec: + scaleTargetRef: + apiVersion: apps/v1 + kind: Deployment + name: telemetryservice + minReplicas: 1 + maxReplicas: 20 + metrics: + - type: Resource + resource: + name: cpu + target: + type: Utilization + averageUtilization: 80 + #behavior: + # scaleDown: + # stabilizationWindowSeconds: 30 diff --git a/proto/telemetry_frontend.proto b/proto/telemetry_frontend.proto index dbc1e8bf688f9f2df341484c1929e2338c458bbf..614d10cf06cdbb1ff4fba6e51a39286eb5132688 100644 --- a/proto/telemetry_frontend.proto +++ b/proto/telemetry_frontend.proto @@ -19,9 +19,9 @@ import "context.proto"; import "kpi_manager.proto"; service TelemetryFrontendService { - rpc StartCollector (Collector ) returns (CollectorId ) {} - rpc StopCollector (CollectorId ) returns (context.Empty) {} - rpc SelectCollectors(CollectorFilter) returns (CollectorList) {} + rpc StartCollector (Collector ) returns (CollectorId ) {} + rpc StopCollector (CollectorId ) returns (context.Empty) {} + rpc SelectCollectors (CollectorFilter) returns (CollectorList) {} } message CollectorId { @@ -29,10 +29,12 @@ message CollectorId { } message Collector { - CollectorId collector_id = 1; // The Collector ID - kpi_manager.KpiId kpi_id = 2; // The KPI Id to be associated to the collected samples - float duration_s = 3; // Terminate data collection after duration[seconds]; duration==0 means indefinitely - float interval_s = 4; // Interval between collected samples + CollectorId collector_id = 1; // The Collector ID + kpi_manager.KpiId kpi_id = 2; // The KPI Id to be associated to the collected samples + float duration_s = 3; // Terminate data collection after duration[seconds]; duration==0 means indefinitely + float interval_s = 4; // Interval between collected samples + context.Timestamp start_time = 5; // Timestamp when Collector start execution + context.Timestamp end_time = 6; // Timestamp when Collector stop execution } message CollectorFilter { diff --git a/scripts/run_tests_locally-kpi-manager.sh b/scripts/run_tests_locally-kpi-manager.sh index a6a24f90db93d56300ac997bd00675c479ef13ae..8a4ce8d95c74657451147078a1d93e891dfc2ac8 100755 --- a/scripts/run_tests_locally-kpi-manager.sh +++ b/scripts/run_tests_locally-kpi-manager.sh @@ -24,7 +24,7 @@ cd $PROJECTDIR/src # python3 kpi_manager/tests/test_unitary.py RCFILE=$PROJECTDIR/coverage/.coveragerc -CRDB_SQL_ADDRESS=$(kubectl --namespace ${CRDB_NAMESPACE} get service cockroachdb-public -o 'jsonpath={.spec.clusterIP}') +CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace ${CRDB_NAMESPACE} -o 'jsonpath={.spec.clusterIP}') export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_kpi_mgmt?sslmode=require" python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \ kpi_manager/tests/test_kpi_manager.py diff --git a/scripts/run_tests_locally-kpi-value-API.sh b/scripts/run_tests_locally-kpi-value-API.sh index 8dfbfb16237634519dcae2fcc34f850a5188c1e7..3953d2a89c6fbe2bd3546e648246b9b018e5fdb0 100755 --- a/scripts/run_tests_locally-kpi-value-API.sh +++ b/scripts/run_tests_locally-kpi-value-API.sh @@ -19,7 +19,8 @@ PROJECTDIR=`pwd` cd $PROJECTDIR/src RCFILE=$PROJECTDIR/coverage/.coveragerc - +KAFKA_IP=$(docker inspect kafka --format "{{.NetworkSettings.Networks.teraflowbridge.IPAddress}}") +KFK_SERVER_ADDRESS=${KAFKA_IP}:9092 # helpful pytest flags: --log-level=INFO -o log_cli=true --verbose --maxfail=1 --durations=0 python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG -o log_cli=true --verbose \ kpi_value_api/tests/test_kpi_value_api.py diff --git a/scripts/run_tests_locally-telemetry-DB.sh b/scripts/run_tests_locally-telemetry-DB.sh index bb1c48b76440c00b398875a8f704c2a82ba4ab50..4b9a417603cc42a4e7e8b19c7394cc38633817fa 100755 --- a/scripts/run_tests_locally-telemetry-DB.sh +++ b/scripts/run_tests_locally-telemetry-DB.sh @@ -22,5 +22,5 @@ cd $PROJECTDIR/src # kpi_manager/tests/test_unitary.py RCFILE=$PROJECTDIR/coverage/.coveragerc -python3 -m pytest --log-cli-level=INFO --verbose \ - telemetry/database/tests/telemetryDBtests.py +python3 -m pytest --log-level=DEBUG --log-cli-level=debug --verbose \ + telemetry/tests/test_telemetryDB.py diff --git a/scripts/run_tests_locally-telemetry-frontend.sh b/scripts/run_tests_locally-telemetry-frontend.sh index 7652ccb583268285dcd2fcf3090b717dc18e4fc3..a2a1de52340cac527d4d1c446c76740d38ce7783 100755 --- a/scripts/run_tests_locally-telemetry-frontend.sh +++ b/scripts/run_tests_locally-telemetry-frontend.sh @@ -24,5 +24,5 @@ cd $PROJECTDIR/src # python3 kpi_manager/tests/test_unitary.py RCFILE=$PROJECTDIR/coverage/.coveragerc -python3 -m pytest --log-level=INFO --log-cli-level=INFO --verbose \ +python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \ telemetry/frontend/tests/test_frontend.py diff --git a/src/common/Constants.py b/src/common/Constants.py index 767b21343f89e35c2338b522bcdc71c56aca1815..4b2bced95fca28abdfd729492acc1117cdf3e8d9 100644 --- a/src/common/Constants.py +++ b/src/common/Constants.py @@ -65,6 +65,7 @@ class ServiceNameEnum(Enum): KPIVALUEAPI = 'kpi-value-api' KPIVALUEWRITER = 'kpi-value-writer' TELEMETRYFRONTEND = 'telemetry-frontend' + TELEMETRYBACKEND = 'telemetry-backend' # Used for test and debugging only DLT_GATEWAY = 'dltgateway' @@ -98,6 +99,7 @@ DEFAULT_SERVICE_GRPC_PORTS = { ServiceNameEnum.KPIVALUEAPI .value : 30020, ServiceNameEnum.KPIVALUEWRITER .value : 30030, ServiceNameEnum.TELEMETRYFRONTEND .value : 30050, + ServiceNameEnum.TELEMETRYBACKEND .value : 30060, # Used for test and debugging only ServiceNameEnum.DLT_GATEWAY .value : 50051, diff --git a/src/common/tools/kafka/Variables.py b/src/common/tools/kafka/Variables.py index 168957a26c4f1947d65d7c849c25ed7e9dad06be..5ada88a1ea0a7eae31eda741d81757fa624521de 100644 --- a/src/common/tools/kafka/Variables.py +++ b/src/common/tools/kafka/Variables.py @@ -20,16 +20,25 @@ from common.Settings import get_setting LOGGER = logging.getLogger(__name__) +KFK_SERVER_ADDRESS_TEMPLATE = 'kafka-service.{:s}.svc.cluster.local:{:s}' class KafkaConfig(Enum): - KFK_SERVER_ADDRESS_TEMPLATE = 'kafka-service.{:s}.svc.cluster.local:{:s}' - KFK_NAMESPACE = 'kafka' - # KFK_NAMESPACE = get_setting('KFK_NAMESPACE') - KFK_PORT = '9092' - # KFK_PORT = get_setting('KFK_SERVER_PORT') - # SERVER_ADDRESS = "127.0.0.1:9092" - SERVER_ADDRESS = KFK_SERVER_ADDRESS_TEMPLATE.format(KFK_NAMESPACE, KFK_PORT) - ADMIN_CLIENT = AdminClient({'bootstrap.servers': SERVER_ADDRESS }) + + @staticmethod + def get_kafka_address() -> str: + kafka_server_address = get_setting('KFK_SERVER_ADDRESS', default=None) + if kafka_server_address is None: + KFK_NAMESPACE = get_setting('KFK_NAMESPACE') + KFK_PORT = get_setting('KFK_SERVER_PORT') + kafka_server_address = KFK_SERVER_ADDRESS_TEMPLATE.format(KFK_NAMESPACE, KFK_PORT) + return kafka_server_address + + @staticmethod + def get_admin_client(): + SERVER_ADDRESS = KafkaConfig.get_kafka_address() + ADMIN_CLIENT = AdminClient({'bootstrap.servers': SERVER_ADDRESS }) + return ADMIN_CLIENT + class KafkaTopic(Enum): REQUEST = 'topic_request' @@ -44,9 +53,9 @@ class KafkaTopic(Enum): Method to create Kafka topics defined as class members """ all_topics = [member.value for member in KafkaTopic] - LOGGER.debug("Kafka server address is: {:} ".format(KafkaConfig.SERVER_ADDRESS.value)) + LOGGER.debug("Kafka server address is: {:} ".format(KafkaConfig.get_kafka_address())) if( KafkaTopic.create_new_topic_if_not_exists( all_topics )): - LOGGER.debug("All topics are created sucsessfully") + LOGGER.debug("All topics are created sucsessfully or Already Exists") return True else: LOGGER.debug("Error creating all topics") @@ -62,14 +71,14 @@ class KafkaTopic(Enum): LOGGER.debug("Topics names to be verified and created: {:}".format(new_topics)) for topic in new_topics: try: - topic_metadata = KafkaConfig.ADMIN_CLIENT.value.list_topics(timeout=5) + topic_metadata = KafkaConfig.get_admin_client().list_topics(timeout=5) # LOGGER.debug("Existing topic list: {:}".format(topic_metadata.topics)) if topic not in topic_metadata.topics: # If the topic does not exist, create a new topic print("Topic {:} does not exist. Creating...".format(topic)) LOGGER.debug("Topic {:} does not exist. Creating...".format(topic)) new_topic = NewTopic(topic, num_partitions=1, replication_factor=1) - KafkaConfig.ADMIN_CLIENT.value.create_topics([new_topic]) + KafkaConfig.get_admin_client().create_topics([new_topic]) else: print("Topic name already exists: {:}".format(topic)) LOGGER.debug("Topic name already exists: {:}".format(topic)) diff --git a/src/kpi_value_api/.gitlab-ci.yml b/src/kpi_value_api/.gitlab-ci.yml index 166e9d3cbcf3eb09c914384a9906853dddd7bfb5..1a6f821ba9e798bb4220d914109ab3a65f0f1792 100644 --- a/src/kpi_value_api/.gitlab-ci.yml +++ b/src/kpi_value_api/.gitlab-ci.yml @@ -50,10 +50,30 @@ unit_test kpi-value-api: - docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY - if docker network list | grep teraflowbridge; then echo "teraflowbridge is already created"; else docker network create -d bridge teraflowbridge; fi - if docker container ls | grep $IMAGE_NAME; then docker rm -f $IMAGE_NAME; else echo "$IMAGE_NAME container is not in the system"; fi + - if docker container ls | grep kafka; then docker rm -f kafka; else echo "Kafka container is not in the system"; fi + - if docker container ls | grep zookeeper; then docker rm -f zookeeper; else echo "Zookeeper container is not in the system"; fi - docker container prune -f script: - docker pull "$CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG" - - docker run --name $IMAGE_NAME -d -p 30020:30020 -v "$PWD/src/$IMAGE_NAME/tests:/opt/results" --network=teraflowbridge $CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG + - docker pull "bitnami/zookeeper:latest" + - docker pull "bitnami/kafka:latest" + - > + docker run --name zookeeper -d --network=teraflowbridge -p 2181:2181 + bitnami/zookeeper:latest + - sleep 10 # Wait for Zookeeper to start + - docker run --name kafka -d --network=teraflowbridge -p 9092:9092 + --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 + --env ALLOW_PLAINTEXT_LISTENER=yes + bitnami/kafka:latest + - sleep 20 # Wait for Kafka to start + - KAFKA_IP=$(docker inspect kafka --format "{{.NetworkSettings.Networks.teraflowbridge.IPAddress}}") + - echo $KAFKA_IP + - > + docker run --name $IMAGE_NAME -d -p 30020:30020 + --env "KFK_SERVER_ADDRESS=${KAFKA_IP}:9092" + --volume "$PWD/src/$IMAGE_NAME/tests:/opt/results" + --network=teraflowbridge + $CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG - sleep 5 - docker ps -a - docker logs $IMAGE_NAME @@ -74,7 +94,7 @@ unit_test kpi-value-api: - src/$IMAGE_NAME/**/*.{py,in,yml} - src/$IMAGE_NAME/Dockerfile - src/$IMAGE_NAME/tests/*.py - - src/$IMAGE_NAME/tests/Dockerfile + # - src/$IMAGE_NAME/tests/Dockerfile # mayne not needed - manifests/${IMAGE_NAME}service.yaml - .gitlab-ci.yml artifacts: diff --git a/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py b/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py index 5e7c3d139a9fd041d5d9fc6dd7032cb54bda17c4..4ea978fafc8d7454d41f64182d553d030215113a 100644 --- a/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py +++ b/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py @@ -12,8 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging, grpc -from typing import Tuple, Any +import logging, grpc, json +from typing import Dict from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method from common.tools.kafka.Variables import KafkaConfig, KafkaTopic @@ -37,32 +37,42 @@ PROM_URL = "http://prometheus-k8s.monitoring.svc.cluster.local:9090" # TO class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer): def __init__(self): LOGGER.debug('Init KpiValueApiService') - + self.kafka_producer = KafkaProducer({'bootstrap.servers' : KafkaConfig.get_kafka_address()}) + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def StoreKpiValues(self, request: KpiValueList, grpc_context: grpc.ServicerContext ) -> Empty: LOGGER.debug('StoreKpiValues: Received gRPC message object: {:}'.format(request)) - producer_obj = KafkaProducer({ - 'bootstrap.servers' : KafkaConfig.SERVER_ADDRESS.value - }) + + producer = self.kafka_producer for kpi_value in request.kpi_value_list: - kpi_value_to_produce : Tuple [str, Any, Any] = ( - kpi_value.kpi_id.kpi_id, - kpi_value.timestamp, - kpi_value.kpi_value_type # kpi_value.kpi_value_type.(many options) how? - ) + kpi_value_to_produce : Dict = { + "kpi_uuid" : kpi_value.kpi_id.kpi_id.uuid, + "timestamp" : kpi_value.timestamp.timestamp, + "kpi_value_type" : self.ExtractKpiValueByType(kpi_value.kpi_value_type) + } LOGGER.debug('KPI to produce is {:}'.format(kpi_value_to_produce)) msg_key = "gRPC-kpivalueapi" # str(__class__.__name__) can be used - producer_obj.produce( + producer.produce( KafkaTopic.VALUE.value, key = msg_key, - value = kpi_value.SerializeToString(), # value = json.dumps(kpi_value_to_produce), + value = json.dumps(kpi_value_to_produce), callback = self.delivery_callback ) - producer_obj.flush() + producer.flush() return Empty() + def ExtractKpiValueByType(self, value): + attributes = [ 'floatVal' , 'int32Val' , 'uint32Val','int64Val', + 'uint64Val', 'stringVal', 'boolVal'] + for attr in attributes: + try: + return getattr(value, attr) + except (ValueError, TypeError, AttributeError): + continue + return None + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def SelectKpiValues(self, request: KpiValueFilter, grpc_context: grpc.ServicerContext ) -> KpiValueList: @@ -130,13 +140,13 @@ class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer): try: int_value = int(value) return KpiValueType(int64Val=int_value) - except ValueError: + except (ValueError, TypeError): pass # Check if the value is a float try: float_value = float(value) return KpiValueType(floatVal=float_value) - except ValueError: + except (ValueError, TypeError): pass # Check if the value is a boolean if value.lower() in ['true', 'false']: diff --git a/src/kpi_value_writer/.gitlab-ci.yml b/src/kpi_value_writer/.gitlab-ci.yml index 25619ce7f8b4346172587dbf2e804896aff20e4d..9a2f9fd47e435b26e2e3a335bd9b95da58a0517f 100644 --- a/src/kpi_value_writer/.gitlab-ci.yml +++ b/src/kpi_value_writer/.gitlab-ci.yml @@ -50,10 +50,30 @@ unit_test kpi-value-writer: - docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY - if docker network list | grep teraflowbridge; then echo "teraflowbridge is already created"; else docker network create -d bridge teraflowbridge; fi - if docker container ls | grep $IMAGE_NAME; then docker rm -f $IMAGE_NAME; else echo "$IMAGE_NAME container is not in the system"; fi + - if docker container ls | grep kafka; then docker rm -f kafka; else echo "Kafka container is not in the system"; fi + - if docker container ls | grep zookeeper; then docker rm -f zookeeper; else echo "Zookeeper container is not in the system"; fi - docker container prune -f script: - docker pull "$CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG" - - docker run --name $IMAGE_NAME -d -p 30030:30030 -v "$PWD/src/$IMAGE_NAME/tests:/opt/results" --network=teraflowbridge $CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG + - docker pull "bitnami/zookeeper:latest" + - docker pull "bitnami/kafka:latest" + - > + docker run --name zookeeper -d --network=teraflowbridge -p 2181:2181 + bitnami/zookeeper:latest + - sleep 10 # Wait for Zookeeper to start + - docker run --name kafka -d --network=teraflowbridge -p 9092:9092 + --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 + --env ALLOW_PLAINTEXT_LISTENER=yes + bitnami/kafka:latest + - sleep 20 # Wait for Kafka to start + - KAFKA_IP=$(docker inspect kafka --format "{{.NetworkSettings.Networks.teraflowbridge.IPAddress}}") + - echo $KAFKA_IP + - > + docker run --name $IMAGE_NAME -d -p 30030:30030 + --env "KFK_SERVER_ADDRESS=${KAFKA_IP}:9092" + --volume "$PWD/src/$IMAGE_NAME/tests:/opt/results" + --network=teraflowbridge + $CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG - sleep 5 - docker ps -a - docker logs $IMAGE_NAME @@ -64,6 +84,8 @@ unit_test kpi-value-writer: coverage: '/TOTAL\s+\d+\s+\d+\s+(\d+%)/' after_script: - docker rm -f $IMAGE_NAME + - docker rm -f zookeeper + - docker rm -f kafka - docker network rm teraflowbridge rules: - if: '$CI_PIPELINE_SOURCE == "merge_request_event" && ($CI_MERGE_REQUEST_TARGET_BRANCH_NAME == "develop" || $CI_MERGE_REQUEST_TARGET_BRANCH_NAME == $CI_DEFAULT_BRANCH)' diff --git a/src/kpi_value_writer/service/KpiValueWriter.py b/src/kpi_value_writer/service/KpiValueWriter.py index 5e2b6babe210e4fb71eb8617432a6dfd5d164407..8b258a1424cc44be4dcb9134ee913c707cc44bfa 100644 --- a/src/kpi_value_writer/service/KpiValueWriter.py +++ b/src/kpi_value_writer/service/KpiValueWriter.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import json import logging import threading from common.tools.kafka.Variables import KafkaConfig, KafkaTopic @@ -38,28 +39,25 @@ class KpiValueWriter(GenericGrpcService): def __init__(self, cls_name : str = __name__) -> None: port = get_service_port_grpc(ServiceNameEnum.KPIVALUEWRITER) super().__init__(port, cls_name=cls_name) + self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(), + 'group.id' : 'KpiValueWriter', + 'auto.offset.reset' : 'latest'}) - @staticmethod - def RunKafkaConsumer(): - thread = threading.Thread(target=KpiValueWriter.KafkaConsumer, args=()) + def RunKafkaConsumer(self): + thread = threading.Thread(target=self.KafkaKpiConsumer, args=()) ACTIVE_CONSUMERS.append(thread) thread.start() - @staticmethod - def KafkaConsumer(): + def KafkaKpiConsumer(self): kpi_manager_client = KpiManagerClient() metric_writer = MetricWriterToPrometheus() - kafka_consumer = KafkaConsumer( - { 'bootstrap.servers' : KafkaConfig.SERVER_ADDRESS.value, - 'group.id' : __class__, - 'auto.offset.reset' : 'latest'} - ) - kafka_consumer.subscribe([KafkaTopic.VALUE.value]) + consumer = self.kafka_consumer + consumer.subscribe([KafkaTopic.VALUE.value]) LOGGER.debug("Kafka Consumer start listenng on topic: {:}".format(KafkaTopic.VALUE.value)) print("Kafka Consumer start listenng on topic: {:}".format(KafkaTopic.VALUE.value)) while True: - raw_kpi = kafka_consumer.poll(1.0) + raw_kpi = consumer.poll(1.0) if raw_kpi is None: continue elif raw_kpi.error(): @@ -69,24 +67,21 @@ class KpiValueWriter(GenericGrpcService): print("Consumer error: {}".format(raw_kpi.error())) continue try: - kpi_value = KpiValue() - kpi_value.ParseFromString(raw_kpi.value()) + kpi_value = json.loads(raw_kpi.value().decode('utf-8')) LOGGER.info("Received KPI : {:}".format(kpi_value)) print("Received KPI : {:}".format(kpi_value)) - KpiValueWriter.get_kpi_descriptor(kpi_value, kpi_manager_client, metric_writer) + self.get_kpi_descriptor(kpi_value, kpi_manager_client, metric_writer) except Exception as e: print("Error detail: {:}".format(e)) continue - @staticmethod - def get_kpi_descriptor(kpi_value: str, kpi_manager_client, metric_writer): + def get_kpi_descriptor(self, kpi_value: str, kpi_manager_client, metric_writer): print("--- START -----") kpi_id = KpiId() - kpi_id.kpi_id.uuid = kpi_value.kpi_id.kpi_id.uuid + kpi_id.kpi_id.uuid = kpi_value['kpi_uuid'] print("KpiId generated: {:}".format(kpi_id)) # print("Kpi manger client created: {:}".format(kpi_manager_client)) - try: kpi_descriptor_object = KpiDescriptor() kpi_descriptor_object = kpi_manager_client.GetKpiDescriptor(kpi_id) diff --git a/src/kpi_value_writer/service/MetricWriterToPrometheus.py b/src/kpi_value_writer/service/MetricWriterToPrometheus.py index f1d07978303dd8ac635978fe4b3bc286a746ce88..85e618a4b5b330cb83cf255652e7be8dff2dabd3 100644 --- a/src/kpi_value_writer/service/MetricWriterToPrometheus.py +++ b/src/kpi_value_writer/service/MetricWriterToPrometheus.py @@ -14,11 +14,9 @@ # read Kafka stream from Kafka topic -import ast -import time -import threading import logging -from prometheus_client import start_http_server, Gauge, CollectorRegistry +from typing import Dict +from prometheus_client import Gauge from common.proto.kpi_sample_types_pb2 import KpiSampleType from common.proto.kpi_value_api_pb2 import KpiValue @@ -26,7 +24,6 @@ from common.proto.kpi_manager_pb2 import KpiDescriptor LOGGER = logging.getLogger(__name__) PROM_METRICS = {} -PROM_REGISTERY = CollectorRegistry() class MetricWriterToPrometheus: ''' @@ -34,13 +31,7 @@ class MetricWriterToPrometheus: cooked KPI value = KpiDescriptor (gRPC message) + KpiValue (gRPC message) ''' def __init__(self): - # prometheus server address and configs - self.start_prometheus_client() pass - - def start_prometheus_client(self): - start_http_server(10808, registry=PROM_REGISTERY) - LOGGER.debug("Prometheus client is started on port 10808") def merge_kpi_descriptor_and_kpi_value(self, kpi_descriptor, kpi_value): # Creating a dictionary from the kpi_descriptor's attributes @@ -54,13 +45,13 @@ class MetricWriterToPrometheus: 'slice_id' : kpi_descriptor.slice_id.slice_uuid.uuid, 'connection_id' : kpi_descriptor.connection_id.connection_uuid.uuid, 'link_id' : kpi_descriptor.link_id.link_uuid.uuid, - 'time_stamp' : kpi_value.timestamp.timestamp, - 'kpi_value' : kpi_value.kpi_value_type.floatVal + 'time_stamp' : kpi_value['timestamp'], + 'kpi_value' : kpi_value['kpi_value_type'] } # LOGGER.debug("Cooked Kpi: {:}".format(cooked_kpi)) return cooked_kpi - def create_and_expose_cooked_kpi(self, kpi_descriptor: KpiDescriptor, kpi_value: KpiValue): + def create_and_expose_cooked_kpi(self, kpi_descriptor: KpiDescriptor, kpi_value: Dict): # merge both gRPC messages into single varible. cooked_kpi = self.merge_kpi_descriptor_and_kpi_value(kpi_descriptor, kpi_value) tags_to_exclude = {'kpi_description', 'kpi_sample_type', 'kpi_value'} @@ -71,8 +62,7 @@ class MetricWriterToPrometheus: PROM_METRICS[metric_name] = Gauge ( metric_name, cooked_kpi['kpi_description'], - metric_tags, - registry=PROM_REGISTERY + metric_tags ) LOGGER.debug("Metric is created with labels: {:}".format(metric_tags)) PROM_METRICS[metric_name].labels( @@ -84,7 +74,7 @@ class MetricWriterToPrometheus: connection_id = cooked_kpi['connection_id'], link_id = cooked_kpi['link_id'], time_stamp = cooked_kpi['time_stamp'], - ).set(float(cooked_kpi['kpi_value'])) + ).set(cooked_kpi['kpi_value']) LOGGER.debug("Metric pushed to the endpoints: {:}".format(PROM_METRICS[metric_name])) except ValueError as e: diff --git a/src/kpi_value_writer/service/__main__.py b/src/kpi_value_writer/service/__main__.py index aa67540fb899781297d1235dc2e15bcbb2c38585..be9f8f29bfdb2397eedd0ce2821c5da8f778cfc4 100644 --- a/src/kpi_value_writer/service/__main__.py +++ b/src/kpi_value_writer/service/__main__.py @@ -13,6 +13,7 @@ # limitations under the License. import logging, signal, sys, threading +from prometheus_client import start_http_server from kpi_value_writer.service.KpiValueWriter import KpiValueWriter from common.Settings import get_log_level @@ -38,6 +39,8 @@ def main(): grpc_service = KpiValueWriter() grpc_service.start() + start_http_server(10808) + LOGGER.debug("Prometheus client is started on port 10808") # Wait for Ctrl+C or termination signal while not terminate.wait(timeout=1.0): pass diff --git a/src/kpi_value_writer/tests/test_kpi_value_writer.py b/src/kpi_value_writer/tests/test_kpi_value_writer.py index fce043d7fd6c9b5cbb9374d0b059cb1e2fa65a24..b784fae5da713f9bd7cd7a1668f48b080f7a84fa 100755 --- a/src/kpi_value_writer/tests/test_kpi_value_writer.py +++ b/src/kpi_value_writer/tests/test_kpi_value_writer.py @@ -29,4 +29,5 @@ def test_validate_kafka_topics(): def test_KafkaConsumer(): LOGGER.debug(" --->>> test_kafka_consumer: START <<<--- ") - KpiValueWriter.RunKafkaConsumer() + kpi_value_writer = KpiValueWriter() + kpi_value_writer.RunKafkaConsumer() diff --git a/src/telemetry/.gitlab-ci.yml b/src/telemetry/.gitlab-ci.yml new file mode 100644 index 0000000000000000000000000000000000000000..110a6490d20558c6589550be45b6432e500ba9d6 --- /dev/null +++ b/src/telemetry/.gitlab-ci.yml @@ -0,0 +1,203 @@ +# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Build, tag, and push the Docker image to the GitLab Docker registry +build telemetry: + variables: + IMAGE_NAME: 'telemetry' # name of the microservice + IMAGE_TAG: 'latest' # tag of the container image (production, development, etc) + stage: build + before_script: + - docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY + script: + # This first build tags the builder resulting image to prevent being removed by dangling image removal command + # - docker buildx build -t "${IMAGE_NAME}-backend:${IMAGE_TAG}-builder" --target builder -f ./src/$IMAGE_NAME/backend/Dockerfile . + - docker buildx build -t "${IMAGE_NAME}-frontend:$IMAGE_TAG" -f ./src/$IMAGE_NAME/frontend/Dockerfile . + - docker buildx build -t "${IMAGE_NAME}-backend:$IMAGE_TAG" -f ./src/$IMAGE_NAME/backend/Dockerfile . + - docker tag "${IMAGE_NAME}-frontend:$IMAGE_TAG" "$CI_REGISTRY_IMAGE/${IMAGE_NAME}-frontend:$IMAGE_TAG" + - docker tag "${IMAGE_NAME}-backend:$IMAGE_TAG" "$CI_REGISTRY_IMAGE/${IMAGE_NAME}-backend:$IMAGE_TAG" + - docker push "$CI_REGISTRY_IMAGE/${IMAGE_NAME}-frontend:$IMAGE_TAG" + - docker push "$CI_REGISTRY_IMAGE/${IMAGE_NAME}-backend:$IMAGE_TAG" + after_script: + - docker images --filter="dangling=true" --quiet | xargs -r docker rmi + rules: + - if: '$CI_PIPELINE_SOURCE == "merge_request_event" && ($CI_MERGE_REQUEST_TARGET_BRANCH_NAME == "develop" || $CI_MERGE_REQUEST_TARGET_BRANCH_NAME == $CI_DEFAULT_BRANCH)' + - if: '$CI_PIPELINE_SOURCE == "push" && $CI_COMMIT_BRANCH == "develop"' + - changes: + - src/common/**/*.py + - proto/*.proto + - src/$IMAGE_NAME/.gitlab-ci.yml + - src/$IMAGE_NAME/frontend/**/*.{py,in,yml} + - src/$IMAGE_NAME/frontend/Dockerfile + - src/$IMAGE_NAME/frontend/tests/*.py + - src/$IMAGE_NAME/backend/Dockerfile + - src/$IMAGE_NAME/backend/**/*.{py,in,yml} + - src/$IMAGE_NAME/backend/tests/*.py + - manifests/${IMAGE_NAME}service.yaml + - .gitlab-ci.yml + +# Apply unit test to the component +unit_test telemetry-backend: + variables: + IMAGE_NAME: 'telemetry' # name of the microservice + IMAGE_TAG: 'latest' # tag of the container image (production, development, etc) + stage: unit_test + needs: + - build telemetry + before_script: + - docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY + - if docker network list | grep teraflowbridge; then echo "teraflowbridge is already created"; else docker network create -d bridge teraflowbridge; fi + - if docker container ls | grep kafka; then docker rm -f kafka; else echo "Kafka container is not in the system"; fi + - if docker container ls | grep zookeeper; then docker rm -f zookeeper; else echo "Zookeeper container is not in the system"; fi + # - if docker container ls | grep ${IMAGE_NAME}-frontend; then docker rm -f ${IMAGE_NAME}-frontend; else echo "${IMAGE_NAME}-frontend container is not in the system"; fi + - if docker container ls | grep ${IMAGE_NAME}-backend; then docker rm -f ${IMAGE_NAME}-backend; else echo "${IMAGE_NAME}-backend container is not in the system"; fi + - docker container prune -f + script: + - docker pull "$CI_REGISTRY_IMAGE/${IMAGE_NAME}-backend:$IMAGE_TAG" + - docker pull "bitnami/zookeeper:latest" + - docker pull "bitnami/kafka:latest" + - > + docker run --name zookeeper -d --network=teraflowbridge -p 2181:2181 + bitnami/zookeeper:latest + - sleep 10 # Wait for Zookeeper to start + - docker run --name kafka -d --network=teraflowbridge -p 9092:9092 + --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 + --env ALLOW_PLAINTEXT_LISTENER=yes + bitnami/kafka:latest + - sleep 20 # Wait for Kafka to start + - KAFKA_IP=$(docker inspect kafka --format "{{.NetworkSettings.Networks.teraflowbridge.IPAddress}}") + - echo $KAFKA_IP + - > + docker run --name $IMAGE_NAME-backend -d -p 30060:30060 + --env "KFK_SERVER_ADDRESS=${KAFKA_IP}:9092" + --volume "$PWD/src/$IMAGE_NAME/backend/tests:/opt/results" + --network=teraflowbridge + $CI_REGISTRY_IMAGE/${IMAGE_NAME}-backend:$IMAGE_TAG + - docker ps -a + - sleep 5 + - docker logs ${IMAGE_NAME}-backend + - > + docker exec -i ${IMAGE_NAME}-backend bash -c + "coverage run -m pytest --log-level=INFO --verbose --junitxml=/opt/results/${IMAGE_NAME}-backend_report.xml $IMAGE_NAME/backend/tests/test_*.py" + - docker exec -i ${IMAGE_NAME}-backend bash -c "coverage report --include='${IMAGE_NAME}/*' --show-missing" + coverage: '/TOTAL\s+\d+\s+\d+\s+(\d+%)/' + after_script: + - docker network rm teraflowbridge + - docker volume prune --force + - docker image prune --force + - docker rm -f ${IMAGE_NAME}-backend + - docker rm -f zookeeper + - docker rm -f kafka + rules: + - if: '$CI_PIPELINE_SOURCE == "merge_request_event" && ($CI_MERGE_REQUEST_TARGET_BRANCH_NAME == "develop" || $CI_MERGE_REQUEST_TARGET_BRANCH_NAME == $CI_DEFAULT_BRANCH)' + - if: '$CI_PIPELINE_SOURCE == "push" && $CI_COMMIT_BRANCH == "develop"' + - changes: + - src/common/**/*.py + - proto/*.proto + - src/$IMAGE_NAME/backend/**/*.{py,in,yml} + - src/$IMAGE_NAME/backend/Dockerfile + - src/$IMAGE_NAME/backend/tests/*.py + - manifests/${IMAGE_NAME}service.yaml + - .gitlab-ci.yml + artifacts: + when: always + reports: + junit: src/$IMAGE_NAME/backend/tests/${IMAGE_NAME}-backend_report.xml + +# Apply unit test to the component +unit_test telemetry-frontend: + variables: + IMAGE_NAME: 'telemetry' # name of the microservice + IMAGE_TAG: 'latest' # tag of the container image (production, development, etc) + stage: unit_test + needs: + - build telemetry + before_script: + - docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY + - if docker network list | grep teraflowbridge; then echo "teraflowbridge is already created"; else docker network create -d bridge teraflowbridge; fi + - if docker container ls | grep crdb; then docker rm -f crdb; else echo "CockroachDB container is not in the system"; fi + - if docker volume ls | grep crdb; then docker volume rm -f crdb; else echo "CockroachDB volume is not in the system"; fi + - if docker container ls | grep kafka; then docker rm -f kafka; else echo "Kafka container is not in the system"; fi + - if docker container ls | grep zookeeper; then docker rm -f zookeeper; else echo "Zookeeper container is not in the system"; fi + - if docker container ls | grep ${IMAGE_NAME}-frontend; then docker rm -f ${IMAGE_NAME}-frontend; else echo "${IMAGE_NAME}-frontend container is not in the system"; fi + - docker container prune -f + script: + - docker pull "$CI_REGISTRY_IMAGE/${IMAGE_NAME}-frontend:$IMAGE_TAG" + - docker pull "bitnami/zookeeper:latest" + - docker pull "bitnami/kafka:latest" + - docker pull "cockroachdb/cockroach:latest-v22.2" + - docker volume create crdb + - > + docker run --name crdb -d --network=teraflowbridge -p 26257:26257 -p 8080:8080 + --env COCKROACH_DATABASE=tfs_test --env COCKROACH_USER=tfs --env COCKROACH_PASSWORD=tfs123 + --volume "crdb:/cockroach/cockroach-data" + cockroachdb/cockroach:latest-v22.2 start-single-node + - echo "Waiting for initialization..." + - while ! docker logs crdb 2>&1 | grep -q 'finished creating default user \"tfs\"'; do sleep 1; done + # - docker logs crdb + # - docker ps -a + - CRDB_ADDRESS=$(docker inspect crdb --format "{{.NetworkSettings.Networks.teraflowbridge.IPAddress}}") + - echo $CRDB_ADDRESS + - > + docker run --name zookeeper -d --network=teraflowbridge -p 2181:2181 \ + -e ALLOW_ANONYMOUS_LOGIN=yes \ + bitnami/zookeeper:latest + - sleep 10 # Wait for Zookeeper to start + - docker run --name kafka -d --network=teraflowbridge -p 9092:9092 + --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 + --env ALLOW_PLAINTEXT_LISTENER=yes + bitnami/kafka:latest + - sleep 20 # Wait for Kafka to start + - KAFKA_IP=$(docker inspect kafka --format "{{.NetworkSettings.Networks.teraflowbridge.IPAddress}}") + - echo $KAFKA_IP + # - docker logs zookeeper + # - docker logs kafka + - > + docker run --name $IMAGE_NAME-frontend -d -p 30050:30050 + --env "CRDB_URI=cockroachdb://tfs:tfs123@${CRDB_ADDRESS}:26257/tfs_test?sslmode=require" + --env "KFK_SERVER_ADDRESS=${KAFKA_IP}:9092" + --volume "$PWD/src/$IMAGE_NAME/frontend/tests:/opt/results" + --network=teraflowbridge + $CI_REGISTRY_IMAGE/${IMAGE_NAME}-frontend:$IMAGE_TAG + - docker ps -a + - sleep 5 + - docker logs ${IMAGE_NAME}-frontend + - > + docker exec -i ${IMAGE_NAME}-frontend bash -c + "coverage run -m pytest --log-level=INFO --verbose --junitxml=/opt/results/${IMAGE_NAME}-frontend_report.xml $IMAGE_NAME/frontend/tests/test_*.py" + - docker exec -i ${IMAGE_NAME}-frontend bash -c "coverage report --include='${IMAGE_NAME}/*' --show-missing" + coverage: '/TOTAL\s+\d+\s+\d+\s+(\d+%)/' + after_script: + - docker volume rm -f crdb + - docker network rm teraflowbridge + - docker volume prune --force + - docker image prune --force + - docker rm -f ${IMAGE_NAME}-frontend + - docker rm -f zookeeper + - docker rm -f kafka + rules: + - if: '$CI_PIPELINE_SOURCE == "merge_request_event" && ($CI_MERGE_REQUEST_TARGET_BRANCH_NAME == "develop" || $CI_MERGE_REQUEST_TARGET_BRANCH_NAME == $CI_DEFAULT_BRANCH)' + - if: '$CI_PIPELINE_SOURCE == "push" && $CI_COMMIT_BRANCH == "develop"' + - changes: + - src/common/**/*.py + - proto/*.proto + - src/$IMAGE_NAME/frontend/**/*.{py,in,yml} + - src/$IMAGE_NAME/frontend/Dockerfile + - src/$IMAGE_NAME/frontend/tests/*.py + - manifests/${IMAGE_NAME}service.yaml + - .gitlab-ci.yml + artifacts: + when: always + reports: + junit: src/$IMAGE_NAME/frontend/tests/${IMAGE_NAME}-frontend_report.xml \ No newline at end of file diff --git a/src/telemetry/backend/Dockerfile b/src/telemetry/backend/Dockerfile new file mode 100644 index 0000000000000000000000000000000000000000..eebfe24ab3ca457b9d05b02a07f4b28d6f196987 --- /dev/null +++ b/src/telemetry/backend/Dockerfile @@ -0,0 +1,69 @@ +# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +FROM python:3.9-slim + +# Install dependencies +RUN apt-get --yes --quiet --quiet update && \ + apt-get --yes --quiet --quiet install wget g++ git && \ + rm -rf /var/lib/apt/lists/* + +# Set Python to show logs as they occur +ENV PYTHONUNBUFFERED=0 + +# Download the gRPC health probe +RUN GRPC_HEALTH_PROBE_VERSION=v0.2.0 && \ + wget -qO/bin/grpc_health_probe https://github.com/grpc-ecosystem/grpc-health-probe/releases/download/${GRPC_HEALTH_PROBE_VERSION}/grpc_health_probe-linux-amd64 && \ + chmod +x /bin/grpc_health_probe + +# Get generic Python packages +RUN python3 -m pip install --upgrade pip +RUN python3 -m pip install --upgrade setuptools wheel +RUN python3 -m pip install --upgrade pip-tools + +# Get common Python packages +# Note: this step enables sharing the previous Docker build steps among all the Python components +WORKDIR /var/teraflow +COPY common_requirements.in common_requirements.in +RUN pip-compile --quiet --output-file=common_requirements.txt common_requirements.in +RUN python3 -m pip install -r common_requirements.txt + +# Add common files into working directory +WORKDIR /var/teraflow/common +COPY src/common/. ./ +RUN rm -rf proto + +# Create proto sub-folder, copy .proto files, and generate Python code +RUN mkdir -p /var/teraflow/common/proto +WORKDIR /var/teraflow/common/proto +RUN touch __init__.py +COPY proto/*.proto ./ +RUN python3 -m grpc_tools.protoc -I=. --python_out=. --grpc_python_out=. *.proto +RUN rm *.proto +RUN find . -type f -exec sed -i -E 's/(import\ .*)_pb2/from . \1_pb2/g' {} \; + +# Create component sub-folders, get specific Python packages +RUN mkdir -p /var/teraflow/telemetry/backend +WORKDIR /var/teraflow/telemetry/backend +COPY src/telemetry/backend/requirements.in requirements.in +RUN pip-compile --quiet --output-file=requirements.txt requirements.in +RUN python3 -m pip install -r requirements.txt + +# Add component files into working directory +WORKDIR /var/teraflow +COPY src/telemetry/__init__.py telemetry/__init__.py +COPY src/telemetry/backend/. telemetry/backend/ + +# Start the service +ENTRYPOINT ["python", "-m", "telemetry.backend.service"] diff --git a/src/telemetry/frontend/tests/__init__.py b/src/telemetry/backend/requirements.in similarity index 96% rename from src/telemetry/frontend/tests/__init__.py rename to src/telemetry/backend/requirements.in index 3ee6f7071f145e06c3aeaefc09a43ccd88e619e3..e6a559be714faa31196206dbbdc53788506369b5 100644 --- a/src/telemetry/frontend/tests/__init__.py +++ b/src/telemetry/backend/requirements.in @@ -12,3 +12,4 @@ # See the License for the specific language governing permissions and # limitations under the License. +confluent-kafka==2.3.* diff --git a/src/telemetry/backend/service/TelemetryBackendService.py b/src/telemetry/backend/service/TelemetryBackendService.py index d81be79dbe410ccbf2781816f34735f6bfe5639d..95662969be4f9191e5f3748490a6bc47167e6243 100755 --- a/src/telemetry/backend/service/TelemetryBackendService.py +++ b/src/telemetry/backend/service/TelemetryBackendService.py @@ -12,64 +12,56 @@ # See the License for the specific language governing permissions and # limitations under the License. -import ast +import json import time import random import logging -import requests import threading -from typing import Any, Tuple -from common.proto.context_pb2 import Empty +from typing import Any, Dict +# from common.proto.context_pb2 import Empty from confluent_kafka import Producer as KafkaProducer from confluent_kafka import Consumer as KafkaConsumer -from confluent_kafka import KafkaException from confluent_kafka import KafkaError -from confluent_kafka.admin import AdminClient, NewTopic -from common.proto.telemetry_frontend_pb2 import Collector, CollectorId -from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method +from common.Constants import ServiceNameEnum +from common.Settings import get_service_port_grpc +from common.tools.kafka.Variables import KafkaConfig, KafkaTopic +from common.method_wrappers.Decorator import MetricsPool +from common.tools.service.GenericGrpcService import GenericGrpcService + + LOGGER = logging.getLogger(__name__) -METRICS_POOL = MetricsPool('Telemetry', 'TelemetryBackend') -KAFKA_SERVER_IP = '127.0.0.1:9092' -# KAFKA_SERVER_IP = '10.152.183.175:30092' -ADMIN_KAFKA_CLIENT = AdminClient({'bootstrap.servers': KAFKA_SERVER_IP}) -KAFKA_TOPICS = {'request' : 'topic_request', 'response': 'topic_response', - 'raw' : 'topic_raw' , 'labeled' : 'topic_labeled'} -EXPORTER_ENDPOINT = "http://10.152.183.2:9100/metrics" -PRODUCER_CONFIG = {'bootstrap.servers': KAFKA_SERVER_IP,} +METRICS_POOL = MetricsPool('TelemetryBackend', 'backendService') +# EXPORTER_ENDPOINT = "http://10.152.183.2:9100/metrics" -class TelemetryBackendService: +class TelemetryBackendService(GenericGrpcService): """ - Class to listens for request on Kafka topic, fetches metrics and produces measured values to another Kafka topic. + Class listens for request on Kafka topic, fetches requested metrics from device. + Produces metrics on both RESPONSE and VALUE kafka topics. """ - - def __init__(self): + def __init__(self, cls_name : str = __name__) -> None: LOGGER.info('Init TelemetryBackendService') + port = get_service_port_grpc(ServiceNameEnum.TELEMETRYBACKEND) + super().__init__(port, cls_name=cls_name) + self.kafka_producer = KafkaProducer({'bootstrap.servers' : KafkaConfig.get_kafka_address()}) + self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(), + 'group.id' : 'backend', + 'auto.offset.reset' : 'latest'}) self.running_threads = {} - - def run_kafka_listener(self)->bool: - threading.Thread(target=self.kafka_listener).start() - return True - - def kafka_listener(self): + + def install_servicers(self): + threading.Thread(target=self.RequestListener).start() + + def RequestListener(self): """ listener for requests on Kafka topic. """ - conusmer_configs = { - 'bootstrap.servers' : KAFKA_SERVER_IP, - 'group.id' : 'backend', - 'auto.offset.reset' : 'latest' - } - # topic_request = "topic_request" - consumerObj = KafkaConsumer(conusmer_configs) - # consumerObj.subscribe([topic_request]) - consumerObj.subscribe([KAFKA_TOPICS['request']]) - + consumer = self.kafka_consumer + consumer.subscribe([KafkaTopic.REQUEST.value]) while True: - receive_msg = consumerObj.poll(2.0) + receive_msg = consumer.poll(2.0) if receive_msg is None: - # print (time.time(), " - Telemetry backend is listening on Kafka Topic: ", KAFKA_TOPICS['request']) # added for debugging purposes continue elif receive_msg.error(): if receive_msg.error().code() == KafkaError._PARTITION_EOF: @@ -77,177 +69,175 @@ class TelemetryBackendService: else: print("Consumer error: {}".format(receive_msg.error())) break - (kpi_id, duration, interval) = ast.literal_eval(receive_msg.value().decode('utf-8')) + + collector = json.loads(receive_msg.value().decode('utf-8')) collector_id = receive_msg.key().decode('utf-8') - if duration == -1 and interval == -1: - self.terminate_collector_backend(collector_id) - # threading.Thread(target=self.terminate_collector_backend, args=(collector_id)) + LOGGER.debug('Recevied Collector: {:} - {:}'.format(collector_id, collector)) + print('Recevied Collector: {:} - {:}'.format(collector_id, collector)) + + if collector['duration'] == -1 and collector['interval'] == -1: + self.TerminateCollectorBackend(collector_id) else: - self.run_initiate_collector_backend(collector_id, kpi_id, duration, interval) + self.RunInitiateCollectorBackend(collector_id, collector) + def TerminateCollectorBackend(self, collector_id): + if collector_id in self.running_threads: + thread, stop_event = self.running_threads[collector_id] + stop_event.set() + thread.join() + print ("Terminating backend (by StopCollector): Collector Id: ", collector_id) + del self.running_threads[collector_id] + self.GenerateCollectorResponse(collector_id, "-1", -1) # Termination confirmation to frontend. + else: + print ('Backend collector {:} not found'.format(collector_id)) - def run_initiate_collector_backend(self, collector_id: str, kpi_id: str, duration: int, interval: int): + def RunInitiateCollectorBackend(self, collector_id: str, collector: str): stop_event = threading.Event() - thread = threading.Thread(target=self.initiate_collector_backend, - args=(collector_id, kpi_id, duration, interval, stop_event)) + thread = threading.Thread(target=self.InitiateCollectorBackend, + args=(collector_id, collector, stop_event)) self.running_threads[collector_id] = (thread, stop_event) thread.start() - def initiate_collector_backend(self, collector_id, kpi_id, duration, interval, stop_event - ): # type: ignore + def InitiateCollectorBackend(self, collector_id, collector, stop_event): """ - Method to receive collector request attribues and initiates collecter backend. + Method receives collector request and initiates collecter backend. """ print("Initiating backend for collector: ", collector_id) start_time = time.time() while not stop_event.is_set(): - if time.time() - start_time >= duration: # condition to terminate backend + if time.time() - start_time >= collector['duration']: # condition to terminate backend print("Execuation duration completed: Terminating backend: Collector Id: ", collector_id, " - ", time.time() - start_time) - self.generate_kafka_response(collector_id, "-1", -1) - # write to Kafka to send the termination confirmation. + self.GenerateCollectorResponse(collector_id, "-1", -1) # Termination confirmation to frontend. break - # print ("Received KPI: ", kpi_id, ", Duration: ", duration, ", Fetch Interval: ", interval) - self.extract_kpi_value(collector_id, kpi_id) - # print ("Telemetry Backend running for KPI: ", kpi_id, "after FETCH INTERVAL: ", interval) - time.sleep(interval) + self.ExtractKpiValue(collector_id, collector['kpi_id']) + time.sleep(collector['interval']) - def extract_kpi_value(self, collector_id: str, kpi_id: str): + def ExtractKpiValue(self, collector_id: str, kpi_id: str): """ Method to extract kpi value. """ - measured_kpi_value = random.randint(1,100) # Should be extracted from exporter/stream - # measured_kpi_value = self.fetch_node_exporter_metrics() # exporter extracted metric value against default KPI - self.generate_kafka_response(collector_id, kpi_id , measured_kpi_value) + measured_kpi_value = random.randint(1,100) # TODO: To be extracted from a device + print ("Measured Kpi value: {:}".format(measured_kpi_value)) + # measured_kpi_value = self.fetch_node_exporter_metrics() # exporter extracted metric value against default KPI + self.GenerateCollectorResponse(collector_id, kpi_id , measured_kpi_value) - def generate_kafka_response(self, collector_id: str, kpi_id: str, kpi_value: Any): + def GenerateCollectorResponse(self, collector_id: str, kpi_id: str, measured_kpi_value: Any): """ - Method to write response on Kafka topic + Method to write kpi value on RESPONSE Kafka topic """ - # topic_response = "topic_response" - msg_value : Tuple [str, Any] = (kpi_id, kpi_value) - msg_key = collector_id - producerObj = KafkaProducer(PRODUCER_CONFIG) - # producerObj.produce(topic_response, key=msg_key, value= str(msg_value), callback=self.delivery_callback) - producerObj.produce(KAFKA_TOPICS['response'], key=msg_key, value= str(msg_value), callback=TelemetryBackendService.delivery_callback) - producerObj.flush() - - def terminate_collector_backend(self, collector_id): - if collector_id in self.running_threads: - thread, stop_event = self.running_threads[collector_id] - stop_event.set() - thread.join() - print ("Terminating backend (by StopCollector): Collector Id: ", collector_id) - del self.running_threads[collector_id] - self.generate_kafka_response(collector_id, "-1", -1) + producer = self.kafka_producer + kpi_value : Dict = { + "kpi_id" : kpi_id, + "kpi_value" : measured_kpi_value + } + producer.produce( + KafkaTopic.RESPONSE.value, + key = collector_id, + value = json.dumps(kpi_value), + callback = self.delivery_callback + ) + producer.flush() - def create_topic_if_not_exists(self, new_topics: list) -> bool: - """ - Method to create Kafka topic if it does not exist. - Args: - admin_client (AdminClient): Kafka admin client. - """ - for topic in new_topics: - try: - topic_metadata = ADMIN_KAFKA_CLIENT.list_topics(timeout=5) - if topic not in topic_metadata.topics: - # If the topic does not exist, create a new topic - print(f"Topic '{topic}' does not exist. Creating...") - LOGGER.warning("Topic {:} does not exist. Creating...".format(topic)) - new_topic = NewTopic(topic, num_partitions=1, replication_factor=1) - ADMIN_KAFKA_CLIENT.create_topics([new_topic]) - except KafkaException as e: - print(f"Failed to create topic: {e}") - return False - return True - - @staticmethod - def delivery_callback( err, msg): + def GenerateRawMetric(self, metrics: Any): """ - Callback function to handle message delivery status. - Args: - err (KafkaError): Kafka error object. - msg (Message): Kafka message object. + Method writes raw metrics on VALUE Kafka topic """ - if err: - print(f'Message delivery failed: {err}') - else: - print(f'Message delivered to topic {msg.topic()}') + producer = self.kafka_producer + some_metric : Dict = { + "some_id" : metrics + } + producer.produce( + KafkaTopic.VALUE.value, + key = 'raw', + value = json.dumps(some_metric), + callback = self.delivery_callback + ) + producer.flush() -# ----------- BELOW: Actual Implementation of Kafka Producer with Node Exporter ----------- - @staticmethod - def fetch_single_node_exporter_metric(): - """ - Method to fetch metrics from Node Exporter. - Returns: - str: Metrics fetched from Node Exporter. + def delivery_callback(self, err, msg): """ - KPI = "node_network_receive_packets_total" - try: - response = requests.get(EXPORTER_ENDPOINT) # type: ignore - LOGGER.info("Request status {:}".format(response)) - if response.status_code == 200: - # print(f"Metrics fetched sucessfully...") - metrics = response.text - # Check if the desired metric is available in the response - if KPI in metrics: - KPI_VALUE = TelemetryBackendService.extract_metric_value(metrics, KPI) - # Extract the metric value - if KPI_VALUE is not None: - LOGGER.info("Extracted value of {:} is {:}".format(KPI, KPI_VALUE)) - print(f"Extracted value of {KPI} is: {KPI_VALUE}") - return KPI_VALUE - else: - LOGGER.info("Failed to fetch metrics. Status code: {:}".format(response.status_code)) - # print(f"Failed to fetch metrics. Status code: {response.status_code}") - return None - except Exception as e: - LOGGER.info("Failed to fetch metrics. Status code: {:}".format(e)) - # print(f"Failed to fetch metrics: {str(e)}") - return None - - @staticmethod - def extract_metric_value(metrics, metric_name): - """ - Method to extract the value of a metric from the metrics string. - Args: - metrics (str): Metrics string fetched from Exporter. - metric_name (str): Name of the metric to extract. - Returns: - float: Value of the extracted metric, or None if not found. - """ - try: - # Find the metric line containing the desired metric name - metric_line = next(line for line in metrics.split('\n') if line.startswith(metric_name)) - # Split the line to extract the metric value - metric_value = float(metric_line.split()[1]) - return metric_value - except StopIteration: - print(f"Metric '{metric_name}' not found in the metrics.") - return None - - @staticmethod - def stream_node_export_metrics_to_raw_topic(): - try: - while True: - response = requests.get(EXPORTER_ENDPOINT) - # print("Response Status {:} ".format(response)) - # LOGGER.info("Response Status {:} ".format(response)) - try: - if response.status_code == 200: - producerObj = KafkaProducer(PRODUCER_CONFIG) - producerObj.produce(KAFKA_TOPICS['raw'], key="raw", value= str(response.text), callback=TelemetryBackendService.delivery_callback) - producerObj.flush() - LOGGER.info("Produce to topic") - else: - LOGGER.info("Didn't received expected response. Status code: {:}".format(response.status_code)) - print(f"Didn't received expected response. Status code: {response.status_code}") - return None - time.sleep(15) - except Exception as e: - LOGGER.info("Failed to process response. Status code: {:}".format(e)) - return None - except Exception as e: - LOGGER.info("Failed to fetch metrics. Status code: {:}".format(e)) - print(f"Failed to fetch metrics: {str(e)}") - return None -# ----------- ABOVE: Actual Implementation of Kafka Producer with Node Exporter ----------- \ No newline at end of file + Callback function to handle message delivery status. + Args: err (KafkaError): Kafka error object. + msg (Message): Kafka message object. + """ + if err: print(f'Message delivery failed: {err}') + # else: print(f'Message delivered to topic {msg.topic()}') + +# # ----------- BELOW: Actual Implementation of Kafka Producer with Node Exporter ----------- +# @staticmethod +# def fetch_single_node_exporter_metric(): +# """ +# Method to fetch metrics from Node Exporter. +# Returns: +# str: Metrics fetched from Node Exporter. +# """ +# KPI = "node_network_receive_packets_total" +# try: +# response = requests.get(EXPORTER_ENDPOINT) # type: ignore +# LOGGER.info("Request status {:}".format(response)) +# if response.status_code == 200: +# # print(f"Metrics fetched sucessfully...") +# metrics = response.text +# # Check if the desired metric is available in the response +# if KPI in metrics: +# KPI_VALUE = TelemetryBackendService.extract_metric_value(metrics, KPI) +# # Extract the metric value +# if KPI_VALUE is not None: +# LOGGER.info("Extracted value of {:} is {:}".format(KPI, KPI_VALUE)) +# print(f"Extracted value of {KPI} is: {KPI_VALUE}") +# return KPI_VALUE +# else: +# LOGGER.info("Failed to fetch metrics. Status code: {:}".format(response.status_code)) +# # print(f"Failed to fetch metrics. Status code: {response.status_code}") +# return None +# except Exception as e: +# LOGGER.info("Failed to fetch metrics. Status code: {:}".format(e)) +# # print(f"Failed to fetch metrics: {str(e)}") +# return None + +# @staticmethod +# def extract_metric_value(metrics, metric_name): +# """ +# Method to extract the value of a metric from the metrics string. +# Args: +# metrics (str): Metrics string fetched from Exporter. +# metric_name (str): Name of the metric to extract. +# Returns: +# float: Value of the extracted metric, or None if not found. +# """ +# try: +# # Find the metric line containing the desired metric name +# metric_line = next(line for line in metrics.split('\n') if line.startswith(metric_name)) +# # Split the line to extract the metric value +# metric_value = float(metric_line.split()[1]) +# return metric_value +# except StopIteration: +# print(f"Metric '{metric_name}' not found in the metrics.") +# return None + +# @staticmethod +# def stream_node_export_metrics_to_raw_topic(): +# try: +# while True: +# response = requests.get(EXPORTER_ENDPOINT) +# # print("Response Status {:} ".format(response)) +# # LOGGER.info("Response Status {:} ".format(response)) +# try: +# if response.status_code == 200: +# producerObj = KafkaProducer(PRODUCER_CONFIG) +# producerObj.produce(KAFKA_TOPICS['raw'], key="raw", value= str(response.text), callback=TelemetryBackendService.delivery_callback) +# producerObj.flush() +# LOGGER.info("Produce to topic") +# else: +# LOGGER.info("Didn't received expected response. Status code: {:}".format(response.status_code)) +# print(f"Didn't received expected response. Status code: {response.status_code}") +# return None +# time.sleep(15) +# except Exception as e: +# LOGGER.info("Failed to process response. Status code: {:}".format(e)) +# return None +# except Exception as e: +# LOGGER.info("Failed to fetch metrics. Status code: {:}".format(e)) +# print(f"Failed to fetch metrics: {str(e)}") +# return None +# # ----------- ABOVE: Actual Implementation of Kafka Producer with Node Exporter ----------- \ No newline at end of file diff --git a/src/telemetry/backend/service/__main__.py b/src/telemetry/backend/service/__main__.py new file mode 100644 index 0000000000000000000000000000000000000000..4ad86733141966894070b78b3ac227890293fa7c --- /dev/null +++ b/src/telemetry/backend/service/__main__.py @@ -0,0 +1,51 @@ +# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging, signal, sys, threading +from common.Settings import get_log_level +from .TelemetryBackendService import TelemetryBackendService + +terminate = threading.Event() +LOGGER = None + +def signal_handler(signal, frame): # pylint: disable=redefined-outer-name + LOGGER.warning('Terminate signal received') + terminate.set() + +def main(): + global LOGGER # pylint: disable=global-statement + + log_level = get_log_level() + logging.basicConfig(level=log_level) + LOGGER = logging.getLogger(__name__) + + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + LOGGER.debug('Starting...') + + grpc_service = TelemetryBackendService() + grpc_service.start() + + # Wait for Ctrl+C or termination signal + while not terminate.wait(timeout=1.0): pass + + LOGGER.debug('Terminating...') + grpc_service.stop() + + LOGGER.debug('Bye') + return 0 + +if __name__ == '__main__': + sys.exit(main()) diff --git a/src/telemetry/backend/tests/testTelemetryBackend.py b/src/telemetry/backend/tests/testTelemetryBackend.py deleted file mode 100644 index d832e54e77589ca677682760d19e68b1bd09b1f7..0000000000000000000000000000000000000000 --- a/src/telemetry/backend/tests/testTelemetryBackend.py +++ /dev/null @@ -1,53 +0,0 @@ -# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import sys -print (sys.path) -sys.path.append('/home/tfs/tfs-ctrl') -import threading -import logging -from typing import Tuple -# from common.proto.context_pb2 import Empty -from src.telemetry.backend.service.TelemetryBackendService import TelemetryBackendService - -LOGGER = logging.getLogger(__name__) - - -########################### -# Tests Implementation of Telemetry Backend -########################### - -def test_verify_kafka_topics(): - LOGGER.info('test_verify_kafka_topics requesting') - TelemetryBackendServiceObj = TelemetryBackendService() - KafkaTopics = ['topic_request', 'topic_response', 'topic_raw', 'topic_labled'] - response = TelemetryBackendServiceObj.create_topic_if_not_exists(KafkaTopics) - LOGGER.debug(str(response)) - assert isinstance(response, bool) - -# def test_run_kafka_listener(): -# LOGGER.info('test_receive_kafka_request requesting') -# TelemetryBackendServiceObj = TelemetryBackendService() -# response = TelemetryBackendServiceObj.run_kafka_listener() -# LOGGER.debug(str(response)) -# assert isinstance(response, bool) - -# def test_fetch_node_exporter_metrics(): -# LOGGER.info(' >>> test_fetch_node_exporter_metrics START <<< ') -# TelemetryBackendService.fetch_single_node_exporter_metric() - -def test_stream_node_export_metrics_to_raw_topic(): - LOGGER.info(' >>> test_stream_node_export_metrics_to_raw_topic START <<< ') - threading.Thread(target=TelemetryBackendService.stream_node_export_metrics_to_raw_topic, args=()).start() - diff --git a/src/telemetry/backend/tests/test_TelemetryBackend.py b/src/telemetry/backend/tests/test_TelemetryBackend.py new file mode 100644 index 0000000000000000000000000000000000000000..a2bbee540c3ce348ef52eceb0e776f48a68d94b1 --- /dev/null +++ b/src/telemetry/backend/tests/test_TelemetryBackend.py @@ -0,0 +1,38 @@ +# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +from common.tools.kafka.Variables import KafkaTopic +from telemetry.backend.service.TelemetryBackendService import TelemetryBackendService + + +LOGGER = logging.getLogger(__name__) + + +########################### +# Tests Implementation of Telemetry Backend +########################### + +# --- "test_validate_kafka_topics" should be run before the functionality tests --- +def test_validate_kafka_topics(): + LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ") + response = KafkaTopic.create_all_topics() + assert isinstance(response, bool) + +def test_RunRequestListener(): + LOGGER.info('test_RunRequestListener') + TelemetryBackendServiceObj = TelemetryBackendService() + response = TelemetryBackendServiceObj.RunRequestListener() + LOGGER.debug(str(response)) + assert isinstance(response, bool) diff --git a/src/telemetry/database/TelemetryDBmanager.py b/src/telemetry/database/TelemetryDBmanager.py deleted file mode 100644 index b558180a9e1fbf85bf523c7faededf58f57e2264..0000000000000000000000000000000000000000 --- a/src/telemetry/database/TelemetryDBmanager.py +++ /dev/null @@ -1,248 +0,0 @@ -# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import logging, time -import sqlalchemy -from sqlalchemy import inspect, MetaData, Table -from sqlalchemy.orm import sessionmaker -from telemetry.database.TelemetryModel import Collector as CollectorModel -from telemetry.database.TelemetryModel import Kpi as KpiModel -from sqlalchemy.ext.declarative import declarative_base -from telemetry.database.TelemetryEngine import TelemetryEngine -from common.proto.kpi_manager_pb2 import KpiDescriptor, KpiId -from common.proto.telemetry_frontend_pb2 import Collector, CollectorId -from sqlalchemy.exc import SQLAlchemyError -from telemetry.database.TelemetryModel import Base - -LOGGER = logging.getLogger(__name__) -DB_NAME = "telemetryfrontend" - -class TelemetryDBmanager: - def __init__(self): - self.db_engine = TelemetryEngine.get_engine() - if self.db_engine is None: - LOGGER.error('Unable to get SQLAlchemy DB Engine...') - return False - self.db_name = DB_NAME - self.Session = sessionmaker(bind=self.db_engine) - - def create_database(self): - try: - # with self.db_engine.connect() as connection: - # connection.execute(f"CREATE DATABASE {self.db_name};") - TelemetryEngine.create_database(self.db_engine) - LOGGER.info('TelemetryDBmanager initalized DB Name: {:}'.format(self.db_name)) - return True - except Exception as e: # pylint: disable=bare-except # pragma: no cover - LOGGER.exception('Failed to check/create the database: {:s}'.format(str(e))) - return False - - def create_tables(self): - try: - Base.metadata.create_all(self.db_engine) # type: ignore - LOGGER.info("Tables created in database ({:}) the as per Models".format(self.db_name)) - except Exception as e: - LOGGER.info("Tables cannot be created in the TelemetryFrontend database. {:s}".format(str(e))) - - def verify_tables(self): - try: - with self.db_engine.connect() as connection: - result = connection.execute("SHOW TABLES;") - tables = result.fetchall() - LOGGER.info("Tables in DB: {:}".format(tables)) - except Exception as e: - LOGGER.info("Unable to fetch Table names. {:s}".format(str(e))) - - def drop_table(self, table_to_drop: str): - try: - inspector = inspect(self.db_engine) - existing_tables = inspector.get_table_names() - if table_to_drop in existing_tables: - table = Table(table_to_drop, MetaData(), autoload_with=self.db_engine) - table.drop(self.db_engine) - LOGGER.info("Tables delete in the DB Name: {:}".format(self.db_name)) - else: - LOGGER.warning("No table {:} in database {:} ".format(table_to_drop, DB_NAME)) - except Exception as e: - LOGGER.info("Tables cannot be deleted in the {:} database. {:s}".format(DB_NAME, str(e))) - - def list_databases(self): - query = "SHOW DATABASES" - with self.db_engine.connect() as connection: - result = connection.execute(query) - databases = [row[0] for row in result] - LOGGER.info("List of available DBs: {:}".format(databases)) - -# ------------------ INSERT METHODs -------------------------------------- - - def inser_kpi(self, request: KpiDescriptor): - session = self.Session() - try: - # Create a new Kpi instance - kpi_to_insert = KpiModel() - kpi_to_insert.kpi_id = request.kpi_id.kpi_id.uuid - kpi_to_insert.kpi_description = request.kpi_description - kpi_to_insert.kpi_sample_type = request.kpi_sample_type - kpi_to_insert.device_id = request.service_id.service_uuid.uuid - kpi_to_insert.endpoint_id = request.device_id.device_uuid.uuid - kpi_to_insert.service_id = request.slice_id.slice_uuid.uuid - kpi_to_insert.slice_id = request.endpoint_id.endpoint_uuid.uuid - kpi_to_insert.connection_id = request.connection_id.connection_uuid.uuid - # kpi_to_insert.link_id = request.link_id.link_id.uuid - # Add the instance to the session - session.add(kpi_to_insert) - session.commit() - LOGGER.info("Row inserted into kpi table: {:}".format(kpi_to_insert.kpi_id)) - except Exception as e: - session.rollback() - LOGGER.info("Failed to insert new kpi. {:s}".format(str(e))) - finally: - # Close the session - session.close() - - # Function to insert a row into the Collector model - def insert_collector(self, request: Collector): - session = self.Session() - try: - # Create a new Collector instance - collector_to_insert = CollectorModel() - collector_to_insert.collector_id = request.collector_id.collector_id.uuid - collector_to_insert.kpi_id = request.kpi_id.kpi_id.uuid - collector_to_insert.collector = "Test collector description" - collector_to_insert.sampling_duration_s = request.duration_s - collector_to_insert.sampling_interval_s = request.interval_s - collector_to_insert.start_timestamp = time.time() - collector_to_insert.end_timestamp = time.time() - - session.add(collector_to_insert) - session.commit() - LOGGER.info("Row inserted into collector table: {:}".format(collector_to_insert.collector_id)) - except Exception as e: - session.rollback() - LOGGER.info("Failed to insert new collector. {:s}".format(str(e))) - finally: - # Close the session - session.close() - -# ------------------ GET METHODs -------------------------------------- - - def get_kpi_descriptor(self, request: KpiId): - session = self.Session() - try: - kpi_id_to_search = request.kpi_id.uuid - kpi = session.query(KpiModel).filter_by(kpi_id=kpi_id_to_search).first() - if kpi: - LOGGER.info("kpi ID found: {:s}".format(str(kpi))) - return kpi - else: - LOGGER.warning("Kpi ID not found {:s}".format(str(kpi_id_to_search))) - return None - except Exception as e: - session.rollback() - LOGGER.info("Failed to retrieve KPI ID. {:s}".format(str(e))) - raise - finally: - session.close() - - def get_collector(self, request: CollectorId): - session = self.Session() - try: - collector_id_to_search = request.collector_id.uuid - collector = session.query(CollectorModel).filter_by(collector_id=collector_id_to_search).first() - if collector: - LOGGER.info("collector ID found: {:s}".format(str(collector))) - return collector - else: - LOGGER.warning("collector ID not found{:s}".format(str(collector_id_to_search))) - return None - except Exception as e: - session.rollback() - LOGGER.info("Failed to retrieve collector ID. {:s}".format(str(e))) - raise - finally: - session.close() - - # ------------------ SELECT METHODs -------------------------------------- - - def select_kpi_descriptor(self, **filters): - session = self.Session() - try: - query = session.query(KpiModel) - for column, value in filters.items(): - query = query.filter(getattr(KpiModel, column) == value) - result = query.all() - if len(result) != 0: - LOGGER.info("Fetched filtered rows from KPI table with filters : {:s}".format(str(result))) - else: - LOGGER.warning("No matching row found : {:s}".format(str(result))) - return result - except SQLAlchemyError as e: - LOGGER.error("Error fetching filtered rows from KPI table with filters {:}: {:}".format(filters, e)) - return [] - finally: - session.close() - - def select_collector(self, **filters): - session = self.Session() - try: - query = session.query(CollectorModel) - for column, value in filters.items(): - query = query.filter(getattr(CollectorModel, column) == value) - result = query.all() - if len(result) != 0: - LOGGER.info("Fetched filtered rows from KPI table with filters : {:s}".format(str(result))) - else: - LOGGER.warning("No matching row found : {:s}".format(str(result))) - return result - except SQLAlchemyError as e: - LOGGER.error("Error fetching filtered rows from KPI table with filters {:}: {:}".format(filters, e)) - return [] - finally: - session.close() - -# ------------------ DELETE METHODs -------------------------------------- - - def delete_kpi_descriptor(self, request: KpiId): - session = self.Session() - try: - kpi_id_to_delete = request.kpi_id.uuid - kpi = session.query(KpiModel).filter_by(kpi_id=kpi_id_to_delete).first() - if kpi: - session.delete(kpi) - session.commit() - LOGGER.info("Deleted KPI with kpi_id: %s", kpi_id_to_delete) - else: - LOGGER.warning("KPI with kpi_id %s not found", kpi_id_to_delete) - except SQLAlchemyError as e: - session.rollback() - LOGGER.error("Error deleting KPI with kpi_id %s: %s", kpi_id_to_delete, e) - finally: - session.close() - - def delete_collector(self, request: CollectorId): - session = self.Session() - try: - collector_id_to_delete = request.collector_id.uuid - collector = session.query(CollectorModel).filter_by(collector_id=collector_id_to_delete).first() - if collector: - session.delete(collector) - session.commit() - LOGGER.info("Deleted collector with collector_id: %s", collector_id_to_delete) - else: - LOGGER.warning("collector with collector_id %s not found", collector_id_to_delete) - except SQLAlchemyError as e: - session.rollback() - LOGGER.error("Error deleting collector with collector_id %s: %s", collector_id_to_delete, e) - finally: - session.close() \ No newline at end of file diff --git a/src/telemetry/database/TelemetryEngine.py b/src/telemetry/database/TelemetryEngine.py index a563fa09f94c812aed07d0aa3cbd5bc988737fc4..18ec2ddbc671302b642db04b673038659da7acde 100644 --- a/src/telemetry/database/TelemetryEngine.py +++ b/src/telemetry/database/TelemetryEngine.py @@ -12,48 +12,31 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging, sqlalchemy, sqlalchemy_utils -# from common.Settings import get_setting +import logging, sqlalchemy +from common.Settings import get_setting LOGGER = logging.getLogger(__name__) -APP_NAME = 'tfs' -ECHO = False # False: No dump SQL commands and transactions executed -CRDB_URI_TEMPLATE = 'cockroachdb://{:s}:{:s}@127.0.0.1:{:s}/{:s}?sslmode={:s}' -# CRDB_URI_TEMPLATE = 'cockroachdb://{:s}:{:s}@cockroachdb-public.{:s}.svc.cluster.local:{:s}/{:s}?sslmode={:s}' +# CRDB_URI_TEMPLATE = 'cockroachdb://{:s}:{:s}@127.0.0.1:{:s}/{:s}?sslmode={:s}' +CRDB_URI_TEMPLATE = 'cockroachdb://{:s}:{:s}@cockroachdb-public.{:s}.svc.cluster.local:{:s}/{:s}?sslmode={:s}' class TelemetryEngine: - # def __init__(self): - # self.engine = self.get_engine() @staticmethod def get_engine() -> sqlalchemy.engine.Engine: - CRDB_NAMESPACE = "crdb" - CRDB_SQL_PORT = "26257" - CRDB_DATABASE = "telemetryfrontend" - CRDB_USERNAME = "tfs" - CRDB_PASSWORD = "tfs123" - CRDB_SSLMODE = "require" - crdb_uri = CRDB_URI_TEMPLATE.format( - CRDB_USERNAME, CRDB_PASSWORD, CRDB_SQL_PORT, CRDB_DATABASE, CRDB_SSLMODE) - # crdb_uri = CRDB_URI_TEMPLATE.format( - # CRDB_USERNAME, CRDB_PASSWORD, CRDB_NAMESPACE, CRDB_SQL_PORT, CRDB_DATABASE, CRDB_SSLMODE) + crdb_uri = get_setting('CRDB_URI', default=None) + if crdb_uri is None: + CRDB_NAMESPACE = "crdb" + CRDB_SQL_PORT = "26257" + CRDB_DATABASE = "tfs-telemetry" + CRDB_USERNAME = "tfs" + CRDB_PASSWORD = "tfs123" + CRDB_SSLMODE = "require" + crdb_uri = CRDB_URI_TEMPLATE.format( + CRDB_USERNAME, CRDB_PASSWORD, CRDB_NAMESPACE, CRDB_SQL_PORT, CRDB_DATABASE, CRDB_SSLMODE) try: - # engine = sqlalchemy.create_engine( - # crdb_uri, connect_args={'application_name': APP_NAME}, echo=ECHO, future=True) engine = sqlalchemy.create_engine(crdb_uri, echo=False) - LOGGER.info(' TelemetryDBmanager initalized with DB URL: {:}'.format(crdb_uri)) + LOGGER.info(' TelemetryDB initalized with DB URL: {:}'.format(crdb_uri)) except: # pylint: disable=bare-except # pragma: no cover LOGGER.exception('Failed to connect to database: {:s}'.format(str(crdb_uri))) return None # type: ignore return engine # type: ignore - - @staticmethod - def create_database(engine : sqlalchemy.engine.Engine) -> None: - if not sqlalchemy_utils.database_exists(engine.url): - LOGGER.info("Database created. {:}".format(engine.url)) - sqlalchemy_utils.create_database(engine.url) - - @staticmethod - def drop_database(engine : sqlalchemy.engine.Engine) -> None: - if sqlalchemy_utils.database_exists(engine.url): - sqlalchemy_utils.drop_database(engine.url) diff --git a/src/telemetry/database/TelemetryModel.py b/src/telemetry/database/TelemetryModel.py index be4f0969c86638520cf226b8e42db90426165804..4e71ce8138af39e51c80791dbd6683d855231d7b 100644 --- a/src/telemetry/database/TelemetryModel.py +++ b/src/telemetry/database/TelemetryModel.py @@ -14,32 +14,60 @@ import logging from sqlalchemy.dialects.postgresql import UUID -from sqlalchemy import Column, Integer, String, Float, Text, ForeignKey -from sqlalchemy.ext.declarative import declarative_base -from sqlalchemy.orm import sessionmaker, relationship +from sqlalchemy import Column, String, Float from sqlalchemy.orm import registry +from common.proto import telemetry_frontend_pb2 logging.basicConfig(level=logging.INFO) LOGGER = logging.getLogger(__name__) # Create a base class for declarative models Base = registry().generate_base() -# Base = declarative_base() class Collector(Base): __tablename__ = 'collector' collector_id = Column(UUID(as_uuid=False), primary_key=True) - kpi_id = Column(UUID(as_uuid=False)) - collector_decription = Column(String) - sampling_duration_s = Column(Float) - sampling_interval_s = Column(Float) - start_timestamp = Column(Float) - end_timestamp = Column(Float) - + kpi_id = Column(UUID(as_uuid=False), nullable=False) + sampling_duration_s = Column(Float , nullable=False) + sampling_interval_s = Column(Float , nullable=False) + start_timestamp = Column(Float , nullable=False) + end_timestamp = Column(Float , nullable=False) + # helps in logging the information def __repr__(self): - return (f"") \ No newline at end of file + return (f"") + + @classmethod + def ConvertCollectorToRow(cls, request): + """ + Create an instance of Collector table rows from a request object. + Args: request: The request object containing collector gRPC message. + Returns: A row (an instance of Collector table) initialized with content of the request. + """ + return cls( + collector_id = request.collector_id.collector_id.uuid, + kpi_id = request.kpi_id.kpi_id.uuid, + sampling_duration_s = request.duration_s, + sampling_interval_s = request.interval_s, + start_timestamp = request.start_time.timestamp, + end_timestamp = request.end_time.timestamp + ) + + @classmethod + def ConvertRowToCollector(cls, row): + """ + Create and return a dictionary representation of a Collector table instance. + Args: row: The Collector table instance (row) containing the data. + Returns: collector gRPC message initialized with the content of a row. + """ + response = telemetry_frontend_pb2.Collector() + response.collector_id.collector_id.uuid = row.collector_id + response.kpi_id.kpi_id.uuid = row.kpi_id + response.duration_s = row.sampling_duration_s + response.interval_s = row.sampling_interval_s + response.start_time.timestamp = row.start_timestamp + response.end_time.timestamp = row.end_timestamp + return response diff --git a/src/telemetry/database/Telemetry_DB.py b/src/telemetry/database/Telemetry_DB.py new file mode 100644 index 0000000000000000000000000000000000000000..32acfd73a410a7bfddd6b487d0b1962afadb3842 --- /dev/null +++ b/src/telemetry/database/Telemetry_DB.py @@ -0,0 +1,137 @@ +# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import sqlalchemy_utils +from sqlalchemy import inspect +from sqlalchemy.orm import sessionmaker +from telemetry.database.TelemetryModel import Collector as CollectorModel +from telemetry.database.TelemetryEngine import TelemetryEngine +from common.method_wrappers.ServiceExceptions import ( + OperationFailedException, AlreadyExistsException ) + +LOGGER = logging.getLogger(__name__) +DB_NAME = "tfs_telemetry" + +class TelemetryDB: + def __init__(self): + self.db_engine = TelemetryEngine.get_engine() + if self.db_engine is None: + LOGGER.error('Unable to get SQLAlchemy DB Engine...') + return False + self.db_name = DB_NAME + self.Session = sessionmaker(bind=self.db_engine) + + def create_database(self): + if not sqlalchemy_utils.database_exists(self.db_engine.url): + LOGGER.debug("Database created. {:}".format(self.db_engine.url)) + sqlalchemy_utils.create_database(self.db_engine.url) + + def drop_database(self) -> None: + if sqlalchemy_utils.database_exists(self.db_engine.url): + sqlalchemy_utils.drop_database(self.db_engine.url) + + def create_tables(self): + try: + CollectorModel.metadata.create_all(self.db_engine) # type: ignore + LOGGER.debug("Tables created in the database: {:}".format(self.db_name)) + except Exception as e: + LOGGER.debug("Tables cannot be created in the database. {:s}".format(str(e))) + raise OperationFailedException ("Tables can't be created", extra_details=["unable to create table {:}".format(e)]) + + def verify_tables(self): + try: + inspect_object = inspect(self.db_engine) + if(inspect_object.has_table('collector', None)): + LOGGER.info("Table exists in DB: {:}".format(self.db_name)) + except Exception as e: + LOGGER.info("Unable to fetch Table names. {:s}".format(str(e))) + +# ----------------- CURD METHODs --------------------- + + def add_row_to_db(self, row): + session = self.Session() + try: + session.add(row) + session.commit() + LOGGER.debug(f"Row inserted into {row.__class__.__name__} table.") + return True + except Exception as e: + session.rollback() + if "psycopg2.errors.UniqueViolation" in str(e): + LOGGER.error(f"Unique key voilation: {row.__class__.__name__} table. {str(e)}") + raise AlreadyExistsException(row.__class__.__name__, row, + extra_details=["Unique key voilation: {:}".format(e)] ) + else: + LOGGER.error(f"Failed to insert new row into {row.__class__.__name__} table. {str(e)}") + raise OperationFailedException ("Deletion by column id", extra_details=["unable to delete row {:}".format(e)]) + finally: + session.close() + + def search_db_row_by_id(self, model, col_name, id_to_search): + session = self.Session() + try: + entity = session.query(model).filter_by(**{col_name: id_to_search}).first() + if entity: + # LOGGER.debug(f"{model.__name__} ID found: {str(entity)}") + return entity + else: + LOGGER.debug(f"{model.__name__} ID not found, No matching row: {str(id_to_search)}") + print("{:} ID not found, No matching row: {:}".format(model.__name__, id_to_search)) + return None + except Exception as e: + session.rollback() + LOGGER.debug(f"Failed to retrieve {model.__name__} ID. {str(e)}") + raise OperationFailedException ("search by column id", extra_details=["unable to search row {:}".format(e)]) + finally: + session.close() + + def delete_db_row_by_id(self, model, col_name, id_to_search): + session = self.Session() + try: + record = session.query(model).filter_by(**{col_name: id_to_search}).first() + if record: + session.delete(record) + session.commit() + LOGGER.debug("Deleted %s with %s: %s", model.__name__, col_name, id_to_search) + else: + LOGGER.debug("%s with %s %s not found", model.__name__, col_name, id_to_search) + return None + except Exception as e: + session.rollback() + LOGGER.error("Error deleting %s with %s %s: %s", model.__name__, col_name, id_to_search, e) + raise OperationFailedException ("Deletion by column id", extra_details=["unable to delete row {:}".format(e)]) + finally: + session.close() + + def select_with_filter(self, model, filter_object): + session = self.Session() + try: + query = session.query(CollectorModel) + # Apply filters based on the filter_object + if filter_object.kpi_id: + query = query.filter(CollectorModel.kpi_id.in_([k.kpi_id.uuid for k in filter_object.kpi_id])) + result = query.all() + # query should be added to return all rows + if result: + LOGGER.debug(f"Fetched filtered rows from {model.__name__} table with filters: {filter_object}") # - Results: {result} + else: + LOGGER.warning(f"No matching row found in {model.__name__} table with filters: {filter_object}") + return result + except Exception as e: + LOGGER.error(f"Error fetching filtered rows from {model.__name__} table with filters {filter_object} ::: {e}") + raise OperationFailedException ("Select by filter", extra_details=["unable to apply the filter {:}".format(e)]) + finally: + session.close() + diff --git a/src/telemetry/database/managementDB.py b/src/telemetry/database/managementDB.py deleted file mode 100644 index f79126f279d7bbece6c08ae5eb1cd74e340d1c7d..0000000000000000000000000000000000000000 --- a/src/telemetry/database/managementDB.py +++ /dev/null @@ -1,138 +0,0 @@ -# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import logging, time -import sqlalchemy -import sqlalchemy_utils -from sqlalchemy.orm import sessionmaker -from sqlalchemy.ext.declarative import declarative_base -from telemetry.database.TelemetryEngine import TelemetryEngine -from telemetry.database.TelemetryModel import Base - -LOGGER = logging.getLogger(__name__) -DB_NAME = "telemetryfrontend" - -# # Create a base class for declarative models -# Base = declarative_base() - -class managementDB: - def __init__(self): - self.db_engine = TelemetryEngine.get_engine() - if self.db_engine is None: - LOGGER.error('Unable to get SQLAlchemy DB Engine...') - return False - self.db_name = DB_NAME - self.Session = sessionmaker(bind=self.db_engine) - - @staticmethod - def create_database(engine : sqlalchemy.engine.Engine) -> None: - if not sqlalchemy_utils.database_exists(engine.url): - LOGGER.info("Database created. {:}".format(engine.url)) - sqlalchemy_utils.create_database(engine.url) - - @staticmethod - def drop_database(engine : sqlalchemy.engine.Engine) -> None: - if sqlalchemy_utils.database_exists(engine.url): - sqlalchemy_utils.drop_database(engine.url) - - # def create_database(self): - # try: - # with self.db_engine.connect() as connection: - # connection.execute(f"CREATE DATABASE {self.db_name};") - # LOGGER.info('managementDB initalizes database. Name: {self.db_name}') - # return True - # except: - # LOGGER.exception('Failed to check/create the database: {:s}'.format(str(self.db_engine.url))) - # return False - - @staticmethod - def create_tables(engine : sqlalchemy.engine.Engine): - try: - Base.metadata.create_all(engine) # type: ignore - LOGGER.info("Tables created in the DB Name: {:}".format(DB_NAME)) - except Exception as e: - LOGGER.info("Tables cannot be created in the TelemetryFrontend database. {:s}".format(str(e))) - - def verify_tables(self): - try: - with self.db_engine.connect() as connection: - result = connection.execute("SHOW TABLES;") - tables = result.fetchall() # type: ignore - LOGGER.info("Tables verified: {:}".format(tables)) - except Exception as e: - LOGGER.info("Unable to fetch Table names. {:s}".format(str(e))) - - @staticmethod - def add_row_to_db(self, row): - session = self.Session() - try: - session.add(row) - session.commit() - LOGGER.info(f"Row inserted into {row.__class__.__name__} table.") - except Exception as e: - session.rollback() - LOGGER.error(f"Failed to insert new row into {row.__class__.__name__} table. {str(e)}") - finally: - session.close() - - def search_db_row_by_id(self, model, col_name, id_to_search): - session = self.Session() - try: - entity = session.query(model).filter_by(**{col_name: id_to_search}).first() - if entity: - LOGGER.info(f"{model.__name__} ID found: {str(entity)}") - return entity - else: - LOGGER.warning(f"{model.__name__} ID not found: {str(id_to_search)}") - return None - except Exception as e: - session.rollback() - LOGGER.info(f"Failed to retrieve {model.__name__} ID. {str(e)}") - raise - finally: - session.close() - - def delete_db_row_by_id(self, model, col_name, id_to_search): - session = self.Session() - try: - record = session.query(model).filter_by(**{col_name: id_to_search}).first() - if record: - session.delete(record) - session.commit() - LOGGER.info("Deleted %s with %s: %s", model.__name__, col_name, id_to_search) - else: - LOGGER.warning("%s with %s %s not found", model.__name__, col_name, id_to_search) - except Exception as e: - session.rollback() - LOGGER.error("Error deleting %s with %s %s: %s", model.__name__, col_name, id_to_search, e) - finally: - session.close() - - def select_with_filter(self, model, **filters): - session = self.Session() - try: - query = session.query(model) - for column, value in filters.items(): - query = query.filter(getattr(model, column) == value) # type: ignore - result = query.all() - if result: - LOGGER.info(f"Fetched filtered rows from {model.__name__} table with filters: {filters}") # - Results: {result} - else: - LOGGER.warning(f"No matching row found in {model.__name__} table with filters: {filters}") - return result - except Exception as e: - LOGGER.error(f"Error fetching filtered rows from {model.__name__} table with filters {filters} ::: {e}") - return [] - finally: - session.close() \ No newline at end of file diff --git a/src/telemetry/database/tests/managementDBtests.py b/src/telemetry/database/tests/managementDBtests.py deleted file mode 100644 index 24138abe42be742bd9b16d7840343f9d7c7fe133..0000000000000000000000000000000000000000 --- a/src/telemetry/database/tests/managementDBtests.py +++ /dev/null @@ -1,22 +0,0 @@ -# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -from telemetry.database.managementDB import managementDB -from telemetry.database.tests.messages import create_collector_model_object - - -def test_add_row_to_db(): - managementDBobj = managementDB() - managementDBobj.add_row_to_db(create_collector_model_object()) \ No newline at end of file diff --git a/src/telemetry/database/tests/telemetryDBtests.py b/src/telemetry/database/tests/telemetryDBtests.py deleted file mode 100644 index 0d221106419d6e4ee4b313adf10c90c5e6be7666..0000000000000000000000000000000000000000 --- a/src/telemetry/database/tests/telemetryDBtests.py +++ /dev/null @@ -1,86 +0,0 @@ - -# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import logging -from typing import Any -from sqlalchemy.ext.declarative import declarative_base -from telemetry.database.TelemetryDBmanager import TelemetryDBmanager -from telemetry.database.TelemetryEngine import TelemetryEngine -from telemetry.database.tests import temp_DB -from .messages import create_kpi_request, create_collector_request, \ - create_kpi_id_request, create_kpi_filter_request, \ - create_collector_id_request, create_collector_filter_request - -logging.basicConfig(level=logging.INFO) -LOGGER = logging.getLogger(__name__) - - -# def test_temp_DB(): -# temp_DB.main() - -def test_telemetry_object_creation(): - LOGGER.info('--- test_telemetry_object_creation: START') - - LOGGER.info('>>> Creating TelemetryDBmanager Object <<< ') - TelemetryDBmanagerObj = TelemetryDBmanager() - TelemetryEngine.create_database(TelemetryDBmanagerObj.db_engine) # creates 'frontend' db, if it doesnot exists. - - LOGGER.info('>>> Creating database <<< ') - TelemetryDBmanagerObj.create_database() - - LOGGER.info('>>> verifing database <<< ') - TelemetryDBmanagerObj.list_databases() - - # # LOGGER.info('>>> Droping Tables: ') - # # TelemetryDBmanagerObj.drop_table("table_naem_here") - - LOGGER.info('>>> Creating Tables <<< ') - TelemetryDBmanagerObj.create_tables() - - LOGGER.info('>>> Verifing Table creation <<< ') - TelemetryDBmanagerObj.verify_tables() - - # LOGGER.info('>>> TESTING: Row Insertion Operation: kpi Table <<<') - # kpi_obj = create_kpi_request() - # TelemetryDBmanagerObj.inser_kpi(kpi_obj) - - # LOGGER.info('>>> TESTING: Row Insertion Operation: collector Table <<<') - # collector_obj = create_collector_request() - # TelemetryDBmanagerObj.insert_collector(collector_obj) - - # LOGGER.info('>>> TESTING: Get KpiDescriptor <<<') - # kpi_id_obj = create_kpi_id_request() - # TelemetryDBmanagerObj.get_kpi_descriptor(kpi_id_obj) - - # LOGGER.info('>>> TESTING: Select Collector <<<') - # collector_id_obj = create_collector_id_request() - # TelemetryDBmanagerObj.get_collector(collector_id_obj) - - # LOGGER.info('>>> TESTING: Applying kpi filter <<< ') - # kpi_filter : dict[str, Any] = create_kpi_filter_request() - # TelemetryDBmanagerObj.select_kpi_descriptor(**kpi_filter) - - # LOGGER.info('>>> TESTING: Applying collector filter <<<') - # collector_filter : dict[str, Any] = create_collector_filter_request() - # TelemetryDBmanagerObj.select_collector(**collector_filter) - - # LOGGER.info('>>> TESTING: Delete KpiDescriptor ') - # kpi_id_obj = create_kpi_id_request() - # TelemetryDBmanagerObj.delete_kpi_descriptor(kpi_id_obj) - - # LOGGER.info('>>> TESTING: Delete Collector ') - # collector_id_obj = create_collector_id_request() - # TelemetryDBmanagerObj.delete_collector(collector_id_obj) - \ No newline at end of file diff --git a/src/telemetry/database/tests/temp_DB.py b/src/telemetry/database/tests/temp_DB.py deleted file mode 100644 index 089d3542492c2da87b839416f7118749bb82caad..0000000000000000000000000000000000000000 --- a/src/telemetry/database/tests/temp_DB.py +++ /dev/null @@ -1,327 +0,0 @@ -from sqlalchemy import create_engine, Column, String, Integer, Text, Float, ForeignKey -from sqlalchemy.ext.declarative import declarative_base -from sqlalchemy.orm import sessionmaker, relationship -from sqlalchemy.dialects.postgresql import UUID -import logging - -LOGGER = logging.getLogger(__name__) -Base = declarative_base() - -class Kpi(Base): - __tablename__ = 'kpi' - - kpi_id = Column(UUID(as_uuid=False), primary_key=True) - kpi_description = Column(Text) - kpi_sample_type = Column(Integer) - device_id = Column(String) - endpoint_id = Column(String) - service_id = Column(String) - slice_id = Column(String) - connection_id = Column(String) - link_id = Column(String) - - collectors = relationship('Collector', back_populates='kpi') - - def __repr__(self): - return (f"") - -class Collector(Base): - __tablename__ = 'collector' - - collector_id = Column(UUID(as_uuid=False), primary_key=True) - kpi_id = Column(UUID(as_uuid=False), ForeignKey('kpi.kpi_id')) - collector = Column(String) - sampling_duration_s = Column(Float) - sampling_interval_s = Column(Float) - start_timestamp = Column(Float) - end_timestamp = Column(Float) - - kpi = relationship('Kpi', back_populates='collectors') - - def __repr__(self): - return (f"") - -class DatabaseManager: - def __init__(self, db_url, db_name): - self.engine = create_engine(db_url) - self.db_name = db_name - self.Session = sessionmaker(bind=self.engine) - LOGGER.info("DatabaseManager initialized with DB URL: %s and DB Name: %s", db_url, db_name) - - def create_database(self): - try: - with self.engine.connect() as connection: - connection.execute(f"CREATE DATABASE {self.db_name};") - LOGGER.info("Database '%s' created successfully.", self.db_name) - except Exception as e: - LOGGER.error("Error creating database '%s': %s", self.db_name, e) - finally: - LOGGER.info("create_database method execution finished.") - - def create_tables(self): - try: - Base.metadata.create_all(self.engine) - LOGGER.info("Tables created successfully.") - except Exception as e: - LOGGER.error("Error creating tables: %s", e) - finally: - LOGGER.info("create_tables method execution finished.") - - def verify_table_creation(self): - try: - with self.engine.connect() as connection: - result = connection.execute("SHOW TABLES;") - tables = result.fetchall() - LOGGER.info("Tables verified: %s", tables) - return tables - except Exception as e: - LOGGER.error("Error verifying table creation: %s", e) - return [] - finally: - LOGGER.info("verify_table_creation method execution finished.") - - def insert_row_kpi(self, kpi_data): - session = self.Session() - try: - new_kpi = Kpi(**kpi_data) - session.add(new_kpi) - session.commit() - LOGGER.info("Inserted row into KPI table: %s", kpi_data) - except Exception as e: - session.rollback() - LOGGER.error("Error inserting row into KPI table: %s", e) - finally: - session.close() - LOGGER.info("insert_row_kpi method execution finished.") - - def insert_row_collector(self, collector_data): - session = self.Session() - try: - new_collector = Collector(**collector_data) - session.add(new_collector) - session.commit() - LOGGER.info("Inserted row into Collector table: %s", collector_data) - except Exception as e: - session.rollback() - LOGGER.error("Error inserting row into Collector table: %s", e) - finally: - session.close() - LOGGER.info("insert_row_collector method execution finished.") - - def verify_insertion_kpi(self, kpi_id): - session = self.Session() - try: - kpi = session.query(Kpi).filter_by(kpi_id=kpi_id).first() - LOGGER.info("Verified insertion in KPI table for kpi_id: %s, Result: %s", kpi_id, kpi) - return kpi - except Exception as e: - LOGGER.error("Error verifying insertion in KPI table for kpi_id %s: %s", kpi_id, e) - return None - finally: - session.close() - LOGGER.info("verify_insertion_kpi method execution finished.") - - def verify_insertion_collector(self, collector_id): - session = self.Session() - try: - collector = session.query(Collector).filter_by(collector_id=collector_id).first() - LOGGER.info("Verified insertion in Collector table for collector_id: %s, Result: %s", collector_id, collector) - return collector - except Exception as e: - LOGGER.error("Error verifying insertion in Collector table for collector_id %s: %s", collector_id, e) - return None - finally: - session.close() - LOGGER.info("verify_insertion_collector method execution finished.") - - def get_all_kpi_rows(self): - session = self.Session() - try: - kpi_rows = session.query(Kpi).all() - LOGGER.info("Fetched all rows from KPI table: %s", kpi_rows) - return kpi_rows - except Exception as e: - LOGGER.error("Error fetching all rows from KPI table: %s", e) - return [] - finally: - session.close() - LOGGER.info("get_all_kpi_rows method execution finished.") - - def get_all_collector_rows(self): - session = self.Session() - try: - collector_rows = session.query(Collector).all() - LOGGER.info("Fetched all rows from Collector table: %s", collector_rows) - return collector_rows - except Exception as e: - LOGGER.error("Error fetching all rows from Collector table: %s", e) - return [] - finally: - session.close() - LOGGER.info("get_all_collector_rows method execution finished.") - - def get_filtered_kpi_rows(self, **filters): - session = self.Session() - try: - query = session.query(Kpi) - for column, value in filters.items(): - query = query.filter(getattr(Kpi, column) == value) - result = query.all() - LOGGER.info("Fetched filtered rows from KPI table with filters ---------- : {:s}".format(str(result))) - return result - except NoResultFound: - LOGGER.warning("No results found in KPI table with filters %s", filters) - return [] - except Exception as e: - LOGGER.error("Error fetching filtered rows from KPI table with filters %s: %s", filters, e) - return [] - finally: - session.close() - LOGGER.info("get_filtered_kpi_rows method execution finished.") - - def get_filtered_collector_rows(self, **filters): - session = self.Session() - try: - query = session.query(Collector) - for column, value in filters.items(): - query = query.filter(getattr(Collector, column) == value) - result = query.all() - LOGGER.info("Fetched filtered rows from Collector table with filters %s: %s", filters, result) - return result - except NoResultFound: - LOGGER.warning("No results found in Collector table with filters %s", filters) - return [] - except Exception as e: - LOGGER.error("Error fetching filtered rows from Collector table with filters %s: %s", filters, e) - return [] - finally: - session.close() - LOGGER.info("get_filtered_collector_rows method execution finished.") - - def delete_kpi_by_id(self, kpi_id): - session = self.Session() - try: - kpi = session.query(Kpi).filter_by(kpi_id=kpi_id).first() - if kpi: - session.delete(kpi) - session.commit() - LOGGER.info("Deleted KPI with kpi_id: %s", kpi_id) - else: - LOGGER.warning("KPI with kpi_id %s not found", kpi_id) - except SQLAlchemyError as e: - session.rollback() - LOGGER.error("Error deleting KPI with kpi_id %s: %s", kpi_id, e) - finally: - session.close() - LOGGER.info("delete_kpi_by_id method execution finished.") - - def delete_collector_by_id(self, collector_id): - session = self.Session() - try: - collector = session.query(Collector).filter_by(collector_id=collector_id).first() - if collector: - session.delete(collector) - session.commit() - LOGGER.info("Deleted Collector with collector_id: %s", collector_id) - else: - LOGGER.warning("Collector with collector_id %s not found", collector_id) - except SQLAlchemyError as e: - session.rollback() - LOGGER.error("Error deleting Collector with collector_id %s: %s", collector_id, e) - finally: - session.close() - LOGGER.info("delete_collector_by_id method execution finished.") - - -# Example Usage -def main(): - CRDB_SQL_PORT = "26257" - CRDB_DATABASE = "telemetryfrontend" - CRDB_USERNAME = "tfs" - CRDB_PASSWORD = "tfs123" - CRDB_SSLMODE = "require" - CRDB_URI_TEMPLATE = 'cockroachdb://{:s}:{:s}@127.0.0.1:{:s}/{:s}?sslmode={:s}' - crdb_uri = CRDB_URI_TEMPLATE.format( - CRDB_USERNAME, CRDB_PASSWORD, CRDB_SQL_PORT, CRDB_DATABASE, CRDB_SSLMODE) - # db_url = "cockroachdb://username:password@localhost:26257/" - # db_name = "yourdatabase" - db_manager = DatabaseManager(crdb_uri, CRDB_DATABASE) - - # Create database - db_manager.create_database() - - # Update db_url to include the new database name - db_manager.engine = create_engine(f"{crdb_uri}") - db_manager.Session = sessionmaker(bind=db_manager.engine) - - # Create tables - db_manager.create_tables() - - # Verify table creation - tables = db_manager.verify_table_creation() - LOGGER.info('Tables in the database: {:s}'.format(str(tables))) - - # Insert a row into the KPI table - kpi_data = { - 'kpi_id': '123e4567-e89b-12d3-a456-426614174100', - 'kpi_description': 'Sample KPI', - 'kpi_sample_type': 1, - 'device_id': 'device_1', - 'endpoint_id': 'endpoint_1', - 'service_id': 'service_1', - 'slice_id': 'slice_1', - 'connection_id': 'conn_1', - 'link_id': 'link_1' - } - db_manager.insert_row_kpi(kpi_data) - - # Insert a row into the Collector table - collector_data = { - 'collector_id': '123e4567-e89b-12d3-a456-426614174101', - 'kpi_id': '123e4567-e89b-12d3-a456-426614174000', - 'collector': 'Collector 1', - 'sampling_duration_s': 60.0, - 'sampling_interval_s': 10.0, - 'start_timestamp': 1625247600.0, - 'end_timestamp': 1625247660.0 - } - db_manager.insert_row_collector(collector_data) - - # Verify insertion into KPI table - kpi = db_manager.verify_insertion_kpi('123e4567-e89b-12d3-a456-426614174000') - print("Inserted KPI:", kpi) - - # Verify insertion into Collector table - collector = db_manager.verify_insertion_collector('123e4567-e89b-12d3-a456-426614174001') - print("Inserted Collector:", collector) - - # Get all rows from KPI table - all_kpi_rows = db_manager.get_all_kpi_rows() - LOGGER.info("All KPI Rows: %s", all_kpi_rows) - - # Get all rows from Collector table - all_collector_rows = db_manager.get_all_collector_rows() - LOGGER.info("All Collector Rows: %s", all_collector_rows) - - # Get filtered rows from KPI table - filtered_kpi_rows = db_manager.get_filtered_kpi_rows(kpi_description='Sample KPI') - LOGGER.info("Filtered KPI Rows: %s", filtered_kpi_rows) - - # Get filtered rows from Collector table - filtered_collector_rows = db_manager.get_filtered_collector_rows(collector='Collector 1') - LOGGER.info("Filtered Collector Rows: %s", filtered_collector_rows) - - # Delete a KPI by kpi_id - kpi_id_to_delete = '123e4567-e89b-12d3-a456-426614174000' - db_manager.delete_kpi_by_id(kpi_id_to_delete) - - # Delete a Collector by collector_id - collector_id_to_delete = '123e4567-e89b-12d3-a456-426614174001' - db_manager.delete_collector_by_id(collector_id_to_delete) diff --git a/src/telemetry/frontend/Dockerfile b/src/telemetry/frontend/Dockerfile new file mode 100644 index 0000000000000000000000000000000000000000..7125d31fe74f7c44a52c2783369c2dc7a4a31160 --- /dev/null +++ b/src/telemetry/frontend/Dockerfile @@ -0,0 +1,70 @@ +# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +FROM python:3.9-slim + +# Install dependencies +RUN apt-get --yes --quiet --quiet update && \ + apt-get --yes --quiet --quiet install wget g++ git && \ + rm -rf /var/lib/apt/lists/* + +# Set Python to show logs as they occur +ENV PYTHONUNBUFFERED=0 + +# Download the gRPC health probe +RUN GRPC_HEALTH_PROBE_VERSION=v0.2.0 && \ + wget -qO/bin/grpc_health_probe https://github.com/grpc-ecosystem/grpc-health-probe/releases/download/${GRPC_HEALTH_PROBE_VERSION}/grpc_health_probe-linux-amd64 && \ + chmod +x /bin/grpc_health_probe + +# Get generic Python packages +RUN python3 -m pip install --upgrade pip +RUN python3 -m pip install --upgrade setuptools wheel +RUN python3 -m pip install --upgrade pip-tools + +# Get common Python packages +# Note: this step enables sharing the previous Docker build steps among all the Python components +WORKDIR /var/teraflow +COPY common_requirements.in common_requirements.in +RUN pip-compile --quiet --output-file=common_requirements.txt common_requirements.in +RUN python3 -m pip install -r common_requirements.txt + +# Add common files into working directory +WORKDIR /var/teraflow/common +COPY src/common/. ./ +RUN rm -rf proto + +# Create proto sub-folder, copy .proto files, and generate Python code +RUN mkdir -p /var/teraflow/common/proto +WORKDIR /var/teraflow/common/proto +RUN touch __init__.py +COPY proto/*.proto ./ +RUN python3 -m grpc_tools.protoc -I=. --python_out=. --grpc_python_out=. *.proto +RUN rm *.proto +RUN find . -type f -exec sed -i -E 's/(import\ .*)_pb2/from . \1_pb2/g' {} \; + +# Create component sub-folders, get specific Python packages +RUN mkdir -p /var/teraflow/telemetry/frontend +WORKDIR /var/teraflow/telemetry/frontend +COPY src/telemetry/frontend/requirements.in requirements.in +RUN pip-compile --quiet --output-file=requirements.txt requirements.in +RUN python3 -m pip install -r requirements.txt + +# Add component files into working directory +WORKDIR /var/teraflow +COPY src/telemetry/__init__.py telemetry/__init__.py +COPY src/telemetry/frontend/. telemetry/frontend/ +COPY src/telemetry/database/. telemetry/database/ + +# Start the service +ENTRYPOINT ["python", "-m", "telemetry.frontend.service"] diff --git a/src/telemetry/database/tests/__init__.py b/src/telemetry/frontend/requirements.in similarity index 79% rename from src/telemetry/database/tests/__init__.py rename to src/telemetry/frontend/requirements.in index 839e45e3b646bc60de7edd81fcfb91b7b38feadf..231dc04e820387c95ffea72cbe67b9f0a9a0865a 100644 --- a/src/telemetry/database/tests/__init__.py +++ b/src/telemetry/frontend/requirements.in @@ -10,4 +10,10 @@ # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and -# limitations under the License. \ No newline at end of file +# limitations under the License. + +confluent-kafka==2.3.* +psycopg2-binary==2.9.* +SQLAlchemy==1.4.* +sqlalchemy-cockroachdb==1.4.* +SQLAlchemy-Utils==0.38.* diff --git a/src/telemetry/frontend/service/TelemetryFrontendService.py b/src/telemetry/frontend/service/TelemetryFrontendService.py index dc3f8df363a882db0f0ba3112a38f3bba3921c30..abd361aa0082e2de1d1f5fa7e81a336f3091af9a 100644 --- a/src/telemetry/frontend/service/TelemetryFrontendService.py +++ b/src/telemetry/frontend/service/TelemetryFrontendService.py @@ -14,17 +14,16 @@ from common.Constants import ServiceNameEnum from common.Settings import get_service_port_grpc -from monitoring.service.NameMapping import NameMapping from common.tools.service.GenericGrpcService import GenericGrpcService from common.proto.telemetry_frontend_pb2_grpc import add_TelemetryFrontendServiceServicer_to_server from telemetry.frontend.service.TelemetryFrontendServiceServicerImpl import TelemetryFrontendServiceServicerImpl class TelemetryFrontendService(GenericGrpcService): - def __init__(self, name_mapping : NameMapping, cls_name: str = __name__) -> None: + def __init__(self, cls_name: str = __name__) -> None: port = get_service_port_grpc(ServiceNameEnum.TELEMETRYFRONTEND) super().__init__(port, cls_name=cls_name) - self.telemetry_frontend_servicer = TelemetryFrontendServiceServicerImpl(name_mapping) + self.telemetry_frontend_servicer = TelemetryFrontendServiceServicerImpl() def install_servicers(self): add_TelemetryFrontendServiceServicer_to_server(self.telemetry_frontend_servicer, self.server) diff --git a/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py b/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py index e6830ad676d3934c88b01575ebdd1d0549fb00d1..2b872dba33bbe1434b68d5b5d2449e0b228312f7 100644 --- a/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py +++ b/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py @@ -12,126 +12,160 @@ # See the License for the specific language governing permissions and # limitations under the License. -import ast +import json import threading -import time -from typing import Tuple, Any +from typing import Any, Dict import grpc import logging -from confluent_kafka import Consumer as KafkaConsumer +from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method +from common.tools.kafka.Variables import KafkaConfig, KafkaTopic from common.proto.context_pb2 import Empty -from monitoring.service.NameMapping import NameMapping -from confluent_kafka import Producer as KafkaProducer -from confluent_kafka import KafkaException -from confluent_kafka import KafkaError from common.proto.telemetry_frontend_pb2 import CollectorId, Collector, CollectorFilter, CollectorList -from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method from common.proto.telemetry_frontend_pb2_grpc import TelemetryFrontendServiceServicer from telemetry.database.TelemetryModel import Collector as CollectorModel -from telemetry.database.managementDB import managementDB +from telemetry.database.Telemetry_DB import TelemetryDB + +from confluent_kafka import Consumer as KafkaConsumer +from confluent_kafka import Producer as KafkaProducer +from confluent_kafka import KafkaError + LOGGER = logging.getLogger(__name__) -METRICS_POOL = MetricsPool('Monitoring', 'TelemetryFrontend') -KAFKA_SERVER_IP = '127.0.0.1:9092' -ACTIVE_COLLECTORS = [] -KAFKA_TOPICS = {'request' : 'topic_request', - 'response': 'topic_response'} +METRICS_POOL = MetricsPool('TelemetryFrontend', 'NBIgRPC') +ACTIVE_COLLECTORS = [] # keep and can be populated from DB class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer): - def __init__(self, name_mapping : NameMapping): + def __init__(self): LOGGER.info('Init TelemetryFrontendService') - self.managementDBobj = managementDB() - self.kafka_producer = KafkaProducer({'bootstrap.servers': KAFKA_SERVER_IP,}) - self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KAFKA_SERVER_IP, - 'group.id' : 'frontend', - 'auto.offset.reset' : 'latest'}) - - def add_collector_to_db(self, request: Collector ): # type: ignore - try: - # Create a new Collector instance - collector_to_insert = CollectorModel() - collector_to_insert.collector_id = request.collector_id.collector_id.uuid - collector_to_insert.kpi_id = request.kpi_id.kpi_id.uuid - # collector_to_insert.collector_decription= request.collector - collector_to_insert.sampling_duration_s = request.duration_s - collector_to_insert.sampling_interval_s = request.interval_s - collector_to_insert.start_timestamp = time.time() - collector_to_insert.end_timestamp = time.time() - managementDB.add_row_to_db(collector_to_insert) - except Exception as e: - LOGGER.info("Unable to create collectorModel class object. {:}".format(e)) + self.tele_db_obj = TelemetryDB() + self.kafka_producer = KafkaProducer({'bootstrap.servers' : KafkaConfig.get_kafka_address()}) + self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(), + 'group.id' : 'frontend', + 'auto.offset.reset' : 'latest'}) - # @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) + + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def StartCollector(self, request : Collector, grpc_context: grpc.ServicerContext # type: ignore ) -> CollectorId: # type: ignore - # push info to frontend db LOGGER.info ("gRPC message: {:}".format(request)) response = CollectorId() - _collector_id = str(request.collector_id.collector_id.uuid) - _collector_kpi_id = str(request.kpi_id.kpi_id.uuid) - _collector_duration = int(request.duration_s) - _collector_interval = int(request.interval_s) - # pushing Collector to DB - self.add_collector_to_db(request) - self.publish_to_kafka_request_topic(_collector_id, _collector_kpi_id, _collector_duration, _collector_interval) - # self.run_publish_to_kafka_request_topic(_collector_id, _collector_kpi_id, _collector_duration, _collector_interval) - response.collector_id.uuid = request.collector_id.collector_id.uuid # type: ignore + + # TODO: Verify the presence of Kpi ID in KpiDB or assume that KPI ID already exists? + self.tele_db_obj.add_row_to_db( + CollectorModel.ConvertCollectorToRow(request) + ) + self.PublishStartRequestOnKafka(request) + + response.collector_id.uuid = request.collector_id.collector_id.uuid return response - - def run_publish_to_kafka_request_topic(self, msg_key: str, kpi: str, duration : int, interval: int): - # Add threading.Thread() response to dictonary and call start() in the next statement - threading.Thread(target=self.publish_to_kafka_request_topic, args=(msg_key, kpi, duration, interval)).start() - - def publish_to_kafka_request_topic(self, - collector_id: str, kpi: str, duration : int, interval: int - ): + + def PublishStartRequestOnKafka(self, collector_obj): """ - Method to generate collector request to Kafka topic. + Method to generate collector request on Kafka. """ - # time.sleep(5) - # producer_configs = { - # 'bootstrap.servers': KAFKA_SERVER_IP, - # } - # topic_request = "topic_request" - msg_value : Tuple [str, int, int] = (kpi, duration, interval) - # print ("Request generated: ", "Colletcor Id: ", collector_id, \ - # ", \nKPI: ", kpi, ", Duration: ", duration, ", Interval: ", interval) - # producerObj = KafkaProducer(producer_configs) - self.kafka_producer.produce(KAFKA_TOPICS['request'], key=collector_id, value= str(msg_value), callback=self.delivery_callback) - # producerObj.produce(KAFKA_TOPICS['request'], key=collector_id, value= str(msg_value), callback=self.delivery_callback) - LOGGER.info("Collector Request Generated: {:}, {:}, {:}, {:}".format(collector_id, kpi, duration, interval)) - # producerObj.produce(topic_request, key=collector_id, value= str(msg_value), callback=self.delivery_callback) - ACTIVE_COLLECTORS.append(collector_id) + collector_uuid = collector_obj.collector_id.collector_id.uuid + collector_to_generate : Dict = { + "kpi_id" : collector_obj.kpi_id.kpi_id.uuid, + "duration": collector_obj.duration_s, + "interval": collector_obj.interval_s + } + self.kafka_producer.produce( + KafkaTopic.REQUEST.value, + key = collector_uuid, + value = json.dumps(collector_to_generate), + callback = self.delivery_callback + ) + LOGGER.info("Collector Request Generated: Collector Id: {:}, Value: {:}".format(collector_uuid, collector_to_generate)) + ACTIVE_COLLECTORS.append(collector_uuid) self.kafka_producer.flush() - def run_kafka_listener(self): - # print ("--- STARTED: run_kafka_listener ---") - threading.Thread(target=self.kafka_listener).start() + + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) + def StopCollector(self, + request : CollectorId, grpc_context: grpc.ServicerContext # type: ignore + ) -> Empty: # type: ignore + LOGGER.info ("gRPC message: {:}".format(request)) + self.PublishStopRequestOnKafka(request) + return Empty() + + def PublishStopRequestOnKafka(self, collector_id): + """ + Method to generate stop collector request on Kafka. + """ + collector_uuid = collector_id.collector_id.uuid + collector_to_stop : Dict = { + "kpi_id" : collector_uuid, + "duration": -1, + "interval": -1 + } + self.kafka_producer.produce( + KafkaTopic.REQUEST.value, + key = collector_uuid, + value = json.dumps(collector_to_stop), + callback = self.delivery_callback + ) + LOGGER.info("Collector Stop Request Generated: Collector Id: {:}, Value: {:}".format(collector_uuid, collector_to_stop)) + try: + ACTIVE_COLLECTORS.remove(collector_uuid) + except ValueError: + LOGGER.warning('Collector ID {:} not found in active collector list'.format(collector_uuid)) + self.kafka_producer.flush() + + + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) + def SelectCollectors(self, + request : CollectorFilter, contextgrpc_context: grpc.ServicerContext # type: ignore + ) -> CollectorList: # type: ignore + LOGGER.info("gRPC message: {:}".format(request)) + response = CollectorList() + + try: + rows = self.tele_db_obj.select_with_filter(CollectorModel, request) + except Exception as e: + LOGGER.info('Unable to apply filter on kpi descriptor. {:}'.format(e)) + try: + for row in rows: + collector_obj = CollectorModel.ConvertRowToCollector(row) + response.collector_list.append(collector_obj) + return response + except Exception as e: + LOGGER.info('Unable to process filter response {:}'.format(e)) + + + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) + def delivery_callback(self, err, msg): + """ + Callback function to handle message delivery status. + Args: + err (KafkaError): Kafka error object. + msg (Message): Kafka message object. + """ + if err: + LOGGER.debug('Message delivery failed: {:}'.format(err)) + print('Message delivery failed: {:}'.format(err)) + # else: + # LOGGER.debug('Message delivered to topic {:}'.format(msg.topic())) + # print('Message delivered to topic {:}'.format(msg.topic())) + + # ---------- Independent Method --------------- + # Listener method is independent of any method (same lifetime as service) + # continously listens for responses + def RunResponseListener(self): + threading.Thread(target=self.ResponseListener).start() return True - def kafka_listener(self): + def ResponseListener(self): """ listener for response on Kafka topic. """ - # # print ("--- STARTED: kafka_listener ---") - # conusmer_configs = { - # 'bootstrap.servers' : KAFKA_SERVER_IP, - # 'group.id' : 'frontend', - # 'auto.offset.reset' : 'latest' - # } - # # topic_response = "topic_response" - - # consumerObj = KafkaConsumer(conusmer_configs) - self.kafka_consumer.subscribe([KAFKA_TOPICS['response']]) - # print (time.time()) + self.kafka_consumer.subscribe([KafkaTopic.RESPONSE.value]) while True: receive_msg = self.kafka_consumer.poll(2.0) if receive_msg is None: - # print (" - Telemetry frontend listening on Kafka Topic: ", KAFKA_TOPICS['response']) # added for debugging purposes continue elif receive_msg.error(): if receive_msg.error().code() == KafkaError._PARTITION_EOF: @@ -142,63 +176,16 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer): try: collector_id = receive_msg.key().decode('utf-8') if collector_id in ACTIVE_COLLECTORS: - (kpi_id, kpi_value) = ast.literal_eval(receive_msg.value().decode('utf-8')) - self.process_response(collector_id, kpi_id, kpi_value) + kpi_value = json.loads(receive_msg.value().decode('utf-8')) + self.process_response(collector_id, kpi_value['kpi_id'], kpi_value['kpi_value']) else: print(f"collector id does not match.\nRespone ID: '{collector_id}' --- Active IDs: '{ACTIVE_COLLECTORS}' ") except Exception as e: - print(f"No message key found: {str(e)}") + print(f"Error extarcting msg key or value: {str(e)}") continue - # return None def process_response(self, collector_id: str, kpi_id: str, kpi_value: Any): if kpi_id == "-1" and kpi_value == -1: - # LOGGER.info("Sucessfully terminated Collector: {:}".format(collector_id)) - print ("Sucessfully terminated Collector: ", collector_id) - else: - print ("Frontend-Received values Collector Id:", collector_id, "-KPI:", kpi_id, "-VALUE:", kpi_value) - - @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) - def delivery_callback(self, err, msg): - """ - Callback function to handle message delivery status. - Args: - err (KafkaError): Kafka error object. - msg (Message): Kafka message object. - """ - if err: - print(f'Message delivery failed: {err}') + print ("Backend termination confirmation for collector id: ", collector_id) else: - print(f'Message delivered to topic {msg.topic()}') - - @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) - def StopCollector(self, - request : CollectorId, grpc_context: grpc.ServicerContext # type: ignore - ) -> Empty: # type: ignore - LOGGER.info ("gRPC message: {:}".format(request)) - _collector_id = request.collector_id.uuid - self.publish_to_kafka_request_topic(_collector_id, "", -1, -1) - return Empty() - - @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) - def SelectCollectors(self, - request : CollectorFilter, contextgrpc_context: grpc.ServicerContext # type: ignore - ) -> CollectorList: # type: ignore - LOGGER.info("gRPC message: {:}".format(request)) - response = CollectorList() - filter_to_apply = dict() - filter_to_apply['kpi_id'] = request.kpi_id[0].kpi_id.uuid - # filter_to_apply['duration_s'] = request.duration_s[0] - try: - rows = self.managementDBobj.select_with_filter(CollectorModel, **filter_to_apply) - except Exception as e: - LOGGER.info('Unable to apply filter on kpi descriptor. {:}'.format(e)) - try: - if len(rows) != 0: - for row in rows: - collector_obj = Collector() - collector_obj.collector_id.collector_id.uuid = row.collector_id - response.collector_list.append(collector_obj) - return response - except Exception as e: - LOGGER.info('Unable to process response {:}'.format(e)) \ No newline at end of file + print ("KPI Value: Collector Id:", collector_id, ", Kpi Id:", kpi_id, ", Value:", kpi_value) diff --git a/src/telemetry/frontend/service/__main__.py b/src/telemetry/frontend/service/__main__.py index 3b0263706c3dad3756306d1ba8a3a104d568cd6f..2a6c5dbcf2da6b6a074c2b8ee23791bc4896442f 100644 --- a/src/telemetry/frontend/service/__main__.py +++ b/src/telemetry/frontend/service/__main__.py @@ -12,16 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -import signal -import sys -import logging, threading +import logging, signal, sys, threading from prometheus_client import start_http_server -from monitoring.service.NameMapping import NameMapping +from common.Settings import get_log_level, get_metrics_port from .TelemetryFrontendService import TelemetryFrontendService -from monitoring.service.EventTools import EventsDeviceCollector -from common.Settings import ( - get_log_level, wait_for_environment_variables, get_env_var_name, - get_metrics_port ) terminate = threading.Event() LOGGER = None @@ -31,20 +25,12 @@ def signal_handler(signal, frame): # pylint: disable=redefined-outer-name terminate.set() def main(): - global LOGGER + global LOGGER # pylint: disable=global-statement log_level = get_log_level() logging.basicConfig(level=log_level, format="[%(asctime)s] %(levelname)s:%(name)s:%(message)s") LOGGER = logging.getLogger(__name__) -# ------- will be added later -------------- - # wait_for_environment_variables([ - # get_env_var_name - - - # ]) -# ------- will be added later -------------- - signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) @@ -54,9 +40,7 @@ def main(): metrics_port = get_metrics_port() start_http_server(metrics_port) - name_mapping = NameMapping() - - grpc_service = TelemetryFrontendService(name_mapping) + grpc_service = TelemetryFrontendService() grpc_service.start() # Wait for Ctrl+C or termination signal @@ -69,4 +53,4 @@ def main(): return 0 if __name__ == '__main__': - sys.exit(main()) \ No newline at end of file + sys.exit(main()) diff --git a/src/telemetry/frontend/tests/Messages.py b/src/telemetry/frontend/tests/Messages.py index 1205898d13a610cd262979242e4f489f5e35cdb8..a0e93e8a121b9efaac83f7169419911c8ee6e3ea 100644 --- a/src/telemetry/frontend/tests/Messages.py +++ b/src/telemetry/frontend/tests/Messages.py @@ -16,68 +16,27 @@ import uuid import random from common.proto import telemetry_frontend_pb2 from common.proto.kpi_sample_types_pb2 import KpiSampleType - +from common.proto.kpi_manager_pb2 import KpiId # ----------------------- "2nd" Iteration -------------------------------- def create_collector_id(): _collector_id = telemetry_frontend_pb2.CollectorId() - _collector_id.collector_id.uuid = uuid.uuid4() + # _collector_id.collector_id.uuid = str(uuid.uuid4()) + _collector_id.collector_id.uuid = "5d45f53f-d567-429f-9427-9196ac72ff0c" return _collector_id -# def create_collector_id_a(coll_id_str : str): -# _collector_id = telemetry_frontend_pb2.CollectorId() -# _collector_id.collector_id.uuid = str(coll_id_str) -# return _collector_id - def create_collector_request(): _create_collector_request = telemetry_frontend_pb2.Collector() _create_collector_request.collector_id.collector_id.uuid = str(uuid.uuid4()) - _create_collector_request.kpi_id.kpi_id.uuid = "165d20c5-a446-42fa-812f-e2b7ed283c6f" - # _create_collector_request.collector = "collector description" + _create_collector_request.kpi_id.kpi_id.uuid = str(uuid.uuid4()) _create_collector_request.duration_s = float(random.randint(8, 16)) _create_collector_request.interval_s = float(random.randint(2, 4)) return _create_collector_request def create_collector_filter(): _create_collector_filter = telemetry_frontend_pb2.CollectorFilter() - new_kpi_id = _create_collector_filter.kpi_id.add() - new_kpi_id.kpi_id.uuid = "165d20c5-a446-42fa-812f-e2b7ed283c6f" + kpi_id_obj = KpiId() + # kpi_id_obj.kpi_id.uuid = str(uuid.uuid4()) + kpi_id_obj.kpi_id.uuid = "a7237fa3-caf4-479d-84b6-4d9f9738fb7f" + _create_collector_filter.kpi_id.append(kpi_id_obj) return _create_collector_filter - -# ----------------------- "First" Iteration -------------------------------- -# def create_collector_request_a(): -# _create_collector_request_a = telemetry_frontend_pb2.Collector() -# _create_collector_request_a.collector_id.collector_id.uuid = "-1" -# return _create_collector_request_a - -# def create_collector_request_b(str_kpi_id, coll_duration_s, coll_interval_s -# ) -> telemetry_frontend_pb2.Collector: -# _create_collector_request_b = telemetry_frontend_pb2.Collector() -# _create_collector_request_b.collector_id.collector_id.uuid = '1' -# _create_collector_request_b.kpi_id.kpi_id.uuid = str_kpi_id -# _create_collector_request_b.duration_s = coll_duration_s -# _create_collector_request_b.interval_s = coll_interval_s -# return _create_collector_request_b - -# def create_collector_filter(): -# _create_collector_filter = telemetry_frontend_pb2.CollectorFilter() -# new_collector_id = _create_collector_filter.collector_id.add() -# new_collector_id.collector_id.uuid = "COLL1" -# new_kpi_id = _create_collector_filter.kpi_id.add() -# new_kpi_id.kpi_id.uuid = "KPI1" -# new_device_id = _create_collector_filter.device_id.add() -# new_device_id.device_uuid.uuid = 'DEV1' -# new_service_id = _create_collector_filter.service_id.add() -# new_service_id.service_uuid.uuid = 'SERV1' -# new_slice_id = _create_collector_filter.slice_id.add() -# new_slice_id.slice_uuid.uuid = 'SLC1' -# new_endpoint_id = _create_collector_filter.endpoint_id.add() -# new_endpoint_id.endpoint_uuid.uuid = 'END1' -# new_connection_id = _create_collector_filter.connection_id.add() -# new_connection_id.connection_uuid.uuid = 'CON1' -# _create_collector_filter.kpi_sample_type.append(KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED) -# return _create_collector_filter - -# def create_collector_list(): -# _create_collector_list = telemetry_frontend_pb2.CollectorList() -# return _create_collector_list \ No newline at end of file diff --git a/src/telemetry/frontend/tests/test_frontend.py b/src/telemetry/frontend/tests/test_frontend.py index 002cc430721845aa5aa18274375e2c22b5d77ff7..9c3f9d3a8f545792eb2bb3a371c6c20664d24f69 100644 --- a/src/telemetry/frontend/tests/test_frontend.py +++ b/src/telemetry/frontend/tests/test_frontend.py @@ -13,129 +13,40 @@ # limitations under the License. import os -import time import pytest import logging -from typing import Union -from common.proto.context_pb2 import Empty from common.Constants import ServiceNameEnum from common.proto.telemetry_frontend_pb2 import CollectorId, CollectorList -from common.proto.context_pb2_grpc import add_ContextServiceServicer_to_server -from context.client.ContextClient import ContextClient -from common.tools.service.GenericGrpcService import GenericGrpcService -from common.tests.MockServicerImpl_Context import MockServicerImpl_Context +from common.proto.context_pb2 import Empty +from common.tools.kafka.Variables import KafkaTopic from common.Settings import ( get_service_port_grpc, get_env_var_name, ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC) from telemetry.frontend.client.TelemetryFrontendClient import TelemetryFrontendClient from telemetry.frontend.service.TelemetryFrontendService import TelemetryFrontendService +from telemetry.frontend.tests.Messages import ( + create_collector_request, create_collector_id, create_collector_filter) from telemetry.frontend.service.TelemetryFrontendServiceServicerImpl import TelemetryFrontendServiceServicerImpl -from telemetry.frontend.tests.Messages import ( create_collector_request, create_collector_filter) -from telemetry.database.managementDB import managementDB -from telemetry.database.TelemetryEngine import TelemetryEngine - -from device.client.DeviceClient import DeviceClient -from device.service.DeviceService import DeviceService -from device.service.driver_api.DriverFactory import DriverFactory -from device.service.driver_api.DriverInstanceCache import DriverInstanceCache -from monitoring.service.NameMapping import NameMapping - -os.environ['DEVICE_EMULATED_ONLY'] = 'TRUE' -from device.service.drivers import DRIVERS ########################### # Tests Setup ########################### LOCAL_HOST = '127.0.0.1' -MOCKSERVICE_PORT = 10000 -TELEMETRY_FRONTEND_PORT = str(MOCKSERVICE_PORT) + str(get_service_port_grpc(ServiceNameEnum.TELEMETRYFRONTEND)) +TELEMETRY_FRONTEND_PORT = str(get_service_port_grpc(ServiceNameEnum.TELEMETRYFRONTEND)) os.environ[get_env_var_name(ServiceNameEnum.TELEMETRYFRONTEND, ENVVAR_SUFIX_SERVICE_HOST )] = str(LOCAL_HOST) os.environ[get_env_var_name(ServiceNameEnum.TELEMETRYFRONTEND, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(TELEMETRY_FRONTEND_PORT) LOGGER = logging.getLogger(__name__) -class MockContextService(GenericGrpcService): - # Mock Service implementing Context to simplify unitary tests of Monitoring - - def __init__(self, bind_port: Union[str, int]) -> None: - super().__init__(bind_port, LOCAL_HOST, enable_health_servicer=False, cls_name='MockService') - - # pylint: disable=attribute-defined-outside-init - def install_servicers(self): - self.context_servicer = MockServicerImpl_Context() - add_ContextServiceServicer_to_server(self.context_servicer, self.server) - @pytest.fixture(scope='session') -def context_service(): - LOGGER.info('Initializing MockContextService...') - _service = MockContextService(MOCKSERVICE_PORT) - _service.start() - - LOGGER.info('Yielding MockContextService...') - yield _service - - LOGGER.info('Terminating MockContextService...') - _service.context_servicer.msg_broker.terminate() - _service.stop() - - LOGGER.info('Terminated MockContextService...') - -@pytest.fixture(scope='session') -def context_client(context_service : MockContextService): # pylint: disable=redefined-outer-name,unused-argument - LOGGER.info('Initializing ContextClient...') - _client = ContextClient() - - LOGGER.info('Yielding ContextClient...') - yield _client - - LOGGER.info('Closing ContextClient...') - _client.close() - - LOGGER.info('Closed ContextClient...') - -@pytest.fixture(scope='session') -def device_service(context_service : MockContextService): # pylint: disable=redefined-outer-name,unused-argument - LOGGER.info('Initializing DeviceService...') - driver_factory = DriverFactory(DRIVERS) - driver_instance_cache = DriverInstanceCache(driver_factory) - _service = DeviceService(driver_instance_cache) - _service.start() - - # yield the server, when test finishes, execution will resume to stop it - LOGGER.info('Yielding DeviceService...') - yield _service - - LOGGER.info('Terminating DeviceService...') - _service.stop() - - LOGGER.info('Terminated DeviceService...') - -@pytest.fixture(scope='session') -def device_client(device_service : DeviceService): # pylint: disable=redefined-outer-name,unused-argument - LOGGER.info('Initializing DeviceClient...') - _client = DeviceClient() - - LOGGER.info('Yielding DeviceClient...') - yield _client - - LOGGER.info('Closing DeviceClient...') - _client.close() - - LOGGER.info('Closed DeviceClient...') - -@pytest.fixture(scope='session') -def telemetryFrontend_service( - context_service : MockContextService, - device_service : DeviceService - ): +def telemetryFrontend_service(): LOGGER.info('Initializing TelemetryFrontendService...') - name_mapping = NameMapping() - _service = TelemetryFrontendService(name_mapping) + _service = TelemetryFrontendService() _service.start() # yield the server, when test finishes, execution will resume to stop it @@ -168,37 +79,73 @@ def telemetryFrontend_client( # Tests Implementation of Telemetry Frontend ########################### -def test_verify_db_and_table(): - LOGGER.info(' >>> test_verify_database_and_tables START: <<< ') - _engine = TelemetryEngine.get_engine() - managementDB.create_database(_engine) - managementDB.create_tables(_engine) +# ------- Re-structuring Test --------- +# --- "test_validate_kafka_topics" should be run before the functionality tests --- +def test_validate_kafka_topics(): + LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ") + response = KafkaTopic.create_all_topics() + assert isinstance(response, bool) +# ----- core funtionality test ----- def test_StartCollector(telemetryFrontend_client): LOGGER.info(' >>> test_StartCollector START: <<< ') response = telemetryFrontend_client.StartCollector(create_collector_request()) LOGGER.debug(str(response)) assert isinstance(response, CollectorId) -def test_run_kafka_listener(): - LOGGER.info(' >>> test_run_kafka_listener START: <<< ') - name_mapping = NameMapping() - TelemetryFrontendServiceObj = TelemetryFrontendServiceServicerImpl(name_mapping) - response = TelemetryFrontendServiceObj.run_kafka_listener() # Method "run_kafka_listener" is not define in frontend.proto - LOGGER.debug(str(response)) - assert isinstance(response, bool) - def test_StopCollector(telemetryFrontend_client): LOGGER.info(' >>> test_StopCollector START: <<< ') - _collector_id = telemetryFrontend_client.StartCollector(create_collector_request()) - time.sleep(3) # wait for small amount before call the stopCollecter() - response = telemetryFrontend_client.StopCollector(_collector_id) + response = telemetryFrontend_client.StopCollector(create_collector_id()) LOGGER.debug(str(response)) assert isinstance(response, Empty) -def test_select_collectors(telemetryFrontend_client): - LOGGER.info(' >>> test_select_collector requesting <<< ') +def test_SelectCollectors(telemetryFrontend_client): + LOGGER.info(' >>> test_SelectCollectors START: <<< ') response = telemetryFrontend_client.SelectCollectors(create_collector_filter()) - LOGGER.info('Received Rows after applying Filter: {:} '.format(response)) LOGGER.debug(str(response)) - assert isinstance(response, CollectorList) \ No newline at end of file + assert isinstance(response, CollectorList) + +# ----- Non-gRPC method tests ----- +def test_RunResponseListener(): + LOGGER.info(' >>> test_RunResponseListener START: <<< ') + TelemetryFrontendServiceObj = TelemetryFrontendServiceServicerImpl() + response = TelemetryFrontendServiceObj.RunResponseListener() # becasue Method "run_kafka_listener" is not define in frontend.proto + LOGGER.debug(str(response)) + assert isinstance(response, bool) + +# ------- previous test ---------------- + +# def test_verify_db_and_table(): +# LOGGER.info(' >>> test_verify_database_and_tables START: <<< ') +# _engine = TelemetryEngine.get_engine() +# managementDB.create_database(_engine) +# managementDB.create_tables(_engine) + +# def test_StartCollector(telemetryFrontend_client): +# LOGGER.info(' >>> test_StartCollector START: <<< ') +# response = telemetryFrontend_client.StartCollector(create_collector_request()) +# LOGGER.debug(str(response)) +# assert isinstance(response, CollectorId) + +# def test_run_kafka_listener(): +# LOGGER.info(' >>> test_run_kafka_listener START: <<< ') +# name_mapping = NameMapping() +# TelemetryFrontendServiceObj = TelemetryFrontendServiceServicerImpl(name_mapping) +# response = TelemetryFrontendServiceObj.run_kafka_listener() # Method "run_kafka_listener" is not define in frontend.proto +# LOGGER.debug(str(response)) +# assert isinstance(response, bool) + +# def test_StopCollector(telemetryFrontend_client): +# LOGGER.info(' >>> test_StopCollector START: <<< ') +# _collector_id = telemetryFrontend_client.StartCollector(create_collector_request()) +# time.sleep(3) # wait for small amount before call the stopCollecter() +# response = telemetryFrontend_client.StopCollector(_collector_id) +# LOGGER.debug(str(response)) +# assert isinstance(response, Empty) + +# def test_select_collectors(telemetryFrontend_client): +# LOGGER.info(' >>> test_select_collector requesting <<< ') +# response = telemetryFrontend_client.SelectCollectors(create_collector_filter()) +# LOGGER.info('Received Rows after applying Filter: {:} '.format(response)) +# LOGGER.debug(str(response)) +# assert isinstance(response, CollectorList) \ No newline at end of file diff --git a/src/telemetry/telemetry_virenv.txt b/src/telemetry/telemetry_virenv.txt deleted file mode 100644 index e39f80b6593d6c41411751cdd0ea59ee05344570..0000000000000000000000000000000000000000 --- a/src/telemetry/telemetry_virenv.txt +++ /dev/null @@ -1,49 +0,0 @@ -anytree==2.8.0 -APScheduler==3.10.1 -attrs==23.2.0 -certifi==2024.2.2 -charset-normalizer==2.0.12 -colorama==0.4.6 -confluent-kafka==2.3.0 -coverage==6.3 -future-fstrings==1.2.0 -greenlet==3.0.3 -grpcio==1.47.5 -grpcio-health-checking==1.47.5 -grpcio-tools==1.47.5 -grpclib==0.4.4 -h2==4.1.0 -hpack==4.0.0 -hyperframe==6.0.1 -idna==3.7 -influx-line-protocol==0.1.4 -iniconfig==2.0.0 -kafka-python==2.0.2 -multidict==6.0.5 -networkx==3.3 -packaging==24.0 -pluggy==1.5.0 -prettytable==3.5.0 -prometheus-client==0.13.0 -protobuf==3.20.3 -psycopg2-binary==2.9.3 -py==1.11.0 -py-cpuinfo==9.0.0 -pytest==6.2.5 -pytest-benchmark==3.4.1 -pytest-depends==1.0.1 -python-dateutil==2.8.2 -python-json-logger==2.0.2 -pytz==2024.1 -questdb==1.0.1 -requests==2.27.1 -six==1.16.0 -SQLAlchemy==1.4.52 -sqlalchemy-cockroachdb==1.4.4 -SQLAlchemy-Utils==0.38.3 -toml==0.10.2 -typing_extensions==4.12.0 -tzlocal==5.2 -urllib3==1.26.18 -wcwidth==0.2.13 -xmltodict==0.12.0 diff --git a/src/telemetry/database/tests/messages.py b/src/telemetry/tests/messages.py similarity index 100% rename from src/telemetry/database/tests/messages.py rename to src/telemetry/tests/messages.py diff --git a/scripts/run_tests_locally-telemetry-mgtDB.sh b/src/telemetry/tests/test_telemetryDB.py old mode 100755 new mode 100644 similarity index 59% rename from scripts/run_tests_locally-telemetry-mgtDB.sh rename to src/telemetry/tests/test_telemetryDB.py index 8b68104eaf343b57ec4953334cda37167cca3529..c4976f8c2144fcdcad43a3e25d43091010de0d18 --- a/scripts/run_tests_locally-telemetry-mgtDB.sh +++ b/src/telemetry/tests/test_telemetryDB.py @@ -1,4 +1,3 @@ -#!/bin/bash # Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -14,13 +13,16 @@ # limitations under the License. -PROJECTDIR=`pwd` +import logging +from telemetry.database.Telemetry_DB import TelemetryDB -cd $PROJECTDIR/src -# RCFILE=$PROJECTDIR/coverage/.coveragerc -# coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \ -# kpi_manager/tests/test_unitary.py +LOGGER = logging.getLogger(__name__) -RCFILE=$PROJECTDIR/coverage/.coveragerc -python3 -m pytest --log-cli-level=INFO --verbose \ - telemetry/database/tests/managementDBtests.py +def test_verify_databases_and_tables(): + LOGGER.info('>>> test_verify_databases_and_tables : START <<< ') + TelemetryDBobj = TelemetryDB() + TelemetryDBobj.drop_database() + TelemetryDBobj.verify_tables() + TelemetryDBobj.create_database() + TelemetryDBobj.create_tables() + TelemetryDBobj.verify_tables() \ No newline at end of file