Loading service-resource-manager-implementation/Dockerfile +3 −0 Original line number Diff line number Diff line Loading @@ -29,6 +29,9 @@ ENV PYTHONUNBUFFERED=1 #RUN pip3 install --no-cache --upgrade pip setuptools RUN python3 -m venv .venv RUN source .venv/bin/activate RUN pip3 install --upgrade pip RUN pip3 install wheel Loading service-resource-manager-implementation/requirements.txt +3 −0 Original line number Diff line number Diff line Loading @@ -34,3 +34,6 @@ psycopg2-binary #pandas==0.24.2 paramiko>=2.12.0 urllib3 colorlog==6.8.2 pydantic==2.10.6 pydantic-extra-types==2.10.3 No newline at end of file service-resource-manager-implementation/src/adapters/__init__.py 0 → 100644 +2 −0 Original line number Diff line number Diff line # -*- coding: utf-8 -*- from .common.sdk import Sdk service-resource-manager-implementation/src/adapters/common/__init__.py 0 → 100644 +0 −0 Empty file added. service-resource-manager-implementation/src/adapters/common/adapters_factory.py 0 → 100644 +95 −0 Original line number Diff line number Diff line # -*- coding: utf-8 -*- ## # Copyright 2025-present by Software Networks Area, i2CAT. # All rights reserved. # # This file is part of the Open SDK # # Contributors: # - Adrián Pino Martínez (adrian.pino@i2cat.net) ## from src.adapters.edgecloud.adapters.aeros.client import ( EdgeApplicationManager as AerosClient, ) from src.adapters.edgecloud.adapters.i2edge.client import ( EdgeApplicationManager as I2EdgeClient, ) from src.adapters.edgecloud.adapters.kubernetes.client import ( EdgeApplicationManager as kubernetesClient, ) # from src.adapters.network.adapters.oai.client import ( # NetworkManager as OaiCoreClient, # ) # from src.adapters.network.adapters.open5gcore.client import ( # NetworkManager as Open5GCoreClient, # ) # from src.adapters.network.adapters.open5gs.client import ( # NetworkManager as Open5GSClient, # ) def _edgecloud_adapters_factory(client_name: str, base_url: str, **kwargs): if client_name == "i2edge": if "flavour_id" not in kwargs: raise ValueError("Missing required 'flavour_id' for i2edge client.") edge_cloud_factory = { "aeros": lambda url, **kw: AerosClient(base_url=url, **kw), "i2edge": lambda url, **kw: I2EdgeClient(base_url=url, **kw), "kubernetes": lambda url, **kw: kubernetesClient(base_url=url, **kw), } try: return edge_cloud_factory[client_name](base_url, **kwargs) except KeyError: raise ValueError( f"Invalid edgecloud client '{client_name}'. Available: {list(edge_cloud_factory)}" ) # def _network_adapters_factory(client_name: str, base_url: str, **kwargs): # if "scs_as_id" not in kwargs: # raise ValueError("Missing required 'scs_as_id' for network adapters.") # scs_as_id = kwargs.pop("scs_as_id") # network_factory = { # "open5gs": lambda url, scs_id, **kw: Open5GSClient( # base_url=url, scs_as_id=scs_id, **kw # ), # "oai": lambda url, scs_id, **kw: OaiCoreClient( # base_url=url, scs_as_id=scs_id, **kw # ), # "open5gcore": lambda url, scs_id, **kw: Open5GCoreClient( # base_url=url, scs_as_id=scs_id, **kw # ), # } # try: # return network_factory[client_name](base_url, scs_as_id, **kwargs) # except KeyError: # raise ValueError( # f"Invalid network client '{client_name}'. Available: {list(network_factory)}" # ) # def _oran_adapters_factory(client_name: str, base_url: str): # # TODO class AdaptersFactory: _domain_factories = { "edgecloud": _edgecloud_adapters_factory, # "network": _network_adapters_factory, # "oran": _oran_adapters_factory, } @classmethod def instantiate_and_retrieve_adapters( cls, domain: str, client_name: str, base_url: str, **kwargs ): try: catalog = cls._domain_factories[domain] except KeyError: raise ValueError( f"Unsupported domain '{domain}'. Supported: {list(cls._domain_factories)}" ) return catalog(client_name, base_url, **kwargs) No newline at end of file Loading
service-resource-manager-implementation/Dockerfile +3 −0 Original line number Diff line number Diff line Loading @@ -29,6 +29,9 @@ ENV PYTHONUNBUFFERED=1 #RUN pip3 install --no-cache --upgrade pip setuptools RUN python3 -m venv .venv RUN source .venv/bin/activate RUN pip3 install --upgrade pip RUN pip3 install wheel Loading
service-resource-manager-implementation/requirements.txt +3 −0 Original line number Diff line number Diff line Loading @@ -34,3 +34,6 @@ psycopg2-binary #pandas==0.24.2 paramiko>=2.12.0 urllib3 colorlog==6.8.2 pydantic==2.10.6 pydantic-extra-types==2.10.3 No newline at end of file
service-resource-manager-implementation/src/adapters/__init__.py 0 → 100644 +2 −0 Original line number Diff line number Diff line # -*- coding: utf-8 -*- from .common.sdk import Sdk
service-resource-manager-implementation/src/adapters/common/__init__.py 0 → 100644 +0 −0 Empty file added.
service-resource-manager-implementation/src/adapters/common/adapters_factory.py 0 → 100644 +95 −0 Original line number Diff line number Diff line # -*- coding: utf-8 -*- ## # Copyright 2025-present by Software Networks Area, i2CAT. # All rights reserved. # # This file is part of the Open SDK # # Contributors: # - Adrián Pino Martínez (adrian.pino@i2cat.net) ## from src.adapters.edgecloud.adapters.aeros.client import ( EdgeApplicationManager as AerosClient, ) from src.adapters.edgecloud.adapters.i2edge.client import ( EdgeApplicationManager as I2EdgeClient, ) from src.adapters.edgecloud.adapters.kubernetes.client import ( EdgeApplicationManager as kubernetesClient, ) # from src.adapters.network.adapters.oai.client import ( # NetworkManager as OaiCoreClient, # ) # from src.adapters.network.adapters.open5gcore.client import ( # NetworkManager as Open5GCoreClient, # ) # from src.adapters.network.adapters.open5gs.client import ( # NetworkManager as Open5GSClient, # ) def _edgecloud_adapters_factory(client_name: str, base_url: str, **kwargs): if client_name == "i2edge": if "flavour_id" not in kwargs: raise ValueError("Missing required 'flavour_id' for i2edge client.") edge_cloud_factory = { "aeros": lambda url, **kw: AerosClient(base_url=url, **kw), "i2edge": lambda url, **kw: I2EdgeClient(base_url=url, **kw), "kubernetes": lambda url, **kw: kubernetesClient(base_url=url, **kw), } try: return edge_cloud_factory[client_name](base_url, **kwargs) except KeyError: raise ValueError( f"Invalid edgecloud client '{client_name}'. Available: {list(edge_cloud_factory)}" ) # def _network_adapters_factory(client_name: str, base_url: str, **kwargs): # if "scs_as_id" not in kwargs: # raise ValueError("Missing required 'scs_as_id' for network adapters.") # scs_as_id = kwargs.pop("scs_as_id") # network_factory = { # "open5gs": lambda url, scs_id, **kw: Open5GSClient( # base_url=url, scs_as_id=scs_id, **kw # ), # "oai": lambda url, scs_id, **kw: OaiCoreClient( # base_url=url, scs_as_id=scs_id, **kw # ), # "open5gcore": lambda url, scs_id, **kw: Open5GCoreClient( # base_url=url, scs_as_id=scs_id, **kw # ), # } # try: # return network_factory[client_name](base_url, scs_as_id, **kwargs) # except KeyError: # raise ValueError( # f"Invalid network client '{client_name}'. Available: {list(network_factory)}" # ) # def _oran_adapters_factory(client_name: str, base_url: str): # # TODO class AdaptersFactory: _domain_factories = { "edgecloud": _edgecloud_adapters_factory, # "network": _network_adapters_factory, # "oran": _oran_adapters_factory, } @classmethod def instantiate_and_retrieve_adapters( cls, domain: str, client_name: str, base_url: str, **kwargs ): try: catalog = cls._domain_factories[domain] except KeyError: raise ValueError( f"Unsupported domain '{domain}'. Supported: {list(cls._domain_factories)}" ) return catalog(client_name, base_url, **kwargs) No newline at end of file