Spark Structured Streaming ForeachWriter and database performance

Exi*_*xie 12 database scala jdbc apache-spark spark-structured-streaming

I have implemented a structured stream like this...

myDataSet
  .map(r =>  StatementWrapper.Transform(r))
  .writeStream
  .foreach(MyWrapper.myWriter)
  .start()
  .awaitTermination()

This all appears to work, but the throughput of MyWrapper.myWriter is terrible. It is effectively trying to act as a JDBC sink, and it looks like this:

val myWriter: ForeachWriter[Seq[String]] = new ForeachWriter[Seq[String]] {

  var connection: Connection = _

  override def open(partitionId: Long, version: Long): Boolean = {
    Try (connection = getRemoteConnection).isSuccess
  }

  override def process(row: Seq[String]) {
    val statement = connection.createStatement()
    try {
      row.foreach( s => statement.execute(s) )
    } catch {
      case e: SQLSyntaxErrorException => println(e)
      case e: SQLException => println(e)
    } finally {
      statement.closeOnCompletion()
    }
  }

  override def close(errorOrNull: Throwable) {
    connection.close()
  }
}

So my question is: is a new ForeachWriter instantiated for every row? And are open() and close() therefore called for every row in the dataset?

Is there a better design that improves throughput?

How can I parse the SQL statement once and execute it many times, while keeping the database connection open?
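Conceptually, what I am after is something like the sketch below, where the statement is prepared once per partition and every row only binds parameters. MyRow, the table and its columns are placeholders for my real schema, and getRemoteConnection is the same helper shown above:

import java.sql.{Connection, PreparedStatement}
import org.apache.spark.sql.ForeachWriter

case class MyRow(id: Long, payload: String)

val preparedWriter: ForeachWriter[MyRow] = new ForeachWriter[MyRow] {

  var connection: Connection = _
  var statement: PreparedStatement = _

  override def open(partitionId: Long, version: Long): Boolean = {
    connection = getRemoteConnection            // assumed helper, as above
    // parse/prepare the INSERT once per partition
    statement = connection.prepareStatement(
      "INSERT INTO my_table (id, payload) VALUES (?, ?)")
    true
  }

  override def process(row: MyRow): Unit = {
    statement.setLong(1, row.id)
    statement.setString(2, row.payload)
    statement.addBatch()                        // accumulate instead of a round trip per row
  }

  override def close(errorOrNull: Throwable): Unit = {
    if (errorOrNull == null) statement.executeBatch()  // flush once per partition
    if (statement != null) statement.close()
    if (connection != null) connection.close()
  }
}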

Yuv*_*kov 11

Whether the underlying sink is opened and closed depends on your ForeachWriter implementation.

The class that invokes your ForeachWriter is ForeachSink, and this is the code that calls your writer:

data.queryExecution.toRdd.foreachPartition { iter =>
  if (writer.open(TaskContext.getPartitionId(), batchId)) {
    try {
      while (iter.hasNext) {
        writer.process(encoder.fromRow(iter.next()))
      }
    } catch {
      case e: Throwable =>
        writer.close(e)
        throw e
    }
    writer.close(null)
  } else {
    writer.close(null)
  }
}

Opening and closing the writer is attempted for each batch that is generated from the source. If you want open and close to literally open and close the sink driver every time, you will need to do so through your own implementation.
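For example, one way to keep the connection alive across batches is a per-executor holder object that open() and close() leave alone. This is only a sketch, and getRemoteConnection is again the helper assumed from the question:

import java.sql.Connection
import org.apache.spark.sql.ForeachWriter

object ConnectionHolder {
  private var connection: Connection = _

  // Lazily create (or recreate) one long-lived connection per executor JVM.
  def get: Connection = synchronized {
    if (connection == null || connection.isClosed) {
      connection = getRemoteConnection          // assumed helper from the question
    }
    connection
  }
}

val pooledWriter: ForeachWriter[Seq[String]] = new ForeachWriter[Seq[String]] {

  override def open(partitionId: Long, version: Long): Boolean = true

  override def process(row: Seq[String]): Unit = {
    val statement = ConnectionHolder.get.createStatement()
    try row.foreach(s => statement.execute(s)) finally statement.close()
  }

  // Deliberately do not close the shared connection here; it outlives the batch.
  override def close(errorOrNull: Throwable): Unit = ()
}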

If you want more control over how the data is handled, you can implement the Sink trait, which gives you the batch ID and the underlying DataFrame:

trait Sink {
  def addBatch(batchId: Long, data: DataFrame): Unit
}
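For completeness, here is a rough sketch of what such a Sink could look like, writing each micro-batch partition with a single prepared statement. The table name, column layout and getRemoteConnection are assumptions carried over from the question, and actually plugging this in would also require a StreamSinkProvider, which is omitted here:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.streaming.Sink

class JdbcBatchSink extends Sink {

  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    // Follow the same pattern as ForeachSink above: work partition by partition
    // so each task reuses one connection and one prepared statement for all rows.
    data.queryExecution.toRdd.foreachPartition { rows =>
      val connection = getRemoteConnection      // assumed helper from the question
      val statement = connection.prepareStatement(
        "INSERT INTO my_table (id, payload) VALUES (?, ?)")
      try {
        rows.foreach { row =>
          statement.setLong(1, row.getLong(0))
          statement.setString(2, row.getString(1))
          statement.addBatch()
        }
        statement.executeBatch()                // one flush per partition per batch
      } finally {
        statement.close()
        connection.close()
      }
    }
  }
}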