Commit e852fa72 authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Merge branch 'develop' of ssh://gifrerenom_labs.etsi.org/tfs/controller into...

Merge branch 'develop' of ssh://gifrerenom_labs.etsi.org/tfs/controller into feat/292-cttc-implement-integration-test-for-ryu-openflow
parents 66cfb8ae 26a57404
Loading
Loading
Loading
Loading
+265 −0
Original line number Original line Diff line number Diff line
# Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
from typing import Dict, List, Optional
from common.Constants import DEFAULT_CONTEXT_NAME
from common.proto.context_pb2 import Service, ServiceStatusEnum, ServiceTypeEnum
from common.tools.context_queries.Service import get_service_by_uuid
from common.tools.grpc.ConfigRules import update_config_rule_custom
from common.tools.grpc.Constraints import (
    update_constraint_custom_dict, update_constraint_endpoint_location,
    update_constraint_endpoint_priority, update_constraint_sla_availability,
    update_constraint_sla_capacity, update_constraint_sla_latency,
)
from common.tools.grpc.EndPointIds import update_endpoint_ids
from common.tools.grpc.Tools import grpc_message_to_json_string
from context.client.ContextClient import ContextClient
from service.client.ServiceClient import ServiceClient
from .Constants import (
    #DEFAULT_ADDRESS_FAMILIES, DEFAULT_BGP_AS, DEFAULT_BGP_ROUTE_TARGET,
    BEARER_MAPPINGS, DEFAULT_MTU,
)

LOGGER = logging.getLogger(__name__)

def create_service(
    service_uuid : str, context_uuid : Optional[str] = DEFAULT_CONTEXT_NAME
) -> Optional[Exception]:
    # pylint: disable=no-member
    service_request = Service()
    service_request.service_id.context_id.context_uuid.uuid = context_uuid
    service_request.service_id.service_uuid.uuid = service_uuid
    service_request.service_type = ServiceTypeEnum.SERVICETYPE_L2NM
    service_request.service_status.service_status = ServiceStatusEnum.SERVICESTATUS_PLANNED

    try:
        service_client = ServiceClient()
        service_client.CreateService(service_request)
        return None
    except Exception as e: # pylint: disable=broad-except
        LOGGER.exception('Unhandled exception creating Service')
        return e

def process_vpn_service(
    vpn_service : Dict, errors : List[Dict]
) -> None:
    vpn_id = vpn_service['vpn-id']
    exc = create_service(vpn_id)
    if exc is not None: errors.append({'error': str(exc)})


