我需要一些帮助来实现RxJava中的并行异步调用.我已经选择了一个简单的用例,其中FIRST调用获取(而不是搜索)要显示的产品列表(Tile).随后的电话会出来并获取(A)评论和(B)产品图片
经过几次尝试,我到了这个地方.
1 Observable<Tile> searchTile = searchServiceClient.getSearchResults(searchTerm);
2 List<Tile> allTiles = new ArrayList<Tile>();
3 ClientResponse response = new ClientResponse();
4 searchTile.parallel(oTile -> {
5 return oTile.flatMap(t -> {
6 Observable<Reviews> reviews = reviewsServiceClient.getSellerReviews(t.getSellerId());
7 Observable<String> imageUrl = reviewsServiceClient.getProductImage(t.getProductId());
8 return Observable.zip(reviews, imageUrl, (r, u) -> {
9 t.setReviews(r);
10 t.setImageUrl(u);
11 return t;
12 });
13 });
14 }).subscribe(e -> {
15 allTiles.add((Tile) e);
16 });
Run Code Online (Sandbox Code Playgroud)
第1行:熄灭并取出要显示的产品(平铺)
第4行:我们获取Observable和SHARD的列表以获取评论和imageUrls
谎言6,7:获取Observable评论和Observable url
第8行:最后将2个可观察对象压缩以返回更新的Observable
第15行:最后第15行整理所有要在集合中显示的单个产品,这些产品可以返回到调用层
虽然Observable已被分片,但在我们的测试中运行了4个不同的线程; 获取评论和图像似乎是一个接一个.我怀疑第8行的zip步骤基本上导致了2个observables(review和url)的顺序调用.

这个小组是否有任何关于平行获取reiews和图片网址的建议.实质上,上面附带的瀑布图应该看起来更垂直堆叠.对评论和图像的调用应该是并行的
谢谢anand raman
我有SourceObjects列表,我需要将它转换为ResultObjects列表.
我可以使用ResultObject的方法将一个对象提取到另一个对象:
convertFromSource(srcObj);
Run Code Online (Sandbox Code Playgroud)
当然我可以这样做:
public void onNext(List<SourceObject> srcObjects) {
List<ResultsObject> resObjects = new ArrayList<>();
for (SourceObject srcObj : srcObjects) {
resObjects.add(new ResultsObject().convertFromSource(srcObj));
}
}
Run Code Online (Sandbox Code Playgroud)
但我会非常感谢能够使用rxJava展示如何做同样事情的人.
我们什么时候应该使用doOnNext()可观察的,而不是仅仅onNext()?
我在为我的android项目理解RxJava中的zip操作符时遇到了很多麻烦.问题我需要能够发送网络请求上传视频然后我需要发送网络请求上传图片最后我需要添加说明并使用前两个请求的响应上传视频和图片的位置网址以及我的服务器的说明.
我认为zip操作符对于此任务是完美的,因为我知道我们可以接受两个可观察量(视频和图片请求)的响应并将它们用于我的最终任务.但我似乎无法让这种情况发生在我想象中.
我正在寻找一个人来回答如何用一些伪代码在概念上完成这个.谢谢
我对android中的EventBus和RxJava之间的区别感到困惑.我需要为我的问题实现其中一个关于在完成一些更改时通知某些组件的问题,以便他们可以更新其状态.
另外,我读到EventsBus已经被RxJava弃用了,我不知道这些信息是否属实.
@DELETE("/job/deletejob")
Observable<JobDeleteResponseModel> jobDelete( @Body JobDeleteRequestModel model);
Run Code Online (Sandbox Code Playgroud)
我收到此错误:
非身体HTTP方法不能包含@Body或@TypedOutput
谁能帮助我从这里出来?
我在尝试为正在使用的演示者运行JUnit测试时遇到RuntimeException observeOn(AndroidSchedulers.mainThread()).
由于它们是纯JUnit测试而不是Android检测测试,因此它们无法访问Android依赖项,导致我在执行测试时遇到以下错误:
java.lang.ExceptionInInitializerError
at io.reactivex.android.schedulers.AndroidSchedulers$1.call(AndroidSchedulers.java:35)
at io.reactivex.android.schedulers.AndroidSchedulers$1.call(AndroidSchedulers.java:33)
at io.reactivex.android.plugins.RxAndroidPlugins.callRequireNonNull(RxAndroidPlugins.java:70)
at io.reactivex.android.plugins.RxAndroidPlugins.initMainThreadScheduler(RxAndroidPlugins.java:40)
at io.reactivex.android.schedulers.AndroidSchedulers.<clinit>(AndroidSchedulers.java:32)
…
Caused by: java.lang.RuntimeException: Method getMainLooper in android.os.Looper not mocked. See http://g.co/androidstudio/not-mocked for details.
at android.os.Looper.getMainLooper(Looper.java)
at io.reactivex.android.schedulers.AndroidSchedulers$MainHolder.<clinit>(AndroidSchedulers.java:29)
...
java.lang.NoClassDefFoundError: Could not initialize class io.reactivex.android.schedulers.AndroidSchedulers
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
…
Run Code Online (Sandbox Code Playgroud) 我正在片段中使用RxJava ReplaySubject.
我试图以某种方式使用ReplaySubject,我希望Subject执行一个过程直到完成(可能超出片段的生命周期).
在完成该过程后,我想释放资源 - 据我所知 - 通过在注册观察者时取消订阅(在我的情况下,是主题本身)来完成.
在这个github问题中,线程 @benjchristensen说:
如果它是一个Observable,那么它应该发出一个onCompleted并完成.
如果它是Observer,则它应该取消订阅它在调用Observable.subscribe时收到的订阅,它将使Observable有机会关闭和清理.
如果它是一个主体 - 它既是观察者又是观察者 - 行为是什么?如果我在主题上调用onComplete,这是否基本上意味着订阅被停止,因此我不需要通过注册观察者手动取消订阅我获得的订阅?
似乎这两个功能非常相似.它们具有相同的签名(接受rx.functions.Func1<? super T, ? extends Observable<? extends R>> func),并且它们的大理石图看起来完全相同.不能在这里粘贴图片,但这里是concatMap的一个,这里是flatMap的一个.在结果的描述中似乎存在一些细微的差别Observable,其中一个由concatMap包含由结合的Observable产生的flatMap项产生,而由包含由首先合并结果的Observable 产生的项产生,并且发出该合并的结果.
然而,这种微妙之处对我来说完全不清楚.任何人都可以更好地解释这种差异,并且理想地给出一些说明这种差异的例子.
是否可以使用RxJava实现下一个链接:
loginObservable()
.then( (someData) -> {
// returns another Observable<T> with some long operation
return fetchUserDataObservable(someData);
}).then( (userData) -> {
// it should be called when fetching user data completed (with userData of type T)
cacheUserData(userData);
}).then( (userData) -> {
// it should be called after all previous operations completed
displayUserData()
}).doOnError( (error) -> {
//do something
})
Run Code Online (Sandbox Code Playgroud)
我发现这个库非常有趣,但无法想象我们如何将请求链接到彼此依赖于之前的地方.
rx-java ×10
android ×6
java ×6
asynchronous ×1
concatmap ×1
event-bus ×1
flatmap ×1
junit ×1
observable ×1
promise ×1
retrofit ×1
rx-java2 ×1
scala ×1
unit-testing ×1