Asynchronous readline loop without async / await
I'd like to use this function (which runs fine on my laptop), or something like it, on my embedded device. But there are two problems:
- The Node.js on our device is so old that its JavaScript doesn't support async / await. Given our hundreds of units in the field, updating is probably impractical.
- Even if that weren't a problem, the file may be tens of megabytes in size, not something that could fit in memory all at once, and this for-await loop will set thousands of asynchronous tasks into motion more or less all at once.
const fs = require('fs');
const ReadLine = require('readline');

async function sendOldData(pathname) {
    const reader = ReadLine.createInterface({
        input: fs.createReadStream(pathname),
        crlfDelay: Infinity
    });
    for await (const line of reader) {
        const record = JSON.parse(line);
        sendOldRecord(record);
    }
}
function sendOldRecord(record) {...}
Promises work in this old version. I'm sure there is a graceful syntax for doing this sequentially with Promises:
- read one line
- massage its data
- send that data to our server
sequentially, but asynchronously, so that the JavaScript event loop is not blocked while the data is sent to the server.
Please, could someone suggest the right syntax for doing this in my outdated JavaScript?
Solution 1:[1]
Make a queue, so that each task takes the next item off the array only after the previous one has finished:
function foo() {
    // stand-in for lines read from the file
    const reader = [
        '{"foo": 1}',
        '{"foo": 2}',
        '{"foo": 3}',
        '{"foo": 4}',
        '{"foo": 5}',
    ];

    function doNext() {
        if (!reader.length) {
            console.log('done');
            return;
        }
        const line = reader.shift();
        const record = JSON.parse(line);
        sendOldRecord(record, doNext);
    }

    doNext();
}

function sendOldRecord(record, done) {
    console.log(record);
    // whatever your async task is; call done() when it finishes
    setTimeout(function () {
        done();
    }, Math.floor(2000 * Math.random()));
}

foo();
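Since the question mentions that Promises are available, the same queue can also be drained with a .then() chain instead of a completion callback. A sketch, where sendOldRecordP is a hypothetical promise-returning variant of sendOldRecord:

function sendOldRecordP(record) {
    console.log(record);
    // hypothetical async task: resolve when the record has been sent
    return new Promise(resolve => {
        setTimeout(resolve, Math.floor(2000 * Math.random()));
    });
}

function drain(lines) {
    if (!lines.length) {
        return Promise.resolve();
    }
    const record = JSON.parse(lines.shift());
    // send one record, then recurse; each step starts only after the previous one settles
    return sendOldRecordP(record).then(() => drain(lines));
}

drain(['{"foo": 1}', '{"foo": 2}', '{"foo": 3}']).then(() => console.log('done'));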
Solution 2:[2]
The Problem
Streams and asynchronous processing are somewhat of a pain to get working well together while handling all possible error conditions, and things are even worse for the readline module. Since you seem to be saying that you can't use the for await () construct on the readable (which has various issues even where it is supported), things are a bit more complicated still.
The main problem with readline.createInterface() on a stream is that it reads a chunk of the file, parses that chunk into full lines and then synchronously sends all the lines in a tight for loop. You can see this in the readline implementation:
for (let n = 0; n < lines.length; n++) this[kOnLine](lines[n]);
The implementation of kOnLine does this:
this.emit('line', line);
So, this is a tight for loop that emits all the lines it has read. If you try to do something asynchronous in response to the line event, then the moment you hit an await or an asynchronous callback, this readline code will send the next line event before you're done processing the previous one. That makes it a pain to process line events asynchronously in sequential order, finishing one line before starting the next. IMO, this is a very busted design, as it only really works with synchronous processing. Notice that this for loop also doesn't care whether the readline object is paused; it just pumps out all the lines it has without regard for anything.
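To see the problem concretely, here is a minimal sketch (assuming a multi-line file named temp.txt exists): the handler below starts an asynchronous task for each line, and readline keeps delivering further lines before the previous task has finished.

const fs = require('fs');
const ReadLine = require('readline');

const rl = ReadLine.createInterface({
    input: fs.createReadStream('temp.txt'),
    crlfDelay: Infinity
});

let inFlight = 0;
rl.on('line', line => {
    inFlight++;
    if (inFlight > 1) {
        // demonstrates that 'line' events keep arriving mid-processing
        console.log(`overlap: ${inFlight} lines in flight`);
    }
    // simulate an asynchronous per-line task (e.g. a network send)
    setTimeout(() => { inFlight--; }, 50);
});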
Discussion of Possible Solutions
So, what to do about that? Part of a fix for this is in the asynchronous iterator interface to readline (but it has other problems which I've filed bugs on). But the supposition of your question seems to be that you can't use that asynchronous iterator interface because your device may have an older version of Node.js. If that's the case, then I only know of two options:
- Ditch the readline.createInterface() functionality entirely and either use a 3rd-party module or do your own line-boundary processing.
- Cover the line event with your own code that supports asynchronous processing of lines without getting the next line in the middle of still processing the previous one.
A Solution
I've written an implementation for option #2, covering the line event with your own code. In my implementation, we just acknowledge that line events will arrive during our asynchronous processing of previous lines, but instead of notifying you about them immediately, the input stream gets paused and these "early" lines get queued. With this solution, the readline code will read a chunk of data from the input stream, parse it into its full lines, and synchronously send all the line events for those full lines. But upon receipt of the first line event, we pause the input stream and begin queueing subsequent line events. So you can asynchronously process a line, and you won't get another one until you ask for the next line.
This code has a different way of communicating incoming lines to your code. Since we're in the age of promises for asynchronous code, I've added a promise-based reader.getNextLine() function to the reader object.
This lets you write code like this:
const fs = require('fs');

async function run(filename) {
    const reader = createLineReader({
        input: fs.createReadStream(filename),
        crlfDelay: Infinity
    });
    let line;
    let cntr = 0;
    while ((line = await reader.getNextLine()) !== null) {
        // simulate some asynchronous operation in the processing of the line
        console.log(`${++cntr}: ${line}`);
        await processLine(line);
    }
}

run("temp.txt").then(() => {
    console.log("done");
}).catch(err => {
    console.log(err);
});
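Here processLine stands in for whatever per-line work is needed; it is a hypothetical promise-returning function (e.g. massage the record and send it to the server). Note that this usage example itself relies on async / await; on the older runtime from the question, the same reader can be driven with a recursive .then() chain instead, sketched below:

// hypothetical stand-in for the per-line work
function processLine(line) {
    return new Promise(resolve => setTimeout(resolve, 100));
}

// promise-only equivalent of run(), usable without async / await syntax
function runOld(filename) {
    const reader = createLineReader({
        input: fs.createReadStream(filename),
        crlfDelay: Infinity
    });
    function next() {
        return reader.getNextLine().then(line => {
            if (line === null) return;            // end of input
            return processLine(line).then(next);  // process, then fetch the next line
        });
    }
    return next();
}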
And, here's the implementation of createLineReader():
const ReadLine = require('readline');

function createLineReader(options) {
    const stream = options.input;
    const reader = ReadLine.createInterface(options);

    // state machine variables
    let latchedErr = null;
    let isPaused = false;
    let readerClosed = false;
    const queuedLines = [];

    // latch errors from the input stream; if a getNextLine() is currently
    // waiting, its 'error' listener rejects it, otherwise the latched error
    // rejects the next call
    stream.on('error', err => {
        latchedErr = err;
        if (reader.listenerCount('error') > 0) {
            reader.emit('error', err);
        }
    });

    // resolves with the next line
    // resolves with null if there are no more lines
    // rejects with an error
    // (a plain function that returns a Promise, so no async / await is required)
    reader.getNextLine = function() {
        if (latchedErr) {
            // once we get an error, we're done
            return Promise.reject(latchedErr);
        } else if (queuedLines.length) {
            // if something is in the queue, return the oldest entry
            const line = queuedLines.shift();
            if (queuedLines.length === 0 && isPaused) {
                reader.resume();
            }
            return Promise.resolve(line);
        } else if (readerClosed) {
            // nothing in the queue and the reader is closed: signify end of data
            return Promise.resolve(null);
        } else {
            // wait for more line data to arrive
            return new Promise((resolve, reject) => {
                function clear() {
                    // removeListener() also exists on older Node versions that lack .off()
                    reader.removeListener('error', errorListener);
                    reader.removeListener('queued', queuedListener);
                    reader.removeListener('done', doneListener);
                }
                function queuedListener() {
                    clear();
                    const line = queuedLines.shift();
                    if (queuedLines.length === 0 && isPaused) {
                        // resume so the stream can deliver the next chunk
                        reader.resume();
                    }
                    resolve(line);
                }
                function errorListener(e) {
                    clear();
                    reject(e);
                }
                function doneListener() {
                    clear();
                    resolve(null);
                }
                reader.once('queued', queuedListener);
                reader.once('error', errorListener);
                reader.once('done', doneListener);
            });
        }
    };

    reader.on('pause', () => {
        isPaused = true;
    }).on('resume', () => {
        isPaused = false;
    }).on('line', line => {
        queuedLines.push(line);
        if (!isPaused) {
            reader.pause();
        }
        // tell any queue listener that something was just added to the queue
        reader.emit('queued');
    }).on('close', () => {
        readerClosed = true;
        if (queuedLines.length === 0) {
            reader.emit('done');
        }
    });

    return reader;
}
Explanation
Internally, the implementation takes each new line event and puts it into a queue. Then, reader.getNextLine() just pulls items from the queue or waits (with a promise) for the queue to get something put in it.
During operation, the readline object gets a chunk of data from your readstream and parses it into whole lines. The whole lines all get added to the queue (via line events), and the readstream is paused so it won't generate any more lines until the queue has been drained.
When the queue becomes empty, the readstream will be resumed so it can send more data to the reader object.
This is scalable to very large files because it only queues the whole lines found in one chunk of the file being read. Once those lines are queued, the input stream is paused so it won't put more into the queue. After the queue is drained, the input stream is resumed so it can send more data, and the cycle repeats.
Any errors in the readstream are latched and will either reject a reader.getNextLine() that is already waiting for the next line, or reject the next time reader.getNextLine() is called.
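For example, a caller can catch such an error at the point of the getNextLine() call (a small sketch):

reader.getNextLine()
    .then(line => {
        // process the line (or handle null at end of data)
    })
    .catch(err => {
        // a readstream error (e.g. a missing file) lands here
        console.error('read failed:', err);
    });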
Disclaimers
This has only been tested with file-based readstreams.
I would not recommend having more than one reader.getNextLine() call outstanding at once, as this code does not anticipate that, and it's not even clear what that should do.
Solution 3:[3]
Basically, you can achieve this using a functional approach:
const arrayOfValues = [1, 2, 3, 4, 5];

const chainOfPromises = arrayOfValues.reduce((acc, item) => {
    return acc.then(() => {
        // here you can add your logic for parsing / sending the request;
        // returning the promise chains the next request onto the previous one
        return yourAsyncFunction(item);
    });
}, Promise.resolve());

// Effectively this builds:
// Promise.resolve().then(() => yourAsyncFunction(1)).then(() => yourAsyncFunction(2)) and so on...

// the chain starts running as soon as it is built; attach final handlers
// to observe completion and errors
chainOfPromises.then(() => console.log('done'), err => console.error(err));
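Applied to the question, the same pattern would look something like the sketch below (hypothetical names; it assumes a promise-returning sendOldRecord). One caveat: reduce-chaining needs the whole array of lines in memory up front, so for the tens-of-megabytes files in the question it would have to be combined with chunked reading, such as Solution 2's reader.

// hypothetical promise-returning sender: massage the record, send it, resolve when done
function sendOldRecord(record) {
    return new Promise(resolve => setTimeout(resolve, 100));
}

function sendAllSequentially(lines) {
    return lines.reduce(
        (chain, line) => chain.then(() => sendOldRecord(JSON.parse(line))),
        Promise.resolve()
    );
}

sendAllSequentially(['{"foo": 1}', '{"foo": 2}'])
    .then(() => console.log('done'))
    .catch(err => console.error(err));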
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | epascarello |
| Solution 2 | |
| Solution 3 | |
