From f47ed4750eabe802756182f58b81e4de2270967a Mon Sep 17 00:00:00 2001 From: rahhal <mrahhal@cttc.es> Date: Mon, 13 Jan 2025 15:47:28 +0000 Subject: [PATCH] Service component - Ryu Driver: - Added Configuration Rules In Service Handler --- .../drivers/OpenFlow/OpenFlowDriver.py | 36 ++----- .../l3nm_ryu/L3NMryuServiceHandler.py | 96 +++++++++++++------ .../service/task_scheduler/TaskExecutor.py | 2 +- 3 files changed, 78 insertions(+), 56 deletions(-) diff --git a/src/device/service/drivers/OpenFlow/OpenFlowDriver.py b/src/device/service/drivers/OpenFlow/OpenFlowDriver.py index a425943e4..8ccf7e71d 100644 --- a/src/device/service/drivers/OpenFlow/OpenFlowDriver.py +++ b/src/device/service/drivers/OpenFlow/OpenFlowDriver.py @@ -147,34 +147,14 @@ class OpenFlowDriver(_Driver): # results.append((key, e)) # return results # -# @metered_subclass_method(METRICS_POOL) -# def SetConfig(self, resources: List[Tuple[str, Any]]) -> List[Union[bool, Exception]]: -# results = [] -# if not resources: -# return results -# with self.__lock: -# for item in resources: -# LOGGER.info('resources contains: %s', item) -# try: -# if isinstance(item, tuple) and len(item) == 2: -# key, flow_data = item -# else: -# LOGGER.warning("Resource format invalid. Each item should be a tuple with (key, data).") -# results.append(False) -# continue -# if key == "flow_data" and isinstance(flow_data, dict): -# LOGGER.info(f"Found valid flow_data entry: {flow_data}") -# success = add_flow(self.__base_url, flow_data, auth=self.__auth, timeout=self.__timeout) -# results.append(success) -# else: -# LOGGER.warning(f"Skipping item with key: {key} due to invalid format or missing data.") -# results.append(False) -# -# except Exception as e: -# LOGGER.error(f"Exception while setting configuration for item {item}: {str(e)}") -# results.append(e) -# -# return results + @metered_subclass_method(METRICS_POOL) + def SetConfig(self, resources: List[Tuple[str, Any]]) -> List[Union[bool, Exception]]: + results = [] + LOGGER.info(f'SetConfig_resources:{resources}') + if not resources: + return results + + return results # # # diff --git a/src/service/service/service_handlers/l3nm_ryu/L3NMryuServiceHandler.py b/src/service/service/service_handlers/l3nm_ryu/L3NMryuServiceHandler.py index 536f3997d..08141fda5 100644 --- a/src/service/service/service_handlers/l3nm_ryu/L3NMryuServiceHandler.py +++ b/src/service/service/service_handlers/l3nm_ryu/L3NMryuServiceHandler.py @@ -13,14 +13,18 @@ # limitations under the License. import json, logging, netaddr +from venv import logger from re import L from typing import Any, Dict, List, Optional, Tuple, Union + +from pytest import skip from common.method_wrappers.Decorator import MetricsPool, metered_subclass_method -from common.proto.context_pb2 import ConfigRule, Device, DeviceId, EndPoint, Service +from common.proto.context_pb2 import ConfigRule, Device, DeviceId, EndPoint, Service,ConfigRule_Custom,ConfigActionEnum from common.tools.grpc.Tools import grpc_message_to_json_string from common.tools.object_factory.ConfigRule import json_config_rule_delete, json_config_rule_set from common.tools.object_factory.Device import json_device_id from common.type_checkers.Checkers import chk_type +#from nbi.service.rest_server.nbi_plugins.ietf_network.bindings.networks.network.link import destination from service.service.service_handler_api.Tools import get_device_endpoint_uuids, get_endpoint_matching from service.service.service_handler_api._ServiceHandler import _ServiceHandler from service.service.service_handler_api.SettingsHandler import SettingsHandler @@ -41,25 +45,28 @@ class RYUServiceHandler(_ServiceHandler): self.__settings_handler = SettingsHandler(service.service_config, **settings) - #def _get_endpoint_details( - # self, endpoint : Tuple[str, str, Optional[str]] - #) -> Tuple[Device, EndPoint, Dict]: - # device_uuid, endpoint_uuid = get_device_endpoint_uuids(endpoint) - # LOGGER.debug('device_uuid = {:s}'.format(str(device_uuid))) - # LOGGER.debug('endpoint_uuid = {:s}'.format(str(endpoint_uuid))) - # device_obj = self.__task_executor.get_device(DeviceId(**json_device_id(device_uuid))) - # LOGGER.debug('device_obj = {:s}'.format(str(grpc_message_to_json_string(device_obj)))) - # endpoint_obj = get_endpoint_matching(device_obj, endpoint_uuid) - # LOGGER.debug('endpoint_obj = {:s}'.format(str(grpc_message_to_json_string(endpoint_obj)))) - # endpoint_settings = self.__settings_handler.get_endpoint_settings(device_obj, endpoint_obj) - # device_name = device_obj.name - # endpoint_name = endpoint_obj.name - # if endpoint_settings is None: - # MSG = 'Settings not found for Endpoint(device=[uuid={:s}, name={:s}], endpoint=[uuid={:s}, name={:s}])' - # raise Exception(MSG.format(device_uuid, device_name, endpoint_uuid, endpoint_name)) - # endpoint_settings_dict : Dict = endpoint_settings.value - # LOGGER.debug('endpoint_settings_dict = {:s}'.format(str(endpoint_settings_dict))) - # return device_obj, endpoint_obj, endpoint_settings_dict + def _get_endpoint_details( + self, endpoint : Tuple[str, str, Optional[str]] + ) -> Tuple[Device, EndPoint]: #Dict]: + device_uuid, endpoint_uuid = get_device_endpoint_uuids(endpoint) + #LOGGER.debug('device_uuid = {:s}'.format(str(device_uuid))) + #LOGGER.debug('endpoint_uuid = {:s}'.format(str(endpoint_uuid))) + device_obj = self.__task_executor.get_device(DeviceId(**json_device_id(device_uuid))) + #LOGGER.debug('device_obj = {:s}'.format(str(grpc_message_to_json_string(device_obj)))) + endpoint_obj = get_endpoint_matching(device_obj, endpoint_uuid) + #LOGGER.debug('endpoint_obj = {:s}'.format(str(grpc_message_to_json_string(endpoint_obj)))) + #endpoint_settings = self.__settings_handler.get_endpoint_settings(device_obj, endpoint_obj) + #LOGGER.debug('endpoint_settings = {:s}'.format(str(endpoint_settings))) + device_name = device_obj.name + #LOGGER.debug('device_name = {:s}'.format(str(device_name))) + endpoint_name = endpoint_obj.name + #LOGGER.debug('endpoint_name = {:s}'.format(str(endpoint_name))) + #if endpoint_settings is None: + # MSG = 'Settings not found for Endpoint(device=[uuid={:s}, name={:s}], endpoint=[uuid={:s}, name={:s}])' + # raise Exception(MSG.format(device_uuid, device_name, endpoint_uuid, endpoint_name)) + #endpoint_settings_dict : Dict = endpoint_settings.value + #LOGGER.debug('endpoint_settings_dict = {:s}'.format(str(endpoint_settings_dict))) + return device_obj, endpoint_obj #endpoint_settings_dict @metered_subclass_method(METRICS_POOL) @@ -72,15 +79,50 @@ class RYUServiceHandler(_ServiceHandler): LOGGER.warning('nothing done: not enough endpoints') return [] service_uuid = self.__service.service_id.service_uuid.uuid - LOGGER.debug('service_uuid = {:s}'.format(str(service_uuid))) - LOGGER.debug('self.__settings_handler = {:s}'.format(str(self.__settings_handler.dump_config_rules()))) + service_name= self.__service.name + LOGGER.debug('service_name = {:s}'.format(str(service_name))) + #LOGGER.debug('service_uuid = {:s}'.format(str(service_uuid))) + #LOGGER.debug('self.__settings_handler = {:s}'.format(str(self.__settings_handler.dump_config_rules()))) results = [] try: - # Get endpoint details - src_device, src_endpoint, src_settings = self._get_endpoint_details(endpoints[0]) - dst_device, dst_endpoint, dst_settings = self._get_endpoint_details(endpoints[-1]) - LOGGER.debug(f"Source settings: {src_settings}") - LOGGER.debug(f"Destination settings: {dst_settings}") + src_device, src_endpoint, = self._get_endpoint_details(endpoints[0]) + dst_device, dst_endpoint, = self._get_endpoint_details(endpoints[-1]) + src_controller = self.__task_executor.get_device_controller(src_device) + + for index in range(len(endpoints) - 1): + current_device, current_endpoint = self._get_endpoint_details(endpoints[index]) + #LOGGER.debug(f"Current device: {current_device.name}, Current endpoint: {current_endpoint.name}") + next_device, next_endpoint = self._get_endpoint_details(endpoints[index + 1]) + #LOGGER.debug(f"Next device: {next_device.name}, Next endpoint: {next_endpoint.name}") + if current_device.name == next_device.name: + in_port_forward = current_endpoint.name + out_port_forward = next_endpoint.name + flow_split = service_name.split('-') + flow_rule_forward = f"{flow_split[0]}-{flow_split[2]}" + flow_rule_reverse = f"{flow_split[2]}-{flow_split[0]}" + forward_resource_value = json.dumps({"dpid": current_device.name, "in-port": in_port_forward, "out-port": out_port_forward}) + forward_rule = ConfigRule( + custom=ConfigRule_Custom( + resource_key=f"/device[{current_endpoint.name.split('-')[0]}]/flow[{flow_rule_forward}]", + resource_value=forward_resource_value + ) + ) + LOGGER.debug(f"Forward configuration rule: {forward_rule}") + src_controller.device_config.config_rules.append(forward_rule) + in_port_reverse = next_endpoint.name + out_port_reverse = current_endpoint.name + reverse_resource_value = json.dumps({"dpid": current_device.name, "in-port": in_port_reverse, "out-port": out_port_reverse}) + reverse_rule = ConfigRule( + custom=ConfigRule_Custom( + resource_key=f"/device[{current_endpoint.name.split('-')[0]}]/flow[{flow_rule_reverse}]", + resource_value=reverse_resource_value + ) + ) + LOGGER.debug(f"Reverse configuration rule: {reverse_rule}") + src_controller.device_config.config_rules.append(reverse_rule) + + self.__task_executor.configure_device(src_controller) + results.append(True) return results diff --git a/src/service/service/task_scheduler/TaskExecutor.py b/src/service/service/task_scheduler/TaskExecutor.py index 55f50f044..19f12ad4b 100644 --- a/src/service/service/task_scheduler/TaskExecutor.py +++ b/src/service/service/task_scheduler/TaskExecutor.py @@ -288,7 +288,7 @@ class TaskExecutor: else: if not exclude_managed_by_controller: LOGGER.debug('device managed by controller = {:s}'.format(str(device_uuid))) - device_type = DeviceTypeEnum._value2member_map_[device.device_type] + device_type = DeviceTypeEnum._value2member_map_[controller.device_type] LOGGER.debug('device_type not exlude by controller = {:s}'.format(str(device_type))) devices.setdefault(device_type, dict())[device_uuid] = device else: -- GitLab