# celery/tasks.py
"""Celery worker that delivers due notifications stored in MongoDB.

A beat schedule triggers ``my_periodic_task`` every second. The task pops
every document whose ``next_report_time`` is in the past from the
notifications collection and POSTs its ``notification`` payload to its
``url``.
"""
import asyncio
import os
import time
from datetime import datetime, timezone

import aiohttp
import pymongo
from bson.codec_options import CodecOptions
from celery import Celery

from config import Config

# Celery Configuration
# celery = Celery(
#     "notifications",
#     broker=os.environ.get("CELERY_BROKER_URL", "redis://redis:6379/0"),
#     backend=os.environ.get("CELERY_RESULT_BACKEND", "redis://redis:6379/0")
# )

# Built once and shared by broker and backend. The inner literals use single
# quotes because reusing the enclosing quote character inside an f-string is
# a SyntaxError on interpreters older than Python 3.12.
REDIS_URL = f"redis://{os.getenv('REDIS_HOST')}:{os.getenv('REDIS_PORT')}/0"

celery = Celery(
    "notifications",
    broker=REDIS_URL,
    backend=REDIS_URL,
)

celery.conf.beat_schedule = {
    "check_notifications_collection": {
        "task": "celery.tasks.check_notifications_collection",
        "schedule": 1.0,
        "args": (),
    },
}

celery.conf.timezone = "UTC"

# MongoDB Connection
# NOTE(review): user and password are interpolated into the URI verbatim;
# confirm the configured values are already percent-escaped if they can
# contain reserved characters such as '@' or ':'.
config = Config().get_config()
mongo_uri = (
    f"mongodb://{config['mongo']['user']}:{config['mongo']['password']}@"
    f"{config['mongo']['host']}:{config['mongo']['port']}"
)
client = pymongo.MongoClient(mongo_uri)
# tz_aware=True makes stored datetimes come back timezone-aware.
notifications_col = client[config['mongo']['db']][
    config['mongo']['notifications_col']
].with_options(codec_options=CodecOptions(tz_aware=True))

# Consecutive AutoReconnect failures tolerated within one task run, and the
# pause between attempts. Once exhausted the run ends and the next beat tick
# starts a fresh attempt, instead of spinning on an unreachable database.
MAX_RECONNECT_ATTEMPTS = 5
RECONNECT_DELAY_SECONDS = 0.5


def serialize_clean_camel_case(obj):
    """Serialize *obj* to a camelCase dict without ``None`` values.

    *obj* must provide a ``to_dict()`` method. Its result is passed through
    ``clean_empty`` and then ``dict_to_camel_case``.
    """
    res = obj.to_dict()
    res = clean_empty(res)
    res = dict_to_camel_case(res)
    return res


def clean_empty(d):
    """Recursively drop ``None`` values from dicts and ``None`` items from lists.

    Empty lists and empty dicts are kept; only ``None`` is removed. Any value
    that is neither a dict nor a list is returned unchanged.
    """
    if isinstance(d, dict):
        cleaned = ((k, clean_empty(v)) for k, v in d.items())
        return {k: v for k, v in cleaned if v is not None}
    if isinstance(d, list):
        return [v for v in map(clean_empty, d) if v is not None]
    return d


def _camel_key(attr):
    """Return the camelCase form of the snake_case key *attr*."""
    words = attr.split('_')
    if len(words) == 1:
        return attr
    joined = ''.join(word.title() for word in words)
    if not joined:
        # Key consists only of underscores: nothing to convert, keep it.
        return attr
    key = joined[0].lower() + joined[1:]
    # These two API names keep "API" fully upper-cased.
    if key == "serviceApiCategory":
        return "serviceAPICategory"
    if key == "serviceApiDescriptions":
        return "serviceAPIDescriptions"
    return key


def dict_to_camel_case(my_dict):
    """Return a copy of *my_dict* with snake_case keys converted to camelCase.

    Nested dicts, dicts inside lists, and values exposing ``to_dict()`` are
    converted recursively. Other values are copied as they are.
    """
    result = {}
    for attr, value in my_dict.items():
        my_key = _camel_key(attr)
        if isinstance(value, list):
            result[my_key] = [
                dict_to_camel_case(item) if isinstance(item, dict) else item
                for item in value
            ]
        elif hasattr(value, "to_dict"):
            # Convert the object to a dict first; it has no .items() itself.
            result[my_key] = dict_to_camel_case(value.to_dict())
        elif isinstance(value, dict):
            result[my_key] = dict_to_camel_case(value)
        else:
            result[my_key] = value
    return result


async def send_request(url, data):
    """POST *data* as JSON to *url* and return the response body as text.

    The request uses a total timeout of 10 seconds.
    """
    async with aiohttp.ClientSession() as session:
        timeout = aiohttp.ClientTimeout(total=10)
        headers = {'content-type': 'application/json'}
        async with session.post(
            url, json=data, timeout=timeout, headers=headers
        ) as response:
            return await response.text()


async def send(url, data):
    """Send one notification, printing the outcome instead of raising.

    Returns:
        ``True`` when the request completed, ``False`` on a timeout or any
        other error.
    """
    try:
        response = await send_request(url, data)
    except asyncio.TimeoutError:
        print("Timeout: Request timeout")
        return False
    except Exception as e:
        # Best-effort delivery: report the failure and carry on.
        print("An exception occurred sending notification::" + str(e))
        return False
    print(response)
    return True


@celery.task(name="celery.tasks.check_notifications_collection")
def my_periodic_task():
    """Deliver every notification whose ``next_report_time`` has passed.

    Each due document is removed from the collection before it is sent, so a
    notification is attempted once. The run ends when no due document is
    left, or after ``MAX_RECONNECT_ATTEMPTS`` consecutive connection
    failures.
    """
    # print("Checking notifications collection...")
    failed_attempts = 0
    while True:
        try:
            notification_data = notifications_col.find_one_and_delete(
                {"next_report_time": {"$lt": datetime.now(timezone.utc)}}
            )
        except pymongo.errors.AutoReconnect:
            failed_attempts += 1
            if failed_attempts >= MAX_RECONNECT_ATTEMPTS:
                print("MongoDB connection failed. Giving up until the next run.")
                return
            print("MongoDB connection failed. Retrying...")
            time.sleep(RECONNECT_DELAY_SECONDS)
            continue

        failed_attempts = 0
        if not notification_data:
            break

        try:
            print(f"sending notification to {notification_data['url']}")
            asyncio.run(
                send(notification_data["url"], notification_data["notification"])
            )
        except Exception as e:
            # A malformed document must not stop the remaining deliveries.
            print(f"Error sending notification: {e}")
    # print("Finished processing notifications.")