当前位置: 首页 > 工具软件 > smart-mqtt > 使用案例 >

spring-integration-mqtt

宗苗宣
2023-12-01

文章只做了spring集成mqtt的接收消息部分

前期需要准备的东西:

1. mqtt服务器, org.smartboot.mqtt.broker.Bootstrap, 直接启动

https://gitee.com/smartboot/smart-mqtt.git

2. mqtt发消息工具

https://mqttx.app/zh

下面直接上代码

pom.xml添加

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
    <version>5.5.12</version>
</dependency>

 application.yml配置文件: 

mqtt:
  username:
  password:
  host-url: tcp://127.0.0.1:1883
  in-client-id: ${random.value}
  out-client-id: ${random.value}
  client-id: ${random.int}
  default-topic: test/#,topic/+/+/up
  timeout: 60
  keepalive: 60
  clearSession: true
  maxPoolSize: 100
  corePoolSize: 10
  queueCapacity: 1000
  keepAliveSeconds: 500

 配置文件1:

import lombok.Getter;
import lombok.Setter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
@Getter
@Setter
public class MqttProperties {

    /**
     * 用户名
     */
    @Value("${mqtt.username}")
    private String username;

    /**
     * 密码
     */
    @Value("${mqtt.password}")
    private String password;

    /**
     * 连接地址
     */
    @Value("${mqtt.host-url}")
    private String hostUrl;

    /**
     * 进-客户Id
     */
    @Value("${mqtt.in-client-id}")
    private String inClientId;

    /**
     * 出-客户Id
     */
    @Value("${mqtt.out-client-id}")
    private String outClientId;

    /**
     * 客户Id
     */
    @Value("${mqtt.client-id}")
    private String clientId;

    /**
     * 默认连接话题
     */
    @Value("${mqtt.default-topic}")
    private String defaultTopic;

    /**
     * 超时时间
     */
    @Value("${mqtt.timeout}")
    private int timeout;

    /**
     * 保持连接数
     */
    @Value("${mqtt.keepalive}")
    private int keepalive;

    /**
     * 是否清除session
     */
    @Value("${mqtt.clearSession}")
    private boolean clearSession;

    @Value("${mqtt.maxPoolSize}")
    private int maxPoolSize;

    @Value("${mqtt.corePoolSize}")
    private int corePoolSize;

    @Value("${mqtt.queueCapacity}")
    private int queueCapacity;

    @Value("${mqtt.keepAliveSeconds}")
    private int keepAliveSeconds;

}

配置文件2: 

import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.ExecutorChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.ThreadPoolExecutor;

@Configuration
public class MqttConfig {

    @Autowired
    private MqttProperties mqttProperties;

    @Autowired
    private MqttMessageHandle mqttMessageHandle;

    //Mqtt 客户端工厂 所有客户端从这里产生
    @Bean
    public MqttPahoClientFactory mqttPahoClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(mqttProperties.getHostUrl().split(","));
        options.setUserName(mqttProperties.getUsername());
        options.setPassword(mqttProperties.getPassword().toCharArray());
        factory.setConnectionOptions(options);
        return factory;
    }

    // Mqtt 管道适配器
    @Bean
    public MqttPahoMessageDrivenChannelAdapter adapter(MqttPahoClientFactory factory) {
        return new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getInClientId(), factory, mqttProperties.getDefaultTopic().split(","));
    }

    // 接收, 处理来自mqtt的消息
    @Bean
    public IntegrationFlow mqttInbound(MqttPahoMessageDrivenChannelAdapter adapter) {
        adapter.setCompletionTimeout(5000);
        adapter.setQos(2);
        return IntegrationFlows.from(adapter)
                .channel(new ExecutorChannel(mqttThreadPoolTaskExecutor()))
                .handle(mqttMessageHandle)
                .get();
    }

    @Bean
    public ThreadPoolTaskExecutor mqttThreadPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        int maxPoolSize = mqttProperties.getMaxPoolSize();
        int corePoolSize = mqttProperties.getCorePoolSize();
        int queueCapacity = mqttProperties.getQueueCapacity();
        int keepAliveSeconds = mqttProperties.getKeepAliveSeconds();
        executor.setMaxPoolSize(maxPoolSize);
        executor.setCorePoolSize(corePoolSize);
        executor.setQueueCapacity(queueCapacity);
        executor.setKeepAliveSeconds(keepAliveSeconds);
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        return executor;
    }

}

消息处理器:

import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Component;

@Component
public class MqttMessageHandle implements MessageHandler {
    @Override
    public void handleMessage(Message<?> message) throws MessagingException {
        System.out.println("==========" + message.getPayload());
        System.out.println("==========" + message.getHeaders());
    }
}

文章参考:

https://blog.csdn.net/weixin_42230797/article/details/126507310

感谢~

 类似资料: