当前位置: 首页 > 工具软件 > Gmqtt > 使用案例 >

Gmqtt——Go语言实现的MQTT broker

郁宾鸿
2023-12-01

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议)是用于物联网(IoT)的OASIS标准消息传递协议。发布/订阅是连接远程消息传递设备的理想选择,因为它具有非常小的网络带宽。MQTT目前广泛应用于各种行业,如汽车、制造业、电信、石油和天然气等。

摘自 https://mqtt.org/

对于MQTT broker,目前主流的实现有EMQ,mosquito,HiveMQ等,但是并没有一个很完整的Go语言实现。目前的开源的Go实现对MQTT协议的支持基本上都是缺胳膊少腿,而Gmqtt完整的实现了MQTT V3.1.1和最新的V5协议,应该是Go语言中对MQTT协议支持最完整的项目。

项目地址: https://github.com/DrmagicE/gmqtt

Gmqtt的诞生是由于之前工作的项目需要,要在MQTT broker里面定制化许多业务逻辑,调研了一些broker都不尽满意,于是乎就撸起袖子自己干,造了这么一个轮子。起初只支持V3.1.1版本,但本着尽善尽美的原则(本人有强迫症),放弃了撸铁时间,肝了一段时间,把V5的特性也全部支持了。

快速开始

跟所有的Go项目一样,go get下载即可。

$ go get -u github.com/DrmagicE/gmqtt
$ cd cmd/gmqttd
$ go run . start -c default_config.yml
2020-12-13T23:11:54.037+0800    INFO    server/server.go:996    init plugin hook wrappers
2020-12-13T23:11:54.037+0800    INFO    server/server.go:802    open persistence succeeded      {"type": "memory"}
2020-12-13T23:11:54.037+0800    INFO    server/server.go:825    init session store succeeded    {"type": "memory", "session_total": 0}
2020-12-13T23:11:54.037+0800    INFO    server/server.go:842    init queue store succeeded      {"type": "memory", "session_total": 0}
2020-12-13T23:11:54.037+0800    INFO    server/server.go:843    init subscription store succeeded       {"type": "memory", "client_total": 0}
2020-12-13T23:11:54.037+0800    INFO    server/server.go:1218   loading plugin  {"name": "prometheus"}
2020-12-13T23:11:54.037+0800    INFO    server/server.go:1218   loading plugin  {"name": "admin"}
2020-12-13T23:11:54.038+0800    INFO    server/server.go:1259   starting gmqtt server   {"tcp server listen on": ["[::]:1883"], "websocket server listen on": [":8883"]}

使用上述的命令将使用默认配置default_config.yml启动gmqtt,监听1883端口提供TCP服务和8883端口提供websocket服务。Gmqtt默认配置没有启用鉴权,客户端不需配置鉴权可以直接连接。

特点

Gmqtt具备极强的扩展性,你几乎可以通过定制化插件来定制任何逻辑。例如通过HTTP/gRPC接口来查询客户端信息,强制断开连接,订阅主题,发布消息等等。这极强的扩展性得益于gmqtt提供的丰富的钩子函数,以及其内置的扩展接口。

钩子函数

目前,gmqtt提供了17个钩子函数。

hook说明用途示例
OnAcceptTCP连接建立时调用TCP连接限速,黑白名单等.
OnStopBroker退出时调用
OnSubscribe收到订阅请求时调用校验订阅是否合法
OnSubscribed订阅成功后调用统计订阅报文数量
OnUnsubscribe取消订阅时调用校验是否允许取消订阅
OnUnsubscribed取消订阅成功后调用统计订阅报文数
OnMsgArrived收到消息发布报文时调用校验发布权限,改写发布消息
OnBasicAuth收到连接请求报文时调用客户端连接鉴权
OnEnhancedAuth收到带有AuthMetho的连接请求报文时调用(V5特性)客户端连接鉴权
OnReAuth收到Auth报文时调用(V5特性)客户端连接鉴权
OnConnected客户端连接成功后调用统计在线客户端数量
OnSessionCreated客户端创建新session后调用统计session数量
OnSessionResumed客户端从旧session恢复后调用统计session数量
OnSessionTerminatedsession删除后调用统计session数量
OnDeliver消息从broker投递到客户端后调用
OnClosed客户端断开连接后调用统计在线客户端数量
OnMsgDropped消息被丢弃时调用

https://github.com/DrmagicE/gmqtt/blob/master/server/hook.go#L11

