Loading opencapif_sdk/__init__.py +2 −2 Original line number Diff line number Diff line Loading @@ -3,6 +3,6 @@ from opencapif_sdk.capif_provider_connector import capif_provider_connector from opencapif_sdk.service_discoverer import service_discoverer from opencapif_sdk.api_schema_translator import api_schema_translator from opencapif_sdk.capif_logging_feature import capif_logging_feature from opencapif_sdk.capif_event_feature import capif_invoker_event_feature from opencapif_sdk.capif_event_feature import capif_invoker_event_feature, capif_provider_event_feature __all__ = ["capif_invoker_connector", "service_discoverer", "capif_provider_connector", "api_schema_translator", "capif_logging_feature","capif_invoker_event_feature"] No newline at end of file __all__ = ["capif_invoker_connector", "service_discoverer", "capif_provider_connector", "api_schema_translator", "capif_logging_feature", "capif_invoker_event_feature", "capif_provider_event_feature"] No newline at end of file opencapif_sdk/capif_event_feature.py +216 −1 Original line number Diff line number Diff line from opencapif_sdk import capif_invoker_connector from opencapif_sdk import capif_invoker_connector,capif_provider_connector import os import logging import shutil Loading Loading @@ -216,3 +216,218 @@ class capif_invoker_event_feature(capif_invoker_connector): def patch_subcription(self, name): self.update_subcription(self, name) class capif_provider_event_feature(capif_provider_connector): def create_subscription(self, name, id): subscriberId = id path = self.capif_https_url + f"capif-events/v1/{subscriberId}/subscriptions" list_of_ids = self._load_provider_api_details() number = self._find_key_by_value(list_of_ids, id) payload = { "events": self.events_description, "eventFilters": self.events_filter, "eventReq": {}, # TO IMPROVE !!! "notificationDestination": f"{self.notification_destination}", "requestTestNotification": True, "websockNotifConfig": self.websock_notif_config, "supportedFeatures": f"{self.supported_features}" } number_low = number.lower() cert = ( os.path.join(self.provider_folder, f"{number_low}.crt"), os.path.join(self.provider_folder, f"{number}_private_key.key"), ) try: response = requests.post( url=path, json=payload, headers={"Content-Type": "application/json"}, cert=cert, verify=os.path.join(self.provider_folder, "ca.crt") ) response.raise_for_status() location_header = response.headers.get("Location") if location_header: # Extrae el identificador de la URL en el encabezado 'Location' identifier = location_header.rstrip('/').split('/')[-1] self.logger.info(f"Subscriptionid obtained: {identifier}") else: self.logger.error("The Location header is not available in the response") path = os.path.join(self.provider_folder, "capif_subscriptions_id.json") # Load or initialize the subscription dictionary # Load or initialize the subscription dictionary if os.path.exists(path): subscription = self._load_config_file(path) if not isinstance(subscription, dict): raise TypeError(f"Expected 'subscription' to be a dict, but got {type(subscription).__name__}") else: subscription = {} if not isinstance(subscriberId, (str, int)): raise TypeError(f"Expected 'subscriberId' to be a string or integer, but got {type(subscriberId).__name__}") # Convert events_description to a string if it isn't already if not isinstance(name, str): name = str(name) # Update the subscription structure subscription[str(subscriberId)] = { f"{name}": identifier } # Save the updated dictionary back to the file self._create_or_update_file("capif_subscriptions_id", "json", subscription, "w") except Exception as e: self.logger.error("Unexpected error: %s", e) return None, {"error": f"Unexpected error: {e}"} def delete_subscription(self, name, id): subscriberId = id path = os.path.join(self.provider_folder, "capif_subscriptions_id.json") if os.path.exists(path): subscription = self._load_config_file(path) if not isinstance(subscription, dict): raise TypeError(f"Expected 'subscription' to be a dict, but got {type(subscription).__name__}") if subscriberId in subscription and name in subscription[subscriberId]: identifier = subscription[subscriberId][name] # Attempt to delete the subscription from CAPIF delete_path = self.capif_https_url + f"capif-events/v1/{subscriberId}/subscriptions/{identifier}" list_of_ids = self._load_provider_api_details() number = self._find_key_by_value(list_of_ids, id) number_low = number.lower() cert = ( os.path.join(self.provider_folder, f"{number_low}.crt"), os.path.join(self.provider_folder, f"{number}_private_key.key"), ) try: response = requests.delete( url=delete_path, headers={"Content-Type": "application/json"}, cert=cert, verify=os.path.join(self.provider_folder, "ca.crt") ) response.raise_for_status() # Remove the service entry from the subscription dictionary del subscription[subscriberId][name] # If no more services exist for the subscriber, remove the subscriber entry if not subscription[subscriberId]: del subscription[subscriberId] # Save the updated dictionary back to the file self._create_or_update_file("capif_subscriptions_id", "json", subscription, "w") self.logger.info(f"Successfully deleted subscription for service '{name}'") except Exception as e: self.logger.error("Unexpected error: %s", e) return None, {"error": f"Unexpected error: {e}"} else: self.logger.warning(f"Service '{name}' not found for subscriber '{subscriberId}'") return None, {"error": f"Service '{name}' not found for subscriber '{subscriberId}'"} else: self.logger.error("Subscription file not found at path: %s", path) return None, {"error": "Subscription file not found"} def update_subcription(self, name, id): subscriberId = id path = os.path.join(self.provider_folder, "capif_subscriptions_id.json") list_of_ids = self._load_provider_api_details() number = self._find_key_by_value(list_of_ids, id) payload = { "events": self.events_description, "eventFilters": self.events_filter, "eventReq": {}, # TO IMPROVE !!! "notificationDestination": f"{self.notification_destination}", "requestTestNotification": True, "websockNotifConfig": self.websock_notif_config, "supportedFeatures": f"{self.supported_features}" } if os.path.exists(path): subscription = self._load_config_file(path) if not isinstance(subscription, dict): raise TypeError(f"Expected 'subscription' to be a dict, but got {type(subscription).__name__}") if subscriberId in subscription and name in subscription[subscriberId]: identifier = subscription[subscriberId][name] # Attempt to delete the subscription from CAPIF put_path = self.capif_https_url + f"capif-events/v1/{subscriberId}/subscriptions/{identifier}" list_of_ids = self._load_provider_api_details() number = self._find_key_by_value(list_of_ids, id) number_low = number.lower() cert = ( os.path.join(self.provider_folder, f"{number_low}.crt"), os.path.join(self.provider_folder, f"{number}_private_key.key"), ) try: response = requests.put( url=put_path, json=payload, headers={"Content-Type": "application/json"}, cert=cert, verify=os.path.join(self.provider_folder, "ca.crt") ) response.raise_for_status() # Remove the service entry from the subscription dictionary del subscription[subscriberId][name] # If no more services exist for the subscriber, remove the subscriber entry if not subscription[subscriberId]: del subscription[subscriberId] # Save the updated dictionary back to the file self._create_or_update_file("capif_subscriptions_id", "json", subscription, "w") self.logger.info(f"Successfully updated subscription for service '{name}'") except Exception as e: self.logger.error("Unexpected error: %s", e) return None, {"error": f"Unexpected error: {e}"} else: self.logger.warning(f"Service '{name}' not found for subscriber '{subscriberId}'") return None, {"error": f"Service '{name}' not found for subscriber '{subscriberId}'"} else: self.logger.error("Subscription file not found at path: %s", path) return None, {"error": "Subscription file not found"} def patch_subcription(self, name, id): self.update_subcription(self, name, id) No newline at end of file opencapif_sdk/capif_provider_connector.py +86 −14 Original line number Diff line number Diff line Loading @@ -153,7 +153,7 @@ class capif_provider_connector: path_prov_funcs = os.path.join(self.provider_folder, "provider_capif_ids.json") if os.path.exists(path_prov_funcs): self.provider_capif_ids=self.__load_provider_api_details() self.provider_capif_ids = self._load_provider_api_details() path_published = os.path.join(self.provider_folder, "provider_service_ids.json") if os.path.exists(path_published): Loading @@ -171,6 +171,11 @@ class capif_provider_connector: else: self.capif_register_url = f"https://{capif_register_host.strip()}:{capif_register_port.strip()}/" events_config = provider_config.get('events', {}) self.events_description = os.getenv('PROVIDER_EVENTS_DESCRIPTION', events_config.get('description', '')) self.events_filter = os.getenv('PROVIDER_EVENTS_FILTERS', events_config.get('eventFilters', '')) self.notification_destination = os.getenv('PROVIDER_EVENTS_FILTERS', events_config.get('notificationDestination', '')) self.websock_notif_config = os.getenv('PROVIDER_EVENTS_FILTERS', events_config.get('websockNotifConfig', '')) # Log initialization success message self.logger.info("capif_provider_connector initialized with the capif_sdk_config.json parameters") Loading Loading @@ -427,7 +432,7 @@ class capif_provider_connector: self.logger.info( f"Loading provider details from {provider_details_path}") provider_details = self.__load_provider_api_details() provider_details = self._load_provider_api_details() publish_url = provider_details["publish_url"] Loading Loading @@ -604,7 +609,7 @@ class capif_provider_connector: self.logger.info( f"Loading provider details from {provider_details_path}") provider_details = self.__load_provider_api_details() provider_details = self._load_provider_api_details() publish_url = provider_details["publish_url"] # Load provider details Loading Loading @@ -722,7 +727,7 @@ class capif_provider_connector: self.logger.info( f"Loading provider details from {provider_details_path}") provider_details = self.__load_provider_api_details() provider_details = self._load_provider_api_details() publish_url = provider_details["publish_url"] chosenAPFsandAEFs = self.publish_req Loading Loading @@ -796,7 +801,7 @@ class capif_provider_connector: self.logger.info( f"Loading provider details from {provider_details_path}") provider_details = self.__load_provider_api_details() provider_details = self._load_provider_api_details() publish_url = provider_details["publish_url"] chosenAPFsandAEFs = self.publish_req Loading Loading @@ -872,7 +877,7 @@ class capif_provider_connector: self.logger.info( f"Loading provider details from {provider_details_path}") provider_details = self.__load_provider_api_details() provider_details = self._load_provider_api_details() publish_url = provider_details["publish_url"] chosenAPFsandAEFs = self.publish_req Loading Loading @@ -1071,7 +1076,7 @@ class capif_provider_connector: self.logger.info("Offboarding the provider") # Load CAPIF API details capif_api_details = self.__load_provider_api_details() capif_api_details = self._load_provider_api_details() url = f"{self.capif_https_url}api-provider-management/v1/registrations/{capif_api_details['capif_registration_id']}" # Define certificate paths Loading Loading @@ -1120,7 +1125,7 @@ class capif_provider_connector: self.logger.error(f"Error during removing folder contents: {e}") raise def __load_provider_api_details(self) -> dict: def _load_provider_api_details(self) -> dict: """ Loads NEF API details from the CAPIF provider details JSON file. Loading Loading @@ -1163,7 +1168,7 @@ class capif_provider_connector: ) def certs_modifications(self): api_details = self.__load_provider_api_details() api_details = self._load_provider_api_details() apf_count = 0 aef_count = 0 Loading Loading @@ -1253,7 +1258,7 @@ class capif_provider_connector: def update_onboard(self, capif_onboarding_url, access_token): self.logger.info( "Onboarding Provider to CAPIF and waiting signed certificate by giving our public keys to CAPIF") api_details = self.__load_provider_api_details() api_details = self._load_provider_api_details() capif_id = "/" + api_details["capif_registration_id"] url = f"{self.capif_https_url}{capif_onboarding_url}{capif_id}" Loading Loading @@ -1346,3 +1351,70 @@ class capif_provider_connector: self.logger.error( f"Onboarding failed: {e} - Response: {response.text}") raise def _create_or_update_file(self, file_name, file_type, content, mode="w"): """ Create or update a file with the specified content. :param file_name: Name of the file (without extension). :param file_type: File type or extension (e.g., "txt", "json", "html"). :param content: Content to write into the file. Can be a string, dictionary, or list. :param mode: Write mode ('w' to overwrite, 'a' to append). Default is 'w'. """ # Validate the mode if mode not in ["w", "a"]: raise ValueError("Mode must be 'w' (overwrite) or 'a' (append).") # Construct the full file name full_file_name = f"{file_name}.{file_type}" full_path = os.path.join(self.provider_folder, full_file_name) # Ensure the content is properly formatted if isinstance(content, (dict, list)): if file_type == "json": try: # Serialize content to JSON content = json.dumps(content, indent=4) except TypeError as e: raise ValueError(f"Failed to serialize content to JSON: {e}") else: raise TypeError("Content must be a string when the file type is not JSON.") elif not isinstance(content, str): raise TypeError("Content must be a string, dictionary, or list.") try: # Open the file in the specified mode with open(full_path, mode, encoding="utf-8") as file: file.write(content) # Log success based on the mode if mode == "w": self.logger.info(f"File '{full_file_name}' created or overwritten successfully.") elif mode == "a": self.logger.info(f"Content appended to file '{full_file_name}' successfully.") except Exception as e: self.logger.error(f"Error handling the file '{full_file_name}': {e}") raise def _find_key_by_value(self, data, target_value): """ Given a dictionary and a value, return the key corresponding to that value. :param data: Dictionary to search. :param target_value: Value to find the corresponding key for. :return: Key corresponding to the target value, or None if not found. """ for key, value in data.items(): if value == target_value: return key return None def _load_config_file(self, config_file: str): """Loads the configuration file.""" try: with open(config_file, 'r') as file: return json.load(file) except FileNotFoundError: self.logger.warning( f"Configuration file {config_file} not found. Using defaults or environment variables.") return {} No newline at end of file scripts/invoker_capif_event_subcription.py +5 −8 Original line number Diff line number Diff line import utilities from opencapif_sdk import capif_invoker_connector, capif_invoker_event_feature from opencapif_sdk import capif_invoker_event_feature def showcase_capif_connector(): Loading @@ -7,10 +7,6 @@ def showcase_capif_connector(): This method showcases how one can use the CAPIFConnector class. """ capif_connector = capif_invoker_connector(config_file=utilities.get_config_file()) capif_connector.onboard_invoker() events = capif_invoker_event_feature(config_file=utilities.get_config_file()) events.create_subscription(name="Servicio_2") Loading @@ -18,6 +14,7 @@ def showcase_capif_connector(): events.update_subcription(name="Servicio_2") events.delete_subscription(name="Servicio_2") print("COMPLETED") Loading scripts/provider_capif_event_feature.py 0 → 100644 +26 −0 Original line number Diff line number Diff line import utilities from opencapif_sdk import capif_provider_connector, capif_provider_event_feature def showcase_capif_connector(): """ This method showcases how one can use the CAPIFConnector class. """ provider = capif_provider_connector(config_file=utilities.get_config_file()) id = provider.provider_capif_ids["AEF-1"] events = capif_provider_event_feature(config_file=utilities.get_config_file()) events.create_subscription(name="Servicio_2", id=id) events.update_subcription(name="Servicio_2", id=id) events.delete_subscription(name="Servicio_2", id=id) print("COMPLETED") if __name__ == "__main__": # Register invoker to CAPIF. This should happen exactly once showcase_capif_connector() Loading
opencapif_sdk/__init__.py +2 −2 Original line number Diff line number Diff line Loading @@ -3,6 +3,6 @@ from opencapif_sdk.capif_provider_connector import capif_provider_connector from opencapif_sdk.service_discoverer import service_discoverer from opencapif_sdk.api_schema_translator import api_schema_translator from opencapif_sdk.capif_logging_feature import capif_logging_feature from opencapif_sdk.capif_event_feature import capif_invoker_event_feature from opencapif_sdk.capif_event_feature import capif_invoker_event_feature, capif_provider_event_feature __all__ = ["capif_invoker_connector", "service_discoverer", "capif_provider_connector", "api_schema_translator", "capif_logging_feature","capif_invoker_event_feature"] No newline at end of file __all__ = ["capif_invoker_connector", "service_discoverer", "capif_provider_connector", "api_schema_translator", "capif_logging_feature", "capif_invoker_event_feature", "capif_provider_event_feature"] No newline at end of file
opencapif_sdk/capif_event_feature.py +216 −1 Original line number Diff line number Diff line from opencapif_sdk import capif_invoker_connector from opencapif_sdk import capif_invoker_connector,capif_provider_connector import os import logging import shutil Loading Loading @@ -216,3 +216,218 @@ class capif_invoker_event_feature(capif_invoker_connector): def patch_subcription(self, name): self.update_subcription(self, name) class capif_provider_event_feature(capif_provider_connector): def create_subscription(self, name, id): subscriberId = id path = self.capif_https_url + f"capif-events/v1/{subscriberId}/subscriptions" list_of_ids = self._load_provider_api_details() number = self._find_key_by_value(list_of_ids, id) payload = { "events": self.events_description, "eventFilters": self.events_filter, "eventReq": {}, # TO IMPROVE !!! "notificationDestination": f"{self.notification_destination}", "requestTestNotification": True, "websockNotifConfig": self.websock_notif_config, "supportedFeatures": f"{self.supported_features}" } number_low = number.lower() cert = ( os.path.join(self.provider_folder, f"{number_low}.crt"), os.path.join(self.provider_folder, f"{number}_private_key.key"), ) try: response = requests.post( url=path, json=payload, headers={"Content-Type": "application/json"}, cert=cert, verify=os.path.join(self.provider_folder, "ca.crt") ) response.raise_for_status() location_header = response.headers.get("Location") if location_header: # Extrae el identificador de la URL en el encabezado 'Location' identifier = location_header.rstrip('/').split('/')[-1] self.logger.info(f"Subscriptionid obtained: {identifier}") else: self.logger.error("The Location header is not available in the response") path = os.path.join(self.provider_folder, "capif_subscriptions_id.json") # Load or initialize the subscription dictionary # Load or initialize the subscription dictionary if os.path.exists(path): subscription = self._load_config_file(path) if not isinstance(subscription, dict): raise TypeError(f"Expected 'subscription' to be a dict, but got {type(subscription).__name__}") else: subscription = {} if not isinstance(subscriberId, (str, int)): raise TypeError(f"Expected 'subscriberId' to be a string or integer, but got {type(subscriberId).__name__}") # Convert events_description to a string if it isn't already if not isinstance(name, str): name = str(name) # Update the subscription structure subscription[str(subscriberId)] = { f"{name}": identifier } # Save the updated dictionary back to the file self._create_or_update_file("capif_subscriptions_id", "json", subscription, "w") except Exception as e: self.logger.error("Unexpected error: %s", e) return None, {"error": f"Unexpected error: {e}"} def delete_subscription(self, name, id): subscriberId = id path = os.path.join(self.provider_folder, "capif_subscriptions_id.json") if os.path.exists(path): subscription = self._load_config_file(path) if not isinstance(subscription, dict): raise TypeError(f"Expected 'subscription' to be a dict, but got {type(subscription).__name__}") if subscriberId in subscription and name in subscription[subscriberId]: identifier = subscription[subscriberId][name] # Attempt to delete the subscription from CAPIF delete_path = self.capif_https_url + f"capif-events/v1/{subscriberId}/subscriptions/{identifier}" list_of_ids = self._load_provider_api_details() number = self._find_key_by_value(list_of_ids, id) number_low = number.lower() cert = ( os.path.join(self.provider_folder, f"{number_low}.crt"), os.path.join(self.provider_folder, f"{number}_private_key.key"), ) try: response = requests.delete( url=delete_path, headers={"Content-Type": "application/json"}, cert=cert, verify=os.path.join(self.provider_folder, "ca.crt") ) response.raise_for_status() # Remove the service entry from the subscription dictionary del subscription[subscriberId][name] # If no more services exist for the subscriber, remove the subscriber entry if not subscription[subscriberId]: del subscription[subscriberId] # Save the updated dictionary back to the file self._create_or_update_file("capif_subscriptions_id", "json", subscription, "w") self.logger.info(f"Successfully deleted subscription for service '{name}'") except Exception as e: self.logger.error("Unexpected error: %s", e) return None, {"error": f"Unexpected error: {e}"} else: self.logger.warning(f"Service '{name}' not found for subscriber '{subscriberId}'") return None, {"error": f"Service '{name}' not found for subscriber '{subscriberId}'"} else: self.logger.error("Subscription file not found at path: %s", path) return None, {"error": "Subscription file not found"} def update_subcription(self, name, id): subscriberId = id path = os.path.join(self.provider_folder, "capif_subscriptions_id.json") list_of_ids = self._load_provider_api_details() number = self._find_key_by_value(list_of_ids, id) payload = { "events": self.events_description, "eventFilters": self.events_filter, "eventReq": {}, # TO IMPROVE !!! "notificationDestination": f"{self.notification_destination}", "requestTestNotification": True, "websockNotifConfig": self.websock_notif_config, "supportedFeatures": f"{self.supported_features}" } if os.path.exists(path): subscription = self._load_config_file(path) if not isinstance(subscription, dict): raise TypeError(f"Expected 'subscription' to be a dict, but got {type(subscription).__name__}") if subscriberId in subscription and name in subscription[subscriberId]: identifier = subscription[subscriberId][name] # Attempt to delete the subscription from CAPIF put_path = self.capif_https_url + f"capif-events/v1/{subscriberId}/subscriptions/{identifier}" list_of_ids = self._load_provider_api_details() number = self._find_key_by_value(list_of_ids, id) number_low = number.lower() cert = ( os.path.join(self.provider_folder, f"{number_low}.crt"), os.path.join(self.provider_folder, f"{number}_private_key.key"), ) try: response = requests.put( url=put_path, json=payload, headers={"Content-Type": "application/json"}, cert=cert, verify=os.path.join(self.provider_folder, "ca.crt") ) response.raise_for_status() # Remove the service entry from the subscription dictionary del subscription[subscriberId][name] # If no more services exist for the subscriber, remove the subscriber entry if not subscription[subscriberId]: del subscription[subscriberId] # Save the updated dictionary back to the file self._create_or_update_file("capif_subscriptions_id", "json", subscription, "w") self.logger.info(f"Successfully updated subscription for service '{name}'") except Exception as e: self.logger.error("Unexpected error: %s", e) return None, {"error": f"Unexpected error: {e}"} else: self.logger.warning(f"Service '{name}' not found for subscriber '{subscriberId}'") return None, {"error": f"Service '{name}' not found for subscriber '{subscriberId}'"} else: self.logger.error("Subscription file not found at path: %s", path) return None, {"error": "Subscription file not found"} def patch_subcription(self, name, id): self.update_subcription(self, name, id) No newline at end of file
opencapif_sdk/capif_provider_connector.py +86 −14 Original line number Diff line number Diff line Loading @@ -153,7 +153,7 @@ class capif_provider_connector: path_prov_funcs = os.path.join(self.provider_folder, "provider_capif_ids.json") if os.path.exists(path_prov_funcs): self.provider_capif_ids=self.__load_provider_api_details() self.provider_capif_ids = self._load_provider_api_details() path_published = os.path.join(self.provider_folder, "provider_service_ids.json") if os.path.exists(path_published): Loading @@ -171,6 +171,11 @@ class capif_provider_connector: else: self.capif_register_url = f"https://{capif_register_host.strip()}:{capif_register_port.strip()}/" events_config = provider_config.get('events', {}) self.events_description = os.getenv('PROVIDER_EVENTS_DESCRIPTION', events_config.get('description', '')) self.events_filter = os.getenv('PROVIDER_EVENTS_FILTERS', events_config.get('eventFilters', '')) self.notification_destination = os.getenv('PROVIDER_EVENTS_FILTERS', events_config.get('notificationDestination', '')) self.websock_notif_config = os.getenv('PROVIDER_EVENTS_FILTERS', events_config.get('websockNotifConfig', '')) # Log initialization success message self.logger.info("capif_provider_connector initialized with the capif_sdk_config.json parameters") Loading Loading @@ -427,7 +432,7 @@ class capif_provider_connector: self.logger.info( f"Loading provider details from {provider_details_path}") provider_details = self.__load_provider_api_details() provider_details = self._load_provider_api_details() publish_url = provider_details["publish_url"] Loading Loading @@ -604,7 +609,7 @@ class capif_provider_connector: self.logger.info( f"Loading provider details from {provider_details_path}") provider_details = self.__load_provider_api_details() provider_details = self._load_provider_api_details() publish_url = provider_details["publish_url"] # Load provider details Loading Loading @@ -722,7 +727,7 @@ class capif_provider_connector: self.logger.info( f"Loading provider details from {provider_details_path}") provider_details = self.__load_provider_api_details() provider_details = self._load_provider_api_details() publish_url = provider_details["publish_url"] chosenAPFsandAEFs = self.publish_req Loading Loading @@ -796,7 +801,7 @@ class capif_provider_connector: self.logger.info( f"Loading provider details from {provider_details_path}") provider_details = self.__load_provider_api_details() provider_details = self._load_provider_api_details() publish_url = provider_details["publish_url"] chosenAPFsandAEFs = self.publish_req Loading Loading @@ -872,7 +877,7 @@ class capif_provider_connector: self.logger.info( f"Loading provider details from {provider_details_path}") provider_details = self.__load_provider_api_details() provider_details = self._load_provider_api_details() publish_url = provider_details["publish_url"] chosenAPFsandAEFs = self.publish_req Loading Loading @@ -1071,7 +1076,7 @@ class capif_provider_connector: self.logger.info("Offboarding the provider") # Load CAPIF API details capif_api_details = self.__load_provider_api_details() capif_api_details = self._load_provider_api_details() url = f"{self.capif_https_url}api-provider-management/v1/registrations/{capif_api_details['capif_registration_id']}" # Define certificate paths Loading Loading @@ -1120,7 +1125,7 @@ class capif_provider_connector: self.logger.error(f"Error during removing folder contents: {e}") raise def __load_provider_api_details(self) -> dict: def _load_provider_api_details(self) -> dict: """ Loads NEF API details from the CAPIF provider details JSON file. Loading Loading @@ -1163,7 +1168,7 @@ class capif_provider_connector: ) def certs_modifications(self): api_details = self.__load_provider_api_details() api_details = self._load_provider_api_details() apf_count = 0 aef_count = 0 Loading Loading @@ -1253,7 +1258,7 @@ class capif_provider_connector: def update_onboard(self, capif_onboarding_url, access_token): self.logger.info( "Onboarding Provider to CAPIF and waiting signed certificate by giving our public keys to CAPIF") api_details = self.__load_provider_api_details() api_details = self._load_provider_api_details() capif_id = "/" + api_details["capif_registration_id"] url = f"{self.capif_https_url}{capif_onboarding_url}{capif_id}" Loading Loading @@ -1346,3 +1351,70 @@ class capif_provider_connector: self.logger.error( f"Onboarding failed: {e} - Response: {response.text}") raise def _create_or_update_file(self, file_name, file_type, content, mode="w"): """ Create or update a file with the specified content. :param file_name: Name of the file (without extension). :param file_type: File type or extension (e.g., "txt", "json", "html"). :param content: Content to write into the file. Can be a string, dictionary, or list. :param mode: Write mode ('w' to overwrite, 'a' to append). Default is 'w'. """ # Validate the mode if mode not in ["w", "a"]: raise ValueError("Mode must be 'w' (overwrite) or 'a' (append).") # Construct the full file name full_file_name = f"{file_name}.{file_type}" full_path = os.path.join(self.provider_folder, full_file_name) # Ensure the content is properly formatted if isinstance(content, (dict, list)): if file_type == "json": try: # Serialize content to JSON content = json.dumps(content, indent=4) except TypeError as e: raise ValueError(f"Failed to serialize content to JSON: {e}") else: raise TypeError("Content must be a string when the file type is not JSON.") elif not isinstance(content, str): raise TypeError("Content must be a string, dictionary, or list.") try: # Open the file in the specified mode with open(full_path, mode, encoding="utf-8") as file: file.write(content) # Log success based on the mode if mode == "w": self.logger.info(f"File '{full_file_name}' created or overwritten successfully.") elif mode == "a": self.logger.info(f"Content appended to file '{full_file_name}' successfully.") except Exception as e: self.logger.error(f"Error handling the file '{full_file_name}': {e}") raise def _find_key_by_value(self, data, target_value): """ Given a dictionary and a value, return the key corresponding to that value. :param data: Dictionary to search. :param target_value: Value to find the corresponding key for. :return: Key corresponding to the target value, or None if not found. """ for key, value in data.items(): if value == target_value: return key return None def _load_config_file(self, config_file: str): """Loads the configuration file.""" try: with open(config_file, 'r') as file: return json.load(file) except FileNotFoundError: self.logger.warning( f"Configuration file {config_file} not found. Using defaults or environment variables.") return {} No newline at end of file
scripts/invoker_capif_event_subcription.py +5 −8 Original line number Diff line number Diff line import utilities from opencapif_sdk import capif_invoker_connector, capif_invoker_event_feature from opencapif_sdk import capif_invoker_event_feature def showcase_capif_connector(): Loading @@ -7,10 +7,6 @@ def showcase_capif_connector(): This method showcases how one can use the CAPIFConnector class. """ capif_connector = capif_invoker_connector(config_file=utilities.get_config_file()) capif_connector.onboard_invoker() events = capif_invoker_event_feature(config_file=utilities.get_config_file()) events.create_subscription(name="Servicio_2") Loading @@ -18,6 +14,7 @@ def showcase_capif_connector(): events.update_subcription(name="Servicio_2") events.delete_subscription(name="Servicio_2") print("COMPLETED") Loading
scripts/provider_capif_event_feature.py 0 → 100644 +26 −0 Original line number Diff line number Diff line import utilities from opencapif_sdk import capif_provider_connector, capif_provider_event_feature def showcase_capif_connector(): """ This method showcases how one can use the CAPIFConnector class. """ provider = capif_provider_connector(config_file=utilities.get_config_file()) id = provider.provider_capif_ids["AEF-1"] events = capif_provider_event_feature(config_file=utilities.get_config_file()) events.create_subscription(name="Servicio_2", id=id) events.update_subcription(name="Servicio_2", id=id) events.delete_subscription(name="Servicio_2", id=id) print("COMPLETED") if __name__ == "__main__": # Register invoker to CAPIF. This should happen exactly once showcase_capif_connector()