Running a Python multiprocessing Queue with two consumers

I just began to learn about Python multiprocessing. For my first exercise I am trying to create a simple Queue with two consumers. Each consumer gets an element from the queue, processes it, and prints the result to stdout.

Here's what I tried (it borrows heavily from an example in the Python standard library documentation):

import random
import time
from multiprocessing import Queue, Process


stop_sentinel = "STOP"


def consumer(in_q: Queue, name: str) -> None:
    for func, args in iter(in_q, stop_sentinel):
        print(f"Process {name}, result: {func(*args)}")
        time.sleep(0.5 * random.random())


def fn(x: int) -> str:
    if x % 3:
        return "Fizz"
    if x % 5:
        return "Buzz"
    if x % 15:
        return "FizzBuzz"
    return str(x)


def main():
    proc_q = Queue()

    for i in range(20):
        inputs = (fn, (i + 1,))
        proc_q.put(inputs)

    proc_q.put(stop_sentinel)
    proc_q.put(stop_sentinel)

    p1 = Process(target=consumer, args=())
    p2 = Process(target=consumer, args=())

    p1._args = (proc_q, p1.name)
    p2._args = (proc_q, p2.name)

    p1.start()
    p2.start()


if __name__ == '__main__':
    main()

However, when I run this, it fails immediately without processing a single element. This is the stack trace:

Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/multiprocessing/spawn.py", line 116, in spawn_main
    exitcode = _main(fd, parent_sentinel)
  File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/multiprocessing/spawn.py", line 126, in _main
    self = reduction.pickle.load(from_parent)
  File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/multiprocessing/synchronize.py", line 110, in __setstate__
    self._semlock = _multiprocessing.SemLock._rebuild(*state)
FileNotFoundError: [Errno 2] No such file or directory
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/multiprocessing/spawn.py", line 116, in spawn_main
    exitcode = _main(fd, parent_sentinel)
  File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/multiprocessing/spawn.py", line 126, in _main
    self = reduction.pickle.load(from_parent)
  File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/multiprocessing/synchronize.py", line 110, in __setstate__
    self._semlock = _multiprocessing.SemLock._rebuild(*state)
FileNotFoundError: [Errno 2] No such file or directory

What am I doing wrong?



Solution 1:[1]

I am having some difficulty accounting for your specific error message (I get a different one with your code), but your use of iter in the consumer function is clearly not correct. When iter is called with two arguments, as you are doing, the first argument must be a callable that takes no arguments; iter calls it repeatedly and stops as soon as it returns the sentinel value. A Queue object is not callable, so in this case the first argument should be the bound method in_q.get.
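To see the two-argument form of iter in isolation, here is a small single-process sketch. It uses queue.Queue from the standard library as a stand-in for multiprocessing.Queue, since both expose a get() method that can be called with no arguments; the helper name drain is hypothetical.

```python
import queue

STOP = "STOP"


def drain(q: queue.Queue) -> list:
    # iter(callable, sentinel) calls q.get() with no arguments on each step
    # and stops, without yielding it, once the returned value == STOP.
    return list(iter(q.get, STOP))


q = queue.Queue()
for item in [1, 2, 3, STOP, 4]:
    q.put(item)

print(drain(q))  # [1, 2, 3]
print(q.get())   # 4: items after the sentinel stay on the queue

try:
    iter(q, STOP)  # the mistake from the question: a queue object is not callable
except TypeError as exc:
    print("TypeError:", exc)
```

The sentinel itself is consumed but never handed to the loop body, which is why each consumer needs its own copy of it on the queue. The complete corrected program follows.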

import random
import time
from multiprocessing import Queue, Process


stop_sentinel = "STOP"


def consumer(in_q: Queue, name: str) -> None:
    for func, args in iter(in_q.get, stop_sentinel):
        print(f"Process {name}, result: {func(*args)}")
        time.sleep(0.5 * random.random())


def fn(x: int) -> str:
    if x % 3:
        return "Fizz"
    if x % 5:
        return "Buzz"
    if x % 15:
        return "FizzBuzz"
    return str(x)


def main():
    proc_q = Queue()

    for i in range(20):
        inputs = (fn, (i + 1,))
        proc_q.put(inputs)

    proc_q.put(stop_sentinel)
    proc_q.put(stop_sentinel)

    p1 = Process(target=consumer, args=())
    p2 = Process(target=consumer, args=())

    p1._args = (proc_q, p1.name)
    p2._args = (proc_q, p2.name)

    p1.start()
    p2.start()

    # Wait for both consumers to finish before main() returns:
    p1.join()
    p2.join()


if __name__ == '__main__':
    main()

Prints:

Process Process-1, result: Fizz
Process Process-2, result: Fizz
Process Process-1, result: Buzz
Process Process-2, result: Fizz
Process Process-1, result: Fizz
Process Process-1, result: Buzz
Process Process-2, result: Fizz
Process Process-1, result: Fizz
Process Process-2, result: Buzz
Process Process-2, result: Fizz
Process Process-2, result: Fizz
Process Process-1, result: Buzz
Process Process-2, result: Fizz
Process Process-1, result: Fizz
Process Process-1, result: 15
Process Process-2, result: Fizz
Process Process-1, result: Fizz
Process Process-2, result: Buzz
Process Process-2, result: Fizz
Process Process-1, result: Fizz
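
Separately from the multiprocessing issue, these results are not what FizzBuzz normally produces, because the tests in fn are inverted: x % 3 is truthy when x is not a multiple of 3, so every number not divisible by 3 returns "Fizz", and only 15 falls through to str(x). A corrected sketch compares each remainder with 0 and checks 15 first, since a multiple of 15 is also a multiple of 3 and of 5:

```python
def fn(x: int) -> str:
    # Check the combined case first: a multiple of 15 is also a multiple of 3 and 5.
    if x % 15 == 0:
        return "FizzBuzz"
    if x % 3 == 0:
        return "Fizz"
    if x % 5 == 0:
        return "Buzz"
    return str(x)


print([fn(i) for i in range(1, 16)])
```

This version is a drop-in replacement for fn in any of the programs here; the multiprocessing code does not change, although the printed results would then differ from the outputs shown.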

Some Notes

You have no control over how the operating system schedules the two processes, and therefore over which process will "get" which of the items that have been put on the queue. If you rerun the program, you will probably see the items split differently between the two processes.

Attributes whose names begin with an underscore, such as the _args attribute of a Process object, are by convention to be thought of as "private": when you use them, you run the risk that a future version of Python will no longer use the attribute, or will use it differently. Consequently, I would personally pass the queue through the args parameter and use the following method for identifying processes:

import random
import time
from multiprocessing import Queue, Process
import os


stop_sentinel = "STOP"


def consumer(in_q: Queue) -> None:
    pid = os.getpid()
    for func, args in iter(in_q.get, stop_sentinel):
        print(f"Process {pid}, result: {func(*args)}")
        time.sleep(0.5 * random.random())


def fn(x: int) -> str:
    if x % 3:
        return "Fizz"
    if x % 5:
        return "Buzz"
    if x % 15:
        return "FizzBuzz"
    return str(x)


def main():
    proc_q = Queue()

    for i in range(20):
        inputs = (fn, (i + 1,))
        proc_q.put(inputs)

    proc_q.put(stop_sentinel)
    proc_q.put(stop_sentinel)

    p1 = Process(target=consumer, args=(proc_q,))
    p2 = Process(target=consumer, args=(proc_q,))

    p1.start()
    p2.start()

    # Explicitly wait for tasks to complete:
    p1.join()
    p2.join()


if __name__ == '__main__':
    main()

Prints:

Process 16672, result: Fizz
Process 15304, result: Fizz
Process 15304, result: Buzz
Process 16672, result: Fizz
Process 15304, result: Fizz
Process 16672, result: Buzz
Process 15304, result: Fizz
Process 16672, result: Fizz
Process 16672, result: Buzz
Process 15304, result: Fizz
Process 16672, result: Fizz
Process 15304, result: Buzz
Process 16672, result: Fizz
Process 16672, result: Fizz
Process 15304, result: 15
Process 15304, result: Fizz
Process 16672, result: Fizz
Process 16672, result: Buzz
Process 15304, result: Fizz
Process 16672, result: Fizz
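
A likely explanation for the FileNotFoundError in the question (rather than the TypeError the bad iter call would eventually raise) is that the original main() starts the processes but never joins them. On macOS, Python 3.8 and later default to the spawn start method, under which the locks and semaphores inside multiprocessing.Queue are named semaphores; when the parent shuts down, multiprocessing's exit handling can unlink them before the freshly spawned children have finished unpickling the queue, so SemLock._rebuild cannot find them. Joining the workers, as in the version above, keeps the parent and its queue alive until the children are done. The sketch below generalizes that version to any number of consumers: one sentinel per worker, worker names from the public multiprocessing.current_process() instead of the private _args, and results returned on a second queue that is drained before join(), following the "joining processes that use queues" guideline in the multiprocessing documentation. The names square, worker, run, n_workers and n_items are hypothetical.

```python
from multiprocessing import Process, Queue, current_process

STOP = "STOP"


def square(x: int) -> int:
    # Hypothetical task; any picklable module-level function works here.
    return x * x


def worker(in_q, out_q) -> None:
    # current_process().name is public API ("Process-1", "Process-2", ...
    # by default in the children), unlike the private Process._args.
    name = current_process().name
    for func, args in iter(in_q.get, STOP):
        out_q.put((name, args[0], func(*args)))


def run(n_workers: int = 2, n_items: int = 20) -> list:
    in_q, out_q = Queue(), Queue()
    workers = [Process(target=worker, args=(in_q, out_q)) for _ in range(n_workers)]
    for w in workers:
        w.start()

    for i in range(1, n_items + 1):
        in_q.put((square, (i,)))
    for _ in workers:  # one sentinel per worker, so every worker stops
        in_q.put(STOP)

    # Collect every result before joining: a child that still has buffered
    # items on out_q may not exit, so joining first can deadlock.
    results = [out_q.get() for _ in range(n_items)]
    for w in workers:
        w.join()  # keeps the parent (and its queues) alive until workers finish
    return results


if __name__ == "__main__":
    # Sort by input value: the order in which results arrive is not deterministic.
    for name, x, value in sorted(run(), key=lambda r: r[1]):
        print(f"{name}: {x}^2 = {value}")
```

Because each worker stops at the first sentinel it receives, putting exactly one sentinel per worker guarantees that all of them exit, however the items happen to be distributed between them.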

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Booboo