Skip to content
Interfaces.py 7.93 KiB
Newer Older
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging, lxml.etree as ET
from typing     import Any, Dict, List, Tuple
from .Namespace import NAMESPACES
from .Tools     import add_value_from_tag

LOGGER = logging.getLogger(__name__)

XPATH_INTERFACES    = "//oci:interfaces/oci:interface"
XPATH_SUBINTERFACES = ".//oci:subinterfaces/oci:subinterface"
XPATH_IPV4ADDRESSES = ".//ociip:ipv4/ociip:addresses/ociip:address"
Pablo Armingol's avatar
Pablo Armingol committed
XPATH_IPV6ADDRESSES = ".//ociip:ipv6/ociip:addresses/ociip:address"

def parse(xml_data : ET.Element) -> List[Tuple[str, Dict[str, Any]]]:
    response = []
    for xml_interface in xml_data.xpath(XPATH_INTERFACES, namespaces=NAMESPACES):
        #LOGGER.info('xml_interface = {:s}'.format(str(ET.tostring(xml_interface))))
        #interface_type = xml_interface.find('oci:config/oci:type', namespaces=NAMESPACES)
        #add_value_from_tag(interface, 'type', interface_type)

        if xml_interface.find('oci:config/oci:type', namespaces=NAMESPACES) is not None:
            interface_type = xml_interface.find('oci:config/oci:type', namespaces=NAMESPACES)
        elif xml_interface.find('oci:state/oci:type', namespaces=NAMESPACES) is not None:
            interface_type = xml_interface.find('oci:state/oci:type', namespaces=NAMESPACES)
Pablo Armingol's avatar
Pablo Armingol committed
        else: continue
            
        interface_name = xml_interface.find('oci:name', namespaces=NAMESPACES)
        if interface_name is None or interface_name.text is None: continue
        add_value_from_tag(interface, 'name', interface_name)
            
        # Get the type of interface according to the vendor's type
        if 'ianaift:' in interface_type.text:
            interface_type.text = interface_type.text.replace('ianaift:', '')                       #ADVA
        elif 'idx'in interface_type.text:
            interface_type.text = interface_type.text.replace('idx:', '')                           #CISCO
PabloArmingolRobles's avatar
PabloArmingolRobles committed
        add_value_from_tag(interface, 'type', interface_type)

        interface_mtu = xml_interface.find('oci:config/oci:mtu', namespaces=NAMESPACES)
        add_value_from_tag(interface, 'mtu', interface_mtu, cast=int)

        interface_description = xml_interface.find('oci:config/oci:description', namespaces=NAMESPACES)
        add_value_from_tag(interface, 'description', interface_description)

        for xml_subinterface in xml_interface.xpath(XPATH_SUBINTERFACES, namespaces=NAMESPACES):
            #LOGGER.info('xml_subinterface = {:s}'.format(str(ET.tostring(xml_subinterface))))
            add_value_from_tag(subinterface, 'name', interface_name)
            add_value_from_tag(subinterface, 'mtu', interface_mtu)
