How to use Google BigQuery with Celery?

Environment details

  • OS type: Ubuntu
  • Python version: Python 3.10.1
  • pip version: pip 22.0.4
  • google-cloud-bigquery version: 2.34.0
  • celery version: 5.2.6

Steps to reproduce

  1. If I run Celery with its default multiprocessing (prefork) pool and call to_dataframe() in the worker, it stops working, but running Celery with the single-threaded solo pool works fine.
  • prefork (default) command: celery -A app.worker worker
  • solo pool command: celery -A app.worker worker -P solo
  2. If I call result() instead of to_dataframe() and print the rows, the data is output normally.

Code example

from celery import Celery
from google.cloud import bigquery

celery_task = Celery(
    "app",
    broker="redis://127.0.0.1:6379/0",
    backend="redis://127.0.0.1:6379/0",
)

# The client is created at module import time, i.e. before the prefork
# pool forks its worker processes.
bq_client = bigquery.Client()

# Crashes the worker under the prefork pool.
@celery_task.task()
def set_tracking():
    sql = "SELECT 1"
    df = bq_client.query(sql).to_dataframe()

# This variant works: result() returns the rows without crashing.
@celery_task.task()
def set_tracking():
    sql = "SELECT 1"
    results = bq_client.query(sql).result()
    for result in results:
        print(result)
# [2022-04-24 20:03:58,826: WARNING/ForkPoolWorker-6] Row((1,), {'f0_': 0})
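
For completeness, the task is enqueued from the application process in the usual Celery way; the import path below is an assumption that matches the worker command above.

# Hypothetical caller: enqueue the task so a pool worker executes it.
from app.worker import set_tracking

set_tracking.delay()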

To minimize the time spent waiting for BigQuery data, I use Celery to run the query as a background task for the service.

Receiving BigQuery data works without problems in the single-threaded solo pool, but the error occurs only under the multiprocessing prefork pool.

Debugging is difficult because there is no log output at all; the Celery worker simply crashes.

I would like to ask whether there is a known case where calling to_dataframe() from the BigQuery package can fail when the client is shared across forked worker processes.
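
One workaround I am considering, assuming the crash is related to the client being created before the pool forks, is to create the client lazily in each worker process via Celery's worker_process_init signal. This is only a sketch, not a confirmed fix:

from celery import Celery
from celery.signals import worker_process_init
from google.cloud import bigquery

celery_task = Celery(
    "app",
    broker="redis://127.0.0.1:6379/0",
    backend="redis://127.0.0.1:6379/0",
)

bq_client = None  # created per worker process, not at import time

@worker_process_init.connect
def init_worker(**kwargs):
    # Each forked worker process builds its own client and connections.
    global bq_client
    bq_client = bigquery.Client()

@celery_task.task()
def set_tracking():
    sql = "SELECT 1"
    df = bq_client.query(sql).to_dataframe()

Would this per-process client pattern be the recommended way to use the BigQuery client with the prefork pool?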

Thanks!



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
