当前位置: 首页 > 工具软件 > OpenEdge > 使用案例 >

openedge-hub模块请求处理源码浅析——百度BIE边缘侧openedge项目源码阅读(2)

韦飞尘
2023-12-01

前言

openedge-hub模块启动源码浅析——百度BIE边缘侧openedge项目源码阅读(1)一文中浅析了openedge-hub模块的启动过程,openedge-hub为每一个连接的请求创建一个实体——session,这个实体负责创建后会话的处理(这部分代码位于openedge/openedge-hub/session/session_handle.go),比如说处理连接、发布、发布确认、订阅、取消订阅、断开连接等请求,下面分别进行解析。

预处理

当接收到来自客户端的连接请求后,openedge-hub底层依赖的gomqtt库为其创建了一个连接,openedge-hub在其之上封装了session及其session相关的操作。
在创建了session后,还需要进行如下处理如下代码:

pkt, err = s.conn.Receive()
		if err != nil {
			if !s.tomb.Alive() {
				return
			}
			s.log.WithError(err).Warnf("failed to reveive message")
			s.close(true)
			return
		}
		if _, ok := pkt.(*packet.Connect); !ok && s.authorizer == nil {
			s.log.Errorf("only connect packet allowed before auth")
			s.close(true)
			return
		}

这段代码是位于session_handle中的Handle方法中,也是后续所有种类packet处理逻辑的入口,主要是通过阻塞接收请求,并看一下这个会话有没有授权,代码中也有显示,只有Connect类型的请求才可以不需要授权,其他必须要求授权。

Connect处理

处理连接的入口是Handle方法的如下部分:

func (s *session) Handle() {
	···
	for {
		pkt, err = s.conn.Receive()
		···
		switch p := pkt.(type) {
		case *packet.Connect:
			s.log.Debugln("received:", p.Type())
			err = s.onConnect(p)
		···
		}
	}
}

再继续看onConnect方法:

func (s *session) onConnect(p *packet.Connect) error {
	···
	authorizer := s.manager.auth.AuthenticateAccount(p.Username, p.Password)
	if authorizer == nil {
		s.sendConnack(packet.BadUsernameOrPassword)
		return fmt.Errorf("username (%s) or password not permitted", p.Username)
	}
	s.authorizer = authorizer
	
	p.Will != nil {
		//处理一下will中的publish是否允许,与onPublish逻辑差不多,后面讲
		···
	}
	var err error
	s.clientID = p.ClientID
	s.clean = p.CleanSession
	if p.ClientID == "" {
		s.id = common.PrefixTmp + uuid.Generate().String()
		s.clean = true
	} else {
		s.id = common.PrefixSess + p.ClientID
	}
	err = s.manager.register(s)
	···
	err = s.sendConnack(packet.ConnectionAccepted)
	if err != nil {
		return err
	}
	err = s.manager.rules.StartRule(s.id)
	···
	return nil
}

这段代码分为三个部分,第一部分进行验证授权,第二个部分是向sessionManager和ruleManager注册(ruleManager注册的过程包括在sessionManager中),第三部分就是向客户端返回连接成功并开启rule。

第一部分:验证授权

在sessionManager进行初始化的时候,曾经创建过一个Auth对象,这个对象保存了用户名、密码、以及对应每个用户的发布、订阅的权限,AuthenticateAccount方法就是根据用户名、密码验证是不是正确,并返回相应的authorizer:

// AuthenticateAccount auth client account, then return authorizer if pass
func (a *Auth) AuthenticateAccount(username, password string) *Authorizer {
	_account, ok := a.accounts[username]
	if ok && len(password) > 0 && strings.Compare(encodePassword(password), _account.Password) == 0 {
		return _account.Authorizer
	}
	return nil
}

authorizer中存储了该用户发布、订阅的权限,后面用户发布、订阅的时候都要用这个对象进行发布、订阅的权限验证。
顺便这里有一个p.will的处理,这个处理就是在连接的时候会发送可能订阅的一些内容,这里也需要这个authorizer进行权限的验证。

第二部分:sessionManager注册

这里为session赋予id,就是一个前缀加上客户端id(如果没有客户端id就随机生成),然后向sessionManager进行注册:

// Called by session during onConnect
func (m *Manager) register(sess *session) error {
	···
	m.sessions.Set(sess.id, sess)
	//Blank: 使得消息能够返回客户端的关键
	return m.rules.AddRuleSess(sess.id, !sess.clean, sess.publish, sess.republish)
}

// AddRuleSess adds a new rule for session during running
func (m *Manager) AddRuleSess(id string, persistent bool, publish, republish common.Publish) error {
	···
	m.rules.Set(id, newRuleSess(id, persistent, m.broker, m.trieq0, publish, republish))
	return nil
}

func newRuleSess(id string, p bool, b broker, r *router.Trie, publish, republish common.Publish) base {
	return newRuleBase(id, p, b, r, publish, republish)
}

sessionManager注册就是向sessionManager中保存session的实例,之后调用ruleManager添加一个rule(可以看到这里最终调用的还是newRuleBase方法,与之前ruleManager创建RuleMsgQ0和RuleTopic的方法一样),这个rule的id就是session的id,这里要注意两个参数,publish、republish方法,这两个参数最终其实是传递到这个Rule的msgchan中:

func (s *session) publish(msg common.Message) {
   pub := new(packet.Publish)
   pub.Message.QOS = packet.QOS(msg.TargetQOS)
   pub.Message.Topic = msg.TargetTopic
   pub.Message.Payload = msg.Payload
   pub.Message.Retain = msg.Retain
   if msg.TargetQOS == 1 {
   	pid := s.pids.Set(&msg)
   	pub.ID = packet.ID(pid)
   }
   if err := s.send(pub, true); err != nil {
   	s.close(true)
   }
}

上面的方法就是return m.rules.AddRuleSess(sess.id, !sess.clean, sess.publish, sess.republish)语句中的publish参数,结合上一篇博客,很容易想到,因为RuleMsgQ0在接收到broker的消息后,会在Trie树中找到对应的sinksub ,sinksub中会进行封装,这里就把消息的TargetTopic赋值了,如果是在RuleTopic中的消息,TargetTopic会变成相应sinksub的TargetTopic(也就是进行转发的路径),如果不是在RuleTopic的话,那么TargetTopic就会与message的Topic相同:

// Flow flows message
func (s *sinksub) Flow(msg common.Message) {
	// set target topic
	if s.ttopic != "" {
		msg.TargetTopic = s.ttopic
	} else {
		msg.TargetTopic = msg.Topic
	}
	···
}

不论怎样,最终到达publish方法的Message的TargetTopic就是要发送的Topic,比如说定义了/a的消息发送到/a/b中,那么最终到达publish方法中的Message的TargetTopic就是/a/b,publish方法最终会err := s.send(pub, true)将消息发送到客户端。

第三部分:开启rule

首先到达这一步时会先向客户端发送连接通过的信息,之后开启这个客户端的rule,其实与开启RuleMsgQ0和RuleTopic的逻辑是相同的,其实都是开启msgchan,等待msgchan中channel的另一端发送数据,当接收到消息后会调用msgchan的process方法:

// ProcessingQ1 processing message with QOS=1
func (c *msgchan) goProcessingQ1() error {
	···
loop:
	for {
		select {
		case <-c.msgtomb.Dying():
			break loop
		case msg := <-c.msgq1:
			c.process(msg)
		}
	}
	···
}

func (c *msgchan) process(msg *common.Message) {
	if msg.QOS == 0 {
		c.publish(*msg)
		return
	}
	···
}

可以看到接收到消息后就进入了process方法,process方法又会调用c.publish方法,这个方法就是刚刚创建rule时传入的那个publish方法。

publish处理

当Handle方法接收到Publish类型的请求时,那么就进入了Publish的处理:

func (s *session) Handle() {
	···
	for {
		pkt, err = s.conn.Receive()
		···
		switch p := pkt.(type) {
		case *packet.Connect:
			s.log.Debugln("received:", p.Type())
			err = s.onConnect(p)
		···
		}
	}
}

接下来进入onConnect方法:

func (s *session) onPublish(p *packet.Publish) error {
	if _, ok := s.permittedPublishTopics[p.Message.Topic]; !ok {
		// TODO: remove?
		if !common.PubTopicValidate(p.Message.Topic) {
			return fmt.Errorf("publish topic (%s) invalid", p.Message.Topic)
		}
		if !s.authorizer.Authorize(auth.Publish, p.Message.Topic) {
			return fmt.Errorf("publish topic (%s) not permitted", p.Message.Topic)
		}
		s.permittedPublishTopics[p.Message.Topic] = struct{}{}
	}
	···
	msg := common.NewMessage(uint32(p.Message.QOS), p.Message.Topic, p.Message.Payload, s.clientID)
	···
	s.manager.flow(msg)
	return nil
}

