Kafka - 使用高级消费者实现延迟队列

Nim*_*007 19 java messaging scala apache-kafka kafka-consumer-api

想要使用高级消费者api实现延迟消费者

大意:

  • 按密钥生成消息(每个消息包含创建时间戳)这可确保每个分区按生产时间排序消息.
  • auto.commit.enable = false(将在每个消息进程后显式提交)
  • 消费一条消息
  • 检查消息时间戳并检查是否已经过了足够的时间
  • 进程消息(此操作永远不会失败)
  • 提交1个偏移量

    while (it.hasNext()) {
      val msg = it.next().message()
      //checks timestamp in msg to see delay period exceeded
      while (!delayedPeriodPassed(msg)) { 
         waitSomeTime() //Thread.sleep or something....
      }
      //certain that the msg was delayed and can now be handled
      Try { process(msg) } //the msg process will never fail the consumer
      consumer.commitOffsets //commit each msg
    }
    
    Run Code Online (Sandbox Code Playgroud)

对此实施的一些担忧:

  1. 提交每个偏移可能会减慢ZK
  2. consumer.commitOffsets会抛出异常吗?如果是的话,我会两次使用相同的消息(可以用幂等消息解决)
  3. 问题等待很长时间没有提交偏移量,例如延迟时间是24小时,将从迭代器获得下一个,睡眠24小时,进程和提交(ZK会话超时?)
  4. 如果没有提交新的偏移量,ZK会话如何保持活跃状态​​?(设置一个配置单元zookeeper.session.timeout.ms可以解决死亡的消费者而不识别它)
  5. 我遗失的任何其他问题?

谢谢!

Emi*_*l H 18

解决此问题的一种方法是使用不同的主题,您可以推送所有要延迟的邮件.如果所有延迟的消息都应该在相同的时间延迟之后处理,那么这将非常简单:

while(it.hasNext()) {
    val message = it.next().message()

    if(shouldBeDelayed(message)) {
        val delay = 24 hours
        val delayTo = getCurrentTime() + delay
        putMessageOnDelayedQueue(message, delay, delayTo)
    }
    else {
       process(message)
    }

    consumer.commitOffset()
}
Run Code Online (Sandbox Code Playgroud)

现在将尽快处理所有常规消息,而那些需要延迟的消息将被放在另一个主题上.

好消息是我们知道延迟主题头部的消息是应该首先处理的消息,因为它的delayTo值将是最小的.因此,我们可以设置另一个读取头消息的消费者,检查时间戳是否在过去,如果是,则处理消息并提交偏移量.如果不是它不提交偏移量而是只是睡觉直到那个时间:

while(it.hasNext()) {
    val delayedMessage = it.peek().message()
    if(delayedMessage.delayTo < getCurrentTime()) {
        val readMessage = it.next().message
        process(readMessage.originalMessage)
        consumer.commitOffset()
    } else {
        delayProcessingUntil(delayedMessage.delayTo)
    }
}
Run Code Online (Sandbox Code Playgroud)

如果有不同的延迟时间,您可以在延迟时划分主题(例如24小时,12小时,6小时).如果延迟时间比动态更加动态则变得有点复杂.您可以通过引入两个延迟主题来解决它.读取延迟主题A中的所有消息并处理其delayTo值过去的所有消息.在其他人中,您只需找到最接近的那个delayTo,然后将它们放在主题上B.睡觉直到应该处理最接近的一个,并反过来完成所有操作,即从主题处理消息B并将尚未被删除的一次放回主题A.

回答您的具体问题(有些问题已在您的问题的评论中得到解决)

  1. 提交每个偏移可能会减慢ZK

您可以考虑切换到在Kafka中存储偏移量(0.8.2中可用的功能,offsets.storage在消费者配置中检出属性)

  1. consumer.commitOffsets会抛出异常吗?如果是的话,我会两次使用相同的消息(可以用幂等消息解决)

我相信如果它不能与偏移存储器通信,例如.正如你所说,使用幂等消息解决了这个问题.

  1. 问题等待很长时间没有提交偏移量,例如延迟时间是24小时,将从迭代器获得下一个,睡眠24小时,进程和提交(ZK会话超时?)

除非消息本身的处理花费的时间超过会话超时,否则上述解决方案不会出现问题.

  1. 如果没有提交新的偏移量,ZK会话如何保持活跃状态​​?(设置一个配置单元zookeeper.session.timeout.ms可以解决死亡的消费者而不识别它)

再次使用上述内容,您不需要设置长会话超时.

  1. 我遗失的任何其他问题?

总有;)

  • 将侦听器线程置于睡眠状态直到延迟过去并不是一个好主意。您可以快速耗尽所有侦听器线程。 (3认同)

Dhy*_*yan 6

使用 Tibco EMS 或其他 JMS 队列。他们内置了重试延迟。Kafka 可能不是您正在做的事情的正确设计选择