我有这段代码可以保存到 HBase HTABLE。预期的行为是该表将为每个分区推送提交或“刷新”到 hbase 的放置。
注意:这是更新的代码
rdd.foreachPartition(p => {
val table = connection.getTable(TableName.valueOf(HTABLE))
val mutator = connection.getBufferedMutator(TableName.valueOf(HTABLE))
p.foreach(row => {
val hRow = new Put(rowkey)
hRow.addColumn....
// use table.exists instead of table.checkAndPut (in favor of BufferedMutator's flushCommits)
val exists = table.exists(new Get(rowkey))
if (!exists) {
hRow.addColumn...
}
mutator.mutate(hRow)
})
table.close()
mutator.flush()
mutator.close()
})
Run Code Online (Sandbox Code Playgroud)
在 HBase 1.1 中,不推荐使用 HTable,并且 org.apache.hadoop.hbase.client.Table 中没有可用的 flushCommits()。
替换 BufferedMutator.mutate(put) 对于普通的 put 是可以的,但是 mutator 没有任何类似于 Table 的 checkAndPut。
在新的 API 中,BufferedMutator
使用了。
你可以Table t = connection.getTable(TableName.valueOf("foo"))
改为BufferedMutator t = connection.getBufferedMutator(TableName.valueOf("foo"))
. 然后t.put(p);
改为t.mutate(p);
这个对我有用!
当我搜索时,即使在官方文档中,也几乎没有相关信息。希望我的回答有帮助,有人可以更新文档。
归档时间: |
|
查看次数: |
5073 次 |
最近记录: |