Dask: what is the asyncio equivalent of as_completed?

I have working Dask client code like this:

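from dask.distributed import Client, as_completed

client = Client(address=self.cluster)

# Submit every job and keep the resulting futures.
futures = []
for job in jobs:
    future = client.submit(...)
    futures.append(future)

# Iterate over the futures as they finish; the loop blocks until the next one completes.
for future, result in as_completed(futures, with_results=True, raise_errors=True):
    key = future.key
    state = (State.FINISHED if result is True else State.FAILED)
    ...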

The Dask as_completed function is useful here because it iterates over the jobs in the order in which they finish.

The problem with this code is that it may block indefinitely on the as_completed call, for instance when no workers are available.

Is there a way to rewrite it with asyncio? With asyncio, I could use the wait function with a timeout to get out of the blocking call when something goes wrong.

Thank you



Solution 1:[1]

You can use asyncio.as_completed, which accepts a timeout argument: https://docs.python.org/3/library/asyncio-task.html
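
As a rough illustration of that suggestion, here is a minimal sketch using only the standard library. The names run_job, collect and main are hypothetical, and the coroutines stand in for the remote jobs; no Dask calls are made. Passing Dask futures instead would presumably require a client running in asynchronous mode so that the futures are awaitable, which is an assumption and is not shown here.

```python
import asyncio


async def run_job(name, delay, ok=True):
    # Hypothetical stand-in for a remote job: sleep, then report success or failure.
    await asyncio.sleep(delay)
    return name, ok


async def collect(jobs, timeout):
    # Wrap each coroutine in a task so that all jobs run concurrently.
    tasks = [asyncio.create_task(job) for job in jobs]
    results = []
    try:
        # Awaitables are yielded in completion order. Awaiting one raises
        # TimeoutError if `timeout` seconds pass while jobs are still pending.
        for next_done in asyncio.as_completed(tasks, timeout=timeout):
            results.append(await next_done)
    except asyncio.TimeoutError:
        # Cancel whatever is still pending instead of waiting forever.
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)
    return results


def main():
    jobs = [
        run_job("slow", 5.0),               # will not finish before the timeout
        run_job("fast", 0.01),
        run_job("medium", 0.05, ok=False),  # finishes, but reports a failure
    ]
    return asyncio.run(collect(jobs, timeout=0.5))
```

Calling main() returns the results of the jobs that finished within the timeout, in completion order, and cancels the rest. The timeout applies to the whole iteration, not to each job separately.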

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution    Source
Solution 1  Andrea Tedeschi