C++ UNIX线程

Jus*_*Y17 1 c++ unix multithreading

我正在UNIX和C++中使用线程进行项目.基本上有一个生产者线程和5个消费者线程.生产者线程在随机时间将递增的数字添加到队列中,并且消费者线程轮询q尝试将其删除.由于某种原因,我的q.size()继续消极,我无法弄清楚为什么.

 #include <queue>
 #include <list>

 #include <stdio.h>
 #include <unistd.h>
 #include <stdlib.h>
 #include <string.h>
 #include <pthread.h>

 using namespace std;

 #define NUM_CONSUMER_THREADS 5
 #define NUM_PRODUCER_THREADS 1
 #define BUFFER_SIZE 20

 void *c_thread_function(void *arg);
 void *p_thread_function(void *arg);

 queue<int> q;

 int produce(int cur)
 {
  int temp = cur + 1;
  return temp;
 }

 void append(int num)
 {
  if ( q.size() < BUFFER_SIZE )
  {
   q.push(num);
  }
 }

 int take()
 {
  int removed = q.front();
  q.pop();
  sleep(1);
  return removed;
 }

 void consume(int num, int thread)
 {
  printf("%d consumed %d \n", thread, num);
 }


 int main() 
 {
  int result;

  pthread_t cthreads[NUM_CONSUMER_THREADS];
  pthread_t pthreads[NUM_PRODUCER_THREADS];

  void *thread_result;

  // build an array of consumer threads
  for(int num_of_cthreads = 0; num_of_cthreads < NUM_CONSUMER_THREADS; num_of_cthreads++) 
  {
   result = pthread_create(&(cthreads[num_of_cthreads]), NULL, c_thread_function, (void *)num_of_cthreads);
   if ( result != 0 )
   {
    perror( "Thread Creation Failed");
    exit(EXIT_FAILURE);
   }
   //sleep(1);  
  } 

  // build an array of producer threads
  for(int num_of_pthreads = 0; num_of_pthreads < NUM_PRODUCER_THREADS; num_of_pthreads++) 
  {
   result = pthread_create(&(pthreads[num_of_pthreads]), NULL, p_thread_function, NULL);
   if ( result != 0 )
   {
    perror( "Thread Creation Failed");
    exit(EXIT_FAILURE);
   }
   //sleep(1);  
  }

  printf("All threads created\n");
  while ( true )
  {
   // do nothing
  }
 }

 void *c_thread_function(void *arg)
 {
  int temp = (long)arg;
  printf("Consumer thread %d created \n", temp);

  while ( true )
  {
   while (  q.size() > 0 )
   {
    int w = take();
    consume(w, temp);
    printf(" q size is now %d \n", q.size());
   }
  }
 }

 void *p_thread_function(void *arg) 
 {
  printf("Producer thread created \n");

  int itemsAdded = 0;
  int temp;
  int sleepTime;

  while ( true ) 
  {
   while ( q.size() < BUFFER_SIZE )
   {
    temp = produce(itemsAdded);

    sleepTime = 1+(int)(9.0*rand()/(RAND_MAX+1.0));
    sleep(sleepTime);

    append(temp);

    printf("Producer adds: %d \n", temp);
    printf(" q size is now %d \n", q.size());

    itemsAdded++;
   }
  }
 }
Run Code Online (Sandbox Code Playgroud)

输出:

制片人补充说:1

q大小现在为-1

0消耗1

q大小现在是-2

1消耗1

q现在是-3

3消耗1

q现在是-4

4消耗0

q现在是-5

0消耗0

Cha*_*via 5

你需要了解种族条件互斥的概念.您的std::queue对象是共享资源,意味着可能同时在其上运行多个线程.这意味着您必须使用锁(称为互斥锁)来保护它,以便每次访问都是同步的.否则,您将获得所谓的竞争条件,其中一个线程修改数据,而另一个线程也访问/修改数据,从而导致程序状态不一致或损坏.

要防止竞争条件,您需要pthread_mutex在每次访问队列之前锁定对象.

首先,您需要创建一个互斥对象并对其进行初始化.

pthread_mutex mymutex;
pthread_mutex_init(&mymutex, 0);
Run Code Online (Sandbox Code Playgroud)

您的应用程序代码应如下所示:

pthread_mutex_lock(&mymutex);

// Do something with queue

pthread_mutex_unlock(&mymutex);
Run Code Online (Sandbox Code Playgroud)

当一个线程获得锁定时,没有其他线程可以获取锁定.尝试获取已被另一个线程获取的锁的线程将等待直到锁被释放.这会同步对队列的访问,确保一次只有一个线程修改它.