def process_site_network_access(
    site_id : str, network_access : Dict, errors : List[Dict]
) -> None:
    try:
        site_network_access_type = network_access['site-network-access-type']
        site_network_access_type = site_network_access_type.replace('ietf-l2vpn-svc:', '')
        if site_network_access_type != 'multipoint':
            MSG = 'Site Network Access Type: {:s}'
            msg = MSG.format(str(network_access['site-network-access-type']))
            raise NotImplementedError(msg)

        access_role : str = network_access['vpn-attachment']['site-role']
        access_role = access_role.replace('ietf-l2vpn-svc:', '').replace('-role', '') # hub/spoke
        if access_role not in {'hub', 'spoke'}:
            MSG = 'Site VPN Attackment Role: {:s}'
            raise NotImplementedError(MSG.format(str(network_access['site-network-access-type'])))

        device_uuid   = network_access['device-reference']
        endpoint_uuid = network_access['site-network-access-id']
        service_uuid  = network_access['vpn-attachment']['vpn-id']

        encapsulation_type = network_access['connection']['encapsulation-type']
        cvlan_tag_id = network_access['connection']['tagged-interface'][encapsulation_type]['cvlan-id']

        bearer_reference = network_access['bearer']['bearer-reference']

        service_mtu              = network_access['service']['svc-mtu']
        service_input_bandwidth  = network_access['service']['svc-input-bandwidth']
        service_output_bandwidth = network_access['service']['svc-output-bandwidth']
        service_bandwidth_bps    = max(service_input_bandwidth, service_output_bandwidth)
        service_bandwidth_gbps   = service_bandwidth_bps / 1.e9

        max_e2e_latency_ms = None
        availability       = None
        for qos_profile_class in network_access['service']['qos']['qos-profile']['classes']['class']:
            if qos_profile_class['class-id'] != 'qos-realtime':
                MSG = 'Site Network Access QoS Class Id: {:s}'
                raise NotImplementedError(MSG.format(str(qos_profile_class['class-id'])))

            qos_profile_class_direction = qos_profile_class['direction']
            qos_profile_class_direction = qos_profile_class_direction.replace('ietf-l2vpn-svc:', '')
            if qos_profile_class_direction != 'both':
                MSG = 'Site Network Access QoS Class Direction: {:s}'
                raise NotImplementedError(MSG.format(str(qos_profile_class['direction'])))

            max_e2e_latency_ms = qos_profile_class['latency']['latency-boundary']
            availability       = qos_profile_class['bandwidth']['guaranteed-bw-percent']

        network_access_diversity = network_access.get('access-diversity', {})
        diversity_constraints = network_access_diversity.get('constraints', {}).get('constraint', [])
        raise_if_differs = True
        diversity_constraints = {
            constraint['constraint-type']:([
                target[0]
                for target in constraint['target'].items()
                if len(target[1]) == 1
            ][0], raise_if_differs)
            for constraint in diversity_constraints
        }

        network_access_availability = network_access.get('availability', {})
        access_priority : Optional[int] = network_access_availability.get('access-priority')
        single_active   : bool = len(network_access_availability.get('single-active', [])) > 0
        all_active      : bool = len(network_access_availability.get('all-active', [])) > 0

        mapping = BEARER_MAPPINGS.get(bearer_reference)
        if mapping is None:
            msg = 'Specified Bearer({:s}) is not configured.'
            raise Exception(msg.format(str(bearer_reference)))
        (
            device_uuid, endpoint_uuid, router_id, route_dist, sub_if_index,
            address_ip, address_prefix, remote_router, circuit_id
        ) = mapping

        context_client = ContextClient()
        service = get_service_by_uuid(
            context_client, service_uuid, context_uuid=DEFAULT_CONTEXT_NAME, rw_copy=True
        )
        if service is None:
            raise Exception('VPN({:s}) not found in database'.format(str(service_uuid)))

        endpoint_ids = service.service_endpoint_ids
        config_rules = service.service_config.config_rules
        constraints  = service.service_constraints

        endpoint_id = update_endpoint_ids(endpoint_ids, device_uuid, endpoint_uuid)

        update_constraint_endpoint_location(constraints, endpoint_id, region=site_id)
        if access_priority is not None:
            update_constraint_endpoint_priority(constraints, endpoint_id, access_priority)
        if service_bandwidth_gbps is not None:
            update_constraint_sla_capacity(constraints, service_bandwidth_gbps)
        if max_e2e_latency_ms is not None:
            update_constraint_sla_latency(constraints, max_e2e_latency_ms)
        if availability is not None:
            update_constraint_sla_availability(constraints, 1, True, availability)
        if len(diversity_constraints) > 0:
            update_constraint_custom_dict(constraints, 'diversity', diversity_constraints)
        if single_active or all_active:
            # assume 1 disjoint path per endpoint/location included in service
            location_endpoints = {}
            for constraint in constraints:
                if constraint.WhichOneof('constraint') != 'endpoint_location': continue
                str_endpoint_id = grpc_message_to_json_string(constraint.endpoint_location.endpoint_id)
                str_location_id = grpc_message_to_json_string(constraint.endpoint_location.location)
                location_endpoints.setdefault(str_location_id, set()).add(str_endpoint_id)
            num_endpoints_per_location = {len(endpoints) for endpoints in location_endpoints.values()}
            num_disjoint_paths = max(num_endpoints_per_location)
            update_constraint_sla_availability(constraints, num_disjoint_paths, all_active, 0.0)

        service_settings_key = '/settings'
        if service_mtu is None: service_mtu = DEFAULT_MTU
        update_config_rule_custom(config_rules, service_settings_key, {
            'mtu'             : (service_mtu,              True),
            #'address_families': (DEFAULT_ADDRESS_FAMILIES, True),
            #'bgp_as'          : (DEFAULT_BGP_AS,           True),
            #'bgp_route_target': (DEFAULT_BGP_ROUTE_TARGET, True),
        })

        #ENDPOINT_SETTINGS_KEY = '/device[{:s}]/endpoint[{:s}]/vlan[{:d}]/settings'
        #endpoint_settings_key = ENDPOINT_SETTINGS_KEY.format(device_uuid, endpoint_uuid, cvlan_tag_id)
        ENDPOINT_SETTINGS_KEY = '/device[{:s}]/endpoint[{:s}]/settings'
        endpoint_settings_key = ENDPOINT_SETTINGS_KEY.format(device_uuid, endpoint_uuid)
        field_updates = {}
        if router_id      is not None: field_updates['router_id'          ] = (router_id,      True)
        if route_dist     is not None: field_updates['route_distinguisher'] = (route_dist,     True)
        if sub_if_index   is not None: field_updates['sub_interface_index'] = (sub_if_index,   True)
        if cvlan_tag_id   is not None: field_updates['vlan_id'            ] = (cvlan_tag_id,   True)
        if address_ip     is not None: field_updates['address_ip'         ] = (address_ip,     True)
        if address_prefix is not None: field_updates['address_prefix'     ] = (address_prefix, True)
        if remote_router  is not None: field_updates['remote_router'      ] = (remote_router,  True)
        if circuit_id     is not None: field_updates['circuit_id'         ] = (circuit_id,     True)
        update_config_rule_custom(config_rules, endpoint_settings_key, field_updates)

        service_client = ServiceClient()
        service_client.UpdateService(service)
    except Exception as exc:
        LOGGER.exception('Unhandled Exception')
        errors.append({'error': str(exc)})


def process_site(site : Dict, errors : List[Dict]) -> None:
    site_id = site['site-id']

    # this change is made for ECOC2025 demo purposes
    if site['management']['type'] != 'provider-managed':
    # if site['management']['type'] == 'customer-managed':
        MSG = 'Site Management Type: {:s}'
        raise NotImplementedError(MSG.format(str(site['management']['type'])))

    network_accesses : List[Dict] = site['site-network-accesses']['site-network-access']
    for network_access in network_accesses:
        process_site_network_access(site_id, network_access, errors)

def update_vpn(site : Dict, errors : List[Dict]) -> None:
    if site['management']['type'] != 'provider-managed':
        MSG = 'Site Management Type: {:s}'
        raise NotImplementedError(MSG.format(str(site['management']['type'])))

    network_accesses : List[Dict] = site['site-network-accesses']['site-network-access']
    for network_access in network_accesses:
        update_site_network_access(network_access, errors)

def update_site_network_access(network_access : Dict, errors : List[Dict]) -> None:
    try:
        site_network_access_type = network_access['site-network-access-type']
        site_network_access_type = site_network_access_type.replace('ietf-l2vpn-svc:', '')
        if site_network_access_type != 'multipoint':
            MSG = 'Site Network Access Type: {:s}'
            msg = MSG.format(str(network_access['site-network-access-type']))
            raise NotImplementedError(msg)

        service_uuid = network_access['vpn-attachment']['vpn-id']

        service_input_bandwidth  = network_access['service']['svc-input-bandwidth']
        service_output_bandwidth = network_access['service']['svc-output-bandwidth']
        service_bandwidth_bps    = max(service_input_bandwidth, service_output_bandwidth)
        service_bandwidth_gbps   = service_bandwidth_bps / 1.e9

        max_e2e_latency_ms = None
        availability       = None

        context_client = ContextClient()
        service = get_service_by_uuid(
            context_client, service_uuid, context_uuid=DEFAULT_CONTEXT_NAME, rw_copy=True
        )
        if service is None:
            MSG = 'VPN({:s}) not found in database'
            raise Exception(MSG.format(str(service_uuid)))

        constraints = service.service_constraints
        if service_bandwidth_gbps is not None:
            update_constraint_sla_capacity(constraints, service_bandwidth_gbps)
        if max_e2e_latency_ms is not None:
            update_constraint_sla_latency(constraints, max_e2e_latency_ms)
        if availability is not None:
            update_constraint_sla_availability(constraints, 1, True, availability)

        service_client = ServiceClient()
        service_client.UpdateService(service)
    except Exception as e: # pylint: disable=broad-except
        LOGGER.exception('Unhandled exception updating Service')
        errors.append({'error': str(e)})
