Commit c1ea3b73 authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

NBI component - IETF L2 VPN connector:

- Migrated to libyang-based validator
- Replaced old schemas by YANG data models
- Migrated endpoints
- Ongoing: migrate handlers
parent 80133447
Loading
Loading
Loading
Loading
+304 −0
Original line number Diff line number Diff line
# Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging, netaddr
from typing import Dict, List, Optional, Tuple
from common.Constants import DEFAULT_CONTEXT_NAME
from common.proto.context_pb2 import Service, ServiceStatusEnum, ServiceTypeEnum
from common.tools.context_queries.Service import get_service_by_uuid
from common.tools.grpc.ConfigRules import update_config_rule_custom
from common.tools.grpc.Constraints import (
    update_constraint_custom_dict, update_constraint_endpoint_location,
    update_constraint_endpoint_priority, update_constraint_sla_availability,
    update_constraint_sla_capacity, update_constraint_sla_latency,
)
from common.tools.grpc.EndPointIds import update_endpoint_ids
from common.tools.grpc.Tools import grpc_message_to_json_string
from context.client.ContextClient import ContextClient
from service.client.ServiceClient import ServiceClient
from nbi.service._tools.Authentication import HTTP_AUTH
from nbi.service._tools.HttpStatusCodes import HTTP_NOCONTENT, HTTP_SERVERERROR
from .Constants import BEARER_MAPPINGS, DEFAULT_ADDRESS_FAMILIES, DEFAULT_BGP_AS, DEFAULT_BGP_ROUTE_TARGET, DEFAULT_MTU

LOGGER = logging.getLogger(__name__)

def create_service(
    service_uuid : str, context_uuid : Optional[str] = DEFAULT_CONTEXT_NAME
) -> Optional[Exception]:
    # pylint: disable=no-member
    service_request = Service()
    service_request.service_id.context_id.context_uuid.uuid = context_uuid
    service_request.service_id.service_uuid.uuid = service_uuid
    service_request.service_type = ServiceTypeEnum.SERVICETYPE_L2NM
    service_request.service_status.service_status = ServiceStatusEnum.SERVICESTATUS_PLANNED

    try:
        service_client = ServiceClient()
        service_client.CreateService(service_request)
        return None
    except Exception as e: # pylint: disable=broad-except
        LOGGER.exception('Unhandled exception creating Service')
        return e

def process_vpn_service(
    vpn_service : Dict, errors : List[Dict]
) -> None:
    vpn_id = vpn_service['vpn-id']
    exc = create_service(vpn_id)
    if exc is not None: errors.append({'error': str(exc)})


