Swift 5.5 并发:如何序列化异步任务以用 maxConcurrentOperationCount = 1 替换 OperationQueue?

alp*_*nec 10 core-data nsoperationqueue grand-central-dispatch swift swift-concurrency

I\xe2\x80\x99m 目前正在迁移我的应用程序以使用 Swift 中的并发模型。我想序列化任务以确保它们一个接一个地执行(无并行性)。在我的用例中,我想监听通知中心发布的通知,并在每次发布新通知时执行任务。但我想确保之前没有任务正在运行。这相当于使用 maxConcurrentOperationCount = 1 的操作队列。

\n

例如,我\xe2\x80\x99m 在我的应用程序中使用 CloudKit 和 Core Data,并使用持久历史记录来确定存储中发生了哪些更改。\n在此将本地存储同步到云示例代码中,Apple 使用用于处理历史处理任务的操作队列(在CoreDataStack中)。此OperationQueue 的最大操作数设置为1。

\n
private lazy var historyQueue: OperationQueue = {\n    let queue = OperationQueue()\n    queue.maxConcurrentOperationCount = 1\n    return queue\n}()\n
Run Code Online (Sandbox Code Playgroud)\n

当收到 Core Data 通知时,新任务将添加到该串行操作队列中。因此,如果收到多个通知,它们都会以串行方式一个接一个地执行。

