当前位置: 首页 > 知识库问答 >
问题:

RabbitMQ+SpringAMPQ:在SimpleMessageListenerContainer中接收多个队列

戎劲
2023-03-14

我目前正在学习一些关于RabbitMQ+SpringAMQ的知识,我试图使我的SimpleMessageListenerContainer能够读取传递到我的两个队列的消息,但只有一个队列在接收消息。如果你有任何关于代码的进一步提示,或者如果有任何方法可以更好地做它,请让我知道这将是很好的。

编辑:问题是代码没有将消息发送给两个队列,只发送给一个队列。

基本上我是从两个队列接收的,一个在数据库中插入一些东西,另一个将接收一个验证请求。我希望稍后向路由密钥和交换机发送响应。

下面是我的application.java:

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class ChallengeApplication {

    public static void main(String[] args) {
        SpringApplication.run(ChallengeApplication.class, args);
    }

    static final String responseExchange = "response.exchange";
    static final String routingKey = "response.routing.key";
    static final String queueInsertion = "insertion.queue";
    static final String queueValidation = "validation.queue";

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        return connectionFactory;
    }

    @Bean
    Queue queueInsertion() {
        return new Queue(queueInsertion, true);
    }

    @Bean
    Queue queueValidation() {
        return new Queue(queueValidation, true);
    }

    @Bean
    DirectExchange exchange() {
        return new DirectExchange(responseExchange);
    }

    @Bean
    Binding bindingInsertion(@Qualifier("queueInsertion") Queue queue, DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(queueInsertion);
    }

    @Bean
    Binding queueValidation(@Qualifier("queueValidation") Queue queue, DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(queueValidation);
    }

    @Bean
    SimpleMessageListenerContainer containerValidation(ConnectionFactory connectionFactory,
            MessageListenerAdapter listenerAdapter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setConcurrentConsumers(3);
        container.setQueueNames(queueValidation, queueInsertion);
        container.setMessageListener(listenerAdapter);
        return container;
    }


    @Bean
    MessageListenerAdapter listenerAdapter(Receiver receiver) {
        return new MessageListenerAdapter(receiver, "receiveMessage");
    }

}

正如您所看到的,我的SimpleMessageListenerContainer在setQueueNames中有两个我想要侦听的队列。并且我的MessageListenerAdapter将其发送到Receiver.java到方法receiveMessage,另一个问题是如何读取从哪个队列接收消息?这是可能的,还是我应该使用@RabbitListener来将它指向每个地方?

receiver.java

import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.regex.Pattern;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DataAccessException;
import org.springframework.stereotype.Component;

import com.axur.challenge.DAO.WhitelistDAO;
import com.axur.challenge.formatters.InputData;
import com.axur.challenge.model.Whitelist;
import com.google.gson.Gson;
import com.google.gson.JsonParseException;

@Component
public class Receiver {

    @Autowired
    private WhitelistDAO whitelistDAO;

    private CountDownLatch latch = new CountDownLatch(1);

    public void receiveMessage(String message) {
        try {
            InputData inputData = new Gson().fromJson(message, InputData.class);
            String url = inputData.getUrl();
            if (url == null) {
                receivedInsertion(inputData);
            } else {
                receivedValidation(inputData);
            }
        } catch (JsonParseException e) {
            System.out.println("It was not possible to read the input");
        }
        latch.countDown();
    }

    public void receivedInsertion(InputData inputData) {        
        Whitelist whitelist = new Whitelist();
        String client = inputData.getClient();

        if (client == null) {
            whitelist.setClient("global");
        } else {
            whitelist.setClient(inputData.getClient());
        }

        whitelist.setRegex(inputData.getRegex());

        System.out.println("Whitelist: " + whitelist);

        try {
            Whitelist selectedWhitelist = whitelistDAO.getSpecificWhitelist(whitelist.getClient(), whitelist.getRegex());
            System.out.println("selectedWhitelist: " + selectedWhitelist);
            if (selectedWhitelist == null || !selectedWhitelist.equals(whitelist)) {
                whitelistDAO.insertWhitelist(whitelist);
                System.out.println("Whitelist Added");
            } else {
                System.out.println("Already added before");
            }

        } catch (DataAccessException dae) {
            System.out.println("Whitelist NOT Added");
            System.err.println(dae);
        }

    }

    public void receivedValidation(InputData inputData) {
        //getting all regex from this client
        List<Whitelist> listWhitelist = whitelistDAO.getWhitelist(inputData.getClient(), inputData.getRegex());
        boolean match = false;
        //verify if the regex works for the url provided
        if (listWhitelist != null) {
            int index = 0;
            while (match == false && index < listWhitelist.size()) {
                match = checkRegex(listWhitelist.get(index).getRegex(), inputData.getUrl());
                index++;
            }
        }

        if (match == true) {

        } else {

        }
    }

    public boolean checkRegex(String regex, String url) {
        return Pattern.compile(regex).matcher(url).find();
    }

    public CountDownLatch getLatch() {
        return latch;
    }

}

这是runner.java,我在其中向每个队列发送消息以对其进行测试:

import java.util.concurrent.TimeUnit;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

@Component
public class Runner implements CommandLineRunner {

    private final RabbitTemplate rabbitTemplate;
    private final Receiver receiver;

    public Runner(Receiver receiver, RabbitTemplate rabbitTemplate) {
        this.receiver = receiver;
        this.rabbitTemplate = rabbitTemplate;
    }

    @Override
    public void run(String... args) throws Exception {
            System.out.println("Sending message...");
            rabbitTemplate.convertAndSend(ChallengeApplication.responseExchange, ChallengeApplication.queueInsertion, "{'client':null, 'regex':'[a-z]'}");
            rabbitTemplate.convertAndSend(ChallengeApplication.responseExchange, ChallengeApplication.queueValidation, "{'client':null, 'regex':'[a-z]'}");
            //rabbitTemplate.convertAndSend(ChallengeApplication.responseExchange, ChallengeApplication.routingKey, "{'client':null, 'regex':'[a-z]'}");
            receiver.getLatch().await(3, TimeUnit.SECONDS);
    }
}

谢谢你抽出时间!

共有1个答案

满言
2023-03-14

在您的情况下,我认为您应该使用RabbitListner,并将队列的名称作为参数传递,如下所示:

 @RabbitListener(queues = RabbitTopicConfig.QUEUE_NAME)
    public void consumerDefaultMessage(final Message message){
        LOGGER.info("Recive Message with default configuration {}", message.toString());
    }

您应该在配置类中注入rabbitTemplate作为bean,在这种情况下,您可以在队列存在时使用许多listner:

    @Bean
    public RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(producerMessageConverter());
        return rabbitTemplate;
    }
 类似资料:
  • 我运行生产者,它生成N条消息,我在仪表板上看到它们。当我运行接收器时,它会接收来自队列的所有消息,并且队列为空。 我需要有多个生产者生成消息到同一个队列。多个客户从队列中接收消息。消息将被队列TTL删除。但是现在第一个接收者从队列中获取所有消息。我怎么能做到这一点?

  • 我有一个关于RabbitMQ队列的问题。我想在一个队列上发送两种类型的消息。 我知道,我可以创建两个不同的队列,并使用路由键将不同的消息发送到不同的队列。 但是我希望在一个队列上有两个消费者,并以某种方式将消费者与消息类型绑定。它是通过兔子队列驱动的事件,当客户端和核心是发布者和消费者时。 有可能吗?或者我应该使用不同的队列吗? 数据交换

  • 我有以下场景:有3个rabbitmq队列,生产者根据消息的优先级将消息推送到这些队列。(myqueue_high,myqueue_medium,myqueue_low)我希望有一个可以按顺序或优先级从这些队列中提取的单一使用者,即只要消息在那里,它就一直从高队列中提取。它是从介质中拉出来的。如果medium也是空的,它从Low拉出。 我如何实现这一点?我需要编写自定义组件吗?

  • 正如您在日志中看到的那样,流到了最后,然后这个容器崩溃了。不幸的是,异常跟踪中没有有用的信息。我在想为什么会这样。我试图将该流中的消息重定向到MessageChannel中,然后处理它们,但没有帮助。

  • 我有一个由第三方发布的JMS队列。我想在不同的机器上设置多个使用者,只有一台特定机器的使用者确认该队列上的消息。简而言之,如果特定机器的使用者没有接收到消息,那么该消息不应从队列中删除。这是可以实现的吗?

  • 我目前正在尝试使用RabbitMQ(具有出色的RabbitMQBundle)来处理大量的异步工作。 目标是让一个队列发布相同类型的消息,并让多个服务器上的X个工作者在同一时间内查看消息。 每个工人都要偷看一条消息,完成工作,然后偷看另一条消息,等等。 这里是我的conf: 在我的consumer中,我有一个日志文件中的条目和120秒的睡眠。 我启动了php app/console rabbitmq