sil*_*ent 6 azure apache-kafka apache-spark pyspark databricks
我正在尝试将一个包含大约 2.3 亿条记录的数据框写入 Kafka。更具体地说,是启用 Kafka 的 Azure Event Hub,但我不确定这是否真的是我的问题的根源。
EH_SASL = 'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://myeventhub.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=****";'
dfKafka \
.write \
.format("kafka") \
.option("kafka.sasl.mechanism", "PLAIN") \
.option("kafka.security.protocol", "SASL_SSL") \
.option("kafka.sasl.jaas.config", EH_SASL) \
.option("kafka.bootstrap.servers", "myeventhub.servicebus.windows.net:9093") \
.option("topic", "mytopic") \
.option("checkpointLocation", "/mnt/telemetry/cp.txt") \
.save()
Run Code Online (Sandbox Code Playgroud)
启动正常,并成功(而且速度相当快)将大约 3-4 百万条记录写入队列。但几分钟后工作就会停止,并显示如下消息:
org.apache.spark.SparkException:由于阶段失败而中止作业:阶段 7.0 中的任务 6 失败 4 次,最近一次失败:阶段 7.0 中丢失任务 6.3(TID 248、10.139.64.5、执行器 1):kafkashaded.org。 apache.kafka.common.errors.TimeoutException:mytopic-18 的 61 条记录即将到期:自上次追加以来已过去 32839 毫秒
或者
org.apache.spark.SparkException:作业因阶段失败而中止:阶段 8.0 中的任务 13 失败 4 次,最近一次失败:阶段 8.0 中丢失任务 13.3(TID 348、10.139.64.5、执行器 1):kafkashaded.org。 apache.kafka.common.errors.TimeoutException:请求超时。
另外,我从未看到检查点文件被创建/写入。我也尝试过.option("kafka.delivery.timeout.ms", 30000)不同的价值观,但这似乎没有任何效果。
我在 Azure Databricks 集群版本 5.0 中运行它(包括 Apache Spark 2.4.0、Scala 2.11)
我在事件中心上没有看到任何类似限制的错误,所以应该没问题。
终于弄清楚了(大部分):
事实证明,大约 16000 条消息的默认批量大小对于端点来说太大了。在我将 batch.size 参数设置为 5000 后,它开始工作,并且以每分钟约 700k 条消息写入事件中心。另外,上面的超时参数是错误的并且被忽略了。这是kafka.request.timeout.ms
唯一的问题是,它仍然会随机地超时运行,并且显然会再次从头开始,因此我最终会得到重复的结果。将为此提出另一个问题。
dfKafka \
.write \
.format("kafka") \
.option("kafka.sasl.mechanism", "PLAIN") \
.option("kafka.security.protocol", "SASL_SSL") \
.option("kafka.sasl.jaas.config", EH_SASL) \
.option("kafka.batch.size", 5000) \
.option("kafka.bootstrap.servers", "myeventhub.servicebus.windows.net:9093") \
.option("kafka.request.timeout.ms", 120000) \
.option("topic", "raw") \
.option("checkpointLocation", "/mnt/telemetry/cp.txt") \
.save()
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
2163 次 |
| 最近记录: |