I am reading data from Kafka with Spark Structured Streaming (100,000 rows per second), and I am trying to insert all of it into HBase.
I am using Spark 2.3 on Cloudera Hadoop 2.6.
eventhubs.writeStream
  .foreach(new MyHBaseWriter[Row])
  .option("checkpointLocation", checkpointDir)
  .start()
  .awaitTermination()
MyHBaseWriter looks like this:
class MyHBaseWriter[RECORD] extends HBaseForeachWriter[Row] {
  override val tableName: String = "hbase-table-name"

  override def toPut(record: Row): Put = {
    // Parse the JSON payload of the record
    val data = JSON.parseFull(record.getString(0)).asInstanceOf[Some[Map[String, Object]]]
    val key = data.getOrElse(Map())("key") + ""
    val value = data.getOrElse(Map())("val") + ""

    val p = new Put(Bytes.toBytes(key))
    // Add columns ...
    p
  }
}
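For context, HBaseForeachWriter is not part of Spark or the HBase client; it appears to be a custom base class wrapping Spark's ForeachWriter[Row]. Below is a minimal sketch of what such a base class might look like, assuming it opens an HBase connection per partition, turns each record into a Put via the toPut hook, and closes resources afterwards (the member names besides tableName and toPut are my assumptions, not confirmed by the question):

abstract class HBaseForeachWriter[T] extends org.apache.spark.sql.ForeachWriter[T] {
  import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
  import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put, Table}

  // Subclasses supply the target table and the record-to-Put conversion
  val tableName: String
  def toPut(record: T): Put

  @transient private var connection: Connection = _
  @transient private var table: Table = _

  override def open(partitionId: Long, version: Long): Boolean = {
    // One connection per partition/task, created on the executor
    connection = ConnectionFactory.createConnection(HBaseConfiguration.create())
    table = connection.getTable(TableName.valueOf(tableName))
    true
  }

  override def process(record: T): Unit = {
    // One RPC per record; at 100,000 rows/s, batching Puts (or using a
    // BufferedMutator) would be a natural place to reduce round trips
    table.put(toPut(record))
  }

  override def close(errorOrNull: Throwable): Unit = {
    if (table != null) table.close()
    if (connection != null) connection.close()
  }
}

Note that with this shape, every call to process issues a separate put to HBase, which matters for the throughput figure quoted above.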