和硬件设备进行通信,阿里云mqtt消息会自动转发到RocketMq,所以在进行交互的时候:
接收消息:设备–>Mqtt–>RocketMQ–>服务端(java控制程序)
发送消息:服务端(java控制程序)–>Mqtt–>设备
所以需要整合RockeMQ的消息消费者和Mqtt的消息生产者
server:
port: 8080
spring:
thymeleaf:
prefix: classpath:/templates/
suffix: .html
mode: HTML
encoding: UTF-8
cache: false
resources:
chain:
strategy:
content:
enabled: true
paths: /**
#rocketmq
aliyun.rocketmq:
onsAddr: http://XXXXXXXXXXXXXXXXXX.mq-internet-access.mq-internet.aliyuncs.com:80
accessKey: XXXXXXXXXXXXXXXXXX
secretKey: XXXXXXXXXXXXXXXXXX
groupId: GID_java_test
topic: Public
timeout: 3000
# mqtt
mqtt.msg:
instanceId: XXXXXXXXXXXXXXXXXX
accessKey: XXXXXXXXXXXXXXXXXX
secretKey: XXXXXXXXXXXXXXXXXX
connectEndpoint: XXXXXXXXXXXXXXXXXX
topicId: Public
groupId: GID_java_test
messageModel: CLUSTERING
sendMsgTimeoutMillis: 20000
suspendTimeMillis: 500
maxReconsumeTimes: 3
mqttClientTokenServer: mqauth.aliyuncs.com
mqttClientTokenExpireTime: 2592000000
mqttAction: R,W
clientId: 444
qosLevel: 0
timeToWait: 5000
#spring.main.allow-bean-definition-overriding: true
连接对象:
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
@Component
@ConfigurationProperties(prefix = "aliyun.rocketmq")
public class RocketMqProperties {
/**
* TCP 接入域名
*/
private String onsAddr;
/**
* 阿里云身份验证:AccessKey
*/
private String accessKey;
/**
* 阿里云身份验证:SecretKey
*/
private String secretKey;
/**
* Group管理配置group id
*/
private String groupId;
/**
* 配置的topic名
*/
private String topic;
/**
* 生产标签,可自定义,默认通配
*/
private String tag;
/**
* 超时时间
*/
private String timeout;
public String getOnsAddr() {
return onsAddr;
}
public void setOnsAddr(String onsAddr) {
this.onsAddr = onsAddr;
}
public String getAccessKey() {
return accessKey;
}
public void setAccessKey(String accessKey) {
this.accessKey = accessKey;
}
public String getSecretKey() {
return secretKey;
}
public void setSecretKey(String secretKey) {
this.secretKey = secretKey;
}
public String getGroupId() {
return groupId;
}
public void setGroupId(String groupId) {
this.groupId = groupId;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public String getTag() {
return tag;
}
public void setTag(String tag) {
this.tag = tag;
}
public String getTimeout() {
return timeout;
}
public void setTimeout(String timeout) {
this.timeout = timeout;
}
}
连接配置:
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.Properties;
@Component
public class RocketMqConsumerConfig {
@Autowired
private RocketMqProperties rocketMqProperties;
private static Consumer consumer;
@PostConstruct
public void init(){
System.out.println("初始化启动消费者者!");
// listen 实例配置初始化
Properties properties = new Properties();
//您在控制台创建的Group ID
properties.setProperty(PropertyKeyConst.GROUP_ID, rocketMqProperties.getGroupId());
// AccessKey 阿里云身份验证
properties.setProperty(PropertyKeyConst.AccessKey, rocketMqProperties.getAccessKey());
// SecretKey 阿里云身份验证
properties.setProperty(PropertyKeyConst.SecretKey, rocketMqProperties.getSecretKey());
//设置发送超时时间(毫秒)
properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, rocketMqProperties.getTimeout());
// 设置 TCP 接入域名
properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, rocketMqProperties.getOnsAddr());
consumer = ONSFactory.createConsumer(properties);
//监听topic,new对应的监听器
consumer.subscribe(rocketMqProperties.getTopic(),rocketMqProperties.getTag(), new RocketMqListener());
// 在发送消息前,必须调用start方法来启动consumer,只需调用一次即可,当项目关闭时,自动shutdown
consumer.start();
}
/**
* 初始化消费者
* @return
*/
public Consumer getConsumer(){
return consumer;
}
}
消息监听:
import com.alibaba.fastjson.JSONObject;
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.zskj.config.utilsConfig.CacheUtil;
import com.zskj.service.UpgradeServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
@Service
@Slf4j
public class RocketMqListener implements MessageListener {
private CacheUtil cacheUtil=CacheUtil.getInstance();
@Override
public Action consume(Message message, ConsumeContext consumeContext) {
try {
/*byte[] body = message.getBody();
//获取到接收的消息,由于接收到的是byte数组,所以需要转换成字符串
String msg = new String(body);
System.out.println("RocketMQ收到消息一级主题:"+message.getTopic());
System.out.println("RocketMQ收到消息二级主题:"+message.getUserProperties().get("mqttSecondTopic"));
System.out.println("RocketMQ收到消息:"+msg);
System.out.println("=============");*/
if (message.getUserProperties().get("XXXTopic").equals("/XXXXX")){
JSONObject jsonObject=JSONObject.parseObject(new String(message.getBody()));
UpgradeServiceImpl.mqttData.put(jsonObject.getString("P"), new String(message.getBody()));
}else {
cacheUtil.add("XXXXX/"+message.getUserProperties().get("XXXTopic").toString().substring(1),new String(message.getBody()));
}
} catch (Exception e) {
e.printStackTrace();
return Action.ReconsumeLater;
}
return Action.CommitMessage;
}
}
配置生产对象:
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
@Configuration
@ConfigurationProperties(prefix = "mqtt.msg")
public class MqttConfig {
/**
* 可在阿里云控制台找到(实例id)
*/
private String instanceId;
/**
* accessKey
*/
private String accessKey;
/**
* 密钥
*/
private String secretKey;
/**
* TCP 协议接入点
*/
private String connectEndpoint;
/**
* 话题id
*/
private String topicId;
/**
* 群组id
*/
private String groupId;
/**
* 消息模式(广播订阅, 集群订阅)
*/
private String messageModel;
/**
* 超时时间
*/
private String sendMsgTimeoutMillis;
/**
* 顺序消息消费失败进行重试前的等待时间 单位(毫秒)
*/
private String suspendTimeMillis;
/**
* 消息消费失败时的最大重试次数
*/
private String maxReconsumeTimes;
/**
* 公网token服务器
*/
private String mqttClientTokenServer;
/**
* 过期时间(默认1个月)
*/
private Long mqttClientTokenExpireTime;
/**
* 分发给客户端的token的操作权限
*/
private String mqttAction;
/**
* 客户端标识
*/
private String clientId;
/**
* QoS参数代表传输质量,可选0,1,2,根据实际需求合理设置,具体参考 https://help.aliyun.com/document_detail/42420.html?spm=a2c4g.11186623.6.544.1ea529cfAO5zV3
*/
private int qosLevel = 0;
/**
* 客户端超时时间
*/
private int timeToWait;
public String getInstanceId() {
return instanceId;
}
public void setInstanceId(String instanceId) {
this.instanceId = instanceId;
}
public String getAccessKey() {
return accessKey;
}
public void setAccessKey(String accessKey) {
this.accessKey = accessKey;
}
public String getSecretKey() {
return secretKey;
}
public void setSecretKey(String secretKey) {
this.secretKey = secretKey;
}
public String getConnectEndpoint() {
return connectEndpoint;
}
public void setConnectEndpoint(String connectEndpoint) {
this.connectEndpoint = connectEndpoint;
}
public String getTopicId() {
return topicId;
}
public void setTopicId(String topicId) {
this.topicId = topicId;
}
public String getGroupId() {
return groupId;
}
public void setGroupId(String groupId) {
this.groupId = groupId;
}
public String getMessageModel() {
return messageModel;
}
public void setMessageModel(String messageModel) {
this.messageModel = messageModel;
}
public String getSendMsgTimeoutMillis() {
return sendMsgTimeoutMillis;
}
public void setSendMsgTimeoutMillis(String sendMsgTimeoutMillis) {
this.sendMsgTimeoutMillis = sendMsgTimeoutMillis;
}
public String getSuspendTimeMillis() {
return suspendTimeMillis;
}
public void setSuspendTimeMillis(String suspendTimeMillis) {
this.suspendTimeMillis = suspendTimeMillis;
}
public String getMaxReconsumeTimes() {
return maxReconsumeTimes;
}
public void setMaxReconsumeTimes(String maxReconsumeTimes) {
this.maxReconsumeTimes = maxReconsumeTimes;
}
public String getMqttClientTokenServer() {
return mqttClientTokenServer;
}
public void setMqttClientTokenServer(String mqttClientTokenServer) {
this.mqttClientTokenServer = mqttClientTokenServer;
}
public Long getMqttClientTokenExpireTime() {
return mqttClientTokenExpireTime;
}
public void setMqttClientTokenExpireTime(Long mqttClientTokenExpireTime) {
this.mqttClientTokenExpireTime = mqttClientTokenExpireTime;
}
public String getMqttAction() {
return mqttAction;
}
public void setMqttAction(String mqttAction) {
this.mqttAction = mqttAction;
}
public String getClientId() {
return clientId;
}
public void setClientId(String clientId) {
this.clientId = clientId;
}
public int getQosLevel() {
return qosLevel;
}
public void setQosLevel(int qosLevel) {
this.qosLevel = qosLevel;
}
public int getTimeToWait() {
return timeToWait;
}
public void setTimeToWait(int timeToWait) {
this.timeToWait = timeToWait;
}
}
连接配置:
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import static org.eclipse.paho.client.mqttv3.MqttConnectOptions.MQTT_VERSION_3_1_1;
@Configuration
public class MqttBeanConfig {
@Autowired
private MqttConfig mqttConfig;
@Bean
public MqttConnectOptions getMqttConnectOptions() throws NoSuchAlgorithmException, InvalidKeyException {
MqttConnectOptions mqttConnectOptions=new MqttConnectOptions();
//组装用户名密码
mqttConnectOptions.setUserName("Signature|" + mqttConfig.getAccessKey() + "|" + mqttConfig.getInstanceId());
//密码签名
mqttConnectOptions.setPassword(Tools.macSignature(mqttConfig.getGroupId()+"@@@"+mqttConfig.getClientId(), mqttConfig.getSecretKey()).toCharArray());
mqttConnectOptions.setCleanSession(false);
mqttConnectOptions.setKeepAliveInterval(20);
mqttConnectOptions.setAutomaticReconnect(true);
mqttConnectOptions.setMqttVersion(MQTT_VERSION_3_1_1);
//连接超时时间
mqttConnectOptions.setConnectionTimeout(0);
mqttConnectOptions.setKeepAliveInterval(2);
return mqttConnectOptions;
}
@Bean(initMethod = "start", destroyMethod = "shutdown")
public MqttClientBean getClient() {
return new MqttClientBean();
}
}
连接客服端:
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MqttClientBean {
private MqttClient mqttClient;
@Autowired
private MqttConnectOptions mqttConnectOptions;
@Autowired
private MqttConfig mqttConfig;
private void start() throws MqttException {
final MemoryPersistence memoryPersistence = new MemoryPersistence();
/**
* 客户端使用的协议和端口必须匹配,具体参考文档 https://help.aliyun.com/document_detail/44866.html?spm=a2c4g.11186623.6.552.25302386RcuYFB
* 如果是 SSL 加密则设置ssl://endpoint:8883
*/
this.mqttClient= new MqttClient("tcp://" + mqttConfig.getConnectEndpoint() + ":1883",
mqttConfig.getGroupId() + "@@@" + mqttConfig.getClientId(), memoryPersistence);
mqttClient.setTimeToWait(mqttConfig.getTimeToWait());
mqttClient.connect(mqttConnectOptions);
}
private void shutdown() throws MqttException {
this.mqttClient.disconnect();
}
@Bean
public MqttClient getMqttClient() throws MqttException {
start();
return this.mqttClient;
}
}
发送消息生产者:
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class IMqttSender {
@Autowired
private MqttClientBean mqttClientBean;
@Autowired
private MqttClient mqttClient;
@Autowired
private MqttConfig mqttConfig;
public void sendToMqtt(String topic, String msg){
try {
MqttMessage mqttMessage = new MqttMessage(msg.getBytes());
mqttMessage.setQos(0);
/*MqttClient mqttClient=mqttClientBean.getMqttClient();*/
mqttClient.publish(topic,mqttMessage);
} catch (MqttException e) {
e.printStackTrace();
}
}
}
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.codec.binary.Base64;
import org.apache.http.HttpEntity;
import org.apache.http.HttpStatus;
import org.apache.http.NameValuePair;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.utils.URLEncodedUtils;
import org.apache.http.config.Registry;
import org.apache.http.config.RegistryBuilder;
import org.apache.http.conn.socket.ConnectionSocketFactory;
import org.apache.http.conn.socket.PlainConnectionSocketFactory;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.util.EntityUtils;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
import java.io.IOException;
import java.nio.charset.Charset;
import java.security.*;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.*;
public class Tools {
public static Properties loadProperties() {
Properties properties = new Properties();
try {
properties.load(ClassLoader.getSystemResourceAsStream("test.properties"));
} catch (IOException e) {
}
return properties;
}
/**
* 计算签名,参数分别是参数对以及密钥
*
* @param requestParams 参数对,即参与计算签名的参数
* @param secretKey 密钥
* @return 签名字符串
* @throws NoSuchAlgorithmException
* @throws InvalidKeyException
*/
public static String doHttpSignature(Map<String, String> requestParams,
String secretKey) throws NoSuchAlgorithmException, InvalidKeyException {
List<String> paramList = new ArrayList<String>();
for (Map.Entry<String, String> entry : requestParams.entrySet()) {
paramList.add(entry.getKey() + "=" + entry.getValue());
}
Collections.sort(paramList);
StringBuffer sb = new StringBuffer();
for (int i = 0; i < paramList.size(); i++) {
if (i > 0) {
sb.append('&');
}
sb.append(paramList.get(i));
}
return macSignature(sb.toString(), secretKey);
}
/**
* @param text 要签名的文本
* @param secretKey 阿里云MQ secretKey
* @return 加密后的字符串
* @throws InvalidKeyException
* @throws NoSuchAlgorithmException
*/
public static String macSignature(String text,
String secretKey) throws InvalidKeyException, NoSuchAlgorithmException {
Charset charset = Charset.forName("UTF-8");
String algorithm = "HmacSHA1";
Mac mac = Mac.getInstance(algorithm);
mac.init(new SecretKeySpec(secretKey.getBytes(charset), algorithm));
byte[] bytes = mac.doFinal(text.getBytes(charset));
return new String(Base64.encodeBase64(bytes), charset);
}
/**
* 创建HTTPS 客户端
*/
private static CloseableHttpClient httpClient = null;
public static CloseableHttpClient getHttpsClient() throws KeyStoreException, UnrecoverableKeyException, NoSuchAlgorithmException, KeyManagementException {
if (httpClient != null) {
return httpClient;
}
X509TrustManager xtm = new X509TrustManager() {
@Override
public void checkClientTrusted(X509Certificate[] arg0, String arg1)
throws CertificateException {
}
@Override
public void checkServerTrusted(X509Certificate[] arg0, String arg1)
throws CertificateException {
}
@Override
public X509Certificate[] getAcceptedIssuers() {
return new X509Certificate[] {};
}
};
SSLContext context = SSLContext.getInstance("TLS");
context.init(null, new TrustManager[] {xtm}, null);
SSLConnectionSocketFactory scsf = new SSLConnectionSocketFactory(context, NoopHostnameVerifier.INSTANCE);
Registry<ConnectionSocketFactory> sfr = RegistryBuilder.<ConnectionSocketFactory>create()
.register("http", PlainConnectionSocketFactory.INSTANCE)
.register("https", scsf).build();
RequestConfig requestConfig = RequestConfig.custom().setConnectionRequestTimeout(1000).setConnectTimeout(5000).setSocketTimeout(5000).build();
PoolingHttpClientConnectionManager pcm = new PoolingHttpClientConnectionManager(sfr);
httpClient = HttpClientBuilder.create().setConnectionManager(pcm).setDefaultRequestConfig(requestConfig).build();
return httpClient;
}
/**
* 发起Https Get请求,并得到返回的JSON响应
*
* @param url 接口Url
* @param params 参数u对
* @return
* @throws IOException
* @throws KeyStoreException
* @throws UnrecoverableKeyException
* @throws NoSuchAlgorithmException
* @throws KeyManagementException
*/
public static JSONObject httpsGet(String url,
Map<String, String> params) throws IOException, KeyStoreException, UnrecoverableKeyException, NoSuchAlgorithmException, KeyManagementException {
CloseableHttpClient client = Tools.getHttpsClient();
JSONObject jsonResult = null;
//发送get请求
List<NameValuePair> urlParameters = new ArrayList<NameValuePair>();
for (Map.Entry<String, String> entry : params.entrySet()) {
urlParameters.add(new BasicNameValuePair(entry.getKey(), entry.getValue()));
}
String paramUrl = URLEncodedUtils.format(urlParameters, Charset.forName("UTF-8"));
HttpGet request = new HttpGet(url + "?" + paramUrl);
CloseableHttpResponse response = null;
try {
response = client.execute(request);
if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
String strResult = EntityUtils.toString(response.getEntity());
jsonResult = JSON.parseObject(strResult);
}
} finally {
if (response != null) {
response.close();
}
}
return jsonResult;
}
/**
* 工具方法,发送一个http post请求,并尝试将响应转换为JSON
*
* @param url 请求的方法名url
* @param params 参数表
* @return 如果请求成功则返回JSON, 否则抛异常或者返回空
* @throws IOException
*/
public static JSONObject httpsPost(String url,
Map<String, String> params) throws IOException, UnrecoverableKeyException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
JSONObject jsonResult = null;
//发送get请求
CloseableHttpClient client = getHttpsClient();
HttpPost request = new HttpPost(url);
List<NameValuePair> urlParameters = new ArrayList<NameValuePair>();
for (Map.Entry<String, String> entry : params.entrySet()) {
urlParameters.add(new BasicNameValuePair(entry.getKey(), entry.getValue()));
}
HttpEntity postParams = new UrlEncodedFormEntity(urlParameters);
request.setEntity(postParams);
CloseableHttpResponse response = null;
try {
response = client.execute(request);
if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
String strResult = EntityUtils.toString(response.getEntity());
jsonResult = JSON.parseObject(strResult);
}
} finally {
if (response != null) {
response.close();
}
}
return jsonResult;
}
}