Commit ce7fbb40 authored by Waleed Akbar

Telemetry Backend Re-structuring

- BackendService is restructured according to the design in the report.
- A ResponseListener is added to the Frontend.
- Improvements to the tests and messages files.
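In practice, the restructuring replaces the earlier stringified-tuple encoding (str((kpi_id, duration, interval)) parsed back with ast.literal_eval) with JSON payloads keyed by the collector UUID. A minimal sketch of the resulting message contract, assuming a local broker; topic names mirror KafkaTopic.REQUEST/RESPONSE from the diff and the values are illustrative:

import json
from confluent_kafka import Producer

producer = Producer({'bootstrap.servers': '127.0.0.1:9092'})

# Start request (frontend -> backend), keyed by the collector UUID.
producer.produce('topic_request', key='collector-uuid',
                 value=json.dumps({"kpi_id": "kpi-uuid", "duration": 10, "interval": 2}))

# Stop request: duration == -1 and interval == -1 signal termination
# (the diff reuses the collector UUID in the kpi_id field here).
producer.produce('topic_request', key='collector-uuid',
                 value=json.dumps({"kpi_id": "collector-uuid", "duration": -1, "interval": -1}))

# Response (backend -> frontend); kpi_id == "-1" with kpi_value == -1
# confirms termination back to the frontend.
producer.produce('topic_response', key='collector-uuid',
                 value=json.dumps({"kpi_id": "kpi-uuid", "kpi_value": 42}))
producer.flush()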
parent 9605c9a1
2 merge requests: !294 Release TeraFlowSDN 4.0, !261 (CTTC) New Analytics Component
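The diff also swaps the hard-coded KAFKA_SERVER_IP, ADMIN_KAFKA_CLIENT and KAFKA_TOPICS globals for the shared common.tools.kafka.Variables module. That module is not part of this commit; a plausible minimal sketch, consistent with the accesses seen below (KafkaConfig.SERVER_ADDRESS.value, KafkaTopic.REQUEST.value, KafkaTopic.RESPONSE.value):

from enum import Enum

class KafkaConfig(Enum):
    SERVER_ADDRESS = '127.0.0.1:9092'  # assumed default bootstrap server

class KafkaTopic(Enum):
    REQUEST  = 'topic_request'
    RESPONSE = 'topic_response'
    RAW      = 'topic_raw'             # hypothetical; carried over from the old dict
    VALUE    = 'topic_value'           # hypothetical; the class docstring mentions a VALUE topic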
@@ -12,64 +12,52 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import ast
import json
import time
import random
import logging
import requests
import threading
from typing import Any, Tuple
from typing import Any, Dict
from common.proto.context_pb2 import Empty
from confluent_kafka import Producer as KafkaProducer
from confluent_kafka import Consumer as KafkaConsumer
from confluent_kafka import KafkaException
from confluent_kafka import KafkaError
from confluent_kafka.admin import AdminClient, NewTopic
from common.proto.telemetry_frontend_pb2 import Collector, CollectorId
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
from common.method_wrappers.Decorator import MetricsPool
LOGGER = logging.getLogger(__name__)
METRICS_POOL = MetricsPool('Telemetry', 'TelemetryBackend')
KAFKA_SERVER_IP = '127.0.0.1:9092'
# KAFKA_SERVER_IP = '10.152.183.175:30092'
ADMIN_KAFKA_CLIENT = AdminClient({'bootstrap.servers': KAFKA_SERVER_IP})
KAFKA_TOPICS = {'request' : 'topic_request', 'response': 'topic_response',
'raw' : 'topic_raw' , 'labeled' : 'topic_labeled'}
EXPORTER_ENDPOINT = "http://10.152.183.2:9100/metrics"
PRODUCER_CONFIG = {'bootstrap.servers': KAFKA_SERVER_IP,}
METRICS_POOL = MetricsPool('TelemetryBackend', 'backendService')
# EXPORTER_ENDPOINT = "http://10.152.183.2:9100/metrics"
class TelemetryBackendService:
"""
Class to listens for request on Kafka topic, fetches metrics and produces measured values to another Kafka topic.
Class listens for requests on a Kafka topic, fetches the requested metrics from the device,
and produces measured values on both the RESPONSE and VALUE Kafka topics.
"""
def __init__(self):
LOGGER.info('Init TelemetryBackendService')
self.kafka_producer = KafkaProducer({'bootstrap.servers' : KafkaConfig.SERVER_ADDRESS.value})
self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.SERVER_ADDRESS.value,
'group.id' : 'backend',
'auto.offset.reset' : 'latest'})
self.running_threads = {}
def run_kafka_listener(self)->bool:
threading.Thread(target=self.kafka_listener).start()
return True
def kafka_listener(self):
def RunRequestListener(self)->bool:
threading.Thread(target=self.RequestListener).start()
return True
def RequestListener(self):
"""
Listener for requests on the Kafka topic.
"""
conusmer_configs = {
'bootstrap.servers' : KAFKA_SERVER_IP,
'group.id' : 'backend',
'auto.offset.reset' : 'latest'
}
# topic_request = "topic_request"
consumerObj = KafkaConsumer(conusmer_configs)
# consumerObj.subscribe([topic_request])
consumerObj.subscribe([KAFKA_TOPICS['request']])
consumer = self.kafka_consumer
consumer.subscribe([KafkaTopic.REQUEST.value])
while True:
receive_msg = consumerObj.poll(2.0)
receive_msg = consumer.poll(2.0)
if receive_msg is None:
# print (time.time(), " - Telemetry backend is listening on Kafka Topic: ", KAFKA_TOPICS['request']) # added for debugging purposes
continue
elif receive_msg.error():
if receive_msg.error().code() == KafkaError._PARTITION_EOF:
@@ -77,177 +65,159 @@ class TelemetryBackendService:
else:
print("Consumer error: {}".format(receive_msg.error()))
break
(kpi_id, duration, interval) = ast.literal_eval(receive_msg.value().decode('utf-8'))
collector = json.loads(receive_msg.value().decode('utf-8'))
collector_id = receive_msg.key().decode('utf-8')
if duration == -1 and interval == -1:
self.terminate_collector_backend(collector_id)
# threading.Thread(target=self.terminate_collector_backend, args=(collector_id))
LOGGER.debug('Received Collector: {:} - {:}'.format(collector_id, collector))
print('Received Collector: {:} - {:}'.format(collector_id, collector))
if collector['duration'] == -1 and collector['interval'] == -1:
self.TerminateCollectorBackend(collector_id)
else:
self.run_initiate_collector_backend(collector_id, kpi_id, duration, interval)
self.RunInitiateCollectorBackend(collector_id, collector)
def TerminateCollectorBackend(self, collector_id):
if collector_id in self.running_threads:
thread, stop_event = self.running_threads[collector_id]
stop_event.set()
thread.join()
print ("Terminating backend (by StopCollector): Collector Id: ", collector_id)
del self.running_threads[collector_id]
self.GenerateCollectorResponse(collector_id, "-1", -1) # Termination confirmation to frontend.
else:
print ('Backend collector {:} not found'.format(collector_id))
def run_initiate_collector_backend(self, collector_id: str, kpi_id: str, duration: int, interval: int):
def RunInitiateCollectorBackend(self, collector_id: str, collector: str):
stop_event = threading.Event()
thread = threading.Thread(target=self.initiate_collector_backend,
args=(collector_id, kpi_id, duration, interval, stop_event))
thread = threading.Thread(target=self.InitiateCollectorBackend,
args=(collector_id, collector, stop_event))
self.running_threads[collector_id] = (thread, stop_event)
thread.start()
def initiate_collector_backend(self, collector_id, kpi_id, duration, interval, stop_event
): # type: ignore
def InitiateCollectorBackend(self, collector_id, collector, stop_event):
"""
Method to receive collector request attribues and initiates collecter backend.
Method receives a collector request and initiates the collector backend.
"""
print("Initiating backend for collector: ", collector_id)
start_time = time.time()
while not stop_event.is_set():
if time.time() - start_time >= duration: # condition to terminate backend
if time.time() - start_time >= collector['duration']: # condition to terminate backend
print("Execuation duration completed: Terminating backend: Collector Id: ", collector_id, " - ", time.time() - start_time)
self.generate_kafka_response(collector_id, "-1", -1)
# write to Kafka to send the termination confirmation.
self.GenerateCollectorResponse(collector_id, "-1", -1) # Termination confirmation to frontend.
break
# print ("Received KPI: ", kpi_id, ", Duration: ", duration, ", Fetch Interval: ", interval)
self.extract_kpi_value(collector_id, kpi_id)
# print ("Telemetry Backend running for KPI: ", kpi_id, "after FETCH INTERVAL: ", interval)
time.sleep(interval)
self.ExtractKpiValue(collector_id, collector['kpi_id'])
time.sleep(collector['interval'])
def extract_kpi_value(self, collector_id: str, kpi_id: str):
def ExtractKpiValue(self, collector_id: str, kpi_id: str):
"""
Method to extract kpi value.
"""
measured_kpi_value = random.randint(1,100) # Should be extracted from exporter/stream
# measured_kpi_value = self.fetch_node_exporter_metrics() # exporter extracted metric value against default KPI
self.generate_kafka_response(collector_id, kpi_id , measured_kpi_value)
measured_kpi_value = random.randint(1,100) # TODO: To be extracted from a device
print ("Measured Kpi value: {:}".format(measured_kpi_value))
# measured_kpi_value = self.fetch_node_exporter_metrics() # exporter extracted metric value against default KPI
self.GenerateCollectorResponse(collector_id, kpi_id , measured_kpi_value)
def generate_kafka_response(self, collector_id: str, kpi_id: str, kpi_value: Any):
def GenerateCollectorResponse(self, collector_id: str, kpi_id: str, measured_kpi_value: Any):
"""
Method to write a response on the Kafka topic.
"""
# topic_response = "topic_response"
msg_value : Tuple [str, Any] = (kpi_id, kpi_value)
msg_key = collector_id
producerObj = KafkaProducer(PRODUCER_CONFIG)
# producerObj.produce(topic_response, key=msg_key, value= str(msg_value), callback=self.delivery_callback)
producerObj.produce(KAFKA_TOPICS['response'], key=msg_key, value= str(msg_value), callback=TelemetryBackendService.delivery_callback)
producerObj.flush()
def terminate_collector_backend(self, collector_id):
if collector_id in self.running_threads:
thread, stop_event = self.running_threads[collector_id]
stop_event.set()
thread.join()
print ("Terminating backend (by StopCollector): Collector Id: ", collector_id)
del self.running_threads[collector_id]
self.generate_kafka_response(collector_id, "-1", -1)
def create_topic_if_not_exists(self, new_topics: list) -> bool:
"""
Method to create Kafka topic if it does not exist.
Args:
admin_client (AdminClient): Kafka admin client.
"""
for topic in new_topics:
try:
topic_metadata = ADMIN_KAFKA_CLIENT.list_topics(timeout=5)
if topic not in topic_metadata.topics:
# If the topic does not exist, create a new topic
print(f"Topic '{topic}' does not exist. Creating...")
LOGGER.warning("Topic {:} does not exist. Creating...".format(topic))
new_topic = NewTopic(topic, num_partitions=1, replication_factor=1)
ADMIN_KAFKA_CLIENT.create_topics([new_topic])
except KafkaException as e:
print(f"Failed to create topic: {e}")
return False
return True
producer = self.kafka_producer
kpi_value : Dict = {
"kpi_id" : kpi_id,
"kpi_value" : measured_kpi_value
}
producer.produce(
KafkaTopic.RESPONSE.value,
key = collector_id,
value = json.dumps(kpi_value),
callback = self.delivery_callback
)
producer.flush()
@staticmethod
def delivery_callback( err, msg):
def delivery_callback(self, err, msg):
"""
Callback function to handle message delivery status.
Args:
err (KafkaError): Kafka error object.
msg (Message): Kafka message object.
"""
if err:
print(f'Message delivery failed: {err}')
else:
print(f'Message delivered to topic {msg.topic()}')
# ----------- BELOW: Actual Implementation of Kafka Producer with Node Exporter -----------
@staticmethod
def fetch_single_node_exporter_metric():
"""
Method to fetch metrics from Node Exporter.
Returns:
str: Metrics fetched from Node Exporter.
"""
KPI = "node_network_receive_packets_total"
try:
response = requests.get(EXPORTER_ENDPOINT) # type: ignore
LOGGER.info("Request status {:}".format(response))
if response.status_code == 200:
# print(f"Metrics fetched sucessfully...")
metrics = response.text
# Check if the desired metric is available in the response
if KPI in metrics:
KPI_VALUE = TelemetryBackendService.extract_metric_value(metrics, KPI)
# Extract the metric value
if KPI_VALUE is not None:
LOGGER.info("Extracted value of {:} is {:}".format(KPI, KPI_VALUE))
print(f"Extracted value of {KPI} is: {KPI_VALUE}")
return KPI_VALUE
else:
LOGGER.info("Failed to fetch metrics. Status code: {:}".format(response.status_code))
# print(f"Failed to fetch metrics. Status code: {response.status_code}")
return None
except Exception as e:
LOGGER.info("Failed to fetch metrics. Status code: {:}".format(e))
# print(f"Failed to fetch metrics: {str(e)}")
return None
@staticmethod
def extract_metric_value(metrics, metric_name):
"""
Method to extract the value of a metric from the metrics string.
Args:
metrics (str): Metrics string fetched from Exporter.
metric_name (str): Name of the metric to extract.
Returns:
float: Value of the extracted metric, or None if not found.
"""
try:
# Find the metric line containing the desired metric name
metric_line = next(line for line in metrics.split('\n') if line.startswith(metric_name))
# Split the line to extract the metric value
metric_value = float(metric_line.split()[1])
return metric_value
except StopIteration:
print(f"Metric '{metric_name}' not found in the metrics.")
return None
@staticmethod
def stream_node_export_metrics_to_raw_topic():
try:
while True:
response = requests.get(EXPORTER_ENDPOINT)
# print("Response Status {:} ".format(response))
# LOGGER.info("Response Status {:} ".format(response))
try:
if response.status_code == 200:
producerObj = KafkaProducer(PRODUCER_CONFIG)
producerObj.produce(KAFKA_TOPICS['raw'], key="raw", value= str(response.text), callback=TelemetryBackendService.delivery_callback)
producerObj.flush()
LOGGER.info("Produce to topic")
else:
LOGGER.info("Didn't received expected response. Status code: {:}".format(response.status_code))
print(f"Didn't received expected response. Status code: {response.status_code}")
return None
time.sleep(15)
except Exception as e:
LOGGER.info("Failed to process response. Status code: {:}".format(e))
return None
except Exception as e:
LOGGER.info("Failed to fetch metrics. Status code: {:}".format(e))
print(f"Failed to fetch metrics: {str(e)}")
return None
# ----------- ABOVE: Actual Implementation of Kafka Producer with Node Exporter -----------
\ No newline at end of file
Args:
err (KafkaError): Kafka error object.
msg (Message): Kafka message object.
"""
if err: print(f'Message delivery failed: {err}')
# else: print(f'Message delivered to topic {msg.topic()}')
# # ----------- BELOW: Actual Implementation of Kafka Producer with Node Exporter -----------
# @staticmethod
# def fetch_single_node_exporter_metric():
# """
# Method to fetch metrics from Node Exporter.
# Returns:
# str: Metrics fetched from Node Exporter.
# """
# KPI = "node_network_receive_packets_total"
# try:
# response = requests.get(EXPORTER_ENDPOINT) # type: ignore
# LOGGER.info("Request status {:}".format(response))
# if response.status_code == 200:
# # print(f"Metrics fetched sucessfully...")
# metrics = response.text
# # Check if the desired metric is available in the response
# if KPI in metrics:
# KPI_VALUE = TelemetryBackendService.extract_metric_value(metrics, KPI)
# # Extract the metric value
# if KPI_VALUE is not None:
# LOGGER.info("Extracted value of {:} is {:}".format(KPI, KPI_VALUE))
# print(f"Extracted value of {KPI} is: {KPI_VALUE}")
# return KPI_VALUE
# else:
# LOGGER.info("Failed to fetch metrics. Status code: {:}".format(response.status_code))
# # print(f"Failed to fetch metrics. Status code: {response.status_code}")
# return None
# except Exception as e:
# LOGGER.info("Failed to fetch metrics. Status code: {:}".format(e))
# # print(f"Failed to fetch metrics: {str(e)}")
# return None
# @staticmethod
# def extract_metric_value(metrics, metric_name):
# """
# Method to extract the value of a metric from the metrics string.
# Args:
# metrics (str): Metrics string fetched from Exporter.
# metric_name (str): Name of the metric to extract.
# Returns:
# float: Value of the extracted metric, or None if not found.
# """
# try:
# # Find the metric line containing the desired metric name
# metric_line = next(line for line in metrics.split('\n') if line.startswith(metric_name))
# # Split the line to extract the metric value
# metric_value = float(metric_line.split()[1])
# return metric_value
# except StopIteration:
# print(f"Metric '{metric_name}' not found in the metrics.")
# return None
# @staticmethod
# def stream_node_export_metrics_to_raw_topic():
# try:
# while True:
# response = requests.get(EXPORTER_ENDPOINT)
# # print("Response Status {:} ".format(response))
# # LOGGER.info("Response Status {:} ".format(response))
# try:
# if response.status_code == 200:
# producerObj = KafkaProducer(PRODUCER_CONFIG)
# producerObj.produce(KAFKA_TOPICS['raw'], key="raw", value= str(response.text), callback=TelemetryBackendService.delivery_callback)
# producerObj.flush()
# LOGGER.info("Produce to topic")
# else:
# LOGGER.info("Didn't received expected response. Status code: {:}".format(response.status_code))
# print(f"Didn't received expected response. Status code: {response.status_code}")
# return None
# time.sleep(15)
# except Exception as e:
# LOGGER.info("Failed to process response. Status code: {:}".format(e))
# return None
# except Exception as e:
# LOGGER.info("Failed to fetch metrics. Status code: {:}".format(e))
# print(f"Failed to fetch metrics: {str(e)}")
# return None
# # ----------- ABOVE: Actual Implementation of Kafka Producer with Node Exporter -----------
\ No newline at end of file
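The test below exercises the new entry point by instantiating the service and starting the listener thread. As a standalone usage sketch (assuming a reachable Kafka broker and the topics above already exist):

from telemetry.backend.service.TelemetryBackendService import TelemetryBackendService

service = TelemetryBackendService()  # wires up the Kafka producer and consumer
service.RunRequestListener()         # spawns the RequestListener thread; returns True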
@@ -12,15 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
print (sys.path)
sys.path.append('/home/tfs/tfs-ctrl')
import threading
import logging
from typing import Tuple
# from common.proto.context_pb2 import Empty
from src.telemetry.backend.service.TelemetryBackendService import TelemetryBackendService
LOGGER = logging.getLogger(__name__)
@@ -28,26 +23,9 @@ LOGGER = logging.getLogger(__name__)
# Tests Implementation of Telemetry Backend
###########################
def test_verify_kafka_topics():
LOGGER.info('test_verify_kafka_topics requesting')
def test_RunRequestListener():
LOGGER.info('test_RunRequestListener')
TelemetryBackendServiceObj = TelemetryBackendService()
KafkaTopics = ['topic_request', 'topic_response', 'topic_raw', 'topic_labled']
response = TelemetryBackendServiceObj.create_topic_if_not_exists(KafkaTopics)
response = TelemetryBackendServiceObj.RunRequestListener()
LOGGER.debug(str(response))
assert isinstance(response, bool)
# def test_run_kafka_listener():
# LOGGER.info('test_receive_kafka_request requesting')
# TelemetryBackendServiceObj = TelemetryBackendService()
# response = TelemetryBackendServiceObj.run_kafka_listener()
# LOGGER.debug(str(response))
# assert isinstance(response, bool)
# def test_fetch_node_exporter_metrics():
# LOGGER.info(' >>> test_fetch_node_exporter_metrics START <<< ')
# TelemetryBackendService.fetch_single_node_exporter_metric()
def test_stream_node_export_metrics_to_raw_topic():
LOGGER.info(' >>> test_stream_node_export_metrics_to_raw_topic START <<< ')
threading.Thread(target=TelemetryBackendService.stream_node_export_metrics_to_raw_topic, args=()).start()
@@ -12,25 +12,25 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import ast
import json
import threading
from typing import Tuple, Any
from typing import Any, Dict
import grpc
import logging
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
from confluent_kafka import Consumer as KafkaConsumer
from confluent_kafka import Producer as KafkaProducer
from confluent_kafka import KafkaError
from common.proto.context_pb2 import Empty
from common.proto.telemetry_frontend_pb2 import CollectorId, Collector, CollectorFilter, CollectorList
from common.proto.telemetry_frontend_pb2_grpc import TelemetryFrontendServiceServicer
from telemetry.database.TelemetryModel import Collector as CollectorModel
from telemetry.database.Telemetry_DB import TelemetryDB
from confluent_kafka import Consumer as KafkaConsumer
from confluent_kafka import Producer as KafkaProducer
from confluent_kafka import KafkaError
LOGGER = logging.getLogger(__name__)
METRICS_POOL = MetricsPool('TelemetryFrontend', 'NBIgRPC')
@@ -46,8 +46,7 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer):
'group.id' : 'frontend',
'auto.offset.reset' : 'latest'})
# --->>> SECTION: StartCollector with all helper methods <<<---
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def StartCollector(self,
request : Collector, grpc_context: grpc.ServicerContext # type: ignore
@@ -55,101 +54,35 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer):
LOGGER.info ("gRPC message: {:}".format(request))
response = CollectorId()
# TODO: Verify the presence of Kpi ID in KpiDB or assume that KPI ID already exists.
# TODO: Verify the presence of Kpi ID in KpiDB or assume that KPI ID already exists?
self.tele_db_obj.add_row_to_db(
CollectorModel.ConvertCollectorToRow(request)
)
self.PublishRequestOnKafka(request)
self.PublishStartRequestOnKafka(request)
response.collector_id.uuid = request.collector_id.collector_id.uuid # type: ignore
response.collector_id.uuid = request.collector_id.collector_id.uuid
return response
def PublishRequestOnKafka(self, collector_obj):
def PublishStartRequestOnKafka(self, collector_obj):
"""
Method to generate collector request on Kafka.
"""
collector_uuid = collector_obj.collector_id.collector_id.uuid
collector_to_generate : Tuple [str, int, int] = (
collector_obj.kpi_id.kpi_id.uuid,
collector_obj.duration_s,
collector_obj.interval_s
)
collector_to_generate : Dict = {
"kpi_id" : collector_obj.kpi_id.kpi_id.uuid,
"duration": collector_obj.duration_s,
"interval": collector_obj.interval_s
}
self.kafka_producer.produce(
KafkaTopic.REQUEST.value,
key = collector_uuid,
value = str(collector_to_generate),
value = json.dumps(collector_to_generate),
callback = self.delivery_callback
)
LOGGER.info("Collector Request Generated: Collector Id: {:}, Value: {:}".format(collector_uuid, collector_to_generate))
ACTIVE_COLLECTORS.append(collector_uuid)
self.kafka_producer.flush()
def run_kafka_listener(self):
# print ("--- STARTED: run_kafka_listener ---")
threading.Thread(target=self.kafka_listener).start()
return True
def kafka_listener(self):
"""
listener for response on Kafka topic.
"""
# # print ("--- STARTED: kafka_listener ---")
# conusmer_configs = {
# 'bootstrap.servers' : KAFKA_SERVER_IP,
# 'group.id' : 'frontend',
# 'auto.offset.reset' : 'latest'
# }
# # topic_response = "topic_response"
# consumerObj = KafkaConsumer(conusmer_configs)
self.kafka_consumer.subscribe([KAFKA_TOPICS['response']])
# print (time.time())
while True:
receive_msg = self.kafka_consumer.poll(2.0)
if receive_msg is None:
# print (" - Telemetry frontend listening on Kafka Topic: ", KAFKA_TOPICS['response']) # added for debugging purposes
continue
elif receive_msg.error():
if receive_msg.error().code() == KafkaError._PARTITION_EOF:
continue
else:
print("Consumer error: {}".format(receive_msg.error()))
break
try:
collector_id = receive_msg.key().decode('utf-8')
if collector_id in ACTIVE_COLLECTORS:
(kpi_id, kpi_value) = ast.literal_eval(receive_msg.value().decode('utf-8'))
self.process_response(collector_id, kpi_id, kpi_value)
else:
print(f"collector id does not match.\nRespone ID: '{collector_id}' --- Active IDs: '{ACTIVE_COLLECTORS}' ")
except Exception as e:
print(f"No message key found: {str(e)}")
continue
# return None
def process_response(self, collector_id: str, kpi_id: str, kpi_value: Any):
if kpi_id == "-1" and kpi_value == -1:
# LOGGER.info("Sucessfully terminated Collector: {:}".format(collector_id))
print ("Sucessfully terminated Collector: ", collector_id)
else:
print ("Frontend-Received values Collector Id:", collector_id, "-KPI:", kpi_id, "-VALUE:", kpi_value)
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def delivery_callback(self, err, msg):
"""
Callback function to handle message delivery status.
Args:
err (KafkaError): Kafka error object.
msg (Message): Kafka message object.
"""
if err:
LOGGER.debug('Message delivery failed: {:}'.format(err))
print('Message delivery failed: {:}'.format(err))
else:
LOGGER.debug('Message delivered to topic {:}'.format(msg.topic()))
print('Message delivered to topic {:}'.format(msg.topic()))
# <<<--- SECTION: StopCollector with all helper methods --->>>
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def StopCollector(self,
@@ -164,13 +97,15 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer):
Method to generate stop collector request on Kafka.
"""
collector_uuid = collector_id.collector_id.uuid
collector_to_stop : Tuple [str, int, int] = (
collector_uuid , -1, -1
)
collector_to_stop : Dict = {
"kpi_id" : collector_uuid,
"duration": -1,
"interval": -1
}
self.kafka_producer.produce(
KafkaTopic.REQUEST.value,
key = collector_uuid,
value = str(collector_to_stop),
value = json.dumps(collector_to_stop),
callback = self.delivery_callback
)
LOGGER.info("Collector Stop Request Generated: Collector Id: {:}, Value: {:}".format(collector_uuid, collector_to_stop))
@@ -180,6 +115,7 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer):
LOGGER.warning('Collector ID {:} not found in active collector list'.format(collector_uuid))
self.kafka_producer.flush()
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def SelectCollectors(self,
request : CollectorFilter, contextgrpc_context: grpc.ServicerContext # type: ignore
@@ -199,3 +135,57 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer):
except Exception as e:
LOGGER.info('Unable to process filter response {:}'.format(e))
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def delivery_callback(self, err, msg):
"""
Callback function to handle message delivery status.
Args:
err (KafkaError): Kafka error object.
msg (Message): Kafka message object.
"""
if err:
LOGGER.debug('Message delivery failed: {:}'.format(err))
print('Message delivery failed: {:}'.format(err))
# else:
# LOGGER.debug('Message delivered to topic {:}'.format(msg.topic()))
# print('Message delivered to topic {:}'.format(msg.topic()))
# ---------- Independent Method ---------------
# Listener is independent of any RPC method (same lifetime as the service)
# and continuously listens for responses.
def RunResponseListener(self):
threading.Thread(target=self.ResponseListener).start()
return True
def ResponseListener(self):
"""
Listener for responses on the Kafka topic.
"""
self.kafka_consumer.subscribe([KafkaTopic.RESPONSE.value])
while True:
receive_msg = self.kafka_consumer.poll(2.0)
if receive_msg is None:
continue
elif receive_msg.error():
if receive_msg.error().code() == KafkaError._PARTITION_EOF:
continue
else:
print("Consumer error: {}".format(receive_msg.error()))
break
try:
collector_id = receive_msg.key().decode('utf-8')
if collector_id in ACTIVE_COLLECTORS:
kpi_value = json.loads(receive_msg.value().decode('utf-8'))
self.process_response(collector_id, kpi_value['kpi_id'], kpi_value['kpi_value'])
else:
print(f"collector id does not match.\nRespone ID: '{collector_id}' --- Active IDs: '{ACTIVE_COLLECTORS}' ")
except Exception as e:
print(f"Error extarcting msg key or value: {str(e)}")
continue
def process_response(self, collector_id: str, kpi_id: str, kpi_value: Any):
if kpi_id == "-1" and kpi_value == -1:
print ("Backend termination confirmation for collector id: ", collector_id)
else:
print ("KPI Value: Collector Id:", collector_id, ", Kpi Id:", kpi_id, ", Value:", kpi_value)
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
@@ -28,6 +28,7 @@ from telemetry.frontend.client.TelemetryFrontendClient import TelemetryFrontendClient
from telemetry.frontend.service.TelemetryFrontendService import TelemetryFrontendService
from telemetry.frontend.tests.Messages import (
create_collector_request, create_collector_id, create_collector_filter)
from telemetry.frontend.service.TelemetryFrontendServiceServicerImpl import TelemetryFrontendServiceServicerImpl
###########################
@@ -98,6 +99,13 @@ def test_SelectCollectors(telemetryFrontend_client):
LOGGER.debug(str(response))
assert isinstance(response, CollectorList)
def test_RunResponseListener():
LOGGER.info(' >>> test_RunResponseListener START: <<< ')
TelemetryFrontendServiceObj = TelemetryFrontendServiceServicerImpl()
response = TelemetryFrontendServiceObj.RunResponseListener() # because method "run_kafka_listener" is not defined in frontend.proto
LOGGER.debug(str(response))
assert isinstance(response, bool)
# ------- previous test ----------------
# def test_verify_db_and_table():