Commit 65fb3e86 authored by Cesar Cajas's avatar Cesar Cajas
Browse files

feature/add-edgecloud-gsma-i2edge: add gsma methods

parent 9152d5d0
Loading
Loading
Loading
Loading
+161 −4
Original line number Diff line number Diff line
@@ -6,8 +6,11 @@
#   - Adrián Pino Martínez (adrian.pino@i2cat.net)
#   - Sergio Giménez (sergio.gimenez@i2cat.net)
##
from copy import deepcopy
from typing import Dict, List, Optional

from requests import Response

from sunrise6g_opensdk import logger
from sunrise6g_opensdk.edgecloud.core.edgecloud_interface import (
    EdgeCloudManagementInterface,
@@ -82,8 +85,9 @@ class EdgeApplicationManager(EdgeCloudManagementInterface):
            repo_user_name=user_name,
        )
        try:
            i2edge_post_multiform_data(url, payload)
            response = i2edge_post_multiform_data(url, payload)
            log.info("Artifact added successfully")
            return response
        except I2EdgeError as e:
            raise e

@@ -108,8 +112,9 @@ class EdgeApplicationManager(EdgeCloudManagementInterface):
    def _delete_artefact(self, artefact_id: str):
        url = "{}/artefact".format(self.base_url)
        try:
            i2edge_delete(url, artefact_id)
            response = i2edge_delete(url, artefact_id)
            log.info("Artifact deleted successfully")
            return response
        except I2EdgeError as e:
            raise e

@@ -124,7 +129,8 @@ class EdgeApplicationManager(EdgeCloudManagementInterface):
            )
            payload = schemas.ApplicationOnboardingRequest(profile_data=data)
            url = "{}/application/onboarding".format(self.base_url)
            i2edge_post(url, payload)
            response = i2edge_post(url, payload)
            return response
        except I2EdgeError as e:
            raise e
        except KeyError as e:
@@ -133,7 +139,8 @@ class EdgeApplicationManager(EdgeCloudManagementInterface):
    def delete_onboarded_app(self, app_id: str) -> None:
        url = "{}/application/onboarding".format(self.base_url)
        try:
            i2edge_delete(url, app_id)
            response = i2edge_delete(url, app_id)
            return response
        except I2EdgeError as e:
            raise e

@@ -238,3 +245,153 @@ class EdgeApplicationManager(EdgeCloudManagementInterface):
            log.info("App instance deleted successfully")
        except I2EdgeError as e:
            raise e

    # GSMA FM

    # AvailabilityZoneInfoSynchronization

    def get_edge_cloud_zone_details_gsma(
        self, federation_context_id: str, zone_id: str
    ) -> Dict:
        url = "{}/zone/{}".format(self.base_url, zone_id)
        params = {}
        try:
            response = i2edge_get(url, params=params)
            log.info("Zone metadata retrieved successfully")
            return response
        except I2EdgeError as e:
            raise e

    # ArtefactManagement

    def create_artefact_gsma(
        self, federation_context_id: str, request_body: Dict
    ) -> Dict:
        try:
            artefact_id = request_body["artefactId"]
            artefact_name = request_body["artefactName"]
            repo_data = request_body["artefactRepoLocation"]

            transformed = {
                "artefact_id": artefact_id,
                "artefact_name": artefact_name,
                "repo_name": repo_data.get("repoName", "unknown-repo"),
                "repo_type": request_body.get("repoType", "PUBLICREPO"),
                "repo_url": repo_data["repoURL"],
                "user_name": repo_data.get("userName"),
                "password": repo_data.get("password"),
                "token": repo_data.get("token"),
            }

            response = self._create_artefact(**transformed)
            # if response.status_code == 201:
            #     gsma_format_response = Response()
            #     gsma_format_response.status_code = 200
            #     gsma_format_response._content = b'{"response": "Artefact uploaded successfully"}'
            #     gsma_format_response.headers["Content-Type"] = "application/json"
            #     gsma_format_response.encoding = "utf-8"
            #     gsma_format_response.url = response.url
            #     gsma_format_response.request = response.request
            #     return gsma_format_response
            return response
        except KeyError as e:
            raise I2EdgeError(f"Missing required field in GSMA artefact payload: {e}")

    def get_artefact_gsma(self, federation_context_id: str, artefact_id: str) -> Dict:
        try:
            response = self._get_artefact(artefact_id)
            return response
        except KeyError as e:
            raise I2EdgeError(f"Missing artefactId in GSMA payload: {e}")

    def delete_artefact_gsma(self, federation_context_id: str, artefact_id: str):
        try:
            response = self._delete_artefact(artefact_id)
            return response
        except KeyError as e:
            raise I2EdgeError(f"Missing artefactId in GSMA payload: {e}")

    # ApplicationOnboardingManagement

    def onboard_app_gsma(
        self, federation_context_id: str, request_body: dict
    ) -> Response:
        body = deepcopy(request_body)
        try:
            body["app_id"] = body.pop("appId")
            body.pop("edgeAppFQDN", None)
            data = body
            payload = schemas.ApplicationOnboardingRequest(profile_data=data)
            url = "{}/application/onboarding".format(self.base_url)
            response = i2edge_post(url, payload)
            return response
        except KeyError as e:
            raise I2EdgeError(f"Missing required field in GSMA onboarding payload: {e}")

    def get_onboarded_app_gsma(self, federation_context_id: str, app_id: str) -> Dict:
        try:
            response = self.get_onboarded_app(app_id)
            return response
        except KeyError as e:
            raise I2EdgeError(f"Missing appId in GSMA payload: {e}")

    def delete_onboarded_app_gsma(self, federation_context_id: str, app_id: str):
        try:
            response = self.delete_onboarded_app(app_id)
            return response
        except KeyError as e:
            raise I2EdgeError(f"Missing appId in GSMA payload: {e}")

    # ApplicationDeploymentManagement

    def deploy_app_gsma(
        self, federation_context_id: str, idempotency_key: str, request_body: dict
    ):
        body = deepcopy(request_body)
        try:
            zone_id = body.get("zoneInfo").get("zoneId")
            app_deploy_data = schemas.AppDeployData(
                appId=body.get("appId"),
                appProviderId=body.get("appProviderId"),
                appVersion=body.get("appVersion"),
                zoneInfo=schemas.ZoneInfo(flavourId=self.flavour_id, zoneId=zone_id),
            )
            payload = schemas.AppDeploy(
                app_deploy_data=app_deploy_data, app_parameters={"namespace": "test"}
            )
            print(payload)
            url = "{}/application_instance".format(self.base_url)
            response = i2edge_post(url, payload)
            return response
        except KeyError as e:
            raise I2EdgeError(f"Missing required field in GSMA deployment payload: {e}")

    def get_deployed_app_gsma(
        self,
        federation_context_id: str,
        app_id: str,
        app_instance_id: str,
        zone_id: str,
    ):
        try:
            url = "{}/application_instance/{}/{}".format(
                self.base_url, zone_id, app_instance_id
            )
            response = i2edge_get(url)
            return response
        except KeyError as e:
            raise I2EdgeError(f"Missing appId or zoneId in GSMA payload: {e}")

    def undeploy_app_gsma(
        self,
        federation_context_id: str,
        app_id: str,
        app_instance_id: str,
        zone_id: str,
    ):
        try:
            url = "{}/application_instance".format(self.base_url)
            response = i2edge_delete(url, app_instance_id)
            return response
        except KeyError as e:
            raise I2EdgeError(f"Missing appInstanceId in GSMA payload: {e}")
+4 −4
Original line number Diff line number Diff line
@@ -45,7 +45,7 @@ def i2edge_post(url: str, model_payload: BaseModel) -> dict:
    try:
        response = requests.post(url, data=json_payload, headers=headers)
        response.raise_for_status()
        return response.json()
        return response
    except requests.exceptions.HTTPError as e:
        i2edge_err_msg = get_error_message_from(response)
        err_msg = "Failed to deploy app: {}. Detail: {}".format(i2edge_err_msg, e)
@@ -62,7 +62,7 @@ def i2edge_post_multiform_data(url: str, model_payload: BaseModel) -> dict:
    try:
        response = requests.post(url, data=payload_in_str, headers=headers)
        response.raise_for_status()
        return response.json()
        return response
    except requests.exceptions.HTTPError as e:
        i2edge_err_msg = get_error_message_from(response)
        err_msg = "Failed to deploy app: {}. Detail: {}".format(i2edge_err_msg, e)
@@ -76,7 +76,7 @@ def i2edge_delete(url: str, id: str) -> dict:
        query = "{}/{}".format(url, id)
        response = requests.delete(query, headers=headers)
        response.raise_for_status()
        return response.json()
        return response
    except requests.exceptions.HTTPError as e:
        i2edge_err_msg = get_error_message_from(response)
        err_msg = "Failed to undeploy app: {}. Detail: {}".format(i2edge_err_msg, e)
