ven*_*ore 2 async-await swift structured-concurrency asyncstream asyncsequence
我正在尝试使用现代 Swift Concurrency 构建分块文件上传机制。有一个流式文件读取器,我用它来逐块读取 1mb 大小的文件。它有两个闭包nextChunk: (DataChunk) -> Void
和completion: () - Void
。InputStream
第一个被调用的次数与从块大小读取的数据一样多。
为了使该阅读器兼容 Swift Concurrency,我进行了扩展并创建了AsyncStream
似乎最适合这种情况的扩展。
public extension StreamedFileReader {
func read() -> AsyncStream<DataChunk> {
AsyncStream { continuation in
self.read(nextChunk: { chunk in
continuation.yield(chunk)
}, completion: {
continuation.finish()
})
}
}
}
Run Code Online (Sandbox Code Playgroud)
使用它,AsyncStream
我迭代地读取一些文件并进行如下网络调用:
func process(_ url: URL) async {
// ...
do {
for await chunk in reader.read() {
let request = // ...
_ = try await service.upload(data: chunk.data, request: request)
}
} catch let error {
reader.cancelReading()
print(error)
}
}
Run Code Online (Sandbox Code Playgroud)
问题是,据我所知,没有任何限制机制不允许执行超过 N 个网络调用。因此,当我尝试上传大文件(5Gb)时,内存消耗会急剧增加。因此,流式读取文件的想法没有任何意义,因为将整个文件读入内存会更容易(这是一个笑话,但看起来像那样)。
相比之下,如果我使用的是一个好的旧 GCD,一切都会像魅力一样工作:
func process(_ url: URL) {
let semaphore = DispatchSemaphore(value: 5) // limit to no more than 5 requests at a given time
let uploadGroup = DispatchGroup()
let uploadQueue = DispatchQueue.global(qos: .userInitiated)
uploadQueue.async(group: uploadGroup) {
// ...
reader.read(nextChunk: { chunk in
let requset = // ...
uploadGroup.enter()
semaphore.wait()
service.upload(chunk: chunk, request: requset) {
uploadGroup.leave()
semaphore.signal()
}
}, completion: { _ in
print("read completed")
})
}
}
Run Code Online (Sandbox Code Playgroud)
DispatchQueue
嗯,这与顺序运行时使用并发的行为并不完全相同AsyncStream
。所以我做了一些研究,发现这可能TaskGroup
就是我在这种情况下需要的。它允许并行运行异步任务等。
我这样尝试过:
func process(_ url: URL) async {
// ...
do {
let totalParts = try await withThrowingTaskGroup(of: Void.self) { [service] group -> Int in
var counter = 1
for await chunk in reader.read() {
let request = // ...
group.addTask {
_ = try await service.upload(data: chunk.data, request: request)
}
counter = chunk.index
}
return counter
}
} catch let error {
reader.cancelReading()
print(error)
}
}
Run Code Online (Sandbox Code Playgroud)
在这种情况下,内存消耗甚至比迭代示例中的内存消耗还要多AsyncStream
!
我怀疑应该有一些条件,我需要暂停组或任务或其他东西,group.addTask
只有当有可能真正处理我要添加的这些任务时才调用,但我不知道该怎么做。
我找到了这个Q/A
并尝试try await group.next()
为每个第五块添加,但它根本没有帮助我。
有没有类似于DispatchGroup
+DispatchSemaphore
但针对现代并发的机制?
更新: 为了更好地展示所有 3 种方式之间的差异,这里是内存报告的屏幕截图
try await group.next()
在每个第 5 个块上使用)关键问题是使用AsyncStream
. 你的AsyncStream
读取数据并生成块的速度比上传速度要快。
考虑这个 MCVE,其中我模拟了 100 个块的流,每个块 1mb:
\nimport os.log\n\nprivate let log = OSLog(subsystem: "Test", category: .pointsOfInterest)\n\nstruct Chunk {\n let index: Int\n let data: Data\n}\n\nactor FileMock {\n let maxChunks = 100\n let chunkSize = 1_000_000\n var index = 0\n\n func nextChunk() -> Chunk? {\n guard index < maxChunks else { print("done"); return nil }\n defer { index += 1 }\n return Chunk(index: index, data: Data(repeating: UInt8(index & 0xff), count: chunkSize))\n }\n\n func chunks() -> AsyncStream<Chunk> {\n AsyncStream { continuation in\n index = 0\n while let chunk = nextChunk() {\n os_signpost(.event, log: log, name: "chunk")\n continuation.yield(chunk)\n }\n\n continuation.finish()\n }\n }\n}\n
Run Code Online (Sandbox Code Playgroud)\n和
\nfunc uploadAll() async throws {\n try await withThrowingTaskGroup(of: Void.self) { group in\n let chunks = await FileMock().chunks()\n var index = 0\n for await chunk in chunks {\n index += 1\n if index > 5 {\n try await group.next()\n }\n group.addTask { [self] in\n try await upload(chunk)\n }\n }\n try await group.waitForAll()\n }\n}\n\nfunc upload(_ chunk: Chunk) async throws {\n let id = OSSignpostID(log: log)\n os_signpost(.begin, log: log, name: #function, signpostID: id, "%d start", chunk.index)\n try await Task.sleep(nanoseconds: 1 * NSEC_PER_SEC)\n os_signpost(.end, log: log, name: #function, signpostID: id, "end")\n}\n
Run Code Online (Sandbox Code Playgroud)\n当我这样做时,我看到内存激增至 150mb,因为AsyncStream
快速产生所有的块:
请注意,所有\xe2\x93\x88
路标均显示何时Data
对象创建时间的所有路标都在过程开始时聚集在一起。
注意,文档警告我们,序列生成值的速度可能比消耗速度快:
\n\n\n任意元素源生成元素的速度可以快于调用者迭代元素的消耗速度。因此,
\nAsyncStream
定义了缓冲行为,允许流缓冲特定数量的最旧或最新元素。默认情况下,缓冲区限制为Int.max
,这意味着该值是无界的。
不幸的是,各种缓冲替代方案 .bufferingOldest
和.bufferingNewest
只会在缓冲区填满时丢弃值。在一些AsyncStreams
,这可能是一个可行的解决方案(例如,如果您正在跟踪用户位置,您可能只关心最近的位置),但是当上传文件块时,您显然不能让它在缓冲区已满时丢弃块。筋疲力尽的。
因此,不要AsyncStream
使用自定义的文件读取方式,而是使用自定义的文件读取方式AsyncSequence
,这样在实际需要之前不会读取下一个块,从而显着减少峰值内存使用量,例如:
struct FileMock: AsyncSequence {\n typealias Element = Chunk\n\n struct AsyncIterator : AsyncIteratorProtocol {\n let chunkSize = 1_000_000\n let maxChunks = 100\n var current = 0\n\n mutating func next() async -> Chunk? {\n os_signpost(.event, log: log, name: "chunk")\n\n guard current < maxChunks else { return nil }\n defer { current += 1 }\n return Chunk(index: current, data: Data(repeating: UInt8(current & 0xff), count: chunkSize))\n }\n }\n\n func makeAsyncIterator() -> AsyncIterator {\n return AsyncIterator()\n }\n}\n
Run Code Online (Sandbox Code Playgroud)\n和
\nfunc uploadAll() async throws {\n try await withThrowingTaskGroup(of: Void.self) { group in\n var index = 0\n for await chunk in FileMock() {\n index += 1\n if index > 5 {\n try await group.next()\n }\n group.addTask { [self] in\n try await upload(chunk)\n }\n }\n try await group.waitForAll()\n }\n}\n
Run Code Online (Sandbox Code Playgroud)\n这可以避免一次将所有 100mb 加载到内存中。请注意,内存的垂直比例不同,但您可以看到峰值使用量比上图少了 100mb,并且\xe2\x93\x88
显示数据何时读入内存的路标现在分布在整个图表中,而不是全部分布在开始时:
现在,显然,我只是用Chunk
/Data
对象来模拟大文件的读取,并用 a 来模拟上传Task.sleep
,但它希望说明了基本思想。
底线,不要用来AsyncStream
读取文件,而是考虑自定义AsyncSequence
在需要块时读取文件的自定义或其他模式。
其他一些观察结果:
\n您说 \xe2\x80\x9ct 尝试为每个第 5 个块 \xe2\x80\x9d 放置 try wait group.next() 。也许您可以向我们展示您尝试过的内容。但请注意,这个答案并没有\xe2\x80\x99t说\xe2\x80\x9ceach第5个块\xe2\x80\x9d,而是说\xe2\x80\x9ce在第5个\xe2\x80\x9d之后的每个块。我们无法对您尝试过的内容发表评论,除非您向我们展示您实际尝试过的内容(或提供MCVE。如上所示,使用Instruments\xe2\x80\x99\xe2\x80\x9cPoints of Interest\xe2\x80\x9d工具可以显示实际的并发量。
\n顺便说一下,上传大型资源时,请考虑使用基于文件的上传而不是Data
. 基于文件的上传具有更高的内存效率。无论资产大小如何,基于文件的资产期间使用的内存都将以 kb 为单位进行测量。您甚至可以完全关闭分块,并且无论文件大小如何,基于文件的上传都将使用很少的内存。URLSession
文件上传具有最小的内存占用。这是我们进行基于文件上传的原因之一。
基于文件上传的另一个原因是,尤其是对于 iOS,可以将基于文件的上传与后台会话结合起来。通过后台会话,用户甚至可以离开应用程序去做其他事情,上传将继续在后台运行。那时,您可以重新评估是否需要/想要进行分块。
\n