Commit c9d9cfd9 authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Service component - L3NM Ryu Service Handler:

- Fixed entire service handler
- Minor fixes in Service Handler API
parent 2defafe2
Loading
Loading
Loading
Loading
+1 −2
Original line number Diff line number Diff line
@@ -17,7 +17,6 @@ from typing import Any, List, Optional, Tuple, Union
from common.method_wrappers.ServiceExceptions import NotFoundException
from common.proto.context_pb2 import Device, EndPoint
from common.type_checkers.Checkers import chk_length, chk_type
from common.tools.grpc.Tools import grpc_message_to_json

ACTION_MSG_SET_ENDPOINT      = 'Set EndPoint(device_uuid={:s}, endpoint_uuid={:s}, topology_uuid={:s})'
ACTION_MSG_DELETE_ENDPOINT   = 'Delete EndPoint(device_uuid={:s}, endpoint_uuid={:s}, topology_uuid={:s})'
@@ -57,7 +56,7 @@ def get_endpoint_matching(device : Device, endpoint_uuid_or_name : str) -> EndPo
def get_device_endpoint_uuids(endpoint : Tuple[str, str, Optional[str]]) -> Tuple[str, str]:
    chk_type('endpoint', endpoint, (tuple, list))
    chk_length('endpoint', endpoint, min_length=2, max_length=3)
    device_uuid, endpoint_uuid = endpoint[0:2] # ignore topology_uuid by now
    device_uuid, endpoint_uuid = endpoint[0:2] # ignore topology_uuid for now
    return device_uuid, endpoint_uuid

def extract_endpoint_index(endpoint_name : str, default_index=0) -> Tuple[str, int]:
+121 −101
Original line number Diff line number Diff line
@@ -16,88 +16,114 @@ import json, logging, re
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple, Union
from common.method_wrappers.Decorator import MetricsPool, metered_subclass_method
from common.proto.context_pb2 import ConfigRule, Device, DeviceId, EndPoint, Service
from common.proto.context_pb2 import ConfigRule, Device, DeviceId, EndPoint, EndPointId, Service
from common.tools.grpc.Tools import grpc_message_to_json_string
from common.tools.object_factory.ConfigRule import json_config_rule_delete, json_config_rule_set
from common.tools.object_factory.Device import json_device_id
from common.type_checkers.Checkers import chk_type
from service.service.service_handler_api._ServiceHandler import _ServiceHandler
from service.service.service_handler_api.AnyTreeTools import TreeNode
from service.service.service_handler_api.SettingsHandler import SettingsHandler
from service.service.service_handler_api.Tools import get_device_endpoint_uuids, get_endpoint_matching
from service.service.task_scheduler.TaskExecutor import TaskExecutor


LOGGER = logging.getLogger(__name__)

METRICS_POOL = MetricsPool('Service', 'Handler', labels={'handler': 'l3nm_ryu'})

RE_DEV_EP_SETTINGS = re.compile(r'/device\[(.*?)\]/endpoint\[(.*?)\]/settings')

@dataclass
class EndpointData:
    device_uuid   : str
    device_name   : str
    endpoint_uuid : str
    endpoint_name : str
    ipv4_address  : Optional[str]

    @classmethod
    def create(
        cls, device_obj : Device, endpoint_obj : EndPoint, endpoint_settings : Optional[TreeNode]
    ) -> 'EndpointData':
def get_ip_from_endpoint_settings(
    device_obj : Device, endpoint_obj : EndPoint, settings_handler : SettingsHandler
) -> Optional[str]:
    device_uuid    = device_obj.device_id.device_uuid.uuid
    ep_device_uuid = endpoint_obj.endpoint_id.device_id.device_uuid.uuid
    if device_uuid != ep_device_uuid:
        MSG = 'Malformed endpoint: device_uuid({:s}) != endpoint.device_uuid({:s})'
        raise Exception(MSG.format(str(device_uuid), str(ep_device_uuid)))

        ipv4_address = None
        if endpoint_settings is not None:
    endpoint_settings = settings_handler.get_endpoint_settings(device_obj, endpoint_obj)
    if endpoint_settings is None: return None

    json_settings : Dict = endpoint_settings.value
            if 'address_ip' in json_settings:
                ipv4_address = json_settings['address_ip']
            elif 'ip_address' in json_settings:
                ipv4_address = json_settings['ip_address']
            else:
    if 'address_ip' in json_settings: return json_settings['address_ip']
    if 'ip_address' in json_settings: return json_settings['ip_address']

    MSG = 'IP Address not found. Tried: address_ip and ip_address. endpoint_obj={:s} settings={:s}'
    LOGGER.warning(MSG.format(str(endpoint_obj), str(json_settings)))

        return cls(
            device_uuid   = device_uuid,
            device_name   = device_obj.name,
            endpoint_uuid = endpoint_obj.endpoint_id.endpoint_uuid.uuid,
            endpoint_name = endpoint_obj.name,
            ipv4_address  = ipv4_address,
        )
    return None


def get_flow_rule_specs(
    path_endpoints_data : List[EndpointData], is_reverse : bool
) -> Tuple[str, str, str, str, str]:
    src_index = -1 if is_reverse else  0
    dst_index =  0 if is_reverse else -1
@dataclass
class ServiceData:
    src_device_uuid   : str
    src_device_name   : str
    src_endpoint_uuid : str
    src_endpoint_name : str
    src_ipv4_address  : str

    dst_device_uuid   : str
    dst_device_name   : str
    dst_endpoint_uuid : str
    dst_endpoint_name : str
    dst_ipv4_address  : str

    src_endpoint_data = path_endpoints_data[src_index]
    src_device_name = src_endpoint_data.device_name
    src_endpoint_name = src_endpoint_data.endpoint_name
    @classmethod
    def create(
        cls, src_device_obj : Device, src_endpoint_obj : EndPoint,
        dst_device_obj : Device, dst_endpoint_obj : EndPoint,
        settings_handler : SettingsHandler
    ) -> 'ServiceData':
        src_device_uuid    = src_device_obj.device_id.device_uuid.uuid
        src_ep_device_uuid = src_endpoint_obj.endpoint_id.device_id.device_uuid.uuid
        if src_device_uuid != src_ep_device_uuid:
            MSG = 'Malformed endpoint: src_device_uuid({:s}) != src_endpoint.device_uuid({:s})'
            raise Exception(MSG.format(str(src_device_uuid), str(src_ep_device_uuid)))

        dst_device_uuid    = dst_device_obj.device_id.device_uuid.uuid
        dst_ep_device_uuid = dst_endpoint_obj.endpoint_id.device_id.device_uuid.uuid
        if dst_device_uuid != dst_ep_device_uuid:
            MSG = 'Malformed endpoint: dst_device_uuid({:s}) != dst_endpoint.device_uuid({:s})'
            raise Exception(MSG.format(str(dst_device_uuid), str(dst_ep_device_uuid)))

        src_ipv4_address = get_ip_from_endpoint_settings(src_device_obj, src_endpoint_obj, settings_handler)
        dst_ipv4_address = get_ip_from_endpoint_settings(dst_device_obj, dst_endpoint_obj, settings_handler)

    dst_endpoint_data = path_endpoints_data[dst_index]
    dst_device_name = dst_endpoint_data.device_name
    dst_endpoint_name = dst_endpoint_data.endpoint_name
        return cls(
            src_device_uuid   = src_device_obj.device_id.device_uuid.uuid,
            src_device_name   = src_device_obj.name,
            src_endpoint_uuid = src_endpoint_obj.endpoint_id.endpoint_uuid.uuid,
            src_endpoint_name = src_endpoint_obj.name,
            src_ipv4_address  = src_ipv4_address,
            dst_device_uuid   = dst_device_obj.device_id.device_uuid.uuid,
            dst_device_name   = dst_device_obj.name,
            dst_endpoint_uuid = dst_endpoint_obj.endpoint_id.endpoint_uuid.uuid,
            dst_endpoint_name = dst_endpoint_obj.name,
            dst_ipv4_address  = dst_ipv4_address,
        )

    flow_rule_name = '{:s}-{:s}'.format(src_device_name, dst_device_name)
    src_ip_addr = src_endpoint_data.ipv4_address
    dst_ip_addr = dst_endpoint_data.ipv4_address
    def get_flow_rule_name(self, reverse : bool = False) -> str:
        device_names = [self.src_device_name, self.dst_device_name]
        if reverse: device_names = list(reversed(device_names))
        return '-'.join(device_names)

    return flow_rule_name, src_endpoint_name, dst_endpoint_name, src_ip_addr, dst_ip_addr
    def get_ip_addresses(self, reverse : bool = False) -> Tuple[str, str]:
        ipv4_addresses = [self.src_ipv4_address, self.dst_ipv4_address]
        if reverse: ipv4_addresses = list(reversed(ipv4_addresses))
        return '-'.join(ipv4_addresses)


