Commit 898ae96b authored by Pelayo Torres's avatar Pelayo Torres
Browse files

REL 19 API_Invoker_Management_API

parent 57d6bff7
Loading
Loading
Loading
Loading
Loading
+12 −4
Original line number Diff line number Diff line
@@ -8,13 +8,14 @@ git_push.sh
api_invoker_management/__init__.py
api_invoker_management/__main__.py
api_invoker_management/controllers/__init__.py
api_invoker_management/controllers/default_controller.py
api_invoker_management/controllers/individual_api_invoker_enrolment_details_controller.py
api_invoker_management/controllers/individual_on_boarded_api_invoker_document_controller.py
api_invoker_management/controllers/on_boarded_api_invokers_collection_controller.py
api_invoker_management/controllers/security_controller.py
api_invoker_management/encoder.py
api_invoker_management/models/__init__.py
api_invoker_management/models/aef_location.py
api_invoker_management/models/aef_profile.py
api_invoker_management/models/api_info.py
api_invoker_management/models/api_invoker_enrolment_details.py
api_invoker_management/models/api_invoker_enrolment_details_patch.py
api_invoker_management/models/api_list.py
@@ -25,6 +26,8 @@ api_invoker_management/models/communication_type.py
api_invoker_management/models/custom_operation.py
api_invoker_management/models/data_format.py
api_invoker_management/models/ellipsoid_arc.py
api_invoker_management/models/enrol_fail_cause.py
api_invoker_management/models/enrol_fail_reason.py
api_invoker_management/models/gad_shape.py
api_invoker_management/models/geographic_area.py
api_invoker_management/models/geographical_coordinates.py
@@ -37,7 +40,10 @@ api_invoker_management/models/ipv6_address_range.py
api_invoker_management/models/local2d_point_uncertainty_ellipse.py
api_invoker_management/models/local3d_point_uncertainty_ellipsoid.py
api_invoker_management/models/local_origin.py
api_invoker_management/models/net_slice_id.py
api_invoker_management/models/o_auth_grant_type.py
api_invoker_management/models/onboarding_criteria.py
api_invoker_management/models/onboarding_fail_reason.py
api_invoker_management/models/onboarding_information.py
api_invoker_management/models/onboarding_notification.py
api_invoker_management/models/operation.py
@@ -50,12 +56,14 @@ api_invoker_management/models/polygon.py
api_invoker_management/models/problem_details.py
api_invoker_management/models/protocol.py
api_invoker_management/models/published_api_path.py
api_invoker_management/models/related_criteria.py
api_invoker_management/models/relative_cartesian_location.py
api_invoker_management/models/resource.py
api_invoker_management/models/security_method.py
api_invoker_management/models/service_api_description.py
api_invoker_management/models/service_kpis.py
api_invoker_management/models/shareable_information.py
api_invoker_management/models/snssai.py
api_invoker_management/models/supported_gad_shapes.py
api_invoker_management/models/uncertainty_ellipse.py
api_invoker_management/models/uncertainty_ellipsoid.py
@@ -63,8 +71,8 @@ api_invoker_management/models/version.py
api_invoker_management/models/websock_notif_config.py
api_invoker_management/openapi/openapi.yaml
api_invoker_management/test/__init__.py
api_invoker_management/test/test_default_controller.py
api_invoker_management/test/test_individual_api_invoker_enrolment_details_controller.py
api_invoker_management/test/test_individual_on_boarded_api_invoker_document_controller.py
api_invoker_management/test/test_on_boarded_api_invokers_collection_controller.py
api_invoker_management/typing_utils.py
api_invoker_management/util.py
requirements.txt
+0 −61
Original line number Diff line number Diff line
from flask import current_app, request
from functools import wraps
from cryptography import x509
from cryptography.hazmat.backends import default_backend

from ..core.apiinvokerenrolmentdetails import InvokerManagementOperations
from ..core.validate_user import ControlAccess
from api_invoker_management.models.api_invoker_enrolment_details_patch import \
    APIInvokerEnrolmentDetailsPatch  # noqa: E501

invoker_operations = InvokerManagementOperations()
valid_user = ControlAccess()


