在swift中创建线程安全数组

pat*_*ckS 51 arrays multithreading read-write ios swift

我有快速的线程问题.我有一个包含一些对象的数组.在委托上,类每秒都会获得新对象.之后我必须检查对象是否已经在数组中,所以我必须更新对象,否则我必须删除/添加新对象.

如果我添加一个新对象,我必须首先通过网络获取一些数据.这是handelt经过一个街区.

现在我的问题是,如何同步这项任务?

我尝试了一个dispatch_semaphore,但是这个阻止了UI,直到块完成.

我还尝试了一个简单的bool变量,它检查块当前是否执行并同时跳过compare方法.

但这两种方法都不理想.

什么是管理阵列的最佳方式,我不想在阵列中有重复的数据.

ski*_*kim 68

Kirsteins是正确的,但您并不总是需要使用调度队列.您可以使用:

objc_sync_enter(array)
// manipulate the array
objc_sync_exit(array)
Run Code Online (Sandbox Code Playgroud)

这应该做的伎俩.为了增加额外的好处,您可以创建一个在需要线程安全时使用的函数:

func sync(lock: NSObject, closure: () -> Void) {
    objc_sync_enter(lock)
    closure()
    objc_sync_exit(lock)
}

...
var list = NSMutableArray()
sync (list) {
   list.addObject("something")
}
Run Code Online (Sandbox Code Playgroud)

请注意,我已更改AnyObjectNSObject.在Swift集合类型中实现为structs并且它们按值传递,因此我猜测使用方便函数时使用通过引用传递的可变集合类会更安全.sync

更新Swift

线程安全访问的推荐模式是使用dispatch barrier:

let queue = DispatchQueue(label: "thread-safe-obj", attributes: .concurrent)

// write
queue.async(flags: .barrier) {
    // perform writes on data
}

// read
var value: ValueType!
queue.sync {
    // perform read and assign value
}
return value
Run Code Online (Sandbox Code Playgroud)

  • 在诸如`Array`之类的值类型上使用`objc_sync_enter`是不安全/正确的.`objc_sync_array`执行指针比较,并且不保证桥接Swift值类型的指针的生命周期是稳定的(实际上大多数时候它是*不*稳定). (10认同)
  • 你让我很好奇,所以我在'objc_sync_enter`上找到了这个:http://rykap.com/objective-c/2015/05/09/synchronized/ (2认同)
  • 这不起作用,原因/sf/ask/2455932811/ (2认同)

Kir*_*ins 32

我解决这个问题的方法是使用串行调度队列来同步对盒装数组的访问.当你试图获取索引值并且队列真的很忙时它会阻塞线程,但这也是锁的问题.

public class SynchronizedArray<T> {
    private var array: [T] = []
    private let accessQueue = dispatch_queue_create("SynchronizedArrayAccess", DISPATCH_QUEUE_SERIAL)

    public func append(newElement: T) {
        dispatch_async(self.accessQueue) {
            self.array.append(newElement)
        }
    }

    public subscript(index: Int) -> T {
        set {
            dispatch_async(self.accessQueue) {
                self.array[index] = newValue
            }
        }
        get {
            var element: T!

            dispatch_sync(self.accessQueue) {
                element = self.array[index]
            }

            return element
        }
    }
}

var a = SynchronizedArray<Int>()
a.append(1)
a.append(2)
a.append(3)

// can be empty as this is non-thread safe access
println(a.array)

// thread-safe synchonized access
println(a[0])
println(a[1])
println(a[2])
Run Code Online (Sandbox Code Playgroud)

  • 您可以考虑使用读写器模式:使用`DISPATCH_QUEUE_CONCURRENT`; 将写入从`dispatch_async`更改为`dispatch_barrier_async`; 但是将读取保留为`dispatch_sync`.这为您提供了并发读取,但写入仍然是同步的. (22认同)
  • 你只对`dispatch_barrier_async`进行写操作.但是用`dispatch_sync`读取.因此,虽然写入不会(并且不应该)与其他任何内容同时发生,但是读取_can_与其他读取同时发生.请参阅WWDC 2012视频后半部分中的模式#6 [带块,GCD和XPC的异步设计模式](https://developer.apple.com/videos/play/wwdc2012-712/). (3认同)

