Nim*_*007 19 java messaging scala apache-kafka kafka-consumer-api
想要使用高级消费者api实现延迟消费者
大意:
提交1个偏移量
while (it.hasNext()) {
val msg = it.next().message()
//checks timestamp in msg to see delay period exceeded
while (!delayedPeriodPassed(msg)) {
waitSomeTime() //Thread.sleep or something....
}
//certain that the msg was delayed and can now be handled
Try { process(msg) } //the msg process will never fail the consumer
consumer.commitOffsets //commit each msg
}
Run Code Online (Sandbox Code Playgroud)对此实施的一些担忧:
谢谢!
Emi*_*l H 18
解决此问题的一种方法是使用不同的主题,您可以推送所有要延迟的邮件.如果所有延迟的消息都应该在相同的时间延迟之后处理,那么这将非常简单:
while(it.hasNext()) {
val message = it.next().message()
if(shouldBeDelayed(message)) {
val delay = 24 hours
val delayTo = getCurrentTime() + delay
putMessageOnDelayedQueue(message, delay, delayTo)
}
else {
process(message)
}
consumer.commitOffset()
}
Run Code Online (Sandbox Code Playgroud)
现在将尽快处理所有常规消息,而那些需要延迟的消息将被放在另一个主题上.
好消息是我们知道延迟主题头部的消息是应该首先处理的消息,因为它的delayTo值将是最小的.因此,我们可以设置另一个读取头消息的消费者,检查时间戳是否在过去,如果是,则处理消息并提交偏移量.如果不是它不提交偏移量而是只是睡觉直到那个时间:
while(it.hasNext()) {
val delayedMessage = it.peek().message()
if(delayedMessage.delayTo < getCurrentTime()) {
val readMessage = it.next().message
process(readMessage.originalMessage)
consumer.commitOffset()
} else {
delayProcessingUntil(delayedMessage.delayTo)
}
}
Run Code Online (Sandbox Code Playgroud)
如果有不同的延迟时间,您可以在延迟时划分主题(例如24小时,12小时,6小时).如果延迟时间比动态更加动态则变得有点复杂.您可以通过引入两个延迟主题来解决它.读取延迟主题A
中的所有消息并处理其delayTo
值过去的所有消息.在其他人中,您只需找到最接近的那个delayTo
,然后将它们放在主题上B
.睡觉直到应该处理最接近的一个,并反过来完成所有操作,即从主题处理消息B
并将尚未被删除的一次放回主题A
.
回答您的具体问题(有些问题已在您的问题的评论中得到解决)
- 提交每个偏移可能会减慢ZK
您可以考虑切换到在Kafka中存储偏移量(0.8.2中可用的功能,offsets.storage
在消费者配置中检出属性)
- consumer.commitOffsets会抛出异常吗?如果是的话,我会两次使用相同的消息(可以用幂等消息解决)
我相信如果它不能与偏移存储器通信,例如.正如你所说,使用幂等消息解决了这个问题.
- 问题等待很长时间没有提交偏移量,例如延迟时间是24小时,将从迭代器获得下一个,睡眠24小时,进程和提交(ZK会话超时?)
除非消息本身的处理花费的时间超过会话超时,否则上述解决方案不会出现问题.
- 如果没有提交新的偏移量,ZK会话如何保持活跃状态?(设置一个配置单元zookeeper.session.timeout.ms可以解决死亡的消费者而不识别它)
再次使用上述内容,您不需要设置长会话超时.
- 我遗失的任何其他问题?
总有;)