multiprocessing.Queue fails intermittently. Bug in Python?

Python's multiprocessing.Queue fails intermittently, and I don't know why. Is this a bug in Python or in my script?

Minimal failing script

import multiprocessing
import time
import logging
import multiprocessing.util
multiprocessing.util.log_to_stderr(level=logging.DEBUG)

queue = multiprocessing.Queue(maxsize=10)

def worker(queue):
    queue.put('abcdefghijklmnop')

    # "Indicate that no more data will be put on this queue by the
    # current process." --Documentation
    # time.sleep(0.01)
    queue.close()

proc = multiprocessing.Process(target=worker, args=(queue,))
proc.start()

# "Indicate that no more data will be put on this queue by the current
# process." --Documentation
# time.sleep(0.01)
queue.close()

proc.join()

I am testing this with CPython 3.6.6 on Debian. It also fails inside Docker with python:3.7.0-alpine.

docker run --rm -v "${PWD}/test.py:/test.py" \
    python:3-alpine python3 /test.py

The above script sometimes fails with a BrokenPipeError.

Traceback (most recent call last):
  File "/usr/lib/python3.6/multiprocessing/queues.py", line 240, in _feed
    send_bytes(obj)
  File "/usr/lib/python3.6/multiprocessing/connection.py", line 200, in send_bytes
    self._send_bytes(m[offset:offset + size])
  File "/usr/lib/python3.6/multiprocessing/connection.py", line 404, in _send_bytes
    self._send(header + buf)
  File "/usr/lib/python3.6/multiprocessing/connection.py", line 368, in _send
    n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe

Test harness

Because the failure is intermittent, I wrote a shell script that runs the test many times and reports the success rate.

#!/bin/sh
total=10

successes=0
for i in `seq ${total}`
do
    if ! docker run --rm -v "${PWD}/test.py:/test.py" python:3-alpine \
         python3 test.py 2>&1 \
         | grep --silent BrokenPipeError
    then
        successes=$(expr ${successes} + 1)
    fi
done
python3 -c "print(${successes} / ${total})"

This usually prints a fraction such as 0.2, indicating intermittent failures.

Timing adjustments

If I insert time.sleep(0.01) before either queue.close(), the script works consistently. I noticed in the source code that writing happens in its own feeder thread. I think the error occurs when the feeder thread is still trying to write data while the other threads have already closed the queue.
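For reference, the timing-adjusted worker looks like this (the 0.01 s value is arbitrary; it only needs to give the feeder thread time to finish its write):

def worker(queue):
    queue.put('abcdefghijklmnop')
    time.sleep(0.01)  # give the feeder thread time to write before closing
    queue.close()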

Debug logs

With the logging lines at the top of the script enabled, I can trace the execution for both failures and successes.

Failure:

[DEBUG/MainProcess] created semlock with handle 140480257941504
[DEBUG/MainProcess] created semlock with handle 140480257937408
[DEBUG/MainProcess] created semlock with handle 140480257933312
[DEBUG/MainProcess] Queue._after_fork()
[DEBUG/Process-1] Queue._after_fork()
[INFO/Process-1] child process calling self.run()
[DEBUG/Process-1] Queue._start_thread()
[DEBUG/Process-1] doing self._thread.start()
[DEBUG/Process-1] starting thread to feed data to pipe
[DEBUG/Process-1] ... done self._thread.start()
[DEBUG/Process-1] telling queue thread to quit
[INFO/Process-1] process shutting down
[DEBUG/Process-1] running all "atexit" finalizers with priority >= 0
[DEBUG/Process-1] running the remaining "atexit" finalizers
[DEBUG/Process-1] joining queue thread
Traceback (most recent call last):
  File "/usr/lib/python3.7/multiprocessing/queues.py", line 242, in _feed
    send_bytes(obj)
  File "/usr/lib/python3.7/multiprocessing/connection.py", line 200, in send_bytes
    self._send_bytes(m[offset:offset + size])
  File "/usr/lib/python3.7/multiprocessing/connection.py", line 404, in _send_bytes
    self._send(header + buf)
  File "/usr/lib/python3.7/multiprocessing/connection.py", line 368, in _send
    n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe
[DEBUG/Process-1] feeder thread got sentinel -- exiting
[DEBUG/Process-1] ... queue thread joined
[INFO/Process-1] process exiting with exitcode 0
[INFO/MainProcess] process shutting down
[DEBUG/MainProcess] running all "atexit" finalizers with priority >= 0
[DEBUG/MainProcess] running the remaining "atexit" finalizers

"Success" (really silent failure, only able to replicate with Python 3.6):

[DEBUG/MainProcess] created semlock with handle 139710276231168
[DEBUG/MainProcess] created semlock with handle 139710276227072
[DEBUG/MainProcess] created semlock with handle 139710276222976
[DEBUG/MainProcess] Queue._after_fork()
[DEBUG/Process-1] Queue._after_fork()
[INFO/Process-1] child process calling self.run()
[DEBUG/Process-1] Queue._start_thread()
[DEBUG/Process-1] doing self._thread.start()
[DEBUG/Process-1] starting thread to feed data to pipe
[DEBUG/Process-1] ... done self._thread.start()
[DEBUG/Process-1] telling queue thread to quit
[INFO/Process-1] process shutting down
[INFO/Process-1] error in queue thread: [Errno 32] Broken pipe
[DEBUG/Process-1] running all "atexit" finalizers with priority >= 0
[DEBUG/Process-1] running the remaining "atexit" finalizers
[DEBUG/Process-1] joining queue thread
[DEBUG/Process-1] ... queue thread joined
[INFO/Process-1] process exiting with exitcode 0
[INFO/MainProcess] process shutting down
[DEBUG/MainProcess] running all "atexit" finalizers with priority >= 0
[DEBUG/MainProcess] running the remaining "atexit" finalizers

True success (with either time.sleep(0.01) uncommented):

[DEBUG/MainProcess] created semlock with handle 140283921616896
[DEBUG/MainProcess] created semlock with handle 140283921612800
[DEBUG/MainProcess] created semlock with handle 140283921608704
[DEBUG/MainProcess] Queue._after_fork()
[DEBUG/Process-1] Queue._after_fork()
[INFO/Process-1] child process calling self.run()
[DEBUG/Process-1] Queue._start_thread()
[DEBUG/Process-1] doing self._thread.start()
[DEBUG/Process-1] starting thread to feed data to pipe
[DEBUG/Process-1] ... done self._thread.start()
[DEBUG/Process-1] telling queue thread to quit
[INFO/Process-1] process shutting down
[DEBUG/Process-1] feeder thread got sentinel -- exiting
[DEBUG/Process-1] running all "atexit" finalizers with priority >= 0
[DEBUG/Process-1] running the remaining "atexit" finalizers
[DEBUG/Process-1] joining queue thread
[DEBUG/Process-1] ... queue thread joined
[INFO/Process-1] process exiting with exitcode 0
[INFO/MainProcess] process shutting down
[DEBUG/MainProcess] running all "atexit" finalizers with priority >= 0
[DEBUG/MainProcess] running the remaining "atexit" finalizers

The difference seems to be that in the truly successful case, the feeder thread receives the sentinel object before the atexit handlers run.



Solution 1:[1]

The primary issue with your code is that nobody is consuming what your worker process has put on the queue. Python's multiprocessing queues expect the queued data to be consumed ("flushed to the pipe") before the process that put it there exits.
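To make that concrete, here is a minimal sketch (mine, not from the original question) of a consuming variant: the parent reads the item before joining, so the feeder thread's write is actually drained. Like the original script, it assumes the fork start method on Linux.

import multiprocessing

queue = multiprocessing.Queue(maxsize=10)

def worker(queue):
    queue.put('abcdefghijklmnop')
    queue.close()  # no more data will be put on the queue by this process

proc = multiprocessing.Process(target=worker, args=(queue,))
proc.start()

print(queue.get())  # consume the item, draining the pipe, before joining
proc.join()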

In this light, your example doesn't make much sense, but if you want to get it to work:

The key is queue.cancel_join_thread() -- https://docs.python.org/3/library/multiprocessing.html

Warning As mentioned above, if a child process has put items on a queue (and it has not used JoinableQueue.cancel_join_thread), then that process will not terminate until all buffered items have been flushed to the pipe. This means that if you try joining that process you may get a deadlock unless you are sure that all items which have been put on the queue have been consumed. Similarly, if the child process is non-daemonic then the parent process may hang on exit when it tries to join all its non-daemonic children.

Note that a queue created using a manager does not have this issue

That is the relevant bit. The issue is that data is being put on the queue by the child process but never consumed by anyone. In this case, cancel_join_thread() must be called in the CHILD process before asking it to join. This code sample gets rid of the error.

import multiprocessing
import time
import logging
import multiprocessing.util
multiprocessing.util.log_to_stderr(level=logging.DEBUG)

queue = multiprocessing.Queue(maxsize=10)

def worker(queue):
    queue.put('abcdefghijklmnop')

    # "Indicate that no more data will be put on this queue by the
    # current process." --Documentation
    # time.sleep(0.01)
    queue.close()
    
    # Ideally, this would not be here but would rather be a response to a
    # signal (or other IPC message) sent from the main process.
    queue.cancel_join_thread()


proc = multiprocessing.Process(target=worker, args=(queue,))
proc.start()

# "Indicate that no more data will be put on this queue by the current
# process." --Documentation
# time.sleep(0.01)
queue.close()

proc.join()

I didn't bother with actual IPC here because there is no consumer at all, but I hope the idea is clear.
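For what it's worth, here is a sketch (my own, not part of the original answer) of what that signal could look like, using a multiprocessing.Event that the main process sets once it no longer cares about the queued data. It only illustrates the signalling shape; without a consumer the queued data is simply discarded.

import multiprocessing

queue = multiprocessing.Queue(maxsize=10)
done = multiprocessing.Event()  # illustrative "safe to drop queued data" signal

def worker(queue, done):
    queue.put('abcdefghijklmnop')
    queue.close()
    done.wait()                  # wait until the main process says it is finished
    queue.cancel_join_thread()   # only then give up on flushing the buffer

proc = multiprocessing.Process(target=worker, args=(queue, done))
proc.start()

queue.close()
done.set()   # tell the worker it may discard any unflushed data
proc.join()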

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1: fjlksahfob