举其中常用的OnBasicAuth,OnSubscribe,OnMsgArrived为例,说明如何通过这些函数来定制化鉴权逻辑。
我们在内存中保存以下6个客户端的用户名密码。

var validUser = map[string]string{
	"root":           "pwd", // root用户拥有所有权限
	"qos0":           "pwd", // qos0用户最高只允许订阅qos0主题
	"qos1":           "pwd", // qos1用户最高只允许订阅qos1主题
	"publishonly":    "pwd", // publishonly用户只允许发布,不允许订阅
	"subscribeonly":  "pwd", // subscribeonly用户只允许订阅,不允许发布
	"disable_shared": "pwd", // disable_shared用户禁止订阅表示共享订阅的主题(V5特性)
}

除去以上的针对用户的权限设置外,假设我们由于性能因素的考虑,只允许发布QoS1的消息,忽略所有QoS2消息。

登录鉴权

//authentication
var onBasicAuth server.OnBasicAuth = func(ctx context.Context, client server.Client, req *server.ConnectRequest) error {
	username := string(req.Connect.Username)
	password := string(req.Connect.Password)
	// 校验用户名密码
	if validateUser(username, password) {
		if username == "disable_shared" {
			// 禁用共享订阅
			req.Options.SharedSubAvailable = false
		}
		return nil
	}
	// 检查客户端的版本,兼容V311和V5不同的错误码返回
	switch client.Version() {
	case packets.Version5:
		return codes.NewError(codes.BadUserNameOrPassword)
	case packets.Version311:
		return codes.NewError(codes.V3BadUsernameorPassword)
	}
	// 校验通过返回nil
	return nil
}

可以看到,在OnBasicAuth这个钩子函数中,我们能拿到鉴权所需的必要信息,例如username,password,除了这两个信息外,还有很多其他信息,例如clientID,IP地址等等,均可以用来作为鉴权的参数。如果判断鉴权失败,则返回MQTT定义的错误码。如果判断鉴权成功,返回nil即可。

订阅权限控制

// subscription acl
var onSubscribe server.OnSubscribe = func(ctx context.Context, client server.Client, req *server.SubscribeRequest) error {
	// 获取用户名。几乎在所有的钩子函数里,都可以获取客户端的必要信息
	username := client.ClientOptions().Username
	// 遍历当次订阅请求中的所有订阅消息
	for k,v := range req.Subscriptions {
		switch username {
		case "root":
			// 如果是root用户,他想订阅什么都可以
		case "qos0":
			// 如果是qos0用户,那么他最多只能订阅qos0等级
			req.GrantQoS(k, packets.Qos0)
		case "qos1":
			// 如果是qos1用户,最多只能订阅qos1等级
			if v.Sub.QoS > packets.Qos1 {
				req.GrantQoS(k, packets.Qos1)
			}
		case "publishonly":
			// 对于只允许发布的客户端,拒绝一切订阅
			req.Reject(k, &codes.Error{
				Code: codes.NotAuthorized,
				ErrorDetails: codes.ErrorDetails{
					ReasonString: []byte("publish only"),
				},
			})
		}
	}
	return nil
}

发布权限控制

var onMsgArrived server.OnMsgArrived = func(ctx context.Context, client server.Client, req *server.MsgArrivedRequest) error {
	version := client.Version()
	if client.ClientOptions().Username == "subscribeonly" {
		switch version {
		case packets.Version311:
			// 对于V311协议来说,如果服务端不允许客户端发布某条消息,由于没有任何通知机制,服务端只能选择回复一个正常的ACK。
			// 或者把客户端连接断开。[MQTT-3.3.5-2].
			// 我们丢弃这个报文。
			req.Drop()
			// 或者我们也可以强硬一点,直接把客户端连接断开
			// client.Close()
			return nil

		case packets.Version5:
			// 对于V5来说,V5引入了错误码的回复机制,因此我们可以回复一个错误码来告诉客户端没有权限。
			return &codes.Error{
				Code: codes.NotAuthorized,
			}
			// 或者你依然可以强硬一点,关闭客户端,但由于V5支持由服务端给客户端发Disconnect报文
			// 所以用client.Disconnect()来代替client.Close()对于V5来说是更好的选择
			//req.Drop()
			//client.Disconnect(&packets.Disconnect{
			//	Version: packets.Version5,
			//	Code:    codes.UnspecifiedError,
			//})
			//return
		}
	}

	if req.Message.QoS == packets.Qos2 {
		// 由于最高允许QoS1消息,这里丢弃所有QoS2消息
		req.Drop()
		return &codes.Error{
			Code: codes.NotAuthorized,
			ErrorDetails: codes.ErrorDetails{
				ReasonString: []byte("not authorized"),
				UserProperties: []struct {
					K []byte
					V []byte
				}{
					{
						K: []byte("user property key"),
						V: []byte("user property value"),
					},
				},
			},
		}
	}
	return nil
}

