Swift 5.6 如何将异步任务放入队列

Ric*_* Mo 14 concurrency asynchronous ios async-await swift

假设我有这个代码

class Duck{
    
    func walk() async {
        //do something
        print("walk start")
        try? await Task.sleep(nanoseconds: UInt64(2e9))
        print("walk end")
    }
    
    func quack() async {
        //do something...
        print("quack start")
        try? await Task.sleep(nanoseconds: UInt64(2e9))
        print("quack end")
    }
    
    func fly() async{
        //do something
        print("fly start")
        try? await Task.sleep(nanoseconds: UInt64(2e9))
        print("fly end")
    }
    
}

let duck = Duck()

Task{
    await duck.walk()
}

Task{
    await duck.quack()
}

Task{
    await duck.fly()
}
Run Code Online (Sandbox Code Playgroud)

这将打印

walk start
quack start
fly start
walk end
quack end
fly end
Run Code Online (Sandbox Code Playgroud)

这是我理解和期望的。但是如果我希望这 3 个Task顺序运行怎么办?假设每个Task都是由用户按下按钮创建的。我希望任务在后台排队并一项一项地运行。有没有类似可以DispatchWorkItemDispatchQueueaTask版本中排队的东西?


编辑:

我想出了一个解决方案,但我不确定这是否是实现它的好方法。由于这种实现可能会创建多层级联Task,我想知道是否会有堆栈溢出或内存泄漏的风险?

class TaskQueue{
    private var currentTask : Task<Void,Never> = Task{}
    
    func dispatch(block:@escaping () async ->Void){
        
        let oldTask = currentTask
        currentTask = Task{
            _ = await oldTask.value
            await block()
        }
    }
}

taskQueue.dispatch {
    await duck.walk()
}
taskQueue.dispatch {
    await duck.quack()
}
taskQueue.dispatch {
    await duck.fly()
}
Run Code Online (Sandbox Code Playgroud)

Rob*_*Rob 17

我曾经是非结构化任务方法的支持者,其中每个任务都执行await前一个任务。回想起来,这对我来说有点脆弱。AsyncChannel我现在越来越多地使用异步序列(特别是来自 Apple\xe2\x80\x99s )(感谢 Rob Napier 在这个方向上的推动)swift-async-algorithms。我认为这是一种更健壮的行为,并且与现代 Swift 并发的异步序列更加一致。

\n

在我们讨论您的示例之前,请考虑这个串行下载器,其中我们有一个进程(用户单击按钮)将对象发送到另一个进程,该进程在- -循环URL中监视 URL 通道:forawaitin