rmo*_*ney 32

Kirsteins的答案是正确的,但为方便起见,我已经用Amol Chaudhari和Rob的建议更新了答案,建议使用带有异步屏障的并发队列来允许并发读取但阻止写入.

我还包装了一些对我有用的其他数组函数.

public class SynchronizedArray<T> {
private var array: [T] = []
private let accessQueue = dispatch_queue_create("SynchronizedArrayAccess", DISPATCH_QUEUE_CONCURRENT)

public func append(newElement: T) {
    dispatch_barrier_async(self.accessQueue) {
        self.array.append(newElement)
    }
}

public func removeAtIndex(index: Int) {
    dispatch_barrier_async(self.accessQueue) {
        self.array.removeAtIndex(index)
    }
}

public var count: Int {
    var count = 0

    dispatch_sync(self.accessQueue) {
        count = self.array.count
    }

    return count
}

public func first() -> T? {
    var element: T?

    dispatch_sync(self.accessQueue) {
        if !self.array.isEmpty {
            element = self.array[0]
        }
    }

    return element
}

public subscript(index: Int) -> T {
    set {
        dispatch_barrier_async(self.accessQueue) {
            self.array[index] = newValue
        }
    }
    get {
        var element: T!

        dispatch_sync(self.accessQueue) {
            element = self.array[index]
        }

        return element
    }
}
}
Run Code Online (Sandbox Code Playgroud)

更新 这是为Swift3更新的相同代码.

public class SynchronizedArray<T> {
private var array: [T] = []
private let accessQueue = DispatchQueue(label: "SynchronizedArrayAccess", attributes: .concurrent)

public func append(newElement: T) {

    self.accessQueue.async(flags:.barrier) {
        self.array.append(newElement)
    }
}

public func removeAtIndex(index: Int) {

    self.accessQueue.async(flags:.barrier) {
        self.array.remove(at: index)
    }
}

public var count: Int {
    var count = 0

    self.accessQueue.sync {
        count = self.array.count
    }

    return count
}

public func first() -> T? {
    var element: T?

    self.accessQueue.sync {
        if !self.array.isEmpty {
            element = self.array[0]
        }
    }

    return element
}

public subscript(index: Int) -> T {
    set {
        self.accessQueue.async(flags:.barrier) {
            self.array[index] = newValue
        }
    }
    get {
        var element: T!
        self.accessQueue.sync {
            element = self.array[index]
        }

        return element
    }
}
}
Run Code Online (Sandbox Code Playgroud)

  • removeAtIndex可以删除此代码中的错误项目导致idx冷却被其他线程'removeAtIndex'更改... (3认同)
  • @mooney我认为@Speakus就在这里:说sychronized数组有一个元素,这不会失败吗?`if synchronizedArray.count == 1 {synchronizedArray.remove(at:0)}`这是一个竞争条件,比如两个线程执行该语句.两者同时读取1的计数,两者同时将写入块排队.写块顺序执行,第二个将失败.我错了吗?这是设计者还是用户错误?在任何一种情况下它都很脆弱,让我永远不想触摸多线程...你怎么能让这个安全?回到阅读和写作的独家锁? (3认同)
  • @rmooney - 在您的 Swift 3 和 4 示例中,您可以简化其中一些例程,消除隐式解包的选项,例如 `var first: T? { return accessQueue.sync { self.array.first } }` (2认同)

Eri*_*ner 22

