# Interface.py
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json, libyang, logging
from typing import Any, Dict, List, Tuple
from ._Handler import _Handler
from .Tools import get_bool, get_int, get_str
from .YangHandler import YangHandler

LOGGER = logging.getLogger(__name__)

class InterfaceHandler(_Handler):
    def get_resource_key(self) -> str:
        """Return the resource key string associated with this handler."""
        resource_key = '/interface'
        return resource_key
    def get_path(self) -> str:
        """Return the YANG path of the interfaces container used by this handler."""
        yang_path = '/openconfig-interfaces:interfaces'
        return yang_path
    def compose(
        self, resource_key : str, resource_value : Dict, yang_handler : YangHandler, delete : bool = False
    ) -> Tuple[str, str]:
        """Compose the path and JSON payload to configure (or delete) a subinterface.

        Args:
            resource_key: Key of the resource; not used by this method.
            resource_value: Settings of the interface. Fields read: 'name',
                'sub_if_index' (default 0), 'enabled', 'sub_if_enabled',
                'sif_vlan_id', 'sub_if_ipv4_enabled', 'sub_if_ipv4_address'
                and 'sub_if_ipv4_prefix'.
            yang_handler: Handler providing the YANG data tree in which the
                interface nodes are created.
            delete: When True, only the subinterface path and an empty JSON
                document are returned; the YANG data tree is not touched.

        Returns:
            Tuple (path, data). For a deletion, the path points to the
            subinterface and data is '{}'. Otherwise the path points to the
            interface and data is the JSON-encoded interface list entry.
        """
        if_name   = get_str(resource_value, 'name'           )  # ethernet-1/1
        sif_index = get_int(resource_value, 'sub_if_index', 0)  # 0

        if delete:
            PATH_TMPL = '/interfaces/interface[name={:s}]/subinterfaces/subinterface[index={:d}]'
            str_path = PATH_TMPL.format(if_name, sif_index)
            str_data = json.dumps({})
            return str_path, str_data

        if_enabled       = get_bool(resource_value, 'enabled',             True)    # True/False
        sif_enabled      = get_bool(resource_value, 'sub_if_enabled',      True)    # True/False
        # NOTE(review): this key is 'sif_vlan_id' while the sibling fields use the
        # 'sub_if_' prefix -- confirm against callers before renaming.
        sif_vlan_id      = get_int (resource_value, 'sif_vlan_id',             )    # 127
        sif_ipv4_enabled = get_bool(resource_value, 'sub_if_ipv4_enabled', True)    # True/False
        sif_ipv4_address = get_str (resource_value, 'sub_if_ipv4_address'      )    # 172.16.0.1
        sif_ipv4_prefix  = get_int (resource_value, 'sub_if_ipv4_prefix'       )    # 24

        # Reuse get_path() so compose() and parse() agree on the YANG root.
        yang_ifs : libyang.DContainer = yang_handler.get_data_path(self.get_path())
        yang_if_path = 'interface[name="{:s}"]'.format(if_name)
        yang_if : libyang.DContainer = yang_ifs.create_path(yang_if_path)
        yang_if.create_path('config/name',    if_name   )
        if if_enabled is not None: yang_if.create_path('config/enabled', if_enabled)

        yang_sifs : libyang.DContainer = yang_if.create_path('subinterfaces')
        yang_sif_path = 'subinterface[index="{:d}"]'.format(sif_index)
        yang_sif : libyang.DContainer = yang_sifs.create_path(yang_sif_path)
        yang_sif.create_path('config/index', sif_index)
        if sif_enabled is not None: yang_sif.create_path('config/enabled', sif_enabled)

        if sif_vlan_id is not None:
            yang_subif_vlan : libyang.DContainer = yang_sif.create_path('openconfig-vlan:vlan')
            yang_subif_vlan.create_path('match/single-tagged/config/vlan-id', sif_vlan_id)

        yang_ipv4 : libyang.DContainer = yang_sif.create_path('openconfig-if-ip:ipv4')
        if sif_ipv4_enabled is not None: yang_ipv4.create_path('config/enabled', sif_ipv4_enabled)

        if sif_ipv4_address is not None:
            yang_ipv4_addrs : libyang.DContainer = yang_ipv4.create_path('addresses')
            yang_ipv4_addr_path = 'address[ip="{:s}"]'.format(sif_ipv4_address)
            yang_ipv4_addr : libyang.DContainer = yang_ipv4_addrs.create_path(yang_ipv4_addr_path)
            yang_ipv4_addr.create_path('config/ip',            sif_ipv4_address)
            # Like the other optional leaves, only write the prefix length when
            # one was supplied; never create the leaf with a None value.
            if sif_ipv4_prefix is not None:
                yang_ipv4_addr.create_path('config/prefix-length', sif_ipv4_prefix)

        str_path = '/interfaces/interface[name={:s}]'.format(if_name)
        str_data = yang_if.print_mem('json')
        json_data = json.loads(str_data)
        # The serialized node is wrapped in a one-element list; unwrap the entry.
        json_data = json_data['openconfig-interfaces:interface'][0]
        str_data = json.dumps(json_data)
        return str_path, str_data

    def parse(
        self, json_data : Dict, yang_handler : YangHandler
    ) -> List[Tuple[str, Dict[str, Any]]]:
        """Translate an interfaces JSON document into a flat list of resource entries.

        The document is first converted through
        yang_handler.parse_to_dict() using the path returned by get_path().
        Interfaces without a state 'type', or whose type (module prefix
        stripped) is not 'ethernetCsmacd', are skipped.

        Args:
            json_data: JSON-decoded interfaces document to parse.
            yang_handler: Handler used to convert json_data into a dictionary.

        Returns:
            List of (resource_key, attributes) tuples with entries for each
            interface, its ethernet settings, its subinterfaces, and the
            single-tagged VLAN and IPv4/IPv6 addresses of each subinterface.

        Raises:
            Exception: If a subinterface VLAN match contains anything other
                than 'single-tagged'.
        """
        LOGGER.debug('json_data = {:s}'.format(json.dumps(json_data)))

        yang_interfaces_path = self.get_path()
        json_data_valid = yang_handler.parse_to_dict(yang_interfaces_path, json_data, fmt='json')

        # Accumulator for every (key, attributes) tuple produced below.
        entries : List[Tuple[str, Dict[str, Any]]] = []
        for interface in json_data_valid['interfaces']['interface']:
            LOGGER.debug('interface={:s}'.format(str(interface)))

            interface_name = interface['name']
            interface_config = interface.get('config', {})

            interface_state = interface.get('state', {})
            interface_type = interface_state.get('type')
            if interface_type is None: continue
            # Drop the module prefix, e.g. '<module>:ethernetCsmacd' -> 'ethernetCsmacd'.
            interface_type = interface_type.split(':')[-1]
            if interface_type not in {'ethernetCsmacd'}: continue

            _interface = {
                'name'         : interface_name,
                'type'         : interface_type,
                'mtu'          : interface_state['mtu'],
                'ifindex'      : interface_state['ifindex'],
                'admin-status' : interface_state['admin-status'],
                'oper-status'  : interface_state['oper-status'],
                'management'   : interface_state['management'],
            }
            if 'description' in interface_config:
                _interface['description'] = interface_config['description']
            if 'enabled' in interface_config:
                _interface['enabled'] = interface_config['enabled']
            if 'hardware-port' in interface_state:
                _interface['hardware-port'] = interface_state['hardware-port']
            if 'transceiver' in interface_state:
                _interface['transceiver'] = interface_state['transceiver']

            entry_interface_key = '/interface[{:s}]'.format(interface_name)
            entries.append((entry_interface_key, _interface))

            if interface_type == 'ethernetCsmacd':
                ethernet_state = interface['ethernet']['state']

                _ethernet = {
                    'mac-address'           : ethernet_state['mac-address'],
                    'hw-mac-address'        : ethernet_state['hw-mac-address'],
                    'port-speed'            : ethernet_state['port-speed'].split(':')[-1],
                    'negotiated-port-speed' : ethernet_state['negotiated-port-speed'].split(':')[-1],
                }
                entry_ethernet_key = '{:s}/ethernet'.format(entry_interface_key)
                entries.append((entry_ethernet_key, _ethernet))

            subinterfaces = interface.get('subinterfaces', {}).get('subinterface', [])
            for subinterface in subinterfaces:
                LOGGER.debug('subinterface={:s}'.format(str(subinterface)))

                subinterface_index = subinterface['index']
                subinterface_state = subinterface.get('state', {})

                _subinterface = {'index': subinterface_index}
                if 'name' in subinterface_state:
                    _subinterface['name'] = subinterface_state['name']
                if 'enabled' in subinterface_state:
                    _subinterface['enabled'] = subinterface_state['enabled']
                entry_subinterface_key = '{:s}/subinterface[{:d}]'.format(entry_interface_key, subinterface_index)
                entries.append((entry_subinterface_key, _subinterface))

                if 'vlan' in subinterface:
                    vlan = subinterface['vlan']
                    vlan_match = vlan['match']

                    # Remove the supported match so that anything left over can
                    # be reported as unsupported below.
                    single_tagged = vlan_match.pop('single-tagged', None)
                    if single_tagged is not None:
                        single_tagged_config = single_tagged['config']
                        vlan_id = single_tagged_config['vlan-id']

                        _vlan = {'vlan_id': vlan_id}
                        # str() is required: the '{:s}' format code raises
                        # ValueError when given a non-string such as an int.
                        entry_vlan_key = '{:s}/vlan[single:{:s}]'.format(entry_subinterface_key, str(vlan_id))
                        entries.append((entry_vlan_key, _vlan))

                    if len(vlan_match) > 0:
                        raise Exception('Unsupported VLAN schema: {:s}'.format(str(vlan)))

                ipv4_addresses = subinterface.get('ipv4', {}).get('addresses', {}).get('address', [])
                for ipv4_address in ipv4_addresses:
                    LOGGER.debug('ipv4_address={:s}'.format(str(ipv4_address)))

                    ipv4_address_ip = ipv4_address['ip']
                    ipv4_address_state = ipv4_address.get('state', {})

                    _ipv4_address = {'ip': ipv4_address_ip}
                    if 'origin' in ipv4_address_state:
                        _ipv4_address['origin'] = ipv4_address_state['origin']
                    if 'prefix-length' in ipv4_address_state:
                        _ipv4_address['prefix'] = ipv4_address_state['prefix-length']

                    entry_ipv4_address_key = '{:s}/ipv4[{:s}]'.format(entry_subinterface_key, ipv4_address_ip)
                    entries.append((entry_ipv4_address_key, _ipv4_address))

                ipv6_addresses = subinterface.get('ipv6', {}).get('addresses', {}).get('address', [])
                for ipv6_address in ipv6_addresses:
                    LOGGER.debug('ipv6_address={:s}'.format(str(ipv6_address)))

                    ipv6_address_ip = ipv6_address['ip']
                    ipv6_address_state = ipv6_address.get('state', {})

                    _ipv6_address = {'ip': ipv6_address_ip}
                    if 'origin' in ipv6_address_state:
                        _ipv6_address['origin'] = ipv6_address_state['origin']
                    if 'prefix-length' in ipv6_address_state:
                        _ipv6_address['prefix'] = ipv6_address_state['prefix-length']

                    entry_ipv6_address_key = '{:s}/ipv6[{:s}]'.format(entry_subinterface_key, ipv6_address_ip)
                    entries.append((entry_ipv6_address_key, _ipv6_address))

        return entries