Dea*_*ler 2 apache-kafka clj-kafka
我们使用的是Kafka 0.8异步生产者,但是它正在丢弃消息(并且没有来自另一个线程的aysnc响应,或者我们可以继续使用异步).
我们设定batch.num.messages为500,我们的消费者没有变化.我读过batch.num.messages这只适用于异步生产者而不是同步所以我需要自己批处理.我们正在使用compression.codec=snappy和我们自己的序列化器类.
我的问题是双重的:
我可以假设我可以使用自己的序列化程序类然后自己发送消息吗?
我是否需要担心Kafka可能使用的任何特殊的快速选项/参数?
是的,这是因为只batch.num.messages控制异步生产者的行为.在相关的参数指南中明确说明了这一点:
使用异步模式时,一批中要发送的消息数.生产者将等待,直到准备好发送此数量的消息或达到queue.buffer.max.ms.
要为同步生产者进行批处理,您必须发送消息列表:
public void trySend(List<M> messages) {
List<KeyedMessage<String, M>> keyedMessages = Lists.newArrayListWithExpectedSize(messages.size());
for (M m : messages) {
keyedMessages.add(new KeyedMessage<String, M>(topic, m));
}
try {
producer.send(keyedMessages);
} catch (Exception ex) {
log.error(ex)
}
}
Run Code Online (Sandbox Code Playgroud)
请注意,我在kafka.javaapi.producer.Producer这里使用.
一旦send执行,就会发送批处理.
我可以假设我可以使用自己的序列化程序类然后自己发送消息吗?我是否需要担心Kafka可能使用的任何特殊的快速选项/参数?
压缩和序列化都是正交功能,不影响批处理,但实际应用于单个消息.
请注意,将会有api更改,并且将统一async/sync api.