Commit f7057df8 authored by Laskaratos Dimitris's avatar Laskaratos Dimitris
Browse files

Fixed bugs related to edge cloud management api

parent 5f2e3345
Loading
Loading
Loading
Loading
+21 −0
Original line number Original line Diff line number Diff line
"""
aerOS client
  This module provides a client for interacting with the aerOS REST API.
  It includes methods for onboarding/deploying applications,
    and querying aerOS continuum entities
  aerOS domain is exposed as zones
  aerOS services and service components are exposed as applications
  Client is initialized with a base URL for the aerOS API
    and an access token for authentication.
"""

from src.edgecloud.clients.aeros import config
from src.logger import setup_logger

logger = setup_logger(__name__, is_debug=True, file_name=config.LOG_FILE)

logger.info("aerOS client initialized")
logger.debug("aerOS API URL: %s", config.aerOS_API_URL)
logger.debug("aerOS access token: %s", config.aerOS_ACCESS_TOKEN)
logger.debug("aerOS debug mode: %s", config.DEBUG)
logger.debug("aerOS log file: %s", config.LOG_FILE)
+232 −13
Original line number Original line Diff line number Diff line
# Mocked API for testing purposes
##
from typing import Dict, List, Optional
# This file is part of the Open SDK
from swagger_server.adapters.edgecloud.core.edgecloud_interface import EdgeCloudManagementInterface
#
# Contributors:
#   - Vasilis Pitsilis (vpitsilis@dat.demokritos.gr, vpitsilis@iit.demokritos.gr)
#   - Andreas Sakellaropoulos (asakellaropoulos@iit.demokritos.gr)
##
from typing import Any, Dict, List, Optional

from src.edgecloud.clients.aeros import config
from src.edgecloud.clients.aeros.continuum_client import ContinuumClient
from src.edgecloud.core.edgecloud_interface import EdgeCloudManagementInterface
from src.logger import setup_logger



class EdgeApplicationManager(EdgeCloudManagementInterface):
class EdgeApplicationManager(EdgeCloudManagementInterface):
    """
    aerOS Continuum Client
    FIXME: Handle None responses from continuum client
    """

    def __init__(self, base_url: str):
        self.base_url = base_url
        self.logger = setup_logger(__name__, is_debug=True, file_name=config.LOG_FILE)

    def onboard_app(self, app_manifest: Dict) -> Dict:
    def onboard_app(self, app_manifest: Dict) -> Dict:
        print(f"Submitting application: {app_manifest}")
        # HLO-FE POST with TOSCA and app_id (service_id)
        return {"appId": "1234-5678"}
        service_id = app_manifest.get("serviceId")
        tosca_str = app_manifest.get("tosca")
        aeros_client = ContinuumClient(self.base_url)
        onboard_response = aeros_client.onboard_service(
            service_id=service_id, tosca_str=tosca_str
        )
        return {"appId": onboard_response["serviceId"]}


    def get_all_onboarded_apps(self) -> List[Dict]:
    def get_all_onboarded_apps(self) -> List[Dict]:
        return [{"appId": "1234-5678", "name": "TestApp"}]
        aeros_client = ContinuumClient(self.base_url)
        ngsild_params = "type=Service&format=simplified"
        aeros_apps = aeros_client.query_entities(ngsild_params)
        return [
            {"appId": service["id"], "name": service["name"]} for service in aeros_apps
        ]
        # return [{"appId": "1234-5678", "name": "TestApp"}]


    def get_onboarded_app(self, app_id: str) -> Dict:
    def get_onboarded_app(self, app_id: str) -> Dict:
        return {"appId": app_id, "name": "TestApp"}
        aeros_client = ContinuumClient(self.base_url)
        ngsild_params = "format=simplified"
        aeros_app = aeros_client.query_entity(app_id, ngsild_params)
        return {"appId": aeros_app["id"], "name": aeros_app["name"]}


    def delete_onboarded_app(self, app_id: str) -> None:
    def delete_onboarded_app(self, app_id: str) -> None:
        print(f"Deleting application: {app_id}")
        print(f"Deleting application: {app_id}")
        # TBD: Purge from continuum (make all ngsil-ld calls for servieId connected entities)
        # Should check if undeployed first


    def deploy_app(self, app_id: str, app_zones: List[Dict]) -> Dict:
    def deploy_app(self, app_id: str, app_zones: List[Dict]) -> Dict:
        return {"appInstanceId": "abcd-efgh"}
        # HLO-FE PUT with app_id (service_id)
        aeros_client = ContinuumClient(self.base_url)
        deploy_response = aeros_client.deploy_service(app_id)
        return {"appInstanceId": deploy_response["serviceId"]}

    def get_all_deployed_apps(
        self,
        app_id: Optional[str] = None,
        app_instance_id: Optional[str] = None,
        region: Optional[str] = None,
    ) -> List[Dict]:
        # FIXME: Get services in deployed state
        aeros_client = ContinuumClient(self.base_url)
        ngsild_params = 'type=Service&format=simplified&q=actionType=="DEPLOYED"'
        if app_id:
            ngsild_params += f'&q=service=="{app_id}"'
        aeros_apps = aeros_client.query_entities(ngsild_params)
        return [
            {
                "appInstanceId": service["id"],
                "status":
                # scomponent["serviceComponentStatus"].split(":")[-1].lower()
                service["actionType"],
            }
            for service in aeros_apps
        ]
        # return [{"appInstanceId": "abcd-efgh", "status": "ready"}]


    def get_all_deployed_apps(self, app_id: Optional[str] = None, app_instance_id: Optional[str] = None, region: Optional[str] = None) -> List[Dict]:
    # def get_all_deployed_apps(self,
        return [{"appInstanceId": "abcd-efgh", "status": "ready"}]
    #                           app_id: Optional[str] = None,
    #                           app_instance_id: Optional[str] = None,
    #                           region: Optional[str] = None) -> List[Dict]:
    #     # FIXME: Get services in deployed state
    #     aeros_client = ContinuumClient(self.base_url)
    #     ngsild_params = "type=ServiceComponent&format=simplified"
    #     if app_id:
    #         ngsild_params += f'&q=service=="{app_id}"'
    #     aeros_apps = aeros_client.query_entities(ngsild_params)
    #     return [{
    #         "appInstanceId":
    #         scomponent["id"],
    #         "status":
    #         scomponent["serviceComponentStatus"].split(":")[-1].lower()
    #     } for scomponent in aeros_apps]
    #     # return [{"appInstanceId": "abcd-efgh", "status": "ready"}]


    def undeploy_app(self, app_instance_id: str) -> None:
    def undeploy_app(self, app_instance_id: str) -> None:
        print(f"Deleting app instance: {app_instance_id}")
        # HLO-FE DELETE with app_id (service_id)
        aeros_client = ContinuumClient(self.base_url)
        _ = aeros_client.undeploy_service(app_instance_id)

    def get_edge_cloud_zones(
        self, region: Optional[str] = None, status: Optional[str] = None
    ) -> List[Dict]:
        aeros_client = ContinuumClient(self.base_url)
        ngsild_params = "type=Domain&format=simplified"
        aeros_domains = aeros_client.query_entities(ngsild_params)
        return [
            {
                "edgeCloudZoneId": domain["id"],
                "status": domain["domainStatus"].split(":")[-1].lower(),
            }
            for domain in aeros_domains
        ]

    # return [{"edgeCloudZoneId": "zone-1", "status": "active"}]

    def get_edge_cloud_zones_details(
        self, zone_id: str, flavour_id: Optional[str] = None
    ) -> Dict:
        """
        Get details of a specific edge cloud zone.
        :param zone_id: The ID of the edge cloud zone
        :param flavour_id: Optional flavour ID to filter the results
        :return: Details of the edge cloud zone
        """
        # Minimal mocked response based on required fields of 'ZoneRegisteredData' in GSMA OPG E/WBI API
        # return {
        #     "zoneId":
        #     zone_id,
        #     "reservedComputeResources": [{
        #         "cpuArchType": "ISA_X86_64",
        #         "numCPU": "4",
        #         "memory": 8192,
        #     }],
        #     "computeResourceQuotaLimits": [{
        #         "cpuArchType": "ISA_X86_64",
        #         "numCPU": "8",
        #         "memory": 16384,
        #     }],
        #     "flavoursSupported": [{
        #         "flavourId":
        #         "medium-x86",
        #         "cpuArchType":
        #         "ISA_X86_64",
        #         "supportedOSTypes": [{
        #             "architecture": "x86_64",
        #             "distribution": "UBUNTU",
        #             "version": "OS_VERSION_UBUNTU_2204_LTS",
        #             "license": "OS_LICENSE_TYPE_FREE",
        #         }],
        #         "numCPU":
        #         4,
        #         "memorySize":
        #         8192,
        #         "storageSize":
        #         100,
        #     }],
        #     #
        # }
        aeros_client = ContinuumClient(self.base_url)
        ngsild_params = (
            f'format=simplified&type=InfrastructureElement&q=domain=="{zone_id}"'
        )
        self.logger.debug(
            "Querying infrastructure elements for zone %s with params: %s",
            zone_id,
            ngsild_params,
        )
        # Query the infrastructure elements for the specified zonese
        aeros_domain_ies = aeros_client.query_entities(ngsild_params)
        # Transform the infrastructure elements into the required format
        # and return the details of the edge cloud zone
        response = self.transform_infrastructure_elements(
            domain_ies=aeros_domain_ies, domain=zone_id
        )
        self.logger.debug("Transformed response: %s", response)
        # Return the transformed response
        return response

    def transform_infrastructure_elements(
        self, domain_ies: List[Dict[str, Any]], domain: str
    ) -> Dict[str, Any]:
        """
        Transform the infrastructure elements into a format suitable for the
        edge cloud zone details.
        :param domain_ies: List of infrastructure elements
        :param domain: The ID of the edge cloud zone
        :return: Transformed details of the edge cloud zone
        """
        total_cpu = 0
        total_ram = 0
        total_disk = 0
        total_available_ram = 0
        total_available_disk = 0

        flavours_supported = []

        for element in domain_ies:
            total_cpu += element.get("cpuCores", 0)
            total_ram += element.get("ramCapacity", 0)
            total_available_ram += element.get("availableRam", 0)
            total_disk += element.get("diskCapacity", 0)
            total_available_disk += element.get("availableDisk", 0)

            # Create a flavour per machine
            flavour = {
                "flavourId": f"{element.get('hostname')}-{element.get('containerTechnology')}",
                "cpuArchType": f"{element.get('cpuArchitecture')}",
                "supportedOSTypes": [
                    {
                        "architecture": f"{element.get('cpuArchitecture')}",
                        "distribution": f"{element.get('operatingSystem')}",  # assume
                        "version": "OS_VERSION_UBUNTU_2204_LTS",
                        "license": "OS_LICENSE_TYPE_FREE",
                    }
                ],
                "numCPU": element.get("cpuCores", 0),
                "memorySize": element.get("ramCapacity", 0),
                "storageSize": element.get("diskCapacity", 0),
            }
            flavours_supported.append(flavour)


    def get_edge_cloud_zones(self, region: Optional[str] = None, status: Optional[str] = None) -> List[Dict]:
        result = {
        return [{"edgeCloudZoneId": "zone-1", "status": "active"}]
            "zoneId": domain,
            "reservedComputeResources": [
                {
                    "cpuArchType": "ISA_X86_64",
                    "numCPU": str(total_cpu),
                    "memory": total_ram,
                }
            ],
            "computeResourceQuotaLimits": [
                {
                    "cpuArchType": "ISA_X86_64",
                    "numCPU": str(total_cpu * 2),  # Assume quota is 2x total?
                    "memory": total_ram * 2,
                }
            ],
            "flavoursSupported": flavours_supported,
        }
        return result
+24 −0
Original line number Original line Diff line number Diff line
##
# This file is part of the Open SDK
#
# Contributors:
#   - Vasilis Pitsilis (vpitsilis@dat.demokritos.gr, vpitsilis@iit.demokritos.gr)
#   - Andreas Sakellaropoulos (asakellaropoulos@iit.demokritos.gr)
##
"""
aerOS access configuration
Access tokens need to be provided in environment variables.
"""
import os

aerOS_API_URL = os.environ.get("aerOS_API_URL")
if not aerOS_API_URL:
    raise ValueError("Environment variable 'aerOS_API_URL' is not set.")
aerOS_ACCESS_TOKEN = os.environ.get("aerOS_ACCESS_TOKEN")
if not aerOS_ACCESS_TOKEN:
    raise ValueError("Environment variable 'aerOS_ACCESS_TOKEN' is not set.")
aerOS_HLO_TOKEN = os.environ.get("aerOS_HLO_TOKEN")
if not aerOS_HLO_TOKEN:
    raise ValueError("Environment variable 'aerOS_HLO_TOKEN' is not set.")
DEBUG = True
LOG_FILE = ".log/aeros_client.log"
+170 −0
Original line number Original line Diff line number Diff line
##
# This file is part of the Open SDK
#
# Contributors:
#   - Vasilis Pitsilis (vpitsilis@dat.demokritos.gr, vpitsilis@iit.demokritos.gr)
#   - Andreas Sakellaropoulos (asakellaropoulos@iit.demokritos.gr)
##
"""
aerOS REST API Client
   This client is used to interact with the aerOS REST API.
"""

import requests

from src.edgecloud.clients.aeros import config
from src.edgecloud.clients.aeros.utils import catch_requests_exceptions
from src.logger import setup_logger


class ContinuumClient:
    """
    Client to aerOS ngsi-ld based continuum exposure
    """

    def __init__(self, base_url: str = None):
        """
        :param base_url: the base url of the aerOS API
        """
        if base_url is None:
            self.api_url = config.aerOS_API_URL
        else:
            self.api_url = base_url
        self.logger = setup_logger(__name__, is_debug=True, file_name=config.LOG_FILE)
        self.m2m_cb_token = config.aerOS_ACCESS_TOKEN
        self.hlo_token = config.aerOS_HLO_TOKEN
        self.headers = {
            "Content-Type": "application/json",
            "Accept": "application/json",
            "aerOS": "true",
            "Authorization": f"Bearer {self.m2m_cb_token}",
        }
        self.hlo_headers = {
            "Content-Type": "application/json",
            "Accept": "application/json",
            "aerOS": "true",
            "Authorization": f"Bearer {self.hlo_token}",
        }
        self.hlo_onboard_headers = {
            "Content-Type": "application/yaml",
            "Authorization": f"Bearer {self.hlo_token}",
        }

    @catch_requests_exceptions
    def query_entity(self, entity_id, ngsild_params) -> dict:
        """
        Query entity with ngsi-ld params
        :input
        @param entity_id: the id of the queried entity
        @param ngsi-ld: the query params
        :output
        ngsi-ld object
        """
        entity_url = f"{self.api_url}/entities/{entity_id}?{ngsild_params}"
        response = requests.get(entity_url, headers=self.headers, timeout=15)
        if response is None:
            return None
        else:
            if config.DEBUG:
                self.logger.debug("Query entity URL: %s", entity_url)
                self.logger.debug(
                    "Query entity response: %s %s", response.status_code, response.text
                )
            return response.json()

    @catch_requests_exceptions
    def query_entities(self, ngsild_params):
        """
        Query entities with ngsi-ld params
        :input
        @param ngsi-ld: the query params
        :output
        ngsi-ld object
        """
        entities_url = f"{self.api_url}/entities?{ngsild_params}"
        response = requests.get(entities_url, headers=self.headers, timeout=15)
        if response is None:
            return None
        # else:
        #     if config.DEBUG:
        #         self.logger.debug("Query entities URL: %s", entities_url)
        #         self.logger.debug("Query entities response: %s %s",
        #                           response.status_code, response.text)
        return response.json()

    @catch_requests_exceptions
    def deploy_service(self, service_id: str) -> dict:
        """
        Re-allocate (deploy) service  on aerOS continuum
        :input
        @param service_id: the id of the service to be re-allocated
        :output
        the re-allocated service json object
        """
        re_allocate_url = f"{self.api_url}/hlo_fe/services/{service_id}"
        response = requests.put(re_allocate_url, headers=self.hlo_headers, timeout=15)
        if response is None:
            return None
        else:
            if config.DEBUG:
                self.logger.debug("Re-allocate service URL: %s", re_allocate_url)
                self.logger.debug(
                    "Re-allocate service response: %s %s",
                    response.status_code,
                    response.text,
                )
            return response.json()

    @catch_requests_exceptions
    def undeploy_service(self, service_id: str) -> dict:
        """
        Undeploy service
        :input
        @param service_id: the id of the service to be undeployed
        :output
        the undeployed service json object
        """
        undeploy_url = f"{self.api_url}/hlo_fe/services/{service_id}"
        response = requests.delete(undeploy_url, headers=self.hlo_headers, timeout=15)
        if response is None:
            return None
        else:
            if config.DEBUG:
                self.logger.debug("Re-allocate service URL: %s", undeploy_url)
                self.logger.debug(
                    "Undeploy service response: %s %s",
                    response.status_code,
                    response.text,
                )
            return response.json()

    @catch_requests_exceptions
    def onboard_service(self, service_id: str, tosca_str: str) -> dict:
        """
        Onboard (& deploy) service  on aerOS continuum
        :input
        @param service_id: the id of the service to onboarded (& deployed)
        @param tosca_str: the tosca whith all orchestration information
        :output
        the allocated service json object
        """
        onboard_url = f"{self.api_url}/hlo_fe/services/{service_id}"
        if config.DEBUG:
            self.logger.debug("Onboard service URL: %s", onboard_url)
            self.logger.debug(
                "Onboard service request body (TOSCA-YAML): %s", tosca_str
            )
        response = requests.post(
            onboard_url, data=tosca_str, headers=self.hlo_onboard_headers, timeout=15
        )
        if response is None:
            return None
        else:
            if config.DEBUG:
                self.logger.debug("Onboard service URL: %s", onboard_url)
                self.logger.debug(
                    "Onboard service response: %s %s",
                    response.status_code,
                    response.text,
                )
            return response.json()
Loading