\n
struct DownloadView: View {\n    @StateObject var viewModel = DownloadViewModel()\n\n    var body: some View {\n        VStack {\n            Button("1") { Task { await viewModel.appendDownload(1) } }\n            Button("2") { Task { await viewModel.appendDownload(2) } }\n            Button("3") { Task { await viewModel.appendDownload(3) } }\n        }\n        .task {\n            await viewModel.monitorDownloadRequests()\n        }\n    }\n}\n\n@MainActor\nclass DownloadViewModel: ObservableObject {\n    private let session: URLSession = \xe2\x80\xa6\n    private let baseUrl: URL = \xe2\x80\xa6\n    private let folder: URL = \xe2\x80\xa6\n    private let channel = AsyncChannel<URL>()   // note, we\'re sending URLs on this channel\n\n    func monitorDownloadRequests() async {\n        for await url in channel {\n            await download(url)\n        }\n    }\n\n    func appendDownload(_ index: Int) async {\n        let url = baseUrl.appending(component: "\\(index).jpg")\n        await channel.send(url)\n    }\n\n    func download(_ url: URL) async {\n        do {\n            let (location, _) = try await session.download(from: url)\n            let fileUrl = folder.appending(component: url.lastPathComponent)\n            try? FileManager.default.removeItem(at: fileUrl)\n            try FileManager.default.moveItem(at: location, to: fileUrl)\n        } catch {\n            print(error)\n        }\n    }\n}\n
Run Code Online (Sandbox Code Playgroud)\n

我们启动monitorDownloadRequests并将append请求下载到频道。

\n

这会串行执行请求(因为monitorDownloadRequests有一个for-await循环)。例如,在 Instruments\xe2\x80\x99 \xe2\x80\x9cPoints of Interest\xe2\x80\x9d 工具中,我在单击这些按钮的位置添加了一些 \xe2\x93\x88 路标,并显示请求发生的时间间隔,您可以看到这三个请求是按顺序发生的。

\n

在此输入图像描述

\n

但通道的奇妙之处在于它们提供串行行为,而不会引入非结构化并发问题。他们还会自动处理取消(如果您想要这种行为)。如果取消for- await-in循环(当.task {\xe2\x80\xa6}视图关闭时,视图修饰符会在 SwiftUI 中自动为我们执行此操作)。如果您有一堆非结构化并发,其中一个Task并发等待前一个并发,那么处理取消很快就会变得混乱。

\n
\n

现在,就您而言,您正在询问一个更通用的队列,您可以在其中等待任务。好吧,你可以有一个AsyncChannelof 闭包:

\n
typealias AsyncClosure = () async -> Void\n\nlet channel = AsyncChannel<AsyncClosure>()\n
Run Code Online (Sandbox Code Playgroud)\n

例如:

\n
typealias AsyncClosure = () async -> Void\n\nstruct ExperimentView: View {\n    @StateObject var viewModel = ExperimentViewModel()\n\n    var body: some View {\n        VStack {\n            Button("Red")   { Task { await viewModel.addRed() } }\n            Button("Green") { Task { await viewModel.addGreen() } }\n            Button("Blue")  { Task { await viewModel.addBlue() } }\n        }\n        .task {\n            await viewModel.monitorChannel()\n        }\n    }\n}\n\n@MainActor\nclass ExperimentViewModel: ObservableObject {\n    let channel = AsyncChannel<AsyncClosure>()\n\n    func monitorChannel() async {\n        for await block in channel {\n            await block()\n        }\n    }\n\n    func addRed() async {\n        await channel.send { await self.red() }\n    }\n\n    func addGreen() async {\n        await channel.send { await self.green() }\n    }\n\n    func addBlue() async {\n        await channel.send { await self.blue() }\n    }\n\n    func red() async { \xe2\x80\xa6 }\n\n    func green() async { \xe2\x80\xa6 }\n\n    func blue() async { \xe2\x80\xa6 }\n}\n
Run Code Online (Sandbox Code Playgroud)\n

得出:

\n

在此输入图像描述

\n

在这里,我再次使用 Instruments 来可视化正在发生的事情。我连续快速单击了 \xe2\x80\x9cred\xe2\x80\x9d、\xe2\x80\x9cgreen\xe2\x80\x9d 和 \xe2\x80\x9cblue\xe2\x80\x9d 按钮两次。然后我观察了这三个第二任务的六个相应间隔。然后,我第二次重复该六次单击过程,但这次我在第二组按钮点击的绿色任务完成之前就忽略了相关视图,这说明了AsyncChannel异步序列的无缝取消功能)一般来说)。

\n

现在,我希望你原谅我,因为我省略了创建所有这些 \xe2\x80\x9cPoints of Interest\xe2\x80\x9d 路标和间隔的代码,因为它添加了很多与实际无关的 kruft手头的问题(但请参阅这个如果您感兴趣,但希望这些可视化有助于说明正在发生的事情。

\n

最重要的信息是AsyncChannel(及其兄弟)是保持结构化并发的好方法,但获得串行(或受约束的行为,如本答案AsyncThrowingChannel末尾所示))是保持结构化并发的好方法,但获得我们过去通过队列获得),但通过异步任务。

\n

我必须承认,后一个AsyncClosure例子虽然有望回答你的问题,但在我看来有点强迫。我已经使用了AsyncChannel几个月了,我个人总是有一个由通道处理的更具体的对象(例如,URL、GPS 位置、图像标识符等)。这个带有闭包的示例感觉有点过于努力地重现老式的调度/操作队列行为。

\n


Ric*_* Mo 4

更新:

对于未来觉得这篇文章有用的人,我创建了一个具有更好实现的 swift 包,并添加了对排队的支持AsyncThrowingStream

https://github.com/rickymohk/SwiftTaskQueue


这是我更新的实现,我认为它比我在问题中发布的实现更安全。该TaskQueueActor部分完成了所有工作,我用外部类包装它,只是为了在从非异步上下文调用时使其更清晰。

class TaskQueue{
    
    private actor TaskQueueActor{
        private var blocks : [() async -> Void] = []
        private var currentTask : Task<Void,Never>? = nil
        
        func addBlock(block:@escaping () async -> Void){
            blocks.append(block)
            next()
        }
        
        func next()
        {
            if(currentTask != nil) {
                return
            }
            if(!blocks.isEmpty)
            {
                let block = blocks.removeFirst()
                currentTask = Task{
                    await block()
                    currentTask = nil
                    next()
                }
            }
        }
    }
    private let taskQueueActor = TaskQueueActor()
    
    func dispatch(block:@escaping () async ->Void){
        Task{
            await taskQueueActor.addBlock(block: block)
        }
    }
}
Run Code Online (Sandbox Code Playgroud)