Commit a2327b8a authored by Adrian Pino's avatar Adrian Pino
Browse files

Make get edge cloud zones + onboard app CAMARA compliant. Update helper

Rename _build_custom_gsma_response to _build_custom_http_response
parent 291d98b8
Loading
Loading
Loading
Loading
+152 −76
Original line number Diff line number Diff line
@@ -12,15 +12,16 @@ from copy import deepcopy
from typing import Dict, List, Optional
from uuid import NAMESPACE_DNS, UUID, uuid5

from pydantic import ValidationError
from requests import Response

from sunrise6g_opensdk import logger
from sunrise6g_opensdk.edgecloud.core import schemas as camara
from sunrise6g_opensdk.edgecloud.core import schemas as camara_schemas
from sunrise6g_opensdk.edgecloud.core.edgecloud_interface import (
    EdgeCloudManagementInterface,
)

from ...adapters.i2edge import schemas
from ...adapters.i2edge import schemas as i2edge_schemas
from .common import (
    I2EdgeError,
    i2edge_delete,
@@ -32,7 +33,8 @@ from .common import (
log = logger.get_logger(__name__)


# TODO: Workaround to avoid the SDK crash when ZoneId is not a valid UUID (e.g. Omega)
# TODO: Check if this should be deleted
# Workaround function to avoid the SDK crash when ZoneId is not a valid UUID (e.g. Omega)
def _ensure_valid_uuid(value: str) -> str:
    """
    Return the original value if it's a valid UUID,
@@ -43,10 +45,37 @@ def _ensure_valid_uuid(value: str) -> str:
        return value
    except ValueError:
        generated = str(uuid5(NAMESPACE_DNS, value))
        log.warning(f"Invalid UUID '{value}' – using generated UUIDv5: {generated}")
        log.warning(
            f"[WARNING] Invalid UUID '{value}' – using generated UUIDv5: {generated}"
        )
    return generated


# TODO: Move this to common utils file
def _build_custom_http_response(
    status_code: int,
    content: str | bytes | dict | list,
    headers: dict = None,
    encoding: str = None,
    url: str = None,
    request=None,
) -> Response:
    response = Response()
    response.status_code = status_code
    if isinstance(content, (dict, list)):
        content = json.dumps(content)
    response._content = (
        content.encode(encoding or "utf-8") if isinstance(content, str) else content
    )
    response.headers.update(headers or {})
    response.encoding = encoding or "utf-8"
    if url:
        response.url = url
    if request:
        response.request = request
    return response


class EdgeApplicationManager(EdgeCloudManagementInterface):
    """
    i2Edge Client
@@ -61,33 +90,54 @@ class EdgeApplicationManager(EdgeCloudManagementInterface):
    # --------------------------------------------------------------------
    # CAMARA Edge Cloud Management Functions
    # --------------------------------------------------------------------

    # Edge Cloud

    def get_edge_cloud_zones(
        self, region: Optional[str] = None, status: Optional[str] = None
    ) -> list[camara.EdgeCloudZone]:
    ) -> Response:
        url = f"{self.base_url}/zones/list"
        params = {}

        try:
            i2edge_response = i2edge_get(url, params=params).json()
            response = i2edge_get(url, params=params)
            response.raise_for_status()
            i2edge_response = response.json()
            log.info("Availability zones retrieved successfully")
            # Normalise to CAMARA format
            camara_response = []
            for z in i2edge_response:
                edgeCloudZoneId = camara.EdgeCloudZoneId(
                zone = camara_schemas.EdgeCloudZone(
                    # edgeCloudZoneId = camara_schemas.EdgeCloudZoneId(z["zoneId"]),
                    edgeCloudZoneId=camara_schemas.EdgeCloudZoneId(
                        _ensure_valid_uuid(z["zoneId"])
                )
                zone = camara.EdgeCloudZone(
                    edgeCloudZoneId=edgeCloudZoneId,
                    edgeCloudZoneName=camara.EdgeCloudZoneName(z["nodeName"]),
                    edgeCloudProvider=camara.EdgeCloudProvider("i2edge"),
                    edgeCloudRegion=camara.EdgeCloudRegion(z["geographyDetails"]),
                    edgeCloudZoneStatus=camara.EdgeCloudZoneStatus.unknown,
                    ),
                    edgeCloudZoneName=camara_schemas.EdgeCloudZoneName(z["nodeName"]),
                    edgeCloudProvider=camara_schemas.EdgeCloudProvider("i2edge"),
                    edgeCloudRegion=camara_schemas.EdgeCloudRegion(
                        z["geographyDetails"]
                    ),
                    edgeCloudZoneStatus=camara_schemas.EdgeCloudZoneStatus.unknown,
                )
                camara_response.append(zone)
            return camara_response
            # Wrap into a Response object
            return _build_custom_http_response(
                status_code=response.status_code,
                content=[zone.model_dump(mode="json") for zone in camara_response],
                headers={"Content-Type": "application/json"},
                encoding=response.encoding,
                url=response.url,
                request=response.request,
            )
        except KeyError as e:
            log.error(f"Missing required CAMARA field in app manifest: {e}")
            raise ValueError(f"Invalid CAMARA manifest – missing field: {e}")
        except I2EdgeError as e:
            log.error(f"Failed to retrieve edge cloud zones: {e}")
            raise

    # Artefact Management (Not included in CAMARA atm)

    def create_artefact(
        self,
        artefact_id: str,
@@ -99,9 +149,9 @@ class EdgeApplicationManager(EdgeCloudManagementInterface):
        token: Optional[str] = None,
        user_name: Optional[str] = None,
    ):
        repo_type = schemas.RepoType(repo_type)
        repo_type = i2edge_schemas.RepoType(repo_type)
        url = "{}/artefact".format(self.base_url)
        payload = schemas.ArtefactOnboarding(
        payload = i2edge_schemas.ArtefactOnboarding(
            artefact_id=artefact_id,
            name=artefact_name,
            repo_password=password,
@@ -145,28 +195,80 @@ class EdgeApplicationManager(EdgeCloudManagementInterface):
        except I2EdgeError as e:
            raise e

    def onboard_app(self, app_manifest: Dict) -> Dict:
    # Application

    def onboard_app(self, app_manifest: Dict) -> Response:
        """
        Onboards an application using a CAMARA-compliant manifest.
        Translates the manifest to the i2Edge format and returns a CAMARA-compliant response.

        :param app_manifest: CAMARA-compliant application manifest
        :return: Response with status code, headers, and CAMARA-normalised payload
        """
        try:
            # Validate CAMARA input
            camara_schemas.AppManifest(**app_manifest)

            # Extract relevant fields from CAMARA manifest
            app_id = app_manifest["appId"]
            app_name = app_manifest["name"]
            app_version = app_manifest["version"]
            app_provider = app_manifest["appProvider"]

            # Map CAMARA to i2Edge
            artefact_id = app_id
            app_component_spec = i2edge_schemas.AppComponentSpec(artefactId=artefact_id)
            app_metadata = i2edge_schemas.AppMetaData(
                appName=app_name, appProviderId=app_provider, version=app_version
            )

            app_component_spec = schemas.AppComponentSpec(artefactId=artefact_id)
            data = schemas.ApplicationOnboardingData(
                app_id=app_id, appComponentSpecs=[app_component_spec]
            onboarding_data = i2edge_schemas.ApplicationOnboardingData(
                app_id=app_id,
                appProviderId=app_provider,
                appComponentSpecs=[app_component_spec],
                appMetaData=app_metadata,
            )
            payload = schemas.ApplicationOnboardingRequest(profile_data=data)
            url = "{}/application/onboarding".format(self.base_url)
            response = i2edge_post(url, payload)
            return response

            i2edge_payload = i2edge_schemas.ApplicationOnboardingRequest(
                profile_data=onboarding_data
            )

            # Call i2Edge API
            i2edge_response = i2edge_post(
                f"{self.base_url}/application/onboarding",
                model_payload=i2edge_payload.model_dump(
                    mode="json", exclude_defaults=True
                ),
            )
            i2edge_response.raise_for_status()

            # Build CAMARA-compliant response
            camara_payload = {
                "appId": app_id,
                "message": "Application onboarded successfully",
            }
            log.info("App onboarded successfully")
            return _build_custom_http_response(
                status_code=i2edge_response.status_code,
                content=camara_payload,
                headers={"Content-Type": "application/json"},
                encoding="utf-8",
                url=i2edge_response.url,
                request=i2edge_response.request,
            )

        except ValidationError as e:
            log.error(f"Invalid CAMARA manifest: {e}")
            raise ValueError(f"Invalid CAMARA manifest: {e}")
        except I2EdgeError as e:
            raise e
        except KeyError as e:
            raise I2EdgeError("Missing required field in app_manifest: {}".format(e))
            log.error(f"Failed to onboard app to i2Edge: {e}")
            raise

    def delete_onboarded_app(self, app_id: str) -> None:
        url = "{}/application/onboarding".format(self.base_url)
        try:
            response = i2edge_delete(url, app_id)
            log.info("App onboarded deleted successfully")
            return response
        except I2EdgeError as e:
            raise e
@@ -203,14 +305,14 @@ class EdgeApplicationManager(EdgeCloudManagementInterface):
        zone_id = zone_info["edgeCloudZoneId"]
        # TODO: atm the flavour id is specified as an input parameter
        # flavourId = self._select_best_flavour_for_app(zone_id=zone_id)
        app_deploy_data = schemas.AppDeployData(
        app_deploy_data = i2edge_schemas.AppDeployData(
            appId=appId,
            appProviderId=appProviderId,
            appVersion=appVersion,
            zoneInfo=schemas.ZoneInfo(flavourId=self.flavour_id, zoneId=zone_id),
            zoneInfo=i2edge_schemas.ZoneInfo(flavourId=self.flavour_id, zoneId=zone_id),
        )
        url = "{}/app/".format(self.base_url)
        payload = schemas.AppDeploy(app_deploy_data=app_deploy_data)
        payload = i2edge_schemas.AppDeploy(app_deploy_data=app_deploy_data)
        try:
            response = i2edge_post(url, payload)
            log.info("App deployed successfully")
@@ -294,7 +396,7 @@ class EdgeApplicationManager(EdgeCloudManagementInterface):
                        "geographyDetails": item.get("geographyDetails"),
                    }
                    response_list.append(content)
                return self._build_custom_gsma_response(
                return self._build_custom_http_response(
                    status_code=200,
                    content=response_list,
                    headers={"Content-Type": self.content_type_gsma},
@@ -330,7 +432,7 @@ class EdgeApplicationManager(EdgeCloudManagementInterface):
                        ),
                    }
                    response_list.append(content)
                return self._build_custom_gsma_response(
                return self._build_custom_http_response(
                    status_code=200,
                    content=response_list,
                    headers={"Content-Type": self.content_type_gsma},
@@ -353,7 +455,7 @@ class EdgeApplicationManager(EdgeCloudManagementInterface):
            response = i2edge_get(url, params=params)
            if response.status_code == 200:
                content = {"acceptedZoneResourceInfo": response.json()}
                return self._build_custom_gsma_response(
                return self._build_custom_http_response(
                    status_code=200,
                    content=content,
                    headers={"Content-Type": self.content_type_gsma},
@@ -388,7 +490,7 @@ class EdgeApplicationManager(EdgeCloudManagementInterface):
                        "zoneServiceLevelObjsInfo"
                    ),
                }
                return self._build_custom_gsma_response(
                return self._build_custom_http_response(
                    status_code=200,
                    content=content,
                    headers={"Content-Type": self.content_type_gsma},
@@ -423,7 +525,7 @@ class EdgeApplicationManager(EdgeCloudManagementInterface):

            response = self._create_artefact(**transformed)
            if response.status_code == 201:
                return self._build_custom_gsma_response(
                return self._build_custom_http_response(
                    status_code=200,
                    content={"response": "Artefact uploaded successfully"},
                    headers={"Content-Type": self.content_type_gsma},
@@ -459,7 +561,7 @@ class EdgeApplicationManager(EdgeCloudManagementInterface):
                        "token": response_json.get("repo_token"),
                    },
                }
                return self._build_custom_gsma_response(
                return self._build_custom_http_response(
                    status_code=200,
                    content=content,
                    headers={"Content-Type": self.content_type_gsma},
@@ -475,7 +577,7 @@ class EdgeApplicationManager(EdgeCloudManagementInterface):
        try:
            response = self._delete_artefact(artefact_id)
            if response.status_code == 200:
                return self._build_custom_gsma_response(
                return self._build_custom_http_response(
                    status_code=200,
                    content='{"response": "Artefact deletion successful"}',
                    headers={"Content-Type": self.content_type_gsma},
@@ -497,11 +599,11 @@ class EdgeApplicationManager(EdgeCloudManagementInterface):
            body["app_id"] = body.pop("appId")
            body.pop("edgeAppFQDN", None)
            data = body
            payload = schemas.ApplicationOnboardingRequest(profile_data=data)
            payload = i2edge_schemas.ApplicationOnboardingRequest(profile_data=data)
            url = "{}/application/onboarding".format(self.base_url)
            response = i2edge_post(url, payload)
            if response.status_code == 200:
                return self._build_custom_gsma_response(
                return self._build_custom_http_response(
                    status_code=200,
                    content={"response": "Application onboarded successfully"},
                    headers={"Content-Type": self.content_type_gsma},
@@ -527,7 +629,7 @@ class EdgeApplicationManager(EdgeCloudManagementInterface):
                    "appQoSProfile": profile_data.get("appQoSProfile"),
                    "appComponentSpecs": profile_data.get("appComponentSpecs"),
                }
                return self._build_custom_gsma_response(
                return self._build_custom_http_response(
                    status_code=200,
                    content=content,
                    headers={"Content-Type": self.content_type_gsma},
@@ -548,7 +650,7 @@ class EdgeApplicationManager(EdgeCloudManagementInterface):
        try:
            response = self.delete_onboarded_app(app_id)
            if response.status_code == 200:
                return self._build_custom_gsma_response(
                return self._build_custom_http_response(
                    status_code=200,
                    content={"response": "App deletion successful"},
                    headers={"Content-Type": self.content_type_gsma},
@@ -569,13 +671,13 @@ class EdgeApplicationManager(EdgeCloudManagementInterface):
        try:
            zone_id = body.get("zoneInfo").get("zoneId")
            flavour_id = body.get("zoneInfo").get("flavourId")
            app_deploy_data = schemas.AppDeployData(
            app_deploy_data = i2edge_schemas.AppDeployData(
                appId=body.get("appId"),
                appProviderId=body.get("appProviderId"),
                appVersion=body.get("appVersion"),
                zoneInfo=schemas.ZoneInfo(flavourId=flavour_id, zoneId=zone_id),
                zoneInfo=i2edge_schemas.ZoneInfo(flavourId=flavour_id, zoneId=zone_id),
            )
            payload = schemas.AppDeploy(
            payload = i2edge_schemas.AppDeploy(
                app_deploy_data=app_deploy_data, app_parameters={"namespace": "test"}
            )
            url = "{}/application_instance".format(self.base_url)
@@ -586,7 +688,7 @@ class EdgeApplicationManager(EdgeCloudManagementInterface):
                    "zoneId": response_json.get("zoneID"),
                    "appInstIdentifier": response_json.get("app_instance_id"),
                }
                return self._build_custom_gsma_response(
                return self._build_custom_http_response(
                    status_code=202,
                    content=content,
                    headers={"Content-Type": self.content_type_gsma},
@@ -617,7 +719,7 @@ class EdgeApplicationManager(EdgeCloudManagementInterface):
                    "appInstanceState": response_json.get("appInstanceState"),
                    "accesspointInfo": response_json.get("accesspointInfo"),
                }
                return self._build_custom_gsma_response(
                return self._build_custom_http_response(
                    status_code=200,
                    content=content,
                    headers={"Content-Type": self.content_type_gsma},
@@ -657,7 +759,7 @@ class EdgeApplicationManager(EdgeCloudManagementInterface):
                        }
                    ]
                    response_list.append(content)
                return self._build_custom_gsma_response(
                return self._build_custom_http_response(
                    status_code=200,
                    content=response_list,
                    headers={"Content-Type": self.content_type_gsma},
@@ -680,7 +782,7 @@ class EdgeApplicationManager(EdgeCloudManagementInterface):
            url = "{}/application_instance".format(self.base_url)
            response = i2edge_delete(url, app_instance_id)
            if response.status_code == 200:
                return self._build_custom_gsma_response(
                return self._build_custom_http_response(
                    status_code=200,
                    content={
                        "response": "Application instance termination request accepted"
@@ -693,29 +795,3 @@ class EdgeApplicationManager(EdgeCloudManagementInterface):
            return response
        except KeyError as e:
            raise I2EdgeError(f"Missing appInstanceId in GSMA payload: {e}")

    # GSMA Support methods

    def _build_custom_gsma_response(
        self,
        status_code: int,
        content: str | bytes | dict | list,
        headers: dict = None,
        encoding: str = None,
        url: str = None,
        request=None,
    ) -> Response:
        response = Response()
        response.status_code = status_code
        if isinstance(content, (dict, list)):
            content = json.dumps(content)
        response._content = (
            content.encode(encoding or "utf-8") if isinstance(content, str) else content
        )
        response.headers.update(headers or {})
        response.encoding = encoding or "utf-8"
        if url:
            response.url = url
        if request:
            response.request = request
        return response
+3 −4
Original line number Diff line number Diff line
@@ -63,9 +63,9 @@ class ArtefactOnboarding(BaseModel):

class AppComponentSpec(BaseModel):
    artefactId: str
    componentName: str = Field(default="default_component")
    serviceNameEW: str = Field(default="default_ew_service")
    serviceNameNB: str = Field(default="default_nb_service")
    componentName: Optional[str] = Field(default="default_component")
    serviceNameEW: Optional[str] = Field(default="default_ew_service")
    serviceNameNB: Optional[str] = Field(default="default_nb_service")


class AppMetaData(BaseModel):
@@ -91,7 +91,6 @@ class ApplicationOnboardingData(BaseModel):
    appMetaData: AppMetaData = Field(default_factory=AppMetaData)
    appProviderId: str = Field(default="default_provider")
    appQoSProfile: AppQoSProfile = Field(default_factory=AppQoSProfile)
    appStatusCallbackLink: Optional[str] = None


class ApplicationOnboardingRequest(BaseModel):
+15 −17
Original line number Diff line number Diff line
@@ -10,9 +10,7 @@
from abc import ABC, abstractmethod
from typing import Dict, List, Optional

from .schemas import (
    EdgeCloudZone,
)
from requests import Response


class EdgeCloudManagementInterface(ABC):
@@ -21,7 +19,20 @@ class EdgeCloudManagementInterface(ABC):
    """

    @abstractmethod
    def onboard_app(self, app_manifest: Dict) -> Dict:
    def get_edge_cloud_zones(
        self, region: Optional[str] = None, status: Optional[str] = None
    ) -> Response:
        """
        Retrieves a list of available Edge Cloud Zones.

        :param region: Filter by geographical region.
        :param status: Filter by status (active, inactive, unknown).
        :return: List of Edge Cloud Zones.
        """
        pass

    @abstractmethod
    def onboard_app(self, app_manifest: Dict) -> Response:
        """
        Onboards an app, submitting application metadata
        to the Edge Cloud Provider.
@@ -97,19 +108,6 @@ class EdgeCloudManagementInterface(ABC):
        """
        pass

    @abstractmethod
    def get_edge_cloud_zones(
        self, region: Optional[str] = None, status: Optional[str] = None
    ) -> List[EdgeCloudZone]:
        """
        Retrieves a list of available Edge Cloud Zones.

        :param region: Filter by geographical region.
        :param status: Filter by status (active, inactive, unknown).
        :return: List of Edge Cloud Zones.
        """
        pass

    # --- GSMA-specific methods ---

    # FederationManagement
+6 −6
Original line number Diff line number Diff line
CONFIG = {
    "i2edge": {
        "ZONE_ID": "Omega",
        "ARTEFACT_ID": "i2edgechart-id-2",
        "ARTEFACT_ID": "9c9143f0-f44f-49df-939e-1e8b891ba8f5",
        "ARTEFACT_NAME": "i2edgechart",
        "REPO_NAME": "github-cesar",
        "REPO_TYPE": "PUBLICREPO",
        "REPO_URL": "https://cesarcajas.github.io/helm-charts-examples/",
        "APP_ONBOARD_MANIFEST": {
            "appId": "i2edgechart-id-2",
            "name": "i2edge-app-SDK",
            "appId": "9c9143f0-f44f-49df-939e-1e8b891ba8f5",
            "name": "i2edge_app_SDK",
            "version": "1.0.0",
            "appProvider": "i2CAT",
            "appProvider": "i2CAT_DEV",
            "packageType": "CONTAINER",
            "appRepo": {
                "type": "PUBLICREPO",
@@ -46,7 +46,7 @@ CONFIG = {
                }
            ],
        },
        "APP_ID": "i2edgechart-id-2",
        "APP_ID": "9c9143f0-f44f-49df-939e-1e8b891ba8f5",
        "APP_ZONES": [
            {
                "kubernetesClusterRef": "not-used",
@@ -124,7 +124,7 @@ CONFIG = {
    "kubernetes": {
        "K8S_ONBOARDED_APP_NAME": "nginx",
        "K8S_APP_ID": "3fa85f64-5717-4562-b3fc-2c963f66afa6",
        "ZONE_ID": "999b7746-d2e2-4bb4-96e6-f1e895adef0c",
        "ZONE_ID": "b2a1b33d-f382-47de-b555-2d32155eb74c",
        "K8S_DEPLOY_PAYLOAD": {
            "appId": "3fa85f64-5717-4562-b3fc-2c963f66afa6",
            "name": "nginx-test",
+53 −66

File changed.

Preview size limit exceeded, changes collapsed.