Loading edge_cloud_management_api/controllers/edge_cloud_controller.py +1 −0 Original line number Diff line number Diff line Loading @@ -120,3 +120,4 @@ def edge_cloud_zone_details(zoneId: str) -> dict: def get_zone_details(zoneId:str): pass edge_cloud_management_api/controllers/federation_manager_controller.py +47 −57 Original line number Diff line number Diff line from flask import request, jsonify from flask import request, jsonify, Response import json from edge_cloud_management_api.services.federation_services import FederationManagerClientFactory # Factory pattern factory = FederationManagerClientFactory() federation_client = factory.create_federation_client() def handle_response(status_code, title, detail): return Response( json.dumps({ "title": title, "detail": detail, "status": status_code }), status=status_code, content_type="application/problem+json" ) def create_federation(): """ POST /partner Forwards the federation creation request to Federation Manager. """ """POST /partner - Create federation with partner OP.""" try: factory = FederationManagerClientFactory() federation_client = factory.create_federation_client() body = request.get_json() result = federation_client.post_partner(body) return jsonify(result), 200 if "error" not in result else 502 if "error" in result: return handle_response(502, "Federation Error", result["error"]) return jsonify(result), 200 except Exception as e: return jsonify({"error": str(e)}), 400 return handle_response(400, "Bad Request", str(e)) def get_federation(federationContextId): """ GET /{federationContextId}/partner Forwards the GET federation info request. """ """GET /{federationContextId}/partner - Get federation info.""" try: factory = FederationManagerClientFactory() federation_client = factory.create_federation_client() result = federation_client.get_partner(federationContextId) return jsonify(result), 200 if "error" not in result else 502 if "error" in result: return handle_response(502, "Federation Error", result["error"]) return jsonify(result), 200 except Exception as e: return jsonify({"error": str(e)}), 400 return handle_response(400, "Bad Request", str(e)) def delete_federation(federationContextId): """ DELETE /{federationContextId}/partner Forwards the DELETE federation request. """ """DELETE /{federationContextId}/partner - Delete federation.""" try: factory = FederationManagerClientFactory() federation_client = factory.create_federation_client() result = federation_client.delete_partner(federationContextId) return jsonify(result), 200 if "error" not in result else 502 if "error" in result: return handle_response(502, "Federation Error", result["error"]) return jsonify(result), 200 except Exception as e: return jsonify({"error": str(e)}), 400 return handle_response(400, "Bad Request", str(e)) def get_federation_context_ids(): """ GET /fed-context-id Forwards the request to fetch federation context IDs. """ """GET /fed-context-id - Fetch federationContextId(s).""" try: factory = FederationManagerClientFactory() federation_client = factory.create_federation_client() result = federation_client.get_federation_context_ids() return jsonify(result), 200 if "error" not in result else 502 if "error" in result: return handle_response(502, "Federation Error", result["error"]) return jsonify(result), 200 except Exception as e: return jsonify({"error": str(e)}), 400 return handle_response(400, "Bad Request", str(e)) def onboard_application_to_partner(federationContextId): """ POST /{federationContextId}/application/onboarding Forwards the onboarding request to the Federation Manager. """ """POST /{federationContextId}/application/onboarding - Onboard app.""" try: factory = FederationManagerClientFactory() federation_client = factory.create_federation_client() body = request.get_json() result = federation_client.onboard_application(federationContextId, body) return jsonify(result), 202 if "error" not in result else 502 if "error" in result: return handle_response(502, "Onboarding Error", result["error"]) return jsonify(result), 202 except Exception as e: return jsonify({"error": str(e)}), 400 return handle_response(400, "Bad Request", str(e)) def get_onboarded_app(federationContextId, appId): """ GET /{federationContextId}/application/onboarding/app/{appId} Retrieves onboarding info of a federated app from a partner OP. """ """GET /{federationContextId}/application/onboarding/app/{appId}""" try: factory = FederationManagerClientFactory() federation_client = factory.create_federation_client() result = federation_client.get_onboarded_app(federationContextId, appId) if "error" in result: return jsonify(result), result.get("status_code", 502) return handle_response(result.get("status_code", 502), "Retrieval Error", result["error"]) return jsonify(result), 200 except Exception as e: return jsonify({"error": str(e)}), 500 return handle_response(500, "Internal Server Error", str(e)) def delete_onboarded_app(federationContextId, appId): """ DELETE /{federationContextId}/application/onboarding/app/{appId} Deboards the application and deletes it from the partner OP. """ """DELETE /{federationContextId}/application/onboarding/app/{appId}""" try: factory = FederationManagerClientFactory() federation_client = factory.create_federation_client() result = federation_client.delete_onboarded_app(federationContextId, appId) if "error" in result: return jsonify(result), result.get("status_code", 502) return handle_response(result.get("status_code", 502), "Deletion Error", result["error"]) return jsonify({"message": f"App {appId} successfully deleted from partner"}), 200 except Exception as e: return jsonify({"error": str(e)}), 500 return handle_response(500, "Internal Server Error", str(e)) Loading
edge_cloud_management_api/controllers/edge_cloud_controller.py +1 −0 Original line number Diff line number Diff line Loading @@ -120,3 +120,4 @@ def edge_cloud_zone_details(zoneId: str) -> dict: def get_zone_details(zoneId:str): pass
edge_cloud_management_api/controllers/federation_manager_controller.py +47 −57 Original line number Diff line number Diff line from flask import request, jsonify from flask import request, jsonify, Response import json from edge_cloud_management_api.services.federation_services import FederationManagerClientFactory # Factory pattern factory = FederationManagerClientFactory() federation_client = factory.create_federation_client() def handle_response(status_code, title, detail): return Response( json.dumps({ "title": title, "detail": detail, "status": status_code }), status=status_code, content_type="application/problem+json" ) def create_federation(): """ POST /partner Forwards the federation creation request to Federation Manager. """ """POST /partner - Create federation with partner OP.""" try: factory = FederationManagerClientFactory() federation_client = factory.create_federation_client() body = request.get_json() result = federation_client.post_partner(body) return jsonify(result), 200 if "error" not in result else 502 if "error" in result: return handle_response(502, "Federation Error", result["error"]) return jsonify(result), 200 except Exception as e: return jsonify({"error": str(e)}), 400 return handle_response(400, "Bad Request", str(e)) def get_federation(federationContextId): """ GET /{federationContextId}/partner Forwards the GET federation info request. """ """GET /{federationContextId}/partner - Get federation info.""" try: factory = FederationManagerClientFactory() federation_client = factory.create_federation_client() result = federation_client.get_partner(federationContextId) return jsonify(result), 200 if "error" not in result else 502 if "error" in result: return handle_response(502, "Federation Error", result["error"]) return jsonify(result), 200 except Exception as e: return jsonify({"error": str(e)}), 400 return handle_response(400, "Bad Request", str(e)) def delete_federation(federationContextId): """ DELETE /{federationContextId}/partner Forwards the DELETE federation request. """ """DELETE /{federationContextId}/partner - Delete federation.""" try: factory = FederationManagerClientFactory() federation_client = factory.create_federation_client() result = federation_client.delete_partner(federationContextId) return jsonify(result), 200 if "error" not in result else 502 if "error" in result: return handle_response(502, "Federation Error", result["error"]) return jsonify(result), 200 except Exception as e: return jsonify({"error": str(e)}), 400 return handle_response(400, "Bad Request", str(e)) def get_federation_context_ids(): """ GET /fed-context-id Forwards the request to fetch federation context IDs. """ """GET /fed-context-id - Fetch federationContextId(s).""" try: factory = FederationManagerClientFactory() federation_client = factory.create_federation_client() result = federation_client.get_federation_context_ids() return jsonify(result), 200 if "error" not in result else 502 if "error" in result: return handle_response(502, "Federation Error", result["error"]) return jsonify(result), 200 except Exception as e: return jsonify({"error": str(e)}), 400 return handle_response(400, "Bad Request", str(e)) def onboard_application_to_partner(federationContextId): """ POST /{federationContextId}/application/onboarding Forwards the onboarding request to the Federation Manager. """ """POST /{federationContextId}/application/onboarding - Onboard app.""" try: factory = FederationManagerClientFactory() federation_client = factory.create_federation_client() body = request.get_json() result = federation_client.onboard_application(federationContextId, body) return jsonify(result), 202 if "error" not in result else 502 if "error" in result: return handle_response(502, "Onboarding Error", result["error"]) return jsonify(result), 202 except Exception as e: return jsonify({"error": str(e)}), 400 return handle_response(400, "Bad Request", str(e)) def get_onboarded_app(federationContextId, appId): """ GET /{federationContextId}/application/onboarding/app/{appId} Retrieves onboarding info of a federated app from a partner OP. """ """GET /{federationContextId}/application/onboarding/app/{appId}""" try: factory = FederationManagerClientFactory() federation_client = factory.create_federation_client() result = federation_client.get_onboarded_app(federationContextId, appId) if "error" in result: return jsonify(result), result.get("status_code", 502) return handle_response(result.get("status_code", 502), "Retrieval Error", result["error"]) return jsonify(result), 200 except Exception as e: return jsonify({"error": str(e)}), 500 return handle_response(500, "Internal Server Error", str(e)) def delete_onboarded_app(federationContextId, appId): """ DELETE /{federationContextId}/application/onboarding/app/{appId} Deboards the application and deletes it from the partner OP. """ """DELETE /{federationContextId}/application/onboarding/app/{appId}""" try: factory = FederationManagerClientFactory() federation_client = factory.create_federation_client() result = federation_client.delete_onboarded_app(federationContextId, appId) if "error" in result: return jsonify(result), result.get("status_code", 502) return handle_response(result.get("status_code", 502), "Deletion Error", result["error"]) return jsonify({"message": f"App {appId} successfully deleted from partner"}), 200 except Exception as e: return jsonify({"error": str(e)}), 500 return handle_response(500, "Internal Server Error", str(e))