diff --git a/services/celery/start_celery.sh b/services/celery/start_celery.sh index 979e325b07c8b4abbb96baed626586c8d6082233..778ae7b3e105a04e4860884ac73d5d692f34092b 100644 --- a/services/celery/start_celery.sh +++ b/services/celery/start_celery.sh @@ -2,10 +2,10 @@ if [ "$CELERY_MODE" = "worker" ]; then echo "Starting Celery Worker..." - celery -A tasks worker --loglevel=info + celery -A tasks worker elif [ "$CELERY_MODE" = "beat" ]; then echo "Iniciando Celery Beat..." - celery -A tasks beat --loglevel=info + celery -A tasks beat else echo "ERROR: The environment variable CELERY_MODE is not set correctly (worker|beat)" exit 1 diff --git a/services/celery/tasks.py b/services/celery/tasks.py index 014769203779c061343ec57ca7b24f9fd8385456..212902671d31f430e47f4a0c6bd1536a044f5a58 100644 --- a/services/celery/tasks.py +++ b/services/celery/tasks.py @@ -7,7 +7,10 @@ from bson.codec_options import CodecOptions from config import Config import aiohttp import asyncio +import logging +from logging.handlers import RotatingFileHandler +# Initialize Celery celery = Celery( "notifications", broker=f"redis://{os.getenv("REDIS_HOST")}:{os.getenv("REDIS_PORT")}/0", @@ -21,7 +24,54 @@ celery.conf.beat_schedule = { "args": (), }, } + celery.conf.timezone = "UTC" +celery.conf.update(worker_hijack_root_logger=False) + + +# Setting log level +# Set the log level based on the environment variable or default to INFO +log_level = os.getenv('LOG_LEVEL', 'INFO').upper() +numeric_level = getattr(logging, log_level, logging.INFO) + + +def verbose_formatter(): + return logging.Formatter( + '{"timestamp": "%(asctime)s", "level": "%(levelname)s", "logger": "%(name)s", "function": "%(funcName)s", "line": %(lineno)d, "message": %(message)s}', + datefmt='%d/%m/%Y %H:%M:%S' + ) + + +def configure_logging(): + + formatter = verbose_formatter() + + console_handler = logging.StreamHandler() + console_handler.setLevel(numeric_level) + console_handler.setFormatter(formatter) + + file_handler = RotatingFileHandler( + filename="celery_logs.log", + maxBytes=1024 * 1024 * 100, + backupCount=20 + ) + file_handler.setLevel(numeric_level) + file_handler.setFormatter(formatter) + + # Root logger configuration + root_logger = logging.getLogger() + root_logger.setLevel(numeric_level) + root_logger.handlers = [] + root_logger.addHandler(console_handler) + root_logger.addHandler(file_handler) + + # Optional: configure specific logger + logger = logging.getLogger(__name__) + logger.setLevel(numeric_level) + return logger + + +logger = configure_logging() # MongoDB Connection config = Config().get_config() @@ -38,6 +88,7 @@ def serialize_clean_camel_case(obj): return res +# Function to clean empty values from a dictionary def clean_empty(d): if isinstance(d, dict): return { @@ -49,6 +100,7 @@ def clean_empty(d): return [v for v in map(clean_empty, d) if v is not None] return d +# Function to convert snake_case keys to camelCase def dict_to_camel_case(my_dict): @@ -82,6 +134,7 @@ def dict_to_camel_case(my_dict): return result +# Functions to send a request async def send_request(url, data): async with aiohttp.ClientSession() as session: timeout = aiohttp.ClientTimeout(total=10) @@ -91,14 +144,16 @@ async def send_request(url, data): async def send(url, data): try: + logger.info(f"Sending notification to {url} with data: {data}") response = await send_request(url, data) - print(response) + logger.info(response) except asyncio.TimeoutError: - print("Timeout: Request timeout") + logger.info("Timeout: Request timeout") except Exception as e: - print("An exception occurred sending notification::" + str(e)) + logger.info("An exception occurred sending notification::" + str(e)) return False +# Periodic task to check the notifications collection @celery.task(name="celery.tasks.check_notifications_collection") def my_periodic_task(): while True: @@ -109,12 +164,12 @@ def my_periodic_task(): if not notification_data: break except pymongo.errors.AutoReconnect: - print("MongoDB connection failed. Retrying...") + logger.info("MongoDB connection failed. Retrying...") continue try: - print(f"sending notification to {notification_data['url']}") + logger.info(f"Notification for suscription {notification_data["subscription_id"]} ready to send") asyncio.run(send(notification_data["url"], notification_data["notification"])) except Exception as e: - print(f"Error sending notification: {e}") + logger.info(f"Error sending notification: {e}") diff --git a/services/docker-compose-capif.yml b/services/docker-compose-capif.yml index 06247465c40bca268b9b354de4e0b972600bddf0..b6e2bfecd91d06df500a77384cb193a9d2cd85cf 100644 --- a/services/docker-compose-capif.yml +++ b/services/docker-compose-capif.yml @@ -182,6 +182,7 @@ services: - CELERY_MODE=worker - REDIS_HOST=redis - REDIS_PORT=6379 + - LOG_LEVEL=${LOG_LEVEL} depends_on: - redis - mongo @@ -193,6 +194,7 @@ services: - CELERY_MODE=beat - REDIS_HOST=redis - REDIS_PORT=6379 + - LOG_LEVEL=${LOG_LEVEL} depends_on: - redis - mongo