Skip to content
Snippets Groups Projects
Tools2.py 11.6 KiB
Newer Older
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import logging
import requests
from typing import Dict, Optional, Set, List, Tuple, Union, Any
from device.service.driver_api._Driver import RESOURCE_ENDPOINTS, RESOURCE_INTERFACES, RESOURCE_NETWORK_INSTANCES
from . import RESOURCE_APPS, RESOURCE_LINKS, RESOURCE_CAPABILITES, RESOURCE_NODE
HTTP_OK_CODES = {200, 201, 202, 204}

def find_key(resource: Tuple[str, str], key: str) -> Any:
    return json.loads(resource[1]).get(key)
    root_url: str, resource_key: str, auth: Optional[Any] = None, timeout: Optional[int] = None,
    node_ids: Set[str] = set(), headers: Dict[str, str] = {}
) -> List[Tuple[str, Union[Dict[str, Any], Exception]]]:
    """
    Fetches configuration data from a QKD node for a specified resource key.
    Returns a list of tuples containing the resource key and the corresponding data or exception.
    The function is agnostic to authentication: headers and auth are passed from external sources.
    """
    url = f"{root_url}/restconf/data/etsi-qkd-sdn-node:qkd_node/"
    LOGGER.info(f"Fetching configuration for {resource_key} from {root_url}")

    try:
        if resource_key in [RESOURCE_ENDPOINTS, RESOURCE_INTERFACES]:
            return fetch_interfaces(url, resource_key, headers, auth, timeout)

        elif resource_key in [RESOURCE_LINKS, RESOURCE_NETWORK_INSTANCES]:
            return fetch_links(url, resource_key, headers, auth, timeout)

        elif resource_key in [RESOURCE_APPS]:
            return fetch_apps(url, resource_key, headers, auth, timeout)

        elif resource_key in [RESOURCE_CAPABILITES]:
            return fetch_capabilities(url, resource_key, headers, auth, timeout)
        elif resource_key in [RESOURCE_NODE]:
            return fetch_node(url, resource_key, headers, auth, timeout)
        else:
            LOGGER.warning(f"Unknown resource key: {resource_key}")
            return [(resource_key, ValueError(f"Unknown resource key: {resource_key}"))]

    except requests.exceptions.RequestException as e:
        LOGGER.error(f'Error retrieving/parsing {resource_key} from {url}: {e}')
        return [(resource_key, e)]


def fetch_interfaces(url: str, resource_key: str, headers: Dict[str, str], auth: Optional[Any], timeout: Optional[int]) -> List[Tuple[str, Union[Dict[str, Any], Exception]]]:
    """
    Fetches interface data from the QKD node. Adapts to both mocked and real QKD data structures.
    """
    result = []
    url += 'qkd_interfaces/'
    
        r = requests.get(url, timeout=timeout, verify=False, auth=auth, headers=headers)
        r.raise_for_status()

        # Handle both real and mocked QKD response structures
        response_data = r.json()

        if isinstance(response_data.get('qkd_interfaces'), dict):
            interfaces = response_data.get('qkd_interfaces', {}).get('qkd_interface', [])
        else:
            interfaces = response_data.get('qkd_interface', [])

        for interface in interfaces:
            if resource_key in [RESOURCE_ENDPOINTS]:
                # Handle real QKD data format
                resource_value = interface.get('qkdi_att_point', {})
                if 'device' in resource_value and 'port' in resource_value:
                    uuid = f"{resource_value['device']}:{resource_value['port']}"
                    resource_key_with_uuid = f"/endpoints/endpoint[{uuid}]"
                    resource_value['uuid'] = uuid

                    # Add sample types (for demonstration purposes)
                    sample_types = {}
                    metric_name = 'KPISAMPLETYPE_LINK_TOTAL_CAPACITY_GBPS'
                    metric_id = 301
                    metric_name = metric_name.lower().replace('kpisampletype_', '')
                    monitoring_resource_key = '{:s}/state/{:s}'.format(resource_key, metric_name)
                    sample_types[metric_id] = monitoring_resource_key
                    resource_value['sample_types'] = sample_types

                    result.append((resource_key_with_uuid, resource_value))

            else:
                # Handle both real and mocked QKD formats
                endpoint_value = interface.get('qkdi_att_point', {})
                if 'device' in endpoint_value and 'port' in endpoint_value:
                    # Real QKD data format
                    interface_uuid = f"{endpoint_value['device']}:{endpoint_value['port']}"
                    interface['uuid'] = interface_uuid
                    interface['name'] = interface_uuid
                    interface['enabled'] = True  # Assume enabled for real data
                else:
                    # Mocked QKD data format
                    interface_uuid = interface.get('uuid', f"/interface[{interface['qkdi_id']}]")
                    interface['uuid'] = interface_uuid
                    interface['name'] = interface.get('name', interface_uuid)
                    interface['enabled'] = interface.get('enabled', False)  # Mocked enabled status

                result.append((f"/interface[{interface['qkdi_id']}]", interface))

    except requests.RequestException as e:
        LOGGER.error(f"Error fetching interfaces from {url}: {e}")
        result.append((resource_key, e))
    
    return result

