我正在尝试使用 Kotlin Flows 创建一个移动数据窗口。它可以在 RxKotlin 中使用缓冲区来实现,但缓冲区与使用 Flows 不同。
RxKotlin 有一个buffer运算符,定期将 Observable 发出的项目收集到包中并发出这些包,而不是一次发出一个项目 - buffer(count,skip)
Kotlin Flow 有一个buffer但只是在单独的协程中运行收集器 -缓冲区
Flows 中是否有现有的运算符可以实现此目的?
我在我的Android应用程序中使用rxAndroid和rxKotlin来异步处理网络请求.现在我想在点击Snackbar按钮后重试失败的网络请求.
我的代码现在:
val citiesService = ApiFactory.citiesService
citiesService.cities()
.subscribeOn(Schedulers.newThread()) // fetch List<String>
.flatMap { Observable.from(it) } // convert to sequence of String
.flatMap { city ->
citiesService.coordinates(city) // fetch DoubleArray
.map { City(city, it) } // convert to City(String, DoubleArray)
}
.toList()
.observeOn(AndroidSchedulers.mainThread())
.doOnNext {
listView.setOnItemClickListener { adapterView, view, position, id ->
onItemClick(it[position])
}
}
.map { it.map { it.getName(activity) } }
.subscribe(
{ listAdapter = setupAdapter(it) },
{ showErrorSnackbar() } // handle error
)
fun showErrorSnackbar() {
Snackbar.make(listView, getString(R.string.not_available_msg), Snackbar.LENGTH_INDEFINITE) …Run Code Online (Sandbox Code Playgroud) 在连接到蓝牙设备的应用程序中,我使用以下函数使用RxKotlin:
private fun startBluetoothPair(device: BluetoothDevice) {
Observable.just(device)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.map {
var uuid: UUID = BLUETOOTH_UUID
var socket = it.createRfcommSocketToServiceRecord(uuid)
socket.connect()
return socket
}
.subscribe {
// Do something with the BluetoothSocket
}
}
Run Code Online (Sandbox Code Playgroud)
这个函数应该只是在背景上与蓝牙设备连接,然后对套接字做一些事情(再次在主线程中).但是,map无法处理的return socket部分,告诉我有一个Type mismatch,它找到了BluetoothSocket它需要的地方Unit.
这里出了什么问题?我认为地图应该能够推断出返回类型.
我一直在研究Reactive Programming with RxJava一书中的例子,它针对版本 1 而不是 2。无限流的介绍有以下示例(并注意有更好的方法来处理并发):
Observable<BigInteger> naturalNumbers = Observable.create(subscriber -> {
Runnabler = () -> {
BigInteger i = ZERO;
while (!subscriber.isUnsubscribed()) {
subscriber.onNext(i);
i = i.add(ONE);
}
};
new Thread(r).start();
});
...
Subscription subscription = naturalNumbers.subscribe(x -> log(x));
/* after some time... */
subscription.unsubscribe();
Run Code Online (Sandbox Code Playgroud)
但是,在 RxJava 2 中,传递给create()方法的 lambda 表达式是类型的,ObservableEmitter并且它没有isUnsubscribed()方法。我查看了2.0 中的不同之处,还搜索了存储库,但找不到任何此类方法。
如何在 2.0 中实现相同的功能?
编辑以包含如下给出的解决方案(nb 使用 kotlin):
val naturalNumbers = Observable.create<BigInteger> { emitter ->
Thread({
var int: …Run Code Online (Sandbox Code Playgroud) 我有一个对象QuickSort,我试图创建2个实例.当我尝试创建2个单独的实例时,我可以看到它只使用一个实例,因为我在QuickSort类中有一个不准确的计数.Kotlin 在语法中没有使用new,那么我该怎么做呢?
object QuickSort {
var count = 0;
quickSortOne(...){
...
count++
...
}
quickSortTwo(...){
...
count++
...
}
}
Run Code Online (Sandbox Code Playgroud)
以下是我尝试创建2个实例的方法.我的目标是让quickSort1和quickSort2成为2个独立的实例.
var quickSort1 = QuickSort
quickSort1.quickSortOne(...)
var quickSort2 = QuickSort
quickSort2.quickSortTwo(...)
Run Code Online (Sandbox Code Playgroud)
尝试的解决方案:将QuickSort从对象转换为类.这仍然导致使用相同的实例,如第二种方法的计数所示,包括第一次调用计数.
class QuickSort {
var count = 0;
quickSortOne(...){
...
count++
...
}
quickSortTwo(...){
...
count++
...
}
}
Run Code Online (Sandbox Code Playgroud)
...
var quickSortFirst = QuickSort()
printTest(quickSortFirst.quickSortFirst(arrayList, 0, arrayList.size - 1))
var quickSortLast = QuickSort()
printTest(quickSortLast.quickSortLast(arrayList, 0, arrayList.size …Run Code Online (Sandbox Code Playgroud) Observable.just(1)
.flatMap(object : Function<Int, Observable<Int>> {
override fun apply(integer: Int): Observable<Int> {
return Observable.just(integer * 10)
}
})
.flatMap(object : Function<Int, Observable<Int>> {
override fun apply(integer: Int): Observable<Int> {
return Observable.just(integer * 20)
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(object : Observer<Int> {
override fun onComplete() {
}
override fun onSubscribe(d: Disposable) {
}
override fun onNext(t: Int) {
Log.d("result", "" + t)
}
override fun onError(e: Throwable) {
e.printStackTrace()
}
})
Run Code Online (Sandbox Code Playgroud) 我是使用 Kotlin 进行 Android 开发的新手,我正在努力寻找任何有用的文档,了解如何使用当前最佳实践创建简单的 GET 和 POST 请求。我来自 Angular 开发,我们使用 RxJS 进行响应式开发。
通常我会创建一个服务文件来保存我的所有请求函数,然后我会在任何组件中使用此服务并订阅可观察的。
在 Android 中你会如何做到这一点?有没有一个好的开始示例来说明必须创建的东西?从第一眼看上去,一切都显得如此复杂和过度设计
我是 rxjava/rxkotlin/rxandroid 的初学者。
我需要按顺序处理三个不同的异步调用。问题是第一步返回 a Single<LocationResult>,第二步返回 a Completable,第三步返回 a Completable。
(单个 -> 可完成 -> 可完成)
现在的问题是最后一个Completable取决于第一个的数据Single
我当前的解决方案:
我认为这是一个糟糕的解决方案,但我不知道如何正确地做到这一点。
val ft = FenceTransaction(applicationContext, apiClient)
stream
.flatMap { locationResult ->
ft.removeAll()
return@flatMap ft.commit().toSingle({ return@toSingle locationResult })
}
.flatMapCompletable {
ft.recycle()
ft.setZone(it.location.longitude, it.location.latitude, ZONE_RADIUS)
val dots = DotFilter().getFilteredDots()
for (dot in dots) {
ft.addDot(dot)
}
return@flatMapCompletable ft.commit()
}
.subscribeBy(
onComplete = {
"transaction complete".logi(this)
},
onError = {
"transaction error".logi(this)
})
Run Code Online (Sandbox Code Playgroud)
这种方法是正确的方法吗?
我应该如何处置Completeables?一般我应该什么时候处置Observables?
我正在使用RxJava/Kotlin和Room and Retrofit.我确信我刚开始学习RxJava时没有写东西.方案是我打电话检查数据库中是否有喜欢的记录并将它们放入List中,从API获取数据并将其插入数据库,使用上一个收藏夹列表更新数据库并获取所有记录,现在更新,列表.我得到了我的片段中的结果,但每次我得到它就好像我得到1个不太喜欢的项目,直到我得到没有喜欢的项目.
Repository
fun getKafaniFromApi(): Observable<List<Kafana>> {
return apiService.getKafani().toObservable().doOnNext {
insertKafaniInDb(it)
}
}
fun getKafaniFromDb(): Observable<List<Kafana>> {
return kafanaDao.getKafani().toObservable()
}
fun insertKafaniInDb(kafani: List<Kafana>) {
Observable.fromCallable { kafanaDao.insertAll(kafani) }
.subscribeOn(Schedulers.io())
.subscribe {
Timber.d("Inserted ${kafani.size} kafani from API in DB...")
}
}
fun getFavoriteKafani(): Single<List<Kafana>> {
return kafanaDao.getFavoriteKafani()
}
fun setKafanaFavorite(kafana: Kafana, isFavorite: Int) {
return kafanaDao.setFavourite(kafana.name, isFavorite)
}
fun updateFavoriteKafana(kafana: Kafana) {
return kafanaDao.updateFavoriteKafana(kafana)
}
Run Code Online (Sandbox Code Playgroud)
在我的 viewmodel
fun get(): Observable<List<Kafana>> {
return kafanaRepository.getFavoriteKafani()
.toObservable()
.doOnNext { kafaniList = it } …Run Code Online (Sandbox Code Playgroud) 我有我无法解决的问题。我试图使用Kotlin将.zip(List,)多个Singles合并为一个,而我提供的Function都不适合作为第二个参数。
fun getUserFriendsLocationsInBuckets(token: String) {
roomDatabase.userFriendsDao().getUserFriendsDtosForToken(token).subscribe(
{ userFriends: List<UserFriendDTO> ->
Single.zip(getLocationSingleForEveryUser(userFriends),
Function<Array<List<Location>>, List<Location>> { t: Array<List<Location>> -> listOf<Location>() })
},
{ error: Throwable -> }
)
}
private fun getLocationSingleForEveryUser(userFriends: List<UserFriendDTO>): List<Single<List<Location>>> =
userFriends.map { serverRepository.locationEndpoint.getBucketedUserLocationsInLast24H(it.userFriendId) }
Run Code Online (Sandbox Code Playgroud)
rx-kotlin ×10
android ×6
kotlin ×4
rx-java ×4
rx-java2 ×3
reactivex ×2
retrofit2 ×2
rx-android ×2
android-room ×1
flatmap ×1
lambda ×1
rx-kotlin2 ×1