Commit aeba4f8d authored by guillecxb's avatar guillecxb
Browse files

Merge branch 'staging' of ssh://labs.etsi.org:29419/ocf/capif into...

Merge branch 'staging' of ssh://labs.etsi.org:29419/ocf/capif into OCF152-supported-feature-negotiation-for-the-security-service
parents 407c1a05 a1805f51
Loading
Loading
Loading
Loading
Loading
+2 −2
Original line number Diff line number Diff line
@@ -2,10 +2,10 @@

if [ "$CELERY_MODE" = "worker" ]; then
    echo "Starting Celery Worker..."
    celery -A tasks worker --loglevel=info
    celery -A tasks worker
elif [ "$CELERY_MODE" = "beat" ]; then
    echo "Iniciando Celery Beat..."
    celery -A tasks beat --loglevel=info
    celery -A tasks beat
else
    echo "ERROR: The environment variable CELERY_MODE is not set correctly (worker|beat)"
    exit 1
+61 −6
Original line number Diff line number Diff line
@@ -7,7 +7,10 @@ from bson.codec_options import CodecOptions
from config import Config
import aiohttp
import asyncio
import logging
from logging.handlers import RotatingFileHandler

# Initialize Celery
celery = Celery(
    "notifications",
    broker=f"redis://{os.getenv("REDIS_HOST")}:{os.getenv("REDIS_PORT")}/0",
@@ -21,7 +24,54 @@ celery.conf.beat_schedule = {
        "args": (),
    },
}

celery.conf.timezone = "UTC"
celery.conf.update(worker_hijack_root_logger=False)


# Setting log level
# Set the log level based on the environment variable or default to INFO
log_level = os.getenv('LOG_LEVEL', 'INFO').upper()
numeric_level = getattr(logging, log_level, logging.INFO)


def verbose_formatter():
    return logging.Formatter(
        '{"timestamp": "%(asctime)s", "level": "%(levelname)s", "logger": "%(name)s", "function": "%(funcName)s", "line": %(lineno)d, "message": %(message)s}',
        datefmt='%d/%m/%Y %H:%M:%S'
    )


def configure_logging():

    formatter = verbose_formatter()

    console_handler = logging.StreamHandler()
    console_handler.setLevel(numeric_level)
    console_handler.setFormatter(formatter)

    file_handler = RotatingFileHandler(
        filename="celery_logs.log",
        maxBytes=1024 * 1024 * 100,
        backupCount=20
    )
    file_handler.setLevel(numeric_level)
    file_handler.setFormatter(formatter)

    # Root logger configuration
    root_logger = logging.getLogger()
    root_logger.setLevel(numeric_level)
    root_logger.handlers = []
    root_logger.addHandler(console_handler)
    root_logger.addHandler(file_handler)

    # Optional: configure specific logger
    logger = logging.getLogger(__name__)
    logger.setLevel(numeric_level)
    return logger


logger = configure_logging()

# MongoDB Connection
config = Config().get_config()
@@ -38,6 +88,7 @@ def serialize_clean_camel_case(obj):

    return res

# Function to clean empty values from a dictionary
def clean_empty(d):
    if isinstance(d, dict):
        return {
@@ -49,6 +100,7 @@ def clean_empty(d):
        return [v for v in map(clean_empty, d) if v is not None]
    return d

# Function to convert snake_case keys to camelCase
def dict_to_camel_case(my_dict):


@@ -82,6 +134,7 @@ def dict_to_camel_case(my_dict):

        return result

# Functions to send a request
async def send_request(url, data):
    async with aiohttp.ClientSession() as session:
        timeout = aiohttp.ClientTimeout(total=10)
@@ -91,14 +144,16 @@ async def send_request(url, data):

async def send(url, data):
    try:
        logger.info(f"Sending notification to {url} with data: {data}")
        response = await send_request(url, data)
        print(response)
        logger.info(response)
    except asyncio.TimeoutError:
        print("Timeout: Request timeout")
        logger.info("Timeout: Request timeout")
    except Exception as e:
        print("An exception occurred sending notification::" + str(e))
        logger.info("An exception occurred sending notification::" + str(e))
        return False

# Periodic task to check the notifications collection
@celery.task(name="celery.tasks.check_notifications_collection")
def my_periodic_task():
    while True:
@@ -109,12 +164,12 @@ def my_periodic_task():
            if not notification_data:
                break
        except pymongo.errors.AutoReconnect:
            print("MongoDB connection failed. Retrying...")
            logger.info("MongoDB connection failed. Retrying...")
            continue

        try:
            print(f"sending notification to {notification_data['url']}")
            logger.info(f"Notification for suscription {notification_data["subscription_id"]} ready to send")
            asyncio.run(send(notification_data["url"], notification_data["notification"]))
        except Exception as e:
            print(f"Error sending notification: {e}")
            logger.info(f"Error sending notification: {e}")
+2 −0
Original line number Diff line number Diff line
@@ -182,6 +182,7 @@ services:
      - CELERY_MODE=worker
      - REDIS_HOST=redis
      - REDIS_PORT=6379
      - LOG_LEVEL=${LOG_LEVEL}
    depends_on:
      - redis
      - mongo
@@ -193,6 +194,7 @@ services:
      - CELERY_MODE=beat
      - REDIS_HOST=redis
      - REDIS_PORT=6379
      - LOG_LEVEL=${LOG_LEVEL}
    depends_on:
      - redis
      - mongo