当前位置: 首页 > 工具软件 > gnatsd > 使用案例 >

gnatsd的Subject数据结构及优于mosquitto的原因

宗政子辰
2023-12-01

主题名(Subject Name)


主题名是大小写敏感的,必须是非空字符串,不能包含空格,使用“.”符号来分层,mqtt中使用“/”分层。
星号“*”匹配一层,跟mqtt中的“+”一样。
大于号“>”匹配后面所有层,跟mqtt中的“#”一样。

源码分析


代码都在sublist.go
先列出数据结构:

// A Sublist stores and efficiently retrieves subscriptions.
type Sublist struct {
    sync.RWMutex
    genid     uint64
    matches   uint64
    cacheHits uint64
    inserts   uint64
    removes   uint64
    cache     map[string]*SublistResult
    root      *level
    count     uint32
}
// A level represents a group of nodes and special pointers to
// wildcard nodes.
type level struct {
    nodes    map[string]*node
    pwc, fwc *node //pwc代表'*'节点,fwc代表'>'节点
}
// A node contains subscriptions and a pointer to the next level.
type node struct {
    next  *level
    psubs []*subscription //普通订阅者列表
    qsubs [][]*subscription //queue订阅者列表
}
// New will create a default sublist
func NewSublist() *Sublist {
    return &Sublist{root: newLevel(), cache: make(map[string]*SublistResult)}
}
// Create a new default level. We use FNV1A as the hash
// algorithm for the tokens, which should be short.
//FNV1A?历史遗留注释吧,这儿分明直接用了golang自带的map哈希。
func newLevel() *level {
    return &level{nodes: make(map[string]*node)}
}

一开始使用NewSublist创建一个Sublist。Sublist保存了所有subject。
Sublist初始化时创建了root节点。


来看看怎么插入一个subject:


// Insert adds a subscription into the sublist
func (s *Sublist) Insert(sub *subscription) error {
    // copy the subject since we hold this and this might be part of a large byte slice.
    subject := string(sub.subject)
    tsa := [32]string{}
    tokens := tsa[:0]
    start := 0
    for i := 0; i < len(subject); i++ {
        if subject[i] == btsep {
            tokens = append(tokens, subject[start:i])
            start = i + 1
        }
    }
    tokens = append(tokens, subject[start:])

    s.Lock()

    sfwc := false
    l := s.root
    var n *node

    for _, t := range tokens {
        lt := len(t)
        if lt == 0 || sfwc { //如果此层长度为0或者上一层已经是'>'了,表示Subject是非法的
            s.Unlock()
            return ErrInvalidSubject
        }

        if lt > 1 { //不是*和>,直接map定位
            n = l.nodes[t]
        } else {
            switch t[0] {
            case pwc:
                n = l.pwc
            case fwc:
                n = l.fwc
                sfwc = true //表示此层只能是最后一层
            default: //不是*和>,直接map定位
                n = l.nodes[t]
            }
        }
        if n == nil { //node节点还没有则创建
            n = newNode()
            if lt > 1 {
                l.nodes[t] = n
            } else {
                switch t[0] {
                case pwc:
                    l.pwc = n
                case fwc:
                    l.fwc = n
                default:
                    l.nodes[t] = n
                }
            }
        }
        if n.next == nil {
            n.next = newLevel()
        }
        l = n.next //下一层
    }
    //上面循环结束后此时n是最后一层的node节点
    if sub.queue == nil { //不是queue,把sub加到psubs中。psubs切片存储了所有订阅此subject的client
        n.psubs = append(n.psubs, sub)
    } else {
        // This is a queue subscription
        if i := findQSliceForSub(sub, n.qsubs); i >= 0 {
            n.qsubs[i] = append(n.qsubs[i], sub)
        } else {
            n.qsubs = append(n.qsubs, []*subscription{sub})
        }
    }

    s.count++
    s.inserts++

    s.addToCache(subject, sub)
    atomic.AddUint64(&s.genid, 1)

    s.Unlock()
    return nil
}

从Insert方法中可以理出整个数据结构:
Sublist第一个层节点是root,root是个level结构,level代表一层。level包含了一个nodes map,nodes存储了此层的所有node,pwc和fwc分别代表了*和> node 。
node的next指向了下一层level,node的psubs存储了普通subject订阅者client,qsubs存储的是queue类别的subject订阅者client。

整个list是个树结构,只不过每层的node节点使用map哈希存储。

先不管Cache干什么用的,先来看看查找匹配:


// matchLevel is used to recursively descend into the trie.
func matchLevel(l *level, toks []string, results *SublistResult) {
    var pwc, n *node
    for i, t := range toks {
        if l == nil {
            return
        }
        if l.fwc != nil { //全匹配,把下面的所有订阅者都加入到results中
            addNodeToResults(l.fwc, results)
        }
        if pwc = l.pwc; pwc != nil { //层匹配,递归子层
            matchLevel(pwc.next, toks[i+1:], results)
        }
        n = l.nodes[t] //查找节点
        if n != nil {  //找到继续下一层
            l = n.next
        } else {
            l = nil
        }
    }
    if n != nil { //找到节点,把订阅者加入到results中
        addNodeToResults(n, results)
    }
    if pwc != nil { //最后一层*通配符的订阅者加入到results中
        addNodeToResults(pwc, results)
    }
}

从指定level比如root开始遍历匹配下面的每一层nodes,如果匹配则把订阅者加入到result中,注意通配符的处理。

每一层使用map快速定位node,使用切片存储此层所有订阅者。
因为使用了map,查询定位比mosquitto的遍历链表树快的多。何况qnatsd还做了Cache。

下面再来看看Cache:


因为查找一个subject的所有订阅者比较费时间,所以使用cache缓存一部分subject订阅者信息,每次查找先去cache中查找,找不到再去sublist中查找,如果找到就加入到cache中,新增一个subject时也要加入到cache中。
cache也有数量限制,当超过一定数量时删除最早的部分cache,防止cache过多。

 类似资料: