当前位置: 首页 > 工具软件 > Shard-Query > 使用案例 >

Influxdb源码分析-Shard和TSM Engine Cache

乜烨霖
2023-12-01

前言

这是一个influxdb源码分析系列的文章,上一章分析了Store结构,Store是数据库核心功能的一个总体抽象。Store里面包含了其他核心组件,例如Shard,series,index等。本篇文章分析的是Shard结构。

Shard 的定位

Shard是一个物理的概念,在存储时,最小的存储粒度就是Shard。Shard封装了单个分片的存储查询功能。这个结构定义在了tsdb/shard.go中,首先看一下具体的结构:

type Shard struct {
	path    string
	walPath string
	id      uint64

	database        string
	retentionPolicy string

	sfile   *SeriesFile
	options EngineOptions

	mu      sync.RWMutex
	_engine Engine
	index   Index
	enabled bool
	// expvar-based stats.
	stats       *ShardStatistics
	defaultTags models.StatisticTags

	baseLogger *zap.Logger
	logger     *zap.Logger

	EnableOnOpen bool
	CompactionDisabled bool
}

这里面的一些结构在之前的文章都有详细的分析,如series,index,所以这里就不再分析。除此之外,还有一个核心结构:Engine,这个是存储引擎,负责数据真正写入到influxdb中。我们本篇文章说是分析Shard,其实主要是分析Engine结构。

Engine

上面提到了,Engine是shard的存储引擎,负责读写数据用的。和大多数结构一样,为了能够更好地拓展,Engine也定义了一个顶层的抽象,在tsdb/engine.go中。部分结构如下:

// Engine represents a swappable storage engine for the shard.
type Engine interface {
	Open() error
	Close() error
	SetEnabled(enabled bool)
	SetCompactionsEnabled(enabled bool)
	ScheduleFullCompaction() error

	WithLogger(*zap.Logger)

	LoadMetadataIndex(shardID uint64, index Index) error

	CreateSnapshot() (string, error)
	Backup(w io.Writer, basePath string, since time.Time) error
	Export(w io.Writer, basePath string, start time.Time, end time.Time) error
	Restore(r io.Reader, basePath string) error
	Import(r io.Reader, basePath string) error
	Digest() (io.ReadCloser, int64, error)

	CreateIterator(ctx context.Context, measurement string, opt query.IteratorOptions) (query.Iterator, error)
	CreateCursorIterator(ctx context.Context) (CursorIterator, error)
	IteratorCost(measurement string, opt query.IteratorOptions) (query.IteratorCost, error)
	WritePoints(points []models.Point) error

	CreateSeriesIfNotExists(key, name []byte, tags models.Tags) error
	CreateSeriesListIfNotExists(keys, names [][]byte, tags []models.Tags) error
	DeleteSeriesRange(itr SeriesIterator, min, max int64) error
	DeleteSeriesRangeWithPredicate(itr SeriesIterator, predicate func(name []byte, tags models.Tags) (int64, int64, bool)) error

	MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error)
	SeriesSketches() (estimator.Sketch, estimator.Sketch, error)
	SeriesN() int64

	MeasurementExists(name []byte) (bool, error)

	MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error)
	MeasurementFieldSet() *MeasurementFieldSet
	MeasurementFields(measurement []byte) *MeasurementFields
	ForEachMeasurementName(fn func(name []byte) error) error
	DeleteMeasurement(name []byte) error

	HasTagKey(name, key []byte) (bool, error)
	MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error)
	TagKeyCardinality(name, key []byte) int

	// Statistics will return statistics relevant to this engine.
	Statistics(tags map[string]string) []models.Statistic
	LastModified() time.Time
	DiskSize() int64
	IsIdle() bool
	Free() error

	io.WriterTo
}

这里的设计和Index部分很像。不是很了解的可以去看看influxdb源码解析-inmem index 如果想要自己实现一个存储引擎。需要做:

  • 实现上述接口
  • 注册

这个流程和index是一致的,对于每个引擎的具体实现:

// NewEngineFunc creates a new engine.
type NewEngineFunc func(id uint64, i Index, path string, walPath string, sfile *SeriesFile, options EngineOptions) Engine

// newEngineFuncs is a lookup of engine constructors by name.
var newEngineFuncs = make(map[string]NewEngineFunc)

// RegisterEngine registers a storage engine initializer by name.
func RegisterEngine(name string, fn NewEngineFunc) {
	if _, ok := newEngineFuncs[name]; ok {
		panic("engine already registered: " + name)
	}
	newEngineFuncs[name] = fn
}

