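I have to poll some RESTful endpoints periodically to refresh the data in my Android app. I also have to pause and resume the polling based on connectivity (there is no point in trying while the phone is offline). My current solution works, but it uses the standard Java ScheduledExecutorService to run the periodic task, and I would like to stay within the Rx paradigm.
Here is my current code, with parts omitted for brevity.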
userProfileObservable = Observable.create(new Observable.OnSubscribe<UserProfile>() {
    @Override
    public void call(final Subscriber<? super UserProfile> subscriber) {
        final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
        final Runnable runnable = new Runnable() {
            @Override
            public void run() {
                // making http request here
            }
        };
        final List<ScheduledFuture<?>> futures = new ArrayList<ScheduledFuture<?>>(1);
        networkStatusObservable.subscribe(new Action1<Boolean>() {
            @Override
            public void call(Boolean networkAvailable) {
                if (!networkAvailable) {
                    pause();
                } else {
                    pause();
                    futures.add(scheduledExecutorService.scheduleWithFixedDelay(runnable, 0, SECOND_IN_MILLIS * SECONDS_TO_EXPIRE, TimeUnit.MILLISECONDS));
                }
            }
            private void pause() {
                for (ScheduledFuture<?> future : futures) {
                    future.cancel(true);
                }
                futures.clear();
            }
        });
        final Subscription subscription = new Subscription() {
            private boolean isUnsubscribed = false;
            @Override
            public void unsubscribe() {
                scheduledExecutorService.shutdownNow();
                isUnsubscribed = true;
            }
            @Override
            public boolean isUnsubscribed() {
                return isUnsubscribed;
            }
        };
        subscriber.add(subscription);
    }
}).multicast(BehaviorSubject.create()).refCount();
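networkStatusObservable is basically a broadcast receiver wrapped in an Observable<Boolean> that indicates whether the phone is connected to a network.
As I said, this solution works, but I would like to use an Rx approach to poll periodically and emit new UserProfiles, because scheduling things manually has many problems that I would like to avoid. I know about Observable.timer and Observable.interval, but I can't figure out how to apply them to this task (and I'm not sure whether I need them at all).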
Rob*_*ill 26
There are a few approaches in this GitHub issue that may help you.
https://github.com/ReactiveX/RxJava/issues/448
The three implementations are:
Observable.interval
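Observable.interval(delay, TimeUnit.SECONDS).timeInterval()
    .flatMap(new Func1<TimeInterval<Long>, Observable<Notification<AppState>>>() {
        public Observable<Notification<AppState>> call(TimeInterval<Long> tick) {
            // materialize() wraps errors in Notifications, so a failed request does not end the polling stream
            return lyftApi.updateAppState(params).materialize();
        }
    });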
Scheduler.schedulePeriodically
Observable.create({ observer ->
    Schedulers.newThread().schedulePeriodically({
        observer.onNext("application-state-from-network");
    }, 0, 1000, TimeUnit.MILLISECONDS);
}).take(10).subscribe({ v -> println(v) });
Manual recursion
Observable.create(new OnSubscribeFunc<String>() {
    @Override
    public Subscription onSubscribe(final Observer<? super String> o) {
        return Schedulers.newThread().schedule(0L, new Func2<Scheduler, Long, Subscription>() {
            @Override
            public Subscription call(Scheduler inner, Long t2) {
                o.onNext("data-from-polling");
                return inner.schedule(t2, this, 1000, TimeUnit.MILLISECONDS);
            }
        });
    }
}).toBlockingObservable().forEach(new Action1<String>() {
    @Override
    public void call(String v) {
        System.out.println("output: " + v);
    }
});
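The conclusion is that manual recursion is the way to go, because it waits for the operation to complete before scheduling the next execution.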
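The "wait for completion, then schedule the next run" idea can be sketched without any Rx types. The following is a minimal plain-Java illustration, not the code from the GitHub issue: the class RecursivePoller and its method names are hypothetical, and a ScheduledExecutorService stands in for the Rx scheduler.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical helper: runs a task, waits for it to return, then schedules the next run. */
final class RecursivePoller {
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    private final AtomicBoolean stopped = new AtomicBoolean(false);
    private final Runnable task;
    private final long delayMillis;

    RecursivePoller(Runnable task, long delayMillis) {
        this.task = task;
        this.delayMillis = delayMillis;
    }

    /** Starts polling; the first run happens immediately. */
    void start() {
        schedule(0);
    }

    private void schedule(long delay) {
        if (stopped.get()) {
            return;
        }
        try {
            executor.schedule(this::runOnce, delay, TimeUnit.MILLISECONDS);
        } catch (RejectedExecutionException e) {
            // stop() shut the executor down concurrently; nothing left to schedule
        }
    }

    private void runOnce() {
        if (stopped.get()) {
            return;
        }
        try {
            task.run(); // e.g. a blocking HTTP request
        } finally {
            // the delay is counted only after the task has returned, so runs never overlap
            schedule(delayMillis);
        }
    }

    /** Stops polling and releases the worker thread. */
    void stop() {
        stopped.set(true);
        executor.shutdownNow();
    }
}
```

Usage would look like new RecursivePoller(task, 1000).start(), with stop() called when the subscriber goes away. Because each run schedules its successor only on completion, a slow request delays the next poll instead of piling up behind it.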
One option is to use Observable.interval and check the user state each time the interval emits:
Observable<Long> interval = Observable.interval(1, TimeUnit.SECONDS);
// pulling the user data
Observable<Observable<String>> userObservable = interval.map(new Func1<Long, Observable<String>>() {
    Random random = new Random();
    @Override
    public Observable<String> call(Long tick) {
        // Pull the user data here. Do it asynchronously (the Rx way), because
        // interval runs on Schedulers.computation(), which is not well suited to I/O.
        switch (random.nextInt(10)) {
            case 0: // stands for cases where the network is not available or an exception happens
                return Observable.<String>just(null);
            case 1:
            case 2:
                return Observable.just("Alice");
            default:
                return Observable.just("Bob");
        }
    }
});
Observable<String> flatUsers = userObservable.flatMap(new Func1<Observable<String>, Observable<? extends String>>() {
    @Override
    public Observable<? extends String> call(Observable<String> stringObservable) {
        return stringObservable;
    }
});
// filter valid data
Observable<String> usersWithoutErrors = flatUsers.filter(new Func1<String, Boolean>() {
    @Override
    public Boolean call(String s) {
        return s != null;
    }
});
// publish only changes
Observable<String> uniqueUsers = usersWithoutErrors.distinctUntilChanged();
If your networkStatusObservable emits events at least as often as you need to check the user data, you can do it more simply:
networkStatusObservable.sample(1, TimeUnit.SECONDS).filter(/* ideally, let only the connected state through */).map(/* now start pulling the user data */)
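The gating idea behind that chain (look at the connectivity state once per period and fetch only when connected) can be illustrated in plain Java. This is a rough analogue under stated assumptions, not an RxJava operator chain: GatedPoller, onNetworkStatus and tick are hypothetical names, and unlike sample it reuses the last known state even if no new event arrived during the period.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

/** Hypothetical sketch: fetches only while the last reported network state is "connected". */
final class GatedPoller<T> {
    private final AtomicBoolean connected = new AtomicBoolean(false);
    private final Supplier<T> fetch; // stands in for the HTTP request
    private final List<T> results = new ArrayList<>();

    GatedPoller(Supplier<T> fetch) {
        this.fetch = fetch;
    }

    /** Call this from the connectivity broadcast receiver. */
    void onNetworkStatus(boolean isConnected) {
        connected.set(isConnected);
    }

    /** Call this once per polling period; returns true if a request was made. */
    synchronized boolean tick() {
        if (!connected.get()) {
            return false; // offline: skip this period
        }
        results.add(fetch.get());
        return true;
    }

    /** Returns a copy of everything fetched so far. */
    synchronized List<T> results() {
        return new ArrayList<>(results);
    }
}
```

Keeping tick() separate from the timer makes the gating logic easy to exercise without waiting for real time to pass; any periodic scheduler can drive it.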
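Finally, you can create an Observable that uses a scheduler to emit the user state periodically. See the Schedulers documentation to find out which scheduler suits you best: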
public abstract class ScheduledOnSubscribe<T> implements Observable.OnSubscribe<T> {
    private final Scheduler scheduler;
    private final long initialDelay;
    private final long period;
    private final TimeUnit unit;
    public ScheduledOnSubscribe(Scheduler scheduler, long initialDelay, long period, TimeUnit unit) {
        this.scheduler = scheduler;
        this.initialDelay = initialDelay;
        this.period = period;
        this.unit = unit;
    }
    abstract T next() throws Exception;
    @Override
    public void call(final Subscriber<? super T> subscriber) {
        final Scheduler.Worker worker = scheduler.createWorker();
        subscriber.add(worker);
        worker.schedulePeriodically(new Action0() {
            @Override
            public void call() {
                try {
                    subscriber.onNext(next());
                } catch (Throwable e) {
                    try {
                        subscriber.onError(e);
                    } finally {
                        worker.unsubscribe();
                    }
                }
            }
        }, initialDelay, period, unit);
    }
}
// And here is the sample usage
Observable<String> usersObservable = Observable.create(new ScheduledOnSubscribe<String>(Schedulers.io(), 1, 1, TimeUnit.SECONDS) {
    Random random = new Random();
    @Override
    String next() throws Exception {
        // if you use Schedulers.io, you can call the remote service synchronously
        switch (random.nextInt(10)) {
            case 0:
                return null;
            case 1:
            case 2:
                return "Alice";
            default:
                return "Bob";
        }
    }
});
Short answer, for RxJava 2:
Observable.interval(initialDelay, unitAmount, timeUnit)
    .subscribe(value -> {
        // code for periodic execution
    });
Choose initialDelay, unitAmount, and TimeUnit according to your needs.
Example: 0, 1, TimeUnit.MINUTES.