似乎这两个功能非常相似.它们具有相同的签名(接受rx.functions.Func1<? super T, ? extends Observable<? extends R>> func),并且它们的大理石图看起来完全相同.不能在这里粘贴图片,但这里是concatMap的一个,这里是flatMap的一个.在结果的描述中似乎存在一些细微的差别Observable,其中一个由concatMap包含由结合的Observable产生的flatMap项产生,而由包含由首先合并结果的Observable 产生的项产生,并且发出该合并的结果.
然而,这种微妙之处对我来说完全不清楚.任何人都可以更好地解释这种差异,并且理想地给出一些说明这种差异的例子.
我有一个返回an的方法Observable<ArrayList<Long>>,它是一些Items的id.我想通过这个列表并使用另一个返回的方法下载每个Item Observable<Item>.
我如何使用RxJava运算符执行此操作?
将Retrofit与Rxjava结合使用有什么好处?
在哪些情况下我应该使用doOnNext,在哪些情况下doOnEach?
.doOnEach(new Action1<Notification<? super MessageProfile>>() {
@Override
public void call(Notification<? super MessageProfile> notification) {
}
})
.doOnNext(new Action1<MessageProfile>() {
@Override
public void call(MessageProfile profile) {
messageProfileDao.save(profile);
}
})
Run Code Online (Sandbox Code Playgroud)
这看起来两个运算符具有相同的效果.
我正在检查RXJava的文档,我注意到concat和merge运算符似乎也是这样.我写了几个测试以确定.
@Test
public void testContact() {
Observable.concat(Observable.just("Hello"),
Observable.just("reactive"),
Observable.just("world"))
.subscribe(System.out::println);
}
@Test
public void testMerge() {
Observable.merge(Observable.just("Hello"),
Observable.just("reactive"),
Observable.just("world"))
.subscribe(System.out::println);
}
Run Code Online (Sandbox Code Playgroud)
文件说
Merge运算符也类似.它结合了两个或多个Observable的发射,但可以交错它们,而Concat从不交错来自多个Observable的发射.
但是我还是不完全明白,运行这个测试千次,合并结果总是一样的.由于订单未被授予,我期待有时"反应性""世界""你好".
我想知道如何忽略异常并继续无限流(在我的情况下,位置流)?
我正在获取当前用户位置(使用Android-ReactiveLocation),然后将它们发送到我的API(使用Retrofit).
在我的情况下,当在网络调用(例如超时)期间发生异常时,调用onError方法并且流自行停止.怎么避免呢?
活动:
private RestService mRestService;
private Subscription mSubscription;
private LocationRequest mLocationRequest = LocationRequest.create()
.setPriority(LocationRequest.PRIORITY_HIGH_ACCURACY)
.setInterval(100);
...
private void start() {
mRestService = ...;
ReactiveLocationProvider reactiveLocationProvider = new ReactiveLocationProvider(this);
mSubscription = reactiveLocationProvider.getUpdatedLocation(mLocationRequest)
.buffer(50)
.flatMap(locations -> mRestService.postLocations(locations)) // can throw exception
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe();
}
Run Code Online (Sandbox Code Playgroud)
RestService:
public interface RestService {
@POST("/.../")
Observable<Response> postLocations(@Body List<Location> locations);
}
Run Code Online (Sandbox Code Playgroud) 我是rxjava的新手.我需要组合两个发出不同类型对象的observable.喜欢的东西Observable<Milk>,并Observable<Cereals>和获得Observable<CerealsWithMilk>.对于像这样的事情,我找不到任何操作员.做这样的事情的rx方式是什么?请注意,Milk并且Cereals是异步的.
我有点麻烦了解subscribeOn/observeOn如何在RxJava中工作.我创建了一个带有observable的简单应用程序,可以发出太阳系行星名称,进行一些映射和过滤并打印结果.
据我所知,调度工作到后台线程是通过subscribeOn运算符完成的(它似乎工作正常).
观察后台线程也适用于observeOn运营商.
但是我在理解,如何观察调用线程(如果它是主线程或任何其他线程)时遇到了麻烦.它很容易在Android上运行AndroidSchedulers.mainThread(),但我不知道如何在纯java中实现这一点.
这是我的代码:
public class Main {
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 5, 3000L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
System.out.println("Main thread: " + getCurrentThreadInfo());
Observable<String> stringObservable = Observable.from(Arrays.asList("Merkury", "Wenus", "Ziemia", "Mars", "Jowisz", "Saturn", "Uran", "Neptun", "Pluton"))
.map(in -> {
System.out.println("map on: " + getCurrentThreadInfo());
return in.toUpperCase();
})
.filter(in -> {
System.out.println("filter on: " + getCurrentThreadInfo());
return in.contains("A");
})
.subscribeOn(Schedulers.from(executor));
for (int i = 0; i < …Run Code Online (Sandbox Code Playgroud) 我有两个可完成的.我想做以下场景:如果第一个Completable到达onComplete,继续第二个Completable.最终结果将是第二次完成的完成.
当我有单个getUserIdAlreadySavedInDevice()和Completable login()时,我就是这样做的:
@Override
public Completable loginUserThatIsAlreadySavedInDevice(String password) {
return getUserIdAlreadySavedInDevice()
.flatMapCompletable(s -> login(password, s))
}
Run Code Online (Sandbox Code Playgroud) 我正在使用RxJava.
我有一个Observable<T>.我如何将其转换为List<T>?
似乎是一个简单的操作,但我无法在网上的任何地方找到它.
rx-java ×10
java ×7
android ×2
rx-android ×2
asynchronous ×1
concatmap ×1
flatmap ×1
java-8 ×1
monads ×1
observable ×1
retrofit ×1
retrofit2 ×1
rx-java2 ×1