def process_site_network_access(
    site_id : str, network_access : Dict, errors : List[Dict]
) -> None:
    endpoint_uuid = network_access['site-network-access-id']

    site_network_access_type = network_access['site-network-access-type']
    site_network_access_type = site_network_access_type.replace('ietf-l2vpn-svc:', '')
    if site_network_access_type != 'multipoint':
        MSG = 'Site Network Access Type: {:s}'
        msg = MSG.format(str(network_access['site-network-access-type']))
        raise NotImplementedError(msg)

    device_uuid  = network_access['device-reference']
    service_uuid = network_access['vpn-attachment']['vpn-id']

    access_role : str = network_access['vpn-attachment']['site-role']
    access_role = access_role.replace('ietf-l2vpn-svc:', '').replace('-role', '') # hub/spoke
    if access_role not in {'hub', 'spoke'}:
        MSG = 'Site VPN Attackment Role: {:s}'
        raise NotImplementedError(MSG.format(str(network_access['site-network-access-type'])))



    encapsulation_type = network_access['connection']['encapsulation-type']
    cvlan_tag_id = network_access['connection']['tagged-interface'][encapsulation_type]['cvlan-id']

    service_mtu              = network_access['service']['svc-mtu']
    service_input_bandwidth  = network_access['service']['svc-input-bandwidth']
    service_output_bandwidth = network_access['service']['svc-output-bandwidth']
    service_bandwidth_bps    = max(service_input_bandwidth, service_output_bandwidth)
    service_bandwidth_gbps   = service_bandwidth_bps / 1.e9

    max_e2e_latency_ms = None
    availability       = None
    for qos_profile_class in network_access['service']['qos']['qos-profile']['classes']['class']:
        if qos_profile_class['class-id'] != 'qos-realtime':
            MSG = 'Site Network Access QoS Class Id: {:s}'
            raise NotImplementedError(MSG.format(str(qos_profile_class['class-id'])))

        if 'ietf-l2vpn-svc' in qos_profile_class['direction']:
            # replace 'ietf-l2vpn-svc:both' with 'both' for backward compatibility
            qos_profile_class['direction'] = qos_profile_class['direction'].replace('ietf-l2vpn-svc:', '')
        if qos_profile_class['direction'] != 'both':
            MSG = 'Site Network Access QoS Class Direction: {:s}'
            raise NotImplementedError(MSG.format(str(qos_profile_class['direction'])))

        max_e2e_latency_ms = qos_profile_class['latency']['latency-boundary']
        availability       = qos_profile_class['bandwidth']['guaranteed-bw-percent']

    errors.append({'error': str(exc)})

    
    context_uuid : Optional[str] = DEFAULT_CONTEXT_NAME
    context_client = ContextClient()
    service = get_service_by_uuid(context_client, service_uuid, context_uuid=context_uuid, rw_copy=True)
    if service is None: raise Exception('VPN({:s}) not found in database'.format(str(service_uuid)))

    endpoint_ids = service.service_endpoint_ids
    endpoint_id =  update_endpoint_ids(endpoint_ids, device_uuid, endpoint_uuid)

    constraints  = service.service_constraints
    update_constraint_endpoint_location(constraints, endpoint_id, region=site_id)
    if service_bandwidth_gbps  is not None: update_constraint_sla_capacity    (constraints, service_bandwidth_gbps)
    if max_e2e_latency_ms is not None: update_constraint_sla_latency     (constraints, max_e2e_latency_ms)
    if availability   is not None: update_constraint_sla_availability(constraints, 1, True, availability)

    config_rules = service.service_config.config_rules

    service_settings_key = '/settings'
    service_settings = dict()
    if service_mtu is not None: service_settings['mtu'] = (service_mtu, True)
    update_config_rule_custom(config_rules, service_settings_key, service_settings)

    #ENDPOINT_SETTINGS_KEY = '/device[{:s}]/endpoint[{:s}]/vlan[{:d}]/settings'
    #endpoint_settings_key = ENDPOINT_SETTINGS_KEY.format(device_uuid, endpoint_uuid, cvlan_tag_id)
    ENDPOINT_SETTINGS_KEY = '/device[{:s}]/endpoint[{:s}]/settings'
    endpoint_settings_key = ENDPOINT_SETTINGS_KEY.format(device_uuid, endpoint_uuid)
    field_updates = {}
    if cvlan_tag_id is not None: field_updates['vlan_tag'] = (cvlan_tag_id, True)
    update_config_rule_custom(config_rules, endpoint_settings_key, field_updates)

    try:
        service_client = ServiceClient()
        service_client.UpdateService(service)
        return None
    except Exception as e: # pylint: disable=broad-except
        LOGGER.exception('Unhandled exception updating Service')
        return e


# TODO: merge with


def process_site_network_access(site_id : str, network_access : Dict) -> Service:
    service_uuid = network_access['vpn-attachment']['vpn-id']

    bearer_reference = network_access['bearer']['bearer-reference']

    access_priority : Optional[int] = network_access.get('availability', {}).get('access-priority')
    single_active   : bool = len(network_access.get('availability', {}).get('single-active', [])) > 0
    all_active      : bool = len(network_access.get('availability', {}).get('all-active', [])) > 0

    diversity_constraints = network_access.get('access-diversity', {}).get('constraints', {}).get('constraint', [])
    raise_if_differs = True
    diversity_constraints = {
        constraint['constraint-type']:([
            target[0]
            for target in constraint['target'].items()
            if len(target[1]) == 1
        ][0], raise_if_differs)
        for constraint in diversity_constraints
    }

    mapping = BEARER_MAPPINGS.get(bearer_reference)
    if mapping is None:
        msg = 'Specified Bearer({:s}) is not configured.'
        raise Exception(msg.format(str(bearer_reference)))
    (
        device_uuid, endpoint_uuid, router_id, route_dist, sub_if_index,
        address_ip, address_prefix, remote_router, circuit_id
    ) = mapping

    target = get_service_by_uuid(context_client, service_uuid, rw_copy=True)
    if target is None: raise Exception('VPN({:s}) not found in database'.format(str(service_uuid)))

    endpoint_ids = target.service_endpoint_ids       # pylint: disable=no-member
    config_rules = target.service_config.config_rules # pylint: disable=no-member
    constraints  = target.service_constraints        # pylint: disable=no-member

    endpoint_id = update_endpoint_ids(endpoint_ids, device_uuid, endpoint_uuid)

    service_settings_key = '/settings'
    update_config_rule_custom(config_rules, service_settings_key, {
        'mtu'             : (DEFAULT_MTU,              True),
        'address_families': (DEFAULT_ADDRESS_FAMILIES, True),
        'bgp_as'          : (DEFAULT_BGP_AS,           True),
        'bgp_route_target': (DEFAULT_BGP_ROUTE_TARGET, True),
    })

    endpoint_settings_key = '/device[{:s}]/endpoint[{:s}]/settings'.format(device_uuid, endpoint_uuid)
    field_updates = {}
    if router_id      is not None: field_updates['router_id'          ] = (router_id,      True)
    if route_dist     is not None: field_updates['route_distinguisher'] = (route_dist,     True)
    if sub_if_index   is not None: field_updates['sub_interface_index'] = (sub_if_index,   True)
    if cvlan_id       is not None: field_updates['vlan_id'            ] = (cvlan_id,       True)
    if address_ip     is not None: field_updates['address_ip'         ] = (address_ip,     True)
    if address_prefix is not None: field_updates['address_prefix'     ] = (address_prefix, True)
    if remote_router  is not None: field_updates['remote_router'      ] = (remote_router,  True)
    if circuit_id     is not None: field_updates['circuit_id'         ] = (circuit_id,     True)
    update_config_rule_custom(config_rules, endpoint_settings_key, field_updates)

    if len(diversity_constraints) > 0:
        update_constraint_custom_dict(constraints, 'diversity', diversity_constraints)

    update_constraint_endpoint_location(constraints, endpoint_id, region=site_id)
    if access_priority is not None: update_constraint_endpoint_priority(constraints, endpoint_id, access_priority)
    if single_active or all_active:
        # assume 1 disjoint path per endpoint/location included in service/slice
        location_endpoints = {}
        for constraint in constraints:
            if constraint.WhichOneof('constraint') != 'endpoint_location': continue
            str_endpoint_id = grpc_message_to_json_string(constraint.endpoint_location.endpoint_id)
            str_location_id = grpc_message_to_json_string(constraint.endpoint_location.location)
            location_endpoints.setdefault(str_location_id, set()).add(str_endpoint_id)
        num_endpoints_per_location = {len(endpoints) for endpoints in location_endpoints.values()}
        num_disjoint_paths = max(num_endpoints_per_location)
        update_constraint_sla_availability(constraints, num_disjoint_paths, all_active, 0.0)

    return target












def process_site(site : Dict, errors : List[Dict]) -> None:
    site_id = site['site-id']

    # this change is made for ECOC2025 demo purposes
    if site['management']['type'] != 'provider-managed':
    # if site['management']['type'] == 'customer-managed':
        MSG = 'Site Management Type: {:s}'
        raise NotImplementedError(MSG.format(str(site['management']['type'])))

    network_accesses : List[Dict] = site['site-network-accesses']['site-network-access']
    for network_access in network_accesses:
        process_site_network_access(site_id, network_access, errors)

