当前位置: 首页 > 工具软件 > mica-mqtt > 使用案例 >

spring实现mqtt服务端_SpringBoot 集成MQTT配置

微生智刚
2023-12-01

1. 前言

公司的IOT平台主要采用MQTT(消息队列遥测传输)对底层的驱动做命令下发和数据采集。也用到了redis、zeroMQ、nats等消息中间件。今天先整理SpringBoot集成MQTT笔记和工作中遇到的问题。

2. MQTT介绍

MQTT is a machine-to-machine (M2M)/"Internet of Things" connectivity protocol. It was designed as an extremely lightweight publish/subscribe messaging transport. It is useful for connections with remote locations where a small code footprint is required and/or network bandwidth is at a premium.

MQTT除了具备大部分消息中间件拥有的功能外,其最大的特点就是小型传输。以减少开销,减低网络流量的方式去满足低带宽、不稳定的网络远程传输。

MQTT服务器有很多,比如Apache-Apollo和EMQX,ITDragon龙 目前使用的时EMQX作为MQTT的服务器。使用也很简单,下载解压后,进入bin目录执行emqx console 启动服务。

MQTT调试工具可以用MQTTBox

3. SpringBoot 集成MQTT

3.1 导入mqtt库

第一步:导入面向企业应用集成库和对应mqtt集成库

compile('org.springframework.boot:spring-boot-starter-integration')

compile('org.springframework.integration:spring-integration-mqtt')

这里要注意spring-integration-mqtt的版本。因为会存在org.eclipse.paho.client.mqttv3修复了一些bug,并迭代了新版本。但spring-integration-mqtt并没有及时更新的情况。修改方法如下

compile("org.springframework.integration:spring-integration-mqtt") {

exclude group: "org.eclipse.paho" , module: "org.eclipse.paho.client.mqttv3"

}

compile("org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.2")

第二步:MQTT连接配置文件

# MQTT Config

mqtt.server=tcp://x.x.x.x:1883

mqtt.username=xxx

mqtt.password=xxx

mqtt.client-id=clientID

mqtt.cache-number=100

mqtt.message.topic=itDragon/tags/cov

3.2 配置MQTT订阅者

第一步:配置MQTT客户端工厂类DefaultMqttPahoClientFactory

第二步:配置MQTT入站消息适配器MqttPahoMessageDrivenChannelAdapter

第三步:定义MQTT入站消息通道MessageChannel

第四步:声明MQTT入站消息处理器MessageHandler

以下有些配置是冲突或者重复的,主要是体现一些重要配置。

package com.itdragon.server.config

import com.itdragon.server.message.ITDragonMQTTMessageHandler

import org.eclipse.paho.client.mqttv3.MqttConnectOptions

import org.springframework.beans.factory.annotation.Value

import org.springframework.context.annotation.Bean

import org.springframework.context.annotation.Configuration

import org.springframework.integration.annotation.ServiceActivator

import org.springframework.integration.channel.DirectChannel

import org.springframework.integration.core.MessageProducer

import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory

import org.springframework.integration.mqtt.core.MqttPahoClientFactory

import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter

import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter

import org.springframework.messaging.MessageChannel

import org.springframework.messaging.MessageHandler

import java.time.Instant

@Configuration

