Loading services/TS29222_CAPIF_Discover_Service_API/config.yaml +1 −0 Original line number Diff line number Diff line Loading @@ -5,6 +5,7 @@ mongo: { 'col': 'serviceapidescriptions', 'invokers_col': 'invokerdetails', 'capif_users_col': "user", 'certs_col': "certs", 'host': 'mongo', 'port': "27017" } Loading services/TS29222_CAPIF_Discover_Service_API/service_apis/controllers/default_controller.py +32 −1 Original line number Diff line number Diff line import json from functools import wraps from cryptography import x509 from cryptography.hazmat.backends import default_backend from flask import current_app, request from service_apis.models.discovered_apis import DiscoveredAPIs # noqa: E501 from ..core.discoveredapis import DiscoverApisOperations, return_negotiated_supp_feat_dict from ..core.validate_user import ControlAccess discover_apis = DiscoverApisOperations() valid_user = ControlAccess() def cert_validation(): def _cert_validation(f): @wraps(f) def __cert_validation(*args, **kwargs): args = request.view_args cert_tmp = request.headers['X-Ssl-Client-Cert'] cert_raw = cert_tmp.replace('\t', '') cert = x509.load_pem_x509_certificate(str.encode(cert_raw), default_backend()) cn = cert.subject.get_attributes_for_oid(x509.OID_COMMON_NAME)[0].value.strip() if cn != "superadmin": cert_signature = cert.signature.hex() current_app.logger.debug(request.args) result = valid_user.validate_user_cert(request.args["api-invoker-id"], cert_signature) if result is not None: return result result = f(**kwargs) return result return __cert_validation return _cert_validation @cert_validation() def all_service_apis_get(api_invoker_id, api_name=None, api_version=None, comm_type=None, protocol=None, aef_id=None, data_format=None, api_cat=None, preferred_aef_loc=None, req_api_prov_name=None, supported_features=None, api_supported_features=None, ue_ip_addr=None, service_kpis=None, grant_types=None): # noqa: E501 """all_service_apis_get Loading services/TS29222_CAPIF_Discover_Service_API/service_apis/core/validate_user.py 0 → 100644 +42 −0 Original line number Diff line number Diff line import json from flask import Response, current_app from ..encoder import CustomJSONEncoder from ..models.problem_details import ProblemDetails from ..util import serialize_clean_camel_case from .resources import Resource from .responses import internal_server_error class ControlAccess(Resource): def validate_user_cert(self, api_invoker_id, cert_signature): cert_col = self.db.get_col_by_name(self.db.certs_col) try: my_query = {'invoker_id':api_invoker_id} cert_entry = cert_col.find_one(my_query) current_app.logger.debug("*****************") current_app.logger.debug(cert_entry) current_app.logger.debug("*****************") my_query = {'id': api_invoker_id} cert_entry = cert_col.find_one(my_query) current_app.logger.debug("*****************") current_app.logger.debug(cert_entry) current_app.logger.debug("*****************") if cert_entry is not None: if cert_entry["cert_signature"] != cert_signature: prob = ProblemDetails(title="Unauthorized", detail="User not authorized", cause="You are not the owner of this resource") prob = serialize_clean_camel_case(prob) return Response(json.dumps(prob, cls=CustomJSONEncoder), status=401, mimetype="application/json") except Exception as e: exception = "An exception occurred in validate invoker" current_app.logger.error(exception + "::" + str(e)) return internal_server_error(detail=exception, cause=str(e)) No newline at end of file services/TS29222_CAPIF_Discover_Service_API/service_apis/db/db.py +1 −0 Original line number Diff line number Diff line Loading @@ -20,6 +20,7 @@ class MongoDatabse(): self.invoker_col = self.config['mongo']['invokers_col'] self.service_api_descriptions = self.config['mongo']['col'] self.capif_users = self.config['mongo']['capif_users_col'] self.certs_col = self.config['mongo']['certs_col'] def get_col_by_name(self, name): Loading Loading
services/TS29222_CAPIF_Discover_Service_API/config.yaml +1 −0 Original line number Diff line number Diff line Loading @@ -5,6 +5,7 @@ mongo: { 'col': 'serviceapidescriptions', 'invokers_col': 'invokerdetails', 'capif_users_col': "user", 'certs_col': "certs", 'host': 'mongo', 'port': "27017" } Loading
services/TS29222_CAPIF_Discover_Service_API/service_apis/controllers/default_controller.py +32 −1 Original line number Diff line number Diff line import json from functools import wraps from cryptography import x509 from cryptography.hazmat.backends import default_backend from flask import current_app, request from service_apis.models.discovered_apis import DiscoveredAPIs # noqa: E501 from ..core.discoveredapis import DiscoverApisOperations, return_negotiated_supp_feat_dict from ..core.validate_user import ControlAccess discover_apis = DiscoverApisOperations() valid_user = ControlAccess() def cert_validation(): def _cert_validation(f): @wraps(f) def __cert_validation(*args, **kwargs): args = request.view_args cert_tmp = request.headers['X-Ssl-Client-Cert'] cert_raw = cert_tmp.replace('\t', '') cert = x509.load_pem_x509_certificate(str.encode(cert_raw), default_backend()) cn = cert.subject.get_attributes_for_oid(x509.OID_COMMON_NAME)[0].value.strip() if cn != "superadmin": cert_signature = cert.signature.hex() current_app.logger.debug(request.args) result = valid_user.validate_user_cert(request.args["api-invoker-id"], cert_signature) if result is not None: return result result = f(**kwargs) return result return __cert_validation return _cert_validation @cert_validation() def all_service_apis_get(api_invoker_id, api_name=None, api_version=None, comm_type=None, protocol=None, aef_id=None, data_format=None, api_cat=None, preferred_aef_loc=None, req_api_prov_name=None, supported_features=None, api_supported_features=None, ue_ip_addr=None, service_kpis=None, grant_types=None): # noqa: E501 """all_service_apis_get Loading
services/TS29222_CAPIF_Discover_Service_API/service_apis/core/validate_user.py 0 → 100644 +42 −0 Original line number Diff line number Diff line import json from flask import Response, current_app from ..encoder import CustomJSONEncoder from ..models.problem_details import ProblemDetails from ..util import serialize_clean_camel_case from .resources import Resource from .responses import internal_server_error class ControlAccess(Resource): def validate_user_cert(self, api_invoker_id, cert_signature): cert_col = self.db.get_col_by_name(self.db.certs_col) try: my_query = {'invoker_id':api_invoker_id} cert_entry = cert_col.find_one(my_query) current_app.logger.debug("*****************") current_app.logger.debug(cert_entry) current_app.logger.debug("*****************") my_query = {'id': api_invoker_id} cert_entry = cert_col.find_one(my_query) current_app.logger.debug("*****************") current_app.logger.debug(cert_entry) current_app.logger.debug("*****************") if cert_entry is not None: if cert_entry["cert_signature"] != cert_signature: prob = ProblemDetails(title="Unauthorized", detail="User not authorized", cause="You are not the owner of this resource") prob = serialize_clean_camel_case(prob) return Response(json.dumps(prob, cls=CustomJSONEncoder), status=401, mimetype="application/json") except Exception as e: exception = "An exception occurred in validate invoker" current_app.logger.error(exception + "::" + str(e)) return internal_server_error(detail=exception, cause=str(e)) No newline at end of file
services/TS29222_CAPIF_Discover_Service_API/service_apis/db/db.py +1 −0 Original line number Diff line number Diff line Loading @@ -20,6 +20,7 @@ class MongoDatabse(): self.invoker_col = self.config['mongo']['invokers_col'] self.service_api_descriptions = self.config['mongo']['col'] self.capif_users = self.config['mongo']['capif_users_col'] self.certs_col = self.config['mongo']['certs_col'] def get_col_by_name(self, name): Loading