Skip to content
Snippets Groups Projects
Commit b565b186 authored by Carlos Manso's avatar Carlos Manso Committed by Lluis Gifre Renom
Browse files

Transport API Driver 0.1

parent 4138bf59
No related branches found
No related tags found
1 merge request!54Release 2.0.0
......@@ -11,4 +11,5 @@ pytest-benchmark
python-json-logger
pytz
redis
requests
xmltodict
import json, logging, requests
from device.service.driver_api._Driver import RESOURCE_ENDPOINTS
LOGGER = logging.getLogger(__name__)
def find_key(resource, key):
return json.loads(resource[1])[key]
def config_getter(root_url, resource_key, timeout):
url = '{:s}/restconf/data/tapi-common:context'.format(root_url)
result = []
try:
response = requests.get(url, timeout=timeout)
except requests.exceptions.Timeout:
LOGGER.exception('Timeout connecting {:s}'.format(url))
except Exception as e: # pylint: disable=broad-except
LOGGER.exception('Exception retrieving {:s}'.format(resource_key))
result.append((resource_key, e))
else:
context = json.loads(response.content)
if resource_key == RESOURCE_ENDPOINTS:
for sip in context['tapi-common:context']['service-interface-point']:
result.append(
('/endpoints/endpoint[{:s}]'.format(sip['uuid']), {'uuid': sip['uuid'], 'type': '10Gbps'}))
return result
def create_connectivity_service(
root_url, timeout, uuid, input_sip, output_sip, direction, capacity_value, capacity_unit, layer_protocol_name,
layer_protocol_qualifier):
url = '{:s}/restconf/data/tapi-common:context/tapi-connectivity:connectivity-context'.format(root_url)
headers = {'content-type': 'application/json'}
data = {
'tapi-connectivity:connectivity-service': [
{
'uuid': uuid,
'connectivity-constraint': {
'requested-capacity': {
'total-size': {
'value': capacity_value,
'unit': capacity_unit
}
},
'connectivity-direction': direction
},
'end-point': [
{
'service-interface-point': {
'service-interface-point-uuid': input_sip
},
'layer-protocol-name': layer_protocol_name,
'layer-protocol-qualifier': layer_protocol_qualifier,
'local-id': input_sip
},
{
'service-interface-point': {
'service-interface-point-uuid': output_sip
},
'layer-protocol-name': layer_protocol_name,
'layer-protocol-qualifier': layer_protocol_qualifier,
'local-id': output_sip
}
]
}
]
}
results = []
try:
LOGGER.info('Connectivity service {:s}: {:s}'.format(str(uuid), str(data)))
response = requests.post(url=url, data=json.dumps(data), timeout=timeout, headers=headers)
LOGGER.info('TAPI response: {:s}'.format(str(response)))
except Exception as e: # pylint: disable=broad-except
LOGGER.exception('Exception creating ConnectivityService(uuid={:s}, data={:s})'.format(str(uuid), str(data)))
results.append(e)
else:
if response.status_code != 201:
msg = 'Could not create ConnectivityService(uuid={:s}, data={:s}). status_code={:s} reply={:s}'
LOGGER.error(msg.format(str(uuid), str(data), str(response.status_code), str(response)))
results.append(response.status_code == 201)
return results
def delete_connectivity_service(root_url, timeout, uuid):
url = '{:s}/restconf/data/tapi-common:context/tapi-connectivity:connectivity-context/connectivity-service={:s}'
url = url.format(root_url, uuid)
results = []
try:
response = requests.delete(url=url, timeout=timeout)
except Exception as e: # pylint: disable=broad-except
LOGGER.exception('Exception deleting ConnectivityService(uuid={:s})'.format(str(uuid)))
results.append(e)
else:
if response.status_code != 201:
msg = 'Could not delete ConnectivityService(uuid={:s}). status_code={:s} reply={:s}'
LOGGER.error(msg.format(str(uuid), str(response.status_code), str(response)))
results.append(response.status_code == 202)
return results
import logging
import logging, requests, threading
from typing import Any, Iterator, List, Tuple, Union
from common.type_checkers.Checkers import chk_string, chk_type
from device.service.driver_api._Driver import _Driver
from . import ALL_RESOURCE_KEYS
from .Tools import create_connectivity_service, find_key, config_getter, delete_connectivity_service
LOGGER = logging.getLogger(__name__)
# TODO: Implement TransportAPI Driver
class TransportApiDriver(_Driver):
pass
def __init__(self, address: str, port: int, **settings) -> None: # pylint: disable=super-init-not-called
self.__lock = threading.Lock()
self.__started = threading.Event()
self.__terminate = threading.Event()
self.__tapi_root = 'http://' + address + ':' + str(port)
self.__timeout = int(settings.get('timeout', 120))
def Connect(self) -> bool:
url = self.__tapi_root + '/restconf/data/tapi-common:context'
with self.__lock:
if self.__started.is_set(): return True
try:
requests.get(url, timeout=self.__timeout)
except requests.exceptions.Timeout:
LOGGER.exception('Timeout connecting {:s}'.format(str(self.__tapi_root)))
return False
except Exception: # pylint: disable=broad-except
LOGGER.exception('Exception connecting {:s}'.format(str(self.__tapi_root)))
return False
else:
self.__started.set()
return True
def Disconnect(self) -> bool:
with self.__lock:
self.__terminate.set()
return True
def GetInitialConfig(self) -> List[Tuple[str, Any]]:
with self.__lock:
return []
def GetConfig(self, resource_keys : List[str] = []) -> List[Tuple[str, Union[Any, None, Exception]]]:
chk_type('resources', resource_keys, list)
results = []
with self.__lock:
if len(resource_keys) == 0: resource_keys = ALL_RESOURCE_KEYS
for i, resource_key in enumerate(resource_keys):
str_resource_name = 'resource_key[#{:d}]'.format(i)
chk_string(str_resource_name, resource_key, allow_empty=False)
results.extend(config_getter(self.__tapi_root, resource_key, self.__timeout))
return results
def SetConfig(self, resources: List[Tuple[str, Any]]) -> List[Union[bool, Exception]]:
results = []
if len(resources) == 0:
return results
with self.__lock:
for resource in resources:
LOGGER.info('resource = {:s}'.format(str(resource)))
input_sip = find_key(resource, 'input_sip')
output_sip = find_key(resource, 'output_sip')
uuid = find_key(resource, 'uuid')
capacity_value = find_key(resource, 'capacity_value')
capacity_unit = find_key(resource, 'capacity_unit')
layer_protocol_name = find_key(resource, 'layer_protocol_name')
layer_protocol_qualifier = find_key(resource, 'layer_protocol_qualifier')
direction = find_key(resource, 'direction')
data = create_connectivity_service(
self.__tapi_root, self.__timeout, uuid, input_sip, output_sip, direction, capacity_value,
capacity_unit, layer_protocol_name, layer_protocol_qualifier)
results.extend(data)
return results
def DeleteConfig(self, resources: List[Tuple[str, Any]]) -> List[Union[bool, Exception]]:
results = []
if len(resources) == 0: return results
with self.__lock:
for resource in resources:
LOGGER.info('resource = {:s}'.format(str(resource)))
uuid = find_key(resource, 'uuid')
results.extend(delete_connectivity_service(self.__tapi_root, self.__timeout, uuid))
return results
def SubscribeState(self, subscriptions : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]:
# TODO: TAPI does not support monitoring by now
return [False for _ in subscriptions]
def UnsubscribeState(self, subscriptions : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]:
# TODO: TAPI does not support monitoring by now
return [False for _ in subscriptions]
def GetState(self, blocking=False) -> Iterator[Tuple[float, str, Any]]:
# TODO: TAPI does not support monitoring by now
return []
from device.service.driver_api._Driver import RESOURCE_ENDPOINTS, RESOURCE_INTERFACES, RESOURCE_NETWORK_INSTANCES
ALL_RESOURCE_KEYS = [
RESOURCE_ENDPOINTS,
RESOURCE_INTERFACES,
RESOURCE_NETWORK_INSTANCES,
]
RESOURCE_KEY_MAPPINGS = {
RESOURCE_ENDPOINTS : 'component',
RESOURCE_INTERFACES : 'interface',
RESOURCE_NETWORK_INSTANCES: 'network_instance',
}
from copy import deepcopy
from device.proto.context_pb2 import DeviceDriverEnum, DeviceOperationalStatusEnum
from .Tools import config_rule_set, config_rule_delete
# use "deepcopy" to prevent propagating forced changes during tests
DEVICE_TAPI_UUID = 'DEVICE-TAPI'
DEVICE_TAPI_TYPE = 'optical-line-system'
DEVICE_TAPI_ADDRESS = '0.0.0.0'
DEVICE_TAPI_PORT = '4900'
DEVICE_TAPI_TIMEOUT = '120'
DEVICE_TAPI_DRIVERS = [DeviceDriverEnum.DEVICEDRIVER_TRANSPORT_API]
DEVICE_TAPI_ID = {'device_uuid': {'uuid': DEVICE_TAPI_UUID}}
DEVICE_TAPI = {
'device_id': deepcopy(DEVICE_TAPI_ID),
'device_type': DEVICE_TAPI_TYPE,
'device_config': {'config_rules': []},
'device_operational_status': DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_DISABLED,
'device_drivers': DEVICE_TAPI_DRIVERS,
'device_endpoints': [],
}
DEVICE_TAPI_CONNECT_RULES = [
config_rule_set('_connect/address', DEVICE_TAPI_ADDRESS),
config_rule_set('_connect/port', DEVICE_TAPI_PORT),
config_rule_set('_connect/timeout', DEVICE_TAPI_TIMEOUT),
]
DEVICE_TAPI_CONFIG_RULES = [
config_rule_set('network_instance[DemoOFC-NetInst]/interface[13/1/3]', {
'name': 'DemoOFC-NetInst', 'id': '13/1/3',
})
]
DEVICE_TAPI_DECONFIG_RULES = [
config_rule_delete('network_instance[DemoOFC-NetInst]/interface[13/1/3]', {
'name': 'DemoOFC-NetInst', 'id': '13/1/3'
})
]
\ No newline at end of file
......@@ -49,6 +49,21 @@ except ImportError:
# DEVICE_OC, DEVICE_OC_CONFIG_RULES, DEVICE_OC_DECONFIG_RULES, DEVICE_OC_CONNECT_RULES, DEVICE_OC_ID,
# DEVICE_OC_UUID)
try:
from .Device_Transport_Api_CTTC import (
DEVICE_TAPI, DEVICE_TAPI_CONNECT_RULES, DEVICE_TAPI_UUID, DEVICE_TAPI_ID, DEVICE_TAPI_CONFIG_RULES,
DEVICE_TAPI_DECONFIG_RULES)
ENABLE_TAPI = True
except ImportError:
ENABLE_TAPI = False
# Create a Device_Transport_Api_??.py file with the details for your device to test it and import it as follows in
# the try block of this import statement.
# from .Device_Transport_Api_?? import(
# DEVICE_TAPI, DEVICE_TAPI_CONFIG_RULES, DEVICE_TAPI_DECONFIG_RULES, DEVICE_TAPI_CONNECT_RULES,
# DEVICE_TAPI_ID, DEVICE_TAPI_UUID)
#ENABLE_OPENCONFIG = False
#ENABLE_TAPI = False
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)
......@@ -586,3 +601,130 @@ def test_device_openconfig_delete(
device_client.DeleteDevice(DeviceId(**DEVICE_OC_ID))
driver : _Driver = device_service.driver_instance_cache.get(DEVICE_OC_UUID, {})
assert driver is None
# ----- Test Device Driver TAPI ----------------------------------------------------------------------------------
def test_device_tapi_add_error_cases(
device_client : DeviceClient): # pylint: disable=redefined-outer-name
if not ENABLE_TAPI: return # if there is no device to test against, asusme test is silently passed.
with pytest.raises(grpc.RpcError) as e:
DEVICE_TAPI_WITH_EXTRA_RULES = copy.deepcopy(DEVICE_TAPI)
DEVICE_TAPI_WITH_EXTRA_RULES['device_config']['config_rules'].extend(DEVICE_TAPI_CONNECT_RULES)
DEVICE_TAPI_WITH_EXTRA_RULES['device_config']['config_rules'].extend(DEVICE_TAPI_CONFIG_RULES)
device_client.AddDevice(Device(**DEVICE_TAPI_WITH_EXTRA_RULES))
assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
msg_head = 'device.device_config.config_rules(['
msg_tail = ']) is invalid; RPC method AddDevice only accepts connection Config Rules that should start '\
'with "_connect/" tag. Others should be configured after adding the device.'
except_msg = str(e.value.details())
assert except_msg.startswith(msg_head) and except_msg.endswith(msg_tail)
def test_device_tapi_add_correct(
device_client: DeviceClient, # pylint: disable=redefined-outer-name
device_service: DeviceService): # pylint: disable=redefined-outer-name
if not ENABLE_TAPI: return # if there is no device to test against, asusme test is silently passed.
DEVICE_TAPI_WITH_CONNECT_RULES = copy.deepcopy(DEVICE_TAPI)
DEVICE_TAPI_WITH_CONNECT_RULES['device_config']['config_rules'].extend(DEVICE_TAPI_CONNECT_RULES)
device_client.AddDevice(Device(**DEVICE_TAPI_WITH_CONNECT_RULES))
driver: _Driver = device_service.driver_instance_cache.get(DEVICE_TAPI_UUID)
assert driver is not None
def test_device_tapi_get(
context_client: ContextClient, # pylint: disable=redefined-outer-name
device_client: DeviceClient): # pylint: disable=redefined-outer-name
if not ENABLE_TAPI: return # if there is no device to test against, asusme test is silently passed.
initial_config = device_client.GetInitialConfig(DeviceId(**DEVICE_TAPI_ID))
LOGGER.info('initial_config = {:s}'.format(grpc_message_to_json_string(initial_config)))
device_data = context_client.GetDevice(DeviceId(**DEVICE_TAPI_ID))
LOGGER.info('device_data = {:s}'.format(grpc_message_to_json_string(device_data)))
def test_device_tapi_configure(
context_client: ContextClient, # pylint: disable=redefined-outer-name
device_client: DeviceClient, # pylint: disable=redefined-outer-name
device_service: DeviceService): # pylint: disable=redefined-outer-name
if not ENABLE_TAPI: return # if there is no device to test against, asusme test is silently passed.
driver : _Driver = device_service.driver_instance_cache.get(DEVICE_TAPI_UUID)
assert driver is not None
# Requires to retrieve data from device; might be slow. Uncomment only when needed and test does not pass directly.
#driver_config = sorted(driver.GetConfig(), key=operator.itemgetter(0))
#LOGGER.info('driver_config = {:s}'.format(str(driver_config)))
DEVICE_TAPI_WITH_CONFIG_RULES = copy.deepcopy(DEVICE_TAPI)
DEVICE_TAPI_WITH_CONFIG_RULES['device_config']['config_rules'].extend(DEVICE_TAPI_CONFIG_RULES)
device_client.ConfigureDevice(Device(**DEVICE_TAPI_WITH_CONFIG_RULES))
# Requires to retrieve data from device; might be slow. Uncomment only when needed and test does not pass directly.
#driver_config = sorted(driver.GetConfig(), key=operator.itemgetter(0))
#LOGGER.info('driver_config = {:s}'.format(str(driver_config)))
device_data = context_client.GetDevice(DeviceId(**DEVICE_TAPI_ID))
config_rules = [
(ConfigActionEnum.Name(config_rule.action), config_rule.resource_key, config_rule.resource_value)
for config_rule in device_data.device_config.config_rules
]
LOGGER.info('device_data.device_config.config_rules = \n{:s}'.format(
'\n'.join(['{:s} {:s} = {:s}'.format(*config_rule) for config_rule in config_rules])))
for config_rule in DEVICE_TAPI_CONFIG_RULES:
config_rule = (
ConfigActionEnum.Name(config_rule['action']), config_rule['resource_key'], config_rule['resource_value'])
assert config_rule in config_rules
def test_device_tapi_deconfigure(
context_client: ContextClient, # pylint: disable=redefined-outer-name
device_client: DeviceClient, # pylint: disable=redefined-outer-name
device_service: DeviceService): # pylint: disable=redefined-outer-name
if not ENABLE_TAPI: return # if there is no device to test against, asusme test is silently passed.
driver: _Driver = device_service.driver_instance_cache.get(DEVICE_TAPI_UUID)
assert driver is not None
# Requires to retrieve data from device; might be slow. Uncomment only when needed and test does not pass directly.
#driver_config = sorted(driver.GetConfig(), key=operator.itemgetter(0))
#LOGGER.info('driver_config = {:s}'.format(str(driver_config)))
DEVICE_TAPI_WITH_DECONFIG_RULES = copy.deepcopy(DEVICE_TAPI)
DEVICE_TAPI_WITH_DECONFIG_RULES['device_config']['config_rules'].extend(DEVICE_TAPI_DECONFIG_RULES)
device_client.ConfigureDevice(Device(**DEVICE_TAPI_WITH_DECONFIG_RULES))
# Requires to retrieve data from device; might be slow. Uncomment only when needed and test does not pass directly.
#driver_config = sorted(driver.GetConfig(), key=operator.itemgetter(0))
#LOGGER.info('driver_config = {:s}'.format(str(driver_config)))
device_data = context_client.GetDevice(DeviceId(**DEVICE_TAPI_ID))
config_rules = [
(ConfigActionEnum.Name(config_rule.action), config_rule.resource_key, config_rule.resource_value)
for config_rule in device_data.device_config.config_rules
]
LOGGER.info('device_data.device_config.config_rules = \n{:s}'.format(
'\n'.join(['{:s} {:s} = {:s}'.format(*config_rule) for config_rule in config_rules])))
for config_rule in DEVICE_TAPI_DECONFIG_RULES:
action_set = ConfigActionEnum.Name(ConfigActionEnum.CONFIGACTION_SET)
config_rule = (action_set, config_rule['resource_key'], config_rule['resource_value'])
assert config_rule not in config_rules
def test_device_tapi_delete(
device_client : DeviceClient, # pylint: disable=redefined-outer-name
device_service : DeviceService): # pylint: disable=redefined-outer-name
if not ENABLE_TAPI: return # if there is no device to test against, asusme test is silently passed.
device_client.DeleteDevice(DeviceId(**DEVICE_TAPI_ID))
driver : _Driver = device_service.driver_instance_cache.get(DEVICE_TAPI_UUID, {})
assert driver is None
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment