Loading src/edgecloud/clients/i2edge/client.py +73 −12 Original line number Diff line number Diff line Loading @@ -151,19 +151,80 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): except I2EdgeError as e: raise e # Harcoded # WIP - Harcoded by now def _select_best_flavour_for_app(self, zone_id) -> str: """ Selects the best flavour for the specified app requirements in a given zone. """ # list_of_flavours = self.get_edge_cloud_zones_details(zone_id) # TODO - Harcoded # flavourId = "67080247e43a30ca79b50d7d" flavourId = "6800c5199f29328e5691cd68" return flavourId def deploy_app(self, app_id: str, app_zones: List[Dict]) -> Dict: return {"appInstanceId": "abcd-efgh"} appId = app_id # appProviderId & appVersion are specified in the previous call (onboard_app) app = self.get_onboarded_app(appId) profile_data = app["profile_data"] appProviderId = profile_data["appProviderId"] appVersion = profile_data["appMetaData"]["version"] # TODO: Iterate in the list; deploy the app in all zones zone_info = app_zones[0]["EdgeCloudZone"] zone_id = zone_info["edgeCloudZoneId"] flavourId = self._select_best_flavour_for_app(zone_id=zone_id) app_deploy_data = schemas.AppDeployData( appId=appId, appProviderId=appProviderId, appVersion=appVersion, zoneInfo=schemas.ZoneInfo(flavourId=flavourId, zoneId=zone_id), ) url = "{}/app/".format(self.base_url) payload = schemas.AppDeploy(app_deploy_data=app_deploy_data) try: response = i2edge_post(url, payload) log.info("App deployed successfully") print(response) return response except I2EdgeError as e: raise e # Harcoded def get_all_deployed_apps( self, app_id: Optional[str] = None, app_instance_id: Optional[str] = None, region: Optional[str] = None, ) -> List[Dict]: return [{"appInstanceId": "abcd-efgh", "status": "ready"}] def get_all_deployed_apps(self) -> List[Dict]: url = "{}/app/".format(self.base_url) params = {} try: response = i2edge_get(url, params=params) log.info("All app instances retrieved successfully") return response except I2EdgeError as e: raise e # TODO: Partially harcoded def get_deployed_app(self, app_id, zone_id) -> List[Dict]: # Idea: Get all onboarded apps and filter the one where release_name == artifact name # Step 1) Extract the app name from the app id app = self.get_onboarded_app(app_id) appName = app["profile_data"]["appMetaData"]["appName"] # Step 2) Retrieve all deployed apps and filter the ones where release_name == app_name deployed_apps = self.get_all_deployed_apps() # This logic should be improved deploy_names = [app["deploy_name"] for app in deployed_apps] appName = deploy_names[0] url = "{}/app/{}/{}".format(self.base_url, zone_id, appName) params = {} try: response = i2edge_get(url, params=params) log.info("App instance retrieved successfully") return response except I2EdgeError as e: raise e # Harcoded def undeploy_app(self, app_instance_id: str) -> None: print(f"Deleting app instance: {app_instance_id}") url = "{}/app".format(self.base_url) try: i2edge_delete(url, app_instance_id) log.info("App instance deleted successfully") except I2EdgeError as e: raise e src/edgecloud/clients/i2edge/schemas.py +1 −1 Original line number Diff line number Diff line Loading @@ -34,7 +34,7 @@ class AppDeployData(BaseModel): class AppDeploy(BaseModel): app_deploy_data: AppDeployData app_parameters: Optional[AppParameters] = None app_parameters: Optional[AppParameters] = Field(default=AppParameters()) # Artefact Loading tests/edgecloud/test_4_app_onboarding.py +1 −1 Original line number Diff line number Diff line Loading @@ -10,7 +10,7 @@ app_manifest = { "appId": "test_app_from_SDK", "name": "my-application", "version": "1.0.0", "appProvider": "MyAppProvider", "appProvider": "i2CAT", "packageType": "CONTAINER", "appRepo": { "type": "PUBLICREPO", Loading tests/edgecloud/test_5_app_deployment.py +78 −1 Original line number Diff line number Diff line # -*- coding: utf-8 -*- # TODO import pytest from src.edgecloud.clients.errors import EdgeCloudPlatformError from src.edgecloud.core.edgecloud_factory import EdgeCloudFactory from tests.edgecloud.test_cases import test_cases appId = "i2edgechart-id" app_zones = [ { "kubernetesClusterRef": "not-used", "EdgeCloudZone": { # "edgeCloudZoneId": "Omega", "edgeCloudZoneId": "Omega12345", "edgeCloudZoneName": "not-used", "edgeCloudZoneStatus": "not-used", "edgeCloudProvider": "not-used", "edgeCloudRegion": "not-used", }, } ] @pytest.fixture(scope="module") # or "session" depending on your needs def deployed_app(request): client_name, base_url = request.param edgecloud_platform = EdgeCloudFactory.create_edgecloud_client(client_name, base_url) try: output = edgecloud_platform.deploy_app(appId, app_zones) return { "client_name": client_name, "base_url": base_url, "appInstanceId": output["deploy_name"], } except EdgeCloudPlatformError as e: pytest.fail(f"App deployment failed unexpectedly: {e}") @pytest.mark.parametrize("deployed_app", test_cases, indirect=True) def test_deploy_app_success(deployed_app): assert "appInstanceId" in deployed_app assert deployed_app["appInstanceId"].startswith("i2edgechart") # simple check @pytest.mark.parametrize("client_name, base_url", test_cases) def test_get_all_apps_success(client_name, base_url): edgecloud_platform = EdgeCloudFactory.create_edgecloud_client(client_name, base_url) try: edgecloud_platform.get_all_deployed_apps() except EdgeCloudPlatformError as e: pytest.fail(f"App instance retrieval failed unexpectedly: {e}") @pytest.mark.parametrize("client_name, base_url", test_cases) def test_get_app_success(client_name, base_url): edgecloud_platform = EdgeCloudFactory.create_edgecloud_client(client_name, base_url) try: edgecloud_platform.get_deployed_app( appId, app_zones[0]["EdgeCloudZone"]["edgeCloudZoneId"] ) except EdgeCloudPlatformError as e: pytest.fail(f"App instance retrieval failed unexpectedly: {e}") @pytest.mark.parametrize("deployed_app", test_cases, indirect=True) def test_undeploy_app_success(deployed_app): edgecloud_platform = EdgeCloudFactory.create_edgecloud_client( deployed_app["client_name"], deployed_app["base_url"] ) try: edgecloud_platform.undeploy_app(deployed_app["appInstanceId"]) except EdgeCloudPlatformError as e: pytest.fail(f"App undeployment failed unexpectedly: {e}") Loading
src/edgecloud/clients/i2edge/client.py +73 −12 Original line number Diff line number Diff line Loading @@ -151,19 +151,80 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): except I2EdgeError as e: raise e # Harcoded # WIP - Harcoded by now def _select_best_flavour_for_app(self, zone_id) -> str: """ Selects the best flavour for the specified app requirements in a given zone. """ # list_of_flavours = self.get_edge_cloud_zones_details(zone_id) # TODO - Harcoded # flavourId = "67080247e43a30ca79b50d7d" flavourId = "6800c5199f29328e5691cd68" return flavourId def deploy_app(self, app_id: str, app_zones: List[Dict]) -> Dict: return {"appInstanceId": "abcd-efgh"} appId = app_id # appProviderId & appVersion are specified in the previous call (onboard_app) app = self.get_onboarded_app(appId) profile_data = app["profile_data"] appProviderId = profile_data["appProviderId"] appVersion = profile_data["appMetaData"]["version"] # TODO: Iterate in the list; deploy the app in all zones zone_info = app_zones[0]["EdgeCloudZone"] zone_id = zone_info["edgeCloudZoneId"] flavourId = self._select_best_flavour_for_app(zone_id=zone_id) app_deploy_data = schemas.AppDeployData( appId=appId, appProviderId=appProviderId, appVersion=appVersion, zoneInfo=schemas.ZoneInfo(flavourId=flavourId, zoneId=zone_id), ) url = "{}/app/".format(self.base_url) payload = schemas.AppDeploy(app_deploy_data=app_deploy_data) try: response = i2edge_post(url, payload) log.info("App deployed successfully") print(response) return response except I2EdgeError as e: raise e # Harcoded def get_all_deployed_apps( self, app_id: Optional[str] = None, app_instance_id: Optional[str] = None, region: Optional[str] = None, ) -> List[Dict]: return [{"appInstanceId": "abcd-efgh", "status": "ready"}] def get_all_deployed_apps(self) -> List[Dict]: url = "{}/app/".format(self.base_url) params = {} try: response = i2edge_get(url, params=params) log.info("All app instances retrieved successfully") return response except I2EdgeError as e: raise e # TODO: Partially harcoded def get_deployed_app(self, app_id, zone_id) -> List[Dict]: # Idea: Get all onboarded apps and filter the one where release_name == artifact name # Step 1) Extract the app name from the app id app = self.get_onboarded_app(app_id) appName = app["profile_data"]["appMetaData"]["appName"] # Step 2) Retrieve all deployed apps and filter the ones where release_name == app_name deployed_apps = self.get_all_deployed_apps() # This logic should be improved deploy_names = [app["deploy_name"] for app in deployed_apps] appName = deploy_names[0] url = "{}/app/{}/{}".format(self.base_url, zone_id, appName) params = {} try: response = i2edge_get(url, params=params) log.info("App instance retrieved successfully") return response except I2EdgeError as e: raise e # Harcoded def undeploy_app(self, app_instance_id: str) -> None: print(f"Deleting app instance: {app_instance_id}") url = "{}/app".format(self.base_url) try: i2edge_delete(url, app_instance_id) log.info("App instance deleted successfully") except I2EdgeError as e: raise e
src/edgecloud/clients/i2edge/schemas.py +1 −1 Original line number Diff line number Diff line Loading @@ -34,7 +34,7 @@ class AppDeployData(BaseModel): class AppDeploy(BaseModel): app_deploy_data: AppDeployData app_parameters: Optional[AppParameters] = None app_parameters: Optional[AppParameters] = Field(default=AppParameters()) # Artefact Loading
tests/edgecloud/test_4_app_onboarding.py +1 −1 Original line number Diff line number Diff line Loading @@ -10,7 +10,7 @@ app_manifest = { "appId": "test_app_from_SDK", "name": "my-application", "version": "1.0.0", "appProvider": "MyAppProvider", "appProvider": "i2CAT", "packageType": "CONTAINER", "appRepo": { "type": "PUBLICREPO", Loading
tests/edgecloud/test_5_app_deployment.py +78 −1 Original line number Diff line number Diff line # -*- coding: utf-8 -*- # TODO import pytest from src.edgecloud.clients.errors import EdgeCloudPlatformError from src.edgecloud.core.edgecloud_factory import EdgeCloudFactory from tests.edgecloud.test_cases import test_cases appId = "i2edgechart-id" app_zones = [ { "kubernetesClusterRef": "not-used", "EdgeCloudZone": { # "edgeCloudZoneId": "Omega", "edgeCloudZoneId": "Omega12345", "edgeCloudZoneName": "not-used", "edgeCloudZoneStatus": "not-used", "edgeCloudProvider": "not-used", "edgeCloudRegion": "not-used", }, } ] @pytest.fixture(scope="module") # or "session" depending on your needs def deployed_app(request): client_name, base_url = request.param edgecloud_platform = EdgeCloudFactory.create_edgecloud_client(client_name, base_url) try: output = edgecloud_platform.deploy_app(appId, app_zones) return { "client_name": client_name, "base_url": base_url, "appInstanceId": output["deploy_name"], } except EdgeCloudPlatformError as e: pytest.fail(f"App deployment failed unexpectedly: {e}") @pytest.mark.parametrize("deployed_app", test_cases, indirect=True) def test_deploy_app_success(deployed_app): assert "appInstanceId" in deployed_app assert deployed_app["appInstanceId"].startswith("i2edgechart") # simple check @pytest.mark.parametrize("client_name, base_url", test_cases) def test_get_all_apps_success(client_name, base_url): edgecloud_platform = EdgeCloudFactory.create_edgecloud_client(client_name, base_url) try: edgecloud_platform.get_all_deployed_apps() except EdgeCloudPlatformError as e: pytest.fail(f"App instance retrieval failed unexpectedly: {e}") @pytest.mark.parametrize("client_name, base_url", test_cases) def test_get_app_success(client_name, base_url): edgecloud_platform = EdgeCloudFactory.create_edgecloud_client(client_name, base_url) try: edgecloud_platform.get_deployed_app( appId, app_zones[0]["EdgeCloudZone"]["edgeCloudZoneId"] ) except EdgeCloudPlatformError as e: pytest.fail(f"App instance retrieval failed unexpectedly: {e}") @pytest.mark.parametrize("deployed_app", test_cases, indirect=True) def test_undeploy_app_success(deployed_app): edgecloud_platform = EdgeCloudFactory.create_edgecloud_client( deployed_app["client_name"], deployed_app["base_url"] ) try: edgecloud_platform.undeploy_app(deployed_app["appInstanceId"]) except EdgeCloudPlatformError as e: pytest.fail(f"App undeployment failed unexpectedly: {e}")