MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议)是用于物联网(IoT)的OASIS标准消息传递协议。发布/订阅是连接远程消息传递设备的理想选择,因为它具有非常小的网络带宽。MQTT目前广泛应用于各种行业,如汽车、制造业、电信、石油和天然气等。
对于MQTT broker,目前主流的实现有EMQ,mosquito,HiveMQ等,但是并没有一个很完整的Go语言实现。目前的开源的Go实现对MQTT协议的支持基本上都是缺胳膊少腿,而Gmqtt完整的实现了MQTT V3.1.1和最新的V5协议,应该是Go语言中对MQTT协议支持最完整的项目。
Gmqtt的诞生是由于之前工作的项目需要,要在MQTT broker里面定制化许多业务逻辑,调研了一些broker都不尽满意,于是乎就撸起袖子自己干,造了这么一个轮子。起初只支持V3.1.1版本,但本着尽善尽美的原则(本人有强迫症),放弃了撸铁时间,肝了一段时间,把V5的特性也全部支持了。
跟所有的Go项目一样,go get下载即可。
$ go get -u github.com/DrmagicE/gmqtt
$ cd cmd/gmqttd
$ go run . start -c default_config.yml
2020-12-13T23:11:54.037+0800 INFO server/server.go:996 init plugin hook wrappers
2020-12-13T23:11:54.037+0800 INFO server/server.go:802 open persistence succeeded {"type": "memory"}
2020-12-13T23:11:54.037+0800 INFO server/server.go:825 init session store succeeded {"type": "memory", "session_total": 0}
2020-12-13T23:11:54.037+0800 INFO server/server.go:842 init queue store succeeded {"type": "memory", "session_total": 0}
2020-12-13T23:11:54.037+0800 INFO server/server.go:843 init subscription store succeeded {"type": "memory", "client_total": 0}
2020-12-13T23:11:54.037+0800 INFO server/server.go:1218 loading plugin {"name": "prometheus"}
2020-12-13T23:11:54.037+0800 INFO server/server.go:1218 loading plugin {"name": "admin"}
2020-12-13T23:11:54.038+0800 INFO server/server.go:1259 starting gmqtt server {"tcp server listen on": ["[::]:1883"], "websocket server listen on": [":8883"]}
使用上述的命令将使用默认配置default_config.yml
启动gmqtt,监听1883端口提供TCP服务和8883端口提供websocket服务。Gmqtt默认配置没有启用鉴权,客户端不需配置鉴权可以直接连接。
Gmqtt具备极强的扩展性,你几乎可以通过定制化插件来定制任何逻辑。例如通过HTTP/gRPC接口来查询客户端信息,强制断开连接,订阅主题,发布消息等等。这极强的扩展性得益于gmqtt提供的丰富的钩子函数,以及其内置的扩展接口。
目前,gmqtt提供了17个钩子函数。
hook | 说明 | 用途示例 |
---|---|---|
OnAccept | TCP连接建立时调用 | TCP连接限速,黑白名单等. |
OnStop | Broker退出时调用 | |
OnSubscribe | 收到订阅请求时调用 | 校验订阅是否合法 |
OnSubscribed | 订阅成功后调用 | 统计订阅报文数量 |
OnUnsubscribe | 取消订阅时调用 | 校验是否允许取消订阅 |
OnUnsubscribed | 取消订阅成功后调用 | 统计订阅报文数 |
OnMsgArrived | 收到消息发布报文时调用 | 校验发布权限,改写发布消息 |
OnBasicAuth | 收到连接请求报文时调用 | 客户端连接鉴权 |
OnEnhancedAuth | 收到带有AuthMetho的连接请求报文时调用(V5特性) | 客户端连接鉴权 |
OnReAuth | 收到Auth报文时调用(V5特性) | 客户端连接鉴权 |
OnConnected | 客户端连接成功后调用 | 统计在线客户端数量 |
OnSessionCreated | 客户端创建新session后调用 | 统计session数量 |
OnSessionResumed | 客户端从旧session恢复后调用 | 统计session数量 |
OnSessionTerminated | session删除后调用 | 统计session数量 |
OnDeliver | 消息从broker投递到客户端后调用 | |
OnClosed | 客户端断开连接后调用 | 统计在线客户端数量 |
OnMsgDropped | 消息被丢弃时调用 |
https://github.com/DrmagicE/gmqtt/blob/master/server/hook.go#L11
举其中常用的OnBasicAuth
,OnSubscribe
,OnMsgArrived
为例,说明如何通过这些函数来定制化鉴权逻辑。
我们在内存中保存以下6个客户端的用户名密码。
var validUser = map[string]string{
"root": "pwd", // root用户拥有所有权限
"qos0": "pwd", // qos0用户最高只允许订阅qos0主题
"qos1": "pwd", // qos1用户最高只允许订阅qos1主题
"publishonly": "pwd", // publishonly用户只允许发布,不允许订阅
"subscribeonly": "pwd", // subscribeonly用户只允许订阅,不允许发布
"disable_shared": "pwd", // disable_shared用户禁止订阅表示共享订阅的主题(V5特性)
}
除去以上的针对用户的权限设置外,假设我们由于性能因素的考虑,只允许发布QoS1的消息,忽略所有QoS2消息。
//authentication
var onBasicAuth server.OnBasicAuth = func(ctx context.Context, client server.Client, req *server.ConnectRequest) error {
username := string(req.Connect.Username)
password := string(req.Connect.Password)
// 校验用户名密码
if validateUser(username, password) {
if username == "disable_shared" {
// 禁用共享订阅
req.Options.SharedSubAvailable = false
}
return nil
}
// 检查客户端的版本,兼容V311和V5不同的错误码返回
switch client.Version() {
case packets.Version5:
return codes.NewError(codes.BadUserNameOrPassword)
case packets.Version311:
return codes.NewError(codes.V3BadUsernameorPassword)
}
// 校验通过返回nil
return nil
}
可以看到,在OnBasicAuth
这个钩子函数中,我们能拿到鉴权所需的必要信息,例如username
,password
,除了这两个信息外,还有很多其他信息,例如clientID
,IP地址等等,均可以用来作为鉴权的参数。如果判断鉴权失败,则返回MQTT定义的错误码。如果判断鉴权成功,返回nil
即可。
// subscription acl
var onSubscribe server.OnSubscribe = func(ctx context.Context, client server.Client, req *server.SubscribeRequest) error {
// 获取用户名。几乎在所有的钩子函数里,都可以获取客户端的必要信息
username := client.ClientOptions().Username
// 遍历当次订阅请求中的所有订阅消息
for k,v := range req.Subscriptions {
switch username {
case "root":
// 如果是root用户,他想订阅什么都可以
case "qos0":
// 如果是qos0用户,那么他最多只能订阅qos0等级
req.GrantQoS(k, packets.Qos0)
case "qos1":
// 如果是qos1用户,最多只能订阅qos1等级
if v.Sub.QoS > packets.Qos1 {
req.GrantQoS(k, packets.Qos1)
}
case "publishonly":
// 对于只允许发布的客户端,拒绝一切订阅
req.Reject(k, &codes.Error{
Code: codes.NotAuthorized,
ErrorDetails: codes.ErrorDetails{
ReasonString: []byte("publish only"),
},
})
}
}
return nil
}
var onMsgArrived server.OnMsgArrived = func(ctx context.Context, client server.Client, req *server.MsgArrivedRequest) error {
version := client.Version()
if client.ClientOptions().Username == "subscribeonly" {
switch version {
case packets.Version311:
// 对于V311协议来说,如果服务端不允许客户端发布某条消息,由于没有任何通知机制,服务端只能选择回复一个正常的ACK。
// 或者把客户端连接断开。[MQTT-3.3.5-2].
// 我们丢弃这个报文。
req.Drop()
// 或者我们也可以强硬一点,直接把客户端连接断开
// client.Close()
return nil
case packets.Version5:
// 对于V5来说,V5引入了错误码的回复机制,因此我们可以回复一个错误码来告诉客户端没有权限。
return &codes.Error{
Code: codes.NotAuthorized,
}
// 或者你依然可以强硬一点,关闭客户端,但由于V5支持由服务端给客户端发Disconnect报文
// 所以用client.Disconnect()来代替client.Close()对于V5来说是更好的选择
//req.Drop()
//client.Disconnect(&packets.Disconnect{
// Version: packets.Version5,
// Code: codes.UnspecifiedError,
//})
//return
}
}
if req.Message.QoS == packets.Qos2 {
// 由于最高允许QoS1消息,这里丢弃所有QoS2消息
req.Drop()
return &codes.Error{
Code: codes.NotAuthorized,
ErrorDetails: codes.ErrorDetails{
ReasonString: []byte("not authorized"),
UserProperties: []struct {
K []byte
V []byte
}{
{
K: []byte("user property key"),
V: []byte("user property value"),
},
},
},
}
}
return nil
}
上述完整的代码可以在这里找到:
https://github.com/DrmagicE/gmqtt/blob/v0.2.2/examples/hook/main.go
https://github.com/DrmagicE/gmqtt/blob/v0.2.2/server/server.go#L80
// Server interface represents a mqtt server instance.
type Server interface {
// Publisher 允许向broker发送MQTT消息
Publisher() Publisher
// GetConfig 返回当前配置文件
GetConfig() config.Config
// StatsManager 返回状态统计
StatsManager() StatsReader
...
// ClientService 提供对客户端的查询,强制离线,强制清除session等操作。
ClientService() ClientService
// SubscriptionService 允许对订阅进行增删改查等操作。
SubscriptionService() SubscriptionService
// RetainedService 提供对保留消息的增删改查操作
RetainedService() RetainedService
...
}
Gmqtt提供了上述接口来提供扩展能力,这些扩展接口通常会被插件调用。可以看到,通过这些扩展接口,我们可以通过函数调用来向broker发消息,对主题进行增删改查,查询客户端连接等等功能。
基于钩子函数和扩展接口,开发者可以通过编写插件来灵活扩展gmqtt的能力。目前gmqtt内置了三个插件,auth
鉴权,prometheus
监控以及admin
API管理插件。插件相关接口定义:plugin.go
// HookWrapper groups all hook wrappers function
type HookWrapper struct {
OnBasicAuthWrapper OnBasicAuthWrapper
OnEnhancedAuthWrapper OnEnhancedAuthWrapper
OnConnectedWrapper OnConnectedWrapper
OnReAuthWrapper OnReAuthWrapper
OnSessionCreatedWrapper OnSessionCreatedWrapper
OnSessionResumedWrapper OnSessionResumedWrapper
OnSessionTerminatedWrapper OnSessionTerminatedWrapper
OnSubscribeWrapper OnSubscribeWrapper
OnSubscribedWrapper OnSubscribedWrapper
OnUnsubscribeWrapper OnUnsubscribeWrapper
OnUnsubscribedWrapper OnUnsubscribedWrapper
OnMsgArrivedWrapper OnMsgArrivedWrapper
OnMsgDroppedWrapper OnMsgDroppedWrapper
OnDeliverWrapper OnDeliverWrapper
OnCloseWrapper OnCloseWrapper
OnAcceptWrapper OnAcceptWrapper
OnStopWrapper OnStopWrapper
}
// NewPlugin 是插件的构造函数
type NewPlugin func(config config.Config) (Plugable, error)
// Plugin 是所有插件都需要实现的接口
type Plugin interface {
// Load 会在server启动阶段被调用,可以看到通过这个方法,我们将扩展接口传给的插件。使插件具备调用扩展接口的能力。
Load(service Server) error
// Unload 会当server退出时调用,方便插件做一些cleanup。
Unload() error
// HookWrapper 返回插件需要向broker注册的钩子函数,如果该插件不需要注册任何钩子函数,返回空结构体。
HookWrapper() HookWrapper
// Name 返回插件的名称。
Name() string
}
关于如何实现一个插件,在详细的插件文档出炉之前,大家可以先参考内置的两个插件admin和prometheus。
Gmqtt默认使用内存存储,这也是gmqtt推荐的存储方式,内存存储具备绝佳的性能优势,但缺点是session信息会在broker重启后丢失。
如果你希望重启后session不丢失,可以配置redis持久化存储:
persistence:
type: redis # memory或者redis
redis:
# redis地址
addr: "127.0.0.1:6379"
# 连接池的最大空闲连接数
max_idle: 1000
# 连接池最大活跃连接数,0则表示不限制.
max_active: 0
# 空闲连接的超时时间,超时将关闭空闲连接
idle_timeout: 240s
password: ""
database: 0
如果你对本项目感兴趣,欢迎start支持一下,有问题issue随便提哦。