如何在 RxJava 中显式地发出 Flowable 完成的信号?

Ada*_*old 5 java kotlin rx-java2

我正在尝试创建Flowable一个包装Iterable. 我Iterable定期将元素推送到我的元素,但似乎完成事件是隐式的。我不知道如何表示处理已完成。例如在我的代码中:

    // note that this code is written in Kotlin
    val iterable = LinkedBlockingQueue<Int>()
    iterable.addAll(listOf(1, 2, 3))

    val flowable = Flowable.fromIterable(iterable)
            .subscribeOn(Schedulers.computation())
            .observeOn(Schedulers.computation())

    flowable.subscribe(::println, {it.printStackTrace()}, {println("completed")})

    iterable.add(4)

    Thread.sleep(1000)

    iterable.add(5)

    Thread.sleep(1000)
Run Code Online (Sandbox Code Playgroud)

这打印:

1 2 3 4 已完成

我检查了接口的来源Flowable,但似乎我无法Flowable明确表示 a 已完成。我怎样才能这样做呢?在我的程序中,我发布的事件之间有一些延迟,我想明确何时到达complete事件流。

澄清:我有一个长时间运行的进程,它会发出事件。我将它们收集在一个队列中,并公开一个方法,该方法返回一个围绕我的队列的 Flowable。问题是,当我创建 Flowable 时,队列中可能已经有元素了。我只会处理事件一次,并且我知道事件流何时停止,因此我知道何时需要完成 Flowable。

Mag*_*nus 4

使用.fromIterable是为您的用例创建错误的方法Flowable
我实际上不清楚该用例是什么,但您可能想使用Flowable.create()PublishSubject

val flowable = Flowable.create<Int>( {
    it.onNext(1)
    it.onNext(2)
    it.onComplete()
}, BackpressureStrategy.MISSING)

val publishSubject = PublishSubject.create<Int>()
val flowableFromSubject = publishSubject.toFlowable(BackpressureStrategy.MISSING)
//This data will be dropepd unless something is subscribed to the flowable.
publishSubject.onNext(1)
publishSubject.onNext(2)
publishSubject.onComplete()
Run Code Online (Sandbox Code Playgroud)

当然,如何处理背压取决于数据源的性质。