我正在尝试从 RxJava 切换到 Kotlin Flow。流量真的很震撼。但是现在在 kotlin Flow 中是否有类似于 RxJava 的“GroupBy”的运算符?
我们知道LiveData 具有生命周期感知能力,如果配置发生更改,LiveData 对象不会每次都从数据库(本地/远程)重新查询,并且仅当数据有任何更新时才会更新。
最近我开始使用Kotlin Flow,我应该承认它最适合数据层,即在存储库中实现,以便通知 ViewModel。但我也在 ViewModel/View 层中使用了Kotlin Flow,以便直接根据其状态collect(密封类实现)来确定Fragment 中的 Flow 对象。我在使用 Flow 时遇到的问题是每次配置更改时都会从数据库(本地/远程)检索数据。
这种情况应该怎么办?有没有办法在使用 Flow 时避免重新查询,或者我应该只在 ViewModel/View 层中使用 LiveData?
示例代码
sealed class Status<T> {
class Processing<T> : Status<T>()
data class Completed<T>(val value: T) : Status<T>()
data class Error<T>(val error: String) : Status<T>()
companion object {
fun <T> processing() = Processing<T>()
fun <T> completed(value: T) = Completed(value)
fun <T> error(error: String) = Error<T>(error)
}
}
Run Code Online (Sandbox Code Playgroud)
回购协议:
class Repo(database: LocalDatabase){
fun retrieveUsersData() …Run Code Online (Sandbox Code Playgroud) 我正在尝试使用 来存储价值DataStore。
class BasicDataStore(context: Context) :
PrefsDataStore(
context,
PREF_FILE_BASIC
),
BasicImpl {
override val serviceRunning: Flow<Boolean>
get() = dataStore.data.map { preferences ->
preferences[SERVICE_RUNNING_KEY] ?: false
}
override suspend fun setServiceRunningToStore(serviceRunning: Boolean) {
dataStore.edit { preferences ->
preferences[SERVICE_RUNNING_KEY] = serviceRunning
}
}
companion object {
private const val PREF_FILE_BASIC = "basic_preference"
private val SERVICE_RUNNING_KEY = booleanPreferencesKey("service_running")
}
}
@Singleton
interface BasicImpl {
val serviceRunning: Flow<Boolean>
suspend fun setServiceRunningToStore(serviceRunning: Boolean)
}
Run Code Online (Sandbox Code Playgroud)
在 a 中Service,尝试监视该值,以下是相应的代码:
private fun monitorNotificationService() { …Run Code Online (Sandbox Code Playgroud) 我可以在 kotlin flow 中使用 async{} 吗?
场景:API 调用后,我得到了需要解析的 200 个对象的列表(转换为 UIObject)。我正在尝试并行处理这个列表。下面是伪代码:
fun getUIObjectListFlow():Flow<List<UIObject>> {
flow<List<UIObject>> {
while (stream.hasNext()) {
val data = stream.getData() // reading from an input stream. Data comes in chunk
val firstHalfDeffered = async(Dispatchers.IO) { /* process first half of the list that data obj contains*/ }
val secondHalfDeffered = async(Dispatchers.IO) { /*process second half of the list that data obj contains */ }
val processedList = firstHalfDeffered.await() + secondHalfDeffered.await() // just pseudo code
emit(processedList)
} …Run Code Online (Sandbox Code Playgroud) 我想为 Kotlin Flows 的永久循环和发出结果制定一个很好的逻辑。用例是,每 n 分钟我需要更新应用程序中的配置,并且此配置来自其余 api。
我认为一个不错的解决方案是运行一个“调度程序”,在后台每 n 分钟轮询一次 api,并且ConfigService订阅该调度程序的调度程序可以在调度程序发出新值时更新它自己的状态。
使用 RxJava 这将是
Observable.interval(n, TimeUnit.MINUTES)
.flatMap( ... )
Run Code Online (Sandbox Code Playgroud)
但由于我使用 Kotlin,我认为我可以使用原生 Flow 库实现相同的逻辑。那会是什么样子?我试图用谷歌搜索,要么没有找到正确的关键字,要么之前没有人遇到过同样的问题?
我有以下方法:
operator fun invoke(query: String): Flow<MutableList<JobDomainModel>> = flow {
val jobDomainModelList = mutableListOf<JobDomainModel>()
jobListingRepository.searchJobs(sanitizeSearchQuery(query))
.collect { jobEntityList: List<JobEntity> ->
for (jobEntity in jobEntityList) {
categoriesRepository.getCategoryById(jobEntity.categoryId)
.collect { categoryEntity ->
if (categoryEntity.categoryId == jobEntity.categoryId) {
jobDomainModelList.add(jobEntity.toDomainModel(categoryEntity))
}
}
}
emit(jobDomainModelList)
}
}
Run Code Online (Sandbox Code Playgroud)
它在存储库中搜索,调用search返回Flow<List<JobEntity>>. 然后,对于流程中的每个JobEntity任务,我需要从数据库中获取该作业所属的类别。一旦有了该类别和作业,我就可以将该作业转换为域模型对象 ( JobDomainModel) 并将其添加到列表中,该列表将作为方法的返回对象在流中返回。
我遇到的问题是没有任何东西被发射。我不确定在 Kotlin 中处理流程时是否遗漏了某些内容,但我没有通过 ID ( categoriesRepository.getCategoryById(jobEntity.categoryId)) 获取类别,然后它就可以正常工作并发出列表。
预先非常感谢!
我有一个 pagingData 流对象,我想将其与不同的融合位置流组合起来,以便它将根据 pagingdata 列表的每个项目进行相应的处理。
val point : Flow<Point>
val pagingDate : Flow<PagingData>
Run Code Online (Sandbox Code Playgroud)
我尝试使用组合和组合变换,但它似乎不起作用,因为当更新应用程序崩溃时,并显示与 pagingData 3 相关的错误消息不能发出两次
java.lang.IllegalStateException: 尝试从pageEventFlow收集两次,这是非法操作。您是否忘记调用 Flow<PagingData<*>>.cachedIn(coroutineScope) ?
我可以选择哪些选项来使用流数据转换分页数据项?
我正在尝试在 CoroutineWorker(WorkManager) 中使用 Flow,并且该流应该侦听存储库中的值 5 秒,如果您在该时间范围内获得该值,则返回 Result.success() 然后忽略/取消计时器,如果时间过去了,则返回 Result.failure()
现在我有类似的事情,我正在尝试将超时纳入其中。
repository.getListeningValue.onEach {
//doStuff here with the result
}.map{
Result.success()
}.first()
Run Code Online (Sandbox Code Playgroud) android kotlin android-workmanager kotlin-coroutines kotlin-flow
我正在开发一个 Android 项目,其中MVVM使用了该架构。有一个用例,Flow我的aRepository需要根据回调的结果进行更新,当我的数据源中的某些内容发生更改时会触发该回调。对于这个问题最合适的选择似乎是callbackFlow. 然而,它仍处于实验阶段。我很想使用callbackFlow,但是,我知道任何未来的更改都可能会破坏某些代码,并且我不希望在生产应用程序中发生这种情况。考虑到上述情况,应该用什么来替代callbackFlow?或者我应该考虑继续吗callbackFlow?
我的以下代码似乎阻塞了主线程,即使该流程是在 IO 协程上调用的。我是一个 kotlin 和 flow 菜鸟。我在这里做错了什么导致主线程阻塞?
存储库:
fun observeData(): Flow<Data> {
return flow {
//third party api is getting data from a ContentProvider
ThirdPartyApi.getData().map { convertFromExternalModelToDataModel(it) }
.collect {
emit(it)
}
}
}
Run Code Online (Sandbox Code Playgroud)
视图模型:
fun updateUI() {
scope.launch(Dispatchers.IO) {
repository.observerData().collect {
withContext(Dispatchers.Main) {
textView.text = data.name
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
运行以下代码后,我看到来自 Android Choreographer 的日志“跳过 200 帧。应用程序在主线程上工作太多”