当前位置: 首页 > 知识库问答 >
问题:

使用Spring在RabbitMQ中创建新队列

司空鸣
2023-03-14

我在 RabbitMQ 中创建新队列时遇到了一些问题。我只创建一个使用者客户端,该客户端将使用来自另一个微服务的消息。

这是我到目前为止所做的。

应用程序属性:

spring.rabbitmq.addresses=SCGLCCRAMQD0005.SCGER.DEV.CORP:5672
spring.rabbitmq.username=people-consumer
spring.rabbitmq.password=*************
spring.rabbitmq.vhost=PEOPLE

peopleevents.queue=qu-people-cores-update
peopleevents.exchange=ex-people-updates
peopleevents.routingkey=ONLINE.UPDATE.PERSONF.PERSONF

配置类:

@Configuration
public class MQConfig {


    @Value("${peopleevents.queue}")
    public String queue;

    @Value("${peopleevents.exchange}")
    public String exchange;

    @Value("${peopleevents.routingkey}")
    public String routingKey;

    @Value("${spring.rabbitmq.username}")
    private String username;

    @Value("${spring.rabbitmq.password}")
    private String password;

    @Value("${spring.rabbitmq.addresses}")
    private String address;

    @Value("${spring.rabbitmq.vhost}")
    private String vHost;


    @Bean
    public Queue queue() {
        return new Queue(queue, true, false, false);
    }

    @Bean
    public TopicExchange exchange() {
        return new TopicExchange(exchange);
    }

    //@Bean
    //Exchange myExchange() {
     //   return ExchangeBuilder.topicExchange(exchange).durable(true).build();
    //}

    @Bean
    public Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with(routingKey);
    }

    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public AmqpTemplate template(ConnectionFactory connectionFactory) {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMandatory(true);
        return rabbitTemplate;
    }

    @Bean
    public ConnectionFactory connectionFactory() throws IOException {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setAddresses(address);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(vHost);
        connectionFactory.setPublisherReturns(true);

        return connectionFactory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(jsonMessageConverter());
        return rabbitTemplate;
    }

}

和侦听器类:

@Service
public class RabbitMQService {

        private final Logger logger = LoggerFactory.getLogger(RabbitMQService.class);

        @RabbitListener(queues = "${peopleevents.queue}")
        public void receivedMessage(@Payload Message message) {
            logger.info("User Details Received is.. " + message.toString());
        }
    }

当我运行这个程序时,我有一条ACCESS_REFUSED消息,但我不知道为什么。我错过了什么吗??

谢谢

共有1个答案

许典
2023-03-14

您可以使用 RabbitAdmin 动态创建队列,因此您需要将 RabbitAdmin bean 添加到配置类中:

@Bean
public AmqpAdmin amqpAdmin() {
    return new RabbitAdmin(connectionFactory());
}

然后,您可以使用绑定队列配置自动连线 RabbitAdmin

@Autowired
private AmqpAdmin admin;

@Autowired
private Binding binding;

@Autowired
private Queue queue;

最后,您可以根据需要创建队列:

admin.declareQueue(queue);
admin.declareBinding(binding);

您可以在配置或服务类中创建队列。

您可以在构造函数中的服务类中实现它:

@Service
public class RabbitMQService {

        private final Logger logger = LoggerFactory.getLogger(RabbitMQService.class);

        private AmqpAdmin admin;

        private Binding binding;

        private Queue queue;

        @Autowired
        public RabbitMQService (AmqpAdmin admin, Binding binding, Queue queue) {
                this.admin = admin;
                this.binding = binding;
                this.queue = queue;
                admin.declareQueue(queue);
                admin.declareBinding(binding);
        }


        @RabbitListener(queues = "${peopleevents.queue}")
        public void receivedMessage(@Payload Message message) {
            logger.info("User Details Received is.. " + message.toString());
        }
}
 类似资料:
  • 我花了整整一天的时间来尝试让spring-AMQP示例项目在docker版本的RabbitMQ上运行。我只是在运行标准的rabbitmq Docker。虽然我没有连接问题,但我总是得到与创建队列相关的异常,并且我已经尝试了所有可能的变体,在这一点上。 我尝试在我的配置中声明队列,就像示例项目一样。我尝试显式配置RabbitAdmin。我已经尝试显式配置整个自动配置混乱。我在rabbitmq中创建了

  • 问题内容: 根据我对Rabbit-mq的(有限的)经验,如果您为尚不存在的队列创建新的侦听器,则会自动创建该队列。我试图将Spring AMQP项目与rabbit- mq一起使用来设置侦听器,但是却出现错误。这是我的xml配置: 我在RabbitMq日志中得到了这个: 还有来自AMQP的类似错误: 从堆栈跟踪中可以看出,队列是在“被动”模式下创建的-任何人都可以指出我如何不使用被动模式来创建队列,

  • 我有两个独立的应用程序,一个是生产者,一个是具有交换类型默认(DIRECT)的消费者。 下面是带有死信队列设置的Rabbit MQ Producer的配置。 现在,这里是单独应用程序中Rabbit MQ listener的代码。 现在,当我运行消费者服务时,它会创建一个同名的重复队列。如下图所示 Rabbit MQ GUI 我也在消费者中尝试了以下设置,但结果相同 注意:当我在生产者中添加死信队列

  • 我收到了一个关于这个问题的建议,一个评论,说我可能没有我需要的交换,因为我的队列已经存在。于是,我就全部手动删除了。 但是,在重新部署应用程序时,我发现所有队列都有以下异常: 我不确定这意味着什么,因为我显然想要创建队列<code>myInput。组,它似乎在抱怨它不存在。。。 我也检查了这个问题,我的问题可能是权限。但我应该有它们——否则我相信我无法删除队列...... 你能给我一个解决方案吗?

  • 我是这个消息队列的新手,刚刚开始学习一些基本的东西。 因此,对于我们的Spring Boot应用程序,我们遵循了contoller talks to service&service talks to repository这样的体系结构,所以在这里,我必须创建一个控制器,它将接受类DTO作为json,并将这些信息发布到apache Camel中指定的消息队列。我在跟踪这个链接!对于我的参考,工作良好

  • 我对Spring和RabbitMQ有以下配置: 为了防止这种情况,我想创建持久队列(因为这些队列的持久度设置为false,自动删除设置为true)。如果不是,我想在这些队列上设置一些过期时间(例如,1小时或其他时间)。从RabbitMQ文档来看,我们似乎可以在头中传递这些值,但是,这只适用于版本3.6.0以后的版本,因为我们有3.5.4,它不是一个选项。 有没有其他方法可以配置它?(另一种方法是为