在 Kotlin 协程中等待 Java 5 Futures 而不阻塞线程

Abh*_*oni 5 java concurrency kotlin google-cloud-firestore kotlin-coroutines

我有一个suspend函数,我想从中返回 Java 5 的结果Future。future 对象来自另一个库Firebase Cloud Firestore- Admin SDK for Java并提供阻塞调用get()来检索所述 future 的结果。

我的功能看起来像这样-

suspend fun getPrefix(messageCreateEvent: MessageCreateEvent): String {

    val snapshot = db.collection("prefixes")
        .document(messageCreateEvent.guildId.get().asString())
        .get() //This returns a future
        .get() //Retrieves the future's result (Blocks thread; IDE gives warning)

    //Return the prefix
    return if (snapshot.exists())
        snapshot.getString("prefix") ?: DEFAULT_PREFIX
    else DEFAULT_PREFIX
}
Run Code Online (Sandbox Code Playgroud)

我考虑过的解决方案

我考虑的第一件事是寻找kotlinx.coroutine扩展来连接未来。虽然存在扩展名,但它们仅适用于CompletionStatge. 所以我决定将未来包装成一个 ()-

val snapshot = CompleteableFuture.supplyAsync {
    db.collection("prefixes")
        .document(messageCreateEvent.guildId.get().asString())
        .get() // This returns a future
        .get() // Get the result
}.await()
Run Code Online (Sandbox Code Playgroud)

我非常缺乏经验,不确定这是否是正确的解决方案。我在编程社区上查询了我的问题,有人推荐我使用Deferred-

val deferred = CompletableDeferred<DocumentSnapshot>()
val future = db.collection("prefixes")
    .document(messageCreateEvent.guildId.get().asString())
    .get()

future.addListener(
    Runnable { deferred.complete(future.get()) },
    ForkJoinPool.commonPool()
)
            
val snapshot = deferred.await()
Run Code Online (Sandbox Code Playgroud)

我已经花了很长时间来寻找一种将期货与协同程序连接起来的方法,在 SO 上甚至没有类似的问题。通过,如果这个问题得到重复标记,我不会感到惊讶。

Cly*_*yde 3

这个问题的关键是suspendCoroutine功能。另一个不明显的一点是,要添加回调,ApiFuture您可以在 上使用静态方法ApiFutures

这是await()ApiFuture.

        /**
         * Function to convert an ApiFuture into a coroutine return.
         */
        suspend fun <F : Any?, R : Any?> ApiFuture<F>.await(
            successHandler: (F) -> R,
        ): R {
            return suspendCoroutine { cont ->
                ApiFutures.addCallback(this, object : ApiFutureCallback<F> {
                    override fun onFailure(t: Throwable?) {
                        cont.resumeWithException(t ?: IOException("Unknown error"))
                    }

                    override fun onSuccess(result: F) {
                        cont.resume(successHandler(result))
                    }
                }, Dispatchers.IO.asExecutor())
            }
        }

/**
 * inline function to retrieve a document as a POJO from a DocumentReference
*/

suspend inline fun <reified T: Any>DocumentReference.toObject(): T? {
            return get().await<DocumentSnapshot, T?> {
                it.toObject(T::class.java)
            }
        }
Run Code Online (Sandbox Code Playgroud)

现在你可以写这样的东西:

        /**
         * Function to convert an ApiFuture into a coroutine return.
         */
        suspend fun <F : Any?, R : Any?> ApiFuture<F>.await(
            successHandler: (F) -> R,
        ): R {
            return suspendCoroutine { cont ->
                ApiFutures.addCallback(this, object : ApiFutureCallback<F> {
                    override fun onFailure(t: Throwable?) {
                        cont.resumeWithException(t ?: IOException("Unknown error"))
                    }

                    override fun onSuccess(result: F) {
                        cont.resume(successHandler(result))
                    }
                }, Dispatchers.IO.asExecutor())
            }
        }

/**
 * inline function to retrieve a document as a POJO from a DocumentReference
*/

suspend inline fun <reified T: Any>DocumentReference.toObject(): T? {
            return get().await<DocumentSnapshot, T?> {
                it.toObject(T::class.java)
            }
        }
Run Code Online (Sandbox Code Playgroud)