def compose_flow_rule(
    device_name : str, dpid : str, path_endpoints_data : List[EndpointData],
    is_reverse : bool, is_delete : bool
    device_name : str, dpid : str, in_port : str, out_port : str,
    service_data : ServiceData, reverse : bool, is_delete : bool
) -> ConfigRule:
    flow_specs = get_flow_rule_specs(path_endpoints_data, is_reverse)
    flow_rule_name, in_port, out_port, src_ip_addr, dst_ip_addr = flow_specs
    ports = [in_port, out_port]
    if reverse: ports = list(reversed(ports))
    in_port, out_port = ports

    flow_rule_name = service_data.get_flow_rule_name(reverse=reverse)
    ipv4_addresses = service_data.get_ip_addresses(reverse=reverse)

    RSRC_KEY_TMPL = '/device[{:s}]/flow[{:s}]'
    resource_key = RSRC_KEY_TMPL.format(device_name, flow_rule_name)
@@ -105,8 +131,8 @@ def compose_flow_rule(
        'dpid'        : dpid,
        'in-port'     : in_port,
        'out-port'    : out_port,
        'src-ip-addr' : src_ip_addr,
        'dst-ip-addr' : dst_ip_addr,
        'src-ip-addr' : ipv4_addresses[ 0],
        'dst-ip-addr' : ipv4_addresses[-1],
    }
    json_config_rule_method = json_config_rule_delete if is_delete else json_config_rule_set
    flow_rule = json_config_rule_method(resource_key, resource_value)