需要注册一个NewEngineFunc到map中(key是engine的名字),作为当前engine的构造函数。使用的时候根据当前启动参数选择不同的engine:

func NewEngine(id uint64, i Index, path string, walPath string, sfile *SeriesFile, options EngineOptions) (Engine, error) {
	// Create a new engine
	if _, err := os.Stat(path); os.IsNotExist(err) {
		engine := newEngineFuncs[options.EngineVersion](id, i, path, walPath, sfile, options)
		if options.OnNewEngine != nil {
			options.OnNewEngine(engine)
		}
		return engine, nil
	}

	// If it's a dir then it's a tsm1 engine
	format := DefaultEngine // 默认是tsm 
	if fi, err := os.Stat(path); err != nil {
		return nil, err
	} else if !fi.Mode().IsDir() {
		return nil, ErrUnknownEngineFormat
	} else {
		format = "tsm1"
	}
	// Lookup engine by format.
	fn := newEngineFuncs[format] // 寻找实现
	if fn == nil {
		return nil, fmt.Errorf("invalid engine format: %q", format)
	}

	engine := fn(id, i, path, walPath, sfile, options)
	if options.OnNewEngine != nil {
		options.OnNewEngine(engine)
	}
	return engine, nil
}

具体的实现在tsdb/engine/tsm1下,实现的是一个基于TSM(Time Series Merge Tree)引擎。看一下这里的实现。

TSM Engine

TSM Engine是influxdb实现的一个存储时序数据的存储引擎,结构的定义在tsdb/engine/tsm1/engine.go 这个结构的字段很多,只关心核心逻辑,做一下简化:

  type Engine struct {
    mu sync.RWMutex
    index tsdb.Index
    
    id           uint64
    path         string
    sfile        *tsdb.SeriesFile

    fieldset *tsdb.MeasurementFieldSet

    WAL            *WAL
    Cache          *Cache
    Compactor      *Compactor
    CompactionPlan CompactionPlanner
    FileStore      *FileStore
}

这个结构主要分为三部分

  • index 和series 信息。database 粒度的
  • id,path,fieldSet 基本信息
  • WAL,Cache,Compactor,FileStore TSM 相关。

index和series之前有过介绍,id,path也不再细说。主要来分析 TSM相关的部分。

Engine Cache

Cache部分作为LSM Based的存储引擎是一个基本的组件。主要功能是缓存磁盘上的数据。提供series-> seires Value 的映射。Cache结构位于tsdb/engine/tsm1/cache.go里。

   type Cache struct {
    mu      sync.RWMutex
    store   storer
    snapshot     *Cache
    snapshotting bool
    stats         *CacheStatistics   
}

简化过后如上,核心字段是两个:store和snapshot。store 是一个抽象的interface,可以有多种实现。snapshot是cache的快照。

storer

storer结构是cache 存储数据的部分。结构定义如下:

// storer is the interface that descibes a cache's store.
type storer interface {
	entry(key []byte) *entry                        // Get an entry by its key.
	write(key []byte, values Values) (bool, error)  // Write an entry to the store.
	add(key []byte, entry *entry)                   // Add a new entry to the store.
	remove(key []byte)                              // Remove an entry from the store.
	keys(sorted bool) [][]byte                      // Return an optionally sorted slice of entry keys.
	apply(f func([]byte, *entry) error) error       // Apply f to all entries in the store in parallel.
	applySerial(f func([]byte, *entry) error) error // Apply f to all entries in serial.
	reset()                                         // Reset the store to an initial unused state.
	split(n int) []storer                           // Split splits the store into n stores
	count() int                                     // Count returns the number of keys in the store
}

这里先插一句,可以看到add 的参数是(key []byte,entry),这个entry是当前这个key对应的value集合,封装了一个结构:

type entry struct {
	mu     sync.RWMutex
	values Values // All stored values.

	// The type of values stored. Read only so doesn't need to be protected by
	// mu.
	vtype byte
}

key指的是series key=name+tags+fieldKey

Storer 提供了一些基本函数,例如add,write等。实现crud的功能。storer的实现是:ring.位于tsdb/engine/tsm1/ring.go,具体结构:

type ring struct {
	// Number of keys within the ring. This is used to provide a hint for
	// allocating the return values in keys(). It will not be perfectly accurate
	// since it doesn't consider adding duplicate keys, or trying to remove non-
	// existent keys.
	keysHint int64

	// The unique set of partitions in the ring.
	// len(partitions) <= len(continuum)
	partitions []*partition
}

