import json
import logging
import requests
from requests.auth import HTTPBasicAuth
from typing import Dict, Optional, Set, List, Tuple, Union, Any
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

LOGGER = logging.getLogger(__name__)

HTTP_OK_CODES = {
    200,  # OK
    201,  # Created
    202,  # Accepted
    204,  # No Content
}


def get_request_session(retries=5, backoff_factor=1.0, status_forcelist=(500, 502, 504)):
    """
    Creates a requests session with retries and backoff strategy.

    The same retry policy is mounted for both ``http://`` and ``https://``.
    The caller owns the returned session and should ``close()`` it when done.

    Args:
        retries: Value used for the total, read and connect retry budgets.
        backoff_factor: Backoff factor handed to urllib3's ``Retry``.
        status_forcelist: HTTP status codes that trigger a retry.

    Returns:
        A configured ``requests.Session``.
    """
    LOGGER.info(f"Creating request session with retries={retries}, backoff_factor={backoff_factor}, status_forcelist={status_forcelist}")
    session = requests.Session()
    retry = Retry(
        total=retries,
        read=retries,
        connect=retries,
        backoff_factor=backoff_factor,
        status_forcelist=status_forcelist,
    )
    adapter = HTTPAdapter(max_retries=retry)
    session.mount('http://', adapter)
    session.mount('https://', adapter)
    LOGGER.info("Request session created successfully")
    return session


def find_key(resource, key):
    """
    Extracts a specific key from a JSON resource.

    Args:
        resource: Indexable whose element ``[1]`` is a JSON document (string).
        key: Key to look up in the decoded document.

    Returns:
        The value stored under ``key``.

    Raises:
        KeyError: If ``key`` is absent from the decoded document.
        ValueError: If ``resource[1]`` is not valid JSON.
    """
    return json.loads(resource[1])[key]


def verify_endpoint_existence(session, endpoint_uuid, root_url, headers,
                              auth=None, timeout=None, verify=None):
    """
    Verifies if the given endpoint exists.

    Issues a GET for the ``qkd_interface`` entry identified by
    ``endpoint_uuid`` and reports whether the node answered 200 with a
    non-empty JSON body.

    Args:
        session: Session-like object providing ``get``.
        endpoint_uuid: Identifier placed in the ``qkd_interface=`` URL segment.
        root_url: Base URL of the node.
        headers: HTTP headers to send.
        auth: Optional credentials; ``None`` sends none.
        timeout: Optional request timeout; ``None`` means no timeout.
        verify: Optional TLS verification override; ``None`` keeps the
            session's own setting.

    Returns:
        True if the endpoint exists, False otherwise. Transport errors and
        undecodable bodies are logged and reported as False instead of being
        raised, so the function always honours its boolean contract.
    """
    url = f"{root_url}/restconf/data/etsi-qkd-sdn-node:qkd_node/qkd_interfaces/qkd_interface={endpoint_uuid}"
    try:
        r = session.get(url, headers=headers, auth=auth, timeout=timeout, verify=verify)
        exists = r.status_code == 200 and bool(r.json())
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers JSON decoding failures on requests versions whose
        # decode error is not a RequestException subclass.
        LOGGER.error(f"Endpoint {endpoint_uuid} does not exist or is not accessible: {e}")
        return False

    if exists:
        return True
    LOGGER.error(f"Endpoint {endpoint_uuid} does not exist or is not accessible")
    return False


def _get_json(session, url, timeout, auth, headers):
    """GET ``url``, log the exchange, raise on HTTP error status, return the decoded JSON body."""
    LOGGER.info(f"Making GET request to {url} with headers: {headers}")
    r = session.get(url, timeout=timeout, verify=False, auth=auth, headers=headers)
    LOGGER.info(f"Received response: {r.status_code}, content: {r.text}")
    r.raise_for_status()
    return r.json()


def _attachment_point_uuid(interface):
    """Build the ``<device>:<port>`` identifier from an interface's ``qkdi_att_point``."""
    att_point = interface['qkdi_att_point']
    return f"{att_point.get('device', 'N/A')}:{att_point.get('port', 'N/A')}"


def config_getter(
    root_url: str,
    resource_key: str,
    auth: Optional[HTTPBasicAuth] = None,
    timeout: Optional[int] = None,
    node_ids: Optional[Set[str]] = None,
    headers: Optional[Dict[str, str]] = None
) -> List[Tuple[str, Union[Dict[str, Any], Exception]]]:
    """
    Fetches configuration data from a QKD node for a specified resource key.

    Args:
        root_url: Base URL of the node.
        resource_key: Which resource family to read (endpoints/interfaces,
            links/network instances, apps, capabilities or node).
        auth: Optional HTTP basic-auth credentials.
        timeout: Optional request timeout.
        node_ids: Accepted for interface compatibility; not used here.
        headers: Optional HTTP headers; defaults to no extra headers.

    Returns:
        A list of tuples containing the resource key and the corresponding
        data or exception. Request failures, missing keys and undecodable
        bodies are logged and appended as ``(resource_key, exception)`` rather
        than raised; entries collected before the failure are kept.
    """
    # None sentinels avoid sharing one mutable default object across calls.
    headers = {} if headers is None else headers

    url = f"{root_url}/restconf/data/etsi-qkd-sdn-node:qkd_node/"
    result = []
    session = get_request_session()
    LOGGER.info(f"Starting config_getter with root_url={root_url}, resource_key={resource_key}, headers={headers}")

    try:
        if resource_key in ('endpoints', '__endpoints__', 'interfaces'):
            url += 'qkd_interfaces/'
            payload = _get_json(session, url, timeout, auth, headers)
            interfaces = payload.get('qkd_interfaces', {}).get('qkd_interface', [])
            if not interfaces:
                raise KeyError('qkd_interfaces')

            for interface in interfaces:
                uuid = _attachment_point_uuid(interface)
                interface['uuid'] = uuid
                if resource_key in ('endpoints', '__endpoints__'):
                    result.append((f"/endpoints/endpoint[{uuid}]", interface))
                else:
                    interface['name'] = uuid
                    interface['enabled'] = True
                    result.append((f"/interface[{interface['qkdi_id']}]", interface))

        elif resource_key in ('links', '__links__', '__network_instances__', 'network_instances'):
            url += 'qkd_links/'
            payload = _get_json(session, url, timeout, auth, headers)
            links = payload.get('qkd_links', {}).get('qkd_link', [])
            if not links:
                LOGGER.warning(f"No links found in the response for 'qkd_links'")

            # NOTE(review): only the exact key 'links' yields '/link[...]'
            # entries; '__links__' is routed with the network-instance keys and
            # yields '/service[...]' entries. Confirm this is intended.
            # NOTE(review): types are compared to 'Direct'/'Virtual' while
            # create_connectivity_link writes 'etsi-qkd-node-types:PHYS'/'VIRT';
            # verify which spelling the node actually returns.
            for link in links:
                link_type = link.get('qkdl_type', 'Direct')
                if resource_key == 'links':
                    if link_type == 'Direct':
                        result.append((f"/link[{link['qkdl_id']}]", link))
                elif link_type == 'Virtual':
                    result.append((f"/service[{link['qkdl_id']}]", link))

        elif resource_key in ('apps', '__apps__'):
            url += 'qkd_applications/'
            payload = _get_json(session, url, timeout, auth, headers)
            apps = payload.get('qkd_applications', {}).get('qkd_app', [])
            if not apps:
                raise KeyError('qkd_applications')

            for app in apps:
                result.append((f"/app[{app['app_id']}]", app))

        elif resource_key in ('capabilities', '__capabilities__'):
            url += 'qkdn_capabilities/'
            capabilities = _get_json(session, url, timeout, auth, headers)
            result.append((resource_key, capabilities))

        elif resource_key in ('node', '__node__'):
            node = _get_json(session, url, timeout, auth, headers).get('qkd_node', {})
            result.append((resource_key, node))

        else:
            LOGGER.warning(f"Unknown resource key: {resource_key}")
            result.append((resource_key, ValueError(f"Unknown resource key: {resource_key}")))

    except (requests.exceptions.RequestException, KeyError, ValueError) as e:
        # KeyError: empty/missing sections or fields in the payload.
        # ValueError: body that cannot be decoded as JSON.
        LOGGER.error(f'Exception retrieving/parsing {resource_key} from {url}: {e}')
        result.append((resource_key, e))
    finally:
        # Release the session's pooled connections.
        session.close()

    LOGGER.info(f"config_getter results for {resource_key}: {result}")
    return result


