Celery running subtasks asynchronously within a task and waiting for results
I have a Celery task that launches three other Celery tasks. I want these child tasks to execute asynchronously and to wait for them to finish before the parent task resumes. However, the child tasks are running synchronously and I don't know why. The problem started when I upgraded Celery from 4.4.7 to 5.0.0.
app_celery.py
import time

from celery import group

@app.task(name="app_celery.scraping_process", soft_time_limit=900, time_limit=960, max_retries=3)
def scraping_process():
    sources = ["a", "b", "c"]
    # fan the child tasks out as a group on the spiders queue
    job = group((company_representation.s(src) for src in sources))
    result = job.apply_async(queue="spiders", routing_key="spiders")
    # wait for all child tasks to finish before the parent task continues
    while not result.ready():
        time.sleep(5)

@app.task(name="app_celery.company_representation", max_retries=3)
def company_representation(source: str):
    # do something
    time.sleep(60)
I am running celery like this:
celery -A app_celery worker -c 8 -Q spiders -n spiders@%%h
celery -A app_celery worker -c 2 -Q companies -n companies@%%h --without-mingle --without-heartbeat -Ofair
celery==5.0.0
Solution 1:[1]
You could add the task IDs to a list and then do something like:
from celery.result import AsyncResult

def poll_job_status(active_jobs):
    # shortcut when only one task id is left: keep it while it is unfinished
    if len(active_jobs) == 1:
        result = AsyncResult(active_jobs[0])
        if not result.ready():
            return active_jobs
    # keep only the task ids that are still pending or being retried
    _new_active_jobs = []
    for task_id in active_jobs:
        result = AsyncResult(task_id)
        if result.state in ("PENDING", "RETRY"):
            _new_active_jobs.append(task_id)
    return _new_active_jobs
So you iterate over the list of task IDs and check whether each task is complete. Once the list is empty you know that all the tasks have run and you can carry on with other operations. Example usage:
active_tasks_list = []
active_tasks_list.append(task.delay(args).id)
while len(active_tasks_list) > 0:
    active_tasks_list = poll_job_status(active_tasks_list)
# carry on with other operations
This is ideal if you have numerous tasks that you want to keep track of.
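Applied to the parent task from the question, this polling approach might look roughly like the sketch below. It reuses the company_representation task, the spiders queue and the 5-second poll interval from the question together with the poll_job_status helper above, so treat it as an illustration rather than a drop-in replacement:
import time

@app.task(name="app_celery.scraping_process", soft_time_limit=900, time_limit=960, max_retries=3)
def scraping_process():
    sources = ["a", "b", "c"]
    # launch each child task individually and remember its id
    active_jobs = [
        company_representation.apply_async(
            args=(src,), queue="spiders", routing_key="spiders"
        ).id
        for src in sources
    ]
    # keep polling until no task id is left in a PENDING/RETRY state
    while active_jobs:
        active_jobs = poll_job_status(active_jobs)
        time.sleep(5)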
Solution 2:[2]
You can try a Celery group to invoke multiple tasks in parallel and wait for their results.
import time

from celery import group

@app.task(name="app_celery.scraping_process", soft_time_limit=900, time_limit=960, max_retries=3)
def scraping_process():
    sources = ["a", "b", "c"]
    tasks = []
    for src in sources:
        tasks.append(company_representation.s(src))
    # create a group with all the tasks
    job = group(tasks)
    result = job.apply_async(queue="spiders", routing_key="spiders")
    # block until every task in the group has finished and collect the results
    ret_val = result.get(disable_sync_subtasks=False)
    return ret_val

@app.task(name="app_celery.company_representation", max_retries=3)
def company_representation(source: str):
    # do something
    time.sleep(60)
Reference: http://ask.github.io/celery/userguide/groups.html#groups
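Note on disable_sync_subtasks=False: by default Celery refuses to call result.get() from inside another task and raises a RuntimeError, because blocking on subtasks can deadlock the worker pool if the children end up queued behind the waiting parent. Passing disable_sync_subtasks=False turns that guard off, so this pattern is safest when the child tasks run on a separate queue and worker pool, as with the spiders queue here.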
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | Renier |
| Solution 2 | dassum |