我不知道为什么人们对这么简单的事情采取如此复杂的方法

  • 不要滥用DispatchQueues锁定。使用queue.sync无非是在锁 ( DispatchGroup) 等待期间获取锁并将工作分派给另一个线程。这不仅是不必要的,而且还会产生副作用,具体取决于您锁定的内容。你可以在GCD Source自行查找

  • 不要使用objc_sync_enter/exit,它们被 ObjC 使用,@synchronized它们会隐式地将 Swift 集合桥接到 ObjC 对应物,这也是不必要的。它是一个遗留 API。

只需定义一个锁,并保护您的收藏访问。

var lock = DispatchSemaphore(value: 1)
var a = [1, 2, 3]

lock.wait()
a.append(4)
lock.signal()
Run Code Online (Sandbox Code Playgroud)

如果你想让你的生活更轻松一点,定义一个小的扩展。

extension DispatchSemaphore {

    @discardableResult
    func with<T>(_ block: () throws -> T) rethrows -> T {
        wait()
        defer { signal() }
        return try block()
    }
}

let lock = DispatchSemaphore(value: 1)
var a = [1, 2, 3]

lock.with { a.append(4) }
Run Code Online (Sandbox Code Playgroud)

您还可以定义 a@propertyWrapper使您的成员具有var原子性。

@propertyWrapper
struct Atomic<Value> {

    private let lock = DispatchSemaphore(value: 1)
    private var value: Value

    init(default: Value) {
        self.value = `default`
    }

    var wrappedValue: Value {
        get {
            lock.wait()
            defer { lock.signal() }
            return value
        }
        set {
            lock.wait()
            value = newValue
            lock.signal()
        }
    }
}
Run Code Online (Sandbox Code Playgroud)


Vas*_*huk 9

细节

  • Xcode 10.1 (10B61),Swift 4.2
  • Xcode 10.2.1 (10E1001),Swift 5

解决方案

import Foundation

// https://developer.apple.com/documentation/swift/rangereplaceablecollection
struct AtomicArray<T>: RangeReplaceableCollection {

    typealias Element = T
    typealias Index = Int
    typealias SubSequence = AtomicArray<T>
    typealias Indices = Range<Int>
    fileprivate var array: Array<T>
    var startIndex: Int { return array.startIndex }
    var endIndex: Int { return array.endIndex }
    var indices: Range<Int> { return array.indices }

    func index(after i: Int) -> Int { return array.index(after: i) }

    private var semaphore = DispatchSemaphore(value: 1)
    fileprivate func _wait() { semaphore.wait() }
    fileprivate func _signal() { semaphore.signal() }
}

// MARK: - Instance Methods

extension AtomicArray {

    init<S>(_ elements: S) where S : Sequence, AtomicArray.Element == S.Element {
        array = Array<S.Element>(elements)
    }

    init() { self.init([]) }

    init(repeating repeatedValue: AtomicArray.Element, count: Int) {
        let array = Array(repeating: repeatedValue, count: count)
        self.init(array)
    }
}

// MARK: - Instance Methods

extension AtomicArray {

    public mutating func append(_ newElement: AtomicArray.Element) {
        _wait(); defer { _signal() }
        array.append(newElement)
    }

    public mutating func append<S>(contentsOf newElements: S) where S : Sequence, AtomicArray.Element == S.Element {
        _wait(); defer { _signal() }
        array.append(contentsOf: newElements)
    }

    func filter(_ isIncluded: (AtomicArray.Element) throws -> Bool) rethrows -> AtomicArray {
        _wait(); defer { _signal() }
        let subArray = try array.filter(isIncluded)
        return AtomicArray(subArray)
    }

    public mutating func insert(_ newElement: AtomicArray.Element, at i: AtomicArray.Index) {
        _wait(); defer { _signal() }
        array.insert(newElement, at: i)
    }

    mutating func insert<S>(contentsOf newElements: S, at i: AtomicArray.Index) where S : Collection, AtomicArray.Element == S.Element {
        _wait(); defer { _signal() }
        array.insert(contentsOf: newElements, at: i)
    }

    mutating func popLast() -> AtomicArray.Element? {
        _wait(); defer { _signal() }
        return array.popLast()
    }

    @discardableResult mutating func remove(at i: AtomicArray.Index) -> AtomicArray.Element {
        _wait(); defer { _signal() }
        return array.remove(at: i)
    }

    mutating func removeAll() {
        _wait(); defer { _signal() }
        array.removeAll()
    }

    mutating func removeAll(keepingCapacity keepCapacity: Bool) {
        _wait(); defer { _signal() }
        array.removeAll()
    }

    mutating func removeAll(where shouldBeRemoved: (AtomicArray.Element) throws -> Bool) rethrows {
        _wait(); defer { _signal() }
        try array.removeAll(where: shouldBeRemoved)
    }

    @discardableResult mutating func removeFirst() -> AtomicArray.Element {
        _wait(); defer { _signal() }
        return array.removeFirst()
    }

    mutating func removeFirst(_ k: Int) {
        _wait(); defer { _signal() }
        array.removeFirst(k)
    }

    @discardableResult mutating func removeLast() -> AtomicArray.Element {
        _wait(); defer { _signal() }
        return array.removeLast()
    }

    mutating func removeLast(_ k: Int) {
        _wait(); defer { _signal() }
        array.removeLast(k)
    }

    @inlinable public func forEach(_ body: (Element) throws -> Void) rethrows {
        _wait(); defer { _signal() }
        try array.forEach(body)
    }

    mutating func removeFirstIfExist(where shouldBeRemoved: (AtomicArray.Element) throws -> Bool) {
        _wait(); defer { _signal() }
        guard let index = try? array.firstIndex(where: shouldBeRemoved) else { return }
        array.remove(at: index)
    }

    mutating func removeSubrange(_ bounds: Range<Int>) {
        _wait(); defer { _signal() }
        array.removeSubrange(bounds)
    }

    mutating func replaceSubrange<C, R>(_ subrange: R, with newElements: C) where C : Collection, R : RangeExpression, T == C.Element, AtomicArray<Element>.Index == R.Bound {
        _wait(); defer { _signal() }
        array.replaceSubrange(subrange, with: newElements)
    }

    mutating func reserveCapacity(_ n: Int) {
        _wait(); defer { _signal() }
        array.reserveCapacity(n)
    }

    public var count: Int {
        _wait(); defer { _signal() }
        return array.count
    }

    public var isEmpty: Bool {
        _wait(); defer { _signal() }
        return array.isEmpty
    }
}

// MARK: - Get/Set

extension AtomicArray {

    // Single  action

    func get() -> [T] {
        _wait(); defer { _signal() }
        return array
    }

    mutating func set(array: [T]) {
        _wait(); defer { _signal() }
        self.array = array
    }

    // Multy actions

    mutating func get(closure: ([T])->()) {
        _wait(); defer { _signal() }
        closure(array)
    }

    mutating func set(closure: ([T]) -> ([T])) {
        _wait(); defer { _signal() }
        array = closure(array)
    }
}

// MARK: - Subscripts

extension AtomicArray {

    subscript(bounds: Range<AtomicArray.Index>) -> AtomicArray.SubSequence {
        get {
            _wait(); defer { _signal() }
            return AtomicArray(array[bounds])
        }
    }

    subscript(bounds: AtomicArray.Index) -> AtomicArray.Element {
        get {
            _wait(); defer { _signal() }
            return array[bounds]
        }
        set(value) {
            _wait(); defer { _signal() }
            array[bounds] = value
        }
    }
}

// MARK: - Operator Functions

extension AtomicArray {

    static func + <Other>(lhs: Other, rhs: AtomicArray) -> AtomicArray where Other : Sequence, AtomicArray.Element == Other.Element {
        return AtomicArray(lhs + rhs.get())
    }

    static func + <Other>(lhs: AtomicArray, rhs: Other) -> AtomicArray where Other : Sequence, AtomicArray.Element == Other.Element {
        return AtomicArray(lhs.get() + rhs)
    }

