标签: rx-java

连续执行不同的完成项

我目前正在尝试使用Java中的反应式扩展来实现特定的结果,但是我无法做到这一点,也许你们中的某人可以帮助我。

firstCompletable
  .onErrorComplete(t -> specificErrorHandlingOne())
  .andThen(secondCompletable())
  .onErrorComplete(t -> specificErrorHandlingTwo())
  .andThen(thirdCompletable())
  .onErrorComplete(t -> specificErrorHandlingThree())
  .andThen(fourthCompletable())
  .onErrorComplete(t -> specificErrorHandlingFour())
  .subscribe(viewCallback::showSuccess)
Run Code Online (Sandbox Code Playgroud)

但是,例如在secondCompletable中存在错误时,将执行特定的错误处理,但随后仍在计划其他Completable。我希望如果其中一个Completables失败,整个Completables链将停止执行。我该怎么做?

我已经尝试过使用doOnError代替,但这只是在没有抛出特定错误的堆栈跟踪中结束。

rx-java reactivex

1
推荐指数
1
解决办法
933
查看次数

RXJava处理嵌套调用

想象一下,我有一个服务来检索一些作者(id,名字,姓氏,生日,国籍)和第二个服务来检索作者的书(标题,摘要,页数).我希望我的网络电话会给我一份带有他们书籍的作者名单.

为了说明,我的服务会返回AuthorOResponse和BookResponse,我想发出一个Observable of AuthorObject.

public class AuthorResponse {
    private String id;
    private String firstname;
    private String lastname;
    private Date birthday;
    private String nationality;
}

public class BookResponse {
    private String title;
    private String summary;
    private int pageNumber;
}

public class AuthorObject {
    private String id;
    private String firstname;
    private String lastname;
    private Date birthday;
    private String nationality;
    private List<BookObject> books
}
Run Code Online (Sandbox Code Playgroud)

我的服务就像是

Observable<List<AuthorResponse>> getAuthors()
Observable<List<BookResponse>> getAuthorBooks(String authorId)
Run Code Online (Sandbox Code Playgroud)

我想用它们来发出一个Observable,但由于我对getAuthorBooks的每次调用都需要一个作者,所以无法弄明白怎么做.

我想出了一些看起来像这样的东西,但我有一个问题,因为我的concatMap让我"松散"了我的作者.

