Met*_*ata 5 apache-kafka apache-spark spark-streaming pyspark spark-structured-streaming
在读取如下的配置单元表后,我尝试将数据写入 Kafka 主题。
\nwrite_kafka_data.py:\nread_df = spark.sql("select * from db.table where some_column in (\'ASIA\', \'Europe\')")\nfinal_df = read_df.select(F.to_json(F.struct(F.col("*"))).alias("value"))\n\nfinal_df.write.format("kafka")\\\n .option("kafka.bootstrap.servers", kafka_broker)\\\n .option("kafka.batch.size", 51200)\\\n .option("retries", 3)\\\n .option("kafka.max.request.size", 500000)\\\n .option("kafka.max.block.ms", 120000)\\\n .option("kafka.metadata.max.age.ms", 120000)\\\n .option("kafka.request.timeout.ms", 120000)\\\n .option("kafka.linger.ms", 0)\\\n .option("kafka.delivery.timeout.ms", 130000)\\\n .option("acks", "1")\\\n .option("kafka.compression.type", "snappy")\\\n .option("kafka.security.protocol", "SASL_SSL")\\\n .option("kafka.sasl.jaas.config", oauth_config)\\\n .option("kafka.sasl.login.callback.handler.class", "io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler")\\\n .option("kafka.sasl.mechanism", "OAUTHBEARER")\\\n .option("topic", \'topic_name\')\\\n .save()\nRun Code Online (Sandbox Code Playgroud)\n成功写入后(记录数为29000),我正在另一个文件中读取来自同一主题的数据,如下所示:\nread_kafka_data.py:
\n # SCHEMA\n schema = StructType([StructField("col1", StringType()),\n StructField("col2", IntegerType())\n ])\n\n # READ FROM TOPIC\n jass_config = "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required" \\\n + " oauth.token.endpoint.uri=" + \'"\' + "uri" + \'"\' \\\n + " oauth.client.id=" + \'"\' + "client_id" + \'"\' \\\n + " oauth.client.secret=" + \'"\' + "secret_key" + \'" ;\'\n\n stream_df = spark.readStream \\\n .format(\'kafka\') \\\n .option(\'kafka.bootstrap.servers\', kafka_broker) \\\n .option(\'subscribe\', \'topic_name\') \\\n .option(\'kafka.security.protocol\', \'SASL_SSL\') \\\n .option(\'kafka.sasl.mechanism\', \'OAUTHBEARER\') \\\n .option(\'kafka.sasl.jaas.config\', jass_config) \\\n .option(\'kafka.sasl.login.callback.handler.class\', "io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler") \\\n .option(\'startingOffsets\', \'latest\') \\\n .option(\'group.id\', \'group_id\') \\\n .option(\'maxOffsetsPerTrigger\', 200) \\\n .option(\'fetchOffset.retryIntervalMs\', 200) \\\n .option(\'fetchOffset.numRetries\', 3) \\\n .load()\\\n .select(from_json(col(\'value\').cast(\'string\'), schema).alias("json_dta")).selectExpr(\'json_dta.*\')\n\n stream_df.writeStream.outputMode(\'append\')\n .format(HiveWarehouseSession.STREAM_TO_STREAM)\n .option("database", "database_name")\n .option("table", "table_name")\n .option("metastoreUri", spark.conf.get("spark.datasource.hive.warehouse.metastoreUri"))\n .option("checkpointLocation", "/path/to/checkpoint/dir")\n .start().awaitTermination()\nRun Code Online (Sandbox Code Playgroud)\n我是 Kafka 的初学者,一直在阅读 Kafka 性能优化技术并遇到了这两个。
\n\n\n\n
spark.streaming.backpressure.enabled和spark.streaming.kafka.maxRatePerPartition
要启用第一个参数:
\nsparkConf.set("spark.streaming.backpressure.enabled",\xe2\x80\x9dtrue\xe2\x80\x9d)\nRun Code Online (Sandbox Code Playgroud)\n官方文档中对上述参数的解释为:
\n\n\n启用或禁用 Spark Streaming 的内部反压机制(自 1.5 起)。这使得 Spark Streaming 能够根据当前批量调度延迟和处理时间来控制\n接收速率,以便系统仅按照系统能够处理的速度进行接收。\n在内部,这会动态设置\n接收器的最大接收速率。该速率的上限由值\n
\nspark.streaming.receiver.maxRate和\n决定spark.streaming.kafka.maxRatePerPartition
现在我是第一次运行应用程序并且没有以前的微批次,我应该指定一些值:spark.streaming.backpressure.initialRate
如果是的话,应该如何确定 的值spark.streaming.backpressure.initialRate。\n文档中还说,如果spark.streaming.backpressure.enabled设置为true最大接收速率,是动态设置的。\n如果是这样,还需要配置吗:\nspark.streaming.receiver.maxRate并且spark.streaming.kafka.maxRatePerPartition\nifspark.streaming.backpressure.enabled设置到true?
这个链接spark.streaming.backpressure.initialRate说施加背压时不会影响使用。
任何有助于消除混乱的帮助将不胜感激。
\n您所指的配置spark.streaming.[...]属于Direct Streaming(又名 Spark Streaming),而不属于Structured Streaming。
如果您不知道其中的区别,我建议您查看单独的编程指南:
结构化流不提供背压机制。当您从 Kafka 消费时,您可以使用(正如您已经在做的那样)该选项maxOffsetsPerTrigger来设置每个触发器上读取消息的限制。此选项在《结构化流媒体和 Kafka 集成指南》中记录为:
“每个触发间隔处理的最大偏移量的速率限制。指定的总偏移量将按比例分配到不同卷的 topicPartitions 上。”
如果您仍然对标题问题感兴趣
与 Kafka 的 Spark Streaming有何
spark.streaming.kafka.maxRatePerPartition关系?spark.streaming.backpressure.enabled
Spark 的配置文档中解释了这种关系:
“启用或禁用Spark Streaming的内部反压机制(从1.5开始)。这使得Spark Streaming可以根据当前批量调度延迟和处理时间来控制接收速率,以便系统接收的速度与系统可以处理的速度一样快。在内部,这会动态设置接收器的最大接收速率。该速率的上限取决于这些值
spark.streaming.receiver.maxRate以及spark.streaming.kafka.maxRatePerPartition它们是否已设置(见下文)。 ”
Spark Streaming(DStream,而不是结构化流)中可用的反压机制的所有详细信息都在您已链接的启用反压以使您的 Spark Streaming 应用程序生产做好准备的博客中进行了解释。
通常,如果启用背压,您将设置spark.streaming.kafka.maxRatePerPartition为最佳估计速率的 150% ~ 200%。
PID控制器的精确计算可以在PIDRateEstimator类中的代码中找到。
正如您要求的一个例子,这是我在我的一个高效应用程序中所做的一个例子:
spark.streaming.backpressure.enabled设置为真spark.streaming.kafka.maxRatePerPartition设置为 10000spark.streaming.backpressure.pid.minRate保持默认值 100| 归档时间: |
|
| 查看次数: |
1896 次 |
| 最近记录: |