def cert_validation():
    def _cert_validation(f):
        @wraps(f)
        def __cert_validation(*args, **kwargs):

            args = request.view_args
            cert_tmp = request.headers['X-Ssl-Client-Cert']
            cert_raw = cert_tmp.replace('\t', '')

            cert = x509.load_pem_x509_certificate(str.encode(cert_raw), default_backend())

            cn = cert.subject.get_attributes_for_oid(x509.OID_COMMON_NAME)[0].value.strip()

            if cn != "superadmin":
                cert_signature = cert.signature.hex()
                result = valid_user.validate_user_cert(args["onboardingId"], cert_signature)

                if result is not None:
                    return result

            result = f(**kwargs)
            return result
        return __cert_validation
    return _cert_validation


@cert_validation()
def modify_ind_api_invoke_enrolment(onboarding_id, body):  # noqa: E501
    """modify_ind_api_invoke_enrolment

    Modify an individual API invoker details. # noqa: E501

    :param onboarding_id: 
    :type onboarding_id: str
    :param api_invoker_enrolment_details_patch: 
    :type api_invoker_enrolment_details_patch: dict | bytes

    :rtype: Union[APIInvokerEnrolmentDetails, Tuple[APIInvokerEnrolmentDetails, int], Tuple[APIInvokerEnrolmentDetails, int, Dict[str, str]]
    """
    current_app.logger.info("Updating invoker")
    if request.is_json:
        body = APIInvokerEnrolmentDetailsPatch.from_dict(request.get_json())  # noqa: E501

    res = invoker_operations.patch_apiinvokerenrolmentdetail(onboarding_id, body)

    return res
+27 −22
Original line number Diff line number Diff line
import connexion
from typing import Dict
from typing import Tuple
from typing import Union

from functools import wraps

from cryptography import x509
from cryptography.hazmat.backends import default_backend
from flask import current_app, request
from flask_jwt_extended import get_jwt_identity, jwt_required

from ..core.apiinvokerenrolmentdetails import InvokerManagementOperations
from ..core.validate_user import ControlAccess
from ..models.api_invoker_enrolment_details import APIInvokerEnrolmentDetails  # noqa: E501
from api_invoker_management.models.api_invoker_enrolment_details_patch import \
    APIInvokerEnrolmentDetailsPatch  # noqa: E501

invoker_operations = InvokerManagementOperations()
valid_user = ControlAccess()
@@ -38,12 +44,12 @@ def cert_validation():
    return _cert_validation

