Skip to content
tasks.py 4.1 KiB
Newer Older
# celery/tasks.py
from celery import Celery
Pelayo Torres's avatar
Pelayo Torres committed
from datetime import datetime, timezone
import pymongo
import os
from bson.codec_options import CodecOptions
from config import Config
import aiohttp
import asyncio

# Celery Configuration
Pelayo Torres's avatar
Pelayo Torres committed
# celery = Celery(
#     "notifications",
#     broker=os.environ.get("CELERY_BROKER_URL", "redis://redis:6379/0"),
#     backend=os.environ.get("CELERY_RESULT_BACKEND", "redis://redis:6379/0")
# )

celery = Celery(
    "notifications",
Pelayo Torres's avatar
Pelayo Torres committed
    broker=f"redis://{os.getenv("REDIS_HOST")}:{os.getenv("REDIS_PORT")}/0",
    backend=f"redis://{os.getenv("REDIS_HOST")}:{os.getenv("REDIS_PORT")}/0"
)

celery.conf.beat_schedule = {
    "check_notifications_collection": {
        "task": "celery.tasks.check_notifications_collection",
        "schedule": 1.0,
        "args": (),
    },
}
celery.conf.timezone = "UTC"

# MongoDB Connection
config = Config().get_config()

mongo_uri = f"mongodb://{config['mongo']['user']}:{config['mongo']['password']}@" \
                      f"{config['mongo']['host']}:{config['mongo']['port']}"
client = pymongo.MongoClient(mongo_uri)
notifications_col = client[config['mongo']['db']][config['mongo']['notifications_col']].with_options(codec_options=CodecOptions(tz_aware=True))

def serialize_clean_camel_case(obj):
    res = obj.to_dict()
    res = clean_empty(res)
    res = dict_to_camel_case(res)

    return res

def clean_empty(d):
    if isinstance(d, dict):
        return {
            k: v
            for k, v in ((k, clean_empty(v)) for k, v in d.items())
            if v is not None or (isinstance(v, list) and len(v) == 0)
        }
    if isinstance(d, list):
        return [v for v in map(clean_empty, d) if v is not None]
    return d

def dict_to_camel_case(my_dict):


        result = {}

        for attr, value in my_dict.items():

            if len(attr.split('_')) != 1:
                my_key = ''.join(word.title() for word in attr.split('_'))
                my_key = ''.join([my_key[0].lower(), my_key[1:]])
            else:
                my_key = attr

            if my_key == "serviceApiCategory":
                my_key = "serviceAPICategory"
            elif my_key == "serviceApiDescriptions":
                my_key = "serviceAPIDescriptions"

            if isinstance(value, list):
                result[my_key] = list(map(
                    lambda x: dict_to_camel_case(x) if isinstance(x, dict) else x, value ))

            elif hasattr(value, "to_dict"):
                result[my_key] = dict_to_camel_case(value)

            elif isinstance(value, dict):
                value = dict_to_camel_case(value)
                result[my_key] = value
            else:
                result[my_key] = value

        return result

async def send_request(url, data):
    async with aiohttp.ClientSession() as session:
        timeout = aiohttp.ClientTimeout(total=10)
        headers = {'content-type': 'application/json'}
        async with session.post(url, json=data, timeout=timeout, headers=headers) as response:
            return await response.text()

async def send(url, data):
    try:
        response = await send_request(url, data)
        print(response)
    except asyncio.TimeoutError:
        print("Timeout: Request timeout")
    except Exception as e:
        print("An exception occurred sending notification::" + str(e))
        return False

@celery.task(name="celery.tasks.check_notifications_collection")
def my_periodic_task():
Pelayo Torres's avatar
Pelayo Torres committed
    # print("Checking notifications collection...")
    while True:
        try:
            notification_data = notifications_col.find_one_and_delete(
            {"next_report_time": {"$lt": datetime.now(timezone.utc)}}
        )
            if not notification_data:
                break
        except pymongo.errors.AutoReconnect:
            print("MongoDB connection failed. Retrying...")
            continue

        try:
            print(f"sending notification to {notification_data['url']}")
            asyncio.run(send(notification_data["url"], notification_data["notification"]))
        except Exception as e:
            print(f"Error sending notification: {e}")

Pelayo Torres's avatar
Pelayo Torres committed
    # print("Finished processing notifications.")