为什么使用std :: multiset作为优先级队列比使用std :: priority_queue更快?

Joh*_*hny 9 c++ performance priority-queue multimap multiset

我尝试用std :: priority_queue替换std :: multiset。但是我对速度结果感到失望。该算法的运行时间增加了50%...

以下是相应的命令:

top() = begin();
pop() = erase(knn.begin());
push() = insert();
Run Code Online (Sandbox Code Playgroud)

我对priority_queue实现的速度感到惊讶,我预期会有不同的结果(对于PQ更好)...从概念上讲,多集被用作优先级队列。为什么优先级队列和多重集即使在情况下也具有如此不同的性能-O2

十个结果的平均值,MSVS 2010,Win XP,32位,方法findAllKNN2()(请参见下面的内容)

MS
N           time [s]
100 000     0.5
1 000 000   8

PQ
N           time [s]
100 000     0.8
1 000 000   12
Run Code Online (Sandbox Code Playgroud)

什么会导致这些结果?尚未对源代码进行其他更改...感谢您的帮助...

MS实施:

template <typename Point>
struct TKDNodePriority
{
    KDNode <Point> *node;
    typename Point::Type priority;

    TKDNodePriority() : node ( NULL ), priority ( 0 ) {}
    TKDNodePriority ( KDNode <Point> *node_, typename Point::Type priority_ ) : node ( node_ ), priority ( priority_ ) {}

    bool operator < ( const TKDNodePriority <Point> &n1 ) const
    {
            return priority > n1.priority;
    }
};

template <typename Point>
struct TNNeighboursList
{
    typedef std::multiset < TKDNodePriority <Point> > Type;
};
Run Code Online (Sandbox Code Playgroud)

方法:

template <typename Point>
template <typename Point2>
void KDTree2D <Point>::findAllKNN2 ( const Point2 * point, typename TNNeighboursList <Point>::Type & knn, unsigned int k, KDNode <Point> *node, const unsigned int depth ) const
{
    if ( node == NULL )
    {
            return;
    }

    if ( point->getCoordinate ( depth % 2 ) <= node->getData()->getCoordinate ( depth % 2 ) )
    {
    findAllKNN2 ( point, knn, k, node->getLeft(), depth + 1 );
    }

    else 
    {
            findAllKNN2 ( point, knn, k, node->getRight(), depth + 1 );
    }

typename Point::Type dist_q_node = ( node->getData()->getX() - point->getX() ) * ( node->getData()->getX() - point->getX() ) +
                             ( node->getData()->getY() - point->getY() ) * ( node->getData()->getY() - point->getY() );

if (knn.size() == k)
{
    if (dist_q_node < knn.begin()->priority )
    {
        knn.erase(knn.begin());
        knn.insert ( TKDNodePriority <Point> ( node,  dist_q_node ) );
    }
}

else
{
    knn.insert ( TKDNodePriority <Point> ( node,  dist_q_node ) );
}

typename Point::Type dist_q_node_straight = ( point->getCoordinate ( node->getDepth() % 2 ) - node->getData()->getCoordinate ( node->getDepth() % 2 ) ) *
                                                ( point->getCoordinate ( node->getDepth() % 2 ) - node->getData()->getCoordinate ( node->getDepth() % 2 ) ) ;

typename Point::Type top_priority =  knn.begin()->priority;
if ( knn.size() < k ||  dist_q_node_straight <  top_priority )
{
            if ( point->getCoordinate ( node->getDepth() % 2 ) < node->getData()->getCoordinate ( node->getDepth() % 2 ) )
            {
        findAllKNN2 ( point, knn, k, node->getRight(), depth + 1 );
    }

    else
    {
        findAllKNN2 ( point, knn, k, node->getLeft(), depth + 1 );
    }
}
}
Run Code Online (Sandbox Code Playgroud)

PQ实施(速度较慢,为什么?)

template <typename Point>
struct TKDNodePriority
{
    KDNode <Point> *node;
    typename Point::Type priority;

