Scheduled maintenance on Saturday, 27 September 2025, from 07:00 AM to 4:00 PM GMT (09:00 AM to 6:00 PM CEST) - some services may be unavailable -

Skip to content
Snippets Groups Projects
ConfigRules.py 3.07 KiB
Newer Older
  • Learn to ignore specific revisions
  • # Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
    
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #      http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    # RFC 8466 - L2VPN Service Model (L2SM)
    # Ref: https://datatracker.ietf.org/doc/html/rfc8466
    
    
    import json
    from typing import Any, Dict, Tuple
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    from common.proto.context_pb2 import ConfigActionEnum
    
    from common.tools.grpc.Tools import grpc_message_to_json_string
    
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    def update_config_rule_custom(
        config_rules, resource_key : str, fields : Dict[str, Tuple[Any, bool]],
        new_action : ConfigActionEnum = ConfigActionEnum.CONFIGACTION_SET
    ) -> None:
    
        # fields: Dict[field_name : str, Tuple[field_value : Any, raise_if_differs : bool]]
    
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
        # TODO: add support for ACL config rules
    
    
        for config_rule in config_rules:
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
            kind = config_rule.WhichOneof('config_rule')
            if kind != 'custom': continue
    
            if config_rule.custom.resource_key != resource_key: continue
            json_resource_value = json.loads(config_rule.custom.resource_value)
            break   # found, end loop
        else:
            # not found, add it
            config_rule = config_rules.add()    # pylint: disable=no-member
            config_rule.custom.resource_key = resource_key
            json_resource_value = {}
    
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
        config_rule.action = new_action
    
    
        for field_name,(field_value, raise_if_differs) in fields.items():
            if (field_name not in json_resource_value) or not raise_if_differs:
                # missing or raise_if_differs=False, add/update it
                json_resource_value[field_name] = field_value
            elif json_resource_value[field_name] != field_value:
                # exists, differs, and raise_if_differs=True
                msg = 'Specified {:s}({:s}) differs existing value({:s})'
                raise Exception(msg.format(str(field_name), str(field_value), str(json_resource_value[field_name])))
    
        config_rule.custom.resource_value = json.dumps(json_resource_value, sort_keys=True)
    
    def copy_config_rules(source_config_rules, target_config_rules):
        for source_config_rule in source_config_rules:
            config_rule_kind = source_config_rule.WhichOneof('config_rule')
            if config_rule_kind == 'custom':
                custom = source_config_rule.custom
                resource_key = custom.resource_key
                resource_value = json.loads(custom.resource_value)
                raise_if_differs = True
                fields = {name:(value, raise_if_differs) for name,value in resource_value.items()}
                update_config_rule_custom(target_config_rules, resource_key, fields)
    
            else:
                raise NotImplementedError('ConfigRule({:s})'.format(grpc_message_to_json_string(source_config_rule)))