Commit cfceb9d1 authored by Jorge Moratinos's avatar Jorge Moratinos
Browse files

Merge branch 'OCF120-implement-eventreq-2' into 'staging'

Ocf120 implement eventreq 2

See merge request !120
parents 60c6bde8 82d5bada
Loading
Loading
Loading
Loading
Loading
+112 −23
Original line number Original line Diff line number Diff line
@@ -3,12 +3,16 @@ import secrets


import rfc3987
import rfc3987
from capif_events.models.event_subscription import EventSubscription  # noqa: E501
from capif_events.models.event_subscription import EventSubscription  # noqa: E501
from flask import Response, current_app
from flask import current_app, Response
from datetime import datetime, timezone
import asyncio


from ..util import clean_empty, dict_to_camel_case, serialize_clean_camel_case
from ..util import clean_empty, dict_to_camel_case, serialize_clean_camel_case
from .auth_manager import AuthManager
from .auth_manager import AuthManager
from .resources import Resource
from .resources import Resource
from .responses import bad_request_error, internal_server_error, make_response, not_found_error
from .responses import internal_server_error, not_found_error, make_response, bad_request_error
from ..util import serialize_clean_camel_case, clean_empty, dict_to_camel_case
from .notifications import Notifications


TOTAL_FEATURES = 4
TOTAL_FEATURES = 4
SUPPORTED_FEATURES_HEX = "c"
SUPPORTED_FEATURES_HEX = "c"
@@ -27,6 +31,10 @@ def return_negotiated_supp_feat_dict(supp_feat):


class EventSubscriptionsOperations(Resource):
class EventSubscriptionsOperations(Resource):


    def __init__(self):
        super().__init__()
        self.notifications = Notifications()

    def __check_subscriber_id(self, subscriber_id):
    def __check_subscriber_id(self, subscriber_id):
        mycol_invoker= self.db.get_col_by_name(self.db.invoker_collection)
        mycol_invoker= self.db.get_col_by_name(self.db.invoker_collection)
        mycol_provider= self.db.get_col_by_name(self.db.provider_collection)
        mycol_provider= self.db.get_col_by_name(self.db.provider_collection)
@@ -71,6 +79,37 @@ class EventSubscriptionsOperations(Resource):
                 return bad_request_error(detail="Bad Param", cause = f"Invalid eventFilter for event {event}", invalid_params=[{"param": "eventFilter", "reason": f"The eventFilter {invalid_filters} for event {event} are not applicable."}])
                 return bad_request_error(detail="Bad Param", cause = f"Invalid eventFilter for event {event}", invalid_params=[{"param": "eventFilter", "reason": f"The eventFilter {invalid_filters} for event {event} are not applicable."}])
        return None
        return None
    
    
    def __check_event_req(self, event_subscription, subscription_id=None):
        current_app.logger.debug("Checking event requirement.")
        expired_at = None
        if event_subscription.event_req.mon_dur:
            if event_subscription.event_req.mon_dur > datetime.now(timezone.utc):
                expired_at = event_subscription.event_req.mon_dur
            else:
                current_app.logger.error("monDur is in the past")
                return bad_request_error(
                    detail="Bad Param",
                    cause="monDur is in the past",
                    invalid_params=[{"param": "monDur", "reason": "monDur is in the past"}]
                )
        
        if event_subscription.event_req.notif_method == "PERIODIC" and event_subscription.event_req.rep_period is None:
            current_app.logger.error("Periodic notification method selected but repPeriod not provided")
            return bad_request_error(
                detail="Bad Param",
                cause="Periodic notification method selected but repPeriod not provided",
                invalid_params=[{"param": "repPeriod", "reason": "Periodic notification method selected but repPeriod not provided"}]
            )
        
        if event_subscription.event_req.imm_rep and subscription_id is not None:
            current_app.logger.debug("Sending immediate notification")
            notifications_col = self.db.get_col_by_name(self.db.notifications_col)
            result = notifications_col.find({"subscription_id": subscription_id})
            for notification in result:
                asyncio.run(self.notifications.send(notification["url"], notification["notification"]))

        return expired_at

    def __init__(self):
    def __init__(self):
        Resource.__init__(self)
        Resource.__init__(self)
        self.auth_manager = AuthManager()
        self.auth_manager = AuthManager()
