use*_*552 4 c multithreading semaphore pthreads
我不确定标题是否反映了我在这里问的内容,但如果没有很长的标题,我能做到的最好。我正在尝试worker thread在pthreads. 我想从main函数中生成一组线程,然后main线程将作业委托给工作线程并等待所有线程完成,然后再为它们分配下一个作业(实际上,要求是将线程安排在一个块中,很像 CUDA编程模型,但在 CPU 上。虽然它与当前问题无关)。该job数组用于向每个线程指示作业类型。目前,我已经使用信号量实现了这一点,这会强制执行繁忙的等待。我正在寻找方法使线程仅在需要时才进入睡眠和唤醒状态,而不是连续轮询。
每个线程执行的函数
volatile int jobs[MAX_THREADS]; // global job indicator array
sem_t semaphore; // semaphore to indicate completion
thread_execute(void *args)
{
tid = get_id(args);
while(jobs[tid] != -1)
{
if(jobs[tid] == 0) continue; // no job
if(jobs[tid] == JOBS_1)
{
jobs1();
jobs[tid] = 0; // go back to idle state
sem_post(&semapahore);
}
if(jobs[tid] == JOBS_2)
{
jobs2();
jobs[tid] = 0; // go back to idle state
sem_post(&semapahore);
}
}
pthread_exit(NULL);
}
Run Code Online (Sandbox Code Playgroud)
主要功能如下
int main()
{
sem_init(&semaphore, 0, 0);
jobs[0...MAX_THREADS] = 0;
spawn_threads();
// Dispatch first job
jobs[0...MAX_THREADS] = JOBS_1;
int semvalue = 0;
while (semvalue < MAX_THREADS) // Wait till all threads increment the semaphore
sem_getvalue(&sempaphore, &semvalue);
sem_init(&semaphore, 0, 0); // Init semaphore back to 0 for the next job
// I'm actually using diff. semaphores for diff. jobs
jobs[0...MAX_THREADS] = JOBS_2;
while (semvalue < MAX_THREADS)
sem_getvalue(&sempaphore, &semvalue);
jobs[0...MAX_THREADS] = -1; // No more jobs
pthread_join();
}
Run Code Online (Sandbox Code Playgroud)
此实现的问题在于main线程忙于等待所有工作线程完成,工作线程也在不断轮询作业数组以检查新作业。当线程进入睡眠状态并在需要时沿着信号处理程序和使用唤醒时,是否有更好的方法来执行此操作,pthread_kill()但使用单独的信号处理程序有点混乱。
您可以使用条件变量使线程进入休眠状态,直到收到信号。
volatile int jobs[MAX_THREADS]; // global job indicator array
pthread_cond_t th_cond; // threads wait on this
pthread_mutex_t th_mutex; // mutex to protect the signal
int busyThreads = MAX_THREADS;
pthread_cond_t m_cond; // main thread waits on this
pthread_mutex_t m_mutex; // mutex to protect main signal
thread_execute(void *args)
{
tid = get_id(args);
while(jobs[tid] != -1)
{
if(jobs[tid] == 0) continue; // no job
if(jobs[tid] == JOBS_1)
{
jobs1();
jobs[tid] = 0; // go back to idle state
pthread_mutex_lock(&th_mutex);
pthread_mutex_lock(&m_mutex);
--busyThreads; // one less worker
pthread_cond_signal(&m_cond); // signal main to check progress
pthread_mutex_unlock(&m_mutex);
pthread_cond_wait(&th_cond, &th_mutex); // wait for next job
pthread_mutex_unlock(&th_mutex);
}
if(jobs[tid] == JOBS_2)
{
jobs2();
jobs[tid] = 0; // go back to idle state
pthread_mutex_lock(&th_mutex);
--busyThreads;
pthread_cond_wait(&th_cond, &th_mutex);
pthread_mutex_unlock(&th_mutex);
}
}
pthread_exit(NULL);
}
Run Code Online (Sandbox Code Playgroud)
然后在主要:
int main()
{
sem_init(&semaphore, 0, 0);
jobs[0...MAX_THREADS] = 0;
spawn_threads();
// Dispatch first job
jobs[0...MAX_THREADS] = JOBS_1;
int semvalue = 0;
pthread_mutex_lock(&m_mutex);
while(busyThreads > 0) // check number of active workers
pthread_cond_wait(&m_cond, &m_mutex);
pthread_mutex_unlock(&m_mutex);
busyThreads = MAX_THREADS;
pthread_mutex_lock(&th_mutex);
pthread_cond_broadcast(&th_cond); // signal all workers to resume
pthread_mutex_unlock(&th_mutex);
// same for JOBS_2;
jobs[0...MAX_THREADS] = -1; // No more jobs
pthread_join();
}
Run Code Online (Sandbox Code Playgroud)