How can I "wake up" an event loop to notify it that a Future was completed from another thread?

When using Python's asyncio, I often create and complete asyncio.Future objects from threads other than the one running the event loop.

Unless I complete those futures in the thread that is running the event loop or via a function that notifies that loop of the completion, the event loop often does not "notice" that the futures are completed.
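
A minimal reproduction of that behavior, as a sketch that assumes CPython's default event loop with debug mode off (as far as I know, in debug mode the cross-thread set_result raises RuntimeError instead). The names complete_from_thread and heartbeat are illustrative only. The thread completes the Future after about 50 ms, but the awaiting coroutine typically resumes only when the unrelated 0.5 s timer wakes the loop:

```python
import asyncio
import threading
import time


def complete_from_thread(fut, delay):
    # Runs in a worker thread. Calling set_result here is not thread-safe
    # and does not wake the event loop.
    time.sleep(delay)
    fut.set_result("the result")


async def main():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    # An unrelated timer; without something like it, nothing would wake
    # the loop and the await below could block indefinitely.
    heartbeat = asyncio.create_task(asyncio.sleep(0.5))
    start = time.monotonic()
    worker = threading.Thread(target=complete_from_thread, args=(fut, 0.05))
    worker.start()
    result = await fut
    waited = time.monotonic() - start
    heartbeat.cancel()
    worker.join()
    return result, waited


# debug=False is explicit so that an environment-enabled debug mode does not
# change the behavior being demonstrated.
result, waited = asyncio.run(main(), debug=False)
print(result, f"awaited for {waited:.2f}s")
```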

Is there a way to "notify" an event loop that it should check a Future for completion if that future was readied (via set_result) externally?

Why I am asking this

The threads which ready futures need to a) have very low latency, and b) check whether the future has been readied, synchronously, later on (e.g. via future.done()).

The event loop awaiting the Futures does not need to have low latency in being notified that they're ready--it can be notified a few milliseconds late.

Ideally there would be a performant way to notify the event loop that a Future had been readied after readying it synchronously in a thread.

Even if that's not possible, the event loop could poll readiness on an interval, so long as the futures were synchronously readied as quickly as possible in threads.

What I have tried

The "correct" way to solve this problem is with call_soon_threadsafe, e.g.:

def do_in_thread(future):
    future.get_loop().call_soon_threadsafe(future.set_result, "the result")

That notifies the event loop of Future readiness reliably, but does not work for two reasons:

  1. It has significant (8-10x) overhead versus calling future.set_result in my benchmarks.
  2. It doesn't ready the Future until the event loop runs, which means I can't reliably check whether the Future is done, which I need to do. For example, this won't work:

def do_in_thread(future):
    future.get_loop().call_soon_threadsafe(future.set_result, "the result")
    assert future.done()  # Usually fails: the loop has not run set_result yet
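
To make the second point reproducible, here is a small sketch in which the loop is deliberately blocked (via a join inside the coroutine) while the thread runs, so the scheduled set_result cannot have executed when the thread checks done(). The observed dict is a hypothetical helper used only to record what each side saw:

```python
import asyncio
import threading

observed = {}  # hypothetical helper: records what each side saw


def do_in_thread(fut):
    # Schedules set_result on the loop; it does not run it here.
    fut.get_loop().call_soon_threadsafe(fut.set_result, "the result")
    observed["done_in_thread"] = fut.done()


async def main():
    fut = asyncio.get_running_loop().create_future()
    worker = threading.Thread(target=do_in_thread, args=(fut,))
    worker.start()
    # Blocking join: the loop cannot process the scheduled callback while
    # the thread is running, which makes the outcome deterministic.
    worker.join()
    observed["done_before_yield"] = fut.done()
    # Yielding to the loop lets the scheduled set_result run.
    observed["result"] = await fut


asyncio.run(main())
print(observed)
```

In real code the loop would not be blocked like this, so done() in the thread could occasionally be True; the point is only that it cannot be relied on.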

One thing that does seem to work is to notify the event loop by intentionally failing a second call to set_result via call_soon_threadsafe, and swallowing the InvalidStateError, like this:

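from asyncio import Future, InvalidStateError


def ensure_result(f, res):
    # Runs on the event loop; the Future is normally already done by then,
    # so set_result raises InvalidStateError, which is swallowed.
    try:
        f.set_result(res)
    except InvalidStateError:
        pass


def in_thread(fut: Future):
    # Ready the Future synchronously in this thread...
    fut.set_result("the result")
    # ...then wake the loop with a thread-safe call.
    fut.get_loop().call_soon_threadsafe(ensure_result, fut, "the result")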

That still has overhead, but I could remove the overhead of call_soon_threadsafe by tracking Futures in a thread-shared data structure and having the event loop call ensure_result on them periodically. However, I'm still not sure:

  1. Does that reliably work? Is set_result failing with InvalidStateError guaranteed to notify the event loop that an await on the given Future can return, or is that an undocumented implementation detail I'm relying on?
  2. Is there a better way to achieve that periodic-wakeup that doesn't involve me keeping track of/polling such Futures myself?
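
For concreteness, the polling variant I have in mind looks roughly like the sketch below. The pending queue and poll_pending coroutine are hypothetical names of my own, and the whole thing assumes CPython's default loop with debug mode off. My guess is that the poller's timer, rather than the failing set_result, is what gives the loop a chance to run the already-queued callbacks, but that is an assumption about current behavior and not a documented guarantee:

```python
import asyncio
import queue
import threading
import time

pending = queue.SimpleQueue()  # hypothetical thread-shared registry


def ensure_result(f, res):
    try:
        f.set_result(res)
    except asyncio.InvalidStateError:
        pass


def in_thread(fut, res):
    time.sleep(0.02)         # stand-in for real work
    fut.set_result(res)      # synchronous, but not thread-safe
    assert fut.done()        # the thread can check readiness right away
    pending.put((fut, res))  # hand the Future to the loop-side poller


async def poll_pending(interval):
    # Wakes the loop every `interval` seconds and drains the registry.
    while True:
        await asyncio.sleep(interval)
        while True:
            try:
                fut, res = pending.get_nowait()
            except queue.Empty:
                break
            ensure_result(fut, res)


async def main():
    fut = asyncio.get_running_loop().create_future()
    poller = asyncio.create_task(poll_pending(0.01))
    threading.Thread(target=in_thread, args=(fut, "the result")).start()
    result = await fut
    poller.cancel()
    return result


result = asyncio.run(main(), debug=False)
print(result)
```

This keeps the thread-side cost to one set_result plus one queue put, at the price of up to one polling interval of latency on the loop side.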

In a perfect world, there would be a loop.poll_all_pending_futures() or loop.update_future_state(fut) method which would achieve this efficiently, but I don't know of one.



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
