'python multiprocessing.Process rarely does not start

Rarely, the mpmt_a / mpmt_b function created with multiprocess.Process got start(), but none of the lines in the function are executed.

the code is tested on

  1. local ( Ubuntu 20.04.1 LTS, 5.13.0-1017-oem, Python 3.9.7 )
  2. colab
import itertools
import multiprocessing
import random
import threading

import tqdm

random_bytes = lambda n: bytes([random.randint(0, 255) for _ in range(n)])


def mpmt_a(RTQ, put_bytes, barrier):
    print("mpmt_a wait")
    barrier.wait()
    print("mpmt_a pass")
    RTQ.put(put_bytes)


def mpmt_b(RTQ_a, RTQ_b, barrier):
    print("mpmt_b wait")
    barrier.wait()
    print("mpmt_b pass")
    items = []
    while 1:
        items.append(RTQ_a.get())
        if items[-1] is None:
            break

    RTQ_b.put(len(items) - 1)


def randorder(X):
    L = list(X)
    random.shuffle(L)
    return L


if __name__ == "__main__":
    random.seed(1)
    test_iter = tqdm.trange(1000)
    for idx in test_iter:
        mp_a = random.randrange(3)
        mt_a = random.randrange(3)
        mp_b = random.randrange(3)
        mt_b = random.randrange(3)
        ctx = [
            multiprocessing.get_context("spawn"),
            multiprocessing.get_context("fork"),
            multiprocessing.get_context("forkserver"),
        ][idx % 3]
        RTQ_a = ctx.Queue()
        RTQ_b = ctx.Queue()

        ba_a = ctx.Barrier(mp_a + mt_a)
        mps_a = [
            ctx.Process(
                target=mpmt_a,
                args=(
                    RTQ_a,
                    random_bytes(16),
                    ba_a,
                ),
            )
            for i in range(mp_a)
        ]
        mts_a = [
            threading.Thread(
                target=mpmt_a,
                args=(
                    RTQ_a,
                    random_bytes(16),
                    ba_a,
                ),
            )
            for i in range(mt_a)
        ]

        ba_b = ctx.Barrier(mp_b + mt_b)
        mps_b = [
            ctx.Process(
                target=mpmt_b,
                args=(
                    RTQ_a,
                    RTQ_b,
                    ba_b,
                ),
            )
            for i in range(mp_b)
        ]
        mts_b = [
            threading.Thread(
                target=mpmt_b,
                args=(
                    RTQ_a,
                    RTQ_b,
                    ba_b,
                ),
            )
            for i in range(mt_b)
        ]

        for MPMT in randorder(itertools.chain(mps_a, mts_a, mps_b, mts_b)):
            MPMT.start()

        for MPMT in randorder(itertools.chain(mps_a, mts_a)):
            MPMT.join()
        # finish protocol; mpmt_b will stop if get None from queue
        for MPMT in randorder(itertools.chain(mps_b, mts_b)):
            RTQ_a.put(None)

        collect_info = [
            RTQ_b.get() for MPMT in randorder(itertools.chain(mps_b, mts_b))
        ]

        for MPMT in randorder(itertools.chain(mps_b, mts_b)):
            MPMT.join()

        test_iter.set_description(
            f"{idx} : {threading.activeCount()} / {len(ctx.active_children())} "
        )

    print("finish")

The suspicious part is that when removing print from mpmt_a and mpmt_b, the following error occurred, and the frequency of abort (or deadlock) drastically decreased. (Only effective on local machine, not colab)

Traceback (most recent call last):
  File "/usr/lib/python3.9/multiprocessing/queues.py", line 251, in _feed
    send_bytes(obj)
  File "/usr/lib/python3.9/multiprocessing/connection.py", line 205, in send_bytes
    self._send_bytes(m[offset:offset + size])
  File "/usr/lib/python3.9/multiprocessing/connection.py", line 416, in _send_bytes
    self._send(header + buf)
  File "/usr/lib/python3.9/multiprocessing/connection.py", line 373, in _send
    n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe

I expect the code finish with

999 : 4 / 0 : 100%|██████████| 1000/1000 [00:34<00:00, 33.04it/s]
finish


Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source