diff --git a/deploy/kafka.sh b/deploy/kafka.sh new file mode 100755 index 0000000000000000000000000000000000000000..f2fb666b545b86e36d7647a4e4e1de19731caa8d --- /dev/null +++ b/deploy/kafka.sh @@ -0,0 +1,69 @@ +#!/bin/bash +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +######################################################################################################################## +# Read deployment settings +######################################################################################################################## + +# If not already set, set the namespace where Apache Kafka will be deployed. +export KFK_NAMESPACE=${KFK_NAMESPACE:-"kafka"} + + +######################################################################################################################## +# Automated steps start here +######################################################################################################################## + +# Constants +TMP_FOLDER="./tmp" +KFK_MANIFESTS_PATH="manifests/kafka" +KFK_ZOOKEEPER_MANIFEST="01-zookeeper.yaml" +KFK_MANIFEST="02-kafka.yaml" + +# Create a tmp folder for files modified during the deployment +TMP_MANIFESTS_FOLDER="${TMP_FOLDER}/${KFK_NAMESPACE}/manifests" +mkdir -p ${TMP_MANIFESTS_FOLDER} + +# copy zookeeper and kafka manifest files to temporary manifest location +cp "${KFK_MANIFESTS_PATH}/${KFK_ZOOKEEPER_MANIFEST}" "${TMP_MANIFESTS_FOLDER}/${KFK_ZOOKEEPER_MANIFEST}" +cp "${KFK_MANIFESTS_PATH}/${KFK_MANIFEST}" "${TMP_MANIFESTS_FOLDER}/${KFK_MANIFEST}" + +echo "Apache Kafka Namespace" +echo ">>> Delete Apache Kafka Namespace" +kubectl delete namespace ${KFK_NAMESPACE} --ignore-not-found + +echo ">>> Create Apache Kafka Namespace" +kubectl create namespace ${KFK_NAMESPACE} + +echo ">>> Deploying Apache Kafka Zookeeper" +# Kafka zookeeper service should be deployed before the kafka service +kubectl --namespace ${KFK_NAMESPACE} apply -f "${TMP_MANIFESTS_FOLDER}/${KFK_ZOOKEEPER_MANIFEST}" + +KFK_ZOOKEEPER_SERVICE="zookeeper-service" # this command may be replaced with command to extract service name automatically +KFK_ZOOKEEPER_IP=$(kubectl --namespace ${KFK_NAMESPACE} get service ${KFK_ZOOKEEPER_SERVICE} -o 'jsonpath={.spec.clusterIP}') + +# Kafka service should be deployed after the zookeeper service +sed -i "s//${KFK_ZOOKEEPER_IP}/" "${TMP_MANIFESTS_FOLDER}/$KFK_MANIFEST" + +echo ">>> Deploying Apache Kafka Broker" +kubectl --namespace ${KFK_NAMESPACE} apply -f "${TMP_MANIFESTS_FOLDER}/$KFK_MANIFEST" + +echo ">>> Verifying Apache Kafka deployment" +sleep 5 +KFK_PODS_STATUS=$(kubectl --namespace ${KFK_NAMESPACE} get pods) +if echo "$KFK_PODS_STATUS" | grep -qEv 'STATUS|Running'; then + echo "Deployment Error: $KFK_PODS_STATUS" +else + echo "$KFK_PODS_STATUS" +fi \ No newline at end of file diff --git a/manifests/kafka/01-zookeeper.yaml b/manifests/kafka/01-zookeeper.yaml new file mode 100644 index
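The script above applies the Zookeeper and broker manifests that follow; the broker sits behind the kafka-service Service on port 9092 in the kafka namespace, and since 02-kafka.yaml advertises PLAINTEXT://localhost:9092, it is most reliably reached from inside the broker pod or through a port-forward to localhost. A minimal reachability check along those lines, assuming the kafka-python package is installed (it is not part of this deployment) and using an illustrative topic name:

# Hypothetical smoke test for the broker deployed by deploy/kafka.sh.
# Assumptions: kafka-python is installed, and the broker is reachable on the address
# below (e.g. via "kubectl -n kafka port-forward svc/kafka-service 9092:9092").
from kafka import KafkaProducer, KafkaConsumer

BOOTSTRAP = 'localhost:9092'      # illustrative; matches the advertised listener
TOPIC     = 'tfs-kafka-smoke'     # illustrative topic name

def check_kafka() -> bool:
    try:
        producer = KafkaProducer(bootstrap_servers=[BOOTSTRAP])
        producer.send(TOPIC, b'hello-tfs')     # auto-creates the topic if the broker allows it
        producer.flush(timeout=10)
        producer.close()

        consumer = KafkaConsumer(
            TOPIC, bootstrap_servers=[BOOTSTRAP],
            auto_offset_reset='earliest', consumer_timeout_ms=10000)
        received = any(message.value == b'hello-tfs' for message in consumer)
        consumer.close()
        return received
    except Exception as exc:                   # broker unreachable or not ready yet
        print('Kafka check failed: {:s}'.format(str(exc)))
        return False

if __name__ == '__main__':
    print('Kafka OK' if check_kafka() else 'Kafka check FAILED')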
0000000000000000000000000000000000000000..0f5ade5d945f7be2ede9d5ec0480a755bd7795c2 --- /dev/null +++ b/manifests/kafka/01-zookeeper.yaml @@ -0,0 +1,40 @@ +apiVersion: v1 +kind: Service +metadata: + labels: + app: zookeeper-service + name: zookeeper-service + namespace: kafka +spec: + type: NodePort + ports: + - name: zookeeper-port + port: 2181 + nodePort: 30181 + targetPort: 2181 + selector: + app: zookeeper +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + labels: + app: zookeeper + name: zookeeper + namespace: kafka +spec: + replicas: 1 + selector: + matchLabels: + app: zookeeper + template: + metadata: + labels: + app: zookeeper + spec: + containers: + - image: wurstmeister/zookeeper + imagePullPolicy: IfNotPresent + name: zookeeper + ports: + - containerPort: 2181 \ No newline at end of file diff --git a/manifests/kafka/02-kafka.yaml b/manifests/kafka/02-kafka.yaml new file mode 100644 index 0000000000000000000000000000000000000000..8a2b51724f4ca3687cc695bd85a99d14368b9279 --- /dev/null +++ b/manifests/kafka/02-kafka.yaml @@ -0,0 +1,46 @@ +apiVersion: v1 +kind: Service +metadata: + labels: + app: kafka-broker + name: kafka-service + namespace: kafka +spec: + ports: + - port: 9092 + selector: + app: kafka-broker +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + labels: + app: kafka-broker + name: kafka-broker + namespace: kafka +spec: + replicas: 1 + selector: + matchLabels: + app: kafka-broker + template: + metadata: + labels: + app: kafka-broker + spec: + hostname: kafka-broker + containers: + - env: + - name: KAFKA_BROKER_ID + value: "1" + - name: KAFKA_ZOOKEEPER_CONNECT + value: :2181 + - name: KAFKA_LISTENERS + value: PLAINTEXT://:9092 + - name: KAFKA_ADVERTISED_LISTENERS + value: PLAINTEXT://localhost:9092 + image: wurstmeister/kafka + imagePullPolicy: IfNotPresent + name: kafka-broker + ports: + - containerPort: 9092 \ No newline at end of file diff --git a/manifests/mock_nodeexporter.yaml b/manifests/mock_nodeexporter.yaml new file mode 100644 index 0000000000000000000000000000000000000000..bf595d63a22fc5fdcf0a04d706d8fd140c6f2e6e --- /dev/null +++ b/manifests/mock_nodeexporter.yaml @@ -0,0 +1,21 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: node-exporter + labels: + app: node-exporter +spec: + replicas: 1 + selector: + matchLabels: + app: node-exporter + template: + metadata: + labels: + app: node-exporter + spec: + containers: + - name: node-exporter + image: prom/node-exporter:latest + ports: + - containerPort: 9100 diff --git a/manifests/mock_nodeexporterservice.yaml b/manifests/mock_nodeexporterservice.yaml new file mode 100644 index 0000000000000000000000000000000000000000..b7bb4f879c3cdc029ac5f106ebfb0f845c03307c --- /dev/null +++ b/manifests/mock_nodeexporterservice.yaml @@ -0,0 +1,12 @@ +apiVersion: v1 +kind: Service +metadata: + name: node-exporter +spec: + selector: + app: node-exporter + ports: + - protocol: TCP + port: 9100 + targetPort: 9100 + type: NodePort diff --git a/my_deploy.sh b/my_deploy.sh index 0fcb51f90509732452620126c9062597a17735ed..74c293619bb9ea28c13d3afc6dabde61cd2aa53b 100755 --- a/my_deploy.sh +++ b/my_deploy.sh @@ -23,7 +23,7 @@ export TFS_REGISTRY_IMAGES="http://localhost:32000/tfs/" export TFS_COMPONENTS="context device pathcomp service slice nbi webui load_generator" # Uncomment to activate Monitoring -#export TFS_COMPONENTS="${TFS_COMPONENTS} monitoring" +export TFS_COMPONENTS="${TFS_COMPONENTS} monitoring" # Uncomment to activate ZTP #export TFS_COMPONENTS="${TFS_COMPONENTS} ztp" @@ -44,7 +44,7 @@ export 
TFS_COMPONENTS="context device pathcomp service slice nbi webui load_gene #export TFS_COMPONENTS="${TFS_COMPONENTS} forecaster" # Uncomment to activate E2E Orchestrator -#export TFS_COMPONENTS="${TFS_COMPONENTS} e2e_orchestrator" +export TFS_COMPONENTS="${TFS_COMPONENTS} e2e_orchestrator" # Set the tag you want to use for your images. export TFS_IMAGE_TAG="dev" @@ -93,7 +93,7 @@ export CRDB_DATABASE="tfs" export CRDB_DEPLOY_MODE="single" # Disable flag for dropping database, if it exists. -export CRDB_DROP_DATABASE_IF_EXISTS="" +export CRDB_DROP_DATABASE_IF_EXISTS="YES" # Disable flag for re-deploying CockroachDB from scratch. export CRDB_REDEPLOY="" @@ -141,7 +141,7 @@ export QDB_TABLE_MONITORING_KPIS="tfs_monitoring_kpis" export QDB_TABLE_SLICE_GROUPS="tfs_slice_groups" # Disable flag for dropping tables if they exist. -export QDB_DROP_TABLES_IF_EXIST="" +export QDB_DROP_TABLES_IF_EXIST="YES" # Disable flag for re-deploying QuestDB from scratch. export QDB_REDEPLOY="" @@ -154,3 +154,10 @@ export PROM_EXT_PORT_HTTP="9090" # Set the external port Grafana HTTP Dashboards will be exposed to. export GRAF_EXT_PORT_HTTP="3000" + + +# ----- Apache Kafka ----------------------------------------------------------- + +# Set the namespace where Apache Kafka will be deployed. +export KFK_NAMESPACE="kafka" + diff --git a/proto/device.proto b/proto/device.proto index 30e60079db6c1eb8641d10115f6f43840eabf39c..98cca8ce937d4bccdb5076a7d26e9a27388641be 100644 --- a/proto/device.proto +++ b/proto/device.proto @@ -16,7 +16,8 @@ syntax = "proto3"; package device; import "context.proto"; -import "monitoring.proto"; +//import "monitoring.proto"; +import "kpi_manager.proto"; service DeviceService { rpc AddDevice (context.Device ) returns (context.DeviceId ) {} @@ -27,8 +28,8 @@ service DeviceService { } message MonitoringSettings { - monitoring.KpiId kpi_id = 1; - monitoring.KpiDescriptor kpi_descriptor = 2; + kpi_manager.KpiId kpi_id = 1; + kpi_manager.KpiDescriptor kpi_descriptor = 2; float sampling_duration_s = 3; float sampling_interval_s = 4; } diff --git a/proto/kpi_manager.proto b/proto/kpi_manager.proto new file mode 100644 index 0000000000000000000000000000000000000000..ad48eb84ff7433d48ebcd5ebd902f0bea7d28504 --- /dev/null +++ b/proto/kpi_manager.proto @@ -0,0 +1,61 @@ +// Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
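Because device.proto above now takes its KpiId and KpiDescriptor from kpi_manager.proto (defined next), callers that build MonitoringSettings switch to the kpi_manager generated types. A sketch, assuming the generated stubs follow the common.proto.* layout used elsewhere in this change and using illustrative identifiers:

# Sketch: populating device.MonitoringSettings after the KpiDescriptor move.
# Module and field names follow the .proto files in this change; UUIDs are illustrative.
from common.proto.device_pb2 import MonitoringSettings
from common.proto.kpi_sample_types_pb2 import KpiSampleType

settings = MonitoringSettings()
settings.kpi_id.kpi_id.uuid = 'kpi-uuid-1'                       # kpi_manager.KpiId
settings.kpi_descriptor.kpi_description = 'Rx packets on END1'   # kpi_manager.KpiDescriptor
settings.kpi_descriptor.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED
settings.kpi_descriptor.device_id.device_uuid.uuid     = 'DEV1'
settings.kpi_descriptor.endpoint_id.endpoint_uuid.uuid = 'END1'
settings.sampling_duration_s = 86400.0    # monitor for one day
settings.sampling_interval_s = 10.0       # one sample every 10 seconds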
+ +syntax = "proto3"; +package kpi_manager; + +import "context.proto"; +import "kpi_sample_types.proto"; + +service KpiManagerService{ + rpc SetKpiDescriptor (KpiDescriptor ) returns (KpiId ) {} // Stable not final + rpc DeleteKpiDescriptor (KpiId ) returns (context.Empty ) {} // Stable and final + rpc GetKpiDescriptor (KpiId ) returns (KpiDescriptor ) {} // Stable and final + rpc SelectKpiDescriptor (KpiDescriptorFilter) returns (KpiDescriptorList ) {} // Stable and final +} + + +message KpiId { + context.Uuid kpi_id = 1; +} + +message KpiDescriptor { + KpiId kpi_id = 1; + string kpi_description = 2; + kpi_sample_types.KpiSampleType kpi_sample_type = 3; + context.DeviceId device_id = 4; + context.EndPointId endpoint_id = 5; + context.ServiceId service_id = 6; + context.SliceId slice_id = 7; + context.ConnectionId connection_id = 8; + context.LinkId link_id = 9; +} + +message KpiDescriptorFilter { + // KPI Descriptors that fulfill the filter are those that match ALL the following fields. + // An empty list means: any value is accepted. + // All fields empty means: list all KPI Descriptors + repeated KpiId kpi_id = 1; + repeated kpi_sample_types.KpiSampleType kpi_sample_type = 2; + repeated context.DeviceId device_id = 3; + repeated context.EndPointId endpoint_id = 4; + repeated context.ServiceId service_id = 5; + repeated context.SliceId slice_id = 6; + repeated context.ConnectionId connection_id = 7; + repeated context.LinkId link_id = 8; +} + +message KpiDescriptorList { + repeated KpiDescriptor kpi_descriptor_list = 1; +} \ No newline at end of file diff --git a/proto/monitoring.proto b/proto/monitoring.proto old mode 100644 new mode 100755 index 45ba48b0271c6e8890d7125ff44f62d2b6da6b58..2706988aa474b88f237017d72e468a34e4945af9 --- a/proto/monitoring.proto +++ b/proto/monitoring.proto @@ -16,13 +16,14 @@ syntax = "proto3"; package monitoring; import "context.proto"; -import "kpi_sample_types.proto"; +import "kpi_manager.proto"; +//import "kpi_sample_types.proto"; service MonitoringService { - rpc SetKpi (KpiDescriptor ) returns (KpiId ) {} // Stable not final - rpc DeleteKpi (KpiId ) returns (context.Empty ) {} // Stable and final - rpc GetKpiDescriptor (KpiId ) returns (KpiDescriptor ) {} // Stable and final - rpc GetKpiDescriptorList (context.Empty ) returns (KpiDescriptorList ) {} // Stable and final +// rpc SetKpi (KpiDescriptor ) returns (KpiId ) {} // Stable not final +// rpc DeleteKpi (KpiId ) returns (context.Empty ) {} // Stable and final +// rpc GetKpiDescriptor (KpiId ) returns (KpiDescriptor ) {} // Stable and final +// rpc GetKpiDescriptorList (context.Empty ) returns (KpiDescriptorList ) {} // Stable and final rpc IncludeKpi (Kpi ) returns (context.Empty ) {} // Stable and final rpc MonitorKpi (MonitorKpiRequest ) returns (context.Empty ) {} // Stable and final rpc QueryKpiData (KpiQuery ) returns (RawKpiTable ) {} // Not implemented @@ -35,36 +36,25 @@ service MonitoringService { rpc GetAlarmDescriptor (AlarmID ) returns (AlarmDescriptor ) {} // Stable and final rpc GetAlarmResponseStream(AlarmSubscription ) returns (stream AlarmResponse) {} // Not Stable not final rpc DeleteAlarm (AlarmID ) returns (context.Empty ) {} // Stable and final - rpc GetStreamKpi (KpiId ) returns (stream Kpi ) {} // Stable not final - rpc GetInstantKpi (KpiId ) returns (Kpi ) {} // Stable not final +// rpc GetStreamKpi (KpiId ) returns (stream Kpi ) {} // Stable not final +// rpc GetInstantKpi (KpiId ) returns (Kpi ) {} // Stable not final } -message KpiDescriptor { - KpiId kpi_id = 1; - string 
kpi_description = 2; - repeated KpiId kpi_id_list = 3; - kpi_sample_types.KpiSampleType kpi_sample_type = 4; - context.DeviceId device_id = 5; - context.EndPointId endpoint_id = 6; - context.ServiceId service_id = 7; - context.SliceId slice_id = 8; - context.ConnectionId connection_id = 9; - context.LinkId link_id = 10; -} + message MonitorKpiRequest { - KpiId kpi_id = 1; + kpi_manager.KpiId kpi_id = 1; float monitoring_window_s = 2; float sampling_rate_s = 3; // Pending add field to reflect Available Device Protocols } message KpiQuery { - repeated KpiId kpi_ids = 1; - float monitoring_window_s = 2; - uint32 last_n_samples = 3; // used when you want something like "get the last N many samples - context.Timestamp start_timestamp = 4; // used when you want something like "get the samples since X date/time" - context.Timestamp end_timestamp = 5; // used when you want something like "get the samples until X date/time" + repeated kpi_manager.KpiId kpi_ids = 1; + float monitoring_window_s = 2; + uint32 last_n_samples = 3; // used when you want something like "get the last N many samples + context.Timestamp start_timestamp = 4; // used when you want something like "get the samples since X date/time" + context.Timestamp end_timestamp = 5; // used when you want something like "get the samples until X date/time" } @@ -74,20 +64,18 @@ message RawKpi { // cell } message RawKpiList { // column - KpiId kpi_id = 1; - repeated RawKpi raw_kpis = 2; + kpi_manager.KpiId kpi_id = 1; + repeated RawKpi raw_kpis = 2; } message RawKpiTable { // table repeated RawKpiList raw_kpi_lists = 1; } -message KpiId { - context.Uuid kpi_id = 1; -} + message Kpi { - KpiId kpi_id = 1; + kpi_manager.KpiId kpi_id = 1; context.Timestamp timestamp = 2; KpiValue kpi_value = 3; } @@ -117,13 +105,11 @@ message KpiList { repeated Kpi kpi = 1; } -message KpiDescriptorList { - repeated KpiDescriptor kpi_descriptor_list = 1; -} + message SubsDescriptor{ SubscriptionID subs_id = 1; - KpiId kpi_id = 2; + kpi_manager.KpiId kpi_id = 2; float sampling_duration_s = 3; float sampling_interval_s = 4; context.Timestamp start_timestamp = 5; // used when you want something like "get the samples since X date/time" @@ -148,7 +134,7 @@ message AlarmDescriptor { AlarmID alarm_id = 1; string alarm_description = 2; string name = 3; - KpiId kpi_id = 4; + kpi_manager.KpiId kpi_id = 4; KpiValueRange kpi_value_range = 5; context.Timestamp timestamp = 6; } diff --git a/proto/optical_attack_detector.proto b/proto/optical_attack_detector.proto index ebe3b5e06163c6e5a3bf7889065d5bb31923dd89..0d3ed58de81283e3f77fb013bed77ede14c7e849 100644 --- a/proto/optical_attack_detector.proto +++ b/proto/optical_attack_detector.proto @@ -17,7 +17,8 @@ syntax = "proto3"; package optical_attack_detector; import "context.proto"; -import "monitoring.proto"; +//import "monitoring.proto"; +import "kpi_manager.proto"; service OpticalAttackDetectorService { @@ -28,5 +29,5 @@ service OpticalAttackDetectorService { message DetectionRequest { context.ServiceId service_id = 1; - monitoring.KpiId kpi_id = 2; + kpi_manager.KpiId kpi_id = 2; } diff --git a/proto/policy_condition.proto b/proto/policy_condition.proto index 2037af93c375838209e78a07ec95e25d469d1d5a..c0af929efdbdbd6f237d6a6aba471952da1d2d6a 100644 --- a/proto/policy_condition.proto +++ b/proto/policy_condition.proto @@ -16,10 +16,11 @@ syntax = "proto3"; package policy; import "monitoring.proto"; +import "kpi_manager.proto"; // Condition message PolicyRuleCondition { - monitoring.KpiId kpiId = 1; + kpi_manager.KpiId kpiId = 
1; NumericalOperator numericalOperator = 2; monitoring.KpiValue kpiValue = 3; } diff --git a/proto/telemetry_frontend.proto b/proto/telemetry_frontend.proto new file mode 100644 index 0000000000000000000000000000000000000000..1f89a5d544dec837a7233296839b0f9eb5f4989f --- /dev/null +++ b/proto/telemetry_frontend.proto @@ -0,0 +1,42 @@ +syntax = "proto3"; +package device; + +import "context.proto"; +import "kpi_manager.proto"; +import "kpi_sample_types.proto"; + +service TelemetryFrontendService { + rpc StartCollector (Collector ) returns (CollectorId ) {} + rpc StopCollector (CollectorId ) returns (context.Empty) {} + rpc SelectCollectors(CollectorFilter) returns (CollectorList) {} +} + +message CollectorId { + context.Uuid collector_id = 1; +} + +message Collector { + CollectorId collector_id = 1; // The Collector ID + kpi_manager.KpiId kpi_id = 2; // The KPI Id to be associated to the collected samples + float duration_s = 3; // Terminate data collection after duration[seconds]; duration==0 means indefinitely + float interval_s = 4; // Interval between collected samples +} + +message CollectorFilter { + // Collector that fulfill the filter are those that match ALL the following fields. + // An empty list means: any value is accepted. + // All fields empty means: list all Collectors + repeated CollectorId collector_id = 1; + repeated kpi_manager.KpiId kpi_id = 2; + repeated kpi_sample_types.KpiSampleType kpi_sample_type = 3; + repeated context.DeviceId device_id = 4; + repeated context.EndPointId endpoint_id = 5; + repeated context.ServiceId service_id = 6; + repeated context.SliceId slice_id = 7; + repeated context.ConnectionId connection_id = 8; + repeated context.LinkId link_id = 9; +} + +message CollectorList { + repeated Collector collector_list = 1; +} diff --git a/scripts/run_tests_locally-kpi_manager.sh b/scripts/run_tests_locally-kpi_manager.sh new file mode 100755 index 0000000000000000000000000000000000000000..8ed855a8e34261173a3d788fa06777f0e66825a3 --- /dev/null +++ b/scripts/run_tests_locally-kpi_manager.sh @@ -0,0 +1,28 @@ +#!/bin/bash +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
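The new TelemetryFrontendService above drives data collection with a Collector that ties a duration and sampling interval to a kpi_manager KpiId. A sketch of how a caller might start and stop one, assuming generated stubs named telemetry_frontend_pb2 / telemetry_frontend_pb2_grpc and the 7072 gRPC port registered for the frontend in Constants.py; the address and UUIDs are illustrative:

# Sketch: starting and stopping a collector through TelemetryFrontendService.
# Stub module names and the endpoint address are assumptions for illustration.
import grpc
from common.proto import telemetry_frontend_pb2, telemetry_frontend_pb2_grpc

channel = grpc.insecure_channel('localhost:7072')
stub    = telemetry_frontend_pb2_grpc.TelemetryFrontendServiceStub(channel)

collector = telemetry_frontend_pb2.Collector()
collector.collector_id.collector_id.uuid = 'collector-uuid-1'
collector.kpi_id.kpi_id.uuid             = 'kpi-uuid-1'
collector.duration_s = 0.0     # 0 means collect indefinitely, per the .proto comment
collector.interval_s = 10.0    # seconds between samples

collector_id = stub.StartCollector(collector)     # returns CollectorId
print('Started collector: {:s}'.format(collector_id.collector_id.uuid))

stub.StopCollector(collector_id)                  # returns context.Empty
channel.close()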
+ + +PROJECTDIR=`pwd` + +cd $PROJECTDIR/src +# RCFILE=$PROJECTDIR/coverage/.coveragerc +# coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \ +# kpi_manager/tests/test_unitary.py + +# python3 kpi_manager/tests/test_unitary.py + +RCFILE=$PROJECTDIR/coverage/.coveragerc +python3 -m pytest --log-level=INFO --verbose \ + kpi_manager/tests/test_unitary.py \ No newline at end of file diff --git a/scripts/run_tests_locally-telemetry-backend.sh b/scripts/run_tests_locally-telemetry-backend.sh new file mode 100755 index 0000000000000000000000000000000000000000..34e9e0542bf1d46fc03cb2442abdd0a7b6c5961d --- /dev/null +++ b/scripts/run_tests_locally-telemetry-backend.sh @@ -0,0 +1,28 @@ +#!/bin/bash +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +PROJECTDIR=`pwd` + +cd $PROJECTDIR/src +# RCFILE=$PROJECTDIR/coverage/.coveragerc +# coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \ +# kpi_manager/tests/test_unitary.py + +# python3 kpi_manager/tests/test_unitary.py + +RCFILE=$PROJECTDIR/coverage/.coveragerc +python3 -m pytest --log-level=INFO --verbose \ + telemetry/backend/tests/testTelemetryBackend.py \ No newline at end of file diff --git a/scripts/run_tests_locally-telemetry-frontend.sh b/scripts/run_tests_locally-telemetry-frontend.sh new file mode 100755 index 0000000000000000000000000000000000000000..ac59f6dde13986a8e0f683caa6a8670a6b6fd11d --- /dev/null +++ b/scripts/run_tests_locally-telemetry-frontend.sh @@ -0,0 +1,28 @@ +#!/bin/bash +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +PROJECTDIR=`pwd` + +cd $PROJECTDIR/src +# RCFILE=$PROJECTDIR/coverage/.coveragerc +# coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \ +# kpi_manager/tests/test_unitary.py + +# python3 kpi_manager/tests/test_unitary.py + +RCFILE=$PROJECTDIR/coverage/.coveragerc +python3 -m pytest --log-level=INFO --verbose \ + telemetry_frontend/tests/test_unitary.py \ No newline at end of file diff --git a/src/build.sh b/src/build.sh index b1a7d299e546e4607a494dde3ec435f093da0c8d..9ae91ef105371db4bb4f20e0cb9e397c0c6f0e56 100755 --- a/src/build.sh +++ b/src/build.sh @@ -18,16 +18,20 @@ cd $(dirname $0) echo "BUILD context" -context/genproto.sh +# context/genproto.sh # genproto.sh file doesn't exist docker build -t "context:develop" -f context/Dockerfile --quiet . 
-docker build -t "context:test" -f context/tests/Dockerfile --quiet . +# docker build -t "context:test" -f context/tests/Dockerfile --quiet . # Dockerfile doesn't exist -cd monitoring -./genproto.sh -cd .. +# genproto.sh file doesn't exist +# cd monitoring +# ./genproto.sh +# cd .. echo "BUILD monitoring" docker build -t "monitoring:dockerfile" -f monitoring/Dockerfile . +echo "BUILD kpi manager" +docker build -t "kpi_manager:dockerfile" -f kpi_manager/Dockerfile . + echo "Prune unused images" docker image prune --force diff --git a/src/common/Constants.py b/src/common/Constants.py index 30aa09b4caa34a54dd54126195b3f322c07c932e..72003846b1c3737246aa6ea277426a470fd4df27 100644 --- a/src/common/Constants.py +++ b/src/common/Constants.py @@ -43,6 +43,8 @@ class ServiceNameEnum(Enum): ZTP = 'ztp' POLICY = 'policy' MONITORING = 'monitoring' + KPIMANAGER = 'kpiManager' + TELEMETRYFRONTEND = 'telemetryfrontend' DLT = 'dlt' NBI = 'nbi' CYBERSECURITY = 'cybersecurity' @@ -73,6 +75,8 @@ DEFAULT_SERVICE_GRPC_PORTS = { ServiceNameEnum.ZTP .value : 5050, ServiceNameEnum.POLICY .value : 6060, ServiceNameEnum.MONITORING .value : 7070, + ServiceNameEnum.KPIMANAGER .value : 7071, + ServiceNameEnum.TELEMETRYFRONTEND .value : 7072, ServiceNameEnum.DLT .value : 8080, ServiceNameEnum.NBI .value : 9090, ServiceNameEnum.L3_CAD .value : 10001, diff --git a/src/kpi_manager/Dockerfile b/src/kpi_manager/Dockerfile new file mode 100644 index 0000000000000000000000000000000000000000..d3d962b9fedd249e2a9bfe43a54dd558a43e4563 --- /dev/null +++ b/src/kpi_manager/Dockerfile @@ -0,0 +1,71 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +FROM python:3.9-slim + +# Install dependencies +RUN apt-get --yes --quiet --quiet update && \ + apt-get --yes --quiet --quiet install wget g++ git && \ + rm -rf /var/lib/apt/lists/* + +# Set Python to show logs as they occur +ENV PYTHONUNBUFFERED=0 + +# Download the gRPC health probe +RUN GRPC_HEALTH_PROBE_VERSION=v0.2.0 && \ + wget -qO/bin/grpc_health_probe https://github.com/grpc-ecosystem/grpc-health-probe/releases/download/${GRPC_HEALTH_PROBE_VERSION}/grpc_health_probe-linux-amd64 && \ + chmod +x /bin/grpc_health_probe + +# Get generic Python packages +RUN python3 -m pip install --upgrade pip +RUN python3 -m pip install --upgrade setuptools wheel +RUN python3 -m pip install --upgrade pip-tools + +# Get common Python packages +# Note: this step enables sharing the previous Docker build steps among all the Python components +WORKDIR /var/teraflow +COPY common_requirements.in common_requirements.in +RUN pip-compile --quiet --output-file=common_requirements.txt common_requirements.in +RUN python3 -m pip install -r common_requirements.txt + +# Add common files into working directory +WORKDIR /var/teraflow/common +COPY src/common/. 
./ +RUN rm -rf proto + +# Create proto sub-folder, copy .proto files, and generate Python code +RUN mkdir -p /var/teraflow/common/proto +WORKDIR /var/teraflow/common/proto +RUN touch __init__.py +COPY proto/*.proto ./ +RUN python3 -m grpc_tools.protoc -I=. --python_out=. --grpc_python_out=. *.proto +RUN rm *.proto +RUN find . -type f -exec sed -i -E 's/(import\ .*)_pb2/from . \1_pb2/g' {} \; + +# Create component sub-folders, get specific Python packages +RUN mkdir -p /var/teraflow/kpi_manager +WORKDIR /var/teraflow/kpi_manager +COPY src/kpi_manager/requirements.in requirements.in +RUN pip-compile --quiet --output-file=requirements.txt requirements.in +RUN python3 -m pip install -r requirements.txt + +# Add component files into working directory +WORKDIR /var/teraflow +COPY src/context/. context/ +COPY src/device/. device/ +COPY src/monitoring/. monitoring/ +COPY src/kpi_manager/. kpi_manager/ + +# Start the service +ENTRYPOINT ["python", "-m", "kpi_manager.service"] diff --git a/src/kpi_manager/__init__.py b/src/kpi_manager/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..1549d9811aa5d1c193a44ad45d0d7773236c0612 --- /dev/null +++ b/src/kpi_manager/__init__.py @@ -0,0 +1,14 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + diff --git a/src/kpi_manager/client/KpiManagerClient.py b/src/kpi_manager/client/KpiManagerClient.py new file mode 100755 index 0000000000000000000000000000000000000000..30b1720fb892c599abd5b206d7478873e72cc5fd --- /dev/null +++ b/src/kpi_manager/client/KpiManagerClient.py @@ -0,0 +1,76 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
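The Dockerfile above regenerates the Python gRPC stubs for all .proto files with grpc_tools.protoc and then rewrites the imports to be package-relative. The same generation step can be reproduced outside Docker; a sketch, assuming grpcio-tools is installed and using illustrative input/output paths (the Dockerfile's import-rewriting sed would still be needed afterwards):

# Sketch: regenerating stubs locally, mirroring the protoc step in the Dockerfile.
# Paths are illustrative; grpcio-tools provides grpc_tools.protoc.
import glob, os
from grpc_tools import protoc

PROTO_DIR = 'proto'               # repository proto/ folder
OUT_DIR   = 'out/common/proto'    # illustrative output location

os.makedirs(OUT_DIR, exist_ok=True)
for proto_file in sorted(glob.glob(os.path.join(PROTO_DIR, '*.proto'))):
    exit_code = protoc.main([
        'grpc_tools.protoc',
        '-I{:s}'.format(PROTO_DIR),
        '--python_out={:s}'.format(OUT_DIR),
        '--grpc_python_out={:s}'.format(OUT_DIR),
        proto_file,
    ])
    if exit_code != 0:
        raise RuntimeError('protoc failed for {:s}'.format(proto_file))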
+ +import grpc, logging +from common.Constants import ServiceNameEnum +from common.Settings import get_service_host, get_service_port_grpc + +from common.proto.context_pb2 import Empty +from common.tools.grpc.Tools import grpc_message_to_json_string +from common.tools.client.RetryDecorator import retry, delay_exponential +from common.proto.kpi_manager_pb2_grpc import KpiManagerServiceStub +from common.proto.kpi_manager_pb2 import KpiId, KpiDescriptor, KpiDescriptorFilter, KpiDescriptorList + +LOGGER = logging.getLogger(__name__) +MAX_RETRIES = 10 +DELAY_FUNCTION = delay_exponential(initial=0.01, increment=2.0, maximum=5.0) +RETRY_DECORATOR = retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, prepare_method_name='connect') + +class KpiManagerClient: + def __init__(self, host=None, port=None): + if not host: host = get_service_host(ServiceNameEnum.KPIMANAGER) + if not port: port = get_service_port_grpc(ServiceNameEnum.KPIMANAGER) + self.endpoint = '{:s}:{:s}'.format(str(host), str(port)) + LOGGER.debug('Creating channel to {:s}...'.format(str(self.endpoint))) + self.channel = None + self.stub = None + self.connect() + LOGGER.debug('Channel created') + + def connect(self): + self.channel = grpc.insecure_channel(self.endpoint) + self.stub = KpiManagerServiceStub(self.channel) + + def close(self): + if self.channel is not None: self.channel.close() + self.channel = None + self.stub = None + + @RETRY_DECORATOR + def SetKpiDescriptor(self, request : KpiDescriptor) -> KpiId: + LOGGER.debug('SetKpiDescriptor: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.SetKpiDescriptor(request) + LOGGER.debug('SetKpiDescriptor result: {:s}'.format(grpc_message_to_json_string(response))) + return response + + @RETRY_DECORATOR + def DeleteKpiDescriptor(self,request : KpiId) -> Empty: + LOGGER.debug('DeleteKpiDescriptor: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.DeleteKpiDescriptor(request) + LOGGER.info('DeleteKpiDescriptor result: {:s}'.format(grpc_message_to_json_string(response))) + return response + + @RETRY_DECORATOR + def GetKpiDescriptor(self, request : KpiId) -> KpiDescriptor: + LOGGER.debug('GetKpiDescriptor: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.GetKpiDescriptor(request) + LOGGER.debug('GetKpiDescriptor result: {:s}'.format(grpc_message_to_json_string(response))) + return response + + @RETRY_DECORATOR + def SelectKpiDescriptor(self, request : KpiDescriptorFilter) -> KpiDescriptorList: + LOGGER.debug('SelectKpiDescriptor: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.SelectKpiDescriptor(request) + LOGGER.debug('SelectKpiDescriptor result: {:s}'.format(grpc_message_to_json_string(response))) + return response \ No newline at end of file diff --git a/src/kpi_manager/client/__init__.py b/src/kpi_manager/client/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..1549d9811aa5d1c193a44ad45d0d7773236c0612 --- /dev/null +++ b/src/kpi_manager/client/__init__.py @@ -0,0 +1,14 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + diff --git a/src/kpi_manager/requirements.in b/src/kpi_manager/requirements.in new file mode 100644 index 0000000000000000000000000000000000000000..a6183b57e6d26e90daac5857b7e46d8ac0f27c66 --- /dev/null +++ b/src/kpi_manager/requirements.in @@ -0,0 +1,24 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +anytree==2.8.0 +APScheduler==3.10.1 +influx-line-protocol==0.1.4 +psycopg2-binary==2.9.3 +python-dateutil==2.8.2 +python-json-logger==2.0.2 +questdb==1.0.1 +requests==2.27.1 +xmltodict==0.12.0 +# grpc_health_probe==0.2.0 #getting error on this library \ No newline at end of file diff --git a/src/kpi_manager/service/KpiManagerService.py b/src/kpi_manager/service/KpiManagerService.py new file mode 100755 index 0000000000000000000000000000000000000000..dbbcec2cf0e017b5797348578b3537da1420ecbc --- /dev/null +++ b/src/kpi_manager/service/KpiManagerService.py @@ -0,0 +1,31 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
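A short usage sketch for the KpiManagerClient defined above; it assumes a running KPI Manager reachable either through the KPIMANAGER service environment variables or through an explicit host/port (7071 is the gRPC port registered in Constants.py), with illustrative descriptor values:

# Sketch: exercising KpiManagerClient end to end; values are illustrative.
from common.proto.kpi_manager_pb2 import KpiDescriptor, KpiDescriptorFilter
from common.proto.kpi_sample_types_pb2 import KpiSampleType
from kpi_manager.client.KpiManagerClient import KpiManagerClient

client = KpiManagerClient(host='127.0.0.1', port=7071)

descriptor = KpiDescriptor()
descriptor.kpi_description = 'Rx packets on DEV1/END1'
descriptor.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED
descriptor.device_id.device_uuid.uuid     = 'DEV1'
descriptor.endpoint_id.endpoint_uuid.uuid = 'END1'

kpi_id = client.SetKpiDescriptor(descriptor)                   # returns KpiId
stored = client.GetKpiDescriptor(kpi_id)                       # round-trip check
print('Stored descriptor: {:s}'.format(stored.kpi_description))

all_kpis = client.SelectKpiDescriptor(KpiDescriptorFilter())   # empty filter lists all
print('Known KPI descriptors: {:d}'.format(len(all_kpis.kpi_descriptor_list)))

client.DeleteKpiDescriptor(kpi_id)
client.close()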
+ +from common.Constants import ServiceNameEnum +from common.Settings import get_service_port_grpc +# from common.proto.monitoring_pb2_grpc import add_MonitoringServiceServicer_to_server +from common.proto.kpi_manager_pb2_grpc import add_KpiManagerServiceServicer_to_server +from common.tools.service.GenericGrpcService import GenericGrpcService +from kpi_manager.service.KpiManagerServiceServicerImpl import KpiManagerServiceServicerImpl +# from monitoring.service.MonitoringServiceServicerImpl import MonitoringServiceServicerImpl +from monitoring.service.NameMapping import NameMapping + +class KpiManagerService(GenericGrpcService): + def __init__(self, name_mapping : NameMapping, cls_name: str = __name__) -> None: + port = get_service_port_grpc(ServiceNameEnum.KPIMANAGER) + super().__init__(port, cls_name=cls_name) + self.kpiManagerService_servicer = KpiManagerServiceServicerImpl(name_mapping) + + def install_servicers(self): + add_KpiManagerServiceServicer_to_server(self.kpiManagerService_servicer, self.server) diff --git a/src/kpi_manager/service/KpiManagerServiceServicerImpl.py b/src/kpi_manager/service/KpiManagerServiceServicerImpl.py new file mode 100644 index 0000000000000000000000000000000000000000..f1d370f30b20e871d2dd3e636afa4cad6ba0110c --- /dev/null +++ b/src/kpi_manager/service/KpiManagerServiceServicerImpl.py @@ -0,0 +1,105 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# do tests to verify the "grpc.ServicerContext" is required or not. +import logging, grpc +from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method +from common.proto.context_pb2 import Empty +from common.proto.kpi_manager_pb2_grpc import KpiManagerServiceServicer +from common.proto.kpi_manager_pb2 import KpiId, KpiDescriptor, KpiDescriptorFilter, KpiDescriptorList +from monitoring.service.NameMapping import NameMapping +from monitoring.service import ManagementDBTools + + +LOGGER = logging.getLogger(__name__) + +METRICS_POOL = MetricsPool('Monitoring', 'KpiManager') + +class KpiManagerServiceServicerImpl(KpiManagerServiceServicer): + def __init__(self, name_mapping : NameMapping): + LOGGER.info('Init KpiManagerService') + + # Init sqlite monitoring db + self.management_db = ManagementDBTools.ManagementDB('monitoring.db') # why monitoring.db here??? 
+ LOGGER.info('MetricsDB initialized --- KPI Manager Service') + + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) + def SetKpiDescriptor( + self, request: KpiDescriptor, grpc_context: grpc.ServicerContext # type: ignore + ) -> KpiId: # type: ignore + response = KpiId() + kpi_description = request.kpi_description + kpi_sample_type = request.kpi_sample_type + kpi_device_id = request.device_id.device_uuid.uuid + kpi_endpoint_id = request.endpoint_id.endpoint_uuid.uuid + kpi_service_id = request.service_id.service_uuid.uuid + kpi_slice_id = request.slice_id.slice_uuid.uuid + kpi_connection_id = request.connection_id.connection_uuid.uuid + kpi_link_id = request.link_id.link_uuid.uuid + if request.kpi_id.kpi_id.uuid != "": + response.kpi_id.uuid = request.kpi_id.kpi_id.uuid + # Here the code to modify an existing kpi + else: + data = self.management_db.insert_KPI( + kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, + kpi_service_id, kpi_slice_id, kpi_connection_id, kpi_link_id) + response.kpi_id.uuid = str(data) + return response + + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) + def DeleteKpiDescriptor(self, request: KpiId, grpc_context: grpc.ServicerContext) -> Empty: # type: ignore + kpi_id = int(request.kpi_id.uuid) + kpi = self.management_db.get_KPI(kpi_id) + if kpi: + self.management_db.delete_KPI(kpi_id) + else: + LOGGER.info('DeleteKpi error: KpiID({:s}): not found in database'.format(str(kpi_id))) + return Empty() + + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) + def GetKpiDescriptor(self, request: KpiId, grpc_context: grpc.ServicerContext) -> KpiDescriptor: # type: ignore + kpi_id = request.kpi_id.uuid + kpi_db = self.management_db.get_KPI(int(kpi_id)) + kpiDescriptor = KpiDescriptor() + if kpi_db is None: + LOGGER.info('GetKpiDescriptor error: KpiID({:s}): not found in database'.format(str(kpi_id))) + else: + kpiDescriptor.kpi_description = kpi_db[1] + kpiDescriptor.kpi_sample_type = kpi_db[2] + kpiDescriptor.device_id.device_uuid.uuid = str(kpi_db[3]) + kpiDescriptor.endpoint_id.endpoint_uuid.uuid = str(kpi_db[4]) + kpiDescriptor.service_id.service_uuid.uuid = str(kpi_db[5]) + kpiDescriptor.slice_id.slice_uuid.uuid = str(kpi_db[6]) + kpiDescriptor.connection_id.connection_uuid.uuid = str(kpi_db[7]) + kpiDescriptor.link_id.link_uuid.uuid = str(kpi_db[8]) + return kpiDescriptor + + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) + def SelectKpiDescriptor(self, request: KpiDescriptorFilter, grpc_context: grpc.ServicerContext) -> KpiDescriptorList: # type: ignore + kpi_descriptor_list = KpiDescriptorList() + data = self.management_db.get_KPIS() + LOGGER.debug(f"data: {data}") + for item in data: + kpi_descriptor = KpiDescriptor() + kpi_descriptor.kpi_id.kpi_id.uuid = str(item[0]) + kpi_descriptor.kpi_description = item[1] + kpi_descriptor.kpi_sample_type = item[2] + kpi_descriptor.device_id.device_uuid.uuid = str(item[3]) + kpi_descriptor.endpoint_id.endpoint_uuid.uuid = str(item[4]) + kpi_descriptor.service_id.service_uuid.uuid = str(item[5]) + kpi_descriptor.slice_id.slice_uuid.uuid = str(item[6]) + kpi_descriptor.connection_id.connection_uuid.uuid = str(item[7]) + kpi_descriptor.link_id.link_uuid.uuid = str(item[8]) + kpi_descriptor_list.kpi_descriptor_list.append(kpi_descriptor) + return kpi_descriptor_list \ No newline at end of file diff --git a/src/kpi_manager/service/__init__.py b/src/kpi_manager/service/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..1549d9811aa5d1c193a44ad45d0d7773236c0612 --- /dev/null +++ 
b/src/kpi_manager/service/__init__.py @@ -0,0 +1,14 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + diff --git a/src/kpi_manager/service/__main__.py b/src/kpi_manager/service/__main__.py new file mode 100644 index 0000000000000000000000000000000000000000..9f0e5324644a5a58eb47483688f71306a0808d1a --- /dev/null +++ b/src/kpi_manager/service/__main__.py @@ -0,0 +1,107 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging, signal, sys, threading, time +from prometheus_client import start_http_server +from common.Constants import ServiceNameEnum +from common.Settings import ( + ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name, get_log_level, get_metrics_port, + wait_for_environment_variables) +from common.proto import monitoring_pb2 +from monitoring.service.EventTools import EventsDeviceCollector # import updated +from monitoring.service.NameMapping import NameMapping # import updated +# from .MonitoringService import MonitoringService +from .KpiManagerService import KpiManagerService + +terminate = threading.Event() +LOGGER = None + +def signal_handler(signal, frame): # pylint: disable=redefined-outer-name + LOGGER.warning('Terminate signal received') + terminate.set() + +def start_kpi_manager(name_mapping : NameMapping): + LOGGER.info('Start Monitoring...',) + + events_collector = EventsDeviceCollector(name_mapping) + events_collector.start() + + # TODO: redesign this method to be more clear and clean + + # Iterate while terminate is not set + while not terminate.is_set(): + list_new_kpi_ids = events_collector.listen_events() + + # Monitor Kpis + if bool(list_new_kpi_ids): + for kpi_id in list_new_kpi_ids: + # Create Monitor Kpi Requests + monitor_kpi_request = monitoring_pb2.MonitorKpiRequest() + monitor_kpi_request.kpi_id.CopyFrom(kpi_id) + monitor_kpi_request.monitoring_window_s = 86400 + monitor_kpi_request.sampling_rate_s = 10 + events_collector._monitoring_client.MonitorKpi(monitor_kpi_request) + + time.sleep(0.5) # let other tasks run; do not overload CPU + else: + # Terminate is set, looping terminates + LOGGER.warning("Stopping execution...") + + events_collector.start() + +def main(): + global LOGGER # pylint: disable=global-statement + + log_level = get_log_level() + logging.basicConfig(level=log_level) + LOGGER = logging.getLogger(__name__) + + wait_for_environment_variables([ + 
get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST ), + get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC), + get_env_var_name(ServiceNameEnum.DEVICE, ENVVAR_SUFIX_SERVICE_HOST ), + get_env_var_name(ServiceNameEnum.DEVICE, ENVVAR_SUFIX_SERVICE_PORT_GRPC), + ]) + + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + LOGGER.info('Starting...') + + # Start metrics server + metrics_port = get_metrics_port() + start_http_server(metrics_port) + + name_mapping = NameMapping() + # Starting monitoring service + # grpc_service = MonitoringService(name_mapping) + # grpc_service.start() + # start_monitoring(name_mapping) + + grpc_service = KpiManagerService(name_mapping) + grpc_service.start() + + start_kpi_manager(name_mapping) + + # Wait for Ctrl+C or termination signal + while not terminate.wait(timeout=1.0): pass + + LOGGER.info('Terminating...') + grpc_service.stop() + + LOGGER.info('Bye') + return 0 + +if __name__ == '__main__': + sys.exit(main()) diff --git a/src/kpi_manager/tests/__init__.py b/src/kpi_manager/tests/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..1549d9811aa5d1c193a44ad45d0d7773236c0612 --- /dev/null +++ b/src/kpi_manager/tests/__init__.py @@ -0,0 +1,14 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + diff --git a/src/kpi_manager/tests/test_messages.py b/src/kpi_manager/tests/test_messages.py new file mode 100755 index 0000000000000000000000000000000000000000..72ff74c1686c2d5fa35ee5a118aeca4dca0f4438 --- /dev/null +++ b/src/kpi_manager/tests/test_messages.py @@ -0,0 +1,85 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
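The entry point above blocks in wait_for_environment_variables until the Context and Device service addresses are known, then starts the gRPC server and the event collector. A sketch of the minimum local configuration for running it outside Kubernetes, assuming Context and Device are already running at the illustrative addresses below (1010 and 2020 follow the port scheme in Constants.py):

# Sketch: environment needed to run the KPI Manager entry point locally.
# Addresses are illustrative; Context and Device services must already be reachable.
import os
from common.Constants import ServiceNameEnum
from common.Settings import (
    ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name)

os.environ[get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST     )] = '127.0.0.1'
os.environ[get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = '1010'
os.environ[get_env_var_name(ServiceNameEnum.DEVICE,  ENVVAR_SUFIX_SERVICE_HOST     )] = '127.0.0.1'
os.environ[get_env_var_name(ServiceNameEnum.DEVICE,  ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = '2020'

from kpi_manager.service.__main__ import main   # import after the environment is set
raise SystemExit(main())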
+ +from common.proto import kpi_manager_pb2 +from common.proto.kpi_sample_types_pb2 import KpiSampleType + +def kpi_id(): + _kpi_id = kpi_manager_pb2.KpiId() + _kpi_id.kpi_id.uuid = str(1) # pylint: disable=maybe-no-member + return _kpi_id + +def create_kpi_request(kpi_id_str): + _create_kpi_request = kpi_manager_pb2.KpiDescriptor() + _create_kpi_request.kpi_description = 'KPI Description Test' + _create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED + _create_kpi_request.device_id.device_uuid.uuid = 'DEV' + str(kpi_id_str) + _create_kpi_request.service_id.service_uuid.uuid = 'SERV' + str(kpi_id_str) + _create_kpi_request.slice_id.slice_uuid.uuid = 'SLC' + str(kpi_id_str) + _create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END' + str(kpi_id_str) + _create_kpi_request.connection_id.connection_uuid.uuid = 'CON' + str(kpi_id_str) + return _create_kpi_request + +def create_kpi_request_b(): + _create_kpi_request = kpi_manager_pb2.KpiDescriptor() + _create_kpi_request.kpi_description = 'KPI Description Test' + _create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED + _create_kpi_request.device_id.device_uuid.uuid = 'DEV2' # pylint: disable=maybe-no-member + _create_kpi_request.service_id.service_uuid.uuid = 'SERV2' # pylint: disable=maybe-no-member + _create_kpi_request.slice_id.slice_uuid.uuid = 'SLC2' # pylint: disable=maybe-no-member + _create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END2' # pylint: disable=maybe-no-member + _create_kpi_request.connection_id.connection_uuid.uuid = 'CON2' # pylint: disable=maybe-no-member + return _create_kpi_request + +def create_kpi_request_c(): + _create_kpi_request = kpi_manager_pb2.KpiDescriptor() + _create_kpi_request.kpi_description = 'KPI Description Test' + _create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED + _create_kpi_request.device_id.device_uuid.uuid = 'DEV3' # pylint: disable=maybe-no-member + _create_kpi_request.service_id.service_uuid.uuid = 'SERV3' # pylint: disable=maybe-no-member + _create_kpi_request.slice_id.slice_uuid.uuid = 'SLC3' # pylint: disable=maybe-no-member + _create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END3' # pylint: disable=maybe-no-member + _create_kpi_request.connection_id.connection_uuid.uuid = 'CON3' # pylint: disable=maybe-no-member + return _create_kpi_request + +def create_kpi_request_d(): + _create_kpi_request = kpi_manager_pb2.KpiDescriptor() + _create_kpi_request.kpi_description = 'KPI Description Test' + _create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED + _create_kpi_request.device_id.device_uuid.uuid = 'DEV4' # pylint: disable=maybe-no-member + _create_kpi_request.service_id.service_uuid.uuid = 'SERV4' # pylint: disable=maybe-no-member + _create_kpi_request.slice_id.slice_uuid.uuid = 'SLC4' # pylint: disable=maybe-no-member + _create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END4' # pylint: disable=maybe-no-member + _create_kpi_request.connection_id.connection_uuid.uuid = 'CON4' # pylint: disable=maybe-no-member + return _create_kpi_request + +def kpi_descriptor_list(): + _kpi_descriptor_list = kpi_manager_pb2.KpiDescriptorList() + return _kpi_descriptor_list + +def create_kpi_filter_request(): + _create_kpi_filter_request = kpi_manager_pb2.KpiDescriptorFilter() + _create_kpi_filter_request.kpi_sample_type.append(KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED) + new_device_id = _create_kpi_filter_request.device_id.add() + new_device_id.device_uuid.uuid = 'DEV1' + new_service_id 
= _create_kpi_filter_request.service_id.add() + new_service_id.service_uuid.uuid = 'SERV1' + new_slice_id = _create_kpi_filter_request.slice_id.add() + new_slice_id.slice_uuid.uuid = 'SLC1' + new_endpoint_id = _create_kpi_filter_request.endpoint_id.add() + new_endpoint_id.endpoint_uuid.uuid = 'END1' + new_connection_id = _create_kpi_filter_request.connection_id.add() + new_connection_id.connection_uuid.uuid = 'CON1' + + return _create_kpi_filter_request \ No newline at end of file diff --git a/src/kpi_manager/tests/test_unitary.py b/src/kpi_manager/tests/test_unitary.py new file mode 100755 index 0000000000000000000000000000000000000000..75987a5f4320bc36ef0a298641e807c9c7dfb8fb --- /dev/null +++ b/src/kpi_manager/tests/test_unitary.py @@ -0,0 +1,233 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +# import sys +# sys.path.append('.') +import os, pytest +import logging, json +from typing import Union + +from apscheduler.schedulers.background import BackgroundScheduler + +from common.proto.context_pb2 import ConfigActionEnum, Context, ContextId, DeviceOperationalStatusEnum, EventTypeEnum, DeviceEvent, Device, Empty, Topology, TopologyId +from common.Constants import ServiceNameEnum +from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME, ServiceNameEnum +from common.Settings import ( + ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name, get_service_port_grpc) +from common.tests.MockServicerImpl_Context import MockServicerImpl_Context +from common.proto.context_pb2_grpc import add_ContextServiceServicer_to_server +from common.proto.kpi_sample_types_pb2 import KpiSampleType +from common.tools.object_factory.Context import json_context, json_context_id +from common.tools.object_factory.Topology import json_topology, json_topology_id +# from common.proto.monitoring_pb2 import KpiId, KpiDescriptor, SubsDescriptor, SubsList, AlarmID, \ +# AlarmDescriptor, AlarmList, KpiDescriptorList, SubsResponse, AlarmResponse, RawKpiTable #, Kpi, KpiList +from common.proto.kpi_manager_pb2 import KpiId, KpiDescriptor, KpiDescriptorFilter, KpiDescriptorList +from common.tools.service.GenericGrpcService import GenericGrpcService +from context.client.ContextClient import ContextClient + + +from device.service.driver_api.DriverFactory import DriverFactory +from device.service.driver_api.DriverInstanceCache import DriverInstanceCache +from device.service.DeviceService import DeviceService +from device.client.DeviceClient import DeviceClient + +from kpi_manager.tests.test_messages import create_kpi_request, create_kpi_request_b, create_kpi_request_c, create_kpi_request_d, create_kpi_filter_request +# from monitoring.service.MonitoringService import MonitoringService +from kpi_manager.service.KpiManagerService import KpiManagerService +# from monitoring.client.MonitoringClient import MonitoringClient +from kpi_manager.client.KpiManagerClient import KpiManagerClient + +from 
monitoring.service.ManagementDBTools import ManagementDB +from monitoring.service.MetricsDBTools import MetricsDB +from monitoring.service.NameMapping import NameMapping + +os.environ['DEVICE_EMULATED_ONLY'] = 'TRUE' +from device.service.drivers import DRIVERS + +########################### +# Tests Setup +########################### + +LOCAL_HOST = '127.0.0.1' +MOCKSERVICE_PORT = 10000 + +KPIMANAGER_SERVICE_PORT = MOCKSERVICE_PORT + get_service_port_grpc(ServiceNameEnum.KPIMANAGER) # avoid privileged ports +os.environ[get_env_var_name(ServiceNameEnum.KPIMANAGER, ENVVAR_SUFIX_SERVICE_HOST )] = str(LOCAL_HOST) +os.environ[get_env_var_name(ServiceNameEnum.KPIMANAGER, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(KPIMANAGER_SERVICE_PORT) + +METRICSDB_HOSTNAME = os.environ.get('METRICSDB_HOSTNAME') + +LOGGER = logging.getLogger(__name__) + +class MockContextService(GenericGrpcService): + # Mock Service implementing Context to simplify unitary tests of Monitoring + + def __init__(self, bind_port: Union[str, int]) -> None: + super().__init__(bind_port, LOCAL_HOST, enable_health_servicer=False, cls_name='MockService') + + # pylint: disable=attribute-defined-outside-init + def install_servicers(self): + self.context_servicer = MockServicerImpl_Context() + add_ContextServiceServicer_to_server(self.context_servicer, self.server) + +@pytest.fixture(scope='session') +def context_service(): + LOGGER.info('Initializing MockContextService...') + _service = MockContextService(MOCKSERVICE_PORT) + _service.start() + + LOGGER.info('Yielding MockContextService...') + yield _service + + LOGGER.info('Terminating MockContextService...') + _service.context_servicer.msg_broker.terminate() + _service.stop() + + LOGGER.info('Terminated MockContextService...') + +@pytest.fixture(scope='session') +def context_client(context_service : MockContextService): # pylint: disable=redefined-outer-name,unused-argument + LOGGER.info('Initializing ContextClient...') + _client = ContextClient() + + LOGGER.info('Yielding ContextClient...') + yield _client + + LOGGER.info('Closing ContextClient...') + _client.close() + + LOGGER.info('Closed ContextClient...') + +@pytest.fixture(scope='session') +def device_service(context_service : MockContextService): # pylint: disable=redefined-outer-name,unused-argument + LOGGER.info('Initializing DeviceService...') + driver_factory = DriverFactory(DRIVERS) + driver_instance_cache = DriverInstanceCache(driver_factory) + _service = DeviceService(driver_instance_cache) + _service.start() + + # yield the server, when test finishes, execution will resume to stop it + LOGGER.info('Yielding DeviceService...') + yield _service + + LOGGER.info('Terminating DeviceService...') + _service.stop() + + LOGGER.info('Terminated DeviceService...') + +@pytest.fixture(scope='session') +def device_client(device_service : DeviceService): # pylint: disable=redefined-outer-name,unused-argument + LOGGER.info('Initializing DeviceClient...') + _client = DeviceClient() + + LOGGER.info('Yielding DeviceClient...') + yield _client + + LOGGER.info('Closing DeviceClient...') + _client.close() + + LOGGER.info('Closed DeviceClient...') + +@pytest.fixture(scope='session') +def device_client(device_service : DeviceService): # pylint: disable=redefined-outer-name,unused-argument + LOGGER.info('Initializing DeviceClient...') + _client = DeviceClient() + + LOGGER.info('Yielding DeviceClient...') + yield _client + + LOGGER.info('Closing DeviceClient...') + _client.close() + + LOGGER.info('Closed DeviceClient...') + +# This fixture will be 
requested by test cases and last during testing session +@pytest.fixture(scope='session') +def kpi_manager_service( + context_service : MockContextService, # pylint: disable=redefined-outer-name,unused-argument + device_service : DeviceService # pylint: disable=redefined-outer-name,unused-argument + ): + LOGGER.info('Initializing KpiManagerService...') + name_mapping = NameMapping() + # _service = MonitoringService(name_mapping) + _service = KpiManagerService(name_mapping) + _service.start() + + # yield the server, when test finishes, execution will resume to stop it + LOGGER.info('Yielding KpiManagerService...') + yield _service + + LOGGER.info('Terminating KpiManagerService...') + _service.stop() + + LOGGER.info('Terminated KpiManagerService...') + +# This fixture will be requested by test cases and last during testing session. +# The client requires the server, so client fixture has the server as dependency. +# def monitoring_client(monitoring_service : MonitoringService): (Add for better understanding) +@pytest.fixture(scope='session') +def kpi_manager_client(kpi_manager_service : KpiManagerService): # pylint: disable=redefined-outer-name,unused-argument + LOGGER.info('Initializing KpiManagerClient...') + _client = KpiManagerClient() + + # yield the server, when test finishes, execution will resume to stop it + LOGGER.info('Yielding KpiManagerClient...') + yield _client + + LOGGER.info('Closing KpiManagerClient...') + _client.close() + + LOGGER.info('Closed KpiManagerClient...') + +################################################## +# Prepare Environment, should be the first test +################################################## + +# ERROR on this test --- +def test_prepare_environment( + context_client : ContextClient, # pylint: disable=redefined-outer-name,unused-argument +): + context_id = json_context_id(DEFAULT_CONTEXT_NAME) + context_client.SetContext(Context(**json_context(DEFAULT_CONTEXT_NAME))) + context_client.SetTopology(Topology(**json_topology(DEFAULT_TOPOLOGY_NAME, context_id=context_id))) + +########################### +# Tests Implementation of Kpi Manager +########################### + +# Test case that makes use of client fixture to test server's CreateKpi method +def test_set_kpi(kpi_manager_client): # pylint: disable=redefined-outer-name + # make call to server + LOGGER.warning('test_create_kpi requesting') + for i in range(3): + response = kpi_manager_client.SetKpiDescriptor(create_kpi_request(str(i+1))) + LOGGER.debug(str(response)) + assert isinstance(response, KpiId) + +# Test case that makes use of client fixture to test server's DeleteKpi method +def test_delete_kpi(kpi_manager_client): # pylint: disable=redefined-outer-name + # make call to server + LOGGER.warning('delete_kpi requesting') + response = kpi_manager_client.SetKpiDescriptor(create_kpi_request('4')) + response = kpi_manager_client.DeleteKpiDescriptor(response) + LOGGER.debug(str(response)) + assert isinstance(response, Empty) + +# Test case that makes use of client fixture to test server's GetKpiDescriptor method +def test_select_kpi_descriptor(kpi_manager_client): # pylint: disable=redefined-outer-name + LOGGER.warning('test_selectkpidescritor begin') + response = kpi_manager_client.SelectKpiDescriptor(create_kpi_filter_request()) + LOGGER.debug(str(response)) + assert isinstance(response, KpiDescriptorList) diff --git a/src/monitoring/client/MonitoringClient.py b/src/monitoring/client/MonitoringClient.py index 751ff6e38e52a9d09d170c440ced0a04a44e10a3..493e96ca8e866ec1cf31d7d91e8cdef4afb99fbe 
100644 --- a/src/monitoring/client/MonitoringClient.py +++ b/src/monitoring/client/MonitoringClient.py @@ -20,8 +20,9 @@ from common.Settings import get_service_host, get_service_port_grpc from common.tools.client.RetryDecorator import retry, delay_exponential from common.tools.grpc.Tools import grpc_message_to_json_string from common.proto.context_pb2 import Empty -from common.proto.monitoring_pb2 import Kpi, KpiDescriptor, KpiId, MonitorKpiRequest, \ - KpiDescriptorList, KpiQuery, KpiList, SubsDescriptor, SubscriptionID, SubsList, \ +from common.proto.kpi_manager_pb2 import KpiId, KpiDescriptor, KpiDescriptorList +from common.proto.monitoring_pb2 import Kpi, MonitorKpiRequest, \ + KpiQuery, KpiList, SubsDescriptor, SubscriptionID, SubsList, \ SubsResponse, AlarmDescriptor, AlarmID, AlarmList, AlarmResponse, AlarmSubscription, RawKpiTable from common.proto.monitoring_pb2_grpc import MonitoringServiceStub diff --git a/src/monitoring/requirements.in b/src/monitoring/requirements.in index 4e57dd0193485b3f7ed3ea346534fb1cb43c5538..bea1bc165b505e9824ac90aea49208a2dc8a3b93 100644 --- a/src/monitoring/requirements.in +++ b/src/monitoring/requirements.in @@ -32,3 +32,15 @@ requests==2.27.1 xmltodict==0.12.0 questdb==1.0.1 psycopg2-binary==2.9.3 +coverage==6.3 +grpcio==1.47.* +grpcio-health-checking==1.47.* +grpcio-tools==1.47.* +grpclib==0.4.4 +prettytable==3.5.0 +prometheus-client==0.13.0 +protobuf==3.20.* +pytest==6.2.5 +pytest-benchmark==3.4.1 +python-dateutil==2.8.2 +pytest-depends==1.0.1 diff --git a/src/monitoring/service/MonitoringServiceServicerImpl.py b/src/monitoring/service/MonitoringServiceServicerImpl.py index 608b0bad9d5869cde35be60157fec9e0a6d34c90..e98cfa23670c66ba9de3bbbe4918345685de97fe 100644 --- a/src/monitoring/service/MonitoringServiceServicerImpl.py +++ b/src/monitoring/service/MonitoringServiceServicerImpl.py @@ -20,8 +20,8 @@ from common.proto.context_pb2 import Empty from common.proto.device_pb2 import MonitoringSettings from common.proto.kpi_sample_types_pb2 import KpiSampleType from common.proto.monitoring_pb2_grpc import MonitoringServiceServicer -from common.proto.monitoring_pb2 import AlarmResponse, AlarmDescriptor, AlarmList, SubsList, KpiId, \ - KpiDescriptor, KpiList, KpiQuery, SubsDescriptor, SubscriptionID, AlarmID, KpiDescriptorList, \ +from common.proto.monitoring_pb2 import AlarmResponse, AlarmDescriptor, AlarmList, SubsList, \ + KpiQuery, SubsDescriptor, SubscriptionID, AlarmID, KpiList,\ MonitorKpiRequest, Kpi, AlarmSubscription, SubsResponse, RawKpiTable, RawKpi, RawKpiList from common.tools.timestamp.Converters import timestamp_string_to_float, timestamp_utcnow_to_float from device.client.DeviceClient import DeviceClient @@ -30,6 +30,8 @@ from monitoring.service.AlarmManager import AlarmManager from monitoring.service.NameMapping import NameMapping from monitoring.service.SubscriptionManager import SubscriptionManager +from common.proto.kpi_manager_pb2 import KpiId, KpiDescriptor, KpiDescriptorList + LOGGER = logging.getLogger(__name__) METRICSDB_HOSTNAME = os.environ.get("METRICSDB_HOSTNAME") diff --git a/src/monitoring/tests/Messages.py b/src/monitoring/tests/Messages.py index a56207d9a74869ef625631f6a21762608ad59c14..23f4db017c6d5249651f7e4d3c65653a7f6fa614 100644 --- a/src/monitoring/tests/Messages.py +++ b/src/monitoring/tests/Messages.py @@ -17,54 +17,54 @@ from common.proto import monitoring_pb2 from common.proto.kpi_sample_types_pb2 import KpiSampleType from common.tools.timestamp.Converters import timestamp_utcnow_to_float -def kpi_id(): - 
_kpi_id = monitoring_pb2.KpiId() - _kpi_id.kpi_id.uuid = str(1) # pylint: disable=maybe-no-member - return _kpi_id - -def create_kpi_request(kpi_id_str): - _create_kpi_request = monitoring_pb2.KpiDescriptor() - _create_kpi_request.kpi_description = 'KPI Description Test' - _create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED - _create_kpi_request.device_id.device_uuid.uuid = 'DEV' + str(kpi_id_str) - _create_kpi_request.service_id.service_uuid.uuid = 'SERV' + str(kpi_id_str) - _create_kpi_request.slice_id.slice_uuid.uuid = 'SLC' + str(kpi_id_str) - _create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END' + str(kpi_id_str) - _create_kpi_request.connection_id.connection_uuid.uuid = 'CON' + str(kpi_id_str) - return _create_kpi_request - -def create_kpi_request_b(): - _create_kpi_request = monitoring_pb2.KpiDescriptor() - _create_kpi_request.kpi_description = 'KPI Description Test' - _create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED - _create_kpi_request.device_id.device_uuid.uuid = 'DEV2' # pylint: disable=maybe-no-member - _create_kpi_request.service_id.service_uuid.uuid = 'SERV2' # pylint: disable=maybe-no-member - _create_kpi_request.slice_id.slice_uuid.uuid = 'SLC2' # pylint: disable=maybe-no-member - _create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END2' # pylint: disable=maybe-no-member - _create_kpi_request.connection_id.connection_uuid.uuid = 'CON2' # pylint: disable=maybe-no-member - return _create_kpi_request - -def create_kpi_request_c(): - _create_kpi_request = monitoring_pb2.KpiDescriptor() - _create_kpi_request.kpi_description = 'KPI Description Test' - _create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED - _create_kpi_request.device_id.device_uuid.uuid = 'DEV3' # pylint: disable=maybe-no-member - _create_kpi_request.service_id.service_uuid.uuid = 'SERV3' # pylint: disable=maybe-no-member - _create_kpi_request.slice_id.slice_uuid.uuid = 'SLC3' # pylint: disable=maybe-no-member - _create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END3' # pylint: disable=maybe-no-member - _create_kpi_request.connection_id.connection_uuid.uuid = 'CON3' # pylint: disable=maybe-no-member - return _create_kpi_request - -def create_kpi_request_d(): - _create_kpi_request = monitoring_pb2.KpiDescriptor() - _create_kpi_request.kpi_description = 'KPI Description Test' - _create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED - _create_kpi_request.device_id.device_uuid.uuid = 'DEV4' # pylint: disable=maybe-no-member - _create_kpi_request.service_id.service_uuid.uuid = 'SERV4' # pylint: disable=maybe-no-member - _create_kpi_request.slice_id.slice_uuid.uuid = 'SLC4' # pylint: disable=maybe-no-member - _create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END4' # pylint: disable=maybe-no-member - _create_kpi_request.connection_id.connection_uuid.uuid = 'CON4' # pylint: disable=maybe-no-member - return _create_kpi_request +# def kpi_id(): +# _kpi_id = monitoring_pb2.KpiId() +# _kpi_id.kpi_id.uuid = str(1) # pylint: disable=maybe-no-member +# return _kpi_id + +# def create_kpi_request(kpi_id_str): +# _create_kpi_request = monitoring_pb2.KpiDescriptor() +# _create_kpi_request.kpi_description = 'KPI Description Test' +# _create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED +# _create_kpi_request.device_id.device_uuid.uuid = 'DEV' + str(kpi_id_str) +# _create_kpi_request.service_id.service_uuid.uuid = 'SERV' + str(kpi_id_str) +# 
_create_kpi_request.slice_id.slice_uuid.uuid = 'SLC' + str(kpi_id_str) +# _create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END' + str(kpi_id_str) +# _create_kpi_request.connection_id.connection_uuid.uuid = 'CON' + str(kpi_id_str) +# return _create_kpi_request + +# def create_kpi_request_b(): +# _create_kpi_request = monitoring_pb2.KpiDescriptor() +# _create_kpi_request.kpi_description = 'KPI Description Test' +# _create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED +# _create_kpi_request.device_id.device_uuid.uuid = 'DEV2' # pylint: disable=maybe-no-member +# _create_kpi_request.service_id.service_uuid.uuid = 'SERV2' # pylint: disable=maybe-no-member +# _create_kpi_request.slice_id.slice_uuid.uuid = 'SLC2' # pylint: disable=maybe-no-member +# _create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END2' # pylint: disable=maybe-no-member +# _create_kpi_request.connection_id.connection_uuid.uuid = 'CON2' # pylint: disable=maybe-no-member +# return _create_kpi_request + +# def create_kpi_request_c(): +# _create_kpi_request = monitoring_pb2.KpiDescriptor() +# _create_kpi_request.kpi_description = 'KPI Description Test' +# _create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED +# _create_kpi_request.device_id.device_uuid.uuid = 'DEV3' # pylint: disable=maybe-no-member +# _create_kpi_request.service_id.service_uuid.uuid = 'SERV3' # pylint: disable=maybe-no-member +# _create_kpi_request.slice_id.slice_uuid.uuid = 'SLC3' # pylint: disable=maybe-no-member +# _create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END3' # pylint: disable=maybe-no-member +# _create_kpi_request.connection_id.connection_uuid.uuid = 'CON3' # pylint: disable=maybe-no-member +# return _create_kpi_request + +# def create_kpi_request_d(): +# _create_kpi_request = monitoring_pb2.KpiDescriptor() +# _create_kpi_request.kpi_description = 'KPI Description Test' +# _create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED +# _create_kpi_request.device_id.device_uuid.uuid = 'DEV4' # pylint: disable=maybe-no-member +# _create_kpi_request.service_id.service_uuid.uuid = 'SERV4' # pylint: disable=maybe-no-member +# _create_kpi_request.slice_id.slice_uuid.uuid = 'SLC4' # pylint: disable=maybe-no-member +# _create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END4' # pylint: disable=maybe-no-member +# _create_kpi_request.connection_id.connection_uuid.uuid = 'CON4' # pylint: disable=maybe-no-member +# return _create_kpi_request def monitor_kpi_request(kpi_uuid, monitoring_window_s, sampling_rate_s): _monitor_kpi_request = monitoring_pb2.MonitorKpiRequest() @@ -80,10 +80,10 @@ def include_kpi_request(kpi_id): _include_kpi_request.kpi_value.floatVal = 500*random() # pylint: disable=maybe-no-member return _include_kpi_request -def kpi_descriptor_list(): - _kpi_descriptor_list = monitoring_pb2.KpiDescriptorList() +# def kpi_descriptor_list(): +# _kpi_descriptor_list = monitoring_pb2.KpiDescriptorList() - return _kpi_descriptor_list +# return _kpi_descriptor_list def kpi_query(kpi_id_list): _kpi_query = monitoring_pb2.KpiQuery() diff --git a/src/start.sh b/src/start.sh index 32a016cc07d2602d5e00b7540b03355f539ed61d..8c3fafe6ebfbff288af1b6183c67da3e356d1555 100755 --- a/src/start.sh +++ b/src/start.sh @@ -15,4 +15,5 @@ docker network create -d bridge teraflowbridge -docker run -d -p 7070:7070 --name monitoring --network=teraflowbridge monitoring:dockerfile +# docker run -d -p 7070:7070 --name monitoring --network=teraflowbridge monitoring:dockerfile +docker run 
-d -p 7071:7071 --name kpi_manager --network=teraflowbridge kpi_manager:dockerfile \ No newline at end of file diff --git a/src/telemetry/__init__.py b/src/telemetry/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..6a8f397461ef6ef2fbcf09b6078482c2df954a1e --- /dev/null +++ b/src/telemetry/__init__.py @@ -0,0 +1,15 @@ + +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + diff --git a/src/telemetry/backend/__init__.py b/src/telemetry/backend/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..38d04994fb0fa1951fb465bc127eb72659dc2eaf --- /dev/null +++ b/src/telemetry/backend/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/src/telemetry/backend/service/TelemetryBackendService.py b/src/telemetry/backend/service/TelemetryBackendService.py new file mode 100755 index 0000000000000000000000000000000000000000..8e6fb243ea324b9eb572c165a43a8bbaf22466f3 --- /dev/null +++ b/src/telemetry/backend/service/TelemetryBackendService.py @@ -0,0 +1,37 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +class TelemetryBackendService: + """ + Class to control Kafka producer functionality. 
+ """ + def __init__(self): + pass + + + def generate_kafka_configs(self): + """ + Method to generate Kafka configurations + """ + create_kafka_configs = { + 'bootstrap_servers' : "test_server", # Kafka broker address - Replace with your Kafka broker address + 'exporter_endpoint' : "test_exporter", # Node Exporter metrics endpoint - Replace with your Node Exporter endpoint + 'kafka_topic' : "test_kafka_topic", # Kafka topic to produce to + 'run_duration' : 10, # Total duration to execute the producer + 'fetch_interval' : 2 # Time between two fetch requests + } + return create_kafka_configs + + + diff --git a/src/telemetry/backend/service/TelemetryBackendServiceImpl.py b/src/telemetry/backend/service/TelemetryBackendServiceImpl.py new file mode 100755 index 0000000000000000000000000000000000000000..abcc30baf82daa7013965a7888370f05129746bc --- /dev/null +++ b/src/telemetry/backend/service/TelemetryBackendServiceImpl.py @@ -0,0 +1,188 @@ + +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import time +import logging +import requests +from typing import Tuple +from common.proto.context_pb2 import Empty +from confluent_kafka import Producer as KafkaProducer +from confluent_kafka import KafkaException +from confluent_kafka.admin import AdminClient, NewTopic +from common.proto.telemetry_frontend_pb2 import Collector, CollectorId +from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method + +LOGGER = logging.getLogger(__name__) +METRICS_POOL = MetricsPool('Telemetry', 'TelemetryBackend') +ACTIVE_KAFKA_PRODUCERS = [] # list of active kafka producers + +class TelemetryBackendServiceImpl: + """ + Class to fetch metrics from Exporter and produce them to Kafka. + """ + + def __init__(self, bootstrap_servers=None, exporter_endpoint=None, + kafka_topic=None, run_duration=None, fetch_interval=None): + """ + Constructor to initialize Kafka producer parameters. + Args: + bootstrap_servers (str): Kafka broker address. + exporter_endpoint (str): Node Exporter metrics endpoint. + kafka_topic (str): Kafka topic to produce metrics to. + run_interval (int): Time interval in seconds to run the producer. 
+ """ + LOGGER.info('Init TelemetryBackendService') + + self.bootstrap_servers = bootstrap_servers + self.exporter_endpoint = exporter_endpoint + self.kafka_topic = kafka_topic + self.run_duration = run_duration + self.fetch_interval = fetch_interval + + # @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) + def export_collector_value(self, request : Collector) -> Tuple[str, str]: # type: ignore + response = Tuple[str, str] + collector_id = str('test collector Id') + collected_Value = str('test collected value') # Metric to be fetched from endpoint based on Collector message + response = (collector_id, collected_Value) + return response + + # @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) + def write_to_kafka(self, request: Tuple[str, str]) -> KafkaProducer: + (collector_id, collector_value) = request + response = KafkaProducer({'bootstrap.servers': self.bootstrap_servers}) + # _collector_id, _collector_id_value = request + # write collector_id and collector_id value on the Kafka topic + + # get kafka bootstrap server and topic name + # write to kafka topic + + return response + + def stop_producer(self, request: KafkaProducer) -> Empty: # type: ignore + # flush and close kafka producer object + return Empty() + +# ----------- BELOW: Actual Implementation of Kafka Producer with Node Exporter ----------- + + def fetch_node_exporter_metrics(self): + """ + Method to fetch metrics from Node Exporter. + Returns: + str: Metrics fetched from Node Exporter. + """ + KPI = "node_network_receive_packets_total" + try: + response = requests.get(self.exporter_endpoint) # type: ignore + if response.status_code == 200: + # print(f"Metrics fetched sucessfully...") + metrics = response.text + # Check if the desired metric is available in the response + if KPI in metrics: + KPI_VALUE = self.extract_metric_value(metrics, KPI) + # Extract the metric value + if KPI_VALUE is not None: + print(f"KPI value: {KPI_VALUE}") + return KPI_VALUE + else: + print(f"Failed to fetch metrics. Status code: {response.status_code}") + return None + except Exception as e: + print(f"Failed to fetch metrics: {str(e)}") + return None + + def extract_metric_value(self, metrics, metric_name): + """ + Method to extract the value of a metric from the metrics string. + Args: + metrics (str): Metrics string fetched from Node Exporter. + metric_name (str): Name of the metric to extract. + Returns: + float: Value of the extracted metric, or None if not found. + """ + try: + # Find the metric line containing the desired metric name + metric_line = next(line for line in metrics.split('\n') if line.startswith(metric_name)) + # Split the line to extract the metric value + metric_value = float(metric_line.split()[1]) + return metric_value + except StopIteration: + print(f"Metric '{metric_name}' not found in the metrics.") + return None + + def delivery_callback(self, err, msg): + """ + Callback function to handle message delivery status. + Args: + err (KafkaError): Kafka error object. + msg (Message): Kafka message object. + """ + if err: + print(f'Message delivery failed: {err}') + else: + print(f'Message delivered to topic {msg.topic()}') + + def create_topic_if_not_exists(self, admin_client): + """ + Method to create Kafka topic if it does not exist. + Args: + admin_client (AdminClient): Kafka admin client. + """ + try: + topic_metadata = admin_client.list_topics(timeout=5) + if self.kafka_topic not in topic_metadata.topics: + # If the topic does not exist, create a new topic + print(f"Topic '{self.kafka_topic}' does not exist. 
Creating...") + new_topic = NewTopic(self.kafka_topic, num_partitions=1, replication_factor=1) + admin_client.create_topics([new_topic]) + except KafkaException as e: + print(f"Failed to create topic: {e}") + + def produce_metrics(self): + """ + Method to produce metrics to Kafka topic as per Kafka configs. + """ + conf = { + 'bootstrap.servers': self.bootstrap_servers, + } + + admin_client = AdminClient(conf) + self.create_topic_if_not_exists(admin_client) + + kafka_producer = KafkaProducer(conf) + + try: + start_time = time.time() + while True: + metrics = self.fetch_node_exporter_metrics() # select the function name based on the provided requirements + + if metrics: + kafka_producer.produce(self.kafka_topic, str(metrics), callback=self.delivery_callback) + kafka_producer.flush() + # print("Metrics produced to Kafka topic") + + # Check if the specified run duration has elapsed + if time.time() - start_time >= self.run_duration: # type: ignore + break + + # waiting time until next fetch + time.sleep(self.fetch_interval) # type: ignore + except KeyboardInterrupt: + print("Keyboard interrupt detected. Exiting...") + finally: + kafka_producer.flush() + # kafka_producer.close() # this command generates ERROR + +# ----------- ABOVE: Actual Implementation of Kafka Producer with Node Exporter ----------- \ No newline at end of file diff --git a/src/telemetry/backend/service/__init__.py b/src/telemetry/backend/service/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..38d04994fb0fa1951fb465bc127eb72659dc2eaf --- /dev/null +++ b/src/telemetry/backend/service/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/src/telemetry/backend/tests/__init__.py b/src/telemetry/backend/tests/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..38d04994fb0fa1951fb465bc127eb72659dc2eaf --- /dev/null +++ b/src/telemetry/backend/tests/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
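
Note on the backend classes above: TelemetryBackendServiceImpl builds its confluent_kafka Producer from the plain-dict settings returned by TelemetryBackendService.generate_kafka_configs(). The following is a minimal sketch, not part of the patch, of wiring the two classes together outside the unit tests and checking the topic with a throw-away consumer; the broker address, exporter URL and consumer group id are assumptions for a local setup (e.g. the Kafka and node-exporter manifests deployed earlier), not values the service itself uses.

# Hedged sketch: exercise the Kafka producer above and verify the topic manually.
from confluent_kafka import Consumer

from telemetry.backend.service.TelemetryBackendService import TelemetryBackendService
from telemetry.backend.service.TelemetryBackendServiceImpl import TelemetryBackendServiceImpl

configs = TelemetryBackendService().generate_kafka_configs()
configs['bootstrap_servers'] = '127.0.0.1:9092'                  # assumed local broker
configs['exporter_endpoint'] = 'http://127.0.0.1:9100/metrics'   # assumed Node Exporter endpoint

producer = TelemetryBackendServiceImpl(
    bootstrap_servers = configs['bootstrap_servers'],
    exporter_endpoint = configs['exporter_endpoint'],
    kafka_topic       = configs['kafka_topic'],
    run_duration      = configs['run_duration'],
    fetch_interval    = configs['fetch_interval'])
producer.produce_metrics()   # fetches the KPI from Node Exporter and produces it for run_duration seconds

# Throw-away consumer to confirm the samples reached the topic.
consumer = Consumer({
    'bootstrap.servers' : configs['bootstrap_servers'],
    'group.id'          : 'telemetry-backend-check',             # assumed group id
    'auto.offset.reset' : 'earliest'})
consumer.subscribe([configs['kafka_topic']])
msg = consumer.poll(timeout=10.0)
if msg is not None and msg.error() is None:
    print('received KPI sample:', msg.value().decode('utf-8'))
consumer.close()
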
diff --git a/src/telemetry/backend/tests/messagesBackend.py b/src/telemetry/backend/tests/messagesBackend.py new file mode 100644 index 0000000000000000000000000000000000000000..ef12353837ece7093ef44e153c5e47b806223002 --- /dev/null +++ b/src/telemetry/backend/tests/messagesBackend.py @@ -0,0 +1,32 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +def create_kafka_config_a(bootstrap_server: str, exporter_endpoint: str, kafka_topic: str, + run_duration: int, fetch_interval: int): + """ + Provide ... + Bootstrap_server IP address as String. + Exporter endpoint with port address as String. + Kafka topic name as String. + Total duration of the test as Int. + Fetch_interval as Int. + """ + _bootstrap_servers = bootstrap_server + _exporter_endpoint = exporter_endpoint + _kafka_topic = kafka_topic + _run_duration = run_duration + _fetch_interval = fetch_interval + + return _bootstrap_servers, _exporter_endpoint, _kafka_topic, _run_duration, _fetch_interval diff --git a/src/telemetry/backend/tests/testTelemetryBackend.py b/src/telemetry/backend/tests/testTelemetryBackend.py new file mode 100644 index 0000000000000000000000000000000000000000..8c3fbd2478cccbd94461451c20df4ff6b36509c7 --- /dev/null +++ b/src/telemetry/backend/tests/testTelemetryBackend.py @@ -0,0 +1,59 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +# import sys +# print (sys.path) +import logging +from typing import Tuple +from confluent_kafka import Producer as KafkaProducer +from common.proto.context_pb2 import Empty +from src.telemetry.frontend.tests.Messages import create_collector_request, create_collector_id +from src.telemetry.backend.service.TelemetryBackendService import TelemetryBackendService +from src.telemetry.backend.service.TelemetryBackendServiceImpl import TelemetryBackendServiceImpl + +LOGGER = logging.getLogger(__name__) + + +########################### +# Tests Implementation of Telemetry Backend +########################### +def test_get_kafka_configs(): + LOGGER.warning('test_get_kafka_configs requesting') + TelemetryBackendServiceObj = TelemetryBackendService() + response = TelemetryBackendServiceObj.generate_kafka_configs() + LOGGER.debug(str(response)) + assert isinstance(response, dict) + +def test_export_collector_value(): + LOGGER.warning('test_export_collector_value requesting') + TelemetryBackendServiceObj = TelemetryBackendServiceImpl() + response = TelemetryBackendServiceObj.export_collector_value(create_collector_request('1')) + LOGGER.debug(str(response)) + assert isinstance(response, Tuple) + +def test_write_to_kafka(): + LOGGER.warning('test_write_to_kafka requesting') + TelemetryBackendServiceObj = TelemetryBackendServiceImpl() + _collector_value = TelemetryBackendServiceObj.export_collector_value(create_collector_request('1')) + response = TelemetryBackendServiceObj.write_to_kafka(_collector_value) + LOGGER.debug(str(response)) + assert isinstance(response, KafkaProducer) + +def test_stop_producer(): + LOGGER.warning('test_write_to_kafka requesting') + _kafka_configs = {'bootstrap.servers': '127.0.0.1:9092'} + TelemetryBackendServiceObj = TelemetryBackendServiceImpl() + response = TelemetryBackendServiceObj.stop_producer(KafkaProducer(_kafka_configs)) + LOGGER.debug(str(response)) + assert isinstance(response, Empty) \ No newline at end of file diff --git a/src/telemetry/frontend/__init__.py b/src/telemetry/frontend/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..6a8f397461ef6ef2fbcf09b6078482c2df954a1e --- /dev/null +++ b/src/telemetry/frontend/__init__.py @@ -0,0 +1,15 @@ + +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + diff --git a/src/telemetry/frontend/client/TelemetryFrontendClient.py b/src/telemetry/frontend/client/TelemetryFrontendClient.py new file mode 100644 index 0000000000000000000000000000000000000000..9b4e27b367579240e3d77056b4a862f17590cada --- /dev/null +++ b/src/telemetry/frontend/client/TelemetryFrontendClient.py @@ -0,0 +1,70 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import grpc, logging +from common.Constants import ServiceNameEnum +from common.Settings import get_service_host, get_service_port_grpc + +from common.proto.context_pb2 import Empty +from common.tools.grpc.Tools import grpc_message_to_json_string +from common.tools.client.RetryDecorator import retry, delay_exponential +from common.proto.telemetry_frontend_pb2_grpc import TelemetryFrontendServiceStub +from common.proto.telemetry_frontend_pb2 import Collector, CollectorId, CollectorFilter, CollectorList + +LOGGER = logging.getLogger(__name__) +MAX_RETRIES = 10 +DELAY_FUNCTION = delay_exponential(initial=0.01, increment=2.0, maximum=5.0) +RETRY_DECORATOR = retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, prepare_method_name='connect') + +class TelemetryFrontendClient: + def __init__(self, host=None, port=None): + if not host: host = get_service_host(ServiceNameEnum.TELEMETRYFRONTEND) + if not port: port = get_service_port_grpc(ServiceNameEnum.TELEMETRYFRONTEND) + self.endpoint = '{:s}:{:s}'.format(str(host), str(port)) + LOGGER.debug('Creating channel to {:s}...'.format(str(self.endpoint))) + self.channel = None + self.stub = None + self.connect() + LOGGER.debug('Channel created') + + def connect(self): + self.channel = grpc.insecure_channel(self.endpoint) + self.stub = TelemetryFrontendServiceStub(self.channel) + + def close(self): + if self.channel is not None: self.channel.close() + self.channel = None + self.stub = None + + @RETRY_DECORATOR + def StartCollector(self, request : Collector) -> CollectorId: # type: ignore + LOGGER.debug('StartCollector: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.StartCollector(request) + LOGGER.debug('StartCollector result: {:s}'.format(grpc_message_to_json_string(response))) + return response + + @RETRY_DECORATOR + def StopCollector(self, request : CollectorId) -> Empty: # type: ignore + LOGGER.debug('StopCollector: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.StopCollector(request) + LOGGER.debug('StopCollector result: {:s}'.format(grpc_message_to_json_string(response))) + return response + + @RETRY_DECORATOR + def SelectCollectors(self, request : CollectorFilter) -> CollectorList: # type: ignore + LOGGER.debug('SelectCollectors: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.SelectCollectors(request) + LOGGER.debug('SelectCollectors result: {:s}'.format(grpc_message_to_json_string(response))) + return response + diff --git a/src/telemetry/frontend/client/__init__.py b/src/telemetry/frontend/client/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..1549d9811aa5d1c193a44ad45d0d7773236c0612 --- /dev/null +++ b/src/telemetry/frontend/client/__init__.py @@ -0,0 +1,14 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + diff --git a/src/telemetry/frontend/service/TelemetryFrontendService.py b/src/telemetry/frontend/service/TelemetryFrontendService.py new file mode 100644 index 0000000000000000000000000000000000000000..522d125e6f7d01ecd603c761db1461b35c6d614a --- /dev/null +++ b/src/telemetry/frontend/service/TelemetryFrontendService.py @@ -0,0 +1,30 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from common.Constants import ServiceNameEnum +from common.Settings import get_service_port_grpc +from monitoring.service.NameMapping import NameMapping +from common.tools.service.GenericGrpcService import GenericGrpcService +from common.proto.telemetry_frontend_pb2_grpc import add_TelemetryFrontendServiceServicer_to_server +from telemetry.frontend.service.TelemetryFrontendServiceServicerImpl import TelemetryFrontendServiceServicerImpl + + +class TelemetryFrontendService(GenericGrpcService): + def __init__(self, name_mapping : NameMapping, cls_name: str = __name__) -> None: + port = get_service_port_grpc(ServiceNameEnum.TELEMETRYFRONTEND) + super().__init__(port, cls_name=cls_name) + self.telemetry_frontend_servicer = TelemetryFrontendServiceServicerImpl(name_mapping) + + def install_servicers(self): + add_TelemetryFrontendServiceServicer_to_server(self.telemetry_frontend_servicer, self.server) diff --git a/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py b/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py new file mode 100644 index 0000000000000000000000000000000000000000..498d07a91d43d28ce65bc5eaa980d44541348013 --- /dev/null +++ b/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py @@ -0,0 +1,53 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import grpc +import logging +from common.proto.context_pb2 import Empty +from monitoring.service.NameMapping import NameMapping +from common.proto.telemetry_frontend_pb2 import CollectorId, Collector, CollectorFilter, CollectorList +from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method +from common.proto.telemetry_frontend_pb2_grpc import TelemetryFrontendServiceServicer + + +LOGGER = logging.getLogger(__name__) +METRICS_POOL = MetricsPool('Monitoring', 'TelemetryFrontend') + +class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer): + def __init__(self, name_mapping : NameMapping): + LOGGER.info('Init TelemetryFrontendService') + + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) + def StartCollector(self, request : Collector, grpc_context: grpc.ServicerContext # type: ignore + ) -> CollectorId: # type: ignore + response = CollectorId() + _collector_id = request.collector_id + # collector_kpi_id = request.kpi_id + # collector_duration = request.duration_s + # collector_interval = request.interval_s + + response.collector_id.uuid = _collector_id.collector_id.uuid + return response + + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) + def StopCollector(self, request : CollectorId, grpc_context: grpc.ServicerContext # type: ignore + ) -> Empty: # type: ignore + request.collector_id.uuid = "" + return Empty() + + def SelectCollectors(self, request : CollectorFilter, contextgrpc_context: grpc.ServicerContext # type: ignore + ) -> CollectorList: # type: ignore + response = CollectorList() + + return response \ No newline at end of file diff --git a/src/telemetry/frontend/service/__init__.py b/src/telemetry/frontend/service/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..1549d9811aa5d1c193a44ad45d0d7773236c0612 --- /dev/null +++ b/src/telemetry/frontend/service/__init__.py @@ -0,0 +1,14 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + diff --git a/src/telemetry/frontend/service/__main__.py b/src/telemetry/frontend/service/__main__.py new file mode 100644 index 0000000000000000000000000000000000000000..0f48a4de10168ec5d238a2f0bbdd7a97b0e481c5 --- /dev/null +++ b/src/telemetry/frontend/service/__main__.py @@ -0,0 +1,72 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import signal +import sys +import logging, threading +from prometheus_client import start_http_server +from monitoring.service.NameMapping import NameMapping +from .TelemetryFrontendService import TelemetryFrontendService +from monitoring.service.EventTools import EventsDeviceCollector +from common.Settings import ( + get_log_level, wait_for_environment_variables, get_env_var_name, + get_metrics_port ) + +terminate = threading.Event() +LOGGER = None + +def signal_handler(signal, frame): # pylint: disable=redefined-outer-name + LOGGER.warning('Terminate signal received') + terminate.set() + +def main(): + global LOGGER + + log_level = get_log_level() + logging.basicConfig(level=log_level, format="[%(asctime)s] %(levelname)s:%(name)s:%(message)s") + LOGGER = logging.getLogger(__name__) + +# ------- will be added later -------------- + # wait_for_environment_variables([ + # get_env_var_name + + + # ]) +# ------- will be added later -------------- + + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + LOGGER.info('Starting...') + + # Start metrics server + metrics_port = get_metrics_port() + start_http_server(metrics_port) + + name_mapping = NameMapping() + + grpc_service = TelemetryFrontendService(name_mapping) + grpc_service.start() + + # Wait for Ctrl+C or termination signal + while not terminate.wait(timeout=1.0): pass + + LOGGER.info('Terminating...') + grpc_service.stop() + + LOGGER.info('Bye') + return 0 + +if __name__ == '__main__': + sys.exit(main()) \ No newline at end of file diff --git a/src/telemetry/frontend/tests/Messages.py b/src/telemetry/frontend/tests/Messages.py new file mode 100644 index 0000000000000000000000000000000000000000..d323aa7fd0169deb97e3eb8b7d99987473d16fbf --- /dev/null +++ b/src/telemetry/frontend/tests/Messages.py @@ -0,0 +1,65 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from common.proto import telemetry_frontend_pb2 +from common.proto.kpi_sample_types_pb2 import KpiSampleType + +def create_collector_id(coll_id_str : str): + _collector_id = telemetry_frontend_pb2.CollectorId() + _collector_id.collector_id.uuid = str(coll_id_str) + return _collector_id + +def create_collector_request(coll_id_str : str): + _create_collector_request = telemetry_frontend_pb2.Collector() + _create_collector_request.collector_id.collector_id.uuid = str(coll_id_str) + _create_collector_request.kpi_id.kpi_id.uuid = 'KPIid' + str(coll_id_str) + _create_collector_request.duration_s = float(-1) + _create_collector_request.interval_s = float(-1) + return _create_collector_request + +def create_collector_request_a(): + _create_collector_request_a = telemetry_frontend_pb2.Collector() + _create_collector_request_a.collector_id.collector_id.uuid = "-1" + return _create_collector_request_a + +def create_collector_request_b(str_kpi_id, coll_duration_s, coll_interval_s): + _create_collector_request_b = telemetry_frontend_pb2.Collector() + _create_collector_request_b.collector_id.collector_id.uuid = '-1' + _create_collector_request_b.kpi_id.kpi_id.uuid = str_kpi_id + _create_collector_request_b.duration_s = coll_duration_s + _create_collector_request_b.interval_s = coll_interval_s + return _create_collector_request_b + +def create_collector_filter(): + _create_collector_filter = telemetry_frontend_pb2.CollectorFilter() + new_collector_id = _create_collector_filter.collector_id.add() + new_collector_id.collector_id.uuid = "COLL1" + new_kpi_id = _create_collector_filter.kpi_id.add() + new_kpi_id.kpi_id.uuid = "KPI1" + new_device_id = _create_collector_filter.device_id.add() + new_device_id.device_uuid.uuid = 'DEV1' + new_service_id = _create_collector_filter.service_id.add() + new_service_id.service_uuid.uuid = 'SERV1' + new_slice_id = _create_collector_filter.slice_id.add() + new_slice_id.slice_uuid.uuid = 'SLC1' + new_endpoint_id = _create_collector_filter.endpoint_id.add() + new_endpoint_id.endpoint_uuid.uuid = 'END1' + new_connection_id = _create_collector_filter.connection_id.add() + new_connection_id.connection_uuid.uuid = 'CON1' + _create_collector_filter.kpi_sample_type.append(KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED) + return _create_collector_filter + +def create_collector_list(): + _create_collector_list = telemetry_frontend_pb2.CollectorList() + return _create_collector_list \ No newline at end of file diff --git a/src/telemetry/frontend/tests/__init__.py b/src/telemetry/frontend/tests/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..1549d9811aa5d1c193a44ad45d0d7773236c0612 --- /dev/null +++ b/src/telemetry/frontend/tests/__init__.py @@ -0,0 +1,14 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
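
The message factories above are exactly what the frontend unit tests (the next file in this patch) feed into TelemetryFrontendClient. As a rough sketch of the same round trip outside pytest, assuming a TelemetryFrontendService is already running and the TELEMETRYFRONTEND host/port environment variables are set the way the test fixtures set them:

# Hedged sketch: drive the frontend RPCs directly with the factories above.
from telemetry.frontend.client.TelemetryFrontendClient import TelemetryFrontendClient
from telemetry.frontend.tests.Messages import (
    create_collector_request_b, create_collector_id, create_collector_filter)

client = TelemetryFrontendClient()   # host/port resolved from the environment settings

# Start a collector for an example KPI, sampling every 2s for 10s (values are illustrative).
collector_id = client.StartCollector(create_collector_request_b('KPI1', 10, 2))
print('started collector:', collector_id.collector_id.uuid)

# Query collectors matching the example filter; the current servicer returns an empty list.
collectors = client.SelectCollectors(create_collector_filter())
print('matching collectors:', collectors)

client.StopCollector(create_collector_id(collector_id.collector_id.uuid))
client.close()
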
+ diff --git a/src/telemetry/frontend/tests/test_unitary.py b/src/telemetry/frontend/tests/test_unitary.py new file mode 100644 index 0000000000000000000000000000000000000000..312695659783a0ec59e0163605a857760cd0d2cb --- /dev/null +++ b/src/telemetry/frontend/tests/test_unitary.py @@ -0,0 +1,195 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import pytest +import logging +from typing import Union + +from common.proto.context_pb2 import Empty +from common.Constants import ServiceNameEnum +from common.proto.telemetry_frontend_pb2 import CollectorId, CollectorList +from common.proto.context_pb2_grpc import add_ContextServiceServicer_to_server +from context.client.ContextClient import ContextClient +from common.tools.service.GenericGrpcService import GenericGrpcService +from common.tests.MockServicerImpl_Context import MockServicerImpl_Context +from common.Settings import ( + get_service_port_grpc, get_env_var_name, ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC) + +from telemetry.frontend.client.TelemetryFrontendClient import TelemetryFrontendClient +from telemetry.frontend.service.TelemetryFrontendService import TelemetryFrontendService +from telemetry.frontend.tests.Messages import ( create_collector_id, create_collector_request, + create_collector_filter, create_collector_request_a, create_collector_request_b) + +from device.client.DeviceClient import DeviceClient +from device.service.DeviceService import DeviceService +from device.service.driver_api.DriverFactory import DriverFactory +from device.service.driver_api.DriverInstanceCache import DriverInstanceCache + +from monitoring.service.NameMapping import NameMapping + +os.environ['DEVICE_EMULATED_ONLY'] = 'TRUE' +from device.service.drivers import DRIVERS + +########################### +# Tests Setup +########################### + +LOCAL_HOST = '127.0.0.1' +MOCKSERVICE_PORT = 10000 + +TELEMETRY_FRONTEND_PORT = MOCKSERVICE_PORT + get_service_port_grpc(ServiceNameEnum.TELEMETRYFRONTEND) +os.environ[get_env_var_name(ServiceNameEnum.TELEMETRYFRONTEND, ENVVAR_SUFIX_SERVICE_HOST )] = str(LOCAL_HOST) +os.environ[get_env_var_name(ServiceNameEnum.TELEMETRYFRONTEND, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(TELEMETRY_FRONTEND_PORT) + +LOGGER = logging.getLogger(__name__) + +class MockContextService(GenericGrpcService): + # Mock Service implementing Context to simplify unitary tests of Monitoring + + def __init__(self, bind_port: Union[str, int]) -> None: + super().__init__(bind_port, LOCAL_HOST, enable_health_servicer=False, cls_name='MockService') + + # pylint: disable=attribute-defined-outside-init + def install_servicers(self): + self.context_servicer = MockServicerImpl_Context() + add_ContextServiceServicer_to_server(self.context_servicer, self.server) + +@pytest.fixture(scope='session') +def context_service(): + LOGGER.info('Initializing MockContextService...') + _service = MockContextService(MOCKSERVICE_PORT) + _service.start() + + 
LOGGER.info('Yielding MockContextService...') + yield _service + + LOGGER.info('Terminating MockContextService...') + _service.context_servicer.msg_broker.terminate() + _service.stop() + + LOGGER.info('Terminated MockContextService...') + +@pytest.fixture(scope='session') +def context_client(context_service : MockContextService): # pylint: disable=redefined-outer-name,unused-argument + LOGGER.info('Initializing ContextClient...') + _client = ContextClient() + + LOGGER.info('Yielding ContextClient...') + yield _client + + LOGGER.info('Closing ContextClient...') + _client.close() + + LOGGER.info('Closed ContextClient...') + +@pytest.fixture(scope='session') +def device_service(context_service : MockContextService): # pylint: disable=redefined-outer-name,unused-argument + LOGGER.info('Initializing DeviceService...') + driver_factory = DriverFactory(DRIVERS) + driver_instance_cache = DriverInstanceCache(driver_factory) + _service = DeviceService(driver_instance_cache) + _service.start() + + # yield the server, when test finishes, execution will resume to stop it + LOGGER.info('Yielding DeviceService...') + yield _service + + LOGGER.info('Terminating DeviceService...') + _service.stop() + + LOGGER.info('Terminated DeviceService...') + +@pytest.fixture(scope='session') +def device_client(device_service : DeviceService): # pylint: disable=redefined-outer-name,unused-argument + LOGGER.info('Initializing DeviceClient...') + _client = DeviceClient() + + LOGGER.info('Yielding DeviceClient...') + yield _client + + LOGGER.info('Closing DeviceClient...') + _client.close() + + LOGGER.info('Closed DeviceClient...') + +@pytest.fixture(scope='session') +def telemetryFrontend_service( + context_service : MockContextService, + device_service : DeviceService + ): + LOGGER.info('Initializing TelemetryFrontendService...') + name_mapping = NameMapping() + + _service = TelemetryFrontendService(name_mapping) + _service.start() + + # yield the server, when test finishes, execution will resume to stop it + LOGGER.info('Yielding TelemetryFrontendService...') + yield _service + + LOGGER.info('Terminating TelemetryFrontendService...') + _service.stop() + + LOGGER.info('Terminated TelemetryFrontendService...') + +@pytest.fixture(scope='session') +def telemetryFrontend_client( + telemetryFrontend_service : TelemetryFrontendService + ): + LOGGER.info('Initializing TelemetryFrontendClient...') + _client = TelemetryFrontendClient() + + # yield the server, when test finishes, execution will resume to stop it + LOGGER.info('Yielding TelemetryFrontendClient...') + yield _client + + LOGGER.info('Closing TelemetryFrontendClient...') + _client.close() + + LOGGER.info('Closed TelemetryFrontendClient...') + + +########################### +# Tests Implementation of Telemetry Frontend +########################### +def test_start_collector(telemetryFrontend_client): + LOGGER.warning('test_start_collector requesting') + response = telemetryFrontend_client.StartCollector(create_collector_request('1')) + LOGGER.debug(str(response)) + assert isinstance(response, CollectorId) + +def test_start_collector_a(telemetryFrontend_client): + LOGGER.warning('test_start_collector requesting') + response = telemetryFrontend_client.StartCollector(create_collector_request_a()) + LOGGER.debug(str(response)) + assert isinstance(response, CollectorId) + +def test_start_collector_b(telemetryFrontend_client): + LOGGER.warning('test_start_collector requesting') + response = telemetryFrontend_client.StartCollector(create_collector_request_b('1',10,2)) + 
LOGGER.debug(str(response)) + assert isinstance(response, CollectorId) + +def test_stop_collector(telemetryFrontend_client): + LOGGER.warning('test_stop_collector requesting') + response = telemetryFrontend_client.StopCollector(create_collector_id("1")) + LOGGER.debug(str(response)) + assert isinstance(response, Empty) + +def test_select_collectors(telemetryFrontend_client): + LOGGER.warning('test_select_collector requesting') + response = telemetryFrontend_client.SelectCollectors(create_collector_filter()) + LOGGER.debug(str(response)) + assert isinstance(response, CollectorList) \ No newline at end of file diff --git a/src/telemetry/requirements.in b/src/telemetry/requirements.in new file mode 100644 index 0000000000000000000000000000000000000000..1dd24fe32295556b383ad78c94b844ca3b91c176 --- /dev/null +++ b/src/telemetry/requirements.in @@ -0,0 +1,24 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +anytree==2.8.0 +APScheduler==3.10.1 +influx-line-protocol==0.1.4 +psycopg2-binary==2.9.3 +python-dateutil==2.8.2 +python-json-logger==2.0.2 +pytz==2024.1 +questdb==1.0.1 +requests==2.27.1 +xmltodict==0.12.0 \ No newline at end of file diff --git a/src/telemetry/telemetry_virenv.txt b/src/telemetry/telemetry_virenv.txt new file mode 100644 index 0000000000000000000000000000000000000000..0ce9b803a495183439a03b58d20ca02c7ae25c7a --- /dev/null +++ b/src/telemetry/telemetry_virenv.txt @@ -0,0 +1,44 @@ +anytree==2.8.0 +APScheduler==3.10.1 +attrs==23.2.0 +certifi==2024.2.2 +charset-normalizer==2.0.12 +colorama==0.4.6 +confluent-kafka==2.3.0 +coverage==6.3 +future-fstrings==1.2.0 +grpcio==1.47.5 +grpcio-health-checking==1.47.5 +grpcio-tools==1.47.5 +grpclib==0.4.4 +h2==4.1.0 +hpack==4.0.0 +hyperframe==6.0.1 +idna==3.7 +influx-line-protocol==0.1.4 +iniconfig==2.0.0 +kafka-python==2.0.2 +multidict==6.0.5 +networkx==3.3 +packaging==24.0 +pluggy==1.5.0 +prettytable==3.5.0 +prometheus-client==0.13.0 +protobuf==3.20.3 +psycopg2-binary==2.9.3 +py==1.11.0 +py-cpuinfo==9.0.0 +pytest==6.2.5 +pytest-benchmark==3.4.1 +pytest-depends==1.0.1 +python-dateutil==2.8.2 +python-json-logger==2.0.2 +pytz==2024.1 +questdb==1.0.1 +requests==2.27.1 +six==1.16.0 +toml==0.10.2 +tzlocal==5.2 +urllib3==1.26.18 +wcwidth==0.2.13 +xmltodict==0.12.0
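
Closing note on the metric extraction used by the backend: the node_network_receive_packets_total KPI fetched from Node Exporter arrives in the Prometheus exposition format, and extract_metric_value() in TelemetryBackendServiceImpl pulls the first sample out of it by simple string splitting. Below is a stdlib-only sketch of the same extraction with a slightly more tolerant regex; the sample payload is invented for illustration and is not produced by the patch.

# Hedged, stdlib-only illustration of the exposition-format parsing done by the backend.
import re

SAMPLE_EXPOSITION = """\
# HELP node_network_receive_packets_total Network device statistic receive_packets.
# TYPE node_network_receive_packets_total counter
node_network_receive_packets_total{device="eth0"} 1.234567e+06
"""

def extract_first_sample(exposition_text: str, metric_name: str):
    # Match '<metric_name>[{labels}] <value>' at the start of a line and return the value as float.
    pattern = re.compile(r'^' + re.escape(metric_name) + r'(?:\{[^}]*\})?\s+(\S+)', re.MULTILINE)
    match = pattern.search(exposition_text)
    return float(match.group(1)) if match else None

print(extract_first_sample(SAMPLE_EXPOSITION, 'node_network_receive_packets_total'))  # -> 1234567.0
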