How can I access the task dependency graph of a celery AsyncResult/Signature comprised of multiple tasks?

I'd like to be able to extract the task dependency graph for a celery AsyncResult/Signature. My AsyncResult/Signature may be a complex chain/group/chord. I'd like to extract the graph of task_ids from the parent/children AsyncResults and serialize it so that I can reconstitute the AsyncResult from a task_id string at a later date.

I suspect this output would come from traversing the AsyncResult.children or AsyncResult.parent tree of tasks, but wanted to see if anything in celery already existed for this without having to write my own traversal code.

I'd like an output something roughly akin to:

{
    "GroupTask-id-xxx": [
        "Task-id-xxx",
        "Task-id-xxx",
    ]
}


Solution 1:[1]

Most of the relevant tools are discussed in this answer on GitHub.

Main tools:

  • AsyncResult.as_tuple() / GroupResult.as_tuple() returns a serializable tuple representation of the entire dependency graph. Pass that tuple to celery.result.result_from_tuple(tuple_representation) to rehydrate the AsyncResult/GroupResult.
  • my_group_result.save() stores the group's tuple representation in the backend so that the GroupResult can later be retrieved from a single id with GroupResult.restore(group_id). Note that it does not capture the parents of either the group or its child AsyncResults.
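The tuple returned by as_tuple() can also be walked directly to get the kind of mapping the question asks for. The following is only a sketch: it assumes the nested layout ((task_id, parent), children) that as_tuple() appears to produce in current Celery versions, where children is None for a plain AsyncResult and a list for a GroupResult. The names split_node and group_graph are made up for illustration and are not part of Celery.

```python
from typing import Any, Dict, List, Optional, Tuple


def split_node(node: Any) -> Tuple[str, Any, Any]:
    """Split one as_tuple()-style node into (task_id, parent_node, children)."""
    head, children = node
    if isinstance(head, (list, tuple)):
        task_id, parent = head
    else:
        # Assumption: a bare id carries no parent information.
        task_id, parent = head, None
    return task_id, parent, children


def group_graph(
    node: Any, graph: Optional[Dict[str, List[str]]] = None
) -> Dict[str, List[str]]:
    """Map every group id found in the structure to its direct child ids."""
    if graph is None:
        graph = {}
    task_id, parent, children = split_node(node)
    if parent:
        # Parents may themselves be groups, so walk them too.
        group_graph(parent, graph)
    if children is not None:
        graph[task_id] = [split_node(child)[0] for child in children]
        for child in children:
            group_graph(child, graph)
    return graph


# Hand-written stand-in for my_group_result.as_tuple(); the ids are placeholders.
example = (
    ("group-id", None),
    [(("task-a", None), None), (("task-b", None), None)],
)
print(group_graph(example))
```

Because the function accepts lists as well as tuples, it should also work on the structure after a JSON round trip.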

If you want to save these results to the result backend so that they can be fully retrieved at a future date with just an id, the following functions provide that:

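import json
from typing import Union

from celery.result import AsyncResult, GroupResult, result_from_tuple


def save_result(result: Union[AsyncResult, GroupResult]) -> None:
    """Save the result's compute structure to the backend."""
    # as_tuple() includes parents and group children, so one key holds the whole graph.
    result.backend.set(result.id, json.dumps(result.as_tuple()))


def restore_result(result_id: str) -> Union[AsyncResult, GroupResult]:
    """Restore a result (including parents) from the backend.

    Raises:
        ValueError: if the result is not found in the backend.
    """
    # tasks.celery_app is presumably the Celery app defined in the answerer's own
    # tasks module; substitute your application instance.
    try:
        return result_from_tuple(json.loads(tasks.celery_app.backend.get(result_id)))
    except TypeError:
        # A missing key comes back as None, which json.loads() rejects with TypeError.
        raise ValueError(f"Result id '{result_id}' not found.") from None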
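The JSON written by save_result can also be inspected without rehydrating any result objects. As a rough sketch, the hypothetical helper below (ancestry is not a Celery function) follows the parent links in the stored payload and lists the ids from the oldest ancestor to the saved result. It assumes the same ((task_id, parent), children) layout as above, with tuples turned into lists by the JSON round trip.

```python
import json
from typing import Any, List


def ancestry(payload: str) -> List[str]:
    """Return task ids from the oldest ancestor down to the stored result."""
    node: Any = json.loads(payload)
    ids: List[str] = []
    while node:
        head, _children = node
        if isinstance(head, list):
            # [task_id, parent_node]; parent_node is None at the root of the chain.
            task_id, node = head
        else:
            # Assumption: a bare id carries no parent information.
            task_id, node = head, None
        ids.append(task_id)
    ids.reverse()
    return ids


# Hand-written stand-in for json.dumps(result.as_tuple()) of a three-step chain.
stored = json.dumps(
    (("step-3", (("step-2", (("step-1", None), None)), None)), None)
)
print(ancestry(stored))
```

For a chain, the saved result is the last task, so the list reads in execution order.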

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1