    # Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #      http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    
    import pytz
    import queue
    import logging
    import uuid
    import json
    
    from anytree import Node, Resolver
    from apscheduler.events import EVENT_JOB_ADDED, EVENT_JOB_REMOVED
    from apscheduler.schedulers.background import BackgroundScheduler
    from apscheduler.jobstores.memory import MemoryJobStore
    from apscheduler.executors.pool import ThreadPoolExecutor
    from datetime import datetime, timedelta
    from typing import Any, Iterator, List, Tuple, Union, Optional
    
    from telemetry.backend.collector_api._Collector import _Collector
    from .EmulatedHelper import EmulatedCollectorHelper
    
    from .SyntheticMetricsGenerator import SyntheticMetricsGenerator
    
    class EmulatedCollector(_Collector):
        """
        EmulatedCollector is a class that simulates a network collector for testing purposes.

        It provides functionalities to manage configurations, state subscriptions, and synthetic data generation.
        """
        def __init__(self, address: str, port: int, **settings):
    
            super().__init__('emulated_collector', address, port, **settings)
    
            self._initial_config = Node('root')                 # Tree structure for initial config
            self._running_config = Node('root')                 # Tree structure for running config
            self._subscriptions  = Node('subscriptions')        # Tree for state subscriptions
            self._resolver       = Resolver()                   # For path resolution in tree structures
            self._out_samples    = queue.Queue()                # Queue to hold synthetic state samples
    
            self._synthetic_data = SyntheticMetricsGenerator(metric_queue=self._out_samples)  # Synthetic data generator feeding the sample queue
            self._scheduler      = BackgroundScheduler(daemon=True)
            self._scheduler.configure(
                jobstores = {'default': MemoryJobStore()},
                executors = {'default': ThreadPoolExecutor(max_workers=1)},
                timezone  = pytz.utc
            )
            self._scheduler.add_listener(self._listener_job_added_to_subscription_tree,     EVENT_JOB_ADDED)
            self._scheduler.add_listener(self._listener_job_removed_from_subscription_tree, EVENT_JOB_REMOVED)
    
            self._helper_methods = EmulatedCollectorHelper()

            self.logger    = logging.getLogger(__name__)
            self.connected = False          # To track connection state

            self.logger.info("EmulatedCollector initialized")
    
    
        def Connect(self) -> bool:
            self.logger.info(f"Connecting to {self.address}:{self.port}")
            self.connected = True
            self._scheduler.start()
            self.logger.info(f"Successfully connected to {self.address}:{self.port}")
            return True
    
        def Disconnect(self) -> bool:
            self.logger.info(f"Disconnecting from {self.address}:{self.port}")
            if not self.connected:
                self.logger.warning("Collector is not connected. Nothing to disconnect.")
                return False
            self._scheduler.shutdown()
            self.connected = False
            self.logger.info(f"Successfully disconnected from {self.address}:{self.port}")
            return True
    
        def _require_connection(self):
            if not self.connected:
                raise RuntimeError("Collector is not connected. Please connect before performing operations.")
    
        def SubscribeState(self, subscriptions: List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]:
            self._require_connection()
            results = []
            for resource_key, duration, interval in subscriptions:
                resource_key = self._helper_methods.validate_resource_key(resource_key)    # Validate the endpoint name
                self.logger.info(f"Subscribing to {resource_key} with duration {duration}s and interval {interval}s")
                try:
                    config_node = self._resolver.get(self._running_config, resource_key)   # Raises if the resource key is missing from the running configuration
                    self.logger.info(f"Resource key {resource_key} exists in the configuration.")
                    resource_value = json.loads(config_node.value)
                    if resource_value is not None:
                        sample_type_ids = resource_value['sample_types']
                        self.logger.info(f"Sample type IDs for {resource_key}: {sample_type_ids}")
                        if len(sample_type_ids) == 0:
                            self.logger.warning(f"No sample types found for {resource_key}. Skipping subscription.")
                            results.append(False)
                            continue
                    else:
                        self.logger.warning(f"No resource value found for {resource_key}. Skipping subscription.")
                        results.append(False)
                        continue
                    # Add the job to the scheduler
                    job_id = f"{resource_key}-{uuid.uuid4()}"
                    self._scheduler.add_job(
                        self._generate_sample,
                        'interval',
                        seconds=interval,
                        args=[resource_key, sample_type_ids],
                        id=job_id,
                        replace_existing=True,
                        end_date=datetime.now(pytz.utc) + timedelta(seconds=duration)
                    )
                    self.logger.info(f"Job added to scheduler for resource key {resource_key} with duration {duration}s and interval {interval}s")
                    results.append(True)
                except Exception as e:
                    self.logger.error(f"Failed to verify resource key or add job: {e}")
                    results.append(e)
            return results
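
        # Usage sketch (illustrative only, not executed): each subscription is a
        # (resource_key, duration_in_seconds, sampling_interval_in_seconds) tuple, and the
        # resource key must already exist in the running configuration (e.g. created via
        # SetConfig) and carry a 'sample_types' list. The resource key below is hypothetical.
        #
        #   results = collector.SubscribeState([
        #       ('/interface[eth0]/settings', 30, 5),   # sample every 5s for 30s
        #   ])
        #   # results -> one entry per subscription: True, False, or the raised Exception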
    
        def UnsubscribeState(self, subscriptions: List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]:
            self._require_connection()
            results = []
            for resource_key, _, _ in subscriptions:
                resource_key = self._helper_methods.validate_resource_key(resource_key)
                try:
                    # Check if job exists
                    job_ids = [job.id for job in self._scheduler.get_jobs() if resource_key in job.id]
                    if not job_ids:
                        self.logger.warning(f"No active jobs found for {resource_key}. It might have already terminated.")
                        results.append(False)
                        continue
                    # Remove jobs
                    for job_id in job_ids:
                        self._scheduler.remove_job(job_id)
                    
                    self.logger.info(f"Unsubscribed from {resource_key} with job IDs: {job_ids}")
                    results.append(True)
                except Exception as e:
                    self.logger.exception(f"Failed to unsubscribe from {resource_key}")
                    results.append(e)
            return results
    
    
        def GetState(self, blocking: bool = False, terminate: Optional[queue.Queue] = None) -> Iterator[Tuple[float, str, Any]]:
    
            self._require_connection()
            start_time = datetime.now(pytz.utc)
            duration = 10  # Duration of the subscription in seconds (as an example)
    
            while True:
                try:
                    if terminate and not terminate.empty():
                        self.logger.info("Termination signal received, stopping GetState")
                        break
    
                    elapsed_time = (datetime.now(pytz.utc) - start_time).total_seconds()
                    if elapsed_time >= duration:
                        self.logger.info("Duration expired, stopping GetState")
                        break
    
                    sample = self._out_samples.get(block=blocking, timeout=1 if blocking else 0.1)
                    self.logger.info(f"Retrieved state sample: {sample}")
                    yield sample
                except queue.Empty:
                    if not blocking:
                        self.logger.info("No more samples in queue, exiting GetState")
                        return None
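
        # Usage sketch (illustrative only, not executed): GetState is a generator, so samples are
        # consumed by iterating over it; per the type hint each item is a (timestamp, resource_key,
        # value) tuple. Passing a Queue as `terminate` lets another thread stop the loop by putting
        # any item on it. Names below are hypothetical.
        #
        #   stop_signal = queue.Queue()
        #   for sample in collector.GetState(blocking=True, terminate=stop_signal):
        #       handle_sample(sample)     # hypothetical consumer
        #   # elsewhere: stop_signal.put(True) to request termination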
    
        def _generate_sample(self, resource_key: str, sample_type_ids : List[int]):
            # Simulate generating a sample for the resource key
            self.logger.debug(f"Executing _generate_sample for resource: {resource_key}")
            sample = self._synthetic_data.generate_synthetic_data_point(resource_key, sample_type_ids)
            self._out_samples.put(sample)
    
    # ------------- Event Listeners (START)-----------------
    
        def _listener_job_removed_from_subscription_tree(self, event):
            if event.job_id:
                # Extract the resource key from the job ID (job IDs are built as "<resource_key>-<uuid4>")
                resource_key = event.job_id.rsplit('-', 5)[0]
                resource_key = self._helper_methods.validate_resource_key(resource_key)

                # Remove the subscription from the tree
                try:
                    subscription_path = resource_key.split('/')
                    parent = self._subscriptions
                    for part in subscription_path:
                        parent = next((child for child in parent.children if child.name == part), None)
                        if not parent:
                            raise ValueError(f"Subscription path '{resource_key}' not found in tree.")
                    parent.parent.children = tuple(child for child in parent.parent.children if child != parent)
                    self.logger.warning(f"Automatically removed subscription from subscription_tree for {resource_key} after job termination by listener, possibly because the subscription duration expired.")
    
                except Exception as e:
                    self.logger.warning(f"Failed to remove subscription for {resource_key}: {e}")
    
        def _listener_job_added_to_subscription_tree(self, event):
            try:
                job_id = event.job_id
                if job_id:
                    resource_key = job_id.rsplit('-', 5)[0]  # Extract resource key from job ID (built as "<resource_key>-<uuid4>")
                    resource_key = self._helper_methods.validate_resource_key(resource_key)

                    subscription_path = resource_key.split('/')
                    parent = self._subscriptions
                    for part in subscription_path:
                        node = next((child for child in parent.children if child.name == part), None)
                        if not node:
                            node = Node(part, parent=parent)
                        parent = node
                    parent.value = {
                        "job_id": job_id
                    }
                    self.logger.info(f"Automatically added subscription for {resource_key} to the subscription_tree by listener.")
            except Exception as e:
                self.logger.exception("Failed to add subscription to the tree")
    
    # ------------- Event Listeners (END)-----------------
    
    
    #-------------------------------------------------------------------------------------
    # ------- The methods below are kept for debugging (test-case) purposes only ----------
    #-------------------------------------------------------------------------------------
    
    
    #  This method can be commented out, but doing so will raise an error in the test-case fixture (@pytest.fixture --> connected_configured_collector()).
    
        def SetConfig(self, resources: dict) -> List[Union[bool, Exception]]:  # For debugging purposes.
    
            self._require_connection()
    
            results = []
    
            # if not isinstance(resources, dict):
            #     self.logger.error("Invalid configuration format: resources must be a dictionary.")
            #     raise ValueError("Invalid configuration format. Must be a dictionary.")
            if 'config_rules' not in resources or not isinstance(resources['config_rules'], list):
                self.logger.error("Invalid configuration format: 'config_rules' key missing or not a list.")
                raise ValueError("Invalid configuration format. Must contain a 'config_rules' key with a list of rules.")
    
            for rule in resources['config_rules']:
                try:
                    if 'action' not in rule or 'custom' not in rule:
                        raise ValueError(f"Invalid rule format: {rule}")
    
                    action = rule['action']
                    custom = rule['custom']
                    resource_key = custom.get('resource_key')
                    resource_value = custom.get('resource_value')
    
                    if not resource_key:
                        raise ValueError(f"Resource key is missing in rule: {rule}")
                    if resource_value is None:
                        raise ValueError(f"Resource value is None for key: {resource_key}")
    
                    if action == 1:  # Set action
                        resource_path = self._helper_methods._parse_resource_key(resource_key)
                        # self.logger.info(f"1. Setting configuration for resource key {resource_key} and resource_path: {resource_path}")
                        parent = self._running_config
    
                        for part in resource_path[:-1]:
                            if '[' in part and ']' in part:
                                base, index = part.split('[', 1)
                                index = index.rstrip(']')
                                parent = self._helper_methods._find_or_create_node(index, self._helper_methods._find_or_create_node(base, parent))
                                # self.logger.info(f"2a. Creating node: {base}, {index}, {parent}")
                            elif resource_path[-1] != 'settings':
                                # self.logger.info(f"2b. Creating node: {part}")
                                parent = self._helper_methods._find_or_create_node(part, parent)
    
                        final_part = resource_path[-1]
                        if final_part in ['address', 'port']: 
                            self._helper_methods._create_or_update_node(final_part, parent, resource_value)
                            self.logger.info(f"Configured: {resource_key} = {resource_value}")
    
                        if resource_key.startswith("_connect/settings"):
                            parent = self._helper_methods._find_or_create_node("_connect", self._running_config)
                            settings_node = self._helper_methods._find_or_create_node("settings", parent)
                            settings_node.value = None  # Ensure settings node has None value
                            endpoints_node = self._helper_methods._find_or_create_node("endpoints", settings_node)
    
                            for endpoint in resource_value.get("endpoints", []):
                                uuid = endpoint.get("uuid")
                                uuid = uuid.replace('/', '_') if uuid else None
                                if uuid:
                                    # self.logger.info(f"3. Creating endpoint: {uuid}, {endpoint}, {endpoints_node}")
                                    self._helper_methods._create_or_update_node(uuid, endpoints_node, endpoint)
                                    self.logger.info(f"Configured endpoint: {uuid} : {endpoint}")
    
                        elif resource_key.startswith("/interface"):
                            interface_parent = self._helper_methods._find_or_create_node("interface", self._running_config)
                            name = resource_value.get("name")
                            name = name.replace('/', '_') if name else None
                            if name:
                                self._helper_methods._create_or_update_node(name, interface_parent, resource_value)
                                self.logger.info(f"Configured interface: {name} : {resource_value}")
                                # self.logger.info(f"4. Configured interface: {name}")
    
                        results.append(True)
                    else:
                        raise ValueError(f"Unsupported action '{action}' in rule: {rule}")
    
                except Exception as e:
                    self.logger.exception(f"Failed to apply rule: {rule}")
                    results.append(e)
    
            return results
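
        # Usage sketch (illustrative only, not executed): SetConfig expects a dict with a
        # 'config_rules' list, where each rule carries an 'action' (1 == set) and a 'custom' dict
        # holding 'resource_key' and 'resource_value'. The concrete keys and values below are
        # hypothetical examples of the two branches handled above (_connect/settings and /interface).
        #
        #   collector.SetConfig({'config_rules': [
        #       {'action': 1, 'custom': {'resource_key': '_connect/settings',
        #                                'resource_value': {'endpoints': [{'uuid': 'eth0'}]}}},
        #       {'action': 1, 'custom': {'resource_key': '/interface[eth0]/settings',
        #                                'resource_value': {'name': 'eth0', 'sample_types': [101, 102]}}},
        #   ]})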
    
    #-----------------------------------
    # ------- EXTRA Methods ------------
    #-----------------------------------
    
        # def log_active_jobs(self):              # For debugging purposes.
        #     """
        #     Logs the IDs of all active jobs.
        #     This method retrieves the list of active jobs from the scheduler and logs their IDs using the logger.
        #     """
        #     self._require_connection()
        #     jobs = self._scheduler.get_jobs()
        #     self.logger.info(f"Active jobs: {[job.id for job in jobs]}")
    
        # def print_config_tree(self):            # For debugging purposes.
        #     """
        #     Reads the configuration using GetConfig and prints it as a hierarchical tree structure.
        #     """
        #     self._require_connection()
    
        #     def print_tree(node, indent=""):
        #         """
        #         Recursively prints the configuration tree.
    
        #         Args:
        #             node (Node): The current node to print.
        #             indent (str): The current indentation level.
        #         """
        #         if node.name != "root":  # Skip the root node's name
        #             value = getattr(node, "value", None)
        #             print(f"{indent}- {node.name}: {json.loads(value) if value else ''}")
                
        #         for child in node.children:
        #             print_tree(child, indent + "  ")
    
        #     print("Configuration Tree:")
        #     print_tree(self._running_config)
    
    
        # def GetInitialConfig(self) -> List[Tuple[str, Any]]:    # comment
        #     self._require_connection()
        #     results = []
        #     for node in self._initial_config.descendants:
        #         value = getattr(node, "value", None)
        #         results.append((node.name, json.loads(value) if value else None))
        #     self.logger.info("Retrieved initial configurations")
        #     return results
    
        # def GetConfig(self, resource_keys: List[str] = []) -> List[Tuple[str, Union[Any, dict, Exception]]]:    # comment
        #     """
        #     Retrieves the configuration for the specified resource keys.
        #     If no keys are provided, returns the full configuration tree.
    
        #     Args:
        #         resource_keys (List[str]): A list of keys specifying the configuration to retrieve.
    
        #     Returns:
        #         List[Tuple[str, Union[Any, dict, Exception]]]: A list of tuples with the resource key and its value, 
        #                                                     subtree, or an exception.
        #     """
        #     self._require_connection()
        #     results = []
    
        #     try:
        #         if not resource_keys:
        #             # If no specific keys are provided, return the full configuration tree
    
        #             full_tree = self._helper_methods._generate_subtree(self._running_config)
        #             # full_tree = self._generate_subtree(self._running_config)
        #             return [("full_configuration", full_tree)]
    
        #         for key in resource_keys:
        #             try:
        #                 # Parse the resource key
        #                 resource_path = self._helper_methods._parse_resource_key(key)
        #                 self.logger.info(f"1. Retrieving configuration for resource path : {resource_path}")
    
        #                 # Navigate to the node corresponding to the key
        #                 parent = self._running_config
        #                 for part in resource_path:
        #                     parent = self._find_or_raise_node(part, parent)
    
        #                 # Check if the node has a value
        #                 value = getattr(parent, "value", None)
        #                 if value:
        #                     # If a value exists, return it
        #                     results.append((key, json.loads(value)))
        #                 else:
        #                     # If no value, return the subtree of this node
        #                     subtree = self._helper_methods._generate_subtree(parent)
        #                     # subtree = self._generate_subtree(parent)
        #                     results.append((key, subtree))
    
        #             except Exception as e:
        #                 self.logger.exception(f"Failed to retrieve configuration for key: {key}")
        #                 results.append((key, e))
    
        #     except Exception as e:
        #         self.logger.exception("Failed to retrieve configurations")
        #         results.append(("Error", e))
    
        #     return results
    
        # def DeleteConfig(self, resources: List[Tuple[str, Any]]) -> List[Union[bool, Exception]]: # comment
        #     self._require_connection()
        #     results = []
    
        #     for key in resources:
        #         try:
        #             # Parse resource key into parts, handling brackets correctly
        #             resource_path = self._helper_methods._parse_resource_key(key)
    
        #             parent = self._running_config
        #             for part in resource_path:
        #                 parent = self._find_or_raise_node(part, parent)
    
        #             # Delete the final node
        #             node_to_delete = parent
        #             parent = node_to_delete.parent
        #             parent.children = tuple(child for child in parent.children if child != node_to_delete)
        #             self.logger.info(f"Deleted configuration for key: {key}")
    
        #             # Handle endpoints structure
        #             if "interface" in key and "settings" in key:
        #                 interface_name = key.split('[')[-1].split(']')[0]
        #                 endpoints_parent = self._find_or_raise_node("_connect", self._running_config)
        #                 endpoints_node = self._find_or_raise_node("endpoints", endpoints_parent)
        #                 endpoint_to_delete = next((child for child in endpoints_node.children if child.name == interface_name), None)
        #                 if endpoint_to_delete:
        #                     endpoints_node.children = tuple(child for child in endpoints_node.children if child != endpoint_to_delete)
        #                     self.logger.info(f"Removed endpoint entry for interface '{interface_name}'")
    
        #             # Check if parent has no more children and is not the root
        #             while parent and parent.name != "root" and not parent.children:
        #                 node_to_delete = parent
        #                 parent = node_to_delete.parent
        #                 parent.children = tuple(child for child in parent.children if child != node_to_delete)
        #                 self.logger.info(f"Deleted empty parent node: {node_to_delete.name}")
    
        #             results.append(True)
        #         except Exception as e:
        #             self.logger.exception(f"Failed to delete configuration for key: {key}")
        #             results.append(e)
    
        #     return results
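
    #-----------------------------------
    # ------- Usage sketch -------------
    #-----------------------------------

    # Minimal end-to-end sketch (illustrative only, not executed by this module). It assumes the
    # collector has been configured via SetConfig so that the subscribed resource key exists and
    # carries 'sample_types'; the address, port, and resource key are hypothetical.
    #
    #   collector = EmulatedCollector('127.0.0.1', 8000)
    #   collector.Connect()
    #   collector.SetConfig({'config_rules': [...]})                       # see the sketch after SetConfig
    #   collector.SubscribeState([('/interface[eth0]/settings', 30, 5)])
    #   for sample in collector.GetState(blocking=True):
    #       print(sample)                                                  # (timestamp, resource_key, value)
    #   collector.UnsubscribeState([('/interface[eth0]/settings', 0, 0)])
    #   collector.Disconnect()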