同步push_back和std :: thread

gsa*_*ras 3 c++ multithreading synchronization vector c++11

我的代码

void build(std::vector<RKD <DivisionSpace> >& roots, ...) {
  try {
    // using a local lock_guard to lock mtx guarantees unlocking on destruction / exception:
    std::lock_guard<std::mutex> lck (mtx);
    roots.push_back(RKD<DivisionSpace>(...));
  }
  catch (const std::bad_alloc&) {
    std::cout << "[exception caught when constructing tree]\n";
    return;
  }
}
Run Code Online (Sandbox Code Playgroud)

现在,实际工作应该连续进行,而不是并行进行.

构造函数RKD可以与其他构造函数并行运行RKD.然而,推回物体std::Vector是一个关键部分,对吗?

我要构建的对象的数量是已知的.在实践中它将在[2,16]范围内.理论上它可以是任何正数.

此外,我对它们将被插入容器的顺序并不感兴趣.

所以我可以这样做:

RKD tree = RKD(...);
mutex_lock(...);
roots.push_back(tree);
Run Code Online (Sandbox Code Playgroud)

然而,这意味着复制,不是吗?

我应该怎么做才能使我的代码并行?


由于这个答案,我决定使用锁(而不仅仅是互斥锁).

5go*_*der 7

该托马斯Lewowski已经在他的评论中长大,我都在扩大的建议是非常简单,基于以下观察:一push_backstd::vector潜在的需要重新分配的后备存储和拷贝(或者最好,移动)的元素.这构成了需要同步的关键部分.

对于下一个例子,假设我们想要一个向前填充前12个素数的向量,但我们不关心它们的排序.(我刚刚对这里的数字进行了硬编码,但假设它们是通过一些昂贵的计算得到的,这是有意义的并行.)在下面的场景中存在危险的竞争条件.

std::vector<int> numbers {};  // an empty vector

// thread A             // thread B             // thread C

numbers.push_back( 2);  numbers.push_back(11);  numbers.push_back(23);
numbers.push_back( 3);  numbers.push_back(13);  numbers.push_back(27);
numbers.push_back( 5);  numbers.push_back(17);  numbers.push_back(29);
numbers.push_back( 7);  numbers.push_back(19);  numbers.push_back(31);
Run Code Online (Sandbox Code Playgroud)

这还有另一个问题push_back.如果两个线程同时调用它,它们都将尝试在同一索引处构造一个具有潜在灾难性后果的对象.因此,reserve(n)在分叉线程之前没有解决问题.

但是,由于您事先知道元素的数量,因此您可以简单地它们分配到a中的特定位置std::vector而不更改其大小.如果不更改大小,则没有关键部分.因此,在以下场景中没有竞争.

std::vector<int> numbers(12);  // 12 elements initialized with 0

// thread A          // thread B          // thread C

numbers[ 0] =  2;    numbers[ 1] =  3;    numbers[ 2] =  5;
numbers[ 3] =  7;    numbers[ 4] = 11;    numbers[ 5] = 13;
numbers[ 6] = 17;    numbers[ 7] = 19;    numbers[ 8] = 23;
numbers[ 9] = 29;    numbers[10] = 31;    numbers[11] = 37;
Run Code Online (Sandbox Code Playgroud)

当然,如果两个线程都试图写入相同的索引,那么竞争将再次出现.幸运的是,在实践中防范这一点并不困难.如果你的向量有n个元素并且你有p个线程,则线程i只写入元素[ i n/p,(i + 1)n/p).请注意,这比使用线程i仅在j mod p = i时写入索引j处的元素更可取因为它可以减少缓存失效.所以上面例子中的访问模式是次优的,最好是这样的.

std::vector<int> numbers(12);  // 12 elements initialized with 0

// thread A          // thread B          // thread C

numbers[ 0] =  2;    numbers[ 4] = 11;    numbers[ 8] = 23;
numbers[ 1] =  3;    numbers[ 5] = 13;    numbers[ 9] = 29;
numbers[ 2] =  5;    numbers[ 6] = 17;    numbers[10] = 31;
numbers[ 3] =  7;    numbers[ 7] = 19;    numbers[11] = 37;
Run Code Online (Sandbox Code Playgroud)

到现在为止还挺好.但是,如果你没有一个std::vector<int>但是std::vector<Foo>怎么办?如果Foo没有默认构造函数,那么

