Loading services/celery/start_celery.sh +2 −2 Original line number Original line Diff line number Diff line Loading @@ -2,10 +2,10 @@ if [ "$CELERY_MODE" = "worker" ]; then if [ "$CELERY_MODE" = "worker" ]; then echo "Starting Celery Worker..." echo "Starting Celery Worker..." celery -A tasks worker --loglevel=info celery -A tasks worker elif [ "$CELERY_MODE" = "beat" ]; then elif [ "$CELERY_MODE" = "beat" ]; then echo "Iniciando Celery Beat..." echo "Iniciando Celery Beat..." celery -A tasks beat --loglevel=info celery -A tasks beat else else echo "ERROR: The environment variable CELERY_MODE is not set correctly (worker|beat)" echo "ERROR: The environment variable CELERY_MODE is not set correctly (worker|beat)" exit 1 exit 1 Loading services/celery/tasks.py +61 −6 Original line number Original line Diff line number Diff line Loading @@ -7,7 +7,10 @@ from bson.codec_options import CodecOptions from config import Config from config import Config import aiohttp import aiohttp import asyncio import asyncio import logging from logging.handlers import RotatingFileHandler # Initialize Celery celery = Celery( celery = Celery( "notifications", "notifications", broker=f"redis://{os.getenv("REDIS_HOST")}:{os.getenv("REDIS_PORT")}/0", broker=f"redis://{os.getenv("REDIS_HOST")}:{os.getenv("REDIS_PORT")}/0", Loading @@ -21,7 +24,54 @@ celery.conf.beat_schedule = { "args": (), "args": (), }, }, } } celery.conf.timezone = "UTC" celery.conf.timezone = "UTC" celery.conf.update(worker_hijack_root_logger=False) # Setting log level # Set the log level based on the environment variable or default to INFO log_level = os.getenv('LOG_LEVEL', 'INFO').upper() numeric_level = getattr(logging, log_level, logging.INFO) def verbose_formatter(): return logging.Formatter( '{"timestamp": "%(asctime)s", "level": "%(levelname)s", "logger": "%(name)s", "function": "%(funcName)s", "line": %(lineno)d, "message": %(message)s}', datefmt='%d/%m/%Y %H:%M:%S' ) def configure_logging(): formatter = verbose_formatter() console_handler = logging.StreamHandler() console_handler.setLevel(numeric_level) console_handler.setFormatter(formatter) file_handler = RotatingFileHandler( filename="celery_logs.log", maxBytes=1024 * 1024 * 100, backupCount=20 ) file_handler.setLevel(numeric_level) file_handler.setFormatter(formatter) # Root logger configuration root_logger = logging.getLogger() root_logger.setLevel(numeric_level) root_logger.handlers = [] root_logger.addHandler(console_handler) root_logger.addHandler(file_handler) # Optional: configure specific logger logger = logging.getLogger(__name__) logger.setLevel(numeric_level) return logger logger = configure_logging() # MongoDB Connection # MongoDB Connection config = Config().get_config() config = Config().get_config() Loading @@ -38,6 +88,7 @@ def serialize_clean_camel_case(obj): return res return res # Function to clean empty values from a dictionary def clean_empty(d): def clean_empty(d): if isinstance(d, dict): if isinstance(d, dict): return { return { Loading @@ -49,6 +100,7 @@ def clean_empty(d): return [v for v in map(clean_empty, d) if v is not None] return [v for v in map(clean_empty, d) if v is not None] return d return d # Function to convert snake_case keys to camelCase def dict_to_camel_case(my_dict): def dict_to_camel_case(my_dict): Loading Loading @@ -82,6 +134,7 @@ def dict_to_camel_case(my_dict): return result return result # Functions to send a request async def send_request(url, data): async def send_request(url, data): async with aiohttp.ClientSession() as session: async with aiohttp.ClientSession() as session: timeout = aiohttp.ClientTimeout(total=10) timeout = aiohttp.ClientTimeout(total=10) Loading @@ -91,14 +144,16 @@ async def send_request(url, data): async def send(url, data): async def send(url, data): try: try: logger.info(f"Sending notification to {url} with data: {data}") response = await send_request(url, data) response = await send_request(url, data) print(response) logger.info(response) except asyncio.TimeoutError: except asyncio.TimeoutError: print("Timeout: Request timeout") logger.info("Timeout: Request timeout") except Exception as e: except Exception as e: print("An exception occurred sending notification::" + str(e)) logger.info("An exception occurred sending notification::" + str(e)) return False return False # Periodic task to check the notifications collection @celery.task(name="celery.tasks.check_notifications_collection") @celery.task(name="celery.tasks.check_notifications_collection") def my_periodic_task(): def my_periodic_task(): while True: while True: Loading @@ -109,12 +164,12 @@ def my_periodic_task(): if not notification_data: if not notification_data: break break except pymongo.errors.AutoReconnect: except pymongo.errors.AutoReconnect: print("MongoDB connection failed. Retrying...") logger.info("MongoDB connection failed. Retrying...") continue continue try: try: print(f"sending notification to {notification_data['url']}") logger.info(f"Notification for suscription {notification_data["subscription_id"]} ready to send") asyncio.run(send(notification_data["url"], notification_data["notification"])) asyncio.run(send(notification_data["url"], notification_data["notification"])) except Exception as e: except Exception as e: print(f"Error sending notification: {e}") logger.info(f"Error sending notification: {e}") services/docker-compose-capif.yml +2 −0 Original line number Original line Diff line number Diff line Loading @@ -182,6 +182,7 @@ services: - CELERY_MODE=worker - CELERY_MODE=worker - REDIS_HOST=redis - REDIS_HOST=redis - REDIS_PORT=6379 - REDIS_PORT=6379 - LOG_LEVEL=${LOG_LEVEL} depends_on: depends_on: - redis - redis - mongo - mongo Loading @@ -193,6 +194,7 @@ services: - CELERY_MODE=beat - CELERY_MODE=beat - REDIS_HOST=redis - REDIS_HOST=redis - REDIS_PORT=6379 - REDIS_PORT=6379 - LOG_LEVEL=${LOG_LEVEL} depends_on: depends_on: - redis - redis - mongo - mongo Loading Loading
services/celery/start_celery.sh +2 −2 Original line number Original line Diff line number Diff line Loading @@ -2,10 +2,10 @@ if [ "$CELERY_MODE" = "worker" ]; then if [ "$CELERY_MODE" = "worker" ]; then echo "Starting Celery Worker..." echo "Starting Celery Worker..." celery -A tasks worker --loglevel=info celery -A tasks worker elif [ "$CELERY_MODE" = "beat" ]; then elif [ "$CELERY_MODE" = "beat" ]; then echo "Iniciando Celery Beat..." echo "Iniciando Celery Beat..." celery -A tasks beat --loglevel=info celery -A tasks beat else else echo "ERROR: The environment variable CELERY_MODE is not set correctly (worker|beat)" echo "ERROR: The environment variable CELERY_MODE is not set correctly (worker|beat)" exit 1 exit 1 Loading
services/celery/tasks.py +61 −6 Original line number Original line Diff line number Diff line Loading @@ -7,7 +7,10 @@ from bson.codec_options import CodecOptions from config import Config from config import Config import aiohttp import aiohttp import asyncio import asyncio import logging from logging.handlers import RotatingFileHandler # Initialize Celery celery = Celery( celery = Celery( "notifications", "notifications", broker=f"redis://{os.getenv("REDIS_HOST")}:{os.getenv("REDIS_PORT")}/0", broker=f"redis://{os.getenv("REDIS_HOST")}:{os.getenv("REDIS_PORT")}/0", Loading @@ -21,7 +24,54 @@ celery.conf.beat_schedule = { "args": (), "args": (), }, }, } } celery.conf.timezone = "UTC" celery.conf.timezone = "UTC" celery.conf.update(worker_hijack_root_logger=False) # Setting log level # Set the log level based on the environment variable or default to INFO log_level = os.getenv('LOG_LEVEL', 'INFO').upper() numeric_level = getattr(logging, log_level, logging.INFO) def verbose_formatter(): return logging.Formatter( '{"timestamp": "%(asctime)s", "level": "%(levelname)s", "logger": "%(name)s", "function": "%(funcName)s", "line": %(lineno)d, "message": %(message)s}', datefmt='%d/%m/%Y %H:%M:%S' ) def configure_logging(): formatter = verbose_formatter() console_handler = logging.StreamHandler() console_handler.setLevel(numeric_level) console_handler.setFormatter(formatter) file_handler = RotatingFileHandler( filename="celery_logs.log", maxBytes=1024 * 1024 * 100, backupCount=20 ) file_handler.setLevel(numeric_level) file_handler.setFormatter(formatter) # Root logger configuration root_logger = logging.getLogger() root_logger.setLevel(numeric_level) root_logger.handlers = [] root_logger.addHandler(console_handler) root_logger.addHandler(file_handler) # Optional: configure specific logger logger = logging.getLogger(__name__) logger.setLevel(numeric_level) return logger logger = configure_logging() # MongoDB Connection # MongoDB Connection config = Config().get_config() config = Config().get_config() Loading @@ -38,6 +88,7 @@ def serialize_clean_camel_case(obj): return res return res # Function to clean empty values from a dictionary def clean_empty(d): def clean_empty(d): if isinstance(d, dict): if isinstance(d, dict): return { return { Loading @@ -49,6 +100,7 @@ def clean_empty(d): return [v for v in map(clean_empty, d) if v is not None] return [v for v in map(clean_empty, d) if v is not None] return d return d # Function to convert snake_case keys to camelCase def dict_to_camel_case(my_dict): def dict_to_camel_case(my_dict): Loading Loading @@ -82,6 +134,7 @@ def dict_to_camel_case(my_dict): return result return result # Functions to send a request async def send_request(url, data): async def send_request(url, data): async with aiohttp.ClientSession() as session: async with aiohttp.ClientSession() as session: timeout = aiohttp.ClientTimeout(total=10) timeout = aiohttp.ClientTimeout(total=10) Loading @@ -91,14 +144,16 @@ async def send_request(url, data): async def send(url, data): async def send(url, data): try: try: logger.info(f"Sending notification to {url} with data: {data}") response = await send_request(url, data) response = await send_request(url, data) print(response) logger.info(response) except asyncio.TimeoutError: except asyncio.TimeoutError: print("Timeout: Request timeout") logger.info("Timeout: Request timeout") except Exception as e: except Exception as e: print("An exception occurred sending notification::" + str(e)) logger.info("An exception occurred sending notification::" + str(e)) return False return False # Periodic task to check the notifications collection @celery.task(name="celery.tasks.check_notifications_collection") @celery.task(name="celery.tasks.check_notifications_collection") def my_periodic_task(): def my_periodic_task(): while True: while True: Loading @@ -109,12 +164,12 @@ def my_periodic_task(): if not notification_data: if not notification_data: break break except pymongo.errors.AutoReconnect: except pymongo.errors.AutoReconnect: print("MongoDB connection failed. Retrying...") logger.info("MongoDB connection failed. Retrying...") continue continue try: try: print(f"sending notification to {notification_data['url']}") logger.info(f"Notification for suscription {notification_data["subscription_id"]} ready to send") asyncio.run(send(notification_data["url"], notification_data["notification"])) asyncio.run(send(notification_data["url"], notification_data["notification"])) except Exception as e: except Exception as e: print(f"Error sending notification: {e}") logger.info(f"Error sending notification: {e}")
services/docker-compose-capif.yml +2 −0 Original line number Original line Diff line number Diff line Loading @@ -182,6 +182,7 @@ services: - CELERY_MODE=worker - CELERY_MODE=worker - REDIS_HOST=redis - REDIS_HOST=redis - REDIS_PORT=6379 - REDIS_PORT=6379 - LOG_LEVEL=${LOG_LEVEL} depends_on: depends_on: - redis - redis - mongo - mongo Loading @@ -193,6 +194,7 @@ services: - CELERY_MODE=beat - CELERY_MODE=beat - REDIS_HOST=redis - REDIS_HOST=redis - REDIS_PORT=6379 - REDIS_PORT=6379 - LOG_LEVEL=${LOG_LEVEL} depends_on: depends_on: - redis - redis - mongo - mongo Loading