def update_vpn(site : Dict, errors : List[Dict]) -> None:
    if site['management']['type'] != 'provider-managed':
        MSG = 'Site Management Type: {:s}'
        raise NotImplementedError(MSG.format(str(site['management']['type'])))

    network_accesses : List[Dict] = site['site-network-accesses']['site-network-access']
    for network_access in network_accesses:
        update_site_network_access(network_access, errors)

def update_site_network_access(network_access : Dict, errors : List[Dict]) -> None:
    try:
        site_network_access_type = network_access['site-network-access-type']
        site_network_access_type = site_network_access_type.replace('ietf-l2vpn-svc:', '')
        if site_network_access_type != 'multipoint':
            MSG = 'Site Network Access Type: {:s}'
            msg = MSG.format(str(network_access['site-network-access-type']))
            raise NotImplementedError(msg)

        service_uuid = network_access['vpn-attachment']['vpn-id']

        service_input_bandwidth  = network_access['service']['svc-input-bandwidth']
        service_output_bandwidth = network_access['service']['svc-output-bandwidth']
        service_bandwidth_bps    = max(service_input_bandwidth, service_output_bandwidth)
        service_bandwidth_gbps   = service_bandwidth_bps / 1.e9

        max_e2e_latency_ms = None
        availability       = None

        context_client = ContextClient()
        service = get_service_by_uuid(
            context_client, service_uuid, context_uuid=DEFAULT_CONTEXT_NAME, rw_copy=True
        )
        if service is None:
            MSG = 'VPN({:s}) not found in database'
            raise Exception(MSG.format(str(service_uuid)))

        constraints = service.service_constraints
        if service_bandwidth_gbps is not None:
            update_constraint_sla_capacity(constraints, service_bandwidth_gbps)
        if max_e2e_latency_ms is not None:
            update_constraint_sla_latency(constraints, max_e2e_latency_ms)
        if availability is not None:
            update_constraint_sla_availability(constraints, 1, True, availability)

        service_client = ServiceClient()
        service_client.UpdateService(service)
    except Exception as e: # pylint: disable=broad-except
        LOGGER.exception('Unhandled exception updating Service')
        errors.append({'error': str(e)})
+50 −5
Original line number Diff line number Diff line
@@ -16,14 +16,18 @@ import logging
from flask import request
from flask.json import jsonify
from flask_restful import Resource
from common.proto.context_pb2 import ServiceStatusEnum
from common.proto.context_pb2 import ServiceStatusEnum, ServiceTypeEnum
from common.tools.context_queries.Service import get_service_by_uuid
from context.client.ContextClient import ContextClient
from service.client.ServiceClient import ServiceClient
from typing import Dict, List
from werkzeug.exceptions import UnsupportedMediaType
from nbi.service._tools.Authentication import HTTP_AUTH
from nbi.service._tools.HttpStatusCodes import (
    HTTP_GATEWAYTIMEOUT, HTTP_NOCONTENT, HTTP_OK, HTTP_SERVERERROR
)
from .Handlers import  update_vpn
from .YangValidator import YangValidator

LOGGER = logging.getLogger(__name__)

@@ -40,12 +44,16 @@ class L2VPN_Service(Resource):
            if target is None:
                raise Exception('VPN({:s}) not found in database'.format(str(vpn_id)))
            
            if target.service_id.service_uuid.uuid != vpn_id: # pylint: disable=no-member
            if target.service_type != ServiceTypeEnum.SERVICETYPE_L2NM:
                raise Exception('VPN({:s}) is not L2VPN'.format(str(vpn_id)))

            service_ids = {target.service_id.service_uuid.uuid, target.name} # pylint: disable=no-member
            if vpn_id not in service_ids:
                raise Exception('Service retrieval failed. Wrong Service Id was returned')

            service_ready_status = ServiceStatusEnum.SERVICESTATUS_ACTIVE
            service_status = target.service_status.service_status # pylint: disable=no-member
            response = jsonify({})
            response = jsonify({'service-id': target.service_id.service_uuid.uuid})
            response.status_code = HTTP_OK if service_status == service_ready_status else HTTP_GATEWAYTIMEOUT
        except Exception as e: # pylint: disable=broad-except
            LOGGER.exception('Something went wrong Retrieving VPN({:s})'.format(str(vpn_id)))
