当前位置: 首页 > 知识库问答 >
问题:

动态队列和侦听器,未发送消息?

柯捷
2023-03-14

兔子配置:

package com.rabbitMQ;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.net.URI;
import java.net.URISyntaxException;

@EnableRabbit
@Configuration
public class RabbitMqConfig {
    private static final Logger logger = LoggerFactory.getLogger(RabbitMqConfig.class);

    @Value("${spring.rabbitmq.addresses}")
    private String addressURL;


    @Bean
    public ConnectionFactory connectionFactory() throws URISyntaxException {
        return new CachingConnectionFactory(new URI(addressURL));
    }

    /**
     * Required for executing adminstration functions against an AMQP Broker
     */
    @Bean
    public AmqpAdmin amqpAdmin() throws URISyntaxException {
        return new RabbitAdmin(connectionFactory());
    }

    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public AmqpTemplate rabbitTemplate() throws URISyntaxException {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
        rabbitTemplate.setMessageConverter(jsonMessageConverter());
        return rabbitTemplate;
    }


}

应用概述:每当gitRepository连接到我们的应用程序时,存储库名称就会成为交换名称,在这种情况下ForceCI,然后存储库的每个分支都会创建自己的队列,这里有两个队列开发master。现在每次在开发分支中创建拉取请求时,我都需要将信息传递给开发队列,并且应该由特定的侦听器侦听,该侦听器应该仅注册用于开发。我看到了动态队列的示例,但我似乎找不到任何关于如何创建将使用不同线程执行的动态侦听器的示例,我如何实现这一点?此外,我正在尝试将一些消息发送到队列作为测试,但我无法在控制台中看到它们。(下面的代码)

@RequestMapping(value = "/createExchange", method = RequestMethod.GET)
public void createExchange(ServletResponse response, ServletRequest
        request) throws URISyntaxException {


    rabbitMqConfig.amqpAdmin().declareExchange(new DirectExchange("ForceCI"));

}

@RequestMapping(value = "/createDynamicQueues", method = RequestMethod.GET)
public void createDynamicQueues(@RequestParam String branchName, ServletResponse response, ServletRequest
        request) throws URISyntaxException {
    Properties develop = rabbitMqConfig.amqpAdmin().getQueueProperties(branchName);

    System.out.println("develop -> "+develop);
    if(develop != null && develop.stringPropertyNames() != null && !develop.stringPropertyNames().isEmpty()) {
        for (String stringPropertyName : develop.stringPropertyNames()) {
            String property = develop.getProperty(stringPropertyName);
            System.out.println("property Value -> " + property + " ---- " + "property key -> " + stringPropertyName);
        }
    } else {
                    Queue queue = new Queue(branchName, true);
        String develop1 = rabbitMqConfig.amqpAdmin().declareQueue(new Queue(branchName, true));
        rabbitMqConfig.amqpAdmin().declareBinding(BindingBuilder.bind(queue).to(new DirectExchange("ForceCI")).withQueueName());
        System.out.println(develop1);
    }
}

@RequestMapping(value = "/sendMessageToQueuesDevelop", method = RequestMethod.GET)
public void sendMessageToQueuesDevelop(ServletResponse response, ServletRequest
        request) throws URISyntaxException {


    Properties develop = rabbitMqConfig.amqpAdmin().getQueueProperties("develop");
    String queue_name = develop.getProperty("QUEUE_NAME");

    rabbitTemplate.convertAndSend("ForceCI", queue_name, "TestMessage");


}

@RequestMapping(value = "/sendMessageToQueuesMaster", method = RequestMethod.GET)
public void sendMessageToQueuesMaster(ServletResponse response, ServletRequest
        request) throws URISyntaxException {


    Properties develop = rabbitMqConfig.amqpAdmin().getQueueProperties("master");
    String queue_name = develop.getProperty("QUEUE_NAME");

    rabbitTemplate.convertAndSend("ForceCI", queue_name, "TestMessage1");


}

使现代化

缺少绑定,当我在代码中给出如上所示的绑定时,消息开始进入,但我仍然不知道如何在不同的侦听器中侦听这些消息并在不同的线程中处理它们?

共有1个答案

澹台星剑
2023-03-14

最简单的方法是使用DirectMessageListenerContainer,并根据需要向其中添加队列。不过,您不会为每个队列获得一个新线程;通过直接容器,监听器在amqp客户端线程池的线程上调用。

直接容器在添加队列方面很有效;如果需要,您可以从零队列开始。有关详细信息,请参阅选择容器。

如果必须为每个队列创建一个新线程,则必须为每个队列手动创建(并管理)一个SimpleMessageListenerContainer。

 类似资料:
  • 我有一个rabbitListener,它连续异步地侦听队列“用户消息”的用户消息。除非队列中加载了大量消息,否则一切都正常。当消息批量发布到队列时,同一用户的消息首先被处理,从而其他用户的消息等待轮到他们。 我无法使用优先级队列,因为所有用户的优先级都相同。所以我想创建新的队列,并在运行时监听它们。一旦消息被使用,所有队列都将是短暂的。(队列将被删除) 在浏览时,我发现可以使用RabbitAdmi

  • 我有一个项目,我们将在rabbit中有数百个(可能数千个)队列,每个队列都需要一个消费者池来使用。 在rabbit(使用spring amqp)中,您有rabbitlistener注释,它允许我静态地分配这个特定消费者将处理的队列。 我的问题是,对于rabbit和spring,是否有一种干净的方法可以让我获取一段队列(比如以a-c开头的队列),然后还可以侦听消费者运行时创建的任何队列。 示例(开始

  • 我正在尝试使用spring cloud AWS注释驱动的队列侦听器编写一个使用AWS SQS的web应用程序,下面是我的代码的样子: XML AWS bean: 然后我编写了这个类,它有一个带有SqsListener anotation的方法,它将hello打印到控制台: 这是我的gradle构建文件: 但是当我运行这个应用程序的时候,我对java和spring boot是个新手,有没有做错什么

  • 我试图让队列在laravel 5中工作,队列侦听器正在输出: 未定义索引:表 存在"作业"和"failed_jobs"表,config.php设置为"数据库"。 搜索laravel论坛和google都没有找到解决办法,艾米的想法去哪里找?

  • 问题内容: 我有一个项目,我们将在兔子中有数百个(可能是数千个)队列,并且这些队列中的每个队列都需要由一组消费者使用。 在Rabbit(使用spring-amqp)中,您具有rabbitlistener批注,该批注使我可以静态分配此特定消费者将要处理的队列。 我的问题是-对于兔子和春天,有没有一种干净的方法可以让我抓取一部分队列(比如说以ac开头的队列),然后还监听使用者运行时创建的任何队列。 示

  • 我需要在一定的持续时间后将消息发送给MessageListener,所以有没有任何方法可以使用SpringAMQP实现。 如。Producer生成消息并将消息发送到RabbitMQ Q,该消息立即被侦听器接收到,我想延迟消费者端接收到的消息,比如说在一些配置参数(比如1000ms)之后