'Cannot access property of subclass of multiprocessing.queues.Queue in multiprocessing.Process
I was trying to use multiprocessing.Queue to save the return value of multiprocessing.Process:
queue = Queue()
for i, (name, func) in enumerate(funcs.items()):
p = Process(target=analyse, args=(i, name, func, grid, queue))
The problem is that Queue.qsize doesn't work on macOS, so I use the implementation in this answer.
class Queue(multiprocessing.queues.Queue):
""" A portable implementation of multiprocessing.Queue.
Because of multithreading / multiprocessing semantics, Queue.qsize() may
raise the NotImplementedError exception on Unix platforms like Mac OS X
where sem_getvalue() is not implemented. This subclass addresses this
problem by using a synchronized shared counter (initialized to zero) and
increasing / decreasing its value every time the put() and get() methods
are called, respectively. This not only prevents NotImplementedError from
being raised, but also allows us to implement a reliable version of both
qsize() and empty().
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs, ctx=multiprocessing.get_context())
self.size = SharedCounter(0)
def put(self, *args, **kwargs):
self.size.increment(1)
super().put(*args, **kwargs)
Note that ctx=multiprocessing.get_context() is added to fix the missing ctx, according to this answer.
The code in question:
def analyse(i, name, func, grid, queue):
...
queue.put((i, name, single, minimum, current, peak))
And Python's complaint:
Traceback (most recent call last):
File "/usr/local/Cellar/[email protected]/3.9.6/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
self.run()
File "/usr/local/Cellar/[email protected]/3.9.6/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "...", line 26, in analyse
queue.put((i, name, single, minimum, current, peak))
File "...", line 48, in put
self.size.increment(1)
AttributeError: 'Queue' object has no attribute 'size'
Any ideas where the error is? Tried to debug in PyCharm, queue still has size when it's passed to multiprocessing.Process but it's no longer there when queue.put() is called in analyse().
Edit: Feel free to answer this question. However, I gave up on Queue and instead used multiprocessing.Manager, sacrificing some precious milliseconds there.
Solution 1:[1]
Ok, here is the complete working solution. Apparently the queue needs its state set and restored as mentioned here.
import multiprocessing
import multiprocessing.queues as mpq
class Queue(mpq.Queue):
""" A portable implementation of multiprocessing.Queue.
Because of multithreading / multiprocessing semantics, Queue.qsize() may
raise the NotImplementedError exception on Unix platforms like Mac OS X
where sem_getvalue() is not implemented. This subclass addresses this
problem by using a synchronized shared counter (initialized to zero) and
increasing / decreasing its value every time the put() and get() methods
are called, respectively. This not only prevents NotImplementedError from
being raised, but also allows us to implement a reliable version of both
qsize() and empty().
"""
def __init__(self, maxsize=-1, block=True, timeout=None):
self.block = block
self.timeout = timeout
super().__init__(maxsize, ctx=multiprocessing.get_context())
self.size = SharedCounter(0)
def __getstate__(self):
return super().__getstate__() + (self.size,)
def __setstate__(self, state):
super().__setstate__(state[:-1])
self.size = state[-1]
def put(self, *args, **kwargs):
super(Queue, self).put(*args, **kwargs)
self.size.increment(1)
def get(self, *args, **kwargs):
item = super(Queue, self).get(*args, **kwargs)
self.size.increment(-1)
return item
def qsize(self):
""" Reliable implementation of multiprocessing.Queue.qsize() """
return self.size.value
def empty(self):
""" Reliable implementation of multiprocessing.Queue.empty() """
return not self.qsize()
def clear(self):
""" Remove all elements from the Queue. """
while not self.empty():
self.get()
class SharedCounter(object):
""" A synchronized shared counter.
The locking done by multiprocessing.Value ensures that only a single
process or thread may read or write the in-memory ctypes object. However,
in order to do n += 1, Python performs a read followed by a write, so a
second process may read the old value before the new one is written by the
first process. The solution is to use a multiprocessing.Lock to guarantee
the atomicity of the modifications to Value.
This class comes almost entirely from Eli Bendersky's blog:
http://eli.thegreenplace.net/2012/01/04/shared-counter-with-pythons-multiprocessing/
"""
def __init__(self, n = 0):
self.count = multiprocessing.Value('i', n)
def increment(self, n = 1):
""" Increment the counter by n (default = 1) """
with self.count.get_lock():
self.count.value += n
@property
def value(self):
""" Return the value of the counter """
return self.count.value
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 |
