Commit 705e74ec authored by Waleed Akbar's avatar Waleed Akbar
Browse files

Changes in Telemetry.

Frontend:
- `ResponseListener` will start with service deployment.

Backend:
- Added `GenerateCollectorTerminationSignal()` to inform the Frontend about the termination of a Collector.
- Updated `GenerateCollectorResponse()` to stream extracted `KpiValues` on the `topic_value` .
- Updated test file.
parent 143f8742
Loading
Loading
Loading
Loading
+27 −100
Original line number Diff line number Diff line
@@ -83,7 +83,7 @@ class TelemetryBackendService(GenericGrpcService):
            thread.join()
            print ("Terminating backend (by StopCollector): Collector Id: ", collector_id)
            del self.running_threads[collector_id]
            self.GenerateCollectorResponse(collector_id, "-1", -1)          # Termination confirmation to frontend.
            self.GenerateCollectorTerminationSignal(collector_id, "-1", -1)          # Termination confirmation to frontend.
        else:
            print ('Backend collector {:} not found'.format(collector_id))

@@ -103,11 +103,28 @@ class TelemetryBackendService(GenericGrpcService):
        while not stop_event.is_set():
            if time.time() - start_time >= collector['duration']:            # condition to terminate backend
                print("Execuation duration completed: Terminating backend: Collector Id: ", collector_id, " - ", time.time() - start_time)
                self.GenerateCollectorResponse(collector_id, "-1", -1)       # Termination confirmation to frontend.
                self.GenerateCollectorTerminationSignal(collector_id, "-1", -1)       # Termination confirmation to frontend.
                break
            self.ExtractKpiValue(collector_id, collector['kpi_id'])
            time.sleep(collector['interval'])

    def GenerateCollectorTerminationSignal(self, collector_id: str, kpi_id: str, measured_kpi_value: Any):
        """
        Method to write kpi Termination signat on RESPONSE Kafka topic
        """
        producer = self.kafka_producer
        kpi_value : Dict = {
            "kpi_id"    : kpi_id,
            "kpi_value" : measured_kpi_value,
        }
        producer.produce(
            KafkaTopic.RESPONSE.value, # TODO: to  the topic ...
            key      = collector_id,
            value    = json.dumps(kpi_value),
            callback = self.delivery_callback
        )
        producer.flush()

    def ExtractKpiValue(self, collector_id: str, kpi_id: str):
        """
        Method to extract kpi value.
@@ -123,117 +140,27 @@ class TelemetryBackendService(GenericGrpcService):
        """
        producer = self.kafka_producer
        kpi_value : Dict = {
            "time_stamp": str(time.time()),
            "kpi_id"    : kpi_id,
            "kpi_value" : measured_kpi_value
        }
        producer.produce(
            KafkaTopic.RESPONSE.value,
            KafkaTopic.VALUE.value, # TODO: to  the topic ...
            key      = collector_id,
            value    = json.dumps(kpi_value),
            callback = self.delivery_callback
        )
        producer.flush()

    def GenerateRawMetric(self, metrics: Any):
        """
        Method writes raw metrics on VALUE Kafka topic
        """
        producer = self.kafka_producer
        some_metric : Dict = {
            "some_id"    : metrics
        }
        producer.produce(
            KafkaTopic.VALUE.value,
            key      = 'raw',
            value    = json.dumps(some_metric),
            callback = self.delivery_callback
        )
        producer.flush()

    def delivery_callback(self, err, msg):
        """
        Callback function to handle message delivery status.
        Args: err (KafkaError): Kafka error object.
              msg (Message): Kafka message object.
        """
        if err: print(f'Message delivery failed: {err}')
        # else:   print(f'Message delivered to topic {msg.topic()}')

# # ----------- BELOW: Actual Implementation of Kafka Producer with Node Exporter -----------
#     @staticmethod
#     def fetch_single_node_exporter_metric():
#         """
#         Method to fetch metrics from Node Exporter.
#         Returns:
#             str: Metrics fetched from Node Exporter.
#         """
#         KPI = "node_network_receive_packets_total"
#         try:
#             response = requests.get(EXPORTER_ENDPOINT) # type: ignore
#             LOGGER.info("Request status {:}".format(response))
#             if response.status_code == 200:
#                 # print(f"Metrics fetched sucessfully...")
#                 metrics = response.text
#                 # Check if the desired metric is available in the response
#                 if KPI in metrics:
#                     KPI_VALUE = TelemetryBackendService.extract_metric_value(metrics, KPI)
#                     # Extract the metric value
#                     if KPI_VALUE is not None:
#                         LOGGER.info("Extracted value of {:} is {:}".format(KPI, KPI_VALUE))
#                         print(f"Extracted value of {KPI} is: {KPI_VALUE}")
#                         return KPI_VALUE
#             else:
#                 LOGGER.info("Failed to fetch metrics. Status code: {:}".format(response.status_code))
#                 # print(f"Failed to fetch metrics. Status code: {response.status_code}")
#                 return None
#         except Exception as e:
#             LOGGER.info("Failed to fetch metrics. Status code: {:}".format(e))
#             # print(f"Failed to fetch metrics: {str(e)}")
#             return None

#     @staticmethod
#     def extract_metric_value(metrics, metric_name):
#         """
#         Method to extract the value of a metric from the metrics string.
#         Args:
#             metrics (str): Metrics string fetched from Exporter.
#             metric_name (str): Name of the metric to extract.
#         Returns:
#             float: Value of the extracted metric, or None if not found.
#         """
#         try:
#             # Find the metric line containing the desired metric name
#             metric_line = next(line for line in metrics.split('\n') if line.startswith(metric_name))
#             # Split the line to extract the metric value
#             metric_value = float(metric_line.split()[1])
#             return metric_value
#         except StopIteration:
#             print(f"Metric '{metric_name}' not found in the metrics.")
#             return None

#     @staticmethod
#     def stream_node_export_metrics_to_raw_topic():
#         try:
#             while True:
#                 response = requests.get(EXPORTER_ENDPOINT)
#                 # print("Response Status {:} ".format(response))
#                 # LOGGER.info("Response Status {:} ".format(response))
#                 try: 
#                     if response.status_code == 200:
#                         producerObj = KafkaProducer(PRODUCER_CONFIG)
#                         producerObj.produce(KAFKA_TOPICS['raw'], key="raw", value= str(response.text), callback=TelemetryBackendService.delivery_callback)
#                         producerObj.flush()
#                         LOGGER.info("Produce to topic")
#                     else:
#                         LOGGER.info("Didn't received expected response. Status code: {:}".format(response.status_code))
#                         print(f"Didn't received expected response. Status code: {response.status_code}")
#                         return None
#                     time.sleep(15)
#                 except Exception as e:
#                     LOGGER.info("Failed to process response. Status code: {:}".format(e))
#                     return None
#         except Exception as e:
#             LOGGER.info("Failed to fetch metrics. Status code: {:}".format(e))
#             print(f"Failed to fetch metrics: {str(e)}")
#             return None
# # ----------- ABOVE: Actual Implementation of Kafka Producer with Node Exporter -----------
 No newline at end of file
        if err: 
            LOGGER.debug('Message delivery failed: {:}'.format(err))
            print(f'Message delivery failed: {err}')
        else:
            LOGGER.info('Message delivered to topic {:}'.format(msg.topic()))
            print(f'Message delivered to topic {msg.topic()}')
+6 −6
Original line number Diff line number Diff line
@@ -13,6 +13,7 @@
# limitations under the License.

import logging
import threading
from common.tools.kafka.Variables import KafkaTopic
from telemetry.backend.service.TelemetryBackendService import TelemetryBackendService

@@ -25,14 +26,13 @@ LOGGER = logging.getLogger(__name__)
###########################

# --- "test_validate_kafka_topics" should be run before the functionality tests ---
def test_validate_kafka_topics():
    LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ")
    response = KafkaTopic.create_all_topics()
    assert isinstance(response, bool)
# def test_validate_kafka_topics():
#     LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ")
#     response = KafkaTopic.create_all_topics()
#     assert isinstance(response, bool)

def test_RunRequestListener():
    LOGGER.info('test_RunRequestListener')
    TelemetryBackendServiceObj = TelemetryBackendService()
    response = TelemetryBackendServiceObj.RunRequestListener()
    response = threading.Thread(target=TelemetryBackendServiceObj.RequestListener).start()
    LOGGER.debug(str(response))
    assert isinstance(response, bool)
+1 −2
Original line number Diff line number Diff line
@@ -161,9 +161,8 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer):
    # ---------- Independent Method ---------------
    # Listener method is independent of any method (same lifetime as service)
    # continously listens for responses
    def RunResponseListener(self):
    def install_servicers(self):
        threading.Thread(target=self.ResponseListener).start()
        return True

    def ResponseListener(self):
        """
+1 −1

File changed.

Contains only whitespace changes.