Fra*_*ger 5 c++ multithreading pthreads producer-consumer
我想实现一个遵循大致接口的生产者/消费者场景:
class Consumer {
private:
vector<char> read(size_t n) {
// If the internal buffer has `n` elements, then dequeue them
// Otherwise wait for more data and try again
}
public:
void run() {
read(10);
read(4839);
// etc
}
void feed(const vector<char> &more) {
// Safely queue the data
// Notify `read` that there is now more data
}
};
Run Code Online (Sandbox Code Playgroud)
在这种情况下,feed和run将运行在独立的线程和read应该是一个阻挡读出(如recv和fread).显然,我需要在我的双端队列中进行某种互斥,我需要某种通知系统来通知read再试一次.
我听说条件变量是要走的路,但我所有的多线程经验都在于Windows,我很难绕过它们.
谢谢你的帮助!
(是的,我知道返回向量是没有效率的.让我们不要进入那个.)
此代码未准备好生产.没有对任何库调用的结果进行错误检查.
我在LockThread中包装了互斥锁的锁定/解锁,因此它是异常安全的.但那是关于它的.
另外,如果我认真地这样做,我会将互斥锁和条件变量包装在对象中,这样它们就可以在Consumer的其他方法中被滥用.但只要你注意到在使用条件变量之前必须获取锁(以任何方式),那么这个简单的情况可以保持原样.
出于兴趣,你检查了增强线程库吗?
#include <iostream>
#include <vector>
#include <pthread.h>
class LockThread
{
public:
LockThread(pthread_mutex_t& m)
:mutex(m)
{
pthread_mutex_lock(&mutex);
}
~LockThread()
{
pthread_mutex_unlock(&mutex);
}
private:
pthread_mutex_t& mutex;
};
class Consumer
{
pthread_mutex_t lock;
pthread_cond_t cond;
std::vector<char> unreadData;
public:
Consumer()
{
pthread_mutex_init(&lock,NULL);
pthread_cond_init(&cond,NULL);
}
~Consumer()
{
pthread_cond_destroy(&cond);
pthread_mutex_destroy(&lock);
}
private:
std::vector<char> read(size_t n)
{
LockThread locker(lock);
while (unreadData.size() < n)
{
// Must wait until we have n char.
// This is a while loop because feed may not put enough in.
// pthread_cond() releases the lock.
// Thread will not be allowed to continue until
// signal is called and this thread reacquires the lock.
pthread_cond_wait(&cond,&lock);
// Once released from the condition you will have re-aquired the lock.
// Thus feed() must have exited and released the lock first.
}
/*
* Not sure if this is exactly what you wanted.
* But the data is copied out of the thread safe buffer
* into something that can be returned.
*/
std::vector<char> result(n); // init result with size n
std::copy(&unreadData[0],
&unreadData[n],
&result[0]);
unreadData.erase(unreadData.begin(),
unreadData.begin() + n);
return (result);
}
public:
void run()
{
read(10);
read(4839);
// etc
}
void feed(const std::vector<char> &more)
{
LockThread locker(lock);
// Once we acquire the lock we can safely modify the buffer.
std::copy(more.begin(),more.end(),std::back_inserter(unreadData));
// Only signal the thread if you have the lock
// Otherwise race conditions happen.
pthread_cond_signal(&cond);
// destructor releases the lock and thus allows read thread to continue.
}
};
int main()
{
Consumer c;
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
3793 次 |
| 最近记录: |