TaskGroup 限制大量任务的内存使用量

ven*_*ore 2 async-await swift structured-concurrency asyncstream asyncsequence

我正在尝试使用现代 Swift Concurrency 构建分块文件上传机制。有一个流式文件读取器,我用它来逐块读取 1mb 大小的文件。它有两个闭包nextChunk: (DataChunk) -> Voidcompletion: () - VoidInputStream第一个被调用的次数与从块大小读取的数据一样多。

为了使该阅读器兼容 Swift Concurrency,我进行了扩展并创建了AsyncStream 似乎最适合这种情况的扩展。

public extension StreamedFileReader {
    func read() -> AsyncStream<DataChunk> {
        AsyncStream { continuation in
            self.read(nextChunk: { chunk in
                continuation.yield(chunk)
            }, completion: {
                continuation.finish()
            })
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

使用它,AsyncStream我迭代地读取一些文件并进行如下网络调用:

func process(_ url: URL) async {
    // ...
    do {
        for await chunk in reader.read() {
            let request = // ...
            _ = try await service.upload(data: chunk.data, request: request)
        }
    } catch let error {
        reader.cancelReading()
        print(error)
    }
}
Run Code Online (Sandbox Code Playgroud)

问题是,据我所知,没有任何限制机制不允许执行超过 N 个网络调用。因此,当我尝试上传大文件(5Gb)时,内存消耗会急剧增加。因此,流式读取文件的想法没有任何意义,因为将整个文件读入内存会更容易(这是一个笑话,但看起来像那样)。

相比之下,如果我使用的是一个好的旧 GCD,一切都会像魅力一样工作:

func process(_ url: URL) {
    let semaphore = DispatchSemaphore(value: 5) // limit to no more than 5 requests at a given time
    let uploadGroup = DispatchGroup()
    let uploadQueue = DispatchQueue.global(qos: .userInitiated)
    uploadQueue.async(group: uploadGroup) {
        // ...
        reader.read(nextChunk: { chunk in
            let requset = // ...
            uploadGroup.enter()
            semaphore.wait()
            service.upload(chunk: chunk, request: requset) {
                uploadGroup.leave()
                semaphore.signal()
            }
        }, completion: { _ in
            print("read completed")
        })
    }    
}
Run Code Online (Sandbox Code Playgroud)

DispatchQueue嗯,这与顺序运行时使用并发的行为并不完全相同AsyncStream。所以我做了一些研究,发现这可能TaskGroup就是我在这种情况下需要的。它允许并行运行异步任务等。

我这样尝试过:

func process(_ url: URL) async {
    // ...
    do {
        let totalParts = try await withThrowingTaskGroup(of: Void.self) { [service] group -> Int in
            var counter = 1
            for await chunk in reader.read() {
                let request = // ...
                group.addTask {
                    _ = try await service.upload(data: chunk.data, request: request)
                }
                counter = chunk.index
            }
            return counter
        }
    } catch let error {
        reader.cancelReading()
        print(error)
    }
}
Run Code Online (Sandbox Code Playgroud)

在这种情况下,内存消耗甚至比迭代示例中的内存消耗还要多AsyncStream

我怀疑应该有一些条件,我需要暂停组或任务或其他东西,group.addTask只有当有可能真正处理我要添加的这些任务时才调用,但我不知道该怎么做。

我找到了这个Q/A 并尝试try await group.next()为每个第五块添加,但它根本没有帮助我。

有没有类似于DispatchGroup+DispatchSemaphore但针对现代并发的机制?

更新: 为了更好地展示所有 3 种方式之间的差异,这里是内存报告的屏幕截图

AsyncStream迭代

AsyncStream迭代

AsyncStream + TaskGroup(try await group.next()在每个第 5 个块上使用)

异步流 + 任务组

GCD DispatchQueue + DispatchGroup + DispatchSemaphore

GCD DispatchQueue + DispatchGroup + DispatchSemaphore

Rob*_*Rob 6

关键问题是使用AsyncStream. 你的AsyncStream读取数据并生成块的速度比上传速度要快。

\n

考虑这个 MCVE,其中我模拟了 100 个块的流,每个块 1mb:

\n
import os.log\n\nprivate let log = OSLog(subsystem: "Test", category: .pointsOfInterest)\n\nstruct Chunk {\n    let index: Int\n    let data: Data\n}\n\nactor FileMock {\n    let maxChunks = 100\n    let chunkSize = 1_000_000\n    var index = 0\n\n    func nextChunk() -> Chunk? {\n        guard index < maxChunks else { print("done"); return nil }\n        defer { index += 1 }\n        return Chunk(index: index, data: Data(repeating: UInt8(index & 0xff), count: chunkSize))\n    }\n\n    func chunks() -> AsyncStream<Chunk> {\n        AsyncStream { continuation in\n            index = 0\n            while let chunk = nextChunk() {\n                os_signpost(.event, log: log, name: "chunk")\n                continuation.yield(chunk)\n            }\n\n            continuation.finish()\n        }\n    }\n}\n
Run Code Online (Sandbox Code Playgroud)\n

\n
func uploadAll() async throws {\n    try await withThrowingTaskGroup(of: Void.self) { group in\n        let chunks = await FileMock().chunks()\n        var index = 0\n        for await chunk in chunks {\n            index += 1\n            if index > 5 {\n                try await group.next()\n            }\n            group.addTask { [self] in\n                try await upload(chunk)\n            }\n        }\n        try await group.waitForAll()\n    }\n}\n\nfunc upload(_ chunk: Chunk) async throws {\n    let id = OSSignpostID(log: log)\n    os_signpost(.begin, log: log, name: #function, signpostID: id, "%d start", chunk.index)\n    try await Task.sleep(nanoseconds: 1 * NSEC_PER_SEC)\n    os_signpost(.end, log: log, name: #function, signpostID: id, "end")\n}\n
Run Code Online (Sandbox Code Playgroud)\n

当我这样做时,我看到内存激增至 150mb,因为AsyncStream快速产生所有的块:

\n

在此输入图像描述

\n

请注意,所有\xe2\x93\x88路标均显示何时Data对象创建时间的所有路标都在过程开始时聚集在一起。

\n
\n

注意,文档警告我们,序列生成值的速度可能比消耗速度快:

\n
\n

任意元素源生成元素的速度可以快于调用者迭代元素的消耗速度。因此,AsyncStream定义了缓冲行为,允许流缓冲特定数量的最旧或最新元素。默认情况下,缓冲区限制为Int.max,这意味着该值是无界的。

\n
\n

不幸的是,各种缓冲替代方案 .bufferingOldest.bufferingNewest只会在缓冲区填满时丢弃值。在一些AsyncStreams,这可能是一个可行的解决方案(例如,如果您正在跟踪用户位置,您可能只关心最近的位置),但是当上传文件块时,您显然不能让它在缓冲区已满时丢弃块。筋疲力尽的。

\n
\n

因此,不要AsyncStream使用自定义的文件读取方式,而是使用自定义的文件读取方式AsyncSequence,这样在实际需要之前不会读取下一个块,从而显着减少峰值内存使用量,例如:

\n
struct FileMock: AsyncSequence {\n    typealias Element = Chunk\n\n    struct AsyncIterator : AsyncIteratorProtocol {\n        let chunkSize = 1_000_000\n        let maxChunks = 100\n        var current = 0\n\n        mutating func next() async -> Chunk? {\n            os_signpost(.event, log: log, name: "chunk")\n\n            guard current < maxChunks else { return nil }\n            defer { current += 1 }\n            return Chunk(index: current, data: Data(repeating: UInt8(current & 0xff), count: chunkSize))\n        }\n    }\n\n    func makeAsyncIterator() -> AsyncIterator {\n        return AsyncIterator()\n    }\n}\n
Run Code Online (Sandbox Code Playgroud)\n

\n
func uploadAll() async throws {\n    try await withThrowingTaskGroup(of: Void.self) { group in\n        var index = 0\n        for await chunk in FileMock() {\n            index += 1\n            if index > 5 {\n                try await group.next()\n            }\n            group.addTask { [self] in\n                try await upload(chunk)\n            }\n        }\n        try await group.waitForAll()\n    }\n}\n
Run Code Online (Sandbox Code Playgroud)\n

这可以避免一次将所有 100mb 加载到内存中。请注意,内存的垂直比例不同,但您可以看到峰值使用量比上图少了 100mb,并且\xe2\x93\x88显示数据何时读入内存的路标现在分布在整个图表中,而不是全部分布在开始时:

\n

在此输入图像描述

\n

现在,显然,我只是用Chunk/Data对象来模拟大文件的读取,并用 a 来模拟上传Task.sleep,但它希望说明了基本思想。

\n

底线,不要用来AsyncStream读取文件,而是考虑自定义AsyncSequence在需要块时读取文件的自定义或其他模式。

\n
\n

其他一些观察结果:

\n
    \n
  • 您说 \xe2\x80\x9ct 尝试为每个第 5 个块 \xe2\x80\x9d 放置 try wait group.next() 。也许您可以向我们展示您尝试过的内容。但请注意,这个答案并没有\xe2\x80\x99t说\xe2\x80\x9ceach第5个块\xe2\x80\x9d,而是说\xe2\x80\x9ce在第5个\xe2\x80\x9d之后的每个块。我们无法对您尝试过的内容发表评论,除非您向我们展示您实际尝试过的内容(或提供MCVE。如上所示,使用Instruments\xe2\x80\x99\xe2\x80\x9cPoints of Interest\xe2\x80\x9d工具可以显示实际的并发量。

    \n
  • \n
  • 顺便说一下,上传大型资源时,请考虑使用基于文件的上传而不是Data. 基于文件的上传具有更高的内存效率。无论资产大小如何,基于文件的资产期间使用的内存都将以 kb 为单位进行测量。您甚至可以完全关闭分块,并且无论文件大小如何,基于文件的上传都将使用很少的内存。URLSession文件上传具有最小的内存占用。这是我们进行基于文件上传的原因之一。

    \n
  • \n
  • 基于文件上传的另一个原因是,尤其是对于 iOS,可以将基于文件的上传与后台会话结合起来。通过后台会话,用户甚至可以离开应用程序去做其他事情,上传将继续在后台运行。那时,您可以重新评估是否需要/想要进行分块。

    \n
  • \n
\n