Loading opencapif_sdk/capif_event_feature.py +120 −19 Original line number Diff line number Diff line Loading @@ -41,7 +41,7 @@ logging.basicConfig( class capif_invoker_event_feature(capif_invoker_connector): def create_subscription(self): def create_subscription(self, name): invoker_capif_details = self.invoker_capif_details Loading Loading @@ -82,36 +82,137 @@ class capif_invoker_event_feature(capif_invoker_connector): path = os.path.join(self.invoker_folder, "capif_subscriptions_id.json") # Load or initialize the subscription list # Load or initialize the subscription dictionary # Load or initialize the subscription dictionary if os.path.exists(path): subscription = self._load_config_file(path) # Ensure subscription is a list if not isinstance(subscription, list): raise TypeError(f"Expected 'subscription' to be a list, but got {type(subscription).__name__}") if not isinstance(subscription, dict): raise TypeError(f"Expected 'subscription' to be a dict, but got {type(subscription).__name__}") else: subscription = [] subscription = {} # Find if the subscriberId already exists in the list subscriber_entry = next((item for item in subscription if item.get("subscriberId") == subscriberId), None) if not isinstance(subscriberId, (str, int)): raise TypeError(f"Expected 'subscriberId' to be a string or integer, but got {type(subscriberId).__name__}") if subscriber_entry is None: # If subscriberId is not found, create a new entry subscriber_entry = {"subscriberId": subscriberId, "events": []} subscription.append(subscriber_entry) # Convert events_description to a string if it isn't already if not isinstance(name, str): name = str(name) # Add the event to the subscriber's events list subscriber_entry["events"].append({self.events_description: identifier}) # Update the subscription structure subscription[str(subscriberId)] = { f"{name}": identifier } # Save the updated dictionary back to the file self._create_or_update_file("capif_subscriptions_id", "json", subscription, "w") except Exception as e: self.logger.error("Unexpected error: %s", e) return None, {"error": f"Unexpected error: {e}"} def delete_subscription(self, name): invoker_capif_details = self.invoker_capif_details subscriberId = invoker_capif_details["api_invoker_id"] path = os.path.join(self.invoker_folder, "capif_subscriptions_id.json") if os.path.exists(path): subscription = self._load_config_file(path) if not isinstance(subscription, dict): raise TypeError(f"Expected 'subscription' to be a dict, but got {type(subscription).__name__}") if subscriberId in subscription and name in subscription[subscriberId]: identifier = subscription[subscriberId][name] # Save the updated list back to the file # Attempt to delete the subscription from CAPIF delete_path = self.capif_https_url + f"capif-events/v1/{subscriberId}/subscriptions/{identifier}" try: response = requests.delete( url=delete_path, headers={"Content-Type": "application/json"}, cert=(self.signed_key_crt_path, self.private_key_path), verify=os.path.join(self.invoker_folder, "ca.crt") ) response.raise_for_status() # Remove the service entry from the subscription dictionary del subscription[subscriberId][name] # If no more services exist for the subscriber, remove the subscriber entry if not subscription[subscriberId]: del subscription[subscriberId] # Save the updated dictionary back to the file self._create_or_update_file("capif_subscriptions_id", "json", subscription, "w") self.logger.info(f"Successfully deleted subscription for service '{name}'") except Exception as e: self.logger.error("Unexpected error: %s", e) return None, {"error": f"Unexpected error: {e}"} # def delete_subcription(self): else: self.logger.warning(f"Service '{name}' not found for subscriber '{subscriberId}'") return None, {"error": f"Service '{name}' not found for subscriber '{subscriberId}'"} else: self.logger.error("Subscription file not found at path: %s", path) return None, {"error": "Subscription file not found"} def update_subcription(self, name): invoker_capif_details = self.invoker_capif_details subscriberId = invoker_capif_details["api_invoker_id"] path = os.path.join(self.invoker_folder, "capif_subscriptions_id.json") payload = { "events": self.events_description, "eventFilters": self.events_filter, "eventReq": {}, # TO IMPROVE !!! "notificationDestination": f"{self.capif_callback_url}", "requestTestNotification": True, "websockNotifConfig": { "websocketUri": f"{self.capif_callback_url}", "requestWebsocketUri": True }, "supportedFeatures": f"{self.supported_features}" } if os.path.exists(path): subscription = self._load_config_file(path) if not isinstance(subscription, dict): raise TypeError(f"Expected 'subscription' to be a dict, but got {type(subscription).__name__}") if subscriberId in subscription and name in subscription[subscriberId]: identifier = subscription[subscriberId][name] # Attempt to delete the subscription from CAPIF put_path = self.capif_https_url + f"capif-events/v1/{subscriberId}/subscriptions/{identifier}" # def modify_subcription(self): try: response = requests.put( url=put_path, json=payload, headers={"Content-Type": "application/json"}, cert=(self.signed_key_crt_path, self.private_key_path), verify=os.path.join(self.invoker_folder, "ca.crt") ) response.raise_for_status() self.logger.info(f"Successfully updated subscription for service '{name}'") except Exception as e: self.logger.error("Unexpected error: %s", e) return None, {"error": f"Unexpected error: {e}"} else: self.logger.warning(f"Service '{name}' not found for subscriber '{subscriberId}'") return None, {"error": f"Service '{name}' not found for subscriber '{subscriberId}'"} else: self.logger.error("Subscription file not found at path: %s", path) return None, {"error": "Subscription file not found"} # def patch_subcription(self): def patch_subcription(self, name): self.update_subcription(self, name) scripts/invoker_capif_event_subcription.py +6 −1 Original line number Diff line number Diff line import utilities from opencapif_sdk import capif_invoker_connector, capif_invoker_event_feature def showcase_capif_connector(): """ This method showcases how one can use the CAPIFConnector class. Loading @@ -12,7 +13,11 @@ def showcase_capif_connector(): events = capif_invoker_event_feature(config_file=utilities.get_config_file()) events.create_subscription() events.create_subscription(name="Servicio_2") events.update_subcription(name="Servicio_2") events.delete_subscription(name="Servicio_2") print("COMPLETED") Loading Loading
opencapif_sdk/capif_event_feature.py +120 −19 Original line number Diff line number Diff line Loading @@ -41,7 +41,7 @@ logging.basicConfig( class capif_invoker_event_feature(capif_invoker_connector): def create_subscription(self): def create_subscription(self, name): invoker_capif_details = self.invoker_capif_details Loading Loading @@ -82,36 +82,137 @@ class capif_invoker_event_feature(capif_invoker_connector): path = os.path.join(self.invoker_folder, "capif_subscriptions_id.json") # Load or initialize the subscription list # Load or initialize the subscription dictionary # Load or initialize the subscription dictionary if os.path.exists(path): subscription = self._load_config_file(path) # Ensure subscription is a list if not isinstance(subscription, list): raise TypeError(f"Expected 'subscription' to be a list, but got {type(subscription).__name__}") if not isinstance(subscription, dict): raise TypeError(f"Expected 'subscription' to be a dict, but got {type(subscription).__name__}") else: subscription = [] subscription = {} # Find if the subscriberId already exists in the list subscriber_entry = next((item for item in subscription if item.get("subscriberId") == subscriberId), None) if not isinstance(subscriberId, (str, int)): raise TypeError(f"Expected 'subscriberId' to be a string or integer, but got {type(subscriberId).__name__}") if subscriber_entry is None: # If subscriberId is not found, create a new entry subscriber_entry = {"subscriberId": subscriberId, "events": []} subscription.append(subscriber_entry) # Convert events_description to a string if it isn't already if not isinstance(name, str): name = str(name) # Add the event to the subscriber's events list subscriber_entry["events"].append({self.events_description: identifier}) # Update the subscription structure subscription[str(subscriberId)] = { f"{name}": identifier } # Save the updated dictionary back to the file self._create_or_update_file("capif_subscriptions_id", "json", subscription, "w") except Exception as e: self.logger.error("Unexpected error: %s", e) return None, {"error": f"Unexpected error: {e}"} def delete_subscription(self, name): invoker_capif_details = self.invoker_capif_details subscriberId = invoker_capif_details["api_invoker_id"] path = os.path.join(self.invoker_folder, "capif_subscriptions_id.json") if os.path.exists(path): subscription = self._load_config_file(path) if not isinstance(subscription, dict): raise TypeError(f"Expected 'subscription' to be a dict, but got {type(subscription).__name__}") if subscriberId in subscription and name in subscription[subscriberId]: identifier = subscription[subscriberId][name] # Save the updated list back to the file # Attempt to delete the subscription from CAPIF delete_path = self.capif_https_url + f"capif-events/v1/{subscriberId}/subscriptions/{identifier}" try: response = requests.delete( url=delete_path, headers={"Content-Type": "application/json"}, cert=(self.signed_key_crt_path, self.private_key_path), verify=os.path.join(self.invoker_folder, "ca.crt") ) response.raise_for_status() # Remove the service entry from the subscription dictionary del subscription[subscriberId][name] # If no more services exist for the subscriber, remove the subscriber entry if not subscription[subscriberId]: del subscription[subscriberId] # Save the updated dictionary back to the file self._create_or_update_file("capif_subscriptions_id", "json", subscription, "w") self.logger.info(f"Successfully deleted subscription for service '{name}'") except Exception as e: self.logger.error("Unexpected error: %s", e) return None, {"error": f"Unexpected error: {e}"} # def delete_subcription(self): else: self.logger.warning(f"Service '{name}' not found for subscriber '{subscriberId}'") return None, {"error": f"Service '{name}' not found for subscriber '{subscriberId}'"} else: self.logger.error("Subscription file not found at path: %s", path) return None, {"error": "Subscription file not found"} def update_subcription(self, name): invoker_capif_details = self.invoker_capif_details subscriberId = invoker_capif_details["api_invoker_id"] path = os.path.join(self.invoker_folder, "capif_subscriptions_id.json") payload = { "events": self.events_description, "eventFilters": self.events_filter, "eventReq": {}, # TO IMPROVE !!! "notificationDestination": f"{self.capif_callback_url}", "requestTestNotification": True, "websockNotifConfig": { "websocketUri": f"{self.capif_callback_url}", "requestWebsocketUri": True }, "supportedFeatures": f"{self.supported_features}" } if os.path.exists(path): subscription = self._load_config_file(path) if not isinstance(subscription, dict): raise TypeError(f"Expected 'subscription' to be a dict, but got {type(subscription).__name__}") if subscriberId in subscription and name in subscription[subscriberId]: identifier = subscription[subscriberId][name] # Attempt to delete the subscription from CAPIF put_path = self.capif_https_url + f"capif-events/v1/{subscriberId}/subscriptions/{identifier}" # def modify_subcription(self): try: response = requests.put( url=put_path, json=payload, headers={"Content-Type": "application/json"}, cert=(self.signed_key_crt_path, self.private_key_path), verify=os.path.join(self.invoker_folder, "ca.crt") ) response.raise_for_status() self.logger.info(f"Successfully updated subscription for service '{name}'") except Exception as e: self.logger.error("Unexpected error: %s", e) return None, {"error": f"Unexpected error: {e}"} else: self.logger.warning(f"Service '{name}' not found for subscriber '{subscriberId}'") return None, {"error": f"Service '{name}' not found for subscriber '{subscriberId}'"} else: self.logger.error("Subscription file not found at path: %s", path) return None, {"error": "Subscription file not found"} # def patch_subcription(self): def patch_subcription(self, name): self.update_subcription(self, name)
scripts/invoker_capif_event_subcription.py +6 −1 Original line number Diff line number Diff line import utilities from opencapif_sdk import capif_invoker_connector, capif_invoker_event_feature def showcase_capif_connector(): """ This method showcases how one can use the CAPIFConnector class. Loading @@ -12,7 +13,11 @@ def showcase_capif_connector(): events = capif_invoker_event_feature(config_file=utilities.get_config_file()) events.create_subscription() events.create_subscription(name="Servicio_2") events.update_subcription(name="Servicio_2") events.delete_subscription(name="Servicio_2") print("COMPLETED") Loading