# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pytz
import queue
import logging
import uuid
import json
from anytree import Node, Resolver
from apscheduler.events import EVENT_JOB_ADDED, EVENT_JOB_REMOVED
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.executors.pool import ThreadPoolExecutor
from datetime import datetime, timedelta
from typing import Any, Iterator, List, Tuple, Union, Optional
from telemetry.backend.collector_api._Collector import _Collector
from .EmulatedHelper import EmulatedCollectorHelper
from .SyntheticMetricsGenerator import SyntheticMetricsGenerator


class EmulatedCollector(_Collector):
    """
    EmulatedCollector is a class that simulates a network collector for testing purposes.
    It provides functionalities to manage configurations, state subscriptions, and synthetic data generation.

    Subscriptions are implemented as APScheduler interval jobs whose id is
    ``"<resource_key>-<uuid4>"``; each job run pushes one synthetic sample
    into an internal queue that ``GetState`` drains.
    """

    # Upper bound, in seconds, on how long a single GetState() iteration runs.
    _GET_STATE_MAX_DURATION_S = 10

    def __init__(self, address: str, port: int, **settings):
        """
        Build the (not yet connected) collector.

        Args:
            address: Forwarded to the ``_Collector`` base class.
            port: Forwarded to the ``_Collector`` base class.
            **settings: Forwarded to the ``_Collector`` base class.
        """
        super().__init__('emulated_collector', address, port, **settings)
        self._initial_config = Node('root')                 # Tree structure for initial config
        self._running_config = Node('root')                 # Tree structure for running config
        self._subscriptions  = Node('subscriptions')        # Tree for state subscriptions
        self._resolver       = Resolver()                   # For path resolution in tree structures
        self._out_samples    = queue.Queue()                # Queue to hold synthetic state samples
        self._synthetic_data = SyntheticMetricsGenerator(metric_queue=self._out_samples)  # Placeholder for synthetic data generator
        self._scheduler      = BackgroundScheduler(daemon=True)
        self._scheduler.configure(
            jobstores = {'default': MemoryJobStore()},
            executors = {'default': ThreadPoolExecutor(max_workers=1)},
            timezone  = pytz.utc
        )
        # Keep the subscription tree in sync with the scheduler's job store.
        self._scheduler.add_listener(self._listener_job_added_to_subscription_tree,     EVENT_JOB_ADDED)
        self._scheduler.add_listener(self._listener_job_removed_from_subscription_tree, EVENT_JOB_REMOVED)
        self._helper_methods = EmulatedCollectorHelper()

        self.logger    = logging.getLogger(__name__)
        self.connected = False          # To track connection state
        self.logger.info("EmulatedCollector initialized")

    def Connect(self) -> bool:
        """
        Start the background scheduler and mark the collector as connected.

        Calling it while already connected is a no-op, so the scheduler is
        never started twice.

        Returns:
            Always True.
        """
        self.logger.info(f"Connecting to {self.address}:{self.port}")
        if self.connected:
            self.logger.warning("Collector is already connected. Nothing to do.")
            return True
        # Flip the flag only once the scheduler is actually running, so a
        # failing start() does not leave the collector flagged as connected.
        self._scheduler.start()
        self.connected = True
        self.logger.info(f"Successfully connected to {self.address}:{self.port}")
        return True

    def Disconnect(self) -> bool:
        """
        Shut the scheduler down and mark the collector as disconnected.

        Returns:
            True when a disconnect happened, False when the collector was
            not connected in the first place.
        """
        self.logger.info(f"Disconnecting from {self.address}:{self.port}")
        if not self.connected:
            self.logger.warning("Collector is not connected. Nothing to disconnect.")
            return False
        self._scheduler.shutdown()
        self.connected = False
        self.logger.info(f"Successfully disconnected from {self.address}:{self.port}")
        return True

    def _require_connection(self):
        """Raise RuntimeError unless ``Connect`` has been called successfully."""
        if not self.connected:
            raise RuntimeError("Collector is not connected. Please connect before performing operations.")

    @staticmethod
    def _resource_key_from_job_id(job_id: str) -> str:
        """
        Recover the resource key from a scheduler job id.

        Job ids created by ``SubscribeState`` look like
        ``"<resource_key>-<uuid4>"``. The UUID itself contains four hyphens,
        and the resource key may contain hyphens too, so only the trailing
        UUID is stripped. Ids that do not end in a UUID fall back to the
        text before the first hyphen.
        """
        head, *tail = job_id.rsplit('-', 5)
        if len(tail) == 5:
            try:
                uuid.UUID('-'.join(tail))
            except ValueError:
                pass    # Suffix is not a UUID: use the fallback below.
            else:
                return head
        return job_id.split('-')[0]

    def SubscribeState(self, subscriptions: List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]:
        """
        Schedule periodic synthetic-sample generation for each subscription.

        Args:
            subscriptions: ``(resource_key, duration, interval)`` tuples;
                duration and interval are in seconds.

        Returns:
            One entry per subscription, in order: True when a job was
            scheduled, False when the resource has no sample types, or the
            exception raised while resolving the resource / adding the job.

        Raises:
            RuntimeError: If the collector is not connected.
        """
        self._require_connection()
        results = []
        for resource_key, duration, interval in subscriptions:
            resource_key = self._helper_methods.validate_resource_key(resource_key)    # Validate the endpoint name
            self.logger.info(f"1. Subscribing to {resource_key} with duration {duration}s and interval {interval}s")
            try:
                # Resolving raises if the resource key does not exist in the running configuration.
                resource_node = self._resolver.get(self._running_config, resource_key)
                self.logger.info(f"Resource key {resource_key} exists in the configuration.")
                resource_value = json.loads(resource_node.value)
                if resource_value is None:
                    self.logger.warning(f"No sample types found for {resource_key}. Skipping subscription.")
                    results.append(False)
                    continue
                sample_type_ids = resource_value['sample_types']
                self.logger.info(f"Sample type IDs for {resource_key}: {sample_type_ids}")
                if not sample_type_ids:
                    self.logger.warning(f"No sample types found for {resource_key}. Skipping subscription.")
                    results.append(False)
                    continue
                # Add the job to the scheduler
                job_id = f"{resource_key}-{uuid.uuid4()}"
                self._scheduler.add_job(
                    self._generate_sample,
                    'interval',
                    seconds=interval,
                    args=[resource_key, sample_type_ids],
                    id=job_id,
                    replace_existing=True,
                    end_date=datetime.now(pytz.utc) + timedelta(seconds=duration)
                )
                self.logger.info(f"Job added to scheduler for resource key {resource_key} with duration {duration}s and interval {interval}s")
                results.append(True)
            except Exception as e:
                self.logger.error(f"Failed to verify resource key or add job: {e}")
                results.append(e)
        return results

    def UnsubscribeState(self, subscriptions: List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]:
        """
        Remove every scheduled job belonging to the given resource keys.

        Args:
            subscriptions: ``(resource_key, duration, interval)`` tuples;
                only the resource key is used.

        Returns:
            One entry per subscription, in order: True when at least one job
            was removed, False when no job exists for that key, or the
            exception raised while removing jobs.

        Raises:
            RuntimeError: If the collector is not connected.
        """
        self._require_connection()
        results = []
        for resource_key, _, _ in subscriptions:
            resource_key = self._helper_methods.validate_resource_key(resource_key)
            try:
                # Match on the exact resource key: a substring test would
                # also hit e.g. "eth10" when unsubscribing "eth1".
                job_ids = [
                    job.id for job in self._scheduler.get_jobs()
                    if self._resource_key_from_job_id(job.id) == resource_key
                ]
                if not job_ids:
                    self.logger.warning(f"No active jobs found for {resource_key}. It might have already terminated.")
                    results.append(False)
                    continue
                # Remove jobs
                for job_id in job_ids:
                    self._scheduler.remove_job(job_id)
                self.logger.info(f"Unsubscribed from {resource_key} with job IDs: {job_ids}")
                results.append(True)
            except Exception as e:
                self.logger.exception(f"Failed to unsubscribe from {resource_key}")
                results.append(e)
        return results

    def GetState(self, blocking: bool = False, terminate: Optional[queue.Queue] = None) -> Iterator[Tuple[float, str, Any]]:
        """
        Yield the samples produced by the active subscriptions.

        This is a generator, so nothing (including the connection check)
        runs until the first item is requested. Iteration stops when:
        ``terminate`` holds at least one item, ``_GET_STATE_MAX_DURATION_S``
        seconds have elapsed, or (non-blocking mode only) the sample queue
        is empty.

        Args:
            blocking: When True, wait up to one second per poll for a sample
                and keep polling on an empty queue; when False, stop at the
                first empty poll.
            terminate: Optional queue used as a stop signal; any item in it
                ends the iteration.

        Raises:
            RuntimeError: If the collector is not connected.
        """
        self._require_connection()
        start_time = datetime.now(pytz.utc)

        while True:
            if terminate is not None and not terminate.empty():
                self.logger.info("Termination signal received, stopping GetState")
                break

            elapsed_time = (datetime.now(pytz.utc) - start_time).total_seconds()
            if elapsed_time >= self._GET_STATE_MAX_DURATION_S:
                self.logger.info("Duration expired, stopping GetState")
                break

            try:
                # Queue.get() ignores `timeout` when block=False, so the
                # non-blocking case is a plain get_nowait().
                if blocking:
                    sample = self._out_samples.get(block=True, timeout=1)
                else:
                    sample = self._out_samples.get_nowait()
            except queue.Empty:
                if not blocking:
                    self.logger.info("No more samples in queue, exiting GetState")
                    return
                continue

            # Yield outside the try so an exception thrown into the generator
            # by the consumer is never mistaken for an empty queue.
            self.logger.info(f"Retrieved state sample: {sample}")
            yield sample

    def _generate_sample(self, resource_key: str, sample_type_ids : List[int]):
        """Scheduler job body: produce one synthetic sample and enqueue it."""
        # Simulate generating a sample for the resource key
        self.logger.debug(f"Executing _generate_sample for resource: {resource_key}")
        sample = self._synthetic_data.generate_synthetic_data_point(resource_key, sample_type_ids)
        self._out_samples.put(sample)

