Commit 566ebe0f authored by Adrian Pino's avatar Adrian Pino
Browse files

Add app onboarding-related endpoints & tests

parent bb95acd1
Loading
Loading
Loading
Loading
+35 −7
Original line number Diff line number Diff line
@@ -109,17 +109,45 @@ class EdgeApplicationManager(EdgeCloudManagementInterface):
            raise e

    def onboard_app(self, app_manifest: Dict) -> Dict:
        print(f"Submitting application: {app_manifest}")
        return {"appId": "1234-5678"}
        try:
            app_id = app_manifest["appId"]
            artefact_id = app_id

    def get_all_onboarded_apps(self) -> List[Dict]:
        return [{"appId": "1234-5678", "name": "TestApp"}]
            app_component_spec = schemas.AppComponentSpec(artefactId=artefact_id)
            data = schemas.ApplicationOnboardingData(
                app_id=app_id, appComponentSpecs=[app_component_spec]
            )
            payload = schemas.ApplicationOnboardingRequest(profile_data=data)
            url = "{}/application/onboarding".format(self.base_url)
            i2edge_post(url, payload)
        except I2EdgeError as e:
            raise e
        except KeyError as e:
            raise I2EdgeError("Missing required field in app_manifest: {}".format(e))

    def delete_onboarded_app(self, app_id: str) -> None:
        url = "{}/application/onboarding".format(self.base_url)
        try:
            i2edge_delete(url, app_id)
        except I2EdgeError as e:
            raise e

    def get_onboarded_app(self, app_id: str) -> Dict:
        return {"appId": app_id, "name": "TestApp"}
        url = "{}/application/onboarding/{}".format(self.base_url, app_id)
        try:
            response = i2edge_get(url, app_id)
            return response
        except I2EdgeError as e:
            raise e

    def delete_onboarded_app(self, app_id: str) -> None:
        print(f"Deleting application: {app_id}")
    def get_all_onboarded_apps(self) -> List[Dict]:
        url = "{}/applications/onboarding".format(self.base_url)
        params = {}
        try:
            response = i2edge_get(url, params)
            return response
        except I2EdgeError as e:
            raise e

    def deploy_app(self, app_id: str, app_zones: List[Dict]) -> Dict:
        return {"appInstanceId": "abcd-efgh"}
+133 −1
Original line number Diff line number Diff line
@@ -194,7 +194,139 @@ def test_delete_artefact_failure(client_name, base_url):
#######################################
# APP ONBOARDING
#######################################
# TODO
# CAMARA app payload (only mandatory fields)
app_manifest = {
    "appId": "test_app_from_SDK",
    "name": "my-application",
    "version": "1.0.0",
    "appProvider": "MyAppProvider",
    "packageType": "CONTAINER",
    "appRepo": {
        "type": "PUBLICREPO",
        "imagePath": "https://example.com/my-app-image:1.0.0",
    },
    "requiredResources": {
        "infraKind": "kubernetes",
        "applicationResources": {
            "cpuPool": {
                "numCPU": 2,
                "memory": 2048,
                "topology": {
                    "minNumberOfNodes": 2,
                    "minNodeCpu": 1,
                    "minNodeMemory": 1024,
                },
            }
        },
        "isStandalone": False,
        "version": "1.29",
    },
    "componentSpec": [
        {
            "componentName": "my-component",
            "networkInterfaces": [
                {
                    "interfaceId": "eth0",
                    "protocol": "TCP",
                    "port": 8080,
                    "visibilityType": "VISIBILITY_EXTERNAL",
                }
            ],
        }
    ],
}
# artefactId needs to be added; same ID as appId
app_manifest.update({"artefactId": app_manifest["appId"]})


@pytest.mark.parametrize("client_name, base_url", test_cases)
def test_onboard_app_success(client_name, base_url):
    edgecloud_platform = EdgeCloudFactory.create_edgecloud_client(
        client_name, base_url
    )

    try:
        edgecloud_platform.onboard_app(app_manifest)
    except I2EdgeError as e:
        pytest.fail(f"App onboarding failed unexpectedly: {e}")


@pytest.mark.parametrize("client_name, base_url", test_cases)
def test_onboard_app_failure(client_name, base_url):
    edgecloud_platform = EdgeCloudFactory.create_edgecloud_client(
        client_name, base_url
    )

    with pytest.raises(I2EdgeError):
        edgecloud_platform.onboard_app({})


@pytest.mark.parametrize("client_name, base_url", test_cases)
def test_onboard_app_failure_artefact_id_missing(client_name, base_url):
    app_manifest.pop("artefactId")

    edgecloud_platform = EdgeCloudFactory.create_edgecloud_client(
        client_name, base_url
    )

    with pytest.raises(I2EdgeError):
        edgecloud_platform.onboard_app({})


@pytest.mark.parametrize("client_name, base_url", test_cases)
def test_get_onboarded_app_success(client_name, base_url):
    edgecloud_platform = EdgeCloudFactory.create_edgecloud_client(
        client_name, base_url
    )

    try:
        edgecloud_platform.get_onboarded_app(app_id=app_manifest["appId"])
    except I2EdgeError as e:
        pytest.fail(f"App onboarding failed unexpectedly: {e}")


@pytest.mark.parametrize("client_name, base_url", test_cases)
def test_get_onboarded_app_failure(client_name, base_url):
    edgecloud_platform = EdgeCloudFactory.create_edgecloud_client(
        client_name, base_url
    )

    with pytest.raises(I2EdgeError):
        edgecloud_platform.get_onboarded_app(app_id="non-existent-app")


@pytest.mark.parametrize("client_name, base_url", test_cases)
def test_get_all_onboarded_app_success(client_name, base_url):
    edgecloud_platform = EdgeCloudFactory.create_edgecloud_client(
        client_name, base_url
    )

    try:
        edgecloud_platform.get_all_onboarded_apps()
    except I2EdgeError as e:
        pytest.fail(f"App onboarding failed unexpectedly: {e}")


@pytest.mark.parametrize("client_name, base_url", test_cases)
def test_delete_onboarded_app_success(client_name, base_url):
    edgecloud_platform = EdgeCloudFactory.create_edgecloud_client(
        client_name, base_url
    )

    try:
        edgecloud_platform.delete_onboarded_app(app_id=app_manifest["appId"])
    except I2EdgeError as e:
        pytest.fail(f"App onboarding deletion failed unexpectedly: {e}")


@pytest.mark.parametrize("client_name, base_url", test_cases)
def test_delete_onboarded_app_failure(client_name, base_url):
    edgecloud_platform = EdgeCloudFactory.create_edgecloud_client(
        client_name, base_url
    )

    with pytest.raises(I2EdgeError):
        edgecloud_platform.delete_onboarded_app(app_id="non-existent-app")


#######################################