    TKDNodePriority() : node ( NULL ), priority ( 0 ) {}
    TKDNodePriority ( KDNode <Point> *node_, typename Point::Type priority_ ) : node ( node_ ), priority ( priority_ ) {}

    bool operator < ( const TKDNodePriority <Point> &n1 ) const
    {
            return priority > n1.priority;
    }
};


template <typename Point>
struct TNNeighboursList
{ 
    typedef std::priority_queue< TKDNodePriority <Point> > Type;
};
Run Code Online (Sandbox Code Playgroud)

方法:

template <typename Point>
template <typename Point2>
void KDTree2D <Point>::findAllKNN2 ( const Point2 * point, typename TNNeighboursList <Point>::Type & knn, unsigned int k, KDNode <Point> *node, const unsigned int depth ) const
{

    if ( node == NULL )
    {
            return;
    }

    if ( point->getCoordinate ( depth % 2 ) <= node->getData()->getCoordinate ( depth % 2 ) )
    {
    findAllKNN2 ( point, knn, k, node->getLeft(), depth + 1 );
    }

    else 
    {
            findAllKNN2 ( point, knn, k, node->getRight(), depth + 1 );
    }

typename Point::Type dist_q_node = ( node->getData()->getX() - point->getX() ) * ( node->getData()->getX() - point->getX() ) +
                             ( node->getData()->getY() - point->getY() ) * ( node->getData()->getY() - point->getY() );

if (knn.size() == k)
{
    if (dist_q_node < knn.top().priority )
    {
        knn.pop();

        knn.push ( TKDNodePriority <Point> ( node,  dist_q_node ) );
    }
}

else
{
    knn.push ( TKDNodePriority <Point> ( node,  dist_q_node ) );
}

typename Point::Type dist_q_node_straight = ( point->getCoordinate ( node->getDepth() % 2 ) - node->getData()->getCoordinate ( node->getDepth() % 2 ) ) *
                                                ( point->getCoordinate ( node->getDepth() % 2 ) - node->getData()->getCoordinate ( node->getDepth() % 2 ) ) ;

typename Point::Type top_priority =  knn.top().priority;
if ( knn.size() < k ||  dist_q_node_straight <  top_priority )
{
            if ( point->getCoordinate ( node->getDepth() % 2 ) < node->getData()->getCoordinate ( node->getDepth() % 2 ) )
            {
        findAllKNN2 ( point, knn, k, node->getRight(), depth + 1 );
    }

    else
    {
        findAllKNN2 ( point, knn, k, node->getLeft(), depth + 1 );
    }
}
}
Run Code Online (Sandbox Code Playgroud)

小智 7

It appears that your compiler's optimization settings may have a large impact on performance. In the code below, multiset handily beats vector-based and deque-based priority_queue without optimization. However, with "-O3" optimization, the vector-based priority queue beats all. Now, these experiments were run on Linux with GCC, so perhaps you would get different results on Windows. I believe enabling optimization may remove many error-checking behaviors in STL's vector.

Without Optimization:

pq-w-vector: 79.2997ms

pq-w-deque: 362.366ms

pq-w-multiset: 34.649ms

With -O2 Optimization:

pq-w-vector: 8.88154ms

pq-w-deque: 17.5233ms

pq-w-multiset: 12.5539ms

With -O3 Optimization:

pq-w-vector: 7.92462ms

pq-w-deque: 16.8028ms

pq-w-multiset: 12.3208ms

