使用RxJava和EventBus将事件发送回活动/片段

ant*_*009 8 android reactive-programming event-bus kotlin rx-java

Android Studio 3.2 Canary 8
com.squareup:otto:1.3.8
io.reactivex:rxjava:1.3.7
kotlin 1.2.31
Run Code Online (Sandbox Code Playgroud)

我正在尝试使用otto EventBus将事件发送回我的活动.

但是,我正在使用RxJava执行一些后台工作,并且需要在第一个完成后发送事件.然而,在发布事件后.活动永远不会收到它.

此事件必须在主线程上执行此操作.RxJava位于IO线程上.我不确定最好的方法是什么:

这是我执行RxJava和EventBus帖子的交互器的代码

class Interactors(private val eventBus: Bus) {
    fun transmitMessage(): Completable {
        return insertTransmission()
                .andThen(onTransmissionChanged()) /* Send event to the activity */
                .andThen(requestTransmission())
    }

    private fun insertTransmission(): Completable {
        return Completable.fromCallable {
            Thread.sleep(4000)
            System.out.println("insertTransmission doing some long operation")
        }
    }

    private fun requestTransmission(): Completable {
        return Completable.fromCallable {
            Thread.sleep(2000)
            System.out.println("requestTransmission doing some long operation")
        }
    }

    /* Need to send this event back to the activity/fragment */
    private fun onTransmissionChanged(): Completable {
        return Completable.fromCallable {
            System.out.println("onTransmissionChanged send event to activity")
            eventBus.post(TransmissionChanged())
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

活动:

public class HomeActivity extends AppCompatActivity {
    private Bus eventBus = new Bus();

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_home);

        eventBus.register(this);

        new Interactors(eventBus).transmitMessage()
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe();
    }

    @Override
    protected void onDestroy() {
        eventBus.unregister(this);
        super.onDestroy();
    }

    @Subscribe
    public void onTransmissionChangedEvent(TransmissionChanged transmissionChanged) {
        System.out.println("onTransmissionChangedEvent");
    }
}
Run Code Online (Sandbox Code Playgroud)

和EventBus类:

class TransmissionChanged
Run Code Online (Sandbox Code Playgroud)

这是我运行应用程序时的输出:

insertTransmission doing some long operation
onTransmissionChanged
Run Code Online (Sandbox Code Playgroud)

我不确定eventBus.post(..)是否阻塞.实际上这应该在主线程中完成,因为回发到Activity以在UI中执行一些更新.

iag*_*een 1

不允许接收者指定它想要在哪个线程上接收事件是 Otto 的一个缺点。它强制所有调用都必须在同一线程上(默认为主线程)。由调用者决定是否处于正确的线程上。我更喜欢EventBusGreenRobot。您可以使用注释更改要接收的线程。因此,我的第一个建议是,如果您还没有对 Otto 投入太多,请考虑使用它EventBus

如果您无法重新编写所有事件总线代码,您可以post通过分配Handler. 它既快速又简单,但感觉有点像走出 rx 框架。

private fun onTransmissionChanged(): Completable {
    return Completable.fromCallable {
        System.out.println("onTransmissionChanged send event to activity")
        Handler(Looper.getMainLooper()).post {
            eventBus.post(TransmissionChanged())
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

如果您经常调用此函数,您可能需要缓存Handler并将其传递到Interactors构造函数中。

如果您想坚持使用 RxJava 调度程序,您可以将 a 传递Scheduler到构造函数中来指示您想要在哪里进行后台工作,而不是使用subscribeOn. 在 中transmitMessage,使用它安排后台操作,同时强制eventBus.post进入主线程,如下所示 -

class Interactors(private val eventBus: Bus, private val scheduler: Scheduler) {
    fun transmitMessage(): Completable {
        return insertTransmission()
                .subscribeOn(scheduler)
                .observeOn(AndroidSchedulers.mainThread())
                .andThen(onTransmissionChanged()) /* Send event to the activity */
                .observeOn(scheduler)
                .andThen(requestTransmission())
    }
    // Rest of the class is unchanged

}
Run Code Online (Sandbox Code Playgroud)

HomeActivity在这种情况下,您将按如下方式使用它——

new Interactors(eventBus, Schedulers.io()).transmitMessage()
              .observeOn(AndroidSchedulers.mainThread())
              .subscribe();
Run Code Online (Sandbox Code Playgroud)