How can I make ThreadPoolExecutor command wait if there's too much data it needs to work on?
I am getting data from a queue server and I need to process it and send an acknowledgement. Something like this:
while (true) {
    queueserver.get.data
    ThreadPoolExecutor // send data to thread
    queueserver.acknowledgement
}
I don't fully understand what happens in threads, but I think this program gets the data, sends it to the thread, and then immediately acknowledges it. So even though each queue is limited to 200 unacknowledged items, the program will pull items as fast as it can receive them. This is fine when I run the program on a single server, but with multiple workers it becomes an issue, because the number of items in the thread queue reflects not the work that has been done but how fast the worker can get items from the queue server.
Is there anything I can do to somehow make the program wait if the thread queue is full of work?
Solution 1:[1]
If you want the acknowledgment when the worker starts working on the task, you can make a custom ThreadFactory that sends the acknowledgment from the thread before doing the actual work. Or you can override beforeExecute of ThreadPoolExecutor.
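A rough sketch of the beforeExecute route, under some assumptions: the names AckOnStartDemo, MessageTask and AckingPool are made up for illustration, and a plain list of strings stands in for the real call to the queue server. Note that beforeExecute only sees the original task object when it was handed over with execute(); submit() wraps it in a FutureTask first.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class AckOnStartDemo {

    // Hypothetical task type that remembers which message it carries.
    static final class MessageTask implements Runnable {
        final String messageId;
        private final List<String> log;

        MessageTask(String messageId, List<String> log) {
            this.messageId = messageId;
            this.log = log;
        }

        @Override
        public void run() {
            log.add("processed " + messageId); // stand-in for the real work
        }
    }

    // Pool that "acknowledges" a message at the moment a worker picks it up.
    static final class AckingPool extends ThreadPoolExecutor {
        private final List<String> log;

        AckingPool(int threads, List<String> log) {
            super(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
            this.log = log;
        }

        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            super.beforeExecute(t, r);
            if (r instanceof MessageTask) {
                // Assumption: the real code would call the queue server here.
                log.add("ack " + ((MessageTask) r).messageId);
            }
        }
    }

    static List<String> runDemo() {
        List<String> log = new CopyOnWriteArrayList<>();
        AckingPool pool = new AckingPool(1, log);
        pool.execute(new MessageTask("m1", log));
        pool.execute(new MessageTask("m2", log));
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```

With a single worker thread, each message is acknowledged right before it is processed, so the second message is not acknowledged until the first one has finished.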
If you want the acknowledgment when a new worker is freed up for a new task, I think you can initialize a ThreadPoolExecutor with a SynchronousQueue and a ThreadPoolExecutor.CallerRunsPolicy, or with your own policy where the caller blocks.
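To illustrate the SynchronousQueue plus CallerRunsPolicy combination, here is a small sketch. The class and method names are invented for the demo, and a latch keeps the two workers artificially busy. A SynchronousQueue holds no tasks, so once every worker is occupied the next execute() is rejected, and CallerRunsPolicy makes the submitting thread run the task itself. The receive loop therefore cannot fetch more data until that task is done.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class CallerRunsDemo {

    // Returns true if the third task ran on the submitting thread.
    static boolean thirdTaskRanInCaller() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 0L, TimeUnit.MILLISECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadPoolExecutor.CallerRunsPolicy());

        CountDownLatch release = new CountDownLatch(1);
        // Keeps a worker busy until the latch is released.
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        AtomicReference<Thread> ranOn = new AtomicReference<>();
        try {
            pool.execute(blocker); // occupies worker 1
            pool.execute(blocker); // occupies worker 2
            // No free worker and no queue capacity: the caller runs this one.
            pool.execute(() -> ranOn.set(Thread.currentThread()));
            return ranOn.get() == Thread.currentThread();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("third task ran in caller: " + thirdTaskRanInCaller());
    }
}
```

One trade-off of this choice: while the caller is busy running a task, it is not feeding the other workers, so a custom policy that merely blocks may keep the pool better utilized.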
Solution 2:[2]
First, I think your approach is wrong, because what your pseudocode does is busy waiting. You should read through the Concurrency lesson of the Java Tutorials: http://docs.oracle.com/javase/tutorial/essential/concurrency/
Ignoring that, I'll offer you a solution that keeps the busy wait (which is not recommended):
ExecutorService e1 = Executors.newFixedThreadPool(20);
while (true) {
    if (!serverq.isEmpty() && !myq.isFull()) myq.enq(serverq.poll());
    if (!myq.isEmpty()) e1.execute(myq.poll());
}
NOTES:
1. Make sure your myq is synchronized, as said in the other answers. You can extend a blocking queue to make sure the synchronization is correct.
2. Implement a Runnable class that does what you expect from the server in one iteration of the service. Those runnables have to take myq as a constructor parameter and store it in a field.
3. myq holds the runnables; at the end of its run method, each runnable must remove itself from myq.
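The notes above could be sketched roughly as follows. This is not the exact loop from the answer: it swaps the isFull() polling for the blocking put() of an ArrayBlockingQueue, so the producer sleeps instead of spinning. TrackedTask, inFlight and the counters are hypothetical names used only for this illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SelfRemovingTaskDemo {

    // Runnable that receives the tracking queue in its constructor (note 2)
    // and removes itself from it when its work is finished (note 3).
    static final class TrackedTask implements Runnable {
        private final BlockingQueue<Runnable> inFlight;
        private final Runnable work;

        TrackedTask(BlockingQueue<Runnable> inFlight, Runnable work) {
            this.inFlight = inFlight;
            this.work = work;
        }

        @Override
        public void run() {
            try {
                work.run();
            } finally {
                inFlight.remove(this); // frees one slot for the producer
            }
        }
    }

    // Returns { highest number of unfinished tasks observed, tasks completed }.
    static int[] run(int capacity, int totalTasks) {
        BlockingQueue<Runnable> inFlight = new ArrayBlockingQueue<>(capacity);
        ExecutorService pool = Executors.newFixedThreadPool(4);
        AtomicInteger outstanding = new AtomicInteger();
        AtomicInteger maxOutstanding = new AtomicInteger();
        AtomicInteger completed = new AtomicInteger();
        try {
            for (int i = 0; i < totalTasks; i++) {
                TrackedTask task = new TrackedTask(inFlight, () -> {
                    completed.incrementAndGet();
                    outstanding.decrementAndGet();
                });
                inFlight.put(task); // blocks while `capacity` tasks are unfinished
                maxOutstanding.accumulateAndGet(outstanding.incrementAndGet(), Math::max);
                pool.execute(task);
            }
            pool.shutdown();
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
        return new int[] { maxOutstanding.get(), completed.get() };
    }

    public static void main(String[] args) {
        int[] result = run(3, 50);
        System.out.println("max unfinished: " + result[0] + ", completed: " + result[1]);
    }
}
```

In a real worker, the acknowledgment to the queue server would presumably go inside the task, so that the number of unacknowledged items never exceeds the capacity of the tracking queue.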
Solution 3:[3]
What about having a blocking pool that will not execute more than 200 tasks at once and waits for a task to complete before accepting the 201st? I've achieved this using a semaphore in my application. You can also change the limit by passing a value to the constructor.
The only difference from @Gray's answer is that a task will rarely be rejected in this case. The semaphore makes the 201st task wait until another task finishes. Nevertheless, there is a rejection handler that re-submits the task to the executor in case of any rejection.
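private class BlockingPool extends ThreadPoolExecutor {
    private final Semaphore semaphore;

    public BlockingPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                        BlockingQueue<Runnable> workQueue, int tasksAllowedInThreads) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                // Re-submit by blocking on the work queue. Calling executor.execute(r) here
                // would re-enter the overridden execute() and take a second permit for the same task.
                if (executor.isShutdown()) {
                    throw new RejectedExecutionException("Executor has been shut down");
                }
                try {
                    executor.getQueue().put(r);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RejectedExecutionException("Interrupted while re-submitting", e);
                }
            }
        });
        semaphore = new Semaphore(tasksAllowedInThreads);
    }

    @Override
    public void execute(Runnable task) {
        boolean acquired = false;
        do {
            try {
                semaphore.acquire(); // blocks once tasksAllowedInThreads tasks are in flight
                acquired = true;
            } catch (final InterruptedException e) {
                // log
            }
        } while (!acquired); // run in loop to handle InterruptedException
        try {
            super.execute(task);
        } catch (final RejectedExecutionException e) {
            System.out.println("Task Rejected");
            semaphore.release(); // the task will never run, so give the permit back
            throw e;
        }
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if (t != null) {
            t.printStackTrace();
        }
        semaphore.release(); // task finished: let a waiting caller proceed
    }
}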
Does this make sense?
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | |
| Solution 2 | |
| Solution 3 | Rahul Winner |
