Commit 4f46147d authored by adrian-pino's avatar adrian-pino Committed by GitHub
Browse files

Merge pull request #17 from OpenOperatorPlatform/feature/add-edgecloud-i2edge

Feature/add edgecloud i2edge
parents 90a98ac0 f6aac40a
Loading
Loading
Loading
Loading

.flake8

0 → 100644
+2 −0
Original line number Diff line number Diff line
[flake8]
max-line-length = 88
 No newline at end of file
+1 −0
Original line number Diff line number Diff line
@@ -4,3 +4,4 @@
.pyc
__pycache__/
tmp/
.vscode/
+4 −2
Original line number Diff line number Diff line
@@ -13,8 +13,10 @@ participant PiEdge
note over AP,CE: CAMARA EdgeCloud API
AP ->> CE: GET /edge-cloud-zones
CE ->> API: GET /av. zones
API ->> SDK: sdk.i2edge.get_zones()
API ->> SDK: sbi = EdgeCloudFactory.create_edgecloud_client(i2Edge)
API ->> SDK: sbi.get_edge_cloud_zones()
SDK ->> i2Edge: GET /zones/list
API ->> SDK: sdk.piedge.get_zones()
API ->> SDK: sbi = EdgeCloudFactory.create_edgecloud_client(PiEdge)
API ->> SDK: sbi.get_edge_cloud_zones()
SDK ->> PiEdge: GET /nodes
```
 No newline at end of file
+4 −0
Original line number Diff line number Diff line
annotated-types==0.7.0
certifi==2025.1.31
charset-normalizer==3.4.1
colorlog==6.8.2
@@ -6,7 +7,10 @@ idna==3.10
iniconfig==2.0.0
packaging==24.2
pluggy==1.5.0
pydantic==2.10.6
pydantic_core==2.27.2
pytest==8.3.2
requests==2.32.3
tomli==2.2.1
typing_extensions==4.12.2
urllib3==2.3.0
+128 −11
Original line number Diff line number Diff line
# Mocked API for testing purposes
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
##
# Copyright 2025-present by Software Networks Area, i2CAT.
# All rights reserved.
#
# This file is part of the Open SDK
#
# Contributors:
#   - Adrián Pino Martínez (adrian.pino@i2cat.net)
#   - Sergio Giménez (sergio.gimenez@i2cat.net)
##
from typing import Dict, List, Optional

from src import logger
from src.edgecloud.core.edgecloud_interface import EdgeCloudManagementInterface

from . import schemas
from .common import (
    I2EdgeError,
    i2edge_delete,
    i2edge_get,
    i2edge_post,
    i2edge_post_multiform_data,
)

log = logger.get_logger(__name__)


class EdgeApplicationManager(EdgeCloudManagementInterface):
    def __init__(self, base_url: str):
        self.base_url = base_url

    def get_edge_cloud_zones(
        self, region: Optional[str] = None, status: Optional[str] = None
    ) -> list[dict]:
        url = "{}/zones/list".format(self.base_url)
        params = {}
        try:
            response = i2edge_get(url, params=params)
            log.info("Availability zones retrieved successfully")
            return response
        except I2EdgeError as e:
            raise e

    def _create_artefact(
        self,
        artefact_id: str,
        artefact_name: str,
        repo_name: str,
        repo_type: str,
        repo_url: str,
        password: Optional[str] = None,
        token: Optional[str] = None,
        user_name: Optional[str] = None,
    ):
        repo_type = schemas.RepoType(repo_type)
        url = "{}/artefact".format(self.base_url)
        payload = schemas.ArtefactOnboarding(
            artefact_id=artefact_id,
            name=artefact_name,
            repo_password=password,
            repo_name=repo_name,
            repo_type=repo_type,
            repo_url=repo_url,
            repo_token=token,
            repo_user_name=user_name,
        )
        try:
            i2edge_post_multiform_data(url, payload)
            log.info("Artifact added successfully")
        except I2EdgeError as e:
            raise e

    def _get_artefact(self, artefact_id: str) -> Dict:
        url = "{}/artefact/{}".format(self.base_url, artefact_id)
        try:
            response = i2edge_get(url, artefact_id)
            log.info("Artifact retrieved successfully")
            return response
        except I2EdgeError as e:
            raise e

    def _get_all_artefacts(self) -> List[Dict]:
        url = "{}/artefact".format(self.base_url)
        try:
            response = i2edge_get(url, {})
            log.info("Artifacts retrieved successfully")
            return response
        except I2EdgeError as e:
            raise e

    def _delete_artefact(self, artefact_id: str):
        url = "{}/artefact".format(self.base_url)
        try:
            i2edge_delete(url, artefact_id)
            log.info("Artifact deleted successfully")
        except I2EdgeError as e:
            raise e

    def onboard_app(self, app_manifest: Dict) -> Dict:
        print(f"Submitting application: {app_manifest}")
        return {"appId": "1234-5678"}
        try:
            app_id = app_manifest["appId"]
            artefact_id = app_id

    def get_all_onboarded_apps(self) -> List[Dict]:
        return [{"appId": "1234-5678", "name": "TestApp"}]
            app_component_spec = schemas.AppComponentSpec(artefactId=artefact_id)
            data = schemas.ApplicationOnboardingData(
                app_id=app_id, appComponentSpecs=[app_component_spec]
            )
            payload = schemas.ApplicationOnboardingRequest(profile_data=data)
            url = "{}/application/onboarding".format(self.base_url)
            i2edge_post(url, payload)
        except I2EdgeError as e:
            raise e
        except KeyError as e:
            raise I2EdgeError("Missing required field in app_manifest: {}".format(e))

    def delete_onboarded_app(self, app_id: str) -> None:
        url = "{}/application/onboarding".format(self.base_url)
        try:
            i2edge_delete(url, app_id)
        except I2EdgeError as e:
            raise e

    def get_onboarded_app(self, app_id: str) -> Dict:
        return {"appId": app_id, "name": "TestApp"}
        url = "{}/application/onboarding/{}".format(self.base_url, app_id)
        try:
            response = i2edge_get(url, app_id)
            return response
        except I2EdgeError as e:
            raise e

    def delete_onboarded_app(self, app_id: str) -> None:
        print(f"Deleting application: {app_id}")
    def get_all_onboarded_apps(self) -> List[Dict]:
        url = "{}/applications/onboarding".format(self.base_url)
        params = {}
        try:
            response = i2edge_get(url, params)
            return response
        except I2EdgeError as e:
            raise e

    def deploy_app(self, app_id: str, app_zones: List[Dict]) -> Dict:
        return {"appInstanceId": "abcd-efgh"}
@@ -27,6 +147,3 @@ class EdgeApplicationManager(EdgeCloudManagementInterface):

    def undeploy_app(self, app_instance_id: str) -> None:
        print(f"Deleting app instance: {app_instance_id}")

    def get_edge_cloud_zones(self, region: Optional[str] = None, status: Optional[str] = None) -> List[Dict]:
        return [{"edgeCloudZoneId": "zone-1", "status": "active"}]
Loading