标签: rx-java2

RxJava - 将列表和请求转换为地图

我有一个“父”对象列表,我想为每个对象获取一个“子”列表。我想要一个父子列表的地图。所以结果是

Map<Parent, List<Child>> result = new HashMap<>();
Run Code Online (Sandbox Code Playgroud)

我的示例父列表:

List<Parent> parents = new ArrayList<>();
parents.add(new Parent(1, "Parent1"));
parents.add(new Parent(2, "Parent2"));
parents.add(new Parent(3, "Parent3"));
parents.add(new Parent(4, "Parent4"));
parents.add(new Parent(5, "Parent5"));
Run Code Online (Sandbox Code Playgroud)

我想迭代它们,一一问孩子

 @GET("api/childs/{parentId}")
 Observable<Response<List<Child>>> getChilds(@Path("parentId") int parentId);
Run Code Online (Sandbox Code Playgroud)

什么是最好的 RX 结构?

谢谢你,罗伯特

java android rx-java rx-java2

2
推荐指数
1
解决办法
2382
查看次数

如何从 Runnable 创建 Observable?

有时我想触发 aRunnable作为我的Observable序列的一部分,但Runnable不报告进度。

我编写了一个简单的工厂,用于将Runnable对象包装成一个Observable

public static <T> Observable<T> fromRunnable(final Runnable action) {
    if (action == null) {
        throw new NullPointerException("action");
    }
    return Observable.fromPublisher(subscriber -> {
        try {
            action.run();
            subscriber.onComplete();
        } catch (final Throwable throwable) {
            subscriber.onError(throwable);
        }
    });
}
Run Code Online (Sandbox Code Playgroud)

用法:

Observable.concat(
    someTask, 
    MoreObservables.fromRunnable(() -> {
        System.out.println("Done. ");
    }));
Run Code Online (Sandbox Code Playgroud)

但是 RxJava 2 已经提供了这个功能吗?

java rx-java rx-java2

2
推荐指数
1
解决办法
3176
查看次数

PublishSubject 在`onError()` 后停止发射

RxJava 查询

你好,

我有一个PublishSubject<Boolean> subject = PublishSubject.create();

我订阅了上述主题并在此之后进行 API 调用:

subject.observeOn(IOThread)
    .flatMap(boolean -> getSomethingFromServer())
    .observeOn(MainThread)
    .subscribe(something ->
        showSomethingOnView(),
        error -> showRetryView();
    )
Run Code Online (Sandbox Code Playgroud)

当出现类似UnknownHostException,的错误时SocketTimeoutException,我会显示一个重试按钮。单击重试按钮后,我将向PublishSubject().

subject.onNext(boolean Value);

但是在错误出现后,主题正在终止并且没有其他事件被转发。

在快速搜索中,我可以使用Notification<>包装器实现这一点,但还没有找到应用它的好方法。

这是我找到的两个链接:

此链接提到使用Notification.

此链接使用通知,但包装主题的初始类型,即Boolean在通知中。如何将收到的错误和响应包装getSomethingFromServer()到通知中。

我在这里做错了吗?

谢谢

java android rx-java rx-java2

2
推荐指数
1
解决办法
2861
查看次数

致命异常:触发处置时 RxCachedThreadScheduler-1。为什么?

我有以下 RxJava 2 代码(在 Kotlin 中),其中有一个 Observable

disposable = Observable.create<String>({
    subscriber ->
            try {
                Thread.sleep(2000)
                subscriber.onNext("Test")
                subscriber.onComplete()
            } catch (exception: Exception) {
                subscriber.onError(exception)
            }
}).subscribeOn(Schedulers.io())
  .observeOn(AndroidSchedulers.mainThread())
  .subscribe({ result -> Log.d("Test", "Completed $result") },
             { error -> Log.e("Test", "Completed ${error.message}") })
Run Code Online (Sandbox Code Playgroud)

当它静止时Thread.sleep(2000),我执行disposable?.dispose()调用,它会出错

FATAL EXCEPTION: RxCachedThreadScheduler-1
Process: com.elyeproj.rxstate, PID: 10202
java.lang.InterruptedException 
    at java.lang.Thread.sleep(Native Method)
    at java.lang.Thread.sleep(Thread.java:371)
    at java.lang.Thread.sleep(Thread.java:313)
    at presenter.MainPresenter$loadData$1.subscribe(MainPresenter.kt:41)
    at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40)
Run Code Online (Sandbox Code Playgroud)

我希望这dispose将有助于以静默方式取消操作,或者最多Log.e在订阅时捕获错误。但是,它只是按照上面的错误消息崩溃。

为什么异常逃脱了?是不是dispose假设在不崩溃的情况下静默取消整个操作?

java kotlin rx-java rx-java2

2
推荐指数
1
解决办法
3590
查看次数

了解可流动背压 rxjava2

我把这个虚拟例子放在一起,试图backpressure更好地理解:

Flowable.range(1, 100).onBackpressureDrop()
                      .subscribeOn(Schedulers.io())
                      .observeOn(AndroidSchedulers.mainThread())
                      .subscribeWith(object : DisposableSubscriber<Int>() {
                        override fun onStart() {
                          request(1)
                        }

                        override fun onComplete() {
                          Log.d(this@MainActivity::class.java.simpleName, "onComplete")
                        }

                        override fun onNext(t: Int?) {
                          Log.d(this@MainActivity::class.java.simpleName, t.toString())
                          Thread.sleep(1000)
                          request(1)
                        }

                        override fun onError(t: Throwable?) { //handle error}
                      })
Run Code Online (Sandbox Code Playgroud)

我有一个非常慢的Subscriber消耗数据的非常快的Flowable. 我正在指示 Flowable 到onBackPressureDrop(). 尽管如此,我的输出看起来像这样(从 1 到 100)

07-16 23:07:21.097 22389-22389 D: 1
07-16 23:07:22.100 22389-22389 D: 2
07-16 23:07:23.102 22389-22389 D: 3
07-16 23:07:24.104 22389-22389 D: ...
07-16 23:07:24.104 22389-22389 D: …
Run Code Online (Sandbox Code Playgroud)

rx-java2

2
推荐指数
1
解决办法
1264
查看次数

在 RxJava2 中使用背压将 Observable 转换为 Flowable

我正在观察 a 产生的线条NetworkResource,将其包裹在Observable.create. 这是代码,为简单起见,缺少 try/catch 和取消:

fun linesOf(resource: NetworkResource): Observable<String> =
        Observable.create { emitter ->
            while (!emitter.isDisposed) {
                val line = resource.readLine()
                Log.i(TAG, "Emitting: $line")
                emitter.onNext(line)
            }
        }
Run Code Online (Sandbox Code Playgroud)

问题是,后来我想将它变成一个Flowable使用observable.toFlowable(LATEST)添加背压的情况下,我的消费跟不上,但根据我怎么做,消费者停止项目128后接收项目。

A)这样一切正常:

val resource = ...
linesOf(resource)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .toFlowable(BackpressureStrategy.LATEST)
    .subscribe { Log.i(TAG, "Consuming: $it") }
Run Code Online (Sandbox Code Playgroud)

B)这里消费者在 128 件物品后被卡住(但发射仍在继续):

val resource = ...
linesOf(resource)
    .toFlowable(BackpressureStrategy.LATEST)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe { Log.i(TAG, "Consuming: $it") } // <-- stops after 128
Run Code Online (Sandbox Code Playgroud)

在选项A) 中,一切正常,没有任何问题,我可以看到 …

java android kotlin rx-java rx-java2

2
推荐指数
1
解决办法
3947
查看次数

RxJava2,2 个 Observable/Flowable 订阅者,但 onNext 被任何一个调用

rxjava2 版本 2.1.5

试图理解 RxJava2 对一个 observable 的多个订阅。有一个简单的文件监视服务,用于跟踪目录中文件的创建、修改、删除。我添加了 2 个订阅者,并希望在两个订阅者上都打印事件。当我将文件复制到监视目录中时,我看到一个订阅者打印出事件。然后,当我删除文件时,我看到第二个订阅者打印出事件。我期待两个订阅者都打印事件。我在这里缺少什么?

import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;
import static java.nio.file.StandardWatchEventKinds.ENTRY_DELETE;
import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY;

import java.io.IOException;
import java.nio.file.FileSystem;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.concurrent.TimeUnit;

import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;

public class MyRxJava2DirWatcher {

    public Flowable<WatchEvent<?>> createFlowable(WatchService watcher, Path path) {

        return Flowable.create(subscriber -> {

            boolean error = false;
            WatchKey key;
            try {

                key = path.register(watcher, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY);
            }
            catch (IOException e) {
                subscriber.onError(e);
                error = true;
            } …
Run Code Online (Sandbox Code Playgroud)

watchservice rx-java rx-java2

2
推荐指数
1
解决办法
1672
查看次数

为什么 RxJava2 doOnSubscribe 以混乱的顺序运行?

以下代码打印 1、2

Observable.just(1)
    .doOnSubscribe(d -> System.out.println(1))
    .doOnSubscribe(d -> System.out.println(2))
    .blockingSubscribe();
Run Code Online (Sandbox Code Playgroud)

这个打印 2, 1

Observable.just(1)
    .doOnSubscribe(d -> System.out.println(1))
    .subscribeOn(Schedulers.newThread())
    .doOnSubscribe(d -> System.out.println(2))
    .blockingSubscribe();
Run Code Online (Sandbox Code Playgroud)

在 RxJava1 中,这两个代码都打印“2, 1”,因为doOnSubscribe在下游订阅上游之前调用。

在 RxJava2 中,订阅是从上游到下游 ( Observer.onSubscribe),但doOnSubscribe仍会在订阅之前调用。于是混乱的秩序出现了。

即使我可以给出一个更混乱的情况:

Observable.just(1)
    .doOnSubscribe(d -> System.out.println(1))
    .doOnSubscribe(d -> System.out.println(2))
    .subscribeOn(Schedulers.newThread())
    .doOnSubscribe(d -> System.out.println(3))
    .doOnSubscribe(d -> System.out.println(4))
    .blockingSubscribe();
Run Code Online (Sandbox Code Playgroud)

它按照我的预期打印“3, 4, 1, 2”,但不是最预期的。

这是设计使然吗?如果是,有什么好处?

rx-java rx-java2

2
推荐指数
1
解决办法
2961
查看次数

RxJava2 将两个 Single 转换为 Completable

我有两个单曲,我想做的是将它们转换为 Completable

 final Single<Boolean> httpRequestOne = createHttpRequestOne();
 final Single<Boolean> httpRequestTwo = createHttpRequestTwo();
Run Code Online (Sandbox Code Playgroud)

如果两个单曲都返回 true,则 Completable 的结果应该是 onSuccess 否则它将是 onError。

我也希望它们并行运行,所以我认为 concat 在这里无济于事

final Flowable<Boolean> flowable = httpRequestOne.concatWith(httpRequestTwo);
Run Code Online (Sandbox Code Playgroud)

java android rx-java2

2
推荐指数
1
解决办法
1182
查看次数

flatMap()observable上的调度程序是否会影响外部observable上的调度程序?

这看起来有点像一个愚蠢的问题,我已经做了一些测试,看看它在实践中是如何工作的,但我希望看到它得到确认,如果可能的话,我想知道为什么就Observable而言合同及其实施,因为在我对Rx规则的理解中感觉像是一个令人讨厌的漏洞.此外,如果你能告诉我在哪里寻找这个,那么它将帮助我将来自己回答这些问题.

如果我使用以下Observable:

Observable.range(0, 3)
          .observeOn(schedulerA)
          .flatMap(i -> Observable.just(i)
                                  .observeOn(schedulerB)
                                  .map(j -> -j))
          .doOnNext(i -> System.out.println(String.format("Got %d", i)))
          .subscribe()
Run Code Online (Sandbox Code Playgroud)

那么操作员是否.doOnNext(i -> System.out.println(String.format("Got %d", i)))会执行schedulerA或者schedulerB是否存在正式或基于规范的原因?

谢谢.

rx-java rx-java2

2
推荐指数
1
解决办法
69
查看次数

标签 统计

rx-java2 ×10

rx-java ×8

java ×6

android ×4

kotlin ×2

watchservice ×1