dmc*_*lis 7 hadoop hbase scala apache-spark
假设我们可以直接从HDFS而不是使用HBase API来更快地访问数据,我们正在尝试基于HBase的表快照构建RDD.
所以,我有一个名为"dm_test_snap"的快照.我似乎能够使大多数配置工作正常,但我的RDD为空(尽管Snapshot本身存在数据).
我有一段时间找到一个使用Spark对HBase快照进行离线分析的人的例子,但我不敢相信我一个人试图让这个工作起来.非常感谢任何帮助或建议.
这是我的代码片段:
object TestSnap {
def main(args: Array[String]) {
val config = ConfigFactory.load()
val hbaseRootDir = config.getString("hbase.rootdir")
val sparkConf = new SparkConf()
.setAppName("testnsnap")
.setMaster(config.getString("spark.app.master"))
.setJars(SparkContext.jarOfObject(this))
.set("spark.executor.memory", "2g")
.set("spark.default.parallelism", "160")
val sc = new SparkContext(sparkConf)
println("Creating hbase configuration")
val conf = HBaseConfiguration.create()
conf.set("hbase.rootdir", hbaseRootDir)
conf.set("hbase.zookeeper.quorum", config.getString("hbase.zookeeper.quorum"))
conf.set("zookeeper.session.timeout", config.getString("zookeeper.session.timeout"))
conf.set("hbase.TableSnapshotInputFormat.snapshot.name", "dm_test_snap")
val scan = new Scan
val job = Job.getInstance(conf)
TableSnapshotInputFormat.setInput(job, "dm_test_snap",
new Path("hdfs://nameservice1/tmp"))
val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableSnapshotInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result])
hBaseRDD.count()
System.exit(0)
}
}
Run Code Online (Sandbox Code Playgroud)
更新以包含解决方案 诀窍是,正如@Holden在下面提到的那样,conf没有通过.为了解决这个问题,我能够通过将对newAPIHadoopRDD的调用更改为:
val hBaseRDD = sc.newAPIHadoopRDD(job.getConfiguration, classOf[TableSnapshotInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result])
Run Code Online (Sandbox Code Playgroud)
第二个问题也被@ victor的回答强调,我没有通过扫描.为了解决这个问题,我添加了这一行和方法:
conf.set(TableInputFormat.SCAN, convertScanToString(scan))
def convertScanToString(scan : Scan) = {
val proto = ProtobufUtil.toScan(scan);
Base64.encodeBytes(proto.toByteArray());
}
Run Code Online (Sandbox Code Playgroud)
这也让我从conf.set命令中提取这一行:
conf.set("hbase.TableSnapshotInputFormat.snapshot.name", "dm_test_snap")
Run Code Online (Sandbox Code Playgroud)
*注意:这是针对CDH5.0上的HBase版本0.96.1.1
最终完整代码,以便于参考:
object TestSnap {
def main(args: Array[String]) {
val config = ConfigFactory.load()
val hbaseRootDir = config.getString("hbase.rootdir")
val sparkConf = new SparkConf()
.setAppName("testnsnap")
.setMaster(config.getString("spark.app.master"))
.setJars(SparkContext.jarOfObject(this))
.set("spark.executor.memory", "2g")
.set("spark.default.parallelism", "160")
val sc = new SparkContext(sparkConf)
println("Creating hbase configuration")
val conf = HBaseConfiguration.create()
conf.set("hbase.rootdir", hbaseRootDir)
conf.set("hbase.zookeeper.quorum", config.getString("hbase.zookeeper.quorum"))
conf.set("zookeeper.session.timeout", config.getString("zookeeper.session.timeout"))
val scan = new Scan
conf.set(TableInputFormat.SCAN, convertScanToString(scan))
val job = Job.getInstance(conf)
TableSnapshotInputFormat.setInput(job, "dm_test_snap",
new Path("hdfs://nameservice1/tmp"))
val hBaseRDD = sc.newAPIHadoopRDD(job.getConfiguration, classOf[TableSnapshotInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result])
hBaseRDD.count()
System.exit(0)
}
def convertScanToString(scan : Scan) = {
val proto = ProtobufUtil.toScan(scan);
Base64.encodeBytes(proto.toByteArray());
}
}
Run Code Online (Sandbox Code Playgroud)
查看作业信息,它会复制您提供给它的 conf 对象 ( The Job makes a copy of the Configuration so that any necessary internal modifications do not reflect on the incoming parameter.
),因此您需要在 conf 对象上设置的信息很可能不会传递到 Spark。您可以使用TableSnapshotInputFormatImpl
它具有适用于 conf 对象的类似方法。可能还需要其他东西,但首先解决这个问题,这似乎是最可能的原因。
正如评论中指出的,另一种选择是用于job.getConfiguration
从作业对象获取更新的配置。
归档时间: |
|
查看次数: |
1879 次 |
最近记录: |