LinkedBlockingQueue .take()表现

use*_*100 0 java multithreading

我使用的LinkedBlockingQueue是多个线程填充,项目数量很大(数千万个对象).

LinkedBlockingQueue.take()花费大量时间(通过分析器检查) - 56%的时间.
队列永远不会空!

什么会影响take()方法的性能?

更新:我在处理take()结果的代码中做了一些更改,我还将take()放到另一个线程中,性能几乎提高了50%.

我不明白这是怎么可能的,因为我没有改变推杆的任何逻辑......

更新:

我在调用take()之前计算了队列已满的次数:
使用原始代码,90%的调用队列已满.
通过改进的代码,13%的呼叫队列已满.

Pet*_*rey 5

take()的重量相当轻,但如果你调用它就会占用大量的CPU.听起来你传递了大量的物品,这些物品对于消费者来说需要的工作量非常小.我建议尝试重构你的问题,以便更有效地完成.

你可以,例如,

  • 使用专为此类问题设计的Disruptor.
  • 发送块/批量数据,例如对象列表而不是许多单个对象.
  • 使用多个交换器(适用于少数作者)
  • 如果您的目标是持久保存数据,请使用像Chronicle这样的内容.

您的个人资料也可能不完全准确.当您测量很小的时间段时,它可以给出混合的结果.

BTW:take()是单线程的.如果你有许多线程试图同时调用take(),它们将互相阻塞.


sel*_*lig 5

如果我们在这里查看代码,我们可以看到在获取元素之前必须进行锁定.如果有很多线程正在进行,那么这个锁就会存在争用 - 线程不会等待某些事情出现,而是让其他线程采取措施.

 public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            try {
                while (count.get() == 0)
                    notEmpty.await();
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to a non-interrupted thread
                throw ie;
            }

            x = extract();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }
Run Code Online (Sandbox Code Playgroud)

编辑

在你有一个接受者和许多推杆并且队列不断满的情况signalNotFull()下,这个代码将给出瓶颈

private void signalNotFull() {
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        notFull.signal();
    } finally {
        putLock.unlock();
    }
}
Run Code Online (Sandbox Code Playgroud)

这需要putLock表明队列中现在有空间这一事实.