Is there a concurrent.futures.wait equivalent for the multiprocessing library?

I have a multiprocessing.Pool of workers, and I'd like to block the main thread until one worker completes. With concurrent.futures.ProcessPoolExecutor I can use concurrent.futures.wait to wait for the first of the submitted tasks to complete, unblock the main thread, and possibly do something with the result. Is there an equivalent for multiprocessing.Pool? I could use a Queue, have the child put its result on the Queue and the parent wait on it, but I'm looking for something more akin to the wait function.
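For completeness, here is a minimal sketch of the Queue-based alternative mentioned above (the names worker and queue are illustrative, not from the original post). A Manager queue is used because a plain multiprocessing.Queue cannot be passed to pool workers as an argument:

from multiprocessing import Manager, Pool
from random import random
from time import sleep

def worker(workerid, queue):
    sleep(random())
    # The child reports its id through the shared queue:
    queue.put(workerid)
    return workerid

if __name__ == "__main__":
    with Manager() as manager, Pool(10) as pool:
        queue = manager.Queue()
        for i in range(10):
            pool.apply_async(worker, args=(i, queue))
        for _ in range(10):
            # Blocks until some worker has finished:
            res = queue.get()
            print(f"Worker {res} finished")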

Example:

from concurrent.futures import ProcessPoolExecutor, wait, FIRST_COMPLETED
from random import random
from time import sleep

class Example:
    def start(self):
        with ProcessPoolExecutor(max_workers=10) as executor:
            futures = set()
            for i in range(10):
                futures.add(executor.submit(self.worker, i))
            while futures:
                done, futures = wait(futures, return_when=FIRST_COMPLETED)
                print(f"{len(done)} tasks completed")
                for task in done:
                    res = task.result()
                    print(f"Worker {res} finished")

    def worker(self, workerid):
        sleep(random())
        return workerid

# Needed when the start method is "spawn" (e.g. Windows or macOS):
if __name__ == "__main__":
    ex = Example()
    ex.start()

outputs:

1 tasks completed
Worker 6 finished
1 tasks completed
Worker 4 finished
1 tasks completed
Worker 9 finished
1 tasks completed
Worker 8 finished
1 tasks completed
Worker 0 finished
1 tasks completed
Worker 3 finished
1 tasks completed
Worker 7 finished
1 tasks completed
Worker 2 finished
1 tasks completed
Worker 1 finished
1 tasks completed
Worker 5 finished


Solution 1:[1]

The answer given by Andrej Kesely is certainly the most straightforward and is more or less equivalent to using the concurrent.futures.as_completed method. But by definition each iteration will only ever give you one more completed task, even if two or more completed between iterations.
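For reference, the as_completed pattern referred to above looks like this (a sketch using the same worker logic as the question, with worker defined at module level):

from concurrent.futures import ProcessPoolExecutor, as_completed
from random import random
from time import sleep

def worker(workerid):
    sleep(random())
    return workerid

if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=10) as executor:
        futures = [executor.submit(worker, i) for i in range(10)]
        # as_completed yields each future as soon as it finishes:
        for future in as_completed(futures):
            print(f"Worker {future.result()} finished")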

The following would be the closest equivalent to what your code is doing. It is rare for tasks to complete so close together that a single iteration finds more than one completed task, although it is possible (see the demo run below, where it did occur). I would still expect the printed number of completed tasks to be 1 just about every time:

from multiprocessing import Pool
from threading import Event
from random import random
from time import sleep

class Example:

    def start(self):

        def my_callback(result):
            nonlocal event
            # Signal that a task has completed:
            event.set()

        pool = Pool(10)
        event = Event()
        async_results = {pool.apply_async(self.worker, args=(i,), callback=my_callback) for i in range(10)}
        while async_results:
            # Wait for a task to complete:
            event.wait()
            event.clear()
            done = set()
            for async_result in async_results:
                if async_result.ready():
                    res = async_result.get()
                    print(f"Worker {res} finished")
                    done.add(async_result)

            tasks_completed = len(done)
            if tasks_completed:
                print(f"{tasks_completed} task(s) completed")
                async_results -= done
        pool.close()
        pool.join()

    def worker(self, workerid):
        sleep(random())
        return workerid

# In case we are running under Windows:
if __name__ == '__main__':
    ex = Example()
    ex.start()

Prints:

Worker 7 finished
1 task(s) completed
Worker 4 finished
Worker 9 finished
2 task(s) completed
Worker 2 finished
1 task(s) completed
Worker 1 finished
1 task(s) completed
Worker 8 finished
1 task(s) completed
Worker 5 finished
1 task(s) completed
Worker 0 finished
1 task(s) completed
Worker 6 finished
1 task(s) completed
Worker 3 finished
1 task(s) completed

If you modify worker as follows, it becomes much more likely that multiple tasks will complete at more or less the same time, so that a single iteration can find several completed tasks:

    def worker(self, workerid):
        #sleep(random())
        sleep(.1)
        return workerid

Prints:

Worker 0 finished
1 task(s) completed
Worker 1 finished
Worker 2 finished
2 task(s) completed
Worker 5 finished
Worker 8 finished
Worker 7 finished
Worker 4 finished
Worker 9 finished
Worker 3 finished
6 task(s) completed
Worker 6 finished
1 task(s) completed

Note

The callback function my_callback sets the event once for each completed task (i.e. 10 times in this example). If several tasks are found to have completed in one iteration of the while async_results: loop, then on the next iteration the event may already be set for a task that was handled in the previous iteration, and tasks_completed can therefore be 0.
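If the occasional tasks_completed == 0 iteration is undesirable, one variation (not part of the original answer) is to have the callback push each result onto a thread-safe queue.Queue, so that every get corresponds to exactly one completed task. A sketch, using a module-level worker for simplicity:

from multiprocessing import Pool
from queue import Queue
from random import random
from time import sleep

def worker(workerid):
    sleep(random())
    return workerid

if __name__ == "__main__":
    results = Queue()
    with Pool(10) as pool:
        for i in range(10):
            # The callback runs in the pool's result-handler thread in the
            # parent process and simply enqueues the result:
            pool.apply_async(worker, args=(i,), callback=results.put)
        for _ in range(10):
            # Blocks until the next task completes:
            res = results.get()
            print(f"Worker {res} finished")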

Solution 2:[2]

I hope I've understood your question correctly. It seems you can use the pool.imap_unordered function to iterate over results as they finish:

from time import sleep
from random import random
from multiprocessing import Pool


def worker_fn(workerid):
    sleep(random())
    return workerid


if __name__ == "__main__":
    with Pool() as pool:
        for result in pool.imap_unordered(worker_fn, range(10)):
            print(f"Worker {result} finished")

Prints:

Worker 3 finished
Worker 6 finished
Worker 8 finished
Worker 9 finished
Worker 1 finished
Worker 2 finished
Worker 5 finished
Worker 4 finished
Worker 7 finished
Worker 0 finished
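If only the first completed result is needed, which is the FIRST_COMPLETED behaviour from the question, calling next() on the iterator returned by imap_unordered blocks until one task finishes. A sketch based on the same worker_fn:

from multiprocessing import Pool
from random import random
from time import sleep

def worker_fn(workerid):
    sleep(random())
    return workerid

if __name__ == "__main__":
    with Pool() as pool:
        results = pool.imap_unordered(worker_fn, range(10))
        # Blocks until the first worker (in completion order) finishes:
        first = next(results)
        print(f"Worker {first} finished first")
        # Remaining results, each yielded as it completes:
        for result in results:
            print(f"Worker {result} finished")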

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

[1] Solution 1
[2] Solution 2: Andrej Kesely