在单写入器多读取器线程中交换缓冲区

Sha*_*baz 5 c linux algorithm readerwriterlock double-buffering

故事

有一个作家线程,定期从某个地方收集数据(实时,但这在问题中并不重要)。有很多读者然后从这些数据中读取。通常的解决方案是使用两个读写器锁和两个缓冲区,如下所示:

Writer (case 1):
acquire lock 0                        
loop
    write to current buffer
    acquire other lock
    free this lock
    swap buffers
    wait for next period
Run Code Online (Sandbox Code Playgroud)

或者

Writer (case 2):
acquire lock 0                        
loop
    acquire other lock
    free this lock
    swap buffers
    write to current buffer
    wait for next period
Run Code Online (Sandbox Code Playgroud)

问题

在这两种方法中,如果获取其他锁操作失败,则不进行交换并且写入器将覆盖其先前的数据(因为写入器是实时的,它无法等待读取器)因此在这种情况下,所有读取器都会丢失该帧数据的。

不过这没什么大不了的,读者是我自己的代码,他们很短,所以有了双缓冲区,这个问题就解决了,如果有问题,我可以把它变成三重缓冲区(或更多)。

问题是我想最小化的延迟。想象案例1:

writer writes to buffer0                reader is reading buffer1
writer can't acquire lock1              because reader is still reading buffer1
|                                       |
|                                       reader finishes reading,
| (writer waiting for next period)      <- **this point**
|
|
writer wakes up, and again writes to buffer0
Run Code Online (Sandbox Code Playgroud)

**此时**,理论上其他读者可以读取数据,buffer0如果只有作者可以在读者完成后进行交换而不是等待下一个时期。在这种情况下发生的事情是,仅仅因为一个阅读器有点晚,所有阅读器都错过了一帧数据,而问题本来可以完全避免。

情况2类似:

writer writes to buffer0                reader is idle
|                                       |
|                                       reader finishes reading,
| (writer waiting for next period)
|
|                                       reader starts reading buffer1
writer wakes up                         |
it can't acquire lock1                  because reader is still reading buffer1
overwrites buffer0
Run Code Online (Sandbox Code Playgroud)

我尝试混合解决方案,因此作者尝试在写入后立即交换缓冲区,如果不可能,则在下一个时段醒来后。所以像这样:

Writer (case 3):
acquire lock 0                        
loop
    if last buffer swap failed
        acquire other lock
        free this lock
        swap buffers
    write to current buffer
    acquire other lock
    free this lock
    swap buffers
    wait for next period
Run Code Online (Sandbox Code Playgroud)

现在延迟问题仍然存在:

writer writes to buffer0                reader is reading buffer1
writer can't acquire lock1              because reader is still reading buffer1
|                                       |
|                                       reader finishes reading,
| (writer waiting for next period)      <- **this point**
|
|
writer wakes up
swaps buffers
writes to buffer1
Run Code Online (Sandbox Code Playgroud)

再次在**此时**,所有读者都可以开始阅读buffer0,这是buffer0写入后的短暂延迟,但他们必须等到下一个作者的时期。

问题

问题是,我该如何处理?如果我希望编写器在所需的时间段精确执行,则需要使用 RTAI 功能等待时间段,而我不能这样做

Writer (case 4):
acquire lock 0                        
loop
    write to current buffer
    loop a few times or until the buffer has been swapped
        sleep a little
        acquire other lock
        free this lock
        swap buffers
    wait for next period
Run Code Online (Sandbox Code Playgroud)

这引入了抖动。因为“几次”可能会比“等待下一个时期”变得更长,因此作者可能会错过其时期的开始。

为了更清楚,这是我想要发生的事情:

writer writes to buffer0                reader is reading buffer1
|                                       |
|                                       reader finishes reading,
| (writer waiting for next period)      As soon as all readers finish reading,
|                                         the buffer is swapped
|                                       readers start reading buffer0
writer wakes up                         |
writes to buffer1
Run Code Online (Sandbox Code Playgroud)

我已经找到的

我发现read-copy-update据我所知一直为缓冲区分配内存并释放它们直到读者完成它们,这对我来说是不可能的,原因有很多。一,线程在内核和用户空间之间共享。其次,使用 RTAI,您不能在实时线程中分配内存(因为那样您的线程将调用 Linux 的系统调用,从而破坏实时性!(更不用说使用 Linux 自己的 RCU 实现是无用的,因为出于同样的原因)

我还考虑过有一个额外的线程以更高的频率尝试交换缓冲区,但这听起来不是一个好主意。首先,它本身需要与作者同步,其次,我有许多这样的作者-读者并行工作在不同的部分,每个作者一个额外的线程似乎太多了。所有作者的一个线程在与每个作者的同步方面似乎非常复杂。

chi*_*ill 3

您使用什么 API 来实现读写锁?您是否有定时锁,例如pthread_rwlock_timedwrlock?如果是,我认为它可以解决您的问题,如以下代码所示:

void *buf[2];

void
writer ()
{
  int lock = 0, next = 1;

  write_lock (lock);
  while (1)
    {
      abs_time tm = now() + period;

      fill (buf [lock]);
      if (timed_write_lock (next, tm))
        {
          unlock (lock);
          lock = next;
          next = (next + 1) & 1;
        }
      wait_period (tm);
    }
}


void
reader ()
{
  int lock = 0;
  while (1)
    {
      reade_lock (lock);
      process (buf [lock]);
      unlock (lock);
      lock = (lock + 1) & 1;
    }
}
Run Code Online (Sandbox Code Playgroud)

这里发生的情况是,对于写入者来说,等待锁还是等待下一个周期并不重要,只要它确保在下一个周期到来之前醒来即可。绝对超时可以确保这一点。