Alternative to asyncio.gather which I can keep adding coroutines to at runtime?

I need to be able to keep adding coroutines to the asyncio loop at runtime. I tried using create_task() thinking that this would do what I want, but it still needs to be awaited.

This is the code I had, not sure if there is a simple edit to make it work?

async def get_value_from_api():
    global ASYNC_CLIENT
    return ASYNC_CLIENT.get(api_address)


async def print_subs():
    count = await get_value_from_api()
    print(count)


async def save_subs_loop():
    while True:
        asyncio.create_task(print_subs())
        time.sleep(0.1)


async def start():
    global ASYNC_CLIENT
    async with httpx.AsyncClient() as ASYNC_CLIENT:
        await save_subs_loop()


asyncio.run(start())


Solution 1:[1]

asyncio.create_task() does what you want: it schedules the coroutine on the event loop, and you do not have to await the task yourself. The problem is the infinite loop you create here:

async def save_subs_loop():
    while True:
        asyncio.create_task(print_subs())
        time.sleep(0.1) # do not use time.sleep() in async code EVER

save_subs_loop() keeps creating tasks, but control is never yielded back to the event loop, because time.sleep() blocks and there is no await in the loop. Try this instead:

async def save_subs_loop():
    while True:
        asyncio.create_task(print_subs())
        await asyncio.sleep(0.1) # yield control back to loop to give tasks a chance to actually run

This problem is so common that I think Python should raise a RuntimeError if it detects time.sleep() within a coroutine :-)
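For reference, here is a minimal self-contained sketch of the fixed loop that runs without a real API. The names fake_api_call, results and background_tasks, as well as the bounded iteration count, are assumptions added for illustration and are not part of the original code. The sketch also stores each task in a set, because the event loop only keeps weak references to tasks and an unreferenced task can be garbage-collected before it finishes.

```python
import asyncio

results = []              # collected values, for illustration only
background_tasks = set()  # strong references so tasks are not garbage-collected


async def fake_api_call(n):
    # hypothetical stand-in for the real HTTP request
    await asyncio.sleep(0.05)
    return n


async def print_subs(n):
    count = await fake_api_call(n)
    results.append(count)
    print(count)


async def save_subs_loop(iterations=5):
    # bounded here so the example terminates; the original loops forever
    for n in range(iterations):
        task = asyncio.create_task(print_subs(n))
        background_tasks.add(task)
        task.add_done_callback(background_tasks.discard)
        await asyncio.sleep(0.01)  # yields to the event loop so tasks can run
    # wait for tasks that are still in flight before returning
    await asyncio.gather(*background_tasks)


asyncio.run(save_subs_loop())
```

New tasks can be started from anywhere inside the running loop in the same way; nothing has to be known up front, which is the main difference from asyncio.gather.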

Solution 2:[2]

I once built a similar pattern when I was mixing trio and kivy, as a demonstration of running multiple coroutines asynchronously.

It used a trio memory channel (see trio.open_memory_channel), which is roughly equivalent to asyncio.Queue, so I'll just refer to it as a queue here.

The main idea is:

  1. Wrap each task in a class that has a run method.
  2. Have that async run method put the object itself back into the queue when execution is done.
  3. Create a global task-spawning loop that waits for objects in the queue and creates a task for each one.

import asyncio
import traceback

import httpx


async def task_1(client: httpx.AsyncClient):
    resp = await client.get("http://127.0.0.1:5000/")
    print(resp.read())
    await asyncio.sleep(0.1)  # throttle requests; without this delay the server might ban the IP


async def task_2(client: httpx.AsyncClient):
    resp = await client.get("http://127.0.0.1:5000/meow/")
    print(resp.read())
    await asyncio.sleep(0.5)


class CoroutineWrapper:
    def __init__(self, queue: asyncio.Queue, coro_func, *param):
        self.func = coro_func
        self.param = param
        self.queue = queue

    async def run(self):
        try:
            await self.func(*self.param)
        except Exception:
            traceback.print_exc()
            return
        
        # put itself back into queue
        await self.queue.put(self)


class KeepRunning:
    def __init__(self):
        # queue for gathering CoroutineWrapper
        self.queue = asyncio.Queue()
        # strong references, so running tasks are not garbage-collected
        self.tasks = set()

    def add_task(self, coro, *param):
        wrapped = CoroutineWrapper(self.queue, coro, *param)

        # add tasks to be executed in queue
        self.queue.put_nowait(wrapped)

    async def task_processor(self):
        wrapper: CoroutineWrapper
        while wrapper := await self.queue.get():
            # wait for a new CoroutineWrapper object, then schedule its run() method
            task = asyncio.create_task(wrapper.run())
            self.tasks.add(task)
            task.add_done_callback(self.tasks.discard)


async def main():
    keep_running = KeepRunning()
    async with httpx.AsyncClient() as client:
        keep_running.add_task(task_1, client)
        keep_running.add_task(task_2, client)

        await keep_running.task_processor()

asyncio.run(main())

Server:

import time

from flask import Flask
app = Flask(__name__)


@app.route("/")
def hello():
    return str(time.time())


@app.route("/meow/")
def meow():
    return "meow"


app.run()

Output:

b'meow'
b'1639920445.965701'
b'1639920446.0767004'
b'1639920446.1887035'
b'1639920446.2986999'
b'1639920446.4067013'
b'meow'
b'1639920446.516704'
b'1639920446.6267014'
...

You can see the tasks running repeatedly, each at its own pace.
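To try the pattern without a Flask server, the following sketch swaps the HTTP calls for asyncio.sleep and stops after a fixed time. The names fast_job, slow_job, respawn and log, and the 0.35 second deadline, are assumptions for illustration only, and the wrapper class is replaced by a plain helper coroutine to keep it short.

```python
import asyncio

log = []  # records which job ran, for illustration only


async def fast_job():
    log.append("fast")
    await asyncio.sleep(0.05)


async def slow_job():
    log.append("slow")
    await asyncio.sleep(0.2)


async def respawn(queue, coro_func):
    # run the job once, then put it back into the queue to be scheduled again
    await coro_func()
    await queue.put(coro_func)


async def task_processor(queue, tasks):
    while True:
        coro_func = await queue.get()
        task = asyncio.create_task(respawn(queue, coro_func))
        tasks.add(task)  # keep a strong reference while the task runs
        task.add_done_callback(tasks.discard)


async def main():
    queue = asyncio.Queue()
    tasks = set()
    # jobs can be added at any time by putting them into the queue
    queue.put_nowait(fast_job)
    queue.put_nowait(slow_job)
    try:
        # stop after a fixed time so the example terminates
        await asyncio.wait_for(task_processor(queue, tasks), timeout=0.35)
    except asyncio.TimeoutError:
        pass
    pending = list(tasks)
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)


asyncio.run(main())
print(log)
```

Because each job re-queues itself only after it finishes, the fast job should appear in the log several times for every run of the slow job.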


Old answer

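It seems like you only want to cycle through a fixed set of tasks.

In that case, just iterate over a list of coroutine functions with itertools.cycle.

But this is no different from running them synchronously, since each call is awaited before the next one starts, so let me know if you need them to run concurrently.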

import asyncio
import itertools

import httpx


async def main_task(client: httpx.AsyncClient):
    resp = await client.get("http://127.0.0.1:5000/")
    print(resp.read())
    await asyncio.sleep(0.1)  # throttle requests; without this delay the server might ban the IP


async def main():
    async with httpx.AsyncClient() as client:
        for coroutine in itertools.cycle([main_task]):
            await coroutine(client)


asyncio.run(main())

Server:

import time

from flask import Flask
app = Flask(__name__)


@app.route("/")
def hello():
    return str(time.time())


app.run()

Output:

b'1639918937.7694323'
b'1639918937.8804302'
b'1639918937.9914327'
b'1639918938.1014295'
b'1639918938.2124324'
b'1639918938.3204308'
...

Solution 3:[3]

You might want to try the TaskThread framework:

  • It lets you add tasks at runtime
  • Tasks are rescheduled periodically (like in your while loop above)
  • It has a built-in consumer/producer framework (parent/child relationships), which you seem to need

Disclaimer: I wrote TaskThread out of necessity, and it's been a life saver.

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 thisisalsomypassword
Solution 2
Solution 3 El Sampsa