如何有效地将“std::sort”用于具有运行时记录长度的不透明数据类型?

Ran*_*its 5 c++ sorting iterator c++20

问题

我想用于std::sort具有运行时记录长度的不透明数据类型。这是对类似问题的回答。

尝试

我尝试过间接对记录指针和索引进行排序,但这些方法的缓存局部性很差,因为比较需要访问数据集的随机分布的行。

我还尝试回退到允许运行时记录长度的C函数。qsort这样更好,但是需要一个回调函数来进行比较。这有两个缺点,1)与使用可内联的 lambda 相比,性能会受到影响;2)界面人体工学效果很差,因为您(可能)必须创建一个结构来保存函子以进行比较。

可能的想法

前面提到的答案建议,

带有专门交换的 pod 块伪引用迭代器,

但没有给出任何关于如何运作的提示。

概括

总而言之,我想要一种方法来调用std::sort传递 lambda 来比较具有运行时记录长度的不透明数据类型数组,如以下代码所示,

#include <iostream>
#include <random>

using std::cout, std::endl;

int main(int argc, const char *argv[]) {
    // We have 100 records each with 50 bytes.
    int nrecords = 100, nbytes = 50;
    std::vector<uint8_t> data(nrecords * nbytes);

    // Generate random data for testing.
    std::uniform_int_distribution<uint8_t> d;
    std::mt19937_64 rng;
    std::generate(data.begin(), data.end(), [&]() { return d(rng); });

    int key_index = 20;
    auto cmp = [&](uint8_t *a, uint8_t *b) {
        int aval = *(int*)(a + key_index), bval = *(int*)(b + key_index);
        return aval < bval;
    };

    // How can I call std::sort with run-time record length?
    // std::sort(data.begin(), ...)
    return 0;
}
Run Code Online (Sandbox Code Playgroud)

迭代器尝试

我尝试根据前面的建议编写一个迭代器,但我还无法编译它。我的迭代器 foo低于标准,我觉得我犯了概念性错误。

这是代码,

#include <algorithm>
#include <random>

struct MemoryIterator {
    using iterator_category = std::forward_iterator_tag;
    using difference_type = std::ptrdiff_t;
    using value_type = uint8_t *;
    using pointer = value_type*;
    using reference = value_type&;

    MemoryIterator(value_type ptr, size_t element_size)
        : ptr_(ptr)
        , element_size_(element_size) {
    }

    reference operator*() {
        return ptr_;
    }

    MemoryIterator& operator++() {
        ptr_ += element_size_;
        return *this;
    }

    MemoryIterator& operator--() {
        ptr_ -= element_size_;
        return *this;
    }

    MemoryIterator operator++(int) {
        auto tmp = *this;
        ++(*this);
        return tmp;
    }

    MemoryIterator operator--(int) {
        auto tmp = *this;
        --(*this);
        return tmp;
    }

    MemoryIterator& operator+=(size_t n) {
        ptr_ += n * element_size_;
        return *this;
    }

    MemoryIterator operator+(size_t n) {
        auto r = *this;
        r += n;
        return r;
    }

    MemoryIterator& operator-=(size_t n) {
        ptr_ -= n * element_size_;
        return *this;
    }

    MemoryIterator operator-(size_t n) {
        auto r = *this;
        r -= n;
        return r;
    }
    friend bool operator==(const MemoryIterator& a, const MemoryIterator& b) {
        return a.ptr_ == b.ptr_;
    }

    friend difference_type operator-(const MemoryIterator& a, const MemoryIterator& b) {
        return a.ptr_ - b.ptr_;
    }

    friend void swap(MemoryIterator a, MemoryIterator b) {
    }

private:
    value_type ptr_;
    size_t element_size_;
};

