j9d*_*9dy 2 java cassandra datastax-java-driver
我为MongoDB和Cassandra构建了一个导入器.基本上导入器的所有操作都是相同的,除了最后一部分形成数据以匹配所需的cassandra表模式和想要的mongodb文档结构.与MongoDB相比,Cassandra的写入性能非常差,我认为我做错了.
基本上,我的抽象导入器类加载数据,读出所有数据并将其传递给扩展的MongoDBImporter或CassandraImporter类以将数据发送到数据库.一次针对一个数据库 - 同时没有"双重"插入C*和MongoDB.导入器在相同数量的节点上运行在同一台机器上(6).
问题:
MongoDB导入在57分钟后完成.我摄取了10.000.000个文档,我希望Cassandra的行数相同.我的Cassandra导入器现在运行2.5小时,并且只插入了5.000.000行.我将等待进口商完成并在此处编辑实际完成时间.
我如何用Cassandra导入:
我准备两个语句一旦摄取数据之前.这两个语句都是UPDATE查询,因为有时我必须将数据附加到现有列表.在开始导入之前,我的表格已完全清除.准备好的陈述一次又一次地被使用.
PreparedStatement statementA = session.prepare(queryA);
PreparedStatement statementB = session.prepare(queryB);
Run Code Online (Sandbox Code Playgroud)
对于每一行,我创建一个BoundStatement并将该语句传递给我的"自定义"批处理方法:
BoundStatement bs = new BoundStatement(preparedStatement); //either statementA or B
bs = bs.bind();
//add data... with several bs.setXXX(..) calls
cassandraConnection.executeBatch(bs);
Run Code Online (Sandbox Code Playgroud)
使用MongoDB,我可以一次插入1000个文档(这是最大的)没有问题.对于Cassandra来说,进口商com.datastax.driver.core.exceptions.InvalidQueryException: Batch too large在某些时候仅仅因为我的10个陈述而崩溃了.我正在使用此代码来构建批次.顺便说一句,我以1000,500,300,200,100,50,20批量开始,但显然它们也不起作用.然后我将其设置为10并再次抛出异常.现在我已经没有想法为什么它会破裂.
private static final int MAX_BATCH_SIZE = 10;
private Session session;
private BatchStatement currentBatch;
...
@Override
public ResultSet executeBatch(Statement statement) {
if (session == null) {
throw new IllegalStateException(CONNECTION_STATE_EXCEPTION);
}
if (currentBatch == null) {
currentBatch = new BatchStatement(Type.UNLOGGED);
}
currentBatch.add(statement);
if (currentBatch.size() == MAX_BATCH_SIZE) {
ResultSet result = session.execute(currentBatch);
currentBatch = new BatchStatement(Type.UNLOGGED);
return result;
}
return null;
}
Run Code Online (Sandbox Code Playgroud)
我的C*架构看起来像这样
CREATE TYPE stream.event (
data_dbl frozen<map<text, double>>,
data_str frozen<map<text, text>>,
data_bool frozen<map<text, boolean>>,
);
CREATE TABLE stream.data (
log_creator text,
date text, //date of the timestamp
ts timestamp,
log_id text, //some id
hour int, //just the hour of the timestmap
x double,
y double,
events list<frozen<event>>,
PRIMARY KEY ((log_creator, date, hour), ts, log_id)
) WITH CLUSTERING ORDER BY (ts ASC, log_id ASC)
Run Code Online (Sandbox Code Playgroud)
我有时需要在现有行中添加更多新事件.这就是我需要一个UDT列表的原因.我的UDT包含三个映射,因为事件创建者生成不同的数据(string/double/boolean类型的键/值对).我知道UDT已被冻结,我无法触及已经摄取事件的地图.这对我来说很好,我只需要添加有时候具有相同时间戳的新事件.我在日志的创建者(一些传感器名称)以及记录的日期(即"22-09-2016")和时间戳的小时上进行分区(以便在保持相关数据靠近的同时更多地分发数据)一个分区).
我在我的pom中使用Cassandra 3.0.8和Datastax Java Driver 3.1.0版.根据Cassandra的批量限制是多少?,我不应该通过调整batch_size_fail_threshold_in_kb我的增加批量大小cassandra.yaml.那么......我的导入做了什么或出了什么问题?
更新 所以我调整了我的代码来运行异步查询并将当前运行的插入存储在列表中.每当异步插入完成时,它将从列表中删除.当列表大小超过阈值并且之前插入中发生错误时,该方法将等待500毫秒,直到插入低于阈值.我的代码现在在没有插入失败时自动增加阈值.
但是在流式传输3.300.000行之后,正在处理280.000个插入但没有发生错误.这似乎当前处理的插入数量看起来太高.6个cassandra节点在商用硬件上运行,该硬件已有2年历史.
这是并发插入的高数字(6个节点280.000)有问题吗?我应该添加一个变量MAX_CONCURRENT_INSERT_LIMIT吗?
private List<ResultSetFuture> runningInsertList;
private static int concurrentInsertLimit = 1000;
private static int concurrentInsertSleepTime = 500;
...
@Override
public void executeBatch(Statement statement) throws InterruptedException {
if (this.runningInsertList == null) {
this.runningInsertList = new ArrayList<>();
}
//Sleep while the currently processing number of inserts is too high
while (concurrentInsertErrorOccured && runningInsertList.size() > concurrentInsertLimit) {
Thread.sleep(concurrentInsertSleepTime);
}
ResultSetFuture future = this.executeAsync(statement);
this.runningInsertList.add(future);
Futures.addCallback(future, new FutureCallback<ResultSet>() {
@Override
public void onSuccess(ResultSet result) {
runningInsertList.remove(future);
}
@Override
public void onFailure(Throwable t) {
concurrentInsertErrorOccured = true;
}
}, MoreExecutors.sameThreadExecutor());
if (!concurrentInsertErrorOccured && runningInsertList.size() > concurrentInsertLimit) {
concurrentInsertLimit += 2000;
LOGGER.info(String.format("New concurrent insert limit is %d", concurrentInsertLimit));
}
return;
}
Run Code Online (Sandbox Code Playgroud)
在使用C*之后,我确信你应该真正使用批处理来保持多个表同步.如果您不需要该功能,则根本不要使用批次,因为这会导致性能损失.
将数据加载到C*的正确方法是使用异步写入,如果您的群集无法跟上摄取率,则可选择背压.您应该使用以下内容替换"自定义"批处理方法:
要执行异步写入,请使用.executeAsync将返回ResultSetFuture对象的方法.
为了控制多少飞行查询只收集ResultSetFuture从.executeAsync列表中的方法中检索到的对象,如果列表获得(此处为球场值),则说出1k元素,然后在发出更多写入之前等待所有这些元素完成.或者你可以在发出一次写入之前等待第一次完成,只是为了保持列表满.
最后,您可以在等待操作完成时检查写入失败.在这种情况下,您可以:
从1到4,您的背压强度会增加.选择最适合您的情况.
问题更新后编辑
您的插入逻辑对我来说似乎有些不妥:
while (concurrentInsertErrorOccured && runningInsertList.size() > concurrentInsertLimit)错了,因为你只有在发出的查询数量是>时才会睡觉,因为你concurrentInsertLimit的线程只会停在那里.concurrentInsertErrorOccured我通常会保留一个(失败的)查询列表,以便以后重试它们.这让我对查询有了强大的控制权,当失败的查询开始累积时,我会睡一会儿,然后继续重试(最多X次,然后很难失败......).
此列表应该非常动态,例如,您在查询失败时添加项目,并在执行重试时删除项目.现在,您可以了解群集的限制,并concurrentInsertLimit根据例如最后一秒中失败查询的平均数量进行调整,或者坚持使用更简单的方法" 暂停,如果我们在重试列表中有项目 "等等...
评论后编辑2
由于您不需要任何重试逻辑,我会以这种方式更改您的代码:
private List<ResultSetFuture> runningInsertList;
private static int concurrentInsertLimit = 1000;
private static int concurrentInsertSleepTime = 500;
...
@Override
public void executeBatch(Statement statement) throws InterruptedException {
if (this.runningInsertList == null) {
this.runningInsertList = new ArrayList<>();
}
ResultSetFuture future = this.executeAsync(statement);
this.runningInsertList.add(future);
Futures.addCallback(future, new FutureCallback<ResultSet>() {
@Override
public void onSuccess(ResultSet result) {
runningInsertList.remove(future);
}
@Override
public void onFailure(Throwable t) {
runningInsertList.remove(future);
concurrentInsertErrorOccured = true;
}
}, MoreExecutors.sameThreadExecutor());
//Sleep while the currently processing number of inserts is too high
while (runningInsertList.size() >= concurrentInsertLimit) {
Thread.sleep(concurrentInsertSleepTime);
}
if (!concurrentInsertErrorOccured) {
// Increase your ingestion rate if no query failed so far
concurrentInsertLimit += 10;
} else {
// Decrease your ingestion rate because at least one query failed
concurrentInsertErrorOccured = false;
concurrentInsertLimit = Max(1, concurrentInsertLimit - 50);
while (runningInsertList.size() >= concurrentInsertLimit) {
Thread.sleep(concurrentInsertSleepTime);
}
}
return;
}
Run Code Online (Sandbox Code Playgroud)
您还可以通过替换List<ResultSetFuture>计数器来优化程序.
希望有所帮助.
在Cassandra中运行批处理时,它会选择一个节点作为协调器.然后,该节点负责查看批量写入找到其适当的节点.因此(例如)通过将10000个写入一起批处理,您现在已经为一个节点执行了协调10000次写入的任务,其中大多数将用于不同的节点.通过执行此操作,可以非常轻松地提示节点,或者消除整个群集的延迟.因此,批量大小限制的原因.
问题是Cassandra CQL BATCH是用词不当,它不会做你或别人认为它做的事情.它不能用于提高性能.并行异步写入总是比运行相同数量的语句BATCH一样快.
我知道我可以轻松地将10.000行一起批量处理,因为它们会转到同一个分区....你还会使用单行插入(异步)而不是批次吗?
这取决于写性能是否是您的真正目标.如果是这样,那么我仍然坚持使用并行,异步写入.
有关这方面的更多信息,请查看DataStax的Ryan Svihla撰写的这两篇博文:
Cassandra:无批量批量加载 - Nuanced Edition
| 归档时间: |
|
| 查看次数: |
1736 次 |
| 最近记录: |