Commit cff24653 authored by Jorge Moratinos's avatar Jorge Moratinos
Browse files

Merge branch 'OCF101-implement-eventfilters-on-events-service-notification' into 'staging'

Resolve "implement eventfilters on events service notification"

Closes #101

See merge request !91
parents 3f5851e2 c39b4d3c
Loading
Loading
Loading
Loading
Loading
+22 −4
Original line number Diff line number Diff line

from datetime import datetime, timedelta

from flask import current_app
@@ -36,6 +35,22 @@ class InternalServiceOps(Resource):
            if r is None:
                mycol.update_one({"service_id": service_id, "aef_id": aef_id}, {
                                 "$push": {"api_invoker_policies": invoker_acl.to_dict()}})
            
            inserted_invoker_acl = mycol.find_one({"service_id": service_id, "aef_id": aef_id,
                               "api_invoker_policies.api_invoker_id": invoker_id}, {"_id": 0})
            current_app.logger.info(inserted_invoker_acl)
            inserted_invoker_acl_camel = dict_to_camel_case(inserted_invoker_acl)
            current_app.logger.info(inserted_invoker_acl_camel)

            created_invoker_policy = next((policy for policy in inserted_invoker_acl_camel['apiInvokerPolicies'] if policy['apiInvokerId'] == invoker_id), None)

            accCtrlPolListExt = {
                "apiId": service_id,
                "apiInvokerPolicies": [created_invoker_policy]
            }
            RedisEvent("ACCESS_CONTROL_POLICY_UPDATE",
                       acc_ctrl_pol_list=accCtrlPolListExt).send_event()
            
        else:
            current_app.logger.info(
                f"Creating service ACLs for service: {service_id}")
@@ -55,9 +70,12 @@ class InternalServiceOps(Resource):
            current_app.logger.info(inserted_service_acls)
            inserted_service_acls_camel = dict_to_camel_case(inserted_service_acls)
            current_app.logger.info(inserted_service_acls_camel)

            created_invoker_policy = next((policy for policy in inserted_service_acls_camel['apiInvokerPolicies'] if policy['apiInvokerId'] == invoker_id), None)

            accCtrlPolListExt = {
                "apiId": service_id,
                "apiInvokerPolicies": inserted_service_acls_camel['apiInvokerPolicies']
                "apiInvokerPolicies": [created_invoker_policy]
            }
            RedisEvent("ACCESS_CONTROL_POLICY_UPDATE",
                       acc_ctrl_pol_list=accCtrlPolListExt).send_event()
+46 −1
Original line number Diff line number Diff line
@@ -33,6 +33,30 @@ class EventSubscriptionsOperations(Resource):

        return None
    
    def __check_event_filters(self, events, filters):
        current_app.logger.debug("Checking event filters.")
        valid_filters = {
            "SERVICE_API_UPDATE": ["api_ids"],
            "SERVICE_API_AVAILABLE" : ["api_ids"],
            "SERVICE_API_UNAVAILABLE" : ["api_ids"],
            "API_INVOKER_ONBOARDED": ["api_invoker_ids"],
            "API_INVOKER_OFFBOARDED": ["api_invoker_ids"],
            "API_INVOKER_UPDATED": ["api_invoker_ids"],
            "ACCESS_CONTROL_POLICY_UPDATE":["api_invoker_ids", "api_ids"],
            "SERVICE_API_INVOCATION_SUCCESS": ["api_invoker_ids", "api_ids", "aef_ids"],
            "SERVICE_API_INVOCATION_FAILURE": ["api_invoker_ids", "api_ids", "aef_ids"],
            "API_TOPOLOGY_HIDING_CREATED": [],
            "API_TOPOLOGY_HIDING_REVOKED": []
        }

        for event, filter in zip(events, filters):
            invalid_filters = set(filter.keys()) - set(valid_filters.get(event, []))

            if invalid_filters:
                 current_app.logger.debug(f"The eventFilter {invalid_filters} for event {event} are not applicable.")
                 return bad_request_error(detail="Bad Param", cause = f"Invalid eventFilter for event {event}", invalid_params=[{"param": "eventFilter", "reason": f"The eventFilter {invalid_filters} for event {event} are not applicable."}])
        return None

    def __init__(self):
        Resource.__init__(self)
        self.auth_manager = AuthManager()