int main(int argc, const char *argv[]) {
    int nrecords = 100, nbytes = 50;
    std::vector<uint8_t> data(nrecords * nbytes);

    std::uniform_int_distribution<uint8_t> d;
    std::mt19937_64 rng;
    std::generate(data.begin(), data.end(), [&]() { return d(rng); });

    int key_index = 20;
    auto cmp = [&](uint8_t *a, uint8_t *b) {
        int aval = *(int*)(a + key_index), bval = *(int*)(b + key_index);
        return aval < bval;
    };

    MemoryIterator begin(data.data(), nbytes);
    MemoryIterator end(data.data() + data.size(), nbytes); 
    std::sort(begin, end, cmp);

    return 0;
}
Run Code Online (Sandbox Code Playgroud)

它抱怨它无法比较两个MemoryIterator这是真的——它需要使用比较函数。

编译错误

/opt/local/libexec/llvm-16/bin/../include/c++/v1/__algorithm/sort.h:533:21: error:
      invalid operands to binary expression ('MemoryIterator' and 'MemoryIterator')
            if (__i >= __j)
                ~~~ ^  ~~~
/opt/local/libexec/llvm-16/bin/../include/c++/v1/__algorithm/sort.h:639:8: note:
      in instantiation of function template specialization
      'std::__introsort<std::_ClassicAlgPolicy, (lambda at
      sort0.cpp:92:16) &, MemoryIterator>' requested here
  std::__introsort<_AlgPolicy, _Compare>(__first, __last, __comp, __depth_limit);
       ^
