diff --git a/src/device/service/drivers/qkd/QKDDriver2.py b/src/device/service/drivers/qkd/QKDDriver2.py index 84d9f411ef42b9af6d56a8142394a81e7dfa0104..e4d31cf2dcb4048d5da8d9ce6e99abd245ae5766 100644 --- a/src/device/service/drivers/qkd/QKDDriver2.py +++ b/src/device/service/drivers/qkd/QKDDriver2.py @@ -9,6 +9,8 @@ from common.method_wrappers.Decorator import MetricsPool, metered_subclass_metho from common.type_checkers.Checkers import chk_string, chk_type from device.service.driver_api._Driver import _Driver from .Tools2 import config_getter, create_connectivity_link +from device.service.driver_api._Driver import _Driver +from . import ALL_RESOURCE_KEYS LOGGER = logging.getLogger(__name__) @@ -25,49 +27,17 @@ class QKDDriver(_Driver): self.__terminate = threading.Event() self.__auth = None self.__headers = {} - self.__qkd_root = os.getenv('QKD_API_URL', '{:s}://{:s}:{:d}'.format(settings.get('scheme', 'http'), self.address, int(self.port))) + self.__qkd_root = os.getenv('QKD_API_URL', f"http://{self.address}:{self.port}") # Simplified URL management self.__timeout = int(self.settings.get('timeout', 120)) self.__node_ids = set(self.settings.get('node_ids', [])) self.__initial_data = None - # Authentication settings - self.__username = settings.get('username') - self.__password = settings.get('password') - self.__use_jwt = settings.get('use_jwt', True) # Default to True if JWT is required - self.__token = settings.get('token') - - if self.__token: - self.__headers = {'Authorization': 'Bearer ' + self.__token} - elif self.__username and self.__password: - self.__auth = HTTPBasicAuth(self.__username, self.__password) + # Optionally pass headers for authentication (e.g., JWT) + self.__headers = settings.get('headers', {}) + self.__auth = settings.get('auth', None) LOGGER.info(f"QKDDriver initialized with QKD root URL: {self.__qkd_root}") - def authenticate(self) -> bool: - if self.__use_jwt and not self.__token: - return self.__authenticate_with_jwt() - return True - - def __authenticate_with_jwt(self) -> bool: - login_url = f'{self.__qkd_root}/login' - payload = {'username': self.__username, 'password': self.__password} - - try: - LOGGER.info(f'Attempting to authenticate with JWT at {login_url}') - response = requests.post(login_url, data=payload, timeout=self.__timeout) - response.raise_for_status() - token = response.json().get('access_token') - if not token: - LOGGER.error('Failed to retrieve access token') - return False - self.__token = token # Store the token - self.__headers = {'Authorization': f'Bearer {token}'} - LOGGER.info('JWT authentication successful') - return True - except requests.exceptions.RequestException as e: - LOGGER.exception(f'JWT authentication failed: {e}') - return False - def Connect(self) -> bool: url = self.__qkd_root + '/restconf/data/etsi-qkd-sdn-node:qkd_node' with self.__lock: @@ -77,11 +47,6 @@ class QKDDriver(_Driver): return True try: - if not self.__headers and not self.__auth: - LOGGER.info("No headers or auth found, calling authenticate.") - if not self.authenticate(): - return False - LOGGER.info(f'Attempting to connect to {url} with headers {self.__headers} and timeout {self.__timeout}') response = requests.get(url, timeout=self.__timeout, verify=False, headers=self.__headers, auth=self.__auth) LOGGER.info(f'Received response: {response.status_code}, content: {response.text}') @@ -119,13 +84,12 @@ class QKDDriver(_Driver): results = [] with self.__lock: if not resource_keys: - resource_keys = ['capabilities', 'interfaces', 'links', 'endpoints', 'apps'] + resource_keys = ALL_RESOURCE_KEYS for i, resource_key in enumerate(resource_keys): chk_string(f'resource_key[{i}]', resource_key, allow_empty=False) LOGGER.info(f"Retrieving resource key: {resource_key}") resource_results = config_getter( - self.__qkd_root, resource_key, timeout=self.__timeout, headers=self.__headers, auth=self.__auth, - node_ids=self.__node_ids) + self.__qkd_root, resource_key, timeout=self.__timeout, headers=self.__headers, auth=self.__auth) results.extend(resource_results) LOGGER.info(f"Resource results for {resource_key}: {resource_results}") LOGGER.info(f"Final configuration results: {results}") @@ -133,45 +97,55 @@ class QKDDriver(_Driver): @metered_subclass_method(METRICS_POOL) def SetConfig(self, resources: List[Tuple[str, Any]]) -> List[Union[bool, Exception]]: - LOGGER.info(f"Setting configuration for resources: {resources}") results = [] - if not resources: - LOGGER.warning("No resources provided for SetConfig") + if len(resources) == 0: return results + with self.__lock: for resource_key, resource_value in resources: - LOGGER.info(f'Processing resource_key: {resource_key}, resource_value: {resource_value}') + LOGGER.info('Processing resource_key = {:s}'.format(str(resource_key))) + # Only process '/link' keys if resource_key.startswith('/link'): try: - if not isinstance(resource_value, dict): - raise TypeError(f"Expected dictionary but got {type(resource_value).__name__}") - - link_uuid = resource_value.get('uuid') - node_id_src = resource_value.get('src_qkdn_id') - interface_id_src = resource_value.get('src_interface_id') - node_id_dst = resource_value.get('dst_qkdn_id') - interface_id_dst = resource_value.get('dst_interface_id') + # Ensure resource_value is deserialized + if isinstance(resource_value, str): + resource_value = json.loads(resource_value) + + # Extract values from resource_value dictionary + link_uuid = resource_value['uuid'] + node_id_src = resource_value['src_qkdn_id'] + interface_id_src = resource_value['src_interface_id'] + node_id_dst = resource_value['dst_qkdn_id'] + interface_id_dst = resource_value['dst_interface_id'] virt_prev_hop = resource_value.get('virt_prev_hop') virt_next_hops = resource_value.get('virt_next_hops') virt_bandwidth = resource_value.get('virt_bandwidth') + # Call create_connectivity_link with the extracted values LOGGER.info(f"Creating connectivity link with UUID: {link_uuid}") - create_connectivity_link( + data = create_connectivity_link( self.__qkd_root, link_uuid, node_id_src, interface_id_src, node_id_dst, interface_id_dst, virt_prev_hop, virt_next_hops, virt_bandwidth, - headers=self.__headers, timeout=self.__timeout, auth=self.__auth + timeout=self.__timeout, auth=self.__auth ) + + # Append success result results.append(True) LOGGER.info(f"Connectivity link {link_uuid} created successfully") + except Exception as e: + # Catch and log any unhandled exceptions LOGGER.exception(f'Unhandled error processing resource_key({resource_key})') results.append(e) else: - LOGGER.error(f'Invalid resource key detected: {resource_key}') - results.append(ValueError(f'Invalid resource key: {resource_key}')) - - LOGGER.info(f"SetConfig results: {results}") + # Skip unsupported resource keys and append success + results.append(True) + + # Logging test results + LOGGER.info('Test keys: ' + str([x for x,y in resources])) + LOGGER.info('Test values: ' + str(results)) + return results @metered_subclass_method(METRICS_POOL) diff --git a/src/device/service/drivers/qkd/Tools2.py b/src/device/service/drivers/qkd/Tools2.py index ea88799f4d2ce2d65952ccb0ae59bc2da3770a58..082e820bbbceeafed51f1d425a0b6885d230ef13 100644 --- a/src/device/service/drivers/qkd/Tools2.py +++ b/src/device/service/drivers/qkd/Tools2.py @@ -1,202 +1,251 @@ import json import logging import requests -from requests.auth import HTTPBasicAuth from typing import Dict, Optional, Set, List, Tuple, Union, Any -from requests.adapters import HTTPAdapter -from urllib3.util.retry import Retry +from device.service.driver_api._Driver import RESOURCE_ENDPOINTS, RESOURCE_INTERFACES, RESOURCE_NETWORK_INSTANCES +from . import RESOURCE_APPS, RESOURCE_LINKS, RESOURCE_CAPABILITES, RESOURCE_NODE LOGGER = logging.getLogger(__name__) -HTTP_OK_CODES = { - 200, # OK - 201, # Created - 202, # Accepted - 204, # No Content -} - -def get_request_session(retries=5, backoff_factor=1.0, status_forcelist=(500, 502, 504)): - """ - Creates a requests session with retries and backoff strategy. - """ - LOGGER.info(f"Creating request session with retries={retries}, backoff_factor={backoff_factor}, status_forcelist={status_forcelist}") - session = requests.Session() - retry = Retry( - total=retries, - read=retries, - connect=retries, - backoff_factor=backoff_factor, - status_forcelist=status_forcelist, - ) - adapter = HTTPAdapter(max_retries=retry) - session.mount('http://', adapter) - session.mount('https://', adapter) - LOGGER.info("Request session created successfully") - return session - -def find_key(resource, key): +HTTP_OK_CODES = {200, 201, 202, 204} + +def find_key(resource: Tuple[str, str], key: str) -> Any: """ Extracts a specific key from a JSON resource. """ - return json.loads(resource[1])[key] + return json.loads(resource[1]).get(key) -def verify_endpoint_existence(session, endpoint_uuid, root_url, headers): - """ - Verifies if the given endpoint exists. - """ - url = f"{root_url}/restconf/data/etsi-qkd-sdn-node:qkd_node/qkd_interfaces/qkd_interface={endpoint_uuid}" - r = session.get(url, headers=headers) - if r.status_code == 200 and r.json(): - return True - else: - LOGGER.error(f"Endpoint {endpoint_uuid} does not exist or is not accessible") - return False def config_getter( - root_url: str, resource_key: str, auth: Optional[HTTPBasicAuth] = None, timeout: Optional[int] = None, + root_url: str, resource_key: str, auth: Optional[Any] = None, timeout: Optional[int] = None, node_ids: Set[str] = set(), headers: Dict[str, str] = {} ) -> List[Tuple[str, Union[Dict[str, Any], Exception]]]: """ Fetches configuration data from a QKD node for a specified resource key. Returns a list of tuples containing the resource key and the corresponding data or exception. + The function is agnostic to authentication: headers and auth are passed from external sources. """ url = f"{root_url}/restconf/data/etsi-qkd-sdn-node:qkd_node/" - result = [] - session = get_request_session() + LOGGER.info(f"Fetching configuration for {resource_key} from {root_url}") + + try: + if resource_key in [RESOURCE_ENDPOINTS, RESOURCE_INTERFACES]: + return fetch_interfaces(url, resource_key, headers, auth, timeout) + + elif resource_key in [RESOURCE_LINKS, RESOURCE_NETWORK_INSTANCES]: + return fetch_links(url, resource_key, headers, auth, timeout) + + elif resource_key in [RESOURCE_APPS]: + return fetch_apps(url, resource_key, headers, auth, timeout) + + elif resource_key in [RESOURCE_CAPABILITES]: + return fetch_capabilities(url, resource_key, headers, auth, timeout) - LOGGER.info(f"Starting config_getter with root_url={root_url}, resource_key={resource_key}, headers={headers}") + elif resource_key in [RESOURCE_NODE]: + return fetch_node(url, resource_key, headers, auth, timeout) + else: + LOGGER.warning(f"Unknown resource key: {resource_key}") + return [(resource_key, ValueError(f"Unknown resource key: {resource_key}"))] + + except requests.exceptions.RequestException as e: + LOGGER.error(f'Error retrieving/parsing {resource_key} from {url}: {e}') + return [(resource_key, e)] + + +def fetch_interfaces(url: str, resource_key: str, headers: Dict[str, str], auth: Optional[Any], timeout: Optional[int]) -> List[Tuple[str, Union[Dict[str, Any], Exception]]]: + """ + Fetches interface data from the QKD node. Adapts to both mocked and real QKD data structures. + """ + result = [] + url += 'qkd_interfaces/' + try: - if resource_key in ['endpoints', '__endpoints__', 'interfaces']: - url += 'qkd_interfaces/' - LOGGER.info(f"Making GET request to {url} with headers: {headers}") - r = session.get(url, timeout=timeout, verify=False, auth=auth, headers=headers) - LOGGER.info(f"Received response: {r.status_code}, content: {r.text}") - r.raise_for_status() - interfaces = r.json().get('qkd_interfaces', {}).get('qkd_interface', []) - if not interfaces: - raise KeyError('qkd_interfaces') - for interface in interfaces: - if resource_key in ['endpoints', '__endpoints__']: - endpoint_uuid = f"{interface['qkdi_att_point'].get('device', 'N/A')}:{interface['qkdi_att_point'].get('port', 'N/A')}" - resource_key_with_uuid = f"/endpoints/endpoint[{endpoint_uuid}]" - interface['uuid'] = endpoint_uuid - result.append((resource_key_with_uuid, interface)) - else: - interface_uuid = f"{interface['qkdi_att_point'].get('device', 'N/A')}:{interface['qkdi_att_point'].get('port', 'N/A')}" + r = requests.get(url, timeout=timeout, verify=False, auth=auth, headers=headers) + r.raise_for_status() + + # Handle both real and mocked QKD response structures + response_data = r.json() + + if isinstance(response_data.get('qkd_interfaces'), dict): + interfaces = response_data.get('qkd_interfaces', {}).get('qkd_interface', []) + else: + interfaces = response_data.get('qkd_interface', []) + + for interface in interfaces: + if resource_key in [RESOURCE_ENDPOINTS]: + # Handle real QKD data format + resource_value = interface.get('qkdi_att_point', {}) + if 'device' in resource_value and 'port' in resource_value: + uuid = f"{resource_value['device']}:{resource_value['port']}" + resource_key_with_uuid = f"/endpoints/endpoint[{uuid}]" + resource_value['uuid'] = uuid + + # Add sample types (for demonstration purposes) + sample_types = {} + metric_name = 'KPISAMPLETYPE_LINK_TOTAL_CAPACITY_GBPS' + metric_id = 301 + metric_name = metric_name.lower().replace('kpisampletype_', '') + monitoring_resource_key = '{:s}/state/{:s}'.format(resource_key, metric_name) + sample_types[metric_id] = monitoring_resource_key + resource_value['sample_types'] = sample_types + + result.append((resource_key_with_uuid, resource_value)) + + else: + # Handle both real and mocked QKD formats + endpoint_value = interface.get('qkdi_att_point', {}) + if 'device' in endpoint_value and 'port' in endpoint_value: + # Real QKD data format + interface_uuid = f"{endpoint_value['device']}:{endpoint_value['port']}" interface['uuid'] = interface_uuid interface['name'] = interface_uuid - interface['enabled'] = True - resource_key_with_uuid = f"/interface[{interface['qkdi_id']}]" - result.append((resource_key_with_uuid, interface)) - - elif resource_key in ['links', '__links__', '__network_instances__', 'network_instances']: - url += 'qkd_links/' - LOGGER.info(f"Making GET request to {url} with headers: {headers}") - r = session.get(url, timeout=timeout, verify=False, auth=auth, headers=headers) - LOGGER.info(f"Received response: {r.status_code}, content: {r.text}") + interface['enabled'] = True # Assume enabled for real data + else: + # Mocked QKD data format + interface_uuid = interface.get('uuid', f"/interface[{interface['qkdi_id']}]") + interface['uuid'] = interface_uuid + interface['name'] = interface.get('name', interface_uuid) + interface['enabled'] = interface.get('enabled', False) # Mocked enabled status + + result.append((f"/interface[{interface['qkdi_id']}]", interface)) + + except requests.RequestException as e: + LOGGER.error(f"Error fetching interfaces from {url}: {e}") + result.append((resource_key, e)) + + return result + +def fetch_links(url: str, resource_key: str, headers: Dict[str, str], auth: Optional[Any], timeout: Optional[int]) -> List[Tuple[str, Union[Dict[str, Any], Exception]]]: + """ + Fetches link data from the QKD node. Adapts to both mocked and real QKD data structures. + """ + result = [] + + if resource_key in [RESOURCE_LINKS, RESOURCE_NETWORK_INSTANCES]: + url += 'qkd_links/' + + try: + r = requests.get(url, timeout=timeout, verify=False, auth=auth, headers=headers) r.raise_for_status() - links = r.json().get('qkd_links', {}).get('qkd_link', []) - if not links: - LOGGER.warning(f"No links found in the response for 'qkd_links'") + + # Handle real and mocked QKD data structures + links = r.json().get('qkd_links', []) for link in links: - link_type = link.get('qkdl_type', 'Direct') + # For real QKD format (QKD links returned as dictionary objects) + if isinstance(link, dict): + qkdl_id = link.get('qkdl_id') + link_type = link.get('qkdl_type', 'Direct') + + # Handle both real (PHYS, VIRT) and mocked (DIRECT) link types + if link_type == 'PHYS' or link_type == 'VIRT': + resource_key_direct = f"/link[{qkdl_id}]" + result.append((resource_key_direct, link)) + elif link_type == 'DIRECT': + # Mocked QKD format has a slightly different structure + result.append((f"/link/link[{qkdl_id}]", link)) - if resource_key == 'links': - if link_type == 'Direct': - resource_key_with_uuid = f"/link[{link['qkdl_id']}]" - result.append((resource_key_with_uuid, link)) - else: - if link_type == 'Virtual': - resource_key_with_uuid = f"/service[{link['qkdl_id']}]" - result.append((resource_key_with_uuid, link)) - - elif resource_key in ['apps', '__apps__']: - url += 'qkd_applications/' - LOGGER.info(f"Making GET request to {url} with headers: {headers}") - r = session.get(url, timeout=timeout, verify=False, auth=auth, headers=headers) - LOGGER.info(f"Received response: {r.status_code}, content: {r.text}") - r.raise_for_status() - apps = r.json().get('qkd_applications', {}).get('qkd_app', []) - if not apps: - raise KeyError('qkd_applications') - for app in apps: - app_resource_key = f"/app[{app['app_id']}]" - result.append((app_resource_key, app)) - - elif resource_key in ['capabilities', '__capabilities__']: - url += 'qkdn_capabilities/' - LOGGER.info(f"Making GET request to {url} with headers: {headers}") - r = session.get(url, timeout=timeout, verify=False, auth=auth, headers=headers) - LOGGER.info(f"Received response: {r.status_code}, content: {r.text}") - r.raise_for_status() - capabilities = r.json() - result.append((resource_key, capabilities)) + # For mocked QKD format (QKD links returned as lists) + elif isinstance(link, list): + for l in link: + qkdl_id = l.get('uuid') + link_type = l.get('type', 'Direct') + + if link_type == 'DIRECT': + resource_key_direct = f"/link/link[{qkdl_id}]" + result.append((resource_key_direct, l)) + + except requests.RequestException as e: + LOGGER.error(f"Error fetching links from {url}: {e}") + result.append((resource_key, e)) + + return result - elif resource_key in ['node', '__node__']: - LOGGER.info(f"Making GET request to {url} with headers: {headers}") - r = session.get(url, timeout=timeout, verify=False, auth=auth, headers=headers) - LOGGER.info(f"Received response: {r.status_code}, content: {r.text}") - r.raise_for_status() - node = r.json().get('qkd_node', {}) - result.append((resource_key, node)) +def fetch_apps(url: str, resource_key: str, headers: Dict[str, str], auth: Optional[Any], timeout: Optional[int]) -> List[Tuple[str, Union[Dict[str, Any], Exception]]]: + """ + Fetches application data from the QKD node. + """ + result = [] + url += 'qkd_applications/' + + try: + r = requests.get(url, timeout=timeout, verify=False, auth=auth, headers=headers) + r.raise_for_status() + + apps = r.json().get('qkd_applications', {}).get('qkd_app', []) + for app in apps: + result.append((f"/app[{app['app_id']}]", app)) + except requests.RequestException as e: + LOGGER.error(f"Error fetching applications from {url}: {e}") + result.append((resource_key, e)) + + return result - else: - LOGGER.warning(f"Unknown resource key: {resource_key}") - result.append((resource_key, ValueError(f"Unknown resource key: {resource_key}"))) - except requests.exceptions.RequestException as e: - LOGGER.error(f'Exception retrieving/parsing {resource_key} from {url}: {e}') +def fetch_capabilities(url: str, resource_key: str, headers: Dict[str, str], auth: Optional[Any], timeout: Optional[int]) -> List[Tuple[str, Union[Dict[str, Any], Exception]]]: + """ + Fetches capabilities data from the QKD node. + """ + result = [] + url += 'qkdn_capabilities/' + + try: + r = requests.get(url, timeout=timeout, verify=False, auth=auth, headers=headers) + r.raise_for_status() + result.append((resource_key, r.json())) + except requests.RequestException as e: + LOGGER.error(f"Error fetching capabilities from {url}: {e}") result.append((resource_key, e)) + + return result + - LOGGER.info(f"config_getter results for {resource_key}: {result}") +def fetch_node(url: str, resource_key: str, headers: Dict[str, str], auth: Optional[Any], timeout: Optional[int]) -> List[Tuple[str, Union[Dict[str, Any], Exception]]]: + """ + Fetches node data from the QKD node. + """ + result = [] + + try: + r = requests.get(url, timeout=timeout, verify=False, auth=auth, headers=headers) + r.raise_for_status() + result.append((resource_key, r.json().get('qkd_node', {}))) + except requests.RequestException as e: + LOGGER.error(f"Error fetching node from {url}: {e}") + result.append((resource_key, e)) + return result + def create_connectivity_link( root_url: str, link_uuid: str, node_id_src: str, interface_id_src: str, node_id_dst: str, interface_id_dst: str, virt_prev_hop: Optional[str] = None, virt_next_hops: Optional[List[str]] = None, virt_bandwidth: Optional[int] = None, - auth: Optional[HTTPBasicAuth] = None, timeout: Optional[int] = None, headers: Dict[str, str] = {} + auth: Optional[Any] = None, timeout: Optional[int] = None, headers: Dict[str, str] = {} ) -> Union[bool, Exception]: """ Creates a connectivity link between QKD nodes using the provided parameters. """ url = f"{root_url}/restconf/data/etsi-qkd-sdn-node:qkd_node/qkd_links/" - session = get_request_session() - - # Verify that endpoints exist before creating the link - if not (verify_endpoint_existence(session, interface_id_src, root_url, headers) and - verify_endpoint_existence(session, interface_id_dst, root_url, headers)): - LOGGER.error(f"Cannot create link {link_uuid} because one or both endpoints do not exist.") - return Exception(f"Endpoint verification failed for link {link_uuid}") - - is_virtual = bool(virt_prev_hop or virt_next_hops) - + qkd_link = { 'qkdl_id': link_uuid, - 'qkdl_type': 'etsi-qkd-node-types:' + ('VIRT' if is_virtual else 'PHYS'), - 'qkdl_local': { - 'qkdn_id': node_id_src, - 'qkdi_id': interface_id_src - }, - 'qkdl_remote': { - 'qkdn_id': node_id_dst, - 'qkdi_id': interface_id_dst - } + 'qkdl_type': 'etsi-qkd-node-types:' + ('VIRT' if virt_prev_hop or virt_next_hops else 'PHYS'), + 'qkdl_local': {'qkdn_id': node_id_src, 'qkdi_id': interface_id_src}, + 'qkdl_remote': {'qkdn_id': node_id_dst, 'qkdi_id': interface_id_dst} } - if is_virtual: + if virt_prev_hop or virt_next_hops: qkd_link['virt_prev_hop'] = virt_prev_hop qkd_link['virt_next_hop'] = virt_next_hops or [] qkd_link['virt_bandwidth'] = virt_bandwidth - data = {'qkd_links': {'qkd_link': [qkd_link]}} + data = {'qkd_links': {'qkd_link': [qkd_link]}} LOGGER.info(f"Creating connectivity link with payload: {json.dumps(data)}") try: - r = session.post(url, json=data, timeout=timeout, verify=False, auth=auth, headers=headers) - LOGGER.info(f"Received response for link creation: {r.status_code}, content: {r.text}") + r = requests.post(url, json=data, timeout=timeout, verify=False, auth=auth, headers=headers) r.raise_for_status() if r.status_code in HTTP_OK_CODES: LOGGER.info(f"Link {link_uuid} created successfully.") diff --git a/src/device/tests/qkd/integration/test_external_qkd_retrieve_information.py b/src/device/tests/qkd/integration/test_external_qkd_retrieve_information.py index fdf873bdb99ec0477fa3e73f4a6f9c9434c3c384..f2372002db40f52a531e4d18a18972f0bfc900de 100644 --- a/src/device/tests/qkd/integration/test_external_qkd_retrieve_information.py +++ b/src/device/tests/qkd/integration/test_external_qkd_retrieve_information.py @@ -1,36 +1,32 @@ import pytest +import requests import json -import logging import os -from dotenv import load_dotenv -from src.device.service.drivers.qkd.QKDDriver import QKDDriver - -# Load environment variables from .env file -load_dotenv() - -# Set up logging -logging.basicConfig(level=logging.INFO) -LOGGER = logging.getLogger(__name__) - -class SafeJSONEncoder(json.JSONEncoder): - def default(self, obj): - if isinstance(obj, Exception): - return {'error': str(obj), 'type': type(obj).__name__} - return super().default(obj) - -# Dictionary to store retrieved information -retrieved_info = { - "config_qkd1": None, - "config_qkd2": None, - "capabilities_qkd1": None, - "capabilities_qkd2": None, - "interfaces_qkd1": None, - "interfaces_qkd2": None, - "links_qkd1": None, - "links_qkd2": None, - "state_qkd1": None, - "state_qkd2": None, -} +from src.device.service.drivers.qkd.QKDDriver2 import QKDDriver +from src.device.service.drivers.qkd.Tools2 import ( + RESOURCE_INTERFACES, + RESOURCE_LINKS, + RESOURCE_CAPABILITES, + RESOURCE_NODE, + RESOURCE_APPS +) + +# Test ID: INT_LQ_Test_01 (QKD Node Authentication) +# Function to retrieve JWT token +def get_jwt_token(node_address, port, username, password): + """ Retrieve JWT token from a node's login endpoint if it's secured. """ + login_url = f"http://{node_address}:{port}/login" + payload = {'username': username, 'password': password} + try: + print(f"Attempting to retrieve JWT token from {login_url}...") + response = requests.post(login_url, headers={'Content-Type': 'application/x-www-form-urlencoded'}, data=payload) + response.raise_for_status() + print(f"Successfully retrieved JWT token from {login_url}") + return response.json().get('access_token') + except requests.exceptions.RequestException as e: + print(f"Failed to retrieve JWT token from {login_url}: {e}") + return None + # Environment variables for sensitive information QKD1_ADDRESS = os.getenv("QKD1_ADDRESS") @@ -39,119 +35,136 @@ PORT = os.getenv("QKD_PORT") USERNAME = os.getenv("QKD_USERNAME") PASSWORD = os.getenv("QKD_PASSWORD") +# Pytest fixture to initialize QKDDriver with token for Node 1 @pytest.fixture def driver_qkd1(): - return QKDDriver(address=QKD1_ADDRESS, port=PORT, username=USERNAME, password=PASSWORD, use_jwt=True) + token = get_jwt_token(QKD1_ADDRESS, PORT, USERNAME, PASSWORD) + headers = {'Authorization': f'Bearer {token}'} if token else {} + return QKDDriver(address=QKD1_ADDRESS, port=PORT, headers=headers) +# Pytest fixture to initialize QKDDriver with token for Node 2 @pytest.fixture def driver_qkd2(): - return QKDDriver(address=QKD2_ADDRESS, port=PORT, username=USERNAME, password=PASSWORD, use_jwt=True) - -def log_data(label, data): - """Logs data in JSON format with a label.""" - LOGGER.info(f"{label}: {json.dumps(data, indent=2, cls=SafeJSONEncoder)}") - -def get_jwt_token(driver): - """Retrieve JWT token from the driver.""" - try: - return driver._QKDDriver__headers.get('Authorization').split(' ')[1] - except (AttributeError, KeyError, TypeError): - LOGGER.error("Failed to retrieve JWT token") - return None + token = get_jwt_token(QKD2_ADDRESS, PORT, USERNAME, PASSWORD) + headers = {'Authorization': f'Bearer {token}'} if token else {} + return QKDDriver(address=QKD2_ADDRESS, port=PORT, headers=headers) +# Utility function to save data to a JSON file, filtering out non-serializable objects def save_json_file(filename, data): - """Save data to a JSON file.""" + serializable_data = filter_serializable(data) + with open(filename, 'w') as f: + json.dump(serializable_data, f, indent=2) + print(f"Saved data to {filename}") + +# Function to filter out non-serializable objects like HTTPError +def filter_serializable(data): + if isinstance(data, list): + return [filter_serializable(item) for item in data if not isinstance(item, requests.exceptions.RequestException)] + elif isinstance(data, dict): + return {key: filter_serializable(value) for key, value in data.items() if not isinstance(value, requests.exceptions.RequestException)} + return data + +# Utility function to print the retrieved data for debugging, handling errors +def print_data(label, data): try: - with open(filename, 'w') as f: - json.dump(data, f, indent=2) - LOGGER.info(f"Successfully saved {filename}") - except Exception as e: - LOGGER.error(f"Failed to save {filename}: {e}") - -def retrieve_data(driver, label, method, *args): - """Retrieve data from the driver and log it.""" + print(f"{label}: {json.dumps(data, indent=2)}") + except TypeError as e: + print(f"Error printing {label}: {e}, Data: {data}") + +# General function to retrieve and handle HTTP errors +def retrieve_data(driver_qkd, resource, resource_name): try: - data = method(*args) - log_data(label, data) + data = driver_qkd.GetConfig([resource]) + assert isinstance(data, list), f"Expected a list for {resource_name}" + assert len(data) > 0, f"No {resource_name} found in the system" return data - except Exception as e: - LOGGER.error(f"Failed to retrieve {label}: {e}") + except requests.exceptions.HTTPError as e: + print(f"HTTPError while fetching {resource_name}: {e}") + return None + except AssertionError as e: + print(f"AssertionError: {e}") return None -def test_retrieve_and_create_descriptor(driver_qkd1, driver_qkd2): - # Connect to both QKD nodes - assert driver_qkd1.Connect(), "Failed to connect to QKD1" - assert driver_qkd2.Connect(), "Failed to connect to QKD2" - - # Use the same JWT token for all requests - jwt_token = get_jwt_token(driver_qkd1) - assert jwt_token, "Failed to retrieve JWT token from QKD1" - driver_qkd2._QKDDriver__headers['Authorization'] = f'Bearer {jwt_token}' - - # Retrieve configurations - retrieved_info['config_qkd1'] = retrieve_data(driver_qkd1, "QKD1 Initial Config", driver_qkd1.GetInitialConfig) - retrieved_info['config_qkd2'] = retrieve_data(driver_qkd2, "QKD2 Initial Config", driver_qkd2.GetInitialConfig) - - # Retrieve capabilities - retrieved_info['capabilities_qkd1'] = retrieve_data(driver_qkd1, "QKD1 Capabilities", driver_qkd1.GetConfig, ['capabilities']) - retrieved_info['capabilities_qkd2'] = retrieve_data(driver_qkd2, "QKD2 Capabilities", driver_qkd2.GetConfig, ['capabilities']) - - # Retrieve interfaces - retrieved_info['interfaces_qkd1'] = retrieve_data(driver_qkd1, "QKD1 Interfaces", driver_qkd1.GetConfig, ['interfaces']) - retrieved_info['interfaces_qkd2'] = retrieve_data(driver_qkd2, "QKD2 Interfaces", driver_qkd2.GetConfig, ['interfaces']) - - # Retrieve links - retrieved_info['links_qkd1'] = retrieve_data(driver_qkd1, "QKD1 Links", driver_qkd1.GetConfig, ['links']) - retrieved_info['links_qkd2'] = retrieve_data(driver_qkd2, "QKD2 Links", driver_qkd2.GetConfig, ['links']) - - # Retrieve states - retrieved_info['state_qkd1'] = retrieve_data(driver_qkd1, "QKD1 Current State", driver_qkd1.GetState) - retrieved_info['state_qkd2'] = retrieve_data(driver_qkd2, "QKD2 Current State", driver_qkd2.GetState) - - # Save retrieved information - save_json_file('retrieved_info.json', retrieved_info) - - # Create descriptor dynamically - descriptor = { - "contexts": [{"context_id": {"context_uuid": {"uuid": "admin"}}}], - "topologies": [{"topology_id": {"topology_uuid": {"uuid": "admin"}, "context_id": {"context_uuid": {"uuid": "admin"}}}}], - "devices": [], - "links": [] +# Test ID: INT_LQ_Test_02 (QKD Node Capabilities) +def retrieve_capabilities(driver_qkd, node_name): + capabilities = retrieve_data(driver_qkd, RESOURCE_CAPABILITES, "capabilities") + if capabilities: + print_data(f"{node_name} Capabilities", capabilities) + return capabilities + +# Test ID: INT_LQ_Test_03 (QKD Interfaces) +def retrieve_interfaces(driver_qkd, node_name): + interfaces = retrieve_data(driver_qkd, RESOURCE_INTERFACES, "interfaces") + if interfaces: + print_data(f"{node_name} Interfaces", interfaces) + return interfaces + +# Test ID: INT_LQ_Test_04 (QKD Links) +def retrieve_links(driver_qkd, node_name): + links = retrieve_data(driver_qkd, RESOURCE_LINKS, "links") + if links: + print_data(f"{node_name} Links", links) + return links + +# Test ID: INT_LQ_Test_05 (QKD Link Metrics) +def retrieve_link_metrics(driver_qkd, node_name): + links = retrieve_links(driver_qkd, node_name) + if links: + for link in links: + if 'performance_metrics' in link[1]: + print_data(f"{node_name} Link Metrics", link[1]['performance_metrics']) + else: + print(f"No metrics found for link {link[0]}") + return links + +# Test ID: INT_LQ_Test_06 (QKD Applications) +def retrieve_applications(driver_qkd, node_name): + applications = retrieve_data(driver_qkd, RESOURCE_APPS, "applications") + if applications: + print_data(f"{node_name} Applications", applications) + return applications + +# Test ID: INT_LQ_Test_07 (System Health Check) +def retrieve_node_data(driver_qkd, node_name): + node_data = retrieve_data(driver_qkd, RESOURCE_NODE, "node data") + if node_data: + print_data(f"{node_name} Node Data", node_data) + return node_data + +# Main test to retrieve and save data from QKD1 and QKD2 to files +def test_retrieve_and_save_data(driver_qkd1, driver_qkd2): + # Retrieve data for QKD1 + qkd1_interfaces = retrieve_interfaces(driver_qkd1, "QKD1") + qkd1_links = retrieve_links(driver_qkd1, "QKD1") + qkd1_capabilities = retrieve_capabilities(driver_qkd1, "QKD1") + qkd1_node_data = retrieve_node_data(driver_qkd1, "QKD1") + qkd1_apps = retrieve_applications(driver_qkd1, "QKD1") + + qkd1_data = { + "interfaces": qkd1_interfaces, + "links": qkd1_links, + "capabilities": qkd1_capabilities, + "apps": qkd1_apps, + "node_data": qkd1_node_data + } + + # Save QKD1 data to file + save_json_file('qkd1_data.json', qkd1_data) + + # Retrieve data for QKD2 + qkd2_interfaces = retrieve_interfaces(driver_qkd2, "QKD2") + qkd2_links = retrieve_links(driver_qkd2, "QKD2") + qkd2_capabilities = retrieve_capabilities(driver_qkd2, "QKD2") + qkd2_node_data = retrieve_node_data(driver_qkd2, "QKD2") + qkd2_apps = retrieve_applications(driver_qkd2, "QKD2") + + qkd2_data = { + "interfaces": qkd2_interfaces, + "links": qkd2_links, + "capabilities": qkd2_capabilities, + "apps": qkd2_apps, + "node_data": qkd2_node_data } - # Add device information to descriptor - for config, token, interfaces, device_name, address in [ - (retrieved_info['config_qkd1'], jwt_token, retrieved_info['interfaces_qkd1'], "QKD1", QKD1_ADDRESS), - (retrieved_info['config_qkd2'], jwt_token, retrieved_info['interfaces_qkd2'], "QKD2", QKD2_ADDRESS) - ]: - device_info = { - "device_id": {"device_uuid": {"uuid": device_name}}, - "device_type": "qkd-node", - "device_operational_status": 0, - "device_drivers": [12], - "device_endpoints": [], - "device_config": { - "config_rules": [ - {"action": 1, "custom": {"resource_key": "_connect/address", "resource_value": address}}, - {"action": 1, "custom": {"resource_key": "_connect/port", "resource_value": PORT}}, - {"action": 1, "custom": {"resource_key": "_connect/settings", "resource_value": {"scheme": "http", "token": token}}} - ] - } - } - descriptor['devices'].append(device_info) - - # Create links based on retrieved link data - if retrieved_info['links_qkd1'] and retrieved_info['links_qkd2']: - for link_data in retrieved_info['links_qkd1']: - link_entry = { - "link_id": {"link_uuid": {"uuid": f"QKD1/{QKD1_ADDRESS}:{PORT}==QKD2/{retrieved_info['links_qkd2'][0][1]['qkdi_status']}/{PORT}"}}, - "link_endpoint_ids": [ - {"device_id": {"device_uuid": {"uuid": "QKD1"}}, "endpoint_uuid": {"uuid": f"{QKD1_ADDRESS}:{PORT}"}}, - {"device_id": {"device_uuid": {"uuid": "QKD2"}}, "endpoint_uuid": {"uuid": f"{QKD2_ADDRESS}:{PORT}"}} - ] - } - descriptor['links'].append(link_entry) - - # Save the dynamically created descriptor - save_json_file('descriptor.json', descriptor) - log_data("Created Descriptor", descriptor) + # Save QKD2 data to file + save_json_file('qkd2_data.json', qkd2_data) diff --git a/src/device/tests/qkd/unit/PrepareScenario.py b/src/device/tests/qkd/unit/PrepareScenario.py new file mode 100644 index 0000000000000000000000000000000000000000..3716bd444cd4ccd40342450e440da4b03848b060 --- /dev/null +++ b/src/device/tests/qkd/unit/PrepareScenario.py @@ -0,0 +1,112 @@ +import pytest, os, time, logging +from common.Constants import ServiceNameEnum +from common.Settings import ( + ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_HTTP, + get_env_var_name, get_service_port_http +) +from context.client.ContextClient import ContextClient +from nbi.service.rest_server.RestServer import RestServer +from nbi.service.rest_server.nbi_plugins.tfs_api import register_tfs_api +from device.client.DeviceClient import DeviceClient +from device.service.DeviceService import DeviceService +from device.service.driver_api.DriverFactory import DriverFactory +from device.service.driver_api.DriverInstanceCache import DriverInstanceCache +from device.service.drivers import DRIVERS +from device.tests.CommonObjects import CONTEXT, TOPOLOGY +from device.tests.MockService_Dependencies import MockService_Dependencies +from monitoring.client.MonitoringClient import MonitoringClient +from requests import codes as requests_codes +import requests + +# Constants +LOCAL_HOST = '127.0.0.1' +MOCKSERVICE_PORT = 8080 + +# Get dynamic port for NBI service +NBI_SERVICE_PORT = MOCKSERVICE_PORT + get_service_port_http(ServiceNameEnum.NBI) + +# Set environment variables for the NBI service host and port +os.environ[get_env_var_name(ServiceNameEnum.NBI, ENVVAR_SUFIX_SERVICE_HOST)] = str(LOCAL_HOST) +os.environ[get_env_var_name(ServiceNameEnum.NBI, ENVVAR_SUFIX_SERVICE_PORT_HTTP)] = str(NBI_SERVICE_PORT) + +# Expected status codes for requests +EXPECTED_STATUS_CODES = {requests_codes['OK'], requests_codes['CREATED'], requests_codes['ACCEPTED'], requests_codes['NO_CONTENT']} + +# Debugging output for the port number +print(f"MOCKSERVICE_PORT: {MOCKSERVICE_PORT}") +print(f"NBI_SERVICE_PORT: {NBI_SERVICE_PORT}") + +@pytest.fixture(scope='session') +def mock_service(): + _service = MockService_Dependencies(MOCKSERVICE_PORT) + _service.configure_env_vars() + _service.start() + yield _service + _service.stop() + +@pytest.fixture(scope='session') +def nbi_service_rest(mock_service): # Pass the `mock_service` as an argument if needed + _rest_server = RestServer() + register_tfs_api(_rest_server) # Register the TFS API with the REST server + _rest_server.start() + time.sleep(1) # Give time for the server to start + yield _rest_server + _rest_server.shutdown() + _rest_server.join() + +@pytest.fixture(scope='session') +def context_client(mock_service): + _client = ContextClient() + yield _client + _client.close() + +@pytest.fixture(scope='session') +def device_service(context_client, monitoring_client): + _driver_factory = DriverFactory(DRIVERS) + _driver_instance_cache = DriverInstanceCache(_driver_factory) + _service = DeviceService(_driver_instance_cache) + _service.start() + yield _service + _service.stop() + +@pytest.fixture(scope='session') +def device_client(device_service): + _client = DeviceClient() + yield _client + _client.close() + +# General request function +def do_rest_request(method, url, body=None, timeout=10, allow_redirects=True, logger=None): + # Construct the request URL with NBI service port + request_url = f"http://{LOCAL_HOST}:{NBI_SERVICE_PORT}{url}" + + # Log the request details for debugging + if logger: + msg = f"Request: {method.upper()} {request_url}" + if body: + msg += f" body={body}" + logger.warning(msg) + + # Send the request + reply = requests.request(method, request_url, timeout=timeout, json=body, allow_redirects=allow_redirects) + + # Log the response details for debugging + if logger: + logger.warning(f"Reply: {reply.text}") + + # Print status code and response for debugging instead of asserting + print(f"Status code: {reply.status_code}") + print(f"Response: {reply.text}") + + # Return the JSON response if present + if reply.content: + return reply.json() + return None + +# Function for GET requests +def do_rest_get_request(url, body=None, timeout=10, allow_redirects=True, logger=None): + return do_rest_request('get', url, body, timeout, allow_redirects, logger=logger) + +# Function for POST requests +def do_rest_post_request(url, body=None, timeout=10, allow_redirects=True, logger=None): + return do_rest_request('post', url, body, timeout, allow_redirects, logger=logger) diff --git a/src/device/tests/qkd/unit/retrieve_device_mock_information.py b/src/device/tests/qkd/unit/retrieve_device_mock_information.py index 20074924b72eb1557a6af72221e634a4cfe54346..e869fc2848e2d5bf637c0ce10b480dafb4689ef3 100644 --- a/src/device/tests/qkd/unit/retrieve_device_mock_information.py +++ b/src/device/tests/qkd/unit/retrieve_device_mock_information.py @@ -1,70 +1,90 @@ -import unittest -from unittest.mock import patch, MagicMock +import logging, urllib +from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME +from common.proto.context_pb2 import ContextId +from common.tools.descriptor.Loader import DescriptorLoader from context.client.ContextClient import ContextClient -from common.proto.context_pb2 import Empty - -def retrieve_descriptor_information(): - client = ContextClient() - contexts = client.ListContexts(Empty()) - topologies = client.ListTopologies(contexts.contexts[0].context_id) - devices = client.ListDevices(Empty()) - links = client.ListLinks(Empty()) - - return { - 'contexts': contexts, - 'topologies': topologies, - 'devices': devices, - 'links': links, - } - -class TestRetrieveDescriptorInformation(unittest.TestCase): - - @patch('context.client.ContextClient.ContextClient') - def test_retrieve_descriptor_information(self, MockContextClient): - # Setup mock responses - mock_client = MagicMock() - MockContextClient.return_value = mock_client - - # Mocking ListContexts response - context_mock = MagicMock() - context_mock.contexts = [MagicMock()] - context_mock.contexts[0].context_id.context_uuid.uuid = "admin" - mock_client.ListContexts.return_value = context_mock - - # Mocking ListTopologies response - topology_mock = MagicMock() - topology_mock.topologies = [MagicMock()] - topology_mock.topologies[0].topology_id.topology_uuid.uuid = "admin" - mock_client.ListTopologies.return_value = topology_mock - - # Mocking ListDevices response - device_mock = MagicMock() - device_mock.devices = [MagicMock()] - device_mock.devices[0].device_id.device_uuid.uuid = "QKD1" - device_mock.devices[0].device_type = "qkd-node" - device_mock.devices[0].device_operational_status = 0 - device_mock.devices[0].device_drivers = [12] - mock_client.ListDevices.return_value = device_mock - - # Mocking ListLinks response - link_mock = MagicMock() - link_mock.links = [MagicMock()] - link_mock.links[0].link_id.link_uuid.uuid = "QKD1/10.211.36.220:1001==QKD2/10.211.36.220:2001" - mock_client.ListLinks.return_value = link_mock - - # Call the function and verify - result = retrieve_descriptor_information() - - mock_client.ListContexts.assert_called_once_with(Empty()) - mock_client.ListTopologies.assert_called_once_with(context_mock.contexts[0].context_id) - mock_client.ListDevices.assert_called_once_with(Empty()) - mock_client.ListLinks.assert_called_once_with(Empty()) - - # Assertions to verify the expected structure - self.assertEqual(result['contexts'].contexts[0].context_id.context_uuid.uuid, "admin") - self.assertEqual(result['topologies'].topologies[0].topology_id.topology_uuid.uuid, "admin") - self.assertEqual(result['devices'].devices[0].device_id.device_uuid.uuid, "QKD1") - self.assertEqual(result['links'].links[0].link_id.link_uuid.uuid, "QKD1/10.211.36.220:1001==QKD2/10.211.36.220:2001") - -if __name__ == "__main__": - unittest.main() +from nbi.service.rest_server.RestServer import RestServer +from common.tools.object_factory.Context import json_context_id +from device.tests.qkd.unit.PrepareScenario import mock_service, nbi_service_rest, do_rest_get_request + +LOGGER = logging.getLogger(__name__) +LOGGER.setLevel(logging.DEBUG) + +JSON_ADMIN_CONTEXT_ID = json_context_id(DEFAULT_CONTEXT_NAME) +ADMIN_CONTEXT_ID = ContextId(**JSON_ADMIN_CONTEXT_ID) + + +# ----- Context -------------------------------------------------------------------------------------------------------- + +def test_rest_get_context_ids(nbi_service_rest: RestServer): # pylint: disable=redefined-outer-name, unused-argument + reply = do_rest_get_request('/tfs-api/context_ids') + print("Context IDs:", reply) + +def test_rest_get_contexts(nbi_service_rest: RestServer): # pylint: disable=redefined-outer-name, unused-argument + reply = do_rest_get_request('/tfs-api/contexts') + print("Contexts:", reply) + +def test_rest_get_context(nbi_service_rest: RestServer): # pylint: disable=redefined-outer-name, unused-argument + context_uuid = urllib.parse.quote(DEFAULT_CONTEXT_NAME) + reply = do_rest_get_request(f'/tfs-api/context/{context_uuid}') + print("Context data:", reply) + + +# ----- Topology ------------------------------------------------------------------------------------------------------- + +def test_rest_get_topology_ids(nbi_service_rest: RestServer): # pylint: disable=redefined-outer-name, unused-argument + context_uuid = urllib.parse.quote(DEFAULT_CONTEXT_NAME) + reply = do_rest_get_request(f'/tfs-api/context/{context_uuid}/topology_ids') + print("Topology IDs:", reply) + +def test_rest_get_topologies(nbi_service_rest: RestServer): # pylint: disable=redefined-outer-name, unused-argument + context_uuid = urllib.parse.quote(DEFAULT_CONTEXT_NAME) + reply = do_rest_get_request(f'/tfs-api/context/{context_uuid}/topologies') + print("Topologies:", reply) + +def test_rest_get_topology(nbi_service_rest: RestServer): # pylint: disable=redefined-outer-name, unused-argument + context_uuid = urllib.parse.quote(DEFAULT_CONTEXT_NAME) + topology_uuid = urllib.parse.quote(DEFAULT_TOPOLOGY_NAME) + reply = do_rest_get_request(f'/tfs-api/context/{context_uuid}/topology/{topology_uuid}') + print("Topology data:", reply) + + +# ----- Device --------------------------------------------------------------------------------------------------------- + +def test_rest_get_device_ids(nbi_service_rest: RestServer): # pylint: disable=redefined-outer-name, unused-argument + reply = do_rest_get_request('/tfs-api/device_ids') + print("Device IDs:", reply) + +def test_rest_get_devices(nbi_service_rest: RestServer): # pylint: disable=redefined-outer-name, unused-argument + reply = do_rest_get_request('/tfs-api/devices') + print("Devices:", reply) + + +# ----- Link ----------------------------------------------------------------------------------------------------------- + +def test_rest_get_link_ids(nbi_service_rest: RestServer): # pylint: disable=redefined-outer-name, unused-argument + reply = do_rest_get_request('/tfs-api/link_ids') + print("Link IDs:", reply) + +def test_rest_get_links(nbi_service_rest: RestServer): # pylint: disable=redefined-outer-name, unused-argument + reply = do_rest_get_request('/tfs-api/links') + print("Links:", reply) + + +# ----- Service -------------------------------------------------------------------------------------------------------- + +def test_rest_get_service_ids(nbi_service_rest: RestServer): # pylint: disable=redefined-outer-name, unused-argument + reply = do_rest_get_request('/tfs-api/link_ids') + print("Service IDs:", reply) + +def test_rest_get_topologies(nbi_service_rest: RestServer): # pylint: disable=redefined-outer-name, unused-argument + context_uuid = urllib.parse.quote(DEFAULT_CONTEXT_NAME) + reply = do_rest_get_request(f'/tfs-api/context/{context_uuid}/services') + print("Services:", reply) + +# ----- Apps ----------------------------------------------------------------------------------------------------------- + +def test_rest_get_apps(nbi_service_rest: RestServer): # pylint: disable=redefined-outer-name, unused-argument + context_uuid = urllib.parse.quote(DEFAULT_CONTEXT_NAME) # Context ID + reply = do_rest_get_request(f'/tfs-api/context/{context_uuid}/apps') + print("Apps:", reply) diff --git a/src/device/tests/qkd/unit/test_application_deployment.py b/src/device/tests/qkd/unit/test_application_deployment.py new file mode 100644 index 0000000000000000000000000000000000000000..005730e3b4409405ca5ec0273280012e4c10be0b --- /dev/null +++ b/src/device/tests/qkd/unit/test_application_deployment.py @@ -0,0 +1,30 @@ +import pytest +import json +from src.device.service.drivers.qkd.QKDDriver import QKDDriver + +@pytest.fixture +def qkd_driver(): + # Initialize the QKD driver with the appropriate settings + return QKDDriver(address='10.211.36.220', port=11111, username='user', password='pass') + +def test_application_deployment(qkd_driver): + qkd_driver.Connect() + + # Application registration data + app_data = { + 'qkd_app': [ + { + 'app_id': '00000001-0001-0000-0000-000000000001', + 'client_app_id': [], + 'app_statistics': {'statistics': []}, + 'app_qos': {}, + 'backing_qkdl_id': [] + } + ] + } + + # Send a POST request to create the application + response = qkd_driver.SetConfig([('/qkd_applications/qkd_app', json.dumps(app_data))]) + + # Verify response + assert response[0] is True, "Expected application registration to succeed" diff --git a/src/device/tests/qkd/unit/test_qkd_configuration.py b/src/device/tests/qkd/unit/test_qkd_configuration.py index 179e072fa53aae999939968fa2e2b6db8eec2301..c48799b2e962321b60f23cfb48688e63e253879c 100644 --- a/src/device/tests/qkd/unit/test_qkd_configuration.py +++ b/src/device/tests/qkd/unit/test_qkd_configuration.py @@ -1,93 +1,213 @@ import pytest -from src.device.service.drivers.qkd.QKDDriver2 import QKDDriver import json +from requests.exceptions import HTTPError +from src.device.service.drivers.qkd.QKDDriver2 import QKDDriver +import requests +from src.device.service.drivers.qkd.Tools2 import ( + RESOURCE_INTERFACES, + RESOURCE_LINKS, + RESOURCE_ENDPOINTS, + RESOURCE_APPS, + RESOURCE_CAPABILITES, + RESOURCE_NODE +) @pytest.fixture def qkd_driver(): - return QKDDriver(address='10.211.36.220', port=11111, username='user', password='pass') + # Initialize the QKD driver with the appropriate settings, ensure correct JWT headers are included + token = "YOUR_JWT_TOKEN" # Replace with your actual JWT token + if not token: + pytest.fail("JWT token is missing. Make sure to generate a valid JWT token.") + headers = {"Authorization": f"Bearer {token}"} + return QKDDriver(address='10.211.36.220', port=11111, headers=headers) -# Deliverable Test ID: SBI_Test_03 (Initial Config Retrieval) +# Utility function to print the retrieved data for debugging +def print_data(label, data): + print(f"{label}: {json.dumps(data, indent=2)}") + +# Test ID: SBI_Test_03 (Initial Config Retrieval) def test_initial_config_retrieval(qkd_driver): qkd_driver.Connect() - # Retrieve and print initial config + # Retrieve and validate the initial configuration config = qkd_driver.GetInitialConfig() - print("Initial Config:", json.dumps(config, indent=2)) - assert isinstance(config, list) - assert len(config) > 0 - assert isinstance(config[0], tuple) - assert config[0][0] == 'qkd_node' - assert isinstance(config[0][1], dict) + # Since GetInitialConfig returns a list, adjust the assertions accordingly + assert isinstance(config, list), "Expected a list for initial config" + assert len(config) > 0, "Initial config should not be empty" + + # Output for debugging + print_data("Initial Config", config) + +# Test ID: INT_LQ_Test_05 (QKD Devices Retrieval) +def test_retrieve_devices(qkd_driver): + qkd_driver.Connect() + + # Retrieve and validate device information + devices = qkd_driver.GetConfig([RESOURCE_NODE]) + assert isinstance(devices, list), "Expected a list of devices" + + if not devices: + pytest.skip("No devices found in the system. Skipping device test.") + + for device in devices: + assert isinstance(device, tuple), "Each device entry must be a tuple" + assert isinstance(device[1], dict), "Device data must be a dictionary" + if isinstance(device[1], Exception): + pytest.fail(f"Error retrieving devices: {device[1]}") + + # Output for debugging + print_data("Devices", devices) -# Deliverable Test ID: INT_LQ_Test_04 (QKD Links Retrieval) +# Test ID: INT_LQ_Test_04 (QKD Links Retrieval) def test_retrieve_links(qkd_driver): qkd_driver.Connect() + + try: + # Fetch the links using the correct resource key + links = qkd_driver.GetConfig([RESOURCE_LINKS]) + assert isinstance(links, list), "Expected a list of tuples (resource key, data)." + + if len(links) == 0: + pytest.skip("No links found in the system, skipping link validation.") + + for link in links: + assert isinstance(link, tuple), "Each link entry must be a tuple" + resource_key, link_data = link # Unpack the tuple + + # Handle HTTPError or exception in the response + if isinstance(link_data, requests.exceptions.HTTPError): + pytest.fail(f"Failed to retrieve links due to HTTP error: {link_data}") + + if isinstance(link_data, dict): + # For real QKD data (links as dictionaries) + assert 'qkdl_id' in link_data, "Missing 'qkdl_id' in link data" + assert 'qkdl_local' in link_data, "Missing 'qkdl_local' in link data" + assert 'qkdl_remote' in link_data, "Missing 'qkdl_remote' in link data" + assert 'qkdl_type' in link_data, "Missing 'qkdl_type' in link data" + + # Check 'virt_prev_hop' only for virtual links (VIRT) + if link_data['qkdl_type'] == 'etsi-qkd-node-types:VIRT': + virt_prev_hop = link_data.get('virt_prev_hop') + assert virt_prev_hop is None or re.match(r'[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}', str(virt_prev_hop)), \ + f"Invalid 'virt_prev_hop': {virt_prev_hop}" + + # Print out the link details for debugging + print(f"Link ID: {link_data['qkdl_id']}") + print(f"Link Type: {link_data['qkdl_type']}") + print(f"Local QKD: {json.dumps(link_data['qkdl_local'], indent=2)}") + print(f"Remote QKD: {json.dumps(link_data['qkdl_remote'], indent=2)}") + + elif isinstance(link_data, list): + # For mocked QKD data (links as lists of dictionaries) + for mock_link in link_data: + assert 'uuid' in mock_link, "Missing 'uuid' in mocked link data" + assert 'src_qkdn_id' in mock_link, "Missing 'src_qkdn_id' in mocked link data" + assert 'dst_qkdn_id' in mock_link, "Missing 'dst_qkdn_id' in mocked link data" + + # Print out the mocked link details for debugging + print(f"Mock Link ID: {mock_link['uuid']}") + print(f"Source QKD ID: {mock_link['src_qkdn_id']}") + print(f"Destination QKD ID: {mock_link['dst_qkdn_id']}") + + else: + pytest.fail(f"Unexpected link data format: {type(link_data)}") + + except HTTPError as e: + pytest.fail(f"HTTP error occurred while retrieving links: {e}") + except Exception as e: + pytest.fail(f"An unexpected error occurred: {e}") + +# Test for QKD Services +def test_retrieve_services(qkd_driver): + qkd_driver.Connect() + services = qkd_driver.GetConfig([RESOURCE_ENDPOINTS]) + assert isinstance(services, list), "Expected a list of services" + + if not services: + pytest.skip("No services found in the system. Skipping service test.") + + for service in services: + assert isinstance(service, tuple), "Each service entry must be a tuple" + assert isinstance(service[1], dict), "Service data must be a dictionary" + if isinstance(service[1], Exception): + pytest.fail(f"Error retrieving services: {service[1]}") + + print("Services:", json.dumps(services, indent=2)) + +# Test ID: INT_LQ_Test_07 (QKD Applications Retrieval) +def test_retrieve_applications(qkd_driver): + qkd_driver.Connect() - # Retrieve and print link information - links = qkd_driver.GetConfig(['links']) + # Retrieve and validate applications information + applications = qkd_driver.GetConfig([RESOURCE_APPS]) # Adjust to fetch applications using the correct key + assert isinstance(applications, list), "Expected a list of applications" - if not links: - pytest.fail("No links found in the system.") + if not applications: + pytest.skip("No applications found in the system. Skipping applications test.") - if isinstance(links[0][1], Exception): - print(f"Error retrieving links: {links[0][1]}") - else: - print("Links:", json.dumps(links, indent=2)) + for app in applications: + assert isinstance(app, tuple), "Each application entry must be a tuple" + assert isinstance(app[1], dict), "Application data must be a dictionary" + if isinstance(app[1], Exception): + pytest.fail(f"Error retrieving applications: {app[1]}") - assert isinstance(links, list) - assert len(links) > 0 + # Output for debugging + print_data("Applications", applications) -# Deliverable Test ID: INT_LQ_Test_03 (QKD Interfaces Retrieval) +# Test ID: INT_LQ_Test_03 (QKD Interfaces Retrieval) def test_retrieve_interfaces(qkd_driver): qkd_driver.Connect() - # Retrieve and print interface information - interfaces = qkd_driver.GetConfig(['interfaces']) + # Retrieve and validate interface information + interfaces = qkd_driver.GetConfig([RESOURCE_INTERFACES]) - if not interfaces: - pytest.fail("No interfaces found in the system.") + assert isinstance(interfaces, list), "Expected a list of interfaces" + assert len(interfaces) > 0, "No interfaces found in the system" - if isinstance(interfaces[0][1], Exception): - print(f"Error retrieving interfaces: {interfaces[0][1]}") - else: - print("Interfaces:", json.dumps(interfaces, indent=2)) + for interface in interfaces: + assert isinstance(interface, tuple), "Each interface entry must be a tuple" + assert isinstance(interface[1], dict), "Interface data must be a dictionary" + if isinstance(interface[1], Exception): + pytest.fail(f"Error retrieving interfaces: {interface[1]}") - assert isinstance(interfaces, list) - assert len(interfaces) > 0 + # Output for debugging + print_data("Interfaces", interfaces) -# Deliverable Test ID: INT_LQ_Test_02 (QKD Capabilities Retrieval) +# Test ID: INT_LQ_Test_02 (QKD Capabilities Retrieval) def test_retrieve_capabilities(qkd_driver): qkd_driver.Connect() - # Retrieve and print capabilities information - capabilities = qkd_driver.GetConfig(['capabilities']) + # Retrieve and validate capabilities information + capabilities = qkd_driver.GetConfig([RESOURCE_CAPABILITES]) - if not capabilities: - pytest.fail("No capabilities found in the system.") + assert isinstance(capabilities, list), "Expected a list of capabilities" + assert len(capabilities) > 0, "No capabilities found in the system" - if isinstance(capabilities[0][1], Exception): - print(f"Error retrieving capabilities: {capabilities[0][1]}") - else: - print("Capabilities:", json.dumps(capabilities, indent=2)) + for capability in capabilities: + assert isinstance(capability, tuple), "Each capability entry must be a tuple" + assert isinstance(capability[1], dict), "Capability data must be a dictionary" + if isinstance(capability[1], Exception): + pytest.fail(f"Error retrieving capabilities: {capability[1]}") - assert isinstance(capabilities, list) - assert len(capabilities) > 0 + # Output for debugging + print_data("Capabilities", capabilities) -# Deliverable Test ID: INT_LQ_Test_03 (QKD Endpoints Retrieval) +# Test ID: INT_LQ_Test_03 (QKD Endpoints Retrieval) def test_retrieve_endpoints(qkd_driver): qkd_driver.Connect() - # Retrieve and print endpoint information - endpoints = qkd_driver.GetConfig(['endpoints']) + # Retrieve and validate endpoint information + endpoints = qkd_driver.GetConfig([RESOURCE_ENDPOINTS]) - if not endpoints: - pytest.fail("No endpoints found in the system.") + assert isinstance(endpoints, list), "Expected a list of endpoints" + assert len(endpoints) > 0, "No endpoints found in the system" - if isinstance(endpoints[0][1], Exception): - print(f"Error retrieving endpoints: {endpoints[0][1]}") - else: - print("Endpoints:", json.dumps(endpoints, indent=2)) + for endpoint in endpoints: + assert isinstance(endpoint, tuple), "Each endpoint entry must be a tuple" + assert isinstance(endpoint[1], dict), "Endpoint data must be a dictionary" + if isinstance(endpoint[1], Exception): + pytest.fail(f"Error retrieving endpoints: {endpoint[1]}") - assert isinstance(endpoints, list) - assert len(endpoints) > 0 + # Output for debugging + print_data("Endpoints", endpoints) diff --git a/src/device/tests/qkd/unit/test_qkd_error_hanling.py b/src/device/tests/qkd/unit/test_qkd_error_hanling.py index a053e819d14a5093e2d0b5681728df61a5776083..2fbfa4a1529bfffcbe93f68adb9750f92af93566 100644 --- a/src/device/tests/qkd/unit/test_qkd_error_hanling.py +++ b/src/device/tests/qkd/unit/test_qkd_error_hanling.py @@ -1,22 +1,78 @@ -import json import pytest +from requests.exceptions import HTTPError from src.device.service.drivers.qkd.QKDDriver2 import QKDDriver +from requests.exceptions import ConnectionError, Timeout +import requests -def test_error_handling_invalid_operations(): - driver = QKDDriver(address='10.211.36.220', port=11111, username='user', password='pass') - driver.Connect() - result = driver.SetConfig([('/invalid/resource', json.dumps({'invalid': 'data'}))]) +@pytest.fixture +def qkd_driver(): + # Initialize the QKD driver for testing + return QKDDriver(address='10.211.36.220', port=11111, username='user', password='pass') - # Print the result for debugging purposes - print("Result of SetConfig with invalid data:", result) +def test_invalid_operations_on_network_links(qkd_driver): + """ + Test Case ID: SBI_Test_09 - Perform invalid operations and validate error handling. + Objective: Perform invalid operations on network links and ensure proper error handling and logging. + """ + qkd_driver.Connect() - # Check if the result contains ValueError for invalid resource keys - assert all(isinstance(res, ValueError) for res in result), "Expected ValueError for invalid operations" + # Step 1: Perform invalid operation with an incorrect resource key + invalid_payload = { + "invalid_resource_key": { + "invalid_field": "invalid_value" + } + } -def test_network_failure(): - driver = QKDDriver(address='10.211.36.220', port=11111, username='user', password='pass') - driver.Connect() - # Simulate network failure by disconnecting the mock server - # This would require mock server modification to simulate downtime - result = driver.GetConfig(['/qkd_interfaces/qkd_interface']) - assert result == [] # Expecting an empty list instead of None + try: + # Attempt to perform an invalid operation (simulate wrong resource key) + response = requests.post(f'http://{qkd_driver.address}/invalid_resource', json=invalid_payload) + response.raise_for_status() + + except HTTPError as e: + # Step 2: Validate proper error handling and user-friendly messages + print(f"Handled HTTPError: {e}") + assert e.response.status_code in [400, 404], "Expected 400 Bad Request or 404 Not Found for invalid operation." + if e.response.status_code == 404: + assert "Not Found" in e.response.text, "Expected user-friendly 'Not Found' message." + elif e.response.status_code == 400: + assert "Invalid resource key" in e.response.text, "Expected user-friendly 'Bad Request' message." + + except Exception as e: + # Log unexpected exceptions + pytest.fail(f"Unexpected error occurred: {e}") + + finally: + qkd_driver.Disconnect() + +def test_network_failure_simulation(qkd_driver): + """ + Test Case ID: SBI_Test_10 - Simulate network failures and validate resilience and recovery. + Objective: Simulate network failures (e.g., QKD node downtime) and validate system's resilience. + """ + qkd_driver.Connect() + + try: + # Step 1: Simulate network failure (disconnect QKD node, or use unreachable address/port) + qkd_driver_with_failure = QKDDriver(address='10.211.36.220', port=12345, username='user', password='pass') # Valid but incorrect port + + # Try to connect and retrieve state, expecting a failure + response = qkd_driver_with_failure.GetState() + + # Step 2: Validate resilience and recovery mechanisms + # Check if the response is empty, indicating a failure to retrieve state + if not response: + print("Network failure simulated successfully and handled.") + else: + pytest.fail("Expected network failure but received a valid response.") + + except HTTPError as e: + # Log HTTP errors as part of error handling + print(f"Handled network failure error: {e}") + + except Exception as e: + # Step 3: Log unexpected exceptions + print(f"Network failure encountered: {e}") + + finally: + # Step 4: Ensure driver disconnects properly + qkd_driver.Disconnect() diff --git a/src/device/tests/qkd/unit/test_qkd_mock_connectivity.py b/src/device/tests/qkd/unit/test_qkd_mock_connectivity.py index 49afc8efdde6e9e2446c590e5f757cdbba54586c..0874675c02ea6438146c4521de223a30c0105159 100644 --- a/src/device/tests/qkd/unit/test_qkd_mock_connectivity.py +++ b/src/device/tests/qkd/unit/test_qkd_mock_connectivity.py @@ -1,6 +1,6 @@ import pytest from unittest.mock import patch -from src.device.service.drivers.qkd.QKDDriver2 import QKDDriver +from src.device.service.drivers.qkd.QKDDriver import QKDDriver import requests @pytest.fixture diff --git a/src/device/tests/qkd/unit/test_qkd_security.py b/src/device/tests/qkd/unit/test_qkd_security.py index a9602c7833b58e3d702430b92090842fc440bf33..389a5fc1ab246567714ed6cef6b6d6745d541c31 100644 --- a/src/device/tests/qkd/unit/test_qkd_security.py +++ b/src/device/tests/qkd/unit/test_qkd_security.py @@ -1,45 +1,72 @@ -# test_qkd_security.py - -import os import pytest -import requests -import jwt +import json +import os +from requests.exceptions import HTTPError from src.device.service.drivers.qkd.QKDDriver2 import QKDDriver +from src.device.service.drivers.qkd.Tools2 import RESOURCE_CAPABILITES +import requests + +# Helper function to print data in a formatted JSON style for debugging +def print_data(label, data): + print(f"{label}: {json.dumps(data, indent=2)}") -SECRET_KEY = "your_secret_key" +# Environment variables for sensitive information +QKD1_ADDRESS = os.getenv("QKD1_ADDRESS") +PORT = os.getenv("QKD_PORT") +USERNAME = os.getenv("QKD_USERNAME") +PASSWORD = os.getenv("QKD_PASSWORD") -def generate_jwt_token(username: str) -> str: - return jwt.encode({'username': username}, SECRET_KEY, algorithm='HS256') -@pytest.fixture() -def enable_bypass_auth(request): - # Backup the original value of BYPASS_AUTH - original_bypass_auth = os.getenv('BYPASS_AUTH') - # Set BYPASS_AUTH to true for the test - os.environ['BYPASS_AUTH'] = 'true' +# Utility function to retrieve JWT token +def get_jwt_token(address, port, username, password): + url = f"http://{address}:{port}/login" + headers = {"Content-Type": "application/x-www-form-urlencoded"} + payload = f"username={username}&password={password}" - def restore_bypass_auth(): - # Restore the original value of BYPASS_AUTH - if original_bypass_auth is not None: - os.environ['BYPASS_AUTH'] = original_bypass_auth - else: - del os.environ['BYPASS_AUTH'] - - # Add the finalizer to restore the environment variable after the test - request.addfinalizer(restore_bypass_auth) - -@pytest.mark.usefixtures("enable_bypass_auth") -def test_authentication(): - token = generate_jwt_token('wrong_user') - driver = QKDDriver(address='10.211.36.220', port=11111, token=token) - assert driver.Connect() is False - -@pytest.mark.usefixtures("enable_bypass_auth") -def test_authorization(): - token = generate_jwt_token('user') - driver = QKDDriver(address='10.211.36.220', port=11111, token=token) - assert driver.Connect() is True - wrong_token = generate_jwt_token('wrong_user') - headers = {'Authorization': 'Bearer ' + wrong_token} - response = requests.get('http://10.211.36.220:11111/restconf/data/etsi-qkd-sdn-node:qkd_node', headers=headers) - assert response.status_code == 401 + try: + response = requests.post(url, data=payload, headers=headers) + response.raise_for_status() + return response.json().get('access_token') + except requests.exceptions.RequestException as e: + print(f"Failed to retrieve JWT token: {e}") + return None + +# Real QKD Driver (Requires JWT token) +@pytest.fixture +def real_qkd_driver(): + token = get_jwt_token(QKD1_ADDRESS, PORT, USERNAME, PASSWORD) # Replace with actual details + if not token: + pytest.fail("Failed to retrieve JWT token.") + headers = {'Authorization': f'Bearer {token}'} + return QKDDriver(address=QKD1_ADDRESS, port=PORT, headers=headers) + +# Mock QKD Driver (No actual connection, mock capabilities) +@pytest.fixture +def mock_qkd_driver(): + # Initialize the mock QKD driver with mock settings + token = "mock_token" + headers = {"Authorization": f"Bearer {token}"} + return QKDDriver(address='10.211.36.220', port=11111, headers=headers) + +# General function to retrieve and test capabilities +def retrieve_capabilities(qkd_driver, driver_name): + try: + qkd_driver.Connect() + capabilities = qkd_driver.GetConfig([RESOURCE_CAPABILITES]) + assert isinstance(capabilities, list), "Expected a list of capabilities" + assert len(capabilities) > 0, f"No capabilities found for {driver_name}" + print_data(f"{driver_name} Capabilities", capabilities) + except HTTPError as e: + pytest.fail(f"HTTPError while fetching capabilities for {driver_name}: {e}") + except AssertionError as e: + pytest.fail(f"AssertionError: {e}") + except Exception as e: + pytest.fail(f"An unexpected error occurred: {e}") + +# Test for Real QKD Capabilities +def test_real_qkd_capabilities(real_qkd_driver): + retrieve_capabilities(real_qkd_driver, "Real QKD") + +# Test for Mock QKD Capabilities +def test_mock_qkd_capabilities(mock_qkd_driver): + retrieve_capabilities(mock_qkd_driver, "Mock QKD") diff --git a/src/device/tests/qkd/unit/test_qkd_subscription.py b/src/device/tests/qkd/unit/test_qkd_subscription.py index 99a96f08cd06cefa06de388d5ded7e46c5fe7b40..94803862ca2138f17dd7fc117b94387579b3e76e 100644 --- a/src/device/tests/qkd/unit/test_qkd_subscription.py +++ b/src/device/tests/qkd/unit/test_qkd_subscription.py @@ -1,23 +1,36 @@ -# tests/unit/test_qkd_subscription.py - import pytest from src.device.service.drivers.qkd.QKDDriver2 import QKDDriver +from typing import List, Tuple + + +@pytest.fixture +def qkd_driver(): + # Initialize the QKD driver + return QKDDriver(address='10.211.36.220', port=11111, username='user', password='pass') -def test_state_subscription(): - driver = QKDDriver(address='10.211.36.220', port=11111, username='user', password='pass') - driver.Connect() - result = driver.SubscribeState([('/qkd_interfaces/qkd_interface', 1.0, 2.0)]) - assert all(isinstance(res, bool) and res for res in result) -def test_state_unsubscription(): - driver = QKDDriver(address='10.211.36.220', port=11111, username='user', password='pass') - driver.Connect() - result = driver.UnsubscribeState(['/qkd_interfaces/qkd_interface']) - assert all(isinstance(res, bool) and res for res in result) +def test_state_subscription(qkd_driver): + """ + Test Case ID: SBI_Test_06 - Subscribe to state changes and validate the subscription process. + """ + qkd_driver.Connect() -def test_state_retrieval(): - driver = QKDDriver(address='10.211.36.220', port=11111, username='user', password='pass') - driver.Connect() - state = driver.GetState() - assert isinstance(state, dict) or isinstance(state, list) + try: + # Step 1: Define the subscription + subscriptions = [ + ('00000001-0000-0000-0000-000000000000', 60, 10) # (node_id, frequency, timeout) + ] + + # Step 2: Subscribe to state changes using the driver method + subscription_results = qkd_driver.SubscribeState(subscriptions) + + # Step 3: Validate that the subscription was successful + assert all(result is True for result in subscription_results), "Subscription to state changes failed." + print("State subscription successful:", subscription_results) + + except Exception as e: + pytest.fail(f"An unexpected error occurred during state subscription: {e}") + + finally: + qkd_driver.Disconnect() diff --git a/src/device/tests/qkd/unit/test_qkd_unsubscription.py b/src/device/tests/qkd/unit/test_qkd_unsubscription.py new file mode 100644 index 0000000000000000000000000000000000000000..94803862ca2138f17dd7fc117b94387579b3e76e --- /dev/null +++ b/src/device/tests/qkd/unit/test_qkd_unsubscription.py @@ -0,0 +1,36 @@ +import pytest +from src.device.service.drivers.qkd.QKDDriver2 import QKDDriver +from typing import List, Tuple + + +@pytest.fixture +def qkd_driver(): + # Initialize the QKD driver + return QKDDriver(address='10.211.36.220', port=11111, username='user', password='pass') + + +def test_state_subscription(qkd_driver): + """ + Test Case ID: SBI_Test_06 - Subscribe to state changes and validate the subscription process. + """ + qkd_driver.Connect() + + try: + # Step 1: Define the subscription + subscriptions = [ + ('00000001-0000-0000-0000-000000000000', 60, 10) # (node_id, frequency, timeout) + ] + + # Step 2: Subscribe to state changes using the driver method + subscription_results = qkd_driver.SubscribeState(subscriptions) + + # Step 3: Validate that the subscription was successful + assert all(result is True for result in subscription_results), "Subscription to state changes failed." + + print("State subscription successful:", subscription_results) + + except Exception as e: + pytest.fail(f"An unexpected error occurred during state subscription: {e}") + + finally: + qkd_driver.Disconnect() diff --git a/src/device/tests/qkd/unit/test_set_new configuration.py b/src/device/tests/qkd/unit/test_set_new configuration.py new file mode 100644 index 0000000000000000000000000000000000000000..ff57486a1a0022e3b6815b9f9e2d09c7d212b09f --- /dev/null +++ b/src/device/tests/qkd/unit/test_set_new configuration.py @@ -0,0 +1,109 @@ +import pytest +import requests +from requests.exceptions import HTTPError +from src.device.service.drivers.qkd.QKDDriver2 import QKDDriver +from src.device.service.drivers.qkd.Tools2 import RESOURCE_APPS +import uuid + +@pytest.fixture +def qkd_driver1(): + # Initialize the QKD driver for QKD1 + return QKDDriver(address='10.211.36.220', port=11111, username='user', password='pass') + +@pytest.fixture +def qkd_driver3(): + # Initialize the QKD driver for QKD3 + return QKDDriver(address='10.211.36.220', port=33333, username='user', password='pass') + +def create_qkd_app(driver, qkdn_id, backing_qkdl_id, client_app_id=None): + """ + Helper function to create QKD applications on the given driver. + """ + server_app_id = str(uuid.uuid4()) # Generate a unique server_app_id + + app_payload = { + 'app': { + 'server_app_id': server_app_id, + 'client_app_id': client_app_id if client_app_id else [], # Add client_app_id if provided + 'app_status': 'ON', + 'local_qkdn_id': qkdn_id, + 'backing_qkdl_id': backing_qkdl_id + } + } + + try: + # Log the payload being sent + print(f"Sending payload to {driver.address}: {app_payload}") + + # Send POST request to create the application + response = requests.post(f'http://{driver.address}/app/create_qkd_app', json=app_payload) + + # Check if the request was successful (HTTP 2xx) + response.raise_for_status() + + # Validate the response + assert response.status_code == 200, f"Failed to create QKD app for {driver.address}: {response.text}" + + response_data = response.json() + assert response_data.get('status') == 'success', "Application creation failed." + + # Log the response from the server + print(f"Server {driver.address} response: {response_data}") + + return server_app_id # Return the created server_app_id + + except HTTPError as e: + pytest.fail(f"HTTP error occurred while creating the QKD application on {driver.address}: {e}") + except Exception as e: + pytest.fail(f"An unexpected error occurred: {e}") + +def test_create_qkd_application_bidirectional(qkd_driver1, qkd_driver3): + """ + Create QKD applications on both qkd1 and qkd3, and validate the complete creation in both directions. + """ + + qkd_driver1.Connect() + qkd_driver3.Connect() + + try: + # Step 1: Create QKD application for qkd1, referencing qkd3 as the backing QKDL + server_app_id_qkd1 = create_qkd_app( + qkd_driver1, + qkdn_id='00000001-0000-0000-0000-000000000000', + backing_qkdl_id=['00000003-0002-0000-0000-000000000000'] # qkd3's QKDL + ) + + # Step 2: Create QKD application for qkd3, referencing qkd1 as the backing QKDL, and setting client_app_id to qkd1's app + create_qkd_app( + qkd_driver3, + qkdn_id='00000003-0000-0000-0000-000000000000', + backing_qkdl_id=['00000003-0002-0000-0000-000000000000'], # qkd3's QKDL + client_app_id=[server_app_id_qkd1] # Set qkd1 as the client + ) + + # Step 3: Fetch applications from both qkd1 and qkd3 to validate that the applications exist + apps_qkd1 = qkd_driver1.GetConfig([RESOURCE_APPS]) + apps_qkd3 = qkd_driver3.GetConfig([RESOURCE_APPS]) + + print(f"QKD1 applications config: {apps_qkd1}") + print(f"QKD3 applications config: {apps_qkd3}") + + # Debugging: Print the full structure of the apps to understand what is returned + for app in apps_qkd1: + print(f"QKD1 App: {app}") + + # Debugging: Print the full structure of the apps to understand what is returned + for app in apps_qkd3: + print(f"QKD3 App: {app}") + + # Step 4: Validate the applications are created using app_id instead of server_app_id + assert any(app[1].get('app_id') == '00000001-0001-0000-0000-000000000000' for app in apps_qkd1), "QKD app not created on qkd1." + assert any(app[1].get('app_id') == '00000003-0001-0000-0000-000000000000' for app in apps_qkd3), "QKD app not created on qkd3." + + print("QKD applications created successfully in both directions.") + + except Exception as e: + pytest.fail(f"An unexpected error occurred: {e}") + finally: + qkd_driver1.Disconnect() + qkd_driver3.Disconnect()