+60 −15
Original line number Original line Diff line number Diff line
@@ -16,14 +16,18 @@ import logging
from flask import request
from flask import request
from flask.json import jsonify
from flask.json import jsonify
from flask_restful import Resource
from flask_restful import Resource
from common.proto.context_pb2 import SliceStatusEnum
from common.proto.context_pb2 import ServiceStatusEnum, ServiceTypeEnum
from common.tools.context_queries.Slice import get_slice_by_uuid
from common.tools.context_queries.Service import get_service_by_uuid
from context.client.ContextClient import ContextClient
from context.client.ContextClient import ContextClient
from slice.client.SliceClient import SliceClient
from service.client.ServiceClient import ServiceClient
from typing import Dict, List
from werkzeug.exceptions import UnsupportedMediaType
from nbi.service._tools.Authentication import HTTP_AUTH
from nbi.service._tools.Authentication import HTTP_AUTH
from nbi.service._tools.HttpStatusCodes import (
from nbi.service._tools.HttpStatusCodes import (
    HTTP_GATEWAYTIMEOUT, HTTP_NOCONTENT, HTTP_OK, HTTP_SERVERERROR
    HTTP_GATEWAYTIMEOUT, HTTP_NOCONTENT, HTTP_OK, HTTP_SERVERERROR
)
)
from .Handlers import  update_vpn
from .YangValidator import YangValidator


LOGGER = logging.getLogger(__name__)
LOGGER = logging.getLogger(__name__)


@@ -36,17 +40,21 @@ class L2VPN_Service(Resource):
        try:
        try:
            context_client = ContextClient()
            context_client = ContextClient()


            target = get_slice_by_uuid(context_client, vpn_id, rw_copy=True)
            target = get_service_by_uuid(context_client, vpn_id, rw_copy=True)
            if target is None:
            if target is None:
                raise Exception('VPN({:s}) not found in database'.format(str(vpn_id)))
                raise Exception('VPN({:s}) not found in database'.format(str(vpn_id)))
            
            
            if target.slice_id.slice_uuid.uuid != vpn_id: # pylint: disable=no-member
            if target.service_type != ServiceTypeEnum.SERVICETYPE_L2NM:
                raise Exception('Slice retrieval failed. Wrong Slice Id was returned')
                raise Exception('VPN({:s}) is not L2VPN'.format(str(vpn_id)))


            slice_ready_status = SliceStatusEnum.SLICESTATUS_ACTIVE
            service_ids = {target.service_id.service_uuid.uuid, target.name} # pylint: disable=no-member
            slice_status = target.slice_status.slice_status # pylint: disable=no-member
            if vpn_id not in service_ids:
            response = jsonify({})
                raise Exception('Service retrieval failed. Wrong Service Id was returned')
            response.status_code = HTTP_OK if slice_status == slice_ready_status else HTTP_GATEWAYTIMEOUT

            service_ready_status = ServiceStatusEnum.SERVICESTATUS_ACTIVE
            service_status = target.service_status.service_status # pylint: disable=no-member
            response = jsonify({'service-id': target.service_id.service_uuid.uuid})
            response.status_code = HTTP_OK if service_status == service_ready_status else HTTP_GATEWAYTIMEOUT
        except Exception as e: # pylint: disable=broad-except
        except Exception as e: # pylint: disable=broad-except
            LOGGER.exception('Something went wrong Retrieving VPN({:s})'.format(str(vpn_id)))
            LOGGER.exception('Something went wrong Retrieving VPN({:s})'.format(str(vpn_id)))
            response = jsonify({'error': str(e)})
            response = jsonify({'error': str(e)})
@@ -61,14 +69,17 @@ class L2VPN_Service(Resource):
        try:
        try:
            context_client = ContextClient()
            context_client = ContextClient()


            target = get_slice_by_uuid(context_client, vpn_id)
            target = get_service_by_uuid(context_client, vpn_id)
            if target is None:
            if target is None:
                LOGGER.warning('VPN({:s}) not found in database. Nothing done.'.format(str(vpn_id)))
                LOGGER.warning('VPN({:s}) not found in database. Nothing done.'.format(str(vpn_id)))
            elif target.service_type != ServiceTypeEnum.SERVICETYPE_L2NM:
                raise Exception('VPN({:s}) is not L2VPN'.format(str(vpn_id)))
            else:
            else:
                if target.slice_id.slice_uuid.uuid != vpn_id: # pylint: disable=no-member
                service_ids = {target.service_id.service_uuid.uuid, target.name} # pylint: disable=no-member
                    raise Exception('Slice retrieval failed. Wrong Slice Id was returned')
                if vpn_id not in service_ids:
                slice_client = SliceClient()
                    raise Exception('Service retrieval failed. Wrong Service Id was returned')
                slice_client.DeleteSlice(target.slice_id)
                service_client = ServiceClient()
                service_client.DeleteService(target.service_id)
            response = jsonify({})
            response = jsonify({})
            response.status_code = HTTP_NOCONTENT
            response.status_code = HTTP_NOCONTENT
        except Exception as e: # pylint: disable=broad-except
        except Exception as e: # pylint: disable=broad-except
