用于统一块集合中的连续块的算法

Vit*_*meo 7 c++ memory sorting algorithm c++11

我正在创建一个具有动态内存块大小的预分配器,我需要统一连续的内存块.

struct Chunk // Chunk of memory
{
    Ptr begin, end; // [begin, end) range
}

struct PreAlloc
{
    std::vector<Chunk> chunks; // I need to unify contiguous chunks here
    ...
}
Run Code Online (Sandbox Code Playgroud)

我尝试了一个天真的解决方案,在根据它们对块进行排序后begin,基本上通过向量检查是否下一个块begin是否等于当前块end.我相信它可以改进.

是否有一个很好的算法来统一连续的范围

信息:

  • 大块永远不会"重叠".
  • 块可以具有大于0的任何大小.
  • 性能是最重要的因素.

Dan*_*lKO 8

注意:我的原始算法出错,我只考虑当前块左侧的块.

使用两个关联表(例如unordered_map),一个映射begin地址到Chunk另一个映射endChunk.这使您可以快速找到相邻的块.或者,您可以更改Chunk结构以将指针/ id /任何内容存储到邻居Chunk,并添加标记以标记它是否空闲.

该算法包括扫描块的向量一次,同时保持不变量:如果左边有一个邻居,则合并它们; 如果右边有邻居,则合并它们.最后,只收集剩余的块.

这是代码:

void unify(vector<Chunk>& chunks)
{
    unordered_map<Ptr, Chunk> begins(chunks.size() * 1.25); // tweak this
    unordered_map<Ptr, Chunk> ends(chunks.size() * 1.25); // tweak this

    for (Chunk c : chunks) {        
        // check the left
        auto left = ends.find(c.begin);
        if (left != ends.end()) { // found something to the left
            Chunk neighbour = left->second;
            c.begin = neighbour.begin;
            begins.erase(neighbour.begin);
            ends.erase(left);
        }
        // check the right
        auto right = begins.find(c.end);
        if (right != begins.end()) { // found something to the right
            Chunk neighbour = right->second;
            c.end = neighbour.end;
            begins.erase(right);
            ends.erase(neighbour.end);
        }

        begins[c.begin] = c;
        ends[c.end] = c;
    }
    chunks.clear();
    for (auto x : begins)
        chunks.push_back(x.second);
}
Run Code Online (Sandbox Code Playgroud)

该算法具有O(n)复杂性,假设对表beginsends表的持续时间访问(如果不触发重新散列,则几乎就是你得到的,因此"调整此"注释).实现关联表有很多选项,请务必尝试一些不同的选择; 正如Ben Jackson的评论所指出的,哈希表并不总能很好地利用缓存,因此即使是带有二进制搜索的排序向量也可能更快.

如果您可以更改Chunk结构以存储左/右指针,则可以获得有保证的O(1)查找/插入/删除.假设您这样做是为了整合空闲的内存块,可以O(1)free()通话期间进行左/右检查,因此之后无需合并它.


zar*_*are 3

这似乎是一个有趣的问题,所以我投入了一些时间。你所采取的方法绝非天真。事实上它已经取得了相当不错的效果。但它绝对可以进一步优化。我假设块列表尚未排序,因为那时您的算法可能是最佳的。为了优化它,我的方法是优化排序本身,消除排序过程中可以组合的块,从而使剩余元素的排序更快。下面的代码基本上是冒泡排序的修改版本。我还使用 std::sort 实现了您的解决方案,只是为了进行比较。使用我的结果也出奇的好。对于包含 1000 万块的数据集,组合排序与块合并的执行速度提高了 20 倍。代码的输出是(algo1 是 std::sort 后合并连续元素,algo 2 是通过删除可合并的块进行优化的排序):

generating input took:  00:00:19.655999
algo 1 took 00:00:00.968738
  initial chunks count: 10000000, output chunks count: 3332578
algo 2 took 00:00:00.046875
  initial chunks count: 10000000, output chunks count: 3332578
Run Code Online (Sandbox Code Playgroud)

您可能可以使用更好的排序算法(如 introsort)进一步改进它。

完整代码:

#include <vector>
#include <map>
#include <set>
#include <iostream>
#include <boost\date_time.hpp>

#define CHUNK_COUNT 10000000

struct Chunk // Chunk of memory
{
    char *begin, *end; // [begin, end) range

    bool operator<(const Chunk& rhs) const
    {
        return begin < rhs.begin;
    }
};

std::vector<Chunk> in;

