我通过SpringJMS在我的项目中使用MQ,作为代理我使用ActiveMQ。我需要基于消息设置过期,所以我尝试使用message.setjmsExpersion
,但没有成功。所有到达ActiveMQ的消息都具有expiration=0。
有人成功地使用Spring为每个消息设置过期吗?
long expiration = 0L;
if (!producer.getDisableMessageTimestamp()) {
long timeStamp = System.currentTimeMillis();
message.setJMSTimestamp(timeStamp);
if (timeToLive > 0) {
expiration = timeToLive + timeStamp;
}
}
message.setJMSExpiration(expiration);
//me: timeToLive coming from default values of Producer/JmsTemplate...
我不知道为什么Spring决定排除这一点,但是您可以扩展JmsTemplate并重载一些方法,传递一个timeToLive参数。
public class MyJmsTemplate extends JmsTemplate {
public void send(final Destination destination,
final MessageCreator messageCreator, final long timeToLive)
throws JmsException {
execute(new SessionCallback<Object>() {
public Object doInJms(Session session) throws JMSException {
doSend(session, destination, messageCreator, timeToLive);
return null;
}
}, false);
}
protected void doSend(Session session, Destination destination,
MessageCreator messageCreator, long timeToLive) throws JMSException {
Assert.notNull(messageCreator, "MessageCreator must not be null");
MessageProducer producer = createProducer(session, destination);
try {
Message message = messageCreator.createMessage(session);
if (logger.isDebugEnabled()) {
logger.debug("Sending created message: " + message);
}
doSend(producer, message, timeToLive);
// Check commit - avoid commit call within a JTA transaction.
if (session.getTransacted() && isSessionLocallyTransacted(session)) {
// Transacted session created by this template -> commit.
JmsUtils.commitIfNecessary(session);
}
} finally {
JmsUtils.closeMessageProducer(producer);
}
}
protected void doSend(MessageProducer producer, Message message,
long timeToLive) throws JMSException {
if (isExplicitQosEnabled() && timeToLive > 0) {
producer.send(message, getDeliveryMode(), getPriority(), timeToLive);
} else {
producer.send(message);
}
}
}
我的Kafka消费者必须倾听多个主题。每个主题都定义了一个优先级,比如高、低和中。 消费者服务必须以这样的方式配置,例如,它有30个执行器用于处理高主题的消息,5个执行器用于处理低主题和中主题的每个消息。 如果执行程序被占用并运行当前任务,是否有方法配置消费者停止消费来自相应主题的消息?
上面的配置是为了让消息在30秒内不过期。但是消息在到达队列后立即过期。侦听器甚至不接收消息。
当你在操作本系统时,系统会根据你操作的类型在下方弹出一些操作提示和异常,警告等信息,如下图,你可以根据需要控制这些信息是否显示。
设置是否接收Queue的每日消息数量统计邮件 方法参数 SetQueueDailyStatisticsStateRequest queueName : String : required Queue名称 enabled : boolean : required 是否接收每日消息数量统计邮件 注: 如若接收,须设置用户邮箱地址。 方法无返回 查看当前是否允许接收Queue的每日消息数量统计邮件 方法
我没有找到任何关于Debezium如何设置Kafka消息时间戳的文档,以及是否设置了时间戳。 通过比较这些值,Kafka消息的时间戳总是在数据库表(source.ts_ms)更改的时间戳之后,也在Debezium(ts_ms)处理更改的时间之后。这表明Kafka消息时间戳只是Kafka代理设置为摄入时间的时间戳。 有人知道关于Debezium是否以及如何在其填充的接收器主题中设置Kafka消息时间
设置方法:我-设置-消息通知设置