myServices.getAuthors()
                .map(author -> createAuthor())
                .concatMap(author -> mWebService.getAuthorBooks(author.getId())
                .flatMapIterable(data -> data.items)
                .map(book …
Run Code Online (Sandbox Code Playgroud)

android rx-java retrofit

1
推荐指数
1
解决办法
1514
查看次数

RxJava自上次事件以来忽略X时间的事件?

我想知道是否有一种干净的方法来实现一个observable,它可以过滤掉在最近发出的事件之后的时间窗口内发生的任何事件?

我目前有这个:

    source.timeInterval(TimeUnit.MILLISECONDS)
            .filter(new Predicate<Timed<Object>>() {
                final long TIME_LIMIT = 10 * 1000;
                long totalTime = 0;

                @Override
                public boolean test(@NonNull Timed<Object> objectTimed) throws Exception {
                    totalTime += objectTimed.time();

                    if (totalTime > TIME_LIMIT) {
                        totalTime = 0;
                        return true;
                    }
                    return false;
                }
            })
            .subscribe(objectTimed -> {
                doSomething(objectTimed)
            });
Run Code Online (Sandbox Code Playgroud)

这在技术上可以解决问题,但是在过滤器中需要一些额外的状态,这有点难看并且阻止我使用lambda.相反,我想看看是否有办法组成可以做同样事情的观察者.

java android rx-java reactivex

1
推荐指数
1
解决办法
499
查看次数

观察者的意见较少?

正如ReactiveX简介中所述- 可观察者的意见较少

ReactiveX不偏向某些特定的并发或异步性源.可以使用线程池,事件循环,非阻塞I/O,演员(例如来自Akka)或任何适合您的需求,风格或专业知识的实现来实现Observable. 客户端代码将其与Observable的所有交互视为异步,无论您的底层实现是阻塞还是非阻塞,然而您选择实现它.

我没有得到这个部分 - " 你的底层实现是阻塞还是非阻塞 ".

你能解释一下吗?或者一些示例代码来解释这个?

asynchronous reactive-programming java-8 rx-java reactivex

1
推荐指数
1
解决办法
63
查看次数

RxJava:合并BehaviorSubjects

如何合并两个BehaviorSubjects,使它们表现得像一个BehaviorSubject

我有这样的事情:

class Solution {
    public static void main(String[] args) {
        Subject<List<Integer>> left = BehaviorSubject.createDefault(Arrays.asList(1, 2, 3));
        Subject<List<Integer>> right = BehaviorSubject.createDefault(Arrays.asList(4, 5, 6));
        Single<List<Integer>> merged = left.mergeWith(right).reduce(new ArrayList<Integer>(), (l, r) -> {
            List<Integer> merged1 = new ArrayList<>(l.size() + r.size());
            merged1.addAll(l);
            merged1.addAll(r);
            return merged1;
        });
        merged.subscribe(System.out::println);
    }
}
Run Code Online (Sandbox Code Playgroud)

我希望得到一些东西[1, 2, 3, 4, 5, 6],但subscribe什么都不打印.

android java-8 rx-java rx-java2

1
推荐指数
1
解决办法
747
查看次数

转换改装回调值以返回包络对象

我的问题是关于使用RxJava for Android来处理来自Retrofit调用的数据的可能性.

我刚刚开始使用这些库,所以如果我的问题很简单,请耐心等待.

这是我的情景.

我有一个从服务器返回的json,看起来像这样

  { <--- ObjectResponse
     higher_level: {
        data: [
            {
               ...
               some fields,
               some inner object: {
               ....
                },
               other fields
            }, <----- obj 1

            ....,

            {
               ....
            }<---- obj n

         ]<-- array of SingleObjects

      }

  } <--- whole ObjectResponse
Run Code Online (Sandbox Code Playgroud)

我已经有改造得到这个响应并在ObjectResponse中解析.解析这个对象,我可以获得一个我可以像往常一样传递给我的RecyclerView适配器的List.

所以改造返回了ObjectResponse,它是整个服务器答案的模型,在改装回调中我操纵ObjectResponse来提取我的List然后传递给我的适配器.

现在,我有这样的事情

Call<ObjectResponse> call = apiInterface.getMyWholeObject();

call.enqueue(new Callback<ObjectResponse>() {
            @Override
            public void onResponse(Call<ObjectResponse> call, Response<ObjectResponse> response) {

                //various manipulation based on response.body() that in the ends 
                // lead to a List<SingleObject>

               mView.sendToAdapter(listSingleObject)
              }

            @Override
            public …
Run Code Online (Sandbox Code Playgroud)

java android rx-java rx-android retrofit2

1
推荐指数
1
解决办法
2667
查看次数

可流动的房间:初始化数据库,如果它是空的

我有以下@Dao,提供Flowable<User>流:

@Dao
interface UsersDao {
  @Query("SELECT * FROM users")
  fun loadUsers(): Flowable<List<User>>
}
Run Code Online (Sandbox Code Playgroud)

我希望流的订户在那里发生一些变化时立即接收数据库的更新.订阅Room Flowable我将获得开箱即用的功能.

我想要的是:如果数据库为空我想执行Web请求并将用户保存到数据库中.订阅者将自动接收刚刚发生的新更新.

现在我希望存储库的客户端不要知道所有的初始化逻辑:他所做的一切 - 他执行usersRepository.loadUsers().所有这些魔法应该发生在存储库类中:

class UsersRepository @Inject constructor(
    private val api: Api,
    private val db: UsersDao
) {

  fun loadUsers(): Flowable<List<User>> {
    ...
  }
}
Run Code Online (Sandbox Code Playgroud)

当然我可以使用以下方法:

fun loadUsers(): Flowable<List<User>> {
  return db.loadTables()
      .doOnSubscribe {
        if (db.getCount() == 0) {
          val list = api.getTables().blockingGet()
          db.insert(list)
        }
      }
}
Run Code Online (Sandbox Code Playgroud)

但我想在不使用副作用(doOn...运算符)的情况下构造流.我尝试过,composing()但没有多大帮助.被困在这一段时间了.

android kotlin rx-java rx-java2 android-room

1
推荐指数
1
解决办法
916
查看次数

RxJava 2,flatmap的resultSelector参数会发生什么?

在RxJava1中,flatmap有一个重载的方法,允许您保留源值并将其传递到流中。

我从以下博客文章中获得了这些知识

https://medium.com/rxjava-tidbits/rxjava-tidbits-1-use-flatmap-and-retain-original-source-value-4ec6a2de52d4

但是,转到RxJava2,我似乎找不到它。我检查了Rx1和Rx2的更改,但未列出。我想知道它是否仍然存在,但也许我找的地方不对。

我正在使用Single顺便说一句。

android rx-java rx-java2

1
推荐指数
1
解决办法
357
查看次数

在RxJava2中使用flatMap或zip?

我有一门课Student,它有两个字段gradeschool。这两个字段都需要从远程服务器中获取。当返回两个结果时,我新建了一个Student对象。

在的帮助下RxJava,我以两种方式完成了此操作,一种是flatMapin zip运算符,另一种是in 运算符。

Observable<String> gradeObservable =
        Observable.create((ObservableOnSubscribe<String>) emitter -> {
            Thread.sleep(1000);
            emitter.onNext("senior");
        }).subscribeOn(Schedulers.io());

Observable<String> schoolObservable =
        Observable.create((ObservableOnSubscribe<String>) emitter -> {
            emitter.onNext("MIT");
        }).subscribeOn(Schedulers.io());
Run Code Online (Sandbox Code Playgroud)

flatMap版本

gradeObservable
        .flatMap(grade ->
                schoolObservable.map(school -> {
                    Student student = new Student();
                    student.grade = grade;
                    student.school = school;
                    return student;
                }))
        .subscribe(student -> {
            System.out.println(student.grade);
            System.out.println(student.school);
        });
Run Code Online (Sandbox Code Playgroud)

压缩版本

 Observable.zip(gradeObservable, schoolObservable, (grade, school) -> {
    Student student = new Student();
    student.grade = grade;
    student.school = …
Run Code Online (Sandbox Code Playgroud)

rx-java rx-java2

1
推荐指数
1
解决办法
1782
查看次数

如何在BehaviourSubject中设置默认值

可能是一个菜鸟问题.如何为BehaviourSubject设置默认值.

我有一个有两个不同值的枚举

enum class WidgetState {
    HIDDEN,
    VISIBLE
}
Run Code Online (Sandbox Code Playgroud)

并且是发出状态的行为主体

val widgetStateEmitter: BehaviorSubject<WidgetState> = BehaviorSubject.create()
Run Code Online (Sandbox Code Playgroud)

写入视图逻辑时,我的发射器开始发射.但是默认情况下它是HIDDEN.如何将默认值设置为WidgetState.HIDDEN到我的发射器widgetStateEmitter

android kotlin rx-java

1
推荐指数
1
解决办法
781
查看次数