Celery running subtasks asynchronously within a task and waiting for results

I have a Celery task that launches three other Celery tasks. I want these child tasks to execute asynchronously, and I want to wait for them to finish before the parent task resumes. However, the child tasks are running synchronously and I don't know why. The problem started when I upgraded Celery from 4.4.7 to 5.0.0.

app_celery.py

@app.task(name="app_celery.scraping_process", soft_time_limit=900, time_limit=960, max_retries=3)
def scraping_process():
    sources = ["a", "b", "c"]
    job = group((company_representation.s(src) for src in sources))
    result = job.apply_async(queue="spiders", routing_key="spiders")
    while not result.ready():
        time.sleep(5)
    
@app.task(name="app_celery.company_representation", max_retries=3)
def company_representation(source: str):
    # do something
    time.sleep(60)
    

I am running celery like this:

celery -A app_celery worker -c 8 -Q spiders -n spiders@%%h
celery -A app_celery worker -c 2 -Q companies -n companies@%%h --without-mingle --without-heartbeat -Ofair

celery==5.0.0



Solution 1:[1]

You could add the task IDs to a list and then do something like:

def poll_job_status(active_jobs):
    # Only one task left: keep it in the list until it is ready
    if len(active_jobs) == 1:
        result = app.AsyncResult(active_jobs[0])
        if not result.ready():
            return active_jobs
    _new_active_jobs = []
    for task_id in active_jobs:
        result = app.AsyncResult(task_id)
        # Keep only the tasks that are still waiting to run or being retried
        if result.state in ("PENDING", "RETRY"):
            _new_active_jobs.append(task_id)
    return _new_active_jobs

So you would iterate over the list of task IDs and check whether each task is complete. Once the list is empty, you know that all the tasks have run and you can carry on with other operations. Example usage:

active_tasks_list = []
active_tasks_list.append(task.delay(args).id)
while len(active_tasks_list) > 0:
    active_tasks_list = poll_job_status(active_tasks_list)
    time.sleep(1)  # avoid hammering the result backend
# carry on with other processes

This is ideal if you have numerous tasks that you want to keep track of.
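Once the polling loop exits, the return values can still be fetched from the result backend by task ID. A minimal sketch, assuming `app` is your Celery application instance and `all_task_ids` holds the IDs you submitted (both names are illustrative, not from the original answer):

def collect_results(app, all_task_ids):
    # Hypothetical follow-up to poll_job_status: gather the results
    results = {}
    for task_id in all_task_ids:
        async_result = app.AsyncResult(task_id)
        # .result holds the return value for successful tasks,
        # or the raised exception for failed ones
        results[task_id] = async_result.result if async_result.successful() else None
    return results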

Solution 2:[2]

You can use a Celery group to invoke multiple tasks in parallel and wait for their results.

@app.task(name="app_celery.scraping_process", soft_time_limit=900, time_limit=960, max_retries=3)
def scraping_process():
    sources = ["a", "b", "c"]
    tasks =[]
    for src in sources:
        tasks.append(company_representation.s(src))

    # create a group with all the tasks
    job = group(tasks)
    result = job.apply_async(queue="spiders", routing_key="spiders")
    # Calling .get() inside a task normally raises RuntimeError in Celery 5;
    # disable_sync_subtasks=False allows the synchronous wait (at the risk of
    # blocking a worker slot while the subtasks run)
    ret_val = result.get(disable_sync_subtasks=False)
    return ret_val
    
@app.task(name="app_celery.company_representation", max_retries=3)
def company_representation(source: str):
    # do something
    time.sleep(60)

Reference: http://ask.github.io/celery/userguide/groups.html#groups
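Note that the Celery documentation discourages waiting on subtask results inside a task, since it can block or deadlock the worker pool. As an alternative, a chord runs a callback once every task in the group has finished, without blocking the parent. A minimal sketch under that assumption; the `aggregate_results` callback task is illustrative and not part of the original answer:

from celery import chord

@app.task(name="app_celery.aggregate_results")
def aggregate_results(results):
    # Receives the list of return values from all company_representation tasks
    return results

@app.task(name="app_celery.scraping_process", soft_time_limit=900, time_limit=960, max_retries=3)
def scraping_process():
    sources = ["a", "b", "c"]
    # Route each subtask to the "spiders" queue, then run the callback
    # when all of them have completed, without blocking this worker
    header = [company_representation.s(src).set(queue="spiders") for src in sources]
    chord(header)(aggregate_results.s())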

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
[1] Solution 1: Renier
[2] Solution 2: dassum