Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • tfs/controller
1 result
Show changes
Commits on Source (112)
Showing
with 1527 additions and 0 deletions
......@@ -229,6 +229,21 @@ kubectl create secret generic qdb-data --namespace ${TFS_K8S_NAMESPACE} --type='
--from-literal=METRICSDB_PASSWORD=${QDB_PASSWORD}
printf "\n"
# Check if "dlt" is in the list of components
if [[ " ${TFS_COMPONENTS[@]} " =~ " dlt " ]]; then
echo "Create secret for HLF keystore"
kubectl create secret generic dlt-keystone --namespace ${TFS_K8S_NAMESPACE} --from-file=keystore=${KEY_DIRECTORY_PATH}
printf "\n"
echo "Create secret for HLF signcerts"
kubectl create secret generic dlt-signcerts --namespace ${TFS_K8S_NAMESPACE} --from-file=signcerts.pem=${CERT_DIRECTORY_PATH}
printf "\n"
echo "Create secret for HLF ca.crt"
kubectl create secret generic dlt-ca-crt --namespace ${TFS_K8S_NAMESPACE} --from-file=ca.crt=${TLS_CERT_PATH}
printf "\n"
fi
echo "Deploying components and collecting environment variables..."
ENV_VARS_SCRIPT=tfs_runtime_env_vars.sh
echo "# Environment variables for TeraFlowSDN deployment" > $ENV_VARS_SCRIPT
......
......@@ -12,6 +12,21 @@
# See the License for the specific language governing permissions and
# limitations under the License.
apiVersion: v1
kind: ConfigMap
metadata:
name: dlt-config
data:
CHANNEL_NAME: "tfs_channel" # Change according to your blockchain configuration
CHAINCODE_NAME: "tfs_dlt" # Change according to your blockchain configuration
MSP_ID: "ETSI" # Change according to your blockchain configuration
PEER_ENDPOINT: "127.0.0.1:7051" # Change according to your blockchain configuration
PEER_HOST_ALIAS: "peer0.org1.tfs.etsi.org" # Change according to your blockchain configuration
KEY_DIRECTORY_PATH: "/etc/hyperledger/fabric-keystore/keystore"
CERT_DIRECTORY_PATH: "/etc/hyperledger/fabric-signcerts/signcerts.pem"
TLS_CERT_PATH: "/etc/hyperledger/fabric-ca-crt/ca.crt"
---
apiVersion: apps/v1
kind: Deployment
metadata:
......@@ -78,6 +93,52 @@ spec:
limits:
cpu: 700m
memory: 1024Mi
volumeMounts:
- name: keystore
mountPath: /etc/hyperledger/fabric-keystore
readOnly: true
- name: signcerts
mountPath: /etc/hyperledger/fabric-signcerts
readOnly: true
- name: ca-crt
mountPath: /etc/hyperledger/fabric-ca-crt
readOnly: true
envFrom:
- configMapRef:
name: dlt-config
env:
- name: KEY_DIRECTORY_PATH
value: "/etc/hyperledger/fabric-keystore/keystore"
- name: CERT_DIRECTORY_PATH
value: "/etc/hyperledger/fabric-signcerts/signcerts.pem"
- name: TLS_CERT_PATH
value: "/etc/hyperledger/fabric-ca-crt/ca.crt"
volumes:
- name: keystore
secret:
secretName: dlt-keystone
- name: signcerts
secret:
secretName: dlt-signcerts
- name: ca-crt
secret:
secretName: dlt-ca-crt
---
apiVersion: v1
kind: Service
metadata:
name: gatewayservice
spec:
selector:
app: dltservice
ports:
- protocol: TCP
port: 50051
targetPort: 50051
nodePort: 32001
type: NodePort
---
apiVersion: v1
kind: Service
......
......@@ -38,6 +38,8 @@ spec:
value: "INFO"
- name: TOPOLOGY_ABSTRACTOR
value: "DISABLE"
- name: DLT_INTEGRATION
value: "DISABLE"
readinessProbe:
exec:
command: ["/bin/grpc_health_probe", "-addr=:10010"]
......
......@@ -62,6 +62,18 @@ export TFS_COMPONENTS="context device pathcomp service slice nbi webui load_gene
# Uncomment to activate E2E Orchestrator
#export TFS_COMPONENTS="${TFS_COMPONENTS} e2e_orchestrator"
# Uncomment to activate DLT and Interdomain
#export TFS_COMPONENTS="${TFS_COMPONENTS} interdomain dlt"
#if [[ "$TFS_COMPONENTS" == *"dlt"* ]]; then
# export KEY_DIRECTORY_PATH="src/dlt/gateway/keys/priv_sk"
# export CERT_DIRECTORY_PATH="src/dlt/gateway/keys/cert.pem"
# export TLS_CERT_PATH="src/dlt/gateway/keys/ca.crt"
#fi
# Uncomment to activate QKD App
#export TFS_COMPONENTS="${TFS_COMPONENTS} app"
# Set the tag you want to use for your images.
export TFS_IMAGE_TAG="dev"
......
......@@ -214,6 +214,7 @@ enum DeviceDriverEnum {
DEVICEDRIVER_OPTICAL_TFS = 9;
DEVICEDRIVER_IETF_ACTN = 10;
DEVICEDRIVER_OC = 11;
DEVICEDRIVER_QKD = 12;
}
enum DeviceOperationalStatusEnum {
......@@ -300,6 +301,7 @@ enum ServiceTypeEnum {
SERVICETYPE_TE = 4;
SERVICETYPE_E2E = 5;
SERVICETYPE_OPTICAL_CONNECTIVITY = 6;
SERVICETYPE_QKD = 7;
}
enum ServiceStatusEnum {
......
......@@ -47,6 +47,7 @@ class DeviceTypeEnum(Enum):
PACKET_ROUTER = 'packet-router'
PACKET_SWITCH = 'packet-switch'
XR_CONSTELLATION = 'xr-constellation'
QKD_NODE = 'qkd-node'
# ETSI TeraFlowSDN controller
TERAFLOWSDN_CONTROLLER = 'teraflowsdn'
......@@ -235,3 +235,35 @@ def safe_and_metered_rpc_method(metrics_pool : MetricsPool, logger : logging.Log
grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))
return inner_wrapper
return outer_wrapper
def safe_and_metered_rpc_method_async(metrics_pool: MetricsPool, logger: logging.Logger):
def outer_wrapper(func):
method_name = func.__name__
metrics = metrics_pool.get_metrics(method_name)
histogram_duration, counter_started, counter_completed, counter_failed = metrics
async def inner_wrapper(self, request, grpc_context: grpc.aio.ServicerContext):
counter_started.inc()
try:
logger.debug('{:s} request: {:s}'.format(method_name, grpc_message_to_json_string(request)))
reply = await func(self, request, grpc_context)
logger.debug('{:s} reply: {:s}'.format(method_name, grpc_message_to_json_string(reply)))
counter_completed.inc()
return reply
except ServiceException as e: # pragma: no cover (ServiceException not thrown)
if e.code not in [grpc.StatusCode.NOT_FOUND, grpc.StatusCode.ALREADY_EXISTS]:
# Assume not found or already exists is just a condition, not an error
logger.exception('{:s} exception'.format(method_name))
counter_failed.inc()
else:
counter_completed.inc()
await grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover, pylint: disable=broad-except
logger.exception('{:s} exception'.format(method_name))
counter_failed.inc()
await grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))
return inner_wrapper
return outer_wrapper
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Optional, Union
import grpc
import logging
from concurrent import futures
from grpc_health.v1.health import HealthServicer, OVERALL_HEALTH
from grpc_health.v1.health_pb2 import HealthCheckResponse
from grpc_health.v1.health_pb2_grpc import add_HealthServicer_to_server
from common.Settings import get_grpc_bind_address, get_grpc_grace_period, get_grpc_max_workers
class GenericGrpcServiceAsync:
def __init__(
self, bind_port: Union[str, int], bind_address: Optional[str] = None, max_workers: Optional[int] = None,
grace_period: Optional[int] = None, enable_health_servicer: bool = True, cls_name: str = __name__
) -> None:
self.logger = logging.getLogger(cls_name)
self.bind_port = bind_port
self.bind_address = get_grpc_bind_address() if bind_address is None else bind_address
self.max_workers = get_grpc_max_workers() if max_workers is None else max_workers
self.grace_period = get_grpc_grace_period() if grace_period is None else grace_period
self.enable_health_servicer = enable_health_servicer
self.endpoint = None
self.health_servicer = None
self.pool = None
self.server = None
async def install_servicers(self):
pass
async def start(self):
self.endpoint = '{:s}:{:s}'.format(str(self.bind_address), str(self.bind_port))
self.logger.info('Starting Service (tentative endpoint: {:s}, max_workers: {:s})...'.format(
str(self.endpoint), str(self.max_workers)))
self.pool = futures.ThreadPoolExecutor(max_workers=self.max_workers)
self.server = grpc.aio.server(self.pool)
await self.install_servicers() # Ensure this is awaited
if self.enable_health_servicer:
self.health_servicer = HealthServicer(
experimental_non_blocking=True, experimental_thread_pool=futures.ThreadPoolExecutor(max_workers=1))
add_HealthServicer_to_server(self.health_servicer, self.server)
self.bind_port = self.server.add_insecure_port(self.endpoint)
self.endpoint = '{:s}:{:s}'.format(str(self.bind_address), str(self.bind_port))
self.logger.info('Listening on {:s}...'.format(str(self.endpoint)))
await self.server.start()
if self.enable_health_servicer:
self.health_servicer.set(OVERALL_HEALTH, HealthCheckResponse.SERVING)
self.logger.debug('Service started')
async def stop(self):
self.logger.debug('Stopping service (grace period {:s} seconds)...'.format(str(self.grace_period)))
if self.enable_health_servicer:
self.health_servicer.enter_graceful_shutdown()
await self.server.stop(self.grace_period)
self.logger.debug('Service stopped')
......@@ -34,6 +34,7 @@ class ORM_DeviceDriverEnum(enum.Enum):
OPTICAL_TFS = DeviceDriverEnum.DEVICEDRIVER_OPTICAL_TFS
IETF_ACTN = DeviceDriverEnum.DEVICEDRIVER_IETF_ACTN
OC = DeviceDriverEnum.DEVICEDRIVER_OC
QKD = DeviceDriverEnum.DEVICEDRIVER_QKD
grpc_to_enum__device_driver = functools.partial(
grpc_to_enum, DeviceDriverEnum, ORM_DeviceDriverEnum)
......@@ -29,6 +29,7 @@ class ORM_ServiceTypeEnum(enum.Enum):
TE = ServiceTypeEnum.SERVICETYPE_TE
E2E = ServiceTypeEnum.SERVICETYPE_E2E
OPTICAL_CONNECTIVITY = ServiceTypeEnum.SERVICETYPE_OPTICAL_CONNECTIVITY
QKD = ServiceTypeEnum.SERVICETYPE_QKD
grpc_to_enum__service_type = functools.partial(
grpc_to_enum, ServiceTypeEnum, ORM_ServiceTypeEnum)
......@@ -178,3 +178,14 @@ if LOAD_ALL_DEVICE_DRIVERS:
FilterFieldEnum.DRIVER : DeviceDriverEnum.DEVICEDRIVER_OC,
}
]))
if LOAD_ALL_DEVICE_DRIVERS:
from .qkd.QKDDriver2 import QKDDriver # pylint: disable=wrong-import-position
DRIVERS.append(
(QKDDriver, [
{
# Close enough, it does optical switching
FilterFieldEnum.DEVICE_TYPE: DeviceTypeEnum.QKD_NODE,
FilterFieldEnum.DRIVER : DeviceDriverEnum.DEVICEDRIVER_QKD,
}
]))
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json, logging, requests, threading
from requests.auth import HTTPBasicAuth
from typing import Any, Iterator, List, Optional, Tuple, Union
from common.method_wrappers.Decorator import MetricsPool, metered_subclass_method
from common.type_checkers.Checkers import chk_string, chk_type
from device.service.driver_api._Driver import _Driver
from . import ALL_RESOURCE_KEYS
from .Tools import find_key, config_getter, create_connectivity_link
LOGGER = logging.getLogger(__name__)
DRIVER_NAME = 'qkd'
METRICS_POOL = MetricsPool('Device', 'Driver', labels={'driver': DRIVER_NAME})
class QKDDriver(_Driver):
def __init__(self, address: str, port: int, **settings) -> None:
super().__init__(DRIVER_NAME, address, port, **settings)
self.__lock = threading.Lock()
self.__started = threading.Event()
self.__terminate = threading.Event()
username = self.settings.get('username')
password = self.settings.get('password')
self.__auth = HTTPBasicAuth(username, password) if username is not None and password is not None else None
scheme = self.settings.get('scheme', 'http')
self.__qkd_root = '{:s}://{:s}:{:d}'.format(scheme, self.address, int(self.port))
self.__timeout = int(self.settings.get('timeout', 120))
self.__node_ids = set(self.settings.get('node_ids', []))
token = self.settings.get('token')
self.__headers = {'Authorization': 'Bearer ' + token}
self.__initial_data = None
def Connect(self) -> bool:
url = self.__qkd_root + '/restconf/data/etsi-qkd-sdn-node:qkd_node'
with self.__lock:
if self.__started.is_set(): return True
r = None
try:
LOGGER.info(f'requests.get("{url}", timeout={self.__timeout}, verify=False, auth={self.__auth}, headers={self.__headers})')
r = requests.get(url, timeout=self.__timeout, verify=False, auth=self.__auth, headers=self.__headers)
LOGGER.info(f'R: {r}')
LOGGER.info(f'Text: {r.text}')
LOGGER.info(f'Json: {r.json()}')
except requests.exceptions.Timeout:
LOGGER.exception('Timeout connecting {:s}'.format(str(self.__qkd_root)))
return False
except Exception: # pylint: disable=broad-except
LOGGER.exception('Exception connecting {:s}'.format(str(self.__qkd_root)))
return False
else:
self.__started.set()
self.__initial_data = r.json()
return True
def Disconnect(self) -> bool:
with self.__lock:
self.__terminate.set()
return True
@metered_subclass_method(METRICS_POOL)
def GetInitialConfig(self) -> List[Tuple[str, Any]]:
with self.__lock:
return self.__initial_data
@metered_subclass_method(METRICS_POOL)
def GetConfig(self, resource_keys : List[str] = []) -> List[Tuple[str, Union[Any, None, Exception]]]:
chk_type('resources', resource_keys, list)
results = []
with self.__lock:
if len(resource_keys) == 0: resource_keys = ALL_RESOURCE_KEYS
for i, resource_key in enumerate(resource_keys):
str_resource_name = 'resource_key[#{:d}]'.format(i)
chk_string(str_resource_name, resource_key, allow_empty=False)
results.extend(config_getter(
self.__qkd_root, resource_key, timeout=self.__timeout, auth=self.__auth,
node_ids=self.__node_ids, headers=self.__headers))
return results
@metered_subclass_method(METRICS_POOL)
def SetConfig(self, resources: List[Tuple[str, Any]]) -> List[Union[bool, Exception]]:
results = []
if len(resources) == 0:
return results
with self.__lock:
for resource_key, resource_value in resources:
LOGGER.info('resource = {:s}'.format(str(resource_key)))
if resource_key.startswith('/link'):
try:
resource_value = json.loads(resource_value)
link_uuid = resource_value['uuid']
node_id_src = resource_value['src_qkdn_id']
interface_id_src = resource_value['src_interface_id']
node_id_dst = resource_value['dst_qkdn_id']
interface_id_dst = resource_value['dst_interface_id']
virt_prev_hop = resource_value.get('virt_prev_hop')
virt_next_hops = resource_value.get('virt_next_hops')
virt_bandwidth = resource_value.get('virt_bandwidth')
data = create_connectivity_link(
self.__qkd_root, link_uuid, node_id_src, interface_id_src, node_id_dst, interface_id_dst,
virt_prev_hop, virt_next_hops, virt_bandwidth,
timeout=self.__timeout, auth=self.__auth, headers=self.__headers
)
#data = create_connectivity_link(
# self.__qkd_root, link_uuid, node_id_src, interface_id_src, node_id_dst, interface_id_dst,
# timeout=self.__timeout, auth=self.__auth
#)
results.append(True)
except Exception as e: # pylint: disable=broad-except
LOGGER.exception('Unhandled error processing resource_key({:s})'.format(str(resource_key)))
results.append(e)
else:
results.append(True)
LOGGER.info('Test keys: ' + str([x for x,y in resources]))
LOGGER.info('Test values: ' + str(results))
return results
'''
@metered_subclass_method(METRICS_POOL)
def DeleteConfig(self, resources: List[Tuple[str, Any]]) -> List[Union[bool, Exception]]:
results = []
if len(resources) == 0: return results
with self.__lock:
for resource in resources:
LOGGER.info('resource = {:s}'.format(str(resource)))
uuid = find_key(resource, 'uuid')
results.extend(delete_connectivity_service(
self.__qkd_root, uuid, timeout=self.__timeout, auth=self.__auth))
return results
'''
@metered_subclass_method(METRICS_POOL)
def SubscribeState(self, subscriptions : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]:
# TODO: QKD API Driver does not support monitoring by now
LOGGER.info(f'Subscribe {self.address}: {subscriptions}')
return [True for _ in subscriptions]
@metered_subclass_method(METRICS_POOL)
def UnsubscribeState(self, subscriptions : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]:
# TODO: QKD API Driver does not support monitoring by now
return [False for _ in subscriptions]
def GetState(
self, blocking=False, terminate : Optional[threading.Event] = None
) -> Iterator[Tuple[float, str, Any]]:
# TODO: QKD API Driver does not support monitoring by now
LOGGER.info(f'GetState {self.address} called')
return []
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import json
import logging
import requests
import threading
from requests.auth import HTTPBasicAuth
from typing import Any, List, Optional, Tuple, Union
from common.method_wrappers.Decorator import MetricsPool, metered_subclass_method
from common.type_checkers.Checkers import chk_string, chk_type
from device.service.driver_api._Driver import _Driver
from .Tools2 import config_getter, create_connectivity_link
from device.service.driver_api._Driver import _Driver
from . import ALL_RESOURCE_KEYS
LOGGER = logging.getLogger(__name__)
DRIVER_NAME = 'qkd'
METRICS_POOL = MetricsPool('Device', 'Driver', labels={'driver': DRIVER_NAME})
class QKDDriver(_Driver):
def __init__(self, address: str, port: int, **settings) -> None:
LOGGER.info(f"Initializing QKDDriver with address={address}, port={port}, settings={settings}")
super().__init__(DRIVER_NAME, address, port, **settings)
self.__lock = threading.Lock()
self.__started = threading.Event()
self.__terminate = threading.Event()
self.__auth = None
self.__headers = {}
self.__qkd_root = os.getenv('QKD_API_URL', f"http://{self.address}:{self.port}") # Simplified URL management
self.__timeout = int(self.settings.get('timeout', 120))
self.__node_ids = set(self.settings.get('node_ids', []))
self.__initial_data = None
# Optionally pass headers for authentication (e.g., JWT)
self.__headers = settings.get('headers', {})
self.__auth = settings.get('auth', None)
LOGGER.info(f"QKDDriver initialized with QKD root URL: {self.__qkd_root}")
def Connect(self) -> bool:
url = self.__qkd_root + '/restconf/data/etsi-qkd-sdn-node:qkd_node'
with self.__lock:
LOGGER.info(f"Starting connection to {url}")
if self.__started.is_set():
LOGGER.info("Already connected, skipping re-connection.")
return True
try:
LOGGER.info(f'Attempting to connect to {url} with headers {self.__headers} and timeout {self.__timeout}')
response = requests.get(url, timeout=self.__timeout, verify=False, headers=self.__headers, auth=self.__auth)
LOGGER.info(f'Received response: {response.status_code}, content: {response.text}')
response.raise_for_status()
self.__initial_data = response.json()
self.__started.set()
LOGGER.info('Connection successful')
return True
except requests.exceptions.RequestException as e:
LOGGER.error(f'Connection failed: {e}')
return False
def Disconnect(self) -> bool:
LOGGER.info("Disconnecting QKDDriver")
with self.__lock:
self.__terminate.set()
LOGGER.info("QKDDriver disconnected successfully")
return True
@metered_subclass_method(METRICS_POOL)
def GetInitialConfig(self) -> List[Tuple[str, Any]]:
LOGGER.info("Getting initial configuration")
with self.__lock:
if isinstance(self.__initial_data, dict):
initial_config = [('qkd_node', self.__initial_data.get('qkd_node', {}))]
LOGGER.info(f"Initial configuration: {initial_config}")
return initial_config
LOGGER.warning("Initial data is not a dictionary")
return []
@metered_subclass_method(METRICS_POOL)
def GetConfig(self, resource_keys: List[str] = []) -> List[Tuple[str, Union[Any, None, Exception]]]:
chk_type('resources', resource_keys, list)
LOGGER.info(f"Getting configuration for resource_keys: {resource_keys}")
results = []
with self.__lock:
if not resource_keys:
resource_keys = ALL_RESOURCE_KEYS
for i, resource_key in enumerate(resource_keys):
chk_string(f'resource_key[{i}]', resource_key, allow_empty=False)
LOGGER.info(f"Retrieving resource key: {resource_key}")
resource_results = config_getter(
self.__qkd_root, resource_key, timeout=self.__timeout, headers=self.__headers, auth=self.__auth)
results.extend(resource_results)
LOGGER.info(f"Resource results for {resource_key}: {resource_results}")
LOGGER.info(f"Final configuration results: {results}")
return results
@metered_subclass_method(METRICS_POOL)
def SetConfig(self, resources: List[Tuple[str, Any]]) -> List[Union[bool, Exception]]:
results = []
if len(resources) == 0:
return results
with self.__lock:
for resource_key, resource_value in resources:
LOGGER.info('Processing resource_key = {:s}'.format(str(resource_key)))
# Only process '/link' keys
if resource_key.startswith('/link'):
try:
# Ensure resource_value is deserialized
if isinstance(resource_value, str):
resource_value = json.loads(resource_value)
# Extract values from resource_value dictionary
link_uuid = resource_value['uuid']
node_id_src = resource_value['src_qkdn_id']
interface_id_src = resource_value['src_interface_id']
node_id_dst = resource_value['dst_qkdn_id']
interface_id_dst = resource_value['dst_interface_id']
virt_prev_hop = resource_value.get('virt_prev_hop')
virt_next_hops = resource_value.get('virt_next_hops')
virt_bandwidth = resource_value.get('virt_bandwidth')
# Call create_connectivity_link with the extracted values
LOGGER.info(f"Creating connectivity link with UUID: {link_uuid}")
data = create_connectivity_link(
self.__qkd_root, link_uuid, node_id_src, interface_id_src, node_id_dst, interface_id_dst,
virt_prev_hop, virt_next_hops, virt_bandwidth,
timeout=self.__timeout, auth=self.__auth
)
# Append success result
results.append(True)
LOGGER.info(f"Connectivity link {link_uuid} created successfully")
except Exception as e:
# Catch and log any unhandled exceptions
LOGGER.exception(f'Unhandled error processing resource_key({resource_key})')
results.append(e)
else:
# Skip unsupported resource keys and append success
results.append(True)
# Logging test results
LOGGER.info('Test keys: ' + str([x for x,y in resources]))
LOGGER.info('Test values: ' + str(results))
return results
@metered_subclass_method(METRICS_POOL)
def DeleteConfig(self, resources: List[Tuple[str, Any]]) -> List[Union[bool, Exception]]:
LOGGER.info(f"Deleting configuration for resources: {resources}")
results = []
if not resources:
LOGGER.warning("No resources provided for DeleteConfig")
return results
with self.__lock:
for resource in resources:
LOGGER.info(f'Resource to delete: {resource}')
uuid = resource[1].get('uuid')
if uuid:
LOGGER.info(f'Resource with UUID {uuid} deleted successfully')
results.append(True)
else:
LOGGER.warning(f"UUID not found in resource: {resource}")
results.append(False)
LOGGER.info(f"DeleteConfig results: {results}")
return results
@metered_subclass_method(METRICS_POOL)
def SubscribeState(self, subscriptions: List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]:
LOGGER.info(f"Subscribing to state updates: {subscriptions}")
results = [True for _ in subscriptions]
LOGGER.info(f"Subscription results: {results}")
return results
@metered_subclass_method(METRICS_POOL)
def UnsubscribeState(self, subscriptions: List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]:
LOGGER.info(f"Unsubscribing from state updates: {subscriptions}")
results = [True for _ in subscriptions]
LOGGER.info(f"Unsubscription results: {results}")
return results
@metered_subclass_method(METRICS_POOL)
def GetState(self, blocking=False, terminate: Optional[threading.Event] = None) -> Union[dict, list]:
LOGGER.info(f"GetState called with blocking={blocking}, terminate={terminate}")
url = self.__qkd_root + '/restconf/data/etsi-qkd-sdn-node:qkd_node'
try:
LOGGER.info(f"Making GET request to {url} to retrieve state")
response = requests.get(url, timeout=self.__timeout, verify=False, headers=self.__headers, auth=self.__auth)
LOGGER.info(f"Received state response: {response.status_code}, content: {response.text}")
response.raise_for_status()
state_data = response.json()
LOGGER.info(f"State data retrieved: {state_data}")
return state_data
except requests.exceptions.Timeout:
LOGGER.error(f'Timeout getting state from {self.__qkd_root}')
return []
except Exception as e:
LOGGER.error(f'Exception getting state from {self.__qkd_root}: {e}')
return []
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json, logging, requests
from requests.auth import HTTPBasicAuth
from typing import Dict, Optional, Set
from device.service.driver_api._Driver import RESOURCE_ENDPOINTS, RESOURCE_INTERFACES, RESOURCE_NETWORK_INSTANCES
from . import RESOURCE_APPS, RESOURCE_LINKS, RESOURCE_CAPABILITES, RESOURCE_NODE
LOGGER = logging.getLogger(__name__)
HTTP_OK_CODES = {
200, # OK
201, # Created
202, # Accepted
204, # No Content
}
def find_key(resource, key):
return json.loads(resource[1])[key]
def config_getter(
root_url : str, resource_key : str, auth : Optional[HTTPBasicAuth] = None, timeout : Optional[int] = None,
node_ids : Set[str] = set(), headers={}
):
# getting endpoints
url = root_url + '/restconf/data/etsi-qkd-sdn-node:qkd_node/'
result = []
try:
if resource_key in [RESOURCE_ENDPOINTS, RESOURCE_INTERFACES]:
url += 'qkd_interfaces/'
r = requests.get(url, timeout=timeout, verify=False, auth=auth, headers=headers)
interfaces = r.json()['qkd_interfaces']['qkd_interface']
# If it's a physical endpoint
if resource_key == RESOURCE_ENDPOINTS:
for interface in interfaces:
resource_value = interface.get('qkdi_att_point', {})
if 'device' in resource_value and 'port' in resource_value:
uuid = '{}:{}'.format(resource_value['device'], resource_value['port'])
resource_key = '/endpoints/endpoint[{:s}]'.format(uuid)
resource_value['uuid'] = uuid
sample_types = {}
metric_name = 'KPISAMPLETYPE_LINK_TOTAL_CAPACITY_GBPS'
metric_id = 301
metric_name = metric_name.lower().replace('kpisampletype_', '')
monitoring_resource_key = '{:s}/state/{:s}'.format(resource_key, metric_name)
sample_types[metric_id] = monitoring_resource_key
resource_value['sample_types'] = sample_types
result.append((resource_key, resource_value))
else:
for interface in interfaces:
resource_key = '/interface[{:s}]'.format(interface['qkdi_id'])
endpoint_value = interface.get('qkdi_att_point', {})
if 'device' in endpoint_value and 'port' in endpoint_value:
name = '{}:{}'.format(endpoint_value['device'], endpoint_value['port'])
interface['name'] = name
interface['enabled'] = True # For test purpose only
result.append((resource_key, interface))
elif resource_key in [RESOURCE_LINKS, RESOURCE_NETWORK_INSTANCES]:
url += 'qkd_links/'
r = requests.get(url, timeout=timeout, verify=False, auth=auth, headers=headers)
links = r.json()['qkd_links']['qkd_link']
if resource_key == RESOURCE_LINKS:
for link in links:
link_type = link.get('qkdl_type', 'Direct')
if link_type == 'Direct':
resource_key = '/link[{:s}]'.format(link['qkdl_id'])
result.append((resource_key, link))
else:
for link in links:
link_type = link.get('qkdl_type', 'Direct')
if link_type == 'Virtual':
resource_key = '/service[{:s}]'.format(link['qkdl_id'])
result.append((resource_key, link))
elif resource_key == RESOURCE_APPS:
url += 'qkd_applications/'
r = requests.get(url, timeout=timeout, verify=False, auth=auth, headers=headers)
apps = r.json()['qkd_applications']['qkd_app']
for app in apps:
resource_key = '/app[{:s}]'.format(app['app_id'])
result.append((resource_key, app))
elif resource_key == RESOURCE_CAPABILITES:
url += 'qkdn_capabilities/'
r = requests.get(url, timeout=timeout, verify=False, auth=auth, headers=headers)
capabilities = r.json()['qkdn_capabilities']
result.append((resource_key, capabilities))
elif resource_key == RESOURCE_NODE:
r = requests.get(url, timeout=timeout, verify=False, auth=auth, headers=headers)
node = r.json()['qkd_node']
result.append((resource_key, node))
except requests.exceptions.Timeout:
LOGGER.exception('Timeout connecting {:s}'.format(url))
except Exception as e: # pylint: disable=broad-except
LOGGER.exception('Exception retrieving/parsing endpoints for {:s}'.format(resource_key))
result.append((resource_key, e))
return result
def create_connectivity_link(
root_url, link_uuid, node_id_src, interface_id_src, node_id_dst, interface_id_dst,
virt_prev_hop = None, virt_next_hops = None, virt_bandwidth = None,
auth : Optional[HTTPBasicAuth] = None, timeout : Optional[int] = None, headers={}
):
url = root_url + '/restconf/data/etsi-qkd-sdn-node:qkd_node/qkd_links/'
is_virtual = bool(virt_prev_hop or virt_next_hops)
qkd_link = {
'qkdl_id': link_uuid,
'qkdl_type': 'etsi-qkd-node-types:' + ('VIRT' if is_virtual else 'PHYS'),
'qkdl_local': {
'qkdn_id': node_id_src,
'qkdi_id': interface_id_src
},
'qkdl_remote': {
'qkdn_id': node_id_dst,
'qkdi_id': interface_id_dst
}
}
if is_virtual:
qkd_link['virt_prev_hop'] = virt_prev_hop
qkd_link['virt_next_hop'] = virt_next_hops or []
qkd_link['virt_bandwidth'] = virt_bandwidth
data = {'qkd_links': {'qkd_link': [qkd_link]}}
requests.post(url, json=data, headers=headers)
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import logging
import requests
from typing import Dict, Optional, Set, List, Tuple, Union, Any
from device.service.driver_api._Driver import RESOURCE_ENDPOINTS, RESOURCE_INTERFACES, RESOURCE_NETWORK_INSTANCES
from . import RESOURCE_APPS, RESOURCE_LINKS, RESOURCE_CAPABILITES, RESOURCE_NODE
LOGGER = logging.getLogger(__name__)
HTTP_OK_CODES = {200, 201, 202, 204}
def find_key(resource: Tuple[str, str], key: str) -> Any:
"""
Extracts a specific key from a JSON resource.
"""
return json.loads(resource[1]).get(key)
def config_getter(
root_url: str, resource_key: str, auth: Optional[Any] = None, timeout: Optional[int] = None,
node_ids: Set[str] = set(), headers: Dict[str, str] = {}
) -> List[Tuple[str, Union[Dict[str, Any], Exception]]]:
"""
Fetches configuration data from a QKD node for a specified resource key.
Returns a list of tuples containing the resource key and the corresponding data or exception.
The function is agnostic to authentication: headers and auth are passed from external sources.
"""
url = f"{root_url}/restconf/data/etsi-qkd-sdn-node:qkd_node/"
LOGGER.info(f"Fetching configuration for {resource_key} from {root_url}")
try:
if resource_key in [RESOURCE_ENDPOINTS, RESOURCE_INTERFACES]:
return fetch_interfaces(url, resource_key, headers, auth, timeout)
elif resource_key in [RESOURCE_LINKS, RESOURCE_NETWORK_INSTANCES]:
return fetch_links(url, resource_key, headers, auth, timeout)
elif resource_key in [RESOURCE_APPS]:
return fetch_apps(url, resource_key, headers, auth, timeout)
elif resource_key in [RESOURCE_CAPABILITES]:
return fetch_capabilities(url, resource_key, headers, auth, timeout)
elif resource_key in [RESOURCE_NODE]:
return fetch_node(url, resource_key, headers, auth, timeout)
else:
LOGGER.warning(f"Unknown resource key: {resource_key}")
return [(resource_key, ValueError(f"Unknown resource key: {resource_key}"))]
except requests.exceptions.RequestException as e:
LOGGER.error(f'Error retrieving/parsing {resource_key} from {url}: {e}')
return [(resource_key, e)]
def fetch_interfaces(url: str, resource_key: str, headers: Dict[str, str], auth: Optional[Any], timeout: Optional[int]) -> List[Tuple[str, Union[Dict[str, Any], Exception]]]:
"""
Fetches interface data from the QKD node. Adapts to both mocked and real QKD data structures.
"""
result = []
url += 'qkd_interfaces/'
try:
r = requests.get(url, timeout=timeout, verify=False, auth=auth, headers=headers)
r.raise_for_status()
# Handle both real and mocked QKD response structures
response_data = r.json()
if isinstance(response_data.get('qkd_interfaces'), dict):
interfaces = response_data.get('qkd_interfaces', {}).get('qkd_interface', [])
else:
interfaces = response_data.get('qkd_interface', [])
for interface in interfaces:
if resource_key in [RESOURCE_ENDPOINTS]:
# Handle real QKD data format
resource_value = interface.get('qkdi_att_point', {})
if 'device' in resource_value and 'port' in resource_value:
uuid = f"{resource_value['device']}:{resource_value['port']}"
resource_key_with_uuid = f"/endpoints/endpoint[{uuid}]"
resource_value['uuid'] = uuid
# Add sample types (for demonstration purposes)
sample_types = {}
metric_name = 'KPISAMPLETYPE_LINK_TOTAL_CAPACITY_GBPS'
metric_id = 301
metric_name = metric_name.lower().replace('kpisampletype_', '')
monitoring_resource_key = '{:s}/state/{:s}'.format(resource_key, metric_name)
sample_types[metric_id] = monitoring_resource_key
resource_value['sample_types'] = sample_types
result.append((resource_key_with_uuid, resource_value))
else:
# Handle both real and mocked QKD formats
endpoint_value = interface.get('qkdi_att_point', {})
if 'device' in endpoint_value and 'port' in endpoint_value:
# Real QKD data format
interface_uuid = f"{endpoint_value['device']}:{endpoint_value['port']}"
interface['uuid'] = interface_uuid
interface['name'] = interface_uuid
interface['enabled'] = True # Assume enabled for real data
else:
# Mocked QKD data format
interface_uuid = interface.get('uuid', f"/interface[{interface['qkdi_id']}]")
interface['uuid'] = interface_uuid
interface['name'] = interface.get('name', interface_uuid)
interface['enabled'] = interface.get('enabled', False) # Mocked enabled status
result.append((f"/interface[{interface['qkdi_id']}]", interface))
except requests.RequestException as e:
LOGGER.error(f"Error fetching interfaces from {url}: {e}")
result.append((resource_key, e))
return result
def fetch_links(url: str, resource_key: str, headers: Dict[str, str], auth: Optional[Any], timeout: Optional[int]) -> List[Tuple[str, Union[Dict[str, Any], Exception]]]:
"""
Fetches link data from the QKD node. Adapts to both mocked and real QKD data structures.
"""
result = []
if resource_key in [RESOURCE_LINKS, RESOURCE_NETWORK_INSTANCES]:
url += 'qkd_links/'
try:
r = requests.get(url, timeout=timeout, verify=False, auth=auth, headers=headers)
r.raise_for_status()
# Handle real and mocked QKD data structures
links = r.json().get('qkd_links', [])
for link in links:
# For real QKD format (QKD links returned as dictionary objects)
if isinstance(link, dict):
qkdl_id = link.get('qkdl_id')
link_type = link.get('qkdl_type', 'Direct')
# Handle both real (PHYS, VIRT) and mocked (DIRECT) link types
if link_type == 'PHYS' or link_type == 'VIRT':
resource_key_direct = f"/link[{qkdl_id}]"
result.append((resource_key_direct, link))
elif link_type == 'DIRECT':
# Mocked QKD format has a slightly different structure
result.append((f"/link/link[{qkdl_id}]", link))
# For mocked QKD format (QKD links returned as lists)
elif isinstance(link, list):
for l in link:
qkdl_id = l.get('uuid')
link_type = l.get('type', 'Direct')
if link_type == 'DIRECT':
resource_key_direct = f"/link/link[{qkdl_id}]"
result.append((resource_key_direct, l))
except requests.RequestException as e:
LOGGER.error(f"Error fetching links from {url}: {e}")
result.append((resource_key, e))
return result
def fetch_apps(url: str, resource_key: str, headers: Dict[str, str], auth: Optional[Any], timeout: Optional[int]) -> List[Tuple[str, Union[Dict[str, Any], Exception]]]:
"""
Fetches application data from the QKD node.
"""
result = []
url += 'qkd_applications/'
try:
r = requests.get(url, timeout=timeout, verify=False, auth=auth, headers=headers)
r.raise_for_status()
apps = r.json().get('qkd_applications', {}).get('qkd_app', [])
for app in apps:
result.append((f"/app[{app['app_id']}]", app))
except requests.RequestException as e:
LOGGER.error(f"Error fetching applications from {url}: {e}")
result.append((resource_key, e))
return result
def fetch_capabilities(url: str, resource_key: str, headers: Dict[str, str], auth: Optional[Any], timeout: Optional[int]) -> List[Tuple[str, Union[Dict[str, Any], Exception]]]:
"""
Fetches capabilities data from the QKD node.
"""
result = []
url += 'qkdn_capabilities/'
try:
r = requests.get(url, timeout=timeout, verify=False, auth=auth, headers=headers)
r.raise_for_status()
result.append((resource_key, r.json()))
except requests.RequestException as e:
LOGGER.error(f"Error fetching capabilities from {url}: {e}")
result.append((resource_key, e))
return result
def fetch_node(url: str, resource_key: str, headers: Dict[str, str], auth: Optional[Any], timeout: Optional[int]) -> List[Tuple[str, Union[Dict[str, Any], Exception]]]:
"""
Fetches node data from the QKD node.
"""
result = []
try:
r = requests.get(url, timeout=timeout, verify=False, auth=auth, headers=headers)
r.raise_for_status()
result.append((resource_key, r.json().get('qkd_node', {})))
except requests.RequestException as e:
LOGGER.error(f"Error fetching node from {url}: {e}")
result.append((resource_key, e))
return result
def create_connectivity_link(
root_url: str, link_uuid: str, node_id_src: str, interface_id_src: str, node_id_dst: str, interface_id_dst: str,
virt_prev_hop: Optional[str] = None, virt_next_hops: Optional[List[str]] = None, virt_bandwidth: Optional[int] = None,
auth: Optional[Any] = None, timeout: Optional[int] = None, headers: Dict[str, str] = {}
) -> Union[bool, Exception]:
"""
Creates a connectivity link between QKD nodes using the provided parameters.
"""
url = f"{root_url}/restconf/data/etsi-qkd-sdn-node:qkd_node/qkd_links/"
qkd_link = {
'qkdl_id': link_uuid,
'qkdl_type': 'etsi-qkd-node-types:' + ('VIRT' if virt_prev_hop or virt_next_hops else 'PHYS'),
'qkdl_local': {'qkdn_id': node_id_src, 'qkdi_id': interface_id_src},
'qkdl_remote': {'qkdn_id': node_id_dst, 'qkdi_id': interface_id_dst}
}
if virt_prev_hop or virt_next_hops:
qkd_link['virt_prev_hop'] = virt_prev_hop
qkd_link['virt_next_hop'] = virt_next_hops or []
qkd_link['virt_bandwidth'] = virt_bandwidth
data = {'qkd_links': {'qkd_link': [qkd_link]}}
LOGGER.info(f"Creating connectivity link with payload: {json.dumps(data)}")
try:
r = requests.post(url, json=data, timeout=timeout, verify=False, auth=auth, headers=headers)
r.raise_for_status()
if r.status_code in HTTP_OK_CODES:
LOGGER.info(f"Link {link_uuid} created successfully.")
return True
else:
LOGGER.error(f"Failed to create link {link_uuid}, status code: {r.status_code}")
return False
except requests.exceptions.RequestException as e:
LOGGER.error(f"Exception creating link {link_uuid} with payload {json.dumps(data)}: {e}")
return e
from device.service.driver_api._Driver import RESOURCE_ENDPOINTS, RESOURCE_INTERFACES, RESOURCE_NETWORK_INSTANCES
RESOURCE_LINKS = '__links__'
RESOURCE_APPS = '__apps__'
RESOURCE_CAPABILITES = '__capabilities__'
RESOURCE_NODE = '__node__'
ALL_RESOURCE_KEYS = [
RESOURCE_ENDPOINTS,
RESOURCE_INTERFACES,
RESOURCE_NETWORK_INSTANCES,
RESOURCE_LINKS,
RESOURCE_APPS,
RESOURCE_CAPABILITES,
RESOURCE_NODE
]
RESOURCE_KEY_MAPPINGS = {
RESOURCE_ENDPOINTS : 'component',
RESOURCE_INTERFACES : 'interface',
RESOURCE_NETWORK_INSTANCES: 'network_instance',
RESOURCE_LINKS : 'links',
RESOURCE_APPS : 'apps',
RESOURCE_CAPABILITES : 'capabilities',
RESOURCE_NODE : 'node'
}
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pytest
import requests
import json
import os
from device.service.drivers.qkd.QKDDriver2 import QKDDriver
from device.service.drivers.qkd.Tools2 import (
RESOURCE_INTERFACES,
RESOURCE_LINKS,
RESOURCE_CAPABILITES,
RESOURCE_NODE,
RESOURCE_APPS
)
# Test ID: INT_LQ_Test_01 (QKD Node Authentication)
# Function to retrieve JWT token
def get_jwt_token(node_address, port, username, password):
""" Retrieve JWT token from a node's login endpoint if it's secured. """
login_url = f"http://{node_address}:{port}/login"
payload = {'username': username, 'password': password}
try:
print(f"Attempting to retrieve JWT token from {login_url}...")
response = requests.post(login_url, headers={'Content-Type': 'application/x-www-form-urlencoded'}, data=payload)
response.raise_for_status()
print(f"Successfully retrieved JWT token from {login_url}")
return response.json().get('access_token')
except requests.exceptions.RequestException as e:
print(f"Failed to retrieve JWT token from {login_url}: {e}")
return None
# Environment variables for sensitive information
QKD1_ADDRESS = os.getenv("QKD1_ADDRESS")
QKD2_ADDRESS = os.getenv("QKD2_ADDRESS")
PORT = os.getenv("QKD_PORT")
USERNAME = os.getenv("QKD_USERNAME")
PASSWORD = os.getenv("QKD_PASSWORD")
# Pytest fixture to initialize QKDDriver with token for Node 1
@pytest.fixture
def driver_qkd1():
token = get_jwt_token(QKD1_ADDRESS, PORT, USERNAME, PASSWORD)
headers = {'Authorization': f'Bearer {token}'} if token else {}
return QKDDriver(address=QKD1_ADDRESS, port=PORT, headers=headers)
# Pytest fixture to initialize QKDDriver with token for Node 2
@pytest.fixture
def driver_qkd2():
token = get_jwt_token(QKD2_ADDRESS, PORT, USERNAME, PASSWORD)
headers = {'Authorization': f'Bearer {token}'} if token else {}
return QKDDriver(address=QKD2_ADDRESS, port=PORT, headers=headers)
# Utility function to save data to a JSON file, filtering out non-serializable objects
def save_json_file(filename, data):
serializable_data = filter_serializable(data)
with open(filename, 'w') as f:
json.dump(serializable_data, f, indent=2)
print(f"Saved data to {filename}")
# Function to filter out non-serializable objects like HTTPError
def filter_serializable(data):
if isinstance(data, list):
return [filter_serializable(item) for item in data if not isinstance(item, requests.exceptions.RequestException)]
elif isinstance(data, dict):
return {key: filter_serializable(value) for key, value in data.items() if not isinstance(value, requests.exceptions.RequestException)}
return data
# Utility function to print the retrieved data for debugging, handling errors
def print_data(label, data):
try:
print(f"{label}: {json.dumps(data, indent=2)}")
except TypeError as e:
print(f"Error printing {label}: {e}, Data: {data}")
# General function to retrieve and handle HTTP errors
def retrieve_data(driver_qkd, resource, resource_name):
try:
data = driver_qkd.GetConfig([resource])
assert isinstance(data, list), f"Expected a list for {resource_name}"
assert len(data) > 0, f"No {resource_name} found in the system"
return data
except requests.exceptions.HTTPError as e:
print(f"HTTPError while fetching {resource_name}: {e}")
return None
except AssertionError as e:
print(f"AssertionError: {e}")
return None
# Test ID: INT_LQ_Test_02 (QKD Node Capabilities)
def retrieve_capabilities(driver_qkd, node_name):
capabilities = retrieve_data(driver_qkd, RESOURCE_CAPABILITES, "capabilities")
if capabilities:
print_data(f"{node_name} Capabilities", capabilities)
return capabilities
# Test ID: INT_LQ_Test_03 (QKD Interfaces)
def retrieve_interfaces(driver_qkd, node_name):
interfaces = retrieve_data(driver_qkd, RESOURCE_INTERFACES, "interfaces")
if interfaces:
print_data(f"{node_name} Interfaces", interfaces)
return interfaces
# Test ID: INT_LQ_Test_04 (QKD Links)
def retrieve_links(driver_qkd, node_name):
links = retrieve_data(driver_qkd, RESOURCE_LINKS, "links")
if links:
print_data(f"{node_name} Links", links)
return links
# Test ID: INT_LQ_Test_05 (QKD Link Metrics)
def retrieve_link_metrics(driver_qkd, node_name):
links = retrieve_links(driver_qkd, node_name)
if links:
for link in links:
if 'performance_metrics' in link[1]:
print_data(f"{node_name} Link Metrics", link[1]['performance_metrics'])
else:
print(f"No metrics found for link {link[0]}")
return links
# Test ID: INT_LQ_Test_06 (QKD Applications)
def retrieve_applications(driver_qkd, node_name):
applications = retrieve_data(driver_qkd, RESOURCE_APPS, "applications")
if applications:
print_data(f"{node_name} Applications", applications)
return applications
# Test ID: INT_LQ_Test_07 (System Health Check)
def retrieve_node_data(driver_qkd, node_name):
node_data = retrieve_data(driver_qkd, RESOURCE_NODE, "node data")
if node_data:
print_data(f"{node_name} Node Data", node_data)
return node_data
# Main test to retrieve and save data from QKD1 and QKD2 to files
def test_retrieve_and_save_data(driver_qkd1, driver_qkd2):
# Retrieve data for QKD1
qkd1_interfaces = retrieve_interfaces(driver_qkd1, "QKD1")
qkd1_links = retrieve_links(driver_qkd1, "QKD1")
qkd1_capabilities = retrieve_capabilities(driver_qkd1, "QKD1")
qkd1_node_data = retrieve_node_data(driver_qkd1, "QKD1")
qkd1_apps = retrieve_applications(driver_qkd1, "QKD1")
qkd1_data = {
"interfaces": qkd1_interfaces,
"links": qkd1_links,
"capabilities": qkd1_capabilities,
"apps": qkd1_apps,
"node_data": qkd1_node_data
}
# Save QKD1 data to file
save_json_file('qkd1_data.json', qkd1_data)
# Retrieve data for QKD2
qkd2_interfaces = retrieve_interfaces(driver_qkd2, "QKD2")
qkd2_links = retrieve_links(driver_qkd2, "QKD2")
qkd2_capabilities = retrieve_capabilities(driver_qkd2, "QKD2")
qkd2_node_data = retrieve_node_data(driver_qkd2, "QKD2")
qkd2_apps = retrieve_applications(driver_qkd2, "QKD2")
qkd2_data = {
"interfaces": qkd2_interfaces,
"links": qkd2_links,
"capabilities": qkd2_capabilities,
"apps": qkd2_apps,
"node_data": qkd2_node_data
}
# Save QKD2 data to file
save_json_file('qkd2_data.json', qkd2_data)
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pytest, os, time, logging
from common.Constants import ServiceNameEnum
from common.Settings import (
ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_HTTP,
get_env_var_name, get_service_port_http
)
from context.client.ContextClient import ContextClient
from nbi.service.rest_server.RestServer import RestServer
from nbi.service.rest_server.nbi_plugins.tfs_api import register_tfs_api
from device.client.DeviceClient import DeviceClient
from device.service.DeviceService import DeviceService
from device.service.driver_api.DriverFactory import DriverFactory
from device.service.driver_api.DriverInstanceCache import DriverInstanceCache
from device.service.drivers import DRIVERS
from device.tests.CommonObjects import CONTEXT, TOPOLOGY
from device.tests.MockService_Dependencies import MockService_Dependencies
from monitoring.client.MonitoringClient import MonitoringClient
from requests import codes as requests_codes
import requests
# Constants
LOCAL_HOST = '127.0.0.1'
MOCKSERVICE_PORT = 8080
# Get dynamic port for NBI service
NBI_SERVICE_PORT = MOCKSERVICE_PORT + get_service_port_http(ServiceNameEnum.NBI)
# Set environment variables for the NBI service host and port
os.environ[get_env_var_name(ServiceNameEnum.NBI, ENVVAR_SUFIX_SERVICE_HOST)] = str(LOCAL_HOST)
os.environ[get_env_var_name(ServiceNameEnum.NBI, ENVVAR_SUFIX_SERVICE_PORT_HTTP)] = str(NBI_SERVICE_PORT)
# Expected status codes for requests
EXPECTED_STATUS_CODES = {requests_codes['OK'], requests_codes['CREATED'], requests_codes['ACCEPTED'], requests_codes['NO_CONTENT']}
# Debugging output for the port number
print(f"MOCKSERVICE_PORT: {MOCKSERVICE_PORT}")
print(f"NBI_SERVICE_PORT: {NBI_SERVICE_PORT}")
@pytest.fixture(scope='session')
def mock_service():
_service = MockService_Dependencies(MOCKSERVICE_PORT)
_service.configure_env_vars()
_service.start()
yield _service
_service.stop()
@pytest.fixture(scope='session')
def nbi_service_rest(mock_service): # Pass the `mock_service` as an argument if needed
_rest_server = RestServer()
register_tfs_api(_rest_server) # Register the TFS API with the REST server
_rest_server.start()
time.sleep(1) # Give time for the server to start
yield _rest_server
_rest_server.shutdown()
_rest_server.join()
@pytest.fixture(scope='session')
def context_client(mock_service):
_client = ContextClient()
yield _client
_client.close()
@pytest.fixture(scope='session')
def device_service(context_client, monitoring_client):
_driver_factory = DriverFactory(DRIVERS)
_driver_instance_cache = DriverInstanceCache(_driver_factory)
_service = DeviceService(_driver_instance_cache)
_service.start()
yield _service
_service.stop()
@pytest.fixture(scope='session')
def device_client(device_service):
_client = DeviceClient()
yield _client
_client.close()
# General request function
def do_rest_request(method, url, body=None, timeout=10, allow_redirects=True, logger=None):
# Construct the request URL with NBI service port
request_url = f"http://{LOCAL_HOST}:{NBI_SERVICE_PORT}{url}"
# Log the request details for debugging
if logger:
msg = f"Request: {method.upper()} {request_url}"
if body:
msg += f" body={body}"
logger.warning(msg)
# Send the request
reply = requests.request(method, request_url, timeout=timeout, json=body, allow_redirects=allow_redirects)
# Log the response details for debugging
if logger:
logger.warning(f"Reply: {reply.text}")
# Print status code and response for debugging instead of asserting
print(f"Status code: {reply.status_code}")
print(f"Response: {reply.text}")
# Return the JSON response if present
if reply.content:
return reply.json()
return None
# Function for GET requests
def do_rest_get_request(url, body=None, timeout=10, allow_redirects=True, logger=None):
return do_rest_request('get', url, body, timeout, allow_redirects, logger=logger)
# Function for POST requests
def do_rest_post_request(url, body=None, timeout=10, allow_redirects=True, logger=None):
return do_rest_request('post', url, body, timeout, allow_redirects, logger=logger)
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging, urllib
from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME
from common.proto.context_pb2 import ContextId
from common.tools.descriptor.Loader import DescriptorLoader
from context.client.ContextClient import ContextClient
from nbi.service.rest_server.RestServer import RestServer
from common.tools.object_factory.Context import json_context_id
from device.tests.qkd.unit.PrepareScenario import mock_service, nbi_service_rest, do_rest_get_request
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)
JSON_ADMIN_CONTEXT_ID = json_context_id(DEFAULT_CONTEXT_NAME)
ADMIN_CONTEXT_ID = ContextId(**JSON_ADMIN_CONTEXT_ID)
# ----- Context --------------------------------------------------------------------------------------------------------
def test_rest_get_context_ids(nbi_service_rest: RestServer): # pylint: disable=redefined-outer-name, unused-argument
reply = do_rest_get_request('/tfs-api/context_ids')
print("Context IDs:", reply)
def test_rest_get_contexts(nbi_service_rest: RestServer): # pylint: disable=redefined-outer-name, unused-argument
reply = do_rest_get_request('/tfs-api/contexts')
print("Contexts:", reply)
def test_rest_get_context(nbi_service_rest: RestServer): # pylint: disable=redefined-outer-name, unused-argument
context_uuid = urllib.parse.quote(DEFAULT_CONTEXT_NAME)
reply = do_rest_get_request(f'/tfs-api/context/{context_uuid}')
print("Context data:", reply)
# ----- Topology -------------------------------------------------------------------------------------------------------
def test_rest_get_topology_ids(nbi_service_rest: RestServer): # pylint: disable=redefined-outer-name, unused-argument
context_uuid = urllib.parse.quote(DEFAULT_CONTEXT_NAME)
reply = do_rest_get_request(f'/tfs-api/context/{context_uuid}/topology_ids')
print("Topology IDs:", reply)
def test_rest_get_topologies(nbi_service_rest: RestServer): # pylint: disable=redefined-outer-name, unused-argument
context_uuid = urllib.parse.quote(DEFAULT_CONTEXT_NAME)
reply = do_rest_get_request(f'/tfs-api/context/{context_uuid}/topologies')
print("Topologies:", reply)
def test_rest_get_topology(nbi_service_rest: RestServer): # pylint: disable=redefined-outer-name, unused-argument
context_uuid = urllib.parse.quote(DEFAULT_CONTEXT_NAME)
topology_uuid = urllib.parse.quote(DEFAULT_TOPOLOGY_NAME)
reply = do_rest_get_request(f'/tfs-api/context/{context_uuid}/topology/{topology_uuid}')
print("Topology data:", reply)
# ----- Device ---------------------------------------------------------------------------------------------------------
def test_rest_get_device_ids(nbi_service_rest: RestServer): # pylint: disable=redefined-outer-name, unused-argument
reply = do_rest_get_request('/tfs-api/device_ids')
print("Device IDs:", reply)
def test_rest_get_devices(nbi_service_rest: RestServer): # pylint: disable=redefined-outer-name, unused-argument
reply = do_rest_get_request('/tfs-api/devices')
print("Devices:", reply)
# ----- Link -----------------------------------------------------------------------------------------------------------
def test_rest_get_link_ids(nbi_service_rest: RestServer): # pylint: disable=redefined-outer-name, unused-argument
reply = do_rest_get_request('/tfs-api/link_ids')
print("Link IDs:", reply)
def test_rest_get_links(nbi_service_rest: RestServer): # pylint: disable=redefined-outer-name, unused-argument
reply = do_rest_get_request('/tfs-api/links')
print("Links:", reply)
# ----- Service --------------------------------------------------------------------------------------------------------
def test_rest_get_service_ids(nbi_service_rest: RestServer): # pylint: disable=redefined-outer-name, unused-argument
reply = do_rest_get_request('/tfs-api/link_ids')
print("Service IDs:", reply)
def test_rest_get_topologies(nbi_service_rest: RestServer): # pylint: disable=redefined-outer-name, unused-argument
context_uuid = urllib.parse.quote(DEFAULT_CONTEXT_NAME)
reply = do_rest_get_request(f'/tfs-api/context/{context_uuid}/services')
print("Services:", reply)
# ----- Apps -----------------------------------------------------------------------------------------------------------
def test_rest_get_apps(nbi_service_rest: RestServer): # pylint: disable=redefined-outer-name, unused-argument
context_uuid = urllib.parse.quote(DEFAULT_CONTEXT_NAME) # Context ID
reply = do_rest_get_request(f'/tfs-api/context/{context_uuid}/apps')
print("Apps:", reply)
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pytest
import json
from device.service.drivers.qkd.QKDDriver2 import QKDDriver
MOCK_QKD_ADDRRESS = '127.0.0.1'
MOCK_PORT = 11111
@pytest.fixture
def qkd_driver():
# Initialize the QKD driver with the appropriate settings
return QKDDriver(address=MOCK_QKD_ADDRRESS, port=MOCK_PORT, username='user', password='pass')
def test_application_deployment(qkd_driver):
qkd_driver.Connect()
# Application registration data
app_data = {
'qkd_app': [
{
'app_id': '00000001-0001-0000-0000-000000000001',
'client_app_id': [],
'app_statistics': {'statistics': []},
'app_qos': {},
'backing_qkdl_id': []
}
]
}
# Send a POST request to create the application
response = qkd_driver.SetConfig([('/qkd_applications/qkd_app', json.dumps(app_data))])
# Verify response
assert response[0] is True, "Expected application registration to succeed"