# ------------- Event Listeners (START)-----------------

    def _listener_job_removed_from_subscription_tree(self, event):
        """
        Scheduler listener: drop the subscription-tree node of a removed job.

        Failures are logged as warnings and never propagated to the scheduler.
        """
        job_id = event.job_id
        if not job_id:
            return
        # Extract the resource key from the job ID
        resource_key = self._resource_key_from_job_id(job_id)
        # Remove the subscription from the tree
        try:
            resource_key = self._helper_methods.validate_resource_key(resource_key)
            node = self._subscriptions
            for part in resource_key.split('/'):
                node = next((child for child in node.children if child.name == part), None)
                if node is None:
                    raise ValueError(f"Subscription path '{resource_key}' not found in tree.")
            node.parent = None      # Detach the node (and its subtree) from the tree.
            self.logger.warning(f"Automatically removed subscription from subscription_tree for {resource_key} after job termination by listener. Maybe due to timeout.")
        except Exception as e:
            self.logger.warning(f"Failed to remove subscription for {resource_key}: {e}")

    def _listener_job_added_to_subscription_tree(self, event):
        """
        Scheduler listener: record a newly added job in the subscription tree.

        Creates any missing nodes along the resource-key path and stores the
        job id on the final node. Failures are logged and never propagated.
        """
        try:
            job_id = event.job_id
            if job_id:
                resource_key = self._resource_key_from_job_id(job_id)  # Extract resource key from job ID
                resource_key = self._helper_methods.validate_resource_key(resource_key)
                parent = self._subscriptions
                for part in resource_key.split('/'):
                    node = next((child for child in parent.children if child.name == part), None)
                    if node is None:
                        node = Node(part, parent=parent)
                    parent = node
                parent.value = {
                    "job_id": job_id
                }
                self.logger.info(f"Automatically added subscription for {resource_key} to the subscription_tree by listener.")
        except Exception:
            self.logger.exception("Failed to add subscription to the tree")

