小编Eri*_*c C的帖子

使用结构化 Spark Streaming 在 HBase 中批量插入数据

我正在使用结构化 Spark Streaming 读取来自 Kafka(每秒 100.000 行)的数据,并且我正在尝试将所有数据插入 HBase。

我在 Cloudera Hadoop 2.6 中使用 Spark 2.3

我尝试了类似我在这里看到的东西

eventhubs.writeStream
 .foreach(new MyHBaseWriter[Row])
 .option("checkpointLocation", checkpointDir)
 .start()
 .awaitTermination()
Run Code Online (Sandbox Code Playgroud)

MyHBaseWriter 看起来像这样:

class AtomeHBaseWriter[RECORD] extends HBaseForeachWriter[Row] {
  override def toPut(record: Row): Put = {
    override val tableName: String = "hbase-table-name"

    override def toPut(record: Row): Put = {
        // Get Json
        val data = JSON.parseFull(record.getString(0)).asInstanceOf[Some[Map[String, Object]]]
        val key = data.getOrElse(Map())("key")+ ""
        val val = data.getOrElse(Map())("val")+ ""

        val p = new Put(Bytes.toBytes(key))
        //Add columns ... …
Run Code Online (Sandbox Code Playgroud)

hbase scala bulkinsert apache-spark spark-streaming

4
推荐指数
1
解决办法
1314
查看次数

标签 统计

apache-spark ×1

bulkinsert ×1

hbase ×1

scala ×1

spark-streaming ×1