如何将总是从特定线程调用其回调的侦听器包装到符合subscribeOn定义的调度程序的Observable中?

Ren*_*ari 9 android firebase rx-java firebase-realtime-database

那些不熟悉Android和/或Firebase开发的人简介:

在Android开发中,您应该总是从主线程(也称为UI线程)操作应用程序的视图,但如果您的应用程序需要进行一些繁重的处理,它应该使用后台线程,否则应用程序似乎没有响应.

Firebase是一种服务,它提供了一种在云中存储和同步数据的方法.它还提供了一个Android SDK来管理这个数据库.每次使用此SDK进行操作(如查询)时,Firebase都会通过在其自己的内部后台线程上进行所有繁重处理并始终在主线程上调用其回调避免这些线程陷阱.

例:

Query postsQuery = FirebaseDatabase.getInstance().getReference("posts");

ValueEventListener postListener = new ValueEventListener() {
  @Override
  public void onDataChange(DataSnapshot dataSnapshot) {
    // This is always called on the main thread
    // Get Post object and use the values to update the UI
    Post post = dataSnapshot.getValue(Post.class);
    // ...
  }

  @Override
  public void onCancelled(DatabaseError databaseError) {
    // Getting Post failed, log a message
    printError(databaseError.toException());
    // ...
  }
};

postsQuery.addValueEventListener(postListener);
Run Code Online (Sandbox Code Playgroud)

我面临的实际问题:

我正在尝试使用如下方法使用RxJava包装Firebase的查询侦听器:

private static Observable<DataSnapshot> queryObservable(final Query query) {
  return Observable.fromEmitter(emitter -> {
    // This is called on the Scheduler's thread defined with .subscribeOn()
    printThread("emitter");
    final ValueEventListener listener = new ValueEventListener() {
      @Override public void onDataChange(final DataSnapshot dataSnapshot) {
        // This is always called on the main thread
        printThread("onDataChange");
        emitter.onNext(dataSnapshot);
      }

      @Override public void onCancelled(final DatabaseError databaseError) {
        // This is called on the main thread too
        emitter.onError(databaseError.toException());
      }
    };

    query.addValueEventListener(listener);

    emitter.setCancellation(() -> query.removeEventListener(listener));
  }, Emitter.BackpressureMode.BUFFER);
}
Run Code Online (Sandbox Code Playgroud)

但是因为Observable从Firebase的回调内部发出项目(在主线程上调用).subscribeOn(),所以将忽略任何其他运算符.

例如,调用上面的方法如下:

Query postsQuery = FirebaseDatabase.getInstance().getReference("posts");

queryObservable(postsQuery).doOnSubscribe(() -> printThread("onSubscribe"))
    .subscribeOn(Schedulers.io())
    .subscribe(dataSnapshot -> printThread("onNext"));
Run Code Online (Sandbox Code Playgroud)

会打印以下内容:

onSubscribe Thread: RxIoScheduler-2
emitter Thread: RxIoScheduler-2
onDataChange Thread: main
onNext Thread: main
Run Code Online (Sandbox Code Playgroud)

根据我的理解,当Firebase的SDK调用onDataChange()回调并从其自己的内部后台线程切换到主线程时,它还会使Observable在主线程上发出新项目,使得任何.subscribeOn()操作员都无法使用流.

实际问题:

我能做的不仅是将这样的侦听器正确地包装到Observable中,还使它们符合定义的调度程序.subscribeOn()吗?

谢谢!

更新:

我知道.observeOn()让我能够处理Firebase在另一个线程上返回的数据.这就是我现在所做的,但这不是这个问题的重点.重点是:当我通过调度程序时,.subscribeOn()我希望上游符合该调度程序的线程,但是当Observable有一个内部侦听器从另一个线程的回调触发时,就不会发生这种情况.当这种情况发生时,我就失去了.subscribeOn()保证.

这个问题的严重性一开始可能看起来不太明显,但如果Observable是图书馆的一部分呢?那里最好的做法是什么?库应该强制其客户端.observeOn()在调用该方法后始终调用吗?库是否应该.observeOn()自行调用并将其称为"默认调度程序"?在任何这些情况下,这.subscribeOn()都是无用的,这对我来说似乎不对.

Fra*_*cia 0

observeOn只需在 IO 和主线程中使用subscribeOn,这样您就可以管理在主线程中收到的信息并将 firebase 工作移动到不同的Thread.

请记住将 rxAndroid 导入到您的 gradle(Rxjava 或 RxJava 2):

 compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
Run Code Online (Sandbox Code Playgroud)

还建议您检查以下库之一作为参考(或仅使用它):

RxJava: https: //github.com/nmoskalenko/RxFirebase

RxJava 2.0:https://github.com/FrangSierra/Rx2Firebase

其中一个与 RxJava 一起使用,另一个与 RxJava 2.0 的新 RC 一起使用。如果您对此感兴趣,您可以 在这里查看两者之间的差异。