Loading src/device/service/drivers/restconf/RestconfDriver.py 0 → 100644 +78 −0 Original line number Diff line number Diff line import logging, json, threading from typing import List, Tuple, Any, Union, Optional, Iterator from .handlers import ALL_RESOURCE_KEYS, get_path, parse, compose from .RestconfHandler import RestconfHandler from device.service.driver_api._Driver import _Driver DRIVER_NAME = 'restconf' class RestconfDriver(_Driver): def __init__(self, address : str, port : int, **settings) -> None: super().__init__(DRIVER_NAME, address, port, **settings) self.__logger = logging.getLogger('{:s}:[{:s}:{:s}]'.format(str(__name__), str(self.address), str(self.port))) self.__restconf_handler = RestconfHandler(self.address, self.port, **(self.settings)) def Connect(self) -> bool: return self.__restconf_handler.is_endpoint_available def Disconnect(self) -> bool: return True def GetInitialConfig(self) -> List[Tuple[str, Any]]: return [] def GetConfig(self, resource_keys : List[str] = []) -> List[Tuple[str, Union[Any, None, Exception]]]: results = [] if len(resource_keys) == 0: resource_keys = ALL_RESOURCE_KEYS for i,resource_key in enumerate(resource_keys): str_resource_name = 'resource_key[#{:d}]'.format(i) try: resource_path = get_path(resource_key) if resource_path is None: resource_path = resource_key data = self.__restconf_handler.get(resource_path) results.extend(parse(resource_path, data)) except Exception as e: MSG = 'Exception retrieving {:s}: {:s}' self.__logger.exception(MSG.format(str_resource_name, str(resource_key))) results.append((resource_key, e)) return results def SetConfig(self, resources: List[Tuple[str, Any]]) -> List[Union[bool, Exception]]: if len(resources) == 0: return [] results = [] for resource, data in resources: if type(data) == str: try: data = json.loads(data) except json.decoder.JSONDecodeError: results.append((resource, True)) continue compose_result = compose(resource, data) results.append(compose_result if compose_result is not None else (resource, Exception('Unexpected'))) return self.__restconf_handler.set(results) def DeleteConfig(self, resources: List[Tuple[str, Any]]) -> List[Union[bool, Exception]]: if len(resources) == 0: return [] return self.__restconf_handler.delete([ key for key, _ in resources ]) def SubscribeState(self, subscriptions: List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]: return [ True for _ in subscriptions ] def UnsubscribeState(self, subscriptions: List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]: return [ True for _ in subscriptions ] def GetState(self, blocking=False, terminate : Optional[threading.Event] = None) -> Iterator[Tuple[float, str, Any]]: return [] src/device/service/drivers/restconf/RestconfHandler.py 0 → 100644 +72 −0 Original line number Diff line number Diff line import requests, logging, json import xml.etree.ElementTree as ET from typing import List, Tuple, Any, Union class RestconfHandler: def __init__(self, address : str, port : int, **settings) -> None: self.__address = address self.__port = int(port) self.__base_path = '' self.__logger = logging.getLogger('{:s}:[{:s}:{:s}]'.format(str(__name__), str(self.__address), str(self.__port))) @property def _base_path(self) -> str: return f"http://{self.__address}:{self.__port}/{self.__base_path}".strip('/') @property def is_endpoint_available(self) -> bool: response = requests.get(f"{self._base_path}/.well-known/host-meta") if response.status_code != 200: return False response = ET.fromstring(response.text) restconf_link = response.find("xrd:Link[@rel='restconf']", {'xrd': 'http://docs.oasis-open.org/ns/xri/xrd-1.0'}) if restconf_link is None or 'href' not in restconf_link.attrib: return False self.__base_path = restconf_link.attrib.get('href').strip('/') return True def get(self, resource: str) -> dict: resource = resource.strip('/') response = requests.get(f"{self._base_path}/data/{resource}") return response.json() def set(self, resources : List[Tuple[str, Any]]) -> List[Union[bool, Exception]]: is_valid_data = lambda resource_key, resource_value: (not resource_key.startswith('_connect/') )and ((not isinstance(resource_value, bool)) and (not isinstance(resource_value, Exception))) results = [ (resource_key, resource_value) for resource_key, resource_value in resources if not is_valid_data(resource_key, resource_value) ] resources = [ (resource_key, json.loads(resource_value)) if isinstance(resource_value, str) else (resource_key, resource_value) for resource_key, resource_value in resources if is_valid_data(resource_key, resource_value) ] delete_resources = [ resource_key for resource_key, resource_value in resources if len(resource_value) == 0 ] response = self.delete(delete_resources) self.__logger.debug(response) for resource_key, resource_value in resources: if resource_key in delete_resources: continue if isinstance(resource_value, bool) or isinstance(resource_value, Exception): results.append((resource_key, resource_value)) continue response = requests.put(f"{self._base_path}/data/{resource_key.strip('/')}", json=resource_value) response_success = response.status_code in [200, 201] results.append((resource_key, True if response_success else Exception('Unexpected'))) return results def delete(self, resources : List[str]) -> List[Union[bool, Exception]]: results = [] for resource_key in resources: response = requests.delete(path) response_success = response.status_code in [200, 201] results.append((resource_key, True if response_success else Exception('Unexpected'))) return results src/device/service/drivers/restconf/handlers/Component.py 0 → 100644 +52 −0 Original line number Diff line number Diff line import re import json import logging from typing import Dict, Tuple from ._Handler import _Handler from common.proto.kpi_sample_types_pb2 import KpiSampleType PATH_IF_CTR = '/oci:interfaces/interface={:s}/state/counters/{:s}' class ComponentHandler(_Handler): def __init__(self): self.__logger = logging.getLogger(str(__name__)) def get_resource_key(self): return '/endpoints/endpoint' def get_path(self): return '/ocp:components' def compose(self, resource_key : str, resource_value : Dict, delete : bool = False) -> Tuple[str, str]: return resource_key, resource_value def parse(self, json_data : Dict): self.__logger.debug('json_data = {:s}'.format(json.dumps(json_data))) entries = [] for component in json_data['components']: self.__logger.debug('component={:s}'.format(str(component))) component_name = component['name'] component_state = component.get('state', {}) component_type = component_state.get('type') if component_type is None: continue component_type = component_type.split(':')[-1] if component_type not in {'PORT'}: continue # TODO: improve mapping between interface name and component name # By now, computed by time for the sake of saving time for the Hackfest. interface_name = re.sub(r'\-[pP][oO][rR][tT]', '', component_name) endpoint = {'uuid': interface_name, 'type': '-'} endpoint['sample_types'] = { KpiSampleType.KPISAMPLETYPE_BYTES_RECEIVED : PATH_IF_CTR.format(interface_name, 'in-octets' ), KpiSampleType.KPISAMPLETYPE_BYTES_TRANSMITTED : PATH_IF_CTR.format(interface_name, 'out-octets'), KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED : PATH_IF_CTR.format(interface_name, 'in-pkts' ), KpiSampleType.KPISAMPLETYPE_PACKETS_TRANSMITTED: PATH_IF_CTR.format(interface_name, 'out-pkts' ), } entries.append(('/endpoints/endpoint[{:s}]'.format(endpoint['uuid']), endpoint)) return entries src/device/service/drivers/restconf/handlers/Interface.py 0 → 100644 +145 −0 Original line number Diff line number Diff line import json import logging from typing import Dict, Tuple from ._Handler import _Handler class InterfaceHandler(_Handler): def __init__(self): self.__logger = logging.getLogger(str(__name__)) def get_resource_key(self): return '/interface' def get_path(self): return '/oci:interfaces' def compose(self, resource_key : str, resource_value : Dict, delete : bool = False) -> Tuple[str, str]: if_name = resource_value.get('name') if_index = int(resource_value.get('index', 0)) str_path = '/interfaces/interface={:s}'.format(if_name) if delete: return str_path, {} prefix = resource_value.get('prefix') enabled = bool(resource_value.get('enabled', True)) address_ip = resource_value.get('address_ip') address_prefix = int(resource_value.get('address_prefix', 32)) mtu = int(resource_value.get('mtu')) data = { 'config': { 'index': if_index, 'name': if_name, 'enabled': enabled, 'mtu': mtu }, 'ipv4': { 'config': { 'enabled': enabled }, 'addresses': [{ 'prefix': prefix, 'ip': address_ip, 'prefix-length': address_prefix, 'mtu': mtu }] }, } return str_path, data def parse(self, json_data : Dict): self.__logger.debug('json_data = {:s}'.format(json.dumps(json_data))) entries = [] for interface in json_data['interfaces']: self.__logger.debug('interface={:s}'.format(str(interface))) interface_name = interface['name'] interface_config = interface.get('config', {}) interface_state = interface.get('state', {}) interface_type = interface_state.get('type') if interface_type is None: continue interface_type = interface_type.split(':')[-1] if interface_type not in {'ether'}: continue _interface = { 'name' : interface_name, 'type' : interface_type, 'mtu' : interface_state['mtu'], 'admin-status' : interface_state['admin-status'], 'oper-status' : interface_state['oper-status'], 'management' : interface_state['management'], } if not interface_state['management'] and 'ifindex' in interface_state: _interface['ifindex'] = interface_state['ifindex'] if 'description' in interface_config: _interface['description'] = interface_config['description'] if 'enabled' in interface_config: _interface['enabled'] = interface_config['enabled'] if 'hardware-port' in interface_state: _interface['hardware-port'] = interface_state['hardware-port'] if 'transceiver' in interface_state: _interface['transceiver'] = interface_state['transceiver'] entry_interface_key = '/interface[{:s}]'.format(interface_name) entries.append((entry_interface_key, _interface)) if interface_type == 'ether': ethernet_state = interface['ethernet']['state'] _ethernet = { # TODO Fill these fields in the wrapper # 'mac-address' : ethernet_state['mac-address'], # 'hw-mac-address' : ethernet_state['hw-mac-address'], # 'port-speed' : ethernet_state['port-speed'].split(':')[-1], # 'negotiated-port-speed' : ethernet_state['negotiated-port-speed'].split(':')[-1], } entry_ethernet_key = '{:s}/ethernet'.format(entry_interface_key) entries.append((entry_ethernet_key, _ethernet)) # TODO Validate need for subinterfaces # subinterfaces = interface.get('subinterfaces', {}).get('subinterface', []) # for subinterface in subinterfaces: # self.__logger.debug('subinterface={:s}'.format(str(subinterface))) # subinterface_index = subinterface['index'] # subinterface_state = subinterface.get('state', {}) # _subinterface = {'index': subinterface_index} # if 'name' in subinterface_state: # _subinterface['name'] = subinterface_state['name'] # if 'enabled' in subinterface_state: # _subinterface['enabled'] = subinterface_state['enabled'] # if 'vlan' in subinterface: # vlan = subinterface['vlan'] # vlan_match = vlan['match'] # single_tagged = vlan_match.pop('single-tagged', None) # if single_tagged is not None: # single_tagged_config = single_tagged['config'] # vlan_id = single_tagged_config['vlan-id'] # _subinterface['vlan_id'] = vlan_id # if len(vlan_match) > 0: # raise Exception('Unsupported VLAN schema: {:s}'.format(str(vlan))) # ipv4_addresses = subinterface.get('ipv4', {}).get('addresses', {}).get('address', []) # if len(ipv4_addresses) > 1: # raise Exception('Multiple IPv4 Addresses not supported: {:s}'.format(str(ipv4_addresses))) # for ipv4_address in ipv4_addresses: # self.__logger.debug('ipv4_address={:s}'.format(str(ipv4_address))) # _subinterface['address_ip'] = ipv4_address['ip'] # ipv4_address_state = ipv4_address.get('state', {}) # #if 'origin' in ipv4_address_state: # # _subinterface['origin'] = ipv4_address_state['origin'] # if 'prefix-length' in ipv4_address_state: # _subinterface['address_prefix'] = ipv4_address_state['prefix-length'] # ipv6_addresses = subinterface.get('ipv6', {}).get('addresses', {}).get('address', []) # if len(ipv6_addresses) > 1: # raise Exception('Multiple IPv6 Addresses not supported: {:s}'.format(str(ipv6_addresses))) # for ipv6_address in ipv6_addresses: # self.__logger.debug('ipv6_address={:s}'.format(str(ipv6_address))) # _subinterface['address_ipv6'] = ipv6_address['ip'] # ipv6_address_state = ipv6_address.get('state', {}) # #if 'origin' in ipv6_address_state: # # _subinterface['origin_ipv6'] = ipv6_address_state['origin'] # if 'prefix-length' in ipv6_address_state: # _subinterface['address_prefix_ipv6'] = ipv6_address_state['prefix-length'] # entry_subinterface_key = '{:s}/subinterface[{:d}]'.format(entry_interface_key, subinterface_index) # entries.append((entry_subinterface_key, _subinterface)) return entries src/device/service/drivers/restconf/handlers/InterfaceLimit.py 0 → 100644 +54 −0 Original line number Diff line number Diff line import logging from typing import Dict, Tuple from ._Handler import _Handler SERVICE_LIMIT_NAMES = ("latency", "corrupt", "duplicate", "loss", "reorder", "rate") class InterfaceLimitHandler(_Handler): def __init__(self): self.__logger = logging.getLogger(str(__name__)) def get_resource_key(self): return '/interface/service-limits' def get_path(self): return '/oci:interfaces/interface/service-limits' def compose(self, resource_key : str, resource_value : Dict, delete : bool = False) -> Tuple[str, str]: if_name = resource_value['interface'] if delete: PATH_TMPL = '/interfaces/interface={:s}/service-limits' str_path = PATH_TMPL.format(if_name) return str_path, {} data = { 'interface': if_name } for key, value in resource_value.items(): if key not in SERVICE_LIMIT_NAMES: continue if key == 'latency': config = { 'delay': value['delay'], 'jitter': value.get('jitter', ''), 'correlation': value.get('correlation', '') } elif key == 'rate': config = { 'size': value['rate'] } else: config = { 'percent': value['percent'], 'correlation': value.get('correlation', ''), } data[key] = config str_path = '/interfaces/interface={:s}/service-limits'.format(if_name) return str_path, data def parse(self, json_data : Dict): # TODO Implement return [] Loading
src/device/service/drivers/restconf/RestconfDriver.py 0 → 100644 +78 −0 Original line number Diff line number Diff line import logging, json, threading from typing import List, Tuple, Any, Union, Optional, Iterator from .handlers import ALL_RESOURCE_KEYS, get_path, parse, compose from .RestconfHandler import RestconfHandler from device.service.driver_api._Driver import _Driver DRIVER_NAME = 'restconf' class RestconfDriver(_Driver): def __init__(self, address : str, port : int, **settings) -> None: super().__init__(DRIVER_NAME, address, port, **settings) self.__logger = logging.getLogger('{:s}:[{:s}:{:s}]'.format(str(__name__), str(self.address), str(self.port))) self.__restconf_handler = RestconfHandler(self.address, self.port, **(self.settings)) def Connect(self) -> bool: return self.__restconf_handler.is_endpoint_available def Disconnect(self) -> bool: return True def GetInitialConfig(self) -> List[Tuple[str, Any]]: return [] def GetConfig(self, resource_keys : List[str] = []) -> List[Tuple[str, Union[Any, None, Exception]]]: results = [] if len(resource_keys) == 0: resource_keys = ALL_RESOURCE_KEYS for i,resource_key in enumerate(resource_keys): str_resource_name = 'resource_key[#{:d}]'.format(i) try: resource_path = get_path(resource_key) if resource_path is None: resource_path = resource_key data = self.__restconf_handler.get(resource_path) results.extend(parse(resource_path, data)) except Exception as e: MSG = 'Exception retrieving {:s}: {:s}' self.__logger.exception(MSG.format(str_resource_name, str(resource_key))) results.append((resource_key, e)) return results def SetConfig(self, resources: List[Tuple[str, Any]]) -> List[Union[bool, Exception]]: if len(resources) == 0: return [] results = [] for resource, data in resources: if type(data) == str: try: data = json.loads(data) except json.decoder.JSONDecodeError: results.append((resource, True)) continue compose_result = compose(resource, data) results.append(compose_result if compose_result is not None else (resource, Exception('Unexpected'))) return self.__restconf_handler.set(results) def DeleteConfig(self, resources: List[Tuple[str, Any]]) -> List[Union[bool, Exception]]: if len(resources) == 0: return [] return self.__restconf_handler.delete([ key for key, _ in resources ]) def SubscribeState(self, subscriptions: List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]: return [ True for _ in subscriptions ] def UnsubscribeState(self, subscriptions: List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]: return [ True for _ in subscriptions ] def GetState(self, blocking=False, terminate : Optional[threading.Event] = None) -> Iterator[Tuple[float, str, Any]]: return []
src/device/service/drivers/restconf/RestconfHandler.py 0 → 100644 +72 −0 Original line number Diff line number Diff line import requests, logging, json import xml.etree.ElementTree as ET from typing import List, Tuple, Any, Union class RestconfHandler: def __init__(self, address : str, port : int, **settings) -> None: self.__address = address self.__port = int(port) self.__base_path = '' self.__logger = logging.getLogger('{:s}:[{:s}:{:s}]'.format(str(__name__), str(self.__address), str(self.__port))) @property def _base_path(self) -> str: return f"http://{self.__address}:{self.__port}/{self.__base_path}".strip('/') @property def is_endpoint_available(self) -> bool: response = requests.get(f"{self._base_path}/.well-known/host-meta") if response.status_code != 200: return False response = ET.fromstring(response.text) restconf_link = response.find("xrd:Link[@rel='restconf']", {'xrd': 'http://docs.oasis-open.org/ns/xri/xrd-1.0'}) if restconf_link is None or 'href' not in restconf_link.attrib: return False self.__base_path = restconf_link.attrib.get('href').strip('/') return True def get(self, resource: str) -> dict: resource = resource.strip('/') response = requests.get(f"{self._base_path}/data/{resource}") return response.json() def set(self, resources : List[Tuple[str, Any]]) -> List[Union[bool, Exception]]: is_valid_data = lambda resource_key, resource_value: (not resource_key.startswith('_connect/') )and ((not isinstance(resource_value, bool)) and (not isinstance(resource_value, Exception))) results = [ (resource_key, resource_value) for resource_key, resource_value in resources if not is_valid_data(resource_key, resource_value) ] resources = [ (resource_key, json.loads(resource_value)) if isinstance(resource_value, str) else (resource_key, resource_value) for resource_key, resource_value in resources if is_valid_data(resource_key, resource_value) ] delete_resources = [ resource_key for resource_key, resource_value in resources if len(resource_value) == 0 ] response = self.delete(delete_resources) self.__logger.debug(response) for resource_key, resource_value in resources: if resource_key in delete_resources: continue if isinstance(resource_value, bool) or isinstance(resource_value, Exception): results.append((resource_key, resource_value)) continue response = requests.put(f"{self._base_path}/data/{resource_key.strip('/')}", json=resource_value) response_success = response.status_code in [200, 201] results.append((resource_key, True if response_success else Exception('Unexpected'))) return results def delete(self, resources : List[str]) -> List[Union[bool, Exception]]: results = [] for resource_key in resources: response = requests.delete(path) response_success = response.status_code in [200, 201] results.append((resource_key, True if response_success else Exception('Unexpected'))) return results
src/device/service/drivers/restconf/handlers/Component.py 0 → 100644 +52 −0 Original line number Diff line number Diff line import re import json import logging from typing import Dict, Tuple from ._Handler import _Handler from common.proto.kpi_sample_types_pb2 import KpiSampleType PATH_IF_CTR = '/oci:interfaces/interface={:s}/state/counters/{:s}' class ComponentHandler(_Handler): def __init__(self): self.__logger = logging.getLogger(str(__name__)) def get_resource_key(self): return '/endpoints/endpoint' def get_path(self): return '/ocp:components' def compose(self, resource_key : str, resource_value : Dict, delete : bool = False) -> Tuple[str, str]: return resource_key, resource_value def parse(self, json_data : Dict): self.__logger.debug('json_data = {:s}'.format(json.dumps(json_data))) entries = [] for component in json_data['components']: self.__logger.debug('component={:s}'.format(str(component))) component_name = component['name'] component_state = component.get('state', {}) component_type = component_state.get('type') if component_type is None: continue component_type = component_type.split(':')[-1] if component_type not in {'PORT'}: continue # TODO: improve mapping between interface name and component name # By now, computed by time for the sake of saving time for the Hackfest. interface_name = re.sub(r'\-[pP][oO][rR][tT]', '', component_name) endpoint = {'uuid': interface_name, 'type': '-'} endpoint['sample_types'] = { KpiSampleType.KPISAMPLETYPE_BYTES_RECEIVED : PATH_IF_CTR.format(interface_name, 'in-octets' ), KpiSampleType.KPISAMPLETYPE_BYTES_TRANSMITTED : PATH_IF_CTR.format(interface_name, 'out-octets'), KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED : PATH_IF_CTR.format(interface_name, 'in-pkts' ), KpiSampleType.KPISAMPLETYPE_PACKETS_TRANSMITTED: PATH_IF_CTR.format(interface_name, 'out-pkts' ), } entries.append(('/endpoints/endpoint[{:s}]'.format(endpoint['uuid']), endpoint)) return entries
src/device/service/drivers/restconf/handlers/Interface.py 0 → 100644 +145 −0 Original line number Diff line number Diff line import json import logging from typing import Dict, Tuple from ._Handler import _Handler class InterfaceHandler(_Handler): def __init__(self): self.__logger = logging.getLogger(str(__name__)) def get_resource_key(self): return '/interface' def get_path(self): return '/oci:interfaces' def compose(self, resource_key : str, resource_value : Dict, delete : bool = False) -> Tuple[str, str]: if_name = resource_value.get('name') if_index = int(resource_value.get('index', 0)) str_path = '/interfaces/interface={:s}'.format(if_name) if delete: return str_path, {} prefix = resource_value.get('prefix') enabled = bool(resource_value.get('enabled', True)) address_ip = resource_value.get('address_ip') address_prefix = int(resource_value.get('address_prefix', 32)) mtu = int(resource_value.get('mtu')) data = { 'config': { 'index': if_index, 'name': if_name, 'enabled': enabled, 'mtu': mtu }, 'ipv4': { 'config': { 'enabled': enabled }, 'addresses': [{ 'prefix': prefix, 'ip': address_ip, 'prefix-length': address_prefix, 'mtu': mtu }] }, } return str_path, data def parse(self, json_data : Dict): self.__logger.debug('json_data = {:s}'.format(json.dumps(json_data))) entries = [] for interface in json_data['interfaces']: self.__logger.debug('interface={:s}'.format(str(interface))) interface_name = interface['name'] interface_config = interface.get('config', {}) interface_state = interface.get('state', {}) interface_type = interface_state.get('type') if interface_type is None: continue interface_type = interface_type.split(':')[-1] if interface_type not in {'ether'}: continue _interface = { 'name' : interface_name, 'type' : interface_type, 'mtu' : interface_state['mtu'], 'admin-status' : interface_state['admin-status'], 'oper-status' : interface_state['oper-status'], 'management' : interface_state['management'], } if not interface_state['management'] and 'ifindex' in interface_state: _interface['ifindex'] = interface_state['ifindex'] if 'description' in interface_config: _interface['description'] = interface_config['description'] if 'enabled' in interface_config: _interface['enabled'] = interface_config['enabled'] if 'hardware-port' in interface_state: _interface['hardware-port'] = interface_state['hardware-port'] if 'transceiver' in interface_state: _interface['transceiver'] = interface_state['transceiver'] entry_interface_key = '/interface[{:s}]'.format(interface_name) entries.append((entry_interface_key, _interface)) if interface_type == 'ether': ethernet_state = interface['ethernet']['state'] _ethernet = { # TODO Fill these fields in the wrapper # 'mac-address' : ethernet_state['mac-address'], # 'hw-mac-address' : ethernet_state['hw-mac-address'], # 'port-speed' : ethernet_state['port-speed'].split(':')[-1], # 'negotiated-port-speed' : ethernet_state['negotiated-port-speed'].split(':')[-1], } entry_ethernet_key = '{:s}/ethernet'.format(entry_interface_key) entries.append((entry_ethernet_key, _ethernet)) # TODO Validate need for subinterfaces # subinterfaces = interface.get('subinterfaces', {}).get('subinterface', []) # for subinterface in subinterfaces: # self.__logger.debug('subinterface={:s}'.format(str(subinterface))) # subinterface_index = subinterface['index'] # subinterface_state = subinterface.get('state', {}) # _subinterface = {'index': subinterface_index} # if 'name' in subinterface_state: # _subinterface['name'] = subinterface_state['name'] # if 'enabled' in subinterface_state: # _subinterface['enabled'] = subinterface_state['enabled'] # if 'vlan' in subinterface: # vlan = subinterface['vlan'] # vlan_match = vlan['match'] # single_tagged = vlan_match.pop('single-tagged', None) # if single_tagged is not None: # single_tagged_config = single_tagged['config'] # vlan_id = single_tagged_config['vlan-id'] # _subinterface['vlan_id'] = vlan_id # if len(vlan_match) > 0: # raise Exception('Unsupported VLAN schema: {:s}'.format(str(vlan))) # ipv4_addresses = subinterface.get('ipv4', {}).get('addresses', {}).get('address', []) # if len(ipv4_addresses) > 1: # raise Exception('Multiple IPv4 Addresses not supported: {:s}'.format(str(ipv4_addresses))) # for ipv4_address in ipv4_addresses: # self.__logger.debug('ipv4_address={:s}'.format(str(ipv4_address))) # _subinterface['address_ip'] = ipv4_address['ip'] # ipv4_address_state = ipv4_address.get('state', {}) # #if 'origin' in ipv4_address_state: # # _subinterface['origin'] = ipv4_address_state['origin'] # if 'prefix-length' in ipv4_address_state: # _subinterface['address_prefix'] = ipv4_address_state['prefix-length'] # ipv6_addresses = subinterface.get('ipv6', {}).get('addresses', {}).get('address', []) # if len(ipv6_addresses) > 1: # raise Exception('Multiple IPv6 Addresses not supported: {:s}'.format(str(ipv6_addresses))) # for ipv6_address in ipv6_addresses: # self.__logger.debug('ipv6_address={:s}'.format(str(ipv6_address))) # _subinterface['address_ipv6'] = ipv6_address['ip'] # ipv6_address_state = ipv6_address.get('state', {}) # #if 'origin' in ipv6_address_state: # # _subinterface['origin_ipv6'] = ipv6_address_state['origin'] # if 'prefix-length' in ipv6_address_state: # _subinterface['address_prefix_ipv6'] = ipv6_address_state['prefix-length'] # entry_subinterface_key = '{:s}/subinterface[{:d}]'.format(entry_interface_key, subinterface_index) # entries.append((entry_subinterface_key, _subinterface)) return entries
src/device/service/drivers/restconf/handlers/InterfaceLimit.py 0 → 100644 +54 −0 Original line number Diff line number Diff line import logging from typing import Dict, Tuple from ._Handler import _Handler SERVICE_LIMIT_NAMES = ("latency", "corrupt", "duplicate", "loss", "reorder", "rate") class InterfaceLimitHandler(_Handler): def __init__(self): self.__logger = logging.getLogger(str(__name__)) def get_resource_key(self): return '/interface/service-limits' def get_path(self): return '/oci:interfaces/interface/service-limits' def compose(self, resource_key : str, resource_value : Dict, delete : bool = False) -> Tuple[str, str]: if_name = resource_value['interface'] if delete: PATH_TMPL = '/interfaces/interface={:s}/service-limits' str_path = PATH_TMPL.format(if_name) return str_path, {} data = { 'interface': if_name } for key, value in resource_value.items(): if key not in SERVICE_LIMIT_NAMES: continue if key == 'latency': config = { 'delay': value['delay'], 'jitter': value.get('jitter', ''), 'correlation': value.get('correlation', '') } elif key == 'rate': config = { 'size': value['rate'] } else: config = { 'percent': value['percent'], 'correlation': value.get('correlation', ''), } data[key] = config str_path = '/interfaces/interface={:s}/service-limits'.format(if_name) return str_path, data def parse(self, json_data : Dict): # TODO Implement return []