'How can I use Celery's apply_async()/delay() as a non-blocking asyncio coroutine inside an asyncio loop?

All of the questions I've found seem to want to use a coroutine inside the celery worker called from a sync parent process. I want to do the exact opposite. I want an async loop to be able to use celery.app.task.Task.apply_async() (or delay()) as a non-blocking coroutine. For context, I'm calling celery tasks from a discord.py loop and I'd like each command to wait for the task to return and keep the context to reply easily. The program flow should look something like this:

@commands.command()
async def some_kind_of_bot_command(ctx: commands.Context):
    await ctx.reply("your command is processing...")

    # this should NOT block the event loop
    result = await celery_task_module.some_task_ran_in_celery.delay()

    await ctx.reply("here is the result of your command: " + result)

In this example, some_task_ran_in_celery is a synchronous/blocking task ran within a celery worker.

I don't mind using external libraries or a custom function wrapper/something else as long as it has this kind of behavior.

If this isn't possible or is a bad way to do this, please let me know as I'm open to feedback.



Solution 1:[1]

In general, the celery is geared towards multi-processing not towards coroutines and async. The key thing to remember when trying to integrate with asyncio is that calling delay or apply_async is a "relatively" non-blocking call (each call will kick off the task by placing a message on the celery broker, like redis or rabbitmq). To that end, you could do a few things here

  1. pay the small penalty of the blocking I/O required to place the message on the broker.
  2. write your own asyncio way to place the message on the broker
  3. write a polling loop that will check the AsyncResult for the task call to see if it is ready (checking the AsyncResult will normally use a blocking I/O call to check on the results in the celery result backend).

You will need (1 or 2) and 3.

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 DharmanBot