Loading src/device/service/drivers/openconfig/templates/__init__.py +20 −18 Original line number Diff line number Diff line Loading @@ -21,14 +21,6 @@ import paramiko from .Tools import generate_templates from device.service.driver_api._Driver import ( RESOURCE_ENDPOINTS, RESOURCE_INTERFACES, RESOURCE_NETWORK_INSTANCES, RESOURCE_ROUTING_POLICIES, RESOURCE_ACL, RESOURCE_INVENTORY) from .EndPoints import parse as parse_endpoints from .Interfaces import parse as parse_interfaces, parse_counters from .NetworkInstances import parse as parse_network_instances from .RoutingPolicy import parse as parse_routing_policy from .Acl import parse as parse_acl from .Inventory import parse as parse_inventory from .acl.acl_adapter import acl_cr_to_dict from .acl.acl_adapter_ipinfusion_proprietary import acl_cr_to_dict_ipinfusion_proprietary from inter_device_translation import get_dict_helper LOGGER = logging.getLogger(__name__) Loading @@ -51,15 +43,7 @@ RESOURCE_KEY_MAPPINGS = { RESOURCE_ACL : 'acl', } RESOURCE_PARSERS = { 'inventory' : parse_inventory, 'component' : parse_endpoints, 'interface' : parse_interfaces, 'network_instance': parse_network_instances, 'routing_policy' : parse_routing_policy, 'interfaces/interface/state/counters': parse_counters, 'acl' : parse_acl, } RESOURCE_PARSERS = None # will be initialized after dependent imports LOGGER = logging.getLogger(__name__) RE_REMOVE_FILTERS = re.compile(r'\[[^\]]+\]') Loading @@ -67,7 +51,7 @@ RE_REMOVE_FILTERS_2 = re.compile(r'\/[a-z]+:') EMPTY_CONFIG = '<config></config>' EMPTY_FILTER = '<filter></filter>' def get_dict_object(device_type: str, object_name: str, plural: bool = False) -> str: def get_dict_object(object_name: str, device_type: str, plural: bool = False) -> str: dict_helper = get_dict_helper('oc' if device_type is None else device_type) return dict_helper.get_object(object_name, plural=plural) Loading @@ -90,6 +74,24 @@ JINJA_ENV.filters['dict_object'] = get_dict_object from .EndPoints import parse as parse_endpoints from .Interfaces import parse as parse_interfaces, parse_counters from .NetworkInstances import parse as parse_network_instances from .RoutingPolicy import parse as parse_routing_policy from .Acl import parse as parse_acl from .Inventory import parse as parse_inventory from .acl.acl_adapter import acl_cr_to_dict from .acl.acl_adapter_ipinfusion_proprietary import acl_cr_to_dict_ipinfusion_proprietary RESOURCE_PARSERS = { 'inventory' : parse_inventory, 'component' : parse_endpoints, 'interface' : parse_interfaces, 'network_instance': parse_network_instances, 'routing_policy' : parse_routing_policy, 'interfaces/interface/state/counters': parse_counters, 'acl' : parse_acl, } def get_filter(resource_key : str, device_type: str = None): resource_key = RESOURCE_KEY_MAPPINGS.get(resource_key, resource_key) resource_key = RE_REMOVE_FILTERS.sub('', resource_key) Loading src/inter_device_translation/__init__.py +18 −9 Original line number Diff line number Diff line Loading @@ -46,19 +46,28 @@ class DictHelper: return path path_segments = path.split('/') required_namespaces = len([s for s in path_segments if len(s) > 1]) # Only segments without explicit namespace prefixes require one segments_requiring_ns = [s for s in path_segments if len(s) > 1 and ':' not in s] if type(namespace) == list and len(namespace) != required_namespaces: if isinstance(namespace, list): if len(namespace) != len(segments_requiring_ns): raise Exception(f'Number of namespaces do not match the specified path : {path}') elif type(namespace) == str: namespace = [namespace for _ in range(required_namespaces)] namespace_list = namespace elif isinstance(namespace, str): namespace_list = [namespace for _ in range(len(segments_requiring_ns))] else: namespace_list = [] ns_i = 0 processed_segments = [] for s in path_segments: if len(s) > 1: processed_segments.append(f'{namespace[ns_i]}:{s}') if ':' in s: # Segment already includes a namespace prefix; leave as is processed_segments.append(s) else: ns = namespace_list[ns_i] if ns_i < len(namespace_list) else None processed_segments.append(f'{ns}:{s}' if ns else s) ns_i += 1 else: processed_segments.append(s) Loading src/load_generator/requirements.in +1 −0 Original line number Diff line number Diff line Loading @@ -13,3 +13,4 @@ # limitations under the License. APScheduler>=3.10.4 pytz Loading
src/device/service/drivers/openconfig/templates/__init__.py +20 −18 Original line number Diff line number Diff line Loading @@ -21,14 +21,6 @@ import paramiko from .Tools import generate_templates from device.service.driver_api._Driver import ( RESOURCE_ENDPOINTS, RESOURCE_INTERFACES, RESOURCE_NETWORK_INSTANCES, RESOURCE_ROUTING_POLICIES, RESOURCE_ACL, RESOURCE_INVENTORY) from .EndPoints import parse as parse_endpoints from .Interfaces import parse as parse_interfaces, parse_counters from .NetworkInstances import parse as parse_network_instances from .RoutingPolicy import parse as parse_routing_policy from .Acl import parse as parse_acl from .Inventory import parse as parse_inventory from .acl.acl_adapter import acl_cr_to_dict from .acl.acl_adapter_ipinfusion_proprietary import acl_cr_to_dict_ipinfusion_proprietary from inter_device_translation import get_dict_helper LOGGER = logging.getLogger(__name__) Loading @@ -51,15 +43,7 @@ RESOURCE_KEY_MAPPINGS = { RESOURCE_ACL : 'acl', } RESOURCE_PARSERS = { 'inventory' : parse_inventory, 'component' : parse_endpoints, 'interface' : parse_interfaces, 'network_instance': parse_network_instances, 'routing_policy' : parse_routing_policy, 'interfaces/interface/state/counters': parse_counters, 'acl' : parse_acl, } RESOURCE_PARSERS = None # will be initialized after dependent imports LOGGER = logging.getLogger(__name__) RE_REMOVE_FILTERS = re.compile(r'\[[^\]]+\]') Loading @@ -67,7 +51,7 @@ RE_REMOVE_FILTERS_2 = re.compile(r'\/[a-z]+:') EMPTY_CONFIG = '<config></config>' EMPTY_FILTER = '<filter></filter>' def get_dict_object(device_type: str, object_name: str, plural: bool = False) -> str: def get_dict_object(object_name: str, device_type: str, plural: bool = False) -> str: dict_helper = get_dict_helper('oc' if device_type is None else device_type) return dict_helper.get_object(object_name, plural=plural) Loading @@ -90,6 +74,24 @@ JINJA_ENV.filters['dict_object'] = get_dict_object from .EndPoints import parse as parse_endpoints from .Interfaces import parse as parse_interfaces, parse_counters from .NetworkInstances import parse as parse_network_instances from .RoutingPolicy import parse as parse_routing_policy from .Acl import parse as parse_acl from .Inventory import parse as parse_inventory from .acl.acl_adapter import acl_cr_to_dict from .acl.acl_adapter_ipinfusion_proprietary import acl_cr_to_dict_ipinfusion_proprietary RESOURCE_PARSERS = { 'inventory' : parse_inventory, 'component' : parse_endpoints, 'interface' : parse_interfaces, 'network_instance': parse_network_instances, 'routing_policy' : parse_routing_policy, 'interfaces/interface/state/counters': parse_counters, 'acl' : parse_acl, } def get_filter(resource_key : str, device_type: str = None): resource_key = RESOURCE_KEY_MAPPINGS.get(resource_key, resource_key) resource_key = RE_REMOVE_FILTERS.sub('', resource_key) Loading
src/inter_device_translation/__init__.py +18 −9 Original line number Diff line number Diff line Loading @@ -46,19 +46,28 @@ class DictHelper: return path path_segments = path.split('/') required_namespaces = len([s for s in path_segments if len(s) > 1]) # Only segments without explicit namespace prefixes require one segments_requiring_ns = [s for s in path_segments if len(s) > 1 and ':' not in s] if type(namespace) == list and len(namespace) != required_namespaces: if isinstance(namespace, list): if len(namespace) != len(segments_requiring_ns): raise Exception(f'Number of namespaces do not match the specified path : {path}') elif type(namespace) == str: namespace = [namespace for _ in range(required_namespaces)] namespace_list = namespace elif isinstance(namespace, str): namespace_list = [namespace for _ in range(len(segments_requiring_ns))] else: namespace_list = [] ns_i = 0 processed_segments = [] for s in path_segments: if len(s) > 1: processed_segments.append(f'{namespace[ns_i]}:{s}') if ':' in s: # Segment already includes a namespace prefix; leave as is processed_segments.append(s) else: ns = namespace_list[ns_i] if ns_i < len(namespace_list) else None processed_segments.append(f'{ns}:{s}' if ns else s) ns_i += 1 else: processed_segments.append(s) Loading
src/load_generator/requirements.in +1 −0 Original line number Diff line number Diff line Loading @@ -13,3 +13,4 @@ # limitations under the License. APScheduler>=3.10.4 pytz