diff --git a/opencapif_sdk/capif_event_feature.py b/opencapif_sdk/capif_event_feature.py index 14263803a53905616010e238895998755bbdee1b..09f8ee58d5019e948e27e2af962e7ac78859d6a5 100644 --- a/opencapif_sdk/capif_event_feature.py +++ b/opencapif_sdk/capif_event_feature.py @@ -41,7 +41,7 @@ logging.basicConfig( class capif_invoker_event_feature(capif_invoker_connector): - def create_subscription(self): + def create_subscription(self, name): invoker_capif_details = self.invoker_capif_details @@ -79,39 +79,140 @@ class capif_invoker_event_feature(capif_invoker_connector): self.logger.info(f"Subscriptionid obtained: {identifier}") else: self.logger.error("The Location header is not available in the response") - + path = os.path.join(self.invoker_folder, "capif_subscriptions_id.json") - # Load or initialize the subscription list + # Load or initialize the subscription dictionary + # Load or initialize the subscription dictionary if os.path.exists(path): subscription = self._load_config_file(path) - # Ensure subscription is a list - if not isinstance(subscription, list): - raise TypeError(f"Expected 'subscription' to be a list, but got {type(subscription).__name__}") + if not isinstance(subscription, dict): + raise TypeError(f"Expected 'subscription' to be a dict, but got {type(subscription).__name__}") else: - subscription = [] + subscription = {} - # Find if the subscriberId already exists in the list - subscriber_entry = next((item for item in subscription if item.get("subscriberId") == subscriberId), None) + if not isinstance(subscriberId, (str, int)): + raise TypeError(f"Expected 'subscriberId' to be a string or integer, but got {type(subscriberId).__name__}") - if subscriber_entry is None: - # If subscriberId is not found, create a new entry - subscriber_entry = {"subscriberId": subscriberId, "events": []} - subscription.append(subscriber_entry) + # Convert events_description to a string if it isn't already + if not isinstance(name, str): + name = str(name) - # Add the event to the subscriber's events list - subscriber_entry["events"].append({self.events_description: identifier}) + # Update the subscription structure + subscription[str(subscriberId)] = { + f"{name}": identifier + } - # Save the updated list back to the file + # Save the updated dictionary back to the file self._create_or_update_file("capif_subscriptions_id", "json", subscription, "w") except Exception as e: self.logger.error("Unexpected error: %s", e) return None, {"error": f"Unexpected error: {e}"} - # def delete_subcription(self): + def delete_subscription(self, name): + invoker_capif_details = self.invoker_capif_details + + subscriberId = invoker_capif_details["api_invoker_id"] + + path = os.path.join(self.invoker_folder, "capif_subscriptions_id.json") + + if os.path.exists(path): + subscription = self._load_config_file(path) + if not isinstance(subscription, dict): + raise TypeError(f"Expected 'subscription' to be a dict, but got {type(subscription).__name__}") + + if subscriberId in subscription and name in subscription[subscriberId]: + identifier = subscription[subscriberId][name] + + # Attempt to delete the subscription from CAPIF + delete_path = self.capif_https_url + f"capif-events/v1/{subscriberId}/subscriptions/{identifier}" + + try: + response = requests.delete( + url=delete_path, + headers={"Content-Type": "application/json"}, + cert=(self.signed_key_crt_path, self.private_key_path), + verify=os.path.join(self.invoker_folder, "ca.crt") + ) + response.raise_for_status() + + # Remove the service entry from the subscription dictionary + del subscription[subscriberId][name] + + # If no more services exist for the subscriber, remove the subscriber entry + if not subscription[subscriberId]: + del subscription[subscriberId] + + # Save the updated dictionary back to the file + self._create_or_update_file("capif_subscriptions_id", "json", subscription, "w") + + self.logger.info(f"Successfully deleted subscription for service '{name}'") + + except Exception as e: + self.logger.error("Unexpected error: %s", e) + return None, {"error": f"Unexpected error: {e}"} + + else: + self.logger.warning(f"Service '{name}' not found for subscriber '{subscriberId}'") + return None, {"error": f"Service '{name}' not found for subscriber '{subscriberId}'"} + else: + self.logger.error("Subscription file not found at path: %s", path) + return None, {"error": "Subscription file not found"} + + def update_subcription(self, name): + invoker_capif_details = self.invoker_capif_details + + subscriberId = invoker_capif_details["api_invoker_id"] - # def modify_subcription(self): + path = os.path.join(self.invoker_folder, "capif_subscriptions_id.json") - # def patch_subcription(self): + payload = { + "events": self.events_description, + "eventFilters": self.events_filter, + "eventReq": {}, # TO IMPROVE !!! + "notificationDestination": f"{self.capif_callback_url}", + "requestTestNotification": True, + "websockNotifConfig": { + "websocketUri": f"{self.capif_callback_url}", + "requestWebsocketUri": True + }, + "supportedFeatures": f"{self.supported_features}" + } + if os.path.exists(path): + subscription = self._load_config_file(path) + if not isinstance(subscription, dict): + raise TypeError(f"Expected 'subscription' to be a dict, but got {type(subscription).__name__}") + + if subscriberId in subscription and name in subscription[subscriberId]: + identifier = subscription[subscriberId][name] + + # Attempt to delete the subscription from CAPIF + put_path = self.capif_https_url + f"capif-events/v1/{subscriberId}/subscriptions/{identifier}" + + try: + response = requests.put( + url=put_path, + json=payload, + headers={"Content-Type": "application/json"}, + cert=(self.signed_key_crt_path, self.private_key_path), + verify=os.path.join(self.invoker_folder, "ca.crt") + ) + response.raise_for_status() + + self.logger.info(f"Successfully updated subscription for service '{name}'") + + except Exception as e: + self.logger.error("Unexpected error: %s", e) + return None, {"error": f"Unexpected error: {e}"} + + else: + self.logger.warning(f"Service '{name}' not found for subscriber '{subscriberId}'") + return None, {"error": f"Service '{name}' not found for subscriber '{subscriberId}'"} + else: + self.logger.error("Subscription file not found at path: %s", path) + return None, {"error": "Subscription file not found"} + + def patch_subcription(self, name): + self.update_subcription(self, name) diff --git a/scripts/invoker_capif_event_subcription.py b/scripts/invoker_capif_event_subcription.py index d3ffec82038f8c4b12dec0e5e100d4d88afa25c0..d9dfd4c6d2fd672cbda2ec3c12a3625056f3b3f3 100644 --- a/scripts/invoker_capif_event_subcription.py +++ b/scripts/invoker_capif_event_subcription.py @@ -1,6 +1,7 @@ import utilities from opencapif_sdk import capif_invoker_connector, capif_invoker_event_feature + def showcase_capif_connector(): """ This method showcases how one can use the CAPIFConnector class. @@ -12,7 +13,11 @@ def showcase_capif_connector(): events = capif_invoker_event_feature(config_file=utilities.get_config_file()) - events.create_subscription() + events.create_subscription(name="Servicio_2") + + events.update_subcription(name="Servicio_2") + + events.delete_subscription(name="Servicio_2") print("COMPLETED")