tmn*_*tmn 5 java reactive-programming rx-java
我经常遇到一个模式,我不太确定如何有效地解决它。
基本上,如果我Observable<T>持有一个昂贵的物品T,我不想在T每次使用它时重新构建该物品,或者将其映射到一千个不同的其他可观察对象,这将导致它被构建 1000 次。
所以我开始使用replay()缓存它一段时间,但理想情况下我希望它在排放闲置一段时间时清除缓存。
是否有我可以使用的操作员或某些变压器来完成此操作?
public final class ActionManager {
private final Observable<ImmutableList<Action>> actionMap;
private ActionManager() {
this.actionMap = Observable.defer(() -> buildExpensiveList()).replay(10, TimeUnit.SECONDS).autoConnect();
}
//this method could get called thousands of times
//I don't want to rebuild the map for every call
public Observable<Action> forItem(Item item) {
actionMap.map(l -> //get Action for item);
}
}
Run Code Online (Sandbox Code Playgroud)
更新
试图将其实现为 Transformer/Operator 组合。我在这里做错了吗?
public static <T> Transformer<T,T> recacheOnIdle(long time, TimeUnit timeUnit) {
return obs -> obs.timeout(time, timeUnit).lift(new Operator<T,T>() {
private volatile T cachedItem;
private volatile boolean isCurrent = false;
@Override
public Subscriber<? super T> call(Subscriber<? super T> s) {
return new Subscriber<T>(s) {
@Override
public void onCompleted() {
if(!s.isUnsubscribed()) {
s.onCompleted();
}
}
@Override
public void onError(Throwable e) {
if(!s.isUnsubscribed()) {
if (e instanceof TimeoutException) {
isCurrent = false;
cachedItem = null;
} else {
s.onError(e);
}
}
}
@Override
public void onNext(T t) {
if(!s.isUnsubscribed()) {
if (!isCurrent) {
cachedItem = t;
}
s.onNext(cachedItem);
}
}
};
}
});
}
Run Code Online (Sandbox Code Playgroud)
您也许可以使用超时运算符和可连接的可观察对象(以保存和同步多个订阅者):
镜像源 Observable,但如果在特定时间段内没有发出任何项目,则发出错误通知
通过这种方式,您可以通过重新缓存昂贵的项目来响应引发的错误。假设这是一个“罕见”案例:
// if no emissions are made for a period of 3 seconds - will call onError
observableWithCache.timeout(3000, TimeUnit.MILLISECONDS).subscribe(new Subscriber<SomeObject>() {
public void onCompleted() {
}
public void onError(Throwable arg0) {
doClearCache(); // make sure to re-subscribe with timeout
}
public void onNext(SomeObject item) {
System.out.println("Got item: " + item); // you can ignore this
}
});
Run Code Online (Sandbox Code Playgroud)
请注意,这onError不会取消原始可观察量,如图所示:
但您可以对没有排放的一段时间做出反应。