Tags: c, c++, linux, multithreading, thread-safety
Lately I've been playing around with IPC over shared memory. One thing I've been trying to implement is a simple ring buffer with one process producing and one process consuming. Each process keeps its own sequence number to track its position, and these sequence numbers are updated with atomic operations so that the other process always sees the correct value. The producer blocks once the ring buffer is full. The code is lock-free in the sense that no semaphores or mutexes are used.
Performance-wise I'm getting around 20 million messages per second on a fairly modest VM - quite happy with that :)
What I'm curious about is how "correct" my code is. Can anyone spot any inherent problems / race conditions? Here is my code. Thanks in advance for any comments.
#include <stdlib.h>
#include <stdio.h>
#include <fcntl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <time.h>
#include <unistd.h>
#include <string.h>
#define SHM_ID "/mmap-test"
#define BUFFER_SIZE 4096
#define SLEEP_NANOS 1000 // 1 micro
struct Message
{
long _id;
char _data[128];
};
struct RingBuffer
{
size_t _rseq;
char _pad1[64];
size_t _wseq;
char _pad2[64];
Message _buffer[BUFFER_SIZE];
};
void
producerLoop()
{
int size = sizeof( RingBuffer );
int fd = shm_open( SHM_ID, O_RDWR | O_CREAT, 0600 );
ftruncate( fd, size+1 );
// create shared memory area
RingBuffer* rb = (RingBuffer*)mmap( 0, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0 );
close( fd );
// initialize our sequence numbers in the ring buffer
rb->_wseq = rb->_rseq = 0;
int i = 0;
timespec tss;
tss.tv_sec = 0;
tss.tv_nsec = SLEEP_NANOS;
while( 1 )
{
// as long as the consumer isn't running behind keep producing
while( (rb->_wseq+1)%BUFFER_SIZE != rb->_rseq%BUFFER_SIZE )
{
// write the next entry and atomically update the write sequence number
Message* msg = &rb->_buffer[rb->_wseq%BUFFER_SIZE];
msg->_id = i++;
__sync_fetch_and_add( &rb->_wseq, 1 );
}
// give consumer some time to catch up
nanosleep( &tss, 0 );
}
}
void
consumerLoop()
{
int size = sizeof( RingBuffer );
int fd = shm_open( SHM_ID, O_RDWR, 0600 );
if( fd == -1 ) {
perror( "argh!!!" ); return;
}
// lookup producers shared memory area
RingBuffer* rb = (RingBuffer*)mmap( 0, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0 );
// initialize our sequence numbers in the ring buffer
size_t seq = 0;
size_t pid = -1;
timespec tss;
tss.tv_sec = 0;
tss.tv_nsec = SLEEP_NANOS;
while( 1 )
{
// while there is data to consume
while( seq%BUFFER_SIZE != rb->_wseq%BUFFER_SIZE )
{
// get the next message and validate the id
// id should only ever increase by 1
// quit immediately if not
Message msg = rb->_buffer[seq%BUFFER_SIZE];
if( msg._id != pid+1 ) {
printf( "error: %d %d\n", msg._id, pid ); return;
}
pid = msg._id;
++seq;
}
// atomically update the read sequence in the ring buffer
// making it visible to the producer
__sync_lock_test_and_set( &rb->_rseq, seq );
// wait for more data
nanosleep( &tss, 0 );
}
}
int
main( int argc, char** argv )
{
if( argc != 2 ) {
printf( "please supply args (producer/consumer)\n" ); return -1;
} else if( strcmp( argv[1], "consumer" ) == 0 ) {
consumerLoop();
} else if( strcmp( argv[1], "producer" ) == 0 ) {
producerLoop();
} else {
printf( "invalid arg: %s\n", argv[1] ); return -1;
}
}
At first glance it looks correct to me. I realize you're happy with the performance, but an interesting experiment might be to use something lighter-weight than __sync_fetch_and_add. AFAIK it is a full memory barrier, which is expensive. Since there is a single producer and a single consumer, a release and the corresponding acquire operation should give you better performance. Facebook's Folly library has a single-producer single-consumer queue that uses the new C++11 atomics: https://github.com/facebook/folly/blob/master/folly/ProducerConsumerQueue.h
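A rough idea of what that suggestion could look like, written as a standalone struct rather than a patch to the code above. The names here (AtomicRing, tryWrite, tryRead) are made up for illustration, and it assumes std::atomic<size_t> is lock-free on your platform (lock-free atomics are address-free, so they can also live in shared memory):
#include <atomic>
#include <cstddef>
struct Message
{
long _id;
char _data[128];
};
struct AtomicRing
{
static const size_t SIZE = 4096;
std::atomic<size_t> _rseq;   // consumer position
char _pad1[64];
std::atomic<size_t> _wseq;   // producer position
char _pad2[64];
Message _buffer[SIZE];
// producer side: returns false when the ring is full
bool tryWrite( const Message& msg )
{
size_t w = _wseq.load( std::memory_order_relaxed );  // only the producer writes _wseq
size_t r = _rseq.load( std::memory_order_acquire );  // see the consumer's progress
if( (w+1) % SIZE == r % SIZE )
return false;                                         // full
_buffer[w % SIZE] = msg;                              // plain store of the payload
_wseq.store( w+1, std::memory_order_release );        // publish the slot to the consumer
return true;
}
// consumer side: returns false when the ring is empty
bool tryRead( Message& out )
{
size_t r = _rseq.load( std::memory_order_relaxed );  // only the consumer writes _rseq
size_t w = _wseq.load( std::memory_order_acquire );  // see the producer's published writes
if( r % SIZE == w % SIZE )
return false;                                         // empty
out = _buffer[r % SIZE];
_rseq.store( r+1, std::memory_order_release );        // hand the slot back to the producer
return true;
}
};
On x86 the acquire loads and release stores compile down to plain loads and stores, so the locked instruction that __sync_fetch_and_add generates disappears from the hot path.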