高级消费者API似乎一次只读取一条消息.
如果消费者想要处理这些消息并将其提交给Solr或Elastic-Search等其他下游消费者,因为他们更喜欢批量发送消息而不是一次发送消息,这对消费者来说可能是个问题.
将这些消息在内存中批处理也不是一件容易的事,因为Kafka中的偏移量也只需要在批处理已经提交时同步,否则崩溃的kafka-consumer与未提交的下游消息(如在Solr或ES中)将具有其偏移量已经更新,因此消息松散.
如果在向下游提交消息之后但在更新消息偏移之前崩溃,消费者可能会多次使用消息.
如果Kafka批量使用消息,那么一些指向代码/文档的指针将非常受欢迎.
谢谢!
我不知道批处理消费者。但即使有一个,您的主要问题仍然存在。您希望在成功转发数据后提交偏移量。实现此目的的一种方法是通过设置属性来关闭使用者的自动提交auto.commit.enable = false。权衡当然是您必须注意何时提交偏移量。
在此处找到消费者属性的完整文档:https : //kafka.apache.org/documentation.html#consumerconfigs
关于如何手动提交从 java-doc ( https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html )窃取的偏移量的一个很好的例子:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "false");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
buffer.add(record);
}
if (buffer.size() >= minBatchSize) {
insertIntoDb(buffer);
consumer.commitSync();
buffer.clear();
}
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
5814 次 |
| 最近记录: |