@@ -76,3 +87,37 @@ class L2VPN_Service(Resource):
            response = jsonify({'error': str(e)})
            response = jsonify({'error': str(e)})
            response.status_code = HTTP_SERVERERROR
            response.status_code = HTTP_SERVERERROR
        return response
        return response

    def put(self, vpn_id : str):
        #TODO: check vpn_id with request service_id in body
        if not request.is_json: raise UnsupportedMediaType('JSON payload is required')
        request_data: Dict = request.json
        LOGGER.debug('PUT Request: {:s}'.format(str(request_data)))

        errors = list()

        if 'ietf-l2vpn-svc:l2vpn-services' in request_data:
            for l2vpn_svc in request_data['ietf-l2vpn-svc:l2vpn-services']['l2vpn-svc']:
                l2vpn_svc.pop('service-id', None)
                l2vpn_svc_request_data = {'ietf-l2vpn-svc:l2vpn-svc': l2vpn_svc}
                errors.extend(self._update_l2vpn(l2vpn_svc_request_data))
        elif 'ietf-l2vpn-svc:l2vpn-svc' in request_data:
            errors.extend(self._update_l2vpn(request_data))
        else:
            errors.append('Unexpected request format: {:s}'.format(str(request_data)))

        response = jsonify(errors)
        response.status_code = HTTP_NOCONTENT if len(errors) == 0 else HTTP_SERVERERROR
        return response

    def _update_l2vpn(self, request_data: Dict) -> List[Dict]:
        yang_validator = YangValidator('ietf-l2vpn-svc')
        request_data = yang_validator.parse_to_dict(request_data)
        yang_validator.destroy()

        errors = list()

        for site in request_data['l2vpn-svc']['sites']['site']:
            update_vpn(site, errors)

        return errors
+70 −25
Original line number Original line Diff line number Diff line
@@ -18,13 +18,10 @@ from flask import request
from flask.json import jsonify
from flask.json import jsonify
from flask_restful import Resource
from flask_restful import Resource
from werkzeug.exceptions import UnsupportedMediaType
from werkzeug.exceptions import UnsupportedMediaType
from common.Constants import DEFAULT_CONTEXT_NAME
from common.proto.context_pb2 import SliceStatusEnum, Slice
from slice.client.SliceClient import SliceClient
from nbi.service._tools.HttpStatusCodes import HTTP_CREATED, HTTP_SERVERERROR
from nbi.service._tools.Validator import validate_message
from nbi.service._tools.Authentication import HTTP_AUTH
from nbi.service._tools.Authentication import HTTP_AUTH
from .schemas.vpn_service import SCHEMA_VPN_SERVICE
from nbi.service._tools.HttpStatusCodes import HTTP_CREATED, HTTP_SERVERERROR
from .Handlers import process_site, process_vpn_service
from .YangValidator import YangValidator


LOGGER = logging.getLogger(__name__)
LOGGER = logging.getLogger(__name__)


