我需要在C++程序中并行化一些任务,并且对并行编程来说是全新的.到目前为止,我通过互联网搜索取得了一些进展,但现在有点卡住了.我想在循环中重用一些线程,但显然不知道如何做我正在尝试的东西.
我从计算机上的两个ADC卡(并行获取)中获取数据,然后我需要对收集的数据(并行处理)执行一些操作,同时收集下一批数据.这是一些伪代码来说明
//Acquire some data, wait for all the data to be acquired before proceeding
std::thread acq1(AcquireData, boardHandle1, memoryAddress1a);
std::thread acq2(AcquireData, boardHandle2, memoryAddress2a);
acq1.join();
acq2.join();
while(user doesn't interrupt)
{
//Process first batch of data while acquiring new data
std::thread proc1(ProcessData,memoryAddress1a);
std::thread proc2(ProcessData,memoryAddress2a);
acq1(AcquireData, boardHandle1, memoryAddress1b);
acq2(AcquireData, boardHandle2, memoryAddress2b);
acq1.join();
acq2.join();
proc1.join();
proc2.join();
/*Proceed in this manner, alternating which memory address
is written to and being processed until the user interrupts the program.*/
}
Run Code Online (Sandbox Code Playgroud)
这是它的主要要点.循环的下一次运行将写入"a"内存地址,同时处理"b"数据并继续交替(我可以获取代码来执行此操作,只是将其取出以防止混乱问题).
无论如何,问题(我确信有些人已经知道)是我第二次尝试使用acq1和acq2时,编译器(VS2012)说"IntelliSense:没有适当的操作符调用类类型的对象( )或转换函数到指针到函数类型".同样,如果我将std :: thread再次放在acq1和acq2之前,它会显示"错误C2374:'acq1':重新定义;多次初始化".
所以问题是,我可以在完成上一个任务后将线程重新分配给新任务吗?我总是等待先前使用线程结束再次调用之前,但我不知道如何重新分配线程,因为它在循环中,我不能每次都创建一个新线程(或者如果我可能,这似乎是浪费和不必要的,但我可能会弄错).
提前致谢
我一直试图让一个项目摆脱每个boost参考并切换到纯C++ 11.
有一次,创建了线程工作者,它等待一个障碍来给出'go'命令,完成工作(通过N个线程传播)并在所有这些完成时同步.基本思想是主循环给出了go命令(boost :: barrier .wait())并等待具有相同函数的结果.
我在一个不同的项目中实现了一个基于Boost版本的定制Barrier,一切都运行得很好.实施如下:
Barrier.h:
class Barrier {
public:
Barrier(unsigned int n);
void Wait(void);
private:
std::mutex counterMutex;
std::mutex waitMutex;
unsigned int expectedN;
unsigned int currentN;
};
Run Code Online (Sandbox Code Playgroud)
Barrier.cpp
Barrier::Barrier(unsigned int n) {
expectedN = n;
currentN = expectedN;
}
void Barrier::Wait(void) {
counterMutex.lock();
// If we're the first thread, we want an extra lock at our disposal
if (currentN == expectedN) {
waitMutex.lock();
}
// Decrease thread counter
--currentN;
if (currentN == 0) {
currentN = expectedN;
waitMutex.unlock(); …Run Code Online (Sandbox Code Playgroud) 我使用生产者 - 消费者模型用pthread编写了一个多线程程序.
当我使用英特尔VTune分析器来分析我的程序时,我发现生产者和消费者在pthread_mutex_unlock上花了很多时间.我不明白为什么会这样.我认为线程可能需要等待很长时间才能获得互斥锁,但释放互斥锁应该很快,对吧?
下面的快照来自英特尔VTune.它显示了消费者尝试从缓冲区中获取项目的代码,以及每个代码行消耗的时间.
我的问题是为什么pthread_mutex_unlock有这样的开销?是pthread互斥体本身的问题还是我使用它的方式?

在用C++ 11编写的分布式作业系统中,我使用以下结构实现了一个fence(即工作线程池外部的线程可能会要求阻塞,直到完成所有当前计划的作业):
struct fence
{
std::atomic<size_t> counter;
std::mutex resume_mutex;
std::condition_variable resume;
fence(size_t num_threads)
: counter(num_threads)
{}
};
Run Code Online (Sandbox Code Playgroud)
实现fence的代码如下所示:
void task_pool::fence_impl(void *arg)
{
auto f = (fence *)arg;
if (--f->counter == 0) // (1)
// we have zeroed this fence's counter, wake up everyone that waits
f->resume.notify_all(); // (2)
else
{
unique_lock<mutex> lock(f->resume_mutex);
f->resume.wait(lock); // (3)
}
}
Run Code Online (Sandbox Code Playgroud)
如果线程在一段时间内进入围栏,这种方法非常有效.然而,如果他们几乎同时尝试这样做,似乎有时会发生在原子递减(1)和开始条件var(3)的等待之间,线程产生CPU时间而另一个线程将计数器递减到零( 1)并解雇cond.var(2).这导致前一个线程在(3)中永远等待,因为它已经被通知后开始等待它.
让事情变得可行的黑客就是在(2)之前进行10毫秒的睡眠,但这显然是不可接受的.
有关如何以高效的方式解决这个问题的任何建议?
我在www.cppreference.com上找到了以下关于条件变量的示例,http: //en.cppreference.com/w/cpp/thread/condition_variable .对cv.notify_one()的调用放在锁外.我的问题是,如果在保持锁定的同时进行调用以保证等待线程实际上处于等待状态并且将接收通知信号.
#include <iostream>
#include <string>
#include <thread>
#include <mutex>
#include <condition_variable>
std::mutex m;
std::condition_variable cv;
std::string data;
bool ready = false;
bool processed = false;
void worker_thread()
{
// Wait until main() sends data
std::unique_lock<std::mutex> lk(m);
cv.wait(lk, []{return ready;});
// after the wait, we own the lock.
std::cout << "Worker thread is processing data\n";
data += " after processing";
// Send data back to main()
processed = true;
std::cout << "Worker thread signals data processing completed\n";
// …Run Code Online (Sandbox Code Playgroud) 下面是一个 C++17 代码片段,其中一个线程等待另一个线程到达某一阶段:
std::condition_variable cv;
std::atomic<bool> ready_flag{false};
std::mutex m;
// thread 1
... // start a thread, then wait for it to reach certain stage
auto lock = std::unique_lock(m);
cv.wait(lock, [&]{ return ready_flag.load(std::memory_order_acquire); });
// thread 2
... // modify state, etc
ready_flag.store(true, std::memory_order_release);
std::lock_guard{m}; // NOTE: this is lock immediately followed by unlock
cv.notify_all();
Run Code Online (Sandbox Code Playgroud)
据我了解,这是使用原子标志和条件变量来实现目标的有效方法。例如不需要使用std::memory_order_seq_cst。
是否可以进一步放宽此代码?例如:
std::memory_order_relaxed在ready_flag.load()std::atomic_thread_fence()而不是std::lock_guard{m};我实现了一个类,该类使我可以将线程与条件变量进行同步。我发现有关notify_all应该在锁内还是在锁外完成的信息相互矛盾。我发现示例是双向构造的。
首先释放锁的参数是为了防止等待的线程在被通知释放后立即在互斥体上阻塞。
反对先释放锁的论点是断言,等待线程可能会丢失通知。
释放功能的两个版本在这里:
// version 1 - unlock then notify.
void release(int address = 1)
{
{
std::lock_guard<std::mutex> lk(_address_mutex);
_address = address;
}
_cv.notify_all();
}
// version 2 - notify then unlock
void release(int address = 1)
{
std::lock_guard<std::mutex> lk(_address_mutex);
_address = address;
_cv.notify_all();
}
Run Code Online (Sandbox Code Playgroud)
供参考,等待代码如下所示:
bool wait(const std::chrono::microseconds dur, int address = 1)
{
std::unique_lock<std::mutex> lk(_address_mutex);
if (_cv.wait_for(lk, dur, [&] {return _address == address; }))
{
_address = 0;
return true;
}
return false;
}
Run Code Online (Sandbox Code Playgroud)
是否存在等待线程丢失版本1中的通知的风险,在该版本中,互斥对象被允许在notify_all之前超出作用域?如果是这样,它将如何发生?(这对我来说并不明显,这是如何导致错过通知的。)
我可以清楚地看到在通知过程中保持互斥锁处于锁定状态如何导致等待线程立即进入等待状态。但这是一个小小的代价,如果它可以防止错过通知。
我对conditions_variables如何使用它们(安全地)感到困惑。在我的应用程序中,我有一个创建 gui 线程的类,但是当 gui 由 gui 线程构建时,主线程需要等待。
情况与下面的函数相同。主线程创建互斥锁、锁和condition_variable. 然后它使线程。虽然这worker thread还没有通过某个点(此处打印数字),但不允许主线程继续(即必须wait打印所有数字)。
condition_variables在这种情况下如何正确使用?另外,我读到自发唤醒是一个问题。我该如何处理它们?
int main()
{
std::mutex mtx;
std::unique_lock<std::mutex> lck(mtx);
std::condition_variable convar;
auto worker = std::thread([&]{
/* Do some work. Main-thread can not continue. */
for(int i=0; i<100; ++i) std::cout<<i<<" ";
convar.notify_all(); // let main thread continue
std::cout<<"\nworker done"<<std::endl;
});
// The main thread can do some work but then must wait until the worker has done it's calculations.
/* do some stuff …Run Code Online (Sandbox Code Playgroud) c++ ×6
c++11 ×2
barrier ×1
c++17 ×1
concurrency ×1
intel-vtune ×1
loops ×1
mutex ×1
pthreads ×1
reusability ×1
stl ×1