Node.js: stream CSV data into MongoDB

I'm new to Node.js and MongoDB and I'm trying to insert a CSV file into MongoDB. My first version was to create an array variable and push the data into the array like this:

.on('data', (data) => { array.push(JSON.parse(data)) })

Then, after pushing all the objects into the array, I insert them into MongoDB using:

TempModel.insertMany(array)
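
Put together, the first version was roughly the following (a simplified sketch that reuses the csv() parser and the TempModel defined further down in this question):

const array = [];

fs.createReadStream(req.file.path)
  .pipe(csv())
  .on('data', (data) => {
    // Every parsed row stays in memory until the whole file has been read
    array.push(JSON.parse(data));
  })
  .on('end', async () => {
    // One large insert once the entire file has been buffered
    await TempModel.insertMany(array);
    console.log('finished');
  });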

This solution worked great for small files, and even for large ones as long as I allocated enough memory to Node.js so the array could hold more objects, but with very large files I get an error:

FATAL ERROR: Ineffective mark-compacts near heap limit Allocation failed - JavaScript heap out of memory

I am guessing this error occurs because there are too many objects in the array (correct me if I am wrong).

So my new solution is to stream the CSV file and insert every line as an object into MongoDB instead of pushing it into an array. But when I start the project, it stops at the first line and doesn't insert anything into MongoDB. The code I have now is below.

  • Any ideas on how I can make it work?
  • Is it good to insert millions of objects one by one into MongoDB instead of using insertMany?

I created a schema and model in Mongoose, then created a read stream that converts the CSV file into objects and inserts each one into MongoDB:

const tempSchema = new mongoose.Schema({}, { strict: false });
const TempModel = mongoose.model('tempCollection', tempSchema);

fs.createReadStream(req.file.path)
  .pipe(csv())
  .on('data', (data) => {
    TempModel.insertOne(JSON.parse(data));
  })
  .on('end', () => {
    console.log('finished');
  });


Solution 1:[1]

The snippet can be restructured to use stream pipes to control the data flow down to the MongoDB write operations. This will avoid the memory issue and provide a means to batch operations together.

A somewhat complete pseudocode example:

import fs from "fs";
import util from "util";
import stream from "stream";
import mongoose from "mongoose";
import csv from "csv-parser"; // or whichever parser csv() refers to in the question

const tempSchema = new mongoose.Schema({}, { strict: false });
const TempModel = mongoose.model('tempCollection', tempSchema);

// Promisify waiting for the file to be parsed and stored in MongoDB
await util.promisify(stream.pipeline)(
  fs.createReadStream(req.file.path),
  csv(),
  // Create a Writable to piggyback on the stream's built-in batching logic
  new stream.Writable({
    // Count buffered rows (not bytes), whether they arrive as strings or objects
    objectMode: true,
    // Buffer at most 1000 rows at a time, so the parser is paused instead of
    // loading additional rows into memory while a batch is being written
    highWaterMark: 1000,
    writev: async (chunks, next) => {
      try {
        // Bulk write the buffered rows to MongoDB in one round trip.
        // JSON.parse assumes csv() emits JSON strings, as in the question;
        // if it emits plain objects, use the chunk directly instead.
        await TempModel.bulkWrite(
          chunks.map(({ chunk }) => ({
            insertOne: { document: JSON.parse(chunk) }
          })),
          { ordered: false }
        );
        // Signal completion so the next batch can be flushed
        next();
      }
      catch (error) {
        // Propagate errors to the pipeline
        next(error);
      }
    }
  })
);

console.log('finished');
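
On the second question: inserting millions of documents one at a time means millions of round trips to the database, so grouping rows into batches (insertMany or bulkWrite) is generally the better choice. For reference, the same batching idea can also be written with async iteration over the parsed stream, which pauses the parser while each batch is awaited. This is only a sketch; it assumes csv() yields one row object per iteration (as csv-parser does) and that a Mongoose connection is already open:

import fs from "fs";
import csv from "csv-parser"; // assumed CSV parser; any row-per-chunk parser works
import mongoose from "mongoose";

const TempModel = mongoose.model(
  'tempCollection',
  new mongoose.Schema({}, { strict: false })
);

const BATCH_SIZE = 1000;

async function importCsv(path) {
  let batch = [];

  // Readable streams are async iterable; awaiting inside the loop applies
  // backpressure, so rows are only read as fast as they are stored
  for await (const row of fs.createReadStream(path).pipe(csv())) {
    batch.push(row); // add JSON.parse(row) here if csv() emits strings instead
    if (batch.length >= BATCH_SIZE) {
      await TempModel.insertMany(batch, { ordered: false });
      batch = [];
    }
  }

  // Flush the final, partially filled batch
  if (batch.length > 0) {
    await TempModel.insertMany(batch, { ordered: false });
  }

  console.log('finished');
}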

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution 1: jorgenkg