Commit af6688ba authored by Waleed Akbar's avatar Waleed Akbar
Browse files

Updated Telemetry Backend

- Enhance telemetry tests with logging
- Update collector API methods
- Updated Telemetry backend collector management
parent 1be101f8
Loading
Loading
Loading
Loading
+18 −24
Original line number Diff line number Diff line
@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import threading
import queue
from typing import Any, Iterator, List, Optional, Tuple, Union

# Special resource names to request to the collector to retrieve the specified
@@ -135,31 +135,25 @@ class _Collector:
        """
        raise NotImplementedError()

    def SubscribeState(self, subscriptions: List[Tuple[str, float, float]]) -> \
    def SubscribeState(self, subscriptions: List[Tuple[str, dict, float, float]]) -> \
                bool:
        """ Subscribe to state information of the entire device or selected resources. 
            Subscriptions are incremental, and the collector should keep track of requested resources.
                    List of tuples, each containing:
                        - resource_id (str): Identifier pointing to the resource to be subscribed.
                        - resource_dict (dict): Dictionary containing resource name, KPI to be subscribed, and type.
                        - sampling_duration (float): Duration (in seconds) for how long monitoring should last.
                        - sampling_interval (float): Desired monitoring interval (in seconds) for the specified resource.
                    List of results for the requested resource key subscriptions. 
                    The return values are in the same order as the requested resource keys.
                    - True if a resource is successfully subscribed.
                    - Exception if an error occurs during the subscription process.
            List[Union[bool, Exception]]:
        """ Subscribe to state information of entire device or
        selected resources. Subscriptions are incremental.
            Collector should keep track of requested resources.
            Parameters:
                subscriptions : List[Tuple[str, float, float]]
                    List of tuples, each containing a resource_key pointing the
                    resource to be subscribed, a sampling_duration, and a
                    sampling_interval (both in seconds with float
                    representation) defining, respectively, for how long
                    monitoring should last, and the desired monitoring interval
                    for the resource specified.
            Returns:
                results : List[Union[bool, Exception]]
                    List of results for resource key subscriptions requested.
                    Return values must be in the same order as the resource keys
                    requested. If a resource is properly subscribed,
                    True must be retrieved; otherwise, the Exception that is
                    raised during the processing must be retrieved.
        """ 
        raise NotImplementedError()

    def UnsubscribeState(self, subscriptions: List[Tuple[str, float, float]]) \
            -> List[Union[bool, Exception]]:
    def UnsubscribeState(self, resource_key: str) \
            -> bool:
        """ Unsubscribe from state information of entire device
        or selected resources. Subscriptions are incremental.
            Collector should keep track of requested resources.
@@ -182,7 +176,7 @@ class _Collector:
        raise NotImplementedError()

    def GetState(
        self, blocking=False, terminate : Optional[threading.Event] = None
        self, duration : int, blocking=False, terminate: Optional[queue.Queue] = None
    ) -> Iterator[Tuple[float, str, Any]]:
        """ Retrieve last collected values for subscribed resources.
        Operates as a generator, so this method should be called once and will
+37 −341

File changed.

Preview size limit exceeded, changes collapsed.

+33 −51
Original line number Diff line number Diff line
@@ -44,6 +44,7 @@ class TelemetryBackendService(GenericGrpcService):
        self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(),
                                            'group.id'           : 'backend',
                                            'auto.offset.reset'  : 'latest'})
        self.collector = EmulatedCollector(address="127.0.0.1", port=8000)
        self.active_jobs = {}

    def install_servicers(self):
@@ -65,7 +66,7 @@ class TelemetryBackendService(GenericGrpcService):
                if receive_msg.error().code() == KafkaError._PARTITION_EOF:
                    continue
                elif receive_msg.error().code() == KafkaError.UNKNOWN_TOPIC_OR_PART:
                    LOGGER.warning(f"Subscribed topic {receive_msg.topic()} does not exist. May be topic does not have any messages.")
                    LOGGER.warning(f"Subscribed topic {receive_msg.topic()} does not exist or topic does not have any messages.")
                    continue
                else:
                    LOGGER.error("Consumer error: {}".format(receive_msg.error()))
