当前位置: 首页 > 工具软件 > OpenEdge > 使用案例 >

openedge-hub模块启动源码浅析——百度BIE边缘侧openedge项目源码阅读(1)

万俟高峻
2023-12-01

前言

因为最近项目需要用到边缘计算,结合百度的openedge进行开发,openedge目前主要功能为结合docker容器实现边缘计算,具体内容官网很多,其架构中,openedge-hub作为所有模块的通信中心节点(消息的接收和转发)是非常重要的,本篇主要介绍一下openedge-hub模块的启动以及在QOS=0的情况下消息的发送和转发,本文主要是为了记录下思路方便后续改造,因个人水平有限,对于MQTT了解有限,很多名词使用不当,中间若有错误劳烦告知,谢谢!

openedge-hub的启动

openedge-hub的开始的节点是在openedge/openedge-hub/main.go文件进行的,main函数如下所示:

	openedge.Run(func(ctx openedge.Context) error {
		m := mo{log: ctx.Log()}
		defer m.close()
		err := m.start()
		if err != nil {
			return err
		}
		ctx.Wait()
		return nil
	})

其作用主要是启动mo对象,mo结构如下:

type mo struct {
	cfg      config.Config
	Rules    *rule.Manager
	Sessions *session.Manager
	broker   *broker.Broker
	servers  *server.Manager
	factory  *persist.Factory
	log      logger.Logger
}
  1. cfg是读取配置文件后保存配置的实体。
  2. Rules是消息转发使得“路由器”。
  3. Sessions保存着所有客户端的连接。
  4. broker中间带有channel,用于接收来自session发送的消息,并通过这个channel发送至路由器(这里说的有点欠妥,后面会讲到的),路由器找到对应的session,并将消息发送到session对应的客户端。
  5. factory是用于进行持久化的,暂不做解析。
  6. log进行日志的记录,暂不做解析。

刚刚main文件中有一行代码是:err := m.start(),这个就是启动的入口,接下来看一下这个start方法:

func (m *mo) start() error {
	err := utils.LoadYAML(openedge.DefaultConfFile, &m.cfg)
	if err != nil {
		m.log.Errorln("failed to load config:", err.Error())
		return err
	}
	m.factory, err = persist.NewFactory(m.cfg.Storage.Dir)
	if err != nil {
		m.log.Errorln("failed to new factory:", err.Error())
		return err
	}
	m.broker, err = broker.NewBroker(&m.cfg, m.factory)
	if err != nil {
		m.log.Errorln("failed to new broker:", err.Error())
		return err
	}
	m.Rules, err = rule.NewManager(m.cfg.Subscriptions, m.broker)
	if err != nil {
		m.log.Errorln("failed to new rule manager:", err.Error())
		return err
	}
	m.Sessions, err = session.NewManager(&m.cfg, m.broker.Flow, m.Rules, m.factory)
	if err != nil {
		m.log.Errorln("failed to new session manager:", err.Error())
		return err
	}
	m.servers, err = server.NewManager(m.cfg.Listen, m.cfg.Certificate, m.Sessions.Handle)
	if err != nil {
		m.log.Errorln("failed to new server manager:", err.Error())
		return err
	}
	m.Rules.Start()
	m.servers.Start()
	return nil
}

总体上就是分别对mo的各个属性进行初始化,下面分别对每一个属性的初始化进行解析。

cfg加载

cfg加载在main.go中start方法的代码如下:

err := utils.LoadYAML(openedge.DefaultConfFile, &m.cfg)
if err != nil {
	m.log.Errorln("failed to load config:", err.Error())
	return err
}

其实就是把yaml对应的属性填充到mo的cfg中,下面介绍一下cfg这个结构体的一些配置项:

// Config all config of edge
type Config struct {
	Listen      []string          `yaml:"listen" json:"listen"`
	Certificate utils.Certificate `yaml:"certificate" json:"certificate"`

	Principals    []Principal    `yaml:"principals" json:"principals" validate:"principals"`
	Subscriptions []Subscription `yaml:"subscriptions" json:"subscriptions" validate:"subscriptions"`

	Message Message `yaml:"message" json:"message"`
	Status  struct {
		Logging struct {
			Enable   bool          `yaml:"enable" json:"enable"`
			Interval time.Duration `yaml:"interval" json:"interval" default:"1m"`
		} `yaml:"logging" json:"logging"`
	} `yaml:"status" json:"status"`
	Storage struct {
		Dir string `yaml:"dir" json:"dir" default:"var/db/openedge"`
	} `yaml:"storage" json:"storage"`
	Shutdown struct {
		Timeout time.Duration `yaml:"timeout" json:"timeout" default:"10m"`
	} `yaml:"shutdown" json:"shutdown"`
}

