diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 40698a06cb480e438c3fd2d1144eee15b3d721d7..cd4c94acd3a2366699444b884d9eab08bc2d7179 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -12,8 +12,8 @@ validate-mr: before_script: - echo "Running merge request validation..." - pip install -r requirements.txt - - pip install -e . - - pip install isort black flake8 pytest + - pip install -e . + - pip install isort black flake8 pytest script: - echo "Running linters..." - isort src tests --check --profile black --filter-files diff --git a/src/sunrise6g_opensdk/edgecloud/adapters/aeros/client.py b/src/sunrise6g_opensdk/edgecloud/adapters/aeros/client.py index 99dece8220e7abe78f9447223102c9921b69bd8a..3bea2d03a47e7de4fb290120f9abd1db38abeaeb 100644 --- a/src/sunrise6g_opensdk/edgecloud/adapters/aeros/client.py +++ b/src/sunrise6g_opensdk/edgecloud/adapters/aeros/client.py @@ -5,33 +5,57 @@ # - Vasilis Pitsilis (vpitsilis@dat.demokritos.gr, vpitsilis@iit.demokritos.gr) # - Andreas Sakellaropoulos (asakellaropoulos@iit.demokritos.gr) ## +import json import uuid +from collections import defaultdict from typing import Any, Dict, List, Optional -import yaml +from pydantic import ValidationError from requests import Response from sunrise6g_opensdk.edgecloud.adapters.aeros import config from sunrise6g_opensdk.edgecloud.adapters.aeros.continuum_client import ContinuumClient +from sunrise6g_opensdk.edgecloud.adapters.aeros.converters import ( + aeros2gsma_zone_details, + camara2aeros_converter, + gsma2aeros_converter, +) +from sunrise6g_opensdk.edgecloud.adapters.aeros.errors import ( + InvalidArgumentError, + ResourceNotFoundError, +) +from sunrise6g_opensdk.edgecloud.adapters.aeros.storageManagement import inMemoryStorage +from sunrise6g_opensdk.edgecloud.adapters.aeros.storageManagement.appStorageManager import ( + AppStorageManager, +) +from sunrise6g_opensdk.edgecloud.adapters.aeros.utils import ( + encode_app_instance_name, + map_aeros_service_status_to_gsma, + urn_to_uuid, +) from sunrise6g_opensdk.edgecloud.adapters.errors import EdgeCloudPlatformError +from sunrise6g_opensdk.edgecloud.core import camara_schemas, gsma_schemas from sunrise6g_opensdk.edgecloud.core.edgecloud_interface import ( EdgeCloudManagementInterface, ) +from sunrise6g_opensdk.edgecloud.core.utils import build_custom_http_response from sunrise6g_opensdk.logger import setup_logger class EdgeApplicationManager(EdgeCloudManagementInterface): """ - aerOS Continuum Client - FIXME: Handle None responses from continuum client + aerOS Edge Application Manager Adapter implementing CAMARA and GSMA APIs. """ - def __init__(self, base_url: str, **kwargs): + def __init__(self, base_url: str, storage: Optional[AppStorageManager] = None, **kwargs): + """ + storage can + """ self.base_url = base_url self.logger = setup_logger(__name__, is_debug=True, file_name=config.LOG_FILE) - self._app_store: Dict[str, Dict] = {} - self._deployed_services: Dict[str, List[str]] = {} - self._stopped_services: Dict[str, List[str]] = {} + self.content_type_gsma = "application/json" + self.encoding_gsma = "utf-8" + self.storage = storage or inMemoryStorage.InMemoryAppStorage() # Overwrite config values if provided via kwargs if "aerOS_API_URL" in kwargs: @@ -48,32 +72,250 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): if not config.aerOS_HLO_TOKEN: raise ValueError("Missing 'aerOS_HLO_TOKEN'") - def onboard_app(self, app_manifest: Dict) -> Dict: + # ######################################################################## + # CAMARA EDGE CLOUD MANAGEMENT API + # ######################################################################## + + # ------------------------------------------------------------------------ + # Edge Cloud Zone Management (CAMARA) + # ------------------------------------------------------------------------ + + # Zones methods + def get_edge_cloud_zones( + self, region: Optional[str] = None, status: Optional[str] = None + ) -> Response: + """ + Retrieves a list of available Edge Cloud Zones. + + :param region: Filter by geographical region. + :param status: Filter by status (active, inactive, unknown). + :return: Response with list of Edge Cloud Zones in CAMARA format. + """ + try: + aeros_client = ContinuumClient(self.base_url) + ngsild_params = "type=Domain&format=simplified" + camara_response = aeros_client.query_entities(ngsild_params) + aeros_domains = camara_response.json() + if config.DEBUG: + self.logger.debug("aerOS edge cloud zones: %s", aeros_domains) + + zone_list = [] + for domain in aeros_domains: + domain_id = domain.get("id") + if not domain_id: + continue + + # Normalize status + raw_status = domain.get("domainStatus", "") + status_token = raw_status.split(":")[-1].strip().lower() + status = "Active" if status_token == "functional" else "Unknown" + + zone = { + "edgeCloudZoneId": str(urn_to_uuid(domain_id)), + "edgeCloudZoneName": domain_id, # or domain_id.split(":")[-1] if you prefer short name + "edgeCloudProvider": ( + domain.get("owner", ["unknown"])[0] + if isinstance(domain.get("owner"), list) + else domain.get("owner", "unknown") + ), + "status": status, + "geographyDetails": "NOT_USED", + } + zone_list.append(zone) + + # Store zones keyed by the aerOS domain id + self.storage.store_zones({d["edgeCloudZoneName"]: d for d in zone_list}) + if config.DEBUG: + self.logger.debug("aerOS Local domains store: %s", zone_list) + return build_custom_http_response( + status_code=camara_response.status_code, + content=zone_list, + headers={"Content-Type": "application/json"}, + encoding=camara_response.encoding, + url=camara_response.url, + request=camara_response.request, + ) + except json.JSONDecodeError as e: + self.logger.error("Invalid JSON in aerOS response: %s", e) + raise + except KeyError as e: + self.logger.error("Missing expected field in aerOS data: %s", e) + raise + except EdgeCloudPlatformError as e: + self.logger.error("Error retrieving edge cloud zones: %s", e) + raise + + def get_edge_cloud_zones_details(self, zone_id: str, flavour_id: Optional[str] = None) -> Dict: + """ + Get details of a specific edge cloud zone. + :param zone_id: The ID of the edge cloud zone + :param flavour_id: Optional flavour ID to filter the results + :return: Details of the edge cloud zone + """ + aeros_client = ContinuumClient(self.base_url) + ngsild_params = f'format=simplified&type=InfrastructureElement&q=domain=="{zone_id}"' + if config.DEBUG: + self.logger.debug( + "Querying infrastructure elements for zone %s with params: %s", + zone_id, + ngsild_params, + ) + try: + # Query the infrastructure elements for the specified zonese + aeros_response = aeros_client.query_entities(ngsild_params) + aeros_domain_ies = aeros_response.json() + # Transform the infrastructure elements into the required format + # and return the details of the edge cloud zone + camara_response = self.transform_infrastructure_elements( + domain_ies=aeros_domain_ies, domain=zone_id + ) + if config.DEBUG: + self.logger.debug("Transformed response: %s", camara_response) + # Return the transformed response + return build_custom_http_response( + status_code=aeros_response.status_code, + content=camara_response, + headers={"Content-Type": "application/json"}, + encoding=aeros_response.encoding, + url=aeros_response.url, + request=aeros_response.request, + ) + except json.JSONDecodeError as e: + self.logger.error("Invalid JSON in aerOS response: %s", e) + raise + except KeyError as e: + self.logger.error("Missing expected field in aerOS data: %s", e) + raise + except EdgeCloudPlatformError as e: + self.logger.error("Error retrieving edge cloud zones: %s", e) + raise + + def transform_infrastructure_elements( + self, domain_ies: List[Dict[str, Any]], domain: str + ) -> Dict[str, Any]: + """ + Transform the infrastructure elements into a format suitable for the + edge cloud zone details. + :param domain_ies: List of infrastructure elements + :param domain: The ID of the edge cloud zone + :return: Transformed details of the edge cloud zone + """ + total_cpu = 0 + total_ram = 0 + total_disk = 0 + total_available_ram = 0 + total_available_disk = 0 + + flavours_supported = [] + + for element in domain_ies: + total_cpu += element.get("cpuCores", 0) + total_ram += element.get("ramCapacity", 0) + total_available_ram += element.get("availableRam", 0) + total_disk += element.get("diskCapacity", 0) + total_available_disk += element.get("availableDisk", 0) + + # Create a flavour per machine + flavour = { + "flavourId": f"{element.get('hostname')}-{element.get('containerTechnology')}", + "cpuArchType": f"{element.get('cpuArchitecture')}", + "supportedOSTypes": [ + { + "architecture": f"{element.get('cpuArchitecture')}", + "distribution": f"{element.get('operatingSystem')}", # assume + "version": "OS_VERSION_UBUNTU_2204_LTS", + "license": "OS_LICENSE_TYPE_FREE", + } + ], + "numCPU": element.get("cpuCores", 0), + "memorySize": element.get("ramCapacity", 0), + "storageSize": element.get("diskCapacity", 0), + } + flavours_supported.append(flavour) + + result = { + "zoneId": domain, + "reservedComputeResources": [ + { + "cpuArchType": "ISA_X86_64", + "numCPU": int(total_cpu), + "memory": total_ram, + } + ], + "computeResourceQuotaLimits": [ + { + "cpuArchType": "ISA_X86_64", + "numCPU": int(total_cpu * 2), # Assume quota is 2x total? + "memory": total_ram * 2, + } + ], + "flavoursSupported": flavours_supported, + } + return result + + # ------------------------------------------------------------------------ + # Application Management (CAMARA-Compliant) + # ------------------------------------------------------------------------ + + # Onboarding methods + def onboard_app(self, app_manifest: Dict) -> Response: + # Validate CAMARA input + camara_schemas.AppManifest(**app_manifest) + app_id = app_manifest.get("appId") if not app_id: raise EdgeCloudPlatformError("Missing 'appId' in app manifest") - if app_id in self._app_store: + if self.storage.get_app(app_id=app_id): raise EdgeCloudPlatformError(f"Application with id '{app_id}' already exists") - self._app_store[app_id] = app_manifest + self.storage.store_app(app_id, app_manifest) self.logger.debug("Onboarded application with id: %s", app_id) - return {"appId": app_id} + submitted_app = camara_schemas.SubmittedApp(appId=camara_schemas.AppId(app_id)) + return build_custom_http_response( + status_code=201, + content=submitted_app.model_dump(mode="json"), + headers={"Content-Type": "application/json"}, + encoding="utf-8", + ) - def get_all_onboarded_apps(self) -> List[Dict]: - self.logger.debug("Onboarded applications: %s", list(self._app_store.keys())) - return list(self._app_store.values()) + def get_all_onboarded_apps(self) -> Response: + apps = self.storage.list_apps() + self.logger.debug("Onboarded applications: %s", apps) + return build_custom_http_response( + status_code=200, + content=apps, + headers={"Content-Type": "application/json"}, + encoding="utf-8", + ) - def get_onboarded_app(self, app_id: str) -> Dict: - if app_id not in self._app_store: + def get_onboarded_app(self, app_id: str) -> Response: + app_data = self.storage.get_app(app_id) + if not app_data: raise EdgeCloudPlatformError(f"Application with id '{app_id}' does not exist") self.logger.debug("Retrieved application with id: %s", app_id) - return self._app_store[app_id] - def delete_onboarded_app(self, app_id: str) -> None: - if app_id not in self._app_store: + app_manifest_response = { + "appManifest": app_data + } # We already keep the app manifest when onboarding + + return build_custom_http_response( + status_code=200, + content=app_manifest_response, + headers={"Content-Type": "application/json"}, + encoding="utf-8", + ) + + def delete_onboarded_app(self, app_id: str) -> Response: + app = self.storage.get_app(app_id) + if not app: raise EdgeCloudPlatformError(f"Application with id '{app_id}' does not exist") - service_instances = self._stopped_services.get(app_id, []) + + service_instances = self.storage.get_stopped_instances(app_id=app_id) + if not service_instances: + raise EdgeCloudPlatformError( + f"Application with id '{app_id}' cannot be deleted — please stop it first" + ) self.logger.debug( "Deleting application with id: %s and instances: %s", app_id, @@ -82,147 +324,142 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): for service_instance in service_instances: self._purge_deployed_app_from_continuum(service_instance) self.logger.debug("successfully purged service instance: %s", service_instance) - del self._stopped_services[app_id] # Clean up stopped services - del self._app_store[app_id] # Remove from onboarded apps - - def _generate_service_id(self, app_id: str) -> str: - return f"urn:ngsi-ld:Service:{app_id}-{uuid.uuid4().hex[:4]}" - - def _generate_tosca_yaml_dict(self, app_manifest: Dict, app_zones: List[Dict]) -> Dict: - component = app_manifest.get("componentSpec", [{}])[0] - component_name = component.get("componentName", "application") - - image_path = app_manifest.get("appRepo", {}).get("imagePath", "") - image_file = image_path.split("/")[-1] - repository_url = "/".join(image_path.split("/")[:-1]) if "/" in image_path else "docker_hub" - zone_id = app_zones[0].get("EdgeCloudZone", {}).get("edgeCloudZoneId", "default-zone") - - # Extract minNodeMemory - min_node_memory = ( - app_manifest.get("requiredResources", {}) - .get("applicationResources", {}) - .get("cpuPool", {}) - .get("topology", {}) - .get("minNodeMemory", 1024) - ) - ports = {} - for iface in component.get("networkInterfaces", []): - interface_id = iface.get("interfaceId", "default") - protocol = iface.get("protocol", "TCP").lower() - port = iface.get("port", 8080) - ports[interface_id] = {"properties": {"protocol": [protocol], "source": port}} + self.storage.remove_stopped_instances(app_id) + self.storage.delete_app(app_id) - expose_ports = any( - iface.get("visibilityType") == "VISIBILITY_EXTERNAL" - for iface in component.get("networkInterfaces", []) + return build_custom_http_response( + status_code=204, + content=b"", # absolutely no body for 204 + headers={"Content-Type": "application/json"}, + encoding="utf-8", + # url=None, + # request=None, ) - yaml_dict = { - "tosca_definitions_version": "tosca_simple_yaml_1_3", - "description": f"TOSCA for {app_manifest.get('name', 'application')}", - "serviceOverlay": False, - "node_templates": { - component_name: { - "type": "tosca.nodes.Container.Application", - "isJob": False, - "requirements": [ - { - "network": { - "properties": { - "ports": ports, - "exposePorts": expose_ports, - } - } - }, - { - "host": { - "node_filter": { - "capabilities": [ - { - "host": { - "properties": { - "cpu_arch": {"equal": "x64"}, - "realtime": {"equal": False}, - "cpu_usage": {"less_or_equal": "0.1"}, - "mem_size": { - "greater_or_equal": str(min_node_memory) - }, - "domain_id": {"equal": zone_id}, - } - } - } - ], - "properties": None, - } - } - }, - ], - "artifacts": { - "application_image": { - "file": image_file, - "type": "tosca.artifacts.Deployment.Image.Container.Docker", - "is_private": False, - "repository": repository_url, - } - }, - "interfaces": { - "Standard": { - "create": { - "implementation": "application_image", - "inputs": {"cliArgs": [], "envVars": []}, - } - } - }, - } - }, - } + def _generate_service_id(self, app_id: str) -> str: + """ + Generate a unique service ID for aerOS continuum. + The service ID is in the format of a NGSI-LD URN with a random suffix. + :param app_id: The application ID + :return: The generated service ID + """ + return f"{app_id}-{uuid.uuid4().hex[:4]}" - return yaml_dict + def _generate_aeros_service_id(self, camara_app_instance_id: str) -> str: + """ + Convert CAMARA appInstanceId to aerOS service ID. + :param camara_app_instance_id: The CAMARA appInstanceId + :return: The corresponding aerOS service ID + """ + return f"urn:ngsi-ld:Service:{camara_app_instance_id}" - def deploy_app(self, app_id: str, app_zones: List[Dict]) -> Dict: + # Instantiation methods + def deploy_app(self, app_id: str, app_zones: List[Dict]) -> Response: # 1. Get app CAMARA manifest - app_manifest = self._app_store.get(app_id) + app_manifest = self.storage.get_app(app_id) if not app_manifest: raise EdgeCloudPlatformError(f"Application with id '{app_id}' does not exist") + app_manifest = camara_schemas.AppManifest.model_validate(app_manifest) # 2. Generate unique service ID + # (aerOS) service id <=> CAMARA appInstanceId service_id = self._generate_service_id(app_id) # 3. Convert dict to YAML string - yaml_dict = self._generate_tosca_yaml_dict(app_manifest, app_zones) - tosca_yaml = yaml.dump(yaml_dict, sort_keys=False) - self.logger.info("Generated TOSCA YAML:") - self.logger.info(tosca_yaml) + # 3a. Get aerOS domain IDs from zones uuids + aeros_domain_ids = [ + self.storage.resolve_domain_id_by_zone_uuid(z["EdgeCloudZone"]["edgeCloudZoneId"]) + for z in app_zones + if z.get("EdgeCloudZone", {}).get("edgeCloudZoneId") + ] + tosca_str = camara2aeros_converter.generate_tosca( + app_manifest=app_manifest, app_zones=aeros_domain_ids + ) + if config.DEBUG: + self.logger.info("Generated TOSCA YAML:") + self.logger.info(tosca_str) # 4. Instantiate client and call continuum to deploy service - aeros_client = ContinuumClient(self.base_url) - response = aeros_client.onboard_and_deploy_service(service_id, tosca_yaml) - - if "serviceId" not in response: - raise EdgeCloudPlatformError( - "Invalid response from onboard_service: missing 'serviceId'" + try: + aeros_client = ContinuumClient(self.base_url) + aeros_response = aeros_client.onboard_and_deploy_service( + self._generate_aeros_service_id(service_id), tosca_str ) - # 5. Track deployment - if app_id not in self._deployed_services: - self._deployed_services[app_id] = [] - self._deployed_services[app_id].append(service_id) + if "serviceId" not in aeros_response.json(): + raise EdgeCloudPlatformError( + "Invalid response from onboard_service: missing 'serviceId'" + ) + + # Build CAMARA-compliant info + app_provider_id = app_manifest.appProvider.root + zone_id = app_zones[0].get("EdgeCloudZone", {}).get("edgeCloudZoneId", "default-zone") + app_instance_info = camara_schemas.AppInstanceInfo( + name=camara_schemas.AppInstanceName(encode_app_instance_name(service_id)), + appId=camara_schemas.AppId(app_id), + appInstanceId=camara_schemas.AppInstanceId(service_id), + appProvider=camara_schemas.AppProvider(app_provider_id), + status=camara_schemas.Status.instantiating, + edgeCloudZoneId=camara_schemas.EdgeCloudZoneId(zone_id), + ) - # 6. Return expected format - return {"appInstanceId": response["serviceId"]} + # 5. Track deployment + self.storage.store_deployment(app_instance=app_instance_info) + + # 6. Return expected format + self.logger.info("App deployment request submitted successfully") + + # CAMARA spec requires appInstances array wrapper + camara_response = app_instance_info.model_dump(mode="json") + # Add mandatory Location header + location_url = f"/appinstances/{service_id}" + camara_headers = {"Content-Type": "application/json", "Location": location_url} + + return build_custom_http_response( + status_code=aeros_response.status_code, + content=camara_response, + headers=camara_headers, + encoding="utf-8", + url=aeros_response.url, + request=aeros_response.request, + ) + except EdgeCloudPlatformError as ex: + # Catch all platform-specific errors. + # All custom exception types (InvalidArgumentError, UnauthenticatedError, etc.) + # inherit from EdgeCloudPlatformError, so a single handler here will capture + # any of them. We can further elaborate per eachone of needed. + self.logger.error("Failed to deploy app '%s': %s", app_id, str(ex)) + raise def get_all_deployed_apps( self, app_id: Optional[str] = None, app_instance_id: Optional[str] = None, region: Optional[str] = None, - ) -> List[Dict]: - deployed = [] - for stored_app_id, instance_ids in self._deployed_services.items(): - for instance_id in instance_ids: - deployed.append({"appId": stored_app_id, "appInstanceId": instance_id}) - return deployed + ) -> Response: + + instances = self.storage.find_deployments(app_id, app_instance_id, region) + + # CAMARA spec format for multiple instances response + camara_response = { + "appInstances": [ + inst.model_dump(mode="json") if hasattr(inst, "model_dump") else inst + for inst in instances + ] + } + + self.logger.info("All app instances retrieved successfully") + if config.DEBUG: + self.logger.debug("Onboarded applications: %s", camara_response) + return build_custom_http_response( + status_code=200, + content=camara_response, + headers={"Content-Type": "application/json"}, + encoding="utf-8", + # url=response.url, + # request=response.request, + ) def get_deployed_app( self, app_instance_id: str, app_id: Optional[str] = None, region: Optional[str] = None @@ -236,28 +473,78 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): :param region: Optional filter by Edge Cloud region :return: Response with application instance details """ - # TODO: Implement actual aeros-specific logic for retrieving a specific deployed app - raise NotImplementedError("get_deployed_app is not yet implemented for aeros adapter") + try: + if not app_instance_id: + raise InvalidArgumentError("app_instance_id is required") + + # Look up the instance in CAMARA storage (returns List[AppInstanceInfo]) + matches = self.storage.find_deployments( + app_id=app_id, + app_instance_id=app_instance_id, + region=region, + ) + if not matches: + # Be explicit in the error so callers know what was used to filter + scope = [] + scope.append(f"instance_id={app_instance_id}") + if app_id: + scope.append(f"app_id={app_id}") + if region: + scope.append(f"region={region}") + raise ResourceNotFoundError(f"Deployed app not found ({', '.join(scope)})") + + # If multiple matched (shouldn't normally happen after filtering by instance id), + # return the first deterministically. + inst = matches[0] + + # Serialize to JSON-safe dict + content = {"appInstance": inst.model_dump(mode="json")} + + return build_custom_http_response( + status_code=200, + content=content, + headers={"Content-Type": "application/json"}, + encoding="utf-8", + ) + + except (InvalidArgumentError, ResourceNotFoundError): + # Let well-typed domain errors propagate + raise + except EdgeCloudPlatformError: + raise + except Exception as e: + # Defensive catch-all with context + self.logger.exception( + "Unhandled error retrieving deployed app instance '%s' (app_id=%s, region=%s): %s", + app_instance_id, + app_id, + region, + e, + ) + raise EdgeCloudPlatformError(str(e)) def _purge_deployed_app_from_continuum(self, app_id: str) -> None: + """ + Purge the deployed application from aerOS continuum. + :param app_id: The application ID to purge + All instances of this app should be stopped + """ aeros_client = ContinuumClient(self.base_url) - response = aeros_client.purge_service(app_id) + response = aeros_client.purge_service(self._generate_aeros_service_id(app_id)) if response: - self.logger.debug("Purged deployed application with id: %s", app_id) + self.logger.debug( + "Purged deployed application with id: %s", self._generate_aeros_service_id(app_id) + ) else: raise EdgeCloudPlatformError( - f"Failed to purg service with id from the continuum '{app_id}'" + f"Failed to purge service with id from the continuum '{app_id}'" ) - def undeploy_app(self, app_instance_id: str) -> None: - # 1. Locate app_id corresponding to this instance - found_app_id = None - for app_id, instances in self._deployed_services.items(): - if app_instance_id in instances: - found_app_id = app_id - break - - if not found_app_id: + def undeploy_app(self, app_instance_id: str) -> Response: + # 1. Locate app_id corresponding to this instance and + # remove from deployed instances for this appId + app_id = self.storage.remove_deployment(app_instance_id=app_instance_id) + if not app_id: raise EdgeCloudPlatformError( f"No deployed app instance with ID '{app_instance_id}' found" ) @@ -265,7 +552,9 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): # 2. Call the external undeploy_service aeros_client = ContinuumClient(self.base_url) try: - aeros_client.undeploy_service(app_instance_id) + aeros_response = aeros_client.undeploy_service( + self._generate_aeros_service_id(app_instance_id) + ) except Exception as e: raise EdgeCloudPlatformError( f"Failed to undeploy app instance '{app_instance_id}': {str(e)}" @@ -276,187 +565,162 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): # self._purge_deployed_app_from_continuum(app_instance_id) # 4. Clean up internal tracking - self._deployed_services[found_app_id].remove(app_instance_id) - # Add instance to _stopped_services to purge it later - if found_app_id not in self._stopped_services: - self._stopped_services[found_app_id] = [] - self._stopped_services[found_app_id].append(app_instance_id) - # If app has no instances left, remove it from deployed services - if not self._deployed_services[found_app_id]: - del self._deployed_services[found_app_id] - - def get_edge_cloud_zones( - self, region: Optional[str] = None, status: Optional[str] = None - ) -> List[Dict]: - aeros_client = ContinuumClient(self.base_url) - ngsild_params = "type=Domain&format=simplified" - aeros_domains = aeros_client.query_entities(ngsild_params) - return [ - { - "zoneId": domain["id"], - "status": domain["domainStatus"].split(":")[-1].lower(), - "geographyDetails": "NOT_USED", - } - for domain in aeros_domains - ] - - def get_edge_cloud_zones_details(self, zone_id: str, flavour_id: Optional[str] = None) -> Dict: - """ - Get details of a specific edge cloud zone. - :param zone_id: The ID of the edge cloud zone - :param flavour_id: Optional flavour ID to filter the results - :return: Details of the edge cloud zone - """ - # Minimal mocked response based on required fields of 'ZoneRegisteredData' in GSMA OPG E/WBI API - # return { - # "zoneId": - # zone_id, - # "reservedComputeResources": [{ - # "cpuArchType": "ISA_X86_64", - # "numCPU": "4", - # "memory": 8192, - # }], - # "computeResourceQuotaLimits": [{ - # "cpuArchType": "ISA_X86_64", - # "numCPU": "8", - # "memory": 16384, - # }], - # "flavoursSupported": [{ - # "flavourId": - # "medium-x86", - # "cpuArchType": - # "ISA_X86_64", - # "supportedOSTypes": [{ - # "architecture": "x86_64", - # "distribution": "UBUNTU", - # "version": "OS_VERSION_UBUNTU_2204_LTS", - # "license": "OS_LICENSE_TYPE_FREE", - # }], - # "numCPU": - # 4, - # "memorySize": - # 8192, - # "storageSize": - # 100, - # }], - # # - # } - aeros_client = ContinuumClient(self.base_url) - ngsild_params = f'format=simplified&type=InfrastructureElement&q=domain=="{zone_id}"' - self.logger.debug( - "Querying infrastructure elements for zone %s with params: %s", - zone_id, - ngsild_params, - ) - # Query the infrastructure elements for the specified zonese - aeros_domain_ies = aeros_client.query_entities(ngsild_params) - # Transform the infrastructure elements into the required format - # and return the details of the edge cloud zone - response = self.transform_infrastructure_elements( - domain_ies=aeros_domain_ies, domain=zone_id + self.storage.remove_deployment(app_instance_id) + self.storage.store_stopped_instance(app_id, app_instance_id) + return build_custom_http_response( + status_code=204, + content="", + headers={"Content-Type": "application/json"}, + encoding="utf-8", + url=aeros_response.url, + request=aeros_response.request, ) - self.logger.debug("Transformed response: %s", response) - # Return the transformed response - return response - - def transform_infrastructure_elements( - self, domain_ies: List[Dict[str, Any]], domain: str - ) -> Dict[str, Any]: - """ - Transform the infrastructure elements into a format suitable for the - edge cloud zone details. - :param domain_ies: List of infrastructure elements - :param domain: The ID of the edge cloud zone - :return: Transformed details of the edge cloud zone - """ - total_cpu = 0 - total_ram = 0 - total_disk = 0 - total_available_ram = 0 - total_available_disk = 0 - - flavours_supported = [] - - for element in domain_ies: - total_cpu += element.get("cpuCores", 0) - total_ram += element.get("ramCapacity", 0) - total_available_ram += element.get("availableRam", 0) - total_disk += element.get("diskCapacity", 0) - total_available_disk += element.get("availableDisk", 0) - - # Create a flavour per machine - flavour = { - "flavourId": f"{element.get('hostname')}-{element.get('containerTechnology')}", - "cpuArchType": f"{element.get('cpuArchitecture')}", - "supportedOSTypes": [ - { - "architecture": f"{element.get('cpuArchitecture')}", - "distribution": f"{element.get('operatingSystem')}", # assume - "version": "OS_VERSION_UBUNTU_2204_LTS", - "license": "OS_LICENSE_TYPE_FREE", - } - ], - "numCPU": element.get("cpuCores", 0), - "memorySize": element.get("ramCapacity", 0), - "storageSize": element.get("diskCapacity", 0), - } - flavours_supported.append(flavour) - result = { - "zoneId": domain, - "reservedComputeResources": [ - { - "cpuArchType": "ISA_X86_64", - "numCPU": str(total_cpu), - "memory": total_ram, - } - ], - "computeResourceQuotaLimits": [ - { - "cpuArchType": "ISA_X86_64", - "numCPU": str(total_cpu * 2), # Assume quota is 2x total? - "memory": total_ram * 2, - } - ], - "flavoursSupported": flavours_supported, - } - return result + # ######################################################################## + # GSMA EDGE COMPUTING API (EWBI OPG) - FEDERATION + # ######################################################################## - # --- GSMA-specific methods --- + # ------------------------------------------------------------------------ + # Zone Management (GSMA) + # ------------------------------------------------------------------------ - # FederationManagement - - def get_edge_cloud_zones_list_gsma(self) -> List: + def get_edge_cloud_zones_list_gsma(self) -> Response: """ - Retrieves list of all Zones + Retrieves details of all Zones for GSMA federation. - :return: List. + :return: Response with zone details in GSMA format. """ - pass + try: + aeros_client = ContinuumClient(self.base_url) + ngsild_params = "type=Domain&format=simplified" + aeros_response = aeros_client.query_entities(ngsild_params) + aeros_domains = aeros_response.json() + zone_list = [ + { + "zoneId": domain["id"], + "geolocation": "NOT_Available", + "geographyDetails": domain["description"], + } + for domain in aeros_domains + ] + return build_custom_http_response( + status_code=aeros_response.status_code, + content=zone_list, + headers={"Content-Type": self.content_type_gsma}, + encoding=self.encoding_gsma, + url=aeros_response.url, + request=aeros_response.request, + ) + except json.JSONDecodeError as e: + self.logger.error("Invalid JSON in aerOS response: %s", e) + raise + except KeyError as e: + self.logger.error("Missing expected field in aerOS data: %s", e) + raise + except EdgeCloudPlatformError as e: + self.logger.error("Error retrieving edge cloud zones: %s", e) + raise # AvailabilityZoneInfoSynchronization - def get_edge_cloud_zones_gsma(self) -> List: + def get_edge_cloud_zones_gsma(self) -> Response: """ - Retrieves details of all Zones + Retrieves details of all Zones with compute resources and flavours for GSMA federation. - :return: List. + :return: Response with zones and detailed resource information. """ - pass + aeros_client = ContinuumClient(self.base_url) + ngsild_params = "format=simplified&type=InfrastructureElement" - def get_edge_cloud_zone_details_gsma(self, zone_id: str) -> Dict: + try: + # Query the infrastructure elements whithin the whole continuum + aeros_response = aeros_client.query_entities(ngsild_params) + aeros_ies = aeros_response.json() # IEs as list of dicts + + # Create a dict that groups by "domain" + grouped_by_domain = defaultdict(list) + for item in aeros_ies: + domain = item["domain"] + grouped_by_domain[domain].append(item) + + # Transform the IEs to required format + # per domain and append to response list + gsma_response = [] + for domain, ies in grouped_by_domain.items(): + result = aeros2gsma_zone_details.transformer(domain_ies=ies, domain=domain) + gsma_response.append(result) + # Return the transformed response + return build_custom_http_response( + status_code=aeros_response.status_code, + content=gsma_response, + headers={"Content-Type": self.content_type_gsma}, + encoding=self.encoding_gsma, + url=aeros_response.url, + request=aeros_response.request, + ) + except json.JSONDecodeError as e: + self.logger.error("Invalid JSON in aerOS response: %s", e) + raise + except KeyError as e: + self.logger.error("Missing expected field in aerOS data: %s", e) + raise + except EdgeCloudPlatformError as e: + self.logger.error("Error retrieving edge cloud zones: %s", e) + raise + + def get_edge_cloud_zone_details_gsma(self, zone_id: str) -> Response: """ Retrieves details of a specific Edge Cloud Zone reserved - for the specified zone by the partner OP. + for the specified zone by the partner OP using GSMA federation. :param zone_id: Unique identifier of the Edge Cloud Zone. - :return: Dictionary with Edge Cloud Zone details. + :return: Response with Edge Cloud Zone details. """ - pass - - # ArtefactManagement - - def create_artefact_gsma(self, request_body: dict): + aeros_client = ContinuumClient(self.base_url) + ngsild_params = f'format=simplified&type=InfrastructureElement&q=domain=="{zone_id}"' + if config.DEBUG: + self.logger.debug( + "Querying infrastructure elements for zone %s with params: %s", + zone_id, + ngsild_params, + ) + try: + # Query the infrastructure elements for the specified zonese + aeros_response = aeros_client.query_entities(ngsild_params) + aeros_domain_ies = aeros_response.json() + # Transform the infrastructure elements into the required format + # and return the details of the edge cloud zone + # camara_response = self.transform_infrastructure_elements( + # domain_ies=aeros_domain_ies, domain=zone_id) + gsma_response = aeros2gsma_zone_details.transformer( + domain_ies=aeros_domain_ies, domain=zone_id + ) + if config.DEBUG: + self.logger.debug("Transformed response: %s", gsma_response) + # Return the transformed response + return build_custom_http_response( + status_code=aeros_response.status_code, + content=gsma_response, + headers={"Content-Type": "application/json"}, + encoding=aeros_response.encoding, + url=aeros_response.url, + request=aeros_response.request, + ) + except json.JSONDecodeError as e: + self.logger.error("Invalid JSON in aerOS response: %s", e) + raise + except KeyError as e: + self.logger.error("Missing expected field in aerOS data: %s", e) + raise + except EdgeCloudPlatformError as e: + self.logger.error("Error retrieving edge cloud zones: %s", e) + raise + + # ------------------------------------------------------------------------ + # Artefact Management (GSMA) + # ------------------------------------------------------------------------ + + def create_artefact_gsma(self, request_body: dict) -> Response: """ Uploads application artefact on partner OP. Artefact is a zip file containing scripts and/or packaging files like Terraform or Helm @@ -465,27 +729,79 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): :param request_body: Payload with artefact information. :return: """ - pass + try: + artefact = gsma_schemas.Artefact.model_validate(request_body) + self.storage.store_artefact_gsma(artefact) + return build_custom_http_response( + status_code=201, + content=artefact.model_dump(mode="json"), + headers={"Content-Type": self.content_type_gsma}, + encoding=self.encoding_gsma, + ) + except ValidationError as e: + self.logger.error("Invalid GSMA artefact schema: %s", e) + raise InvalidArgumentError(str(e)) - def get_artefact_gsma(self, artefact_id: str) -> Dict: + def get_artefact_gsma(self, artefact_id: str) -> Response: """ Retrieves details about an artefact :param artefact_id: Unique identifier of the artefact. :return: Dictionary with artefact details. """ - pass + art = self.storage.get_artefact_gsma(artefact_id) + if not art: + raise ResourceNotFoundError(f"GSMA artefact '{artefact_id}' not found") + return build_custom_http_response( + status_code=200, + content=art.model_dump(mode="json"), + headers={"Content-Type": self.content_type_gsma}, + encoding=self.encoding_gsma, + ) + + def list_artefacts_gsma(self): + """List all GSMA Artefacts.""" + arts = [a.model_dump(mode="json") for a in self.storage.list_artefacts_gsma()] + return build_custom_http_response( + status_code=200, + content=arts, + headers={"Content-Type": self.content_type_gsma}, + encoding=self.encoding_gsma, + ) - def delete_artefact_gsma(self, artefact_id: str): + def delete_artefact_gsma(self, artefact_id: str) -> Response: """ Removes an artefact from partners OP. :param artefact_id: Unique identifier of the artefact. :return: """ - pass - - # ApplicationOnboardingManagement + if not self.storage.get_artefact_gsma(artefact_id): + raise ResourceNotFoundError(f"GSMA artefact '{artefact_id}' not found") + self.storage.delete_artefact_gsma(artefact_id) + return build_custom_http_response(status_code=204, content=b"", headers={}, encoding=None) + + # ------------------------------------------------------------------------ + # Application Onboarding Management (GSMA) + # ------------------------------------------------------------------------ + + def _to_application_model( + self, entry: gsma_schemas.AppOnboardManifestGSMA + ) -> gsma_schemas.ApplicationModel: + """Internal helper to convert GSMA onboarding entry into canonical ApplicationModel.""" + zones = [ + gsma_schemas.AppDeploymentZone(countryCode="XX", zoneInfo=z) + for z in entry.appDeploymentZones + ] + return gsma_schemas.ApplicationModel( + appId=entry.appId, + appProviderId=entry.appProviderId, + appDeploymentZones=zones, + appMetaData=entry.appMetaData, + appQoSProfile=entry.appQoSProfile, + appComponentSpecs=entry.appComponentSpecs, + onboardStatusInfo="ONBOARDED", + ) def onboard_app_gsma(self, request_body: dict): """ @@ -494,9 +810,39 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): resource validation and other pre-deployment operations. :param request_body: Payload with onboarding info. - :return: + :return: Response with onboarding confirmation. """ - pass + try: + # Validate input against GSMA schema + entry = gsma_schemas.AppOnboardManifestGSMA.model_validate(request_body) + except ValidationError as e: + self.logger.error("Invalid GSMA input schema: %s", e) + raise InvalidArgumentError(str(e)) + + try: + # Convert to ApplicationModel (canonical onboarded representation) + app_model = self._to_application_model(entry) + + # Ensure uniqueness + if self.storage.get_app_gsma(app_model.appId): + raise InvalidArgumentError(f"GSMA app '{app_model.appId}' already exists") + + # Store in GSMA apps storage + self.storage.store_app_gsma(app_model.appId, app_model) + + # Build and return confirmation response + return build_custom_http_response( + status_code=201, + content=app_model.model_dump(mode="json"), + headers={"Content-Type": self.content_type_gsma}, + encoding=self.encoding_gsma, + ) + except EdgeCloudPlatformError as e: + self.logger.error("Error during GSMA app onboarding: %s", e) + raise + except Exception as e: + self.logger.exception("Unhandled error during GSMA onboarding: %s", e) + raise EdgeCloudPlatformError(str(e)) def get_onboarded_app_gsma(self, app_id: str) -> Dict: """ @@ -505,29 +851,151 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): :param app_id: Identifier of the application onboarded. :return: Dictionary with application details. """ - pass + try: + app = self.storage.get_app_gsma(app_id) + if not app: + raise ResourceNotFoundError(f"GSMA app '{app_id}' not found") + + return build_custom_http_response( + status_code=200, + content=app.model_dump(mode="json"), + headers={"Content-Type": self.content_type_gsma}, + encoding=self.encoding_gsma, + ) + except EdgeCloudPlatformError as e: + self.logger.error("Error retrieving GSMA app '%s': %s", app_id, e) + raise + except Exception as e: + self.logger.exception("Unhandled error retrieving GSMA app '%s': %s", app_id, e) + raise EdgeCloudPlatformError(str(e)) def patch_onboarded_app_gsma(self, app_id: str, request_body: dict): """ Updates partner OP about changes in application compute resource requirements, - QOS Profile, associated descriptor or change in associated components + QOS Profile, associated descriptor or change in associated components. :param app_id: Identifier of the application onboarded. :param request_body: Payload with updated onboarding info. - :return: + :return: Response with updated application details. """ - pass + try: + patch = gsma_schemas.PatchOnboardedAppGSMA.model_validate(request_body) + except ValidationError as e: + self.logger.error("Invalid GSMA patch schema: %s", e) + raise InvalidArgumentError(str(e)) + + try: + app = self.storage.get_app_gsma(app_id) + if not app: + raise ResourceNotFoundError(f"GSMA app '{app_id}' not found") + + upd = patch.appUpdQoSProfile + + # Update QoS profile fields + if upd.latencyConstraints is not None: + app.appQoSProfile.latencyConstraints = upd.latencyConstraints + if upd.bandwidthRequired is not None: + app.appQoSProfile.bandwidthRequired = upd.bandwidthRequired + if upd.multiUserClients is not None: + app.appQoSProfile.multiUserClients = upd.multiUserClients + if upd.noOfUsersPerAppInst is not None: + app.appQoSProfile.noOfUsersPerAppInst = upd.noOfUsersPerAppInst + if upd.appProvisioning is not None: + app.appQoSProfile.appProvisioning = upd.appProvisioning + + # mobilitySupport lives under AppMetaData + if upd.mobilitySupport is not None: + app.appMetaData.mobilitySupport = upd.mobilitySupport + + # Replace component specs if provided + if patch.appComponentSpecs: + app.appComponentSpecs = [ + gsma_schemas.AppComponentSpec( + serviceNameNB=p.serviceNameNB, + serviceNameEW=p.serviceNameEW, + componentName=p.componentName, + artefactId=p.artefactId, + ) + for p in patch.appComponentSpecs + ] + + # Persist updated model + self.storage.store_app_gsma(app_id, app) + + return build_custom_http_response( + status_code=200, + content=app.model_dump(mode="json"), + headers={"Content-Type": self.content_type_gsma}, + encoding=self.encoding_gsma, + ) + except EdgeCloudPlatformError as e: + self.logger.error("Error updating GSMA app '%s': %s", app_id, e) + raise + except Exception as e: + self.logger.exception("Unhandled error patching GSMA app '%s': %s", app_id, e) + raise EdgeCloudPlatformError(str(e)) def delete_onboarded_app_gsma(self, app_id: str): """ - Deboards an application from specific partner OP zones + Deboards an application from specific partner OP zones. :param app_id: Identifier of the application onboarded. - :return: + :return: 204 No Content on success. """ - pass + try: + if not self.storage.get_app_gsma(app_id): + raise ResourceNotFoundError(f"GSMA app '{app_id}' not found") + + # CHECKME: update for GSMA + service_instances = self.storage.get_stopped_instances_gsma(app_id=app_id) + if not service_instances: + raise EdgeCloudPlatformError( + f"Application with id '{app_id}' cannot be deleted — please stop it first" + ) + self.logger.debug( + "Deleting application with id: %s and instances: %s", + app_id, + service_instances, + ) + for service_instance in service_instances: + self._purge_deployed_app_from_continuum_gsma(service_instance) + self.logger.debug("successfully purged service instance: %s", service_instance) + + self.storage.remove_stopped_instances_gsma(app_id) + + self.storage.delete_app_gsma(app_id) + + return build_custom_http_response( + status_code=204, + content=b"", + headers={}, + encoding=None, + ) + except EdgeCloudPlatformError as e: + self.logger.error("Error deleting GSMA app '%s': %s", app_id, e) + raise + except Exception as e: + self.logger.exception("Unhandled error deleting GSMA app '%s': %s", app_id, e) + raise EdgeCloudPlatformError(str(e)) + + def _purge_deployed_app_from_continuum_gsma(self, app_instance_id: str) -> None: + """ + Purge the deployed application from aerOS continuum. + :param app_id: The application ID to purge + All instances of this app should be stopped + """ + aeros_client = ContinuumClient(self.base_url) + response = aeros_client.purge_service(app_instance_id) + if response: + self.logger.debug("Purged deployed application with id: %s", app_instance_id) + else: + raise EdgeCloudPlatformError( + f"Failed to purge service with id from the continuum '{app_instance_id}'" + ) - # ApplicationDeploymentManagement + # ------------------------------------------------------------------------ + # Application Deployment Management (GSMA) + # ------------------------------------------------------------------------ def deploy_app_gsma(self, request_body: dict) -> Dict: """ @@ -536,7 +1004,76 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): :param request_body: Payload with deployment info. :return: Dictionary with deployment details. """ - pass + try: + payload = gsma_schemas.AppDeployPayloadGSMA.model_validate(request_body) + except ValidationError as e: + self.logger.error("Invalid GSMA deploy schema: %s", e) + raise InvalidArgumentError(str(e)) + + try: + # Ensure app exists + onboarded_app = self.storage.get_app_gsma(payload.appId) + if not onboarded_app: + raise ResourceNotFoundError(f"GSMA app '{payload.appId}' not found") + + # 2. Generate unique service ID + # (aerOS) service id <=> GSMA appInstanceId + service_id = self._generate_aeros_service_id( + self._generate_service_id(onboarded_app.appId) + ) + + # 3. Create TOSCA (yaml str) from GSMA onboarded_app + connected artefacts + # GSMA app corresponds to aerOS Service + # Each GSMA AppComponentSpec references an artefact which is mapped to aerOS Service Component + tosca_yaml = gsma2aeros_converter.generate_tosca_from_gsma_with_artefacts( + app_model=onboarded_app, + zone_id=payload.zoneInfo.zoneId, + artefact_resolver=self.storage.get_artefact_gsma, # cleaner + ) + self.logger.info("Generated TOSCA YAML:") + self.logger.info(tosca_yaml) + + # 4. Instantiate client and call continuum to deploy servic + aeros_client = ContinuumClient(self.base_url) + aeros_response = aeros_client.onboard_and_deploy_service( + service_id, tosca_str=tosca_yaml + ) + + if "serviceId" not in aeros_response.json(): + raise EdgeCloudPlatformError( + "Invalid response from onboard_service: missing 'serviceId'" + ) + + # 5. Track deployment (Store in GSMA deployment store) + # Build AppInstance and optional status (if you want to persist status later) + inst = gsma_schemas.AppInstance( + zoneId=payload.zoneInfo.zoneId, + appInstIdentifier=service_id, + ) + status = gsma_schemas.AppInstanceStatus( + appInstanceState="PENDING", + accesspointInfo=[], + ) + + self.storage.store_deployment_gsma(onboarded_app.appId, inst, status=status) + + # 6. Return expected format (deployment details) + body = inst.model_dump(mode="json") + + return build_custom_http_response( + status_code=202, + content=body, + headers={"Content-Type": self.content_type_gsma}, + encoding=self.encoding_gsma, + url=aeros_response.json().get("url", ""), + request=aeros_response.request, + ) + except EdgeCloudPlatformError as ex: + self.logger.error("Failed to deploy app '%s': %s", onboarded_app.appId, str(ex)) + raise + except Exception as e: + self.logger.exception("Unhandled error during GSMA deploy: %s", e) + raise EdgeCloudPlatformError(str(e)) def get_deployed_app_gsma(self, app_id: str, app_instance_id: str, zone_id: str) -> Dict: """ @@ -547,7 +1084,49 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): :param zone_id: Identifier of the zone :return: Dictionary with application instance details """ - pass + try: + # Ensure app exists + if not self.storage.get_app_gsma(app_id): + raise ResourceNotFoundError(f"GSMA app '{app_id}' not found") + + # 4. Instantiate client and call continuum to deploy servic + aeros_client = ContinuumClient(self.base_url) + aeros_response = aeros_client.query_entity( + entity_id=app_instance_id, ngsild_params="format=simplified" + ) + + response_json = aeros_response.json() + content = gsma_schemas.AppInstanceStatus( + appInstanceState=map_aeros_service_status_to_gsma(response_json.get("actionType")), + accesspointInfo=[ + {"service_status": f"{self.base_url}/entities/{app_instance_id}"}, + { + "serviceComponents_status": f"{self.base_url}/hlo_fe/services//{app_instance_id}" + }, + ], + ) + + validated_data = gsma_schemas.AppInstanceStatus.model_validate(content) + + return build_custom_http_response( + status_code=200, + content=validated_data.model_dump(mode="json"), + headers={"Content-Type": self.content_type_gsma}, + encoding=self.encoding_gsma, + url=aeros_response.url, + request=aeros_response.request, + ) + except EdgeCloudPlatformError: + raise + except Exception as e: + self.logger.exception( + "Unhandled error retrieving GSMA deployment '%s' (%s/%s): %s", + app_instance_id, + app_id, + zone_id, + e, + ) + raise EdgeCloudPlatformError(str(e)) def get_all_deployed_apps_gsma(self) -> Response: """ @@ -557,7 +1136,23 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): :param app_provider: App provider :return: List with application instances details """ - pass + try: + insts = self.storage.find_deployments_gsma() + body = [i.model_dump(mode="json") for i in insts] + self.logger.info("All GSMA app instances retrieved successfully") + self.logger.debug("Deployed GSMA applications: %s", body) + + return build_custom_http_response( + status_code=200, + content=body, + headers={"Content-Type": self.content_type_gsma}, + encoding=self.encoding_gsma, + ) + except EdgeCloudPlatformError: + raise + except Exception as e: + self.logger.exception("Unhandled error listing GSMA deployments: '%s'", e) + raise EdgeCloudPlatformError(str(e)) def undeploy_app_gsma(self, app_id: str, app_instance_id: str, zone_id: str): """ @@ -568,4 +1163,55 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): :param zone_id: Identifier of the zone :return: """ - pass + try: + # Ensure app exists + if not self.storage.get_app_gsma(app_id): + raise ResourceNotFoundError(f"GSMA app '{app_id}' not found") + + # Ensure the (app_id, instance, zone) exists + matches = self.storage.find_deployments_gsma( + app_id=app_id, app_instance_id=app_instance_id, zone_id=zone_id + ) + if not matches: + raise ResourceNotFoundError( + f"Deployment not found (app_id={app_id}, instance={app_instance_id}, zone={zone_id})" + ) + + # 2. Call the external undeploy_service + aeros_client = ContinuumClient(self.base_url) + try: + aeros_response = aeros_client.undeploy_service(app_instance_id) + except Exception as e: + raise EdgeCloudPlatformError( + f"Failed to undeploy app instance '{app_instance_id}': {str(e)}" + ) from e + + # Remove from deployed and mark as stopped so it can be purged later + removed_app_id = self.storage.remove_deployment_gsma(app_instance_id) + if removed_app_id: + self.storage.store_stopped_instance_gsma(removed_app_id, app_instance_id) + + # Async-friendly: 202 Accepted (termination in progress) + body = { + "appId": app_id, + "appInstIdentifier": app_instance_id, + "zoneId": zone_id, + "state": "TERMINATING", + } + return build_custom_http_response( + status_code=aeros_response.status_code, + content=body, + headers={"Content-Type": self.content_type_gsma}, + encoding=self.encoding_gsma, + ) + except EdgeCloudPlatformError: + raise + except Exception as e: + self.logger.exception( + "Unhandled error undeploying GSMA app instance '%s' (app=%s zone=%s): %s", + app_instance_id, + app_id, + zone_id, + e, + ) + raise EdgeCloudPlatformError(str(e)) diff --git a/src/sunrise6g_opensdk/edgecloud/adapters/aeros/config.py b/src/sunrise6g_opensdk/edgecloud/adapters/aeros/config.py index 81d7b8c3833d017fb6b73eab0f3cdde8e20eb092..ded19cbe21dd7be42f63937f8b4d4cdf05cbc966 100644 --- a/src/sunrise6g_opensdk/edgecloud/adapters/aeros/config.py +++ b/src/sunrise6g_opensdk/edgecloud/adapters/aeros/config.py @@ -23,5 +23,5 @@ if not aerOS_ACCESS_TOKEN: aerOS_HLO_TOKEN = "harcoded_hlo_token" if not aerOS_HLO_TOKEN: raise ValueError("Environment variable 'aerOS_HLO_TOKEN' is not set.") -DEBUG = False +DEBUG = True LOG_FILE = ".log/aeros_client.log" diff --git a/src/sunrise6g_opensdk/edgecloud/adapters/aeros/continuum_client.py b/src/sunrise6g_opensdk/edgecloud/adapters/aeros/continuum_client.py index 67ed942ce0683136705584cbd80ad6194d22300a..1ba40975fe53043be02d7297d2fb66f11fd74755 100644 --- a/src/sunrise6g_opensdk/edgecloud/adapters/aeros/continuum_client.py +++ b/src/sunrise6g_opensdk/edgecloud/adapters/aeros/continuum_client.py @@ -51,7 +51,7 @@ class ContinuumClient: } @catch_requests_exceptions - def query_entity(self, entity_id, ngsild_params) -> dict: + def query_entity(self, entity_id, ngsild_params) -> requests.Response: """ Query entity with ngsi-ld params :input @@ -70,10 +70,10 @@ class ContinuumClient: self.logger.debug( "Query entity response: %s %s", response.status_code, response.text ) - return response.json() + return response @catch_requests_exceptions - def query_entities(self, ngsild_params): + def query_entities(self, ngsild_params) -> requests.Response: """ Query entities with ngsi-ld params :input @@ -90,7 +90,7 @@ class ContinuumClient: # self.logger.debug("Query entities URL: %s", entities_url) # self.logger.debug("Query entities response: %s %s", # response.status_code, response.text) - return response.json() + return response @catch_requests_exceptions def deploy_service(self, service_id: str) -> dict: @@ -116,7 +116,7 @@ class ContinuumClient: return response.json() @catch_requests_exceptions - def undeploy_service(self, service_id: str) -> dict: + def undeploy_service(self, service_id: str) -> requests.Response: """ Undeploy service :input @@ -129,6 +129,7 @@ class ContinuumClient: if response is None: return None else: + self.logger.debug("In OK Undeploy and text: %s", response.text) if config.DEBUG: self.logger.debug("Re-allocate service URL: %s", undeploy_url) self.logger.debug( @@ -136,10 +137,10 @@ class ContinuumClient: response.status_code, response.text, ) - return response.json() + return response @catch_requests_exceptions - def onboard_and_deploy_service(self, service_id: str, tosca_str: str) -> dict: + def onboard_and_deploy_service(self, service_id: str, tosca_str: str) -> requests.Response: """ Onboard (& deploy) service on aerOS continuum :input @@ -165,7 +166,7 @@ class ContinuumClient: response.status_code, response.text, ) - return response.json() + return response @catch_requests_exceptions def purge_service(self, service_id: str) -> bool: diff --git a/src/sunrise6g_opensdk/edgecloud/adapters/aeros/continuum_models.py b/src/sunrise6g_opensdk/edgecloud/adapters/aeros/continuum_models.py new file mode 100644 index 0000000000000000000000000000000000000000..d254840cecc1e616ff2c3bea3b17e62ceec9514c --- /dev/null +++ b/src/sunrise6g_opensdk/edgecloud/adapters/aeros/continuum_models.py @@ -0,0 +1,289 @@ +""" +aerOS continuum models +""" + +from enum import Enum +from typing import Any, Dict, List, Optional, Union + +from pydantic import BaseModel, Field + + +class ServiceNotFound(BaseModel): + """ + Docstring + """ + + detail: str = "Service not found" + + +class CPUComparisonOperator(BaseModel): + """ + CPU requirment for now is that usage should be less than + """ + + less_or_equal: Union[float, None] = None + + +class CPUArchComparisonOperator(BaseModel): + """ + CPU arch requirment, equal to str + """ + + equal: Union[str, None] = None + + +class MEMComparisonOperator(BaseModel): + """ + RAM requirment for now is that available RAM should be more than + """ + + greater_or_equal: Union[str, None] = None + + +class EnergyEfficienyComparisonOperator(BaseModel): + """ + Energy Efficiency requirment for now is that IE should have energy efficiency more than a % + """ + + greater_or_equal: Union[str, None] = None + + +class GreenComparisonOperator(BaseModel): + """ + IE Green requirment for now is that IE should have green energy mix which us more than a % + """ + + greater_or_equal: Union[str, None] = None + + +class RTComparisonOperator(BaseModel): + """ + Real Time requirment T/F + """ + + equal: Union[bool, None] = None + + +class CpuArch(str, Enum): + """ + Enumeration with possible cpu types + """ + + x86_64 = "x86_64" + arm64 = "arm64" + arm32 = "arm32" + + +class Coordinates(BaseModel): + """ + IE coordinate requirements + """ + + coordinates: List[List[float]] + + +class DomainIdOperator(BaseModel): + """ + CPU arch requirment, equal to str + """ + + equal: Union[str, None] = None + + +class Property(BaseModel): + """ + IE capabilities + """ + + cpu_usage: CPUComparisonOperator = Field(default_factory=CPUComparisonOperator) + cpu_arch: CPUArchComparisonOperator = Field(default_factory=CPUArchComparisonOperator) + mem_size: MEMComparisonOperator = Field(default_factory=MEMComparisonOperator) + realtime: RTComparisonOperator = Field(default_factory=RTComparisonOperator) + area: Coordinates = None + energy_efficiency: EnergyEfficienyComparisonOperator = Field( + default_factory=EnergyEfficienyComparisonOperator + ) + green: GreenComparisonOperator = Field(default_factory=GreenComparisonOperator) + domain_id: DomainIdOperator = Field(default_factory=DomainIdOperator) + + # @field_validator('mem_size') + # def validate_mem_size(cls, v): + # if not v or "MB" not in v: + # raise ValueError("mem_size must be in MB and specified") + # mem_size_value = int(v.split(" ")[0]) + # if mem_size_value < 2000: + # raise ValueError("mem_size must be greater or equal to 2000 MB") + # return v + + +class HostCapability(BaseModel): + """ + Host properties + """ + + properties: Property + + +class NodeFilter(BaseModel): + """ + Node filter, + How to filter continuum IE and select canditate list + """ + + properties: Optional[Dict[str, List[str]]] = None + capabilities: Optional[List[Dict[str, HostCapability]]] = None + + +class HostRequirement(BaseModel): + """ + capabilities of node + """ + + # node_filter: Dict[str, List[Dict[str, HostCapability]]] + node_filter: NodeFilter + + +class PortProperties(BaseModel): + """ + Workload port description + """ + + protocol: List[str] = Field(...) + source: int = Field(...) + + +class ExposedPort(BaseModel): + """ + Workload exposed network ports + """ + + properties: PortProperties = Field(...) + + +class NetworkProperties(BaseModel): + """ + Dict of network requirments, name of port and protperty = [protocol, port] mapping + """ + + ports: Dict[str, ExposedPort] = Field(...) + exposePorts: Optional[bool] + + +class NetworkRequirement(BaseModel): + """ + Top level key of network requirments + """ + + properties: NetworkProperties + + +class CustomRequirement(BaseModel): + """ + Define a custom requirement type that can be either a host or a network requirement + """ + + host: HostRequirement = None + network: NetworkRequirement = None + + +class ArtifactModel(BaseModel): + """ + Artifact has a useer defined id and then a dict with the following keys: + """ + + file: str + type: str + repository: str + is_private: Optional[bool] = False + username: Optional[str] = None + password: Optional[str] = None + + +class NodeTemplate(BaseModel): + """ + Node template "tosca.nodes.Container.Application" + """ + + type: str + requirements: List[CustomRequirement] + artifacts: Dict[str, ArtifactModel] + interfaces: Dict[str, Any] + isJob: Optional[bool] = False + + +class TOSCA(BaseModel): + """ + The TOSCA structure + """ + + tosca_definitions_version: str + description: str + serviceOverlay: Optional[bool] = False + node_templates: Dict[str, NodeTemplate] + + +TOSCA_YAML_EXAMPLE = """ +tosca_definitions_version: tosca_simple_yaml_1_3 +description: A test service for testing TOSCA generation +serviceOverlay: false + +node_templates: + auto-component: + type: tosca.nodes.Container.Application + isJob: False + artifacts: + application_image: + file: aeros-public/common-deployments/nginx:latest + repository: registry.gitlab.aeros-project.eu + type: tosca.artifacts.Deployment.Image.Container.Docker + interfaces: + Standard: + create: + implementation: application_image + inputs: + cliArgs: + - -a: aa + envVars: + - URL: bb + requirements: + - network: + properties: + ports: + port1: + properties: + protocol: + - tcp + source: 80 + port2: + properties: + protocol: + - tcp + source: 443 + exposePorts: True + - host: + node_filter: + capabilities: + - host: + properties: + cpu_arch: + equal: x64 + realtime: + equal: false + cpu_usage: + less_or_equal: '0.4' + mem_size: + greater_or_equal: '1' + domain_id: + equal: urn:ngsi-ld:Domain:NCSRD + energy_efficiency: + greater_or_equal: '0.5' + green: + greater_or_equal: '0.5' + domain_id: + equal: urn:ngsi-ld:Domain:ncsrd01 + properties: null + + + + +""" diff --git a/src/sunrise6g_opensdk/edgecloud/adapters/aeros/converters/__init__.py b/src/sunrise6g_opensdk/edgecloud/adapters/aeros/converters/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/src/sunrise6g_opensdk/edgecloud/adapters/aeros/converters/aeros2gsma_zone_details.py b/src/sunrise6g_opensdk/edgecloud/adapters/aeros/converters/aeros2gsma_zone_details.py new file mode 100644 index 0000000000000000000000000000000000000000..337818d028c9b3ee4db44bc0253956d1a59754f0 --- /dev/null +++ b/src/sunrise6g_opensdk/edgecloud/adapters/aeros/converters/aeros2gsma_zone_details.py @@ -0,0 +1,152 @@ +""" +aeros2gsma_zone_details.py +""" + +from typing import Any, Dict, List + + +def transformer(domain_ies: List[Dict[str, Any]], domain: str) -> Dict[str, Any]: # noqa: C901 + """ + Transform aerOS InfrastructureElements into GSMA ZoneRegisteredData structure. + :param domain_ies: List of aerOS InfrastructureElement dicts + :param domain: The ID of the edge cloud zone (zoneId) + :return: Dict matching gsma_schemas.ZoneRegisteredData (JSON-serializable) + """ + + def map_cpu_arch_to_isa(urn: str) -> str: + """ + Map aerOS cpuArchitecture URN to GSMA ISA_* literal. + Examples: + 'urn:ngsi-ld:CpuArchitecture:x64' -> 'ISA_X86_64' + 'urn:ngsi-ld:CpuArchitecture:arm64' -> 'ISA_ARM_64' + 'urn:ngsi-ld:CpuArchitecture:arm32' -> 'ISA_ARM_64' (closest) + 'urn:ngsi-ld:CpuArchitecture:x86' -> 'ISA_X86' + Fallback: 'ISA_X86_64' + """ + if not isinstance(urn, str): + return "ISA_X86_64" + tail = urn.split(":")[-1].lower() + if tail in ("x64", "x86_64", "amd64"): + return "ISA_X86_64" + if tail in ("x86", "i386", "i686"): + return "ISA_X86" + if tail in ("arm64", "aarch64"): + return "ISA_ARM_64" + if tail in ("arm32", "arm"): + # GSMA only has ARM_64 vs X86/X86_64; pick closest + return "ISA_ARM_64" + return "ISA_X86_64" + + def map_cpu_arch_to_ostype_arch(urn: str) -> str: + """ + Map aerOS cpuArchitecture URN to OSType.architecture literal: 'x86_64' or 'x86'. + Use 'x86_64' for x64/arm64 (closest allowed), and 'x86' for x86/arm32. + """ + if not isinstance(urn, str): + return "x86_64" + tail = urn.split(":")[-1].lower() + if tail in ("x64", "x86_64", "amd64", "arm64", "aarch64"): + return "x86_64" + if tail in ("x86", "i386", "i686", "arm32", "arm"): + return "x86" + return "x86_64" + + def map_os_distribution(_urn: str) -> str: + """ + aerOS uses 'urn:ngsi-ld:OperatingSystem:Linux' etc. + map Linux -> UBUNTU (assume), else OTHER. + """ + if isinstance(_urn, str) and _urn.split(":")[-1].lower() == "linux": + return "UBUNTU" + return "OTHER" + + def default_os_version(dist: str) -> str: + # You asked to assume Ubuntu 22.04 LTS for Linux + return "OS_VERSION_UBUNTU_2204_LTS" if dist == "UBUNTU" else "OTHER" + + # Totals (aggregate over elements) + total_cpu = 0 + total_ram = 0 + total_disk = 0 + total_available_ram = 0 + total_available_disk = 0 + + flavours_supported: List[Dict[str, Any]] = [] + seen_cpu_isas: set[str] = set() + + for element in domain_ies: + cpu_cores = int(element.get("cpuCores", 0) or 0) + ram_cap = int(element.get("ramCapacity", 0) or 0) # MB? + avail_ram = int(element.get("availableRam", 0) or 0) # MB? + disk_cap = int(element.get("diskCapacity", 0) or 0) # MB/GB? (pass-through) + avail_disk = int(element.get("availableDisk", 0) or 0) + + total_cpu += cpu_cores + total_ram += ram_cap + total_available_ram += avail_ram + total_disk += disk_cap + total_available_disk += avail_disk + + cpu_arch_urn = element.get("cpuArchitecture", "") + os_urn = element.get("operatingSystem", "") + + isa = map_cpu_arch_to_isa(cpu_arch_urn) + seen_cpu_isas.add(isa) + ost_arch = map_cpu_arch_to_ostype_arch(cpu_arch_urn) + dist = map_os_distribution(os_urn) + ver = default_os_version(dist) + + # Create a flavour per machine + flavour = { + "flavourId": f"{element.get('hostname', 'host')}-{element.get('containerTechnology', 'CT')}", + "cpuArchType": isa, # Literal ISA_* + "supportedOSTypes": [ + { + "architecture": ost_arch, # 'x86_64' or 'x86' + "distribution": dist, # 'UBUNTU' or 'OTHER' + "version": ver, # 'OS_VERSION_UBUNTU_2204_LTS' or 'OTHER' + "license": "OS_LICENSE_TYPE_FREE", + } + ], + "numCPU": cpu_cores, + "memorySize": ram_cap, + "storageSize": disk_cap, + } + flavours_supported.append(flavour) + + # Decide a single ISA for the aggregate reserved/quota entries + # Preference order: X86_64, ARM_64, X86 + def pick_aggregate_isa() -> str: + if "ISA_X86_64" in seen_cpu_isas: + return "ISA_X86_64" + if "ISA_ARM_64" in seen_cpu_isas: + return "ISA_ARM_64" + if "ISA_X86" in seen_cpu_isas: + return "ISA_X86" + # fallback + return "ISA_X86_64" + + agg_isa = pick_aggregate_isa() + + result = { + "zoneId": domain, + "reservedComputeResources": [ + { + "cpuArchType": agg_isa, + "numCPU": int( + total_cpu + ), # Same as Quotas untill we have somem policy or data to differentiate + "memory": total_ram, # ditto + } + ], + "computeResourceQuotaLimits": [ + { + "cpuArchType": agg_isa, + "numCPU": int(total_cpu), + "memory": total_ram, + } + ], + "flavoursSupported": flavours_supported, + } + + return result diff --git a/src/sunrise6g_opensdk/edgecloud/adapters/aeros/converters/camara2aeros_converter.py b/src/sunrise6g_opensdk/edgecloud/adapters/aeros/converters/camara2aeros_converter.py new file mode 100644 index 0000000000000000000000000000000000000000..b7d03b0b862e973a20c8ad229851e064a5f00502 --- /dev/null +++ b/src/sunrise6g_opensdk/edgecloud/adapters/aeros/converters/camara2aeros_converter.py @@ -0,0 +1,143 @@ +""" +Module: converter.py +This module provides functions to convert application manifests into TOSCA models. +It includes the `generate_tosca` function that constructs a TOSCA model based on +the application manifest and associated app zones. +""" + +from typing import List + +import yaml + +from sunrise6g_opensdk.edgecloud.adapters.aeros import config +from sunrise6g_opensdk.edgecloud.adapters.aeros.continuum_models import ( + TOSCA, + ArtifactModel, + CustomRequirement, + DomainIdOperator, + ExposedPort, + HostCapability, + HostRequirement, + NetworkProperties, + NetworkRequirement, + NodeFilter, + NodeTemplate, + PortProperties, +) +from sunrise6g_opensdk.edgecloud.adapters.aeros.continuum_models import ( + Property as HostProperty, +) +from sunrise6g_opensdk.edgecloud.core.camara_schemas import AppManifest, VisibilityType +from sunrise6g_opensdk.logger import setup_logger + +logger = setup_logger(__name__, is_debug=True, file_name=config.LOG_FILE) + + +def generate_tosca(app_manifest: AppManifest, app_zones: List[str]) -> str: + """ + Generate a TOSCA model from the application manifest and app zones. + Args: + app_manifest (AppManifest): The application manifest containing details about the app. + app_zones (List[Dict[str, Any]]): List of app zones where the app will be deployed. + Returns: + TOSCA yaml as string which can be used in a POST request with applcation type yaml + """ + component = app_manifest.componentSpec[0] + image_path = app_manifest.appRepo.imagePath.root + image_file = image_path.split("/")[-1] + repository_url = "/".join(image_path.split("/")[:-1]) if "/" in image_path else "docker_hub" + + zone_id = app_zones[0] + logger.info("DEBUG : %s", app_manifest.requiredResources.root) + # Extract minNodeMemory (fallback = 1024 MB) + + res = app_manifest.requiredResources.root + if hasattr(res, "applicationResources") and hasattr( + res.applicationResources.cpuPool.topology, "minNodeMemory" + ): + min_node_memory = res.applicationResources.cpuPool.topology.minNodeMemory + else: + min_node_memory = 1024 + + # Build exposed network ports + ports = { + iface.interfaceId: ExposedPort( + properties=PortProperties(protocol=[iface.protocol.value.lower()], source=iface.port) + ) + for iface in component.networkInterfaces + } + + expose_ports = any( + iface.visibilityType == VisibilityType.VISIBILITY_EXTERNAL + for iface in component.networkInterfaces + ) + + # Define host property constraints + host_props = HostProperty( + cpu_arch={"equal": "x64"}, + realtime={"equal": False}, + cpu_usage={"less_or_equal": "0.4"}, + mem_size={"greater_or_equal": str(min_node_memory)}, + energy_efficiency={"greater_or_equal": "0"}, + green={"greater_or_equal": "0"}, + domain_id=DomainIdOperator(equal=zone_id), + ) + + # Create Node compute and network requirements + requirements = [ + CustomRequirement( + network=NetworkRequirement( + properties=NetworkProperties(ports=ports, exposePorts=expose_ports) + ) + ), + CustomRequirement( + host=HostRequirement( + node_filter=NodeFilter( + capabilities=[{"host": HostCapability(properties=host_props)}], properties=None + ) + ) + ), + ] + # Define the NodeTemplate + node_template = NodeTemplate( + type="tosca.nodes.Container.Application", + isJob=False, + requirements=requirements, + artifacts={ + "application_image": ArtifactModel( + file=image_file, + type="tosca.artifacts.Deployment.Image.Container.Docker", + repository=repository_url, + is_private=app_manifest.appRepo.type == "PRIVATEREPO", + username=app_manifest.appRepo.userName, + password=app_manifest.appRepo.credentials, + ) + }, + interfaces={ + "Standard": { + "create": { + "implementation": "application_image", + "inputs": {"cliArgs": [], "envVars": []}, + } + } + }, + ) + + # Assemble full TOSCA object + tosca = TOSCA( + tosca_definitions_version="tosca_simple_yaml_1_3", + description=f"TOSCA for {app_manifest.name}", + serviceOverlay=False, + node_templates={component.componentName: node_template}, + ) + + tosca_dict = tosca.model_dump(by_alias=True, exclude_none=True) + + for template in tosca_dict.get("node_templates", {}).values(): + template["requirements"] = [ + {k: v for k, v in req.items() if v is not None} + for req in template.get("requirements", []) + ] + + yaml_str = yaml.dump(tosca_dict, sort_keys=False) + return yaml_str diff --git a/src/sunrise6g_opensdk/edgecloud/adapters/aeros/converters/gsma2aeros_converter.py b/src/sunrise6g_opensdk/edgecloud/adapters/aeros/converters/gsma2aeros_converter.py new file mode 100644 index 0000000000000000000000000000000000000000..bc30cd784eb03b36b95dc5beba81854e18075310 --- /dev/null +++ b/src/sunrise6g_opensdk/edgecloud/adapters/aeros/converters/gsma2aeros_converter.py @@ -0,0 +1,226 @@ +""" +Module: gsm2aeros_converter.py +Initial GSMA -> TOSCA generator. + +Notes: +- GSMA ApplicationModel does not include container image or ports directly. + (Those usually come from Artefacts, which we're ignoring for now.) +- We provide an `image_map` hook to resolve artefactId -> image string. +- Defaults to a public nginx image if nothing is provided. +- Network ports are omitted for now (exposePorts = False). +""" + +from typing import Callable, Dict, List, Optional + +import yaml + +from sunrise6g_opensdk.edgecloud.adapters.aeros import config +from sunrise6g_opensdk.edgecloud.adapters.aeros.continuum_models import ( + TOSCA, + ArtifactModel, + CustomRequirement, + DomainIdOperator, + ExposedPort, + HostCapability, + HostRequirement, + NetworkProperties, + NetworkRequirement, + NodeFilter, + NodeTemplate, + PortProperties, +) +from sunrise6g_opensdk.edgecloud.adapters.aeros.continuum_models import ( + Property as HostProperty, +) +from sunrise6g_opensdk.edgecloud.adapters.aeros.errors import ( + InvalidArgumentError, + ResourceNotFoundError, +) +from sunrise6g_opensdk.edgecloud.core import gsma_schemas +from sunrise6g_opensdk.logger import setup_logger + +logger = setup_logger(__name__, is_debug=True, file_name=config.LOG_FILE) + + +def generate_tosca_from_gsma_with_artefacts( # noqa: C901 + app_model: gsma_schemas.ApplicationModel, + zone_id: str, + artefact_resolver: Callable[[str], Optional[gsma_schemas.Artefact]], +) -> str: + """ + Build a TOSCA YAML from a GSMA `ApplicationModel` by resolving each component's `artefactId`. + + Rules/assumptions: + - One node_template per `AppComponentSpec` in the application model. + - Container image is taken from the first entry of `artefact.componentSpec[i].images`. + - Ports come (best-effort) from `exposedInterfaces` items in the matching componentSpec, e.g. {"protocol": "TCP", "port": 8080}. + - Host filter includes domain_id == `zone_id` and basic CPU/mem constraints. + - For PUBLICREPO artefacts: set `is_private=False` and omit credentials entirely. + For PRIVATEREPO artefacts: set `is_private=True` and include non-empty username/password if present. + - `cliArgs` are derived from `commandLineParams` dict: + - bool True -> "flag" + - key/value -> "key=value" + `envVars` are derived from `compEnvParams` list: + - [{"name": "KEY", "value": "VAL"}] -> [{"KEY": "VAL"}, ...] + - If a component name mismatch occurs between app and artefact, fall back to the first artefact componentSpec. + + :param app_model: GSMA ApplicationModel (already validated) + :param zone_id: Target aerOS domain id/zone urn for host node filter + :param artefact_resolver: Callable that returns an Artefact for a given artefactId + :return: TOSCA YAML string (tosca_simple_yaml_1_3) + """ + node_templates: Dict[str, NodeTemplate] = {} + + for comp in app_model.appComponentSpecs: + artefact = artefact_resolver(comp.artefactId) + if not artefact: + raise ResourceNotFoundError(f"GSMA artefact '{comp.artefactId}' not found") + + # pick the componentSpec that matches componentName, else first + comp_spec = None + if artefact.componentSpec: + for c in artefact.componentSpec: + if c.componentName == comp.componentName: + comp_spec = c + break + if comp_spec is None: + comp_spec = artefact.componentSpec[0] + else: + raise InvalidArgumentError(f"Artefact '{artefact.artefactId}' has no componentSpec") + + # Resolve container image + image = comp_spec.images[0] if comp_spec.images else "docker.io/library/nginx:stable" + if "/" in image: + repository_url = "/".join(image.split("/")[:-1]) + image_file = image.split("/")[-1] + else: + repository_url, image_file = "docker_hub", image + + # Ports (best-effort) from exposedInterfaces + ports: Dict[str, ExposedPort] = {} + expose_ports = False + if comp_spec.exposedInterfaces: + for idx, iface in enumerate(comp_spec.exposedInterfaces): + protocol = str(iface.get("protocol", "TCP")).lower() + port = iface.get("port") + if isinstance(port, int): + ports[f"if{idx}"] = ExposedPort( + properties=PortProperties(protocol=[protocol], source=port) + ) + expose_ports = True + + # Build cliArgs as a list of dicts: [{"KEY": "VAL"}, {"FLAG": ""}, ...] + cli_args: List[Dict[str, str]] = [] + cmd = getattr(comp_spec, "commandLineParams", None) + + if isinstance(cmd, dict): + for k, v in cmd.items(): + if v is True: + cli_args.append({str(k): ""}) # flag without value + elif v is False or v is None: + continue + else: + cli_args.append({str(k): str(v)}) + elif isinstance(cmd, list): + # if someone passes ["--flag", "--opt=1"] style + for item in cmd: + if isinstance(item, str): + if "=" in item: + k, v = item.split("=", 1) + cli_args.append({k: v}) + else: + cli_args.append({item: ""}) + + # Build envVars from compEnvParams list of {"name": "...", "value": "..."} + env_vars: List[Dict[str, str]] = [] + if isinstance(getattr(comp_spec, "compEnvParams", None), list): + for item in comp_spec.compEnvParams: + if isinstance(item, dict): + if "name" in item and "value" in item: + env_vars.append({str(item["name"]): str(item["value"])}) + elif len(item) == 1: # already mapping-like {"KEY": "VAL"} + k, v = next(iter(item.items())) + env_vars.append({str(k): str(v)}) + + # Host filter (basic example) + host_props = HostProperty( + cpu_arch={"equal": "x64"}, + realtime={"equal": False}, + cpu_usage={"less_or_equal": "0.4"}, + mem_size={"greater_or_equal": "1024"}, + energy_efficiency={"greater_or_equal": "0"}, + green={"greater_or_equal": "0"}, + domain_id=DomainIdOperator(equal=zone_id), + ) + + requirements = [ + CustomRequirement( + network=NetworkRequirement( + properties=NetworkProperties(ports=ports, exposePorts=expose_ports) + ) + ), + CustomRequirement( + host=HostRequirement( + node_filter=NodeFilter( + capabilities=[{"host": HostCapability(properties=host_props)}], + properties=None, + ) + ) + ), + ] + + # PUBLICREPO => is_private=False and omit credentials + repo_type = getattr(artefact, "repoType", None) + is_private = bool(repo_type == "PRIVATEREPO") + username = None + password = None + if is_private and artefact.artefactRepoLocation: + u = artefact.artefactRepoLocation.userName + p = artefact.artefactRepoLocation.password + username = u if u else None + password = p if p else None + + node_templates[comp.componentName] = NodeTemplate( + type="tosca.nodes.Container.Application", + isJob=False, + requirements=requirements, + artifacts={ + "application_image": ArtifactModel( + file=image_file, + type="tosca.artifacts.Deployment.Image.Container.Docker", + repository=repository_url, + is_private=is_private, # False for PUBLICREPO + username=username, # None for PUBLICREPO + password=password, # None for PUBLICREPO + ) + }, + interfaces={ + "Standard": { + "create": { + "implementation": "application_image", + "inputs": { + "cliArgs": cli_args, + "envVars": env_vars, + }, + } + } + }, + ) + + # Assemble and dump TOSCA + tosca = TOSCA( + tosca_definitions_version="tosca_simple_yaml_1_3", + description=f"GSMA->TOSCA for {app_model.appMetaData.appName} ({app_model.appId})", + serviceOverlay=False, + node_templates=node_templates, + ) + + tosca_dict = tosca.model_dump(by_alias=True, exclude_none=True) + # Clean requirements lists from None entries + for template in tosca_dict.get("node_templates", {}).values(): + template["requirements"] = [ + {k: v for k, v in req.items() if v is not None} + for req in template.get("requirements", []) + ] + + return yaml.dump(tosca_dict, sort_keys=False) diff --git a/src/sunrise6g_opensdk/edgecloud/adapters/aeros/errors.py b/src/sunrise6g_opensdk/edgecloud/adapters/aeros/errors.py new file mode 100644 index 0000000000000000000000000000000000000000..16beed29c5b29eb73544697a3be65e77379f2599 --- /dev/null +++ b/src/sunrise6g_opensdk/edgecloud/adapters/aeros/errors.py @@ -0,0 +1,35 @@ +""" +Custom aerOS adapter exceptions on top of EdgeCloudPlatformError +""" + +from sunrise6g_opensdk.edgecloud.adapters.errors import EdgeCloudPlatformError + + +class InvalidArgumentError(EdgeCloudPlatformError): + """400 Bad Request""" + + pass + + +class UnauthenticatedError(EdgeCloudPlatformError): + """401 Unauthorized""" + + pass + + +class PermissionDeniedError(EdgeCloudPlatformError): + """403 Forbidden""" + + pass + + +class ResourceNotFoundError(EdgeCloudPlatformError): + """404 Not Found""" + + pass + + +class ServiceUnavailableError(EdgeCloudPlatformError): + """503 Service Unavailable""" + + pass diff --git a/src/sunrise6g_opensdk/edgecloud/adapters/aeros/storageManagement/__init__.py b/src/sunrise6g_opensdk/edgecloud/adapters/aeros/storageManagement/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..22b535e043b0080f8d8b3b97f2fda044dd1dff47 --- /dev/null +++ b/src/sunrise6g_opensdk/edgecloud/adapters/aeros/storageManagement/__init__.py @@ -0,0 +1,6 @@ +""" +This module contains the storage management implementations for aerOS. +""" + +from .inMemoryStorage import InMemoryAppStorage +from .sqlite_storage import SQLiteAppStorage diff --git a/src/sunrise6g_opensdk/edgecloud/adapters/aeros/storageManagement/appStorageManager.py b/src/sunrise6g_opensdk/edgecloud/adapters/aeros/storageManagement/appStorageManager.py new file mode 100644 index 0000000000000000000000000000000000000000..c1cee6576f7b1facb94031d8bbc568dddee193a8 --- /dev/null +++ b/src/sunrise6g_opensdk/edgecloud/adapters/aeros/storageManagement/appStorageManager.py @@ -0,0 +1,186 @@ +""" +# Class: AppStorageManager +# Abstract base class for application storage backends. +# This module defines the interface for managing application storage, +#""" + +from abc import ABC, abstractmethod +from typing import Dict, List, Optional, Union + +from sunrise6g_opensdk.edgecloud.core.camara_schemas import AppInstanceInfo +from sunrise6g_opensdk.edgecloud.core.gsma_schemas import ( + AppInstance, + AppInstanceStatus, + ApplicationModel, + Artefact, +) + + +class AppStorageManager(ABC): + """Abstract base class for application storage backends.""" + + # ------------------------------------------------------------------------ + # aerOS Domain → Zone mapping + # ------------------------------------------------------------------------ + @abstractmethod + def store_zones(self, zones: Dict[str, Dict]) -> None: + """Store or update the aerOS domain → zone info mapping.""" + + @abstractmethod + def list_zones(self) -> List[Dict]: + """Return a list of all stored zone records (values).""" + + @abstractmethod + def resolve_domain_id_by_zone_uuid(self, zone_uuid: str) -> Optional[str]: + """Return the aerOS domain id (key) for a given edgeCloudZoneId (UUID).""" + + # ------------------------------------------------------------------------ + # CAMARA + # ------------------------------------------------------------------------ + + @abstractmethod + def store_app(self, app_id: str, manifest: Dict) -> None: + pass + + @abstractmethod + def get_app(self, app_id: str) -> Optional[Dict]: + pass + + @abstractmethod + def app_exists(self, app_id: str) -> bool: + pass + + @abstractmethod + def list_apps(self) -> List[Dict]: + pass + + @abstractmethod + def delete_app(self, app_id: str) -> None: + pass + + @abstractmethod + def store_deployment(self, app_instance: AppInstanceInfo) -> None: + pass + + @abstractmethod + def get_deployments(self, app_id: Optional[str] = None) -> Dict[str, List[str]]: + pass + + @abstractmethod + def find_deployments( + self, + app_id: Optional[str] = None, + app_instance_id: Optional[str] = None, + region: Optional[str] = None, + ) -> List[AppInstanceInfo]: + pass + + @abstractmethod + def remove_deployment(self, app_instance_id: str) -> Optional[str]: + """Removes the given instance ID and returns the corresponding app_id, if found.""" + pass + + @abstractmethod + def store_stopped_instance(self, app_id: str, app_instance_id: str) -> None: + pass + + @abstractmethod + def get_stopped_instances( + self, app_id: Optional[str] = None + ) -> List[str] | Dict[str, List[str]]: + pass + + @abstractmethod + def remove_stopped_instances(self, app_id: str) -> None: + pass + + # ------------------------------------------------------------------------ + # GSMA + # ------------------------------------------------------------------------ + @abstractmethod + def store_app_gsma(self, app_id: str, model: ApplicationModel) -> None: + """Implement in subclass.""" + raise NotImplementedError + + @abstractmethod + def get_app_gsma(self, app_id: str) -> Optional[ApplicationModel]: + """Implement in subclass.""" + raise NotImplementedError + + @abstractmethod + def list_apps_gsma(self) -> List[ApplicationModel]: + """Implement in subclass.""" + raise NotImplementedError + + @abstractmethod + def delete_app_gsma(self, app_id: str) -> None: + """Implement in subclass.""" + raise NotImplementedError + + @abstractmethod + def store_deployment_gsma( + self, + app_id: str, + inst: AppInstance, + status: Optional[AppInstanceStatus] = None, # optional future use + ) -> None: + """Implement in subclass.""" + raise NotImplementedError + + @abstractmethod + def get_deployments_gsma(self, app_id: Optional[str] = None) -> Dict[str, List[str]]: + """Implement in subclass.""" + raise NotImplementedError + + @abstractmethod + def find_deployments_gsma( + self, + app_id: Optional[str] = None, + app_instance_id: Optional[str] = None, + zone_id: Optional[str] = None, + ) -> List[AppInstance]: + """Implement in subclass.""" + raise NotImplementedError + + @abstractmethod + def remove_deployment_gsma(self, app_instance_id: str) -> Optional[str]: + """Implement in subclass.""" + raise NotImplementedError + + @abstractmethod + def store_stopped_instance_gsma(self, app_id: str, app_instance_id: str) -> None: + """Implement in subclass.""" + raise NotImplementedError + + @abstractmethod + def get_stopped_instances_gsma( + self, app_id: Optional[str] = None + ) -> Union[List[str], Dict[str, List[str]]]: + """Implement in subclass.""" + raise NotImplementedError + + @abstractmethod + def remove_stopped_instances_gsma(self, app_id: str) -> None: + """Implement in subclass.""" + raise NotImplementedError + + # --- GSMA Artefacts --- + @abstractmethod + def store_artefact_gsma(self, artefact: Artefact) -> None: + """Implement in subclass.""" + raise NotImplementedError + + @abstractmethod + def get_artefact_gsma(self, artefact_id: str) -> Optional[Artefact]: + """Implement in subclass.""" + raise NotImplementedError + + @abstractmethod + def list_artefacts_gsma(self) -> List[Artefact]: + """Implement in subclass.""" + raise NotImplementedError + + @abstractmethod + def delete_artefact_gsma(self, artefact_id: str) -> None: + """Implement in subclass.""" + raise NotImplementedError diff --git a/src/sunrise6g_opensdk/edgecloud/adapters/aeros/storageManagement/inMemoryStorage.py b/src/sunrise6g_opensdk/edgecloud/adapters/aeros/storageManagement/inMemoryStorage.py new file mode 100644 index 0000000000000000000000000000000000000000..df36810b757ac6914ba1bc5907f221ab0323136a --- /dev/null +++ b/src/sunrise6g_opensdk/edgecloud/adapters/aeros/storageManagement/inMemoryStorage.py @@ -0,0 +1,347 @@ +""" +Class: InMemoryAppStorage +Process-wide singleton, thread-safe with a single RLock. +Keeps CAMARA and GSMA stores separate to avoid schema confusion. +""" + +from abc import ABCMeta +from threading import RLock +from typing import Dict, List, Optional, Union + +from sunrise6g_opensdk.edgecloud.adapters.aeros import config +from sunrise6g_opensdk.edgecloud.adapters.aeros.storageManagement.appStorageManager import ( + AppStorageManager, +) +from sunrise6g_opensdk.edgecloud.core.camara_schemas import AppInstanceInfo +from sunrise6g_opensdk.edgecloud.core.gsma_schemas import ( + AppInstance, + AppInstanceStatus, + ApplicationModel, + Artefact, +) +from sunrise6g_opensdk.logger import setup_logger + + +class SingletonMeta(ABCMeta): + """Thread-safe Singleton metaclass (process-wide).""" + + _instances: Dict[type, object] = {} + _lock = RLock() + + def __call__(cls, *args, **kwargs): + # Double-checked locking + if cls not in cls._instances: + with cls._lock: + if cls not in cls._instances: + cls._instances[cls] = super().__call__(*args, **kwargs) + return cls._instances[cls] + + +class InMemoryAppStorage(AppStorageManager, metaclass=SingletonMeta): + """ + In-memory implementation of the AppStorageManager interface. + CAMARA and GSMA data are stored in separate namespaces. + """ + + def __init__(self): + if getattr(self, "_initialized", False): + return + + # Always have a logger; gate noisy messages by DEBUG + self.logger = setup_logger() + if config.DEBUG: + self.logger.info("Using InMemoryStorage (singleton)") + + self._lock = RLock() + + # aerOS Domain → Zone mapping + self._zones: Dict[str, Dict] = {} # {aeros_domain_id: camara_zone_dict} + + # CAMARA stores + self._apps: Dict[str, Dict] = {} # app_id -> manifest (CAMARA dict) + self._deployed: Dict[str, List[AppInstanceInfo]] = {} # app_id -> [AppInstanceInfo] + self._stopped: Dict[str, List[str]] = {} # app_id -> [stopped instance ids] + + # GSMA stores + self._apps_gsma: Dict[str, ApplicationModel] = {} # app_id -> ApplicationModel + self._deployed_gsma: Dict[str, List[AppInstance]] = {} # app_id -> [AppInstance] + self._stopped_gsma: Dict[str, List[str]] = {} # app_id -> [stopped instance ids] + + self._artefacts_gsma: Dict[str, Artefact] = {} # artefact_id -> Artefact + + self._initialized = True + + # ------------------------------------------------------------------------ + # Utilities + # ------------------------------------------------------------------------ + def reset(self) -> None: + """Helper for tests to clear global state.""" + with self._lock: + # CAMARA + self._apps.clear() + self._deployed.clear() + self._stopped.clear() + # GSMA + self._apps_gsma.clear() + self._deployed_gsma.clear() + self._stopped_gsma.clear() + + # ------------------------------------------------------------------------ + # aerOS Domain → Zone mapping + # ------------------------------------------------------------------------ + + def store_zones(self, zones: Dict[str, Dict]) -> None: + """ + Directly store a mapping of aerOS domain_id -> zone_info dict. + Example: + { + "urn:ngsi-ld:Domain:Athens": { + "edgeCloudZoneId": "550e8400-e29b-41d4-a716-446655440000", + "edgeCloudZoneName": "Athens", + "edgeCloudProvider": "aeros_dev", + "status": "active", + "geographyDetails": "NOT_USED", + }, + ... + } + """ + with self._lock: + self._zones.update(zones) + + def list_zones(self) -> List[Dict]: + """Return all zone records as a list of dicts.""" + with self._lock: + return [dict(v) for v in self._zones.values()] + + def resolve_domain_id_by_zone_uuid(self, zone_uuid: str) -> Optional[str]: + """ + Given the edgeCloudZoneId (UUID string), return the original aerOS domain id. + Performs a simple scan — fine for small to medium sets. + """ + with self._lock: + for domain_id, zone in self._zones.items(): + if zone.get("edgeCloudZoneId") == zone_uuid: + return domain_id + return None + + # ------------------------------------------------------------------------ + # CAMARA + # ------------------------------------------------------------------------ + def store_app(self, app_id: str, manifest: Dict) -> None: + with self._lock: + self._apps[app_id] = manifest + + def get_app(self, app_id: str) -> Optional[Dict]: + with self._lock: + return self._apps.get(app_id) + + def app_exists(self, app_id: str) -> bool: + with self._lock: + return app_id in self._apps + + def list_apps(self) -> List[Dict]: + with self._lock: + # shallow copies to avoid external mutation of nested dicts + return [dict(m) for m in self._apps.values()] + + def delete_app(self, app_id: str) -> None: + with self._lock: + self._apps.pop(app_id, None) + + def store_deployment(self, app_instance: AppInstanceInfo) -> None: + with self._lock: + # Ensure the key is a plain string + aid = getattr(app_instance.appId, "root", str(app_instance.appId)) + self._deployed.setdefault(aid, []).append(app_instance) + + def get_deployments(self, app_id: Optional[str] = None) -> Dict[str, List[str]]: + with self._lock: + if app_id: + ids = [str(i.appInstanceId) for i in self._deployed.get(app_id, [])] + return {app_id: ids} + return { + aid: [str(i.appInstanceId) for i in insts] for aid, insts in self._deployed.items() + } + + def find_deployments( + self, + app_id: Optional[str] = None, + app_instance_id: Optional[str] = None, + region: Optional[str] = None, + ) -> List[AppInstanceInfo]: + with self._lock: + # Fast path by instance id + if app_instance_id: + for insts in self._deployed.values(): + for inst in insts: + if str(inst.appInstanceId.root) == app_instance_id: + if app_id and str(inst.appId) != app_id: + return [] + if region is not None and getattr(inst, "region", None) != region: + return [] + return [inst] + return [] + + results: List[AppInstanceInfo] = [] + for aid, insts in self._deployed.items(): + if app_id and aid != app_id: + continue + for inst in insts: + if region is not None and getattr(inst, "region", None) != region: + continue + results.append(inst) + return results + + def remove_deployment(self, app_instance_id: str) -> Optional[str]: + with self._lock: + for aid, insts in list(self._deployed.items()): + for idx, inst in enumerate(insts): + # Compare using the instance id string + inst_id = getattr(inst.appInstanceId, "root", str(inst.appInstanceId)) + if inst_id == app_instance_id: + insts.pop(idx) + if not insts: + self._deployed.pop(aid, None) + # Return a plain string app_id + aid_str = getattr(aid, "root", str(aid)) + return aid_str + return None + + def store_stopped_instance(self, app_id: str, app_instance_id: str) -> None: + with self._lock: + lst = self._stopped.setdefault(app_id, []) + if app_instance_id not in lst: + lst.append(app_instance_id) + + def get_stopped_instances( + self, app_id: Optional[str] = None + ) -> Union[List[str], Dict[str, List[str]]]: + with self._lock: + if app_id: + return list(self._stopped.get(app_id, [])) + return {aid: list(ids) for aid, ids in self._stopped.items()} + + def remove_stopped_instances(self, app_id: str) -> None: + with self._lock: + self._stopped.pop(app_id, None) + + # ------------------------------------------------------------------------ + # GSMA + # ------------------------------------------------------------------------ + def store_app_gsma(self, app_id: str, model: ApplicationModel) -> None: + with self._lock: + self._apps_gsma[app_id] = model + + def get_app_gsma(self, app_id: str) -> Optional[ApplicationModel]: + with self._lock: + return self._apps_gsma.get(app_id) + + def list_apps_gsma(self) -> List[ApplicationModel]: + with self._lock: + return list(self._apps_gsma.values()) + + def delete_app_gsma(self, app_id: str) -> None: + with self._lock: + self._apps_gsma.pop(app_id, None) + + def store_deployment_gsma( + self, + app_id: str, + inst: AppInstance, + status: Optional[AppInstanceStatus] = None, # not persisted yet + ) -> None: + with self._lock: + self._deployed_gsma.setdefault(app_id, []).append(inst) + # If you later want to persist status per instance, keep a side map: + # self._status_gsma[inst.appInstIdentifier] = status + + def get_deployments_gsma(self, app_id: Optional[str] = None) -> Dict[str, List[str]]: + with self._lock: + if app_id: + ids = [str(i.appInstIdentifier) for i in self._deployed_gsma.get(app_id, [])] + return {app_id: ids} + return { + aid: [str(i.appInstIdentifier) for i in insts] + for aid, insts in self._deployed_gsma.items() + } + + def find_deployments_gsma( + self, + app_id: Optional[str] = None, + app_instance_id: Optional[str] = None, + zone_id: Optional[str] = None, + ) -> List[AppInstance]: + with self._lock: + # Limit the search space if app_id is provided + iter_lists = ( + [self._deployed_gsma.get(app_id, [])] if app_id else self._deployed_gsma.values() + ) + + # Fast path: instance id provided + if app_instance_id: + target_id = str(app_instance_id) + for insts in iter_lists: + for inst in insts: + if str(inst.appInstIdentifier) != target_id: + continue + if zone_id is not None and inst.zoneId != zone_id: + continue + return [inst] + return [] + + # General filtering + results: List[AppInstance] = [] + for insts in iter_lists: + for inst in insts: + if zone_id is not None and inst.zoneId != zone_id: + continue + results.append(inst) + return results + + def remove_deployment_gsma(self, app_instance_id: str) -> Optional[str]: + with self._lock: + for aid, insts in list(self._deployed_gsma.items()): + for idx, inst in enumerate(insts): + if str(inst.appInstIdentifier) == app_instance_id: + insts.pop(idx) + if not insts: + self._deployed_gsma.pop(aid, None) + return aid + return None + + def store_stopped_instance_gsma(self, app_id: str, app_instance_id: str) -> None: + with self._lock: + lst = self._stopped_gsma.setdefault(app_id, []) + if app_instance_id not in lst: + lst.append(app_instance_id) + + def get_stopped_instances_gsma( + self, app_id: Optional[str] = None + ) -> Union[List[str], Dict[str, List[str]]]: + with self._lock: + if app_id: + return list(self._stopped_gsma.get(app_id, [])) + return {aid: list(ids) for aid, ids in self._stopped_gsma.items()} + + def remove_stopped_instances_gsma(self, app_id: str) -> None: + with self._lock: + self._stopped_gsma.pop(app_id, None) + + # ------------------------------------------------------------------------ + # GSMA Artefacts + # ------------------------------------------------------------------------ + def store_artefact_gsma(self, artefact: Artefact) -> None: + with self._lock: + self._artefacts_gsma[artefact.artefactId] = artefact + + def get_artefact_gsma(self, artefact_id: str) -> Optional[Artefact]: + with self._lock: + return self._artefacts_gsma.get(artefact_id) + + def list_artefacts_gsma(self) -> List[Artefact]: + with self._lock: + return list(self._artefacts_gsma.values()) + + def delete_artefact_gsma(self, artefact_id: str) -> None: + with self._lock: + self._artefacts_gsma.pop(artefact_id, None) diff --git a/src/sunrise6g_opensdk/edgecloud/adapters/aeros/storageManagement/sqlite_storage.py b/src/sunrise6g_opensdk/edgecloud/adapters/aeros/storageManagement/sqlite_storage.py new file mode 100644 index 0000000000000000000000000000000000000000..ef1bf9b34454fc1c65421ed12fa2383c633722dc --- /dev/null +++ b/src/sunrise6g_opensdk/edgecloud/adapters/aeros/storageManagement/sqlite_storage.py @@ -0,0 +1,246 @@ +""" +SQLite storage implementation +""" + +import json +import sqlite3 +from functools import wraps +from typing import Dict, List, Optional, Union + +from sunrise6g_opensdk.edgecloud.adapters.aeros import config +from sunrise6g_opensdk.edgecloud.adapters.aeros.storageManagement.appStorageManager import ( + AppStorageManager, +) +from sunrise6g_opensdk.edgecloud.core.camara_schemas import ( + AppId, + AppInstanceId, + AppInstanceInfo, + AppInstanceName, + AppProvider, + EdgeCloudZoneId, + Status, +) +from sunrise6g_opensdk.logger import setup_logger + +decorator_logger = setup_logger() + + +def debug_log(msg: str): + """ + Decorator that logs the given message if config.DEBUG is True. + """ + + def decorator(func): + + @wraps(func) + def wrapper(*args, **kwargs): + if config.DEBUG: + decorator_logger.debug("[DEBUG] %s", msg) + return func(*args, **kwargs) + + return wrapper + + return decorator + + +class SQLiteAppStorage(AppStorageManager): + """ + SQLite storage implementation + """ + + @debug_log("Initializing SQLITE storage manager") + def __init__(self, db_path: str = "app_storage.db"): + if config.DEBUG: + self.logger = setup_logger() + self.logger.info("DB Path: %s", db_path) + self.conn = sqlite3.connect(db_path, check_same_thread=False) + self._init_schema() + + def _init_schema(self): + if config.DEBUG: + self.logger.info("Initializing db schema") + cursor = self.conn.cursor() + cursor.execute( + """ + CREATE TABLE IF NOT EXISTS apps ( + app_id TEXT PRIMARY KEY, + manifest TEXT + ); + """ + ) + cursor.execute( + """ + CREATE TABLE IF NOT EXISTS deployments ( + app_instance_id TEXT PRIMARY KEY, + app_id TEXT, + name TEXT, + app_provider TEXT, + status TEXT, + component_endpoint_info TEXT, + kubernetes_cluster_ref TEXT, + edge_cloud_zone_id TEXT + ); + """ + ) + cursor.execute( + """ + CREATE TABLE IF NOT EXISTS stopped ( + app_id TEXT, + app_instance_id TEXT + ); + """ + ) + self.conn.commit() + + @debug_log("In SQLITE store_app method ") + def store_app(self, app_id: str, manifest: Dict) -> None: + self.conn.execute( + "INSERT OR REPLACE INTO apps (app_id, manifest) VALUES (?, ?);", + (app_id, json.dumps(manifest)), + ) + self.conn.commit() + + @debug_log("In SQLITE get_app method ") + def get_app(self, app_id: str) -> Optional[Dict]: + row = self.conn.execute("SELECT manifest FROM apps WHERE app_id = ?;", (app_id,)).fetchone() + return json.loads(row[0]) if row else None + + @debug_log("In SQLITE app_exists method ") + def app_exists(self, app_id: str) -> bool: + row = self.conn.execute("SELECT 1 FROM apps WHERE app_id = ?;", (app_id,)).fetchone() + return row is not None + + @debug_log("In SQLITE list_apps method ") + def list_apps(self) -> List[Dict]: + rows = self.conn.execute("SELECT manifest FROM apps;").fetchall() + return [json.loads(row[0]) for row in rows] + + @debug_log("In SQLITE delete_app method ") + def delete_app(self, app_id: str) -> None: + self.conn.execute("DELETE FROM apps WHERE app_id = ?;", (app_id,)) + self.conn.commit() + + @debug_log("In SQLITE store_deployment method ") + def store_deployment(self, app_instance: AppInstanceInfo) -> None: + resolved_status = ( + str(app_instance.status.value) + if hasattr(app_instance.status, "value") + else str(app_instance.status) if app_instance.status else "unknown" + ) + self.logger.info("Resolved status for DB insert: %s", resolved_status) + + self.conn.execute( + """ + INSERT OR REPLACE INTO deployments ( + app_instance_id, app_id, name, app_provider, status, + component_endpoint_info, kubernetes_cluster_ref, edge_cloud_zone_id + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?); + """, + ( + str(app_instance.appInstanceId), + str(app_instance.appId), + str(app_instance.name.root), + str(app_instance.appProvider.root), + ( + str(app_instance.status.value) + if hasattr(app_instance.status, "value") + else str(app_instance.status) if app_instance.status else "unknown" + ), + ( + json.dumps(app_instance.componentEndpointInfo) + if app_instance.componentEndpointInfo + else None + ), + app_instance.kubernetesClusterRef, + str(app_instance.edgeCloudZoneId.root), + ), + ) + + self.conn.commit() + + @debug_log("In SQLITE get_deployments method ") + def get_deployments(self, app_id: Optional[str] = None) -> Dict[str, List[str]]: + if app_id: + rows = self.conn.execute( + "SELECT app_id, app_instance_id FROM deployments WHERE app_id = ?;", (app_id,) + ).fetchall() + else: + rows = self.conn.execute("SELECT app_id, app_instance_id FROM deployments;").fetchall() + + result: Dict[str, List[str]] = {} + for app_id_val, instance_id in rows: + result.setdefault(app_id_val, []).append(instance_id) + return result + + @debug_log("In SQLITE find_deployments method ") + def find_deployments( + self, + app_id: Optional[str] = None, + app_instance_id: Optional[str] = None, + region: Optional[str] = None, + ) -> List[AppInstanceInfo]: + query = "SELECT * FROM deployments WHERE 1=1" + params = [] + if app_id: + query += " AND app_id = ?" + params.append(app_id) + if app_instance_id: + query += " AND app_instance_id = ?" + params.append(app_instance_id) + + rows = self.conn.execute(query, params).fetchall() + + result = [] + for row in rows: + result.append( + AppInstanceInfo( + appInstanceId=AppInstanceId(row[0]), + appId=AppId(row[1]), + name=AppInstanceName(row[2]), + appProvider=AppProvider(row[3]), + status=Status(row[4]) if row[4] else Status.unknown, + componentEndpointInfo=json.loads(row[5]) if row[5] else None, + kubernetesClusterRef=row[6], + edgeCloudZoneId=EdgeCloudZoneId(row[7]), + ) + ) + + return result + + @debug_log("In SQLITE remove_deployments method ") + def remove_deployment(self, app_instance_id: str) -> Optional[str]: + row = self.conn.execute( + "SELECT app_id FROM deployments WHERE app_instance_id = ?;", (app_instance_id,) + ).fetchone() + self.conn.execute("DELETE FROM deployments WHERE app_instance_id = ?;", (app_instance_id,)) + self.conn.commit() + return row[0] if row else None + + @debug_log("In SQLITE store_stopped_instance method ") + def store_stopped_instance(self, app_id: str, app_instance_id: str) -> None: + self.conn.execute( + "INSERT INTO stopped (app_id, app_instance_id) VALUES (?, ?);", + (app_id, app_instance_id), + ) + self.conn.commit() + + @debug_log("In SQLITE get_Stopped_instances method ") + def get_stopped_instances( + self, app_id: Optional[str] = None + ) -> Union[List[str], Dict[str, List[str]]]: + if app_id: + rows = self.conn.execute( + "SELECT app_instance_id FROM stopped WHERE app_id = ?;", (app_id,) + ).fetchall() + return [r[0] for r in rows] + else: + rows = self.conn.execute("SELECT app_id, app_instance_id FROM stopped;").fetchall() + result: Dict[str, List[str]] = {} + for aid, iid in rows: + result.setdefault(aid, []).append(iid) + return result + + @debug_log("In SQLITE remove_stopped_instances method ") + def remove_stopped_instances(self, app_id: str) -> None: + self.conn.execute("DELETE FROM stopped WHERE app_id = ?;", (app_id,)) + self.conn.commit() diff --git a/src/sunrise6g_opensdk/edgecloud/adapters/aeros/utils.py b/src/sunrise6g_opensdk/edgecloud/adapters/aeros/utils.py index d2424e33d9a1aa8d384604623146aada83fdcf7e..281e3545b061a05dc4d92af8e74d3297c9bb891a 100644 --- a/src/sunrise6g_opensdk/edgecloud/adapters/aeros/utils.py +++ b/src/sunrise6g_opensdk/edgecloud/adapters/aeros/utils.py @@ -6,38 +6,172 @@ # - Andreas Sakellaropoulos (asakellaropoulos@iit.demokritos.gr) ## """ -Docstring +aerOS help methods """ +import string +import uuid + from requests.exceptions import HTTPError, RequestException, Timeout import sunrise6g_opensdk.edgecloud.adapters.aeros.config as config +import sunrise6g_opensdk.edgecloud.adapters.aeros.errors as errors from sunrise6g_opensdk.logger import setup_logger +_HEX = "0123456789abcdef" +_ALLOWED = set( + string.ascii_letters + string.digits +) # no underscore here; underscore is always escaped +_PREFIX = "A0_" # ensures name starts with a letter; stripped during decode + + +def encode_app_instance_name(original: str, *, max_len: int = 64) -> str: + """ + aerOS to CAMARA AppInstanceName encoder. + Reversibly encode `original` into a string matching ^[A-Za-z][A-Za-z0-9_]{1,63}$. + Uses underscore + two hex digits to escape any non [A-Za-z0-9] chars, including '_' itself. + If the encoded result would exceed `max_len`, raise ValueError (reversibility would be lost otherwise). + """ + out = [] + for ch in original: + if ch in _ALLOWED: + out.append(ch) + elif ch == "_": + out.append("_5f") + else: + # escape any other byte as _hh (lowercase hex) + out.append("_" + format(ord(ch), "02x")) + + enc = "".join(out) + + # must start with a letter + if not enc or enc[0] not in string.ascii_letters: + enc = _PREFIX + enc + + if len(enc) > max_len: + raise ValueError( + f"Encoded name exceeds {max_len} chars; cannot keep reversibility without external mapping." + ) + return enc + + +def decode_app_instance_name(encoded: str) -> str: + """ + CAMARA AppInstanceName to aerOS original app_id decoder. + Reverse of encode_app_instance_name. Restores the exact original string. + """ + s = encoded + if s.startswith(_PREFIX): + s = s[len(_PREFIX) :] + + # walk and decode _hh sequences; underscores never appear unescaped in the encoding + i = 0 + out = [] + while i < len(s): + ch = s[i] + if ch != "_": + out.append(ch) + i += 1 + continue + + # expect two hex digits after underscore + if i + 2 >= len(s): + raise ValueError("Invalid escape at end of string.") + h1 = s[i + 1].lower() + h2 = s[i + 2].lower() + if h1 not in _HEX or h2 not in _HEX: + raise ValueError(f"Invalid escape sequence: _{h1}{h2}") + code = int(h1 + h2, 16) + out.append(chr(code)) + i += 3 + + return "".join(out) + + +def urn_to_uuid(urn: str) -> uuid.UUID: + """Convert a (ngsi-ld) URN string to a deterministic UUID.""" + return uuid.uuid5(uuid.NAMESPACE_URL, urn) + + +def map_aeros_service_status_to_gsma(status: str) -> str: + """ + Map aerOS service lifecycle states to GSMA-compliant status values. + + aerOS → GSMA + DEPLOYING → PENDING + DESTROYING → TERMINATING + DEPLOYED → DEPLOYED + FINISHED → No_Match + No_Match → READY + urn:ngsi-ld:null → No Match + """ + mapping = { + "DEPLOYING": "PENDING", + "DESTROYING": "TERMINATING", + "DEPLOYED": "DEPLOYED", + "FINISHED": "READY", + # "urn:ngsi-ld:null": "READY", + } + if not status: + return "FAILED" + return mapping.get(status.strip().upper(), "FAILED") + def catch_requests_exceptions(func): """ - Docstring + Decorator to catch and translate requests exceptions into custom app errors. """ logger = setup_logger(__name__, is_debug=True, file_name=config.LOG_FILE) def wrapper(*args, **kwargs): try: - result = func(*args, **kwargs) - return result + return func(*args, **kwargs) + except HTTPError as e: - logger.info("4xx or 5xx: %s \n", {e}) - return None # raise our custom exception or log, etc. - except ConnectionError as e: - logger.info( - "Raised for connection-related issues (e.g., DNS resolution failure, network issues): %s \n", - {e}, - ) - return None # raise our custom exception or log, etc. + response = getattr(e, "response", None) + status_code = getattr(response, "status_code", None) + logger.error("HTTPError occurred: %s", e) + + if status_code == 401: + raise errors.UnauthenticatedError("Unauthorized access") from e + elif status_code == 403: + raise errors.PermissionDeniedError("Forbidden access") from e + elif status_code == 404: + raise errors.ResourceNotFoundError("Resource not found") from e + elif status_code == 400: + raise errors.InvalidArgumentError("Bad request") from e + elif status_code == 503: + raise errors.ServiceUnavailableError("Service unavailable") from e + + raise errors.EdgeCloudPlatformError(f"Unhandled HTTP error: {status_code}") from e + except Timeout as e: - logger.info("Timeout occured: %s \n", {e}) - return None # raise our custom exception or log, etc. + logger.warning("Timeout occurred: %s", e) + raise errors.ServiceUnavailableError("Request timed out") from e + + except ConnectionError as e: + logger.warning("Connection error (e.g., DNS): %s", e) + raise errors.ServiceUnavailableError("Connection issue") from e + except RequestException as e: - logger.info("Request failed: %s \n", {e}) - return None # raise our custom exception or log, etc. + # Catch other unclassified request exceptions (non-HTTP) + logger.error("Request failed: %s", str(e)) + + if e.response is not None: + logger.error("Status Code: %s", e.response.status_code) + logger.error("Response Body (raw): %s", e.response.text) + + try: + json_data = e.response.json() + logger.debug("Parsed JSON response: %s", json_data) + except ValueError: + logger.warning("Response body is not valid JSON.") + + if e.request is not None: + logger.error("Request URL: %s", e.request.url) + logger.error("Request Method: %s", e.request.method) + logger.error("Request Headers: %s", e.request.headers) + logger.error("Request Body: %s", e.request.body) + + raise errors.EdgeCloudPlatformError("Unhandled request error") from e return wrapper diff --git a/src/sunrise6g_opensdk/edgecloud/adapters/i2edge/client.py b/src/sunrise6g_opensdk/edgecloud/adapters/i2edge/client.py index cce2e7371d1b85214d259a5f6efa15f6b1b3cdbb..20e4d18d46068022cb17a96ddb524b2bdd733004 100644 --- a/src/sunrise6g_opensdk/edgecloud/adapters/i2edge/client.py +++ b/src/sunrise6g_opensdk/edgecloud/adapters/i2edge/client.py @@ -801,7 +801,7 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): try: validated_data = gsma_schemas.ZoneRegisteredData.model_validate(mapped) except ValidationError as e: - raise ValueError(f"Invalid schema: {e}") + raise ValueError(f"Invalid schema: {e}") from e return build_custom_http_response( status_code=200, content=validated_data.model_dump(), diff --git a/tests/edgecloud/test_config_camara.py b/tests/edgecloud/test_config_camara.py index 6ce39c296215859b52f094cf9dd6087e5c1d4514..945545692cc2f206ecc145205653bb166601429a 100644 --- a/tests/edgecloud/test_config_camara.py +++ b/tests/edgecloud/test_config_camara.py @@ -68,14 +68,14 @@ CONFIG = { }, "aeros": { # Basic identifiers - "ZONE_ID": "", - "APP_ID": "", + "ZONE_ID": "8a4d95e8-8550-5664-8c67-b6c0c602f9be", + "APP_ID": "aeros-app-1", # CAMARA onboard_app payload "APP_ONBOARD_MANIFEST": { - "appId": "", - "name": "aeros-SDK-app", + "appId": "aeros-app-1", + "name": "aeros_SDK_app", "version": "1.0.0", - "appProvider": "aeros", + "appProvider": "aerOS_SDK", "packageType": "CONTAINER", "appRepo": { "type": "PUBLICREPO", @@ -113,11 +113,11 @@ CONFIG = { }, # CAMARA deploy_app payload "APP_DEPLOY_PAYLOAD": { - "appId": "", + "appId": "aeros-app-1", "appZones": [ { "EdgeCloudZone": { - "edgeCloudZoneId": "", + "edgeCloudZoneId": "8a4d95e8-8550-5664-8c67-b6c0c602f9be", "edgeCloudZoneName": "aeros-zone-1", "edgeCloudZoneStatus": "active", "edgeCloudProvider": "NCSRD", diff --git a/tests/edgecloud/test_config_gsma.py b/tests/edgecloud/test_config_gsma.py index 4cb97dffde930dfb197d5ebef053dd67768ba025..14a462197a4513992132d4ad05a5b1fc537bc402 100644 --- a/tests/edgecloud/test_config_gsma.py +++ b/tests/edgecloud/test_config_gsma.py @@ -159,7 +159,119 @@ CONFIG = { }, }, "aeros": { - # PLACEHOLDER + "ZONE_ID": "urn:ngsi-ld:Domain:ncsrd01", + "ARTEFACT_ID": "artefact-nginx-001", + "ARTEFACT_NAME": "aeros-component", + "REPO_NAME": "dockerhub", + "REPO_TYPE": "PUBLICREPO", + "REPO_URL": "docker.io/library/nginx:stable", + "APP_ONBOARD_MANIFEST_GSMA": { + "appId": "aeros-sdk-app", + "appProviderId": "aeros-sdk-provider", + "appDeploymentZones": ["urn:ngsi-ld:Domain:ncsrd01"], + "appMetaData": { + "appName": "aeros_SDK_app", + "version": "string", + "appDescription": "test aeros sdk app", + "mobilitySupport": False, + "accessToken": "MfxADOjxDgBhMrqmBeG8XdQFLp2XviG3cZ_LM7uQKc9b", + "category": "IOT", + }, + "appQoSProfile": { + "latencyConstraints": "NONE", + "bandwidthRequired": 1, + "multiUserClients": "APP_TYPE_SINGLE_USER", + "noOfUsersPerAppInst": 1, + "appProvisioning": True, + }, + "appComponentSpecs": [ + { + "serviceNameNB": "gsma-deployed-app-service-nb", + "serviceNameEW": "gsma-deployed-app-service-ew", + "componentName": "nginx-component", + "artefactId": "artefact-nginx-001", + } + ], + "appStatusCallbackLink": "string", + "edgeAppFQDN": "string", + }, + "APP_DEPLOY_PAYLOAD_GSMA": { + "appId": "aeros-sdk-app", + "appVersion": "1.0.0", + "appProviderId": "apps-sdk-deployer", + "zoneInfo": { + "zoneId": "urn:ngsi-ld:Domain:ncsrd01", + "flavourId": "FLAVOUR_BASIC", + "resourceConsumption": "RESERVED_RES_AVOID", + "resPool": "RESPOOL_DEFAULT", + }, + "appInstCallbackLink": "string", + }, + "PATCH_ONBOARDED_APP_GSMA": { + "appUpdQoSProfile": { + "latencyConstraints": "NONE", + "bandwidthRequired": 1, + "mobilitySupport": False, + "multiUserClients": "APP_TYPE_SINGLE_USER", + "noOfUsersPerAppInst": 1, + "appProvisioning": True, + }, + "appComponentSpecs": [ + { + "serviceNameNB": "gsma-deployed-app-service-nb", + "serviceNameEW": "gsma-deployed-app-service-ew", + "componentName": "nginx-component", + "artefactId": "artefact-nginx-001", + }, + { + "serviceNameNB": "JCjR0Lc3J0sm2PcItECdbHXtpCLQCfq3B", + "serviceNameEW": "N8KBAdqT8L_sWOxeFZs3XYn6oykTTFHLiPKOS7kdYbw", + "componentName": "9aCfCEDe2Dv0Peg", + "artefactId": "9c9143f0-f44f-49df-939e-1e8b891ba8f5", + }, + { + "serviceNameNB": "RIfXlfU9cDeLnrOBYzz9LJGdAjwPRp_3Mjp0Wq_RDlQiAPyXm", + "serviceNameEW": "31y8sCwvvyNCXfwtLhwJw6hoblG7ZcFzEjyFdAnzq7M8cxiOtDik0", + "componentName": "3kTa4zKEX", + "artefactId": "9c9143f0-f44f-49df-939e-1e8b891ba8f5", + }, + ], + }, + "ARTEFACT_PAYLOAD_GSMA": { + "artefactId": "artefact-nginx-001", + "appProviderId": "ncsrd-provider", + "artefactName": "nginx-web-server", + "artefactVersionInfo": "1.0.0", + "artefactDescription": "Containerized Nginx Web Server", + "artefactVirtType": "CONTAINER_TYPE", + "artefactFileName": "nginx-web-server-1.0.0.tgz", + "artefactFileFormat": "TARGZ", + "artefactDescriptorType": "COMPONENTSPEC", + "repoType": "PUBLICREPO", + "artefactRepoLocation": { + "repoURL": "docker.io/library/nginx:stable", + "userName": "", + "password": "", + "token": "", + }, + "artefactFile": "", + "componentSpec": [ + { + "componentName": "nginx-component", + "images": ["docker.io/library/nginx:stable"], + "numOfInstances": 1, + "restartPolicy": "Always", + "commandLineParams": {}, + "exposedInterfaces": [{"name": "http-api", "protocol": "TCP", "port": 8080}], + "computeResourceProfile": {"cpu": "2", "memory": "4Gi"}, + "compEnvParams": [{"name": "TEST_ENV", "value": "TEST_VALUE_ENV"}], + "deploymentConfig": {"replicaStrategy": "RollingUpdate", "maxUnavailable": 1}, + "persistentVolumes": [ + {"name": "NOT_USE", "mountPath": "NOT_USED", "size": "NOT_USED"} + ], + } + ], + }, }, "kubernetes": { # PLACEHOLDER diff --git a/tests/edgecloud/test_e2e_gsma.py b/tests/edgecloud/test_e2e_gsma.py index d2c361cee1e0afe70f3802f0a62959dffb1e1544..903fe21b8ed990e9884b2776f31369ff55cbf0bc 100644 --- a/tests/edgecloud/test_e2e_gsma.py +++ b/tests/edgecloud/test_e2e_gsma.py @@ -28,6 +28,9 @@ import pytest from requests import Response from sunrise6g_opensdk.common.sdk import Sdk as sdkclient +from sunrise6g_opensdk.edgecloud.adapters.aeros.client import ( + EdgeApplicationManager as aerosClient, +) from sunrise6g_opensdk.edgecloud.adapters.errors import EdgeCloudPlatformError from sunrise6g_opensdk.edgecloud.adapters.i2edge.client import ( EdgeApplicationManager as I2EdgeClient, @@ -78,6 +81,11 @@ def test_config_gsma_compliance(edgecloud_client): patch_payload = config["PATCH_ONBOARDED_APP_GSMA"] gsma_schemas.PatchOnboardedAppGSMA(**patch_payload) + # Validate ARTEFACT creation payload is GSMA-compliant + if "ARTEFACT_PAYLOAD_GSMA" in config: + artefact_payload = config["ARTEFACT_PAYLOAD_GSMA"] + gsma_schemas.Artefact(**artefact_payload) + except Exception as e: pytest.fail(f"Configuration is not GSMA-compliant for {edgecloud_client.client_name}: {e}") @@ -173,6 +181,19 @@ def test_artefact_methods_gsma(edgecloud_client): pytest.fail(f"Artefact creation failed: {e}") +@pytest.mark.parametrize("edgecloud_client", test_cases, ids=id_func, indirect=True) +def test_artefact_create_gsma(edgecloud_client): + config = CONFIG[edgecloud_client.client_name] + if isinstance(edgecloud_client, aerosClient): + try: + response = edgecloud_client.create_artefact_gsma( + request_body=config["ARTEFACT_PAYLOAD_GSMA"] + ) + assert response.status_code == 201 + except EdgeCloudPlatformError as e: + pytest.fail(f"Artefact creation failed: {e}") + + @pytest.mark.parametrize("edgecloud_client", test_cases, ids=id_func, indirect=True) def test_get_artefact_gsma(edgecloud_client): config = CONFIG[edgecloud_client.client_name] @@ -204,7 +225,7 @@ def test_onboard_app_gsma(edgecloud_client): try: response = edgecloud_client.onboard_app_gsma(config["APP_ONBOARD_MANIFEST_GSMA"]) assert isinstance(response, Response) - assert response.status_code == 200 + assert response.status_code == 201 except EdgeCloudPlatformError as e: pytest.fail(f"App onboarding failed: {e}") @@ -286,7 +307,7 @@ def test_get_all_deployed_apps_gsma(edgecloud_client): validated_instances = [] for instance_data in instances_data: - validated_instance = gsma_schemas.ZoneIdentifier(**instance_data) + validated_instance = gsma_schemas.AppInstance(**instance_data) validated_instances.append(validated_instance) except EdgeCloudPlatformError as e: @@ -327,6 +348,11 @@ def test_undeploy_app_gsma(edgecloud_client, app_instance_id_gsma): pytest.fail(f"App undeployment failed: {e}") +@pytest.mark.parametrize("edgecloud_client", test_cases, ids=id_func, indirect=True) +def test_timer_wait_10_seconds_2(edgecloud_client): + time.sleep(10) + + @pytest.mark.parametrize("edgecloud_client", test_cases, ids=id_func, indirect=True) def test_patch_onboarded_app_gsma(edgecloud_client): config = CONFIG[edgecloud_client.client_name] @@ -347,7 +373,7 @@ def test_delete_onboarded_app_gsma(edgecloud_client): app_id = config["APP_ONBOARD_MANIFEST_GSMA"]["appId"] response = edgecloud_client.delete_onboarded_app_gsma(app_id) assert isinstance(response, Response) - assert response.status_code == 200 + assert response.status_code == 204 except EdgeCloudPlatformError as e: pytest.fail(f"App onboarding deletion failed: {e}")