ring把数据做了一下分桶,分为多个partition。写入数据是,首先要计算写到哪个partition,这里的逻辑和写series ,计算shard 有点像:

func (r *ring) getPartition(key []byte) *partition {
	return r.partitions[int(xxhash.Sum64(key)%uint64(len(r.partitions)))]
}

做了个hash然后取模。partition 结构其实是一个map:

type partition struct {
	mu    sync.RWMutex
	store map[string]*entry
}

所以在得到具体的partition之后,写入到这个partition就成了map数据的添加,这里可能会有并发问题,所以需要加锁

func (p *partition) add(key []byte, entry *entry) {
	p.mu.Lock()
	p.store[string(key)] = entry
	p.mu.Unlock()
}

add 函数不返回值,write函数会返回是否有新的entry写入:

func (p *partition) write(key []byte, values Values) (bool, error) {
	p.mu.RLock()
	e := p.store[string(key)]
	p.mu.RUnlock()
	if e != nil {
		// Hot path.
		return false, e.add(values)
	}

	p.mu.Lock()
	defer p.mu.Unlock()

	// Check again.
	if e = p.store[string(key)]; e != nil {
		return false, e.add(values)
	}

	// Create a new entry using a preallocated size if we have a hint available.
	e, err := newEntryValues(values)
	if err != nil {
		return false, err
	}

	p.store[string(key)] = e
	return true, nil
}

这里的逻辑也不复杂,首先是加读锁,check数据是不是已经存在,然后写入。到这里一些核心的函数基本就分析完毕了。得到以下结论:

  • Cache的真正存数据是交给了storer的一个实现ring
  • ring是一个curde hash ring,里面会分多个partition 缓存数据
  • partition 本质上市一个map结构,key是series key,value是这个series 对应的value集合。

接下来看看Cache本身的一些函数

Cache Init和Free

Cache的吃初始化和清空

// init initializes the cache and allocates the underlying store.  Once initialized,
// the store re-used until Freed.
func (c *Cache) init() {
	if !atomic.CompareAndSwapUint32(&c.initializedCount, 0, 1) {
		return
	}

	c.mu.Lock()
	c.store, _ = newring(ringShards)
	c.mu.Unlock()
}

// Free releases the underlying store and memory held by the Cache.
func (c *Cache) Free() {
	if !atomic.CompareAndSwapUint32(&c.initializedCount, 1, 0) {
		return
	}

	c.mu.Lock()
	c.store = emptyStore{}
	c.mu.Unlock()
}

初始化时,核心逻辑是初始化storer,指定ring作为实现。Free时改变了store结构的指向,作为清空数据操作。

Cache Write和WriteMulti

写入单条数据:Write

func (c *Cache) Write(key []byte, values []Value) error {
	c.init()
	addedSize := uint64(Values(values).Size())

	// Enough room in the cache?
	limit := c.maxSize
	n := c.Size() + addedSize

	if limit > 0 && n > limit {
		atomic.AddInt64(&c.stats.WriteErr, 1)
		return ErrCacheMemorySizeLimitExceeded(n, limit)
	}

	newKey, err := c.store.write(key, values)
	if err != nil {
		atomic.AddInt64(&c.stats.WriteErr, 1)
		return err
	}

	if newKey {
		addedSize += uint64(len(key))
	}
	// Update the cache size and the memory size stat.
	c.increaseSize(addedSize)
	c.updateMemSize(int64(addedSize))
	atomic.AddInt64(&c.stats.WriteOK, 1)

	return nil
}

这里委托给了store.write执行真正的写入。

写入多条数据:WriteMulti,代码有点长,做一下简化

func (c *Cache) WriteMulti(values map[string][]Value) error {
    c.init()
    var werr error
    c.mu.RLock()
    store := c.store
    c.mu.RUnlock()

    for k, v := range values {
        newKey, err := store.write([]byte(k), v)
    }
    return werr
}

核心逻辑就是遍历values map,循环调用单条写入。说到这里,多提一句,还记得这个函数在哪里调用的吗?在之前的文章里分析写入的时候influxdb 数据写入 倒数第二段:


	// first try to write to the cache
	if err := e.Cache.WriteMulti(values); err != nil {
		return err
	}

	if e.WALEnabled {
		if _, err := e.WAL.WriteMulti(values); err != nil {
			return err
		}
	}

	// if requested, store points written stats
	if pointsWritten, ok := ctx.Value(tsdb.StatPointsWritten).(*int64); ok {
		*pointsWritten = npoints
	}

	// if requested, store values written stats
	if valuesWritten, ok := ctx.Value(tsdb.StatValuesWritten).(*int64); ok {
		*valuesWritten = nvalues
	}