@@ -64,8 +72,11 @@ class L2VPN_Service(Resource):
            target = get_service_by_uuid(context_client, vpn_id)
            if target is None:
                LOGGER.warning('VPN({:s}) not found in database. Nothing done.'.format(str(vpn_id)))
            elif target.service_type != ServiceTypeEnum.SERVICETYPE_L2NM:
                raise Exception('VPN({:s}) is not L2VPN'.format(str(vpn_id)))
            else:
                if target.service_id.service_uuid.uuid != vpn_id: # pylint: disable=no-member
                service_ids = {target.service_id.service_uuid.uuid, target.name} # pylint: disable=no-member
                if vpn_id not in service_ids:
                    raise Exception('Service retrieval failed. Wrong Service Id was returned')
                service_client = ServiceClient()
                service_client.DeleteService(target.service_id)
@@ -76,3 +87,37 @@ class L2VPN_Service(Resource):
            response = jsonify({'error': str(e)})
            response.status_code = HTTP_SERVERERROR
        return response

    def put(self, vpn_id : str):
        #TODO: check vpn_id with request service_id in body
        if not request.is_json: raise UnsupportedMediaType('JSON payload is required')
        request_data: Dict = request.json
        LOGGER.debug('PUT Request: {:s}'.format(str(request_data)))

        errors = list()

        if 'ietf-l2vpn-svc:l2vpn-services' in request_data:
            for l2vpn_svc in request_data['ietf-l2vpn-svc:l2vpn-services']['l2vpn-svc']:
                l2vpn_svc.pop('service-id', None)
                l2vpn_svc_request_data = {'ietf-l2vpn-svc:l2vpn-svc': l2vpn_svc}
                errors.extend(self._update_l2vpn(l2vpn_svc_request_data))
        elif 'ietf-l2vpn-svc:l2vpn-svc' in request_data:
            errors.extend(self._update_l2vpn(request_data))
        else:
            errors.append('Unexpected request format: {:s}'.format(str(request_data)))

        response = jsonify(errors)
        response.status_code = HTTP_NOCONTENT if len(errors) == 0 else HTTP_SERVERERROR
        return response

    def _update_l2vpn(self, request_data: Dict) -> List[Dict]:
        yang_validator = YangValidator('ietf-l2vpn-svc')
        request_data = yang_validator.parse_to_dict(request_data)
        yang_validator.destroy()

        errors = list()

        for site in request_data['l2vpn-svc']['sites']['site']:
            update_vpn(site, errors)

        return errors
+45 −27
Original line number Diff line number Diff line
@@ -18,13 +18,10 @@ from flask import request
from flask.json import jsonify
from flask_restful import Resource
from werkzeug.exceptions import UnsupportedMediaType
from common.Constants import DEFAULT_CONTEXT_NAME
from common.proto.context_pb2 import Service, ServiceStatusEnum, ServiceTypeEnum
from service.client.ServiceClient import ServiceClient
from nbi.service._tools.HttpStatusCodes import HTTP_CREATED, HTTP_SERVERERROR
from nbi.service._tools.Validator import validate_message
from nbi.service._tools.Authentication import HTTP_AUTH
from .schemas.vpn_service import SCHEMA_VPN_SERVICE
from .Handlers import process_site, process_vpn_service
from .YangValidator import YangValidator

LOGGER = logging.getLogger(__name__)

