标签: rx-java

在RxJava中concatMap和flatMap有什么区别

似乎这两个功能非常相似.它们具有相同的签名(接受rx.functions.Func1<? super T, ? extends Observable<? extends R>> func),并且它们的大理石图看起来完全相同.不能在这里粘贴图片,但这里是concatMap的一个,这里是flatMap的一个.在结果的描述中似乎存在一些细微的差别Observable,其中一个由concatMap包含由结合的Observable产生的flatMap项产生,而由包含由首先合并结果的Observable 产生的项产生,并且发出该合并的结果.

然而,这种微妙之处对我来说完全不清楚.任何人都可以更好地解释这种差异,并且理想地给出一些说明这种差异的例子.

java rx-java flatmap concatmap

43
推荐指数
4
解决办法
2万
查看次数

RxJava - 获取列表中的每个项目

我有一个返回an的方法Observable<ArrayList<Long>>,它是一些Items的id.我想通过这个列表并使用另一个返回的方法下载每个Item Observable<Item>.

我如何使用RxJava运算符执行此操作?

java monads reactive-programming rx-java

41
推荐指数
2
解决办法
4万
查看次数

41
推荐指数
2
解决办法
2038
查看次数

RxJava:d​​oOnNext和doOnEach之间的区别

在哪些情况下我应该使用doOnNext,在哪些情况下doOnEach?

 .doOnEach(new Action1<Notification<? super MessageProfile>>() {
                @Override
                public void call(Notification<? super MessageProfile> notification) {

                }
            })
 .doOnNext(new Action1<MessageProfile>() {
                @Override
                public void call(MessageProfile profile) {
                    messageProfileDao.save(profile);
                }
            })
Run Code Online (Sandbox Code Playgroud)

这看起来两个运算符具有相同的效果.

java rx-java

40
推荐指数
1
解决办法
1万
查看次数

Concat VS Merge运算符

我正在检查RXJava的文档,我注意到concat和merge运算符似乎也是这样.我写了几个测试以确定.

@Test
public void testContact() {

    Observable.concat(Observable.just("Hello"),
                      Observable.just("reactive"),
                      Observable.just("world"))
              .subscribe(System.out::println);
}

@Test
public void testMerge() {

    Observable.merge(Observable.just("Hello"),
                      Observable.just("reactive"),
                      Observable.just("world"))
            .subscribe(System.out::println);
}
Run Code Online (Sandbox Code Playgroud)

文件说

Merge运算符也类似.它结合了两个或多个Observable的发射,但可以交错它们,而Concat从不交错来自多个Observable的发射.

但是我还是不完全明白,运行这个测试千次,合并结果总是一样的.由于订单未被授予,我期待有时"反应性""世界""你好".

代码在这里https://github.com/politrons/reactive

java java-8 rx-java

40
推荐指数
2
解决办法
3万
查看次数

如何忽略错误并继续无限流?

我想知道如何忽略异常并继续无限流(在我的情况下,位置流)?

我正在获取当前用户位置(使用Android-ReactiveLocation),然后将它们发送到我的API(使用Retrofit).

在我的情况下,当在网络调用(例如超时)期间发生异常时,调用onError方法并且流自行停止.怎么避免呢?

活动:

private RestService mRestService;
private Subscription mSubscription;
private LocationRequest mLocationRequest = LocationRequest.create()
            .setPriority(LocationRequest.PRIORITY_HIGH_ACCURACY)
            .setInterval(100);
...
private void start() {
    mRestService = ...;
    ReactiveLocationProvider reactiveLocationProvider = new ReactiveLocationProvider(this);
    mSubscription = reactiveLocationProvider.getUpdatedLocation(mLocationRequest)
            .buffer(50)
            .flatMap(locations -> mRestService.postLocations(locations)) // can throw exception
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe();
}
Run Code Online (Sandbox Code Playgroud)

RestService:

public interface RestService {
    @POST("/.../")
    Observable<Response> postLocations(@Body List<Location> locations);
}
Run Code Online (Sandbox Code Playgroud)

android rx-java rx-android

39
推荐指数
4
解决办法
3万
查看次数

rxjava合并不同类型的observable

我是rxjava的新手.我需要组合两个发出不同类型对象的observable.喜欢的东西Observable<Milk>,并Observable<Cereals>和获得Observable<CerealsWithMilk>.对于像这样的事情,我找不到任何操作员.做这样的事情的rx方式是什么?请注意,Milk并且Cereals是异步的.

asynchronous observable rx-java rx-android

39
推荐指数
2
解决办法
2万
查看次数

RxJava观察调用/订阅线程

我有点麻烦了解subscribeOn/observeOn如何在RxJava中工作.我创建了一个带有observable的简单应用程序,可以发出太阳系行星名称,进行一些映射和过滤并打印结果.

据我所知,调度工作到后台线程是通过subscribeOn运算符完成的(它似乎工作正常).

观察后台线程也适用于observeOn运营商.

但是我在理解,如何观察调用线程(如果它是主线程或任何其他线程)时遇到了麻烦.它很容易在Android上运行AndroidSchedulers.mainThread(),但我不知道如何在纯java中实现这一点.

这是我的代码:

public class Main {

    public static void main(String[] args) throws InterruptedException {

        ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 5, 3000L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());

        System.out.println("Main thread: " + getCurrentThreadInfo());

        Observable<String> stringObservable = Observable.from(Arrays.asList("Merkury", "Wenus", "Ziemia", "Mars", "Jowisz", "Saturn", "Uran", "Neptun", "Pluton"))
                .map(in -> {
                    System.out.println("map on: " + getCurrentThreadInfo());
                    return in.toUpperCase();
                })
                .filter(in -> {
                    System.out.println("filter on: " + getCurrentThreadInfo());
                    return in.contains("A");
                })
                .subscribeOn(Schedulers.from(executor));

        for (int i = 0; i < …
Run Code Online (Sandbox Code Playgroud)

java multithreading reactive-programming rx-java

39
推荐指数
2
解决办法
3万
查看次数

如何在RxJava2中链接两个Completable

我有两个可完成的.我想做以下场景:如果第一个Completable到达onComplete,继续第二个Completable.最终结果将是第二次完成的完成.

当我有单个getUserIdAlreadySavedInDevice()和Completable login()时,我就是这样做的:

@Override
public Completable loginUserThatIsAlreadySavedInDevice(String password) {
    return getUserIdAlreadySavedInDevice()
            .flatMapCompletable(s -> login(password, s))

}
Run Code Online (Sandbox Code Playgroud)

java rx-java rx-java2

38
推荐指数
4
解决办法
2万
查看次数

将observable转换为list

我正在使用RxJava.

我有一个Observable<T>.我如何将其转换为List<T>

似乎是一个简单的操作,但我无法在网上的任何地方找到它.

java reactive-programming rx-java

37
推荐指数
4
解决办法
3万
查看次数