Newer
Older
Alberto Gonzalez Barneo
committed
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Pedro Paulo Tavares
committed
import os
import json
import logging
import requests
import threading
from requests.auth import HTTPBasicAuth
from typing import Any, List, Optional, Tuple, Union
from common.method_wrappers.Decorator import MetricsPool, metered_subclass_method
from common.type_checkers.Checkers import chk_string, chk_type
from device.service.driver_api._Driver import _Driver
from .Tools2 import config_getter, create_connectivity_link
from device.service.driver_api._Driver import _Driver
from . import ALL_RESOURCE_KEYS
Pedro Paulo Tavares
committed
# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)
# Driver name handed to the _Driver base class and used as the 'driver' metrics label.
DRIVER_NAME = 'qkd'
# Metrics pool passed to the metered_subclass_method decorator on the driver's methods.
METRICS_POOL = MetricsPool('Device', 'Driver', labels={'driver': DRIVER_NAME})
class QKDDriver(_Driver):
    """Device driver that talks to a QKD node over HTTP (RESTCONF).

    The node resource is read from ``<root>/restconf/data/etsi-qkd-sdn-node:qkd_node``
    where ``<root>`` is the ``QKD_API_URL`` environment variable when set, otherwise
    ``http://<address>:<port>``.

    Settings read from ``**settings``:
        timeout:  request timeout, converted with ``int()`` (default 120).
        node_ids: iterable stored as a set (default empty).
        headers:  HTTP headers sent with every request (default ``{}``).
        auth:     value forwarded as ``auth=`` to ``requests`` (default ``None``).
        verify:   value forwarded as ``verify=`` to ``requests.get`` (default ``False``,
                  which matches the previous hard-coded behaviour).
    """

    # RESTCONF path of the node resource, relative to the QKD root URL.
    _QKD_NODE_PATH = '/restconf/data/etsi-qkd-sdn-node:qkd_node'

    def __init__(self, address: str, port: int, **settings) -> None:
        """Store connection parameters; no network request is made here."""
        # Only the setting names are logged: the values may carry credentials
        # (e.g. 'auth' or an Authorization header).
        LOGGER.info(
            f"Initializing QKDDriver with address={address}, port={port}, settings={sorted(settings)}")
        super().__init__(DRIVER_NAME, address, port, **settings)
        self.__lock = threading.Lock()
        self.__started = threading.Event()
        self.__terminate = threading.Event()
        # The environment variable, when present, overrides address/port entirely.
        self.__qkd_root = os.getenv('QKD_API_URL', f"http://{self.address}:{self.port}")
        self.__timeout = int(self.settings.get('timeout', 120))
        self.__node_ids = set(self.settings.get('node_ids', []))
        self.__initial_data = None
        # Optional headers for authentication (e.g., JWT); an explicit None becomes {}.
        self.__headers = settings.get('headers') or {}
        self.__auth = settings.get('auth', None)
        self.__verify = settings.get('verify', False)
        LOGGER.info(f"QKDDriver initialized with QKD root URL: {self.__qkd_root}")

    def Connect(self) -> bool:
        """Fetch the node resource once and cache it as the initial data.

        Returns:
            True when already connected or when the GET succeeded and its body
            was parsed as JSON; False on any request error or non-JSON body.
        """
        url = self.__qkd_root + self._QKD_NODE_PATH
        with self.__lock:
            LOGGER.info(f"Starting connection to {url}")
            if self.__started.is_set():
                LOGGER.info("Already connected, skipping re-connection.")
                return True
            try:
                # Header values are not logged because they may contain tokens.
                LOGGER.info(
                    f'Attempting to connect to {url} with header names {list(self.__headers)} '
                    f'and timeout {self.__timeout}')
                response = requests.get(
                    url, timeout=self.__timeout, verify=self.__verify,
                    headers=self.__headers, auth=self.__auth)
                LOGGER.info(f'Received response: {response.status_code}, content: {response.text}')
                response.raise_for_status()
                self.__initial_data = response.json()
            except (requests.exceptions.RequestException, ValueError) as e:
                # ValueError covers a body that is not valid JSON on requests
                # versions whose JSON error is not a RequestException.
                LOGGER.error(f'Connection failed: {e}')
                return False
            self.__terminate.clear()
            self.__started.set()
            LOGGER.info('Connection successful')
            return True

    def Disconnect(self) -> bool:
        """Mark the driver as terminated and no longer started. Always returns True."""
        LOGGER.info("Disconnecting QKDDriver")
        with self.__lock:
            self.__terminate.set()
            # Without this a later Connect() would short-circuit as "already connected".
            self.__started.clear()
            LOGGER.info("QKDDriver disconnected successfully")
            return True

    @metered_subclass_method(METRICS_POOL)
    def GetInitialConfig(self) -> List[Tuple[str, Any]]:
        """Return ``[('qkd_node', ...)]`` from the data cached by Connect().

        Returns an empty list when the cached data is not a dict (for example
        when Connect() has not succeeded yet).
        """
        LOGGER.info("Getting initial configuration")
        with self.__lock:
            if isinstance(self.__initial_data, dict):
                initial_config = [('qkd_node', self.__initial_data.get('qkd_node', {}))]
                LOGGER.info(f"Initial configuration: {initial_config}")
                return initial_config
            LOGGER.warning("Initial data is not a dictionary")
            return []

    @metered_subclass_method(METRICS_POOL)
    def GetConfig(
        self, resource_keys: Optional[List[str]] = None
    ) -> List[Tuple[str, Union[Any, None, Exception]]]:
        """Retrieve configuration for the given resource keys via ``config_getter``.

        Args:
            resource_keys: keys to query; ``None`` or an empty list means
                ``ALL_RESOURCE_KEYS``.

        Returns:
            The concatenation of the lists returned by ``config_getter`` for
            each key, in key order.
        """
        # None replaces the former mutable default ([]), which was shared across calls.
        if resource_keys is None:
            resource_keys = []
        chk_type('resources', resource_keys, list)
        LOGGER.info(f"Getting configuration for resource_keys: {resource_keys}")
        results = []
        with self.__lock:
            if not resource_keys:
                resource_keys = ALL_RESOURCE_KEYS
            for i, resource_key in enumerate(resource_keys):
                chk_string(f'resource_key[{i}]', resource_key, allow_empty=False)
                LOGGER.info(f"Retrieving resource key: {resource_key}")
                resource_results = config_getter(
                    self.__qkd_root, resource_key, timeout=self.__timeout,
                    headers=self.__headers, auth=self.__auth)
                results.extend(resource_results)
                LOGGER.info(f"Resource results for {resource_key}: {resource_results}")
        LOGGER.info(f"Final configuration results: {results}")
        return results

    @metered_subclass_method(METRICS_POOL)
    def SetConfig(self, resources: List[Tuple[str, Any]]) -> List[Union[bool, Exception]]:
        """Create a connectivity link for every resource whose key starts with '/link'.

        Args:
            resources: ``(resource_key, resource_value)`` pairs; the value may be
                a dict or a JSON string that decodes to one.

        Returns:
            One entry per resource: True on success (and for keys that are not
            '/link...' and are therefore skipped), or the exception raised while
            processing that resource.
        """
        results = []
        if not resources:
            return results

        with self.__lock:
            for resource_key, resource_value in resources:
                LOGGER.info('Processing resource_key = {:s}'.format(str(resource_key)))

                # Only '/link' keys are processed; anything else is reported as success.
                if not resource_key.startswith('/link'):
                    results.append(True)
                    continue

                try:
                    # Ensure resource_value is deserialized
                    if isinstance(resource_value, str):
                        resource_value = json.loads(resource_value)

                    # Mandatory fields: a missing one raises KeyError, recorded below.
                    link_uuid = resource_value['uuid']
                    node_id_src = resource_value['src_qkdn_id']
                    interface_id_src = resource_value['src_interface_id']
                    node_id_dst = resource_value['dst_qkdn_id']
                    interface_id_dst = resource_value['dst_interface_id']
                    # Optional fields default to None.
                    virt_prev_hop = resource_value.get('virt_prev_hop')
                    virt_next_hops = resource_value.get('virt_next_hops')
                    virt_bandwidth = resource_value.get('virt_bandwidth')

                    LOGGER.info(f"Creating connectivity link with UUID: {link_uuid}")
                    # NOTE(review): unlike config_getter, no headers are forwarded
                    # here -- confirm against create_connectivity_link's signature.
                    create_connectivity_link(
                        self.__qkd_root, link_uuid, node_id_src, interface_id_src,
                        node_id_dst, interface_id_dst,
                        virt_prev_hop, virt_next_hops, virt_bandwidth,
                        timeout=self.__timeout, auth=self.__auth
                    )
                    results.append(True)
                    LOGGER.info(f"Connectivity link {link_uuid} created successfully")
                except Exception as e:
                    # Per-resource boundary: the failure becomes this resource's result.
                    LOGGER.exception(f'Unhandled error processing resource_key({resource_key})')
                    results.append(e)

            # Logging test results
            LOGGER.info('Test keys: ' + str([x for x, y in resources]))
            LOGGER.info('Test values: ' + str(results))

        return results

    @metered_subclass_method(METRICS_POOL)
    def DeleteConfig(self, resources: List[Tuple[str, Any]]) -> List[Union[bool, Exception]]:
        """Report, per resource, whether it carries a 'uuid'.

        Args:
            resources: ``(resource_key, resource_value)`` pairs; the value may be
                a dict or a JSON string that decodes to one.

        Returns:
            One entry per resource: True when a non-empty 'uuid' is present,
            False when it is missing, or the exception raised while reading a
            malformed resource.
        """
        LOGGER.info(f"Deleting configuration for resources: {resources}")
        results = []
        if not resources:
            LOGGER.warning("No resources provided for DeleteConfig")
            return results
        with self.__lock:
            for resource in resources:
                LOGGER.info(f'Resource to delete: {resource}')
                try:
                    resource_value = resource[1]
                    # Accept the same JSON-string form that SetConfig accepts.
                    if isinstance(resource_value, str):
                        resource_value = json.loads(resource_value)
                    resource_uuid = resource_value.get('uuid')
                except (IndexError, TypeError, AttributeError, ValueError) as e:
                    # A malformed entry must not abort the rest of the batch.
                    LOGGER.exception(f'Unhandled error processing resource({resource})')
                    results.append(e)
                    continue
                if resource_uuid:
                    # NOTE(review): no request is sent to the node here; only the
                    # presence of the UUID is checked.
                    LOGGER.info(f'Resource with UUID {resource_uuid} deleted successfully')
                    results.append(True)
                else:
                    LOGGER.warning(f"UUID not found in resource: {resource}")
                    results.append(False)
        LOGGER.info(f"DeleteConfig results: {results}")
        return results

    @metered_subclass_method(METRICS_POOL)
    def SubscribeState(self, subscriptions: List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]:
        """Acknowledge every subscription with True; nothing else is done."""
        LOGGER.info(f"Subscribing to state updates: {subscriptions}")
        results = [True for _ in subscriptions]
        LOGGER.info(f"Subscription results: {results}")
        return results

    @metered_subclass_method(METRICS_POOL)
    def UnsubscribeState(self, subscriptions: List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]:
        """Acknowledge every unsubscription with True; nothing else is done."""
        LOGGER.info(f"Unsubscribing from state updates: {subscriptions}")
        results = [True for _ in subscriptions]
        LOGGER.info(f"Unsubscription results: {results}")
        return results

    @metered_subclass_method(METRICS_POOL)
    def GetState(self, blocking=False, terminate: Optional[threading.Event] = None) -> Union[dict, list]:
        """Fetch the node resource and return its decoded JSON body.

        Args:
            blocking: only logged; not otherwise used.
            terminate: only logged; not otherwise used.

        Returns:
            The decoded JSON on success, or an empty list on timeout or any
            other error (errors are logged, never raised).
        """
        LOGGER.info(f"GetState called with blocking={blocking}, terminate={terminate}")
        url = self.__qkd_root + self._QKD_NODE_PATH
        try:
            LOGGER.info(f"Making GET request to {url} to retrieve state")
            response = requests.get(
                url, timeout=self.__timeout, verify=self.__verify,
                headers=self.__headers, auth=self.__auth)
            LOGGER.info(f"Received state response: {response.status_code}, content: {response.text}")
            response.raise_for_status()
            state_data = response.json()
            LOGGER.info(f"State data retrieved: {state_data}")
            return state_data
        except requests.exceptions.Timeout:
            LOGGER.error(f'Timeout getting state from {self.__qkd_root}')
            return []
        except Exception as e:
            # Boundary handler: every failure is reported as "no state".
            LOGGER.error(f'Exception getting state from {self.__qkd_root}: {e}')
            return []