PabloArmingolRobles's avatar
PabloArmingolRobles committed
            add_value_from_tag(subinterface, 'type', interface_type)

            subinterface_index = xml_subinterface.find('oci:index', namespaces=NAMESPACES)
            if subinterface_index is None or subinterface_index.text is None: continue
            add_value_from_tag(subinterface, 'index', subinterface_index, cast=int)
            vlan_id = xml_subinterface.find('ocv:vlan/ocv:match/ocv:single-tagged/ocv:config/ocv:vlan-id', namespaces=NAMESPACES)
            add_value_from_tag(subinterface, 'vlan_id', vlan_id, cast=int)

            # TODO: implement support for multiple IP addresses per subinterface
            #ipv4_addresses = []
            for xml_ipv4_address in xml_subinterface.xpath(XPATH_IPV4ADDRESSES, namespaces=NAMESPACES):
                #LOGGER.info('xml_ipv4_address = {:s}'.format(str(ET.tostring(xml_ipv4_address))))

                #ipv4_address = {}

                #origin = xml_ipv4_address.find('ociip:state/ociip:origin', namespaces=NAMESPACES)
                #add_value_from_tag(ipv4_address, 'origin', origin)

                address = xml_ipv4_address.find('ociip:state/ociip:ip', namespaces=NAMESPACES)
                #add_value_from_tag(ipv4_address, 'ip', address)
                add_value_from_tag(subinterface, 'address_ip', address)

                prefix = xml_ipv4_address.find('ociip:state/ociip:prefix-length', namespaces=NAMESPACES)
                #add_value_from_tag(ipv4_address, 'prefix_length', prefix)
                add_value_from_tag(subinterface, 'address_prefix', prefix, cast=int)

                #if len(ipv4_address) == 0: continue
                #ipv4_addresses.append(ipv4_address)

            #add_value_from_collection(subinterface, 'ipv4_addresses', ipv4_addresses)
            for xml_ipv6_address in xml_subinterface.xpath(XPATH_IPV6ADDRESSES, namespaces=NAMESPACES):
                #LOGGER.info('xml_ipv6_address = {:s}'.format(str(ET.tostring(xml_ipv6_address))))

                address = xml_ipv6_address.find('ociip:state/ociip:ip', namespaces=NAMESPACES)
                add_value_from_tag(subinterface, 'address_ipv6', address)
                
                prefix = xml_ipv6_address.find('ociip:state/ociip:prefix-length', namespaces=NAMESPACES)
                add_value_from_tag(subinterface, 'address_prefix_v6', prefix, cast=int)
                
            if len(subinterface) == 0: continue
            resource_key = '/interface[{:s}]/subinterface[{:s}]'.format(interface['name'], str(subinterface['index']))
            response.append((resource_key, subinterface))

        if len(interface) == 0: continue
        response.append(('/interface[{:s}]'.format(interface['name']), interface))

def parse_counters(xml_data : ET.Element) -> List[Tuple[str, Dict[str, Any]]]:
    response = []
    for xml_interface in xml_data.xpath(XPATH_INTERFACES, namespaces=NAMESPACES):
        #LOGGER.info('[parse_counters] xml_interface = {:s}'.format(str(ET.tostring(xml_interface))))

        interface = {}

        interface_name = xml_interface.find('oci:name', namespaces=NAMESPACES)
        if interface_name is None or interface_name.text is None: continue
        add_value_from_tag(interface, 'name', interface_name)

        interface_in_pkts = xml_interface.find('oci:state/oci:counters/oci:in-pkts', namespaces=NAMESPACES)
        add_value_from_tag(interface, 'in-pkts', interface_in_pkts, cast=int)

        interface_in_octets = xml_interface.find('oci:state/oci:counters/oci:in-octets', namespaces=NAMESPACES)
        add_value_from_tag(interface, 'in-octets', interface_in_octets, cast=int)

        interface_in_errors = xml_interface.find('oci:state/oci:counters/oci:in-errors', namespaces=NAMESPACES)
        add_value_from_tag(interface, 'in-errors', interface_in_errors, cast=int)

        interface_out_octets = xml_interface.find('oci:state/oci:counters/oci:out-octets', namespaces=NAMESPACES)
        add_value_from_tag(interface, 'out-octets', interface_out_octets, cast=int)

        interface_out_pkts = xml_interface.find('oci:state/oci:counters/oci:out-pkts', namespaces=NAMESPACES)
        add_value_from_tag(interface, 'out-pkts', interface_out_pkts, cast=int)

        interface_out_errors = xml_interface.find('oci:state/oci:counters/oci:out-errors', namespaces=NAMESPACES)
        add_value_from_tag(interface, 'out-errors', interface_out_errors, cast=int)

        interface_out_discards = xml_interface.find('oci:state/oci:counters/oci:out-discards', namespaces=NAMESPACES)
        add_value_from_tag(interface, 'out-discards', interface_out_discards, cast=int)

        #LOGGER.info('[parse_counters] interface = {:s}'.format(str(interface)))

        if len(interface) == 0: continue
        response.append(('/interface[{:s}]'.format(interface['name']), interface))