Skip to content
RoutingPolicy.py 5.1 KiB
Newer Older
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import copy, logging, lxml.etree as ET
from typing import Any, Dict, List, Tuple
from .Namespace import NAMESPACES
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from .Tools import add_value_from_tag

LOGGER = logging.getLogger(__name__)

XPATH_POLICY_DEFINITIONS = "//ocrp:routing-policy/ocrp:policy-definitions/ocrp:policy-definition"
XPATH_PD_STATEMENTS      = ".//ocrp:statements/ocrp:statement"
XPATH_PD_ST_CONDITIONS   = ".//ocrp:conditions/ocbp:bgp-conditions/ocbp:match-ext-community-set"
XPATH_PD_ST_ACTIONS      = ".//ocrp:actions"

XPATH_BGP_EXT_COMMUN_SET = "//ocrp:routing-policy/ocrp:defined-sets/ocbp:bgp-defined-sets/" + \
                           "ocbp:ext-community-sets/ocbp:ext-community-set"

def parse(xml_data : ET.Element) -> List[Tuple[str, Dict[str, Any]]]:
    #LOGGER.info('[RoutePolicy] xml_data = {:s}'.format(str(ET.tostring(xml_data))))

    response = []
    for xml_policy_definition in xml_data.xpath(XPATH_POLICY_DEFINITIONS, namespaces=NAMESPACES):
        #LOGGER.info('xml_policy_definition = {:s}'.format(str(ET.tostring(xml_policy_definition))))

        policy_definition = {}
Pablo Armingol's avatar
Pablo Armingol committed
        statement_name = ''
        policy_name = xml_policy_definition.find('ocrp:name', namespaces=NAMESPACES)
        if policy_name is None or policy_name.text is None: continue
        add_value_from_tag(policy_definition, 'policy_name', policy_name)

        resource_key = '/routing_policy/policy_definition[{:s}]'.format(policy_definition['policy_name'])
        response.append((resource_key, copy.deepcopy(policy_definition)))

        for xml_statement in xml_policy_definition.xpath(XPATH_PD_STATEMENTS, namespaces=NAMESPACES):
            statement_name = xml_statement.find('ocrp:name', namespaces=NAMESPACES)
            if len(statement_name) != 0:                                                                        #FIX: In case there is a route policy defined without a statement name
                add_value_from_tag(policy_definition, 'statement_name', statement_name)

            for xml_condition in xml_statement.xpath(XPATH_PD_ST_CONDITIONS, namespaces=NAMESPACES):
                ext_community_set_name = xml_condition.find('ocbp:config/ocbp:ext-community-set', namespaces=NAMESPACES)
                add_value_from_tag(policy_definition, 'ext_community_set_name', ext_community_set_name)

                match_set_options = xml_condition.find('ocbp:config/ocbp:match-set-options', namespaces=NAMESPACES)
                add_value_from_tag(policy_definition, 'match_set_options', match_set_options)

            for xml_action in xml_statement.xpath(XPATH_PD_ST_ACTIONS, namespaces=NAMESPACES):
                policy_result = xml_action.find('ocbp:config/ocbp:policy-result', namespaces=NAMESPACES)
                add_value_from_tag(policy_definition, 'policy_result', policy_result)

        if len(statement_name) != 0:                                                                            #FIX: In case there is a route policy defined without a statement name
            resource_key = '/routing_policy/policy_definition[{:s}]/statement[{:s}]'.format(
                policy_definition['policy_name'], policy_definition['statement_name'])
            response.append((resource_key, copy.deepcopy(policy_definition)))

    for xml_bgp_ext_community_set in xml_data.xpath(XPATH_BGP_EXT_COMMUN_SET, namespaces=NAMESPACES):
        #LOGGER.info('xml_bgp_ext_community_set = {:s}'.format(str(ET.tostring(xml_bgp_ext_community_set))))

        bgp_ext_community_set = {}

        ext_community_set_name = xml_bgp_ext_community_set.find('ocbp:ext-community-set-name', namespaces=NAMESPACES)
        if ext_community_set_name is None or ext_community_set_name.text is None: continue
        add_value_from_tag(bgp_ext_community_set, 'ext_community_set_name', ext_community_set_name)

        resource_key = '/routing_policy/bgp_defined_set[{:s}]'.format(bgp_ext_community_set['ext_community_set_name'])
        response.append((resource_key, copy.deepcopy(bgp_ext_community_set)))

        ext_community_member = xml_bgp_ext_community_set.find('ocbp:config/ocbp:ext-community-member', namespaces=NAMESPACES)
        if ext_community_member is not None and ext_community_member.text is not None:
            add_value_from_tag(bgp_ext_community_set, 'ext_community_member', ext_community_member)

            resource_key = '/routing_policy/bgp_defined_set[{:s}][{:s}]'.format(
                bgp_ext_community_set['ext_community_set_name'], bgp_ext_community_set['ext_community_member'])
            response.append((resource_key, copy.deepcopy(bgp_ext_community_set)))

    return response