/opt/local/libexec/llvm-16/bin/../include/c++/v1/__algorithm/sort.h:699:10: note:
      in instantiation of function template specialization 'std::__sort<(lambda at
      sort0.cpp:92:16) &, MemoryIterator>' requested here
    std::__sort<_WrappedComp>(std::__unwrap_iter(__first), std::__unwrap_iter(__last), _...
Run Code Online (Sandbox Code Playgroud)

chr*_*nte 3

std::sort以下是我们按照您想要的方式工作所需的要素:

代理参考

代理引用是用户定义的类型(尽可能模仿 C++ 引用)。通过它们,字节数组中的元素实际上被移动std::sort

请注意,为了实现比 更强大的实现,您还可以在标准库容器中sort提供ConstRecordReference类似于iterator和 的, 。const_iterator

struct RecordReference {
    explicit RecordReference(void* data, size_t element_size):
        _data(data), _size(element_size) {}
    
    RecordReference(Record&);
    
    RecordReference(RecordReference const& rhs):
        _data(rhs.data()), _size(rhs.size()) {}
    
    /// Because this represents a reference, an assignment represents a copy of
    /// the referred-to value, not of the reference
    RecordReference& operator=(RecordReference const& rhs) {
        assert(size() == rhs.size());
        std::memcpy(data(), rhs.data(), size());
        return *this;
    }
    
    RecordReference& operator=(Record const& rhs);
    
    /// Also `swap` swaps 'through' the reference
    friend void swap(RecordReference a, RecordReference b) {
        assert(a.size() == b.size());
        size_t size = a.size();
        // Perhaps don't use alloca if you're really serious
        // about standard conformance
        auto* buffer = (void*)alloca(size);
        std::memcpy(buffer, a.data(), size);
        std::memcpy(a.data(), b.data(), size);
        std::memcpy(b.data(), buffer, size);
    }
    
    void* data() const { return _data; }
    
    size_t size() const { return _size; }
    
private:
    void* _data;
    size_t _size;
};

/// After the definition of Record (see below):

RecordReference::RecordReference(Record& rhs):
    _data(rhs.data()), _size(rhs.size()) {}

RecordReference& RecordReference::operator=(Record const& rhs) {
    assert(size() == rhs.size());
    std::memcpy(data(), rhs.data(), size());
    return *this;
}
Run Code Online (Sandbox Code Playgroud)

代表您的记录的类类型

令人烦恼的是我们甚至需要这个。我们需要它是因为 libstdc++ 的std::sort实现(可能还有任何实现)创建了一些堆栈分配的值副本,因此我们需要为value_type实际上代表记录的迭代器提供一些合理的内容。

我们提供内联缓冲区以避免堆分配达到合理的大小。您可以将该InlineSize参数调整为记录的预期大小。

struct Record {
    static constexpr size_t InlineSize = 56; 
    
    Record(RecordReference ref): _size(ref.size()) {
        if (is_inline()) {
            std::memcpy(&inline_data, ref.data(), size());
        }
        else {
            heap_data = std::malloc(size());
            std::memcpy(heap_data, ref.data(), size());
        }
    }

    Record(Record&& rhs) noexcept: _size(rhs.size()) {
        if (is_inline()) {
            std::memcpy(&inline_data, &rhs.inline_data, size());
        }
        else {
            heap_data = rhs.heap_data;
            rhs.heap_data = nullptr;
        }
    }
    
    ~Record() {
        if (!is_inline()) {
            std::free(heap_data);
        }
    }
    
    void* data() { return (void*)((Record const*)this)->data(); }
    
    void const* data() const { return is_inline() ? inline_data : heap_data; }
    
    size_t size() const { return _size; }
    
private:
    bool is_inline() const { return _size <= InlineSize; }
    
    size_t _size;
    union {
        char inline_data[InlineSize];
        void* heap_data;
    };
};
Run Code Online (Sandbox Code Playgroud)

然而,至少 libc++ 的std::sort实现似乎一次只创建一个这种类型的对象。这意味着我们可以thread_local在堆栈上预先分配一个缓冲区来保存该单个对象。通过这种方式,我们可以防止任何记录大小的堆分配,但代价是依赖于实现细节std::sort并可能在未来的构建中创建运行时错误,因此我不建议这样做。此外,这使得这些类型对于许多其他算法来说毫无用处。

最后是你的迭代器类

正如评论中所指出的,它需要定义关系比较运算符(以满足随机访问迭代器的要求或只是因为sort逻辑上需要它们)。

它通过和typedef公开我们的Record和类型。RecordReferencevalue_typereference

struct MemoryIterator {
    using iterator_category = std::random_access_iterator_tag;
    using difference_type = std::ptrdiff_t;
    using value_type = Record;
    using pointer = void*;
    using reference = RecordReference;

    MemoryIterator(pointer ptr, size_t size)
        : ptr_((char*)ptr)
        , size_(size) {
    }

    reference operator*() const {
        return reference(ptr_, size_);
    }

    reference operator[](size_t index) const {
        return *(*this + index);
    }

    MemoryIterator& operator++() {
        ptr_ += size_;
        return *this;
    }

    MemoryIterator& operator--() {
        ptr_ -= size_;
        return *this;
    }

    MemoryIterator operator++(int) {
        auto tmp = *this;
        ++(*this);
        return tmp;
    }

    MemoryIterator operator--(int) {
        auto tmp = *this;
        --(*this);
        return tmp;
    }

    MemoryIterator& operator+=(size_t n) {
        ptr_ += n * size_;
        return *this;
    }

    MemoryIterator operator+(size_t n) const {
        auto r = *this;
        r += n;
        return r;
    }

    MemoryIterator& operator-=(size_t n) {
        ptr_ -= n * size_;
        return *this;
    }

    MemoryIterator operator-(size_t n) const {
        auto r = *this;
        r -= n;
        return r;
    }
    
    friend bool operator==(const MemoryIterator& a, const MemoryIterator& b) {
        assert(a.size_ == b.size_);
        return a.ptr_ == b.ptr_;
    }
    
    friend std::strong_ordering operator<=>(const MemoryIterator& a, const MemoryIterator& b) {
        assert(a.size_ == b.size_);
        return a.ptr_ <=> b.ptr_;
    }

    friend difference_type operator-(const MemoryIterator& a, const MemoryIterator& b) {
        assert(a.size_ == b.size_);
        return (a.ptr_ - b.ptr_) / a.size_;
    }

private:
    char* ptr_;
    size_t size_;
};
Run Code Online (Sandbox Code Playgroud)

在这里您可以找到代码的现场演示。