Multi-step multiprocessing in Python
I'm a bit stuck with my multiprocessing pipeline. Maybe not stuck, but struggling. The case is:
Process incoming data elements read in a while loop. I created a `multiprocessing.Pool` and a `multiprocessing.Queue`. As a processing worker I do:

```python
the_pool = Pool(PROCESSES, process_item_queue, (i_queue, o_queue,))
```

The idea is that `process_item_queue` generates some still-complex data that I also want to process in a queue (hence `o_queue`, the output queue). The body of `process_item_queue` looks something like:
```python
while True:
    args = i_queue.get(block=True)
    if args is None:
        print("Breaking out of the queue")
        break
    output = process_item(*args)
    o_queue.put([output])
```

Ideally, at some point, some of the processes could already start processing `o_queue`, but I have no idea how to achieve that kind of dynamic process assignment.
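For context, the setup described so far can be sketched end to end with plain `multiprocessing.Process` workers and one `None` sentinel per worker. This is only a minimal sketch of the pattern in the question, not the asker's actual code: `process_item` here is a hypothetical stand-in that just doubles a number.

```python
import multiprocessing as mp

SENTINEL = None  # each worker exits when it pulls this from the input queue

def process_item(x):
    # hypothetical stand-in for the real per-item work
    return x * 2

def worker(i_queue, o_queue):
    # pull items until the sentinel arrives, push results to the output queue
    while True:
        args = i_queue.get(block=True)
        if args is SENTINEL:
            break
        o_queue.put(process_item(args))

def run_pipeline(items, processes=4):
    i_queue = mp.Queue()
    o_queue = mp.Queue()
    procs = [mp.Process(target=worker, args=(i_queue, o_queue))
             for _ in range(processes)]
    for p in procs:
        p.start()
    for item in items:
        i_queue.put(item)
    # one sentinel per worker, so every process sees exactly one and exits
    for _ in procs:
        i_queue.put(SENTINEL)
    for p in procs:
        p.join()
    # drain the results; sorted() only to make the output deterministic
    return sorted(o_queue.get() for _ in items)

if __name__ == "__main__":
    print(run_pipeline([1, 2, 3, 4]))  # -> [2, 4, 6, 8]
```

Sending one sentinel per worker is the standard way to shut down a fixed set of consumers; the harder part, as the question notes, is that the merge phase below doesn't have a fixed item count per worker.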
The point I'm struggling with now can actually be divided into multiple parts:
4.1. The `process_item` function returns a dict. All of the dicts have to be merged with some custom logic. My idea is that each process should read two dicts from the queue, merge them using my custom logic, and republish the merged dict back to the queue. At some point, obviously, we'll end up with just two dicts, possibly in different processes. I'd be okay with doing the last merge in the main thread, but I don't know how to inform my processes that there's nothing more coming. Normally, I'd send `None` and each process would quit, but this is more complicated, as the item count halves each round: 1024 -> 512 -> 256 -> 128, etc. I also can't give it a definitive timeout, as depending on the CPU, each process can take more or less time.

4.2. I can't really close the pool because `o_queue` is full of items. I could probably even hit some memory limits, so it might be important to actually start processing those items on the fly.
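For point 4.1, one possible approach (a sketch under assumptions, not the asker's code) is to note that reducing N dicts takes exactly N-1 merges, so a shared counter can replace sentinels: each merge consumes two dicts and produces one, and a worker only blocks on the queue after atomically reserving a pair, which guarantees two dicts will eventually arrive. `merge_dicts` below is a hypothetical stand-in for the custom merge logic (it sums values on key collisions).

```python
import multiprocessing as mp

def merge_dicts(a, b):
    # hypothetical custom merge logic: sum values on key collisions
    out = dict(a)
    for k, v in b.items():
        out[k] = out.get(k, 0) + v
    return out

def merge_worker(queue, remaining, lock):
    while True:
        with lock:
            if remaining.value < 2:
                break              # only one dict left: that's the final result
            remaining.value -= 1   # reserve one merge: two dicts in, one out
        # blocking is safe here: the reservation guarantees that two dicts
        # will arrive, either from the queue or from other workers' merges
        a = queue.get()
        b = queue.get()
        queue.put(merge_dicts(a, b))

def reduce_dicts(dicts, processes=4):
    queue = mp.Queue()
    remaining = mp.Value('i', len(dicts))  # dicts still in play
    lock = mp.Lock()
    for d in dicts:
        queue.put(d)
    procs = [mp.Process(target=merge_worker, args=(queue, remaining, lock))
             for _ in range(processes)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()       # workers exit on their own once remaining hits 1
    return queue.get() # the single surviving dict

if __name__ == "__main__":
    print(reduce_dicts([{'a': 1}, {'a': 2, 'b': 1}, {'b': 3}]))
```

Because workers exit as soon as fewer than two dicts remain, no sentinels or timeouts are needed; this only requires the merge to be order-insensitive (associative and commutative), which the halving scheme in the question already assumes.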
Any advice will be appreciated ;)
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
