标签: rx-java

如何取消订阅PublishSubject和BehaviorSubject?

下的subjects包你有这样的类PublishSubjectBehaviorSubject我想可以被描述为一些可用的样品Observables.

如何取消订阅这些主题?没有unsubscribe方法和调用onCompleted完全结束了Observable吗?

java subject-observer rx-java

37
推荐指数
3
解决办法
4万
查看次数

EventBus/PubSub vs(反应式扩展)RX关于单线程应用程序中的代码清晰度

目前,我正在使用带有Scala(和JavaFX)的EventBus/PubSub架构/模式来实现一个简单的笔记组织应用程序(有点像Evernote客户端,带有一些添加的思维导图功能),我不得不说我真的很喜欢EventBus观察者模式.

以下是一些EventBus库:

https://code.google.com/p/guava-libraries/wiki/EventBusExplained

http://eventbus.org(目前似乎已经失效)这是我在实施中使用的那个.

http://greenrobot.github.io/EventBus/

以下是EventBus库的比较:http://codeblock.engio.net/37/

EventBus与发布 - 订阅模式相关.

但是!

最近,我参加了CourseraReactive课程并开始怀疑使用RXJava而不是EventBus是否会在单线程应用程序中简化事件处理代码?

我想问一下关于使用这两种技术(某种eventbus库谁编程的人的经验某种形式的反应扩展(RX)):是很容易解决的事件处理使用RX复杂性比与事件总线架构给出没有必要使用多个线程

我问这个,因为我已经在听到无功讲座CourseraRX(即没有"回调地狱")导致更清洁的代码比使用观察者模式,但是我没有找到VS EventBus架构之间的任何比较RXJava.所以很明显,EventBus和RXJava都比观察者模式更好,但 在代码清晰度和可维护性方面,它在单线程应用程序更好

如果我理解正确的话,RXJava的主要卖点是,如果存在阻塞操作(例如,等待来自服务器的响应),它可用于生成响应式应用程序.

但我根本不关心异步性,我所关心的只是在单线程应用程序中保持代码清洁,解开并易于推理.

在这种情况下,使用RXJava比使用EventBus更好吗?

我认为EventBus将是一个更简单,更清晰的解决方案,我认为没有任何理由可以将RXJava用于单线程应用程序,而采用简单的EventBus架构.

但我可能错了!

如果我错了,请纠正我,并解释为什么RXJava在单线程应用程序中没有执行阻塞操作的情况下比简单的EventBus更好.

event-handling reactive-programming system.reactive event-bus rx-java

37
推荐指数
2
解决办法
1万
查看次数

RxJava:尝试将错误传播到Observer.onError时发生错误

我在Rx库中收到IllegalStateException错误,并且不知道问题根源的确切位置,无论是RxJava还是我可能做错了.

证书锁定(发生在所有服务器请求上)但似乎指向会话超时或注销并重新登录时发生致命崩溃.Repro步骤(大约25%的时间发生)如下:登录,打开列表项 - 滚动一路结束 - 注销 - 重新登录 - 打开应用程序 - 关闭应用程序 - >崩溃!

任何人对如何防止这种情况有任何想法?我发现Observer.onError类似的问题在不一致的情况下触发

java.lang.IllegalStateException: Fatal Exception thrown on Scheduler.Worker thread.
   at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:62)
   at android.os.Handler.handleCallback(Handler.java:615)
   at android.os.Handler.dispatchMessage(Handler.java:92)
   at android.os.Looper.loop(Looper.java:137)
   at android.app.ActivityThread.main(ActivityThread.java:4867)
   at java.lang.reflect.Method.invokeNative(Method.java)
   at java.lang.reflect.Method.invoke(Method.java:511)
   at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:1007)
   at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:774)
   at dalvik.system.NativeStart.main(NativeStart.java)
Caused by: rx.exceptions.OnErrorFailedException: Error occurred when trying to propagate error to Observer.onError
   at rx.observers.SafeSubscriber._onError(SafeSubscriber.java:201)
   at rx.observers.SafeSubscriber.onError(SafeSubscriber.java:111)
   at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber$2.call(OperatorObserveOn.java:159)
   at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
   at android.os.Handler.handleCallback(Handler.java:615)
   at android.os.Handler.dispatchMessage(Handler.java:92)
   at android.os.Looper.loop(Looper.java:137)
   at android.app.ActivityThread.main(ActivityThread.java:4867)
   at java.lang.reflect.Method.invokeNative(Method.java)
   at java.lang.reflect.Method.invoke(Method.java:511)
   at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:1007)
   at …
Run Code Online (Sandbox Code Playgroud)

android exception onerror rx-java rx-android

37
推荐指数
2
解决办法
2万
查看次数

如何使用Retrofit 2在单元测试期间创建retrofit.Response对象

在使用RxJava和Retrofit 2时,我正在尝试创建单元测试以覆盖我的应用程序何时收到特定响应.

我的问题是,使用Retrofit 2,我无法看到一个很好的方法来创建一个不使用反射的retrofit.Response对象.

@Test
public void testLogin_throwsLoginBadRequestExceptionWhen403Error() {


    Request.Builder requestBuilder = new Request.Builder();
    requestBuilder.get();
    requestBuilder.url("http://localhost");

    Response.Builder responseBuilder = new Response.Builder();
    responseBuilder.code(403);
    responseBuilder.protocol(Protocol.HTTP_1_1);
    responseBuilder.body(ResponseBody.create(MediaType.parse("application/json"), "{\"key\":[\"somestuff\"]}"));
    responseBuilder.request(requestBuilder.build());

    retrofit.Response<LoginResponse> aResponse = null;

    try {
        Constructor<retrofit.Response> constructor= (Constructor<retrofit.Response>) retrofit.Response.class.getDeclaredConstructors()[0];
        constructor.setAccessible(true);
        aResponse = constructor.newInstance(responseBuilder.build(), null, null);
    } catch (Exception ex) {
        //reflection error
    }

    doReturn(Observable.just(aResponse)).when(mockLoginAPI).login(anyObject());

    TestSubscriber testSubscriber = new TestSubscriber();
    loginAPIService.login(loginRequest).subscribe(testSubscriber);

    Throwable resultError = (Throwable) testSubscriber.getOnErrorEvents().get(0);
    assertTrue(resultError instanceof LoginBadRequestException);
}
Run Code Online (Sandbox Code Playgroud)

我尝试过使用以下内容,但这会创建一个OkHttp响应与一个Retrofit响应.

    Request.Builder requestBuilder = new Request.Builder();
    requestBuilder.get();
    requestBuilder.url("http://localhost");

    Response.Builder responseBuilder = new Response.Builder();
    responseBuilder.code(403); …
Run Code Online (Sandbox Code Playgroud)

android unit-testing mockito rx-java retrofit

37
推荐指数
3
解决办法
1万
查看次数

如何从OnClick事件Android创建一个Observable?

我是反应式编程的新手.所以当我从一个事件创建一个流时遇到问题,比如onClick,ontouch ......

任何人都可以帮我解决这个问题.

谢谢.

android reactive-programming rx-java

36
推荐指数
4
解决办法
3万
查看次数

在onNext()中没有传递任何内容的Observable

我需要一个Observable,例如提供一个系统时钟,它不需要在onNext()中传递任何东西.我找不到允许我这样做的签名.

当然,我可以使用任何对象,然后传递null,但这没有多大意义.所以我的问题是,是否有更好的方法来做到这一点.

Observable.create(new Observable.OnSubscribe<Anyobject>() { // use any object in the signature

            @Override public void call(Subscriber<? super Anyobject> subscriber) {

                    subscriber.onNext(null); // then pass null
                    subscriber.onCompleted();

            }

        })
Run Code Online (Sandbox Code Playgroud)

java rx-java

36
推荐指数
3
解决办法
4万
查看次数

将Rx Observable拆分为多个流并单独处理

这是我试图完成的图片.

--abca - BBB - 一个

分裂成

--a ----- a ------- a - >一个流

---- b ------ bbb --- - > b stream

------ c ---------- - > c stream

然后,能够

a.subscribe()
b.subscribe()
c.subscribe()
Run Code Online (Sandbox Code Playgroud)

到目前为止,我发现的所有内容都使用groupBy()拆分流,但随后将所有内容折叠回单个流并在同一个函数中处理它们.我想要做的是以不同的方式处理每个派生流.

我现在正在做的方式是做一堆过滤器.有一个更好的方法吗?

reactive-programming rxjs rx-java

35
推荐指数
2
解决办法
2万
查看次数

立即发送第一个项目,"debounce"以下项目

考虑以下用例:

  • 需要尽快交付第一件物品
  • 需要在1秒超时后对事件进行去抖动

我最终实现了自定义运算符,OperatorDebounceWithTime然后像这样使用它

.lift(new CustomOperatorDebounceWithTime<>(1, TimeUnit.SECONDS, Schedulers.computation()))
Run Code Online (Sandbox Code Playgroud)

CustomOperatorDebounceWithTime立即发送第一个项目,然后使用OperatorDebounceWithTime操作员的逻辑去除后期项目.

是否有更简单的方法来实现所描述的行为?让我们跳过compose运算符,但它没有解决问题.我正在寻找一种方法来实现这一点,而无需实现自定义运算符.

rx-java rx-android

35
推荐指数
6
解决办法
1万
查看次数

Rxandroid SubscribeOn和ObserveOn之间的区别是什么

我只是在学习Rx-java和Rxandroid2,我只是混淆了SubscribeOn和ObserveOn之间的主要区别.

java android rx-java rx-android rx-java2

34
推荐指数
4
解决办法
6510
查看次数

如何在使用rxjava的android延迟后调用一个方法?

我试图将我的Handler方法替换为Rx java.我的要求

我想在5秒后调用方法getTransactionDetails().

这是我使用Handler的工作代码

new Handler().postDelayed(new Runnable() {
      @Override
      public void run() {
        getTransactionDetails();
      }
    }, 5000); 
Run Code Online (Sandbox Code Playgroud)

Rx java代码 - 它不起作用

Observable.empty().delay(5000, TimeUnit.MILLISECONDS)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .doOnNext(o -> getTransactionDetails())
        .subscribe();
Run Code Online (Sandbox Code Playgroud)

android rx-java

33
推荐指数
5
解决办法
3万
查看次数