def fetch_links(url: str, resource_key: str, headers: Dict[str, str], auth: Optional[Any], timeout: Optional[int]) -> List[Tuple[str, Union[Dict[str, Any], Exception]]]:
    """
    Fetches link data from the QKD node. Adapts to both mocked and real QKD data structures.
    """
    result = []
    
    if resource_key in [RESOURCE_LINKS, RESOURCE_NETWORK_INSTANCES]:
        url += 'qkd_links/'
        
        try:
            r = requests.get(url, timeout=timeout, verify=False, auth=auth, headers=headers)
            
            # Handle real and mocked QKD data structures
            links = r.json().get('qkd_links', [])
                # For real QKD format (QKD links returned as dictionary objects)
                if isinstance(link, dict):
                    qkdl_id = link.get('qkdl_id')
                    link_type = link.get('qkdl_type', 'Direct')
                    
                    # Handle both real (PHYS, VIRT) and mocked (DIRECT) link types
                    if link_type == 'PHYS' or link_type == 'VIRT':
                        resource_key_direct = f"/link[{qkdl_id}]"
                        result.append((resource_key_direct, link))
                    elif link_type == 'DIRECT':
                        # Mocked QKD format has a slightly different structure
                        result.append((f"/link/link[{qkdl_id}]", link))
                # For mocked QKD format (QKD links returned as lists)
                elif isinstance(link, list):
                    for l in link:
                        qkdl_id = l.get('uuid')
                        link_type = l.get('type', 'Direct')
                        
                        if link_type == 'DIRECT':
                            resource_key_direct = f"/link/link[{qkdl_id}]"
                            result.append((resource_key_direct, l))
        
        except requests.RequestException as e:
            LOGGER.error(f"Error fetching links from {url}: {e}")
            result.append((resource_key, e))
    
    return result
def fetch_apps(url: str, resource_key: str, headers: Dict[str, str], auth: Optional[Any], timeout: Optional[int]) -> List[Tuple[str, Union[Dict[str, Any], Exception]]]:
    """
    Fetches application data from the QKD node.
    """
    result = []
    url += 'qkd_applications/'
    
    try:
        r = requests.get(url, timeout=timeout, verify=False, auth=auth, headers=headers)
        r.raise_for_status()
        
        apps = r.json().get('qkd_applications', {}).get('qkd_app', [])
        for app in apps:
            result.append((f"/app[{app['app_id']}]", app))
    except requests.RequestException as e:
        LOGGER.error(f"Error fetching applications from {url}: {e}")
        result.append((resource_key, e))
    
    return result
def fetch_capabilities(url: str, resource_key: str, headers: Dict[str, str], auth: Optional[Any], timeout: Optional[int]) -> List[Tuple[str, Union[Dict[str, Any], Exception]]]:
    """
    Fetches capabilities data from the QKD node.
    """
    result = []
    url += 'qkdn_capabilities/'
    
    try:
        r = requests.get(url, timeout=timeout, verify=False, auth=auth, headers=headers)
        r.raise_for_status()
        result.append((resource_key, r.json()))
    except requests.RequestException as e:
        LOGGER.error(f"Error fetching capabilities from {url}: {e}")
def fetch_node(url: str, resource_key: str, headers: Dict[str, str], auth: Optional[Any], timeout: Optional[int]) -> List[Tuple[str, Union[Dict[str, Any], Exception]]]:
    """
    Fetches node data from the QKD node.
    """
    result = []
    
    try:
        r = requests.get(url, timeout=timeout, verify=False, auth=auth, headers=headers)
        r.raise_for_status()
        result.append((resource_key, r.json().get('qkd_node', {})))
    except requests.RequestException as e:
        LOGGER.error(f"Error fetching node from {url}: {e}")
        result.append((resource_key, e))
    
def create_connectivity_link(
    root_url: str, link_uuid: str, node_id_src: str, interface_id_src: str, node_id_dst: str, interface_id_dst: str,
    virt_prev_hop: Optional[str] = None, virt_next_hops: Optional[List[str]] = None, virt_bandwidth: Optional[int] = None,
    auth: Optional[Any] = None, timeout: Optional[int] = None, headers: Dict[str, str] = {}
) -> Union[bool, Exception]:
    """
    Creates a connectivity link between QKD nodes using the provided parameters.
    """
    url = f"{root_url}/restconf/data/etsi-qkd-sdn-node:qkd_node/qkd_links/"
        'qkdl_type': 'etsi-qkd-node-types:' + ('VIRT' if virt_prev_hop or virt_next_hops else 'PHYS'),
        'qkdl_local': {'qkdn_id': node_id_src, 'qkdi_id': interface_id_src},
        'qkdl_remote': {'qkdn_id': node_id_dst, 'qkdi_id': interface_id_dst}
    if virt_prev_hop or virt_next_hops:
        qkd_link['virt_prev_hop'] = virt_prev_hop
        qkd_link['virt_next_hop'] = virt_next_hops or []
        qkd_link['virt_bandwidth'] = virt_bandwidth

    data = {'qkd_links': {'qkd_link': [qkd_link]}}

    LOGGER.info(f"Creating connectivity link with payload: {json.dumps(data)}")

    try:
        r = requests.post(url, json=data, timeout=timeout, verify=False, auth=auth, headers=headers)
        r.raise_for_status()
        if r.status_code in HTTP_OK_CODES:
            LOGGER.info(f"Link {link_uuid} created successfully.")
            return True
        else:
            LOGGER.error(f"Failed to create link {link_uuid}, status code: {r.status_code}")
            return False
    except requests.exceptions.RequestException as e:
        LOGGER.error(f"Exception creating link {link_uuid} with payload {json.dumps(data)}: {e}")
        return e