diff --git a/services/TS29222_CAPIF_Events_API/capif_events/core/events_apis.py b/services/TS29222_CAPIF_Events_API/capif_events/core/events_apis.py index 440925c361560ca56b9a66919f941a1403c44326..1f9acaf3e4ff5c26e5aea1c729b1c02968960b2c 100644 --- a/services/TS29222_CAPIF_Events_API/capif_events/core/events_apis.py +++ b/services/TS29222_CAPIF_Events_API/capif_events/core/events_apis.py @@ -3,12 +3,16 @@ import secrets import rfc3987 from capif_events.models.event_subscription import EventSubscription # noqa: E501 -from flask import Response, current_app +from flask import current_app, Response +from datetime import datetime, timezone +import asyncio from ..util import clean_empty, dict_to_camel_case, serialize_clean_camel_case from .auth_manager import AuthManager from .resources import Resource -from .responses import bad_request_error, internal_server_error, make_response, not_found_error +from .responses import internal_server_error, not_found_error, make_response, bad_request_error +from ..util import serialize_clean_camel_case, clean_empty, dict_to_camel_case +from .notifications import Notifications TOTAL_FEATURES = 4 SUPPORTED_FEATURES_HEX = "c" @@ -27,6 +31,10 @@ def return_negotiated_supp_feat_dict(supp_feat): class EventSubscriptionsOperations(Resource): + def __init__(self): + super().__init__() + self.notifications = Notifications() + def __check_subscriber_id(self, subscriber_id): mycol_invoker= self.db.get_col_by_name(self.db.invoker_collection) mycol_provider= self.db.get_col_by_name(self.db.provider_collection) @@ -70,6 +78,37 @@ class EventSubscriptionsOperations(Resource): current_app.logger.debug(f"The eventFilter {invalid_filters} for event {event} are not applicable.") return bad_request_error(detail="Bad Param", cause = f"Invalid eventFilter for event {event}", invalid_params=[{"param": "eventFilter", "reason": f"The eventFilter {invalid_filters} for event {event} are not applicable."}]) return None + + def __check_event_req(self, event_subscription, subscription_id=None): + current_app.logger.debug("Checking event requirement.") + expired_at = None + if event_subscription.event_req.mon_dur: + if event_subscription.event_req.mon_dur > datetime.now(timezone.utc): + expired_at = event_subscription.event_req.mon_dur + else: + current_app.logger.error("monDur is in the past") + return bad_request_error( + detail="Bad Param", + cause="monDur is in the past", + invalid_params=[{"param": "monDur", "reason": "monDur is in the past"}] + ) + + if event_subscription.event_req.notif_method == "PERIODIC" and event_subscription.event_req.rep_period is None: + current_app.logger.error("Periodic notification method selected but repPeriod not provided") + return bad_request_error( + detail="Bad Param", + cause="Periodic notification method selected but repPeriod not provided", + invalid_params=[{"param": "repPeriod", "reason": "Periodic notification method selected but repPeriod not provided"}] + ) + + if event_subscription.event_req.imm_rep and subscription_id is not None: + current_app.logger.debug("Sending immediate notification") + notifications_col = self.db.get_col_by_name(self.db.notifications_col) + result = notifications_col.find({"subscription_id": subscription_id}) + for notification in result: + asyncio.run(self.notifications.send(notification["url"], notification["notification"])) + + return expired_at def __init__(self): Resource.__init__(self) @@ -102,7 +141,8 @@ class EventSubscriptionsOperations(Resource): return result negotiated_supported_features = return_negotiated_supp_feat_dict(event_subscription.supported_features) - + + expired_at = None # Check if EnhancedEventReport is enabled and validate event filters if negotiated_supported_features["EnhancedEventReport"]: if event_subscription.event_filters: @@ -110,6 +150,11 @@ class EventSubscriptionsOperations(Resource): result = self.__check_event_filters(event_subscription.events, clean_empty(event_subscription.to_dict()["event_filters"])) if isinstance(result, Response): return result + if event_subscription.event_req: + current_app.logger.debug(event_subscription.event_req) + expired_at = self.__check_event_req(event_subscription) + if isinstance(expired_at, Response): + return result else: if event_subscription.event_filters: current_app.logger.error("Event filters provided but EnhancedEventReport is not enabled") @@ -118,6 +163,13 @@ class EventSubscriptionsOperations(Resource): cause="Event filters provided but EnhancedEventReport is not enabled", invalid_params=[{"param": "eventFilters", "reason": "EnhancedEventReport is not enabled"}] ) + if event_subscription.event_req: + current_app.logger.error("Event requirement provided but EnhancedEventReport is not enabled") + return bad_request_error( + detail="Bad Param", + cause="Event requirement provided but EnhancedEventReport is not enabled", + invalid_params=[{"param": "eventReq", "reason": "EnhancedEventReport is not enabled"}] + ) # Generate subscriptionID subscription_id = secrets.token_hex(15) @@ -125,7 +177,9 @@ class EventSubscriptionsOperations(Resource): evnt["subscriber_id"] = subscriber_id evnt["subscription_id"] = subscription_id - # Edit supported_features field to the negotiated one + evnt["report_nbr"] = 0 + evnt["created_at"] = datetime.now(timezone.utc) + evnt["expire_at"] = expired_at event_subscription.supported_features = negotiated_supported_features["Final"] evnt.update(event_subscription.to_dict()) @@ -149,6 +203,7 @@ class EventSubscriptionsOperations(Resource): try: mycol = self.db.get_col_by_name(self.db.event_collection) + notifications_col = self.db.get_col_by_name(self.db.notifications_col) current_app.logger.debug("Removing event subscription") @@ -167,6 +222,7 @@ class EventSubscriptionsOperations(Resource): return not_found_error(detail="Event subscription not exist", cause="Event API subscription id not found") mycol.delete_one(my_query) + notifications_col.delete_many({"subscription_id": subscription_id}) current_app.logger.debug("Event subscription removed from database") self.auth_manager.remove_auth_event(subscription_id, subscriber_id) @@ -182,6 +238,7 @@ class EventSubscriptionsOperations(Resource): def put_event(self, event_subscription, subscriber_id, subscription_id): try: mycol = self.db.get_col_by_name(self.db.event_collection) + notifications_col = self.db.get_col_by_name(self.db.notifications_col) current_app.logger.debug("Updating event subscription") @@ -196,30 +253,41 @@ class EventSubscriptionsOperations(Resource): if isinstance(result, Response): return result + + current_app.logger.debug(event_subscription) + expired_at = None + negotiated_supported_features = return_negotiated_supp_feat_dict(event_subscription.supported_features) + if negotiated_supported_features["EnhancedEventReport"]: + if event_subscription.event_filters: + current_app.logger.debug(event_subscription.event_filters) + result = self.__check_event_filters(event_subscription.events, clean_empty(event_subscription.to_dict()["event_filters"])) + if isinstance(result, Response): + return result + if event_subscription.event_req: + current_app.logger.debug(event_subscription.event_req) + expired_at = self.__check_event_req(event_subscription) + if isinstance(expired_at, Response): + return result + else: + if event_subscription.event_filters: + current_app.logger.error("Event filters provided but EnhancedEventReport is not enabled") + return bad_request_error( + detail="Bad Param", + cause="Event filters provided but EnhancedEventReport is not enabled", + invalid_params=[{"param": "eventFilters", "reason": "EnhancedEventReport is not enabled"}] + ) + if event_subscription.event_req: + current_app.logger.error("Event requirement provided but EnhancedEventReport is not enabled") + return bad_request_error( + detail="Bad Param", + cause="Event requirement provided but EnhancedEventReport is not enabled", + invalid_params=[{"param": "eventReq", "reason": "EnhancedEventReport is not enabled"}] + ) my_query = {'subscriber_id': subscriber_id, 'subscription_id': subscription_id} eventdescription = mycol.find_one(my_query) - if eventdescription is None: - current_app.logger.error("Event subscription not found") - return not_found_error(detail="Event subscription not exist", - cause="Event API subscription id not found") - - negotiated_supported_features = return_negotiated_supp_feat_dict(event_subscription.supported_features) - - if negotiated_supported_features["EnhancedEventReport"] and event_subscription.event_filters: - result = self.__check_event_filters(event_subscription.events, clean_empty(event_subscription.to_dict()["event_filters"])) - if isinstance(result, Response): - return result - elif (not negotiated_supported_features["EnhancedEventReport"]) and event_subscription.event_filters: - current_app.logger.error("Event filters provided but EnhancedEventReport is not enabled") - return bad_request_error( - detail="Bad Param", - cause="Event filters provided but EnhancedEventReport is not enabled", - invalid_params=[{"param": "eventFilters", "reason": "EnhancedEventReport is not enabled"}] - ) - event_subscription.supported_features = negotiated_supported_features["Final"] body = event_subscription.to_dict() @@ -227,6 +295,11 @@ class EventSubscriptionsOperations(Resource): body["subscriber_id"] = subscriber_id body["subscription_id"] = subscription_id + body["report_nbr"] = eventdescription.get("report_nbr", 0) + body["created_at"] = eventdescription.get("created_at", datetime.now(timezone.utc)) + body["expire_at"] = expired_at if expired_at else eventdescription.get("expire_at", None) + + notifications_col.delete_many({"subscription_id": subscription_id}) mycol.replace_one(my_query, body) current_app.logger.debug("Event subscription updated from database") @@ -244,6 +317,7 @@ class EventSubscriptionsOperations(Resource): def patch_event(self, event_subscription, subscriber_id, subscription_id): try: mycol = self.db.get_col_by_name(self.db.event_collection) + notifications_col = self.db.get_col_by_name(self.db.notifications_col) current_app.logger.debug("Patching event subscription") @@ -260,6 +334,9 @@ class EventSubscriptionsOperations(Resource): current_app.logger.error("Event subscription not found") return not_found_error(detail="Event subscription not exist", cause="Event API subscription id not found") + current_app.logger.debug(event_subscription) + expired_at = None + negotiated_supported_features = return_negotiated_supp_feat_dict(eventdescription.get("supported_features")) if negotiated_supported_features["EnhancedEventReport"]: @@ -269,6 +346,16 @@ class EventSubscriptionsOperations(Resource): result = self.__check_event_filters(event_subscription.events, eventdescription.get("event_filters")) elif event_subscription.events is None and event_subscription.event_filters: result = self.__check_event_filters(eventdescription.get("events"), clean_empty(event_subscription.to_dict()["event_filters"])) + if isinstance(result, Response): + return result + + if event_subscription.event_req: + updated_data = EventSubscription.from_dict(dict_to_camel_case({**eventdescription, **clean_empty(event_subscription.to_dict())})) + expired_at = self.__check_event_req(updated_data, subscription_id) + if isinstance(expired_at, Response): + return result + else: + expired_at = expired_at if expired_at else eventdescription.get("expire_at", None) if isinstance(result, Response): return result @@ -276,6 +363,8 @@ class EventSubscriptionsOperations(Resource): event_subscription.supported_features = negotiated_supported_features["Final"] body = clean_empty(event_subscription.to_dict()) + body["expire_at"] = expired_at + notifications_col.delete_many({"subscription_id": subscription_id}) document = mycol.update_one(my_query, {"$set":body}) document = mycol.find_one(my_query) current_app.logger.debug("Event subscription patched from database") diff --git a/services/TS29222_CAPIF_Events_API/capif_events/core/internal_event_ops.py b/services/TS29222_CAPIF_Events_API/capif_events/core/internal_event_ops.py index f9daf00889930f64567a573ba7bbb219ecf585ec..73ea353f6a31c482a25716be2336e80b9d8c6b81 100644 --- a/services/TS29222_CAPIF_Events_API/capif_events/core/internal_event_ops.py +++ b/services/TS29222_CAPIF_Events_API/capif_events/core/internal_event_ops.py @@ -21,6 +21,14 @@ class InternalEventOperations(Resource): #We dont need remove all auth events, becase when invoker is removed, remove auth entry #self.auth_manager.remove_auth_all_event(subscriber_id) + + def delete_subscription(self, subscription_id): + + mycol = self.db.get_col_by_name(self.db.event_collection) + my_query = {'subscription_id': subscription_id} + mycol.delete_one(my_query) + + current_app.logger.info(f"Removed subscription: {subscription_id}") def get_event_subscriptions(self, event): current_app.logger.info("get subscription from db") @@ -42,3 +50,26 @@ class InternalEventOperations(Resource): except Exception as e: current_app.logger.error("An exception occurred ::" + str(e)) return False + + def add_notification(self, notification): + current_app.logger.info("Adding Notification to notifications list") + try: + mycol = self.db.get_col_by_name(self.db.notifications_col) + mycol.insert_one(notification) + current_app.logger.info("Notification added to notifications list") + except Exception as e: + current_app.logger.error("An exception occurred ::" + str(e)) + return False + + def update_report_nbr(self, subscription_id): + current_app.logger.info("Incrementing report number") + try: + mycol = self.db.get_col_by_name(self.db.event_collection) + my_query = {'subscription_id': subscription_id} + result = mycol.update_one(my_query, {'$inc': {'report_nbr': 1}}) + current_app.logger.info(result) + current_app.logger.info("Report number incremented") + except Exception as e: + current_app.logger.error("An exception occurred ::" + str(e)) + return False + diff --git a/services/TS29222_CAPIF_Events_API/capif_events/core/notifications.py b/services/TS29222_CAPIF_Events_API/capif_events/core/notifications.py index 9c8e07ff5ffb665d5fee301582be883371acfcb1..0bd895575b416ae813f0828099c0895f54d433f8 100644 --- a/services/TS29222_CAPIF_Events_API/capif_events/core/notifications.py +++ b/services/TS29222_CAPIF_Events_API/capif_events/core/notifications.py @@ -9,6 +9,7 @@ from encoder import CustomJSONEncoder from flask import current_app from models.event_notification import EventNotification from util import serialize_clean_camel_case +from datetime import datetime, timedelta, timezone from .internal_event_ops import InternalEventOperations @@ -148,7 +149,35 @@ class Notifications(): current_app.logger.debug(json.dumps(data.to_dict(),cls=CustomJSONEncoder)) - asyncio.run(self.send(url, serialize_clean_camel_case(data))) + if return_negotiated_supp_feat_dict(sub["supported_features"])["EnhancedEventReport"] and sub.get("event_req", None): + current_app.logger.debug(f"Creating notification for {sub['subscription_id']}") + + if sub["event_req"]["notif_method"] == "PERIODIC": + transcurred_time = (datetime.now(timezone.utc)-sub["created_at"]).total_seconds() + if transcurred_time > sub["event_req"]["rep_period"]: + transcurred_blocks = int(transcurred_time // sub["event_req"]["rep_period"]) + next_report_time = sub["created_at"] + timedelta(seconds=((transcurred_blocks+1) * sub["event_req"]["rep_period"])) + else: + next_report_time = sub["created_at"] + timedelta(seconds=sub["event_req"]["rep_period"]) + + notification = {"notification": data.to_dict(), "next_report_time" : next_report_time, "url": url, "subscription_id": sub["subscription_id"]} + + self.events_ops.add_notification(notification) + self.events_ops.update_report_nbr(sub["subscription_id"]) + + if sub["event_req"]["notif_method"] == "ONE_TIME": + asyncio.run(self.send(url, serialize_clean_camel_case(data))) + self.events_ops.delete_subscription(sub["subscription_id"]) + + if sub["event_req"].get("max_report_nbr", None) and sub["report_nbr"] + 1 == sub["event_req"].get("max_report_nbr", None): + current_app.logger.debug(f"Limit reached, deleting subscription {sub['subscription_id']}") + self.events_ops.delete_subscription(sub["subscription_id"]) + + else: + asyncio.run(self.send(url, serialize_clean_camel_case(data))) + self.events_ops.update_report_nbr(sub["subscription_id"]) + + except Exception as e: current_app.logger.error("An exception occurred ::" + str(e)) diff --git a/services/TS29222_CAPIF_Events_API/capif_events/db/db.py b/services/TS29222_CAPIF_Events_API/capif_events/db/db.py index adfc472a567be9deb866e90eab1d8986281be0e8..c48884660da34a72900833d38ecb357928011f80 100644 --- a/services/TS29222_CAPIF_Events_API/capif_events/db/db.py +++ b/services/TS29222_CAPIF_Events_API/capif_events/db/db.py @@ -21,7 +21,9 @@ class MongoDatabse(): self.invoker_collection = self.config['mongo']['capif_invokers_col'] self.provider_collection = self.config['mongo']['capif_providers_col'] self.certs_col = self.config['mongo']['certs_col'] - # self.acls_col = self.config['mongo']['capif_acls_col'] + self.notifications_col = self.config['mongo']['notifications_col'] + + self.get_col_by_name(self.event_collection).create_index([("expire_at", 1)],expireAfterSeconds=0) def get_col_by_name(self, name): return self.db[name].with_options(codec_options=CodecOptions(tz_aware=True)) diff --git a/services/TS29222_CAPIF_Events_API/config.yaml b/services/TS29222_CAPIF_Events_API/config.yaml index 101d300e8b86d57d77a4d054e9100cc25bf42aa0..a64584dd0da0d0f2dc2735f69d8a4cdbe76a275b 100644 --- a/services/TS29222_CAPIF_Events_API/config.yaml +++ b/services/TS29222_CAPIF_Events_API/config.yaml @@ -7,6 +7,7 @@ mongo: { 'capif_invokers_col': 'invokerdetails', 'capif_providers_col': 'providerenrolmentdetails', 'capif_acls_col': 'acls', + 'notifications_col': 'notifications', 'host': 'mongo', 'port': "27017" } diff --git a/services/celery/Dockerfile b/services/celery/Dockerfile new file mode 100644 index 0000000000000000000000000000000000000000..ffad675f873609b54252c011d16cfa92cb244582 --- /dev/null +++ b/services/celery/Dockerfile @@ -0,0 +1,12 @@ +FROM labs.etsi.org:5050/ocf/capif/python:3-slim-bullseye + +WORKDIR /celery + +COPY requirements.txt /celery/ +RUN pip install --no-cache-dir -r requirements.txt + +COPY . /celery + +RUN chmod +x /celery/start_celery.sh + +CMD ["/celery/start_celery.sh"] diff --git a/services/celery/__init__.py b/services/celery/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/services/celery/config.py b/services/celery/config.py new file mode 100644 index 0000000000000000000000000000000000000000..60d542a2fe7c8ed81b0a330732075101bf489ad6 --- /dev/null +++ b/services/celery/config.py @@ -0,0 +1,20 @@ +import os + +import yaml + + +#Config class to get config +class Config: + def __init__(self): + self.cached = 0 + self.file="./config.yaml" + self.my_config = {} + stamp = os.stat(self.file).st_mtime + if stamp != self.cached: + self.cached = stamp + f = open(self.file) + self.my_config = yaml.safe_load(f) + f.close() + + def get_config(self): + return self.my_config diff --git a/services/celery/config.yaml b/services/celery/config.yaml new file mode 100644 index 0000000000000000000000000000000000000000..c1aa656aecf69ece1b9f04da73804dcb4c230e88 --- /dev/null +++ b/services/celery/config.yaml @@ -0,0 +1,8 @@ +mongo: { + 'user': 'root', + 'password': 'example', + 'db': 'capif', + 'notifications_col': 'notifications', + 'host': 'mongo', + 'port': "27017" +} diff --git a/services/celery/requirements.txt b/services/celery/requirements.txt new file mode 100644 index 0000000000000000000000000000000000000000..32b0f0c6021ad8c149a4f87182cfe64f048e95b2 --- /dev/null +++ b/services/celery/requirements.txt @@ -0,0 +1,7 @@ +celery==5.4 +pymongo==4.2.0 +redis==4.5.4 +aiohttp == 3.10.5 +async-timeout == 4.0.3 +pyyaml == 6.0.2 +python_dateutil == 2.9.0 diff --git a/services/celery/start_celery.sh b/services/celery/start_celery.sh new file mode 100644 index 0000000000000000000000000000000000000000..979e325b07c8b4abbb96baed626586c8d6082233 --- /dev/null +++ b/services/celery/start_celery.sh @@ -0,0 +1,12 @@ +#!/bin/bash + +if [ "$CELERY_MODE" = "worker" ]; then + echo "Starting Celery Worker..." + celery -A tasks worker --loglevel=info +elif [ "$CELERY_MODE" = "beat" ]; then + echo "Iniciando Celery Beat..." + celery -A tasks beat --loglevel=info +else + echo "ERROR: The environment variable CELERY_MODE is not set correctly (worker|beat)" + exit 1 +fi \ No newline at end of file diff --git a/services/celery/tasks.py b/services/celery/tasks.py new file mode 100644 index 0000000000000000000000000000000000000000..014769203779c061343ec57ca7b24f9fd8385456 --- /dev/null +++ b/services/celery/tasks.py @@ -0,0 +1,120 @@ +# celery/tasks.py +from celery import Celery +from datetime import datetime, timezone +import pymongo +import os +from bson.codec_options import CodecOptions +from config import Config +import aiohttp +import asyncio + +celery = Celery( + "notifications", + broker=f"redis://{os.getenv("REDIS_HOST")}:{os.getenv("REDIS_PORT")}/0", + backend=f"redis://{os.getenv("REDIS_HOST")}:{os.getenv("REDIS_PORT")}/0" +) + +celery.conf.beat_schedule = { + "check_notifications_collection": { + "task": "celery.tasks.check_notifications_collection", + "schedule": 1.0, + "args": (), + }, +} +celery.conf.timezone = "UTC" + +# MongoDB Connection +config = Config().get_config() + +mongo_uri = f"mongodb://{config['mongo']['user']}:{config['mongo']['password']}@" \ + f"{config['mongo']['host']}:{config['mongo']['port']}" +client = pymongo.MongoClient(mongo_uri) +notifications_col = client[config['mongo']['db']][config['mongo']['notifications_col']].with_options(codec_options=CodecOptions(tz_aware=True)) + +def serialize_clean_camel_case(obj): + res = obj.to_dict() + res = clean_empty(res) + res = dict_to_camel_case(res) + + return res + +def clean_empty(d): + if isinstance(d, dict): + return { + k: v + for k, v in ((k, clean_empty(v)) for k, v in d.items()) + if v is not None or (isinstance(v, list) and len(v) == 0) + } + if isinstance(d, list): + return [v for v in map(clean_empty, d) if v is not None] + return d + +def dict_to_camel_case(my_dict): + + + result = {} + + for attr, value in my_dict.items(): + + if len(attr.split('_')) != 1: + my_key = ''.join(word.title() for word in attr.split('_')) + my_key = ''.join([my_key[0].lower(), my_key[1:]]) + else: + my_key = attr + + if my_key == "serviceApiCategory": + my_key = "serviceAPICategory" + elif my_key == "serviceApiDescriptions": + my_key = "serviceAPIDescriptions" + + if isinstance(value, list): + result[my_key] = list(map( + lambda x: dict_to_camel_case(x) if isinstance(x, dict) else x, value )) + + elif hasattr(value, "to_dict"): + result[my_key] = dict_to_camel_case(value) + + elif isinstance(value, dict): + value = dict_to_camel_case(value) + result[my_key] = value + else: + result[my_key] = value + + return result + +async def send_request(url, data): + async with aiohttp.ClientSession() as session: + timeout = aiohttp.ClientTimeout(total=10) + headers = {'content-type': 'application/json'} + async with session.post(url, json=data, timeout=timeout, headers=headers) as response: + return await response.text() + +async def send(url, data): + try: + response = await send_request(url, data) + print(response) + except asyncio.TimeoutError: + print("Timeout: Request timeout") + except Exception as e: + print("An exception occurred sending notification::" + str(e)) + return False + +@celery.task(name="celery.tasks.check_notifications_collection") +def my_periodic_task(): + while True: + try: + notification_data = notifications_col.find_one_and_delete( + {"next_report_time": {"$lt": datetime.now(timezone.utc)}} + ) + if not notification_data: + break + except pymongo.errors.AutoReconnect: + print("MongoDB connection failed. Retrying...") + continue + + try: + print(f"sending notification to {notification_data['url']}") + asyncio.run(send(notification_data["url"], notification_data["notification"])) + except Exception as e: + print(f"Error sending notification: {e}") + diff --git a/services/docker-compose-capif.yml b/services/docker-compose-capif.yml index 6b5a504c5938bd2c690b5898eb84f0e514c67acb..06247465c40bca268b9b354de4e0b972600bddf0 100644 --- a/services/docker-compose-capif.yml +++ b/services/docker-compose-capif.yml @@ -175,6 +175,27 @@ services: - redis - mongo + celery_worker: + build: + context: ${SERVICES_DIR}/celery + environment: + - CELERY_MODE=worker + - REDIS_HOST=redis + - REDIS_PORT=6379 + depends_on: + - redis + - mongo + + celery_beat: + build: + context: ${SERVICES_DIR}/celery + environment: + - CELERY_MODE=beat + - REDIS_HOST=redis + - REDIS_PORT=6379 + depends_on: + - redis + - mongo api-invocation-logs: build: context: ${SERVICES_DIR}/TS29222_CAPIF_Logging_API_Invocation_API diff --git a/tests/features/Event Filter/event_req.robot b/tests/features/Event Filter/event_req.robot new file mode 100644 index 0000000000000000000000000000000000000000..caddaf9ffb1004442624f913da2e0ab8cd2ee449 --- /dev/null +++ b/tests/features/Event Filter/event_req.robot @@ -0,0 +1,228 @@ +*** Settings *** +Resource /opt/robot-tests/tests/resources/common.resource +Library /opt/robot-tests/tests/libraries/bodyRequests.py +Library XML +Library String +Resource /opt/robot-tests/tests/resources/common/basicRequests.robot +Resource ../../resources/common.resource +Resource ../../resources/common/basicRequests.robot + +Suite Teardown Reset Testing Environment +Test Setup Reset Testing Environment +Test Teardown Reset Testing Environment + + +*** Variables *** +${API_INVOKER_NOT_REGISTERED} not-valid +${SUBSCRIBER_ID_NOT_VALID} not-valid +${SUBSCRIPTION_ID_NOT_VALID} not-valid + + +*** Test Cases *** +Invoker subscribe to Service API Available + [Tags] pelayo-1 mockserver + + # Initialize Mock server + Init Mock Server + + # Default Invoker Registration and Onboarding + ${register_user_info_invoker} ${url} ${request_body}= Invoker Default Onboarding + + # Create Provider1 with 2 AEF roles and publish API + ${register_user_info_provider_1}= Provider Default Registration + ${aef_id_1}= Set Variable ${register_user_info_provider_1['aef_roles']['${AEF_PROVIDER_USERNAME}']['aef_id']} + ${aef_empty_list}= Create List + + # Subscribe to events and setup event filter with api_id + ${events_list}= Create List API_INVOKER_ONBOARDED + ${event_req}= Create Event Req notif_method=PERIODIC max_report_nbr=${2} rep_period=${1} + + ${subscription_ids}= Create List + + FOR ${counter} IN RANGE 1 1 1 + Log ${counter} + ${subscription_id}= + ... Subscribe invoker ${register_user_info_invoker} to events ${events_list} with event req ${event_req} + Append To List ${subscription_ids} ${subscription_id} + END + + Sleep 5s + + ${resp}= Get Mock Server Messages + + # ${notification_events_on_mock_server}= Set Variable ${resp.json()} + +*** Keywords *** +Create Security Context between ${invoker_info} and ${provider_info} + # Discover APIs by invoker + ${discover_response}= Get Request Capif + ... ${DISCOVER_URL}${invoker_info['api_invoker_id']}&aef-id=${provider_info['aef_id']} + ... server=${CAPIF_HTTPS_URL} + ... verify=ca.crt + ... username=${invoker_info['management_cert']} + + Check Response Variable Type And Values ${discover_response} 200 DiscoveredAPIs + + # create Security Context + ${request_service_security_body} ${api_ids}= Create Service Security From Discover Response + ... http://${CAPIF_HOSTNAME}:${CAPIF_HTTP_PORT}/test + ... ${discover_response} + ... legacy=${FALSE} + ${resp}= Put Request Capif + ... /capif-security/v1/trustedInvokers/${invoker_info['api_invoker_id']} + ... json=${request_service_security_body} + ... server=${CAPIF_HTTPS_URL} + ... verify=ca.crt + ... username=${invoker_info['management_cert']} + + Set To Dictionary ${invoker_info} security_body=${request_service_security_body} + # Check Service Security + Check Response Variable Type And Values ${resp} 201 ServiceSecurity + ${resource_url}= Check Location Header ${resp} ${LOCATION_SECURITY_RESOURCE_REGEX} + + ${api_invoker_policies_list}= Create List + + FOR ${api_id} IN @{api_ids} + Log ${api_id} + ${resp}= Get Request Capif + ... /access-control-policy/v1/accessControlPolicyList/${api_id}?aef-id=${provider_info['aef_id']} + ... server=${CAPIF_HTTPS_URL} + ... verify=ca.crt + ... username=${provider_info['aef_username']} + Check Response Variable Type And Values ${resp} 200 AccessControlPolicyList + Should Not Be Empty ${resp.json()['apiInvokerPolicies']} + ${api_invoker_policies}= Set Variable ${resp.json()['apiInvokerPolicies']} + ${api_invoker_policies_list}= Set Variable ${api_invoker_policies} + END + + Log List ${api_invoker_policies_list} + + RETURN ${api_invoker_policies_list} + +Update Security Context between ${invoker_info} and ${provider_info} + # Discover APIs by invoker + ${discover_response}= Get Request Capif + ... ${DISCOVER_URL}${invoker_info['api_invoker_id']}&aef-id=${provider_info['aef_id']} + ... server=${CAPIF_HTTPS_URL} + ... verify=ca.crt + ... username=${invoker_info['management_cert']} + + Check Response Variable Type And Values ${discover_response} 200 DiscoveredAPIs + + # create Security Context + ${request_service_security_body} ${api_ids}= Update Service Security With Discover Response + ... ${invoker_info['security_body']} + ... ${discover_response} + ... legacy=${FALSE} + ${resp}= Post Request Capif + ... /capif-security/v1/trustedInvokers/${invoker_info['api_invoker_id']}/update + ... json=${request_service_security_body} + ... server=${CAPIF_HTTPS_URL} + ... verify=ca.crt + ... username=${invoker_info['management_cert']} + + # Check Service Security + Check Response Variable Type And Values ${resp} 200 ServiceSecurity + ${resource_url}= Check Location Header ${resp} ${LOCATION_SECURITY_RESOURCE_REGEX} + + ${api_invoker_policies_list}= Create List + + ${api_id}= Get From List ${api_ids} -1 + Log ${api_id} + ${resp}= Get Request Capif + ... /access-control-policy/v1/accessControlPolicyList/${api_id}?aef-id=${provider_info['aef_id']} + ... server=${CAPIF_HTTPS_URL} + ... verify=ca.crt + ... username=${provider_info['aef_username']} + + Check Response Variable Type And Values ${resp} 200 AccessControlPolicyList + # Check returned values + Should Not Be Empty ${resp.json()['apiInvokerPolicies']} + + ${api_invoker_policies}= Set Variable ${resp.json()['apiInvokerPolicies']} + # Append To List ${api_invoker_policies_list} ${api_invoker_policies} + ${api_invoker_policies_list}= Set Variable ${api_invoker_policies} + + Log List ${api_invoker_policies_list} + + RETURN ${api_invoker_policies_list} + +Subscribe provider ${provider_info} to events ${events_list} with event filters ${event_filters} + ${resp}= + ... Subscribe ${provider_info['amf_id']} with ${provider_info['amf_username']} to ${events_list} with ${event_filters} + + Check Response Variable Type And Values ${resp} 201 EventSubscription + ${subscriber_id} ${subscription_id}= Check Event Location Header ${resp} + + RETURN ${subscription_id} + +Subscribe invoker ${invoker_info} to events ${events_list} with event filters ${event_filters} + ${resp}= + ... Subscribe ${invoker_info['api_invoker_id']} with ${invoker_info['management_cert']} to ${events_list} with ${event_filters} + + Check Response Variable Type And Values ${resp} 201 EventSubscription + ${subscriber_id} ${subscription_id}= Check Event Location Header ${resp} + + RETURN ${subscription_id} + +Subscribe invoker ${invoker_info} to events ${events_list} with event req ${event_req} + ${resp}= + ... Subscribe ${invoker_info['api_invoker_id']} with ${invoker_info['management_cert']} to ${events_list} with ${event_req} + + Check Response Variable Type And Values ${resp} 201 EventSubscription + ${subscriber_id} ${subscription_id}= Check Event Location Header ${resp} + + RETURN ${subscription_id} + +Subscribe ${subscriber_id} with ${username} to ${events_list} with ${event_req} + ${request_body}= Create Events Subscription + ... events=@{events_list} + ... notification_destination=${NOTIFICATION_DESTINATION_URL}/testing + ... supported_features=C + ... event_req=${event_req} + ${resp}= Post Request Capif + ... /capif-events/v1/${subscriber_id}/subscriptions + ... json=${request_body} + ... server=${CAPIF_HTTPS_URL} + ... verify=ca.crt + ... username=${username} + + RETURN ${resp} + +Send Log Message to CAPIF + [Arguments] ${api_id} ${service_name} ${invoker_info} ${provider_info} @{results} + ${api_ids}= Create List ${api_id} + ${api_names}= Create List ${service_name} + ${request_body}= Create Log Entry + ... ${provider_info['aef_id']} + ... ${invoker_info['api_invoker_id']} + ... ${api_ids} + ... ${api_names} + ... results=@{results} + ${resp}= Post Request Capif + ... /api-invocation-logs/v1/${provider_info['aef_id']}/logs + ... json=${request_body} + ... server=${CAPIF_HTTPS_URL} + ... verify=ca.crt + ... username=${provider_info['amf_username']} + + Check Response Variable Type And Values ${resp} 201 InvocationLog + ${resource_url}= Check Location Header ${resp} ${LOCATION_LOGGING_RESOURCE_REGEX} + + RETURN ${request_body} + +Check not valid ${resp} with event filter ${attribute_snake_case} for event ${event} + # Check Results + ${invalid_param}= Create Dictionary + ... param=eventFilter + ... reason=The eventFilter {'${attribute_snake_case}'} for event ${event} are not applicable. + ${invalid_param_list}= Create List ${invalid_param} + Check Response Variable Type And Values + ... ${resp} + ... 400 + ... ProblemDetails + ... title=Bad Request + ... status=400 + ... detail=Bad Param + ... cause=Invalid eventFilter for event ${event} + ... invalidParams=${invalid_param_list} diff --git a/tests/libraries/api_events/bodyRequests.py b/tests/libraries/api_events/bodyRequests.py index 1bb4604fe2fd518a1d81ad1d10c2637e6d3257c8..4e853a624f0d1b719adb03b88aa16d5617e2d1d9 100644 --- a/tests/libraries/api_events/bodyRequests.py +++ b/tests/libraries/api_events/bodyRequests.py @@ -39,6 +39,21 @@ def create_capif_event_filter(aefIds=None, apiIds=None, apiInvokerIds=None): return capif_event_filter +def create_event_req(imm_rep=None, notif_method=None, max_report_nbr=None, mon_dur=None, rep_period=None): + data = dict() + if imm_rep is not None: + data['immRep'] = imm_rep + if notif_method is not None: + data['notifMethod'] = notif_method + if max_report_nbr is not None: + data['maxReportNbr'] = max_report_nbr + if mon_dur is not None: + data['monDur'] = mon_dur + if rep_period is not None: + data['repPeriod'] = rep_period + return data + + def create_default_event_req(): return { "grpRepTime": 5, @@ -51,6 +66,7 @@ def create_default_event_req(): } + def create_websock_notif_config_default(): return { "requestWebsocketUri": True,