我试图使用ListenableFuture对Cassandra集群进行异步写入,如下所示:
private static Cluster cluster = null;
private ListeningExecutorService executorService;
private PreparedStatement preparedStatement;
private Session session = null;
...
executorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(POOL_SIZE));
...
public void writeValue(Tuple tuple) {
ListenableFuture<String> future = executorService.submit(new Callable<String>() {
@Override
public String call() throws Exception {
if(session == null) {
session = getCluster().connect("dbname");
preparedStatement = session.prepare(queryString);
}
try {
BoundStatement boundStatement = preparedStatement.bind(tuple values);
session.execute(boundStatement);
} catch(Exception exception) {
// handle exception
}
return null;
}
});
Run Code Online (Sandbox Code Playgroud)
如果我将POOL_SIZE设置为1,一切正常.
如果我将POOL_SIZE设置为> 1,我会收到如下错误:
引起:com.datastax.driver.core.exceptions.InvalidQueryException:试图执行未知的准备查询:0x75c5b41b9f07afa5384a69790503f963.您可能已使用使用其他Cluster实例创建的PreparedStatement.
所以我session和preparedStatement当地的变种.然后我得到关于Re-preparing already prepared query ...加上它每次都创建一个新会话的警告.
我想尽可能多地重用.我做错了什么,我有什么选择?
将这个类静态化会有帮助吗?
你在这里有各种各样的竞争条件,并且执行不是线程安全的.
每个的Cluster,Session和PreparedStatement正在意思是应用范围的单身人士,即.你只需要一个(每个查询一个PreparedStatement).
但是,您正在重新创建Session并可能PreparedStatement多次准备.
别.Session在构造函数或某个只运行一次的位置初始化您的一次,并同时准备您的语句.然后使用Session和PreparedStatement在适当情况下.
使用单线程执行程序,一切都像同步一样运行.当您添加更多线程时,其中许多可能会调用
session.prepare(queryString);
Run Code Online (Sandbox Code Playgroud)
同时.或者PreparedStatement你在这里使用
BoundStatement boundStatement = preparedStatement.bind(tuple values);
session.execute(boundStatement);
Run Code Online (Sandbox Code Playgroud)
可能与您初始化的不同
preparedStatement = session.prepare(queryString);
Run Code Online (Sandbox Code Playgroud)
即使在同一个执行线程内.或者您可能尝试使用与初始化它PreparedStatement不同的方法执行Session.
准备好的语句是绑定在一个会话上还是可以在另一个会话中使用?
预准备语句派生自特定会话实例.因此,当您准备一个语句并将其发送到服务器时,它将被发送到与此会话实例关联的集群.
的javadoc的Session状态
会话实例是线程安全的,通常每个应用程序的单个实例就足够了.