\n
@objc\nfunc storeRemoteChange(_ notification: Notification) {\n    // Process persistent history to merge changes from other coordinators.\n    historyQueue.addOperation {\n        self.processPersistentHistory()\n    }\n}\n
Run Code Online (Sandbox Code Playgroud)\n

在此加载和显示大数据源示例代码中,Apple 使用任务来处理历史更改(在 QuakesProvider 中)。

\n
// Observe Core Data remote change notifications on the queue where the changes were made.\nnotificationToken = NotificationCenter.default.addObserver(forName: .NSPersistentStoreRemoteChange, object: nil, queue: nil) { note in\n    Task {\n        await self.fetchPersistentHistory()\n    }\n}\n
Run Code Online (Sandbox Code Playgroud)\n

我觉得第二个项目中有问题,因为任务可能以任何顺序发生,而不一定以串行顺序发生(与第一个项目相反,其中OperationQueue为maxConcurrentOperationCount = 1)。

\n

我们是否应该在某处使用参与者来确保方法被串行调用?

\n

我想过这样的实现,但我\xe2\x80\x99m 还不太满意:

\n
actor PersistenceStoreListener {\n    let historyTokenManager: PersistenceHistoryTokenManager = .init()\n    private let persistentContainer: NSPersistentContainer\n\n    init(persistentContainer: NSPersistentContainer) {\n        self.persistentContainer = persistentContainer\n    }\n\n    func processRemoteStoreChange() async {\n        print("\\(#function) called on \\(Date.now.formatted(date: .abbreviated, time: .standard)).")\n    }\n}\n
Run Code Online (Sandbox Code Playgroud)\n

其中当收到新通知时将调用 processRemoteStoreChange 方法(AsyncSequence):

\n
notificationListenerTask = Task {\n   let notifications = NotificationCenter.default.notifications(named: .NSPersistentStoreRemoteChange, object: container.persistentStoreCoordinator)\n   \n   for await _ in notifications {\n        print("notificationListenerTask called on \\(Date.now.formatted(date: .abbreviated, time: .standard)).")\n        await self.storeListener?.processRemoteStoreChange()\n    }\n}\n
Run Code Online (Sandbox Code Playgroud)\n

Rob*_*Rob 13

下面,在我原来的回答中,我回答了如何在 Swift 并发中从独立任务实现顺序行为的一般问题。

\n

但是,您提出了一个更具体的问题,即如何从异步事件序列中获取串行行为。如果您有AsyncSequence, 例如notifications,那么您在答案末尾考虑的for--方法是一个很好的解决方案:awaitin

\n
notificationListenerTask = Task {\n    let notifications = NotificationCenter.default.notifications(named: .NSPersistentStoreRemoteChange, object: container.persistentStoreCoordinator)\n   \n    for await _ in notifications {\n        await self.storeListener?.processRemoteStoreChange()\n    }\n}\n
Run Code Online (Sandbox Code Playgroud)\n

因为您在循环中,所以直到先前的返回并且循环的执行继续之前await,它不会进入下一次迭代。notifications AsyncSequenceprocessRemoteStoreChange

\n

底线AsyncSequence(无论notifications是您自己的AsyncStream还是AsyncChannel)是从一系列异步事件中获取串行行为的绝佳方法。WWDC 2021 视频Meet AsyncSequence对于那些不熟悉该协议的人来说是异步序列的一个很好的入门读物AsyncSequence

\n
\n

在我最初的回答中,下面,我解决了从一系列独立的 Swift 并发任务中获取串行行为的更普遍的问题:

\n
\n

OperationQueue如果您想获得带有maxConcurrentOperationCountof的行为1(\xe2\x80\x9dserial\xe2\x80\x9d 操作队列),可以使用actor.

\n

您将在连续剧中看到两种模式OperationQueue

\n
    \n
  1. 队列中的操作本身是同步的。

    \n

    如果您使用标准OperationQueue(即,您没有子类化Operation手动 KVOisFinished等),则可以简单地actor实现我们想要的。演员将阻止并发执行。

    \n

    但这里的关键是,这只适用于同步方法(即那些没有await挂起点的方法)。

    \n
  2. \n
  3. 队列中的操作是异步的。

    \n

    操作队列的更高级用例之一是处理本身异步的任务之间的依赖关系。这是操作队列中更复杂的场景,需要一个自定义Operation子类,您可以在其中手动处理 的 KVOisFinished等(有关该模式的示例,请参阅此答案。)

    \n

    使用 Swift 并发执行此操作的挑战是参与者是可重入的(请参阅SE-0306 中的可重入讨论。如果 actor\xe2\x80\x99s 方法是异步的(即,带有async- await),则会引入挂起点,即,await其中一次调用将允许另一种async方法在该参与者上运行。

    \n

    要实现不同async方法之间的串行执行,您有以下几种选择:

    \n
      \n
    • 你可以awaitTask;或者
    • \n
    • 您可以使用AsyncSequence诸如AsyncStreamor AsyncChannel(请参阅/sf/answers/5301133841/)。
    • \n
    \n
  4. \n
\n
\n

请考虑以下内容(其中使用操作系统路标,以便我可以以图形方式说明仪器中的行为):

\n
import os.signpost\n\nprivate let pointsOfInterest = OSLog(subsystem: "log", category: .pointsOfInterest)\n\nclass ViewController: UIViewController {\n\n    let example = Example()\n    let taskSerializer = SerialTasks<Void>()\n\n    @IBAction func didTapSync(_ sender: Any) {\n        os_signpost(.event, log: pointsOfInterest, name: #function)\n        startSynchronous()\n    }\n\n    @IBAction func didTapAsync(_ sender: Any) {\n        os_signpost(.event, log: pointsOfInterest, name: #function)\n        Task { try await startAsynchronous() }\n    }\n\n    @IBAction func didTapSerializedAsync(_ sender: Any) {\n        os_signpost(.event, log: pointsOfInterest, name: #function)\n        Task { try await startSerializedAsynchronous() }\n    }\n\n    func startSynchronous() {\n        Task {\n            await example.synchronousExample("1. synchronous")\n        }\n    }\n\n    func startAsynchronous() async throws {\n        try await example.asynchronousExample("2. asynchronous")\n    }\n\n    func startSerializedAsynchronous() async throws {\n        try await taskSerializer.add {\n            try await self.example.asynchronousExample("3. serial async")\n        }\n    }\n}\n\nactor Example {\n    func asynchronousExample(_ name: StaticString) async throws {\n        let id = OSSignpostID(log: pointsOfInterest)\n        os_signpost(.begin, log: pointsOfInterest, name: name, signpostID: id)\n        defer { os_signpost(.end, log: pointsOfInterest, name: name, signpostID: id) }\n\n        try await Task.sleep(for: .seconds(2))\n    }\n\n    func synchronousExample(_ name: StaticString) {\n        let id = OSSignpostID(log: pointsOfInterest)\n        os_signpost(.begin, log: pointsOfInterest, name: name, signpostID: id)\n        defer { os_signpost(.end, log: pointsOfInterest, name: name, signpostID: id) }\n\n        Thread.sleep(forTimeInterval: 2)\n    }\n}\n\nactor SerialTasks<Success> {\n    private var previousTask: Task<Success, Error>?\n\n    func add(block: @Sendable @escaping () async throws -> Success) async throws -> Success {\n        let task = Task { [previousTask] in\n            let _ = await previousTask?.result\n            return try await block()\n        }\n        previousTask = task\n        return try await task.value\n    }\n}\n
Run Code Online (Sandbox Code Playgroud)\n

对于同步任务(场景 1),startSynchronous, 是最简单的。只需调用 actor 的同步方法即可串行执行。

\n

对于异步任务(场景 2),startAsynchronous如果您有await挂起点,则会由于 actor 重入而丢失顺序行为。

\n

但是您可以通过在上面的代码中使用一个参与者来完善异步任务模式(场景 3),SerialTasks该参与者跟踪上一个任务,并在开始下一个任务之前等待它。一个微妙的点是add方法本身是同步的(尽管它采用的闭包是异步的)。如果您添加多个任务,这可以避免微妙的竞争。

\n

在 Instruments 中运行上述代码,我们可以以图形方式查看执行情况,其中包含\xe2\x93\xa2启动任务的路标,以及显示任务执行时间的时间间隔:

\n

在此输入图像描述

\n

简而言之,如果您actor仅执行同步任务(这就是您的情况),那么会自动actor产生maxConcurrentOperationCount = 1某种行为。如果任务是异步的,您只需要await完成前一个任务,然后再开始下一个任务。

\n