Loading examples/example.py +249 −8 Original line number Diff line number Diff line # from sunrise6g_opensdk import Sdk as sdkclient # For PyPI users import time from sunrise6g_opensdk.common.sdk import Sdk as sdkclient # For developers Loading @@ -6,8 +8,9 @@ def main(): # The module that imports the SDK package, must specify which adapters will be used: adapter_specs = { "edgecloud": { "client_name": "kubernetes", "base_url": "http://IP:PORT", "client_name": "i2edge", "base_url": "http://192.168.123.48:30769", "flavour_id": "67f3a0b0e3184a85952e174d", }, "network": { "client_name": "open5gs", Loading @@ -20,18 +23,256 @@ def main(): edgecloud_client = adapters.get("edgecloud") network_client = adapters.get("network") print("EdgeCloud client ready to be used:", edgecloud_client) print("EdgeCloud client ready to be used:", edgecloud_client.__dict__) print("Network client ready to be used:", network_client) # Examples: # EdgeCloud # print("Testing edgecloud client function: get_edge_cloud_zones:") # zones = edgecloud_client.get_edge_cloud_zones() # print(zones) # FederationManagement zones_list = edgecloud_client.get_edge_cloud_zones_list_gsma() print(zones_list) print(zones_list.status_code) print(zones_list.json()) zones = edgecloud_client.get_edge_cloud_zones_gsma("federation_context_id") print(zones) print(zones.status_code) print(zones.json()) # AvailabilityZoneInfoSynchronization zones_info = edgecloud_client.availability_zone_info_gsma( "federation_context_id", {"request_body": "value"} ) print(zones_info) print(zones_info.status_code) print(zones_info.json()) zone_id = "Omega" zones = edgecloud_client.get_edge_cloud_zone_details_gsma( "federation_context_id", zone_id ) print(zones) print(zones.status_code) print(zones.json()) # ArtefactManager request_body = { "artefactId": "i2edgechart", "appProviderId": "string", "artefactName": "i2edgechart", "artefactVersionInfo": "string", "artefactDescription": "string", "artefactVirtType": "VM_TYPE", "artefactFileName": "string", "artefactFileFormat": "WINZIP", "artefactDescriptorType": "HELM", "repoType": "PUBLICREPO", "artefactRepoLocation": { "repoURL": "https://cesarcajas.github.io/helm-charts-examples/", "userName": "string", "password": "string", "token": "string", }, "artefactFile": "string", "componentSpec": [ { "componentName": "string", "images": ["3fa85f64-5717-4562-b3fc-2c963f66afa6"], "numOfInstances": 0, "restartPolicy": "RESTART_POLICY_ALWAYS", "commandLineParams": {"command": ["string"], "commandArgs": ["string"]}, "exposedInterfaces": [ { "interfaceId": "string", "commProtocol": "TCP", "commPort": 0, "visibilityType": "VISIBILITY_EXTERNAL", "network": "string", "InterfaceName": "string", } ], "computeResourceProfile": { "cpuArchType": "ISA_X86_64", "numCPU": { "whole": {"value": 2}, "decimal": {"value": 0.5}, "millivcpu": {"value": "500m"}, }, "memory": 0, "diskStorage": 0, "gpu": [ { "gpuVendorType": "GPU_PROVIDER_NVIDIA", "gpuModeName": "string", "gpuMemory": 0, "numGPU": 0, } ], "vpu": 0, "fpga": 0, "hugepages": [{"pageSize": "2MB", "number": 0}], "cpuExclusivity": True, }, "compEnvParams": [ { "envVarName": "string", "envValueType": "USER_DEFINED", "envVarValue": "string", "envVarSrc": "string", } ], "deploymentConfig": { "configType": "DOCKER_COMPOSE", "contents": "string", }, "persistentVolumes": [ { "volumeSize": "10Gi", "volumeMountPath": "string", "volumeName": "string", "ephemeralType": False, "accessMode": "RW", "sharingPolicy": "EXCLUSIVE", } ], } ], } artefact = edgecloud_client.create_artefact_gsma( "federation_context_id", request_body ) print(artefact) print(artefact.status_code) print(artefact.json()) artefact_id = "i2edgechart" get_artefact = edgecloud_client.get_artefact_gsma( "federation_context_id", artefact_id ) print(get_artefact) print(get_artefact.status_code) print(get_artefact.json()) # ApplicationOnboardingManager request_body = { "appId": "demo-app-id", "appProviderId": "Y89TSlxMPDKlXZz7rN6vU2y", "appDeploymentZones": [ "Dmgoc-y2zv97lar0UKqQd53aS6MCTTdoGMY193yvRBYgI07zOAIktN2b9QB2THbl5Gqvbj5Zp92vmNeg7v4M" ], "appMetaData": { "appName": "pj1iEkprop", "version": "string", "appDescription": "stringstringstri", "mobilitySupport": False, "accessToken": "MfxADOjxDgBhMrqmBeG8XdQFLp2XviG3cZ_LM7uQKc9b", "category": "IOT", }, "appQoSProfile": { "latencyConstraints": "NONE", "bandwidthRequired": 1, "multiUserClients": "APP_TYPE_SINGLE_USER", "noOfUsersPerAppInst": 1, "appProvisioning": True, }, "appComponentSpecs": [ { "serviceNameNB": "k8yyElSyJN4ctbNVqwodEQNUoGb2EzOEt4vQBjGnPii_5", "serviceNameEW": "iDm08OZN", "componentName": "HIEWqstajCmZJQmSFUj0kNHZ0xYvKWq720BKt8wjA41p", "artefactId": "i2edgechart", } ], "appStatusCallbackLink": "string", "edgeAppFQDN": "string", } onboard_app = edgecloud_client.onboard_app_gsma( "federation_context_id", request_body ) print(onboard_app) print(onboard_app.status_code) print(onboard_app.json()) app_id = "demo-app-id" get_onboarded_app = edgecloud_client.get_onboarded_app_gsma( "federation_context_id", app_id ) print(get_onboarded_app) print(get_onboarded_app.status_code) print(get_onboarded_app.json()) idempotency_key = "idempotency-key" request_body = { "appId": "demo-app-id", "appVersion": "string", "appProviderId": "Y89TSlxMPDKlXZz7rN6vU2y", "zoneInfo": { "zoneId": "Omega", "flavourId": "67f3a0b0e3184a85952e174d", "resourceConsumption": "RESERVED_RES_AVOID", "resPool": "ySIT0LuZ6ApHs0wlyGZve", }, "appInstCallbackLink": "string", } deploy_app = edgecloud_client.deploy_app_gsma( "federation_context_id", idempotency_key, request_body ) app_instance_id = deploy_app.json().get("appInstIdentifier") print(deploy_app) print(deploy_app.status_code) print(deploy_app.json()) time.sleep(10) app_id = "demo-app-id" zone_id = "Omega" get_deploy_app = edgecloud_client.get_deployed_app_gsma( "federation_context_id", app_id, app_instance_id, zone_id ) print(get_deploy_app) print(get_deploy_app.status_code) print(get_deploy_app.json()) get_deployed_apps = edgecloud_client.get_all_deployed_apps_gsma( "federation_context_id", app_id, "app_provider" ) print(get_deployed_apps) print(get_deployed_apps.status_code) print(get_deployed_apps.json()) app_id = "demo-app-id" zone_id = "Omega" delete_deploy_app = edgecloud_client.undeploy_app_gsma( "federation_context_id", app_id, app_instance_id, zone_id ) print(delete_deploy_app) print(delete_deploy_app.status_code) print(delete_deploy_app.json()) app_id = "demo-app-id" delete_onboarded_app = edgecloud_client.delete_onboarded_app_gsma( "federation_context_id", app_id ) print(delete_onboarded_app) print(delete_onboarded_app.status_code) print(delete_onboarded_app.json()) artefact_id = "i2edgechart" delete_artefact = edgecloud_client.delete_artefact_gsma( "federation_context_id", artefact_id ) print(delete_artefact) print(delete_artefact.status_code) print(delete_artefact.json()) # Network # print("Testing network client function: 'get_qod_session'") # network_client.get_qod_session(session_id="example_session_id") print("Testing network client function: 'get_qod_session'") network_client.get_qod_session(session_id="example_session_id") if __name__ == "__main__": Loading src/sunrise6g_opensdk/common/adapters_schemas.py 0 → 100644 +28 −0 Original line number Diff line number Diff line # EXAMPLE OF PYDANTIC SCHEMAS from typing import Optional from pydantic import AnyHttpUrl, BaseModel, Field class _AdapterBase(BaseModel): client_name: str = Field(..., min_length=1) base_url: AnyHttpUrl class Config: extra = "allow" class EdgeCloudConfig(_AdapterBase): pass class NetworkConfig(_AdapterBase): pass class AdapterSpecs(BaseModel): edgecloud: Optional[EdgeCloudConfig] = None network: Optional[NetworkConfig] = None class Config: extra = "forbid" src/sunrise6g_opensdk/edgecloud/adapters/aeros/client.py +201 −0 Original line number Diff line number Diff line Loading @@ -431,3 +431,204 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): "flavoursSupported": flavours_supported, } return result # --- GSMA-specific methods --- # FederationManagement def get_edge_cloud_zones_list_gsma(self) -> List: """ Retrieves details of all Zones :return: List. """ pass def get_edge_cloud_zones_gsma(self, federation_context_id: str) -> List: """ Retrieves details of Zones :param federation_context_id: Identifier of the federation context. :return: List. """ pass # AvailabilityZoneInfoSynchronization def availability_zone_info_gsma( self, federation_context_id: str, request_body: dict ) -> Dict: """ Originating OP informs partner OP that it is willing to access the specified zones and partner OP shall reserve compute and network resources for these zones. :param federation_context_id: Identifier of the federation context. :param request_body: Payload. :return: """ pass def get_edge_cloud_zone_details_gsma( self, federation_context_id: str, zone_id: str ) -> Dict: """ Retrieves details of a specific Edge Cloud Zone reserved for the specified zone by the partner OP. :param federation_context_id: Identifier of the federation context. :param zone_id: Unique identifier of the Edge Cloud Zone. :return: Dictionary with Edge Cloud Zone details. """ pass # ArtefactManagement def create_artefact_gsma( self, federation_context_id: str, request_body: dict ) -> Dict: """ Uploads application artefact on partner OP. Artefact is a zip file containing scripts and/or packaging files like Terraform or Helm which are required to create an instance of an application :param federation_context_id: Identifier of the federation context. :param request_body: Payload with artefact information. :return: Details with artefact deployment info. """ pass def get_artefact_gsma(self, federation_context_id: str, artefact_id: str) -> Dict: """ Retrieves details about an artefact :param federation_context_id: Identifier of the federation context. :param artefact_id: Unique identifier of the artefact. :return: Dictionary with artefact details. """ pass def delete_artefact_gsma( self, federation_context_id: str, artefact_id: str ) -> Dict: """ Removes an artefact from partners OP. :param federation_context_id: Identifier of the federation context. :param artefact_id: Unique identifier of the artefact. :return: Dictionary with artefact deletion details. """ pass # ApplicationOnboardingManagement def onboard_app_gsma(self, federation_context_id: str, request_body: dict) -> Dict: """ Submits an application details to a partner OP. Based on the details provided, partner OP shall do bookkeeping, resource validation and other pre-deployment operations. :param federation_context_id: Identifier of the federation context. :param request_body: Payload with onboarding info. :return: Dictionary with onboarding details. """ pass def get_onboarded_app_gsma(self, federation_context_id: str, app_id: str) -> Dict: """ Retrieves application details from partner OP :param federation_context_id: Identifier of the federation context. :param app_id: Identifier of the application onboarded. :return: """ pass def patch_onboarded_app_gsma( self, federation_context_id: str, app_id: str, request_body: dict ) -> Dict: """ Updates partner OP about changes in application compute resource requirements, QOS Profile, associated descriptor or change in associated components :param federation_context_id: Identifier of the federation context. :param app_id: Identifier of the application onboarded. :return: """ pass def delete_onboarded_app_gsma(self, federation_context_id: str, app_id: str): """ Deboards an application from specific partner OP zones :param federation_context_id: Identifier of the federation context. :param app_id: Identifier of the application onboarded. :return: """ pass # ApplicationDeploymentManagement def deploy_app_gsma( self, federation_context_id: str, idempotency_key: str, request_body: dict ): """ Instantiates an application on a partner OP zone. :param federation_context_id: Identifier of the federation context. :param idempotency_key: Idempotency key. :return: """ pass def get_deployed_app_gsma( self, federation_context_id: str, app_id: str, app_instance_id: str, zone_id: str, ): """ Retrieves an application instance details from partner OP. :param federation_context_id: Identifier of the federation context. :param app_id: Identifier of the app. :param app_instance_id: Identifier of the deployed app. :param zone_id: Identifier of the zone :return: """ pass def get_all_deployed_apps_gsma( self, federation_context_id: str, app_id: str, app_provider: str, ): """ Retrieves all application instance of partner OP :param federation_context_id: Identifier of the federation context. :param app_id: Identifier of the app. :param app_provider: App provider :return: """ pass def undeploy_app_gsma( self, federation_context_id: str, app_id: str, app_instance_id: str, zone_id: str, ): """ Terminate an application instance on a partner OP zone. :param federation_context_id: Identifier of the federation context. :param app_id: Identifier of the app. :param app_instance_id: Identifier of the deployed app. :param zone_id: Identifier of the zone :return: """ pass src/sunrise6g_opensdk/edgecloud/adapters/i2edge/client.py +101 −1 Original line number Diff line number Diff line Loading @@ -254,17 +254,92 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): # FederationManagement def get_edge_cloud_zones_list_gsma(self) -> List: url = "{}/zones/list".format(self.base_url) params = {} try: response = i2edge_get(url, params=params) if response.status_code == 200: response_json = response.json() response_list = [] for item in response_json: content = { "zoneId": item.get("zoneId"), "geolocation": item.get("geolocation"), "geographyDetails": item.get("geographyDetails"), } response_list.append(content) return self._build_custom_gsma_response( status_code=200, content=response_list, headers={"Content-Type": self.content_type_gsma}, encoding=self.encoding_gsma, url=response.url, request=response.request, ) return response except I2EdgeError as e: raise e def get_edge_cloud_zones_gsma(self, federation_context_id: str) -> Dict: url = "{}/zones".format(self.base_url) params = {} try: response = i2edge_get(url, params=params) if response.status_code == 200: response_json = response.json() response_list = [] for item in response_json: content = { "zoneId": item.get("zoneId"), "reservedComputeResources": item.get( "reservedComputeResources" ), "computeResourceQuotaLimits": item.get( "computeResourceQuotaLimits" ), "flavoursSupported": item.get("flavoursSupported"), "networkResources": item.get("networkResources"), "zoneServiceLevelObjsInfo": item.get( "zoneServiceLevelObjsInfo" ), } response_list.append(content) return self._build_custom_gsma_response( status_code=200, content=response_list, headers={"Content-Type": self.content_type_gsma}, encoding=self.encoding_gsma, url=response.url, request=response.request, ) return response except I2EdgeError as e: raise e # AvailabilityZoneInfoSynchronization def availability_zone_info_gsma( self, federation_context_id: str, request_body: dict ) -> Dict: url = "{}/zones".format(self.base_url) params = {} try: response = i2edge_get(url, params=params) if response.status_code == 200: content = {"acceptedZoneResourceInfo": response.json()} return self._build_custom_gsma_response( status_code=200, content=content, headers={"Content-Type": self.content_type_gsma}, encoding=self.encoding_gsma, url=response.url, request=response.request, ) return response except I2EdgeError as e: raise e def get_edge_cloud_zone_details_gsma( self, federation_context_id: str, zone_id: str ) -> Dict: Loading @@ -272,6 +347,30 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): params = {} try: response = i2edge_get(url, params=params) if response.status_code == 200: response_json = response.json() content = { "zoneId": response_json.get("zoneID"), "reservedComputeResources": response_json.get( "reservedComputeResources" ), "computeResourceQuotaLimits": response_json.get( "computeResourceQuotaLimits" ), "flavoursSupported": response_json.get("flavoursSupported"), "networkResources": response_json.get("networkResources"), "zoneServiceLevelObjsInfo": response_json.get( "zoneServiceLevelObjsInfo" ), } return self._build_custom_gsma_response( status_code=200, content=content, headers={"Content-Type": self.content_type_gsma}, encoding=self.encoding_gsma, url=response.url, request=response.request, ) return response except I2EdgeError as e: raise e Loading Loading @@ -444,11 +543,12 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): body = deepcopy(request_body) try: zone_id = body.get("zoneInfo").get("zoneId") flavour_id = body.get("zoneInfo").get("flavourId") app_deploy_data = schemas.AppDeployData( appId=body.get("appId"), appProviderId=body.get("appProviderId"), appVersion=body.get("appVersion"), zoneInfo=schemas.ZoneInfo(flavourId=self.flavour_id, zoneId=zone_id), zoneInfo=schemas.ZoneInfo(flavourId=flavour_id, zoneId=zone_id), ) payload = schemas.AppDeploy( app_deploy_data=app_deploy_data, app_parameters={"namespace": "test"} Loading src/sunrise6g_opensdk/edgecloud/adapters/kubernetes/client.py +202 −0 File changed.Preview size limit exceeded, changes collapsed. Show changes Loading
examples/example.py +249 −8 Original line number Diff line number Diff line # from sunrise6g_opensdk import Sdk as sdkclient # For PyPI users import time from sunrise6g_opensdk.common.sdk import Sdk as sdkclient # For developers Loading @@ -6,8 +8,9 @@ def main(): # The module that imports the SDK package, must specify which adapters will be used: adapter_specs = { "edgecloud": { "client_name": "kubernetes", "base_url": "http://IP:PORT", "client_name": "i2edge", "base_url": "http://192.168.123.48:30769", "flavour_id": "67f3a0b0e3184a85952e174d", }, "network": { "client_name": "open5gs", Loading @@ -20,18 +23,256 @@ def main(): edgecloud_client = adapters.get("edgecloud") network_client = adapters.get("network") print("EdgeCloud client ready to be used:", edgecloud_client) print("EdgeCloud client ready to be used:", edgecloud_client.__dict__) print("Network client ready to be used:", network_client) # Examples: # EdgeCloud # print("Testing edgecloud client function: get_edge_cloud_zones:") # zones = edgecloud_client.get_edge_cloud_zones() # print(zones) # FederationManagement zones_list = edgecloud_client.get_edge_cloud_zones_list_gsma() print(zones_list) print(zones_list.status_code) print(zones_list.json()) zones = edgecloud_client.get_edge_cloud_zones_gsma("federation_context_id") print(zones) print(zones.status_code) print(zones.json()) # AvailabilityZoneInfoSynchronization zones_info = edgecloud_client.availability_zone_info_gsma( "federation_context_id", {"request_body": "value"} ) print(zones_info) print(zones_info.status_code) print(zones_info.json()) zone_id = "Omega" zones = edgecloud_client.get_edge_cloud_zone_details_gsma( "federation_context_id", zone_id ) print(zones) print(zones.status_code) print(zones.json()) # ArtefactManager request_body = { "artefactId": "i2edgechart", "appProviderId": "string", "artefactName": "i2edgechart", "artefactVersionInfo": "string", "artefactDescription": "string", "artefactVirtType": "VM_TYPE", "artefactFileName": "string", "artefactFileFormat": "WINZIP", "artefactDescriptorType": "HELM", "repoType": "PUBLICREPO", "artefactRepoLocation": { "repoURL": "https://cesarcajas.github.io/helm-charts-examples/", "userName": "string", "password": "string", "token": "string", }, "artefactFile": "string", "componentSpec": [ { "componentName": "string", "images": ["3fa85f64-5717-4562-b3fc-2c963f66afa6"], "numOfInstances": 0, "restartPolicy": "RESTART_POLICY_ALWAYS", "commandLineParams": {"command": ["string"], "commandArgs": ["string"]}, "exposedInterfaces": [ { "interfaceId": "string", "commProtocol": "TCP", "commPort": 0, "visibilityType": "VISIBILITY_EXTERNAL", "network": "string", "InterfaceName": "string", } ], "computeResourceProfile": { "cpuArchType": "ISA_X86_64", "numCPU": { "whole": {"value": 2}, "decimal": {"value": 0.5}, "millivcpu": {"value": "500m"}, }, "memory": 0, "diskStorage": 0, "gpu": [ { "gpuVendorType": "GPU_PROVIDER_NVIDIA", "gpuModeName": "string", "gpuMemory": 0, "numGPU": 0, } ], "vpu": 0, "fpga": 0, "hugepages": [{"pageSize": "2MB", "number": 0}], "cpuExclusivity": True, }, "compEnvParams": [ { "envVarName": "string", "envValueType": "USER_DEFINED", "envVarValue": "string", "envVarSrc": "string", } ], "deploymentConfig": { "configType": "DOCKER_COMPOSE", "contents": "string", }, "persistentVolumes": [ { "volumeSize": "10Gi", "volumeMountPath": "string", "volumeName": "string", "ephemeralType": False, "accessMode": "RW", "sharingPolicy": "EXCLUSIVE", } ], } ], } artefact = edgecloud_client.create_artefact_gsma( "federation_context_id", request_body ) print(artefact) print(artefact.status_code) print(artefact.json()) artefact_id = "i2edgechart" get_artefact = edgecloud_client.get_artefact_gsma( "federation_context_id", artefact_id ) print(get_artefact) print(get_artefact.status_code) print(get_artefact.json()) # ApplicationOnboardingManager request_body = { "appId": "demo-app-id", "appProviderId": "Y89TSlxMPDKlXZz7rN6vU2y", "appDeploymentZones": [ "Dmgoc-y2zv97lar0UKqQd53aS6MCTTdoGMY193yvRBYgI07zOAIktN2b9QB2THbl5Gqvbj5Zp92vmNeg7v4M" ], "appMetaData": { "appName": "pj1iEkprop", "version": "string", "appDescription": "stringstringstri", "mobilitySupport": False, "accessToken": "MfxADOjxDgBhMrqmBeG8XdQFLp2XviG3cZ_LM7uQKc9b", "category": "IOT", }, "appQoSProfile": { "latencyConstraints": "NONE", "bandwidthRequired": 1, "multiUserClients": "APP_TYPE_SINGLE_USER", "noOfUsersPerAppInst": 1, "appProvisioning": True, }, "appComponentSpecs": [ { "serviceNameNB": "k8yyElSyJN4ctbNVqwodEQNUoGb2EzOEt4vQBjGnPii_5", "serviceNameEW": "iDm08OZN", "componentName": "HIEWqstajCmZJQmSFUj0kNHZ0xYvKWq720BKt8wjA41p", "artefactId": "i2edgechart", } ], "appStatusCallbackLink": "string", "edgeAppFQDN": "string", } onboard_app = edgecloud_client.onboard_app_gsma( "federation_context_id", request_body ) print(onboard_app) print(onboard_app.status_code) print(onboard_app.json()) app_id = "demo-app-id" get_onboarded_app = edgecloud_client.get_onboarded_app_gsma( "federation_context_id", app_id ) print(get_onboarded_app) print(get_onboarded_app.status_code) print(get_onboarded_app.json()) idempotency_key = "idempotency-key" request_body = { "appId": "demo-app-id", "appVersion": "string", "appProviderId": "Y89TSlxMPDKlXZz7rN6vU2y", "zoneInfo": { "zoneId": "Omega", "flavourId": "67f3a0b0e3184a85952e174d", "resourceConsumption": "RESERVED_RES_AVOID", "resPool": "ySIT0LuZ6ApHs0wlyGZve", }, "appInstCallbackLink": "string", } deploy_app = edgecloud_client.deploy_app_gsma( "federation_context_id", idempotency_key, request_body ) app_instance_id = deploy_app.json().get("appInstIdentifier") print(deploy_app) print(deploy_app.status_code) print(deploy_app.json()) time.sleep(10) app_id = "demo-app-id" zone_id = "Omega" get_deploy_app = edgecloud_client.get_deployed_app_gsma( "federation_context_id", app_id, app_instance_id, zone_id ) print(get_deploy_app) print(get_deploy_app.status_code) print(get_deploy_app.json()) get_deployed_apps = edgecloud_client.get_all_deployed_apps_gsma( "federation_context_id", app_id, "app_provider" ) print(get_deployed_apps) print(get_deployed_apps.status_code) print(get_deployed_apps.json()) app_id = "demo-app-id" zone_id = "Omega" delete_deploy_app = edgecloud_client.undeploy_app_gsma( "federation_context_id", app_id, app_instance_id, zone_id ) print(delete_deploy_app) print(delete_deploy_app.status_code) print(delete_deploy_app.json()) app_id = "demo-app-id" delete_onboarded_app = edgecloud_client.delete_onboarded_app_gsma( "federation_context_id", app_id ) print(delete_onboarded_app) print(delete_onboarded_app.status_code) print(delete_onboarded_app.json()) artefact_id = "i2edgechart" delete_artefact = edgecloud_client.delete_artefact_gsma( "federation_context_id", artefact_id ) print(delete_artefact) print(delete_artefact.status_code) print(delete_artefact.json()) # Network # print("Testing network client function: 'get_qod_session'") # network_client.get_qod_session(session_id="example_session_id") print("Testing network client function: 'get_qod_session'") network_client.get_qod_session(session_id="example_session_id") if __name__ == "__main__": Loading
src/sunrise6g_opensdk/common/adapters_schemas.py 0 → 100644 +28 −0 Original line number Diff line number Diff line # EXAMPLE OF PYDANTIC SCHEMAS from typing import Optional from pydantic import AnyHttpUrl, BaseModel, Field class _AdapterBase(BaseModel): client_name: str = Field(..., min_length=1) base_url: AnyHttpUrl class Config: extra = "allow" class EdgeCloudConfig(_AdapterBase): pass class NetworkConfig(_AdapterBase): pass class AdapterSpecs(BaseModel): edgecloud: Optional[EdgeCloudConfig] = None network: Optional[NetworkConfig] = None class Config: extra = "forbid"
src/sunrise6g_opensdk/edgecloud/adapters/aeros/client.py +201 −0 Original line number Diff line number Diff line Loading @@ -431,3 +431,204 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): "flavoursSupported": flavours_supported, } return result # --- GSMA-specific methods --- # FederationManagement def get_edge_cloud_zones_list_gsma(self) -> List: """ Retrieves details of all Zones :return: List. """ pass def get_edge_cloud_zones_gsma(self, federation_context_id: str) -> List: """ Retrieves details of Zones :param federation_context_id: Identifier of the federation context. :return: List. """ pass # AvailabilityZoneInfoSynchronization def availability_zone_info_gsma( self, federation_context_id: str, request_body: dict ) -> Dict: """ Originating OP informs partner OP that it is willing to access the specified zones and partner OP shall reserve compute and network resources for these zones. :param federation_context_id: Identifier of the federation context. :param request_body: Payload. :return: """ pass def get_edge_cloud_zone_details_gsma( self, federation_context_id: str, zone_id: str ) -> Dict: """ Retrieves details of a specific Edge Cloud Zone reserved for the specified zone by the partner OP. :param federation_context_id: Identifier of the federation context. :param zone_id: Unique identifier of the Edge Cloud Zone. :return: Dictionary with Edge Cloud Zone details. """ pass # ArtefactManagement def create_artefact_gsma( self, federation_context_id: str, request_body: dict ) -> Dict: """ Uploads application artefact on partner OP. Artefact is a zip file containing scripts and/or packaging files like Terraform or Helm which are required to create an instance of an application :param federation_context_id: Identifier of the federation context. :param request_body: Payload with artefact information. :return: Details with artefact deployment info. """ pass def get_artefact_gsma(self, federation_context_id: str, artefact_id: str) -> Dict: """ Retrieves details about an artefact :param federation_context_id: Identifier of the federation context. :param artefact_id: Unique identifier of the artefact. :return: Dictionary with artefact details. """ pass def delete_artefact_gsma( self, federation_context_id: str, artefact_id: str ) -> Dict: """ Removes an artefact from partners OP. :param federation_context_id: Identifier of the federation context. :param artefact_id: Unique identifier of the artefact. :return: Dictionary with artefact deletion details. """ pass # ApplicationOnboardingManagement def onboard_app_gsma(self, federation_context_id: str, request_body: dict) -> Dict: """ Submits an application details to a partner OP. Based on the details provided, partner OP shall do bookkeeping, resource validation and other pre-deployment operations. :param federation_context_id: Identifier of the federation context. :param request_body: Payload with onboarding info. :return: Dictionary with onboarding details. """ pass def get_onboarded_app_gsma(self, federation_context_id: str, app_id: str) -> Dict: """ Retrieves application details from partner OP :param federation_context_id: Identifier of the federation context. :param app_id: Identifier of the application onboarded. :return: """ pass def patch_onboarded_app_gsma( self, federation_context_id: str, app_id: str, request_body: dict ) -> Dict: """ Updates partner OP about changes in application compute resource requirements, QOS Profile, associated descriptor or change in associated components :param federation_context_id: Identifier of the federation context. :param app_id: Identifier of the application onboarded. :return: """ pass def delete_onboarded_app_gsma(self, federation_context_id: str, app_id: str): """ Deboards an application from specific partner OP zones :param federation_context_id: Identifier of the federation context. :param app_id: Identifier of the application onboarded. :return: """ pass # ApplicationDeploymentManagement def deploy_app_gsma( self, federation_context_id: str, idempotency_key: str, request_body: dict ): """ Instantiates an application on a partner OP zone. :param federation_context_id: Identifier of the federation context. :param idempotency_key: Idempotency key. :return: """ pass def get_deployed_app_gsma( self, federation_context_id: str, app_id: str, app_instance_id: str, zone_id: str, ): """ Retrieves an application instance details from partner OP. :param federation_context_id: Identifier of the federation context. :param app_id: Identifier of the app. :param app_instance_id: Identifier of the deployed app. :param zone_id: Identifier of the zone :return: """ pass def get_all_deployed_apps_gsma( self, federation_context_id: str, app_id: str, app_provider: str, ): """ Retrieves all application instance of partner OP :param federation_context_id: Identifier of the federation context. :param app_id: Identifier of the app. :param app_provider: App provider :return: """ pass def undeploy_app_gsma( self, federation_context_id: str, app_id: str, app_instance_id: str, zone_id: str, ): """ Terminate an application instance on a partner OP zone. :param federation_context_id: Identifier of the federation context. :param app_id: Identifier of the app. :param app_instance_id: Identifier of the deployed app. :param zone_id: Identifier of the zone :return: """ pass
src/sunrise6g_opensdk/edgecloud/adapters/i2edge/client.py +101 −1 Original line number Diff line number Diff line Loading @@ -254,17 +254,92 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): # FederationManagement def get_edge_cloud_zones_list_gsma(self) -> List: url = "{}/zones/list".format(self.base_url) params = {} try: response = i2edge_get(url, params=params) if response.status_code == 200: response_json = response.json() response_list = [] for item in response_json: content = { "zoneId": item.get("zoneId"), "geolocation": item.get("geolocation"), "geographyDetails": item.get("geographyDetails"), } response_list.append(content) return self._build_custom_gsma_response( status_code=200, content=response_list, headers={"Content-Type": self.content_type_gsma}, encoding=self.encoding_gsma, url=response.url, request=response.request, ) return response except I2EdgeError as e: raise e def get_edge_cloud_zones_gsma(self, federation_context_id: str) -> Dict: url = "{}/zones".format(self.base_url) params = {} try: response = i2edge_get(url, params=params) if response.status_code == 200: response_json = response.json() response_list = [] for item in response_json: content = { "zoneId": item.get("zoneId"), "reservedComputeResources": item.get( "reservedComputeResources" ), "computeResourceQuotaLimits": item.get( "computeResourceQuotaLimits" ), "flavoursSupported": item.get("flavoursSupported"), "networkResources": item.get("networkResources"), "zoneServiceLevelObjsInfo": item.get( "zoneServiceLevelObjsInfo" ), } response_list.append(content) return self._build_custom_gsma_response( status_code=200, content=response_list, headers={"Content-Type": self.content_type_gsma}, encoding=self.encoding_gsma, url=response.url, request=response.request, ) return response except I2EdgeError as e: raise e # AvailabilityZoneInfoSynchronization def availability_zone_info_gsma( self, federation_context_id: str, request_body: dict ) -> Dict: url = "{}/zones".format(self.base_url) params = {} try: response = i2edge_get(url, params=params) if response.status_code == 200: content = {"acceptedZoneResourceInfo": response.json()} return self._build_custom_gsma_response( status_code=200, content=content, headers={"Content-Type": self.content_type_gsma}, encoding=self.encoding_gsma, url=response.url, request=response.request, ) return response except I2EdgeError as e: raise e def get_edge_cloud_zone_details_gsma( self, federation_context_id: str, zone_id: str ) -> Dict: Loading @@ -272,6 +347,30 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): params = {} try: response = i2edge_get(url, params=params) if response.status_code == 200: response_json = response.json() content = { "zoneId": response_json.get("zoneID"), "reservedComputeResources": response_json.get( "reservedComputeResources" ), "computeResourceQuotaLimits": response_json.get( "computeResourceQuotaLimits" ), "flavoursSupported": response_json.get("flavoursSupported"), "networkResources": response_json.get("networkResources"), "zoneServiceLevelObjsInfo": response_json.get( "zoneServiceLevelObjsInfo" ), } return self._build_custom_gsma_response( status_code=200, content=content, headers={"Content-Type": self.content_type_gsma}, encoding=self.encoding_gsma, url=response.url, request=response.request, ) return response except I2EdgeError as e: raise e Loading Loading @@ -444,11 +543,12 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): body = deepcopy(request_body) try: zone_id = body.get("zoneInfo").get("zoneId") flavour_id = body.get("zoneInfo").get("flavourId") app_deploy_data = schemas.AppDeployData( appId=body.get("appId"), appProviderId=body.get("appProviderId"), appVersion=body.get("appVersion"), zoneInfo=schemas.ZoneInfo(flavourId=self.flavour_id, zoneId=zone_id), zoneInfo=schemas.ZoneInfo(flavourId=flavour_id, zoneId=zone_id), ) payload = schemas.AppDeploy( app_deploy_data=app_deploy_data, app_parameters={"namespace": "test"} Loading
src/sunrise6g_opensdk/edgecloud/adapters/kubernetes/client.py +202 −0 File changed.Preview size limit exceeded, changes collapsed. Show changes