当前位置: 首页 > 面试题库 >

Kafka-使用高级使用者的延迟队列实施

盖昀
2023-03-14
问题内容

想要使用高级消费者API实现延迟的消费者

大意:

  • 按键生成消息(每个消息包含创建时间戳记),以确保每个分区按生成时间对消息进行排序。
  • auto.commit.enable = false(将在每个消息处理之后显式提交)
  • 消费一条消息
  • 检查消息时间戳,并检查是否经过了足够的时间
  • 处理消息(此操作将永不失败)
  • 提交1个偏移
        while (it.hasNext()) {
      val msg = it.next().message()
      //checks timestamp in msg to see delay period exceeded
      html" target="_blank">while (!delayedPeriodPassed(msg)) { 
         waitSomeTime() //Thread.sleep or something....
      }
      //certain that the msg was delayed and can now be handled
      Try { process(msg) } //the msg process will never fail the consumer
      consumer.commitOffsets //commit each msg
    }

有关此实现的一些担忧:

  1. 提交每个偏移量可能会使ZK变慢
  2. Consumer.commitOffsets是否可以引发异常?如果是,我将使用同一条消息两次(可以解决幂等消息)
  3. 等待较长时间而不提交偏移量的问题,例如延迟时间为24小时,将从迭代器中获取下一个,睡眠24小时,进行处理并提交(ZK会话超时?)
  4. ZK会话如何在不提交新偏移的情况下保持活动?(设置一个配置单元zookeeper.session.timeout.ms可以解决死掉的消费者而又不认识它)
  5. 我还有其他问题吗?

谢谢!


问题答案:

解决此问题的一种方法是使用不同的主题,在其中推送所有将要延迟的消息。如果所有延迟的消息都应在相同的时间延迟之后进行处理,这将很简单:

while(it.hasNext()) {
    val message = it.next().message()

    if(shouldBeDelayed(message)) {
        val delay = 24 hours
        val delayTo = getCurrentTime() + delay
        putMessageOnDelayedQueue(message, delay, delayTo)
    }
    else {
       process(message)
    }

    consumer.commitOffset()
}

现在将尽快处理所有常规消息,而需要延迟的消息将放在另一个主题上。

令人高兴的是,我们知道延迟主题开头的消息是应该首先处理的消息,因为其delayTo值将是最小的。因此,我们可以设置另一个读取头消息的使用者,检查时间戳是否在过去,如果是,则处理该消息并提交偏移量。如果不是,它不会提交偏移量,而是一直休眠直到那个时间:

while(it.hasNext()) {
    val delayedMessage = it.peek().message()
    if(delayedMessage.delayTo < getCurrentTime()) {
        val readMessage = it.next().message
        process(readMessage.originalMessage)
        consumer.commitOffset()
    } else {
        delayProcessingUntil(delayedMessage.delayTo)
    }
}

如果存在不同的延迟时间,则可以按延迟划分主题(例如24小时,12小时,6小时)。如果延迟时间比该时间更动态,那么它将变得更加复杂。您可以通过引入两个延迟主题来解决。从延迟主题中读取所有消息,A并处理所有delayTo值都为过去的消息。除其他外,您只需找到最接近的一个,delayTo然后将它们放在主题上即可B。休眠直到应该处理最接近的消息为止,然后相反地进行所有处理,即处理来自主题的消息,B然后将尚未处理的消息放回主题A

回答您的特定问题(在您的问题的注释中已经解决了一些问题)

  1. 提交每个偏移量可能会使ZK变慢

您可以考虑切换到在Kafka中存储偏移量(自0.8.2起可用的功能,请offsets.storage在消费者配置中检出属性)

  1. Consumer.commitOffsets是否可以引发异常?如果是,我将使用同一条消息两次(可以解决幂等消息)

我认为,例如,如果它不能与偏移存储进行通信,则可以。正如您所说,使用幂等消息可以解决此问题。

  1. 等待较长时间而不提交偏移量的问题,例如延迟时间为24小时,将从迭代器中获取下一个,睡眠24小时,进行处理并提交(ZK会话超时?)

除非消息本身的处理花费的时间超过会话超时,否则上述解决方案不会有问题。

4.
ZK会话如何在不提交新偏移的情况下保持活动?(设置一个配置单元zookeeper.session.timeout.ms可以解决死掉的消费者而又不认识它)

同样,使用上述方法,您无需设置长时间的会话超时。

  1. 我还有其他问题吗?

总是有;)



 类似资料:
  • 本文向大家介绍kafka如何实现延迟队列?相关面试题,主要包含被问及kafka如何实现延迟队列?时的应答技巧和注意事项,需要的朋友参考一下 Kafka并没有使用JDK自带的Timer或者DelayQueue来实现延迟的功能,而是基于时间轮自定义了一个用于实现延迟功能的定时器(SystemTimer)。JDK的Timer和DelayQueue插入和删除操作的平均时间复杂度为O(nlog(n)),并不

  • 我不熟悉ApacheStorm和kafka,作为POC的一部分,我正在尝试使用kafka和ApacheStorm处理消息流。我使用的是暴风Kafka的来源https://github.com/apache/storm/tree/master/external/storm-kafka,我能够创建一个示例程序,该程序使用KafkaSpout读取来自kafka主题的消息,并将其输出到另一个kafka主题

  • 我们最近遇到了由RabbitMQ支持的应用程序的意外行为。RabbitMQ版本为3.6.12,我们使用的是.NET客户端5.0.1 应用程序订阅了两个队列,一个用于命令,另一个用于事件--我们还使用手动确认。我们的应用程序配置为有7个消费者。每个通道都有自己的通道(IModel),每个通道都有自己的EventingBasicConsumer,当EventingBasicConsumer.Recei

  • AWS SQS FIFO队列的批处理设置为1,延迟为1秒。收到的每个项目都与一个MessageGroup相关联。 队列同时接收10个不同消息组的30条消息,每个消息组包含3条消息。。。 一秒钟的延迟是否适用于队列级别,即30条消息需要30秒的传递时间? 还是队列会启动10个消费者,每个消息组一个,在3秒内清空队列?

  • 我在一个公认的缓慢配置中设置了Kafka——但我不期待我看到的数字。 我将集群设置为<code>LogAppendTime</code>,因此我正在测量事件写入Kafka(由代理决定)与服务接收到事件之间的时间。代理和应用程序都位于“同一位置”,因此服务器之间的ping时间很短,时钟应该同步或接近。 我看到延迟在 到 600ms 之间,很多是 ......巨大的差异让我觉得我的设置出了问题。它因消

  • 本文向大家介绍php使用redis的有序集合zset实现延迟队列应用示例,包括了php使用redis的有序集合zset实现延迟队列应用示例的使用技巧和注意事项,需要的朋友参考一下 本文实例讲述了php使用redis的有序集合zset实现延迟队列。分享给大家供大家参考,具体如下: 延迟队列就是个带延迟功能的消息队列,相对于普通队列,它可以在指定时间消费掉消息。 延迟队列的应用场景: 1、新用户注册,