Buffering data from a stream in NodeJS to perform bulk inserts

dbr*_*rrt 4 javascript buffer stream mongodb node.js

How can I efficiently buffer events coming from a stream in NodeJS so they are written as bulk inserts, instead of issuing a separate insert for every record received from the stream? Here is the pseudocode I have in mind:

// Open MongoDB connection

mystream.on('data', (record) => {
   // buffer the record into an array
   // when the buffer is full (1000 records),
   // bulk insert into MongoDB and empty the buffer
})

mystream.on('end', () => {
   // close connection
})


Does this look realistic? Are there possible optimizations, or existing library facilities that already do this?
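For concreteness, here is a minimal runnable sketch of the manual-buffering approach from the pseudocode above. The `collection` and `client` handles and the error handling are assumptions filled in for illustration; the 1000-record threshold is from the question:

const BATCH_SIZE = 1000;
const buffer = [];

mystream.on('data', (record) => {
  buffer.push(record);
  if (buffer.length >= BATCH_SIZE) {
    // splice(0) empties the buffer and hands the full batch to insertMany
    collection.insertMany(buffer.splice(0), { ordered: false })
      .catch((err) => mystream.destroy(err));
  }
});

mystream.on('end', async () => {
  // flush any records still sitting in the buffer, then close up
  if (buffer.length) {
    await collection.insertMany(buffer.splice(0), { ordered: false });
  }
  await client.close();
});

One caveat: in flowing mode the stream keeps emitting 'data' while an insert is in flight, so this sketch applies no backpressure; the accepted answer below handles that properly.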

jor*_*nkg 5

Using NodeJS's stream library, this can be implemented concisely and efficiently as:

const stream = require('stream');
const util = require('util');
const mongo = require('mongodb');

let streamSource; // a Readable stream of objects, obtained elsewhere

// NB: the awaits below assume an enclosing async function
// (or ESM `import` syntax with top-level await)

// Establish DB connection
const client = new mongo.MongoClient("uri");
await client.connect();

// The specific collection to store our documents
const collection = client.db("my_db").collection("my_collection");

await util.promisify(stream.pipeline)(
  streamSource,
  new stream.Writable({
    objectMode: true,
    highWaterMark: 1000,
    // writev receives all chunks buffered since the last flush,
    // so documents arrive here in batches of up to 1000
    writev: async (chunks, next) => {
      try {
        const documents = chunks.map(({ chunk }) => chunk);

        await collection.insertMany(documents, { ordered: false });

        next();
      }
      catch (error) {
        next(error);
      }
    }
  })
);
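The batching here comes from backpressure rather than manual bookkeeping: while one insertMany is awaited, incoming objects queue inside the Writable (up to the highWaterMark of 1000), and the next writev call receives that whole queue as a single batch. The pipeline promise resolves once streamSource has ended and every batch has flushed, so cleanup can follow the await. On Node 15+, a promise-based pipeline is also available directly from stream/promises, avoiding util.promisify; a minor variant of the same code, where batchWriter stands in for the Writable defined above:

const { pipeline } = require('stream/promises');

await pipeline(streamSource, batchWriter);
await client.close(); // all batches flushed; safe to close the connection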