@cert_validation()
def onboarded_invokers_onboarding_id_delete(onboarding_id):  # noqa: E501
    """onboarded_invokers_onboarding_id_delete
def delete_ind_onboarded_api_invoker(onboarding_id):  # noqa: E501
    """Delete an existing Individual On-boarded API Invoker resource.

    Deletes an individual API Invoker. # noqa: E501
    Deletes an existing Individual On-boarded API Invoker. # noqa: E501

    :param onboarding_id: String identifying an individual on-boarded API invoker resource
    :param onboarding_id: 
    :type onboarding_id: str

    :rtype: Union[None, Tuple[None, int], Tuple[None, int, Dict[str, str]]
@@ -54,44 +60,43 @@ def onboarded_invokers_onboarding_id_delete(onboarding_id): # noqa: E501
    return res

@cert_validation()
def onboarded_invokers_onboarding_id_put(onboarding_id, body):  # noqa: E501
    """onboarded_invokers_onboarding_id_put
def modify_ind_api_invoke_enrolment(onboarding_id, body):  # noqa: E501
    """modify_ind_api_invoke_enrolment

    Updates an individual API invoker details. # noqa: E501
     # noqa: E501

    :param onboarding_id: String identifying an individual on-boarded API invoker resource
    :param onboarding_id: 
    :type onboarding_id: str
    :param api_invoker_enrolment_details: representation of the API invoker details to be updated in CAPIF core function
    :type api_invoker_enrolment_details: dict | bytes
    :param api_invoker_enrolment_details_patch: 
    :type api_invoker_enrolment_details_patch: dict | bytes

    :rtype: Union[APIInvokerEnrolmentDetails, Tuple[APIInvokerEnrolmentDetails, int], Tuple[APIInvokerEnrolmentDetails, int, Dict[str, str]]
    """
    current_app.logger.info("Updating invoker")
    if request.is_json:
        body = APIInvokerEnrolmentDetails.from_dict(request.get_json())  # noqa: E501
        body = APIInvokerEnrolmentDetailsPatch.from_dict(request.get_json())  # noqa: E501

    res = invoker_operations.update_apiinvokerenrolmentdetail(onboarding_id,body)
    res = invoker_operations.patch_apiinvokerenrolmentdetail(onboarding_id, body)

    return res

@jwt_required()
def onboarded_invokers_post(body):  # noqa: E501
    """onboarded_invokers_post
@cert_validation()
def update_ind_onboarded_api_invoker(onboarding_id, body):  # noqa: E501
    """Update an existing Individual On-boarded API Invoker resource.

    Creates a new individual API Invoker profile. # noqa: E501
     # noqa: E501

    :param onboarding_id: 
    :type onboarding_id: str
    :param api_invoker_enrolment_details: 
    :type api_invoker_enrolment_details: dict | bytes

    :rtype: Union[APIInvokerEnrolmentDetails, Tuple[APIInvokerEnrolmentDetails, int], Tuple[APIInvokerEnrolmentDetails, int, Dict[str, str]]
    """
    identity = get_jwt_identity()
    username, uuid = identity.split()

    current_app.logger.info("Creating Invoker")
    current_app.logger.info("Updating invoker")
    if request.is_json:
        body = APIInvokerEnrolmentDetails.from_dict(request.get_json())  # noqa: E501

    res = invoker_operations.add_apiinvokerenrolmentdetail(body, username, uuid)
    res = invoker_operations.update_apiinvokerenrolmentdetail(onboarding_id,body)

    return res
+37 −0
Original line number Diff line number Diff line
import connexion
from typing import Dict
from typing import Tuple
from typing import Union

from api_invoker_management.models.api_invoker_enrolment_details import APIInvokerEnrolmentDetails  # noqa: E501
from api_invoker_management.models.problem_details import ProblemDetails  # noqa: E501
from api_invoker_management import util

from ..core.apiinvokerenrolmentdetails import InvokerManagementOperations

from flask import current_app, request
from flask_jwt_extended import get_jwt_identity, jwt_required

invoker_operations = InvokerManagementOperations()

@jwt_required()
def create_onboarded_api_invoker(body):  # noqa: E501
    """Request the Creation of a new On-boarded API Invoker.

     # noqa: E501

    :param api_invoker_enrolment_details: 
    :type api_invoker_enrolment_details: dict | bytes

    :rtype: Union[APIInvokerEnrolmentDetails, Tuple[APIInvokerEnrolmentDetails, int], Tuple[APIInvokerEnrolmentDetails, int, Dict[str, str]]
    """
    identity = get_jwt_identity()
    username, uuid = identity.split()

    current_app.logger.info("Creating Invoker")
    if request.is_json:
        body = APIInvokerEnrolmentDetails.from_dict(request.get_json())  # noqa: E501

    res = invoker_operations.add_apiinvokerenrolmentdetail(body, username, uuid)

    return res
+29 −0
Original line number Diff line number Diff line
from typing import List


def info_from_oAuth2ClientCredentials(token):
    """
    Validate and decode token.
    Returned value will be passed in 'token_info' parameter of your operation function, if there is one.
    'sub' or 'uid' will be set in 'user' parameter of your operation function, if there is one.
    'scope' or 'scopes' will be passed to scope validation function.

    :param token Token provided by Authorization header
    :type token: str
    :return: Decoded token information or None if token is invalid
    :rtype: dict | None
    """
    return {'scopes': ['read:pets', 'write:pets'], 'uid': 'user_id'}


def validate_scope_oAuth2ClientCredentials(required_scopes, token_scopes):
    """
    Validate required scopes are included in token scope

    :param required_scopes Required scope to access called API
    :type required_scopes: List[str]
    :param token_scopes Scope present in token
    :type token_scopes: List[str]
    :return: True if access to called API is allowed
    :rtype: bool
    """
    return set(required_scopes).issubset(set(token_scopes))
Loading