Celery task log using google-cloud-logging

I'm currently managing my API using Celery tasks and a Kubernetes cluster on Google Cloud Platform.

Celery automatically logs the input and output of each task. I want to keep this behavior, but I would like to use google-cloud-logging so that the input and output are logged as a jsonPayload.

For all my other logs, I use the following:

from google.cloud.logging.handlers import CloudLoggingHandler
from google.cloud.logging_v2.handlers import setup_logging

# Imports the Cloud Logging client library
import google.cloud.logging

# Instantiates a client
client = google.cloud.logging.Client()

handler = CloudLoggingHandler(client)
setup_logging(handler)

import logging

logger = logging.getLogger(__name__)

data_dict = {"my": "data"}
logger.info("this is an example", extra={"json_fields": data_dict})
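
To see what the `extra` argument does on the standard-library side, here is a minimal sketch that uses only `logging`. Every key passed in `extra` becomes an attribute on the `LogRecord`, which is where a handler can pick up `json_fields`. The `CaptureHandler` class and the `json_fields_demo` logger name are hypothetical stand-ins (not part of google-cloud-logging) so that the snippet runs without GCP credentials.

```python
import logging


class CaptureHandler(logging.Handler):
    """Hypothetical stand-in for CloudLoggingHandler: keeps records in memory."""

    def __init__(self):
        super().__init__()
        self.records = []

    def emit(self, record):
        self.records.append(record)


capture = CaptureHandler()
demo_logger = logging.getLogger("json_fields_demo")  # hypothetical logger name
demo_logger.setLevel(logging.INFO)
demo_logger.propagate = False  # keep the demo isolated from the root logger
demo_logger.addHandler(capture)

data_dict = {"my": "data"}
demo_logger.info("this is an example", extra={"json_fields": data_dict})

# Each key passed through `extra` is now an attribute of the LogRecord
record = capture.records[0]
print(record.getMessage(), record.json_fields)
```

A handler that knows about `json_fields` can read it from the record in the same way this stand-in stores it.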

And I use Celery with the following template:

from celery import Celery

app = Celery(**my_params)

@app.task
def task_test(data):
    # Update dictionary with new data
    data["key1"] = "value1"
    return data

...

detection_task = celery.signature('tasks.task_test', args=([[{"hello": "world"}]]))
r = detection_task.apply_async()
data = r.get()

Here's an example of a log I receive from Celery (screenshot omitted):

The blurred part corresponds to the dict/JSON I would like to have in a jsonPayload instead of a textPayload. (Also note that this log is marked as ERROR on GCP but as INFO by Celery.)

Any idea how I could connect Python's built-in logging, the Celery logger, and the GCP logger?



Solution 1:[1]

To connect your Python logger with GCP Logger:

import logging
import google.cloud.logging
from google.cloud.logging.handlers import CloudLoggingHandler, setup_logging

client = google.cloud.logging.Client()
handler = CloudLoggingHandler(client, name="your_log_name")
cloud_logger = logging.getLogger('cloudLogger')
# configure cloud_logger
cloud_logger.addHandler(handler)

To connect this logger to Celery's logger:

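def initialize_log(logger=None, loglevel=logging.DEBUG, **kwargs):
    # Celery passes its own logger and level through the signal; the
    # logger is looked up by name here so both signals configure 'celery'.
    logger = logging.getLogger('celery')
    # Reuse the module-level CloudLoggingHandler (`handler`) created above
    handler.setLevel(loglevel)
    logger.addHandler(handler)
    return logger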

from celery.signals import after_setup_task_logger
after_setup_task_logger.connect(initialize_log)
from celery.signals import after_setup_logger
after_setup_logger.connect(initialize_log)
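
The handler above forwards Celery's own messages, but they still arrive as text. One way to get task input and output into a jsonPayload is to log them yourself with `json_fields`. The following is a minimal sketch under stated assumptions, not part of the original answer: `log_task_io`, `to_json_safe`, and the `task_io` logger name are hypothetical, and the sketch assumes that a handler which understands `json_fields` (such as the `CloudLoggingHandler` used in the question) is attached to that logger or to one of its parents.

```python
import functools
import json
import logging

task_io_logger = logging.getLogger("task_io")  # hypothetical logger name
task_io_logger.setLevel(logging.INFO)  # INFO records would otherwise be dropped


def to_json_safe(value):
    """Round-trip through json so the payload only contains JSON types."""
    return json.loads(json.dumps(value, default=str))


def log_task_io(func):
    """Log a function's arguments and return value as structured fields."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # Snapshot the input before the task has a chance to mutate it
        task_io_logger.info(
            "task %s received",
            func.__name__,
            extra={"json_fields": {"args": to_json_safe(args),
                                   "kwargs": to_json_safe(kwargs)}},
        )
        result = func(*args, **kwargs)
        task_io_logger.info(
            "task %s succeeded",
            func.__name__,
            extra={"json_fields": {"result": to_json_safe(result)}},
        )
        return result

    return wrapper


@log_task_io
def task_test(data):
    # Same body as the task in the question, without the Celery decorator
    data["key1"] = "value1"
    return data
```

With Celery, the decorator would go directly under `@app.task` so that the wrapped function is the one that gets registered. The same logging calls could also live in handlers connected to Celery's `task_prerun` and `task_postrun` signals instead of a decorator.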

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

[1] Solution 1: Abhinav Mathur