diff --git a/src/device/service/drivers/OpenFlow/OpenFlowDriver.py b/src/device/service/drivers/OpenFlow/OpenFlowDriver.py index 91a1d8da0994fdf5179cb172bdaf68fc74b260ab..070dc597d98f6dab7f2b4baab040bedbe9e83dfa 100644 --- a/src/device/service/drivers/OpenFlow/OpenFlowDriver.py +++ b/src/device/service/drivers/OpenFlow/OpenFlowDriver.py @@ -43,6 +43,8 @@ class OpenFlowDriver(_Driver): self.__base_url = '{:s}://{:s}:{:d}'.format(scheme, self.address, int(self.port)) self.__timeout = int(self.settings.get('timeout', 120)) self.tac = TfsApiClient(self.address, int(self.port), scheme=scheme, username=username, password=password) + self.__cookie_counter = 0 + self._priority_counter = 1 def Connect(self) -> bool: url = f"{self.__base_url}" @@ -89,120 +91,208 @@ class OpenFlowDriver(_Driver): results.append((resource_key, e)) return results - -# @metered_subclass_method(METRICS_POOL) -# def GetConfig(self, resource_keys: List[str] = []) -> List[Tuple[str, Union[Any, None, Exception]]]: -# chk_type('resources', resource_keys, list) -# results = [] -# with self.__lock: -# for key in resource_keys: -# try: -# if key.startswith('flows:'): -# dpid = key.split(':', 1)[1] -# flows = get_flows(self.__base_url, dpid, auth=self.__auth, timeout=self.__timeout) -# results.append((key, flows)) -# elif key.startswith('description:'): -# dpid = key.split(':', 1)[1] -# desc = get_desc(self.__base_url, dpid, auth=self.__auth, timeout=self.__timeout) -# results.append((key, desc)) -# elif key.startswith('switches'): -# switches = get_switches(self.__base_url, auth=self.__auth, timeout=self.__timeout) -# results.append((key, switches)) -# elif key.startswith('port_description:'): -# dpid = key.split(':', 1)[1] -# desc = get_port_desc(self.__base_url,dpid, auth=self.__auth, timeout=self.__timeout) -# results.append((key, desc)) -# elif key.startswith('switch_info'): -# sin = get_switches_information(self.__base_url, auth=self.__auth, timeout=self.__timeout) -# results.append((key, sin)) -# elif key.startswith('links_info'): -# lin = get_links_information(self.__base_url, auth=self.__auth, timeout=self.__timeout) -# results.append((key, lin)) -# else: -# results.append((key, None)) # If key not handled, append None -# except Exception as e: -# results.append((key, e)) -# return results -# -# @metered_subclass_method(METRICS_POOL) -# def DeleteConfig(self, resource_keys: List[str] = []) -> List[Tuple[str, Union[Any, None, Exception]]]: -# chk_type('resources', resource_keys, list) -# results = [] -# with self.__lock: -# for item in resource_keys: -# try: -# if isinstance(item, tuple): -# key, data = item -# else: -# key, data = item, None -# if key.startswith('flowentry_delete:'): -# dpid = key.split(':', 1)[1] -# flows = del_flow_entry(self.__base_url, dpid, auth=self.__auth, timeout=self.__timeout) -# results.append((key, flows)) -# elif key=='flow_data' and data: -# flow_del = delete_flow (self.__base_url,data,auth=self.__auth, timeout=self.__timeout) -# results.append((key, flow_del)) -# else: -# results.append((key, None)) -# except Exception as e: -# results.append((key, e)) -# return results -# @metered_subclass_method(METRICS_POOL) def SetConfig(self, resources: List[Tuple[str, Any]]) -> List[Union[bool, Exception]]: url = f"{self.__base_url}/stats/flowentry/add" results = [] LOGGER.info(f"SetConfig_resources: {resources}") + if not resources: return results + with self.__lock: for resource in resources: try: - resource_key, resource_value = resource - resource_value_dict = json.loads(resource_value) - dpid = int(resource_value_dict["dpid"], 16) - in_port = int(resource_value_dict["in-port"].split("-")[1][3:]) - out_port = int(resource_value_dict["out-port"].split("-")[1][3:]) - flow_entry = { - "dpid": dpid, - "cookie": 0, - "priority": 32768, - "match": { - "in_port": in_port - }, - "actions": [ - { - "type": "OUTPUT", - "port": out_port - } - ] - } - try: - response = requests.post(url,json=flow_entry,timeout=self.__timeout,verify=False,auth=self.__auth) - response.raise_for_status() - results.append(True) - LOGGER.info(f"Successfully posted flow entry: {flow_entry}") - except requests.exceptions.Timeout: - LOGGER.error(f"Timeout connecting to {url}") - results.append(Exception(f"Timeout connecting to {url}")) - except requests.exceptions.RequestException as e: - LOGGER.error(f"Error posting to {url}: {e}") - results.append(e) - else: - LOGGER.warning(f"Skipped invalid resource_key: {resource_key}") - results.append(Exception(f"Invalid resource_key: {resource_key}")) + resource_key, resource_value = resource + + if not resource_key.startswith("/device[") or not "/flow[" in resource_key: + LOGGER.error(f"Invalid resource_key format: {resource_key}") + results.append(Exception(f"Invalid resource_key format: {resource_key}")) + continue + + try: + resource_value_dict = json.loads(resource_value) + LOGGER.info(f"resource_value_dict: {resource_value_dict}") + dpid = int(resource_value_dict["dpid"], 16) + in_port = int(resource_value_dict["in-port"].split("-")[1][3:]) + out_port = int(resource_value_dict["out-port"].split("-")[1][3:]) + self.__cookie_counter += 1 + cookie = self.__cookie_counter + ip_address_source = resource_value_dict.get("ip_address_source", "") + ip_address_destination = resource_value_dict.get("ip_address_destination", "") + mac_address_source = resource_value_dict.get("mac_address_source", "") + mac_address_destination = resource_value_dict.get("mac_address_destination", "") + + # Default priority + #priority = 1000 + + if "h1-h3" in resource_key: + priority = 65535 + match_fields = { + "in_port": in_port, + "eth_type": 0x0800, + #"ipv4_src": ip_address_source , + #"ipv4_dst": ip_address_destination, + "eth_src": mac_address_source, + "dl_dst": mac_address_destination + } + elif "h3-h1" in resource_key: + priority = 65535 + match_fields = { + "in_port": in_port, + "eth_type": 0x0800, + #"ipv4_src": ip_address_source, + #"ipv4_dst": ip_address_destination, + "eth_src": mac_address_source, + "dl_dst": mac_address_destination + } + elif "h2-h4" in resource_key: + priority = 1500 + match_fields = { + "in_port": in_port, + "eth_type": 0x0800, + "ipv4_src": ip_address_source , + "ipv4_dst": ip_address_destination, + "eth_src": mac_address_source, + "eth_dst": mac_address_destination + } + elif "h4-h2" in resource_key: + priority = 1500 + match_fields = { + "in_port": in_port, + "eth_type": 0x0800, + "ipv4_src": ip_address_source, + "ipv4_dst": ip_address_destination, + "eth_src": mac_address_source, + "eth_dst": mac_address_destination + } + + except (KeyError, ValueError, IndexError) as e: + LOGGER.error(f"Invalid resource_value: {resource_value}, error: {e}") + results.append(Exception(f"Invalid resource_value: {resource_value}")) + continue + + LOGGER.debug(f"Flow match fields: {match_fields}") + + flow_entry = { + "dpid": dpid, + #cookie": 0, + "priority": priority, + "match": match_fields, + "instructions": [ + { + "type": "APPLY_ACTIONS", + "actions": [ + { + "max_len": 65535, + "type": "OUTPUT", + "port": out_port + }]}]} + + flow_entry_arp_foraward = { + "dpid": dpid, + "priority": 65535, + "match": { + "eth_dst": "ff:ff:ff:ff:ff:ff", + "eth_type": 0x0806 + }, + "instructions": [ + { + "type": "APPLY_ACTIONS", + "actions": [ + { + "type": "OUTPUT", + "port": "0xfffffffb" + } + ]}]} + flow_entry_arp_reply = { + "dpid": dpid, + "priority": 65535, + "match": { + "eth_type": 0x0806, + "arp_op": 2 + }, + "instructions": [ + { + "type": "APPLY_ACTIONS", + "actions": [ + { + "type": "OUTPUT", + "port": "0xfffffffb" + } + ]}]} + + try: + response = requests.post(url, json=flow_entry_arp_foraward, timeout=self.__timeout, verify=False, auth=self.__auth) + response = requests.post(url, json=flow_entry_arp_reply, timeout=self.__timeout, verify=False, auth=self.__auth) + response = requests.post(url, json=flow_entry, timeout=self.__timeout, verify=False, auth=self.__auth) + response.raise_for_status() + results.append(True) + LOGGER.info(f"Successfully posted flow entry: {flow_entry}") + except requests.exceptions.Timeout: + LOGGER.error(f"Timeout connecting to {url}") + results.append(Exception(f"Timeout connecting to {url}")) + except requests.exceptions.RequestException as e: + LOGGER.error(f"Error posting flow entry {flow_entry} to {url}: {e}") + results.append(e) except Exception as e: LOGGER.error(f"Error processing resource {resource_key}: {e}", exc_info=True) results.append(e) return results + + + #@metered_subclass_method(METRICS_POOL) + #def DeleteConfig(self, resources: List[Tuple[str, Any]]) -> List[Union[bool, Exception]]: + # LOGGER.info(f"DeleteConfig123_resources: {resources}") + # url = f"{self.__base_url}/stats/flowentry/delete_strict" + # results = [] + # if not resources: + # return results + # with self.__lock: + # for resource in resources: + # try: + # resource_key, resource_value = resource + # if not resource_key.startswith("/device[") or not "/flow[" in resource_key: + # LOGGER.error(f"Invalid resource_key format: {resource_key}") + # results.append(Exception(f"Invalid resource_key format: {resource_key}")) + # continue + # try: + # resource_value_dict = json.loads(resource_value) + # dpid = int(resource_value_dict["dpid"], 16) + # in_port = int(resource_value_dict["in-port"].split("-")[1][3:]) + # out_port = int(resource_value_dict["out-port"].split("-")[1][3:]) + # except (KeyError, ValueError, IndexError) as e: + # LOGGER.error(f"Invalid resource_value: {resource_value}, error: {e}") + # results.append(Exception(f"Invalid resource_value: {resource_value}")) + # continue + # flow_entry = { + # "dpid": dpid, + # "table_id": 0, + # "priority": 11111, + # "match": {"in_port": in_port}, + # } + # try: + # response = requests.post(url, json=flow_entry, timeout=self.__timeout, verify=False, auth=self.__auth) + # response.raise_for_status() + # results.append(True) + # LOGGER.info(f"Successfully posted flow entry: {flow_entry}") + # except requests.exceptions.Timeout: + # LOGGER.error(f"Timeout connecting to {url}") + # results.append(Exception(f"Timeout connecting to {url}")) + # except requests.exceptions.RequestException as e: + # LOGGER.error(f"Error posting flow entry {flow_entry} to {url}: {e}") + # results.append(e) + # except Exception as e: + # LOGGER.error(f"Error processing resource {resource_key}: {e}", exc_info=True) + # results.append(e) + # return results # # -# -# @metered_subclass_method(METRICS_POOL) -# def SubscribeState(self, subscriptions : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]: -# # TODO: TAPI does not support monitoring by now +## @metered_subclass_method(METRICS_POOL) +## def SubscribeState(self, subscriptions : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]: +## # TODO: TAPI does not support monitoring by now # return [False for _ in subscriptions] # # @metered_subclass_method(METRICS_POOL) diff --git a/src/service/service/service_handlers/l3nm_ryu/L3NMryuServiceHandler.py b/src/service/service/service_handlers/l3nm_ryu/L3NMryuServiceHandler.py index 29e73c42eca36697e311930ea4f32fdacea3dd3d..493ddd60ebf241f551a5a887b983ecaa417f15b8 100644 --- a/src/service/service/service_handlers/l3nm_ryu/L3NMryuServiceHandler.py +++ b/src/service/service/service_handlers/l3nm_ryu/L3NMryuServiceHandler.py @@ -17,6 +17,7 @@ from venv import logger from re import L from typing import Any, Dict, List, Optional, Tuple, Union +from py import log from pytest import skip from common.method_wrappers.Decorator import MetricsPool, metered_subclass_method from common.proto.context_pb2 import ConfigRule, Device, DeviceId, EndPoint, Service,ConfigRule_Custom,ConfigActionEnum @@ -30,6 +31,7 @@ from service.service.service_handler_api._ServiceHandler import _ServiceHandler from service.service.service_handler_api.SettingsHandler import SettingsHandler from service.service.task_scheduler.TaskExecutor import TaskExecutor import requests +import re logging.basicConfig(level=logging.DEBUG) LOGGER = logging.getLogger(__name__) @@ -80,6 +82,23 @@ class RYUServiceHandler(_ServiceHandler): return [] service_uuid = self.__service.service_id.service_uuid.uuid service_name= self.__service.name + service_configuration_rules=self.__service.service_config.config_rules + LOGGER.debug('service_configuration_rules = {:s}'.format(str(service_configuration_rules))) + ip_addresses = [] + mac_addresses = [] + for rule in service_configuration_rules: + try: + custom_field = rule.custom + resource_value_str = custom_field.resource_value + resource_value = json.loads(resource_value_str) + ip_address = resource_value.get("address_ip") + mac_address = resource_value.get("mac_address") + ip_addresses.append(ip_address) + mac_addresses.append(mac_address) + except Exception as e: + print(f"Error parsing rule: {e}, Rule: {rule}") + LOGGER.debug('ip_address = {:s}'.format(str(ip_addresses))) + LOGGER.debug('mac_address = {:s}'.format(str(mac_addresses))) LOGGER.debug('service_name = {:s}'.format(str(service_name))) #LOGGER.debug('service_uuid = {:s}'.format(str(service_uuid))) #LOGGER.debug('self.__settings_handler = {:s}'.format(str(self.__settings_handler.dump_config_rules()))) @@ -88,6 +107,7 @@ class RYUServiceHandler(_ServiceHandler): src_device, src_endpoint, = self._get_endpoint_details(endpoints[0]) dst_device, dst_endpoint, = self._get_endpoint_details(endpoints[-1]) src_controller = self.__task_executor.get_device_controller(src_device) + #LOGGER.debug(f"Source controller: {src_controller.device_config.config_rules}") del src_controller.device_config.config_rules[:] for index in range(len(endpoints) - 1): @@ -99,9 +119,24 @@ class RYUServiceHandler(_ServiceHandler): in_port_forward = current_endpoint.name out_port_forward = next_endpoint.name flow_split = service_name.split('-') + dpid_src = int(current_device.name) + LOGGER.debug(f"DPID source: {dpid_src}") + dpid_dst = int(next_device.name) + LOGGER.debug(f"DPID destination: {dpid_dst}") flow_rule_forward = f"{flow_split[0]}-{flow_split[2]}" flow_rule_reverse = f"{flow_split[2]}-{flow_split[0]}" - forward_resource_value = ({"dpid": current_device.name, "in-port": in_port_forward, "out-port": out_port_forward}) + ip_address_source = ip_addresses[0] + ip_address_destination = ip_addresses[1] + mac_address_source = mac_addresses[0] + mac_address_destination = mac_addresses[1] + forward_resource_value = ({"dpid": current_device.name, + "in-port": in_port_forward, + "out-port": out_port_forward, + "ip_address_source": ip_address_source, + "ip_address_destination": ip_address_destination, + "mac_address_source": mac_address_source, + "mac_address_destination": mac_address_destination + }) forward_rule = json_config_rule_set ( resource_key=f"/device[{current_endpoint.name.split('-')[0]}]/flow[{flow_rule_forward}]", resource_value=forward_resource_value @@ -109,8 +144,16 @@ class RYUServiceHandler(_ServiceHandler): LOGGER.debug(f"Forward configuration rule: {forward_rule}") src_controller.device_config.config_rules.append(ConfigRule(**forward_rule)) in_port_reverse = next_endpoint.name - out_port_reverse = current_endpoint.name - reverse_resource_value = ({"dpid": current_device.name, "in-port": in_port_reverse, "out-port": out_port_reverse}) + out_port_reverse = current_endpoint.name + reverse_resource_value = { + "dpid": current_device.name, + "in-port": in_port_reverse, + "out-port": out_port_reverse, + "ip_address_source": ip_address_destination, + "ip_address_destination": ip_address_source, + "mac_address_source": mac_address_destination, + "mac_address_destination": mac_address_source + } reverse_rule = json_config_rule_set( resource_key=f"/device[{current_endpoint.name.split('-')[0]}]/flow[{flow_rule_reverse}]", resource_value=reverse_resource_value @@ -137,5 +180,42 @@ class RYUServiceHandler(_ServiceHandler): except Exception as e: LOGGER.error(f"Error in SetEndpoint: {e}") return [e] + + @metered_subclass_method(METRICS_POOL) + def DeleteEndpoint( + self, endpoints : List[Tuple[str, str, Optional[str]]], connection_uuid : Optional[str] = None + ) -> List[Union[bool, Exception]]: + LOGGER.debug('endpoints = {:s}'.format(str(endpoints))) + chk_type('endpoints', endpoints, list) + if len(endpoints) < 2: return [] + service_uuid = self.__service.service_id.service_uuid.uuid + service_name= self.__service.name + results = [] + try: + src_device_uuid, _ = get_device_endpoint_uuids(endpoints[0]) + src_device = self.__task_executor.get_device(DeviceId(**json_device_id(src_device_uuid))) + src_controller = self.__task_executor.get_device_controller(src_device) + + dst_device_uuid, _ = get_device_endpoint_uuids(endpoints[1]) + dst_device = self.__task_executor.get_device(DeviceId(**json_device_id(dst_device_uuid))) + dst_controller = self.__task_executor.get_device_controller(dst_device) + + if src_controller.device_id.device_uuid.uuid != dst_controller.device_id.device_uuid.uuid: + raise Exception('Different Src-Dst devices not supported by now') + controller = src_controller + + json_config_rule = json_config_rule_delete('/services/service[{:s}]'.format(service_uuid), { + 'uuid': service_uuid + }) + del controller.device_config.config_rules[:] + controller.device_config.config_rules.append(ConfigRule(**json_config_rule)) + self.__task_executor.configure_device(controller) + results.append(True) + except Exception as e: # pylint: disable=broad-except + LOGGER.exception('Unable to DeleteEndpoint for Service({:s})'.format(str(service_uuid))) + results.append(e) + + return results + + -