Commit 13e67200 authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Telemetry backend component:

- Renamed Driver to Collector
parent 6f8ca79b
Loading
Loading
Loading
Loading
+15 −15
Original line number Diff line number Diff line
@@ -15,7 +15,7 @@
import threading
from typing import Any, Iterator, List, Optional, Tuple, Union

# Special resource names to request to the driver to retrieve the specified
# Special resource names to request to the collector to retrieve the specified
# configuration/structural resources.
# These resource names should be used with GetConfig() method.
RESOURCE_ENDPOINTS = '__endpoints__'
@@ -27,16 +27,16 @@ RESOURCE_ACL = '__acl__'
RESOURCE_INVENTORY = '__inventory__'


class _Driver:
class _Collector:
    def __init__(self, name : str, address: str, port: int, **settings) -> None:
        """ Initialize Driver.
        """ Initialize Collector.
            Parameters:
                address : str
                    The address of the device
                port : int
                    The port of the device
                **settings
                    Extra settings required by the driver.
                    Extra settings required by the collector.
        """
        self._name = name
        self._address = address
@@ -139,7 +139,7 @@ class _Driver:
            List[Union[bool, Exception]]:
        """ Subscribe to state information of entire device or
        selected resources. Subscriptions are incremental.
            Driver should keep track of requested resources.
            Collector should keep track of requested resources.
            Parameters:
                subscriptions : List[Tuple[str, float, float]]
                    List of tuples, each containing a resource_key pointing the