@@ -59,11 +83,16 @@ class EventSubscriptionsOperations(Resource):

            result = self.__check_subscriber_id(subscriber_id)


            if  isinstance(result, Response):

                return result
            
            if EventSubscription.return_supp_feat_dict(event_subscription.supported_features)["EnhancedEventReport"] and event_subscription.event_filters:
                current_app.logger.debug(event_subscription.event_filters)
                result = self.__check_event_filters(event_subscription.events, clean_empty(event_subscription.to_dict()["event_filters"]))
                if  isinstance(result, Response):
                    return result

            # Generate subscriptionID
            subscription_id = secrets.token_hex(15)
            evnt = dict()
@@ -138,6 +167,11 @@ class EventSubscriptionsOperations(Resource):
            if  isinstance(result, Response):
                return result
            
            if EventSubscription.return_supp_feat_dict(event_subscription.supported_features)["EnhancedEventReport"] and event_subscription.event_filters:
                result = self.__check_event_filters(event_subscription.events, clean_empty(event_subscription.to_dict()["event_filters"]))
                if  isinstance(result, Response):
                    return result

            my_query = {'subscriber_id': subscriber_id,
                    'subscription_id': subscription_id}
            eventdescription = mycol.find_one(my_query)
@@ -183,6 +217,17 @@ class EventSubscriptionsOperations(Resource):
                current_app.logger.error("Event subscription not found")
                return not_found_error(detail="Event subscription not exist", cause="Event API subscription id not found")

            if EventSubscription.return_supp_feat_dict(eventdescription.get("supported_features"))["EnhancedEventReport"]:
                if event_subscription.events and event_subscription.event_filters:
                    result = self.__check_event_filters(event_subscription.events, clean_empty(event_subscription.to_dict()["event_filters"]))
                elif event_subscription.events and  event_subscription.event_filters is None and eventdescription.get("event_filters", None):
                    result = self.__check_event_filters(event_subscription.events, eventdescription.get("event_filters"))
                elif event_subscription.events is None and event_subscription.event_filters:
                    result = self.__check_event_filters(eventdescription.get("events"), clean_empty(event_subscription.to_dict()["event_filters"]))

                if  isinstance(result, Response):
                    return result

            body = clean_empty(event_subscription.to_dict())
            document = mycol.update_one(my_query, {"$set":body})
            document = mycol.find_one(my_query)
+83 −7
Original line number Diff line number Diff line
@@ -39,17 +39,93 @@ class Notifications():
                    if EventSubscription.return_supp_feat_dict(sub["supported_features"])["EnhancedEventReport"]:
                        event_detail={}
                        current_app.logger.debug(f"event: {event_detail_redis}")

                        event_filters = sub.get("event_filters", None)
                        event_filter=None
                        if event_filters:
                            try:
                                event_filter = None if all(value is None for value in event_filters[sub.get("events", []).index(event)].values()) else event_filters[sub.get("events", []).index(event)]
                                current_app.logger.debug(f"Event filters: {event_filter}")
                            except IndexError:
                                event_filter=None

                        if event in ["SERVICE_API_AVAILABLE", "SERVICE_API_UNAVAILABLE"]:
                            if event_filter:
                                api_ids_list = event_filter.get("api_ids", None)
                                if api_ids_list and event_detail_redis.get('apiIds', None)[0] in api_ids_list:
                                    event_detail["apiIds"]=event_detail_redis.get('apiIds', None)
                                    if EventSubscription.return_supp_feat_dict(sub["supported_features"])["ApiStatusMonitoring"]:
                                        event_detail["serviceAPIDescriptions"]=event_detail_redis.get('serviceAPIDescriptions', None)
                                else:
                                    continue
                            else:
                                event_detail["apiIds"]=event_detail_redis.get('apiIds', None)
                                if EventSubscription.return_supp_feat_dict(sub["supported_features"])["ApiStatusMonitoring"]:
                                    event_detail["serviceAPIDescriptions"]=event_detail_redis.get('serviceAPIDescriptions', None)
                        elif event in ["SERVICE_API_UPDATE"]:
                            if event_filter:
                                api_ids_list = event_filter.get("api_ids", None)
                                if api_ids_list and event_detail_redis.get('serviceAPIDescriptions', {})[0].get('apiId') in api_ids_list:
                                    event_detail["serviceAPIDescriptions"]=event_detail_redis.get('serviceAPIDescriptions', None)
                                else:
                                    continue
                            else:
                                event_detail["serviceAPIDescriptions"]=event_detail_redis.get('serviceAPIDescriptions', None)
                        elif event in ["API_INVOKER_ONBOARDED", "API_INVOKER_OFFBOARDED", "API_INVOKER_UPDATED"]:
                            if event_filter:
                                invoker_ids_list = event_filter.get("api_invoker_ids", None)
                                if invoker_ids_list and event_detail_redis.get('apiInvokerIds', None)[0] in invoker_ids_list:
                                    event_detail["apiInvokerIds"]=event_detail_redis.get('apiInvokerIds', None)
                                else:
                                    continue
                            else:
                                event_detail["apiInvokerIds"]=event_detail_redis.get('apiInvokerIds', None)
                        elif event in ["ACCESS_CONTROL_POLICY_UPDATE"]:
                            if event_filter:
                                filter_invoker_ids = event_filter.get("api_invoker_ids", [])
                                filter_api_ids = event_filter.get("api_ids", [])

                                invoker_ids_list = [invoker.get("apiInvokerId") for invoker in event_detail_redis.get("accCtrlPolList", None).get("apiInvokerPolicies")]
                                api_id = event_detail_redis.get("accCtrlPolList").get("apiId", None)

                                if (filter_api_ids and not filter_invoker_ids) and (api_id in filter_api_ids):
                                    event_detail["accCtrlPolList"]=event_detail_redis.get('accCtrlPolList', None)                               
                                elif (not filter_api_ids and filter_invoker_ids) and bool(set(filter_invoker_ids) & set(invoker_ids_list)):
                                    event_detail["accCtrlPolList"]=event_detail_redis.get('accCtrlPolList', None)
                                elif (filter_api_ids and filter_invoker_ids) and bool(set(filter_invoker_ids) & set(invoker_ids_list)) and api_id in filter_api_ids:
                                    event_detail["accCtrlPolList"]=event_detail_redis.get('accCtrlPolList', None)
                                else:
                                    continue
                            else:
                                event_detail["accCtrlPolList"]=event_detail_redis.get('accCtrlPolList', None)
                        elif event in ["SERVICE_API_INVOCATION_SUCCESS", "SERVICE_API_INVOCATION_FAILURE"]:
                            if event_filter:
                                filter_invoker_ids = event_filter.get("api_invoker_ids", None)
                                filter_api_ids = event_filter.get("api_ids", None)
                                filter_aef_ids = event_filter.get("aef_ids", None)

                                invoker_id = event_detail_redis.get("invocationLogs", None)[0].get("api_invoker_id", None)
                                aef_id = event_detail_redis.get("invocationLogs", None)[0].get("aef_id", None)
                                api_id = event_detail_redis.get("invocationLogs", None)[0].get("logs", None)[0].get("api_id", None)

                                if (filter_api_ids and not filter_invoker_ids and not filter_aef_ids) and (api_id in filter_api_ids):
                                   event_detail["invocationLogs"]=event_detail_redis.get('invocationLogs', None)
                                elif (not filter_api_ids and filter_invoker_ids and not filter_aef_ids) and invoker_id in filter_invoker_ids:
                                   event_detail["invocationLogs"]=event_detail_redis.get('invocationLogs', None)
                                elif (not filter_api_ids and not filter_invoker_ids and filter_aef_ids) and aef_id in filter_aef_ids:
                                   event_detail["invocationLogs"]=event_detail_redis.get('invocationLogs', None)
                                elif (filter_api_ids and filter_invoker_ids and not filter_aef_ids) and (api_id in filter_api_ids) and invoker_id in filter_invoker_ids:
                                   event_detail["invocationLogs"]=event_detail_redis.get('invocationLogs', None)                                
                                elif (filter_api_ids and not filter_invoker_ids and filter_aef_ids) and (api_id in filter_api_ids) and aef_id in filter_aef_ids:
                                    event_detail["invocationLogs"]=event_detail_redis.get('invocationLogs', None)
                                elif (not filter_api_ids and filter_invoker_ids and filter_aef_ids) and invoker_id in filter_invoker_ids and aef_id in filter_aef_ids:
                                    event_detail["invocationLogs"]=event_detail_redis.get('invocationLogs', None)                               
                                elif (filter_api_ids and filter_invoker_ids and filter_aef_ids) and (api_id in filter_api_ids) and invoker_id in filter_invoker_ids and aef_id in filter_aef_ids:
                                    event_detail["invocationLogs"]=event_detail_redis.get('invocationLogs', None)
                                else:
                                    continue
                                    
                            else:
                                event_detail["invocationLogs"]=event_detail_redis.get('invocationLogs', None)
                        elif event in ["API_TOPOLOGY_HIDING_CREATED", "API_TOPOLOGY_HIDING_REVOKED"]:
                            event_detail["apiTopoHide"]=event_detail_redis.get('apiTopoHide', None)
