Loading src/sunrise6g_opensdk/edgecloud/adapters/i2edge/client.py +19 −19 Original line number Diff line number Diff line Loading @@ -19,8 +19,8 @@ from sunrise6g_opensdk.edgecloud.core.edgecloud_interface import ( EdgeCloudManagementInterface, ) from sunrise6g_opensdk.edgecloud.core.utils import ( _build_custom_http_response, _ensure_valid_uuid, build_custom_http_response, ensure_valid_uuid, ) from ...adapters.i2edge import schemas as i2edge_schemas Loading Loading @@ -69,7 +69,7 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): zone = camara_schemas.EdgeCloudZone( # edgeCloudZoneId = camara_schemas.EdgeCloudZoneId(z["zoneId"]), edgeCloudZoneId=camara_schemas.EdgeCloudZoneId( _ensure_valid_uuid(z["zoneId"]) ensure_valid_uuid(z["zoneId"]) ), edgeCloudZoneName=camara_schemas.EdgeCloudZoneName(z["nodeName"]), edgeCloudProvider=camara_schemas.EdgeCloudProvider("i2edge"), Loading @@ -80,7 +80,7 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): ) camara_response.append(zone) # Wrap into a Response object return _build_custom_http_response( return build_custom_http_response( status_code=response.status_code, content=[zone.model_dump(mode="json") for zone in camara_response], headers={"Content-Type": "application/json"}, Loading Loading @@ -207,7 +207,7 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): "message": "Application onboarded successfully", } log.info("App onboarded successfully") return _build_custom_http_response( return build_custom_http_response( status_code=i2edge_response.status_code, content=camara_payload, headers={"Content-Type": "application/json"}, Loading Loading @@ -355,7 +355,7 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): "geographyDetails": item.get("geographyDetails"), } response_list.append(content) return self._build_custom_http_response( return self.build_custom_http_response( status_code=200, content=response_list, headers={"Content-Type": self.content_type_gsma}, Loading Loading @@ -391,7 +391,7 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): ), } response_list.append(content) return self._build_custom_http_response( return self.build_custom_http_response( status_code=200, content=response_list, headers={"Content-Type": self.content_type_gsma}, Loading @@ -414,7 +414,7 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): response = i2edge_get(url, params=params) if response.status_code == 200: content = {"acceptedZoneResourceInfo": response.json()} return self._build_custom_http_response( return self.build_custom_http_response( status_code=200, content=content, headers={"Content-Type": self.content_type_gsma}, Loading Loading @@ -449,7 +449,7 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): "zoneServiceLevelObjsInfo" ), } return self._build_custom_http_response( return self.build_custom_http_response( status_code=200, content=content, headers={"Content-Type": self.content_type_gsma}, Loading Loading @@ -484,7 +484,7 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): response = self._create_artefact(**transformed) if response.status_code == 201: return self._build_custom_http_response( return self.build_custom_http_response( status_code=200, content={"response": "Artefact uploaded successfully"}, headers={"Content-Type": self.content_type_gsma}, Loading Loading @@ -520,7 +520,7 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): "token": response_json.get("repo_token"), }, } return self._build_custom_http_response( return self.build_custom_http_response( status_code=200, content=content, headers={"Content-Type": self.content_type_gsma}, Loading @@ -536,7 +536,7 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): try: response = self._delete_artefact(artefact_id) if response.status_code == 200: return self._build_custom_http_response( return self.build_custom_http_response( status_code=200, content='{"response": "Artefact deletion successful"}', headers={"Content-Type": self.content_type_gsma}, Loading @@ -562,7 +562,7 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): url = "{}/application/onboarding".format(self.base_url) response = i2edge_post(url, payload) if response.status_code == 200: return self._build_custom_http_response( return self.build_custom_http_response( status_code=200, content={"response": "Application onboarded successfully"}, headers={"Content-Type": self.content_type_gsma}, Loading @@ -588,7 +588,7 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): "appQoSProfile": profile_data.get("appQoSProfile"), "appComponentSpecs": profile_data.get("appComponentSpecs"), } return self._build_custom_http_response( return self.build_custom_http_response( status_code=200, content=content, headers={"Content-Type": self.content_type_gsma}, Loading @@ -609,7 +609,7 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): try: response = self.delete_onboarded_app(app_id) if response.status_code == 200: return self._build_custom_http_response( return self.build_custom_http_response( status_code=200, content={"response": "App deletion successful"}, headers={"Content-Type": self.content_type_gsma}, Loading Loading @@ -647,7 +647,7 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): "zoneId": response_json.get("zoneID"), "appInstIdentifier": response_json.get("app_instance_id"), } return self._build_custom_http_response( return self.build_custom_http_response( status_code=202, content=content, headers={"Content-Type": self.content_type_gsma}, Loading Loading @@ -678,7 +678,7 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): "appInstanceState": response_json.get("appInstanceState"), "accesspointInfo": response_json.get("accesspointInfo"), } return self._build_custom_http_response( return self.build_custom_http_response( status_code=200, content=content, headers={"Content-Type": self.content_type_gsma}, Loading Loading @@ -718,7 +718,7 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): } ] response_list.append(content) return self._build_custom_http_response( return self.build_custom_http_response( status_code=200, content=response_list, headers={"Content-Type": self.content_type_gsma}, Loading @@ -741,7 +741,7 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): url = "{}/application_instance".format(self.base_url) response = i2edge_delete(url, app_instance_id) if response.status_code == 200: return self._build_custom_http_response( return self.build_custom_http_response( status_code=200, content={ "response": "Application instance termination request accepted" Loading src/sunrise6g_opensdk/edgecloud/core/utils.py +2 −2 Original line number Diff line number Diff line Loading @@ -16,7 +16,7 @@ from sunrise6g_opensdk import logger log = logger.get_logger(__name__) def _ensure_valid_uuid(value: str) -> str: def ensure_valid_uuid(value: str) -> str: """ Return the original value if it's a valid UUID, or generate a deterministic UUIDv5 from the input string otherwise. Loading @@ -32,7 +32,7 @@ def _ensure_valid_uuid(value: str) -> str: return generated def _build_custom_http_response( def build_custom_http_response( status_code: int, content: str | bytes | dict | list, headers: dict = None, Loading Loading
src/sunrise6g_opensdk/edgecloud/adapters/i2edge/client.py +19 −19 Original line number Diff line number Diff line Loading @@ -19,8 +19,8 @@ from sunrise6g_opensdk.edgecloud.core.edgecloud_interface import ( EdgeCloudManagementInterface, ) from sunrise6g_opensdk.edgecloud.core.utils import ( _build_custom_http_response, _ensure_valid_uuid, build_custom_http_response, ensure_valid_uuid, ) from ...adapters.i2edge import schemas as i2edge_schemas Loading Loading @@ -69,7 +69,7 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): zone = camara_schemas.EdgeCloudZone( # edgeCloudZoneId = camara_schemas.EdgeCloudZoneId(z["zoneId"]), edgeCloudZoneId=camara_schemas.EdgeCloudZoneId( _ensure_valid_uuid(z["zoneId"]) ensure_valid_uuid(z["zoneId"]) ), edgeCloudZoneName=camara_schemas.EdgeCloudZoneName(z["nodeName"]), edgeCloudProvider=camara_schemas.EdgeCloudProvider("i2edge"), Loading @@ -80,7 +80,7 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): ) camara_response.append(zone) # Wrap into a Response object return _build_custom_http_response( return build_custom_http_response( status_code=response.status_code, content=[zone.model_dump(mode="json") for zone in camara_response], headers={"Content-Type": "application/json"}, Loading Loading @@ -207,7 +207,7 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): "message": "Application onboarded successfully", } log.info("App onboarded successfully") return _build_custom_http_response( return build_custom_http_response( status_code=i2edge_response.status_code, content=camara_payload, headers={"Content-Type": "application/json"}, Loading Loading @@ -355,7 +355,7 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): "geographyDetails": item.get("geographyDetails"), } response_list.append(content) return self._build_custom_http_response( return self.build_custom_http_response( status_code=200, content=response_list, headers={"Content-Type": self.content_type_gsma}, Loading Loading @@ -391,7 +391,7 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): ), } response_list.append(content) return self._build_custom_http_response( return self.build_custom_http_response( status_code=200, content=response_list, headers={"Content-Type": self.content_type_gsma}, Loading @@ -414,7 +414,7 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): response = i2edge_get(url, params=params) if response.status_code == 200: content = {"acceptedZoneResourceInfo": response.json()} return self._build_custom_http_response( return self.build_custom_http_response( status_code=200, content=content, headers={"Content-Type": self.content_type_gsma}, Loading Loading @@ -449,7 +449,7 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): "zoneServiceLevelObjsInfo" ), } return self._build_custom_http_response( return self.build_custom_http_response( status_code=200, content=content, headers={"Content-Type": self.content_type_gsma}, Loading Loading @@ -484,7 +484,7 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): response = self._create_artefact(**transformed) if response.status_code == 201: return self._build_custom_http_response( return self.build_custom_http_response( status_code=200, content={"response": "Artefact uploaded successfully"}, headers={"Content-Type": self.content_type_gsma}, Loading Loading @@ -520,7 +520,7 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): "token": response_json.get("repo_token"), }, } return self._build_custom_http_response( return self.build_custom_http_response( status_code=200, content=content, headers={"Content-Type": self.content_type_gsma}, Loading @@ -536,7 +536,7 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): try: response = self._delete_artefact(artefact_id) if response.status_code == 200: return self._build_custom_http_response( return self.build_custom_http_response( status_code=200, content='{"response": "Artefact deletion successful"}', headers={"Content-Type": self.content_type_gsma}, Loading @@ -562,7 +562,7 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): url = "{}/application/onboarding".format(self.base_url) response = i2edge_post(url, payload) if response.status_code == 200: return self._build_custom_http_response( return self.build_custom_http_response( status_code=200, content={"response": "Application onboarded successfully"}, headers={"Content-Type": self.content_type_gsma}, Loading @@ -588,7 +588,7 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): "appQoSProfile": profile_data.get("appQoSProfile"), "appComponentSpecs": profile_data.get("appComponentSpecs"), } return self._build_custom_http_response( return self.build_custom_http_response( status_code=200, content=content, headers={"Content-Type": self.content_type_gsma}, Loading @@ -609,7 +609,7 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): try: response = self.delete_onboarded_app(app_id) if response.status_code == 200: return self._build_custom_http_response( return self.build_custom_http_response( status_code=200, content={"response": "App deletion successful"}, headers={"Content-Type": self.content_type_gsma}, Loading Loading @@ -647,7 +647,7 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): "zoneId": response_json.get("zoneID"), "appInstIdentifier": response_json.get("app_instance_id"), } return self._build_custom_http_response( return self.build_custom_http_response( status_code=202, content=content, headers={"Content-Type": self.content_type_gsma}, Loading Loading @@ -678,7 +678,7 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): "appInstanceState": response_json.get("appInstanceState"), "accesspointInfo": response_json.get("accesspointInfo"), } return self._build_custom_http_response( return self.build_custom_http_response( status_code=200, content=content, headers={"Content-Type": self.content_type_gsma}, Loading Loading @@ -718,7 +718,7 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): } ] response_list.append(content) return self._build_custom_http_response( return self.build_custom_http_response( status_code=200, content=response_list, headers={"Content-Type": self.content_type_gsma}, Loading @@ -741,7 +741,7 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): url = "{}/application_instance".format(self.base_url) response = i2edge_delete(url, app_instance_id) if response.status_code == 200: return self._build_custom_http_response( return self.build_custom_http_response( status_code=200, content={ "response": "Application instance termination request accepted" Loading
src/sunrise6g_opensdk/edgecloud/core/utils.py +2 −2 Original line number Diff line number Diff line Loading @@ -16,7 +16,7 @@ from sunrise6g_opensdk import logger log = logger.get_logger(__name__) def _ensure_valid_uuid(value: str) -> str: def ensure_valid_uuid(value: str) -> str: """ Return the original value if it's a valid UUID, or generate a deterministic UUIDv5 from the input string otherwise. Loading @@ -32,7 +32,7 @@ def _ensure_valid_uuid(value: str) -> str: return generated def _build_custom_http_response( def build_custom_http_response( status_code: int, content: str | bytes | dict | list, headers: dict = None, Loading