我想将所有分区的偏移量重置为特定值......我看到 kafka-consumer-groups.sh 提供了 --from -file 将偏移量重置为 CSV 文件中定义的值的选项
任何人都可以分享这个 csv 文件的内容/格式和它的示例命令吗?
例如:
./kafka_2.12-2.1.1/bin/kafka-consumer-groups.sh --bootstrap-server ${KAFKA_BROKER} --group ${GROUP_NAME} --topic ${TOPIC} --reset-offsets --from-file offsets.csv --execute
offsets.csv 的内容/格式是什么?
如何在 Flink 中跳过损坏的消息
我有 DAG:KafkaSrcConsumer > FlatMap > Window > SinkFunction
现在,如果我在操作员“KafkaSrcConsumer”中收到来自 Kafka 的corruptedMessage,我想抛出/跳过该消息,并且我不想将该损坏的消息转发给下一个操作员“FlatMap”
我们如何在 Apache Flink 中实现这一点?
(注意:从 KafkaSrcConsumer 抛出异常将重新启动 flink 作业,我想避免这种情况,因为我只想跳过消息并移至下一条消息)