数据在shard内部的真正写入逻辑,首先是写入到Cache,然后才是写入到WAL。这里调用的就是这个方法,好了,现在任督二脉,快打通了,和之前的连起来了!

Cache Snapshot和ClearSnapshot

snapshot是快照功能,快照的生成和清理,其实对应的就是Cache结构的snapshot字段的赋值和清除。代码比较长,简化一下:

生成快照:

func (c *Cache) Snapshot() (*Cache, error) {
    c.init()
    c.mu.Lock()
    defer c.mu.Unlock()
    if c.snapshotting {
        return nil, ErrSnapshotInProgress
    }
    c.snapshotting = true
    c.snapshotAttempts++ // increment the number of times we tried to do this
    // If no snapshot exists, create a new one, otherwise update the existing snapshot
    if c.snapshot == nil {
        store, err := newring(ringShards)
        if err != nil {
            return nil, err
        }

        c.snapshot = &Cache{
            store: store,
        }
    }

    return c.snapshot, nil
}

清理快照:

func (c *Cache) ClearSnapshot(success bool) {
    c.init()

    c.mu.RLock()
    snapStore := c.snapshot.store
    c.mu.RUnlock()

    // reset the snapshot store outside of the write lock
    if success {
        snapStore.reset()
    }
}

这个主要依赖了ring的reset 实现,目的是把map里面的所有数据清理掉。

Cache Split

除了这些比较重要的操作之外,Cache还有一个重要的操作:split.这个是分割cache的,后面会提到,Cache会做compact生成TSM File,为了加速这个操作,经常会使用多个goroutine 一起生成,所以会把Cache 分割成多份,每份一个goroutine

func (c *Cache) Split(n int) []*Cache {
	if n == 1 {
		return []*Cache{c}
	}

	caches := make([]*Cache, n)
	storers := c.store.split(n)
	for i := 0; i < n; i++ {
		caches[i] = &Cache{
			store: storers[i],
		}
	}
	return caches
}

这里委托给了storer的实现,做split。

func (r *ring) split(n int) []storer {
	var keys int
	storers := make([]storer, n)
	for i := 0; i < n; i++ {
		storers[i], _ = newring(len(r.partitions))
	}

	for i, p := range r.partitions {
		r := storers[i%n].(*ring)
		r.partitions[i] = p
		keys += len(p.store)
	}
	return storers
}

这里会生成多个Cache的实现(ring),然后把当前的partition赋值到这些新的ring里面。

CacheLoader

上面分析了一下Cache的一些核心依赖和函数,那么Cache是怎么初始化的呢?这里的初始化指的不是创建对应的结构,而是灌数据到这个初始化好的结构里面去。这部分逻辑交给了CacheLoader来实现。

type CacheLoader struct {
	files []string

	Logger *zap.Logger
}

// NewCacheLoader returns a new instance of a CacheLoader.
func NewCacheLoader(files []string) *CacheLoader {
	return &CacheLoader{
		files:  files,
		Logger: zap.NewNop(),
	}
}

CacheLoader结构成员是一个files 数组,代表了WAL文件,然后读取这些WAL 文件,来把数据写入到Cache里。这部分逻辑在CacheLoader的Load函数里,代码有点长,简化一下:

func (cl *CacheLoader) Load(cache *Cache) error {
    var r *WALSegmentReader
    for _, fn := range cl.files {
        if err := func() error {
            f, err := os.OpenFile(fn, os.O_CREATE|os.O_RDWR, 0666)
                r = NewWALSegmentReader(f)
            for r.Next() {
                entry, err := r.Read()
                
                switch t := entry.(type) {
                case *WriteWALEntry:
                    if err := cache.WriteMulti(t.Values); err != nil {
                        return err
                    }
                case *DeleteRangeWALEntry:
                    cache.DeleteRange(t.Keys, t.Min, t.Max)
                case *DeleteWALEntry:
                    cache.Delete(t.Keys)
                }
            }
            return r.Close()
        }(); err != nil {
            return err
        }
    }
    return nil
}

可以看到核心的逻辑是读取WAL,然后根据WAL Entry的类型调用相关的方法,更新cache。

总结

到这里Cache部分算是告一段落。本篇文章主要揭示了:

  • Cache的基本结构
  • Cache是怎么缓存数据的
  • ring的实现和核心函数
  • Cache的核心操作
  • 从WAL中恢复Cache

下一章会分析一下WAL相关的内容。

 类似资料: