How to explain these multiprocessing execution results?

I have the following problem: there are several mutually independent big tasks, and each big task has several mutually independent follow-up small tasks that depend on its result. I ran some experiments on Python 3.10.4 to find out how I can use laziness to keep the multiprocessing pool full. The code is as follows:

import time
from multiprocessing import Pool
from itertools import chain


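# Used as a decorator: runs f immediately at definition time and prints its
# wall time; the decorated name is then bound to None (measure returns nothing).
def measure(f):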
    start = time.time()
    f()
    print(f'{f.__name__}: {time.time() - start}')


def big_task(_):
    time.sleep(3)
    return range(3)


def small_task(_):
    time.sleep(1)


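# Module-level pool: this relies on the 'fork' start method (the Linux default).
# Under 'spawn' (the default on Windows and macOS) this module-level code would
# re-run in every worker, so the pool and the tests would need to move under an
# if __name__ == '__main__': guard.
p = Pool(4)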


# total: 11s
@measure
def test1():
    for i in p.map(big_task, range(5)):
        p.map(small_task, i)


# total: 8s
@measure
def test2():
    for i in p.imap(big_task, range(5)):
        p.map(small_task, i)


# total: 10s
@measure
def test3():
    p.map(
        small_task,
        chain.from_iterable(
            p.imap(big_task, range(5))
        )
    )


# total: 8s
@measure
def test4():
    list(p.imap(
        small_task,
        chain.from_iterable(
            p.imap(big_task, range(5))
        )
    ))

I can explain the result of test1: the outer loop takes 6s (five 3s big tasks on four workers need two rounds), and the inner loop takes 5 * 1s (five blocking map calls, each running its three small tasks in parallel), 11s in total. But I can't explain the results of the other tests, because I don't know how multiprocessing.Pool.imap works internally or how lazily map/imap consume their iterable argument (i.e., when next(iter) is called).
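
A minimal arithmetic sketch of the test1 reasoning, assuming zero scheduling overhead: the outer map runs the big tasks in ceil(n_big / workers) rounds, and each of the n_big blocking inner map calls needs ceil(3 / workers) rounds of small tasks. The function name predict_test1 is a hypothetical helper, not part of the original code. It gives 6s + 5 * 1s = 11s for Pool(4) with range(5), and 6s + 10 * 1s = 16s for Pool(8) with range(10), matching both test1 measurements.

```python
from math import ceil


def predict_test1(workers, n_big, fanout=3, big_s=3, small_s=1):
    """Hypothetical zero-overhead estimate of test1's wall time in seconds."""
    # Outer map: n_big big tasks run in ceil(n_big / workers) rounds
    big_phase = ceil(n_big / workers) * big_s
    # Inner loop: n_big blocking maps, each needing ceil(fanout / workers) rounds
    small_phase = n_big * ceil(fanout / workers) * small_s
    return big_phase + small_phase


print(predict_test1(4, 5))   # 6 + 5 = 11
print(predict_test1(8, 10))  # 6 + 10 = 16
```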

I also ran another experiment with Pool(4) changed to Pool(8) and range(5) changed to range(10). The results were 16s, 13s, 10s and 8s for test1 to test4, respectively.



Solution 1:[1]

I tried adding logging to see how the tasks are distributed among the workers. Here is the code:

import time
from multiprocessing import Pool, current_process, Value, Array
from itertools import chain

worker_num = 4
big_task_num = 5

start = Value('d', 0.0)
table = [Array('c', b' ' * 40) for _ in range(worker_num)]


def measure(f):
    start.value = time.time()
    for r in table:
        r.value = b'| ' * 20
    f()
    print(f'##### {f.__name__}: {time.time() - start.value}s #####')
    print('\n'.join(r.value.decode() for r in table))
    print(flush=True)


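def write_table(msg):
    # Under the 'fork' start method workers are named 'ForkPoolWorker-<n>';
    # assuming this pool's workers are the first child processes, n runs from
    # 1 to worker_num and gives the row index
    proc_idx = int(current_process().name[len('ForkPoolWorker-'):]) - 1
    # Each elapsed second occupies two characters ('|' plus one slot) of the row
    time_idx = int(time.time() - start.value) * 2 + 1
    table[proc_idx][time_idx:time_idx + len(msg)] = msg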


def big_task(_):
    write_table(b'b b b')
    time.sleep(3)
    return range(3)


def small_task(_):
    write_table(b's')
    time.sleep(1)


p = Pool(worker_num)


@measure
def test1():
    for i in p.map(big_task, range(big_task_num)):
        p.map(small_task, i)


@measure
def test2():
    for i in p.imap(big_task, range(big_task_num)):
        p.map(small_task, i)


@measure
def test3():
    p.map(
        small_task,
        chain.from_iterable(
            p.imap(big_task, range(big_task_num))
        )
    )


@measure
def test4():
    list(p.imap(
        small_task,
        chain.from_iterable(
            p.imap(big_task, range(big_task_num))
        )
    ))

Here is the result for worker_num = 4 and big_task_num = 5:

##### test1: 11.0163254737854s #####
|b b b| | | |s|s|s| |s| | | | | | | | | 
|b b b| | | |s| |s|s|s| | | | | | | | | 
|b b b| | | |s|s| |s|s| | | | | | | | | 
|b b b|b b b| |s|s|s| | | | | | | | | | 

##### test2: 8.008505821228027s #####
|b b b|s|s|s|s|s| | | | | | | | | | | | 
|b b b|s|s|s|s| | | | | | | | | | | | | 
|b b b|b b b|s|s| | | | | | | | | | | | 
|b b b|s|s|s| |s| | | | | | | | | | | | 

##### test3: 10.010857343673706s #####
|b b b| | | |s|s|s|s| | | | | | | | | | 
|b b b| | | |s|s|s| | | | | | | | | | | 
|b b b|b b b|s|s|s|s| | | | | | | | | | 
|b b b| | | |s|s|s|s| | | | | | | | | | 

##### test4: 8.006532430648804s #####
|b b b|s|s|s|s| | | | | | | | | | | | | 
|b b b|s|s|s|s| | | | | | | | | | | | | 
|b b b|s|s|s|s|s| | | | | | | | | | | | 
|b b b|b b b|s|s| | | | | | | | | | | | 

Here is the result for worker_num = 8 and big_task_num = 10:

##### test1: 16.0493221282959s #####
|b b b| | | |s| |s| | |s| | |s| | | | | 
|b b b| | | | |s| | |s| |s| | |s| | | | 
|b b b| | | |s| | |s| |s| | |s| | | | | 
|b b b| | | | |s| | |s| | |s| | | | | | 
|b b b|b b b| | |s| | |s| |s| | | | | | 
|b b b| | | | |s| |s| | |s| | |s| | | | 
|b b b| | | |s| | |s| | |s| |s| | | | | 
|b b b|b b b| | |s| |s| | |s| |s| | | | 

##### test2: 13.053321838378906s #####
|b b b| |s| |s| |s| | |s| | | | | | | | 
|b b b|b b b| |s| | |s| |s| | | | | | | 
|b b b| |s| |s| | |s| |s| | | | | | | | 
|b b b|s| |s| |s| | |s| | | | | | | | | 
|b b b|s| |s| | |s| |s| | | | | | | | | 
|b b b|s| |s| | |s| | |s| | | | | | | | 
|b b b| |s| |s| | |s| | |s| | | | | | | 
|b b b|b b b| |s| |s| | |s| | | | | | | 

##### test3: 10.020010232925415s #####
|b b b| | | |s|s|s|s| | | | | | | | | | 
|b b b| | | |s|s|s|s| | | | | | | | | | 
|b b b| | | |s|s|s|s| | | | | | | | | | 
|b b b|b b b|s|s|s|s| | | | | | | | | | 
|b b b| | | |s|s|s| | | | | | | | | | | 
|b b b|b b b|s|s|s|s| | | | | | | | | | 
|b b b| | | |s|s|s| | | | | | | | | | | 
|b b b| | | |s|s|s|s| | | | | | | | | | 

##### test4: 8.007385969161987s #####
|b b b|s|s|s|s| | | | | | | | | | | | | 
|b b b|s|s|s|s| | | | | | | | | | | | | 
|b b b|s|s|s|s| | | | | | | | | | | | | 
|b b b|b b b|s|s| | | | | | | | | | | | 
|b b b|s|s|s|s| | | | | | | | | | | | | 
|b b b|b b b|s|s| | | | | | | | | | | | 
|b b b|s|s|s|s|s| | | | | | | | | | | | 
|b b b|s|s|s|s|s| | | | | | | | | | | | 
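
These charts can be reproduced with an idealized scheduling model. The sketch below is a hypothetical simulation (IdealPool and simulate are illustrative names, not part of multiprocessing). It assumes identical workers that take jobs from one FIFO queue with zero overhead; chunksize 1 everywhere (Pool.map picks ceil(len / (4 * workers)) as its chunksize, which is 1 for every call here, and imap defaults to 1); map consuming its whole iterable before submitting anything; and imap submitting each item as soon as it is produced while yielding results in order. Under these assumptions it predicts 11s, 8s, 10s, 8s for Pool(4)/range(5) and 16s, 13s, 10s, 8s for Pool(8)/range(10), matching all eight measurements.

```python
import heapq


class IdealPool:
    """Hypothetical model (not multiprocessing.Pool): `workers` identical
    workers pulling jobs from one FIFO queue with zero dispatch overhead."""

    def __init__(self, workers):
        self.idle = [0.0] * workers  # min-heap: time at which each worker goes idle

    def submit(self, release, duration):
        """Queue a job that becomes available at `release`; return its finish time."""
        idle_at = heapq.heappop(self.idle)  # the earliest-idle worker takes the job
        end = max(idle_at, release) + duration
        heapq.heappush(self.idle, end)
        return end


def simulate(test, workers, n_big, fanout=3, big_s=3, small_s=1):
    """Predicted wall time (seconds) of test1..test4 from the question."""
    pool = IdealPool(workers)
    # In every test the big tasks are queued first, all at t=0
    big_done = [pool.submit(0, big_s) for _ in range(n_big)]
    # imap yields results in submission order, so result i is available
    # only once big tasks 0..i have all finished
    ready = [max(big_done[: i + 1]) for i in range(n_big)]

    if test == "test1":  # map waits for all big tasks, then one blocking small map each
        now = max(big_done)
        for _ in range(n_big):
            now = max(pool.submit(now, small_s) for _ in range(fanout))
        return now
    if test == "test2":  # imap yields big results; each small map still blocks
        now = 0.0
        for r in ready:
            start = max(r, now)
            now = max(pool.submit(start, small_s) for _ in range(fanout))
        return now
    if test == "test3":  # map materializes the whole chain before submitting anything
        start = max(big_done)
        return max(pool.submit(start, small_s) for _ in range(n_big * fanout))
    if test == "test4":  # outer imap submits small tasks as each big result is yielded
        return max(pool.submit(r, small_s) for r in ready for _ in range(fanout))
    raise ValueError(test)


for workers, n_big in ((4, 5), (8, 10)):
    print(workers, n_big, [simulate(t, workers, n_big) for t in ("test1", "test2", "test3", "test4")])
# 4 5 [11.0, 8.0, 10.0, 8.0]
# 8 10 [16.0, 13.0, 10.0, 8.0]
```

The model only tracks when each worker becomes idle, so it says nothing about which physical worker runs which task; it reproduces the totals, not the exact row layout of the charts.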

Some observations:

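  • map immediately turns the iterable into a list (it calls list(iterable) if the iterable has no __len__) in the calling thread, and only then splits it into chunks and submits them. In test3, this means no small task is submitted until all big tasks have finished.

  • imap pulls items from the iterable lazily, one at a time, and (with the default chunksize=1) submits each item as soon as it is produced. The pulling happens in the pool's background task-handler thread, not in the caller.

  • Once imap is called, the tasks start executing in the background. Calling next on the returned iterator only blocks until the next result (in submission order) is ready; it does not postpone the execution of tasks until next is called.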
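
These observations can be checked directly. The sketch below is a hypothetical probe (trace is an illustrative name) that logs when the pool pulls each item from a slow generator and when each task runs. As an assumption, it uses multiprocessing.pool.ThreadPool as a stand-in for Pool: ThreadPool subclasses Pool and shares its map/imap logic, but its workers are threads, so the local functions need no pickling and the shared log can be a plain list.

```python
import threading
import time
from multiprocessing.pool import ThreadPool


def trace(method, n=3, delay=0.2, wait=1.0):
    """Hypothetical probe: return the ordered log of pulls/runs for map or imap."""
    events = []
    lock = threading.Lock()

    def log(msg):
        with lock:
            events.append(msg)

    def source():
        # Slow generator: records the moment the pool asks for each item
        for i in range(n):
            time.sleep(delay)
            log(f"pull {i}")
            yield i

    def work(x):
        log(f"run {x}")
        return x

    # ThreadPool stands in for Pool here (same map/imap code path, thread workers)
    with ThreadPool(2) as pool:
        if method == "map":
            pool.map(work, source())  # list(source()) happens before any submit
        else:
            it = pool.imap(work, source())
            time.sleep(wait)  # deliberately do not touch `it` yet
            log("consume")
            list(it)
    return events


print(trace("map"))   # expect: every "pull" before any "run"
print(trace("imap"))  # expect: "run 0" before "pull 1", all runs before "consume"
```

With map, all three pulls happen in the calling thread before any task is submitted. With imap, the first task already runs while the generator is still producing the second item, and every task finishes before the caller consumes a single result.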

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

[1] Solution 1