Eclipse Paho MqttClient performance and MQTT(32202): Too many publishes in progress

孟健
2023-12-01

org.eclipse.paho.client.mqttv3

Test machine: 2.2 GHz Intel Core i7, macOS

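Publish performance (note: these figures are for an MqttClient used from a single thread)

10,000 messages: 341 ms

40,000 messages: 1163 ms

50,000 messages: 1450 ms

100,000 messages: 2700 ms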
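
To put these timings in perspective, a short calculation converts each measurement into messages per second. Only the four data points come from the measurements; the class and method names are illustrative, and the QoS level and payload size of the original test are not stated, so the result is only a rough indication.

```java
// Converts the measured publish timings into messages per second.
// Class and method names are illustrative; only the data points come from the text.
final class PublishThroughput {

    // Messages per second for `count` messages published in `millis` milliseconds.
    static double perSecond(long count, long millis) {
        return count * 1000.0 / millis;
    }

    public static void main(String[] args) {
        long[][] runs = { {10_000, 341}, {40_000, 1163}, {50_000, 1450}, {100_000, 2700} };
        for (long[] run : runs) {
            System.out.printf("%d messages in %d ms = %.0f msg/s%n",
                    run[0], run[1], perSecond(run[0], run[1]));
        }
    }
}
```

On these numbers a single publishing thread sustains roughly 29,000 to 37,000 messages per second.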

The MQTT(32202) "Too many publishes in progress" problem with a multi-threaded MqttClient

Exception

[15:07:21]: publish failed, message: aaaa

Too many publishes in progress (32202)

at org.eclipse.paho.client.mqttv3.internal.ClientState.send(ClientState.java:496)

at org.eclipse.paho.client.mqttv3.internal.ClientComms.internalSend(ClientComms.java:132)

at org.eclipse.paho.client.mqttv3.internal.ClientComms.sendNoWait(ClientComms.java:156)

at org.eclipse.paho.client.mqttv3.MqttAsyncClient.publish(MqttAsyncClient.java:1027)

at org.eclipse.paho.client.mqttv3.MqttClient.publish(MqttClient.java:399)

at io.communet.ichater.emq.util.MqttUtil.publishMsg(MqttUtil.java:171)

at io.communet.ichater.emq.util.MqttUtil.publishMsg(MqttUtil.java:161)

at io.communet.ichater.emq.sub.MqttSendMsgEventSubscribe.onEvent(MqttSendMsgEventSubscribe.java:28)

at java.lang.reflect.Method.invoke(Native Method)

at java.lang.reflect.Method.invoke(Method.java:372)

at org.greenrobot.eventbus.EventBus.invokeSubscriber(EventBus.java:507)

at org.greenrobot.eventbus.EventBus.invokeSubscriber(EventBus.java:501)

at org.greenrobot.eventbus.AsyncPoster.run(AsyncPoster.java:46)

at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1112)

at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:587)

at java.lang.Thread.run(Thread.java:818)

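Solutions

Rate-limit message sending.

Publish all MQTT messages from one dedicated thread. With the blocking MqttClient (rather than MqttAsyncClient), each publish call waits for completion, so a single publishing thread does not trigger the error.

Raise the limit with options.setMaxInflight(1000). This increases maxInflight, the threshold that actualInFlight is compared against.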
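
One way to approach the rate-limiting option is to cap the number of unacknowledged publishes with a semaphore. The sketch below uses only the JDK and makes several assumptions: `AsyncSender` is a hypothetical stand-in for an asynchronous publish call, its `onComplete` callback stands in for the delivery-complete notification, and `InflightLimiter` is a name made up for this example.

```java
import java.util.concurrent.Semaphore;

// Hypothetical stand-in for an asynchronous publish call; not a Paho type.
interface AsyncSender {
    // Starts sending and runs onComplete once the message has been acknowledged.
    void send(String payload, Runnable onComplete);
}

// Caps the number of publishes that are in flight at the same time.
final class InflightLimiter {
    private final Semaphore slots;
    private final AsyncSender sender;

    InflightLimiter(int maxInFlight, AsyncSender sender) {
        this.slots = new Semaphore(maxInFlight);
        this.sender = sender;
    }

    // Blocks the caller until a slot is free, then hands the message to the sender.
    void publish(String payload) {
        slots.acquireUninterruptibly();
        dispatch(payload);
    }

    // Non-blocking variant: returns false instead of waiting when the window is full.
    boolean tryPublish(String payload) {
        if (!slots.tryAcquire()) {
            return false;
        }
        dispatch(payload);
        return true;
    }

    private void dispatch(String payload) {
        try {
            // The slot is released when the sender reports completion.
            sender.send(payload, slots::release);
        } catch (RuntimeException e) {
            slots.release(); // the send never started, so free the slot again
            throw e;
        }
    }

    int availableSlots() {
        return slots.availablePermits();
    }
}
```

For this to help, the limiter's size would need to stay at or below the client's own maxInflight, so that callers wait in `publish` instead of running into the exception.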

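Reflection

The author ran into this error after switching to EventBus; the earlier implementation, which used a Handler bound to one dedicated thread, had no such problem. Investigation showed that EventBus runs the subscriber on newly created threads, whereas the Handler ran everything on a single thread.

So when a large number of messages are sent, EventBus dispatches them at almost the same moment, and the error occurs.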
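
The difference between the two dispatch styles can be reproduced with plain JDK executors. In the sketch below, a single-thread executor plays the role of the Handler thread and a cached thread pool plays the role of asynchronous EventBus delivery. The sleep is a hypothetical stand-in for a blocking publish waiting on the broker, and `DispatchComparison` is a name invented for this example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class DispatchComparison {

    // Runs `messages` simulated publishes on the executor and returns the
    // highest number of publishes that were active at the same time.
    static int peakConcurrency(ExecutorService executor, int messages) {
        AtomicInteger current = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        for (int i = 0; i < messages; i++) {
            executor.execute(() -> {
                int now = current.incrementAndGet();
                peak.accumulateAndGet(now, Math::max);
                sleepQuietly(20); // simulated wait for the broker's acknowledgement
                current.decrementAndGet();
            });
        }
        executor.shutdown();
        try {
            executor.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return peak.get();
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        // One worker thread: publishes are strictly serialized.
        System.out.println("single thread peak: "
                + peakConcurrency(Executors.newSingleThreadExecutor(), 20));
        // Thread pool: many publishes overlap, which is what fills the in-flight window.
        System.out.println("thread pool peak:   "
                + peakConcurrency(Executors.newCachedThreadPool(), 20));
    }
}
```

With one worker thread the peak is always 1. With the pool, the peak depends on scheduling but is typically close to the number of messages, which is enough to exceed a small in-flight window.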

Cause

Following the stack trace leads to the place where the exception is thrown:

if (actualInFlight >= this.maxInflight) {

//@TRACE 613= sending {0} msgs at max inflight window

log.fine(CLASS_NAME, methodName, "613", new Object[]{new Integer(actualInFlight)});

throw new MqttException(MqttException.REASON_CODE_MAX_INFLIGHT);

}

actualInFlight is incremented here:

// processed until the inflight window has space.

if (actualInFlight < this.maxInflight) {

// The in flight window is not full so process the

// first message in the queue

result = (MqttWireMessage)pendingMessages.elementAt(0);

pendingMessages.removeElementAt(0);

actualInFlight++;

//@TRACE 623=+1 actualInFlight={0}

log.fine(CLASS_NAME,methodName,"623",new Object[]{new Integer(actualInFlight)});

}

Each time a message is taken out of pendingMessages, actualInFlight is incremented by 1. maxInflight is configurable; its default value is 10.

public class ClientState {

...

volatile private Vector pendingMessages;

...

}

In ClientState:

public void send(MqttWireMessage message, MqttToken token) throws MqttException {

...

if (message instanceof MqttPublish) {

synchronized (queueLock) {

if (actualInFlight >= this.maxInflight) {

//@TRACE 613= sending {0} msgs at max inflight window

log.fine(CLASS_NAME, methodName, "613", new Object[]{new Integer(actualInFlight)});

throw new MqttException(MqttException.REASON_CODE_MAX_INFLIGHT);

}

MqttMessage innerMessage = ((MqttPublish) message).getMessage();

//@TRACE 628=pending publish key={0} qos={1} message={2}

log.fine(CLASS_NAME,methodName,"628", new Object[]{new Integer(message.getMessageId()), new Integer(innerMessage.getQos()), message});

switch(innerMessage.getQos()) {

case 2:

outboundQoS2.put(new Integer(message.getMessageId()), message);

persistence.put(getSendPersistenceKey(message), (MqttPublish) message);

break;

case 1:

outboundQoS1.put(new Integer(message.getMessageId()), message);

persistence.put(getSendPersistenceKey(message), (MqttPublish) message);

break;

}

tokenStore.saveToken(token, message);

pendingMessages.addElement(message);

queueLock.notifyAll();

}

} else {

...

}

}

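Note that messages are added to pendingMessages without any check on the QoS level, so QoS 0 publishes pass through the same maxInflight check.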

private void decrementInFlight() {

final String methodName = "decrementInFlight";

synchronized (queueLock) {

actualInFlight--;

//@TRACE 646=-1 actualInFlight={0}

log.fine(CLASS_NAME,methodName,"646",new Object[]{new Integer(actualInFlight)});

if (!checkQuiesceLock()) {

queueLock.notifyAll();

}

}

}

When the acknowledgement for a message arrives, actualInFlight is decremented by 1.
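
The bookkeeping walked through in this section can be condensed into a small model. This is a simplified sketch, not Paho's code: `InflightWindowModel` and its method names are made up for illustration, messages are plain strings, and it throws IllegalStateException where Paho throws MqttException with REASON_CODE_MAX_INFLIGHT.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Simplified model of the in-flight accounting; not Paho's actual class.
final class InflightWindowModel {
    private final int maxInflight;
    private int actualInFlight;
    private final Queue<String> pendingMessages = new ArrayDeque<>();

    InflightWindowModel(int maxInflight) {
        this.maxInflight = maxInflight;
    }

    // Mirrors send(): rejects the publish when the window is full, whatever the QoS.
    synchronized void send(String message) {
        if (actualInFlight >= maxInflight) {
            throw new IllegalStateException("Too many publishes in progress (32202)");
        }
        pendingMessages.add(message);
    }

    // Mirrors the dequeue step: taking a message out of the queue occupies one slot.
    synchronized String takeNext() {
        if (actualInFlight < maxInflight && !pendingMessages.isEmpty()) {
            actualInFlight++;
            return pendingMessages.remove();
        }
        return null;
    }

    // Mirrors decrementInFlight(): an acknowledgement frees one slot.
    synchronized void acknowledge() {
        if (actualInFlight > 0) {
            actualInFlight--;
        }
    }

    synchronized int inFlight() {
        return actualInFlight;
    }

    public static void main(String[] args) {
        InflightWindowModel window = new InflightWindowModel(10); // default maxInflight
        for (int i = 0; i < 10; i++) {
            window.send("msg-" + i);
            window.takeNext(); // now in flight, not yet acknowledged
        }
        try {
            window.send("msg-10"); // eleventh publish without any acknowledgement
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        window.acknowledge();      // one acknowledgement arrives
        window.send("msg-10");     // accepted now that a slot is free
        System.out.println("in flight: " + window.inFlight());
    }
}
```

The model makes the failure mode visible: publishes are rejected only while the number of dequeued but unacknowledged messages equals maxInflight, so either slowing down the callers or raising maxInflight removes the error.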
