Skip to content
Snippets Groups Projects
QKDDriver2.py 10.2 KiB
Newer Older
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import json
import logging
import requests
import threading
from requests.auth import HTTPBasicAuth
from typing import Any, List, Optional, Tuple, Union
from common.method_wrappers.Decorator import MetricsPool, metered_subclass_method
from common.type_checkers.Checkers import chk_string, chk_type
from device.service.driver_api._Driver import _Driver
from .Tools2 import config_getter, create_connectivity_link
from device.service.driver_api._Driver import _Driver
from . import ALL_RESOURCE_KEYS

LOGGER = logging.getLogger(__name__)

DRIVER_NAME = 'qkd'
METRICS_POOL = MetricsPool('Device', 'Driver', labels={'driver': DRIVER_NAME})


class QKDDriver(_Driver):
    def __init__(self, address: str, port: int, **settings) -> None:
        LOGGER.info(f"Initializing QKDDriver with address={address}, port={port}, settings={settings}")
        super().__init__(DRIVER_NAME, address, port, **settings)
        self.__lock = threading.Lock()
        self.__started = threading.Event()
        self.__terminate = threading.Event()
        self.__auth = None
        self.__headers = {}
        self.__qkd_root = os.getenv('QKD_API_URL', f"http://{self.address}:{self.port}")  # Simplified URL management
        self.__timeout = int(self.settings.get('timeout', 120))
        self.__node_ids = set(self.settings.get('node_ids', []))
        self.__initial_data = None

        # Optionally pass headers for authentication (e.g., JWT)
        self.__headers = settings.get('headers', {})
        self.__auth = settings.get('auth', None)

        LOGGER.info(f"QKDDriver initialized with QKD root URL: {self.__qkd_root}")

    def Connect(self) -> bool:
        url = self.__qkd_root + '/restconf/data/etsi-qkd-sdn-node:qkd_node'
        with self.__lock:
            LOGGER.info(f"Starting connection to {url}")
            if self.__started.is_set():
                LOGGER.info("Already connected, skipping re-connection.")
                return True

            try:
                LOGGER.info(f'Attempting to connect to {url} with headers {self.__headers} and timeout {self.__timeout}')
                response = requests.get(url, timeout=self.__timeout, verify=False, headers=self.__headers, auth=self.__auth)
                LOGGER.info(f'Received response: {response.status_code}, content: {response.text}')
                response.raise_for_status()
                self.__initial_data = response.json()
                self.__started.set()
                LOGGER.info('Connection successful')
                return True
            except requests.exceptions.RequestException as e:
                LOGGER.error(f'Connection failed: {e}')
                return False

    def Disconnect(self) -> bool:
        LOGGER.info("Disconnecting QKDDriver")
        with self.__lock:
            self.__terminate.set()
            LOGGER.info("QKDDriver disconnected successfully")
            return True

    @metered_subclass_method(METRICS_POOL)
    def GetInitialConfig(self) -> List[Tuple[str, Any]]:
        LOGGER.info("Getting initial configuration")
        with self.__lock:
            if isinstance(self.__initial_data, dict):
                initial_config = [('qkd_node', self.__initial_data.get('qkd_node', {}))]
                LOGGER.info(f"Initial configuration: {initial_config}")
                return initial_config
            LOGGER.warning("Initial data is not a dictionary")
            return []

    @metered_subclass_method(METRICS_POOL)
    def GetConfig(self, resource_keys: List[str] = []) -> List[Tuple[str, Union[Any, None, Exception]]]:
        chk_type('resources', resource_keys, list)
        LOGGER.info(f"Getting configuration for resource_keys: {resource_keys}")
        results = []
        with self.__lock:
            if not resource_keys:
                resource_keys = ALL_RESOURCE_KEYS
            for i, resource_key in enumerate(resource_keys):
                chk_string(f'resource_key[{i}]', resource_key, allow_empty=False)
                LOGGER.info(f"Retrieving resource key: {resource_key}")
                resource_results = config_getter(
                    self.__qkd_root, resource_key, timeout=self.__timeout, headers=self.__headers, auth=self.__auth)
                results.extend(resource_results)
                LOGGER.info(f"Resource results for {resource_key}: {resource_results}")
        LOGGER.info(f"Final configuration results: {results}")
        return results

    @metered_subclass_method(METRICS_POOL)
    def SetConfig(self, resources: List[Tuple[str, Any]]) -> List[Union[bool, Exception]]:
        results = []
        with self.__lock:
            for resource_key, resource_value in resources:
                LOGGER.info('Processing resource_key = {:s}'.format(str(resource_key)))
                        # Ensure resource_value is deserialized
                        if isinstance(resource_value, str):
                            resource_value = json.loads(resource_value)
                        
                        # Extract values from resource_value dictionary
                        link_uuid = resource_value['uuid']
                        node_id_src = resource_value['src_qkdn_id']
                        interface_id_src = resource_value['src_interface_id']
                        node_id_dst = resource_value['dst_qkdn_id']
                        interface_id_dst = resource_value['dst_interface_id']
                        virt_prev_hop = resource_value.get('virt_prev_hop')
                        virt_next_hops = resource_value.get('virt_next_hops')
                        virt_bandwidth = resource_value.get('virt_bandwidth')

                        # Call create_connectivity_link with the extracted values
                        LOGGER.info(f"Creating connectivity link with UUID: {link_uuid}")
                            self.__qkd_root, link_uuid, node_id_src, interface_id_src, node_id_dst, interface_id_dst,
                            virt_prev_hop, virt_next_hops, virt_bandwidth,
                            timeout=self.__timeout, auth=self.__auth
                        results.append(True)
                        LOGGER.info(f"Connectivity link {link_uuid} created successfully")
                        # Catch and log any unhandled exceptions
                        LOGGER.exception(f'Unhandled error processing resource_key({resource_key})')
                        results.append(e)
                else:
                    # Skip unsupported resource keys and append success
                    results.append(True)

        # Logging test results
        LOGGER.info('Test keys: ' + str([x for x,y in resources]))
        LOGGER.info('Test values: ' + str(results))

        return results

    @metered_subclass_method(METRICS_POOL)
    def DeleteConfig(self, resources: List[Tuple[str, Any]]) -> List[Union[bool, Exception]]:
        LOGGER.info(f"Deleting configuration for resources: {resources}")
        results = []
        if not resources:
            LOGGER.warning("No resources provided for DeleteConfig")
            return results
        with self.__lock:
            for resource in resources:
                LOGGER.info(f'Resource to delete: {resource}')
                uuid = resource[1].get('uuid')
                if uuid:
                    LOGGER.info(f'Resource with UUID {uuid} deleted successfully')
                    results.append(True)
                else:
                    LOGGER.warning(f"UUID not found in resource: {resource}")
                    results.append(False)
        LOGGER.info(f"DeleteConfig results: {results}")
        return results

    @metered_subclass_method(METRICS_POOL)
    def SubscribeState(self, subscriptions: List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]:
        LOGGER.info(f"Subscribing to state updates: {subscriptions}")
        results = [True for _ in subscriptions]
        LOGGER.info(f"Subscription results: {results}")
        return results

    @metered_subclass_method(METRICS_POOL)
    def UnsubscribeState(self, subscriptions: List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]:
        LOGGER.info(f"Unsubscribing from state updates: {subscriptions}")
        results = [True for _ in subscriptions]
        LOGGER.info(f"Unsubscription results: {results}")
        return results

    @metered_subclass_method(METRICS_POOL)
    def GetState(self, blocking=False, terminate: Optional[threading.Event] = None) -> Union[dict, list]:
        LOGGER.info(f"GetState called with blocking={blocking}, terminate={terminate}")
        url = self.__qkd_root + '/restconf/data/etsi-qkd-sdn-node:qkd_node'
        try:
            LOGGER.info(f"Making GET request to {url} to retrieve state")
            response = requests.get(url, timeout=self.__timeout, verify=False, headers=self.__headers, auth=self.__auth)
            LOGGER.info(f"Received state response: {response.status_code}, content: {response.text}")
            response.raise_for_status()
            state_data = response.json()
            LOGGER.info(f"State data retrieved: {state_data}")
            return state_data
        except requests.exceptions.Timeout:
            LOGGER.error(f'Timeout getting state from {self.__qkd_root}')
            return []
        except Exception as e:
            LOGGER.error(f'Exception getting state from {self.__qkd_root}: {e}')
            return []