'Async Concurrent Queue with max concurrency
I'm running across a bug with a custom asynchronous queue that calls 10 async functions at a time.
I'm initiating the queue with 50 jobs, once first 10 jobs are finished the queue moves to the subsequent 10 until it finishes all.
The bug I'm coming across is that once it finishes 50, it restarts with first 5 jobs with 2 or 3 or 1 job at a time. It also takes fewer than 10 jobs towards the end of the queue.
Please create these two files and test with mocha and see the output yourself.
Note: Set the timeout in mocha to 0 to keep the test running for prolonged period of time.
Queue.js
function Queue(func, max) {
this.jobs = [];
this.func = func;
this.max = max ? max : 10;
}
Queue.prototype.push = function(data) {
var self = this;
return new Promise(function(resolve, reject){
self.jobs.push({data: data, resolve: resolve, reject: reject});
if(!self.progress) {
self.progress = true;
self.run();
}
});
};
Queue.prototype.run = function() {
var self = this;
var tasks = [];
console.log("--------------------");
for(var i=0; i<this.jobs.length && i < this.max; i++) {
tasks.push(this.jobs.shift());
console.log("queuing", tasks[tasks.length-1].data);
}
console.log("Total jobs queued", tasks.length);
Promise.all(
tasks.map(function(task){
return self.func(task.data)
.then(task.resolve, task.reject);
}
)).then(this.next.bind(this));
};
Queue.prototype.next = function(){
if(this.jobs.length) {
this.run();
} else {
this.progress = false;
}
};
module.exports = Queue;
QueueTest.js
function async(data) {
return new Promise(function(resolve, reject){
setTimeout(function(){
console.log("resolving", data);
resolve(data);
}, Math.random() * 5000);
});
}
it("should test queue", function(done){
var queue = new Queue(async);
Promise.all(
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29,
30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50].map(queue.push.bind(queue))
).then(function(){
done();
});
});
Solution 1:[1]
The issue is this condition in the for loop: i<this.jobs.length
i is counting the number of jobs scheduled in the batch. This condition is correct when i is an index in the jobs array. In this case, we just want to confirm there are still jobs left to process, so we can simply use: this.jobs.length>0
The odd behavior towards the end of the queue is because the length is falling as elements are shifted off the jobs array, but the number of jobs scheduled in that batch (i) is increasing. Let's take an example where this.jobs.length is 4 when entering the for loop:
| i | this.jobs.length | i < this.jobs.length |
|---|---|---|
| 0 | 4 | true |
| 1 | 3 | true |
| 2 | 2 | false |
In this case, the loop is exited after only scheduling 2 of the four jobs in the queue. Checking if there are remaining tasks instead fixes the issue:
for(var i=0; this.jobs.length > 0 && i < this.max; i++) {
Solution 2:[2]
Schedule task parallelly from array without waiting any to finish within allowed threads
const fastQueue = async <T, Q>(
x: T[],
threads: number,
fn: (v: T, i: number, a: T[]) => Promise<Q>
) => {
let k = 0;
const result = Array(x.length) as Q[];
await Promise.all(
[...Array(threads)].map(async () => {
while (k < x.length) result[k] = await fn(x[k], k++, x);
})
);
return result;
};
const demo = async () => {
const wait = (x: number) => new Promise(r => setTimeout(r, x, x))
console.time('a')
console.log(await fastQueue([1000, 2000, 3000, 2000, 2000], 4, (v) => wait(v)))
console.timeEnd('a')
}
demo();
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | Anthony DiSanti |
| Solution 2 |
