如何在从 Kafka 读取消息时为 Spark Structured Streaming 设置最佳配置值 - 触发时间、maxOffsetsPerTrigger?

ak0*_*817 5 apache-kafka apache-spark spark-streaming spark-structured-streaming

我有一个结构化流应用程序从 Kafka 读取消息。每天的消息总数约为 180 亿条,每分钟的峰值消息数 = 12,500,000。最大消息大小为 2 KB。

如何确保我的 Structured Streaming 应用程序能够处理如此多的数据量和速度?基本上,我只想知道如何设置最佳触发时间、maxOffsetsPerTrigger 或任何其他使工作顺利进行并能够处理故障和重新启动的配置。

San*_*shK 3

您可以以固定间隔微批次或连续方式运行 Spark 结构化流应用程序。以下是一些可用于调整流应用程序的选项。

卡夫卡配置:

Kafka中的分区数量:

您可以增加 Kafka 中的分区数量。因此,更多数量的消费者可以同时读取数据。根据输入速率和引导服务器数量将其设置为适当的数字。

Spark 流配置:

驱动程序和执行程序内存配置:

计算每个批次中的数据大小(#records * 每条消息的大小)并相应地设置内存。

执行人人数:

将执行器的数量设置为kafka主题中的分区数。这增加了并行性。同时读取数据的任务数。

限制偏移数量:

每个触发间隔处理的最大偏移量的速率限制。指定的偏移总数将按比例分配到不同卷的主题分区中。

  val df = spark
    .read
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("subscribe", "topicName")
    .option("startingOffsets", "latest")
    .option("maxOffsetsPerTrigger", "1000000")
    .load()
Run Code Online (Sandbox Code Playgroud)

通过检查点从故障中恢复:

如果发生故障或故意关闭,您可以恢复先前查询的先前进度和状态,并从中断处继续。这是通过使用检查点和预写日志来完成的。

finalDF
  .writeStream
  .outputMode("complete")
  .option("checkpointLocation", "path/to/HDFS/dir")
  .format("memory")
  .start()
Run Code Online (Sandbox Code Playgroud)

扳机:

流式查询的触发器设置定义了流式数据处理的时间,查询是作为具有固定批处理间隔的微批查询执行还是作为连续处理查询执行。

  • @SantoshK,您好,感谢您的有用回答。我可以知道我是否应该更喜欢设置适当的触发间隔(例如 2 分钟),或者设置 maxOffsetsPerTrigger。我如何找到最佳值?比如说,我每秒有 1,00,000 条消息流入 Kafka,我必须处理这些消息,如何确定最佳调整值? (2认同)