因为最近项目需要用到边缘计算,结合百度的openedge进行开发,openedge目前主要功能为结合docker容器实现边缘计算,具体内容官网很多,其架构中,openedge-hub作为所有模块的通信中心节点(消息的接收和转发)是非常重要的,本篇主要介绍一下openedge-hub模块的启动以及在QOS=0的情况下消息的发送和转发,本文主要是为了记录下思路方便后续改造,因个人水平有限,对于MQTT了解有限,很多名词使用不当,中间若有错误劳烦告知,谢谢!
openedge-hub的开始的节点是在openedge/openedge-hub/main.go文件进行的,main函数如下所示:
openedge.Run(func(ctx openedge.Context) error {
m := mo{log: ctx.Log()}
defer m.close()
err := m.start()
if err != nil {
return err
}
ctx.Wait()
return nil
})
其作用主要是启动mo对象,mo结构如下:
type mo struct {
cfg config.Config
Rules *rule.Manager
Sessions *session.Manager
broker *broker.Broker
servers *server.Manager
factory *persist.Factory
log logger.Logger
}
刚刚main文件中有一行代码是:err := m.start()
,这个就是启动的入口,接下来看一下这个start方法:
func (m *mo) start() error {
err := utils.LoadYAML(openedge.DefaultConfFile, &m.cfg)
if err != nil {
m.log.Errorln("failed to load config:", err.Error())
return err
}
m.factory, err = persist.NewFactory(m.cfg.Storage.Dir)
if err != nil {
m.log.Errorln("failed to new factory:", err.Error())
return err
}
m.broker, err = broker.NewBroker(&m.cfg, m.factory)
if err != nil {
m.log.Errorln("failed to new broker:", err.Error())
return err
}
m.Rules, err = rule.NewManager(m.cfg.Subscriptions, m.broker)
if err != nil {
m.log.Errorln("failed to new rule manager:", err.Error())
return err
}
m.Sessions, err = session.NewManager(&m.cfg, m.broker.Flow, m.Rules, m.factory)
if err != nil {
m.log.Errorln("failed to new session manager:", err.Error())
return err
}
m.servers, err = server.NewManager(m.cfg.Listen, m.cfg.Certificate, m.Sessions.Handle)
if err != nil {
m.log.Errorln("failed to new server manager:", err.Error())
return err
}
m.Rules.Start()
m.servers.Start()
return nil
}
总体上就是分别对mo的各个属性进行初始化,下面分别对每一个属性的初始化进行解析。
cfg加载在main.go中start方法的代码如下:
err := utils.LoadYAML(openedge.DefaultConfFile, &m.cfg)
if err != nil {
m.log.Errorln("failed to load config:", err.Error())
return err
}
其实就是把yaml对应的属性填充到mo的cfg中,下面介绍一下cfg这个结构体的一些配置项:
// Config all config of edge
type Config struct {
Listen []string `yaml:"listen" json:"listen"`
Certificate utils.Certificate `yaml:"certificate" json:"certificate"`
Principals []Principal `yaml:"principals" json:"principals" validate:"principals"`
Subscriptions []Subscription `yaml:"subscriptions" json:"subscriptions" validate:"subscriptions"`
Message Message `yaml:"message" json:"message"`
Status struct {
Logging struct {
Enable bool `yaml:"enable" json:"enable"`
Interval time.Duration `yaml:"interval" json:"interval" default:"1m"`
} `yaml:"logging" json:"logging"`
} `yaml:"status" json:"status"`
Storage struct {
Dir string `yaml:"dir" json:"dir" default:"var/db/openedge"`
} `yaml:"storage" json:"storage"`
Shutdown struct {
Timeout time.Duration `yaml:"timeout" json:"timeout" default:"10m"`
} `yaml:"shutdown" json:"shutdown"`
}
附上一个源码中示例的文件:
name: localhub
listen:
- tcp://0.0.0.0:1883
principals:
- username: 'test'
password: '9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08'
permissions:
- action: 'pub'
permit: ['#']
- action: 'sub'
permit: ['#']
subscriptions:
- source:
topic: 't'
target:
topic: 't/topic'
logger:
path: var/log/openedge/localhub/localhub.log
console: true
level: "debug"
通过加载配置文件后,cfg中就存储了这些信息。
使用淘宝的人既可以自己开店,也可以去买货,但是卖货的信息发布在什么地方呢?想要买东西从哪里浏览呢?当然是通过淘宝了,broker就是淘宝,所有连接到openedge-hub(后面简称为hub)的session既可以作为卖家发布消息,这个消息就发送到broker的channel(根据QOS不同对应了不同的channel)了,在生活中为了让买家能够买到称心如意的东西,淘宝一般会进行个性化推荐,在hub中每一个session会个性化的订阅主题,这时候路由器(Rule.Manager)就会把符合session需要的消息发送到session中了。
broker初始化的入口如下:
m.broker, err = broker.NewBroker(&m.cfg, m.factory)
if err != nil {
m.log.Errorln("failed to new broker:", err.Error())
return err
}
接下来进入到broker.NewBroker这个方法中:
// NewBroker NewBroker
func NewBroker(c *config.Config, pf *persist.Factory) (b *Broker, err error) {
···
b = &Broker{
config: c,
msgQ0Chan: make(chan *common.Message, c.Message.Ingress.Qos0.Buffer.Size),
msgQ1Chan: make(chan *common.Message, c.Message.Ingress.Qos1.Buffer.Size),
msgQ1DB: msgqos1DB,
offsetDB: offsetDB,
offsetChan: make(chan *Offset, c.Message.Offset.Buffer.Size),
log: logger.WithField("broker", "mqtt"),
}
···
return b, b.tomb.Gos(b.persistingMsgQos1, b.persistingOffset, b.cleaningMsgQos1)
}
“···”表示这里省略了部分代码
可以看到这里为broker注入了刚刚加载好的cfg,并且创建了两个channel(现在只以QOS0作为研究对象),然后返回这个broker,这个broker就初始化好了。
假设一下淘宝进行个性化推荐时候的做法(只是为了理解这么比喻一下),可以生成每一个人的特征,看看这个人喜欢什么,比如喜欢鞋子、外衣、化妆品,那么淘宝后台专门为他/她开启一个线程(只是这么说,不用较真),这个线程就用来不断从淘宝中获取新的鞋子、外衣、化妆品的商品,然后把它推送到用户的手机中,那么这么多线程不方便管理,弄一个“线程池”集中管理。回到hub中,每一个session订阅的主题相当于特征,这时候为每一个session添加一个rulebase就是相当于淘宝后台专门开启的线程,当rulebase接收到数据后推送到客户端。
Rules初始化的入口如下:
m.Rules, err = rule.NewManager(m.cfg.Subscriptions, m.broker)
if err != nil {
m.log.Errorln("failed to new rule manager:", err.Error())
return err
}
进入rule.NewManager方法中,如下:
// NewManager creates a new rule manager
func NewManager(c []config.Subscription, b broker) (*Manager, error) {
m := &Manager{
broker: b,
rules: cmap.New(),
trieq0: router.NewTrie(),
log: logger.WithField("manager", "rule"),
}
m.rules.Set(common.RuleMsgQ0, newRuleQos0(m.broker, m.trieq0))
m.rules.Set(common.RuleTopic, newRuleTopic(m.broker, m.trieq0))
for _, sub := range c {
err := m.AddSinkSub(common.RuleTopic, sub.Target.Topic, uint32(sub.Source.QOS), sub.Source.Topic, uint32(sub.Target.QOS), sub.Target.Topic)
if err != nil {
return nil, fmt.Errorf("failed to add subscription (%v): %s", sub.Source, err.Error())
}
}
if b.Config().Status.Logging.Enable {
return m, m.tomb.Gos(m.logging)
}
return m, nil
}
这个部分是较为核心的部分,首先初始化了rule.Manager的部分属性,broker就使用上一步创建的broker,rules是一个map,trieq0表示的是一个Trie树,Trie树代表的就是消息发布、订阅时这种转发的关系,这也是为什么NewManager这个方法为什么要带着Subscription这个参数了,就是为了构建这个Trie树。
Trie树上每一个仅表示一个节点的转发关系,比如在/a/b节点中,近对主题为/a/b的消息进行处理,至于在subscriptino参数中定义的转发关系如何实现,是由下面要介绍的RuleTopic进行处理的
下面有两行是构建两个比较特殊的rule,第一个是RuleMsgQ0,这个rule主要是为了监听来自broker的消息,并把消息发送至对应的sinksub上,第二个是RuleTopic,这个rule主要是用来进行在subscriptions中定义的转发关系进行转发,转发到对应的层。
sinksub就是用来表示订阅关系,sinksub内包含有sessoin(session对应的rule)中的channel,sinksub将消息发送到channel中,session从channel另一端读取数据数据,并发送至客户端。
sinksub中有两个属性对比一下,一个是topic,另一个是targettopic:topic就是订阅的主题,targettopic就是要发送的主题
sinksub中的channel就是来自sink中的msgchan
进入到初始化RuleMsgQ0的方法中:
func newRuleQos0(b broker, r *router.Trie) *rulebase {
return newRuleBase(common.RuleMsgQ0, false, b, r, nil, nil)
}
func newRuleBase(id string, persistent bool, b broker, r *router.Trie, publish, republish common.Publish) *rulebase {
···
rb := &rulebase{
id: id,
broker: b,
log: log,
}
···
rb.msgchan = newMsgChan(
b.Config().Message.Egress.Qos0.Buffer.Size,
b.Config().Message.Egress.Qos1.Buffer.Size,
publish,
republish,
b.Config().Message.Egress.Qos1.Retry.Interval,
b.Config().Shutdown.Timeout,
persist,
log,
)
rb.sink = newSink(id, b, r, rb.msgchan)
return rb
}
func newSink(id string, b broker, r *router.Trie, msgchan *msgchan) *sink {
s := &sink{
id: id,
broker: b,
trieq0: r,
trieq1: router.NewTrie(),
msgchan: msgchan,
log: logger.WithField("sink", id),
}
return s
}
msgchan可以理解为用于发送消息的channel
对照着参数,可以看到publish、republish传送过来均为nil,表明其实这个rule并不参与消息的转接(后面如果自己推敲一下,它同样会接收来自sinksub的消息,只不过因为没有在Trie树中注册sinksub,所以不会向这个channel发送数据罢了。
rule的publish方法就是用于在接收到来自sinksub从channel中发送的消息后进行的处理(普通的rulebase会发送回客户端,对于RuleTopic会向broker的channel转发消息)。
newRuleBase这个方法其实前面也没有什么,就是对rulebase的一个初始化,注意这里每一个rulebase都有一个自己的msgchan。这个方法的结尾有一个小点,生成一个newSink方法,里面对于这个sink赋予了来自rulebase的Trie树(其实这个Trie树应该是来自ruleManager的)、msgchan。
这样RuleMsgQ0就构造好了。
func newRuleTopic(b broker, r *router.Trie) *rulebase {
rb := newRuleBase(common.RuleTopic, true, b, r, nil, nil)
rb.msgchan.publish = rb.publish
return rb
}
与RuleMsgQ0非常相似,唯一不同的是这里为rulebase赋予了publish方法(这个publish最后给予了msgchan,这也是subscription属性定义的转发逻辑能够生效的关键),接下来就看看RuleTopic的publish方法:
func (r *rulebase) publish(msg common.Message) {
msg.QOS = msg.TargetQOS
msg.Topic = msg.TargetTopic
msg.SequenceID = 0
if msg.QOS == 1 {
msg.SetCallbackPID(0, func(_ uint32) { msg.Ack() })
}
r.broker.Flow(&msg)
}
就是把要发送的TargetTopic和TargetQOS转换为Topic和QOS属性,因为后面RuleMsgQ0获取消息后对每一层级进行消息转发时是按照这个topic(其实是因为对Trie树进行遍历查找的,所以这个topic与Trie树每一层的children map的key值相同)。
举个例子,我们subscription选项里面source是“t”,target是“t/topic”,那么在之前构造Trie树就是如下这个图:
上图的key是指在Trie树中每一层children map中的key值
那么在客户端进行了对t/topic订阅操作后:
这样,如果对主题“t“发送一个消息后,消息从session发送到broker的channel中,RuleMsgQ0从broker的channel读取数据,然后从root出发根据主题”t”遍历Trie树,找到了第一个key为t的节点(sinksub,注意这个sinksub属于RuleTopic),通过这个sinksub对其channel(属于RuleTopic的channel)发送这个消息,RuleTopic拿到这个消息后,进入其publish函数,publish函数把消息的topic设置为原来的targetTopic,然后发送到broker的channel中,RuleMsgQ0又从broker中读取了这个新消息,然后根据Trie树遍历,先找到key值为“t”的节点,然后在这个节点的children map中找寻key值为“topic”的节点,最终找到了这个key为topic的sinksub(注意,这个sinksub属于用户session对应的Rule),然后把消息发送给这个sinksub的channel,之后会从这个channel读取数据并发送到用户客户端。
RuleManager是一个比较重要的部分,其实消息在hub中的流动主要在RuleManager的rules中流动,进而再发送到客户端,至于是如何流动的,后面会详细说明
sessions加载入口如下:
// NewManager creates a session manager
func NewManager(conf *config.Config, flow common.Flow, rules *rule.Manager, pf *persist.Factory) (*Manager, error) {
···
return &Manager{
auth: auth.NewAuth(conf.Principals),
rules: rules,
flow: flow,
conf: &conf.Message,
recorder: newRecorder(sessionDB),
sessions: cmap.New(),
log: logger.WithField("manager", "session"),
}, nil
}
主要是对Manager的各个属性进行初始化工作,这里看到有一个sessions属性,用的是map存储管理客户端会话,还存储了一些消息的配置信息,以及前面刚刚初始化的RuleManager,除了这些,着重解析一下auth和flow属性。
auth从字面上很好理解,就是授权,我们进入到zhegeNewAuth方法中(这里只对普通的用户名密码授权进行解析):
// NewAuth creates auth
func NewAuth(principals []config.Principal) *Auth {
···
_accounts := make(map[string]account)
for _, principal := range principals {
authorizer := NewAuthorizer()
for _, p := range duplicatePubSubPermitRemove(principal.Permissions) {
for _, topic := range p.Permits {
authorizer.Add(topic, p.Action)
}
}
···
_accounts[principal.Username] = account{
Password: principal.Password,
Authorizer: authorizer,
}
···
}
return &Auth{certs: _certs, accounts: _accounts}
}
这个方法主要是遍历每一个principal,为其每一个生成authorizer,其中的duplicatePubSubPermitRemove方法是用于去除用户在pub、sub里面自定义中重复的主题名称,之后将这些sub、pub添加到authorizer中,之后把用户名、密码和authorizer信息存储到Auth中,并返回。
// Flow flows message to broker
func (b *Broker) Flow(msg *common.Message) {
···
select {
case b.msgQ0Chan <- msg:
case <-b.tomb.Dying():
b.log.Debugf("flow message (qos=0) failed since broker closed")
}
···
}
这个方法就是用来向broker发送消息的,当session要进行发布消息时,通过这个方法把消息发送到broker的channel中,再由RuleManager中的rules进行消息的处理。
servers的入口如下:
// NewManager creates a server manager
func NewManager(addrs []string, cert utils.Certificate, handle Handle) (*Manager, error) {
launcher, err := mqtt.NewLauncher(cert)
if err != nil {
return nil, err
}
m := &Manager{
servers: make([]transport.Server, 0),
handle: handle,
log: logger.WithField("manager", "server"),
}
for _, addr := range addrs {
svr, err := launcher.Launch(addr)
if err != nil {
m.Close()
return nil, err
}
m.servers = append(m.servers, svr)
}
return m, nil
}
因为hub不止可以监听一个地址(可以看一下yaml配置文件,里面的Listen是数组属性),这个ServerManager就是把所有的监听地址保存起来,并且,它还负责启动所有的Server(每一个server对应一个监听地址)。
在main.go的start方法最后调用了两个方法,
m.Rules.Start()
m.servers.Start()
就是分别启动RuleManager和SeverManager,下面分别解析一下:
进入RuleManager的start方法:
// Start starts all rules
func (m *Manager) Start() {
···
for item := range m.rules.IterBuffered() {
r := item.Val.(base)
if err := r.start(); err != nil {
m.log.WithError(err).Infof("failed to start rule (%s)", r.uid())
}
}
}
其实就是遍历所有的Rule,然后调用Rule的start方法:
func (r *rulebase) start() (err error) {
r.once.Do(func() {
err = r.msgchan.start()
···
err = r.sink.start()
···
})
return
}
这里面主要调用了两个方法一个是启动这个rule的msgchan,另一个是启动rule的sink。
func (c *msgchan) start() error {
···
return c.msgtomb.Gos(c.goProcessingQ0, c.goProcessingQ1)
}
msgchan启动时运行goProcessingQ0方法:
func (c *msgchan) goProcessingQ0() error {
···
loop:
for {
select {
case <-c.msgtomb.Dying():
break loop
case msg := <-c.msgq0:
c.process(msg)
}
}
···
}
func (c *msgchan) process(msg *common.Message) {
if msg.QOS == 0 {
c.publish(*msg)
return
}
···
}
这个方法就是为了接收来自msgchan中channel的消息,然后在process方法中处理,想想之前RuleTopic中设置了一个publish函数,就是在process里面调用的。想想RuleTopic的逻辑,它在一个未知(就是sink.start的方法)地方接收到了消息,然后发送到自身的channel(RuleTopic对应的msgchan拥有的channel),之后在这个goProcessingQ0方法中接收到了这个消息,然后调用publish方法,把修改后的Message发送到broker的channel中。
sink启动的代码如下:
func (s *sink) start() error {
if s.id == common.RuleMsgQ0 {
return s.tomb.Gos(s.goRoutingQ0)
}
···
}
//Blank: 就是为了数据进行路由,转发
func (s *sink) goRoutingQ0() error {
···
var msg *common.Message
for {
select {
case <-s.tomb.Dying():
return nil
case msg = <-s.broker.MsgQ0Chan():
matches := s.trieq0.MatchUnique(msg.Topic)
for _, sub := range matches {
sub.Flow(*msg)
}
}
}
}
这个start方法(只在QOS为0的层面)只对RuleMsgQ0有用,调用了goRoutingQ0方法。goRoutingQ0方法就是接收来自broker的channel中的方法(在上面提到的,RuleTopic把修改后的消息发送到broker后,消息就是在这里被消费的),里面调用了一个方法,MatchUnique,这个方法就是为了从Trie树中拿到对应主题的subsink,然后调用subsink的Flow方法。
这里先顺一下思路,其实这个goRoutingQ0只是为了从broker中拿到消息,然后根据消息的topic匹配到subsink,通过subsink将消息发送到session中,这也就是为什么发布的消息(QOS=0)能够发送到相对应的客户端中(订阅了该主题,或者在subscription中定义的内容)。因为这个方法其实只对RuleMsgQ0有用,所以这也是hub中预先定义这个Rule的原因了。其实如果不需要subscription定义的关系的话,那么不启动RuleTopic也是可以的。把RuleManager中NewManager的
m.rules.Set(common.RuleTopic, newRuleTopic(m.broker, m.trieq0))
及后面对Trie树存放sinksub的循环删掉后,就将subscription定义的内容无效掉了。
接下来看一下subsink的Flow方法:
// Flow flows message
func (s *sinksub) Flow(msg common.Message) {
// set target topic
if s.ttopic != "" {
msg.TargetTopic = s.ttopic
} else {
msg.TargetTopic = msg.Topic
}
···
if sqos == 0 {
msg.TargetQOS = 0
s.channel.putQ0(&msg)
} else {
msg.TargetQOS = s.tqos
s.channel.putQ1(&msg)
}
}
第一个if-else就是为了对Message设置TargetTopic,这一点主要是为RuleTopic用的,因为RuleTopic接收到消息后,要把消息的topic属性设置为TargetTopic对应的值,所以在这里提前把Message的TargetTopic设置为sinksub对应的TargetTopic(因为从客户端发送过来的消息是没有TargetTopic的,这里要设置一下。sinksub本身就是作为转发(从一个主题到另一个主题)的关系表示,如果这个sinksub表示的是RuleTopic创建的话,本身就代表这个消息(这个主题的消息)就要发送到RuleTopic定义的主题中(TargetTopic),所以这里要把TargetTopic设置为与sinksub一样,这样消息在之后才能按照subscriptions定义的进行转发。
在第二个if-else中,使用sinksub对应的channel发送这个消息,sinksub对应的channel是sink中的msgchan(再往深了看,其实是Rule的msgchan),这样把消息发送后,在刚刚msgchan启动中提到的goProcessingQ0方法中一直阻塞在等待消息的地方case msg := <-c.msgq0:
就获取到了消息,然后把消息使用process方法进行处理:
subscriptions:
- source:
topic: 't'
target:
topic: 't/topic'
- source:
topic: 't/topic'
target:
topic: 't/topic/a'
ServerManager启动方法如下:
// Start starts all servers
func (m *Manager) Start() {
for _, item := range m.servers {
svr := item
m.tomb.Go(func() error {
for {
conn, err := svr.Accept()
···
go m.handle(conn)
}
})
}
}
这个方法主要是对每一个监听Server(之前在ServerManager中定义的Server)监听连接请求,收到新的请求后,调用handle方法进行处理:
// Handle handles connection
func (m *Manager) Handle(conn transport.Conn) {
defer conn.Close()
conn.SetReadLimit(int64(m.conf.Length.Max))
newSession(conn, m).Handle()
}
func newSession(conn transport.Conn, manager *Manager) *session {
return &session{
conn: conn,
manager: manager,
subs: make(map[string]packet.Subscription),
pids: common.NewPacketIDS(),
log: logger.WithField("mqtt", "session"),
permittedPublishTopics: make(map[string]struct{}),
}
}
上面两个方法主要是创建一个新的会话session,至于session是如何初始化的,是在Session的handle方法中:
// Handle handles mqtt connection
func (s *session) Handle() {
var err error
var pkt packet.Generic
for {
pkt, err = s.conn.Receive()
···
switch p := pkt.(type) {
case *packet.Connect:
···
case *packet.Publish:
···
case *packet.Puback:
···
case *packet.Subscribe:
···
case *packet.Pingreq:
···
case *packet.Pingresp:
···
case *packet.Disconnect:
···
return
case *packet.Unsubscribe:
···
default:
···
}
···
}
}
也就是Session监听收到的packet,然后依据packet的种类,采取不同的处理逻辑,之前提到的Rule的创建、Authorize就是在Connect这里面处理的,这个在下一篇文章中进行讲解。