Commit 5cfbde3f authored by Adrian Pino's avatar Adrian Pino
Browse files

Bring main, solve conflicts

parents f6fb15c2 9f4ffa7f
Loading
Loading
Loading
Loading
+2 −2
Original line number Diff line number Diff line
@@ -45,6 +45,7 @@ jupyter_client==8.6.3
jupyter_core==5.8.1
jupyterlab_pygments==0.3.0
keyring==25.6.0
kubernetes
markdown-it-py==3.0.0
MarkupSafe==3.0.2
matplotlib-inline==0.1.7
@@ -80,6 +81,7 @@ pydantic_core==2.33.1
pydub==0.25.1
pyflakes==3.2.0
Pygments==2.19.1
pymongo
pyproject_hooks==1.2.0
pytest==8.3.2
pytest-cov==6.0.0
@@ -111,5 +113,3 @@ wcwidth==0.2.13
webencodings==0.5.1
yarg==0.1.9
zipp==3.23.0
pymongo
kubernetes
+3 −2
Original line number Diff line number Diff line
@@ -16,8 +16,9 @@ from sunrise6g_opensdk.edgecloud.adapters.i2edge.client import (
    EdgeApplicationManager as I2EdgeClient,
)
from sunrise6g_opensdk.edgecloud.adapters.kubernetes.client import (
    EdgeApplicationManager as kubernetesClient
    EdgeApplicationManager as kubernetesClient,
)

# from sunrise6g_opensdk.network.adapters.oai.client import (
#     NetworkManager as OaiCoreClient,
# )
+231 −63
Original line number Diff line number Diff line
@@ -5,10 +5,14 @@
#   - Vasilis Pitsilis (vpitsilis@dat.demokritos.gr, vpitsilis@iit.demokritos.gr)
#   - Andreas Sakellaropoulos (asakellaropoulos@iit.demokritos.gr)
##
import uuid
from typing import Any, Dict, List, Optional

import yaml

from sunrise6g_opensdk.edgecloud.adapters.aeros import config
from sunrise6g_opensdk.edgecloud.adapters.aeros.continuum_client import ContinuumClient
from sunrise6g_opensdk.edgecloud.adapters.errors import EdgeCloudPlatformError
from sunrise6g_opensdk.edgecloud.core.edgecloud_interface import (
    EdgeCloudManagementInterface,
)
@@ -24,6 +28,9 @@ class EdgeApplicationManager(EdgeCloudManagementInterface):
    def __init__(self, base_url: str, **kwargs):
        self.base_url = base_url
        self.logger = setup_logger(__name__, is_debug=True, file_name=config.LOG_FILE)
        self._app_store: Dict[str, Dict] = {}
        self._deployed_services: Dict[str, List[str]] = {}
        self._stopped_services: Dict[str, List[str]] = {}

        # Overwrite config values if provided via kwargs
        if "aerOS_API_URL" in kwargs:
@@ -41,40 +48,190 @@ class EdgeApplicationManager(EdgeCloudManagementInterface):
            raise ValueError("Missing 'aerOS_HLO_TOKEN'")

    def onboard_app(self, app_manifest: Dict) -> Dict:
        # HLO-FE POST with TOSCA and app_id (service_id)
        service_id = app_manifest.get("serviceId")
        tosca_str = app_manifest.get("tosca")
        aeros_client = ContinuumClient(self.base_url)
        onboard_response = aeros_client.onboard_service(
            service_id=service_id, tosca_str=tosca_str
        app_id = app_manifest.get("appId")
        if not app_id:
            raise EdgeCloudPlatformError("Missing 'appId' in app manifest")

        if app_id in self._app_store:
            raise EdgeCloudPlatformError(
                f"Application with id '{app_id}' already exists"
            )
        return {"appId": onboard_response["serviceId"]}

        self._app_store[app_id] = app_manifest
        self.logger.debug("Onboarded application with id: %s", app_id)
        return {"appId": app_id}

    def get_all_onboarded_apps(self) -> List[Dict]:
        aeros_client = ContinuumClient(self.base_url)
        ngsild_params = "type=Service&format=simplified"
        aeros_apps = aeros_client.query_entities(ngsild_params)
        return [
            {"appId": service["id"], "name": service["name"]} for service in aeros_apps
        ]
        # return [{"appId": "1234-5678", "name": "TestApp"}]
        self.logger.debug("Onboarded applications: %s", list(self._app_store.keys()))
        return list(self._app_store.values())

    def get_onboarded_app(self, app_id: str) -> Dict:
        aeros_client = ContinuumClient(self.base_url)
        ngsild_params = "format=simplified"
        aeros_app = aeros_client.query_entity(app_id, ngsild_params)
        return {"appId": aeros_app["id"], "name": aeros_app["name"]}
        if app_id not in self._app_store:
            raise EdgeCloudPlatformError(
                f"Application with id '{app_id}' does not exist"
            )
        self.logger.debug("Retrieved application with id: %s", app_id)
        return self._app_store[app_id]

    def delete_onboarded_app(self, app_id: str) -> None:
        print(f"Deleting application: {app_id}")
        # TBD: Purge from continuum (make all ngsil-ld calls for servieId connected entities)
        # Should check if undeployed first
        if app_id not in self._app_store:
            raise EdgeCloudPlatformError(
                f"Application with id '{app_id}' does not exist"
            )
        service_instances = self._stopped_services.get(app_id, [])
        self.logger.debug(
            "Deleting application with id: %s and instances: %s",
            app_id,
            service_instances,
        )
        for service_instance in service_instances:
            self._purge_deployed_app_from_continuum(service_instance)
            self.logger.debug(
                "successfully purged service instance: %s", service_instance
            )
        del self._stopped_services[app_id]  # Clean up stopped services
        del self._app_store[app_id]  # Remove from onboarded apps

    def _generate_service_id(self, app_id: str) -> str:
        return f"urn:ngsi-ld:Service:{app_id}-{uuid.uuid4().hex[:4]}"

    def _generate_tosca_yaml_dict(
        self, app_manifest: Dict, app_zones: List[Dict]
    ) -> Dict:
        component = app_manifest.get("componentSpec", [{}])[0]
        component_name = component.get("componentName", "application")

        image_path = app_manifest.get("appRepo", {}).get("imagePath", "")
        image_file = image_path.split("/")[-1]
        repository_url = (
            "/".join(image_path.split("/")[:-1]) if "/" in image_path else "docker_hub"
        )
        zone_id = (
            app_zones[0].get("EdgeCloudZone", {}).get("edgeCloudZoneId", "default-zone")
        )

        # Extract minNodeMemory
        min_node_memory = (
            app_manifest.get("requiredResources", {})
            .get("applicationResources", {})
            .get("cpuPool", {})
            .get("topology", {})
            .get("minNodeMemory", 1024)
        )

        ports = {}
        for iface in component.get("networkInterfaces", []):
            interface_id = iface.get("interfaceId", "default")
            protocol = iface.get("protocol", "TCP").lower()
            port = iface.get("port", 8080)
            ports[interface_id] = {
                "properties": {"protocol": [protocol], "source": port}
            }

        expose_ports = any(
            iface.get("visibilityType") == "VISIBILITY_EXTERNAL"
            for iface in component.get("networkInterfaces", [])
        )

        yaml_dict = {
            "tosca_definitions_version": "tosca_simple_yaml_1_3",
            "description": f"TOSCA for {app_manifest.get('name', 'application')}",
            "serviceOverlay": False,
            "node_templates": {
                component_name: {
                    "type": "tosca.nodes.Container.Application",
                    "isJob": False,
                    "requirements": [
                        {
                            "network": {
                                "properties": {
                                    "ports": ports,
                                    "exposePorts": expose_ports,
                                }
                            }
                        },
                        {
                            "host": {
                                "node_filter": {
                                    "capabilities": [
                                        {
                                            "host": {
                                                "properties": {
                                                    "cpu_arch": {"equal": "x64"},
                                                    "realtime": {"equal": False},
                                                    "cpu_usage": {
                                                        "less_or_equal": "0.1"
                                                    },
                                                    "mem_size": {
                                                        "greater_or_equal": str(
                                                            min_node_memory
                                                        )
                                                    },
                                                    "domain_id": {"equal": zone_id},
                                                }
                                            }
                                        }
                                    ],
                                    "properties": None,
                                }
                            }
                        },
                    ],
                    "artifacts": {
                        "application_image": {
                            "file": image_file,
                            "type": "tosca.artifacts.Deployment.Image.Container.Docker",
                            "is_private": False,
                            "repository": repository_url,
                        }
                    },
                    "interfaces": {
                        "Standard": {
                            "create": {
                                "implementation": "application_image",
                                "inputs": {"cliArgs": [], "envVars": []},
                            }
                        }
                    },
                }
            },
        }

        return yaml_dict

    def deploy_app(self, app_id: str, app_zones: List[Dict]) -> Dict:
        # HLO-FE PUT with app_id (service_id)
        # 1. Get app CAMARA manifest
        app_manifest = self._app_store.get(app_id)
        if not app_manifest:
            raise EdgeCloudPlatformError(
                f"Application with id '{app_id}' does not exist"
            )

        # 2. Generate unique service ID
        service_id = self._generate_service_id(app_id)

        # 3. Convert dict to YAML string
        yaml_dict = self._generate_tosca_yaml_dict(app_manifest, app_zones)
        tosca_yaml = yaml.dump(yaml_dict, sort_keys=False)
        self.logger.info("Generated TOSCA YAML:")
        self.logger.info(tosca_yaml)

        # 4. Instantiate client and call continuum to deploy service
        aeros_client = ContinuumClient(self.base_url)
        deploy_response = aeros_client.deploy_service(app_id)
        return {"appInstanceId": deploy_response["serviceId"]}
        response = aeros_client.onboard_and_deploy_service(service_id, tosca_yaml)

        if "serviceId" not in response:
            raise EdgeCloudPlatformError(
                "Invalid response from onboard_service: missing 'serviceId'"
            )

        # 5. Track deployment
        if app_id not in self._deployed_services:
            self._deployed_services[app_id] = []
        self._deployed_services[app_id].append(service_id)

        # 6. Return expected format
        return {"appInstanceId": response["serviceId"]}

    def get_all_deployed_apps(
        self,
@@ -82,45 +239,57 @@ class EdgeApplicationManager(EdgeCloudManagementInterface):
        app_instance_id: Optional[str] = None,
        region: Optional[str] = None,
    ) -> List[Dict]:
        # FIXME: Get services in deployed state
        deployed = []
        for stored_app_id, instance_ids in self._deployed_services.items():
            for instance_id in instance_ids:
                deployed.append({"appId": stored_app_id, "appInstanceId": instance_id})
        return deployed

    def _purge_deployed_app_from_continuum(self, app_id: str) -> None:
        aeros_client = ContinuumClient(self.base_url)
        ngsild_params = 'type=Service&format=simplified&q=actionType=="DEPLOYED"'
        if app_id:
            ngsild_params += f'&q=service=="{app_id}"'
        aeros_apps = aeros_client.query_entities(ngsild_params)
        return [
            {
                "appInstanceId": service["id"],
                "status":
                # scomponent["serviceComponentStatus"].split(":")[-1].lower()
                service["actionType"],
            }
            for service in aeros_apps
        ]
        # return [{"appInstanceId": "abcd-efgh", "status": "ready"}]

    # def get_all_deployed_apps(self,
    #                           app_id: Optional[str] = None,
    #                           app_instance_id: Optional[str] = None,
    #                           region: Optional[str] = None) -> List[Dict]:
    #     # FIXME: Get services in deployed state
    #     aeros_client = ContinuumClient(self.base_url)
    #     ngsild_params = "type=ServiceComponent&format=simplified"
    #     if app_id:
    #         ngsild_params += f'&q=service=="{app_id}"'
    #     aeros_apps = aeros_client.query_entities(ngsild_params)
    #     return [{
    #         "appInstanceId":
    #         scomponent["id"],
    #         "status":
    #         scomponent["serviceComponentStatus"].split(":")[-1].lower()
    #     } for scomponent in aeros_apps]
    #     # return [{"appInstanceId": "abcd-efgh", "status": "ready"}]
        response = aeros_client.purge_service(app_id)
        if response:
            self.logger.debug("Purged deployed application with id: %s", app_id)
        else:
            raise EdgeCloudPlatformError(
                f"Failed to purg service with id from the continuum '{app_id}'"
            )

    def undeploy_app(self, app_instance_id: str) -> None:
        # HLO-FE DELETE with app_id (service_id)
        # 1. Locate app_id corresponding to this instance
        found_app_id = None
        for app_id, instances in self._deployed_services.items():
            if app_instance_id in instances:
                found_app_id = app_id
                break

        if not found_app_id:
            raise EdgeCloudPlatformError(
                f"No deployed app instance with ID '{app_instance_id}' found"
            )

        # 2. Call the external undeploy_service
        aeros_client = ContinuumClient(self.base_url)
        _ = aeros_client.undeploy_service(app_instance_id)
        try:
            aeros_client.undeploy_service(app_instance_id)
        except Exception as e:
            raise EdgeCloudPlatformError(
                f"Failed to undeploy app instance '{app_instance_id}': {str(e)}"
            ) from e

        # We could do it here with a little wait but better all instances in the same app are purged at once
        # 3. Purge the deployed app from continuum
        # self._purge_deployed_app_from_continuum(app_instance_id)

        # 4. Clean up internal tracking
        self._deployed_services[found_app_id].remove(app_instance_id)
        # Add instance to _stopped_services to purge it later
        if found_app_id not in self._stopped_services:
            self._stopped_services[found_app_id] = []
        self._stopped_services[found_app_id].append(app_instance_id)
        # If app has no instances left, remove it from deployed services
        if not self._deployed_services[found_app_id]:
            del self._deployed_services[found_app_id]

    def get_edge_cloud_zones(
        self, region: Optional[str] = None, status: Optional[str] = None
@@ -130,14 +299,13 @@ class EdgeApplicationManager(EdgeCloudManagementInterface):
        aeros_domains = aeros_client.query_entities(ngsild_params)
        return [
            {
                "edgeCloudZoneId": domain["id"],
                "zoneId": domain["id"],
                "status": domain["domainStatus"].split(":")[-1].lower(),
                "geographyDetails": "NOT_USED",
            }
            for domain in aeros_domains
        ]

    # return [{"edgeCloudZoneId": "zone-1", "status": "active"}]

    def get_edge_cloud_zones_details(
        self, zone_id: str, flavour_id: Optional[str] = None
    ) -> Dict:
+27 −1
Original line number Diff line number Diff line
@@ -139,7 +139,7 @@ class ContinuumClient:
            return response.json()

    @catch_requests_exceptions
    def onboard_service(self, service_id: str, tosca_str: str) -> dict:
    def onboard_and_deploy_service(self, service_id: str, tosca_str: str) -> dict:
        """
        Onboard (& deploy) service  on aerOS continuum
        :input
@@ -168,3 +168,29 @@ class ContinuumClient:
                    response.text,
                )
            return response.json()

    @catch_requests_exceptions
    def purge_service(self, service_id: str) -> bool:
        """
        Purge service from aerOS continuum
        :input
        @param service_id: the id of the service to be purged
        :output
        the purge result message from aerOS continuum
        """
        purge_url = f"{self.api_url}/hlo_fe/services/{service_id}/purge"
        response = requests.delete(purge_url, headers=self.hlo_headers, timeout=15)
        if response is None:
            return False
        else:
            if config.DEBUG:
                self.logger.debug("Purge service URL: %s", purge_url)
                self.logger.debug(
                    "Purge service response: %s %s",
                    response.status_code,
                    response.text,
                )
            if response.status_code != 200:
                self.logger.error("Failed to purge service: %s", response.text)
                return False
            return True
+34 −32
Original line number Diff line number Diff line
# Mocked API for testing purposes
import logging
import os
from typing import Dict, List, Optional

from kubernetes.client import V1Deployment

from sunrise6g_opensdk.edgecloud.adapters.kubernetes.lib.core.piedge_encoder import (
@@ -103,20 +103,22 @@ class EdgeApplicationManager(EdgeCloudManagementInterface):

    def deploy_app(self, body: dict) -> Dict:
        logging.info(
            "Searching for registered app with ID: " + body.get('appId') + " in database..."
            "Searching for registered app with ID: "
            + body.get("appId")
            + " in database..."
        )
        app = self.connector_db.get_documents_from_collection(
            "service_functions", input_type="_id", input_value=body.get('appId')
            "service_functions", input_type="_id", input_value=body.get("appId")
        )
        # success_response = []
        result = None
        response = None
        if len(app) < 1:
            return "Application with ID: " + body.get('appId') + " not found", 404
            return "Application with ID: " + body.get("appId") + " not found", 404
        if app is not None:
            sf = DeployServiceFunction(
                service_function_name=app[0].get("name"),
                    service_function_instance_name=body.get('name'),
                service_function_instance_name=body.get("name"),
                # location=body.get('edgeCloudZoneId'),
            )
            result = deploy_service_function(
@@ -126,16 +128,16 @@ class EdgeApplicationManager(EdgeCloudManagementInterface):
            )
        if type(result) is V1Deployment:
            response = {}
            response['name'] = body.get('name')
            response['appId']= app[0].get('_id')
            response['appInstanceId'] = result.metadata.uid
            response['appProvider'] = app[0].get('appProvider')
            response['status'] = 'unknown'
            response['componentEndpointInfo']= {}
            response['kubernetesClusterRef'] = ''
            response['edgeCloudZoneId'] = body.get('edgeCloudZoneId')
            response["name"] = body.get("name")
            response["appId"] = app[0].get("_id")
            response["appInstanceId"] = result.metadata.uid
            response["appProvider"] = app[0].get("appProvider")
            response["status"] = "unknown"
            response["componentEndpointInfo"] = {}
            response["kubernetesClusterRef"] = ""
            response["edgeCloudZoneId"] = body.get("edgeCloudZoneId")
        else:
            response = {'Error': result}
            response = {"Error": result}
        return response

    def get_all_deployed_apps(
@@ -151,15 +153,15 @@ class EdgeApplicationManager(EdgeCloudManagementInterface):
        response = []
        for deployment in deployments:
            item = {}
            item['name'] = deployment.get('service_function_catalogue_name')
            item['appId'] = deployment.get('id')
            item['appProvider'] = deployment.get('appProvider')
            item["name"] = deployment.get("service_function_catalogue_name")
            item["appId"] = deployment.get("id")
            item["appProvider"] = deployment.get("appProvider")
            item["appInstanceId"] = deployment.get("uid")
            item["status"] = deployment.get("status")
            interfaces = []
            for port in deployment.get('ports'):
                access_point = {'port': port}
                interfaces.append({'interfaceId' : '','accessPoints': access_point})
            for port in deployment.get("ports"):
                access_point = {"port": port}
                interfaces.append({"interfaceId": "", "accessPoints": access_point})
            item["componentEndpointInfo"] = interfaces
            item["kubernetesClusterRef"] = ""
            item["edgeCloudZoneId"] = {}
@@ -209,8 +211,8 @@ class EdgeApplicationManager(EdgeCloudManagementInterface):
    ) -> Dict:
        nodes = self.k8s_connector.get_node_details()
        node_details = None
        for item in nodes.get('items'):
            if item.get('metadata').get('uid')==zone_id:
        for item in nodes.get("items"):
            if item.get("metadata").get("uid") == zone_id:
                node_details = item
                break
        labels = node_details.get("metadata").get("labels")
@@ -220,7 +222,7 @@ class EdgeApplicationManager(EdgeCloudManagementInterface):
            {
                "cpuArchType": arch_type,
                "numCPU": status.get("capacity").get("cpu"),
                "memory": status.get("capacity").get("memory")
                "memory": status.get("capacity").get("memory"),
                # "memory": int(status.get("capacity").get("memory")) / (1024 * 1024),
            }
        ]
@@ -228,7 +230,7 @@ class EdgeApplicationManager(EdgeCloudManagementInterface):
            {
                "cpuArchType": arch_type,
                "numCPU": status.get("allocatable").get("cpu"),
                "memory": status.get("allocatable").get("memory")
                "memory": status.get("allocatable").get("memory"),
                # "memory": int(status.get("allocatable").get("memory")) / (1024 * 1024),
            }
        ]
Loading