'Wait for the reading of a csv file

I'm trying to read a csv file in Typescript with "csv-parse" library by creating an observable; the following code use fs.createReadStream to read the file; I would like to return the observable and subscribe to it, but the process does not get nothing, probably because I'm not waiting the asynchronous fs.createReadStream; how to resolve?

import { parse } from "csv-parse";
import { Observable } from "rxjs";
import * as fs from "fs";
import path from "path";

export interface StdJsonDoc<T = string> {
  [key: string]: T;
}

export function createCsvObservable(
  filePath: string,
  fileType: string | undefined = undefined,
  fieldDelimiter: string = ",",
  columnHeader: boolean = true
) {
  if (fileType !== "csv") {
    throw Error(`Cannot create CSV observable from non CSV file`);
  }

  return new Observable<StdJsonDoc>((subscriber) => {
    const parser = fs.createReadStream(filePath).pipe(
      parse({
        delimiter: fieldDelimiter,
        columns: columnHeader,
        trim: true,
        skip_empty_lines: true,
        relax_column_count: true,
      })
    );

    parser.on("readable", () => {
      let record: StdJsonDoc;
      while ((record = parser.read())) {
        subscriber.next(record);
      }
    });
    parser.on("end", () => {
      subscriber.complete();
    });
    parser.on("error", () => {
      subscriber.error();
    });
  });
}

async function main() {
  const myObservableCsv = createCsvObservable(
    path.join(__dirname, "data", "myCsvFile.csv"),
    "csv"
  );
  myObservableCsv.subscribe({
    next: (record) => {
      console.log(`RECORD: ${record}`);
    },
    error: () => {
      console.log("ERROR");
    },
    complete: () => {
      console.log("COMPLETE");
    },
  });
}

main().then(() => {
  console.log(`*** END PROGRAM ***`);
  process.exit(0);
});


Solution 1:[1]

Your caller does not subscribe to the observable returned. What happens when you do:

createCsvObservable(path.join(__dirname, "data", "myCsvFile.csv"),"csv").subscribe(console.log);

Solution 2:[2]

The problem was the "asynchronous" main; removed "async" and simply called main() works.

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 daflodedeing
Solution 2 user1