Celery logging to different files for different tasks

The module hierarchy of my Celery service is:

my_celery_package
|-> my_celery_pkg_base
||---> celery.py
||---> settings.py
||---> ...
|-> my_celery_tasks
||---> auxiliary_tasks.py
||---> parser_tasks.py
||---> ...
|-> lib
||---> logger.py
||---> ...

I want to set up the loggers after Celery is set up, so that each time one of the periodic auxiliary tasks is called, it uses the timed rotating logger that writes to auxiliary_tasks.log, and similarly for the parser tasks.

Right now, however, if I call any of these periodic tasks from the Celery worker's shell, the log file gets overwritten.

my_celery_package/lib/logger.py:

import logging
import os
from logging.handlers import TimedRotatingFileHandler
from celery.utils.log import get_task_logger
from celery.app.log import TaskFormatter

from my_celery_package.my_celery_pkg_base import settings

_log_format = TaskFormatter(
        '%(task_id)s - %(asctime)s - %(task_name)s - %(name)s - [%(levelname)s] - (%(filename)s).%(funcName)s(%(lineno)d) - %(message)s')
_log_base_path = settings.LOG_BASE_PATH


def get_file_handler(name):
    file_handler = TimedRotatingFileHandler(os.path.join(_log_base_path, name + '.log'),
                                            when='midnight', interval=1, backupCount=30)
    file_handler.setLevel(logging.DEBUG)
    file_handler.setFormatter(_log_format)
    return file_handler


def get_stream_handler():
    stream_handler = logging.StreamHandler()
    stream_handler.setLevel(logging.INFO)
    stream_handler.setFormatter(_log_format)
    return stream_handler


def get_logger(name):
    logger = get_task_logger(name)
    logger.setLevel(logging.DEBUG)
    logger.addHandler(get_file_handler(name))
    logger.addHandler(get_stream_handler())
    return logger

my_celery_package/my_celery_pkg_base/celery.py:

...
from my_celery_package.lib import logger

LOGGER = logger.get_logger('custom_consumer')
count_logger = logger.get_logger('message_count')
...

my_celery_package/my_celery_tasks/auxiliary_tasks.py:

...
from my_celery_package.lib import logger

LOGGER = logger.get_logger('auxiliary_tasks')
...

my_celery_package/my_celery_tasks/parser_tasks.py:

...
from my_celery_package.lib import logger

LOGGER = logger.get_logger('parser_tasks')
...

I'm aware of @signals.after_setup_logger.connect and @signals.after_setup_task_logger.connect, but I'm not sure how to use them here.
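
For reference, these signal handlers are usually connected in my_celery_pkg_base/celery.py roughly as in the sketch below. Attaching the handlers inside the signal instead of at import time in each module is only an assumption about how this could fit together, not a confirmed fix; the logger names are the ones used with get_logger() above.

import logging

from celery import signals

from my_celery_package.lib import logger as log_utils


@signals.after_setup_task_logger.connect
def setup_task_loggers(logger=None, loglevel=None, **kwargs):
    # Runs after the worker has configured its own task logger; attach the
    # per-module handlers here instead of at import time.
    for name in ('auxiliary_tasks', 'parser_tasks'):
        task_logger = logging.getLogger(name)
        task_logger.setLevel(logging.DEBUG)
        # Avoid stacking duplicate handlers if the signal fires more than once.
        if not task_logger.handlers:
            task_logger.addHandler(log_utils.get_file_handler(name))
            task_logger.addHandler(log_utils.get_stream_handler())

after_setup_logger can be connected the same way for the worker-level loggers (custom_consumer, message_count), if those should also get their handlers after setup.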



Solution 1:[1]

BigQuery cannot compute AVG on the TIME type, which is why you see that error when you try it.

Instead, you can compute the average over INT64 values. The time_ts column is a TIMESTAMP. Use TIME_DIFF to get the difference between the extracted time and "00:00:00", which gives the number of seconds; averaging those returns a FLOAT64 that can be cast back to INT64. The temporary function secondToTime below then splits the seconds into hour / minute / second parts and parses them back into a TIME value.

For the date part, I think you can do it the same way.

create temp function secondToTime (seconds INT64)
    returns time
    as (
        PARSE_TIME (
            "%H:%M:%S",
            concat(
                -- DIV truncates, so the hour/minute parts do not round up
                -- near the boundaries the way CAST(... AS INT64) would.
                div(seconds, 3600),
                ":",
                div(mod(seconds, 3600), 60),
                ":",
                mod(seconds, 60)
            )
        )
    );


with october_fall as (
    select
        extract (date from time_ts) as start_date,
        extract (time from time_ts) as start_time
    from `bigquery-public-data.hacker_news.comments`
    limit 10
) SELECT 
    avg(time_diff(start_time, time '00:00:00', second)),
    secondToTime(
        cast(avg(time_diff(start_time, time '00:00:00', second)) as INT64) 
    ),
    secondToTime(0),
    secondToTime(60),
    secondToTime(3601),
    secondToTime(7265)
FROM october_fall

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution source:
Solution 1: 鄭元傑