上述完整的代码可以在这里找到:
https://github.com/DrmagicE/gmqtt/blob/v0.2.2/examples/hook/main.go

扩展接口

https://github.com/DrmagicE/gmqtt/blob/v0.2.2/server/server.go#L80

// Server interface represents a mqtt server instance.
type Server interface {
	// Publisher 允许向broker发送MQTT消息
	Publisher() Publisher
	// GetConfig 返回当前配置文件
	GetConfig() config.Config
	// StatsManager 返回状态统计
	StatsManager() StatsReader
	...
	// ClientService 提供对客户端的查询,强制离线,强制清除session等操作。
	ClientService() ClientService
	// SubscriptionService 允许对订阅进行增删改查等操作。
	SubscriptionService() SubscriptionService
	// RetainedService 提供对保留消息的增删改查操作
	RetainedService() RetainedService
	...
}

Gmqtt提供了上述接口来提供扩展能力,这些扩展接口通常会被插件调用。可以看到,通过这些扩展接口,我们可以通过函数调用来向broker发消息,对主题进行增删改查,查询客户端连接等等功能。

插件机制

基于钩子函数和扩展接口,开发者可以通过编写插件来灵活扩展gmqtt的能力。目前gmqtt内置了三个插件,auth鉴权,prometheus监控以及adminAPI管理插件。插件相关接口定义:plugin.go

// HookWrapper groups all hook wrappers function
type HookWrapper struct {
	OnBasicAuthWrapper         OnBasicAuthWrapper
	OnEnhancedAuthWrapper      OnEnhancedAuthWrapper
	OnConnectedWrapper         OnConnectedWrapper
	OnReAuthWrapper            OnReAuthWrapper
	OnSessionCreatedWrapper    OnSessionCreatedWrapper
	OnSessionResumedWrapper    OnSessionResumedWrapper
	OnSessionTerminatedWrapper OnSessionTerminatedWrapper
	OnSubscribeWrapper         OnSubscribeWrapper
	OnSubscribedWrapper        OnSubscribedWrapper
	OnUnsubscribeWrapper       OnUnsubscribeWrapper
	OnUnsubscribedWrapper      OnUnsubscribedWrapper
	OnMsgArrivedWrapper        OnMsgArrivedWrapper
	OnMsgDroppedWrapper        OnMsgDroppedWrapper
	OnDeliverWrapper           OnDeliverWrapper
	OnCloseWrapper             OnCloseWrapper
	OnAcceptWrapper            OnAcceptWrapper
	OnStopWrapper              OnStopWrapper
}
// NewPlugin 是插件的构造函数
type NewPlugin func(config config.Config) (Plugable, error)

// Plugin 是所有插件都需要实现的接口
type Plugin interface {
	// Load 会在server启动阶段被调用,可以看到通过这个方法,我们将扩展接口传给的插件。使插件具备调用扩展接口的能力。
	Load(service Server) error
	// Unload 会当server退出时调用,方便插件做一些cleanup。
	Unload() error
	// HookWrapper 返回插件需要向broker注册的钩子函数,如果该插件不需要注册任何钩子函数,返回空结构体。
	HookWrapper() HookWrapper
	// Name 返回插件的名称。
	Name() string
}

关于如何实现一个插件,在详细的插件文档出炉之前,大家可以先参考内置的两个插件adminprometheus

支持session持久化

Gmqtt默认使用内存存储,这也是gmqtt推荐的存储方式,内存存储具备绝佳的性能优势,但缺点是session信息会在broker重启后丢失。

如果你希望重启后session不丢失,可以配置redis持久化存储:

persistence:
  type: redis # memory或者redis 
  redis:
    # redis地址
    addr: "127.0.0.1:6379"
    # 连接池的最大空闲连接数
    max_idle: 1000
    # 连接池最大活跃连接数,0则表示不限制.
    max_active: 0
    # 空闲连接的超时时间,超时将关闭空闲连接
    idle_timeout: 240s
    password: ""
    database: 0

缺陷

  • 集群模式还不支持。(下一步计划)

如果你对本项目感兴趣,欢迎start支持一下,有问题issue随便提哦。

 类似资料: