在功能反应式编程的上下文中,"毛刺"的定义是什么?
我知道在一些FRP框架中可能会发生"故障",而在其他框架中则不会.例如,RX不是无干扰,而ReactFX是无干扰的[ 1 ].
有人可以给出一个非常简单的例子,演示如何以及何时使用RX时出现毛刺,并在同一示例中显示相应的ReactFX解决方案如何以及为何无故障.
谢谢阅读.
terminology reactive-programming system.reactive rx-java reactfx
我正在寻求一个流程的例子,我试图在RxJava的帮助下实现.
假设我想显示一个数据列表.流程应如下所示:
向服务器发送API请求:
如果它返回了数据,则将其缓存并显示.
如果它返回并且错误且没有缓存数据,则显示错误.
如果它返回并且错误并且有缓存的内容,那么什么都不做.
现在我有一种方法可以做类似的事情(从Jake的u2020获得很多灵感).主要区别在于它使用内存缓存,这意味着不需要单独Observable从缓存中读取,它可以同步完成.
我不知道如何组合两个observable(一个用于从缓存读取而另一个用于API调用)并获得上述流程.
有什么建议?
我试图onError()在项目中找到遗漏.这意味着应用程序崩溃,因为订阅不处理throwables所以我想找到该订阅并添加onError方法.
不幸的是,堆栈跟踪在这里并没有真正的帮助,它只显示了行,throw new IOException但仅此而已:
FATAL EXCEPTION: main
Process: my.app.example.dev, PID: 20309
java.lang.IllegalStateException: Fatal Exception thrown on Scheduler.Worker thread.
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:54)
at android.os.Handler.handleCallback(Handler.java:739)
at android.os.Handler.dispatchMessage(Handler.java:95)
at android.os.Looper.loop(Looper.java:135)
at android.app.ActivityThread.main(ActivityThread.java:5221)
at java.lang.reflect.Method.invoke(Native Method)
at java.lang.reflect.Method.invoke(Method.java:372)
at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:899)
at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:694)
Caused by: rx.exceptions.OnErrorFailedException: Error occurred when trying to propagate error to Observer.onError
at rx.observers.SafeSubscriber._onError(SafeSubscriber.java:201)
at rx.observers.SafeSubscriber.onError(SafeSubscriber.java:111)
at rx.android.app.OperatorConditionalBinding$1.onError(OperatorConditionalBinding.java:69)
at rx.internal.operators.NotificationLite.accept(NotificationLite.java:147)
at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.pollQueue(OperatorObserveOn.java:177)
at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.access$000(OperatorObserveOn.java:65)
at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber$2.call(OperatorObserveOn.java:153)
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:47)
at android.os.Handler.handleCallback(Handler.java:739)
at android.os.Handler.dispatchMessage(Handler.java:95)
at android.os.Looper.loop(Looper.java:135)
at android.app.ActivityThread.main(ActivityThread.java:5221)
at java.lang.reflect.Method.invoke(Native Method) …Run Code Online (Sandbox Code Playgroud) 我连续两个observable来显示缓存中的数据,然后开始从网络加载数据并显示更新的数据.
Observable.concat(
getContentFromCache.subscribeOn(dbScheduler),
getContentFromNetwork.subscibeOn(networkScheduler)
).observeOn(AndroidSchedulers.mainThread())
.subscribe(subscriber);
Run Code Online (Sandbox Code Playgroud)
如果没有网络连接,则在调用OnSubscribe后第二个observable会立即失败.
如果第二个observable立即失败,则第一个observable的数据将丢失.永远不会在订阅者中调用onNext方法.
我想,这可能是由于OperatorConcat.ConcatSubscriber中的以下代码造成的
@Override
public void onNext(Observable<? extends T> t) {
queue.add(nl.next(t));
if (WIP_UPDATER.getAndIncrement(this) == 0) {
subscribeNext();
}
}
@Override
public void onError(Throwable e) {
child.onError(e);
unsubscribe();
}
Run Code Online (Sandbox Code Playgroud)
看起来在收到错误后它取消订阅,并且所有挂起的onNext都将丢失.
解决问题的最佳方法是什么?
更新
看起来我找到了解决方案,而不是为连接的observable设置observOn我为每个observable设置了observOn.
Observable.concat(
getContentFromCache.subscribeOn(dbScheduler).observeOn(AndroidSchedulers.mainThread()),
getContentFromNetwork.subscibeOn(networkScheduler).observeOn(AndroidSchedulers.mainThread())
)
.subscribe(subscriber);
Run Code Online (Sandbox Code Playgroud) 为什么RxJava需要asObservable?
从技术上讲,每个主题已经是一个Observable.它的优点是什么,而不仅仅是铸造它
Observable obs = subject;
Run Code Online (Sandbox Code Playgroud) 在我的Android项目中,我使用realm作为我的数据存储引擎.我喜欢它!
我也使用RxJava,因为它使"线程化"变得如此简单,我真的很喜欢整个"反应性思维".我喜欢它!
我使用MVP模式+一些"清洁架构"的想法来构建我的应用程序.
我Interactors是唯一知道的人Realm.我在Observable的帮助下公开数据,如下所示:
@Override
public Observable<City> getHomeTown() {
final Realm realm = Realm.getDefaultInstance();
return realm.where(City.class).equalTo("name", "Cluj-Napoca").findAllAsync().asObservable()
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
realm.close();
}
})
.compose(new NullIfNoRealmObject<City>());
}
Run Code Online (Sandbox Code Playgroud)
问题是我doOnUnsubscribe的副作用被调用之前Realm可以做它的事情,处理暴露的observable:
Caused by: java.lang.IllegalStateException: This Realm instance has already been closed, making it unusable.
at io.realm.BaseRealm.checkIfValid(BaseRealm.java:344)
at io.realm.RealmResults.removeChangeListener(RealmResults.java:818)
at io.realm.rx.RealmObservableFactory$3$2.call(RealmObservableFactory.java:137)
at rx.subscriptions.BooleanSubscription.unsubscribe(BooleanSubscription.java:71)
at rx.internal.util.SubscriptionList.unsubscribeFromAll(SubscriptionList.java:124)
at rx.internal.util.SubscriptionList.unsubscribe(SubscriptionList.java:113)
at rx.Subscriber.unsubscribe(Subscriber.java:98)
at rx.internal.util.SubscriptionList.unsubscribeFromAll(SubscriptionList.java:124)
at rx.internal.util.SubscriptionList.unsubscribe(SubscriptionList.java:113)
at rx.Subscriber.unsubscribe(Subscriber.java:98)
at rx.subscriptions.CompositeSubscription.unsubscribeFromAll(CompositeSubscription.java:150)
at …Run Code Online (Sandbox Code Playgroud) 我在我的应用程序中使用Firebase以及RxJava.Firebase能够在后端数据中发生更改(添加,删除,更改等)时通知您的应用.我试图将Firebase的功能与RxJava结合起来.
调用我正在侦听的数据Leisure,以及包含a 和更新类型(添加,删除,移动,更改)的Observable发出.LeisureUpdateLeisure
这是我的方法,允许订阅此事件.
private Observable<LeisureUpdate> leisureUpdatesObservable;
private ChildEventListener leisureUpdatesListener;
private int leisureUpdatesSubscriptionsCount;
@NonNull
public Observable<LeisureUpdate> subscribeToLeisuresUpdates() {
if (leisureUpdatesObservable == null) {
leisureUpdatesObservable = Observable.create(new Observable.OnSubscribe<LeisureUpdate>() {
@Override
public void call(final Subscriber<? super LeisureUpdate> subscriber) {
leisureUpdatesListener = firebase.child(FirebaseStructure.LEISURES).addChildEventListener(new ChildEventListener() {
@Override
public void onChildAdded(DataSnapshot dataSnapshot, String s) {
final Leisure leisure = convertMapToLeisure((Map<String, Object>) dataSnapshot.getValue());
subscriber.onNext(new LeisureUpdate(leisure, LeisureUpdate.ADDED));
}
@Override
public void onChildChanged(DataSnapshot dataSnapshot, String s) {
final Leisure leisure …Run Code Online (Sandbox Code Playgroud) 我有兴趣了解回应物品点击回收者视图的最佳方法是什么.
通常我会向ViewHolder添加一个onclick()侦听器,并通过接口将结果传回活动/片段.
我想在onBindViewHolder中添加一个Observable,但我不想为每个项目绑定创建一个新的Observable.
首先,我将尝试解释我想要做什么,接下来你会看到我在做什么(代码).由于我是RxJava的新手,并且还在学习中随意给我你的意见.
所以,我从服务器调用网络API,当启动请求我调用loader(微调器)时,完成时我隐藏它,当我收到错误时也一样.我希望我的所有请求都是通用的,所以我从参数中得到了Observable和Observer.在这个方法上,我只关心隐藏和显示加载器.
OnError(这里是技巧部分),我试图显示一个对话框,但我得到了你可以在标题上看到的错误. 无法在未调用Looper.prepare()的线程内创建处理程序
这是代码..
protected void makeMyrequest(MyBaseActivity myBaseActivity, Observable observable, Observer observer) {
mSubscription = observable
.doOnRequest(new Action1<Long>() {
@Override
public void call(Long aLong) {
Log.d(TAG, "On request");
myBaseActivity.showLoader();
}
})
.doOnCompleted(new Action0() {
@Override
public void call() {
Log.d(TAG, "onCompleted: Hide spinner");
myBaseActivity.hideLoader();
mSubscription.unsubscribe();
}
})
.doOnError(new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
Log.d(TAG, "onError: Hide spinner");
myBaseActivity.showAlertDialog("error");
myBaseActivity.hideLoader();
}
})
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(observer);
}
Run Code Online (Sandbox Code Playgroud)
在我的基础活动上,我有一个显示对话框的方法
public void showAlertDialog(String message) {
mDialog = new AlertDialog.Builder(this)
.setMessage(message) …Run Code Online (Sandbox Code Playgroud) 我正在寻找一种干净的方式来在线创建可破坏的对象.kotlin.Pair并kotlin.Triple涵盖了很多用例,但有时需要传递更多的对象.
一个示例用例是RX的zip函数,其中几个I/O调用的结果需要映射到另一个对象:
Single
.zip(repositoryA.loadData(someId),
repositoryB.loadData(someId),
repositoryC.loadAll(),
repositoryD.loadAll()),
{ objectA, objectB, objectsC, objectsD -> /*some Kotlin magic*/ }
)
.map { (objectA, objectB, objectsC, objectsD) -> /*do the mapping*/ }
Run Code Online (Sandbox Code Playgroud)
我想弄清楚"一些Kotlin魔法"部分会发生什么.如果只有3个存储库,那就是
Triple(objectA, objectB, objectsC)
Run Code Online (Sandbox Code Playgroud)
我是否需要为此创建一个新的数据类,以及任何n元组的情况,还是有另一种方法?
rx-java ×10
android ×6
rx-android ×2
firebase ×1
kotlin ×1
mvp ×1
reactfx ×1
realm ×1
terminology ×1