@@ -38,25 +35,46 @@ class L2VPN_Services(Resource):
        if not request.is_json: raise UnsupportedMediaType('JSON payload is required')
        request_data : Dict = request.json
        LOGGER.debug('Request: {:s}'.format(str(request_data)))
        validate_message(SCHEMA_VPN_SERVICE, request_data)

        vpn_services : List[Dict] = request_data['ietf-l2vpn-svc:vpn-service']
        for vpn_service in vpn_services:
            try:
                # pylint: disable=no-member
                service_request = Service()
                service_request.service_id.context_id.context_uuid.uuid = DEFAULT_CONTEXT_NAME
                service_request.service_id.service_uuid.uuid = vpn_service['vpn-id']
                service_request.service_type = ServiceTypeEnum.SERVICETYPE_L2NM
                service_request.service_status.service_status = ServiceStatusEnum.SERVICESTATUS_PLANNED

                service_client = ServiceClient()
                service_client.CreateService(service_request)

                response = jsonify({})
                response.status_code = HTTP_CREATED
            except Exception as e: # pylint: disable=broad-except
                LOGGER.exception('Something went wrong Creating Service {:s}'.format(str(request)))
                response = jsonify({'error': str(e)})
                response.status_code = HTTP_SERVERERROR

        errors = list()
        if 'ietf-l2vpn-svc:l2vpn-services' in request_data:
            # processing multiple L2VPN service requests formatted as:
            #{
            #  "ietf-l2vpn-svc:l2vpn-services": {
            #    "l2vpn-svc": [
            #      {
            #        "service-id": "vpn1",
            #        "vpn-services": {
            #          "vpn-service": [
            for l2vpn_svc in request_data['ietf-l2vpn-svc:l2vpn-services']['l2vpn-svc']:
                l2vpn_svc.pop('service-id', None)
                l2vpn_svc_request_data = {'ietf-l2vpn-svc:l2vpn-svc': l2vpn_svc}
                errors.extend(self._process_l2vpn(l2vpn_svc_request_data))
        elif 'ietf-l2vpn-svc:l2vpn-svc' in request_data:
            # processing single (standard) L2VPN service request formatted as:
            #{
            #  "ietf-l2vpn-svc:l2vpn-svc": {
            #    "vpn-services": {
            #      "vpn-service": [
            errors.extend(self._process_l2vpn(request_data))
        else:
            errors.append('Unexpected request: {:s}'.format(str(request_data)))

        response = jsonify(errors)
        response.status_code = HTTP_CREATED if len(errors) == 0 else HTTP_SERVERERROR
        return response

    def _process_l2vpn(self, request_data : Dict) -> List[Dict]:
        yang_validator = YangValidator('ietf-l2vpn-svc')
        request_data = yang_validator.parse_to_dict(request_data)
        yang_validator.destroy()

        errors = list()

        for vpn_service in request_data['l2vpn-svc']['vpn-services']['vpn-service']:
            process_vpn_service(vpn_service, errors)

        for site in request_data['l2vpn-svc']['sites']['site']:
            process_site(site, errors)

        return errors
+18 −118

File changed.

Preview size limit exceeded, changes collapsed.

+36 −0
Original line number Diff line number Diff line
@@ -12,5 +12,25 @@
# See the License for the specific language governing permissions and
# limitations under the License.

# String pattern for UUIDs such as '3fd942ee-2dc3-41d1-aeec-65aa85d117b2'
REGEX_UUID = r'[a-fA-F0-9]{8}\-[a-fA-F0-9]{4}\-[a-fA-F0-9]{4}\-[a-fA-F0-9]{4}\-[a-fA-F0-9]{12}'
import libyang, os
from typing import Dict, Optional

YANG_DIR = os.path.join(os.path.dirname(__file__), 'yang')

class YangValidator:
    def __init__(self, module_name : str) -> None:
        self._yang_context = libyang.Context(YANG_DIR)
        self._yang_module  = self._yang_context.load_module(module_name)
        self._yang_module.feature_enable_all()

    def parse_to_dict(self, message : Dict) -> Dict:
        dnode : Optional[libyang.DNode] = self._yang_module.parse_data_dict(
            message, validate_present=True, validate=True, strict=True
        )
        if dnode is None: raise Exception('Unable to parse Message({:s})'.format(str(message)))
        message = dnode.print_dict()
        dnode.free()
        return message

    def destroy(self) -> None:
        self._yang_context.destroy()
Loading