Commit c51a694d authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Service component - L3VPN - IETF ACTN Service Handler:

- Implemented rule composition
- Added missing python requirement
parent 4c7e8a31
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -15,6 +15,7 @@

anytree==2.8.0
geopy==2.3.0
netaddr==0.9.0
networkx==2.6.3
pydot==1.4.2
redis==4.1.2
+0 −128
Original line number Diff line number Diff line
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Dict, List, Optional, Tuple
from common.proto.context_pb2 import Device, EndPoint
from common.tools.object_factory.ConfigRule import json_config_rule_delete, json_config_rule_set

from service.service.service_handler_api.AnyTreeTools import TreeNode

def _interface(
    if_name : str, ipv4_address : str, ipv4_prefix_length : int, enabled : bool,
    vlan_id : Optional[int] = None, sif_index : Optional[int] = 1
) -> Tuple[str, Dict]:
    str_path = '/interface[{:s}]'.format(if_name)
    data = {
        'name': if_name, 'enabled': enabled, 'sub_if_index': sif_index,
        'sub_if_enabled': enabled, 'sub_if_ipv4_enabled': enabled,
        'sub_if_ipv4_address': ipv4_address, 'sub_if_ipv4_prefix_length': ipv4_prefix_length
    }
    if vlan_id is not None: data['sub_if_vlan'] = vlan_id
    return str_path, data

def _network_instance(ni_name, ni_type) -> Tuple[str, Dict]:
    str_path = '/network_instance[{:s}]'.format(ni_name)
    data = {'name': ni_name, 'type': ni_type}
    return str_path, data

def _network_instance_static_route(ni_name, prefix, next_hop, next_hop_index=0) -> Tuple[str, Dict]:
    str_path = '/network_instance[{:s}]/static_route[{:s}]'.format(ni_name, prefix)
    data = {'name': ni_name, 'prefix': prefix, 'next_hop': next_hop, 'next_hop_index': next_hop_index}
    return str_path, data

def _network_instance_interface(ni_name, if_name, sif_index) -> Tuple[str, Dict]:
    str_path = '/network_instance[{:s}]/interface[{:s}.{:d}]'.format(ni_name, if_name, sif_index)
    data = {'name': ni_name, 'if_name': if_name, 'sif_index': sif_index}
    return str_path, data

class EndpointComposer:
    def __init__(self, endpoint_uuid : str) -> None:
        self.uuid = endpoint_uuid
        self.objekt : Optional[EndPoint] = None
        self.sub_interface_index = 0
        self.ipv4_address = None
        self.ipv4_prefix_length = None
        self.sub_interface_vlan_id = 0

    def configure(self, endpoint_obj : EndPoint, settings : Optional[TreeNode]) -> None:
        self.objekt = endpoint_obj
        if settings is None: return
        json_settings : Dict = settings.value
        self.ipv4_address = json_settings['ipv4_address']
        self.ipv4_prefix_length = json_settings['ipv4_prefix_length']
        self.sub_interface_index = json_settings['sub_interface_index']
        self.sub_interface_vlan_id = json_settings['sub_interface_vlan_id']

    def get_config_rules(self, network_instance_name : str, delete : bool = False) -> List[Dict]:
        json_config_rule = json_config_rule_delete if delete else json_config_rule_set
        return [
            json_config_rule(*_interface(
                self.objekt.name, self.ipv4_address, self.ipv4_prefix_length, True,
                sif_index=self.sub_interface_index, vlan_id=self.sub_interface_vlan_id,
            )),
            json_config_rule(*_network_instance_interface(
                network_instance_name, self.objekt.name, self.sub_interface_index
            )),
        ]