std::vector<Foo> numbers(10);
Run Code Online (Sandbox Code Playgroud)

将无效.即使它有一个,创建许多昂贵的默认构造对象只是为了在不检索值的情况下很快重新分配它们将是无耻的.

当然,大多数设计良好的类应该有一个非常便宜的默认构造函数.例如,a std::string默认构造为空字符串,不需要内存分配.一个好的实现将降低默认构造字符串的成本

std::memset(this, 0, sizeof(std::string));
Run Code Online (Sandbox Code Playgroud)

如果编译器足够聪明,可以确定我们正在分配和初始化整个std::vector<std::string>(n),那么它可能能够进一步优化到单个调用

std::calloc(n, sizeof(std::string));
Run Code Online (Sandbox Code Playgroud)

因此,如果有任何机会你可以Foo廉价地构建和分配,那么你就完成了.但是,如果事情变得困难,可以通过将其移到堆中来避免此问题.智能指针是廉价的可默认构造的,所以

std::vector<std::unique_ptr<Foo>> foos(n);
Run Code Online (Sandbox Code Playgroud)

最终会减少到一个

std::calloc(n, sizeof(std::unique_ptr<Foo>));
Run Code Online (Sandbox Code Playgroud)

没有你做任何事情Foo.当然,这种便利是以每个元素的动态内存分配为代价的.

std::vector<std::unique_ptr<Foo>> foos(n);

// thread A                    // thread B                           // thread C

foos[0].reset(new Foo {...});  foos[n / 3 + 0].reset(new Foo {...});  foos[2 * n / 3 + 0].reset(new Foo {...});
foos[1].reset(new Foo {...});  foos[n / 3 + 1].reset(new Foo {...});  foos[2 * n / 3 + 1].reset(new Foo {...});
foos[2].reset(new Foo {...});  foos[n / 3 + 2].reset(new Foo {...});  foos[2 * n / 3 + 2].reset(new Foo {...});
...                            ...                                    ...
Run Code Online (Sandbox Code Playgroud)

这可能没有你想象的那么糟糕,因为虽然动态内存分配不是免费的,但是sizeofa std::unique_ptr非常小,所以如果sizeof(Foo)很大,你会得到一个更紧凑的矢量,可以更快地迭代.这一切当然取决于您打算如何使用您的数据.

如果你事先不知道元素的确切数量或者害怕你会搞砸索引,还有另一种方法:让每个线程填充自己的向量并在最后合并它们.继续素数的例子,我们得到了这个.

std::vector<int> numbersA {};  // private store for thread A
std::vector<int> numbersB {};  // private store for thread B
std::vector<int> numbersC {};  // private store for thread C

// thread A              // thread B              // thread C

numbersA.push_back( 2);  numbersB.push_back(11);  numbersC.push_back(23);
numbersA.push_back( 3);  numbersB.push_back(13);  numbersC.push_back(27);
numbersA.push_back( 5);  numbersB.push_back(17);  numbersC.push_back(29);
numbersA.push_back( 7);  numbersB.push_back(21);  numbersC.push_back(31);

// Back on the main thread after A, B and C are joined:

std::vector<int> numbers(
    numbersA.size() + numbersB.size() + numbersC.size());
auto pos = numbers.begin();
pos = std::move(numbersA.begin(), numbersA.end(), pos);
pos = std::move(numbersB.begin(), numbersB.end(), pos);
pos = std::move(numbersC.begin(), numbersC.end(), pos);
assert(pos == numbers.end());

// Now dispose of numbersA, numbersB and numbersC as soon as possible
// in order to release their no longer needed memory.
Run Code Online (Sandbox Code Playgroud)

(std::move上面代码中使用的是算法库中的代码.)

这种方法有都因为最可取的内存访问模式numbersA,numbersBnumbersC正在编写完全独立分配的内存.当然,我们必须完成加入中间结果的额外顺序工作.请注意,效率很大程度上依赖于移动分配元素的成本与查找/创建元素的成本相比可忽略不计的事实.至少如上所述,代码还假定您的类型具有廉价的默认构造函数.当然,如果你的类型不是这种情况,你可以再次使用智能指针.

我希望这能为您提供足够的想法来优化您的问题.

如果您以前从未使用过智能指针,请查看"C++中的RAII和智能指针 ",并查看标准库的动态内存管理库.上面显示的技术当然也适用于a,std::vector<Foo *>但我们不再使用现代C++中拥有这样的原始指针的资源.