    static func + <Other>(lhs: AtomicArray, rhs: Other) -> AtomicArray where Other : RangeReplaceableCollection, AtomicArray.Element == Other.Element {
        return AtomicArray(lhs.get() + rhs)
    }

    static func + (lhs: AtomicArray<Element>, rhs: AtomicArray<Element>) -> AtomicArray {
        return AtomicArray(lhs.get() + rhs.get())
    }

    static func += <Other>(lhs: inout AtomicArray, rhs: Other) where Other : Sequence, AtomicArray.Element == Other.Element {
        lhs._wait(); defer { lhs._signal() }
        lhs.array += rhs
    }
}

// MARK: - CustomStringConvertible

extension AtomicArray: CustomStringConvertible {
    var description: String {
        _wait(); defer { _signal() }
        return "\(array)"
    }
}

// MARK: - Equatable

extension AtomicArray where Element : Equatable {

    func split(separator: Element, maxSplits: Int, omittingEmptySubsequences: Bool) -> [ArraySlice<Element>] {
        _wait(); defer { _signal() }
        return array.split(separator: separator, maxSplits: maxSplits, omittingEmptySubsequences: omittingEmptySubsequences)
    }

    func firstIndex(of element: Element) -> Int? {
        _wait(); defer { _signal() }
        return array.firstIndex(of: element)
    }

    func lastIndex(of element: Element) -> Int? {
        _wait(); defer { _signal() }
        return array.lastIndex(of: element)
    }

    func starts<PossiblePrefix>(with possiblePrefix: PossiblePrefix) -> Bool where PossiblePrefix : Sequence, Element == PossiblePrefix.Element {
        _wait(); defer { _signal() }
        return array.starts(with: possiblePrefix)
    }

    func elementsEqual<OtherSequence>(_ other: OtherSequence) -> Bool where OtherSequence : Sequence, Element == OtherSequence.Element {
        _wait(); defer { _signal() }
        return array.elementsEqual(other)
    }

    func contains(_ element: Element) -> Bool {
        _wait(); defer { _signal() }
        return array.contains(element)
    }

    static func != (lhs: AtomicArray<Element>, rhs: AtomicArray<Element>) -> Bool {
        lhs._wait(); defer { lhs._signal() }
        rhs._wait(); defer { rhs._signal() }
        return lhs.array != rhs.array
    }

    static func == (lhs: AtomicArray<Element>, rhs: AtomicArray<Element>) -> Bool {
        lhs._wait(); defer { lhs._signal() }
        rhs._wait(); defer { rhs._signal() }
        return lhs.array == rhs.array
    }
}
Run Code Online (Sandbox Code Playgroud)

使用示例 1

import Foundation

// init
var array = AtomicArray<Int>()
print(array)
array = AtomicArray(repeating: 0, count: 5)
print(array)
array = AtomicArray([1,2,3,4,5,6,7,8,9])
print(array)

// add
array.append(0)
print(array)
array.append(contentsOf: [5,5,5])
print(array)

// filter
array = array.filter { $0 < 7 }
print(array)

// map
let strings = array.map { "\($0)" }
print(strings)

// insert
array.insert(99, at: 5)
print(array)
array.insert(contentsOf: [2, 2, 2], at: 0)
print(array)

// pop
_ = array.popLast()
print(array)
_ = array.popFirst()
print(array)

// remove
array.removeFirst()
print(array)
array.removeFirst(3)
print(array)
array.remove(at: 2)
print(array)
array.removeLast()
print(array)
array.removeLast(5)
print(array)
array.removeAll { $0%2 == 0 }
print(array)
array = AtomicArray([1,2,3,4,5,6,7,8,9,0])
array.removeSubrange(0...2)
print(array)
array.replaceSubrange(0...2, with: [0,0,0])
print(array)
array.removeAll()
print(array)

array.set(array: [1,2,3,4,5,6,7,8,9,0])
print(array)

// subscript
print(array[0])
array[0] = 100
print(array)
print(array[1...4])