class DeviceComposer:
    def __init__(self, device_uuid : str) -> None:
        self.uuid = device_uuid
        self.objekt : Optional[Device] = None
        self.endpoints : Dict[str, EndpointComposer] = dict()
        self.static_routes : Dict[str, str] = dict()
    
    def get_endpoint(self, endpoint_uuid : str) -> EndpointComposer:
        if endpoint_uuid not in self.endpoints:
            self.endpoints[endpoint_uuid] = EndpointComposer(endpoint_uuid)
        return self.endpoints[endpoint_uuid]

    def configure(self, device_obj : Device, settings : Optional[TreeNode]) -> None:
        self.objekt = device_obj
        if settings is None: return
        json_settings : Dict = settings.value
        static_routes = json_settings.get('static_routes', [])
        for static_route in static_routes:
            prefix   = static_route['prefix']
            next_hop = static_route['next_hop']
            self.static_routes[prefix] = next_hop

    def get_config_rules(self, network_instance_name : str, delete : bool = False) -> List[Dict]:
        json_config_rule = json_config_rule_delete if delete else json_config_rule_set
        config_rules = [
            json_config_rule(*_network_instance(network_instance_name, 'L3VRF'))
        ]
        for endpoint in self.endpoints.values():
            config_rules.extend(endpoint.get_config_rules(network_instance_name, delete=delete))
        for prefix, next_hop in self.static_routes.items():
            config_rules.append(
                json_config_rule(*_network_instance_static_route(network_instance_name, prefix, next_hop))
            )
        if delete: config_rules = list(reversed(config_rules))
        return config_rules

class ConfigRuleComposer:
    def __init__(self) -> None:
        self.devices : Dict[str, DeviceComposer] = dict()

    def get_device(self, device_uuid : str) -> DeviceComposer:
        if device_uuid not in self.devices:
            self.devices[device_uuid] = DeviceComposer(device_uuid)
        return self.devices[device_uuid]

    def get_config_rules(self, network_instance_name : str, delete : bool = False) -> Dict[str, List[Dict]]:
        return {
            device_uuid : device.get_config_rules(network_instance_name, delete=delete)
            for device_uuid, device in self.devices.items()
        }
+52 −0
Original line number Diff line number Diff line
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# These hardcoded values will be updated with proper logic in second phase of the PoC

VPN_VLAN_TAGS_TO_SERVICE_NAME = {
    (21, 101): ('osu_tunnel_1', 'etht_service_1'),
    (31, 201): ('osu_tunnel_2', 'etht_service_2'),
}

OSU_TUNNEL_SETTINGS = {
    'osu_tunnel_1': {
        'odu_type': 'osuflex',
        'osuflex_number': 40,
        'bidirectional': True,
        'delay': 20,
        'ttp_channel_names': {
            ('10.0.10.1', '200'): 'och:1-odu2:1-oduflex:1-osuflex:2',
            ('10.0.30.1', '200'): 'och:1-odu2:1-oduflex:3-osuflex:1',
        }
    },
    'osu_tunnel_2': {
        'odu_type': 'osuflex',
        'osuflex_number': 40,
        'bidirectional': True,
        'delay': 20,
        'ttp_channel_names': {
            ('10.0.10.1', '200'): 'och:1-odu2:1-oduflex:1-osuflex:2',
            ('10.0.30.1', '200'): 'och:1-odu2:1-oduflex:3-osuflex:1',
        }
    },
}

ETHT_SERVICE_SETTINGS = {
    'etht_service_1': {
        'service_type': 'op-mp2mp-svc',
    },
    'etht_service_2': {
        'service_type': 'op-mp2mp-svc',
    },
}
+212 −55
Original line number Diff line number Diff line
@@ -12,17 +12,19 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import json, logging
import json, logging, netaddr
from typing import Any, Dict, List, Optional, Tuple, Union
from common.method_wrappers.Decorator import MetricsPool, metered_subclass_method
from common.proto.context_pb2 import ConfigRule, DeviceId, Service
from common.proto.context_pb2 import ConfigRule, Device, DeviceId, EndPoint, Service
from common.tools.grpc.Tools import grpc_message_to_json_string
from common.tools.object_factory.ConfigRule import json_config_rule_delete, json_config_rule_set
from common.tools.object_factory.Device import json_device_id
from common.type_checkers.Checkers import chk_type
from service.service.service_handler_api.Tools import get_device_endpoint_uuids, get_endpoint_matching
from service.service.service_handler_api._ServiceHandler import _ServiceHandler
from service.service.service_handler_api.SettingsHandler import SettingsHandler
from service.service.task_scheduler.TaskExecutor import TaskExecutor
from .ConfigRuleComposer import ConfigRuleComposer
from .Constants import ETHT_SERVICE_SETTINGS, OSU_TUNNEL_SETTINGS, VPN_VLAN_TAGS_TO_SERVICE_NAME

LOGGER = logging.getLogger(__name__)

@@ -35,79 +37,234 @@ class L3NMIetfActnServiceHandler(_ServiceHandler):
        self.__service = service
        self.__task_executor = task_executor
        self.__settings_handler = SettingsHandler(service.service_config, **settings)
        self.__composer = ConfigRuleComposer()
        self.__endpoint_map : Dict[Tuple[str, str], str] = dict()

    def _compose_config_rules(self, endpoints : List[Tuple[str, str, Optional[str]]]) -> None:
        for endpoint in endpoints:
    def _get_endpoint_details(
        self, endpoint : Tuple[str, str, Optional[str]]
    ) -> Tuple[Device, EndPoint, Dict]:
        device_uuid, endpoint_uuid = get_device_endpoint_uuids(endpoint)

        device_obj = self.__task_executor.get_device(DeviceId(**json_device_id(device_uuid)))
            device_settings = self.__settings_handler.get_device_settings(device_obj)
            _device = self.__composer.get_device(device_obj.name)
            _device.configure(device_obj, device_settings)

        endpoint_obj = get_endpoint_matching(device_obj, endpoint_uuid)
        endpoint_settings = self.__settings_handler.get_endpoint_settings(device_obj, endpoint_obj)
            _endpoint = _device.get_endpoint(endpoint_obj.name)
            _endpoint.configure(endpoint_obj, endpoint_settings)
        device_name = device_obj.name
        endpoint_name = endpoint_obj.name
        if endpoint_settings is None:
            MSG = 'Settings not found for Endpoint(device=[uuid={:s}, name={:s}], endpoint=[uuid={:s}, name={:s}])'
            raise Exception(MSG.format(device_uuid, device_name, endpoint_uuid, endpoint_name))
        endpoint_settings_dict : Dict = endpoint_settings.value
        return device_obj, endpoint_obj, endpoint_settings_dict

            self.__endpoint_map[(device_uuid, endpoint_uuid)] = device_obj.name
    def _get_service_names(
        self,
        src_endpoint_details : Tuple[Device, EndPoint, Dict],
        dst_endpoint_details : Tuple[Device, EndPoint, Dict]
    ) -> Tuple[str, str]:
        _, _, src_endpoint_settings_dict = src_endpoint_details
        src_vlan_tag = src_endpoint_settings_dict['vlan_tag']

    def _do_configurations(
        self, config_rules_per_device : Dict[str, List[Dict]], endpoints : List[Tuple[str, str, Optional[str]]],
        delete : bool = False
    ) -> List[Union[bool, Exception]]:
        # Configuration is done atomically on each device, all OK / all KO per device
        results_per_device = dict()
        for device_name,json_config_rules in config_rules_per_device.items():
            try:
                device_obj = self.__composer.get_device(device_name).objekt
                if len(json_config_rules) == 0: continue
                del device_obj.device_config.config_rules[:]
                for json_config_rule in json_config_rules:
                    device_obj.device_config.config_rules.append(ConfigRule(**json_config_rule))
                self.__task_executor.configure_device(device_obj)
                results_per_device[device_name] = True
            except Exception as e: # pylint: disable=broad-exception-caught
                verb = 'deconfigure' if delete else 'configure'
                MSG = 'Unable to {:s} Device({:s}) : ConfigRules({:s})'
                LOGGER.exception(MSG.format(verb, str(device_name), str(json_config_rules)))
                results_per_device[device_name] = e
        _, _, dst_endpoint_settings_dict = dst_endpoint_details
        dst_vlan_tag = dst_endpoint_settings_dict['vlan_tag']

        results = []
        for endpoint in endpoints:
            device_uuid, endpoint_uuid = get_device_endpoint_uuids(endpoint)
            device_name = self.__endpoint_map[(device_uuid, endpoint_uuid)]
            results.append(results_per_device[device_name])
        return results
        service_names = VPN_VLAN_TAGS_TO_SERVICE_NAME.get((src_vlan_tag, dst_vlan_tag))
        if service_names is None:
            MSG = 'Unable to find service names from VLAN tags(src={:s}, dst={:s})'
            raise Exception(MSG.format(str(src_vlan_tag), str(dst_vlan_tag)))
        return service_names

    def _compose_osu_tunnel(
        self, osu_tunnel_name : str,
        src_endpoint_details : Tuple[Device, EndPoint, Dict],
        dst_endpoint_details : Tuple[Device, EndPoint, Dict],
        is_delete : bool = False
    ) -> ConfigRule:
        osu_tunnel_resource_key = '/osu_tunnels/osu_tunnel[{:s}]'.format(osu_tunnel_name)
        osu_tunnel_resource_value = {'name' : osu_tunnel_name}
        if is_delete:
            osu_tunnel_config_rule = json_config_rule_delete(osu_tunnel_resource_key, osu_tunnel_resource_value)
        else:
            src_device_obj, src_endpoint_obj, _ = src_endpoint_details
            dst_device_obj, dst_endpoint_obj, _ = dst_endpoint_details

            osu_tunnel_settings = OSU_TUNNEL_SETTINGS[osu_tunnel_name]
            ttp_channel_names = osu_tunnel_settings['ttp_channel_names']
            src_ttp_channel_name = ttp_channel_names[(src_device_obj.name, src_endpoint_obj.name)]
            dst_ttp_channel_name = ttp_channel_names[(dst_device_obj.name, dst_endpoint_obj.name)]

            osu_tunnel_resource_value.update({
                'odu_type'            : osu_tunnel_settings['odu_type'],
                'osuflex_number'      : osu_tunnel_settings['osuflex_number'],
                'bidirectional'       : osu_tunnel_settings['bidirectional'],
                'delay'               : osu_tunnel_settings['delay'],
                'src_node_id'         : src_device_obj.name,
                'src_tp_id'           : src_endpoint_obj.name,
                'src_ttp_channel_name': src_ttp_channel_name,
                'dst_node_id'         : dst_device_obj.name,
                'dst_tp_id'           : dst_endpoint_obj.name,
                'dst_ttp_channel_name': dst_ttp_channel_name,
            })
            osu_tunnel_config_rule = json_config_rule_set(osu_tunnel_resource_key, osu_tunnel_resource_value)
        LOGGER.debug('osu_tunnel_config_rule = {:s}'.format(str(osu_tunnel_config_rule)))
        return ConfigRule(**osu_tunnel_config_rule)

    def _compose_static_routing(
        self, src_vlan_tag : int, dst_vlan_tag : int
    ) -> Tuple[List[Dict], List[Dict]]:
        static_routing = self.__settings_handler.get('/static_routing')
        if static_routing is None: raise Exception('static_routing not found')
        static_routing_dict : Dict = static_routing.value
        src_static_routes = list()
        dst_static_routes = list()
        for _, static_route in static_routing_dict.items():
            vlan_id     = static_route['vlan-id']
            ipn_cidr    = netaddr.IPNetwork(static_route['ip-network'])
            ipn_network = str(ipn_cidr.network)
            ipn_preflen = int(ipn_cidr.prefixlen)
            next_hop = static_route['next-hop']
            if vlan_id == src_vlan_tag:
                src_static_routes.append([ipn_network, ipn_preflen, next_hop])
            elif vlan_id == dst_vlan_tag:
                dst_static_routes.append([ipn_network, ipn_preflen, next_hop])
        return src_static_routes, dst_static_routes

    def _compose_etht_service(
        self, etht_service_name : str, osu_tunnel_name : str,
        src_endpoint_details : Tuple[Device, EndPoint, Dict],
        dst_endpoint_details : Tuple[Device, EndPoint, Dict],
        is_delete : bool = False
    ) -> ConfigRule:
        etht_service_resource_key = '/etht_services/etht_service[{:s}]'.format(etht_service_name)
        etht_service_resource_value = {'name' : etht_service_name}
        if is_delete:
            etht_service_config_rule = json_config_rule_delete(etht_service_resource_key, etht_service_resource_value)
        else:
            src_device_obj, src_endpoint_obj, src_endpoint_details = src_endpoint_details
            src_vlan_tag = src_endpoint_details['vlan_tag']
            dst_device_obj, dst_endpoint_obj, dst_endpoint_details = dst_endpoint_details
            dst_vlan_tag = dst_endpoint_details['vlan_tag']
            src_static_routes, dst_static_routes = self._compose_static_routing(src_vlan_tag, dst_vlan_tag)
            etht_service_resource_value.update({
                'osu_tunnel_name'  : osu_tunnel_name,
                'service_type'     : ETHT_SERVICE_SETTINGS[etht_service_name]['service_type'],
                'src_node_id'      : src_device_obj.name,
                'src_tp_id'        : src_endpoint_obj.name,
                'src_vlan_tag'     : src_vlan_tag,
                'src_static_routes': src_static_routes,
                'dst_node_id'      : dst_device_obj.name,
                'dst_tp_id'        : dst_endpoint_obj.name,
                'dst_vlan_tag'     : dst_vlan_tag,
                'dst_static_routes': dst_static_routes,
            })
            etht_service_config_rule = json_config_rule_set(etht_service_resource_key, etht_service_resource_value)
        LOGGER.debug('etht_service_config_rule = {:s}'.format(str(etht_service_config_rule)))
        return ConfigRule(**etht_service_config_rule)

    @metered_subclass_method(METRICS_POOL)
    def SetEndpoint(
        self, endpoints : List[Tuple[str, str, Optional[str]]], connection_uuid : Optional[str] = None
    ) -> List[Union[bool, Exception]]:
        LOGGER.debug('endpoints = {:s}'.format(str(endpoints)))
        chk_type('endpoints', endpoints, list)
        if len(endpoints) == 0: return []
        if len(endpoints) < 2:
            LOGGER.warning('nothing done: not enough endpoints')
            return []
        service_uuid = self.__service.service_id.service_uuid.uuid
        #settings = self.__settings_handler.get('/settings')
        self._compose_config_rules(endpoints)
        network_instance_name = service_uuid.split('-')[0]
        config_rules_per_device = self.__composer.get_config_rules(network_instance_name, delete=False)
        results = self._do_configurations(config_rules_per_device, endpoints)
        LOGGER.debug('service_uuid = {:s}'.format(str(service_uuid)))
        LOGGER.debug('self.__settings_handler = {:s}'.format(str(self.__settings_handler.dump_config_rules())))

        results = []
        try:
            src_endpoint_details = self._get_endpoint_details(endpoints[0])
            src_device_obj, _, _ = src_endpoint_details
            src_controller = self.__task_executor.get_device_controller(src_device_obj)
            if src_controller is None: src_controller = src_device_obj

            dst_endpoint_details = self._get_endpoint_details(endpoints[-1])
            dst_device_obj, _, _ = dst_endpoint_details
            dst_controller = self.__task_executor.get_device_controller(dst_device_obj)
            if dst_controller is None: dst_controller = dst_device_obj

            if src_controller.device_id.device_uuid.uuid != dst_controller.device_id.device_uuid.uuid:
                raise Exception('Different Src-Dst devices not supported by now')
            controller = src_controller

            osu_tunnel_name, etht_service_name = self._get_service_names(
                src_endpoint_details, dst_endpoint_details
            )

            osu_tunnel_config_rule = self._compose_osu_tunnel(
                osu_tunnel_name, src_endpoint_details, dst_endpoint_details,
                is_delete=False
            )

            etht_service_config_rule = self._compose_etht_service(
                etht_service_name, osu_tunnel_name, src_endpoint_details,
                dst_endpoint_details, is_delete=False
            )

            del controller.device_config.config_rules[:]
            controller.device_config.config_rules.append(osu_tunnel_config_rule)
            controller.device_config.config_rules.append(etht_service_config_rule)
            self.__task_executor.configure_device(controller)
            results.append(True)
        except Exception as e: # pylint: disable=broad-except
            LOGGER.exception('Unable to SetEndpoint for Service({:s})'.format(str(service_uuid)))
            results.append(e)

        LOGGER.debug('results = {:s}'.format(str(results)))
        return results

    @metered_subclass_method(METRICS_POOL)
    def DeleteEndpoint(
        self, endpoints : List[Tuple[str, str, Optional[str]]], connection_uuid : Optional[str] = None
    ) -> List[Union[bool, Exception]]:
        LOGGER.debug('endpoints = {:s}'.format(str(endpoints)))
        chk_type('endpoints', endpoints, list)
        if len(endpoints) == 0: return []
        if len(endpoints) < 2:
            LOGGER.warning('nothing done: not enough endpoints')
            return []
        service_uuid = self.__service.service_id.service_uuid.uuid
        #settings = self.__settings_handler.get('/settings')
        self._compose_config_rules(endpoints)
        network_instance_name = service_uuid.split('-')[0]
        config_rules_per_device = self.__composer.get_config_rules(network_instance_name, delete=True)
        results = self._do_configurations(config_rules_per_device, endpoints, delete=True)
        LOGGER.debug('service_uuid = {:s}'.format(str(service_uuid)))
        LOGGER.debug('self.__settings_handler = {:s}'.format(str(self.__settings_handler.dump_config_rules())))

        results = []
        try:
            src_endpoint_details = self._get_endpoint_details(endpoints[0])
            src_device_obj, _, _ = src_endpoint_details
            src_controller = self.__task_executor.get_device_controller(src_device_obj)
            if src_controller is None: src_controller = src_device_obj

            dst_endpoint_details = self._get_endpoint_details(endpoints[-1])
            dst_device_obj, _, _ = dst_endpoint_details
            dst_controller = self.__task_executor.get_device_controller(dst_device_obj)
            if dst_controller is None: dst_controller = dst_device_obj

            if src_controller.device_id.device_uuid.uuid != dst_controller.device_id.device_uuid.uuid:
                raise Exception('Different Src-Dst devices not supported by now')
            controller = src_controller

            osu_tunnel_name, etht_service_name = self._get_service_names(
                src_endpoint_details, dst_endpoint_details
            )

            osu_tunnel_config_rule = self._compose_osu_tunnel(
                osu_tunnel_name, src_endpoint_details, dst_endpoint_details,
                is_delete=True
            )

            etht_service_config_rule = self._compose_etht_service(
                etht_service_name, osu_tunnel_name, src_endpoint_details,
                dst_endpoint_details, is_delete=True
            )

            del controller.device_config.config_rules[:]
            controller.device_config.config_rules.append(osu_tunnel_config_rule)
            controller.device_config.config_rules.append(etht_service_config_rule)
            self.__task_executor.configure_device(controller)
            results.append(True)
        except Exception as e: # pylint: disable=broad-except
            LOGGER.exception('Unable to DeleteEndpoint for Service({:s})'.format(str(service_uuid)))
            results.append(e)

        LOGGER.debug('results = {:s}'.format(str(results)))
        return results

    @metered_subclass_method(METRICS_POOL)