Skip to content
Snippets Groups Projects
Commit 00a05a8f authored by Jorge Moratinos's avatar Jorge Moratinos
Browse files

New event redis with event details, also filter developed in Events service

parent 7a85ef8f
No related branches found
No related tags found
Loading
Pipeline #10491 failed
Showing
with 212 additions and 102 deletions
......@@ -107,8 +107,7 @@ class InvokerManagementOperations(Resource):
if res.status_code == 201:
current_app.logger.info("Invoker Created")
RedisEvent("API_INVOKER_ONBOARDED",
["apiInvokerIds"],
[[str(api_invoker_id)]]).send_event()
api_invoker_ids=[str(api_invoker_id)]).send_event()
return res
def update_apiinvokerenrolmentdetail(self, onboard_id, apiinvokerenrolmentdetail):
......@@ -154,8 +153,7 @@ class InvokerManagementOperations(Resource):
if res.status_code == 200:
current_app.logger.info("Invoker Updated")
RedisEvent("API_INVOKER_UPDATED",
["apiInvokerIds"],
[[onboard_id]]).send_event()
api_invoker_ids=[onboard_id]).send_event()
return res
except Exception as e:
......@@ -183,8 +181,7 @@ class InvokerManagementOperations(Resource):
if res.status_code == 204:
current_app.logger.info("Invoker Removed")
RedisEvent("API_INVOKER_OFFBOARDED",
["apiInvokerIds"],
[[onboard_id]]).send_event()
api_invoker_ids=[onboard_id]).send_event()
RedisInternalEvent("INVOKER-REMOVED",
"invokerId",
{
......
......@@ -6,7 +6,14 @@ publisher_ops = Publisher()
class RedisEvent():
def __init__(self, event, event_detail_key=None, information=None) -> None:
def __init__(self,
event,
service_api_descriptions=None,
api_ids=None,
api_invoker_ids=None,
acc_ctrl_pol_list=None,
invocation_logs=None,
api_topo_hide=None) -> None:
self.EVENTS_ENUM = [
'SERVICE_API_AVAILABLE',
'SERVICE_API_UNAVAILABLE',
......@@ -27,9 +34,23 @@ class RedisEvent():
self.redis_event = {
"event": event
}
if event_detail_key != None and information != None:
self.redis_event['key'] = event_detail_key
self.redis_event['information'] = information
# Add event filter keys to an auxiliary object
event_detail = {
"serviceApiDescriptions": service_api_descriptions,
"apiIds": api_ids,
"apiInvokerIds": api_invoker_ids,
"accCtrlPolList": acc_ctrl_pol_list,
"invocationLogs": invocation_logs,
"apiTopoHide": api_topo_hide
}
# Filter keys with not None values
filtered_event_detail = {k: v for k,
v in event_detail.items() if v is not None}
# If there are valid values then add to redis event.
if filtered_event_detail:
self.redis_event["event_detail"] = filtered_event_detail
def to_string(self):
return json.dumps(self.redis_event, cls=JSONEncoder)
......
......@@ -59,7 +59,7 @@ class InternalServiceOps(Resource):
"apiInvokerPolicies": inserted_service_acls_camel['apiInvokerPolicies']
}
RedisEvent("ACCESS_CONTROL_POLICY_UPDATE",
["accCtrlPolList"], [accCtrlPolListExt]).send_event()
acc_ctrl_pol_list=accCtrlPolListExt).send_event()
current_app.logger.info(
f"Invoker ACL added for invoker: {invoker_id} for service: {service_id}")
......
......@@ -6,7 +6,14 @@ publisher_ops = Publisher()
class RedisEvent():
def __init__(self, event, event_detail_key=None, information=None) -> None:
def __init__(self,
event,
service_api_descriptions=None,
api_ids=None,
api_invoker_ids=None,
acc_ctrl_pol_list=None,
invocation_logs=None,
api_topo_hide=None) -> None:
self.EVENTS_ENUM = [
'SERVICE_API_AVAILABLE',
'SERVICE_API_UNAVAILABLE',
......@@ -27,9 +34,23 @@ class RedisEvent():
self.redis_event = {
"event": event
}
if event_detail_key != None and information != None:
self.redis_event['key'] = event_detail_key
self.redis_event['information'] = information
# Add event filter keys to an auxiliary object
event_detail = {
"serviceApiDescriptions": service_api_descriptions,
"apiIds": api_ids,
"apiInvokerIds": api_invoker_ids,
"accCtrlPolList": acc_ctrl_pol_list,
"invocationLogs": invocation_logs,
"apiTopoHide": api_topo_hide
}
# Filter keys with not None values
filtered_event_detail = {k: v for k,
v in event_detail.items() if v is not None}
# If there are valid values then add to redis event.
if filtered_event_detail:
self.redis_event["event_detail"] = filtered_event_detail
def to_string(self):
return json.dumps(self.redis_event, cls=CustomJSONEncoder)
......
......@@ -21,36 +21,38 @@ class Notifications():
def send_notifications(self, redis_event):
try:
if redis_event.get('event', None) == None:
event = redis_event.get('event', None)
if event is None:
raise("Event value is not present on received event from REDIS")
current_app.logger.info("Received event " + redis_event.get('event') + ", sending notifications")
subscriptions = self.events_ops.get_event_subscriptions(redis_event.get('event'))
current_app.logger.info("Received event " + event + ", sending notifications")
subscriptions = self.events_ops.get_event_subscriptions(event)
current_app.logger.info(subscriptions)
for sub in subscriptions:
url = sub["notification_destination"]
current_app.logger.debug(url)
data = EventNotification(sub["subscription_id"], events=redis_event.get('event'))
if redis_event.get('key', None) != None and redis_event.get('information', None) != None:
data = EventNotification(sub["subscription_id"], events=event)
event_detail_redis=redis_event.get('event_detail', None)
if event_detail_redis is not None:
if EventSubscription.return_supp_feat_dict(sub["supported_features"])["EnhancedEventReport"]:
event_detail={}
for pos, key in enumerate(redis_event.get('key', None)):
current_app.logger.debug(f"information: {redis_event.get('information', None)[pos]}")
if redis_event.get('event', None) in ["SERVICE_API_AVAILABLE", "SERVICE_API_UNAVAILABLE"] and key == "apiIds":
event_detail["apiIds"]=[redis_event.get('information', None)[pos]]
elif EventSubscription.return_supp_feat_dict(sub["supported_features"])["ApiStatusMonitoring"] and key == "serviceAPIDescriptions":
event_detail["serviceAPIDescriptions"]=redis_event.get('information', None)[pos]
elif redis_event.get('event', None) in ["SERVICE_API_UPDATE"]:
event_detail["serviceAPIDescriptions"]=redis_event.get('information', None)[pos]
elif redis_event.get('event', None) in ["API_INVOKER_ONBOARDED", "API_INVOKER_OFFBOARDED", "API_INVOKER_UPDATED"]:
event_detail["apiInvokerIds"]=redis_event.get('information', None)[pos]
elif redis_event.get('event', None) in ["ACCESS_CONTROL_POLICY_UPDATE"]:
event_detail["accCtrlPolList"]=redis_event.get('information', None)[pos]
elif redis_event.get('event', None) in ["SERVICE_API_INVOCATION_SUCCESS", "SERVICE_API_INVOCATION_FAILURE"]:
event_detail["invocationLogs"]=redis_event.get('information', None)[pos]
elif redis_event.get('event', None) in ["API_TOPOLOGY_HIDING_CREATED", "API_TOPOLOGY_HIDING_REVOKED"]:
event_detail["apiTopoHide"]=redis_event.get('information', None)[pos]
current_app.logger.debug(f"event: {event_detail_redis}")
if event in ["SERVICE_API_AVAILABLE", "SERVICE_API_UNAVAILABLE"]:
event_detail["apiIds"]=event_detail_redis.get('apiIds', None)
if EventSubscription.return_supp_feat_dict(sub["supported_features"])["ApiStatusMonitoring"]:
event_detail["serviceAPIDescriptions"]=event_detail_redis.get('serviceAPIDescriptions', None)
elif event in ["SERVICE_API_UPDATE"]:
event_detail["serviceAPIDescriptions"]=event_detail_redis.get('serviceAPIDescriptions', None)
elif event in ["API_INVOKER_ONBOARDED", "API_INVOKER_OFFBOARDED", "API_INVOKER_UPDATED"]:
event_detail["apiInvokerIds"]=event_detail_redis.get('apiInvokerIds', None)
elif event in ["ACCESS_CONTROL_POLICY_UPDATE"]:
event_detail["accCtrlPolList"]=event_detail_redis.get('accCtrlPolList', None)
elif event in ["SERVICE_API_INVOCATION_SUCCESS", "SERVICE_API_INVOCATION_FAILURE"]:
event_detail["invocationLogs"]=event_detail_redis.get('invocationLogs', None)
elif event in ["API_TOPOLOGY_HIDING_CREATED", "API_TOPOLOGY_HIDING_REVOKED"]:
event_detail["apiTopoHide"]=event_detail_redis.get('apiTopoHide', None)
current_app.logger.debug(event_detail)
data.event_detail=event_detail
......
......@@ -2,7 +2,7 @@
import os
import secrets
from flask import current_app, Flask, Response
from flask import current_app
from pymongo import ReturnDocument
from ..util import serialize_clean_camel_case
......@@ -14,7 +14,6 @@ from .redis_event import RedisEvent
import json
class LoggingInvocationOperations(Resource):
def __check_aef(self, request_aef_id, body_aef_id):
......@@ -22,7 +21,8 @@ class LoggingInvocationOperations(Resource):
prov_col = self.db.get_col_by_name(self.db.provider_details)
current_app.logger.debug("Checking aef id")
aef_res = prov_col.find_one({'api_prov_funcs': {'$elemMatch': {'api_prov_func_role': 'AEF', 'api_prov_func_id': request_aef_id}}})
aef_res = prov_col.find_one({'api_prov_funcs': {'$elemMatch': {
'api_prov_func_role': 'AEF', 'api_prov_func_id': request_aef_id}}})
if aef_res is None:
current_app.logger.error("Exposer not exist")
......@@ -31,7 +31,6 @@ class LoggingInvocationOperations(Resource):
if request_aef_id != body_aef_id:
return unauthorized_error(detail="AEF id not matching in request and body", cause="Not identical AEF id")
return None
def __check_invoker(self, invoker_id):
......@@ -50,18 +49,20 @@ class LoggingInvocationOperations(Resource):
serv_apis = self.db.get_col_by_name(self.db.service_apis)
current_app.logger.debug("Checking service apis")
services_api_res = serv_apis.find_one({"$and": [{'api_id': api_id}, {'api_name': api_name}]})
services_api_res = serv_apis.find_one(
{"$and": [{'api_id': api_id}, {'api_name': api_name}]})
if services_api_res is None:
detail = "Service API not exist"
cause = "Service API with id {} and name {} not found".format(api_id, api_name)
cause = "Service API with id {} and name {} not found".format(
api_id, api_name)
current_app.logger.error(detail)
return not_found_error(detail=detail, cause=cause)
return None
def add_invocationlog(self, aef_id, invocationlog):
mycol = self.db.get_col_by_name(self.db.invocation_logs)
try:
......@@ -79,8 +80,9 @@ class LoggingInvocationOperations(Resource):
return result
current_app.logger.debug("Check service apis")
event=None
invocation_log_base=json.loads(json.dumps(invocationlog.to_dict(), cls=CustomJSONEncoder))
event = None
invocation_log_base = json.loads(json.dumps(
invocationlog.to_dict(), cls=CustomJSONEncoder))
for log in invocationlog.logs:
result = self.__check_service_apis(log.api_id, log.api_name)
......@@ -88,23 +90,25 @@ class LoggingInvocationOperations(Resource):
current_app.logger.debug("Inside for loop.")
if result is not None:
return result
if log.result:
current_app.logger.debug(log)
if int(log.result) >= 200 and int(log.result) < 300:
event="SERVICE_API_INVOCATION_SUCCESS"
event = "SERVICE_API_INVOCATION_SUCCESS"
else:
event="SERVICE_API_INVOCATION_FAILURE"
event = "SERVICE_API_INVOCATION_FAILURE"
current_app.logger.info(event)
invocation_log_base['logs']=[log.to_dict()]
invocationLogs=[invocation_log_base]
RedisEvent(event,["invocation_logs"],[invocationLogs]).send_event()
invocation_log_base['logs'] = [log.to_dict()]
invocationLogs = [invocation_log_base]
RedisEvent(event, invocation_logs=[
invocationLogs]).send_event()
current_app.logger.debug("After log check")
current_app.logger.debug("Check existing logs")
my_query = {'aef_id': aef_id, 'api_invoker_id': invocationlog.api_invoker_id}
my_query = {'aef_id': aef_id,
'api_invoker_id': invocationlog.api_invoker_id}
existing_invocationlog = mycol.find_one(my_query)
if existing_invocationlog is None:
......@@ -119,21 +123,25 @@ class LoggingInvocationOperations(Resource):
log_id = existing_invocationlog['log_id']
updated_invocation_logs = invocationlog.to_dict()
for updated_invocation_log in updated_invocation_logs['logs']:
existing_invocationlog['logs'].append(updated_invocation_log)
mycol.find_one_and_update(my_query, {"$set": existing_invocationlog}, projection={'_id': 0, 'log_id': 0}, return_document=ReturnDocument.AFTER, upsert=False)
existing_invocationlog['logs'].append(
updated_invocation_log)
mycol.find_one_and_update(my_query, {"$set": existing_invocationlog}, projection={
'_id': 0, 'log_id': 0}, return_document=ReturnDocument.AFTER, upsert=False)
res = make_response(object=serialize_clean_camel_case(invocationlog), status=201)
res = make_response(object=serialize_clean_camel_case(
invocationlog), status=201)
current_app.logger.debug("Invocation Logs response ready")
apis_added = {log.api_id:log.api_name for log in invocationlog.logs}
apis_added = {
log.api_id: log.api_name for log in invocationlog.logs}
current_app.logger.debug(f"Added log entry to apis: {apis_added}")
res.headers['Location'] = "https://{}/api-invocation-logs/v1/{}/logs/{}".format(os.getenv('CAPIF_HOSTNAME'), str(aef_id), str(log_id))
res.headers['Location'] = "https://{}/api-invocation-logs/v1/{}/logs/{}".format(
os.getenv('CAPIF_HOSTNAME'), str(aef_id), str(log_id))
return res
except Exception as e:
exception = "An exception occurred in inserting invocation logs"
current_app.logger.error(exception + "::" + str(e))
return internal_server_error(detail=exception, cause=str(e))
......@@ -6,7 +6,14 @@ publisher_ops = Publisher()
class RedisEvent():
def __init__(self, event, event_detail_key=None, information=None) -> None:
def __init__(self,
event,
service_api_descriptions=None,
api_ids=None,
api_invoker_ids=None,
acc_ctrl_pol_list=None,
invocation_logs=None,
api_topo_hide=None) -> None:
self.EVENTS_ENUM = [
'SERVICE_API_AVAILABLE',
'SERVICE_API_UNAVAILABLE',
......@@ -27,9 +34,23 @@ class RedisEvent():
self.redis_event = {
"event": event
}
if event_detail_key != None and information != None:
self.redis_event['key'] = event_detail_key
self.redis_event['information'] = information
# Add event filter keys to an auxiliary object
event_detail = {
"serviceApiDescriptions": service_api_descriptions,
"apiIds": api_ids,
"apiInvokerIds": api_invoker_ids,
"accCtrlPolList": acc_ctrl_pol_list,
"invocationLogs": invocation_logs,
"apiTopoHide": api_topo_hide
}
# Filter keys with not None values
filtered_event_detail = {k: v for k,
v in event_detail.items() if v is not None}
# If there are valid values then add to redis event.
if filtered_event_detail:
self.redis_event["event_detail"] = filtered_event_detail
def to_string(self):
return json.dumps(self.redis_event, cls=CustomJSONEncoder)
......
......@@ -6,7 +6,14 @@ publisher_ops = Publisher()
class RedisEvent():
def __init__(self, event, event_detail_key=None, information=None) -> None:
def __init__(self,
event,
service_api_descriptions=None,
api_ids=None,
api_invoker_ids=None,
acc_ctrl_pol_list=None,
invocation_logs=None,
api_topo_hide=None) -> None:
self.EVENTS_ENUM = [
'SERVICE_API_AVAILABLE',
'SERVICE_API_UNAVAILABLE',
......@@ -27,9 +34,23 @@ class RedisEvent():
self.redis_event = {
"event": event
}
if event_detail_key != None and information != None:
self.redis_event['key'] = event_detail_key
self.redis_event['information'] = information
# Add event filter keys to an auxiliary object
event_detail = {
"serviceApiDescriptions": service_api_descriptions,
"apiIds": api_ids,
"apiInvokerIds": api_invoker_ids,
"accCtrlPolList": acc_ctrl_pol_list,
"invocationLogs": invocation_logs,
"apiTopoHide": api_topo_hide
}
# Filter keys with not None values
filtered_event_detail = {k: v for k,
v in event_detail.items() if v is not None}
# If there are valid values then add to redis event.
if filtered_event_detail:
self.redis_event["event_detail"] = filtered_event_detail
def to_string(self):
return json.dumps(self.redis_event, cls=JSONEncoder)
......
......@@ -114,7 +114,8 @@ class PublishServiceOperations(Resource):
serviceapidescription_dict = serviceapidescription.to_dict()
if vendor_specific:
serviceapidescription_dict = add_vend_spec_fields(vendor_specific, serviceapidescription_dict)
serviceapidescription_dict = add_vend_spec_fields(
vendor_specific, serviceapidescription_dict)
rec.update(serviceapidescription_dict)
......@@ -135,24 +136,24 @@ class PublishServiceOperations(Resource):
if serviceapidescription.return_supp_feat_dict(serviceapidescription.supported_features)["ApiStatusMonitoring"]:
current_app.logger.info(f"Service available")
RedisEvent("SERVICE_API_AVAILABLE",
["serviceAPIDescriptions", "apiIds"],
[[clean_n_camel_case(serviceapidescription.to_dict())], [str(api_id)]]).send_event()
service_api_descriptions=[clean_n_camel_case(
serviceapidescription.to_dict())],
api_ids=[str(api_id)]).send_event()
else:
current_app.logger.info("Service available")
RedisEvent("SERVICE_API_AVAILABLE",
["apiIds"],
[str(api_id)]).send_event()
api_ids=[str(api_id)]).send_event()
else:
if serviceapidescription.return_supp_feat_dict(serviceapidescription.supported_features)["ApiStatusMonitoring"]:
current_app.logger.info(f"Service unavailable")
RedisEvent("SERVICE_API_UNAVAILABLE",
["serviceAPIDescriptions", "apiIds"],
[[clean_n_camel_case(serviceapidescription.to_dict())], [str(api_id)]]).send_event()
service_api_descriptions=[clean_n_camel_case(
serviceapidescription.to_dict())],
api_ids=[str(api_id)]).send_event()
else:
current_app.logger.info("Service available")
RedisEvent("SERVICE_API_UNAVAILABLE",
["apiIds"],
[str(api_id)]).send_event()
api_ids=[str(api_id)]).send_event()
return res
......@@ -236,8 +237,8 @@ class PublishServiceOperations(Resource):
current_app.logger.info("Service unavailable")
RedisEvent(
"SERVICE_API_UNAVAILABLE",
["serviceAPIDescriptions", "apiIds"],
[[serviceapidescription], [str(service_api_id)]]
service_api_descriptions=[serviceapidescription],
api_ids=[str(service_api_id)]
).send_event()
else:
......@@ -245,9 +246,7 @@ class PublishServiceOperations(Resource):
"supportedFeatures") is None else "Service unavailable"
current_app.logger.info(status_message)
RedisEvent(
"SERVICE_API_UNAVAILABLE",
["apiIds"],
[str(service_api_id)]
"SERVICE_API_UNAVAILABLE", api_ids=[str(service_api_id)]
).send_event()
return res
......@@ -306,42 +305,41 @@ class PublishServiceOperations(Resource):
service_api_description = ServiceAPIDescription.from_dict(
json.dumps(service_api_description_updated, cls=CustomJSONEncoder))
if response.status_code == 200:
RedisEvent("SERVICE_API_UPDATE", ["serviceAPIDescriptions"], [[
service_api_description_updated]]).send_event()
RedisEvent("SERVICE_API_UPDATE",
service_api_descriptions=[service_api_description_updated]).send_event()
if service_api_description.api_status is None or len(service_api_description.api_status.aef_ids) > 0:
if service_api_description.supported_features is not None:
if service_api_description.return_supp_feat_dict(service_api_description.supported_features)["ApiStatusMonitoring"]:
current_app.logger.info(f"Service available")
RedisEvent("SERVICE_API_AVAILABLE",
["serviceAPIDescriptions", "apiIds"],
[[service_api_description], [str(service_api_id)]]).send_event()
service_api_descriptions=[
service_api_description],
api_ids=[str(service_api_id)]
).send_event()
else:
current_app.logger.info("Service available")
RedisEvent("SERVICE_API_AVAILABLE",
["apiIds"],
[str(service_api_id)]).send_event()
api_ids=[str(service_api_id)]).send_event()
else:
current_app.logger.info("Service available")
RedisEvent("SERVICE_API_AVAILABLE",
["apiIds"],
[str(service_api_id)]).send_event()
RedisEvent("SERVICE_API_AVAILABLE",
api_ids=[str(service_api_id)]).send_event()
else:
if service_api_description.supported_features is not None:
if service_api_description.return_supp_feat_dict(service_api_description.supported_features)["ApiStatusMonitoring"]:
current_app.logger.info(f"Service unavailable")
RedisEvent("SERVICE_API_UNAVAILABLE",
["serviceAPIDescriptions", "apiIds"],
[[service_api_description], [str(service_api_id)]]).send_event()
service_api_descriptions=[
service_api_description],
api_ids=[str(service_api_id)]).send_event()
else:
current_app.logger.info("Service available")
RedisEvent("SERVICE_API_UNAVAILABLE",
["apiIds"],
[str(service_api_id)]).send_event()
api_ids=[str(service_api_id)]).send_event()
else:
current_app.logger.info("Service available")
RedisEvent("SERVICE_API_UNAVAILABLE",
["apiIds"],
[str(service_api_id)]).send_event()
api_ids=[str(service_api_id)]).send_event()
return response
......
from ..encoder import CustomJSONEncoder
from ..encoder import JSONEncoder
from .publisher import Publisher
import json
......@@ -6,7 +6,14 @@ publisher_ops = Publisher()
class RedisEvent():
def __init__(self, event, event_detail_key=None, information=None) -> None:
def __init__(self,
event,
service_api_descriptions=None,
api_ids=None,
api_invoker_ids=None,
acc_ctrl_pol_list=None,
invocation_logs=None,
api_topo_hide=None) -> None:
self.EVENTS_ENUM = [
'SERVICE_API_AVAILABLE',
'SERVICE_API_UNAVAILABLE',
......@@ -27,12 +34,26 @@ class RedisEvent():
self.redis_event = {
"event": event
}
if event_detail_key != None and information != None:
self.redis_event['key'] = event_detail_key
self.redis_event['information'] = information
# Add event filter keys to an auxiliary object
event_detail = {
"serviceApiDescriptions": service_api_descriptions,
"apiIds": api_ids,
"apiInvokerIds": api_invoker_ids,
"accCtrlPolList": acc_ctrl_pol_list,
"invocationLogs": invocation_logs,
"apiTopoHide": api_topo_hide
}
# Filter keys with not None values
filtered_event_detail = {k: v for k,
v in event_detail.items() if v is not None}
# If there are valid values then add to redis event.
if filtered_event_detail:
self.redis_event["event_detail"] = filtered_event_detail
def to_string(self):
return json.dumps(self.redis_event, cls=CustomJSONEncoder)
return json.dumps(self.redis_event, cls=JSONEncoder)
def send_event(self):
publisher_ops.publish_message("events", self.to_string())
......
......@@ -2,7 +2,7 @@
from pymongo import ReturnDocument
import rfc3987
from flask import current_app, Flask, Response
from flask import current_app
from flask_jwt_extended import create_access_token
from datetime import datetime, timedelta
import json
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment