所以我正在玩RX(非常酷),我一直在转换我的api,它访问Android中的sqlite数据库以返回observables.
所以我开始尝试解决的问题之一就是"如果我想要进行3次API调用,得到结果,然后在完成后再进行一些处理,该怎么办?"
我花了一个小时或两个小时,但我最终找到了Zip功能,它帮助我轻松地:
Observable<Integer> one = getNumberedObservable(1);
Observable<Integer> two = getNumberedObservable(2);
Observable<Integer> three = getNumberedObservable(3);
Observable.zip(one, two, three, new Func3<Integer, Integer, Integer, Integer>() {
@Override
public Integer call(Integer arg0, Integer arg1, Integer arg2) {
System.out.println("Zip0: " + arg0);
System.out.println("Zip1: " + arg1);
System.out.println("Zip2: " + arg2);
return arg0 + arg1 + arg2;
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer arg0) {
System.out.println("Zipped Result: " + arg0);
}
});
public static Observable<Integer> getNumberedObservable(final int value) {
return Observable.create(new …Run Code Online (Sandbox Code Playgroud) 我想使用rxjava Observable处理Retrofit中的分页.我听从了另一个问题的建议.
我有超过100个页面需要被提取,但链在第20页的页面失败并停止对logserv的进一步订阅以及logcat中的以下日志
04-04 04:12:11.766 2951-3012/com.example.app I/dalvikvm? threadid=28: stack overflow on call to Ljava/util/concurrent/atomic/AtomicLongFieldUpdater$CASUpdater;.compareAndSet:ZLJJ
04-04 04:12:11.766 2951-3012/com.example.app I/dalvikvm? method requires 56+20+32=108 bytes, fp is 0x94b52350 (80 left)
04-04 04:12:11.766 2951-3012/com.example.app I/dalvikvm? expanding stack end (0x94b52300 to 0x94b52000)
04-04 04:12:11.766 2951-3012/com.example.app I/dalvikvm? Shrank stack (to 0x94b52300, curFrame is 0x94b548dc)
Run Code Online (Sandbox Code Playgroud)
有人知道为什么会这样吗?
更新:我知道这是由于递归而发生的,但是有一种更优雅的方式来处理改装和rxjava的分页吗?
Java 8 lambda流有一个peek()运算符,允许您对每个项执行void操作.这通常用于调试,但它也是一种欺骗和启动void操作而不映射到某些东西的好方法.
在RxJava中是否有相同的功能?也许我没有遵循良好的做法或反应性思考......但是在操作之前和之后创建状态标签会非常方便吗?如果peek()不受支持,是否有更好的模式可供遵循?
Observable<Item> Item= ...;
Label statusLabel = new Label();
Label resultLabel = new Label();
Observable<CalculatedItem> calculatedItem = calculated.subscribeOn(Schedulers.computation())
.peek(c -> statusLabel.setText("Working.."))
.map(c -> performExpensiveCalculation(c))
.peek(r -> statusLabel.setText(""));
calculatedItem.subscribe(c -> resultLabel.setText(c));
Run Code Online (Sandbox Code Playgroud) 我想在Android环境中学习rxjava.假设我有一个可观察到的网络调用结果.如果我理解正确,处理配置更改的一种常见方法是:
将observable存储在保留的片段/单例/应用程序对象中
将缓存运算符应用于observable
订阅/取消订阅适当的生命周期处理程序
这样做,我们不会松开一旦新配置发生后将重新观察到的observable的结果.
现在,我的问题是:
有没有办法强制observable发出一个新值(并使缓存的值无效)?每次我需要来自网络的新数据时,我是否需要创建一个新的observable(这听起来不像Android世界中的坏习惯,因为这会让gc做额外的工作)?
非常感谢,
费德里科
在沙发基地,Observables
有什么区别:Schedulers.io()和Schedulers.computation()
使用时出现 UndeliverableExceptioncompletable
public Completable createBucketWithStorageClassAndLocation() {
return Completable.complete()
.doFinally(() -> {
Bucket bucket =
storage.create(
BucketInfo.newBuilder(googleUploadObjectConfiguration.bucketName())
.setStorageClass(storageClass)
.setLocation(googleUploadObjectConfiguration.locationName())
.build());
}).doOnError(error -> LOG.error(error.getMessage()));
}
Run Code Online (Sandbox Code Playgroud)
异常是从 Google 存储中抛出的,这是正确的,但尝试处理doOnError方法
Caused by: com.google.cloud.storage.StorageException: You already own this bucket. Please select another name.
Run Code Online (Sandbox Code Playgroud)
RXJava 异常
io.reactivex.exceptions.UndeliverableException: The exception could not be delivered to the consumer because it has already canceled/disposed the flow or the exception has nowhere to go to begin with. Further reading: https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#error-handling | com.google.cloud.storage.StorageException: You already own this bucket. …Run Code Online (Sandbox Code Playgroud) 我在编辑文本搜索中使用rxandroid进行去抖操作
我用了
private void setUpText() {
_mSubscription = RxTextView.textChangeEvents(searchStation)//
.debounce(500, TimeUnit.MILLISECONDS)// default Scheduler is Computation
.observeOn(AndroidSchedulers.mainThread())//
.subscribe(getOps().getdata());
}
Run Code Online (Sandbox Code Playgroud)
和观察者一样
public Observer<TextViewTextChangeEvent> getdata()
{
return new Observer<TextViewTextChangeEvent>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onNext(TextViewTextChangeEvent onTextChangeEvent) {
// stationsugession(onTextChangeEvent.text().toString());
//here i called link to get the data from the server
}
};
}
Run Code Online (Sandbox Code Playgroud)
我的问题是甚至在任何edittext更改发生之前调用链接.它没有调用textchange事件.我错过了什么我在这里做错了什么.我是rxandroid和rxjava的新手.
我用了
compile 'io.reactivex:rxandroid:1.1.0'
compile 'io.reactivex:rxjava:1.1.0'
compile 'com.jakewharton.rxbinding:rxbinding:0.2.0'
Run Code Online (Sandbox Code Playgroud)
编辑:
它现在可以工作,我在我的逻辑中得到一个空指针获取列表..当我使用onError方法并放置一个堆栈跟踪我发现问题.
如果你想跳过初始调用,那么我们应该将.skip(1)调用 到你的订阅对象.[感谢Daniel Lew ] …
我可以使用Retrofit + RxJava来聆听无尽的流吗?例如Twitter流.我有的是这个:
public interface MeetupAPI {
@GET("http://stream.meetup.com/2/rsvps/")
Observable<RSVP> getRSVPs();
}
MeetupAPI api = new Retrofit.Builder()
.baseUrl(MeetupAPI.RSVP_API)
.addCallAdapterFactory(RxJavaCallAdapterFactory.create())
.addConverterFactory(GsonConverterFactory.create())
.build()
.create(MeetupAPI.class);
api.getRSVPs()
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(rsvp -> Log.d(TAG, "got rsvp"),
error -> Log.d(TAG, "error: " + error),
() -> Log.d(TAG, "onComplete"));
Run Code Online (Sandbox Code Playgroud)
但在解析第一个对象后调用"onComplete".有没有办法告诉Retrofit保持开放直到另行通知?
我正在尝试获取给定的最新值,Observable并在调用后立即将其发出.以下面的代码为例:
return Observable.just(myObservable.last())
.flatMap(myObservable1 -> {
return myObservable1;
})
.map(o -> o.x) // Here I want to end up with a T object instead of Observable<T> object
Run Code Online (Sandbox Code Playgroud)
这不起作用,因为通过这样做,flatMap将发射myObservable1,然后必须发射,以达到map.我不知道是否可以做这样的事情.有没有人知道如何实现这一目标?谢谢
我正在学习RxJava运算符,我发现下面这些代码没有打印任何东西:
public static void main(String[] args) {
Observable
.interval(1, TimeUnit.SECONDS)
.subscribe(new Subscriber<Long>() {
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
@Override
public void onError(Throwable e) {
System.out.println("onError -> " + e.getMessage());
}
@Override
public void onNext(Long l) {
System.out.println("onNext -> " + l);
}
});
}
Run Code Online (Sandbox Code Playgroud)
作为ReactiveX, interval
创建一个Observable,它发出一个由特定时间间隔隔开的整数序列
我犯了错误还是忘了什么?