# Skip to content
# EthService.py 4.17 KiB
# Newer Older
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import enum
from typing import Dict, List, Optional, Tuple

# RESTCONF data path of the 'ietf-te:tunnel' resource.
# NOTE(review): this constant is not referenced by any helper visible in this
# file, and its name/path refer to a tunnel rather than an Ethernet service --
# confirm whether it is used by importers or is a leftover.
OSU_TUNNEL_URL = '/restconf/data/ietf-te:tunnel'

class BandwidthProfileTypeEnum(enum.Enum):
    """Values emitted under the 'bandwidth-profile-type' key."""
    # Namespaced with the 'ietf-eth-tran-types' prefix.
    MEF_10_BWP = 'ietf-eth-tran-types:mef-10-bwp'

class EndpointLayerSpecificAccessTypeEnum(enum.Enum):
    """Values emitted under the 'access-type' key of an endpoint's layer-specific data."""
    PORT = 'port'

class EndpointProtectionRoleEnum(enum.Enum):
    """Values emitted under the 'protection-role' key of a service endpoint."""
    WORK = 'work'

class OptimizationMetricRole(enum.Enum):
    """Values emitted under the 'metric-role' key of an optimization metric."""
    WORK = 'work'

class OptimizationMetricType(enum.Enum):
    """Values emitted under the 'metric-type' key of an optimization metric."""
    # Namespaced with the 'ietf-te-types' prefix.
    PATH_METRIC_TE = 'ietf-te-types:path-metric-te'

class OuterTagTypeEnum(enum.Enum):
    """Values emitted under the 'tag-type' key of an outer-tag entry."""
    # Namespaced with the 'ietf-eth-tran-types' prefix.
    CLASSIFY_C_VLAN = 'ietf-eth-tran-types:classify-c-vlan'

class ServiceClassificationTypeEnum(enum.Enum):
    """Values emitted under the 'service-classification-type' key of a service endpoint."""
    # NOTE(review): the prefix here is 'ietf-eth-tran-type' (no trailing 's'),
    # while the other prefixed values in this file use 'ietf-eth-tran-types'.
    # Possibly a typo -- confirm against the YANG module and the target
    # controller before changing it, because this string is placed verbatim
    # in the composed endpoint data.
    VLAN_CLASSIFICATION = 'ietf-eth-tran-type:vlan-classification'

class ServiceTypeEnum(enum.Enum):
    """Values emitted under the 'etht-svc-type' key of a service instance."""
    # Unlike several other enums in this file, these values carry no module prefix.
    MP2MP = 'op-mp2mp-svc'
    P2MP  = 'op-p2mp-svc'

def compose_outer_tag(tag_type : OuterTagTypeEnum, vlan_value : int) -> Dict:
    """Build an outer-tag entry.

    Args:
        tag_type: enum member whose ``value`` is stored under 'tag-type'.
        vlan_value: stored unchanged under 'vlan-value'.

    Returns:
        A new dict with the keys 'tag-type' and 'vlan-value', in that order.
    """
    outer_tag = dict()
    outer_tag['tag-type'] = tag_type.value
    outer_tag['vlan-value'] = vlan_value
    return outer_tag

def compose_ingress_egress_bandwidth_profile(
    cir : int = 10_000_000, eir : int = 10_000_000
) -> Dict:
    """Build the ingress/egress bandwidth profile of a service endpoint.

    The profile type is always ``BandwidthProfileTypeEnum.MEF_10_BWP``.

    Args:
        cir: value stored under 'CIR'. Defaults to 10_000_000, the value that
            was previously hard-coded.
        eir: value stored under 'EIR'. Defaults to 10_000_000, the value that
            was previously hard-coded.

    Returns:
        A new dict with the keys 'bandwidth-profile-type', 'CIR' and 'EIR'.
    """
    return {
        'bandwidth-profile-type': BandwidthProfileTypeEnum.MEF_10_BWP.value,
        'CIR': cir,
        'EIR': eir,
    }

def compose_layer_specific_access_type() -> Dict:
    """Build the layer-specific data of an endpoint.

    Returns:
        A new dict whose single key 'access-type' holds the value of
        ``EndpointLayerSpecificAccessTypeEnum.PORT``.
    """
    layer_specific = dict()
    layer_specific['access-type'] = EndpointLayerSpecificAccessTypeEnum.PORT.value
    return layer_specific

def compose_static_route(prefix : str, mask : int, next_hop : str) -> Dict:
    """Build a single static-route entry.

    Args:
        prefix: stored unchanged under 'destination'.
        mask: stored unchanged under 'destination-mask'.
        next_hop: stored unchanged under 'next-hop'.

    Returns:
        A new dict with the keys 'destination', 'destination-mask' and
        'next-hop', in that order.
    """
    field_names = ('destination', 'destination-mask', 'next-hop')
    field_values = (prefix, mask, next_hop)
    return dict(zip(field_names, field_values))

def compose_static_route_list(static_routes : List[Tuple[str, int, str]]) -> List[Dict]:
    """Convert (prefix, mask, next_hop) triples into static-route entries.

    Args:
        static_routes: iterable of 3-item tuples; each tuple is unpacked and
            handed to ``compose_static_route``.

    Returns:
        A new list with one entry per input tuple, in input order.
    """
    route_entries = []
    for destination, destination_mask, gateway in static_routes:
        entry = compose_static_route(destination, destination_mask, gateway)
        route_entries.append(entry)
    return route_entries

def compose_etht_service_endpoint(
    node_id : str, tp_id : str, vlan_value : int,
    static_routes : Optional[List[Tuple[str, int, str]]] = None
) -> Dict:
    """Build one source/destination endpoint of an Ethernet transport service.

    Args:
        node_id: stored unchanged under 'node-id'.
        tp_id: stored unchanged under 'tp-id'.
        vlan_value: VLAN value placed in the endpoint's 'outer-tag'.
        static_routes: optional (prefix, mask, next_hop) triples converted into
            the 'static-route-list'; ``None`` (the default) means no routes.

    Returns:
        A new dict describing the endpoint.
    """
    # Use None as the "no routes" default instead of a list object that would
    # be created once at definition time and shared by every call.
    if static_routes is None:
        static_routes = []
    return {
        'node-id'           : node_id,
        'tp-id'             : tp_id,
        'protection-role'   : EndpointProtectionRoleEnum.WORK.value,
        # The helper must be called: previously the function object itself was
        # stored here instead of the dict it builds.
        'layer-specific'    : compose_layer_specific_access_type(),
        'is-extendable'     : False,
        'is-terminal'       : True,
        'static-route-list' : compose_static_route_list(static_routes),
        'outer-tag'         : compose_outer_tag(OuterTagTypeEnum.CLASSIFY_C_VLAN, vlan_value),
        'service-classification-type'     : ServiceClassificationTypeEnum.VLAN_CLASSIFICATION.value,
        'ingress-egress-bandwidth-profile': compose_ingress_egress_bandwidth_profile(),
    }

def compose_optimizations() -> Dict:
    """Build the optimizations section of a service instance.

    Returns:
        A new dict whose 'optimization-metric' key holds a one-element list;
        that element carries the values of ``OptimizationMetricRole.WORK``
        ('metric-role') and ``OptimizationMetricType.PATH_METRIC_TE``
        ('metric-type').
    """
    metric_entry = {
        'metric-role': OptimizationMetricRole.WORK.value,
        'metric-type': OptimizationMetricType.PATH_METRIC_TE.value,
    }
    return {'optimization-metric': [metric_entry]}

def compose_etht_service(
    name : str, service_type : ServiceTypeEnum, osu_tunnel_name : str,
    src_node_id : str, src_tp_id : str, src_vlan_tag : int, dst_node_id : str, dst_tp_id : str, dst_vlan_tag : int,
    src_static_routes : Optional[List[Tuple[str, int, str]]] = None,
    dst_static_routes : Optional[List[Tuple[str, int, str]]] = None
) -> Dict:
    """Build the top-level 'ietf-eth-tran-service:etht-svc' data for one service.

    Args:
        name: service name; lower-cased for 'etht-svc-name' and upper-cased
            for 'etht-svc-title'.
        service_type: enum member whose ``value`` is stored under 'etht-svc-type'.
        osu_tunnel_name: stored under 'tunnel-name' in the 'svc-tunnel' list.
        src_node_id: node id of the source endpoint.
        src_tp_id: tp id of the source endpoint.
        src_vlan_tag: VLAN value of the source endpoint's outer tag.
        dst_node_id: node id of the destination endpoint.
        dst_tp_id: tp id of the destination endpoint.
        dst_vlan_tag: VLAN value of the destination endpoint's outer tag.
        src_static_routes: optional (prefix, mask, next_hop) triples for the
            source endpoint; ``None`` (the default) means no routes.
        dst_static_routes: optional (prefix, mask, next_hop) triples for the
            destination endpoint; ``None`` (the default) means no routes.

    Returns:
        A new dict with a single service instance containing one source
        endpoint, one destination endpoint, the tunnel reference and the
        optimizations section.
    """
    # Normalise here, rather than forwarding None, so the endpoint helper always
    # receives an iterable; a fresh list per call also avoids the shared
    # definition-time default object.
    if src_static_routes is None:
        src_static_routes = []
    if dst_static_routes is None:
        dst_static_routes = []
    return {'ietf-eth-tran-service:etht-svc': {'etht-svc-instances': [{
        'etht-svc-name' : name.lower(),
        'etht-svc-title': name.upper(),
        'etht-svc-type' : service_type.value,
        'source-endpoints': {'source-endpoint': [
            compose_etht_service_endpoint(src_node_id, src_tp_id, src_vlan_tag, src_static_routes),
        ]},
        'destination-endpoints': {'destination-endpoint': [
            compose_etht_service_endpoint(dst_node_id, dst_tp_id, dst_vlan_tag, dst_static_routes),
        ]},
        'svc-tunnel': [{'tunnel-name': osu_tunnel_name}],
        'optimizations': compose_optimizations(),
    }]}}