+14 −4
Original line number Diff line number Diff line
@@ -316,7 +316,6 @@ class PublishServiceOperations(Resource):
                return_document=ReturnDocument.AFTER, upsert=False)

            result = clean_empty(result)

            current_app.logger.debug("Updated service api")

            service_api_description_updated = dict_to_camel_case(result)
@@ -328,7 +327,13 @@ class PublishServiceOperations(Resource):
                RedisEvent("SERVICE_API_UPDATE",
                           service_api_descriptions=[service_api_description_updated]).send_event()

                my_service_api = clean_n_camel_case(serviceapidescription_old)
                my_service_api = clean_empty(serviceapidescription_old)

                if (api_status := serviceapidescription_old.get("api_status")):
                    my_service_api["api_status"] = api_status
                
                my_service_api = dict_to_camel_case(my_service_api)
                
                self.send_events_on_update(
                    service_api_id,
                    my_service_api,
@@ -405,7 +410,6 @@ class PublishServiceOperations(Resource):

            service_api_description_updated = dict_to_camel_case(result)

            current_app.logger.debug(service_api_description_updated)
            response = make_response(
                object=service_api_description_updated, status=200)

@@ -413,7 +417,13 @@ class PublishServiceOperations(Resource):
                RedisEvent("SERVICE_API_UPDATE",
                           service_api_descriptions=[service_api_description_updated]).send_event()

                my_service_api = clean_n_camel_case(serviceapidescription_old)
                my_service_api = clean_empty(serviceapidescription_old)

                if (api_status := serviceapidescription_old.get("api_status")):
                    my_service_api["api_status"] = api_status
                
                my_service_api = dict_to_camel_case(my_service_api)

                self.send_events_on_update(
                    service_api_id,
                    my_service_api,
+2 −2
Original line number Diff line number Diff line
@@ -245,7 +245,7 @@ Invoker subscribe to Service API Available and Unavailable events
    ${request_body}=    Create Events Subscription
    ...    events=@{events_list}
    ...    notification_destination=${NOTIFICATION_DESTINATION_URL}/testing
    ...    event_filters=${event_filters}
    # ...    event_filters=${event_filters}
    ...    supported_features=4
    ${resp}=    Post Request Capif
    ...    /capif-events/v1/${register_user_info_invoker['api_invoker_id']}/subscriptions
@@ -317,7 +317,7 @@ Invoker subscribe to Service API Update
    ${request_body}=    Create Events Subscription
    ...    events=@{events_list}
    ...    notification_destination=${NOTIFICATION_DESTINATION_URL}/testing
    ...    event_filters=${event_filters}
    # ...    event_filters=${event_filters}
    ...    supported_features=4
    ${resp}=    Post Request Capif
    ...    /capif-events/v1/${register_user_info_invoker['api_invoker_id']}/subscriptions