@@ -103,6 +142,7 @@ class EventSubscriptionsOperations(Resource):


            negotiated_supported_features = return_negotiated_supp_feat_dict(event_subscription.supported_features)
            negotiated_supported_features = return_negotiated_supp_feat_dict(event_subscription.supported_features)
            
            
            expired_at = None
            # Check if EnhancedEventReport is enabled and validate event filters
            # Check if EnhancedEventReport is enabled and validate event filters
            if negotiated_supported_features["EnhancedEventReport"]:
            if negotiated_supported_features["EnhancedEventReport"]:
                if event_subscription.event_filters:
                if event_subscription.event_filters:
@@ -110,6 +150,11 @@ class EventSubscriptionsOperations(Resource):
                    result = self.__check_event_filters(event_subscription.events, clean_empty(event_subscription.to_dict()["event_filters"]))
                    result = self.__check_event_filters(event_subscription.events, clean_empty(event_subscription.to_dict()["event_filters"]))
                    if isinstance(result, Response):
                    if isinstance(result, Response):
                        return result
                        return result
                if event_subscription.event_req:
                    current_app.logger.debug(event_subscription.event_req)
                    expired_at = self.__check_event_req(event_subscription)
                    if isinstance(expired_at, Response):
                        return result
            else:
            else:
                if event_subscription.event_filters:
                if event_subscription.event_filters:
                    current_app.logger.error("Event filters provided but EnhancedEventReport is not enabled")
                    current_app.logger.error("Event filters provided but EnhancedEventReport is not enabled")
@@ -118,6 +163,13 @@ class EventSubscriptionsOperations(Resource):
                        cause="Event filters provided but EnhancedEventReport is not enabled",
                        cause="Event filters provided but EnhancedEventReport is not enabled",
                        invalid_params=[{"param": "eventFilters", "reason": "EnhancedEventReport is not enabled"}]
                        invalid_params=[{"param": "eventFilters", "reason": "EnhancedEventReport is not enabled"}]
                    )
                    )
                if event_subscription.event_req:
                    current_app.logger.error("Event requirement provided but EnhancedEventReport is not enabled")
                    return bad_request_error(
                        detail="Bad Param",
                        cause="Event requirement provided but EnhancedEventReport is not enabled",
                        invalid_params=[{"param": "eventReq", "reason": "EnhancedEventReport is not enabled"}]
                    )


            # Generate subscriptionID
            # Generate subscriptionID
            subscription_id = secrets.token_hex(15)
            subscription_id = secrets.token_hex(15)
@@ -125,7 +177,9 @@ class EventSubscriptionsOperations(Resource):
            evnt["subscriber_id"] = subscriber_id
            evnt["subscriber_id"] = subscriber_id
            evnt["subscription_id"] = subscription_id
            evnt["subscription_id"] = subscription_id


            # Edit supported_features field to the negotiated one
            evnt["report_nbr"] = 0
            evnt["created_at"] = datetime.now(timezone.utc)
            evnt["expire_at"] = expired_at
            event_subscription.supported_features = negotiated_supported_features["Final"]
            event_subscription.supported_features = negotiated_supported_features["Final"]


            evnt.update(event_subscription.to_dict())
            evnt.update(event_subscription.to_dict())
@@ -149,6 +203,7 @@ class EventSubscriptionsOperations(Resource):


        try:
        try:
            mycol = self.db.get_col_by_name(self.db.event_collection)
            mycol = self.db.get_col_by_name(self.db.event_collection)
            notifications_col = self.db.get_col_by_name(self.db.notifications_col)


            current_app.logger.debug("Removing event subscription")
            current_app.logger.debug("Removing event subscription")


@@ -167,6 +222,7 @@ class EventSubscriptionsOperations(Resource):
                return not_found_error(detail="Event subscription not exist", cause="Event API subscription id not found")
                return not_found_error(detail="Event subscription not exist", cause="Event API subscription id not found")


            mycol.delete_one(my_query)
            mycol.delete_one(my_query)
            notifications_col.delete_many({"subscription_id": subscription_id})
            current_app.logger.debug("Event subscription removed from database")
            current_app.logger.debug("Event subscription removed from database")


            self.auth_manager.remove_auth_event(subscription_id, subscriber_id)
            self.auth_manager.remove_auth_event(subscription_id, subscriber_id)
@@ -182,6 +238,7 @@ class EventSubscriptionsOperations(Resource):
    def put_event(self, event_subscription, subscriber_id, subscription_id):
    def put_event(self, event_subscription, subscriber_id, subscription_id):
        try:
        try:
            mycol = self.db.get_col_by_name(self.db.event_collection)
            mycol = self.db.get_col_by_name(self.db.event_collection)
            notifications_col = self.db.get_col_by_name(self.db.notifications_col)


            current_app.logger.debug("Updating event subscription")
            current_app.logger.debug("Updating event subscription")