@@ -77,11 +78,11 @@ class TelemetryBackendService(GenericGrpcService):
                collector_id = receive_msg.key().decode('utf-8')
                LOGGER.debug('Recevied Collector: {:} - {:}'.format(collector_id, collector))

                duration = collector.get('duration', -1)
                duration = collector.get('duration', 0)
                if duration == -1 and collector['interval'] == -1:
                    self.TerminateCollector(collector_id)
                else:
                    LOGGER.info("Collector ID: {:} - Scheduling...".format(collector_id))
                    LOGGER.info("Received Collector ID: {:} - Scheduling...".format(collector_id))
                    if collector_id not in self.active_jobs:
                        stop_event = threading.Event()
                        self.active_jobs[collector_id] = stop_event
@@ -95,13 +96,15 @@ class TelemetryBackendService(GenericGrpcService):
                                    )).start()
                        # Stop the Collector after the given duration
                        if duration > 0:
                            def stop_after_duration():
                                time.sleep(duration)
                                LOGGER.warning(f"Execution duration ({duration}) completed of Collector: {collector_id}")
                            def stop_after_duration(completion_time, stop_event):
                                time.sleep(completion_time)
                                if not stop_event.is_set():
                                    LOGGER.warning(f"Execution duration ({completion_time}) completed of Collector: {collector_id}")
                                    self.TerminateCollector(collector_id)
                                
                            duration_thread = threading.Thread(
                                target=stop_after_duration, daemon=True, name=f"stop_after_duration_{collector_id}"
                                target=stop_after_duration, daemon=True, name=f"stop_after_duration_{collector_id}",
                                args=(duration, stop_event) 
                                )
                            duration_thread.start()
                    else:
@@ -113,7 +116,7 @@ class TelemetryBackendService(GenericGrpcService):
        """
        Method to handle collector request.
        """
        end_points : list = self.get_endpoints_from_kpi_id(kpi_id)
        end_points : dict = self.get_endpoints_from_kpi_id(kpi_id)
        if not end_points:
            LOGGER.warning("KPI ID: {:} - Endpoints not found. Skipping...".format(kpi_id))
        
