如果我使用Kafka Async生成器,假设缓冲区中有X个消息.当它们在客户端上实际处理时,如果代理或特定分区在某段时间内关闭,kafka客户端将重试,如果消息失败,它是否会将特定消息标记为失败并转到下一条消息(这可能导致乱序消息)?或者,为了保留订单,是否会使批处理中的剩余消息失败?
我接下来要保持订购,所以理想情况下希望kafka能够从失败的地方使批处理失败,所以我可以从失败点重试,我将如何实现?
从这里:http://kafka.apache.org/documentation.html#compaction,很明显,如果我启用日志压缩,则无法保证在日志的"头部"中保留什么.所以,考虑到我有消费者 - 例如:审核消费者,我怎样才能确保我将至少保留所有消息(比如说)7天,只有比这更早的消息应该有资格进行压缩?
没有公开的配置wrt log.compaction允许我提供这样的保证.这可能吗?
我期待这段代码进入无限循环,发送和接收消息。但它似乎既不发送也不接收。为什么?
go func() {
for {
select {
case ch1 <- 1:
println("sent 1 ")
case c := <-ch1:
println(" received ", c)
}
time.Sleep(time.Second)
}
}()
Run Code Online (Sandbox Code Playgroud)