diff --git a/src/telemetry/backend/service/TelemetryBackendService.py b/src/telemetry/backend/service/TelemetryBackendService.py index a1f17df3cb65a6bd13ffb8e96a6a07b536200825..0c515768e9f668aff964cf6dc3f25d5ea84baa4a 100755 --- a/src/telemetry/backend/service/TelemetryBackendService.py +++ b/src/telemetry/backend/service/TelemetryBackendService.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import queue import json import time import logging @@ -27,6 +26,7 @@ from common.Settings import get_service_port_grpc from common.method_wrappers.Decorator import MetricsPool from common.tools.kafka.Variables import KafkaConfig, KafkaTopic from common.tools.service.GenericGrpcService import GenericGrpcService +from telemetry.backend.collectors.emulated.EmulatedCollector import EmulatedCollector LOGGER = logging.getLogger(__name__) METRICS_POOL = MetricsPool('TelemetryBackend', 'backendService') @@ -44,9 +44,7 @@ class TelemetryBackendService(GenericGrpcService): self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(), 'group.id' : 'backend', 'auto.offset.reset' : 'latest'}) - self.running_threads = {} - self.emulatorCollector = None - self.metric_queue = queue.Queue() + self.active_jobs = {} def install_servicers(self): threading.Thread(target=self.RequestListener).start() @@ -60,49 +58,88 @@ class TelemetryBackendService(GenericGrpcService): consumer = self.kafka_consumer consumer.subscribe([KafkaTopic.TELEMETRY_REQUEST.value]) while True: - receive_msg = consumer.poll(2.0) + receive_msg = consumer.poll(1.0) if receive_msg is None: continue elif receive_msg.error(): if receive_msg.error().code() == KafkaError._PARTITION_EOF: continue + elif receive_msg.error().code() == KafkaError.UNKNOWN_TOPIC_OR_PART: + LOGGER.warning(f"Subscribed topic {receive_msg.topic()} does not exist. May be topic does not have any messages.") + continue else: LOGGER.error("Consumer error: {}".format(receive_msg.error())) break try: - collector = json.loads(receive_msg.value().decode('utf-8')) + collector = json.loads( + receive_msg.value().decode('utf-8') + ) collector_id = receive_msg.key().decode('utf-8') LOGGER.debug('Recevied Collector: {:} - {:}'.format(collector_id, collector)) - if collector['duration'] == -1 and collector['interval'] == -1: - self.TerminateCollectorBackend(collector_id) + duration = collector.get('duration', -1) + if duration == -1 and collector['interval'] == -1: + self.TerminateCollector(collector_id) else: - threading.Thread(target=self.InitiateCollectorBackend, - args=(collector_id, collector)).start() + LOGGER.info("Collector ID: {:} - Scheduling...".format(collector_id)) + if collector_id not in self.active_jobs: + stop_event = threading.Event() + self.active_jobs[collector_id] = stop_event + threading.Thread(target = self.CollectorHandler, + args=( + collector_id, + collector['kpi_id'], + duration, + collector['interval'], + stop_event + )).start() + # Stop the Collector after the given duration + if duration > 0: + def stop_after_duration(): + time.sleep(duration) + LOGGER.warning(f"Execution duration ({duration}) completed of Collector: {collector_id}") + self.TerminateCollector(collector_id) + + duration_thread = threading.Thread( + target=stop_after_duration, daemon=True, name=f"stop_after_duration_{collector_id}" + ) + duration_thread.start() + else: + LOGGER.warning("Collector ID: {:} - Already scheduled or running".format(collector_id)) except Exception as e: LOGGER.warning("Unable to consumer message from topic: {:}. ERROR: {:}".format(KafkaTopic.TELEMETRY_REQUEST.value, e)) - def InitiateCollectorBackend(self, collector_id, collector): + def CollectorHandler(self, collector_id, kpi_id, duration, interval, stop_event): """ - Method receives collector request and initiates collecter backend. + Method to handle collector request. """ - LOGGER.info("Initiating backend for collector: (Not Implemented... In progress ) {:s}".format(str(collector_id))) - # start_time = time.time() - # self.emulatorCollector = NetworkMetricsEmulator( - # duration = collector['duration'], - # interval = collector['interval'], - # metric_queue = self.metric_queue - # ) - # self.emulatorCollector.start() - # self.running_threads[collector_id] = self.emulatorCollector - - # while self.emulatorCollector.is_alive(): - # if not self.metric_queue.empty(): - # metric_value = self.metric_queue.get() - # LOGGER.debug("Metric: {:} - Value : {:}".format(collector['kpi_id'], metric_value)) - # self.GenerateKpiValue(collector_id, collector['kpi_id'] , metric_value) - # time.sleep(1) - # self.TerminateCollectorBackend(collector_id) + end_points : list = self.get_endpoints_from_kpi_id(kpi_id) + if not end_points: + LOGGER.warning("KPI ID: {:} - Endpoints not found. Skipping...".format(kpi_id)) + + device_type : str = self.get_device_type_from_kpi_id(kpi_id) + + if device_type == "Unknown": + LOGGER.warning("KPI ID: {:} - Device Type not found. Skipping...".format(kpi_id)) + + if device_type == "EMU-Device": + LOGGER.info("KPI ID: {:} - Device Type: {:} - Endpoints: {:}".format(kpi_id, device_type, end_points)) + subscription = [collector_id, end_points, duration, interval] + self.EmulatedCollectorHandler(subscription, kpi_id, stop_event) + else: + LOGGER.warning("KPI ID: {:} - Device Type: {:} - Not Supported".format(kpi_id, device_type)) + + + def EmulatedCollectorHandler(self, subscription, kpi_id, stop_event): + # EmulatedCollector + collector = EmulatedCollector(address="127.0.0.1", port=8000) + collector.Connect() + while not stop_event.is_set(): + # samples = collector.SubscribeState(subscription) + # LOGGER.debug("KPI: {:} - Value: {:}".format(kpi_id, samples)) + # self.GenerateKpiValue(job_id, kpi_id, samples) + LOGGER.info("Generating KPI Values ...") + time.sleep(1) def GenerateKpiValue(self, collector_id: str, kpi_id: str, measured_kpi_value: Any): """ @@ -122,38 +159,74 @@ class TelemetryBackendService(GenericGrpcService): ) producer.flush() - def TerminateCollectorBackend(self, collector_id): + def TerminateCollector(self, job_id): LOGGER.debug("Terminating collector backend...") - if collector_id in self.running_threads: - thread = self.running_threads[collector_id] - thread.stop() - del self.running_threads[collector_id] - LOGGER.debug("Collector backend terminated. Collector ID: {:}".format(collector_id)) - self.GenerateCollectorTerminationSignal(collector_id, "-1", -1) # Termination confirmation to frontend. - else: - LOGGER.warning('Backend collector {:} not found'.format(collector_id)) + try: + if job_id not in self.active_jobs: # not job_ids: + # self.logger.warning(f"Active jobs: {self.active_jobs}") + self.logger.warning(f"No active jobs found for {job_id}. It might have already terminated.") + else: + LOGGER.info(f"Terminating job: {job_id}") + stop_event = self.active_jobs.pop(job_id, None) + if stop_event: + stop_event.set() + LOGGER.info(f"Job {job_id} terminated.") + else: + LOGGER.warning(f"Job {job_id} not found in active jobs.") + except: + LOGGER.exception("Error terminating job: {:}".format(job_id)) - def GenerateCollectorTerminationSignal(self, collector_id: str, kpi_id: str, measured_kpi_value: Any): + def get_endpoints_from_kpi_id(self, kpi_id: str) -> list: """ - Method to write kpi Termination signat on TELEMETRY_RESPONSE Kafka topic + Method to get endpoints based on kpi_id. """ - producer = self.kafka_producer - kpi_value : Dict = { - "kpi_id" : kpi_id, - "kpi_value" : measured_kpi_value, + kpi_endpoints = { + '6e22f180-ba28-4641-b190-2287bf448888': {"uuid": "123e4567-e89b-12d3-a456-42661417ed06", "name": "eth0", "type": "ethernet", "sample_types": [101, 102]}, + '123e4567-e89b-12d3-a456-426614174001': {"uuid": "123e4567-e89b-12d3-a456-42661417ed07", "name": "eth1", "type": "ethernet", "sample_types": []}, + '123e4567-e89b-12d3-a456-426614174002': {"uuid": "123e4567-e89b-12d3-a456-42661417ed08", "name": "13/1/2", "type": "copper", "sample_types": [101, 102, 201, 202]}, } - producer.produce( - KafkaTopic.TELEMETRY_RESPONSE.value, - key = collector_id, - value = json.dumps(kpi_value), - callback = self.delivery_callback - ) - producer.flush() + return [kpi_endpoints.get(kpi_id, {})] if kpi_id in kpi_endpoints else [] + + def get_device_type_from_kpi_id(self, kpi_id: str) -> str: + """ + Method to get device type based on kpi_id. + """ + kpi_device_types = { + "123e4567-e89b-12d3-a456-42661type003" : {'device_type': "PKT-Device"}, + "123e4567-e89b-12d3-a456-42661type004" : {'device_type': "OPT-Device"}, + "6e22f180-ba28-4641-b190-2287bf448888" : {'device_type': "EMU-Device"}, + } + return kpi_device_types.get(kpi_id, {}).get('device_type', "Unknown") + + + # def TerminateCollectorBackend(self, collector_id): + # LOGGER.debug("Terminating collector backend...") + # if collector_id in self.running_threads: + # thread = self.running_threads[collector_id] + # thread.stop() + # del self.running_threads[collector_id] + # LOGGER.debug("Collector backend terminated. Collector ID: {:}".format(collector_id)) + # self.GenerateCollectorTerminationSignal(collector_id, "-1", -1) # Termination confirmation to frontend. + # else: + # LOGGER.warning('Backend collector {:} not found'.format(collector_id)) + + # def GenerateCollectorTerminationSignal(self, collector_id: str, kpi_id: str, measured_kpi_value: Any): + # """ + # Method to write kpi Termination signat on TELEMETRY_RESPONSE Kafka topic + # """ + # producer = self.kafka_producer + # kpi_value : Dict = { + # "kpi_id" : kpi_id, + # "kpi_value" : measured_kpi_value, + # } + # producer.produce( + # KafkaTopic.TELEMETRY_RESPONSE.value, + # key = collector_id, + # value = json.dumps(kpi_value), + # callback = self.delivery_callback + # ) + # producer.flush() def delivery_callback(self, err, msg): if err: LOGGER.error('Message delivery failed: {:s}'.format(str(err))) - # print(f'Message delivery failed: {err}') - # else: - # LOGGER.info('Message delivered to topic {:}'.format(msg.topic())) - # print(f'Message delivered to topic {msg.topic()}') diff --git a/src/telemetry/backend/tests/test_backend.py b/src/telemetry/backend/tests/test_backend.py index e75b33ca58c6bf27c5d2e1c2012dc31de5274ad3..28b92fb29c60eb099d253d12c36fa59b87b0a701 100644 --- a/src/telemetry/backend/tests/test_backend.py +++ b/src/telemetry/backend/tests/test_backend.py @@ -12,12 +12,13 @@ # See the License for the specific language governing permissions and # limitations under the License. +import pytest import logging import time -from typing import Dict -from common.tools.kafka.Variables import KafkaTopic from telemetry.backend.service.TelemetryBackendService import TelemetryBackendService from .messages import create_collector_request +from .Fixtures import context_client, device_client +from .add_devices import load_topology LOGGER = logging.getLogger(__name__) @@ -26,28 +27,42 @@ LOGGER = logging.getLogger(__name__) # Tests Implementation of Telemetry Backend ########################### +@pytest.fixture(autouse=True) +def log_all_methods(request): + ''' + This fixture logs messages before and after each test function runs, indicating the start and end of the test. + The autouse=True parameter ensures that this logging happens automatically for all tests in the module. + ''' + LOGGER.info(f" >>>>> Starting test: {request.node.name} ") + yield + LOGGER.info(f" <<<<< Finished test: {request.node.name} ") + +@pytest.fixture +def telemetryBackend_service(): + LOGGER.info('Initializing TelemetryBackendService...') + + _service = TelemetryBackendService() + _service.start() + + LOGGER.info('Yielding TelemetryBackendService...') + yield _service + + LOGGER.info('Terminating TelemetryBackendService...') + _service.stop() + LOGGER.info('Terminated TelemetryBackendService...') + + +def test_InitiateCollectorBackend(telemetryBackend_service): + LOGGER.info(" Backend Initiated Successfully. Waiting for timer to finish ...") + time.sleep(300) + LOGGER.info(" Backend Timer Finished Successfully. ") + # --- "test_validate_kafka_topics" should be run before the functionality tests --- -def test_validate_kafka_topics(): - LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ") - response = KafkaTopic.create_all_topics() - assert isinstance(response, bool) - -# def test_RunRequestListener(): -# LOGGER.info('test_RunRequestListener') -# TelemetryBackendServiceObj = TelemetryBackendService() -# threading.Thread(target=TelemetryBackendServiceObj.RequestListener).start() - -def test_RunInitiateCollectorBackend(): - LOGGER.debug(">>> RunInitiateCollectorBackend <<<") - collector_obj = create_collector_request() - collector_id = collector_obj.collector_id.collector_id.uuid - collector_dict : Dict = { - "kpi_id" : collector_obj.kpi_id.kpi_id.uuid, - "duration": collector_obj.duration_s, - "interval": collector_obj.interval_s - } - TeleObj = TelemetryBackendService() - TeleObj.InitiateCollectorBackend(collector_id, collector_dict) - time.sleep(20) - - LOGGER.debug("--- Execution Finished Sucessfully---") +# def test_validate_kafka_topics(): +# LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ") +# response = KafkaTopic.create_all_topics() +# assert isinstance(response, bool) + +# # Call load_topology from the add_devices.py file +# def test_load_topology(context_client, device_client): +# load_topology(context_client, device_client) diff --git a/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py b/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py index f74e97ffd4998ca0b3255ca4e1ebe496ebc6737b..1ef8ed46b049ec96daffffc46748e51f55144df9 100644 --- a/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py +++ b/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py @@ -13,7 +13,6 @@ # limitations under the License. import json -import threading from typing import Any, Dict import grpc import logging @@ -29,7 +28,6 @@ from telemetry.database.Telemetry_DB import TelemetryDB from confluent_kafka import Consumer as KafkaConsumer from confluent_kafka import Producer as KafkaProducer -from confluent_kafka import KafkaError LOGGER = logging.getLogger(__name__) @@ -49,7 +47,7 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer): @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def StartCollector(self, - request : Collector, grpc_context: grpc.ServicerContext # type: ignore + request : Collector, context: grpc.ServicerContext # type: ignore ) -> CollectorId: # type: ignore LOGGER.info ("gRPC message: {:}".format(request)) response = CollectorId() @@ -86,7 +84,7 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer): @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def StopCollector(self, - request : CollectorId, grpc_context: grpc.ServicerContext # type: ignore + request : CollectorId, context: grpc.ServicerContext # type: ignore ) -> Empty: # type: ignore LOGGER.info ("gRPC message: {:}".format(request)) try: @@ -125,7 +123,7 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer): @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def SelectCollectors(self, - request : CollectorFilter, contextgrpc_context: grpc.ServicerContext # type: ignore + request : CollectorFilter, context: grpc.ServicerContext # type: ignore ) -> CollectorList: # type: ignore LOGGER.info("gRPC message: {:}".format(request)) response = CollectorList()