当前位置: 首页 > 知识库问答 >
问题:

如何在java中使用mqtt从云(iothub)检索数据

尹小云
2023-03-14

我是新来的。我已经成功地使用python将消息发送到IOT中心(D2C)。我们使用的协议是mqtt。我们正在尝试使用java从云(IOT中心)检索数据,但无法找到从云接收消息的正确方法。我的疑问是我们是否可以直接从IOT中心读取消息,或者我们需要将传入消息重定向到事件中心来检索消息。

我还尝试在向云发送数据的同时从java中的iothub读取消息,但得到的错误如下。。(与服务器的连接中断。重新连接0次。)

我使用此代码从iothub读取数据,

import com.microsoft.azure.sdk.iot.device.DeviceClient;
import com.microsoft.azure.sdk.iot.device.IotHubMessageResult;
import com.microsoft.azure.sdk.iot.device.Message;
import com.microsoft.azure.sdk.iot.device.MessageCallback;
import com.microsoft.azure.sdk.iot.device.IotHubClientProtocol;
import com.microsoft.azure.sdk.iot.service.sdk.IotHubServiceClientProtocol;
import java.io.IOException;
import java.net.URISyntaxException;
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;
public class Kafkareception {

    public static void main(String[] args) throws IOException {
        try {
            String connString = "HostName=";
            IotHubClientProtocol protocol = IotHubClientProtocol.MQTT;
            DeviceClient client = new DeviceClient(connString, protocol);

            MessageCallback callback = new AppMessageCallback();
            client.setMessageCallback(callback, null);
            client.open();
        } catch (URISyntaxException ex) {
            Logger.getLogger(Kafkareception.class.getName()).log(Level.SEVERE, null, ex);
        }
    }

    private static class AppMessageCallback implements MessageCallback {

        public IotHubMessageResult execute(Message msg, Object context) {
            System.out.println(new String(msg.getBytes(), Message.DEFAULT_IOTHUB_MESSAGE_CHARSET) + "Received message from hub: ");

            return IotHubMessageResult.COMPLETE;
        }
    }
}

共有2个答案

尤祖鹤
2023-03-14

我读了iothub的数据。我们可以用密码

import java.io.IOException;
import com.microsoft.azure.eventhubs.*;
import com.microsoft.azure.servicebus.*;

import java.nio.charset.Charset;
import java.time.*;
import java.util.function.*;

public class Datafetch {

    public static void main(String[] args) throws IOException {
        EventHubClient client0 = receiveMessages("0");
        EventHubClient client1 = receiveMessages("1");
        System.out.println("Press ENTER to exit.");
        System.in.read();
        try {
            client0.closeSync();
            client1.closeSync();
            System.exit(0);
        } catch (ServiceBusException sbe) {
            System.exit(1);
        }
    }

private static EventHubClient receiveMessages(final String partitionId) {

        String connStr = "Endpoint={youreventhubcompatibleendpoint};EntityPath={youreventhubcompatiblename};SharedAccessKeyName=iothubowner;SharedAccessKey={youriothubkey}";
        EventHubClient client = null;
        try {
            client = EventHubClient.createFromConnectionStringSync(connStr);
        } catch (Exception e) {
            System.out.println("Failed to create client: " + e.getMessage());
            System.exit(1);
        }
        try {
            client.createReceiver(
                    EventHubClient.DEFAULT_CONSUMER_GROUP_NAME,
                    partitionId,
                    Instant.now()).thenAccept(new Consumer<PartitionReceiver>() {
                        public void accept(PartitionReceiver receiver) {
                            System.out.println("** Created receiver on partition " + partitionId);
                            try {
                                while (true) {
                                    Iterable<EventData> receivedEvents = receiver.receive(100).get();
                                    System.out.println(receivedEvents);
                                    int batchSize = 0;
                                    if (receivedEvents != null) {
                                        for (EventData receivedEvent : receivedEvents) {
                                            System.out.println(String.format("Offset: %s, SeqNo: %s, EnqueueTime: %s",
                                                    receivedEvent.getSystemProperties().getOffset(),
                                                    receivedEvent.getSystemProperties().getSequenceNumber(),
                                                    receivedEvent.getSystemProperties().getEnqueuedTime()));
                                            System.out.println(String.format("| Device ID: %s", receivedEvent.getSystemProperties().getClass()));
                                            System.out.println(String.format("| Message Payload: %s", new String(receivedEvent.getBody(),
                                                    Charset.defaultCharset())));
                                            batchSize++;
                                        }
                                    }
                                    System.out.println(String.format("Partition: %s, ReceivedBatch Size: %s", partitionId, batchSize));
                                }
                            } catch (Exception e) {
                                System.out.println("Failed to receive messages: " + e.getMessage());
                            }
                        }
                    });
        } catch (Exception e) {
            System.out.println("Failed to create receiver: " + e.getMessage());
        }
        return client;
    }
}
葛子昂
2023-03-14

