Commit 178b72d0 authored by Waleed Akbar

StartCollector is working fine with unique communication identification logic.

parent 0bcd283f
2 merge requests: !294 Release TeraFlowSDN 4.0, !207 Resolve "(CTTC) Separation of Monitoring"
@@ -19,7 +19,7 @@ import random
 import logging
 import requests
 import threading
-from typing import Tuple
+from typing import Any, Tuple
 from common.proto.context_pb2 import Empty
 from confluent_kafka import Producer as KafkaProducer
 from confluent_kafka import Consumer as KafkaConsumer
@@ -32,6 +32,8 @@ from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
 LOGGER = logging.getLogger(__name__)
 METRICS_POOL = MetricsPool('Telemetry', 'TelemetryBackend')
 KAFKA_SERVER_IP = '127.0.0.1:9092'
+ADMIN_KAFKA_CLIENT = AdminClient({'bootstrap.servers': KAFKA_SERVER_IP})
+ACTIVE_COLLECTORS = []
 
 class TelemetryBackendService:
     """
@@ -41,6 +43,10 @@ class TelemetryBackendService:
     def __init__(self):
         LOGGER.info('Init TelemetryBackendService')
 
+    def run_kafka_listener(self)->bool: # type: ignore
+        threading.Thread(target=self.kafka_listener).start()
+        return True
+
     def kafka_listener(self):
         """
         listener for requests on Kafka topic.
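The new run_kafka_listener wrapper is the standard pattern for exposing a blocking poll loop through a non-blocking RPC: the call returns True immediately while the listener runs on a background thread. A minimal standalone sketch of that pattern (class and method names here are illustrative, not from the repo):

```python
import threading
import time

class ListenerSketch:
    """Illustrative only: the RPC-facing call returns at once; the loop runs in the background."""

    def run_listener(self) -> bool:
        # daemon=True so the listener thread does not block process shutdown
        threading.Thread(target=self._listen, daemon=True).start()
        return True

    def _listen(self):
        while True:
            # stand-in for consumer.poll(); real code would block on Kafka here
            time.sleep(2.0)

if __name__ == '__main__':
    assert ListenerSketch().run_listener() is True
```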
@@ -51,14 +57,14 @@
             'auto.offset.reset' : 'latest'
         }
         topic_request = "topic_request"
 
-        if (self.create_topic_if_not_exists(topic_request)):
+        if (self.create_topic_if_not_exists([topic_request])):
             consumerObj = KafkaConsumer(conusmer_configs)
             consumerObj.subscribe([topic_request])
             while True:
                 receive_msg = consumerObj.poll(2.0)
                 if receive_msg is None:
-                    print ("Telemetry backend is listening on Kafka Topic: ", topic_request) # added for debugging purposes
+                    print (time.time(), " - Telemetry backend is listening on Kafka Topic: ", topic_request) # added for debugging purposes
                     continue
                 elif receive_msg.error():
                     if receive_msg.error().code() == KafkaError._PARTITION_EOF:
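For reference, this hunk is the usual confluent-kafka poll loop. A self-contained sketch, assuming a broker at 127.0.0.1:9092 and an illustrative group.id (the actual consumer config is defined just above the hunk):

```python
from confluent_kafka import Consumer, KafkaError

# Assumes a local broker and an existing "topic_request" topic.
consumer = Consumer({
    'bootstrap.servers' : '127.0.0.1:9092',
    'group.id'          : 'backend',        # assumed; not shown in the diff
    'auto.offset.reset' : 'latest',
})
consumer.subscribe(['topic_request'])

while True:
    msg = consumer.poll(2.0)            # block up to 2 s waiting for a record
    if msg is None:
        continue                        # no record within the timeout
    if msg.error():
        if msg.error().code() == KafkaError._PARTITION_EOF:
            continue                    # reached end of partition; not a failure
        print(f"Consumer error: {msg.error()}")
        break
    print(msg.key(), msg.value())       # the key carries the collector_id
```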
@@ -67,40 +73,38 @@
                 else:
                     print("Consumer error: {}".format(receive_msg.error()))
                     break
                 (kpi_id, duration, interval) = ast.literal_eval(receive_msg.value().decode('utf-8'))
-                self.execute_initiate_collector_backend(kpi_id, duration, interval)
+                collector_id = receive_msg.key().decode('utf-8')
+                self.run_initiate_collector_backend(collector_id, kpi_id, duration, interval)
 
-    def run_kafka_listener(self)->bool: # type: ignore
-        threading.Thread(target=self.kafka_listener).start()
-        return True
-
-    def initiate_collector_backend(self, kpi_id, duration, interval
+    def run_initiate_collector_backend(self, collector_id: str, kpi_id: str, duration: int, interval: int):
+        threading.Thread(target=self.initiate_collector_backend, args=(collector_id, kpi_id, duration, interval)).start()
+
+    def initiate_collector_backend(self, collector_id, kpi_id, duration, interval
                                    ): # type: ignore
         """
         Method to receive collector request attributes and initiate the collector backend.
         """
         start_time = time.time()
         while True:
+            ACTIVE_COLLECTORS.append(collector_id)
             if time.time() - start_time >= duration: # type: ignore
                 print("Requested Execution Time Completed: \n --- Consumer terminating: KPI ID: ", kpi_id, " - ", time.time() - start_time)
                 break
             # print ("Received KPI: ", kpi_id, ", Duration: ", duration, ", Fetch Interval: ", interval)
-            self.extract_kpi_value(kpi_id)
+            self.extract_kpi_value(collector_id, kpi_id)
             # print ("Telemetry Backend running for KPI: ", kpi_id, "after FETCH INTERVAL: ", interval)
             time.sleep(interval)
 
-    def execute_initiate_collector_backend(self, kpi_id: str, duration: int, interval: int):
-        threading.Thread(target=self.initiate_collector_backend, args=(kpi_id, duration, interval)).start()
-
-    def extract_kpi_value(self, kpi_id: str):
+    def extract_kpi_value(self, collector_id: str, kpi_id: str):
         """
         Method to extract kpi value.
         """
         measured_kpi_value = random.randint(1,100)
-        self.generate_kafka_reply(kpi_id , measured_kpi_value)
+        self.generate_kafka_response(collector_id, kpi_id , measured_kpi_value)
 
-    def generate_kafka_reply(self, kpi_id: str, kpi_value: any):
+    def generate_kafka_response(self, collector_id: str, kpi_id: str, kpi_value: Any):
         """
         Method to write response on Kafka topic
         """
@@ -108,33 +112,32 @@
             'bootstrap.servers': KAFKA_SERVER_IP,
         }
         topic_response = "topic_response"
-        if (self.create_topic_if_not_exists(topic_response)):
-            msg_value = Tuple [str, any]
-            msg_value = (kpi_id, kpi_value)
-            msg_key   = "111" # to be fetch from db???
-            producerObj = KafkaProducer(producer_configs)
-            producerObj.produce(topic_response, key=msg_key, value= str(msg_value), callback=self.delivery_callback)
-            producerObj.flush()
+        msg_value : Tuple [str, Any] = (kpi_id, kpi_value)
+        msg_key   = collector_id
+        producerObj = KafkaProducer(producer_configs)
+        producerObj.produce(topic_response, key=msg_key, value= str(msg_value), callback=self.delivery_callback)
+        producerObj.flush()
 
-    def create_topic_if_not_exists(self, new_topic_name: str):
+    def create_topic_if_not_exists(self, new_topics: list):
         """
         Method to create Kafka topic if it does not exist.
         Args:
             admin_client (AdminClient): Kafka admin client.
         """
-        admin_kafka_client = AdminClient({'bootstrap.servers': KAFKA_SERVER_IP})
-        try:
-            topic_metadata = admin_kafka_client.list_topics(timeout=5)
-            if new_topic_name not in topic_metadata.topics:
-                # If the topic does not exist, create a new topic
-                print(f"Topic '{new_topic_name}' does not exist. Creating...")
-                new_topic = NewTopic(new_topic_name, num_partitions=1, replication_factor=1)
-                admin_kafka_client.create_topics([new_topic])
-            return True
-        except KafkaException as e:
-            print(f"Failed to create topic: {e}")
-            return False
+        for topic in new_topics:
+            try:
+                topic_metadata = ADMIN_KAFKA_CLIENT.list_topics(timeout=5)
+                if topic not in topic_metadata.topics:
+                    # If the topic does not exist, create a new topic
+                    print(f"Topic '{topic}' does not exist. Creating...")
+                    new_topic = NewTopic(topic, num_partitions=1, replication_factor=1)
+                    ADMIN_KAFKA_CLIENT.create_topics([new_topic])
+                return True
+            except KafkaException as e:
+                print(f"Failed to create topic: {e}")
+                return False
+        self.verify_required_kafka_topics()
 
     def delivery_callback(self, err, msg):
         """
@@ -148,7 +151,21 @@
         else:
             print(f'Message delivered to topic {msg.topic()}')
 
+    # Function to list all topics in the Kafka cluster
+    def verify_required_kafka_topics(self) -> list:
+        """List all topics in the Kafka cluster."""
+        try:
+            # Fetch metadata from the broker
+            metadata = ADMIN_KAFKA_CLIENT.list_topics(timeout=10)
+            topics = list(metadata.topics.keys())
+            print("Topics in the cluster:", topics)
+            return topics
+        except Exception as e:
+            print(f"Failed to list topics: {e}")
+            return []
+
 # ----------- BELOW: Actual Implementation of Kafka Producer with Node Exporter -----------
@@ -159,8 +176,9 @@
             str: Metrics fetched from Node Exporter.
         """
         KPI = "node_network_receive_packets_total"
+        EXPORTER_ENDPOINT = "http://node-exporter-7465c69b87-b6ks5.telebackend:9100/metrics"
         try:
-            response = requests.get(self.exporter_endpoint) # type: ignore
+            response = requests.get(EXPORTER_ENDPOINT) # type: ignore
             if response.status_code == 200:
                 # print(f"Metrics fetched successfully...")
                 metrics = response.text
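The hard-coded EXPORTER_ENDPOINT points at a specific node-exporter pod, so it will differ per deployment. A sketch of fetching and filtering the KPI from the Prometheus text exposition format, using a hypothetical localhost endpoint and helper name:

```python
import requests

KPI = "node_network_receive_packets_total"
ENDPOINT = "http://127.0.0.1:9100/metrics"   # assumed; the pod URL in the commit will differ

def fetch_kpi_samples() -> list:
    resp = requests.get(ENDPOINT, timeout=5)
    resp.raise_for_status()
    samples = []
    for line in resp.text.splitlines():
        # Prometheus text format: '# HELP'/'# TYPE' lines start with '#';
        # sample lines are 'name{labels} value'
        if line.startswith(KPI):
            name_and_labels, _, value = line.rpartition(' ')
            samples.append((name_and_labels, float(value)))
    return samples
```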
@@ -202,7 +220,7 @@
         Method to produce metrics to Kafka topic as per Kafka configs.
         """
         conf = {
-            'bootstrap.servers': self.bootstrap_servers,
+            'bootstrap.servers': KAFKA_SERVER_IP,
         }
         admin_client = AdminClient(conf)
@@ -216,7 +234,7 @@
             metrics = self.fetch_node_exporter_metrics() # select the function name based on the provided requirements
             if metrics:
-                kafka_producer.produce(self.kafka_topic, str(metrics), callback=self.delivery_callback)
+                kafka_producer.produce("topic_raw", str(metrics), callback=self.delivery_callback)
                 kafka_producer.flush()
                 # print("Metrics produced to Kafka topic")
...
@@ -15,7 +15,7 @@
 import ast
 import threading
 import time
-from typing import Tuple
+from typing import Tuple, Any
 import grpc
 import logging
@@ -30,9 +30,10 @@ from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
 from common.proto.telemetry_frontend_pb2_grpc import TelemetryFrontendServiceServicer
 
 LOGGER = logging.getLogger(__name__)
 METRICS_POOL = MetricsPool('Monitoring', 'TelemetryFrontend')
 KAFKA_SERVER_IP = '127.0.0.1:9092'
+ACTIVE_COLLECTORS = []
 
 class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer):
     def __init__(self, name_mapping : NameMapping):
@@ -50,7 +51,7 @@
         _collector_interval = int(request.interval_s)
         self.generate_kafka_request(_collector_id, _collector_kpi_id, _collector_duration, _collector_interval)
         # self.run_generate_kafka_request(_collector_id, _collector_kpi_id, _collector_duration, _collector_interval)
-        response.collector_id.uuid = request.collector_id.collector_id.uuid
+        response.collector_id.uuid = request.collector_id.collector_id.uuid # type: ignore
         return response
 
     def run_generate_kafka_request(self, msg_key: str, kpi: str, duration : int, interval: int):
@@ -74,6 +75,7 @@
                ", \nKPI: ", kpi, ", Duration: ", duration, ", Interval: ", interval)
         producerObj = KafkaProducer(producer_configs)
         producerObj.produce(topic_request, key=msg_key, value= str(msg_value), callback=self.delivery_callback)
+        ACTIVE_COLLECTORS.append(msg_key)
         producerObj.flush()
         return producerObj
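This is the request half of the key-based correlation: the frontend keys the request with the collector_id and records it in ACTIVE_COLLECTORS before flushing, so the response listener can recognize the reply. A compact sketch under the same assumptions as the diff (the function name is illustrative):

```python
from confluent_kafka import Producer

ACTIVE_COLLECTORS = []   # module-level registry, as in the diff

def start_collector(producer: Producer, collector_id: str,
                    kpi_id: str, duration: int, interval: int) -> None:
    msg_value = str((kpi_id, duration, interval))   # the frontend encodes the tuple with str()
    producer.produce('topic_request', key=collector_id, value=msg_value)
    ACTIVE_COLLECTORS.append(collector_id)          # remember the id so the listener accepts the reply
    producer.flush()
```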
@@ -108,11 +110,19 @@
             else:
                 print("Consumer error: {}".format(receive_msg.error()))
                 break
-            (kpi_id, kpi_value) = ast.literal_eval(receive_msg.value().decode('utf-8'))
-            self.process_response(kpi_id, kpi_value)
-            # threading.Thread(target=self.process_response, args=(kpi_id, kpi_value)).start()
+            try:
+                collector_id = receive_msg.key().decode('utf-8')
+                if collector_id in ACTIVE_COLLECTORS:
+                    (kpi_id, kpi_value) = ast.literal_eval(receive_msg.value().decode('utf-8'))
+                    self.process_response(kpi_id, kpi_value)
+                else:
+                    print(f"Collector ID does not match.\nResponse ID: '{collector_id}' --- Active IDs: '{ACTIVE_COLLECTORS}'")
+            except Exception as e:
+                print(f"No message key found: {str(e)}")
+                continue
         # return None
 
-    def process_response(self, kpi_id: str, kpi_value: any):
+    def process_response(self, kpi_id: str, kpi_value: Any):
         print ("Frontend - KPI: ", kpi_id, ", VALUE: ", kpi_value)
 
     @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
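Since initiate_collector_backend appends the same collector_id on every loop iteration and list membership checks are O(n), a set would make registration idempotent and lookups constant-time. A possible refinement, not part of this commit:

```python
ACTIVE_COLLECTORS = set()

def register(collector_id: str) -> None:
    ACTIVE_COLLECTORS.add(collector_id)        # idempotent: repeated registration is harmless

def accepts(collector_id: str) -> bool:
    return collector_id in ACTIVE_COLLECTORS   # O(1) membership test
```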
...
@@ -22,7 +22,7 @@ def create_collector_id():
     _collector_id.collector_id.uuid = uuid.uuid4()
     return _collector_id
 
-def create_collector_id(coll_id_str : str):
+def create_collector_id_a(coll_id_str : str):
     _collector_id = telemetry_frontend_pb2.CollectorId()
     _collector_id.collector_id.uuid = str(coll_id_str)
     return _collector_id
@@ -32,7 +32,7 @@ def create_collector_request():
     _create_collector_request.collector_id.collector_id.uuid = str(uuid.uuid4())
     _create_collector_request.kpi_id.kpi_id.uuid = str(uuid.uuid4())
     _create_collector_request.duration_s = float(random.randint(8, 16))
-    _create_collector_request.interval_s = float(random.randint(2, 3))
+    _create_collector_request.interval_s = float(random.randint(2, 4))
     return _create_collector_request
 
 def create_collector_request_a():
...
@@ -13,6 +13,7 @@
 # limitations under the License.
 
 import os
+import time
 import pytest
 import logging
 from typing import Union
@@ -50,7 +51,7 @@ from device.service.drivers import DRIVERS
 LOCAL_HOST = '127.0.0.1'
 MOCKSERVICE_PORT = 10000
-TELEMETRY_FRONTEND_PORT = MOCKSERVICE_PORT + get_service_port_grpc(ServiceNameEnum.TELEMETRYFRONTEND)
+TELEMETRY_FRONTEND_PORT = str(MOCKSERVICE_PORT) + str(get_service_port_grpc(ServiceNameEnum.TELEMETRYFRONTEND))
 os.environ[get_env_var_name(ServiceNameEnum.TELEMETRYFRONTEND, ENVVAR_SUFIX_SERVICE_HOST )] = str(LOCAL_HOST)
 os.environ[get_env_var_name(ServiceNameEnum.TELEMETRYFRONTEND, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(TELEMETRY_FRONTEND_PORT)
@@ -172,6 +173,18 @@ def test_start_collector(telemetryFrontend_client):
     LOGGER.debug(str(response))
     assert isinstance(response, CollectorId)
 
+def test_start_collector_a(telemetryFrontend_client):
+    LOGGER.warning('test_start_collector requesting')
+    response = telemetryFrontend_client.StartCollector(create_collector_request())
+    LOGGER.debug(str(response))
+    assert isinstance(response, CollectorId)
+
+def test_start_collector_b(telemetryFrontend_client):
+    LOGGER.warning('test_start_collector requesting')
+    response = telemetryFrontend_client.StartCollector(create_collector_request())
+    LOGGER.debug(str(response))
+    assert isinstance(response, CollectorId)
+
 def test_run_kafka_listener():
     LOGGER.warning('test_receive_kafka_request requesting')
     name_mapping = NameMapping()
@@ -180,11 +193,11 @@ def test_run_kafka_listener():
     LOGGER.debug(str(response))
     assert isinstance(response, bool)
 
-def test_stop_collector(telemetryFrontend_client):
-    LOGGER.warning('test_stop_collector requesting')
-    response = telemetryFrontend_client.StopCollector(create_collector_id("1"))
-    LOGGER.debug(str(response))
-    assert isinstance(response, Empty)
+# def test_stop_collector(telemetryFrontend_client):
+#     LOGGER.warning('test_stop_collector requesting')
+#     response = telemetryFrontend_client.StopCollector(create_collector_id("1"))
+#     LOGGER.debug(str(response))
+#     assert isinstance(response, Empty)
 
 # def test_select_collectors(telemetryFrontend_client):
 #     LOGGER.warning('test_select_collector requesting')
...