通过Kotlin Coroutines同步S3文件上传

lin*_*nqu 3 upload amazon-s3 kotlin kotlin-coroutines

我需要将许多文件上传到S3,按顺序完成该作业需要几个小时.这正是Kotlin的新协程所擅长的,所以我想先给他们一次尝试,而不是再用一些基于线程的执行服务来摆弄.

这是我的(简化)代码:

fun upload(superTiles: Map<Int, Map<Int, SuperTile>>) = runBlocking {
    val s3 = AmazonS3ClientBuilder.standard().withRegion("eu-west-1").build()
    for ((x, ys) in superTiles) {
        val jobs = mutableListOf<Deferred<Any>>()
        for ((y, superTile) in ys) {
            val job = async(CommonPool) {
                uploadTile(s3, x, y, superTile)
            }
            jobs.add(job)
        }
        jobs.map { it.await() }
    }
}

suspend fun uploadTile(s3: AmazonS3, x: Int, y: Int, superTile: SuperTile) {
    val json: String = "{}"
    val key = "$s3Prefix/x4/$z/$x/$y.json"
    s3.putObject(PutObjectRequest("my_bucket", ByteArrayInputStream(json.toByteArray()), metadata))
}
Run Code Online (Sandbox Code Playgroud)

问题是:代码仍然非常慢,并且日志记录显示请求仍然按顺序执行:作业在创建下一个作业之前完成.只有极少数情况下(十分之一)我才能看到同时运行的工作.

为什么代码运行速度不快/同时?我能做些什么呢?

Rom*_*rov 9

当您使用异步 API时,Kotlin协程表现出色,而AmazonS3.putObject您使用的API是一个老派阻塞的同步API,因此您只能获得与CommonPool您正在使用的线程数一样多的并发上载.uploadTile使用suspend修改标记您的函数没有任何价值,因为它在其正文中不使用任何挂起函数.

在上传任务中获得更多吞吐量的第一步是开始使用异步API.我建议看一下Amazon S3 TransferManager的钱包.看看是否能首先解决您的问题.

Kotlin协程旨在帮助您将异步API组合成易于使用的逻辑工作流.例如,TransferManager通过编写以下扩展函数,可以直接调整与协同程序一起使用的异步API :

suspend fun Upload.await(): UploadResult = suspendCancellableCoroutine { cont ->
    addProgressListener {
        if (isDone) {
            // we know it should not actually wait when done
            try { cont.resume(waitForUploadResult()) }
            catch (e: Throwable) { cont.resumeWithException(e) }
        }
    }
    cont.invokeOnCompletion { abort() }
}
Run Code Online (Sandbox Code Playgroud)

这个扩展使您能够编写非常流畅的代码,TransferManager您可以重写您的uploadTile函数,TransferManager而不是使用阻塞AmazonS3接口:

suspend fun uploadTile(tm: TransferManager, x: Int, y: Int, superTile: SuperTile) {
    val json: String = "{}"
    val key = "$s3Prefix/x4/$z/$x/$y.json"
    tm.upload(PutObjectRequest("my_bucket", ByteArrayInputStream(json.toByteArray()), metadata))
        .await()
}
Run Code Online (Sandbox Code Playgroud)

请注意,此新版本如何uploadTile使用await上面定义的挂起函数.