当前位置: 首页 > 工具软件 > xxl-mq > 使用案例 >

xxl-job如何去控制rabbitmq消费者的消费?

陶泳
2023-12-01

领导安排指定xxl-job去控制消费者的消费,通过百度知道rabibtmq中注解autoStartup是可以控制消费者去消费的,默认是true,如果是false,队列可以接受数据,但是不消费,根据这个注解,我在xxl-job中配置一个开关

1.开启消费者的能力开关

 

同时将消费者中注解的 autoStartup属性置为rabbitmqFlag

@RabbitListener(queues = {"${spring.rabbitmq.queue}"}, concurrency = "1", autoStartup= "rabbitmqFlag")

此时已经配置完成了,但是@RabbitListener注解其实底层就是用RabbitListenerEndpointRegistry,可以自行百度了解,通过方法.start()就会开启队列,这样就可以正常跑任务了

2.暂停消费者的消费能力

private void closeQueue(String queueName){
    //消费者关闭
    rabbitmqFlag = false;
    //关闭消费者
    stop(queueName);
}
/**
 * 判断监听器是否监听了指定的队列。
 * @param queueName 队列名称
 * @param listenerContainer 监听容器
 * @return true-监听,false-未监听。
 */
private boolean isQueueListener(String queueName, MessageListenerContainer listenerContainer) {
    if (listenerContainer instanceof AbstractMessageListenerContainer) {
        AbstractMessageListenerContainer abstractMessageListenerContainer = (AbstractMessageListenerContainer) listenerContainer;
        String[] queueNames = abstractMessageListenerContainer.getQueueNames();
        return ArrayUtils.contains(queueNames, queueName);
    }
    return false;
}

/**
 * 停止指定队列()
 * @param queueName 队列名称
 * @return true-监听,false-未监听。
 */
public boolean stop(String queueName) {
    Collection<MessageListenerContainer> listenerContainers = registry.getListenerContainers();
    for (MessageListenerContainer listenerContainer : listenerContainers) {
        if (this.isQueueListener(queueName, listenerContainer)) {
            listenerContainer.stop();
            return true;
        }
    }
    return false;
}

ok完美解决了

 类似资料: