当重新排队到原始队列时,消息可以再次返回到死信队列,并看到x-death报头计数不断增长。
由于某些原因,我们希望处理count>=5的死信html" target="_blank">消息(例如),并将其他消息重新排入死信队列。
我需要首先对消息进行基本的ack以检查X死亡计数头,然后将其发送到原始队列,如果计数足够大,否则在死信队列中重新排队。
我无法重新排队到死信队列,因为基本的get不在侦听器内部:抛出AmqpRejectAndDontRequeueException不起作用,因为异常不在rabbitmq侦听器对象内部抛出。
我尝试在receiveAndCallback方法中抛出异常,但这似乎并不更好:
rabbitTemplate.receiveAndReply(queueName, new ReceiveAndReplyCallback<Message, Object>() {
@Override
public Object handle(Message message) {
Long messageXdeathCount = null;
if (null != message.getMessageProperties() && null != message.getMessageProperties().getHeaders()) {
List<Map<String, ?>> xdeathHeader =
(List<Map<String, ?>>) message.getMessageProperties().getHeaders().get(
"x-death");
if (null != xdeathHeader && null != xdeathHeader.get(0)) {
messageXdeathCount = (Long) xdeathHeader.get(0).get("count");
}
}
if (messageXdeathCount == null) {
messageXdeathCount = 0L;
}
if (messageXdeathCount >= 5) {
resendsMessage(message);
} else {
// this does not reject the message
throw new AmqpRejectAndDontRequeueException("rejected");
}
return null;
}
});
return receive;
}
在这个方法执行之后,消息没有像我预期的那样被拒绝,并且远离队列(它已经被加密)。
@Bean
public Exchange exchange() {
TopicExchange exchange = new TopicExchange(EXCHANGE, true, false);
admin().declareExchange(exchange);
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", EXCHANGE);
Queue queue = new Queue("queueName", true, false, false, args);
admin().declareQueue(queue);
Binding binding = BindingBuilder.bind(queue).to(exchange).with(routingKey).noargs();
admin().declareBinding(binding);
return exchange;
}
谢谢你的帮助
更新
我尝试了另一种方法,使用通道get和reject:
// exchange creation
@Bean
public Exchange exchange() throws IOException {
Connection connection = connectionFactory().createConnection();
Channel channel = channel();
channel.exchangeDeclare(EXCHANGE, ExchangeTypes.TOPIC, true, false, null);
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", EXCHANGE);
channel.queueDeclare("queueName", true, false, false, args);
channel.queueBind("queueName", EXCHANGE, routingKey);
return exchange;
}
GetResponse response = channel.basicGet(queueName, false);
Long messageXdeathCount = null;
if(null != response.getProps() && null != response.getProps().getHeaders()) {
List<Map<String, ?>> xdeathHeader =
(List<Map<String, ?>>) response.getProps().getHeaders().get("x-death");
if(null != xdeathHeader && null != xdeathHeader.get(0)) {
messageXdeathCount = (Long) xdeathHeader.get(0).get("count");
}
}
if (messageXdeathCount == null) {
messageXdeathCount = 0L;
}
if (messageXdeathCount >= 5) {
MessagePropertiesConverter messagePropertiesConverter = new DefaultMessagePropertiesConverter();
MessageProperties messageProps =
messagePropertiesConverter.toMessageProperties(response.getProps(),
response.getEnvelope(), "UTF-8");
resendsMessage(new Message(response.getBody(), messageProps));
channel.basicAck(response.getEnvelope().getDeliveryTag(), false);
} else {
if(response.getProps().getHeaders().get("x-death") == null) {
response.getProps().getHeaders().put("x-death", new ArrayList<>());
}
if(((List<Map<String, Object>>) response.getProps().getHeaders().get("x-death")).get(0) == null) {
((List<Map<String, Object>>)response.getProps().getHeaders().get("x-death")).add(new HashMap<>());
}
((List<Map<String, Object>>) response.getProps().getHeaders().get("x-death")).get(0).put(
"count", messageXdeathCount + 1);
channel.basicReject(response.getEnvelope().getDeliveryTag(), true);
}
ReceiveAndReply()
方法当前不提供对接收消息的确认的控制。请随意打开一个新的特性请求。
您可以改用侦听器容器来获得所需的灵活性。
编辑
rabbitTemplate.execute(channel -> {
// basicGet, basicPublish, ack/nack etc here
});
我正在使用 发送和 对于现在从rappid mq接收消息,我希望使用类似以下内容的侦听器: 问题是onMessage监听器与Messages一起工作是否有可能在类似的函数中接收简单的可序列化对象?
我正在尝试在WebLogic 10.3.5上创建一个MDB(EJB 3.0)。在外部AMQ服务器上监听队列。经过大量工作和教程组合,我在webLogic上部署时遇到以下错误。 [EJB:015027]消息驱动的EJB是事务性的,但JNDI名称ActiveMQXAConnectionFactory引用的JMS连接工厂不是JMS XA连接工厂。 以下是我所做工作的简要介绍: 我已经将相应的库添加到我的
问题内容: 一个典型的Redis聊天示例将如下所示(仅举一个这样的示例,请参见https://github.com/emrahayanoglu/Socket.io- Redis-RealTime-Chat- Example/blob/master/chatServer.js ): 但是,这里的问题是,当“断开连接”时,侦听器仍然处于连接状态。控制台将继续打印出。如果要检查的事件列表,他们仍然会发现
问题内容: 我有一个使用“ EXPOSE 8000”构建的泊坞窗容器。我开始这样的过程: 容器中的进程正在侦听8000。 在主机上(即容器外部),我看到端口49164绑定到容器端口8000: Inded,说(除其他事项外) 但是,我无法与容器交谈。外, 在里面 我希望外部的telnet到49164像在内部一样返回html。 有什么建议么? 问题答案: 您可能希望在容器中运行正在运行的服务,而不是监
我有一个Spring应用程序在独立的JBoss EAP 6.2中运行(带有嵌入式HornetQ提供程序)。 消息被成功地放在队列中(我可以在jboss eap-6.2\独立\data\MessagingJournal\hornetq-data-1.hq中看到它们,因为队列是持久的),但不会被侦听器拾取(侦听器中的断点不会被命中)。我怀疑配置中缺少或错误的东西,但看不出是什么。JBoss启动时没有任
问题内容: 我当时在上网,但找不到很好的信息。我试图在每次运行应用程序时检测按键。我正在使用JavaFX并将其与FXML一起运行。我尝试了很多事情,但没有任何效果。请帮我。 问题答案: 您应该签出Ensemble示例。这是关键的侦听器代码。