Loading src/service/service/service_handlers/l3nm_ryu/L3NMRyuServiceHandler.py +176 −202 Original line number Diff line number Diff line Loading @@ -12,22 +12,106 @@ # See the License for the specific language governing permissions and # limitations under the License. import json, logging, re from typing import List, Optional, Tuple, Union import logging, re from dataclasses import dataclass from typing import Dict, List, Optional, Tuple, Union from common.method_wrappers.Decorator import MetricsPool, metered_subclass_method from common.proto.context_pb2 import ConfigRule, Device, DeviceId, EndPoint, Service from common.tools.grpc.Tools import grpc_message_to_json_string from common.tools.object_factory.ConfigRule import json_config_rule_delete, json_config_rule_set from common.tools.object_factory.Device import json_device_id from common.type_checkers.Checkers import chk_type from service.service.service_handler_api.SettingsHandler import SettingsHandler from service.service.service_handler_api.Tools import get_device_endpoint_uuids, get_endpoint_matching from service.service.service_handler_api._ServiceHandler import _ServiceHandler from service.service.service_handler_api.SettingsHandler import SettingsHandler from service.service.task_scheduler.TaskExecutor import TaskExecutor LOGGER = logging.getLogger(__name__) METRICS_POOL = MetricsPool('Service', 'Handler', labels={'handler': 'l3nm_ryu'}) RE_DEV_EP_SETTINGS = re.compile(r'/device\[(.*?)\]/endpoint\[(.*?)\]/settings') @dataclass class EndpointData: device_uuid : str device_name : str endpoint_uuid : str endpoint_name : str ipv4_address : Optional[str] @classmethod def create( cls, device_obj : Device, endpoint_obj : Endpoint, endpoint_settings : Optional[TreeNode] ) -> 'EndpointIds': device_uuid = device_obj.device_id.device_uuid.uuid ep_device_uuid = endpoint_obj.endpoint_id.device_id.device_uuid.uuid if device_uuid != ep_device_uuid: MSG = 'Malformed endpoint: device_uuid({:s}) != endpoint.device_uuid({:s})' raise Exception(MSG.format(str(device_uuid), str(ep_device_uuid))) ipv4_address = None if endpoint_settings is not None: json_settings : Dict = endpoint_settings.value if 'address_ip' in json_settings: ipv4_address = json_settings['address_ip'] elif 'ip_address' in json_settings: ipv4_address = json_settings['ip_address'] else: MSG = 'IP Address not found. Tried: address_ip and ip_address. endpoint_obj={:s} settings={:s}' LOGGER.warning(MSG.format(str(endpoint_obj), str(json_settings))) return cls( device_uuid = device_uuid, device_name = device_obj.name, endpoint_uuid = endpoint_obj.endpoint_id.endpoint_uuid.uuid, endpoint_name = endpoint_obj.name, ipv4_address = ipv4_address, ) def get_flow_rule_specs( path_endpoints_data : List[EndpointData], is_reverse : bool ) -> Tuple[str, str, str, str, str]: src_index = -1 if is_reverse else 0 dst_index = 0 if is_reverse else -1 src_endpoint_data = path_endpoints_data[src_index] src_device_name = src_endpoint_data.device_name src_endpoint_name = src_endpoint_data.endpoint_name dst_endpoint_data = path_endpoints_data[dst_index] dst_device_name = dst_endpoint_data.device_name dst_endpoint_name = dst_endpoint_data.endpoint_name flow_rule_name = '{:s}-{:s}'.format(src_device_name, dst_device_name) src_ip_addr = src_endpoint_data.ipv4_address dst_ip_addr = dst_endpoint_data.ipv4_address return flow_rule_name, src_endpoint_name, dst_endpoint_name, src_ip_addr, dst_ip_addr def compose_flow_rule( device_name : str, dpid : str, path_endpoints_data : List[EndpointData], is_reverse : bool, is_delete : bool ) -> ConfigRule: flow_specs = get_flow_rule_specs(path_endpoints_data, is_reverse) flow_rule_name, in_port, out_port, src_ip_addr, dst_ip_addr = flow_specs RSRC_KEY_TMPL = '/device[{:s}]/flow[{:d}]' resource_key = RSRC_KEY_TMPL.format(device_name, flow_rule_name) resource_value = { 'dpid' : dpid, 'in-port' : in_port, 'out-port' : out_port, 'src-ip-addr' : src_ip_addr, 'dst-ip-addr' : dst_ip_addr, } json_config_rule_method = json_config_rule_delete if is_delete else json_config_rule_set flow_rule = json_config_rule_method(resource_key, resource_value) return ConfigRule(**flow_rule) class L3NMRyuServiceHandler(_ServiceHandler): def __init__( # pylint: disable=super-init-not-called self, service : Service, task_executor : TaskExecutor, **settings Loading @@ -36,125 +120,96 @@ class L3NMRyuServiceHandler(_ServiceHandler): self.__task_executor = task_executor self.__settings_handler = SettingsHandler(service.service_config, **settings) def _compose_path_endpoint_data( self, endpoints : List[Tuple[str, str, Optional[str]]] ) -> List[EndpointData]: path_endpoints_data : List[EndpointData] = list() for endpoint in endpoints: device_uuid, endpoint_uuid = get_device_endpoint_uuids(endpoint) device_obj = self.__task_executor.get_device(DeviceId(**json_device_id(device_uuid))) endpoint_obj = get_endpoint_matching(device_obj, endpoint_uuid) endpoint_settings = self.__settings_handler.get_endpoint_settings(device_obj, endpoint_obj) endpoint_ids = EndpointData.create(device_obj, endpoint_obj, endpoint_settings) path_endpoints_data.append(endpoint_ids) LOGGER.debug('path_endpoints_data = {:s}'.format(str(path_endpoints_data))) return path_endpoints_data def _get_endpoint_details( self, endpoint : Tuple[str, str, Optional[str]] ) -> Tuple[Device, EndPoint]: #Dict]: ) -> Tuple[Device, EndPoint]: device_uuid, endpoint_uuid = get_device_endpoint_uuids(endpoint) device_obj = self.__task_executor.get_device(DeviceId(**json_device_id(device_uuid))) endpoint_obj = get_endpoint_matching(device_obj, endpoint_uuid) device_name = device_obj.name endpoint_name = endpoint_obj.name return device_obj, endpoint_obj def _configure_hop_flow_rules( self, src_ep_id : Tuple[str, str, Optional[str]], dst_ep_id : Tuple[str, str, Optional[str]], path_endpoints_data : List[EndpointData], is_delete : bool = False ) -> None: src_dev, src_ep = self._get_endpoint_details(src_ep_id) dst_dev, dst_ep = self._get_endpoint_details(dst_ep_id) if src_dev.device_id.device_uuid.uuid != dst_dev.device_id.device_uuid.uuid: MSG = 'Mismatching device for endpoints: {:s}-{:s}' raise Exception(MSG.format(str(src_ep_id), str(dst_ep_id))) src_ctrl = self.__task_executor.get_device_controller(src_dev) dst_ctrl = self.__task_executor.get_device_controller(dst_dev) if src_ctrl != dst_ctrl: MSG = 'Mismatching controller for endpoints: {:s}-{:s}' raise Exception(MSG.format(str(src_ep_id), str(dst_ep_id))) device_name = src_ep.name.split('-')[0] device_dpid = src_dev.name ctrl = src_ctrl LOGGER.debug('Ctrl: {:s}'.format(str(ctrl.name))) del ctrl.device_config.config_rules[:] del ctrl.device_endpoints[:] forward_flow_rule = compose_flow_rule( device_name, device_dpid, path_endpoints_data, is_reverse=False, is_delete=is_delete ) MSG = 'Forward Config Rule: {:s}' LOGGER.debug(MSG.format(grpc_message_to_json_string(forward_flow_rule))) ctrl.device_config.config_rules.append(forward_flow_rule) reverse_flow_rule = compose_flow_rule( device_name, device_dpid, path_endpoints_data, is_reverse=True, is_delete=is_delete ) MSG = 'Reverse Config Rule: {:s}' LOGGER.debug(MSG.format(grpc_message_to_json_string(reverse_flow_rule))) ctrl.device_config.config_rules.append(reverse_flow_rule) self.__task_executor.configure_device(ctrl) @metered_subclass_method(METRICS_POOL) def SetEndpoint( self, endpoints : List[Tuple[str, str, Optional[str]]], connection_uuid : Optional[str] = None ) -> List[Union[bool, Exception]]: LOGGER.debug('endpoints = {:s}'.format(str(endpoints))) LOGGER.debug('[SetEndpoint] endpoints = {:s}'.format(str(endpoints))) chk_type('endpoints', endpoints, list) if len(endpoints) < 2: LOGGER.warning('nothing done: not enough endpoints') return [] service_uuid = self.__service.service_id.service_uuid.uuid service_name= self.__service.name service_configuration_rules=self.__service.service_config.config_rules LOGGER.debug('service_configuration_rules = {:s}'.format(str(service_configuration_rules))) ip_addresses = [] flow_rules = [] for rule in service_configuration_rules: try: custom_field = rule.custom resource_value_str = custom_field.resource_value resource_value = json.loads(resource_value_str) resource_key_str = custom_field.resource_key LOGGER.debug(f"resource_key_str = {resource_key_str}") match = re.search(r"/device\[(.*?)\]/", resource_key_str) if match: device_name = match.group(1) flow_rules.append(device_name) ip_address = resource_value.get("ip_address") ip_addresses.append(ip_address) except Exception as e: LOGGER.exception("Error in Rules") LOGGER.debug('ip_address = {:s}'.format(str(ip_addresses))) LOGGER.debug('flow_rules = {:s}'.format(str(flow_rules))) if len(flow_rules) < 2: LOGGER.warning('Not enough devices to construct flow rules') return [] if len(ip_addresses) < 2: LOGGER.warning('Not enough IP addresses found') path_endpoints_data = self._compose_path_endpoint_data(endpoints) if len(path_endpoints_data) < 2: MSG = 'Wrong number of end devices: {:s}' LOGGER.warning(MSG.format(str(path_endpoints_data))) return [] results = [] try: src_device, src_endpoint, = self._get_endpoint_details(endpoints[0]) dst_device, dst_endpoint, = self._get_endpoint_details(endpoints[-1]) src_controller = self.__task_executor.get_device_controller(src_device) del src_controller.device_config.config_rules[:] for index in range(len(endpoints) - 1): current_device, current_endpoint = self._get_endpoint_details(endpoints[index]) next_device, next_endpoint = self._get_endpoint_details(endpoints[index + 1]) if current_device.name == next_device.name: in_port_forward = current_endpoint.name out_port_forward = next_endpoint.name dpid_src = int(current_device.name) LOGGER.debug(f"DPID source: {dpid_src}") dpid_dst = int(next_device.name) LOGGER.debug(f"DPID destination: {dpid_dst}") flow_rule_forward = f"{flow_rules[0]}-{flow_rules[1]}" flow_rule_reverse = f"{flow_rules[1]}-{flow_rules[0]}" ip_address_source = ip_addresses[0] ip_address_destination = ip_addresses[1] forward_resource_value = ({"dpid": current_device.name, "in-port": in_port_forward, "out-port": out_port_forward, "ip_address_source": ip_address_source, "ip_address_destination": ip_address_destination, }) forward_rule = json_config_rule_set ( resource_key=f"/device[{current_endpoint.name.split('-')[0]}]/flow[{flow_rule_forward}]", resource_value=forward_resource_value it_endpoints = iter(endpoints) for src_ep_id, dst_ep_id in zip(it_endpoints, it_endpoints): self._configure_hop_flow_rules( src_ep_id, dst_ep_id, path_endpoints_data, is_delete=False ) LOGGER.debug(f"Forward configuration rule: {forward_rule}") src_controller.device_config.config_rules.append(ConfigRule(**forward_rule)) in_port_reverse = next_endpoint.name out_port_reverse = current_endpoint.name reverse_resource_value = { "dpid": current_device.name, "in-port": in_port_reverse, "out-port": out_port_reverse, "ip_address_source": ip_address_destination, "ip_address_destination": ip_address_source, } reverse_rule = json_config_rule_set( resource_key=f"/device[{current_endpoint.name.split('-')[0]}]/flow[{flow_rule_reverse}]", resource_value=reverse_resource_value ) LOGGER.debug(f"Reverse configuration rule: {reverse_rule}") src_controller.device_config.config_rules.append(ConfigRule(**reverse_rule)) self.__task_executor.configure_device(src_controller) results.append(True) def get_config_rules(controller): try: config_rules = controller.device_config.config_rules for rule in config_rules: if rule.HasField("custom"): resource_key = rule.custom.resource_key resource_value = rule.custom.resource_value LOGGER.debug(f"Resource key in config: {resource_key}, Resource value in config: {resource_value}") except Exception as e: LOGGER.exception("Error in Configuration Rules") get_config_rules(src_controller) LOGGER.debug(f"Configuration rules: {src_controller.device_config.config_rules}") return results except Exception as e: LOGGER.exception("Error in SetEndpoint") return [e] Loading @@ -163,109 +218,28 @@ class L3NMRyuServiceHandler(_ServiceHandler): def DeleteEndpoint( self, endpoints : List[Tuple[str, str, Optional[str]]], connection_uuid : Optional[str] = None ) -> List[Union[bool, Exception]]: LOGGER.debug('endpoints_delete = {:s}'.format(str(endpoints))) LOGGER.debug('[DeleteEndpoint] endpoints = {:s}'.format(str(endpoints))) chk_type('endpoints', endpoints, list) if len(endpoints) < 2: LOGGER.warning('nothing done: not enough endpoints') return [] service_uuid = self.__service.service_id.service_uuid.uuid service_name= self.__service.name service_configuration_rules=self.__service.service_config.config_rules LOGGER.debug('service_configuration_rules = {:s}'.format(str(service_configuration_rules))) ip_addresses = [] flow_rules = [] for rule in service_configuration_rules: try: custom_field = rule.custom resource_value_str = custom_field.resource_value resource_value = json.loads(resource_value_str) resource_key_str = custom_field.resource_key LOGGER.debug(f"resource_key_str = {resource_key_str}") match = re.search(r"/device\[(.*?)\]/", resource_key_str) if match: device_name = match.group(1) else: device_name = None flow_rules.append(device_name) ip_address = resource_value.get("ip_address") ip_addresses.append(ip_address) except Exception as e: LOGGER.exception("Error in Rules") LOGGER.debug('ip_address = {:s}'.format(str(ip_addresses))) LOGGER.debug('flow_rules = {:s}'.format(str(flow_rules))) path_endpoints_data = self._compose_path_endpoint_data(endpoints) if len(path_endpoints_data) < 2: MSG = 'Wrong number of end devices: {:s}' LOGGER.warning(MSG.format(str(path_endpoints_data))) return [] results = [] try: src_device, src_endpoint, = self._get_endpoint_details(endpoints[0]) dst_device, dst_endpoint, = self._get_endpoint_details(endpoints[-1]) src_controller = self.__task_executor.get_device_controller(src_device) del src_controller.device_config.config_rules[:] for index in range(len(endpoints) - 1): current_device, current_endpoint = self._get_endpoint_details(endpoints[index]) next_device, next_endpoint = self._get_endpoint_details(endpoints[index + 1]) if current_device.name == next_device.name: in_port_forward = current_endpoint.name out_port_forward = next_endpoint.name dpid_src = int(current_device.name) LOGGER.debug(f"DPID source: {dpid_src}") dpid_dst = int(next_device.name) LOGGER.debug(f"DPID destination: {dpid_dst}") flow_rule_forward = f"{flow_rules[0]}-{flow_rules[1]}" flow_rule_reverse = f"{flow_rules[1]}-{flow_rules[0]}" ip_address_source = ip_addresses[0] ip_address_destination = ip_addresses[1] forward_resource_value = ({"dpid": current_device.name, "in-port": in_port_forward, "out-port": out_port_forward, "ip_address_source": ip_address_source, "ip_address_destination": ip_address_destination, }) forward_rule = json_config_rule_delete ( resource_key=f"/device[{current_endpoint.name.split('-')[0]}]/flow[{flow_rule_forward}]", resource_value=forward_resource_value ) LOGGER.debug(f"Forward configuration rule: {forward_rule}") in_port_reverse = next_endpoint.name out_port_reverse = current_endpoint.name reverse_resource_value = { "dpid": current_device.name, "in-port": in_port_reverse, "out-port": out_port_reverse, "ip_address_source": ip_address_destination, "ip_address_destination": ip_address_source, } reverse_rule = json_config_rule_delete( resource_key=f"/device[{current_endpoint.name.split('-')[0]}]/flow[{flow_rule_reverse}]", resource_value=reverse_resource_value it_endpoints = iter(endpoints) for src_ep_id, dst_ep_id in zip(it_endpoints, it_endpoints): self._configure_hop_flow_rules( src_ep_id, dst_ep_id, path_endpoints_data, is_delete=True ) LOGGER.debug(f"Reverse configuration rule: {reverse_rule}") src_controller.device_config.config_rules.append(ConfigRule(**reverse_rule)) src_controller.device_config.config_rules.append(ConfigRule(**forward_rule)) json_config_rule_delete_1 = json_config_rule_delete('/services/service[{:s}]'.format(service_uuid), { 'uuid': service_uuid }) src_controller.device_config.config_rules.append(ConfigRule(**json_config_rule_delete_1)) self.__task_executor.configure_device(src_controller) results.append(True) def get_config_rules(controller): try: config_rules = controller.device_config.config_rules for rule in config_rules: if rule.HasField("custom"): resource_key = rule.custom.resource_key resource_value = rule.custom.resource_value LOGGER.debug(f"Resource key in config: {resource_key}, Resource value in config: {resource_value}") except Exception as e: print(f"Error accessing config rules: {e}") get_config_rules(src_controller) LOGGER.debug(f"Configuration rules: {src_controller.device_config.config_rules}") return results except Exception as e: LOGGER.exception(f"Error in DeleteEndpoint") LOGGER.exception("Error in SetEndpoint") return [e] Loading
src/service/service/service_handlers/l3nm_ryu/L3NMRyuServiceHandler.py +176 −202 Original line number Diff line number Diff line Loading @@ -12,22 +12,106 @@ # See the License for the specific language governing permissions and # limitations under the License. import json, logging, re from typing import List, Optional, Tuple, Union import logging, re from dataclasses import dataclass from typing import Dict, List, Optional, Tuple, Union from common.method_wrappers.Decorator import MetricsPool, metered_subclass_method from common.proto.context_pb2 import ConfigRule, Device, DeviceId, EndPoint, Service from common.tools.grpc.Tools import grpc_message_to_json_string from common.tools.object_factory.ConfigRule import json_config_rule_delete, json_config_rule_set from common.tools.object_factory.Device import json_device_id from common.type_checkers.Checkers import chk_type from service.service.service_handler_api.SettingsHandler import SettingsHandler from service.service.service_handler_api.Tools import get_device_endpoint_uuids, get_endpoint_matching from service.service.service_handler_api._ServiceHandler import _ServiceHandler from service.service.service_handler_api.SettingsHandler import SettingsHandler from service.service.task_scheduler.TaskExecutor import TaskExecutor LOGGER = logging.getLogger(__name__) METRICS_POOL = MetricsPool('Service', 'Handler', labels={'handler': 'l3nm_ryu'}) RE_DEV_EP_SETTINGS = re.compile(r'/device\[(.*?)\]/endpoint\[(.*?)\]/settings') @dataclass class EndpointData: device_uuid : str device_name : str endpoint_uuid : str endpoint_name : str ipv4_address : Optional[str] @classmethod def create( cls, device_obj : Device, endpoint_obj : Endpoint, endpoint_settings : Optional[TreeNode] ) -> 'EndpointIds': device_uuid = device_obj.device_id.device_uuid.uuid ep_device_uuid = endpoint_obj.endpoint_id.device_id.device_uuid.uuid if device_uuid != ep_device_uuid: MSG = 'Malformed endpoint: device_uuid({:s}) != endpoint.device_uuid({:s})' raise Exception(MSG.format(str(device_uuid), str(ep_device_uuid))) ipv4_address = None if endpoint_settings is not None: json_settings : Dict = endpoint_settings.value if 'address_ip' in json_settings: ipv4_address = json_settings['address_ip'] elif 'ip_address' in json_settings: ipv4_address = json_settings['ip_address'] else: MSG = 'IP Address not found. Tried: address_ip and ip_address. endpoint_obj={:s} settings={:s}' LOGGER.warning(MSG.format(str(endpoint_obj), str(json_settings))) return cls( device_uuid = device_uuid, device_name = device_obj.name, endpoint_uuid = endpoint_obj.endpoint_id.endpoint_uuid.uuid, endpoint_name = endpoint_obj.name, ipv4_address = ipv4_address, ) def get_flow_rule_specs( path_endpoints_data : List[EndpointData], is_reverse : bool ) -> Tuple[str, str, str, str, str]: src_index = -1 if is_reverse else 0 dst_index = 0 if is_reverse else -1 src_endpoint_data = path_endpoints_data[src_index] src_device_name = src_endpoint_data.device_name src_endpoint_name = src_endpoint_data.endpoint_name dst_endpoint_data = path_endpoints_data[dst_index] dst_device_name = dst_endpoint_data.device_name dst_endpoint_name = dst_endpoint_data.endpoint_name flow_rule_name = '{:s}-{:s}'.format(src_device_name, dst_device_name) src_ip_addr = src_endpoint_data.ipv4_address dst_ip_addr = dst_endpoint_data.ipv4_address return flow_rule_name, src_endpoint_name, dst_endpoint_name, src_ip_addr, dst_ip_addr def compose_flow_rule( device_name : str, dpid : str, path_endpoints_data : List[EndpointData], is_reverse : bool, is_delete : bool ) -> ConfigRule: flow_specs = get_flow_rule_specs(path_endpoints_data, is_reverse) flow_rule_name, in_port, out_port, src_ip_addr, dst_ip_addr = flow_specs RSRC_KEY_TMPL = '/device[{:s}]/flow[{:d}]' resource_key = RSRC_KEY_TMPL.format(device_name, flow_rule_name) resource_value = { 'dpid' : dpid, 'in-port' : in_port, 'out-port' : out_port, 'src-ip-addr' : src_ip_addr, 'dst-ip-addr' : dst_ip_addr, } json_config_rule_method = json_config_rule_delete if is_delete else json_config_rule_set flow_rule = json_config_rule_method(resource_key, resource_value) return ConfigRule(**flow_rule) class L3NMRyuServiceHandler(_ServiceHandler): def __init__( # pylint: disable=super-init-not-called self, service : Service, task_executor : TaskExecutor, **settings Loading @@ -36,125 +120,96 @@ class L3NMRyuServiceHandler(_ServiceHandler): self.__task_executor = task_executor self.__settings_handler = SettingsHandler(service.service_config, **settings) def _compose_path_endpoint_data( self, endpoints : List[Tuple[str, str, Optional[str]]] ) -> List[EndpointData]: path_endpoints_data : List[EndpointData] = list() for endpoint in endpoints: device_uuid, endpoint_uuid = get_device_endpoint_uuids(endpoint) device_obj = self.__task_executor.get_device(DeviceId(**json_device_id(device_uuid))) endpoint_obj = get_endpoint_matching(device_obj, endpoint_uuid) endpoint_settings = self.__settings_handler.get_endpoint_settings(device_obj, endpoint_obj) endpoint_ids = EndpointData.create(device_obj, endpoint_obj, endpoint_settings) path_endpoints_data.append(endpoint_ids) LOGGER.debug('path_endpoints_data = {:s}'.format(str(path_endpoints_data))) return path_endpoints_data def _get_endpoint_details( self, endpoint : Tuple[str, str, Optional[str]] ) -> Tuple[Device, EndPoint]: #Dict]: ) -> Tuple[Device, EndPoint]: device_uuid, endpoint_uuid = get_device_endpoint_uuids(endpoint) device_obj = self.__task_executor.get_device(DeviceId(**json_device_id(device_uuid))) endpoint_obj = get_endpoint_matching(device_obj, endpoint_uuid) device_name = device_obj.name endpoint_name = endpoint_obj.name return device_obj, endpoint_obj def _configure_hop_flow_rules( self, src_ep_id : Tuple[str, str, Optional[str]], dst_ep_id : Tuple[str, str, Optional[str]], path_endpoints_data : List[EndpointData], is_delete : bool = False ) -> None: src_dev, src_ep = self._get_endpoint_details(src_ep_id) dst_dev, dst_ep = self._get_endpoint_details(dst_ep_id) if src_dev.device_id.device_uuid.uuid != dst_dev.device_id.device_uuid.uuid: MSG = 'Mismatching device for endpoints: {:s}-{:s}' raise Exception(MSG.format(str(src_ep_id), str(dst_ep_id))) src_ctrl = self.__task_executor.get_device_controller(src_dev) dst_ctrl = self.__task_executor.get_device_controller(dst_dev) if src_ctrl != dst_ctrl: MSG = 'Mismatching controller for endpoints: {:s}-{:s}' raise Exception(MSG.format(str(src_ep_id), str(dst_ep_id))) device_name = src_ep.name.split('-')[0] device_dpid = src_dev.name ctrl = src_ctrl LOGGER.debug('Ctrl: {:s}'.format(str(ctrl.name))) del ctrl.device_config.config_rules[:] del ctrl.device_endpoints[:] forward_flow_rule = compose_flow_rule( device_name, device_dpid, path_endpoints_data, is_reverse=False, is_delete=is_delete ) MSG = 'Forward Config Rule: {:s}' LOGGER.debug(MSG.format(grpc_message_to_json_string(forward_flow_rule))) ctrl.device_config.config_rules.append(forward_flow_rule) reverse_flow_rule = compose_flow_rule( device_name, device_dpid, path_endpoints_data, is_reverse=True, is_delete=is_delete ) MSG = 'Reverse Config Rule: {:s}' LOGGER.debug(MSG.format(grpc_message_to_json_string(reverse_flow_rule))) ctrl.device_config.config_rules.append(reverse_flow_rule) self.__task_executor.configure_device(ctrl) @metered_subclass_method(METRICS_POOL) def SetEndpoint( self, endpoints : List[Tuple[str, str, Optional[str]]], connection_uuid : Optional[str] = None ) -> List[Union[bool, Exception]]: LOGGER.debug('endpoints = {:s}'.format(str(endpoints))) LOGGER.debug('[SetEndpoint] endpoints = {:s}'.format(str(endpoints))) chk_type('endpoints', endpoints, list) if len(endpoints) < 2: LOGGER.warning('nothing done: not enough endpoints') return [] service_uuid = self.__service.service_id.service_uuid.uuid service_name= self.__service.name service_configuration_rules=self.__service.service_config.config_rules LOGGER.debug('service_configuration_rules = {:s}'.format(str(service_configuration_rules))) ip_addresses = [] flow_rules = [] for rule in service_configuration_rules: try: custom_field = rule.custom resource_value_str = custom_field.resource_value resource_value = json.loads(resource_value_str) resource_key_str = custom_field.resource_key LOGGER.debug(f"resource_key_str = {resource_key_str}") match = re.search(r"/device\[(.*?)\]/", resource_key_str) if match: device_name = match.group(1) flow_rules.append(device_name) ip_address = resource_value.get("ip_address") ip_addresses.append(ip_address) except Exception as e: LOGGER.exception("Error in Rules") LOGGER.debug('ip_address = {:s}'.format(str(ip_addresses))) LOGGER.debug('flow_rules = {:s}'.format(str(flow_rules))) if len(flow_rules) < 2: LOGGER.warning('Not enough devices to construct flow rules') return [] if len(ip_addresses) < 2: LOGGER.warning('Not enough IP addresses found') path_endpoints_data = self._compose_path_endpoint_data(endpoints) if len(path_endpoints_data) < 2: MSG = 'Wrong number of end devices: {:s}' LOGGER.warning(MSG.format(str(path_endpoints_data))) return [] results = [] try: src_device, src_endpoint, = self._get_endpoint_details(endpoints[0]) dst_device, dst_endpoint, = self._get_endpoint_details(endpoints[-1]) src_controller = self.__task_executor.get_device_controller(src_device) del src_controller.device_config.config_rules[:] for index in range(len(endpoints) - 1): current_device, current_endpoint = self._get_endpoint_details(endpoints[index]) next_device, next_endpoint = self._get_endpoint_details(endpoints[index + 1]) if current_device.name == next_device.name: in_port_forward = current_endpoint.name out_port_forward = next_endpoint.name dpid_src = int(current_device.name) LOGGER.debug(f"DPID source: {dpid_src}") dpid_dst = int(next_device.name) LOGGER.debug(f"DPID destination: {dpid_dst}") flow_rule_forward = f"{flow_rules[0]}-{flow_rules[1]}" flow_rule_reverse = f"{flow_rules[1]}-{flow_rules[0]}" ip_address_source = ip_addresses[0] ip_address_destination = ip_addresses[1] forward_resource_value = ({"dpid": current_device.name, "in-port": in_port_forward, "out-port": out_port_forward, "ip_address_source": ip_address_source, "ip_address_destination": ip_address_destination, }) forward_rule = json_config_rule_set ( resource_key=f"/device[{current_endpoint.name.split('-')[0]}]/flow[{flow_rule_forward}]", resource_value=forward_resource_value it_endpoints = iter(endpoints) for src_ep_id, dst_ep_id in zip(it_endpoints, it_endpoints): self._configure_hop_flow_rules( src_ep_id, dst_ep_id, path_endpoints_data, is_delete=False ) LOGGER.debug(f"Forward configuration rule: {forward_rule}") src_controller.device_config.config_rules.append(ConfigRule(**forward_rule)) in_port_reverse = next_endpoint.name out_port_reverse = current_endpoint.name reverse_resource_value = { "dpid": current_device.name, "in-port": in_port_reverse, "out-port": out_port_reverse, "ip_address_source": ip_address_destination, "ip_address_destination": ip_address_source, } reverse_rule = json_config_rule_set( resource_key=f"/device[{current_endpoint.name.split('-')[0]}]/flow[{flow_rule_reverse}]", resource_value=reverse_resource_value ) LOGGER.debug(f"Reverse configuration rule: {reverse_rule}") src_controller.device_config.config_rules.append(ConfigRule(**reverse_rule)) self.__task_executor.configure_device(src_controller) results.append(True) def get_config_rules(controller): try: config_rules = controller.device_config.config_rules for rule in config_rules: if rule.HasField("custom"): resource_key = rule.custom.resource_key resource_value = rule.custom.resource_value LOGGER.debug(f"Resource key in config: {resource_key}, Resource value in config: {resource_value}") except Exception as e: LOGGER.exception("Error in Configuration Rules") get_config_rules(src_controller) LOGGER.debug(f"Configuration rules: {src_controller.device_config.config_rules}") return results except Exception as e: LOGGER.exception("Error in SetEndpoint") return [e] Loading @@ -163,109 +218,28 @@ class L3NMRyuServiceHandler(_ServiceHandler): def DeleteEndpoint( self, endpoints : List[Tuple[str, str, Optional[str]]], connection_uuid : Optional[str] = None ) -> List[Union[bool, Exception]]: LOGGER.debug('endpoints_delete = {:s}'.format(str(endpoints))) LOGGER.debug('[DeleteEndpoint] endpoints = {:s}'.format(str(endpoints))) chk_type('endpoints', endpoints, list) if len(endpoints) < 2: LOGGER.warning('nothing done: not enough endpoints') return [] service_uuid = self.__service.service_id.service_uuid.uuid service_name= self.__service.name service_configuration_rules=self.__service.service_config.config_rules LOGGER.debug('service_configuration_rules = {:s}'.format(str(service_configuration_rules))) ip_addresses = [] flow_rules = [] for rule in service_configuration_rules: try: custom_field = rule.custom resource_value_str = custom_field.resource_value resource_value = json.loads(resource_value_str) resource_key_str = custom_field.resource_key LOGGER.debug(f"resource_key_str = {resource_key_str}") match = re.search(r"/device\[(.*?)\]/", resource_key_str) if match: device_name = match.group(1) else: device_name = None flow_rules.append(device_name) ip_address = resource_value.get("ip_address") ip_addresses.append(ip_address) except Exception as e: LOGGER.exception("Error in Rules") LOGGER.debug('ip_address = {:s}'.format(str(ip_addresses))) LOGGER.debug('flow_rules = {:s}'.format(str(flow_rules))) path_endpoints_data = self._compose_path_endpoint_data(endpoints) if len(path_endpoints_data) < 2: MSG = 'Wrong number of end devices: {:s}' LOGGER.warning(MSG.format(str(path_endpoints_data))) return [] results = [] try: src_device, src_endpoint, = self._get_endpoint_details(endpoints[0]) dst_device, dst_endpoint, = self._get_endpoint_details(endpoints[-1]) src_controller = self.__task_executor.get_device_controller(src_device) del src_controller.device_config.config_rules[:] for index in range(len(endpoints) - 1): current_device, current_endpoint = self._get_endpoint_details(endpoints[index]) next_device, next_endpoint = self._get_endpoint_details(endpoints[index + 1]) if current_device.name == next_device.name: in_port_forward = current_endpoint.name out_port_forward = next_endpoint.name dpid_src = int(current_device.name) LOGGER.debug(f"DPID source: {dpid_src}") dpid_dst = int(next_device.name) LOGGER.debug(f"DPID destination: {dpid_dst}") flow_rule_forward = f"{flow_rules[0]}-{flow_rules[1]}" flow_rule_reverse = f"{flow_rules[1]}-{flow_rules[0]}" ip_address_source = ip_addresses[0] ip_address_destination = ip_addresses[1] forward_resource_value = ({"dpid": current_device.name, "in-port": in_port_forward, "out-port": out_port_forward, "ip_address_source": ip_address_source, "ip_address_destination": ip_address_destination, }) forward_rule = json_config_rule_delete ( resource_key=f"/device[{current_endpoint.name.split('-')[0]}]/flow[{flow_rule_forward}]", resource_value=forward_resource_value ) LOGGER.debug(f"Forward configuration rule: {forward_rule}") in_port_reverse = next_endpoint.name out_port_reverse = current_endpoint.name reverse_resource_value = { "dpid": current_device.name, "in-port": in_port_reverse, "out-port": out_port_reverse, "ip_address_source": ip_address_destination, "ip_address_destination": ip_address_source, } reverse_rule = json_config_rule_delete( resource_key=f"/device[{current_endpoint.name.split('-')[0]}]/flow[{flow_rule_reverse}]", resource_value=reverse_resource_value it_endpoints = iter(endpoints) for src_ep_id, dst_ep_id in zip(it_endpoints, it_endpoints): self._configure_hop_flow_rules( src_ep_id, dst_ep_id, path_endpoints_data, is_delete=True ) LOGGER.debug(f"Reverse configuration rule: {reverse_rule}") src_controller.device_config.config_rules.append(ConfigRule(**reverse_rule)) src_controller.device_config.config_rules.append(ConfigRule(**forward_rule)) json_config_rule_delete_1 = json_config_rule_delete('/services/service[{:s}]'.format(service_uuid), { 'uuid': service_uuid }) src_controller.device_config.config_rules.append(ConfigRule(**json_config_rule_delete_1)) self.__task_executor.configure_device(src_controller) results.append(True) def get_config_rules(controller): try: config_rules = controller.device_config.config_rules for rule in config_rules: if rule.HasField("custom"): resource_key = rule.custom.resource_key resource_value = rule.custom.resource_value LOGGER.debug(f"Resource key in config: {resource_key}, Resource value in config: {resource_value}") except Exception as e: print(f"Error accessing config rules: {e}") get_config_rules(src_controller) LOGGER.debug(f"Configuration rules: {src_controller.device_config.config_rules}") return results except Exception as e: LOGGER.exception(f"Error in DeleteEndpoint") LOGGER.exception("Error in SetEndpoint") return [e]