当前位置: 首页 > 知识库问答 >
问题:

将消息从“回复”队列复制到另一个队列

施鸿
2023-03-14

我主要在RPC模式下使用rabbitMq,但我还想将请求和响应消息复制到另一个队列。

最后,我想实现的是,外部消费者可以通过听一个队列来查看所有流量,我们称之为“日志队列”。

复制传入消息是可以的,我只需要使用扇出交换,或者使用与RPC调用使用的路由密钥相同的路由密钥将日志队列绑定到使用过的交换。

但我无法找到通过直接回复功能“扇出”发送的消息的方法。

到目前为止,我了解到响应消息以amqp的形式通过生成的routing_密钥发送到默认的direct exchange。rabbitmq。回复。generatedName,由于默认的交换是不可接触的,所以我不能复制这些消息。

你知道怎么做吗?

我有一个我宁愿避免的解决方案:让客户端将从回复接收到的响应重新发送到“日志队列”。

但这意味着我的客户要对这个“日志”功能负责,我宁愿不负责。

顺便说一句,即使我认为这不相关,因为这可能是rabbitMq服务器配置问题,我也使用Spring AMQP客户端

共有2个答案

孟均
2023-03-14

我已经找到了实现我想要的东西的方法。

这并不像Gary Russell提议的那样灵活:https://stackoverflow.com/a/59976806/2546702

但是,我可以利用fire hose功能并将amq.rabbitmq.trace上的队列(或交换以获得更多控制)与设置为“发布”的例程键绑定(末尾的点很重要)

这允许记录发布到默认交换的消息,包括回复消息。

当然,使用firehose会对性能产生影响,但就我而言,这并不是什么大问题,因为rabbitmq没有得到充分利用。

因为我有16个队列要听,所以我不想对每个队列使用不同的模板。我可以对所有RPC队列使用单个回复队列,但这将是一个瓶颈。

所以,如果没有硬性酒,消防水龙带似乎是一个不错的选择。

薛征
2023-03-14

你不能用固定的回复来做这件事,因为没有真正的队列/交换。

但是,您可以将每个RabbitTemboard配置为使用固定的回复队列和回复容器将回复从该队列定向到模板。

文件在这里。

此外,使用此机制时,可以将模板的replyAddress配置为交换和路由密钥的形式。

/**
 * An address for replies; if not provided, a temporary exclusive, auto-delete queue will
 * be used for each reply, unless RabbitMQ supports 'amq.rabbitmq.reply-to' - see
 * https://www.rabbitmq.com/direct-reply-to.html
 * <p>The address can be a simple queue name (in which case the reply will be routed via the default
 * exchange), or with the form {@code exchange/routingKey} to route the reply using an explicit
 * exchange and routing key.
 * @param replyAddress the replyAddress to set
 */
public synchronized void setReplyAddress(String replyAddress) {...}

您只需按正常方式设置模板和容器,使模板成为容器的侦听器。。。

@Bean
public RabbitTemplate amqpTemplate() {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
    rabbitTemplate.setMessageConverter(msgConv());
    rabbitTemplate.setReplyAddress(replyQueue().getName());
    rabbitTemplate.setReplyTimeout(60000);
    rabbitTemplate.setUseDirectReplyToContainer(false);
    return rabbitTemplate;
}

@Bean
public SimpleMessageListenerContainer replyListenerContainer() {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory());
    container.setQueues(replyQueue());
    container.setMessageListener(amqpTemplate());
    return container;
}

同样,这是从留档。

 类似资料:
  • 为什么已经拥有了共享内存时需要消息队列呢? 这将是多种原因,让我们将其分解为多个点来简化 - 据了解,一旦消息被一个进程接收到,它将不再可用于任何其他进程。 而在共享内存中,数据可供多个进程访问。 如果想使用小信息格式进行通信。 当多个进程同时进行通信时,共享内存数据需要同步保护。 使用共享内存的写入和读取频率很高,那么实现功能将会非常复杂。 在这种情况下不值得使用。 如果所有的进程不需要访问共享

  • 一、消息模型 点对点 发布/订阅 二、使用场景 异步处理 流量削锋 应用解耦 三、可靠性 发送端的可靠性 接收端的可靠性 参考资料 一、消息模型 点对点 消息生产者向消息队列中发送了一个消息之后,只能被一个消费者消费一次。 发布/订阅 消息生产者向频道发送一个消息之后,多个消费者可以从该频道订阅到这条消息并消费。 发布与订阅模式和观察者模式有以下不同: 观察者模式中,观察者和主题都知道对方的存在;

  • 一个线程会从消息队列中收取消息,另一个线程会定时给消息队列发送普通消息和紧急消息 一个线程会从消息队列中收取消息,另一个线程会定时给消息队列发送普通消息和紧急消息 源码/* * Copyright (c) 2006-2018, RT-Thread Development Team * * SPDX-License-Identifier: Apache-2.0 * * Change Logs: *

  • 消息队列接口 结构体 struct   rt_messagequeue   消息队列控制块 更多...   类型定义 typedef struct rt_messagequeue *  rt_mq_t   消息队列类型指针定义   函数 rt_err_t  rt_mq_init (rt_mq_t mq, const char *name, void *msgpool, rt_size_t msg_

  • rabbitmq 使用 定义handler实体 public class UserEvent : EventHandler { public string Name { get; set; } public string Job { get; set; } } 队列定义 [QueueConsumer(nameof(HelloEventHandler), QueueCon

  • 我已经设置了Apache camel,在其中我使用来自一个队列的消息并对其进行某种操作,然后将其传输到其他队列。 现在,如果异常来了,我希望它应该回滚,然后在6次尝试后,它发送到死信队列,目前回滚发生5-6次,但我的消息没有转移到死信队列。 这里会发生什么-->Queue1->>(消耗)-->Operation(引发异常)-->Rollback-->Queue1->>(消耗)-->Operatio