use*_*663 7 hbase scala apache-spark
我正在尝试使用Spark 1.0在HBase(0.96.0-hadoop2)中编写一些简单数据,但我不断遇到序列化问题.这是相关代码:
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.SparkContext
import java.util.Properties
import java.io.FileInputStream
import org.apache.hadoop.hbase.client.Put
object PutRawDataIntoHbase{
def main(args: Array[String]): Unit = {
var propFileName = "hbaseConfig.properties"
if(args.size > 0){
propFileName = args(0)
}
/** Load properties here **/
val theData = sc.textFile(prop.getProperty("hbase.input.filename"))
.map(l => l.split("\t"))
.map(a => Array("%010d".format(a(9).toInt)+ "-" + a(0) , a(1)))
val tableName = prop.getProperty("hbase.table.name")
val hbaseConf = HBaseConfiguration.create()
hbaseConf.set("hbase.rootdir", prop.getProperty("hbase.rootdir"))
hbaseConf.addResource(prop.getProperty("hbase.site.xml"))
val myTable = new HTable(hbaseConf, tableName)
theData.foreach(a=>{
var p = new Put(Bytes.toBytes(a(0)))
p.add(Bytes.toBytes(hbaseColFamily), Bytes.toBytes("col"), Bytes.toBytes(a(1)))
myTable.put(p)
})
}
}
Run Code Online (Sandbox Code Playgroud)
运行代码会导致:
Failed to run foreach at putDataIntoHBase.scala:79
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException:org.apache.hadoop.hbase.client.HTable
Run Code Online (Sandbox Code Playgroud)
用map替换foreach不会崩溃,但我也不会写.任何帮助将不胜感激.
Wil*_*ire 21
该类HBaseConfiguration
表示与HBase服务器的连接池.显然,它无法序列化并发送到工作节点.由于HTable
使用此池与HBase服务器通信,因此无法对其进行序列化.
基本上,有三种方法可以解决这个问题:
注意使用foreachPartition
方法:
val tableName = prop.getProperty("hbase.table.name")
<......>
theData.foreachPartition { iter =>
val hbaseConf = HBaseConfiguration.create()
<... configure HBase ...>
val myTable = new HTable(hbaseConf, tableName)
iter.foreach { a =>
var p = new Put(Bytes.toBytes(a(0)))
p.add(Bytes.toBytes(hbaseColFamily), Bytes.toBytes("col"), Bytes.toBytes(a(1)))
myTable.put(p)
}
}
Run Code Online (Sandbox Code Playgroud)
请注意,每个工作节点必须能够访问HBase服务器,并且必须具有预先安装或通过提供的必需jar ADD_JARS
.
另请注意,由于为每个分区打开了连接池,因此最好将分区数大致减少到工作节点数(带coalesce
功能).也可以HTable
在每个工作节点上共享一个实例,但这并不是那么简单.
即使数据不适合内存,也可以使用单台计算机从RDD写入所有数据.详细信息在以下答案中进行了解释:Spark:从RDD检索大数据到本地计算机的最佳实践
当然,它比分布式写入慢,但它很简单,不会带来痛苦的序列化问题,如果数据大小合理,可能是最好的方法.
可以为HBase创建自定义HadoopOutputFormat或使用现有的HadoopOutputFormat.我不确定是否存在符合您需求的内容,但Google应该在此提供帮助.
PS顺便说一下,map
调用不会崩溃,因为它没有得到评估:在你调用带副作用的函数之前,不会评估RDD.例如,如果你打电话theData.map(....).persist
,它也会崩溃.
归档时间: |
|
查看次数: |
9559 次 |
最近记录: |