Spark DStream: periodically calling saveAsObjectFile using transform does not work as expected

Dyi*_*yin 8 streaming hdfs apache-kafka apache-spark

I am reading data from Kafka using the DirectKafkaStream API 1, doing some transformations, updating a count, and then writing the data back to Kafka. Actually, this piece of code is under test:

kafkaStream[Key, Value]("test")
      .map(record => (record.key(), 1))
      .updateStateByKey[Int](
        (numbers: Seq[Int], state: Option[Int]) =>
          state match {
            case Some(s) => Some(s + numbers.length)
            case _ => Some(numbers.length)
          }
      )
      .checkpoint(this)("count") {
        case (save: (Key, Int), current: (Key, Int)) =>
          (save._1, save._2 + current._2)
      }
      .map(_._2)
      .reduce(_ + _)
      .map(count => (new Key, new Result[Long](count.toLong)))
      .toKafka(Key.Serializer.getClass.getName, Result.longKafkaSerializer.getClass.getName)

The checkpoint operator is an enrichment to the DStream API that I created; it is supposed to save one RDD of the given DStream, for a given Time, to HDFS using saveAsObjectFile. In practice, it saves the result of every 60th micro-batch (RDD) to HDFS.

The checkpoint operator does the following:

def checkpoint(processor: Streaming)(name: String)(
    mergeStates: (T, T) => T): DStream[T] = {
  val path = processor.configuration.get[String](
    "processing.spark.streaming.checkpoint-directory-prefix") + "/" +
    Reflection.canonical(processor.getClass) + "/" + name + "/"
  logInfo(s"Checkpoint base path is [$path].")

  processor.registerOperator(name)

  if (processor.fromCheckpoint && processor.restorationPoint.isDefined) {
    val restorePath = path + processor.restorationPoint.get.ID.stringify
    logInfo(s"Restoring from path [$restorePath].")
    checkpointData = context.objectFile[T](restorePath).cache()

    stream
      .transform((rdd: RDD[T], time: Time) => {
        val merged = rdd
          .union(checkpointData)
          .map[(Boolean, T)](record => (true, record))
          .reduceByKey(mergeStates)
          .map[T](_._2)

        processor.maybeCheckpoint(name, merged, time)

        merged
      })
  } else {
    stream
      .transform((rdd: RDD[T], time: Time) => {
        processor.maybeCheckpoint(name, rdd, time)

        rdd
      })
  }
}

The branch that is actually in effect is the following:

dstream.transform((rdd: RDD[T], time: Time) => {
  processor.maybeCheckpoint(name, rdd, time)

  rdd
})

The dstream variable in the code above is the result of the previous operator, which is updateStateByKey, so transform is called right after updateStateByKey.

def maybeCheckpoint(name: String, rdd: RDD[_], time: Time) = {
  if (doCheckpoint(time)) {
    logInfo(s"Checkpointing for operator [$name] with RDD ID of [${rdd.id}].")
    val newPath = configuration.get[String](
      "processing.spark.streaming.checkpoint-directory-prefix") + "/" +
      Reflection.canonical(this.getClass) + "/" + name + "/" + checkpointBarcode
    logInfo(s"Saving new checkpoint to [$newPath].")
    rdd.saveAsObjectFile(newPath)
    registerCheckpoint(name, Operator(name), time)
    logInfo(s"Checkpoint completed for operator [$name].")
  }
}

As you can see, most of the code is just bookkeeping, but effectively saveAsObjectFile gets called.
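
The doCheckpoint gate itself is not shown here; a purely hypothetical sketch of it could simply test whether the batch time is a multiple of the checkpoint interval (the name, the 60-second interval, and the assumed 1-second batch duration are all assumptions):

import org.apache.spark.streaming.{Seconds, Time}

// Hypothetical gate, not the actual implementation: fires on every 60th
// micro-batch, assuming a 1-second batch interval.
def doCheckpoint(time: Time): Boolean = time.isMultipleOf(Seconds(60))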

The problem is that even though the RDDs produced by updateStateByKey are supposed to be persisted automatically, when saveAsObjectFile is called for every Xth micro-batch, Spark recomputes everything from scratch, from the start of the streaming job, beginning by reading everything from Kafka again. I have tried to put and force cache and persist with different storage levels on the DStreams as well as on the RDDs.
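
As a minimal sketch of what one such attempt could look like (the exact placement and storage level here are assumptions, not the original code):

import org.apache.spark.storage.StorageLevel

// One possible placement: persist the state stream explicitly right after
// updateStateByKey; cache() and other storage levels fit the same spot.
kafkaStream[Key, Value]("test")
  .map(record => (record.key(), 1))
  .updateStateByKey[Int](
    (numbers: Seq[Int], state: Option[Int]) =>
      Some(state.getOrElse(0) + numbers.length)
  )
  .persist(StorageLevel.MEMORY_AND_DISK)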

Micro-batches:

[screenshot: list of micro-batches]

DAG of job 22:

[screenshot: DAG of job 22]

DAG of the job that runs saveAsObjectFile:

[screenshots: SAOF1, SAOF2]

What could be the problem?

Thanks!

1 Using Spark 2.1.0.

ImD*_*enG 3

I believe that using transform to perform the periodic checkpoint causes unexpected cache behaviour.

Instead, performing the periodic checkpoint with foreachRDD should keep the DAG stable enough to cache the RDDs effectively.
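
A minimal sketch of that change, reusing stream, processor, name, mergeStates and checkpointData from the question's checkpoint method (the exact wiring is an assumption):

val merged: DStream[T] = stream
  .transform((rdd: RDD[T], time: Time) => {
    // Pure transformation only: merge the restored checkpoint data into the batch.
    rdd
      .union(checkpointData)
      .map[(Boolean, T)](record => (true, record))
      .reduceByKey(mergeStates)
      .map[T](_._2)
  })

merged.cache()

// Output operation: the periodic save runs once per batch interval as a side
// effect, instead of sitting inside the lineage of downstream transformations.
merged.foreachRDD((rdd: RDD[T], time: Time) => {
  processor.maybeCheckpoint(name, rdd, time)
})

The checkpoint method would then return merged instead of the result of a single transform that also performs the save.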

I'm almost certain this was the solution to a similar problem we had a while ago.