标签: rx-java

在 RxJava 中将 Observable 转换为集合

我想收集集合中发出的每个项目。我是这样做的:

List found = new ArrayList();
controller.search(query)
          .doOnCompleted(new Action0() {
            @Override
            public void call() {
                //Do something with "found" list
            }
        }).subscribe(new Action1<String>() {
            @Override
            public void call(String item) {
                found.add(item);
            }
        });
Run Code Online (Sandbox Code Playgroud)

是否有 RxJava 内置方法来执行此操作?

rx-java

2
推荐指数
1
解决办法
4501
查看次数

如何使用 Retrofit 和 RxJa 处理两个 JSON 响应?

我正在向服务器发送 API 请求,我可以得到两个 JSON 响应:

  • 经过身份验证的用户的 JSON 响应
  • 未经身份验证的用户的 JSON 响应

经过身份验证的用户响应:

{"status":"ok","tekst":"8bc6c9cf-293f-11e5-9940-448a5b5dd2bd","requestID":9034}
Run Code Online (Sandbox Code Playgroud)

未经身份验证的用户响应:

{"authenticationError":"User authentication failed"}
Run Code Online (Sandbox Code Playgroud)

改造服务:

public interface LiveService {
    @GET("live.php")
    Observable<Response<LiveResponse>> getLive();
}
Run Code Online (Sandbox Code Playgroud)

这是对服务器的改造请求:

retrofit.create(LiveService.class)
            .getLive()
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Subscriber<Response<LiveResponse>>() {
                @Override
                public void onCompleted() {

                }

                @Override
                public void onError(Throwable e) {
                    liveView.onLiveError(e);
                }

                @Override
                public void onNext(Response<LiveResponse> response) {


                }
            });
Run Code Online (Sandbox Code Playgroud)

我如何处理 onNext 中的响应以检查我是否收到了经过身份验证的用户或未经身份验证的用户的 JSON 响应?

android rx-java rx-android retrofit2

2
推荐指数
1
解决办法
1227
查看次数

将布尔 observables 列表映射到输出,只要所有输入都为真就发出真

假设我有n Boolean Observables ( O1, O2,..., On) 发出标志,例如,复选框选择状态。

例如,对于n = 3,我们会有这样的东西(T=trueF= false):

O1    - T ------------------- F ----------------------- T ------

O2    - F ------------ T ---------------------------------------

O3    - T -------------------------- F -------- T --------------
Run Code Online (Sandbox Code Playgroud)

我想得到一个输出Boolean ObservableOout),true当所有最新的排放量是truefalse其他情况时发出。像这样的东西:

O1    - T ------------------- F ----------------------- T ------

O2    - F ------------ T ---------------------------------------

O3    - T -------------------------- F -------- T -------------- …
Run Code Online (Sandbox Code Playgroud)

reactive-programming rx-java

2
推荐指数
1
解决办法
453
查看次数

RXjava 为对象更新实现一个可观察的对象

如何将 observable 添加到 Integer 对象,以便我收到每个对象更新的通知?在这种情况下整数 = 0; 并且我希望每次添加示例 1 时都会收到通知。最初我只想记录事件。这是我到目前为止所拥有的,但我被卡住了。

代码:

Subscriber<Integer> num = new Subscriber<Integer>() {
        @Override
        public void onNext(Integer num) { Log.d("RX", num.toString()); }

        @Override
        public void onError(Throwable e) { Log.d("RX", "error"); }

        @Override
        public void onCompleted() { Log.d("RX", "completed"); }
    };

    Observable.just(number)
            /*.doOnNext(new Action1<Integer>() {
                @Override
                public void call(Integer integer) {
                    Log.d("RX", "Updated integer" + integer.toString());
                }
            })*/
            /*.flatMap(new Func1<Integer, Observable<?>>() {
                @Override
                public Observable<?> call(Integer integer) {
                    Log.d("RX", "Updated integer" + integer.toString());
                    return Observable.just(number);
                }
            })*/ …
Run Code Online (Sandbox Code Playgroud)

java rx-java

2
推荐指数
1
解决办法
2570
查看次数

可观察的惰性随机生成器

我需要将随机生成器编写为可观察的。Observer 会用下一个值长时间做一些事情。所以我想只在观察者需要时才发出新的随机数,或者可能有两个额外的数字,但不能更多。

怎么做?

PS 如果重要的话,我使用 Java。

rx-java reactivex

2
推荐指数
1
解决办法
1210
查看次数

RxJava - 将列表和请求转换为地图

我有一个“父”对象列表,我想为每个对象获取一个“子”列表。我想要一个父子列表的地图。所以结果是

Map<Parent, List<Child>> result = new HashMap<>();
Run Code Online (Sandbox Code Playgroud)

我的示例父列表:

List<Parent> parents = new ArrayList<>();
parents.add(new Parent(1, "Parent1"));
parents.add(new Parent(2, "Parent2"));
parents.add(new Parent(3, "Parent3"));
parents.add(new Parent(4, "Parent4"));
parents.add(new Parent(5, "Parent5"));
Run Code Online (Sandbox Code Playgroud)

我想迭代它们,一一问孩子

 @GET("api/childs/{parentId}")
 Observable<Response<List<Child>>> getChilds(@Path("parentId") int parentId);
Run Code Online (Sandbox Code Playgroud)

