小编mzl*_*zlo的帖子

如何在 Kafka Consumer Group 中将偏移量重置为任意值?

我想将所有分区的偏移量重置为特定值......我看到 kafka-consumer-groups.sh 提供了 --from -file 将偏移量重置为 CSV 文件中定义的值的选项

任何人都可以分享这个 csv 文件的内容/格式和它的示例命令吗?

例如: ./kafka_2.12-2.1.1/bin/kafka-consumer-groups.sh --bootstrap-server ${KAFKA_BROKER} --group ${GROUP_NAME} --topic ${TOPIC} --reset-offsets --from-file offsets.csv --execute

offsets.csv 的内容/格式是什么?

apache-kafka

8
推荐指数
2
解决办法
2359
查看次数

如何在 Flink 中跳过损坏的消息?

如何在 Flink 中跳过损坏的消息

我有 DAG:KafkaSrcConsumer > FlatMap > Window > SinkFunction

现在,如果我在操作员“KafkaSrcConsumer”中收到来自 Kafka 的corruptedMessage,我想抛出/跳过该消息,并且我不想将该损坏的消息转发给下一个操作员“FlatMap”

我们如何在 Apache Flink 中实现这一点?

(注意:从 KafkaSrcConsumer 抛出异常将重新启动 flink 作业,我想避免这种情况,因为我只想跳过消息并移至下一条消息)

apache-flink

0
推荐指数
1
解决办法
448
查看次数

标签 统计

apache-flink ×1

apache-kafka ×1