bcf*_*ado 13 java cassandra full-table-scan
第一:我知道在Cassandra进行全面扫描并不是一个好主意,但是,目前,这就是我需要的.
当我开始寻找像这样做的东西时,我读到人们说不可能在卡桑德拉进行全面扫描而且他没有做这种事情.
不满意,我一直在寻找,直到找到这篇文章:http: //www.myhowto.org/bigdata/2013/11/04/scanning-the-entire-cassandra-column-family-with-cql/
看起来很合理,我试一试.因为我将只执行一次全扫描,时间和性能不是问题,我编写了查询并将其放在一个简单的Job中查找我想要的所有记录.从20亿行记录中,1000个是我预期的输出,但是,我只有100条记录.
我的工作:
public void run() {
Cluster cluster = getConnection();
Session session = cluster.connect("db");
LOGGER.info("Starting ...");
boolean run = true;
int print = 0;
while ( run ) {
if (maxTokenReached(actualToken)) {
LOGGER.info("Max Token Reached!");
break;
}
ResultSet resultSet = session.execute(queryBuilder(actualToken));
Iterator<Row> rows = resultSet.iterator();
if ( !rows.hasNext()){
break;
}
List<String> rowIds = new ArrayList<String>();
while (rows.hasNext()) {
Row row = rows.next();
Long leadTime = row.getLong("my_column");
if (myCondition(myCollumn)) {
String rowId = row.getString("key");
rowIds.add(rowId);
}
if (!rows.hasNext()) {
Long token = row.getLong("token(rowid)");
if (!rowIds.isEmpty()) {
LOGGER.info(String.format("Keys found! RowId's: %s ", rowIds));
}
actualToken = nextToken(token);
}
}
}
LOGGER.info("Done!");
cluster.shutdown();
}
public boolean maxTokenReached(Long actualToken){
return actualToken >= maxToken;
}
public String queryBuilder(Long nextRange) {
return String.format("select token(key), key, my_column from mytable where token(key) >= %s limit 10000;", nextRange.toString());
}
public Long nextToken(Long token){
return token + 1;
}
Run Code Online (Sandbox Code Playgroud)
基本上我所做的是搜索允许的最小令牌,并逐步进行到最后一次.
我不知道,但就像工作没有完全扫描完全扫描或我的查询只访问过一个节点或东西.我不知道我做错了什么,或者是不是真的可以进行全面扫描.
今天我有近2 TB的数据,在一个七个节点的集群中只有一个表.
有人已经处于这种情况或有一些建议吗?
绝对有可能在Cassandra中进行全表扫描 - 事实上,它对于像Spark这样的事情来说很常见.然而,它通常不是"快速",所以除非你知道你为什么这样做,否则它是气馁的.对于您的实际问题:
1)如果您使用的是CQL,那么您几乎肯定会使用Murmur3分区程序,因此您的最小标记为-9223372036854775808(最大标记为9223372036854775808).
2)你正在使用session.execute(),它将使用一个默认的一致性,这可能不会返回你的集群中的所有结果,特别是如果你也在ONE编写,我怀疑你可能会这样.将其提升为ALL,并使用预准备语句来加速CQL解析:
public void run() {
Cluster cluster = getConnection();
Session session = cluster.connect("db");
LOGGER.info("Starting ...");
actualToken = -9223372036854775808;
boolean run = true;
int print = 0;
while ( run ) {
if (maxTokenReached(actualToken)) {
LOGGER.info("Max Token Reached!");
break;
}
SimpleStatement stmt = new SimpleStatement(queryBuilder(actualToken));
stmt.setConsistencyLevel(ConsistencyLevel.ALL);
ResultSet resultSet = session.execute(stmt);
Iterator<Row> rows = resultSet.iterator();
if ( !rows.hasNext()){
break;
}
List<String> rowIds = new ArrayList<String>();
while (rows.hasNext()) {
Row row = rows.next();
Long leadTime = row.getLong("my_column");
if (myCondition(myCollumn)) {
String rowId = row.getString("key");
rowIds.add(rowId);
}
if (!rows.hasNext()) {
Long token = row.getLong("token(rowid)");
if (!rowIds.isEmpty()) {
LOGGER.info(String.format("Keys found! RowId's: %s ", rowIds));
}
actualToken = nextToken(token);
}
}
}
LOGGER.info("Done!");
cluster.shutdown();
}
public boolean maxTokenReached(Long actualToken){
return actualToken >= maxToken;
}
public String queryBuilder(Long nextRange) {
return String.format("select token(key), key, my_column from mytable where token(key) >= %s limit 10000;", nextRange.toString());
}
public Long nextToken(Long token) {
return token + 1;
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
5180 次 |
| 最近记录: |