Wait for the reading of a CSV file
I'm trying to read a CSV file in TypeScript with the "csv-parse" library by wrapping the parser in an observable. The following code uses fs.createReadStream to read the file. I would like to return the observable and subscribe to it, but the subscriber never receives anything, probably because I'm not waiting for the asynchronous fs.createReadStream. How can I resolve this?
import { parse } from "csv-parse";
import { Observable } from "rxjs";
import * as fs from "fs";
import path from "path";

export interface StdJsonDoc<T = string> {
  [key: string]: T;
}

export function createCsvObservable(
  filePath: string,
  fileType: string | undefined = undefined,
  fieldDelimiter: string = ",",
  columnHeader: boolean = true
) {
  if (fileType !== "csv") {
    throw Error(`Cannot create CSV observable from non CSV file`);
  }
  return new Observable<StdJsonDoc>((subscriber) => {
    const parser = fs.createReadStream(filePath).pipe(
      parse({
        delimiter: fieldDelimiter,
        columns: columnHeader,
        trim: true,
        skip_empty_lines: true,
        relax_column_count: true,
      })
    );
    parser.on("readable", () => {
      let record: StdJsonDoc;
      while ((record = parser.read())) {
        subscriber.next(record);
      }
    });
    parser.on("end", () => {
      subscriber.complete();
    });
    parser.on("error", () => {
      subscriber.error();
    });
  });
}

async function main() {
  const myObservableCsv = createCsvObservable(
    path.join(__dirname, "data", "myCsvFile.csv"),
    "csv"
  );
  myObservableCsv.subscribe({
    next: (record) => {
      console.log(`RECORD: ${record}`);
    },
    error: () => {
      console.log("ERROR");
    },
    complete: () => {
      console.log("COMPLETE");
    },
  });
}

main().then(() => {
  console.log(`*** END PROGRAM ***`);
  process.exit(0);
});
Solution 1:[1]
An observable does nothing until something subscribes to it, so first check that your caller really subscribes to the returned observable. What happens when you do the following?
createCsvObservable(path.join(__dirname, "data", "myCsvFile.csv"), "csv").subscribe(console.log);
Solution 2:[2]
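The problem was the "asynchronous" main. Because main is declared async and never awaits anything, the promise it returns resolves as soon as subscribe() has been called, so the .then() callback runs process.exit(0) before the stream has delivered a single record. Removing "async" and simply calling main(), without the .then() that calls process.exit(0), works: Node keeps the process alive until the stream has ended.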
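If main should stay async, one option is to make it actually wait for completion. The following is a minimal sketch, not the original poster's code: `SubscribableLike`, `collectUntilComplete` and `fromStream` are hypothetical names introduced here, and an in-memory `Readable.from` stream stands in for the csv-parse parser so the example runs without any third-party package. The assumption is that an RxJS Observable satisfies the same `subscribe({ next, error, complete })` shape and could be passed in directly.

```typescript
import { Readable } from "node:stream";

// Hypothetical minimal shape of "something you can subscribe to".
// Assumption: an RxJS Observable matches this shape structurally.
interface SubscribableLike<T> {
  subscribe(observer: {
    next: (value: T) => void;
    error: (err: unknown) => void;
    complete: () => void;
  }): unknown;
}

// Collects every emitted value and resolves only once the source completes,
// so a caller can `await` the whole read instead of returning early.
function collectUntilComplete<T>(source: SubscribableLike<T>): Promise<T[]> {
  return new Promise<T[]>((resolve, reject) => {
    const values: T[] = [];
    source.subscribe({
      next: (value) => values.push(value),
      error: (err) => reject(err),
      complete: () => resolve(values),
    });
  });
}

// Stand-in for createCsvObservable: forwards stream events to the observer.
// The stream is only created when subscribe() is called.
function fromStream<T>(makeStream: () => Readable): SubscribableLike<T> {
  return {
    subscribe(observer) {
      const stream = makeStream();
      stream.on("data", (chunk: T) => observer.next(chunk));
      stream.on("end", () => observer.complete());
      stream.on("error", (err) => observer.error(err));
    },
  };
}

async function main(): Promise<Record<string, string>[]> {
  // In-memory records used here in place of a parsed CSV file.
  const source = fromStream<Record<string, string>>(() =>
    Readable.from([
      { id: "1", name: "a" },
      { id: "2", name: "b" },
    ])
  );
  // main's promise now resolves only after the source has completed.
  const records = await collectUntilComplete(source);
  for (const record of records) {
    // JSON.stringify avoids printing "[object Object]".
    console.log(`RECORD: ${JSON.stringify(record)}`);
  }
  return records;
}

main().then(() => {
  console.log("*** END PROGRAM ***");
});
```

Because main awaits the collected records, the .then() callback runs only after the last record has been handled, and no process.exit(0) is needed. With RxJS 7 or later, `lastValueFrom` from "rxjs" turns an observable into a promise in a similar way, if only the final value or the completion itself is of interest.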
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|---|
Solution 1 | daflodedeing |
Solution 2 | user1 |