Joa*_*Ley 7 android kotlin kotlin-coroutines kotlinx.coroutines.channels kotlinx.coroutines.flow
我想知道如何将项目发送/发送到Kotlin.Flow
,所以我的用例是:
在消费者/ ViewModel / Presenter中,我可以使用以下 功能进行订阅collect
:
fun observe() {
coroutineScope.launch {
// 1. Send event
reopsitory.observe().collect {
println(it)
}
}
}
Run Code Online (Sandbox Code Playgroud)
但是问题就在Repository
侧面,对于RxJava,我们可以使用BehaviorBehavior主题将其公开为Observable/Flowable
并发出类似以下的新项:
behaviourSubject.onNext(true)
Run Code Online (Sandbox Code Playgroud)
但是,每当我建立新流程时:
flow {
}
Run Code Online (Sandbox Code Playgroud)
我只能收集。如何将值发送到流?
har*_*eri 15
看看MutableStateFlow文档,因为它ConflatedBroadcastChannel
是即将被弃用的替代品,很快。
要获得更好的上下文,请查看关于 Github 上 Kotlin 存储库原始问题的整个讨论。
Joa*_*Ley 11
如果要获取订阅/收藏的最新值,则应使用ConflatedBroadcastChannel:
private val channel = ConflatedBroadcastChannel<Boolean>()
Run Code Online (Sandbox Code Playgroud)
这将复制BehaviourSubject
,以将通道公开为Flow:
// Repository
fun observe() {
return channel.asFlow()
}
Run Code Online (Sandbox Code Playgroud)
现在将事件/值Flow
发送到该公开的简单发送到此通道。
// Repository
fun someLogicalOp() {
channel.send(false) // This gets sent to the ViewModel/Presenter and printed.
}
Run Code Online (Sandbox Code Playgroud)
安慰:
假
如果您希望仅在开始收集后接收值,则应改用a BroadcastChannel
。
表现为Rx PublishedSubject
private val channel = BroadcastChannel<Boolean>(1)
fun broadcastChannelTest() {
// 1. Send event
channel.send(true)
// 2. Start collecting
channel
.asFlow()
.collect {
println(it)
}
// 3. Send another event
channel.send(false)
}
Run Code Online (Sandbox Code Playgroud)
假
仅 false
在发送第一个事件之前 被打印collect { }
。
表现为Rx BehaviourSubject
private val confChannel = ConflatedBroadcastChannel<Boolean>()
fun conflatedBroadcastChannelTest() {
// 1. Send event
confChannel.send(true)
// 2. Start collecting
confChannel
.asFlow()
.collect {
println(it)
}
// 3. Send another event
confChannel.send(false)
}
Run Code Online (Sandbox Code Playgroud)
真正
假
这两个事件都将被打印,您将始终获得最新的值(如果存在)。
此外,还要提及Kotlin的团队开发DataFlow
(名称待定):
似乎更适合此用例(因为它将是冷流)。
更新:
Kotlin Coroutines1.4.0
现在可用于MutableSharedFlow
,它取代了Channel
. MutableSharedFlow
清理也是内置的,所以你不需要手动打开和关闭它,不像Channel
. MutableSharedFlow
如果您需要类似主题的 api,请使用Flow
原答案
由于您的问题有android
标签,我将添加一个 Android 实现,允许您轻松创建处理其自身生命周期的aBehaviorSubject
或 a PublishSubject
。
这在 Android 中很重要,因为您不想忘记关闭通道并泄漏内存。此实现通过将反应流与 Fragment/Activity 的创建和销毁联系起来,避免了显式“处置”反应流的需要。相似LiveData
interface EventReceiver<Message> {
val eventFlow: Flow<Message>
}
interface EventSender<Message> {
fun postEvent(message: Message)
val initialMessage: Message?
}
class LifecycleEventSender<Message>(
lifecycle: Lifecycle,
private val coroutineScope: CoroutineScope,
private val channel: BroadcastChannel<Message>,
override val initialMessage: Message?
) : EventSender<Message>, LifecycleObserver {
init {
lifecycle.addObserver(this)
}
override fun postEvent(message: Message) {
if (!channel.isClosedForSend) {
coroutineScope.launch { channel.send(message) }
} else {
Log.e("LifecycleEventSender","Channel is closed. Cannot send message: $message")
}
}
@OnLifecycleEvent(Lifecycle.Event.ON_CREATE)
fun create() {
channel.openSubscription()
initialMessage?.let { postEvent(it) }
}
@OnLifecycleEvent(Lifecycle.Event.ON_DESTROY)
fun destroy() {
channel.close()
}
}
class ChannelEventReceiver<Message>(channel: BroadcastChannel<Message>) :
EventReceiver<Message> {
override val eventFlow: Flow<Message> = channel.asFlow()
}
abstract class EventRelay<Message>(
lifecycle: Lifecycle,
coroutineScope: CoroutineScope,
channel: BroadcastChannel<Message>,
initialMessage: Message? = null
) : EventReceiver<Message> by ChannelEventReceiver<Message>(channel),
EventSender<Message> by LifecycleEventSender<Message>(
lifecycle,
coroutineScope,
channel,
initialMessage
)
Run Code Online (Sandbox Code Playgroud)
通过使用Lifecycle
来自 Android的库,我现在可以创建一个BehaviorSubject
在活动/片段被销毁后自行清理的
class BehaviorSubject<String>(
lifecycle: Lifecycle,
coroutineScope: CoroutineScope,
initialMessage = "Initial Message"
) : EventRelay<String>(
lifecycle,
coroutineScope,
ConflatedBroadcastChannel(),
initialMessage
)
Run Code Online (Sandbox Code Playgroud)
或者我可以PublishSubject
通过使用缓冲BroadcastChannel
class PublishSubject<String>(
lifecycle: Lifecycle,
coroutineScope: CoroutineScope,
initialMessage = "Initial Message"
) : EventRelay<String>(
lifecycle,
coroutineScope,
BroadcastChannel(Channel.BUFFERED),
initialMessage
)
Run Code Online (Sandbox Code Playgroud)
现在我可以做这样的事情
class MyActivity: Activity() {
val behaviorSubject = BehaviorSubject(
this@MyActivity.lifecycle,
this@MyActivity.lifecycleScope
)
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
if (savedInstanceState == null) {
behaviorSubject.eventFlow
.onEach { stringEvent ->
Log.d("BehaviorSubjectFlow", stringEvent)
// "BehaviorSubjectFlow: Initial Message"
// "BehaviorSubjectFlow: Next Message"
}
.flowOn(Dispatchers.Main)
.launchIn(this@MyActivity.lifecycleScope)
}
}
override fun onResume() {
super.onResume()
behaviorSubject.postEvent("Next Message")
}
}
Run Code Online (Sandbox Code Playgroud)