
Compare revisions

Changes are shown as if the source revision was being merged into the target revision.

Target project: tfs/controller
Commits on Source (204)
Showing with 590 additions and 84 deletions
@@ -179,5 +179,3 @@ libyang/
# Other logs
**/logs/*.log.*
# PySpark checkpoints
src/analytics/.spark/*
@@ -215,6 +215,9 @@ export GRAF_EXT_PORT_HTTP=${GRAF_EXT_PORT_HTTP:-"3000"}
# Deploy Apache Kafka
./deploy/kafka.sh
# Deploy Monitoring (Prometheus, Mimir, Grafana)
./deploy/monitoring.sh
# Expose Dashboard
./deploy/expose_dashboard.sh
#!/bin/bash
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
set -euo pipefail
# -----------------------------------------------------------
# Global namespace for all deployments
# -----------------------------------------------------------
NAMESPACE="monitoring"
VALUES_FILE_PATH="manifests/monitoring"
# -----------------------------------------------------------
# Prometheus Configuration
# -----------------------------------------------------------
RELEASE_NAME_PROM="mon-prometheus"
CHART_REPO_NAME_PROM="prometheus-community"
CHART_REPO_URL_PROM="https://prometheus-community.github.io/helm-charts"
CHART_NAME_PROM="prometheus"
VALUES_FILE_PROM="$VALUES_FILE_PATH/prometheus_values.yaml"
# -----------------------------------------------------------
# Mimir Configuration
# -----------------------------------------------------------
# RELEASE_NAME_MIMIR="mon-mimir"
# CHART_REPO_NAME_MIMIR="grafana"
# CHART_REPO_URL_MIMIR="https://grafana.github.io/helm-charts"
# CHART_NAME_MIMIR="mimir-distributed"
# VALUES_FILE_MIMIR="$VALUES_FILE_PATH/mimir_values.yaml"
# -----------------------------------------------------------
# Grafana Configuration
# -----------------------------------------------------------
# RELEASE_NAME_GRAFANA="mon-grafana"
# CHART_REPO_NAME_GRAFANA="grafana"
# CHART_REPO_URL_GRAFANA="https://grafana.github.io/helm-charts"
# CHART_NAME_GRAFANA="grafana"
# VALUES_FILE_GRAFANA="$VALUES_FILE_PATH/grafana_values.yaml"
# -----------------------------------------------------------
# Function to deploy or upgrade a Helm chart
# -----------------------------------------------------------
deploy_chart() {
local release_name="$1"
local chart_repo_name="$2"
local chart_repo_url="$3"
local chart_name="$4"
local values_file="$5"
local namespace="$6"
echo ">>> Deploying [${release_name}] from repo [${chart_repo_name}]..."
# Add or update the Helm repo
echo "Adding/updating Helm repo: $chart_repo_name -> $chart_repo_url"
helm repo add "$chart_repo_name" "$chart_repo_url" || true
helm repo update
# Create namespace if needed
echo "Creating namespace '$namespace' if it doesn't exist..."
kubectl get namespace "$namespace" >/dev/null 2>&1 || kubectl create namespace "$namespace"
# Install or upgrade the chart
if [ -n "$values_file" ] && [ -f "$values_file" ]; then
echo "Installing/Upgrading $release_name using custom values from $values_file..."
helm upgrade --install "$release_name" "$chart_repo_name/$chart_name" \
--namespace "$namespace" \
--values "$values_file"
else
echo "Installing/Upgrading $release_name with default chart values..."
helm upgrade --install "$release_name" "$chart_repo_name/$chart_name" \
--namespace "$namespace"
fi
echo "<<< Deployment initiated for [$release_name]."
echo
}
# -----------------------------------------------------------
# Actual Deployments
# -----------------------------------------------------------
# 1) Deploy Prometheus
deploy_chart "$RELEASE_NAME_PROM" \
"$CHART_REPO_NAME_PROM" \
"$CHART_REPO_URL_PROM" \
"$CHART_NAME_PROM" \
"$VALUES_FILE_PROM" \
"$NAMESPACE"
# Optionally wait for Prometheus server pod to become ready
kubectl rollout status deployment/"$RELEASE_NAME_PROM-server" -n "$NAMESPACE" || true
# 2) Deploy Mimir
# deploy_chart "$RELEASE_NAME_MIMIR" \
# "$CHART_REPO_NAME_MIMIR" \
# "$CHART_REPO_URL_MIMIR" \
# "$CHART_NAME_MIMIR" \
# "$VALUES_FILE_MIMIR" \
# "$NAMESPACE"
# Depending on how Mimir runs (StatefulSets, Deployments), you can wait for
# the correct resource to be ready. For example:
# kubectl rollout status statefulset/"$RELEASE_NAME_MIMIR-distributor" -n "$NAMESPACE" || true
# 3) Deploy Grafana
# deploy_chart "$RELEASE_NAME_GRAFANA" \
# "$CHART_REPO_NAME_GRAFANA" \
# "$CHART_REPO_URL_GRAFANA" \
# "$CHART_NAME_GRAFANA" \
# "$VALUES_FILE_GRAFANA" \
# "$NAMESPACE"
# kubectl rollout status deployment/"$RELEASE_NAME_GRAFANA" -n "$NAMESPACE" || true
# -----------------------------------------------------------
echo "All deployments completed!"
@@ -39,6 +39,8 @@ spec:
env:
- name: LOG_LEVEL
value: "INFO"
- name: PUSHGATEWAY_URL
value: "http://mon-prometheus-prometheus-pushgateway.monitoring.svc.cluster.local:9091"
envFrom:
- secretRef:
name: kfk-kpi-data
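For reference, a minimal sketch of how a workload could push a KPI metric to the Pushgateway address injected above, using the prometheus_client package; the metric and job names here are made up for illustration:

import os
from prometheus_client import CollectorRegistry, Gauge, push_to_gateway

registry = CollectorRegistry()
kpi_value = Gauge('tfs_example_kpi_value', 'Example KPI value', registry=registry)
kpi_value.set(42.0)

# PUSHGATEWAY_URL is the env variable set in the Deployment manifest above
push_to_gateway(os.environ['PUSHGATEWAY_URL'], job='tfs_example_job', registry=registry)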
rbac:
create: true
## Use an existing ClusterRole/Role (depending on rbac.namespaced false/true)
# useExistingRole: name-of-some-role
# useExistingClusterRole: name-of-some-clusterRole
pspEnabled: false
pspUseAppArmor: false
namespaced: false
serviceAccount:
create: true
name:
nameTest:
## ServiceAccount labels.
automountServiceAccountToken: false
replicas: 1
## Create a headless service for the deployment
headlessService: false
## Should the service account be auto mounted on the pod
automountServiceAccountToken: true
## Create HorizontalPodAutoscaler object for deployment type
#
autoscaling:
enabled: false
minReplicas: 1
maxReplicas: 3
targetCPU: "60"
targetMemory: ""
behavior: {}
deploymentStrategy:
type: RollingUpdate
readinessProbe:
httpGet:
path: /api/health
port: 3000
livenessProbe:
httpGet:
path: /api/health
port: 3000
initialDelaySeconds: 60
timeoutSeconds: 30
failureThreshold: 10
image:
registry: docker.io
repository: grafana/grafana
# Overrides the Grafana image tag whose default is the chart appVersion
tag: ""
sha: ""
pullPolicy: IfNotPresent
## Optionally specify an array of imagePullSecrets.
## Secrets must be manually created in the namespace.
## ref: https://kubernetes.io/docs/tasks/configure-pod-container/pull-image-private-registry/
## Can be templated.
##
pullSecrets: []
# - myRegistryKeySecretName
testFramework:
enabled: true
## The type of Helm hook used to run this test. Defaults to test.
## ref: https://helm.sh/docs/topics/charts_hooks/#the-available-hooks
##
# hookType: test
image:
# -- The Docker registry
registry: docker.io
repository: bats/bats
tag: "v1.4.1"
imagePullPolicy: IfNotPresent
# dns configuration for pod
dnsPolicy: ~
dnsConfig: {}
# nameservers:
# - 8.8.8.8
# options:
# - name: ndots
# value: "2"
# - name: edns0
securityContext:
runAsNonRoot: true
runAsUser: 472
runAsGroup: 472
fsGroup: 472
containerSecurityContext:
allowPrivilegeEscalation: false
capabilities:
drop:
- ALL
seccompProfile:
type: RuntimeDefault
# Enable creating the grafana configmap
createConfigmap: true
downloadDashboardsImage:
registry: docker.io
repository: curlimages/curl
tag: 8.9.1
sha: ""
pullPolicy: IfNotPresent
downloadDashboards:
env: {}
envFromSecret: ""
resources: {}
securityContext:
allowPrivilegeEscalation: false
capabilities:
drop:
- ALL
seccompProfile:
type: RuntimeDefault
envValueFrom: {}
# ENV_NAME:
# configMapKeyRef:
# name: configmap-name
# key: value_key
## Pod Annotations
# podAnnotations: {}
## ConfigMap Annotations
# configMapAnnotations: {}
# argocd.argoproj.io/sync-options: Replace=true
## Pod Labels
# podLabels: {}
podPortName: grafana
gossipPortName: gossip
## Deployment annotations
# annotations: {}
service:
enabled: true
type: NodePort
port: 80
targetPort: 3000
nodePort: 30080
portName: service
## Enable persistence using Persistent Volume Claims
## ref: https://kubernetes.io/docs/user-guide/persistent-volumes/
##
persistence:
type: pvc
enabled: true
# storageClassName: default
accessModes:
- ReadWriteOnce
size: 10Gi
# annotations: {}
finalizers:
- kubernetes.io/pvc-protection
disableWarning: false
## If 'lookupVolumeName' is set to true, Helm will attempt to retrieve
## the current value of 'spec.volumeName' and incorporate it into the template.
lookupVolumeName: true
# Administrator credentials when not using an existing secret (see below)
adminUser: admin
# adminPassword: strongpassword
# Use an existing secret for the admin user.
admin:
## Name of the secret. Can be templated.
existingSecret: ""
userKey: admin-user
passwordKey: admin-password
## Configure grafana datasources
## ref: http://docs.grafana.org/administration/provisioning/#datasources
##
datasources:
datasources.yaml:
apiVersion: 1
datasources:
- name: Prometheus
type: prometheus
url: http://mon-prometheus-server.monitoring.svc.cluster.local
access: proxy
isDefault: true
- name: Mimir
type: prometheus
url: http://mimir-nginx.mon-mimir.svc:80/prometheus
access: proxy
isDefault: false
## Grafana's primary configuration
## NOTE: values in map will be converted to ini format
## ref: http://docs.grafana.org/installation/configuration/
##
grafana.ini:
paths:
data: /var/lib/grafana/
logs: /var/log/grafana
plugins: /var/lib/grafana/plugins
provisioning: /etc/grafana/provisioning
analytics:
check_for_updates: true
log:
mode: console
grafana_net:
url: https://grafana.net
server:
domain: "{{ if (and .Values.ingress.enabled .Values.ingress.hosts) }}{{ tpl (.Values.ingress.hosts | first) . }}{{ else }}''{{ end }}"
## Number of old ReplicaSets to retain
##
revisionHistoryLimit: 5
# assertNoLeakedSecrets is a helper function defined in _helpers.tpl that checks if secret
# values are not exposed in the rendered grafana.ini configmap. It is enabled by default.
#
# To pass values into grafana.ini without exposing them in a configmap, use variable expansion:
# https://grafana.com/docs/grafana/latest/setup-grafana/configure-grafana/#variable-expansion
#
# Alternatively, if you wish to allow secret values to be exposed in the rendered grafana.ini configmap,
# you can disable this check by setting assertNoLeakedSecrets to false.
assertNoLeakedSecrets: true
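With service.type NodePort and nodePort 30080 as configured above, a quick way to verify that Grafana is up and the provisioned datasources were loaded (node IP and admin password are placeholders):

curl -s http://<node-ip>:30080/api/health
curl -s -u admin:<admin-password> http://<node-ip>:30080/api/datasources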
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Configuration for Prometheus components and server settings
# Global Prometheus configuration
alertmanager:
enabled: false # Default is true
kube-state-metrics:
enabled: false # Default is true
prometheus-node-exporter:
enabled: false # Default is true
prometheus-pushgateway:
enabled: true # Default is true
# Prometheus server-specific configuration
server:
retention: "30d"
logLevel: "debug"
resources:
requests:
cpu: "250m"
memory: "256Mi"
limits:
cpu: "1"
memory: "1Gi"
# Expose the Prometheus server via a Kubernetes service
service:
type: NodePort
nodePort: 30090
extraScrapeConfigs:
- job_name: 'pushgateway'
static_configs:
- targets:
- 'prometheus-pushgateway.monitoring.svc.cluster.local:9091' # Push Gateway endpoint
# Global Prometheus settings:
global:
scrape_interval: 10s
evaluation_interval: 10s
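Likewise, with the Prometheus server exposed on NodePort 30090, its HTTP API can confirm that the extra pushgateway scrape job is active (node IP is a placeholder):

curl -s 'http://<node-ip>:30090/api/v1/targets?state=active'
curl -s 'http://<node-ip>:30090/api/v1/query?query=up'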
@@ -223,6 +223,9 @@ enum DeviceDriverEnum {
DEVICEDRIVER_IETF_ACTN = 10;
DEVICEDRIVER_OC = 11;
DEVICEDRIVER_QKD = 12;
DEVICEDRIVER_IETF_L3VPN = 13;
DEVICEDRIVER_IETF_SLICE = 14;
DEVICEDRIVER_NCE = 15;
}
enum DeviceOperationalStatusEnum {
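For illustration only, the new driver values would be referenced from the generated Python bindings roughly as follows (the common.proto.context_pb2 module path is assumed from the usual TFS layout):

# Hypothetical usage sketch of the new DeviceDriverEnum values
from common.proto.context_pb2 import Device, DeviceDriverEnum

device = Device()
device.device_drivers.append(DeviceDriverEnum.DEVICEDRIVER_IETF_L3VPN)
device.device_drivers.append(DeviceDriverEnum.DEVICEDRIVER_NCE)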
@@ -18,8 +18,10 @@ PROJECTDIR=`pwd`
cd $PROJECTDIR/src
RCFILE=$PROJECTDIR/coverage/.coveragerc
export KFK_SERVER_ADDRESS='127.0.0.1:9092'
CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace crdb -o jsonpath='{.spec.clusterIP}')
export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_analytics?sslmode=require"
python3 -m pytest --log-level=DEBUG --log-cli-level=INFO --verbose \
analytics/frontend/tests/test_frontend.py
@@ -19,6 +19,7 @@ PROJECTDIR=`pwd`
cd $PROJECTDIR/src
export KFK_SERVER_ADDRESS='127.0.0.1:9092'
RCFILE=$PROJECTDIR/coverage/.coveragerc
python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \
kpi_value_writer/tests/test_kpi_value_writer.py
#!/bin/bash
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
PROJECTDIR=`pwd`
cd $PROJECTDIR/src
RCFILE=$PROJECTDIR/coverage/.coveragerc
# Run unit tests and measure code coverage at the same time
# helpful pytest flags: --log-level=INFO -o log_cli=true --verbose --maxfail=1 --durations=0
coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \
nbi/tests/test_slice_2.py
@@ -25,5 +25,5 @@ export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_telemetr
RCFILE=$PROJECTDIR/coverage/.coveragerc
python3 -m pytest --log-level=debug --log-cli-level=debug --verbose \
python3 -m pytest --log-level=INFO --log-cli-level=INFO --verbose \
telemetry/backend/tests/test_backend.py
@@ -18,10 +18,11 @@ PROJECTDIR=`pwd`
cd $PROJECTDIR/src
# python3 kpi_manager/tests/test_unitary.py
export KFK_SERVER_ADDRESS='127.0.0.1:9092'
CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace crdb -o jsonpath='{.spec.clusterIP}')
export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_telemetry?sslmode=require"
RCFILE=$PROJECTDIR/coverage/.coveragerc
python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \
python3 -m pytest --log-level=INFO --log-cli-level=INFO --verbose \
telemetry/frontend/tests/test_frontend.py
@@ -20,7 +20,7 @@ import threading
from common.tools.service.GenericGrpcService import GenericGrpcService
from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
from confluent_kafka import Consumer
from confluent_kafka import KafkaError
from confluent_kafka import KafkaError, KafkaException
from common.Constants import ServiceNameEnum
from common.Settings import get_service_port_grpc
from analytics.backend.service.Streamer import DaskStreamer
@@ -32,7 +32,7 @@ LOGGER = logging.getLogger(__name__)
class AnalyticsBackendService(GenericGrpcService):
"""
AnalyticsBackendService class is responsible for handling the requests from the AnalyticsFrontendService.
It listens to the Kafka topic for the requests and starts/stops the DaskStreamer accordingly.
It listens on the Kafka request topic and starts or stops the Dask Streamer accordingly.
It also initializes the Kafka producer and Dask cluster for the streamer.
"""
def __init__(self, cls_name : str = __name__, n_workers=1, threads_per_worker=1
@@ -62,34 +62,36 @@ class AnalyticsBackendService(GenericGrpcService):
listener for requests on Kafka topic.
"""
LOGGER.info("Request Listener is initiated ...")
# print ("Request Listener is initiated ...")
consumer = self.request_consumer
consumer.subscribe([KafkaTopic.ANALYTICS_REQUEST.value])
while True:
receive_msg = consumer.poll(2.0)
if receive_msg is None:
message = consumer.poll(2.0)
if message is None:
continue
elif receive_msg.error():
if receive_msg.error().code() == KafkaError._PARTITION_EOF:
continue
else:
LOGGER.error("Consumer error: {:}".format(receive_msg.error()))
break
elif message.error():
if message.error().code() == KafkaError._PARTITION_EOF:
LOGGER.warning(f"Consumer reached end of topic {message.topic()}/{message.partition()}")
break
elif message.error().code() == KafkaError.UNKNOWN_TOPIC_OR_PART:
LOGGER.error(f"Subscribed topic {message.topic()} does not exist. May be topic does not have any messages.")
continue
elif message.error():
raise KafkaException(message.error())
try:
analyzer = json.loads(receive_msg.value().decode('utf-8'))
analyzer_uuid = receive_msg.key().decode('utf-8')
analyzer = json.loads(message.value().decode('utf-8'))
analyzer_uuid = message.key().decode('utf-8')
LOGGER.info('Received Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer))
if analyzer["algo_name"] is None and analyzer["oper_mode"] is None:
if self.StopStreamer(analyzer_uuid):
LOGGER.info("Dask Streamer stopped.")
else:
LOGGER.error("Failed to stop Dask Streamer.")
LOGGER.warning("Failed to stop Dask Streamer. May be already terminated...")
else:
if self.StartStreamer(analyzer_uuid, analyzer):
LOGGER.info("Dask Streamer started.")
else:
LOGGER.error("Failed to start Dask Streamer.")
LOGGER.warning("Failed to start Dask Streamer.")
except Exception as e:
LOGGER.warning("Unable to consume message from topic: {:}. ERROR: {:}".format(KafkaTopic.ANALYTICS_REQUEST.value, e))
@@ -104,11 +106,12 @@ class AnalyticsBackendService(GenericGrpcService):
try:
streamer = DaskStreamer(
key = analyzer_uuid,
input_kpis = analyzer['input_kpis' ],
output_kpis = analyzer['output_kpis'],
thresholds = analyzer['thresholds' ],
batch_size = analyzer['batch_size' ],
window_size = analyzer['window_size'],
input_kpis = analyzer['input_kpis' ],
output_kpis = analyzer['output_kpis' ],
thresholds = analyzer['thresholds' ],
batch_size = analyzer['batch_size_min' ],
batch_duration = analyzer['batch_duration_min'],
window_size = analyzer['window_size' ],
cluster_instance = self.cluster,
producer_instance = self.central_producer,
)
@@ -116,14 +119,17 @@ class AnalyticsBackendService(GenericGrpcService):
LOGGER.info(f"Streamer started with analyzer Id: {analyzer_uuid}")
# Stop the streamer after the given duration
if analyzer['duration'] > 0:
duration = analyzer['duration']
if duration > 0:
def stop_after_duration():
time.sleep(analyzer['duration'])
LOGGER.warning(f"Execution duration completed of Analyzer: {analyzer_uuid}")
time.sleep(duration)
LOGGER.warning(f"Execution duration ({duration}) completed of Analyzer: {analyzer_uuid}")
if not self.StopStreamer(analyzer_uuid):
LOGGER.warning("Failed to stop Dask Streamer. Streamer may be already terminated.")
duration_thread = threading.Thread(target=stop_after_duration, daemon=True)
duration_thread = threading.Thread(
target=stop_after_duration, daemon=True, name=f"stop_after_duration_{analyzer_uuid}"
)
duration_thread.start()
self.active_streamers[analyzer_uuid] = streamer
@@ -139,7 +145,7 @@ class AnalyticsBackendService(GenericGrpcService):
try:
if analyzer_uuid not in self.active_streamers:
LOGGER.warning("Dask Streamer not found with the given analyzer_uuid: {:}".format(analyzer_uuid))
return False
return True
LOGGER.info(f"Terminating streamer with Analyzer Id: {analyzer_uuid}")
streamer = self.active_streamers[analyzer_uuid]
streamer.stop()
@@ -147,11 +153,11 @@ class AnalyticsBackendService(GenericGrpcService):
del self.active_streamers[analyzer_uuid]
LOGGER.info(f"Streamer with analyzer_uuid '{analyzer_uuid}' has been trerminated sucessfully.")
return True
except Exception as e:
LOGGER.error("Failed to stop Dask Streamer. ERROR: {:}".format(e))
except Exception:
LOGGER.exception("Failed to stop Dask Streamer.")
return False
def close(self): # TODO: Is this function needed?
def close(self):
"""
Close the producer and cluster cleanly.
"""
@@ -159,11 +165,15 @@ class AnalyticsBackendService(GenericGrpcService):
try:
self.central_producer.flush()
LOGGER.info("Kafka producer flushed and closed.")
except Exception as e:
LOGGER.error(f"Error closing Kafka producer: {e}")
except Exception:
LOGGER.exception("Error closing Kafka producer")
if self.cluster:
try:
self.cluster.close()
LOGGER.info("Dask cluster closed.")
except Exception as e:
LOGGER.error(f"Error closing Dask cluster: {e}")
except Exception:
LOGGER.exception("Error closing Dask cluster")
def stop(self):
self.close()
return super().stop()
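Taken together, the request listener above expects a JSON payload of roughly the following shape on the ANALYTICS_REQUEST topic, keyed by the analyzer UUID (values are illustrative, mirroring the test fixtures later in this diff); a message whose algo_name and oper_mode are both None is treated as a stop request:

analyzer_request = {
    'algo_name'          : 'test_algorithm',
    'oper_mode'          : 'test_mode',
    'input_kpis'         : ['<input-kpi-uuid>'],
    'output_kpis'        : ['<output-kpi-uuid>'],
    'thresholds'         : {},    # aggregation/threshold spec, JSON-encoded by the frontend
    'duration'           : 90,    # seconds; > 0 schedules the stop_after_duration thread
    'batch_size_min'     : 5,
    'batch_duration_min' : 30,
    'window_size'        : None,  # not implemented yet (see DaskStreamer)
}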
@@ -95,6 +95,8 @@ def aggregation_handler(
"sum" : ('kpi_value', 'sum'),
}
results = []
# Process each KPI-specific task parameter
for kpi_index, kpi_id in enumerate(input_kpi_list):
@@ -122,10 +124,13 @@ def aggregation_handler(
agg_df['kpi_id'] = output_kpi_list[kpi_index]
# logger.info(f"4. Applying thresholds for df: {agg_df['kpi_id']}")
result = threshold_handler(key, agg_df, kpi_task_parameters)
record = threshold_handler(key, agg_df, kpi_task_parameters)
return result.to_dict(orient='records')
results.extend(record.to_dict(orient='records'))
else:
logger.warning(f"No data available for KPIs: {kpi_id}. Skipping aggregation.")
continue
return []
if results:
return results
else:
return []
@@ -29,19 +29,21 @@ logger = logging.getLogger(__name__)
class DaskStreamer(threading.Thread):
def __init__(self, key, input_kpis, output_kpis, thresholds,
batch_size = 5,
batch_duration = None,
window_size = None,
cluster_instance = None,
producer_instance = AnalyzerHelper.initialize_kafka_producer()
):
super().__init__()
self.key = key
self.input_kpis = input_kpis
self.output_kpis = output_kpis
self.thresholds = thresholds
self.window_size = window_size
self.batch_size = batch_size
self.running = True
self.batch = []
self.key = key
self.input_kpis = input_kpis
self.output_kpis = output_kpis
self.thresholds = thresholds
self.window_size = window_size # TODO: Not implemented
self.batch_size = batch_size
self.batch_duration = batch_duration
self.running = True
self.batch = []
# Initialize Kafka and Dask components
self.client = AnalyzerHelper.initialize_dask_client(cluster_instance)
@@ -65,13 +67,16 @@ class DaskStreamer(threading.Thread):
if not self.client:
logger.warning("Dask client is not running. Exiting loop.")
break
message = self.consumer.poll(timeout=2.0)
message = self.consumer.poll(timeout=1.0)
if message is None:
# logger.info("No new messages received.")
continue
if message.error():
if message.error().code() == KafkaError._PARTITION_EOF:
logger.warning(f"Consumer reached end of topic {message.topic()}/{message.partition()}")
elif message.error().code() == KafkaError.UNKNOWN_TOPIC_OR_PART:
logger.error(f"Subscribed topic {message.topic()} does not exist. May be topic does not have any messages.")
continue
elif message.error():
raise KafkaException(message.error())
else:
@@ -83,7 +88,7 @@ class DaskStreamer(threading.Thread):
self.batch.append(value)
# Window size has a precedence over batch size
if self.window_size is None:
if self.batch_duration is None:
if len(self.batch) >= self.batch_size: # if batch duration is not set, process based on batch size
logger.info(f"Processing based on batch size {self.batch_size}.")
self.task_handler_selector()
@@ -91,8 +96,8 @@ class DaskStreamer(threading.Thread):
else:
# Process based on batch duration
current_time = time.time()
if (current_time - last_batch_time) >= self.window_size and self.batch:
logger.info(f"Processing based on window size {self.window_size}.")
if (current_time - last_batch_time) >= self.batch_duration and self.batch:
logger.info(f"Processing based on window size {self.batch_duration}.")
self.task_handler_selector()
self.batch = []
last_batch_time = current_time
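The flush logic above is the usual count-or-time batching pattern; stripped to its essentials it looks like this (a standalone sketch, not the service code):

import time

BATCH_SIZE     = 5     # flush once this many records accumulate
BATCH_DURATION = 30    # or, if set, once this many seconds have elapsed

batch, last_flush = [], time.time()

def flush():
    # hand the batch to the task handler, then reset it
    batch.clear()

def on_message(value):
    global last_flush
    batch.append(value)
    if BATCH_DURATION is None:                               # count-based flushing
        if len(batch) >= BATCH_SIZE:
            flush()
    elif batch and time.time() - last_flush >= BATCH_DURATION:
        flush()                                              # time-based flushing
        last_flush = time.time()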
@@ -16,8 +16,11 @@ import logging, signal, sys, threading
from prometheus_client import start_http_server
from common.Settings import get_log_level, get_metrics_port
from .AnalyticsBackendService import AnalyticsBackendService
from common.tools.kafka.Variables import KafkaTopic
terminate = threading.Event()
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
LOGGER = None
def signal_handler(signal, frame): # pylint: disable=redefined-outer-name
@@ -36,6 +39,8 @@ def main():
LOGGER.info('Starting...')
KafkaTopic.create_all_topics()
# Start metrics server
metrics_port = get_metrics_port()
start_http_server(metrics_port)
@@ -32,13 +32,16 @@ def get_thresholds():
}
def get_duration():
return 40
return 90
def get_batch_duration():
return 30
def get_windows_size():
return None
def get_batch_size():
return 10
return 5
def get_interval():
return 5
@@ -97,14 +97,15 @@ def analytics_service(mock_kafka_producer, mock_dask_cluster, mock_dask_client,
@pytest.fixture
def analyzer_data():
return {
'algo_name' : 'test_algorithm',
'oper_mode' : 'test_mode',
'input_kpis' : get_input_kpi_list(),
'output_kpis': get_output_kpi_list(),
'thresholds' : get_thresholds(),
'batch_size' : get_batch_size(),
'window_size': get_windows_size(),
'duration' : get_duration(),
'algo_name' : 'test_algorithm',
'oper_mode' : 'test_mode',
'input_kpis' : get_input_kpi_list(),
'output_kpis' : get_output_kpi_list(),
'thresholds' : get_thresholds(),
'duration' : get_duration(),
'batch_size_min' : get_batch_size(),
'window_size' : get_windows_size(),
'batch_duration_min' : get_duration(),
}
def test_start_streamer(analytics_service, analyzer_data):
@@ -122,7 +123,8 @@ def test_stop_streamer(analytics_service, analyzer_data):
assert analyzer_uuid in analytics_service.active_streamers
# Stop streamer
result = analytics_service.StopStreamer(analyzer_uuid)
with patch('time.sleep', return_value=None):
result = analytics_service.StopStreamer(analyzer_uuid)
assert result is True
assert analyzer_uuid not in analytics_service.active_streamers
@@ -246,7 +248,7 @@ def test_run_with_valid_consumer(dask_streamer):
assert len(dask_streamer.batch) == 0 # Batch should be cleared after processing
mock_task_handler_selector.assert_called_once() # Task handler should be called once
mock_poll.assert_any_call(timeout=2.0) # Poll should have been called at least once
mock_poll.assert_any_call(timeout=1.0) # Poll should have been called at least once
# TODO: add a test covering aggregation_handler and threshold_handler from AnalyzerHandlers.py
def test_aggregation_handler():
@@ -282,20 +284,35 @@ def test_threshold_handler():
###########################
# This is a local machine test to check the integration of the backend service with the Streamer
# --- "test_validate_kafka_topics" should be run before the functionality tests ---
# @pytest.fixture(scope='session')
# def analyticBackend_service():
# logger.info('Initializing AnalyticsBackendService...')
# _service = AnalyticsBackendService()
# _service.start()
# logger.info('Yielding AnalyticsBackendService...')
# yield _service
# logger.info('Terminating AnalyticsBackendService...')
# _service.stop()
# logger.info('Terminated AnalyticsBackendService...')
# # --- "test_validate_kafka_topics" should be run before the functionality tests ---
# def test_validate_kafka_topics():
# logger.debug(" >>> test_validate_kafka_topics: START <<< ")
# response = KafkaTopic.create_all_topics()
# assert isinstance(response, bool)
# def test_backend_integration_with_analyzer():
# backendServiceObject = AnalyticsBackendService()
# backendServiceObject.install_servicers()
# def test_backend_integration_with_frontend(analyticBackend_service: AnalyticsBackendService):
# # backendServiceObject = AnalyticsBackendService()
# # backendServiceObject.install_servicers()
# logger.info(" waiting for 2 minutes for the backend service before termination ... ")
# time.sleep(150)
# time.sleep(300)
# logger.info(" Initiating stop collector ... ")
# status = backendServiceObject.StopStreamer("efef4d95-1cf1-43c4-9742-95c283ddd666")
# backendServiceObject.close()
# status = analyticBackend_service.StopStreamer("efef4d95-1cf1-43c4-9742-95c283ddd666")
# analyticBackend_service.close()
# assert isinstance(status, bool)
# assert status == True
# logger.info(" Backend service terminated successfully ... ")
@@ -46,7 +46,7 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer):
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def StartAnalyzer(self,
request : Analyzer, grpc_context: grpc.ServicerContext # type: ignore
request : Analyzer, context: grpc.ServicerContext # type: ignore
) -> AnalyzerId: # type: ignore
LOGGER.info ("At Service gRPC message: {:}".format(request))
response = AnalyzerId()
@@ -65,14 +65,18 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer):
"""
analyzer_uuid = analyzer_obj.analyzer_id.analyzer_id.uuid
analyzer_to_generate : Dict = {
"algo_name" : analyzer_obj.algorithm_name,
"input_kpis" : [k.kpi_id.uuid for k in analyzer_obj.input_kpi_ids],
"output_kpis" : [k.kpi_id.uuid for k in analyzer_obj.output_kpi_ids],
"oper_mode" : analyzer_obj.operation_mode,
"thresholds" : json.loads(analyzer_obj.parameters["thresholds"]),
"window_size" : analyzer_obj.parameters["window_size"],
"window_slider" : analyzer_obj.parameters["window_slider"],
# "store_aggregate" : analyzer_obj.parameters["store_aggregate"]
"algo_name" : analyzer_obj.algorithm_name,
"input_kpis" : [k.kpi_id.uuid for k in analyzer_obj.input_kpi_ids],
"output_kpis" : [k.kpi_id.uuid for k in analyzer_obj.output_kpi_ids],
"oper_mode" : analyzer_obj.operation_mode,
"duration" : analyzer_obj.duration_s,
"thresholds" : json.loads(analyzer_obj.parameters["thresholds"]),
"window_size" : analyzer_obj.parameters["window_size"], # slider window size in seconds (single batch execution time)
"window_slider" : analyzer_obj.parameters["window_slider"], # slider shift in seconds
"batch_size_min" : analyzer_obj.batch_min_size, # currently implemented
"batch_size_max" : analyzer_obj.batch_max_size,
"batch_duration_min" : analyzer_obj.batch_min_duration_s, # currently implemented
"batch_interval_max" : analyzer_obj.batch_max_duration_s
}
self.kafka_producer.produce(
KafkaTopic.ANALYTICS_REQUEST.value,
@@ -137,7 +141,7 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer):
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def StopAnalyzer(self,
request : AnalyzerId, grpc_context: grpc.ServicerContext # type: ignore
request : AnalyzerId, context: grpc.ServicerContext # type: ignore
) -> Empty: # type: ignore
LOGGER.info ("At Service gRPC message: {:}".format(request))
try:
@@ -181,7 +185,7 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer):
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def SelectAnalyzers(self,
filter : AnalyzerFilter, contextgrpc_context: grpc.ServicerContext # type: ignore
filter : AnalyzerFilter, context: grpc.ServicerContext # type: ignore
) -> AnalyzerList: # type: ignore
LOGGER.info("At Service gRPC message: {:}".format(filter))
response = AnalyzerList()
@@ -202,7 +206,5 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer):
def delivery_callback(self, err, msg):
if err:
LOGGER.debug('Message delivery failed: {:}'.format(err))
# print ('Message delivery failed: {:}'.format(err))
else:
LOGGER.debug('Message delivered to topic {:}'.format(msg.topic()))
# print('Message delivered to topic {:}'.format(msg.topic()))
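For context, the produce/flush pattern used by this servicer maps onto confluent_kafka roughly as follows (broker address and topic name are placeholders; the UUID is the example key used in the backend tests):

import json
from confluent_kafka import Producer

def delivery_callback(err, msg):
    if err:
        print(f'Message delivery failed: {err}')
    else:
        print(f'Message delivered to topic {msg.topic()}')

producer = Producer({'bootstrap.servers': '127.0.0.1:9092'})
analyzer_uuid = 'efef4d95-1cf1-43c4-9742-95c283ddd666'
payload = {'algo_name': 'test_algorithm', 'oper_mode': 'test_mode'}

producer.produce(
    'analytics_request',   # KafkaTopic.ANALYTICS_REQUEST.value in the service
    key=analyzer_uuid.encode('utf-8'),
    value=json.dumps(payload).encode('utf-8'),
    callback=delivery_callback,
)
producer.flush()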
@@ -18,8 +18,11 @@ from common.Settings import get_log_level, get_metrics_port
from .AnalyticsFrontendService import AnalyticsFrontendService
from analytics.database.AnalyzerModel import Analyzer as Model
from common.tools.database.GenericDatabase import Database
from common.tools.kafka.Variables import KafkaTopic
terminate = threading.Event()
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
LOGGER = None
def signal_handler(signal, frame): # pylint: disable=redefined-outer-name
@@ -43,6 +46,8 @@ def main():
kpiDBobj.create_database()
kpiDBobj.create_tables()
KafkaTopic.create_all_topics()
# Start metrics server
metrics_port = get_metrics_port()
start_http_server(metrics_port)
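Once the frontend is running, the metrics endpoint opened by start_http_server can be spot-checked with curl (9192 stands in for whatever get_metrics_port() returns):

curl -s http://127.0.0.1:9192/metrics | head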