首先进行鉴定这个主题的消息是否有权限进行发布,session中有一个属性permittedPublishTopics map[string]struct{}就是用来存储已经授权可以发布的主题,如果这个主题是第一次发布,那么就进入第一个if-else中。
在这里面先通过PubTopicValidate方法鉴定要发布主题的名称是不是合法,看看其具体逻辑:


const (
	// topic validate fields
	MaxSlashCount          = 8
	MaxTopicNameLen        = 255
	// wildcard topic fields
	TopicSeparator    = "/"
	SingleWildCard    = "+"
	MultipleWildCard  = "#"
	SysCmdPrefix      = "$"
)
// PubTopicValidate validate MQTT publish topic
func PubTopicValidate(topic string) bool {
	if topic == "" {
		return false
	}
	if len(topic) > MaxTopicNameLen || strings.Contains(topic, "\u0000") ||
		strings.Count(topic, TopicSeparator) > MaxSlashCount {
		return false
	}
	if ContainsWildcard(topic) {
		return false
	}
	if isSysTopic(topic) {
		return false
	}
	return true
}

从上面的代码中可以看到一些规定:

  1. topic不能为空(“”)
  2. 长度不能超过255
  3. 使用“/“字符不能超过8次
  4. 不能包含“\u0000”字符
  5. 不能包含“+”字符
  6. 不能包含“$”字符

验证过发布主题的名称后,通过之前connect中为session添加的authorizer验证该主题是否具有发布的权限,如果具有发布的权限那么在permittedPublishTopics中记录下来,这样下次发布该主题的消息就不用再重新认证了。
验证过后,就将构建一个Message,然后将Message使用s.manager.flow(调用session对应的sessionManager的flow方法)把进行消息发布:

// Flow flows message to broker
func (b *Broker) Flow(msg *common.Message) {
	···
	select {
	case b.msgQ0Chan <- msg:
	case <-b.tomb.Dying():
		b.log.Debugf("flow message (qos=0) failed since broker closed")
	}
}

这个方法是之前在构造sessionManager的时候赋予sessionManager的,也就是s.manager.flow执行的方法,其目的就是向broker的channel(msgQ0Chan)发送消息,之后由RuleMsgQ0来进行消息的消费:

func (s *sink) goRoutingQ0() error {
	var msg *common.Message
	for {
		select {
		case <-s.tomb.Dying():
			return nil
		case msg = <-s.broker.MsgQ0Chan():
			matches := s.trieq0.MatchUnique(msg.Topic)
			for _, sub := range matches {
				sub.Flow(*msg)
			}
		}
	}
}

这个方法的具体逻辑在上一篇openedge-hub模块启动源码浅析——百度BIE边缘侧openedge项目源码阅读(1)有详细说到,这里简单的说一下就是RuleMsgQ0在这个方法中读取消息后,遍历在Trie树中注册的sinksub,因为sinksub中拥有对应session的rule的msgchan,然后通过这个msgchan把相应消息发送到对应的sinksub,如果sinksub对应的是RuleTopic的话会依据配置文件中的subscription来修改消息,并将消息发送到broker中,继续循环上述的过程,如果sinksub对应的是session的rule的话,那么对应session的msgchan读取这个消息,并调用publish方法将消息发送回客户端。

Subscribe处理

当Handle方法接收到Subscribe类型的请求时,那么就进入了Subscribe的处理:

func (s *session) Handle() {
	···
	for {
		pkt, err = s.conn.Receive()
		···
		switch p := pkt.(type) {
		case *packet.Subscribe:
			s.log.Debugf("received: %s, subs: %v", p.Type(), p.Subscriptions)
			err = s.onSubscribe(p)
		···
		}
	}
}

下面进入onSubscribe方法中:

func (s *session) onSubscribe(p *packet.Subscribe) error {
	ack := packet.NewSuback()
	rv := s.genSubAck(p.Subscriptions)
	for i, sub := range p.Subscriptions {
		if rv[i] == packet.QOSFailure {
			s.log.Errorf("failed to subscribe topic (%s)", sub.Topic)
			continue
		}
		if _, ok := s.subs[sub.Topic]; !ok {
			err := s.manager.rules.AddSinkSub(s.id, s.id, uint32(sub.QOS), sub.Topic, uint32(sub.QOS), "")
			···
			s.log.Infof("topic (%s) subscribed", sub.Topic)
			s.subs[sub.Topic] = sub
			···
		} else {
			if s.subs[sub.Topic].QOS != sub.QOS {
				//重新向Trie树中添加sinksub,之后移除原有的sinksub
				···
			}
		}
	}
	ack.ID = p.ID
	ack.ReturnCodes = rv
	err := s.send(ack, true)
	if err != nil {
		return err
	}
	return s.sendRetainMessage(p)
}

