在使用 Kafka 进行 Spark 流式传输的情况下,spark.streaming.kafka.maxRatePerPartition 与 Spark.streaming.backPressure.enabled 有何关系?

Met*_*ata 5 apache-kafka apache-spark spark-streaming pyspark spark-structured-streaming

在读取如下的配置单元表后,我尝试将数据写入 Kafka 主题。

\n
write_kafka_data.py:\nread_df = spark.sql("select * from db.table where some_column in (\'ASIA\', \'Europe\')")\nfinal_df = read_df.select(F.to_json(F.struct(F.col("*"))).alias("value"))\n\nfinal_df.write.format("kafka")\\\n        .option("kafka.bootstrap.servers", kafka_broker)\\\n        .option("kafka.batch.size", 51200)\\\n        .option("retries", 3)\\\n        .option("kafka.max.request.size", 500000)\\\n        .option("kafka.max.block.ms", 120000)\\\n        .option("kafka.metadata.max.age.ms", 120000)\\\n        .option("kafka.request.timeout.ms", 120000)\\\n        .option("kafka.linger.ms", 0)\\\n        .option("kafka.delivery.timeout.ms", 130000)\\\n        .option("acks", "1")\\\n        .option("kafka.compression.type", "snappy")\\\n        .option("kafka.security.protocol", "SASL_SSL")\\\n        .option("kafka.sasl.jaas.config", oauth_config)\\\n        .option("kafka.sasl.login.callback.handler.class", "io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler")\\\n        .option("kafka.sasl.mechanism", "OAUTHBEARER")\\\n        .option("topic", \'topic_name\')\\\n        .save()\n
Run Code Online (Sandbox Code Playgroud)\n

成功写入后(记录数为29000),我正在另一个文件中读取来自同一主题的数据,如下所示:\nread_kafka_data.py:

\n
    # SCHEMA\n    schema = StructType([StructField("col1", StringType()),\n            StructField("col2", IntegerType())\n    ])\n\n    # READ FROM TOPIC\n    jass_config = "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required" \\\n                          + " oauth.token.endpoint.uri=" + \'"\' + "uri" + \'"\' \\\n                          + " oauth.client.id=" + \'"\' + "client_id" + \'"\' \\\n                          + " oauth.client.secret=" + \'"\' + "secret_key" + \'" ;\'\n\n    stream_df = spark.readStream \\\n            .format(\'kafka\') \\\n            .option(\'kafka.bootstrap.servers\', kafka_broker) \\\n            .option(\'subscribe\', \'topic_name\') \\\n            .option(\'kafka.security.protocol\', \'SASL_SSL\') \\\n            .option(\'kafka.sasl.mechanism\', \'OAUTHBEARER\') \\\n            .option(\'kafka.sasl.jaas.config\', jass_config) \\\n            .option(\'kafka.sasl.login.callback.handler.class\', "io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler") \\\n            .option(\'startingOffsets\', \'latest\') \\\n            .option(\'group.id\', \'group_id\') \\\n            .option(\'maxOffsetsPerTrigger\', 200) \\\n            .option(\'fetchOffset.retryIntervalMs\', 200) \\\n            .option(\'fetchOffset.numRetries\', 3) \\\n            .load()\\\n            .select(from_json(col(\'value\').cast(\'string\'), schema).alias("json_dta")).selectExpr(\'json_dta.*\')\n\n    stream_df.writeStream.outputMode(\'append\')\n    .format(HiveWarehouseSession.STREAM_TO_STREAM)\n      .option("database", "database_name")\n      .option("table", "table_name")\n      .option("metastoreUri", spark.conf.get("spark.datasource.hive.warehouse.metastoreUri"))\n      .option("checkpointLocation", "/path/to/checkpoint/dir")\n      .start().awaitTermination()\n
Run Code Online (Sandbox Code Playgroud)\n

我是 Kafka 的初学者,一直在阅读 Kafka 性能优化技术并遇到了这两个。

\n
\n

spark.streaming.backpressure.enabledspark.streaming.kafka.maxRatePerPartition

\n
\n

要启用第一个参数:

\n
sparkConf.set("spark.streaming.backpressure.enabled",\xe2\x80\x9dtrue\xe2\x80\x9d)\n
Run Code Online (Sandbox Code Playgroud)\n

官方文档中对上述参数的解释为:

\n
\n

启用或禁用 Spark Streaming 的内部反压机制(自 1.5 起)。这使得 Spark Streaming 能够根据当前批量调度延迟和处理时间来控制\n接收速率,以便系统仅按照系统能够处理的速度进行接收。\n在内部,这会动态设置\n接收器的最大接收速率。该速率的上限由值\nspark.streaming.receiver.maxRate和\n决定spark.streaming.kafka.maxRatePerPartition

\n
\n

现在我是第一次运行应用程序并且没有以前的微批次,我应该指定一些值:spark.streaming.backpressure.initialRate

\n

如果是的话,应该如何确定 的值spark.streaming.backpressure.initialRate。\n文档中还说,如果spark.streaming.backpressure.enabled设置为true最大接收速率,是动态设置的。\n如果是这样,还需要配置吗:\nspark.streaming.receiver.maxRate并且spark.streaming.kafka.maxRatePerPartition\nifspark.streaming.backpressure.enabled设置到true

\n

这个链接spark.streaming.backpressure.initialRate说施加背压时不会影响使用。

\n

任何有助于消除混乱的帮助将不胜感激。

\n

mik*_*ike 3

您所指的配置spark.streaming.[...]属于Direct Streaming(又名 Spark Streaming),而不属于Structured Streaming

如果您不知道其中的区别,我建议您查看单独的编程指南:

结构化流不提供背压机制。当您从 Kafka 消费时,您可以使用(正如您已经在做的那样)该选项maxOffsetsPerTrigger来设置每个触发器上读取消息的限制。此选项在《结构化流媒体和 Kafka 集成指南》中记录为:

“每个触发间隔处理的最大偏移量的速率限制。指定的总偏移量将按比例分配到不同卷的 topicPartitions 上。”


如果您仍然对标题问题感兴趣

与 Kafka 的 Spark Streaming有何spark.streaming.kafka.maxRatePerPartition关系?spark.streaming.backpressure.enabled

Spark 的配置文档中解释了这种关系:

“启用或禁用Spark Streaming的内部反压机制(从1.5开始)。这使得Spark Streaming可以根据当前批量调度延迟和处理时间来控制接收速率,以便系统接收的速度与系统可以处理的速度一样快。在内部,这会动态设置接收器的最大接收速率。该速率的上限取决于这些值spark.streaming.receiver.maxRate以及spark.streaming.kafka.maxRatePerPartition它们是否已设置(见下文)。

Spark Streaming(DStream,而不是结构化流)中可用的反压机制的所有详细信息都在您已链接的启用反压以使您的 Spark Streaming 应用程序生产做好准备的博客中进行了解释。

通常,如果启用背压,您将设置spark.streaming.kafka.maxRatePerPartition为最佳估计速率的 150% ~ 200%。

PID控制器的精确计算可以在PIDRateEstimator类中的代码中找到。

Spark Streaming 的背压示例

正如您要求的一个例子,这是我在我的一个高效应用程序中所做的一个例子:

设置

  • Kafka主题有16个分区
  • Spark运行有16个工作核心,因此每个分区都可以并行使用
  • 使用 Spark Streaming(非结构化流)
  • 批次间隔为10秒
  • spark.streaming.backpressure.enabled设置为真
  • spark.streaming.kafka.maxRatePerPartition设置为 10000
  • spark.streaming.backpressure.pid.minRate保持默认值 100
  • 该作业每个分区每秒可以处理大约 5000 条消息
  • 在开始流作业之前,Kafka 主题在每个分区中包含数百万条消息

观察

  • 在第一批中,流作业获取 16000(= 10 秒 * 16 个分区 * 100 pid.minRate)消息。
  • 该作业处理这 16000 条消息的速度相当快,因此 PID 控制器会估计大于 maxRatePerPartition 10000 的最佳速率。
  • 因此,在第二批中,流作业获取 1600000(= 10 秒 * 16 个分区 * 10000 maxRatePerPartition)条消息。
  • 现在,第二批完成大约需要 22 秒
  • 因为我们的批处理间隔设置为 10 秒,所以 10 秒后,流处理作业已经再次调度了第三个微批次,数量为 1600000。原因是 PID 控制器只能使用已完成微批次的性能信息。
  • 仅在第六或第七个微批次中,PID 控制器才能找到每个分区每秒大约 5000 条消息的最佳处理速率。