Majority element - parts of an array

Krz*_*fik 32 c++ arrays algorithm element c++11

I have an array filled with integers. My job is to quickly find the majority element for any part of the array, and I need to do it in... logarithmic time, not linear, but beforehand I can take some time to prepare the array.

For example:

1 5 2 7 7 7 8 4 6

and the queries:

[4, 7] returns 7

[4, 8] returns 7

[1, 2] returns 0 (no majority element), and so on...

I need to find an answer for each query, and if possible it needs to execute fast.

For preparation, I can use O(n log n) time.
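
For reference, the intended query semantics (1-based, inclusive bounds) can be pinned down with a naive O(n)-per-query check; this is only an illustration of the expected answers, not the fast solution asked for, and the function name is mine:

#include <iostream>
#include <vector>

// Naive O(n)-per-query reference: Boyer-Moore vote over the range,
// then one verification pass. Bounds are 1-based and inclusive,
// matching the query examples above. Returns 0 if there is no majority.
int naive_majority(const std::vector<int>& a, int lo, int hi)
{
    int candidate = 0, votes = 0;
    for (int i = lo - 1; i < hi; ++i)            // vote phase
    {
        if (votes == 0) { candidate = a[i]; votes = 1; }
        else if (a[i] == candidate) ++votes;
        else --votes;
    }

    int count = 0;
    for (int i = lo - 1; i < hi; ++i)            // verification phase
        if (a[i] == candidate) ++count;

    return 2 * count > hi - lo + 1 ? candidate : 0;
}

int main()
{
    std::vector<int> a{1, 5, 2, 7, 7, 7, 8, 4, 6};
    std::cout << naive_majority(a, 4, 7) << '\n';  // 7
    std::cout << naive_majority(a, 4, 8) << '\n';  // 7
    std::cout << naive_majority(a, 1, 2) << '\n';  // 0 (no majority element)
}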

Evg*_*uev 16

O(log n) queries and O(n log n) preprocessing/space could be achieved by finding and using majority intervals with the following properties:

  1. For each value from the input array there may be one or several majority intervals (or there may be none if elements with this value are too sparse). We don't need majority intervals of length 1, because they would be useful only for query intervals of size 1, which are better handled as a special case.
  2. If the query interval lies completely inside one of these majority intervals, the corresponding value may be the majority element of this query interval.
  3. If there is no majority interval completely containing the query interval, the corresponding value cannot be the majority element of this query interval.
  4. Each element of the input array is covered by O(log n) majority intervals.

In other words, the only purpose of majority intervals is to provide O(log n) majority element candidates for any query interval.

This algorithm uses the following data structures:

  1. List of positions for each value from the input array (map<Value, vector<Position>>). Alternatively an unordered_map may be used here to improve performance (but then we need to extract all keys and sort them, so that structure #3 is filled in the proper order).
  2. List of majority intervals for each value (vector<Interval>).
  3. Data structure for handling queries (vector<small_map<Value, Data>>), where Data contains two indexes into the appropriate vector of structure #1, pointing to the next/previous positions of elements with the given value. Update: thanks to @justhalf, it is better to store in Data the cumulative frequency of elements with the given value. small_map may be implemented as a sorted vector of pairs - preprocessing appends elements in sorted order and a query uses small_map only for linear search.
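
These three structures correspond to the type aliases used in the C++11 implementation at the end of this answer; a minimal sketch, using the cumulative-frequency variant of Data:

#include <map>
#include <utility>
#include <vector>

// Structure #1: for each value, the sorted list of its positions.
using index_map_t = std::map<int, std::vector<int>>;

// Structure #2: the majority intervals of one value, as (begin, end) pairs.
using interval_t = std::pair<int, int>;
using interval_vec_t = std::vector<interval_t>;

// Structure #3: for each array position, a small sorted vector of
// (value, cumulative frequency of that value) pairs, searched linearly.
using small_map_t = std::vector<std::pair<int, int>>;
using query_vec_t = std::vector<small_map_t>;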

Preprocessing:

  1. Scan the input array and push the current position to the appropriate vector in structure #1.
  2. Perform steps 3 .. 4 for every vector in structure #1.
  3. Transform the list of positions into a list of majority intervals. See details below.
  4. For each index of the input array covered by one of the majority intervals, insert data into the appropriate element of structure #3: the value and the positions of the previous/next elements with this value (or the cumulative frequency of this value).

Query:

  1. If the query interval length is 1, return the corresponding element of the source array.
  2. For the starting point of the query interval, get the corresponding element of structure #3's vector. For each element of that map, perform step 3. Scan all elements of the map corresponding to the ending point of the query interval in parallel with this map, to allow O(1) complexity for step 3 (instead of O(log log n)).
  3. If the map corresponding to the ending point of the query interval contains a matching value, compute s3[stop][value].prev - s3[start][value].next + 1. If it is greater than half of the query interval, return this value. If cumulative frequencies are used instead of next/previous indexes, compute s3[stop+1][value].freq - s3[start][value].freq instead.
  4. If nothing is found in step 3, return "Nothing".
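
To make the step-3 arithmetic concrete, here is a minimal self-contained sketch of the cumulative-frequency check for a single value (the freq vector and is_majority name are illustrative, not part of the implementation below); it reproduces the two example queries worked through later in this answer:

#include <iostream>
#include <vector>

// Hypothetical flattened view of structure #3 for a single value:
// freq[i] = number of occurrences of that value in positions [0, i).
bool is_majority(const std::vector<int>& freq, int start, int stop)
{
    int occurrences = freq[stop + 1] - freq[start];  // occurrences inside [start, stop]
    int length = stop - start + 1;
    return 2 * occurrences > length;                 // strictly more than half
}

int main()
{
    // Input array 0 1 2 0 0 1 1 0, value 0: zeroes at positions 0, 3, 4, 7.
    std::vector<int> freq{0, 1, 1, 1, 2, 3, 3, 3, 4};
    std::cout << is_majority(freq, 0, 4) << '\n';  // 1: three zeroes out of five
    std::cout << is_majority(freq, 2, 5) << '\n';  // 0: two zeroes out of four
}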

The main part of the algorithm is building majority intervals from a list of positions:

  1. Assign a weight to each position in the list: number_of_matching_values_to_the_left - number_of_nonmatching_values_to_the_left.
  2. Greedily filter the positions whose weights are in strictly decreasing order into the "prefix" array (keeping position:weight pairs, as in the example below): for each position, if (prefix.empty() || weight < prefix.back().weight) prefix.push_back({position, weight});.
  3. Greedily filter, scanning backwards, the positions whose weights are in strictly increasing order into the "suffix" array: for each position in reverse order, if (suffix.empty() || weight > suffix.back().weight) suffix.push_back({position, weight});, then reverse the result.
  4. Scan "prefix" and "suffix" arrays together and find intervals from every "prefix" element to corresponding place in "suffix" array and from every "suffix" element to corresponding place in "prefix" array. (If all "suffix" elements' weights are less than given "prefix" element or their position is not to the right of it, no interval generated; if there is no "suffix" element with exactly the weight of given "prefix" element, get nearest "suffix" element with larger weight and extend interval with this weight difference to the right).
  5. Merge overlapping intervals.

This algorithm guarantees properties 1 .. 3 of majority intervals. As for property #4, the only way I can imagine to cover some element with the maximum number of majority intervals is like this: 11111111222233455666677777777. Here the element 4 is covered by 2 * log n intervals, so this property seems to be satisfied. See a more formal proof of this property at the end of this post.

Example:

For the input array "0 1 2 0 0 1 1 0", the following lists of positions would be generated:

value  positions
    0  0 3 4 7
    1  1 5 6
    2  2

Positions of the value 0 will get the following properties:

weights:    0:1 3:0 4:1 7:0
prefix:     0:1 3:0          (strictly decreasing)
suffix:     4:1 7:0          (strictly increasing when scanning backwards)
intervals:  0->4 3->7 4->0 7->3
merged intervals: 0-7

Positions of the value 1 will get the following properties:

weights:    1:0  5:-2  6:-1
prefix:     1:0  5:-2
suffix:     1:0  6:-1
intervals:  1->none 5->6+1 6->5-1 1->none
merged intervals: 4-7

Query data structure:

positions value next prev
        0     0    0    x
     1..2     0    1    0
        3     0    1    1
        4     0    2    2
        4     1    1    x
        5     0    3    2
    ...

Query [0, 4]:

prev[4][0]-next[0][0]+1=2-0+1=3
query size=5
3>2.5, returned result 0

Query [2, 5]:

prev[5][0]-next[2][0]+1=2-1+1=2
query size=4
2=2, returned result "none"

Note that there is no attempt to inspect element "1", because its majority interval does not contain either of these query intervals.

Proof of property #4:

Majority intervals are constructed in such a way that strictly more than 1/3 of all their elements have the corresponding value. This ratio is nearest to 1/3 for sub-arrays like any*(m-1) value*m any*m, for example, 01234444456789.

To make this proof more obvious, we could represent each interval as a point in 2D: every possible starting point represented by horizontal axis and every possible ending point represented by vertical axis (see diagram below).

(Diagram: each interval plotted as a 2D point, with its starting point on the horizontal axis and its ending point on the vertical axis, showing the white rectangle and yellow square areas described below.)

All valid intervals are located on or above diagonal. White rectangle represents all intervals covering some array element (represented as unit-size interval on its lower right corner).

Let's cover this white rectangle with squares of size 1, 2, 4, 8, 16, ... sharing the same lower right corner. This divides white area into O(log n) areas similar to yellow one (and single square of size 1 containing single interval of size 1 which is ignored by this algorithm).

Let's count how many majority intervals may be placed into yellow area. One interval (located at the nearest to diagonal corner) occupies 1/4 of elements belonging to interval at the farthest from diagonal corner (and this largest interval contains all elements belonging to any interval in yellow area). This means that smallest interval contains strictly more than 1/12 values available for whole yellow area. So if we try to place 12 intervals to yellow area, we have not enough elements for different values. So yellow area cannot contain more than 11 majority intervals. And white rectangle cannot contain more than 11 * log n majority intervals. Proof completed.
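
In LaTeX notation, this counting argument compresses to the following, where $L$ is the length of the largest interval in the yellow area and $m$ is the number of majority intervals with pairwise different values placed there:

\[
|I| \ge \frac{L}{4}, \qquad \operatorname{count}_I(v) > \frac{|I|}{3} \ge \frac{L}{12}
\quad\Longrightarrow\quad m \cdot \frac{L}{12} < L \;\Longrightarrow\; m \le 11.
\]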

11 * log n is an overestimation. As I said earlier, it's hard to imagine more than 2 * log n majority intervals covering some element. And even this value is much greater than the average number of covering majority intervals.

C++11 implementation. See it either at ideone or here:

#include <iostream>
#include <vector>
#include <map>
#include <algorithm>
#include <functional>
#include <random>

constexpr int SrcSize = 1000000;
constexpr int NQueries = 100000;

using src_vec_t = std::vector<int>;
using index_vec_t = std::vector<int>;
using weight_vec_t = std::vector<int>;
using pair_vec_t = std::vector<std::pair<int, int>>;
using index_map_t = std::map<int, index_vec_t>;
using interval_t = std::pair<int, int>;
using interval_vec_t = std::vector<interval_t>;
using small_map_t = std::vector<std::pair<int, int>>;
using query_vec_t = std::vector<small_map_t>;

constexpr int None = -1;
constexpr int Junk = -2;

src_vec_t generate_e()
{ // good query length = 3
    src_vec_t src;
    std::random_device rd;
    std::default_random_engine eng{rd()};
    auto exp = std::bind(std::exponential_distribution<>{0.4}, eng);

    for (int i = 0; i < SrcSize; ++i)
    {
        int x = exp();
        src.push_back(x);
        //std::cout << x << ' ';
    }

    return src;
}

src_vec_t generate_ep()
{ // good query length = 500
    src_vec_t src;
    std::random_device rd;
    std::default_random_engine eng{rd()};
    auto exp = std::bind(std::exponential_distribution<>{0.4}, eng);
    auto poisson = std::bind(std::poisson_distribution<int>{100}, eng);

    while (int(src.size()) < SrcSize)
    {
        int x = exp();
        int n = poisson();

        for (int i = 0; i < n; ++i)
        {
            src.push_back(x);
            //std::cout << x << ' ';
        }
    }

    return src;
}

src_vec_t generate()
{
    //return generate_e();
    return generate_ep();
}

int trivial(const src_vec_t& src, interval_t qi)
{
    int count = 0;
    int majorityElement = 0; // will be assigned before use for valid args

    for (int i = qi.first; i <= qi.second; ++i)
    {
        if (count == 0)
            majorityElement = src[i];

        if (src[i] == majorityElement) 
           ++count;
        else 
           --count;
    }

    count = 0;
    for (int i = qi.first; i <= qi.second; ++i)
    {
        if (src[i] == majorityElement)
            count++;
    }

    if (2 * count > qi.second + 1 - qi.first)
        return majorityElement;
    else
        return None;
}

index_map_t sort_ind(const src_vec_t& src)
{
    int ind = 0;
    index_map_t im;

    for (auto x: src)
        im[x].push_back(ind++);

    return im;
}

weight_vec_t get_weights(const index_vec_t& indexes)
{
    weight_vec_t weights;

    for (int i = 0; i != int(indexes.size()); ++i)
        weights.push_back(2 * i - indexes[i]);

    return weights;
}

pair_vec_t get_prefix(const index_vec_t& indexes, const weight_vec_t& weights)
{
    pair_vec_t prefix;

    for (int i = 0; i != int(indexes.size()); ++i)
        if (prefix.empty() || weights[i] < prefix.back().second)
            prefix.emplace_back(indexes[i], weights[i]);

    return prefix;
}

pair_vec_t get_suffix(const index_vec_t& indexes, const weight_vec_t& weights)
{
    pair_vec_t suffix;

    for (int i = indexes.size() - 1; i >= 0; --i)
        if (suffix.empty() || weights[i] > suffix.back().second)
            suffix.emplace_back(indexes[i], weights[i]);

    std::reverse(suffix.begin(), suffix.end());
    return suffix;
}

interval_vec_t get_intervals(const pair_vec_t& prefix, const pair_vec_t& suffix)
{
    interval_vec_t intervals;
    int prev_suffix_index = 0; // will be assigned before use for correct args
    int prev_suffix_weight = 0; // same assumptions

    for (int ind_pref = 0, ind_suff = 0; ind_pref != int(prefix.size());)
    {
        auto i_pref = prefix[ind_pref].first;
        auto w_pref = prefix[ind_pref].second;

        if (ind_suff != int(suffix.size()))
        {
            auto i_suff = suffix[ind_suff].first;
            auto w_suff = suffix[ind_suff].second;

            if (w_pref <= w_suff)
            {
                auto beg = std::max(0, i_pref + w_pref - w_suff);

                if (i_pref < i_suff)
                    intervals.emplace_back(beg, i_suff + 1);

                if (w_pref == w_suff)
                    ++ind_pref;

                ++ind_suff;
                prev_suffix_index = i_suff;
                prev_suffix_weight = w_suff;
                continue;
            }
        }

        // ind_suff out of bounds or w_pref > w_suff:
        auto end = prev_suffix_index + prev_suffix_weight - w_pref + 1;
        // end may be out-of-bounds; that's OK if overflow is not possible
        intervals.emplace_back(i_pref, end);
        ++ind_pref;
    }

    return intervals;
}

interval_vec_t merge(const interval_vec_t& from)
{
    using endpoints_t = std::vector<std::pair<int, bool>>;
    endpoints_t ep(2 * from.size());

    std::transform(from.begin(), from.end(), ep.begin(),
                   [](interval_t x){ return std::make_pair(x.first, true); });

    std::transform(from.begin(), from.end(), ep.begin() + from.size(),
                   [](interval_t x){ return std::make_pair(x.second, false); });

    std::sort(ep.begin(), ep.end());

    interval_vec_t to;
    int start; // will be assigned before use for correct args
    int overlaps = 0;

    for (auto& x: ep)
    {
        if (x.second) // begin
        {
            if (overlaps++ == 0)
                start = x.first;
        }
        else // end
        {
            if (--overlaps == 0)
                to.emplace_back(start, x.first);
        }
    }

    return to;
}

interval_vec_t get_intervals(const index_vec_t& indexes)
{
    auto weights = get_weights(indexes);
    auto prefix = get_prefix(indexes, weights);
    auto suffix = get_suffix(indexes, weights);
    auto intervals = get_intervals(prefix, suffix);
    return merge(intervals);
}

void update_qv(
    query_vec_t& qv,
    int value,
    const interval_vec_t& intervals,
    const index_vec_t& iv)
{
    int iv_ind = 0;
    int qv_ind = 0;
    int accum = 0;

    for (auto& interval: intervals)
    {
        int i_begin = interval.first;
        int i_end = std::min<int>(interval.second, qv.size() - 1);

        while (iv[iv_ind] < i_begin)
        {
            ++accum;
            ++iv_ind;
        }

        qv_ind = std::max(qv_ind, i_begin);

        while (qv_ind <= i_end)
        {
            qv[qv_ind].emplace_back(value, accum);

            if (iv[iv_ind] == qv_ind)
            {
                ++accum;
                ++iv_ind;
            }

            ++qv_ind;
        }
    }
}

void print_preprocess_stat(const index_map_t& im, const query_vec_t& qv)
{
    double sum_coverage = 0.;
    int max_coverage = 0;

    for (auto& x: qv)
    {
        sum_coverage += x.size();
        max_coverage = std::max<int>(max_coverage, x.size());
    }

    std::cout << "             size = " << qv.size() - 1 << '\n';
    std::cout << "           values = " << im.size() << '\n';
    std::cout << "     max coverage = " << max_coverage << '\n';
    std::cout << "     avg coverage = " << sum_coverage / qv.size() << '\n';
}

query_vec_t preprocess(const src_vec_t& src)
{
    query_vec_t qv(src.size() + 1);
    auto im = sort_ind(src);

    for (auto& val: im)
    {
        auto intervals = get_intervals(val.second);
        update_qv(qv, val.first, intervals, val.second);
    }

    print_preprocess_stat(im, qv);
    return qv;
}

int do_query(const src_vec_t& src, const query_vec_t& qv, interval_t qi)
{
    if (qi.first == qi.second)
        return src[qi.first];

    auto b = qv[qi.first].begin();
    auto e = qv[qi.second + 1].begin();

    while (b != qv[qi.first].end() && e != qv[qi.second + 1].end())
    {
        if (b->first < e->first)
        {
            ++b;
        }
        else if (e->first < b->first)
        {
            ++e;
        }
        else // if (e->first == b->first)
        {
            // hope this doesn't overflow
            if (2 * (e->second - b->second) > qi.second + 1 - qi.first)
                return b->first;

            ++b;
            ++e;
        }
    }

    return None;
}

int main()
{
    std::random_device rd;
    std::default_random_engine eng{rd()};
    auto poisson = std::bind(std::poisson_distribution<int>{500}, eng);
    int majority = 0;
    int nonzero = 0;
    int failed = 0;

    auto src = generate();
    auto qv = preprocess(src);

    for (int i = 0; i < NQueries; ++i)
    {
        int size = poisson();
        auto ud = std::uniform_int_distribution<int>(0, src.size() - size - 1);
        int start = ud(eng);
        int stop = start + size;
        auto res1 = do_query(src, qv, {start, stop});
        auto res2 = trivial(src, {start, stop});
        //std::cout << size << ": " << res1 << ' ' << res2 << '\n';

        if (res2 != res1)
            ++failed;

        if (res2 != None)
        {
            ++majority;

            if (res2 != 0)
                ++nonzero;
        }
    }

    std::cout << "majority elements = " << 100. * majority / NQueries << "%\n";
    std::cout << " nonzero elements = " << 100. * nonzero / NQueries << "%\n";
    std::cout << "          queries = " << NQueries << '\n';
    std::cout << "           failed = " << failed << '\n';

    return 0;
}

Related work:

As pointed out in another answer to this question, there is other work where this problem is already solved: "Range majority in constant time and linear space" by S. Durocher, M. He, I. Munro, P.K. Nicholson, M. Skala.

Algorithm presented in this paper has better asymptotic complexities for query time: O(1) instead of O(log n) and for space: O(n) instead of O(n log n).

Better space complexity allows this algorithm to process larger data sets (comparing to the algorithm proposed in this answer). Less memory needed for preprocessed data and more regular data access pattern, most likely, allow this algorithm to preprocess data more quickly. But it is not so easy with query time...

Let's suppose we have input data most favorable to algorithm from the paper: n=1000000000 (it's hard to imagine a system with more than 10..30 gigabytes of memory, in year 2013).

Algorithm proposed in this answer needs to process up to 120 (or 2 query boundaries*2*log n) elements for each query. But it performs very simple operations, similar to linear search. And it sequentially accesses two contiguous memory areas, so it is cache-friendly.

The algorithm from the paper needs to perform up to 20 operations (2 query boundaries * 5 candidates * 2 wavelet tree levels) for each query. This is 6 times less. But each operation is more complex. Each query to the succinct representation of bit counters itself contains a linear search (which means 20 linear searches instead of one). Worst of all, each such operation should access several independent memory areas (unless the query size and therefore the quadruple size is very small), so the query is cache-unfriendly. This means each query (while being a constant-time operation) is pretty slow, probably slower than in the algorithm proposed here. If we decrease the input array size, the chances increase that the algorithm proposed here is quicker.

A practical disadvantage of the algorithm in the paper is the wavelet tree and succinct bit counter implementation. Implementing them from scratch may be pretty time-consuming. Using a pre-existing implementation is not always convenient.


flu*_*lup 9

The trick

When looking for a majority element, you may discard intervals that do not have a majority element. See Find the majority element in array. This allows you to solve the problem quite simply.

Preparation

At preparation time, recursively keep dividing the array into two halves and store these array intervals in a binary tree. For each node, count the occurrences of each element in its array interval. You need a data structure that offers O(1) inserts and reads; I suggest using an unordered_multiset, which behaves as needed on average (but worst-case inserts are linear). Also check whether the interval has a majority element, and store it if it does.
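
A minimal sketch of this preparation step, using std::unordered_map for the per-node counts instead of the multiset suggested above (the Node struct and build function are illustrative, not a definitive implementation):

#include <iostream>
#include <memory>
#include <unordered_map>
#include <vector>

constexpr int None = -1;  // sentinel: no majority element in this interval

struct Node
{
    int lo, hi;                            // interval [lo, hi], inclusive
    int majority = None;                   // majority element, or None
    int majority_count = 0;                // its occurrences minus all other elements
    std::unordered_map<int, int> counts;   // occurrence count of each element
    std::unique_ptr<Node> left, right;
};

std::unique_ptr<Node> build(const std::vector<int>& a, int lo, int hi)
{
    std::unique_ptr<Node> node(new Node());
    node->lo = lo;
    node->hi = hi;

    for (int i = lo; i <= hi; ++i)         // O(interval length) per node,
        ++node->counts[a[i]];              // O(n log n) over the whole tree

    for (const auto& kv: node->counts)     // store the majority element, if any
        if (2 * kv.second > hi - lo + 1)
        {
            node->majority = kv.first;
            node->majority_count = 2 * kv.second - (hi - lo + 1);
        }

    if (lo < hi)                           // split the interval into two halves
    {
        int mid = lo + (hi - lo) / 2;
        node->left = build(a, lo, mid);
        node->right = build(a, mid + 1, hi);
    }

    return node;
}

int main()
{
    std::vector<int> a{1, 1, 1, 2, 2, 3, 3, 2, 2, 2, 3, 2, 2};
    auto root = build(a, 0, int(a.size()) - 1);
    std::cout << root->majority_count << "x" << root->majority << '\n';  // 1x2
}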

Runtime

At runtime, when asked to compute the majority element of a range, dive into the tree to compute the set of intervals that covers the given range exactly. Use the trick to combine these intervals.

If we have the array interval 7 5 5 7 7 7 with majority element 7, we can split off and discard 5 5 7 7 since it has no majority element. Effectively the fives have gobbled up two of the sevens. What's left is an array 7 7, or 2x7. Call this number 2 the majority count of the majority element 7:

The majority count of the majority element of an array interval is the number of occurrences of the majority element minus the combined occurrences of all other elements.

Use the following rules to combine intervals and find the potential majority element:

  • Discard intervals that have no majority element.
  • Combining two arrays with the same majority element is easy: just add up their majority counts. 2x7 and 3x7 become 5x7.
  • When two arrays with different majority elements are combined, the higher majority count wins. Subtract the lower majority count from the higher one to find the resulting majority count. 3x7 and 2x3 become 1x7.
  • If their majority elements differ but have equal majority counts, disregard both arrays. 3x7 and 3x5 cancel each other out.

When all intervals have been discarded or combined, you either have nothing left, in which case there is no majority element, or you have one combined interval carrying a potential majority element. Look up and add up the occurrence counts of this element in all array intervals (including the previously discarded ones) to check whether it really is the majority element.
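
These combination rules map directly onto a small function over (majority element, majority count) pairs; here is a minimal sketch (the Candidate struct and combine names are illustrative, and the final verification against actual occurrence counts still happens separately, as described above):

#include <iostream>

constexpr int None = -1;  // sentinel: interval has no majority element

struct Candidate
{
    int element;  // potential majority element, or None
    int count;    // its majority count (occurrences minus all other elements)
};

// Combine the candidates of two adjacent intervals according to the rules above.
Candidate combine(Candidate a, Candidate b)
{
    if (a.element == None) return b;              // discard intervals without majority
    if (b.element == None) return a;
    if (a.element == b.element)                   // same element: add the counts
        return Candidate{a.element, a.count + b.count};
    if (a.count == b.count)                       // equal counts: they cancel out
        return Candidate{None, 0};
    return a.count > b.count                      // different elements: the higher
        ? Candidate{a.element, a.count - b.count} // count wins, reduced by the lower
        : Candidate{b.element, b.count - a.count};
}

int main()
{
    Candidate c = combine(Candidate{7, 3}, Candidate{3, 2});  // 3x7 and 2x3
    std::cout << c.count << "x" << c.element << '\n';         // prints 1x7
}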

For the array 1,1,1,2,2,3,3,2,2,2,3,2,2 you get this tree (majority count x majority element listed in brackets):

                        1,1,1,2,2,3,3,2,2,2,3,2,2    
                                  (1x2)
                      /                           \
             1,1,1,2,2,3,3                       2,2,2,3,2,2
                                                    (4x2)
            /              \                   /            \
        1,1,1,2           2,3,3            2,2,2             3,2,2
         (2x1)            (1x3)            (3x2)             (1x2)
        /     \          /    \            /    \            /    \
     1,1      1,2       2,3     3        2,2     2        3,2      2
    (1x1)                     (1x3)     (2x2)  (1x2)             (1x2)
    /   \     /  \     /   \            /  \             /   \
   1     1   1   2    2    3           2    2           3     2
(1x1) (1x1)(1x1)(1x2)(1x2)(1x3)       (1x2)(1x2)       (1x3) (1x2)     

The range [5, 10] (1-indexed) is covered by the set of intervals 2,3,3 (1x3) and 2,2,2 (3x2). They have different majority elements. Subtract their majority counts and you are left with 2x2, so 2 is the potential majority element. Look up and sum the actual occurrence counts of 2 in those array intervals: 1 + 3 = 4 out of 6. 2 is the majority element.

The range [1, 10] is covered by the set of intervals 1,1,1,2,2,3,3 (no majority element) and 2,2,2 (3x2). Disregard the first interval since it has no majority element, so 2 is the potential majority element. Sum the occurrence counts of 2 in all intervals: 2 + 3 = 5 out of 10. There is no majority element.