@@ -38,24 +35,72 @@ class L2VPN_Services(Resource):
        if not request.is_json: raise UnsupportedMediaType('JSON payload is required')
        if not request.is_json: raise UnsupportedMediaType('JSON payload is required')
        request_data : Dict = request.json
        request_data : Dict = request.json
        LOGGER.debug('Request: {:s}'.format(str(request_data)))
        LOGGER.debug('Request: {:s}'.format(str(request_data)))
        validate_message(SCHEMA_VPN_SERVICE, request_data)


        vpn_services : List[Dict] = request_data['ietf-l2vpn-svc:vpn-service']
        errors = list()
        if 'ietf-l2vpn-svc:l2vpn-svc' in request_data:
            # processing single (standard) request formatted as:
            #{
            #  "ietf-l2vpn-svc:l2vpn-svc": {
            #    "vpn-services": {
            #      "vpn-service": [
            errors.extend(self._process_l2vpn(request_data))
        elif 'ietf-l2vpn-svc:vpn-service' in request_data:
            # processing OSM-style payload request formatted as:
            #{
            #  "ietf-l2vpn-svc:vpn-service": [
            vpn_services = request_data['ietf-l2vpn-svc:vpn-service']

            # Add mandatory fields OSM RO driver skips
            for vpn_service in vpn_services:
            for vpn_service in vpn_services:
            try:
                if 'ce-vlan-preservation' not in vpn_service:
                # pylint: disable=no-member
                    vpn_service['ce-vlan-preservation'] = True
                slice_request = Slice()
                if 'ce-vlan-cos-preservation' not in vpn_service:
                slice_request.slice_id.context_id.context_uuid.uuid = DEFAULT_CONTEXT_NAME
                    vpn_service['ce-vlan-cos-preservation'] = True
                slice_request.slice_id.slice_uuid.uuid = vpn_service['vpn-id']
                if 'frame-delivery' not in vpn_service:
                slice_request.slice_status.slice_status = SliceStatusEnum.SLICESTATUS_PLANNED
                    vpn_service['frame-delivery'] = dict()

                if 'multicast-gp-port-mapping' not in vpn_service['frame-delivery']:
                slice_client = SliceClient()
                    vpn_service['frame-delivery']['multicast-gp-port-mapping'] = \
                slice_client.CreateSlice(slice_request)
                        'ietf-l2vpn-svc:static-mapping'


                response = jsonify({})
            request_data = {
                response.status_code = HTTP_CREATED
                'ietf-l2vpn-svc:l2vpn-svc': {
            except Exception as e: # pylint: disable=broad-except
                    'vpn-services': {
                LOGGER.exception('Something went wrong Creating Service {:s}'.format(str(request)))
                        'vpn-service': vpn_services
                response = jsonify({'error': str(e)})
                    }
                response.status_code = HTTP_SERVERERROR
                }
            }
            errors.extend(self._process_l2vpn(request_data))
        else:
            errors.append('Unexpected request: {:s}'.format(str(request_data)))

        if len(errors) > 0:
            LOGGER.error('Errors: {:s}'.format(str(errors)))

        response = jsonify(errors)
        response.status_code = HTTP_CREATED if len(errors) == 0 else HTTP_SERVERERROR
        return response
        return response

    def _process_l2vpn(self, request_data : Dict) -> List[Dict]:
        yang_validator = YangValidator('ietf-l2vpn-svc')
        request_data = yang_validator.parse_to_dict(request_data)
        yang_validator.destroy()

        errors = list()

        vpn_services = (
            request_data.get('l2vpn-svc', dict())
            .get('vpn-services', dict())
            .get('vpn-service', list())
        )
        for vpn_service in vpn_services:
            process_vpn_service(vpn_service, errors)

        sites = (
            request_data.get('l2vpn-svc', dict())
            .get('sites', dict())
            .get('site', list())
        )
        for site in sites:
            process_site(site, errors)

        return errors
+135 −122

File changed.

Preview size limit exceeded, changes collapsed.

+36 −0
Original line number Original line Diff line number Diff line
@@ -12,5 +12,25 @@
# See the License for the specific language governing permissions and
# See the License for the specific language governing permissions and
# limitations under the License.
# limitations under the License.


# String pattern for UUIDs such as '3fd942ee-2dc3-41d1-aeec-65aa85d117b2'
import libyang, os
REGEX_UUID = r'[a-fA-F0-9]{8}\-[a-fA-F0-9]{4}\-[a-fA-F0-9]{4}\-[a-fA-F0-9]{4}\-[a-fA-F0-9]{12}'
from typing import Dict, Optional

YANG_DIR = os.path.join(os.path.dirname(__file__), 'yang')

class YangValidator:
    def __init__(self, module_name : str) -> None:
        self._yang_context = libyang.Context(YANG_DIR)
        self._yang_module  = self._yang_context.load_module(module_name)
        self._yang_module.feature_enable_all()

    def parse_to_dict(self, message : Dict) -> Dict:
        dnode : Optional[libyang.DNode] = self._yang_module.parse_data_dict(
            message, validate_present=True, validate=True, strict=True
        )
        if dnode is None: raise Exception('Unable to parse Message({:s})'.format(str(message)))
        message = dnode.print_dict()
        dnode.free()
        return message

    def destroy(self) -> None:
        self._yang_context.destroy()
Loading