I have a function that runs periodically and updates the item.price of some Documents in my Prices Collection. There will be 100k+ items. The function looks like this:
// Just a helper function for multiple GET requests with request.
// (assumes `const request = require('request');` at the top of the file)
let _request = (urls, cb) => {
    let results = {}, i = urls.length, c = 0;
    let handler = (err, response, body) => {
        let url = response.request.uri.href;
        results[url] = { err, response, body };
        if (++c === urls.length) {
            cb(results);
        }
    };
    while (i--) {
        request(urls[i], handler);
    }
};
// Function to update the prices in our Prices collection.
const update = (cb) => {
    Price.remove({}, (err, remove) => {
        if (err) {
            return logger.error(`Error removing items...`);
        }
        logger.info(`Removed all items... Beginning to update.`);
        _request(urls, (responses) => {
            let url, response, id;
            for (url in responses) {
                id = url.split('/')[5].split('?')[0];
                response = responses[url];
                if (response.err) {
                    logger.error(`Error in request to ${url}: ${response.err}`);
                    return;
                }
                if (response.body) {
                    logger.info(`Request to ${url} successful.`);
                    let jsonResult = {};
                    try {
                        jsonResult = JSON.parse(response.body);
                    } catch (e) {
                        logger.error(`Could not parse.`);
                    }
                    logger.info(`Response body for ${id} is ${Object.keys(jsonResult).length}.`);
                    let allItemsArray = Object.keys(jsonResult).map((key, index) => {
                        return {
                            itemid: id,
                            hash_name: key,
                            price: jsonResult[key]
                        };
                    });
                    Price.insertMany(allItemsArray).then(docs => {
                        logger.info(`Saved docs for ${id}`);
                    }, (e) => {
                        logger.error(`Error saving docs.`);
                    });
                }
            }
            if (cb && typeof cb === 'function') {
                cb();
            }
        });
    });
};
As you can see, to avoid iterating over 100k+ documents and updating each of them individually, I delete them all at the start, then call the API that gives me the prices of these items, and use insertMany to insert all of them into my Prices Collection.
This update process will happen every 30 minutes.
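The question does not show how update() is actually scheduled; purely as an illustration, a minimal sketch of the 30-minute cycle, assuming a plain setInterval and the same logger used above, could look like this:

// Hypothetical scheduling code, not part of the original question:
// run the update() function from above every 30 minutes.
const THIRTY_MINUTES = 30 * 60 * 1000;
setInterval(() => {
    update(() => logger.info('Price update cycle finished.'));
}, THIRTY_MINUTES);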
But I just realized: what if some user wants to check the prices while my Prices Collection is empty because it is in the middle of updating itself?
The question
So do I have to iterate over all of them instead, so that I never delete them? (Keep in mind that is a lot of documents to update every 30 minutes.) Or is there some other solution?
Here is a picture of what my Prices Collection looks like (there are around 100k documents like this; I just want to update the price property):
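Since the screenshot is not reproduced here, the following is only a rough sketch of what the Price model behind that collection might look like; the field names are taken from the rewritten update function further below, and the types are assumptions:

// Assumed schema, inferred from the field names used in the code below;
// the actual types (e.g. whether price is a String or a Number) are guesses.
const mongoose = require('mongoose');

const priceSchema = new mongoose.Schema({
    game_id: String,
    market_hash_name: String,
    price: String
});

const Price = mongoose.model('Price', priceSchema);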
Update:
I have rewritten my update function and it now looks like this:
const update = (cb = null) => {
    Price.remove({}, (err, remove) => {
        if (err) {
            return logger.error(`Error removing items...`);
        }
        logger.info(`Removed all items... Beginning to update.`);
        _request(urls, (responses) => {
            let url, response, gameid;
            for (url in responses) {
                gameid = url.split('/')[5].split('?')[0];
                response = responses[url];
                if (response.err) {
                    logger.error(`Error in request to ${url}: ${response.err}`);
                    return;
                }
                if (response.body) {
                    logger.info(`Request to ${url} successful.`);
                    let jsonResult = {};
                    try {
                        jsonResult = JSON.parse(response.body);
                    } catch (e) {
                        logger.error(`Could not parse.`);
                    }
                    logger.info(`Response body for ${gameid} is ${Object.keys(jsonResult).length}.`);
                    let allItemsArray = Object.keys(jsonResult).map((key, index) => {
                        return {
                            game_id: gameid,
                            market_hash_name: key,
                            price: jsonResult[key]
                        };
                    });
                    let bulk = Price.collection.initializeUnorderedBulkOp();
                    allItemsArray.forEach(item => {
                        bulk.find({ market_hash_name: item.market_hash_name })
                            .upsert().updateOne(item);
                    });
                    bulk.execute((err, bulkers) => {
                        if (err) {
                            return logger.error(`Error bulking: ${err}`);
                        }
                        logger.info(`Updated Items for ${gameid}`);
                    });
                    // Price.insertMany(allItemsArray).then(docs => {
                    //     logger.info(`Saved docs for ${gameid}`)
                    // }, (e) => {
                    //     logger.error(`Error saving docs.`);
                    // });
                }
            }
            if (cb && typeof cb === 'function') {
                cb();
            }
        });
    });
};
Notice the bulk variable now (thanks @Rahul), but the collection now takes a very long time to update. My processor is burning up and it takes more than 3 minutes to update 60k+ documents. I honestly feel like the previous method, even though it deletes all of them and then re-inserts them, is also about 10 times faster.
Anyone?
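For reference only (this is not the asker's code): the same upserts can also be expressed with Mongoose's Model.bulkWrite and flushed in fixed-size chunks. The chunk size is arbitrary, and the sketch assumes an index on market_hash_name, without which every upsert filter has to scan the whole collection; it is a sketch of that variant, not a measured fix for the slowness:

// Sketch: chunked bulkWrite upserts. Assumes `Price` and `allItemsArray`
// from the code above, plus an index on market_hash_name.
const CHUNK_SIZE = 1000; // arbitrary

async function upsertPrices(allItemsArray) {
    for (let i = 0; i < allItemsArray.length; i += CHUNK_SIZE) {
        const chunk = allItemsArray.slice(i, i + CHUNK_SIZE);
        await Price.bulkWrite(
            chunk.map(item => ({
                updateOne: {
                    filter: { market_hash_name: item.market_hash_name },
                    update: { $set: item },
                    upsert: true
                }
            })),
            { ordered: false }
        );
    }
}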
In my experience (updating millions of mongo documents per hour), the following is a realistic approach for really large bulk updates:

- dump the fresh price data to a bson file (the sketch below writes it to /tmp/data.bson);
- mongoimport that bson file into a new, empty collection prices_new (JavaScript, let alone high-level OO wrappers, is too slow for this step);
- renameCollection prices_new -> prices with dropTarget=true (this is atomic, so there is no downtime).

In principle, it looks like this in JS:
// Schematic only: fs/child_process are promisified, flatMap is lodash's, BSON is the `bson` package.
const fs = require('fs').promises;
const exec = require('util').promisify(require('child_process').exec);
const BSON = require('bson');
const flatMap = require('lodash/flatMap');

let fname = '/tmp/data.bson';
let apiUrls = [...];

async function doRequest(url) {
    // perform a request and return an array of records
}

let responses = await Promise.all(apiUrls.map(doRequest));
// if the data is too big to fit in memory, use streams instead of this:
let data = flatMap(responses, BSON.serialize).join('\n');
await fs.writeFile(fname, data);
await exec(`mongoimport --collection prices_new --drop ${fname}`);
await db.prices_new.renameCollection('prices', true); // mongo-shell style rename
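The last line above is mongo-shell syntax; a hedged sketch of the same atomic rename through the Node.js MongoDB driver, assuming an already-connected MongoClient named client, would be:

// `client` is an assumed, already-connected MongoClient instance.
const db = client.db();
await db.collection('prices_new').rename('prices', { dropTarget: true });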