当前位置: 首页 > 知识库问答 >
问题:

脱机消息不在带有Paho客户端的Moquette中使用

丁震博
2023-03-14

我在通过eclipse Paho客户机使用Moquette服务器中的脱机MQTT消息方面遇到了一个问题。

    null

以上步骤都是成功的,没有任何问题。

然后我停止了我的使用者应用程序,并用相同的主题将MQTT数据发送给broker。使用我的publisher Application-Server能够接收这些消息,但是在这个时刻没有任何使用者来使用此消息,因为我已经停止了我的使用者。然后我再次启动了我的消费者应用程序。它成功地连接到代理,但是,当使用者关闭时,它没有收到我发送给代理的任何消息。

我是否需要对我的Moquette服务器进行任何特定的配置来持久化数据(使用干净的会话:false)?还是我漏掉了什么?

package com.gbids.mqtt.moquette.main;

import com.gbids.mqtt.moquette.server.PublishInterceptor;
import io.moquette.interception.InterceptHandler;
import io.moquette.server.Server;
import io.moquette.server.config.IConfig;
import io.moquette.server.config.MemoryConfig;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

public class ServerLauncher {

    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        final IConfig configs = new MemoryConfig(props);

        final Server mqttBroker = new Server();
        final List<? extends InterceptHandler> userHandlers = Arrays.asList(new PublishInterceptor());
        mqttBroker.startServer(configs, userHandlers);

        System.out.println("moquette mqtt broker started, press ctrl-c to shutdown..");
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                System.out.println("stopping moquette mqtt broker..");
                mqttBroker.stopServer();
                System.out.println("moquette mqtt broker stopped");
            }
        });
    }
}
package com.gbids.mqtt.moquette.main;

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class ConsumerLauncher implements MqttCallback {

    private static final String topicPrefix = "devices/reported";
    private static final String broker = "tcp://0.0.0.0:1883";
    private static final String clientIdPrefix = "consumer";

    public static void main(String[] args) throws MqttException {
        final String clientId = "consumer_1";
        MqttClient sampleClient = new MqttClient(broker, clientId, new MemoryPersistence());
        MqttConnectOptions connOpts = new MqttConnectOptions();
        connOpts.setCleanSession(false);
        sampleClient.connect(connOpts);
        sampleClient.subscribe(topicPrefix + "/#", 1);
        sampleClient.setCallback(new ConsumerLauncher());
    }

    public void connectionLost(Throwable throwable) {
        System.out.println("Consumer connection lost : " + throwable.getMessage());
    }

    public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
        System.out.println("Message arrived from topic : " + s + " | Message : " + new String(mqttMessage.getPayload()) + " | Message ID : " +mqttMessage.getId());
    }

    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        System.out.println("Delivery completed from : " + clientIdPrefix + "_1");
    }
}
package com.gbids.mqtt.moquette.main;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class ClientLauncher {

    private static final String content = "{\"randomData\": 25}";
    private static final String willContent = "Client disconnected unexpectedly";
    private static final String broker = "tcp://0.0.0.0:1883";
    private static final String clientIdPrefix = "client";

    public static void main(String[] args) throws Exception{
        sendDataWithQOSOne();
        System.exit(0);
    }

    private static void sendDataWithQOSOne(){
        try {
            final String clientId = "client_1";
            MqttClient sampleClient = new MqttClient(broker, clientId, new MemoryPersistence());
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setCleanSession(false); // for publisher - this is not needed I think
            sampleClient.connect(connOpts);
            MqttMessage message = new MqttMessage(content.getBytes());
            message.setQos(1);
            final String topic = "devices/reported/" + clientId;
            sampleClient.publish(topic, message);
            System.out.println("Message published from : " + clientId + " with payload of : " + content);
            sampleClient.disconnect();
        } catch (MqttException me) {
            me.printStackTrace();
        }
    }
}

共有1个答案

胡泓
2023-03-14

在您的例子中,在ClientLauncher(publisher)中创建MQTTMessage时,需要将Request标志设置为True。默认值为false,如文档中所示。

...    
message.setRetained(true)
...

设置此标志可以将消息保留在代理上并发送到新连接的客户端。请注意,代理只保留一个主题的最后一条消息。没有办法为一个特定的主题保留一个以上的消息。

 类似资料:
  • 问题内容: 我正在编写django应用程序,该应用程序应充当MQTT发布者和订阅者。 我应该在哪里启动Paho客户端并运行loop_forever()函数。 应该在wsgi.py中吗? 问题答案: 更新: 如果您需要在Django的多个线程运行,那么发布您的Django应用程序的信息,您可以使用辅助功能从发布泛美卫生组织的模块- https://eclipse.org/paho/clients/p

  • null 但是,我发现代理在重新连接时不会向客户机发送任何东西。 这就是我如何测试的:使用上面提到的四个参数将客户机连接到代理。用QOS=1订阅感兴趣的主题断开客户端 使用另一个客户端程序和另一个客户端id,连接到代理将消息发布到由现在脱机的客户端订阅的同一主题。请等待几秒钟,现在使用与以前相同的连接设置重新连接脱机客户端。

  • 我一直试图与paho mqtt客户端一起工作,以mosquitto作为代理发布和接收消息,并且工作良好。我的用例虽然涉及到发送者向代理发布消息并断开连接,但此时,接收者无论连接还是断开都应该使用该消息并立即删除它。我已经使用了所有的属性,例如QOS、保留的消息、干净的会话等,但没有一个产生我想要的结果。请帮忙。

  • 我确实在我的应用程序中使用PAHO C客户机库。我确实订阅了MQTTAsync_subscribe()和QoS设置为1的主题。根据我的理解,1的意思是,消息至少被发送到客户端一次。 也许有人能帮助我理解为什么会发生这种情况,或者如何克服这种情况?

  • null 有人帮忙吗? 谢谢,拉胡尔

  • Firebase数据库提供了两种让客户端脱机的方法: 数据库参考。白痴() 手动断开Firebase数据库客户端与服务器的连接,并禁用自动重新连接。注意:调用此方法将影响所有Firebase数据库连接。 FirebaseDatabase。白痴() 关闭与Firebase数据库后端的连接,直到调用goOnline()。 调用FirebasDatabase.getInstance(). goOffli