使用 Kotlin 中不同线程的结果更新列表

Saa*_*shi 4 multithreading list thread-safety kotlin mutablelist

我想用不同线程的结果更新列表。

mainFunction(): List<A> {
  var x: List<A> = listOf<A>()
  val job = ArrayList<Job>()
  val ans = mainScope.async {
            var i = 0
            for (j in (0..5)) {
                job.add(
                    launch {
                        val res = async {
                            func1()
                        }
                        x += res.await()
                    }
                )
            }
         job.joinAll()
        }
  ans.await()
  return x
}
fun func1(): List<A> {
 //Perform some operation to get list abc
 var abc: List<A> = listOf<A>()
 delay(1000)
 return abc
}
Run Code Online (Sandbox Code Playgroud)

列表“x”未正确更新。有时,它会附加“res”..有时则不会。有没有线程安全的方法来修改这样的列表?

新的实施:

mainFunction(): List<A> {
 var x: List<A> = listOf<A>()
 val ans = mainScope.async {
   List(6) {
     async{
      func1()
     }
   }.awaitAll()
 }
 print(ans)
 for (item in ans) {
  x+= item as List<A> // item is of type Kotlin.Unit
 }
}
Run Code Online (Sandbox Code Playgroud)

Jof*_*rey 6

简短回答

这是您正在做的事情的一个更简单的版本,它避免了您可能遇到的同步问题:

suspend fun mainFunction(): List<A> {
    return coroutineScope {
        List(6) { async { func1() } }.awaitAll()
    }
}
Run Code Online (Sandbox Code Playgroud)

如果你想解开这个问题,你可以阅读长答案。我将解释原始代码中并非真正惯用且可以替换的不同内容。

长答案

问题中的代码中有多个非惯用的东西,所以我将尝试解决每个问题。

从 0 开始的范围的索引 for 循环

如果您只想多次重复某个操作,则使用Repeat(6)而不是 更简单for (j in 0..5)。它更容易阅读,特别是当您不需要索引变量时:

suspend fun mainFunction(): List<A> {
    var x: List<A> = listOf<A>()
    val job = ArrayList<Job>()
    val ans = mainScope.async {
        repeat(6) {
            job.add(
                launch {
                    val res = async {
                        func1()
                    }
                    x += res.await()
                }
            )
        }
        job.joinAll()
    }
    ans.await()
    return x
}
Run Code Online (Sandbox Code Playgroud)

使用循环创建列表

如果您想要在该循环之外创建一个列表,您也可以使用(或)List(size) { computeElement() }来代替,它利用了List 工厂函数repeatfor

suspend fun mainFunction(): List<A> {
    var x: List<A> = listOf<A>()
    val ans = mainScope.async {
        val jobs = List(6) {
            launch {
                val res = async {
                    func1()
                }
                x += res.await()
            }
        }
        jobs.joinAll()
    }
    ans.await()
    return x
}
Run Code Online (Sandbox Code Playgroud)

额外的异步

无需在此处使用额外的异步包装您的启动,您可以直接在launches 上使用您的范围:

suspend fun mainFunction(): List<A> {
    var x: List<A> = listOf<A>()
    val jobs = List(6) {
        mainScope.launch {
            val res = async {
                func1()
            }
            x += res.await()
        }
    }
    jobs.joinAll()
    return x
}
Run Code Online (Sandbox Code Playgroud)

异步+立即等待

使用async { someFun() }然后立即await-ing 这个Deferred结果相当于直接调用someFun()(除非您使用不同的范围或上下文,而您在这里没有为最内部的逻辑执行此操作)。

所以你可以替换最里面的部分:

val res = async {
    func1()
}
x += res.await()
Run Code Online (Sandbox Code Playgroud)

仅通过x += func1(),给出:

suspend fun mainFunction(): List<A> {
    var x: List<A> = listOf<A>()
    val jobs = List(6) {
        mainScope.launch {
            x += func1()
        }
    }
    jobs.joinAll()
    return x
}
Run Code Online (Sandbox Code Playgroud)

启动与异步

如果您想要结果,通常使用async而不是更实用launch。当您使用 时launch,您必须手动将结果存储在某个地方(这会让您遇到像现在这样的同步问题)。使用async,您可以获得一个Deferred<T>await(),然后您可以得到一个列表,当您等待它们全部Deferred时,就不会有同步问题。

因此,前面代码的总体思路是不好的做法,并且可能会咬你,因为它需要手动同步。您可以通过以下方式替换它:

suspend fun mainFunction(): List<A> {
    val deferredValues = List(6) {
        mainScope.async {
            func1()
        }
    }
    val x = deferredValues.awaitAll()
    return x
}
Run Code Online (Sandbox Code Playgroud)

或者更简单:

suspend fun mainFunction(): List<A> {
    return List(6) {
        mainScope.async {
            func1()
        }
    }.awaitAll()
}
Run Code Online (Sandbox Code Playgroud)

手动连接与协程作用域

这通常是手动作业的气味join()。如果您想等待某些协程完成,更惯用的做法是在一个coroutineScope { ... }内启动所有这些协程,该块将挂起直到所有子协程完成。

在这里,我们已经将所有launchwe替换join()为调用asyncwe await,因此这不再适用,因为我们仍然需要await()延迟值才能获得结果。但是,由于我们已经处于挂起函数中,coroutineScope因此我们仍然可以使用外部作用域来代替外部作用域mainScope,以确保我们不会泄漏任何协程:

suspend fun mainFunction(): List<A> {
    return coroutineScope {
        List(6) { async { func1() } }.awaitAll()
    }
}
Run Code Online (Sandbox Code Playgroud)