Loading src/sunrise6g_opensdk/edgecloud/adapters/i2edge/client.py +32 −3 Original line number Diff line number Diff line Loading @@ -248,6 +248,17 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): # GSMA FM # FederationManagement def get_edge_cloud_zones_gsma(self, federation_context_id: str) -> Dict: url = "{}/zones".format(self.base_url) params = {} try: response = i2edge_get(url, params=params) return response except I2EdgeError as e: raise e # AvailabilityZoneInfoSynchronization def get_edge_cloud_zone_details_gsma( Loading @@ -257,7 +268,6 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): params = {} try: response = i2edge_get(url, params=params) log.info("Zone metadata retrieved successfully") return response except I2EdgeError as e: raise e Loading Loading @@ -335,6 +345,11 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): except KeyError as e: raise I2EdgeError(f"Missing appId in GSMA payload: {e}") def patch_onboarded_app_gsma( self, federation_context_id: str, app_id: str, request_body: dict ) -> Dict: pass def delete_onboarded_app_gsma(self, federation_context_id: str, app_id: str): try: response = self.delete_onboarded_app(app_id) Loading @@ -359,7 +374,6 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): payload = schemas.AppDeploy( app_deploy_data=app_deploy_data, app_parameters={"namespace": "test"} ) print(payload) url = "{}/application_instance".format(self.base_url) response = i2edge_post(url, payload) return response Loading @@ -377,11 +391,26 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): url = "{}/application_instance/{}/{}".format( self.base_url, zone_id, app_instance_id ) response = i2edge_get(url) params = {} response = i2edge_get(url, params=params) return response except KeyError as e: raise I2EdgeError(f"Missing appId or zoneId in GSMA payload: {e}") def get_all_deployed_apps_gsma( self, federation_context_id: str, app_id: str, app_provider: str, ): try: url = "{}/application_instances".format(self.base_url) params = {} response = i2edge_get(url, params=params) return response except KeyError as e: raise I2EdgeError(f"Error retrieving apps: {e}") def undeploy_app_gsma( self, federation_context_id: str, Loading src/sunrise6g_opensdk/edgecloud/adapters/i2edge/common.py +17 −0 Original line number Diff line number Diff line Loading @@ -53,6 +53,23 @@ def i2edge_post(url: str, model_payload: BaseModel) -> dict: raise I2EdgeError(err_msg) def i2edge_put(url: str, model_payload: BaseModel) -> dict: headers = { "Content-Type": "application/json", "accept": "application/json", } json_payload = json.dumps(model_payload.model_dump(mode="json")) try: response = requests.put(url, data=json_payload, headers=headers) response.raise_for_status() return response except requests.exceptions.HTTPError as e: i2edge_err_msg = get_error_message_from(response) err_msg = "Failed to patch: {}. Detail: {}".format(i2edge_err_msg, e) log.error(err_msg) raise I2EdgeError(err_msg) def i2edge_post_multiform_data(url: str, model_payload: BaseModel) -> dict: headers = { "accept": "application/json", Loading src/sunrise6g_opensdk/edgecloud/core/edgecloud_interface.py +43 −0 Original line number Diff line number Diff line Loading @@ -121,6 +121,18 @@ class EdgeCloudManagementInterface(ABC): # --- GSMA-specific methods --- # FederationManagement @abstractmethod def get_edge_cloud_zones_gsma(self, federation_context_id: str) -> List: """ Retrieves details of Zones :param federation_context_id: Identifier of the federation context. :return: List. """ pass # AvailabilityZoneInfoSynchronization @abstractmethod Loading Loading @@ -204,6 +216,20 @@ class EdgeCloudManagementInterface(ABC): """ pass @abstractmethod def patch_onboarded_app_gsma( self, federation_context_id: str, app_id: str, request_body: dict ) -> Dict: """ Updates partner OP about changes in application compute resource requirements, QOS Profile, associated descriptor or change in associated components :param federation_context_id: Identifier of the federation context. :param app_id: Identifier of the application onboarded. :return: """ pass @abstractmethod def delete_onboarded_app_gsma(self, federation_context_id: str, app_id: str): """ Loading Loading @@ -249,6 +275,23 @@ class EdgeCloudManagementInterface(ABC): """ pass @abstractmethod def get_all_deployed_apps_gsma( self, federation_context_id: str, app_id: str, app_provider: str, ): """ Retrieves all application instance of partner OP :param federation_context_id: Identifier of the federation context. :param app_id: Identifier of the app. :param app_provider: App provider :return: """ pass @abstractmethod def undeploy_app_gsma( self, Loading Loading
src/sunrise6g_opensdk/edgecloud/adapters/i2edge/client.py +32 −3 Original line number Diff line number Diff line Loading @@ -248,6 +248,17 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): # GSMA FM # FederationManagement def get_edge_cloud_zones_gsma(self, federation_context_id: str) -> Dict: url = "{}/zones".format(self.base_url) params = {} try: response = i2edge_get(url, params=params) return response except I2EdgeError as e: raise e # AvailabilityZoneInfoSynchronization def get_edge_cloud_zone_details_gsma( Loading @@ -257,7 +268,6 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): params = {} try: response = i2edge_get(url, params=params) log.info("Zone metadata retrieved successfully") return response except I2EdgeError as e: raise e Loading Loading @@ -335,6 +345,11 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): except KeyError as e: raise I2EdgeError(f"Missing appId in GSMA payload: {e}") def patch_onboarded_app_gsma( self, federation_context_id: str, app_id: str, request_body: dict ) -> Dict: pass def delete_onboarded_app_gsma(self, federation_context_id: str, app_id: str): try: response = self.delete_onboarded_app(app_id) Loading @@ -359,7 +374,6 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): payload = schemas.AppDeploy( app_deploy_data=app_deploy_data, app_parameters={"namespace": "test"} ) print(payload) url = "{}/application_instance".format(self.base_url) response = i2edge_post(url, payload) return response Loading @@ -377,11 +391,26 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): url = "{}/application_instance/{}/{}".format( self.base_url, zone_id, app_instance_id ) response = i2edge_get(url) params = {} response = i2edge_get(url, params=params) return response except KeyError as e: raise I2EdgeError(f"Missing appId or zoneId in GSMA payload: {e}") def get_all_deployed_apps_gsma( self, federation_context_id: str, app_id: str, app_provider: str, ): try: url = "{}/application_instances".format(self.base_url) params = {} response = i2edge_get(url, params=params) return response except KeyError as e: raise I2EdgeError(f"Error retrieving apps: {e}") def undeploy_app_gsma( self, federation_context_id: str, Loading
src/sunrise6g_opensdk/edgecloud/adapters/i2edge/common.py +17 −0 Original line number Diff line number Diff line Loading @@ -53,6 +53,23 @@ def i2edge_post(url: str, model_payload: BaseModel) -> dict: raise I2EdgeError(err_msg) def i2edge_put(url: str, model_payload: BaseModel) -> dict: headers = { "Content-Type": "application/json", "accept": "application/json", } json_payload = json.dumps(model_payload.model_dump(mode="json")) try: response = requests.put(url, data=json_payload, headers=headers) response.raise_for_status() return response except requests.exceptions.HTTPError as e: i2edge_err_msg = get_error_message_from(response) err_msg = "Failed to patch: {}. Detail: {}".format(i2edge_err_msg, e) log.error(err_msg) raise I2EdgeError(err_msg) def i2edge_post_multiform_data(url: str, model_payload: BaseModel) -> dict: headers = { "accept": "application/json", Loading
src/sunrise6g_opensdk/edgecloud/core/edgecloud_interface.py +43 −0 Original line number Diff line number Diff line Loading @@ -121,6 +121,18 @@ class EdgeCloudManagementInterface(ABC): # --- GSMA-specific methods --- # FederationManagement @abstractmethod def get_edge_cloud_zones_gsma(self, federation_context_id: str) -> List: """ Retrieves details of Zones :param federation_context_id: Identifier of the federation context. :return: List. """ pass # AvailabilityZoneInfoSynchronization @abstractmethod Loading Loading @@ -204,6 +216,20 @@ class EdgeCloudManagementInterface(ABC): """ pass @abstractmethod def patch_onboarded_app_gsma( self, federation_context_id: str, app_id: str, request_body: dict ) -> Dict: """ Updates partner OP about changes in application compute resource requirements, QOS Profile, associated descriptor or change in associated components :param federation_context_id: Identifier of the federation context. :param app_id: Identifier of the application onboarded. :return: """ pass @abstractmethod def delete_onboarded_app_gsma(self, federation_context_id: str, app_id: str): """ Loading Loading @@ -249,6 +275,23 @@ class EdgeCloudManagementInterface(ABC): """ pass @abstractmethod def get_all_deployed_apps_gsma( self, federation_context_id: str, app_id: str, app_provider: str, ): """ Retrieves all application instance of partner OP :param federation_context_id: Identifier of the federation context. :param app_id: Identifier of the app. :param app_provider: App provider :return: """ pass @abstractmethod def undeploy_app_gsma( self, Loading