@@ -89,7 +89,7 @@ def i2edge_get(url: str, params: Optional[dict]):
    try:
        response = requests.get(url, params=params, headers=headers)
        response.raise_for_status()
        return response.json()
        return response
    except requests.exceptions.HTTPError as e:
        i2edge_err_msg = get_error_message_from(response)
        err_msg = "Failed to get apps: {}. Detail: {}".format(i2edge_err_msg, e)
+149 −0
Original line number Diff line number Diff line
@@ -118,3 +118,152 @@ class EdgeCloudManagementInterface(ABC):
        :return: Dictionary with Edge Cloud Zone details.
        """
        pass

    # --- GSMA-specific methods ---

    # AvailabilityZoneInfoSynchronization

    @abstractmethod
    def get_edge_cloud_zone_details_gsma(
        self, federation_context_id: str, zone_id: str
    ) -> Dict:
        """
        Retrieves details of a specific Edge Cloud Zone reserved
        for the specified zone by the partner OP.

        :param federation_context_id: Identifier of the federation context.
        :param zone_id: Unique identifier of the Edge Cloud Zone.
        :return: Dictionary with Edge Cloud Zone details.
        """
        pass

    # ArtefactManagement

    @abstractmethod
    def create_artefact_gsma(
        self, federation_context_id: str, request_body: dict
    ) -> Dict:
        """
        Uploads application artefact on partner OP. Artefact is a zip file
        containing scripts and/or packaging files like Terraform or Helm
        which are required to create an instance of an application

        :param federation_context_id: Identifier of the federation context.
        :param request_body: Payload with artefact information.
        :return: Details with artefact deployment info.
        """
        pass

    @abstractmethod
    def get_artefact_gsma(self, federation_context_id: str, artefact_id: str) -> Dict:
        """
        Retrieves details about an artefact

        :param federation_context_id: Identifier of the federation context.
        :param artefact_id: Unique identifier of the artefact.
        :return: Dictionary with artefact details.
        """
        pass

    @abstractmethod
    def delete_artefact_gsma(
        self, federation_context_id: str, artefact_id: str
    ) -> Dict:
        """
        Removes an artefact from partners OP.

        :param federation_context_id: Identifier of the federation context.
        :param artefact_id: Unique identifier of the artefact.
        :return: Dictionary with artefact deletion details.
        """
        pass

    # ApplicationOnboardingManagement

    @abstractmethod
    def onboard_app_gsma(self, federation_context_id: str, request_body: dict) -> Dict:
        """
        Submits an application details to a partner OP.
        Based on the details provided, partner OP shall do bookkeeping,
        resource validation and other pre-deployment operations.

        :param federation_context_id: Identifier of the federation context.
        :param request_body: Payload with onboarding info.
        :return: Dictionary with onboarding details.
        """
        pass

    @abstractmethod
    def get_onboarded_app_gsma(self, federation_context_id: str, app_id: str) -> Dict:
        """
        Retrieves application details from partner OP

        :param federation_context_id: Identifier of the federation context.
        :param app_id: Identifier of the application onboarded.
        :return:
        """
        pass

    @abstractmethod
    def delete_onboarded_app_gsma(self, federation_context_id: str, app_id: str):
        """
        Deboards an application from specific partner OP zones

        :param federation_context_id: Identifier of the federation context.
        :param app_id: Identifier of the application onboarded.
        :return:
        """
        pass

    # ApplicationDeploymentManagement

    @abstractmethod
    def deploy_app_gsma(
        self, federation_context_id: str, idempotency_key: str, request_body: dict
    ):
        """
        Instantiates an application on a partner OP zone.

        :param federation_context_id: Identifier of the federation context.
        :param idempotency_key: Idempotency key.
        :return:
        """
        pass

    @abstractmethod
    def get_deployed_app_gsma(
        self,
        federation_context_id: str,
        app_id: str,
        app_instance_id: str,
        zone_id: str,
    ):
        """
        Retrieves an application instance details from partner OP.

        :param federation_context_id: Identifier of the federation context.
        :param app_id: Identifier of the app.
        :param app_instance_id: Identifier of the deployed app.
        :param zone_id: Identifier of the zone
        :return:
        """
        pass

    @abstractmethod
    def undeploy_app_gsma(
        self,
        federation_context_id: str,
        app_id: str,
        app_instance_id: str,
        zone_id: str,
    ):
        """
        Terminate an application instance on a partner OP zone.

        :param federation_context_id: Identifier of the federation context.
        :param app_id: Identifier of the app.
        :param app_instance_id: Identifier of the deployed app.
        :param zone_id: Identifier of the zone
        :return:
        """
        pass