标签: rx-java

为什么要考虑在RxJava中使用AndroidObservables

据我所知,AndroidObservable有助于确保:

  1. 订阅者始终在主线程上观察
  2. 当分离/活动分离/停止时,观察立即停止,并且不更新与框架相关的组件(如ui textviews等).

但是,为了确保上下文被释放(防止泄漏),我看到的大多数示例通常都说你必须做.unsubscribe onDestroyView/onDestroy,这实质上会停止订阅,并阻止订阅者接收这些更新.

所以我的问题是:

如果我通过.observeOn(AndroidSchedulers.mainThread()手动指示订阅应该在主线程上发生,那么使用AndroidObservables还有其他优势吗?

以下两种方法有什么不同吗?

_subscription1 = AndroidObservable.bindFragment(MyFragment.this, myCustomAwesomeObservable()) //
                           .subscribeOn(Schedulers.io()) //
                           .subscribe(...);


_subscription2 =  myCustomAwesomeObservable()
                           .subscribeOn(Schedulers.io()) //
                           .observeOn(AndroidSchedulers.mainThread()) //
                           .subscribe(...);


@Override
public void onDestroyView() {
    _subscription1.unsubscribe();
    _subscription2.unsubscribe();
    super.onDestroyView();
}
Run Code Online (Sandbox Code Playgroud)

android rx-java rx-android

17
推荐指数
1
解决办法
3031
查看次数

如何在RxJava中的自定义Observable中获得观察者取消订阅操作的通知

我正在尝试将一些基于侦听器模式的API包装到Observable中.我的代码大致如下.

def myObservable = Observable.create({ aSubscriber ->
    val listener = {event -> 
      aSubscriber.onNext(event);                
    }
    existingEventSource.addListener(listener)
})
Run Code Online (Sandbox Code Playgroud)

但是,当观察者调用subscription.unscribe()时,我希望我的observable立即从底层的existingEventSource中删除监听器.我怎么能实现这个目标?

observable rx-java

17
推荐指数
1
解决办法
7811
查看次数

如何用rx-java替换'if语句'以避免回调地狱?

我试图用rx-java替换我的代码.(这是非常小的代码.)

它完成了它的工作原理.

但我想知道......

  1. 这是一个很好的Rx风格吗?
  2. 如果不好,请说明不好的一点

下面是我的api处理代码.

之前

Random r = new Random();
boolean apiResult = r.nextBoolean(); // it represents api result. ex. {"result": true} or {"result": false}

if (apiResult == true) {
    // do something

    System.out.println("result:" + "success");
} else {
    // do something

    System.out.println("result:" + "failure");
}
Run Code Online (Sandbox Code Playgroud)

Random r = new Random();
Observable<Boolean> apiResultStream = Observable.create(new OnSubscribe<Boolean>() {
    @Override
    public void call(Subscriber<? super Boolean> subscriber) {
        // emit true or false
         subscriber.onNext(r.nextBoolean());
    }
}).cache(1);


// I used filter …
Run Code Online (Sandbox Code Playgroud)

java rx-java

17
推荐指数
1
解决办法
2万
查看次数

如何过滤RXJava中observable发出的重复值?

我有一组对象,我想要压制重复的项目.我知道Distinct运算符,但如果我没有弄错,它通过正确覆盖的哈希码方法比较项目.但是,如果我的hashcode为相同的对象返回不同的值,并且我想通过我自己设置相等性.distinct有2个重载方法 - 一个没有params,一个有Func1 param,我想我应该使用第二种方法,但是如何exaclty?

    .distinct(new Func1<ActivityManager.RunningServiceInfo, Object>() {
                        @Override
                        public Object call(ActivityManager.RunningServiceInfo runningServiceInfo) {
                            return null;
                        }
                    })
Run Code Online (Sandbox Code Playgroud)

java distinct rx-java

17
推荐指数
1
解决办法
1万
查看次数

在Android中使用RxJava排队任务

我正在开发具有后台数据同步功能的Android应用程序.我目前正在使用RxJava定期在服务器上发布一些数据.除此之外,我想为用户提供一个"强制同步"按钮,它会立即触发同步.我知道如何使用Observable.interval()以定期时间间隔推送数据,我知道如何使用它Observalbe.just()来推送那个被强制的数据,但是我想将它们排队,如果发生这种情况,那么在前一个仍然运行的情况下触发它.

所以让我们举个例子,当1min是自动同步的间隔时,让我们说同步持续40秒(我夸张这里只是为了更容易点).现在,如果有任何机会,用户在自动仍在运行时按下"强制"按钮(反之亦然 - 强制一个仍在运行时自动触发),我想将第二个同步请求排队第一个完成.

我画了这张图片,可能会有更多的视角:

在此输入图像描述

如您所见,自动触发(由某些人Observable.interval()),在同步过程中,用户按下"强制"按钮.现在我们要等待第一个请求完成,然后再次启动强制请求.有一次,当强制请求正在运行时,再次触发了新的自动请求,只是将其添加到队列中.从队列中完成最后一个之后,一切都停止了,然后稍后再次安排自动计划.

希望有人能指出我纠正操作员如何做到这一点.我已尝试过Observable.combineLatest(),但是队列列表在开始时被调度,当我向队列添加新的同步时,它在前一个操作完成时没有继续.

Darko,非常感谢任何帮助

android rx-java rx-android

17
推荐指数
1
解决办法
6338
查看次数

如何在RxJava中使用TestScheduler

我该如何使用RxJava TestScheduler?我来自.NET背景,但TestSchedulerRxJava似乎与.NET rx中的测试调度程序的工作方式不同.

这是我想测试的示例代码

Observable<Long> tick = Observable.interval(1, TimeUnit.SECONDS);
contactsRepository.find(index)
  .buffer(MAX_CONTACTS_FETCH)
  .zipWith(tick, new Func2<List<ContactDto>, Long, List<ContactDto>>() {
    @Override
    public List<ContactDto> call(List<ContactDto> contactList, Long aLong) {
      return contactList;
    }
  }).subscribe()
Run Code Online (Sandbox Code Playgroud)

我试过了:

subscribeOn(testScheduler)
testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
testScheduler.triggerActions();
Run Code Online (Sandbox Code Playgroud)

没有运气.

rx-java

16
推荐指数
2
解决办法
8860
查看次数

如何将两个已排序的Observable合并为一个已排序的Observable?

鉴于:

Integer[] arr1 = {1, 5, 9, 17};
Integer[] arr2 = {1, 2, 3, 6, 7, 12, 15};
Observable<Integer> o1 = Observable.from(arr1);
Observable<Integer> o2 = Observable.from(arr2);
Run Code Online (Sandbox Code Playgroud)

如何获得包含的Observable 1, 1, 2, 3, 5, 6, 7, 9, 12, 15, 17

java system.reactive rx-java

16
推荐指数
1
解决办法
2706
查看次数

RxJava- cache()与replay()相同吗?

我想知道是否有一个cache()运营商可以缓存x个数量的排放,但也会在指定的时间间隔(例如1分钟)后使它们到期.我在寻找像......

Observable<ImmutableList<MyType>> cachedList = otherObservable
    .cache(1, 1, TimeUnit.MINUTES); 
Run Code Online (Sandbox Code Playgroud)

这将缓存一个项目,但会在一分钟后过期并清除缓存.

我做了一些研究,找到了重播算子.看起来它可以满足这种需求,但我有一些问题.为什么它很热并且需要连接?这是否与cache()运营商不同?我知道cache()模仿一个主题,但它不需要连接.

java reactive-programming rx-java

16
推荐指数
1
解决办法
8508
查看次数

如果重试时出现捕获错误:重试耗尽

Retry文档中,示例如下所示:

Observable.create((Subscriber<? super String> s) -> {
  System.out.println("subscribing");
  s.onError(new RuntimeException("always fails"));
}).retryWhen(attempts -> {
  return attempts.zipWith(Observable.range(1, 3), (n, i) -> i).flatMap(i -> {
      System.out.println("delay retry by " + i + " second(s)");
      return Observable.timer(i, TimeUnit.SECONDS);
  });
}).toBlocking().forEach(System.out::println);
Run Code Online (Sandbox Code Playgroud)

但是如果重试用完,我该如何传播错误?

添加.doOnError(System.out::println) retryWhen条款不捕获错误.它甚至被排出了吗?

添加.doOnError(System.out::println) 之前always fails重试时显示所有重试次数.

reactive-programming rx-java

16
推荐指数
3
解决办法
5382
查看次数

使用Espresso和RxJava测试无尽的滚动RecyclerView

我有RecyclerView无尽的滚动.因此,当用户到达last - 2列表中的位置时,我呼叫服务器以获取更多数据,并且在通话过程中,我再添加一个项目 - 进度一项.

现在,我正在尝试使用Espresso编写体面的UI测试,它将检查无限滚动当前是否正在工作:

    @Test
    public void checkIfProgressShown() {
        InstaFeed feed = TestDataFactory.makeInstaFeed(20);
        InstaFeed oldFeed = TestDataFactory.makeInstaFeed(20);
               when(mockDataManager.getFeedItemsFromServer()).thenReturn(Observable.just(feed.getInstaItems()));
        when(mockDataManager.getOldFeedItemsFromServer()).thenReturn(Observable.just(oldFeed.getInstaItems())
                .delay(2, TimeUnit.SECONDS));

        instaActivityActivityTestRule.launchActivity(null);

        int position = 0;
        for (InstaItem item : feed.getInstaItems()) {
            onView(withId(R.id.recycler_view))
               .perform(RecyclerViewActions.scrollToPosition(position));
            onView(withText(item.getLocation().getName()))
                .check(matches(isDisplayed())); // Line of crash
            position++;
        }

        onView(withId(R.id.progress))
                .check(matches(isDisplayed()));
    }
Run Code Online (Sandbox Code Playgroud)

所以基本上说,我试图延迟Observable的响应,显示新的一批项目,以便Espresso可以向下滚动到最后一项并使我的progress项目可见.问题在于测试只是堆栈,从UI的角度来看它看起来像:

不要混淆,这里没有进度条 - 在设备上禁用动画,所以没关系.最后,小项是有进展的项目

60秒后它崩溃了 -

android.support.test.espresso.AppNotIdleException: Looped for 3019 iterations over 60 SECONDS. The following Idle Conditions failed .
at dalvik.system.VMStack.getThreadStackTrace(Native Method)
at java.lang.Thread.getStackTrace(Thread.java:580)
at android.support.test.espresso.base.DefaultFailureHandler.getUserFriendlyError(DefaultFailureHandler.java:82) …
Run Code Online (Sandbox Code Playgroud)

android automated-tests ui-testing rx-java android-espresso

16
推荐指数
1
解决办法
1351
查看次数