什么是最好的 RX 结构?

谢谢你,罗伯特

java android rx-java rx-java2

2
推荐指数
1
解决办法
2382
查看次数

如何从 Runnable 创建 Observable?

有时我想触发 aRunnable作为我的Observable序列的一部分,但Runnable不报告进度。

我编写了一个简单的工厂,用于将Runnable对象包装成一个Observable

public static <T> Observable<T> fromRunnable(final Runnable action) {
    if (action == null) {
        throw new NullPointerException("action");
    }
    return Observable.fromPublisher(subscriber -> {
        try {
            action.run();
            subscriber.onComplete();
        } catch (final Throwable throwable) {
            subscriber.onError(throwable);
        }
    });
}
Run Code Online (Sandbox Code Playgroud)

用法:

Observable.concat(
    someTask, 
    MoreObservables.fromRunnable(() -> {
        System.out.println("Done. ");
    }));
Run Code Online (Sandbox Code Playgroud)

但是 RxJava 2 已经提供了这个功能吗?

java rx-java rx-java2

2
推荐指数
1
解决办法
3176
查看次数

PublishSubject 在`onError()` 后停止发射

RxJava 查询

你好,

我有一个PublishSubject<Boolean> subject = PublishSubject.create();

我订阅了上述主题并在此之后进行 API 调用:

subject.observeOn(IOThread)
    .flatMap(boolean -> getSomethingFromServer())
    .observeOn(MainThread)
    .subscribe(something ->
        showSomethingOnView(),
        error -> showRetryView();
    )
Run Code Online (Sandbox Code Playgroud)

当出现类似UnknownHostException,的错误时SocketTimeoutException,我会显示一个重试按钮。单击重试按钮后,我将向PublishSubject().

subject.onNext(boolean Value);

但是在错误出现后,主题正在终止并且没有其他事件被转发。

在快速搜索中,我可以使用Notification<>包装器实现这一点,但还没有找到应用它的好方法。

这是我找到的两个链接:

此链接提到使用Notification.

此链接使用通知,但包装主题的初始类型,即Boolean在通知中。如何将收到的错误和响应包装getSomethingFromServer()到通知中。

我在这里做错了吗?

谢谢

java android rx-java rx-java2

2
推荐指数
1
解决办法
2861
查看次数

致命异常:触发处置时 RxCachedThreadScheduler-1。为什么?

我有以下 RxJava 2 代码(在 Kotlin 中),其中有一个 Observable

disposable = Observable.create<String>({
    subscriber ->
            try {
                Thread.sleep(2000)
                subscriber.onNext("Test")
                subscriber.onComplete()
            } catch (exception: Exception) {
                subscriber.onError(exception)
            }
}).subscribeOn(Schedulers.io())
  .observeOn(AndroidSchedulers.mainThread())
  .subscribe({ result -> Log.d("Test", "Completed $result") },
             { error -> Log.e("Test", "Completed ${error.message}") })
Run Code Online (Sandbox Code Playgroud)

当它静止时Thread.sleep(2000),我执行disposable?.dispose()调用,它会出错

FATAL EXCEPTION: RxCachedThreadScheduler-1
Process: com.elyeproj.rxstate, PID: 10202
java.lang.InterruptedException 
    at java.lang.Thread.sleep(Native Method)
    at java.lang.Thread.sleep(Thread.java:371)
    at java.lang.Thread.sleep(Thread.java:313)
    at presenter.MainPresenter$loadData$1.subscribe(MainPresenter.kt:41)
    at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40)
Run Code Online (Sandbox Code Playgroud)

我希望这dispose将有助于以静默方式取消操作,或者最多Log.e在订阅时捕获错误。但是,它只是按照上面的错误消息崩溃。

为什么异常逃脱了?是不是dispose假设在不崩溃的情况下静默取消整个操作?

java kotlin rx-java rx-java2

2
推荐指数
1
解决办法
3590
查看次数

在 RxJava2 中使用背压将 Observable 转换为 Flowable

我正在观察 a 产生的线条NetworkResource,将其包裹在Observable.create. 这是代码,为简单起见,缺少 try/catch 和取消:

fun linesOf(resource: NetworkResource): Observable<String> =
        Observable.create { emitter ->
            while (!emitter.isDisposed) {
                val line = resource.readLine()
                Log.i(TAG, "Emitting: $line")
                emitter.onNext(line)
            }
        }
Run Code Online (Sandbox Code Playgroud)

问题是,后来我想将它变成一个Flowable使用observable.toFlowable(LATEST)添加背压的情况下,我的消费跟不上,但根据我怎么做,消费者停止项目128后接收项目。

A)这样一切正常:

val resource = ...
linesOf(resource)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .toFlowable(BackpressureStrategy.LATEST)
    .subscribe { Log.i(TAG, "Consuming: $it") }
Run Code Online (Sandbox Code Playgroud)

B)这里消费者在 128 件物品后被卡住(但发射仍在继续):

val resource = ...
linesOf(resource)
    .toFlowable(BackpressureStrategy.LATEST)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe { Log.i(TAG, "Consuming: $it") } // <-- stops after 128
Run Code Online (Sandbox Code Playgroud)

在选项A) 中,一切正常,没有任何问题,我可以看到 …

java android kotlin rx-java rx-java2

2
推荐指数
1
解决办法
3947
查看次数