如何将并发解决方案应用于生产者 - 消费者之类的情况

gli*_*ite 5 c++ performance multithreading producer-consumer c++11

我有一个带有一系列节点的XML文件.每个节点代表一个我需要解析并添加到排序列表中的元素(顺序必须与文件中找到的节点相同).

目前我正在使用顺序解决方案:

struct Graphic
{
    bool parse()
    {
        // parsing...
        return parse_outcome;
    }
};

vector<unique_ptr<Graphic>> graphics;

void producer()
{
    for (size_t i = 0; i < N_GRAPHICS; i++)
    {
        auto g = new Graphic();

        if (g->parse())
            graphics.emplace_back(g);
        else
            delete g;
    }
}
Run Code Online (Sandbox Code Playgroud)

因此,只有图形(实际上是派生的类的实例Graphic,Line,Rectangle等,这就是为什么new)才能正确解析,它才会被添加到我的数据结构中.

由于我只关心将图形添加到列表中的顺序,我虽然是异步调用解析方法,因此生产者的任务是从文件中读取每个节点并将此图形添加到数据结构中,同时对消费者有每当一个新的图形已准备好被解析解析每个图形的任务.

现在我有几个消费者线程(在main中创建),我的代码如下所示:

queue<pair<Graphic*, size_t>> q;
mutex m;
atomic<size_t> n_elements;

void producer()
{
    for (size_t i = 0; i < N_GRAPHICS; i++)
    {
        auto g = new Graphic();
        graphics.emplace_back(g);
        q.emplace(make_pair(g, i));
    }

    n_elements = graphics.size();
}

void consumer()
{
    pair<Graphic*, size_t> item;

    while (true)
    {
        {
            std::unique_lock<std::mutex> lk(m);

            if (n_elements == 0)
                return;

            n_elements--;
            item = q.front();
            q.pop();
        }

        if (!item.first->parse())
        {
            // here I should remove the item from the vector
            assert(graphics[item.second].get() == item.first);
            delete item.first;
            graphics[item.second] = nullptr;
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

我首先在我的main中运行生成器,这样当第一个使用者启动时队列已经完全填满了.

int main()
{
    producer();

    vector<thread> threads;

    for (auto i = 0; i < N_THREADS; i++)
        threads.emplace_back(consumer);

    for (auto& t : threads)
        t.join();

    return 0;
}
Run Code Online (Sandbox Code Playgroud)

并发版本似乎至少是原始版本的两倍.完整代码已在此处上传.

现在我想知道:

  • 我的代码中是否存在任何(同步)错误?
  • 有没有办法更快(或更好)地实现相同的结果?

另外,我注意到在我的计算机上,如果我将线程数等于8,我得到了最好的结果(就经过的时间而言).更多(或更少)线程给我最差的结果.为什么?

Gui*_*let 0

Blockquote 没有同步错误,但我认为内存管理可能会更好,因为如果parse()抛出异常,您的代码就会泄漏。

没有同步错误,但我认为您的内存管理可能会更好,因为如果parse()抛出异常,则会出现泄漏。

Blockquote 有没有办法更快(或更好)地达到相同的结果?

大概。您可以使用线程池的简单实现和 lambda 来为您执行 parse() 。

下面的代码说明了这种方法。我这里使用线程池实现

#include <iostream>
#include <stdexcept>
#include <vector>
#include <memory>
#include <chrono>
#include <utility>
#include <cassert>
#include <ThreadPool.h>

using namespace std;
using namespace std::chrono;



#define N_GRAPHICS        (1000*1000*1)
#define N_THREADS       8


struct Graphic;
using GPtr = std::unique_ptr<Graphic>;

static vector<GPtr> graphics;

struct Graphic
{
    Graphic()
        : status(false)
    {
    }


    bool parse()
    {
        // waste time
        try
        {
            throw runtime_error("");
        } 
        catch (runtime_error)
        {
        }

        status = true;
        //return false;
        return true;
    }


    bool status;
};


int main()
{
    auto start = system_clock::now();

    auto producer_unit = []()-> GPtr {
        std::unique_ptr<Graphic> g(new Graphic);
        if(!g->parse()){
            g.reset(); // if g don't parse, return nullptr
        }
        return g;        
    };

    using ResultPool = std::vector<std::future<GPtr>>;
    ResultPool results;
    // ThreadPool pool(thread::hardware_concurrency());
    ThreadPool pool(N_THREADS);
    for(int i = 0; i <N_GRAPHICS; ++i){
     // Running async task
     results.emplace_back(pool.enqueue(producer_unit));
    }

   for(auto &t : results){
        auto value = t.get();
        if(value){
          graphics.emplace_back(std::move(value));
        }
    }

    auto duration = duration_cast<milliseconds>(system_clock::now() - start);
    cout << "Elapsed: " << duration.count() << endl;

    for (size_t i = 0; i < graphics.size(); i++)
    {
        if (!graphics[i]->status)
        {
            cerr << "Assertion failed! (" << i << ")" << endl;
            break;
        }
    }

    cin.get();
    return 0;
}
Run Code Online (Sandbox Code Playgroud)

它在我的机器上更快一点(1s),更具可读性,并且消除了共享数据的必要性(同步是邪恶的,避免它或以可靠且有效的方式隐藏它)。