下的subjects包你有这样的类PublishSubject和BehaviorSubject我想可以被描述为一些可用的样品Observables.
如何取消订阅这些主题?没有unsubscribe方法和调用onCompleted完全结束了Observable吗?
目前,我正在使用带有Scala(和JavaFX)的EventBus/PubSub架构/模式来实现一个简单的笔记组织应用程序(有点像Evernote客户端,带有一些添加的思维导图功能),我不得不说我真的很喜欢EventBus观察者模式.
以下是一些EventBus库:
https://code.google.com/p/guava-libraries/wiki/EventBusExplained
http://eventbus.org(目前似乎已经失效)这是我在实施中使用的那个.
http://greenrobot.github.io/EventBus/
以下是EventBus库的比较:http://codeblock.engio.net/37/
EventBus与发布 - 订阅模式相关.
但是!
最近,我参加了Coursera的Reactive课程并开始怀疑使用RXJava而不是EventBus是否会在单线程应用程序中简化事件处理代码?
我想问一下关于使用这两种技术(某种eventbus库谁编程的人的经验和某种形式的反应扩展(RX)):是很容易解决的事件处理使用RX复杂性比与事件总线架构给出没有必要使用多个线程?
我问这个,因为我已经在听到无功讲座Coursera是RX(即没有"回调地狱")导致更清洁的代码比使用观察者模式,但是我没有找到VS EventBus架构之间的任何比较RXJava.所以很明显,EventBus和RXJava都比观察者模式更好,但 在代码清晰度和可维护性方面,它在单线程应用程序中更好?
如果我理解正确的话,RXJava的主要卖点是,如果存在阻塞操作(例如,等待来自服务器的响应),它可用于生成响应式应用程序.
但我根本不关心异步性,我所关心的只是在单线程应用程序中保持代码清洁,解开并易于推理.
在这种情况下,使用RXJava比使用EventBus更好吗?
我认为EventBus将是一个更简单,更清晰的解决方案,我认为没有任何理由可以将RXJava用于单线程应用程序,而采用简单的EventBus架构.
但我可能错了!
如果我错了,请纠正我,并解释为什么RXJava在单线程应用程序中没有执行阻塞操作的情况下比简单的EventBus更好.
event-handling reactive-programming system.reactive event-bus rx-java
我在Rx库中收到IllegalStateException错误,并且不知道问题根源的确切位置,无论是RxJava还是我可能做错了.
证书锁定(发生在所有服务器请求上)但似乎指向会话超时或注销并重新登录时发生致命崩溃.Repro步骤(大约25%的时间发生)如下:登录,打开列表项 - 滚动一路结束 - 注销 - 重新登录 - 打开应用程序 - 关闭应用程序 - >崩溃!
任何人对如何防止这种情况有任何想法?我发现Observer.onError类似的问题在不一致的情况下触发
java.lang.IllegalStateException: Fatal Exception thrown on Scheduler.Worker thread.
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:62)
at android.os.Handler.handleCallback(Handler.java:615)
at android.os.Handler.dispatchMessage(Handler.java:92)
at android.os.Looper.loop(Looper.java:137)
at android.app.ActivityThread.main(ActivityThread.java:4867)
at java.lang.reflect.Method.invokeNative(Method.java)
at java.lang.reflect.Method.invoke(Method.java:511)
at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:1007)
at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:774)
at dalvik.system.NativeStart.main(NativeStart.java)
Caused by: rx.exceptions.OnErrorFailedException: Error occurred when trying to propagate error to Observer.onError
at rx.observers.SafeSubscriber._onError(SafeSubscriber.java:201)
at rx.observers.SafeSubscriber.onError(SafeSubscriber.java:111)
at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber$2.call(OperatorObserveOn.java:159)
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
at android.os.Handler.handleCallback(Handler.java:615)
at android.os.Handler.dispatchMessage(Handler.java:92)
at android.os.Looper.loop(Looper.java:137)
at android.app.ActivityThread.main(ActivityThread.java:4867)
at java.lang.reflect.Method.invokeNative(Method.java)
at java.lang.reflect.Method.invoke(Method.java:511)
at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:1007)
at …Run Code Online (Sandbox Code Playgroud) 在使用RxJava和Retrofit 2时,我正在尝试创建单元测试以覆盖我的应用程序何时收到特定响应.
我的问题是,使用Retrofit 2,我无法看到一个很好的方法来创建一个不使用反射的retrofit.Response对象.
@Test
public void testLogin_throwsLoginBadRequestExceptionWhen403Error() {
Request.Builder requestBuilder = new Request.Builder();
requestBuilder.get();
requestBuilder.url("http://localhost");
Response.Builder responseBuilder = new Response.Builder();
responseBuilder.code(403);
responseBuilder.protocol(Protocol.HTTP_1_1);
responseBuilder.body(ResponseBody.create(MediaType.parse("application/json"), "{\"key\":[\"somestuff\"]}"));
responseBuilder.request(requestBuilder.build());
retrofit.Response<LoginResponse> aResponse = null;
try {
Constructor<retrofit.Response> constructor= (Constructor<retrofit.Response>) retrofit.Response.class.getDeclaredConstructors()[0];
constructor.setAccessible(true);
aResponse = constructor.newInstance(responseBuilder.build(), null, null);
} catch (Exception ex) {
//reflection error
}
doReturn(Observable.just(aResponse)).when(mockLoginAPI).login(anyObject());
TestSubscriber testSubscriber = new TestSubscriber();
loginAPIService.login(loginRequest).subscribe(testSubscriber);
Throwable resultError = (Throwable) testSubscriber.getOnErrorEvents().get(0);
assertTrue(resultError instanceof LoginBadRequestException);
}
Run Code Online (Sandbox Code Playgroud)
我尝试过使用以下内容,但这会创建一个OkHttp响应与一个Retrofit响应.
Request.Builder requestBuilder = new Request.Builder();
requestBuilder.get();
requestBuilder.url("http://localhost");
Response.Builder responseBuilder = new Response.Builder();
responseBuilder.code(403); …Run Code Online (Sandbox Code Playgroud) 我是反应式编程的新手.所以当我从一个事件创建一个流时遇到问题,比如onClick,ontouch ......
任何人都可以帮我解决这个问题.
谢谢.
我需要一个Observable,例如提供一个系统时钟,它不需要在onNext()中传递任何东西.我找不到允许我这样做的签名.
当然,我可以使用任何对象,然后传递null,但这没有多大意义.所以我的问题是,是否有更好的方法来做到这一点.
Observable.create(new Observable.OnSubscribe<Anyobject>() { // use any object in the signature
@Override public void call(Subscriber<? super Anyobject> subscriber) {
subscriber.onNext(null); // then pass null
subscriber.onCompleted();
}
})
Run Code Online (Sandbox Code Playgroud) 这是我试图完成的图片.
--abca - BBB - 一个
分裂成
--a ----- a ------- a - >一个流
---- b ------ bbb --- - > b stream
------ c ---------- - > c stream
然后,能够
a.subscribe()
b.subscribe()
c.subscribe()
Run Code Online (Sandbox Code Playgroud)
到目前为止,我发现的所有内容都使用groupBy()拆分流,但随后将所有内容折叠回单个流并在同一个函数中处理它们.我想要做的是以不同的方式处理每个派生流.
我现在正在做的方式是做一堆过滤器.有一个更好的方法吗?
考虑以下用例:
我最终实现了自定义运算符,OperatorDebounceWithTime然后像这样使用它
.lift(new CustomOperatorDebounceWithTime<>(1, TimeUnit.SECONDS, Schedulers.computation()))
Run Code Online (Sandbox Code Playgroud)
CustomOperatorDebounceWithTime立即发送第一个项目,然后使用OperatorDebounceWithTime操作员的逻辑去除后期项目.
是否有更简单的方法来实现所描述的行为?让我们跳过compose运算符,但它没有解决问题.我正在寻找一种方法来实现这一点,而无需实现自定义运算符.
我只是在学习Rx-java和Rxandroid2,我只是混淆了SubscribeOn和ObserveOn之间的主要区别.
我试图将我的Handler方法替换为Rx java.我的要求
我想在5秒后调用方法getTransactionDetails().
这是我使用Handler的工作代码
new Handler().postDelayed(new Runnable() {
@Override
public void run() {
getTransactionDetails();
}
}, 5000);
Run Code Online (Sandbox Code Playgroud)
Rx java代码 - 它不起作用
Observable.empty().delay(5000, TimeUnit.MILLISECONDS)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.doOnNext(o -> getTransactionDetails())
.subscribe();
Run Code Online (Sandbox Code Playgroud)