# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import json import logging, requests, threading import resource from requests.auth import HTTPBasicAuth from typing import Any, Iterator, List, Optional, Tuple, Union from common.method_wrappers.Decorator import MetricsPool, metered_subclass_method from common.type_checkers.Checkers import chk_string, chk_type from device.service.driver_api._Driver import _Driver,RESOURCE_ENDPOINTS from device.service.drivers.OpenFlow.TfsApiClient import TfsApiClient from device.service.drivers.OpenFlow.Tools import find_key, get_switches, get_flows , add_flow , delete_flow , get_desc,get_port_desc, get_links_information,get_switches_information,del_flow_entry LOGGER = logging.getLogger(__name__) DRIVER_NAME = 'ryu' METRICS_POOL = MetricsPool('Device', 'Driver', labels={'driver': DRIVER_NAME}) ALL_RESOURCE_KEYS = [ RESOURCE_ENDPOINTS, ] class OpenFlowDriver(_Driver): def __init__(self, address: str, port: int, **settings) -> None: super().__init__(DRIVER_NAME, address, port, **settings) self.__lock = threading.Lock() self.__started = threading.Event() self.__terminate = threading.Event() username = self.settings.get('username') password = self.settings.get('password') self.__auth = HTTPBasicAuth(username, password) if username is not None and password is not None else None scheme = self.settings.get('scheme', 'http') self.__base_url = '{:s}://{:s}:{:d}'.format(scheme, self.address, int(self.port)) self.__timeout = int(self.settings.get('timeout', 120)) self.tac = TfsApiClient(self.address, int(self.port), scheme=scheme, username=username, password=password) self.__cookie_counter = 0 self._priority_counter = 1 def Connect(self) -> bool: url = f"{self.__base_url}" with self.__lock: try: response = requests.get(url, timeout=self.__timeout, verify=False, auth=self.__auth) response.raise_for_status() except requests.exceptions.Timeout: LOGGER.exception(f"Timeout connecting to {self.__base_url}") return False except requests.exceptions.RequestException as e: LOGGER.exception(f"Exception connecting to {self.__base_url}: {e}") return False else: self.__started.set() return True def Disconnect(self) -> bool: with self.__lock: self.__terminate.set() return True @metered_subclass_method(METRICS_POOL) def GetInitialConfig(self) -> List[Tuple[str, Any]]: with self.__lock: return [] @metered_subclass_method(METRICS_POOL) def GetConfig(self, resource_keys: List[str] = []) -> List[Tuple[str, Union[Any, None, Exception]]]: chk_type('resources', resource_keys, list) results = [] with self.__lock: if len(resource_keys) == 0:resource_keys = ALL_RESOURCE_KEYS LOGGER.info(f'resource_key:{ALL_RESOURCE_KEYS}') for i, resource_key in enumerate(resource_keys): str_resource_name = 'resource_key[#{:d}]'.format(i) try: chk_string(str_resource_name, resource_key, allow_empty=False) if resource_key == RESOURCE_ENDPOINTS: LOGGER.info(f'resource_key:{RESOURCE_ENDPOINTS}') results.extend(self.tac.get_devices_endpoints()) except Exception as e: LOGGER.exception('Unhandled error processing resource_key({:s})'.format(str(resource_key))) results.append((resource_key, e)) return results @metered_subclass_method(METRICS_POOL) def SetConfig(self, resources: List[Tuple[str, Any]]) -> List[Union[bool, Exception]]: url = f"{self.__base_url}/stats/flowentry/add" results = [] LOGGER.info(f"SetConfig_resources: {resources}") if not resources: return results with self.__lock: for resource in resources: try: resource_key, resource_value = resource if not resource_key.startswith("/device[") or not "/flow[" in resource_key: LOGGER.error(f"Invalid resource_key format: {resource_key}") results.append(Exception(f"Invalid resource_key format: {resource_key}")) continue try: resource_value_dict = json.loads(resource_value) LOGGER.info(f"resource_value_dict: {resource_value_dict}") dpid = int(resource_value_dict["dpid"], 16) in_port = int(resource_value_dict["in-port"].split("-")[1][3:]) out_port = int(resource_value_dict["out-port"].split("-")[1][3:]) self.__cookie_counter += 1 cookie = self.__cookie_counter ip_address_source = resource_value_dict.get("ip_address_source", "") ip_address_destination = resource_value_dict.get("ip_address_destination", "") mac_address_source = resource_value_dict.get("mac_address_source", "") mac_address_destination = resource_value_dict.get("mac_address_destination", "") if "h1-h3" in resource_key: priority = 65535 match_fields = { "in_port": in_port, "eth_type": 0x0800, "ipv4_src": ip_address_source , "ipv4_dst": ip_address_destination, "eth_src": mac_address_source, "dl_dst": mac_address_destination } elif "h3-h1" in resource_key: priority = 65535 match_fields = { "in_port": in_port, "eth_type": 0x0800, "ipv4_src": ip_address_source, "ipv4_dst": ip_address_destination, "eth_src": mac_address_source, "dl_dst": mac_address_destination } elif "h2-h4" in resource_key: priority = 1500 match_fields = { "in_port": in_port, "eth_type": 0x0800, "ipv4_src": ip_address_source , "ipv4_dst": ip_address_destination, "eth_src": mac_address_source, "eth_dst": mac_address_destination } elif "h4-h2" in resource_key: priority = 1500 match_fields = { "in_port": in_port, "eth_type": 0x0800, "ipv4_src": ip_address_source, "ipv4_dst": ip_address_destination, "eth_src": mac_address_source, "eth_dst": mac_address_destination } except (KeyError, ValueError, IndexError) as e: LOGGER.error(f"Invalid resource_value: {resource_value}, error: {e}") results.append(Exception(f"Invalid resource_value: {resource_value}")) continue LOGGER.debug(f"Flow match fields: {match_fields}") flow_entry = { "dpid": dpid, "priority": priority, "match": match_fields, "instructions": [ { "type": "APPLY_ACTIONS", "actions": [ { "max_len": 65535, "type": "OUTPUT", "port": out_port }]}]} flow_entry_arp_foraward = { "dpid": dpid, "priority": 65535, "match": { "eth_dst": "ff:ff:ff:ff:ff:ff", "eth_type": 0x0806 }, "instructions": [ { "type": "APPLY_ACTIONS", "actions": [ { "type": "OUTPUT", "port": "0xfffffffb" } ]}]} flow_entry_arp_reply = { "dpid": dpid, "priority": 65535, "match": { "eth_type": 0x0806, "arp_op": 2 }, "instructions": [ { "type": "APPLY_ACTIONS", "actions": [ { "type": "OUTPUT", "port": "0xfffffffb" } ]}]} try: response = requests.post(url, json=flow_entry_arp_foraward, timeout=self.__timeout, verify=False, auth=self.__auth) response = requests.post(url, json=flow_entry_arp_reply, timeout=self.__timeout, verify=False, auth=self.__auth) response = requests.post(url, json=flow_entry, timeout=self.__timeout, verify=False, auth=self.__auth) response.raise_for_status() results.append(True) LOGGER.info(f"Successfully posted flow entry: {flow_entry}") except requests.exceptions.Timeout: LOGGER.error(f"Timeout connecting to {url}") results.append(Exception(f"Timeout connecting to {url}")) except requests.exceptions.RequestException as e: LOGGER.error(f"Error posting flow entry {flow_entry} to {url}: {e}") results.append(e) except Exception as e: LOGGER.error(f"Error processing resource {resource_key}: {e}", exc_info=True) results.append(e) return results @metered_subclass_method(METRICS_POOL) def DeleteConfig(self, resources: List[Tuple[str, Any]]) -> List[Union[bool, Exception]]: LOGGER.info(f"DeleteConfig called for resources: {resources}") url = f"{self.__base_url}/stats/flowentry/delete_strict" results = [] if not resources: return results with self.__lock: for resource in resources: try: resource_key, resource_value = resource if not resource_key.startswith("/device[") or not "/flow[" in resource_key: LOGGER.error(f"Invalid resource_key format: {resource_key}") results.append(Exception(f"Invalid resource_key format: {resource_key}")) continue try: resource_value_dict = json.loads(resource_value) LOGGER.info(f"resource_value_dict: {resource_value_dict}") dpid = int(resource_value_dict["dpid"], 16) in_port = int(resource_value_dict["in-port"].split("-")[1][3:]) out_port = int(resource_value_dict["out-port"].split("-")[1][3:]) self.__cookie_counter += 1 cookie = self.__cookie_counter ip_address_source = resource_value_dict.get("ip_address_source", "") ip_address_destination = resource_value_dict.get("ip_address_destination", "") mac_address_source = resource_value_dict.get("mac_address_source", "") mac_address_destination = resource_value_dict.get("mac_address_destination", "") if "h1-h3" in resource_key: priority = 65535 match_fields = { "in_port": in_port, "eth_type": 0x0800, "ipv4_src": ip_address_source , "ipv4_dst": ip_address_destination, "eth_src": mac_address_source, "dl_dst": mac_address_destination, "table_id": 0, "cookie": 0, "cookie_mask": 0, } elif "h3-h1" in resource_key: priority = 65535 match_fields = { "in_port": in_port, "eth_type": 0x0800, "ipv4_src": ip_address_source, "ipv4_dst": ip_address_destination, "eth_src": mac_address_source, "dl_dst": mac_address_destination, "table_id": 0, "cookie": 0, "cookie_mask": 0, } elif "h2-h4" in resource_key: priority = 1500 match_fields = { "in_port": in_port, "eth_type": 0x0800, "ipv4_src": ip_address_source , "ipv4_dst": ip_address_destination, "eth_src": mac_address_source, "eth_dst": mac_address_destination, "table_id": 0, "cookie": 0, "cookie_mask": 0, } elif "h4-h2" in resource_key: priority = 1500 match_fields = { "in_port": in_port, "eth_type": 0x0800, "ipv4_src": ip_address_source, "ipv4_dst": ip_address_destination, "eth_src": mac_address_source, "eth_dst": mac_address_destination, "table_id": 0, "cookie": 0, "cookie_mask": 0, } except (KeyError, ValueError, IndexError) as e: LOGGER.error(f"Invalid resource_value: {resource_value}, error: {e}") results.append(Exception(f"Invalid resource_value: {resource_value}")) continue LOGGER.debug(f"Flow match fields: {match_fields}") flow_entry = { "dpid": dpid, #cookie": 0, "priority": priority, "match": match_fields, "instructions": [ { "type": "APPLY_ACTIONS", "actions": [ { "max_len": 65535, "type": "OUTPUT", "port": out_port }]}]} flow_entry_arp_foraward = { "dpid": dpid, "priority": 65535, "match": { "eth_dst": "ff:ff:ff:ff:ff:ff", "eth_type": 0x0806 }, "instructions": [ { "type": "APPLY_ACTIONS", "actions": [ { "type": "OUTPUT", "port": "0xfffffffb" } ]}]} flow_entry_arp_reply = { "dpid": dpid, "priority": 65535, "match": { "eth_type": 0x0806, "arp_op": 2 }, "instructions": [ { "type": "APPLY_ACTIONS", "actions": [ { "type": "OUTPUT", "port": "0xfffffffb" } ]}]} try: response = requests.post(url, json=flow_entry_arp_foraward, timeout=self.__timeout, verify=False, auth=self.__auth) response = requests.post(url, json=flow_entry_arp_reply, timeout=self.__timeout, verify=False, auth=self.__auth) response = requests.post(url, json=flow_entry, timeout=self.__timeout, verify=False, auth=self.__auth) response.raise_for_status() results.append(True) LOGGER.info(f"Successfully posted flow entry: {flow_entry}") except requests.exceptions.Timeout: LOGGER.error(f"Timeout connecting to {url}") results.append(Exception(f"Timeout connecting to {url}")) except requests.exceptions.RequestException as e: LOGGER.error(f"Error posting flow entry {flow_entry} to {url}: {e}") results.append(e) except Exception as e: LOGGER.error(f"Error processing resource {resource_key}: {e}", exc_info=True) results.append(e) return results ## @metered_subclass_method(METRICS_POOL) ## def SubscribeState(self, subscriptions : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]: ## # TODO: TAPI does not support monitoring by now # return [False for _ in subscriptions] # # @metered_subclass_method(METRICS_POOL) # def UnsubscribeState(self, subscriptions : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]: # # TODO: TAPI does not support monitoring by now # return [False for _ in subscriptions] # # def GetState( # self, blocking=False, terminate : Optional[threading.Event] = None # ) -> Iterator[Tuple[float, str, Any]]: # # TODO: TAPI does not support monitoring by now # return []