使 Swift 并发中的任务串行运行

Rem*_*tra 9 multithreading swift swift-concurrency

我有一个基于文档的应用程序,它使用结构作为其主要数据/模型。由于模型是它(的子类)的属性,NSDocument因此需要从主线程访问。到目前为止一切都很好。

但对数据的某些操作可能需要相当长的时间,我想为用户提供一个进度条。这就是问题开始的地方。特别是当用户从 GUI 快速连续启动两个操作时。

如果我同步(或以“正常”方式)在模型上运行操作,Task {}我会得到正确的串行行为,但主线程被阻止,因此我无法显示进度条。(选项A)

如果我在闭包中对模型运行操作,Task.detached {}我可以更新进度条,但根据模型上操作的运行时间,用户的第二个操作可能会在第一个操作之前完成,从而导致无效/意外状态模型的。这是由于await分离任务中需要的语句(我认为)。(选项B)。

所以我想要 a) 释放主线程来更新 GUI,b) 确保每个任务在另一个(排队的)任务开始之前运行完全完成。使用后台串行调度队列很有可能实现这一点,但我正在尝试切换到新的 Swift 并发系统,该系统也用于在访问模型之前执行任何准备工作。

我尝试使用全局参与者,因为这似乎是某种串行后台队列,但它也需要await语句。尽管模型中出现意外状态的可能性降低了,但这仍然是可能的。

我写了一些小代码来演示这个问题:

该模型:

struct Model {
    var doneA = false
    var doneB = false

    mutating func updateA() {
        Thread.sleep(forTimeInterval: 5)
        doneA = true
    }

    mutating func updateB() {
        Thread.sleep(forTimeInterval: 1)
        doneB = true
    }
}
Run Code Online (Sandbox Code Playgroud)

和文档(省略标准覆盖NSDocument):

@globalActor
struct ModelActor {
    actor ActorType { }

    static let shared: ActorType = ActorType()
}

class Document: NSDocument {
    var model = Model() {
        didSet {
            Swift.print(model)
        }
    }

    func update(model: Model) {
        self.model = model
    }

    @ModelActor
    func updateModel(with operation: (Model) -> Model) async {
        var model = await self.model
        model = operation(model)
        await update(model: model)
    }

    @IBAction func operationA(_ sender: Any?) {
        //Option A
//        Task {
//            Swift.print("Performing some A work...")
//            self.model.updateA()
//        }

        //Option B
//        Task.detached {
//            Swift.print("Performing some A work...")
//            var model = await self.model
//            model.updateA()
//            await self.update(model: model)
//        }

        //Option C
        Task.detached {
            Swift.print("Performing some A work...")
            await self.updateModel { model in
                var model = model
                model.updateA()
                return model
            }
        }
    }

