当前位置: 首页 > 知识库问答 >
问题:

带回调的RabbitTemplate异步消息

邓阳炎
2023-03-14
@Autowired(required = false)
@Qualifier("rabbitTemplate")
private RabbitTemplate queueTemplate;

@Override
public Response createIncident(String incident) {
    LOGGER.info("Sending incident into queue");
    queueTemplate.convertAndSend((Object)incident, new MessagePostProcessor() {

        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            message.getMessageProperties().setReplyTo("replyQueue");
            return message;
        }
    });
    Message message = queueTemplate.receive();
...
<rabbit:admin connection-factory="rabbitConnFactory" />

<rabbit:queue name="rabbitQueue" />
<rabbit:queue name="replyQueue" />

<rabbit:topic-exchange name="rabbitExchange">
    <rabbit:bindings>
        <rabbit:binding queue="rabbitQueue" pattern="camel" />
        <rabbit:binding queue="replyQueue" pattern="camel" />
    </rabbit:bindings>
</rabbit:topic-exchange>


<rabbit:template id="rabbitTemplate" 
    connection-factory="rabbitConnFactory" exchange="rabbitExchange" routing-key="camel"
    reply-address="replyQueue" reply-queue="replyQueue">
    <rabbit:reply-listener/>
</rabbit:template>

<rabbit:listener-container
    connection-factory="rabbitConnFactory" concurrency="10">
    <rabbit:listener ref="rabbitMessageListener" method="createIncident"
        queue-names="rabbitQueue" />
</rabbit:listener-container>

<bean id="rabbitMessageListener" class="com.my.listener.QueueListener" />

和响应的侦听器

@Autowired(required = false)
@Qualifier("rabbitTemplate")
private RabbitTemplate queueTemplate;

public void createIncident() {
    queueTemplate.receiveAndReply("replyQueue", new ReceiveAndReplyMessageCallback() {

        @Override
        public Message handle(Message message) {
            String incident = new String(message.getBody());
            LOGGER.debug("Queue incoming message: " + incident);
            String result = "" + messageHandler.createIncident(incident);
            return new Message(result.getBytes(), new MessageProperties());
        }

    }); 

它不想进入侦听器中的这个句柄消息。和应用程序容器显示消息

共有1个答案

澹台浩广
2023-03-14

您应该使用:

/**
 * The name of the default queue to receive messages from when none is specified explicitly.
 *
 * @param queue the default queue name to use for receive
 */
public void setQueue(String queue) {
    this.queue = queue;
}

而不是回复地址

重新考虑你的

queueTemplate.receiveAndReply("replyQueue", 
 类似资料:
  • 问题内容: 如何快速进行异步回调?我正在为我的应用程序编写一个小框架,因为它应该同时在iOS和OS X上运行。因此,我将非特定于设备的主要代码放入该框架中,该框架还处理对我的在线api的请求。很显然,我也希望应用程序的GUI以及ViewController在api请求完成后立即做出反应。在Objective- C中,我通过将包含必须在id变量中调用的函数以及函数本身的视图保存在选择器变量中的视图来

  • 几周前刚开始学Node.js....我不明白为什么“products”数组包含null而不是所需的对象.... 在第13行,当我对对象进行控制台日志记录时,我得到了所需的对象,但我不明白当我在map函数完成它的执行后在第40行对它们进行控制台日志记录时,它们为什么是空的.... 如果数组长度是2(这意味着推入成功),为什么里面存储的对象仍然是空的,而不是我想要存储的对象? 控制台输出 订单模式

  • 稳定性: 1 - 试验的 async_hooks 模块提供了一个用于注册回调函数的 API,这些回调函数可追踪在 Node.js 应用中创建的异步资源的生命周期。可以通过以下方式使用: const async_hooks = require('async_hooks'); Terminology An asynchronous resource represents an object with

  • 所以问题是在这种情况下notificationPhoneNumber对象锁定了多长时间?它是否会在线程完成其工作时被锁定?

  • 我正在尝试创建一个回调函数: 首先,我创建了一个函数接口,用于定义回调函数的约定 我创建了一个类,该类将定义一个方法来调用我的回调(我通过使用lambda表达式传递了接口的实现作为对此方法的引用) 下面是我的代码: 但当我运行这段代码时,我得到了这样的结果。 有人能告诉我为什么我会有这个例外吗?

  • 我需要捕获异步发送到Kafka时的异常。Kafka producer Api附带一个函数send(ProducerRecord记录、回调)。但当我针对以下两种情况进行测试时: Kafka经纪人倒下 主题没有预创建回调没有被调用。相反,我在代码中收到发送不成功的警告(如下所示)。 问题: > 那么回调是否只针对特定的异常调用? Kafka客户端何时尝试在异步发送时连接到Kafka代理:每次批处理发送