如何使用异步I/O将实时数据集写入磁盘?

jfr*_*d00 13 javascript asynchronous node.js promise raspberry-pi

我是在node.js中开发的新手(尽管在客户端javascript上比较有经验)并且在处理node.js中的异步操作时遇到了很多关于良好实践的问题.

我的具体问题(虽然我认为这是一个相当通用的主题)是我有一个node.js应用程序(在Raspberry Pi上运行),它将每10秒钟从几个温度探测器读取的数据记录到内存数据结构中.这很好用.数据随着时间的推移累积在内存中,并且当它累积并达到特定大小阈值时,数据会定期老化(仅保留最后N天的数据),以防止数据超出一定的大小.此温度数据用于控制其他一些设备.

然后,我有一个单独的间隔计时器,每隔一段时间就将这些数据写入磁盘(如果进程崩溃,则保留它).我使用的是异步的Node.js( fs.open(),fs.write()fs.close())磁盘IO将数据写入到磁盘中.

并且,由于磁盘IO的异步特性,我发现我正在尝试写入磁盘的数据结构可能会在我写入磁盘的过程中被修改.这可能是一件坏事.如果数据仅在写入磁盘时附加到数据结构,这实际上不会导致我写入数据的方式出现问题,但在某些情况下,可以在记录新数据时修改早期数据那真的会让我在写入磁盘的过程中完整无缺.

我可以想到我可以在我的代码中添加的各种丑陋的保护措施,例如:

  1. 切换到同步IO将数据写入磁盘(出于服务器响应的原因,实际上不希望这样做).
  2. 当我开始写入数据时设置一个标志,并且在设置该标志时不记录任何新数据(导致我在写入期间丢失数据记录).
  3. 选项2的更复杂版本,其中我设置了标志,当设置标志时,新数据进入一个单独的临时数据结构,当文件IO完成时,然后与真实数据合并(可行,但看起来很难看).
  4. 获取原始数据的快照副本,并花时间将该副本写入磁盘,因为知道没有其他人会修改副本.我不想这样做,因为数据集相对较大,而且我的内存环境有限(Raspberry PI).

所以,我的问题是当其他操作可能想要在异步IO期间修改该数据时,使用异步IO编写大型数据集的设计模式是什么?处理我的问题的通用方法是否比上面列出的具体解决方法更多?

Mik*_*e S 6

您的问题是数据同步.传统上这是通过锁/互斥锁来解决的,但是javascript/node实际上并没有像内置的那样.

那么,我们如何在节点中解决这个问题呢?我们使用队列.就个人而言,我使用异步模块中队列功能.

队列的工作方式是保留一个需要执行的任务列表,并按照它们添加到队列中的顺序执行这些任务,一旦上一个任务完成(类似于您的选项3).

队列动画

注意:异步模块的队列方法实际上可以同时运行多个任务(如上面的动画所示),但是,由于我们在这里讨论数据同步,我们不希望这样.幸运的是,我们可以告诉它一次只运行一个.

在您的特定情况下,您要做的是设置一个可以执行两种类型任务的队列:

  1. 修改您的数据结构
  2. 将数据结构写入磁盘

每当从温度探测器获取新数据时,将任务添加到队列中以使用该新数据修改数据结构.然后,每当间隔计时器触发时,将任务添加到将数据结构写入磁盘的队列中.

由于队列一次只运行一个任务,按照它们被添加到队列的顺序,它保证在您将数据写入磁盘时永远不会修改内存中的数据结构.

一个非常简单的实现可能看起来像:

var dataQueue = async.queue(function(task, callback) {
    if (task.type === "newData") {
        memoryStore.add(task.data); // modify your data structure however you do it now
        callback(); // let the queue know the task is done; you can pass an error here as usual if needed
    } else if (task.type === "writeData") {
        fs.writeFile(task.filename, JSON.stringify(memoryStore), function(err) {
            // error handling
            callback(err); // let the queue know the task is done
        })
    } else {
        callback(new Error("Unknown Task")); // just in case we get a task we don't know about
    }
}, 1); // The 1 here is setting the concurrency of the queue so that it will only run one task at a time

// call when you get new probe data
funcion addNewData(data) {
    dataQueue.push({task: "newData", data: data}, function(err) {
        // called when the task is complete; optional
    });
}

