当前位置: 首页 > 知识库问答 >
问题:

使用SpringJMS设置每个消息的过期时间

吕自怡
2023-03-14

我通过SpringJMS在我的项目中使用MQ,作为代理我使用ActiveMQ。我需要基于消息设置过期,所以我尝试使用message.setjmsExpersion,但没有成功。所有到达ActiveMQ的消息都具有expiration=0。

有人成功地使用Spring为每个消息设置过期吗?

        long expiration = 0L;
        if (!producer.getDisableMessageTimestamp()) {
            long timeStamp = System.currentTimeMillis();
            message.setJMSTimestamp(timeStamp);
            if (timeToLive > 0) {
                expiration = timeToLive + timeStamp;
            }
        }
        message.setJMSExpiration(expiration);  
        //me: timeToLive coming from default values of Producer/JmsTemplate...

共有1个答案

魏楷
2023-03-14

我不知道为什么Spring决定排除这一点,但是您可以扩展JmsTemplate并重载一些方法,传递一个timeToLive参数。

public class MyJmsTemplate extends JmsTemplate {

    public void send(final Destination destination,
            final MessageCreator messageCreator, final long timeToLive)
            throws JmsException {
        execute(new SessionCallback<Object>() {
            public Object doInJms(Session session) throws JMSException {
                doSend(session, destination, messageCreator, timeToLive);
                return null;
            }
        }, false);
    }

    protected void doSend(Session session, Destination destination,
            MessageCreator messageCreator, long timeToLive) throws JMSException {

        Assert.notNull(messageCreator, "MessageCreator must not be null");
        MessageProducer producer = createProducer(session, destination);
        try {
            Message message = messageCreator.createMessage(session);
            if (logger.isDebugEnabled()) {
                logger.debug("Sending created message: " + message);
            }
            doSend(producer, message, timeToLive);
            // Check commit - avoid commit call within a JTA transaction.
            if (session.getTransacted() && isSessionLocallyTransacted(session)) {
                // Transacted session created by this template -> commit.
                JmsUtils.commitIfNecessary(session);
            }
        } finally {
            JmsUtils.closeMessageProducer(producer);
        }
    }

    protected void doSend(MessageProducer producer, Message message,
            long timeToLive) throws JMSException {
        if (isExplicitQosEnabled() && timeToLive > 0) {
            producer.send(message, getDeliveryMode(), getPriority(), timeToLive);
        } else {
            producer.send(message);
        }
    }

}
 类似资料:
  • 我的Kafka消费者必须倾听多个主题。每个主题都定义了一个优先级,比如高、低和中。 消费者服务必须以这样的方式配置,例如,它有30个执行器用于处理高主题的消息,5个执行器用于处理低主题和中主题的每个消息。 如果执行程序被占用并运行当前任务,是否有方法配置消费者停止消费来自相应主题的消息?

  • 上面的配置是为了让消息在30秒内不过期。但是消息在到达队列后立即过期。侦听器甚至不接收消息。

  • 当你在操作本系统时,系统会根据你操作的类型在下方弹出一些操作提示和异常,警告等信息,如下图,你可以根据需要控制这些信息是否显示。

  • 设置是否接收Queue的每日消息数量统计邮件 方法参数 SetQueueDailyStatisticsStateRequest queueName : String : required Queue名称 enabled : boolean : required 是否接收每日消息数量统计邮件 注: 如若接收,须设置用户邮箱地址。 方法无返回 查看当前是否允许接收Queue的每日消息数量统计邮件 方法

  • 我没有找到任何关于Debezium如何设置Kafka消息时间戳的文档,以及是否设置了时间戳。 通过比较这些值,Kafka消息的时间戳总是在数据库表(source.ts_ms)更改的时间戳之后,也在Debezium(ts_ms)处理更改的时间之后。这表明Kafka消息时间戳只是Kafka代理设置为摄入时间的时间戳。 有人知道关于Debezium是否以及如何在其填充的接收器主题中设置Kafka消息时间

  • 设置方法:我-设置-消息通知设置