Work on pairs/tuples of items fetched from PriorityQueue

I have a process that supplies items to a JoinableQueue, the data_queue. Each item consists of an index, starting at 0, and a data string of three characters (uppercase letters and digits, for demo purposes). The data is added to the queue in a random order; I cannot rely on the indices being added in strictly ascending order.

I want to create another process that works off this data queue, sorting the received items into a new PriorityQueue such that I can fetch items from it in the correct index order. The catch is that I don't need to just get one item: I need them in tuples of two.
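For reference, `queue.PriorityQueue` always returns its smallest entry first, and `(idx, data)` tuples compare by `idx` first, so items come out in index order regardless of the order they were put in. A minimal sketch, assuming unique indices and using a few of the sample values listed below:

```python
from queue import PriorityQueue

pq = PriorityQueue()
# Insert out of order; tuples are compared by their first element (the index) first
for item in [(3, "EZV"), (0, "P8K"), (2, "MR5"), (1, "JOY")]:
    pq.put(item)

# qsize() is evaluated once, so this drains all four entries in ascending index order
ordered = [pq.get() for _ in range(pq.qsize())]
print(ordered)  # [(0, 'P8K'), (1, 'JOY'), (2, 'MR5'), (3, 'EZV')]
```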

Hence, for the following content of the priority queue:

idx data
0 "P8K"
1 "JOY"
2 "MR5"
3 "EZV"
4 "9VX"
5 "6WL"
6 "4W3"
7 "NNK"
8 "0II"
9 "KAN"

I want to process the items as index pairs (0, 1), (1, 2), and so on. The actual operation itself is very simple: concatenate the data strings. For the above data, the resulting output should be P8K-JOY, JOY-MR5, and so on.
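If all items were already available at once, the operation would reduce to sorting by index and joining each neighbouring pair. A minimal batch sketch that ignores the queue/streaming aspect entirely; the function name `concatenate_pairs` is hypothetical:

```python
def concatenate_pairs(items):
    """Sort (idx, data) items by index and join each consecutive pair with '-'."""
    ordered = sorted(items, key=lambda item: item[0])
    # zip(ordered, ordered[1:]) walks the windows (0, 1), (1, 2), ...
    return [a[1] + "-" + b[1] for a, b in zip(ordered, ordered[1:])]


print(concatenate_pairs([(2, "MR5"), (0, "P8K"), (1, "JOY")]))  # ['P8K-JOY', 'JOY-MR5']
```

The harder part, which the rest of this thread deals with, is doing the same thing while items are still arriving.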

I have the following code to do this, which works in principle:

#!/usr/bin/env python3
import random
from multiprocessing import JoinableQueue, Process
from queue import PriorityQueue
import string

data_queue = JoinableQueue(10)
output_queue = JoinableQueue(10)

random.seed(234)


def process_data_queue(data_queue: JoinableQueue, output_queue: JoinableQueue):
    prio_queue = PriorityQueue()
    current_index_required = 0
    this_item = None
    next_item = None
    processing_started = False
    while True:
        try:
            idx, data = data_queue.get_nowait()
            prio_queue.put((idx, data))
            data_queue.task_done()
        except Exception:
            pass

        print(
            f"current_index_required: {current_index_required}, this_item: {this_item}, queue: {[q[0] for q in prio_queue.queue]}"
        )
        if (
            not processing_started
            and len(prio_queue.queue)
            and prio_queue.queue[0][0] == current_index_required
        ):
            print(f"Got first item from queue {current_index_required}")
            this_item = prio_queue.get()
            processing_started = True

        # peek into the queue and check if the next item has the correct index
        if processing_started and this_item is not None and next_item is None:
            if not len(prio_queue.queue):
                print("No more items in queue!")
                break
            if prio_queue.queue[0][0] == current_index_required + 1:
                next_item = prio_queue.get()

                print(f"Processing {this_item} {next_item}")
                output_queue.put(
                    (current_index_required, this_item[1] + "-" + next_item[1])
                )
                this_item = next_item
                next_item = None
                current_index_required += 1


def process_output_queue(output_queue: JoinableQueue):
    while True:
        idx, data = output_queue.get()
        print(f"Got from output queue: {idx} {data}")
        output_queue.task_done()


indices = list(range(10))
random.shuffle(indices)
for frame_idx_to_add in indices:
    random_string = "".join(random.choices(string.ascii_uppercase + string.digits, k=3))
    print(f"Adding to data queue: {frame_idx_to_add} {random_string}")
    data_queue.put(
        (
            frame_idx_to_add,
            random_string,
        )
    )

p1 = Process(target=process_data_queue, args=(data_queue, output_queue))
p1.start()

p2 = Process(target=process_output_queue, args=(output_queue,))
p2.start()

data_queue.join()
output_queue.join()

print("Done")

p1.terminate()
p2.terminate()

The processed tuples P8K-JOY, JOY-MR5, etc. are placed in the output_queue.

My problem is that the script terminates too early. I can break out of the process_data_queue function because I know when the queue itself is empty, but the output queue does not know when it should finish. So every time the script runs, it stops at a different point.

How do I make the processes know when to stop?

Also, the approach seems quite convoluted. Is there any way to make this code more concise?



Solution 1:[1]

First of all, I think you're using the wrong data structure. Rather than a PriorityQueue, you just want a simple dictionary:

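from queue import Empty

current_index = 0
current_items = {}
while True:  # never exits on its own: the producer must signal when it is done
    # Same draining step as in the question, but store items in a dict keyed by index
    while True:
        try:
            idx, data = data_queue.get_nowait()
        except Empty:
            break
        data_queue.task_done()
        current_items[idx] = data
    # Output every consecutive pair that is now complete
    while current_index in current_items and current_index + 1 in current_items:
        pair = current_items[current_index] + "-" + current_items[current_index + 1]
        output_queue.put((current_index, pair))
        del current_items[current_index]  # this item is no longer needed
        current_index += 1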
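The dictionary idea can also be packaged as a generator that consumes `(idx, data)` items in any order and yields each pair as soon as both halves have arrived. This is a minimal sketch; `pairs_in_order` is a hypothetical name, and it assumes indices are unique and start at `start`:

```python
from typing import Dict, Iterable, Iterator, Tuple


def pairs_in_order(items: Iterable[Tuple[int, str]], start: int = 0) -> Iterator[Tuple[int, str]]:
    """Yield (i, data[i] + "-" + data[i + 1]) in index order as soon as both are known."""
    pending: Dict[int, str] = {}  # items received but not yet fully paired
    current = start
    for idx, data in items:
        pending[idx] = data
        # Flush every pair that has just become complete
        while current in pending and current + 1 in pending:
            yield current, pending[current] + "-" + pending[current + 1]
            del pending[current]  # item `current` has been used as a left half
            current += 1


shuffled = [(2, "MR5"), (0, "P8K"), (1, "JOY"), (3, "EZV")]
for idx, pair in pairs_in_order(shuffled):
    print(idx, pair)
# 0 P8K-JOY
# 1 JOY-MR5
# 2 MR5-EZV
```

Because the input is any iterable, the same function could be fed from a queue, for example with `iter(data_queue.get, None)` if the producer ends the stream with `None`.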

Your next question is a harder one to answer. Whenever you have an external data generator, it must provide some way of saying "I'm done". There really is no other way. It can set a flag, push a special value onto the queue, or anything similar. Your consumer has no way of knowing whether the producer is done or just thinking really hard.
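A minimal sketch of the "push a special value onto the queue" option, assuming `None` is never a legitimate item. It uses threads and `queue.Queue` so it runs in a single process; `multiprocessing.Queue` has the same `put`/`get` methods, so the protocol should carry over to `Process` workers (the shared `results` list would not, since processes do not share memory). The names `SENTINEL`, `producer`, `pair_stage` and `output_stage` are hypothetical:

```python
import queue
import threading

SENTINEL = None  # hypothetical end-of-stream marker; must never be a real item


def producer(out_q, items):
    for item in items:
        out_q.put(item)
    out_q.put(SENTINEL)  # "I'm done"


def pair_stage(in_q, out_q):
    pending, current = {}, 0
    # iter(callable, sentinel) keeps calling in_q.get() until it returns SENTINEL
    for idx, data in iter(in_q.get, SENTINEL):
        pending[idx] = data
        while current in pending and current + 1 in pending:
            out_q.put((current, pending[current] + "-" + pending[current + 1]))
            del pending[current]
            current += 1
    out_q.put(SENTINEL)  # propagate end-of-stream to the next stage


def output_stage(in_q, results):
    for idx, data in iter(in_q.get, SENTINEL):
        results.append((idx, data))


data_q, output_q = queue.Queue(), queue.Queue()
results = []
items = [(2, "MR5"), (0, "P8K"), (3, "EZV"), (1, "JOY")]
threads = [
    threading.Thread(target=producer, args=(data_q, items)),
    threading.Thread(target=pair_stage, args=(data_q, output_q)),
    threading.Thread(target=output_stage, args=(output_q, results)),
]
for t in threads:
    t.start()
for t in threads:
    t.join()  # every stage exits on its own once it sees the sentinel
print(results)  # [(0, 'P8K-JOY'), (1, 'JOY-MR5'), (2, 'MR5-EZV')]
```

Each stage forwards the marker after it has finished, so the main program can simply `join()` the workers instead of calling `terminate()`. `None` is a convenient marker across processes because `iter()` compares with `==`: a fresh `object()` sentinel would be pickled and arrive as a different object, so it would never compare equal.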

And you're not really gaining anything by using a joinable queue. Its purpose is to let the producer know when the consumer has finished with the items it was given. But I don't think that's an issue here.
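To illustrate what a joinable queue adds: `queue.Queue` has the same `task_done()`/`join()` semantics as `multiprocessing.JoinableQueue`, so this minimal sketch uses it with a thread standing in for the consumer process. `join()` blocks until `task_done()` has been called once for every item that was `put()`:

```python
import queue
import threading

q = queue.Queue()
processed = []


def worker():
    while True:
        item = q.get()
        processed.append(item * 2)
        q.task_done()  # tells q.join() that this item is finished


threading.Thread(target=worker, daemon=True).start()
for n in range(5):
    q.put(n)
q.join()  # returns only after task_done() was called once per put()
print(processed)  # [0, 2, 4, 6, 8]
```

In the question's script, `process_data_queue` calls `task_done()` as soon as it moves an item into its PriorityQueue, so `data_queue.join()` only confirms that every item was received, not that every pair was produced. Likewise, `output_queue.join()` can return whenever the output consumer has caught up with what has been put so far. That is a likely reason the run stops at a different point each time.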

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Frank Yellin