class MQTTConfig {

@Value("\${mqtt.server}")

lateinit var mqttServer: String

@Value("\${mqtt.user-name}")

lateinit var mqttUserName: String

@Value("\${mqtt.password}")

lateinit var mqttUserPassword: String

@Value("\${mqtt.client-id}")

lateinit var clientID: String

@Value("\${mqtt.cache-number}")

lateinit var maxMessageInFlight: String

@Value("\${mqtt.message.topic}")

lateinit var messageTopic: String

/**

* 配置DefaultMqttPahoClientFactory

* 1. 配置基本的链接信息

* 2. 配置maxInflight,在mqtt消息量比较大的情况下将值设大

*/

fun mqttClientFactory(): MqttPahoClientFactory {

val mqttConnectOptions = MqttConnectOptions()

// 配置mqtt服务端地址,登录账号和密码

mqttConnectOptions.serverURIs = arrayOf(mqttServer)

mqttConnectOptions.userName = mqttUserName

mqttConnectOptions.password = mqttUserPassword.toCharArray()

// 配置最大不确定接收消息数量,默认值10,qos!=0 时生效

mqttConnectOptions.maxInflight = maxMessageInFlight.toInt()

val factory = DefaultMqttPahoClientFactory()

factory.connectionOptions = mqttConnectOptions

return factory

}

/**

* 配置Inbound入站,消费者基本连接配置

* 1. 通过DefaultMqttPahoClientFactory 初始化入站通道适配器

* 2. 配置超时时长,默认30000毫秒

* 3. 配置Paho消息转换器

* 4. 配置发送数据的服务质量 0~2

* 5. 配置订阅通道

*/

@Bean

fun itDragonMqttInbound(): MessageProducer {

// 初始化入站通道适配器,使用的是Eclipse Paho MQTT客户端库

val adapter = MqttPahoMessageDrivenChannelAdapter(clientID + Instant.now().toEpochMilli(), mqttClientFactory(), messageTopic)

// 设置连接超时时长(默认30000毫秒)

adapter.setCompletionTimeout(30000)

// 配置默认Paho消息转换器(qos=0, retain=false, charset=UTF-8)

adapter.setConverter(DefaultPahoMessageConverter())

// 设置服务质量

// 0 最多一次,数据可能丢失;

// 1 至少一次,数据可能重复;

// 2 只有一次,有且只有一次;最耗性能

adapter.setQos(0)

// 设置订阅通道

adapter.outputChannel = itDragonMqttInputChannel()

return adapter

}

/**

* 配置Inbound入站,消费者订阅的消息通道

*/

@Bean

fun itDragonMqttInputChannel(): MessageChannel {

return DirectChannel()

}

/**

* 配置Inbound入站,消费者的消息处理器

* 1. 使用@ServiceActivator注解,表明所修饰的方法用于消息处理

* 2. 使用inputChannel值,表明从指定通道中取值

* 3. 利用函数式编程的思路,解耦MessageHandler的业务逻辑

*/

@Bean

@ServiceActivator(inputChannel = "itDragonMqttInputChannel")

fun commandDataHandler(): MessageHandler {

/*return MessageHandler { message ->

println(message.payload)

}*/

return ITDragonMQTTMessageHandler()

}

}

注意:

1)MQTT的客户端ID要唯一。

2)MQTT在消息量大的情况下会出现消息丢失的情况。

3)MessageHandler注意解耦问题。

3.3 配置MQTT发布者

第一步:配置Outbound出站,出站通道适配器

第二步:配置Outbound出站,发布者发送的消息通道

第三步:对外提供推送消息的接口

在原有的MQTTConfig配置类的集成上补充以下内容

/**

* 配置Outbound出站,出站通道适配器

* 1. 通过MqttPahoMessageHandler 初始化出站通道适配器

* 2. 配置异步发送

* 3. 配置默认的服务质量

*/

@Bean

@ServiceActivator(inputChannel = "itDragonMqttOutputChannel")

fun itDragonMqttOutbound(): MqttPahoMessageHandler {

// 初始化出站通道适配器,使用的是Eclipse Paho MQTT客户端库

val messageHandler = MqttPahoMessageHandler(clientID + Instant.now().toEpochMilli() + "_set", mqttClientFactory())

// 设置异步发送,默认是false(发送时阻塞)

messageHandler.setAsync(true)

// 设置默认的服务质量

messageHandler.setDefaultQos(0)

return messageHandler

}

/**

* 配置Outbound出站,发布者发送的消息通道

*/

@Bean

fun itDragonMqttOutputChannel(): MessageChannel {

return DirectChannel()

}

/**

* 对外提供推送消息的接口

* 1. 使用@MessagingGateway注解,配置MQTTMessageGateway消息推送接口

* 2. 使用defaultRequestChannel值,调用时将向其发送消息的默认通道

* 3. 配置灵活的topic主题

*/

@MessagingGateway(defaultRequestChannel = "itDragonMqttOutputChannel")

interface MQTTMessageGateway {

fun sendToMqtt(data: String, @Header(MqttHeaders.TOPIC) topic: String)

fun sendToMqtt(data: String, @Header(MqttHeaders.QOS) qos: Int, @Header(MqttHeaders.TOPIC) topic: String)

}

注意:

1)发布者和订阅者的客户端ID不能相同。

2)消息的推送建议采用异步的方式。

3)消息的推送方法可以只传payload消息体,但需要配置setDefaultTopic。

3.4 MQTT消息处理和发送

3.4.1 消息处理

为了让消息处理函数和MQTT配置解耦,这里提供MessageHandler 注册类,将消息处理的业务逻辑以函数式编程的思维注册到Handler中。

package com.itdragon.server.message

import org.springframework.messaging.Message

import org.springframework.messaging.MessageHandler

class ITDragonMQTTMessageHandler : MessageHandler {

private var handler: ((String) -> Unit)? = null

fun registerHandler(handler: (String) -> Unit) {

this.handler = handler

}

override fun handleMessage(message: Message) {

handler?.run { this.invoke(message.payload.toString()) }

}

}

注册MessageHandler

package com.itdragon.server.message

import org.slf4j.LoggerFactory

import org.springframework.beans.factory.annotation.Autowired

import org.springframework.stereotype.Service

import javax.annotation.PostConstruct

@Service

class ITDragonMessageDispatcher {

private val logger = LoggerFactory.getLogger(ITDragonMessageDispatcher::class.java)

@Autowired

lateinit var itDragonMQTTMessageHandler: ITDragonMQTTMessageHandler

@PostConstruct

fun init() {

itDragonMQTTMessageHandler.registerHandler { itDragonMsgHandler(it) }

}

fun itDragonMsgHandler(message: String) {

logger.info("itdragon mqtt receive message: $message")

try {

// todo

}catch (ex: Exception) {

ex.printStackTrace()

}

}

}

3.4.1 消息发送

注入MQTT的MessageGateway,然后推送消息。

@Autowired

lateinit var mqttGateway: MQTTConfig.MQTTMessageGateway

@Scheduled(fixedDelay = 10*1000)

fun sendMessage() {

mqttGateway.sendToMqtt("Hello ITDragon ${Instant.now()}", "itDragon/tags/cov/set")

}

4. 开发常见问题

4.1 MQTT每次重连失败都会增长线程数

项目上线一段时间后,客户的服务器严重卡顿。原因是客户服务断网后,MQTT在每次尝试重连的过程中一直在创建新的线程,导致一个Java服务创建了上万个线程。解决方案是更新了org.eclipse.paho.client.mqttv3的版本,也是 "3.1 导入mqtt库" 中提到的。后续就没有出现这个问题了。

4.2 MQTT消息量大存在消息丢失的情况

MQTT的消息量大的情况下,既要保障数据的完整,又要保障性能的稳定。光从MQTT本身上来说,很难做到鱼和熊掌不可兼得。ITDragon龙 先要理清需求:

1)数据的完整性,主要用于能耗的统计、报警的分析

2)性能的稳定性,服务器不挂藍藍藍藍

在消息量大的情况下,ITDragon龙 可以将服务质量设置成0(最多一次)以减少消息确认的开销,用来保证系统的稳定性。

将消息的服务质量设置成0后,会让消息的丢失可能性变得更大,如何保证数据的完整性?其实ITDragon龙 可以在往MQTT通道推送消息之前,先将底层驱动采集的数据先异步保存到Inflxudb数据库中。

还有就是每次发送消息量不能太大,太大也会导致消息丢失。最直接的就是后端报错,比如:java.io.EOFException 和 too large message: xxx bytes 。但是有的场景后端没有报错,前端订阅的mqtt也没收到消息。最麻烦的是mqttbox工具因为数据量太大直接卡死。一时间真不知道把锅甩给谁。其实我们 可以将消息拆包一批批发送。可以缓解这个问题藍藍藍藍。

其实采集的数据消息,若在这一批推送过程中丢失。也会在下一批推送过程中补上。命令下发也是一样,如果下发失败,再重写下发一次。毕竟消息的丢失并不是必现的情况。也是小概率事件,系统的稳定性才是最重要的。

 类似资料: