Loading src/sunrise6g_opensdk/edgecloud/adapters/i2edge/client.py +44 −6 Original line number Diff line number Diff line Loading @@ -10,10 +10,12 @@ import json from copy import deepcopy from typing import Dict, List, Optional from uuid import NAMESPACE_DNS, UUID, uuid5 from requests import Response from sunrise6g_opensdk import logger from sunrise6g_opensdk.edgecloud.core import schemas as camara from sunrise6g_opensdk.edgecloud.core.edgecloud_interface import ( EdgeCloudManagementInterface, ) Loading @@ -30,6 +32,21 @@ from .common import ( log = logger.get_logger(__name__) # TODO: Workaround to avoid the SDK crash when ZoneId is not a valid UUID (e.g. Omega) def _ensure_valid_uuid(value: str) -> str: """ Return the original value if it's a valid UUID, or generate a deterministic UUIDv5 from the input string otherwise. """ try: UUID(value) return value except ValueError: generated = str(uuid5(NAMESPACE_DNS, value)) log.warning(f"Invalid UUID '{value}' – using generated UUIDv5: {generated}") return generated class EdgeApplicationManager(EdgeCloudManagementInterface): """ i2Edge Client Loading @@ -41,18 +58,37 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): self.content_type_gsma = "application/json" self.encoding_gsma = "utf-8" # -------------------------------------------------------------------- # CAMARA Edge Cloud Management Functions # -------------------------------------------------------------------- def get_edge_cloud_zones( self, region: Optional[str] = None, status: Optional[str] = None ) -> list[dict]: url = "{}/zones/list".format(self.base_url) ) -> list[camara.EdgeCloudZone]: url = f"{self.base_url}/zones/list" params = {} try: response = i2edge_get(url, params=params) i2edge_response = i2edge_get(url, params=params).json() log.info("Availability zones retrieved successfully") return response # Normalise to CAMARA format camara_response = [] for z in i2edge_response: edgeCloudZoneId = camara.EdgeCloudZoneId( _ensure_valid_uuid(z["zoneId"]) ) zone = camara.EdgeCloudZone( edgeCloudZoneId=edgeCloudZoneId, edgeCloudZoneName=camara.EdgeCloudZoneName(z["nodeName"]), edgeCloudProvider=camara.EdgeCloudProvider("i2edge"), edgeCloudRegion=camara.EdgeCloudRegion(z["geographyDetails"]), edgeCloudZoneStatus=camara.EdgeCloudZoneStatus.unknown, ) camara_response.append(zone) return camara_response except I2EdgeError as e: raise e log.error(f"Failed to retrieve edge cloud zones: {e}") raise # TODO: Delete it def get_edge_cloud_zones_details( self, zone_id: str, flavour_id: Optional[str] = None ) -> Dict: Loading Loading @@ -250,7 +286,9 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): except I2EdgeError as e: raise e # GSMA FM # -------------------------------------------------------------------- # EWBI GSMA OPG FUNCTIONS # -------------------------------------------------------------------- # FederationManagement Loading src/sunrise6g_opensdk/edgecloud/core/edgecloud_interface.py +5 −1 Original line number Diff line number Diff line Loading @@ -10,6 +10,10 @@ from abc import ABC, abstractmethod from typing import Dict, List, Optional from .schemas import ( EdgeCloudZone, ) class EdgeCloudManagementInterface(ABC): """ Loading Loading @@ -96,7 +100,7 @@ class EdgeCloudManagementInterface(ABC): @abstractmethod def get_edge_cloud_zones( self, region: Optional[str] = None, status: Optional[str] = None ) -> List[Dict]: ) -> List[EdgeCloudZone]: """ Retrieves a list of available Edge Cloud Zones. Loading Loading
src/sunrise6g_opensdk/edgecloud/adapters/i2edge/client.py +44 −6 Original line number Diff line number Diff line Loading @@ -10,10 +10,12 @@ import json from copy import deepcopy from typing import Dict, List, Optional from uuid import NAMESPACE_DNS, UUID, uuid5 from requests import Response from sunrise6g_opensdk import logger from sunrise6g_opensdk.edgecloud.core import schemas as camara from sunrise6g_opensdk.edgecloud.core.edgecloud_interface import ( EdgeCloudManagementInterface, ) Loading @@ -30,6 +32,21 @@ from .common import ( log = logger.get_logger(__name__) # TODO: Workaround to avoid the SDK crash when ZoneId is not a valid UUID (e.g. Omega) def _ensure_valid_uuid(value: str) -> str: """ Return the original value if it's a valid UUID, or generate a deterministic UUIDv5 from the input string otherwise. """ try: UUID(value) return value except ValueError: generated = str(uuid5(NAMESPACE_DNS, value)) log.warning(f"Invalid UUID '{value}' – using generated UUIDv5: {generated}") return generated class EdgeApplicationManager(EdgeCloudManagementInterface): """ i2Edge Client Loading @@ -41,18 +58,37 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): self.content_type_gsma = "application/json" self.encoding_gsma = "utf-8" # -------------------------------------------------------------------- # CAMARA Edge Cloud Management Functions # -------------------------------------------------------------------- def get_edge_cloud_zones( self, region: Optional[str] = None, status: Optional[str] = None ) -> list[dict]: url = "{}/zones/list".format(self.base_url) ) -> list[camara.EdgeCloudZone]: url = f"{self.base_url}/zones/list" params = {} try: response = i2edge_get(url, params=params) i2edge_response = i2edge_get(url, params=params).json() log.info("Availability zones retrieved successfully") return response # Normalise to CAMARA format camara_response = [] for z in i2edge_response: edgeCloudZoneId = camara.EdgeCloudZoneId( _ensure_valid_uuid(z["zoneId"]) ) zone = camara.EdgeCloudZone( edgeCloudZoneId=edgeCloudZoneId, edgeCloudZoneName=camara.EdgeCloudZoneName(z["nodeName"]), edgeCloudProvider=camara.EdgeCloudProvider("i2edge"), edgeCloudRegion=camara.EdgeCloudRegion(z["geographyDetails"]), edgeCloudZoneStatus=camara.EdgeCloudZoneStatus.unknown, ) camara_response.append(zone) return camara_response except I2EdgeError as e: raise e log.error(f"Failed to retrieve edge cloud zones: {e}") raise # TODO: Delete it def get_edge_cloud_zones_details( self, zone_id: str, flavour_id: Optional[str] = None ) -> Dict: Loading Loading @@ -250,7 +286,9 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): except I2EdgeError as e: raise e # GSMA FM # -------------------------------------------------------------------- # EWBI GSMA OPG FUNCTIONS # -------------------------------------------------------------------- # FederationManagement Loading
src/sunrise6g_opensdk/edgecloud/core/edgecloud_interface.py +5 −1 Original line number Diff line number Diff line Loading @@ -10,6 +10,10 @@ from abc import ABC, abstractmethod from typing import Dict, List, Optional from .schemas import ( EdgeCloudZone, ) class EdgeCloudManagementInterface(ABC): """ Loading Loading @@ -96,7 +100,7 @@ class EdgeCloudManagementInterface(ABC): @abstractmethod def get_edge_cloud_zones( self, region: Optional[str] = None, status: Optional[str] = None ) -> List[Dict]: ) -> List[EdgeCloudZone]: """ Retrieves a list of available Edge Cloud Zones. Loading