# ------------- Event Listeners (END)-----------------

#-------------------------------------------------------------------------------------
# ------- The below methods are kept for debugging purposes (test-case) only ---------
#-------------------------------------------------------------------------------------

#  This method can be commented out, but doing so will raise an error in the test-case (@pytest.fixture --> connected_configured_collector()).
    def SetConfig(self, resources: dict) -> List[Union[bool, Exception]]:  # For debugging purposes.
        """
        Apply configuration rules to the running-configuration tree.

        Args:
            resources: Dict with a ``'config_rules'`` list. Each rule needs
                an ``'action'`` (only ``1`` = set is supported) and a
                ``'custom'`` dict holding ``'resource_key'`` and
                ``'resource_value'``.

        Returns:
            One entry per rule, in order: True when applied, otherwise the
            exception that rejected the rule.

        Raises:
            RuntimeError: If the collector is not connected.
            ValueError: If ``'config_rules'`` is missing or not a list.
        """
        self._require_connection()
        results = []

        # if not isinstance(resources, dict):
        #     self.logger.error("Invalid configuration format: resources must be a dictionary.")
        #     raise ValueError("Invalid configuration format. Must be a dictionary.")
        if 'config_rules' not in resources or not isinstance(resources['config_rules'], list):
            self.logger.error("Invalid configuration format: 'config_rules' key missing or not a list.")
            raise ValueError("Invalid configuration format. Must contain a 'config_rules' key with a list of rules.")

        helper = self._helper_methods
        for rule in resources['config_rules']:
            try:
                if 'action' not in rule or 'custom' not in rule:
                    raise ValueError(f"Invalid rule format: {rule}")

                action = rule['action']
                custom = rule['custom']
                resource_key = custom.get('resource_key')
                resource_value = custom.get('resource_value')

                if not resource_key:
                    raise ValueError(f"Resource key is missing in rule: {rule}")
                if resource_value is None:
                    raise ValueError(f"Resource value is None for key: {resource_key}")
                if action != 1:     # Only the "set" action is supported
                    raise ValueError(f"Unsupported action '{action}' in rule: {rule}")

                resource_path = helper._parse_resource_key(resource_key)
                # self.logger.info(f"1. Setting configuration for resource key {resource_key} and resource_path: {resource_path}")
                parent = self._running_config

                # Create the intermediate nodes of the path.
                for part in resource_path[:-1]:
                    if '[' in part and ']' in part:
                        # "base[index]" becomes a `base` node with an `index` child.
                        base, index = part.split('[', 1)
                        index = index.rstrip(']')
                        parent = helper._find_or_create_node(index, helper._find_or_create_node(base, parent))
                        # self.logger.info(f"2a. Creating node: {base}, {index}, {parent}")
                    elif resource_path[-1] != 'settings':
                        # self.logger.info(f"2b. Creating node: {part}")
                        parent = helper._find_or_create_node(part, parent)

                final_part = resource_path[-1]
                if final_part in ['address', 'port']:
                    helper._create_or_update_node(final_part, parent, resource_value)
                    self.logger.info(f"Configured: {resource_key} = {resource_value}")

                if resource_key.startswith("_connect/settings"):
                    parent = helper._find_or_create_node("_connect", self._running_config)
                    settings_node = helper._find_or_create_node("settings", parent)
                    settings_node.value = None  # Ensure settings node has None value
                    endpoints_node = helper._find_or_create_node("endpoints", settings_node)

                    for endpoint in resource_value.get("endpoints", []):
                        # Named endpoint_uuid so the imported `uuid` module is not shadowed.
                        endpoint_uuid = endpoint.get("uuid")
                        endpoint_uuid = endpoint_uuid.replace('/', '_') if endpoint_uuid else None
                        if endpoint_uuid:
                            # self.logger.info(f"3. Creating endpoint: {uuid}, {endpoint}, {endpoints_node}")
                            helper._create_or_update_node(endpoint_uuid, endpoints_node, endpoint)
                            self.logger.info(f"Configured endpoint: {endpoint_uuid} : {endpoint}")

                elif resource_key.startswith("/interface"):
                    interface_parent = helper._find_or_create_node("interface", self._running_config)
                    name = resource_value.get("name")
                    name = name.replace('/', '_') if name else None
                    if name:
                        helper._create_or_update_node(name, interface_parent, resource_value)
                        self.logger.info(f"Configured interface: {name} : {resource_value}")
                        # self.logger.info(f"4. Configured interface: {name}")

                results.append(True)
            except Exception as e:
                self.logger.exception(f"Failed to apply rule: {rule}")
                results.append(e)

        return results

