如何让构成Combine框架的异步管道同步(串行)排队?
假设我有 50 个 URL,我想从中下载相应的资源,假设我想一次下载一个。我知道如何使用 Operation/OperationQueue 来做到这一点,例如使用一个 Operation 子类,该子类在下载完成之前不会声明自己已完成。我将如何使用Combine 做同样的事情?
目前我想到的只是保留一个剩余 URL 的全局列表并弹出一个,为一次下载设置一个管道,进行下载,然后在sink
管道中重复。这似乎不太像结合。
我确实尝试制作一组 URL 并将其映射到一组发布者。我知道我可以“生产”一个发布者,并使用flatMap
. 但后来我仍然在同时进行所有下载。没有任何组合方式以受控方式遍历阵列——或者有吗?
(我也想象过用 Future 做点什么,但我变得绝望了。我不习惯这种思维方式。)
我NSOperation
在Swift中进行子类化,需要覆盖isExecuting
和isFinished
属性,因为我重写了start
方法.
我遇到的问题是如何保留键值观察(KVO),同时还能够覆盖这些属性.
通常在Obj-C中,这很容易重新声明属性,就像readwrite
在类扩展JSONOperation ()
定义中一样.但是,我没有在Swift中看到同样的功能.
例:
class JSONOperation : NSOperation, NSURLConnectionDelegate
{
var executing : Bool
{
get { return super.executing }
set { super.executing } // ERROR: readonly in the superclass
}
// Starts the asynchronous NSURLConnection on the main thread
override func start()
{
self.willChangeValueForKey("isExecuting")
self.executing = true
self.didChangeValueForKey("isExecuting")
NSOperationQueue.mainQueue().addOperationWithBlock(
{
self.connection = NSURLConnection(request: self.request, delegate: self, startImmediately: true)
})
}
}
Run Code Online (Sandbox Code Playgroud)
所以这是我提出的解决方案,但它感觉非常丑陋和hacky:
var state = Operation() …
Run Code Online (Sandbox Code Playgroud) class SomeViewController: UIViewController {
let semaphore = DispatchSemaphore(value: 1)
deinit {
semaphore.signal() // just in case?
}
func someLongAsyncTask() {
semaphore.wait()
...
semaphore.signal() // called much later
}
}
Run Code Online (Sandbox Code Playgroud)
如果我告诉信号量等待,然后在信号量被告知发出信号之前取消初始化拥有它的视图控制器,则应用程序会因错误而崩溃Thread 1: EXC_BAD_INSTRUCTION (code=EXC_I386_INVOP, subcode=0x0)
。但是,如果我只是调用视图控制器的方法,就可以避免灾难semaphore.signal()
。deinit
但是,如果异步函数在deinit
调用之前返回并且视图控制器被取消初始化,则signal()
调用两次,这似乎没有问题。但这样做安全和/或明智吗?
I\xe2\x80\x99m 目前正在迁移我的应用程序以使用 Swift 中的并发模型。我想序列化任务以确保它们一个接一个地执行(无并行性)。在我的用例中,我想监听通知中心发布的通知,并在每次发布新通知时执行任务。但我想确保之前没有任务正在运行。这相当于使用 maxConcurrentOperationCount = 1 的操作队列。
\n例如,我\xe2\x80\x99m 在我的应用程序中使用 CloudKit 和 Core Data,并使用持久历史记录来确定存储中发生了哪些更改。\n在此将本地存储同步到云示例代码中,Apple 使用用于处理历史处理任务的操作队列(在CoreDataStack中)。此OperationQueue 的最大操作数设置为1。
\nprivate lazy var historyQueue: OperationQueue = {\n let queue = OperationQueue()\n queue.maxConcurrentOperationCount = 1\n return queue\n}()\n
Run Code Online (Sandbox Code Playgroud)\n当收到 Core Data 通知时,新任务将添加到该串行操作队列中。因此,如果收到多个通知,它们都会以串行方式一个接一个地执行。
\n@objc\nfunc storeRemoteChange(_ notification: Notification) {\n // Process persistent history to merge changes from other coordinators.\n historyQueue.addOperation {\n self.processPersistentHistory()\n }\n}\n
Run Code Online (Sandbox Code Playgroud)\n在此加载和显示大数据源示例代码中,Apple 使用任务来处理历史更改(在 QuakesProvider 中)。
\n// Observe Core Data remote change notifications on the queue where …
Run Code Online (Sandbox Code Playgroud) core-data nsoperationqueue grand-central-dispatch swift swift-concurrency
如何将 KVO 通知与线程安全结合起来?我有一个类需要符合 KVO 标准,这就是我目前的做法:
class CustomOperation: Operation {
private let stateQueue = DispatchQueue(label: "customOperation", attributes: .concurrent)
private var _isExecuting = false
override var isExecuting: Bool {
get {
return stateQueue.sync { _isExecuting }
}
set {
stateQueue.async(flags: .barrier) {
self._isExecuting = newValue
}
}
}
override func start() {
willChangeValue(forKey: "isExecuting")
isExecuting = true
didChangeValue(forKey: "isExecuting")
}
}
Run Code Online (Sandbox Code Playgroud)
我可以像这样在属性设置器内移动通知吗?
class CustomOperation: Operation {
private let stateQueue = DispatchQueue(label: "customOperation", attributes: .concurrent)
private var _isExecuting = false
override var isExecuting: …
Run Code Online (Sandbox Code Playgroud) SomeOperation went isFinished=YES without being started by the queue it is in
public class SomeOperation : AsyncOperation {
//MARK: Start
public override func start() {
isExecuting = true
guard !isCancelled else {
markAsCompleted() //isExecuting = false, isFinished = true
return
}
doSomethingAsynchronously { [weak self] in
self?.markAsCompleted() //isExecuting = false, isFinished = true
}
}
//MARK: Cancel
public override func cancel() {
super.cancel()
markAsCompleted() //isExecuting = false, isFinished = true …
Run Code Online (Sandbox Code Playgroud) 我确信我的逻辑有问题,只是无法弄清楚它是什么。
有一个“Service”类,它有一个操作队列:
class Service {
let queue: OperationQueue = {
var queue = OperationQueue()
queue.name = "my.operationQueue"
queue.maxConcurrentOperationCount = 1
return queue
}()
func add(operation: Operation) {
queue.addOperation(operation)
}
}
Run Code Online (Sandbox Code Playgroud)
该操作是异步的,因此它会覆盖状态和函数start
:
class MyOp: Operation {
private var state: State = .ready
private var id: Int
init(id: Int) {
self.id = id
}
override var isAsynchronous: Bool {
return true
}
override var isReady: Bool {
return state == .ready
}
override var isExecuting: Bool {
return state == …
Run Code Online (Sandbox Code Playgroud)