当前位置: 首页 > 知识库问答 >
问题:

无法从Vert.x MQTT服务器接收消息

常哲彦
2023-03-14

我试图让一些基于PAHO的客户机使用Vert.x MQTT服务器。我试图发布到我的接收客户端订阅的测试主题。我很难从客户端发布者向客户端订阅者发送消息。

使用我在Internet上看到的真实例子,我构建了一个MQTT代理。vert.x MQTT代理代码的代码如下所示:

public class MQTTBroker
{      
   public MQTTBroker()
   {
      MqttServerOptions opts = new MqttServerOptions();
      opts.setHost("localhost");
      opts.setPort(1883);

      MqttServer server = MqttServer.create(Vertx.vertx(),opts);

      server.endpointHandler(endpoint -> {
      System.out.println("MQTT client [" + endpoint.clientIdentifier() + "] request to connect, clean session = " + endpoint.isCleanSession());
      endpoint.accept(false);

      if("Test_Send".equals(endpoint.clientIdentifier()))
      {
         doPublish(endpoint);

         handleClientDisconnect(endpoint);
      }
      else
      {
          handleSubscription(endpoint);
          handleUnsubscription(endpoint);
          handleClientDisconnect(endpoint);
      }
     }).listen(ar -> {
      if (ar.succeeded()) {
           System.out.println("MQTT server is listening on port " + ar.result().actualPort());
       } else {
           System.out.println("Error on starting the server");
           ar.cause().printStackTrace();
       }
   });
 }

  protected void handleSubscription(MqttEndpoint endpoint) {
     endpoint.subscribeHandler(subscribe -> {
         List grantedQosLevels = new ArrayList < > ();
         for (MqttTopicSubscription s: subscribe.topicSubscriptions()) {
             System.out.println("Subscription for " + s.topicName() + " with QoS " + s.qualityOfService());
             grantedQosLevels.add(s.qualityOfService());
        }
        endpoint.subscribeAcknowledge(subscribe.messageId(), grantedQosLevels);
    });
}

protected void handleUnsubscription(MqttEndpoint endpoint) {
    endpoint.unsubscribeHandler(unsubscribe -> {
        for (String t: unsubscribe.topics()) {
            System.out.println("Unsubscription for " + t);
        }
        endpoint.unsubscribeAcknowledge(unsubscribe.messageId());
    });
}

protected void publishHandler(MqttEndpoint endpoint) {
     endpoint.publishHandler(message -> {
     endpoint.publishAcknowledge(message.messageId());
     }).publishReleaseHandler(messageId -> {
         endpoint.publishComplete(messageId);
     });
 }

protected void handleClientDisconnect(MqttEndpoint endpoint) {
    endpoint.disconnectHandler(h -> {
        System.out.println("The remote client has closed the connection.");
    });
}

protected void doPublish(MqttEndpoint endpoint) {

   // just as example, publish a message with QoS level 2
   endpoint.publish("Test_Topic",
     Buffer.buffer("Hello from the Vert.x MQTT server"),
     MqttQoS.EXACTLY_ONCE,
     false,
     false);
   publishHandler(endpoint);

   // specifing handlers for handling QoS 1 and 2
   endpoint.publishAcknowledgeHandler(messageId -> {

     System.out.println("Received ack for message = " +  messageId);

   }).publishReceivedHandler(messageId -> {

     endpoint.publishRelease(messageId);

   }).publishCompleteHandler(messageId -> {

     System.out.println("Received ack for message = " +  messageId);
   });
  }
}
public class Receiver implements MqttCallback
{

   public Receiver()
   {     
      MqttClient client = null;

      try
      {
         MemoryPersistence persist = new MemoryPersistence(); 
         client = new MqttClient("tcp://localhost:1883",persist);
         client.setCallback(this);
         client.connect();
         client.subscribe("Test_Topic");
         System.out.println("The receiver is initialized.");
      }
      catch(Exception exe)
      {
         exe.printStackTrace();
      }
   }

   @Override
   public void connectionLost(Throwable arg0)
   {
      System.out.println("Connection is lost!");
   }

   @Override
   public void deliveryComplete(IMqttDeliveryToken arg0)
   {
      System.out.println("Delivered!");
   }

