在openedge-hub模块启动源码浅析——百度BIE边缘侧openedge项目源码阅读(1)一文中浅析了openedge-hub模块的启动过程,openedge-hub为每一个连接的请求创建一个实体——session,这个实体负责创建后会话的处理(这部分代码位于openedge/openedge-hub/session/session_handle.go),比如说处理连接、发布、发布确认、订阅、取消订阅、断开连接等请求,下面分别进行解析。
当接收到来自客户端的连接请求后,openedge-hub底层依赖的gomqtt库为其创建了一个连接,openedge-hub在其之上封装了session及其session相关的操作。
在创建了session后,还需要进行如下处理如下代码:
pkt, err = s.conn.Receive()
if err != nil {
if !s.tomb.Alive() {
return
}
s.log.WithError(err).Warnf("failed to reveive message")
s.close(true)
return
}
if _, ok := pkt.(*packet.Connect); !ok && s.authorizer == nil {
s.log.Errorf("only connect packet allowed before auth")
s.close(true)
return
}
这段代码是位于session_handle中的Handle方法中,也是后续所有种类packet处理逻辑的入口,主要是通过阻塞接收请求,并看一下这个会话有没有授权,代码中也有显示,只有Connect类型的请求才可以不需要授权,其他必须要求授权。
处理连接的入口是Handle方法的如下部分:
func (s *session) Handle() {
···
for {
pkt, err = s.conn.Receive()
···
switch p := pkt.(type) {
case *packet.Connect:
s.log.Debugln("received:", p.Type())
err = s.onConnect(p)
···
}
}
}
再继续看onConnect方法:
func (s *session) onConnect(p *packet.Connect) error {
···
authorizer := s.manager.auth.AuthenticateAccount(p.Username, p.Password)
if authorizer == nil {
s.sendConnack(packet.BadUsernameOrPassword)
return fmt.Errorf("username (%s) or password not permitted", p.Username)
}
s.authorizer = authorizer
p.Will != nil {
//处理一下will中的publish是否允许,与onPublish逻辑差不多,后面讲
···
}
var err error
s.clientID = p.ClientID
s.clean = p.CleanSession
if p.ClientID == "" {
s.id = common.PrefixTmp + uuid.Generate().String()
s.clean = true
} else {
s.id = common.PrefixSess + p.ClientID
}
err = s.manager.register(s)
···
err = s.sendConnack(packet.ConnectionAccepted)
if err != nil {
return err
}
err = s.manager.rules.StartRule(s.id)
···
return nil
}
这段代码分为三个部分,第一部分进行验证授权,第二个部分是向sessionManager和ruleManager注册(ruleManager注册的过程包括在sessionManager中),第三部分就是向客户端返回连接成功并开启rule。
在sessionManager进行初始化的时候,曾经创建过一个Auth对象,这个对象保存了用户名、密码、以及对应每个用户的发布、订阅的权限,AuthenticateAccount方法就是根据用户名、密码验证是不是正确,并返回相应的authorizer:
// AuthenticateAccount auth client account, then return authorizer if pass
func (a *Auth) AuthenticateAccount(username, password string) *Authorizer {
_account, ok := a.accounts[username]
if ok && len(password) > 0 && strings.Compare(encodePassword(password), _account.Password) == 0 {
return _account.Authorizer
}
return nil
}
authorizer中存储了该用户发布、订阅的权限,后面用户发布、订阅的时候都要用这个对象进行发布、订阅的权限验证。
顺便这里有一个p.will的处理,这个处理就是在连接的时候会发送可能订阅的一些内容,这里也需要这个authorizer进行权限的验证。
这里为session赋予id,就是一个前缀加上客户端id(如果没有客户端id就随机生成),然后向sessionManager进行注册:
// Called by session during onConnect
func (m *Manager) register(sess *session) error {
···
m.sessions.Set(sess.id, sess)
//Blank: 使得消息能够返回客户端的关键
return m.rules.AddRuleSess(sess.id, !sess.clean, sess.publish, sess.republish)
}
// AddRuleSess adds a new rule for session during running
func (m *Manager) AddRuleSess(id string, persistent bool, publish, republish common.Publish) error {
···
m.rules.Set(id, newRuleSess(id, persistent, m.broker, m.trieq0, publish, republish))
return nil
}
func newRuleSess(id string, p bool, b broker, r *router.Trie, publish, republish common.Publish) base {
return newRuleBase(id, p, b, r, publish, republish)
}
sessionManager注册就是向sessionManager中保存session的实例,之后调用ruleManager添加一个rule(可以看到这里最终调用的还是newRuleBase方法,与之前ruleManager创建RuleMsgQ0和RuleTopic的方法一样),这个rule的id就是session的id,这里要注意两个参数,publish、republish方法,这两个参数最终其实是传递到这个Rule的msgchan中:
func (s *session) publish(msg common.Message) {
pub := new(packet.Publish)
pub.Message.QOS = packet.QOS(msg.TargetQOS)
pub.Message.Topic = msg.TargetTopic
pub.Message.Payload = msg.Payload
pub.Message.Retain = msg.Retain
if msg.TargetQOS == 1 {
pid := s.pids.Set(&msg)
pub.ID = packet.ID(pid)
}
if err := s.send(pub, true); err != nil {
s.close(true)
}
}
上面的方法就是return m.rules.AddRuleSess(sess.id, !sess.clean, sess.publish, sess.republish)
语句中的publish参数,结合上一篇博客,很容易想到,因为RuleMsgQ0在接收到broker的消息后,会在Trie树中找到对应的sinksub ,sinksub中会进行封装,这里就把消息的TargetTopic赋值了,如果是在RuleTopic中的消息,TargetTopic会变成相应sinksub的TargetTopic(也就是进行转发的路径),如果不是在RuleTopic的话,那么TargetTopic就会与message的Topic相同:
// Flow flows message
func (s *sinksub) Flow(msg common.Message) {
// set target topic
if s.ttopic != "" {
msg.TargetTopic = s.ttopic
} else {
msg.TargetTopic = msg.Topic
}
···
}
不论怎样,最终到达publish方法的Message的TargetTopic就是要发送的Topic,比如说定义了/a的消息发送到/a/b中,那么最终到达publish方法中的Message的TargetTopic就是/a/b,publish方法最终会err := s.send(pub, true)
将消息发送到客户端。
首先到达这一步时会先向客户端发送连接通过的信息,之后开启这个客户端的rule,其实与开启RuleMsgQ0和RuleTopic的逻辑是相同的,其实都是开启msgchan,等待msgchan中channel的另一端发送数据,当接收到消息后会调用msgchan的process方法:
// ProcessingQ1 processing message with QOS=1
func (c *msgchan) goProcessingQ1() error {
···
loop:
for {
select {
case <-c.msgtomb.Dying():
break loop
case msg := <-c.msgq1:
c.process(msg)
}
}
···
}
func (c *msgchan) process(msg *common.Message) {
if msg.QOS == 0 {
c.publish(*msg)
return
}
···
}
可以看到接收到消息后就进入了process方法,process方法又会调用c.publish方法,这个方法就是刚刚创建rule时传入的那个publish方法。
当Handle方法接收到Publish类型的请求时,那么就进入了Publish的处理:
func (s *session) Handle() {
···
for {
pkt, err = s.conn.Receive()
···
switch p := pkt.(type) {
case *packet.Connect:
s.log.Debugln("received:", p.Type())
err = s.onConnect(p)
···
}
}
}
接下来进入onConnect方法:
func (s *session) onPublish(p *packet.Publish) error {
if _, ok := s.permittedPublishTopics[p.Message.Topic]; !ok {
// TODO: remove?
if !common.PubTopicValidate(p.Message.Topic) {
return fmt.Errorf("publish topic (%s) invalid", p.Message.Topic)
}
if !s.authorizer.Authorize(auth.Publish, p.Message.Topic) {
return fmt.Errorf("publish topic (%s) not permitted", p.Message.Topic)
}
s.permittedPublishTopics[p.Message.Topic] = struct{}{}
}
···
msg := common.NewMessage(uint32(p.Message.QOS), p.Message.Topic, p.Message.Payload, s.clientID)
···
s.manager.flow(msg)
return nil
}
首先进行鉴定这个主题的消息是否有权限进行发布,session中有一个属性permittedPublishTopics map[string]struct{}
就是用来存储已经授权可以发布的主题,如果这个主题是第一次发布,那么就进入第一个if-else中。
在这里面先通过PubTopicValidate方法鉴定要发布主题的名称是不是合法,看看其具体逻辑:
const (
// topic validate fields
MaxSlashCount = 8
MaxTopicNameLen = 255
// wildcard topic fields
TopicSeparator = "/"
SingleWildCard = "+"
MultipleWildCard = "#"
SysCmdPrefix = "$"
)
// PubTopicValidate validate MQTT publish topic
func PubTopicValidate(topic string) bool {
if topic == "" {
return false
}
if len(topic) > MaxTopicNameLen || strings.Contains(topic, "\u0000") ||
strings.Count(topic, TopicSeparator) > MaxSlashCount {
return false
}
if ContainsWildcard(topic) {
return false
}
if isSysTopic(topic) {
return false
}
return true
}
从上面的代码中可以看到一些规定:
- topic不能为空(“”)
- 长度不能超过255
- 使用“/“字符不能超过8次
- 不能包含“\u0000”字符
- 不能包含“+”字符
- 不能包含“$”字符
验证过发布主题的名称后,通过之前connect中为session添加的authorizer验证该主题是否具有发布的权限,如果具有发布的权限那么在permittedPublishTopics中记录下来,这样下次发布该主题的消息就不用再重新认证了。
验证过后,就将构建一个Message,然后将Message使用s.manager.flow
(调用session对应的sessionManager的flow方法)把进行消息发布:
// Flow flows message to broker
func (b *Broker) Flow(msg *common.Message) {
···
select {
case b.msgQ0Chan <- msg:
case <-b.tomb.Dying():
b.log.Debugf("flow message (qos=0) failed since broker closed")
}
}
这个方法是之前在构造sessionManager的时候赋予sessionManager的,也就是s.manager.flow
执行的方法,其目的就是向broker的channel(msgQ0Chan)发送消息,之后由RuleMsgQ0来进行消息的消费:
func (s *sink) goRoutingQ0() error {
var msg *common.Message
for {
select {
case <-s.tomb.Dying():
return nil
case msg = <-s.broker.MsgQ0Chan():
matches := s.trieq0.MatchUnique(msg.Topic)
for _, sub := range matches {
sub.Flow(*msg)
}
}
}
}
这个方法的具体逻辑在上一篇openedge-hub模块启动源码浅析——百度BIE边缘侧openedge项目源码阅读(1)有详细说到,这里简单的说一下就是RuleMsgQ0在这个方法中读取消息后,遍历在Trie树中注册的sinksub,因为sinksub中拥有对应session的rule的msgchan,然后通过这个msgchan把相应消息发送到对应的sinksub,如果sinksub对应的是RuleTopic的话会依据配置文件中的subscription来修改消息,并将消息发送到broker中,继续循环上述的过程,如果sinksub对应的是session的rule的话,那么对应session的msgchan读取这个消息,并调用publish方法将消息发送回客户端。
当Handle方法接收到Subscribe类型的请求时,那么就进入了Subscribe的处理:
func (s *session) Handle() {
···
for {
pkt, err = s.conn.Receive()
···
switch p := pkt.(type) {
case *packet.Subscribe:
s.log.Debugf("received: %s, subs: %v", p.Type(), p.Subscriptions)
err = s.onSubscribe(p)
···
}
}
}
下面进入onSubscribe方法中:
func (s *session) onSubscribe(p *packet.Subscribe) error {
ack := packet.NewSuback()
rv := s.genSubAck(p.Subscriptions)
for i, sub := range p.Subscriptions {
if rv[i] == packet.QOSFailure {
s.log.Errorf("failed to subscribe topic (%s)", sub.Topic)
continue
}
if _, ok := s.subs[sub.Topic]; !ok {
err := s.manager.rules.AddSinkSub(s.id, s.id, uint32(sub.QOS), sub.Topic, uint32(sub.QOS), "")
···
s.log.Infof("topic (%s) subscribed", sub.Topic)
s.subs[sub.Topic] = sub
···
} else {
if s.subs[sub.Topic].QOS != sub.QOS {
//重新向Trie树中添加sinksub,之后移除原有的sinksub
···
}
}
}
ack.ID = p.ID
ack.ReturnCodes = rv
err := s.send(ack, true)
if err != nil {
return err
}
return s.sendRetainMessage(p)
}
这里构建了一个rv,这个主要是验证订阅的每一个subscription合法性,这里就不进入源码看了,直接翻译过来.
订阅时主题合法性检验:
- topic不能为“”
- 主题名称长度不能超过255
- 主题名称不能包含“\u0000”
- 主题名称中“/”不能超过8个
- 主题名称中的“#”只能位于最后以“/”后
- “#”字符不与其他字符混用,比如/ab#c、/c#、/#a都是错误的,/…/#正确
- “+”字符不与其他字符混用,比如/ab+c、/c+、/+c都是错误的,/…/+/…正确
在genSubAck方法中对于不符合上述规定的subscription都设置rv[index]=QOSFailure
,之后对于通过合法性检验的subscription向RuleManager中添加sinkSub,这样就把sinkSub添加到RuleManager中(具体添加过程,在上一篇博客中详细讲到),这样在RuleMsgQ0从broker的channel中读取到数据后就可以根据订阅的主题找到对应的sinksub,并且发送消息了。
当Handle方法接收到Pingreq类型的请求时,那么就进入了Pingreq的处理:
func (s *session) Handle() {
···
for {
pkt, err = s.conn.Receive()
···
switch p := pkt.(type) {
case *packet.Pingreq:
s.log.Debugln("received:", p.Type())
err = s.onPingreq(p)
···
}
}
}
进入到onPingreq方法:
func (s *session) onPingreq(p *packet.Pingreq) error {
return s.send(packet.NewPingresp(), true)
}
其实就是发送一个包,表明能够ping到。
当Handle方法接收到Pingreq类型的请求时,那么就进入了Pingreq的处理:
func (s *session) Handle() {
···
for {
pkt, err = s.conn.Receive()
···
switch p := pkt.(type) {
case *packet.Unsubscribe:
s.log.Debugf("received: %s, topics: %v", p.Type(), p.Topics)
err = s.onUnsubscribe(p)
···
}
}
}
下面进入到onUnsubscribe方法中:
func (s *session) onUnsubscribe(p *packet.Unsubscribe) error {
···
for _, topic := range p.Topics {
if _, ok := s.subs[topic]; ok {
err := s.manager.rules.RemoveSinkSub(s.id, topic)
}
···
}
···
return s.send(ack, true)
}
其实就是从RuleManager的Trie树中剔除掉当初session(其实应该是session对应的rule)设置的sinksub。
当Handle方法接收到Disconnect类型的请求时,那么就进入了Disconnect的处理:
func (s *session) Handle() {
···
for {
pkt, err = s.conn.Receive()
···
case *packet.Disconnect:
s.log.Debugln("received:", p.Type())
s.close(false)
return
···
}
}
}
下面进入到close方法中:
// Close closes this session, only called by session manager
func (s *session) close(will bool) {
s.once.Do(func() {
s.tomb.Kill(nil)
···
s.manager.remove(s.id)
if will {
s.sendWillMessage()
}
s.conn.Close()
···
})
}
// Called by session when error raises
func (m *Manager) remove(id string) {
m.sessions.Remove(id)
err := m.rules.RemoveRule(id)
if err != nil {
m.log.WithError(err).Debugf("failed to remove rule")
}
}
其实就是移除sessionManager中保存的session、ruleManager中保存的rule,并且移除掉Trie树中所有的rule相关的sinksub(是在err := m.rules.RemoveRule(id)
中执行的),最后关掉连接,关掉协程。
当之前消息类型处理中出现异常或者消息类型不合法(即不是规定的类型),那么就会关掉这个session的连接:
func (s *session) Handle() {
···
if err != nil {
s.log.Errorf(err.Error())
s.close(true)
break
}
}