如何读取与另一个线程共享的 std::queue?

db7*_*638 6 c++ multithreading

我的代码获取图像并处理它们。性能对我的代码至关重要,所以我尝试了多线程。目前,我只将获取部分作为一个单独的线程。我正在使用std::queue存储获取的图像来实现一个简单的 FIFO 缓冲区。采集功能AcquireImages无限期地将原始图像数据写入该缓冲区,直到用户中断。处理函数,ProcessImages读取缓冲区并处理图像数据(当前在主线程中,但我计划在解决问题后将其设为一个单独的线程)。这是我的代码(修改后形成一个MCV 示例):

#include <iostream>
#include <vector>
#include <queue>
#include <atomic>
#include <thread>

#define NUM_CAMERAS 2

void AcquireImages(std::queue<unsigned char*> &rawImageQueue, std::atomic<bool> &quit)
{
    unsigned char* rawImage{};

    while (!quit)
    {
        for (int camera = 0; camera < NUM_CAMERAS; camera++)
        {
            switch (camera)
            {
            case 0:
                rawImage = (unsigned char*)"Cam0Image";
                break;
            case 1:
                rawImage = (unsigned char*)"Cam1Image";
                break;
            default:
                break;
            }
            
            rawImageQueue.push(std::move(rawImage));
        }
    }
}

int ProcessImages(const std::vector<unsigned char*> &rawImageVec, const int count)
{
    // Do something to the raw image vector

    if (count > 10)
    {
        return 1;
    }
    else
    {
        return 0;
    } // In my application, this function only returns non-zero upon user interception.
}


int main()
{
    // Preparation
    std::vector<unsigned char*> rawImageVec;
    rawImageVec.reserve(NUM_CAMERAS);
    std::queue<unsigned char*> rawImageQueue;
    int count{};

    const unsigned int nThreads = 1;    // this might grow later

    std::atomic<bool> loopFlags[nThreads];
    std::thread       threads[nThreads];

    // Start threads
    for (int i = 0; i < nThreads; i++) {
        loopFlags[i] = false;
        threads[i] = std::thread(AcquireImages, rawImageQueue, ref(loopFlags[i]));
    }

    // Process images
    while (true)
    {
    
        // Process the images
        for (int cam{}; cam < NUM_CAMERAS; ++cam)
        {
            rawImageVec.push_back(rawImageQueue.front());
            rawImageQueue.pop();
        }

        int processResult = ProcessImages(move(rawImageVec), count);
        if (processResult)
        {
            std::cout << "Leaving while loop.\n"; // In my application this is triggered by the user
            break;
        }

        rawImageVec.clear();
        ++count;
    }

    // Shutdown other threads
    for (auto & flag : loopFlags) {
        flag = true;
    }

    // Wait for threads to actually finish.
    for (auto& thread : threads) {
        thread.join();
    }

    return 0;
}
Run Code Online (Sandbox Code Playgroud)

你们中的一些人可能已经注意到我的错误。我所知道的是该程序在rawImageVec.push_back(rawImageQueue.front());.

抛出异常后的输出如下:

Debug Assertion Failed!

Program: C:\WINDOWS\SYSTEM32\MSVCP140D.dll
File: c:\program files (x86)\microsoft visual studio 14.0\vc\include\deque
Line: 329

Expression: deque iterator not dereferencable
Run Code Online (Sandbox Code Playgroud)

我了解问题的原因可能是我正在阅读与另一个线程共享的内容(我说得对吗?)。我该如何解决?

我在评论中遵循了 Praetorian 的建议,在检查是否rawImageQueue为空后,我发现它始终为空。我不确定是什么原因造成的。

Jos*_*osh 7

这是共享队列上的生产者/消费者的通用示例。这个想法是,如果您正在写入和读取数据结构,则需要某种针对访问的保护。

为此,下面的示例使用条件变量和互斥体。

#include <thread>
#include <iostream>
#include <chrono>
#include <queue>
#include <mutex>
#include <vector>
#include <condition_variable>

using namespace std::chrono_literals;
using std::vector;
using std::thread;
using std::unique_lock;
using std::mutex;
using std::condition_variable;
using std::queue;

class WorkQueue
{
  condition_variable work_available;
  mutex work_mutex;
  queue<int> work;

public:
  void push_work(int item)
  {
    unique_lock<mutex> lock(work_mutex);

    bool was_empty = work.empty();
    work.push(item);

    lock.unlock();

    if (was_empty)
    {
      work_available.notify_one();
    }    
  }

  int wait_and_pop()
  {
    unique_lock<mutex> lock(work_mutex);
    while (work.empty())
    {
      work_available.wait(lock);
    }

    int tmp = work.front();
    work.pop();
    return tmp;
  }
};

int main() {
  WorkQueue work_queue;

  auto producer = [&]() {
    while (true) {
      work_queue.push_work(10);
      std::this_thread::sleep_for(2ms);
    }
  };

  vector<thread> producers;
  producers.push_back(std::thread(producer));
  producers.push_back(std::thread(producer));
  producers.push_back(std::thread(producer));
  producers.push_back(std::thread(producer));

  std::thread consumer([&]() {        
    while (true)
    {
      int work_to_do = work_queue.wait_and_pop();
      std::cout << "Got some work: " << work_to_do << std::endl;
    }
  });

  std::for_each(producers.begin(), producers.end(), [](thread &p) {
    p.join();
  });    

  consumer.join();  
}
Run Code Online (Sandbox Code Playgroud)