elasticsearch java批量大小

Jus*_*tin 5 java elasticsearch

我想使用java的elasticsearch bulk api,并想知道如何设置批量大小.

目前我正在使用它:

BulkRequestBuilder bulkRequest = getClient().prepareBulk();
while(hasMore) {
    bulkRequest.add(getClient().prepareIndex(indexName, indexType, artist.getDocId()).setSource(json));
    hasMore = checkHasMore();
}
BulkResponse bResp = bulkRequest.execute().actionGet();
//To check failures
log.info("Has failures? {}", bResp.hasFailures());
Run Code Online (Sandbox Code Playgroud)

知道如何设置批量/批量大小吗?

jav*_*nna 22

它主要取决于文档的大小,客户端上的可用资源以及客户端的类型(传输客户端或节点客户端).

节点客户端知道群集上的分片,并将文档直接发送到保存应该被索引的分片的节点.另一方面,传输客户端是普通客户端,它以循环方式将其请求发送到节点列表.然后,批量请求将被发送到一个节点,这将成为索引时的网关.

由于您正在使用Java API,我建议您查看一下BulkProcessor,这使得批量索引变得更加容易和灵活.您可以定义自上次批量执行以来的最大操作数,最大大小和最大时间间隔.它会在需要时自动为您执行批量处理.您还可以设置最大并发批量请求数.

在您创建之后BulkProcessor:

BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() {
    @Override
    public void beforeBulk(long executionId, BulkRequest request) {
        logger.info("Going to execute new bulk composed of {} actions", request.numberOfActions());
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
        logger.info("Executed bulk composed of {} actions", request.numberOfActions());
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
        logger.warn("Error executing bulk", failure);
    }
    }).setBulkActions(bulkSize).setConcurrentRequests(maxConcurrentBulk).build();
Run Code Online (Sandbox Code Playgroud)

您只需要向其添加请求:

bulkProcessor.add(indexRequest);
Run Code Online (Sandbox Code Playgroud)

并在最后关闭它以刷新可能尚未执行的任何最终请求:

bulkProcessor.close();
Run Code Online (Sandbox Code Playgroud)

最后回答你的问题:关于BulkProcessor它的好处还在于它具有合理的默认值:5 MB大小,1000个动作,1个并发请求,没有刷新间隔(可能对设置有用).