1. MQTT 服务器：使用 smart-mqtt，直接运行 org.smartboot.mqtt.broker.Bootstrap 即可启动，源码仓库：
https://gitee.com/smartboot/smart-mqtt.git
2. mqtt发消息工具
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
<version>5.5.12</version>
</dependency>
mqtt:
  username:
  password:
  host-url: tcp://127.0.0.1:1883
  in-client-id: ${random.value}
  out-client-id: ${random.value}
  client-id: ${random.int}
  default-topic: test/#,topic/+/+/up
  timeout: 60
  keepalive: 60
  clearSession: true
  maxPoolSize: 100
  corePoolSize: 10
  queueCapacity: 1000
  keepAliveSeconds: 500
import lombok.Getter;
import lombok.Setter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
 * Spring-managed holder for the {@code mqtt.*} configuration values.
 *
 * <p>Each field is populated through a {@code @Value} placeholder; accessors and
 * mutators are generated by Lombok ({@code @Getter}/{@code @Setter}).
 */
@Getter
@Setter
@Component
public class MqttProperties {

    // ---------------------------------------------------------------- broker

    /** Connection address, read from {@code mqtt.host-url}. */
    @Value("${mqtt.host-url}")
    private String hostUrl;

    /** User name, read from {@code mqtt.username}. */
    @Value("${mqtt.username}")
    private String username;

    /** Password, read from {@code mqtt.password}. */
    @Value("${mqtt.password}")
    private String password;

    // ------------------------------------------------------------ client ids

    /** Client id for the inbound side, read from {@code mqtt.in-client-id}. */
    @Value("${mqtt.in-client-id}")
    private String inClientId;

    /** Client id for the outbound side, read from {@code mqtt.out-client-id}. */
    @Value("${mqtt.out-client-id}")
    private String outClientId;

    /** Client id, read from {@code mqtt.client-id}. */
    @Value("${mqtt.client-id}")
    private String clientId;

    // ----------------------------------------------------- topics / session

    /** Default topic to connect to, read from {@code mqtt.default-topic}. */
    @Value("${mqtt.default-topic}")
    private String defaultTopic;

    /**
     * Timeout, read from {@code mqtt.timeout}.
     * NOTE(review): unit is not stated here — presumably seconds; confirm where it is consumed.
     */
    @Value("${mqtt.timeout}")
    private int timeout;

    /**
     * Keep-alive setting, read from {@code mqtt.keepalive}.
     * NOTE(review): presumably the MQTT keep-alive interval rather than a connection
     * count — confirm where it is consumed.
     */
    @Value("${mqtt.keepalive}")
    private int keepalive;

    /** Whether the session should be cleared, read from {@code mqtt.clearSession}. */
    @Value("${mqtt.clearSession}")
    private boolean clearSession;

    // ----------------------------------------------------------- thread pool
    // NOTE(review): the four values below look like thread-pool sizing for the
    // message-handling executor — confirm against the configuration class using them.

    /** Core pool size, read from {@code mqtt.corePoolSize}. */
    @Value("${mqtt.corePoolSize}")
    private int corePoolSize;

    /** Maximum pool size, read from {@code mqtt.maxPoolSize}. */
    @Value("${mqtt.maxPoolSize}")
    private int maxPoolSize;

    /** Queue capacity, read from {@code mqtt.queueCapacity}. */
    @Value("${mqtt.queueCapacity}")
    private int queueCapacity;

    /** Keep-alive seconds, read from {@code mqtt.keepAliveSeconds}. */
    @Value("${mqtt.keepAliveSeconds}")
    private int keepAliveSeconds;
}
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.ExecutorChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;
/**
 * Spring configuration that wires the MQTT inbound pipeline: the Paho client factory,
 * the message-driven channel adapter, the integration flow that hands messages to
 * {@link MqttMessageHandle}, and the executor the handler runs on.
 */
@Configuration
public class MqttConfig {

    /** Completion timeout handed to the inbound adapter. */
    private static final int ADAPTER_COMPLETION_TIMEOUT = 5000;

    /** QoS level used for the inbound subscriptions. */
    private static final int ADAPTER_QOS = 2;

    @Autowired
    private MqttProperties mqttProperties;

    @Autowired
    private MqttMessageHandle mqttMessageHandle;