@@ -125,21 +128,24 @@ class TelemetryBackendService(GenericGrpcService):
        if device_type == "EMU-Device":
            LOGGER.info("KPI ID: {:} - Device Type: {:} - Endpoints: {:}".format(kpi_id, device_type, end_points))
            subscription = [collector_id, end_points, duration, interval]
            self.EmulatedCollectorHandler(subscription, kpi_id, stop_event)
            self.EmulatedCollectorHandler(subscription, duration, collector_id, kpi_id, stop_event)
        else:
            LOGGER.warning("KPI ID: {:} - Device Type: {:} - Not Supported".format(kpi_id, device_type))


    def EmulatedCollectorHandler(self, subscription, kpi_id, stop_event):
    def EmulatedCollectorHandler(self, subscription, duration, collector_id, kpi_id, stop_event):
            # EmulatedCollector
            collector = EmulatedCollector(address="127.0.0.1", port=8000)
            collector.Connect()
            
            self.collector.Connect()
            if not self.collector.SubscribeState(subscription):
                LOGGER.warning("KPI ID: {:} - Subscription failed. Skipping...".format(kpi_id))
            else:
                while not stop_event.is_set():
                # samples = collector.SubscribeState(subscription)
                # LOGGER.debug("KPI: {:} - Value: {:}".format(kpi_id, samples))
                # self.GenerateKpiValue(job_id, kpi_id, samples)
                LOGGER.info("Generating KPI Values ...")
                    samples = list(self.collector.GetState(duration=duration, blocking=True))
                    LOGGER.info("KPI: {:} - Value: {:}".format(kpi_id, samples))
                    self.GenerateKpiValue(collector_id, kpi_id, samples)
                    time.sleep(1)
            self.collector.Disconnect()
            # self.TerminateCollector(collector_id)       # No need to terminate, automatically terminated after duration.

    def GenerateKpiValue(self, collector_id: str, kpi_id: str, measured_kpi_value: Any):
        """
@@ -171,12 +177,17 @@ class TelemetryBackendService(GenericGrpcService):
                if stop_event:
                    stop_event.set()
                    LOGGER.info(f"Job {job_id} terminated.")
                    if self.collector.UnsubscribeState(job_id):
                        LOGGER.info(f"Unsubscribed from collector: {job_id}")
                    else:
                        LOGGER.warning(f"Failed to unsubscribe from collector: {job_id}")
                else:
                    LOGGER.warning(f"Job {job_id} not found in active jobs.")
        except:
            LOGGER.exception("Error terminating job: {:}".format(job_id))

    def get_endpoints_from_kpi_id(self, kpi_id: str) -> list:
# --- Mock Methods ---
    def get_endpoints_from_kpi_id(self, kpi_id: str) -> dict:
        """
        Method to get endpoints based on kpi_id.
        """
@@ -185,7 +196,7 @@ class TelemetryBackendService(GenericGrpcService):
            '123e4567-e89b-12d3-a456-426614174001': {"uuid": "123e4567-e89b-12d3-a456-42661417ed07", "name": "eth1",   "type": "ethernet", "sample_types": []},
            '123e4567-e89b-12d3-a456-426614174002': {"uuid": "123e4567-e89b-12d3-a456-42661417ed08", "name": "13/1/2", "type": "copper",   "sample_types": [101, 102, 201, 202]},
        }
        return [kpi_endpoints.get(kpi_id, {})] if kpi_id in kpi_endpoints else []
        return kpi_endpoints.get(kpi_id, {}) if kpi_id in kpi_endpoints else {}

    def get_device_type_from_kpi_id(self, kpi_id: str) -> str:
        """
@@ -198,35 +209,6 @@ class TelemetryBackendService(GenericGrpcService):
        }
        return kpi_device_types.get(kpi_id, {}).get('device_type', "Unknown")


    # def TerminateCollectorBackend(self, collector_id):
    #     LOGGER.debug("Terminating collector backend...")
    #     if collector_id in self.running_threads:
    #         thread = self.running_threads[collector_id]
    #         thread.stop()
    #         del self.running_threads[collector_id]
    #         LOGGER.debug("Collector backend terminated. Collector ID: {:}".format(collector_id))
    #         self.GenerateCollectorTerminationSignal(collector_id, "-1", -1)          # Termination confirmation to frontend.
    #     else:
    #         LOGGER.warning('Backend collector {:} not found'.format(collector_id))

    # def GenerateCollectorTerminationSignal(self, collector_id: str, kpi_id: str, measured_kpi_value: Any):
    #     """
    #     Method to write kpi Termination signal on TELEMETRY_RESPONSE Kafka topic
    #     """
    #     producer = self.kafka_producer
    #     kpi_value : Dict = {
    #         "kpi_id"    : kpi_id,
    #         "kpi_value" : measured_kpi_value,
    #     }
    #     producer.produce(
    #         KafkaTopic.TELEMETRY_RESPONSE.value,
    #         key      = collector_id,
    #         value    = json.dumps(kpi_value),
    #         callback = self.delivery_callback
    #     )
    #     producer.flush()

    def delivery_callback(self, err, msg):
        if err: 
            LOGGER.error('Message delivery failed: {:s}'.format(str(err)))
+22 −15
Original line number Diff line number Diff line
@@ -15,6 +15,7 @@
import os
import pytest
import logging
import time

