Skip to content
Snippets Groups Projects
Commit b055c5bb authored by Andres Anaya Amariels's avatar Andres Anaya Amariels :rocket:
Browse files

Merge remote-tracking branch 'origin/staging' into OCF44-helm-refactor

parents baa05607 fec556a6
No related branches found
No related tags found
2 merge requests!43Staging to Main for Release 1,!34Ocf44 helm refactor
Pipeline #6668 failed
Showing
with 415 additions and 158 deletions
......@@ -12,6 +12,7 @@ __pycache__/
*.crt
*.zip
*.srl
*.log
services/nginx/certs/sign_req_body.json
services/easy_rsa/certs/pki
services/easy_rsa/certs/*EasyRSA*
......@@ -35,4 +36,8 @@ docs/testing_with_postman/package-lock.json
results
helm/capif/*.lock
helm/capif/charts/tempo*
\ No newline at end of file
<<<<<<< HEAD
helm/capif/charts
=======
helm/capif/charts/tempo*
>>>>>>> staging
......@@ -15,7 +15,6 @@ from ..core.publisher import Publisher
from functools import wraps
invoker_operations = InvokerManagementOperations()
publisher_ops = Publisher()
valid_user = ControlAccess()
......@@ -59,11 +58,6 @@ def onboarded_invokers_onboarding_id_delete(onboarding_id): # noqa: E501
current_app.logger.info("Removing invoker")
res = invoker_operations.remove_apiinvokerenrolmentdetail(onboarding_id)
if res.status_code == 204:
current_app.logger.info("Invoker Removed")
publisher_ops.publish_message("events", "API_INVOKER_OFFBOARDED")
publisher_ops.publish_message("internal-messages", f"invoker-removed:{onboarding_id}")
return res
@cert_validation()
......@@ -84,10 +78,6 @@ def onboarded_invokers_onboarding_id_put(onboarding_id, body): # noqa: E501
body = APIInvokerEnrolmentDetails.from_dict(connexion.request.get_json()) # noqa: E501
res = invoker_operations.update_apiinvokerenrolmentdetail(onboarding_id,body)
if res.status_code == 200:
current_app.logger.info("Invoker Updated")
publisher_ops.publish_message("events", "API_INVOKER_UPDATED")
return res
......@@ -111,8 +101,5 @@ def onboarded_invokers_post(body): # noqa: E501
current_app.logger.info("Creating Invoker")
res = invoker_operations.add_apiinvokerenrolmentdetail(body, username, uuid)
if res.status_code == 201:
current_app.logger.info("Invoker Created")
publisher_ops.publish_message("events", "API_INVOKER_ONBOARDED")
return res
......@@ -11,9 +11,10 @@ from .auth_manager import AuthManager
from .resources import Resource
from ..config import Config
from api_invoker_management.models.api_invoker_enrolment_details import APIInvokerEnrolmentDetails
from .redis_event import RedisEvent
from .publisher import Publisher
publisher_ops = Publisher()
class InvokerManagementOperations(Resource):
def __check_api_invoker_id(self, api_invoker_id):
......@@ -93,6 +94,10 @@ class InvokerManagementOperations(Resource):
res = make_response(object=apiinvokerenrolmentdetail, status=201)
res.headers['Location'] = "/api-invoker-management/v1/onboardedInvokers/" + str(api_invoker_id)
if res.status_code == 201:
current_app.logger.info("Invoker Created")
RedisEvent("API_INVOKER_ONBOARDED", "apiInvokerIds", [str(api_invoker_id)]).send_event()
return res
# except Exception as e:
......@@ -130,6 +135,9 @@ class InvokerManagementOperations(Resource):
current_app.logger.debug("Invoker Resource inserted in database")
res = make_response(object=APIInvokerEnrolmentDetails().from_dict(dict_to_camel_case(result)), status=200)
if res.status_code == 200:
current_app.logger.info("Invoker Updated")
RedisEvent("API_INVOKER_UPDATED", "apiInvokerIds", [onboard_id]).send_event()
return res
except Exception as e:
......@@ -153,7 +161,12 @@ class InvokerManagementOperations(Resource):
current_app.logger.debug("Invoker resource removed from database")
current_app.logger.debug("Netapp offboarded sucessfuly")
out = "The Netapp matching onboardingId " + onboard_id + " was offboarded."
return make_response(out, status=204)
res = make_response(out, status=204)
if res.status_code == 204:
current_app.logger.info("Invoker Removed")
RedisEvent("API_INVOKER_OFFBOARDED", "apiInvokerIds", [onboard_id]).send_event()
publisher_ops.publish_message("internal-messages", f"invoker-removed:{onboard_id}")
return res
except Exception as e:
exception = "An exception occurred in remove invoker"
......
from ..encoder import JSONEncoder
from .publisher import Publisher
import json
publisher_ops = Publisher()
class RedisEvent():
def __init__(self, event, event_detail_key=None, information=None) -> None:
self.EVENTS_ENUM = [
'SERVICE_API_AVAILABLE',
'SERVICE_API_UNAVAILABLE',
'SERVICE_API_UPDATE',
'API_INVOKER_ONBOARDED',
'API_INVOKER_OFFBOARDED',
'SERVICE_API_INVOCATION_SUCCESS',
'SERVICE_API_INVOCATION_FAILURE',
'ACCESS_CONTROL_POLICY_UPDATE',
'ACCESS_CONTROL_POLICY_UNAVAILABLE',
'API_INVOKER_AUTHORIZATION_REVOKED',
'API_INVOKER_UPDATED',
'API_TOPOLOGY_HIDING_CREATED',
'API_TOPOLOGY_HIDING_REVOKED']
if event not in self.EVENTS_ENUM:
raise Exception(
"Event (" + event + ") is not on event enum (" + ','.join(self.EVENTS_ENUM) + ")")
self.redis_event = {
"event": event
}
if event_detail_key != None and information != None:
self.redis_event['key'] = event_detail_key
self.redis_event['information'] = information
def to_string(self):
return json.dumps(self.redis_event, cls=JSONEncoder)
def send_event(self):
publisher_ops.publish_message("events", self.to_string())
def __call__(self):
return self.redis_event
......@@ -93,7 +93,7 @@ class ProviderManagementOperations(Resource):
self.auth_manager.remove_auth_provider([apf_id[0], aef_id[0], amf_id[0]])
self.publish_ops.publish_message("internal-messages", f"provider-removed:{aef_id[0]}:{apf_id[0]}")
self.publish_ops.publish_message("internal-messages", f"provider-removed:{aef_id[0]}:{apf_id[0]}:{amf_id[0]}")
return make_response(object=out, status=204)
except Exception as e:
......
......@@ -20,8 +20,8 @@ class accessControlPolicyApi(Resource):
projection = {"_id":0}
if api_invoker_id is not None:
query['apiInvokerPolicies.api_invoker_id'] = api_invoker_id
projection['apiInvokerPolicies.$'] = 1
query['api_invoker_policies.api_invoker_id'] = api_invoker_id
projection['api_invoker_policies.$'] = 1
if supported_features is not None:
current_app.logger.debug(f"SupportedFeatures present on query with value {supported_features}, but currently not used")
......@@ -37,8 +37,8 @@ class accessControlPolicyApi(Resource):
current_app.logger.debug(policies)
api_invoker_policies = policies[0]['apiInvokerPolicies']
current_app.logger.debug(f"apiinvokerPolicies: {api_invoker_policies}")
api_invoker_policies = policies[0]['api_invoker_policies']
current_app.logger.debug(f"api_invoker_policies: {api_invoker_policies}")
if not api_invoker_policies:
current_app.logger.info(f"ACLs list is present but empty, then no ACLs found for the requested service: {service_api_id}, aef_id: {aef_id}, invoker: {api_invoker_id} and supportedFeatures: {supported_features}")
#Not found error
......
......@@ -4,82 +4,105 @@ from .resources import Resource
from ..models.api_invoker_policy import ApiInvokerPolicy
from ..models.time_range_list import TimeRangeList
from datetime import datetime, timedelta
from ..core.publisher import Publisher
from .redis_event import RedisEvent
from ..util import dict_to_camel_case, clean_empty
publisher_ops = Publisher()
class InternalServiceOps(Resource):
def create_acl(self, invoker_id, service_id, aef_id):
current_app.logger.info(f"Creating ACL for invoker: {invoker_id}")
if "acls" not in self.db.db.list_collection_names():
self.db.db.create_collection("acls")
mycol = self.db.get_col_by_name(self.db.acls)
res = mycol.find_one({"service_id": service_id, "aef_id":aef_id}, {"_id":0})
res = mycol.find_one(
{"service_id": service_id, "aef_id": aef_id}, {"_id": 0})
if res:
current_app.logger.info(f"Adding invoker ACL for invoker {invoker_id}")
range_list = [TimeRangeList(datetime.utcnow(), datetime.utcnow()+timedelta(days=365))]
invoker_acl = ApiInvokerPolicy(invoker_id, current_app.config["invocations"]["total"], current_app.config["invocations"]["perSecond"], range_list)
r = mycol.find_one({"service_id": service_id, "aef_id":aef_id, "apiInvokerPolicies.api_invoker_id": invoker_id}, {"_id":0})
current_app.logger.info(
f"Adding invoker ACL for invoker {invoker_id}")
range_list = [TimeRangeList(
datetime.utcnow(), datetime.utcnow()+timedelta(days=365))]
invoker_acl = ApiInvokerPolicy(
invoker_id, current_app.config["invocations"]["total"], current_app.config["invocations"]["perSecond"], range_list)
r = mycol.find_one({"service_id": service_id, "aef_id": aef_id,
"api_invoker_policies.api_invoker_id": invoker_id}, {"_id": 0})
if r is None:
mycol.update_one({"service_id": service_id, "aef_id":aef_id }, {"$push":{"apiInvokerPolicies":invoker_acl.to_dict()}})
mycol.update_one({"service_id": service_id, "aef_id": aef_id}, {
"$push": {"api_invoker_policies": invoker_acl.to_dict()}})
else:
current_app.logger.info(f"Creating service ACLs for service: {service_id}")
range_list = [TimeRangeList(datetime.utcnow(), datetime.utcnow()+timedelta(days=365))]
invoker_acl = ApiInvokerPolicy(invoker_id, current_app.config["invocations"]["total"], current_app.config["invocations"]["perSecond"], range_list)
current_app.logger.info(
f"Creating service ACLs for service: {service_id}")
range_list = [TimeRangeList(
datetime.utcnow(), datetime.utcnow()+timedelta(days=365))]
invoker_acl = ApiInvokerPolicy(
invoker_id, current_app.config["invocations"]["total"], current_app.config["invocations"]["perSecond"], range_list)
service_acls = {
"service_id": service_id,
"aef_id": aef_id,
"apiInvokerPolicies": [invoker_acl.to_dict()]
"api_invoker_policies": [invoker_acl.to_dict()]
}
mycol.insert_one(service_acls)
publisher_ops.publish_message("events", "ACCESS_CONTROL_POLICY_UPDATE")
current_app.logger.info(f"Invoker ACL added for invoker: {invoker_id} for service: {service_id}")
result = mycol.insert_one(service_acls)
inserted_service_acls=mycol.find_one({"_id": result.inserted_id}, {"_id": 0})
current_app.logger.info(inserted_service_acls)
inserted_service_acls_camel=dict_to_camel_case(inserted_service_acls)
current_app.logger.info(inserted_service_acls_camel)
accCtrlPolListExt = {
"apiId": service_id,
"apiInvokerPolicies": inserted_service_acls_camel['apiInvokerPolicies']
}
RedisEvent("ACCESS_CONTROL_POLICY_UPDATE",
"accCtrlPolList", accCtrlPolListExt).send_event()
current_app.logger.info(
f"Invoker ACL added for invoker: {invoker_id} for service: {service_id}")
def remove_acl(self, invoker_id, service_id, aef_id):
current_app.logger.info(f"Removing ACL for invoker: {invoker_id}")
mycol = self.db.get_col_by_name(self.db.acls)
res = mycol.find_one({"service_id": service_id, "aef_id":aef_id}, {"_id":0})
res = mycol.find_one(
{"service_id": service_id, "aef_id": aef_id}, {"_id": 0})
if res:
mycol.update_many({"service_id": service_id, "aef_id":aef_id},
{"$pull":{ "apiInvokerPolicies": { "api_invoker_id": invoker_id }}}
)
mycol.update_many({"service_id": service_id, "aef_id": aef_id},
{"$pull": {"api_invoker_policies": {
"api_invoker_id": invoker_id}}}
)
else:
current_app.logger.info(f"Not found: {service_id} for api : {service_id}")
publisher_ops.publish_message("events", "ACCESS_CONTROL_POLICY_UNAVAILABLE")
current_app.logger.info(f"Invoker ACL removed for invoker: {invoker_id} for service: {service_id}")
current_app.logger.info(
f"Not found: {service_id} for api : {service_id}")
RedisEvent("ACCESS_CONTROL_POLICY_UNAVAILABLE").send_event()
current_app.logger.info(
f"Invoker ACL removed for invoker: {invoker_id} for service: {service_id}")
def remove_invoker_acl(self, invoker_id):
current_app.logger.info(f"Removing ACLs for invoker: {invoker_id}")
mycol = self.db.get_col_by_name(self.db.acls)
mycol.update_many({"apiInvokerPolicies.api_invoker_id": invoker_id},
{"$pull":{ "apiInvokerPolicies": { "api_invoker_id": invoker_id }}}
)
publisher_ops.publish_message("events", "ACCESS_CONTROL_POLICY_UNAVAILABLE")
mycol.update_many({"api_invoker_policies.api_invoker_id": invoker_id},
{"$pull": {"api_invoker_policies": {
"api_invoker_id": invoker_id}}}
)
RedisEvent("ACCESS_CONTROL_POLICY_UNAVAILABLE").send_event()
current_app.logger.info(f"ACLs for invoker: {invoker_id} removed")
def remove_provider_acls(self, id):
current_app.logger.info(f"Removing ACLs for provider/service: {id}")
mycol = self.db.get_col_by_name(self.db.acls)
mycol.delete_many({"$or":[{"service_id":id}, {"aef_id":id}]}
)
publisher_ops.publish_message("events", "ACCESS_CONTROL_POLICY_UNAVAILABLE")
current_app.logger.info(f"ACLs for provider/service: {id} removed")
\ No newline at end of file
mycol.delete_many({"$or": [{"service_id": id}, {"aef_id": id}]})
RedisEvent("ACCESS_CONTROL_POLICY_UNAVAILABLE").send_event()
current_app.logger.info(f"ACLs for provider/service: {id} removed")
from ..encoder import JSONEncoder
from .publisher import Publisher
import json
publisher_ops = Publisher()
class RedisEvent():
def __init__(self, event, event_detail_key=None, information=None) -> None:
self.EVENTS_ENUM = [
'SERVICE_API_AVAILABLE',
'SERVICE_API_UNAVAILABLE',
'SERVICE_API_UPDATE',
'API_INVOKER_ONBOARDED',
'API_INVOKER_OFFBOARDED',
'SERVICE_API_INVOCATION_SUCCESS',
'SERVICE_API_INVOCATION_FAILURE',
'ACCESS_CONTROL_POLICY_UPDATE',
'ACCESS_CONTROL_POLICY_UNAVAILABLE',
'API_INVOKER_AUTHORIZATION_REVOKED',
'API_INVOKER_UPDATED',
'API_TOPOLOGY_HIDING_CREATED',
'API_TOPOLOGY_HIDING_REVOKED']
if event not in self.EVENTS_ENUM:
raise Exception(
"Event (" + event + ") is not on event enum (" + ','.join(self.EVENTS_ENUM) + ")")
self.redis_event = {
"event": event
}
if event_detail_key != None and information != None:
self.redis_event['key'] = event_detail_key
self.redis_event['information'] = information
def to_string(self):
return json.dumps(self.redis_event, cls=JSONEncoder)
def send_event(self):
publisher_ops.publish_message("events", self.to_string())
def __call__(self):
return self.redis_event
......@@ -20,14 +20,18 @@ class Subscriber():
def listen(self):
for raw_message in self.p.listen():
current_app.logger.info(raw_message)
if raw_message["type"] == "message" and raw_message["channel"].decode('utf-8') == "events":
current_app.logger.info("Event received")
self.notification.send_notifications(raw_message["data"].decode('utf-8'))
redis_event=json.loads(raw_message["data"].decode('utf-8'))
current_app.logger.info(json.dumps(redis_event, indent=4))
self.notification.send_notifications(redis_event)
elif raw_message["type"] == "message" and raw_message["channel"].decode('utf-8') == "internal-messages":
message, *invoker_id = raw_message["data"].decode('utf-8').split(":")
if message == "invoker-removed" and len(invoker_id)>0:
self.event_ops.delete_all_events(invoker_id[0])
message, *subscriber_ids = raw_message["data"].decode('utf-8').split(":")
if message == "invoker-removed" and len(subscriber_ids)>0:
self.event_ops.delete_all_events(subscriber_ids)
if message == "provider-removed" and len(subscriber_ids)>0:
self.event_ops.delete_all_events(subscriber_ids)
......@@ -8,22 +8,23 @@ class InternalEventOperations(Resource):
Resource.__init__(self)
self.auth_manager = AuthManager()
def delete_all_events(self, subscriber_id):
def delete_all_events(self, subscriber_ids):
mycol = self.db.get_col_by_name(self.db.event_collection)
my_query = {'subscriber_id': subscriber_id}
mycol.delete_many(my_query)
for subscriber_id in subscriber_ids:
mycol = self.db.get_col_by_name(self.db.event_collection)
my_query = {'subscriber_id': subscriber_id}
mycol.delete_many(my_query)
current_app.logger.info(f"Removed events for this subscriber: {subscriber_id}")
current_app.logger.info(f"Removed events for this subscriber: {subscriber_id}")
#We dont need remove all auth events, becase when invoker is removed, remove auth entry
#self.auth_manager.remove_auth_all_event(subscriber_id)
def get_event_subscriptions(self, event):
current_app.logger.info("get subscription from db")
try:
mycol = self.db.get_col_by_name(self.db.event_collection)
query= {'events':event}
query={'events':{'$in':[event]}}
subscriptions = mycol.find(query)
if subscriptions is None:
......@@ -39,21 +40,3 @@ class InternalEventOperations(Resource):
except Exception as e:
current_app.logger.error("An exception occurred ::" + str(e))
return False
# def get_acls(self, service_id):
# try:
# mycol = self.db.get_col_by_name(self.db.acls_col)
# query= {'api_id': service_id}
# acls = mycol.find(query)
# if acls is None:
# current_app.logger.error("Not found event subscriptions")
# else:
# return acls
# except Exception as e:
# current_app.logger.error("An exception occurred ::" + str(e))
# return False
\ No newline at end of file
......@@ -8,30 +8,34 @@ from ..encoder import JSONEncoder
import sys
import json
from flask import current_app
import asyncio
import aiohttp
class Notifications():
def __init__(self):
self.events_ops = InternalEventOperations()
def send_notifications(self, event):
current_app.logger.info("Received event, sending notifications")
subscriptions = self.events_ops.get_event_subscriptions(event)
# message, *ids = event.split(":")
def send_notifications(self, redis_event):
try:
if redis_event.get('event', None) == None:
raise("Event value is not present on received event from REDIS")
current_app.logger.info("Received event " + redis_event.get('event') + ", sending notifications")
subscriptions = self.events_ops.get_event_subscriptions(redis_event.get('event'))
current_app.logger.info(subscriptions)
for sub in subscriptions:
url = sub["notification_destination"]
data = EventNotification(sub["subscription_id"], events=event)
# details = CAPIFEventDetail()
# if message == "ACCESS_CONTROL_POLICY_UPDATE":
# current_app.logger.info("event: ACCESS_CONTROL_POLICY_UPDATE")
# acls = self.events_ops.get_acls(ids[0])
# details.acc_ctrl_pol_list = AccessControlPolicyListExt(api_id=acls['service_id'], api_invoker_policies=acls['apiInvokerPolicies'])
current_app.logger.debug(url)
event_detail=None
if redis_event.get('key', None) != None and redis_event.get('information', None) != None:
event_detail={redis_event.get('key'):redis_event.get('information')}
current_app.logger.debug(event_detail)
data = EventNotification(sub["subscription_id"], events=redis_event.get('event'), event_detail=event_detail)
current_app.logger.debug(json.dumps(data,cls=JSONEncoder))
# data.event_detail=details
self.request_post(url, data)
#current_app.logger.info("notification sended")
asyncio.run(self.send(url, json.loads(json.dumps(data,cls=JSONEncoder))))
except Exception as e:
current_app.logger.error("An exception occurred ::" + str(e))
......@@ -40,4 +44,20 @@ class Notifications():
def request_post(self, url, data):
headers = {'content-type': 'application/json'}
return requests.post(url, json={'text': str(data.to_str())}, headers=headers)
async def send_request(self, url, data):
async with aiohttp.ClientSession() as session:
timeout = aiohttp.ClientTimeout(total=10) # Establecer timeout a 10 segundos
headers = {'content-type': 'application/json'}
async with session.post(url, json=data, timeout=timeout, headers=headers) as response:
return await response.text()
async def send(self, url, data):
try:
response = await self.send_request(url, data)
current_app.logger.debug(response)
except asyncio.TimeoutError:
current_app.logger.error("Timeout: Request timeout")
except Exception as e:
current_app.logger.error("An exception occurred sending notification::" + str(e))
return False
......@@ -21,3 +21,5 @@ rfc3987
redis
flask_executor
Flask-APScheduler
aiohttp==3.9.5
async-timeout==4.0.3
......@@ -13,6 +13,11 @@ from ..util import dict_to_camel_case, clean_empty
from .resources import Resource
from .responses import bad_request_error, internal_server_error, forbidden_error, not_found_error, unauthorized_error, make_response
from ..models.invocation_log import InvocationLog
# from .publisher import Publisher
from .redis_event import RedisEvent
import copy
# publisher_ops = Publisher()
class LoggingInvocationOperations(Resource):
......@@ -61,7 +66,7 @@ class LoggingInvocationOperations(Resource):
return None
def add_invocationlog(self, aef_id, invocationlog):
mycol = self.db.get_col_by_name(self.db.invocation_logs)
try:
......@@ -79,11 +84,29 @@ class LoggingInvocationOperations(Resource):
return result
current_app.logger.debug("Check service apis")
event=None
invocation_log_base=json.loads(json.dumps(invocationlog, cls=JSONEncoder))
for log in invocationlog.logs:
result = self.__check_service_apis(log.api_id, log.api_name)
current_app.logger.debug("Inside for loop.")
if result is not None:
return result
if log.result:
current_app.logger.debug(log)
if int(log.result) >= 200 and int(log.result) < 300:
event="SERVICE_API_INVOCATION_SUCCESS"
else:
event="SERVICE_API_INVOCATION_FAILURE"
current_app.logger.info(event)
invocation_log_base['logs']=[log]
invocationLogs=[invocation_log_base]
RedisEvent(event,"invocationLogs",invocationLogs).send_event()
current_app.logger.debug("After log check")
current_app.logger.debug("Check existing logs")
my_query = {'aef_id': aef_id, 'api_invoker_id': invocationlog.api_invoker_id}
......
import redis
import sys
from flask import current_app
class Publisher():
def __init__(self):
self.r = redis.Redis(host='redis', port=6379, db=0)
def publish_message(self, channel, message):
self.r.publish(channel, message)
from ..encoder import JSONEncoder
from .publisher import Publisher
import json
publisher_ops = Publisher()
class RedisEvent():
def __init__(self, event, event_detail_key=None, information=None) -> None:
self.EVENTS_ENUM = [
'SERVICE_API_AVAILABLE',
'SERVICE_API_UNAVAILABLE',
'SERVICE_API_UPDATE',
'API_INVOKER_ONBOARDED',
'API_INVOKER_OFFBOARDED',
'SERVICE_API_INVOCATION_SUCCESS',
'SERVICE_API_INVOCATION_FAILURE',
'ACCESS_CONTROL_POLICY_UPDATE',
'ACCESS_CONTROL_POLICY_UNAVAILABLE',
'API_INVOKER_AUTHORIZATION_REVOKED',
'API_INVOKER_UPDATED',
'API_TOPOLOGY_HIDING_CREATED',
'API_TOPOLOGY_HIDING_REVOKED']
if event not in self.EVENTS_ENUM:
raise Exception(
"Event (" + event + ") is not on event enum (" + ','.join(self.EVENTS_ENUM) + ")")
self.redis_event = {
"event": event
}
if event_detail_key != None and information != None:
self.redis_event['key'] = event_detail_key
self.redis_event['information'] = information
def to_string(self):
return json.dumps(self.redis_event, cls=JSONEncoder)
def send_event(self):
publisher_ops.publish_message("events", self.to_string())
def __call__(self):
return self.redis_event
......@@ -6,6 +6,7 @@ Flask == 2.0.3
pymongo == 4.0.1
elasticsearch == 8.4.3
flask_jwt_extended == 4.4.4
redis == 4.5.4
opentelemetry-instrumentation == 0.38b0
opentelemetry-instrumentation-flask == 0.38b0
opentelemetry-instrumentation-redis == 0.38b0
......
......@@ -4,7 +4,6 @@ from ..core import serviceapidescriptions
from ..core.serviceapidescriptions import PublishServiceOperations
from ..core.publisher import Publisher
import json
from flask import Response, request, current_app
from flask_jwt_extended import jwt_required, get_jwt_identity
from flask import current_app
......@@ -14,14 +13,12 @@ from cryptography import x509
from cryptography.hazmat.backends import default_backend
from ..core.validate_user import ControlAccess
from functools import wraps
import pymongo
service_operations = PublishServiceOperations()
publisher_ops = Publisher()
valid_user = ControlAccess()
def cert_validation():
def _cert_validation(f):
@wraps(f)
......@@ -31,13 +28,16 @@ def cert_validation():
cert_tmp = request.headers['X-Ssl-Client-Cert']
cert_raw = cert_tmp.replace('\t', '')
cert = x509.load_pem_x509_certificate(str.encode(cert_raw), default_backend())
cert = x509.load_pem_x509_certificate(
str.encode(cert_raw), default_backend())
cn = cert.subject.get_attributes_for_oid(x509.OID_COMMON_NAME)[0].value.strip()
cn = cert.subject.get_attributes_for_oid(
x509.OID_COMMON_NAME)[0].value.strip()
if cn != "superadmin":
cert_signature = cert.signature.hex()
result = valid_user.validate_user_cert(args["apfId"], args["serviceApiId"], cert_signature)
result = valid_user.validate_user_cert(
args["apfId"], args["serviceApiId"], cert_signature)
if result is not None:
return result
......@@ -47,6 +47,7 @@ def cert_validation():
return __cert_validation
return _cert_validation
def apf_id_service_apis_get(apf_id): # noqa: E501
"""apf_id_service_apis_get
......@@ -82,12 +83,9 @@ def apf_id_service_apis_post(apf_id, body): # noqa: E501
res = service_operations.add_serviceapidescription(apf_id, body)
if res.status_code == 201:
current_app.logger.info("Service published")
publisher_ops.publish_message("events", "SERVICE_API_AVAILABLE")
return res
@cert_validation()
def apf_id_service_apis_service_api_id_delete(service_api_id, apf_id): # noqa: E501
"""apf_id_service_apis_service_api_id_delete
......@@ -103,15 +101,12 @@ def apf_id_service_apis_service_api_id_delete(service_api_id, apf_id): # noqa:
"""
current_app.logger.info("Removing service published")
res = service_operations.delete_serviceapidescription(service_api_id, apf_id)
if res.status_code == 204:
current_app.logger.info("Removed service published")
publisher_ops.publish_message("events", "SERVICE_API_UNAVAILABLE")
publisher_ops.publish_message("internal-messages", f"service-removed:{service_api_id}")
res = service_operations.delete_serviceapidescription(
service_api_id, apf_id)
return res
@cert_validation()
def apf_id_service_apis_service_api_id_get(service_api_id, apf_id): # noqa: E501
"""apf_id_service_apis_service_api_id_get
......@@ -131,6 +126,7 @@ def apf_id_service_apis_service_api_id_get(service_api_id, apf_id): # noqa: E50
return res
@cert_validation()
def apf_id_service_apis_service_api_id_put(service_api_id, apf_id, body): # noqa: E501
"""apf_id_service_apis_service_api_id_put
......@@ -147,14 +143,13 @@ def apf_id_service_apis_service_api_id_put(service_api_id, apf_id, body): # noq
:rtype: ServiceAPIDescription
"""
current_app.logger.info("Updating service api id with id: " + service_api_id)
current_app.logger.info(
"Updating service api id with id: " + service_api_id)
if connexion.request.is_json:
body = ServiceAPIDescription.from_dict(connexion.request.get_json()) # noqa: E501
response = service_operations.update_serviceapidescription(service_api_id, apf_id, body)
if response.status_code == 200:
publisher_ops.publish_message("events", "SERVICE_API_UPDATE")
response = service_operations.update_serviceapidescription(
service_api_id, apf_id, body)
return response
......@@ -21,7 +21,7 @@ class Subscriber():
for raw_message in self.p.listen():
if raw_message["type"] == "message" and raw_message["channel"].decode('utf-8') == "internal-messages":
message, *ids = raw_message["data"].decode('utf-8').split(":")
if message == "provider-removed" and len(ids)==2:
if message == "provider-removed" and len(ids) > 0:
self.security_ops.delete_intern_service(ids[1])
......
from ..encoder import JSONEncoder
from .publisher import Publisher
import json
publisher_ops = Publisher()
class RedisEvent():
def __init__(self, event, event_detail_key=None, information=None) -> None:
self.EVENTS_ENUM = [
'SERVICE_API_AVAILABLE',
'SERVICE_API_UNAVAILABLE',
'SERVICE_API_UPDATE',
'API_INVOKER_ONBOARDED',
'API_INVOKER_OFFBOARDED',
'SERVICE_API_INVOCATION_SUCCESS',
'SERVICE_API_INVOCATION_FAILURE',
'ACCESS_CONTROL_POLICY_UPDATE',
'ACCESS_CONTROL_POLICY_UNAVAILABLE',
'API_INVOKER_AUTHORIZATION_REVOKED',
'API_INVOKER_UPDATED',
'API_TOPOLOGY_HIDING_CREATED',
'API_TOPOLOGY_HIDING_REVOKED']
if event not in self.EVENTS_ENUM:
raise Exception(
"Event (" + event + ") is not on event enum (" + ','.join(self.EVENTS_ENUM) + ")")
self.redis_event = {
"event": event
}
if event_detail_key != None and information != None:
self.redis_event['key'] = event_detail_key
self.redis_event['information'] = information
def to_string(self):
return json.dumps(self.redis_event, cls=JSONEncoder)
def send_event(self):
publisher_ops.publish_message("events", self.to_string())
def __call__(self):
return self.redis_event
......@@ -17,26 +17,33 @@ from ..util import dict_to_camel_case, clean_empty
from .responses import bad_request_error, internal_server_error, forbidden_error, not_found_error, unauthorized_error, make_response
from bson import json_util
from .auth_manager import AuthManager
from .redis_event import RedisEvent
from .publisher import Publisher
publisher_ops = Publisher()
service_api_not_found_message = "Service API not found"
class PublishServiceOperations(Resource):
def __check_apf(self, apf_id):
providers_col = self.db.get_col_by_name(self.db.capif_provider_col)
current_app.logger.debug("Checking apf id")
provider = providers_col.find_one({"api_prov_funcs.api_prov_func_id": apf_id})
provider = providers_col.find_one(
{"api_prov_funcs.api_prov_func_id": apf_id})
if provider is None:
current_app.logger.error("Publisher not exist")
return unauthorized_error(detail = "Publisher not existing", cause = "Publisher id not found")
return unauthorized_error(detail="Publisher not existing", cause="Publisher id not found")
list_apf_ids = [func["api_prov_func_id"] for func in provider["api_prov_funcs"] if func["api_prov_func_role"] == "APF"]
list_apf_ids = [func["api_prov_func_id"]
for func in provider["api_prov_funcs"] if func["api_prov_func_role"] == "APF"]
if apf_id not in list_apf_ids:
current_app.logger.debug("This id not belongs to APF")
return unauthorized_error(detail ="You are not a publisher", cause ="This API is only available for publishers")
return unauthorized_error(detail="You are not a publisher", cause="This API is only available for publishers")
return None
......@@ -57,7 +64,8 @@ class PublishServiceOperations(Resource):
if result != None:
return result
service = mycol.find({"apf_id": apf_id}, {"_id":0, "api_name":1, "api_id":1, "aef_profiles":1, "description":1, "supported_features":1, "shareable_info":1, "service_api_category":1, "api_supp_feats":1, "pub_api_path":1, "ccf_id":1})
service = mycol.find({"apf_id": apf_id}, {"_id": 0, "api_name": 1, "api_id": 1, "aef_profiles": 1, "description": 1,
"supported_features": 1, "shareable_info": 1, "service_api_category": 1, "api_supp_feats": 1, "pub_api_path": 1, "ccf_id": 1})
current_app.logger.debug(service)
if service is None:
current_app.logger.error("Not found services for this apf id")
......@@ -92,9 +100,11 @@ class PublishServiceOperations(Resource):
if result != None:
return result
service = mycol.find_one({"api_name": serviceapidescription.api_name})
service = mycol.find_one(
{"api_name": serviceapidescription.api_name})
if service is not None:
current_app.logger.error("Service already registered with same api name")
current_app.logger.error(
"Service already registered with same api name")
return forbidden_error(detail="Already registered service with same api name", cause="Found service with same api name")
api_id = secrets.token_hex(15)
......@@ -110,8 +120,13 @@ class PublishServiceOperations(Resource):
current_app.logger.debug("Service inserted in database")
res = make_response(object=serviceapidescription, status=201)
res.headers['Location'] = "http://localhost:8080/published-apis/v1/" + str(apf_id) + "/service-apis/" + str(api_id)
res.headers['Location'] = "http://localhost:8080/published-apis/v1/" + \
str(apf_id) + "/service-apis/" + str(api_id)
if res.status_code == 201:
current_app.logger.info("Service published")
RedisEvent("SERVICE_API_AVAILABLE", "apiIds",
[str(api_id)]).send_event()
return res
except Exception as e:
......@@ -119,26 +134,25 @@ class PublishServiceOperations(Resource):
current_app.logger.error(exception + "::" + str(e))
return internal_server_error(detail=exception, cause=str(e))
def get_one_serviceapi(self, service_api_id, apf_id):
mycol = self.db.get_col_by_name(self.db.service_api_descriptions)
try:
current_app.logger.debug("Geting service api with id: " + service_api_id)
current_app.logger.debug(
"Geting service api with id: " + service_api_id)
result = self.__check_apf(apf_id)
if result != None:
return result
my_query = {'apf_id': apf_id, 'api_id': service_api_id}
service_api = mycol.find_one(my_query, {"_id":0, "api_name":1, "api_id":1, "aef_profiles":1, "description":1, "supported_features":1, "shareable_info":1, "service_api_category":1, "api_supp_feats":1, "pub_api_path":1, "ccf_id":1})
service_api = mycol.find_one(my_query, {"_id": 0, "api_name": 1, "api_id": 1, "aef_profiles": 1, "description": 1,
"supported_features": 1, "shareable_info": 1, "service_api_category": 1, "api_supp_feats": 1, "pub_api_path": 1, "ccf_id": 1})
if service_api is None:
current_app.logger.error(service_api_not_found_message)
return not_found_error(detail=service_api_not_found_message, cause="No Service with specific credentials exists")
my_service_api = dict_to_camel_case(service_api)
my_service_api = clean_empty(my_service_api)
......@@ -157,7 +171,8 @@ class PublishServiceOperations(Resource):
try:
current_app.logger.debug("Removing api service with id: " + service_api_id)
current_app.logger.debug(
"Removing api service with id: " + service_api_id)
result = self.__check_apf(apf_id)
if result != None:
......@@ -175,22 +190,29 @@ class PublishServiceOperations(Resource):
self.auth_manager.remove_auth_service(service_api_id, apf_id)
current_app.logger.debug("Removed service from database")
out = "The service matching api_id " + service_api_id + " was deleted."
return make_response(out, status=204)
out = "The service matching api_id " + service_api_id + " was deleted."
res = make_response(out, status=204)
if res.status_code == 204:
current_app.logger.info("Removed service published")
RedisEvent("SERVICE_API_UNAVAILABLE", "apiIds",
[service_api_id]).send_event()
publisher_ops.publish_message(
"internal-messages", f"service-removed:{service_api_id}")
return res
except Exception as e:
exception = "An exception occurred in delete service"
current_app.logger.error(exception + "::" + str(e))
return internal_server_error(detail=exception, cause=str(e))
def update_serviceapidescription(self, service_api_id, apf_id, service_api_description):
mycol = self.db.get_col_by_name(self.db.service_api_descriptions)
try:
current_app.logger.debug("Updating service api with id: " + service_api_id)
current_app.logger.debug(
"Updating service api with id: " + service_api_id)
result = self.__check_apf(apf_id)
......@@ -207,18 +229,22 @@ class PublishServiceOperations(Resource):
service_api_description = service_api_description.to_dict()
service_api_description = clean_empty(service_api_description)
result = mycol.find_one_and_update(serviceapidescription, {"$set":service_api_description}, projection={"_id":0, "api_name":1, "api_id":1, "aef_profiles":1, "description":1, "supported_features":1, "shareable_info":1, "service_api_category":1, "api_supp_feats":1, "pub_api_path":1, "ccf_id":1},return_document=ReturnDocument.AFTER ,upsert=False)
result = mycol.find_one_and_update(serviceapidescription, {"$set": service_api_description}, projection={"_id": 0, "api_name": 1, "api_id": 1, "aef_profiles": 1, "description": 1,
"supported_features": 1, "shareable_info": 1, "service_api_category": 1, "api_supp_feats": 1, "pub_api_path": 1, "ccf_id": 1}, return_document=ReturnDocument.AFTER, upsert=False)
result = clean_empty(result)
current_app.logger.debug("Updated service api")
response = make_response(object=dict_to_camel_case(result), status=200)
service_api_description_updated = dict_to_camel_case(result)
response = make_response(
object=service_api_description_updated, status=200)
if response.status_code == 200:
RedisEvent("SERVICE_API_UPDATE", "serviceAPIDescriptions", [
service_api_description_updated]).send_event()
return response
except Exception as e:
exception = "An exception occurred in update service"
current_app.logger.error(exception + "::" + str(e))
return internal_server_error(detail=exception, cause=str(e))
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment