Loading src/common/sdk_catalog.py +5 −6 Original line number Diff line number Diff line Loading @@ -7,15 +7,14 @@ from src.network.clients.open5gs.client import NetworkManager as Open5GSClient # from src.edgecloud.clients.piedge.client import EdgeApplicationManager as PiEdgeClient def _edgecloud_catalog(client_name: str, base_url: str): def _edgecloud_catalog(client_name: str, base_url: str, **kwargs): edge_cloud_factory = { "aeros": lambda url: AerosClient(base_url=url), "aeros": lambda url, **kw: AerosClient(base_url=url, **kw), "i2edge": lambda url: I2EdgeClient(base_url=url), # TODO: uncomment when missing PiEdge's imports are added # "piedge": lambda url: PiEdgeClient(base_url=url), # "piedge": lambda url: PiEdgeClient(base_url=url), Uncomment when import issues are solved } try: return edge_cloud_factory[client_name](base_url) return edge_cloud_factory[client_name](base_url, **kwargs) except KeyError: raise ValueError( f"Invalid edgecloud client '{client_name}'. Available: {list(edge_cloud_factory)}" Loading Loading @@ -57,7 +56,7 @@ class SdkClientCatalog: raise ValueError( f"Unsupported domain '{domain}'. Supported: {list(cls._domain_factories)}" ) return catalog(client_name, base_url, **kwargs) if domain == "network": if "scs_as_id" not in kwargs: raise ValueError("Missing required 'scs_as_id' for network clients.") Loading src/edgecloud/clients/aeros/client.py +16 −1 Original line number Diff line number Diff line Loading @@ -19,10 +19,25 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): FIXME: Handle None responses from continuum client """ def __init__(self, base_url: str): def __init__(self, base_url: str, **kwargs): self.base_url = base_url self.logger = setup_logger(__name__, is_debug=True, file_name=config.LOG_FILE) # Overwrite config values if provided via kwargs if "aerOS_API_URL" in kwargs: config.aerOS_API_URL = kwargs["aerOS_API_URL"] if "aerOS_ACCESS_TOKEN" in kwargs: config.aerOS_ACCESS_TOKEN = kwargs["aerOS_ACCESS_TOKEN"] if "aerOS_HLO_TOKEN" in kwargs: config.aerOS_HLO_TOKEN = kwargs["aerOS_HLO_TOKEN"] if not config.aerOS_API_URL: raise ValueError("Missing 'aerOS_API_URL'") if not config.aerOS_ACCESS_TOKEN: raise ValueError("Missing 'aerOS_ACCESS_TOKEN'") if not config.aerOS_HLO_TOKEN: raise ValueError("Missing 'aerOS_HLO_TOKEN'") def onboard_app(self, app_manifest: Dict) -> Dict: # HLO-FE POST with TOSCA and app_id (service_id) service_id = app_manifest.get("serviceId") Loading src/edgecloud/clients/aeros/config.py +8 −5 Original line number Diff line number Diff line Loading @@ -9,16 +9,19 @@ aerOS access configuration Access tokens need to be provided in environment variables. """ import os # import os aerOS_API_URL = os.environ.get("aerOS_API_URL") # aerOS_API_URL = os.environ.get("aerOS_API_URL") aerOS_API_URL = "harcoded_api" if not aerOS_API_URL: raise ValueError("Environment variable 'aerOS_API_URL' is not set.") aerOS_ACCESS_TOKEN = os.environ.get("aerOS_ACCESS_TOKEN") # aerOS_ACCESS_TOKEN = os.environ.get("aerOS_ACCESS_TOKEN") aerOS_ACCESS_TOKEN = "harcoded_access_token" if not aerOS_ACCESS_TOKEN: raise ValueError("Environment variable 'aerOS_ACCESS_TOKEN' is not set.") aerOS_HLO_TOKEN = os.environ.get("aerOS_HLO_TOKEN") # aerOS_HLO_TOKEN = os.environ.get("aerOS_HLO_TOKEN") aerOS_HLO_TOKEN = "harcoded_hlo_token" if not aerOS_HLO_TOKEN: raise ValueError("Environment variable 'aerOS_HLO_TOKEN' is not set.") DEBUG = True DEBUG = False LOG_FILE = ".log/aeros_client.log" Loading
src/common/sdk_catalog.py +5 −6 Original line number Diff line number Diff line Loading @@ -7,15 +7,14 @@ from src.network.clients.open5gs.client import NetworkManager as Open5GSClient # from src.edgecloud.clients.piedge.client import EdgeApplicationManager as PiEdgeClient def _edgecloud_catalog(client_name: str, base_url: str): def _edgecloud_catalog(client_name: str, base_url: str, **kwargs): edge_cloud_factory = { "aeros": lambda url: AerosClient(base_url=url), "aeros": lambda url, **kw: AerosClient(base_url=url, **kw), "i2edge": lambda url: I2EdgeClient(base_url=url), # TODO: uncomment when missing PiEdge's imports are added # "piedge": lambda url: PiEdgeClient(base_url=url), # "piedge": lambda url: PiEdgeClient(base_url=url), Uncomment when import issues are solved } try: return edge_cloud_factory[client_name](base_url) return edge_cloud_factory[client_name](base_url, **kwargs) except KeyError: raise ValueError( f"Invalid edgecloud client '{client_name}'. Available: {list(edge_cloud_factory)}" Loading Loading @@ -57,7 +56,7 @@ class SdkClientCatalog: raise ValueError( f"Unsupported domain '{domain}'. Supported: {list(cls._domain_factories)}" ) return catalog(client_name, base_url, **kwargs) if domain == "network": if "scs_as_id" not in kwargs: raise ValueError("Missing required 'scs_as_id' for network clients.") Loading
src/edgecloud/clients/aeros/client.py +16 −1 Original line number Diff line number Diff line Loading @@ -19,10 +19,25 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): FIXME: Handle None responses from continuum client """ def __init__(self, base_url: str): def __init__(self, base_url: str, **kwargs): self.base_url = base_url self.logger = setup_logger(__name__, is_debug=True, file_name=config.LOG_FILE) # Overwrite config values if provided via kwargs if "aerOS_API_URL" in kwargs: config.aerOS_API_URL = kwargs["aerOS_API_URL"] if "aerOS_ACCESS_TOKEN" in kwargs: config.aerOS_ACCESS_TOKEN = kwargs["aerOS_ACCESS_TOKEN"] if "aerOS_HLO_TOKEN" in kwargs: config.aerOS_HLO_TOKEN = kwargs["aerOS_HLO_TOKEN"] if not config.aerOS_API_URL: raise ValueError("Missing 'aerOS_API_URL'") if not config.aerOS_ACCESS_TOKEN: raise ValueError("Missing 'aerOS_ACCESS_TOKEN'") if not config.aerOS_HLO_TOKEN: raise ValueError("Missing 'aerOS_HLO_TOKEN'") def onboard_app(self, app_manifest: Dict) -> Dict: # HLO-FE POST with TOSCA and app_id (service_id) service_id = app_manifest.get("serviceId") Loading
src/edgecloud/clients/aeros/config.py +8 −5 Original line number Diff line number Diff line Loading @@ -9,16 +9,19 @@ aerOS access configuration Access tokens need to be provided in environment variables. """ import os # import os aerOS_API_URL = os.environ.get("aerOS_API_URL") # aerOS_API_URL = os.environ.get("aerOS_API_URL") aerOS_API_URL = "harcoded_api" if not aerOS_API_URL: raise ValueError("Environment variable 'aerOS_API_URL' is not set.") aerOS_ACCESS_TOKEN = os.environ.get("aerOS_ACCESS_TOKEN") # aerOS_ACCESS_TOKEN = os.environ.get("aerOS_ACCESS_TOKEN") aerOS_ACCESS_TOKEN = "harcoded_access_token" if not aerOS_ACCESS_TOKEN: raise ValueError("Environment variable 'aerOS_ACCESS_TOKEN' is not set.") aerOS_HLO_TOKEN = os.environ.get("aerOS_HLO_TOKEN") # aerOS_HLO_TOKEN = os.environ.get("aerOS_HLO_TOKEN") aerOS_HLO_TOKEN = "harcoded_hlo_token" if not aerOS_HLO_TOKEN: raise ValueError("Environment variable 'aerOS_HLO_TOKEN' is not set.") DEBUG = True DEBUG = False LOG_FILE = ".log/aeros_client.log"