diff --git a/src/device/service/drivers/OpenFlow/OpenFlowDriver.py b/src/device/service/drivers/OpenFlow/OpenFlowDriver.py index 070dc597d98f6dab7f2b4baab040bedbe9e83dfa..cd336f7bff2d5e64056f661caff20f4a49f0101d 100644 --- a/src/device/service/drivers/OpenFlow/OpenFlowDriver.py +++ b/src/device/service/drivers/OpenFlow/OpenFlowDriver.py @@ -123,16 +123,13 @@ class OpenFlowDriver(_Driver): mac_address_source = resource_value_dict.get("mac_address_source", "") mac_address_destination = resource_value_dict.get("mac_address_destination", "") - # Default priority - #priority = 1000 - if "h1-h3" in resource_key: priority = 65535 match_fields = { "in_port": in_port, "eth_type": 0x0800, - #"ipv4_src": ip_address_source , - #"ipv4_dst": ip_address_destination, + "ipv4_src": ip_address_source , + "ipv4_dst": ip_address_destination, "eth_src": mac_address_source, "dl_dst": mac_address_destination } @@ -141,8 +138,8 @@ class OpenFlowDriver(_Driver): match_fields = { "in_port": in_port, "eth_type": 0x0800, - #"ipv4_src": ip_address_source, - #"ipv4_dst": ip_address_destination, + "ipv4_src": ip_address_source, + "ipv4_dst": ip_address_destination, "eth_src": mac_address_source, "dl_dst": mac_address_destination } @@ -177,7 +174,6 @@ class OpenFlowDriver(_Driver): flow_entry = { "dpid": dpid, - #cookie": 0, "priority": priority, "match": match_fields, "instructions": [ @@ -243,53 +239,163 @@ class OpenFlowDriver(_Driver): - #@metered_subclass_method(METRICS_POOL) - #def DeleteConfig(self, resources: List[Tuple[str, Any]]) -> List[Union[bool, Exception]]: - # LOGGER.info(f"DeleteConfig123_resources: {resources}") - # url = f"{self.__base_url}/stats/flowentry/delete_strict" - # results = [] - # if not resources: - # return results - # with self.__lock: - # for resource in resources: - # try: - # resource_key, resource_value = resource - # if not resource_key.startswith("/device[") or not "/flow[" in resource_key: - # LOGGER.error(f"Invalid resource_key format: {resource_key}") - # results.append(Exception(f"Invalid resource_key format: {resource_key}")) - # continue - # try: - # resource_value_dict = json.loads(resource_value) - # dpid = int(resource_value_dict["dpid"], 16) - # in_port = int(resource_value_dict["in-port"].split("-")[1][3:]) - # out_port = int(resource_value_dict["out-port"].split("-")[1][3:]) - # except (KeyError, ValueError, IndexError) as e: - # LOGGER.error(f"Invalid resource_value: {resource_value}, error: {e}") - # results.append(Exception(f"Invalid resource_value: {resource_value}")) - # continue - # flow_entry = { - # "dpid": dpid, - # "table_id": 0, - # "priority": 11111, - # "match": {"in_port": in_port}, - # } - # try: - # response = requests.post(url, json=flow_entry, timeout=self.__timeout, verify=False, auth=self.__auth) - # response.raise_for_status() - # results.append(True) - # LOGGER.info(f"Successfully posted flow entry: {flow_entry}") - # except requests.exceptions.Timeout: - # LOGGER.error(f"Timeout connecting to {url}") - # results.append(Exception(f"Timeout connecting to {url}")) - # except requests.exceptions.RequestException as e: - # LOGGER.error(f"Error posting flow entry {flow_entry} to {url}: {e}") - # results.append(e) - # except Exception as e: - # LOGGER.error(f"Error processing resource {resource_key}: {e}", exc_info=True) - # results.append(e) - # return results -# -# + @metered_subclass_method(METRICS_POOL) + def DeleteConfig(self, resources: List[Tuple[str, Any]]) -> List[Union[bool, Exception]]: + LOGGER.info(f"DeleteConfig called for resources: {resources}") + url = f"{self.__base_url}/stats/flowentry/delete_strict" + results = [] + if not resources: + return results + with self.__lock: + for resource in resources: + try: + resource_key, resource_value = resource + + if not resource_key.startswith("/device[") or not "/flow[" in resource_key: + LOGGER.error(f"Invalid resource_key format: {resource_key}") + results.append(Exception(f"Invalid resource_key format: {resource_key}")) + continue + + try: + resource_value_dict = json.loads(resource_value) + LOGGER.info(f"resource_value_dict: {resource_value_dict}") + dpid = int(resource_value_dict["dpid"], 16) + in_port = int(resource_value_dict["in-port"].split("-")[1][3:]) + out_port = int(resource_value_dict["out-port"].split("-")[1][3:]) + self.__cookie_counter += 1 + cookie = self.__cookie_counter + ip_address_source = resource_value_dict.get("ip_address_source", "") + ip_address_destination = resource_value_dict.get("ip_address_destination", "") + mac_address_source = resource_value_dict.get("mac_address_source", "") + mac_address_destination = resource_value_dict.get("mac_address_destination", "") + + if "h1-h3" in resource_key: + priority = 65535 + match_fields = { + "in_port": in_port, + "eth_type": 0x0800, + "ipv4_src": ip_address_source , + "ipv4_dst": ip_address_destination, + "eth_src": mac_address_source, + "dl_dst": mac_address_destination, + "table_id": 0, + "cookie": 0, + "cookie_mask": 0, + } + elif "h3-h1" in resource_key: + priority = 65535 + match_fields = { + "in_port": in_port, + "eth_type": 0x0800, + "ipv4_src": ip_address_source, + "ipv4_dst": ip_address_destination, + "eth_src": mac_address_source, + "dl_dst": mac_address_destination, + "table_id": 0, + "cookie": 0, + "cookie_mask": 0, + } + + elif "h2-h4" in resource_key: + priority = 1500 + match_fields = { + "in_port": in_port, + "eth_type": 0x0800, + "ipv4_src": ip_address_source , + "ipv4_dst": ip_address_destination, + "eth_src": mac_address_source, + "eth_dst": mac_address_destination, + "table_id": 0, + "cookie": 0, + "cookie_mask": 0, + } + elif "h4-h2" in resource_key: + priority = 1500 + match_fields = { + "in_port": in_port, + "eth_type": 0x0800, + "ipv4_src": ip_address_source, + "ipv4_dst": ip_address_destination, + "eth_src": mac_address_source, + "eth_dst": mac_address_destination, + "table_id": 0, + "cookie": 0, + "cookie_mask": 0, + } + + except (KeyError, ValueError, IndexError) as e: + LOGGER.error(f"Invalid resource_value: {resource_value}, error: {e}") + results.append(Exception(f"Invalid resource_value: {resource_value}")) + continue + + LOGGER.debug(f"Flow match fields: {match_fields}") + + flow_entry = { + "dpid": dpid, + #cookie": 0, + "priority": priority, + "match": match_fields, + "instructions": [ + { + "type": "APPLY_ACTIONS", + "actions": [ + { + "max_len": 65535, + "type": "OUTPUT", + "port": out_port + }]}]} + + flow_entry_arp_foraward = { + "dpid": dpid, + "priority": 65535, + "match": { + "eth_dst": "ff:ff:ff:ff:ff:ff", + "eth_type": 0x0806 + }, + "instructions": [ + { + "type": "APPLY_ACTIONS", + "actions": [ + { + "type": "OUTPUT", + "port": "0xfffffffb" + } + ]}]} + flow_entry_arp_reply = { + "dpid": dpid, + "priority": 65535, + "match": { + "eth_type": 0x0806, + "arp_op": 2 + }, + "instructions": [ + { + "type": "APPLY_ACTIONS", + "actions": [ + { + "type": "OUTPUT", + "port": "0xfffffffb" + } + ]}]} + + try: + response = requests.post(url, json=flow_entry_arp_foraward, timeout=self.__timeout, verify=False, auth=self.__auth) + response = requests.post(url, json=flow_entry_arp_reply, timeout=self.__timeout, verify=False, auth=self.__auth) + response = requests.post(url, json=flow_entry, timeout=self.__timeout, verify=False, auth=self.__auth) + response.raise_for_status() + results.append(True) + LOGGER.info(f"Successfully posted flow entry: {flow_entry}") + except requests.exceptions.Timeout: + LOGGER.error(f"Timeout connecting to {url}") + results.append(Exception(f"Timeout connecting to {url}")) + except requests.exceptions.RequestException as e: + LOGGER.error(f"Error posting flow entry {flow_entry} to {url}: {e}") + results.append(e) + except Exception as e: + LOGGER.error(f"Error processing resource {resource_key}: {e}", exc_info=True) + results.append(e) + return results + ## @metered_subclass_method(METRICS_POOL) ## def SubscribeState(self, subscriptions : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]: ## # TODO: TAPI does not support monitoring by now diff --git a/src/service/service/service_handlers/l3nm_ryu/L3NMryuServiceHandler.py b/src/service/service/service_handlers/l3nm_ryu/L3NMryuServiceHandler.py index 493ddd60ebf241f551a5a887b983ecaa417f15b8..0b78861364953d6e3af396b5e10deb31e48a44fd 100644 --- a/src/service/service/service_handlers/l3nm_ryu/L3NMryuServiceHandler.py +++ b/src/service/service/service_handlers/l3nm_ryu/L3NMryuServiceHandler.py @@ -16,13 +16,13 @@ import json, logging, netaddr from venv import logger from re import L from typing import Any, Dict, List, Optional, Tuple, Union - from py import log +from pyparsing import C from pytest import skip from common.method_wrappers.Decorator import MetricsPool, metered_subclass_method from common.proto.context_pb2 import ConfigRule, Device, DeviceId, EndPoint, Service,ConfigRule_Custom,ConfigActionEnum from common.tools.grpc.Tools import grpc_message_to_json_string -from common.tools.object_factory.ConfigRule import json_config_rule_delete, json_config_rule_set +from common.tools.object_factory.ConfigRule import json_config_rule_delete, json_config_rule_set,json_config_rule from common.tools.object_factory.Device import json_device_id from common.type_checkers.Checkers import chk_type #from nbi.service.rest_server.nbi_plugins.ietf_network.bindings.networks.network.link import destination @@ -185,37 +185,112 @@ class RYUServiceHandler(_ServiceHandler): def DeleteEndpoint( self, endpoints : List[Tuple[str, str, Optional[str]]], connection_uuid : Optional[str] = None ) -> List[Union[bool, Exception]]: - LOGGER.debug('endpoints = {:s}'.format(str(endpoints))) + LOGGER.debug('endpoints_delete = {:s}'.format(str(endpoints))) chk_type('endpoints', endpoints, list) - if len(endpoints) < 2: return [] + if len(endpoints) < 2: + LOGGER.warning('nothing done: not enough endpoints') + return [] service_uuid = self.__service.service_id.service_uuid.uuid service_name= self.__service.name + service_configuration_rules=self.__service.service_config.config_rules + #LOGGER.debug('service_configuration_rules = {:s}'.format(str(service_configuration_rules))) + ip_addresses = [] + mac_addresses = [] + for rule in service_configuration_rules: + try: + custom_field = rule.custom + resource_value_str = custom_field.resource_value + resource_value = json.loads(resource_value_str) + ip_address = resource_value.get("address_ip") + mac_address = resource_value.get("mac_address") + ip_addresses.append(ip_address) + mac_addresses.append(mac_address) + except Exception as e: + print(f"Error parsing rule: {e}, Rule: {rule}") + LOGGER.debug('ip_address = {:s}'.format(str(ip_addresses))) + LOGGER.debug('mac_address = {:s}'.format(str(mac_addresses))) + LOGGER.debug('service_name = {:s}'.format(str(service_name))) + #LOGGER.debug('service_uuid = {:s}'.format(str(service_uuid))) + #LOGGER.debug('self.__settings_handler = {:s}'.format(str(self.__settings_handler.dump_config_rules()))) results = [] try: - src_device_uuid, _ = get_device_endpoint_uuids(endpoints[0]) - src_device = self.__task_executor.get_device(DeviceId(**json_device_id(src_device_uuid))) + src_device, src_endpoint, = self._get_endpoint_details(endpoints[0]) + dst_device, dst_endpoint, = self._get_endpoint_details(endpoints[-1]) src_controller = self.__task_executor.get_device_controller(src_device) + #LOGGER.debug(f"Source controller: {src_controller.device_config.config_rules}") + del src_controller.device_config.config_rules[:] + for index in range(len(endpoints) - 1): + current_device, current_endpoint = self._get_endpoint_details(endpoints[index]) + #LOGGER.debug(f"Current device: {current_device.name}, Current endpoint: {current_endpoint.name}") + next_device, next_endpoint = self._get_endpoint_details(endpoints[index + 1]) + #LOGGER.debug(f"Next device: {next_device.name}, Next endpoint: {next_endpoint.name}") + if current_device.name == next_device.name: + in_port_forward = current_endpoint.name + out_port_forward = next_endpoint.name + flow_split = service_name.split('-') + dpid_src = int(current_device.name) + LOGGER.debug(f"DPID source: {dpid_src}") + dpid_dst = int(next_device.name) + LOGGER.debug(f"DPID destination: {dpid_dst}") + flow_rule_forward = f"{flow_split[0]}-{flow_split[2]}" + flow_rule_reverse = f"{flow_split[2]}-{flow_split[0]}" + ip_address_source = ip_addresses[0] + ip_address_destination = ip_addresses[1] + mac_address_source = mac_addresses[0] + mac_address_destination = mac_addresses[1] + forward_resource_value = ({"dpid": current_device.name, + "in-port": in_port_forward, + "out-port": out_port_forward, + "ip_address_source": ip_address_source, + "ip_address_destination": ip_address_destination, + "mac_address_source": mac_address_source, + "mac_address_destination": mac_address_destination + }) + forward_rule = json_config_rule_delete ( + resource_key=f"/device[{current_endpoint.name.split('-')[0]}]/flow[{flow_rule_forward}]", + resource_value=forward_resource_value + ) + LOGGER.debug(f"Forward configuration rule: {forward_rule}") + in_port_reverse = next_endpoint.name + out_port_reverse = current_endpoint.name + reverse_resource_value = { + "dpid": current_device.name, + "in-port": in_port_reverse, + "out-port": out_port_reverse, + "ip_address_source": ip_address_destination, + "ip_address_destination": ip_address_source, + "mac_address_source": mac_address_destination, + "mac_address_destination": mac_address_source + } + reverse_rule = json_config_rule_delete( + resource_key=f"/device[{current_endpoint.name.split('-')[0]}]/flow[{flow_rule_reverse}]", + resource_value=reverse_resource_value + ) + LOGGER.debug(f"Reverse configuration rule: {reverse_rule}") + src_controller.device_config.config_rules.append(ConfigRule(**reverse_rule)) + src_controller.device_config.config_rules.append(ConfigRule(**forward_rule)) - dst_device_uuid, _ = get_device_endpoint_uuids(endpoints[1]) - dst_device = self.__task_executor.get_device(DeviceId(**json_device_id(dst_device_uuid))) - dst_controller = self.__task_executor.get_device_controller(dst_device) - - if src_controller.device_id.device_uuid.uuid != dst_controller.device_id.device_uuid.uuid: - raise Exception('Different Src-Dst devices not supported by now') - controller = src_controller - - json_config_rule = json_config_rule_delete('/services/service[{:s}]'.format(service_uuid), { + json_config_rule_delete_1 = json_config_rule_delete('/services/service[{:s}]'.format(service_uuid), { 'uuid': service_uuid }) - del controller.device_config.config_rules[:] - controller.device_config.config_rules.append(ConfigRule(**json_config_rule)) - self.__task_executor.configure_device(controller) + src_controller.device_config.config_rules.append(ConfigRule(**json_config_rule_delete_1)) + self.__task_executor.configure_device(src_controller) results.append(True) - except Exception as e: # pylint: disable=broad-except - LOGGER.exception('Unable to DeleteEndpoint for Service({:s})'.format(str(service_uuid))) - results.append(e) - - return results - + def get_config_rules(controller): + try: + config_rules = controller.device_config.config_rules + for rule in config_rules: + if rule.HasField("custom"): + resource_key = rule.custom.resource_key + resource_value = rule.custom.resource_value + LOGGER.debug(f"Resource key in config: {resource_key}, Resource value in config: {resource_value}") + except Exception as e: + print(f"Error accessing config rules: {e}") + get_config_rules(src_controller) + LOGGER.debug(f"Configuration rules: {src_controller.device_config.config_rules}") + return results + except Exception as e: + LOGGER.error(f"Error in SetEndpoint: {e}") + return [e]