标签: rx-java

术语:功能反应编程/ RX中的"故障"是什么?

在功能反应式编程的上下文中,"毛刺"的定义是什么?

我知道在一些FRP框架中可能会发生"故障",而在其他框架中则不会.例如,RX不是无干扰,而ReactFX是无干扰的[ 1 ].

有人可以给出一个非常简单的例子,演示如何以及何时使用RX时出现毛刺,并在同一示例中显示相应的ReactFX解决方案如何以及为何无故障.

谢谢阅读.

terminology reactive-programming system.reactive rx-java reactfx

13
推荐指数
1
解决办法
1807
查看次数

RxJava网络请求和缓存

我正在寻求一个流程的例子,我试图在RxJava的帮助下实现.

假设我想显示一个数据列表.流程应如下所示:

  1. 读缓存.如果它包含数据,请显示;
  2. 向服务器发送API请求:

    如果它返回了数据,则将其缓存并显示.

    如果它返回并且错误且没有缓存数据,则显示错误.

    如果它返回并且错误并且有缓存的内容,那么什么都不做.

现在我有一种方法可以做类似的事情(从Jake的u2020获得很多灵感).主要区别在于它使用内存缓存,这意味着不需要单独Observable从缓存中读取,它可以同步完成.

我不知道如何组合两个observable(一个用于从缓存读取而另一个用于API调用)并获得上述流程.

有什么建议?

android rx-java

13
推荐指数
1
解决办法
6195
查看次数

在项目中找到缺少的onError

我试图onError()在项目中找到遗漏.这意味着应用程序崩溃,因为订阅不处理throwables所以我想找到该订阅并添加onError方法.

不幸的是,堆栈跟踪在这里并没有真正的帮助,它只显示了行,throw new IOException但仅此而已:

 FATAL EXCEPTION: main
    Process: my.app.example.dev, PID: 20309
    java.lang.IllegalStateException: Fatal Exception thrown on Scheduler.Worker thread.
            at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:54)
            at android.os.Handler.handleCallback(Handler.java:739)
            at android.os.Handler.dispatchMessage(Handler.java:95)
            at android.os.Looper.loop(Looper.java:135)
            at android.app.ActivityThread.main(ActivityThread.java:5221)
            at java.lang.reflect.Method.invoke(Native Method)
            at java.lang.reflect.Method.invoke(Method.java:372)
            at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:899)
            at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:694)
     Caused by: rx.exceptions.OnErrorFailedException: Error occurred when trying to propagate error to Observer.onError
            at rx.observers.SafeSubscriber._onError(SafeSubscriber.java:201)
            at rx.observers.SafeSubscriber.onError(SafeSubscriber.java:111)
            at rx.android.app.OperatorConditionalBinding$1.onError(OperatorConditionalBinding.java:69)
            at rx.internal.operators.NotificationLite.accept(NotificationLite.java:147)
            at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.pollQueue(OperatorObserveOn.java:177)
            at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.access$000(OperatorObserveOn.java:65)
            at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber$2.call(OperatorObserveOn.java:153)
            at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:47)
            at android.os.Handler.handleCallback(Handler.java:739)
            at android.os.Handler.dispatchMessage(Handler.java:95)
            at android.os.Looper.loop(Looper.java:135)
            at android.app.ActivityThread.main(ActivityThread.java:5221)
            at java.lang.reflect.Method.invoke(Native Method) …
Run Code Online (Sandbox Code Playgroud)

android rx-java

13
推荐指数
1
解决办法
5395
查看次数

如果第二个立即失败,ReactiveX concat不会从第一个observable产生onNext

我连续两个observable来显示缓存中的数据,然后开始从网络加载数据并显示更新的数据.

Observable.concat(
    getContentFromCache.subscribeOn(dbScheduler),
    getContentFromNetwork.subscibeOn(networkScheduler)
).observeOn(AndroidSchedulers.mainThread())
 .subscribe(subscriber);
Run Code Online (Sandbox Code Playgroud)

如果没有网络连接,则在调用OnSubscribe后第二个observable会立即失败.

如果第二个observable立即失败,则第一个observable的数据将丢失.永远不会在订阅者中调用onNext方法.

我想,这可能是由于OperatorConcat.ConcatSubscriber中的以下代码造成的

    @Override
    public void onNext(Observable<? extends T> t) {
        queue.add(nl.next(t));
        if (WIP_UPDATER.getAndIncrement(this) == 0) {
            subscribeNext();
        }
    }

    @Override
    public void onError(Throwable e) {
        child.onError(e);
        unsubscribe();
    }
Run Code Online (Sandbox Code Playgroud)

看起来在收到错误后它取消订阅,并且所有挂起的onNext都将丢失.

解决问题的最佳方法是什么?

更新

看起来我找到了解决方案,而不是为连接的observable设置observOn我为每个observable设置了observOn.

Observable.concat(
    getContentFromCache.subscribeOn(dbScheduler).observeOn(AndroidSchedulers.mainThread()),
    getContentFromNetwork.subscibeOn(networkScheduler).observeOn(AndroidSchedulers.mainThread())
)
 .subscribe(subscriber);
Run Code Online (Sandbox Code Playgroud)

rx-java

13
推荐指数
1
解决办法
2490
查看次数

什么是Subject.asObservable有用?

为什么RxJava需要asObservable

从技术上讲,每个主题已经是一个Observable.它的优点是什么,而不仅仅是铸造它

Observable obs = subject;
Run Code Online (Sandbox Code Playgroud)

rx-java

13
推荐指数
1
解决办法
5178
查看次数

Realm,RxJava,asObservable()和doOnUnsubscribe()

在我的Android项目中,我使用realm作为我的数据存储引擎.我喜欢它!
我也使用RxJava,因为它使"线程化"变得如此简单,我真的很喜欢整个"反应性思维".我喜欢它!

我使用MVP模式+一些"清洁架构"的想法来构建我的应用程序.

Interactors是唯一知道的人Realm.我在Observable的帮助下公开数据,如下所示:

@Override
public Observable<City> getHomeTown() {
    final Realm realm = Realm.getDefaultInstance();
    return realm.where(City.class).equalTo("name", "Cluj-Napoca").findAllAsync().asObservable()
            .doOnUnsubscribe(new Action0() {
                @Override
                public void call() {
                    realm.close();
                }
            })
            .compose(new NullIfNoRealmObject<City>());
}
Run Code Online (Sandbox Code Playgroud)

问题是我doOnUnsubscribe的副作用被调用之前Realm可以做它的事情,处理暴露的observable:

Caused by: java.lang.IllegalStateException: This Realm instance has already been closed, making it unusable.
at io.realm.BaseRealm.checkIfValid(BaseRealm.java:344)
at io.realm.RealmResults.removeChangeListener(RealmResults.java:818)
at io.realm.rx.RealmObservableFactory$3$2.call(RealmObservableFactory.java:137)
at rx.subscriptions.BooleanSubscription.unsubscribe(BooleanSubscription.java:71)
at rx.internal.util.SubscriptionList.unsubscribeFromAll(SubscriptionList.java:124)
at rx.internal.util.SubscriptionList.unsubscribe(SubscriptionList.java:113)
at rx.Subscriber.unsubscribe(Subscriber.java:98)
at rx.internal.util.SubscriptionList.unsubscribeFromAll(SubscriptionList.java:124)
at rx.internal.util.SubscriptionList.unsubscribe(SubscriptionList.java:113)
at rx.Subscriber.unsubscribe(Subscriber.java:98)
at rx.subscriptions.CompositeSubscription.unsubscribeFromAll(CompositeSubscription.java:150)
at …
Run Code Online (Sandbox Code Playgroud)

mvp android realm rx-java

13
推荐指数
1
解决办法
3260
查看次数

将Firebase实时数据侦听器与RxJava结合使用

我在我的应用程序中使用Firebase以及RxJava.Firebase能够在后端数据中发生更改(添加,删除,更改等)时通知您的应用.我试图将Firebase的功能与RxJava结合起来.

调用我正在侦听的数据Leisure,以及包含a 和更新类型(添加,删除,移动,更改)的Observable发出.LeisureUpdateLeisure

这是我的方法,允许订阅此事件.

private Observable<LeisureUpdate> leisureUpdatesObservable;
private ChildEventListener leisureUpdatesListener;
private int leisureUpdatesSubscriptionsCount;

@NonNull
public Observable<LeisureUpdate> subscribeToLeisuresUpdates() {
    if (leisureUpdatesObservable == null) {
        leisureUpdatesObservable = Observable.create(new Observable.OnSubscribe<LeisureUpdate>() {

            @Override
            public void call(final Subscriber<? super LeisureUpdate> subscriber) {
                leisureUpdatesListener = firebase.child(FirebaseStructure.LEISURES).addChildEventListener(new ChildEventListener() {
                    @Override
                    public void onChildAdded(DataSnapshot dataSnapshot, String s) {
                        final Leisure leisure = convertMapToLeisure((Map<String, Object>) dataSnapshot.getValue());
                        subscriber.onNext(new LeisureUpdate(leisure, LeisureUpdate.ADDED));
                    }

                    @Override
                    public void onChildChanged(DataSnapshot dataSnapshot, String s) {
                        final Leisure leisure …
Run Code Online (Sandbox Code Playgroud)

android firebase rx-java

13
推荐指数
2
解决办法
1万
查看次数

如何使用RxJava处理回收站视图的项目点击

我有兴趣了解回应物品点击回收者视图的最佳方法是什么.

通常我会向ViewHolder添加一个onclick()侦听器,并通过接口将结果传回活动/片段.

我想在onBindViewHolder中添加一个Observable,但我不想为每个项目绑定创建一个新的Observable.

android rx-java rx-android android-recyclerview

13
推荐指数
1
解决办法
7277
查看次数

RxJava onError无法在未调用Looper.prepare()的线程内创建处理程序

首先,我将尝试解释我想要做什么,接下来你会看到我在做什么(代码).由于我是RxJava的新手,并且还在学习中随意给我你的意见.

所以,我从服务器调用网络API,当启动请求我调用loader(微调器)时,完成时我隐藏它,当我收到错误时也一样.我希望我的所有请求都是通用的,所以我从参数中得到了Observable和Observer.在这个方法上,我只关心隐藏和显示加载器.

OnError(这里是技巧部分),我试图显示一个对话框,但我得到了你可以在标题上看到的错误. 无法在未调用Looper.prepare()的线程内创建处理程序

这是代码..

protected void makeMyrequest(MyBaseActivity myBaseActivity, Observable observable, Observer observer) {

    mSubscription = observable
            .doOnRequest(new Action1<Long>() {
                @Override
                public void call(Long aLong) {

                    Log.d(TAG, "On request");
                    myBaseActivity.showLoader();
                }
            })
            .doOnCompleted(new Action0() {
                @Override
                public void call() {
                    Log.d(TAG, "onCompleted: Hide spinner");
                    myBaseActivity.hideLoader();
                    mSubscription.unsubscribe();
                }
            })
            .doOnError(new Action1<Throwable>() {
                @Override
                public void call(Throwable throwable) {

                    Log.d(TAG, "onError: Hide spinner");
                        myBaseActivity.showAlertDialog("error");
                        myBaseActivity.hideLoader();

                                        }
            })
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(observer);
}
Run Code Online (Sandbox Code Playgroud)

在我的基础活动上,我有一个显示对话框的方法

public void showAlertDialog(String message) {

    mDialog = new AlertDialog.Builder(this)
            .setMessage(message) …
Run Code Online (Sandbox Code Playgroud)

android rx-java rx-android

13
推荐指数
1
解决办法
6660
查看次数

用于解构的Kotlin四倍,五倍等

我正在寻找一种干净的方式来在线创建可破坏的对象.kotlin.Pairkotlin.Triple涵盖了很多用例,但有时需要传递更多的对象.

一个示例用例是RX的zip函数,其中几个I/O调用的结果需要映射到另一个对象:

Single
    .zip(repositoryA.loadData(someId),
         repositoryB.loadData(someId),
         repositoryC.loadAll(),
         repositoryD.loadAll()),
         { objectA, objectB, objectsC, objectsD -> /*some Kotlin magic*/ }
    )
    .map { (objectA, objectB, objectsC, objectsD) -> /*do the mapping*/ }
Run Code Online (Sandbox Code Playgroud)

我想弄清楚"一些Kotlin魔法"部分会发生什么.如果只有3个存储库,那就是

Triple(objectA, objectB, objectsC)
Run Code Online (Sandbox Code Playgroud)

我是否需要为此创建一个新的数据类,以及任何n元组的情况,还是有另一种方法?

kotlin rx-java

13
推荐指数
4
解决办法
4738
查看次数