@@ -162,7 +162,7 @@ class _Driver:
            -> List[Union[bool, Exception]]:
        """ Unsubscribe from state information of entire device
        or selected resources. Subscriptions are incremental.
            Driver should keep track of requested resources.
            Collector should keep track of requested resources.
            Parameters:
                subscriptions : List[str]
                    List of tuples, each containing a resource_key pointing the
@@ -188,37 +188,37 @@ class _Driver:
        Operates as a generator, so this method should be called once and will
        block until values are available. When values are available,
        it should yield each of them and block again until new values are
        available. When the driver is destroyed, GetState() can return instead
        available. When the collector is destroyed, GetState() can return instead
        of yield to terminate the loop.
        Terminate enables to request interruption of the generation.
            Examples:
                # keep looping waiting for extra samples (generator loop)
                terminate = threading.Event()
                i = 0
                for timestamp,resource_key,resource_value in my_driver.GetState(blocking=True, terminate=terminate):
                for timestamp,resource_key,resource_value in my_collector.GetState(blocking=True, terminate=terminate):
                    process(timestamp, resource_key, resource_value)
                    i += 1
                    if i == 10: terminate.set()

                # just retrieve accumulated samples
                samples = my_driver.GetState(blocking=False, terminate=terminate)
                samples = my_collector.GetState(blocking=False, terminate=terminate)
                # or (as classical loop)
                i = 0
                for timestamp,resource_key,resource_value in my_driver.GetState(blocking=False, terminate=terminate):
                for timestamp,resource_key,resource_value in my_collector.GetState(blocking=False, terminate=terminate):
                    process(timestamp, resource_key, resource_value)
                    i += 1
                    if i == 10: terminate.set()
            Parameters:
                blocking : bool
                    Select the driver behaviour. In both cases, the driver will
                    Select the collector behaviour. In both cases, the collector will
                    first retrieve the samples accumulated and available in the
                    internal queue. Then, if blocking, the driver does not
                    internal queue. Then, if blocking, the collector does not
                    terminate the loop and waits for additional samples to come,
                    thus behaving as a generator. If non-blocking, the driver
                    thus behaving as a generator. If non-blocking, the collector
                    terminates the loop and returns. Non-blocking behaviour can
                    be used for periodically polling the driver, while blocking
                    be used for periodically polling the collector, while blocking
                    can be used when a separate thread is in charge of
                    collecting the samples produced by the driver.
                    collecting the samples produced by the collector.
                terminate : threading.Event
                    Signals the interruption of the GetState method as soon as
                    possible.
+10 −10
Original line number Diff line number Diff line
@@ -17,8 +17,6 @@ import queue
import logging
import uuid
import json
from telemetry.backend.drivers.emulated.EmulatedHelper import EmulatedDriverHelper
from telemetry.backend.driver_api._Driver import _Driver
from anytree import Node, Resolver
from apscheduler.events import EVENT_JOB_ADDED, EVENT_JOB_REMOVED
from apscheduler.schedulers.background import BackgroundScheduler
@@ -26,18 +24,20 @@ from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.executors.pool import ThreadPoolExecutor
from datetime import datetime, timedelta
from typing import Any, Iterator, List, Tuple, Union, Optional
from telemetry.backend.collector_api._Collector import _Collector
from .EmulatedHelper import EmulatedCollectorHelper
from .SyntheticMetricsGenerator import SyntheticMetricsGenerator


logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

class EmulatedDriver(_Driver):
class EmulatedCollector(_Collector):
    """
    EmulatedDriver is a class that simulates a network driver for testing purposes.
    EmulatedCollector is a class that simulates a network collector for testing purposes.
    It provides functionalities to manage configurations, state subscriptions, and synthetic data generation.
    """
    def __init__(self, address: str, port: int, **settings):
        super().__init__('emulated_driver', address, port, **settings)
        super().__init__('emulated_collector', address, port, **settings)
        self._initial_config = Node('root')                 # Tree structure for initial config
        self._running_config = Node('root')                 # Tree structure for running config
        self._subscriptions  = Node('subscriptions')        # Tree for state subscriptions
@@ -52,11 +52,11 @@ class EmulatedDriver(_Driver):
        )
        self._scheduler.add_listener(self._listener_job_added_to_subscription_tree,     EVENT_JOB_ADDED)
        self._scheduler.add_listener(self._listener_job_removed_from_subscription_tree, EVENT_JOB_REMOVED)
        self._helper_methods = EmulatedDriverHelper()
        self._helper_methods = EmulatedCollectorHelper()

        self.logger    = logging.getLogger(__name__)
        self.connected = False          # To track connection state
        self.logger.info("EmulatedDriver initialized")
        self.logger.info("EmulatedCollector initialized")

    def Connect(self) -> bool:
        self.logger.info(f"Connecting to {self.address}:{self.port}")
@@ -68,7 +68,7 @@ class EmulatedDriver(_Driver):
    def Disconnect(self) -> bool:
        self.logger.info(f"Disconnecting from {self.address}:{self.port}")
        if not self.connected:
            self.logger.warning("Driver is not connected. Nothing to disconnect.")
            self.logger.warning("Collector is not connected. Nothing to disconnect.")
            return False
        self._scheduler.shutdown()
        self.connected = False
@@ -77,7 +77,7 @@ class EmulatedDriver(_Driver):

    def _require_connection(self):
        if not self.connected:
            raise RuntimeError("Driver is not connected. Please connect before performing operations.")
            raise RuntimeError("Collector is not connected. Please connect before performing operations.")

    def SubscribeState(self, subscriptions: List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]:
        self._require_connection()
@@ -219,7 +219,7 @@ class EmulatedDriver(_Driver):
# ------- The below methods are kept for debugging purposes (test-case) only ---------
#-------------------------------------------------------------------------------------

#  This method can be commented but this will arise an error in the test-case (@pytest.fixture --> connected_configured_driver()).
#  This method can be commented but this will arise an error in the test-case (@pytest.fixture --> connected_configured_collector()).
    def SetConfig(self, resources: dict) -> List[Union[bool, Exception]]:  # For debugging purposes.
        self._require_connection()
        results = []
+3 −3
Original line number Diff line number Diff line
@@ -17,9 +17,9 @@ import json
from typing import Any, List


class EmulatedDriverHelper:
class EmulatedCollectorHelper:
    """
    Helper class for the emulated driver.
    Helper class for the emulated collector.
    """
    def __init__(self):
        pass
@@ -48,7 +48,7 @@ class EmulatedDriverHelper:
# ------- Below function is kept for debugging purposes (test-cases) only -------------
#--------------------------------------------------------------------------------------

#  This below methods can be commented but are called by the SetConfig method in EmulatedDriver.py
#  This below methods can be commented but are called by the SetConfig method in EmulatedCollector.py

    def _find_or_create_node(self, name: str, parent: Node) -> Node:
        """
Loading