#-----------------------------------
# ------- EXTRA Methods ------------
#-----------------------------------

    # def log_active_jobs(self):              # For debugging purposes.
    #     """
    #     Logs the IDs of all active jobs.
    #     This method retrieves the list of active jobs from the scheduler and logs their IDs using the logger.
    #     """
    #     self._require_connection()
    #     jobs = self._scheduler.get_jobs()
    #     self.logger.info(f"Active jobs: {[job.id for job in jobs]}")

    # def print_config_tree(self):            # For debugging purposes.
    #     """
    #     Reads the configuration using GetConfig and prints it as a hierarchical tree structure.
    #     """
    #     self._require_connection()

    #     def print_tree(node, indent=""):
    #         """
    #         Recursively prints the configuration tree.

    #         Args:
    #             node (Node): The current node to print.
    #             indent (str): The current indentation level.
    #         """
    #         if node.name != "root":  # Skip the root node's name
    #             value = getattr(node, "value", None)
    #             print(f"{indent}- {node.name}: {json.loads(value) if value else ''}")

    #         for child in node.children:
    #             print_tree(child, indent + "  ")

    #     print("Configuration Tree:")
    #     print_tree(self._running_config)

    # def GetInitialConfig(self) -> List[Tuple[str, Any]]:    # comment
    #     self._require_connection()
    #     results = []
    #     for node in self._initial_config.descendants:
    #         value = getattr(node, "value", None)
    #         results.append((node.name, json.loads(value) if value else None))
    #     self.logger.info("Retrieved initial configurations")
    #     return results

    # def GetConfig(self, resource_keys: List[str] = []) -> List[Tuple[str, Union[Any, dict, Exception]]]:    # comment
    #     """
    #     Retrieves the configuration for the specified resource keys.
    #     If no keys are provided, returns the full configuration tree.

    #     Args:
    #         resource_keys (List[str]): A list of keys specifying the configuration to retrieve.

    #     Returns:
    #         List[Tuple[str, Union[Any, dict, Exception]]]: A list of tuples with the resource key and its value,
    #                                                     subtree, or an exception.
    #     """
    #     self._require_connection()
    #     results = []

    #     try:
    #         if not resource_keys:
    #             # If no specific keys are provided, return the full configuration tree
    #             full_tree = self._helper_methods._generate_subtree(self._running_config)
    #             # full_tree = self._generate_subtree(self._running_config)
    #             return [("full_configuration", full_tree)]

    #         for key in resource_keys:
    #             try:
    #                 # Parse the resource key
    #                 resource_path = self._helper_methods.(key)
    #                 self.logger.info(f"1. Retrieving configuration for resource path : {resource_path}")

    #                 # Navigate to the node corresponding to the key
    #                 parent = self._running_config
    #                 for part in resource_path:
    #                     parent = self._find_or_raise_node(part, parent)

    #                 # Check if the node has a value
    #                 value = getattr(parent, "value", None)
    #                 if value:
    #                     # If a value exists, return it
    #                     results.append((key, json.loads(value)))
    #                 else:
    #                     # If no value, return the subtree of this node
    #                     subtree = self._helper_methods._generate_subtree(parent)
    #                     # subtree = self._generate_subtree(parent)
    #                     results.append((key, subtree))

    #             except Exception as e:
    #                 self.logger.exception(f"Failed to retrieve configuration for key: {key}")
    #                 results.append((key, e))

    #     except Exception as e:
    #         self.logger.exception("Failed to retrieve configurations")
    #         results.append(("Error", e))

    #     return results

    # def DeleteConfig(self, resources: List[Tuple[str, Any]]) -> List[Union[bool, Exception]]:     # comment
    #     self._require_connection()
    #     results = []

    #     for key in resources:
    #         try:
    #             # Parse resource key into parts, handling brackets correctly
    #             resource_path = self._helper_methods.(key)

    #             parent = self._running_config
    #             for part in resource_path:
    #                 parent = self._find_or_raise_node(part, parent)

    #             # Delete the final node
    #             node_to_delete = parent
    #             parent = node_to_delete.parent
    #             parent.children = tuple(child for child in parent.children if child != node_to_delete)
    #             self.logger.info(f"Deleted configuration for key: {key}")

    #             # Handle endpoints structure
    #             if "interface" in key and "settings" in key:
    #                 interface_name = key.split('[')[-1].split(']')[0]
    #                 endpoints_parent = self._find_or_raise_node("_connect", self._running_config)
    #                 endpoints_node = self._find_or_raise_node("endpoints", endpoints_parent)
    #                 endpoint_to_delete = next((child for child in endpoints_node.children if child.name == interface_name), None)
    #                 if endpoint_to_delete:
    #                     endpoints_node.children = tuple(child for child in endpoints_node.children if child != endpoint_to_delete)
    #                     self.logger.info(f"Removed endpoint entry for interface '{interface_name}'")

    #             # Check if parent has no more children and is not the root
    #             while parent and parent.name != "root" and not parent.children:
    #                 node_to_delete = parent
    #                 parent = node_to_delete.parent
    #                 parent.children = tuple(child for child in parent.children if child != node_to_delete)
    #                 self.logger.info(f"Deleted empty parent node: {node_to_delete.name}")

    #             results.append(True)
    #         except Exception as e:
    #             self.logger.exception(f"Failed to delete configuration for key: {key}")
    #             results.append(e)

    #     return results