diff --git a/my_deploy.sh b/my_deploy.sh index 8d2e733d462ae743c7187eb9b6a58d7da14033a7..4aa4e43b9b8b53cb86c4843bbd70672fbba7f543 100755 --- a/my_deploy.sh +++ b/my_deploy.sh @@ -21,12 +21,13 @@ export TFS_REGISTRY_IMAGES="http://localhost:32000/tfs/" # Set the list of components, separated by spaces, you want to build images for, and deploy. export TFS_COMPONENTS="context device pathcomp service slice nbi webui" +# export TFS_COMPONENTS="context device" # Uncomment to activate Monitoring (old) #export TFS_COMPONENTS="${TFS_COMPONENTS} monitoring" # Uncomment to activate Monitoring Framework (new) -#export TFS_COMPONENTS="${TFS_COMPONENTS} kpi_manager kpi_value_writer kpi_value_api telemetry analytics automation" +export TFS_COMPONENTS="${TFS_COMPONENTS} kpi_manager kpi_value_writer kpi_value_api telemetry analytics automation" # Uncomment to activate QoS Profiles #export TFS_COMPONENTS="${TFS_COMPONENTS} qos_profile" diff --git a/scripts/run_tests_locally-telemetry-emulated.sh b/scripts/run_tests_locally-telemetry-emulated.sh new file mode 100755 index 0000000000000000000000000000000000000000..06d1ffd3751e1cbf7847ed5c11b877d49c77b2e9 --- /dev/null +++ b/scripts/run_tests_locally-telemetry-emulated.sh @@ -0,0 +1,29 @@ +#!/bin/bash +# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +PROJECTDIR=`pwd` + +cd $PROJECTDIR/src +# RCFILE=$PROJECTDIR/coverage/.coveragerc + +# export KFK_SERVER_ADDRESS='127.0.0.1:9092' +# CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace crdb -o jsonpath='{.spec.clusterIP}') +# export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_telemetry?sslmode=require" +RCFILE=$PROJECTDIR/coverage/.coveragerc + + +python3 -m pytest --log-level=debug --log-cli-level=info --verbose \ + telemetry/backend/tests/test_emulated.py diff --git a/src/telemetry/backend/drivers/__init__.py b/src/telemetry/backend/drivers/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..bbfc943b68af13a11e562abbc8680ade71db8f02 --- /dev/null +++ b/src/telemetry/backend/drivers/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/src/telemetry/backend/drivers/core/_Driver.py b/src/telemetry/backend/drivers/core/_Driver.py new file mode 100644 index 0000000000000000000000000000000000000000..9612952fe4c1da3beb2534f26da68f630f2acacb --- /dev/null +++ b/src/telemetry/backend/drivers/core/_Driver.py @@ -0,0 +1,236 @@ +# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import threading +from typing import Any, Iterator, List, Optional, Tuple, Union + +# Special resource names to request to the driver to retrieve the specified +# configuration/structural resources. +# These resource names should be used with GetConfig() method. +RESOURCE_ENDPOINTS = '__endpoints__' +RESOURCE_INTERFACES = '__interfaces__' +RESOURCE_NETWORK_INSTANCES = '__network_instances__' +RESOURCE_ROUTING_POLICIES = '__routing_policies__' +RESOURCE_SERVICES = '__services__' +RESOURCE_ACL = '__acl__' +RESOURCE_INVENTORY = '__inventory__' + + +class _Driver: + def __init__(self, name : str, address: str, port: int, **settings) -> None: + """ Initialize Driver. + Parameters: + address : str + The address of the device + port : int + The port of the device + **settings + Extra settings required by the driver. + """ + self._name = name + self._address = address + self._port = port + self._settings = settings + + @property + def name(self): return self._name + + @property + def address(self): return self._address + + @property + def port(self): return self._port + + @property + def settings(self): return self._settings + + def Connect(self) -> bool: + """ Connect to the Device. + Returns: + succeeded : bool + Boolean variable indicating if connection succeeded. + """ + raise NotImplementedError() + + def Disconnect(self) -> bool: + """ Disconnect from the Device. + Returns: + succeeded : bool + Boolean variable indicating if disconnection succeeded. + """ + raise NotImplementedError() + + def GetInitialConfig(self) -> List[Tuple[str, Any]]: + """ Retrieve initial configuration of entire device. + Returns: + values : List[Tuple[str, Any]] + List of tuples (resource key, resource value) for + resource keys. + """ + raise NotImplementedError() + + def GetConfig(self, resource_keys: List[str] = []) -> \ + List[Tuple[str, Union[Any, None, Exception]]]: + """ Retrieve running configuration of entire device or + selected resource keys. + Parameters: + resource_keys : List[str] + List of keys pointing to the resources to be retrieved. + Returns: + values : List[Tuple[str, Union[Any, None, Exception]]] + List of tuples (resource key, resource value) for + resource keys requested. If a resource is found, + the appropriate value type must be retrieved. + If a resource is not found, None must be retrieved as + value for that resource. In case of Exception, + the Exception must be retrieved as value. + """ + raise NotImplementedError() + + def SetConfig(self, resources: List[Tuple[str, Any]]) -> \ + List[Union[bool, Exception]]: + """ Create/Update configuration for a list of resources. + Parameters: + resources : List[Tuple[str, Any]] + List of tuples, each containing a resource_key pointing the + resource to be modified, and a resource_value containing + the new value to be set. + Returns: + results : List[Union[bool, Exception]] + List of results for resource key changes requested. + Return values must be in the same order as the + resource keys requested. If a resource is properly set, + True must be retrieved; otherwise, the Exception that is + raised during the processing must be retrieved. + """ + raise NotImplementedError() + + def DeleteConfig(self, resources: List[Tuple[str, Any]]) -> \ + List[Union[bool, Exception]]: + """ Delete configuration for a list of resources. + Parameters: + resources : List[Tuple[str, Any]] + List of tuples, each containing a resource_key pointing the + resource to be modified, and a resource_value containing + possible additionally required values to locate + the value to be removed. + Returns: + results : List[Union[bool, Exception]] + List of results for resource key deletions requested. + Return values must be in the same order as the resource keys + requested. If a resource is properly deleted, True must be + retrieved; otherwise, the Exception that is raised during + the processing must be retrieved. + """ + raise NotImplementedError() + + def SubscribeState(self, subscriptions: List[Tuple[str, float, float]]) -> \ + List[Union[bool, Exception]]: + """ Subscribe to state information of entire device or + selected resources. Subscriptions are incremental. + Driver should keep track of requested resources. + Parameters: + subscriptions : List[Tuple[str, float, float]] + List of tuples, each containing a resource_key pointing the + resource to be subscribed, a sampling_duration, and a + sampling_interval (both in seconds with float + representation) defining, respectively, for how long + monitoring should last, and the desired monitoring interval + for the resource specified. + Returns: + results : List[Union[bool, Exception]] + List of results for resource key subscriptions requested. + Return values must be in the same order as the resource keys + requested. If a resource is properly subscribed, + True must be retrieved; otherwise, the Exception that is + raised during the processing must be retrieved. + """ + raise NotImplementedError() + + def UnsubscribeState(self, subscriptions: List[Tuple[str, float, float]]) \ + -> List[Union[bool, Exception]]: + """ Unsubscribe from state information of entire device + or selected resources. Subscriptions are incremental. + Driver should keep track of requested resources. + Parameters: + subscriptions : List[str] + List of tuples, each containing a resource_key pointing the + resource to be subscribed, a sampling_duration, and a + sampling_interval (both in seconds with float + representation) defining, respectively, for how long + monitoring should last, and the desired monitoring interval + for the resource specified. + Returns: + results : List[Union[bool, Exception]] + List of results for resource key un-subscriptions requested. + Return values must be in the same order as the resource keys + requested. If a resource is properly unsubscribed, + True must be retrieved; otherwise, the Exception that is + raised during the processing must be retrieved. + """ + raise NotImplementedError() + + def GetState( + self, blocking=False, terminate : Optional[threading.Event] = None + ) -> Iterator[Tuple[float, str, Any]]: + """ Retrieve last collected values for subscribed resources. + Operates as a generator, so this method should be called once and will + block until values are available. When values are available, + it should yield each of them and block again until new values are + available. When the driver is destroyed, GetState() can return instead + of yield to terminate the loop. + Terminate enables to request interruption of the generation. + Examples: + # keep looping waiting for extra samples (generator loop) + terminate = threading.Event() + i = 0 + for timestamp,resource_key,resource_value in my_driver.GetState(blocking=True, terminate=terminate): + process(timestamp, resource_key, resource_value) + i += 1 + if i == 10: terminate.set() + + # just retrieve accumulated samples + samples = my_driver.GetState(blocking=False, terminate=terminate) + # or (as classical loop) + i = 0 + for timestamp,resource_key,resource_value in my_driver.GetState(blocking=False, terminate=terminate): + process(timestamp, resource_key, resource_value) + i += 1 + if i == 10: terminate.set() + Parameters: + blocking : bool + Select the driver behaviour. In both cases, the driver will + first retrieve the samples accumulated and available in the + internal queue. Then, if blocking, the driver does not + terminate the loop and waits for additional samples to come, + thus behaving as a generator. If non-blocking, the driver + terminates the loop and returns. Non-blocking behaviour can + be used for periodically polling the driver, while blocking + can be used when a separate thread is in charge of + collecting the samples produced by the driver. + terminate : threading.Event + Signals the interruption of the GetState method as soon as + possible. + Returns: + results : Iterator[Tuple[float, str, Any]] + Sequences of state sample. Each State sample contains a + float Unix-like timestamps of the samples in seconds with up + to microsecond resolution, the resource_key of the sample, + and its resource_value. + Only resources with an active subscription must be + retrieved. Interval and duration of the sampling process are + specified when creating the subscription using method + SubscribeState(). Order of values yielded is arbitrary. + """ + raise NotImplementedError() diff --git a/src/telemetry/backend/drivers/core/__init__.py b/src/telemetry/backend/drivers/core/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..bbfc943b68af13a11e562abbc8680ade71db8f02 --- /dev/null +++ b/src/telemetry/backend/drivers/core/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/src/telemetry/backend/drivers/emulated/EmulatedDriver.py b/src/telemetry/backend/drivers/emulated/EmulatedDriver.py new file mode 100644 index 0000000000000000000000000000000000000000..650867dbaa45be2b8279cde89b9e0eabe0d4f370 --- /dev/null +++ b/src/telemetry/backend/drivers/emulated/EmulatedDriver.py @@ -0,0 +1,549 @@ +from telemetry.backend.drivers.core._Driver import _Driver +from anytree import Node, Resolver +from apscheduler.events import EVENT_JOB_ADDED, EVENT_JOB_REMOVED +from apscheduler.schedulers.background import BackgroundScheduler +from apscheduler.jobstores.memory import MemoryJobStore +from apscheduler.executors.pool import ThreadPoolExecutor +from datetime import datetime, timedelta +from typing import Any, Iterator, List, Tuple, Union, Optional +from .SyntheticMetricsGenerator import SyntheticMetricsGenerator +import pytz +import queue +import logging +import uuid +import json + +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') + +class EmulatedDriver(_Driver): + """ + EmulatedDriver is a class that simulates a network driver for testing purposes. + It provides functionalities to manage configurations, state subscriptions, and synthetic data generation. + """ + def __init__(self, address: str, port: int, **settings): + super().__init__('emulated_driver', address, port, **settings) + self._initial_config = Node('root') # Tree structure for initial config + self._running_config = Node('root') # Tree structure for running config + self._subscriptions = Node('subscriptions') # Tree for state subscriptions + self._resolver = Resolver() # For path resolution in tree structures + self._out_samples = queue.Queue() # Queue to hold synthetic state samples + self._synthetic_data = SyntheticMetricsGenerator(metric_queue=self._out_samples) # Placeholder for synthetic data generator + self._scheduler = BackgroundScheduler(daemon=True) + self._scheduler.configure( + jobstores = {'default': MemoryJobStore()}, + executors = {'default': ThreadPoolExecutor(max_workers=1)}, + timezone = pytz.utc + ) + self._scheduler.add_listener(self._listener_job_added_to_subscription_tree, EVENT_JOB_ADDED) + self._scheduler.add_listener(self._listener_job_removed_from_subscription_tree, EVENT_JOB_REMOVED) + + self.logger = logging.getLogger(__name__) + self.connected = False # To track connection state + self.logger.info("EmulatedDriver initialized") + + def Connect(self) -> bool: + self.logger.info(f"Connecting to {self.address}:{self.port}") + self.connected = True + self._scheduler.start() + self.logger.info(f"Successfully connected to {self.address}:{self.port}") + return True + + def Disconnect(self) -> bool: + self.logger.info(f"Disconnecting from {self.address}:{self.port}") + if not self.connected: + self.logger.warning("Driver is not connected. Nothing to disconnect.") + return False + self._scheduler.shutdown() + self.connected = False + self.logger.info(f"Successfully disconnected from {self.address}:{self.port}") + return True + + def _require_connection(self): + if not self.connected: + raise RuntimeError("Driver is not connected. Please connect before performing operations.") + +# ------------- GetConfig, SetConfig, DeleteConfig, with helper methods (START)----------------- + + def GetInitialConfig(self) -> List[Tuple[str, Any]]: + self._require_connection() + results = [] + for node in self._initial_config.descendants: + value = getattr(node, "value", None) + results.append((node.name, json.loads(value) if value else None)) + self.logger.info("Retrieved initial configurations") + return results + + def GetConfig(self, resource_keys: List[str] = []) -> List[Tuple[str, Union[Any, dict, Exception]]]: + """ + Retrieves the configuration for the specified resource keys. + If no keys are provided, returns the full configuration tree. + + Args: + resource_keys (List[str]): A list of keys specifying the configuration to retrieve. + + Returns: + List[Tuple[str, Union[Any, dict, Exception]]]: A list of tuples with the resource key and its value, + subtree, or an exception. + """ + self._require_connection() + results = [] + + try: + if not resource_keys: + # If no specific keys are provided, return the full configuration tree + full_tree = self._generate_subtree(self._running_config) + return [("full_configuration", full_tree)] + + for key in resource_keys: + try: + # Parse the resource key + resource_path = self._parse_resource_key(key) + self.logger.info(f"1. Retrieving configuration for resource path : {resource_path}") + + # Navigate to the node corresponding to the key + parent = self._running_config + for part in resource_path: + parent = self._find_or_raise_node(part, parent) + + # Check if the node has a value + value = getattr(parent, "value", None) + if value: + # If a value exists, return it + results.append((key, json.loads(value))) + else: + # If no value, return the subtree of this node + subtree = self._generate_subtree(parent) + results.append((key, subtree)) + + except Exception as e: + self.logger.exception(f"Failed to retrieve configuration for key: {key}") + results.append((key, e)) + + except Exception as e: + self.logger.exception("Failed to retrieve configurations") + results.append(("Error", e)) + + return results + + def SetConfig(self, config: dict) -> List[Union[bool, Exception]]: + self._require_connection() + results = [] + + if not isinstance(config, dict): + self.logger.error("Invalid configuration format: config must be a dictionary.") + raise ValueError("Invalid configuration format. Must be a dictionary.") + if 'config_rules' not in config or not isinstance(config['config_rules'], list): + self.logger.error("Invalid configuration format: 'config_rules' key missing or not a list.") + raise ValueError("Invalid configuration format. Must contain a 'config_rules' key with a list of rules.") + + for rule in config['config_rules']: + try: + if 'action' not in rule or 'custom' not in rule: + raise ValueError(f"Invalid rule format: {rule}") + + action = rule['action'] + custom = rule['custom'] + resource_key = custom.get('resource_key') + resource_value = custom.get('resource_value') + + if not resource_key: + raise ValueError(f"Resource key is missing in rule: {rule}") + + if resource_value is None: + raise ValueError(f"Resource value is None for key: {resource_key}") + if not resource_key: + raise ValueError(f"Resource key is missing in rule: {rule}") + + if action == 1: # Set action + resource_path = self._parse_resource_key(resource_key) + # self.logger.info(f"1. Setting configuration for resource key {resource_key} and resource_path: {resource_path}") + parent = self._running_config + + for part in resource_path[:-1]: + if '[' in part and ']' in part: + base, index = part.split('[', 1) + index = index.rstrip(']') + parent = self._find_or_create_node(index, self._find_or_create_node(base, parent)) + # self.logger.info(f"2a. Creating node: {base}, {index}, {parent}") + elif resource_path[-1] != 'settings': + # self.logger.info(f"2b. Creating node: {part}") + parent = self._find_or_create_node(part, parent) + + final_part = resource_path[-1] + if final_part in ['address', 'port']: + self._create_or_update_node(final_part, parent, resource_value) + self.logger.info(f"Configured: {resource_key} = {resource_value}") + + if resource_key.startswith("_connect/settings"): + parent = self._find_or_create_node("_connect", self._running_config) + settings_node = self._find_or_create_node("settings", parent) + settings_node.value = None # Ensure settings node has None value + endpoints_node = self._find_or_create_node("endpoints", settings_node) + + for endpoint in resource_value.get("endpoints", []): + uuid = endpoint.get("uuid") + uuid = uuid.replace('/', '_') if uuid else None + if uuid: + # self.logger.info(f"3. Creating endpoint: {uuid}, {endpoint}, {endpoints_node}") + self._create_or_update_node(uuid, endpoints_node, endpoint) + self.logger.info(f"Configured endpoint: {uuid} : {endpoint}") + + elif resource_key.startswith("/interface"): + interface_parent = self._find_or_create_node("interface", self._running_config) + name = resource_value.get("name") + name = name.replace('/', '_') if name else None + if name: + self._create_or_update_node(name, interface_parent, resource_value) + self.logger.info(f"Configured interface: {name} : {resource_value}") + # self.logger.info(f"4. Configured interface: {name}") + + results.append(True) + else: + raise ValueError(f"Unsupported action '{action}' in rule: {rule}") + + if resource_value is None: + raise ValueError(f"Resource value is None for key: {resource_key}") + + except Exception as e: + self.logger.exception(f"Failed to apply rule: {rule}") + results.append(e) + + return results + + def _generate_subtree(self, node: Node) -> dict: + """ + Generates a subtree of the configuration tree starting from the specified node. + + Args: + node (Node): The node from which to generate the subtree. + + Returns: + dict: The subtree as a dictionary. + """ + subtree = {} + for child in node.children: + if child.children: + subtree[child.name] = self._generate_subtree(child) + else: + value = getattr(child, "value", None) + subtree[child.name] = json.loads(value) if value else None + return subtree + + def DeleteConfig(self, resource_keys: List[str]) -> List[Union[bool, Exception]]: + self._require_connection() + results = [] + + for key in resource_keys: + try: + # Parse resource key into parts, handling brackets correctly + resource_path = self._parse_resource_key(key) + + parent = self._running_config + for part in resource_path: + parent = self._find_or_raise_node(part, parent) + + # Delete the final node + node_to_delete = parent + parent = node_to_delete.parent + parent.children = tuple(child for child in parent.children if child != node_to_delete) + self.logger.info(f"Deleted configuration for key: {key}") + + # Handle endpoints structure + if "interface" in key and "settings" in key: + interface_name = key.split('[')[-1].split(']')[0] + endpoints_parent = self._find_or_raise_node("_connect", self._running_config) + endpoints_node = self._find_or_raise_node("endpoints", endpoints_parent) + endpoint_to_delete = next((child for child in endpoints_node.children if child.name == interface_name), None) + if endpoint_to_delete: + endpoints_node.children = tuple(child for child in endpoints_node.children if child != endpoint_to_delete) + self.logger.info(f"Removed endpoint entry for interface '{interface_name}'") + + # Check if parent has no more children and is not the root + while parent and parent.name != "root" and not parent.children: + node_to_delete = parent + parent = node_to_delete.parent + parent.children = tuple(child for child in parent.children if child != node_to_delete) + self.logger.info(f"Deleted empty parent node: {node_to_delete.name}") + + results.append(True) + except Exception as e: + self.logger.exception(f"Failed to delete configuration for key: {key}") + results.append(e) + + return results + + def _parse_resource_key(self, resource_key: str) -> List[str]: + """ + Parses the resource key into parts, correctly handling brackets. + + Args: + resource_key (str): The resource key to parse. + + Returns: + List[str]: A list of parts from the resource key. + """ + resource_path = [] + current_part = "" + in_brackets = False + + if not resource_key.startswith('/interface'): + for char in resource_key.strip('/'): + if char == '[': + in_brackets = True + current_part += char + elif char == ']': + in_brackets = False + current_part += char + elif char == '/' and not in_brackets: + resource_path.append(current_part) + current_part = "" + else: + current_part += char + if current_part: + resource_path.append(current_part) + return resource_path + else: + resource_path = resource_key.strip('/').split('/', 1) + if resource_path[1] == 'settings': + return resource_path + else: + resource_path = [resource_key.strip('/').split('[')[0].strip('/'), resource_key.strip('/').split('[')[1].split(']')[0].replace('/', '_')] + return resource_path + + def _find_or_raise_node(self, name: str, parent: Node) -> Node: + """ + Finds a node with the given name under the specified parent or raises an exception if not found. + + Args: + name (str): The name of the node to find. + parent (Node): The parent node. + + Returns: + Node: The found node. + + Raises: + ValueError: If the node is not found. + """ + node = next((child for child in parent.children if child.name == name), None) + if not node: + raise ValueError(f"Node '{name}' not found under parent '{parent.name}'.") + return node + + def _find_or_create_node(self, name: str, parent: Node) -> Node: + """ + Finds or creates a node with the given name under the specified parent. + + Args: + name (str): The name of the node to find or create. + parent (Node): The parent node. + + Returns: + Node: The found or created node. + """ + node = next((child for child in parent.children if child.name == name), None) + if not node: + node = Node(name, parent=parent) + return node + + def _create_or_update_node(self, name: str, parent: Node, value: Any): + """ + Creates or updates a node with the given name and value under the specified parent. + + Args: + name (str): The name of the node. + parent (Node): The parent node. + value (Any): The value to set on the node. + """ + node = next((child for child in parent.children if child.name == name), None) + if node: + node.value = json.dumps(value) + else: + Node(name, parent=parent, value=json.dumps(value)) + + def validate_resource_key(self, key: str) -> str: + """ + Splits the input string into two parts: + - The first part is '_connect/settings/endpoints/'. + - The second part is the remaining string after the first part, with '/' replaced by '_'. + + Args: + key (str): The input string to process. + + Returns: + str: A single string with the processed result. + """ + prefix = '_connect/settings/endpoints/' + if not key.startswith(prefix): + raise ValueError(f"The input path '{key}' does not start with the expected prefix: {prefix}") + second_part = key[len(prefix):] + second_part_processed = second_part.replace('/', '_') + validated_key = prefix + second_part_processed + return validated_key + +# ------------- GetConfig, SetConfig, DeleteConfig, with helper methods (END)----------------- + + def SubscribeState(self, subscriptions: List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]: + self._require_connection() + results = [] + for resource_key, duration, interval in subscriptions: + resource_key = self.validate_resource_key(resource_key) # Validate the endpoint name + self.logger.info(f"1. Subscribing to {resource_key} with duration {duration}s and interval {interval}s") + try: + self._resolver.get(self._running_config, resource_key) # Verify if the resource key exists in the running configuration + self.logger.info(f"Resource key {resource_key} exists in the configuration.") + resource_value = json.loads(self._resolver.get(self._running_config, resource_key).value) + if resource_value is not None: + sample_type_ids = resource_value['sample_types'] + self.logger.info(f"Sample type IDs for {resource_key}: {sample_type_ids}") + else: + self.logger.warning(f"No sample types found for {resource_key}. Skipping subscription.") + continue + # Add the job to the scheduler + job_id = f"{resource_key}-{uuid.uuid4()}" + self._scheduler.add_job( + self._generate_sample, + 'interval', + seconds=interval, + args=[resource_key, sample_type_ids], + id=job_id, + replace_existing=True, + end_date=datetime.now(pytz.utc) + timedelta(seconds=duration) + ) + self.logger.info(f"Job added to scheduler for resource key {resource_key} with duration {duration}s and interval {interval}s") + results.append(True) + except Exception as e: + self.logger.error(f"Failed to verify resource key or add job: {e}") + results.append(e) + return results + + def UnsubscribeState(self, subscriptions: List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]: + self._require_connection() + results = [] + for resource_key, _, _ in subscriptions: + resource_key = self.validate_resource_key(resource_key) + try: + # Check if job exists + job_ids = [job.id for job in self._scheduler.get_jobs() if resource_key in job.id] + if not job_ids: + self.logger.warning(f"No active jobs found for {resource_key}. It might have already terminated.") + results.append(False) + continue + # Remove jobs + for job_id in job_ids: + self._scheduler.remove_job(job_id) + + self.logger.info(f"Unsubscribed from {resource_key} with job IDs: {job_ids}") + results.append(True) + except Exception as e: + self.logger.exception(f"Failed to unsubscribe from {resource_key}") + results.append(e) + return results + + def GetState(self, blocking: bool = False, terminate: Optional[queue.Queue] = None) -> Iterator[Tuple[str, Any]]: + self._require_connection() + start_time = datetime.now(pytz.utc) + duration = 10 # Duration of the subscription in seconds (as an example) + + while True: + try: + if terminate and not terminate.empty(): + self.logger.info("Termination signal received, stopping GetState") + break + + elapsed_time = (datetime.now(pytz.utc) - start_time).total_seconds() + if elapsed_time >= duration: + self.logger.info("Duration expired, stopping GetState") + break + + sample = self._out_samples.get(block=blocking, timeout=1 if blocking else 0.1) + self.logger.info(f"Retrieved state sample: {sample}") + yield sample + except queue.Empty: + if not blocking: + self.logger.info("No more samples in queue, exiting GetState") + return None + + def _generate_sample(self, resource_key: str, sample_type_ids : List[int]): + self._require_connection() + # Simulate generating a sample for the resource key + self.logger.debug(f"Executing _generate_sample for resource: {resource_key}") + sample = self._synthetic_data.generate_synthetic_data_point(resource_key, sample_type_ids) + self._out_samples.put(sample) + +# ------------- Event Listeners (START)----------------- + + def _listener_job_removed_from_subscription_tree(self, event): + if event.job_id: + # Extract the resource key from the job ID + resource_key = event.job_id.split('-')[0] + resource_key = self.validate_resource_key(resource_key) + + # Remove the subscription from the tree + try: + subscription_path = resource_key.split('/') + parent = self._subscriptions + for part in subscription_path: + parent = next((child for child in parent.children if child.name == part), None) + if not parent: + raise ValueError(f"Subscription path '{resource_key}' not found in tree.") + if parent: + parent.parent.children = tuple(child for child in parent.parent.children if child != parent) + self.logger.info(f"Automatically removed subscription from subscription_tree for {resource_key} after job termination by listener.") + except Exception as e: + self.logger.warning(f"Failed to remove subscription for {resource_key}: {e}") + + def _listener_job_added_to_subscription_tree(self, event): + try: + job_id = event.job_id + if job_id: + resource_key = job_id.split('-')[0] # Extract resource key from job ID + resource_key = self.validate_resource_key(resource_key) + subscription_path = resource_key.split('/') + parent = self._subscriptions + for part in subscription_path: + node = next((child for child in parent.children if child.name == part), None) + if not node: + node = Node(part, parent=parent) + parent = node + parent.value = { + "job_id": job_id + } + self.logger.info(f"Automatically added subscription for {resource_key} to the subscription_tree by listener.") + except Exception as e: + self.logger.exception("Failed to add subscription to the tree") + +# ------------- Event Listeners (END)----------------- + + def log_active_jobs(self): + """ + Logs the IDs of all active jobs. + This method retrieves the list of active jobs from the scheduler and logs their IDs using the logger. + """ + self._require_connection() + jobs = self._scheduler.get_jobs() + self.logger.info(f"Active jobs: {[job.id for job in jobs]}") + + def print_config_tree(self): + """ + Reads the configuration using GetConfig and prints it as a hierarchical tree structure. + For debugging purposes. + """ + self._require_connection() + + def print_tree(node, indent=""): + """ + Recursively prints the configuration tree. + + Args: + node (Node): The current node to print. + indent (str): The current indentation level. + """ + if node.name != "root": # Skip the root node's name + value = getattr(node, "value", None) + print(f"{indent}- {node.name}: {json.loads(value) if value else ''}") + + for child in node.children: + print_tree(child, indent + " ") + + print("Configuration Tree:") + print_tree(self._running_config) diff --git a/src/telemetry/backend/drivers/emulated/SyntheticMetricsGenerator.py b/src/telemetry/backend/drivers/emulated/SyntheticMetricsGenerator.py new file mode 100644 index 0000000000000000000000000000000000000000..d6606b2c123f89118fa74de80fb411d5840efa9e --- /dev/null +++ b/src/telemetry/backend/drivers/emulated/SyntheticMetricsGenerator.py @@ -0,0 +1,116 @@ +from types import NoneType +import numpy as np +import random +import logging +import queue +import time + +LOGGER = logging.getLogger(__name__) + +class SyntheticMetricsGenerator(): + """ + This collector class generates synthetic network metrics based on the current network state. + The metrics include packet_in, packet_out, bytes_in, bytes_out, packet_loss (percentage), packet_drop_count, byte_drop_count, and latency. + The network state can be 'good', 'moderate', or 'poor', and it affects the generated metrics accordingly. + """ + def __init__(self, metric_queue=None, network_state="good"): + LOGGER.info("Initiaitng Emulator") + super().__init__() + self.metric_queue = metric_queue if metric_queue is not None else queue.Queue() + self.network_state = network_state + self.running = True + self.set_initial_parameter_values() # update this method to set the initial values for the parameters + + def set_initial_parameter_values(self): + self.bytes_per_pkt = random.uniform(65, 150) + self.states = ["good", "moderate", "poor"] + self.state_probabilities = { + "good" : [0.9, 0.1, 0.0], + "moderate": [0.2, 0.7, 0.1], + "poor" : [0.0, 0.3, 0.7] + } + if self.network_state == "good": + self.packet_in = random.uniform(700, 900) + elif self.network_state == "moderate": + self.packet_in = random.uniform(300, 700) + else: + self.packet_in = random.uniform(100, 300) + + def generate_synthetic_data_point(self, resource_key, sample_type_ids): + """ + Generates a synthetic data point based on the current network state. + + Parameters: + resource_key (str): The key associated with the resource for which the data point is generated. + + Returns: + tuple: A tuple containing the timestamp, resource key, and a dictionary of generated metrics. + """ + if self.network_state == "good": + packet_loss = random.uniform(0.01, 0.1) + random_noise = random.uniform(1,10) + latency = random.uniform(5, 25) + elif self.network_state == "moderate": + packet_loss = random.uniform(0.1, 1) + random_noise = random.uniform(10, 40) + latency = random.uniform(25, 100) + elif self.network_state == "poor": + packet_loss = random.uniform(1, 3) + random_noise = random.uniform(40, 100) + latency = random.uniform(100, 300) + else: + raise ValueError("Invalid network state. Must be 'good', 'moderate', or 'poor'.") + + period = 60 * 60 * random.uniform(10, 100) + amplitude = random.uniform(50, 100) + sin_wave = amplitude * np.sin(2 * np.pi * 100 / period) + self.packet_in + packet_in = sin_wave + ((sin_wave/100) * random_noise) + packet_out = packet_in - ((packet_in / 100) * packet_loss) + bytes_in = packet_in * self.bytes_per_pkt + bytes_out = packet_out * self.bytes_per_pkt + packet_drop_count = packet_in * (packet_loss / 100) + byte_drop_count = packet_drop_count * self.bytes_per_pkt + + state_prob = self.state_probabilities[self.network_state] + self.network_state = random.choices(self.states, state_prob)[0] + print (self.network_state) + + generated_samples = { + "packet_in" : int(packet_in), "packet_out" : int(packet_out), "bytes_in" : float(bytes_in), + "bytes_out" : float(bytes_out), "packet_loss": float(packet_loss), "packet_drop_count" : int(packet_drop_count), + "latency" : float(latency), "byte_drop_count": float(byte_drop_count) + } + requested_metrics = self.metric_id_mapper(sample_type_ids) + generated_samples = {metric: generated_samples[metric] for metric in requested_metrics} + + return (time.time(), resource_key, generated_samples) + + def metric_id_mapper(self, sample_type_ids): + """ + Maps the sample type IDs to the corresponding metric names. + + Parameters: + sample_type_ids (list): A list of sample type IDs. + + Returns: + list: A list of metric names. + """ + metric_names = [] + for sample_type_id in sample_type_ids: + if sample_type_id == 102: + metric_names.append("packet_in") + elif sample_type_id == 101: + metric_names.append("packet_out") + elif sample_type_id == 103: + metric_names.append("packet_drop_count") + elif sample_type_id == 202: + metric_names.append("bytes_in") + elif sample_type_id == 201: + metric_names.append("bytes_out") + elif sample_type_id == 203: + metric_names.append("byte_drop_count") + elif sample_type_id == 701: + metric_names.append("latency") + else: + raise ValueError(f"Invalid sample type ID: {sample_type_id}") + return metric_names diff --git a/src/telemetry/backend/drivers/emulated/__init__.py b/src/telemetry/backend/drivers/emulated/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..bbfc943b68af13a11e562abbc8680ade71db8f02 --- /dev/null +++ b/src/telemetry/backend/drivers/emulated/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/src/telemetry/backend/tests/messages_emulated.py b/src/telemetry/backend/tests/messages_emulated.py new file mode 100644 index 0000000000000000000000000000000000000000..adf0376e94f3499613aab34abf4b25667d70e4ab --- /dev/null +++ b/src/telemetry/backend/tests/messages_emulated.py @@ -0,0 +1,52 @@ +import logging +# Configure logging to ensure logs appear on the console +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') +logger = logging.getLogger(__name__) + + +def create_test_configuration(): + return { + "config_rules": [ + {"action": 1, "custom": {"resource_key": "_connect/address", "resource_value": "127.0.0.1"}}, + {"action": 1, "custom": {"resource_key": "_connect/port", "resource_value": 8080}}, + {"action": 1, "custom": {"resource_key": "_connect/settings", "resource_value": { + "endpoints": [ + {"uuid": "eth0", "type": "ethernet", "sample_types": [101, 102]}, + {"uuid": "eth1", "type": "ethernet", "sample_types": [201, 202]}, + {"uuid": "13/1/2", "type": "copper", "sample_types": [101, 102, 201, 202]} + ] + }}}, + {"action": 1, "custom": {"resource_key": "/interface[eth0]/settings", "resource_value": { + "name": "eth0", "enabled": True + }}}, + {"action": 1, "custom": {"resource_key": "/interface[eth1]/settings", "resource_value": { + "name": "eth1", "enabled": False + }}}, + {"action": 1, "custom": {"resource_key": "/interface[13/1/2]/settings", "resource_value": { + "name": "13/1/2", "enabled": True + }}} + ] + } + +# This method is used to create a specific configuration to be used in the test case test_get_config in the test_EmulatedDriver.py file +def create_specific_config_keys(): + # config = create_test_configuration() + keys_to_return = ["_connect/settings/endpoints/eth1", "/interface/[13/1/2]/settings", "_connect/address"] + return keys_to_return + # return {rule["custom"]["resource_key"]: rule["custom"]["resource_value"] for rule in config["config_rules"] if rule["custom"]["resource_key"] in keys_to_return} + +# write a method to create a specific configuration to be used in the test case test_delete_config in the test_EmulatedDriver1.py file +def create_config_for_delete(): + keys_to_delete = ["_connect/settings/endpoints/eth0", "/interface/[eth1]", "_connect/port"] + return keys_to_delete + +# write a method to generate subscription for specific endpoints. +def create_test_subscriptions(): + return [("_connect/settings/endpoints/eth1", 10, 2), + ("_connect/settings/endpoints/13/1/2", 15, 3), + ("_connect/settings/endpoints/eth0", 8, 2)] + +def create_unscubscribe_subscriptions(): + return [("_connect/settings/endpoints/eth1", 10, 2), + ("_connect/settings/endpoints/13/1/2", 15, 3), + ("_connect/settings/endpoints/eth0", 8, 2)] \ No newline at end of file diff --git a/src/telemetry/backend/tests/test_emulated.py b/src/telemetry/backend/tests/test_emulated.py new file mode 100644 index 0000000000000000000000000000000000000000..94eb7656134a74aa1f621342104341aa02171cf3 --- /dev/null +++ b/src/telemetry/backend/tests/test_emulated.py @@ -0,0 +1,93 @@ +import logging +import time +import pytest +from telemetry.backend.drivers.emulated.EmulatedDriver import EmulatedDriver +from telemetry.backend.tests.messages_emulated import ( + create_test_configuration, + create_specific_config_keys, + create_config_for_delete, + create_test_subscriptions, +) + +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') +logger = logging.getLogger(__name__) + +@pytest.fixture +def setup_driver(): + """Sets up an EmulatedDriver instance for testing.""" + yield EmulatedDriver(address="127.0.0.1", port=8080) + +@pytest.fixture +def connected_configured_driver(setup_driver): + driver = setup_driver # EmulatedDriver(address="127.0.0.1", port=8080) + driver.Connect() + driver.SetConfig(create_test_configuration()) + yield driver + driver.Disconnect() + +def test_connect(setup_driver): + logger.info(">>> test_connect <<<") + driver = setup_driver + assert driver.Connect() is True + assert driver.connected is True + +def test_disconnect(setup_driver): + logger.info(">>> test_disconnect <<<") + driver = setup_driver + driver.Connect() + assert driver.Disconnect() is True + assert driver.connected is False + +def test_set_config(setup_driver): + logger.info(">>> test_set_config <<<") + driver = setup_driver + driver.Connect() + + config = create_test_configuration() + + results = driver.SetConfig(config) + assert all(result is True for result in results) + +def test_get_config(connected_configured_driver): + logger.info(">>> test_get_config <<<") + resource_keys = create_specific_config_keys() + results = connected_configured_driver.GetConfig(resource_keys) + + for key, value in results: + assert key in create_specific_config_keys() + assert value is not None + +def test_delete_config(connected_configured_driver): + logger.info(">>> test_delete_config <<<") + resource_keys = create_config_for_delete() + + results = connected_configured_driver.DeleteConfig(resource_keys) + assert all(result is True for result in results) + +def test_subscribe_state(connected_configured_driver): + logger.info(">>> test_subscribe_state <<<") + subscriptions = create_test_subscriptions() + + results = connected_configured_driver.SubscribeState(subscriptions) + assert all(result is True for result in results) + +def test_unsubscribe_state(connected_configured_driver): + logger.info(">>> test_unsubscribe_state <<<") + subscriptions = create_test_subscriptions() + + connected_configured_driver.SubscribeState(subscriptions) + results = connected_configured_driver.UnsubscribeState(subscriptions) + assert all(result is True for result in results) + +def test_get_state(connected_configured_driver): + logger.info(">>> test_get_state <<<") + subscriptions = create_test_subscriptions() + + connected_configured_driver.SubscribeState(subscriptions) + logger.info(f"Subscribed to state: {subscriptions}. waiting for 3 seconds ...") + time.sleep(3) + + state_iterator = connected_configured_driver.GetState(blocking=False) + states = list(state_iterator) + + assert len(states) > 0 diff --git a/src/telemetry/requirements.in b/src/telemetry/requirements.in index 503468a662599f0225b293d0ef4c4e4313fa3e0f..e264104e36637b6285a6503995bf048379461ef5 100644 --- a/src/telemetry/requirements.in +++ b/src/telemetry/requirements.in @@ -12,7 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. +anytree==2.8.0 APScheduler==3.10.1 +numpy==2.2.1 psycopg2-binary==2.9.3 python-dateutil==2.8.2 python-json-logger==2.0.2