在这三种从共享内存中读取链表的方法中,为什么第三快?

Jos*_*vin 8 c++ performance multithreading latency shared-memory

我有一个'服务器'程序,它可以更新共享内存中的许多链表以响应外部事件.我希望客户端程序能够尽快注意到任何列表上的更新(最低延迟).服务器标记链接列表的节点,state_就像FILLED填充数据并将其下一个指针设置为有效位置一样.在那之前,它state_NOT_FILLED_YET.我使用内存屏障,以确保客户看不到state_FILLED数据之前,中实际上是准备好(和它似乎工作,我从来没有看到被破坏的数据).此外,state_是易失性的,以确保编译器不会解除客户端的循环检查.

保持服务器代码完全相同,我提出了3种不同的方法让客户端扫描链表以进行更改.问题是:为什么第三种方法最快?

方法1:连续循环遍历所有链接列表(称为"通道"),查看是否有任何节点已更改为"已填充":

void method_one()
{
    std::vector<Data*> channel_cursors;
    for(ChannelList::iterator i = channel_list.begin(); i != channel_list.end(); ++i)
    {
        Data* current_item = static_cast<Data*>(i->get(segment)->tail_.get(segment));
        channel_cursors.push_back(current_item);
    }

    while(true)
    {
        for(std::size_t i = 0; i < channel_list.size(); ++i)
        {   
            Data* current_item = channel_cursors[i];

            ACQUIRE_MEMORY_BARRIER;
            if(current_item->state_ == NOT_FILLED_YET) {
                continue;
            }

            log_latency(current_item->tv_sec_, current_item->tv_usec_);

            channel_cursors[i] = static_cast<Data*>(current_item->next_.get(segment));
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

当通道数量很小时,方法1给出了非常低的延迟.但是当通道数量增加(250K +)时,由于在所有通道上循环,它变得非常慢.所以我试过......

方法2:为每个链接列表提供ID.保留一个单独的"更新列表".每次更新其中一个链接列表时,请将其ID推送到更新列表.现在我们只需要监控单个更新列表,并检查我们从中获取的ID.

void method_two()
{
    std::vector<Data*> channel_cursors;
    for(ChannelList::iterator i = channel_list.begin(); i != channel_list.end(); ++i)
    {
        Data* current_item = static_cast<Data*>(i->get(segment)->tail_.get(segment));
        channel_cursors.push_back(current_item);
    }

    UpdateID* update_cursor = static_cast<UpdateID*>(update_channel.tail_.get(segment));

    while(true)
    {   
            ACQUIRE_MEMORY_BARRIER;
        if(update_cursor->state_ == NOT_FILLED_YET) {
            continue;
        }

        ::uint32_t update_id = update_cursor->list_id_;

        Data* current_item = channel_cursors[update_id];

        if(current_item->state_ == NOT_FILLED_YET) {
            std::cerr << "This should never print." << std::endl; // it doesn't
            continue;
        }

        log_latency(current_item->tv_sec_, current_item->tv_usec_);

        channel_cursors[update_id] = static_cast<Data*>(current_item->next_.get(segment));
        update_cursor = static_cast<UpdateID*>(update_cursor->next_.get(segment));
    }   
}
Run Code Online (Sandbox Code Playgroud)

方法2给出了可怕的延迟.方法1可能会给出低于10us的延迟,而方法2会莫名其妙地经常给出8ms的延迟!使用gettimeofday似乎update_cursor-> state_的变化从服务器的视图传播到客户端是非常慢的(我在多核盒子上,所以我假设延迟是由于缓存).所以我尝试了混合方法......

方法3:保留更新列表.但是连续循环遍历所有通道,并在每次迭代中检查更新列表是否已更新.如果有,请使用推到它上面的数字.如果没有,请检查我们当前迭代的频道.

void method_three()
{
    std::vector<Data*> channel_cursors;
    for(ChannelList::iterator i = channel_list.begin(); i != channel_list.end(); ++i)
    {
        Data* current_item = static_cast<Data*>(i->get(segment)->tail_.get(segment));
        channel_cursors.push_back(current_item);
    }

    UpdateID* update_cursor = static_cast<UpdateID*>(update_channel.tail_.get(segment));

    while(true)
    {
        for(std::size_t i = 0; i < channel_list.size(); ++i)
        {
            std::size_t idx = i;

            ACQUIRE_MEMORY_BARRIER;
            if(update_cursor->state_ != NOT_FILLED_YET) {
                //std::cerr << "Found via update" << std::endl;
                i--;
                idx = update_cursor->list_id_;
                update_cursor = static_cast<UpdateID*>(update_cursor->next_.get(segment));
            }

            Data* current_item = channel_cursors[idx];

            ACQUIRE_MEMORY_BARRIER;
            if(current_item->state_ == NOT_FILLED_YET) {
                continue;
            }

            found_an_update = true;

            log_latency(current_item->tv_sec_, current_item->tv_usec_);
            channel_cursors[idx] = static_cast<Data*>(current_item->next_.get(segment));
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

此方法的延迟与方法1一样好,但是缩放到大量通道.问题是,我不知道为什么.只是为了解决问题:如果我取消注释"通过更新找到"部分,它会在每个延迟日志消息之间打印.这意味着只有在更新列表中找到了东西!所以我不明白这种方法如何比方法2更快.

生成随机字符串作为测试数据的完整,可编译代码(需要GCC和boost-1.41)位于:http://pastebin.com/0kuzm3Uf

更新:所有3种方法都有效地自动锁定,直到发生更新.不同之处在于他们注意到更新发生了多长时间.他们对处理器不断征税,因此无法解释速度差异.我正在测试4核机器上没有其他任何东西在运行,所以服务器和客户端没有任何竞争对手.我甚至制作了一个代码版本,其中更新发出信号并让客户端等待条件 - 它没有帮助任何方法的延迟.

Update2:尽管有3种方法,但我一次只尝试1种方法,因此只有1台服务器和1台客户端竞争state_ member.

Jos*_*vin 0

答案很难弄清楚,公平地说,我提供的信息很难,但如果有人真正编译了我提供的源代码,他们就有机会;)我说打印了“通过更新列表找到”在每条延迟日志消息之后,但这实际上并不是真的——只有我可以在终端中回滚时才如此。一开始,在没有使用更新列表的情况下发现了大量更新。

问题是,在我在更新列表中设置起点和在每个数据列表中设置起点之间,会存在一些滞后,因为这些操作需要时间。请记住,在整个过程中,列表一直在增长。考虑最简单的情况,我有 2 个数据列表,A 和 B。当我在更新列表中设置起点时,由于列表 A 上有 30 个更新,列表 B 上有 30 个更新,其中恰好有 60 个元素。假设它们已经交替:

A
B
A
B
A // and I start looking at the list here
B
Run Code Online (Sandbox Code Playgroud)

但是当我将更新列表设置到那里后,B 有大量更新,而 A 没有更新。然后我在每个数据列表中设置了起始位置。我的数据列表的起点将是在更新激增之后,但我在更新列表中的起点是在更新激增之前,所以现在我要检查一堆更新但找不到它们。上面的混合方法效果最好,因为通过在找不到更新时迭代所有元素,它可以快速缩小更新列表所在位置和数据列表所在位置之间的时间间隙。