Pau*_*l S 5 javascript automated-tests rxjs node.js-tape
我有一个rxjs观察者(真的是一个主题)永远地尾随文件,就像tail -f一样.例如,监控日志文件非常棒.
这种"永远"的行为对我的应用来说非常棒,但对于测试来说却很糟糕.目前我的应用程序工作,但我的测试永远挂起
我想强制观察者更改尽早完成,因为我的测试代码知道文件中应该有多少行.我该怎么做呢?
我尝试在我返回的Subject句柄上调用onCompleted但是在那时它基本上被强制转换为观察者并且你不能强制它关闭,错误是:
对象#没有方法'onCompleted'
这是源代码:
function ObserveTail(filename) {
source = new Rx.Subject();
if (fs.existsSync(filename) == false) {
console.error("file doesn't exist: " + filename);
}
var lineSep = /[\r]{0,1}\n/;
tail = new Tail(filename, lineSep, {}, true);
tail.on("line", function(line) {
source.onNext(line);
});
tail.on('close', function(data) {
console.log("tail closed");
source.onCompleted();
});
tail.on('error', function(error) {
console.error(error);
});
this.source = source;
}
Run Code Online (Sandbox Code Playgroud)
这里的测试代码无法弄清楚如何强制永远结束(磁带样式测试).注意"ILLEGAL"行:
test('tailing a file works correctly', function(tid) {
var lines = 8;
var i = 0;
var filename = 'tape/tail.json';
var handle = new ObserveTail(filename);
touch(filename);
handle.source
.filter(function (x) {
try {
JSON.parse(x);
return true;
} catch (error) {
tid.pass("correctly caught illegal JSON");
return false;
}
})
.map(function(x) { return JSON.parse(x) })
.map(function(j) { return j.name })
.timeout(10000, "observer timed out")
.subscribe (
function(name) {
tid.equal(name, "AssetMgr", "verified name field is AssetMgr");
i++;
if (i >= lines) {
handle.onCompleted(); // XXX ILLEGAL
}
},
function(err) {
console.error(err)
tid.fail("err leaked through to subscriber");
},
function() {
tid.end();
console.log("Completed");
}
);
})
Run Code Online (Sandbox Code Playgroud)
这听起来像你解决了你的问题,但对你原来的问题
我想强制观察者更改尽早完成,因为我的测试代码知道文件中应该有多少行.我该怎么做呢?
一般来说,Subject当你有更好的选择时,不鼓励使用s,因为它们往往是人们使用他们熟悉的编程风格的拐杖.而不是尝试使用Subject我会建议您考虑每个事件在Observable生命周期中的意义.
已经存在EventEmitter#on/off形式的包装Observable.fromEvent.它只在有侦听器时处理清理并保持订阅活动.因此ObserveTail可以重构成
function ObserveTail(filename) {
return Rx.Observable.create(function(observer) {
var lineSep = /[\r]{0,1}\n/;
tail = new Tail(filename, lineSep, {}, true);
var line = Rx.Observable.fromEvent(tail, "line");
var close = Rx.Observable.fromEvent(tail, "close");
var error = Rx.Observable.fromEvent(tail, "error")
.flatMap(function(err) { return Rx.Observable.throw(err); });
//Only take events until close occurs and wrap in the error for good measure
//The latter two are terminal events in this case.
return line.takeUntil(close).merge(error).subscribe(observer);
});
}
Run Code Online (Sandbox Code Playgroud)
这比使用vanilla有几个好处Subjects,一个,你现在实际上会看到下游的错误,两个,当你完成它们时,这将处理事件的清理.
然后,这可以滚动到您的文件存在检查,而无需使用 readSync
//If it doesn't exist then we are done here
//You could also throw from the filter if you want an error tracked
var source = Rx.Observable.fromNodeCallback(fs.exists)(filename)
.filter(function(exists) { return exists; })
.flatMap(ObserveTail(filename));
Run Code Online (Sandbox Code Playgroud)
接下来,您可以通过使用flatMap来简化过滤器/地图/地图序列.
var result = source.flatMap(function(x) {
try {
return Rx.Observable.just(JSON.parse(x));
} catch (e) {
return Rx.Observable.empty();
}
},
//This allows you to map the result of the parsed value
function(x, json) {
return json.name;
})
.timeout(10000, "observer timed out");
Run Code Online (Sandbox Code Playgroud)
当流只向一个方向行进时,如何停止"发出信号"停止.我们实际上很少想让Observer直接与Observable通信,所以更好的模式是不实际"发出信号"停止,而是简单地取消订阅,Observable并将其留给Observable的行为以确定它应该从那里做什么.
基本上你Observer真的不应该关心你Observable而不是说"我在这里完成".
要做到这一点,您需要在停止时声明要达到的条件.
在这种情况下,因为您只是在测试用例中的一组数字后停止,您可以使用take取消订阅.因此,最终的订阅块看起来像:
result
//After lines is reached this will complete.
.take(lines)
.subscribe (
function(name) {
tid.equal(name, "AssetMgr", "verified name field is AssetMgr");
},
function(err) {
console.error(err)
tid.fail("err leaked through to subscriber");
},
function() {
tid.end();
console.log("Completed");
}
);
Run Code Online (Sandbox Code Playgroud)
编辑1
正如评论中所指出的那样,在这个特定的api的情况下,没有真正的"关闭"事件,因为Tail本质上是一个无限的操作.从这个意义上说,它与鼠标事件处理程序没有什么不同,当人们停止收听时,我们将停止发送事件.所以你的块可能最终看起来像:
function ObserveTail(filename) {
return Rx.Observable.create(function(observer) {
var lineSep = /[\r]{0,1}\n/;
tail = new Tail(filename, lineSep, {}, true);
var line = Rx.Observable.fromEvent(tail, "line");
var error = Rx.Observable.fromEvent(tail, "error")
.flatMap(function(err) { return Rx.Observable.throw(err); });
//Only take events until close occurs and wrap in the error for good measure
//The latter two are terminal events in this case.
return line
.finally(function() { tail.unwatch(); })
.merge(error).subscribe(observer);
}).share();
}
Run Code Online (Sandbox Code Playgroud)
添加finally和share运营商创建一个对象,当新用户到达时它将附加到尾部,并且只要至少有一个用户仍在收听,它就会保持连接.一旦完成所有订户,我们就可以安全地unwatch完成尾部.
| 归档时间: |
|
| 查看次数: |
2384 次 |
| 最近记录: |