# celery/tasks.py
import asyncio
import os
from datetime import datetime, timezone

import aiohttp
import pymongo
from bson.codec_options import CodecOptions
from celery import Celery

from config import Config
# Celery Configuration
# celery = Celery(
# "notifications",
# broker=os.environ.get("CELERY_BROKER_URL", "redis://redis:6379/0"),
# backend=os.environ.get("CELERY_RESULT_BACKEND", "redis://redis:6379/0")
# )
# The broker and the result backend share one Redis instance (database 0).
# Host and port are read from the REDIS_HOST / REDIS_PORT environment
# variables. Single quotes are used inside the f-string so the module also
# parses on Python versions older than 3.12, where reusing the enclosing
# quote character inside a replacement field is a syntax error.
_redis_url = f"redis://{os.getenv('REDIS_HOST')}:{os.getenv('REDIS_PORT')}/0"

celery = Celery(
    "notifications",
    broker=_redis_url,
    backend=_redis_url,
)
# Celery beat configuration; schedule times are interpreted in UTC.
celery.conf.timezone = "UTC"

# Run the notifications poller on a 1.0 second interval. The "task" value
# must match the name passed to @celery.task for the poller in this module.
celery.conf.beat_schedule = {
    "check_notifications_collection": dict(
        task="celery.tasks.check_notifications_collection",
        schedule=1.0,
        args=(),
    ),
}
# MongoDB connection.
# The URI is assembled from the "mongo" section of the project configuration.
# NOTE(review): user and password are interpolated verbatim - presumably they
# contain no characters that need percent-escaping; confirm against the config.
config = Config().get_config()
_mongo_cfg = config["mongo"]
mongo_uri = "mongodb://{}:{}@{}:{}".format(
    _mongo_cfg["user"],
    _mongo_cfg["password"],
    _mongo_cfg["host"],
    _mongo_cfg["port"],
)
client = pymongo.MongoClient(mongo_uri)

# tz_aware=True makes datetimes decoded from this collection timezone-aware.
_tz_aware_codec = CodecOptions(tz_aware=True)
_database = client[_mongo_cfg["db"]]
notifications_col = _database[_mongo_cfg["notifications_col"]].with_options(
    codec_options=_tz_aware_codec
)
def serialize_clean_camel_case(obj):
    """Serialize ``obj`` to a cleaned dict with camelCase keys.

    ``obj`` must expose a ``to_dict()`` method. Its result is passed through
    ``clean_empty`` and then ``dict_to_camel_case``.
    """
    as_dict = obj.to_dict()
    without_nones = clean_empty(as_dict)
    return dict_to_camel_case(without_nones)
def clean_empty(d):
    """Recursively drop ``None`` values from nested dicts and lists.

    Args:
        d: Any value. Dicts and lists are processed recursively; every other
            value is returned unchanged.

    Returns:
        A new dict without keys whose cleaned value is ``None``, a new list
        without ``None`` items, or ``d`` itself when it is neither a dict nor
        a list. Empty containers (``[]``, ``{}``) and other falsy values such
        as ``0`` or ``""`` are kept.
    """
    if isinstance(d, dict):
        cleaned = {key: clean_empty(value) for key, value in d.items()}
        # Only None is filtered out: an empty list is "not None" already, so
        # no separate check for empty lists is needed to keep them.
        return {key: value for key, value in cleaned.items() if value is not None}
    if isinstance(d, list):
        return [item for item in map(clean_empty, d) if item is not None]
    return d
# camelCase keys whose "Api" segment must be written as the acronym "API".
_ACRONYM_KEYS = {
    "serviceApiCategory": "serviceAPICategory",
    "serviceApiDescriptions": "serviceAPIDescriptions",
}


def dict_to_camel_case(my_dict):
    """Return a copy of ``my_dict`` with snake_case keys turned into camelCase.

    Nested dicts, dicts inside lists, and objects exposing ``to_dict()`` are
    converted recursively. Non-dict list items and all other values are
    copied through unchanged.

    Args:
        my_dict: Mapping with string keys.

    Returns:
        A new dict with converted keys. Keys without an underscore are kept
        as they are, except for the acronym fix-ups in ``_ACRONYM_KEYS``.
    """
    result = {}
    for attr, value in my_dict.items():
        words = attr.split('_')
        if len(words) != 1:
            joined = ''.join(word.title() for word in words)
            # Slicing instead of indexing: a key made only of underscores
            # joins to '' and would raise IndexError on joined[0]. Such a key
            # is kept unchanged.
            my_key = (joined[:1].lower() + joined[1:]) or attr
        else:
            my_key = attr

        my_key = _ACRONYM_KEYS.get(my_key, my_key)

        if isinstance(value, list):
            result[my_key] = [
                dict_to_camel_case(item) if isinstance(item, dict) else item
                for item in value
            ]
        elif isinstance(value, dict):
            result[my_key] = dict_to_camel_case(value)
        elif hasattr(value, "to_dict"):
            # Convert the object to a dict first; recursing on the object
            # itself fails because it has no .items().
            result[my_key] = dict_to_camel_case(value.to_dict())
        else:
            result[my_key] = value
    return result
async def send_request(url, data):
    """POST ``data`` as JSON to ``url`` and return the response body as text.

    A new client session is opened for each call and the request is bounded
    by a total timeout of 10 seconds. Errors are not handled here; they
    propagate to the caller.
    """
    request_timeout = aiohttp.ClientTimeout(total=10)
    json_headers = {'content-type': 'application/json'}
    async with aiohttp.ClientSession() as http:
        async with http.post(
            url, json=data, timeout=request_timeout, headers=json_headers
        ) as reply:
            body = await reply.text()
            return body
async def send(url, data):
    """Send one notification and print the outcome instead of raising.

    The response body is printed on success. A timeout and any other
    exception are reported on stdout and swallowed.

    Returns:
        ``False`` when an unexpected (non-timeout) exception occurred;
        otherwise ``None``, both on success and on timeout.
    """
    try:
        body = await send_request(url, data)
        print(body)
    except asyncio.TimeoutError:
        print("Timeout: Request timeout")
    except Exception as exc:
        print(f"An exception occurred sending notification::{exc}")
        # NOTE(review): assumed to belong to this handler only - confirm that
        # success and timeout are really meant to return None.
        return False
# Consecutive MongoDB reconnect failures tolerated within a single task run.
_MAX_RECONNECT_ATTEMPTS = 3


@celery.task(name="celery.tasks.check_notifications_collection")
def my_periodic_task():
    """Deliver every notification whose ``next_report_time`` has passed.

    Due documents are removed from the notifications collection one at a time
    with ``find_one_and_delete`` and POSTed to their ``url``. The loop ends
    when no due document is left.

    A document is deleted before it is sent, and ``send`` swallows its own
    errors, so a failed delivery is not retried.

    On ``pymongo.errors.AutoReconnect`` the query is retried, but only up to
    ``_MAX_RECONNECT_ATTEMPTS`` consecutive times. After that the run ends
    instead of looping forever while MongoDB is unreachable.
    """
    failed_attempts = 0
    while True:
        try:
            notification_data = notifications_col.find_one_and_delete(
                {"next_report_time": {"$lt": datetime.now(timezone.utc)}}
            )
        except pymongo.errors.AutoReconnect:
            failed_attempts += 1
            if failed_attempts >= _MAX_RECONNECT_ATTEMPTS:
                print("MongoDB connection failed. Giving up until the next scheduled run.")
                break
            print("MongoDB connection failed. Retrying...")
            continue

        # A successful query resets the consecutive-failure counter.
        failed_attempts = 0
        if not notification_data:
            break

        try:
            print(f"sending notification to {notification_data['url']}")
            asyncio.run(send(notification_data["url"], notification_data["notification"]))
        except Exception as e:
            # Best effort: one bad document must not stop the remaining ones.
            print(f"Error sending notification: {e}")