void generate_input_data()
{
    std::multimap<int, Chunk> input_data;
    Chunk chunk;
    chunk.begin = 0;
    chunk.end = 0;
    for (int i = 0; i < CHUNK_COUNT; ++i)
    {
        int continuous = rand() % 3; // 66% chance of a chunk being continuous
        if (continuous)
            chunk.begin = chunk.end;
        else
            chunk.begin = chunk.end + rand() % 100 + 1;
        int chunk_size = rand() % 100 + 1;
        chunk.end = chunk.begin + chunk_size;
        input_data.insert(std::multimap<int, Chunk>::value_type(rand(), chunk));
    }
    // now we have the chunks randomly ordered in the map
    // will copy them in the input vector
    for (std::multimap<int, Chunk>::const_iterator it = input_data.begin(); it != input_data.end(); ++it)
        in.push_back(it->second);
}

void merge_chunks_sorted(std::vector<Chunk>& chunks)
{
    if (in.empty())
        return;
    std::vector<Chunk> res;
    Chunk ch = in[0];
    for (size_t i = 1; i < in.size(); ++i)
    {
        if (in[i].begin == ch.end)
        {
            ch.end = in[i].end;
        } else
        {
            res.push_back(ch);
            ch = in[i];
        }
    }
    res.push_back(ch);
    chunks = res;
}

void merge_chunks_orig_algo(std::vector<Chunk>& chunks)
{
    std::sort(in.begin(), in.end());
    merge_chunks_sorted(chunks);
}

void merge_chunks_new_algo(std::vector<Chunk>& chunks)
{
    size_t new_last_n = 0;
    Chunk temp;
    do {
        int last_n = new_last_n;
        new_last_n = chunks.size() - 1;
        for (int i = chunks.size() - 2; i >= last_n; --i)
        {
            if (chunks[i].begin > chunks[i + 1].begin)
            {
                if (chunks[i].begin == chunks[i + 1].end)
                {
                    chunks[i].begin = chunks[i + 1].begin;
                    if (i + 1 != chunks.size() - 1)
                        chunks[i + 1] = chunks[chunks.size() - 1];
                    chunks.pop_back();
                } else
                {
                    temp = chunks[i];
                    chunks[i] = chunks[i + 1];
                    chunks[i + 1] = temp;
                }
                new_last_n = i + 1;
            } else
            {
                if (chunks[i].end == chunks[i + 1].begin)
                {
                    chunks[i].end = chunks[i + 1].end;
                    if (i + 1 != chunks.size() - 1)
                        chunks[i + 1] = chunks[chunks.size() - 1];
                    chunks.pop_back();
                }
            }
        }
    } while (new_last_n < chunks.size() - 1);
}

void run_algo(void (*algo)(std::vector<Chunk>&))
{
    static int count = 1;
    // allowing the algo to modify the input vector is intentional
    std::vector<Chunk> chunks = in;
    size_t in_count = chunks.size();
    boost::posix_time::ptime start = boost::posix_time::microsec_clock::local_time();
    algo(chunks);
    boost::posix_time::ptime stop = boost::posix_time::microsec_clock::local_time();
    std::cout<<"algo "<<count++<<" took "<<stop - start<<std::endl;
    // if all went ok, statistically we should have around 33% of the original chunks count in the output vector
    std::cout<<"  initial chunks count: "<<in_count<<", output chunks count: "<<chunks.size()<<std::endl;
}

int main()
{
    boost::posix_time::ptime start = boost::posix_time::microsec_clock::local_time();
    generate_input_data();
    boost::posix_time::ptime stop = boost::posix_time::microsec_clock::local_time();
    std::cout<<"generating input took:\t"<<stop - start<<std::endl;

    run_algo(merge_chunks_orig_algo);
    run_algo(merge_chunks_new_algo);

    return 0;
}
Run Code Online (Sandbox Code Playgroud)

我在下面看到你提到 n 没有那么高。所以我用 1000 个块重新运行测试,运行 1000000 次,以使时间变得有意义。修改后的冒泡排序的性能仍然提高了 5 倍。基本上,对于 1000 个块,总运行时间为 3 微秒。下面的数字。

generating input took:  00:00:00
algo 1 took 00:00:15.343456, for 1000000 runs
  initial chunks count: 1000, output chunks count: 355
algo 2 took 00:00:03.374935, for 1000000 runs
  initial chunks count: 1000, output chunks count: 355
Run Code Online (Sandbox Code Playgroud)