Test Harness (don't forget to link with -lrt):

#include <iostream>
#include <queue>
#include <deque>
#include <set>
#include <ctime>
#include <cstdlib>
#include <unistd.h>

using namespace std;

template <typename T>
double run_test(T& pq, int size, int iterations)
{
    struct timespec start, end;

    for(int i = 0; i < size; ++i)
        pq.push(rand());

    clock_gettime(CLOCK_THREAD_CPUTIME_ID, &start);
    for(int i = 0; i < iterations; ++i)
    {
        if(rand()%2)
            pq.pop();
        else
            pq.push(rand());

    }
    clock_gettime(CLOCK_THREAD_CPUTIME_ID, &end);

    end.tv_sec -= start.tv_sec;
    end.tv_nsec -= start.tv_nsec;
    if (end.tv_nsec < 0)
    {
        --end.tv_sec;
        end.tv_nsec += 1000000000ULL;
    }

    return (end.tv_sec*1e3 + end.tv_nsec/1e6);
}

template <class T>
class multiset_pq: public multiset<T>
{
public:
    multiset_pq(): multiset<T>() {};
    void push(T elm) { this->insert(elm); }
    void pop() { if(!this->empty()) this->erase(this->begin()); }
    const T& top() { return *this->begin(); }
};

int main(void)
{
    const int size = 5000;
    const int iterations = 100000;

    priority_queue<int, vector<int> > pqv;
    priority_queue<int, deque<int> > pqd;
    multiset_pq<int> pqms;

    srand(time(0));

    cout<<"pq-w-vector: "<<run_test(pqv, size, iterations)<<"ms"<<endl;
    cout<<"pq-w-deque: "<<run_test(pqd, size, iterations)<<"ms"<<endl;
    cout<<"pq-w-multiset: "<<run_test(pqms, size, iterations)<<"ms"<<endl;

    return 0;
}
Run Code Online (Sandbox Code Playgroud)


Lia*_*tre 5

首先,作者没有提供导致性能下降的最小代码示例。其次,这个问题是在8年前提出的,我相信编译器可以极大地提高性能。

我已经做了一个基准测试示例,我将队列中的第一个元素放入,然后以另一个优先级推回(模拟新元素的推入而不创建一个),并通过迭代kNodesCount循环中数组中的元素计数来实现kRunsCount。我正在priority_queuemultiset和进行比较multimap。我已决定multimap进行更精确的比较。这个简单的测试非常接近作者的用例,我也尝试重现他在代码示例中使用的结构。

#include <set>
#include <type_traits>
#include <vector>
#include <chrono>
#include <queue>
#include <map>
#include <iostream>

template<typename T>
struct Point {
    static_assert(std::is_integral<T>::value || std::is_floating_point<T>::value, "Incompatible type");
    using Type = T;

    T x;
    T y;
};

template<typename T>
struct Node {
    using Type = T;

    Node<T> * left;
    Node<T> * right;
    T data;
};

template <typename T>
struct NodePriority {
    using Type = T;
    using DataType = typename T::Type;

    Node<T> * node = nullptr;
    DataType priority = static_cast<DataType>(0);

    bool operator < (const NodePriority<T> & n1) const noexcept {
        return priority > n1.priority;
    }

    bool operator > (const NodePriority<T> & n1) const noexcept {
        return priority < n1.priority;
    }
};

// descending order by default
template <typename T>
using PriorityQueueList = std::priority_queue<T>;

// greater used because of ascending order by default
template <typename T>
using MultisetList = std::multiset<T, std::greater<T>>;

// greater used because of ascending order by default
template <typename T>
using MultimapList = std::multimap<typename T::DataType, T, std::greater<typename T::DataType>>;

struct Inner {
    template<template <typename> class C, typename T>
    static void Operate(C<T> & list, std::size_t priority);

    template<typename T>
    static void Operate(PriorityQueueList<T> & list, std::size_t priority) {
        if (list.size() % 2 == 0) {
            auto el = std::move(list.top());
            el.priority = priority;
            list.push(std::move(el));
        }
        else {
            list.pop();
        }
    }

    template<typename T>
    static void Operate(MultisetList<T> & list, std::size_t priority) {
        if (list.size() % 2 == 0) {
            auto el = std::move(*list.begin());
            el.priority = priority;
            list.insert(std::move(el));
        }
        else {
            list.erase(list.begin());
        }
    }

    template<typename T>
    static void Operate(MultimapList<T> & list, std::size_t priority) {
        if (list.size() % 2 == 0) {
            auto el = std::move(*list.begin());
            auto & elFirst = const_cast<int&>(el.first);
            elFirst = priority;
            el.second.priority = priority;
            list.insert(std::move(el));
        }
        else {
            list.erase(list.begin());
        }
    }
};

template<typename T>
void doOperationOnPriorityList(T & list) {
    for (std::size_t pos = 0, len = list.size(); pos < len; ++pos) {
        // move top element and update priority
        auto priority = std::rand() % 10;
        Inner::Operate(list, priority);
    }
}

template<typename T>
void measureOperationTime(T & list, std::size_t runsCount) {
    std::chrono::system_clock::time_point t1, t2;
    std::uint64_t totalTime(0);
    for (std::size_t i = 0; i < runsCount; ++i) {
        t1 = std::chrono::system_clock::now();
        doOperationOnPriorityList(list);
        t2 = std::chrono::system_clock::now();
        auto castedTime = std::chrono::duration_cast<std::chrono::milliseconds>(t2 - t1).count();
        std::cout << "Run " << i << " time: " << castedTime << "\n";
        totalTime += castedTime;
    }

    std::cout << "Average time is: " << totalTime / runsCount << " ms" << std::endl;
}

int main() {
    // consts
    const int kNodesCount = 10'000'000;
    const int kRunsCount = 10;

    // prepare data
    PriorityQueueList<NodePriority<Point<int>>> neighboursList1;
    MultisetList<NodePriority<Point<int>>> neighboursList2;
    MultimapList<NodePriority<Point<int>>> neighboursList3;
    std::vector<Node<Point<int>>> nodes;
    nodes.reserve(kNodesCount);
    for (auto i = 0; i < kNodesCount; ++i) {
        nodes.emplace_back(decltype(nodes)::value_type{ nullptr, nullptr, { 0,0 } });
        auto priority = std::rand() % 10;
        neighboursList1.emplace(decltype(neighboursList1)::value_type{ &nodes.back(), priority });
        neighboursList2.emplace(decltype(neighboursList2)::value_type{ &nodes.back(), priority });
        neighboursList3.emplace(decltype(neighboursList3)::value_type{ priority, { &nodes.back(), priority } });
    }

    // do operation on data
    std::cout << "\nPriority queue\n";
    measureOperationTime(neighboursList1, kRunsCount);
    std::cout << "\nMultiset\n";
    measureOperationTime(neighboursList2, kRunsCount);
    std::cout << "\nMultimap\n";
    measureOperationTime(neighboursList3, kRunsCount);

    return 0;
}
Run Code Online (Sandbox Code Playgroud)

我已经使用VS v15.8.9使用/ Ox进行了发布。看一下10次运行中的1万件商品的结果:

Priority queue
Run 0 time: 764
Run 1 time: 933
Run 2 time: 920
Run 3 time: 813
Run 4 time: 991
Run 5 time: 862
Run 6 time: 902
Run 7 time: 1277
Run 8 time: 774
Run 9 time: 771
Average time is: 900 ms

Multiset
Run 0 time: 2235
Run 1 time: 1811
Run 2 time: 1755
Run 3 time: 1535
Run 4 time: 1475
Run 5 time: 1388
Run 6 time: 1482
Run 7 time: 1431
Run 8 time: 1347
Run 9 time: 1347
Average time is: 1580 ms

Multimap
Run 0 time: 2197
Run 1 time: 1885
Run 2 time: 1725
Run 3 time: 1671
Run 4 time: 1500
Run 5 time: 1403
Run 6 time: 1411
Run 7 time: 1420
Run 8 time: 1409
Run 9 time: 1362
Average time is: 1598 ms
Run Code Online (Sandbox Code Playgroud)

如您所见,嗯,multiset它的性能与之相同,multimap并且priority_queue是最快的(大约快43%)。那为什么会这样呢?

让我们从priority_queueC ++标准开始,它不会告诉我们如何实现一个或另一个容器或结构,但是在大多数情况下,它是基于二进制堆的(寻找msvc和gcc实现)!如果priority_queue除了top之外您无权访问其他任何元素,则无法遍历它们,按索引获取甚至获取最后一个元素(这会留出一些空间进行优化)。二进制堆的平均插入值为O(1),只有最坏的情况是O(log n),删除是O(log n),因为我们从底部开始获取元素,然后搜索下一个高优先级。

multimapmultiset。它们通常都在红黑二叉树上实现(寻找msvc和gcc实现),其中平均插入为O(log n),删除均为O(log n)。

从这个角度来看,priority_queue 决不multiset或慢multimap。因此,回到您的问题,multiset因为优先级队列并不priority_queue其自身快。可能有很多原因,包括priority_queue在旧编译器上的原始实现或该结构的错误使用(问题不包含最小的可行示例),除了作者未提及编译标志或编译器版本之外,有时优化还会做出重大更改。

@no ????? z上的更新1 请求

不幸的是,我现在无法访问linux环境,但是我已经安装了mingw-w64,版本信息:g ++。exe(x86_64-posix-seh,由Strawberryperl.com项目构建)8.3.0。使用的处理器与visual studio相同:处理器Intel(R)CoreTM i7-8550U CPU @ 1.80GHz,2001 Mhz,4个Core,8个逻辑处理器。

因此,结果为g++ -O2

Priority queue
Run 0 time: 775
Run 1 time: 995
Run 2 time: 901
Run 3 time: 807
Run 4 time: 930
Run 5 time: 765
Run 6 time: 799
Run 7 time: 1151
Run 8 time: 760
Run 9 time: 780
Average time is: 866 ms

Multiset
Run 0 time: 2280
Run 1 time: 1942
Run 2 time: 1607
Run 3 time: 1344
Run 4 time: 1319
Run 5 time: 1210
Run 6 time: 1129
Run 7 time: 1156
Run 8 time: 1244
Run 9 time: 992
Average time is: 1422 ms

Multimap
Run 0 time: 2530
Run 1 time: 1958
Run 2 time: 1670
Run 3 time: 1390
Run 4 time: 1391
Run 5 time: 1235
Run 6 time: 1088
Run 7 time: 1198
Run 8 time: 1071
Run 9 time: 963
Average time is: 1449 ms
Run Code Online (Sandbox Code Playgroud)

您可能会注意到它与msvc几乎相同。

更新2感谢@JorgeBellon

一个quick-bench.com在线基准测试链接,请您自己检查!

希望看到我的帖子有任何补充,干杯!


Vit*_*con 3

priority_queue据我了解,执行是罪魁祸首。priority_queue被实现(在下面)作为专门的vectordeque. 因为priority_queue需要有随机访问迭代器。当您将项目弹出或推送到 时priority_queue,队列中的剩余项目需要复制到空白空间,插入也会发生同样的情况。multi_set是基于键的。

编辑:非常抱歉,multi_set 不是基于哈希键。由于某种原因,我将它与 multi_map 混淆了。但是 multi_set 是一个多重排序的关联容器,它基于键存储元素,与值相同。由于元素在 a 中的存储方式multi_set

...有一个重要的属性,即向 a 中插入新元素 multi_set不会使指向现有元素的迭代器无效。从 a 中删除元素 multi_set也不会导致任何迭代器无效,当然,实际指向正在删除的元素的迭代器除外。

- 引自SGI 文档

这意味着 a 的存储multi_set不是线性的,因此性能增益也不是线性的。

  • `multisets` 不是基于哈希表 `unordered_multisets` 是 (4认同)
  • @BradTilley:您确实意识到这是在 C++11 获得批准之前编写的,对吧?如果这是您的主要动机,您能取消您的反对票吗?我已经从我的错误中吸取了教训:) 这个答案是在 2011 年 5 月,而 C++11 是在 2011 年 8 月批准的。谢谢。 (2认同)