// write to disk every 5 minutes
setInterval(function() {
    dataQueue.push({task: "writeData", filename: "somefile.dat"}, function(err) {
        // called when the task is complete; optional
    });
}, 18000);
Run Code Online (Sandbox Code Playgroud)

另请注意,您现在可以异步地将数据添加到数据结构中.假设您添加一个新探测器,只要其值发生变化就会触发该事件.您可以addNewData(data)像对待现有探测器一样工作,而不用担心它与正在进行的修改或磁盘写入冲突(如果您开始写入数据库而不是内存数据存储,这真的会发挥作用).


更新:使用更优雅的实现bind()

这个想法是你bind()用来绑定一个函数的参数,然后推送bind()返回队列的新绑定函数.这样你就不需要将一些自定义对象推送到它必须解释的队列上; 你可以给它一个函数来调用,所有的设置都已经有正确的参数.唯一需要注意的是函数必须将回调作为其最后一个参数.

这应该允许您使用您拥有的所有现有函数(可能稍作修改),并在需要确保它们不同时运行时将它们推送到队列.

我把它扔在一起测试这个概念:

var async = require('async');

var dataQueue = async.queue(function(task, callback) {
    // task is just a function that takes a callback; call it
    task(callback); 
}, 1); // The 1 here is setting the concurrency of the queue so that it will only run one task at a time

function storeData(data, callback) {
    setTimeout(function() { // simulate async op
        console.log('store', data);
        callback(); // let the queue know the task is done
    }, 50);
}

function writeToDisk(filename, callback) {
    setTimeout(function() { // simulate async op
        console.log('write', filename);
        callback(); // let the queue know the task is done
    }, 250);
}

// store data every second
setInterval(function() {
    var data = {date: Date.now()}
    var boundStoreData = storeData.bind(null, data);
    dataQueue.push(boundStoreData, function(err) {
        console.log('store complete', data.date);
    })
}, 1000)

// write to disk every 2 seconds
setInterval(function() {
    var filename = Date.now() + ".dat"
    var boundWriteToDisk = writeToDisk.bind(null, filename);
    dataQueue.push(boundWriteToDisk, function(err) {
        console.log('write complete', filename);
    });
}, 2000);
Run Code Online (Sandbox Code Playgroud)


Ben*_*aum 6

首先 - 让我们展示一个实用的解决方案然后让我们深入了解它的工作原理和原因:

var chain = Promise.resolve(); // Create a resolved promise
var fs = Promise.promisifyAll(require("fs"));

chain = chain.then(function(){
    return fs.writeAsync(...); // A
});

// some time in the future
chain = chain.then(function(){
    return fs.writeAsync(...); // This will always execute after A is done
})
Run Code Online (Sandbox Code Playgroud)

既然你已经用promises标记了你的问题 - 值得一提的是,promises自己很好地解决了这个(相当复杂的)问题并且非常容易.

您的数据同步问题称为生产者消费者问题.有许多方法可以解决JavaScript中的同步问题 - 这篇由Q的KrisKowal撰写的最新文章很好地解读了这个主题.

输入:承诺

使用promises解决问题的最简单方法是通过单一承诺链接所有内容.我知道你对自己的承诺很有经验,但对于新读者来说,让我们回顾一下:

Promise是对序列本身概念的抽象.承诺是单一(读取离散)行动单位.链接承诺,就像;在某些语言中一样,注意到一个操作的结束和下一个操作的开始.JavaScript中的承诺抽象了两个主要内容 - 采取时间和特殊条件的行动概念.

这里有一个叫做monad的"更高"的抽象,而A +承诺不严格遵守monad法则(为了方便起见),有承诺的实现.承诺抽象出某种处理方式,其中monad抽象处理自身的概念,你可以说承诺是monad或者至少它们是monadic.

承诺一开始是悬而未决意味着它们代表了一个行动已经开始但尚未完成.在某些时候,他们可能会通过解决方案,在此期间他们定居在以下两种状态之一:

  • 如愿以偿 -指示该动作已成功完成.
  • 已拒绝 - 表示操作未成功完成.

