How to take items from a queue in chunks?

I have multiple producer threads that add objects to a shared queue concurrently.

I want to create a single-threaded consumer that reads from that shared queue for further data processing (database batch insert).

Problem: I only want to take data from the queue in chunks, for better performance during the batch insert. So I somehow have to detect how many items are in the queue, take all of those items from the queue, and leave the queue empty again.

 BlockingQueue<Integer> sharedQueue = new LinkedBlockingQueue<>();

 ExecutorService pes = Executors.newFixedThreadPool(4);
 ExecutorService ces = Executors.newFixedThreadPool(1);

 pes.submit(new Producer(sharedQueue, 1));
 pes.submit(new Producer(sharedQueue, 2));
 pes.submit(new Producer(sharedQueue, 3));
 pes.submit(new Producer(sharedQueue, 4));
 ces.submit(new Consumer(sharedQueue, 1));

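class Producer implements Runnable {
    public void run() {
        ...
        sharedQueue.put(obj); // blocks only if the queue is bounded and full
    }
}

class Consumer implements Runnable {
    public void run() {
        ...
        sharedQueue.take(); // blocks until an item is available
    }
}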

Question for the Consumer: how can I poll the shared queue, wait until the queue has X items, then take all items and simultaneously empty the queue (so the consumer can start polling and waiting again)?

I'm open to any suggestions and am not necessarily bound to the code above.



Solution 1:[1]

Instead of checking the size of the queue, it is better to keep an internal List in the consumer: take objects from the queue one at a time and add them to that list. Once the list has X items, do your processing and then clear the internal list.

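class Consumer implements Runnable {
  // sharedQueue and X (the batch size) are assumed to be set elsewhere, as in the question
  private final List<Integer> itemsToProcess = new ArrayList<>();

  @Override
  public void run() {
    try {
      while (true) { // or until producers are stopped and queue is empty
        while (itemsToProcess.size() < X) {
          itemsToProcess.add(sharedQueue.take()); // blocks until an item is available
        }
        // do process
        itemsToProcess.clear();
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // restore the interrupt flag and let the thread exit
    }
  }
}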

Instead of BlockingQueue.take(), you might use BlockingQueue.poll(timeout, unit) with some reasonable timeout and check the result for null. A null result means nothing arrived within the timeout, which lets you detect the situation where all producers are done and the queue is empty, so that you can shut down your consumer.
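The poll-with-timeout variant could be sketched roughly as follows. This is a minimal illustration, not the answerer's code: the class name BatchingConsumer, the helper nextBatch, and the batch size and timeout values are all assumptions made for the example. The helper collects up to maxSize items but gives up once the overall timeout has elapsed, so a partial (or empty) batch is returned instead of blocking forever.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical helper class, named for this example only.
class BatchingConsumer {

    // Collects up to maxSize items, waiting at most `timeout` in total.
    // Returns a partial or empty list if the timeout elapses first.
    static <T> List<T> nextBatch(BlockingQueue<T> queue, int maxSize,
                                 long timeout, TimeUnit unit) throws InterruptedException {
        List<T> batch = new ArrayList<>(maxSize);
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        while (batch.size() < maxSize) {
            long remaining = deadline - System.nanoTime();
            // With a non-positive timeout, poll returns immediately
            // (the head if one is present, otherwise null).
            T item = queue.poll(remaining, TimeUnit.NANOSECONDS);
            if (item == null) {
                break; // nothing arrived before the deadline
            }
            batch.add(item);
        }
        return batch;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Integer> sharedQueue = new LinkedBlockingQueue<>();
        for (int i = 0; i < 7; i++) {
            sharedQueue.put(i);
        }
        // Batch size 5 and a 200 ms timeout are arbitrary example values.
        System.out.println(nextBatch(sharedQueue, 5, 200, TimeUnit.MILLISECONDS)); // [0, 1, 2, 3, 4]
        System.out.println(nextBatch(sharedQueue, 5, 200, TimeUnit.MILLISECONDS)); // [5, 6]
    }
}
```

In a consumer loop, an empty batch combined with a "producers finished" flag would be the signal to stop. If waiting is not needed at all, the standard BlockingQueue.drainTo(collection, maxElements) method moves whatever is currently queued into a collection without blocking.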

Solution 2:[2]

I recently developed this utility, which batches BlockingQueue elements and uses a flush timeout if the queued elements don't reach the batch size in time. It also supports a fan-out pattern, using multiple instances to process the same set of data:

// Instantiate the registry
FQueueRegistry registry = new FQueueRegistry();

// Build FQueue consumer
registry.buildFQueue(String.class)
                .batch()
                .withChunkSize(5)
                .withFlushTimeout(1)
                .withFlushTimeUnit(TimeUnit.SECONDS)
                .done()
                .consume(() -> (broadcaster, elms) -> System.out.println("elms batched are: "+elms.size()));

// Push data into queue
for(int i = 0; i < 10; i++){
        registry.sendBroadcast("Sample"+i);
}

More info here!

https://github.com/fulmicotone/io.fulmicotone.fqueue

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 shmosel
Solution 2 eold