Loading edge_cloud_management_api/controllers/federation_manager_controller.py +16 −33 Original line number Diff line number Diff line from flask import request, jsonify, Response import json import logging from edge_cloud_management_api.services.federation_services import FederationManagerClientFactory # Factory pattern factory = FederationManagerClientFactory() federation_client = factory.create_federation_client() def handle_response(status_code, title, detail): return Response( json.dumps({ "title": title, "detail": detail, "status": status_code }), status=status_code, content_type="application/problem+json" ) def create_federation(): Loading @@ -23,44 +14,39 @@ def create_federation(): try: body = request.get_json() result = federation_client.post_partner(body) if "error" in result: return handle_response(502, "Federation Error", result["error"]) return jsonify(result), 200 except Exception as e: return handle_response(400, "Bad Request", str(e)) logging.error(e.args) return jsonify(result), 500 def get_federation(federationContextId): """GET /{federationContextId}/partner - Get federation info.""" try: result = federation_client.get_partner(federationContextId) if "error" in result: return handle_response(502, "Federation Error", result["error"]) return jsonify(result), 200 except Exception as e: return handle_response(400, "Bad Request", str(e)) logging.error(e.args) return jsonify(result), 500 def delete_federation(federationContextId): """DELETE /{federationContextId}/partner - Delete federation.""" try: result = federation_client.delete_partner(federationContextId) if "error" in result: return handle_response(502, "Federation Error", result["error"]) return jsonify(result), 200 except Exception as e: return handle_response(400, "Bad Request", str(e)) logging.error(e.args) return jsonify(result), 500 def get_federation_context_ids(): """GET /fed-context-id - Fetch federationContextId(s).""" try: result = federation_client.get_federation_context_ids() if "error" in result: return handle_response(502, "Federation Error", result["error"]) return jsonify(result), 200 except Exception as e: return handle_response(400, "Bad Request", str(e)) logging.error(e.args) return jsonify(result), 500 def onboard_application_to_partner(federationContextId): Loading @@ -68,31 +54,28 @@ def onboard_application_to_partner(federationContextId): try: body = request.get_json() result = federation_client.onboard_application(federationContextId, body) if "error" in result: return handle_response(502, "Onboarding Error", result["error"]) return jsonify(result), 202 except Exception as e: return handle_response(400, "Bad Request", str(e)) logging.error(e.args) return jsonify(result), 500 def get_onboarded_app(federationContextId, appId): """GET /{federationContextId}/application/onboarding/app/{appId}""" try: result = federation_client.get_onboarded_app(federationContextId, appId) if "error" in result: return handle_response(result.get("status_code", 502), "Retrieval Error", result["error"]) return jsonify(result), 200 except Exception as e: return handle_response(500, "Internal Server Error", str(e)) logging.error(e.args) return jsonify(result), 500 def delete_onboarded_app(federationContextId, appId): """DELETE /{federationContextId}/application/onboarding/app/{appId}""" try: result = federation_client.delete_onboarded_app(federationContextId, appId) if "error" in result: return handle_response(result.get("status_code", 502), "Deletion Error", result["error"]) return jsonify({"message": f"App {appId} successfully deleted from partner"}), 200 return jsonify(result), 200 except Exception as e: return handle_response(500, "Internal Server Error", str(e)) logging.error(e.args) return jsonify(result), 500 Loading
edge_cloud_management_api/controllers/federation_manager_controller.py +16 −33 Original line number Diff line number Diff line from flask import request, jsonify, Response import json import logging from edge_cloud_management_api.services.federation_services import FederationManagerClientFactory # Factory pattern factory = FederationManagerClientFactory() federation_client = factory.create_federation_client() def handle_response(status_code, title, detail): return Response( json.dumps({ "title": title, "detail": detail, "status": status_code }), status=status_code, content_type="application/problem+json" ) def create_federation(): Loading @@ -23,44 +14,39 @@ def create_federation(): try: body = request.get_json() result = federation_client.post_partner(body) if "error" in result: return handle_response(502, "Federation Error", result["error"]) return jsonify(result), 200 except Exception as e: return handle_response(400, "Bad Request", str(e)) logging.error(e.args) return jsonify(result), 500 def get_federation(federationContextId): """GET /{federationContextId}/partner - Get federation info.""" try: result = federation_client.get_partner(federationContextId) if "error" in result: return handle_response(502, "Federation Error", result["error"]) return jsonify(result), 200 except Exception as e: return handle_response(400, "Bad Request", str(e)) logging.error(e.args) return jsonify(result), 500 def delete_federation(federationContextId): """DELETE /{federationContextId}/partner - Delete federation.""" try: result = federation_client.delete_partner(federationContextId) if "error" in result: return handle_response(502, "Federation Error", result["error"]) return jsonify(result), 200 except Exception as e: return handle_response(400, "Bad Request", str(e)) logging.error(e.args) return jsonify(result), 500 def get_federation_context_ids(): """GET /fed-context-id - Fetch federationContextId(s).""" try: result = federation_client.get_federation_context_ids() if "error" in result: return handle_response(502, "Federation Error", result["error"]) return jsonify(result), 200 except Exception as e: return handle_response(400, "Bad Request", str(e)) logging.error(e.args) return jsonify(result), 500 def onboard_application_to_partner(federationContextId): Loading @@ -68,31 +54,28 @@ def onboard_application_to_partner(federationContextId): try: body = request.get_json() result = federation_client.onboard_application(federationContextId, body) if "error" in result: return handle_response(502, "Onboarding Error", result["error"]) return jsonify(result), 202 except Exception as e: return handle_response(400, "Bad Request", str(e)) logging.error(e.args) return jsonify(result), 500 def get_onboarded_app(federationContextId, appId): """GET /{federationContextId}/application/onboarding/app/{appId}""" try: result = federation_client.get_onboarded_app(federationContextId, appId) if "error" in result: return handle_response(result.get("status_code", 502), "Retrieval Error", result["error"]) return jsonify(result), 200 except Exception as e: return handle_response(500, "Internal Server Error", str(e)) logging.error(e.args) return jsonify(result), 500 def delete_onboarded_app(federationContextId, appId): """DELETE /{federationContextId}/application/onboarding/app/{appId}""" try: result = federation_client.delete_onboarded_app(federationContextId, appId) if "error" in result: return handle_response(result.get("status_code", 502), "Deletion Error", result["error"]) return jsonify({"message": f"App {appId} successfully deleted from partner"}), 200 return jsonify(result), 200 except Exception as e: return handle_response(500, "Internal Server Error", str(e)) logging.error(e.args) return jsonify(result), 500