Skip to content
Snippets Groups Projects
Commit 7dbbfc03 authored by Waleed Akbar's avatar Waleed Akbar
Browse files

Merge branch 'feat/191-cttc-implement-telemetry-backend-collector-emulated'...

Merge branch 'feat/191-cttc-implement-telemetry-backend-collector-emulated' into feat/247-cttc-analytics-module-enhancements
parents 1a383d92 69684356
No related branches found
No related tags found
2 merge requests!359Release TeraFlowSDN 5.0,!317Resolve "(CTTC) Analytics Module Enhancements"
...@@ -12,7 +12,13 @@ ...@@ -12,7 +12,13 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from telemetry.backend.drivers.core._Driver import _Driver import pytz
import queue
import logging
import uuid
import json
from telemetry.backend.drivers.emulated.EmulatedHelper import EmulatedDriverHelper
from telemetry.backend.driver_api._Driver import _Driver
from anytree import Node, Resolver from anytree import Node, Resolver
from apscheduler.events import EVENT_JOB_ADDED, EVENT_JOB_REMOVED from apscheduler.events import EVENT_JOB_ADDED, EVENT_JOB_REMOVED
from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.schedulers.background import BackgroundScheduler
...@@ -21,11 +27,7 @@ from apscheduler.executors.pool import ThreadPoolExecutor ...@@ -21,11 +27,7 @@ from apscheduler.executors.pool import ThreadPoolExecutor
from datetime import datetime, timedelta from datetime import datetime, timedelta
from typing import Any, Iterator, List, Tuple, Union, Optional from typing import Any, Iterator, List, Tuple, Union, Optional
from .SyntheticMetricsGenerator import SyntheticMetricsGenerator from .SyntheticMetricsGenerator import SyntheticMetricsGenerator
import pytz
import queue
import logging
import uuid
import json
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
...@@ -36,11 +38,11 @@ class EmulatedDriver(_Driver): ...@@ -36,11 +38,11 @@ class EmulatedDriver(_Driver):
""" """
def __init__(self, address: str, port: int, **settings): def __init__(self, address: str, port: int, **settings):
super().__init__('emulated_driver', address, port, **settings) super().__init__('emulated_driver', address, port, **settings)
self._initial_config = Node('root') # Tree structure for initial config self._initial_config = Node('root') # Tree structure for initial config
self._running_config = Node('root') # Tree structure for running config self._running_config = Node('root') # Tree structure for running config
self._subscriptions = Node('subscriptions') # Tree for state subscriptions self._subscriptions = Node('subscriptions') # Tree for state subscriptions
self._resolver = Resolver() # For path resolution in tree structures self._resolver = Resolver() # For path resolution in tree structures
self._out_samples = queue.Queue() # Queue to hold synthetic state samples self._out_samples = queue.Queue() # Queue to hold synthetic state samples
self._synthetic_data = SyntheticMetricsGenerator(metric_queue=self._out_samples) # Placeholder for synthetic data generator self._synthetic_data = SyntheticMetricsGenerator(metric_queue=self._out_samples) # Placeholder for synthetic data generator
self._scheduler = BackgroundScheduler(daemon=True) self._scheduler = BackgroundScheduler(daemon=True)
self._scheduler.configure( self._scheduler.configure(
...@@ -50,9 +52,10 @@ class EmulatedDriver(_Driver): ...@@ -50,9 +52,10 @@ class EmulatedDriver(_Driver):
) )
self._scheduler.add_listener(self._listener_job_added_to_subscription_tree, EVENT_JOB_ADDED) self._scheduler.add_listener(self._listener_job_added_to_subscription_tree, EVENT_JOB_ADDED)
self._scheduler.add_listener(self._listener_job_removed_from_subscription_tree, EVENT_JOB_REMOVED) self._scheduler.add_listener(self._listener_job_removed_from_subscription_tree, EVENT_JOB_REMOVED)
self._helper_methods = EmulatedDriverHelper()
self.logger = logging.getLogger(__name__) self.logger = logging.getLogger(__name__)
self.connected = False # To track connection state self.connected = False # To track connection state
self.logger.info("EmulatedDriver initialized") self.logger.info("EmulatedDriver initialized")
def Connect(self) -> bool: def Connect(self) -> bool:
...@@ -76,331 +79,11 @@ class EmulatedDriver(_Driver): ...@@ -76,331 +79,11 @@ class EmulatedDriver(_Driver):
if not self.connected: if not self.connected:
raise RuntimeError("Driver is not connected. Please connect before performing operations.") raise RuntimeError("Driver is not connected. Please connect before performing operations.")
# ------------- GetConfig, SetConfig, DeleteConfig, with helper methods (START)-----------------
def GetInitialConfig(self) -> List[Tuple[str, Any]]:
self._require_connection()
results = []
for node in self._initial_config.descendants:
value = getattr(node, "value", None)
results.append((node.name, json.loads(value) if value else None))
self.logger.info("Retrieved initial configurations")
return results
def GetConfig(self, resource_keys: List[str] = []) -> List[Tuple[str, Union[Any, dict, Exception]]]:
"""
Retrieves the configuration for the specified resource keys.
If no keys are provided, returns the full configuration tree.
Args:
resource_keys (List[str]): A list of keys specifying the configuration to retrieve.
Returns:
List[Tuple[str, Union[Any, dict, Exception]]]: A list of tuples with the resource key and its value,
subtree, or an exception.
"""
self._require_connection()
results = []
try:
if not resource_keys:
# If no specific keys are provided, return the full configuration tree
full_tree = self._generate_subtree(self._running_config)
return [("full_configuration", full_tree)]
for key in resource_keys:
try:
# Parse the resource key
resource_path = self._parse_resource_key(key)
self.logger.info(f"1. Retrieving configuration for resource path : {resource_path}")
# Navigate to the node corresponding to the key
parent = self._running_config
for part in resource_path:
parent = self._find_or_raise_node(part, parent)
# Check if the node has a value
value = getattr(parent, "value", None)
if value:
# If a value exists, return it
results.append((key, json.loads(value)))
else:
# If no value, return the subtree of this node
subtree = self._generate_subtree(parent)
results.append((key, subtree))
except Exception as e:
self.logger.exception(f"Failed to retrieve configuration for key: {key}")
results.append((key, e))
except Exception as e:
self.logger.exception("Failed to retrieve configurations")
results.append(("Error", e))
return results
def SetConfig(self, config: dict) -> List[Union[bool, Exception]]:
self._require_connection()
results = []
if not isinstance(config, dict):
self.logger.error("Invalid configuration format: config must be a dictionary.")
raise ValueError("Invalid configuration format. Must be a dictionary.")
if 'config_rules' not in config or not isinstance(config['config_rules'], list):
self.logger.error("Invalid configuration format: 'config_rules' key missing or not a list.")
raise ValueError("Invalid configuration format. Must contain a 'config_rules' key with a list of rules.")
for rule in config['config_rules']:
try:
if 'action' not in rule or 'custom' not in rule:
raise ValueError(f"Invalid rule format: {rule}")
action = rule['action']
custom = rule['custom']
resource_key = custom.get('resource_key')
resource_value = custom.get('resource_value')
if not resource_key:
raise ValueError(f"Resource key is missing in rule: {rule}")
if resource_value is None:
raise ValueError(f"Resource value is None for key: {resource_key}")
if not resource_key:
raise ValueError(f"Resource key is missing in rule: {rule}")
if action == 1: # Set action
resource_path = self._parse_resource_key(resource_key)
# self.logger.info(f"1. Setting configuration for resource key {resource_key} and resource_path: {resource_path}")
parent = self._running_config
for part in resource_path[:-1]:
if '[' in part and ']' in part:
base, index = part.split('[', 1)
index = index.rstrip(']')
parent = self._find_or_create_node(index, self._find_or_create_node(base, parent))
# self.logger.info(f"2a. Creating node: {base}, {index}, {parent}")
elif resource_path[-1] != 'settings':
# self.logger.info(f"2b. Creating node: {part}")
parent = self._find_or_create_node(part, parent)
final_part = resource_path[-1]
if final_part in ['address', 'port']:
self._create_or_update_node(final_part, parent, resource_value)
self.logger.info(f"Configured: {resource_key} = {resource_value}")
if resource_key.startswith("_connect/settings"):
parent = self._find_or_create_node("_connect", self._running_config)
settings_node = self._find_or_create_node("settings", parent)
settings_node.value = None # Ensure settings node has None value
endpoints_node = self._find_or_create_node("endpoints", settings_node)
for endpoint in resource_value.get("endpoints", []):
uuid = endpoint.get("uuid")
uuid = uuid.replace('/', '_') if uuid else None
if uuid:
# self.logger.info(f"3. Creating endpoint: {uuid}, {endpoint}, {endpoints_node}")
self._create_or_update_node(uuid, endpoints_node, endpoint)
self.logger.info(f"Configured endpoint: {uuid} : {endpoint}")
elif resource_key.startswith("/interface"):
interface_parent = self._find_or_create_node("interface", self._running_config)
name = resource_value.get("name")
name = name.replace('/', '_') if name else None
if name:
self._create_or_update_node(name, interface_parent, resource_value)
self.logger.info(f"Configured interface: {name} : {resource_value}")
# self.logger.info(f"4. Configured interface: {name}")
results.append(True)
else:
raise ValueError(f"Unsupported action '{action}' in rule: {rule}")
if resource_value is None:
raise ValueError(f"Resource value is None for key: {resource_key}")
except Exception as e:
self.logger.exception(f"Failed to apply rule: {rule}")
results.append(e)
return results
def _generate_subtree(self, node: Node) -> dict:
"""
Generates a subtree of the configuration tree starting from the specified node.
Args:
node (Node): The node from which to generate the subtree.
Returns:
dict: The subtree as a dictionary.
"""
subtree = {}
for child in node.children:
if child.children:
subtree[child.name] = self._generate_subtree(child)
else:
value = getattr(child, "value", None)
subtree[child.name] = json.loads(value) if value else None
return subtree
def DeleteConfig(self, resource_keys: List[str]) -> List[Union[bool, Exception]]:
self._require_connection()
results = []
for key in resource_keys:
try:
# Parse resource key into parts, handling brackets correctly
resource_path = self._parse_resource_key(key)
parent = self._running_config
for part in resource_path:
parent = self._find_or_raise_node(part, parent)
# Delete the final node
node_to_delete = parent
parent = node_to_delete.parent
parent.children = tuple(child for child in parent.children if child != node_to_delete)
self.logger.info(f"Deleted configuration for key: {key}")
# Handle endpoints structure
if "interface" in key and "settings" in key:
interface_name = key.split('[')[-1].split(']')[0]
endpoints_parent = self._find_or_raise_node("_connect", self._running_config)
endpoints_node = self._find_or_raise_node("endpoints", endpoints_parent)
endpoint_to_delete = next((child for child in endpoints_node.children if child.name == interface_name), None)
if endpoint_to_delete:
endpoints_node.children = tuple(child for child in endpoints_node.children if child != endpoint_to_delete)
self.logger.info(f"Removed endpoint entry for interface '{interface_name}'")
# Check if parent has no more children and is not the root
while parent and parent.name != "root" and not parent.children:
node_to_delete = parent
parent = node_to_delete.parent
parent.children = tuple(child for child in parent.children if child != node_to_delete)
self.logger.info(f"Deleted empty parent node: {node_to_delete.name}")
results.append(True)
except Exception as e:
self.logger.exception(f"Failed to delete configuration for key: {key}")
results.append(e)
return results
def _parse_resource_key(self, resource_key: str) -> List[str]:
"""
Parses the resource key into parts, correctly handling brackets.
Args:
resource_key (str): The resource key to parse.
Returns:
List[str]: A list of parts from the resource key.
"""
resource_path = []
current_part = ""
in_brackets = False
if not resource_key.startswith('/interface'):
for char in resource_key.strip('/'):
if char == '[':
in_brackets = True
current_part += char
elif char == ']':
in_brackets = False
current_part += char
elif char == '/' and not in_brackets:
resource_path.append(current_part)
current_part = ""
else:
current_part += char
if current_part:
resource_path.append(current_part)
return resource_path
else:
resource_path = resource_key.strip('/').split('/', 1)
if resource_path[1] == 'settings':
return resource_path
else:
resource_path = [resource_key.strip('/').split('[')[0].strip('/'), resource_key.strip('/').split('[')[1].split(']')[0].replace('/', '_')]
return resource_path
def _find_or_raise_node(self, name: str, parent: Node) -> Node:
"""
Finds a node with the given name under the specified parent or raises an exception if not found.
Args:
name (str): The name of the node to find.
parent (Node): The parent node.
Returns:
Node: The found node.
Raises:
ValueError: If the node is not found.
"""
node = next((child for child in parent.children if child.name == name), None)
if not node:
raise ValueError(f"Node '{name}' not found under parent '{parent.name}'.")
return node
def _find_or_create_node(self, name: str, parent: Node) -> Node:
"""
Finds or creates a node with the given name under the specified parent.
Args:
name (str): The name of the node to find or create.
parent (Node): The parent node.
Returns:
Node: The found or created node.
"""
node = next((child for child in parent.children if child.name == name), None)
if not node:
node = Node(name, parent=parent)
return node
def _create_or_update_node(self, name: str, parent: Node, value: Any):
"""
Creates or updates a node with the given name and value under the specified parent.
Args:
name (str): The name of the node.
parent (Node): The parent node.
value (Any): The value to set on the node.
"""
node = next((child for child in parent.children if child.name == name), None)
if node:
node.value = json.dumps(value)
else:
Node(name, parent=parent, value=json.dumps(value))
def validate_resource_key(self, key: str) -> str:
"""
Splits the input string into two parts:
- The first part is '_connect/settings/endpoints/'.
- The second part is the remaining string after the first part, with '/' replaced by '_'.
Args:
key (str): The input string to process.
Returns:
str: A single string with the processed result.
"""
prefix = '_connect/settings/endpoints/'
if not key.startswith(prefix):
raise ValueError(f"The input path '{key}' does not start with the expected prefix: {prefix}")
second_part = key[len(prefix):]
second_part_processed = second_part.replace('/', '_')
validated_key = prefix + second_part_processed
return validated_key
# ------------- GetConfig, SetConfig, DeleteConfig, with helper methods (END)-----------------
def SubscribeState(self, subscriptions: List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]: def SubscribeState(self, subscriptions: List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]:
self._require_connection() self._require_connection()
results = [] results = []
for resource_key, duration, interval in subscriptions: for resource_key, duration, interval in subscriptions:
resource_key = self.validate_resource_key(resource_key) # Validate the endpoint name resource_key = self._helper_methods.validate_resource_key(resource_key) # Validate the endpoint name
self.logger.info(f"1. Subscribing to {resource_key} with duration {duration}s and interval {interval}s") self.logger.info(f"1. Subscribing to {resource_key} with duration {duration}s and interval {interval}s")
try: try:
self._resolver.get(self._running_config, resource_key) # Verify if the resource key exists in the running configuration self._resolver.get(self._running_config, resource_key) # Verify if the resource key exists in the running configuration
...@@ -409,8 +92,13 @@ class EmulatedDriver(_Driver): ...@@ -409,8 +92,13 @@ class EmulatedDriver(_Driver):
if resource_value is not None: if resource_value is not None:
sample_type_ids = resource_value['sample_types'] sample_type_ids = resource_value['sample_types']
self.logger.info(f"Sample type IDs for {resource_key}: {sample_type_ids}") self.logger.info(f"Sample type IDs for {resource_key}: {sample_type_ids}")
if len(sample_type_ids) == 0:
self.logger.warning(f"No sample types found for {resource_key}. Skipping subscription.")
results.append(False)
continue
else: else:
self.logger.warning(f"No sample types found for {resource_key}. Skipping subscription.") self.logger.warning(f"No sample types found for {resource_key}. Skipping subscription.")
results.append(False)
continue continue
# Add the job to the scheduler # Add the job to the scheduler
job_id = f"{resource_key}-{uuid.uuid4()}" job_id = f"{resource_key}-{uuid.uuid4()}"
...@@ -434,7 +122,7 @@ class EmulatedDriver(_Driver): ...@@ -434,7 +122,7 @@ class EmulatedDriver(_Driver):
self._require_connection() self._require_connection()
results = [] results = []
for resource_key, _, _ in subscriptions: for resource_key, _, _ in subscriptions:
resource_key = self.validate_resource_key(resource_key) resource_key = self._helper_methods.validate_resource_key(resource_key)
try: try:
# Check if job exists # Check if job exists
job_ids = [job.id for job in self._scheduler.get_jobs() if resource_key in job.id] job_ids = [job.id for job in self._scheduler.get_jobs() if resource_key in job.id]
...@@ -453,7 +141,7 @@ class EmulatedDriver(_Driver): ...@@ -453,7 +141,7 @@ class EmulatedDriver(_Driver):
results.append(e) results.append(e)
return results return results
def GetState(self, blocking: bool = False, terminate: Optional[queue.Queue] = None) -> Iterator[Tuple[str, Any]]: def GetState(self, blocking: bool = False, terminate: Optional[queue.Queue] = None) -> Iterator[Tuple[float, str, Any]]:
self._require_connection() self._require_connection()
start_time = datetime.now(pytz.utc) start_time = datetime.now(pytz.utc)
duration = 10 # Duration of the subscription in seconds (as an example) duration = 10 # Duration of the subscription in seconds (as an example)
...@@ -478,7 +166,6 @@ class EmulatedDriver(_Driver): ...@@ -478,7 +166,6 @@ class EmulatedDriver(_Driver):
return None return None
def _generate_sample(self, resource_key: str, sample_type_ids : List[int]): def _generate_sample(self, resource_key: str, sample_type_ids : List[int]):
self._require_connection()
# Simulate generating a sample for the resource key # Simulate generating a sample for the resource key
self.logger.debug(f"Executing _generate_sample for resource: {resource_key}") self.logger.debug(f"Executing _generate_sample for resource: {resource_key}")
sample = self._synthetic_data.generate_synthetic_data_point(resource_key, sample_type_ids) sample = self._synthetic_data.generate_synthetic_data_point(resource_key, sample_type_ids)
...@@ -490,7 +177,7 @@ class EmulatedDriver(_Driver): ...@@ -490,7 +177,7 @@ class EmulatedDriver(_Driver):
if event.job_id: if event.job_id:
# Extract the resource key from the job ID # Extract the resource key from the job ID
resource_key = event.job_id.split('-')[0] resource_key = event.job_id.split('-')[0]
resource_key = self.validate_resource_key(resource_key) resource_key = self._helper_methods.validate_resource_key(resource_key)
# Remove the subscription from the tree # Remove the subscription from the tree
try: try:
...@@ -502,7 +189,7 @@ class EmulatedDriver(_Driver): ...@@ -502,7 +189,7 @@ class EmulatedDriver(_Driver):
raise ValueError(f"Subscription path '{resource_key}' not found in tree.") raise ValueError(f"Subscription path '{resource_key}' not found in tree.")
if parent: if parent:
parent.parent.children = tuple(child for child in parent.parent.children if child != parent) parent.parent.children = tuple(child for child in parent.parent.children if child != parent)
self.logger.info(f"Automatically removed subscription from subscription_tree for {resource_key} after job termination by listener.") self.logger.warning(f"Automatically removed subscription from subscription_tree for {resource_key} after job termination by listener. Maybe due to timeout.")
except Exception as e: except Exception as e:
self.logger.warning(f"Failed to remove subscription for {resource_key}: {e}") self.logger.warning(f"Failed to remove subscription for {resource_key}: {e}")
...@@ -511,7 +198,7 @@ class EmulatedDriver(_Driver): ...@@ -511,7 +198,7 @@ class EmulatedDriver(_Driver):
job_id = event.job_id job_id = event.job_id
if job_id: if job_id:
resource_key = job_id.split('-')[0] # Extract resource key from job ID resource_key = job_id.split('-')[0] # Extract resource key from job ID
resource_key = self.validate_resource_key(resource_key) resource_key = self._helper_methods.validate_resource_key(resource_key)
subscription_path = resource_key.split('/') subscription_path = resource_key.split('/')
parent = self._subscriptions parent = self._subscriptions
for part in subscription_path: for part in subscription_path:
...@@ -528,36 +215,238 @@ class EmulatedDriver(_Driver): ...@@ -528,36 +215,238 @@ class EmulatedDriver(_Driver):
# ------------- Event Listeners (END)----------------- # ------------- Event Listeners (END)-----------------
def log_active_jobs(self): #-------------------------------------------------------------------------------------
""" # ------- The below methods are kept for debugging purposes (test-case) only ---------
Logs the IDs of all active jobs. #-------------------------------------------------------------------------------------
This method retrieves the list of active jobs from the scheduler and logs their IDs using the logger.
""" # This method can be commented but this will arise an error in the test-case (@pytest.fixture --> connected_configured_driver()).
self._require_connection() def SetConfig(self, resources: dict) -> List[Union[bool, Exception]]: # For debugging purposes.
jobs = self._scheduler.get_jobs()
self.logger.info(f"Active jobs: {[job.id for job in jobs]}")
def print_config_tree(self):
"""
Reads the configuration using GetConfig and prints it as a hierarchical tree structure.
For debugging purposes.
"""
self._require_connection() self._require_connection()
results = []
# if not isinstance(resources, dict):
# self.logger.error("Invalid configuration format: resources must be a dictionary.")
# raise ValueError("Invalid configuration format. Must be a dictionary.")
if 'config_rules' not in resources or not isinstance(resources['config_rules'], list):
self.logger.error("Invalid configuration format: 'config_rules' key missing or not a list.")
raise ValueError("Invalid configuration format. Must contain a 'config_rules' key with a list of rules.")
for rule in resources['config_rules']:
try:
if 'action' not in rule or 'custom' not in rule:
raise ValueError(f"Invalid rule format: {rule}")
action = rule['action']
custom = rule['custom']
resource_key = custom.get('resource_key')
resource_value = custom.get('resource_value')
if not resource_key:
raise ValueError(f"Resource key is missing in rule: {rule}")
if resource_value is None:
raise ValueError(f"Resource value is None for key: {resource_key}")
if not resource_key:
raise ValueError(f"Resource key is missing in rule: {rule}")
if action == 1: # Set action
resource_path = self._helper_methods._parse_resource_key(resource_key)
# self.logger.info(f"1. Setting configuration for resource key {resource_key} and resource_path: {resource_path}")
parent = self._running_config
for part in resource_path[:-1]:
if '[' in part and ']' in part:
base, index = part.split('[', 1)
index = index.rstrip(']')
parent = self._helper_methods._find_or_create_node(index, self._helper_methods._find_or_create_node(base, parent))
# self.logger.info(f"2a. Creating node: {base}, {index}, {parent}")
elif resource_path[-1] != 'settings':
# self.logger.info(f"2b. Creating node: {part}")
parent = self._helper_methods._find_or_create_node(part, parent)
final_part = resource_path[-1]
if final_part in ['address', 'port']:
self._helper_methods._create_or_update_node(final_part, parent, resource_value)
self.logger.info(f"Configured: {resource_key} = {resource_value}")
if resource_key.startswith("_connect/settings"):
parent = self._helper_methods._find_or_create_node("_connect", self._running_config)
settings_node = self._helper_methods._find_or_create_node("settings", parent)
settings_node.value = None # Ensure settings node has None value
endpoints_node = self._helper_methods._find_or_create_node("endpoints", settings_node)
for endpoint in resource_value.get("endpoints", []):
uuid = endpoint.get("uuid")
uuid = uuid.replace('/', '_') if uuid else None
if uuid:
# self.logger.info(f"3. Creating endpoint: {uuid}, {endpoint}, {endpoints_node}")
self._helper_methods._create_or_update_node(uuid, endpoints_node, endpoint)
self.logger.info(f"Configured endpoint: {uuid} : {endpoint}")
elif resource_key.startswith("/interface"):
interface_parent = self._helper_methods._find_or_create_node("interface", self._running_config)
name = resource_value.get("name")
name = name.replace('/', '_') if name else None
if name:
self._helper_methods._create_or_update_node(name, interface_parent, resource_value)
self.logger.info(f"Configured interface: {name} : {resource_value}")
# self.logger.info(f"4. Configured interface: {name}")
results.append(True)
else:
raise ValueError(f"Unsupported action '{action}' in rule: {rule}")
if resource_value is None:
raise ValueError(f"Resource value is None for key: {resource_key}")
except Exception as e:
self.logger.exception(f"Failed to apply rule: {rule}")
results.append(e)
return results
#-----------------------------------
# ------- EXTRA Methods ------------
#-----------------------------------
# def log_active_jobs(self): # For debugging purposes.
# """
# Logs the IDs of all active jobs.
# This method retrieves the list of active jobs from the scheduler and logs their IDs using the logger.
# """
# self._require_connection()
# jobs = self._scheduler.get_jobs()
# self.logger.info(f"Active jobs: {[job.id for job in jobs]}")
# def print_config_tree(self): # For debugging purposes.
# """
# Reads the configuration using GetConfig and prints it as a hierarchical tree structure.
# """
# self._require_connection()
# def print_tree(node, indent=""):
# """
# Recursively prints the configuration tree.
# Args:
# node (Node): The current node to print.
# indent (str): The current indentation level.
# """
# if node.name != "root": # Skip the root node's name
# value = getattr(node, "value", None)
# print(f"{indent}- {node.name}: {json.loads(value) if value else ''}")
# for child in node.children:
# print_tree(child, indent + " ")
# print("Configuration Tree:")
# print_tree(self._running_config)
# def GetInitialConfig(self) -> List[Tuple[str, Any]]: # comment
# self._require_connection()
# results = []
# for node in self._initial_config.descendants:
# value = getattr(node, "value", None)
# results.append((node.name, json.loads(value) if value else None))
# self.logger.info("Retrieved initial configurations")
# return results
# def GetConfig(self, resource_keys: List[str] = []) -> List[Tuple[str, Union[Any, dict, Exception]]]: # comment
# """
# Retrieves the configuration for the specified resource keys.
# If no keys are provided, returns the full configuration tree.
def print_tree(node, indent=""): # Args:
""" # resource_keys (List[str]): A list of keys specifying the configuration to retrieve.
Recursively prints the configuration tree.
Args:
node (Node): The current node to print.
indent (str): The current indentation level.
"""
if node.name != "root": # Skip the root node's name
value = getattr(node, "value", None)
print(f"{indent}- {node.name}: {json.loads(value) if value else ''}")
for child in node.children: # Returns:
print_tree(child, indent + " ") # List[Tuple[str, Union[Any, dict, Exception]]]: A list of tuples with the resource key and its value,
# subtree, or an exception.
# """
# self._require_connection()
# results = []
# try:
# if not resource_keys:
# # If no specific keys are provided, return the full configuration tree
# full_tree = self._helper_methods._generate_subtree(self._running_config)
# # full_tree = self._generate_subtree(self._running_config)
# return [("full_configuration", full_tree)]
# for key in resource_keys:
# try:
# # Parse the resource key
# resource_path = self._helper_methods.(key)
# self.logger.info(f"1. Retrieving configuration for resource path : {resource_path}")
# # Navigate to the node corresponding to the key
# parent = self._running_config
# for part in resource_path:
# parent = self._find_or_raise_node(part, parent)
# # Check if the node has a value
# value = getattr(parent, "value", None)
# if value:
# # If a value exists, return it
# results.append((key, json.loads(value)))
# else:
# # If no value, return the subtree of this node
# subtree = self._helper_methods._generate_subtree(parent)
# # subtree = self._generate_subtree(parent)
# results.append((key, subtree))
# except Exception as e:
# self.logger.exception(f"Failed to retrieve configuration for key: {key}")
# results.append((key, e))
# except Exception as e:
# self.logger.exception("Failed to retrieve configurations")
# results.append(("Error", e))
# return results
# def DeleteConfig(self, resources: List[Tuple[str, Any]]) -> List[Union[bool, Exception]]: # comment
# self._require_connection()
# results = []
# for key in resources:
# try:
# # Parse resource key into parts, handling brackets correctly
# resource_path = self._helper_methods.(key)
# parent = self._running_config
# for part in resource_path:
# parent = self._find_or_raise_node(part, parent)
# # Delete the final node
# node_to_delete = parent
# parent = node_to_delete.parent
# parent.children = tuple(child for child in parent.children if child != node_to_delete)
# self.logger.info(f"Deleted configuration for key: {key}")
# # Handle endpoints structure
# if "interface" in key and "settings" in key:
# interface_name = key.split('[')[-1].split(']')[0]
# endpoints_parent = self._find_or_raise_node("_connect", self._running_config)
# endpoints_node = self._find_or_raise_node("endpoints", endpoints_parent)
# endpoint_to_delete = next((child for child in endpoints_node.children if child.name == interface_name), None)
# if endpoint_to_delete:
# endpoints_node.children = tuple(child for child in endpoints_node.children if child != endpoint_to_delete)
# self.logger.info(f"Removed endpoint entry for interface '{interface_name}'")
# # Check if parent has no more children and is not the root
# while parent and parent.name != "root" and not parent.children:
# node_to_delete = parent
# parent = node_to_delete.parent
# parent.children = tuple(child for child in parent.children if child != node_to_delete)
# self.logger.info(f"Deleted empty parent node: {node_to_delete.name}")
# results.append(True)
# except Exception as e:
# self.logger.exception(f"Failed to delete configuration for key: {key}")
# results.append(e)
# return results
print("Configuration Tree:")
print_tree(self._running_config)
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from anytree import Node
import json
from typing import Any, List
class EmulatedDriverHelper:
"""
Helper class for the emulated driver.
"""
def __init__(self):
pass
def validate_resource_key(self, key: str) -> str:
"""
Splits the input string into two parts:
- The first part is '_connect/settings/endpoints/'.
- The second part is the remaining string after the first part, with '/' replaced by '_'.
Args:
key (str): The input string to process.
Returns:
str: A single string with the processed result.
"""
prefix = '_connect/settings/endpoints/'
if not key.startswith(prefix):
raise ValueError(f"The input path '{key}' does not start with the expected prefix: {prefix}")
second_part = key[len(prefix):]
second_part_processed = second_part.replace('/', '_')
validated_key = prefix + second_part_processed
return validated_key
#--------------------------------------------------------------------------------------
# ------- Below function is kept for debugging purposes (test-cases) only -------------
#--------------------------------------------------------------------------------------
# This below methods can be commented but are called by the SetConfig method in EmulatedDriver.py
def _find_or_create_node(self, name: str, parent: Node) -> Node:
"""
Finds or creates a node with the given name under the specified parent.
Args:
name (str): The name of the node to find or create.
parent (Node): The parent node.
Returns:
Node: The found or created node.
"""
node = next((child for child in parent.children if child.name == name), None)
if not node:
node = Node(name, parent=parent)
return node
def _create_or_update_node(self, name: str, parent: Node, value: Any):
"""
Creates or updates a node with the given name and value under the specified parent.
Args:
name (str): The name of the node.
parent (Node): The parent node.
value (Any): The value to set on the node.
"""
node = next((child for child in parent.children if child.name == name), None)
if node:
node.value = json.dumps(value)
else:
Node(name, parent=parent, value=json.dumps(value))
def _parse_resource_key(self, resource_key: str) -> List[str]:
"""
Parses the resource key into parts, correctly handling brackets.
Args:
resource_key (str): The resource key to parse.
Returns:
List[str]: A list of parts from the resource key.
"""
resource_path = []
current_part = ""
in_brackets = False
if not resource_key.startswith('/interface'):
for char in resource_key.strip('/'):
if char == '[':
in_brackets = True
current_part += char
elif char == ']':
in_brackets = False
current_part += char
elif char == '/' and not in_brackets:
resource_path.append(current_part)
current_part = ""
else:
current_part += char
if current_part:
resource_path.append(current_part)
return resource_path
else:
resource_path = resource_key.strip('/').split('/', 1)
if resource_path[1] == 'settings':
return resource_path
else:
resource_path = [resource_key.strip('/').split('[')[0].strip('/'), resource_key.strip('/').split('[')[1].split(']')[0].replace('/', '_')]
return resource_path
#-----------------------------------
# ------- EXTRA Methods ------------
#-----------------------------------
# def _generate_subtree(self, node: Node) -> dict:
# """
# Generates a subtree of the configuration tree starting from the specified node.
# Args:
# node (Node): The node from which to generate the subtree.
# Returns:
# dict: The subtree as a dictionary.
# """
# subtree = {}
# for child in node.children:
# if child.children:
# subtree[child.name] = self._generate_subtree(child)
# else:
# value = getattr(child, "value", None)
# subtree[child.name] = json.loads(value) if value else None
# return subtree
# def _find_or_raise_node(self, name: str, parent: Node) -> Node:
# """
# Finds a node with the given name under the specified parent or raises an exception if not found.
# Args:
# name (str): The name of the node to find.
# parent (Node): The parent node.
# Returns:
# Node: The found node.
# Raises:
# ValueError: If the node is not found.
# """
# node = next((child for child in parent.children if child.name == name), None)
# if not node:
# raise ValueError(f"Node '{name}' not found under parent '{parent.name}'.")
# return node
...@@ -52,38 +52,39 @@ def test_disconnect(setup_driver): ...@@ -52,38 +52,39 @@ def test_disconnect(setup_driver):
assert driver.Disconnect() is True assert driver.Disconnect() is True
assert driver.connected is False assert driver.connected is False
def test_set_config(setup_driver): # def test_set_config(setup_driver):
logger.info(">>> test_set_config <<<") # logger.info(">>> test_set_config <<<")
driver = setup_driver # driver = setup_driver
driver.Connect() # driver.Connect()
config = create_test_configuration() # config = create_test_configuration()
results = driver.SetConfig(config) # results = driver.SetConfig(config)
assert all(result is True for result in results) # assert all(result is True for result in results)
def test_get_config(connected_configured_driver): # def test_get_config(connected_configured_driver):
logger.info(">>> test_get_config <<<") # logger.info(">>> test_get_config <<<")
resource_keys = create_specific_config_keys() # resource_keys = create_specific_config_keys()
results = connected_configured_driver.GetConfig(resource_keys) # results = connected_configured_driver.GetConfig(resource_keys)
for key, value in results: # for key, value in results:
assert key in create_specific_config_keys() # assert key in create_specific_config_keys()
assert value is not None # assert value is not None
def test_delete_config(connected_configured_driver): # def test_delete_config(connected_configured_driver):
logger.info(">>> test_delete_config <<<") # logger.info(">>> test_delete_config <<<")
resource_keys = create_config_for_delete() # resource_keys = create_config_for_delete()
results = connected_configured_driver.DeleteConfig(resource_keys) # results = connected_configured_driver.DeleteConfig(resource_keys)
assert all(result is True for result in results) # assert all(result is True for result in results)
def test_subscribe_state(connected_configured_driver): def test_subscribe_state(connected_configured_driver):
logger.info(">>> test_subscribe_state <<<") logger.info(">>> test_subscribe_state <<<")
subscriptions = create_test_subscriptions() subscriptions = create_test_subscriptions()
results = connected_configured_driver.SubscribeState(subscriptions) results = connected_configured_driver.SubscribeState(subscriptions)
assert all(result is True for result in results) # logger.info(f"Subscribed result: {results}.")
assert results == [False, True, True] # all(result is True for result in results)
def test_unsubscribe_state(connected_configured_driver): def test_unsubscribe_state(connected_configured_driver):
logger.info(">>> test_unsubscribe_state <<<") logger.info(">>> test_unsubscribe_state <<<")
...@@ -91,15 +92,15 @@ def test_unsubscribe_state(connected_configured_driver): ...@@ -91,15 +92,15 @@ def test_unsubscribe_state(connected_configured_driver):
connected_configured_driver.SubscribeState(subscriptions) connected_configured_driver.SubscribeState(subscriptions)
results = connected_configured_driver.UnsubscribeState(subscriptions) results = connected_configured_driver.UnsubscribeState(subscriptions)
assert all(result is True for result in results) assert results == [False, True, True] # all(result is True for result in results)
def test_get_state(connected_configured_driver): def test_get_state(connected_configured_driver):
logger.info(">>> test_get_state <<<") logger.info(">>> test_get_state <<<")
subscriptions = create_test_subscriptions() subscriptions = create_test_subscriptions()
connected_configured_driver.SubscribeState(subscriptions) connected_configured_driver.SubscribeState(subscriptions)
logger.info(f"Subscribed to state: {subscriptions}. waiting for 3 seconds ...") logger.info(f"Subscribed to state: {subscriptions}. waiting for 12 seconds ...")
time.sleep(3) time.sleep(12)
state_iterator = connected_configured_driver.GetState(blocking=False) state_iterator = connected_configured_driver.GetState(blocking=False)
states = list(state_iterator) states = list(state_iterator)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment