Guy*_*ham 1 c multithreading mutex pthreads condition-variable
我想确定我理解条件变量是如何工作的,所以我将使用我写的程序来问我的问题.
在我的程序中,我有一个"生产者"线程(一个)和"工作线程"(几个让我们假设3).
生产者线程"处理"一个FIFO链表,这意味着它所做的只是检查列表开头是否有一个项(在我的程序类型请求中调用Req)(由一个名为front的全局指针指向)在我的程序中)如果是这样,将它分配到一个全局请求元素(称为globalReq).
工作线程,在一个循环中运行,等待处理请求,通过提取全局请求变量,将它们变为自己的局部变量(对于它们中的每一个都是"私有"的,因为每个线程都有一个独立的堆栈 - 正确我,如果我错了),然后处理请求.
为了做到这一点,我使用互斥量和条件变量.
一个重要的注意事项是,一旦请求存在(暂时让我们假设只存在一个请求),那么工作线程中的哪一个将"关注"它(假设它们都是"自由的" - 睡觉并不重要在条件变量上).
在提取请求并将其分配到全局请求之后,生成器线程调用pthread_cond_signal- 据我所知,解锁至少一个"阻塞"线程 - >因此它可以解除阻塞,例如2个线程.
所以我的问题是我现有的代码(如下):
1)我怎样才能保证只有一个线程(来自工作线程)将处理请求.我是否需要在所有通用"生产者消费者"实施中添加"while check loop"?
2)如何通过pthread_cond_broadcast(或者如果一个pthread_cond_signal未被阻塞的多个线程)通过互斥锁上的内容解除阻塞的线程,可能我还没有掌握它...
(每个)工作线程的代码是:
void *worker(void *arg)
{
while(1)
{
printf("\n BEFORE LOCKING sthread_mutex with thread: %d \n", syscall(SYS_gettid));
pthread_mutex_lock(&sthread_mutex);
printf("\n AFTER UNLOCKING sthread_mutex with thread: %d \n", syscall(SYS_gettid));
printf("\n BEFORE WAITING ON cond_var with thread: %d \n", syscall(SYS_gettid));
pthread_cond_wait(&cond_var,&sthread_mutex); //wait on condition variable
printf("\n AFTER WAITING ON cond_var with thread: %d \n", syscall(SYS_gettid));
printf("\n got signal for thread: %d \n",syscall(SYS_gettid));
// extract the current request into g local variable
// within the "private stack" of this thread
Req localReq = globalReq;
pthread_mutex_unlock(&sthread_mutex);
printf("\n AFTER UNLOCKING sthread_mutex with thread: %d \n", syscall(SYS_gettid));
// perform the desired task
task(localReq);
printf("\n BEFORE calling sem_post with thread: %d \n", syscall(SYS_gettid));
sem_post(&sem);
} // end while (1)
} // end of worker thread function
Run Code Online (Sandbox Code Playgroud)
生产者线程的代码是:
void *producer(void *arg)
{
while(1)
{
if(front != NULL) // queue not empty
{
// go to sleep if all "worker threads" are occuipied
// or decrement number of free "worker threads" threads by 1
printf(" schedualer thread BEFORE calling sem_wait on sem \n");
sem_wait(&sem);
// lock the sthread mutex in order to "synchronize" with the
// "worker threads"...
printf(" schedualer thread BEFORE locking sthread_mutex \n");
pthread_mutex_lock(&sthread_mutex);
printf(" schedualer thread AFTER locking sthread_mutex \n");
globalReq = extract_element(); // this is the global request variable
// notify the worker threads that an "arriving request" needed to
// be taking care of
printf(" schedualer thread BEFORE calling signal on cond_var \n");
// pthread_cond_signal(&cond_var);
pthread_cond_broadcast(&cond_var);
printf(" schedualer thread AFTER calling signal on cond_var \n");
// unlock the smutex
printf(" schedualer thread BEFORE UNLOCKING sthread_mutex \n");
pthread_mutex_unlock(&sthread_mutex);
printf(" schedualer thread AFTER UNLOCKING sthread_mutex \n");
} // queue not empty
else
continue;
} // end while (1)
} // end of producer
Run Code Online (Sandbox Code Playgroud)
另一个问题:
生产者线程调用sem_wait的全球信号量(这是在与工作线程的数量开始初始化,在这种情况下,3,以指示本身)有多少工作线程处理目前的请求,并完成此"机制",工作线程,一旦完成处理他们"赢了"的请求(当争用条件变量时),调用sem_post指示"另一个工作线程可用"
3)这是实现这种"发信号通知有多少可用工作线程"的正确(良好/有效)方式吗?
4)通过//*段落中提到的生产者和工人线程共享的全局变量 "传递"请求有什么好处和缺点?传递它是一种明智的方式,或者最好"只是"创建一个"新的请求变量"(在使用malloc的堆上),它将为每个工作线程和请求"专用"(并且还在其中释放它)每个工作线程一旦完成为请求服务)?
5)如果您对这段代码有任何其他意见(好的或坏的),请随时向我表明.
编辑:
大家好,
一些额外的问题:
除了生产者和工作者线程之外,还有另一个称为监听器的线程,它只是用于插入到达链表(FIFO队列)的请求,所以它实际上不是之前提到的生产者的任务.
所以我的新问题是:
8)关于我的程序的附加信息,我用信号量组成的"信令机制"是否有效?
9)由生产者和监听器线程管理的链表有两个全局指针 front,分别rear指向链表的头部和尾部(列表的头部是第一个要处理的请求).
下面是监听器线程执行的插入函数的实现,以及生产者线程执行的"提取"功能.
为了在"队列"(链表)上同步这两个线程,我使用了一个名为qmutex的共享互斥锁.
我的问题是,关于下面的两个代码,在每个函数中"放置"互斥锁(锁定和解锁)的"最佳"位置在哪里?
谢谢分配,
盖伊.
插入功能:
void insertion(void *toInsert)
{
struct getInfo *req = (struct getInfo *)toInsert;
newNode = (N*)malloc(sizeof(N));
newNode->req = req;
newNode->next = NULL;
// WHERE SHULD I LOCK (AND UNLOCK) THE QUEUE MUTEX ????????????????????????
if(front == NULL)
{
front = newNode;
printf("empty list - insert as head \n");
}
else
{
rear->next = newNode;
printf(" NOT AN EMPTY list - insert as last node \n");
}
rear = newNode;
} // end of insertion
Run Code Online (Sandbox Code Playgroud)
提取功能:
Req extract_element()
{
if(front == NULL)
printf("\n empty queue \n");
else
{
Req ret;
tmpExtract = front;
ret.socketNum = tmpExtract->req->socketNum;
ret.type = tmpExtract->req->type;
printf("\n extracting node with sockNum: %d \n",ret.socketNum);
front = front->next;
free(tmpExtract);
return(ret);
}
} // end of extract_element
Run Code Online (Sandbox Code Playgroud)
而不是直接回答您的问题,首先,这里是对典型方法的描述:
您有一种队列或列表,您可以在其中添加工作数据.每当您添加一组工作数据时,首先锁定互斥锁,添加数据,发出条件变量信号,然后解锁互斥锁.
然后,您的工作线程会锁定互斥锁,并在队列为空时等待循环中的条件.当发送信号时,一个或多个工作人员将被唤醒,但只有一个(一次)将获取互斥锁.锁定互斥锁后,"获胜者"会检查队列中是否存在某些内容,将其解压缩,解锁互斥锁,并执行必要的工作.在解锁互斥锁之后,其他线程也可能会被唤醒(并且如果条件被广播将会唤醒),并且将从队列中提取下一个工作,或者如果队列为空则返回等待.
在代码中,它看起来有点像这样:
#include <pthread.h>
#include <unistd.h>
#include <stdio.h>
#define WORKER_COUNT 3
pthread_mutex_t mutex;
pthread_cond_t cond;
pthread_t workers[WORKER_COUNT];
static int queueSize = 0;
static void *workerFunc(void *arg)
{
printf("Starting worker %d\n", (int)arg);
while(1) {
pthread_mutex_lock(&mutex);
while(queueSize < 1) {
pthread_cond_wait(&cond, &mutex);
}
printf("Worker %d woke up, processing queue #%d\n", (int)arg, queueSize);
//Extract work from queue
--queueSize;
pthread_mutex_unlock(&mutex);
//Do work
sleep(1);
}
}
int main()
{
int i;
pthread_mutex_init(&mutex, 0);
pthread_cond_init(&cond, 0);
for(i=0; i<WORKER_COUNT; ++i) {
pthread_create(&(workers[i]), 0, workerFunc, (void*)(i+1));
}
sleep(1);
pthread_mutex_lock(&mutex);
//Add work to queue
queueSize = 5;
pthread_cond_broadcast(&cond);
pthread_mutex_unlock(&mutex);
sleep(10);
return 0;
}
Run Code Online (Sandbox Code Playgroud)
(我在线程之后省略了清理工作,并且将工作人员号码传递给线程很快且很脏,但在这种情况下可以正常工作).
在这里,工作人员将被唤醒pthread_cond_broadcast(),并且只要队列中存在某些东西就会运行(直到queueSize回到0 - 想象还有一个实际的队列),然后再回去等待.
回到问题:
1:互斥锁和保护变量(这里是它queueSize)负责这一点.您还需要保护变量,因为您的线程也可能因其他原因而被唤醒(所谓的虚假唤醒,请参阅http://linux.die.net/man/3/pthread_cond_wait).
2:如果你打电话,唤醒线程就像任何其他线程一样攻击互斥锁pthread_mutex_lock().
3:我不确定为什么你需要向生产者发出可用工作线程数量的信号?
4:队列需要可以从生产者和使用者那里访问 - 但是仍然可以用各种方式用函数(或类,如果你使用的是C++)封装.
5:我希望以上就足够了?
6:事情pthread_cond_wait()是,它可以有虚假的唤醒.也就是说,即使你没有发出信号,它也可能会醒来.因此,你需要保护变量(while()周围循环pthread_cond_wait()在我的代码示例),以确保有实际上是有原因的苏醒,一次pthread_cond_wait()返回.然后,您使用与条件使用的相同的互斥锁保护保护变量(以及您需要提取的任何工作数据),然后您可以确定只有一个线程将执行每项工作.
7:我没有让生产者进入睡眠状态,而是让它添加它可以提取到工作队列的任何数据.如果队列已满,那么它应该进入休眠状态,否则它应该继续添加内容.
8:使用Listener线程,我真的不明白为什么你甚至需要你的Producer线程.为什么不让工人extract_element()自言自语?
9:您需要保护对列表变量的所有访问.也就是说,在insertion()第一次访问之前锁定互斥锁front,并在上次访问之后将其解锁rear.同样的事情extract_element()- 虽然你需要重写函数,以便在队列为空时也有一个有效的返回值.
| 归档时间: |
|
| 查看次数: |
1303 次 |
| 最近记录: |