private void doConnect() {
Log.d(TAG, "doConnect()");
IMqttToken token;
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(false);
options.setKeepAliveInterval(30);
try {
mqttClient = new MqttAsyncClient(tcp_server_URL, deviceId, new MemoryPersistence());
token = mqttClient.connect(options);
token.waitForCompletion(3500);
mqttClient.setCallback(new MqttEventCallback());
//changed single subscription of each topic to mass subscription
// using String [] of Topics and int [] of QoS
if (channelList != null) {
channelListStringArray= new String[channelList.size()];
channelListQosStringArray= new int[channelList.size()];
for (int i = 0; i < channelList.size(); i++) {
// creating String Array of topics and int Array of QoS
channelListStringArray [i] = "account/" + channelList.get(i);
channelListQosStringArray [i] = 1;
}
//subscribe all channels by passing all topics as String Array and QoS int array
token = mqttClient.subscribe(channelListStringArray, channelListQosStringArray);
token.waitForCompletion(3500);
}
} catch (MqttSecurityException e) {
e.printStackTrace();
} catch (MqttException e) {
switch (e.getReasonCode()) {
case MqttException.REASON_CODE_BROKER_UNAVAILABLE:
case MqttException.REASON_CODE_CLIENT_TIMEOUT:
case MqttException.REASON_CODE_CONNECTION_LOST:
case MqttException.REASON_CODE_SERVER_CONNECT_ERROR:
Log.v(TAG, "c" + e.getMessage());
e.printStackTrace();
break;
case MqttException.REASON_CODE_FAILED_AUTHENTICATION:
Intent i = new Intent("RAISEALLARM");
i.putExtra("ALLARM", e);
Log.e(TAG, "b" + e.getMessage());
break;
default:
Log.e(TAG, "a" + e.getMessage());
}
}
}
public void onDestroy() {
super.onDestroy();
//disconnect from mqtt server
IMqttToken token;
try {
if (channelList != null) {
channelListStringArray = new String[channelList.size()];
for (int i = 0; i < channelList.size(); i++) {
//creating array of topics
channelListStringArray[i] = "account/" + channelList.get(i);
}
//unsubscribe MQTTClient by passing String array of Topics, NULL, mqttActionListener to get CallBack
//on success or failure of unsubscription
token = mqttClient.unsubscribe(channelListStringArray, getApplicationContext(), mqttActionListener);
token.waitForCompletion(3500);
}
//Disconnect MqttCient
token = mqttClient.disconnect();
token.waitForCompletion(3500);
} catch (MqttException e) {
e.printStackTrace();
}
unregisterReceiver(broadcastReceiver);
}
这是因为你把洁净标志设为假。
如果不希望消息排队,则将其设置为true。
...
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(true);
options.setKeepAliveInterval(30);
...
Clean Session标志告诉代理存储QoS1/2订阅的任何消息,直到客户端重新连接。
问题内容: 我正在使用Paho发送和接收mqtt消息。到目前为止,发送消息一直没有问题,我正在使用mosquitto接收消息。 现在,我想使用Java客户端读取消息,并且注意到关于接收消息的文档越来越少。 我实现了MqttCallback接口,但仍然无法弄清楚如何阅读已订阅的主题的消息。 到目前为止,这是我的源代码,我可以使用mosquitto_sub读取消息。 问题答案: 您将在代理有时间将消息
微信文档:https://developers.weixin.qq.com/miniprogram/dev/api-backend/open-api/subscribe-message/subscribeMessage.addTemplate.html 组合模板并添加至帐号下的个人模板库 $tid = 563; // 模板标题 id,可通过接口获取,也可登录小程序后台查看获取 $kidLi
开普勒消息目前分为三大类:公告,告警和通知。 通知中根据不同的操作事件类型,分为十几个事件。每个事件都跟项目操作相关。便于接收项目操作变更的通知。 分类 事件 公告 Alarm 告警 Proclaim 通知 Build,Apply,Audit,Delete,Rollback,Logging,Reboot,Command,Storage,Extend... 订阅界面: 用户中心,点击头像,下拉菜单→
例如: 我是否需要取消订阅此订阅?我问这是因为我不知道Angular是否处理它本身。另外,请求是一次性的,而不是连续的。我倾向于认为我不需要。你有什么想法? 根据下面的帖子:Angular/RXJS,我什么时候应该取消订阅'subscription' RxJS处理一次性订阅,但我在他们的文档中没有找到任何东西。
我使用SockJS和StompJS,当我在浏览器中打开我的应用程序时,有时它会在连接到websocket之前尝试订阅一些主题。我希望主题订阅等待应用程序连接到websocket。 这就是我实现此代码的原因,我将其称为: 因此,我只在连接状态为时才订阅该主题,并且只有在客户端首次成功连接时才会调用该主题。 我想稍后从主题中取消订阅,所以我需要内部订阅返回的对象,我还需要内部订阅的消息。 我所实现的很
我通读了RxJS文档,并希望确保我理解了< code > subscriber . unsubscribe()和< code > subscriber . complete()之间的区别。 假设我有一个有两个订阅者的可观察对象,订阅者1和订阅者2。如果订阅者1对其订阅调用取消订阅,它将不再接收来自可观察对象的通知,但订阅者2将继续接收它们。 <代码>的文档。complete(): 观察者回调,用于