mongodb将文档从一个集合移动到另一个集合

man*_*jpt 47 mongodb

如何文档可以从一个集合移动到另一个集合MongoDB中?例如:我在集合A中有很多文档,我想将所有1个月的旧文档移动到集合B(这些1个月的旧文档不应该在集合A中).

使用聚合我们可以复制.但我想要做的是移动文件.可以用什么方法移动文件?

Mar*_*erg 59

更新2

请不要再回答这个问题了.正如写的@ jasongarber的答案在任何方面都更好.

更新

@jasongarber的答案是一种更安全的方法,应该用来代替我的.


如果我找到了你想要移动超过1个月的所有文档,并且你使用mongoDB 2.6,则没有理由不使用批量操作,这是我所知道的多种操作的最有效方式:

> var bulkInsert = db.target.initializeUnorderedBulkOp()
> var bulkRemove = db.source.initializeUnorderedBulkOp()
> var date = new Date()
> date.setMonth(date.getMonth() -1)
> db.source.find({"yourDateField":{$lt: date}}).forEach(
    function(doc){
      bulkInsert.insert(doc);
      bulkRemove.find({_id:doc._id}).removeOne();
    }
  )
> bulkInsert.execute()
> bulkRemove.execute()
Run Code Online (Sandbox Code Playgroud)

这应该非常快,并且它具有以下优点:如果在批量插入期间出现问题,原始数据仍然存在.


编辑

为了防止使用太多内存,您可以对每个x处理的文档执行批量操作:

> var bulkInsert = db.target.initializeUnorderedBulkOp()
> var bulkRemove = db.source.initializeUnorderedBulkOp()
> var x = 10000
> var counter = 0
> var date = new Date()
> date.setMonth(date.getMonth() -1)
> db.source.find({"yourDateField":{$lt: date}}).forEach(
    function(doc){
      bulkInsert.insert(doc);
      bulkRemove.find({_id:doc._id}).removeOne();
      counter ++
      if( counter % x == 0){
        bulkInsert.execute()
        bulkRemove.execute()
        bulkInsert = db.target.initializeUnorderedBulkOp()
        bulkRemove = db.source.initializeUnorderedBulkOp()
      }
    }
  )
> bulkInsert.execute()
> bulkRemove.execute()
Run Code Online (Sandbox Code Playgroud)

  • @Arthur:你的方法有两个主要缺点.***慢***在最糟糕的情况下,您可能会有不完整的集合难以再次同步. (3认同)

jas*_*ber 54

批量操作@ markus-w-mahlberg显示(和@ mark-mullin精炼)有效但不安全.如果bulkInsert失败,bulkRemove仍将继续.为了确保您在移动时不丢失任何记录,请使用以下内容:

function insertBatch(collection, documents) {
  var bulkInsert = collection.initializeUnorderedBulkOp();
  var insertedIds = [];
  var id;
  documents.forEach(function(doc) {
    id = doc._id;
    // Insert without raising an error for duplicates
    bulkInsert.find({_id: id}).upsert().replaceOne(doc);
    insertedIds.push(id);
  });
  bulkInsert.execute();
  return insertedIds;
}

function deleteBatch(collection, documents) {
  var bulkRemove = collection.initializeUnorderedBulkOp();
  documents.forEach(function(doc) {
    bulkRemove.find({_id: doc._id}).removeOne();
  });
  bulkRemove.execute();
}

function moveDocuments(sourceCollection, targetCollection, filter, batchSize) {
  print("Moving " + sourceCollection.find(filter).count() + " documents from " + sourceCollection + " to " + targetCollection);
  var count;
  while ((count = sourceCollection.find(filter).count()) > 0) {
    print(count + " documents remaining");
    sourceDocs = sourceCollection.find(filter).limit(batchSize);
    idsOfCopiedDocs = insertBatch(targetCollection, sourceDocs);

    targetDocs = targetCollection.find({_id: {$in: idsOfCopiedDocs}});
    deleteBatch(sourceCollection, targetDocs);
  }
  print("Done!")
}
Run Code Online (Sandbox Code Playgroud)

  • 你能提供一个运行 moveDocuments 的例子吗? (5认同)
  • 你是绝对正确的!*捂脸* (3认同)

小智 12

插入和删除:

var documentsToMove = db.collectionA.find({});
documentsToMove.forEach(function(doc) {
    db.collectionB.insert(doc);
    db.collectionA.remove(doc);
});
Run Code Online (Sandbox Code Playgroud)

注意:对于拥有大型文档的大型集合或集合,此方法可能非常慢.

  • 最后一行应该是`});`而不仅仅是`}`.缺少结束括号. (5认同)
  • 这不是原子的,有可能在集合B中插入一些东西,而不是从A中删除. (2认同)

kar*_*thi 6

$ out用于创建具有数据的新集合,因此请使用$ out

db.oldCollection.aggregate([{$out : "newCollection"}])
Run Code Online (Sandbox Code Playgroud)

然后使用drop

db.oldCollection.drop()
Run Code Online (Sandbox Code Playgroud)

  • 请记住,如果该名称已存在,这将覆盖整个集合(而不是附加旧集合中的匹配文档)! (3认同)

Nin*_*nad 5

您可以使用范围查询从 sourceCollection 获取数据并将游标数据保留在变量中并在其上循环并插入到目标集合:

 var doc = db.sourceCollection.find({
        "Timestamp":{
              $gte:ISODate("2014-09-01T00:00:00Z"),
              $lt:ISODate("2014-10-01T00:00:00Z")
        }
 });

 doc.forEach(function(doc){
    db.targetCollection.insert(doc);
 })
Run Code Online (Sandbox Code Playgroud)

希望有帮助!


Mat*_*lls 5

这是对 @jasongarber 答案的更新,它使用了最新的 mongo 'bulkWrite' 操作(请在此处阅读文档),并且还保持整个过程异步,以便您可以将其作为更广泛脚本的一部分运行,这取决于其完成情况。

async function moveDocuments (sourceCollection, targetCollection, filter) {
  const sourceDocs = await sourceCollection.find(filter)

  console.log(`Moving ${await sourceDocs.count()} documents from ${sourceCollection.collectionName} to ${targetCollection.collectionName}`)

  const idsOfCopiedDocs = await insertDocuments(targetCollection, sourceDocs)

  const targetDocs = await targetCollection.find({_id: {$in: idsOfCopiedDocs}})
  await deleteDocuments(sourceCollection, targetDocs)

  console.log('Done!')
}

async function insertDocuments (collection, documents) {
  const insertedIds = []
  const bulkWrites = []

  await documents.forEach(doc => {
    const {_id} = doc

    insertedIds.push(_id)
    bulkWrites.push({
      replaceOne: {
        filter: {_id},
        replacement: doc,
        upsert: true,
      },
    })
  })

  if (bulkWrites.length) await collection.bulkWrite(bulkWrites, {ordered: false})

  return insertedIds
}

async function deleteDocuments (collection, documents) {
  const bulkWrites = []

  await documents.forEach(({_id}) => {
    bulkWrites.push({
      deleteOne: {
        filter: {_id},
      },
    })
  })

  if (bulkWrites.length) await collection.bulkWrite(bulkWrites, {ordered: false})
}
Run Code Online (Sandbox Code Playgroud)


Isu*_*ghe 5

第一个选项(使用 mongo dump)

1.从集合中获取转储

mongodump -d db -c 源集合

2.从集合中恢复

mongorestore -d db -c target_collection dir=dump/db_name/source_collection.bson

第二个选项

运行聚合

db.getCollection('source_collection').aggregate([ { $match: {"emailAddress" : "apitester@mailinator.com"} }, { $out: "target_collection" } ])

第三种选择(最慢)

运行一个for循环

db.getCollection('source_collection').find().forEach(function(docs){ db.getCollection('target_collection').insert(docs); }) print("回滚完成!");