我正在探索反应式编程和RxJava.这很有趣,但我遇到了一个无法找到答案的问题.我的基本问题:什么是一种反应性的方法来终止一个无限运行的Observable?我也欢迎有关我的代码的批评和反应性最佳实践.
作为练习,我正在编写一个日志文件尾实用程序.日志文件中的行流由a表示Observable<String>.为了BufferedReader继续阅读添加到文件中的文本,我忽略了通常的reader.readLine() == null终止检查,而是将其解释为意味着我的线程应该休眠并等待更多的记录器文本.
但是当我可以使用Ob终止Observer时takeUntil,我需要找到一种干净的方法来终止无限运行的文件观察器.我可以编写自己的terminateWatcher方法/字段,但这会打破Observable/Observer封装 - 我希望尽可能严格遵守反应范式.
这是Observable<String>代码:
public class FileWatcher implements OnSubscribeFunc<String> {
private Path path = . . .;
@Override
// The <? super String> generic is pointless but required by the compiler
public Subscription onSubscribe(Observer<? super String> observer) {
try (BufferedReader reader = new BufferedReader(new FileReader(path.toFile()))) {
String newLine = "";
while (!Thread.interrupted()) { // How do I terminate this reactively?
if ((newLine = reader.readLine()) …Run Code Online (Sandbox Code Playgroud)