标签: rx-kotlin

使用Kotlin高阶函数和单方法接口的行为?

我之前使用RxJava和Kotlin遇到了一些问题.我做了一些有趣的发现,我仍然感到困惑.

Func1RxJava中有简单的界面

public interface Func1<T, R> extends Function {
    R call(T t);
}
Run Code Online (Sandbox Code Playgroud)

我试图添加一个扩展方法Observable,也是一个RxJava类.这会将排放物收集到Google Guava中,ImmutableListMulitmap使用a Func1来映射每个项目的关键字.

fun <K,T> Observable<T>.toImmutableListMultimap(keyMapper: Func1<T, K>): Observable<ImmutableListMultimap<K,T>> {
    return this.collect({ ImmutableListMultimap.builder<K,T>()},{ b, t -> b.put(keyMapper.call(t), t)}).map { it.build() }
}
Run Code Online (Sandbox Code Playgroud)

当我试图调用这个扩展方法时,我无法进行编译,而且根本不理解lambda表达式.

ScheduledItem.all.flatMap { it.rebuildSoftTransactions }
.toImmutableListMultimap { it.id /*compile error */ } .cache()
Run Code Online (Sandbox Code Playgroud)

但是,当我修改扩展方法以使用函数类型时,最奇怪的事情发生了.

fun <K,T> Observable<T>.toImmutableListMultimap(keyMapper: (T) -> K): Observable<ImmutableListMultimap<K,T>> {
    return this.collect({ ImmutableListMultimap.builder<K,T>()},{ b, t -> b.put(keyMapper(t), t)}).map { it.build() }
}
Run Code Online (Sandbox Code Playgroud)

然后一切编译得很好.但这让我感到困惑:为什么它没有将lambda推断到界面上?当我使用标准map()方法时, …

lambda kotlin rx-java rx-kotlin

5
推荐指数
1
解决办法
710
查看次数

RxJava 2.0和Kotlin Single.zip()以及单曲列表

我有我无法解决的问题。我试图使用Kotlin将.zip(List,)多个Singles合并为一个,而我提供的Function都不适合作为第二个参数。

    fun getUserFriendsLocationsInBuckets(token: String) {
    roomDatabase.userFriendsDao().getUserFriendsDtosForToken(token).subscribe(
            { userFriends: List<UserFriendDTO> ->
                Single.zip(getLocationSingleForEveryUser(userFriends),
                        Function<Array<List<Location>>, List<Location>> { t: Array<List<Location>> -> listOf<Location>() })
            },
            { error: Throwable -> }
    )
}

private fun getLocationSingleForEveryUser(userFriends: List<UserFriendDTO>): List<Single<List<Location>>> =
        userFriends.map { serverRepository.locationEndpoint.getBucketedUserLocationsInLast24H(it.userFriendId) }
Run Code Online (Sandbox Code Playgroud)

Android Studio错误

kotlin rx-java rx-kotlin

5
推荐指数
1
解决办法
2505
查看次数

Kotlin合并两个可为空的可变列表

val mutableList1: MutableList<TeamInvitationData?>?
val mutableList2: MutableList<TeamInvitationData?>?
Run Code Online (Sandbox Code Playgroud)

addAll方法可用于合并可为空的可变列表,但是,这会引发编译时错误。

例:

val map1 = listOne?.map { TeamInvitationData(it) }
val map2 = listTwo?.map { TeamInvitationData(it) }
map1.addAll(map2)
Run Code Online (Sandbox Code Playgroud)

类型接口失败,请尝试明确指定类型参数。

在这里,我可以通过任何方式合并这两个数组,谢谢。

android kotlin rx-kotlin

5
推荐指数
2
解决办法
5253
查看次数

rx kotlin订阅不起作用,没有收到项目

我创建了一个函数,它返回Observable<String>带有文件名的函数,但是我在订阅中没有得到任何我调用此方法的事件.也没有onError或onComplete的调用
请参阅我的代码:

fun getAllFiles(): Observable<String> {

    val allFiles = File("/Users/stephan/Projects/Playground/kotlinfiles/")
            .listFiles { file -> !file.isDirectory() }
    return observable { subscriber ->
        allFiles.toObservable()
                .map { f -> "${f.name}" }
                .doOnNext { println("Found file $it") }
                .subscribe { subscriber}
    }
}

fun test() {
    getAllFiles()
            .doOnNext { println("File name$it") }
            .subscribe(
                    {n -> println("File: $n")},
                    {e -> println("Damn: $e")},
                    {println("Completed")})
}
Run Code Online (Sandbox Code Playgroud)

虽然在getAllFiles()函数中调用了所有内容,但我还缺少什么?

kotlin rx-kotlin

4
推荐指数
1
解决办法
2713
查看次数

RxJava-将 Observable 转换为迭代器、流或序列

我知道这违反了很多 Rx 规则,但我真的很喜欢RxJava-JDBC,我的队友也是如此。关系数据库是我们工作的核心,Rx 也是如此。

然而,在某些情况下,我们不想发出Observable<ResultSet>,而是只想使用基于拉取的 Java 8Stream<ResultSet>或 Kotlin Sequence<ResultSet>。但我们非常习惯 RxJava-JDBC 库,它只返回一个Observable<ResultSet>.

因此,我想知道是否有一种方法可以使用扩展函数将 an 转换Observable<ResultSet>为 a Sequence<ResultSet>,而不进行任何中间收集或toBlocking()调用。下面是我到目前为止所拥有的一切,但我现在正在尝试连接基于推和拉的系统,但我无法缓冲,因为ResultSet每次onNext()调用都是有状态的。这是一个不可能完成的任务吗?

import rx.Observable
import rx.Subscriber
import java.sql.ResultSet

fun Observable<ResultSet>.asSequence() = object: Iterator<ResultSet>, Subscriber<ResultSet>() {

    private var isComplete = false

    override fun onCompleted() {
        isComplete = true
    }

    override fun onError(e: Throwable?) {
        throw UnsupportedOperationException()
    }

    override fun onNext(rs: ResultSet?) {
        throw UnsupportedOperationException()
    }


    override fun hasNext(): Boolean …
Run Code Online (Sandbox Code Playgroud)

stream sequence kotlin rx-java rx-kotlin

4
推荐指数
1
解决办法
3449
查看次数

rxjava2和rxkotlin有什么区别?

rxjava2依赖和rxkotlin依赖之间有什么区别。如果我正在使用rxkotlin依赖项,我是否还需要添加rxjava2依赖项。

implementation 'io.reactivex.rxjava2:rxkotlin:x.y.z'
// do i need to add the below dependencies also?
implementation 'io.reactivex.rxjava2:rxjava:2.2.4'
implementation 'io.reactivex.rxjava2:rxandroid:2.1.0'
Run Code Online (Sandbox Code Playgroud)

android kotlin rx-kotlin rx-java2

4
推荐指数
2
解决办法
438
查看次数

Rxjava - 如何获取当前项和上一项?

如何使用运算符以便我始终获得先前值和当前值?如果可能的话,我想避免在管道外创建状态。

- time ->
1      2      3      4
|      |      |      |
Operations
       |      |      |
       (1,2)  (2,3)  (3,4)

Run Code Online (Sandbox Code Playgroud)

请注意,除了第一个和最后一个值之外的每个值都必须出现两次,因此简单的缓冲区是行不通的。

我考虑过skipmergeand结合buffer,但合并似乎并不能保证顺序。

val s = PublishSubject.create<Int>()
s.mergeWith(s.skip(1)).buffer(2).subscribe{i -> print(i)}
s.onNext(1)
s.onNext(2)
s.onNext(3)
s.onNext(4)


Run Code Online (Sandbox Code Playgroud)

输出:
[1, 2][2, 3][3, 4]

val o = Observable.just(1,2,3,4)
o.mergeWith(o.skip(1)).buffer(2).subscribe{i -> print(i)}
Run Code Online (Sandbox Code Playgroud)

输出:
[1, 2][3, 4][2, 3][4]

(唯一的4个很好,并且是预期的)

kotlin rx-java rx-kotlin

4
推荐指数
1
解决办法
1524
查看次数

在 Kotlin 函数类型中表达“超级”泛型?

我正在尝试移植 RxJava 库并利用 Kotlin 中的扩展功能。

fun <T,R: MutableCollection<T>> Observable<T>.collectWhile(factory: (() -> R), condition: (R,T) -> Boolean) =
        compose(Transformers.collectWhile(factory,condition))
Run Code Online (Sandbox Code Playgroud)

Transformers.collectWhile()是用Java编写的,有此签名:

public static <T, R extends Collection<T>> Transformer<T, R> collectWhile(final Func0<R> factory,
            final Action2<? super R, ? super T> collect)
Run Code Online (Sandbox Code Playgroud)

但是,我在映射collect参数时遇到了问题,而且我不擅长泛型。如何super用函数类型表达?

更新

我犯了一个愚蠢的错误。我不应该在深夜发帖。

我实际上是针对这个

public static <T, R extends Iterable<?>> Transformer<T, R> collectWhile(final Func0<R> factory,
        final Action2<? super R, ? super T> collect, final Func2<? super R, ? super T, Boolean> condition)
Run Code Online (Sandbox Code Playgroud)

而这正是我应该做的。

fun <T,R: MutableCollection<T>> …
Run Code Online (Sandbox Code Playgroud)

java generics kotlin rx-java rx-kotlin

3
推荐指数
1
解决办法
867
查看次数

Rx-Kotlin awaitTerminalEvent从未获得onComplete

我试图更好地理解如何使用Rx-Kotlin进行单元测试,但是我无法成功地将主题设置为"已完成".因此,我总是在等待5秒的超时(onComplete应该是立即的),然后在assertComplete上失败.

我对awaitTerminalEvent的理解是它应该只阻塞,直到调用onComplete.我也研究过TestScheduler,但我不相信这里应该要求它.

任何可以引导我朝着正确方向前进的帮助或文档都将非常感激.

@Test
fun testObservable() {
    val subject = BehaviorSubject.create<Int>()
    subject.onNext(0)

    TestSubscriber<Int>().apply {
        subject.subscribe({
            System.out.println(it)
            subject.onNext(1)
            subject.onComplete()
        })

        this.awaitTerminalEvent(5, TimeUnit.SECONDS)
        this.assertComplete()
        this.assertValue(1)
    }
}
Run Code Online (Sandbox Code Playgroud)

testing unit-testing kotlin rx-kotlin rx-java2

3
推荐指数
1
解决办法
642
查看次数

defer() 和 defer{} 有什么区别

defer() 我正在研究 RxKotlin,出现了一个问题:和 之间有什么区别defer{}

kotlin rx-java rx-kotlin

3
推荐指数
1
解决办法
319
查看次数