    /**
     * Creates the MQTT client factory; every client is produced by this factory.
     *
     * <p>The connect options are filled from {@link MqttProperties}: server URIs
     * (comma-separated), connection timeout, keep-alive interval, clean-session flag
     * and, when configured, the credentials.
     *
     * @return the configured client factory
     */
    @Bean
    public MqttPahoClientFactory mqttPahoClientFactory() {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(splitCsv(mqttProperties.getHostUrl()));

        // These three settings are bound from configuration and must actually be
        // applied, otherwise changing them in the config file has no effect.
        options.setConnectionTimeout(mqttProperties.getTimeout());
        options.setKeepAliveInterval(mqttProperties.getKeepalive());
        options.setCleanSession(mqttProperties.isClearSession());

        // Only send credentials that are really configured: a blank user name is
        // left unset, and a null/empty password is skipped (which also avoids an
        // NPE on a null password).
        String username = mqttProperties.getUsername();
        if (hasText(username)) {
            options.setUserName(username);
        }
        String password = mqttProperties.getPassword();
        if (password != null && !password.isEmpty()) {
            options.setPassword(password.toCharArray());
        }

        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(options);
        return factory;
    }

    /**
     * Creates the inbound channel adapter, subscribed to the comma-separated
     * default topics and identified by the inbound client id.
     *
     * @param factory the client factory used to create the underlying client
     * @return the message-driven channel adapter
     */
    @Bean
    public MqttPahoMessageDrivenChannelAdapter adapter(MqttPahoClientFactory factory) {
        return new MqttPahoMessageDrivenChannelAdapter(
                mqttProperties.getInClientId(),
                factory,
                splitCsv(mqttProperties.getDefaultTopic()));
    }

    /**
     * Defines the inbound flow: messages received by the adapter are dispatched
     * through an executor-backed channel to {@link MqttMessageHandle}.
     *
     * @param adapter the inbound adapter; its completion timeout and QoS are set here
     * @return the integration flow
     */
    @Bean
    public IntegrationFlow mqttInbound(MqttPahoMessageDrivenChannelAdapter adapter) {
        adapter.setCompletionTimeout(ADAPTER_COMPLETION_TIMEOUT);
        adapter.setQos(ADAPTER_QOS);
        return IntegrationFlows.from(adapter)
                .channel(new ExecutorChannel(mqttThreadPoolTaskExecutor()))
                .handle(mqttMessageHandle)
                .get();
    }

    /**
     * Creates the executor on which inbound messages are handled, sized from
     * {@link MqttProperties}.
     *
     * <p>Rejected tasks are run on the submitting thread
     * ({@link ThreadPoolExecutor.CallerRunsPolicy}).
     *
     * @return the task executor
     */
    @Bean
    public ThreadPoolTaskExecutor mqttThreadPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(mqttProperties.getCorePoolSize());
        executor.setMaxPoolSize(mqttProperties.getMaxPoolSize());
        executor.setQueueCapacity(mqttProperties.getQueueCapacity());
        executor.setKeepAliveSeconds(mqttProperties.getKeepAliveSeconds());
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        return executor;
    }

    /**
     * Splits a comma-separated value, ignoring whitespace around the value and
     * around each comma, so {@code "a, b"} yields {@code ["a", "b"]}.
     */
    private static String[] splitCsv(String value) {
        return value.trim().split("\\s*,\\s*");
    }

    /** Returns {@code true} when the value is non-null and not only whitespace. */
    private static boolean hasText(String value) {
        return value != null && !value.trim().isEmpty();
    }
}
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Component;
/**
 * Message handler that writes every message it receives to standard output.
 */
@Component
public class MqttMessageHandle implements MessageHandler {

    /** Prefix placed in front of each printed line. */
    private static final String BANNER = "==========";

    /**
     * Prints the message payload and then the message headers, one line each,
     * both prefixed with {@link #BANNER}.
     *
     * @param message the received message
     * @throws MessagingException declared by the {@link MessageHandler} signature
     */
    @Override
    public void handleMessage(Message<?> message) throws MessagingException {
        final Object payload = message.getPayload();
        System.out.println(BANNER + payload);

        final Object headers = message.getHeaders();
        System.out.println(BANNER + headers);
    }
}
文章参考:
https://blog.csdn.net/weixin_42230797/article/details/126507310
感谢~