Commit 4a934dbe authored by Mohamad Rahhal

Device-Service component - Ryu - Driver - Service Handler:

- Completed Driver and Service
parent ba897132
2 merge requests: !359 Release TeraFlowSDN 5.0, !296 Resolve "(CTTC) Add OpenFlow support through Ryu SDN controller"
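For context, a minimal sketch (not part of this commit) of the kind of flow-entry payload the OpenFlowDriver builds and posts to Ryu's ofctl_rest REST API. The controller URL, dpid, ports and addresses below are illustrative placeholders; /stats/flowentry/add is assumed from Ryu's REST API, while /stats/flowentry/delete_strict is the endpoint used in the diff below.

# Sketch only (not part of the commit): shape of an ofctl_rest flow-entry JSON.
import requests

RYU_BASE_URL = "http://127.0.0.1:8080"    # assumption: default ofctl_rest listen address

flow_entry = {
    "dpid": 1,                            # datapath id of the target switch
    "priority": 65535,
    "match": {
        "in_port": 1,
        "eth_type": 0x0800,               # IPv4 traffic
        "ipv4_src": "10.0.0.1",
        "ipv4_dst": "10.0.0.3",
    },
    "instructions": [
        {"type": "APPLY_ACTIONS",
         "actions": [{"type": "OUTPUT", "port": 2}]},
    ],
}

# POST to /stats/flowentry/add installs the entry; /stats/flowentry/delete_strict
# (used by DeleteConfig below) removes the entry whose priority and match fields
# are strictly identical.
response = requests.post(f"{RYU_BASE_URL}/stats/flowentry/add", json=flow_entry, timeout=10)
response.raise_for_status()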
@@ -123,16 +123,13 @@ class OpenFlowDriver(_Driver):
                        mac_address_source = resource_value_dict.get("mac_address_source", "")
                        mac_address_destination = resource_value_dict.get("mac_address_destination", "")
                        if "h1-h3" in resource_key:
                            priority = 65535
                            match_fields = {
                                "in_port": in_port,
                                "eth_type": 0x0800,
                                "ipv4_src": ip_address_source,
                                "ipv4_dst": ip_address_destination,
                                "eth_src": mac_address_source,
                                "dl_dst": mac_address_destination
                            }
@@ -141,8 +138,8 @@ class OpenFlowDriver(_Driver):
                            match_fields = {
                                "in_port": in_port,
                                "eth_type": 0x0800,
                                "ipv4_src": ip_address_source,
                                "ipv4_dst": ip_address_destination,
                                "eth_src": mac_address_source,
                                "dl_dst": mac_address_destination
                            }
@@ -177,7 +174,6 @@ class OpenFlowDriver(_Driver):
                    flow_entry = {
                        "dpid": dpid,
                        "priority": priority,
                        "match": match_fields,
                        "instructions": [
@@ -243,53 +239,163 @@ class OpenFlowDriver(_Driver):
    @metered_subclass_method(METRICS_POOL)
    def DeleteConfig(self, resources: List[Tuple[str, Any]]) -> List[Union[bool, Exception]]:
        LOGGER.info(f"DeleteConfig called for resources: {resources}")
        url = f"{self.__base_url}/stats/flowentry/delete_strict"
        results = []
        if not resources:
            return results
        with self.__lock:
            for resource in resources:
                try:
                    resource_key, resource_value = resource
                    if not resource_key.startswith("/device[") or not "/flow[" in resource_key:
                        LOGGER.error(f"Invalid resource_key format: {resource_key}")
                        results.append(Exception(f"Invalid resource_key format: {resource_key}"))
                        continue
                    try:
                        resource_value_dict = json.loads(resource_value)
                        LOGGER.info(f"resource_value_dict: {resource_value_dict}")
                        dpid = int(resource_value_dict["dpid"], 16)
                        in_port = int(resource_value_dict["in-port"].split("-")[1][3:])
                        out_port = int(resource_value_dict["out-port"].split("-")[1][3:])
                        self.__cookie_counter += 1
                        cookie = self.__cookie_counter
                        ip_address_source = resource_value_dict.get("ip_address_source", "")
                        ip_address_destination = resource_value_dict.get("ip_address_destination", "")
                        mac_address_source = resource_value_dict.get("mac_address_source", "")
                        mac_address_destination = resource_value_dict.get("mac_address_destination", "")
                        # Reconstruct the match fields of the flow entry to be removed.
                        if "h1-h3" in resource_key:
                            priority = 65535
                            match_fields = {
                                "in_port": in_port,
                                "eth_type": 0x0800,
                                "ipv4_src": ip_address_source,
                                "ipv4_dst": ip_address_destination,
                                "eth_src": mac_address_source,
                                "dl_dst": mac_address_destination,
                                "table_id": 0,
                                "cookie": 0,
                                "cookie_mask": 0,
                            }
                        elif "h3-h1" in resource_key:
                            priority = 65535
                            match_fields = {
                                "in_port": in_port,
                                "eth_type": 0x0800,
                                "ipv4_src": ip_address_source,
                                "ipv4_dst": ip_address_destination,
                                "eth_src": mac_address_source,
                                "dl_dst": mac_address_destination,
                                "table_id": 0,
                                "cookie": 0,
                                "cookie_mask": 0,
                            }
                        elif "h2-h4" in resource_key:
                            priority = 1500
                            match_fields = {
                                "in_port": in_port,
                                "eth_type": 0x0800,
                                "ipv4_src": ip_address_source,
                                "ipv4_dst": ip_address_destination,
                                "eth_src": mac_address_source,
                                "eth_dst": mac_address_destination,
                                "table_id": 0,
                                "cookie": 0,
                                "cookie_mask": 0,
                            }
                        elif "h4-h2" in resource_key:
                            priority = 1500
                            match_fields = {
                                "in_port": in_port,
                                "eth_type": 0x0800,
                                "ipv4_src": ip_address_source,
                                "ipv4_dst": ip_address_destination,
                                "eth_src": mac_address_source,
                                "eth_dst": mac_address_destination,
                                "table_id": 0,
                                "cookie": 0,
                                "cookie_mask": 0,
                            }
                    except (KeyError, ValueError, IndexError) as e:
                        LOGGER.error(f"Invalid resource_value: {resource_value}, error: {e}")
                        results.append(Exception(f"Invalid resource_value: {resource_value}"))
                        continue
                    LOGGER.debug(f"Flow match fields: {match_fields}")
                    flow_entry = {
                        "dpid": dpid,
                        # "cookie": 0,
                        "priority": priority,
                        "match": match_fields,
                        "instructions": [
                            {
                                "type": "APPLY_ACTIONS",
                                "actions": [
                                    {
                                        "max_len": 65535,
                                        "type": "OUTPUT",
                                        "port": out_port
                                    }
                                ]
                            }
                        ]
                    }
                    # ARP broadcast-forward and ARP-reply entries (port 0xfffffffb = OFPP_FLOOD),
                    # removed via delete_strict together with the per-service flow entry.
                    flow_entry_arp_forward = {
                        "dpid": dpid,
                        "priority": 65535,
                        "match": {
                            "eth_dst": "ff:ff:ff:ff:ff:ff",
                            "eth_type": 0x0806
                        },
                        "instructions": [
                            {
                                "type": "APPLY_ACTIONS",
                                "actions": [
                                    {
                                        "type": "OUTPUT",
                                        "port": "0xfffffffb"
                                    }
                                ]
                            }
                        ]
                    }
                    flow_entry_arp_reply = {
                        "dpid": dpid,
                        "priority": 65535,
                        "match": {
                            "eth_type": 0x0806,
                            "arp_op": 2
                        },
                        "instructions": [
                            {
                                "type": "APPLY_ACTIONS",
                                "actions": [
                                    {
                                        "type": "OUTPUT",
                                        "port": "0xfffffffb"
                                    }
                                ]
                            }
                        ]
                    }
                    try:
                        response = requests.post(url, json=flow_entry_arp_forward, timeout=self.__timeout, verify=False, auth=self.__auth)
                        response = requests.post(url, json=flow_entry_arp_reply, timeout=self.__timeout, verify=False, auth=self.__auth)
                        response = requests.post(url, json=flow_entry, timeout=self.__timeout, verify=False, auth=self.__auth)
                        response.raise_for_status()
                        results.append(True)
                        LOGGER.info(f"Successfully posted flow entry: {flow_entry}")
                    except requests.exceptions.Timeout:
                        LOGGER.error(f"Timeout connecting to {url}")
                        results.append(Exception(f"Timeout connecting to {url}"))
                    except requests.exceptions.RequestException as e:
                        LOGGER.error(f"Error posting flow entry {flow_entry} to {url}: {e}")
                        results.append(e)
                except Exception as e:
                    LOGGER.error(f"Error processing resource {resource_key}: {e}", exc_info=True)
                    results.append(e)
        return results
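A hedged usage sketch follows (values are illustrative, not taken from the commit): DeleteConfig receives (resource_key, resource_value) tuples, where resource_value is the JSON string built by the RYUServiceHandler for each direction of the flow.

# Usage sketch with illustrative values; the key/value layout mirrors the handler's rules.
import json

resources = [(
    "/device[1]/flow[h1-h3]",
    json.dumps({
        "dpid": "1",                               # parsed by the driver with int(value, 16)
        "in-port": "1-eth1",                       # port number taken from the text after 'eth'
        "out-port": "1-eth2",
        "ip_address_source": "10.0.0.1",
        "ip_address_destination": "10.0.0.3",
        "mac_address_source": "00:00:00:00:00:01",
        "mac_address_destination": "00:00:00:00:00:03",
    }),
)]

# driver = OpenFlowDriver(address, port)           # constructor arguments depend on the deployment
# results = driver.DeleteConfig(resources)         # -> [True] on success, or the raised Exception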
    ## @metered_subclass_method(METRICS_POOL)
    ## def SubscribeState(self, subscriptions : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]:
    ##     # TODO: TAPI does not support monitoring by now
@@ -16,13 +16,13 @@ import json, logging, netaddr
from venv import logger
from re import L
from typing import Any, Dict, List, Optional, Tuple, Union
from py import log
from pytest import skip
from common.method_wrappers.Decorator import MetricsPool, metered_subclass_method
from common.proto.context_pb2 import ConfigRule, Device, DeviceId, EndPoint, Service, ConfigRule_Custom, ConfigActionEnum
from common.tools.grpc.Tools import grpc_message_to_json_string
from common.tools.object_factory.ConfigRule import json_config_rule_delete, json_config_rule_set, json_config_rule
from common.tools.object_factory.Device import json_device_id
from common.type_checkers.Checkers import chk_type
#from nbi.service.rest_server.nbi_plugins.ietf_network.bindings.networks.network.link import destination
@@ -185,37 +185,112 @@ class RYUServiceHandler(_ServiceHandler):
    def DeleteEndpoint(
        self, endpoints : List[Tuple[str, str, Optional[str]]], connection_uuid : Optional[str] = None
    ) -> List[Union[bool, Exception]]:
        LOGGER.debug('endpoints_delete = {:s}'.format(str(endpoints)))
        chk_type('endpoints', endpoints, list)
        if len(endpoints) < 2:
            LOGGER.warning('nothing done: not enough endpoints')
            return []
        service_uuid = self.__service.service_id.service_uuid.uuid
        service_name = self.__service.name
        service_configuration_rules = self.__service.service_config.config_rules
        #LOGGER.debug('service_configuration_rules = {:s}'.format(str(service_configuration_rules)))
        # Collect the host IP and MAC addresses stored in the service config rules.
        ip_addresses = []
        mac_addresses = []
        for rule in service_configuration_rules:
            try:
                custom_field = rule.custom
                resource_value_str = custom_field.resource_value
                resource_value = json.loads(resource_value_str)
                ip_address = resource_value.get("address_ip")
                mac_address = resource_value.get("mac_address")
                ip_addresses.append(ip_address)
                mac_addresses.append(mac_address)
            except Exception as e:
                print(f"Error parsing rule: {e}, Rule: {rule}")
        LOGGER.debug('ip_addresses = {:s}'.format(str(ip_addresses)))
        LOGGER.debug('mac_addresses = {:s}'.format(str(mac_addresses)))
        LOGGER.debug('service_name = {:s}'.format(str(service_name)))
        #LOGGER.debug('service_uuid = {:s}'.format(str(service_uuid)))
        #LOGGER.debug('self.__settings_handler = {:s}'.format(str(self.__settings_handler.dump_config_rules())))
        results = []
        try:
            src_device, src_endpoint = self._get_endpoint_details(endpoints[0])
            dst_device, dst_endpoint = self._get_endpoint_details(endpoints[-1])
            src_controller = self.__task_executor.get_device_controller(src_device)
            #LOGGER.debug(f"Source controller: {src_controller.device_config.config_rules}")
            del src_controller.device_config.config_rules[:]
            # Build a pair of delete rules (forward and reverse) for every hop of the path.
            for index in range(len(endpoints) - 1):
                current_device, current_endpoint = self._get_endpoint_details(endpoints[index])
                #LOGGER.debug(f"Current device: {current_device.name}, Current endpoint: {current_endpoint.name}")
                next_device, next_endpoint = self._get_endpoint_details(endpoints[index + 1])
                #LOGGER.debug(f"Next device: {next_device.name}, Next endpoint: {next_endpoint.name}")
                if current_device.name == next_device.name:
                    in_port_forward = current_endpoint.name
                    out_port_forward = next_endpoint.name
                    flow_split = service_name.split('-')
                    dpid_src = int(current_device.name)
                    LOGGER.debug(f"DPID source: {dpid_src}")
                    dpid_dst = int(next_device.name)
                    LOGGER.debug(f"DPID destination: {dpid_dst}")
                    flow_rule_forward = f"{flow_split[0]}-{flow_split[2]}"
                    flow_rule_reverse = f"{flow_split[2]}-{flow_split[0]}"
                    ip_address_source = ip_addresses[0]
                    ip_address_destination = ip_addresses[1]
                    mac_address_source = mac_addresses[0]
                    mac_address_destination = mac_addresses[1]
                    forward_resource_value = {
                        "dpid": current_device.name,
                        "in-port": in_port_forward,
                        "out-port": out_port_forward,
                        "ip_address_source": ip_address_source,
                        "ip_address_destination": ip_address_destination,
                        "mac_address_source": mac_address_source,
                        "mac_address_destination": mac_address_destination
                    }
                    forward_rule = json_config_rule_delete(
                        resource_key=f"/device[{current_endpoint.name.split('-')[0]}]/flow[{flow_rule_forward}]",
                        resource_value=forward_resource_value
                    )
                    LOGGER.debug(f"Forward configuration rule: {forward_rule}")
                    in_port_reverse = next_endpoint.name
                    out_port_reverse = current_endpoint.name
                    reverse_resource_value = {
                        "dpid": current_device.name,
                        "in-port": in_port_reverse,
                        "out-port": out_port_reverse,
                        "ip_address_source": ip_address_destination,
                        "ip_address_destination": ip_address_source,
                        "mac_address_source": mac_address_destination,
                        "mac_address_destination": mac_address_source
                    }
                    reverse_rule = json_config_rule_delete(
                        resource_key=f"/device[{current_endpoint.name.split('-')[0]}]/flow[{flow_rule_reverse}]",
                        resource_value=reverse_resource_value
                    )
                    LOGGER.debug(f"Reverse configuration rule: {reverse_rule}")
                    src_controller.device_config.config_rules.append(ConfigRule(**reverse_rule))
                    src_controller.device_config.config_rules.append(ConfigRule(**forward_rule))
            json_config_rule_delete_1 = json_config_rule_delete('/services/service[{:s}]'.format(service_uuid), {
                'uuid': service_uuid
            })
            src_controller.device_config.config_rules.append(ConfigRule(**json_config_rule_delete_1))
            self.__task_executor.configure_device(src_controller)
            results.append(True)

            def get_config_rules(controller):
                try:
                    config_rules = controller.device_config.config_rules
                    for rule in config_rules:
                        if rule.HasField("custom"):
                            resource_key = rule.custom.resource_key
                            resource_value = rule.custom.resource_value
                            LOGGER.debug(f"Resource key in config: {resource_key}, Resource value in config: {resource_value}")
                except Exception as e:
                    print(f"Error accessing config rules: {e}")

            get_config_rules(src_controller)
            LOGGER.debug(f"Configuration rules: {src_controller.device_config.config_rules}")
            return results
        except Exception as e:
            LOGGER.error(f"Error in DeleteEndpoint: {e}")
            return [e]
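As a worked example (with an assumed service name, not taken from the commit), this is how the handler derives the per-direction flow keys that the driver later matches in its "h1-h3"/"h3-h1" branches:

# Worked example: assumed service name of the form <src-host>-<switch>-<dst-host>.
service_name = "h1-s1-h3"
flow_split = service_name.split('-')                              # ['h1', 's1', 'h3']
flow_rule_forward = f"{flow_split[0]}-{flow_split[2]}"            # 'h1-h3'
flow_rule_reverse = f"{flow_split[2]}-{flow_split[0]}"            # 'h3-h1'
resource_key_forward = f"/device[1]/flow[{flow_rule_forward}]"    # '/device[1]/flow[h1-h3]'
resource_key_reverse = f"/device[1]/flow[{flow_rule_reverse}]"    # '/device[1]/flow[h3-h1]'
print(flow_rule_forward, flow_rule_reverse, resource_key_forward)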