'python multiprocessing.Process rarely does not start
Rarely, the mpmt_a / mpmt_b function created with multiprocess.Process got start(), but none of the lines in the function are executed.
the code is tested on
- local ( Ubuntu 20.04.1 LTS, 5.13.0-1017-oem, Python 3.9.7 )
- colab
import itertools
import multiprocessing
import random
import threading
import tqdm
random_bytes = lambda n: bytes([random.randint(0, 255) for _ in range(n)])
def mpmt_a(RTQ, put_bytes, barrier):
print("mpmt_a wait")
barrier.wait()
print("mpmt_a pass")
RTQ.put(put_bytes)
def mpmt_b(RTQ_a, RTQ_b, barrier):
print("mpmt_b wait")
barrier.wait()
print("mpmt_b pass")
items = []
while 1:
items.append(RTQ_a.get())
if items[-1] is None:
break
RTQ_b.put(len(items) - 1)
def randorder(X):
L = list(X)
random.shuffle(L)
return L
if __name__ == "__main__":
random.seed(1)
test_iter = tqdm.trange(1000)
for idx in test_iter:
mp_a = random.randrange(3)
mt_a = random.randrange(3)
mp_b = random.randrange(3)
mt_b = random.randrange(3)
ctx = [
multiprocessing.get_context("spawn"),
multiprocessing.get_context("fork"),
multiprocessing.get_context("forkserver"),
][idx % 3]
RTQ_a = ctx.Queue()
RTQ_b = ctx.Queue()
ba_a = ctx.Barrier(mp_a + mt_a)
mps_a = [
ctx.Process(
target=mpmt_a,
args=(
RTQ_a,
random_bytes(16),
ba_a,
),
)
for i in range(mp_a)
]
mts_a = [
threading.Thread(
target=mpmt_a,
args=(
RTQ_a,
random_bytes(16),
ba_a,
),
)
for i in range(mt_a)
]
ba_b = ctx.Barrier(mp_b + mt_b)
mps_b = [
ctx.Process(
target=mpmt_b,
args=(
RTQ_a,
RTQ_b,
ba_b,
),
)
for i in range(mp_b)
]
mts_b = [
threading.Thread(
target=mpmt_b,
args=(
RTQ_a,
RTQ_b,
ba_b,
),
)
for i in range(mt_b)
]
for MPMT in randorder(itertools.chain(mps_a, mts_a, mps_b, mts_b)):
MPMT.start()
for MPMT in randorder(itertools.chain(mps_a, mts_a)):
MPMT.join()
# finish protocol; mpmt_b will stop if get None from queue
for MPMT in randorder(itertools.chain(mps_b, mts_b)):
RTQ_a.put(None)
collect_info = [
RTQ_b.get() for MPMT in randorder(itertools.chain(mps_b, mts_b))
]
for MPMT in randorder(itertools.chain(mps_b, mts_b)):
MPMT.join()
test_iter.set_description(
f"{idx} : {threading.activeCount()} / {len(ctx.active_children())} "
)
print("finish")
The suspicious part is that when removing print from mpmt_a and mpmt_b, the following error occurred, and the frequency of abort (or deadlock) drastically decreased. (Only effective on local machine, not colab)
Traceback (most recent call last):
File "/usr/lib/python3.9/multiprocessing/queues.py", line 251, in _feed
send_bytes(obj)
File "/usr/lib/python3.9/multiprocessing/connection.py", line 205, in send_bytes
self._send_bytes(m[offset:offset + size])
File "/usr/lib/python3.9/multiprocessing/connection.py", line 416, in _send_bytes
self._send(header + buf)
File "/usr/lib/python3.9/multiprocessing/connection.py", line 373, in _send
n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe
I expect the code finish with
999 : 4 / 0 : 100%|██████████| 1000/1000 [00:34<00:00, 33.04it/s]
finish
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|
