我们在哪里可以在std :: latch上使用std :: barrier?

Bud*_*nga 5 c++ std visual-c++ c++-experimental

我最近听说了新的c ++标准功能,这些功能包括:

  1. std :: latch
  2. std :: barrier

我无法弄清楚它们在哪些情况下比另一种情况更适用和有用。

  • 如果有人可以举一个如何明智地使用每个人的榜样,那将真的很有帮助。

Art*_*cca 10

很简短的回答

它们确实针对完全不同的目标:

  • 当你有一堆线程并且你想一次在它们之间同步时,屏障很有用,例如,一次对它们的所有数据进行操作。
  • 如果您有一堆工作项并且您想知道它们何时都被处理,并且不一定对哪个线程处理它们感兴趣,那么锁存器很有用。

更长的答案

当您有一个执行某些处理的工作线程池和一个在它们之间共享的工作项队列时,通常会使用屏障和闩锁。这不是使用它们的唯一情况,但它是一种非常常见的情况,确实有助于说明差异。下面是一些示例代码,可以像这样设置一些线程:

const size_t worker_count = 7; // or whatever
std::vector<std::thread> workers;
std::vector<Proc> procs(worker_count);
Queue<std::function<void(Proc&)>> queue;
for (size_t i = 0; i < worker_count; ++i) {
    workers.push_back(std::thread(
        [p = &procs[i], &queue]() {
            while (auto fn = queue.pop_back()) {
                fn(*p);
            }
        }
    ));
}
Run Code Online (Sandbox Code Playgroud)

我假设在该示例中存在两种类型:

  • Proc:特定于您的应用程序的类型,包含处理工作项所需的数据和逻辑。对一个的引用传递给线程池中运行的每个回调函数。
  • Queue: 一个线程安全的阻塞队列。C++ 标准库中没有这样的东西(有点令人惊讶)但是有很多包含它们的开源库,例如FollyMPMCQueuemoodycamel::ConcurrentQueue,或者您可以自己构建一个不那么花哨的库std::mutexstd::condition_variable并且std::deque(有很多示例说明如何如果您使用 Google 搜索它们,请执行此操作)。

锁存器

闩锁通常用于等待您推送到队列中的某些工作项全部完成,通常这样您就可以检查结果。

std::vector<WorkItem> work = get_work();
std::latch latch(work.size());
for (WorkItem& work_item : work) {
    queue.push_back([&work_item, &latch](Proc& proc) {
        proc.do_work(work_item);
        latch.count_down();
    });
}
latch.wait();
// Inspect the completed work
Run Code Online (Sandbox Code Playgroud)

这是如何工作的:

  1. 线程最终会从队列中弹出工作项,池中可能有多个线程同时处理不同的工作项。
  2. 当每个工作项完成时,latch.count_down()被调用,有效地递减从 开始的内部计数器work.size()
  3. 当所有工作项都完成时,该计数器达到零,此时latch.wait()返回并且生产者线程知道工作项已全部处理完毕。

笔记:

  • 闩锁计数是将被处理的工作项的数量,而不是工作线程的数量。
  • count_down()方法可以在每个线程上被调用零次、一次或多次,并且该数量对于不同的线程可能不同。例如,即使您将 7 条消息推送到 7 个线程,也可能所有 7 个项目都在同一个线程上处理(而不是每个线程一个),这很好。
  • 其他不相关的工作项目可以与这些项目交错(例如,因为它们被其他生产者线程推送到队列中),这也很好。
  • 原则上,latch.wait()在所有工作线程完成处理所有工作项之前不会调用它。(这是编写线程代码时需要注意的那种奇怪的情况。)但是没关系,这不是竞争条件:latch.wait()在这种情况下会立即返回。
  • 使用闩锁的另一种方法是,除了此处显示的队列之外,还有另一个队列,其中包含工作项的结果。线程池回调将结果推送到该队列,而生产者线程从中弹出结果。基本上,它与queue此代码中的 方向相反。这也是一个完全有效的策略,事实上,如果有的话它更常见,但在其他情况下闩锁更有用。

障碍

屏障通常用于使所有线程同时等待,以便可以同时操作与所有线程相关的数据。

typedef Fn std::function<void()>;
Fn completionFn = [&procs]() {
    // Do something with the whole vector of Proc objects
};
auto barrier = std::make_shared<std::barrier<Fn>>(worker_count, completionFn);
auto workerFn = [barrier](Proc&) {
    barrier->count_down_and_wait();
};
for (size_t i = 0; i < worker_count; ++i) {
    queue.push_back(workerFn);
}
Run Code Online (Sandbox Code Playgroud)