一旦承诺得到解决,它就不能再改变其状态.就像你可以继续;下一行一样 - 你可以继续使用.then关键字将一个上一个操作链接到下一个.

解决生产者 - 消费者问题.

生产者/消费者问题的传统解决方案可以通过传统的并发结构(如Dijkstra的信号量)来实现.事实上,这种解决方案存在于承诺或简单的回调中,但我相信我们可以做类似的事情.

相反,我们将保持程序运行,并且每次都向其添加新操作.

var fsQueue = Promise.resolve(); // start a new chain

// one place
fsQueue = fsQueue.then(function(){ // assuming promisified fs here
    return fs.writeAsync(...); 
});

// some other place
fsQueue = fsQueue.then(function(){
    return fs.writeAsync(...);
});
Run Code Online (Sandbox Code Playgroud)

向队列添加操作可确保我们已经订购了同步,并且只有在先前的操作完成后才会执行操作.这是解决此问题的最简单的同步解决方案,需要将fs.asyncFunction调用包装.then到队列中.

另一种解决方案是使用类似于"监视器"的东西 - 我们可以通过包装fs来确保从内部访问是一致的:

var fs = B.promisifyAll(require("fs")); // bluebird promisified fs 
var syncFs = { // sync stands for synchronized, not synchronous
    queue: B.resolve();
    writeAsync = function(){
        var args = arguments
        return (queue = queue.then( // only execute later
            return fs.writeAsync.apply(fs,arguments);
        });
    } // promisify other used functions similarly
};
Run Code Online (Sandbox Code Playgroud)

哪个会生成fs动作的同步版本.也可以使用类似的东西自动化(未测试):

// assumes module is promisified and ignores nested functions
function synchronize(module){
    var ret = {}, queue = B.resolve();
    for(var fn in module){
        ret[fn] = function(){
            var args = arguments;
            queue = queue.then(function(){
                return module[fn].apply(module, args); 
            })
        };
    }
    ret.queue = queue; // expose the queue for handling errors
    return ret;
}
Run Code Online (Sandbox Code Playgroud)

哪个应该生成一个同步其所有操作的模块版本.请注意,我们可以获得额外的好处,即错误不会被抑制,文件系统也不会处于不一致状态,因为在导致操作不执行的错误得到处理之前,操作不会执行.

是不是那种类似于队列?

是! 通过为操作提供先进先出结构,队列可以执行非常相似的操作(您可以在另一个答案中看到).就像程序代码一样,按顺序执行.在我看来,承诺只是同一枚硬币的一个更强大的一面.

另一个答案还提供了通过队列的可行选项.

关于您建议的方法

切换到同步IO将数据写入磁盘(出于服务器响应的原因,实际上不希望这样做).

虽然我同意这是最简单的 - 链接所有动作​​的"监视器"方法,您需要在同一队列上同步,这非常相似.

当我开始写入数据时设置一个标志,并且在设置该标志时不记录任何新数据(导致我在写入期间丢失数据记录).

那个标志实际上是一个互斥体.如果你阻止(或产生并将动作放入队列中),当有人重试这样做时,你就会得到一个真正的互斥锁,它拥有"互斥保证".

重试那个标志,并保持一个下一个动作列表来保存标志实际上在信号量的实现中很常见 - 一个例子是在linux内核中.

选项2的更复杂版本,其中我设置了标志,当设置标志时,新数据进入一个单独的临时数据结构,当文件IO完成时,然后与真实数据合并(可行,但看起来很难看).获取原始数据的快照副本,并花时间将该副本写入磁盘,因为知道没有其他人会修改副本.我不想这样做,因为数据集相对较大,而且我的内存环境有限(Raspberry PI).

这些方法通常称为事务性RCU更新,它们实际上非常现代且在某些情况下非常快 - 例如"读者编写者问题"(与您的非常相似).最近对Linux内核的本机支持起了作用.在某些情况下这样做实际上既可行也有效,尽管在你的情况下过于复杂的事情有点像你建议的那样.

所以,总结一下

  • 这不是一个容易的问题,而是一个有趣的问题.
  • 幸运的是,promises很好地解决了它,它们的构建完全是为了通过抽象序列的概念来解决这类问题.

快乐的编码,Pi NodeJS项目听起来很棒.如果我能进一步澄清,请告诉我.