    @IBAction func operationB(_ sender: Any?) {
        //Option A
//        Task {
//            Swift.print("Performing some B work...")
//            self.model.updateB()
//        }

        //Option B
//        Task.detached {
//            Swift.print("Performing some B work...")
//            var model = await self.model
//            model.updateB()
//            await self.update(model: model)
//        }

        //Option C
        Task.detached {
            Swift.print("Performing some B work...")
            await self.updateModel { model in
                var model = model
                model.updateB()
                return model
            }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

单击“操作 A”,然后单击“操作 B”应该会生成带有两个 的模型true。但情况并非总是如此。

有没有办法确保操作 A 在我进行操作 B 之前完成并让主线程可用于 GUI 更新?

编辑 根据罗布的回答我想出了以下内容。我这样修改它是因为我可以等待创建的操作并向原始调用者报告任何错误。我认为通过将所有代码包含在单个函数中更容易理解正在发生的事情update,因此我选择执行独立任务而不是actor. 我还从任务中返回中间模型,否则可能会使用旧模型。

class Document {
    func updateModel(operation: @escaping (Model) throws -> Model) async throws {
        //Update the model in the background
        let modelTask = Task.detached { [previousTask, model] () throws -> Model in
            var model = model

            //Check whether we're cancelled
            try Task.checkCancellation()

            //Check whether we need to wait on earlier task(s)
            if let previousTask = previousTask {
                //If the preceding task succeeds we use its model
                do {
                    model = try await previousTask.value
                } catch {
                    throw CancellationError()
                }
            }

            return try operation(model)
        }


        previousTask = modelTask
        defer { previousTask = nil } //Make sure a later task can always start if we throw

        //Wait for the operation to finish and store the model
        do {
            self.model = try await modelTask.value
        } catch {
            if error is CancellationError { return }
            else { throw error }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

呼叫方:

@IBAction func operationA(_ sender: Any?) {
    //Option D
    Task {
        do {
            try await updateModel { model in
                var model = model
                model.updateA()
                return model
            }
        } catch {
            presentError(error)
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

它似乎做了我需要的任何事情,即将对文档上的属性的更新进行排队,可以等待并返回错误,就像所有事情都发生在主线程上一样。唯一的缺点似乎是在调用端,由于需要创建模型 avar并显式返回它,所以闭包非常冗长。

Rob*_*Rob 19

显然,如果您的任务没有任何await或其他暂停点,您只需使用演员,而不是创建方法async,它会自动按顺序执行它们。

\n

但是,在处理异步 Actor 方法时,必须认识到 Actor 是可重入的(请参阅SE-0306:Actors - Actor 可重入性)。如果您确实尝试串行运行一系列异步任务,您将需要手动让每个后续任务等待前一个任务。例如,

\n
actor Foo {\n    private var previousTask: Task<Void, Error>?\n\n    func add(block: @Sendable @escaping () async throws -> Void) {\n        previousTask = Task { [previousTask] in\n            let _ = await previousTask?.result\n\n            return try await block()\n        }\n    }\n}\n
Run Code Online (Sandbox Code Playgroud)\n

上述内容有两个微妙的方面:

\n
    \n
  1. 我使用捕获列表[previousTask]来确保获得先前任务的副本。

    \n
  2. \n
  3. await previousTask?.value 新任务中执行,而不是在新任务之前执行。

    \n

    如果您在创建新任务之前等待,则会出现竞赛,如果您启动三个任务,第二个和第三个任务都将等待第一个任务,即第三个任务不会等待第二个任务。

    \n
  4. \n
\n

而且,也许不用说,因为这是在一个 actor 内,所以它避免了对分离任务的需要,同时保持主线程空闲。

\n

在此输入图像描述

\n
\n

请注意,当使用非结构化并发(即Task {\xe2\x80\xa6}Task.detached {\xe2\x80\xa6})时,您需要负责处理取消,例如使用withTaskCancellationHandler

\n
actor Foo<Value: Sendable> {\n    private var previousTask: Task<Value, Error>?\n\n    func add(block: @Sendable @escaping () async throws -> Value) async throws -> Value {\n        let task = Task { [previousTask] in\n            try await withTaskCancellationHandler {\n                let _ = try await previousTask?.value\n            } onCancel: {\n                previousTask?.cancel()\n            }\n\n            return try await block()\n        }\n\n        previousTask = task\n\n        return try await withTaskCancellationHandler {\n            try await task.value\n        } onCancel: {\n            task.cancel()\n        }\n    }\n}\n
Run Code Online (Sandbox Code Playgroud)\n

我还将其扩展为可能返回值的块。

\n

因此,例如,我在这里添加了四个任务(仅Task.sleep持续两秒钟,然后返回一个随机值):

\n

在此输入图像描述

\n

或者,如果您在第三个任务中途取消第四个任务:

\n

在此输入图像描述

\n

(不用说,这假设您添加的任务支持取消,CancellationError如果取消则抛出等。标准Apple API,如URLSession,完成所有这些,但正如您所看到的,如果您引入非结构化并发,则需要小心.)

\n
\n

上面的内容有点脆弱,所以我可能建议异步序列(例如,任何符合AsyncSequence协议的内容,例如AsyncStream您自己的自定义异步序列),这也可以为您提供串行行为。

\n

或者,Swift 异步算法AsyncChannel是处理触发某些代码块的串行执行的请求管道的另一种好方法。

\n

例如,这是一个串行下载管理器,使用AsyncChannel一个简单的for- await-in循环来实现串行行为:

\n
actor Foo<Value: Sendable> {\n    private var previousTask: Task<Value, Error>?\n\n    func add(block: @Sendable @escaping () async throws -> Value) async throws -> Value {\n        let task = Task { [previousTask] in\n            try await withTaskCancellationHandler {\n                let _ = try await previousTask?.value\n            } onCancel: {\n                previousTask?.cancel()\n            }\n\n            return try await block()\n        }\n\n        previousTask = task\n\n        return try await withTaskCancellationHandler {\n            try await task.value\n        } onCancel: {\n            task.cancel()\n        }\n    }\n}\n
Run Code Online (Sandbox Code Playgroud)\n

然后你可以做这样的事情:

\n
actor SerialDownloadManager {\n    static let shared = SerialDownloadManager()\n\n    private let session: URLSession = \xe2\x80\xa6\n    private let urls = AsyncChannel<URL>()\n\n    private init() {\n        Task { try await startDownloader() }\n    }\n\n    // this sends URLs on the channel\n\n    func append(_ url: URL) async {\n        await urls.send(url)\n    }\n}\n\nprivate extension SerialDownloadManager {\n    func startDownloader() async throws {\n        let folder = try FileManager.default\n            .url(for: .applicationSupportDirectory, in: .userDomainMask, appropriateFor: nil, create: true)\n            .appending(component: "downloads")\n\n        try? FileManager.default.createDirectory(at: folder, withIntermediateDirectories: true)\n\n        // this consumes the URLs on the channel\n\n        for await url in urls {\n            // if you want to observe in "points of interest"\n            //\n            // let id = OSSignpostID(log: poi)\n            // os_signpost(.begin, log: poi, name: "Download", signpostID: id, "%{public}@", url.lastPathComponent)\n            // defer { os_signpost(.end, log: poi, name: "Download", signpostID: id) }\n\n            // download\n\n            let (location, response) = try await self.session.download(from: url, delegate: nil)\n\n            if let response = response as? HTTPURLResponse, 200 ..< 300 ~= response.statusCode {\n                let destination = folder.appending(component: url.lastPathComponent)\n                try? FileManager.default.removeItem(at: destination)\n                try FileManager.default.moveItem(at: location, to: destination)\n            }\n        }\n    }\n}\n
Run Code Online (Sandbox Code Playgroud)\n

产量:

\n

在此输入图像描述

\n

或者,如果您愿意,您可以允许任务组具有约束并发性,例如,一次执行 4 个任务:

\n
func appendUrls() async {\n    for i in 0 ..< 10 {\n        await SerialDownloadManager.shared.append(baseUrl.appending(component: "\\(i).jpg"))\n    }\n}\n
Run Code Online (Sandbox Code Playgroud)\n

产量:

\n

在此输入图像描述

\n

有关异步序列的更多信息,一般而言,请参阅 WWDC 2021 视频Meet AsyncSequence

\n