And*_*rea 9 hbase scala apache-spark
我正在尝试使用Spark来处理来自HBase表的数据.此博客文章提供了如何使用NewHadoopAPI
从任何Hadoop读取数据的示例InputFormat
.
我做了什么
由于我需要多次执行此操作,因此我尝试使用implicits进行丰富SparkContext
,以便我可以从HBase中的给定列集中获取RDD.我写了以下帮助:
trait HBaseReadSupport {
implicit def toHBaseSC(sc: SparkContext) = new HBaseSC(sc)
implicit def bytes2string(bytes: Array[Byte]) = new String(bytes)
}
final class HBaseSC(sc: SparkContext) extends Serializable {
def extract[A](data: Map[String, List[String]], result: Result, interpret: Array[Byte] => A) =
data map { case (cf, columns) =>
val content = columns map { column =>
val cell = result.getColumnLatestCell(cf.getBytes, column.getBytes)
column -> interpret(CellUtil.cloneValue(cell))
} toMap
cf -> content
}
def makeConf(table: String) = {
val conf = HBaseConfiguration.create()
conf.setBoolean("hbase.cluster.distributed", true)
conf.setInt("hbase.client.scanner.caching", 10000)
conf.set(TableInputFormat.INPUT_TABLE, table)
conf
}
def hbase[A](table: String, data: Map[String, List[String]])
(interpret: Array[Byte] => A) =
sc.newAPIHadoopRDD(makeConf(table), classOf[TableInputFormat],
classOf[ImmutableBytesWritable], classOf[Result]) map { case (key, row) =>
Bytes.toString(key.get) -> extract(data, row, interpret)
}
}
Run Code Online (Sandbox Code Playgroud)
它可以像
val rdd = sc.hbase[String](table, Map(
"cf" -> List("col1", "col2")
))
Run Code Online (Sandbox Code Playgroud)
在这种情况下,我们得到一个RDD (String, Map[String, Map[String, String]])
,其中第一个组件是rowkey,第二个是一个映射,其键是列族,值是映射,其键是列,其内容是单元格值.
哪里失败了
不幸的是,似乎我的工作得到了一个参考sc
,它本身不是设计可序列化的.我在工作时得到的是
Exception in thread "main" org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: org.apache.spark.SparkContext
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
Run Code Online (Sandbox Code Playgroud)
我可以删除帮助程序类并在我的工作中使用相同的内联逻辑,一切运行正常.但我希望得到一些我可以重复使用的东西,而不是一遍又一遍地编写相同的样板.
顺便说一下,问题并不是特定于隐含的,即使使用sc
展示相同问题的功能.
为了比较,下面的帮助程序来读取TSV文件(我知道它已经坏了,因为它不支持引用等等,没关系)似乎工作正常:
trait TsvReadSupport {
implicit def toTsvRDD(sc: SparkContext) = new TsvRDD(sc)
}
final class TsvRDD(val sc: SparkContext) extends Serializable {
def tsv(path: String, fields: Seq[String], separator: Char = '\t') = sc.textFile(path) map { line =>
val contents = line.split(separator).toList
(fields, contents).zipped.toMap
}
}
Run Code Online (Sandbox Code Playgroud)
如何封装逻辑以从HBase读取行而不会无意中捕获SparkContext?
Wil*_*ire 13
只需@transient
向sc
变量添加注释:
final class HBaseSC(@transient val sc: SparkContext) extends Serializable {
...
}
Run Code Online (Sandbox Code Playgroud)
并确保sc
不在extract
功能范围内使用,因为它不适用于工人.
如果需要从分布式计算中访问Spark上下文,rdd.context
可能会使用以下函数:
val rdd = sc.newAPIHadoopRDD(...)
rdd map {
case (k, v) =>
val ctx = rdd.context
....
}
Run Code Online (Sandbox Code Playgroud)