附上一个源码中示例的文件:

name: localhub
listen:
- tcp://0.0.0.0:1883
principals:
- username: 'test'
  password: '9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08'
  permissions:
  - action: 'pub'
    permit: ['#']
  - action: 'sub'
    permit: ['#']
subscriptions:
- source:
    topic: 't'
  target:
    topic: 't/topic'
logger:
  path: var/log/openedge/localhub/localhub.log
  console: true
  level: "debug"
  • name是模块的名称,localhub的模块起名为localhub。
  • listen是用于开启服务器后监听的地址(客户端连接的地址)。
  • principals存放客户端连接的用户名、密码(密码是使用SHA-256加密的,客户端连接的时候使用解密后的明文密码)、权限等相关信息。
  • subscriptions用于存放消息转发(订阅)的路由规则,source是发布消息时的地址,target是转发的地址(比如A发送的“t”主题消息就会转发到订阅“t/topic”主题的客户端中。
  • logger是配置日志相关信息。

通过加载配置文件后,cfg中就存储了这些信息。

broker加载

使用淘宝的人既可以自己开店,也可以去买货,但是卖货的信息发布在什么地方呢?想要买东西从哪里浏览呢?当然是通过淘宝了,broker就是淘宝,所有连接到openedge-hub(后面简称为hub)的session既可以作为卖家发布消息,这个消息就发送到broker的channel(根据QOS不同对应了不同的channel)了,在生活中为了让买家能够买到称心如意的东西,淘宝一般会进行个性化推荐,在hub中每一个session会个性化的订阅主题,这时候路由器(Rule.Manager)就会把符合session需要的消息发送到session中了。

broker初始化的入口如下:

	m.broker, err = broker.NewBroker(&m.cfg, m.factory)
	if err != nil {
		m.log.Errorln("failed to new broker:", err.Error())
		return err
	}

接下来进入到broker.NewBroker这个方法中:

// NewBroker NewBroker
func NewBroker(c *config.Config, pf *persist.Factory) (b *Broker, err error) {
	···
	b = &Broker{
		config:     c,
		msgQ0Chan:  make(chan *common.Message, c.Message.Ingress.Qos0.Buffer.Size),
		msgQ1Chan:  make(chan *common.Message, c.Message.Ingress.Qos1.Buffer.Size),
		msgQ1DB:    msgqos1DB,
		offsetDB:   offsetDB,
		offsetChan: make(chan *Offset, c.Message.Offset.Buffer.Size),
		log:        logger.WithField("broker", "mqtt"),
	}
	···
	return b, b.tomb.Gos(b.persistingMsgQos1, b.persistingOffset, b.cleaningMsgQos1)
}

“···”表示这里省略了部分代码

可以看到这里为broker注入了刚刚加载好的cfg,并且创建了两个channel(现在只以QOS0作为研究对象),然后返回这个broker,这个broker就初始化好了。

Rules加载

假设一下淘宝进行个性化推荐时候的做法(只是为了理解这么比喻一下),可以生成每一个人的特征,看看这个人喜欢什么,比如喜欢鞋子、外衣、化妆品,那么淘宝后台专门为他/她开启一个线程(只是这么说,不用较真),这个线程就用来不断从淘宝中获取新的鞋子、外衣、化妆品的商品,然后把它推送到用户的手机中,那么这么多线程不方便管理,弄一个“线程池”集中管理。回到hub中,每一个session订阅的主题相当于特征,这时候为每一个session添加一个rulebase就是相当于淘宝后台专门开启的线程,当rulebase接收到数据后推送到客户端。

Rules初始化的入口如下:

	m.Rules, err = rule.NewManager(m.cfg.Subscriptions, m.broker)
	if err != nil {
		m.log.Errorln("failed to new rule manager:", err.Error())
		return err
	}

进入rule.NewManager方法中,如下:

// NewManager creates a new rule manager
func NewManager(c []config.Subscription, b broker) (*Manager, error) {
	m := &Manager{
		broker: b,
		rules:  cmap.New(),
		trieq0: router.NewTrie(),
		log:    logger.WithField("manager", "rule"),
	}
	
	m.rules.Set(common.RuleMsgQ0, newRuleQos0(m.broker, m.trieq0))
	m.rules.Set(common.RuleTopic, newRuleTopic(m.broker, m.trieq0))
	for _, sub := range c {
		err := m.AddSinkSub(common.RuleTopic, sub.Target.Topic, uint32(sub.Source.QOS), sub.Source.Topic, uint32(sub.Target.QOS), sub.Target.Topic)
		if err != nil {
			return nil, fmt.Errorf("failed to add subscription (%v): %s", sub.Source, err.Error())
		}
	}
	if b.Config().Status.Logging.Enable {
		return m, m.tomb.Gos(m.logging)
	}
	return m, nil
}

这个部分是较为核心的部分,首先初始化了rule.Manager的部分属性,broker就使用上一步创建的broker,rules是一个map,trieq0表示的是一个Trie树,Trie树代表的就是消息发布、订阅时这种转发的关系,这也是为什么NewManager这个方法为什么要带着Subscription这个参数了,就是为了构建这个Trie树。

Trie树上每一个仅表示一个节点的转发关系,比如在/a/b节点中,近对主题为/a/b的消息进行处理,至于在subscriptino参数中定义的转发关系如何实现,是由下面要介绍的RuleTopic进行处理的

下面有两行是构建两个比较特殊的rule,第一个是RuleMsgQ0,这个rule主要是为了监听来自broker的消息,并把消息发送至对应的sinksub上,第二个是RuleTopic,这个rule主要是用来进行在subscriptions中定义的转发关系进行转发,转发到对应的层。

sinksub就是用来表示订阅关系,sinksub内包含有sessoin(session对应的rule)中的channel,sinksub将消息发送到channel中,session从channel另一端读取数据数据,并发送至客户端。
sinksub中有两个属性对比一下,一个是topic,另一个是targettopic:topic就是订阅的主题,targettopic就是要发送的主题
sinksub中的channel就是来自sink中的msgchan

RuleMsgQ0

进入到初始化RuleMsgQ0的方法中:

func newRuleQos0(b broker, r *router.Trie) *rulebase {
	return newRuleBase(common.RuleMsgQ0, false, b, r, nil, nil)
}

func newRuleBase(id string, persistent bool, b broker, r *router.Trie, publish, republish common.Publish) *rulebase {
	···
	rb := &rulebase{
		id:     id,
		broker: b,
		log:    log,
	}
	···
	rb.msgchan = newMsgChan(
		b.Config().Message.Egress.Qos0.Buffer.Size,
		b.Config().Message.Egress.Qos1.Buffer.Size,
		publish,
		republish,
		b.Config().Message.Egress.Qos1.Retry.Interval,
		b.Config().Shutdown.Timeout,
		persist,
		log,
	)
	rb.sink = newSink(id, b, r, rb.msgchan)
	return rb
}

func newSink(id string, b broker, r *router.Trie, msgchan *msgchan) *sink {
	s := &sink{
		id:      id,
		broker:  b,
		trieq0:  r,
		trieq1:  router.NewTrie(),
		msgchan: msgchan,
		log:     logger.WithField("sink", id),
	}
	return s
}

msgchan可以理解为用于发送消息的channel

对照着参数,可以看到publish、republish传送过来均为nil,表明其实这个rule并不参与消息的转接(后面如果自己推敲一下,它同样会接收来自sinksub的消息,只不过因为没有在Trie树中注册sinksub,所以不会向这个channel发送数据罢了。

rule的publish方法就是用于在接收到来自sinksub从channel中发送的消息后进行的处理(普通的rulebase会发送回客户端,对于RuleTopic会向broker的channel转发消息)。

newRuleBase这个方法其实前面也没有什么,就是对rulebase的一个初始化,注意这里每一个rulebase都有一个自己的msgchan。这个方法的结尾有一个小点,生成一个newSink方法,里面对于这个sink赋予了来自rulebase的Trie树(其实这个Trie树应该是来自ruleManager的)、msgchan。
这样RuleMsgQ0就构造好了。

RuleTopic

func newRuleTopic(b broker, r *router.Trie) *rulebase {
	rb := newRuleBase(common.RuleTopic, true, b, r, nil, nil)
	rb.msgchan.publish = rb.publish
	return rb
}

与RuleMsgQ0非常相似,唯一不同的是这里为rulebase赋予了publish方法(这个publish最后给予了msgchan,这也是subscription属性定义的转发逻辑能够生效的关键),接下来就看看RuleTopic的publish方法:

func (r *rulebase) publish(msg common.Message) {
	msg.QOS = msg.TargetQOS
	msg.Topic = msg.TargetTopic
	msg.SequenceID = 0
	if msg.QOS == 1 {
		msg.SetCallbackPID(0, func(_ uint32) { msg.Ack() })
	}
	r.broker.Flow(&msg)
}

就是把要发送的TargetTopic和TargetQOS转换为Topic和QOS属性,因为后面RuleMsgQ0获取消息后对每一层级进行消息转发时是按照这个topic(其实是因为对Trie树进行遍历查找的,所以这个topic与Trie树每一层的children map的key值相同)。
举个例子,我们subscription选项里面source是“t”,target是“t/topic”,那么在之前构造Trie树就是如下这个图:

root
key : t topic : t targetTopic : t/topic

上图的key是指在Trie树中每一层children map中的key值

那么在客户端进行了对t/topic订阅操作后:

root
key : t topic : t targetTopic : t/topic
key : topic topic : topic

这样,如果对主题“t“发送一个消息后,消息从session发送到broker的channel中,RuleMsgQ0从broker的channel读取数据,然后从root出发根据主题”t”遍历Trie树,找到了第一个key为t的节点(sinksub,注意这个sinksub属于RuleTopic),通过这个sinksub对其channel(属于RuleTopic的channel)发送这个消息,RuleTopic拿到这个消息后,进入其publish函数,publish函数把消息的topic设置为原来的targetTopic,然后发送到broker的channel中,RuleMsgQ0又从broker中读取了这个新消息,然后根据Trie树遍历,先找到key值为“t”的节点,然后在这个节点的children map中找寻key值为“topic”的节点,最终找到了这个key为topic的sinksub(注意,这个sinksub属于用户session对应的Rule),然后把消息发送给这个sinksub的channel,之后会从这个channel读取数据并发送到用户客户端。

RuleManager是一个比较重要的部分,其实消息在hub中的流动主要在RuleManager的rules中流动,进而再发送到客户端,至于是如何流动的,后面会详细说明

sessions加载

sessions加载入口如下:

// NewManager creates a session manager
func NewManager(conf *config.Config, flow common.Flow, rules *rule.Manager, pf *persist.Factory) (*Manager, error) {
	···
	return &Manager{
		auth:     auth.NewAuth(conf.Principals),
		rules:    rules,
		flow:     flow,
		conf:     &conf.Message,
		recorder: newRecorder(sessionDB),
		sessions: cmap.New(),
		log:      logger.WithField("manager", "session"),
	}, nil
}

主要是对Manager的各个属性进行初始化工作,这里看到有一个sessions属性,用的是map存储管理客户端会话,还存储了一些消息的配置信息,以及前面刚刚初始化的RuleManager,除了这些,着重解析一下auth和flow属性。

auth

auth从字面上很好理解,就是授权,我们进入到zhegeNewAuth方法中(这里只对普通的用户名密码授权进行解析):

// NewAuth creates auth
func NewAuth(principals []config.Principal) *Auth {
	···
	_accounts := make(map[string]account)
	for _, principal := range principals {
		authorizer := NewAuthorizer()
		for _, p := range duplicatePubSubPermitRemove(principal.Permissions) {
			for _, topic := range p.Permits {
				authorizer.Add(topic, p.Action)
			}
		}
		···
		_accounts[principal.Username] = account{
			Password:   principal.Password,
			Authorizer: authorizer,
		}	
		···
	}
	return &Auth{certs: _certs, accounts: _accounts}
}

这个方法主要是遍历每一个principal,为其每一个生成authorizer,其中的duplicatePubSubPermitRemove方法是用于去除用户在pub、sub里面自定义中重复的主题名称,之后将这些sub、pub添加到authorizer中,之后把用户名、密码和authorizer信息存储到Auth中,并返回。

Flow

// Flow flows message to broker
func (b *Broker) Flow(msg *common.Message) {
	···
	select {
	case b.msgQ0Chan <- msg:
	case <-b.tomb.Dying():
		b.log.Debugf("flow message (qos=0) failed since broker closed")
	}
	···
}

这个方法就是用来向broker发送消息的,当session要进行发布消息时,通过这个方法把消息发送到broker的channel中,再由RuleManager中的rules进行消息的处理。

servers加载

servers的入口如下:

// NewManager creates a server manager
func NewManager(addrs []string, cert utils.Certificate, handle Handle) (*Manager, error) {
	launcher, err := mqtt.NewLauncher(cert)
	if err != nil {
		return nil, err
	}
	m := &Manager{
		servers: make([]transport.Server, 0),
		handle:  handle,
		log:     logger.WithField("manager", "server"),
	}
	for _, addr := range addrs {
		svr, err := launcher.Launch(addr)
		if err != nil {
			m.Close()
			return nil, err
		}
		m.servers = append(m.servers, svr)
	}
	return m, nil
}

因为hub不止可以监听一个地址(可以看一下yaml配置文件,里面的Listen是数组属性),这个ServerManager就是把所有的监听地址保存起来,并且,它还负责启动所有的Server(每一个server对应一个监听地址)。

hub启动

在main.go的start方法最后调用了两个方法,

m.Rules.Start()
m.servers.Start()

就是分别启动RuleManager和SeverManager,下面分别解析一下:

RuleManager启动

进入RuleManager的start方法:

// Start starts all rules
func (m *Manager) Start() {
	···
	for item := range m.rules.IterBuffered() {
		r := item.Val.(base)
		if err := r.start(); err != nil {
			m.log.WithError(err).Infof("failed to start rule (%s)", r.uid())
		}
	}
}

其实就是遍历所有的Rule,然后调用Rule的start方法:

func (r *rulebase) start() (err error) {
	
	r.once.Do(func() {
		err = r.msgchan.start()
		···
		err = r.sink.start()
		···
	})
	return
}

这里面主要调用了两个方法一个是启动这个rule的msgchan,另一个是启动rule的sink。

msgchan启动

func (c *msgchan) start() error {
	···
	return c.msgtomb.Gos(c.goProcessingQ0, c.goProcessingQ1)
}

msgchan启动时运行goProcessingQ0方法:

func (c *msgchan) goProcessingQ0() error {
···
loop:
	for {
		select {
		case <-c.msgtomb.Dying():
			break loop
		case msg := <-c.msgq0:
			c.process(msg)
		}
	}
	···
}

func (c *msgchan) process(msg *common.Message) {
	if msg.QOS == 0 {
		c.publish(*msg)
		return
	}
	···
}

这个方法就是为了接收来自msgchan中channel的消息,然后在process方法中处理,想想之前RuleTopic中设置了一个publish函数,就是在process里面调用的。想想RuleTopic的逻辑,它在一个未知(就是sink.start的方法)地方接收到了消息,然后发送到自身的channel(RuleTopic对应的msgchan拥有的channel),之后在这个goProcessingQ0方法中接收到了这个消息,然后调用publish方法,把修改后的Message发送到broker的channel中。

sink启动

sink启动的代码如下:

func (s *sink) start() error {
	if s.id == common.RuleMsgQ0 {
		return s.tomb.Gos(s.goRoutingQ0)
	}
	···
}

//Blank: 就是为了数据进行路由,转发
func (s *sink) goRoutingQ0() error {
	···
	var msg *common.Message
	for {
		select {
		case <-s.tomb.Dying():
			return nil
		case msg = <-s.broker.MsgQ0Chan():
			matches := s.trieq0.MatchUnique(msg.Topic)
			for _, sub := range matches {
				sub.Flow(*msg)
			}
		}
	}
}

这个start方法(只在QOS为0的层面)只对RuleMsgQ0有用,调用了goRoutingQ0方法。goRoutingQ0方法就是接收来自broker的channel中的方法(在上面提到的,RuleTopic把修改后的消息发送到broker后,消息就是在这里被消费的),里面调用了一个方法,MatchUnique,这个方法就是为了从Trie树中拿到对应主题的subsink,然后调用subsink的Flow方法。

这里先顺一下思路,其实这个goRoutingQ0只是为了从broker中拿到消息,然后根据消息的topic匹配到subsink,通过subsink将消息发送到session中,这也就是为什么发布的消息(QOS=0)能够发送到相对应的客户端中(订阅了该主题,或者在subscription中定义的内容)。因为这个方法其实只对RuleMsgQ0有用,所以这也是hub中预先定义这个Rule的原因了。其实如果不需要subscription定义的关系的话,那么不启动RuleTopic也是可以的。把RuleManager中NewManager的m.rules.Set(common.RuleTopic, newRuleTopic(m.broker, m.trieq0)) 及后面对Trie树存放sinksub的循环删掉后,就将subscription定义的内容无效掉了。

接下来看一下subsink的Flow方法:

// Flow flows message
func (s *sinksub) Flow(msg common.Message) {
	// set target topic
	if s.ttopic != "" {
		msg.TargetTopic = s.ttopic
	} else {
		msg.TargetTopic = msg.Topic
	}
	···
	if sqos == 0 {
		msg.TargetQOS = 0
		s.channel.putQ0(&msg)
	} else {
		msg.TargetQOS = s.tqos
		s.channel.putQ1(&msg)
	}
}

第一个if-else就是为了对Message设置TargetTopic,这一点主要是为RuleTopic用的,因为RuleTopic接收到消息后,要把消息的topic属性设置为TargetTopic对应的值,所以在这里提前把Message的TargetTopic设置为sinksub对应的TargetTopic(因为从客户端发送过来的消息是没有TargetTopic的,这里要设置一下。sinksub本身就是作为转发(从一个主题到另一个主题)的关系表示,如果这个sinksub表示的是RuleTopic创建的话,本身就代表这个消息(这个主题的消息)就要发送到RuleTopic定义的主题中(TargetTopic),所以这里要把TargetTopic设置为与sinksub一样,这样消息在之后才能按照subscriptions定义的进行转发
在第二个if-else中,使用sinksub对应的channel发送这个消息,sinksub对应的channel是sink中的msgchan(再往深了看,其实是Rule的msgchan),这样把消息发送后,在刚刚msgchan启动中提到的goProcessingQ0方法中一直阻塞在等待消息的地方case msg := <-c.msgq0:就获取到了消息,然后把消息使用process方法进行处理:

  • 如果Rule是RuleTopic的话,会调用publish方法,把消息的Topic改为TargetTopic对应的值,然后发送到broker的channel中,之后RuleMsgQ0从这个channel获取到了数据,再重复上面找寻sinksub的过程(注意再次寻找后获得的sinksub不一定为session对应的Rule所创建的,有可能依然还是RuleTopic对应的sinksub,因为可能出现如下情况,这样就需要经过两次的RuleTopic的修改转发)
subscriptions:
  - source:
      topic: 't'
    target:
      topic: 't/topic'
  - source:
      topic: 't/topic'
    target:
      topic: 't/topic/a'
  • 如果Rule是普通Session中创建的话,那么就会调用客户端的处理方法(其实就是发送方法,具体这个方法是如何设置在sink中以后会写文章提到,这里简单说一下,这个方法位于openedge/openedge-hub/session/session_egress.go#publish),这个publish方法把这个消息发送到对应的客户端中。

ServerManager启动

ServerManager启动方法如下:

// Start starts all servers
func (m *Manager) Start() {
	for _, item := range m.servers {
		svr := item
		m.tomb.Go(func() error {
			for {
				conn, err := svr.Accept()
				···
				go m.handle(conn)
			}
		})
	}
}

这个方法主要是对每一个监听Server(之前在ServerManager中定义的Server)监听连接请求,收到新的请求后,调用handle方法进行处理:

// Handle handles connection
func (m *Manager) Handle(conn transport.Conn) {
	defer conn.Close()
	conn.SetReadLimit(int64(m.conf.Length.Max))
	newSession(conn, m).Handle()
}

func newSession(conn transport.Conn, manager *Manager) *session {
	return &session{
		conn:                   conn,
		manager:                manager,
		subs:                   make(map[string]packet.Subscription),
		pids:                   common.NewPacketIDS(),
		log:                    logger.WithField("mqtt", "session"),
		permittedPublishTopics: make(map[string]struct{}),
	}
}

上面两个方法主要是创建一个新的会话session,至于session是如何初始化的,是在Session的handle方法中:

// Handle handles mqtt connection
func (s *session) Handle() {
	var err error
	var pkt packet.Generic
	for {
		pkt, err = s.conn.Receive()
		···
		switch p := pkt.(type) {
		case *packet.Connect:
			···
		case *packet.Publish:
			···
		case *packet.Puback:
			···
		case *packet.Subscribe:
			···
		case *packet.Pingreq:
			···
		case *packet.Pingresp:
			···
		case *packet.Disconnect:
			···
			return
		case *packet.Unsubscribe:
			···
		default:
			···
		}
		···
	}
}

也就是Session监听收到的packet,然后依据packet的种类,采取不同的处理逻辑,之前提到的Rule的创建、Authorize就是在Connect这里面处理的,这个在下一篇文章中进行讲解。

 类似资料: