Commit cd3a4525 authored by Konstantinos Poulakakis

Add a kafka topic to policy. Spotless apply. Bug fixes on the monitoring component.

parent 38a64dc7
2 merge requests: !294 Release TeraFlowSDN 4.0, !238 Automation component skeleton
Showing with 154 additions and 223 deletions
@@ -207,24 +207,24 @@ export GRAF_EXT_PORT_HTTP=${GRAF_EXT_PORT_HTTP:-"3000"}
 ########################################################################################################################
 # Deploy CockroachDB
-./deploy/crdb.sh
+#./deploy/crdb.sh
 # Deploy NATS
-./deploy/nats.sh
+#./deploy/nats.sh
 # Deploy QuestDB
-./deploy/qdb.sh
+#./deploy/qdb.sh
 # Deploy Apache Kafka
-./deploy/kafka.sh
+#./deploy/kafka.sh
 # Expose Dashboard
-./deploy/expose_dashboard.sh
+#./deploy/expose_dashboard.sh
 # Deploy TeraFlowSDN
 ./deploy/tfs.sh
 # Show deploy summary
-./deploy/show.sh
+#./deploy/show.sh
 echo "Done!"
@@ -141,8 +141,8 @@ TMP_LOGS_FOLDER="${TMP_FOLDER}/${TFS_K8S_NAMESPACE}/logs"
 mkdir -p $TMP_LOGS_FOLDER
 echo "Deleting and Creating a new namespace..."
-kubectl delete namespace $TFS_K8S_NAMESPACE --ignore-not-found
-kubectl create namespace $TFS_K8S_NAMESPACE
+#kubectl delete namespace $TFS_K8S_NAMESPACE --ignore-not-found
+#kubectl create namespace $TFS_K8S_NAMESPACE
 sleep 2
 printf "\n"
@@ -252,7 +252,7 @@ echo "export PYTHONPATH=${PYTHONPATH}" >> $ENV_VARS_SCRIPT
 echo "Create Redis secret..."
 # first try to delete an old one if exists
-kubectl delete secret redis-secrets --namespace=$TFS_K8S_NAMESPACE --ignore-not-found
+#kubectl delete secret redis-secrets --namespace=$TFS_K8S_NAMESPACE --ignore-not-found
 REDIS_PASSWORD=`uuidgen`
 kubectl create secret generic redis-secrets --namespace=$TFS_K8S_NAMESPACE \
   --from-literal=REDIS_PASSWORD=$REDIS_PASSWORD
......
@@ -84,7 +84,7 @@ spec:
 apiVersion: v1
 kind: Service
 metadata:
-  name: analyticsservice
+  name: analytics-frontendservice
   labels:
     app: analyticsservice
 spec:
......
@@ -40,6 +40,11 @@ spec:
           env:
             - name: LOG_LEVEL
               value: "INFO"
+          envFrom:
+            - secretRef:
+                name: crdb-analytics
+            - secretRef:
+                name: kfk-kpi-data
           startupProbe:
             exec:
               command: ["/bin/grpc_health_probe", "-addr=:30200"]
......
@@ -84,7 +84,7 @@ spec:
 apiVersion: v1
 kind: Service
 metadata:
-  name: telemetryservice
+  name: telemetry-frontendservice
   labels:
     app: telemetryservice
 spec:
......
@@ -20,7 +20,7 @@
 export TFS_REGISTRY_IMAGES="http://localhost:32000/tfs/"
 # Set the list of components, separated by spaces, you want to build images for, and deploy.
-export TFS_COMPONENTS="context device pathcomp service slice nbi webui load_generator automation"
+export TFS_COMPONENTS="analytics"
 # Uncomment to activate Monitoring (old)
 #export TFS_COMPONENTS="${TFS_COMPONENTS} monitoring"
@@ -75,7 +75,7 @@ export TFS_COMPONENTS="context device pathcomp service slice nbi webui load_gene
 # Set the tag you want to use for your images.
-export TFS_IMAGE_TAG="dev"
+export TFS_IMAGE_TAG="panos31"
 # Set the name of the Kubernetes namespace to deploy TFS to.
 export TFS_K8S_NAMESPACE="tfs"
@@ -124,7 +124,7 @@ export CRDB_DEPLOY_MODE="single"
 export CRDB_DROP_DATABASE_IF_EXISTS=""
 # Disable flag for re-deploying CockroachDB from scratch.
-export CRDB_REDEPLOY=""
+export CRDB_REDEPLOY="YES"
 # ----- NATS -------------------------------------------------------------------
@@ -143,7 +143,7 @@ export NATS_EXT_PORT_HTTP="8222"
 export NATS_DEPLOY_MODE="single"
 # Disable flag for re-deploying NATS from scratch.
-export NATS_REDEPLOY=""
+export NATS_REDEPLOY="YES"
 # ----- QuestDB ----------------------------------------------------------------
@@ -176,7 +176,7 @@ export QDB_TABLE_SLICE_GROUPS="tfs_slice_groups"
 export QDB_DROP_TABLES_IF_EXIST=""
 # Disable flag for re-deploying QuestDB from scratch.
-export QDB_REDEPLOY=""
+export QDB_REDEPLOY="YES"
 # ----- K8s Observability ------------------------------------------------------
......
@@ -19,6 +19,6 @@ PROJECTDIR=`pwd`
 cd $PROJECTDIR/src
 RCFILE=$PROJECTDIR/coverage/.coveragerc
 CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace crdb -o jsonpath='{.spec.clusterIP}')
-export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_kpi_mgmt?sslmode=require"
+export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs-analyzer?sslmode=require"
 python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \
     analytics/tests/test_analytics_db.py
@@ -21,7 +21,8 @@ cd $PROJECTDIR/src
 # coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \
 #     kpi_manager/tests/test_unitary.py
 CRDB_SQL_ADDRESS=$(kubectl --namespace ${CRDB_NAMESPACE} get service cockroachdb-public -o 'jsonpath={.spec.clusterIP}')
-export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_kpi_mgmt?sslmode=require"
+export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs-telemetry?sslmode=require"
 RCFILE=$PROJECTDIR/coverage/.coveragerc
 python3 -m pytest --log-level=DEBUG --log-cli-level=debug --verbose \
     telemetry/tests/test_telemetryDB.py
@@ -36,7 +36,7 @@ class AnalyticsBackendService(GenericGrpcService):
         port = get_service_port_grpc(ServiceNameEnum.ANALYTICSBACKEND)
         super().__init__(port, cls_name=cls_name)
         self.running_threads = {}   # To keep track of all running analyzers
-        self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(),
+        self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : '10.152.183.186:9092',
                                              'group.id'          : 'analytics-frontend',
                                              'auto.offset.reset' : 'latest'})
@@ -91,7 +91,7 @@ class AnalyticsBackendService(GenericGrpcService):
             thread = Thread(
                 target=DaskStreamer,
                 # args=(analyzer_uuid, kpi_list, oper_list, thresholds, stop_event),
-                args=(analyzer_uuid, kpi_list, thresholds, stop_event),
+                args=(analyzer['output_kpis'][0], kpi_list, thresholds, stop_event),
                 kwargs={
                     "window_size" : window_size,
                 }
......
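For reference, a minimal sketch of how the backend consumer above could keep using the shared KafkaConfig helper instead of the hard-coded broker IP introduced in this commit. The confluent_kafka import alias is an assumption based on the surrounding code, not part of this diff; topic subscription is omitted because it is not shown here.

# Hedged sketch: build the analytics-backend consumer from KafkaConfig
# rather than a fixed broker address (import alias assumed).
from confluent_kafka import Consumer as KafkaConsumer
from common.tools.kafka.Variables import KafkaConfig

def build_backend_consumer() -> KafkaConsumer:
    # Same settings as the constructor in the hunk above, with the address
    # resolved through the shared helper instead of '10.152.183.186:9092'.
    return KafkaConsumer({
        'bootstrap.servers' : KafkaConfig.get_kafka_address(),
        'group.id'          : 'analytics-frontend',
        'auto.offset.reset' : 'latest',
    })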
@@ -81,7 +81,7 @@ def delivery_report(err, msg):
     else:
         LOGGER.info(f"Message delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")

-def process_batch(batch, agg_mappings, thresholds):
+def process_batch(batch, agg_mappings, thresholds, key):
     """
     Process a batch of data and apply thresholds.
       Args: batch (list of dict): List of messages from Kafka.
@@ -93,9 +93,12 @@ def process_batch(batch, agg_mappings, thresholds):
         LOGGER.info("Empty batch received. Skipping processing.")
         return []

     df = pd.DataFrame(batch)
-    df['time_stamp'] = pd.to_datetime(df['time_stamp'], errors='coerce')
+    LOGGER.info(f"df {df} ")
+    df['time_stamp'] = pd.to_datetime(df['time_stamp'], errors='coerce', unit='s')
     df.dropna(subset=['time_stamp'], inplace=True)
+    LOGGER.info(f"df {df} ")
     required_columns = {'time_stamp', 'kpi_id', 'kpi_value'}
     if not required_columns.issubset(df.columns):
         LOGGER.warning(f"Batch contains missing required columns. Required columns: {required_columns}. Skipping batch.")
@@ -107,13 +110,14 @@ def process_batch(batch, agg_mappings, thresholds):
     # Perform aggregations using named aggregation
     try:
         agg_dict = {key: value for key, value in agg_mappings.items()}
-        df_agg = df.groupby(['window_start', 'kpi_id']).agg(**agg_dict).reset_index()
+        df_agg = df.groupby(['window_start']).agg(**agg_dict).reset_index()
     except Exception as e:
         LOGGER.error(f"Aggregation error: {e}")
         return []

     # Apply thresholds
     df_thresholded = ApplyThresholds(df_agg, thresholds)
+    df_thresholded['kpi_id'] = key
     df_thresholded['window_start'] = df_thresholded['window_start'].dt.strftime('%Y-%m-%dT%H:%M:%SZ')
     # Convert aggregated DataFrame to list of dicts
     result = df_thresholded.to_dict(orient='records')
@@ -193,11 +197,14 @@ def DaskStreamer(key, kpi_list, thresholds, stop_event,
                 continue
             try:
-                message_timestamp = pd.to_datetime(message_value[time_stamp_col], errors='coerce')
+                message_timestamp = pd.to_datetime(message_value[time_stamp_col], errors='coerce', unit='s')
+                LOGGER.warning(f"message_timestamp: {message_timestamp}. Skipping message.")
                 if pd.isna(message_timestamp):
                     LOGGER.warning(f"Invalid timestamp in message: {message_value}. Skipping message.")
                     continue
                 window_start = message_timestamp.floor(window_size)
+                LOGGER.warning(f"window_start: {window_start}. Skipping message.")
                 message_value['window_start'] = window_start
             except Exception as e:
                 LOGGER.error(f"Error processing timestamp: {e}. Skipping message.")
@@ -212,7 +219,7 @@ def DaskStreamer(key, kpi_list, thresholds, stop_event,
             current_time = time.time()
             if (current_time - last_batch_time) >= window_size_seconds and batch:
                 LOGGER.info("Time-based batch threshold reached. Processing batch.")
-                future = client.submit(process_batch, batch, agg_mappings, thresholds)
+                future = client.submit(process_batch, batch, agg_mappings, thresholds, key)
                 future.add_done_callback(lambda fut: produce_result(fut.result(), producer, KafkaTopic.ALARMS.value))
                 batch = []
                 last_batch_time = current_time
......
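To make the intent of the process_batch changes above easier to follow, here is a self-contained sketch of the same flow outside Dask: timestamps parsed as epoch seconds, aggregation grouped on window_start only, and the analyzer's output KPI id (now passed in as key) stamped onto every aggregated record before it is serialized. The sample batch, the 60-second window and the avg_value mapping are illustrative assumptions, not values from the repository.

# Hedged, standalone sketch of the reworked aggregation path in process_batch.
import pandas as pd

batch = [
    {'time_stamp': 1727000000, 'kpi_id': 'kpi-rx', 'kpi_value': 12.0},   # sample data (assumed)
    {'time_stamp': 1727000030, 'kpi_id': 'kpi-tx', 'kpi_value': 18.0},
]
agg_mappings  = {'avg_value': ('kpi_value', 'mean')}   # named-aggregation mapping (assumed)
output_kpi_id = 'kpi-lat'                              # plays the role of the new 'key' argument

df = pd.DataFrame(batch)
df['time_stamp']   = pd.to_datetime(df['time_stamp'], errors='coerce', unit='s')   # epoch seconds
df['window_start'] = df['time_stamp'].dt.floor('60s')

df_agg = df.groupby(['window_start']).agg(**agg_mappings).reset_index()   # no longer grouped by kpi_id
df_agg['kpi_id']       = output_kpi_id                                    # tag results with the output KPI
df_agg['window_start'] = df_agg['window_start'].dt.strftime('%Y-%m-%dT%H:%M:%SZ')
print(df_agg.to_dict(orient='records'))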
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging, time
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType
from pyspark.sql.functions import from_json, col, window, avg, min, max, first, last, stddev, when, round
from common.tools.kafka.Variables import KafkaConfig, KafkaTopic

LOGGER = logging.getLogger(__name__)

def DefiningSparkSession():
    # Create a Spark session with a specific Spark version (3.5.0)
    return SparkSession.builder \
        .appName("Analytics") \
        .config("spark.sql.streaming.forceDeleteTempCheckpointLocation", "true") \
        .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0") \
        .getOrCreate()

def SettingKafkaConsumerParams():   # TODO: create get_kafka_consumer() in common with inputs (bootstrap server, subscribe, startingOffset and failOnDataLoss with default values)
    return {
        # "kafka.bootstrap.servers": '127.0.0.1:9092',
        "kafka.bootstrap.servers": KafkaConfig.get_kafka_address(),
        "subscribe"              : KafkaTopic.VALUE.value,   # topic should have at least one message before the Spark session starts
        "startingOffsets"        : 'latest',
        "failOnDataLoss"         : 'false'                   # Optional: Set to "true" to fail the query on data loss
    }

def DefiningRequestSchema():
    return StructType([
        StructField("time_stamp", StringType(), True),
        StructField("kpi_id",     StringType(), True),
        StructField("kpi_value",  DoubleType(), True)
    ])

def GetAggregations(oper_list):
    # Define the possible aggregation functions
    agg_functions = {
        'avg'  : round(avg   ("kpi_value"), 3).alias("avg_value"),
        'min'  : round(min   ("kpi_value"), 3).alias("min_value"),
        'max'  : round(max   ("kpi_value"), 3).alias("max_value"),
        'first': round(first ("kpi_value"), 3).alias("first_value"),
        'last' : round(last  ("kpi_value"), 3).alias("last_value"),
        'stdev': round(stddev("kpi_value"), 3).alias("stdev_value")
    }
    return [agg_functions[op] for op in oper_list if op in agg_functions]   # Filter and return only the selected aggregations

def ApplyThresholds(aggregated_df, thresholds):
    # Apply thresholds (TH-Fail and TH-RAISE) based on the thresholds dictionary on the aggregated DataFrame.
    # Loop through each column name and its associated thresholds
    for col_name, (fail_th, raise_th) in thresholds.items():
        # Apply TH-Fail condition (if column value is less than the fail threshold)
        aggregated_df = aggregated_df.withColumn(
            f"{col_name}_THRESHOLD_FALL",
            when(col(col_name) < fail_th, True).otherwise(False)
        )
        # Apply TH-RAISE condition (if column value is greater than the raise threshold)
        aggregated_df = aggregated_df.withColumn(
            f"{col_name}_THRESHOLD_RAISE",
            when(col(col_name) > raise_th, True).otherwise(False)
        )
    return aggregated_df

def SparkStreamer(key, kpi_list, oper_list, thresholds, stop_event,
                  window_size=None, win_slide_duration=None, time_stamp_col=None):
    """
    Method to perform Spark operations on a Kafka stream.
    NOTE: the Kafka topic to be processed should have at least one row before initiating the Spark session.
    """
    kafka_consumer_params = SettingKafkaConsumerParams()   # Define the Kafka consumer parameters
    schema = DefiningRequestSchema()                       # Define the schema for the incoming JSON data
    spark  = DefiningSparkSession()                        # Define the Spark session with app name and Spark version

    # extra options default assignment
    if window_size        is None: window_size        = "60 seconds"   # default
    if win_slide_duration is None: win_slide_duration = "30 seconds"   # default
    if time_stamp_col     is None: time_stamp_col     = "time_stamp"   # default

    try:
        # Read data from Kafka
        raw_stream_data = spark \
            .readStream \
            .format("kafka") \
            .options(**kafka_consumer_params) \
            .load()

        # Convert the value column from Kafka to a string
        stream_data = raw_stream_data.selectExpr("CAST(value AS STRING)")
        # Parse the JSON string into a DataFrame with the defined schema
        parsed_stream_data = stream_data.withColumn("parsed_value", from_json(col("value"), schema))
        # Select the parsed fields
        final_stream_data = parsed_stream_data.select("parsed_value.*")
        # Convert the time_stamp to a proper timestamp (assuming it's in ISO format)
        final_stream_data = final_stream_data.withColumn(time_stamp_col, col(time_stamp_col).cast(TimestampType()))
        # Filter the stream to only include rows where the kpi_id is in the kpi_list
        filtered_stream_data = final_stream_data.filter(col("kpi_id").isin(kpi_list))

        # Define a window for aggregation
        windowed_stream_data = filtered_stream_data \
            .groupBy(
                window(col(time_stamp_col), window_size, slideDuration=win_slide_duration),
                col("kpi_id")
            ) \
            .agg(*GetAggregations(oper_list))

        # Apply thresholds to the aggregated data
        thresholded_stream_data = ApplyThresholds(windowed_stream_data, thresholds)

        # --- This will write output on console: FOR TESTING PURPOSES
        # Start the Spark streaming query
        # query = thresholded_stream_data \
        #     .writeStream \
        #     .outputMode("update") \
        #     .format("console")

        # --- This will write output to Kafka: ACTUAL IMPLEMENTATION
        query = thresholded_stream_data \
            .selectExpr(f"CAST(kpi_id AS STRING) AS key", "to_json(struct(*)) AS value") \
            .writeStream \
            .format("kafka") \
            .option("kafka.bootstrap.servers", KafkaConfig.get_kafka_address()) \
            .option("topic",                   KafkaTopic.ALARMS.value) \
            .option("checkpointLocation",      "analytics/.spark/checkpoint") \
            .outputMode("update")

        # Start the query execution
        queryHandler = query.start()

        # Loop to check for stop event flag. To be set by stop collector method.
        while True:
            if stop_event.is_set():
                LOGGER.debug("Stop Event activated. Terminating in 5 seconds...")
                print("Stop Event activated. Terminating in 5 seconds...")
                time.sleep(5)
                queryHandler.stop()
                break
            time.sleep(5)
    except Exception as e:
        print("Error in Spark streaming process: {:}".format(e))
        LOGGER.debug("Error in Spark streaming process: {:}".format(e))
@@ -37,8 +37,8 @@ def main():
     LOGGER.info('Starting...')

     # Start metrics server
-    metrics_port = get_metrics_port()
-    start_http_server(metrics_port)
+    # metrics_port = get_metrics_port()
+    # start_http_server(metrics_port)

     grpc_service = AnalyticsBackendService()
     grpc_service.start()
......
@@ -49,17 +49,17 @@ def create_analyzer_id():
 def create_analyzer():
     _create_analyzer = Analyzer()
     # _create_analyzer.analyzer_id.analyzer_id.uuid = str(uuid.uuid4())
-    _create_analyzer.analyzer_id.analyzer_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666"
+    _create_analyzer.analyzer_id.analyzer_id.uuid = "20540c4f-6797-45e5-af70-6491b49283f9"
     _create_analyzer.algorithm_name = "Test_Aggergate_and_Threshold"
     _create_analyzer.operation_mode = AnalyzerOperationMode.ANALYZEROPERATIONMODE_STREAMING

     _kpi_id = KpiId()
     # input IDs to analyze
     _kpi_id.kpi_id.uuid = str(uuid.uuid4())
-    _kpi_id.kpi_id.uuid = "6e22f180-ba28-4641-b190-2287bf448888"
+    _kpi_id.kpi_id.uuid = "5716c369-932b-4a02-b4c7-6a2e808b92d7"
     _create_analyzer.input_kpi_ids.append(_kpi_id)
     _kpi_id.kpi_id.uuid = str(uuid.uuid4())
-    _kpi_id.kpi_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666"
+    _kpi_id.kpi_id.uuid = "8f70d908-cc48-48de-8664-dc9be2de0089"
     _create_analyzer.input_kpi_ids.append(_kpi_id)
     _kpi_id.kpi_id.uuid = str(uuid.uuid4())
     _create_analyzer.input_kpi_ids.append(_kpi_id)
......
@@ -62,9 +62,9 @@ RUN python3 -m pip install -r requirements.txt
 # Add component files into working directory
 WORKDIR /var/teraflow
-COPY src/analytics/__init__.py analytics/__init__.py
-COPY src/analytics/frontend/. analytics/frontend/
-COPY src/analytics/database/. analytics/database/
+COPY ./src/analytics/__init__.py analytics/__init__.py
+COPY ./src/analytics/frontend/. analytics/frontend/
+COPY ./src/analytics/database/. analytics/database/

 # Start the service
 ENTRYPOINT ["python", "-m", "analytics.frontend.service"]
@@ -64,12 +64,24 @@ RUN python3 -m pip install -r requirements.txt
 WORKDIR /var/teraflow
 COPY src/telemetry/frontend/__init__.py telemetry/frontend/__init__.py
 COPY src/telemetry/frontend/client/. telemetry/frontend/client/
+COPY src/analytics/frontend/client/. analytics/frontend/client/
+COPY src/analytics/frontend/service/. analytics/frontend/service/
+COPY src/analytics/database/. analytics/database/
+COPY src/analytics/frontend/__init__.py analytics/frontend/__init__.py
 COPY src/context/__init__.py context/__init__.py
 COPY src/context/client/. context/client/
+COPY src/kpi_value_api/__init__.py kpi_value_api/__init__.py
+COPY src/kpi_value_api/client/. kpi_value_api/client/
 COPY src/kpi_manager/__init__.py kpi_manager/__init__.py
 COPY src/kpi_manager/client/. kpi_manager/client/
 COPY src/monitoring/__init__.py monitoring/__init__.py
 COPY src/monitoring/client/. monitoring/client/
 COPY src/automation/. automation/

 # Start the service
......
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
apscheduler==3.10.4
confluent-kafka==2.3.*
psycopg2-binary==2.9.*
SQLAlchemy==1.4.*
sqlalchemy-cockroachdb==1.4.*
SQLAlchemy-Utils==0.38.*
@@ -27,11 +27,19 @@ from kpi_manager.client.KpiManagerClient import KpiManagerClient
 from common.proto.context_pb2 import ( Service )
 from common.proto.kpi_manager_pb2 import (KpiId, KpiDescriptor)
+from common.proto.kpi_value_api_pb2 import (KpiAlarms)
 from common.proto.policy_pb2 import PolicyRuleService, PolicyRuleState
 from common.proto.policy_action_pb2 import PolicyRuleAction , PolicyRuleActionConfig
 from common.proto.policy_condition_pb2 import PolicyRuleCondition
 from uuid import uuid4
+import json
+from analytics.frontend.service.AnalyticsFrontendServiceServicerImpl import AnalyticsFrontendServiceServicerImpl
+from analytics.frontend.client.AnalyticsFrontendClient import AnalyticsFrontendClient
+from common.proto.analytics_frontend_pb2 import Analyzer, AnalyzerId
+from kpi_value_api.client.KpiValueApiClient import KpiValueApiClient
 from common.method_wrappers.ServiceExceptions import InvalidArgumentException

 LOGGER = logging.getLogger(__name__)
@@ -49,6 +57,8 @@ class AutomationServiceServicerImpl(AutomationServiceServicer):
             kpi_manager_client = KpiManagerClient()
             policy_client = PolicyClient()
             telemetry_frontend_client = TelemetryFrontendClient()
+            analytics_frontend_client = AnalyticsFrontendClient()
+            analytic_frontend_service = AnalyticsFrontendServiceServicerImpl()

             LOGGER.info('Trying to get the service ')
             LOGGER.info('request.serviceId.service_uuid.uuid({:s})'.format(str(request.serviceId.service_uuid.uuid)))
@@ -98,16 +108,31 @@ class AutomationServiceServicerImpl(AutomationServiceServicer):
             LOGGER.info('kpi_id_rx({:s})'.format(str(kpi_id_rx)))
             ###########################################

-            ####### START Analyzer LAT ################
-            # analyzer = Analyzer()
-            # analyzer.algorithm_name = ''       # static
-            # analyzer.operation_mode = ''
-            # analyzer.input_kpi_ids[] = [kpi_id_rx,kpi_id_tx]
-            # analyzer.output_kpi_ids[] = [kpi_id_lat]
-            #
-            # analyzer_id_lat: AnalyzerId = analyzer_client.StartAnalyzer(analyzer)
-            # LOGGER.info('analyzer_id_lat({:s})'.format(str(analyzer_id_lat)))
-            ###########################################
+            ####### START Collector TX #################
+            collect_tx = Collector()
+            collect_tx.collector_id.collector_id.uuid = str(uuid4())
+            collect_tx.kpi_id.kpi_id.uuid = kpi_id_tx.kpi_id.uuid
+            collect_tx.duration_s = 2000 # static
+            collect_tx.interval_s = 1 # static
+            LOGGER.info('Start Collector TX'.format(str(collect_tx)))
+            collect_id_tx: CollectorId = telemetry_frontend_client.StartCollector(collect_tx)
+            LOGGER.info('collect_id_tx({:s})'.format(str(collect_id_tx)))
+            #############################################
+            ####### START Collector RX ##################
+            collect_rx = Collector()
+            collect_rx.collector_id.collector_id.uuid = str(uuid4())
+            collect_rx.kpi_id.kpi_id.uuid = kpi_id_rx.kpi_id.uuid
+            collect_rx.duration_s = 2000 # static
+            collect_rx.interval_s = 1 # static
+            LOGGER.info('Start Collector RX'.format(str(collect_rx)))
+            collect_id_rx: CollectorId = telemetry_frontend_client.StartCollector(collect_rx)
+            LOGGER.info('collect_id_tx({:s})'.format(str(collect_id_rx)))
+            ###############################################

             ####### SET Policy LAT ################
             policy_lat = PolicyRuleService()
@@ -144,29 +169,35 @@ class AutomationServiceServicerImpl(AutomationServiceServicer):
             LOGGER.info('policy_rule_state({:s})'.format(str(policy_rule_state)))

-            ####### START Collector TX #################
-            collect_tx = Collector()
-            collect_tx.collector_id.collector_id.uuid = str(uuid4())
-            collect_tx.kpi_id.kpi_id.uuid = kpi_id_tx.kpi_id.uuid
-            collect_tx.duration_s = 0 # static
-            collect_tx.interval_s = 1 # static
-            LOGGER.info('Start Collector TX'.format(str(collect_tx)))
-            collect_id_tx: CollectorId = telemetry_frontend_client.StartCollector(collect_tx)
-            LOGGER.info('collect_id_tx({:s})'.format(str(collect_id_tx)))
-            #############################################
-            ####### START Collector RX ##################
-            collect_rx = Collector()
-            collect_rx.collector_id.collector_id.uuid = str(uuid4())
-            collect_rx.kpi_id.kpi_id.uuid = kpi_id_rx.kpi_id.uuid
-            collect_rx.duration_s = 0 # static
-            collect_rx.interval_s = 1 # static
-            LOGGER.info('Start Collector RX'.format(str(collect_rx)))
-            collect_id_rx: CollectorId = telemetry_frontend_client.StartCollector(collect_rx)
-            LOGGER.info('collect_id_tx({:s})'.format(str(collect_id_rx)))
-            ###############################################
+            ####### START Analyzer LAT ################
+            analyzer = Analyzer()
+            analyzer.analyzer_id.analyzer_id.uuid = str(uuid4())
+            analyzer.algorithm_name = 'Test_Aggergate_and_Threshold' # static
+            analyzer.operation_mode = 2
+            analyzer.input_kpi_ids.append(kpi_id_rx)
+            analyzer.input_kpi_ids.append(kpi_id_tx)
+            analyzer.output_kpi_ids.append(kpi_id_lat)
+            _threshold_dict = {'min_latency_E2E': (2, 105)}
+            analyzer.parameters['thresholds'] = json.dumps(_threshold_dict)
+            analyzer.parameters['window_size'] = "60s"
+            analyzer.parameters['window_slider'] = "30s"
+            analyzer_id_lat: AnalyzerId = analytics_frontend_client.StartAnalyzer(analyzer)
+            LOGGER.info('analyzer_id_lat({:s})'.format(str(analyzer_id_lat)))
+            kpi_value_api_client = KpiValueApiClient()
+            stream: KpiAlarms = kpi_value_api_client.GetKpiAlarms(kpi_id_lat.kpi_id.uuid)
+            for response in stream:
+                if response is None:
+                    LOGGER.debug('NO message')
+                else:
+                    LOGGER.debug(str(response))
+            ###########################################
+            # for response in analytic_frontend_service.StartResponseListener( analyzer_id_lat.analyzer_id.uuid):
+            #     LOGGER.info("response.value {:s}",response)

         except grpc.RpcError as e:
             if e.code() != grpc.StatusCode.NOT_FOUND: raise  # pylint: disable=no-member
......
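The analyzer started above carries its thresholds as JSON: {'min_latency_E2E': (2, 105)}. As a reading aid, the sketch below mirrors the ApplyThresholds convention used by the analytics code (fail threshold first, raise threshold second, producing *_THRESHOLD_FALL / *_THRESHOLD_RAISE flags); the pandas frame and the sample latency values are assumptions for illustration, not the component's actual implementation.

# Hedged pandas sketch of the (fail_th, raise_th) thresholds convention.
import json
import pandas as pd

# Round-trip through JSON, as the automation servicer does with analyzer.parameters['thresholds'].
thresholds = json.loads(json.dumps({'min_latency_E2E': (2, 105)}))

def apply_thresholds(aggregated_df, thresholds):
    for col_name, (fail_th, raise_th) in thresholds.items():
        aggregated_df[f"{col_name}_THRESHOLD_FALL"]  = aggregated_df[col_name] < fail_th    # below fail threshold
        aggregated_df[f"{col_name}_THRESHOLD_RAISE"] = aggregated_df[col_name] > raise_th   # above raise threshold
    return aggregated_df

# Sample aggregated latency values (assumed) to show which flags get raised.
df = pd.DataFrame([{'min_latency_E2E': 1.5}, {'min_latency_E2E': 50.0}, {'min_latency_E2E': 120.0}])
print(apply_thresholds(df, thresholds))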
@@ -19,7 +19,8 @@ from common.Settings import get_setting
 LOGGER = logging.getLogger(__name__)

-KFK_SERVER_ADDRESS_TEMPLATE = 'kafka-service.{:s}.svc.cluster.local:{:s}'
+# KFK_SERVER_ADDRESS_TEMPLATE = 'kafka-service.{:s}.svc.cluster.local:{:s}'
+KFK_SERVER_ADDRESS_TEMPLATE = '10.152.183.186'

 class KafkaConfig(Enum):
@@ -29,8 +30,10 @@ class KafkaConfig(Enum):
         if kafka_server_address is None:
             KFK_NAMESPACE = get_setting('KFK_NAMESPACE')
             KFK_PORT = get_setting('KFK_SERVER_PORT')
-            kafka_server_address = KFK_SERVER_ADDRESS_TEMPLATE.format(KFK_NAMESPACE, KFK_PORT)
-        # kafka_server_address = "127.0.0.1:9092"
+            kafka_server_address = KFK_SERVER_ADDRESS_TEMPLATE+':'+KFK_PORT
+        #print("XXXXXXXXXXXXXXXXXXXXXXXXX")
+        print(kafka_server_address)
+        #kafka_server_address = "1"
         return kafka_server_address

     @staticmethod
......
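For context on the Variables.py change above: before this commit the broker address was derived from the in-cluster DNS template plus the KFK_NAMESPACE and KFK_SERVER_PORT settings, whereas now a fixed node IP is used and only the port is appended. The sketch below illustrates the template-based resolution; the use of plain environment variables, the KFK_SERVER_ADDRESS override and the default values are assumptions for illustration, not the actual get_setting-based implementation.

# Hedged sketch of template-based Kafka address resolution (env handling assumed).
import os

KFK_SERVER_ADDRESS_TEMPLATE = 'kafka-service.{:s}.svc.cluster.local:{:s}'

def resolve_kafka_address() -> str:
    address = os.environ.get('KFK_SERVER_ADDRESS')               # explicit override (assumed)
    if address is None:
        namespace = os.environ.get('KFK_NAMESPACE', 'kafka')     # defaults are assumptions
        port      = os.environ.get('KFK_SERVER_PORT', '9092')
        address   = KFK_SERVER_ADDRESS_TEMPLATE.format(namespace, port)
    return address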
@@ -54,7 +54,7 @@
     <maven-resources-plugin.version>3.2.0</maven-resources-plugin.version>
     <maven-surefire-plugin.version>3.0.0-M5</maven-surefire-plugin.version>
     <sonarsource-scanner-plugin.version>3.8.0.2131</sonarsource-scanner-plugin.version>
-    <spotless-plugin.version>2.10.3</spotless-plugin.version>
+    <spotless-plugin.version>2.43.0</spotless-plugin.version>
     <versions-maven-plugin.version>2.8.1</versions-maven-plugin.version>
     <quarkus-maven-plugin.version>3.1.3.Final</quarkus-maven-plugin.version>
     <quarkus-bootstrap-maven-version>2.16.12.Final</quarkus-bootstrap-maven-version>
@@ -136,6 +136,11 @@
       <artifactId>quarkus-resteasy-reactive</artifactId>
     </dependency>
+    <dependency>
+      <groupId>io.quarkus</groupId>
+      <artifactId>quarkus-smallrye-reactive-messaging-kafka</artifactId>
+    </dependency>
     <dependency>
       <groupId>io.quarkus</groupId>
       <artifactId>quarkus-arc</artifactId>
@@ -425,7 +430,7 @@
         </toggleOffOn>
         <googleJavaFormat>
-          <version>1.10.0</version>
+          <version>1.15.0</version>
           <style>GOOGLE</style>
         </googleJavaFormat>
......
@@ -37,6 +37,7 @@ FROM registry.access.redhat.com/ubi8/ubi-minimal:8.4 AS release
 ARG JAVA_PACKAGE=java-11-openjdk-headless
 ARG RUN_JAVA_VERSION=1.3.8
 ENV LANG='en_US.UTF-8' LANGUAGE='en_US:en'
+
 # Install java and the run-java script
 # Also set up permissions for user `1001`
 RUN microdnf install curl ca-certificates ${JAVA_PACKAGE} \
......