RocketMQ 开源版本任意时间延时队列实现
定时消息:Producer将消息发送到消息队列RocketMQ版服务端,但并不期望立马投递这条消息,而是推迟到在当前时间点之后的某一个时间投递到Consumer进行消费,该消息即定时消息。
延时消息:Producer将消息发送到消息队列RocketMQ版服务端,但并不期望立马投递这条消息,而是延迟一定时间后才投递到Consumer进行消费,该消息即延时消息。
定时消息与延时消息在代码配置上存在一些差异,但是最终达到的效果相同:消息在发送到消息队列RocketMQ版服务端后并不会立马投递,而是根据消息中的属性延迟固定时间后才投递给消费者。
定时消息和延时消息适用于以下一些场景:
消息生产和消费有时间窗口要求,例如在电商交易中超时未支付关闭订单的场景,在订单创建时会发送一条延时消息。
这条消息将会在30分钟以后投递给消费者,消费者收到此消息后需要判断对应的订单是否已完成支付。
如支付未完成,则关闭订单。如已完成支付则忽略。
通过消息触发一些定时任务,例如在某一固定时间点向用户发送提醒消息。
使用方式 定时消息和延时消息的使用在代码编写上存在略微的区别:
发送定时消息需要明确指定消息发送时间点之后的某一时间点作为消息投递的时间点。
发送延时消息时需要设定一个延时时间长度,消息将从当前发送时间点开始延迟固定时间之后才开始投递。
定时消息的精度会有1s~2s的延迟误差。
定时和延时消息的msg.setStartDeliverTime参数需要设置成当前时间戳之后的某个时刻(单位毫秒)。
如果被设置成当前时间戳之前的某个时刻,消息将立刻投递给消费者。
定时和延时消息的msg.setStartDeliverTime参数可设置40天内的任何时刻(单位毫秒),超过40天消息发送将失败。
StartDeliverTime是服务端开始向消费端投递的时间。如果消费者当前有消息堆积,那么定时和延时消息会排在堆积消息后面,将不能严格按照配置的时间进行投递。
由于客户端和服务端可能存在时间差,消息的实际投递时间与客户端设置的投递时间之间可能存在偏差。
推荐使用阿里云提供的rocketmq版本的pom
<dependency> <groupId>com.aliyun.openservicesgroupId> <artifactId>ons-clientartifactId> <version>1.8.4.Finalversion> dependency>
import com.aliyun.openservices.ons.api.*; import com.aliyun.openservices.shade.org.apache.commons.lang3.time.DateFormatUtils; import java.util.Date; import java.util.Properties; public class ProducerDelayTest { public static void main(String[] args) { Properties properties = new Properties(); // AccessKey ID阿里云身份验证,在阿里云RAM控制台创建。 properties.put(PropertyKeyConst.AccessKey, "XXX"); // AccessKey Secret阿里云身份验证,在阿里云RAM控制台创建。 properties.put(PropertyKeyConst.SecretKey, "XXX"); // 设置TCP接入域名,进入消息队列RocketMQ版控制台实例详情页面的接入点区域查看。 properties.put(PropertyKeyConst.NAMESRV_ADDR, "localhost:9876"); Producer producer = ONSFactory.createProducer(properties); // 在发送消息前,必须调用start方法来启动Producer,只需调用一次即可。 producer.start(); { Message msg = new Message( // 您在消息队列RocketMQ版控制台创建的Topic。 "TopicTest", // Message Tag,可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在消息队列RocketMQ版服务器过滤。 "TagA", // Message Body可以是任何二进制形式的数据,消息队列RocketMQ版不做任何干预,需要Producer与Consumer协商好一致的序列化和反序列化方式。 "演示15秒钟>>> ".getBytes()); // 设置代表消息的业务关键属性,请尽可能全局唯一。 // 以方便您在无法正常收到消息情况下,可通过控制台查询消息并补发。 // 注意:不设置也不会影响消息正常收发。 msg.setKey("ORDERID_100e"); try { // 延时消息,单位毫秒(ms),在指定延迟时间(当前时间之后)进行投递,例如消息在15秒后投递。 long delayTime = System.currentTimeMillis() + 15000; System.out.println("发送时间>>" + DateFormatUtils.format(new Date(), "yyyy-MM-dd HH����ss")); // 设置消息需要被投递的时间。 msg.setStartDeliverTime(delayTime); SendResult sendResult = producer.send(msg); // 同步发送消息,只要不抛异常就是成功。 if (sendResult != null) { System.out.println(DateFormatUtils.format(new Date(), "yyyy-MM-dd HH����ss") + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId()); } } catch (Exception e) { // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。 System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic()); e.printStackTrace(); } } // 在应用退出前,销毁Producer对象。 // 注意:如果不销毁也没有问题。 producer.shutdown(); } }
问题内容: 我阅读了今年的UberConf的幻灯片,其中一位发言者认为Spring JMS给您的消息队列系统增加了性能开销,但是在幻灯片中我看不到任何证据支持这一点。演讲者还提出了点对点比传统“发布- 订阅”方法更快的情况,因为每个消息仅发送一次,而不是广播给每个消费者。 我想知道是否有经验的Java消息传递专家可以在这里介绍一些技术问题: 使用Spring JMS而不是单纯的JMS实际上会产生性
使用启用会话(消息排序)的Azure ServiceBus队列,我的会话需要持续几分钟到几个小时。 为此,我将QueueClient配置如下: 并按如下方式开始接收消息: 在几次(1到6次之间)成功(几乎是瞬时的)消息接收回调之后——无论是对于新会话还是现有会话,接收处理程序都会停止触发。使用,我可以看到位于servicebus队列上的消息。有趣的是,它们都有一个DeliveryCount=1。过
我是AWS的新手。我在这里试图理解SQS。我也看了一些培训,但我仍然不能得到一些答案那里的讨论论坛。我在这里重复我的问题。注意,我知道下面的几个问题有明显的答案,因此更多的是一种修辞。我的困惑源于这样一个事实,即我目前对这个主题的理解导致我对在明显已知的问题之后出现在我脑海中的后续问题给出了相互矛盾的答案,并且夺走了我认为我理解得很好的任何东西的信心。 如果我有一个名为MyQueue的标准队列,并
本文向大家介绍RabbitMQ 怎么实现延迟消息队列?相关面试题,主要包含被问及RabbitMQ 怎么实现延迟消息队列?时的应答技巧和注意事项,需要的朋友参考一下 延迟队列的实现有两种方式: 通过消息过期后进入死信交换器,再由交换器转发到延迟消费队列,实现延迟功能; 使用 RabbitMQ-delayed-message-exchange 插件实现延迟功能。
问题内容: 我有一个Java App和一个NodeJS App,它们都使用一个Azure Service Bus消息队列。 我看到与客户产生一些奇怪的影响,如下所示。 JAVA MESSAGE PRODUCER(每个Azure JMS教程使用QPID库): 输出:发送的邮件的JMSMessageID = ID:2414932965987073843 NODEJS消息消费者: 输出: 如果将其与通过
我试图实现一个AWS SQS队列,以最大限度地减少来自后端服务器的数据库交互,但我遇到了问题。 我有一个消费者进程,它从一个SQS队列中查找消息 为了测试功能,我为一个客户端实现了逻辑。它运行得很好。然而,当我添加了3个客户端时,它不能正常工作。我能够看到SQS队列被500条消息卡住了,后端作业正在正常工作,从队列中读取。 我需要增加后端作业的数量还是增加客户端SQS队列的数量?现在,所有客户端都
编辑:在我写的时候解决了这个问题:P--我喜欢这样的解决方案。我想无论如何我都要把它贴出来,也许别人也会有同样的问题,找到我的解决办法。不关心点数/因果报应等等。我只是把整个事情写了出来,所以我想我应该把它和解决方案贴出来。 我有一个SQS FIFO队列。它使用的是一纸空文队列。以下是它的配置方式: 我有一个单一的生产者微服务,我有10个ECS映像运行作为消费者。 由于业务原因,我们在接近消息在队
为什么已经拥有了共享内存时需要消息队列呢? 这将是多种原因,让我们将其分解为多个点来简化 - 据了解,一旦消息被一个进程接收到,它将不再可用于任何其他进程。 而在共享内存中,数据可供多个进程访问。 如果想使用小信息格式进行通信。 当多个进程同时进行通信时,共享内存数据需要同步保护。 使用共享内存的写入和读取频率很高,那么实现功能将会非常复杂。 在这种情况下不值得使用。 如果所有的进程不需要访问共享