本文主要研究一下tempo的ExclusiveQueues
tempo/pkg/flushqueues/exclusivequeues.go
type ExclusiveQueues struct {
queues []*util.PriorityQueue
index *atomic.Int32
activeKeys sync.Map
}
ExclusiveQueues定义了queues、index、activeKeys属性
tempo/pkg/flushqueues/exclusivequeues.go
// New creates a new set of flush queues with a prom gauge to track current depth
func New(queues int, metric prometheus.Gauge) *ExclusiveQueues {
f := &ExclusiveQueues{
queues: make([]*util.PriorityQueue, queues),
index: atomic.NewInt32(0),
}
for j := 0; j < queues; j++ {
f.queues[j] = util.NewPriorityQueue(metric)
}
return f
}
New方法先创建ExclusiveQueues,然后根据指定的queue个数通过util.NewPriorityQueue(metric)创建PriorityQueue
tempo/pkg/flushqueues/exclusivequeues.go
// Enqueue adds the op to the next queue and prevents any other items to be added with this key
func (f *ExclusiveQueues) Enqueue(op util.Op) {
_, ok := f.activeKeys.Load(op.Key())
if ok {
return
}
f.activeKeys.Store(op.Key(), struct{}{})
f.Requeue(op)
}
Enqueue方法先从activeKeys查找指定的key,若已经存在则提前返回,不存在则放入activeKeys中,然后执行f.Requeue(op)
tempo/pkg/flushqueues/exclusivequeues.go
// Requeue adds an op that is presumed to already be covered by activeKeys
func (f *ExclusiveQueues) Requeue(op util.Op) {
flushQueueIndex := int(f.index.Inc()) % len(f.queues)
f.queues[flushQueueIndex].Enqueue(op)
}
Requeue方法首先通过
int(f.index.Inc()) % len(f.queues)
计算flushQueueIndex,然后找到对应的queue,执行Enqueue方法
tempo/pkg/flushqueues/exclusivequeues.go
// Dequeue removes the next op from the requested queue. After dequeueing the calling
// process either needs to call ClearKey or Requeue
func (f *ExclusiveQueues) Dequeue(q int) util.Op {
return f.queues[q].Dequeue()
}
Dequeue方法执行f.queues[q]对应queue的Dequeue
tempo/pkg/flushqueues/exclusivequeues.go
// Clear unblocks the requested op. This should be called only after a flush has been successful
func (f *ExclusiveQueues) Clear(op util.Op) {
f.activeKeys.Delete(op.Key())
}
Clear方法将指定key从activeKeys中移除
tempo/pkg/flushqueues/exclusivequeues.go
// Stop closes all queues
func (f *ExclusiveQueues) Stop() {
for _, q := range f.queues {
q.Close()
}
}
Stop方法遍历f.queues,挨个执行q.Close()
tempo的ExclusiveQueues定义了queues、index、activeKeys属性;它提供了Enqueue、Requeue、Dequeue、Clear、Stop方法。