单个读取器多个写入器与pthreads和锁定,没有提升

Ale*_*oft 3 c++ multithreading pthreads producer-consumer

考虑下一段代码.

#include <iostream>
#include <vector>
#include <map>

using namespace std;

map<pthread_t,vector<int>> map_vec;
vector<pair<pthread_t ,int>> how_much_and_where;

pthread_cond_t CV = PTHREAD_COND_INITIALIZER;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

void* writer(void* args)
{
    while(*some condition*)
    {
        int howMuchPush = (rand() % 5) + 1;
        for (int i = 0; i < howMuchPush; ++i)
        {
            // WRITE
            map_vec[pthread_self()].push_back(rand() % 10);
        }

        how_much_and_where.push_back(make_pair(pthread_self(), howMuchPush));
        // Wake up the reader - there's something to read.
        pthread_cond_signal(&CV);
    }

    cout << "writer thread: " <<  pthread_self()  << endl;
    return nullptr;
}

void* reader(void* args) {

    pair<pthread_t, int> to_do;

    pthread_cond_wait(&CV, &mutex);
    while(*what condition??*)
    {
        to_do = how_much_and_where.front();
        how_much_and_where.erase(how_much_and_where.begin());

        // READ
        cout << to_do.first << " wrote " << endl;
        for (int i = 0; i < to_do.second; i++)
        {
            cout << map_vec[to_do.first][i] << endl;
        }

        // Done reading. Go to sleep.
        pthread_cond_wait(&CV, &mutex);
    }

    return nullptr;
}

//----------------------------------------------------------------------------//


int main()
{
    pthread_t threads[4];

    // Writers
    pthread_create(&threads[0], nullptr, writer, nullptr);
    pthread_create(&threads[1], nullptr, writer, nullptr);
    pthread_create(&threads[2], nullptr, writer, nullptr);
    // reader
    pthread_create(&threads[4], nullptr, reader, nullptr);


    pthread_join(threads[0], nullptr);
    pthread_join(threads[1], nullptr);
    pthread_join(threads[2], nullptr);
    pthread_join(threads[3], nullptr);

    return 0;
}
Run Code Online (Sandbox Code Playgroud)

背景

每个作家都有自己的容器来写数据.并且假设有一个读者知道作者何时完成数据块的编写,以及该块的大小(读者有一个容器,编写者将这些数据写入其中).

问题

  • 显然我应该把锁放在共享源上 - map_vechow_much_and_where.但我不明白,在这种情况下,是什么- 在这个资源上定位锁定的有效方法(例如,在for循环中map_vec每次锁定之前锁定push_back?或者可能在for循环之前锁定它 - 但是不推动到队列是一个浪费和长时间的操作,这可能会导致读者等待太多?)/ 安全的方式锁定位置,以防止死锁.
  • 我不明白应该在while循环中的正确条件 - 我认为可能只要how_much_and_where不是空的,但显然读者how_much_and_where在作者添加一对之前清空的情况可能会消失.
  • 假设一个作者在读者忙着读取一些数据时发出了一个信号.据我所知,这个信号将被忽略,而编写者推动的那对信号可能永远不会被处理(接收和处理<#of对对象的任务的信号#)​​.我该如何防止这种情况?

pas*_*sti 5

为了简化操作,我们应该从实际生产者和消费者的实现(非通用目的)中分离通用/可重用生产者 - 消费者队列的实现(或简称为"阻塞队列",就像我通常所说的那样). /可重用 - 它们特定于您的程序).这将使代码从设计角度更加清晰和易于管理.

1.实现通用(可重用)阻塞队列

首先,您应该实现一个"阻塞队列",可以管理多个多个生产者和一个消费者.此阻塞队列将包含处理多线程/同步的代码,并且消费者线程可以使用它来从多个生产者线程接收项目.这样的阻塞队列可以通过许多不同的方式实现(不仅使用互斥锁+ cond组合),具体取决于您是否拥有1个或更多的使用者以及1个或更多个生成器(有时可以引入不同类型的[特定于平台]优化)当你只有1个消费者或1个生产者时).使用mutex + cond对的最简单的队列实现会在需要时自动处理多个生产者和多个使用者.

队列中有仅一个内部容器(它可以是一个非线程安全的std ::队列,向量或列表),它保存的物品和保护从多个线程的并发访问此容器相关联的互斥+ COND对.队列必须提供两个操作:

  • produce(item):将一个项放入队列并立即返回.伪代码如下所示:

    1. 锁定互斥锁
    2. 将新项添加到内部容器
    3. 信号通过cond
    4. 解锁互斥锁
    5. 返回
  • wait_and_get():如果队列中至少有一个项目,则删除最旧的项目并立即返回,否则等待util有人将一个项目放入队列中进行produce(item)操作.

    1. 锁定互斥锁
    2. 如果容器是空的:

      1. 等待cond(pthread_cond_wait)
    3. 删除最旧的项目

    4. 解锁互斥锁
    5. 返回删除的最旧项目

2.使用阻塞队列实现程序

既然你有一个可重用的阻塞队列来构建,我们可以实现生成器和消费者以及控制事物的主线程.

制片人

他们只是将一堆项目放入队列(通过调用produce(item)阻塞队列),然后退出.如果项目的生成不是计算量大或者不需要等待大量IO操作,那么这将在您的示例程序中非常快速地完成.要模拟线程执行繁重工作的真实场景,您可以执行以下操作:在每个生产者线程上,您只将X(比如说5个)项目放入队列,但是在每个项目之间等待一个随机的秒数让我们说1到3秒之间.请注意,经过一段时间后,生产者线程在完成工作后自行退出.

消费者

消费者有一个无限循环,它总是从队列中获取下一个项目wait_and_get()并以某种方式处理它.如果它是表示处理结束的特殊项目,那么它会突破无限循环而不是处理项目.伪代码:

  1. 无限循环:

    1. 从队列中获取下一个项目(wait_and_get())
    2. 如果这是表示处理结束的特殊项目,则打破循环...
    3. 否则让我们处理这个项目

主线程

  1. 以任何顺序启动所有线程,包括生产者和消费者.
  2. 等待所有生产者线程完成(pthread_join()他们).

    请记住,生产者在一段时间后自己完成并退出,没有外部刺激.当您完成所有生产者的加入时,这意味着每个生产者都已退出,因此没有人会produce(item)再次调用队列的操作.但是,队列可能仍然有未处理的项目,消费者仍然可以处理这些项目.

  3. 将最后一个特殊的"处理结束"项放入消费者的队列中.

    当消费者完成处理由生产者生成的最后一个项目时,它仍然会向队列询问下一个项目wait_and_get()- 这可能会导致死锁,因为等待从未到达的下一个项目.为了在主线程上提供帮助,我们将最后一个特殊项目放入队列,以表示消费者的处理结束.请记住,我们的消费者实现包含对此特殊项目的检查,以确定何时完成处理.重要的是,只有在生成器完成后(加入它们之后)才能将这个特殊项放在主线程的队列中!

    如果您有多个消费者,则更容易将多个特殊的"处理结束"项目放入队列(每个消费者1个),而不是使队列更智能,以便能够处理只有1个"处理结束"项目的多个消费者.由于主线程协调整个事物(线程创建,线程连接等),它确切地知道消费者的数量,因此很容易将相同数量的"处理结束"项目放入队列.

  4. 等待消费者线程通过加入来终止.

    在将处理结束特殊项目放入队列之后,我们等待消费者线程处理剩余项目(由生产者生成)以及我们的最后一个特殊项目(由主"协调员"线程生成),该项目要求消费者完.我们通过pthread_join()消费者线程在主线程上等待.

补充说明:

  • 在我的线程系统实现中,阻塞队列的项通常是指针 - 指向必须执行/处理的"作业"对象的指针.(您可以将阻塞队列实现为模板类,在这种情况下,阻塞队列的用户可以指定项的类型).在我的例子中,很容易将一个特殊的"处理结束"项目放入消费者的队列中:我通常使用一个简单的NULL作业指针来实现此目的.在您的情况下,您将必须找出在队列中可以使用哪种特殊值来表示消费者的处理结束.
  • 生产者可能拥有自己的队列和一大堆其他数据结构,他们可以使用这些数据结构来"生产项目",但消费者并不关心这些数据结构.消费者只关心通过其自己的阻塞队列接收的单个项目.如果生产者需要来自消费者的东西,那么它必须通过队列向消费者发送项目("工作").阻塞队列实例属于使用者线程 - 它在任意线程和使用者线程之间提供单向通信通道.甚至消费者线程本身也可以将一个项放入自己的队列中(在某些情况下这很有用).
  • pthread_cond_wait文件说,这个功能可以唤醒没有实际的信号(虽然我从来没有见过在我的生命造成这个函数的虚假wakup一个错误).为了解决这个if container is empty then pthread_cond_wait问题,代码的一部分应该被替换为while the container is empty pthread_cond_wait但是,这个虚假的唤醒事件可能是一个只在某些架构上存在的具有线程原语的特定linux实现的lochness怪物,所以你的代码可能在台式机上运行而不关心这个问题.