    @Override
   public void messageArrived(String theStr, MqttMessage theMsg)
   {
     System.out.println("Message from: "+theStr);

      try
      {
         String str = new String(theMsg.getPayload());
         System.out.println("Message is: "+str); 
      }
      catch(Exception exe)
      {
         exe.printStackTrace();
      }

   }

 }
public void publish(String theMsg)
{
   MqttClient nwclient = null;

   try
  {
      ServConfigurator serv = ServConfigurator.getInstance();
      MemoryPersistence persist = new MemoryPersistence(); 
      String url = "tcp://localhost:1883";

      nwclient = new MqttClient(url,"Test_Send",persist);

      MqttConnectOptions option = new MqttConnectOptions();
      option.setCleanSession(true);

      nwclient.connect(option);

      MqttMessage message = new MqttMessage(theMsg.getBytes());
      message.setQos(2);
      nwclient.publish("Test_Topic",message);

      nwclient.disconnect();
      nwclient.close();

   }
   catch(Exception exe)
   {
       exe.printStackTrace();
   }
}

我很确定我在这里错过了什么,但我想不出会是什么。有人能帮我把这个弄好吗???

提前感谢您的任何帮助或见解。

共有1个答案

张鹏鹍
2023-03-14

当MQTT服务器从发布服务器接收消息,以便调用Endpoint.PublishHandler将消息传递给它时,我在代码中看不到任何逻辑来获取主题并搜索为该主题注册的订阅者(endpoint)并向它们发送消息。同时,我没有看到您在代码中以某种方式保存订阅服务器endpoint和订阅主题之间的引用来进行上述研究。请记住,MQTT服务器不是代理,它不会为您处理关于主题的订阅客户机列表;你可以在它上面建立一个经纪人,这应该是你正在努力做的事情?

 类似资料:
  • 问题内容: 我要进行最简单的解释。我的Java TCP项目有一个服务器和三个客户端。 服务器具有一个ClientThread。每个客户端都有一个ServerThread和一个UserThread。 工作流程为: 1.客户端(例如,client_0)的UserThread获取用户输入,然后将消息发送到服务器。 2.服务器的ClientThread捕获来自client_0的消息,并将另一条消息发送到另

  • 问题内容: 我要进行最简单的解释。我的Java TCP项目有一个服务器和三个客户端。 服务器具有一个ClientThread。每个客户端都有一个ServerThread和一个UserThread。 工作流程为: 1.客户端(例如,client_0)的UserThread获取用户输入,然后将消息发送到服务器。 2.服务器的ClientThread捕获来自client_0的消息,并将另一条消息发送到另

  • 使用@value(${fanout.exchange})注释在使用带有Github Repo的spring-cloud-config服务器时初始化失败。我得到: 这两个类的pom.xml上都有spring-cloud-config。configServer使用@EnableConfigServer进行注释。我在github repo中的文件名为datacollector.properties Ra

  • 我开发了一个客户端-服务器UDP应用程序。服务器UDP套接字设置为广播UDP套接字。双方的代码不会产生任何错误,但客户端未接收到从广播UDP服务器端发送的消息。请看一下我的代码,我知道有一些错误我想不通。我真的需要帮助: 服务器: 客户: }

  • 问题内容: 我正在尝试用两个客户端实现一个系统,其中一个客户端发送一条消息,而另一个客户端将接收该消息。下图将以更直观的方式对其进行解释: 因此,客户端1将消息发送到服务器(此工作正常),服务器接收到“推送”消息并发出应由客户端2接收的“弹出”消息。这里的问题是客户端2从未收到“流行”消息。:( 这是所有代码。 SERVER.JS 客户1(aka mobile.html) 客户2(aka web.

  • 有人知道netty服务器处理程序取消从web服务器接收数据的最佳方法吗?我有一个服务器处理程序,它将HttpRequests代理到web服务器。但是,当请求客户端取消请求时,我希望在不关闭服务器处理程序和web服务器之间的连接的情况下停止从web服务器接收服务器通道上的数据。 有谁知道我怎么才能做到这一点。你的答复将不胜感激。 非常感谢。