如何限制kotlin协同程序的最大并发性

Ben*_*n H 8 parallel-processing multithreading kotlin kotlinx.coroutines

我有一个Sequence(来自File.walkTopDown),我需要在每个上运行一个长时间运行的操作.我想使用Kotlin最佳实践/协同程序,但我要么没有并行性,要么太多并行性并且遇到"太多打开文件"IO错误.

File("/Users/me/Pictures/").walkTopDown()
    .onFail { file, ex -> println("ERROR: $file caused $ex") }
    .filter { ... only big images... }
    .map { file ->
        async { // I *think* I want async and not "launch"...
            ImageProcessor.fromFile(file)
        }
    }
Run Code Online (Sandbox Code Playgroud)

这似乎没有并行运行,我的多核CPU永远不会超过1 CPU的价值.是否有协同程序运行"NumberOfCores并行操作"值得延期的工作?

使用Kotlin协同程序查看多线程,它首先创建所有作业,然后加入它们,但这意味着在重处理连接步骤之前完成序列/文件树步骤,这似乎......如果!将其拆分为收集和处理步骤意味着收集可以在处理之前运行.

val jobs = ... the Sequence above...
    .toSet()
println("Found ${jobs.size}")
jobs.forEach { it.await() }
Run Code Online (Sandbox Code Playgroud)

Pat*_*Pat 9

这不是针对您的问题,但它确实回答了“如何限制 kotlin 协程的最大并发性”的问题。

我一开始想使用newFixedThreadPoolContext,但 1)它已被弃用2) 它会使用线程,我认为这不是必需的或可取的(与 相同Executors.newFixedThreadPool().asCoroutineDispatcher())。通过使用Semaphore,此解决方案可能存在我不知道的缺陷,但它非常简单:

import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit

/**
 * Maps the inputs using [transform] at most [maxConcurrency] at a time until all Jobs are done.
 */
suspend fun <TInput, TOutput> Iterable<TInput>.mapConcurrently(
    maxConcurrency: Int,
    transform: suspend (TInput) -> TOutput,
) = coroutineScope {
    val gate = Semaphore(maxConcurrency)
    this@mapConcurrently.map {
        async {
            gate.withPermit {
                transform(it)
            }
        }
    }.awaitAll()
}
Run Code Online (Sandbox Code Playgroud)

测试(抱歉,它使用 Spek、hamcrest 和 kotlin 测试):

import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.test.TestCoroutineDispatcher
import org.hamcrest.MatcherAssert.assertThat
import org.hamcrest.Matchers.greaterThanOrEqualTo
import org.hamcrest.Matchers.lessThanOrEqualTo
import org.spekframework.spek2.Spek
import org.spekframework.spek2.style.specification.describe
import java.util.concurrent.atomic.AtomicInteger
import kotlin.test.assertEquals

@OptIn(ExperimentalCoroutinesApi::class)
object AsyncHelpersKtTest : Spek({
    val actionDelay: Long = 1_000 // arbitrary; obvious if non-test dispatcher is used on accident
    val testDispatcher = TestCoroutineDispatcher()

    afterEachTest {
        // Clean up the TestCoroutineDispatcher to make sure no other work is running.
        testDispatcher.cleanupTestCoroutines()
    }

    describe("mapConcurrently") {
        it("should run all inputs concurrently if maxConcurrency >= size") {
            val concurrentJobCounter = AtomicInteger(0)
            val inputs = IntRange(1, 2).toList()
            val maxConcurrency = inputs.size

            // https://github.com/Kotlin/kotlinx.coroutines/issues/1266 has useful info & examples
            runBlocking(testDispatcher) {
                print("start runBlocking $coroutineContext\n")

                // We have to run this async so that the code afterwards can advance the virtual clock
                val job = launch {
                    testDispatcher.pauseDispatcher {
                        val result = inputs.mapConcurrently(maxConcurrency) {
                            print("action $it $coroutineContext\n")

                            // Sanity check that we never run more in parallel than max
                            assertThat(concurrentJobCounter.addAndGet(1), lessThanOrEqualTo(maxConcurrency))

                            // Allow for virtual clock adjustment
                            delay(actionDelay)

                            // Sanity check that we never run more in parallel than max
                            assertThat(concurrentJobCounter.getAndAdd(-1), lessThanOrEqualTo(maxConcurrency))
                            print("action $it after delay $coroutineContext\n")

                            it
                        }

                        // Order is not guaranteed, thus a Set
                        assertEquals(inputs.toSet(), result.toSet())
                        print("end mapConcurrently $coroutineContext\n")
                    }
                }
                print("before advanceTime $coroutineContext\n")

                // Start the coroutines
                testDispatcher.advanceTimeBy(0)
                assertEquals(inputs.size, concurrentJobCounter.get(), "All jobs should have been started")

                testDispatcher.advanceTimeBy(actionDelay)
                print("after advanceTime $coroutineContext\n")
                assertEquals(0, concurrentJobCounter.get(), "All jobs should have finished")
                job.join()
            }
        }

        it("should run one at a time if maxConcurrency = 1") {
            val concurrentJobCounter = AtomicInteger(0)
            val inputs = IntRange(1, 2).toList()
            val maxConcurrency = 1

            runBlocking(testDispatcher) {
                val job = launch {
                    testDispatcher.pauseDispatcher {
                        inputs.mapConcurrently(maxConcurrency) {
                            assertThat(concurrentJobCounter.addAndGet(1), lessThanOrEqualTo(maxConcurrency))
                            delay(actionDelay)
                            assertThat(concurrentJobCounter.getAndAdd(-1), lessThanOrEqualTo(maxConcurrency))
                            it
                        }
                    }
                }

                testDispatcher.advanceTimeBy(0)
                assertEquals(1, concurrentJobCounter.get(), "Only one job should have started")

                val elapsedTime = testDispatcher.advanceUntilIdle()
                print("elapsedTime=$elapsedTime")
                assertThat(
                    "Virtual time should be at least as long as if all jobs ran sequentially",
                    elapsedTime,
                    greaterThanOrEqualTo(actionDelay * inputs.size)
                )
                job.join()
            }
        }

        it("should handle cancellation") {
            val jobCounter = AtomicInteger(0)
            val inputs = IntRange(1, 2).toList()
            val maxConcurrency = 1

            runBlocking(testDispatcher) {
                val job = launch {
                    testDispatcher.pauseDispatcher {
                        inputs.mapConcurrently(maxConcurrency) {
                            jobCounter.addAndGet(1)
                            delay(actionDelay)
                            it
                        }
                    }
                }

                testDispatcher.advanceTimeBy(0)
                assertEquals(1, jobCounter.get(), "Only one job should have started")

                job.cancel()
                testDispatcher.advanceUntilIdle()
                assertEquals(1, jobCounter.get(), "Only one job should have run")
                job.join()
            }
        }
    }
})
Run Code Online (Sandbox Code Playgroud)

根据https://play.kotlinlang.org/hands-on/Introduction%20to%20Coroutines%20and%20Channels/09_Testing,您可能还需要调整编译器参数以运行测试:

compileTestKotlin {
    kotlinOptions {
        // Needed for runBlocking test coroutine dispatcher?
        freeCompilerArgs += "-Xuse-experimental=kotlin.Experimental"
        freeCompilerArgs += "-Xopt-in=kotlin.RequiresOptIn"
    }
}
testImplementation 'org.jetbrains.kotlinx:kotlinx-coroutines-test:1.4.1'
Run Code Online (Sandbox Code Playgroud)

  • 请注意,挂起的作业不计入“limitedParallelism”,因此,如果您使用非阻塞(挂起)API,则“limitedParallelism”将不会限制操作的并发性。该函数的关键词是并行性,它不限制并发性。“limitedParallelism”最有可能与“Dispatchers.IO”一起使用,后者是为阻塞 IO 任务而设计的。 (3认同)

vod*_*dan 7

你的第一个片段的问题是它根本不运行 - 记住,Sequence是懒惰的,你必须使用终端操作,如toSet()forEach().此外,您需要通过构造newFixedThreadPoolContext上下文并在async以下位置使用它来限制可用于该任务的线程数:

val pictureContext = newFixedThreadPoolContext(nThreads = 10, name = "reading pictures in parallel")

File("/Users/me/Pictures/").walkTopDown()
    .onFail { file, ex -> println("ERROR: $file caused $ex") }
    .filter { ... only big images... }
    .map { file ->
        async(pictureContext) {
            ImageProcessor.fromFile(file)
        }
    }
    .toList()
    .forEach { it.await() }
Run Code Online (Sandbox Code Playgroud)

编辑:你必须使用一个终端运营商(toList)befor等待结果

  • 值得注意的是,使用有限的线程池会限制并行性但不会限制并发性,这意味着如果`ImageProcessor.fromFile` 是一个挂起函数(不会阻塞),您仍然可以处理多个文件,这可能不是您想要的。 (5认同)

Ben*_*n H 5

我让它与频道一起工作。但也许我对你的方式是多余的?

val pipe = ArrayChannel<Deferred<ImageFile>>(20)
launch {
    while (!(pipe.isEmpty && pipe.isClosedForSend)) {
        imageFiles.add(pipe.receive().await())
    }
    println("pipe closed")
}
File("/Users/me/").walkTopDown()
        .onFail { file, ex -> println("ERROR: $file caused $ex") }
        .forEach { pipe.send(async { ImageFile.fromFile(it) }) }
pipe.close()
Run Code Online (Sandbox Code Playgroud)


Ser*_*gey 5

为了将并行性限制为某个值,有limitedParallelism从库1.6.0版本开始的函数kotlinx.coroutines。可以在对象上调用它CoroutineDispatcher。因此,为了限制并行执行的线程,我们可以编写如下内容:

val parallelismLimit = Runtime.getRuntime().availableProcessors()
val limitedDispatcher = Dispatchers.Default.limitedParallelism(parallelismLimit)
val scope = CoroutineScope(limitedDispatcher) // we can set limitedDispatcher for the whole scope

scope.launch { // or we can set limitedDispatcher for a coroutine launch(limitedDispatcher)
    File("/Users/me/Pictures/").walkTopDown()
        .onFail { file, ex -> println("ERROR: $file caused $ex") }
        .filter { ... only big images... }
        .map { file ->
            async {
                ImageProcessor.fromFile(file)
            }
        }.toList().awaitAll()
}
Run Code Online (Sandbox Code Playgroud)

ImageProcessor.fromFile(file)parallelismLimit将使用多个线程并行执行。