Updating many (100k+) documents in MongoDB in the most efficient way

fil*_*rak 2 javascript mongoose mongodb node.js promise

I have a function that runs periodically and updates the item.price of some Documents in my Prices Collection. There will be 100k+ items. The function looks like this:

 //Just a helper function for multiple GET requests with request.
let _request = (urls, cb) => {
    let results = {}, i = urls.length, c = 0;
    const handler = (err, response, body) => {
        let url = response.request.uri.href;
        results[url] = { err, response, body };

        if (++c === urls.length) {
            cb(results);
        }
    };
    while (i--) {
        request(urls[i], handler);
    }
};
// function to update the prices in our Prices collection.

const update = (cb) => {
    Price.remove({}, (err, remove) => {
        if (err) {
            return logger.error(`Error removing items...`);
        }
        logger.info(`Removed all items... Beginning to update.`);
        _request(urls, (responses) => {
            let url, response, id;

            for (url in responses) {
                id = url.split('/')[5].split('?')[0];
                response = responses[url];

                if (response.err) {
                    logger.error(`Error in request to ${url}: ${response.err}`);
                    continue; // skip this URL, but keep processing the rest
                }

                if (response.body) {
                    logger.info(`Request to ${url} successful.`);
                    let jsonResult = {};
                    try {
                        jsonResult = JSON.parse(response.body);
                    } catch (e) {
                        logger.error(`Could not parse response for ${id}: ${e}`);
                    }

                    logger.info(`Response body for ${id} is ${Object.keys(jsonResult).length}.`);
                    let allItemsArray = Object.keys(jsonResult).map((key, index) => {
                        return {
                            itemid: id,
                            hash_name: key,
                            price: jsonResult[key]
                        }
                    });

                    Price.insertMany(allItemsArray).then(docs => {
                        logger.info(`Saved docs for ${id}`)
                    }, (e) => {
                        logger.error(`Error saving docs: ${e}`);
                    });

                }
            }
            if (cb && typeof cb == 'function') {
                cb();
            }
        })
    });

}

As you can see, in order to avoid iterating over 100k+ documents and updating each of them separately, I remove them all at the start, then call the API that gives me the prices of those items, and use insertMany to insert all of them into my Prices Collection.

This update process will happen every 30 minutes.
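The scheduling itself is just a timer; a minimal sketch (the setInterval wiring below is illustrative, not part of the actual code):

// Illustrative only: re-run the update every 30 minutes.
// Assumes the `update` function and `logger` from above.
setInterval(() => {
    update(() => logger.info(`Price update finished.`));
}, 30 * 60 * 1000);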

But I just realized: what if some user wants to check the prices while my Prices Collection is currently empty, because it is in the middle of updating itself?

The Question

So do I have to iterate over all of the documents and update them in place, so that I never delete them? (Keep in mind that there are a lot of documents to update every 30 minutes.) Or is there another solution?

Here is a picture of what my Prices Collection looks like (there are ~100k documents like this; I just want to update the price property):

[image: Prices Collection]

UPDATE:

I have rewritten my update function, and now it looks like this:

const update = (cb = null) => {
    Price.remove({}, (err, remove) => {
        if (err) {
            return logger.error(`Error removing items...`);
        }
        logger.info(`Removed all items... Beginning to update.`);
        _request(urls, (responses) => {
            let url, response, gameid;

            for (url in responses) {
                gameid = url.split('/')[5].split('?')[0];
                response = responses[url];

                if (response.err) {
                    logger.error(`Error in request to ${url}: ${response.err}`);
                    continue; // skip this URL, but keep processing the rest
                }

                if (response.body) {
                    logger.info(`Request to ${url} successful.`);
                    let jsonResult = {};
                    try {
                        jsonResult = JSON.parse(response.body);
                    } catch (e) {
                        logger.error(`Could not parse response for ${gameid}: ${e}`);
                    }

                    logger.info(`Response body for ${gameid} is ${Object.keys(jsonResult).length}.`);
                    let allItemsArray = Object.keys(jsonResult).map((key, index) => {
                        return {
                            game_id: gameid,
                            market_hash_name: key,
                            price: jsonResult[key]
                        }
                    });
                    let bulk = Price.collection.initializeUnorderedBulkOp();

                    allItemsArray.forEach(item => {
                        bulk.find({market_hash_name: item.market_hash_name})
                            .upsert().updateOne(item);
                    });
                    bulk.execute((err, bulkers) => {
                        if (err) {
                            return logger.error(`Error bulking: ${err}`);
                        }
                        logger.info(`Updated Items for ${gameid}`)
                    });

                    // Price.insertMany(allItemsArray).then(docs => {
                    //     logger.info(`Saved docs for ${gameid}`)
                    // }, (e) => {
                    //     logger.error(`Error saving docs.`);
                    // });

                }
            }
            if (cb && typeof cb == 'function') {
                cb();
            }
        })
    });

}

Notice the bulk variable now (thanks @Rahul), but now the collection takes a very long time to update. My processor is burning up, and it takes 3+ minutes to update 60k+ documents. I honestly feel like the previous method, even though it deletes all of the documents and then re-inserts them, was about 10x faster.
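One likely culprit for the slowdown: the upserts filter on market_hash_name, and if that field has no index, every one of the 60k+ operations scans the whole collection. A hedged sketch of the indexed variant (the createIndex call and the $set wrapper are additions, not part of the code above):

// Assumes the same `Price` model and `allItemsArray` as above.
// Index the upsert filter field once; without it, every bulk op
// performs a full collection scan.
Price.collection.createIndex({ market_hash_name: 1 }, (err) => {
    if (err) {
        return logger.error(`Error creating index: ${err}`);
    }
    let bulk = Price.collection.initializeUnorderedBulkOp();
    allItemsArray.forEach(item => {
        bulk.find({ market_hash_name: item.market_hash_name })
            .upsert()
            .updateOne({ $set: item }); // update fields instead of replacing the doc
    });
    bulk.execute((err) => {
        if (err) {
            return logger.error(`Error bulking: ${err}`);
        }
        logger.info(`Updated items.`);
    });
});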

Anyone?

geo*_*org 5

From my experience (updating millions of mongo docs on an hourly basis), here's a realistic approach to very large bulk updates:

  • perform all the API calls separately and write the results to a file as newline-delimited JSON (mongoimport consumes JSON/CSV/TSV, not raw BSON)
  • invoke mongoimport and import that file into a new, empty collection prices_new; Javascript, let alone high-level OO wrappers, is far too slow for this
  • rename prices_new -> prices with dropTarget=true (this is atomic, so there is no downtime)

Schematically, it would look like this in JS:

// Schematic: dump everything to a newline-delimited JSON file,
// bulk-load it with mongoimport, then atomically swap collections.
const fs = require('fs').promises;
const { promisify } = require('util');
const exec = promisify(require('child_process').exec);

let fname = '/tmp/data.json';
let apiUrls = [...];

async function doRequest(url) {
    // perform a request and return an array of records
}

let responses = await Promise.all(apiUrls.map(doRequest));

// if the data is too big to fit in memory, use streams instead of this:
let data = responses.flat().map(JSON.stringify).join('\n');
await fs.writeFile(fname, data);

await exec(`mongoimport --collection prices_new --drop ${fname}`);

// mongo-shell shorthand for the final atomic rename:
await db.prices_new.renameCollection('prices', true);
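The last line is mongo-shell shorthand; through the Node driver, the equivalent rename would look something like this (a sketch, assuming db is a connected Db handle):

// dropTarget: true atomically replaces the old `prices` collection.
await db.collection('prices_new').rename('prices', { dropTarget: true });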