import os import json import logging import requests import threading from requests.auth import HTTPBasicAuth from typing import Any, List, Optional, Tuple, Union from common.method_wrappers.Decorator import MetricsPool, metered_subclass_method from common.type_checkers.Checkers import chk_string, chk_type from device.service.driver_api._Driver import _Driver from .Tools2 import config_getter, create_connectivity_link LOGGER = logging.getLogger(__name__) DRIVER_NAME = 'qkd' METRICS_POOL = MetricsPool('Device', 'Driver', labels={'driver': DRIVER_NAME}) class QKDDriver(_Driver): def __init__(self, address: str, port: int, **settings) -> None: LOGGER.info(f"Initializing QKDDriver with address={address}, port={port}, settings={settings}") super().__init__(DRIVER_NAME, address, port, **settings) self.__lock = threading.Lock() self.__started = threading.Event() self.__terminate = threading.Event() self.__auth = None self.__headers = {} self.__qkd_root = os.getenv('QKD_API_URL', '{:s}://{:s}:{:d}'.format(settings.get('scheme', 'http'), self.address, int(self.port))) self.__timeout = int(self.settings.get('timeout', 120)) self.__node_ids = set(self.settings.get('node_ids', [])) self.__initial_data = None # Authentication settings self.__username = settings.get('username') self.__password = settings.get('password') self.__use_jwt = settings.get('use_jwt', True) # Default to True if JWT is required self.__token = settings.get('token') if self.__token: self.__headers = {'Authorization': 'Bearer ' + self.__token} elif self.__username and self.__password: self.__auth = HTTPBasicAuth(self.__username, self.__password) LOGGER.info(f"QKDDriver initialized with QKD root URL: {self.__qkd_root}") def authenticate(self) -> bool: if self.__use_jwt and not self.__token: return self.__authenticate_with_jwt() return True def __authenticate_with_jwt(self) -> bool: login_url = f'{self.__qkd_root}/login' payload = {'username': self.__username, 'password': self.__password} try: LOGGER.info(f'Attempting to authenticate with JWT at {login_url}') response = requests.post(login_url, data=payload, timeout=self.__timeout) response.raise_for_status() token = response.json().get('access_token') if not token: LOGGER.error('Failed to retrieve access token') return False self.__token = token # Store the token self.__headers = {'Authorization': f'Bearer {token}'} LOGGER.info('JWT authentication successful') return True except requests.exceptions.RequestException as e: LOGGER.exception(f'JWT authentication failed: {e}') return False def Connect(self) -> bool: url = self.__qkd_root + '/restconf/data/etsi-qkd-sdn-node:qkd_node' with self.__lock: LOGGER.info(f"Starting connection to {url}") if self.__started.is_set(): LOGGER.info("Already connected, skipping re-connection.") return True try: if not self.__headers and not self.__auth: LOGGER.info("No headers or auth found, calling authenticate.") if not self.authenticate(): return False LOGGER.info(f'Attempting to connect to {url} with headers {self.__headers} and timeout {self.__timeout}') response = requests.get(url, timeout=self.__timeout, verify=False, headers=self.__headers, auth=self.__auth) LOGGER.info(f'Received response: {response.status_code}, content: {response.text}') response.raise_for_status() self.__initial_data = response.json() self.__started.set() LOGGER.info('Connection successful') return True except requests.exceptions.RequestException as e: LOGGER.error(f'Connection failed: {e}') return False def Disconnect(self) -> bool: LOGGER.info("Disconnecting QKDDriver") with self.__lock: self.__terminate.set() LOGGER.info("QKDDriver disconnected successfully") return True @metered_subclass_method(METRICS_POOL) def GetInitialConfig(self) -> List[Tuple[str, Any]]: LOGGER.info("Getting initial configuration") with self.__lock: if isinstance(self.__initial_data, dict): initial_config = [('qkd_node', self.__initial_data.get('qkd_node', {}))] LOGGER.info(f"Initial configuration: {initial_config}") return initial_config LOGGER.warning("Initial data is not a dictionary") return [] @metered_subclass_method(METRICS_POOL) def GetConfig(self, resource_keys: List[str] = []) -> List[Tuple[str, Union[Any, None, Exception]]]: chk_type('resources', resource_keys, list) LOGGER.info(f"Getting configuration for resource_keys: {resource_keys}") results = [] with self.__lock: if not resource_keys: resource_keys = ['capabilities', 'interfaces', 'links', 'endpoints', 'apps'] for i, resource_key in enumerate(resource_keys): chk_string(f'resource_key[{i}]', resource_key, allow_empty=False) LOGGER.info(f"Retrieving resource key: {resource_key}") resource_results = config_getter( self.__qkd_root, resource_key, timeout=self.__timeout, headers=self.__headers, auth=self.__auth, node_ids=self.__node_ids) results.extend(resource_results) LOGGER.info(f"Resource results for {resource_key}: {resource_results}") LOGGER.info(f"Final configuration results: {results}") return results @metered_subclass_method(METRICS_POOL) def SetConfig(self, resources: List[Tuple[str, Any]]) -> List[Union[bool, Exception]]: LOGGER.info(f"Setting configuration for resources: {resources}") results = [] if not resources: LOGGER.warning("No resources provided for SetConfig") return results with self.__lock: for resource_key, resource_value in resources: LOGGER.info(f'Processing resource_key: {resource_key}, resource_value: {resource_value}') if resource_key.startswith('/link'): try: if not isinstance(resource_value, dict): raise TypeError(f"Expected dictionary but got {type(resource_value).__name__}") link_uuid = resource_value.get('uuid') node_id_src = resource_value.get('src_qkdn_id') interface_id_src = resource_value.get('src_interface_id') node_id_dst = resource_value.get('dst_qkdn_id') interface_id_dst = resource_value.get('dst_interface_id') virt_prev_hop = resource_value.get('virt_prev_hop') virt_next_hops = resource_value.get('virt_next_hops') virt_bandwidth = resource_value.get('virt_bandwidth') LOGGER.info(f"Creating connectivity link with UUID: {link_uuid}") create_connectivity_link( self.__qkd_root, link_uuid, node_id_src, interface_id_src, node_id_dst, interface_id_dst, virt_prev_hop, virt_next_hops, virt_bandwidth, headers=self.__headers, timeout=self.__timeout, auth=self.__auth ) results.append(True) LOGGER.info(f"Connectivity link {link_uuid} created successfully") except Exception as e: LOGGER.exception(f'Unhandled error processing resource_key({resource_key})') results.append(e) else: LOGGER.error(f'Invalid resource key detected: {resource_key}') results.append(ValueError(f'Invalid resource key: {resource_key}')) LOGGER.info(f"SetConfig results: {results}") return results @metered_subclass_method(METRICS_POOL) def DeleteConfig(self, resources: List[Tuple[str, Any]]) -> List[Union[bool, Exception]]: LOGGER.info(f"Deleting configuration for resources: {resources}") results = [] if not resources: LOGGER.warning("No resources provided for DeleteConfig") return results with self.__lock: for resource in resources: LOGGER.info(f'Resource to delete: {resource}') uuid = resource[1].get('uuid') if uuid: LOGGER.info(f'Resource with UUID {uuid} deleted successfully') results.append(True) else: LOGGER.warning(f"UUID not found in resource: {resource}") results.append(False) LOGGER.info(f"DeleteConfig results: {results}") return results @metered_subclass_method(METRICS_POOL) def SubscribeState(self, subscriptions: List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]: LOGGER.info(f"Subscribing to state updates: {subscriptions}") results = [True for _ in subscriptions] LOGGER.info(f"Subscription results: {results}") return results @metered_subclass_method(METRICS_POOL) def UnsubscribeState(self, subscriptions: List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]: LOGGER.info(f"Unsubscribing from state updates: {subscriptions}") results = [True for _ in subscriptions] LOGGER.info(f"Unsubscription results: {results}") return results @metered_subclass_method(METRICS_POOL) def GetState(self, blocking=False, terminate: Optional[threading.Event] = None) -> Union[dict, list]: LOGGER.info(f"GetState called with blocking={blocking}, terminate={terminate}") url = self.__qkd_root + '/restconf/data/etsi-qkd-sdn-node:qkd_node' try: LOGGER.info(f"Making GET request to {url} to retrieve state") response = requests.get(url, timeout=self.__timeout, verify=False, headers=self.__headers, auth=self.__auth) LOGGER.info(f"Received state response: {response.status_code}, content: {response.text}") response.raise_for_status() state_data = response.json() LOGGER.info(f"State data retrieved: {state_data}") return state_data except requests.exceptions.Timeout: LOGGER.error(f'Timeout getting state from {self.__qkd_root}') return [] except Exception as e: LOGGER.error(f'Exception getting state from {self.__qkd_root}: {e}') return []