使用mongoose在MongoDB中批量upsert

use*_*267 27 javascript mongoose mongodb node.js mongodb-query

有没有选择用mongoose执行批量upserts?所以基本上有一个数组并插入每个元素,如果它不存在或更新它,如果它存在?(我正在使用海关_ids)

当我使用.insert时, MongoDB会为重复键返回错误E11000(应该更新).插入多个新文档可以正常工作:

var Users = self.db.collection('Users');

Users.insert(data, function(err){
            if (err) {
                callback(err);
            }
            else {
                callback(null);
            }
        });
Run Code Online (Sandbox Code Playgroud)

使用.save会返回一个错误,该参数必须是单个文档:

Users.save(data, function(err){
   ...
}
Run Code Online (Sandbox Code Playgroud)

这个答案表明没有这样的选择,但它具体针对C#而且已经有3年了.所以我想知道是否有任何选择使用猫鼬做到这一点?

谢谢!

Nei*_*unn 22

特别是在"猫鼬"中,或者至少在编写时没有.从2.6版本开始,MongoDB shell实际上使用了"Bulk operations API ",因为它适用于所有常规帮助方法.在它的实现中,它首先尝试执行此操作,如果检测到旧版本服务器,则遗留实现存在"后备".

所有mongoose方法"当前"使用"遗留"实现或写入关注响应和基本遗留方法.但是.collection,任何给定的mongoose模型都有一个访问器,它基本上从底层的"节点本机驱动程序"访问"集合对象",其中实现了mongoose:

 var mongoose = require('mongoose'),
     Schema = mongoose.Schema;

 mongoose.connect('mongodb://localhost/test');

 var sampleSchema  = new Schema({},{ "strict": false });

 var Sample = mongoose.model( "Sample", sampleSchema, "sample" );

 mongoose.connection.on("open", function(err,conn) { 

    var bulk = Sample.collection.initializeOrderedBulkOp();
    var counter = 0;

    // representing a long loop
    for ( var x = 0; x < 100000; x++ ) {

        bulk.find(/* some search */).upsert().updateOne(
            /* update conditions */
        });
        counter++;

        if ( counter % 1000 == 0 )
            bulk.execute(function(err,result) {             
                bulk = Sample.collection.initializeOrderedBulkOp();
            });
    }

    if ( counter % 1000 != 0 )
        bulk.execute(function(err,result) {
           // maybe do something with result
        });

 });
Run Code Online (Sandbox Code Playgroud)

主要的问题是"mongoose方法"实际上意识到实际上还没有建立连接并且"排队"直到完成.你正在"挖掘"的本地驱动程序没有做出这种区分.

所以你必须要知道连接是以某种方式或形式建立的.但是只要你小心你正在做的事情,你就可以使用本机驱动程序方法.

  • @joeytwiddle"批量"操作在调用`.execute()`之前不是异步的.目的是服务器的任何"来回"都会在IO中花费,因此您正在尝试将其最小化.确实,在同步循环中,您可能会多次发生`.execute()`并使用多个连接.但是您可以使用[`async.whilst`](https://github.com/caolan/async#whilst)或其他控件来改变它,其中迭代可以通过回调来控制(因此在.execute内部( )`)处理完成.这对承诺来说有点难,但仍有可能. (2认同)

kon*_*mer 18

您不需要像@ neil-lunn建议的那样管理限制(1000).Mongoose已经这样做了.我用他的好答案作为完整的基于Promise的实现和示例的基础:

var Promise = require('bluebird');
var mongoose = require('mongoose');

var Show = mongoose.model('Show', {
  "id": Number,
  "title": String,
  "provider":  {'type':String, 'default':'eztv'}
});

/**
 * Atomic connect Promise - not sure if I need this, might be in mongoose already..
 * @return {Priomise}
 */
function connect(uri, options){
  return new Promise(function(resolve, reject){
    mongoose.connect(uri, options, function(err){
      if (err) return reject(err);
      resolve(mongoose.connection);
    });
  });
}

/**
 * Bulk-upsert an array of records
 * @param  {Array}    records  List of records to update
 * @param  {Model}    Model    Mongoose model to update
 * @param  {Object}   match    Database field to match
 * @return {Promise}  always resolves a BulkWriteResult
 */
function save(records, Model, match){
  match = match || 'id';
  return new Promise(function(resolve, reject){
    var bulk = Model.collection.initializeUnorderedBulkOp();
    records.forEach(function(record){
      var query = {};
      query[match] = record[match];
      bulk.find(query).upsert().updateOne( record );
    });
    bulk.execute(function(err, bulkres){
        if (err) return reject(err);
        resolve(bulkres);
    });
  });
}

/**
 * Map function for EZTV-to-Show
 * @param  {Object} show EZTV show
 * @return {Object}      Mongoose Show object
 */
function mapEZ(show){
  return {
    title: show.title,
    id: Number(show.id),
    provider: 'eztv'
  };
}

// if you are  not using EZTV, put shows in here
var shows = []; // giant array of {id: X, title: "X"}

// var eztv = require('eztv');
// eztv.getShows({}, function(err, shows){
//   if(err) return console.log('EZ Error:', err);

//   var shows = shows.map(mapEZ);
  console.log('found', shows.length, 'shows.');
  connect('mongodb://localhost/tv', {}).then(function(db){
    save(shows, Show).then(function(bulkRes){
      console.log('Bulk complete.', bulkRes);
      db.close();
    }, function(err){
        console.log('Bulk Error:', err);
        db.close();
    });
  }, function(err){
    console.log('DB Error:', err);
  });

// });
Run Code Online (Sandbox Code Playgroud)

这有助于在完成连接时关闭连接,如果你关心则显示任何错误,但如果没有则忽略它们(Promises中的错误回调是可选的.)它也非常快.离开这里分享我的发现.例如,如果要将所有eztv节目保存到数据库,可以取消注释eztv的内容.

  • @ECMAScript事实上,Neil和konsumer的建议都消耗了相似数量的Node内存,因为这两种技术都在不等待Mongo响应的情况下继续创建文档.显然,如果您打算插入的文档多于RAM中的文档,这只是一个问题. (2认同)

Mir*_*ili 6

await Model.bulkWrite(docs.map(doc => ({
    updateOne: {
        filter: {id: doc.id},
        update: doc,
        upsert: true
    }
})))

Run Code Online (Sandbox Code Playgroud)

或者更详细:

const bulkOps = docs.map(doc => ({
    updateOne: {
        filter: {id: doc.id},
        update: doc,
        upsert: true
    }
}))

Model.bulkWrite(bulkOps)
        .then(bulkWriteOpResult => console.log('BULK update OK:', bulkWriteOpResult))
        .catch(err => console.error('BULK update error:', err))
Run Code Online (Sandbox Code Playgroud)

/sf/answers/4223111301/


Ada*_*eis 5

我已经为 Mongoose 发布了一个插件,它公开了一个静态upsertMany方法来执行带有承诺接口的批量 upsert 操作。

使用这个插件而不是在底层集合上初始化你自己的批量操作的另一个好处是,这个插件首先将你的数据转换为 Mongoose 模型的数据,然后在 upsert 之前转换回普通对象。这可确保应用 Mongoose 模式验证,并减少数据填充并适合原始插入。

https://github.com/meanie/mongoose-upsert-many https://www.npmjs.com/package/@meanie/mongoose-upsert-many

希望能帮助到你!