Ben*_*Ben 12 c++ synchronization ipc shared-memory lock-free
请考虑以下情形:
要求:
程序序列(伪代码):
流程A(生产者):
int bufferPos = 0;
while( true )
{
if( isBufferEmpty( bufferPos ) )
{
writeData( bufferPos );
setBufferFull( bufferPos );
bufferPos = ( bufferPos + 1 ) % M;
}
}
Run Code Online (Sandbox Code Playgroud)
流程B(消费者):
int bufferPos = 0;
while( true )
{
if( isBufferFull( bufferPos ) )
{
readData( bufferPos );
setBufferEmpty( bufferPos );
bufferPos = ( bufferPos + 1 ) % M;
}
}
Run Code Online (Sandbox Code Playgroud)
现在这个古老的问题:如何有效地同步它们!?
理想情况下,我想要一个内存屏障的东西,这可以保证所有先前的读/写在所有CPU中都是可见的,沿着这样的方向:
writeData( i );
MemoryBarrier();
//All data written and visible, set flag
setBufferFull( i );
Run Code Online (Sandbox Code Playgroud)
这样,我只需要监视缓冲区标志,然后就可以安全地读取大数据块.
一般来说,我正在寻找像Preshing这里描述的获取/释放栅栏的东西:
http://preshing.com/20130922/acquire-and-release-fences/
(如果我理解正确的话,C++ 11原子只适用于单个进程的线程,而不适用于多个进程.)
然而,GCC自己的内存障碍(__sync_synchronize与编译器屏障asm volatile("":::"内存")相结合)似乎没有按预期工作,因为写入在障碍后变得可见,当我期望他们完成.
任何帮助,将不胜感激...
顺便说一句:在Windows下,使用volatile变量(Microsoft特定行为)可以正常工作......
seh*_*ehe 28
Boost Interprocess支持共享内存.
Boost Lockfree具有Single-Producer Single-Consumer队列类型(spsc_queue).这基本上就是你所说的循环缓冲区.
这是一个string使用此队列以无锁方式传递IPC消息(在本例中为类型)的演示.
首先,让我们定义我们的类型:
namespace bip = boost::interprocess;
namespace shm
{
template <typename T>
using alloc = bip::allocator<T, bip::managed_shared_memory::segment_manager>;
using char_alloc = alloc<char>;
using shared_string = bip::basic_string<char, std::char_traits<char>, char_alloc >;
using string_alloc = alloc<shared_string>;
using ring_buffer = boost::lockfree::spsc_queue<
shared_string,
boost::lockfree::capacity<200>
// alternatively, pass
// boost::lockfree::allocator<string_alloc>
>;
}
Run Code Online (Sandbox Code Playgroud)
为简单起见,我选择演示运行时大小的spsc_queue实现,随机请求200个元素的容量.
所述shared_string的typedef定义了一个串,将透明地从共享存储器段分配,所以它们也是"神奇"与其他进程共享.
这是最简单的,所以:
int main()
{
// create segment and corresponding allocator
bip::managed_shared_memory segment(bip::open_or_create, "MySharedMemory", 65536);
shm::string_alloc char_alloc(segment.get_segment_manager());
shm::ring_buffer *queue = segment.find_or_construct<shm::ring_buffer>("queue")();
Run Code Online (Sandbox Code Playgroud)
这将打开共享内存区域,找到共享队列(如果存在).注意这应该在现实生活中同步.
现在进行实际演示:
while (true)
{
std::this_thread::sleep_for(std::chrono::milliseconds(10));
shm::shared_string v(char_alloc);
if (queue->pop(v))
std::cout << "Processed: '" << v << "'\n";
}
Run Code Online (Sandbox Code Playgroud)
消费者只是无限地监视队列中的待处理作业,并且每个处理一个~10ms.
生产者方面非常相似:
int main()
{
bip::managed_shared_memory segment(bip::open_or_create, "MySharedMemory", 65536);
shm::char_alloc char_alloc(segment.get_segment_manager());
shm::ring_buffer *queue = segment.find_or_construct<shm::ring_buffer>("queue")();
Run Code Online (Sandbox Code Playgroud)
再次,在初始化阶段添加适当的同步.此外,您可能会让生产者负责在适当的时候释放共享内存段.在这个演示中,我只是"让它挂起".这很适合测试,见下文.
那么,制作人做了什么?
for (const char* s : { "hello world", "the answer is 42", "where is your towel" })
{
std::this_thread::sleep_for(std::chrono::milliseconds(250));
queue->push({s, char_alloc});
}
}
Run Code Online (Sandbox Code Playgroud)
是的,生产者在~750ms内准确生成 3条消息,然后退出.
请注意,如果我们这样做(假设一个带有作业控制的POSIX shell):
./producer& ./producer& ./producer&
wait
./consumer&
Run Code Online (Sandbox Code Playgroud)
将"立即"打印3x3消息,同时让消费者继续运行.干
./producer& ./producer& ./producer&
Run Code Online (Sandbox Code Playgroud)
在此之后,将再次显示"涓涓细流"的消息(以3~24毫秒的间隔突发),因为消费者仍然在后台运行
请参阅此要点在线查看完整代码:https://gist.github.com/sehe/9376856
| 归档时间: |
|
| 查看次数: |
11048 次 |
| 最近记录: |