当前位置: 首页 > 面试题库 >

Spring MqttPahoMessageDrivenChannelAdapter丢失的连接:连接丢失;重试

郏瀚
2023-03-14
问题内容

我们正在使用Spring message-driven-channel- adapter订阅MQTT主题。但是,我们经常遇到错误。我已经使用JavaScript客户端(mqttws31.js)测试了连接,效果很好。表示连接没有问题。

错误:-

org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter connectionLost
SEVERE: Lost connection:Connection lost; retrying...

MQTT消息:-

[payload=6483D03E4C75BA943148F18D73,1.00,1E, headers={mqtt_retained=false, mqtt_qos=0, 
id=5fa41168-34c6-1e3d-a775-e3146842990a, mqtt_topic=TEST/GATEWAY2, mqtt_duplicate=false, timestamp=1499067757559}]

配置 :-

<bean id="clientFactory"
    class="org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory">
    <property name="userName" value="${mqtt.username}" />
    <property name="password" value="${mqtt.password}" />
</bean>

<int-mqtt:message-driven-channel-adapter
    id="mqttInbound" client-id="${mqtt.default.client.id}" url="${mqtt.url}"
    topics="${topics}" client-factory="clientFactory" auto-startup="true"
    channel="output" error-channel="errorChannel" />


<int:channel id="output" />
<int:channel id="errorChannel" />

<int:service-activator input-channel="errorChannel"
    ref="errorMessageLogger" method="logError" />
<bean id="errorMessageLogger" class="com.mqtt.ErrorMessageLogger" />

<int:service-activator input-channel="output"
    method="handleMessage" ref="mqttLogger" />
<bean id="mqttLogger" class="com.mqtt.MqttReciever" />

pom.xml:

<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.1.1</version>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
    <version>4.2.2.RELEASE</version>
</dependency>

在调试时org.eclipse.paho.client.mqttv3-1.1.1-sources.jar:-

CommsReceiver.Java

public void run() {
        final String methodName = "run";
        MqttToken token = null;

        while (running && (in != null)) {
            try {
                //@TRACE 852=network read message
                log.fine(CLASS_NAME,methodName,"852");
                receiving = in.available() > 0;
                MqttWireMessage message = in.readMqttWireMessage();
                receiving = false;

                // instanceof checks if message is null
                if (message instanceof MqttAck) {
                    token = tokenStore.getToken(message);
                    if (token!=null) {
                        synchronized (token) {
                            // Ensure the notify processing is done under a lock on the token
                            // This ensures that the send processing can complete  before the
                            // receive processing starts! ( request and ack and ack processing
                            // can occur before request processing is complete if not!
                            clientState.notifyReceivedAck((MqttAck)message);
                        }
                    } else if(message instanceof MqttPubRec || message instanceof MqttPubComp || message instanceof MqttPubAck) {
                        //This is an ack for a message we no longer have a ticket for.
                        //This probably means we already received this message and it's being send again
                        //because of timeouts, crashes, disconnects, restarts etc.
                        //It should be safe to ignore these unexpected messages.
                        log.fine(CLASS_NAME, methodName, "857");
                    } else {
                        // It its an ack and there is no token then something is not right.
                        // An ack should always have a token assoicated with it.
                        throw new MqttException(MqttException.REASON_CODE_UNEXPECTED_ERROR);
                    }
                } else {
                    if (message != null) {
                        // A new message has arrived
                        clientState.notifyReceivedMsg(message);
                    }
                }
            }
            catch (MqttException ex) {
                //@TRACE 856=Stopping, MQttException
                log.fine(CLASS_NAME,methodName,"856",null,ex);
                running = false;
                // Token maybe null but that is handled in shutdown
                clientComms.shutdownConnection(token, ex);
            }
            catch (IOException ioe) {
                //@TRACE 853=Stopping due to IOException
                log.fine(CLASS_NAME,methodName,"853");

                running = false;
                // An EOFException could be raised if the broker processes the
                // DISCONNECT and ends the socket before we complete. As such,
                // only shutdown the connection if we're not already shutting down.
                if (!clientComms.isDisconnecting()) {
                    clientComms.shutdownConnection(token, new MqttException(MqttException.REASON_CODE_CONNECTION_LOST, ioe));
                }
            }
            finally {
                receiving = false;
            }
        }

        //@TRACE 854=<
        log.fine(CLASS_NAME,methodName,"854");
    }

在上述方法中,有时会in.readMqttWireMessage()抛出IOException。所以基于catch块,它使用clientComms.shutdownConnection(token, ...


问题答案:
我只是想分享一下,以防万一。…我有相同的异常,并通过确保生成了唯一的客户端ID(使用 MqttAsyncClient.generateClientId())来解决它,如下所述: https
//github.com/eclipse/paho.mqtt.java / issues /
207#issuecomment-338246879


 类似资料:
  • 问题内容: 在生产中,我有一个使用连接局部变量保存游戏状态的游戏。但是,我注意到,如果我在连接上闲置了一段时间,它将断开连接并重新连接,这将丢失当前状态。在本地主机上进行测试时,我从未注意到此行为。这是套接字连接的规范行为,还是导致连接断开的其他原因。 如果是正常行为,通常如何处理?是否应该将连接值全局存储,以便用户断开/重新连接时可以恢复连接值? 问题答案: 您的问题与套接字超时有关。如果某个套

  • 我怎样才能让我的查询在查询时不丢失连接?

  • 我试图在我的设备(华为Nova 4)上运行我的应用程序,但它崩溃了,在另一部手机上却没有。 颤振问题,救命

  • 自上次更新以来,启动应用程序后,“Project Explorer”中的FTP-Server-Connections丢失。通常我可以很容易地选择服务器和浏览文件,但现在一秒钟后,所有的列表条目都丢失了,它只显示“本地文件系统”了。 (之前在Local Filesystem下列出的是我的服务器:http://d.pr/I/zbc1)昨天我重新添加了它,我以为它可以工作,但今天它又丢失了... 编辑:

  • 问题内容: 我有一个问题,如果mySQL Server在“睡眠时间” 500秒后终止了会话,则下一个请求不会成功。如果mySQL Server没有关闭睡眠连接,则可能在700秒后发生相同的问题。 我能做什么?遵循我的persistence.xml的属性 如果重要的话,transactiontype为RESOURCE_LOCAL。 问题答案: 您如何配置连接池?如果它是服务器数据源,则应在服务器中设

  • 我正在尝试从macos上使用swift开发的应用程序连接到realm object server 当我尝试连接时,会出现“网络连接丢失”错误,但我可以从浏览器中毫无问题地打开领域服务器。。顺便说一句,这个问题最近发生了,它在几周前工作正常 这里,用户总是,错误描述是 这发生在本地服务器和一个托管在数字海洋 服务器版本1.8。3. xcode控制台日志 2018-01-15 12:46:07.077