什么是在实时应用程序中同步多个线程之间的容器访问的最佳方法

red*_*ver 10 c++ concurrency multithreading access-synchronization

std::list<Info> infoList在我的应用程序中有两个线程共享.这2个线程正在访问此列表,如下所示:

主题1:使用push_back(),pop_front()clear()在列表上(视情况而定) 线程2:使用一个iterator通过列表中的项目进行迭代,并做一些动作.

线程2正在迭代列表,如下所示:

for(std::list<Info>::iterator i = infoList.begin(); i != infoList.end(); ++i)
{
  DoAction(i);
}
Run Code Online (Sandbox Code Playgroud)

代码使用GCC 4.4.2编译.

有时++ i会导致段错误并导致应用程序崩溃.该错误是在以下行的std_list.h第143行引起的:

_M_node = _M_node->_M_next;
Run Code Online (Sandbox Code Playgroud)

我想这是一个赛车条件.当线程2迭代它时,列表可能已被线程1更改或甚至清除.

我使用Mutex来同步对此列表的访问,并且在我的初始测试期间一切正常.但是系统只是在压力测试下冻结,使得这个解决方案完全不可接受.此应用程序是一个实时应用程序,我需要找到一个解决方案,以便两个线程可以尽可能快地运行,而不会损害总的应用程序吞吐量.

我的问题是:线程1和线程2需要尽可能快地执行,因为这是一个实时应用程序.我该怎么做才能防止这个问题并仍然保持应用程序性能?是否有任何无锁算法可用于此类问题?

如果我Info在线程2的迭代中错过了一些新添加的对象,但是我可以做些什么来防止迭代器成为悬空指针?

谢谢

Han*_*ant 5

你的for()循环可能会在相当长的时间内保持锁定,具体取决于它迭代的元素数量.如果它"轮询"队列,不断检查新元素是否可用,则可能会遇到麻烦.这使得线程在不合理的长时间内拥有互斥锁,几乎没有机会让生产者线程进入并添加一个元素.并且在此过程中消耗了大量不必要的CPU周期.

你需要一个"有界阻塞队列".不要自己写,锁的设计并不简单.很难找到好的例子,大部分都是.NET代码. 这篇文章很有希望.


Har*_*ich 4

一般来说,以这种方式使用 STL 容器并不安全。您将必须实现特定的方法来使您的代码线程安全。您选择的解决方案取决于您的需求。我可能会通过维护两个列表来解决这个问题,每个线程一个。并通过无锁队列传达更改(在该问题的评论中提到)。您还可以通过将 Info 对象包装在 boost::shared_ptr 中来限制它们的生命周期,例如

typedef boost::shared_ptr<Info> InfoReference; 
typedef std::list<InfoReference> InfoList;

enum CommandValue
{
    Insert,
    Delete
}

struct Command
{
    CommandValue operation;
    InfoReference reference;
}

typedef LockFreeQueue<Command> CommandQueue;

class Thread1
{
    Thread1(CommandQueue queue) : m_commands(queue) {}
    void run()
    {
        while (!finished)
        {
            //Process Items and use 
            // deleteInfo() or addInfo()
        };

    }

    void deleteInfo(InfoReference reference)
    {
        Command command;
        command.operation = Delete;
        command.reference = reference;
        m_commands.produce(command);
    }

    void addInfo(InfoReference reference)
    {
        Command command;
        command.operation = Insert;
        command.reference = reference;
        m_commands.produce(command);
    }
}

private:
    CommandQueue& m_commands;
    InfoList m_infoList;
}   

class Thread2
{
    Thread2(CommandQueue queue) : m_commands(queue) {}

    void run()
    {
        while(!finished)
        {
            processQueue();
            processList();
        }   
    }

    void processQueue()
    {
        Command command;
        while (m_commands.consume(command))
        {
            switch(command.operation)
            {
                case Insert:
                    m_infoList.push_back(command.reference);
                    break;
                case Delete:
                    m_infoList.remove(command.reference);
                    break;
            }
        }
    }

    void processList()
    {
        // Iterate over m_infoList
    }

private:
    CommandQueue& m_commands;
    InfoList m_infoList;
}   


void main()
{
CommandQueue commands;

Thread1 thread1(commands);
Thread2 thread2(commands);

thread1.start();
thread2.start();

waitforTermination();

}
Run Code Online (Sandbox Code Playgroud)

这个还没有编译过。您仍然需要确保对Info对象的访问是线程安全的。