Skip to content
Snippets Groups Projects
service_discoverer.py 23 KiB
Newer Older
JorgeEcheva26's avatar
JorgeEcheva26 committed
from requests.exceptions import RequestsDependencyWarning
import warnings
import json
import requests
import os
import logging
import urllib3
JorgeEcheva26's avatar
JorgeEcheva26 committed
import re
JorgeEcheva26's avatar
JorgeEcheva26 committed
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)


warnings.filterwarnings("ignore", category=RequestsDependencyWarning)

Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
# Basic logger configuration
JorgeEcheva26's avatar
JorgeEcheva26 committed

log_path = 'logs/sdk_logs.log'

log_dir = os.path.dirname(log_path)

if not os.path.exists(log_dir):
    os.makedirs(log_dir)

logging.basicConfig(
Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
    level=logging.NOTSET,  # Minimum severity level to log
    # Log message format
JorgeEcheva26's avatar
JorgeEcheva26 committed
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
        logging.FileHandler(log_path),  # Logs to a file
        logging.StreamHandler()  # Also outputs to the console
Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
class service_discoverer:
JorgeEcheva26's avatar
JorgeEcheva26 committed
    class ServiceDiscovererException(Exception):
        pass

    def __init__(
            self,
            config_file
    ):
Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
        # Load configuration from file if necessary
JorgeEcheva26's avatar
JorgeEcheva26 committed
        config_file = os.path.abspath(config_file)
        config = self.__load_config_file(config_file)
Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
        debug_mode = os.getenv('DEBUG_MODE', config.get('debug_mode', 'False')).strip().lower()
JorgeEcheva26's avatar
JorgeEcheva26 committed
        if debug_mode == "false":
            debug_mode = False
Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
        else:
            debug_mode = True
JorgeEcheva26's avatar
JorgeEcheva26 committed

Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
        # Initialize logger for this class
JorgeEcheva26's avatar
JorgeEcheva26 committed
        self.logger = logging.getLogger(self.__class__.__name__)
        if debug_mode:
            self.logger.setLevel(logging.DEBUG)
        else:
            self.logger.setLevel(logging.WARNING)

Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
        # Set logging level for urllib based on debug_mode
JorgeEcheva26's avatar
JorgeEcheva26 committed
        urllib_logger = logging.getLogger("urllib3")
        if not debug_mode:
            urllib_logger.setLevel(logging.WARNING)
        else:
            urllib_logger.setLevel(logging.DEBUG)

Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
        # Configuration path to store files
        self.config_path = os.path.dirname(os.path.abspath(config_file)) + "/"

        # Retrieve host and port information from environment variables or config
        capif_host = os.getenv('CAPIF_HOST', config.get('capif_host', '')).strip()
        capif_https_port = str(os.getenv('CAPIF_HTTPS_PORT', config.get('capif_https_port', '')).strip())
Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
        # Get the folder for storing invoker certificates from environment or config
        invoker_config = config.get('invoker', {})
JorgeEcheva26's avatar
JorgeEcheva26 committed
        invoker_general_folder = os.path.abspath(
Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
            os.getenv('invoker_folder', invoker_config.get('invoker_folder', '')).strip()
        )
JorgeEcheva26's avatar
JorgeEcheva26 committed
        capif_callback_url = os.getenv('INVOKER_CAPIF_CALLBACK_URL', invoker_config.get('capif_callback_url', '')).strip()
        supported_features = os.getenv('INVOKER_FOLDER', invoker_config.get('supported_features', '')).strip()
        check_authentication_data = invoker_config.get('check_authentication_data', {})
        self.check_authentication_data = {
            "ip": os.getenv('INVOKER_CHECK_AUTHENTICATION_DATA_IP', check_authentication_data.get('ip', '')).strip(),
            "port":  os.getenv('INVOKER_CHECK_AUTHENTICATION_DATA_PORT', check_authentication_data.get('port', '')).strip()
Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
        # Retrieve CAPIF invoker username
        capif_invoker_username = os.getenv('CAPIF_USERNAME', config.get('capif_username', '')).strip()
JorgeEcheva26's avatar
JorgeEcheva26 committed

Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
        # Extract discover filter configuration from JSON or environment variables
        discover_filter_config = invoker_config.get('discover_filter', {})
JorgeEcheva26's avatar
JorgeEcheva26 committed
        self.discover_filter = {
Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
            "api-name": os.getenv('DISCOVER_FILTER_API_NAME', discover_filter_config.get('api-name', '')).strip(),
            "api-version": os.getenv('DISCOVER_FILTER_API_VERSION', discover_filter_config.get('api-version', '')).strip(),
            "comm-type": os.getenv('DISCOVER_FILTER_COMM_TYPE', discover_filter_config.get('comm-type', '')).strip(),
            "protocol": os.getenv('DISCOVER_FILTER_PROTOCOL', discover_filter_config.get('protocol', '')).strip(),
            "aef-id": os.getenv('DISCOVER_FILTER_AEF_ID', discover_filter_config.get('aef-id', '')).strip(),
            "data-format": os.getenv('DISCOVER_FILTER_DATA_FORMAT', discover_filter_config.get('data-format', '')).strip(),
            "api-cat": os.getenv('DISCOVER_FILTER_API_CAT', discover_filter_config.get('api-cat', '')).strip(),
            "preferred-aef-loc": os.getenv('DISCOVER_FILTER_PREFERRED_AEF_LOC', discover_filter_config.get('preferred-aef-loc', '')).strip(),
            "req-api-prov-name": os.getenv('DISCOVER_FILTER_REQ_API_PROV_NAME', discover_filter_config.get('req-api-prov-name', '')).strip(),
            "supported-features": os.getenv('DISCOVER_FILTER_SUPPORTED_FEATURES', discover_filter_config.get('supported-features', '')).strip(),
            "api-supported-features": os.getenv('DISCOVER_FILTER_API_SUPPORTED_FEATURES', discover_filter_config.get('api-supported-features', '')).strip(),
            "ue-ip-addr": os.getenv('DISCOVER_FILTER_UE_IP_ADDR', discover_filter_config.get('ue-ip-addr', '')).strip(),
            "service-kpis": os.getenv('DISCOVER_FILTER_SERVICE_KPIS', discover_filter_config.get('service-kpis', '')).strip()
JorgeEcheva26's avatar
JorgeEcheva26 committed
        }
Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed

        # Store important attributes for CAPIF invocation
JorgeEcheva26's avatar
JorgeEcheva26 committed
        self.capif_invoker_username = capif_invoker_username
        self.capif_host = capif_host
        self.capif_https_port = capif_https_port
Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
        self.token = ""
JorgeEcheva26's avatar
JorgeEcheva26 committed
        if supported_features is None:
            supported_features = 0
        self.supported_features = supported_features
Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed

        # Create invoker folder dynamically based on username and folder path
        self.invoker_folder = os.path.join(invoker_general_folder, capif_invoker_username)
JorgeEcheva26's avatar
JorgeEcheva26 committed
        os.makedirs(self.invoker_folder, exist_ok=True)
Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed

        # Load CAPIF API details
JorgeEcheva26's avatar
JorgeEcheva26 committed
        self.capif_callback_url = capif_callback_url
        self.invoker_capif_details = self.__load_provider_api_details()
Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
        try:
            self.token = self.invoker_capif_details["access_token"]

Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
            pass
Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
        # Define paths for certificates, private keys, and CA root
        self.signed_key_crt_path = os.path.join(self.invoker_folder, self.invoker_capif_details["user_name"] + ".crt")
Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
        self.private_key_path = os.path.join(self.invoker_folder, "private.key")
JorgeEcheva26's avatar
JorgeEcheva26 committed
        self.ca_root_path = os.path.join(self.invoker_folder, "ca.crt")

Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
        # Log initialization success message
JorgeEcheva26's avatar
JorgeEcheva26 committed
        self.logger.info("ServiceDiscoverer initialized correctly")

    def __load_config_file(self, config_file: str):
        """Carga el archivo de configuración."""
        try:
            with open(config_file, 'r') as file:
                return json.load(file)
        except FileNotFoundError:
            self.logger.warning(
                f"Configuration file {config_file} not found. Using defaults or environment variables.")
            return {}

    def __load_provider_api_details(self):
        try:
            path = os.path.join(
                self.invoker_folder, "capif_api_security_context_details-"+self.capif_invoker_username+".json")
            with open(
                    path,
                    "r",
            ) as openfile:
                details = json.load(openfile)
            self.logger.info("Api provider details correctly loaded")
            return details
        except Exception as e:
            self.logger.error(
                "Error while loading Api invoker details: %s", str(e))
            raise

    def _add_trailing_slash_to_url_if_missing(self, url):
        if not url.endswith("/"):
            url += "/"
        return url

    def get_security_context(self, supp_features):
JorgeEcheva26's avatar
JorgeEcheva26 committed
        self.logger.info("Getting security context for all API's filtered")

        self.logger.info("Trying to update security context")
        self.__update_security_service(supp_features)
JorgeEcheva26's avatar
JorgeEcheva26 committed
        self.__cache_security_context()

    def get_access_token(self):
        """
        :param api_name: El nombre del API devuelto por descubrir servicios
        :param api_id: El id del API devuelto por descubrir servicios
        :param aef_id: El aef_id relevante devuelto por descubrir servicios
        :return: El token de acceso (jwt)
        """
        token_dic = self.__get_security_token()
        self.logger.info("Access token successfully obtained")
        return token_dic["access_token"]

    def __cache_security_context(self):
        try:
            path = os.path.join(
                self.invoker_folder, "capif_api_security_context_details-"+self.capif_invoker_username+".json")
            with open(
                    path, "w"
            ) as outfile:
                json.dump(self.invoker_capif_details, outfile)
JorgeEcheva26's avatar
JorgeEcheva26 committed
            self.logger.info("Security context saved correctly")
        except Exception as e:
            self.logger.error(
                "Error when saving the security context: %s", str(e))
            raise

    def __update_security_service(self, supp_features):
JorgeEcheva26's avatar
JorgeEcheva26 committed
        """
        Actualiza el servicio de seguridad.

        :param api_id: El id del API devuelto por descubrir servicios.
        :param aef_id: El aef_id devuelto por descubrir servicios.
        :return: None.
        """
        url = f"https://{self.capif_host}:{self.capif_https_port}/capif-security/v1/trustedInvokers/{self.invoker_capif_details['api_invoker_id']}/update"
JorgeEcheva26's avatar
JorgeEcheva26 committed
        payload = {
            "securityInfo": [],
JorgeEcheva26's avatar
JorgeEcheva26 committed
            "notificationDestination": f"{self.capif_callback_url}",
JorgeEcheva26's avatar
JorgeEcheva26 committed
            "requestTestNotification": True,
            "websockNotifConfig": {
                "websocketUri": "string",
                "requestWebsocketUri": True
            },
JorgeEcheva26's avatar
JorgeEcheva26 committed
        }

        number_of_apis = len(
            self.invoker_capif_details["registered_security_contexes"])
JorgeEcheva26's avatar
JorgeEcheva26 committed
        for i in range(0, number_of_apis):
Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
            # Obtaining the values of api_id and aef_id for each API
            api_id = self.invoker_capif_details["registered_security_contexes"][i]['api_id']
JorgeEcheva26's avatar
JorgeEcheva26 committed
            for n in range(len(self.invoker_capif_details["registered_security_contexes"][i]['aef_profiles'])):
                aef_id = self.invoker_capif_details["registered_security_contexes"][i]['aef_profiles'][n]['aef_id']
                security_info = {
JorgeEcheva26's avatar
JorgeEcheva26 committed
                    "prefSecurityMethods": self.invoker_capif_details["registered_security_contexes"][i]['aef_profiles'][n]['security_methods'],
                    "authenticationInfo": "string",
                    "authorizationInfo": "string",
                    "aefId": aef_id,
                    "apiId": api_id
                }
                payload["securityInfo"].append(security_info)
JorgeEcheva26's avatar
JorgeEcheva26 committed
        try:
            response = requests.post(
                url,
                json=payload,
                cert=(self.signed_key_crt_path, self.private_key_path),
                verify=self.ca_root_path)
JorgeEcheva26's avatar
JorgeEcheva26 committed
            response.raise_for_status()
            self.logger.info("Security context correctly updated")

        except requests.exceptions.HTTPError as http_err:
            if response.status_code == 404:
                self.logger.warning(
                    "Received 404 exception from target CAPIF. This means it is the first time this CAPIF user is getting the JWT token, redirecting to register security service in CAPIF. The process continues correctly.")
                self.__register_security_service(supp_features)
JorgeEcheva26's avatar
JorgeEcheva26 committed
            else:
                self.logger.error("HTTP error occurred: %s", str(http_err))
                raise

        except requests.RequestException as e:
            self.logger.error(
                "Error trying to update Security context: %s", str(e))
            raise

    def __register_security_service(self, supp_features):
JorgeEcheva26's avatar
JorgeEcheva26 committed
        """
        :param api_id: El id del API devuelto por descubrir servicios
        :param aef_id: El aef_id devuelto por descubrir servicios
        :return: None
        """

        url = f"https://{self.capif_host}:{self.capif_https_port}/capif-security/v1/trustedInvokers/{self.invoker_capif_details['api_invoker_id']}"
JorgeEcheva26's avatar
JorgeEcheva26 committed
        payload = {
            "securityInfo": [],
JorgeEcheva26's avatar
JorgeEcheva26 committed
            "notificationDestination": f"{self.capif_callback_url}",
JorgeEcheva26's avatar
JorgeEcheva26 committed
            "requestTestNotification": True,
            "websockNotifConfig": {
                "websocketUri": "string",
                "requestWebsocketUri": True
            },
JorgeEcheva26's avatar
JorgeEcheva26 committed
        }

        number_of_apis = len(
            self.invoker_capif_details["registered_security_contexes"])
JorgeEcheva26's avatar
JorgeEcheva26 committed

        for i in range(0, number_of_apis):
Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
            # Obtaining the values of api_id and aef_id for each API
            api_id = self.invoker_capif_details["registered_security_contexes"][i]['api_id']
JorgeEcheva26's avatar
JorgeEcheva26 committed
            for n in range(len(self.invoker_capif_details["registered_security_contexes"][i]['aef_profiles'])):
                aef_id = self.invoker_capif_details["registered_security_contexes"][i]['aef_profiles'][n]['aef_id']
                security_info = {
JorgeEcheva26's avatar
JorgeEcheva26 committed
                    "prefSecurityMethods": self.invoker_capif_details["registered_security_contexes"][i]['aef_profiles'][n]['security_methods'],
                    "authenticationInfo": "string",
                    "authorizationInfo": "string",
                    "aefId": aef_id,
                    "apiId": api_id
                }
                payload["securityInfo"].append(security_info)
JorgeEcheva26's avatar
JorgeEcheva26 committed

        try:
            response = requests.put(url,
                                    json=payload,
                                    cert=(self.signed_key_crt_path,
                                          self.private_key_path),
                                    verify=self.ca_root_path
                                    )
            response.raise_for_status()
            self.logger.info("Security service properly registered")
        except requests.RequestException as e:
            self.logger.error(
                "Error when registering the security service: %s", str(e))
            raise

    def __get_security_token(self):
        """
        :param api_name: El nombre del API devuelto por descubrir servicios
        :param aef_id: El aef_id relevante devuelto por descubrir servicios
        :return: El token de acceso (jwt)
        """
        url = f"https://{self.capif_host}:{self.capif_https_port}/capif-security/v1/securities/{self.invoker_capif_details['api_invoker_id']}/token"
        # Build the scope by concatenating aef_id and api_name separated by a ';'
JorgeEcheva26's avatar
JorgeEcheva26 committed
        scope_parts = []

Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
        # Iterate over the registered contexts and build the scope parts
        for context in self.invoker_capif_details["registered_security_contexes"]:
JorgeEcheva26's avatar
JorgeEcheva26 committed
            api_name = context["api_name"]
            for i in range(0, len(context['aef_profiles'])):
                aef_id = context['aef_profiles'][i]['aef_id']
                scope_parts.append(f"{aef_id}:{api_name}")
Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
        # Join all the scope parts with ';' and add the prefix '3gpp#'
JorgeEcheva26's avatar
JorgeEcheva26 committed
        scope = "3gpp#" + ";".join(scope_parts)

        payload = {
            "grant_type": "client_credentials",
            "client_id": self.invoker_capif_details["api_invoker_id"],
JorgeEcheva26's avatar
JorgeEcheva26 committed
            "client_secret": "string",
            "scope": scope
        }
        headers = {
            'Content-Type': 'application/x-www-form-urlencoded',
        }

        try:
            response = requests.post(url,
                                     headers=headers,
                                     data=payload,
                                     cert=(self.signed_key_crt_path,
                                           self.private_key_path),
                                     verify=self.ca_root_path
                                     )
            response.raise_for_status()
            response_payload = response.json()
            self.logger.info("Security token successfully obtained")
            return response_payload
        except requests.RequestException as e:
            self.logger.error(
                "Error obtaining the security token: %s ", str(e))
            raise

    def discover_service_apis(self):
        """
        Descubre los APIs de servicio desde CAPIF con filtros basados en un archivo JSON.
        :return: Payload JSON con los detalles de los APIs de servicio
        """
Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
        # Load the parameters from the JSON file
Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
        # Filter out parameters that are not empty
JorgeEcheva26's avatar
JorgeEcheva26 committed
        filters = self.discover_filter

        query_params = {k: v for k, v in filters.items() if v.strip()}

Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
        # Form the URL with the query parameters
JorgeEcheva26's avatar
JorgeEcheva26 committed
        query_string = "&".join([f"{k}={v}" for k, v in query_params.items()])

        url = f"https://{self.capif_host}:{self.capif_https_port}/{self.invoker_capif_details['discover_services_url']}{self.invoker_capif_details['api_invoker_id']}"
JorgeEcheva26's avatar
JorgeEcheva26 committed

        if query_string:
            url += f"&{query_string}"

        try:
            response = requests.get(
                url,
                headers={"Content-Type": "application/json"},
                cert=(self.signed_key_crt_path, self.private_key_path),
                verify=self.ca_root_path
            )

            response.raise_for_status()
            response_payload = response.json()
            self.logger.info("Service APIs successfully discovered")
            return response_payload
        except requests.RequestException as e:
            self.logger.error("Error discovering service APIs: %s", str(e))
            raise

    def retrieve_api_description_by_name(self, api_name):
        """
        Recupera la descripción del API por nombre.
        :param api_name: Nombre del API
        :return: Descripción del API
        """
        self.logger.info(
            "Retrieving the API description for api_name=%s", api_name)
        capif_apifs = self.discover_service_apis()
        endpoints = [api for api in capif_apifs["serviceAPIDescriptions"]
                     if api["apiName"] == api_name]
        if not endpoints:
            error_message = (
                f"Could not find available endpoints for api_name: {api_name}. "
                "Make sure that a) your Invoker is registered and onboarded to CAPIF and "
                "b) the NEF emulator has been registered and onboarded to CAPIF"
            )
            self.logger.error(error_message)
            raise ServiceDiscoverer.ServiceDiscovererException(error_message)
        else:
            self.logger.info("API description successfully retrieved")
            return endpoints[0]

    def retrieve_specific_resource_name(self, api_name, resource_name):
        """
        Recupera la URL para recursos específicos dentro de los APIs.
        :param api_name: Nombre del API
        :param resource_name: Nombre del recurso
        :return: URL del recurso específico
        """
        self.logger.info(
            "Retrieving the URL for resource_name=%s in api_name=%s", resource_name, api_name)
        api_description = self.retrieve_api_description_by_name(api_name)
        version_dictionary = api_description["aefProfiles"][0]["versions"][0]
        version = version_dictionary["apiVersion"]
        resources = version_dictionary["resources"]
        uris = [resource["uri"]
                for resource in resources if resource["resourceName"] == resource_name]

        if not uris:
            error_message = f"Could not find resource_name: {resource_name} at api_name {api_name}"
            self.logger.error(error_message)
            raise ServiceDiscoverer.ServiceDiscovererException(error_message)
        else:
            uri = uris[0]
            if not uri.startswith("/"):
                uri = "/" + uri
            if api_name.endswith("/"):
                api_name = api_name[:-1]
            result_url = api_name + "/" + version + uri
            self.logger.info(
                "URL of the specific resource successfully retrieved: %s", result_url)
            return result_url

    def save_security_token(self, token):
        self.invoker_capif_details["access_token"] = token
JorgeEcheva26's avatar
JorgeEcheva26 committed
        self.__cache_security_context()

    def get_tokens(self, supp_features="0"):
JorgeEcheva26's avatar
JorgeEcheva26 committed
        token = self.get_access_token()
Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
        self.token = token
JorgeEcheva26's avatar
JorgeEcheva26 committed
        self.save_security_token(token)

    def discover(self):
        endpoints = self.discover_service_apis()

        if len(endpoints) > 0:
            self.save_api_discovered(endpoints)
        else:
            self.logger.error(
                "No endpoints have been registered. Make sure a Provider has Published an API to CAPIF first")

    def save_api_discovered(self, endpoints):
        self.invoker_capif_details["registered_security_contexes"] = []
JorgeEcheva26's avatar
JorgeEcheva26 committed
        self.invoker_capif_details["registered_security_contexes"] = self.convert_keys_to_snake_case(endpoints["serviceAPIDescriptions"])
JorgeEcheva26's avatar
JorgeEcheva26 committed
        self.save_api_details()
JorgeEcheva26's avatar
JorgeEcheva26 committed
    def convert_keys_to_snake_case(self, data):
        if isinstance(data, dict):
            new_dict = {}
            for key, value in data.items():
                new_key = self.to_snake_case(key)
                new_dict[new_key] = self.convert_keys_to_snake_case(value) if isinstance(value, (dict, list)) else value
            return new_dict
        elif isinstance(data, list):
            return [self.convert_keys_to_snake_case(item) if isinstance(item, (dict, list)) else item for item in data]
        else:
            return data
JorgeEcheva26's avatar
JorgeEcheva26 committed
    def to_snake_case(self, camel_case_str):
        # Convertir CamelCase a snake_case
        return re.sub(r'(?<!^)(?=[A-Z])', '_', camel_case_str).lower()
JorgeEcheva26's avatar
JorgeEcheva26 committed
    def save_api_details(self):
        try:
            # Define the path to save the details
            file_path = os.path.join(
                self.invoker_folder, "capif_api_security_context_details-" + self.capif_invoker_username + ".json")

            # Save the details as a JSON file
            with open(file_path, "w") as outfile:
                json.dump(self.invoker_capif_details, outfile, indent=4)
JorgeEcheva26's avatar
JorgeEcheva26 committed

            # Log the success of the operation
            self.logger.info("API provider details correctly saved")

        except Exception as e:
            # Log any errors that occur during the save process
            self.logger.error(
                "Error while saving API provider details: %s", str(e))
            raise
JorgeEcheva26's avatar
JorgeEcheva26 committed
    def check_authentication(self, supported_features):
        self.logger.info("Checking authentication")
        try:
            invoker_details = self.__load_provider_api_details()
            invoker_id = invoker_details["api_invoker_id"]
            check_auth = self.check_authentication_data
            url = "http://"+f"{check_auth['ip']}:{check_auth['port']}/" + "aef-security/v1/check-authentication"
            payload = {
                "apiInvokerId": f"{invoker_id}",
JorgeEcheva26's avatar
JorgeEcheva26 committed
                "supportedFeatures": f"{supported_features}"
            headers = {
                "Authorization": "Bearer {}".format(self.token),
                "Content-Type": "application/json",
            }
            response = requests.request(
                "POST",
                url,
                headers=headers,
                json=payload
            response.raise_for_status()
            self.logger.info("Authentication of supported_features checked")
        except Exception as e:
            self.logger.error(
                f"Error during checking Invoker supported_features : {e} - Response: {response.text}")
            raise