# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import logging
import resource
import threading
from typing import Any, Dict, Iterator, List, Optional, Tuple, Union

import requests
from requests.auth import HTTPBasicAuth

from common.method_wrappers.Decorator import MetricsPool, metered_subclass_method
from common.type_checkers.Checkers import chk_string, chk_type
from device.service.driver_api._Driver import _Driver, RESOURCE_ENDPOINTS
from device.service.drivers.OpenFlow.TfsApiClient import TfsApiClient
from device.service.drivers.OpenFlow.Tools import (
    find_key, get_switches, get_flows, add_flow, delete_flow, get_desc, get_port_desc,
    get_links_information, get_switches_information, del_flow_entry,
)

LOGGER = logging.getLogger(__name__)

DRIVER_NAME = 'ryu'
METRICS_POOL = MetricsPool('Device', 'Driver', labels={'driver': DRIVER_NAME})

ALL_RESOURCE_KEYS = [
    RESOURCE_ENDPOINTS,
]

# Substrings of a resource key that select which match profile SetConfig installs.
# Keys containing one of the first group get a MAC-only match at priority 65535;
# keys containing one of the second group get a MAC + IPv4 match at priority 1500.
_MAC_ONLY_FLOW_TAGS = ("h1-h3", "h3-h1")
_MAC_AND_IP_FLOW_TAGS = ("h2-h4", "h4-h2")


def _parse_port_number(endpoint_name: str) -> int:
    """Extract the numeric port from an endpoint name.

    Takes the second '-'-separated token, drops its first three characters and
    converts the rest to int (so 's1-eth2' yields 2).

    Raises:
        IndexError: if the name contains no '-'.
        ValueError: if the remaining text is not an integer.
    """
    return int(endpoint_name.split("-")[1][3:])


def _build_match(
    resource_key: str, in_port: int, flow_spec: Dict[str, Any]
) -> Optional[Tuple[int, Dict[str, Any]]]:
    """Return ``(priority, match_fields)`` for a resource key, or None if unsupported.

    The profile is chosen by substring search in ``resource_key``; addresses are read
    from ``flow_spec`` and default to an empty string when missing.
    """
    mac_source = flow_spec.get("mac_address_source", "")
    mac_destination = flow_spec.get("mac_address_destination", "")

    if any(tag in resource_key for tag in _MAC_ONLY_FLOW_TAGS):
        # NOTE(review): this profile mixes "eth_src" with "dl_dst"; kept as-is so the
        # payload sent to the controller is unchanged - confirm whether "eth_dst" was meant.
        return 65535, {
            "in_port": in_port,
            "eth_type": 0x0800,
            "eth_src": mac_source,
            "dl_dst": mac_destination,
        }

    if any(tag in resource_key for tag in _MAC_AND_IP_FLOW_TAGS):
        return 1500, {
            "in_port": in_port,
            "eth_type": 0x0800,
            "ipv4_src": flow_spec.get("ip_address_source", ""),
            "ipv4_dst": flow_spec.get("ip_address_destination", ""),
            "eth_src": mac_source,
            "eth_dst": mac_destination,
        }

    return None


class OpenFlowDriver(_Driver):
    """Device driver that talks to an OpenFlow controller REST endpoint (driver name 'ryu')."""

    def __init__(self, address: str, port: int, **settings) -> None:
        """Store connection parameters and build the REST helpers.

        Recognised settings: 'username', 'password', 'scheme' (default 'http') and
        'timeout' in seconds (default 120). Basic auth is only used when both
        username and password are provided.
        """
        super().__init__(DRIVER_NAME, address, port, **settings)
        # self.settings / self.address / self.port are presumably provided by _Driver - confirm.
        self.__lock = threading.Lock()
        self.__started = threading.Event()
        self.__terminate = threading.Event()
        username = self.settings.get('username')
        password = self.settings.get('password')
        self.__auth = HTTPBasicAuth(username, password) if username is not None and password is not None else None
        scheme = self.settings.get('scheme', 'http')
        self.__base_url = '{:s}://{:s}:{:d}'.format(scheme, self.address, int(self.port))
        self.__timeout = int(self.settings.get('timeout', 120))
        self.tac = TfsApiClient(self.address, int(self.port), scheme=scheme, username=username, password=password)
        self.__cookie_counter = 0
        self._priority_counter = 1

    def Connect(self) -> bool:
        """GET the base URL; return True (and mark the driver started) on a non-error response.

        Returns False, after logging, on a timeout or any other requests exception.
        """
        url = f"{self.__base_url}"
        with self.__lock:
            try:
                # NOTE(review): TLS certificate verification is disabled (verify=False).
                response = requests.get(url, timeout=self.__timeout, verify=False, auth=self.__auth)
                response.raise_for_status()
            except requests.exceptions.Timeout:
                LOGGER.exception(f"Timeout connecting to {self.__base_url}")
                return False
            except requests.exceptions.RequestException as e:
                LOGGER.exception(f"Exception connecting to {self.__base_url}: {e}")
                return False
            else:
                self.__started.set()
                return True

    def Disconnect(self) -> bool:
        """Set the terminate event and return True."""
        with self.__lock:
            self.__terminate.set()
            return True

    @metered_subclass_method(METRICS_POOL)
    def GetInitialConfig(self) -> List[Tuple[str, Any]]:
        """Return an empty list; this driver reports no initial configuration."""
        with self.__lock:
            return []

    @metered_subclass_method(METRICS_POOL)
    def GetConfig(self, resource_keys: List[str] = []) -> List[Tuple[str, Union[Any, None, Exception]]]:
        """Collect configuration for the requested resource keys.

        An empty list means "all keys" (ALL_RESOURCE_KEYS). The default list is only
        rebound, never mutated. Only RESOURCE_ENDPOINTS produces data, obtained from
        ``self.tac.get_devices_endpoints()``; other keys are validated and ignored.
        A failure while processing a key is reported as ``(resource_key, exception)``.
        """
        chk_type('resources', resource_keys, list)
        results = []
        with self.__lock:
            if not resource_keys:
                resource_keys = ALL_RESOURCE_KEYS
            # Log the keys actually being processed (previously always logged the constant).
            LOGGER.info(f'resource_key:{resource_keys}')
            for i, resource_key in enumerate(resource_keys):
                str_resource_name = 'resource_key[#{:d}]'.format(i)
                try:
                    chk_string(str_resource_name, resource_key, allow_empty=False)
                    if resource_key == RESOURCE_ENDPOINTS:
                        LOGGER.info(f'resource_key:{RESOURCE_ENDPOINTS}')
                        results.extend(self.tac.get_devices_endpoints())
                except Exception as e:
                    LOGGER.exception('Unhandled error processing resource_key({:s})'.format(str(resource_key)))
                    results.append((resource_key, e))
        return results

    def _post_flow(self, url: str, payload: Dict[str, Any]) -> None:
        """POST one flow entry as JSON and raise on a timeout, transport error or HTTP error status."""
        # NOTE(review): TLS certificate verification is disabled (verify=False), as in Connect.
        response = requests.post(url, json=payload, timeout=self.__timeout, verify=False, auth=self.__auth)
        response.raise_for_status()

    @metered_subclass_method(METRICS_POOL)
    def SetConfig(self, resources: List[Tuple[str, Any]]) -> List[Union[bool, Exception]]:
        """Install one forwarding flow (plus two ARP flows) per resource.

        Each resource is ``(resource_key, resource_value)`` where the key must start with
        '/device[' and contain '/flow[', and the value is a JSON string with at least
        'dpid' (hex string), 'in-port' and 'out-port'. The key must also contain one of
        the supported host-pair tags, which selects the match profile.

        Returns:
            One entry per resource: True when all three POSTs succeeded, otherwise the
            Exception describing why the resource was rejected or the request failed.
        """
        url = f"{self.__base_url}/stats/flowentry/add"
        results = []
        LOGGER.info(f"SetConfig_resources: {resources}")
        if not resources:
            return results
        with self.__lock:
            for resource_entry in resources:
                # Reset per iteration so the error log below never sees an unbound or stale key.
                resource_key = None
                try:
                    resource_key, resource_value = resource_entry
                    if not resource_key.startswith("/device[") or "/flow[" not in resource_key:
                        LOGGER.error(f"Invalid resource_key format: {resource_key}")
                        results.append(Exception(f"Invalid resource_key format: {resource_key}"))
                        continue

                    try:
                        resource_value_dict = json.loads(resource_value)
                        LOGGER.info(f"resource_value_dict: {resource_value_dict}")
                        dpid = int(resource_value_dict["dpid"], 16)
                        in_port = _parse_port_number(resource_value_dict["in-port"])
                        out_port = _parse_port_number(resource_value_dict["out-port"])
                    except (KeyError, ValueError, IndexError) as e:
                        LOGGER.error(f"Invalid resource_value: {resource_value}, error: {e}")
                        results.append(Exception(f"Invalid resource_value: {resource_value}"))
                        continue

                    self.__cookie_counter += 1

                    # Reject keys without a known profile instead of reusing the previous
                    # resource's match fields or failing with an UnboundLocalError.
                    flow_rule = _build_match(resource_key, in_port, resource_value_dict)
                    if flow_rule is None:
                        message = f"Unsupported flow in resource_key: {resource_key}"
                        LOGGER.error(message)
                        results.append(Exception(message))
                        continue
                    priority, match_fields = flow_rule
                    LOGGER.debug(f"Flow match fields: {match_fields}")

                    flow_entry = {
                        "dpid": dpid,
                        "priority": priority,
                        "match": match_fields,
                        "instructions": [{
                            "type": "APPLY_ACTIONS",
                            "actions": [{"max_len": 65535, "type": "OUTPUT", "port": out_port}],
                        }],
                    }
                    # Output port "0xfffffffb" is presumably the OpenFlow 1.3 FLOOD
                    # reserved port - TODO confirm against the controller in use.
                    flow_entry_arp_forward = {
                        "dpid": dpid,
                        "priority": 65535,
                        "match": {"eth_dst": "ff:ff:ff:ff:ff:ff", "eth_type": 0x0806},
                        "instructions": [{
                            "type": "APPLY_ACTIONS",
                            "actions": [{"type": "OUTPUT", "port": "0xfffffffb"}],
                        }],
                    }
                    flow_entry_arp_reply = {
                        "dpid": dpid,
                        "priority": 65535,
                        "match": {"eth_type": 0x0806, "arp_op": 2},
                        "instructions": [{
                            "type": "APPLY_ACTIONS",
                            "actions": [{"type": "OUTPUT", "port": "0xfffffffb"}],
                        }],
                    }

                    payload = flow_entry_arp_forward
                    try:
                        # Every response is checked: an HTTP error on an ARP entry used to be
                        # silently discarded and the resource still reported as successful.
                        for payload in (flow_entry_arp_forward, flow_entry_arp_reply, flow_entry):
                            self._post_flow(url, payload)
                        results.append(True)
                        LOGGER.info(f"Successfully posted flow entry: {flow_entry}")
                    except requests.exceptions.Timeout:
                        LOGGER.error(f"Timeout connecting to {url}")
                        results.append(Exception(f"Timeout connecting to {url}"))
                    except requests.exceptions.RequestException as e:
                        LOGGER.error(f"Error posting flow entry {payload} to {url}: {e}")
                        results.append(e)
                except Exception as e:
                    LOGGER.error(f"Error processing resource {resource_key}: {e}", exc_info=True)
                    results.append(e)
        return results

    # NOTE(review): the methods below are disabled drafts kept for reference. If DeleteConfig
    # is re-enabled, its delete request should probably mirror the match/priority that
    # SetConfig installs - confirm before enabling.
    #@metered_subclass_method(METRICS_POOL)
    #def DeleteConfig(self, resources: List[Tuple[str, Any]]) -> List[Union[bool, Exception]]:
    #    LOGGER.info(f"DeleteConfig123_resources: {resources}")
    #    url = f"{self.__base_url}/stats/flowentry/delete_strict"
    #    results = []
    #    if not resources:
    #        return results
    #    with self.__lock:
    #        for resource in resources:
    #            try:
    #                resource_key, resource_value = resource
    #                if not resource_key.startswith("/device[") or not "/flow[" in resource_key:
    #                    LOGGER.error(f"Invalid resource_key format: {resource_key}")
    #                    results.append(Exception(f"Invalid resource_key format: {resource_key}"))
    #                    continue
    #                try:
    #                    resource_value_dict = json.loads(resource_value)
    #                    dpid = int(resource_value_dict["dpid"], 16)
    #                    in_port = int(resource_value_dict["in-port"].split("-")[1][3:])
    #                    out_port = int(resource_value_dict["out-port"].split("-")[1][3:])
    #                except (KeyError, ValueError, IndexError) as e:
    #                    LOGGER.error(f"Invalid resource_value: {resource_value}, error: {e}")
    #                    results.append(Exception(f"Invalid resource_value: {resource_value}"))
    #                    continue
    #                flow_entry = {
    #                    "dpid": dpid,
    #                    "table_id": 0,
    #                    "priority": 11111,
    #                    "match": {"in_port": in_port},
    #                }
    #                try:
    #                    response = requests.post(url, json=flow_entry, timeout=self.__timeout, verify=False, auth=self.__auth)
    #                    response.raise_for_status()
    #                    results.append(True)
    #                    LOGGER.info(f"Successfully posted flow entry: {flow_entry}")
    #                except requests.exceptions.Timeout:
    #                    LOGGER.error(f"Timeout connecting to {url}")
    #                    results.append(Exception(f"Timeout connecting to {url}"))
    #                except requests.exceptions.RequestException as e:
    #                    LOGGER.error(f"Error posting flow entry {flow_entry} to {url}: {e}")
    #                    results.append(e)
    #            except Exception as e:
    #                LOGGER.error(f"Error processing resource {resource_key}: {e}", exc_info=True)
    #                results.append(e)
    #    return results
    #
    #
    ## @metered_subclass_method(METRICS_POOL)
    ## def SubscribeState(self, subscriptions : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]:
    ##     # TODO: TAPI does not support monitoring by now
    #     return [False for _ in subscriptions]
    #
    # @metered_subclass_method(METRICS_POOL)
    # def UnsubscribeState(self, subscriptions : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]:
    #     # TODO: TAPI does not support monitoring by now
    #     return [False for _ in subscriptions]
    #
    # def GetState(
    #     self, blocking=False, terminate : Optional[threading.Event] = None
    # ) -> Iterator[Tuple[float, str, Any]]:
    #     # TODO: TAPI does not support monitoring by now
    #     return []