from common.Constants import ServiceNameEnum
from common.proto.telemetry_frontend_pb2 import CollectorId, CollectorList
@@ -42,6 +43,16 @@ os.environ[get_env_var_name(ServiceNameEnum.TELEMETRY, ENVVAR_SUFIX_SERVICE_PORT

LOGGER = logging.getLogger(__name__)

@pytest.fixture(autouse=True)
def log_all_methods(request):
    """Bracket each test with a start and a finish log line.

    The fixture is declared with ``autouse=True``, so it is applied to the
    tests in this module automatically; no test needs to request it. The
    code before ``yield`` runs ahead of the test body, the code after it
    runs once the test has finished.
    """
    # Name of the test currently being wrapped, as reported by pytest.
    test_name = request.node.name
    LOGGER.info(f" >>>>> Starting test: {test_name} ")
    yield
    LOGGER.info(f" <<<<< Finished test: {test_name} ")

@pytest.fixture(scope='session')
def telemetryFrontend_service():
    LOGGER.info('Initializing TelemetryFrontendService...')
@@ -82,33 +93,29 @@ def telemetryFrontend_client(
# ------- Re-structuring Test ---------
# --- "test_validate_kafka_topics" should be run before the functionality tests ---
def test_validate_kafka_topics():
    """Call ``KafkaTopic.create_all_topics()`` and check that it returns a bool."""
    LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ")
    # LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ")
    response = KafkaTopic.create_all_topics()
    assert isinstance(response, bool)

# ----- core functionality test -----
def test_StartCollector(telemetryFrontend_client):
    """Send a ``StartCollector`` request and check that a ``CollectorId`` is returned.

    The request payload is built by ``create_collector_request()``; the
    response is logged at debug level before the type check.
    """
    LOGGER.info(' >>> test_StartCollector START: <<< ')
    # LOGGER.info(' >>> test_StartCollector START: <<< ')
    response = telemetryFrontend_client.StartCollector(create_collector_request())
    LOGGER.debug(str(response))
    assert isinstance(response, CollectorId)


def test_StopCollector(telemetryFrontend_client):
    """Wait 30 seconds, send a ``StopCollector`` request, and check that ``Empty`` is returned.

    The collector id sent is the one produced by ``create_collector_id()``.
    NOTE(review): the 30 s sleep presumably gives a previously started
    collector time to run before it is stopped -- confirm against the backend.
    """
    LOGGER.info(' >>> test_StopCollector START: <<< ')
    # LOGGER.info(' >>> test_StopCollector START: <<< ')
    LOGGER.info("Waiting before termination...")
    time.sleep(30)
    response = telemetryFrontend_client.StopCollector(create_collector_id())
    LOGGER.debug(str(response))
    assert isinstance(response, Empty)

def test_SelectCollectors(telemetryFrontend_client):
    """Send a ``SelectCollectors`` request and check that a ``CollectorList`` is returned.

    The filter sent is the one produced by ``create_collector_filter()``; the
    response is logged at debug level before the type check.
    """
    LOGGER.info(' >>> test_SelectCollectors START: <<< ')
    response = telemetryFrontend_client.SelectCollectors(create_collector_filter())
    LOGGER.debug(str(response))
    assert isinstance(response, CollectorList)

# # ----- Non-gRPC method tests ----- 
# def test_RunResponseListener():
#     LOGGER.info(' >>> test_RunResponseListener START: <<< ')
#     TelemetryFrontendServiceObj = TelemetryFrontendServiceServicerImpl()
#     response = TelemetryFrontendServiceObj.RunResponseListener()     # because method "run_kafka_listener" is not defined in frontend.proto
# def test_SelectCollectors(telemetryFrontend_client):
#     LOGGER.info(' >>> test_SelectCollectors START: <<< ')
#     response = telemetryFrontend_client.SelectCollectors(create_collector_filter())
#     LOGGER.debug(str(response))
#     assert isinstance(response, bool)
#     assert isinstance(response, CollectorList)