Compare revisions: tfs/controller
Changes are shown as if the source revision was being merged into the target revision.
Showing 788 additions and 93 deletions.
! device: r1 (cEOSLab, EOS-4.31.2F-35442176.4312F (engineering build))
!
no aaa root
!
username admin privilege 15 role network-admin secret sha512 $6$tUMBMqI5iPca5XcJ$5QU/R83S.zjpHQyeB3H63BGWOgxewjqZ1NsxdaWPo3gLwRXVTrgYvMmwwZlzjYoqrD7yp7e9YD073/.FKLYEY1
!
transceiver qsfp default-mode 4x10G
!
service routing protocols model multi-agent
!
hostname r1
!
spanning-tree mode mstp
!
system l1
   unsupported speed action error
   unsupported error-correction action error
!
management api http-commands
   no shutdown
!
management api gnmi
   transport grpc default
!
management api netconf
   transport ssh default
!
interface Ethernet2
!
interface Ethernet10
!
interface Management0
   ip address 172.20.20.101/24
!
ip routing
!
ip route 0.0.0.0/0 172.20.20.1
!
end
! device: r2 (cEOSLab, EOS-4.31.2F-35442176.4312F (engineering build))
!
no aaa root
!
username admin privilege 15 role network-admin secret sha512 $6$Z/om4jI3S5BmwxfB$igaSOaJnh3m36TbSMHKCusA77m07CU8JJxalupXIUFuy7HaGt6k.C1kfSJsPqjn1AhLaL.LvLkt/hcqTFgpjG.
!
transceiver qsfp default-mode 4x10G
!
service routing protocols model multi-agent
!
hostname r2
!
spanning-tree mode mstp
!
system l1
   unsupported speed action error
   unsupported error-correction action error
!
management api http-commands
   no shutdown
!
management api gnmi
   transport grpc default
!
management api netconf
   transport ssh default
!
interface Ethernet1
!
interface Ethernet10
!
interface Management0
   ip address 172.20.20.102/24
!
ip routing
!
ip route 0.0.0.0/0 172.20.20.1
!
end
#!/bin/bash
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
source ~/tfs-ctrl/hackfest5/deploy_specs.sh
echo "Cleaning-up old NATS and Kafka deployments..."
helm3 uninstall --namespace ${NATS_NAMESPACE} ${NATS_NAMESPACE}
kubectl delete namespace ${NATS_NAMESPACE} --ignore-not-found
kubectl delete namespace kafka --ignore-not-found
printf "\n"
echo "Deployting TeraFlowSDN..."
# Deploy CockroachDB
./deploy/crdb.sh
# Deploy NATS
./deploy/nats.sh
# Deploy QuestDB
./deploy/qdb.sh
# Expose Dashboard
./deploy/expose_dashboard.sh
# Deploy TeraFlowSDN
./deploy/tfs.sh
# Show deploy summary
./deploy/show.sh
printf "\n"
echo "Waiting for Context to be subscribed to NATS..."
while ! kubectl --namespace $TFS_K8S_NAMESPACE logs deployment/contextservice -c server 2>&1 | grep -q 'Subscriber is Ready? True'; do
printf "%c" "."
sleep 1
done
kubectl --namespace $TFS_K8S_NAMESPACE logs deployment/contextservice -c server
printf "\n"
@@ -575,9 +575,9 @@ apiVersion: monitoring.coreos.com/v1
 kind: ServiceMonitor
 metadata:
   namespace: monitoring # namespace where prometheus is running
-  name: tfs-kpi_value_apiservice-metric
+  name: tfs-kpi-value-apiservice-metric
   labels:
-    app: kpi_value_apiservice
+    app: kpi-value-apiservice
     #release: prometheus
     #release: prom # name of the release
     # ( VERY IMPORTANT: You need to know the correct release name by viewing
@@ -588,7 +588,7 @@ spec:
     matchLabels:
       # Target app service
       #namespace: tfs
-      app: kpi_value_apiservice # same as above
+      app: kpi-value-apiservice # same as above
       #release: prometheus # same as above
   endpoints:
   - port: metrics # named port in target app
@@ -604,9 +604,9 @@ apiVersion: monitoring.coreos.com/v1
 kind: ServiceMonitor
 metadata:
   namespace: monitoring # namespace where prometheus is running
-  name: tfs-kpi_value_writerservice-metric
+  name: tfs-kpi-value-writerservice-metric
   labels:
-    app: kpi_value_writerservice
+    app: kpi-value-writerservice
     #release: prometheus
     #release: prom # name of the release
     # ( VERY IMPORTANT: You need to know the correct release name by viewing
@@ -617,7 +617,7 @@ spec:
     matchLabels:
       # Target app service
       #namespace: tfs
-      app: kpi_value_writerservice # same as above
+      app: kpi-value-writerservice # same as above
      #release: prometheus # same as above
   endpoints:
   - port: metrics # named port in target app
@@ -12,6 +12,17 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
+apiVersion: v1
+kind: PersistentVolumeClaim
+metadata:
+  name: grafana-pvc
+spec:
+  accessModes:
+    - ReadWriteOnce
+  resources:
+    requests:
+      storage: 1Gi
+---
 apiVersion: apps/v1
 kind: Deployment
 metadata:
@@ -99,6 +110,13 @@ spec:
           limits:
             cpu: 500m
             memory: 1024Mi
+        volumeMounts:
+          - mountPath: /var/lib/grafana
+            name: grafana-pv
+      volumes:
+        - name: grafana-pv
+          persistentVolumeClaim:
+            claimName: grafana-pvc
 ---
 apiVersion: v1
 kind: Service
@@ -223,6 +223,9 @@ enum DeviceDriverEnum {
   DEVICEDRIVER_IETF_ACTN = 10;
   DEVICEDRIVER_OC = 11;
   DEVICEDRIVER_QKD = 12;
+  DEVICEDRIVER_IETF_L3VPN = 13;
+  DEVICEDRIVER_IETF_SLICE = 14;
+  DEVICEDRIVER_NCE = 15;
 }

 enum DeviceOperationalStatusEnum {
@@ -258,6 +261,14 @@ message LinkId {
   Uuid link_uuid = 1;
 }

+enum LinkTypeEnum {
+  LINKTYPE_UNKNOWN = 0;
+  LINKTYPE_COPPER = 1;
+  LINKTYPE_FIBER = 2;
+  LINKTYPE_RADIO = 3;
+  LINKTYPE_VIRTUAL = 4;
+}
+
 message LinkAttributes {
   float total_capacity_gbps = 1;
   float used_capacity_gbps = 2;
@@ -266,9 +277,9 @@ message LinkAttributes {
 message Link {
   LinkId link_id = 1;
   string name = 2;
-  repeated EndPointId link_endpoint_ids = 3;
-  LinkAttributes attributes = 4;
-  LinkTypeEnum link_type = 5;
+  LinkTypeEnum link_type = 3;
+  repeated EndPointId link_endpoint_ids = 4;
+  LinkAttributes attributes = 5;
 }

 message LinkIdList {
@@ -284,14 +295,6 @@ message LinkEvent {
   LinkId link_id = 2;
 }

-enum LinkTypeEnum {
-  LINKTYPE_UNKNOWN = 0;
-  LINKTYPE_COPPER = 1;
-  LINKTYPE_VIRTUAL_COPPER = 2;
-  LINKTYPE_OPTICAL = 3;
-  LINKTYPE_VIRTUAL_OPTICAL = 4;
-}
-
 // ----- Service -------------------------------------------------------------------------------------------------------
 message ServiceId {
   ContextId context_id = 1;
@@ -39,4 +39,14 @@ enum KpiSampleType {
   KPISAMPLETYPE_L3_SECURITY_STATUS_CRYPTO = 605;

   KPISAMPLETYPE_SERVICE_LATENCY_MS = 701;
+
+  // output KPIs
+  KPISAMPLETYPE_PACKETS_TRANSMITTED_AGG_OUTPUT = 1101;
+  KPISAMPLETYPE_PACKETS_RECEIVED_AGG_OUTPUT    = 1102;
+  KPISAMPLETYPE_PACKETS_DROPPED_AGG_OUTPUT     = 1103;
+  KPISAMPLETYPE_BYTES_TRANSMITTED_AGG_OUTPUT   = 1201;
+  KPISAMPLETYPE_BYTES_RECEIVED_AGG_OUTPUT      = 1202;
+  KPISAMPLETYPE_BYTES_DROPPED_AGG_OUTPUT       = 1203;
+
+  KPISAMPLETYPE_SERVICE_LATENCY_MS_AGG_OUTPUT  = 1701;
 }
#!/bin/bash
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
########################################################################################################################
# Define your deployment settings here
########################################################################################################################
# If not already set, set the name of the Kubernetes namespace to deploy to.
export TFS_K8S_NAMESPACE=${TFS_K8S_NAMESPACE:-"tfs"}
########################################################################################################################
# Automated steps start here
########################################################################################################################
# Ref: https://github.com/fullstorydev/grpcurl
source tfs_runtime_env_vars.sh
GRPC_ENDPOINT="$CONTEXTSERVICE_SERVICE_HOST:$CONTEXTSERVICE_SERVICE_PORT_GRPC"
GRP_CURL_CMD="docker run fullstorydev/grpcurl --plaintext $GRPC_ENDPOINT"
GRPC_SERVICES=`$GRP_CURL_CMD list`
echo "gRPC Services found in $GRPC_ENDPOINT:"
printf "\n"
for GRPC_SERVICE in $GRPC_SERVICES; do
echo "gRPC Service: $GRPC_SERVICE"
$GRP_CURL_CMD describe $GRPC_SERVICE
printf "\n"
done
echo "Done!"
#!/bin/bash
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
########################################################################################################################
# Define your deployment settings here
########################################################################################################################
# If not already set, set the name of the Kubernetes namespace to deploy to.
export TFS_K8S_NAMESPACE=${TFS_K8S_NAMESPACE:-"tfs"}
########################################################################################################################
# Automated steps start here
########################################################################################################################
# Ref: https://github.com/fullstorydev/grpcurl
source tfs_runtime_env_vars.sh
GRPC_ENDPOINT="$DEVICESERVICE_SERVICE_HOST:$DEVICESERVICE_SERVICE_PORT_GRPC"
GRP_CURL_CMD="docker run fullstorydev/grpcurl --plaintext $GRPC_ENDPOINT"
GRPC_SERVICES=`$GRP_CURL_CMD list`
echo "gRPC Services found in $GRPC_ENDPOINT:"
printf "\n"
for GRPC_SERVICE in $GRPC_SERVICES; do
echo "gRPC Service: $GRPC_SERVICE"
$GRP_CURL_CMD describe $GRPC_SERVICE
printf "\n"
done
echo "Done!"
#!/bin/bash
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
########################################################################################################################
# Define your deployment settings here
########################################################################################################################
# If not already set, set the name of the Kubernetes namespace to deploy to.
export TFS_K8S_NAMESPACE=${TFS_K8S_NAMESPACE:-"tfs"}
########################################################################################################################
# Automated steps start here
########################################################################################################################
# Ref: https://github.com/fullstorydev/grpcurl
source tfs_runtime_env_vars.sh
GRPC_ENDPOINT="$PATHCOMPSERVICE_SERVICE_HOST:$PATHCOMPSERVICE_SERVICE_PORT_GRPC"
GRP_CURL_CMD="docker run fullstorydev/grpcurl --plaintext $GRPC_ENDPOINT"
GRPC_SERVICES=`$GRP_CURL_CMD list`
echo "gRPC Services found in $GRPC_ENDPOINT:"
printf "\n"
for GRPC_SERVICE in $GRPC_SERVICES; do
echo "gRPC Service: $GRPC_SERVICE"
$GRP_CURL_CMD describe $GRPC_SERVICE
printf "\n"
done
echo "Done!"
#!/bin/bash
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
########################################################################################################################
# Define your deployment settings here
########################################################################################################################
# If not already set, set the name of the Kubernetes namespace to deploy to.
export TFS_K8S_NAMESPACE=${TFS_K8S_NAMESPACE:-"tfs"}
########################################################################################################################
# Automated steps start here
########################################################################################################################
# Ref: https://github.com/fullstorydev/grpcurl
source tfs_runtime_env_vars.sh
GRPC_ENDPOINT="$SERVICESERVICE_SERVICE_HOST:$SERVICESERVICE_SERVICE_PORT_GRPC"
GRP_CURL_CMD="docker run fullstorydev/grpcurl --plaintext $GRPC_ENDPOINT"
GRPC_SERVICES=`$GRP_CURL_CMD list`
echo "gRPC Services found in $GRPC_ENDPOINT:"
printf "\n"
for GRPC_SERVICE in $GRPC_SERVICES; do
echo "gRPC Service: $GRPC_SERVICE"
$GRP_CURL_CMD describe $GRPC_SERVICE
printf "\n"
done
echo "Done!"
#!/bin/bash
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
########################################################################################################################
# Define your deployment settings here
########################################################################################################################
# If not already set, set the name of the Kubernetes namespace to deploy to.
export TFS_K8S_NAMESPACE=${TFS_K8S_NAMESPACE:-"tfs"}
########################################################################################################################
# Automated steps start here
########################################################################################################################
# Ref: https://github.com/fullstorydev/grpcurl
source tfs_runtime_env_vars.sh
GRPC_ENDPOINT="$SLICESERVICE_SERVICE_HOST:$SLICESERVICE_SERVICE_PORT_GRPC"
GRP_CURL_CMD="docker run fullstorydev/grpcurl --plaintext $GRPC_ENDPOINT"
GRPC_SERVICES=`$GRP_CURL_CMD list`
echo "gRPC Services found in $GRPC_ENDPOINT:"
printf "\n"
for GRPC_SERVICE in $GRPC_SERVICES; do
echo "gRPC Service: $GRPC_SERVICE"
$GRP_CURL_CMD describe $GRPC_SERVICE
printf "\n"
done
echo "Done!"
@@ -18,8 +18,11 @@ PROJECTDIR=`pwd`
 cd $PROJECTDIR/src
 RCFILE=$PROJECTDIR/coverage/.coveragerc

 export KFK_SERVER_ADDRESS='127.0.0.1:9092'
 CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace crdb -o jsonpath='{.spec.clusterIP}')
 export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_analytics?sslmode=require"
-python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \
+python3 -m pytest --log-level=DEBUG --log-cli-level=INFO --verbose \
     analytics/backend/tests/test_backend.py
@@ -18,8 +18,10 @@ PROJECTDIR=`pwd`
 cd $PROJECTDIR/src
 RCFILE=$PROJECTDIR/coverage/.coveragerc

 export KFK_SERVER_ADDRESS='127.0.0.1:9092'
 CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace crdb -o jsonpath='{.spec.clusterIP}')
 export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_analytics?sslmode=require"
-python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \
+python3 -m pytest --log-level=DEBUG --log-cli-level=INFO --verbose \
     analytics/frontend/tests/test_frontend.py
#!/bin/bash
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
PROJECTDIR=`pwd`
cd $PROJECTDIR/src
RCFILE=$PROJECTDIR/coverage/.coveragerc
# Run unitary tests and analyze coverage of code at same time
# helpful pytest flags: --log-level=INFO -o log_cli=true --verbose --maxfail=1 --durations=0
coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \
nbi/tests/test_slice_2.py
@@ -18,15 +18,12 @@ PROJECTDIR=`pwd`
 cd $PROJECTDIR/src

 # RCFILE=$PROJECTDIR/coverage/.coveragerc
-# coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \
-#     kpi_manager/tests/test_unitary.py
-# python3 kpi_manager/tests/test_unitary.py

 export KFK_SERVER_ADDRESS='127.0.0.1:9092'
 CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace crdb -o jsonpath='{.spec.clusterIP}')
 export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_telemetry?sslmode=require"
 RCFILE=$PROJECTDIR/coverage/.coveragerc
-python3 -m pytest --log-level=INFO --log-cli-level=debug --verbose \
-    telemetry/backend/tests/test_TelemetryBackend.py
+python3 -m pytest --log-level=debug --log-cli-level=debug --verbose \
+    telemetry/backend/tests/test_backend.py
#!/bin/bash
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
PROJECTDIR=`pwd`
cd $PROJECTDIR/src
# RCFILE=$PROJECTDIR/coverage/.coveragerc
# export KFK_SERVER_ADDRESS='127.0.0.1:9092'
# CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace crdb -o jsonpath='{.spec.clusterIP}')
# export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_telemetry?sslmode=require"
RCFILE=$PROJECTDIR/coverage/.coveragerc
python3 -m pytest --log-level=debug --log-cli-level=info --verbose \
telemetry/backend/tests/test_emulated.py
@@ -16,109 +16,164 @@ import time
 import json
 import logging
 import threading
 from common.tools.service.GenericGrpcService import GenericGrpcService
 from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
-from confluent_kafka import Consumer as KafkaConsumer
-from confluent_kafka import KafkaError
+from confluent_kafka import Consumer
+from confluent_kafka import KafkaError, KafkaException
 from common.Constants import ServiceNameEnum
 from common.Settings import get_service_port_grpc
-from threading import Thread, Event
-from .DaskStreaming import DaskStreamer
+from analytics.backend.service.Streamer import DaskStreamer
+from analytics.backend.service.AnalyzerHelper import AnalyzerHelper

 LOGGER = logging.getLogger(__name__)

 class AnalyticsBackendService(GenericGrpcService):
     """
-    Class listens for ...
+    AnalyticsBackendService class is responsible for handling the requests from the AnalyticsFrontendService.
+    It listens to the Kafka topic for the requests to start and stop the Streamer accordingly.
+    It also initializes the Kafka producer and Dask cluster for the streamer.
     """
-    def __init__(self, cls_name : str = __name__) -> None:
+    def __init__(self, cls_name : str = __name__, n_workers=1, threads_per_worker=1
+                 ) -> None:
         LOGGER.info('Init AnalyticsBackendService')
         port = get_service_port_grpc(ServiceNameEnum.ANALYTICSBACKEND)
         super().__init__(port, cls_name=cls_name)
-        self.running_threads = {}       # To keep track of all running analyzers
-        self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(),
-                                             'group.id'          : 'analytics-frontend',
-                                             'auto.offset.reset' : 'latest'})
+        self.active_streamers = {}
+        self.central_producer = AnalyzerHelper.initialize_kafka_producer()  # Multi-threaded producer
+        self.cluster          = AnalyzerHelper.initialize_dask_cluster(
+            n_workers, threads_per_worker)                                  # Local cluster
+        self.request_consumer = Consumer({
+            'bootstrap.servers' : KafkaConfig.get_kafka_address(),
+            'group.id'          : 'analytics-backend',
+            'auto.offset.reset' : 'latest',
+        })

     def install_servicers(self):
-        threading.Thread(target=self.RequestListener, args=()).start()
+        threading.Thread(
+            target=self.RequestListener,
+            args=()
+        ).start()

     def RequestListener(self):
         """
         listener for requests on Kafka topic.
         """
         LOGGER.info("Request Listener is initiated ...")
-        # print ("Request Listener is initiated ...")
-        consumer = self.kafka_consumer
+        consumer = self.request_consumer
         consumer.subscribe([KafkaTopic.ANALYTICS_REQUEST.value])
         while True:
-            receive_msg = consumer.poll(2.0)
-            if receive_msg is None:
+            message = consumer.poll(2.0)
+            if message is None:
                 continue
-            elif receive_msg.error():
-                if receive_msg.error().code() == KafkaError._PARTITION_EOF:
-                    continue
-                else:
-                    LOGGER.error("Consumer error: {:}".format(receive_msg.error()))
-                    # print ("Consumer error: {:}".format(receive_msg.error()))
-                    break
+            elif message.error():
+                if message.error().code() == KafkaError._PARTITION_EOF:
+                    LOGGER.warning(f"Consumer reached end of topic {message.topic()}/{message.partition()}")
+                    break
+                elif message.error().code() == KafkaError.UNKNOWN_TOPIC_OR_PART:
+                    LOGGER.error(f"Subscribed topic {message.topic()} does not exist. Maybe the topic does not have any messages yet.")
+                    continue
+                elif message.error():
+                    raise KafkaException(message.error())
             try:
-                analyzer      = json.loads(receive_msg.value().decode('utf-8'))
-                analyzer_uuid = receive_msg.key().decode('utf-8')
-                LOGGER.debug('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer))
-                # print ('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer))
+                analyzer      = json.loads(message.value().decode('utf-8'))
+                analyzer_uuid = message.key().decode('utf-8')
+                LOGGER.info('Received Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer))

                 if analyzer["algo_name"] is None and analyzer["oper_mode"] is None:
-                    self.StopDaskListener(analyzer_uuid)
+                    if self.StopStreamer(analyzer_uuid):
+                        LOGGER.info("Dask Streamer stopped.")
+                    else:
+                        LOGGER.warning("Failed to stop Dask Streamer. Maybe it was already terminated...")
                 else:
-                    self.StartDaskListener(analyzer_uuid, analyzer)
+                    if self.StartStreamer(analyzer_uuid, analyzer):
+                        LOGGER.info("Dask Streamer started.")
+                    else:
+                        LOGGER.warning("Failed to start Dask Streamer.")
             except Exception as e:
                 LOGGER.warning("Unable to consume message from topic: {:}. ERROR: {:}".format(KafkaTopic.ANALYTICS_REQUEST.value, e))
-                # print ("Unable to consume message from topic: {:}. ERROR: {:}".format(KafkaTopic.ANALYTICS_REQUEST.value, e))

-    def StartDaskListener(self, analyzer_uuid, analyzer):
-        kpi_list      = analyzer[ 'input_kpis'   ]
-        thresholds    = analyzer[ 'thresholds'   ]
-        window_size   = analyzer[ 'window_size'  ]
-        window_slider = analyzer[ 'window_slider']
-
-        LOGGER.debug ("Received parameters: {:} - {:} - {:} - {:}".format(
-            kpi_list, thresholds, window_size, window_slider))
-        # print ("Received parameters: {:} - {:} - {:} - {:}".format(
-        #     kpi_list, thresholds, window_size, window_slider))
-        try:
-            stop_event = Event()
-            thread = Thread(
-                target=DaskStreamer,
-                # args=(analyzer_uuid, kpi_list, oper_list, thresholds, stop_event),
-                args=(analyzer['output_kpis'][0] , kpi_list, thresholds, stop_event),
-                kwargs={
-                    "window_size" : window_size,
-                }
-            )
-            thread.start()
-            self.running_threads[analyzer_uuid] = (thread, stop_event)
-            # print ("Initiated Analyzer backend: {:}".format(analyzer_uuid))
-            LOGGER.info("Initiated Analyzer backend: {:}".format(analyzer_uuid))
-            return True
-        except Exception as e:
-            # print ("Failed to initiate Analyzer backend: {:}".format(e))
-            LOGGER.error("Failed to initiate Analyzer backend: {:}".format(e))
-            return False
+    def StartStreamer(self, analyzer_uuid : str, analyzer : dict):
+        """
+        Start the DaskStreamer with the given parameters.
+        """
+        if analyzer_uuid in self.active_streamers:
+            LOGGER.warning("Dask Streamer already running with the given analyzer_uuid: {:}".format(analyzer_uuid))
+            return False
+        try:
+            streamer = DaskStreamer(
+                key               = analyzer_uuid,
+                input_kpis        = analyzer['input_kpis'        ],
+                output_kpis       = analyzer['output_kpis'       ],
+                thresholds        = analyzer['thresholds'        ],
+                batch_size        = analyzer['batch_size_min'    ],
+                batch_duration    = analyzer['batch_duration_min'],
+                window_size       = analyzer['window_size'       ],
+                cluster_instance  = self.cluster,
+                producer_instance = self.central_producer,
+            )
+            streamer.start()
+            LOGGER.info(f"Streamer started with analyzer Id: {analyzer_uuid}")
+
+            # Stop the streamer after the given duration
+            duration = analyzer['duration']
+            if duration > 0:
+                def stop_after_duration():
+                    time.sleep(duration)
+                    LOGGER.warning(f"Execution duration ({duration}) completed of Analyzer: {analyzer_uuid}")
+                    if not self.StopStreamer(analyzer_uuid):
+                        LOGGER.warning("Failed to stop Dask Streamer. Streamer may be already terminated.")
+
+                duration_thread = threading.Thread(
+                    target=stop_after_duration, daemon=True, name=f"stop_after_duration_{analyzer_uuid}"
+                )
+                duration_thread.start()
+
+            self.active_streamers[analyzer_uuid] = streamer
+            return True
+        except Exception as e:
+            LOGGER.error("Failed to start Dask Streamer. ERROR: {:}".format(e))
+            return False

-    def StopDaskListener(self, analyzer_uuid):
-        if analyzer_uuid in self.running_threads:
-            try:
-                thread, stop_event = self.running_threads[analyzer_uuid]
-                stop_event.set()
-                thread.join()
-                del self.running_threads[analyzer_uuid]
-                # print ("Terminating backend (by TerminateBackend): Analyzer Id: {:}".format(analyzer_uuid))
-                LOGGER.info("Terminating backend (by TerminateBackend): Analyzer Id: {:}".format(analyzer_uuid))
-                return True
-            except Exception as e:
-                LOGGER.error("Failed to terminate. Analyzer Id: {:} - ERROR: {:}".format(analyzer_uuid, e))
-                return False
-        else:
-            # print ("Analyzer not found in active collectors. Analyzer Id: {:}".format(analyzer_uuid))
-            LOGGER.warning("Analyzer not found in active collectors: Analyzer Id: {:}".format(analyzer_uuid))
+    def StopStreamer(self, analyzer_uuid : str):
+        """
+        Stop the DaskStreamer with the given analyzer_uuid.
+        """
+        try:
+            if analyzer_uuid not in self.active_streamers:
+                LOGGER.warning("Dask Streamer not found with the given analyzer_uuid: {:}".format(analyzer_uuid))
+                return True
+            LOGGER.info(f"Terminating streamer with Analyzer Id: {analyzer_uuid}")
+            streamer = self.active_streamers[analyzer_uuid]
+            streamer.stop()
+            streamer.join()
+            del self.active_streamers[analyzer_uuid]
+            LOGGER.info(f"Streamer with analyzer_uuid '{analyzer_uuid}' has been terminated successfully.")
+            return True
+        except:
+            LOGGER.exception("Failed to stop Dask Streamer.")
+            return False
+
+    def close(self):
+        """
+        Close the producer and cluster cleanly.
+        """
+        if self.central_producer:
+            try:
+                self.central_producer.flush()
+                LOGGER.info("Kafka producer flushed and closed.")
+            except:
+                LOGGER.exception("Error closing Kafka producer")
+        if self.cluster:
+            try:
+                self.cluster.close()
+                LOGGER.info("Dask cluster closed.")
+            except:
+                LOGGER.exception("Error closing Dask cluster")
+
+    def stop(self):
+        self.close()
+        return super().stop()
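For orientation, the sketch below shows the kind of analyzer request that RequestListener expects on the analytics request topic. It is a minimal, hypothetical example: the broker address, topic name, KPI UUIDs and parameter values are placeholders, and only the JSON keys mirror those read by RequestListener and StartStreamer above.

# Minimal sketch: publish a "start analyzer" request (all names/values are illustrative).
import json
from confluent_kafka import Producer

producer = Producer({'bootstrap.servers': '127.0.0.1:9092'})   # assumption: local Kafka broker

analyzer_uuid = '1e22f180-ba28-4641-b190-2287bf448888'          # hypothetical analyzer id
start_request = {
    'algo_name'         : 'agg',    # both algo_name and oper_mode set to None means "stop"
    'oper_mode'         : 1,
    'input_kpis'        : ['6e22f180-ba28-4641-b190-2287bf448888'],
    'output_kpis'       : ['1e22f180-ba28-4641-b190-2287bf441616'],
    'thresholds'        : {'task_parameter': [{'avg': [10, 90]}]},
    'batch_size_min'    : 5,
    'batch_duration_min': 10,
    'window_size'       : '60s',
    'duration'          : 120,      # seconds; 0 disables the auto-stop thread
}

producer.produce(
    'analytics_request',            # assumption: real name comes from KafkaTopic.ANALYTICS_REQUEST
    key   = analyzer_uuid.encode('utf-8'),
    value = json.dumps(start_request).encode('utf-8'),
)
producer.flush()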
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from enum import Enum
import pandas as pd
logger = logging.getLogger(__name__)
class Handlers(Enum):
AGGREGATION_HANDLER = "AggregationHandler"
UNSUPPORTED_HANDLER = "UnsupportedHandler"
@classmethod
def is_valid_handler(cls, handler_name):
return handler_name in cls._value2member_map_
# This method is top-level and should not be part of the class due to serialization issues.
def threshold_handler(key, aggregated_df, thresholds):
"""
Apply thresholds (TH-Fall and TH-Raise) based on the thresholds dictionary
on the aggregated DataFrame.
Args:
key (str): Key for the aggregated DataFrame.
aggregated_df (pd.DataFrame): DataFrame with aggregated metrics.
thresholds (dict): Thresholds dictionary with keys in the format '<metricName>' and values as (fail_th, raise_th).
Returns:
pd.DataFrame: DataFrame with additional threshold columns.
"""
for metric_name, threshold_values in thresholds.items():
# Ensure the metric column exists in the DataFrame
if metric_name not in aggregated_df.columns:
logger.warning(f"Metric '{metric_name}' does not exist in the DataFrame for key: {key}. Skipping threshold application.")
continue
# Ensure the threshold values are valid (expects a two-element list: [fail_th, raise_th])
if isinstance(threshold_values, list) and len(threshold_values) == 2:
fail_th, raise_th = threshold_values
# Add threshold columns with updated naming
aggregated_df[f"{metric_name}_TH_RAISE"] = aggregated_df[metric_name] > raise_th
aggregated_df[f"{metric_name}_TH_FALL"] = aggregated_df[metric_name] < fail_th
else:
logger.warning(f"Threshold values for '{metric_name}' ({threshold_values}) are not a list of length 2. Skipping threshold application.")
return aggregated_df
def aggregation_handler(
batch_type_name, key, batch, input_kpi_list, output_kpi_list, thresholds
):
"""
Process a batch of data and calculate aggregated values for each input KPI
and maps them to the output KPIs. """
logger.info(f"({batch_type_name}) Processing batch for key: {key}")
if not batch:
logger.info("Empty batch received. Skipping processing.")
return []
else:
logger.info(f" >>>>> Processing {len(batch)} records for key: {key}")
# Convert data into a DataFrame
df = pd.DataFrame(batch)
# Filter the DataFrame to retain rows where kpi_id is in the input list (subscribed endpoints only)
df = df[df['kpi_id'].isin(input_kpi_list)].copy()
if df.empty:
logger.warning(f"No data available for KPIs: {input_kpi_list}. Skipping processing.")
return []
# Define all possible aggregation methods
aggregation_methods = {
"min" : ('kpi_value', 'min'),
"max" : ('kpi_value', 'max'),
"avg" : ('kpi_value', 'mean'),
"first" : ('kpi_value', lambda x: x.iloc[0]),
"last" : ('kpi_value', lambda x: x.iloc[-1]),
"variance": ('kpi_value', 'var'),
"count" : ('kpi_value', 'count'),
"range" : ('kpi_value', lambda x: x.max() - x.min()),
"sum" : ('kpi_value', 'sum'),
}
results = []
# Process each KPI-specific task parameter
for kpi_index, kpi_id in enumerate(input_kpi_list):
# logger.info(f"1.Processing KPI: {kpi_id}")
kpi_task_parameters = thresholds["task_parameter"][kpi_index]
# Get valid task parameters for this KPI
valid_task_parameters = [
method for method in kpi_task_parameters.keys()
if method in aggregation_methods
]
# Select the aggregation methods based on valid task parameters
selected_methods = {method: aggregation_methods[method] for method in valid_task_parameters}
# logger.info(f"2. Processing KPI: {kpi_id} with task parameters: {kpi_task_parameters}")
kpi_df = df[df['kpi_id'] == kpi_id]
# Check if kpi_df is not empty before applying the aggregation methods
if not kpi_df.empty:
agg_df = kpi_df.groupby('kpi_id').agg(**selected_methods).reset_index()
# logger.info(f"3. Aggregated DataFrame for KPI: {kpi_id}: {agg_df}")
agg_df['kpi_id'] = output_kpi_list[kpi_index]
# logger.info(f"4. Applying thresholds for df: {agg_df['kpi_id']}")
record = threshold_handler(key, agg_df, kpi_task_parameters)
results.extend(record.to_dict(orient='records'))
else:
logger.warning(f"No data available for KPIs: {kpi_id}. Skipping aggregation.")
continue
if results:
return results
else:
return []
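To make the aggregation and threshold semantics above concrete, here is a small, self-contained pandas sketch that mirrors (but does not import) aggregation_handler and threshold_handler for a single KPI with an 'avg' task parameter; the KPI ids, batch values and thresholds are invented for illustration.

# Standalone sketch (pandas only) of the aggregation + threshold logic for one KPI.
import pandas as pd

input_kpi  = 'kpi-in-0001'               # hypothetical subscribed KPI
output_kpi = 'kpi-out-0001'              # hypothetical output KPI
task_parameters = {'avg': [10.0, 90.0]}  # {method: [fail_th, raise_th]}

batch = [
    {'kpi_id': input_kpi, 'kpi_value': 12.0},
    {'kpi_id': input_kpi, 'kpi_value': 95.0},
    {'kpi_id': 'other',   'kpi_value':  1.0},   # filtered out: not in the input list
]

df = pd.DataFrame(batch)
df = df[df['kpi_id'] == input_kpi]

# Aggregate with the selected method ('avg' -> mean), then remap to the output KPI id
agg_df = df.groupby('kpi_id').agg(avg=('kpi_value', 'mean')).reset_index()
agg_df['kpi_id'] = output_kpi

# Threshold columns, as added by threshold_handler
fail_th, raise_th = task_parameters['avg']
agg_df['avg_TH_RAISE'] = agg_df['avg'] > raise_th
agg_df['avg_TH_FALL']  = agg_df['avg'] < fail_th

print(agg_df.to_dict(orient='records'))
# e.g. [{'kpi_id': 'kpi-out-0001', 'avg': 53.5, 'avg_TH_RAISE': False, 'avg_TH_FALL': False}]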
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from dask.distributed import Client, LocalCluster
from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
from confluent_kafka import Consumer, Producer
import logging
logger = logging.getLogger(__name__)
class AnalyzerHelper:
def __init__(self):
pass
@staticmethod
def initialize_dask_client(cluster_instance):
"""Initialize a local Dask client."""
if cluster_instance is None:
logger.error("Dask Cluster is not initialized. Exiting.")
return None
client = Client(cluster_instance)
logger.info(f"Dask Client Initialized: {client}")
return client
@staticmethod
def initialize_dask_cluster(n_workers=1, threads_per_worker=2):
"""Initialize a local Dask cluster"""
cluster = LocalCluster(n_workers=n_workers, threads_per_worker=threads_per_worker)
logger.info(f"Dask Cluster Initialized: {cluster}")
return cluster
@staticmethod
def initialize_kafka_consumer(): # TODO: update to receive topic and group_id as parameters
"""Initialize the Kafka consumer."""
consumer_conf = {
'bootstrap.servers': KafkaConfig.get_kafka_address(),
'group.id': 'analytics-backend',
'auto.offset.reset': 'latest'
}
consumer = Consumer(consumer_conf)
consumer.subscribe([KafkaTopic.VALUE.value])
return consumer
@staticmethod
def initialize_kafka_producer():
"""Initialize the Kafka producer."""
return Producer({'bootstrap.servers': KafkaConfig.get_kafka_address()})
@staticmethod
def delivery_report(err, msg):
if err is not None:
logger.error(f"Message delivery failed: {err}")
else:
logger.debug(f"Message delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")