Cassandra如何处理阻塞datastax java驱动程序中的execute语句

Xit*_*rum 4 cassandra datastax-java-driver datastax

阻止从com.datastax.driver.core.Session执行fethod

public ResultSet execute(Statement statement);
Run Code Online (Sandbox Code Playgroud)

评论这种方法:

该方法阻塞,直到从数据库接收到至少一些结果.但是,对于SELECT查询,它不保证已完全接收结果.但它确实保证从数据库收到了一些响应,特别是保证如果请求无效,则此方法将抛出异常.

来自com.datastax.driver.core.Session的非阻塞执行方法

public ResultSetFuture executeAsync(Statement statement);
Run Code Online (Sandbox Code Playgroud)

此方法不会阻止.一旦查询传递到底层网络堆栈,它就会返回.特别是,从此方法返回并不保证查询有效或甚至已提交到活动节点.访问{@link ResultSetFuture}时,将抛出与查询失败有关的任何异常.

我有关于它们的02个问题,因此如果你能帮我理解它们会很棒.

假设我有100万条记录,我希望所有这些记录都能到达数据库(没有丢失).

问题1:如果我有n个线程,则所有线程将具有发送到数据库所需的相同数量的记录.所有这些都继续使用阻塞执行调用向cassandra发送多个插入查询.如果我增加n的值,它是否也有助于加快我需要将所有记录插入cassandra的时间?

这会导致cassandra的性能问题吗?Cassandra是否必须确保对于每个插入记录,群集中的所有节点都应立即知道新记录?为了保持数据的一致性.(我假设cassandra节点甚至不会考虑使用本地机器时间来控制记录插入时间).

问题2:通过非阻塞执行,我如何确保所有插入成功?我知道的唯一方法是等待ResultSetFuture检查插入查询的执行.有什么更好的办法吗?非阻塞执行更容易失败然后阻塞执行的可能性更高吗?

非常感谢您的帮助.

And*_*ert 6

如果我有n个线程,则所有线程将具有发送到数据库所需的相同数量的记录.所有这些都继续使用阻塞执行调用向cassandra发送多个插入查询.如果我增加n的值,它是否也有助于加快我需要将所有记录插入cassandra的时间?

在某种程度上.让我们稍微分离一下客户端实现细节,并从"并发请求数"的角度看待事情,因为如果你使用executeAsync,你不需要为每个正在进行的请求都有一个线程.在我的测试中,我发现虽然拥有大量并发请求有很多价值,但是有一个阈值,其回报递减或性能开始下降.我的一般经验法则是,但您可以或多或少找到更优化的结果.(number of Nodes *native_transport_max_threads (default: 128)* 2)

这里的想法是,将cassandra一次处理的请求排入队列的次数并不多.在减少飞行请求数量的同时,可以限制驱动程序客户端和cassandra之间连接的不必要拥塞.

问题2:通过非阻塞执行,我如何确保所有插入成功?我知道的唯一方法是等待ResultSetFuture检查插入查询的执行.有什么更好的办法吗?非阻塞执行更容易失败然后阻塞执行的可能性更高吗?

在ResultSetFuture上等待get是一条路线,但如果您正在开发一个完全异步的应用程序,则希望尽可能避免阻塞.使用番石榴,你的两个最好的武器是Futures.addCallbackFutures.transform.

  • Futures.addCallback允许您注册FutureCallback在驱动程序收到响应时执行的操作. onSuccess在成功的情况下执行,onFailure否则.

  • Futures.transform允许您有效地将返回映射ResultSetFuture到其他内容.例如,如果您只想要1列的值,则可以使用它转换ListenableFuture<ResultSet>为a ListenableFuture<String>而不必在代码中阻塞ResultSetFuture然后获取String值.

在编写dataloader程序的上下文中,您可以执行以下操作:

  1. 为简单Semaphore起见,请使用具有固定数量许可的一个或其他构造(这将是您的最大飞行请求数).每当您使用提交查询时executeAsync,都会获得许可.您实际上只需要1个线程(但可能需要引入一个#cpu cores size of pool),它从Semaphore获取许可并执行查询.在获得许可证之前,它将阻止获取.
  2. 使用Futures.addCallback为将来的返回executeAsync.回调应调用Sempahore.release()两者onSuccessonFailure案例.通过释放许可证,这应该允许您在步骤1中的线程继续并提交下一个请求.

要进一步提高吞吐量,您可能需要考虑BatchStatement批量使用和提交请求.如果您保持批量较小(50-250是一个很好的数字)并且批量插入所有共享相同的分区键,这是一个很好的选择.