标签: rx-java

如何在 RxJava 中处理 dispose 而不会出现 InterruptedException

在下面的代码中,当dispose()被调用时,发射器线程被中断(InterruptedException被抛出 sleep 方法)。

    Observable<Integer> obs = Observable.create(emitter -> {
        for (int i = 0; i < 10; i++) {
            if (emitter.isDisposed()) {
                System.out.println("> exiting.");
                emitter.onComplete();
                return;
            }

            emitter.onNext(i);
            System.out.println("> calculation = " + i);


            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        emitter.onComplete();
    });

    Disposable disposable = obs
            .subscribeOn(Schedulers.computation())
            .subscribe(System.out::println);

    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    disposable.dispose();
Run Code Online (Sandbox Code Playgroud)

从调试会话中我看到中断源FutureTask在处理期间被取消。在那里,将对照运行线程检查dispose()正在调用的线程,如果不匹配,则发射器被中断。由于我使用了计算,所以线程有所不同Scheduler

有什么方法可以使处理不中断此类发射器,或者实际上应该如何处理?我在这种方法中看到的一个问题是,当我有一个可中断操作(此处通过 sleep 模拟)时,我希望在调用 之前正常完成该操作onComplete() …

java interrupted-exception rx-java rx-java2

2
推荐指数
1
解决办法
3202
查看次数

将 Rxjava Completable 转换为 Map

我需要在链中调用一个可完成的方法,完成后,它需要使用Map运算符
示例继续链

Single.just(someOperation())
     .flatMapCompletable{
         completableMethod()
    }
     .map{ // i need to continue here with map or some other operator
         doSomeOperation()
    }
Run Code Online (Sandbox Code Playgroud)

谁能帮我解决这个问题吗?

android rx-java rx-java2 rx-kotlin2

2
推荐指数
1
解决办法
909
查看次数

Mono.fromCallable 与 Mono.fromSupplier

我试图理解这两种方法之间的区别,但它们对我来说似乎很相似。甚至 javadoc 也几乎相同

java rx-java reactive

2
推荐指数
1
解决办法
6471
查看次数

如何对同一端点并行进行多个 API 调用并将所有响应合并到输出数组中?

目前在我的 Kotlin 代码中,我有一个用户数组,我执行一个 for 循环并对每个用户的信息发出 GET 请求,然后将他们的信息添加到 MutableList 中。每当有新的用户信息发布到列表中时,我都会观察该列表并更新我的用户界面。

但我想知道如何同时执行所有用户调用并等待最终结果并立即发布 UI?

kotlin rx-java retrofit kotlin-coroutines

2
推荐指数
1
解决办法
3469
查看次数

RxJava UndeliverableException how to handle onSuccess consumer exception?

The exception ended up not handled and was reported as a non-fatal in Crashlytics, hence fell off our radar User saw a blank screen which is not desirable A crash would have been better, for fail-fast is preferred

Ideally, I'd like to the onError consumer to be trigger, that's where I handle the error e.g. shows error UI for every stream

However,

fun main() {
    Single.just(listOf("efaewf"))
            .subscribe({
                println("result is ${it[1]}")
            }, {
                println("handling exception")
                it.printStackTrace()
            })
}
Run Code Online (Sandbox Code Playgroud)

shows that onError …

kotlin rx-java rx-java2

2
推荐指数
1
解决办法
2886
查看次数

反应式编程与基于线程的编程

我对这个概念很陌生,并且希望对这个主题有深入的了解。为了阐明我的观点,我想打个比方。

让我们以 Node JS 为例,它是单线程的,并使用事件循环提供快速 IO 操作。现在这是有道理的,因为它是单线程的并且不会被任何任务阻塞。

在使用 Reactor 学习 Java 反应式编程时。我遇到了这样的情况:当对象订阅并且发生一些延迟事件时,主线程被阻塞。然后我开始了解 subscribeOn.boundedElastic 的概念以及更多类似的管道。

我了解到他们正在尝试通过将这些订阅者移动到其他线程来使其异步。

但如果出现这样的情况那为什么是异步的呢?这不是基于线程的编程吗?如果我们试图实现 Node JS 的异步行为,那么根据我的观点,它应该在单线程中。

我的问题摘要是:

因此,由于两个原因,我不明白使用或调用反应式编程作为异步或函数式编程的事实

  1. 主线程被阻塞
  2. 我们可以管理线程并可以在另一个池中运行它。我们也可以定义可运行服务/可调用服务。

java multithreading reactive-programming rx-java spring-webflux

2
推荐指数
1
解决办法
4558
查看次数

2.6.0 版本后找不到 androidx.lifecycle.LiveDataReactiveStreams 类

我刚刚开始玩LiveData。我按照官方 Android文档导入不同的工件。一切都很好,直到我需要将其转换ObservableLiveData. 但LiveDataReactiveStreams没有找到。最终我减少了版本号,当我达到2.5.1时,我很惊讶它终于起作用了。

我做错什么了吗?还是谷歌方面有问题?

我找不到任何与此相关的报告问题。但我真的不知道我能做些什么不同的事情......

我尝试手动导入它。然后我用了android.arch.lifecycle.LiveDataReactiveStreams. 这确实有效,但我应该坚持下去,并将所有内容更改androidxandroid.arch不是真正的解决方案。

android rx-java android-livedata

2
推荐指数
1
解决办法
896
查看次数

RxJava .subscribeOn(Schedulers.newThread())问题

我在普通的JDK 8上.我有这个简单的RxJava示例:

Observable
.from(Arrays.asList("one", "two", "three"))
.doOnNext(word -> System.out.printf("%s uses thread %s%n", word, Thread.currentThread().getName()))
//.subscribeOn(Schedulers.newThread())
.subscribe(word -> System.out.println(word));
Run Code Online (Sandbox Code Playgroud)

它会逐行打印出单词,与线程信息交织在一起,这对于所有下一次调用都是"主要的",如预期的那样.

但是,当我取消注释subscribeOn(Schedulers.newThread())通话时,根本不会打印任何内容.为什么不工作?我原以为它会为每个onNext()调用启动一个新线程并doOnNext()打印该线程的名称.现在,我没有看到任何内容,也适用于其他调度程序.

当我Thread.sleep(10000L)在main的末尾添加调用时,我可以看到输出,这表明RxJava使用的线程都是守护进程.是这样的吗?可以以某种方式更改,但使用自定义ThreadFactory或类似的概念,而不必实现自定义调度程序?

通过上述更改,线程名称始终是RxNewThreadScheduler-1,而newThread的文档说" {@link Thread}为每个工作单元创建新的调度程序".是不是应该为所有排放创建一个新线程?

java rx-java

1
推荐指数
1
解决办法
2739
查看次数

RXJava将函数应用于Observable中的每个元素并返回它的可观察性

我有一个改装服务,它返回一个Observable并在每次加载时保存它.

我的改造服务:

@GET("url")
Observable<Order> getOrder();
Run Code Online (Sandbox Code Playgroud)

功能:

return SERVICE.getOrder()
            .map(order -> {
                save(order);
                return order;
            });
Run Code Online (Sandbox Code Playgroud)

但是,如果你看到我只是保存并返回相同的对象,有没有更简单的方法?

android reactive-programming rx-java retrofit

1
推荐指数
1
解决办法
1490
查看次数

RxJava:丢弃物品?- 背压

我使用RxJava观察几个按钮上的点击.

这些订阅将在一个对象上调用不同的函数,这需要几毫秒.这些功能是同步的.

问题是,当按下太多按钮时,我会得到一个背压异常.对我来说有用的是丢弃几个输入(最好是oldes).RxJava可以吗?

java backpressure rx-java

1
推荐指数
1
解决办法
439
查看次数