当前位置: 首页 > 面试题库 >

kafka如何实现延迟队列?

汪安然
2023-03-14
本文向大家介绍kafka如何实现延迟队列?相关面试题,主要包含被问及kafka如何实现延迟队列?时的应答技巧和注意事项,需要的朋友参考一下

Kafka并没有使用JDK自带的Timer或者DelayQueue来实现延迟的功能,而是基于时间轮自定义了一个用于实现延迟功能的定时器(SystemTimer)。JDK的Timer和DelayQueue插入和删除操作的平均时间复杂度为O(nlog(n)),并不能满足Kafka的高性能要求,而基于时间轮可以将插入和删除操作的时间复杂度都降为O(1)。时间轮的应用并非Kafka独有,其应用场景还有很多,在Netty、Akka、Quartz、Zookeeper等组件中都存在时间轮的踪影。

底层使用数组实现,数组中的每个元素可以存放一个TimerTaskList对象。TimerTaskList是一个环形双向链表,在其中的链表项TimerTaskEntry中封装了真正的定时任务TimerTask.

Kafka中到底是怎么推进时间的呢?Kafka中的定时器借助了JDK中的DelayQueue来协助推进时间轮。具体做法是对于每个使用到的TimerTaskList都会加入到DelayQueue中。Kafka中的TimingWheel专门用来执行插入和删除TimerTaskEntry的操作,而DelayQueue专门负责时间推进的任务。再试想一下,DelayQueue中的第一个超时任务列表的expiration为200ms,第二个超时任务为840ms,这里获取DelayQueue的队头只需要O(1)的时间复杂度。如果采用每秒定时推进,那么获取到第一个超时的任务列表时执行的200次推进中有199次属于“空推进”,而获取到第二个超时任务时有需要执行639次“空推进”,这样会无故空耗机器的性能资源,这里采用DelayQueue来辅助以少量空间换时间,从而做到了“精准推进”。Kafka中的定时器真可谓是“知人善用”,用TimingWheel做最擅长的任务添加和删除操作,而用DelayQueue做最擅长的时间推进工作,相辅相成。

参考:https://blog.csdn.net/u013256816/article/details/80697456

 类似资料:
  • 本文向大家介绍RabbitMQ 怎么实现延迟消息队列?相关面试题,主要包含被问及RabbitMQ 怎么实现延迟消息队列?时的应答技巧和注意事项,需要的朋友参考一下 延迟队列的实现有两种方式: 通过消息过期后进入死信交换器,再由交换器转发到延迟消费队列,实现延迟功能; 使用 RabbitMQ-delayed-message-exchange 插件实现延迟功能。

  • 问题内容: 想要使用高级消费者API实现延迟的消费者 大意: 按键生成消息(每个消息包含创建时间戳记),以确保每个分区按生成时间对消息进行排序。 auto.commit.enable = false(将在每个消息处理之后显式提交) 消费一条消息 检查消息时间戳,并检查是否经过了足够的时间 处理消息(此操作将永不失败) 提交1个偏移 有关此实现的一些担忧: 提交每个偏移量可能会使ZK变慢 Consu

  • 问题内容: 我有一个风暴拓扑来处理来自Kafka的消息,并根据手头的任务在Cassandra中进行HTTP调用/保存。我会尽快处理这些消息。由于来自外部源(例如HTTP)的响应,很少有消息没有得到完全处理。如果HTTP服务器在一段时间后不响应/返回错误消息以重试,我想为重试实现指数补偿机制。我想不出什么主意就能实现它们。我想知道如果还有其他可以容错的解决方案,那么其中哪一个将是更好的解决方案。由于

  • 我需要一个队列来处理延迟x小时后的消息。并且我需要一个数据驱动的、所有基于事件的方法,不使用任何调度器等等。 场景是,我将一些实时数据发送到SNS主题,然后从那里发送到不同的SQS队列,由不同的AWS Lambda函数使用。 其中一个Lambda函数需要处理延迟3小时后的消息。不过,送货最长延迟15分钟。如果我第一次读取消息,它将自动从SQS中删除,因为我正在使用事件源映射触发器来调用lambda

  • 问题内容: 我正在尝试用Java做某事,而我需要一些东西在while循环中等待/延迟几秒钟。 我想构建一个步进音序器,并且对Java还是陌生的。有什么建议么? 问题答案: If you want to pause then use java.util.concurrent.TimeUnit: TimeUnit.SECONDS.sleep(1); TimeUnit.MINUTES.sleep(1);