// operator functions
array = [1,2,3] + AtomicArray([4,5,6])
print(array)
array = AtomicArray([4,5,6]) + [1,2,3]
print(array)
array = AtomicArray([1,2,3]) + AtomicArray([4,5,6])
print(array)
Run Code Online (Sandbox Code Playgroud)

使用示例2

import Foundation

var arr = AtomicArray([0,1,2,3,4,5])
for i in 0...1000 {
    // Single actions
    DispatchQueue.global(qos: .background).async {
        usleep(useconds_t(Int.random(in: 100...10000)))
        let num = i*i
        arr.append(num)
        print("arr.append(\(num)), background queue")
    }
    DispatchQueue.global(qos: .default).async {
        usleep(useconds_t(Int.random(in: 100...10000)))
        arr.append(arr.count)
        print("arr.append(\(arr.count)), default queue")
    }

    // multy actions
    DispatchQueue.global(qos: .utility).async {
        arr.set { array -> [Int] in
            var newArray = array
            newArray.sort()
            print("sort(), .utility queue")
            return newArray
        }
    }
}
Run Code Online (Sandbox Code Playgroud)


Lou*_*Lac 9

具有参与者的线程安全数据结构

从 Swift 5.5 开始,你可以用 actor 来表达这一点:

actor SyncArray<T> {
    private var buffer: [T]
    
    init<S: Sequence>(_ elements: S) where S.Element == T {
        buffer = Array(elements)
    }
    
    var count: Int {
        buffer.count
    }
    
    func append(_ element: T) {
        buffer.append(element)
    }
    
    @discardableResult
    func remove(at index: Int) -> T {
        buffer.remove(at: index)
    }
}
Run Code Online (Sandbox Code Playgroud)

它不仅使代码更简单且不易出错,而且使其他答案中指出的潜在竞争条件更加明确:

Task {
    let array = SyncArray([1])

    if await array.count == 1 { 
        await array.remove(at: 0)
    }
}
Run Code Online (Sandbox Code Playgroud)

这里有两个暂停点,这意味着在.remove(at:)调用时,数组count可能已经改变。

这种先读后写的操作必须是原子的才能保持一致,因此应该将其定义为 actor 上的方法:

extension SyncArray {
    func removeLastIfSizeOfOne() {
        if buffer.count == 1 {
            buffer.remove(at: 0)
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

上面,没有挂起点表明该操作是原子执行的。另一种无需编写扩展即可工作的解决方案是使用isolated如下关键字:

func removeLastIfSizeOfOne<T>(_ array: isolated SyncArray<T>) {
    if array == 1 {
        array(at: 0)
    }
}
Run Code Online (Sandbox Code Playgroud)

这将在整个调用期间(而不是在每个暂停点)隔离传递的 Actor。调用该函数只需要一个暂停点。


nbl*_*oqs 7

一个小细节:在Swift 3中(至少在XCode 8 Beta 6中),队列的语法发生了显着变化.@Kirsteins答案的重要变化是:

private let accessQueue = DispatchQueue(label: "SynchronizedArrayAccess")

txAccessQueue.async() {
  // Your async code goes here...
}

txAccessQueue.sync() {
  // Your sync code goes here...
}
Run Code Online (Sandbox Code Playgroud)


swi*_*ynx 6

Swift-Nio 和 Vapor Swift

对于那些使用 Swift-Nio(或基于 Swift-Nio 的 Vapor Swift)的人来说,有一个针对此问题的内置解决方案:

class MyClass {
    let lock = Lock()
    var myArray: Array<Int> = []

    func networkRequestWhatEver() {
        lock.withLock {
            array.append(someValue)
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

Lock请注意,修改同一对象时应使用同一Array对象(或Dictionary等)。

https://github.com/apple/swift-nio/blob/5e728b57862ce9e13877ff1edc9249adc933070a/Sources/NIOConcurrencyHelpers/lock.swift#L15

  • ```导入 NIOConcurrencyHelpers``` (2认同)