Loading opencapif_sdk/__init__.py +2 −1 Original line number Diff line number Diff line Loading @@ -3,5 +3,6 @@ from opencapif_sdk.capif_provider_connector import capif_provider_connector from opencapif_sdk.service_discoverer import service_discoverer from opencapif_sdk.api_schema_translator import api_schema_translator from opencapif_sdk.capif_logging_feature import capif_logging_feature from opencapif_sdk.capif_event_feature import capif_invoker_event_feature __all__ = ["capif_invoker_connector", "service_discoverer", "capif_provider_connector", "api_schema_translator", "capif_logging_feature"] No newline at end of file __all__ = ["capif_invoker_connector", "service_discoverer", "capif_provider_connector", "api_schema_translator", "capif_logging_feature","capif_invoker_event_feature"] No newline at end of file opencapif_sdk/capif_event_feature.py +55 −18 Original line number Diff line number Diff line from capif_invoker_connector import capif_invoker_connector from capif_provider_connector import capif_provider_connector from opencapif_sdk import capif_invoker_connector import os import logging import shutil Loading Loading @@ -39,42 +38,80 @@ logging.basicConfig( ] ) class capif_invoker_event_feature(capif_invoker_connector): def create_subscription: def create_subscription(self): invoker_capif_details = self.__load_invoker_api_details() invoker_capif_details = self.invoker_capif_details subscriberId = invoker_capif_details["api_invoker_id"] path = self.capif_https_url + f"/{subscriberId}/subscriptions" payload = self.events_config path = self.capif_https_url + f"capif-events/v1/{subscriberId}/subscriptions" payload = { "events": self.events_description, "eventFilters": self.events_filter, "eventReq": {}, # TO IMPROVE !!! "notificationDestination": f"{self.capif_callback_url}", "requestTestNotification": True, "websockNotifConfig": { "websocketUri": f"{self.capif_callback_url}", "requestWebsocketUri": True }, "supportedFeatures": f"{self.supported_features}" } try: response = requests.post( url=path, json=payload, headers={"Content-Type": "application/json"}, cert=cert, cert=(self.signed_key_crt_path, self.private_key_path), verify=os.path.join(self.invoker_folder, "ca.crt") ) response.raise_for_status() location_header = response.headers.get("Location") if location_header: # Extrae el identificador de la URL en el encabezado 'Location' identifier = location_header.rstrip('/').split('/')[-1] self.logger.info(f"Subscriptionid obtained: {identifier}") else: self.logger.error("The Location header is not available in the response") path = os.path.join(self.invoker_folder, "capif_subscriptions_id.json") return response.status_code, response.json() # Load or initialize the subscription list if os.path.exists(path): subscription = self._load_config_file(path) # Ensure subscription is a list if not isinstance(subscription, list): raise TypeError(f"Expected 'subscription' to be a list, but got {type(subscription).__name__}") else: subscription = [] # Find if the subscriberId already exists in the list subscriber_entry = next((item for item in subscription if item.get("subscriberId") == subscriberId), None) if subscriber_entry is None: # If subscriberId is not found, create a new entry subscriber_entry = {"subscriberId": subscriberId, "events": []} subscription.append(subscriber_entry) # Add the event to the subscriber's events list subscriber_entry["events"].append({self.events_description: identifier}) # Save the updated list back to the file self._create_or_update_file("capif_subscriptions_id", "json", subscription, "w") except Exception as e: self.logger.error("Unexpected error: %s", e) return None, {"error": f"Unexpected error: {e}"} # def delete_subcription(self): def delete_subcription: def modify_subcription: def patch_subcription: # def modify_subcription(self): # def patch_subcription(self): opencapif_sdk/capif_invoker_connector.py +67 −35 Original line number Diff line number Diff line Loading @@ -46,7 +46,7 @@ class capif_invoker_connector: config_file = os.path.abspath(config_file) # Load configuration from file if necessary config = self.__load_config_file(config_file) config = self._load_config_file(config_file) debug_mode = os.getenv('DEBUG_MODE', config.get('debug_mode', 'False')).strip().lower() if debug_mode == "false": Loading Loading @@ -101,7 +101,8 @@ class capif_invoker_connector: # Events configuration events_config = invoker_config.get('events', {}) self.events_config = events_config self.events_description = os.getenv('INVOKER_EVENTS_DESCRIPTION', events_config.get('description', '')) self.events_filter = os.getenv('INVOKER_EVENTS_FILTERS', events_config.get('eventFilters', '')) # Define the invoker folder path and create it if it doesn't exist self.invoker_folder = os.path.join(invoker_general_folder, capif_username) Loading Loading @@ -142,9 +143,21 @@ class capif_invoker_connector: if os.path.exists(path): self.invoker_capif_details = self.__load_invoker_api_details() self.signed_key_crt_path = os.path.join( self.invoker_folder, self.capif_username + ".crt" ) self.private_key_path = os.path.join( self.invoker_folder, "private.key" ) self.pathca = os.path.join(self.invoker_folder, "ca.crt") self.logger.info("capif_invoker_connector initialized with the JSON parameters") def __load_config_file(self, config_file: str): def _load_config_file(self, config_file: str): """Loads the configuration file.""" try: with open(config_file, 'r') as file: Loading Loading @@ -198,25 +211,11 @@ class capif_invoker_connector: + invoker_capif_details["api_invoker_id"] ) signed_key_crt_path = os.path.join( self.invoker_folder, invoker_capif_details["user_name"] + ".crt" ) private_key_path = os.path.join( self.invoker_folder, "private.key" ) path = os.path.join( self.invoker_folder, "ca.crt" ) response = requests.request( "DELETE", url, cert=(signed_key_crt_path, private_key_path), verify=path, cert=(self.signed_key_crt_path, self.private_key_path), verify=self.pathca, ) response.raise_for_status() self.logger.info("Invoker offboarded successfully") Loading @@ -241,7 +240,6 @@ class capif_invoker_connector: self.logger.info( "Creating private and public keys for the Invoker cert") try: private_key_path = os.path.join(self.invoker_folder, "private.key") csr_file_path = os.path.join(self.invoker_folder, "cert_req.csr") Loading @@ -262,7 +260,7 @@ class capif_invoker_connector: with open(csr_file_path, "wb+") as f: f.write(dump_certificate_request(FILETYPE_PEM, req)) public_key = dump_certificate_request(FILETYPE_PEM, req) with open(private_key_path, "wb+") as f: with open(self.private_key_path, "wb+") as f: f.write(dump_privatekey(FILETYPE_PEM, key)) self.logger.info("Keys created successfully") Loading Loading @@ -309,7 +307,7 @@ class capif_invoker_connector: response.raise_for_status() response_payload = json.loads(response.text) ca_root_file_path = os.path.join(self.invoker_folder, "ca.crt") ca_root_file_path = self.pathca ca_root_file = open(ca_root_file_path, "wb+") ca_root_file.write(bytes(response_payload["ca_root"], "utf-8")) self.logger.info( Loading Loading @@ -343,13 +341,12 @@ class capif_invoker_connector: "Authorization": "Bearer {}".format(capif_access_token), "Content-Type": "application/json", } pathca = os.path.join(self.invoker_folder, "ca.crt") response = requests.request( "POST", url, headers=headers, data=payload, verify=pathca, verify=self.pathca, ) response.raise_for_status() response_payload = json.loads(response.text) Loading Loading @@ -447,23 +444,14 @@ class capif_invoker_connector: "Authorization": "Bearer {}".format(capif_access_token), "Content-Type": "application/json", } signed_key_crt_path = os.path.join( self.invoker_folder, self.capif_username + ".crt" ) private_key_path = os.path.join( self.invoker_folder, "private.key" ) pathca = os.path.join(self.invoker_folder, "ca.crt") response = requests.request( "PUT", url, headers=headers, data=payload, cert=(signed_key_crt_path, private_key_path), verify=pathca, cert=(self.signed_key_crt_path, self.private_key_path), verify=self.pathca, ) response.raise_for_status() Loading @@ -476,5 +464,49 @@ class capif_invoker_connector: f"Error during updating Invoker to CAPIF: {e} - Response: {response.text}") raise def _create_or_update_file(self, file_name, file_type, content, mode="w"): """ Create or update a file with the specified content. :param file_name: Name of the file (without extension). :param file_type: File type or extension (e.g., "txt", "json", "html"). :param content: Content to write into the file. Can be a string, dictionary, or list. :param mode: Write mode ('w' to overwrite, 'a' to append). Default is 'w'. """ # Validate the mode if mode not in ["w", "a"]: raise ValueError("Mode must be 'w' (overwrite) or 'a' (append).") # Construct the full file name full_file_name = f"{file_name}.{file_type}" full_path = os.path.join(self.invoker_folder, full_file_name) # Ensure the content is properly formatted if isinstance(content, (dict, list)): if file_type == "json": try: # Serialize content to JSON content = json.dumps(content, indent=4) except TypeError as e: raise ValueError(f"Failed to serialize content to JSON: {e}") else: raise TypeError("Content must be a string when the file type is not JSON.") elif not isinstance(content, str): raise TypeError("Content must be a string, dictionary, or list.") try: # Open the file in the specified mode with open(full_path, mode, encoding="utf-8") as file: file.write(content) # Log success based on the mode if mode == "w": self.logger.info(f"File '{full_file_name}' created or overwritten successfully.") elif mode == "a": self.logger.info(f"Content appended to file '{full_file_name}' successfully.") except Exception as e: self.logger.error(f"Error handling the file '{full_file_name}': {e}") raise scripts/invoker_capif_event_subcription.py 0 → 100644 +21 −0 Original line number Diff line number Diff line import utilities from opencapif_sdk import capif_invoker_connector, capif_invoker_event_feature def showcase_capif_connector(): """ This method showcases how one can use the CAPIFConnector class. """ capif_connector = capif_invoker_connector(config_file=utilities.get_config_file()) capif_connector.onboard_invoker() events = capif_invoker_event_feature(config_file=utilities.get_config_file()) events.create_subscription() print("COMPLETED") if __name__ == "__main__": # Register invoker to CAPIF. This should happen exactly once showcase_capif_connector() scripts/utilities.py +1 −1 Original line number Diff line number Diff line def get_config_file() -> str: return "../config/capif_sdk_config.json" return "../test/capif_sdk_config_sample_test.json" def get_register_file() -> str: Loading Loading
opencapif_sdk/__init__.py +2 −1 Original line number Diff line number Diff line Loading @@ -3,5 +3,6 @@ from opencapif_sdk.capif_provider_connector import capif_provider_connector from opencapif_sdk.service_discoverer import service_discoverer from opencapif_sdk.api_schema_translator import api_schema_translator from opencapif_sdk.capif_logging_feature import capif_logging_feature from opencapif_sdk.capif_event_feature import capif_invoker_event_feature __all__ = ["capif_invoker_connector", "service_discoverer", "capif_provider_connector", "api_schema_translator", "capif_logging_feature"] No newline at end of file __all__ = ["capif_invoker_connector", "service_discoverer", "capif_provider_connector", "api_schema_translator", "capif_logging_feature","capif_invoker_event_feature"] No newline at end of file
opencapif_sdk/capif_event_feature.py +55 −18 Original line number Diff line number Diff line from capif_invoker_connector import capif_invoker_connector from capif_provider_connector import capif_provider_connector from opencapif_sdk import capif_invoker_connector import os import logging import shutil Loading Loading @@ -39,42 +38,80 @@ logging.basicConfig( ] ) class capif_invoker_event_feature(capif_invoker_connector): def create_subscription: def create_subscription(self): invoker_capif_details = self.__load_invoker_api_details() invoker_capif_details = self.invoker_capif_details subscriberId = invoker_capif_details["api_invoker_id"] path = self.capif_https_url + f"/{subscriberId}/subscriptions" payload = self.events_config path = self.capif_https_url + f"capif-events/v1/{subscriberId}/subscriptions" payload = { "events": self.events_description, "eventFilters": self.events_filter, "eventReq": {}, # TO IMPROVE !!! "notificationDestination": f"{self.capif_callback_url}", "requestTestNotification": True, "websockNotifConfig": { "websocketUri": f"{self.capif_callback_url}", "requestWebsocketUri": True }, "supportedFeatures": f"{self.supported_features}" } try: response = requests.post( url=path, json=payload, headers={"Content-Type": "application/json"}, cert=cert, cert=(self.signed_key_crt_path, self.private_key_path), verify=os.path.join(self.invoker_folder, "ca.crt") ) response.raise_for_status() location_header = response.headers.get("Location") if location_header: # Extrae el identificador de la URL en el encabezado 'Location' identifier = location_header.rstrip('/').split('/')[-1] self.logger.info(f"Subscriptionid obtained: {identifier}") else: self.logger.error("The Location header is not available in the response") path = os.path.join(self.invoker_folder, "capif_subscriptions_id.json") return response.status_code, response.json() # Load or initialize the subscription list if os.path.exists(path): subscription = self._load_config_file(path) # Ensure subscription is a list if not isinstance(subscription, list): raise TypeError(f"Expected 'subscription' to be a list, but got {type(subscription).__name__}") else: subscription = [] # Find if the subscriberId already exists in the list subscriber_entry = next((item for item in subscription if item.get("subscriberId") == subscriberId), None) if subscriber_entry is None: # If subscriberId is not found, create a new entry subscriber_entry = {"subscriberId": subscriberId, "events": []} subscription.append(subscriber_entry) # Add the event to the subscriber's events list subscriber_entry["events"].append({self.events_description: identifier}) # Save the updated list back to the file self._create_or_update_file("capif_subscriptions_id", "json", subscription, "w") except Exception as e: self.logger.error("Unexpected error: %s", e) return None, {"error": f"Unexpected error: {e}"} # def delete_subcription(self): def delete_subcription: def modify_subcription: def patch_subcription: # def modify_subcription(self): # def patch_subcription(self):
opencapif_sdk/capif_invoker_connector.py +67 −35 Original line number Diff line number Diff line Loading @@ -46,7 +46,7 @@ class capif_invoker_connector: config_file = os.path.abspath(config_file) # Load configuration from file if necessary config = self.__load_config_file(config_file) config = self._load_config_file(config_file) debug_mode = os.getenv('DEBUG_MODE', config.get('debug_mode', 'False')).strip().lower() if debug_mode == "false": Loading Loading @@ -101,7 +101,8 @@ class capif_invoker_connector: # Events configuration events_config = invoker_config.get('events', {}) self.events_config = events_config self.events_description = os.getenv('INVOKER_EVENTS_DESCRIPTION', events_config.get('description', '')) self.events_filter = os.getenv('INVOKER_EVENTS_FILTERS', events_config.get('eventFilters', '')) # Define the invoker folder path and create it if it doesn't exist self.invoker_folder = os.path.join(invoker_general_folder, capif_username) Loading Loading @@ -142,9 +143,21 @@ class capif_invoker_connector: if os.path.exists(path): self.invoker_capif_details = self.__load_invoker_api_details() self.signed_key_crt_path = os.path.join( self.invoker_folder, self.capif_username + ".crt" ) self.private_key_path = os.path.join( self.invoker_folder, "private.key" ) self.pathca = os.path.join(self.invoker_folder, "ca.crt") self.logger.info("capif_invoker_connector initialized with the JSON parameters") def __load_config_file(self, config_file: str): def _load_config_file(self, config_file: str): """Loads the configuration file.""" try: with open(config_file, 'r') as file: Loading Loading @@ -198,25 +211,11 @@ class capif_invoker_connector: + invoker_capif_details["api_invoker_id"] ) signed_key_crt_path = os.path.join( self.invoker_folder, invoker_capif_details["user_name"] + ".crt" ) private_key_path = os.path.join( self.invoker_folder, "private.key" ) path = os.path.join( self.invoker_folder, "ca.crt" ) response = requests.request( "DELETE", url, cert=(signed_key_crt_path, private_key_path), verify=path, cert=(self.signed_key_crt_path, self.private_key_path), verify=self.pathca, ) response.raise_for_status() self.logger.info("Invoker offboarded successfully") Loading @@ -241,7 +240,6 @@ class capif_invoker_connector: self.logger.info( "Creating private and public keys for the Invoker cert") try: private_key_path = os.path.join(self.invoker_folder, "private.key") csr_file_path = os.path.join(self.invoker_folder, "cert_req.csr") Loading @@ -262,7 +260,7 @@ class capif_invoker_connector: with open(csr_file_path, "wb+") as f: f.write(dump_certificate_request(FILETYPE_PEM, req)) public_key = dump_certificate_request(FILETYPE_PEM, req) with open(private_key_path, "wb+") as f: with open(self.private_key_path, "wb+") as f: f.write(dump_privatekey(FILETYPE_PEM, key)) self.logger.info("Keys created successfully") Loading Loading @@ -309,7 +307,7 @@ class capif_invoker_connector: response.raise_for_status() response_payload = json.loads(response.text) ca_root_file_path = os.path.join(self.invoker_folder, "ca.crt") ca_root_file_path = self.pathca ca_root_file = open(ca_root_file_path, "wb+") ca_root_file.write(bytes(response_payload["ca_root"], "utf-8")) self.logger.info( Loading Loading @@ -343,13 +341,12 @@ class capif_invoker_connector: "Authorization": "Bearer {}".format(capif_access_token), "Content-Type": "application/json", } pathca = os.path.join(self.invoker_folder, "ca.crt") response = requests.request( "POST", url, headers=headers, data=payload, verify=pathca, verify=self.pathca, ) response.raise_for_status() response_payload = json.loads(response.text) Loading Loading @@ -447,23 +444,14 @@ class capif_invoker_connector: "Authorization": "Bearer {}".format(capif_access_token), "Content-Type": "application/json", } signed_key_crt_path = os.path.join( self.invoker_folder, self.capif_username + ".crt" ) private_key_path = os.path.join( self.invoker_folder, "private.key" ) pathca = os.path.join(self.invoker_folder, "ca.crt") response = requests.request( "PUT", url, headers=headers, data=payload, cert=(signed_key_crt_path, private_key_path), verify=pathca, cert=(self.signed_key_crt_path, self.private_key_path), verify=self.pathca, ) response.raise_for_status() Loading @@ -476,5 +464,49 @@ class capif_invoker_connector: f"Error during updating Invoker to CAPIF: {e} - Response: {response.text}") raise def _create_or_update_file(self, file_name, file_type, content, mode="w"): """ Create or update a file with the specified content. :param file_name: Name of the file (without extension). :param file_type: File type or extension (e.g., "txt", "json", "html"). :param content: Content to write into the file. Can be a string, dictionary, or list. :param mode: Write mode ('w' to overwrite, 'a' to append). Default is 'w'. """ # Validate the mode if mode not in ["w", "a"]: raise ValueError("Mode must be 'w' (overwrite) or 'a' (append).") # Construct the full file name full_file_name = f"{file_name}.{file_type}" full_path = os.path.join(self.invoker_folder, full_file_name) # Ensure the content is properly formatted if isinstance(content, (dict, list)): if file_type == "json": try: # Serialize content to JSON content = json.dumps(content, indent=4) except TypeError as e: raise ValueError(f"Failed to serialize content to JSON: {e}") else: raise TypeError("Content must be a string when the file type is not JSON.") elif not isinstance(content, str): raise TypeError("Content must be a string, dictionary, or list.") try: # Open the file in the specified mode with open(full_path, mode, encoding="utf-8") as file: file.write(content) # Log success based on the mode if mode == "w": self.logger.info(f"File '{full_file_name}' created or overwritten successfully.") elif mode == "a": self.logger.info(f"Content appended to file '{full_file_name}' successfully.") except Exception as e: self.logger.error(f"Error handling the file '{full_file_name}': {e}") raise
scripts/invoker_capif_event_subcription.py 0 → 100644 +21 −0 Original line number Diff line number Diff line import utilities from opencapif_sdk import capif_invoker_connector, capif_invoker_event_feature def showcase_capif_connector(): """ This method showcases how one can use the CAPIFConnector class. """ capif_connector = capif_invoker_connector(config_file=utilities.get_config_file()) capif_connector.onboard_invoker() events = capif_invoker_event_feature(config_file=utilities.get_config_file()) events.create_subscription() print("COMPLETED") if __name__ == "__main__": # Register invoker to CAPIF. This should happen exactly once showcase_capif_connector()
scripts/utilities.py +1 −1 Original line number Diff line number Diff line def get_config_file() -> str: return "../config/capif_sdk_config.json" return "../test/capif_sdk_config_sample_test.json" def get_register_file() -> str: Loading