AyB*_*Bay 4 multithreading grand-central-dispatch barrier data-synchronization swift
我正在尝试实现线程安全的PhoneBook对象。电话簿应该能够添加一个人,并根据其姓名和phoneNumber查找一个人。从实现的角度来看,这仅涉及两个哈希表,一个关联名称-> Person,另一个关联电话号-> Person。
需要注意的是,我希望该对象是threadSafe。这意味着我希望能够在电话簿中支持并发查找,同时确保每次只有一个线程可以将一个人添加到电话簿中。这是基本的读写器问题,我正在尝试使用GrandCentralDispatch和Dispatch障碍解决此问题。尽管遇到问题,但我仍在努力解决此问题。以下是我的Swift操场代码:
//: Playground - noun: a place where people can play
import UIKit
import PlaygroundSupport
PlaygroundPage.current.needsIndefiniteExecution = true
public class Person: CustomStringConvertible {
public var description: String {
get {
return "Person: \(name), \(phoneNumber)"
}
}
public var name: String
public var phoneNumber: String
private var readLock = ReaderWriterLock()
public init(name: String, phoneNumber: String) {
self.name = name
self.phoneNumber = phoneNumber
}
public func uniquePerson() -> Person {
let randomID = UUID().uuidString
return Person(name: randomID, phoneNumber: randomID)
}
}
public enum Qos {
case threadSafe, none
}
public class PhoneBook {
private var qualityOfService: Qos = .none
public var nameToPersonMap = [String: Person]()
public var phoneNumberToPersonMap = [String: Person]()
private var readWriteLock = ReaderWriterLock()
public init(_ qos: Qos) {
self.qualityOfService = qos
}
public func personByName(_ name: String) -> Person? {
var person: Person? = nil
if qualityOfService == .threadSafe {
readWriteLock.concurrentlyRead { [weak self] in
guard let strongSelf = self else { return }
person = strongSelf.nameToPersonMap[name]
}
} else {
person = nameToPersonMap[name]
}
return person
}
public func personByPhoneNumber( _ phoneNumber: String) -> Person? {
var person: Person? = nil
if qualityOfService == .threadSafe {
readWriteLock.concurrentlyRead { [weak self] in
guard let strongSelf = self else { return }
person = strongSelf.phoneNumberToPersonMap[phoneNumber]
}
} else {
person = phoneNumberToPersonMap[phoneNumber]
}
return person
}
public func addPerson(_ person: Person) {
if qualityOfService == .threadSafe {
readWriteLock.exclusivelyWrite { [weak self] in
guard let strongSelf = self else { return }
strongSelf.nameToPersonMap[person.name] = person
strongSelf.phoneNumberToPersonMap[person.phoneNumber] = person
}
} else {
nameToPersonMap[person.name] = person
phoneNumberToPersonMap[person.phoneNumber] = person
}
}
}
// A ReaderWriterLock implemented using GCD and OS Barriers.
public class ReaderWriterLock {
private let concurrentQueue = DispatchQueue(label: "com.ReaderWriterLock.Queue", attributes: DispatchQueue.Attributes.concurrent)
private var writeClosure: (() -> Void)!
public func concurrentlyRead(_ readClosure: (() -> Void)) {
concurrentQueue.sync {
readClosure()
}
}
public func exclusivelyWrite(_ writeClosure: @escaping (() -> Void)) {
self.writeClosure = writeClosure
concurrentQueue.async(flags: .barrier) { [weak self] in
guard let strongSelf = self else { return }
strongSelf.writeClosure()
}
}
}
// MARK: Testing the synchronization and thread-safety
for _ in 0..<5 {
let iterations = 1000
let phoneBook = PhoneBook(.none)
let concurrentTestQueue = DispatchQueue(label: "com.PhoneBookTest.Queue", attributes: DispatchQueue.Attributes.concurrent)
for _ in 0..<iterations {
let person = Person(name: "", phoneNumber: "").uniquePerson()
concurrentTestQueue.async {
phoneBook.addPerson(person)
}
}
sleep(10)
print(phoneBook.nameToPersonMap.count)
}
Run Code Online (Sandbox Code Playgroud)
为了测试我的代码,我运行1000个并发线程,它们只是向PhoneBook中添加了一个新的Person。每个人都是唯一的,因此在完成1000个线程之后,我希望电话簿包含1000个计数。每次执行写操作时,我都会执行dispatch_barrier调用,更新哈希表,然后返回。据我所知,这就是我们要做的全部;但是,在重复运行1000个线程之后,我得到了电话簿中条目的数量不一致,并且遍布整个地方:
Phone Book Entries: 856
Phone Book Entries: 901
Phone Book Entries: 876
Phone Book Entries: 902
Phone Book Entries: 912
Run Code Online (Sandbox Code Playgroud)
谁能帮我弄清楚发生了什么事?我的锁定代码是否有问题,甚至更糟的是我的测试结构如何?我对这个多线程问题空间非常陌生,谢谢!
Rob*_*Rob 11
问题是你的ReaderWriterLock。您将另存writeClosure为一个属性,然后异步调度一个调用该已保存属性的闭包。但是,如果exclusiveWrite在此期间有其他人进入,则您的writeClosure财产将被新的封顶取代。
在这种情况下,这意味着您可以Person多次添加相同的内容。而且因为您使用的是字典,所以这些重复项具有相同的键,因此不会导致您看到所有1000个条目。
您实际上可以简化ReaderWriterLock,完全消除该属性。我还要做concurrentRead一个泛型,返回值(就像这样sync做),然后抛出任何错误(如果有的话)。
public class ReaderWriterLock {
private let queue = DispatchQueue(label: "com.domain.app.rwLock", attributes: .concurrent)
public func concurrentlyRead<T>(_ block: (() throws -> T)) rethrows -> T {
return try queue.sync {
try block()
}
}
public func exclusivelyWrite(_ block: @escaping (() -> Void)) {
queue.async(flags: .barrier) {
block()
}
}
}
Run Code Online (Sandbox Code Playgroud)
其他一些不相关的观察结果:
顺便说一下,这种简化ReaderWriterLock恰好解决了另一个问题。该writeClosure属性(我们现在已将其删除)可以很容易地引入一个强大的参考周期。
是的,您谨慎使用[weak self],因此没有强大的参考周期,但有可能。我建议无论您在何处使用闭包属性,都应在使用完该属性后将其设置为闭包属性nil,以便解决闭包可能偶然引起的任何强引用。这样一来,就不可能有持久的强参考周期。(此外,将解析闭包本身以及任何局部变量或其他外部引用。)
你睡了十秒钟。这应该绰绰有余,但是我建议sleep您不要添加随机调用(因为您永远无法100%确定)。幸运的是,您有一个并发队列,因此可以使用:
concurrentTestQueue.async(flags: .barrier) {
print(phoneBook.count)
}
Run Code Online (Sandbox Code Playgroud)
由于存在这种障碍,它将一直等到您放入该队列的所有其他操作都完成之后。
注意,我不只是打印nameToPersonMap.count。此数组已在中进行了仔细同步PhoneBook,因此您不能只让随机的外部类直接访问它而不进行同步。
只要您有一些要在内部进行同步的属性,它就应该存在private,然后创建一个线程安全的函数/变量以检索所需的内容:
public class PhoneBook {
private var nameToPersonMap = [String: Person]()
private var phoneNumberToPersonMap = [String: Person]()
...
var count: Int {
return readWriteLock.concurrentlyRead {
nameToPersonMap.count
}
}
}
Run Code Online (Sandbox Code Playgroud)您说您正在测试线程安全性,但是随后创建了PhoneBook带.none选项的选项(没有实现线程安全性)。在这种情况下,我预计会出现问题。您必须PhoneBook使用.threadSafe选项创建您的。
您有多种strongSelf模式。那真是不明智。在Swift中通常不需要它,因为您可以使用它[weak self],然后只需执行可选的链接即可。
综合所有这些,这是我最后的游乐场:
PlaygroundPage.current.needsIndefiniteExecution = true
public class Person {
public let name: String
public let phoneNumber: String
public init(name: String, phoneNumber: String) {
self.name = name
self.phoneNumber = phoneNumber
}
public static func uniquePerson() -> Person {
let randomID = UUID().uuidString
return Person(name: randomID, phoneNumber: randomID)
}
}
extension Person: CustomStringConvertible {
public var description: String {
return "Person: \(name), \(phoneNumber)"
}
}
public enum ThreadSafety { // Changed the name from Qos, because this has nothing to do with quality of service, but is just a question of thread safety
case threadSafe, none
}
public class PhoneBook {
private var threadSafety: ThreadSafety
private var nameToPersonMap = [String: Person]() // if you're synchronizing these, you really shouldn't expose them to the public
private var phoneNumberToPersonMap = [String: Person]() // if you're synchronizing these, you really shouldn't expose them to the public
private var readWriteLock = ReaderWriterLock()
public init(_ threadSafety: ThreadSafety) {
self.threadSafety = threadSafety
}
public func personByName(_ name: String) -> Person? {
if threadSafety == .threadSafe {
return readWriteLock.concurrentlyRead { [weak self] in
self?.nameToPersonMap[name]
}
} else {
return nameToPersonMap[name]
}
}
public func personByPhoneNumber(_ phoneNumber: String) -> Person? {
if threadSafety == .threadSafe {
return readWriteLock.concurrentlyRead { [weak self] in
self?.phoneNumberToPersonMap[phoneNumber]
}
} else {
return phoneNumberToPersonMap[phoneNumber]
}
}
public func addPerson(_ person: Person) {
if threadSafety == .threadSafe {
readWriteLock.exclusivelyWrite { [weak self] in
self?.nameToPersonMap[person.name] = person
self?.phoneNumberToPersonMap[person.phoneNumber] = person
}
} else {
nameToPersonMap[person.name] = person
phoneNumberToPersonMap[person.phoneNumber] = person
}
}
var count: Int {
return readWriteLock.concurrentlyRead {
nameToPersonMap.count
}
}
}
// A ReaderWriterLock implemented using GCD concurrent queue and barriers.
public class ReaderWriterLock {
private let queue = DispatchQueue(label: "com.domain.app.rwLock", attributes: .concurrent)
public func concurrentlyRead<T>(_ block: (() throws -> T)) rethrows -> T {
return try queue.sync {
try block()
}
}
public func exclusivelyWrite(_ block: @escaping (() -> Void)) {
queue.async(flags: .barrier) {
block()
}
}
}
for _ in 0 ..< 5 {
let iterations = 1000
let phoneBook = PhoneBook(.threadSafe)
let concurrentTestQueue = DispatchQueue(label: "com.PhoneBookTest.Queue", attributes: .concurrent)
for _ in 0..<iterations {
let person = Person.uniquePerson()
concurrentTestQueue.async {
phoneBook.addPerson(person)
}
}
concurrentTestQueue.async(flags: .barrier) {
print(phoneBook.count)
}
}
Run Code Online (Sandbox Code Playgroud)
就个人而言,我倾向于更进一步,
Person对象数组,以便:
例如:
public struct Person {
public let name: String
public let phoneNumber: String
public static func uniquePerson() -> Person {
return Person(name: UUID().uuidString, phoneNumber: UUID().uuidString)
}
}
public struct PhoneBook {
private var synchronizedPeople = Synchronized([Person]())
public func people(name: String? = nil, phone: String? = nil) -> [Person]? {
return synchronizedPeople.value.filter {
(name == nil || $0.name == name) && (phone == nil || $0.phoneNumber == phone)
}
}
public func append(_ person: Person) {
synchronizedPeople.writer { people in
people.append(person)
}
}
public var count: Int {
return synchronizedPeople.reader { $0.count }
}
}
/// A structure to provide thread-safe access to some underlying object using reader-writer pattern.
public class Synchronized<T> {
/// Private value. Use `public` `value` computed property (or `reader` and `writer` methods)
/// for safe, thread-safe access to this underlying value.
private var _value: T
/// Private reader-write synchronization queue
private let queue = DispatchQueue(label: Bundle.main.bundleIdentifier! + ".synchronized", qos: .default, attributes: .concurrent)
/// Create `Synchronized` object
///
/// - Parameter value: The initial value to be synchronized.
public init(_ value: T) {
_value = value
}
/// A threadsafe variable to set and get the underlying object
public var value: T {
get { return queue.sync { _value } }
set { queue.async(flags: .barrier) { self._value = newValue } }
}
/// A "reader" method to allow thread-safe, read-only concurrent access to the underlying object.
///
/// - Warning: If the underlying object is a reference type, you are responsible for making sure you
/// do not mutating anything. If you stick with value types (`struct` or primitive types),
/// this will be enforced for you.
public func reader<U>(_ block: (T) throws -> U) rethrows -> U {
return try queue.sync { try block(_value) }
}
/// A "writer" method to allow thread-safe write with barrier to the underlying object
func writer(_ block: @escaping (inout T) -> Void) {
queue.async(flags: .barrier) {
block(&self._value)
}
}
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1759 次 |
| 最近记录: |