根据您提供的信息,您可以尝试使用DeviceClient,设置一台设备到Azure IoT Hub的两个活动连接:一个是发送D2C消息,另一个是“从iothub读取数据”。出现错误的原因可能是:

IoT集线器仅支持每个设备一个活动MQTT连接。代表同一设备ID的任何新MQTT连接都会导致IoT Hub断开现有连接。

Ref:使用MQTT协议与IoT集线器通信。

如果您想接收发送到Azure IoT Hub的D2C消息,可以使用事件中心兼容endpoint(Java)。无需自行将传入消息重定向到事件中心。

IoT中心公开后端服务的内置终结点的消息/事件,以读取中心接收的设备到云消息。此终结点与事件中心兼容,可使用事件中心服务支持的任何机制读取消息。

Ref:了解Azure物联网中心消息传递和物联网中心endpoint。

 类似资料:
  • 我还要求通过启用TLS/SSL(根据Microsoft Azure文档:https://docs.Microsoft.com/en-us/Azure/iot-hub/iot-hub-mqtt-support#tlsssl-configuration)(如https://github.com/mqttjs/mqtt.js#client上的mqtt.js文档所述)。 如何使用第三方MQTT库和SAS令

  • 我想从Java的MySQL数据库中检索数学方程,并使用ITextPDF将其写入PDF文件。 我尝试在数据库连接中将字符编码设置为UTF8,然后我从数据库中检索以字节为单位的数据,将其转换为字符串,然后将其添加到我的文档中。生成的pdf文件没有任何数学符号。Java是罪魁祸首还是iTextPDF? } 输出:4 x 25÷5=? 预期输出为:√4 x√25÷5=?

  • 问题内容: 我已经完成了将记录插入数据库的操作,但是我不知道如何检索它。我的代码是: Account.java: MyAccount.java: insert.html: web.xml: 当用户单击“检索”按钮时,如何从数据库检索数据,并以另一HTML格式显示所有记录?请提供有关操作方法的建议。 在我的应用程序中,当用户单击检索按钮时,它正在执行插入操作。但是我想要的是,当用户单击它时,应该转到

  • 问题内容: 我想使用HTTP GET和POST命令从网站检索URL并解析HTML。我该怎么做呢? 问题答案: 您可以将HttpURLConnection与URL结合使用。

  • 本文向大家介绍在使用Java从MongoDB集合中检索数据时,如何限制记录数?,包括了在使用Java从MongoDB集合中检索数据时,如何限制记录数?的使用技巧和注意事项,需要的朋友参考一下 从MongoDB集合中检索记录时,您可以使用以下方法限制结果中的记录数: limit()方法。 语法 Java MongoDB库提供了一个具有相同名称的方法,以通过传递表示所需记录数的整数值来限制调用此方法的

  • 问题内容: 我正在尝试获取Java中的发布数据。好像应该做的最简单的事情之一?我的意思是,HttpServletRequest.getParameter必须正确执行吗?那么,如何获取原始帖子数据? 我发现HttpServletRequest获得JSONPOST数据,并使用Kdeveloper的代码从请求中提取发布数据。它有效,但是有一个陷阱:我只能 一次 获取该发布数据。 这是我根据Kdevelo