我之前使用RxJava和Kotlin遇到了一些问题.我做了一些有趣的发现,我仍然感到困惑.
Func1RxJava中有简单的界面
public interface Func1<T, R> extends Function {
R call(T t);
}
Run Code Online (Sandbox Code Playgroud)
我试图添加一个扩展方法Observable,也是一个RxJava类.这会将排放物收集到Google Guava中,ImmutableListMulitmap使用a Func1来映射每个项目的关键字.
fun <K,T> Observable<T>.toImmutableListMultimap(keyMapper: Func1<T, K>): Observable<ImmutableListMultimap<K,T>> {
return this.collect({ ImmutableListMultimap.builder<K,T>()},{ b, t -> b.put(keyMapper.call(t), t)}).map { it.build() }
}
Run Code Online (Sandbox Code Playgroud)
当我试图调用这个扩展方法时,我无法进行编译,而且根本不理解lambda表达式.
ScheduledItem.all.flatMap { it.rebuildSoftTransactions }
.toImmutableListMultimap { it.id /*compile error */ } .cache()
Run Code Online (Sandbox Code Playgroud)
但是,当我修改扩展方法以使用函数类型时,最奇怪的事情发生了.
fun <K,T> Observable<T>.toImmutableListMultimap(keyMapper: (T) -> K): Observable<ImmutableListMultimap<K,T>> {
return this.collect({ ImmutableListMultimap.builder<K,T>()},{ b, t -> b.put(keyMapper(t), t)}).map { it.build() }
}
Run Code Online (Sandbox Code Playgroud)
然后一切编译得很好.但这让我感到困惑:为什么它没有将lambda推断到界面上?当我使用标准map()方法时, …
我有我无法解决的问题。我试图使用Kotlin将.zip(List,)多个Singles合并为一个,而我提供的Function都不适合作为第二个参数。
fun getUserFriendsLocationsInBuckets(token: String) {
roomDatabase.userFriendsDao().getUserFriendsDtosForToken(token).subscribe(
{ userFriends: List<UserFriendDTO> ->
Single.zip(getLocationSingleForEveryUser(userFriends),
Function<Array<List<Location>>, List<Location>> { t: Array<List<Location>> -> listOf<Location>() })
},
{ error: Throwable -> }
)
}
private fun getLocationSingleForEveryUser(userFriends: List<UserFriendDTO>): List<Single<List<Location>>> =
userFriends.map { serverRepository.locationEndpoint.getBucketedUserLocationsInLast24H(it.userFriendId) }
Run Code Online (Sandbox Code Playgroud)
val mutableList1: MutableList<TeamInvitationData?>?
val mutableList2: MutableList<TeamInvitationData?>?
Run Code Online (Sandbox Code Playgroud)
addAll方法可用于合并可为空的可变列表,但是,这会引发编译时错误。
例:
val map1 = listOne?.map { TeamInvitationData(it) }
val map2 = listTwo?.map { TeamInvitationData(it) }
map1.addAll(map2)
Run Code Online (Sandbox Code Playgroud)
类型接口失败,请尝试明确指定类型参数。
在这里,我可以通过任何方式合并这两个数组,谢谢。
我创建了一个函数,它返回Observable<String>带有文件名的函数,但是我在订阅中没有得到任何我调用此方法的事件.也没有onError或onComplete的调用
请参阅我的代码:
fun getAllFiles(): Observable<String> {
val allFiles = File("/Users/stephan/Projects/Playground/kotlinfiles/")
.listFiles { file -> !file.isDirectory() }
return observable { subscriber ->
allFiles.toObservable()
.map { f -> "${f.name}" }
.doOnNext { println("Found file $it") }
.subscribe { subscriber}
}
}
fun test() {
getAllFiles()
.doOnNext { println("File name$it") }
.subscribe(
{n -> println("File: $n")},
{e -> println("Damn: $e")},
{println("Completed")})
}
Run Code Online (Sandbox Code Playgroud)
虽然在getAllFiles()函数中调用了所有内容,但我还缺少什么?
我知道这违反了很多 Rx 规则,但我真的很喜欢RxJava-JDBC,我的队友也是如此。关系数据库是我们工作的核心,Rx 也是如此。
然而,在某些情况下,我们不想发出Observable<ResultSet>,而是只想使用基于拉取的 Java 8Stream<ResultSet>或 Kotlin Sequence<ResultSet>。但我们非常习惯 RxJava-JDBC 库,它只返回一个Observable<ResultSet>.
因此,我想知道是否有一种方法可以使用扩展函数将 an 转换Observable<ResultSet>为 a Sequence<ResultSet>,而不进行任何中间收集或toBlocking()调用。下面是我到目前为止所拥有的一切,但我现在正在尝试连接基于推和拉的系统,但我无法缓冲,因为ResultSet每次onNext()调用都是有状态的。这是一个不可能完成的任务吗?
import rx.Observable
import rx.Subscriber
import java.sql.ResultSet
fun Observable<ResultSet>.asSequence() = object: Iterator<ResultSet>, Subscriber<ResultSet>() {
private var isComplete = false
override fun onCompleted() {
isComplete = true
}
override fun onError(e: Throwable?) {
throw UnsupportedOperationException()
}
override fun onNext(rs: ResultSet?) {
throw UnsupportedOperationException()
}
override fun hasNext(): Boolean …Run Code Online (Sandbox Code Playgroud) rxjava2依赖和rxkotlin依赖之间有什么区别。如果我正在使用rxkotlin依赖项,我是否还需要添加rxjava2依赖项。
implementation 'io.reactivex.rxjava2:rxkotlin:x.y.z'
// do i need to add the below dependencies also?
implementation 'io.reactivex.rxjava2:rxjava:2.2.4'
implementation 'io.reactivex.rxjava2:rxandroid:2.1.0'
Run Code Online (Sandbox Code Playgroud) 如何使用运算符以便我始终获得先前值和当前值?如果可能的话,我想避免在管道外创建状态。
- time ->
1 2 3 4
| | | |
Operations
| | |
(1,2) (2,3) (3,4)
Run Code Online (Sandbox Code Playgroud)
请注意,除了第一个和最后一个值之外的每个值都必须出现两次,因此简单的缓冲区是行不通的。
我考虑过skip与mergeand结合buffer,但合并似乎并不能保证顺序。
val s = PublishSubject.create<Int>()
s.mergeWith(s.skip(1)).buffer(2).subscribe{i -> print(i)}
s.onNext(1)
s.onNext(2)
s.onNext(3)
s.onNext(4)
Run Code Online (Sandbox Code Playgroud)
输出:
[1, 2][2, 3][3, 4]
val o = Observable.just(1,2,3,4)
o.mergeWith(o.skip(1)).buffer(2).subscribe{i -> print(i)}
Run Code Online (Sandbox Code Playgroud)
输出:
[1, 2][3, 4][2, 3][4]
(唯一的4个很好,并且是预期的)
我正在尝试移植 RxJava 库并利用 Kotlin 中的扩展功能。
fun <T,R: MutableCollection<T>> Observable<T>.collectWhile(factory: (() -> R), condition: (R,T) -> Boolean) =
compose(Transformers.collectWhile(factory,condition))
Run Code Online (Sandbox Code Playgroud)
该Transformers.collectWhile()是用Java编写的,有此签名:
public static <T, R extends Collection<T>> Transformer<T, R> collectWhile(final Func0<R> factory,
final Action2<? super R, ? super T> collect)
Run Code Online (Sandbox Code Playgroud)
但是,我在映射collect参数时遇到了问题,而且我不擅长泛型。如何super用函数类型表达?
更新
我犯了一个愚蠢的错误。我不应该在深夜发帖。
我实际上是针对这个
public static <T, R extends Iterable<?>> Transformer<T, R> collectWhile(final Func0<R> factory,
final Action2<? super R, ? super T> collect, final Func2<? super R, ? super T, Boolean> condition)
Run Code Online (Sandbox Code Playgroud)
而这正是我应该做的。
fun <T,R: MutableCollection<T>> …Run Code Online (Sandbox Code Playgroud) 我试图更好地理解如何使用Rx-Kotlin进行单元测试,但是我无法成功地将主题设置为"已完成".因此,我总是在等待5秒的超时(onComplete应该是立即的),然后在assertComplete上失败.
我对awaitTerminalEvent的理解是它应该只阻塞,直到调用onComplete.我也研究过TestScheduler,但我不相信这里应该要求它.
任何可以引导我朝着正确方向前进的帮助或文档都将非常感激.
@Test
fun testObservable() {
val subject = BehaviorSubject.create<Int>()
subject.onNext(0)
TestSubscriber<Int>().apply {
subject.subscribe({
System.out.println(it)
subject.onNext(1)
subject.onComplete()
})
this.awaitTerminalEvent(5, TimeUnit.SECONDS)
this.assertComplete()
this.assertValue(1)
}
}
Run Code Online (Sandbox Code Playgroud) defer() 我正在研究 RxKotlin,出现了一个问题:和 之间有什么区别defer{}