Python multiprocessing.Queue.get raises EOFError on first call

I'm using Python 3.7 on Ubuntu 20.04. My problem is similar to the producer-consumer problem: there is a pair of reader and writer processes. My reader process calls Queue.get in an infinite loop. (According to the documentation, Queue.get blocks until another process puts data into the queue.)

Making this call raises EOFError.

reader.py

import multiprocessing as mp

def reader(queue):
    while True:
        try:
            data = queue.get()
        except Exception as e:
            print(f'Exception occurred: {e}')
        # Do something

queue = mp.Manager().Queue()
p = mp.Process(target=reader, args=(queue,))
p.start()
# Prepare some data to send
queue.put(some_data)
# Do my own tasks

Running this results in:

Traceback (most recent call last):
  File "/usr/src/app/src/processor.py", line 775, in classification_manager
    slot_data = classification_queue.get()
  File "<string>", line 2, in get
  File "/usr/lib/python3.7/multiprocessing/managers.py", line 819, in _callmethod
    kind, result = conn.recv()
  File "/usr/lib/python3.7/multiprocessing/connection.py", line 250, in recv
    buf = self._recv_bytes()
  File "/usr/lib/python3.7/multiprocessing/connection.py", line 407, in _recv_bytes
    buf = self._recv(4)
  File "/usr/lib/python3.7/multiprocessing/connection.py", line 383, in _recv
    raise EOFError
EOFError
2022-02-18 20:45:01,528 classification_1 INFO     Waiting for Data!
2022-02-18 20:45:01,528 classification_1 ERROR    BrokenPipeError
Traceback (most recent call last):
  File "/usr/src/app/src/processor.py", line 775, in classification_manager
    slot_data = classification_queue.get()
  File "<string>", line 2, in get
  File "/usr/lib/python3.7/multiprocessing/managers.py", line 818, in _callmethod
    conn.send((self._id, methodname, args, kwds))
  File "/usr/lib/python3.7/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/usr/lib/python3.7/multiprocessing/connection.py", line 404, in _send_bytes
    self._send(header + buf)
  File "/usr/lib/python3.7/multiprocessing/connection.py", line 368, in _send
    n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe


Solution 1:[1]

I got my answer. In my case, the Linux OOM killer was killing the parent process. That's why the child process was getting this EOFError and then BrokenPipeError.
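The failure mode can be reproduced on purpose. The following is a minimal sketch, not the original application: it assumes a POSIX system where the "fork" start method is available, and it uses manager.shutdown() as a stand-in for the manager's server process disappearing (in the case above, that followed from the OOM kill). The names reader, demo and report are hypothetical. On Linux the blocked get() would typically end with EOFError, but the sketch catches OSError as well because the exact exception can depend on timing.

```python
import multiprocessing as mp
import time

# Assumption: POSIX only. Using "fork" explicitly keeps the sketch
# independent of the platform's default start method.
ctx = mp.get_context("fork")


def reader(queue, report):
    """Block on queue.get() and report how the call ended."""
    report.send("waiting")
    try:
        data = queue.get()  # blocks inside the manager proxy
    except (EOFError, OSError) as exc:
        # The proxy's connection to the manager is dead. Retrying in a
        # loop would only raise again, so report the error and stop.
        report.send(type(exc).__name__)
    else:
        report.send(f"got {data!r}")


def demo():
    manager = ctx.Manager()
    queue = manager.Queue()
    parent_end, child_end = ctx.Pipe()
    p = ctx.Process(target=reader, args=(queue, child_end))
    p.start()
    assert parent_end.recv() == "waiting"
    time.sleep(0.5)      # give the child time to block in get()
    manager.shutdown()   # stand-in for the server process going away
    outcome = parent_end.recv() if parent_end.poll(10) else "timeout"
    p.join(10)
    return outcome


if __name__ == "__main__":
    print(demo())
```

Note that the reader here leaves its loop on the first connection error. A reader that catches every exception and keeps looping, as in the question, will log the first error and then fail again on every later call because the connection is already gone.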

For background, read up on the Linux OOM (out-of-memory) killer.
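The OOM killer terminates its victim with SIGKILL. When the killed process is a child that you supervise with multiprocessing, Process.exitcode shows this as the negative signal number. The sketch below assumes a POSIX system and simulates the kill with os.kill; worker and killed_by_sigkill are hypothetical names. It does not help when the parent itself is killed, as happened above; in that case the kernel log is the place to look.

```python
import multiprocessing as mp
import os
import signal
import time

# Assumption: POSIX only, "fork" start method available.
ctx = mp.get_context("fork")


def worker():
    time.sleep(60)  # placeholder for real work


def killed_by_sigkill():
    p = ctx.Process(target=worker)
    p.start()
    os.kill(p.pid, signal.SIGKILL)  # stand-in for the OOM killer
    p.join(10)
    # exitcode is -N when the process was terminated by signal N
    return p.exitcode == -signal.SIGKILL


if __name__ == "__main__":
    print(killed_by_sigkill())
```

An exit code of -9 only says that the process received SIGKILL; it does not prove that the OOM killer sent it.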

Solution 2:[2]

The threading equivalent to your code works fine:

import threading as th
from queue import Queue
from time import sleep

def reader(queue):
    while True:
        data = queue.get()
        print("Reader saw", data)
        # Do something

queue = Queue()
p = th.Thread(target=reader, args=(queue,))
p.start()

queue.put("Expect More Data")

call_count = 0
while True:
    sleep(2)
    call_count += 1
    queue.put(call_count)

Solution 3:[3]

If I just elaborate on your code (which isn't runnable as it stands), there's no problem at all.

import multiprocessing as mp

def reader(queue):
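    # iter(callable, sentinel) calls queue.get() until it returns 'stop'.
    # Unlike the walrus operator (3.8+), this also runs on Python 3.7.
    for data in iter(queue.get, 'stop'):
        print(data)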

def main():
    queue = mp.Manager().Queue()
    p = mp.Process(target=reader, args=(queue,))
    p.start()
    for some_data in ['Hello', 'world', 'stop']:
        queue.put(some_data)
    p.join()


if __name__ == '__main__':
    main()

Output:

Hello
world

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution      Source
Solution 1    Bhavesh Achhada
Solution 2    RufusVS
Solution 3