弹性搜索中的EsRejectedExecutionException用于并行搜索

Vip*_*pin 52 exception elasticsearch

我在我的应用程序中使用单个传输客户端实例查询多个并行请求的elasticsearch.

我得到了并行执行的以下异常.如何克服这个问题.

org.elasticsearch.common.util.concurrent.EsRejectedExecutionException: rejected execution (queue capacity 1000) on org.elasticsearch.search.action.SearchServiceTransportAction$23@5f804c60
    at org.elasticsearch.common.util.concurrent.EsAbortPolicy.rejectedExecution(EsAbortPolicy.java:62)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:821)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1372)
    at org.elasticsearch.search.action.SearchServiceTransportAction.execute(SearchServiceTransportAction.java:509)
    at org.elasticsearch.search.action.SearchServiceTransportAction.sendExecuteScan(SearchServiceTransportAction.java:441)
    at org.elasticsearch.action.search.type.TransportSearchScanAction$AsyncAction.sendExecuteFirstPhase(TransportSearchScanAction.java:68)
    at org.elasticsearch.action.search.type.TransportSearchTypeAction$BaseAsyncAction.performFirstPhase(TransportSearchTypeAction.java:171)
    at org.elasticsearch.action.search.type.TransportSearchTypeAction$BaseAsyncAction.start(TransportSearchTypeAction.java:153)
    at org.elasticsearch.action.search.type.TransportSearchScanAction.doExecute(TransportSearchScanAction.java:52)
    at org.elasticsearch.action.search.type.TransportSearchScanAction.doExecute(TransportSearchScanAction.java:42)
    at org.elasticsearch.action.support.TransportAction.execute(TransportAction.java:63)
    at org.elasticsearch.action.search.TransportSearchAction.doExecute(TransportSearchAction.java:107)
    at org.elasticsearch.action.search.TransportSearchAction.doExecute(TransportSearchAction.java:43)
    at org.elasticsearch.action.support.TransportAction.execute(TransportAction.java:63)
    at org.elasticsearch.action.search.TransportSearchAction$TransportHandler.messageReceived(TransportSearchAction.java:124)
    at org.elasticsearch.action.search.TransportSearchAction$TransportHandler.messageReceived(TransportSearchAction.java:113)
    at org.elasticsearch.transport.netty.MessageChannelHandler.handleRequest(MessageChannelHandler.java:212)
    at org.elasticsearch.transport.netty.MessageChannelHandler.messageReceived(MessageChannelHandler.java:109)
    at org.elasticsearch.common.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
    at org.elasticsearch.common.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.elasticsearch.common.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
    at org.elasticsearch.common.netty.channel.Channels.fireMessageReceived(Channels.java:296)
    at org.elasticsearch.common.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462)
    at org.elasticsearch.common.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443)
    at org.elasticsearch.common.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303)
    at org.elasticsearch.common.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
    at org.elasticsearch.common.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.elasticsearch.common.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
    at org.elasticsearch.common.netty.OpenChannelsHandler.handleUpstream(OpenChannelsHandler.java:74)
    at org.elasticsearch.common.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.elasticsearch.common.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
    at org.elasticsearch.common.netty.channel.Channels.fireMessageReceived(Channels.java:268)
    at org.elasticsearch.common.netty.channel.Channels.fireMessageReceived(Channels.java:255)
    at org.elasticsearch.common.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
    at org.elasticsearch.common.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108)
    at org.elasticsearch.common.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:318)
    at org.elasticsearch.common.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
    at org.elasticsearch.common.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
    at org.elasticsearch.common.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
    at org.elasticsearch.common.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Run Code Online (Sandbox Code Playgroud)

Vin*_*han 74

Elasticsearch有一个线程池和一个每个节点搜索的队列.线程池将有N个工作人员准备好处理请求.当请求到来并且工作者是空闲时,这由工作人员处理.现在,默认情况下,worker的数量等于该CPU上的核心数.当工作人员已满并且有更多搜索请求时,请求将进入队列.队列的大小也是有限的.如果默认大小为100,并且如果发生的并行请求多于此数,那么这些请求将被拒绝,如错误日志中所示.

解决方案:

  1. 对此的直接解决方案是增加搜索队列的大小.我们还可以增加线程池的大小,但这可能会严重影响单个查询的性能.因此,增加队列可能是个好主意.但是请记住,这个队列是内存住宅,并且增加队列大小太多会导致Out Of Memory问题.(更多信息)

  2. 增加节点和副本的数量 - 请记住每个节点都有自己的搜索线程池/队列.此外,搜索可能发生在主分片或副本上.

  • 我只是想知道为什么他不这么说。更简单的“线程池已满,请修复我!” 加上堆栈跟踪会让我的生活变得更加轻松。 (3认同)

And*_*fan 5

也许这听起来很奇怪,但你需要降低并行搜索次数.有了这个例外,Elasticsearch告诉你,你正在超载它.在Elasticsearch中设置了一些限制(在线程计数级别),并且大多数情况下,这些限制的默认值是最佳选项.因此,如果您正在测试群集以查看它可以承载多少负载,那么这将表明已达到某些限制.

或者,如果您确实要更改默认值,可以尝试增加搜索的队列大小以适应并发需求,但请记住,队列大小越大,您在群集上施加的压力就越大,最后,会导致不稳定.


Dar*_*ous 5

我看到了同样的错误,因为我并行向 ES 发送了大量索引请求。由于我正在编写数据迁移,因此很容易将它们串行化,从而解决了问题。