RocketMQ, Part 3: Consumer Startup

娄嘉石
2023-12-01

Let's start with the definition of RocketMQAutoConfiguration:

@Configuration
@EnableConfigurationProperties(RocketMQProperties.class)
@ConditionalOnClass({ MQAdmin.class, ObjectMapper.class })
@ConditionalOnProperty(prefix = "rocketmq", value = "name-server")
@Import({ JacksonFallbackConfiguration.class, ListenerContainerConfiguration.class })
@AutoConfigureAfter(JacksonAutoConfiguration.class)
public class RocketMQAutoConfiguration {
    ...
}

The @Import annotation pulls in two configuration classes. JacksonFallbackConfiguration is JSON-related and can be ignored for now; the one to focus on is ListenerContainerConfiguration:

@Configuration
public class ListenerContainerConfiguration implements ApplicationContextAware, SmartInitializingSingleton {
    ...
}

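It implements ApplicationContextAware and SmartInitializingSingleton. The Aware interface is straightforward: it gives the class access to the applicationContext.

SmartInitializingSingleton requires implementing afterSingletonsInstantiated(), which Spring calls once all non-lazy singleton beans in the container have been instantiated: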

@Override
public void afterSingletonsInstantiated() {
    Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class);

    if (Objects.nonNull(beans)) {
        beans.forEach(this::registerContainer);
    }
}

It first fetches every bean annotated with @RocketMQMessageListener from the container, then calls registerContainer on each one:

private void registerContainer(String beanName, Object bean) {
    Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);

    if (!RocketMQListener.class.isAssignableFrom(bean.getClass())) {
        throw new IllegalStateException(clazz + " is not instance of " + RocketMQListener.class.getName());
    }

    RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class);
    validate(annotation);

    String containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(),
        counter.incrementAndGet());
    GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext;

    genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class,
        () -> createRocketMQListenerContainer(bean, annotation));
    DefaultRocketMQListenerContainer container = genericApplicationContext.getBean(containerBeanName,
        DefaultRocketMQListenerContainer.class);
    if (!container.isRunning()) {
        try {
            container.start();
        } catch (Exception e) {
            log.error("Started container failed. {}", container, e);
            throw new RuntimeException(e);
        }
    }

    log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName);
}

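For each user-defined bean annotated with @RocketMQMessageListener, a DefaultRocketMQListenerContainer is constructed and registered in the Spring context under a unique bean name. container.start() is then called to start each container.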
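
The register-then-start pattern described above can be sketched without Spring or RocketMQ on the classpath. This is a minimal illustration only: `DemoListener`, `DemoContainer`, and the bean map are hypothetical stand-ins for @RocketMQMessageListener, DefaultRocketMQListenerContainer, and the application context, not the library's own types.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

public class ContainerRegistrationSketch {

    /** Hypothetical stand-in for @RocketMQMessageListener. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface DemoListener {
        String topic();
    }

    /** Hypothetical stand-in for DefaultRocketMQListenerContainer. */
    static class DemoContainer {
        final String topic;
        private boolean running;

        DemoContainer(String topic) {
            this.topic = topic;
        }

        void start() {
            running = true;
        }

        boolean isRunning() {
            return running;
        }
    }

    @DemoListener(topic = "orders")
    static class OrderListener { }

    @DemoListener(topic = "payments")
    static class PaymentListener { }

    /** A bean without the annotation; it must be ignored. */
    static class PlainBean { }

    private final AtomicLong counter = new AtomicLong();

    /** Plays the role of the application context's bean registry. */
    final Map<String, DemoContainer> containers = new LinkedHashMap<>();

    /** One container per annotated bean, registered under a unique name, then started. */
    void registerAll(Map<String, Object> beans) {
        beans.forEach((beanName, bean) -> {
            DemoListener annotation = bean.getClass().getAnnotation(DemoListener.class);
            if (annotation == null) {
                return; // not a listener bean
            }
            String containerName = String.format("%s_%s",
                DemoContainer.class.getName(), counter.incrementAndGet());
            DemoContainer container = new DemoContainer(annotation.topic());
            containers.put(containerName, container);
            if (!container.isRunning()) {
                container.start();
            }
        });
    }

    public static void main(String[] args) {
        Map<String, Object> beans = new LinkedHashMap<>();
        beans.put("orderListener", new OrderListener());
        beans.put("plainBean", new PlainBean());
        beans.put("paymentListener", new PaymentListener());

        ContainerRegistrationSketch sketch = new ContainerRegistrationSketch();
        sketch.registerAll(beans);
        sketch.containers.forEach((name, c) ->
            System.out.println(name + " -> topic=" + c.topic + ", running=" + c.isRunning()));
    }
}
```

The counter suffix is what keeps container bean names unique when several listeners exist in the same application.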

First, the definition of DefaultRocketMQListenerContainer:

public class DefaultRocketMQListenerContainer implements InitializingBean,
    RocketMQListenerContainer, SmartLifecycle, ApplicationContextAware {
    ...
}

It implements InitializingBean, so look at its afterPropertiesSet method:

@Override
public void afterPropertiesSet() throws Exception {
    initRocketMQPushConsumer();

    this.messageType = getMessageType();
    log.debug("RocketMQ messageType: {}", messageType.getName());
}

initRocketMQPushConsumer initializes the push-model Consumer object (a DefaultMQPushConsumer).

Now back to the container.start() method:
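
The body of getMessageType() is not shown in this article. One plausible way to derive a listener's message type is to read the type argument of its generic interface through reflection. The sketch below assumes that approach; `DemoListener` is a hypothetical stand-in for RocketMQListener, and the fallback to Object.class is a choice made for this example.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

public class MessageTypeSketch {

    /** Hypothetical stand-in for RocketMQListener<T>. */
    interface DemoListener<T> {
        void onMessage(T message);
    }

    static class OrderListener implements DemoListener<Integer> {
        @Override
        public void onMessage(Integer message) { }
    }

    /** Implements the raw interface, so no type argument is available at runtime. */
    @SuppressWarnings("rawtypes")
    static class RawListener implements DemoListener {
        @Override
        public void onMessage(Object message) { }
    }

    /** Returns the T in DemoListener<T> when declared directly, otherwise Object.class. */
    static Class<?> resolveMessageType(Object listener) {
        for (Type type : listener.getClass().getGenericInterfaces()) {
            if (type instanceof ParameterizedType) {
                ParameterizedType parameterized = (ParameterizedType) type;
                if (parameterized.getRawType() == DemoListener.class) {
                    Type argument = parameterized.getActualTypeArguments()[0];
                    if (argument instanceof Class) {
                        return (Class<?>) argument;
                    }
                }
            }
        }
        return Object.class; // assumption: fall back to Object when T cannot be resolved
    }

    public static void main(String[] args) {
        System.out.println(resolveMessageType(new OrderListener()).getName());
        System.out.println(resolveMessageType(new RawListener()).getName());
    }
}
```

This only inspects interfaces declared directly on the listener class; a listener that inherits the interface through a superclass would need a walk up the class hierarchy.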

@Override
public void start() {
    if (this.isRunning()) {
        throw new IllegalStateException("container already running. " + this.toString());
    }

    try {
        consumer.start();
    } catch (MQClientException e) {
        throw new IllegalStateException("Failed to start RocketMQ push consumer", e);
    }
    this.setRunning(true);

    log.info("running container: {}", this.toString());
}

This simply delegates to the start method of the consumer object:

@Override
public void start() throws MQClientException {
    this.defaultMQPushConsumerImpl.start();
    if (null != traceDispatcher) {
        try {
            traceDispatcher.start(this.getNamesrvAddr());
        } catch (MQClientException e) {
            log.warn("trace dispatcher start failed ", e);
        }
    }
}

Setting message tracing aside for now, look at defaultMQPushConsumerImpl.start():

public synchronized void start() throws MQClientException {
    switch (this.serviceState) {
        case CREATE_JUST:
            log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
                this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
            this.serviceState = ServiceState.START_FAILED;

            this.checkConfig();

            this.copySubscription();

            if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                this.defaultMQPushConsumer.changeInstanceNameToPID();
            }

            this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);

            this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
            this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
            this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
            this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);

            this.pullAPIWrapper = new PullAPIWrapper(
                mQClientFactory,
                this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
            this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);

            if (this.defaultMQPushConsumer.getOffsetStore() != null) {
                this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
            } else {
                switch (this.defaultMQPushConsumer.getMessageModel()) {
                    case BROADCASTING:
                        this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                        break;
                    case CLUSTERING:
                        this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                        break;
                    default:
                        break;
                }
                this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
            }
            this.offsetStore.load();

            if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                this.consumeOrderly = true;
                this.consumeMessageService =
                    new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
            } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                this.consumeOrderly = false;
                this.consumeMessageService =
                    new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
            }

            this.consumeMessageService.start();

            boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
            if (!registerOK) {
                this.serviceState = ServiceState.CREATE_JUST;
                this.consumeMessageService.shutdown();
                throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
                    + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                    null);
            }

            mQClientFactory.start();
            log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
            this.serviceState = ServiceState.RUNNING;
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
                + this.serviceState
                + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                null);
        default:
            break;
    }

    this.updateTopicSubscribeInfoWhenSubscriptionChanged();
    this.mQClientFactory.checkClientInBroker();
    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
    this.mQClientFactory.rebalanceImmediately();
}
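
One detail in the switch above is easy to miss: serviceState is set to START_FAILED before any initialization runs and only becomes RUNNING at the very end. If any step throws, the consumer stays in START_FAILED and a later start() call is rejected. A minimal sketch of that guard follows; the class name and the `initSteps` parameter are hypothetical, and the real method also resets the state to CREATE_JUST when group registration fails, which is left out here.

```java
public class StartGuardSketch {

    enum ServiceState { CREATE_JUST, RUNNING, START_FAILED, SHUTDOWN_ALREADY }

    private ServiceState state = ServiceState.CREATE_JUST;

    ServiceState getState() {
        return state;
    }

    /** Runs initSteps at most once; any state other than CREATE_JUST is rejected. */
    synchronized void start(Runnable initSteps) {
        switch (state) {
            case CREATE_JUST:
                // Mark as failed first, so an exception below leaves a non-startable state.
                state = ServiceState.START_FAILED;
                initSteps.run();
                state = ServiceState.RUNNING;
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new IllegalStateException(
                    "service state not OK, maybe started once: " + state);
            default:
                break;
        }
    }

    public static void main(String[] args) {
        StartGuardSketch service = new StartGuardSketch();
        service.start(() -> System.out.println("initializing"));
        System.out.println("state after first start: " + service.getState());
        try {
            service.start(() -> { });
        } catch (IllegalStateException e) {
            System.out.println("second start rejected: " + e.getMessage());
        }
    }
}
```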

The switch mainly obtains the mQClientFactory and starts it, which was already analyzed in the Producer article. Now look at the calls after the switch:

  • this.updateTopicSubscribeInfoWhenSubscriptionChanged();

    Refreshes the route information for the subscribed topics.

  • this.mQClientFactory.checkClientInBroker();

    public void checkClientInBroker() throws MQClientException {
        Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
    
        while (it.hasNext()) {
            Entry<String, MQConsumerInner> entry = it.next();
            Set<SubscriptionData> subscriptionInner = entry.getValue().subscriptions();
            if (subscriptionInner == null || subscriptionInner.isEmpty()) {
                return;
            }
    
            for (SubscriptionData subscriptionData : subscriptionInner) {
                if (ExpressionType.isTagType(subscriptionData.getExpressionType())) {
                    continue;
                }
                // may need to check one broker every cluster...
                // assume that the configs of every broker in cluster are the the same.
                String addr = findBrokerAddrByTopic(subscriptionData.getTopic());
    
                if (addr != null) {
                    try {
                        this.getMQClientAPIImpl().checkClientInBroker(
                            addr, entry.getKey(), this.clientId, subscriptionData, 3 * 1000
                        );
                    } catch (Exception e) {
                        if (e instanceof MQClientException) {
                            throw (MQClientException) e;
                        } else {
                            throw new MQClientException("Check client in broker error, maybe because you use "
                                + subscriptionData.getExpressionType() + " to filter message, but server has not been upgraded to support!"
                                + "This error would not affect the launch of consumer, but may has impact on message receiving if you " +
                                "have use the new features which are not supported by server, please check the log!", e);
                        }
                    }
                }
            }
        }
    }
    

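    This sends a client check to the broker. Note that TAG-type subscriptions hit the continue and are skipped, so the request is only sent for other expression types. Here is the checkClientInBroker method of MQClientAPIImpl: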

    public void checkClientInBroker(final String brokerAddr, final String consumerGroup,
        final String clientId, final SubscriptionData subscriptionData,
        final long timeoutMillis)
        throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
        RemotingConnectException, MQClientException {
        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CHECK_CLIENT_CONFIG, null);
    
        CheckClientRequestBody requestBody = new CheckClientRequestBody();
        requestBody.setClientId(clientId);
        requestBody.setGroup(consumerGroup);
        requestBody.setSubscriptionData(subscriptionData);
    
        request.setBody(requestBody.encode());
    
        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), brokerAddr), request, timeoutMillis);
    
        assert response != null;
    
        if (ResponseCode.SUCCESS != response.getCode()) {
            throw new MQClientException(response.getCode(), response.getRemark());
        }
    }
    

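    This just builds a request and sends it. The calling pattern is the same as when a user sends an ordinary message; only the constructed command differs. The Netty communication details are encapsulated here, and upper layers see uniform methods such as remotingClient.invokeSync and remotingClient.invokeAsync. A separate article will analyze the RemotingClient communication layer in detail.

    One point to understand here: the earlier mQClientFactory start only initialized the Netty client object and did not connect to any particular broker. The connection between the client and a broker is established when a request like this one is actually sent, and messages for this consumer are then delivered to the client over that connection. (RocketMQ's push consumer is implemented on the client side with long-polling pull requests, so "push" describes the programming model rather than broker-initiated delivery.)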

  • this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();

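    Sends one heartbeat up front.

  • this.mQClientFactory.rebalanceImmediately();

    Performs one rebalance right away.

As we saw earlier, mQClientFactory.start schedules tasks for both of these, but every scheduled task begins after a delay. Calling them directly here lets the new consumer take effect immediately.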
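
The "run once now, then let the scheduler take over" idea can be sketched with a plain ScheduledExecutorService. The delay and period values below are illustrative only, not RocketMQ's actual settings, and `sendHeartbeat` is a hypothetical stand-in for the real heartbeat call.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ImmediateThenPeriodicSketch {

    final AtomicInteger heartbeats = new AtomicInteger();

    /** Hypothetical stand-in for sending a heartbeat to all brokers. */
    void sendHeartbeat() {
        heartbeats.incrementAndGet();
    }

    /** Schedules the periodic heartbeat; the first run happens only after initialDelayMs. */
    ScheduledExecutorService startPeriodic(long initialDelayMs, long periodMs) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "heartbeat-sketch");
            t.setDaemon(true); // do not keep the JVM alive for the demo
            return t;
        });
        scheduler.scheduleAtFixedRate(this::sendHeartbeat, initialDelayMs, periodMs,
            TimeUnit.MILLISECONDS);
        return scheduler;
    }

    public static void main(String[] args) {
        ImmediateThenPeriodicSketch client = new ImmediateThenPeriodicSketch();
        ScheduledExecutorService scheduler = client.startPeriodic(60_000, 60_000);

        // The scheduled task has not fired yet, so the broker would not know about us.
        System.out.println("before immediate call: " + client.heartbeats.get());

        // Calling the same action directly closes that gap.
        client.sendHeartbeat();
        System.out.println("after immediate call: " + client.heartbeats.get());

        scheduler.shutdownNow();
    }
}
```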

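Summary

Consumer startup covers essentially everything that Producer startup does. In addition, a Consumer object is built for each user-defined message listener. When that Consumer starts, it sends a check request to the relevant broker, and in doing so establishes the TCP connection between the two sides, ready to receive pushed messages.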
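
The lazy connection behaviour mentioned above, where starting the client opens nothing and the first request to a broker address creates the connection, can be sketched as follows. All names here (`Channel`, `channelTable`, the string-based `invokeSync`) are hypothetical simplifications and not the RemotingClient API.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class LazyConnectSketch {

    /** Hypothetical stand-in for an established TCP channel to one broker. */
    static class Channel {
        final String addr;

        Channel(String addr) {
            this.addr = addr;
        }
    }

    private final Map<String, Channel> channelTable = new ConcurrentHashMap<>();
    private volatile boolean started;

    /** Starting the client prepares it but opens no connection. */
    void start() {
        started = true;
    }

    int openConnections() {
        return channelTable.size();
    }

    /** The first request to an address creates the channel; later requests reuse it. */
    String invokeSync(String brokerAddr, String request) {
        if (!started) {
            throw new IllegalStateException("client not started");
        }
        Channel channel = channelTable.computeIfAbsent(brokerAddr, Channel::new);
        return "SUCCESS from " + channel.addr + " for " + request;
    }

    public static void main(String[] args) {
        LazyConnectSketch client = new LazyConnectSketch();
        client.start();
        System.out.println("connections after start: " + client.openConnections());

        System.out.println(client.invokeSync("broker-a:10911", "CHECK_CLIENT_CONFIG"));
        System.out.println("connections after first request: " + client.openConnections());
    }
}
```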
