我正在设计一个连接到一个或多个数据源流的系统,并对数据进行一些分析,而不是基于结果触发事件.在典型的多线程生产者/消费者设置中,我将有多个生产者线程将数据放入队列,并且多个消费者线程读取数据,并且消费者仅对最新数据点加上n个点感兴趣.如果慢速消费者无法跟上,生产者线程将不得不阻止,当然,当没有未经处理的更新时,消费者线程将被阻止.使用具有读取器/写入器锁的典型并发队列将很好地工作,但是进入的数据速率可能很大,因此我希望减少锁定开销,尤其是生产者的写入锁.我认为我需要一个循环无锁缓冲区.
现在有两个问题:
圆形无锁缓冲是答案吗?
如果是这样,在我自己推出之前,您是否知道任何符合我需求的公共实施?
任何指向实现循环无锁缓冲区的指针总是受欢迎的.
顺便说一句,在Linux上用C++做这件事.
一些额外的信息:
响应时间对我的系统至关重要.理想情况下,消费者线程会希望尽快看到任何更新,因为额外的1毫秒延迟可能会使系统失去价值,或者价值更低.
我倾向于的设计思想是一个半无锁的循环缓冲区,生产者线程尽可能快地将数据放入缓冲区,让我们调用缓冲区A的头部,除非缓冲区已满,否则A满足缓冲区Z的结束.消费者线程将各自保存两个指向循环缓冲区P和P n的指针,其中P是线程的本地缓冲区头,P n是P 之后的第n个项目.每个消费者线程将推进其P和P ñ一旦处理完当前P和缓冲器指针Z的端前进具有最慢P ñ.当P赶上A,这意味着没有更新的处理更新,消费者旋转并忙着等待A再次前进.如果消费者线程旋转太长时间,它可以进入休眠状态并等待条件变量,但我没关系消费者占用CPU周期等待更新,因为这不会增加我的延迟(我会有更多的CPU核心)比线程).想象一下,你有一个循环轨道,并且生产者正在一群消费者面前运行,关键是调整系统,使生产者通常比消费者领先一步,并且大部分操作都可以使用无锁技术完成.我理解获得正确实施的细节并不容易......好吧,非常难,这就是为什么我想在自己制作一些错误之前先从别人的错误中吸取教训.
我正在寻找一种方法来实现支持单个生产者和多个消费者的无锁队列数据结构.我看过Maged Michael和Michael Scott(1996)的经典方法,但他们的版本使用链表.我想要一个使用有界循环缓冲区的实现.什么东西使用原子变量?
另外,我不确定为什么这些经典方法是为需要大量动态内存管理的链表设计的.在多线程程序中,所有内存管理例程都是序列化的.我们不是通过将它们与动态数据结构结合使用来破坏无锁方法的好处吗?
我试图在英特尔64位架构上使用pthread库在C/C++中编写代码.
谢谢Shirish
在一些关于算法的文章中,有些使用了这个词lockfree,有些则使用了lockless.lockless和之间有什么区别lockfree?谢谢!
更新
http://www.intel.com/content/dam/www/public/us/en/documents/guides/intel-dpdk-programmers-guide.pdf
第5.2节 - "Linux*中的无锁环缓冲区",这是使用"无锁"一词的一个例子
我正在尝试在C++ 11中实现一个无锁多个生产者,多个消费者队列.我这样做是为了学习练习,所以我很清楚我可以使用现有的开源实现,但我真的很想知道为什么我的代码不起作用.数据存储在一个环形缓冲区中,显然它是一个"有界MPMC队列".
我已经将它与Disruptor的内容非常接近地建模了.我注意到的事情是它对单个消费者和单个/多个生产者来说绝对正常,它只是多个消费者似乎打破了它.
这是队列:
template <typename T>
class Queue : public IQueue<T>
{
public:
explicit Queue( int capacity );
~Queue();
bool try_push( T value );
bool try_pop( T& value );
private:
typedef struct
{
bool readable;
T value;
} Item;
std::atomic<int> m_head;
std::atomic<int> m_tail;
int m_capacity;
Item* m_items;
};
template <typename T>
Queue<T>::Queue( int capacity ) :
m_head( 0 ),
m_tail( 0 ),
m_capacity(capacity),
m_items( new Item[capacity] )
{
for( int i = 0; i < capacity; ++i )
{ …Run Code Online (Sandbox Code Playgroud) 我正在寻找符合这些要求的无锁设计:
我目前已经实现了一个包含这些结构的多个实例的ringbuffer; 但是这个实现受到以下事实的影响:当编写器使用了环形缓冲区中存在的所有结构时,没有更多的地方可以从结构中改变......但是,其余的环形缓冲区包含一些不必读取的数据由读者但不能被作者重复使用.因此,环形缓冲器不适合这个目的.
无锁设计的任何想法(名称或伪实现)?谢谢你考虑过这个问题.
存在三种不同类型的“无锁”算法。并发实践中给出的定义是:
\n\n\n非正式地,“无锁”\xe2\x89\x88“不使用互斥体”==其中任何一个。
\n
我不明白为什么基于锁的算法不能落入上面给出的无锁定义。这是一个简单的基于锁的程序:
\n#include <iostream>\n#include <mutex>\n#include <thread>\n\nstd::mutex x_mut;\n\nvoid print(int& x) {\n std::lock_guard<std::mutex> lk(x_mut);\n std::cout << x;\n}\n\nvoid add(int& x, int y) {\n std::lock_guard<std::mutex> lk(x_mut);\n x += y;\n}\n\nint main() {\n\n int i = 3;\n\n std::thread thread1{print, std::ref(i)};\n\n std::thread thread2(add, std::ref(i), 4);\n\n thread1.join();\n\n thread2.join();\n\n}\nRun Code Online (Sandbox Code Playgroud)\n如果这两个线程都在运行,那么在有限数量的步骤之后,其中一个必须完成。为什么我的程序不满足“无锁”的定义?
\n编写满足无锁进度保证的算法或数据结构的困难之一是动态内存分配:以可移植的方式调用类似malloc或new不保证无锁的东西。然而,存在许多malloc或的无锁实现,new也有多种无锁内存分配器可用于实现无锁算法/数据结构。
但是,我仍然不明白这实际上如何完全满足无锁进度保证,除非您将数据结构或算法明确限制为某些预先分配的静态内存池。但是,如果您需要动态内存分配,我不明白从长远来看,任何所谓的无锁内存分配器如何才能真正实现无锁。问题是,无论您的无锁malloc或new可能多么出色,最终您可能会耗尽内存,此时您必须退回到向操作系统请求更多内存。这意味着最终你必须打电话brk()或mmap()或者一些这样的低级等价物来实际访问更多内存。并且根本无法保证任何这些低级调用都是以无锁方式实现的。
根本没有办法解决这个问题(除非您使用的是像 MS-DOS 这样不提供内存保护的古老操作系统,或者您编写自己的完全无锁的操作系统——这两种情况都不实用或不太可能。)那么,动态内存分配器如何才能真正做到无锁呢?
因此,无锁是指尽管某些线程可能会饥饿,但您可以保证整个系统取得进展。因此基于 CAS 的实现将提供这种保证。
那么无阻塞是指如果所有其他线程都挂起,则保证一个线程完成。
你能举一个非无锁的无阻塞算法的简单例子吗?
谢谢!
从 C++20 std::atomics 开始,有等待和通知操作。使用 is_always_lock_free 我们可以确保实现是无锁的。使用这些积木构建无锁互斥锁并不那么困难。在简单的情况下,锁定将是比较交换操作,或者如果互斥体被锁定则等待。这里最大的问题是这是否值得。如果我可以创建这样的实现,那么 STL 版本很可能会更好、更快。然而,我仍然记得当我在 2016 年看到 QMutex 如何优于 std::mutex QMutex 与 std::mutex时,我是多么惊讶。那么您认为我应该尝试这样的实现还是 std::mutex 的当前实现已经足够成熟,可以进行远远超出这些技巧的优化?
更新 我的措辞不是最好的,我的意思是实现可以在快乐的路径上无锁(从未锁定状态锁定)。当然,如果我们需要等待获取锁,我们应该被阻塞并重新调度。在大多数平台上,atomic::wait 很可能不是通过简单的自旋锁实现的(现在让我们忽略极端情况),所以基本上它实现了 mutex::lock 所做的相同的事情。所以基本上,如果我实现这样一个类,它将执行与 std::mutex 完全相同的操作(同样在大多数流行平台上)。这意味着 STL 可以在支持这些技巧的平台上的互斥实现中使用相同的技巧。就像这个 spinlock,但我会使用原子等待而不是旋转。我应该相信我的 STL 实现是他们这样做的吗?