def create_connectivity_link(
    root_url: str,
    link_uuid: str,
    node_id_src: str,
    interface_id_src: str,
    node_id_dst: str,
    interface_id_dst: str,
    virt_prev_hop: Optional[str] = None,
    virt_next_hops: Optional[List[str]] = None,
    virt_bandwidth: Optional[int] = None,
    auth: Optional[HTTPBasicAuth] = None,
    timeout: Optional[int] = None,
    headers: Optional[Dict[str, str]] = None
) -> Union[bool, Exception]:
    """
    Creates a connectivity link between QKD nodes using the provided parameters.

    Both interfaces are first checked with ``verify_endpoint_existence``; the
    link is then POSTed to the node's ``qkd_links/`` collection. The link is
    marked virtual when ``virt_prev_hop`` or ``virt_next_hops`` is truthy, in
    which case the ``virt_*`` fields are included in the payload.

    Args:
        root_url: Base URL of the node.
        link_uuid: Identifier stored as ``qkdl_id``.
        node_id_src: Local node id.
        interface_id_src: Local interface id.
        node_id_dst: Remote node id.
        interface_id_dst: Remote interface id.
        virt_prev_hop: Previous hop for a virtual link.
        virt_next_hops: Next hops for a virtual link.
        virt_bandwidth: Bandwidth for a virtual link.
        auth: Optional HTTP basic-auth credentials.
        timeout: Optional request timeout.
        headers: Optional HTTP headers; defaults to no extra headers.

    Returns:
        True when the node answers with a status in ``HTTP_OK_CODES``; False
        for any other non-error status; an ``Exception`` instance (returned,
        not raised) when endpoint verification or the request fails.
    """
    # None sentinel avoids sharing one mutable default dict across calls.
    headers = {} if headers is None else headers

    url = f"{root_url}/restconf/data/etsi-qkd-sdn-node:qkd_node/qkd_links/"
    session = get_request_session()

    try:
        # Verify that endpoints exist before creating the link. The checks use
        # the same credentials, timeout and TLS setting as the POST below, so
        # they cannot fail for transport reasons the POST would not hit.
        endpoints_ok = (
            verify_endpoint_existence(session, interface_id_src, root_url, headers,
                                      auth=auth, timeout=timeout, verify=False)
            and verify_endpoint_existence(session, interface_id_dst, root_url, headers,
                                          auth=auth, timeout=timeout, verify=False)
        )
        if not endpoints_ok:
            LOGGER.error(f"Cannot create link {link_uuid} because one or both endpoints do not exist.")
            return Exception(f"Endpoint verification failed for link {link_uuid}")

        is_virtual = bool(virt_prev_hop or virt_next_hops)

        qkd_link = {
            'qkdl_id': link_uuid,
            'qkdl_type': 'etsi-qkd-node-types:' + ('VIRT' if is_virtual else 'PHYS'),
            'qkdl_local': {
                'qkdn_id': node_id_src,
                'qkdi_id': interface_id_src
            },
            'qkdl_remote': {
                'qkdn_id': node_id_dst,
                'qkdi_id': interface_id_dst
            }
        }

        if is_virtual:
            qkd_link['virt_prev_hop'] = virt_prev_hop
            qkd_link['virt_next_hop'] = virt_next_hops or []
            qkd_link['virt_bandwidth'] = virt_bandwidth

        data = {'qkd_links': {'qkd_link': [qkd_link]}}

        LOGGER.info(f"Creating connectivity link with payload: {json.dumps(data)}")

        try:
            r = session.post(url, json=data, timeout=timeout, verify=False, auth=auth, headers=headers)
            LOGGER.info(f"Received response for link creation: {r.status_code}, content: {r.text}")
            r.raise_for_status()
            # raise_for_status only rejects 4xx/5xx, so other statuses
            # (e.g. redirects) can still reach the failure branch.
            if r.status_code in HTTP_OK_CODES:
                LOGGER.info(f"Link {link_uuid} created successfully.")
                return True
            LOGGER.error(f"Failed to create link {link_uuid}, status code: {r.status_code}")
            return False
        except requests.exceptions.RequestException as e:
            LOGGER.error(f"Exception creating link {link_uuid} with payload {json.dumps(data)}: {e}")
            return e
    finally:
        # Release the session's pooled connections.
        session.close()