Work on pairs/tuples of items fetched from PriorityQueue
I have a process that supplies items to a JoinableQueue, the data_queue. Each item consists of an index, starting with 0, and a data string, consisting of three letters (for demo purposes). The data is added to the queue in a random order; I cannot rely on the indices being added in strictly ascending order.
I want to create another process that works off this data queue, sorting the received items into a new PriorityQueue such that I can fetch items from it in the correct index order. The catch is that I don't need to just get one item: I need them in tuples of two.
Hence, for the following content of the priority queue:
| idx | data |
|---|---|
| 0 | "P8K" |
| 1 | "JOY" |
| 2 | "MR5" |
| 3 | "EZV" |
| 4 | "9VX" |
| 5 | "6WL" |
| 6 | "4W3" |
| 7 | "NNK" |
| 8 | "0II" |
| 9 | "KAN" |
I want to process the items in the index tuples of (0, 1), (1, 2), and so on. The actual operation itself is very simple: concatenate the data strings. For the above data, the resulting output should be P8K-JOY, JOY-MR5, and so on.
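The pairing operation itself can be sketched as a plain function, independent of any queue machinery (the name `pairwise_concat` is mine, for illustration only):

```python
def pairwise_concat(items):
    """Concatenate each data string with its successor, e.g.
    ["P8K", "JOY", "MR5"] becomes ["P8K-JOY", "JOY-MR5"]."""
    return [a + "-" + b for a, b in zip(items, items[1:])]

print(pairwise_concat(["P8K", "JOY", "MR5"]))  # ['P8K-JOY', 'JOY-MR5']
```

The hard part of the question is not this operation but doing it on items that arrive out of order.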
I have the following code to do this, which works in principle:
```python
#!/usr/bin/env python3
import random
import string
from multiprocessing import JoinableQueue, Process
from queue import PriorityQueue

data_queue = JoinableQueue(10)
output_queue = JoinableQueue(10)

random.seed(234)


def process_data_queue(data_queue: JoinableQueue, output_queue: JoinableQueue):
    prio_queue = PriorityQueue()
    current_index_required = 0
    this_item = None
    next_item = None
    processing_started = False
    while True:
        try:
            idx, data = data_queue.get_nowait()
            prio_queue.put((idx, data))
            data_queue.task_done()
        except Exception:
            pass
        print(
            f"current_index_required: {current_index_required}, "
            f"this_item: {this_item}, queue: {[q[0] for q in prio_queue.queue]}"
        )
        if (
            not processing_started
            and len(prio_queue.queue)
            and prio_queue.queue[0][0] == current_index_required
        ):
            print(f"Got first item from queue {current_index_required}")
            this_item = prio_queue.get()
            processing_started = True
        # peek into the queue and check if the next item has the correct index
        if processing_started and this_item is not None and next_item is None:
            if not len(prio_queue.queue):
                print("No more items in queue!")
                break
            if prio_queue.queue[0][0] == current_index_required + 1:
                next_item = prio_queue.get()
                print(f"Processing {this_item} {next_item}")
                output_queue.put(
                    (current_index_required, this_item[1] + "-" + next_item[1])
                )
                this_item = next_item
                next_item = None
                current_index_required += 1


def process_output_queue(output_queue: JoinableQueue):
    while True:
        idx, data = output_queue.get()
        print(f"Got from output queue: {idx} {data}")
        output_queue.task_done()


indices = list(range(10))
random.shuffle(indices)
for frame_idx_to_add in indices:
    random_string = "".join(random.choices(string.ascii_uppercase + string.digits, k=3))
    print(f"Adding to data queue: {frame_idx_to_add} {random_string}")
    data_queue.put((frame_idx_to_add, random_string))

p1 = Process(target=process_data_queue, args=(data_queue, output_queue))
p1.start()
p2 = Process(target=process_output_queue, args=(output_queue,))
p2.start()

data_queue.join()
output_queue.join()
print("Done")
p1.terminate()
p2.terminate()
```
The processed tuples P8K-JOY, JOY-MR5, etc. are placed in the output_queue.
My problem is that the process terminates too early. I can break out of the process_data_queue function because I can detect when the priority queue itself is empty, but the output queue has no way of knowing when it should finish. As a result, each run of this script stops at a different point.
How do I make the processes know when to stop?
Also, the approach seems quite convoluted. Is there any way to make this code more concise?
Solution 1:[1]
First of all, I think you're using the wrong data structure. Rather than a PriorityQueue, you just want a simple dictionary:
```python
current_index = 0
current_items = {}
while True:
    while True:
        # ... same as above, but ...
        current_items[idx] = data
    while True:
        if current_index in current_items and current_index + 1 in current_items:
            # ... output whatever you need ...
            del current_items[current_index]
            current_index += 1
```
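To make the idea concrete, here is a runnable single-threaded sketch of the dictionary-based reordering (the function name `emit_pairs` and the in-memory list of items are my additions for illustration; the real code would pull items from the data queue instead):

```python
import random

def emit_pairs(items):
    """Buffer out-of-order (idx, data) items in a dict and emit
    "data[i]-data[i+1]" pairs as soon as both indices have arrived."""
    current_index = 0
    current_items = {}
    output = []
    for idx, data in items:
        current_items[idx] = data
        # emit as many consecutive pairs as are now available
        while current_index in current_items and current_index + 1 in current_items:
            output.append(
                current_items[current_index] + "-" + current_items[current_index + 1]
            )
            del current_items[current_index]
            current_index += 1
    return output

items = [(0, "P8K"), (1, "JOY"), (2, "MR5"), (3, "EZV")]
random.shuffle(items)
print(emit_pairs(items))  # ['P8K-JOY', 'JOY-MR5', 'MR5-EZV']
```

Note that the pairs come out in index order no matter how the input is shuffled, because nothing is emitted until both members of the current pair have been buffered.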
Your next question is a harder one to answer. Whenever you have an external data generator, it must provide some way of saying "I'm done". There really is no other way. It can set a flag; it can push a special value onto the queue; anything. Your consumer thread has no way of knowing if the Producer is done or just thinking really hard.
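The most common form of that special value is a sentinel: the producer pushes a unique marker (here `None`, my choice; any object the producer and consumer agree on works) after the last real item, and the consumer stops when it sees it. A minimal sketch using a thread and the standard-library `queue.Queue`:

```python
import queue
import threading

SENTINEL = None  # the agreed-upon "I'm done" marker

def consumer(q, results):
    while True:
        item = q.get()
        if item is SENTINEL:
            break  # producer signalled that nothing more is coming
        results.append(item)

q = queue.Queue()
results = []
t = threading.Thread(target=consumer, args=(q, results))
t.start()

for item in ["P8K", "JOY", "MR5"]:
    q.put(item)
q.put(SENTINEL)  # tell the consumer we are done

t.join()
print(results)  # ['P8K', 'JOY', 'MR5']
```

The same pattern carries over directly to multiprocessing queues; with multiple consumers you push one sentinel per consumer.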
And you're not really gaining anything by using a joinable queue. Its use is for letting the producer know when the consumer is done. But I don't think that's an issue here.
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | Frank Yellin |
