Dan*_* W. 13 reconnect mongodb node.js node-mongodb-native
我们读取了一个xml-stream包含大约500k元素的XML文件(使用),并将它们插入到MongoDB中,如下所示:
xml.on(`endElement: product`, writeDataToDb.bind(this, "product"));
Run Code Online (Sandbox Code Playgroud)
插入writeDataToDb(type, obj)如下:
collection.insertOne(obj, {w: 1, wtimeout: 15000}).catch((e) => { });
Run Code Online (Sandbox Code Playgroud)
现在当Mongo连接断开连接时,xml流仍然会读取并且控制台充满了错误消息(无法插入,断开连接,EPIPE损坏,......).
在文档中它说:
当你关闭mongod进程时,驱动程序停止处理操作并继续缓冲它们,因为bufferMaxEntries默认为-1意味着缓冲所有操作.
这个缓冲区实际上做了什么?
我们注意到当我们插入数据并关闭mongo服务器时,事情得到缓冲,然后我们将mongo服务器重新启动,本机驱动程序成功重新连接,节点恢复插入数据但缓存的文件(在mongo beeing offline期间)不插入再次.
所以我质疑这个缓冲区及其用途.
目标:
我们正在寻找最好的方式(根据15000milliseconds保持刀片在缓冲区中,直到蒙戈回来wtimeout),并让然后将缓冲的文件或利用xml.pause();和xml.resume()我们尝试没有成功.
基本上我们需要一些帮助来处理如何在没有数据丢失或中断的情况下处理断开连接.
我不太了解 Mongodb 驱动程序和这个条目缓冲区。也许它只保留特定场景下的数据。
因此,我将用一种可以适用于任何数据库的更通用的方法来回答这个问题。
总而言之,您有两个问题:
要解决第一个问题,您需要实现一个重试算法,以确保在放弃之前进行多次尝试。
要解决第二个问题,您需要对 xml 流实施反压。您可以使用pause方法、resume方法和输入缓冲区来做到这一点。
var Promise = require('bluebird');
var fs = require('fs');
var Xml = require('xml-stream');
var fileStream = fs.createReadStream('myFile.xml');
var xml = new Xml(fileStream);
// simple exponential retry algorithm based on promises
function exponentialRetry(task, initialDelay, maxDelay, maxRetry) {
var delay = initialDelay;
var retry = 0;
var closure = function() {
return task().catch(function(error) {
retry++;
if (retry > maxRetry) {
throw error
}
var promise = Promise.delay(delay).then(closure);
delay = Math.min(delay * 2, maxDelay);
return promise;
})
};
return closure();
}
var maxPressure = 100;
var currentPressure = 0;
var suspended = false;
var stopped = false;
var buffer = [];
// handle back pressure by storing incoming tasks in the buffer
// pause the xml stream as soon as we have enough tasks to work on
// resume it when the buffer is empty
function writeXmlDataWithBackPressure(product) {
// closure used to try to start a task
var tryStartTask = function() {
// if we have enough tasks running, pause the xml stream
if (!stopped && !suspended && currentPressure >= maxPressure) {
xml.pause();
suspended = true;
console.log("stream paused");
}
// if we have room to run tasks
if (currentPressure < maxPressure) {
// if we have a buffered task, start it
// if not, resume the xml stream
if (buffer.length > 0) {
buffer.shift()();
} else if (!stopped) {
try {
xml.resume();
suspended = false;
console.log("stream resumed");
} catch (e) {
// the only way to know if you've reached the end of the stream
// xml.on('end') can be triggered BEFORE all handlers are called
// probably a bug of xml-stream
stopped = true;
console.log("stream end");
}
}
}
};
// push the task to the buffer
buffer.push(function() {
currentPressure++;
// use exponential retry to ensure we will try this operation 100 times before giving up
exponentialRetry(function() {
return writeDataToDb(product)
}, 100, 2000, 100).finally(function() {
currentPressure--;
// a task has just finished, let's try to run a new one
tryStartTask();
});
});
// we've just buffered a task, let's try to run it
tryStartTask();
}
// write the product to database here :)
function writeDataToDb(product) {
// the following code is here to create random delays and random failures (just for testing)
var timeToWrite = Math.random() * 100;
var failure = Math.random() > 0.5;
return Promise.delay(timeToWrite).then(function() {
if (failure) {
throw new Error();
}
return null;
})
}
xml.on('endElement: product', writeXmlDataWithBackPressure);
Run Code Online (Sandbox Code Playgroud)
玩玩它,放一些东西console.log来了解它的行为方式。我希望这能帮助您解决您的问题:)
| 归档时间: |
|
| 查看次数: |
2249 次 |
| 最近记录: |