使用结构化 Spark Streaming 在 HBase 中批量插入数据

Eri*_*c C 4 hbase scala bulkinsert apache-spark spark-streaming

我正在使用结构化 Spark Streaming 读取来自 Kafka(每秒 100.000 行)的数据,并且我正在尝试将所有数据插入 HBase。

我在 Cloudera Hadoop 2.6 中使用 Spark 2.3

我尝试了类似我在这里看到的东西

eventhubs.writeStream
 .foreach(new MyHBaseWriter[Row])
 .option("checkpointLocation", checkpointDir)
 .start()
 .awaitTermination()
Run Code Online (Sandbox Code Playgroud)

MyHBaseWriter 看起来像这样:

class AtomeHBaseWriter[RECORD] extends HBaseForeachWriter[Row] {
  override def toPut(record: Row): Put = {
    override val tableName: String = "hbase-table-name"

    override def toPut(record: Row): Put = {
        // Get Json
        val data = JSON.parseFull(record.getString(0)).asInstanceOf[Some[Map[String, Object]]]
        val key = data.getOrElse(Map())("key")+ ""
        val val = data.getOrElse(Map())("val")+ ""

        val p = new Put(Bytes.toBytes(key))
        //Add columns ... 
        p.addColumn(Bytes.toBytes(columnFamaliyName),Bytes.toBytes(columnName), Bytes.toBytes(val))

        p
     }
    }
Run Code Online (Sandbox Code Playgroud)

HBaseForeachWriter 类如下所示:

trait HBaseForeachWriter[RECORD] extends ForeachWriter[RECORD] {
  val tableName: String

  def pool: Option[ExecutorService] = None

  def user: Option[User] = None

  private var hTable: Table = _
  private var connection: Connection = _


  override def open(partitionId: Long, version: Long): Boolean = {
    connection = createConnection()
    hTable = getHTable(connection)
    true
  }

  def createConnection(): Connection = {
    // I create HBase Connection Here
  }

  def getHTable(connection: Connection): Table = {
    connection.getTable(TableName.valueOf(Variables.getTableName()))
  }

  override def process(record: RECORD): Unit = {
    val put = toPut(record)
    hTable.put(put)
  }

  override def close(errorOrNull: Throwable): Unit = {
    hTable.close()
    connection.close()
  }

  def toPut(record: RECORD): Put
}
Run Code Online (Sandbox Code Playgroud)

因此,我在这里逐行执行放置,即使我允许每个执行程序有 20 个执行程序和 4 个内核,我也没有将数据立即插入 HBase 中。所以我需要做的是批量加载我很挣扎,因为我在互联网上找到的所有内容都是通过 RDD 和 Map/Reduce 来实现的。

Ram*_*ram 5

我的理解是记录摄取到 hbase 的速度很慢。我给你的建议很少。

1) hbase.client.write.buffer r
以下属性可能对您有所帮助。

hbase.client.write.buffer
Run Code Online (Sandbox Code Playgroud)

说明BufferedMutator 写入缓冲区的默认大小(以字节为单位)。更大的缓冲区需要更多的内存? - 在客户端和服务器端,因为服务器实例化传递的写入缓冲区来处理它? - 但更大的缓冲区大小减少了 RPC 的数量。对于服务器端内存使用的估计,评估 hbase.client.write.buffer * hbase.regionserver.handler.count

默认 2097152(大约 2 mb)

我更喜欢foreachBatch看火花文档(它在火花核心中的那种 foreachPartition)而不是foreach

同样在您的 hbase 编写器中扩展 ForeachWriter

open方法初始化放入的数组列表将process放入的放入数组列表中close table.put(listofputs);,然后在更新表后重置...

它所做的基本上是您上面提到的缓冲区大小填充了 2 mb,然后它将刷新到 hbase 表中。在此之前,记录不会进入 hbase 表。

您可以将其增加到 10mb 等等...。这样一来,RPC 的数量就会减少。大量数据将被刷新,并将在 hbase 表中。

当写缓冲区被填满并flushCommits触发到 hbase 表时。

示例代码:在我的回答中

2)关闭WAL, 您可以关闭WAL(提前写入日志-危险无法恢复)但它会加快写入速度...如果不想恢复数据。

注意:如果您在 hbase 表上使用 solr 或 cloudera 搜索,则不应将其关闭,因为 Solr 将在 WAL 上工作。如果您当时关闭它,Solr 索引将无法工作。这是我们许多人经常犯的一个错误。

如何关闭: https ://hbase.apache.org/1.1/apidocs/org/apache/hadoop/hbase/client/Put.html#setWriteToWAL(boolean )

正如我提到的那样,放置列表是个好方法......这是在结构化流示例如下之前所做的旧方法(foreachPartition 与放置列表),如下所示......其中foreachPartition为每个分区而不是每一行操作。

def writeHbase(mydataframe: DataFrame) = {
      val columnFamilyName: String = "c"
      mydataframe.foreachPartition(rows => {
        val puts = new util.ArrayList[ Put ]
        rows.foreach(row => {
          val key = row.getAs[ String ]("rowKey")
          val p = new Put(Bytes.toBytes(key))
          val columnV = row.getAs[ Double ]("x")
          val columnT = row.getAs[ Long ]("y")
          p.addColumn(
            Bytes.toBytes(columnFamilyName),
            Bytes.toBytes("x"),
            Bytes.toBytes(columnX)
          )
          p.addColumn(
            Bytes.toBytes(columnFamilyName),
            Bytes.toBytes("y"),
            Bytes.toBytes(columnY)
          )
          puts.add(p)
        })
        HBaseUtil.putRows(hbaseZookeeperQuorum, hbaseTableName, puts)
      })
    }
Run Code Online (Sandbox Code Playgroud)

总结 :

我的感觉是我们需要了解 spark 和 hbase 的心理,才能形成有效的配对。