我有一个带有PublisherReturn回调的RabbitTemplate。
经过一些调查,我得出的结论是rabbitTemplate和/或连接将被阻塞,直到原始消息被完全处理。
以下是我所拥有的简化细节:
@Configuration
public class MyConfig {
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
connectionFactory.setPublisherReturns(true);
// ... other settings left out for brevity
return connectionFactory;
}
@Bean
@Qualifier("rabbitTemplate")
public RabbitTemplate rabbitTemplate(ReturnCallbackForAlternative returnCallbackForAlternative) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnCallback(returnCallbackForAlternative);
// ... other settings left out for brevity
return rabbitTemplate;
}
@Bean
@Qualifier("connectionFactoryForUndeliverable")
public ConnectionFactory connectionFactoryForUndeliverable() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
// ... other settings left out for brevity
return connectionFactory;
}
@Bean
@Qualifier("rabbitTemplateForUndeliverable")
public RabbitTemplate rabbitTemplateForUndeliverable() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactoryForUndeliverable());
// ... other settings left out for brevity
return rabbitTemplate;
}
}
然后发送我正在使用的信息
@Autowired
@Qualifier("rabbitTemplate")
private RabbitTemplate rabbitTemplate;
public void send(Message message) {
rabbitTemplate.convertAndSend(
"exchange-name",
"primary-key",
message);
}
ReturnCallback中的代码是
@Component
public class ReturnCallbackForAlternative implements RabbitTemplate.ReturnCallback {
@Autowired
@Qualifier("rabbitTemplateForUndeliverable")
private RabbitTemplate rabbitTemplate;
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
rabbitTemplate.convertAndSend(
"exchange-name",
"alternative-key",
message);
}
}
in returnCallback before message send
in returnCallback after message send
如果注释掉ConnectionFactory.SetPublisherConnections(true);
,它运行正常。
@SpringBootApplication
public class HangingApplication {
public static void main(String[] args) {
SpringApplication.run(HangingApplication.class, args);
}
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setPublisherReturns(true);
connectionFactory.setPublisherConfirms(true);
return connectionFactory;
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setExchange("foo");
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
System.out.println("Confirm callback for main template. Ack=" + ack);
});
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
System.out.println("in returnCallback before message send");
rabbitTemplate.send("foo", message);
System.out.println("in returnCallback after message send");
});
return rabbitTemplate;
}
@Bean
public ApplicationRunner runner(@Qualifier("rabbitTemplate") RabbitTemplate template) {
return args -> {
template.convertAndSend("BADKEY", "foo payload");
};
}
@RabbitListener(queues = "foo")
public void listen(String in) {
System.out.println("Message received on undeliverable queue : " + in);
}
}
这是我用过的建筑物:
plugins {
id 'org.springframework.boot' version '2.1.5.RELEASE'
id 'java'
}
apply plugin: 'io.spring.dependency-management'
group 'pcoates'
version '1.0-SNAPSHOT'
sourceCompatibility = 1.11
repositories {
mavenCentral()
}
dependencies {
compile 'org.springframework.boot:spring-boot-starter-amqp'
}
它会在amqp-client代码中造成某种死锁。最简单的解决方案是在单独的线程上执行发送--在回调中使用taskexecutor
...
exec.execute(() -> template.send(...));
您可以使用相同的模板/连接工厂,但发送必须在不同的线程上运行。
我以为我们最近改变了框架,总是在一个不同的线程上调用返回回调(在最后一个人报告了这件事之后),但它看起来像是从缝隙中掉下来的。
@SpringBootApplication
public class So57234770Application {
public static void main(String[] args) {
SpringApplication.run(So57234770Application.class, args);
}
@Bean
public ApplicationRunner runner(RabbitTemplate template) {
template.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
template.send("foo", message);
});
return args -> {
template.convertAndSend("BADKEY", "foo");
};
}
@RabbitListener(queues = "foo")
public void listen(String in) {
System.out.println(in);
}
}
我正在使用Spring AMQP侦听RabbitMQ队列。在侦听队列时,根据业务逻辑,我的服务可以引发RuntimeException,在这种情况下,消息将重试多次。在最大次数重试后,消息将保留在DLQ中。我想知道,在DLQ中处理这些消息的最佳方法是什么?我从博客上读到我可以使用停车场队列。但在这种情况下,如何监控队列并通知人们死信消息?P、 对不起,我的英语不好。希望我能够解释我的问题:)
null 谁能给我一个向RabbitMQ发送消息的标准程序的例子。我正在使用Spring Boot,也可以使用它的特性。
我想使用rabbitMq队列中Storm喷口中的消息。 现在,我们使用Spring AMQP异步发送和接收来自RabbitMq的消息。 Spring AMQP提供了从队列读取消息的机制(创建监听器或使用注释@RabbitListner)。 问题是我可以让一个侦听器从队列中读取消息。但是,我如何将此消息发送到Storm群上运行的Storm喷口? 拓扑将启动一个集群,但在我的spout的nextTup
我需要在一定的持续时间后将消息发送给MessageListener,所以有没有任何方法可以使用SpringAMQP实现。 如。Producer生成消息并将消息发送到RabbitMQ Q,该消息立即被侦听器接收到,我想延迟消费者端接收到的消息,比如说在一些配置参数(比如1000ms)之后
如标题所述,我想使用RabbitMQ向Websocket发送消息。使用AngularJS前端,我想从Websocket读取RabbitMQ消息,并将它们打印到控制台。原则上,我的代码似乎是可行的,尽管我不知道如何获得消息的实际(字符串)内容? 后端:为了创建Websocket并执行路由,我使用Spring Boot和Apache Camel:http://camel.Apache.org/Spri
本文向大家介绍RabbitMQ 的消息是怎么发送的?相关面试题,主要包含被问及RabbitMQ 的消息是怎么发送的?时的应答技巧和注意事项,需要的朋友参考一下 首先客户端必须连接到 RabbitMQ 服务器才能发布和消费消息,客户端和 rabbit server 之间会创建一个 tcp 连接,一旦 tcp 打开并通过了认证(认证就是你发送给 rabbit 服务器的用户名和密码),你的客户端和 Ra