mic*_*cgn 1 java stream offset consumer apache-kafka
我正在使用Kafka流,并希望将Java的一些使用者偏移量重置为开始。
KafkaConsumer.seekToBeginning(...)听起来像是正确的事情,但是我使用Kafka Streams:
KafkaStreams streams = new KafkaStreams(builder, props);
...
streams.start();
Run Code Online (Sandbox Code Playgroud)
我想根据我定义的具体流管道,这将在引擎盖下创建多个消费者。我可以访问那些吗?还是有其他方法以编程方式重置偏移量?
基于Hans Jespersens的答案,我成功地使用以下代码来完成脚本在Java代码中的工作:
import kafka.tools.StreamsResetter;
StreamsResetter resetter = new StreamsResetter();
String[] args = {"--application-id", APP_ID, "--bootstrap-servers", KAFKA_SERVERS, "--input-topics", TEST_TOPIC_NAME, "--zookeeper", ZOOKEEPER};
resetter.run(args);
Run Code Online (Sandbox Code Playgroud)
该类是我在maven中使用以下命令导入的kafka核心库的一部分:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>${kafka.version}</version>
</dependency>
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1185 次 |
| 最近记录: |