Commit c29be8d0 authored by Sergio Gimenez's avatar Sergio Gimenez
Browse files

No need tests, at least for now

parent 89226390
Loading
Loading
Loading
Loading

tests/__init__.py

deleted100644 → 0
+0 −0

Empty file deleted.

tests/i2edge/__init__.py

deleted100644 → 0
+0 −7
Original line number Diff line number Diff line
# Copyright 2021-2023 Fundació i2CAT
# 
# This file is part of I2EDGE.
# I2EDGE is free software: you can redistribute it and/or modify it under the terms of the Affero GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
# I2EDGE is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the Affero GNU General Public License for more details.
# You should have received a copy of the Affero GNU General Public License along with I2EDGE. If not, see <https://www.gnu.org/licenses/>.
import os, sys; sys.path.append(os.path.dirname(os.path.realpath(__file__)))
 No newline at end of file

tests/i2edge/client.py

deleted100644 → 0
+0 −171
Original line number Diff line number Diff line
from typing import Optional

from edgecloud import logger

from . import schemas
from .common import I2EdgeError, i2edge_delete, i2edge_get, i2edge_post, i2edge_post_multiform_data

log = logger.get_logger(__name__)


class I2EdgeClient:

    def __init__(
        self,
        i2edge_base_url: str,
        requests_max_retries: int,
        requests_retry_time_interval: int,
    ):
        self.requests_max_retries = requests_max_retries
        self.requests_retry_time_interval = requests_retry_time_interval
        self.base_url = i2edge_base_url
        log.info("I2Edge client created successfully pointing to: {}".format(i2edge_base_url))

    def deploy_app(
        self,
        appId: str,
        zoneId: str,
        flavourId: str,
        namespace: str,
        appProviderId: Optional[str] = None,
        appVersion: Optional[str] = None,
    ):
        app_provider_id = "" if appProviderId is None else appProviderId
        app_version = "" if appVersion is None else appVersion
        app_deploy_data = schemas.AppDeployData(
            appId=appId,
            appProviderId=app_provider_id,
            appVersion=app_version,
            zoneInfo=schemas.ZoneInfo(flavourId=flavourId, zoneId=zoneId),
        )
        app_parameters = schemas.AppParameters(namespace=namespace)
        url = "{}/app/".format(self.base_url)
        payload = schemas.AppDeploy(app_deploy_data=app_deploy_data, app_parameters=app_parameters)
        try:
            i2edge_post(url, payload)  # TODO Think if we care about the response. So far we don't
            log.info("App deployed successfully")
            return namespace
        except I2EdgeError as e:
            raise e

    def undeploy_app(self, app_name):
        url = "{}/app".format(self.base_url)
        try:
            i2edge_delete(url, app_name)
            log.info("App undeployed successfully")
        except I2EdgeError as e:
            raise e

    def create_artefact(
        self,
        artefact_id: str,
        artefact_name: str,
        repo_name: str,
        repo_type: str,
        repo_url: str,
        password: Optional[str] = None,
        token: Optional[str] = None,
        user_name: Optional[str] = None,
    ):
        repo_type = schemas.RepoType(repo_type)
        url = "{}/artefact".format(self.base_url)
        payload = schemas.ArtefactOnboarding(
            artefact_id=artefact_id,
            name=artefact_name,
            # chart=None,  # XXX AFAIK not supported by CAMARA.
            repo_password=password,
            repo_name=repo_name,
            repo_type=repo_type,
            repo_url=repo_url,
            repo_token=token,
            repo_user_name=user_name,
        )
        try:
            i2edge_post_multiform_data(url, payload)
            log.info("Artefact created successfully")
        except I2EdgeError as e:
            raise e

    def delete_artefact(self, artefact_id: str):
        url = "{}/artefact".format(self.base_url)
        try:
            i2edge_delete(url, artefact_id)
            log.info("Artefact deleted successfully")
        except I2EdgeError as e:
            raise e

    def onboard_app(self, app_id: str, artefact_id: str):
        app_component_spec = schemas.AppComponentSpec(artefactId=artefact_id)
        # TODO Why passing a list of app_component_specs? AFAIK i2edge only accepts one artifact at time.
        data = schemas.ApplicationOnboardingData(
            app_id=app_id, appComponentSpecs=[app_component_spec]
        )
        payload = schemas.ApplicationOnboardingRequest(profile_data=data)
        url = "{}/application/onboarding".format(self.base_url)
        try:
            i2edge_post(url, payload)
            log.info("App onboarded successfully")
        except I2EdgeError as e:
            raise e

    def delete_onboarded_app(self, app_id: str):
        url = "{}/application/onboarding".format(self.base_url)
        try:
            i2edge_delete(url, app_id)
            log.info("App deleted successfully")
        except I2EdgeError as e:
            raise e

    def create_flavour(
        self,
        zone_id: str,
        memory_size: str,
        num_cpu: int,
        num_gpus: Optional[int] = None,
    ) -> str:
        url = "{}/zone/{}/flavour".format(self.base_url, zone_id)
        gpu_list = [schemas.GPU(numGPU=num_gpus)] if num_gpus is not None else None
        flavour_data = schemas.Flavour(
            flavour_supported=schemas.FlavourSupported(
                memorySize=memory_size, numCPU=num_cpu, gpu=gpu_list
            )
        )
        try:
            response = i2edge_post(url, flavour_data)
            log.info("Flavour created successfully")
            id_value = response["response"].split("id ")[1].split(" ")[0]
            return id_value
        except I2EdgeError as e:
            raise e

    def delete_flavour(self, flavour_id: str):
        url = "{}/flavour".format(self.base_url)
        try:
            i2edge_delete(url, flavour_id)
            log.info("Flavour deleted successfully")
        except I2EdgeError as e:
            raise e

    def get_all_deployed_apps(self) -> list[dict]:
        url = "{}/app".format(self.base_url)
        try:
            response = i2edge_get(url, params=None)
            return response
        except I2EdgeError as e:
            raise e

    def get_deployed_app(self, zone_id, app_name) -> dict:
        url = "{}/app/{}/{}".format(self.base_url, zone_id, app_name)
        try:
            response = i2edge_get(url, params=None)
            return response
        except I2EdgeError as e:
            raise e

    def get_zones_list(self) -> list[dict]:
        url = "{}/zones/list".format(self.base_url)
        try:
            response = i2edge_get(url, params=None)
            return response
        except I2EdgeError as e:
            raise e

tests/i2edge/common.py

deleted100644 → 0
+0 −85
Original line number Diff line number Diff line
import json
from typing import Optional

import requests
from pydantic import BaseModel

from edgecloud import logger

log = logger.get_logger(__name__)


class I2EdgeError(Exception):
    pass


class I2EdgeErrorResponse(BaseModel):
    message: str
    detail: dict


def get_error_message_from(response: requests.Response) -> str:
    try:
        error_response = I2EdgeErrorResponse(**response.json())
        return error_response.message
    except Exception as e:
        log.error("Failed to parse error response from i2edge: {}".format(e))
        return response.text


def i2edge_post(url: str, model_payload: BaseModel) -> dict:
    headers = {"Content-Type": "application/json", "accept": "application/json"}
    json_payload = json.dumps(model_payload.model_dump(mode="json"))
    try:
        response = requests.post(url, data=json_payload, headers=headers)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.HTTPError as e:
        i2edge_err_msg = get_error_message_from(response)
        err_msg = "Failed to deploy app: {}. Detail: {}".format(i2edge_err_msg, e)
        log.error(err_msg)
        raise I2EdgeError(err_msg)


def i2edge_post_multiform_data(url: str, model_payload: BaseModel) -> dict:
    headers = {
        "accept": "application/json",
    }
    payload_dict = model_payload.model_dump(mode="json")
    payload_in_str = {k: str(v) for k, v in payload_dict.items()}
    try:
        response = requests.post(url, data=payload_in_str, headers=headers)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.HTTPError as e:
        i2edge_err_msg = get_error_message_from(response)
        err_msg = "Failed to deploy app: {}. Detail: {}".format(i2edge_err_msg, e)
        log.error(err_msg)
        raise I2EdgeError(err_msg)


def i2edge_delete(url: str, id: str) -> dict:
    headers = {"accept": "application/json"}
    try:
        query = "{}/{}".format(url, id)
        response = requests.delete(query, headers=headers)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.HTTPError as e:
        i2edge_err_msg = get_error_message_from(response)
        err_msg = "Failed to undeploy app: {}. Detail: {}".format(i2edge_err_msg, e)
        log.error(err_msg)
        raise I2EdgeError(err_msg)


def i2edge_get(url: str, params: Optional[dict]):
    headers = {"accept": "application/json"}
    try:
        response = requests.get(url, params=params, headers=headers)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.HTTPError as e:
        i2edge_err_msg = get_error_message_from(response)
        err_msg = "Failed to get apps: {}. Detail: {}".format(i2edge_err_msg, e)
        log.warning(err_msg)
        raise I2EdgeError(err_msg)

tests/i2edge/schemas.py

deleted100644 → 0
+0 −153
Original line number Diff line number Diff line
from enum import Enum
from typing import List, Optional

from pydantic import BaseModel, ConfigDict, Field, field_validator


class ZoneInfo(BaseModel):
    flavourId: str
    zoneId: str


class AppParameters(BaseModel):
    namespace: Optional[str] = None


class AppDeployData(BaseModel):
    appId: str
    appProviderId: str
    appVersion: str
    zoneInfo: ZoneInfo


class AppDeploy(BaseModel):
    app_deploy_data: AppDeployData
    app_parameters: Optional[AppParameters] = None


# Artefact


class RepoType(str, Enum):
    UPLOAD = "UPLOAD"
    PUBLICREPO = "PUBLICREPO"
    PRIVATEREPO = "PRIVATEREPO"


class ArtefactOnboarding(BaseModel):
    artefact_id: str
    name: str
    # chart: Optional[bytes] = Field(default=None) # XXX AFAIK not supported by CAMARA.
    repo_password: Optional[str] = None
    repo_name: Optional[str] = None
    repo_type: RepoType
    repo_url: Optional[str] = None
    repo_token: Optional[str] = None
    repo_user_name: Optional[str] = None
    model_config = ConfigDict(use_enum_values=True)


# Application Onboarding

# XXX Leaving default values since i2edge only cares about appid and artifactid, at least for now.


class AppComponentSpec(BaseModel):
    artefactId: str
    componentName: str = Field(default="default_component")
    serviceNameEW: str = Field(default="default_ew_service")
    serviceNameNB: str = Field(default="default_nb_service")


class AppMetaData(BaseModel):
    appDescription: str = Field(default="Default app description")
    appName: str = Field(default="Default App")
    category: str = Field(default="DEFAULT")
    mobilitySupport: bool = Field(default=False)
    version: str = Field(default="1.0")


class AppQoSProfile(BaseModel):
    appProvisioning: bool = Field(default=True)
    bandwidthRequired: int = Field(default=1)
    latencyConstraints: str = Field(default="NONE")
    multiUserClients: str = Field(default="APP_TYPE_SINGLE_USER")
    noOfUsersPerAppInst: int = Field(default=1)


class ApplicationOnboardingData(BaseModel):
    appComponentSpecs: List[AppComponentSpec]
    appDeploymentZones: List[str] = Field(default=["default_zone"])
    app_id: str
    appMetaData: AppMetaData = Field(default_factory=AppMetaData)
    appProviderId: str = Field(default="default_provider")
    appQoSProfile: AppQoSProfile = Field(default_factory=AppQoSProfile)
    appStatusCallbackLink: Optional[str] = None


class ApplicationOnboardingRequest(BaseModel):
    profile_data: ApplicationOnboardingData


# Flavour


class GPU(BaseModel):
    gpuMemory: int = Field(default=0, description="GPU memory in MB")
    gpuModeName: str = Field(default="", description="GPU mode name")
    gpuVendorType: str = Field(default="GPU_PROVIDER_NVIDIA", description="GPU vendor type")
    numGPU: int = Field(..., description="Number of GPUs")


class Hugepages(BaseModel):
    number: int = Field(default=0, description="Number of hugepages")
    pageSize: str = Field(default="2MB", description="Size of hugepages")


class SupportedOSTypes(BaseModel):
    architecture: str = Field(default="x86_64", description="OS architecture")
    distribution: str = Field(default="RHEL", description="OS distribution")
    license: str = Field(default="OS_LICENSE_TYPE_FREE", description="OS license type")
    version: str = Field(default="OS_VERSION_UBUNTU_2204_LTS", description="OS version")


class FlavourSupported(BaseModel):
    cpuArchType: str = Field(default="ISA_X86", description="CPU architecture type")
    cpuExclusivity: bool = Field(default=True, description="CPU exclusivity")
    fpga: int = Field(default=0, description="Number of FPGAs")
    gpu: Optional[List[GPU]] = Field(default=None, description="List of GPUs")
    hugepages: List[Hugepages] = Field(
        default_factory=lambda: [Hugepages()], description="List of hugepages"
    )
    memorySize: str = Field(..., description="Memory size (e.g., '1024MB' or '2GB')")
    numCPU: int = Field(..., description="Number of CPUs")
    storageSize: int = Field(default=0, description="Storage size in GB")
    supportedOSTypes: List[SupportedOSTypes] = Field(
        default_factory=lambda: [SupportedOSTypes()],
        description="List of supported OS types",
    )
    vpu: int = Field(default=0, description="Number of VPUs")

    @field_validator("memorySize")
    @classmethod
    def validate_memory_size(cls, v):
        if not (v.endswith("MB") or v.endswith("GB")):
            raise ValueError("memorySize must end with MB or GB")
        try:
            int(v[:-2])
        except ValueError:
            raise ValueError("memorySize must be a number followed by MB or GB")
        return v


class Flavour(BaseModel):
    flavour_supported: FlavourSupported


# EdgeCloud Zones


class Zone(BaseModel):
    geographyDetails: str
    geolocation: str
    zoneId: str
Loading