这里构建了一个rv,这个主要是验证订阅的每一个subscription合法性,这里就不进入源码看了,直接翻译过来.

订阅时主题合法性检验:

  1. topic不能为“”
  2. 主题名称长度不能超过255
  3. 主题名称不能包含“\u0000”
  4. 主题名称中“/”不能超过8个
  5. 主题名称中的“#”只能位于最后以“/”后
  6. “#”字符不与其他字符混用,比如/ab#c、/c#、/#a都是错误的,/…/#正确
  7. “+”字符不与其他字符混用,比如/ab+c、/c+、/+c都是错误的,/…/+/…正确

在genSubAck方法中对于不符合上述规定的subscription都设置rv[index]=QOSFailure,之后对于通过合法性检验的subscription向RuleManager中添加sinkSub,这样就把sinkSub添加到RuleManager中(具体添加过程,在上一篇博客中详细讲到),这样在RuleMsgQ0从broker的channel中读取到数据后就可以根据订阅的主题找到对应的sinksub,并且发送消息了。

Pingreq处理

当Handle方法接收到Pingreq类型的请求时,那么就进入了Pingreq的处理:

func (s *session) Handle() {
	···
	for {
		pkt, err = s.conn.Receive()
		···
		switch p := pkt.(type) {
		case *packet.Pingreq:
			s.log.Debugln("received:", p.Type())
			err = s.onPingreq(p)
		···
		}
	}
}

进入到onPingreq方法:

func (s *session) onPingreq(p *packet.Pingreq) error {
	return s.send(packet.NewPingresp(), true)
}

其实就是发送一个包,表明能够ping到。

Unsubscribe处理

当Handle方法接收到Pingreq类型的请求时,那么就进入了Pingreq的处理:

func (s *session) Handle() {
	···
	for {
		pkt, err = s.conn.Receive()
		···
		switch p := pkt.(type) {
		case *packet.Unsubscribe:
			s.log.Debugf("received: %s, topics: %v", p.Type(), p.Topics)
			err = s.onUnsubscribe(p)
		···
		}
	}
}

下面进入到onUnsubscribe方法中:

func (s *session) onUnsubscribe(p *packet.Unsubscribe) error {
	···
	for _, topic := range p.Topics {
		if _, ok := s.subs[topic]; ok {
			err := s.manager.rules.RemoveSinkSub(s.id, topic)
		}
		···
	}
	···
	return s.send(ack, true)
}

其实就是从RuleManager的Trie树中剔除掉当初session(其实应该是session对应的rule)设置的sinksub。

Disconnect处理

当Handle方法接收到Disconnect类型的请求时,那么就进入了Disconnect的处理:

func (s *session) Handle() {
	···
	for {
		pkt, err = s.conn.Receive()
		···
		case *packet.Disconnect:
			s.log.Debugln("received:", p.Type())
			s.close(false)
			return
		···
		}
	}
}

下面进入到close方法中:

// Close closes this session, only called by session manager
func (s *session) close(will bool) {
	s.once.Do(func() {
		s.tomb.Kill(nil)
		···
		s.manager.remove(s.id)
		if will {
			s.sendWillMessage()
		}
		s.conn.Close()
		···
	})
}

// Called by session when error raises
func (m *Manager) remove(id string) {
	m.sessions.Remove(id)
	err := m.rules.RemoveRule(id)
	if err != nil {
		m.log.WithError(err).Debugf("failed to remove rule")
	}
}

其实就是移除sessionManager中保存的session、ruleManager中保存的rule,并且移除掉Trie树中所有的rule相关的sinksub(是在err := m.rules.RemoveRule(id)中执行的),最后关掉连接,关掉协程。

非法消息类型处理

当之前消息类型处理中出现异常或者消息类型不合法(即不是规定的类型),那么就会关掉这个session的连接:

func (s *session) Handle() {
	···
	if err != nil {
			s.log.Errorf(err.Error())
			s.close(true)
			break
		}
}
 类似资料: