如何将项目发送到Kotlin.Flow(如Behaviorsubject)

Joa*_*Ley 7 android kotlin kotlin-coroutines kotlinx.coroutines.channels kotlinx.coroutines.flow

我想知道如何将项目发送/发送到Kotlin.Flow,所以我的用例是:

在消费者/ ViewModel / Presenter中,我可以使用以下 功能进行订阅collect

fun observe() {
 coroutineScope.launch {
    // 1. Send event
    reopsitory.observe().collect {
      println(it)
    }
  }
}
Run Code Online (Sandbox Code Playgroud)

但是问题就在Repository侧面,对于RxJava,我们可以使用BehaviorBehavior主题将其公开为Observable/Flowable并发出类似以下的新项:

behaviourSubject.onNext(true)
Run Code Online (Sandbox Code Playgroud)

但是,每当我建立新流程时:

flow {

}
Run Code Online (Sandbox Code Playgroud)

我只能收集。如何将值发送到流?

har*_*eri 15

看看MutableStateFlow文档,因为它ConflatedBroadcastChannel是即将被弃用的替代品,很快。

要获得更好的上下文,请查看关于 Github 上 Kotlin 存储库原始问题整个讨论

  • 仍在@ExperimentalCoroutinesApi - 一旦它稳定,我将更新已接受的答案。 (3认同)
  • @JoaquimLey,协程“1.4.0”版本中的“StateFlow”[已变得稳定](https://github.com/Kotlin/kotlinx.coroutines/blob/c35ce7e509ff284f4b33ed20a0026e5149ca17a9/CHANGES.md#version-140)。 (2认同)

Joa*_*Ley 11

如果要获取订阅/收藏的最新值,则应使用ConflatedBroadcastChannel

private val channel = ConflatedBroadcastChannel<Boolean>()
Run Code Online (Sandbox Code Playgroud)

这将复制BehaviourSubject,以将通道公开为Flow:

// Repository
fun observe() {
  return channel.asFlow()
}

Run Code Online (Sandbox Code Playgroud)

现在将事件/值Flow发送到该公开的简单发送到此通道。

// Repository
fun someLogicalOp() {
  channel.send(false) // This gets sent to the ViewModel/Presenter and printed.
}
Run Code Online (Sandbox Code Playgroud)

安慰:

如果您希望仅开始收集接收值则应改用a BroadcastChannel

明确说明:

表现为Rx PublishedSubject

private val channel = BroadcastChannel<Boolean>(1)

fun broadcastChannelTest() {
  // 1. Send event
  channel.send(true)

  // 2. Start collecting
  channel
    .asFlow()
    .collect {
      println(it)
    }

  // 3. Send another event
  channel.send(false)
}

Run Code Online (Sandbox Code Playgroud)

false在发送第一个事件之前 被打印collect { }


表现为Rx BehaviourSubject

private val confChannel = ConflatedBroadcastChannel<Boolean>()

fun conflatedBroadcastChannelTest() {
  // 1. Send event
  confChannel.send(true)

  // 2. Start collecting
  confChannel
    .asFlow()
    .collect {
      println(it)
    }

  // 3. Send another event
  confChannel.send(false)
}
Run Code Online (Sandbox Code Playgroud)

真正

这两个事件都将被打印,您将始终获得最新的值(如果存在)。

此外,还要提及Kotlin的团队开发DataFlow(名称待定):

似乎更适合此用例(因为它将是冷流)。

  • 就在最近,[StateFlow](https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-state-flow/index.html)(这是一个“DataFlow”)答案)是在[协程版本1.3.6](https://github.com/Kotlin/kotlinx.coroutines/releases/tag/1.3.6)中引入的,所以也许可以尝试一下 (4认同)

flo*_*hot 7

更新

Kotlin Coroutines1.4.0现在可用于MutableSharedFlow,它取代了Channel. MutableSharedFlow清理也是内置的,所以你不需要手动打开和关闭它,不像Channel. MutableSharedFlow如果您需要类似主题的 api,请使用Flow

原答案

由于您的问题有android标签,我将添加一个 Android 实现,允许您轻松创建处理其自身生命周期的aBehaviorSubject或 a PublishSubject

这在 Android 中很重要,因为您不想忘记关闭通道并泄漏内存。此实现通过将反应流与 Fragment/Activity 的创建和销毁联系起来,避免了显式“处置”反应流的需要。相似LiveData

interface EventReceiver<Message> {
    val eventFlow: Flow<Message>
}

interface EventSender<Message> {
    fun postEvent(message: Message)
    val initialMessage: Message?
}

class LifecycleEventSender<Message>(
    lifecycle: Lifecycle,
    private val coroutineScope: CoroutineScope,
    private val channel: BroadcastChannel<Message>,
    override val initialMessage: Message?
) : EventSender<Message>, LifecycleObserver {

    init {
        lifecycle.addObserver(this)
    }

    override fun postEvent(message: Message) {
        if (!channel.isClosedForSend) {
            coroutineScope.launch { channel.send(message) }
        } else {
            Log.e("LifecycleEventSender","Channel is closed. Cannot send message: $message")
        }
    }

    @OnLifecycleEvent(Lifecycle.Event.ON_CREATE)
    fun create() {
        channel.openSubscription()
        initialMessage?.let { postEvent(it) }
    }

    @OnLifecycleEvent(Lifecycle.Event.ON_DESTROY)
    fun destroy() {
        channel.close()
    }
}

class ChannelEventReceiver<Message>(channel: BroadcastChannel<Message>) :
    EventReceiver<Message> {
    override val eventFlow: Flow<Message> = channel.asFlow()
}

abstract class EventRelay<Message>(
    lifecycle: Lifecycle,
    coroutineScope: CoroutineScope,
    channel: BroadcastChannel<Message>,
    initialMessage: Message? = null
) : EventReceiver<Message> by ChannelEventReceiver<Message>(channel),
    EventSender<Message> by LifecycleEventSender<Message>(
        lifecycle,
        coroutineScope,
        channel,
        initialMessage
    )

Run Code Online (Sandbox Code Playgroud)

通过使用Lifecycle来自 Android的库,我现在可以创建一个BehaviorSubject在活动/片段被销毁后自行清理的

class BehaviorSubject<String>(
    lifecycle: Lifecycle,
    coroutineScope: CoroutineScope,
    initialMessage = "Initial Message"
) : EventRelay<String>(
    lifecycle,
    coroutineScope,
    ConflatedBroadcastChannel(),
    initialMessage
)
Run Code Online (Sandbox Code Playgroud)

或者我可以PublishSubject通过使用缓冲BroadcastChannel

class PublishSubject<String>(
    lifecycle: Lifecycle,
    coroutineScope: CoroutineScope,
    initialMessage = "Initial Message"
) : EventRelay<String>(
    lifecycle,
    coroutineScope,
    BroadcastChannel(Channel.BUFFERED),
    initialMessage
)
Run Code Online (Sandbox Code Playgroud)

现在我可以做这样的事情

class MyActivity: Activity() {

    val behaviorSubject = BehaviorSubject(
        this@MyActivity.lifecycle,
        this@MyActivity.lifecycleScope
    )

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)

        if (savedInstanceState == null) { 

            behaviorSubject.eventFlow
                .onEach { stringEvent ->
                    Log.d("BehaviorSubjectFlow", stringEvent)
                    // "BehaviorSubjectFlow: Initial Message"
                    // "BehaviorSubjectFlow: Next Message"
                }
                .flowOn(Dispatchers.Main)
                .launchIn(this@MyActivity.lifecycleScope)

        }
    }

    override fun onResume() {
        super.onResume()

        behaviorSubject.postEvent("Next Message")
    }
}
Run Code Online (Sandbox Code Playgroud)