Loading src/sunrise6g_opensdk/edgecloud/adapters/kubernetes/client.py +251 −70 Original line number Diff line number Diff line Loading @@ -23,9 +23,11 @@ from sunrise6g_opensdk.edgecloud.adapters.kubernetes.lib.utils.connector_db impo from sunrise6g_opensdk.edgecloud.adapters.kubernetes.lib.utils.kubernetes_connector import ( KubernetesConnector, ) from sunrise6g_opensdk.edgecloud.core import schemas as camara_schemas from sunrise6g_opensdk.edgecloud.core.edgecloud_interface import ( EdgeCloudManagementInterface, ) from sunrise6g_opensdk.edgecloud.core.utils import build_custom_http_response class EdgeApplicationManager(EdgeCloudManagementInterface): Loading @@ -49,15 +51,19 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): if storage_uri is not None: self.connector_db = ConnectorDB(storage_uri) def onboard_app(self, app_manifest: AppManifest) -> Dict: def onboard_app(self, app_manifest: AppManifest) -> Response: print(f"Submitting application: {app_manifest}") logging.info("Extracting variables from payload...") app_id = app_manifest.get("appId") app_name = app_manifest.get("name") image = app_manifest.get("appRepo").get("imagePath") package_type = app_manifest.get("packageType") network_interfaces = app_manifest.get("componentSpec")[0].get("networkInterfaces") ports = [] req_resources = app_manifest.get("requiredResources") version = app_manifest.get("version") app_provider = app_manifest.get("appProvider") for ni in network_interfaces: ports.append(ni.get("port")) insert_doc = ServiceFunctionRegistrationRequest( Loading @@ -66,54 +72,114 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): service_function_name=app_name, service_function_type=package_type, application_ports=ports, required_resources=req_resources, app_provider=app_provider, version=version, ) result = self.connector_db.insert_document_service_function(insert_doc.to_dict()) if type(result) is str: return result return {"appId": str(result.inserted_id)} status_code = 409 submitted_app = {"message": "App already exists"} else: submitted_app = camara_schemas.SubmittedApp( appId=camara_schemas.AppId(result.inserted_id) ).model_dump(mode="json") status_code = 201 return build_custom_http_response( status_code=status_code, content=submitted_app, headers={"Content-Type": "application/json"}, encoding="utf-8", url=None, request=None, ) def get_all_onboarded_apps(self) -> List[Dict]: def get_all_onboarded_apps(self) -> Response: logging.info("Retrieving all registered apps from database...") status_code = None content = [] try: db_list = self.connector_db.get_documents_from_collection( collection_input="service_functions" ) app_list = [] for sf in db_list: app_list.append(self.__transform_to_camara(sf)) return app_list # return [{"appId": "1234-5678", "name": "TestApp"}] content.append(self.__transform_to_camara(sf)) status_code = 200 return build_custom_http_response( status_code=status_code, content=content, headers={"Content-Type": "application/json"}, encoding="utf-8", url=None, request=None, ) except Exception as e: logging.error(e.args) status_code = 500 return { "status": 500, "code": "INTERNAL", "message": "Internal server error: " + e.args, } def get_onboarded_app(self, app_id: str) -> Dict: def get_onboarded_app(self, app_id: str) -> Response: logging.info("Searching for registered app with ID: " + app_id + " in database...") status_code = None content = None app = self.connector_db.get_documents_from_collection( "service_functions", input_type="_id", input_value=app_id ) if len(app) > 0: return self.__transform_to_camara(app[0]) status_code = 200 content = {"appManifest": self.__transform_to_camara(app[0])} else: return [] status_code = 404 content = {"status": 404, "code": "NOT_FOUND", "message": "Resource does not exist"} return build_custom_http_response( status_code=status_code, content=content, headers={"Content-Type": "application/json"}, encoding="utf-8", url=None, request=None, ) def delete_onboarded_app(self, app_id: str) -> None: def delete_onboarded_app(self, app_id: str) -> Response: result, code = self.connector_db.delete_document_service_function(_id=app_id) print(f"Removing application metadata: {app_id}") return code def deploy_app(self, body: dict) -> Dict: logging.info( "Searching for registered app with ID: " + body.get("appId") + " in database..." if code == 200: status_code = 204 content = None else: status_code = 404 content = {"status": 404, "code": "NOT_FOUND", "message": "Resource does not exist"} return build_custom_http_response( status_code=status_code, content=content, headers={"Content-Type": "application/json"}, encoding="utf-8", url=None, request=None, ) def deploy_app(self, app_id: str, app_zones: List[Dict]) -> Response: logging.info("Searching for registered app with ID: " + app_id + " in database...") status_code = None app = self.connector_db.get_documents_from_collection( "service_functions", input_type="_id", input_value=body.get("appId") "service_functions", input_type="_id", input_value=app_id ) # success_response = [] result = None response = None if len(app) < 1: return "Application with ID: " + body.get("appId") + " not found", 404 return "Application with ID: " + app_id + " not found", 404 if app is not None: sf = DeployServiceFunction( service_function_name=app[0].get("name"), service_function_instance_name=body.get("name"), service_function_instance_name=app[0].get("name"), # service_function_instance_name=body.get("name"), # location=body.get('edgeCloudZoneId'), ) result = deploy_service_function( Loading @@ -123,26 +189,47 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): ) if type(result) is V1Deployment: status_code = 202 response = {} response["name"] = body.get("name") response["name"] = result.metadata.name response["appId"] = app[0].get("_id") response["appInstanceId"] = result.metadata.uid response["appProvider"] = app[0].get("appProvider") response["appProvider"] = app[0].get("app_provider") response["status"] = "unknown" response["componentEndpointInfo"] = {} response["kubernetesClusterRef"] = "" response["edgeCloudZoneId"] = body.get("edgeCloudZoneId") else: response = {"Error": result} return response # interfaces = [] # for port in deployment.get("ports"): # access_point = {"port": port} # interfaces.append({"interfaceId": "", "accessPoints": access_point}) # response["componentEndpointInfo"] = interfaces response["kubernetesClusterRef"] = "642f6105-7015-4af1-a4d1-e1ecb8437abc" response["edgeCloudZoneId"] = app_zones[0].get("EdgeCloudZone").get("edgeCloudZoneId") elif "Conflict" in result: status_code = 409 response = { "status": 409, "code": "CONFLICT", "message": "Application already instantiated in the given Edge Cloud Zone", } return build_custom_http_response( status_code=status_code, content=response, headers={"Content-Type": "application/json"}, encoding="utf-8", url=None, request=None, ) def get_all_deployed_apps( self, app_id: Optional[str] = None, app_instance_id: Optional[str] = None, region: Optional[str] = None, ) -> List[Dict]: ) -> Response: logging.info("Retrieving all deployed apps in the edge cloud platform") status_code = None content = None try: deployments = self.k8s_connector.get_deployed_service_functions(self.connector_db) response = [] for deployment in deployments: Loading @@ -151,17 +238,34 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): item["appId"] = deployment.get("appId") item["appProvider"] = deployment.get("appProvider") item["appInstanceId"] = deployment.get("appInstanceId") item["status"] = deployment.get("status") item["status"] = deployment.get("status", "unknown") interfaces = [] for port in deployment.get("ports"): access_point = {"port": port} interfaces.append({"interfaceId": "", "accessPoints": access_point}) item["componentEndpointInfo"] = interfaces item["kubernetesClusterRef"] = "" item["edgeCloudZoneId"] = {} # item["componentEndpointInfo"] = interfaces item["kubernetesClusterRef"] = "642f6105-7015-4af1-a4d1-e1ecb8437abc" item["edgeCloudZoneId"] = deployment.get( "edgeCloudZoneId", "6824d63a-4d5b-4624-a487-dbdf118b0fdb" ) response.append(item) return response # return [{"appInstanceId": "abcd-efgh", "status": "ready"}] content = {"appInstances": response} status_code = 200 return build_custom_http_response( status_code=status_code, content=content, headers={"Content-Type": "application/json"}, encoding="utf-8", url=None, request=None, ) except Exception as e: logging.error(e.args) return { "status": 500, "code": "INTERNAL", "message": "Internal server error: " + e.args, } def get_deployed_app( self, app_instance_id: str, app_id: Optional[str] = None, region: Optional[str] = None Loading @@ -175,26 +279,78 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): :param region: Optional filter by Edge Cloud region :return: Response with application instance details """ # TODO: Implement actual kubernetes-specific logic for retrieving a specific deployed app raise NotImplementedError("get_deployed_app is not yet implemented for kubernetes adapter") logging.info("Retrieving for deployed app with ID: " + app_instance_id) deployments = self.k8s_connector.get_deployed_service_functions(self.connector_db) deployed_app = None response = {} for deployment in deployments: if deployment.get("appInstanceId") == app_instance_id: deployed_app = deployment break if deployed_app is not None: status_code = 200 response["name"] = deployed_app.get("service_function_catalogue_name") response["appId"] = deployed_app.get("appId") response["appProvider"] = deployed_app.get("appProvider") response["appInstanceId"] = deployed_app.get("appInstanceId") response["status"] = deployed_app.get("status", "unknown") interfaces = [] for port in deployed_app.get("ports"): access_point = {"port": port} interfaces.append({"interfaceId": "", "accessPoints": access_point}) # response["componentEndpointInfo"] = interfaces response["kubernetesClusterRef"] = "642f6105-7015-4af1-a4d1-e1ecb8437abc" response["edgeCloudZoneId"] = deployed_app.get( "edgeCloudZoneId", "6824d63a-4d5b-4624-a487-dbdf118b0fdb" ) response = {"appInstance": response} else: status_code = 404 response = { "status": 404, "code": "NOT_FOUND", "message": "App instance does not exist", } return build_custom_http_response( status_code=status_code, content=response, headers={"Content-Type": "application/json"}, encoding="utf-8", url=None, request=None, ) def undeploy_app(self, app_instance_id: str) -> None: logging.info("Searching for deployed app with ID: " + app_instance_id + " in database...") print(f"Deleting app instance: {app_instance_id}") status_code = 404 try: sfs = self.k8s_connector.get_deployed_service_functions(self.connector_db) response = "App instance with ID [" + app_instance_id + "] not found" # response = "App instance with ID [" + app_instance_id + "] not found" for service_fun in sfs: if service_fun["appInstanceId"] == app_instance_id: print(service_fun["service_function_instance_name"]) self.k8s_connector.delete_service_function( self.connector_db, service_fun["service_function_instance_name"] ) response = "App instance with ID [" + app_instance_id + "] successfully removed" status_code = 204 break return response return build_custom_http_response( status_code=status_code, content=None, headers={"Content-Type": "application/json"}, encoding="utf-8", url=None, request=None, ) except Exception as e: logging.error(e.args) return {"status": 404, "code": "NOT_FOUND", "message": "Resource does not exist"} def get_edge_cloud_zones( self, region: Optional[str] = None, status: Optional[str] = None ) -> List[Dict]: ) -> Response: nodes_response = self.k8s_connector.get_PoPs() zone_list = [] Loading @@ -207,9 +363,19 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): zone["edgeCloudProvider"] = self.edge_cloud_provider zone["edgeCloudRegion"] = node.get("location") zone_list.append(zone) return zone_list logging.info(zone_list) return build_custom_http_response( status_code=200, content=zone_list, headers={"Content-Type": "application/json"}, encoding="utf-8", url=None, request=None, ) def get_edge_cloud_zones_details(self, zone_id: str, flavour_id: Optional[str] = None) -> Dict: def get_edge_cloud_zones_details( self, zone_id: str, flavour_id: Optional[str] = None ) -> Response: nodes = self.k8s_connector.get_node_details() node_details = None for item in nodes.get("items"): Loading Loading @@ -241,18 +407,30 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): node_details["reservedComputeResources"] = reservedComputeResources node_details["flavoursSupported"] = flavoursSupported node_details["zoneId"] = zone_id return node_details return build_custom_http_response( status_code=200, content=node_details, headers={"Content-Type": "application/json"}, encoding="utf-8", url=None, request=None, ) def __transform_to_camara(self, app_data): app = {} app["appId"] = app_data.get("_id") app["name"] = app_data.get("name") app["packageType"] = app_data.get("type") appRepo = {"imagePath": app_data.get("image")} appRepo = {"imagePath": app_data.get("image"), "type": "PUBLICREPO"} app["appRepo"] = appRepo networkInterfaces = [] for port in app_data.get("application_ports"): port_spec = {"protocol": "TCP", "port": port} port_spec = { "protocol": "TCP", "port": port, "visibilityType": "VISIBILITY_EXTERNAL", "interfaceId": "gUn3d3EKG05T0zWZynI5c1d9l9QE", } networkInterfaces.append(port_spec) app["componentSpec"] = [ { Loading @@ -260,6 +438,9 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): "networkInterfaces": networkInterfaces, } ] app["appProvider"] = app_data.get("app_provider") app["requiredResources"] = app_data.get("required_resources") app["version"] = app_data.get("version") return app # --- GSMA-specific methods --- Loading src/sunrise6g_opensdk/edgecloud/adapters/kubernetes/lib/core/piedge_encoder.py +1 −3 Original line number Diff line number Diff line Loading @@ -210,5 +210,3 @@ def deploy_service_function( document=deployed_service_function_db ) return response else: return "error instantiating application" src/sunrise6g_opensdk/edgecloud/adapters/kubernetes/lib/models/service_function_registration_request.py +75 −0 Original line number Diff line number Diff line Loading @@ -21,6 +21,9 @@ class ServiceFunctionRegistrationRequest(Model): service_function_image: str = None, service_function_type: str = None, application_ports: List[int] = None, required_resources: dict = None, app_provider: str = None, version: str = None, ): # noqa: E501 """ServiceFunctionRegistrationRequest - a model defined in Swagger Loading @@ -47,6 +50,9 @@ class ServiceFunctionRegistrationRequest(Model): "service_function_image": str, "service_function_type": str, "application_ports": List[int], "required_resources": dict, "app_provider": str, "version": str, } self.attribute_map = { Loading @@ -55,12 +61,18 @@ class ServiceFunctionRegistrationRequest(Model): "service_function_image": "service_function_image", "service_function_type": "service_function_type", "application_ports": "application_ports", "required_resources": "required_resources", "app_provider": "app_provider", "version": "version", } self._service_function_id = service_function_id self._service_function_name = service_function_name self._service_function_image = service_function_image self._service_function_type = service_function_type self._application_ports = application_ports self._required_resources = required_resources self._app_provider = app_provider self._version = version @classmethod def from_dict(cls, dikt) -> "ServiceFunctionRegistrationRequest": Loading Loading @@ -177,3 +189,66 @@ class ServiceFunctionRegistrationRequest(Model): """ self._application_ports = application_ports @property def required_resources(self) -> dict: """Gets the application_ports of this ServiceFunctionRegistrationRequest. :return: The application_ports of this ServiceFunctionRegistrationRequest. :rtype: List[int] """ return self._required_resources @required_resources.setter def required_resources(self, required_resources: dict): """Sets the application_ports of this ServiceFunctionRegistrationRequest. :param application_ports: The application_ports of this ServiceFunctionRegistrationRequest. :type application_ports: List[int] """ self._required_resources = required_resources @property def app_provider(self) -> str: """Gets the application_ports of this ServiceFunctionRegistrationRequest. :return: The application_ports of this ServiceFunctionRegistrationRequest. :rtype: List[int] """ return self._app_provider @app_provider.setter def app_provider(self, app_provider: str): """Sets the application_ports of this ServiceFunctionRegistrationRequest. :param application_ports: The application_ports of this ServiceFunctionRegistrationRequest. :type application_ports: List[int] """ self._app_provider = app_provider @property def version(self) -> str: """Gets the application_ports of this ServiceFunctionRegistrationRequest. :return: The application_ports of this ServiceFunctionRegistrationRequest. :rtype: List[int] """ return self._version @version.setter def version(self, version: str): """Sets the application_ports of this ServiceFunctionRegistrationRequest. :param application_ports: The application_ports of this ServiceFunctionRegistrationRequest. :type application_ports: List[int] """ self._version = version src/sunrise6g_opensdk/edgecloud/adapters/kubernetes/lib/utils/connector_db.py +3 −0 Original line number Diff line number Diff line Loading @@ -138,6 +138,9 @@ class ConnectorDB: insert_doc["name"] = document["service_function_name"] insert_doc["type"] = document["service_function_type"] insert_doc["image"] = document["service_function_image"] insert_doc["app_provider"] = document["app_provider"] insert_doc["required_resources"] = document["required_resources"] insert_doc["version"] = document.get("version") if document.get("application_ports") is not None: insert_doc["application_ports"] = document.get("application_ports") if document.get("autoscaling_policies") is not None: Loading src/sunrise6g_opensdk/edgecloud/adapters/kubernetes/lib/utils/kubernetes_connector.py +4 −4 Original line number Diff line number Diff line Loading @@ -754,7 +754,7 @@ class KubernetesConnector: if actual_name == app_col["name"]: app_["service_function_catalogue_name"] = app_col["name"] app_["appId"] = app_col["_id"] app_["appProvider"] = app_col.get("appProvider") app_["appProvider"] = app_col.get("app_provider") break # find volumes! Loading @@ -776,13 +776,13 @@ class KubernetesConnector: # Set status and replicas if (status.available_replicas is not None) and (status.ready_replicas is not None): if status.available_replicas >= 1 and status.ready_replicas >= 1: app_["status"] = "running" app_["status"] = "ready" app_["replicas"] = status.ready_replicas else: app_["status"] = "not_running" app_["status"] = "failed" app_["replicas"] = 0 else: app_["status"] = "not_running" app_["status"] = "failed" app_["replicas"] = 0 # Find compute node Loading Loading
src/sunrise6g_opensdk/edgecloud/adapters/kubernetes/client.py +251 −70 Original line number Diff line number Diff line Loading @@ -23,9 +23,11 @@ from sunrise6g_opensdk.edgecloud.adapters.kubernetes.lib.utils.connector_db impo from sunrise6g_opensdk.edgecloud.adapters.kubernetes.lib.utils.kubernetes_connector import ( KubernetesConnector, ) from sunrise6g_opensdk.edgecloud.core import schemas as camara_schemas from sunrise6g_opensdk.edgecloud.core.edgecloud_interface import ( EdgeCloudManagementInterface, ) from sunrise6g_opensdk.edgecloud.core.utils import build_custom_http_response class EdgeApplicationManager(EdgeCloudManagementInterface): Loading @@ -49,15 +51,19 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): if storage_uri is not None: self.connector_db = ConnectorDB(storage_uri) def onboard_app(self, app_manifest: AppManifest) -> Dict: def onboard_app(self, app_manifest: AppManifest) -> Response: print(f"Submitting application: {app_manifest}") logging.info("Extracting variables from payload...") app_id = app_manifest.get("appId") app_name = app_manifest.get("name") image = app_manifest.get("appRepo").get("imagePath") package_type = app_manifest.get("packageType") network_interfaces = app_manifest.get("componentSpec")[0].get("networkInterfaces") ports = [] req_resources = app_manifest.get("requiredResources") version = app_manifest.get("version") app_provider = app_manifest.get("appProvider") for ni in network_interfaces: ports.append(ni.get("port")) insert_doc = ServiceFunctionRegistrationRequest( Loading @@ -66,54 +72,114 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): service_function_name=app_name, service_function_type=package_type, application_ports=ports, required_resources=req_resources, app_provider=app_provider, version=version, ) result = self.connector_db.insert_document_service_function(insert_doc.to_dict()) if type(result) is str: return result return {"appId": str(result.inserted_id)} status_code = 409 submitted_app = {"message": "App already exists"} else: submitted_app = camara_schemas.SubmittedApp( appId=camara_schemas.AppId(result.inserted_id) ).model_dump(mode="json") status_code = 201 return build_custom_http_response( status_code=status_code, content=submitted_app, headers={"Content-Type": "application/json"}, encoding="utf-8", url=None, request=None, ) def get_all_onboarded_apps(self) -> List[Dict]: def get_all_onboarded_apps(self) -> Response: logging.info("Retrieving all registered apps from database...") status_code = None content = [] try: db_list = self.connector_db.get_documents_from_collection( collection_input="service_functions" ) app_list = [] for sf in db_list: app_list.append(self.__transform_to_camara(sf)) return app_list # return [{"appId": "1234-5678", "name": "TestApp"}] content.append(self.__transform_to_camara(sf)) status_code = 200 return build_custom_http_response( status_code=status_code, content=content, headers={"Content-Type": "application/json"}, encoding="utf-8", url=None, request=None, ) except Exception as e: logging.error(e.args) status_code = 500 return { "status": 500, "code": "INTERNAL", "message": "Internal server error: " + e.args, } def get_onboarded_app(self, app_id: str) -> Dict: def get_onboarded_app(self, app_id: str) -> Response: logging.info("Searching for registered app with ID: " + app_id + " in database...") status_code = None content = None app = self.connector_db.get_documents_from_collection( "service_functions", input_type="_id", input_value=app_id ) if len(app) > 0: return self.__transform_to_camara(app[0]) status_code = 200 content = {"appManifest": self.__transform_to_camara(app[0])} else: return [] status_code = 404 content = {"status": 404, "code": "NOT_FOUND", "message": "Resource does not exist"} return build_custom_http_response( status_code=status_code, content=content, headers={"Content-Type": "application/json"}, encoding="utf-8", url=None, request=None, ) def delete_onboarded_app(self, app_id: str) -> None: def delete_onboarded_app(self, app_id: str) -> Response: result, code = self.connector_db.delete_document_service_function(_id=app_id) print(f"Removing application metadata: {app_id}") return code def deploy_app(self, body: dict) -> Dict: logging.info( "Searching for registered app with ID: " + body.get("appId") + " in database..." if code == 200: status_code = 204 content = None else: status_code = 404 content = {"status": 404, "code": "NOT_FOUND", "message": "Resource does not exist"} return build_custom_http_response( status_code=status_code, content=content, headers={"Content-Type": "application/json"}, encoding="utf-8", url=None, request=None, ) def deploy_app(self, app_id: str, app_zones: List[Dict]) -> Response: logging.info("Searching for registered app with ID: " + app_id + " in database...") status_code = None app = self.connector_db.get_documents_from_collection( "service_functions", input_type="_id", input_value=body.get("appId") "service_functions", input_type="_id", input_value=app_id ) # success_response = [] result = None response = None if len(app) < 1: return "Application with ID: " + body.get("appId") + " not found", 404 return "Application with ID: " + app_id + " not found", 404 if app is not None: sf = DeployServiceFunction( service_function_name=app[0].get("name"), service_function_instance_name=body.get("name"), service_function_instance_name=app[0].get("name"), # service_function_instance_name=body.get("name"), # location=body.get('edgeCloudZoneId'), ) result = deploy_service_function( Loading @@ -123,26 +189,47 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): ) if type(result) is V1Deployment: status_code = 202 response = {} response["name"] = body.get("name") response["name"] = result.metadata.name response["appId"] = app[0].get("_id") response["appInstanceId"] = result.metadata.uid response["appProvider"] = app[0].get("appProvider") response["appProvider"] = app[0].get("app_provider") response["status"] = "unknown" response["componentEndpointInfo"] = {} response["kubernetesClusterRef"] = "" response["edgeCloudZoneId"] = body.get("edgeCloudZoneId") else: response = {"Error": result} return response # interfaces = [] # for port in deployment.get("ports"): # access_point = {"port": port} # interfaces.append({"interfaceId": "", "accessPoints": access_point}) # response["componentEndpointInfo"] = interfaces response["kubernetesClusterRef"] = "642f6105-7015-4af1-a4d1-e1ecb8437abc" response["edgeCloudZoneId"] = app_zones[0].get("EdgeCloudZone").get("edgeCloudZoneId") elif "Conflict" in result: status_code = 409 response = { "status": 409, "code": "CONFLICT", "message": "Application already instantiated in the given Edge Cloud Zone", } return build_custom_http_response( status_code=status_code, content=response, headers={"Content-Type": "application/json"}, encoding="utf-8", url=None, request=None, ) def get_all_deployed_apps( self, app_id: Optional[str] = None, app_instance_id: Optional[str] = None, region: Optional[str] = None, ) -> List[Dict]: ) -> Response: logging.info("Retrieving all deployed apps in the edge cloud platform") status_code = None content = None try: deployments = self.k8s_connector.get_deployed_service_functions(self.connector_db) response = [] for deployment in deployments: Loading @@ -151,17 +238,34 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): item["appId"] = deployment.get("appId") item["appProvider"] = deployment.get("appProvider") item["appInstanceId"] = deployment.get("appInstanceId") item["status"] = deployment.get("status") item["status"] = deployment.get("status", "unknown") interfaces = [] for port in deployment.get("ports"): access_point = {"port": port} interfaces.append({"interfaceId": "", "accessPoints": access_point}) item["componentEndpointInfo"] = interfaces item["kubernetesClusterRef"] = "" item["edgeCloudZoneId"] = {} # item["componentEndpointInfo"] = interfaces item["kubernetesClusterRef"] = "642f6105-7015-4af1-a4d1-e1ecb8437abc" item["edgeCloudZoneId"] = deployment.get( "edgeCloudZoneId", "6824d63a-4d5b-4624-a487-dbdf118b0fdb" ) response.append(item) return response # return [{"appInstanceId": "abcd-efgh", "status": "ready"}] content = {"appInstances": response} status_code = 200 return build_custom_http_response( status_code=status_code, content=content, headers={"Content-Type": "application/json"}, encoding="utf-8", url=None, request=None, ) except Exception as e: logging.error(e.args) return { "status": 500, "code": "INTERNAL", "message": "Internal server error: " + e.args, } def get_deployed_app( self, app_instance_id: str, app_id: Optional[str] = None, region: Optional[str] = None Loading @@ -175,26 +279,78 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): :param region: Optional filter by Edge Cloud region :return: Response with application instance details """ # TODO: Implement actual kubernetes-specific logic for retrieving a specific deployed app raise NotImplementedError("get_deployed_app is not yet implemented for kubernetes adapter") logging.info("Retrieving for deployed app with ID: " + app_instance_id) deployments = self.k8s_connector.get_deployed_service_functions(self.connector_db) deployed_app = None response = {} for deployment in deployments: if deployment.get("appInstanceId") == app_instance_id: deployed_app = deployment break if deployed_app is not None: status_code = 200 response["name"] = deployed_app.get("service_function_catalogue_name") response["appId"] = deployed_app.get("appId") response["appProvider"] = deployed_app.get("appProvider") response["appInstanceId"] = deployed_app.get("appInstanceId") response["status"] = deployed_app.get("status", "unknown") interfaces = [] for port in deployed_app.get("ports"): access_point = {"port": port} interfaces.append({"interfaceId": "", "accessPoints": access_point}) # response["componentEndpointInfo"] = interfaces response["kubernetesClusterRef"] = "642f6105-7015-4af1-a4d1-e1ecb8437abc" response["edgeCloudZoneId"] = deployed_app.get( "edgeCloudZoneId", "6824d63a-4d5b-4624-a487-dbdf118b0fdb" ) response = {"appInstance": response} else: status_code = 404 response = { "status": 404, "code": "NOT_FOUND", "message": "App instance does not exist", } return build_custom_http_response( status_code=status_code, content=response, headers={"Content-Type": "application/json"}, encoding="utf-8", url=None, request=None, ) def undeploy_app(self, app_instance_id: str) -> None: logging.info("Searching for deployed app with ID: " + app_instance_id + " in database...") print(f"Deleting app instance: {app_instance_id}") status_code = 404 try: sfs = self.k8s_connector.get_deployed_service_functions(self.connector_db) response = "App instance with ID [" + app_instance_id + "] not found" # response = "App instance with ID [" + app_instance_id + "] not found" for service_fun in sfs: if service_fun["appInstanceId"] == app_instance_id: print(service_fun["service_function_instance_name"]) self.k8s_connector.delete_service_function( self.connector_db, service_fun["service_function_instance_name"] ) response = "App instance with ID [" + app_instance_id + "] successfully removed" status_code = 204 break return response return build_custom_http_response( status_code=status_code, content=None, headers={"Content-Type": "application/json"}, encoding="utf-8", url=None, request=None, ) except Exception as e: logging.error(e.args) return {"status": 404, "code": "NOT_FOUND", "message": "Resource does not exist"} def get_edge_cloud_zones( self, region: Optional[str] = None, status: Optional[str] = None ) -> List[Dict]: ) -> Response: nodes_response = self.k8s_connector.get_PoPs() zone_list = [] Loading @@ -207,9 +363,19 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): zone["edgeCloudProvider"] = self.edge_cloud_provider zone["edgeCloudRegion"] = node.get("location") zone_list.append(zone) return zone_list logging.info(zone_list) return build_custom_http_response( status_code=200, content=zone_list, headers={"Content-Type": "application/json"}, encoding="utf-8", url=None, request=None, ) def get_edge_cloud_zones_details(self, zone_id: str, flavour_id: Optional[str] = None) -> Dict: def get_edge_cloud_zones_details( self, zone_id: str, flavour_id: Optional[str] = None ) -> Response: nodes = self.k8s_connector.get_node_details() node_details = None for item in nodes.get("items"): Loading Loading @@ -241,18 +407,30 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): node_details["reservedComputeResources"] = reservedComputeResources node_details["flavoursSupported"] = flavoursSupported node_details["zoneId"] = zone_id return node_details return build_custom_http_response( status_code=200, content=node_details, headers={"Content-Type": "application/json"}, encoding="utf-8", url=None, request=None, ) def __transform_to_camara(self, app_data): app = {} app["appId"] = app_data.get("_id") app["name"] = app_data.get("name") app["packageType"] = app_data.get("type") appRepo = {"imagePath": app_data.get("image")} appRepo = {"imagePath": app_data.get("image"), "type": "PUBLICREPO"} app["appRepo"] = appRepo networkInterfaces = [] for port in app_data.get("application_ports"): port_spec = {"protocol": "TCP", "port": port} port_spec = { "protocol": "TCP", "port": port, "visibilityType": "VISIBILITY_EXTERNAL", "interfaceId": "gUn3d3EKG05T0zWZynI5c1d9l9QE", } networkInterfaces.append(port_spec) app["componentSpec"] = [ { Loading @@ -260,6 +438,9 @@ class EdgeApplicationManager(EdgeCloudManagementInterface): "networkInterfaces": networkInterfaces, } ] app["appProvider"] = app_data.get("app_provider") app["requiredResources"] = app_data.get("required_resources") app["version"] = app_data.get("version") return app # --- GSMA-specific methods --- Loading
src/sunrise6g_opensdk/edgecloud/adapters/kubernetes/lib/core/piedge_encoder.py +1 −3 Original line number Diff line number Diff line Loading @@ -210,5 +210,3 @@ def deploy_service_function( document=deployed_service_function_db ) return response else: return "error instantiating application"
src/sunrise6g_opensdk/edgecloud/adapters/kubernetes/lib/models/service_function_registration_request.py +75 −0 Original line number Diff line number Diff line Loading @@ -21,6 +21,9 @@ class ServiceFunctionRegistrationRequest(Model): service_function_image: str = None, service_function_type: str = None, application_ports: List[int] = None, required_resources: dict = None, app_provider: str = None, version: str = None, ): # noqa: E501 """ServiceFunctionRegistrationRequest - a model defined in Swagger Loading @@ -47,6 +50,9 @@ class ServiceFunctionRegistrationRequest(Model): "service_function_image": str, "service_function_type": str, "application_ports": List[int], "required_resources": dict, "app_provider": str, "version": str, } self.attribute_map = { Loading @@ -55,12 +61,18 @@ class ServiceFunctionRegistrationRequest(Model): "service_function_image": "service_function_image", "service_function_type": "service_function_type", "application_ports": "application_ports", "required_resources": "required_resources", "app_provider": "app_provider", "version": "version", } self._service_function_id = service_function_id self._service_function_name = service_function_name self._service_function_image = service_function_image self._service_function_type = service_function_type self._application_ports = application_ports self._required_resources = required_resources self._app_provider = app_provider self._version = version @classmethod def from_dict(cls, dikt) -> "ServiceFunctionRegistrationRequest": Loading Loading @@ -177,3 +189,66 @@ class ServiceFunctionRegistrationRequest(Model): """ self._application_ports = application_ports @property def required_resources(self) -> dict: """Gets the application_ports of this ServiceFunctionRegistrationRequest. :return: The application_ports of this ServiceFunctionRegistrationRequest. :rtype: List[int] """ return self._required_resources @required_resources.setter def required_resources(self, required_resources: dict): """Sets the application_ports of this ServiceFunctionRegistrationRequest. :param application_ports: The application_ports of this ServiceFunctionRegistrationRequest. :type application_ports: List[int] """ self._required_resources = required_resources @property def app_provider(self) -> str: """Gets the application_ports of this ServiceFunctionRegistrationRequest. :return: The application_ports of this ServiceFunctionRegistrationRequest. :rtype: List[int] """ return self._app_provider @app_provider.setter def app_provider(self, app_provider: str): """Sets the application_ports of this ServiceFunctionRegistrationRequest. :param application_ports: The application_ports of this ServiceFunctionRegistrationRequest. :type application_ports: List[int] """ self._app_provider = app_provider @property def version(self) -> str: """Gets the application_ports of this ServiceFunctionRegistrationRequest. :return: The application_ports of this ServiceFunctionRegistrationRequest. :rtype: List[int] """ return self._version @version.setter def version(self, version: str): """Sets the application_ports of this ServiceFunctionRegistrationRequest. :param application_ports: The application_ports of this ServiceFunctionRegistrationRequest. :type application_ports: List[int] """ self._version = version
src/sunrise6g_opensdk/edgecloud/adapters/kubernetes/lib/utils/connector_db.py +3 −0 Original line number Diff line number Diff line Loading @@ -138,6 +138,9 @@ class ConnectorDB: insert_doc["name"] = document["service_function_name"] insert_doc["type"] = document["service_function_type"] insert_doc["image"] = document["service_function_image"] insert_doc["app_provider"] = document["app_provider"] insert_doc["required_resources"] = document["required_resources"] insert_doc["version"] = document.get("version") if document.get("application_ports") is not None: insert_doc["application_ports"] = document.get("application_ports") if document.get("autoscaling_policies") is not None: Loading
src/sunrise6g_opensdk/edgecloud/adapters/kubernetes/lib/utils/kubernetes_connector.py +4 −4 Original line number Diff line number Diff line Loading @@ -754,7 +754,7 @@ class KubernetesConnector: if actual_name == app_col["name"]: app_["service_function_catalogue_name"] = app_col["name"] app_["appId"] = app_col["_id"] app_["appProvider"] = app_col.get("appProvider") app_["appProvider"] = app_col.get("app_provider") break # find volumes! Loading @@ -776,13 +776,13 @@ class KubernetesConnector: # Set status and replicas if (status.available_replicas is not None) and (status.ready_replicas is not None): if status.available_replicas >= 1 and status.ready_replicas >= 1: app_["status"] = "running" app_["status"] = "ready" app_["replicas"] = status.ready_replicas else: app_["status"] = "not_running" app_["status"] = "failed" app_["replicas"] = 0 else: app_["status"] = "not_running" app_["status"] = "failed" app_["replicas"] = 0 # Find compute node Loading