这是如何工作的:

  1. 所有工作线程将从workerFn队列中弹出这些项目之一并调用barrier.count_down_and_wait().
  2. 一旦所有人都在等待,其中一个会打电话,completionFn()而其他人则继续等待。
  3. 一旦该功能完成,它们都count_down_and_wait()将从队列中返回并可以自由地从队列中弹出其他不相关的工作项。

笔记:

  • 这里的屏障计数是工作线程的数量。
  • 保证每个线程都会workerFn从队列中精确弹出一个并处理它。一旦一个线程从队列中弹出一个,它将等待barrier.count_down_and_wait()直到所有其他副本workerFn其他线程弹出,因此它没有机会弹出另一个。
  • 我使用了一个指向屏障的共享指针,以便在所有工作项完成后它会自动销毁。这不是闩锁的问题,因为我们可以在生产者线程函数中将其设置为局部变量,因为它会等到工作线程使用闩锁(它调用latch.wait())。这里生产者线程不等待屏障,所以我们需要以不同的方式管理内存。
  • 如果您确实希望原始生产者线程等到屏障完成,那很好,它也可以调用count_down_and_wait(),但显然您需要通过worker_count + 1给屏障的构造函数。(然后你就不需要为屏障使用共享指针了。)
  • 如果其他工作项同时被推送到队列中,那也没关系,尽管这可能会浪费时间,因为某些线程将只是坐在那里等待获取屏障,而其他线程则在它们之前被其他工作分心获得屏障。

!!!危险 !!!

关于其他工作被推入队列的最后一个要点是“很好”,只有当其他工作也没有使用障碍时才会出现这种情况!如果您有两个不同的生产者线程将带有屏障的工作项放在同一个队列中,并且这些项目是交错的,那么一些线程将在一个屏障上等待而其他线程在另一个屏障上等待,并且都不会达到所需的等待计数 -死锁. 避免这种情况的一种方法是只在单个线程中使用这样的屏障,或者甚至在整个程序中只使用一个屏障(这听起来很极端,但实际上是一种很常见的策略,因为屏障通常用于一个-启动时的时间初始化)。另一种选择是,如果您使用的线程队列支持它,则立即将屏障的所有工作项以原子方式推送到队列中,这样它们就不会与任何其他工作项交错。(这不适用于moodycamel队列,它支持一次推送多个项目,但不保证它们不会与其他线程推送的项目交错。)

无完成功能的障碍

在您提出这个问题时,建议的实验性 API 不支持完成功能。即使是当前的 API 至少也不允许使用它们,所以我想我应该展示一个示例,说明如何也可以像这样使用屏障。

auto barrier = std::make_shared<std::barrier<>>(worker_count);
auto workerMainFn = [&procs, barrier](Proc&) {
    barrier->count_down_and_wait();
    // Do something with the whole vector of Proc objects
    barrier->count_down_and_wait();
};
auto workerOtherFn = [barrier](Proc&) {
    barrier->count_down_and_wait();  // Wait for work to start
    barrier->count_down_and_wait();  // Wait for work to finish
}
queue.push_back(std::move(workerMainFn));
for (size_t i = 0; i < worker_count - 1; ++i) {
    queue.push_back(workerOtherFn);
}
Run Code Online (Sandbox Code Playgroud)

这是如何工作的:

关键思想是在每个线程中等待屏障两次,并在两者之间进行工作。第一个等待与前面的示例具有相同的目的:它们确保队列中的任何较早的工作项在开始此工作之前已完成。第二次等待确保队列中的任何后续项目在此工作完成之前不会启动。

笔记:

注释与前面的障碍示例大致相同,但有一些不同之处:

  • 一个区别是,因为屏障不依赖于特定的完成函数,你更有可能在多次使用之间共享它,就像我们在闩锁示例中所做的那样,避免使用共享指针。
  • 这个例子看起来像使用没有完成函数的屏障要复杂得多,但这只是因为这种情况不太适合他们。有时,您所需要的只是到达障碍。例如,虽然我们在线程启动之前初始化了一个队列,但也许您为每个线程都有一个队列,但在线程的运行函数中进行了初始化。在这种情况下,障碍可能只是表示队列已经初始化并准备好让其他线程相互传递消息。在这种情况下,您可以使用没有完成功能的屏障,而无需像这样等待两次。
  • 您实际上可以为此使用闩锁,调用count_down()然后wait()代替count_down_and_wait(). 但是使用屏障更有意义,因为调用组合函数更简单,而且使用屏障可以更好地向未来的代码读者传达您的意图。
  • 无论如何,之前的“危险”警告仍然适用。