Raf*_*cio 0 c++ multithreading synchronization mutex locking
我会做一个假设的场景,只是为了清楚我需要知道什么。
假设我有一个经常更新的文件。
我需要通过几个不同的线程读取和解析这个文件。
每次重写此文件时,我都会唤醒一个条件互斥锁,以便其他线程可以为所欲为。
我的问题是:
如果我有 10000 个线程,第一个线程执行会阻塞其他 9999 个线程的执行吗?
它是并行工作还是同步工作?
自 Jonathan Wakely 首次发布以来,这篇文章已经进行了编辑,以解决下面的评论,并更好地区分条件变量、条件(在第一个版本中都称为条件)以及等待函数的运行方式。然而,同样重要的是探索来自现代 C++ using std::future、std::thread和的更好方法,并std::packaged_task讨论了一些关于缓冲和合理线程数的问题。
首先,10,000 个线程是很多线程。除了最高性能的计算机之外,线程调度程序将承受很大的负担。Windows 下的典型四核工作站会很困难。这表明某种任务的排队调度是有序的,典型的服务器使用大约 10 个线程接受数千个连接,每个服务 1,000 个连接。线程的数量对于这个问题来说确实不重要,但是在如此大量的任务中,10,000 个线程是不切实际的。
为了处理同步,互斥锁本身实际上并没有按照您的建议执行。您所描述的概念是一种事件对象,可能是自动重置事件,它本身就是一个更高级别的概念。Windows 将它们作为其 API 的一部分,但它们是在 Linux(通常用于便携式软件)上设计的,具有两个原始组件,一个互斥锁和一个条件变量。这些一起创建了自动重置事件,以及 Windows 调用它们的其他类型的“等待事件”。在 C++ 中,这些由std::mutex和提供std::condition_variable。
互斥体本身仅提供对公共资源的锁定控制。在这种情况下,我们不是在考虑客户端和服务器(或工人和执行人员),而是考虑对等点之间对单个资源的竞争,该资源只能由一个参与者(线程)在同一时间访问时间。互斥锁可以阻止执行,但它不会根据外部信号释放。如果另一个线程锁定了互斥锁,则互斥锁会阻塞,并无限期地等待直到锁的所有者释放它。这不是您在问题中提出的场景。
在您的场景中,有许多“客户端”和一个“服务器”线程。服务器负责发出信号,表明某事已准备好进行处理。在这个设计中,所有其他线程都是客户端(线程本身并没有使它们成为客户端,我们只是通过它们执行的函数来认为它们是客户端)。在一些讨论中,客户端被称为工作线程。
客户端使用互斥量/条件变量对来等待信号。此构造通常采用锁定互斥锁的形式,然后使用该互斥锁等待条件变量。当线程进入wait条件变量时,互斥锁被解锁。对等待工作完成的所有客户端线程重复此操作。典型的客户端等待示例是:
std::mutex m;
std::condition_variable cv;
void client_thread()
{
// Wait until server signals data is ready
std::unique_lock<std::mutex> lk(m); // lock the mutex
cv.wait(lk); // wait on cv
// do the work
}
Run Code Online (Sandbox Code Playgroud)
这是显示一起使用的互斥锁/条件变量的伪代码。std::condition_variable有两个等待函数的重载,这是最简单的一个。目的是线程将阻塞,进入空闲状态,直到通知 condition_variable。它并不打算作为一个完整的例子,只是指出这两个对象是一起使用的。
Johnathan Wakely 下面的评论是基于wait并非无限期的事实;不能保证呼叫被解除阻塞的原因是信号。文档将其称为“虚假唤醒”,由于操作系统调度的复杂原因偶尔会发生这种情况。Johnathan 提出的观点是,即使唤醒不是因为 condition_variable 被发出信号,使用这对代码的代码也必须是安全的。
在使用条件变量的说法中,这称为条件(而不是条件变量)。条件是应用程序定义的概念,在文献中通常以布尔值表示,并且通常是检查布尔值、整数(有时是原子类型)或调用返回布尔值的函数的结果。有时应用程序定义的真正条件的概念更复杂,但条件的整体效果是确定线程一旦被唤醒,是应该继续处理,还是应该简单地重复等待。
满足此要求的一种方法是 std::condition_variable::wait 的第二个版本。两者声明如下:
void wait( std::unique_lock<std::mutex>& lock );
template< class Predicate >
void wait( std::unique_lock<std::mutex>& lock, Predicate pred );
Run Code Online (Sandbox Code Playgroud)
Johnathan 的观点是坚持使用第二个版本。但是,文档描述(并且有两个重载的事实表明)谓词是可选的。Predicate 是某种函子,通常是 lambda 表达式,如果等待应该解除阻塞,则解析为 true,如果等待应该继续等待,则解析为 false,并且在锁定下进行评估。Predicate 与 condition 同义,因为 Predicate 是一种指示关于等待是否应该解除阻塞的真或假的方式。
尽管 Predicate 实际上是可选的,但“等待”在阻塞直到接收到信号时并不完美的概念要求如果使用第一个版本,那是因为应用程序的构造使得虚假唤醒没有任何后果(事实上,是设计的一部分)。
Jonathan 的引用表明 Predicate 是在锁定状态下评估的,但通常是不切实际的范式的泛化形式。std::condition_variable 必须等待锁定的 std::mutex,这可能会保护定义条件的变量,但有时这是不可能的。有时条件更加复杂、外部或微不足道,以至于 std::mutex 与条件无关。
为了了解在建议的解决方案的上下文中它是如何工作的,假设有 10 个客户端线程在等待服务器发出工作要完成的信号,并且该工作作为虚拟函子的容器被安排在队列中。一个虚拟函子可能是这样的:
struct VFunc
{
virtual void operator()(){}
};
template <typename T>
struct VFunctor
{
// Something referring to T, possible std::function
virtual void operator()(){...call the std::function...}
};
typedef std::deque< VFunc > Queue;
Run Code Online (Sandbox Code Playgroud)
上面的伪代码建议了一个带有虚拟 operator() 的典型函子,它返回 void 并且不带任何参数,有时被称为“盲调用”。提出这一点的关键是 Queue 可以在不知道调用什么的情况下拥有这些集合,并且 Queue 中的任何 VFunctor 都可以引用 std::function 可能能够调用的任何东西,其中包括其他对象的成员函数, lambdas, 简单函数等。 然而,如果只有一个函数签名被调用,也许:
typedef std::deque< std::function<void(void)>> Queue
Run Code Online (Sandbox Code Playgroud)
足够了。
对于任何一种情况,只有当队列中有条目时才能完成工作。
等待,人们可能会使用这样的类:
class AutoResetEvent
{
private:
std::mutex m;
std::condition_variable cv;
bool signalled;
bool signalled_all;
unsigned int wcount;
public:
AutoResetEvent() : wcount( 0 ), signalled(false), signalled_all(false) {}
void SignalAll() { std::unique_lock<std::mutex> l(m);
signalled = true;
signalled_all = true;
cv.notify_all();
}
void SignalOne() { std::unique_lock<std::mutex> l(m);
signalled = true;
cv.notify_one();
}
void Wait() { std::unique_lock<std::mutex> l(m);
++wcount;
while( !signalled )
{
cv.wait(l);
}
--wcount;
if ( signalled_all )
{ if ( wcount == 0 )
{ signalled = false;
signalled_all = false;
}
}
else { signalled = false;
}
}
};
Run Code Online (Sandbox Code Playgroud)
这是可等待对象的标准重置事件类型的伪代码,与 WindowsCreateEvent和WaitForSingleObjectAPI兼容,功能基本相同。
所有客户端线程都以 cv.wait 结束(这在 Windows 中可能有超时,使用 Windows API,但不能使用std::condition_variable)。在某些时候,服务器通过调用 Signalxxx 来通知事件。你的情况表明SignalAll()。
如果notify_one 被调用,等待线程之一被释放,所有其他线程保持休眠状态。的notify_all 被调用,然后所有等待该条件的线程被释放去做工作。
以下可能是使用 AutoResetEvent 的示例:
AutoResetEvent evt; // probably not a global
void client()
{
while( !Shutdown ) // assuming some bool to indicate shutdown
{
if ( IsWorkPending() ) DoWork();
evt.Wait();
}
}
void server()
{
// gather data
evt.SignalAll();
}
Run Code Online (Sandbox Code Playgroud)
IsWorkPending()正如 Jonathan Wakely 所指出的,使用满足条件的概念。在指示关闭之前,如果工作处于挂起状态,则此循环将处理工作,否则等待信号。虚假唤醒没有负面影响。可能会通过一个使用 std::mutex 或其他一些同步机制保护 Queue 的对象进行IsWorkPending()检查Queue.size()。如果工作挂起,DoWork()将按顺序从队列中弹出条目,直到队列为空。返回时,循环将再次等待信号。
综上所述,mutex 和 condition_variable 的组合与旧的思维方式有关,现在在 C++11/C++14 时代已经过时了。除非您在使用兼容编译器时遇到问题,否则最好调查 std::promise、std::future 和 std::async 或 std::thread 与 std::packaged_task 的使用。例如,使用 future、promise、packaged_task 和 thread 可以完全取代上面的讨论。
例如:
// a function for threads to execute
int func()
{
// do some work, return status as result
return result;
}
Run Code Online (Sandbox Code Playgroud)
假设 func 完成您对文件所需的工作,则这些 typedef 适用:
typedef std::packaged_task< int() > func_task;
typedef std::future< int > f_int;
typedef std::shared_ptr< f_int > f_int_ptr;
typedef std::vector< f_int_ptr > f_int_vec;
Run Code Online (Sandbox Code Playgroud)
std::future 无法复制,因此使用 shared_ptr 存储它以便于在向量中使用,但有多种解决方案。
接下来,将这些用于 10 个工作线程的示例
void executive_function()
{
// a vector of future pointers
f_int_vec future_list;
// start some threads
for( int n=0; n < 10; ++n )
{
// a packaged_task calling func
func_task ft( &func );
// get a future from the task as a shared_ptr
f_int_ptr future_ptr( new f_int( ft.get_future() ) );
// store the task for later use
future_list.push_back( future_ptr );
// launch a thread to call task
std::thread( std::move( ft )).detach();
}
// at this point, 10 threads are running
for( auto &d : future_list )
{
// for each future pointer, wait (block if required)
// for each thread's func to return
d->wait();
// get the result of the func return value
int res = d->get();
}
}
Run Code Online (Sandbox Code Playgroud)
这里的重点实际上是在最后一个 range-for 循环中。该向量存储了由 packaged_tasks 提供的期货。这些任务用于启动线程,未来是同步执行的关键。一旦所有线程都在运行,每个线程都通过对未来的等待函数的简单调用来“等待”,之后可以获得 func 的返回值。不涉及互斥体或条件变量(我们知道)。
这让我想到了并行处理文件的主题,无论您如何启动多个线程。如果有一台可以处理 10,000 个线程的机器,那么如果每个线程都是面向文件的琐碎操作,就会有大量 RAM 资源专门用于文件处理,所有这些资源都会相互复制。根据选择的 API,每个读取操作都有关联的缓冲区。
假设文件大小为 10 MB,有 10,000 个线程开始对其进行操作,其中每个线程使用 4 KB 缓冲区进行处理。结合起来,这表明将有 40 MB 的缓冲区来处理 10 MB 的文件。简单地将文件读入 RAM 并提供对 RAM 中所有线程的只读访问会减少浪费。
如果磁盘缓存跟不上,那么在不同时间从文件的各个部分读取的多个任务可能会导致标准硬盘(对于闪存源并非如此)的严重抖动,这一事实进一步复杂化了这一概念。但更重要的是,10,000 个线程都在调用系统 API 来读取文件,每个线程都有相当大的开销。
如果源材料是完全读入 RAM 的候选者,则线程可以专注于 RAM 而不是文件,从而减轻开销,提高性能。线程可以在没有锁的情况下共享对内容的读取访问。
如果源文件太大而无法完全读取到 RAM 中,最好还是按源文件的块读取它,让线程从共享内存资源处理该部分,然后移动到一系列中的下一个块。