Skip to content
Snippets Groups Projects
Commit 2b9013e1 authored by Konstantinos Poulakakis's avatar Konstantinos Poulakakis
Browse files

Merge branch...

Merge branch 'feat/194-unable-to-correctly-extract-the-aggregation-function-names-from-the-dictionary-received-as' into feat/159-automation-component-skeleton
parents 9a254805 ad695f99
No related branches found
No related tags found
2 merge requests!294Release TeraFlowSDN 4.0,!238Automation component skeleton
Showing
with 258 additions and 257 deletions
......@@ -20,7 +20,7 @@ import "kpi_manager.proto";
//import "kpi_sample_types.proto";
service AnalyticsFrontendService {
rpc StartAnalyzer (Analyzer ) returns (AnalyzerId ) {}
rpc StartAnalyzer (Analyzer ) returns (AnalyzerId) {}
rpc StopAnalyzer (AnalyzerId ) returns (context.Empty) {}
rpc SelectAnalyzers(AnalyzerFilter) returns (AnalyzerList ) {}
}
......
......@@ -19,8 +19,9 @@ import "context.proto";
import "kpi_manager.proto";
service KpiValueAPIService {
rpc StoreKpiValues (KpiValueList) returns (context.Empty) {}
rpc SelectKpiValues (KpiValueFilter) returns (KpiValueList) {}
rpc StoreKpiValues (KpiValueList) returns (context.Empty) {}
rpc SelectKpiValues (KpiValueFilter) returns (KpiValueList) {}
rpc GetKpiAlarms (kpi_manager.KpiId) returns (stream KpiAlarms) {}
}
message KpiValue {
......@@ -50,3 +51,10 @@ message KpiValueFilter {
repeated context.Timestamp start_timestamp = 2;
repeated context.Timestamp end_timestamp = 3;
}
message KpiAlarms {
context.Timestamp start_timestamp = 1;
context.Timestamp end_timestamp = 2;
kpi_manager.KpiId kpi_id = 3;
map<string, bool> alarms = 4;
}
......@@ -78,20 +78,17 @@ class AnalyticsBackendService(GenericGrpcService):
LOGGER.error("Failed to terminate analytics backend {:}".format(e))
return False
def install_services(self):
stop_event = threading.Event()
thread = threading.Thread(target=self.RequestListener,
args=(stop_event,) )
thread.start()
return (thread, stop_event)
def install_servicers(self):
threading.Thread(target=self.RequestListener, args=()).start()
def RequestListener(self, stop_event):
def RequestListener(self):
"""
listener for requests on Kafka topic.
"""
LOGGER.info("Request Listener is initiated ...")
consumer = self.kafka_consumer
consumer.subscribe([KafkaTopic.ANALYTICS_REQUEST.value])
while not stop_event.is_set():
while True:
receive_msg = consumer.poll(2.0)
if receive_msg is None:
continue
......@@ -101,7 +98,7 @@ class AnalyticsBackendService(GenericGrpcService):
else:
print("Consumer error: {}".format(receive_msg.error()))
break
analyzer = json.loads(receive_msg.value().decode('utf-8'))
analyzer = json.loads(receive_msg.value().decode('utf-8'))
analyzer_uuid = receive_msg.key().decode('utf-8')
LOGGER.debug('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer))
print ('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer))
......
......@@ -33,7 +33,7 @@ def SettingKafkaConsumerParams(): # TODO: create get_kafka_consumer() in comm
return {
# "kafka.bootstrap.servers": '127.0.0.1:9092',
"kafka.bootstrap.servers": KafkaConfig.get_kafka_address(),
"subscribe" : KafkaTopic.VALUE.value,
"subscribe" : KafkaTopic.VALUE.value, # topic should have atleast one message before spark session
"startingOffsets" : 'latest',
"failOnDataLoss" : 'false' # Optional: Set to "true" to fail the query on data loss
}
......@@ -64,7 +64,7 @@ def ApplyThresholds(aggregated_df, thresholds):
for col_name, (fail_th, raise_th) in thresholds.items():
# Apply TH-Fail condition (if column value is less than the fail threshold)
aggregated_df = aggregated_df.withColumn(
f"{col_name}_THRESHOLD_FAIL",
f"{col_name}_THRESHOLD_FALL",
when(col(col_name) < fail_th, True).otherwise(False)
)
# Apply TH-RAISE condition (if column value is greater than the raise threshold)
......@@ -128,11 +128,11 @@ def SparkStreamer(key, kpi_list, oper_list, thresholds, stop_event,
# --- This will write output to Kafka: ACTUAL IMPLEMENTATION
query = thresholded_stream_data \
.selectExpr(f"'{key}' AS key", "to_json(struct(*)) AS value") \
.selectExpr(f"CAST(kpi_id AS STRING) AS key", "to_json(struct(*)) AS value") \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", KafkaConfig.get_kafka_address()) \
.option("topic", KafkaTopic.ANALYTICS_RESPONSE.value) \
.option("topic", KafkaTopic.ALARMS.value) \
.option("checkpointLocation", "analytics/.spark/checkpoint") \
.outputMode("update")
......
......@@ -12,6 +12,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import uuid
import json
from common.proto.kpi_manager_pb2 import KpiId
from common.proto.analytics_frontend_pb2 import ( AnalyzerOperationMode,
Analyzer )
def get_kpi_id_list():
return ["6e22f180-ba28-4641-b190-2287bf448888", "1e22f180-ba28-4641-b190-2287bf446666"]
......@@ -32,3 +37,37 @@ def get_threshold_dict():
return {
op + '_value': threshold_dict[op+'_value'] for op in get_operation_list() if op + '_value' in threshold_dict
}
def create_analyzer():
_create_analyzer = Analyzer()
# _create_analyzer.analyzer_id.analyzer_id.uuid = str(uuid.uuid4())
_create_analyzer.analyzer_id.analyzer_id.uuid = "efef4d95-1cf1-43c4-9742-95c283ddd7a6"
_create_analyzer.algorithm_name = "Test_Aggergate_and_Threshold"
_create_analyzer.operation_mode = AnalyzerOperationMode.ANALYZEROPERATIONMODE_STREAMING
_kpi_id = KpiId()
# input IDs to analyze
_kpi_id.kpi_id.uuid = str(uuid.uuid4())
_kpi_id.kpi_id.uuid = "6e22f180-ba28-4641-b190-2287bf448888"
_create_analyzer.input_kpi_ids.append(_kpi_id)
_kpi_id.kpi_id.uuid = str(uuid.uuid4())
_kpi_id.kpi_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666"
_create_analyzer.input_kpi_ids.append(_kpi_id)
_kpi_id.kpi_id.uuid = str(uuid.uuid4())
_create_analyzer.input_kpi_ids.append(_kpi_id)
# output IDs after analysis
_kpi_id.kpi_id.uuid = str(uuid.uuid4())
_create_analyzer.output_kpi_ids.append(_kpi_id)
_kpi_id.kpi_id.uuid = str(uuid.uuid4())
_create_analyzer.output_kpi_ids.append(_kpi_id)
# parameter
_threshold_dict = {
# 'avg_value' :(20, 30), 'min_value' :(00, 10), 'max_value' :(45, 50),
'first_value' :(00, 10), 'last_value' :(40, 50), 'stdev_value':(00, 10)}
_create_analyzer.parameters['thresholds'] = json.dumps(_threshold_dict)
_create_analyzer.parameters['window_size'] = "60 seconds" # Such as "10 seconds", "2 minutes", "3 hours", "4 days" or "5 weeks"
_create_analyzer.parameters['window_slider'] = "30 seconds" # should be less than window size
_create_analyzer.parameters['store_aggregate'] = str(False) # TRUE to store. No implemented yet
return _create_analyzer
\ No newline at end of file
......@@ -12,12 +12,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import time
import time, json
from typing import Dict
import logging
import threading
from common.tools.kafka.Variables import KafkaTopic
from analytics.backend.service.AnalyticsBackendService import AnalyticsBackendService
from analytics.backend.tests.messages import get_kpi_id_list, get_operation_list, get_threshold_dict
from .messages import create_analyzer
LOGGER = logging.getLogger(__name__)
......@@ -32,6 +34,24 @@ def test_validate_kafka_topics():
response = KafkaTopic.create_all_topics()
assert isinstance(response, bool)
def test_StartSparkStreamer():
LOGGER.debug(" >>> test_StartSparkStreamer: START <<< ")
analyzer_obj = create_analyzer()
analyzer_uuid = analyzer_obj.analyzer_id.analyzer_id.uuid
analyzer_to_generate : Dict = {
"algo_name" : analyzer_obj.algorithm_name,
"input_kpis" : [k.kpi_id.uuid for k in analyzer_obj.input_kpi_ids],
"output_kpis" : [k.kpi_id.uuid for k in analyzer_obj.output_kpi_ids],
"oper_mode" : analyzer_obj.operation_mode,
"thresholds" : json.loads(analyzer_obj.parameters["thresholds"]),
"window_size" : analyzer_obj.parameters["window_size"],
"window_slider" : analyzer_obj.parameters["window_slider"],
# "store_aggregate" : analyzer_obj.parameters["store_aggregate"]
}
AnalyticsBackendServiceObj = AnalyticsBackendService()
response = AnalyticsBackendServiceObj.StartSparkStreamer(analyzer_uuid, analyzer_to_generate)
assert isinstance(response, bool)
# def test_StartRequestListener():
# LOGGER.info('test_RunRequestListener')
# AnalyticsBackendServiceObj = AnalyticsBackendService()
......
......@@ -12,13 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging, grpc, json, queue
import logging, grpc, json
from typing import Dict
from confluent_kafka import Consumer as KafkaConsumer
from confluent_kafka import Producer as KafkaProducer
from confluent_kafka import KafkaError
from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
from common.proto.context_pb2 import Empty
......@@ -27,8 +24,7 @@ from common.proto.analytics_frontend_pb2 import Analyzer, AnalyzerId, Analy
from common.proto.analytics_frontend_pb2_grpc import AnalyticsFrontendServiceServicer
from analytics.database.Analyzer_DB import AnalyzerDB
from analytics.database.AnalyzerModel import Analyzer as AnalyzerModel
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger
LOGGER = logging.getLogger(__name__)
METRICS_POOL = MetricsPool('AnalyticsFrontend', 'NBIgRPC')
......@@ -36,19 +32,13 @@ METRICS_POOL = MetricsPool('AnalyticsFrontend', 'NBIgRPC')
class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer):
def __init__(self):
LOGGER.info('Init AnalyticsFrontendService')
self.listener_topic = KafkaTopic.ANALYTICS_RESPONSE.value
self.db_obj = AnalyzerDB()
self.result_queue = queue.Queue()
self.scheduler = BackgroundScheduler()
self.kafka_producer = KafkaProducer({'bootstrap.servers' : KafkaConfig.get_kafka_address()})
self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(),
'group.id' : 'analytics-frontend',
'auto.offset.reset' : 'latest'})
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def StartAnalyzer(self,
request : Analyzer, grpc_context: grpc.ServicerContext # type: ignore
) -> AnalyzerId: # type: ignore
) -> AnalyzerAlarms: # type: ignore
LOGGER.info ("At Service gRPC message: {:}".format(request))
response = AnalyzerId()
......@@ -56,7 +46,6 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer):
AnalyzerModel.ConvertAnalyzerToRow(request)
)
self.PublishStartRequestOnKafka(request)
response.analyzer_id.uuid = request.analyzer_id.analyzer_id.uuid
return response
......@@ -83,63 +72,6 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer):
)
LOGGER.info("Analyzer Start Request Generated: Analyzer Id: {:}, Value: {:}".format(analyzer_uuid, analyzer_to_generate))
self.kafka_producer.flush()
# self.StartResponseListener(analyzer_uuid)
def StartResponseListener(self, filter_key=None):
"""
Start the Kafka response listener with APScheduler and return key-value pairs periodically.
"""
LOGGER.info("Starting StartResponseListener")
# Schedule the ResponseListener at fixed intervals
self.scheduler.add_job(
self.response_listener,
trigger=IntervalTrigger(seconds=5),
args=[filter_key],
id=f"response_listener_{self.listener_topic}",
replace_existing=True
)
self.scheduler.start()
LOGGER.info(f"Started Kafka listener for topic {self.listener_topic}...")
try:
while True:
LOGGER.info("entering while...")
key, value = self.result_queue.get() # Wait until a result is available
LOGGER.info("In while true ...")
yield key, value # Yield the result to the calling function
except KeyboardInterrupt:
LOGGER.warning("Listener stopped manually.")
finally:
self.StopListener()
def response_listener(self, filter_key=None):
"""
Poll Kafka messages and put key-value pairs into the queue.
"""
LOGGER.info(f"Polling Kafka topic {self.listener_topic}...")
consumer = self.kafka_consumer
consumer.subscribe([self.listener_topic])
msg = consumer.poll(2.0)
if msg is None:
return
elif msg.error():
if msg.error().code() != KafkaError._PARTITION_EOF:
LOGGER.error(f"Kafka error: {msg.error()}")
return
try:
key = msg.key().decode('utf-8') if msg.key() else None
if filter_key is not None and key == filter_key:
value = json.loads(msg.value().decode('utf-8'))
LOGGER.info(f"Received key: {key}, value: {value}")
self.result_queue.put((key, value))
else:
LOGGER.info(f"Skipping message with unmatched key: {key}")
# value = json.loads(msg.value().decode('utf-8')) # Added for debugging
# self.result_queue.put((filter_key, value)) # Added for debugging
except Exception as e:
LOGGER.error(f"Error processing Kafka message: {e}")
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def StopAnalyzer(self,
......@@ -175,15 +107,6 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer):
)
LOGGER.info("Analyzer Stop Request Generated: Analyzer Id: {:}".format(analyzer_uuid))
self.kafka_producer.flush()
self.StopListener()
def StopListener(self):
"""
Gracefully stop the Kafka listener and the scheduler.
"""
LOGGER.info("Stopping Kafka listener...")
self.scheduler.shutdown()
LOGGER.info("Kafka listener stopped.")
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def SelectAnalyzers(self,
......@@ -203,12 +126,11 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer):
LOGGER.info('Unable to process filter response {:}'.format(e))
except Exception as e:
LOGGER.error('Unable to apply filter on table {:}. ERROR: {:}'.format(AnalyzerModel.__name__, e))
def delivery_callback(self, err, msg):
if err:
LOGGER.debug('Message delivery failed: {:}'.format(err))
print ('Message delivery failed: {:}'.format(err))
# else:
# LOGGER.debug('Message delivered to topic {:}'.format(msg.topic()))
# print('Message delivered to topic {:}'.format(msg.topic()))
else:
LOGGER.debug('Message delivered to topic {:}'.format(msg.topic()))
print('Message delivered to topic {:}'.format(msg.topic()))
......@@ -25,7 +25,8 @@ from common.Settings import ( get_service_port_grpc, get_env_var_name,
ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC )
from common.tools.kafka.Variables import KafkaTopic
from common.proto.analytics_frontend_pb2 import AnalyzerId, AnalyzerList
from common.proto.kpi_value_api_pb2 import KpiValue
from common.proto.analytics_frontend_pb2 import AnalyzerAlarms
from analytics.frontend.client.AnalyticsFrontendClient import AnalyticsFrontendClient
from analytics.frontend.service.AnalyticsFrontendService import AnalyticsFrontendService
from analytics.frontend.tests.messages import ( create_analyzer_id, create_analyzer,
......@@ -89,12 +90,13 @@ def test_validate_kafka_topics():
response = KafkaTopic.create_all_topics()
assert isinstance(response, bool)
# ----- core funtionality test -----
# def test_StartAnalytics(analyticsFrontend_client):
# LOGGER.info(' >>> test_StartAnalytic START: <<< ')
# response = analyticsFrontend_client.StartAnalyzer(create_analyzer())
# LOGGER.debug(str(response))
# assert isinstance(response, AnalyzerId)
# # ----- core funtionality test -----
def test_StartAnalytics(analyticsFrontend_client):
LOGGER.info(' >>> test_StartAnalytic START: <<< ')
stream = analyticsFrontend_client.StartAnalyzer(create_analyzer())
for response in stream:
LOGGER.debug(str(response))
assert isinstance(response, KpiValue)
# To test start and stop listener together
def test_StartStopAnalyzers(analyticsFrontend_client):
......@@ -131,4 +133,4 @@ def test_StartStopAnalyzers(analyticsFrontend_client):
# class_obj = AnalyticsFrontendServiceServicerImpl()
# for response in class_obj.StartResponseListener(analyzer_id.analyzer_id.uuid):
# LOGGER.debug(response)
# assert isinstance(response, tuple)
\ No newline at end of file
# assert isinstance(response, tuple)
......@@ -15,17 +15,18 @@
import grpc, logging
from common.Constants import ServiceNameEnum
from common.Settings import get_service_host, get_service_port_grpc
from common.tools.client.RetryDecorator import retry, delay_exponential
from common.tools.grpc.Tools import grpc_message_to_json_string
from common.Settings import get_service_host, get_service_port_grpc
from common.proto.context_pb2 import Empty
from common.proto.kpi_value_api_pb2 import KpiValueList, KpiValueFilter
from common.tools.client.RetryDecorator import retry, delay_exponential
from common.tools.grpc.Tools import grpc_message_to_json_string
from common.proto.context_pb2 import Empty
from common.proto.kpi_manager_pb2 import KpiId
from common.proto.kpi_value_api_pb2 import KpiValueList, KpiValueFilter, KpiAlarms
from common.proto.kpi_value_api_pb2_grpc import KpiValueAPIServiceStub
LOGGER = logging.getLogger(__name__)
MAX_RETRIES = 10
DELAY_FUNCTION = delay_exponential(initial=0.01, increment=2.0, maximum=5.0)
LOGGER = logging.getLogger(__name__)
MAX_RETRIES = 10
DELAY_FUNCTION = delay_exponential(initial=0.01, increment=2.0, maximum=5.0)
RETRY_DECORATOR = retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, prepare_method_name='connect')
class KpiValueApiClient:
......@@ -34,8 +35,8 @@ class KpiValueApiClient:
if not port: port = get_service_port_grpc(ServiceNameEnum.KPIVALUEAPI)
self.endpoint = '{:s}:{:s}'.format(str(host), str(port))
LOGGER.debug('Creating channel to {:s}...'.format(str(self.endpoint)))
self.channel = None
self.stub = None
self.channel = None
self.stub = None
self.connect()
LOGGER.debug('Channel created')
......@@ -46,18 +47,25 @@ class KpiValueApiClient:
def close(self):
if self.channel is not None: self.channel.close()
self.channel = None
self.stub = None
self.stub = None
@RETRY_DECORATOR
def StoreKpiValues(self, request: KpiValueList) -> Empty:
def StoreKpiValues(self, request: KpiValueList) -> Empty: # type: ignore
LOGGER.debug('StoreKpiValues: {:s}'.format(grpc_message_to_json_string(request)))
response = self.stub.StoreKpiValues(request)
LOGGER.debug('StoreKpiValues result: {:s}'.format(grpc_message_to_json_string(response)))
return response
@RETRY_DECORATOR
def SelectKpiValues(self, request: KpiValueFilter) -> KpiValueList:
def SelectKpiValues(self, request: KpiValueFilter) -> KpiValueList: # type: ignore
LOGGER.debug('SelectKpiValues: {:s}'.format(grpc_message_to_json_string(request)))
response = self.stub.SelectKpiValues(request)
LOGGER.debug('SelectKpiValues result: {:s}'.format(grpc_message_to_json_string(response)))
return response
@RETRY_DECORATOR
def GetKpiAlarms(self, request: KpiId) -> KpiAlarms: # type: ignore
LOGGER.debug('GetKpiAlarms: {:s}'.format(grpc_message_to_json_string(request)))
response = self.stub.GetKpiAlarms(request)
LOGGER.debug('GetKpiAlarms result: {:s}'.format(grpc_message_to_json_string(response)))
return response
......@@ -12,18 +12,22 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging, grpc, json
from datetime import datetime
import logging, grpc, json, queue
from typing import Dict
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
from confluent_kafka import KafkaError
from common.proto.context_pb2 import Empty
from common.proto.kpi_sample_types_pb2 import KpiSampleType
from common.proto.kpi_manager_pb2 import KpiDescriptor, KpiId
from common.proto.kpi_value_api_pb2_grpc import KpiValueAPIServiceServicer
from common.proto.kpi_value_api_pb2 import KpiValueList, KpiValueFilter, KpiValue, KpiValueType
from common.proto.kpi_value_api_pb2 import KpiAlarms, KpiValueList, KpiValueFilter, KpiValue, KpiValueType
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger
from confluent_kafka import Producer as KafkaProducer
from confluent_kafka import Consumer as KafkaConsumer
from prometheus_api_client import PrometheusConnect
from prometheus_api_client.utils import parse_datetime
......@@ -37,8 +41,14 @@ PROM_URL = "http://prometheus-k8s.monitoring.svc.cluster.local:9090" # TO
class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer):
def __init__(self):
LOGGER.debug('Init KpiValueApiService')
self.listener_topic = KafkaTopic.ALARMS.value
self.result_queue = queue.Queue()
self.scheduler = BackgroundScheduler()
self.kafka_producer = KafkaProducer({'bootstrap.servers' : KafkaConfig.get_kafka_address()})
self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(),
'group.id' : 'analytics-frontend',
'auto.offset.reset' : 'latest'})
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def StoreKpiValues(self, request: KpiValueList, grpc_context: grpc.ServicerContext
) -> Empty:
......@@ -109,17 +119,14 @@ class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer):
kpi_value = KpiValue()
kpi_value.kpi_id.kpi_id = record['metric']['__name__'],
kpi_value.timestamp = value[0],
kpi_value.kpi_value_type = self.ConverValueToKpiValueType(value[1])
kpi_value.kpi_value_type.CopyFrom(self.ConverValueToKpiValueType(value['kpi_value']))
response.kpi_value_list.append(kpi_value)
return response
def GetKpiSampleType(self, kpi_value: str, kpi_manager_client):
print("--- START -----")
kpi_id = KpiId()
kpi_id.kpi_id.uuid = kpi_value.kpi_id.kpi_id.uuid
# print("KpiId generated: {:}".format(kpi_id))
try:
kpi_descriptor_object = KpiDescriptor()
kpi_descriptor_object = kpi_manager_client.GetKpiDescriptor(kpi_id)
......@@ -135,26 +142,91 @@ class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer):
LOGGER.info("Unable to get KpiDescriptor. Error: {:}".format(e))
print ("Unable to get KpiDescriptor. Error: {:}".format(e))
def ConverValueToKpiValueType(self, value):
# Check if the value is an integer (int64)
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def GetKpiAlarms(self, request: KpiId, grpc_context: grpc.ServicerContext) -> KpiAlarms: # type: ignore
"""
Get Alarms from Kafka return Alrams periodically.
"""
LOGGER.debug('GetKpiAlarms: {:}'.format(request))
response = KpiAlarms()
for alarm_key, value in self.StartResponseListener(request.kpi_id.uuid):
response.start_timestamp.timestamp = datetime.strptime(
value["window"]["start"], "%Y-%m-%dT%H:%M:%S.%fZ").timestamp()
response.end_timestamp.timestamp = datetime.strptime(
value["window"]["end"], "%Y-%m-%dT%H:%M:%S.%fZ").timestamp()
response.kpi_id.kpi_id.uuid = value['kpi_id']
for key, threshold in value.items():
if "THRESHOLD_" in key:
response.alarms[key] = threshold
yield response
def StartResponseListener(self, filter_key=None):
"""
Start the Kafka response listener with APScheduler and return key-value pairs periodically.
"""
LOGGER.info("Starting StartResponseListener")
# Schedule the ResponseListener at fixed intervals
self.scheduler.add_job(
self.response_listener,
trigger=IntervalTrigger(seconds=5),
args=[filter_key],
id=f"response_listener_{self.listener_topic}",
replace_existing=True
)
self.scheduler.start()
LOGGER.info(f"Started Kafka listener for topic {self.listener_topic}...")
try:
int_value = int(value)
return KpiValueType(int64Val=int_value)
except (ValueError, TypeError):
pass
# Check if the value is a float
while True:
LOGGER.info("entering while...")
key, value = self.result_queue.get() # Wait until a result is available
LOGGER.info("In while true ...")
yield key, value # Yield the result to the calling function
except KeyboardInterrupt:
LOGGER.warning("Listener stopped manually.")
finally:
self.StopListener()
def response_listener(self, filter_key=None):
"""
Poll Kafka messages and put key-value pairs into the queue.
"""
LOGGER.info(f"Polling Kafka topic {self.listener_topic}...")
consumer = self.kafka_consumer
consumer.subscribe([self.listener_topic])
msg = consumer.poll(2.0)
if msg is None:
return
elif msg.error():
if msg.error().code() != KafkaError._PARTITION_EOF:
LOGGER.error(f"Kafka error: {msg.error()}")
return
try:
float_value = float(value)
return KpiValueType(floatVal=float_value)
except (ValueError, TypeError):
pass
# Check if the value is a boolean
if value.lower() in ['true', 'false']:
bool_value = value.lower() == 'true'
return KpiValueType(boolVal=bool_value)
# If none of the above, treat it as a string
return KpiValueType(stringVal=value)
key = msg.key().decode('utf-8') if msg.key() else None
if filter_key is not None and key == filter_key:
value = json.loads(msg.value().decode('utf-8'))
LOGGER.info(f"Received key: {key}, value: {value}")
self.result_queue.put((key, value))
else:
LOGGER.warning(f"Skipping message with unmatched key: {key} - {filter_key}")
except Exception as e:
LOGGER.error(f"Error processing Kafka message: {e}")
def delivery_callback(self, err, msg):
if err: LOGGER.debug('Message delivery failed: {:}'.format(err))
else: LOGGER.debug('Message delivered to topic {:}'.format(msg.topic()))
def ConverValueToKpiValueType(self, value):
kpi_value_type = KpiValueType()
if isinstance(value, int):
kpi_value_type.int32Val = value
elif isinstance(value, float):
kpi_value_type.floatVal = value
elif isinstance(value, str):
kpi_value_type.stringVal = value
elif isinstance(value, bool):
kpi_value_type.boolVal = value
# Add other checks for different types as needed
return kpi_value_type
......@@ -13,9 +13,16 @@
# limitations under the License.
import uuid, time
from common.proto import kpi_manager_pb2
from common.proto.kpi_value_api_pb2 import KpiValue, KpiValueList
def create_kpi_id_request():
_create_kpi_id = kpi_manager_pb2.KpiId()
_create_kpi_id.kpi_id.uuid = "6e22f180-ba28-4641-b190-2287bf448888"
# _create_kpi_id.kpi_id.uuid = str(uuid.uuid4())
return _create_kpi_id
def create_kpi_value_list():
_create_kpi_value_list = KpiValueList()
# To run this experiment sucessfully, add an existing UUID of a KPI Descriptor from the KPI DB.
......
......@@ -21,8 +21,8 @@ from common.Settings import (
ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name, get_service_port_grpc)
from kpi_value_api.service.KpiValueApiService import KpiValueApiService
from kpi_value_api.client.KpiValueApiClient import KpiValueApiClient
from kpi_value_api.tests.messages import create_kpi_value_list
from kpi_value_api.tests.messages import create_kpi_value_list, create_kpi_id_request
from common.proto.kpi_value_api_pb2 import KpiAlarms
LOCAL_HOST = '127.0.0.1'
KPIVALUEAPI_SERVICE_PORT = get_service_port_grpc(ServiceNameEnum.KPIVALUEAPI) # type: ignore
......
......@@ -83,7 +83,7 @@ class TelemetryBackendService(GenericGrpcService):
thread.join()
print ("Terminating backend (by StopCollector): Collector Id: ", collector_id)
del self.running_threads[collector_id]
self.GenerateCollectorResponse(collector_id, "-1", -1) # Termination confirmation to frontend.
self.GenerateCollectorTerminationSignal(collector_id, "-1", -1) # Termination confirmation to frontend.
else:
print ('Backend collector {:} not found'.format(collector_id))
......@@ -103,11 +103,28 @@ class TelemetryBackendService(GenericGrpcService):
while not stop_event.is_set():
if time.time() - start_time >= collector['duration']: # condition to terminate backend
print("Execuation duration completed: Terminating backend: Collector Id: ", collector_id, " - ", time.time() - start_time)
self.GenerateCollectorResponse(collector_id, "-1", -1) # Termination confirmation to frontend.
self.GenerateCollectorTerminationSignal(collector_id, "-1", -1) # Termination confirmation to frontend.
break
self.ExtractKpiValue(collector_id, collector['kpi_id'])
time.sleep(collector['interval'])
def GenerateCollectorTerminationSignal(self, collector_id: str, kpi_id: str, measured_kpi_value: Any):
"""
Method to write kpi Termination signat on RESPONSE Kafka topic
"""
producer = self.kafka_producer
kpi_value : Dict = {
"kpi_id" : kpi_id,
"kpi_value" : measured_kpi_value,
}
producer.produce(
KafkaTopic.RESPONSE.value, # TODO: to the topic ...
key = collector_id,
value = json.dumps(kpi_value),
callback = self.delivery_callback
)
producer.flush()
def ExtractKpiValue(self, collector_id: str, kpi_id: str):
"""
Method to extract kpi value.
......@@ -123,117 +140,27 @@ class TelemetryBackendService(GenericGrpcService):
"""
producer = self.kafka_producer
kpi_value : Dict = {
"time_stamp": str(time.time()),
"kpi_id" : kpi_id,
"kpi_value" : measured_kpi_value
}
producer.produce(
KafkaTopic.RESPONSE.value,
KafkaTopic.VALUE.value, # TODO: to the topic ...
key = collector_id,
value = json.dumps(kpi_value),
callback = self.delivery_callback
)
producer.flush()
def GenerateRawMetric(self, metrics: Any):
"""
Method writes raw metrics on VALUE Kafka topic
"""
producer = self.kafka_producer
some_metric : Dict = {
"some_id" : metrics
}
producer.produce(
KafkaTopic.VALUE.value,
key = 'raw',
value = json.dumps(some_metric),
callback = self.delivery_callback
)
producer.flush()
def delivery_callback(self, err, msg):
"""
Callback function to handle message delivery status.
Args: err (KafkaError): Kafka error object.
msg (Message): Kafka message object.
"""
if err: print(f'Message delivery failed: {err}')
# else: print(f'Message delivered to topic {msg.topic()}')
# # ----------- BELOW: Actual Implementation of Kafka Producer with Node Exporter -----------
# @staticmethod
# def fetch_single_node_exporter_metric():
# """
# Method to fetch metrics from Node Exporter.
# Returns:
# str: Metrics fetched from Node Exporter.
# """
# KPI = "node_network_receive_packets_total"
# try:
# response = requests.get(EXPORTER_ENDPOINT) # type: ignore
# LOGGER.info("Request status {:}".format(response))
# if response.status_code == 200:
# # print(f"Metrics fetched sucessfully...")
# metrics = response.text
# # Check if the desired metric is available in the response
# if KPI in metrics:
# KPI_VALUE = TelemetryBackendService.extract_metric_value(metrics, KPI)
# # Extract the metric value
# if KPI_VALUE is not None:
# LOGGER.info("Extracted value of {:} is {:}".format(KPI, KPI_VALUE))
# print(f"Extracted value of {KPI} is: {KPI_VALUE}")
# return KPI_VALUE
# else:
# LOGGER.info("Failed to fetch metrics. Status code: {:}".format(response.status_code))
# # print(f"Failed to fetch metrics. Status code: {response.status_code}")
# return None
# except Exception as e:
# LOGGER.info("Failed to fetch metrics. Status code: {:}".format(e))
# # print(f"Failed to fetch metrics: {str(e)}")
# return None
# @staticmethod
# def extract_metric_value(metrics, metric_name):
# """
# Method to extract the value of a metric from the metrics string.
# Args:
# metrics (str): Metrics string fetched from Exporter.
# metric_name (str): Name of the metric to extract.
# Returns:
# float: Value of the extracted metric, or None if not found.
# """
# try:
# # Find the metric line containing the desired metric name
# metric_line = next(line for line in metrics.split('\n') if line.startswith(metric_name))
# # Split the line to extract the metric value
# metric_value = float(metric_line.split()[1])
# return metric_value
# except StopIteration:
# print(f"Metric '{metric_name}' not found in the metrics.")
# return None
# @staticmethod
# def stream_node_export_metrics_to_raw_topic():
# try:
# while True:
# response = requests.get(EXPORTER_ENDPOINT)
# # print("Response Status {:} ".format(response))
# # LOGGER.info("Response Status {:} ".format(response))
# try:
# if response.status_code == 200:
# producerObj = KafkaProducer(PRODUCER_CONFIG)
# producerObj.produce(KAFKA_TOPICS['raw'], key="raw", value= str(response.text), callback=TelemetryBackendService.delivery_callback)
# producerObj.flush()
# LOGGER.info("Produce to topic")
# else:
# LOGGER.info("Didn't received expected response. Status code: {:}".format(response.status_code))
# print(f"Didn't received expected response. Status code: {response.status_code}")
# return None
# time.sleep(15)
# except Exception as e:
# LOGGER.info("Failed to process response. Status code: {:}".format(e))
# return None
# except Exception as e:
# LOGGER.info("Failed to fetch metrics. Status code: {:}".format(e))
# print(f"Failed to fetch metrics: {str(e)}")
# return None
# # ----------- ABOVE: Actual Implementation of Kafka Producer with Node Exporter -----------
\ No newline at end of file
if err:
LOGGER.debug('Message delivery failed: {:}'.format(err))
print(f'Message delivery failed: {err}')
else:
LOGGER.info('Message delivered to topic {:}'.format(msg.topic()))
print(f'Message delivered to topic {msg.topic()}')
......@@ -13,6 +13,7 @@
# limitations under the License.
import logging
import threading
from common.tools.kafka.Variables import KafkaTopic
from telemetry.backend.service.TelemetryBackendService import TelemetryBackendService
......@@ -25,14 +26,13 @@ LOGGER = logging.getLogger(__name__)
###########################
# --- "test_validate_kafka_topics" should be run before the functionality tests ---
def test_validate_kafka_topics():
LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ")
response = KafkaTopic.create_all_topics()
assert isinstance(response, bool)
# def test_validate_kafka_topics():
# LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ")
# response = KafkaTopic.create_all_topics()
# assert isinstance(response, bool)
def test_RunRequestListener():
LOGGER.info('test_RunRequestListener')
TelemetryBackendServiceObj = TelemetryBackendService()
response = TelemetryBackendServiceObj.RunRequestListener()
response = threading.Thread(target=TelemetryBackendServiceObj.RequestListener).start()
LOGGER.debug(str(response))
assert isinstance(response, bool)
......@@ -161,9 +161,8 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer):
# ---------- Independent Method ---------------
# Listener method is independent of any method (same lifetime as service)
# continously listens for responses
def RunResponseListener(self):
def install_servicers(self):
threading.Thread(target=self.ResponseListener).start()
return True
def ResponseListener(self):
"""
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment