Skip to content
Snippets Groups Projects
Commit 7eae88a0 authored by Dimitrios Laskaratos's avatar Dimitrios Laskaratos
Browse files

Merge branch 'dev' into 'main'

Dev

See merge request !3
parents a48e0e8a 531519ea
No related branches found
No related tags found
1 merge request!3Dev
Showing
with 14 additions and 1099 deletions
...@@ -124,4 +124,4 @@ _{ ...@@ -124,4 +124,4 @@ _{
"kubernetesClusterRef": "", "kubernetesClusterRef": "",
"name": "nginx-test", "name": "nginx-test",
"status": "unknown" "status": "unknown"
}_ }_
\ No newline at end of file
FROM python:3.9-alpine FROM python:3.12-alpine
#RUN apk add git
RUN mkdir -p /usr/src/app RUN mkdir -p /usr/src/app
WORKDIR /usr/src/app WORKDIR /usr/src/app
#RUN apk add --no-cache --virtual .build-deps gcc musl-dev
#RUN apk update && apk add python3-dev \
# gcc \
# libc-dev
#THIS SOLVED THE ISSUE WITH CFFI: building wheel for cffi (setup.py) finished with status 'error'!
#RUN apk add --no-cache libffi-dev build-base
#
COPY requirements.txt /usr/src/app/ COPY requirements.txt /usr/src/app/
#RUN pip3 install connexion
#ENV EMP_STORAGE_DRIVER mongo
#ENV EMP_STORAGE_URI mongodb://203.0.113.8:27017
#
#ENV PIP_ROOT_USER_ACTION=ignore
#
ENV PYTHONUNBUFFERED=1 ENV PYTHONUNBUFFERED=1
#RUN apk add --update --no-cache python3 && ln -sf python3 /usr/bin/python
#RUN python3 -m ensurepip
#RUN pip3 install --no-cache --upgrade pip setuptools RUN python3 -m venv .venv
RUN source .venv/bin/activate
RUN pip3 install --upgrade pip RUN pip3 install --upgrade pip
RUN pip3 install wheel RUN pip3 install wheel --trusted-host pypi.org --trusted-host pypi.python.org --trusted-host=files.pythonhosted.org
#RUN pip3 install --no-cache --upgrade setuptools
RUN pip3 install --trusted-host pypi.org --trusted-host pypi.python.org --trusted-host=files.pythonhosted.org --no-cache-dir -r requirements.txt RUN pip3 install --trusted-host pypi.org --trusted-host pypi.python.org --trusted-host=files.pythonhosted.org --no-cache-dir -r requirements.txt
COPY . /usr/src/app COPY . /usr/src/app
......
#connexion >= 2.6.0 connexion<3.0.0
#connexion[swagger-ui] >= 2.6.0
#python_dateutil == 2.6.0
#setuptools >= 21.0.0
#swagger-ui-bundle >= 0.0.2
#requests
#pymongo
#logging
#traceback
#pprint
#connexion >= 2.6.0
#connexion == 2.5.0 used in python 3.5 running server for dev
#connexion #for test
connexion[swagger-ui] connexion[swagger-ui]
python_dateutil == 2.6.0
setuptools >= 21.0.0 setuptools >= 21.0.0
#setuptools==50.3.2 requests==2.32.4
pymongo==3.12.0
#git+https://github.com/kubernetes-client/python.git
requests==2.25.1
#kubernetes==17.17.0
kubernetes==18.20.0
python-jose[cryptography]
cffi==1.15.1
#bcrypt
#bcrypt==3.1.7 #used in python 3.5 running server for dev
psycopg2-binary psycopg2-binary
#psycopg2==2.7.7 #used in python 3.5 running server for dev urllib3
pydantic-extra-types==2.10.3
#not used! sunrise6g-opensdk==1.0.2.post3
#pandas==0.24.2 \ No newline at end of file
paramiko>=2.12.0
urllib3
#!/usr/bin/env python3 #!/usr/bin/env python3
import connexion import connexion
import logging import logging
import sys import src.encoder as encoder
import os from json import JSONEncoder
from src import encoder
import urllib3 import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
def main(): def main():
global driver
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.INFO)
app = connexion.App(__name__, specification_dir='./swagger/') app = connexion.App(__name__, specification_dir='./swagger/')
app.app.json_encoder = encoder.JSONEncoder app.app.json_encoder = JSONEncoder
app.add_api('swagger.yaml', strict_validation=True, arguments={'title': 'π-edge Controller API'}, pythonic_params=True) app.add_api('swagger.yaml', strict_validation=True, arguments={'title': 'Service Resource Manager Controller API'}, pythonic_params=True)
app.run(port=8080) app.run(port=8080)
......
# -*- coding: utf-8 -*-
##
# Copyright 2025-present by Software Networks Area, i2CAT.
# All rights reserved.
#
# This file is part of the Open SDK
#
# Contributors:
# - Adrián Pino Martínez (adrian.pino@i2cat.net)
##
from typing import Dict
from src.common.sdk_factory import SdkFactory
class Sdk:
@staticmethod
def create_clients_from(
client_specs: Dict[str, Dict[str, str]],
) -> Dict[str, object]:
"""
Create and return a dictionary of instantiated edgecloud/network/o-ran clients
based on the provided specifications.
Args:
client_specs (dict): A dictionary where each key is the client's domain (e.g., 'edgecloud', 'network'),
and each value is a dictionary containing:
- 'client_name' (str): The specific name of the client (e.g., 'i2edge', 'open5gs').
- 'base_url' (str): The base URL for the client's API.
Additional parameters like 'scs_as_id' may also be included.
Returns:
dict: A dictionary where keys are the 'client_name' (str) and values are
the instantiated client objects.
Example:
>>> from src.common.universal_client_catalog import UniversalCatalogClient
>>>
>>> client_specs_example = {
>>> 'edgecloud': {
>>> 'client_name': 'i2edge',
>>> 'base_url': 'http://ip_edge_cloud:port',
>>> 'additionalEdgeCloudParamater1': 'example'
>>> },
>>> 'network': {
>>> 'client_name': 'open5gs',
>>> 'base_url': 'http://ip_network:port',
>>> 'additionalNetworkParamater1': 'example'
>>> }
>>> }
>>>
>>> clients = UniversalCatalogClient.create_clients(client_specs_example)
>>> edgecloud_client = clients.get("edgecloud")
>>> network_client = clients.get("network")
>>>
>>> edgecloud_client.get_edge_cloud_zones()
>>> network_client.get_qod_session(session_id="example_session_id")
"""
sdk_client = SdkFactory()
clients = {}
for domain, config in client_specs.items():
client_name = config["client_name"]
base_url = config["base_url"]
# Support of additional paramaters for specific clients
kwargs = {
k: v for k, v in config.items() if k not in ("client_name", "base_url")
}
client = sdk_client.instantiate_and_retrieve_clients(
domain, client_name, base_url, **kwargs
)
clients[domain] = client
return clients
# -*- coding: utf-8 -*-
##
# Copyright 2025-present by Software Networks Area, i2CAT.
# All rights reserved.
#
# This file is part of the Open SDK
#
# Contributors:
# - Adrián Pino Martínez (adrian.pino@i2cat.net)
##
from src.edgecloud.clients.aeros.client import EdgeApplicationManager as AerosClient
from src.edgecloud.clients.i2edge.client import EdgeApplicationManager as I2EdgeClient
from src.edgecloud.clients.piedge.client import EdgeApplicationManager as PiEdgeClient
# from src.network.clients.oai.client import NetworkManager as OaiCoreClient
# from src.network.clients.open5gcore.client import NetworkManager as Open5GCoreClient
# from src.network.clients.open5gs.client import NetworkManager as Open5GSClient
def _edgecloud_factory(client_name: str, base_url: str, **kwargs):
edge_cloud_factory = {
"aeros": lambda url, **kw: AerosClient(base_url=url, **kw),
"i2edge": lambda url: I2EdgeClient(base_url=url),
"piedge": lambda url, **kw: PiEdgeClient(base_url=url, **kw)
}
try:
return edge_cloud_factory[client_name](base_url, **kwargs)
except KeyError:
raise ValueError(
f"Invalid edgecloud client '{client_name}'. Available: {list(edge_cloud_factory)}"
)
# def _network_factory(client_name: str, base_url: str, **kwargs):
# if "scs_as_id" not in kwargs:
# raise ValueError("Missing required 'scs_as_id' for network clients.")
# scs_as_id = kwargs.pop("scs_as_id")
# network_factory = {
# "open5gs": lambda url, scs_id, **kw: Open5GSClient(
# base_url=url, scs_as_id=scs_id, **kw
# ),
# "oai": lambda url, scs_id, **kw: OaiCoreClient(
# base_url=url, scs_as_id=scs_id, **kw
# ),
# "open5gcore": lambda url, scs_id, **kw: Open5GCoreClient(
# base_url=url, scs_as_id=scs_id, **kw
# ),
# }
# try:
# return network_factory[client_name](base_url, scs_as_id, **kwargs)
# except KeyError:
# raise ValueError(
# f"Invalid network client '{client_name}'. Available: {list(network_factory)}"
# )
# def _oran_factory(client_name: str, base_url: str):
# # TODO
class SdkFactory:
_domain_factories = {
"edgecloud": _edgecloud_factory
# "network": _network_factory,
# "oran": _oran_factory,
}
@classmethod
def instantiate_and_retrieve_clients(
cls, domain: str, client_name: str, base_url: str, **kwargs
):
try:
catalog = cls._domain_factories[domain]
except KeyError:
raise ValueError(
f"Unsupported domain '{domain}'. Supported: {list(cls._domain_factories)}"
)
return catalog(client_name, base_url, **kwargs)
# #### Logging ####
# LOG_LEVEL="debug"
# LOG_FILE="edgecloud.log"
#### EdgeCloud ####
# EDGE_CLOUD="i2edge"
EDGE_CLOUD_URL=http://192.168.123.86:30769
"""
aerOS client
This module provides a client for interacting with the aerOS REST API.
It includes methods for onboarding/deploying applications,
and querying aerOS continuum entities
aerOS domain is exposed as zones
aerOS services and service components are exposed as applications
Client is initialized with a base URL for the aerOS API
and an access token for authentication.
"""
from src.edgecloud.clients.aeros import config
from src.logger import setup_logger
logger = setup_logger(__name__, is_debug=True, file_name=config.LOG_FILE)
# TODO: The following should only appear in case aerOS client is used
# Currently even if another client is used, the logs appear
# logger.info("aerOS client initialized")
# logger.debug("aerOS API URL: %s", config.aerOS_API_URL)
# logger.debug("aerOS access token: %s", config.aerOS_ACCESS_TOKEN)
# logger.debug("aerOS debug mode: %s", config.DEBUG)
# logger.debug("aerOS log file: %s", config.LOG_FILE)
##
# This file is part of the Open SDK
#
# Contributors:
# - Vasilis Pitsilis (vpitsilis@dat.demokritos.gr, vpitsilis@iit.demokritos.gr)
# - Andreas Sakellaropoulos (asakellaropoulos@iit.demokritos.gr)
##
from typing import Any, Dict, List, Optional
from src.edgecloud.clients.aeros import config
from src.edgecloud.clients.aeros.continuum_client import ContinuumClient
from src.edgecloud.core.edgecloud_interface import EdgeCloudManagementInterface
from src.logger import setup_logger
class EdgeApplicationManager(EdgeCloudManagementInterface):
"""
aerOS Continuum Client
FIXME: Handle None responses from continuum client
"""
def __init__(self, base_url: str, **kwargs):
self.base_url = base_url
self.logger = setup_logger(__name__, is_debug=True, file_name=config.LOG_FILE)
# Overwrite config values if provided via kwargs
if "aerOS_API_URL" in kwargs:
config.aerOS_API_URL = kwargs["aerOS_API_URL"]
if "aerOS_ACCESS_TOKEN" in kwargs:
config.aerOS_ACCESS_TOKEN = kwargs["aerOS_ACCESS_TOKEN"]
if "aerOS_HLO_TOKEN" in kwargs:
config.aerOS_HLO_TOKEN = kwargs["aerOS_HLO_TOKEN"]
if not config.aerOS_API_URL:
raise ValueError("Missing 'aerOS_API_URL'")
if not config.aerOS_ACCESS_TOKEN:
raise ValueError("Missing 'aerOS_ACCESS_TOKEN'")
if not config.aerOS_HLO_TOKEN:
raise ValueError("Missing 'aerOS_HLO_TOKEN'")
def onboard_app(self, app_manifest: Dict) -> Dict:
# HLO-FE POST with TOSCA and app_id (service_id)
service_id = app_manifest.get("serviceId")
tosca_str = app_manifest.get("tosca")
aeros_client = ContinuumClient(self.base_url)
onboard_response = aeros_client.onboard_service(
service_id=service_id, tosca_str=tosca_str
)
return {"appId": onboard_response["serviceId"]}
def get_all_onboarded_apps(self) -> List[Dict]:
aeros_client = ContinuumClient(self.base_url)
ngsild_params = "type=Service&format=simplified"
aeros_apps = aeros_client.query_entities(ngsild_params)
return [
{"appId": service["id"], "name": service["name"]} for service in aeros_apps
]
# return [{"appId": "1234-5678", "name": "TestApp"}]
def get_onboarded_app(self, app_id: str) -> Dict:
aeros_client = ContinuumClient(self.base_url)
ngsild_params = "format=simplified"
aeros_app = aeros_client.query_entity(app_id, ngsild_params)
return {"appId": aeros_app["id"], "name": aeros_app["name"]}
def delete_onboarded_app(self, app_id: str) -> None:
print(f"Deleting application: {app_id}")
# TBD: Purge from continuum (make all ngsil-ld calls for servieId connected entities)
# Should check if undeployed first
def deploy_app(self, app_id: str, app_zones: List[Dict]) -> Dict:
# HLO-FE PUT with app_id (service_id)
aeros_client = ContinuumClient(self.base_url)
deploy_response = aeros_client.deploy_service(app_id)
return {"appInstanceId": deploy_response["serviceId"]}
def get_all_deployed_apps(
self,
app_id: Optional[str] = None,
app_instance_id: Optional[str] = None,
region: Optional[str] = None,
) -> List[Dict]:
# FIXME: Get services in deployed state
aeros_client = ContinuumClient(self.base_url)
ngsild_params = 'type=Service&format=simplified&q=actionType=="DEPLOYED"'
if app_id:
ngsild_params += f'&q=service=="{app_id}"'
aeros_apps = aeros_client.query_entities(ngsild_params)
return [
{
"appInstanceId": service["id"],
"status":
# scomponent["serviceComponentStatus"].split(":")[-1].lower()
service["actionType"],
}
for service in aeros_apps
]
# return [{"appInstanceId": "abcd-efgh", "status": "ready"}]
# def get_all_deployed_apps(self,
# app_id: Optional[str] = None,
# app_instance_id: Optional[str] = None,
# region: Optional[str] = None) -> List[Dict]:
# # FIXME: Get services in deployed state
# aeros_client = ContinuumClient(self.base_url)
# ngsild_params = "type=ServiceComponent&format=simplified"
# if app_id:
# ngsild_params += f'&q=service=="{app_id}"'
# aeros_apps = aeros_client.query_entities(ngsild_params)
# return [{
# "appInstanceId":
# scomponent["id"],
# "status":
# scomponent["serviceComponentStatus"].split(":")[-1].lower()
# } for scomponent in aeros_apps]
# # return [{"appInstanceId": "abcd-efgh", "status": "ready"}]
def undeploy_app(self, app_instance_id: str) -> None:
# HLO-FE DELETE with app_id (service_id)
aeros_client = ContinuumClient(self.base_url)
_ = aeros_client.undeploy_service(app_instance_id)
def get_edge_cloud_zones(
self, region: Optional[str] = None, status: Optional[str] = None
) -> List[Dict]:
aeros_client = ContinuumClient(self.base_url)
ngsild_params = "type=Domain&format=simplified"
aeros_domains = aeros_client.query_entities(ngsild_params)
return [
{
"edgeCloudZoneId": domain["id"],
"status": domain["domainStatus"].split(":")[-1].lower(),
}
for domain in aeros_domains
]
# return [{"edgeCloudZoneId": "zone-1", "status": "active"}]
def get_edge_cloud_zones_details(
self, zone_id: str, flavour_id: Optional[str] = None
) -> Dict:
"""
Get details of a specific edge cloud zone.
:param zone_id: The ID of the edge cloud zone
:param flavour_id: Optional flavour ID to filter the results
:return: Details of the edge cloud zone
"""
# Minimal mocked response based on required fields of 'ZoneRegisteredData' in GSMA OPG E/WBI API
# return {
# "zoneId":
# zone_id,
# "reservedComputeResources": [{
# "cpuArchType": "ISA_X86_64",
# "numCPU": "4",
# "memory": 8192,
# }],
# "computeResourceQuotaLimits": [{
# "cpuArchType": "ISA_X86_64",
# "numCPU": "8",
# "memory": 16384,
# }],
# "flavoursSupported": [{
# "flavourId":
# "medium-x86",
# "cpuArchType":
# "ISA_X86_64",
# "supportedOSTypes": [{
# "architecture": "x86_64",
# "distribution": "UBUNTU",
# "version": "OS_VERSION_UBUNTU_2204_LTS",
# "license": "OS_LICENSE_TYPE_FREE",
# }],
# "numCPU":
# 4,
# "memorySize":
# 8192,
# "storageSize":
# 100,
# }],
# #
# }
aeros_client = ContinuumClient(self.base_url)
ngsild_params = (
f'format=simplified&type=InfrastructureElement&q=domain=="{zone_id}"'
)
self.logger.debug(
"Querying infrastructure elements for zone %s with params: %s",
zone_id,
ngsild_params,
)
# Query the infrastructure elements for the specified zonese
aeros_domain_ies = aeros_client.query_entities(ngsild_params)
# Transform the infrastructure elements into the required format
# and return the details of the edge cloud zone
response = self.transform_infrastructure_elements(
domain_ies=aeros_domain_ies, domain=zone_id
)
self.logger.debug("Transformed response: %s", response)
# Return the transformed response
return response
def transform_infrastructure_elements(
self, domain_ies: List[Dict[str, Any]], domain: str
) -> Dict[str, Any]:
"""
Transform the infrastructure elements into a format suitable for the
edge cloud zone details.
:param domain_ies: List of infrastructure elements
:param domain: The ID of the edge cloud zone
:return: Transformed details of the edge cloud zone
"""
total_cpu = 0
total_ram = 0
total_disk = 0
total_available_ram = 0
total_available_disk = 0
flavours_supported = []
for element in domain_ies:
total_cpu += element.get("cpuCores", 0)
total_ram += element.get("ramCapacity", 0)
total_available_ram += element.get("availableRam", 0)
total_disk += element.get("diskCapacity", 0)
total_available_disk += element.get("availableDisk", 0)
# Create a flavour per machine
flavour = {
"flavourId": f"{element.get('hostname')}-{element.get('containerTechnology')}",
"cpuArchType": f"{element.get('cpuArchitecture')}",
"supportedOSTypes": [
{
"architecture": f"{element.get('cpuArchitecture')}",
"distribution": f"{element.get('operatingSystem')}", # assume
"version": "OS_VERSION_UBUNTU_2204_LTS",
"license": "OS_LICENSE_TYPE_FREE",
}
],
"numCPU": element.get("cpuCores", 0),
"memorySize": element.get("ramCapacity", 0),
"storageSize": element.get("diskCapacity", 0),
}
flavours_supported.append(flavour)
result = {
"zoneId": domain,
"reservedComputeResources": [
{
"cpuArchType": "ISA_X86_64",
"numCPU": str(total_cpu),
"memory": total_ram,
}
],
"computeResourceQuotaLimits": [
{
"cpuArchType": "ISA_X86_64",
"numCPU": str(total_cpu * 2), # Assume quota is 2x total?
"memory": total_ram * 2,
}
],
"flavoursSupported": flavours_supported,
}
return result
##
# This file is part of the Open SDK
#
# Contributors:
# - Vasilis Pitsilis (vpitsilis@dat.demokritos.gr, vpitsilis@iit.demokritos.gr)
# - Andreas Sakellaropoulos (asakellaropoulos@iit.demokritos.gr)
##
"""
aerOS access configuration
Access tokens need to be provided in environment variables.
"""
# import os
# aerOS_API_URL = os.environ.get("aerOS_API_URL")
aerOS_API_URL = "harcoded_api"
if not aerOS_API_URL:
raise ValueError("Environment variable 'aerOS_API_URL' is not set.")
# aerOS_ACCESS_TOKEN = os.environ.get("aerOS_ACCESS_TOKEN")
aerOS_ACCESS_TOKEN = "harcoded_access_token"
if not aerOS_ACCESS_TOKEN:
raise ValueError("Environment variable 'aerOS_ACCESS_TOKEN' is not set.")
# aerOS_HLO_TOKEN = os.environ.get("aerOS_HLO_TOKEN")
aerOS_HLO_TOKEN = "harcoded_hlo_token"
if not aerOS_HLO_TOKEN:
raise ValueError("Environment variable 'aerOS_HLO_TOKEN' is not set.")
DEBUG = False
LOG_FILE = ".log/aeros_client.log"
##
# This file is part of the Open SDK
#
# Contributors:
# - Vasilis Pitsilis (vpitsilis@dat.demokritos.gr, vpitsilis@iit.demokritos.gr)
# - Andreas Sakellaropoulos (asakellaropoulos@iit.demokritos.gr)
##
"""
aerOS REST API Client
This client is used to interact with the aerOS REST API.
"""
import requests
from src.edgecloud.clients.aeros import config
from src.edgecloud.clients.aeros.utils import catch_requests_exceptions
from src.logger import setup_logger
class ContinuumClient:
"""
Client to aerOS ngsi-ld based continuum exposure
"""
def __init__(self, base_url: str = None):
"""
:param base_url: the base url of the aerOS API
"""
if base_url is None:
self.api_url = config.aerOS_API_URL
else:
self.api_url = base_url
self.logger = setup_logger(__name__, is_debug=True, file_name=config.LOG_FILE)
self.m2m_cb_token = config.aerOS_ACCESS_TOKEN
self.hlo_token = config.aerOS_HLO_TOKEN
self.headers = {
"Content-Type": "application/json",
"Accept": "application/json",
"aerOS": "true",
"Authorization": f"Bearer {self.m2m_cb_token}",
}
self.hlo_headers = {
"Content-Type": "application/json",
"Accept": "application/json",
"aerOS": "true",
"Authorization": f"Bearer {self.hlo_token}",
}
self.hlo_onboard_headers = {
"Content-Type": "application/yaml",
"Authorization": f"Bearer {self.hlo_token}",
}
@catch_requests_exceptions
def query_entity(self, entity_id, ngsild_params) -> dict:
"""
Query entity with ngsi-ld params
:input
@param entity_id: the id of the queried entity
@param ngsi-ld: the query params
:output
ngsi-ld object
"""
entity_url = f"{self.api_url}/entities/{entity_id}?{ngsild_params}"
response = requests.get(entity_url, headers=self.headers, timeout=15)
if response is None:
return None
else:
if config.DEBUG:
self.logger.debug("Query entity URL: %s", entity_url)
self.logger.debug(
"Query entity response: %s %s", response.status_code, response.text
)
return response.json()
@catch_requests_exceptions
def query_entities(self, ngsild_params):
"""
Query entities with ngsi-ld params
:input
@param ngsi-ld: the query params
:output
ngsi-ld object
"""
entities_url = f"{self.api_url}/entities?{ngsild_params}"
response = requests.get(entities_url, headers=self.headers, timeout=15)
if response is None:
return None
# else:
# if config.DEBUG:
# self.logger.debug("Query entities URL: %s", entities_url)
# self.logger.debug("Query entities response: %s %s",
# response.status_code, response.text)
return response.json()
@catch_requests_exceptions
def deploy_service(self, service_id: str) -> dict:
"""
Re-allocate (deploy) service on aerOS continuum
:input
@param service_id: the id of the service to be re-allocated
:output
the re-allocated service json object
"""
re_allocate_url = f"{self.api_url}/hlo_fe/services/{service_id}"
response = requests.put(re_allocate_url, headers=self.hlo_headers, timeout=15)
if response is None:
return None
else:
if config.DEBUG:
self.logger.debug("Re-allocate service URL: %s", re_allocate_url)
self.logger.debug(
"Re-allocate service response: %s %s",
response.status_code,
response.text,
)
return response.json()
@catch_requests_exceptions
def undeploy_service(self, service_id: str) -> dict:
"""
Undeploy service
:input
@param service_id: the id of the service to be undeployed
:output
the undeployed service json object
"""
undeploy_url = f"{self.api_url}/hlo_fe/services/{service_id}"
response = requests.delete(undeploy_url, headers=self.hlo_headers, timeout=15)
if response is None:
return None
else:
if config.DEBUG:
self.logger.debug("Re-allocate service URL: %s", undeploy_url)
self.logger.debug(
"Undeploy service response: %s %s",
response.status_code,
response.text,
)
return response.json()
@catch_requests_exceptions
def onboard_service(self, service_id: str, tosca_str: str) -> dict:
"""
Onboard (& deploy) service on aerOS continuum
:input
@param service_id: the id of the service to onboarded (& deployed)
@param tosca_str: the tosca whith all orchestration information
:output
the allocated service json object
"""
onboard_url = f"{self.api_url}/hlo_fe/services/{service_id}"
if config.DEBUG:
self.logger.debug("Onboard service URL: %s", onboard_url)
self.logger.debug(
"Onboard service request body (TOSCA-YAML): %s", tosca_str
)
response = requests.post(
onboard_url, data=tosca_str, headers=self.hlo_onboard_headers, timeout=15
)
if response is None:
return None
else:
if config.DEBUG:
self.logger.debug("Onboard service URL: %s", onboard_url)
self.logger.debug(
"Onboard service response: %s %s",
response.status_code,
response.text,
)
return response.json()
##
# This file is part of the Open SDK
#
# Contributors:
# - Vasilis Pitsilis (vpitsilis@dat.demokritos.gr, vpitsilis@iit.demokritos.gr)
# - Andreas Sakellaropoulos (asakellaropoulos@iit.demokritos.gr)
##
"""
Docstring
"""
from requests.exceptions import HTTPError, RequestException, Timeout
import src.edgecloud.clients.aeros.config as config
from src.logger import setup_logger
def catch_requests_exceptions(func):
"""
Docstring
"""
logger = setup_logger(__name__, is_debug=True, file_name=config.LOG_FILE)
def wrapper(*args, **kwargs):
try:
result = func(*args, **kwargs)
return result
except HTTPError as e:
logger.info("4xx or 5xx: %s \n", {e})
return None # raise our custom exception or log, etc.
except ConnectionError as e:
logger.info(
"Raised for connection-related issues (e.g., DNS resolution failure, network issues): %s \n",
{e},
)
return None # raise our custom exception or log, etc.
except Timeout as e:
logger.info("Timeout occured: %s \n", {e})
return None # raise our custom exception or log, etc.
except RequestException as e:
logger.info("Request failed: %s \n", {e})
return None # raise our custom exception or log, etc.
return wrapper
# -*- coding: utf-8 -*-
class EdgeCloudPlatformError(Exception):
pass
# -*- coding: utf-8 -*-
##
# Copyright 2025-present by Software Networks Area, i2CAT.
# All rights reserved.
#
# This file is part of the Open SDK
#
# Contributors:
# - Adrián Pino Martínez (adrian.pino@i2cat.net)
# - Sergio Giménez (sergio.gimenez@i2cat.net)
##
from typing import Dict, List, Optional
from src import logger
from src.edgecloud.core.edgecloud_interface import EdgeCloudManagementInterface
from . import schemas
from .common import (
I2EdgeError,
i2edge_delete,
i2edge_get,
i2edge_post,
i2edge_post_multiform_data,
)
log = logger.get_logger(__name__)
class EdgeApplicationManager(EdgeCloudManagementInterface):
def __init__(self, base_url: str):
self.base_url = base_url
def get_edge_cloud_zones(
self, region: Optional[str] = None, status: Optional[str] = None
) -> List[dict]:
url = "{}/zones/list".format(self.base_url)
params = {}
try:
response = i2edge_get(url, params=params)
log.info("Availability zones retrieved successfully")
return response
except I2EdgeError as e:
raise e
def get_edge_cloud_zones_details(
self, zone_id: str, flavour_id: Optional[str] = None
) -> Dict:
url = "{}zone/{}".format(self.base_url, zone_id)
params = {}
try:
response = i2edge_get(url, params=params)
log.info("Availability zone details retrieved successfully")
return response
except I2EdgeError as e:
raise e
def _create_artefact(
self,
artefact_id: str,
artefact_name: str,
repo_name: str,
repo_type: str,
repo_url: str,
password: Optional[str] = None,
token: Optional[str] = None,
user_name: Optional[str] = None,
):
repo_type = schemas.RepoType(repo_type)
url = "{}/artefact".format(self.base_url)
payload = schemas.ArtefactOnboarding(
artefact_id=artefact_id,
name=artefact_name,
repo_password=password,
repo_name=repo_name,
repo_type=repo_type,
repo_url=repo_url,
repo_token=token,
repo_user_name=user_name,
)
try:
i2edge_post_multiform_data(url, payload)
log.info("Artifact added successfully")
except I2EdgeError as e:
raise e
def _get_artefact(self, artefact_id: str) -> Dict:
url = "{}/artefact/{}".format(self.base_url, artefact_id)
try:
response = i2edge_get(url, artefact_id)
log.info("Artifact retrieved successfully")
return response
except I2EdgeError as e:
raise e
def _get_all_artefacts(self) -> List[Dict]:
url = "{}/artefact".format(self.base_url)
try:
response = i2edge_get(url, {})
log.info("Artifacts retrieved successfully")
return response
except I2EdgeError as e:
raise e
def _delete_artefact(self, artefact_id: str):
url = "{}/artefact".format(self.base_url)
try:
i2edge_delete(url, artefact_id)
log.info("Artifact deleted successfully")
except I2EdgeError as e:
raise e
def onboard_app(self, app_manifest: Dict) -> Dict:
try:
app_id = app_manifest["appId"]
artefact_id = app_id
app_component_spec = schemas.AppComponentSpec(artefactId=artefact_id)
data = schemas.ApplicationOnboardingData(
app_id=app_id, appComponentSpecs=[app_component_spec]
)
payload = schemas.ApplicationOnboardingRequest(profile_data=data)
url = "{}/application/onboarding".format(self.base_url)
i2edge_post(url, payload)
except I2EdgeError as e:
raise e
except KeyError as e:
raise I2EdgeError("Missing required field in app_manifest: {}".format(e))
def delete_onboarded_app(self, app_id: str) -> None:
url = "{}/application/onboarding".format(self.base_url)
try:
i2edge_delete(url, app_id)
except I2EdgeError as e:
raise e
def get_onboarded_app(self, app_id: str) -> Dict:
url = "{}/application/onboarding/{}".format(self.base_url, app_id)
try:
response = i2edge_get(url, app_id)
return response
except I2EdgeError as e:
raise e
def get_all_onboarded_apps(self) -> List[Dict]:
url = "{}/applications/onboarding".format(self.base_url)
params = {}
try:
response = i2edge_get(url, params)
return response
except I2EdgeError as e:
raise e
def _select_best_flavour_for_app(self, zone_id) -> str:
"""
Selects the best flavour for the specified app requirements in a given zone.
"""
# list_of_flavours = self.get_edge_cloud_zones_details(zone_id)
# <logic that select the best flavour>
# TODO - Harcoded
flavourId = "67f3a0b0e3184a85952e174d"
return flavourId
def deploy_app(self, app_id: str, app_zones: List[Dict]) -> Dict:
appId = app_id
app = self.get_onboarded_app(appId)
profile_data = app["profile_data"]
appProviderId = profile_data["appProviderId"]
appVersion = profile_data["appMetaData"]["version"]
# TODO: Iterate in the list; deploy the app in all zones
zone_info = app_zones[0]["EdgeCloudZone"]
zone_id = zone_info["edgeCloudZoneId"]
flavourId = self._select_best_flavour_for_app(zone_id=zone_id)
app_deploy_data = schemas.AppDeployData(
appId=appId,
appProviderId=appProviderId,
appVersion=appVersion,
zoneInfo=schemas.ZoneInfo(flavourId=flavourId, zoneId=zone_id),
)
url = "{}/app/".format(self.base_url)
payload = schemas.AppDeploy(app_deploy_data=app_deploy_data)
try:
response = i2edge_post(url, payload)
log.info("App deployed successfully")
print(response)
return response
except I2EdgeError as e:
raise e
def get_all_deployed_apps(self) -> List[Dict]:
url = "{}/app/".format(self.base_url)
params = {}
try:
response = i2edge_get(url, params=params)
log.info("All app instances retrieved successfully")
return response
except I2EdgeError as e:
raise e
def get_deployed_app(self, app_id, zone_id) -> List[Dict]:
# Logic: Get all onboarded apps and filter the one where release_name == artifact name
# Step 1) Extract "app_name" from the onboarded app using the "app_id"
onboarded_app = self.get_onboarded_app(app_id)
if not onboarded_app:
raise ValueError(f"No onboarded app found with ID: {app_id}")
try:
app_name = onboarded_app["profile_data"]["appMetaData"]["appName"]
except KeyError as e:
raise ValueError(f"Onboarded app missing required field: {e}")
# Step 2) Retrieve all deployed apps and filter the one(s) where release_name == app_name
deployed_apps = self.get_all_deployed_apps()
if not deployed_apps:
return []
# Filter apps where release_name matches our app_name and zone matches
for app_instance_name in deployed_apps:
if (
app_instance_name.get("release_name") == app_name
and app_instance_name.get("zone_id") == zone_id
):
return app_instance_name
return None
url = "{}/app/{}/{}".format(self.base_url, zone_id, app_instance_name)
params = {}
try:
response = i2edge_get(url, params=params)
log.info("App instance retrieved successfully")
return response
except I2EdgeError as e:
raise e
def undeploy_app(self, app_instance_id: str) -> None:
url = "{}/app".format(self.base_url)
try:
i2edge_delete(url, app_instance_id)
log.info("App instance deleted successfully")
except I2EdgeError as e:
raise e
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
##
# Copyright 2025-present by Software Networks Area, i2CAT.
# All rights reserved.
#
# This file is part of the Open SDK
#
# Contributors:
# - Sergio Giménez (sergio.gimenez@i2cat.net)
##
import json
from typing import Optional
import requests
from pydantic import BaseModel
from src import logger
from src.edgecloud.clients.errors import EdgeCloudPlatformError
log = logger.get_logger(__name__)
class I2EdgeError(EdgeCloudPlatformError):
pass
class I2EdgeErrorResponse(BaseModel):
message: str
detail: dict
def get_error_message_from(response: requests.Response) -> str:
try:
error_response = I2EdgeErrorResponse(**response.json())
return error_response.message
except Exception as e:
log.error("Failed to parse error response from i2edge: {}".format(e))
return response.text
def i2edge_post(url: str, model_payload: BaseModel) -> dict:
headers = {
"Content-Type": "application/json",
"accept": "application/json",
}
json_payload = json.dumps(model_payload.model_dump(mode="json"))
try:
response = requests.post(url, data=json_payload, headers=headers)
response.raise_for_status()
return response.json()
except requests.exceptions.HTTPError as e:
i2edge_err_msg = get_error_message_from(response)
err_msg = "Failed to deploy app: {}. Detail: {}".format(i2edge_err_msg, e)
log.error(err_msg)
raise I2EdgeError(err_msg)
def i2edge_post_multiform_data(url: str, model_payload: BaseModel) -> dict:
headers = {
"accept": "application/json",
}
payload_dict = model_payload.model_dump(mode="json")
payload_in_str = {k: str(v) for k, v in payload_dict.items()}
try:
response = requests.post(url, data=payload_in_str, headers=headers)
response.raise_for_status()
return response.json()
except requests.exceptions.HTTPError as e:
i2edge_err_msg = get_error_message_from(response)
err_msg = "Failed to deploy app: {}. Detail: {}".format(i2edge_err_msg, e)
log.error(err_msg)
raise I2EdgeError(err_msg)
def i2edge_delete(url: str, id: str) -> dict:
headers = {"accept": "application/json"}
try:
query = "{}/{}".format(url, id)
response = requests.delete(query, headers=headers)
response.raise_for_status()
return response.json()
except requests.exceptions.HTTPError as e:
i2edge_err_msg = get_error_message_from(response)
err_msg = "Failed to undeploy app: {}. Detail: {}".format(i2edge_err_msg, e)
log.error(err_msg)
raise I2EdgeError(err_msg)
def i2edge_get(url: str, params: Optional[dict]):
headers = {"accept": "application/json"}
try:
response = requests.get(url, params=params, headers=headers)
response.raise_for_status()
return response.json()
except requests.exceptions.HTTPError as e:
i2edge_err_msg = get_error_message_from(response)
err_msg = "Failed to get apps: {}. Detail: {}".format(i2edge_err_msg, e)
log.error(err_msg)
raise I2EdgeError(err_msg)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment