Dan*_* P. 4 c c++ concurrency mutex producer-consumer
这是一个经典的 c/p 问题,其中一些线程生成数据而其他线程读取数据。生产者和消费者都共享一个常量大小的缓冲区。如果缓冲区为空,则消费者必须等待,如果缓冲区已满,则生产者必须等待。我正在使用信号量来跟踪已满或空的队列。生产者将减少空闲位置信号量,增加值,并增加填充槽信号量。所以我试图实现一个程序,从生成器函数中获取一些数字,然后打印出这些数字的平均值。通过将此视为生产者-消费者问题,我试图在程序执行中节省一些时间。generateNumber 函数会导致过程中出现一些延迟,因此我想创建多个生成数字的线程,并将它们放入队列中。然后是“主线程” 正在运行的主函数必须从队列中读取并找到总和,然后求平均值。所以这是我到目前为止所拥有的:
#include <cstdio>
#include <cstdlib>
#include <time.h>
#include "Thread.h"
#include <queue>
int generateNumber() {
int delayms = rand() / (float) RAND_MAX * 400.f + 200;
int result = rand() / (float) RAND_MAX * 20;
struct timespec ts;
ts.tv_sec = 0;
ts.tv_nsec = delayms * 1000000;
nanosleep(&ts, NULL);
return result; }
struct threadarg {
Semaphore filled(0);
Semaphore empty(n);
std::queue<int> q; };
void* threadfunc(void *arg) {
threadarg *targp = (threadarg *) arg;
threadarg &targ = *targp;
while (targ.empty.value() != 0) {
int val = generateNumber();
targ.empty.dec();
q.push_back(val);
targ.filled.inc(); }
}
int main(int argc, char **argv) {
Thread consumer, producer;
// read the command line arguments
if (argc != 2) {
printf("usage: %s [nums to average]\n", argv[0]);
exit(1); }
int n = atoi(argv[1]);
// Seed random number generator
srand(time(NULL));
}
Run Code Online (Sandbox Code Playgroud)
我现在有点困惑,因为我不确定如何在消费者从队列中读取时(即如果 q 不为空)创建多个生成数字的生产者线程(如果 q 未满)。我不确定在主要内容中放什么来实现它。同样在“Thread.h”中,您可以创建线程、互斥锁或信号量。线程有.run(threadFunc, arg)、.join() 等方法。互斥锁可以被锁定或解锁。信号量方法都在我的代码中使用过。
你的队列没有同步,所以多个生产者可以同时调用push_back
,或者消费者同时调用pop_front
......这会中断。
完成这项工作的简单方法是使用线程安全队列,它可以是std::queue
您已有的包装器,外加一个互斥锁。
您可以首先添加一个互斥锁,并在您转发到的每个呼叫周围锁定/解锁它std::queue
- 对于单个消费者应该足够了,对于多个消费者您需要融合front()
并pop_front()
进入单个同步调用。
要让消费者在队列为空时阻塞,您可以向包装器添加条件变量。
这应该足以让您可以在线找到答案 - 下面的示例代码。
template <typename T> class SynchronizedQueue
{
std::queue<T> queue_;
std::mutex mutex_;
std::condition_variable condvar_;
typedef std::lock_guard<std::mutex> lock;
typedef std::unique_lock<std::mutex> ulock;
public:
void push(T const &val)
{
lock l(mutex_); // prevents multiple pushes corrupting queue_
bool wake = queue_.empty(); // we may need to wake consumer
queue_.push(val);
if (wake) condvar_.notify_one();
}
T pop()
{
ulock u(mutex_);
while (queue_.empty())
condvar_.wait(u);
// now queue_ is non-empty and we still have the lock
T retval = queue_.front();
queue_.pop();
return retval;
}
};
Run Code Online (Sandbox Code Playgroud)
用std::mutex
“Thread.h”给你的任何原语替换et al。