相关疑难解决方法(0)

RxJava - 终止无限流

我正在探索反应式编程和RxJava.这很有趣,但我遇到了一个无法找到答案的问题.我的基本问题:什么是一种反应性的方法来终止一个无限运行的Observable?我也欢迎有关我的代码的批评和反应性最佳实践.

作为练习,我正在编写一个日志文件尾实用程序.日志文件中的行流由a表示Observable<String>.为了BufferedReader继续阅读添加到文件中的文本,我忽略了通常的reader.readLine() == null终止检查,而是将其解释为意味着我的线程应该休眠并等待更多的记录器文本.

但是当我可以使用Ob终止Observer时takeUntil,我需要找到一种干净的方法来终止无限运行的文件观察器.我可以编写自己的terminateWatcher方法/字段,但这会打破Observable/Observer封装 - 我希望尽可能严格遵守反应范式.

这是Observable<String>代码:

public class FileWatcher implements OnSubscribeFunc<String> {
    private Path path = . . .;

    @Override
    // The <? super String> generic is pointless but required by the compiler
    public Subscription onSubscribe(Observer<? super String> observer) {
        try (BufferedReader reader = new BufferedReader(new FileReader(path.toFile()))) {
            String newLine = "";
            while (!Thread.interrupted()) {  // How do I terminate this reactively?
                if ((newLine = reader.readLine()) …
Run Code Online (Sandbox Code Playgroud)

java reactive-programming rx-java

10
推荐指数
1
解决办法
3503
查看次数

标签 统计

java ×1

reactive-programming ×1

rx-java ×1