rocketmq-spring-boot-starter之消费者消息确认

蔚元明
2023-12-01

       最近搞了一下rocketmq消息中间件,使用了Apache提供的与springboot封装的rocketmq-spring-boot-starter,版本是2.1.0的,生产者使用方式和其他的消息中间件一样如下:

@Resource
    private RocketMQTemplate rocketMQTemplate;

    @GetMapping(value = "/mq/{name}")
    public String qmtest(@PathVariable("name")String name) throws Exception {
        // 发送消息到一个Broker
        SendResult sendResult = rocketMQTemplate.syncSend("TopicTest", name);
        // 通过sendResult返回消息是否成功送达
        System.out.printf("%s%n", sendResult);
        return null;
    }

       消费者却提供了很多监听的类,主要在org.apache.rocketmq.spring.core这个路径下面,这里就不一一的赘述了,我想要说的问题是在这些监听接口中并没有提供一个消息消费是否成功确认机制,这个就有点让人抓狂了,我如果消息不能正确消费我想要重发怎么整,这个不靠谱啊,我们看下其中一个消费实现的接口方式

@Component
@Slf4j
@RocketMQMessageListener(consumerGroup = "${carInInfo.topic}",
        topic = "${carInInfo.topic}", selectorExpression = "*",
        consumeMode = ConsumeMode.ORDERLY)
public class CarInParkSynThirdMQ implements RocketMQListener<AddCarInParkDTO> {
   
    /**
     * 请不要捕获异常信息,否则无法进行消息重新推送
     *
     * @param addCarInParkDTO
     */
    @Override
    public void onMessage(AddCarInParkDTO addCarInParkDTO) {
        System.out.println("收到消息:" + JSON.toJSONString(addCarInParkDTO));
    }
}

       实现方式很简单吧,但是你也看见了代码中就没有消息能够消费是否成功后的确认方式,因为实现的onMessage()方法是个void的,还好看过原始的rocketmq的消费者实现方式,也就是rocketmq-client.jar的实现,它是MessageListener.java类来实现消息监听接收的,而它有2个继承接口类MessageListenerConcurrently.java和MessageListenerOrderly.java,这样就好找了,直接收一下这2个接口的实现类,乖乖,果然找到了在rocket-spring-boot的jar里面,就是DefaultRocketMQListenerContainer.java这个类,看下其中一个实现

public class DefaultMessageListenerOrderly implements MessageListenerOrderly {

        @SuppressWarnings("unchecked")
        @Override
        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
            for (MessageExt messageExt : msgs) {
                log.debug("received msg: {}", messageExt);
                try {
                    long now = System.currentTimeMillis();
                    handleMessage(messageExt);
                    long costTime = System.currentTimeMillis() - now;
                    log.info("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
                } catch (Exception e) {
                    log.warn("consume message failed. messageExt:{}", messageExt, e);
                    context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis);
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }
            }

            return ConsumeOrderlyStatus.SUCCESS;
        }
    }

       看到没有,只要没有异常出现,那么就会消费成功,有异常出现了就重新进行发送,那这个又是在哪里调用的呢?再看下这个private方法就明白了

private void initRocketMQPushConsumer() throws MQClientException {
       ......

        switch (consumeMode) {
            case ORDERLY:
                consumer.setMessageListener(new DefaultMessageListenerOrderly());
                break;
            case CONCURRENTLY:
                consumer.setMessageListener(new DefaultMessageListenerConcurrently());
                break;
            default:
                throw new IllegalArgumentException("Property 'consumeMode' was wrong.");
        }

        if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) {
            ((RocketMQPushConsumerLifecycleListener) rocketMQListener).prepareStart(consumer);
        } else if (rocketMQReplyListener instanceof RocketMQPushConsumerLifecycleListener) {
            ((RocketMQPushConsumerLifecycleListener) rocketMQReplyListener).prepareStart(consumer);
        }

    }

      同样在DefaultRocketMQListenerContainer.java这个类里面进行了调用设置,因为这个类实现了spring的InitializingBean.java接口,因此在afterPropertiesSet()方法里进行了初始化,所以现在我们就可以放心的进行消费了,当然你也可以记录一条消息的消费次数,如果出现异常次数过来,你可以记录在一个专门的表里,同时也要捕获异常,不让消息重发了。

 类似资料: