tmn*_*tmn 4 stream sequence kotlin rx-java rx-kotlin
我知道这违反了很多 Rx 规则,但我真的很喜欢RxJava-JDBC,我的队友也是如此。关系数据库是我们工作的核心,Rx 也是如此。
然而,在某些情况下,我们不想发出Observable<ResultSet>,而是只想使用基于拉取的 Java 8Stream<ResultSet>或 Kotlin Sequence<ResultSet>。但我们非常习惯 RxJava-JDBC 库,它只返回一个Observable<ResultSet>.
因此,我想知道是否有一种方法可以使用扩展函数将 an 转换Observable<ResultSet>为 a Sequence<ResultSet>,而不进行任何中间收集或toBlocking()调用。下面是我到目前为止所拥有的一切,但我现在正在尝试连接基于推和拉的系统,但我无法缓冲,因为ResultSet每次onNext()调用都是有状态的。这是一个不可能完成的任务吗?
import rx.Observable
import rx.Subscriber
import java.sql.ResultSet
fun Observable<ResultSet>.asSequence() = object: Iterator<ResultSet>, Subscriber<ResultSet>() {
private var isComplete = false
override fun onCompleted() {
isComplete = true
}
override fun onError(e: Throwable?) {
throw UnsupportedOperationException()
}
override fun onNext(rs: ResultSet?) {
throw UnsupportedOperationException()
}
override fun hasNext(): Boolean {
throw UnsupportedOperationException()
}
override fun next(): ResultSet {
throw UnsupportedOperationException()
}
}.asSequence()
Run Code Online (Sandbox Code Playgroud)
我不确定这是实现你想要的最简单的方法,但你可以尝试这个代码。它通过创建一个阻塞队列并将所有事件从 发布到此队列来将 an 转换Observable为 an 。从队列中提取事件,如果没有则阻塞。然后它根据接收到的当前事件修改自己的状态。IteratorObservableIterable
class ObservableIterator<T>(
observable: Observable<T>,
scheduler: Scheduler
) : Iterator<T>, Closeable {
private val queue = LinkedBlockingQueue<Notification<T>>()
private var cached: Notification<T>? = null
private var completed: Boolean = false
private val subscription =
observable
.materialize()
.subscribeOn(scheduler)
.subscribe({ queue.put(it) })
override fun hasNext(): Boolean {
cacheNext()
return !completed
}
override fun next(): T {
cacheNext()
val notification = cached ?: throw NoSuchElementException()
check(notification.isOnNext)
cached = null
return notification.value
}
private fun cacheNext() {
if (completed) {
return
}
if (cached == null) {
queue.take().let { notification ->
if (notification.isOnError) {
completed = true
throw RuntimeException(notification.throwable)
} else if (notification.isOnCompleted) {
completed = true
} else {
cached = notification
}
}
}
}
override fun close() {
subscription.unsubscribe()
completed = true
cached = null
}
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
3449 次 |
| 最近记录: |