如何使用datastax java驱动程序有效地使用批量写入cassandra?

joh*_*ohn 5 java cassandra datastax-java-driver

我需要使用Datastax Java驱动程序批量写入Cassandra,这是我第一次尝试使用批处理与datastax Java驱动程序,所以我有一些困惑 -

下面是我的代码,我在其中尝试创建一个Statement对象并将其添加到Batch并将ConsistencyLevel设置为QUORUM.

Session session = null;
Cluster cluster = null;

// we build cluster and session object here and we use  DowngradingConsistencyRetryPolicy as well
// cluster = builder.withSocketOptions(socketOpts).withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE)

public void insertMetadata(List<AddressMetadata> listAddress) {
    // what is the purpose of unloggedBatch here?
    Batch batch = QueryBuilder.unloggedBatch();

    try {
        for (AddressMetadata data : listAddress) {
            Statement insert = insertInto("test_table").values(
                    new String[] { "address", "name", "last_modified_date", "client_id" },
                    new Object[] { data.getAddress(), data.getName(), data.getLastModifiedDate(), 1 });
            // is this the right way to set consistency level for Batch?
            insert.setConsistencyLevel(ConsistencyLevel.QUORUM);
            batch.add(insert);
        }

        // now execute the batch
        session.execute(batch);
    } catch (NoHostAvailableException e) {
        // log an exception
    } catch (QueryExecutionException e) {
        // log an exception
    } catch (QueryValidationException e) {
        // log an exception
    } catch (IllegalStateException e) {
        // log an exception
    } catch (Exception e) {
        // log an exception
    }
}
Run Code Online (Sandbox Code Playgroud)

以下是我的AddressMetadata课 -

public class AddressMetadata {

    private String name;
    private String address;
    private Date lastModifiedDate;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getAddress() {
        return address;
    }

    public void setAddress(String address) {
        this.address = address;
    }

    public Date getLastModifiedDate() {
        return lastModifiedDate;
    }

    public void setLastModifiedDate(Date lastModifiedDate) {
        this.lastModifiedDate = lastModifiedDate;
    }
}
Run Code Online (Sandbox Code Playgroud)

现在我的问题是 - 我使用Batch来插入带有Datastax Java Driver的cassandra的方式是否正确?那么重试策略呢,这意味着如果批处理语句执行失败,那么会发生什么,它会再次重试吗?

有没有更好的方法使用Java驱动程序使用批量写入cassandra?

pha*_*act 27

首先是一点咆哮:

Cassandra中的batch关键字不是用于批量处理大量数据的性能优化.

批处理用于将原子操作组合在一起,这些操作是您希望一起发生的操作.批次保证如果批次的单个部分成功,则整个批次成功.

使用批次可能不会使您的质量摄取更快

现在问你的问题:

unloggedBatch的目的是什么?

Cassandra使用称为批量日志记录的机制来确保批处理的原子性.通过指定未记录的批处理,您将关闭此功能,因此批处理不再是原子的,可能会因部分完成而失败.当然,记录批次并确保其原子性会有性能损失,使用未记录的批次将消除此惩罚.

在某些情况下,您可能希望使用未记录的批次来确保属于同一分区的请求(插入)一起发送.如果将操作一起批处理并且需要在不同的分区/节点中执行,则实质上是为协调器创建了更多工作.请参阅Ryan博客中的具体示例:

阅读这篇文章

现在我的问题是 - 我使用Batch来插入带有Datastax Java Driver的cassandra的方式是否正确?

我在这里看不到你的代码有什么问题,只取决于你想要实现的目标.深入了解我分享的博客文章,以获得更多洞察力.

那么重试策略呢,这意味着如果批处理语句执行失败,那么会发生什么,它会再次重试吗?

如果它失败,它自己的批处理将不会自行重试.驱动程序确实有重试策略,但您必须单独应用它们.

java驱动程序中的默认策略仅在以下场景中重试:

  • 在读取超时时,如果回复了足够的副本但未检索到数据.
  • 在写入超时时,如果我们在编写批处理语句使用的分布式日志时超时.

阅读有关默认策略的更多信息,并根据您的使用案例考虑不太保守的策略.