Commit 725be77e authored by Cesar Cajas's avatar Cesar Cajas
Browse files

feature/add-gsma-schemas-to-edgecloud-adapters: add FM and Availavility schemas

parent e62ef805
Loading
Loading
Loading
Loading
+23 −33
Original line number Diff line number Diff line
@@ -15,6 +15,7 @@ from pydantic import ValidationError
from requests import Response

from sunrise6g_opensdk import logger
from sunrise6g_opensdk.edgecloud.core import gsma_schemas
from sunrise6g_opensdk.edgecloud.core import schemas as camara_schemas
from sunrise6g_opensdk.edgecloud.core.edgecloud_interface import (
    EdgeCloudManagementInterface,
@@ -29,6 +30,7 @@ from .common import (
    i2edge_post,
    i2edge_post_multiform_data,
)
from .gsma_utils import map_zone

log = logger.get_logger(__name__)

@@ -730,23 +732,20 @@ class EdgeApplicationManager(EdgeCloudManagementInterface):

        :return: Response with zone details in GSMA format.
        """
        url = "{}/zones/list".format(self.base_url)
        url = f"{self.base_url}/zones/list"
        params = {}
        try:
            response = i2edge_get(url, params=params)
            if response.status_code == 200:
                response_json = response.json()
                response_list = []
                for item in response_json:
                    content = {
                        "zoneId": item.get("zoneId"),
                        "geolocation": item.get("geolocation"),
                        "geographyDetails": item.get("geographyDetails"),
                    }
                    response_list.append(content)
                try:
                    validated_data = gsma_schemas.ZonesList.model_validate(response_json)
                except ValidationError as e:
                    raise ValueError(f"Response from /zones/list is not a valid schema: {e}")

                return build_custom_http_response(
                    status_code=200,
                    content=response_list,
                    content=[zone.model_dump() for zone in validated_data.root],
                    headers={"Content-Type": self.content_type_gsma},
                    encoding=self.encoding_gsma,
                    url=response.url,
@@ -762,26 +761,20 @@ class EdgeApplicationManager(EdgeCloudManagementInterface):

        :return: Response with zones and detailed resource information.
        """
        url = "{}/zones".format(self.base_url)
        url = f"{self.base_url}/zones"
        params = {}
        try:
            response = i2edge_get(url, params=params)
            if response.status_code == 200:
                response_json = response.json()
                response_list = []
                for item in response_json:
                    content = {
                        "zoneId": item.get("zoneId"),
                        "reservedComputeResources": item.get("reservedComputeResources"),
                        "computeResourceQuotaLimits": item.get("computeResourceQuotaLimits"),
                        "flavoursSupported": item.get("flavoursSupported"),
                        "networkResources": item.get("networkResources"),
                        "zoneServiceLevelObjsInfo": item.get("zoneServiceLevelObjsInfo"),
                    }
                    response_list.append(content)
                mapped = [map_zone(zone) for zone in response_json]
                try:
                    validated_data = gsma_schemas.ZoneRegisteredDataList.model_validate(mapped)
                except ValidationError as e:
                    raise ValueError(f"Invalid response schema from /zones: {e}")
                return build_custom_http_response(
                    status_code=200,
                    content=response_list,
                    content=validated_data.model_dump(),
                    headers={"Content-Type": self.content_type_gsma},
                    encoding=self.encoding_gsma,
                    url=response.url,
@@ -799,23 +792,20 @@ class EdgeApplicationManager(EdgeCloudManagementInterface):
        :param zone_id: Unique identifier of the Edge Cloud Zone.
        :return: Response with Edge Cloud Zone details.
        """
        url = "{}/zone/{}".format(self.base_url, zone_id)
        url = f"{self.base_url}/zone/{zone_id}"
        params = {}
        try:
            response = i2edge_get(url, params=params)
            if response.status_code == 200:
                response_json = response.json()
                content = {
                    "zoneId": response_json.get("zoneId"),
                    "reservedComputeResources": response_json.get("reservedComputeResources"),
                    "computeResourceQuotaLimits": response_json.get("computeResourceQuotaLimits"),
                    "flavoursSupported": response_json.get("flavoursSupported"),
                    "networkResources": response_json.get("networkResources"),
                    "zoneServiceLevelObjsInfo": response_json.get("zoneServiceLevelObjsInfo"),
                }
                mapped = map_zone(response_json)
                try:
                    validated_data = gsma_schemas.ZoneRegisteredData.model_validate(mapped)
                except ValidationError as e:
                    raise ValueError(f"Invalid response schema from /zones/{zone_id}: {e}")
                return build_custom_http_response(
                    status_code=200,
                    content=content,
                    content=validated_data.model_dump(),
                    headers={"Content-Type": self.content_type_gsma},
                    encoding=self.encoding_gsma,
                    url=response.url,
+160 −0
Original line number Diff line number Diff line
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
##
# This file is part of the Open SDK
#
# Contributors:
#   - César Cajas (cesar.cajas@i2cat.net)
##


def map_hugepage(raw_hp: dict) -> dict:
    # Map from {'number': int, 'pageSize': str} to {'count': int, 'size': str}
    return {
        "pageSize": raw_hp.get("pageSize", ""),
        "number": raw_hp.get("number", ""),
    }


def map_compute_resource(raw_cr: dict) -> dict:
    # Map numCPU dict -> int, hugepages list, gpu list, vpu/fpga int to optional int or list
    # Cast cpuExclusivity to bool
    hugepages_raw = raw_cr.get("hugepages") or []
    hugepages = [map_hugepage(hp) for hp in hugepages_raw]

    # numCPU viene {'whole': {'value': int}}
    num_cpu_raw = raw_cr.get("numCPU")
    if isinstance(num_cpu_raw, dict):
        num_cpu = num_cpu_raw.get("whole", {}).get("value", 0)
    else:
        num_cpu = num_cpu_raw if isinstance(num_cpu_raw, int) else 0

    gpu = raw_cr.get("gpu") or None
    vpu = raw_cr.get("vpu")
    if isinstance(vpu, int) and vpu == 0:
        vpu = None

    fpga = raw_cr.get("fpga")
    if isinstance(fpga, int) and fpga == 0:
        fpga = None

    # cpuExclusivity
    cpu_exclusivity = raw_cr.get("cpuExclusivity")
    if isinstance(cpu_exclusivity, int):
        cpu_exclusivity = bool(cpu_exclusivity)

    # dict GSMA
    return {
        "cpuArchType": raw_cr.get("cpuArchType"),
        "numCPU": num_cpu,
        "memory": raw_cr.get("memory"),
        "diskStorage": raw_cr.get("diskStorage"),
        "gpu": gpu if gpu else None,
        "vpu": vpu,
        "fpga": fpga,
        "hugepages": hugepages if hugepages else None,
        "cpuExclusivity": cpu_exclusivity,
    }


def map_ostype(raw_os: dict) -> dict:
    # Simple passthrough
    return {
        "architecture": raw_os.get("architecture"),
        "distribution": raw_os.get("distribution"),
        "version": raw_os.get("version"),
        "license": raw_os.get("license"),
    }


def map_flavour(raw_flavour: dict) -> dict:
    fpga = raw_flavour.get("fpga")
    if isinstance(fpga, int):
        fpga = None if fpga == 0 else [str(fpga)]

    vpu = raw_flavour.get("vpu")
    if isinstance(vpu, int):
        vpu = None if vpu == 0 else [str(vpu)]

    cpu_exclusivity = raw_flavour.get("cpuExclusivity")
    if not isinstance(cpu_exclusivity, list):
        cpu_exclusivity = None

    # Map supportedOSTypes
    supported_os = raw_flavour.get("supportedOSTypes", [])
    supported_ostypes = [map_ostype(os) for os in supported_os]

    return {
        "flavourId": raw_flavour.get("flavourId"),
        "cpuArchType": raw_flavour.get("cpuArchType"),
        "supportedOSTypes": supported_ostypes,
        "numCPU": raw_flavour.get("numCPU"),
        "memorySize": raw_flavour.get("memorySize"),
        "storageSize": raw_flavour.get("storageSize"),
        "gpu": raw_flavour.get("gpu") or None,
        "fpga": fpga,
        "vpu": vpu,
        "hugepages": raw_flavour.get("hugepages") or None,
        "cpuExclusivity": cpu_exclusivity,
    }


def map_network_resources(raw_net: dict) -> dict:
    if not raw_net:
        return None
    return {
        "egressBandWidth": raw_net.get("egressBandWidth", 0),
        "dedicatedNIC": raw_net.get("dedicatedNIC", 0),
        "supportSriov": bool(raw_net.get("supportSriov")),
        "supportDPDK": bool(raw_net.get("supportDPDK")),
    }


def map_zone_service_level(raw_sli: dict) -> dict:
    if not raw_sli:
        return None
    return {
        "latencyRanges": {
            "minLatency": raw_sli.get("latencyRanges", {}).get("minLatency", 1),
            "maxLatency": raw_sli.get("latencyRanges", {}).get("maxLatency", 1),
        },
        "jitterRanges": {
            "minJitter": raw_sli.get("jitterRanges", {}).get("minJitter", 1),
            "maxJitter": raw_sli.get("jitterRanges", {}).get("maxJitter", 1),
        },
        "throughputRanges": {
            "minThroughput": raw_sli.get("throughputRanges", {}).get("minThroughput", 1),
            "maxThroughput": raw_sli.get("throughputRanges", {}).get("maxThroughput", 1),
        },
    }


def map_zone(raw_zone: dict) -> dict:
    reserved_compute = raw_zone.get("reservedComputeResources")
    if not reserved_compute or len(reserved_compute) == 0:
        reserved_compute = [
            {
                "cpuArchType": "ISA_X86_64",
                "numCPU": 0,
                "memory": 0,
                "diskStorage": 0,
                "gpu": None,
                "vpu": None,
                "fpga": None,
                "hugepages": None,
                "cpuExclusivity": False,
            }
        ]

    return {
        "zoneId": raw_zone.get("zoneId"),
        "reservedComputeResources": [map_compute_resource(cr) for cr in reserved_compute],
        "computeResourceQuotaLimits": [
            map_compute_resource(cr) for cr in raw_zone.get("computeResourceQuotaLimits", [])
        ],
        "flavoursSupported": [map_flavour(fl) for fl in raw_zone.get("flavoursSupported", [])],
        "networkResources": map_network_resources(raw_zone.get("networkResources")),
        "zoneServiceLevelObjsInfo": map_zone_service_level(
            raw_zone.get("zoneServiceLevelObjsInfo")
        ),
    }
+131 −0
Original line number Diff line number Diff line
from typing import List, Literal, Optional

from pydantic import BaseModel, Field, RootModel

# ---------------------------
# FederationManagement
# ---------------------------


class ZoneDetails(BaseModel):
    zoneId: str
    geolocation: Optional[str] = None
    geographyDetails: str


class ZonesList(RootModel[List[ZoneDetails]]):
    pass


# ---------------------------
# AvailabilityZoneInfoSynchronization
# ---------------------------


class HugePage(BaseModel):
    pageSize: str
    number: int


class GpuInfo(BaseModel):
    gpuVendorType: Literal["GPU_PROVIDER_NVIDIA", "GPU_PROVIDER_AMD"]
    gpuModeName: str
    gpuMemory: int
    numGPU: int


class ComputeResourceInfo(BaseModel):
    cpuArchType: Literal["ISA_X86", "ISA_X86_64", "ISA_ARM_64"]
    numCPU: int
    memory: int
    diskStorage: Optional[int] = None
    gpu: Optional[List[GpuInfo]] = None
    vpu: Optional[int] = None
    fpga: Optional[int] = None
    hugepages: Optional[List[HugePage]] = None
    cpuExclusivity: Optional[bool] = None


class OSType(BaseModel):
    architecture: Literal["x86_64", "x86"]
    distribution: Literal["RHEL", "UBUNTU", "COREOS", "FEDORA", "WINDOWS", "OTHER"]
    version: Literal[
        "OS_VERSION_UBUNTU_2204_LTS",
        "OS_VERSION_RHEL_8",
        "OS_VERSION_RHEL_7",
        "OS_VERSION_DEBIAN_11",
        "OS_VERSION_COREOS_STABLE",
        "OS_MS_WINDOWS_2012_R2",
        "OTHER",
    ]
    license: Literal["OS_LICENSE_TYPE_FREE", "OS_LICENSE_TYPE_ON_DEMAND", "NOT_SPECIFIED"]


class Flavour(BaseModel):
    flavourId: str
    cpuArchType: Literal["ISA_X86", "ISA_X86_64", "ISA_ARM_64"]
    supportedOSTypes: List[OSType] = Field(..., min_items=1)
    numCPU: int
    memorySize: int
    storageSize: int
    gpu: Optional[List[GpuInfo]] = None
    fpga: Optional[List[str]] = None
    vpu: Optional[List[str]] = None
    hugepages: Optional[List[HugePage]] = None
    cpuExclusivity: Optional[List[str]] = None


class NetworkResources(BaseModel):
    egressBandWidth: int
    dedicatedNIC: int
    supportSriov: bool
    supportDPDK: bool


class LatencyRange(BaseModel):
    minLatency: int = Field(..., ge=1)
    maxLatency: int


class JitterRange(BaseModel):
    minJitter: int = Field(..., ge=1)
    maxJitter: int


class ThroughputRange(BaseModel):
    minThroughput: int = Field(..., ge=1)
    maxThroughput: int


class ZoneServiceLevelObjsInfo(BaseModel):
    latencyRanges: LatencyRange
    jitterRanges: JitterRange
    throughputRanges: ThroughputRange


class ZoneRegisteredData(BaseModel):
    zoneId: str
    reservedComputeResources: List[ComputeResourceInfo] = Field(..., min_items=1)
    computeResourceQuotaLimits: List[ComputeResourceInfo] = Field(..., min_items=1)
    flavoursSupported: List[Flavour] = Field(..., min_items=1)
    networkResources: Optional[NetworkResources] = None
    zoneServiceLevelObjsInfo: Optional[ZoneServiceLevelObjsInfo] = None


class ZoneRegisteredDataList(RootModel[List[ZoneRegisteredData]]):
    pass


# ---------------------------
# ArtefactManagement
# ---------------------------


# ---------------------------
# ApplicationOnboardingManagement
# ---------------------------


# ---------------------------
# ApplicationDeploymentManagement
# ---------------------------