From 3c42a0e0ea3b046e9a75e343a1b47fc2dfd1a49a Mon Sep 17 00:00:00 2001 From: Pelayo Torres Date: Thu, 20 Mar 2025 13:23:08 +0100 Subject: [PATCH 1/2] Celery and evtReq --- .../capif_events/app.py | 9 +- .../capif_events/core/events_apis.py | 44 +++- .../capif_events/core/internal_event_ops.py | 77 ++++++ .../capif_events/core/notifications.py | 223 +++++++++--------- .../capif_events/db/db.py | 1 + services/TS29222_CAPIF_Events_API/config.yaml | 1 + services/celery/Dockerfile | 21 ++ services/celery/config.py | 20 ++ services/celery/config.yaml | 8 + services/celery/requirements.txt | 7 + services/celery/tasks.py | 146 ++++++++++++ services/docker-compose-capif.yml | 38 ++- 12 files changed, 483 insertions(+), 112 deletions(-) create mode 100644 services/celery/Dockerfile create mode 100644 services/celery/config.py create mode 100644 services/celery/config.yaml create mode 100644 services/celery/requirements.txt create mode 100644 services/celery/tasks.py diff --git a/services/TS29222_CAPIF_Events_API/capif_events/app.py b/services/TS29222_CAPIF_Events_API/capif_events/app.py index ef1d8c3a..abe68507 100644 --- a/services/TS29222_CAPIF_Events_API/capif_events/app.py +++ b/services/TS29222_CAPIF_Events_API/capif_events/app.py @@ -7,6 +7,7 @@ from logging.handlers import RotatingFileHandler import connexion from flask_apscheduler import APScheduler +from apscheduler.jobstores.mongodb import MongoDBJobStore from flask_executor import Executor from flask_jwt_extended import JWTManager from fluent import sender @@ -123,7 +124,7 @@ app.add_api('openapi.yaml', -config = Config() +config = Config().get_config() notifications = Notifications() jwt = JWTManager(app.app) @@ -131,14 +132,14 @@ configure_logging(app.app) monitoring_value = os.environ.get("MONITORING", "").lower() if monitoring_value == "true": - configure_monitoring(app.app, config.get_config()) + configure_monitoring(app.app, config) -config = Config() executor = Executor(app.app) subscriber = Subscriber() + scheduler = APScheduler() scheduler.init_app(app.app) -scheduler.start() +scheduler.start() @scheduler.task('date', id='listener', next_run_time=datetime.now()) def up_listener(): diff --git a/services/TS29222_CAPIF_Events_API/capif_events/core/events_apis.py b/services/TS29222_CAPIF_Events_API/capif_events/core/events_apis.py index d6b0bdf2..55fdd131 100644 --- a/services/TS29222_CAPIF_Events_API/capif_events/core/events_apis.py +++ b/services/TS29222_CAPIF_Events_API/capif_events/core/events_apis.py @@ -9,6 +9,7 @@ from .auth_manager import AuthManager from .resources import Resource from .responses import internal_server_error, not_found_error, make_response, bad_request_error from ..util import serialize_clean_camel_case, clean_empty, dict_to_camel_case +from datetime import datetime class EventSubscriptionsOperations(Resource): @@ -87,14 +88,39 @@ class EventSubscriptionsOperations(Resource): return result + subscription_id = secrets.token_hex(15) + # Check if EnhancedEventReport is enabled and validate event filters if EventSubscription.return_supp_feat_dict(event_subscription.supported_features)["EnhancedEventReport"]: if event_subscription.event_filters: - current_app.logger.debug(event_subscription.event_filters) + current_app.logger.debug(f"Event Filters: {event_subscription.event_filters}") result = self.__check_event_filters(event_subscription.events, clean_empty(event_subscription.to_dict()["event_filters"])) if isinstance(result, Response): return result + + if event_subscription.event_req: + current_app.logger.debug(f"Event Requirement: {event_subscription.event_req}") + + if event_subscription.event_req.notif_method is None: + event_subscription.event_req["notif_method"] = "ON_EVENT_DETECTION" + + elif event_subscription.event_req.notif_method == "PERIODIC" and event_subscription.event_req.rep_period is None: + current_app.logger.error("Notif method provided is PERIODIC but rep_period is None") + return bad_request_error( + detail="Bad Param", + cause="Notif method provided is PERIODIC but rep_period is None", + invalid_params=[{"param": "repPeriod", "reason": "repPeriod is necessary if notifMethod is PERIODIC"}] + ) + + if event_subscription.event_req.mon_dur is None and event_subscription.event_req.mon_dur > datetime.now(): + current_app.logger.error("monDur provided is invalid") + return bad_request_error( + detail="Bad Param", + cause="monDur provided is ", + invalid_params=[{"param": "repPeriod", "reason": "repPeriod is necessary if notifMethod is PERIODIC"}] + ) + else: if event_subscription.event_filters: current_app.logger.error("Event filters provided but EnhancedEventReport is not enabled") @@ -103,12 +129,19 @@ class EventSubscriptionsOperations(Resource): cause="Event filters provided but EnhancedEventReport is not enabled", invalid_params=[{"param": "eventFilters", "reason": "EnhancedEventReport is not enabled"}] ) + if event_subscription.event_req: + current_app.logger.error("Event requirements provided but EnhancedEventReport is not enabled") + return bad_request_error( + detail="Bad Param", + cause="Event requirements provided but EnhancedEventReport is not enabled", + invalid_params=[{"param": "eventReq", "reason": "EnhancedEventReport is not enabled"}] + ) # Generate subscriptionID - subscription_id = secrets.token_hex(15) evnt = dict() evnt["subscriber_id"] = subscriber_id evnt["subscription_id"] = subscription_id + evnt["report_nbr"] = 0 evnt.update(event_subscription.to_dict()) mycol.insert_one(evnt) @@ -148,6 +181,13 @@ class EventSubscriptionsOperations(Resource): return not_found_error(detail="Event subscription not exist", cause="Event API subscription id not found") mycol.delete_one(my_query) + + event_req = eventdescription.get("event_req", None) + if event_req: + if event_req.get["notif_method"] == "PERIODIC": + mycol = self.db.get_col_by_name(self.db.celery_col) + mycol.delete_one(my_query) + current_app.logger.debug("Event subscription removed from database") self.auth_manager.remove_auth_event(subscription_id, subscriber_id) diff --git a/services/TS29222_CAPIF_Events_API/capif_events/core/internal_event_ops.py b/services/TS29222_CAPIF_Events_API/capif_events/core/internal_event_ops.py index f9daf008..d67adab5 100644 --- a/services/TS29222_CAPIF_Events_API/capif_events/core/internal_event_ops.py +++ b/services/TS29222_CAPIF_Events_API/capif_events/core/internal_event_ops.py @@ -2,6 +2,7 @@ from flask import current_app from .auth_manager import AuthManager from .resources import Resource +from datetime import datetime class InternalEventOperations(Resource): @@ -21,6 +22,20 @@ class InternalEventOperations(Resource): #We dont need remove all auth events, becase when invoker is removed, remove auth entry #self.auth_manager.remove_auth_all_event(subscriber_id) + + def delete_event(self, subscription): + + mycol = self.db.get_col_by_name(self.db.event_collection) + my_query = {'subscription_id': subscription["subscription_id"]} + mycol.delete_one(my_query) + + event_req = subscription.get("event_req", None) + if event_req: + if event_req.get["notif_method"] == "PERIODIC": + mycol = self.db.get_col_by_name(self.db.celery) + mycol.delete_one(my_query) + + current_app.logger.info(f"Removed event subscription: {subscription["subscription_id"]}") def get_event_subscriptions(self, event): current_app.logger.info("get subscription from db") @@ -42,3 +57,65 @@ class InternalEventOperations(Resource): except Exception as e: current_app.logger.error("An exception occurred ::" + str(e)) return False + + def get_event_subscription(self, subscription_id): + current_app.logger.info("get subscription from db") + try: + mycol = self.db.get_col_by_name(self.db.event_collection) + query={'subscription_id':{'$in':[subscription_id]}} + subscriptions = mycol.find(query) + + if subscriptions is None: + current_app.logger.error("Not found event subscriptions") + + else: + json_docs=[] + for subscription in subscriptions: + json_docs.append(subscription) + + return json_docs + + except Exception as e: + current_app.logger.error("An exception occurred ::" + str(e)) + return False + + + def add_notification(self, subscription_id, notification): + current_app.logger.info("add notification to db") + try: + mycol = self.db.get_col_by_name(self.db.celery_col) + query={'subscription_id':{'$in':[subscription_id]}} + subscriptions = mycol.find_one(query) + current_app.logger.debug(subscriptions) + + if subscriptions is None: + current_app.logger.debug(f"Creating a new accumulated notifications for subscription: {subscription_id}") + + sub_notifications = { + "subscription_id": subscription_id, + "next_run_time": datetime.now(), + "report_nbr": 0, + "notifications": [notification] + } + mycol.insert_one(sub_notifications) + else: + mycol.update_one({"subscription_id": subscription_id}, {"$push": {"notifications": notification}}) + current_app.logger.info("Notification added to db") + except Exception as e: + current_app.logger.error("An exception occurred ::" + str(e)) + return False + + def update_report_number(self, subscription_id, report_nbr): + current_app.logger.info(f"Updating report number for subscription {subscription_id}") + try: + mycol = self.db.get_col_by_name(self.db.event_collection) + query={'subscription_id':{'$in':[subscription_id]}} + subscription = mycol.find_one(query) + current_app.logger.debug(subscription) + + if subscription: + mycol.update_one({"subscription_id": subscription_id}, {"$set": {"report_nbr": report_nbr}}) + current_app.logger.info("Report number updated") + except Exception as e: + current_app.logger.error("An exception occurred ::" + str(e)) + return False diff --git a/services/TS29222_CAPIF_Events_API/capif_events/core/notifications.py b/services/TS29222_CAPIF_Events_API/capif_events/core/notifications.py index ab769e51..f0bccc13 100644 --- a/services/TS29222_CAPIF_Events_API/capif_events/core/notifications.py +++ b/services/TS29222_CAPIF_Events_API/capif_events/core/notifications.py @@ -10,6 +10,7 @@ from flask import current_app from models.event_notification import EventNotification from models.event_subscription import EventSubscription from util import serialize_clean_camel_case +from datetime import datetime from .internal_event_ops import InternalEventOperations @@ -29,117 +30,129 @@ class Notifications(): current_app.logger.info("Received event " + event + ", sending notifications") subscriptions = self.events_ops.get_event_subscriptions(event) current_app.logger.info(subscriptions) + event_detail_redis=redis_event.get('event_detail', None) for sub in subscriptions: - url = sub["notification_destination"] - current_app.logger.debug(url) - data = EventNotification(sub["subscription_id"], events=event) - event_detail_redis=redis_event.get('event_detail', None) - if event_detail_redis is not None: - if EventSubscription.return_supp_feat_dict(sub["supported_features"])["EnhancedEventReport"]: - event_detail={} - current_app.logger.debug(f"event: {event_detail_redis}") - - event_filters = sub.get("event_filters", None) - event_filter=None - if event_filters: - try: - event_filter = None if all(value is None for value in event_filters[sub.get("events", []).index(event)].values()) else event_filters[sub.get("events", []).index(event)] - current_app.logger.debug(f"Event filters: {event_filter}") - except IndexError: - event_filter=None - - if event in ["SERVICE_API_AVAILABLE", "SERVICE_API_UNAVAILABLE"]: - if event_filter: - api_ids_list = event_filter.get("api_ids", None) - if api_ids_list and event_detail_redis.get('apiIds', None)[0] in api_ids_list: - event_detail["apiIds"]=event_detail_redis.get('apiIds', None) - if EventSubscription.return_supp_feat_dict(sub["supported_features"])["ApiStatusMonitoring"]: - event_detail["serviceAPIDescriptions"]=event_detail_redis.get('serviceAPIDescriptions', None) - else: - continue - else: - event_detail["apiIds"]=event_detail_redis.get('apiIds', None) - if EventSubscription.return_supp_feat_dict(sub["supported_features"])["ApiStatusMonitoring"]: - event_detail["serviceAPIDescriptions"]=event_detail_redis.get('serviceAPIDescriptions', None) - elif event in ["SERVICE_API_UPDATE"]: - if event_filter: - api_ids_list = event_filter.get("api_ids", None) - if api_ids_list and event_detail_redis.get('serviceAPIDescriptions', {})[0].get('apiId') in api_ids_list: - event_detail["serviceAPIDescriptions"]=event_detail_redis.get('serviceAPIDescriptions', None) - else: - continue - else: - event_detail["serviceAPIDescriptions"]=event_detail_redis.get('serviceAPIDescriptions', None) - elif event in ["API_INVOKER_ONBOARDED", "API_INVOKER_OFFBOARDED", "API_INVOKER_UPDATED"]: - if event_filter: - invoker_ids_list = event_filter.get("api_invoker_ids", None) - if invoker_ids_list and event_detail_redis.get('apiInvokerIds', None)[0] in invoker_ids_list: - event_detail["apiInvokerIds"]=event_detail_redis.get('apiInvokerIds', None) - else: - continue - else: - event_detail["apiInvokerIds"]=event_detail_redis.get('apiInvokerIds', None) - elif event in ["ACCESS_CONTROL_POLICY_UPDATE"]: - if event_filter: - filter_invoker_ids = event_filter.get("api_invoker_ids", []) - filter_api_ids = event_filter.get("api_ids", []) - - invoker_ids_list = [invoker.get("apiInvokerId") for invoker in event_detail_redis.get("accCtrlPolList", None).get("apiInvokerPolicies")] - api_id = event_detail_redis.get("accCtrlPolList").get("apiId", None) - - if (filter_api_ids and not filter_invoker_ids) and (api_id in filter_api_ids): - event_detail["accCtrlPolList"]=event_detail_redis.get('accCtrlPolList', None) - elif (not filter_api_ids and filter_invoker_ids) and bool(set(filter_invoker_ids) & set(invoker_ids_list)): - event_detail["accCtrlPolList"]=event_detail_redis.get('accCtrlPolList', None) - elif (filter_api_ids and filter_invoker_ids) and bool(set(filter_invoker_ids) & set(invoker_ids_list)) and api_id in filter_api_ids: - event_detail["accCtrlPolList"]=event_detail_redis.get('accCtrlPolList', None) - else: - continue - else: - event_detail["accCtrlPolList"]=event_detail_redis.get('accCtrlPolList', None) - elif event in ["SERVICE_API_INVOCATION_SUCCESS", "SERVICE_API_INVOCATION_FAILURE"]: - if event_filter: - filter_invoker_ids = event_filter.get("api_invoker_ids", None) - filter_api_ids = event_filter.get("api_ids", None) - filter_aef_ids = event_filter.get("aef_ids", None) - - invoker_id = event_detail_redis.get("invocationLogs", None)[0].get("api_invoker_id", None) - aef_id = event_detail_redis.get("invocationLogs", None)[0].get("aef_id", None) - api_id = event_detail_redis.get("invocationLogs", None)[0].get("logs", None)[0].get("api_id", None) - - if (filter_api_ids and not filter_invoker_ids and not filter_aef_ids) and (api_id in filter_api_ids): - event_detail["invocationLogs"]=event_detail_redis.get('invocationLogs', None) - elif (not filter_api_ids and filter_invoker_ids and not filter_aef_ids) and invoker_id in filter_invoker_ids: - event_detail["invocationLogs"]=event_detail_redis.get('invocationLogs', None) - elif (not filter_api_ids and not filter_invoker_ids and filter_aef_ids) and aef_id in filter_aef_ids: - event_detail["invocationLogs"]=event_detail_redis.get('invocationLogs', None) - elif (filter_api_ids and filter_invoker_ids and not filter_aef_ids) and (api_id in filter_api_ids) and invoker_id in filter_invoker_ids: - event_detail["invocationLogs"]=event_detail_redis.get('invocationLogs', None) - elif (filter_api_ids and not filter_invoker_ids and filter_aef_ids) and (api_id in filter_api_ids) and aef_id in filter_aef_ids: - event_detail["invocationLogs"]=event_detail_redis.get('invocationLogs', None) - elif (not filter_api_ids and filter_invoker_ids and filter_aef_ids) and invoker_id in filter_invoker_ids and aef_id in filter_aef_ids: - event_detail["invocationLogs"]=event_detail_redis.get('invocationLogs', None) - elif (filter_api_ids and filter_invoker_ids and filter_aef_ids) and (api_id in filter_api_ids) and invoker_id in filter_invoker_ids and aef_id in filter_aef_ids: - event_detail["invocationLogs"]=event_detail_redis.get('invocationLogs', None) - else: - continue - - else: - event_detail["invocationLogs"]=event_detail_redis.get('invocationLogs', None) - elif event in ["API_TOPOLOGY_HIDING_CREATED", "API_TOPOLOGY_HIDING_REVOKED"]: - event_detail["apiTopoHide"]=event_detail_redis.get('apiTopoHide', None) - - current_app.logger.debug(event_detail) - data.event_detail=event_detail - - current_app.logger.debug(json.dumps(data.to_dict(),cls=CustomJSONEncoder)) - - asyncio.run(self.send(url, serialize_clean_camel_case(data))) + + data = self.make_notification(event, sub, event_detail_redis) + + if EventSubscription.return_supp_feat_dict(sub.get("supported_features", None))["EnhancedEventReport"]: + event_req = sub.get("event_req", None) + if event_req: + if event_req.get("notif_method", None) == "PERIODIC": + self.events_ops.add_notification(sub["subscription_id"], serialize_clean_camel_case(data)) + elif (event_req.get("max_report_nbr", None) != None and sub["report_nbr"] == event_req.get("max_report_nbr", None)) or (event_req.get("mon_dur", None) != None and datetime.now() > event_req.get("mon_dur", None)): + url = sub["notification_destination"] + current_app.logger.debug(f"Subscription URL: {url}") + asyncio.run(self.send(url, serialize_clean_camel_case(data))) + new_report_nbr = sub["report_nbr"]+1 + self.events_ops.update_report_number(sub, new_report_nbr) + + if (event_req.get("max_report_nbr", None) != None and new_report_nbr == event_req.get("max_report_nbr", None)): + self.events_ops.delete_event(sub) + + else: + url = sub["notification_destination"] + current_app.logger.debug(f"Subscription URL: {url}") + asyncio.run(self.send(url, serialize_clean_camel_case(data))) except Exception as e: current_app.logger.error("An exception occurred ::" + str(e)) return False + + def make_notification(self, event, sub, event_details): + data = EventNotification(sub["subscription_id"], events=event) + if event_details is not None: + if EventSubscription.return_supp_feat_dict(sub["supported_features"])["EnhancedEventReport"]: + event_detail={} + current_app.logger.debug(f"event: {event_details}") + + event_filters = sub.get("event_filters", None) + event_filter=None + if event_filters: + try: + event_filter = None if all(value is None for value in event_filters[sub.get("events", []).index(event)].values()) else event_filters[sub.get("events", []).index(event)] + current_app.logger.debug(f"Event filters: {event_filter}") + except IndexError: + event_filter=None + + if event in ["SERVICE_API_AVAILABLE", "SERVICE_API_UNAVAILABLE"]: + if event_filter: + api_ids_list = event_filter.get("api_ids", None) + if api_ids_list and event_details.get('apiIds', None)[0] in api_ids_list: + event_detail["apiIds"]=event_details.get('apiIds', None) + if EventSubscription.return_supp_feat_dict(sub["supported_features"])["ApiStatusMonitoring"]: + event_detail["serviceAPIDescriptions"]=event_details.get('serviceAPIDescriptions', None) + else: + event_detail["apiIds"]=event_details.get('apiIds', None) + if EventSubscription.return_supp_feat_dict(sub["supported_features"])["ApiStatusMonitoring"]: + event_detail["serviceAPIDescriptions"]=event_details.get('serviceAPIDescriptions', None) + elif event in ["SERVICE_API_UPDATE"]: + if event_filter: + api_ids_list = event_filter.get("api_ids", None) + if api_ids_list and event_details.get('serviceAPIDescriptions', {})[0].get('apiId') in api_ids_list: + event_detail["serviceAPIDescriptions"]=event_details.get('serviceAPIDescriptions', None) + else: + event_detail["serviceAPIDescriptions"]=event_details.get('serviceAPIDescriptions', None) + elif event in ["API_INVOKER_ONBOARDED", "API_INVOKER_OFFBOARDED", "API_INVOKER_UPDATED"]: + if event_filter: + invoker_ids_list = event_filter.get("api_invoker_ids", None) + if invoker_ids_list and event_details.get('apiInvokerIds', None)[0] in invoker_ids_list: + event_detail["apiInvokerIds"]=event_details.get('apiInvokerIds', None) + else: + event_detail["apiInvokerIds"]=event_details.get('apiInvokerIds', None) + elif event in ["ACCESS_CONTROL_POLICY_UPDATE"]: + if event_filter: + filter_invoker_ids = event_filter.get("api_invoker_ids", []) + filter_api_ids = event_filter.get("api_ids", []) + + invoker_ids_list = [invoker.get("apiInvokerId") for invoker in event_details.get("accCtrlPolList", None).get("apiInvokerPolicies")] + api_id = event_details.get("accCtrlPolList").get("apiId", None) + + if (filter_api_ids and not filter_invoker_ids) and (api_id in filter_api_ids): + event_detail["accCtrlPolList"]=event_details.get('accCtrlPolList', None) + elif (not filter_api_ids and filter_invoker_ids) and bool(set(filter_invoker_ids) & set(invoker_ids_list)): + event_detail["accCtrlPolList"]=event_details.get('accCtrlPolList', None) + elif (filter_api_ids and filter_invoker_ids) and bool(set(filter_invoker_ids) & set(invoker_ids_list)) and api_id in filter_api_ids: + event_detail["accCtrlPolList"]=event_details.get('accCtrlPolList', None) + else: + event_detail["accCtrlPolList"]=event_details.get('accCtrlPolList', None) + elif event in ["SERVICE_API_INVOCATION_SUCCESS", "SERVICE_API_INVOCATION_FAILURE"]: + if event_filter: + filter_invoker_ids = event_filter.get("api_invoker_ids", None) + filter_api_ids = event_filter.get("api_ids", None) + filter_aef_ids = event_filter.get("aef_ids", None) + + invoker_id = event_details.get("invocationLogs", None)[0].get("api_invoker_id", None) + aef_id = event_details.get("invocationLogs", None)[0].get("aef_id", None) + api_id = event_details.get("invocationLogs", None)[0].get("logs", None)[0].get("api_id", None) + + if (filter_api_ids and not filter_invoker_ids and not filter_aef_ids) and (api_id in filter_api_ids): + event_detail["invocationLogs"]=event_details.get('invocationLogs', None) + elif (not filter_api_ids and filter_invoker_ids and not filter_aef_ids) and invoker_id in filter_invoker_ids: + event_detail["invocationLogs"]=event_details.get('invocationLogs', None) + elif (not filter_api_ids and not filter_invoker_ids and filter_aef_ids) and aef_id in filter_aef_ids: + event_detail["invocationLogs"]=event_details.get('invocationLogs', None) + elif (filter_api_ids and filter_invoker_ids and not filter_aef_ids) and (api_id in filter_api_ids) and invoker_id in filter_invoker_ids: + event_detail["invocationLogs"]=event_details.get('invocationLogs', None) + elif (filter_api_ids and not filter_invoker_ids and filter_aef_ids) and (api_id in filter_api_ids) and aef_id in filter_aef_ids: + event_detail["invocationLogs"]=event_details.get('invocationLogs', None) + elif (not filter_api_ids and filter_invoker_ids and filter_aef_ids) and invoker_id in filter_invoker_ids and aef_id in filter_aef_ids: + event_detail["invocationLogs"]=event_details.get('invocationLogs', None) + elif (filter_api_ids and filter_invoker_ids and filter_aef_ids) and (api_id in filter_api_ids) and invoker_id in filter_invoker_ids and aef_id in filter_aef_ids: + event_detail["invocationLogs"]=event_details.get('invocationLogs', None) + + else: + event_detail["invocationLogs"]=event_details.get('invocationLogs', None) + elif event in ["API_TOPOLOGY_HIDING_CREATED", "API_TOPOLOGY_HIDING_REVOKED"]: + event_detail["apiTopoHide"]=event_details.get('apiTopoHide', None) + + current_app.logger.debug(event_detail) + data.event_detail=event_detail + + current_app.logger.debug(json.dumps(data.to_dict(),cls=CustomJSONEncoder)) + return data + def request_post(self, url, data): headers = {'content-type': 'application/json'} diff --git a/services/TS29222_CAPIF_Events_API/capif_events/db/db.py b/services/TS29222_CAPIF_Events_API/capif_events/db/db.py index adfc472a..d6ca59b6 100644 --- a/services/TS29222_CAPIF_Events_API/capif_events/db/db.py +++ b/services/TS29222_CAPIF_Events_API/capif_events/db/db.py @@ -21,6 +21,7 @@ class MongoDatabse(): self.invoker_collection = self.config['mongo']['capif_invokers_col'] self.provider_collection = self.config['mongo']['capif_providers_col'] self.certs_col = self.config['mongo']['certs_col'] + self.celery_col = self.config['mongo']['celery_col'] # self.acls_col = self.config['mongo']['capif_acls_col'] def get_col_by_name(self, name): diff --git a/services/TS29222_CAPIF_Events_API/config.yaml b/services/TS29222_CAPIF_Events_API/config.yaml index 101d300e..f5cea138 100644 --- a/services/TS29222_CAPIF_Events_API/config.yaml +++ b/services/TS29222_CAPIF_Events_API/config.yaml @@ -2,6 +2,7 @@ mongo: { 'user': 'root', 'password': 'example', 'db': 'capif', + 'celery_col': 'celery', 'col': 'eventsdetails', 'certs_col': "certs", 'capif_invokers_col': 'invokerdetails', diff --git a/services/celery/Dockerfile b/services/celery/Dockerfile new file mode 100644 index 00000000..40be47b7 --- /dev/null +++ b/services/celery/Dockerfile @@ -0,0 +1,21 @@ +# celery/Dockerfile +FROM python:3.9-slim + +WORKDIR /app + +# Copia e instala las dependencias +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +# Copia el código de Celery (tasks.py, etc.) +COPY . . + +# Comando que depende de la variable ROLE +CMD [ "sh", "-c", "\ + if [ \"$ROLE\" = 'worker' ]; then \ + celery -A tasks worker --loglevel=info; \ + elif [ \"$ROLE\" = 'beat' ]; then \ + celery -A tasks beat --loglevel=info; \ + else \ + echo 'No ROLE specified, aborting'; \ + fi" ] diff --git a/services/celery/config.py b/services/celery/config.py new file mode 100644 index 00000000..60d542a2 --- /dev/null +++ b/services/celery/config.py @@ -0,0 +1,20 @@ +import os + +import yaml + + +#Config class to get config +class Config: + def __init__(self): + self.cached = 0 + self.file="./config.yaml" + self.my_config = {} + stamp = os.stat(self.file).st_mtime + if stamp != self.cached: + self.cached = stamp + f = open(self.file) + self.my_config = yaml.safe_load(f) + f.close() + + def get_config(self): + return self.my_config diff --git a/services/celery/config.yaml b/services/celery/config.yaml new file mode 100644 index 00000000..1e69a651 --- /dev/null +++ b/services/celery/config.yaml @@ -0,0 +1,8 @@ +mongo: { + 'user': 'root', + 'password': 'example', + 'db': 'capif', + 'celery': 'celery', + 'host': 'mongo', + 'port': "27017" +} diff --git a/services/celery/requirements.txt b/services/celery/requirements.txt new file mode 100644 index 00000000..8212e485 --- /dev/null +++ b/services/celery/requirements.txt @@ -0,0 +1,7 @@ +celery==5.2.7 +pymongo==4.2.0 +redis==4.5.4 +aiohttp == 3.10.5 +async-timeout == 4.0.3 +pyyaml == 6.0.2 +python_dateutil >= 2.6.0 diff --git a/services/celery/tasks.py b/services/celery/tasks.py new file mode 100644 index 00000000..f1137a9a --- /dev/null +++ b/services/celery/tasks.py @@ -0,0 +1,146 @@ +# celery/tasks.py +from celery import Celery +from datetime import datetime, timedelta +import pymongo +import os +from bson.codec_options import CodecOptions +from config import Config +import aiohttp +import asyncio +from dateutil import parser + +# Celery Configuration +celery = Celery( + "notifications", + broker=os.environ.get("CELERY_BROKER_URL", "redis://redis:6379/0"), + backend=os.environ.get("CELERY_RESULT_BACKEND", "redis://redis:6379/0") +) + +# MongoDB Connection +config = Config().get_config() + +def get_mongo_client(): + mongo_uri = f"mongodb://{config['mongo']['user']}:{config['mongo']['password']}@" \ + f"{config['mongo']['host']}:{config['mongo']['port']}" + client = pymongo.MongoClient(mongo_uri) + celery_col = client[config['mongo']['db']][config['mongo']['celery']].with_options(codec_options=CodecOptions(tz_aware=True)) + events_col = client[config['mongo']['db']]["eventsdetails"].with_options(codec_options=CodecOptions(tz_aware=True)) + + return celery_col, events_col + +def serialize_clean_camel_case(obj): + res = obj.to_dict() + res = clean_empty(res) + res = dict_to_camel_case(res) + + return res + + +def clean_empty(d): + if isinstance(d, dict): + return { + k: v + for k, v in ((k, clean_empty(v)) for k, v in d.items()) + if v is not None or (isinstance(v, list) and len(v) == 0) + } + if isinstance(d, list): + return [v for v in map(clean_empty, d) if v is not None] + return d + + +def dict_to_camel_case(my_dict): + + + result = {} + + for attr, value in my_dict.items(): + + if len(attr.split('_')) != 1: + my_key = ''.join(word.title() for word in attr.split('_')) + my_key = ''.join([my_key[0].lower(), my_key[1:]]) + else: + my_key = attr + + if my_key == "serviceApiCategory": + my_key = "serviceAPICategory" + elif my_key == "serviceApiDescriptions": + my_key = "serviceAPIDescriptions" + + if isinstance(value, list): + result[my_key] = list(map( + lambda x: dict_to_camel_case(x) if isinstance(x, dict) else x, value )) + + elif hasattr(value, "to_dict"): + result[my_key] = dict_to_camel_case(value) + + elif isinstance(value, dict): + value = dict_to_camel_case(value) + result[my_key] = value + else: + result[my_key] = value + + return result + +async def send_request(url, data): + async with aiohttp.ClientSession() as session: + timeout = aiohttp.ClientTimeout(total=10) + headers = {'content-type': 'application/json'} + async with session.post(url, json=data, timeout=timeout, headers=headers) as response: + return await response.text() + +async def send(url, data): + try: + response = await send_request(url, data) + print(response) + except asyncio.TimeoutError: + print("Timeout: Request timeout") + except Exception as e: + print("An exception occurred sending notification::" + str(e)) + return False + +@celery.task +def send_notification(sub_notifications): + + notifications = sub_notifications.get("notifications", None) + celery_col, events_col = get_mongo_client() + subscription = events_col.find_one({"subscription_id": sub_notifications["subscription_id"]}) + event_req = subscription.get("event_req", None) + if notifications != []: + if event_req: + if not ((event_req.get("max_report_nbr", None) != None and sub_notifications["report_nbr"] == event_req.get("max_report_nbr", None)) or (event_req.get("mon_dur", None) != None and sub_notifications["next_run_time"] > event_req.get("mon_dur", None))): + url = subscription["notification_destination"] + print(f"Subscription URL: {url}") + for notification in notifications: + asyncio.run(send(url, notification)) + sub_notifications["report_nbr"] += 1 + sub_notifications["notifications"] = [] + + sub_notifications["next_run_time"] = sub_notifications["next_run_time"] + timedelta(seconds=event_req.get("rep_period", None)) + + if (event_req.get("max_report_nbr", None) != None and sub_notifications["report_nbr"] == event_req.get("max_report_nbr", None)) or (event_req.get("mon_dur", None) != None and sub_notifications["next_run_time"] > event_req.get("mon_dur", None)): + print(f"Limit reached, deleting subscription {sub_notifications['subscription_id']}") + celery_col.delete_one({"subscription_id": sub_notifications["subscription_id"]}) + events_col.delete_one({"subscription_id": sub_notifications["subscription_id"]}) + return + + celery_col.update_one({"subscription_id": sub_notifications["subscription_id"]}, {"$set": sub_notifications}) + return + +@celery.task +def process_pending_subscriptions(): + celery_col, events_col= get_mongo_client() + pending_subs = celery_col.find({"next_run_time": {"$lte": datetime.utcnow()}}, {"_id":0}) + for sub in pending_subs: + print(sub["subscription_id"]) + send_notification.delay(sub) + +celery.conf.task_routes = { + "tasks.process_pending_subscriptions": {"queue": "scheduler_queue"}, +} + +celery.conf.beat_schedule = { + "process-subscriptions-every-second": { + "task": "tasks.process_pending_subscriptions", + "schedule": 1.0, # every 1 second + }, +} diff --git a/services/docker-compose-capif.yml b/services/docker-compose-capif.yml index 6b5a504c..0372ae1f 100644 --- a/services/docker-compose-capif.yml +++ b/services/docker-compose-capif.yml @@ -9,7 +9,43 @@ services: - ${SERVICES_DIR}/redis.conf:/usr/local/etc/redis/redis.conf environment: - REDIS_REPLICATION_MODE=master - + + celery_worker: + build: + context: ./celery + dockerfile: Dockerfile + environment: + - ROLE=worker + - MONGO_URI=mongodb://root:example@mongo:27017/capif + - CELERY_BROKER_URL=redis://redis:6379/0 + - CELERY_RESULT_BACKEND=redis://redis:6379/0 + depends_on: + - redis + - mongo + + celery_scheduler: + build: + context: ./celery + dockerfile: Dockerfile + environment: + - ROLE=worker + command: celery -A tasks worker --queues scheduler_queue --loglevel=info + depends_on: + - mongo + - redis + + celery_beat: + build: + context: ./celery + dockerfile: Dockerfile + environment: + - ROLE=beat + - MONGO_URI=mongodb://mongo:27017/events_db + - CELERY_BROKER_URL=redis://redis:6379/0 + - CELERY_RESULT_BACKEND=redis://redis:6379/0 + depends_on: + - mongo + - redis helper: build: -- GitLab From ae104072ae2c93720b056dbe72616958532e2645 Mon Sep 17 00:00:00 2001 From: Jorge Moratinos Salcines Date: Mon, 24 Mar 2025 16:02:52 +0100 Subject: [PATCH 2/2] Event Req Example for testing --- tests/features/Event Filter/event_req.robot | 460 ++++++++++++++++++++ tests/libraries/api_events/bodyRequests.py | 15 + 2 files changed, 475 insertions(+) create mode 100644 tests/features/Event Filter/event_req.robot diff --git a/tests/features/Event Filter/event_req.robot b/tests/features/Event Filter/event_req.robot new file mode 100644 index 00000000..ee6ca900 --- /dev/null +++ b/tests/features/Event Filter/event_req.robot @@ -0,0 +1,460 @@ +*** Settings *** +Resource /opt/robot-tests/tests/resources/common.resource +Library /opt/robot-tests/tests/libraries/bodyRequests.py +Library XML +Library String +Resource /opt/robot-tests/tests/resources/common/basicRequests.robot +Resource ../../resources/common.resource +Resource ../../resources/common/basicRequests.robot + +Suite Teardown Reset Testing Environment +Test Setup Reset Testing Environment +Test Teardown Reset Testing Environment + + +*** Variables *** +${API_INVOKER_NOT_REGISTERED} not-valid +${SUBSCRIBER_ID_NOT_VALID} not-valid +${SUBSCRIPTION_ID_NOT_VALID} not-valid + + +*** Test Cases *** +Invoker subscribe to Service API Available + [Tags] pelayo-1 mockserver + + # Initialize Mock server + Init Mock Server + + # Default Invoker Registration and Onboarding + ${register_user_info_invoker} ${url} ${request_body}= Invoker Default Onboarding + + # Create Provider1 with 2 AEF roles and publish API + ${register_user_info_provider_1}= Provider Default Registration + ${aef_id_1}= Set Variable ${register_user_info_provider_1['aef_roles']['${AEF_PROVIDER_USERNAME}']['aef_id']} + ${aef_empty_list}= Create List + + # Subscribe to events and setup event filter with api_id + ${events_list}= Create List SERVICE_API_AVAILABLE + ${event_req}= Create Event Req notif_method=PERIODIC max_report_nbr=${5} rep_period=${300} + + ${subscription_ids}= Create List + + FOR ${counter} IN RANGE 1 100 1 + Log ${counter} + ${subscription_id}= + ... Subscribe invoker ${register_user_info_invoker} to events ${events_list} with event req ${event_req} + Append To List ${subscription_ids} ${subscription_id} + END + + ## Publish API service_1 with 2 aefIds + ${service_api_description_published_1} ${resource_url} ${request_body}= Publish Service Api + ... ${register_user_info_provider_1} + ... service_1 + ... aef_id=${aef_id_1} + ... supported_features=020 + + + # Remove Providers + ## Remove Provider1 + ${resp}= Delete Request Capif + ... ${resource_url.path} + ... server=${CAPIF_HTTPS_URL} + ... verify=ca.crt + ... username=${APF_PROVIDER_USERNAME} + Status Should Be 204 ${resp} + + # Sleep 320s + + ${resp}= Get Mock Server Messages + + ${notification_events_on_mock_server}= Set Variable ${resp.json()} + + + [Tags] event_filter-7 mockserver smoke + + # Initialize Mock server + Init Mock Server + + # Register Providers + ## Default Provider 1 Registration + ${register_user_info_provider_1}= Provider Default Registration provider_username=${PROVIDER_USERNAME}_1 + ${aef_id_1}= Set Variable + ... ${register_user_info_provider_1['aef_roles']['${AEF_PROVIDER_USERNAME}_1']['aef_id']} + + ## Publish service_1 API + ${service_api_description_published_1} + ... ${provider_resource_url_1} + ... ${provider_request_body_1}= + ... Publish Service Api + ... ${register_user_info_provider_1} + ... service_name=service_1 + + ## Default Provider 2 Registration + ${register_user_info_provider_2}= Provider Default Registration provider_username=${PROVIDER_USERNAME}_2 + ${aef_id_2}= Set Variable + ... ${register_user_info_provider_2['aef_roles']['${AEF_PROVIDER_USERNAME}_2']['aef_id']} + + ## Publish service_2 API + ${service_api_description_published_2} + ... ${provider_resource_url_2} + ... ${provider_request_body_2}= + ... Publish Service Api + ... ${register_user_info_provider_2} + ... service_name=service_2 + + ## Store apiId1 and apiId2 for further use + ${service_api_id_1}= Set Variable ${service_api_description_published_1['apiId']} + ${service_api_id_2}= Set Variable ${service_api_description_published_2['apiId']} + + # Register Invokers + ## Default Invoker 1 Registration and Onboarding + ${register_user_info_invoker_1} ${invoker_url_1} ${request_body_1}= Invoker Default Onboarding + ... invoker_username=${INVOKER_USERNAME}_1 + + ## Default Invoker 2 Registration and Onboarding + ${register_user_info_invoker_2} ${invoker_url_2} ${request_body_2}= Invoker Default Onboarding + ... invoke_username=${INVOKER_USERNAME}_2 + + ## Store apiInvokerIds for further use + ${api_invoker_id_1}= Set Variable ${register_user_info_invoker_1['api_invoker_id']} + ${api_invoker_id_2}= Set Variable ${register_user_info_invoker_2['api_invoker_id']} + + # Subscribe to events + ## Event lists + ${events_list}= Create List SERVICE_API_INVOCATION_SUCCESS SERVICE_API_INVOCATION_FAILURE + + ## Event filters + ${event_filter_empty}= Create Capif Event Filter + ${event_filter_api_invoker_ids}= Create Capif Event Filter apiInvokerIds=${api_invoker_id_1} + ${event_filter_api_ids}= Create Capif Event Filter apiIds=${service_api_id_1} + ${event_filter_aef_ids}= Create Capif Event Filter aefIds=${aef_id_2} + ${event_filter_api_ids_and_aef_ids}= Create Capif Event Filter + ... apiIds=${service_api_id_2} + ... aefIds=${aef_id_2} + ${event_filter_api_ids_and_api_invoker_ids}= Create Capif Event Filter + ... apiInvokerIds=${api_invoker_id_2} + ... apiIds=${service_api_id_2} + ${event_filter_aef_ids_and_api_invoker_ids}= Create Capif Event Filter + ... apiInvokerIds=${api_invoker_id_2} + ... aefIds=${aef_id_1} + ${event_filter_api_ids_aef_ids_and_api_invoker_ids}= Create Capif Event Filter + ... apiInvokerIds=${api_invoker_id_2} + ... aefIds=${aef_id_2} + ... apiIds=${service_api_id_2} + + ## Subscription to Events 1 + ${event_filters}= Create List ${event_filter_api_ids} ${event_filter_api_ids} + ${subscription_id_1}= + ... Subscribe provider ${register_user_info_provider_1} to events ${events_list} with event filters ${event_filters} + + ## Subscription to Events 2 + ${event_filters}= Create List ${event_filter_aef_ids} ${event_filter_aef_ids} + ${subscription_id_2}= + ... Subscribe provider ${register_user_info_provider_1} to events ${events_list} with event filters ${event_filters} + + ## Subscription to Events 3 + ${event_filters}= Create List ${event_filter_api_invoker_ids} ${event_filter_api_invoker_ids} + ${subscription_id_3}= + ... Subscribe provider ${register_user_info_provider_1} to events ${events_list} with event filters ${event_filters} + + ## Subscription to Events 4 + ${event_filters}= Create List ${event_filter_api_ids_and_aef_ids} ${event_filter_api_ids_and_aef_ids} + ${subscription_id_4}= + ... Subscribe provider ${register_user_info_provider_1} to events ${events_list} with event filters ${event_filters} + + ## Subscription to Events 5 + ${event_filters}= Create List + ... ${event_filter_api_ids_and_api_invoker_ids} + ... ${event_filter_api_ids_and_api_invoker_ids} + ${subscription_id_5}= + ... Subscribe provider ${register_user_info_provider_1} to events ${events_list} with event filters ${event_filters} + + ## Subscription to Events 6 + ${event_filters}= Create List + ... ${event_filter_aef_ids_and_api_invoker_ids} + ... ${event_filter_aef_ids_and_api_invoker_ids} + ${subscription_id_6}= + ... Subscribe provider ${register_user_info_provider_1} to events ${events_list} with event filters ${event_filters} + + ## Subscription to Events 7 + ${event_filters}= Create List + ... ${event_filter_api_ids_aef_ids_and_api_invoker_ids} + ... ${event_filter_api_ids_aef_ids_and_api_invoker_ids} + ${subscription_id_7}= + ... Subscribe provider ${register_user_info_provider_1} to events ${events_list} with event filters ${event_filters} + + # 1.Log entry for service_1 and invoker_1 + ${request_body_log_1}= Send Log Message to CAPIF + ... ${service_api_id_1} + ... service_1 + ... ${register_user_info_invoker_1} + ... ${register_user_info_provider_1} + ... 200 + ... 400 + + # 2.Log entry for service_2 and invoker_1 + ${request_body_log_2}= Send Log Message to CAPIF + ... ${service_api_id_2} + ... service_2 + ... ${register_user_info_invoker_1} + ... ${register_user_info_provider_2} + ... 200 + + # 3.Log entry for service_2 and invoker_2 + ${request_body_log_3}= Send Log Message to CAPIF + ... ${service_api_id_2} + ... service_2 + ... ${register_user_info_invoker_2} + ... ${register_user_info_provider_2} + ... 200 + + # 4.Log entry for service_1 and invoker_2 + ${request_body_log_4}= Send Log Message to CAPIF + ... ${service_api_id_1} + ... service_1 + ... ${register_user_info_invoker_2} + ... ${register_user_info_provider_1} + ... 400 + + # Check Event Notifications + ## Create check Events to ensure all notifications were received + ### Subscription 1 Checks + ${events_expected}= Create Events From InvocationLogs + ... ${subscription_id_1} + ... ${request_body_log_1} + + ${events_expected}= Create Events From InvocationLogs + ... ${subscription_id_1} + ... ${request_body_log_4} + ... events_expected=${events_expected} + + ### Subcription 2 Checks + ${events_expected}= Create Events From InvocationLogs + ... ${subscription_id_2} + ... ${request_body_log_2} + ... events_expected=${events_expected} + + ${events_expected}= Create Events From InvocationLogs + ... ${subscription_id_2} + ... ${request_body_log_3} + ... events_expected=${events_expected} + + # Subscription 3 Checks + ${events_expected}= Create Events From InvocationLogs + ... ${subscription_id_3} + ... ${request_body_log_1} + ... events_expected=${events_expected} + + ${events_expected}= Create Events From InvocationLogs + ... ${subscription_id_3} + ... ${request_body_log_2} + ... events_expected=${events_expected} + + # Subscription 4 Checks + ${events_expected}= Create Events From InvocationLogs + ... ${subscription_id_4} + ... ${request_body_log_2} + ... events_expected=${events_expected} + + ${events_expected}= Create Events From InvocationLogs + ... ${subscription_id_4} + ... ${request_body_log_3} + ... events_expected=${events_expected} + + # Subscription 5 Checks + ${events_expected}= Create Events From InvocationLogs + ... ${subscription_id_5} + ... ${request_body_log_3} + ... events_expected=${events_expected} + + # Subscription 6 Checks + ${events_expected}= Create Events From InvocationLogs + ... ${subscription_id_6} + ... ${request_body_log_4} + ... events_expected=${events_expected} + + # Subscription 7 Checks + ${events_expected}= Create Events From InvocationLogs + ... ${subscription_id_7} + ... ${request_body_log_3} + ... events_expected=${events_expected} + + Log List ${events_expected} + ## Check Events Expected towards received notifications at mock server + Wait Until Keyword Succeeds 5x 5s Check Mock Server Notification Events ${events_expected} + + +*** Keywords *** +Create Security Context between ${invoker_info} and ${provider_info} + # Discover APIs by invoker + ${discover_response}= Get Request Capif + ... ${DISCOVER_URL}${invoker_info['api_invoker_id']}&aef-id=${provider_info['aef_id']} + ... server=${CAPIF_HTTPS_URL} + ... verify=ca.crt + ... username=${invoker_info['management_cert']} + + Check Response Variable Type And Values ${discover_response} 200 DiscoveredAPIs + + # create Security Context + ${request_service_security_body} ${api_ids}= Create Service Security From Discover Response + ... http://${CAPIF_HOSTNAME}:${CAPIF_HTTP_PORT}/test + ... ${discover_response} + ... legacy=${FALSE} + ${resp}= Put Request Capif + ... /capif-security/v1/trustedInvokers/${invoker_info['api_invoker_id']} + ... json=${request_service_security_body} + ... server=${CAPIF_HTTPS_URL} + ... verify=ca.crt + ... username=${invoker_info['management_cert']} + + Set To Dictionary ${invoker_info} security_body=${request_service_security_body} + # Check Service Security + Check Response Variable Type And Values ${resp} 201 ServiceSecurity + ${resource_url}= Check Location Header ${resp} ${LOCATION_SECURITY_RESOURCE_REGEX} + + ${api_invoker_policies_list}= Create List + + FOR ${api_id} IN @{api_ids} + Log ${api_id} + ${resp}= Get Request Capif + ... /access-control-policy/v1/accessControlPolicyList/${api_id}?aef-id=${provider_info['aef_id']} + ... server=${CAPIF_HTTPS_URL} + ... verify=ca.crt + ... username=${provider_info['aef_username']} + Check Response Variable Type And Values ${resp} 200 AccessControlPolicyList + Should Not Be Empty ${resp.json()['apiInvokerPolicies']} + ${api_invoker_policies}= Set Variable ${resp.json()['apiInvokerPolicies']} + ${api_invoker_policies_list}= Set Variable ${api_invoker_policies} + END + + Log List ${api_invoker_policies_list} + + RETURN ${api_invoker_policies_list} + +Update Security Context between ${invoker_info} and ${provider_info} + # Discover APIs by invoker + ${discover_response}= Get Request Capif + ... ${DISCOVER_URL}${invoker_info['api_invoker_id']}&aef-id=${provider_info['aef_id']} + ... server=${CAPIF_HTTPS_URL} + ... verify=ca.crt + ... username=${invoker_info['management_cert']} + + Check Response Variable Type And Values ${discover_response} 200 DiscoveredAPIs + + # create Security Context + ${request_service_security_body} ${api_ids}= Update Service Security With Discover Response + ... ${invoker_info['security_body']} + ... ${discover_response} + ... legacy=${FALSE} + ${resp}= Post Request Capif + ... /capif-security/v1/trustedInvokers/${invoker_info['api_invoker_id']}/update + ... json=${request_service_security_body} + ... server=${CAPIF_HTTPS_URL} + ... verify=ca.crt + ... username=${invoker_info['management_cert']} + + # Check Service Security + Check Response Variable Type And Values ${resp} 200 ServiceSecurity + ${resource_url}= Check Location Header ${resp} ${LOCATION_SECURITY_RESOURCE_REGEX} + + ${api_invoker_policies_list}= Create List + + ${api_id}= Get From List ${api_ids} -1 + Log ${api_id} + ${resp}= Get Request Capif + ... /access-control-policy/v1/accessControlPolicyList/${api_id}?aef-id=${provider_info['aef_id']} + ... server=${CAPIF_HTTPS_URL} + ... verify=ca.crt + ... username=${provider_info['aef_username']} + + Check Response Variable Type And Values ${resp} 200 AccessControlPolicyList + # Check returned values + Should Not Be Empty ${resp.json()['apiInvokerPolicies']} + + ${api_invoker_policies}= Set Variable ${resp.json()['apiInvokerPolicies']} + # Append To List ${api_invoker_policies_list} ${api_invoker_policies} + ${api_invoker_policies_list}= Set Variable ${api_invoker_policies} + + Log List ${api_invoker_policies_list} + + RETURN ${api_invoker_policies_list} + +Subscribe provider ${provider_info} to events ${events_list} with event filters ${event_filters} + ${resp}= + ... Subscribe ${provider_info['amf_id']} with ${provider_info['amf_username']} to ${events_list} with ${event_filters} + + Check Response Variable Type And Values ${resp} 201 EventSubscription + ${subscriber_id} ${subscription_id}= Check Event Location Header ${resp} + + RETURN ${subscription_id} + +Subscribe invoker ${invoker_info} to events ${events_list} with event filters ${event_filters} + ${resp}= + ... Subscribe ${invoker_info['api_invoker_id']} with ${invoker_info['management_cert']} to ${events_list} with ${event_filters} + + Check Response Variable Type And Values ${resp} 201 EventSubscription + ${subscriber_id} ${subscription_id}= Check Event Location Header ${resp} + + RETURN ${subscription_id} + +Subscribe invoker ${invoker_info} to events ${events_list} with event req ${event_req} + ${resp}= + ... Subscribe ${invoker_info['api_invoker_id']} with ${invoker_info['management_cert']} to ${events_list} with ${event_req} + + Check Response Variable Type And Values ${resp} 201 EventSubscription + ${subscriber_id} ${subscription_id}= Check Event Location Header ${resp} + + RETURN ${subscription_id} + +Subscribe ${subscriber_id} with ${username} to ${events_list} with ${event_req} + ${request_body}= Create Events Subscription + ... events=@{events_list} + ... notification_destination=${NOTIFICATION_DESTINATION_URL}/testing + ... supported_features=C + ... event_req=${event_req} + ${resp}= Post Request Capif + ... /capif-events/v1/${subscriber_id}/subscriptions + ... json=${request_body} + ... server=${CAPIF_HTTPS_URL} + ... verify=ca.crt + ... username=${username} + + RETURN ${resp} + +Send Log Message to CAPIF + [Arguments] ${api_id} ${service_name} ${invoker_info} ${provider_info} @{results} + ${api_ids}= Create List ${api_id} + ${api_names}= Create List ${service_name} + ${request_body}= Create Log Entry + ... ${provider_info['aef_id']} + ... ${invoker_info['api_invoker_id']} + ... ${api_ids} + ... ${api_names} + ... results=@{results} + ${resp}= Post Request Capif + ... /api-invocation-logs/v1/${provider_info['aef_id']}/logs + ... json=${request_body} + ... server=${CAPIF_HTTPS_URL} + ... verify=ca.crt + ... username=${provider_info['amf_username']} + + Check Response Variable Type And Values ${resp} 201 InvocationLog + ${resource_url}= Check Location Header ${resp} ${LOCATION_LOGGING_RESOURCE_REGEX} + + RETURN ${request_body} + +Check not valid ${resp} with event filter ${attribute_snake_case} for event ${event} + # Check Results + ${invalid_param}= Create Dictionary + ... param=eventFilter + ... reason=The eventFilter {'${attribute_snake_case}'} for event ${event} are not applicable. + ${invalid_param_list}= Create List ${invalid_param} + Check Response Variable Type And Values + ... ${resp} + ... 400 + ... ProblemDetails + ... title=Bad Request + ... status=400 + ... detail=Bad Param + ... cause=Invalid eventFilter for event ${event} + ... invalidParams=${invalid_param_list} diff --git a/tests/libraries/api_events/bodyRequests.py b/tests/libraries/api_events/bodyRequests.py index 1bb4604f..d83b9201 100644 --- a/tests/libraries/api_events/bodyRequests.py +++ b/tests/libraries/api_events/bodyRequests.py @@ -39,6 +39,21 @@ def create_capif_event_filter(aefIds=None, apiIds=None, apiInvokerIds=None): return capif_event_filter +def create_event_req(imm_rep=None, notif_method=None, max_report_nbr=None, mon_dur=None, rep_period=None): + data = dict() + if imm_rep is not None: + data['immRep'] = imm_rep + if notif_method is not None: + data['notifMethod'] = notif_method + if max_report_nbr is not None: + data['maxReportNbr'] = max_report_nbr + if mon_dur is not None: + data['monDur'] = mon_dur + if rep_period is not None: + data['repPeriod'] = rep_period + return data + + def create_default_event_req(): return { "grpRepTime": 5, -- GitLab