@@ -121,44 +147,38 @@ class L3NMRyuServiceHandler(_ServiceHandler):
        self.__task_executor = task_executor
        self.__settings_handler = SettingsHandler(service.service_config, **settings)

    def _compose_path_endpoint_data(
        self, endpoints : List[Tuple[str, str, Optional[str]]]
    ) -> List[EndpointData]:
        path_endpoints_data : List[EndpointData] = list()
        for endpoint in endpoints:
            device_uuid, endpoint_uuid = get_device_endpoint_uuids(endpoint)
    def _get_device_endpoint_obj_from_id(
        self, endpoint_id : EndPointId
    ) -> Tuple[Device, EndPoint]:
        device_uuid   = endpoint_id.device_id.device_uuid.uuid
        endpoint_uuid = endpoint_id.endpoint_uuid.uuid
        device_obj    = self.__task_executor.get_device(DeviceId(**json_device_id(device_uuid)))
        endpoint_obj  = get_endpoint_matching(device_obj, endpoint_uuid)
            endpoint_settings = self.__settings_handler.get_endpoint_settings(device_obj, endpoint_obj)
            endpoint_ids = EndpointData.create(device_obj, endpoint_obj, endpoint_settings)
            path_endpoints_data.append(endpoint_ids)

        LOGGER.debug('path_endpoints_data = {:s}'.format(str(path_endpoints_data)))
        return path_endpoints_data
        return device_obj, endpoint_obj

    def _get_endpoint_details(
        self, endpoint : Tuple[str, str, Optional[str]]
    def _get_device_endpoint_obj_from_key(
        self, endpoint_key : Tuple[str, str, Optional[str]]
    ) -> Tuple[Device, EndPoint]:
        device_uuid, endpoint_uuid = get_device_endpoint_uuids(endpoint)
        device_uuid, endpoint_uuid = get_device_endpoint_uuids(endpoint_key)
        device_obj   = self.__task_executor.get_device(DeviceId(**json_device_id(device_uuid)))
        endpoint_obj = get_endpoint_matching(device_obj, endpoint_uuid)
        return device_obj, endpoint_obj

    def _configure_hop_flow_rules(
        self, src_ep_id : Tuple[str, str, Optional[str]], dst_ep_id : Tuple[str, str, Optional[str]],
        path_endpoints_data : List[EndpointData], is_delete : bool = False
        self, src_ep_key : Tuple[str, str, Optional[str]], dst_ep_key : Tuple[str, str, Optional[str]],
        service_data : ServiceData, is_delete : bool = False
    ) -> None:
        src_dev, src_ep = self._get_endpoint_details(src_ep_id)
        dst_dev, dst_ep = self._get_endpoint_details(dst_ep_id)
        src_dev, src_ep = self._get_device_endpoint_obj_from_key(src_ep_key)
        dst_dev, dst_ep = self._get_device_endpoint_obj_from_key(dst_ep_key)
        if src_dev.device_id.device_uuid.uuid != dst_dev.device_id.device_uuid.uuid:
            MSG = 'Mismatching device for endpoints: {:s}-{:s}'
            raise Exception(MSG.format(str(src_ep_id), str(dst_ep_id)))
            raise Exception(MSG.format(str(src_ep_key), str(dst_ep_key)))

        src_ctrl = self.__task_executor.get_device_controller(src_dev)
        dst_ctrl = self.__task_executor.get_device_controller(dst_dev)
        if src_ctrl != dst_ctrl:
            MSG = 'Mismatching controller for endpoints: {:s}-{:s}'
            raise Exception(MSG.format(str(src_ep_id), str(dst_ep_id)))
            raise Exception(MSG.format(str(src_ep_key), str(dst_ep_key)))

        device_name = src_ep.name.split('-')[0]
        device_dpid = src_dev.name
@@ -170,14 +190,16 @@ class L3NMRyuServiceHandler(_ServiceHandler):
        del ctrl.device_endpoints[:]

        forward_flow_rule = compose_flow_rule(
            device_name, device_dpid, path_endpoints_data, is_reverse=False, is_delete=is_delete
            device_name, device_dpid, src_ep.name, dst_ep.name, service_data,
            reverse=False, is_delete=is_delete
        )
        MSG = 'Forward Config Rule: {:s}'
        LOGGER.debug(MSG.format(grpc_message_to_json_string(forward_flow_rule)))
        ctrl.device_config.config_rules.append(forward_flow_rule)

        reverse_flow_rule = compose_flow_rule(
            device_name, device_dpid, path_endpoints_data, is_reverse=True, is_delete=is_delete
            device_name, device_dpid, src_ep.name, dst_ep.name, service_data,
            reverse=True, is_delete=is_delete
        )
        MSG = 'Reverse Config Rule: {:s}'
        LOGGER.debug(MSG.format(grpc_message_to_json_string(reverse_flow_rule)))
@@ -196,19 +218,18 @@ class L3NMRyuServiceHandler(_ServiceHandler):
            LOGGER.warning('nothing done: not enough endpoints')
            return []

        path_endpoints_data = self._compose_path_endpoint_data(endpoints)
        if len(path_endpoints_data) < 2:
            MSG = 'Wrong number of end devices: {:s}'
            LOGGER.warning(MSG.format(str(path_endpoints_data)))
            return []
        service_endpoint_ids = list(self.__service.service_endpoint_ids)
        src_device_obj, src_endpoint_obj = self._get_device_endpoint_obj_from_id(service_endpoint_ids[ 0])
        dst_device_obj, dst_endpoint_obj = self._get_device_endpoint_obj_from_id(service_endpoint_ids[-1])
        service_data = ServiceData.create(
            src_device_obj, src_endpoint_obj, dst_device_obj, dst_endpoint_obj, self.__settings_handler
        )

        results = []
        try:
            it_endpoints = iter(endpoints)
            for src_ep_id, dst_ep_id in zip(it_endpoints, it_endpoints):
                self._configure_hop_flow_rules(
                    src_ep_id, dst_ep_id, path_endpoints_data, is_delete=False
                )
            for src_ep_key, dst_ep_key in zip(it_endpoints, it_endpoints):
                self._configure_hop_flow_rules(src_ep_key, dst_ep_key, service_data, is_delete=False)
                results.append(True)
            return results
        except Exception as e:
@@ -226,19 +247,18 @@ class L3NMRyuServiceHandler(_ServiceHandler):
            LOGGER.warning('nothing done: not enough endpoints')
            return []

        path_endpoints_data = self._compose_path_endpoint_data(endpoints)
        if len(path_endpoints_data) < 2:
            MSG = 'Wrong number of end devices: {:s}'
            LOGGER.warning(MSG.format(str(path_endpoints_data)))
            return []
        service_endpoint_ids = list(self.__service.service_endpoint_ids)
        src_device_obj, src_endpoint_obj = self._get_device_endpoint_obj_from_id(service_endpoint_ids[ 0])
        dst_device_obj, dst_endpoint_obj = self._get_device_endpoint_obj_from_id(service_endpoint_ids[-1])
        service_data = ServiceData.create(
            src_device_obj, src_endpoint_obj, dst_device_obj, dst_endpoint_obj, self.__settings_handler
        )

        results = []
        try:
            it_endpoints = iter(endpoints)
            for src_ep_id, dst_ep_id in zip(it_endpoints, it_endpoints):
                self._configure_hop_flow_rules(
                    src_ep_id, dst_ep_id, path_endpoints_data, is_delete=True
                )
            for src_ep_key, dst_ep_key in zip(it_endpoints, it_endpoints):
                self._configure_hop_flow_rules(src_ep_key, dst_ep_key, service_data, is_delete=True)
                results.append(True)
            return results
        except Exception as e: