以下只针对集群模式:
1 producer
默认情况下不需要设置instanceName,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标示
如果同一个jvm中,不同的producer需要往不同的rocketmq集群发送消息,需要设置不同的instanceName
原因如下:如果不设置instanceName,那么会使用ip@pid作为producer唯一标识,那么会导致多个producer内部只有一个MQClientInstance(与mq交互)实例,从而导致只往一个集群发消息。
2 consumer
默认情况下不需要设置instanceName,rocketmq会使用ip@pid作为instanceName(pid代表jvm名字)
如果设置instanceName,rocketmq会使用ip@instanceName作为consumer的唯一标示,此时需要注意instanceName需要不同。
3 consumer设置上instanceName后,无法集群消费的问题调查
应用场景:
一台机器上的多个consumer jvm进程消费整个集群的消息
问题说明:
由于集群模式下我们希望consumer能够平均消费整个集群的消息,但是设置上instanceName后,发现每个consumer都消费整个集群的消息。
查看开发指南,该参数说明如下:
参数名
默认值
说明
instanceName DEFAULT
客户端实例名称,客户端创建的多个 Producer、
Consumer 实际是共用一个内部实例(这个实例包含
网络连接、线程资源等)
问题调查:
一开始怀疑是cosumer的MessageModel设置为广播消费的原因导致的:
广播消费:一条消息被多个 Consumer 消费, 即使返些 Consumer 属亍同一个 Consumer Group,消息也会被 Consumer
Group 中的每个 Consumer 都消费一次
但是DefaultMQPushConsumer中MessageModel默认就是集群模式,故排除。
后来经过走查DefaultMQPushConsumer源码发现是由于rebalance策略问题导致的,consumer的rebalance会在首次启动时和之后每10秒一次,做rebalance操作,代码调用链如下:
首次启动rebalance代码如下:
DefaultMQPushConsumer.start -> DefaultMQPushConsumerImpl.start -> MQClientInstance.rebalanceImmediately -> RebalanceService.wakeup
之后每10秒一次的代码如下:
DefaultMQPushConsumer.start -> DefaultMQPushConsumerImpl.start -> MQClientInstance.start -> RebalanceService.start,启动rebalance线程
查看DefaultMQPushConsumer的rebalance策略,默认是AllocateMessageQueueAveragely,该策略是均衡消息队列到consumer,既然有rebalance,那为何没有做到平衡呢?
继续跟踪源码:
RebalanceService.start -> MQClientInstance.doRebalance -> DefaultMQPushConsumerImpl.doRebalance -> RebalanceImpl.doRebalance -> RebalanceImpl.rebalanceByTopic -> AllocateMessageQueueAveragely.allocate 此时才到真正的rebalance。
该方法参数说明如下:
/**
* Allocating by consumer id
*
* @param consumerGroup current consumer group
* @param currentCID current consumer id
* @param mqAll message queue set in current topic
* @param cidAll consumer set in current consumer group
* @return
*/
publicList allocate(String consumerGroup,String currentCID,List mqAll,List cidAll);
通过debug allocate方法发现,第二个参数需要MQClientInstance.clientId,这个由ClientConfig.buildMQClientId产生,产生规则是ip@instanceName
而instanceName即为consumer设置的,如果设置上这个参数,启动多个jvm进程,则currentCID都一样,而计算rebalance时如下代码导致每次将所有的queue分配到一个consumer上:
int index = cidAll.indexOf(currentCID);
假如不设置instanceName,ClientConfig.changeInstanceNameToPID会获取RuntimeMXBean.getName作为instanceName,而这个值对于多个jvm是不一样的,api解释如下:
RuntimeMXBean.getName:返回表示正在运行的 Java 虚拟机的名称。返回的名称字符串可以为任何任意字符串,Java 虚拟机实现可以选择在返回的名称字符串中嵌入特定于平台的有用信息。每个正在的运行的虚拟机可以具有不同的名称。
这样rebalance时就会平均分配到consumer上。
————————————————
版权声明:本文为CSDN博主「MQCloud」的原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/a417930422/article/details/50663629