为什么Kafka Direct Stream会为每条消息创建一个新的解码器?

sco*_*pio 6 java kryo apache-kafka apache-spark spark-streaming

我有一个用Java编写并使用Spark 2.1的Spark流媒体应用程序.我正在使用KafkaUtils.createDirectStream来自Kafka的消息.我正在使用kryo编码器/解码器用于kafka消息.我在Kafka properties-> key.deserializer,value.deserializer,key.serializer,value.deserializer中指定了这一点.

当Spark以微批处理方式提取消息时,使用kryo解码器成功解码消息.但是我注意到Spark执行器创建了一个kryo解码器的新实例,用于解码从kafka读取的每条消息.我通过将日志放入解码器构造函数中

来检查这个.这对我来说似乎很奇怪.不应该为每个消息和每个批次使用相同的解码器实例吗?

我在卡夫卡读书的代码:

JavaInputDStream<ConsumerRecord<String, Class1>> consumerRecords = KafkaUtils.createDirectStream(
        jssc,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, Class1>Subscribe(topics, kafkaParams));

JavaPairDStream<String, Class1> converted = consumerRecords.mapToPair(consRecord -> {
    return new Tuple2<String, Class1>(consRecord.key(), consRecord.value());
});
Run Code Online (Sandbox Code Playgroud)

Yuv*_*kov 3

如果我们想了解 Spark 如何在内部从 Kafka 获取数据,我们需要查看KafkaRDD.compute,这是一个为 every 实现的方法RDD,它告诉框架如何计算RDD

override def compute(thePart: Partition, context: TaskContext): Iterator[R] = {
  val part = thePart.asInstanceOf[KafkaRDDPartition]
  assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part))
  if (part.fromOffset == part.untilOffset) {
    logInfo(s"Beginning offset ${part.fromOffset} is the same as ending offset " +
    s"skipping ${part.topic} ${part.partition}")
    Iterator.empty
  } else {
    new KafkaRDDIterator(part, context)
  }
}
Run Code Online (Sandbox Code Playgroud)

这里重要的是else子句,它创建了一个KafkaRDDIterator. 这内部有:

val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
  .newInstance(kc.config.props)
  .asInstanceOf[Decoder[K]]

val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
  .newInstance(kc.config.props)
  .asInstanceOf[Decoder[V]]
Run Code Online (Sandbox Code Playgroud)

正如您所看到的,它通过反射为每个底层分区创建键解码器和值解码器的实例。这意味着它不是按消息生成,而是按 Kafka 分区生成

为什么要这样实现呢?我不知道。我假设是因为与 Spark 内部发生的所有其他分配相比,键和值解码器的性能影响应该可以忽略不计。

如果您已经分析了您的应用程序并发现这是分配热路径,则可以提出问题。不然我也不会担心这个问题。