NZk*_*yca 1 apache-kafka apache-spark apache-spark-sql pyspark spark-structured-streaming
我之前已经成功地将 pyspark 用于 Spark Streaming(Spark 2.0.2)和 Kafka(0.10.1.0),但我的目的更适合结构化流。我尝试在线使用示例:https : //spark.apache.org/docs/2.1.0/structured-streaming-kafka-integration.html
使用以下类似代码:
ds1 = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
query = ds1
.writeStream
.outputMode('append')
.format('console')
.start()
query.awaitTermination()
Run Code Online (Sandbox Code Playgroud)
但是,我总是以以下错误告终:
: org.apache.kafka.common.config.ConfigException:
Missing required configuration "partition.assignment.strategy" which has no default value
Run Code Online (Sandbox Code Playgroud)
我还尝试在创建 ds1 时将其添加到我的选项集中:
.option("partition.assignment.strategy", "range")
Run Code Online (Sandbox Code Playgroud)
但即使明确地为其分配一个值也不能阻止错误,我可以在网上或 Kafka 文档中找到的任何其他值(如“roundrobin”)也没有。
我也用“assign”选项尝试了这个并实现了同样的错误(我们的Kafka主机设置为assign——每个消费者只分配一个分区,我们没有任何重新平衡)。
知道这里发生了什么吗?该文档没有帮助(可能是因为它仍处于实验阶段)。另外,是否有使用 KafkaUtils 进行结构化流处理?或者这是唯一的网关?
Kafka 0.10.1.* 客户端存在一个已知问题,您不应该将它与 Spark 一起使用,因为它可能会由于https://issues.apache.org/jira/browse/KAFKA-4547产生错误的答案。您可以使用 0.10.0.1 客户端,它应该可以与 0.10.1.* Kafka 集群一起使用。
要在结构化流中向 Kafka 消费者客户端发送 Kafka 配置,您需要添加kafka.前缀,例如.option("kafka.partition.assignment.strategy", "range"). 但是,您不需要设置,kafka.partition.assignment.strategy因为它具有默认值。我的预感是您可能将 Kafka 0.8.* 和 0.10.* jars 放在类路径上并加载了错误的类。
您想使用 KafkaUtils 中的哪个 API 但在 Structured Streaming 中没有?Spark 2.2.0 刚刚发布,您可以在 Structured Streaming 中对 Kafka 使用批处理或流查询。阅读http://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html以获取示例。
| 归档时间: |
|
| 查看次数: |
2664 次 |
| 最近记录: |