Commit 1be101f8 authored by Waleed Akbar

Updated Telemetry Backend.

- Refactor Telemetry backend service methods.
- Enhance test logging for better clarity.
parent a833bdbd
2 merge requests: !359 "Release TeraFlowSDN 5.0", !320 "Resolve '(CTTC) Telemetry Enhancement'"
telemetry/backend/service/TelemetryBackendService.py:

@@ -12,7 +12,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import queue
 import json
 import time
 import logging
@@ -27,6 +26,7 @@ from common.Settings import get_service_port_grpc
 from common.method_wrappers.Decorator import MetricsPool
 from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
 from common.tools.service.GenericGrpcService import GenericGrpcService
+from telemetry.backend.collectors.emulated.EmulatedCollector import EmulatedCollector

 LOGGER = logging.getLogger(__name__)
 METRICS_POOL = MetricsPool('TelemetryBackend', 'backendService')
@@ -44,9 +44,7 @@ class TelemetryBackendService(GenericGrpcService):
         self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(),
                                              'group.id'          : 'backend',
                                              'auto.offset.reset' : 'latest'})
-        self.running_threads = {}
-        self.emulatorCollector = None
-        self.metric_queue = queue.Queue()
+        self.active_jobs = {}

     def install_servicers(self):
         threading.Thread(target=self.RequestListener).start()
@@ -60,49 +58,88 @@
         consumer = self.kafka_consumer
         consumer.subscribe([KafkaTopic.TELEMETRY_REQUEST.value])
         while True:
-            receive_msg = consumer.poll(2.0)
+            receive_msg = consumer.poll(1.0)
             if receive_msg is None:
                 continue
             elif receive_msg.error():
                 if receive_msg.error().code() == KafkaError._PARTITION_EOF:
                     continue
-                elif receive_msg.error().code() == KafkaError.UNKNOWN_TOPIC_OR_PART:
-                    LOGGER.warning(f"Subscribed topic {receive_msg.topic()} does not exist. Maybe the topic has no messages yet.")
-                    continue
                 else:
                     LOGGER.error("Consumer error: {}".format(receive_msg.error()))
                     break
             try:
-                collector = json.loads(receive_msg.value().decode('utf-8'))
+                collector = json.loads(
+                    receive_msg.value().decode('utf-8')
+                )
                 collector_id = receive_msg.key().decode('utf-8')
                 LOGGER.debug('Received Collector: {:} - {:}'.format(collector_id, collector))

-                if collector['duration'] == -1 and collector['interval'] == -1:
-                    self.TerminateCollectorBackend(collector_id)
+                duration = collector.get('duration', -1)
+                if duration == -1 and collector['interval'] == -1:
+                    self.TerminateCollector(collector_id)
                 else:
-                    threading.Thread(target=self.InitiateCollectorBackend,
-                                     args=(collector_id, collector)).start()
+                    LOGGER.info("Collector ID: {:} - Scheduling...".format(collector_id))
+                    if collector_id not in self.active_jobs:
+                        stop_event = threading.Event()
+                        self.active_jobs[collector_id] = stop_event
+                        threading.Thread(target=self.CollectorHandler,
+                                         args=(
+                                             collector_id,
+                                             collector['kpi_id'],
+                                             duration,
+                                             collector['interval'],
+                                             stop_event
+                                         )).start()
+                        # Stop the collector after the given duration
+                        if duration > 0:
+                            def stop_after_duration():
+                                time.sleep(duration)
+                                LOGGER.warning(f"Execution duration ({duration}) completed for Collector: {collector_id}")
+                                self.TerminateCollector(collector_id)
+
+                            duration_thread = threading.Thread(
+                                target=stop_after_duration, daemon=True, name=f"stop_after_duration_{collector_id}"
+                            )
+                            duration_thread.start()
+                    else:
+                        LOGGER.warning("Collector ID: {:} - Already scheduled or running".format(collector_id))
             except Exception as e:
                 LOGGER.warning("Unable to consume message from topic: {:}. ERROR: {:}".format(KafkaTopic.TELEMETRY_REQUEST.value, e))
-    def InitiateCollectorBackend(self, collector_id, collector):
-        """
-        Method receives a collector request and initiates the collector backend.
-        """
-        LOGGER.info("Initiating backend for collector: (Not Implemented... In progress ) {:s}".format(str(collector_id)))
-        # start_time = time.time()
-        # self.emulatorCollector = NetworkMetricsEmulator(
-        #     duration     = collector['duration'],
-        #     interval     = collector['interval'],
-        #     metric_queue = self.metric_queue
-        # )
-        # self.emulatorCollector.start()
-        # self.running_threads[collector_id] = self.emulatorCollector
-
-        # while self.emulatorCollector.is_alive():
-        #     if not self.metric_queue.empty():
-        #         metric_value = self.metric_queue.get()
-        #         LOGGER.debug("Metric: {:} - Value : {:}".format(collector['kpi_id'], metric_value))
-        #         self.GenerateKpiValue(collector_id, collector['kpi_id'], metric_value)
-        #     time.sleep(1)
-        # self.TerminateCollectorBackend(collector_id)
+    def CollectorHandler(self, collector_id, kpi_id, duration, interval, stop_event):
+        """
+        Method to handle a collector request.
+        """
+        end_points : list = self.get_endpoints_from_kpi_id(kpi_id)
+        if not end_points:
+            LOGGER.warning("KPI ID: {:} - Endpoints not found. Skipping...".format(kpi_id))
+
+        device_type : str = self.get_device_type_from_kpi_id(kpi_id)
+
+        if device_type == "Unknown":
+            LOGGER.warning("KPI ID: {:} - Device Type not found. Skipping...".format(kpi_id))
+
+        if device_type == "EMU-Device":
+            LOGGER.info("KPI ID: {:} - Device Type: {:} - Endpoints: {:}".format(kpi_id, device_type, end_points))
+            subscription = [collector_id, end_points, duration, interval]
+            self.EmulatedCollectorHandler(subscription, kpi_id, stop_event)
+        else:
+            LOGGER.warning("KPI ID: {:} - Device Type: {:} - Not Supported".format(kpi_id, device_type))
+
+    def EmulatedCollectorHandler(self, subscription, kpi_id, stop_event):
+        collector = EmulatedCollector(address="127.0.0.1", port=8000)
+        collector.Connect()
+        while not stop_event.is_set():
+            # samples = collector.SubscribeState(subscription)
+            # LOGGER.debug("KPI: {:} - Value: {:}".format(kpi_id, samples))
+            # self.GenerateKpiValue(job_id, kpi_id, samples)
+            LOGGER.info("Generating KPI Values ...")
+            time.sleep(1)

     def GenerateKpiValue(self, collector_id: str, kpi_id: str, measured_kpi_value: Any):
         """
@@ -122,38 +159,74 @@
         )
         producer.flush()

-    def TerminateCollectorBackend(self, collector_id):
+    def TerminateCollector(self, job_id):
         LOGGER.debug("Terminating collector backend...")
-        if collector_id in self.running_threads:
-            thread = self.running_threads[collector_id]
-            thread.stop()
-            del self.running_threads[collector_id]
-            LOGGER.debug("Collector backend terminated. Collector ID: {:}".format(collector_id))
-            self.GenerateCollectorTerminationSignal(collector_id, "-1", -1) # Termination confirmation to frontend.
-        else:
-            LOGGER.warning('Backend collector {:} not found'.format(collector_id))
+        try:
+            if job_id not in self.active_jobs:
+                LOGGER.warning(f"No active jobs found for {job_id}. It might have already terminated.")
+            else:
+                LOGGER.info(f"Terminating job: {job_id}")
+                stop_event = self.active_jobs.pop(job_id, None)
+                if stop_event:
+                    stop_event.set()
+                    LOGGER.info(f"Job {job_id} terminated.")
+                else:
+                    LOGGER.warning(f"Job {job_id} not found in active jobs.")
+        except Exception:
+            LOGGER.exception("Error terminating job: {:}".format(job_id))
-    def GenerateCollectorTerminationSignal(self, collector_id: str, kpi_id: str, measured_kpi_value: Any):
-        """
-        Method to write the KPI termination signal on the TELEMETRY_RESPONSE Kafka topic.
-        """
-        producer = self.kafka_producer
-        kpi_value : Dict = {
-            "kpi_id"    : kpi_id,
-            "kpi_value" : measured_kpi_value,
-        }
-        producer.produce(
-            KafkaTopic.TELEMETRY_RESPONSE.value,
-            key      = collector_id,
-            value    = json.dumps(kpi_value),
-            callback = self.delivery_callback
-        )
-        producer.flush()
+    def get_endpoints_from_kpi_id(self, kpi_id: str) -> list:
+        """
+        Method to get endpoints based on kpi_id.
+        """
+        kpi_endpoints = {
+            '6e22f180-ba28-4641-b190-2287bf448888' : {"uuid": "123e4567-e89b-12d3-a456-42661417ed06", "name": "eth0",   "type": "ethernet", "sample_types": [101, 102]},
+            '123e4567-e89b-12d3-a456-426614174001' : {"uuid": "123e4567-e89b-12d3-a456-42661417ed07", "name": "eth1",   "type": "ethernet", "sample_types": []},
+            '123e4567-e89b-12d3-a456-426614174002' : {"uuid": "123e4567-e89b-12d3-a456-42661417ed08", "name": "13/1/2", "type": "copper",   "sample_types": [101, 102, 201, 202]},
+        }
+        return [kpi_endpoints[kpi_id]] if kpi_id in kpi_endpoints else []
+
+    def get_device_type_from_kpi_id(self, kpi_id: str) -> str:
+        """
+        Method to get the device type based on kpi_id.
+        """
+        kpi_device_types = {
+            "123e4567-e89b-12d3-a456-42661type003" : {'device_type': "PKT-Device"},
+            "123e4567-e89b-12d3-a456-42661type004" : {'device_type': "OPT-Device"},
+            "6e22f180-ba28-4641-b190-2287bf448888" : {'device_type': "EMU-Device"},
+        }
+        return kpi_device_types.get(kpi_id, {}).get('device_type', "Unknown")
+    # def TerminateCollectorBackend(self, collector_id):
+    #     LOGGER.debug("Terminating collector backend...")
+    #     if collector_id in self.running_threads:
+    #         thread = self.running_threads[collector_id]
+    #         thread.stop()
+    #         del self.running_threads[collector_id]
+    #         LOGGER.debug("Collector backend terminated. Collector ID: {:}".format(collector_id))
+    #         self.GenerateCollectorTerminationSignal(collector_id, "-1", -1) # Termination confirmation to frontend.
+    #     else:
+    #         LOGGER.warning('Backend collector {:} not found'.format(collector_id))

+    # def GenerateCollectorTerminationSignal(self, collector_id: str, kpi_id: str, measured_kpi_value: Any):
+    #     """
+    #     Method to write the KPI termination signal on the TELEMETRY_RESPONSE Kafka topic.
+    #     """
+    #     producer = self.kafka_producer
+    #     kpi_value : Dict = {
+    #         "kpi_id"    : kpi_id,
+    #         "kpi_value" : measured_kpi_value,
+    #     }
+    #     producer.produce(
+    #         KafkaTopic.TELEMETRY_RESPONSE.value,
+    #         key      = collector_id,
+    #         value    = json.dumps(kpi_value),
+    #         callback = self.delivery_callback
+    #     )
+    #     producer.flush()
     def delivery_callback(self, err, msg):
         if err:
             LOGGER.error('Message delivery failed: {:s}'.format(str(err)))
-            # print(f'Message delivery failed: {err}')
-        # else:
-        #     LOGGER.info('Message delivered to topic {:}'.format(msg.topic()))
-        #     print(f'Message delivered to topic {msg.topic()}')
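The scheduling model above is cooperative cancellation: each collector gets a threading.Event stored in active_jobs, the worker thread loops on it, and TerminateCollector pops and sets it. A minimal standalone sketch of the same pattern (names here are illustrative, not from the commit):

    import threading
    import time

    active_jobs = {}

    def worker(job_id: str, stop_event: threading.Event):
        # Loop until TerminateCollector-style code calls stop_event.set().
        while not stop_event.is_set():
            time.sleep(1)
        print(f"{job_id} stopped")

    stop_event = threading.Event()
    active_jobs['job-1'] = stop_event
    threading.Thread(target=worker, args=('job-1', stop_event)).start()

    time.sleep(3)
    active_jobs.pop('job-1').set()  # signal the worker; its loop exits on the next check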
telemetry/backend/tests/ (backend test module):

@@ -12,12 +12,13 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import pytest
 import logging
 import time
-from typing import Dict
-from common.tools.kafka.Variables import KafkaTopic
 from telemetry.backend.service.TelemetryBackendService import TelemetryBackendService
 from .messages import create_collector_request
+from .Fixtures import context_client, device_client
+from .add_devices import load_topology

 LOGGER = logging.getLogger(__name__)
@@ -26,28 +27,42 @@ LOGGER = logging.getLogger(__name__)
 # Tests Implementation of Telemetry Backend
 ###########################

+@pytest.fixture(autouse=True)
+def log_all_methods(request):
+    '''
+    This fixture logs messages before and after each test function runs, indicating the start and end of the test.
+    The autouse=True parameter ensures that this logging happens automatically for all tests in the module.
+    '''
+    LOGGER.info(f" >>>>> Starting test: {request.node.name} ")
+    yield
+    LOGGER.info(f" <<<<< Finished test: {request.node.name} ")
+
+@pytest.fixture
+def telemetryBackend_service():
+    LOGGER.info('Initializing TelemetryBackendService...')
+    _service = TelemetryBackendService()
+    _service.start()
+    LOGGER.info('Yielding TelemetryBackendService...')
+    yield _service
+    LOGGER.info('Terminating TelemetryBackendService...')
+    _service.stop()
+    LOGGER.info('Terminated TelemetryBackendService...')
+
+def test_InitiateCollectorBackend(telemetryBackend_service):
+    LOGGER.info(" Backend Initiated Successfully. Waiting for timer to finish ...")
+    time.sleep(300)
+    LOGGER.info(" Backend Timer Finished Successfully. ")
# --- "test_validate_kafka_topics" should be run before the functionality tests --- # --- "test_validate_kafka_topics" should be run before the functionality tests ---
def test_validate_kafka_topics(): # def test_validate_kafka_topics():
LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ") # LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ")
response = KafkaTopic.create_all_topics() # response = KafkaTopic.create_all_topics()
assert isinstance(response, bool) # assert isinstance(response, bool)
# def test_RunRequestListener(): # # Call load_topology from the add_devices.py file
# LOGGER.info('test_RunRequestListener') # def test_load_topology(context_client, device_client):
# TelemetryBackendServiceObj = TelemetryBackendService() # load_topology(context_client, device_client)
# threading.Thread(target=TelemetryBackendServiceObj.RequestListener).start()
def test_RunInitiateCollectorBackend():
LOGGER.debug(">>> RunInitiateCollectorBackend <<<")
collector_obj = create_collector_request()
collector_id = collector_obj.collector_id.collector_id.uuid
collector_dict : Dict = {
"kpi_id" : collector_obj.kpi_id.kpi_id.uuid,
"duration": collector_obj.duration_s,
"interval": collector_obj.interval_s
}
TeleObj = TelemetryBackendService()
TeleObj.InitiateCollectorBackend(collector_id, collector_dict)
time.sleep(20)
LOGGER.debug("--- Execution Finished Sucessfully---")
telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py (path inferred from the class name):

@@ -13,7 +13,6 @@
 # limitations under the License.
 import json
-import threading
 from typing import Any, Dict
 import grpc
 import logging
@@ -29,7 +28,6 @@ from telemetry.database.Telemetry_DB import TelemetryDB
 from confluent_kafka import Consumer as KafkaConsumer
 from confluent_kafka import Producer as KafkaProducer
-from confluent_kafka import KafkaError

 LOGGER = logging.getLogger(__name__)
@@ -49,7 +47,7 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer):
     @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
     def StartCollector(self,
-                       request : Collector, grpc_context: grpc.ServicerContext # type: ignore
+                       request : Collector, context: grpc.ServicerContext # type: ignore
                       ) -> CollectorId: # type: ignore
         LOGGER.info("gRPC message: {:}".format(request))
         response = CollectorId()
@@ -86,7 +84,7 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer):
     @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
     def StopCollector(self,
-                      request : CollectorId, grpc_context: grpc.ServicerContext # type: ignore
+                      request : CollectorId, context: grpc.ServicerContext # type: ignore
                      ) -> Empty: # type: ignore
         LOGGER.info("gRPC message: {:}".format(request))
         try:
@@ -125,7 +123,7 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer):
     @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
     def SelectCollectors(self,
-                         request : CollectorFilter, contextgrpc_context: grpc.ServicerContext # type: ignore
+                         request : CollectorFilter, context: grpc.ServicerContext # type: ignore
                         ) -> CollectorList: # type: ignore
         LOGGER.info("gRPC message: {:}".format(request))
         response = CollectorList()