C++中的消费者/生产者

Dan*_* P. 4 c c++ concurrency mutex producer-consumer

这是一个经典的 c/p 问题,其中一些线程生成数据而其他线程读取数据。生产者和消费者都共享一个常量大小的缓冲区。如果缓冲区为空,则消费者必须等待,如果缓冲区已满,则生产者必须等待。我正在使用信号量来跟踪已满或空的队列。生产者将减少空闲位置信号量,增加值,并增加填充槽信号量。所以我试图实现一个程序,从生成器函数中获取一些数字,然后打印出这些数字的平均值。通过将此视为生产者-消费者问题,我试图在程序执行中节省一些时间。generateNumber 函数会导致过程中出现一些延迟,因此我想创建多个生成数字的线程,并将它们放入队列中。然后是“主线程” 正在运行的主函数必须从队列中读取并找到总和,然后求平均值。所以这是我到目前为止所拥有的:

#include <cstdio> 
#include <cstdlib>
#include <time.h>
#include "Thread.h" 
#include <queue> 

int generateNumber() {
    int delayms = rand() / (float) RAND_MAX * 400.f + 200;
    int result = rand() / (float) RAND_MAX * 20;
    struct timespec ts;
    ts.tv_sec = 0;
    ts.tv_nsec = delayms * 1000000;
    nanosleep(&ts, NULL);
    return result; }


struct threadarg {
    Semaphore filled(0);
    Semaphore empty(n);
    std::queue<int> q; };


void* threadfunc(void *arg) {
    threadarg *targp = (threadarg *) arg;
    threadarg &targ = *targp;
    while (targ.empty.value() != 0) {
        int val = generateNumber();
        targ.empty.dec(); 
        q.push_back(val);
        targ.filled.inc(); }
}
int main(int argc, char **argv) {
    Thread consumer, producer;
    // read the command line arguments
    if (argc != 2) {
        printf("usage: %s [nums to average]\n", argv[0]);
        exit(1); }
    int n = atoi(argv[1]);
    // Seed random number generator
    srand(time(NULL));
}
Run Code Online (Sandbox Code Playgroud)

我现在有点困惑,因为我不确定如何在消费者从队列中读取时(即如果 q 不为空)创建多个生成数字的生产者线程(如果 q 未满)。我不确定在主要内容中放什么来实现它。同样在“Thread.h”中,您可以创建线程、互斥锁或信号量。线程有.run(threadFunc, arg)、.join() 等方法。互斥锁可以被锁定或解锁。信号量方法都在我的代码中使用过。

Use*_*ess 7

你的队列没有同步,所以多个生产者可以同时调用push_back,或者消费者同时调用pop_front......这会中断。

完成这项工作的简单方法是使用线程安全队列,它可以是std::queue您已有的包装器,外加一个互斥锁。

您可以首先添加一个互斥锁,并在您转发到的每个呼叫周围锁定/解锁它std::queue- 对于单个消费者应该足够了,对于多个消费者您需要融合front()pop_front()进入单个同步调用。

要让消费者在队列为空时阻塞,您可以向包装器添加条件变量。

这应该足以让您可以在线找到答案 - 下面的示例代码。


template <typename T> class SynchronizedQueue
{
    std::queue<T> queue_;
    std::mutex mutex_;
    std::condition_variable condvar_;

    typedef std::lock_guard<std::mutex> lock;
    typedef std::unique_lock<std::mutex> ulock;

public:
    void push(T const &val)
    {
        lock l(mutex_); // prevents multiple pushes corrupting queue_
        bool wake = queue_.empty(); // we may need to wake consumer
        queue_.push(val);
        if (wake) condvar_.notify_one();
    }

    T pop()
    {
        ulock u(mutex_);
        while (queue_.empty())
            condvar_.wait(u);
        // now queue_ is non-empty and we still have the lock
        T retval = queue_.front();
        queue_.pop();
        return retval;
    }
};
Run Code Online (Sandbox Code Playgroud)

std::mutex“Thread.h”给你的任何原语替换et al。