From 916a94a73378c01b62c0c152aa34f101d6d88ed1 Mon Sep 17 00:00:00 2001 From: Pelayo Torres Date: Tue, 1 Apr 2025 12:15:19 +0200 Subject: [PATCH 1/6] added event requirement to events service --- .../capif_events/celery_app/Dockerfile | 14 + .../capif_events/celery_app/__init__.py | 0 .../capif_events/celery_app/config.py | 20 + .../capif_events/celery_app/config.yaml | 8 + .../capif_events/celery_app/requirements.txt | 7 + .../capif_events/celery_app/start_celery.sh | 12 + .../capif_events/celery_app/tasks.py | 124 +++++ .../capif_events/core/events_apis.py | 77 ++- .../capif_events/core/internal_event_ops.py | 31 ++ .../capif_events/core/notifications.py | 27 +- .../capif_events/db/db.py | 3 + services/TS29222_CAPIF_Events_API/config.yaml | 1 + services/docker-compose-capif.yml | 19 + tests/features/Event Filter/event_req.robot | 443 ++++++++++++++++++ tests/libraries/api_events/bodyRequests.py | 16 + 15 files changed, 797 insertions(+), 5 deletions(-) create mode 100644 services/TS29222_CAPIF_Events_API/capif_events/celery_app/Dockerfile create mode 100644 services/TS29222_CAPIF_Events_API/capif_events/celery_app/__init__.py create mode 100644 services/TS29222_CAPIF_Events_API/capif_events/celery_app/config.py create mode 100644 services/TS29222_CAPIF_Events_API/capif_events/celery_app/config.yaml create mode 100644 services/TS29222_CAPIF_Events_API/capif_events/celery_app/requirements.txt create mode 100644 services/TS29222_CAPIF_Events_API/capif_events/celery_app/start_celery.sh create mode 100644 services/TS29222_CAPIF_Events_API/capif_events/celery_app/tasks.py create mode 100644 tests/features/Event Filter/event_req.robot diff --git a/services/TS29222_CAPIF_Events_API/capif_events/celery_app/Dockerfile b/services/TS29222_CAPIF_Events_API/capif_events/celery_app/Dockerfile new file mode 100644 index 00000000..1b96a1c3 --- /dev/null +++ b/services/TS29222_CAPIF_Events_API/capif_events/celery_app/Dockerfile @@ -0,0 +1,14 @@ +# celery_app/Dockerfile + +FROM python:3.9 + +WORKDIR /celery_app + +COPY requirements.txt /celery_app/ +RUN pip install --no-cache-dir -r requirements.txt + +COPY . /celery_app + +RUN chmod +x /celery_app/start_celery.sh + +CMD ["/celery_app/start_celery.sh"] diff --git a/services/TS29222_CAPIF_Events_API/capif_events/celery_app/__init__.py b/services/TS29222_CAPIF_Events_API/capif_events/celery_app/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/services/TS29222_CAPIF_Events_API/capif_events/celery_app/config.py b/services/TS29222_CAPIF_Events_API/capif_events/celery_app/config.py new file mode 100644 index 00000000..60d542a2 --- /dev/null +++ b/services/TS29222_CAPIF_Events_API/capif_events/celery_app/config.py @@ -0,0 +1,20 @@ +import os + +import yaml + + +#Config class to get config +class Config: + def __init__(self): + self.cached = 0 + self.file="./config.yaml" + self.my_config = {} + stamp = os.stat(self.file).st_mtime + if stamp != self.cached: + self.cached = stamp + f = open(self.file) + self.my_config = yaml.safe_load(f) + f.close() + + def get_config(self): + return self.my_config diff --git a/services/TS29222_CAPIF_Events_API/capif_events/celery_app/config.yaml b/services/TS29222_CAPIF_Events_API/capif_events/celery_app/config.yaml new file mode 100644 index 00000000..c1aa656a --- /dev/null +++ b/services/TS29222_CAPIF_Events_API/capif_events/celery_app/config.yaml @@ -0,0 +1,8 @@ +mongo: { + 'user': 'root', + 'password': 'example', + 'db': 'capif', + 'notifications_col': 'notifications', + 'host': 'mongo', + 'port': "27017" +} diff --git a/services/TS29222_CAPIF_Events_API/capif_events/celery_app/requirements.txt b/services/TS29222_CAPIF_Events_API/capif_events/celery_app/requirements.txt new file mode 100644 index 00000000..64b64cfd --- /dev/null +++ b/services/TS29222_CAPIF_Events_API/capif_events/celery_app/requirements.txt @@ -0,0 +1,7 @@ +celery==5.4 +pymongo==4.2.0 +redis==4.5.4 +aiohttp == 3.10.5 +async-timeout == 4.0.3 +pyyaml == 6.0.2 +python_dateutil >= 2.6.0 diff --git a/services/TS29222_CAPIF_Events_API/capif_events/celery_app/start_celery.sh b/services/TS29222_CAPIF_Events_API/capif_events/celery_app/start_celery.sh new file mode 100644 index 00000000..979e325b --- /dev/null +++ b/services/TS29222_CAPIF_Events_API/capif_events/celery_app/start_celery.sh @@ -0,0 +1,12 @@ +#!/bin/bash + +if [ "$CELERY_MODE" = "worker" ]; then + echo "Starting Celery Worker..." + celery -A tasks worker --loglevel=info +elif [ "$CELERY_MODE" = "beat" ]; then + echo "Iniciando Celery Beat..." + celery -A tasks beat --loglevel=info +else + echo "ERROR: The environment variable CELERY_MODE is not set correctly (worker|beat)" + exit 1 +fi \ No newline at end of file diff --git a/services/TS29222_CAPIF_Events_API/capif_events/celery_app/tasks.py b/services/TS29222_CAPIF_Events_API/capif_events/celery_app/tasks.py new file mode 100644 index 00000000..3016f12c --- /dev/null +++ b/services/TS29222_CAPIF_Events_API/capif_events/celery_app/tasks.py @@ -0,0 +1,124 @@ +# celery/tasks.py +from celery import Celery +from datetime import datetime, timedelta, timezone +import pymongo +import os +from bson.codec_options import CodecOptions +from config import Config +import aiohttp +import asyncio +from dateutil import parser + +# Celery Configuration +celery = Celery( + "notifications", + broker=os.environ.get("CELERY_BROKER_URL", "redis://redis:6379/0"), + backend=os.environ.get("CELERY_RESULT_BACKEND", "redis://redis:6379/0") +) + +celery.conf.beat_schedule = { + "check_notifications_collection": { + "task": "celery.tasks.check_notifications_collection", + "schedule": 1.0, + "args": (), + }, +} +celery.conf.timezone = "UTC" + +# MongoDB Connection +config = Config().get_config() + +mongo_uri = f"mongodb://{config['mongo']['user']}:{config['mongo']['password']}@" \ + f"{config['mongo']['host']}:{config['mongo']['port']}" +client = pymongo.MongoClient(mongo_uri) +notifications_col = client[config['mongo']['db']][config['mongo']['notifications_col']].with_options(codec_options=CodecOptions(tz_aware=True)) + +def serialize_clean_camel_case(obj): + res = obj.to_dict() + res = clean_empty(res) + res = dict_to_camel_case(res) + + return res + +def clean_empty(d): + if isinstance(d, dict): + return { + k: v + for k, v in ((k, clean_empty(v)) for k, v in d.items()) + if v is not None or (isinstance(v, list) and len(v) == 0) + } + if isinstance(d, list): + return [v for v in map(clean_empty, d) if v is not None] + return d + +def dict_to_camel_case(my_dict): + + + result = {} + + for attr, value in my_dict.items(): + + if len(attr.split('_')) != 1: + my_key = ''.join(word.title() for word in attr.split('_')) + my_key = ''.join([my_key[0].lower(), my_key[1:]]) + else: + my_key = attr + + if my_key == "serviceApiCategory": + my_key = "serviceAPICategory" + elif my_key == "serviceApiDescriptions": + my_key = "serviceAPIDescriptions" + + if isinstance(value, list): + result[my_key] = list(map( + lambda x: dict_to_camel_case(x) if isinstance(x, dict) else x, value )) + + elif hasattr(value, "to_dict"): + result[my_key] = dict_to_camel_case(value) + + elif isinstance(value, dict): + value = dict_to_camel_case(value) + result[my_key] = value + else: + result[my_key] = value + + return result + +async def send_request(url, data): + async with aiohttp.ClientSession() as session: + timeout = aiohttp.ClientTimeout(total=10) + headers = {'content-type': 'application/json'} + async with session.post(url, json=data, timeout=timeout, headers=headers) as response: + return await response.text() + +async def send(url, data): + try: + response = await send_request(url, data) + print(response) + except asyncio.TimeoutError: + print("Timeout: Request timeout") + except Exception as e: + print("An exception occurred sending notification::" + str(e)) + return False + +@celery.task(name="celery.tasks.check_notifications_collection") +def my_periodic_task(): + print("Checking notifications collection...") + while True: + try: + notification_data = notifications_col.find_one_and_delete( + {"next_report_time": {"$lt": datetime.now(timezone.utc)}} + ) + if not notification_data: + break + except pymongo.errors.AutoReconnect: + print("MongoDB connection failed. Retrying...") + continue + + try: + print(f"sending notification to {notification_data['url']}") + asyncio.run(send(notification_data["url"], notification_data["notification"])) + except Exception as e: + print(f"Error sending notification: {e}") + + print("Finished processing notifications.") diff --git a/services/TS29222_CAPIF_Events_API/capif_events/core/events_apis.py b/services/TS29222_CAPIF_Events_API/capif_events/core/events_apis.py index d6b0bdf2..fea10021 100644 --- a/services/TS29222_CAPIF_Events_API/capif_events/core/events_apis.py +++ b/services/TS29222_CAPIF_Events_API/capif_events/core/events_apis.py @@ -4,6 +4,7 @@ import secrets import rfc3987 from capif_events.models.event_subscription import EventSubscription # noqa: E501 from flask import current_app, Response +from datetime import datetime, timedelta, timezone from .auth_manager import AuthManager from .resources import Resource @@ -56,6 +57,28 @@ class EventSubscriptionsOperations(Resource): current_app.logger.debug(f"The eventFilter {invalid_filters} for event {event} are not applicable.") return bad_request_error(detail="Bad Param", cause = f"Invalid eventFilter for event {event}", invalid_params=[{"param": "eventFilter", "reason": f"The eventFilter {invalid_filters} for event {event} are not applicable."}]) return None + + def __check_event_req(self, event_subscription): + current_app.logger.debug("Checking event requirement.") + if event_subscription.event_req.mon_dur: + if event_subscription.event_req.mon_dur > datetime.now(timezone.utc): + expired_at = event_subscription.event_req.mon_dur + else: + current_app.logger.error("monDur is in the past") + return bad_request_error( + detail="Bad Param", + cause="monDur is in the past", + invalid_params=[{"param": "monDur", "reason": "monDur is in the past"}] + ) + + if event_subscription.event_req.notif_method == "PERIODIC" and event_subscription.event_req.rep_period is None: + current_app.logger.error("Periodic notification method selected but repPeriod not provided") + return bad_request_error( + detail="Bad Param", + cause="Periodic notification method selected but repPeriod not provided", + invalid_params=[{"param": "repPeriod", "reason": "Periodic notification method selected but repPeriod not provided"}] + ) + return expired_at def __init__(self): Resource.__init__(self) @@ -89,12 +112,19 @@ class EventSubscriptionsOperations(Resource): # Check if EnhancedEventReport is enabled and validate event filters + expired_at = None + if EventSubscription.return_supp_feat_dict(event_subscription.supported_features)["EnhancedEventReport"]: if event_subscription.event_filters: current_app.logger.debug(event_subscription.event_filters) result = self.__check_event_filters(event_subscription.events, clean_empty(event_subscription.to_dict()["event_filters"])) if isinstance(result, Response): return result + if event_subscription.event_req: + current_app.logger.debug(event_subscription.event_req) + expired_at = self.__check_event_req(event_subscription) + if isinstance(expired_at, Response): + return result else: if event_subscription.event_filters: current_app.logger.error("Event filters provided but EnhancedEventReport is not enabled") @@ -103,12 +133,25 @@ class EventSubscriptionsOperations(Resource): cause="Event filters provided but EnhancedEventReport is not enabled", invalid_params=[{"param": "eventFilters", "reason": "EnhancedEventReport is not enabled"}] ) + if event_subscription.event_req: + current_app.logger.error("Event requirement provided but EnhancedEventReport is not enabled") + return bad_request_error( + detail="Bad Param", + cause="Event requirement provided but EnhancedEventReport is not enabled", + invalid_params=[{"param": "eventReq", "reason": "EnhancedEventReport is not enabled"}] + ) # Generate subscriptionID subscription_id = secrets.token_hex(15) evnt = dict() evnt["subscriber_id"] = subscriber_id evnt["subscription_id"] = subscription_id + + evnt["report_nbr"] = 0 + evnt["created_at"] = datetime.now(timezone.utc) + evnt["expire_at"] = expired_at + + evnt.update(event_subscription.to_dict()) mycol.insert_one(evnt) @@ -130,6 +173,7 @@ class EventSubscriptionsOperations(Resource): try: mycol = self.db.get_col_by_name(self.db.event_collection) + notifications_col = self.db.get_col_by_name(self.db.notifications_col) current_app.logger.debug("Removing event subscription") @@ -148,6 +192,7 @@ class EventSubscriptionsOperations(Resource): return not_found_error(detail="Event subscription not exist", cause="Event API subscription id not found") mycol.delete_one(my_query) + notifications_col.delete_many({"subscription_id": subscription_id}) current_app.logger.debug("Event subscription removed from database") self.auth_manager.remove_auth_event(subscription_id, subscriber_id) @@ -178,10 +223,17 @@ class EventSubscriptionsOperations(Resource): if isinstance(result, Response): return result - if EventSubscription.return_supp_feat_dict(event_subscription.supported_features)["EnhancedEventReport"] and event_subscription.event_filters: - result = self.__check_event_filters(event_subscription.events, clean_empty(event_subscription.to_dict()["event_filters"])) - if isinstance(result, Response): - return result + if EventSubscription.return_supp_feat_dict(event_subscription.supported_features)["EnhancedEventReport"]: + if event_subscription.event_filters: + current_app.logger.debug(event_subscription.event_filters) + result = self.__check_event_filters(event_subscription.events, clean_empty(event_subscription.to_dict()["event_filters"])) + if isinstance(result, Response): + return result + if event_subscription.event_req: + current_app.logger.debug(event_subscription.event_req) + expired_at = self.__check_event_req(event_subscription) + if isinstance(expired_at, Response): + return result my_query = {'subscriber_id': subscriber_id, 'subscription_id': subscription_id} @@ -195,6 +247,10 @@ class EventSubscriptionsOperations(Resource): body["subscriber_id"] = subscriber_id body["subscription_id"] = subscription_id + body["report_nbr"] = eventdescription.get("report_nbr", 0) + body["created_at"] = eventdescription.get("created_at", datetime.now(timezone.utc)) + body["expire_at"] = expired_at if expired_at else eventdescription.get("expire_at", None) + mycol.replace_one(my_query, body) current_app.logger.debug("Event subscription updated from database") @@ -238,8 +294,21 @@ class EventSubscriptionsOperations(Resource): if isinstance(result, Response): return result + + if event_subscription.event_req: + current_app.logger.debug(event_subscription.event_req) + expired_at = self.__check_event_req(event_subscription) + if isinstance(expired_at, Response): + return result + else: + expired_at = expired_at if expired_at else eventdescription.get("expire_at", None) + + if isinstance(result, Response): + return result body = clean_empty(event_subscription.to_dict()) + if expired_at: + body["expire_at"] = expired_at document = mycol.update_one(my_query, {"$set":body}) document = mycol.find_one(my_query) current_app.logger.debug("Event subscription patched from database") diff --git a/services/TS29222_CAPIF_Events_API/capif_events/core/internal_event_ops.py b/services/TS29222_CAPIF_Events_API/capif_events/core/internal_event_ops.py index f9daf008..73ea353f 100644 --- a/services/TS29222_CAPIF_Events_API/capif_events/core/internal_event_ops.py +++ b/services/TS29222_CAPIF_Events_API/capif_events/core/internal_event_ops.py @@ -21,6 +21,14 @@ class InternalEventOperations(Resource): #We dont need remove all auth events, becase when invoker is removed, remove auth entry #self.auth_manager.remove_auth_all_event(subscriber_id) + + def delete_subscription(self, subscription_id): + + mycol = self.db.get_col_by_name(self.db.event_collection) + my_query = {'subscription_id': subscription_id} + mycol.delete_one(my_query) + + current_app.logger.info(f"Removed subscription: {subscription_id}") def get_event_subscriptions(self, event): current_app.logger.info("get subscription from db") @@ -42,3 +50,26 @@ class InternalEventOperations(Resource): except Exception as e: current_app.logger.error("An exception occurred ::" + str(e)) return False + + def add_notification(self, notification): + current_app.logger.info("Adding Notification to notifications list") + try: + mycol = self.db.get_col_by_name(self.db.notifications_col) + mycol.insert_one(notification) + current_app.logger.info("Notification added to notifications list") + except Exception as e: + current_app.logger.error("An exception occurred ::" + str(e)) + return False + + def update_report_nbr(self, subscription_id): + current_app.logger.info("Incrementing report number") + try: + mycol = self.db.get_col_by_name(self.db.event_collection) + my_query = {'subscription_id': subscription_id} + result = mycol.update_one(my_query, {'$inc': {'report_nbr': 1}}) + current_app.logger.info(result) + current_app.logger.info("Report number incremented") + except Exception as e: + current_app.logger.error("An exception occurred ::" + str(e)) + return False + diff --git a/services/TS29222_CAPIF_Events_API/capif_events/core/notifications.py b/services/TS29222_CAPIF_Events_API/capif_events/core/notifications.py index ab769e51..584e31fe 100644 --- a/services/TS29222_CAPIF_Events_API/capif_events/core/notifications.py +++ b/services/TS29222_CAPIF_Events_API/capif_events/core/notifications.py @@ -10,6 +10,7 @@ from flask import current_app from models.event_notification import EventNotification from models.event_subscription import EventSubscription from util import serialize_clean_camel_case +from datetime import datetime, timedelta, timezone from .internal_event_ops import InternalEventOperations @@ -135,7 +136,31 @@ class Notifications(): current_app.logger.debug(json.dumps(data.to_dict(),cls=CustomJSONEncoder)) - asyncio.run(self.send(url, serialize_clean_camel_case(data))) + if EventSubscription.return_supp_feat_dict(sub["supported_features"])["EnhancedEventReport"] and sub.get("event_req", None): + current_app.logger.debug(f"Creating notification for {sub['subscription_id']}") + + if sub["event_req"]["notif_method"] == "PERIODIC": + transcurred_time = (datetime.now(timezone.utc)-sub["created_at"]).total_seconds() + if transcurred_time > sub["event_req"]["rep_period"]: + transcurred_blocks = int(transcurred_time // sub["event_req"]["rep_period"]) + next_report_time = sub["created_at"] + timedelta(seconds=((transcurred_blocks+1) * sub["event_req"]["rep_period"])) + else: + next_report_time = sub["created_at"] + timedelta(seconds=sub["event_req"]["rep_period"]) + + notification = {"notification": data.to_dict(), "next_report_time" : next_report_time, "url": url, "subscription_id": sub["subscription_id"]} + + self.events_ops.add_notification(notification) + + if sub["event_req"].get("max_report_nbr", None) and sub["report_nbr"] + 1 == sub["event_req"].get("max_report_nbr", None): + current_app.logger.debug(f"Limit reached, deleting subscription {sub['subscription_id']}") + self.events_ops.delete_subscription(sub["subscription_id"]) + + else: + asyncio.run(self.send(url, serialize_clean_camel_case(data))) + + self.events_ops.update_report_nbr(sub["subscription_id"]) + + except Exception as e: current_app.logger.error("An exception occurred ::" + str(e)) diff --git a/services/TS29222_CAPIF_Events_API/capif_events/db/db.py b/services/TS29222_CAPIF_Events_API/capif_events/db/db.py index adfc472a..3af5d6cf 100644 --- a/services/TS29222_CAPIF_Events_API/capif_events/db/db.py +++ b/services/TS29222_CAPIF_Events_API/capif_events/db/db.py @@ -21,6 +21,9 @@ class MongoDatabse(): self.invoker_collection = self.config['mongo']['capif_invokers_col'] self.provider_collection = self.config['mongo']['capif_providers_col'] self.certs_col = self.config['mongo']['certs_col'] + self.notifications_col = self.config['mongo']['notifications_col'] + + self.get_col_by_name(self.event_collection).create_index([("expire_at", 1)],expireAfterSeconds=0) # self.acls_col = self.config['mongo']['capif_acls_col'] def get_col_by_name(self, name): diff --git a/services/TS29222_CAPIF_Events_API/config.yaml b/services/TS29222_CAPIF_Events_API/config.yaml index 101d300e..a64584dd 100644 --- a/services/TS29222_CAPIF_Events_API/config.yaml +++ b/services/TS29222_CAPIF_Events_API/config.yaml @@ -7,6 +7,7 @@ mongo: { 'capif_invokers_col': 'invokerdetails', 'capif_providers_col': 'providerenrolmentdetails', 'capif_acls_col': 'acls', + 'notifications_col': 'notifications', 'host': 'mongo', 'port': "27017" } diff --git a/services/docker-compose-capif.yml b/services/docker-compose-capif.yml index 6b5a504c..e965cedb 100644 --- a/services/docker-compose-capif.yml +++ b/services/docker-compose-capif.yml @@ -175,6 +175,25 @@ services: - redis - mongo + celery_worker: + build: + context: ${SERVICES_DIR}/TS29222_CAPIF_Events_API/capif_events/celery_app + environment: + - CELERY_MODE=worker + - REDIS_HOST=redis + - REDIS_PORT=6379 + depends_on: + - redis + + celery_beat: + build: + context: ${SERVICES_DIR}/TS29222_CAPIF_Events_API/capif_events/celery_app + environment: + - CELERY_MODE=beat + - REDIS_HOST=redis + - REDIS_PORT=6379 + depends_on: + - redis api-invocation-logs: build: context: ${SERVICES_DIR}/TS29222_CAPIF_Logging_API_Invocation_API diff --git a/tests/features/Event Filter/event_req.robot b/tests/features/Event Filter/event_req.robot new file mode 100644 index 00000000..4530c0d2 --- /dev/null +++ b/tests/features/Event Filter/event_req.robot @@ -0,0 +1,443 @@ +*** Settings *** +Resource /opt/robot-tests/tests/resources/common.resource +Library /opt/robot-tests/tests/libraries/bodyRequests.py +Library XML +Library String +Resource /opt/robot-tests/tests/resources/common/basicRequests.robot +Resource ../../resources/common.resource +Resource ../../resources/common/basicRequests.robot + +Suite Teardown Reset Testing Environment +Test Setup Reset Testing Environment +Test Teardown Reset Testing Environment + + +*** Variables *** +${API_INVOKER_NOT_REGISTERED} not-valid +${SUBSCRIBER_ID_NOT_VALID} not-valid +${SUBSCRIPTION_ID_NOT_VALID} not-valid + + +*** Test Cases *** +Invoker subscribe to Service API Available + [Tags] pelayo-1 mockserver + + # Initialize Mock server + Init Mock Server + + # Default Invoker Registration and Onboarding + ${register_user_info_invoker} ${url} ${request_body}= Invoker Default Onboarding + + # Create Provider1 with 2 AEF roles and publish API + ${register_user_info_provider_1}= Provider Default Registration + ${aef_id_1}= Set Variable ${register_user_info_provider_1['aef_roles']['${AEF_PROVIDER_USERNAME}']['aef_id']} + ${aef_empty_list}= Create List + + # Subscribe to events and setup event filter with api_id + ${events_list}= Create List API_INVOKER_ONBOARDED + ${event_req}= Create Event Req notif_method=PERIODIC max_report_nbr=${2} rep_period=${1} + + ${subscription_ids}= Create List + + FOR ${counter} IN RANGE 1 10000 1 + Log ${counter} + ${subscription_id}= + ... Subscribe invoker ${register_user_info_invoker} to events ${events_list} with event req ${event_req} + Append To List ${subscription_ids} ${subscription_id} + END + + Sleep 300s + + ${resp}= Get Mock Server Messages + + ${notification_events_on_mock_server}= Set Variable ${resp.json()} + +Invoker subscribe to Service API Availables + [Tags] event_filter-7 mockserver smoke + + # Initialize Mock server + Init Mock Server + + # Register Providers + ## Default Provider 1 Registration + ${register_user_info_provider_1}= Provider Default Registration provider_username=${PROVIDER_USERNAME}_1 + ${aef_id_1}= Set Variable + ... ${register_user_info_provider_1['aef_roles']['${AEF_PROVIDER_USERNAME}_1']['aef_id']} + + ## Publish service_1 API + ${service_api_description_published_1} + ... ${provider_resource_url_1} + ... ${provider_request_body_1}= + ... Publish Service Api + ... ${register_user_info_provider_1} + ... service_name=service_1 + + ## Default Provider 2 Registration + ${register_user_info_provider_2}= Provider Default Registration provider_username=${PROVIDER_USERNAME}_2 + ${aef_id_2}= Set Variable + ... ${register_user_info_provider_2['aef_roles']['${AEF_PROVIDER_USERNAME}_2']['aef_id']} + + ## Publish service_2 API + ${service_api_description_published_2} + ... ${provider_resource_url_2} + ... ${provider_request_body_2}= + ... Publish Service Api + ... ${register_user_info_provider_2} + ... service_name=service_2 + + ## Store apiId1 and apiId2 for further use + ${service_api_id_1}= Set Variable ${service_api_description_published_1['apiId']} + ${service_api_id_2}= Set Variable ${service_api_description_published_2['apiId']} + + # Register Invokers + ## Default Invoker 1 Registration and Onboarding + ${register_user_info_invoker_1} ${invoker_url_1} ${request_body_1}= Invoker Default Onboarding + ... invoker_username=${INVOKER_USERNAME}_1 + + ## Default Invoker 2 Registration and Onboarding + ${register_user_info_invoker_2} ${invoker_url_2} ${request_body_2}= Invoker Default Onboarding + ... invoke_username=${INVOKER_USERNAME}_2 + + ## Store apiInvokerIds for further use + ${api_invoker_id_1}= Set Variable ${register_user_info_invoker_1['api_invoker_id']} + ${api_invoker_id_2}= Set Variable ${register_user_info_invoker_2['api_invoker_id']} + + # Subscribe to events + ## Event lists + ${events_list}= Create List SERVICE_API_INVOCATION_SUCCESS SERVICE_API_INVOCATION_FAILURE + + ## Event filters + ${event_filter_empty}= Create Capif Event Filter + ${event_filter_api_invoker_ids}= Create Capif Event Filter apiInvokerIds=${api_invoker_id_1} + ${event_filter_api_ids}= Create Capif Event Filter apiIds=${service_api_id_1} + ${event_filter_aef_ids}= Create Capif Event Filter aefIds=${aef_id_2} + ${event_filter_api_ids_and_aef_ids}= Create Capif Event Filter + ... apiIds=${service_api_id_2} + ... aefIds=${aef_id_2} + ${event_filter_api_ids_and_api_invoker_ids}= Create Capif Event Filter + ... apiInvokerIds=${api_invoker_id_2} + ... apiIds=${service_api_id_2} + ${event_filter_aef_ids_and_api_invoker_ids}= Create Capif Event Filter + ... apiInvokerIds=${api_invoker_id_2} + ... aefIds=${aef_id_1} + ${event_filter_api_ids_aef_ids_and_api_invoker_ids}= Create Capif Event Filter + ... apiInvokerIds=${api_invoker_id_2} + ... aefIds=${aef_id_2} + ... apiIds=${service_api_id_2} + + ## Subscription to Events 1 + ${event_filters}= Create List ${event_filter_api_ids} ${event_filter_api_ids} + ${subscription_id_1}= + ... Subscribe provider ${register_user_info_provider_1} to events ${events_list} with event filters ${event_filters} + + ## Subscription to Events 2 + ${event_filters}= Create List ${event_filter_aef_ids} ${event_filter_aef_ids} + ${subscription_id_2}= + ... Subscribe provider ${register_user_info_provider_1} to events ${events_list} with event filters ${event_filters} + + ## Subscription to Events 3 + ${event_filters}= Create List ${event_filter_api_invoker_ids} ${event_filter_api_invoker_ids} + ${subscription_id_3}= + ... Subscribe provider ${register_user_info_provider_1} to events ${events_list} with event filters ${event_filters} + + ## Subscription to Events 4 + ${event_filters}= Create List ${event_filter_api_ids_and_aef_ids} ${event_filter_api_ids_and_aef_ids} + ${subscription_id_4}= + ... Subscribe provider ${register_user_info_provider_1} to events ${events_list} with event filters ${event_filters} + + ## Subscription to Events 5 + ${event_filters}= Create List + ... ${event_filter_api_ids_and_api_invoker_ids} + ... ${event_filter_api_ids_and_api_invoker_ids} + ${subscription_id_5}= + ... Subscribe provider ${register_user_info_provider_1} to events ${events_list} with event filters ${event_filters} + + ## Subscription to Events 6 + ${event_filters}= Create List + ... ${event_filter_aef_ids_and_api_invoker_ids} + ... ${event_filter_aef_ids_and_api_invoker_ids} + ${subscription_id_6}= + ... Subscribe provider ${register_user_info_provider_1} to events ${events_list} with event filters ${event_filters} + + ## Subscription to Events 7 + ${event_filters}= Create List + ... ${event_filter_api_ids_aef_ids_and_api_invoker_ids} + ... ${event_filter_api_ids_aef_ids_and_api_invoker_ids} + ${subscription_id_7}= + ... Subscribe provider ${register_user_info_provider_1} to events ${events_list} with event filters ${event_filters} + + # 1.Log entry for service_1 and invoker_1 + ${request_body_log_1}= Send Log Message to CAPIF + ... ${service_api_id_1} + ... service_1 + ... ${register_user_info_invoker_1} + ... ${register_user_info_provider_1} + ... 200 + ... 400 + + # 2.Log entry for service_2 and invoker_1 + ${request_body_log_2}= Send Log Message to CAPIF + ... ${service_api_id_2} + ... service_2 + ... ${register_user_info_invoker_1} + ... ${register_user_info_provider_2} + ... 200 + + # 3.Log entry for service_2 and invoker_2 + ${request_body_log_3}= Send Log Message to CAPIF + ... ${service_api_id_2} + ... service_2 + ... ${register_user_info_invoker_2} + ... ${register_user_info_provider_2} + ... 200 + + # 4.Log entry for service_1 and invoker_2 + ${request_body_log_4}= Send Log Message to CAPIF + ... ${service_api_id_1} + ... service_1 + ... ${register_user_info_invoker_2} + ... ${register_user_info_provider_1} + ... 400 + + # Check Event Notifications + ## Create check Events to ensure all notifications were received + ### Subscription 1 Checks + ${events_expected}= Create Events From InvocationLogs + ... ${subscription_id_1} + ... ${request_body_log_1} + + ${events_expected}= Create Events From InvocationLogs + ... ${subscription_id_1} + ... ${request_body_log_4} + ... events_expected=${events_expected} + + ### Subcription 2 Checks + ${events_expected}= Create Events From InvocationLogs + ... ${subscription_id_2} + ... ${request_body_log_2} + ... events_expected=${events_expected} + + ${events_expected}= Create Events From InvocationLogs + ... ${subscription_id_2} + ... ${request_body_log_3} + ... events_expected=${events_expected} + + # Subscription 3 Checks + ${events_expected}= Create Events From InvocationLogs + ... ${subscription_id_3} + ... ${request_body_log_1} + ... events_expected=${events_expected} + + ${events_expected}= Create Events From InvocationLogs + ... ${subscription_id_3} + ... ${request_body_log_2} + ... events_expected=${events_expected} + + # Subscription 4 Checks + ${events_expected}= Create Events From InvocationLogs + ... ${subscription_id_4} + ... ${request_body_log_2} + ... events_expected=${events_expected} + + ${events_expected}= Create Events From InvocationLogs + ... ${subscription_id_4} + ... ${request_body_log_3} + ... events_expected=${events_expected} + + # Subscription 5 Checks + ${events_expected}= Create Events From InvocationLogs + ... ${subscription_id_5} + ... ${request_body_log_3} + ... events_expected=${events_expected} + + # Subscription 6 Checks + ${events_expected}= Create Events From InvocationLogs + ... ${subscription_id_6} + ... ${request_body_log_4} + ... events_expected=${events_expected} + + # Subscription 7 Checks + ${events_expected}= Create Events From InvocationLogs + ... ${subscription_id_7} + ... ${request_body_log_3} + ... events_expected=${events_expected} + + Log List ${events_expected} + ## Check Events Expected towards received notifications at mock server + Wait Until Keyword Succeeds 5x 5s Check Mock Server Notification Events ${events_expected} + + +*** Keywords *** +Create Security Context between ${invoker_info} and ${provider_info} + # Discover APIs by invoker + ${discover_response}= Get Request Capif + ... ${DISCOVER_URL}${invoker_info['api_invoker_id']}&aef-id=${provider_info['aef_id']} + ... server=${CAPIF_HTTPS_URL} + ... verify=ca.crt + ... username=${invoker_info['management_cert']} + + Check Response Variable Type And Values ${discover_response} 200 DiscoveredAPIs + + # create Security Context + ${request_service_security_body} ${api_ids}= Create Service Security From Discover Response + ... http://${CAPIF_HOSTNAME}:${CAPIF_HTTP_PORT}/test + ... ${discover_response} + ... legacy=${FALSE} + ${resp}= Put Request Capif + ... /capif-security/v1/trustedInvokers/${invoker_info['api_invoker_id']} + ... json=${request_service_security_body} + ... server=${CAPIF_HTTPS_URL} + ... verify=ca.crt + ... username=${invoker_info['management_cert']} + + Set To Dictionary ${invoker_info} security_body=${request_service_security_body} + # Check Service Security + Check Response Variable Type And Values ${resp} 201 ServiceSecurity + ${resource_url}= Check Location Header ${resp} ${LOCATION_SECURITY_RESOURCE_REGEX} + + ${api_invoker_policies_list}= Create List + + FOR ${api_id} IN @{api_ids} + Log ${api_id} + ${resp}= Get Request Capif + ... /access-control-policy/v1/accessControlPolicyList/${api_id}?aef-id=${provider_info['aef_id']} + ... server=${CAPIF_HTTPS_URL} + ... verify=ca.crt + ... username=${provider_info['aef_username']} + Check Response Variable Type And Values ${resp} 200 AccessControlPolicyList + Should Not Be Empty ${resp.json()['apiInvokerPolicies']} + ${api_invoker_policies}= Set Variable ${resp.json()['apiInvokerPolicies']} + ${api_invoker_policies_list}= Set Variable ${api_invoker_policies} + END + + Log List ${api_invoker_policies_list} + + RETURN ${api_invoker_policies_list} + +Update Security Context between ${invoker_info} and ${provider_info} + # Discover APIs by invoker + ${discover_response}= Get Request Capif + ... ${DISCOVER_URL}${invoker_info['api_invoker_id']}&aef-id=${provider_info['aef_id']} + ... server=${CAPIF_HTTPS_URL} + ... verify=ca.crt + ... username=${invoker_info['management_cert']} + + Check Response Variable Type And Values ${discover_response} 200 DiscoveredAPIs + + # create Security Context + ${request_service_security_body} ${api_ids}= Update Service Security With Discover Response + ... ${invoker_info['security_body']} + ... ${discover_response} + ... legacy=${FALSE} + ${resp}= Post Request Capif + ... /capif-security/v1/trustedInvokers/${invoker_info['api_invoker_id']}/update + ... json=${request_service_security_body} + ... server=${CAPIF_HTTPS_URL} + ... verify=ca.crt + ... username=${invoker_info['management_cert']} + + # Check Service Security + Check Response Variable Type And Values ${resp} 200 ServiceSecurity + ${resource_url}= Check Location Header ${resp} ${LOCATION_SECURITY_RESOURCE_REGEX} + + ${api_invoker_policies_list}= Create List + + ${api_id}= Get From List ${api_ids} -1 + Log ${api_id} + ${resp}= Get Request Capif + ... /access-control-policy/v1/accessControlPolicyList/${api_id}?aef-id=${provider_info['aef_id']} + ... server=${CAPIF_HTTPS_URL} + ... verify=ca.crt + ... username=${provider_info['aef_username']} + + Check Response Variable Type And Values ${resp} 200 AccessControlPolicyList + # Check returned values + Should Not Be Empty ${resp.json()['apiInvokerPolicies']} + + ${api_invoker_policies}= Set Variable ${resp.json()['apiInvokerPolicies']} + # Append To List ${api_invoker_policies_list} ${api_invoker_policies} + ${api_invoker_policies_list}= Set Variable ${api_invoker_policies} + + Log List ${api_invoker_policies_list} + + RETURN ${api_invoker_policies_list} + +Subscribe provider ${provider_info} to events ${events_list} with event filters ${event_filters} + ${resp}= + ... Subscribe ${provider_info['amf_id']} with ${provider_info['amf_username']} to ${events_list} with ${event_filters} + + Check Response Variable Type And Values ${resp} 201 EventSubscription + ${subscriber_id} ${subscription_id}= Check Event Location Header ${resp} + + RETURN ${subscription_id} + +Subscribe invoker ${invoker_info} to events ${events_list} with event filters ${event_filters} + ${resp}= + ... Subscribe ${invoker_info['api_invoker_id']} with ${invoker_info['management_cert']} to ${events_list} with ${event_filters} + + Check Response Variable Type And Values ${resp} 201 EventSubscription + ${subscriber_id} ${subscription_id}= Check Event Location Header ${resp} + + RETURN ${subscription_id} + +Subscribe invoker ${invoker_info} to events ${events_list} with event req ${event_req} + ${resp}= + ... Subscribe ${invoker_info['api_invoker_id']} with ${invoker_info['management_cert']} to ${events_list} with ${event_req} + + Check Response Variable Type And Values ${resp} 201 EventSubscription + ${subscriber_id} ${subscription_id}= Check Event Location Header ${resp} + + RETURN ${subscription_id} + +Subscribe ${subscriber_id} with ${username} to ${events_list} with ${event_req} + ${request_body}= Create Events Subscription + ... events=@{events_list} + ... notification_destination=${NOTIFICATION_DESTINATION_URL}/testing + ... supported_features=C + ... event_req=${event_req} + ${resp}= Post Request Capif + ... /capif-events/v1/${subscriber_id}/subscriptions + ... json=${request_body} + ... server=${CAPIF_HTTPS_URL} + ... verify=ca.crt + ... username=${username} + + RETURN ${resp} + +Send Log Message to CAPIF + [Arguments] ${api_id} ${service_name} ${invoker_info} ${provider_info} @{results} + ${api_ids}= Create List ${api_id} + ${api_names}= Create List ${service_name} + ${request_body}= Create Log Entry + ... ${provider_info['aef_id']} + ... ${invoker_info['api_invoker_id']} + ... ${api_ids} + ... ${api_names} + ... results=@{results} + ${resp}= Post Request Capif + ... /api-invocation-logs/v1/${provider_info['aef_id']}/logs + ... json=${request_body} + ... server=${CAPIF_HTTPS_URL} + ... verify=ca.crt + ... username=${provider_info['amf_username']} + + Check Response Variable Type And Values ${resp} 201 InvocationLog + ${resource_url}= Check Location Header ${resp} ${LOCATION_LOGGING_RESOURCE_REGEX} + + RETURN ${request_body} + +Check not valid ${resp} with event filter ${attribute_snake_case} for event ${event} + # Check Results + ${invalid_param}= Create Dictionary + ... param=eventFilter + ... reason=The eventFilter {'${attribute_snake_case}'} for event ${event} are not applicable. + ${invalid_param_list}= Create List ${invalid_param} + Check Response Variable Type And Values + ... ${resp} + ... 400 + ... ProblemDetails + ... title=Bad Request + ... status=400 + ... detail=Bad Param + ... cause=Invalid eventFilter for event ${event} + ... invalidParams=${invalid_param_list} diff --git a/tests/libraries/api_events/bodyRequests.py b/tests/libraries/api_events/bodyRequests.py index 1bb4604f..4e853a62 100644 --- a/tests/libraries/api_events/bodyRequests.py +++ b/tests/libraries/api_events/bodyRequests.py @@ -39,6 +39,21 @@ def create_capif_event_filter(aefIds=None, apiIds=None, apiInvokerIds=None): return capif_event_filter +def create_event_req(imm_rep=None, notif_method=None, max_report_nbr=None, mon_dur=None, rep_period=None): + data = dict() + if imm_rep is not None: + data['immRep'] = imm_rep + if notif_method is not None: + data['notifMethod'] = notif_method + if max_report_nbr is not None: + data['maxReportNbr'] = max_report_nbr + if mon_dur is not None: + data['monDur'] = mon_dur + if rep_period is not None: + data['repPeriod'] = rep_period + return data + + def create_default_event_req(): return { "grpRepTime": 5, @@ -51,6 +66,7 @@ def create_default_event_req(): } + def create_websock_notif_config_default(): return { "requestWebsocketUri": True, -- GitLab From f47caf83c58d9de7a03187d230c6d333ab32e630 Mon Sep 17 00:00:00 2001 From: Pelayo Torres Date: Tue, 1 Apr 2025 12:17:54 +0200 Subject: [PATCH 2/6] use etsi python image in celery --- .../capif_events/celery_app/Dockerfile | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/services/TS29222_CAPIF_Events_API/capif_events/celery_app/Dockerfile b/services/TS29222_CAPIF_Events_API/capif_events/celery_app/Dockerfile index 1b96a1c3..7d334128 100644 --- a/services/TS29222_CAPIF_Events_API/capif_events/celery_app/Dockerfile +++ b/services/TS29222_CAPIF_Events_API/capif_events/celery_app/Dockerfile @@ -1,6 +1,4 @@ -# celery_app/Dockerfile - -FROM python:3.9 +FROM labs.etsi.org:5050/ocf/capif/python:3-slim-bullseye WORKDIR /celery_app -- GitLab From f6120f0bc4efc3f3a8c051a6f64a1619e708574f Mon Sep 17 00:00:00 2001 From: Pelayo Torres Date: Fri, 11 Apr 2025 09:04:38 +0200 Subject: [PATCH 3/6] Celery in services folder --- .../capif_events/celery_app/Dockerfile | 12 - .../capif_events/core/events_apis.py | 1 + services/celery/Dockerfile | 12 + .../celery_app => celery}/__init__.py | 0 .../celery_app => celery}/config.py | 0 .../celery_app => celery}/config.yaml | 0 .../celery_app => celery}/requirements.txt | 0 .../celery_app => celery}/start_celery.sh | 0 .../celery_app => celery}/tasks.py | 17 +- services/docker-compose-capif.yml | 6 +- tests/features/Event Filter/event_req.robot | 215 ------------------ 11 files changed, 28 insertions(+), 235 deletions(-) delete mode 100644 services/TS29222_CAPIF_Events_API/capif_events/celery_app/Dockerfile create mode 100644 services/celery/Dockerfile rename services/{TS29222_CAPIF_Events_API/capif_events/celery_app => celery}/__init__.py (100%) rename services/{TS29222_CAPIF_Events_API/capif_events/celery_app => celery}/config.py (100%) rename services/{TS29222_CAPIF_Events_API/capif_events/celery_app => celery}/config.yaml (100%) rename services/{TS29222_CAPIF_Events_API/capif_events/celery_app => celery}/requirements.txt (100%) rename services/{TS29222_CAPIF_Events_API/capif_events/celery_app => celery}/start_celery.sh (100%) rename services/{TS29222_CAPIF_Events_API/capif_events/celery_app => celery}/tasks.py (88%) diff --git a/services/TS29222_CAPIF_Events_API/capif_events/celery_app/Dockerfile b/services/TS29222_CAPIF_Events_API/capif_events/celery_app/Dockerfile deleted file mode 100644 index 7d334128..00000000 --- a/services/TS29222_CAPIF_Events_API/capif_events/celery_app/Dockerfile +++ /dev/null @@ -1,12 +0,0 @@ -FROM labs.etsi.org:5050/ocf/capif/python:3-slim-bullseye - -WORKDIR /celery_app - -COPY requirements.txt /celery_app/ -RUN pip install --no-cache-dir -r requirements.txt - -COPY . /celery_app - -RUN chmod +x /celery_app/start_celery.sh - -CMD ["/celery_app/start_celery.sh"] diff --git a/services/TS29222_CAPIF_Events_API/capif_events/core/events_apis.py b/services/TS29222_CAPIF_Events_API/capif_events/core/events_apis.py index fea10021..d57defba 100644 --- a/services/TS29222_CAPIF_Events_API/capif_events/core/events_apis.py +++ b/services/TS29222_CAPIF_Events_API/capif_events/core/events_apis.py @@ -60,6 +60,7 @@ class EventSubscriptionsOperations(Resource): def __check_event_req(self, event_subscription): current_app.logger.debug("Checking event requirement.") + expired_at = None if event_subscription.event_req.mon_dur: if event_subscription.event_req.mon_dur > datetime.now(timezone.utc): expired_at = event_subscription.event_req.mon_dur diff --git a/services/celery/Dockerfile b/services/celery/Dockerfile new file mode 100644 index 00000000..ffad675f --- /dev/null +++ b/services/celery/Dockerfile @@ -0,0 +1,12 @@ +FROM labs.etsi.org:5050/ocf/capif/python:3-slim-bullseye + +WORKDIR /celery + +COPY requirements.txt /celery/ +RUN pip install --no-cache-dir -r requirements.txt + +COPY . /celery + +RUN chmod +x /celery/start_celery.sh + +CMD ["/celery/start_celery.sh"] diff --git a/services/TS29222_CAPIF_Events_API/capif_events/celery_app/__init__.py b/services/celery/__init__.py similarity index 100% rename from services/TS29222_CAPIF_Events_API/capif_events/celery_app/__init__.py rename to services/celery/__init__.py diff --git a/services/TS29222_CAPIF_Events_API/capif_events/celery_app/config.py b/services/celery/config.py similarity index 100% rename from services/TS29222_CAPIF_Events_API/capif_events/celery_app/config.py rename to services/celery/config.py diff --git a/services/TS29222_CAPIF_Events_API/capif_events/celery_app/config.yaml b/services/celery/config.yaml similarity index 100% rename from services/TS29222_CAPIF_Events_API/capif_events/celery_app/config.yaml rename to services/celery/config.yaml diff --git a/services/TS29222_CAPIF_Events_API/capif_events/celery_app/requirements.txt b/services/celery/requirements.txt similarity index 100% rename from services/TS29222_CAPIF_Events_API/capif_events/celery_app/requirements.txt rename to services/celery/requirements.txt diff --git a/services/TS29222_CAPIF_Events_API/capif_events/celery_app/start_celery.sh b/services/celery/start_celery.sh similarity index 100% rename from services/TS29222_CAPIF_Events_API/capif_events/celery_app/start_celery.sh rename to services/celery/start_celery.sh diff --git a/services/TS29222_CAPIF_Events_API/capif_events/celery_app/tasks.py b/services/celery/tasks.py similarity index 88% rename from services/TS29222_CAPIF_Events_API/capif_events/celery_app/tasks.py rename to services/celery/tasks.py index 3016f12c..cdc589b5 100644 --- a/services/TS29222_CAPIF_Events_API/capif_events/celery_app/tasks.py +++ b/services/celery/tasks.py @@ -1,19 +1,24 @@ # celery/tasks.py from celery import Celery -from datetime import datetime, timedelta, timezone +from datetime import datetime, timezone import pymongo import os from bson.codec_options import CodecOptions from config import Config import aiohttp import asyncio -from dateutil import parser # Celery Configuration +# celery = Celery( +# "notifications", +# broker=os.environ.get("CELERY_BROKER_URL", "redis://redis:6379/0"), +# backend=os.environ.get("CELERY_RESULT_BACKEND", "redis://redis:6379/0") +# ) + celery = Celery( "notifications", - broker=os.environ.get("CELERY_BROKER_URL", "redis://redis:6379/0"), - backend=os.environ.get("CELERY_RESULT_BACKEND", "redis://redis:6379/0") + broker=f"redis://{os.getenv("REDIS_HOST")}:{os.getenv("REDIS_PORT")}/0", + backend=f"redis://{os.getenv("REDIS_HOST")}:{os.getenv("REDIS_PORT")}/0" ) celery.conf.beat_schedule = { @@ -103,7 +108,7 @@ async def send(url, data): @celery.task(name="celery.tasks.check_notifications_collection") def my_periodic_task(): - print("Checking notifications collection...") + # print("Checking notifications collection...") while True: try: notification_data = notifications_col.find_one_and_delete( @@ -121,4 +126,4 @@ def my_periodic_task(): except Exception as e: print(f"Error sending notification: {e}") - print("Finished processing notifications.") + # print("Finished processing notifications.") diff --git a/services/docker-compose-capif.yml b/services/docker-compose-capif.yml index e965cedb..06247465 100644 --- a/services/docker-compose-capif.yml +++ b/services/docker-compose-capif.yml @@ -177,23 +177,25 @@ services: celery_worker: build: - context: ${SERVICES_DIR}/TS29222_CAPIF_Events_API/capif_events/celery_app + context: ${SERVICES_DIR}/celery environment: - CELERY_MODE=worker - REDIS_HOST=redis - REDIS_PORT=6379 depends_on: - redis + - mongo celery_beat: build: - context: ${SERVICES_DIR}/TS29222_CAPIF_Events_API/capif_events/celery_app + context: ${SERVICES_DIR}/celery environment: - CELERY_MODE=beat - REDIS_HOST=redis - REDIS_PORT=6379 depends_on: - redis + - mongo api-invocation-logs: build: context: ${SERVICES_DIR}/TS29222_CAPIF_Logging_API_Invocation_API diff --git a/tests/features/Event Filter/event_req.robot b/tests/features/Event Filter/event_req.robot index 4530c0d2..b2d9f9dd 100644 --- a/tests/features/Event Filter/event_req.robot +++ b/tests/features/Event Filter/event_req.robot @@ -52,221 +52,6 @@ Invoker subscribe to Service API Available ${notification_events_on_mock_server}= Set Variable ${resp.json()} -Invoker subscribe to Service API Availables - [Tags] event_filter-7 mockserver smoke - - # Initialize Mock server - Init Mock Server - - # Register Providers - ## Default Provider 1 Registration - ${register_user_info_provider_1}= Provider Default Registration provider_username=${PROVIDER_USERNAME}_1 - ${aef_id_1}= Set Variable - ... ${register_user_info_provider_1['aef_roles']['${AEF_PROVIDER_USERNAME}_1']['aef_id']} - - ## Publish service_1 API - ${service_api_description_published_1} - ... ${provider_resource_url_1} - ... ${provider_request_body_1}= - ... Publish Service Api - ... ${register_user_info_provider_1} - ... service_name=service_1 - - ## Default Provider 2 Registration - ${register_user_info_provider_2}= Provider Default Registration provider_username=${PROVIDER_USERNAME}_2 - ${aef_id_2}= Set Variable - ... ${register_user_info_provider_2['aef_roles']['${AEF_PROVIDER_USERNAME}_2']['aef_id']} - - ## Publish service_2 API - ${service_api_description_published_2} - ... ${provider_resource_url_2} - ... ${provider_request_body_2}= - ... Publish Service Api - ... ${register_user_info_provider_2} - ... service_name=service_2 - - ## Store apiId1 and apiId2 for further use - ${service_api_id_1}= Set Variable ${service_api_description_published_1['apiId']} - ${service_api_id_2}= Set Variable ${service_api_description_published_2['apiId']} - - # Register Invokers - ## Default Invoker 1 Registration and Onboarding - ${register_user_info_invoker_1} ${invoker_url_1} ${request_body_1}= Invoker Default Onboarding - ... invoker_username=${INVOKER_USERNAME}_1 - - ## Default Invoker 2 Registration and Onboarding - ${register_user_info_invoker_2} ${invoker_url_2} ${request_body_2}= Invoker Default Onboarding - ... invoke_username=${INVOKER_USERNAME}_2 - - ## Store apiInvokerIds for further use - ${api_invoker_id_1}= Set Variable ${register_user_info_invoker_1['api_invoker_id']} - ${api_invoker_id_2}= Set Variable ${register_user_info_invoker_2['api_invoker_id']} - - # Subscribe to events - ## Event lists - ${events_list}= Create List SERVICE_API_INVOCATION_SUCCESS SERVICE_API_INVOCATION_FAILURE - - ## Event filters - ${event_filter_empty}= Create Capif Event Filter - ${event_filter_api_invoker_ids}= Create Capif Event Filter apiInvokerIds=${api_invoker_id_1} - ${event_filter_api_ids}= Create Capif Event Filter apiIds=${service_api_id_1} - ${event_filter_aef_ids}= Create Capif Event Filter aefIds=${aef_id_2} - ${event_filter_api_ids_and_aef_ids}= Create Capif Event Filter - ... apiIds=${service_api_id_2} - ... aefIds=${aef_id_2} - ${event_filter_api_ids_and_api_invoker_ids}= Create Capif Event Filter - ... apiInvokerIds=${api_invoker_id_2} - ... apiIds=${service_api_id_2} - ${event_filter_aef_ids_and_api_invoker_ids}= Create Capif Event Filter - ... apiInvokerIds=${api_invoker_id_2} - ... aefIds=${aef_id_1} - ${event_filter_api_ids_aef_ids_and_api_invoker_ids}= Create Capif Event Filter - ... apiInvokerIds=${api_invoker_id_2} - ... aefIds=${aef_id_2} - ... apiIds=${service_api_id_2} - - ## Subscription to Events 1 - ${event_filters}= Create List ${event_filter_api_ids} ${event_filter_api_ids} - ${subscription_id_1}= - ... Subscribe provider ${register_user_info_provider_1} to events ${events_list} with event filters ${event_filters} - - ## Subscription to Events 2 - ${event_filters}= Create List ${event_filter_aef_ids} ${event_filter_aef_ids} - ${subscription_id_2}= - ... Subscribe provider ${register_user_info_provider_1} to events ${events_list} with event filters ${event_filters} - - ## Subscription to Events 3 - ${event_filters}= Create List ${event_filter_api_invoker_ids} ${event_filter_api_invoker_ids} - ${subscription_id_3}= - ... Subscribe provider ${register_user_info_provider_1} to events ${events_list} with event filters ${event_filters} - - ## Subscription to Events 4 - ${event_filters}= Create List ${event_filter_api_ids_and_aef_ids} ${event_filter_api_ids_and_aef_ids} - ${subscription_id_4}= - ... Subscribe provider ${register_user_info_provider_1} to events ${events_list} with event filters ${event_filters} - - ## Subscription to Events 5 - ${event_filters}= Create List - ... ${event_filter_api_ids_and_api_invoker_ids} - ... ${event_filter_api_ids_and_api_invoker_ids} - ${subscription_id_5}= - ... Subscribe provider ${register_user_info_provider_1} to events ${events_list} with event filters ${event_filters} - - ## Subscription to Events 6 - ${event_filters}= Create List - ... ${event_filter_aef_ids_and_api_invoker_ids} - ... ${event_filter_aef_ids_and_api_invoker_ids} - ${subscription_id_6}= - ... Subscribe provider ${register_user_info_provider_1} to events ${events_list} with event filters ${event_filters} - - ## Subscription to Events 7 - ${event_filters}= Create List - ... ${event_filter_api_ids_aef_ids_and_api_invoker_ids} - ... ${event_filter_api_ids_aef_ids_and_api_invoker_ids} - ${subscription_id_7}= - ... Subscribe provider ${register_user_info_provider_1} to events ${events_list} with event filters ${event_filters} - - # 1.Log entry for service_1 and invoker_1 - ${request_body_log_1}= Send Log Message to CAPIF - ... ${service_api_id_1} - ... service_1 - ... ${register_user_info_invoker_1} - ... ${register_user_info_provider_1} - ... 200 - ... 400 - - # 2.Log entry for service_2 and invoker_1 - ${request_body_log_2}= Send Log Message to CAPIF - ... ${service_api_id_2} - ... service_2 - ... ${register_user_info_invoker_1} - ... ${register_user_info_provider_2} - ... 200 - - # 3.Log entry for service_2 and invoker_2 - ${request_body_log_3}= Send Log Message to CAPIF - ... ${service_api_id_2} - ... service_2 - ... ${register_user_info_invoker_2} - ... ${register_user_info_provider_2} - ... 200 - - # 4.Log entry for service_1 and invoker_2 - ${request_body_log_4}= Send Log Message to CAPIF - ... ${service_api_id_1} - ... service_1 - ... ${register_user_info_invoker_2} - ... ${register_user_info_provider_1} - ... 400 - - # Check Event Notifications - ## Create check Events to ensure all notifications were received - ### Subscription 1 Checks - ${events_expected}= Create Events From InvocationLogs - ... ${subscription_id_1} - ... ${request_body_log_1} - - ${events_expected}= Create Events From InvocationLogs - ... ${subscription_id_1} - ... ${request_body_log_4} - ... events_expected=${events_expected} - - ### Subcription 2 Checks - ${events_expected}= Create Events From InvocationLogs - ... ${subscription_id_2} - ... ${request_body_log_2} - ... events_expected=${events_expected} - - ${events_expected}= Create Events From InvocationLogs - ... ${subscription_id_2} - ... ${request_body_log_3} - ... events_expected=${events_expected} - - # Subscription 3 Checks - ${events_expected}= Create Events From InvocationLogs - ... ${subscription_id_3} - ... ${request_body_log_1} - ... events_expected=${events_expected} - - ${events_expected}= Create Events From InvocationLogs - ... ${subscription_id_3} - ... ${request_body_log_2} - ... events_expected=${events_expected} - - # Subscription 4 Checks - ${events_expected}= Create Events From InvocationLogs - ... ${subscription_id_4} - ... ${request_body_log_2} - ... events_expected=${events_expected} - - ${events_expected}= Create Events From InvocationLogs - ... ${subscription_id_4} - ... ${request_body_log_3} - ... events_expected=${events_expected} - - # Subscription 5 Checks - ${events_expected}= Create Events From InvocationLogs - ... ${subscription_id_5} - ... ${request_body_log_3} - ... events_expected=${events_expected} - - # Subscription 6 Checks - ${events_expected}= Create Events From InvocationLogs - ... ${subscription_id_6} - ... ${request_body_log_4} - ... events_expected=${events_expected} - - # Subscription 7 Checks - ${events_expected}= Create Events From InvocationLogs - ... ${subscription_id_7} - ... ${request_body_log_3} - ... events_expected=${events_expected} - - Log List ${events_expected} - ## Check Events Expected towards received notifications at mock server - Wait Until Keyword Succeeds 5x 5s Check Mock Server Notification Events ${events_expected} - - *** Keywords *** Create Security Context between ${invoker_info} and ${provider_info} # Discover APIs by invoker -- GitLab From b8632d8fd2a84429161e95fb97a2319861b934dd Mon Sep 17 00:00:00 2001 From: Pelayo Torres Date: Thu, 24 Apr 2025 08:34:08 +0200 Subject: [PATCH 4/6] one_time event logic --- .../capif_events/core/notifications.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/services/TS29222_CAPIF_Events_API/capif_events/core/notifications.py b/services/TS29222_CAPIF_Events_API/capif_events/core/notifications.py index 584e31fe..dca31901 100644 --- a/services/TS29222_CAPIF_Events_API/capif_events/core/notifications.py +++ b/services/TS29222_CAPIF_Events_API/capif_events/core/notifications.py @@ -150,6 +150,11 @@ class Notifications(): notification = {"notification": data.to_dict(), "next_report_time" : next_report_time, "url": url, "subscription_id": sub["subscription_id"]} self.events_ops.add_notification(notification) + self.events_ops.update_report_nbr(sub["subscription_id"]) + + if sub["event_req"]["notif_method"] == "ONE_TIME": + asyncio.run(self.send(url, serialize_clean_camel_case(data))) + self.events_ops.delete_subscription(sub["subscription_id"]) if sub["event_req"].get("max_report_nbr", None) and sub["report_nbr"] + 1 == sub["event_req"].get("max_report_nbr", None): current_app.logger.debug(f"Limit reached, deleting subscription {sub['subscription_id']}") @@ -157,8 +162,7 @@ class Notifications(): else: asyncio.run(self.send(url, serialize_clean_camel_case(data))) - - self.events_ops.update_report_nbr(sub["subscription_id"]) + self.events_ops.update_report_nbr(sub["subscription_id"]) -- GitLab From 81378162fd2c4b95cba9e0688848489e90b1105f Mon Sep 17 00:00:00 2001 From: Pelayo Torres Date: Fri, 2 May 2025 11:12:52 +0200 Subject: [PATCH 5/6] immRep logic --- .../capif_events/core/events_apis.py | 38 ++++++++++++++----- services/celery/requirements.txt | 2 +- services/celery/tasks.py | 9 ----- 3 files changed, 29 insertions(+), 20 deletions(-) diff --git a/services/TS29222_CAPIF_Events_API/capif_events/core/events_apis.py b/services/TS29222_CAPIF_Events_API/capif_events/core/events_apis.py index d57defba..aaa8860d 100644 --- a/services/TS29222_CAPIF_Events_API/capif_events/core/events_apis.py +++ b/services/TS29222_CAPIF_Events_API/capif_events/core/events_apis.py @@ -4,16 +4,22 @@ import secrets import rfc3987 from capif_events.models.event_subscription import EventSubscription # noqa: E501 from flask import current_app, Response -from datetime import datetime, timedelta, timezone +from datetime import datetime, timezone +import asyncio from .auth_manager import AuthManager from .resources import Resource from .responses import internal_server_error, not_found_error, make_response, bad_request_error from ..util import serialize_clean_camel_case, clean_empty, dict_to_camel_case +from .notifications import Notifications class EventSubscriptionsOperations(Resource): + def __init__(self): + super().__init__() + self.notifications = Notifications() + def __check_subscriber_id(self, subscriber_id): mycol_invoker= self.db.get_col_by_name(self.db.invoker_collection) mycol_provider= self.db.get_col_by_name(self.db.provider_collection) @@ -58,7 +64,7 @@ class EventSubscriptionsOperations(Resource): return bad_request_error(detail="Bad Param", cause = f"Invalid eventFilter for event {event}", invalid_params=[{"param": "eventFilter", "reason": f"The eventFilter {invalid_filters} for event {event} are not applicable."}]) return None - def __check_event_req(self, event_subscription): + def __check_event_req(self, event_subscription, subscription_id=None): current_app.logger.debug("Checking event requirement.") expired_at = None if event_subscription.event_req.mon_dur: @@ -79,6 +85,14 @@ class EventSubscriptionsOperations(Resource): cause="Periodic notification method selected but repPeriod not provided", invalid_params=[{"param": "repPeriod", "reason": "Periodic notification method selected but repPeriod not provided"}] ) + + if event_subscription.event_req.imm_rep and subscription_id is not None: + current_app.logger.debug("Sending immediate notification") + notifications_col = self.db.get_col_by_name(self.db.notifications_col) + result = notifications_col.find({"subscription_id": subscription_id}) + for notification in result: + asyncio.run(self.notifications.send(notification["url"], notification["notification"])) + return expired_at def __init__(self): @@ -209,6 +223,7 @@ class EventSubscriptionsOperations(Resource): def put_event(self, event_subscription, subscriber_id, subscription_id): try: mycol = self.db.get_col_by_name(self.db.event_collection) + notifications_col = self.db.get_col_by_name(self.db.notifications_col) current_app.logger.debug("Updating event subscription") @@ -224,6 +239,8 @@ class EventSubscriptionsOperations(Resource): if isinstance(result, Response): return result + current_app.logger.debug(event_subscription) + expired_at = None if EventSubscription.return_supp_feat_dict(event_subscription.supported_features)["EnhancedEventReport"]: if event_subscription.event_filters: current_app.logger.debug(event_subscription.event_filters) @@ -231,8 +248,7 @@ class EventSubscriptionsOperations(Resource): if isinstance(result, Response): return result if event_subscription.event_req: - current_app.logger.debug(event_subscription.event_req) - expired_at = self.__check_event_req(event_subscription) + expired_at = self.__check_event_req(event_subscription, subscription_id) if isinstance(expired_at, Response): return result @@ -252,6 +268,7 @@ class EventSubscriptionsOperations(Resource): body["created_at"] = eventdescription.get("created_at", datetime.now(timezone.utc)) body["expire_at"] = expired_at if expired_at else eventdescription.get("expire_at", None) + notifications_col.delete_many({"subscription_id": subscription_id}) mycol.replace_one(my_query, body) current_app.logger.debug("Event subscription updated from database") @@ -269,6 +286,7 @@ class EventSubscriptionsOperations(Resource): def patch_event(self, event_subscription, subscriber_id, subscription_id): try: mycol = self.db.get_col_by_name(self.db.event_collection) + notifications_col = self.db.get_col_by_name(self.db.notifications_col) current_app.logger.debug("Patching event subscription") @@ -284,7 +302,8 @@ class EventSubscriptionsOperations(Resource): if eventdescription is None: current_app.logger.error("Event subscription not found") return not_found_error(detail="Event subscription not exist", cause="Event API subscription id not found") - + current_app.logger.debug(event_subscription) + expired_at = None if EventSubscription.return_supp_feat_dict(eventdescription.get("supported_features"))["EnhancedEventReport"]: if event_subscription.events and event_subscription.event_filters: result = self.__check_event_filters(event_subscription.events, clean_empty(event_subscription.to_dict()["event_filters"])) @@ -292,13 +311,12 @@ class EventSubscriptionsOperations(Resource): result = self.__check_event_filters(event_subscription.events, eventdescription.get("event_filters")) elif event_subscription.events is None and event_subscription.event_filters: result = self.__check_event_filters(eventdescription.get("events"), clean_empty(event_subscription.to_dict()["event_filters"])) - if isinstance(result, Response): return result if event_subscription.event_req: - current_app.logger.debug(event_subscription.event_req) - expired_at = self.__check_event_req(event_subscription) + updated_data = EventSubscription.from_dict(dict_to_camel_case({**eventdescription, **clean_empty(event_subscription.to_dict())})) + expired_at = self.__check_event_req(updated_data, subscription_id) if isinstance(expired_at, Response): return result else: @@ -308,8 +326,8 @@ class EventSubscriptionsOperations(Resource): return result body = clean_empty(event_subscription.to_dict()) - if expired_at: - body["expire_at"] = expired_at + body["expire_at"] = expired_at + notifications_col.delete_many({"subscription_id": subscription_id}) document = mycol.update_one(my_query, {"$set":body}) document = mycol.find_one(my_query) current_app.logger.debug("Event subscription patched from database") diff --git a/services/celery/requirements.txt b/services/celery/requirements.txt index 64b64cfd..32b0f0c6 100644 --- a/services/celery/requirements.txt +++ b/services/celery/requirements.txt @@ -4,4 +4,4 @@ redis==4.5.4 aiohttp == 3.10.5 async-timeout == 4.0.3 pyyaml == 6.0.2 -python_dateutil >= 2.6.0 +python_dateutil == 2.9.0 diff --git a/services/celery/tasks.py b/services/celery/tasks.py index cdc589b5..01476920 100644 --- a/services/celery/tasks.py +++ b/services/celery/tasks.py @@ -8,13 +8,6 @@ from config import Config import aiohttp import asyncio -# Celery Configuration -# celery = Celery( -# "notifications", -# broker=os.environ.get("CELERY_BROKER_URL", "redis://redis:6379/0"), -# backend=os.environ.get("CELERY_RESULT_BACKEND", "redis://redis:6379/0") -# ) - celery = Celery( "notifications", broker=f"redis://{os.getenv("REDIS_HOST")}:{os.getenv("REDIS_PORT")}/0", @@ -108,7 +101,6 @@ async def send(url, data): @celery.task(name="celery.tasks.check_notifications_collection") def my_periodic_task(): - # print("Checking notifications collection...") while True: try: notification_data = notifications_col.find_one_and_delete( @@ -126,4 +118,3 @@ def my_periodic_task(): except Exception as e: print(f"Error sending notification: {e}") - # print("Finished processing notifications.") -- GitLab From 82d5badad0d2f722a9537d79da34488b8bf72e31 Mon Sep 17 00:00:00 2001 From: Pelayo Torres Date: Mon, 5 May 2025 12:00:24 +0200 Subject: [PATCH 6/6] removed commented line --- services/TS29222_CAPIF_Events_API/capif_events/db/db.py | 1 - 1 file changed, 1 deletion(-) diff --git a/services/TS29222_CAPIF_Events_API/capif_events/db/db.py b/services/TS29222_CAPIF_Events_API/capif_events/db/db.py index 3af5d6cf..c4888466 100644 --- a/services/TS29222_CAPIF_Events_API/capif_events/db/db.py +++ b/services/TS29222_CAPIF_Events_API/capif_events/db/db.py @@ -24,7 +24,6 @@ class MongoDatabse(): self.notifications_col = self.config['mongo']['notifications_col'] self.get_col_by_name(self.event_collection).create_index([("expire_at", 1)],expireAfterSeconds=0) - # self.acls_col = self.config['mongo']['capif_acls_col'] def get_col_by_name(self, name): return self.db[name].with_options(codec_options=CodecOptions(tz_aware=True)) -- GitLab