据我所知,AndroidObservable有助于确保:
但是,为了确保上下文被释放(防止泄漏),我看到的大多数示例通常都说你必须做.unsubscribe onDestroyView/onDestroy,这实质上会停止订阅,并阻止订阅者接收这些更新.
所以我的问题是:
如果我通过.observeOn(AndroidSchedulers.mainThread()手动指示订阅应该在主线程上发生,那么使用AndroidObservables还有其他优势吗?
以下两种方法有什么不同吗?
_subscription1 = AndroidObservable.bindFragment(MyFragment.this, myCustomAwesomeObservable()) //
.subscribeOn(Schedulers.io()) //
.subscribe(...);
_subscription2 = myCustomAwesomeObservable()
.subscribeOn(Schedulers.io()) //
.observeOn(AndroidSchedulers.mainThread()) //
.subscribe(...);
@Override
public void onDestroyView() {
_subscription1.unsubscribe();
_subscription2.unsubscribe();
super.onDestroyView();
}
Run Code Online (Sandbox Code Playgroud) 我正在尝试将一些基于侦听器模式的API包装到Observable中.我的代码大致如下.
def myObservable = Observable.create({ aSubscriber ->
val listener = {event ->
aSubscriber.onNext(event);
}
existingEventSource.addListener(listener)
})
Run Code Online (Sandbox Code Playgroud)
但是,当观察者调用subscription.unscribe()时,我希望我的observable立即从底层的existingEventSource中删除监听器.我怎么能实现这个目标?
我试图用rx-java替换我的代码.(这是非常小的代码.)
它完成了它的工作原理.
但我想知道......
下面是我的api处理代码.
之前
Random r = new Random();
boolean apiResult = r.nextBoolean(); // it represents api result. ex. {"result": true} or {"result": false}
if (apiResult == true) {
// do something
System.out.println("result:" + "success");
} else {
// do something
System.out.println("result:" + "failure");
}
Run Code Online (Sandbox Code Playgroud)
后
Random r = new Random();
Observable<Boolean> apiResultStream = Observable.create(new OnSubscribe<Boolean>() {
@Override
public void call(Subscriber<? super Boolean> subscriber) {
// emit true or false
subscriber.onNext(r.nextBoolean());
}
}).cache(1);
// I used filter …Run Code Online (Sandbox Code Playgroud) 我有一组对象,我想要压制重复的项目.我知道Distinct运算符,但如果我没有弄错,它通过正确覆盖的哈希码方法比较项目.但是,如果我的hashcode为相同的对象返回不同的值,并且我想通过我自己设置相等性.distinct有2个重载方法 - 一个没有params,一个有Func1 param,我想我应该使用第二种方法,但是如何exaclty?
.distinct(new Func1<ActivityManager.RunningServiceInfo, Object>() {
@Override
public Object call(ActivityManager.RunningServiceInfo runningServiceInfo) {
return null;
}
})
Run Code Online (Sandbox Code Playgroud) 我正在开发具有后台数据同步功能的Android应用程序.我目前正在使用RxJava定期在服务器上发布一些数据.除此之外,我想为用户提供一个"强制同步"按钮,它会立即触发同步.我知道如何使用Observable.interval()以定期时间间隔推送数据,我知道如何使用它Observalbe.just()来推送那个被强制的数据,但是我想将它们排队,如果发生这种情况,那么在前一个仍然运行的情况下触发它.
所以让我们举个例子,当1min是自动同步的间隔时,让我们说同步持续40秒(我夸张这里只是为了更容易点).现在,如果有任何机会,用户在自动仍在运行时按下"强制"按钮(反之亦然 - 强制一个仍在运行时自动触发),我想将第二个同步请求排队第一个完成.
我画了这张图片,可能会有更多的视角:
如您所见,自动触发(由某些人Observable.interval()),在同步过程中,用户按下"强制"按钮.现在我们要等待第一个请求完成,然后再次启动强制请求.有一次,当强制请求正在运行时,再次触发了新的自动请求,只是将其添加到队列中.从队列中完成最后一个之后,一切都停止了,然后稍后再次安排自动计划.
希望有人能指出我纠正操作员如何做到这一点.我已尝试过Observable.combineLatest(),但是队列列表在开始时被调度,当我向队列添加新的同步时,它在前一个操作完成时没有继续.
Darko,非常感谢任何帮助
我该如何使用RxJava TestScheduler?我来自.NET背景,但TestSchedulerRxJava似乎与.NET rx中的测试调度程序的工作方式不同.
这是我想测试的示例代码
Observable<Long> tick = Observable.interval(1, TimeUnit.SECONDS);
contactsRepository.find(index)
.buffer(MAX_CONTACTS_FETCH)
.zipWith(tick, new Func2<List<ContactDto>, Long, List<ContactDto>>() {
@Override
public List<ContactDto> call(List<ContactDto> contactList, Long aLong) {
return contactList;
}
}).subscribe()
Run Code Online (Sandbox Code Playgroud)
我试过了:
subscribeOn(testScheduler)
testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
testScheduler.triggerActions();
Run Code Online (Sandbox Code Playgroud)
没有运气.
鉴于:
Integer[] arr1 = {1, 5, 9, 17};
Integer[] arr2 = {1, 2, 3, 6, 7, 12, 15};
Observable<Integer> o1 = Observable.from(arr1);
Observable<Integer> o2 = Observable.from(arr2);
Run Code Online (Sandbox Code Playgroud)
如何获得包含的Observable 1, 1, 2, 3, 5, 6, 7, 9, 12, 15, 17?
我想知道是否有一个cache()运营商可以缓存x个数量的排放,但也会在指定的时间间隔(例如1分钟)后使它们到期.我在寻找像......
Observable<ImmutableList<MyType>> cachedList = otherObservable
.cache(1, 1, TimeUnit.MINUTES);
Run Code Online (Sandbox Code Playgroud)
这将缓存一个项目,但会在一分钟后过期并清除缓存.
我做了一些研究,找到了重播算子.看起来它可以满足这种需求,但我有一些问题.为什么它很热并且需要连接?这是否与cache()运营商不同?我知道cache()模仿一个主题,但它不需要连接.
Observable.create((Subscriber<? super String> s) -> {
System.out.println("subscribing");
s.onError(new RuntimeException("always fails"));
}).retryWhen(attempts -> {
return attempts.zipWith(Observable.range(1, 3), (n, i) -> i).flatMap(i -> {
System.out.println("delay retry by " + i + " second(s)");
return Observable.timer(i, TimeUnit.SECONDS);
});
}).toBlocking().forEach(System.out::println);
Run Code Online (Sandbox Code Playgroud)
但是如果重试用完,我该如何传播错误?
添加.doOnError(System.out::println) 后的retryWhen条款不捕获错误.它甚至被排出了吗?
添加.doOnError(System.out::println) 之前always fails重试时显示所有重试次数.
我有RecyclerView无尽的滚动.因此,当用户到达last - 2列表中的位置时,我呼叫服务器以获取更多数据,并且在通话过程中,我再添加一个项目 - 进度一项.
现在,我正在尝试使用Espresso编写体面的UI测试,它将检查无限滚动当前是否正在工作:
@Test
public void checkIfProgressShown() {
InstaFeed feed = TestDataFactory.makeInstaFeed(20);
InstaFeed oldFeed = TestDataFactory.makeInstaFeed(20);
when(mockDataManager.getFeedItemsFromServer()).thenReturn(Observable.just(feed.getInstaItems()));
when(mockDataManager.getOldFeedItemsFromServer()).thenReturn(Observable.just(oldFeed.getInstaItems())
.delay(2, TimeUnit.SECONDS));
instaActivityActivityTestRule.launchActivity(null);
int position = 0;
for (InstaItem item : feed.getInstaItems()) {
onView(withId(R.id.recycler_view))
.perform(RecyclerViewActions.scrollToPosition(position));
onView(withText(item.getLocation().getName()))
.check(matches(isDisplayed())); // Line of crash
position++;
}
onView(withId(R.id.progress))
.check(matches(isDisplayed()));
}
Run Code Online (Sandbox Code Playgroud)
所以基本上说,我试图延迟Observable的响应,显示新的一批项目,以便Espresso可以向下滚动到最后一项并使我的progress项目可见.问题在于测试只是堆栈,从UI的角度来看它看起来像:
不要混淆,这里没有进度条 - 在设备上禁用动画,所以没关系.最后,小项是有进展的项目
60秒后它崩溃了 -
android.support.test.espresso.AppNotIdleException: Looped for 3019 iterations over 60 SECONDS. The following Idle Conditions failed .
at dalvik.system.VMStack.getThreadStackTrace(Native Method)
at java.lang.Thread.getStackTrace(Thread.java:580)
at android.support.test.espresso.base.DefaultFailureHandler.getUserFriendlyError(DefaultFailureHandler.java:82) …Run Code Online (Sandbox Code Playgroud)