在使用"executeAsync"时如何限制写入请求到cassandra?

joh*_*ohn 14 java multithreading rate-limiting backpressure datastax-java-driver

我使用datastax java驱动程序3.1.0连接到cassandra集群,我的cassandra集群版本是2.0.10.我与QUORUM一致性异步编写.

  private final ExecutorService executorService = Executors.newFixedThreadPool(10);

  public void save(String process, int clientid, long deviceid) {
    String sql = "insert into storage (process, clientid, deviceid) values (?, ?, ?)";
    try {
      BoundStatement bs = CacheStatement.getInstance().getStatement(sql);
      bs.setConsistencyLevel(ConsistencyLevel.QUORUM);
      bs.setString(0, process);
      bs.setInt(1, clientid);
      bs.setLong(2, deviceid);

      ResultSetFuture future = session.executeAsync(bs);
      Futures.addCallback(future, new FutureCallback<ResultSet>() {
        @Override
        public void onSuccess(ResultSet result) {
          logger.logInfo("successfully written");
        }

        @Override
        public void onFailure(Throwable t) {
          logger.logError("error= ", t);
        }
      }, executorService);
    } catch (Exception ex) {
      logger.logError("error= ", ex);
    }
  }
Run Code Online (Sandbox Code Playgroud)

我的上述save方法将以非常快的速度从多个线程调用.

题:

我想限制请求到executeAsync异步写入Cassandra的方法.如果我以高于我的Cassandra集群可以处理的速度写入,那么它将开始抛出错误,我希望我的所有写入都应成功进入cassandra而不会有任何损失.

我看到这篇文章的解决方案是使用Semaphore固定数量的许可证.但我不确定如何以及实施该方法的最佳方式是什么.我之前从未使用过Semaphor.这是逻辑.任何人都可以在我的代码上提供一个以信号量为基础的示例,或者如果有更好的方法/选项,那么请告诉我.

在编写dataloader程序的上下文中,您可以执行以下操作:

  • 为简单起见,请使用信号量或其他具有固定数量许可的构造(这将是您的最大飞行请求数).每当您使用executeAsync提交查询时,都会获得许可.您实际上只需要1个线程(但可能需要引入一个#cpu cores size of pool),它从Semaphore获取许可并执行查询.在获得许可证之前,它将阻止获取.
  • 使用Futures.addCallback从executeAsync返回的未来.回调应该在onSuccess和onFailure情况下调用Sempahore.release().通过释放许可证,这应该允许您在步骤1中的线程继续并提交下一个请求.

此外,我已经看到他们谈到使用的其他几个帖子,RingBuffer或者Guava RateLimitter哪个更好,我应该使用?以下是我能想到的选项:

  • 使用信号量
  • 使用环形缓冲区
  • 使用番石榴率限制器

任何人都可以帮我一个例子,说明我们如何限制请求或获取cassandra写入的背压并确保所有写入成功进入cassandra?

Pet*_*ikh 8

不是权威的答案,但可能会有所帮助.首先,你应该考虑当你的查询无法立即执行时你会做什么.无论您选择哪种速率限制,如果您收到的请求率高于您最近写入Cassandra的请求,您最终会因等待请求而阻塞您的流程.在那一刻,你需要告诉你的客户暂停他们的请求("推回").例如,如果他们通过HTTP来,那么响应状态将是429"Too Many Requests".如果在同一进程中生成请求,则确定可接受的最长超时.那说如果Cassandra跟不上,那就是缩放(或调整)它的时候了.

也许在实现速率限制之前,在调用save方法之前尝试并在线程中添加人为延迟是值得的(使用Thread.sleep(...))并查看它是否解决了您的问题或需要其他东西.

查询返回错误 Cassandra的背压.但您可以选择或实施RetryPolicy来确定何时重试失败的查询.

您还可以查看连接池选项(尤其是监视和调整池).可以调整每个连接的异步请求数.然而文档说,对于Cassandra 2.x,这个参数上限为128,一个不应该改变它(虽然我试验了:)

使用Semaphore的实现看起来像

/* Share it among all threads or associate with a thread for per-thread limits
   Number of permits is to be tuned depending on acceptable load.
*/
final Semaphore queryPermits = new Semaphore(20); 


public void save(String process, int clientid, long deviceid) {
  ....
  queryPermits.acquire(); // Blocks until a permit is available

  ResultSetFuture future = session.executeAsync(bs);
  Futures.addCallback(future, new FutureCallback<ResultSet>() {
    @Override
    public void onSuccess(ResultSet result) {
      queryPermits.release();
      logger.logInfo("successfully written");
    }
    @Override
    public void onFailure(Throwable t) {
      queryPermits.release(); // Permit should be released in all cases.
      logger.logError("error= ", t);
    }
  }, executorService);
  ....
}
Run Code Online (Sandbox Code Playgroud)

(在实际代码中,我将创建一个包装器回调,它将释放许可,然后调用包装方法)

Guava的RateLimiter类似于信号量,但在未充分利用期后允许临时突发,并根据时间限制请求(不是活动查询的总数).

但是,无论如何请求都会因各种原因而失败,因此最好有一个计划如何重试它们(如果出现间歇性错误).

它可能不适合你的情况,但我会尝试使用一些队列或缓冲区来排队请求(例如java.util.concurrent.ArrayBlockingQueue)."缓冲区已满"意味着客户应该等待或放弃请求.缓冲区还将用于重新排队失败的请求.然而,更公平的失败请求可能应该放在队列前面,以便首先重试它们.此外,当队列已满并且同时存在新的失败请求时,应该以某种方式处理该情况.然后,单线程工作者将从队列中选择请求并将其发送到Cassandra.因为它不应该做太多,所以它不太可能成为一个瓶颈.该工作人员还可以应用自己的速率限制,例如基于时间限制com.google.common.util.concurrent.RateLimiter.

如果想要尽可能避免丢失消息,他可以在Cassandra面前放置一个持久性的消息代理(例如Kafka).这样,即使长时间停止使用Cassandra,传入的消息也可以存活.但是,我想,你的情况太过分了.