@@ -197,28 +254,39 @@ class EventSubscriptionsOperations(Resource):
            if  isinstance(result, Response):
            if  isinstance(result, Response):
                return result
                return result
            
            
            my_query = {'subscriber_id': subscriber_id,
            current_app.logger.debug(event_subscription)
                        'subscription_id': subscription_id}
            expired_at = None
            eventdescription = mycol.find_one(my_query)

            if eventdescription is None:
                current_app.logger.error("Event subscription not found")
                return not_found_error(detail="Event subscription not exist",
                                       cause="Event API subscription id not found")


            negotiated_supported_features = return_negotiated_supp_feat_dict(event_subscription.supported_features)
            negotiated_supported_features = return_negotiated_supp_feat_dict(event_subscription.supported_features)

            if negotiated_supported_features["EnhancedEventReport"]:
            if negotiated_supported_features["EnhancedEventReport"] and event_subscription.event_filters:
                if event_subscription.event_filters:
                    current_app.logger.debug(event_subscription.event_filters)
                    result = self.__check_event_filters(event_subscription.events, clean_empty(event_subscription.to_dict()["event_filters"]))
                    result = self.__check_event_filters(event_subscription.events, clean_empty(event_subscription.to_dict()["event_filters"]))
                    if isinstance(result, Response):
                    if isinstance(result, Response):
                        return result
                        return result
            elif (not negotiated_supported_features["EnhancedEventReport"]) and event_subscription.event_filters:
                if event_subscription.event_req:
                    current_app.logger.debug(event_subscription.event_req)
                    expired_at = self.__check_event_req(event_subscription)
                    if isinstance(expired_at, Response):
                        return result
            else:
                if event_subscription.event_filters:
                    current_app.logger.error("Event filters provided but EnhancedEventReport is not enabled")
                    current_app.logger.error("Event filters provided but EnhancedEventReport is not enabled")
                    return bad_request_error(
                    return bad_request_error(
                        detail="Bad Param",
                        detail="Bad Param",
                        cause="Event filters provided but EnhancedEventReport is not enabled",
                        cause="Event filters provided but EnhancedEventReport is not enabled",
                        invalid_params=[{"param": "eventFilters", "reason": "EnhancedEventReport is not enabled"}]
                        invalid_params=[{"param": "eventFilters", "reason": "EnhancedEventReport is not enabled"}]
                    )
                    )
                if event_subscription.event_req:
                    current_app.logger.error("Event requirement provided but EnhancedEventReport is not enabled")
                    return bad_request_error(
                        detail="Bad Param",
                        cause="Event requirement provided but EnhancedEventReport is not enabled",
                        invalid_params=[{"param": "eventReq", "reason": "EnhancedEventReport is not enabled"}]
                    )
            my_query = {'subscriber_id': subscriber_id,
                        'subscription_id': subscription_id}
            eventdescription = mycol.find_one(my_query)


            event_subscription.supported_features = negotiated_supported_features["Final"]
            event_subscription.supported_features = negotiated_supported_features["Final"]


@@ -227,6 +295,11 @@ class EventSubscriptionsOperations(Resource):
            body["subscriber_id"] = subscriber_id
            body["subscriber_id"] = subscriber_id
            body["subscription_id"] = subscription_id
            body["subscription_id"] = subscription_id


            body["report_nbr"] = eventdescription.get("report_nbr", 0)
            body["created_at"] = eventdescription.get("created_at", datetime.now(timezone.utc))
            body["expire_at"] = expired_at if expired_at else eventdescription.get("expire_at", None)

            notifications_col.delete_many({"subscription_id": subscription_id})
            mycol.replace_one(my_query, body)
            mycol.replace_one(my_query, body)
            current_app.logger.debug("Event subscription updated from database")
            current_app.logger.debug("Event subscription updated from database")


@@ -244,6 +317,7 @@ class EventSubscriptionsOperations(Resource):
    def patch_event(self, event_subscription, subscriber_id, subscription_id):
    def patch_event(self, event_subscription, subscriber_id, subscription_id):
        try:
        try:
            mycol = self.db.get_col_by_name(self.db.event_collection)
            mycol = self.db.get_col_by_name(self.db.event_collection)
            notifications_col = self.db.get_col_by_name(self.db.notifications_col)


            current_app.logger.debug("Patching event subscription")
            current_app.logger.debug("Patching event subscription")


@@ -260,6 +334,9 @@ class EventSubscriptionsOperations(Resource):
                current_app.logger.error("Event subscription not found")
                current_app.logger.error("Event subscription not found")
                return not_found_error(detail="Event subscription not exist", cause="Event API subscription id not found")
                return not_found_error(detail="Event subscription not exist", cause="Event API subscription id not found")


            current_app.logger.debug(event_subscription)
            expired_at = None
            
            negotiated_supported_features = return_negotiated_supp_feat_dict(eventdescription.get("supported_features"))
            negotiated_supported_features = return_negotiated_supp_feat_dict(eventdescription.get("supported_features"))


            if negotiated_supported_features["EnhancedEventReport"]:
            if negotiated_supported_features["EnhancedEventReport"]:
@@ -269,6 +346,16 @@ class EventSubscriptionsOperations(Resource):
                    result = self.__check_event_filters(event_subscription.events, eventdescription.get("event_filters"))
                    result = self.__check_event_filters(event_subscription.events, eventdescription.get("event_filters"))
                elif event_subscription.events is None and event_subscription.event_filters:
                elif event_subscription.events is None and event_subscription.event_filters:
                    result = self.__check_event_filters(eventdescription.get("events"), clean_empty(event_subscription.to_dict()["event_filters"]))
                    result = self.__check_event_filters(eventdescription.get("events"), clean_empty(event_subscription.to_dict()["event_filters"]))
                if  isinstance(result, Response):
                    return result
                
                if event_subscription.event_req:
                    updated_data = EventSubscription.from_dict(dict_to_camel_case({**eventdescription, **clean_empty(event_subscription.to_dict())}))
                    expired_at = self.__check_event_req(updated_data, subscription_id)
                    if isinstance(expired_at, Response):
                        return result
                    else:
                        expired_at = expired_at if expired_at else eventdescription.get("expire_at", None)


                if  isinstance(result, Response):
                if  isinstance(result, Response):
                    return result
                    return result
@@ -276,6 +363,8 @@ class EventSubscriptionsOperations(Resource):
            event_subscription.supported_features = negotiated_supported_features["Final"]
            event_subscription.supported_features = negotiated_supported_features["Final"]


            body = clean_empty(event_subscription.to_dict())
            body = clean_empty(event_subscription.to_dict())
            body["expire_at"] = expired_at
            notifications_col.delete_many({"subscription_id": subscription_id})
            document = mycol.update_one(my_query, {"$set":body})
            document = mycol.update_one(my_query, {"$set":body})
            document = mycol.find_one(my_query)
            document = mycol.find_one(my_query)
            current_app.logger.debug("Event subscription patched from database")
            current_app.logger.debug("Event subscription patched from database")
+31 −0
Original line number Original line Diff line number Diff line
@@ -22,6 +22,14 @@ class InternalEventOperations(Resource):
        #We dont need remove all auth events, becase when invoker is removed, remove auth entry
        #We dont need remove all auth events, becase when invoker is removed, remove auth entry
        #self.auth_manager.remove_auth_all_event(subscriber_id)
        #self.auth_manager.remove_auth_all_event(subscriber_id)
    
    
    def delete_subscription(self, subscription_id):

        mycol = self.db.get_col_by_name(self.db.event_collection)
        my_query = {'subscription_id': subscription_id}
        mycol.delete_one(my_query)

        current_app.logger.info(f"Removed subscription: {subscription_id}")

    def get_event_subscriptions(self, event):
    def get_event_subscriptions(self, event):
        current_app.logger.info("get subscription from db")
        current_app.logger.info("get subscription from db")
        try:
        try:
@@ -42,3 +50,26 @@ class InternalEventOperations(Resource):
        except Exception as e:
        except Exception as e:
            current_app.logger.error("An exception occurred ::" + str(e))
            current_app.logger.error("An exception occurred ::" + str(e))
            return False
            return False
        
    def add_notification(self, notification):
        current_app.logger.info("Adding Notification to notifications list")
        try:
            mycol = self.db.get_col_by_name(self.db.notifications_col)
            mycol.insert_one(notification)
            current_app.logger.info("Notification added to notifications list")
        except Exception as e:
            current_app.logger.error("An exception occurred ::" + str(e))
            return False
    
    def update_report_nbr(self, subscription_id):
        current_app.logger.info("Incrementing report number")
        try:
            mycol = self.db.get_col_by_name(self.db.event_collection)
            my_query = {'subscription_id': subscription_id}
            result = mycol.update_one(my_query, {'$inc': {'report_nbr': 1}})
            current_app.logger.info(result)
            current_app.logger.info("Report number incremented")
        except Exception as e:
            current_app.logger.error("An exception occurred ::" + str(e))
            return False
+30 −1
Original line number Original line Diff line number Diff line
@@ -9,6 +9,7 @@ from encoder import CustomJSONEncoder
from flask import current_app
from flask import current_app
from models.event_notification import EventNotification
from models.event_notification import EventNotification
from util import serialize_clean_camel_case
from util import serialize_clean_camel_case
from datetime import datetime, timedelta, timezone


from .internal_event_ops import InternalEventOperations
from .internal_event_ops import InternalEventOperations


@@ -148,7 +149,35 @@ class Notifications():


                current_app.logger.debug(json.dumps(data.to_dict(),cls=CustomJSONEncoder))
                current_app.logger.debug(json.dumps(data.to_dict(),cls=CustomJSONEncoder))


                if return_negotiated_supp_feat_dict(sub["supported_features"])["EnhancedEventReport"] and sub.get("event_req", None):
                    current_app.logger.debug(f"Creating notification for {sub['subscription_id']}")

                    if sub["event_req"]["notif_method"] == "PERIODIC":
                        transcurred_time = (datetime.now(timezone.utc)-sub["created_at"]).total_seconds()
                        if transcurred_time > sub["event_req"]["rep_period"]:
                            transcurred_blocks = int(transcurred_time // sub["event_req"]["rep_period"])
                            next_report_time = sub["created_at"] + timedelta(seconds=((transcurred_blocks+1) * sub["event_req"]["rep_period"]))
                        else:
                            next_report_time = sub["created_at"] + timedelta(seconds=sub["event_req"]["rep_period"])

                        notification = {"notification": data.to_dict(), "next_report_time" : next_report_time, "url": url, "subscription_id": sub["subscription_id"]}

                        self.events_ops.add_notification(notification)
                        self.events_ops.update_report_nbr(sub["subscription_id"])

                    if sub["event_req"]["notif_method"] == "ONE_TIME":
                        asyncio.run(self.send(url, serialize_clean_camel_case(data)))
                        asyncio.run(self.send(url, serialize_clean_camel_case(data)))
                        self.events_ops.delete_subscription(sub["subscription_id"])
                    
                    if sub["event_req"].get("max_report_nbr", None) and sub["report_nbr"] + 1 == sub["event_req"].get("max_report_nbr", None):
                        current_app.logger.debug(f"Limit reached, deleting subscription {sub['subscription_id']}")
                        self.events_ops.delete_subscription(sub["subscription_id"])

                else:
                    asyncio.run(self.send(url, serialize_clean_camel_case(data)))
                    self.events_ops.update_report_nbr(sub["subscription_id"])

                


        except Exception as e:
        except Exception as e:
            current_app.logger.error("An exception occurred ::" + str(e))
            current_app.logger.error("An exception occurred ::" + str(e))
+3 −1
Original line number Original line Diff line number Diff line
@@ -21,7 +21,9 @@ class MongoDatabse():
        self.invoker_collection = self.config['mongo']['capif_invokers_col']
        self.invoker_collection = self.config['mongo']['capif_invokers_col']
        self.provider_collection = self.config['mongo']['capif_providers_col']
        self.provider_collection = self.config['mongo']['capif_providers_col']
        self.certs_col = self.config['mongo']['certs_col']
        self.certs_col = self.config['mongo']['certs_col']
        # self.acls_col = self.config['mongo']['capif_acls_col']
        self.notifications_col = self.config['mongo']['notifications_col']

        self.get_col_by_name(self.event_collection).create_index([("expire_at", 1)],expireAfterSeconds=0)


    def get_col_by_name(self, name):
    def get_col_by_name(self, name):
        return self.db[name].with_options(codec_options=CodecOptions(tz_aware=True))
        return self.db[name].with_options(codec_options=CodecOptions(tz_aware=True))
+1 −0
Original line number Original line Diff line number Diff line
@@ -7,6 +7,7 @@ mongo: {
  'capif_invokers_col': 'invokerdetails',
  'capif_invokers_col': 'invokerdetails',
  'capif_providers_col': 'providerenrolmentdetails',
  'capif_providers_col': 'providerenrolmentdetails',
  'capif_acls_col': 'acls',
  'capif_acls_col': 'acls',
  'notifications_col': 'notifications',
  'host': 'mongo',
  'host': 'mongo',
  'port': "27017"
  'port': "27017"
}
}
Loading