diff --git a/pyproject.toml b/pyproject.toml index 319623b68f9a32be912302ef4fc710afb3cfa6cb..e626c343f9630ac5eeb8113253f3aaf8c26eb0f3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "sunrise6g-opensdk" -version = "1.0.7" +version = "1.0.11" description = "Open source SDK to abstract CAMARA/GSMA Transformation Functions (TFs) for Edge Cloud platforms, 5G network cores and Open RAN solutions." keywords = [ "Federation", diff --git a/src/sunrise6g_opensdk/edgecloud/adapters/aeros/client.py b/src/sunrise6g_opensdk/edgecloud/adapters/aeros/client.py index c287136fb21ca4834a4e05989e20f7678a2ebaac..cd85cab2aded3bdaa6e910de69e58ee348a4c440 100644 --- a/src/sunrise6g_opensdk/edgecloud/adapters/aeros/client.py +++ b/src/sunrise6g_opensdk/edgecloud/adapters/aeros/client.py @@ -6,32 +6,52 @@ # - Andreas Sakellaropoulos (asakellaropoulos@iit.demokritos.gr) ## import uuid +import json from typing import Any, Dict, List, Optional - -import yaml +from collections import defaultdict +from pydantic import ValidationError from requests import Response from sunrise6g_opensdk.edgecloud.adapters.aeros import config +from sunrise6g_opensdk.edgecloud.adapters.aeros.utils import ( + urn_to_uuid, encode_app_instance_name, map_aeros_service_status_to_gsma) from sunrise6g_opensdk.edgecloud.adapters.aeros.continuum_client import ContinuumClient +from sunrise6g_opensdk.edgecloud.adapters.aeros.storageManagement import inMemoryStorage +from sunrise6g_opensdk.edgecloud.adapters.aeros.converters import ( + camara2aeros_converter, gsma2aeros_converter, aeros2gsma_zone_details) +from sunrise6g_opensdk.edgecloud.adapters.aeros.storageManagement.appStorageManager import ( + AppStorageManager, ) from sunrise6g_opensdk.edgecloud.adapters.errors import EdgeCloudPlatformError -from sunrise6g_opensdk.edgecloud.core.edgecloud_interface import ( - EdgeCloudManagementInterface, +from sunrise6g_opensdk.edgecloud.adapters.aeros.errors import ( + ResourceNotFoundError, + InvalidArgumentError, ) +from sunrise6g_opensdk.edgecloud.core.edgecloud_interface import ( + EdgeCloudManagementInterface, ) +from sunrise6g_opensdk.edgecloud.core.utils import build_custom_http_response +from sunrise6g_opensdk.edgecloud.core import camara_schemas, gsma_schemas from sunrise6g_opensdk.logger import setup_logger class EdgeApplicationManager(EdgeCloudManagementInterface): """ - aerOS Continuum Client - FIXME: Handle None responses from continuum client + aerOS Edge Application Manager Adapter implementing CAMARA and GSMA APIs. """ - def __init__(self, base_url: str, **kwargs): + def __init__(self, + base_url: str, + storage: Optional[AppStorageManager] = None, + **kwargs): + ''' + storage can + ''' self.base_url = base_url - self.logger = setup_logger(__name__, is_debug=True, file_name=config.LOG_FILE) - self._app_store: Dict[str, Dict] = {} - self._deployed_services: Dict[str, List[str]] = {} - self._stopped_services: Dict[str, List[str]] = {} + self.logger = setup_logger(__name__, + is_debug=True, + file_name=config.LOG_FILE) + self.content_type_gsma = "application/json" + self.encoding_gsma = "utf-8" + self.storage = storage or inMemoryStorage.InMemoryAppStorage() # Overwrite config values if provided via kwargs if "aerOS_API_URL" in kwargs: @@ -48,32 +68,261 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): if not config.aerOS_HLO_TOKEN: raise ValueError("Missing 'aerOS_HLO_TOKEN'") - def onboard_app(self, app_manifest: Dict) -> Dict: + # ######################################################################## + # CAMARA EDGE CLOUD MANAGEMENT API + # ######################################################################## + + # ------------------------------------------------------------------------ + # Edge Cloud Zone Management (CAMARA) + # ------------------------------------------------------------------------ + + # Zones methods + def get_edge_cloud_zones(self, + region: Optional[str] = None, + status: Optional[str] = None) -> Response: + """ + Retrieves a list of available Edge Cloud Zones. + + :param region: Filter by geographical region. + :param status: Filter by status (active, inactive, unknown). + :return: Response with list of Edge Cloud Zones in CAMARA format. + """ + try: + aeros_client = ContinuumClient(self.base_url) + ngsild_params = "type=Domain&format=simplified" + camara_response = aeros_client.query_entities(ngsild_params) + aeros_domains = camara_response.json() + if config.DEBUG: + self.logger.debug("aerOS edge cloud zones: %s", aeros_domains) + + zone_list = [] + for domain in aeros_domains: + domain_id = domain.get("id") + if not domain_id: + continue + + # Normalize status + raw_status = domain.get("domainStatus", "") + status_token = raw_status.split(":")[-1].strip().lower() + status = "Active" if status_token == "functional" else "Unknown" + + zone = { + "edgeCloudZoneId": + str(urn_to_uuid(domain_id)), + "edgeCloudZoneName": + domain_id, # or domain_id.split(":")[-1] if you prefer short name + "edgeCloudProvider": + (domain.get("owner", ["unknown"])[0] if isinstance( + domain.get("owner"), list) else domain.get( + "owner", "unknown")), + "status": + status, + "geographyDetails": + "NOT_USED", + } + zone_list.append(zone) + + # Store zones keyed by the aerOS domain id + self.storage.store_zones( + {d["edgeCloudZoneName"]: d + for d in zone_list}) + if config.DEBUG: + self.logger.debug("aerOS Local domains store: %s", zone_list) + return build_custom_http_response( + status_code=camara_response.status_code, + content=zone_list, + headers={"Content-Type": "application/json"}, + encoding=camara_response.encoding, + url=camara_response.url, + request=camara_response.request, + ) + except json.JSONDecodeError as e: + self.logger.error("Invalid JSON in aerOS response: %s", e) + raise + except KeyError as e: + self.logger.error("Missing expected field in aerOS data: %s", e) + raise + except EdgeCloudPlatformError as e: + self.logger.error("Error retrieving edge cloud zones: %s", e) + raise + + def get_edge_cloud_zones_details(self, + zone_id: str, + flavour_id: Optional[str] = None) -> Dict: + """ + Get details of a specific edge cloud zone. + :param zone_id: The ID of the edge cloud zone + :param flavour_id: Optional flavour ID to filter the results + :return: Details of the edge cloud zone + """ + aeros_client = ContinuumClient(self.base_url) + ngsild_params = f'format=simplified&type=InfrastructureElement&q=domain=="{zone_id}"' + if config.DEBUG: + self.logger.debug( + "Querying infrastructure elements for zone %s with params: %s", + zone_id, + ngsild_params, + ) + try: + # Query the infrastructure elements for the specified zonese + aeros_response = aeros_client.query_entities(ngsild_params) + aeros_domain_ies = aeros_response.json() + # Transform the infrastructure elements into the required format + # and return the details of the edge cloud zone + camara_response = self.transform_infrastructure_elements( + domain_ies=aeros_domain_ies, domain=zone_id) + if config.DEBUG: + self.logger.debug("Transformed response: %s", camara_response) + # Return the transformed response + return build_custom_http_response( + status_code=aeros_response.status_code, + content=camara_response, + headers={"Content-Type": "application/json"}, + encoding=aeros_response.encoding, + url=aeros_response.url, + request=aeros_response.request, + ) + except json.JSONDecodeError as e: + self.logger.error("Invalid JSON in aerOS response: %s", e) + raise + except KeyError as e: + self.logger.error("Missing expected field in aerOS data: %s", e) + raise + except EdgeCloudPlatformError as e: + self.logger.error("Error retrieving edge cloud zones: %s", e) + raise + + def transform_infrastructure_elements(self, domain_ies: List[Dict[str, + Any]], + domain: str) -> Dict[str, Any]: + """ + Transform the infrastructure elements into a format suitable for the + edge cloud zone details. + :param domain_ies: List of infrastructure elements + :param domain: The ID of the edge cloud zone + :return: Transformed details of the edge cloud zone + """ + total_cpu = 0 + total_ram = 0 + total_disk = 0 + total_available_ram = 0 + total_available_disk = 0 + + flavours_supported = [] + + for element in domain_ies: + total_cpu += element.get("cpuCores", 0) + total_ram += element.get("ramCapacity", 0) + total_available_ram += element.get("availableRam", 0) + total_disk += element.get("diskCapacity", 0) + total_available_disk += element.get("availableDisk", 0) + + # Create a flavour per machine + flavour = { + "flavourId": + f"{element.get('hostname')}-{element.get('containerTechnology')}", + "cpuArchType": + f"{element.get('cpuArchitecture')}", + "supportedOSTypes": [{ + "architecture": f"{element.get('cpuArchitecture')}", + "distribution": + f"{element.get('operatingSystem')}", # assume + "version": "OS_VERSION_UBUNTU_2204_LTS", + "license": "OS_LICENSE_TYPE_FREE", + }], + "numCPU": + element.get("cpuCores", 0), + "memorySize": + element.get("ramCapacity", 0), + "storageSize": + element.get("diskCapacity", 0), + } + flavours_supported.append(flavour) + + result = { + "zoneId": + domain, + "reservedComputeResources": [{ + "cpuArchType": "ISA_X86_64", + "numCPU": int(total_cpu), + "memory": total_ram, + }], + "computeResourceQuotaLimits": [{ + "cpuArchType": "ISA_X86_64", + "numCPU": int(total_cpu * 2), # Assume quota is 2x total? + "memory": total_ram * 2, + }], + "flavoursSupported": + flavours_supported, + } + return result + + # ------------------------------------------------------------------------ + # Application Management (CAMARA-Compliant) + # ------------------------------------------------------------------------ + + # Onboarding methods + def onboard_app(self, app_manifest: Dict) -> Response: + # Validate CAMARA input + camara_schemas.AppManifest(**app_manifest) + app_id = app_manifest.get("appId") if not app_id: raise EdgeCloudPlatformError("Missing 'appId' in app manifest") - if app_id in self._app_store: - raise EdgeCloudPlatformError(f"Application with id '{app_id}' already exists") + if self.storage.get_app(app_id=app_id): + raise EdgeCloudPlatformError( + f"Application with id '{app_id}' already exists") - self._app_store[app_id] = app_manifest + self.storage.store_app(app_id, app_manifest) self.logger.debug("Onboarded application with id: %s", app_id) - return {"appId": app_id} - - def get_all_onboarded_apps(self) -> List[Dict]: - self.logger.debug("Onboarded applications: %s", list(self._app_store.keys())) - return list(self._app_store.values()) + submitted_app = camara_schemas.SubmittedApp( + appId=camara_schemas.AppId(app_id)) + return build_custom_http_response( + status_code=201, + content=submitted_app.model_dump(mode="json"), + headers={"Content-Type": "application/json"}, + encoding="utf-8") + + def get_all_onboarded_apps(self) -> Response: + apps = self.storage.list_apps() + self.logger.debug("Onboarded applications: %s", apps) + return build_custom_http_response( + status_code=200, + content=apps, + headers={"Content-Type": "application/json"}, + encoding="utf-8", + ) - def get_onboarded_app(self, app_id: str) -> Dict: - if app_id not in self._app_store: - raise EdgeCloudPlatformError(f"Application with id '{app_id}' does not exist") + def get_onboarded_app(self, app_id: str) -> Response: + app_data = self.storage.get_app(app_id) + if not app_data: + raise EdgeCloudPlatformError( + f"Application with id '{app_id}' does not exist") self.logger.debug("Retrieved application with id: %s", app_id) - return self._app_store[app_id] - def delete_onboarded_app(self, app_id: str) -> None: - if app_id not in self._app_store: - raise EdgeCloudPlatformError(f"Application with id '{app_id}' does not exist") - service_instances = self._stopped_services.get(app_id, []) + app_manifest_response = { + "appManifest": app_data + } # We already keep the app manifest when onboarding + + return build_custom_http_response( + status_code=200, + content=app_manifest_response, + headers={"Content-Type": "application/json"}, + encoding="utf-8", + ) + + def delete_onboarded_app(self, app_id: str) -> Response: + app = self.storage.get_app(app_id) + if not app: + raise EdgeCloudPlatformError( + f"Application with id '{app_id}' does not exist") + + service_instances = self.storage.get_stopped_instances(app_id=app_id) + if not service_instances: + raise EdgeCloudPlatformError( + f"Application with id '{app_id}' cannot be deleted — please stop it first" + ) self.logger.debug( "Deleting application with id: %s and instances: %s", app_id, @@ -81,152 +330,155 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): ) for service_instance in service_instances: self._purge_deployed_app_from_continuum(service_instance) - self.logger.debug("successfully purged service instance: %s", service_instance) - del self._stopped_services[app_id] # Clean up stopped services - del self._app_store[app_id] # Remove from onboarded apps - - def _generate_service_id(self, app_id: str) -> str: - return f"urn:ngsi-ld:Service:{app_id}-{uuid.uuid4().hex[:4]}" - - def _generate_tosca_yaml_dict(self, app_manifest: Dict, app_zones: List[Dict]) -> Dict: - component = app_manifest.get("componentSpec", [{}])[0] - component_name = component.get("componentName", "application") - - image_path = app_manifest.get("appRepo", {}).get("imagePath", "") - image_file = image_path.split("/")[-1] - repository_url = "/".join(image_path.split("/")[:-1]) if "/" in image_path else "docker_hub" - zone_id = app_zones[0].get("EdgeCloudZone", {}).get("edgeCloudZoneId", "default-zone") - - # Extract minNodeMemory - min_node_memory = ( - app_manifest.get("requiredResources", {}) - .get("applicationResources", {}) - .get("cpuPool", {}) - .get("topology", {}) - .get("minNodeMemory", 1024) + self.logger.debug("successfully purged service instance: %s", + service_instance) + + self.storage.remove_stopped_instances(app_id) + self.storage.delete_app(app_id) + + return build_custom_http_response( + status_code=204, + content=b"", # absolutely no body for 204 + headers={"Content-Type": "application/json"}, + encoding="utf-8", + # url=None, + # request=None, ) - ports = {} - for iface in component.get("networkInterfaces", []): - interface_id = iface.get("interfaceId", "default") - protocol = iface.get("protocol", "TCP").lower() - port = iface.get("port", 8080) - ports[interface_id] = {"properties": {"protocol": [protocol], "source": port}} - - expose_ports = any( - iface.get("visibilityType") == "VISIBILITY_EXTERNAL" - for iface in component.get("networkInterfaces", []) - ) - - yaml_dict = { - "tosca_definitions_version": "tosca_simple_yaml_1_3", - "description": f"TOSCA for {app_manifest.get('name', 'application')}", - "serviceOverlay": False, - "node_templates": { - component_name: { - "type": "tosca.nodes.Container.Application", - "isJob": False, - "requirements": [ - { - "network": { - "properties": { - "ports": ports, - "exposePorts": expose_ports, - } - } - }, - { - "host": { - "node_filter": { - "capabilities": [ - { - "host": { - "properties": { - "cpu_arch": {"equal": "x64"}, - "realtime": {"equal": False}, - "cpu_usage": {"less_or_equal": "0.1"}, - "mem_size": { - "greater_or_equal": str(min_node_memory) - }, - "domain_id": {"equal": zone_id}, - } - } - } - ], - "properties": None, - } - } - }, - ], - "artifacts": { - "application_image": { - "file": image_file, - "type": "tosca.artifacts.Deployment.Image.Container.Docker", - "is_private": False, - "repository": repository_url, - } - }, - "interfaces": { - "Standard": { - "create": { - "implementation": "application_image", - "inputs": {"cliArgs": [], "envVars": []}, - } - } - }, - } - }, - } - - return yaml_dict - - def deploy_app(self, app_id: str, app_zones: List[Dict]) -> Dict: + def _generate_service_id(self, app_id: str) -> str: + ''' + Generate a unique service ID for aerOS continuum. + The service ID is in the format of a NGSI-LD URN with a random suffix. + :param app_id: The application ID + :return: The generated service ID + ''' + return f"{app_id}-{uuid.uuid4().hex[:4]}" + + def _generate_aeros_service_id(self, camara_app_instance_id: str) -> str: + ''' + Convert CAMARA appInstanceId to aerOS service ID. + :param camara_app_instance_id: The CAMARA appInstanceId + :return: The corresponding aerOS service ID + ''' + return f"urn:ngsi-ld:Service:{camara_app_instance_id}" + + # Instantiation methods + def deploy_app(self, app_id: str, app_zones: List[Dict]) -> Response: # 1. Get app CAMARA manifest - app_manifest = self._app_store.get(app_id) + app_manifest = self.storage.get_app(app_id) if not app_manifest: - raise EdgeCloudPlatformError(f"Application with id '{app_id}' does not exist") + raise EdgeCloudPlatformError( + f"Application with id '{app_id}' does not exist") + app_manifest = camara_schemas.AppManifest.model_validate(app_manifest) # 2. Generate unique service ID + # (aerOS) service id <=> CAMARA appInstanceId service_id = self._generate_service_id(app_id) # 3. Convert dict to YAML string - yaml_dict = self._generate_tosca_yaml_dict(app_manifest, app_zones) - tosca_yaml = yaml.dump(yaml_dict, sort_keys=False) - self.logger.info("Generated TOSCA YAML:") - self.logger.info(tosca_yaml) + # 3a. Get aerOS domain IDs from zones uuids + aeros_domain_ids = [ + self.storage.resolve_domain_id_by_zone_uuid( + z["EdgeCloudZone"]["edgeCloudZoneId"]) for z in app_zones + if z.get("EdgeCloudZone", {}).get("edgeCloudZoneId") + ] + tosca_str = camara2aeros_converter.generate_tosca( + app_manifest=app_manifest, app_zones=aeros_domain_ids) + if config.DEBUG: + self.logger.info("Generated TOSCA YAML:") + self.logger.info(tosca_str) # 4. Instantiate client and call continuum to deploy service - aeros_client = ContinuumClient(self.base_url) - response = aeros_client.onboard_and_deploy_service(service_id, tosca_yaml) - - if "serviceId" not in response: - raise EdgeCloudPlatformError( - "Invalid response from onboard_service: missing 'serviceId'" + try: + aeros_client = ContinuumClient(self.base_url) + aeros_response = aeros_client.onboard_and_deploy_service( + self._generate_aeros_service_id(service_id), tosca_str) + + if "serviceId" not in aeros_response.json(): + raise EdgeCloudPlatformError( + "Invalid response from onboard_service: missing 'serviceId'" + ) + + # Build CAMARA-compliant info + app_provider_id = app_manifest.appProvider.root + zone_id = app_zones[0].get("EdgeCloudZone", + {}).get("edgeCloudZoneId", + "default-zone") + app_instance_info = camara_schemas.AppInstanceInfo( + name=camara_schemas.AppInstanceName( + encode_app_instance_name(service_id)), + appId=camara_schemas.AppId(app_id), + appInstanceId=camara_schemas.AppInstanceId(service_id), + appProvider=camara_schemas.AppProvider(app_provider_id), + status=camara_schemas.Status.instantiating, + edgeCloudZoneId=camara_schemas.EdgeCloudZoneId(zone_id), ) - # 5. Track deployment - if app_id not in self._deployed_services: - self._deployed_services[app_id] = [] - self._deployed_services[app_id].append(service_id) + # 5. Track deployment + self.storage.store_deployment(app_instance=app_instance_info) - # 6. Return expected format - return {"appInstanceId": response["serviceId"]} + # 6. Return expected format + self.logger.info("App deployment request submitted successfully") + + # CAMARA spec requires appInstances array wrapper + camara_response = app_instance_info.model_dump(mode="json") + # Add mandatory Location header + location_url = f"/appinstances/{service_id}" + camara_headers = { + "Content-Type": "application/json", + "Location": location_url + } + + return build_custom_http_response( + status_code=aeros_response.status_code, + content=camara_response, + headers=camara_headers, + encoding="utf-8", + url=aeros_response.url, + request=aeros_response.request) + except EdgeCloudPlatformError as ex: + # Catch all platform-specific errors. + # All custom exception types (InvalidArgumentError, UnauthenticatedError, etc.) + # inherit from EdgeCloudPlatformError, so a single handler here will capture + # any of them. We can further elaborate per eachone of needed. + self.logger.error("Failed to deploy app '%s': %s", app_id, str(ex)) + raise def get_all_deployed_apps( self, app_id: Optional[str] = None, app_instance_id: Optional[str] = None, region: Optional[str] = None, - ) -> List[Dict]: - deployed = [] - for stored_app_id, instance_ids in self._deployed_services.items(): - for instance_id in instance_ids: - deployed.append({"appId": stored_app_id, "appInstanceId": instance_id}) - return deployed - - def get_deployed_app( - self, app_instance_id: str, app_id: Optional[str] = None, region: Optional[str] = None ) -> Response: + + instances = self.storage.find_deployments(app_id, app_instance_id, + region) + + # CAMARA spec format for multiple instances response + camara_response = { + "appInstances": [ + inst.model_dump( + mode="json") if hasattr(inst, "model_dump") else inst + for inst in instances + ] + } + + self.logger.info("All app instances retrieved successfully") + if config.DEBUG: + self.logger.debug("Onboarded applications: %s", camara_response) + return build_custom_http_response( + status_code=200, + content=camara_response, + headers={"Content-Type": "application/json"}, + encoding="utf-8", + # url=response.url, + # request=response.request, + ) + + def get_deployed_app(self, + app_instance_id: str, + app_id: Optional[str] = None, + region: Optional[str] = None) -> Response: """ Placeholder implementation for CAMARA compliance. Retrieves information of a specific application instance. @@ -236,36 +488,84 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): :param region: Optional filter by Edge Cloud region :return: Response with application instance details """ - # TODO: Implement actual aeros-specific logic for retrieving a specific deployed app - raise NotImplementedError("get_deployed_app is not yet implemented for aeros adapter") + try: + if not app_instance_id: + raise InvalidArgumentError("app_instance_id is required") + + # Look up the instance in CAMARA storage (returns List[AppInstanceInfo]) + matches = self.storage.find_deployments( + app_id=app_id, + app_instance_id=app_instance_id, + region=region, + ) + if not matches: + # Be explicit in the error so callers know what was used to filter + scope = [] + scope.append(f"instance_id={app_instance_id}") + if app_id: + scope.append(f"app_id={app_id}") + if region: + scope.append(f"region={region}") + raise ResourceNotFoundError( + f"Deployed app not found ({', '.join(scope)})") + + # If multiple matched (shouldn't normally happen after filtering by instance id), + # return the first deterministically. + inst = matches[0] + + # Serialize to JSON-safe dict + content = {"appInstance": inst.model_dump(mode="json")} + + return build_custom_http_response( + status_code=200, + content=content, + headers={"Content-Type": "application/json"}, + encoding="utf-8", + ) + + except (InvalidArgumentError, ResourceNotFoundError): + # Let well-typed domain errors propagate + raise + except EdgeCloudPlatformError: + raise + except Exception as e: + # Defensive catch-all with context + self.logger.exception( + "Unhandled error retrieving deployed app instance '%s' (app_id=%s, region=%s): %s", + app_instance_id, app_id, region, e) + raise EdgeCloudPlatformError(str(e)) def _purge_deployed_app_from_continuum(self, app_id: str) -> None: + ''' + Purge the deployed application from aerOS continuum. + :param app_id: The application ID to purge + All instances of this app should be stopped + ''' aeros_client = ContinuumClient(self.base_url) - response = aeros_client.purge_service(app_id) + response = aeros_client.purge_service( + self._generate_aeros_service_id(app_id)) if response: - self.logger.debug("Purged deployed application with id: %s", app_id) + self.logger.debug("Purged deployed application with id: %s", + self._generate_aeros_service_id(app_id)) else: raise EdgeCloudPlatformError( - f"Failed to purg service with id from the continuum '{app_id}'" + f"Failed to purge service with id from the continuum '{app_id}'" ) - def undeploy_app(self, app_instance_id: str) -> None: - # 1. Locate app_id corresponding to this instance - found_app_id = None - for app_id, instances in self._deployed_services.items(): - if app_instance_id in instances: - found_app_id = app_id - break - - if not found_app_id: + def undeploy_app(self, app_instance_id: str) -> Response: + # 1. Locate app_id corresponding to this instance and + # remove from deployed instances for this appId + app_id = self.storage.remove_deployment( + app_instance_id=app_instance_id) + if not app_id: raise EdgeCloudPlatformError( - f"No deployed app instance with ID '{app_instance_id}' found" - ) + f"No deployed app instance with ID '{app_instance_id}' found") # 2. Call the external undeploy_service aeros_client = ContinuumClient(self.base_url) try: - aeros_client.undeploy_service(app_instance_id) + aeros_response = aeros_client.undeploy_service( + self._generate_aeros_service_id(app_instance_id)) except Exception as e: raise EdgeCloudPlatformError( f"Failed to undeploy app instance '{app_instance_id}': {str(e)}" @@ -276,187 +576,159 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): # self._purge_deployed_app_from_continuum(app_instance_id) # 4. Clean up internal tracking - self._deployed_services[found_app_id].remove(app_instance_id) - # Add instance to _stopped_services to purge it later - if found_app_id not in self._stopped_services: - self._stopped_services[found_app_id] = [] - self._stopped_services[found_app_id].append(app_instance_id) - # If app has no instances left, remove it from deployed services - if not self._deployed_services[found_app_id]: - del self._deployed_services[found_app_id] - - def get_edge_cloud_zones( - self, region: Optional[str] = None, status: Optional[str] = None - ) -> List[Dict]: - aeros_client = ContinuumClient(self.base_url) - ngsild_params = "type=Domain&format=simplified" - aeros_domains = aeros_client.query_entities(ngsild_params) - return [ - { - "zoneId": domain["id"], - "status": domain["domainStatus"].split(":")[-1].lower(), - "geographyDetails": "NOT_USED", - } - for domain in aeros_domains - ] - - def get_edge_cloud_zones_details(self, zone_id: str, flavour_id: Optional[str] = None) -> Dict: - """ - Get details of a specific edge cloud zone. - :param zone_id: The ID of the edge cloud zone - :param flavour_id: Optional flavour ID to filter the results - :return: Details of the edge cloud zone - """ - # Minimal mocked response based on required fields of 'ZoneRegisteredData' in GSMA OPG E/WBI API - # return { - # "zoneId": - # zone_id, - # "reservedComputeResources": [{ - # "cpuArchType": "ISA_X86_64", - # "numCPU": "4", - # "memory": 8192, - # }], - # "computeResourceQuotaLimits": [{ - # "cpuArchType": "ISA_X86_64", - # "numCPU": "8", - # "memory": 16384, - # }], - # "flavoursSupported": [{ - # "flavourId": - # "medium-x86", - # "cpuArchType": - # "ISA_X86_64", - # "supportedOSTypes": [{ - # "architecture": "x86_64", - # "distribution": "UBUNTU", - # "version": "OS_VERSION_UBUNTU_2204_LTS", - # "license": "OS_LICENSE_TYPE_FREE", - # }], - # "numCPU": - # 4, - # "memorySize": - # 8192, - # "storageSize": - # 100, - # }], - # # - # } - aeros_client = ContinuumClient(self.base_url) - ngsild_params = f'format=simplified&type=InfrastructureElement&q=domain=="{zone_id}"' - self.logger.debug( - "Querying infrastructure elements for zone %s with params: %s", - zone_id, - ngsild_params, + self.storage.remove_deployment(app_instance_id) + self.storage.store_stopped_instance(app_id, app_instance_id) + return build_custom_http_response( + status_code=204, + content="", + headers={"Content-Type": "application/json"}, + encoding="utf-8", + url=aeros_response.url, + request=aeros_response.request, ) - # Query the infrastructure elements for the specified zonese - aeros_domain_ies = aeros_client.query_entities(ngsild_params) - # Transform the infrastructure elements into the required format - # and return the details of the edge cloud zone - response = self.transform_infrastructure_elements( - domain_ies=aeros_domain_ies, domain=zone_id - ) - self.logger.debug("Transformed response: %s", response) - # Return the transformed response - return response - - def transform_infrastructure_elements( - self, domain_ies: List[Dict[str, Any]], domain: str - ) -> Dict[str, Any]: - """ - Transform the infrastructure elements into a format suitable for the - edge cloud zone details. - :param domain_ies: List of infrastructure elements - :param domain: The ID of the edge cloud zone - :return: Transformed details of the edge cloud zone - """ - total_cpu = 0 - total_ram = 0 - total_disk = 0 - total_available_ram = 0 - total_available_disk = 0 - flavours_supported = [] + # ######################################################################## + # GSMA EDGE COMPUTING API (EWBI OPG) - FEDERATION + # ######################################################################## - for element in domain_ies: - total_cpu += element.get("cpuCores", 0) - total_ram += element.get("ramCapacity", 0) - total_available_ram += element.get("availableRam", 0) - total_disk += element.get("diskCapacity", 0) - total_available_disk += element.get("availableDisk", 0) + # ------------------------------------------------------------------------ + # Zone Management (GSMA) + # ------------------------------------------------------------------------ - # Create a flavour per machine - flavour = { - "flavourId": f"{element.get('hostname')}-{element.get('containerTechnology')}", - "cpuArchType": f"{element.get('cpuArchitecture')}", - "supportedOSTypes": [ - { - "architecture": f"{element.get('cpuArchitecture')}", - "distribution": f"{element.get('operatingSystem')}", # assume - "version": "OS_VERSION_UBUNTU_2204_LTS", - "license": "OS_LICENSE_TYPE_FREE", - } - ], - "numCPU": element.get("cpuCores", 0), - "memorySize": element.get("ramCapacity", 0), - "storageSize": element.get("diskCapacity", 0), - } - flavours_supported.append(flavour) - - result = { - "zoneId": domain, - "reservedComputeResources": [ - { - "cpuArchType": "ISA_X86_64", - "numCPU": str(total_cpu), - "memory": total_ram, - } - ], - "computeResourceQuotaLimits": [ - { - "cpuArchType": "ISA_X86_64", - "numCPU": str(total_cpu * 2), # Assume quota is 2x total? - "memory": total_ram * 2, - } - ], - "flavoursSupported": flavours_supported, - } - return result - - # --- GSMA-specific methods --- - - # FederationManagement - - def get_edge_cloud_zones_list_gsma(self) -> List: + def get_edge_cloud_zones_list_gsma(self) -> Response: """ - Retrieves list of all Zones + Retrieves details of all Zones for GSMA federation. - :return: List. + :return: Response with zone details in GSMA format. """ - pass + try: + aeros_client = ContinuumClient(self.base_url) + ngsild_params = "type=Domain&format=simplified" + aeros_response = aeros_client.query_entities(ngsild_params) + aeros_domains = aeros_response.json() + zone_list = [{ + "zoneId": domain["id"], + "geolocation": "NOT_Available", + "geographyDetails": domain["description"], + } for domain in aeros_domains] + return build_custom_http_response( + status_code=aeros_response.status_code, + content=zone_list, + headers={"Content-Type": self.content_type_gsma}, + encoding=self.encoding_gsma, + url=aeros_response.url, + request=aeros_response.request, + ) + except json.JSONDecodeError as e: + self.logger.error("Invalid JSON in aerOS response: %s", e) + raise + except KeyError as e: + self.logger.error("Missing expected field in aerOS data: %s", e) + raise + except EdgeCloudPlatformError as e: + self.logger.error("Error retrieving edge cloud zones: %s", e) + raise # AvailabilityZoneInfoSynchronization - def get_edge_cloud_zones_gsma(self) -> List: + def get_edge_cloud_zones_gsma(self) -> Response: """ - Retrieves details of all Zones + Retrieves details of all Zones with compute resources and flavours for GSMA federation. - :return: List. + :return: Response with zones and detailed resource information. """ - pass + aeros_client = ContinuumClient(self.base_url) + ngsild_params = 'format=simplified&type=InfrastructureElement' - def get_edge_cloud_zone_details_gsma(self, zone_id: str) -> Dict: + try: + # Query the infrastructure elements whithin the whole continuum + aeros_response = aeros_client.query_entities(ngsild_params) + aeros_ies = aeros_response.json() # IEs as list of dicts + + # Create a dict that groups by "domain" + grouped_by_domain = defaultdict(list) + for item in aeros_ies: + domain = item["domain"] + grouped_by_domain[domain].append(item) + + # Transform the IEs to required format + # per domain and append to response list + gsma_response = [] + for domain, ies in grouped_by_domain.items(): + result = aeros2gsma_zone_details.transformer(domain_ies=ies, + domain=domain) + gsma_response.append(result) + # Return the transformed response + return build_custom_http_response( + status_code=aeros_response.status_code, + content=gsma_response, + headers={"Content-Type": self.content_type_gsma}, + encoding=self.encoding_gsma, + url=aeros_response.url, + request=aeros_response.request, + ) + except json.JSONDecodeError as e: + self.logger.error("Invalid JSON in aerOS response: %s", e) + raise + except KeyError as e: + self.logger.error("Missing expected field in aerOS data: %s", e) + raise + except EdgeCloudPlatformError as e: + self.logger.error("Error retrieving edge cloud zones: %s", e) + raise + + def get_edge_cloud_zone_details_gsma(self, zone_id: str) -> Response: """ Retrieves details of a specific Edge Cloud Zone reserved - for the specified zone by the partner OP. + for the specified zone by the partner OP using GSMA federation. :param zone_id: Unique identifier of the Edge Cloud Zone. - :return: Dictionary with Edge Cloud Zone details. + :return: Response with Edge Cloud Zone details. """ - pass - - # ArtefactManagement - - def create_artefact_gsma(self, request_body: dict): + aeros_client = ContinuumClient(self.base_url) + ngsild_params = f'format=simplified&type=InfrastructureElement&q=domain=="{zone_id}"' + if config.DEBUG: + self.logger.debug( + "Querying infrastructure elements for zone %s with params: %s", + zone_id, + ngsild_params, + ) + try: + # Query the infrastructure elements for the specified zonese + aeros_response = aeros_client.query_entities(ngsild_params) + aeros_domain_ies = aeros_response.json() + # Transform the infrastructure elements into the required format + # and return the details of the edge cloud zone + # camara_response = self.transform_infrastructure_elements( + # domain_ies=aeros_domain_ies, domain=zone_id) + gsma_response = aeros2gsma_zone_details.transformer( + domain_ies=aeros_domain_ies, domain=zone_id) + if config.DEBUG: + self.logger.debug("Transformed response: %s", gsma_response) + # Return the transformed response + return build_custom_http_response( + status_code=aeros_response.status_code, + content=gsma_response, + headers={"Content-Type": "application/json"}, + encoding=aeros_response.encoding, + url=aeros_response.url, + request=aeros_response.request, + ) + except json.JSONDecodeError as e: + self.logger.error("Invalid JSON in aerOS response: %s", e) + raise + except KeyError as e: + self.logger.error("Missing expected field in aerOS data: %s", e) + raise + except EdgeCloudPlatformError as e: + self.logger.error("Error retrieving edge cloud zones: %s", e) + raise + + # ------------------------------------------------------------------------ + # Artefact Management (GSMA) + # ------------------------------------------------------------------------ + + def create_artefact_gsma(self, request_body: dict) -> Response: """ Uploads application artefact on partner OP. Artefact is a zip file containing scripts and/or packaging files like Terraform or Helm @@ -465,27 +737,87 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): :param request_body: Payload with artefact information. :return: """ - pass + try: + artefact = gsma_schemas.Artefact.model_validate(request_body) + self.storage.store_artefact_gsma(artefact) + return build_custom_http_response( + status_code=201, + content=artefact.model_dump(mode="json"), + headers={"Content-Type": self.content_type_gsma}, + encoding=self.encoding_gsma, + ) + except ValidationError as e: + self.logger.error("Invalid GSMA artefact schema: %s", e) + raise InvalidArgumentError(str(e)) - def get_artefact_gsma(self, artefact_id: str) -> Dict: + def get_artefact_gsma(self, artefact_id: str) -> Response: """ Retrieves details about an artefact :param artefact_id: Unique identifier of the artefact. :return: Dictionary with artefact details. """ - pass + art = self.storage.get_artefact_gsma(artefact_id) + if not art: + raise ResourceNotFoundError( + f"GSMA artefact '{artefact_id}' not found") + return build_custom_http_response( + status_code=200, + content=art.model_dump(mode="json"), + headers={"Content-Type": self.content_type_gsma}, + encoding=self.encoding_gsma, + ) - def delete_artefact_gsma(self, artefact_id: str): + def list_artefacts_gsma(self): + """List all GSMA Artefacts.""" + arts = [ + a.model_dump(mode="json") + for a in self.storage.list_artefacts_gsma() + ] + return build_custom_http_response( + status_code=200, + content=arts, + headers={"Content-Type": self.content_type_gsma}, + encoding=self.encoding_gsma, + ) + + def delete_artefact_gsma(self, artefact_id: str) -> Response: """ Removes an artefact from partners OP. :param artefact_id: Unique identifier of the artefact. :return: """ - pass - - # ApplicationOnboardingManagement + if not self.storage.get_artefact_gsma(artefact_id): + raise ResourceNotFoundError( + f"GSMA artefact '{artefact_id}' not found") + self.storage.delete_artefact_gsma(artefact_id) + return build_custom_http_response(status_code=204, + content=b"", + headers={}, + encoding=None) + + # ------------------------------------------------------------------------ + # Application Onboarding Management (GSMA) + # ------------------------------------------------------------------------ + + def _to_application_model( + self, entry: gsma_schemas.AppOnboardManifestGSMA + ) -> gsma_schemas.ApplicationModel: + """Internal helper to convert GSMA onboarding entry into canonical ApplicationModel.""" + zones = [ + gsma_schemas.AppDeploymentZone(countryCode="XX", zoneInfo=z) + for z in entry.appDeploymentZones + ] + return gsma_schemas.ApplicationModel( + appId=entry.appId, + appProviderId=entry.appProviderId, + appDeploymentZones=zones, + appMetaData=entry.appMetaData, + appQoSProfile=entry.appQoSProfile, + appComponentSpecs=entry.appComponentSpecs, + onboardStatusInfo="ONBOARDED", + ) def onboard_app_gsma(self, request_body: dict): """ @@ -494,9 +826,42 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): resource validation and other pre-deployment operations. :param request_body: Payload with onboarding info. - :return: + :return: Response with onboarding confirmation. """ - pass + try: + # Validate input against GSMA schema + entry = gsma_schemas.AppOnboardManifestGSMA.model_validate( + request_body) + except ValidationError as e: + self.logger.error("Invalid GSMA input schema: %s", e) + raise InvalidArgumentError(str(e)) + + try: + # Convert to ApplicationModel (canonical onboarded representation) + app_model = self._to_application_model(entry) + + # Ensure uniqueness + if self.storage.get_app_gsma(app_model.appId): + raise InvalidArgumentError( + f"GSMA app '{app_model.appId}' already exists") + + # Store in GSMA apps storage + self.storage.store_app_gsma(app_model.appId, app_model) + + # Build and return confirmation response + return build_custom_http_response( + status_code=201, + content=app_model.model_dump(mode="json"), + headers={"Content-Type": self.content_type_gsma}, + encoding=self.encoding_gsma, + ) + except EdgeCloudPlatformError as e: + self.logger.error("Error during GSMA app onboarding: %s", e) + raise + except Exception as e: + self.logger.exception("Unhandled error during GSMA onboarding: %s", + e) + raise EdgeCloudPlatformError(str(e)) def get_onboarded_app_gsma(self, app_id: str) -> Dict: """ @@ -505,29 +870,158 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): :param app_id: Identifier of the application onboarded. :return: Dictionary with application details. """ - pass + try: + app = self.storage.get_app_gsma(app_id) + if not app: + raise ResourceNotFoundError(f"GSMA app '{app_id}' not found") + + return build_custom_http_response( + status_code=200, + content=app.model_dump(mode="json"), + headers={"Content-Type": self.content_type_gsma}, + encoding=self.encoding_gsma, + ) + except EdgeCloudPlatformError as e: + self.logger.error("Error retrieving GSMA app '%s': %s", app_id, e) + raise + except Exception as e: + self.logger.exception( + "Unhandled error retrieving GSMA app '%s': %s", app_id, e) + raise EdgeCloudPlatformError(str(e)) def patch_onboarded_app_gsma(self, app_id: str, request_body: dict): """ Updates partner OP about changes in application compute resource requirements, - QOS Profile, associated descriptor or change in associated components + QOS Profile, associated descriptor or change in associated components. :param app_id: Identifier of the application onboarded. :param request_body: Payload with updated onboarding info. - :return: + :return: Response with updated application details. """ - pass + try: + patch = gsma_schemas.PatchOnboardedAppGSMA.model_validate( + request_body) + except ValidationError as e: + self.logger.error("Invalid GSMA patch schema: %s", e) + raise InvalidArgumentError(str(e)) + + try: + app = self.storage.get_app_gsma(app_id) + if not app: + raise ResourceNotFoundError(f"GSMA app '{app_id}' not found") + + upd = patch.appUpdQoSProfile + + # Update QoS profile fields + if upd.latencyConstraints is not None: + app.appQoSProfile.latencyConstraints = upd.latencyConstraints + if upd.bandwidthRequired is not None: + app.appQoSProfile.bandwidthRequired = upd.bandwidthRequired + if upd.multiUserClients is not None: + app.appQoSProfile.multiUserClients = upd.multiUserClients + if upd.noOfUsersPerAppInst is not None: + app.appQoSProfile.noOfUsersPerAppInst = upd.noOfUsersPerAppInst + if upd.appProvisioning is not None: + app.appQoSProfile.appProvisioning = upd.appProvisioning + + # mobilitySupport lives under AppMetaData + if upd.mobilitySupport is not None: + app.appMetaData.mobilitySupport = upd.mobilitySupport + + # Replace component specs if provided + if patch.appComponentSpecs: + app.appComponentSpecs = [ + gsma_schemas.AppComponentSpec( + serviceNameNB=p.serviceNameNB, + serviceNameEW=p.serviceNameEW, + componentName=p.componentName, + artefactId=p.artefactId, + ) for p in patch.appComponentSpecs + ] + + # Persist updated model + self.storage.store_app_gsma(app_id, app) + + return build_custom_http_response( + status_code=200, + content=app.model_dump(mode="json"), + headers={"Content-Type": self.content_type_gsma}, + encoding=self.encoding_gsma, + ) + except EdgeCloudPlatformError as e: + self.logger.error("Error updating GSMA app '%s': %s", app_id, e) + raise + except Exception as e: + self.logger.exception("Unhandled error patching GSMA app '%s': %s", + app_id, e) + raise EdgeCloudPlatformError(str(e)) def delete_onboarded_app_gsma(self, app_id: str): """ - Deboards an application from specific partner OP zones + Deboards an application from specific partner OP zones. :param app_id: Identifier of the application onboarded. - :return: + :return: 204 No Content on success. """ - pass + try: + if not self.storage.get_app_gsma(app_id): + raise ResourceNotFoundError(f"GSMA app '{app_id}' not found") + + # CHECKME: update for GSMA + service_instances = self.storage.get_stopped_instances_gsma( + app_id=app_id) + if not service_instances: + raise EdgeCloudPlatformError( + f"Application with id '{app_id}' cannot be deleted — please stop it first" + ) + self.logger.debug( + "Deleting application with id: %s and instances: %s", + app_id, + service_instances, + ) + for service_instance in service_instances: + self._purge_deployed_app_from_continuum_gsma(service_instance) + self.logger.debug("successfully purged service instance: %s", + service_instance) + + self.storage.remove_stopped_instances_gsma(app_id) + + self.storage.delete_app_gsma(app_id) + + return build_custom_http_response( + status_code=204, + content=b"", + headers={}, + encoding=None, + ) + except EdgeCloudPlatformError as e: + self.logger.error("Error deleting GSMA app '%s': %s", app_id, e) + raise + except Exception as e: + self.logger.exception("Unhandled error deleting GSMA app '%s': %s", + app_id, e) + raise EdgeCloudPlatformError(str(e)) + + def _purge_deployed_app_from_continuum_gsma(self, + app_instance_id: str) -> None: + ''' + Purge the deployed application from aerOS continuum. + :param app_id: The application ID to purge + All instances of this app should be stopped + ''' + aeros_client = ContinuumClient(self.base_url) + response = aeros_client.purge_service(app_instance_id) + if response: + self.logger.debug("Purged deployed application with id: %s", + app_instance_id) + else: + raise EdgeCloudPlatformError( + f"Failed to purge service with id from the continuum '{app_instance_id}'" + ) - # ApplicationDeploymentManagement + # ------------------------------------------------------------------------ + # Application Deployment Management (GSMA) + # ------------------------------------------------------------------------ def deploy_app_gsma(self, request_body: dict) -> Dict: """ @@ -536,9 +1030,81 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): :param request_body: Payload with deployment info. :return: Dictionary with deployment details. """ - pass + try: + payload = gsma_schemas.AppDeployPayloadGSMA.model_validate( + request_body) + except ValidationError as e: + self.logger.error("Invalid GSMA deploy schema: %s", e) + raise InvalidArgumentError(str(e)) + + try: + # Ensure app exists + onboarded_app = self.storage.get_app_gsma(payload.appId) + if not onboarded_app: + raise ResourceNotFoundError( + f"GSMA app '{payload.appId}' not found") + + # 2. Generate unique service ID + # (aerOS) service id <=> GSMA appInstanceId + service_id = self._generate_aeros_service_id( + self._generate_service_id(onboarded_app.appId)) + + # 3. Create TOSCA (yaml str) from GSMA onboarded_app + connected artefacts + # GSMA app corresponds to aerOS Service + # Each GSMA AppComponentSpec references an artefact which is mapped to aerOS Service Component + tosca_yaml = gsma2aeros_converter.generate_tosca_from_gsma_with_artefacts( + app_model=onboarded_app, + zone_id=payload.zoneInfo.zoneId, + artefact_resolver=self.storage.get_artefact_gsma, # cleaner + ) + self.logger.info("Generated TOSCA YAML:") + self.logger.info(tosca_yaml) + + # 4. Instantiate client and call continuum to deploy servic + aeros_client = ContinuumClient(self.base_url) + aeros_response = aeros_client.onboard_and_deploy_service( + service_id, tosca_str=tosca_yaml) + + if "serviceId" not in aeros_response.json(): + raise EdgeCloudPlatformError( + "Invalid response from onboard_service: missing 'serviceId'" + ) + + # 5. Track deployment (Store in GSMA deployment store) + # Build AppInstance and optional status (if you want to persist status later) + inst = gsma_schemas.AppInstance( + zoneId=payload.zoneInfo.zoneId, + appInstIdentifier=service_id, + ) + status = gsma_schemas.AppInstanceStatus( + appInstanceState="PENDING", + accesspointInfo=[], + ) + + self.storage.store_deployment_gsma(onboarded_app.appId, + inst, + status=status) + + # 6. Return expected format (deployment details) + body = inst.model_dump(mode="json") + + return build_custom_http_response( + status_code=202, + content=body, + headers={"Content-Type": self.content_type_gsma}, + encoding=self.encoding_gsma, + url=aeros_response.json().get("url", ""), + request=aeros_response.request) + except EdgeCloudPlatformError as ex: + self.logger.error("Failed to deploy app '%s': %s", + onboarded_app.appId, str(ex)) + raise + except Exception as e: + self.logger.exception("Unhandled error during GSMA deploy: %s", e) + raise EdgeCloudPlatformError(str(e)) - def get_deployed_app_gsma(self, app_id: str, app_instance_id: str, zone_id: str) -> Dict: + def get_deployed_app_gsma(self, app_id: str, app_instance_id: str, + zone_id: str) -> Dict: """ Retrieves an application instance details from partner OP. @@ -547,7 +1113,47 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): :param zone_id: Identifier of the zone :return: Dictionary with application instance details """ - pass + try: + # Ensure app exists + if not self.storage.get_app_gsma(app_id): + raise ResourceNotFoundError(f"GSMA app '{app_id}' not found") + + # 4. Instantiate client and call continuum to deploy servic + aeros_client = ContinuumClient(self.base_url) + aeros_response = aeros_client.query_entity( + entity_id=app_instance_id, ngsild_params='format=simplified') + + response_json = aeros_response.json() + content = gsma_schemas.AppInstanceStatus( + appInstanceState=map_aeros_service_status_to_gsma( + response_json.get("actionType")), + accesspointInfo=[{ + "service_status": + f'{self.base_url}/entities/{app_instance_id}' + }, { + "serviceComponents_status": + f'{self.base_url}/hlo_fe/services//{app_instance_id}' + }], + ) + + validated_data = gsma_schemas.AppInstanceStatus.model_validate( + content) + + return build_custom_http_response( + status_code=200, + content=validated_data.model_dump(mode="json"), + headers={"Content-Type": self.content_type_gsma}, + encoding=self.encoding_gsma, + url=aeros_response.url, + request=aeros_response.request, + ) + except EdgeCloudPlatformError: + raise + except Exception as e: + self.logger.exception( + "Unhandled error retrieving GSMA deployment '%s' (%s/%s): %s", + app_instance_id, app_id, zone_id, e) + raise EdgeCloudPlatformError(str(e)) def get_all_deployed_apps_gsma(self) -> Response: """ @@ -557,9 +1163,27 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): :param app_provider: App provider :return: List with application instances details """ - pass + try: + insts = self.storage.find_deployments_gsma() + body = [i.model_dump(mode="json") for i in insts] + self.logger.info("All GSMA app instances retrieved successfully") + self.logger.debug("Deployed GSMA applications: %s", body) + + return build_custom_http_response( + status_code=200, + content=body, + headers={"Content-Type": self.content_type_gsma}, + encoding=self.encoding_gsma, + ) + except EdgeCloudPlatformError: + raise + except Exception as e: + self.logger.exception( + "Unhandled error listing GSMA deployments: '%s'", e) + raise EdgeCloudPlatformError(str(e)) - def undeploy_app_gsma(self, app_id: str, app_instance_id: str, zone_id: str): + def undeploy_app_gsma(self, app_id: str, app_instance_id: str, + zone_id: str): """ Terminate an application instance on a partner OP zone. @@ -568,4 +1192,54 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): :param zone_id: Identifier of the zone :return: """ - pass + try: + # Ensure app exists + if not self.storage.get_app_gsma(app_id): + raise ResourceNotFoundError(f"GSMA app '{app_id}' not found") + + # Ensure the (app_id, instance, zone) exists + matches = self.storage.find_deployments_gsma( + app_id=app_id, + app_instance_id=app_instance_id, + zone_id=zone_id) + if not matches: + raise ResourceNotFoundError( + f"Deployment not found (app_id={app_id}, instance={app_instance_id}, zone={zone_id})" + ) + + # 2. Call the external undeploy_service + aeros_client = ContinuumClient(self.base_url) + try: + aeros_response = aeros_client.undeploy_service(app_instance_id) + except Exception as e: + raise EdgeCloudPlatformError( + f"Failed to undeploy app instance '{app_instance_id}': {str(e)}" + ) from e + + # Remove from deployed and mark as stopped so it can be purged later + removed_app_id = self.storage.remove_deployment_gsma( + app_instance_id) + if removed_app_id: + self.storage.store_stopped_instance_gsma( + removed_app_id, app_instance_id) + + # Async-friendly: 202 Accepted (termination in progress) + body = { + "appId": app_id, + "appInstIdentifier": app_instance_id, + "zoneId": zone_id, + "state": "TERMINATING", + } + return build_custom_http_response( + status_code=aeros_response.status_code, + content=body, + headers={"Content-Type": self.content_type_gsma}, + encoding=self.encoding_gsma, + ) + except EdgeCloudPlatformError: + raise + except Exception as e: + self.logger.exception( + "Unhandled error undeploying GSMA app instance '%s' (app=%s zone=%s): %s", + app_instance_id, app_id, zone_id, e) + raise EdgeCloudPlatformError(str(e)) diff --git a/src/sunrise6g_opensdk/edgecloud/adapters/aeros/config.py b/src/sunrise6g_opensdk/edgecloud/adapters/aeros/config.py index 794cba555d3d862e4fa4bfb431ae11ba05afdbb6..31f1b41ebed347801979cb550319fffff97880af 100644 --- a/src/sunrise6g_opensdk/edgecloud/adapters/aeros/config.py +++ b/src/sunrise6g_opensdk/edgecloud/adapters/aeros/config.py @@ -23,5 +23,5 @@ if not aerOS_ACCESS_TOKEN: aerOS_HLO_TOKEN = "harcoded_hlo_token" if not aerOS_HLO_TOKEN: raise ValueError("Environment variable 'aerOS_HLO_TOKEN' is not set.") -DEBUG = False +DEBUG = True LOG_FILE = ".log/aeros_client.log" diff --git a/src/sunrise6g_opensdk/edgecloud/adapters/aeros/continuum_client.py b/src/sunrise6g_opensdk/edgecloud/adapters/aeros/continuum_client.py index e1bf149213a1a8b0707abee4c60358d7ed4ef1be..413facfa5263ee4e256cc24f808364d7db442540 100644 --- a/src/sunrise6g_opensdk/edgecloud/adapters/aeros/continuum_client.py +++ b/src/sunrise6g_opensdk/edgecloud/adapters/aeros/continuum_client.py @@ -51,7 +51,7 @@ class ContinuumClient: } @catch_requests_exceptions - def query_entity(self, entity_id, ngsild_params) -> dict: + def query_entity(self, entity_id, ngsild_params) -> requests.Response: """ Query entity with ngsi-ld params :input @@ -70,10 +70,10 @@ class ContinuumClient: self.logger.debug( "Query entity response: %s %s", response.status_code, response.text ) - return response.json() + return response @catch_requests_exceptions - def query_entities(self, ngsild_params): + def query_entities(self, ngsild_params) -> requests.Response: """ Query entities with ngsi-ld params :input @@ -90,7 +90,7 @@ class ContinuumClient: # self.logger.debug("Query entities URL: %s", entities_url) # self.logger.debug("Query entities response: %s %s", # response.status_code, response.text) - return response.json() + return response @catch_requests_exceptions def deploy_service(self, service_id: str) -> dict: @@ -116,7 +116,7 @@ class ContinuumClient: return response.json() @catch_requests_exceptions - def undeploy_service(self, service_id: str) -> dict: + def undeploy_service(self, service_id: str) -> requests.Response: """ Undeploy service :input @@ -129,6 +129,7 @@ class ContinuumClient: if response is None: return None else: + self.logger.debug("In OK Undeploy and text: %s", response.text) if config.DEBUG: self.logger.debug("Re-allocate service URL: %s", undeploy_url) self.logger.debug( @@ -136,10 +137,10 @@ class ContinuumClient: response.status_code, response.text, ) - return response.json() + return response @catch_requests_exceptions - def onboard_and_deploy_service(self, service_id: str, tosca_str: str) -> dict: + def onboard_and_deploy_service(self, service_id: str, tosca_str: str) -> requests.Response: """ Onboard (& deploy) service on aerOS continuum :input @@ -165,7 +166,7 @@ class ContinuumClient: response.status_code, response.text, ) - return response.json() + return response @catch_requests_exceptions def purge_service(self, service_id: str) -> bool: diff --git a/src/sunrise6g_opensdk/edgecloud/adapters/aeros/continuum_models.py b/src/sunrise6g_opensdk/edgecloud/adapters/aeros/continuum_models.py new file mode 100644 index 0000000000000000000000000000000000000000..a0a7fe137b5d6b7e29e0097b88f1c3071bfc77cb --- /dev/null +++ b/src/sunrise6g_opensdk/edgecloud/adapters/aeros/continuum_models.py @@ -0,0 +1,269 @@ +''' + aerOS continuum models +''' +from enum import Enum +from typing import List, Dict, Any, Union, Optional +from pydantic import BaseModel, Field + + +class ServiceNotFound(BaseModel): + ''' + Docstring + ''' + detail: str = "Service not found" + + +class CPUComparisonOperator(BaseModel): + """ + CPU requirment for now is that usage should be less than + """ + less_or_equal: Union[float, None] = None + + +class CPUArchComparisonOperator(BaseModel): + """ + CPU arch requirment, equal to str + """ + equal: Union[str, None] = None + + +class MEMComparisonOperator(BaseModel): + """ + RAM requirment for now is that available RAM should be more than + """ + greater_or_equal: Union[str, None] = None + + +class EnergyEfficienyComparisonOperator(BaseModel): + """ + Energy Efficiency requirment for now is that IE should have energy efficiency more than a % + """ + greater_or_equal: Union[str, None] = None + + +class GreenComparisonOperator(BaseModel): + """ + IE Green requirment for now is that IE should have green energy mix which us more than a % + """ + greater_or_equal: Union[str, None] = None + + +class RTComparisonOperator(BaseModel): + """ + Real Time requirment T/F + """ + equal: Union[bool, None] = None + + +class CpuArch(str, Enum): + ''' + Enumeration with possible cpu types + ''' + x86_64 = "x86_64" + arm64 = "arm64" + arm32 = "arm32" + + +class Coordinates(BaseModel): + ''' + IE coordinate requirements + ''' + coordinates: List[List[float]] + + +class DomainIdOperator(BaseModel): + """ + CPU arch requirment, equal to str + """ + equal: Union[str, None] = None + + +class Property(BaseModel): + ''' + IE capabilities + ''' + cpu_usage: CPUComparisonOperator = Field( + default_factory=CPUComparisonOperator) + cpu_arch: CPUArchComparisonOperator = Field( + default_factory=CPUArchComparisonOperator) + mem_size: MEMComparisonOperator = Field( + default_factory=MEMComparisonOperator) + realtime: RTComparisonOperator = Field( + default_factory=RTComparisonOperator) + area: Coordinates = None + energy_efficiency: EnergyEfficienyComparisonOperator = Field( + default_factory=EnergyEfficienyComparisonOperator) + green: GreenComparisonOperator = Field( + default_factory=GreenComparisonOperator) + domain_id: DomainIdOperator = Field(default_factory=DomainIdOperator) + + # @field_validator('mem_size') + # def validate_mem_size(cls, v): + # if not v or "MB" not in v: + # raise ValueError("mem_size must be in MB and specified") + # mem_size_value = int(v.split(" ")[0]) + # if mem_size_value < 2000: + # raise ValueError("mem_size must be greater or equal to 2000 MB") + # return v + + +class HostCapability(BaseModel): + ''' + Host properties + ''' + properties: Property + + +class NodeFilter(BaseModel): + ''' + Node filter, + How to filter continuum IE and select canditate list + ''' + properties: Optional[Dict[str, List[str]]] = None + capabilities: Optional[List[Dict[str, HostCapability]]] = None + + +class HostRequirement(BaseModel): + ''' + capabilities of node + ''' + # node_filter: Dict[str, List[Dict[str, HostCapability]]] + node_filter: NodeFilter + + +class PortProperties(BaseModel): + ''' + Workload port description + ''' + protocol: List[str] = Field(...) + source: int = Field(...) + + +class ExposedPort(BaseModel): + ''' + Workload exposed network ports + ''' + properties: PortProperties = Field(...) + + +class NetworkProperties(BaseModel): + ''' + Dict of network requirments, name of port and protperty = [protocol, port] mapping + ''' + ports: Dict[str, ExposedPort] = Field(...) + exposePorts: Optional[bool] + + +class NetworkRequirement(BaseModel): + ''' + Top level key of network requirments + ''' + properties: NetworkProperties + + +class CustomRequirement(BaseModel): + ''' + Define a custom requirement type that can be either a host or a network requirement + ''' + host: HostRequirement = None + network: NetworkRequirement = None + + +class ArtifactModel(BaseModel): + ''' + Artifact has a useer defined id and then a dict with the following keys: + ''' + file: str + type: str + repository: str + is_private: Optional[bool] = False + username: Optional[str] = None + password: Optional[str] = None + + +class NodeTemplate(BaseModel): + ''' + Node template "tosca.nodes.Container.Application" + ''' + type: str + requirements: List[CustomRequirement] + artifacts: Dict[str, ArtifactModel] + interfaces: Dict[str, Any] + isJob: Optional[bool] = False + + +class TOSCA(BaseModel): + ''' + The TOSCA structure + ''' + tosca_definitions_version: str + description: str + serviceOverlay: Optional[bool] = False + node_templates: Dict[str, NodeTemplate] + + +TOSCA_YAML_EXAMPLE = """ +tosca_definitions_version: tosca_simple_yaml_1_3 +description: A test service for testing TOSCA generation +serviceOverlay: false + +node_templates: + auto-component: + type: tosca.nodes.Container.Application + isJob: False + artifacts: + application_image: + file: aeros-public/common-deployments/nginx:latest + repository: registry.gitlab.aeros-project.eu + type: tosca.artifacts.Deployment.Image.Container.Docker + interfaces: + Standard: + create: + implementation: application_image + inputs: + cliArgs: + - -a: aa + envVars: + - URL: bb + requirements: + - network: + properties: + ports: + port1: + properties: + protocol: + - tcp + source: 80 + port2: + properties: + protocol: + - tcp + source: 443 + exposePorts: True + - host: + node_filter: + capabilities: + - host: + properties: + cpu_arch: + equal: x64 + realtime: + equal: false + cpu_usage: + less_or_equal: '0.4' + mem_size: + greater_or_equal: '1' + domain_id: + equal: urn:ngsi-ld:Domain:NCSRD + energy_efficiency: + greater_or_equal: '0.5' + green: + greater_or_equal: '0.5' + domain_id: + equal: urn:ngsi-ld:Domain:ncsrd01 + properties: null + + + + +""" diff --git a/src/sunrise6g_opensdk/edgecloud/adapters/aeros/converters/__init__.py b/src/sunrise6g_opensdk/edgecloud/adapters/aeros/converters/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/src/sunrise6g_opensdk/edgecloud/adapters/aeros/converters/aeros2gsma_zone_details.py b/src/sunrise6g_opensdk/edgecloud/adapters/aeros/converters/aeros2gsma_zone_details.py new file mode 100644 index 0000000000000000000000000000000000000000..b83228ead7c92b20e7a6bce50e602d928c106ce7 --- /dev/null +++ b/src/sunrise6g_opensdk/edgecloud/adapters/aeros/converters/aeros2gsma_zone_details.py @@ -0,0 +1,154 @@ +''' +aeros2gsma_zone_details.py +''' +from typing import List, Dict, Any + + +def transformer(domain_ies: List[Dict[str, Any]], + domain: str) -> Dict[str, Any]: + """ + Transform aerOS InfrastructureElements into GSMA ZoneRegisteredData structure. + :param domain_ies: List of aerOS InfrastructureElement dicts + :param domain: The ID of the edge cloud zone (zoneId) + :return: Dict matching gsma_schemas.ZoneRegisteredData (JSON-serializable) + """ + + def map_cpu_arch_to_isa(urn: str) -> str: + """ + Map aerOS cpuArchitecture URN to GSMA ISA_* literal. + Examples: + 'urn:ngsi-ld:CpuArchitecture:x64' -> 'ISA_X86_64' + 'urn:ngsi-ld:CpuArchitecture:arm64' -> 'ISA_ARM_64' + 'urn:ngsi-ld:CpuArchitecture:arm32' -> 'ISA_ARM_64' (closest) + 'urn:ngsi-ld:CpuArchitecture:x86' -> 'ISA_X86' + Fallback: 'ISA_X86_64' + """ + if not isinstance(urn, str): + return "ISA_X86_64" + tail = urn.split(":")[-1].lower() + if tail in ("x64", "x86_64", "amd64"): + return "ISA_X86_64" + if tail in ("x86", "i386", "i686"): + return "ISA_X86" + if tail in ("arm64", "aarch64"): + return "ISA_ARM_64" + if tail in ("arm32", "arm"): + # GSMA only has ARM_64 vs X86/X86_64; pick closest + return "ISA_ARM_64" + return "ISA_X86_64" + + def map_cpu_arch_to_ostype_arch(urn: str) -> str: + """ + Map aerOS cpuArchitecture URN to OSType.architecture literal: 'x86_64' or 'x86'. + Use 'x86_64' for x64/arm64 (closest allowed), and 'x86' for x86/arm32. + """ + if not isinstance(urn, str): + return "x86_64" + tail = urn.split(":")[-1].lower() + if tail in ("x64", "x86_64", "amd64", "arm64", "aarch64"): + return "x86_64" + if tail in ("x86", "i386", "i686", "arm32", "arm"): + return "x86" + return "x86_64" + + def map_os_distribution(_urn: str) -> str: + """ + aerOS uses 'urn:ngsi-ld:OperatingSystem:Linux' etc. + map Linux -> UBUNTU (assume), else OTHER. + """ + if isinstance(_urn, str) and _urn.split(":")[-1].lower() == "linux": + return "UBUNTU" + return "OTHER" + + def default_os_version(dist: str) -> str: + # You asked to assume Ubuntu 22.04 LTS for Linux + return "OS_VERSION_UBUNTU_2204_LTS" if dist == "UBUNTU" else "OTHER" + + # Totals (aggregate over elements) + total_cpu = 0 + total_ram = 0 + total_disk = 0 + total_available_ram = 0 + total_available_disk = 0 + + flavours_supported: List[Dict[str, Any]] = [] + seen_cpu_isas: set[str] = set() + + for element in domain_ies: + cpu_cores = int(element.get("cpuCores", 0) or 0) + ram_cap = int(element.get("ramCapacity", 0) or 0) # MB? + avail_ram = int(element.get("availableRam", 0) or 0) # MB? + disk_cap = int(element.get("diskCapacity", 0) + or 0) # MB/GB? (pass-through) + avail_disk = int(element.get("availableDisk", 0) or 0) + + total_cpu += cpu_cores + total_ram += ram_cap + total_available_ram += avail_ram + total_disk += disk_cap + total_available_disk += avail_disk + + cpu_arch_urn = element.get("cpuArchitecture", "") + os_urn = element.get("operatingSystem", "") + + isa = map_cpu_arch_to_isa(cpu_arch_urn) + seen_cpu_isas.add(isa) + ost_arch = map_cpu_arch_to_ostype_arch(cpu_arch_urn) + dist = map_os_distribution(os_urn) + ver = default_os_version(dist) + + # Create a flavour per machine + flavour = { + "flavourId": + f"{element.get('hostname', 'host')}-{element.get('containerTechnology', 'CT')}", + "cpuArchType": + isa, # Literal ISA_* + "supportedOSTypes": [{ + "architecture": ost_arch, # 'x86_64' or 'x86' + "distribution": dist, # 'UBUNTU' or 'OTHER' + "version": ver, # 'OS_VERSION_UBUNTU_2204_LTS' or 'OTHER' + "license": "OS_LICENSE_TYPE_FREE", + }], + "numCPU": + cpu_cores, + "memorySize": + ram_cap, + "storageSize": + disk_cap, + } + flavours_supported.append(flavour) + + # Decide a single ISA for the aggregate reserved/quota entries + # Preference order: X86_64, ARM_64, X86 + def pick_aggregate_isa() -> str: + if "ISA_X86_64" in seen_cpu_isas: + return "ISA_X86_64" + if "ISA_ARM_64" in seen_cpu_isas: + return "ISA_ARM_64" + if "ISA_X86" in seen_cpu_isas: + return "ISA_X86" + # fallback + return "ISA_X86_64" + + agg_isa = pick_aggregate_isa() + + result = { + "zoneId": + domain, + "reservedComputeResources": [{ + "cpuArchType": agg_isa, + "numCPU": int( + total_cpu + ), # Same as Quotas untill we have somem policy or data to differentiate + "memory": total_ram, # ditto + }], + "computeResourceQuotaLimits": [{ + "cpuArchType": agg_isa, + "numCPU": int(total_cpu), + "memory": total_ram, + }], + "flavoursSupported": + flavours_supported, + } + + return result diff --git a/src/sunrise6g_opensdk/edgecloud/adapters/aeros/converters/camara2aeros_converter.py b/src/sunrise6g_opensdk/edgecloud/adapters/aeros/converters/camara2aeros_converter.py new file mode 100644 index 0000000000000000000000000000000000000000..318d575b80aa741db134706897dea7d77659c7f6 --- /dev/null +++ b/src/sunrise6g_opensdk/edgecloud/adapters/aeros/converters/camara2aeros_converter.py @@ -0,0 +1,122 @@ +''' +Module: converter.py +This module provides functions to convert application manifests into TOSCA models. +It includes the `generate_tosca` function that constructs a TOSCA model based on +the application manifest and associated app zones. +''' +from typing import List, Dict, Any +import yaml +from sunrise6g_opensdk.edgecloud.adapters.aeros import config +from sunrise6g_opensdk.logger import setup_logger +from sunrise6g_opensdk.edgecloud.core.camara_schemas import AppManifest, VisibilityType +from sunrise6g_opensdk.edgecloud.adapters.aeros.continuum_models import ( + TOSCA, NodeTemplate, CustomRequirement, HostRequirement, HostCapability, + Property as HostProperty, DomainIdOperator, NodeFilter, NetworkRequirement, + NetworkProperties, ExposedPort, PortProperties, ArtifactModel) + +logger = setup_logger(__name__, is_debug=True, file_name=config.LOG_FILE) + + +def generate_tosca(app_manifest: AppManifest, app_zones: List[str]) -> str: + ''' + Generate a TOSCA model from the application manifest and app zones. + Args: + app_manifest (AppManifest): The application manifest containing details about the app. + app_zones (List[Dict[str, Any]]): List of app zones where the app will be deployed. + Returns: + TOSCA yaml as string which can be used in a POST request with applcation type yaml + ''' + component = app_manifest.componentSpec[0] + image_path = app_manifest.appRepo.imagePath.root + image_file = image_path.split("/")[-1] + repository_url = "/".join( + image_path.split("/")[:-1]) if "/" in image_path else "docker_hub" + + zone_id = app_zones[0] + logger.info("DEBUG : %s", app_manifest.requiredResources.root) + # Extract minNodeMemory (fallback = 1024 MB) + + res = app_manifest.requiredResources.root + if hasattr(res, "applicationResources") and hasattr( + res.applicationResources.cpuPool.topology, "minNodeMemory"): + min_node_memory = res.applicationResources.cpuPool.topology.minNodeMemory + else: + min_node_memory = 1024 + + # Build exposed network ports + ports = { + iface.interfaceId: + ExposedPort(properties=PortProperties( + protocol=[iface.protocol.value.lower()], source=iface.port)) + for iface in component.networkInterfaces + } + + expose_ports = any( + iface.visibilityType == VisibilityType.VISIBILITY_EXTERNAL + for iface in component.networkInterfaces) + + # Define host property constraints + host_props = HostProperty( + cpu_arch={"equal": "x64"}, + realtime={"equal": False}, + cpu_usage={"less_or_equal": "0.4"}, + mem_size={"greater_or_equal": str(min_node_memory)}, + energy_efficiency={"greater_or_equal": "0"}, + green={"greater_or_equal": "0"}, + domain_id=DomainIdOperator(equal=zone_id), + ) + + # Create Node compute and network requirements + requirements = [ + CustomRequirement(network=NetworkRequirement( + properties=NetworkProperties(ports=ports, + exposePorts=expose_ports))), + CustomRequirement(host=HostRequirement(node_filter=NodeFilter( + capabilities=[{ + "host": HostCapability(properties=host_props) + }], + properties=None))) + ] + # Define the NodeTemplate + node_template = NodeTemplate( + type="tosca.nodes.Container.Application", + isJob=False, + requirements=requirements, + artifacts={ + "application_image": + ArtifactModel( + file=image_file, + type="tosca.artifacts.Deployment.Image.Container.Docker", + repository=repository_url, + is_private=app_manifest.appRepo.type == "PRIVATEREPO", + username=app_manifest.appRepo.userName, + password=app_manifest.appRepo.credentials) + }, + interfaces={ + "Standard": { + "create": { + "implementation": "application_image", + "inputs": { + "cliArgs": [], + "envVars": [] + } + } + } + }) + + # Assemble full TOSCA object + tosca = TOSCA(tosca_definitions_version="tosca_simple_yaml_1_3", + description=f"TOSCA for {app_manifest.name}", + serviceOverlay=False, + node_templates={component.componentName: node_template}) + + tosca_dict = tosca.model_dump(by_alias=True, exclude_none=True) + + for template in tosca_dict.get("node_templates", {}).values(): + template["requirements"] = [{ + k: v + for k, v in req.items() if v is not None + } for req in template.get("requirements", [])] + + yaml_str = yaml.dump(tosca_dict, sort_keys=False) + return yaml_str diff --git a/src/sunrise6g_opensdk/edgecloud/adapters/aeros/converters/gsma2aeros_converter.py b/src/sunrise6g_opensdk/edgecloud/adapters/aeros/converters/gsma2aeros_converter.py new file mode 100644 index 0000000000000000000000000000000000000000..c3a0bae1ad7e6113f2a71fbba01df861e1ba494f --- /dev/null +++ b/src/sunrise6g_opensdk/edgecloud/adapters/aeros/converters/gsma2aeros_converter.py @@ -0,0 +1,223 @@ +""" +Module: gsm2aeros_converter.py +Initial GSMA -> TOSCA generator. + +Notes: +- GSMA ApplicationModel does not include container image or ports directly. + (Those usually come from Artefacts, which we're ignoring for now.) +- We provide an `image_map` hook to resolve artefactId -> image string. +- Defaults to a public nginx image if nothing is provided. +- Network ports are omitted for now (exposePorts = False). +""" + +from typing import Optional, Callable, Dict, Any, List +import yaml +from sunrise6g_opensdk.edgecloud.adapters.aeros import config +from sunrise6g_opensdk.edgecloud.adapters.aeros.errors import ( + ResourceNotFoundError, + InvalidArgumentError, +) +from sunrise6g_opensdk.logger import setup_logger +from sunrise6g_opensdk.edgecloud.core import gsma_schemas +from sunrise6g_opensdk.edgecloud.adapters.aeros.continuum_models import ( + TOSCA, + NodeTemplate, + CustomRequirement, + HostRequirement, + HostCapability, + Property as HostProperty, + DomainIdOperator, + NodeFilter, + NetworkRequirement, + NetworkProperties, + ExposedPort, + PortProperties, + ArtifactModel, +) + +logger = setup_logger(__name__, is_debug=True, file_name=config.LOG_FILE) + + +def generate_tosca_from_gsma_with_artefacts( + app_model: gsma_schemas.ApplicationModel, + zone_id: str, + artefact_resolver: Callable[[str], Optional[gsma_schemas.Artefact]], +) -> str: + """ + Build a TOSCA YAML from a GSMA `ApplicationModel` by resolving each component's `artefactId`. + + Rules/assumptions: + - One node_template per `AppComponentSpec` in the application model. + - Container image is taken from the first entry of `artefact.componentSpec[i].images`. + - Ports come (best-effort) from `exposedInterfaces` items in the matching componentSpec, e.g. {"protocol": "TCP", "port": 8080}. + - Host filter includes domain_id == `zone_id` and basic CPU/mem constraints. + - For PUBLICREPO artefacts: set `is_private=False` and omit credentials entirely. + For PRIVATEREPO artefacts: set `is_private=True` and include non-empty username/password if present. + - `cliArgs` are derived from `commandLineParams` dict: + - bool True -> "flag" + - key/value -> "key=value" + `envVars` are derived from `compEnvParams` list: + - [{"name": "KEY", "value": "VAL"}] -> [{"KEY": "VAL"}, ...] + - If a component name mismatch occurs between app and artefact, fall back to the first artefact componentSpec. + + :param app_model: GSMA ApplicationModel (already validated) + :param zone_id: Target aerOS domain id/zone urn for host node filter + :param artefact_resolver: Callable that returns an Artefact for a given artefactId + :return: TOSCA YAML string (tosca_simple_yaml_1_3) + """ + node_templates: Dict[str, NodeTemplate] = {} + + for comp in app_model.appComponentSpecs: + artefact = artefact_resolver(comp.artefactId) + if not artefact: + raise ResourceNotFoundError( + f"GSMA artefact '{comp.artefactId}' not found") + + # pick the componentSpec that matches componentName, else first + comp_spec = None + if artefact.componentSpec: + for c in artefact.componentSpec: + if c.componentName == comp.componentName: + comp_spec = c + break + if comp_spec is None: + comp_spec = artefact.componentSpec[0] + else: + raise InvalidArgumentError( + f"Artefact '{artefact.artefactId}' has no componentSpec") + + # Resolve container image + image = comp_spec.images[ + 0] if comp_spec.images else "docker.io/library/nginx:stable" + if "/" in image: + repository_url = "/".join(image.split("/")[:-1]) + image_file = image.split("/")[-1] + else: + repository_url, image_file = "docker_hub", image + + # Ports (best-effort) from exposedInterfaces + ports: Dict[str, ExposedPort] = {} + expose_ports = False + if comp_spec.exposedInterfaces: + for idx, iface in enumerate(comp_spec.exposedInterfaces): + protocol = str(iface.get("protocol", "TCP")).lower() + port = iface.get("port") + if isinstance(port, int): + ports[f"if{idx}"] = ExposedPort(properties=PortProperties( + protocol=[protocol], source=port)) + expose_ports = True + + # Build cliArgs as a list of dicts: [{"KEY": "VAL"}, {"FLAG": ""}, ...] + cli_args: List[Dict[str, str]] = [] + cmd = getattr(comp_spec, "commandLineParams", None) + + if isinstance(cmd, dict): + for k, v in cmd.items(): + if v is True: + cli_args.append({str(k): ""}) # flag without value + elif v is False or v is None: + continue + else: + cli_args.append({str(k): str(v)}) + elif isinstance(cmd, list): + # if someone passes ["--flag", "--opt=1"] style + for item in cmd: + if isinstance(item, str): + if "=" in item: + k, v = item.split("=", 1) + cli_args.append({k: v}) + else: + cli_args.append({item: ""}) + + # Build envVars from compEnvParams list of {"name": "...", "value": "..."} + env_vars: List[Dict[str, str]] = [] + if isinstance(getattr(comp_spec, "compEnvParams", None), list): + for item in comp_spec.compEnvParams: + if isinstance(item, dict): + if "name" in item and "value" in item: + env_vars.append( + {str(item["name"]): str(item["value"])}) + elif len(item) == 1: # already mapping-like {"KEY": "VAL"} + k, v = next(iter(item.items())) + env_vars.append({str(k): str(v)}) + + # Host filter (basic example) + host_props = HostProperty( + cpu_arch={"equal": "x64"}, + realtime={"equal": False}, + cpu_usage={"less_or_equal": "0.4"}, + mem_size={"greater_or_equal": "1024"}, + energy_efficiency={"greater_or_equal": "0"}, + green={"greater_or_equal": "0"}, + domain_id=DomainIdOperator(equal=zone_id), + ) + + requirements = [ + CustomRequirement(network=NetworkRequirement( + properties=NetworkProperties(ports=ports, + exposePorts=expose_ports))), + CustomRequirement(host=HostRequirement(node_filter=NodeFilter( + capabilities=[{ + "host": HostCapability(properties=host_props) + }], + properties=None, + ))), + ] + + # PUBLICREPO => is_private=False and omit credentials + repo_type = getattr(artefact, "repoType", None) + is_private = bool(repo_type == "PRIVATEREPO") + username = None + password = None + if is_private and artefact.artefactRepoLocation: + u = artefact.artefactRepoLocation.userName + p = artefact.artefactRepoLocation.password + username = u if u else None + password = p if p else None + + node_templates[comp.componentName] = NodeTemplate( + type="tosca.nodes.Container.Application", + isJob=False, + requirements=requirements, + artifacts={ + "application_image": + ArtifactModel( + file=image_file, + type="tosca.artifacts.Deployment.Image.Container.Docker", + repository=repository_url, + is_private=is_private, # False for PUBLICREPO + username=username, # None for PUBLICREPO + password=password, # None for PUBLICREPO + ) + }, + interfaces={ + "Standard": { + "create": { + "implementation": "application_image", + "inputs": { + "cliArgs": cli_args, + "envVars": env_vars, + }, + } + } + }, + ) + + # Assemble and dump TOSCA + tosca = TOSCA( + tosca_definitions_version="tosca_simple_yaml_1_3", + description= + f"GSMA->TOSCA for {app_model.appMetaData.appName} ({app_model.appId})", + serviceOverlay=False, + node_templates=node_templates, + ) + + tosca_dict = tosca.model_dump(by_alias=True, exclude_none=True) + # Clean requirements lists from None entries + for template in tosca_dict.get("node_templates", {}).values(): + template["requirements"] = [{ + k: v + for k, v in req.items() if v is not None + } for req in template.get("requirements", [])] + + return yaml.dump(tosca_dict, sort_keys=False) diff --git a/src/sunrise6g_opensdk/edgecloud/adapters/aeros/errors.py b/src/sunrise6g_opensdk/edgecloud/adapters/aeros/errors.py new file mode 100644 index 0000000000000000000000000000000000000000..2ca44f4470fe85217131f81b481cfc7737b60e34 --- /dev/null +++ b/src/sunrise6g_opensdk/edgecloud/adapters/aeros/errors.py @@ -0,0 +1,30 @@ +''' +Custom aerOS adapter exceptions on top of EdgeCloudPlatformError +''' + +from sunrise6g_opensdk.edgecloud.adapters.errors import EdgeCloudPlatformError + + +class InvalidArgumentError(EdgeCloudPlatformError): + """400 Bad Request""" + pass + + +class UnauthenticatedError(EdgeCloudPlatformError): + """401 Unauthorized""" + pass + + +class PermissionDeniedError(EdgeCloudPlatformError): + """403 Forbidden""" + pass + + +class ResourceNotFoundError(EdgeCloudPlatformError): + """404 Not Found""" + pass + + +class ServiceUnavailableError(EdgeCloudPlatformError): + """503 Service Unavailable""" + pass diff --git a/src/sunrise6g_opensdk/edgecloud/adapters/aeros/storageManagement/__init__.py b/src/sunrise6g_opensdk/edgecloud/adapters/aeros/storageManagement/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..612b363b9872a884918f2e0c8aa01bcb67c2d4d2 --- /dev/null +++ b/src/sunrise6g_opensdk/edgecloud/adapters/aeros/storageManagement/__init__.py @@ -0,0 +1,5 @@ +''' +This module contains the storage management implementations for aerOS. +''' +from .inMemoryStorage import InMemoryAppStorage +from .sqlite_storage import SQLiteAppStorage diff --git a/src/sunrise6g_opensdk/edgecloud/adapters/aeros/storageManagement/appStorageManager.py b/src/sunrise6g_opensdk/edgecloud/adapters/aeros/storageManagement/appStorageManager.py new file mode 100644 index 0000000000000000000000000000000000000000..9bff25a1276dce8dfba66448af38105be4279007 --- /dev/null +++ b/src/sunrise6g_opensdk/edgecloud/adapters/aeros/storageManagement/appStorageManager.py @@ -0,0 +1,172 @@ +''' +# Class: AppStorageManager +# Abstract base class for application storage backends. +# This module defines the interface for managing application storage, +# ''' +from abc import ABC, abstractmethod +from typing import Dict, List, Optional, Union +from sunrise6g_opensdk.edgecloud.core.camara_schemas import AppInstanceInfo +from sunrise6g_opensdk.edgecloud.core.gsma_schemas import (ApplicationModel, + AppInstance, + AppInstanceStatus, + Artefact) + + +class AppStorageManager(ABC): + """Abstract base class for application storage backends.""" + + # ------------------------------------------------------------------------ + # aerOS Domain → Zone mapping + # ------------------------------------------------------------------------ + @abstractmethod + def store_zones(self, zones: Dict[str, Dict]) -> None: + """Store or update the aerOS domain → zone info mapping.""" + + @abstractmethod + def list_zones(self) -> List[Dict]: + """Return a list of all stored zone records (values).""" + + @abstractmethod + def resolve_domain_id_by_zone_uuid(self, zone_uuid: str) -> Optional[str]: + """Return the aerOS domain id (key) for a given edgeCloudZoneId (UUID).""" + + # ------------------------------------------------------------------------ + # CAMARA + # ------------------------------------------------------------------------ + + @abstractmethod + def store_app(self, app_id: str, manifest: Dict) -> None: + pass + + @abstractmethod + def get_app(self, app_id: str) -> Optional[Dict]: + pass + + @abstractmethod + def app_exists(self, app_id: str) -> bool: + pass + + @abstractmethod + def list_apps(self) -> List[Dict]: + pass + + @abstractmethod + def delete_app(self, app_id: str) -> None: + pass + + @abstractmethod + def store_deployment(self, app_instance: AppInstanceInfo) -> None: + pass + + @abstractmethod + def get_deployments(self, + app_id: Optional[str] = None) -> Dict[str, List[str]]: + pass + + @abstractmethod + def find_deployments( + self, + app_id: Optional[str] = None, + app_instance_id: Optional[str] = None, + region: Optional[str] = None) -> List[AppInstanceInfo]: + pass + + @abstractmethod + def remove_deployment(self, app_instance_id: str) -> Optional[str]: + """Removes the given instance ID and returns the corresponding app_id, if found.""" + pass + + @abstractmethod + def store_stopped_instance(self, app_id: str, + app_instance_id: str) -> None: + pass + + @abstractmethod + def get_stopped_instances( + self, + app_id: Optional[str] = None) -> List[str] | Dict[str, List[str]]: + pass + + @abstractmethod + def remove_stopped_instances(self, app_id: str) -> None: + pass + + # ------------------------------------------------------------------------ + # GSMA + # ------------------------------------------------------------------------ + @abstractmethod + def store_app_gsma(self, app_id: str, model: ApplicationModel) -> None: + ... + + @abstractmethod + def get_app_gsma(self, app_id: str) -> Optional[ApplicationModel]: + ... + + @abstractmethod + def list_apps_gsma(self) -> List[ApplicationModel]: + ... + + @abstractmethod + def delete_app_gsma(self, app_id: str) -> None: + ... + + @abstractmethod + def store_deployment_gsma( + self, + app_id: str, + inst: AppInstance, + status: Optional[AppInstanceStatus] = None, # optional future use + ) -> None: + ... + + @abstractmethod + def get_deployments_gsma(self, + app_id: Optional[str] = None + ) -> Dict[str, List[str]]: + ... + + @abstractmethod + def find_deployments_gsma( + self, + app_id: Optional[str] = None, + app_instance_id: Optional[str] = None, + zone_id: Optional[str] = None, + ) -> List[AppInstance]: + ... + + @abstractmethod + def remove_deployment_gsma(self, app_instance_id: str) -> Optional[str]: + ... + + @abstractmethod + def store_stopped_instance_gsma(self, app_id: str, + app_instance_id: str) -> None: + ... + + @abstractmethod + def get_stopped_instances_gsma( + self, + app_id: Optional[str] = None + ) -> Union[List[str], Dict[str, List[str]]]: + ... + + @abstractmethod + def remove_stopped_instances_gsma(self, app_id: str) -> None: + ... + + # --- GSMA Artefacts --- + @abstractmethod + def store_artefact_gsma(self, artefact: Artefact) -> None: + ... + + @abstractmethod + def get_artefact_gsma(self, artefact_id: str) -> Optional[Artefact]: + ... + + @abstractmethod + def list_artefacts_gsma(self) -> List[Artefact]: + ... + + @abstractmethod + def delete_artefact_gsma(self, artefact_id: str) -> None: + ... diff --git a/src/sunrise6g_opensdk/edgecloud/adapters/aeros/storageManagement/inMemoryStorage.py b/src/sunrise6g_opensdk/edgecloud/adapters/aeros/storageManagement/inMemoryStorage.py new file mode 100644 index 0000000000000000000000000000000000000000..a2e27ef7b7546ffe7ce3f702b5e994131891bea5 --- /dev/null +++ b/src/sunrise6g_opensdk/edgecloud/adapters/aeros/storageManagement/inMemoryStorage.py @@ -0,0 +1,366 @@ +""" +Class: InMemoryAppStorage +Process-wide singleton, thread-safe with a single RLock. +Keeps CAMARA and GSMA stores separate to avoid schema confusion. +""" + +from abc import ABCMeta +from threading import RLock +from typing import Dict, List, Optional, Union + +from sunrise6g_opensdk.edgecloud.adapters.aeros.storageManagement.appStorageManager import ( + AppStorageManager) +from sunrise6g_opensdk.edgecloud.core.camara_schemas import AppInstanceInfo +from sunrise6g_opensdk.edgecloud.core.gsma_schemas import (ApplicationModel, + AppInstance, + AppInstanceStatus, + Artefact) +from sunrise6g_opensdk.edgecloud.adapters.aeros import config +from sunrise6g_opensdk.logger import setup_logger + + +class SingletonMeta(ABCMeta): + """Thread-safe Singleton metaclass (process-wide).""" + _instances: Dict[type, object] = {} + _lock = RLock() + + def __call__(cls, *args, **kwargs): + # Double-checked locking + if cls not in cls._instances: + with cls._lock: + if cls not in cls._instances: + cls._instances[cls] = super().__call__(*args, **kwargs) + return cls._instances[cls] + + +class InMemoryAppStorage(AppStorageManager, metaclass=SingletonMeta): + """ + In-memory implementation of the AppStorageManager interface. + CAMARA and GSMA data are stored in separate namespaces. + """ + + def __init__(self): + if getattr(self, "_initialized", False): + return + + # Always have a logger; gate noisy messages by DEBUG + self.logger = setup_logger() + if config.DEBUG: + self.logger.info("Using InMemoryStorage (singleton)") + + self._lock = RLock() + + # aerOS Domain → Zone mapping + self._zones: Dict[str, + Dict] = {} # {aeros_domain_id: camara_zone_dict} + + # CAMARA stores + self._apps: Dict[str, Dict] = {} # app_id -> manifest (CAMARA dict) + self._deployed: Dict[str, List[AppInstanceInfo]] = { + } # app_id -> [AppInstanceInfo] + self._stopped: Dict[str, + List[str]] = {} # app_id -> [stopped instance ids] + + # GSMA stores + self._apps_gsma: Dict[str, ApplicationModel] = { + } # app_id -> ApplicationModel + self._deployed_gsma: Dict[str, List[AppInstance]] = { + } # app_id -> [AppInstance] + self._stopped_gsma: Dict[str, List[str]] = { + } # app_id -> [stopped instance ids] + + self._artefacts_gsma: Dict[str, + Artefact] = {} # artefact_id -> Artefact + + self._initialized = True + + # ------------------------------------------------------------------------ + # Utilities + # ------------------------------------------------------------------------ + def reset(self) -> None: + """Helper for tests to clear global state.""" + with self._lock: + # CAMARA + self._apps.clear() + self._deployed.clear() + self._stopped.clear() + # GSMA + self._apps_gsma.clear() + self._deployed_gsma.clear() + self._stopped_gsma.clear() + + # ------------------------------------------------------------------------ + # aerOS Domain → Zone mapping + # ------------------------------------------------------------------------ + + def store_zones(self, zones: Dict[str, Dict]) -> None: + """ + Directly store a mapping of aerOS domain_id -> zone_info dict. + Example: + { + "urn:ngsi-ld:Domain:Athens": { + "edgeCloudZoneId": "550e8400-e29b-41d4-a716-446655440000", + "edgeCloudZoneName": "Athens", + "edgeCloudProvider": "aeros_dev", + "status": "active", + "geographyDetails": "NOT_USED", + }, + ... + } + """ + with self._lock: + self._zones.update(zones) + + def list_zones(self) -> List[Dict]: + """Return all zone records as a list of dicts.""" + with self._lock: + return [dict(v) for v in self._zones.values()] + + def resolve_domain_id_by_zone_uuid(self, zone_uuid: str) -> Optional[str]: + """ + Given the edgeCloudZoneId (UUID string), return the original aerOS domain id. + Performs a simple scan — fine for small to medium sets. + """ + with self._lock: + for domain_id, zone in self._zones.items(): + if zone.get("edgeCloudZoneId") == zone_uuid: + return domain_id + return None + + # ------------------------------------------------------------------------ + # CAMARA + # ------------------------------------------------------------------------ + def store_app(self, app_id: str, manifest: Dict) -> None: + with self._lock: + self._apps[app_id] = manifest + + def get_app(self, app_id: str) -> Optional[Dict]: + with self._lock: + return self._apps.get(app_id) + + def app_exists(self, app_id: str) -> bool: + with self._lock: + return app_id in self._apps + + def list_apps(self) -> List[Dict]: + with self._lock: + # shallow copies to avoid external mutation of nested dicts + return [dict(m) for m in self._apps.values()] + + def delete_app(self, app_id: str) -> None: + with self._lock: + self._apps.pop(app_id, None) + + def store_deployment(self, app_instance: AppInstanceInfo) -> None: + with self._lock: + # Ensure the key is a plain string + aid = getattr(app_instance.appId, "root", str(app_instance.appId)) + self._deployed.setdefault(aid, []).append(app_instance) + + def get_deployments(self, + app_id: Optional[str] = None) -> Dict[str, List[str]]: + with self._lock: + if app_id: + ids = [ + str(i.appInstanceId) + for i in self._deployed.get(app_id, []) + ] + return {app_id: ids} + return { + aid: [str(i.appInstanceId) for i in insts] + for aid, insts in self._deployed.items() + } + + def find_deployments( + self, + app_id: Optional[str] = None, + app_instance_id: Optional[str] = None, + region: Optional[str] = None, + ) -> List[AppInstanceInfo]: + with self._lock: + # Fast path by instance id + if app_instance_id: + for insts in self._deployed.values(): + for inst in insts: + if str(inst.appInstanceId.root) == app_instance_id: + if app_id and str(inst.appId) != app_id: + return [] + if region is not None and getattr( + inst, "region", None) != region: + return [] + return [inst] + return [] + + results: List[AppInstanceInfo] = [] + for aid, insts in self._deployed.items(): + if app_id and aid != app_id: + continue + for inst in insts: + if region is not None and getattr(inst, "region", + None) != region: + continue + results.append(inst) + return results + + def remove_deployment(self, app_instance_id: str) -> Optional[str]: + with self._lock: + for aid, insts in list(self._deployed.items()): + for idx, inst in enumerate(insts): + # Compare using the instance id string + inst_id = getattr(inst.appInstanceId, "root", + str(inst.appInstanceId)) + if inst_id == app_instance_id: + insts.pop(idx) + if not insts: + self._deployed.pop(aid, None) + # Return a plain string app_id + aid_str = getattr(aid, "root", str(aid)) + return aid_str + return None + + def store_stopped_instance(self, app_id: str, + app_instance_id: str) -> None: + with self._lock: + lst = self._stopped.setdefault(app_id, []) + if app_instance_id not in lst: + lst.append(app_instance_id) + + def get_stopped_instances( + self, + app_id: Optional[str] = None + ) -> Union[List[str], Dict[str, List[str]]]: + with self._lock: + if app_id: + return list(self._stopped.get(app_id, [])) + return {aid: list(ids) for aid, ids in self._stopped.items()} + + def remove_stopped_instances(self, app_id: str) -> None: + with self._lock: + self._stopped.pop(app_id, None) + + # ------------------------------------------------------------------------ + # GSMA + # ------------------------------------------------------------------------ + def store_app_gsma(self, app_id: str, model: ApplicationModel) -> None: + with self._lock: + self._apps_gsma[app_id] = model + + def get_app_gsma(self, app_id: str) -> Optional[ApplicationModel]: + with self._lock: + return self._apps_gsma.get(app_id) + + def list_apps_gsma(self) -> List[ApplicationModel]: + with self._lock: + return list(self._apps_gsma.values()) + + def delete_app_gsma(self, app_id: str) -> None: + with self._lock: + self._apps_gsma.pop(app_id, None) + + def store_deployment_gsma( + self, + app_id: str, + inst: AppInstance, + status: Optional[AppInstanceStatus] = None, # not persisted yet + ) -> None: + with self._lock: + self._deployed_gsma.setdefault(app_id, []).append(inst) + # If you later want to persist status per instance, keep a side map: + # self._status_gsma[inst.appInstIdentifier] = status + + def get_deployments_gsma(self, + app_id: Optional[str] = None + ) -> Dict[str, List[str]]: + with self._lock: + if app_id: + ids = [ + str(i.appInstIdentifier) + for i in self._deployed_gsma.get(app_id, []) + ] + return {app_id: ids} + return { + aid: [str(i.appInstIdentifier) for i in insts] + for aid, insts in self._deployed_gsma.items() + } + + def find_deployments_gsma( + self, + app_id: Optional[str] = None, + app_instance_id: Optional[str] = None, + zone_id: Optional[str] = None, + ) -> List[AppInstance]: + with self._lock: + # Limit the search space if app_id is provided + iter_lists = ([self._deployed_gsma.get(app_id, [])] + if app_id else self._deployed_gsma.values()) + + # Fast path: instance id provided + if app_instance_id: + target_id = str(app_instance_id) + for insts in iter_lists: + for inst in insts: + if str(inst.appInstIdentifier) != target_id: + continue + if zone_id is not None and inst.zoneId != zone_id: + continue + return [inst] + return [] + + # General filtering + results: List[AppInstance] = [] + for insts in iter_lists: + for inst in insts: + if zone_id is not None and inst.zoneId != zone_id: + continue + results.append(inst) + return results + + def remove_deployment_gsma(self, app_instance_id: str) -> Optional[str]: + with self._lock: + for aid, insts in list(self._deployed_gsma.items()): + for idx, inst in enumerate(insts): + if str(inst.appInstIdentifier) == app_instance_id: + insts.pop(idx) + if not insts: + self._deployed_gsma.pop(aid, None) + return aid + return None + + def store_stopped_instance_gsma(self, app_id: str, + app_instance_id: str) -> None: + with self._lock: + lst = self._stopped_gsma.setdefault(app_id, []) + if app_instance_id not in lst: + lst.append(app_instance_id) + + def get_stopped_instances_gsma( + self, + app_id: Optional[str] = None + ) -> Union[List[str], Dict[str, List[str]]]: + with self._lock: + if app_id: + return list(self._stopped_gsma.get(app_id, [])) + return {aid: list(ids) for aid, ids in self._stopped_gsma.items()} + + def remove_stopped_instances_gsma(self, app_id: str) -> None: + with self._lock: + self._stopped_gsma.pop(app_id, None) + + # ------------------------------------------------------------------------ + # GSMA Artefacts + # ------------------------------------------------------------------------ + def store_artefact_gsma(self, artefact: Artefact) -> None: + with self._lock: + self._artefacts_gsma[artefact.artefactId] = artefact + + def get_artefact_gsma(self, artefact_id: str) -> Optional[Artefact]: + with self._lock: + return self._artefacts_gsma.get(artefact_id) + + def list_artefacts_gsma(self) -> List[Artefact]: + with self._lock: + return list(self._artefacts_gsma.values()) + + def delete_artefact_gsma(self, artefact_id: str) -> None: + with self._lock: + self._artefacts_gsma.pop(artefact_id, None) diff --git a/src/sunrise6g_opensdk/edgecloud/adapters/aeros/storageManagement/sqlite_storage.py b/src/sunrise6g_opensdk/edgecloud/adapters/aeros/storageManagement/sqlite_storage.py new file mode 100644 index 0000000000000000000000000000000000000000..9fcb360a3a94b2ef691462a72a5b5a8445f67453 --- /dev/null +++ b/src/sunrise6g_opensdk/edgecloud/adapters/aeros/storageManagement/sqlite_storage.py @@ -0,0 +1,229 @@ +''' +SQLite storage implementation +''' +import sqlite3 +import json +from functools import wraps +from typing import Dict, List, Optional, Union +from sunrise6g_opensdk.edgecloud.core.camara_schemas import AppInstanceInfo, AppInstanceName,\ + AppProvider, AppId, AppInstanceId, EdgeCloudZoneId, Status +from sunrise6g_opensdk.edgecloud.adapters.aeros.storageManagement.appStorageManager\ + import AppStorageManager +from sunrise6g_opensdk.edgecloud.adapters.aeros import config +from sunrise6g_opensdk.logger import setup_logger + +decorator_logger = setup_logger() + + +def debug_log(msg: str): + """ + Decorator that logs the given message if config.DEBUG is True. + """ + + def decorator(func): + + @wraps(func) + def wrapper(*args, **kwargs): + if config.DEBUG: + decorator_logger.debug("[DEBUG] %s", msg) + return func(*args, **kwargs) + + return wrapper + + return decorator + + +class SQLiteAppStorage(AppStorageManager): + ''' + SQLite storage implementation + ''' + + @debug_log("Initializing SQLITE storage manager") + def __init__(self, db_path: str = "app_storage.db"): + if config.DEBUG: + self.logger = setup_logger() + self.logger.info("DB Path: %s", db_path) + self.conn = sqlite3.connect(db_path, check_same_thread=False) + self._init_schema() + + def _init_schema(self): + if config.DEBUG: + self.logger.info("Initializing db schema") + cursor = self.conn.cursor() + cursor.execute(""" + CREATE TABLE IF NOT EXISTS apps ( + app_id TEXT PRIMARY KEY, + manifest TEXT + ); + """) + cursor.execute(""" + CREATE TABLE IF NOT EXISTS deployments ( + app_instance_id TEXT PRIMARY KEY, + app_id TEXT, + name TEXT, + app_provider TEXT, + status TEXT, + component_endpoint_info TEXT, + kubernetes_cluster_ref TEXT, + edge_cloud_zone_id TEXT + ); + """) + cursor.execute(""" + CREATE TABLE IF NOT EXISTS stopped ( + app_id TEXT, + app_instance_id TEXT + ); + """) + self.conn.commit() + + @debug_log("In SQLITE store_app method ") + def store_app(self, app_id: str, manifest: Dict) -> None: + self.conn.execute( + "INSERT OR REPLACE INTO apps (app_id, manifest) VALUES (?, ?);", + (app_id, json.dumps(manifest))) + self.conn.commit() + + @debug_log("In SQLITE get_app method ") + def get_app(self, app_id: str) -> Optional[Dict]: + row = self.conn.execute("SELECT manifest FROM apps WHERE app_id = ?;", + (app_id, )).fetchone() + return json.loads(row[0]) if row else None + + @debug_log("In SQLITE app_exists method ") + def app_exists(self, app_id: str) -> bool: + row = self.conn.execute("SELECT 1 FROM apps WHERE app_id = ?;", + (app_id, )).fetchone() + return row is not None + + @debug_log("In SQLITE list_apps method ") + def list_apps(self) -> List[Dict]: + rows = self.conn.execute("SELECT manifest FROM apps;").fetchall() + return [json.loads(row[0]) for row in rows] + + @debug_log("In SQLITE delete_app method ") + def delete_app(self, app_id: str) -> None: + self.conn.execute("DELETE FROM apps WHERE app_id = ?;", (app_id, )) + self.conn.commit() + + @debug_log("In SQLITE store_deployment method ") + def store_deployment(self, app_instance: AppInstanceInfo) -> None: + resolved_status = (str(app_instance.status.value) if hasattr( + app_instance.status, "value") else str(app_instance.status) + if app_instance.status else "unknown") + self.logger.info("Resolved status for DB insert: %s", resolved_status) + + self.conn.execute( + """ + INSERT OR REPLACE INTO deployments ( + app_instance_id, app_id, name, app_provider, status, + component_endpoint_info, kubernetes_cluster_ref, edge_cloud_zone_id + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?); + """, + ( + str(app_instance.appInstanceId), + str(app_instance.appId), + str(app_instance.name.root), + str(app_instance.appProvider.root), + str(app_instance.status.value) if hasattr( + app_instance.status, "value") else + str(app_instance.status) if app_instance.status else "unknown", + json.dumps(app_instance.componentEndpointInfo) + if app_instance.componentEndpointInfo else None, + app_instance.kubernetesClusterRef, + str(app_instance.edgeCloudZoneId.root), + ), + ) + + self.conn.commit() + + @debug_log("In SQLITE get_deployments method ") + def get_deployments(self, + app_id: Optional[str] = None) -> Dict[str, List[str]]: + if app_id: + rows = self.conn.execute( + "SELECT app_id, app_instance_id FROM deployments WHERE app_id = ?;", + (app_id, )).fetchall() + else: + rows = self.conn.execute( + "SELECT app_id, app_instance_id FROM deployments;").fetchall() + + result: Dict[str, List[str]] = {} + for app_id_val, instance_id in rows: + result.setdefault(app_id_val, []).append(instance_id) + return result + + @debug_log("In SQLITE find_deployments method ") + def find_deployments( + self, + app_id: Optional[str] = None, + app_instance_id: Optional[str] = None, + region: Optional[str] = None, + ) -> List[AppInstanceInfo]: + query = "SELECT * FROM deployments WHERE 1=1" + params = [] + if app_id: + query += " AND app_id = ?" + params.append(app_id) + if app_instance_id: + query += " AND app_instance_id = ?" + params.append(app_instance_id) + + rows = self.conn.execute(query, params).fetchall() + + result = [] + for row in rows: + result.append( + AppInstanceInfo( + appInstanceId=AppInstanceId(row[0]), + appId=AppId(row[1]), + name=AppInstanceName(row[2]), + appProvider=AppProvider(row[3]), + status=Status(row[4]) if row[4] else Status.unknown, + componentEndpointInfo=json.loads(row[5]) + if row[5] else None, + kubernetesClusterRef=row[6], + edgeCloudZoneId=EdgeCloudZoneId(row[7]), + )) + + return result + + @debug_log("In SQLITE remove_deployments method ") + def remove_deployment(self, app_instance_id: str) -> Optional[str]: + row = self.conn.execute( + "SELECT app_id FROM deployments WHERE app_instance_id = ?;", + (app_instance_id, )).fetchone() + self.conn.execute("DELETE FROM deployments WHERE app_instance_id = ?;", + (app_instance_id, )) + self.conn.commit() + return row[0] if row else None + + @debug_log("In SQLITE store_stopped_instance method ") + def store_stopped_instance(self, app_id: str, + app_instance_id: str) -> None: + self.conn.execute( + "INSERT INTO stopped (app_id, app_instance_id) VALUES (?, ?);", + (app_id, app_instance_id)) + self.conn.commit() + + @debug_log("In SQLITE get_Stopped_instances method ") + def get_stopped_instances( + self, + app_id: Optional[str] = None + ) -> Union[List[str], Dict[str, List[str]]]: + if app_id: + rows = self.conn.execute( + "SELECT app_instance_id FROM stopped WHERE app_id = ?;", + (app_id, )).fetchall() + return [r[0] for r in rows] + else: + rows = self.conn.execute( + "SELECT app_id, app_instance_id FROM stopped;").fetchall() + result: Dict[str, List[str]] = {} + for aid, iid in rows: + result.setdefault(aid, []).append(iid) + return result + + @debug_log("In SQLITE remove_stopped_instances method ") + def remove_stopped_instances(self, app_id: str) -> None: + self.conn.execute("DELETE FROM stopped WHERE app_id = ?;", (app_id, )) + self.conn.commit() diff --git a/src/sunrise6g_opensdk/edgecloud/adapters/aeros/utils.py b/src/sunrise6g_opensdk/edgecloud/adapters/aeros/utils.py index 67cc543ba94fcbd607d5f0b4c3e7307df9f0b6cf..87f22e98a7ef6cfa20d8057fec70cc262378fbc3 100644 --- a/src/sunrise6g_opensdk/edgecloud/adapters/aeros/utils.py +++ b/src/sunrise6g_opensdk/edgecloud/adapters/aeros/utils.py @@ -6,38 +6,174 @@ # - Andreas Sakellaropoulos (asakellaropoulos@iit.demokritos.gr) ## """ -Docstring +aerOS help methods """ +import uuid +import string from requests.exceptions import HTTPError, RequestException, Timeout import sunrise6g_opensdk.edgecloud.adapters.aeros.config as config +import sunrise6g_opensdk.edgecloud.adapters.aeros.errors as errors from sunrise6g_opensdk.logger import setup_logger +_HEX = "0123456789abcdef" +_ALLOWED = set( + string.ascii_letters + + string.digits) # no underscore here; underscore is always escaped +_PREFIX = "A0_" # ensures name starts with a letter; stripped during decode + + +def encode_app_instance_name(original: str, *, max_len: int = 64) -> str: + """ + aerOS to CAMARA AppInstanceName encoder. + Reversibly encode `original` into a string matching ^[A-Za-z][A-Za-z0-9_]{1,63}$. + Uses underscore + two hex digits to escape any non [A-Za-z0-9] chars, including '_' itself. + If the encoded result would exceed `max_len`, raise ValueError (reversibility would be lost otherwise). + """ + out = [] + for ch in original: + if ch in _ALLOWED: + out.append(ch) + elif ch == "_": + out.append("_5f") + else: + # escape any other byte as _hh (lowercase hex) + out.append("_" + format(ord(ch), "02x")) + + enc = "".join(out) + + # must start with a letter + if not enc or enc[0] not in string.ascii_letters: + enc = _PREFIX + enc + + if len(enc) > max_len: + raise ValueError( + f"Encoded name exceeds {max_len} chars; cannot keep reversibility without external mapping." + ) + return enc + + +def decode_app_instance_name(encoded: str) -> str: + """ + CAMARA AppInstanceName to aerOS original app_id decoder. + Reverse of encode_app_instance_name. Restores the exact original string. + """ + s = encoded + if s.startswith(_PREFIX): + s = s[len(_PREFIX):] + + # walk and decode _hh sequences; underscores never appear unescaped in the encoding + i = 0 + out = [] + while i < len(s): + ch = s[i] + if ch != "_": + out.append(ch) + i += 1 + continue + + # expect two hex digits after underscore + if i + 2 >= len(s): + raise ValueError("Invalid escape at end of string.") + h1 = s[i + 1].lower() + h2 = s[i + 2].lower() + if h1 not in _HEX or h2 not in _HEX: + raise ValueError(f"Invalid escape sequence: _{h1}{h2}") + code = int(h1 + h2, 16) + out.append(chr(code)) + i += 3 + + return "".join(out) + + +def urn_to_uuid(urn: str) -> uuid.UUID: + """Convert a (ngsi-ld) URN string to a deterministic UUID.""" + return uuid.uuid5(uuid.NAMESPACE_URL, urn) + + +def map_aeros_service_status_to_gsma(status: str) -> str: + """ + Map aerOS service lifecycle states to GSMA-compliant status values. + + aerOS → GSMA + DEPLOYING → PENDING + DESTROYING → TERMINATING + DEPLOYED → DEPLOYED + FINISHED → No_Match + No_Match → READY + urn:ngsi-ld:null → No Match + """ + mapping = { + "DEPLOYING": "PENDING", + "DESTROYING": "TERMINATING", + "DEPLOYED": "DEPLOYED", + "FINISHED": "READY", + # "urn:ngsi-ld:null": "READY", + } + if not status: + return "FAILED" + return mapping.get(status.strip().upper(), "FAILED") + def catch_requests_exceptions(func): """ - Docstring + Decorator to catch and translate requests exceptions into custom app errors. """ logger = setup_logger(__name__, is_debug=True, file_name=config.LOG_FILE) def wrapper(*args, **kwargs): try: - result = func(*args, **kwargs) - return result + return func(*args, **kwargs) + except HTTPError as e: - logger.info("4xx or 5xx: %s \n", {e}) - return None # raise our custom exception or log, etc. - except ConnectionError as e: - logger.info( - "Raised for connection-related issues (e.g., DNS resolution failure, network issues): %s \n", - {e}, - ) - return None # raise our custom exception or log, etc. + response = getattr(e, "response", None) + status_code = getattr(response, "status_code", None) + logger.error("HTTPError occurred: %s", e) + + if status_code == 401: + raise errors.UnauthenticatedError("Unauthorized access") from e + elif status_code == 403: + raise errors.PermissionDeniedError("Forbidden access") from e + elif status_code == 404: + raise errors.ResourceNotFoundError("Resource not found") from e + elif status_code == 400: + raise errors.InvalidArgumentError("Bad request") from e + elif status_code == 503: + raise errors.ServiceUnavailableError( + "Service unavailable") from e + + raise errors.EdgeCloudPlatformError( + f"Unhandled HTTP error: {status_code}") from e + except Timeout as e: - logger.info("Timeout occured: %s \n", {e}) - return None # raise our custom exception or log, etc. + logger.warning("Timeout occurred: %s", e) + raise errors.ServiceUnavailableError("Request timed out") from e + + except ConnectionError as e: + logger.warning("Connection error (e.g., DNS): %s", e) + raise errors.ServiceUnavailableError("Connection issue") from e + except RequestException as e: - logger.info("Request failed: %s \n", {e}) - return None # raise our custom exception or log, etc. + # Catch other unclassified request exceptions (non-HTTP) + logger.error("Request failed: %s", str(e)) + + if e.response is not None: + logger.error("Status Code: %s", e.response.status_code) + logger.error("Response Body (raw): %s", e.response.text) + + try: + json_data = e.response.json() + logger.debug("Parsed JSON response: %s", json_data) + except ValueError: + logger.warning("Response body is not valid JSON.") + + if e.request is not None: + logger.error("Request URL: %s", e.request.url) + logger.error("Request Method: %s", e.request.method) + logger.error("Request Headers: %s", e.request.headers) + logger.error("Request Body: %s", e.request.body) + + raise errors.EdgeCloudPlatformError( + "Unhandled request error") from e return wrapper diff --git a/src/sunrise6g_opensdk/edgecloud/adapters/i2edge/client.py b/src/sunrise6g_opensdk/edgecloud/adapters/i2edge/client.py index 70fa247f0f36911f76ce8460fffc7a71dbbffd7e..f76d6a53beac2a196de718799e5c61310fd7fa1d 100644 --- a/src/sunrise6g_opensdk/edgecloud/adapters/i2edge/client.py +++ b/src/sunrise6g_opensdk/edgecloud/adapters/i2edge/client.py @@ -8,7 +8,6 @@ # - César Cajas (cesar.cajas@i2cat.net) ## import json -from copy import deepcopy from typing import Dict, List, Optional from pydantic import ValidationError @@ -798,11 +797,28 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): try: response = i2edge_get(url, params=params, expected_status=200) response_json = response.json() + # TODO: fix malformed GPU field in i2Edge, it should be a list of objects, not strings + # --- Quick fix for malformed GPU entries --- + quota_limits = response_json.get("computeResourceQuotaLimits", []) + for item in quota_limits: + if isinstance(item, dict) and isinstance(item.get("gpu"), list): + fixed_gpu = [] + for g in item["gpu"]: + if isinstance(g, str): + try: + # Convert single quotes to double quotes for valid JSON + fixed_gpu.append(json.loads(g.replace("'", '"'))) + except json.JSONDecodeError: + continue # ignore invalid entries + else: + fixed_gpu.append(g) + item["gpu"] = fixed_gpu + # --- End quick fix --- mapped = map_zone(response_json) try: validated_data = gsma_schemas.ZoneRegisteredData.model_validate(mapped) except ValidationError as e: - raise ValueError(f"Invalid schema: {e}") + raise ValueError(f"Invalid schema: {e}") from e return build_custom_http_response( status_code=200, content=validated_data.model_dump(), @@ -829,15 +845,23 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): :return: Response with artefact upload confirmation. """ try: - artefact_id = request_body["artefactId"] - artefact_name = request_body["artefactName"] - repo_data = request_body["artefactRepoLocation"] + # Validate input body with GSMA schema + gsma_validated_body = gsma_schemas.Artefact.model_validate(request_body) + body = gsma_validated_body.model_dump() + except ValidationError as e: + log.error(f"Invalid GSMA artefact request body: {e}") + raise + + try: + artefact_id = body["artefactId"] + artefact_name = body["artefactName"] + repo_data = body["artefactRepoLocation"] transformed = { "artefact_id": artefact_id, "artefact_name": artefact_name, "repo_name": repo_data.get("repoName", ""), - "repo_type": request_body.get("repoType"), + "repo_type": body.get("repoType"), "repo_url": repo_data["repoURL"], "user_name": repo_data.get("userName"), "password": repo_data.get("password"), @@ -855,6 +879,7 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): request=response.request, ) return response + except I2EdgeError as e: log.error(f"Failed to create artefact: {e}") raise @@ -870,7 +895,7 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): response = self.get_artefact(artefact_id) if response.status_code == 200: response_json = response.json() - content = gsma_schemas.Artefact( + content = gsma_schemas.ArtefactRetrieve( artefactId=response_json.get("artefact_id"), appProviderId=response_json.get("id"), artefactName=response_json.get("name"), @@ -889,7 +914,7 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): ), ) try: - validated_data = gsma_schemas.Artefact.model_validate(content) + validated_data = gsma_schemas.ArtefactRetrieve.model_validate(content) except ValidationError as e: raise ValueError(f"Invalid schema: {e}") return build_custom_http_response( @@ -940,13 +965,18 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): :param request_body: Payload with onboarding info. :return: Response with onboarding confirmation. """ - body = deepcopy(request_body) try: - body["app_id"] = body.pop("appId") - body.pop("edgeAppFQDN", None) - data = body + # Validate input against GSMA schema + gsma_validated_body = gsma_schemas.AppOnboardManifestGSMA.model_validate(request_body) + data = gsma_validated_body.model_dump() + except ValidationError as e: + log.error(f"Invalid GSMA input schema: {e}") + raise + try: + data["app_id"] = data.pop("appId") + data.pop("edgeAppFQDN", None) payload = i2edge_schemas.ApplicationOnboardingRequest(profile_data=data) - url = "{}/application/onboarding".format(self.base_url) + url = f"{self.base_url}/application/onboarding" response = i2edge_post(url, payload, expected_status=201) return build_custom_http_response( status_code=200, @@ -1029,18 +1059,26 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): :param request_body: Payload with updated onboarding info. :return: Response with update confirmation. """ - url = f"{self.base_url}/application/onboarding/{app_id}" - params = {} - response = i2edge_get(url, params, expected_status=200) - response_json = response.json() - app_component_specs = request_body.get("appComponentSpecs") - app_qos_profile = request_body.get("appUpdQoSProfile") - response_json["profile_data"]["appQoSProfile"] = app_qos_profile - response_json["profile_data"]["appComponentSpecs"] = app_component_specs - data = response_json.get("profile_data") try: + # Validate input body using GSMA schema + gsma_validated_body = gsma_schemas.PatchOnboardedAppGSMA.model_validate(request_body) + patch_payload = gsma_validated_body.model_dump() + except ValidationError as e: + log.error(f"Invalid GSMA input schema: {e}") + raise + try: + url = f"{self.base_url}/application/onboarding/{app_id}" + params = {} + response = i2edge_get(url, params, expected_status=200) + response_json = response.json() + # Update fields + app_component_specs = patch_payload.get("appComponentSpecs") + app_qos_profile = patch_payload.get("appUpdQoSProfile") + response_json["profile_data"]["appQoSProfile"] = app_qos_profile + response_json["profile_data"]["appComponentSpecs"] = app_component_specs + data = response_json.get("profile_data") payload = i2edge_schemas.ApplicationOnboardingRequest(profile_data=data) - url = "{}/application/onboarding/{}".format(self.base_url, app_id) + url = f"{self.base_url}/application/onboarding/{app_id}" response = i2edge_patch(url, payload, expected_status=200) return build_custom_http_response( status_code=200, @@ -1088,10 +1126,16 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): :param request_body: Payload with deployment information. :return: Response with deployment details. """ - body = deepcopy(request_body) try: - zone_id = body.get("zoneInfo").get("zoneId") - flavour_id = body.get("zoneInfo").get("flavourId") + # Validate input against GSMA schema + gsma_validated_body = gsma_schemas.AppDeployPayloadGSMA.model_validate(request_body) + body = gsma_validated_body.model_dump() + except ValidationError as e: + log.error(f"Invalid GSMA input schema: {e}") + raise + try: + zone_id = body.get("zoneInfo", {}).get("zoneId") + flavour_id = body.get("zoneInfo", {}).get("flavourId") app_deploy_data = i2edge_schemas.AppDeployData( appId=body.get("appId"), appProviderId=body.get("appProviderId"), @@ -1099,18 +1143,15 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): zoneInfo=i2edge_schemas.ZoneInfoRef(flavourId=flavour_id, zoneId=zone_id), ) payload = i2edge_schemas.AppDeploy(app_deploy_data=app_deploy_data) - url = "{}/application_instance".format(self.base_url) + url = f"{self.base_url}/application_instance" response = i2edge_post(url, payload, expected_status=202) - response_json = response.json() - content = gsma_schemas.AppInstance( + # Validate response against GSMA schema + app_instance = gsma_schemas.AppInstance( zoneId=response_json.get("zoneID"), appInstIdentifier=response_json.get("app_instance_id"), ) - try: - validated_data = gsma_schemas.AppInstance.model_validate(content) - except ValidationError as e: - raise ValueError(f"Invalid schema: {e}") + validated_data = gsma_schemas.AppInstance.model_validate(app_instance) return build_custom_http_response( status_code=202, content=validated_data.model_dump(), @@ -1123,6 +1164,10 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): log.error(f"Failed to deploy app: {e}") raise + except ValidationError as e: + log.error(f"Invalid GSMA response schema: {e}") + raise + def get_deployed_app_gsma(self, app_id: str, app_instance_id: str, zone_id: str) -> Response: """ Retrieves an application instance details from partner OP using GSMA federation. diff --git a/src/sunrise6g_opensdk/edgecloud/core/gsma_schemas.py b/src/sunrise6g_opensdk/edgecloud/core/gsma_schemas.py index 1a1a661fa081c50a607fc968b5e6f243883b4451..af2bd51943ef1e8fc62b866aecae66ecf5142254 100644 --- a/src/sunrise6g_opensdk/edgecloud/core/gsma_schemas.py +++ b/src/sunrise6g_opensdk/edgecloud/core/gsma_schemas.py @@ -128,7 +128,20 @@ class ArtefactRepoLocation(BaseModel): token: Optional[str] = None -class Artefact(BaseModel): +class ArtefactComponentSpec(BaseModel): + componentName: str + images: List[str] + numOfInstances: int + restartPolicy: str + commandLineParams: Optional[dict] = None + exposedInterfaces: Optional[List[dict]] = None + computeResourceProfile: Optional[dict] = None + compEnvParams: Optional[List[dict]] = None + deploymentConfig: Optional[dict] = None + persistentVolumes: Optional[List[dict]] = None + + +class ArtefactRetrieve(BaseModel): artefactId: str appProviderId: Optional[str] = None artefactName: str @@ -142,6 +155,22 @@ class Artefact(BaseModel): artefactRepoLocation: Optional[ArtefactRepoLocation] = None +class Artefact(BaseModel): + artefactId: str + appProviderId: str + artefactName: str + artefactVersionInfo: str + artefactDescription: Optional[str] = None + artefactVirtType: Literal["VM_TYPE", "CONTAINER_TYPE"] + artefactFileName: Optional[str] = None + artefactFileFormat: Optional[Literal["ZIP", "TAR", "TEXT", "TARGZ"]] = None + artefactDescriptorType: Literal["HELM", "TERRAFORM", "ANSIBLE", "SHELL", "COMPONENTSPEC"] + repoType: Optional[Literal["PRIVATEREPO", "PUBLICREPO", "UPLOAD"]] = None + artefactRepoLocation: Optional[ArtefactRepoLocation] = None + artefactFile: Optional[str] = None + componentSpec: List[ArtefactComponentSpec] + + # --------------------------- # ApplicationOnboardingManagement # --------------------------- diff --git a/tests/edgecloud/test_config_camara.py b/tests/edgecloud/test_config_camara.py index 6ce39c296215859b52f094cf9dd6087e5c1d4514..945545692cc2f206ecc145205653bb166601429a 100644 --- a/tests/edgecloud/test_config_camara.py +++ b/tests/edgecloud/test_config_camara.py @@ -68,14 +68,14 @@ CONFIG = { }, "aeros": { # Basic identifiers - "ZONE_ID": "", - "APP_ID": "", + "ZONE_ID": "8a4d95e8-8550-5664-8c67-b6c0c602f9be", + "APP_ID": "aeros-app-1", # CAMARA onboard_app payload "APP_ONBOARD_MANIFEST": { - "appId": "", - "name": "aeros-SDK-app", + "appId": "aeros-app-1", + "name": "aeros_SDK_app", "version": "1.0.0", - "appProvider": "aeros", + "appProvider": "aerOS_SDK", "packageType": "CONTAINER", "appRepo": { "type": "PUBLICREPO", @@ -113,11 +113,11 @@ CONFIG = { }, # CAMARA deploy_app payload "APP_DEPLOY_PAYLOAD": { - "appId": "", + "appId": "aeros-app-1", "appZones": [ { "EdgeCloudZone": { - "edgeCloudZoneId": "", + "edgeCloudZoneId": "8a4d95e8-8550-5664-8c67-b6c0c602f9be", "edgeCloudZoneName": "aeros-zone-1", "edgeCloudZoneStatus": "active", "edgeCloudProvider": "NCSRD", diff --git a/tests/edgecloud/test_config_gsma.py b/tests/edgecloud/test_config_gsma.py index 94fea67121eaaffe57b0ac6b87352b78b37b5b6b..5f2688fd81021c56757c6fdd181d70c284848463 100644 --- a/tests/edgecloud/test_config_gsma.py +++ b/tests/edgecloud/test_config_gsma.py @@ -7,8 +7,10 @@ CONFIG = { "REPO_TYPE": "PUBLICREPO", "REPO_URL": "https://cesarcajas.github.io/helm-charts-examples/", "APP_ONBOARD_MANIFEST_GSMA": { - "appId": "demo-app-id", - "appProviderId": "Y89TSlxMPDKlXZz7rN6vU2y", + "appId": + "demo-app-id", + "appProviderId": + "Y89TSlxMPDKlXZz7rN6vU2y", "appDeploymentZones": [ "Dmgoc-y2zv97lar0UKqQd53aS6MCTTdoGMY193yvRBYgI07zOAIktN2b9QB2THbl5Gqvbj5Zp92vmNeg7v4M" ], @@ -27,16 +29,20 @@ CONFIG = { "noOfUsersPerAppInst": 1, "appProvisioning": True, }, - "appComponentSpecs": [ - { - "serviceNameNB": "k8yyElSyJN4ctbNVqwodEQNUoGb2EzOEt4vQBjGnPii_5", - "serviceNameEW": "iDm08OZN", - "componentName": "HIEWqstajCmZJQmSFUj0kNHZ0xYvKWq720BKt8wjA41p", - "artefactId": "9c9143f0-f44f-49df-939e-1e8b891ba8f5", - } - ], - "appStatusCallbackLink": "string", - "edgeAppFQDN": "string", + "appComponentSpecs": [{ + "serviceNameNB": + "k8yyElSyJN4ctbNVqwodEQNUoGb2EzOEt4vQBjGnPii_5", + "serviceNameEW": + "iDm08OZN", + "componentName": + "HIEWqstajCmZJQmSFUj0kNHZ0xYvKWq720BKt8wjA41p", + "artefactId": + "9c9143f0-f44f-49df-939e-1e8b891ba8f5", + }], + "appStatusCallbackLink": + "string", + "edgeAppFQDN": + "string", }, "APP_DEPLOY_PAYLOAD_GSMA": { "appId": "demo-app-id", @@ -61,20 +67,25 @@ CONFIG = { }, "appComponentSpecs": [ { - "serviceNameNB": "7CI_9d4lAK90vU4ASUkKxYdQjsv3y3IuwucISSQ6lG5_EMqeyVUHPIhwa5", - "serviceNameEW": "tPihoUFj30938Bu9blpsHkvsec1iA7gqZZRMpsx6o7aSSj5", + "serviceNameNB": + "7CI_9d4lAK90vU4ASUkKxYdQjsv3y3IuwucISSQ6lG5_EMqeyVUHPIhwa5", + "serviceNameEW": + "tPihoUFj30938Bu9blpsHkvsec1iA7gqZZRMpsx6o7aSSj5", "componentName": "YCAhqPadfld8y68wJfTc6QNGguI41z", "artefactId": "9c9143f0-f44f-49df-939e-1e8b891ba8f5", }, { "serviceNameNB": "JCjR0Lc3J0sm2PcItECdbHXtpCLQCfq3B", - "serviceNameEW": "N8KBAdqT8L_sWOxeFZs3XYn6oykTTFHLiPKOS7kdYbw", + "serviceNameEW": + "N8KBAdqT8L_sWOxeFZs3XYn6oykTTFHLiPKOS7kdYbw", "componentName": "9aCfCEDe2Dv0Peg", "artefactId": "9c9143f0-f44f-49df-939e-1e8b891ba8f5", }, { - "serviceNameNB": "RIfXlfU9cDeLnrOBYzz9LJGdAjwPRp_3Mjp0Wq_RDlQiAPyXm", - "serviceNameEW": "31y8sCwvvyNCXfwtLhwJw6hoblG7ZcFzEjyFdAnzq7M8cxiOtDik0", + "serviceNameNB": + "RIfXlfU9cDeLnrOBYzz9LJGdAjwPRp_3Mjp0Wq_RDlQiAPyXm", + "serviceNameEW": + "31y8sCwvvyNCXfwtLhwJw6hoblG7ZcFzEjyFdAnzq7M8cxiOtDik0", "componentName": "3kTa4zKEX", "artefactId": "9c9143f0-f44f-49df-939e-1e8b891ba8f5", }, @@ -82,7 +93,158 @@ CONFIG = { }, }, "aeros": { - # PLACEHOLDER + "ZONE_ID": "urn:ngsi-ld:Domain:ncsrd01", + "ARTEFACT_ID": "artefact-nginx-001", + "ARTEFACT_NAME": "aeros-component", + "REPO_NAME": "dockerhub", + "REPO_TYPE": "PUBLICREPO", + "REPO_URL": "docker.io/library/nginx:stable", + "APP_ONBOARD_MANIFEST_GSMA": { + "appId": + "aeros-sdk-app", + "appProviderId": + "aeros-sdk-provider", + "appDeploymentZones": ["urn:ngsi-ld:Domain:ncsrd01"], + "appMetaData": { + "appName": "aeros_SDK_app", + "version": "string", + "appDescription": "test aeros sdk app", + "mobilitySupport": False, + "accessToken": "MfxADOjxDgBhMrqmBeG8XdQFLp2XviG3cZ_LM7uQKc9b", + "category": "IOT", + }, + "appQoSProfile": { + "latencyConstraints": "NONE", + "bandwidthRequired": 1, + "multiUserClients": "APP_TYPE_SINGLE_USER", + "noOfUsersPerAppInst": 1, + "appProvisioning": True, + }, + "appComponentSpecs": [{ + "serviceNameNB": + "gsma-deployed-app-service-nb", + "serviceNameEW": + "gsma-deployed-app-service-ew", + "componentName": + "nginx-component", + "artefactId": + "artefact-nginx-001", + }], + "appStatusCallbackLink": + "string", + "edgeAppFQDN": + "string", + }, + "APP_DEPLOY_PAYLOAD_GSMA": { + "appId": "aeros-sdk-app", + "appVersion": "1.0.0", + "appProviderId": "apps-sdk-deployer", + "zoneInfo": { + "zoneId": "urn:ngsi-ld:Domain:ncsrd01", + "flavourId": "FLAVOUR_BASIC", + "resourceConsumption": "RESERVED_RES_AVOID", + "resPool": "RESPOOL_DEFAULT", + }, + "appInstCallbackLink": "string", + }, + "PATCH_ONBOARDED_APP_GSMA": { + "appUpdQoSProfile": { + "latencyConstraints": "NONE", + "bandwidthRequired": 1, + "mobilitySupport": False, + "multiUserClients": "APP_TYPE_SINGLE_USER", + "noOfUsersPerAppInst": 1, + "appProvisioning": True, + }, + "appComponentSpecs": [ + { + "serviceNameNB": + "gsma-deployed-app-service-nb", + "serviceNameEW": + "gsma-deployed-app-service-ew", + "componentName": "nginx-component", + "artefactId": "artefact-nginx-001", + }, + { + "serviceNameNB": "JCjR0Lc3J0sm2PcItECdbHXtpCLQCfq3B", + "serviceNameEW": + "N8KBAdqT8L_sWOxeFZs3XYn6oykTTFHLiPKOS7kdYbw", + "componentName": "9aCfCEDe2Dv0Peg", + "artefactId": "9c9143f0-f44f-49df-939e-1e8b891ba8f5", + }, + { + "serviceNameNB": + "RIfXlfU9cDeLnrOBYzz9LJGdAjwPRp_3Mjp0Wq_RDlQiAPyXm", + "serviceNameEW": + "31y8sCwvvyNCXfwtLhwJw6hoblG7ZcFzEjyFdAnzq7M8cxiOtDik0", + "componentName": "3kTa4zKEX", + "artefactId": "9c9143f0-f44f-49df-939e-1e8b891ba8f5", + }, + ], + }, + "ARTEFACT_PAYLOAD_GSMA": { + "artefactId": + "artefact-nginx-001", + "appProviderId": + "ncsrd-provider", + "artefactName": + "nginx-web-server", + "artefactVersionInfo": + "1.0.0", + "artefactDescription": + "Containerized Nginx Web Server", + "artefactVirtType": + "CONTAINER_TYPE", + "artefactFileName": + "nginx-web-server-1.0.0.tgz", + "artefactFileFormat": + "TARGZ", + "artefactDescriptorType": + "COMPONENTSPEC", + "repoType": + "PUBLICREPO", + "artefactRepoLocation": { + "repoURL": "docker.io/library/nginx:stable", + "userName": "", + "password": "", + "token": "" + }, + "artefactFile": + "", + "componentSpec": [{ + "componentName": + "nginx-component", + "images": ["docker.io/library/nginx:stable"], + "numOfInstances": + 1, + "restartPolicy": + "Always", + "commandLineParams": { + }, + "exposedInterfaces": [{ + "name": "http-api", + "protocol": "TCP", + "port": 8080 + }], + "computeResourceProfile": { + "cpu": "2", + "memory": "4Gi" + }, + "compEnvParams": [{ + "name": "TEST_ENV", + "value": "TEST_VALUE_ENV" + }], + "deploymentConfig": { + "replicaStrategy": "RollingUpdate", + "maxUnavailable": 1 + }, + "persistentVolumes": [{ + "name": "NOT_USE", + "mountPath": "NOT_USED", + "size": "NOT_USED" + }] + }] + } }, "kubernetes": { # PLACEHOLDER diff --git a/tests/edgecloud/test_e2e_gsma.py b/tests/edgecloud/test_e2e_gsma.py index 089d0dc46ac04721ef320bbc39ae66c070ecabcd..efb47bd8e1384854f6118ee6fcf464dad6ffb740 100644 --- a/tests/edgecloud/test_e2e_gsma.py +++ b/tests/edgecloud/test_e2e_gsma.py @@ -32,6 +32,9 @@ from sunrise6g_opensdk.edgecloud.adapters.errors import EdgeCloudPlatformError from sunrise6g_opensdk.edgecloud.adapters.i2edge.client import ( EdgeApplicationManager as I2EdgeClient, ) +from sunrise6g_opensdk.edgecloud.adapters.aeros.client import ( + EdgeApplicationManager as aerosClient, +) from sunrise6g_opensdk.edgecloud.core import gsma_schemas from tests.edgecloud.test_cases import test_cases from tests.edgecloud.test_config_gsma import CONFIG @@ -72,6 +75,11 @@ def test_config_gsma_compliance(edgecloud_client): if "PATCH_ONBOARDED_APP_GSMA" in config: patch_payload = config["PATCH_ONBOARDED_APP_GSMA"] gsma_schemas.PatchOnboardedAppGSMA(**patch_payload) + + # Validate ARTEFACT creation payload is GSMA-compliant + if "ARTEFACT_PAYLOAD_GSMA" in config: + artefact_payload = config["ARTEFACT_PAYLOAD_GSMA"] + gsma_schemas.Artefact(**artefact_payload) except Exception as e: pytest.fail(f"Configuration is not GSMA-compliant for {edgecloud_client.client_name}: {e}") @@ -176,6 +184,21 @@ def test_artefact_methods_gsma(edgecloud_client): pytest.fail(f"Artefact creation failed: {e}") +@pytest.mark.parametrize("edgecloud_client", test_cases, ids=id_func, indirect=True) +def test_artefact_create_gsma(edgecloud_client): + config = CONFIG[edgecloud_client.client_name] + if isinstance(edgecloud_client, aerosClient): + try: + response = edgecloud_client.create_artefact_gsma( + request_body=config["ARTEFACT_PAYLOAD_GSMA"] + ) + assert response.status_code == 201 + except EdgeCloudPlatformError as e: + pytest.fail(f"Artefact creation failed: {e}") + + + + @pytest.mark.parametrize("edgecloud_client", test_cases, ids=id_func, indirect=True) def test_get_artefact_gsma(edgecloud_client): config = CONFIG[edgecloud_client.client_name] @@ -188,7 +211,7 @@ def test_get_artefact_gsma(edgecloud_client): assert isinstance(artefact, dict) # GSMA schema validation for artefact - validated_artefact = gsma_schemas.Artefact(**artefact) + validated_artefact = gsma_schemas.ArtefactRetrieve(**artefact) # Logical validation: verify our expected artefact_id is in the dict assert ( @@ -207,7 +230,7 @@ def test_onboard_app_gsma(edgecloud_client): try: response = edgecloud_client.onboard_app_gsma(config["APP_ONBOARD_MANIFEST_GSMA"]) assert isinstance(response, Response) - assert response.status_code == 200 + assert response.status_code == 201 except EdgeCloudPlatformError as e: pytest.fail(f"App onboarding failed: {e}") @@ -289,7 +312,7 @@ def test_get_all_deployed_apps_gsma(edgecloud_client): validated_instances = [] for instance_data in instances_data: - validated_instance = gsma_schemas.ZoneIdentifier(**instance_data) + validated_instance = gsma_schemas.AppInstance(**instance_data) validated_instances.append(validated_instance) except EdgeCloudPlatformError as e: @@ -330,6 +353,12 @@ def test_undeploy_app_gsma(edgecloud_client, app_instance_id_gsma): pytest.fail(f"App undeployment failed: {e}") +@pytest.mark.parametrize("edgecloud_client", test_cases, ids=id_func, indirect=True) +def test_timer_wait_10_seconds_2(edgecloud_client): + time.sleep(10) + + + @pytest.mark.parametrize("edgecloud_client", test_cases, ids=id_func, indirect=True) def test_patch_onboarded_app_gsma(edgecloud_client): config = CONFIG[edgecloud_client.client_name] @@ -350,7 +379,7 @@ def test_delete_onboarded_app_gsma(edgecloud_client): app_id = config["APP_ONBOARD_MANIFEST_GSMA"]["appId"] response = edgecloud_client.delete_onboarded_app_gsma(app_id) assert isinstance(response, Response) - assert response.status_code == 200 + assert response.status_code == 204 except EdgeCloudPlatformError as e: pytest.fail(f"App onboarding deletion failed: {e}")