Vit*_*meo 7 c++ memory sorting algorithm c++11
我正在创建一个具有动态内存块大小的预分配器,我需要统一连续的内存块.
struct Chunk // Chunk of memory
{
Ptr begin, end; // [begin, end) range
}
struct PreAlloc
{
std::vector<Chunk> chunks; // I need to unify contiguous chunks here
...
}
Run Code Online (Sandbox Code Playgroud)
我尝试了一个天真的解决方案,在根据它们对块进行排序后begin,基本上通过向量检查是否下一个块begin是否等于当前块end.我相信它可以改进.
是否有一个很好的算法来统一连续的范围?
信息:
注意:我的原始算法出错,我只考虑当前块左侧的块.
使用两个关联表(例如unordered_map),一个映射begin地址到Chunk另一个映射end到Chunk.这使您可以快速找到相邻的块.或者,您可以更改Chunk结构以将指针/ id /任何内容存储到邻居Chunk,并添加标记以标记它是否空闲.
该算法包括扫描块的向量一次,同时保持不变量:如果左边有一个邻居,则合并它们; 如果右边有邻居,则合并它们.最后,只收集剩余的块.
这是代码:
void unify(vector<Chunk>& chunks)
{
unordered_map<Ptr, Chunk> begins(chunks.size() * 1.25); // tweak this
unordered_map<Ptr, Chunk> ends(chunks.size() * 1.25); // tweak this
for (Chunk c : chunks) {
// check the left
auto left = ends.find(c.begin);
if (left != ends.end()) { // found something to the left
Chunk neighbour = left->second;
c.begin = neighbour.begin;
begins.erase(neighbour.begin);
ends.erase(left);
}
// check the right
auto right = begins.find(c.end);
if (right != begins.end()) { // found something to the right
Chunk neighbour = right->second;
c.end = neighbour.end;
begins.erase(right);
ends.erase(neighbour.end);
}
begins[c.begin] = c;
ends[c.end] = c;
}
chunks.clear();
for (auto x : begins)
chunks.push_back(x.second);
}
Run Code Online (Sandbox Code Playgroud)
该算法具有O(n)复杂性,假设对表begins和ends表的持续时间访问(如果不触发重新散列,则几乎就是你得到的,因此"调整此"注释).实现关联表有很多选项,请务必尝试一些不同的选择; 正如Ben Jackson的评论所指出的,哈希表并不总能很好地利用缓存,因此即使是带有二进制搜索的排序向量也可能更快.
如果您可以更改Chunk结构以存储左/右指针,则可以获得有保证的O(1)查找/插入/删除.假设您这样做是为了整合空闲的内存块,可以O(1)在free()通话期间进行左/右检查,因此之后无需合并它.
这似乎是一个有趣的问题,所以我投入了一些时间。你所采取的方法绝非天真。事实上它已经取得了相当不错的效果。但它绝对可以进一步优化。我假设块列表尚未排序,因为那时您的算法可能是最佳的。为了优化它,我的方法是优化排序本身,消除排序过程中可以组合的块,从而使剩余元素的排序更快。下面的代码基本上是冒泡排序的修改版本。我还使用 std::sort 实现了您的解决方案,只是为了进行比较。使用我的结果也出奇的好。对于包含 1000 万块的数据集,组合排序与块合并的执行速度提高了 20 倍。代码的输出是(algo1 是 std::sort 后合并连续元素,algo 2 是通过删除可合并的块进行优化的排序):
generating input took: 00:00:19.655999
algo 1 took 00:00:00.968738
initial chunks count: 10000000, output chunks count: 3332578
algo 2 took 00:00:00.046875
initial chunks count: 10000000, output chunks count: 3332578
Run Code Online (Sandbox Code Playgroud)
您可能可以使用更好的排序算法(如 introsort)进一步改进它。
完整代码:
#include <vector>
#include <map>
#include <set>
#include <iostream>
#include <boost\date_time.hpp>
#define CHUNK_COUNT 10000000
struct Chunk // Chunk of memory
{
char *begin, *end; // [begin, end) range
bool operator<(const Chunk& rhs) const
{
return begin < rhs.begin;
}
};
std::vector<Chunk> in;
void generate_input_data()
{
std::multimap<int, Chunk> input_data;
Chunk chunk;
chunk.begin = 0;
chunk.end = 0;
for (int i = 0; i < CHUNK_COUNT; ++i)
{
int continuous = rand() % 3; // 66% chance of a chunk being continuous
if (continuous)
chunk.begin = chunk.end;
else
chunk.begin = chunk.end + rand() % 100 + 1;
int chunk_size = rand() % 100 + 1;
chunk.end = chunk.begin + chunk_size;
input_data.insert(std::multimap<int, Chunk>::value_type(rand(), chunk));
}
// now we have the chunks randomly ordered in the map
// will copy them in the input vector
for (std::multimap<int, Chunk>::const_iterator it = input_data.begin(); it != input_data.end(); ++it)
in.push_back(it->second);
}
void merge_chunks_sorted(std::vector<Chunk>& chunks)
{
if (in.empty())
return;
std::vector<Chunk> res;
Chunk ch = in[0];
for (size_t i = 1; i < in.size(); ++i)
{
if (in[i].begin == ch.end)
{
ch.end = in[i].end;
} else
{
res.push_back(ch);
ch = in[i];
}
}
res.push_back(ch);
chunks = res;
}
void merge_chunks_orig_algo(std::vector<Chunk>& chunks)
{
std::sort(in.begin(), in.end());
merge_chunks_sorted(chunks);
}
void merge_chunks_new_algo(std::vector<Chunk>& chunks)
{
size_t new_last_n = 0;
Chunk temp;
do {
int last_n = new_last_n;
new_last_n = chunks.size() - 1;
for (int i = chunks.size() - 2; i >= last_n; --i)
{
if (chunks[i].begin > chunks[i + 1].begin)
{
if (chunks[i].begin == chunks[i + 1].end)
{
chunks[i].begin = chunks[i + 1].begin;
if (i + 1 != chunks.size() - 1)
chunks[i + 1] = chunks[chunks.size() - 1];
chunks.pop_back();
} else
{
temp = chunks[i];
chunks[i] = chunks[i + 1];
chunks[i + 1] = temp;
}
new_last_n = i + 1;
} else
{
if (chunks[i].end == chunks[i + 1].begin)
{
chunks[i].end = chunks[i + 1].end;
if (i + 1 != chunks.size() - 1)
chunks[i + 1] = chunks[chunks.size() - 1];
chunks.pop_back();
}
}
}
} while (new_last_n < chunks.size() - 1);
}
void run_algo(void (*algo)(std::vector<Chunk>&))
{
static int count = 1;
// allowing the algo to modify the input vector is intentional
std::vector<Chunk> chunks = in;
size_t in_count = chunks.size();
boost::posix_time::ptime start = boost::posix_time::microsec_clock::local_time();
algo(chunks);
boost::posix_time::ptime stop = boost::posix_time::microsec_clock::local_time();
std::cout<<"algo "<<count++<<" took "<<stop - start<<std::endl;
// if all went ok, statistically we should have around 33% of the original chunks count in the output vector
std::cout<<" initial chunks count: "<<in_count<<", output chunks count: "<<chunks.size()<<std::endl;
}
int main()
{
boost::posix_time::ptime start = boost::posix_time::microsec_clock::local_time();
generate_input_data();
boost::posix_time::ptime stop = boost::posix_time::microsec_clock::local_time();
std::cout<<"generating input took:\t"<<stop - start<<std::endl;
run_algo(merge_chunks_orig_algo);
run_algo(merge_chunks_new_algo);
return 0;
}
Run Code Online (Sandbox Code Playgroud)
我在下面看到你提到 n 没有那么高。所以我用 1000 个块重新运行测试,运行 1000000 次,以使时间变得有意义。修改后的冒泡排序的性能仍然提高了 5 倍。基本上,对于 1000 个块,总运行时间为 3 微秒。下面的数字。
generating input took: 00:00:00
algo 1 took 00:00:15.343456, for 1000000 runs
initial chunks count: 1000, output chunks count: 355
algo 2 took 00:00:03.374935, for 1000000 runs
initial chunks count: 1000, output chunks count: 355
Run Code Online (Sandbox Code Playgroud)