Jar*_*red 5 streaming apache-kafka apache-spark google-cloud-dataproc
我目前正在 Dataproc 上运行 Spark 作业,并且在尝试重新加入组并从 kafka 主题读取数据时遇到错误。我已经做了一些挖掘,但不确定是什么问题。我已经auto.offset.reset设置,earliest所以它应该从最早的可用非提交偏移量中读取,最初我的火花日志看起来像这样:
19/04/29 16:30:30 INFO
org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer
clientId=consumer-1, groupId=demo-group] Resetting offset for
partition demo.topic-11 to offset 5553330.
19/04/29 16:30:30 INFO
org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer
clientId=consumer-1, groupId=demo-group] Resetting offset for
partition demo.topic-2 to offset 5555553.
19/04/29 16:30:30 INFO
org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer
clientId=consumer-1, groupId=demo-group] Resetting offset for
partition demo.topic-3 to offset 5555484.
19/04/29 16:30:30 INFO
org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer
clientId=consumer-1, groupId=demo-group] Resetting offset for
partition demo.topic-4 to offset 5555586.
19/04/29 16:30:30 INFO
org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer
clientId=consumer-1, groupId=demo-group] Resetting offset for
partition demo.topic-5 to offset 5555502.
19/04/29 16:30:30 INFO
org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer
clientId=consumer-1, groupId=demo-group] Resetting offset for
partition demo.topic-6 to offset 5555561.
19/04/29 16:30:30 INFO
org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer
clientId=consumer-1, groupId=demo-group] Resetting offset for
partition demo.topic-7 to offset 5555542.```
Run Code Online (Sandbox Code Playgroud)
但是接下来的一行我在尝试从服务器上不存在的偏移量中读取时出错(您可以看到分区的偏移量与上面列出的不同,所以我不知道为什么它会尝试读取表单该偏移量,这是下一行的错误:
org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets
out of range with no configured reset policy for partitions:
{demo.topic-11=4544296}
Run Code Online (Sandbox Code Playgroud)
关于为什么我的 spark 作业不断返回到这个偏移量 (4544296) 而不是它最初输出的那个 (5553330) 的任何想法?
它似乎自相矛盾 wa) 它所说的实际偏移量和它试图读取的偏移量和 b) 说没有配置的重置策略
这个答案迟了一年,但希望能帮助其他面临类似问题的人。
通常,当消费者尝试读取 Kafka 主题中不再存在的偏移量时,就会出现此行为。偏移量不再存在,通常是因为它已被 Kafka Cleaner 删除(例如,由于保留或压缩策略)。然而,消费者组仍然是 Kafka 已知的,并且 Kafka 保留了主题“demo.topic”及其所有分区的组“demo-group”的最新消费消息的信息。
因此,auto.offset.reset配置不会产生任何影响,因为不需要重置。相反,卡夫卡了解消费者组。
此外,Fetcheronly 还告诉您主题的每个分区内最新的可用偏移量。它并不自动意味着它实际上会轮询直到此偏移量的所有消息。Spark 决定每个分区实际消耗和处理多少消息(基于例如配置maxRatePerPartition)。
要解决此问题,您可以更改消费者组(在这种特殊情况下这可能不是您想要的),或者通过使用手动重置消费者组“演示组”的偏移量
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --group demo-group --topic demo.topic --partition 11 --to-latest
Run Code Online (Sandbox Code Playgroud)
根据您的要求,您可以使用该工具重置主题每个分区的偏移量。帮助功能或文档解释了所有可用选项。
| 归档时间: |
|
| 查看次数: |
830 次 |
| 最近记录: |