使用队列和信号量进行并发和属性包装?

Tru*_*an1 8 concurrency semaphore grand-central-dispatch swift

我正在尝试创建一个线程安全的属性包装器。我只能认为 GCD 队列和信号量是最快速、最可靠的方式。信号量只是更高的性能(如果这是真的),还是有另一个理由使用一个而不是另一个来实现并发?

以下是原子属性包装器的两种变体:

@propertyWrapper
struct Atomic<Value> {
    private var value: Value
    private let queue = DispatchQueue(label: "Atomic serial queue")

    var wrappedValue: Value {
        get { queue.sync { value } }
        set { queue.sync { value = newValue } }
    }

    init(wrappedValue value: Value) {
        self.value = value
    }
}

@propertyWrapper
struct Atomic2<Value> {
    private var value: Value
    private var semaphore = DispatchSemaphore(value: 1)

    var wrappedValue: Value {
        get {
            semaphore.wait()
            let temp = value
            semaphore.signal()
            return temp
        }

        set {
            semaphore.wait()
            value = newValue
            semaphore.signal()
        }
    }

    init(wrappedValue value: Value) {
        self.value = value
    }
}

struct MyStruct {
    @Atomic var counter = 0
    @Atomic2 var counter2 = 0
}

func test() {
    var myStruct = MyStruct()

    DispatchQueue.concurrentPerform(iterations: 1000) {
        myStruct.counter += $0
        myStruct.counter2 += $0
   }
}
Run Code Online (Sandbox Code Playgroud)

如何正确测试和测量它们以查看两种实现之间的差异,以及它们是否有效?

Rob*_*Rob 18

FWIW,另一种选择是具有并发队列的读写器模式,其中读取是同步完成的,但允许相对于其他读取并发运行,但写入是异步完成的,但有一个障碍(即不同时相对于任何其他读或写):

@propertyWrapper
class Atomic<Value> {
    private var value: Value
    private let queue = DispatchQueue(label: "com.domain.app.atomic", attributes: .concurrent)

    var wrappedValue: Value {
        get { queue.sync { value } }
        set { queue.async(flags: .barrier) { self.value = newValue } }
    }

    init(wrappedValue value: Value) {
        self.value = value
    }
}
Run Code Online (Sandbox Code Playgroud)

还有一个是NSLock

@propertyWrapper
struct Atomic<Value> {
    private var value: Value
    private var lock = NSLock()

    var wrappedValue: Value {
        get { lock.synchronized { value } }
        set { lock.synchronized { value = newValue } }
    }

    init(wrappedValue value: Value) {
        self.value = value
    }
}
Run Code Online (Sandbox Code Playgroud)

在哪里

extension NSLocking {
    func synchronized<T>(block: () throws -> T) rethrows -> T {
        lock()
        defer { unlock() }
        return try block()
    }
}
Run Code Online (Sandbox Code Playgroud)

或者你可以使用不公平的锁:

@propertyWrapper
struct SynchronizedUnfairLock<Value> {
    private var value: Value
    private var lock = UnfairLock()

    var wrappedValue: Value {
        get { lock.synchronized { value } }
        set { lock.synchronized { value = newValue } }
    }

    init(wrappedValue value: Value) {
        self.value = value
    }
}
Run Code Online (Sandbox Code Playgroud)

在哪里

// One should not use `os_unfair_lock` directly in Swift (because Swift
// can move `struct` types), so we'll wrap it in a `UnsafeMutablePointer`.
// See https://github.com/apple/swift/blob/88b093e9d77d6201935a2c2fb13f27d961836777/stdlib/public/Darwin/Foundation/Publishers%2BLocking.swift#L18
// for stdlib example of this pattern.

final class UnfairLock: NSLocking {
    private let unfairLock: UnsafeMutablePointer<os_unfair_lock> = {
        let pointer = UnsafeMutablePointer<os_unfair_lock>.allocate(capacity: 1)
        pointer.initialize(to: os_unfair_lock())
        return pointer
    }()

    deinit {
        unfairLock.deinitialize(count: 1)
        unfairLock.deallocate()
    }

    func lock() {
        os_unfair_lock_lock(unfairLock)
    }

    func tryLock() -> Bool {
        os_unfair_lock_trylock(unfairLock)
    }

    func unlock() {
        os_unfair_lock_unlock(unfairLock)
    }
}
Run Code Online (Sandbox Code Playgroud)

我们应该认识到,虽然这些和你的提供了原子性,但你必须小心,因为取决于你如何使用它,它可能不是线程安全的。

考虑这个简单的实验,我们将整数递增一百万次:

func threadSafetyExperiment() {
    @Atomic var foo = 0

    DispatchQueue.global().async {
        DispatchQueue.concurrentPerform(iterations: 10_000_000) { _ in
            foo += 1
        }
        print(foo)
    }
}
Run Code Online (Sandbox Code Playgroud)

您希望foo等于 10,000,000,但事实并非如此。那是因为“取值并递增并保存它”的整个交互需要包装在单个同步机制中。

但是你可以添加一个原子增量方法:

extension Atomic where Value: Numeric {
    mutating func increment(by increment: Value) {
        lock.synchronized { value += increment }
    }
}
Run Code Online (Sandbox Code Playgroud)

然后这工作正常:

func threadSafetyExperiment() {
    @Atomic var foo = 0

    DispatchQueue.global().async {
        DispatchQueue.concurrentPerform(iterations: iterations) { _ in
            _foo.increment(by: 1)
        }
        print(foo)
    }
}
Run Code Online (Sandbox Code Playgroud)

如何正确测试和测量它们以查看两种实现之间的差异,以及它们是否有效?

一些想法:

  • 我建议进行超过 1,000 次迭代。您希望进行足够多的迭代,以秒为单位测量结果,而不是毫秒。我在我的例子中使用了一千万次迭代。

  • 单元测试框架非常适合使用该measure方法测试正确性和测量性能(每个单元测试重复性能测试 10 次,结果将由单元测试报告捕获):

    检测结果

    因此,创建一个带有单元测试目标的项目(或者如果需要,将单元测试目标添加到现有项目中),然后创建单元测试,并使用command+执行它们u

  • 如果您为目标编辑方案,则可以选择随机化测试的顺序,以确保它们执行的顺序不会影响性能:

    在此处输入图片说明

    我还会让测试目标使用发布版本,以确保您正在测试优化的版本。

  • 不用说,虽然我通过运行 10m 次迭代对锁进行压力测试,每次迭代增加 1,但这是非常低效的。在每个线程上根本没有足够的工作来证明线程处理的开销是合理的。人们通常会遍历数据集并为每个线程执行更多迭代,并减少同步次数。

    这的实际含义是,在精心设计的并行算法中,您正在做足够的工作来证明多线程是合理的,您正在减少正在发生的同步数量。因此,无法观察到不同同步技术中的微小差异。如果同步机制具有可观察到的性能差异,这可能表明并行化算法中存在更深层次的问题。专注于减少同步,而不是让同步更快。