我有一个可能永远不会被调用的监听器。但是,如果它至少被调用一次,我有理由确信它会被调用很多次。我是 Flows 的粉丝,因此我将其包装在callbackFlow() 构建器中。为了防止永远等待,我想添加一个超时时间。我正在尝试构建流运算符,如果流的第一个元素发出时间太长,它将抛出某种超时。这是我所拥有的。
fun <T> Flow<T>.flowBeforeTimeout(ms: Long): Flow<T> = flow {
withTimeout(ms){
emit(first())
}
emitAll(this@flowBeforeTimeout.drop(1))
}
Run Code Online (Sandbox Code Playgroud)
效果还不错,这些 JUnit4 测试都通过了。还有更多通过的测试,但为了简洁起见我省略了它们。
@Test(expected = CancellationException::class)
fun `Throws on timeout`(): Unit = runBlocking {
val testFlow = flow {
delay(200)
emit(1)
}
testFlow.flowBeforeTimeout(100).toList()
}
Run Code Online (Sandbox Code Playgroud)
@Test
fun `No distortion`(): Unit = runBlocking {
val testList = listOf(1,2,3)
val resultList = testList
.asFlow()
.flowBeforeTimeout(100)
.toList()
assertThat(testList.size, `is`(resultList.size))
}
Run Code Online (Sandbox Code Playgroud)
然而,这个测试没有通过。
// Fails with: Expected: is <1> but: was <2>
@Test
fun `Starts only once`(): Unit = …Run Code Online (Sandbox Code Playgroud) 我正在尝试从 RxJava 切换到 Kotlin Flow。流量真的很震撼。但是现在在 kotlin Flow 中是否有类似于 RxJava 的“GroupBy”的运算符?
我们知道LiveData 具有生命周期感知能力,如果配置发生更改,LiveData 对象不会每次都从数据库(本地/远程)重新查询,并且仅当数据有任何更新时才会更新。
最近我开始使用Kotlin Flow,我应该承认它最适合数据层,即在存储库中实现,以便通知 ViewModel。但我也在 ViewModel/View 层中使用了Kotlin Flow,以便直接根据其状态collect(密封类实现)来确定Fragment 中的 Flow 对象。我在使用 Flow 时遇到的问题是每次配置更改时都会从数据库(本地/远程)检索数据。
这种情况应该怎么办?有没有办法在使用 Flow 时避免重新查询,或者我应该只在 ViewModel/View 层中使用 LiveData?
示例代码
sealed class Status<T> {
class Processing<T> : Status<T>()
data class Completed<T>(val value: T) : Status<T>()
data class Error<T>(val error: String) : Status<T>()
companion object {
fun <T> processing() = Processing<T>()
fun <T> completed(value: T) = Completed(value)
fun <T> error(error: String) = Error<T>(error)
}
}
Run Code Online (Sandbox Code Playgroud)
回购协议:
class Repo(database: LocalDatabase){
fun retrieveUsersData() …Run Code Online (Sandbox Code Playgroud) 我正在尝试使用 来存储价值DataStore。
class BasicDataStore(context: Context) :
PrefsDataStore(
context,
PREF_FILE_BASIC
),
BasicImpl {
override val serviceRunning: Flow<Boolean>
get() = dataStore.data.map { preferences ->
preferences[SERVICE_RUNNING_KEY] ?: false
}
override suspend fun setServiceRunningToStore(serviceRunning: Boolean) {
dataStore.edit { preferences ->
preferences[SERVICE_RUNNING_KEY] = serviceRunning
}
}
companion object {
private const val PREF_FILE_BASIC = "basic_preference"
private val SERVICE_RUNNING_KEY = booleanPreferencesKey("service_running")
}
}
@Singleton
interface BasicImpl {
val serviceRunning: Flow<Boolean>
suspend fun setServiceRunningToStore(serviceRunning: Boolean)
}
Run Code Online (Sandbox Code Playgroud)
在 a 中Service,尝试监视该值,以下是相应的代码:
private fun monitorNotificationService() { …Run Code Online (Sandbox Code Playgroud) 我可以在 kotlin flow 中使用 async{} 吗?
场景:API 调用后,我得到了需要解析的 200 个对象的列表(转换为 UIObject)。我正在尝试并行处理这个列表。下面是伪代码:
fun getUIObjectListFlow():Flow<List<UIObject>> {
flow<List<UIObject>> {
while (stream.hasNext()) {
val data = stream.getData() // reading from an input stream. Data comes in chunk
val firstHalfDeffered = async(Dispatchers.IO) { /* process first half of the list that data obj contains*/ }
val secondHalfDeffered = async(Dispatchers.IO) { /*process second half of the list that data obj contains */ }
val processedList = firstHalfDeffered.await() + secondHalfDeffered.await() // just pseudo code
emit(processedList)
} …Run Code Online (Sandbox Code Playgroud) 通过状态流我可以使用
val items by myViewModel.items.collectAsState()
Run Code Online (Sandbox Code Playgroud)
我想共享流不能以这种方式使用。共享流是否适用于 Compose?
android kotlin-coroutines android-jetpack-compose kotlin-flow
在我的 ViewModel 中,我正在发出 API 请求,并且正在使用 FragmentStateFlow并SharedFlow与 Fragment 进行通信。在发出 API 请求时,我可以轻松更新状态流的值,并将其成功收集到片段中。
但在发出请求之前,我发出了一些布尔值,SharedFlow并且它没有被收集在片段中。有人可以帮助我为什么会发生这种情况吗?
class MainViewModel: ViewModel() {
private val _stateFlow = MutableStateFlow(emptyList<Model>())
val stateFlow = _stateFlow.asStateFlow()
private val _loading = MutableSharedFlow<Boolean>()
val loading = _loading.asSharedFlow()
suspend fun request() {
_loading.emit(true)
withContext(Dispatchers.IO) {
/* makes API request */
/* updates _stateFlow.value */
/* stateFlow value is successfully collected */
}
_loading.emit(false) // emitting boolean value
}
}
Run Code Online (Sandbox Code Playgroud)
class MyFragment : Fragment(R.layout.fragment_my) {
// ...
override …Run Code Online (Sandbox Code Playgroud) android kotlin kotlin-coroutines kotlin-flow kotlin-sharedflow
Kotlin 流程中的 single() 和 first() 有什么区别?您能举例说明何时使用哪种变体吗?
最近,自 android 生命周期库版本 2.6.0-alpha01 以来,我们有了一个新的 API,即
\ncollectAsStateWithLifecycle(...)\nRun Code Online (Sandbox Code Playgroud)\n\n\n\n如果您\xe2\x80\x99 使用 Jetpack Compose 构建 Android 应用程序,请使用\n
\ncollectAsStateWithLifecycle可组合函数(而不是\ncollectAsState)
我尝试一下,对于流动(冷流)例如
\n val counter = flow {\n var value = 0\n while (true) {\n emit(value++)\n delay(1000)\n }\n }\nRun Code Online (Sandbox Code Playgroud)\n拥有它是有用的
\nflow.collectAsStateWithLifecycle(0)\nRun Code Online (Sandbox Code Playgroud)\n但是如果我们有一个像 mutableStateFlow 这样的热流
\nval stateFlow = MutableStateFlow(0)\nRun Code Online (Sandbox Code Playgroud)\n拥有似乎没什么用
\nstateFlow.collectAsStateWithLifecycle(0)\nRun Code Online (Sandbox Code Playgroud)\n鉴于它不会阻止任何排放。
\n我说collectAsStateWithLifecycle只对冷流有用,对热流无效,这样说对吗?
如果我错了,你能给我举一个collectAsStateWithLifecycle对热流也有用的例子吗?
android android-lifecycle kotlin kotlin-flow kotlin-stateflow
在审查用 kotlin 编写的一些代码时,有件事引起了我的注意。我在一些项目中查看领域层,在一些项目中,我看到挂起功能和 Flow 一起使用,而在一些项目中,我看到只使用 Flow。
例如暂停和流动在一起:
class FetchMovieDetailFlowUseCase @Inject constructor(
private val repository: MovieRepository
) : FlowUseCase<FetchMovieDetailFlowUseCase.Params, State<MovieDetailUiModel>>() {
data class Params(val id: Long)
override suspend fun execute(params: Params): Flow<State<MovieDetailUiModel>> =
repository.fetchMovieDetailFlow(params.id)
}
Run Code Online (Sandbox Code Playgroud)
只是流动
class GetCoinUseCase @Inject constructor(
private val repository: CoinRepository
){
operator fun invoke(coinId:String): Flow<Resource<CoinDetail>> = flow {
try {
emit(Resource.Loading())
emit(Resource.Success(coin))
}catch (e:HttpException){
emit(Resource.Error(e.localizedMessage ?: "An unexpected error occured"))
}catch (e:IOException){
emit(Resource.Error("Couldn't reach server. Check your internet connection."))
}
}
}
Run Code Online (Sandbox Code Playgroud)
只是暂停
class GetLatestNewsWithAuthorsUseCase( …Run Code Online (Sandbox Code Playgroud)