diff --git a/src/telemetry/backend/drivers/core/_Driver.py b/src/telemetry/backend/driver_api/_Driver.py similarity index 100% rename from src/telemetry/backend/drivers/core/_Driver.py rename to src/telemetry/backend/driver_api/_Driver.py diff --git a/src/telemetry/backend/drivers/core/__init__.py b/src/telemetry/backend/driver_api/__init__.py similarity index 100% rename from src/telemetry/backend/drivers/core/__init__.py rename to src/telemetry/backend/driver_api/__init__.py diff --git a/src/telemetry/backend/drivers/emulated/EmulatedDriver.py b/src/telemetry/backend/drivers/emulated/EmulatedDriver.py index 0c5712ffe44202c3e8d53ad42a4b93f5b8f2b5a9..103753b037edcd929e900f3046a9d313b024ce4a 100644 --- a/src/telemetry/backend/drivers/emulated/EmulatedDriver.py +++ b/src/telemetry/backend/drivers/emulated/EmulatedDriver.py @@ -12,7 +12,13 @@ # See the License for the specific language governing permissions and # limitations under the License. -from telemetry.backend.drivers.core._Driver import _Driver +import pytz +import queue +import logging +import uuid +import json +from telemetry.backend.drivers.emulated.EmulatedHelper import EmulatedDriverHelper +from telemetry.backend.driver_api._Driver import _Driver from anytree import Node, Resolver from apscheduler.events import EVENT_JOB_ADDED, EVENT_JOB_REMOVED from apscheduler.schedulers.background import BackgroundScheduler @@ -21,11 +27,7 @@ from apscheduler.executors.pool import ThreadPoolExecutor from datetime import datetime, timedelta from typing import Any, Iterator, List, Tuple, Union, Optional from .SyntheticMetricsGenerator import SyntheticMetricsGenerator -import pytz -import queue -import logging -import uuid -import json + logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') @@ -36,11 +38,11 @@ class EmulatedDriver(_Driver): """ def __init__(self, address: str, port: int, **settings): super().__init__('emulated_driver', address, port, **settings) - self._initial_config = Node('root') # Tree structure for initial config - self._running_config = Node('root') # Tree structure for running config - self._subscriptions = Node('subscriptions') # Tree for state subscriptions - self._resolver = Resolver() # For path resolution in tree structures - self._out_samples = queue.Queue() # Queue to hold synthetic state samples + self._initial_config = Node('root') # Tree structure for initial config + self._running_config = Node('root') # Tree structure for running config + self._subscriptions = Node('subscriptions') # Tree for state subscriptions + self._resolver = Resolver() # For path resolution in tree structures + self._out_samples = queue.Queue() # Queue to hold synthetic state samples self._synthetic_data = SyntheticMetricsGenerator(metric_queue=self._out_samples) # Placeholder for synthetic data generator self._scheduler = BackgroundScheduler(daemon=True) self._scheduler.configure( @@ -50,9 +52,10 @@ class EmulatedDriver(_Driver): ) self._scheduler.add_listener(self._listener_job_added_to_subscription_tree, EVENT_JOB_ADDED) self._scheduler.add_listener(self._listener_job_removed_from_subscription_tree, EVENT_JOB_REMOVED) + self._helper_methods = EmulatedDriverHelper() self.logger = logging.getLogger(__name__) - self.connected = False # To track connection state + self.connected = False # To track connection state self.logger.info("EmulatedDriver initialized") def Connect(self) -> bool: @@ -76,331 +79,11 @@ class EmulatedDriver(_Driver): if not self.connected: raise RuntimeError("Driver is not connected. Please connect before performing operations.") -# ------------- GetConfig, SetConfig, DeleteConfig, with helper methods (START)----------------- - - def GetInitialConfig(self) -> List[Tuple[str, Any]]: - self._require_connection() - results = [] - for node in self._initial_config.descendants: - value = getattr(node, "value", None) - results.append((node.name, json.loads(value) if value else None)) - self.logger.info("Retrieved initial configurations") - return results - - def GetConfig(self, resource_keys: List[str] = []) -> List[Tuple[str, Union[Any, dict, Exception]]]: - """ - Retrieves the configuration for the specified resource keys. - If no keys are provided, returns the full configuration tree. - - Args: - resource_keys (List[str]): A list of keys specifying the configuration to retrieve. - - Returns: - List[Tuple[str, Union[Any, dict, Exception]]]: A list of tuples with the resource key and its value, - subtree, or an exception. - """ - self._require_connection() - results = [] - - try: - if not resource_keys: - # If no specific keys are provided, return the full configuration tree - full_tree = self._generate_subtree(self._running_config) - return [("full_configuration", full_tree)] - - for key in resource_keys: - try: - # Parse the resource key - resource_path = self._parse_resource_key(key) - self.logger.info(f"1. Retrieving configuration for resource path : {resource_path}") - - # Navigate to the node corresponding to the key - parent = self._running_config - for part in resource_path: - parent = self._find_or_raise_node(part, parent) - - # Check if the node has a value - value = getattr(parent, "value", None) - if value: - # If a value exists, return it - results.append((key, json.loads(value))) - else: - # If no value, return the subtree of this node - subtree = self._generate_subtree(parent) - results.append((key, subtree)) - - except Exception as e: - self.logger.exception(f"Failed to retrieve configuration for key: {key}") - results.append((key, e)) - - except Exception as e: - self.logger.exception("Failed to retrieve configurations") - results.append(("Error", e)) - - return results - - def SetConfig(self, config: dict) -> List[Union[bool, Exception]]: - self._require_connection() - results = [] - - if not isinstance(config, dict): - self.logger.error("Invalid configuration format: config must be a dictionary.") - raise ValueError("Invalid configuration format. Must be a dictionary.") - if 'config_rules' not in config or not isinstance(config['config_rules'], list): - self.logger.error("Invalid configuration format: 'config_rules' key missing or not a list.") - raise ValueError("Invalid configuration format. Must contain a 'config_rules' key with a list of rules.") - - for rule in config['config_rules']: - try: - if 'action' not in rule or 'custom' not in rule: - raise ValueError(f"Invalid rule format: {rule}") - - action = rule['action'] - custom = rule['custom'] - resource_key = custom.get('resource_key') - resource_value = custom.get('resource_value') - - if not resource_key: - raise ValueError(f"Resource key is missing in rule: {rule}") - - if resource_value is None: - raise ValueError(f"Resource value is None for key: {resource_key}") - if not resource_key: - raise ValueError(f"Resource key is missing in rule: {rule}") - - if action == 1: # Set action - resource_path = self._parse_resource_key(resource_key) - # self.logger.info(f"1. Setting configuration for resource key {resource_key} and resource_path: {resource_path}") - parent = self._running_config - - for part in resource_path[:-1]: - if '[' in part and ']' in part: - base, index = part.split('[', 1) - index = index.rstrip(']') - parent = self._find_or_create_node(index, self._find_or_create_node(base, parent)) - # self.logger.info(f"2a. Creating node: {base}, {index}, {parent}") - elif resource_path[-1] != 'settings': - # self.logger.info(f"2b. Creating node: {part}") - parent = self._find_or_create_node(part, parent) - - final_part = resource_path[-1] - if final_part in ['address', 'port']: - self._create_or_update_node(final_part, parent, resource_value) - self.logger.info(f"Configured: {resource_key} = {resource_value}") - - if resource_key.startswith("_connect/settings"): - parent = self._find_or_create_node("_connect", self._running_config) - settings_node = self._find_or_create_node("settings", parent) - settings_node.value = None # Ensure settings node has None value - endpoints_node = self._find_or_create_node("endpoints", settings_node) - - for endpoint in resource_value.get("endpoints", []): - uuid = endpoint.get("uuid") - uuid = uuid.replace('/', '_') if uuid else None - if uuid: - # self.logger.info(f"3. Creating endpoint: {uuid}, {endpoint}, {endpoints_node}") - self._create_or_update_node(uuid, endpoints_node, endpoint) - self.logger.info(f"Configured endpoint: {uuid} : {endpoint}") - - elif resource_key.startswith("/interface"): - interface_parent = self._find_or_create_node("interface", self._running_config) - name = resource_value.get("name") - name = name.replace('/', '_') if name else None - if name: - self._create_or_update_node(name, interface_parent, resource_value) - self.logger.info(f"Configured interface: {name} : {resource_value}") - # self.logger.info(f"4. Configured interface: {name}") - - results.append(True) - else: - raise ValueError(f"Unsupported action '{action}' in rule: {rule}") - - if resource_value is None: - raise ValueError(f"Resource value is None for key: {resource_key}") - - except Exception as e: - self.logger.exception(f"Failed to apply rule: {rule}") - results.append(e) - - return results - - def _generate_subtree(self, node: Node) -> dict: - """ - Generates a subtree of the configuration tree starting from the specified node. - - Args: - node (Node): The node from which to generate the subtree. - - Returns: - dict: The subtree as a dictionary. - """ - subtree = {} - for child in node.children: - if child.children: - subtree[child.name] = self._generate_subtree(child) - else: - value = getattr(child, "value", None) - subtree[child.name] = json.loads(value) if value else None - return subtree - - def DeleteConfig(self, resource_keys: List[str]) -> List[Union[bool, Exception]]: - self._require_connection() - results = [] - - for key in resource_keys: - try: - # Parse resource key into parts, handling brackets correctly - resource_path = self._parse_resource_key(key) - - parent = self._running_config - for part in resource_path: - parent = self._find_or_raise_node(part, parent) - - # Delete the final node - node_to_delete = parent - parent = node_to_delete.parent - parent.children = tuple(child for child in parent.children if child != node_to_delete) - self.logger.info(f"Deleted configuration for key: {key}") - - # Handle endpoints structure - if "interface" in key and "settings" in key: - interface_name = key.split('[')[-1].split(']')[0] - endpoints_parent = self._find_or_raise_node("_connect", self._running_config) - endpoints_node = self._find_or_raise_node("endpoints", endpoints_parent) - endpoint_to_delete = next((child for child in endpoints_node.children if child.name == interface_name), None) - if endpoint_to_delete: - endpoints_node.children = tuple(child for child in endpoints_node.children if child != endpoint_to_delete) - self.logger.info(f"Removed endpoint entry for interface '{interface_name}'") - - # Check if parent has no more children and is not the root - while parent and parent.name != "root" and not parent.children: - node_to_delete = parent - parent = node_to_delete.parent - parent.children = tuple(child for child in parent.children if child != node_to_delete) - self.logger.info(f"Deleted empty parent node: {node_to_delete.name}") - - results.append(True) - except Exception as e: - self.logger.exception(f"Failed to delete configuration for key: {key}") - results.append(e) - - return results - - def _parse_resource_key(self, resource_key: str) -> List[str]: - """ - Parses the resource key into parts, correctly handling brackets. - - Args: - resource_key (str): The resource key to parse. - - Returns: - List[str]: A list of parts from the resource key. - """ - resource_path = [] - current_part = "" - in_brackets = False - - if not resource_key.startswith('/interface'): - for char in resource_key.strip('/'): - if char == '[': - in_brackets = True - current_part += char - elif char == ']': - in_brackets = False - current_part += char - elif char == '/' and not in_brackets: - resource_path.append(current_part) - current_part = "" - else: - current_part += char - if current_part: - resource_path.append(current_part) - return resource_path - else: - resource_path = resource_key.strip('/').split('/', 1) - if resource_path[1] == 'settings': - return resource_path - else: - resource_path = [resource_key.strip('/').split('[')[0].strip('/'), resource_key.strip('/').split('[')[1].split(']')[0].replace('/', '_')] - return resource_path - - def _find_or_raise_node(self, name: str, parent: Node) -> Node: - """ - Finds a node with the given name under the specified parent or raises an exception if not found. - - Args: - name (str): The name of the node to find. - parent (Node): The parent node. - - Returns: - Node: The found node. - - Raises: - ValueError: If the node is not found. - """ - node = next((child for child in parent.children if child.name == name), None) - if not node: - raise ValueError(f"Node '{name}' not found under parent '{parent.name}'.") - return node - - def _find_or_create_node(self, name: str, parent: Node) -> Node: - """ - Finds or creates a node with the given name under the specified parent. - - Args: - name (str): The name of the node to find or create. - parent (Node): The parent node. - - Returns: - Node: The found or created node. - """ - node = next((child for child in parent.children if child.name == name), None) - if not node: - node = Node(name, parent=parent) - return node - - def _create_or_update_node(self, name: str, parent: Node, value: Any): - """ - Creates or updates a node with the given name and value under the specified parent. - - Args: - name (str): The name of the node. - parent (Node): The parent node. - value (Any): The value to set on the node. - """ - node = next((child for child in parent.children if child.name == name), None) - if node: - node.value = json.dumps(value) - else: - Node(name, parent=parent, value=json.dumps(value)) - - def validate_resource_key(self, key: str) -> str: - """ - Splits the input string into two parts: - - The first part is '_connect/settings/endpoints/'. - - The second part is the remaining string after the first part, with '/' replaced by '_'. - - Args: - key (str): The input string to process. - - Returns: - str: A single string with the processed result. - """ - prefix = '_connect/settings/endpoints/' - if not key.startswith(prefix): - raise ValueError(f"The input path '{key}' does not start with the expected prefix: {prefix}") - second_part = key[len(prefix):] - second_part_processed = second_part.replace('/', '_') - validated_key = prefix + second_part_processed - return validated_key - -# ------------- GetConfig, SetConfig, DeleteConfig, with helper methods (END)----------------- - def SubscribeState(self, subscriptions: List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]: self._require_connection() results = [] for resource_key, duration, interval in subscriptions: - resource_key = self.validate_resource_key(resource_key) # Validate the endpoint name + resource_key = self._helper_methods.validate_resource_key(resource_key) # Validate the endpoint name self.logger.info(f"1. Subscribing to {resource_key} with duration {duration}s and interval {interval}s") try: self._resolver.get(self._running_config, resource_key) # Verify if the resource key exists in the running configuration @@ -409,8 +92,13 @@ class EmulatedDriver(_Driver): if resource_value is not None: sample_type_ids = resource_value['sample_types'] self.logger.info(f"Sample type IDs for {resource_key}: {sample_type_ids}") + if len(sample_type_ids) == 0: + self.logger.warning(f"No sample types found for {resource_key}. Skipping subscription.") + results.append(False) + continue else: self.logger.warning(f"No sample types found for {resource_key}. Skipping subscription.") + results.append(False) continue # Add the job to the scheduler job_id = f"{resource_key}-{uuid.uuid4()}" @@ -434,7 +122,7 @@ class EmulatedDriver(_Driver): self._require_connection() results = [] for resource_key, _, _ in subscriptions: - resource_key = self.validate_resource_key(resource_key) + resource_key = self._helper_methods.validate_resource_key(resource_key) try: # Check if job exists job_ids = [job.id for job in self._scheduler.get_jobs() if resource_key in job.id] @@ -453,7 +141,7 @@ class EmulatedDriver(_Driver): results.append(e) return results - def GetState(self, blocking: bool = False, terminate: Optional[queue.Queue] = None) -> Iterator[Tuple[str, Any]]: + def GetState(self, blocking: bool = False, terminate: Optional[queue.Queue] = None) -> Iterator[Tuple[float, str, Any]]: self._require_connection() start_time = datetime.now(pytz.utc) duration = 10 # Duration of the subscription in seconds (as an example) @@ -478,7 +166,6 @@ class EmulatedDriver(_Driver): return None def _generate_sample(self, resource_key: str, sample_type_ids : List[int]): - self._require_connection() # Simulate generating a sample for the resource key self.logger.debug(f"Executing _generate_sample for resource: {resource_key}") sample = self._synthetic_data.generate_synthetic_data_point(resource_key, sample_type_ids) @@ -490,7 +177,7 @@ class EmulatedDriver(_Driver): if event.job_id: # Extract the resource key from the job ID resource_key = event.job_id.split('-')[0] - resource_key = self.validate_resource_key(resource_key) + resource_key = self._helper_methods.validate_resource_key(resource_key) # Remove the subscription from the tree try: @@ -502,7 +189,7 @@ class EmulatedDriver(_Driver): raise ValueError(f"Subscription path '{resource_key}' not found in tree.") if parent: parent.parent.children = tuple(child for child in parent.parent.children if child != parent) - self.logger.info(f"Automatically removed subscription from subscription_tree for {resource_key} after job termination by listener.") + self.logger.warning(f"Automatically removed subscription from subscription_tree for {resource_key} after job termination by listener. Maybe due to timeout.") except Exception as e: self.logger.warning(f"Failed to remove subscription for {resource_key}: {e}") @@ -511,7 +198,7 @@ class EmulatedDriver(_Driver): job_id = event.job_id if job_id: resource_key = job_id.split('-')[0] # Extract resource key from job ID - resource_key = self.validate_resource_key(resource_key) + resource_key = self._helper_methods.validate_resource_key(resource_key) subscription_path = resource_key.split('/') parent = self._subscriptions for part in subscription_path: @@ -528,36 +215,238 @@ class EmulatedDriver(_Driver): # ------------- Event Listeners (END)----------------- - def log_active_jobs(self): - """ - Logs the IDs of all active jobs. - This method retrieves the list of active jobs from the scheduler and logs their IDs using the logger. - """ - self._require_connection() - jobs = self._scheduler.get_jobs() - self.logger.info(f"Active jobs: {[job.id for job in jobs]}") - - def print_config_tree(self): - """ - Reads the configuration using GetConfig and prints it as a hierarchical tree structure. - For debugging purposes. - """ +#------------------------------------------------------------------------------------- +# ------- The below methods are kept for debugging purposes (test-case) only --------- +#------------------------------------------------------------------------------------- + +# This method can be commented but this will arise an error in the test-case (@pytest.fixture --> connected_configured_driver()). + def SetConfig(self, resources: dict) -> List[Union[bool, Exception]]: # For debugging purposes. self._require_connection() + results = [] + + # if not isinstance(resources, dict): + # self.logger.error("Invalid configuration format: resources must be a dictionary.") + # raise ValueError("Invalid configuration format. Must be a dictionary.") + if 'config_rules' not in resources or not isinstance(resources['config_rules'], list): + self.logger.error("Invalid configuration format: 'config_rules' key missing or not a list.") + raise ValueError("Invalid configuration format. Must contain a 'config_rules' key with a list of rules.") + + for rule in resources['config_rules']: + try: + if 'action' not in rule or 'custom' not in rule: + raise ValueError(f"Invalid rule format: {rule}") + + action = rule['action'] + custom = rule['custom'] + resource_key = custom.get('resource_key') + resource_value = custom.get('resource_value') + + if not resource_key: + raise ValueError(f"Resource key is missing in rule: {rule}") + + if resource_value is None: + raise ValueError(f"Resource value is None for key: {resource_key}") + if not resource_key: + raise ValueError(f"Resource key is missing in rule: {rule}") + + if action == 1: # Set action + resource_path = self._helper_methods._parse_resource_key(resource_key) + # self.logger.info(f"1. Setting configuration for resource key {resource_key} and resource_path: {resource_path}") + parent = self._running_config + + for part in resource_path[:-1]: + if '[' in part and ']' in part: + base, index = part.split('[', 1) + index = index.rstrip(']') + parent = self._helper_methods._find_or_create_node(index, self._helper_methods._find_or_create_node(base, parent)) + # self.logger.info(f"2a. Creating node: {base}, {index}, {parent}") + elif resource_path[-1] != 'settings': + # self.logger.info(f"2b. Creating node: {part}") + parent = self._helper_methods._find_or_create_node(part, parent) + + final_part = resource_path[-1] + if final_part in ['address', 'port']: + self._helper_methods._create_or_update_node(final_part, parent, resource_value) + self.logger.info(f"Configured: {resource_key} = {resource_value}") + + if resource_key.startswith("_connect/settings"): + parent = self._helper_methods._find_or_create_node("_connect", self._running_config) + settings_node = self._helper_methods._find_or_create_node("settings", parent) + settings_node.value = None # Ensure settings node has None value + endpoints_node = self._helper_methods._find_or_create_node("endpoints", settings_node) + + for endpoint in resource_value.get("endpoints", []): + uuid = endpoint.get("uuid") + uuid = uuid.replace('/', '_') if uuid else None + if uuid: + # self.logger.info(f"3. Creating endpoint: {uuid}, {endpoint}, {endpoints_node}") + self._helper_methods._create_or_update_node(uuid, endpoints_node, endpoint) + self.logger.info(f"Configured endpoint: {uuid} : {endpoint}") + + elif resource_key.startswith("/interface"): + interface_parent = self._helper_methods._find_or_create_node("interface", self._running_config) + name = resource_value.get("name") + name = name.replace('/', '_') if name else None + if name: + self._helper_methods._create_or_update_node(name, interface_parent, resource_value) + self.logger.info(f"Configured interface: {name} : {resource_value}") + # self.logger.info(f"4. Configured interface: {name}") + + results.append(True) + else: + raise ValueError(f"Unsupported action '{action}' in rule: {rule}") + + if resource_value is None: + raise ValueError(f"Resource value is None for key: {resource_key}") + + except Exception as e: + self.logger.exception(f"Failed to apply rule: {rule}") + results.append(e) + + return results + +#----------------------------------- +# ------- EXTRA Methods ------------ +#----------------------------------- + + # def log_active_jobs(self): # For debugging purposes. + # """ + # Logs the IDs of all active jobs. + # This method retrieves the list of active jobs from the scheduler and logs their IDs using the logger. + # """ + # self._require_connection() + # jobs = self._scheduler.get_jobs() + # self.logger.info(f"Active jobs: {[job.id for job in jobs]}") + + # def print_config_tree(self): # For debugging purposes. + # """ + # Reads the configuration using GetConfig and prints it as a hierarchical tree structure. + # """ + # self._require_connection() + + # def print_tree(node, indent=""): + # """ + # Recursively prints the configuration tree. + + # Args: + # node (Node): The current node to print. + # indent (str): The current indentation level. + # """ + # if node.name != "root": # Skip the root node's name + # value = getattr(node, "value", None) + # print(f"{indent}- {node.name}: {json.loads(value) if value else ''}") + + # for child in node.children: + # print_tree(child, indent + " ") + + # print("Configuration Tree:") + # print_tree(self._running_config) + + + # def GetInitialConfig(self) -> List[Tuple[str, Any]]: # comment + # self._require_connection() + # results = [] + # for node in self._initial_config.descendants: + # value = getattr(node, "value", None) + # results.append((node.name, json.loads(value) if value else None)) + # self.logger.info("Retrieved initial configurations") + # return results + + # def GetConfig(self, resource_keys: List[str] = []) -> List[Tuple[str, Union[Any, dict, Exception]]]: # comment + # """ + # Retrieves the configuration for the specified resource keys. + # If no keys are provided, returns the full configuration tree. - def print_tree(node, indent=""): - """ - Recursively prints the configuration tree. - - Args: - node (Node): The current node to print. - indent (str): The current indentation level. - """ - if node.name != "root": # Skip the root node's name - value = getattr(node, "value", None) - print(f"{indent}- {node.name}: {json.loads(value) if value else ''}") + # Args: + # resource_keys (List[str]): A list of keys specifying the configuration to retrieve. - for child in node.children: - print_tree(child, indent + " ") + # Returns: + # List[Tuple[str, Union[Any, dict, Exception]]]: A list of tuples with the resource key and its value, + # subtree, or an exception. + # """ + # self._require_connection() + # results = [] + + # try: + # if not resource_keys: + # # If no specific keys are provided, return the full configuration tree + + # full_tree = self._helper_methods._generate_subtree(self._running_config) + # # full_tree = self._generate_subtree(self._running_config) + # return [("full_configuration", full_tree)] + + # for key in resource_keys: + # try: + # # Parse the resource key + # resource_path = self._helper_methods.(key) + # self.logger.info(f"1. Retrieving configuration for resource path : {resource_path}") + + # # Navigate to the node corresponding to the key + # parent = self._running_config + # for part in resource_path: + # parent = self._find_or_raise_node(part, parent) + + # # Check if the node has a value + # value = getattr(parent, "value", None) + # if value: + # # If a value exists, return it + # results.append((key, json.loads(value))) + # else: + # # If no value, return the subtree of this node + # subtree = self._helper_methods._generate_subtree(parent) + # # subtree = self._generate_subtree(parent) + # results.append((key, subtree)) + + # except Exception as e: + # self.logger.exception(f"Failed to retrieve configuration for key: {key}") + # results.append((key, e)) + + # except Exception as e: + # self.logger.exception("Failed to retrieve configurations") + # results.append(("Error", e)) + + # return results + + # def DeleteConfig(self, resources: List[Tuple[str, Any]]) -> List[Union[bool, Exception]]: # comment + # self._require_connection() + # results = [] + + # for key in resources: + # try: + # # Parse resource key into parts, handling brackets correctly + # resource_path = self._helper_methods.(key) + + # parent = self._running_config + # for part in resource_path: + # parent = self._find_or_raise_node(part, parent) + + # # Delete the final node + # node_to_delete = parent + # parent = node_to_delete.parent + # parent.children = tuple(child for child in parent.children if child != node_to_delete) + # self.logger.info(f"Deleted configuration for key: {key}") + + # # Handle endpoints structure + # if "interface" in key and "settings" in key: + # interface_name = key.split('[')[-1].split(']')[0] + # endpoints_parent = self._find_or_raise_node("_connect", self._running_config) + # endpoints_node = self._find_or_raise_node("endpoints", endpoints_parent) + # endpoint_to_delete = next((child for child in endpoints_node.children if child.name == interface_name), None) + # if endpoint_to_delete: + # endpoints_node.children = tuple(child for child in endpoints_node.children if child != endpoint_to_delete) + # self.logger.info(f"Removed endpoint entry for interface '{interface_name}'") + + # # Check if parent has no more children and is not the root + # while parent and parent.name != "root" and not parent.children: + # node_to_delete = parent + # parent = node_to_delete.parent + # parent.children = tuple(child for child in parent.children if child != node_to_delete) + # self.logger.info(f"Deleted empty parent node: {node_to_delete.name}") + + # results.append(True) + # except Exception as e: + # self.logger.exception(f"Failed to delete configuration for key: {key}") + # results.append(e) + + # return results - print("Configuration Tree:") - print_tree(self._running_config) diff --git a/src/telemetry/backend/drivers/emulated/EmulatedHelper.py b/src/telemetry/backend/drivers/emulated/EmulatedHelper.py new file mode 100644 index 0000000000000000000000000000000000000000..008ad95d6402eb4a02da95bed550e4e44c5b8df8 --- /dev/null +++ b/src/telemetry/backend/drivers/emulated/EmulatedHelper.py @@ -0,0 +1,166 @@ +# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from anytree import Node +import json +from typing import Any, List + + +class EmulatedDriverHelper: + """ + Helper class for the emulated driver. + """ + def __init__(self): + pass + + def validate_resource_key(self, key: str) -> str: + """ + Splits the input string into two parts: + - The first part is '_connect/settings/endpoints/'. + - The second part is the remaining string after the first part, with '/' replaced by '_'. + + Args: + key (str): The input string to process. + + Returns: + str: A single string with the processed result. + """ + prefix = '_connect/settings/endpoints/' + if not key.startswith(prefix): + raise ValueError(f"The input path '{key}' does not start with the expected prefix: {prefix}") + second_part = key[len(prefix):] + second_part_processed = second_part.replace('/', '_') + validated_key = prefix + second_part_processed + return validated_key + +#-------------------------------------------------------------------------------------- +# ------- Below function is kept for debugging purposes (test-cases) only ------------- +#-------------------------------------------------------------------------------------- + +# This below methods can be commented but are called by the SetConfig method in EmulatedDriver.py + + def _find_or_create_node(self, name: str, parent: Node) -> Node: + """ + Finds or creates a node with the given name under the specified parent. + + Args: + name (str): The name of the node to find or create. + parent (Node): The parent node. + + Returns: + Node: The found or created node. + """ + node = next((child for child in parent.children if child.name == name), None) + if not node: + node = Node(name, parent=parent) + return node + + + def _create_or_update_node(self, name: str, parent: Node, value: Any): + """ + Creates or updates a node with the given name and value under the specified parent. + + Args: + name (str): The name of the node. + parent (Node): The parent node. + value (Any): The value to set on the node. + """ + node = next((child for child in parent.children if child.name == name), None) + if node: + node.value = json.dumps(value) + else: + Node(name, parent=parent, value=json.dumps(value)) + + + def _parse_resource_key(self, resource_key: str) -> List[str]: + """ + Parses the resource key into parts, correctly handling brackets. + + Args: + resource_key (str): The resource key to parse. + + Returns: + List[str]: A list of parts from the resource key. + """ + resource_path = [] + current_part = "" + in_brackets = False + + if not resource_key.startswith('/interface'): + for char in resource_key.strip('/'): + if char == '[': + in_brackets = True + current_part += char + elif char == ']': + in_brackets = False + current_part += char + elif char == '/' and not in_brackets: + resource_path.append(current_part) + current_part = "" + else: + current_part += char + if current_part: + resource_path.append(current_part) + return resource_path + else: + resource_path = resource_key.strip('/').split('/', 1) + if resource_path[1] == 'settings': + return resource_path + else: + resource_path = [resource_key.strip('/').split('[')[0].strip('/'), resource_key.strip('/').split('[')[1].split(']')[0].replace('/', '_')] + return resource_path + + +#----------------------------------- +# ------- EXTRA Methods ------------ +#----------------------------------- + + # def _generate_subtree(self, node: Node) -> dict: + # """ + # Generates a subtree of the configuration tree starting from the specified node. + + # Args: + # node (Node): The node from which to generate the subtree. + + # Returns: + # dict: The subtree as a dictionary. + # """ + # subtree = {} + # for child in node.children: + # if child.children: + # subtree[child.name] = self._generate_subtree(child) + # else: + # value = getattr(child, "value", None) + # subtree[child.name] = json.loads(value) if value else None + # return subtree + + + # def _find_or_raise_node(self, name: str, parent: Node) -> Node: + # """ + # Finds a node with the given name under the specified parent or raises an exception if not found. + + # Args: + # name (str): The name of the node to find. + # parent (Node): The parent node. + + # Returns: + # Node: The found node. + + # Raises: + # ValueError: If the node is not found. + # """ + # node = next((child for child in parent.children if child.name == name), None) + # if not node: + # raise ValueError(f"Node '{name}' not found under parent '{parent.name}'.") + # return node diff --git a/src/telemetry/backend/tests/test_emulated.py b/src/telemetry/backend/tests/test_emulated.py index 14d8a8ab925216735c0cd60a0f5da2ee9892a59f..316b98d83739a426e22e9bbd2488458eef611859 100644 --- a/src/telemetry/backend/tests/test_emulated.py +++ b/src/telemetry/backend/tests/test_emulated.py @@ -52,38 +52,39 @@ def test_disconnect(setup_driver): assert driver.Disconnect() is True assert driver.connected is False -def test_set_config(setup_driver): - logger.info(">>> test_set_config <<<") - driver = setup_driver - driver.Connect() +# def test_set_config(setup_driver): +# logger.info(">>> test_set_config <<<") +# driver = setup_driver +# driver.Connect() - config = create_test_configuration() +# config = create_test_configuration() - results = driver.SetConfig(config) - assert all(result is True for result in results) +# results = driver.SetConfig(config) +# assert all(result is True for result in results) -def test_get_config(connected_configured_driver): - logger.info(">>> test_get_config <<<") - resource_keys = create_specific_config_keys() - results = connected_configured_driver.GetConfig(resource_keys) +# def test_get_config(connected_configured_driver): +# logger.info(">>> test_get_config <<<") +# resource_keys = create_specific_config_keys() +# results = connected_configured_driver.GetConfig(resource_keys) - for key, value in results: - assert key in create_specific_config_keys() - assert value is not None +# for key, value in results: +# assert key in create_specific_config_keys() +# assert value is not None -def test_delete_config(connected_configured_driver): - logger.info(">>> test_delete_config <<<") - resource_keys = create_config_for_delete() +# def test_delete_config(connected_configured_driver): +# logger.info(">>> test_delete_config <<<") +# resource_keys = create_config_for_delete() - results = connected_configured_driver.DeleteConfig(resource_keys) - assert all(result is True for result in results) +# results = connected_configured_driver.DeleteConfig(resource_keys) +# assert all(result is True for result in results) def test_subscribe_state(connected_configured_driver): logger.info(">>> test_subscribe_state <<<") subscriptions = create_test_subscriptions() results = connected_configured_driver.SubscribeState(subscriptions) - assert all(result is True for result in results) + # logger.info(f"Subscribed result: {results}.") + assert results == [False, True, True] # all(result is True for result in results) def test_unsubscribe_state(connected_configured_driver): logger.info(">>> test_unsubscribe_state <<<") @@ -91,15 +92,15 @@ def test_unsubscribe_state(connected_configured_driver): connected_configured_driver.SubscribeState(subscriptions) results = connected_configured_driver.UnsubscribeState(subscriptions) - assert all(result is True for result in results) + assert results == [False, True, True] # all(result is True for result in results) def test_get_state(connected_configured_driver): logger.info(">>> test_get_state <<<") subscriptions = create_test_subscriptions() connected_configured_driver.SubscribeState(subscriptions) - logger.info(f"Subscribed to state: {subscriptions}. waiting for 3 seconds ...") - time.sleep(3) + logger.info(f"Subscribed to state: {subscriptions}. waiting for 12 seconds ...") + time.sleep(12) state_iterator = connected_configured_driver.GetState(blocking=False) states = list(state_iterator)