mbr*_*brt 9 c++ multithreading lock-free c++11
我正在尝试实现一个使用两个线程的类:一个用于生产者,一个用于消费者.当前实现不使用锁:
#include <boost/lockfree/spsc_queue.hpp>
#include <atomic>
#include <thread>
using Queue =
boost::lockfree::spsc_queue<
int,
boost::lockfree::capacity<1024>>;
class Worker
{
public:
Worker() : working_(false), done_(false) {}
~Worker() {
done_ = true; // exit even if the work has not been completed
worker_.join();
}
void enqueue(int value) {
queue_.push(value);
if (!working_) {
working_ = true;
worker_ = std::thread([this]{ work(); });
}
}
void work() {
int value;
while (!done_ && queue_.pop(value)) {
std::cout << value << std::endl;
}
working_ = false;
}
private:
std::atomic<bool> working_;
std::atomic<bool> done_;
Queue queue_;
std::thread worker_;
};
Run Code Online (Sandbox Code Playgroud)
应用程序需要将工作项排入一定时间,然后等待事件休眠.这是模拟行为的最小主要:
int main()
{
Worker w;
for (int i = 0; i < 1000; ++i)
w.enqueue(i);
std::this_thread::sleep_for(std::chrono::seconds(1));
for (int i = 0; i < 1000; ++i)
w.enqueue(i);
std::this_thread::sleep_for(std::chrono::seconds(1));
}
Run Code Online (Sandbox Code Playgroud)
我很确定我的实现有问题:如果工作线程完成并且在执行之前working_ = false,另一个enqueue是什么呢?是否可以在不使用锁的情况下使我的代码线程安全?
解决方案要求:
我Worker根据你的建议做了另一个类的实现.这是我的第二次尝试:
class Worker
{
public:
Worker()
: working_(ATOMIC_FLAG_INIT), done_(false) { }
~Worker() {
// exit even if the work has not been completed
done_ = true;
if (worker_.joinable())
worker_.join();
}
bool enqueue(int value) {
bool enqueued = queue_.push(value);
if (!working_.test_and_set()) {
if (worker_.joinable())
worker_.join();
worker_ = std::thread([this]{ work(); });
}
return enqueued;
}
void work() {
int value;
while (!done_ && queue_.pop(value)) {
std::cout << value << std::endl;
}
working_.clear();
while (!done_ && queue_.pop(value)) {
std::cout << value << std::endl;
}
}
private:
std::atomic_flag working_;
std::atomic<bool> done_;
Queue queue_;
std::thread worker_;
};
Run Code Online (Sandbox Code Playgroud)
我介绍worker_.join()了enqueue方法内部.这可能会影响性能,但在非常罕见的情况下(当队列变空并且在线程退出之前,另一个enqueue出现).该working_变量现在是一个atomic_flag设置enqueue和清除的变量work.需要Additional whileafter working_.clear(),因为如果推送了另一个值,则在之前clear,但在之后while,不处理该值.
这个实现是否正确?
我做了一些测试,实现似乎工作.
OT:将它作为编辑或答案更好吗?
这是我对问题的解答。我不太喜欢回答自己,但我认为展示实际代码可能会对其他人有所帮助。
#include <boost/lockfree/spsc_queue.hpp>
#include <atomic>
#include <thread>
// I used this semaphore class: https://gist.github.com/yohhoy/2156481
#include "binsem.hpp"
using Queue =
boost::lockfree::spsc_queue<
int,
boost::lockfree::capacity<1024>>;
class Worker
{
public:
// the worker thread starts in the constructor
Worker()
: working_(ATOMIC_FLAG_INIT), done_(false), semaphore_(0)
, worker_([this]{ work(); })
{ }
~Worker() {
// exit even if the work has not been completed
done_ = true;
semaphore_.signal();
worker_.join();
}
bool enqueue(int value) {
bool enqueued = queue_.push(value);
if (!working_.test_and_set())
// signal to the worker thread to wake up
semaphore_.signal();
return enqueued;
}
void work() {
int value;
// the worker thread continue to live
while (!done_) {
// wait the start signal, sleeping
semaphore_.wait();
while (!done_ && queue_.pop(value)) {
// perform actual work
std::cout << value << std::endl;
}
working_.clear();
while (!done_ && queue_.pop(value)) {
// perform actual work
std::cout << value << std::endl;
}
}
}
private:
std::atomic_flag working_;
std::atomic<bool> done_;
binsem semaphore_;
Queue queue_;
std::thread worker_;
};
Run Code Online (Sandbox Code Playgroud)
我尝试了@Cameron 的建议,不要关闭线程并添加信号量。enqueue这实际上仅在第一个和最后一个中使用work。这不是无锁的,但仅限于这两种情况。
我在之前的版本(请参阅我编辑的问题)和这个版本之间进行了一些性能比较。当开始和停止次数不多时,没有显着差异。然而,enqueue当它必须使用signal工作线程而不是启动一个新线程时,速度要快 10 倍。这是一个罕见的情况,所以不是很重要,但无论如何它是一个改进。
该实现满足:
enqueue和work很忙时);enqueue| 归档时间: |
|
| 查看次数: |
1559 次 |
| 最近记录: |