使用Spark Streaming处理Kafka消息时遇到的挑战

sco*_*pio 1 bigdata apache-kafka apache-spark spark-streaming

我想实时处理在Web服务器上报告的消息。Web服务器上报告的消息属于不同的会话,我想进行一些会话级别的聚合。为此,我计划使用Kafka的Spark Streaming前端。甚至在我开始之前,我就列出了该体系结构将面临的一些挑战。熟悉这个生态系统的人可以帮助我解决以下问题:

  1. 如果每个Kafka消息都属于一个特定的会话,那么如何管理会话亲缘关系,以便同一Spark执行程序可以查看链接到会话的所有消息?
  2. 如何确保属于会话的消息由Spark执行程序按在Kafka上报告的顺序进行处理?我们能以某种方式实现这一目标而又不限制线程数和增加处理开销(如按消息时间戳排序)吗?
  3. 什么时候检查点会话状态?如果执行程序节点崩溃,如何从最后一个检查点恢复状态?如果驱动程序节点崩溃,如何从最后一个检查点恢复状态?
  4. 如果节点(执行程序/驱动程序)在检查点状态之前崩溃,如何恢复状态?如果Spark通过重播消息来重新创建RDD状态,那么它从哪里开始从以下位置开始重播Kafka消息:病房的最后一个检查点,或者它处理重新创建分区所需的所有消息?Spark Streaming是否可以在多个Spark Streaming批次之间或仅在当前批次中恢复状态,即,如果在上一个批次中未执行检查点,则可以恢复状态吗?

Sön*_*bau 5

如果每个Kafka消息都属于一个特定的会话,那么如何管理会话亲缘关系,以便同一Spark执行程序可以查看链接到会话的所有消息?

Kafka将主题划分为多个分区,每个分区一次只能由一个使用者读取,因此您需要确保属于一个会话的所有消息都进入同一分区。分区分配是通过分配给每条消息的密钥来控制的,因此实现此目的的最简单方法可能是在发送数据时使用会话ID作为密钥。这样,同一使用者将获得一个会话的所有消息。但有一个警告:当消费者加入或离开消费者组时,Kafka将重新分配分配给消费者的分区。如果这种情况发生在会话中间,则可能(并且将会)发生,在重新平衡之后,该会话的一半消息发送给一个使用者,另一半消息发送给另一个使用者。为避免这种情况,您 需要在代码中手动订阅特定的分区,以便每个处理器都有其特定的分区集,并且不会更改这些分区。看一下ConsumerStrategies。为此,在SparkKafka组件代码中分配。


如何确保属于会话的消息由Spark执行程序按在Kafka上报告的顺序进行处理?我们能以某种方式实现这一目标而又不限制线程数和增加处理开销(如按消息时间戳排序)吗?

Kafka会保留每个分区的顺序,因此您无需在此做太多事情。唯一的办法是避免同时从生产者向代理发出多个请求,您可以通过生产者参数max.in.flight.requests.per.connection进行配置。只要您将此设置为1,如果我正确理解您的设置,就应该安全。


什么时候检查点会话状态?如果执行程序节点崩溃,如何从最后一个检查点恢复状态?如果驱动程序节点崩溃,如何从最后一个检查点恢复状态?

我建议阅读《Spark Streaming + Kafka集成指南》的偏移量存储部分,该部分应该已经回答了很多问题。

简短的版本是,您可以将最后一次读取的偏移量保留到Kafka中,并且绝对应在检查执行者时执行此操作。这样,无论何时新的执行者接受处理,无论它是否从检查点恢复,它都会知道在Kafka中从哪里读取。


如果节点(执行程序/驱动程序)在检查点状态之前崩溃,如何恢复状态?如果Spark通过重播消息来重新创建RDD状态,那么它从哪里开始从以下位置开始重播Kafka消息:病房的最后一个检查点,或者它处理重新创建分区所需的所有消息?Spark Streaming是否可以在多个Spark Streaming批次之间或仅在当前批次中恢复状态,即,如果在上一个批次中未执行检查点,则可以恢复状态吗?

我在这里的Spark知识有点不稳定,但是我想说这不是Kafka / Spark完成的工作,而是您需要积极影响代码的工作。默认情况下,如果启动新的Kafka Stream并没有找到以前提交的偏移量,它将仅从主题末尾开始读取,因此它将获得使用者启动后产生的任何消息。如果需要恢复状态,则需要知道要从哪个确切偏移量开始重新读取消息,或者只是从头开始读取。分配分区时,可以将偏移量读取到上述.Assign()方法中。

我希望这会有所帮助,我确信这绝不是所有问题的完整答案,但是这是一个相当广泛的工作领域,请告诉我是否可以提供进一步的帮助。