多生产者多个消费者线程问题

use*_*710 5 c synchronization pthreads producer-consumer

我有一个程序,我正在尝试实现多生产者,多消费者设置.我有一个代码,当我有一个消费者和多个生产者时似乎运作良好,但引入多个消费者线程似乎引发了一些奇怪的问题.

这就是我现在所拥有的:

#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
#include <stdlib.h>

#define MAX 10

typedef struct travs {
    int id;
    int numBags;
    int arrTime;
    struct travs *next;
} travs;


travs *queue;
//travs *servicing[MAX];

int produced; // The total # of produced in the queue

pthread_mutex_t queue_lock;
//pthread_mutex_t staff_lock;
pthread_cond_t ct, cs;

int CheckIn(){
    sleep(1);
    if(produced != 0) return 1;
    else return 0;
}



void *producerThread(void *args){
    travs *traveler = (travs *)args;
    // Acquire the mutex
    pthread_mutex_lock(&queue_lock);
    produced++;
//  pthread_cond_signal(&cs);
    pthread_cond_wait(&ct, &queue_lock);
    printf("Producer %d is now checked in at time %d.\n", queue->id, (1+queue-    >arrTime));
    queue = queue->next;
    pthread_mutex_unlock(&queue_lock);

    return; 
}       

int Producer(int id, int numBags, int arrTime){

    int ret;
    pthread_t ttid;
    travs *traveler = malloc(sizeof(travs));
    traveler->id = id;
    traveler->numBags = numBags;
    traveler->arrTime = arrTime;
    sleep(arrTime); 
    pthread_mutex_lock(&queue_lock);
    if(queue != NULL) {
        travs *check_in = malloc(sizeof(travs));
        check_in = queue;
        while(check_in->next != NULL){
            check_in = check_in->next;
        }
        check_in->next = traveler;
    }
    else { queue = traveler; }
    pthread_mutex_unlock(&queue_lock);
    // Create a new traveler thread
    ret = pthread_create(&ttid, NULL, producerThread, (void *)traveler);

    // Check if thread creation was successful
    if(ret == 0) {
        printf("Producer %d has entered the check-in line at time %d; s/he is at     position %d and has %d bags.\n", id, arrTime, produced, numBags);
        pthread_cond_signal(&cs);
        return 0;
    }
    else return -1;

}


void *consumerThread(void *arg){

    int i = 0; // travelers serviced
    char *name = (char *)arg;
    while(1) { // run iteratively

        // If 20 producers have been served, the consumer's work is done.
        if(i == 20) {
            printf("Consumer %s's service has completed!\n", name);
                pthread_exit(NULL);
            }
        // Sleep for 10s if 5 travelers have been checked in
        if (((i+1) % 5) == 0) {
                // Wake up sleeping travelers
                printf("Consumer %s is taking a break.\n", name);
                sleep(2);
                printf("Consumer %s's break is over.\n", name);
        }

        if(CheckIn()) {
            pthread_mutex_lock(&queue_lock);
            int j = 1;
                    pthread_cond_wait(&cs, &queue_lock);
                    printf("Producer %d presents ticket to consumer     %s.\n", queue->id, name);
                    printf("Consumer %s gives boarding pass to producer     %d.\n", name, queue->id);
                    while(j <= queue->numBags){
                        printf("Consumer %s checks in bag %d for     producer %d; baggage tag is _X_.\n", name, j, queue->id);
                        j++;
                }
            // Signal producer being serviced that their check in is complete.
            i++;
            pthread_mutex_unlock(&queue_lock);
            produced--;
            pthread_cond_signal(&ct);
        }
    sleep(3);
    }
}

int Consumer(char *Name) {

    sleep(5);
    int ret;
    pthread_t stid;
    // Create a staff thread

    ret = pthread_create(&stid, NULL, consumerThread, (void *)Name);
    // Acquire the lock
    if(ret == 0) { 
        printf("Producer %s's service has begun!\n", Name);
        return 0;
    }
    else    return -1;
}

int main() {
    int ret = 0;
    char *staff_name = malloc(sizeof(char));
    int staff_check = 0;
    int trav_check = 0;
    int id;
    int bagnum;
    int travtime;
    FILE *consumer_fp;
    FILE *producer_fp;
    queue = malloc(sizeof(travs));
    queue = NULL;
    /*while(ret < 10){
        servicing[ret] = malloc(sizeof(travs));
        servicing[ret] = NULL;
    }*/

    // Initilize mutexes
    pthread_mutex_init(&queue_lock, NULL);
    //pthread_mutex_init(&staff_lock, NULL);

    // Initialize condition variables
    pthread_cond_init(&ct, NULL);
    pthread_cond_init(&cs, NULL);

    // Open the file so we can start reading from it

    consumer_fp = fopen("staff.txt", "r");
    producer_fp = fopen("travelers.txt", "r");

    staff_check = fscanf(consumer_fp, "%s", staff_name);
    trav_check = fscanf(producer_fp, "%d %d %d", &id, &bagnum, &travtime);
    while(1){   

        K:
        while(staff_check == 1) {
            Consumer(staff_name);
            staff_check = fscanf(consumer_fp, "%s", staff_name);
            goto L;
        }
        L:
        while(trav_check == 3) { 
            Producer(id, bagnum, travtime);
            trav_check = fscanf(producer_fp, "%d %d %d", &id, &bagnum,     &travtime);
            goto K;
        }

    pthread_exit(NULL);
    }

}
Run Code Online (Sandbox Code Playgroud)

在这个设置中,每个生成器线程在返回之前只存在很短的时间,并且除了向全局队列添加新元素和很少适当的定时输出行之外,它本身没有真正的计算.

但是,当我引入多个生成器时,只有最后一个生成器线程做任何事情.

根据我的猜测,我需要以下内容:

i)等待登记的生产者的单独队列和当前正在登记的生产者(注释为travs*服务[MAX]以上)

ii)为消费者提供单独的互斥锁.

但是,我不确定如何实现这一点.这是我想到的想法:

  1. CheckIn()生成器线程并将*queue复制到*servicing [i](在使用者线程中).

  2. 设置queue = queue-> next(在生产者线程中).

但是,我怎么能保证当我复制*队列时,它已经不会提前一步了?我可以通过一个与当前线程所持有的锁不同的锁来发出等待线程的信号吗?而且,更重要的是,我如何让不同的消费者线程处理不同的旅行者线程?

任何援助将不胜感激!

alk*_*alk 4

使用一个队列。

编写两个函数,一个将现有项目添加到队列中,一个将项目从队列中删除。不要在这些函数中使用任何锁定。在单线程应用程序中测试它们。

然后为这两个添加和删除函数编写两个包装器。这些包装器应该采用额外的互斥体作为参数。在调用添加或删除函数之前将此互斥锁锁定在包装器中,然后解锁互斥锁。

编写创建新项目并调用 add-item-wrapper 的生产者线程函数。编写消费者线程函数,调用remove-item-wrapper并销毁已删除的项目。

设置main()声明和初始化互斥体的函数,然后使用pthread_create(). 将互斥体作为参数传递给线程函数。