如何在C++中正确等待条件变量?

Dov*_*Dov 1 c++ asynchronous condition-variable wait

尝试在 Linux 下用 C++ 创建异步 I/O 文件读取器。我的例子有两个缓冲区。第一个读取块。然后,每次主循环时,我都会异步启动 IO 并调用process()它来运行当前块的模拟处理。处理完成后,我们等待条件变量。这个想法是异步处理程序应该通知条件变量。

不幸的是,这notify似乎发生在之前wait,而且这似乎不是条件变量wait()函数的工作方式。我应该如何重写代码,以便循环等待异步 io 完成?

#include <aio.h>
#include <fcntl.h>
#include <signal.h>
#include <unistd.h>

#include <condition_variable>
#include <cstring>
#include <iostream>
#include <thread>

using namespace std;
using namespace std::chrono_literals;

constexpr uint32_t blockSize = 512;

mutex readMutex;

condition_variable cv;

int fh;
int bytesRead;

void process(char* buf, uint32_t bytesRead) {
  cout << "processing..." << endl;
  usleep(100000);
}

void aio_completion_handler(sigval_t sigval) {
  struct aiocb* req = (struct aiocb*)sigval.sival_ptr;

  // check whether asynch operation is complete
  if (aio_error(req) == 0) {
    int ret = aio_return(req);
        bytesRead = req->aio_nbytes;
    cout << "ret == " << ret << endl;
    cout << (char*)req->aio_buf << endl;
  }
    {
        unique_lock<mutex> readLock(readMutex);
        cv.notify_one();
    }
}

void thready() {
  char* buf1 = new char[blockSize];
  char* buf2 = new char[blockSize];
  aiocb cb;
  char* processbuf = buf1;
  char* readbuf = buf2;
  fh = open("smallfile.dat", O_RDONLY);
  if (fh < 0) {
    throw std::runtime_error("cannot open file!");
  }

  memset(&cb, 0, sizeof(aiocb));
  cb.aio_fildes = fh;
  cb.aio_nbytes = blockSize;
  cb.aio_offset = 0;

  // Fill in callback information
  /*
  Using SIGEV_THREAD to request a thread callback function as a notification
  method
  */
  cb.aio_sigevent.sigev_notify_attributes = nullptr;
  cb.aio_sigevent.sigev_notify = SIGEV_THREAD;
  cb.aio_sigevent.sigev_notify_function = aio_completion_handler;
  /*
  The context to be transmitted is loaded into the handler (in this case, a
  reference to the aiocb request itself). In this handler, we simply refer to
  the arrived sigval pointer and use the AIO function to verify that the request
  has been completed.
  */
  cb.aio_sigevent.sigev_value.sival_ptr = &cb;

  int currentBytesRead = read(fh, buf1, blockSize);  // read the 1st block

  while (true) {
    cb.aio_buf = readbuf;
    aio_read(&cb);  // each next block is read asynchronously
    process(processbuf, currentBytesRead);  // process while waiting
        {
            unique_lock<mutex> readLock(readMutex);
            cv.wait(readLock);
        }
        currentBytesRead = bytesRead; // make local copy of global modified by the asynch code
    if (currentBytesRead < blockSize) {
      break;  // last time, get out
    }
    cout << "back from wait" << endl;
    swap(processbuf, readbuf);     // switch to other buffer for next time
    currentBytesRead = bytesRead;  // create local copy
  }

  delete[] buf1;
  delete[] buf2;
}

int main() {
  try {
    thready();
  } catch (std::exception& e) {
    cerr << e.what() << '\n';
  }
  return 0;
}
Run Code Online (Sandbox Code Playgroud)

And*_*zel 7

条件变量通常应该用于

  • 等待,直到谓词(例如共享变量)可能发生更改,并且
  • 通知等待线程谓词可能已更改,以便等待线程应再次检查谓词。

但是,您似乎试图使用条件变量本身的状态作为谓词。这不是条件变量的使用方式,可能会导致竞争条件,例如您的问题中描述的情况。始终检查谓词的另一个原因是条件变量可能导致虚假唤醒。

在你的情况下,创建一个共享变量可能是合适的

bool operation_completed = false;
Run Code Online (Sandbox Code Playgroud)

并使用该变量作为条件变量的谓词。对该变量的访问应始终由互斥体控制。

然后你可以改变线路

{
    unique_lock<mutex> readLock(readMutex);
    cv.notify_one();
}
Run Code Online (Sandbox Code Playgroud)

{
    unique_lock<mutex> readLock(readMutex);
    operation_completed = true;
    cv.notify_one();
}
Run Code Online (Sandbox Code Playgroud)

并更改线路

{
    unique_lock<mutex> readLock(readMutex);
    cv.wait(readLock);
}
Run Code Online (Sandbox Code Playgroud)

到:

{
    unique_lock<mutex> readLock(readMutex);
    while ( !operation_completed )
        cv.wait(readLock);
}
Run Code Online (Sandbox Code Playgroud)

代替

while ( !operation_completed )
    cv.wait(readLock);
Run Code Online (Sandbox Code Playgroud)

你也可以写

cv.wait( readLock, []{ return operation_completed; } );
Run Code Online (Sandbox Code Playgroud)

这是等价的。请参阅 的文档std::condition_varible::wait以获取更多信息。

当然,operation_completed也应该在适当的时候设置回false,同时互斥体被锁定。

  • 比 `while (!operation_completed)` 循环更惯用的是 `cv.wait(readLock, []{ return opera_completed; });` (3认同)
  • @user4581301:感谢您指出这一点。我已将您的评论纳入我的答案中。不过,我不会删除 `while` 循环,因为在这种情况下,向 OP 展示“幕后”发生的情况很重要。 (2认同)