Skip to content
Snippets Groups Projects
Commit ba897132 authored by Mohamad Rahhal's avatar Mohamad Rahhal
Browse files

Device-Service component - Ryu - Driver - Service Handler:

- Added Connectivity Service
parent e21ed3e1
No related branches found
No related tags found
2 merge requests!359Release TeraFlowSDN 5.0,!296Resolve "(CTTC) Add OpenFlow support through Ryu SDN controller"
...@@ -43,6 +43,8 @@ class OpenFlowDriver(_Driver): ...@@ -43,6 +43,8 @@ class OpenFlowDriver(_Driver):
self.__base_url = '{:s}://{:s}:{:d}'.format(scheme, self.address, int(self.port)) self.__base_url = '{:s}://{:s}:{:d}'.format(scheme, self.address, int(self.port))
self.__timeout = int(self.settings.get('timeout', 120)) self.__timeout = int(self.settings.get('timeout', 120))
self.tac = TfsApiClient(self.address, int(self.port), scheme=scheme, username=username, password=password) self.tac = TfsApiClient(self.address, int(self.port), scheme=scheme, username=username, password=password)
self.__cookie_counter = 0
self._priority_counter = 1
def Connect(self) -> bool: def Connect(self) -> bool:
url = f"{self.__base_url}" url = f"{self.__base_url}"
...@@ -89,120 +91,208 @@ class OpenFlowDriver(_Driver): ...@@ -89,120 +91,208 @@ class OpenFlowDriver(_Driver):
results.append((resource_key, e)) results.append((resource_key, e))
return results return results
# @metered_subclass_method(METRICS_POOL)
# def GetConfig(self, resource_keys: List[str] = []) -> List[Tuple[str, Union[Any, None, Exception]]]:
# chk_type('resources', resource_keys, list)
# results = []
# with self.__lock:
# for key in resource_keys:
# try:
# if key.startswith('flows:'):
# dpid = key.split(':', 1)[1]
# flows = get_flows(self.__base_url, dpid, auth=self.__auth, timeout=self.__timeout)
# results.append((key, flows))
# elif key.startswith('description:'):
# dpid = key.split(':', 1)[1]
# desc = get_desc(self.__base_url, dpid, auth=self.__auth, timeout=self.__timeout)
# results.append((key, desc))
# elif key.startswith('switches'):
# switches = get_switches(self.__base_url, auth=self.__auth, timeout=self.__timeout)
# results.append((key, switches))
# elif key.startswith('port_description:'):
# dpid = key.split(':', 1)[1]
# desc = get_port_desc(self.__base_url,dpid, auth=self.__auth, timeout=self.__timeout)
# results.append((key, desc))
# elif key.startswith('switch_info'):
# sin = get_switches_information(self.__base_url, auth=self.__auth, timeout=self.__timeout)
# results.append((key, sin))
# elif key.startswith('links_info'):
# lin = get_links_information(self.__base_url, auth=self.__auth, timeout=self.__timeout)
# results.append((key, lin))
# else:
# results.append((key, None)) # If key not handled, append None
# except Exception as e:
# results.append((key, e))
# return results
#
# @metered_subclass_method(METRICS_POOL)
# def DeleteConfig(self, resource_keys: List[str] = []) -> List[Tuple[str, Union[Any, None, Exception]]]:
# chk_type('resources', resource_keys, list)
# results = []
# with self.__lock:
# for item in resource_keys:
# try:
# if isinstance(item, tuple):
# key, data = item
# else:
# key, data = item, None
# if key.startswith('flowentry_delete:'):
# dpid = key.split(':', 1)[1]
# flows = del_flow_entry(self.__base_url, dpid, auth=self.__auth, timeout=self.__timeout)
# results.append((key, flows))
# elif key=='flow_data' and data:
# flow_del = delete_flow (self.__base_url,data,auth=self.__auth, timeout=self.__timeout)
# results.append((key, flow_del))
# else:
# results.append((key, None))
# except Exception as e:
# results.append((key, e))
# return results
#
@metered_subclass_method(METRICS_POOL) @metered_subclass_method(METRICS_POOL)
def SetConfig(self, resources: List[Tuple[str, Any]]) -> List[Union[bool, Exception]]: def SetConfig(self, resources: List[Tuple[str, Any]]) -> List[Union[bool, Exception]]:
url = f"{self.__base_url}/stats/flowentry/add" url = f"{self.__base_url}/stats/flowentry/add"
results = [] results = []
LOGGER.info(f"SetConfig_resources: {resources}") LOGGER.info(f"SetConfig_resources: {resources}")
if not resources: if not resources:
return results return results
with self.__lock: with self.__lock:
for resource in resources: for resource in resources:
try: try:
resource_key, resource_value = resource resource_key, resource_value = resource
resource_value_dict = json.loads(resource_value)
dpid = int(resource_value_dict["dpid"], 16) if not resource_key.startswith("/device[") or not "/flow[" in resource_key:
in_port = int(resource_value_dict["in-port"].split("-")[1][3:]) LOGGER.error(f"Invalid resource_key format: {resource_key}")
out_port = int(resource_value_dict["out-port"].split("-")[1][3:]) results.append(Exception(f"Invalid resource_key format: {resource_key}"))
flow_entry = { continue
"dpid": dpid,
"cookie": 0, try:
"priority": 32768, resource_value_dict = json.loads(resource_value)
"match": { LOGGER.info(f"resource_value_dict: {resource_value_dict}")
"in_port": in_port dpid = int(resource_value_dict["dpid"], 16)
}, in_port = int(resource_value_dict["in-port"].split("-")[1][3:])
"actions": [ out_port = int(resource_value_dict["out-port"].split("-")[1][3:])
{ self.__cookie_counter += 1
"type": "OUTPUT", cookie = self.__cookie_counter
"port": out_port ip_address_source = resource_value_dict.get("ip_address_source", "")
} ip_address_destination = resource_value_dict.get("ip_address_destination", "")
] mac_address_source = resource_value_dict.get("mac_address_source", "")
} mac_address_destination = resource_value_dict.get("mac_address_destination", "")
try:
response = requests.post(url,json=flow_entry,timeout=self.__timeout,verify=False,auth=self.__auth) # Default priority
response.raise_for_status() #priority = 1000
results.append(True)
LOGGER.info(f"Successfully posted flow entry: {flow_entry}") if "h1-h3" in resource_key:
except requests.exceptions.Timeout: priority = 65535
LOGGER.error(f"Timeout connecting to {url}") match_fields = {
results.append(Exception(f"Timeout connecting to {url}")) "in_port": in_port,
except requests.exceptions.RequestException as e: "eth_type": 0x0800,
LOGGER.error(f"Error posting to {url}: {e}") #"ipv4_src": ip_address_source ,
results.append(e) #"ipv4_dst": ip_address_destination,
else: "eth_src": mac_address_source,
LOGGER.warning(f"Skipped invalid resource_key: {resource_key}") "dl_dst": mac_address_destination
results.append(Exception(f"Invalid resource_key: {resource_key}")) }
elif "h3-h1" in resource_key:
priority = 65535
match_fields = {
"in_port": in_port,
"eth_type": 0x0800,
#"ipv4_src": ip_address_source,
#"ipv4_dst": ip_address_destination,
"eth_src": mac_address_source,
"dl_dst": mac_address_destination
}
elif "h2-h4" in resource_key:
priority = 1500
match_fields = {
"in_port": in_port,
"eth_type": 0x0800,
"ipv4_src": ip_address_source ,
"ipv4_dst": ip_address_destination,
"eth_src": mac_address_source,
"eth_dst": mac_address_destination
}
elif "h4-h2" in resource_key:
priority = 1500
match_fields = {
"in_port": in_port,
"eth_type": 0x0800,
"ipv4_src": ip_address_source,
"ipv4_dst": ip_address_destination,
"eth_src": mac_address_source,
"eth_dst": mac_address_destination
}
except (KeyError, ValueError, IndexError) as e:
LOGGER.error(f"Invalid resource_value: {resource_value}, error: {e}")
results.append(Exception(f"Invalid resource_value: {resource_value}"))
continue
LOGGER.debug(f"Flow match fields: {match_fields}")
flow_entry = {
"dpid": dpid,
#cookie": 0,
"priority": priority,
"match": match_fields,
"instructions": [
{
"type": "APPLY_ACTIONS",
"actions": [
{
"max_len": 65535,
"type": "OUTPUT",
"port": out_port
}]}]}
flow_entry_arp_foraward = {
"dpid": dpid,
"priority": 65535,
"match": {
"eth_dst": "ff:ff:ff:ff:ff:ff",
"eth_type": 0x0806
},
"instructions": [
{
"type": "APPLY_ACTIONS",
"actions": [
{
"type": "OUTPUT",
"port": "0xfffffffb"
}
]}]}
flow_entry_arp_reply = {
"dpid": dpid,
"priority": 65535,
"match": {
"eth_type": 0x0806,
"arp_op": 2
},
"instructions": [
{
"type": "APPLY_ACTIONS",
"actions": [
{
"type": "OUTPUT",
"port": "0xfffffffb"
}
]}]}
try:
response = requests.post(url, json=flow_entry_arp_foraward, timeout=self.__timeout, verify=False, auth=self.__auth)
response = requests.post(url, json=flow_entry_arp_reply, timeout=self.__timeout, verify=False, auth=self.__auth)
response = requests.post(url, json=flow_entry, timeout=self.__timeout, verify=False, auth=self.__auth)
response.raise_for_status()
results.append(True)
LOGGER.info(f"Successfully posted flow entry: {flow_entry}")
except requests.exceptions.Timeout:
LOGGER.error(f"Timeout connecting to {url}")
results.append(Exception(f"Timeout connecting to {url}"))
except requests.exceptions.RequestException as e:
LOGGER.error(f"Error posting flow entry {flow_entry} to {url}: {e}")
results.append(e)
except Exception as e: except Exception as e:
LOGGER.error(f"Error processing resource {resource_key}: {e}", exc_info=True) LOGGER.error(f"Error processing resource {resource_key}: {e}", exc_info=True)
results.append(e) results.append(e)
return results return results
#@metered_subclass_method(METRICS_POOL)
#def DeleteConfig(self, resources: List[Tuple[str, Any]]) -> List[Union[bool, Exception]]:
# LOGGER.info(f"DeleteConfig123_resources: {resources}")
# url = f"{self.__base_url}/stats/flowentry/delete_strict"
# results = []
# if not resources:
# return results
# with self.__lock:
# for resource in resources:
# try:
# resource_key, resource_value = resource
# if not resource_key.startswith("/device[") or not "/flow[" in resource_key:
# LOGGER.error(f"Invalid resource_key format: {resource_key}")
# results.append(Exception(f"Invalid resource_key format: {resource_key}"))
# continue
# try:
# resource_value_dict = json.loads(resource_value)
# dpid = int(resource_value_dict["dpid"], 16)
# in_port = int(resource_value_dict["in-port"].split("-")[1][3:])
# out_port = int(resource_value_dict["out-port"].split("-")[1][3:])
# except (KeyError, ValueError, IndexError) as e:
# LOGGER.error(f"Invalid resource_value: {resource_value}, error: {e}")
# results.append(Exception(f"Invalid resource_value: {resource_value}"))
# continue
# flow_entry = {
# "dpid": dpid,
# "table_id": 0,
# "priority": 11111,
# "match": {"in_port": in_port},
# }
# try:
# response = requests.post(url, json=flow_entry, timeout=self.__timeout, verify=False, auth=self.__auth)
# response.raise_for_status()
# results.append(True)
# LOGGER.info(f"Successfully posted flow entry: {flow_entry}")
# except requests.exceptions.Timeout:
# LOGGER.error(f"Timeout connecting to {url}")
# results.append(Exception(f"Timeout connecting to {url}"))
# except requests.exceptions.RequestException as e:
# LOGGER.error(f"Error posting flow entry {flow_entry} to {url}: {e}")
# results.append(e)
# except Exception as e:
# LOGGER.error(f"Error processing resource {resource_key}: {e}", exc_info=True)
# results.append(e)
# return results
# #
# #
# ## @metered_subclass_method(METRICS_POOL)
# @metered_subclass_method(METRICS_POOL) ## def SubscribeState(self, subscriptions : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]:
# def SubscribeState(self, subscriptions : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]: ## # TODO: TAPI does not support monitoring by now
# # TODO: TAPI does not support monitoring by now
# return [False for _ in subscriptions] # return [False for _ in subscriptions]
# #
# @metered_subclass_method(METRICS_POOL) # @metered_subclass_method(METRICS_POOL)
......
...@@ -17,6 +17,7 @@ from venv import logger ...@@ -17,6 +17,7 @@ from venv import logger
from re import L from re import L
from typing import Any, Dict, List, Optional, Tuple, Union from typing import Any, Dict, List, Optional, Tuple, Union
from py import log
from pytest import skip from pytest import skip
from common.method_wrappers.Decorator import MetricsPool, metered_subclass_method from common.method_wrappers.Decorator import MetricsPool, metered_subclass_method
from common.proto.context_pb2 import ConfigRule, Device, DeviceId, EndPoint, Service,ConfigRule_Custom,ConfigActionEnum from common.proto.context_pb2 import ConfigRule, Device, DeviceId, EndPoint, Service,ConfigRule_Custom,ConfigActionEnum
...@@ -30,6 +31,7 @@ from service.service.service_handler_api._ServiceHandler import _ServiceHandler ...@@ -30,6 +31,7 @@ from service.service.service_handler_api._ServiceHandler import _ServiceHandler
from service.service.service_handler_api.SettingsHandler import SettingsHandler from service.service.service_handler_api.SettingsHandler import SettingsHandler
from service.service.task_scheduler.TaskExecutor import TaskExecutor from service.service.task_scheduler.TaskExecutor import TaskExecutor
import requests import requests
import re
logging.basicConfig(level=logging.DEBUG) logging.basicConfig(level=logging.DEBUG)
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
...@@ -80,6 +82,23 @@ class RYUServiceHandler(_ServiceHandler): ...@@ -80,6 +82,23 @@ class RYUServiceHandler(_ServiceHandler):
return [] return []
service_uuid = self.__service.service_id.service_uuid.uuid service_uuid = self.__service.service_id.service_uuid.uuid
service_name= self.__service.name service_name= self.__service.name
service_configuration_rules=self.__service.service_config.config_rules
LOGGER.debug('service_configuration_rules = {:s}'.format(str(service_configuration_rules)))
ip_addresses = []
mac_addresses = []
for rule in service_configuration_rules:
try:
custom_field = rule.custom
resource_value_str = custom_field.resource_value
resource_value = json.loads(resource_value_str)
ip_address = resource_value.get("address_ip")
mac_address = resource_value.get("mac_address")
ip_addresses.append(ip_address)
mac_addresses.append(mac_address)
except Exception as e:
print(f"Error parsing rule: {e}, Rule: {rule}")
LOGGER.debug('ip_address = {:s}'.format(str(ip_addresses)))
LOGGER.debug('mac_address = {:s}'.format(str(mac_addresses)))
LOGGER.debug('service_name = {:s}'.format(str(service_name))) LOGGER.debug('service_name = {:s}'.format(str(service_name)))
#LOGGER.debug('service_uuid = {:s}'.format(str(service_uuid))) #LOGGER.debug('service_uuid = {:s}'.format(str(service_uuid)))
#LOGGER.debug('self.__settings_handler = {:s}'.format(str(self.__settings_handler.dump_config_rules()))) #LOGGER.debug('self.__settings_handler = {:s}'.format(str(self.__settings_handler.dump_config_rules())))
...@@ -88,6 +107,7 @@ class RYUServiceHandler(_ServiceHandler): ...@@ -88,6 +107,7 @@ class RYUServiceHandler(_ServiceHandler):
src_device, src_endpoint, = self._get_endpoint_details(endpoints[0]) src_device, src_endpoint, = self._get_endpoint_details(endpoints[0])
dst_device, dst_endpoint, = self._get_endpoint_details(endpoints[-1]) dst_device, dst_endpoint, = self._get_endpoint_details(endpoints[-1])
src_controller = self.__task_executor.get_device_controller(src_device) src_controller = self.__task_executor.get_device_controller(src_device)
#LOGGER.debug(f"Source controller: {src_controller.device_config.config_rules}")
del src_controller.device_config.config_rules[:] del src_controller.device_config.config_rules[:]
for index in range(len(endpoints) - 1): for index in range(len(endpoints) - 1):
...@@ -99,9 +119,24 @@ class RYUServiceHandler(_ServiceHandler): ...@@ -99,9 +119,24 @@ class RYUServiceHandler(_ServiceHandler):
in_port_forward = current_endpoint.name in_port_forward = current_endpoint.name
out_port_forward = next_endpoint.name out_port_forward = next_endpoint.name
flow_split = service_name.split('-') flow_split = service_name.split('-')
dpid_src = int(current_device.name)
LOGGER.debug(f"DPID source: {dpid_src}")
dpid_dst = int(next_device.name)
LOGGER.debug(f"DPID destination: {dpid_dst}")
flow_rule_forward = f"{flow_split[0]}-{flow_split[2]}" flow_rule_forward = f"{flow_split[0]}-{flow_split[2]}"
flow_rule_reverse = f"{flow_split[2]}-{flow_split[0]}" flow_rule_reverse = f"{flow_split[2]}-{flow_split[0]}"
forward_resource_value = ({"dpid": current_device.name, "in-port": in_port_forward, "out-port": out_port_forward}) ip_address_source = ip_addresses[0]
ip_address_destination = ip_addresses[1]
mac_address_source = mac_addresses[0]
mac_address_destination = mac_addresses[1]
forward_resource_value = ({"dpid": current_device.name,
"in-port": in_port_forward,
"out-port": out_port_forward,
"ip_address_source": ip_address_source,
"ip_address_destination": ip_address_destination,
"mac_address_source": mac_address_source,
"mac_address_destination": mac_address_destination
})
forward_rule = json_config_rule_set ( forward_rule = json_config_rule_set (
resource_key=f"/device[{current_endpoint.name.split('-')[0]}]/flow[{flow_rule_forward}]", resource_key=f"/device[{current_endpoint.name.split('-')[0]}]/flow[{flow_rule_forward}]",
resource_value=forward_resource_value resource_value=forward_resource_value
...@@ -109,8 +144,16 @@ class RYUServiceHandler(_ServiceHandler): ...@@ -109,8 +144,16 @@ class RYUServiceHandler(_ServiceHandler):
LOGGER.debug(f"Forward configuration rule: {forward_rule}") LOGGER.debug(f"Forward configuration rule: {forward_rule}")
src_controller.device_config.config_rules.append(ConfigRule(**forward_rule)) src_controller.device_config.config_rules.append(ConfigRule(**forward_rule))
in_port_reverse = next_endpoint.name in_port_reverse = next_endpoint.name
out_port_reverse = current_endpoint.name out_port_reverse = current_endpoint.name
reverse_resource_value = ({"dpid": current_device.name, "in-port": in_port_reverse, "out-port": out_port_reverse}) reverse_resource_value = {
"dpid": current_device.name,
"in-port": in_port_reverse,
"out-port": out_port_reverse,
"ip_address_source": ip_address_destination,
"ip_address_destination": ip_address_source,
"mac_address_source": mac_address_destination,
"mac_address_destination": mac_address_source
}
reverse_rule = json_config_rule_set( reverse_rule = json_config_rule_set(
resource_key=f"/device[{current_endpoint.name.split('-')[0]}]/flow[{flow_rule_reverse}]", resource_key=f"/device[{current_endpoint.name.split('-')[0]}]/flow[{flow_rule_reverse}]",
resource_value=reverse_resource_value resource_value=reverse_resource_value
...@@ -137,5 +180,42 @@ class RYUServiceHandler(_ServiceHandler): ...@@ -137,5 +180,42 @@ class RYUServiceHandler(_ServiceHandler):
except Exception as e: except Exception as e:
LOGGER.error(f"Error in SetEndpoint: {e}") LOGGER.error(f"Error in SetEndpoint: {e}")
return [e] return [e]
@metered_subclass_method(METRICS_POOL)
def DeleteEndpoint(
self, endpoints : List[Tuple[str, str, Optional[str]]], connection_uuid : Optional[str] = None
) -> List[Union[bool, Exception]]:
LOGGER.debug('endpoints = {:s}'.format(str(endpoints)))
chk_type('endpoints', endpoints, list)
if len(endpoints) < 2: return []
service_uuid = self.__service.service_id.service_uuid.uuid
service_name= self.__service.name
results = []
try:
src_device_uuid, _ = get_device_endpoint_uuids(endpoints[0])
src_device = self.__task_executor.get_device(DeviceId(**json_device_id(src_device_uuid)))
src_controller = self.__task_executor.get_device_controller(src_device)
dst_device_uuid, _ = get_device_endpoint_uuids(endpoints[1])
dst_device = self.__task_executor.get_device(DeviceId(**json_device_id(dst_device_uuid)))
dst_controller = self.__task_executor.get_device_controller(dst_device)
if src_controller.device_id.device_uuid.uuid != dst_controller.device_id.device_uuid.uuid:
raise Exception('Different Src-Dst devices not supported by now')
controller = src_controller
json_config_rule = json_config_rule_delete('/services/service[{:s}]'.format(service_uuid), {
'uuid': service_uuid
})
del controller.device_config.config_rules[:]
controller.device_config.config_rules.append(ConfigRule(**json_config_rule))
self.__task_executor.configure_device(controller)
results.append(True)
except Exception as e: # pylint: disable=broad-except
LOGGER.exception('Unable to DeleteEndpoint for Service({:s})'.format(str(service_uuid)))
results.append(e)
return results
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment