当前位置: 首页 > 知识库问答 >
问题:

如何侦听动态创建的队列?

宰修能
2023-03-14

我有一个rabbitListener,它连续异步地侦听队列“用户消息”的用户消息。除非队列中加载了大量消息,否则一切都正常。当消息批量发布到队列时,同一用户的消息首先被处理,从而其他用户的消息等待轮到他们。

我无法使用优先级队列,因为所有用户的优先级都相同。所以我想创建新的队列,并在运行时监听它们。一旦消息被使用,所有队列都将是短暂的。(队列将被删除)

在浏览时,我发现可以使用RabbitAdmin动态创建队列。但问题是

  1. 如何让我的听众收听在运行时创建的新短活(TTL)队列?
  2. 如何使侦听器停止监听已删除的队列(在TTL时间之后)以避免异常?

目前,我正在使用SimpleMessageListenerContainerFactory。我也可以使用DirectMessageListenerContainer。我唯一关心的是如何就动态队列创建进行通信

spring amqp是否有支持启动/停止侦听动态队列的方法。提前谢谢。

    @Bean
    public SimpleRabbitListenerContainerFactory myRabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(config.getConnectionFactory());
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        factory.setConcurrentConsumers(1);
        factory.setMaxConcurrentConsumers(3);
        return factory;
    }

    @RabbitListener(id = "listener", queues = {
            "#{receiver.queues()}" }, containerFactory = "myRabbitListenerContainerFactory")
    public void listen(QueueMessage message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag,
            MessageHeaders headers) {
         //process message
    }


  [1]: https://www.rabbitmq.com/event-exchange.html

共有1个答案

南宫泓
2023-03-14

这个怪人似乎正是这么做的=

链接中的代码:

  • rabbitMQ配置
@Configuration
public class RabbitMqConfiguration implements RabbitListenerConfigurer {
    @Autowired
    private ConnectionFactory connectionFactory;
    @Bean
    public Jackson2JsonMessageConverter producerJackson2MessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
    @Bean
    public MappingJackson2MessageConverter consumerJackson2MessageConverter() {
        return new MappingJackson2MessageConverter();
    }
    @Bean
    public RabbitTemplate rabbitTemplate() {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(producerJackson2MessageConverter());
        return rabbitTemplate;
    }
    @Bean
    public RabbitAdmin rabbitAdmin() {
        return new RabbitAdmin(connectionFactory);
    }
    @Bean
    public RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry() {
        return new RabbitListenerEndpointRegistry();
    }
    @Bean
    public DefaultMessageHandlerMethodFactory messageHandlerMethodFactory() {
        DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
        factory.setMessageConverter(consumerJackson2MessageConverter());
        return factory;
    }
    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
    @Override
    public void configureRabbitListeners(final RabbitListenerEndpointRegistrar registrar) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setPrefetchCount(1);
        factory.setConsecutiveActiveTrigger(1);
        factory.setConsecutiveIdleTrigger(1);
        factory.setConnectionFactory(connectionFactory);
        registrar.setContainerFactory(factory);
        registrar.setEndpointRegistry(rabbitListenerEndpointRegistry());
        registrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory());
    }
}
  • 接口
public interface RabbitQueueService {
    void addNewQueue(String queueName,String exchangeName,String routingKey);
    void addQueueToListener(String listenerId,String queueName);
    void removeQueueFromListener(String listenerId,String queueName);
    Boolean checkQueueExistOnListener(String listenerId,String queueName);
}
  • 服务
@Service
@Log4j2
public class RabbitQueueServiceImpl implements RabbitQueueService {
    @Autowired
    private RabbitAdmin rabbitAdmin;
    @Autowired
    private RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry;

    @Override
    public void addNewQueue(String queueName, String exchangeName, String routingKey) {
        Queue queue = new Queue(queueName, true, false, false);
        Binding binding = new Binding(
                queueName,
                Binding.DestinationType.QUEUE,
                exchangeName,
                routingKey,
                null
        );
        rabbitAdmin.declareQueue(queue);
        rabbitAdmin.declareBinding(binding);
        this.addQueueToListener(exchangeName,queueName);
    }

    @Override
    public void addQueueToListener(String listenerId, String queueName) {
        log.info("adding queue : " + queueName + " to listener with id : " + listenerId);
        if (!checkQueueExistOnListener(listenerId,queueName)) {
            this.getMessageListenerContainerById(listenerId).addQueueNames(queueName);
            log.info("queue ");
        } else {
            log.info("given queue name : " + queueName + " not exist on given listener id : " + listenerId);
        }
    }

    @Override
    public void removeQueueFromListener(String listenerId, String queueName) {
        log.info("removing queue : " + queueName + " from listener : " + listenerId);
        if (checkQueueExistOnListener(listenerId,queueName)) {
            this.getMessageListenerContainerById(listenerId).removeQueueNames(queueName);
            log.info("deleting queue from rabbit management");
            this.rabbitAdmin.deleteQueue(queueName);
        } else {
            log.info("given queue name : " + queueName + " not exist on given listener id : " + listenerId);
        }
    }

    @Override
    public Boolean checkQueueExistOnListener(String listenerId, String queueName) {
        try {
            log.info("checking queueName : " + queueName + " exist on listener id : " + listenerId);
            log.info("getting queueNames");
            String[] queueNames = this.getMessageListenerContainerById(listenerId).getQueueNames();
            log.info("queueNames : " + new Gson().toJson(queueNames));
            if (queueNames != null) {
                log.info("checking " + queueName + " exist on active queues");
                for (String name : queueNames) {
                    log.info("name : " + name + " with checking name : " + queueName);
                    if (name.equals(queueName)) {
                        log.info("queue name exist on listener, returning true");
                        return Boolean.TRUE;
                    }
                }
                return Boolean.FALSE;
            } else {
                log.info("there is no queue exist on listener");
                return Boolean.FALSE;
            }
        } catch (Exception e) {
            log.error("Error on checking queue exist on listener");
            log.error("error message : " + ExceptionUtils.getMessage(e));
            log.error("trace : " + ExceptionUtils.getStackTrace(e));
            return Boolean.FALSE;
        }
    }

    private AbstractMessageListenerContainer getMessageListenerContainerById(String listenerId) {
        log.info("getting message listener container by id : " + listenerId);
        return ((AbstractMessageListenerContainer) this.rabbitListenerEndpointRegistry
                .getListenerContainer(listenerId)
        );
    }
}
 类似资料:
  • 兔子配置: 应用概述:每当gitRepository连接到我们的应用程序时,存储库名称就会成为交换名称,在这种情况下,然后存储库的每个分支都会创建自己的队列,这里有两个队列和。现在每次在开发分支中创建拉取请求时,我都需要将信息传递给开发队列,并且应该由特定的侦听器侦听,该侦听器应该仅注册用于开发。我看到了动态队列的示例,但我似乎找不到任何关于如何创建将使用不同线程执行的动态侦听器的示例,我如何实现

  • 问题内容: 这是我的情况: 当用户登录到我的网站时,我为给定的用户排队执行一系列任务(通常每个任务花费100毫秒的时间,每个用户有100毫秒的任务)。这些任务排队到默认的Celery队列中,而我有100的工人正在运行。我使用websockets在后端完成任务时向用户显示实时进度。如果我只有1个或2个用户处于活动状态,那么生活会很好。 现在,如果我有几个并发用户登录到我的站点,则后一个用户将排在初始

  • 问题内容: 是否可以向所有动态生成的元素添加事件侦听器(Javascript)?我不是页面的所有者,因此无法以静态方式添加侦听器。 对于页面加载时创建的所有元素,我使用: 当页面上出现新元素时,我需要一种方法来调用此代码,但是我无法使用jQuery(在项目中无法使用elegate,on等)。我怎样才能做到这一点? 问题答案: 听起来您需要执行委派策略而又不退回图书馆。我在此处的小提琴中发布了一些示

  • 我有一个项目,我们将在rabbit中有数百个(可能数千个)队列,每个队列都需要一个消费者池来使用。 在rabbit(使用spring amqp)中,您有rabbitlistener注释,它允许我静态地分配这个特定消费者将处理的队列。 我的问题是,对于rabbit和spring,是否有一种干净的方法可以让我获取一段队列(比如以a-c开头的队列),然后还可以侦听消费者运行时创建的任何队列。 示例(开始

  • 问题内容: 假设我使用statement定义了一些变量。代码运行时,变量的值会更改。 如何跟踪此变量的变化?我如何实现某些行为类似于onSomeVariableChangedListener的侦听器? 我还需要知道何时在一个页面中执行了其他方法,以便可以在另一个类中设置侦听器。 问题答案: 这是将变量隐藏在 setter / getter 对后面的众多原因之一。然后,在设置器中,您可以通知您的侦听

  • 问题内容: 我有一个项目,我们将在兔子中有数百个(可能是数千个)队列,并且这些队列中的每个队列都需要由一组消费者使用。 在Rabbit(使用spring-amqp)中,您具有rabbitlistener批注,该批注使我可以静态分配此特定消费者将要处理的队列。 我的问题是-对于兔子和春天,有没有一种干净的方法可以让我抓取一部分队列(比如说以ac开头的队列),然后还监听使用者运行时创建的任何队列。 示