当前位置: 首页 > 工具软件 > TSM monitor > 使用案例 >

2022-03-02 Influxdb存储引擎tsm读数跟踪

宣煜
2023-12-01

目录

摘要:

核心函数:

StatementExecutor::executeExplainStatement

preparedStatement::Explain

preparedStatement::Select

buildCursor

Parser::parseFill

Parser::parseSelectStatement

Parser::ScanIgnoreWhitespace

bufScanner::Scan

Scanner::Scan

reader::read

reader::curr


摘要:

influxdb存储引擎tsm读数跟踪, 本文暂时不涉及集群分片的逻辑, 仅关注从请求到引擎查询中间的流程.

核心函数:

StatementExecutor::executeExplainStatement


func (e *StatementExecutor) executeExplainStatement(q *influxql.ExplainStatement, ctx *query.ExecutionContext) (models.Rows, error) {
	opt := query.SelectOptions{
		NodeID:      ctx.ExecutionOptions.NodeID,
		MaxSeriesN:  e.MaxSelectSeriesN,
		MaxBucketsN: e.MaxSelectBucketsN,
		Authorizer:  ctx.Authorizer,
	}

	// Prepare the query for execution, but do not actually execute it.
	// This should perform any needed substitutions.
	p, err := query.Prepare(q.Statement, e.ShardMapper, opt)
	if err != nil {
		return nil, err
	}
	defer p.Close()

	plan, err := p.Explain()
	if err != nil {
		return nil, err
	}
	plan = strings.TrimSpace(plan)

	row := &models.Row{
		Columns: []string{"QUERY PLAN"},
	}
	for _, s := range strings.Split(plan, "\n") {
		row.Values = append(row.Values, []interface{}{s})
	}
	return models.Rows{row}, nil
}

preparedStatement::Explain


func (p *preparedStatement) Explain() (string, error) {
	// Determine the cost of all iterators created as part of this plan.
	ic := &explainIteratorCreator{ic: p.ic}
	p.ic = ic
	cur, err := p.Select(context.Background())
	p.ic = ic.ic

	if err != nil {
		return "", err
	}
	cur.Close()

	var buf bytes.Buffer
	for i, node := range ic.nodes {
		if i > 0 {
			buf.WriteString("\n")
		}

		expr := "<nil>"
		if node.Expr != nil {
			expr = node.Expr.String()
		}
		fmt.Fprintf(&buf, "EXPRESSION: %s\n", expr)
		if len(node.Aux) != 0 {
			refs := make([]string, len(node.Aux))
			for i, ref := range node.Aux {
				refs[i] = ref.String()
			}
			fmt.Fprintf(&buf, "AUXILIARY FIELDS: %s\n", strings.Join(refs, ", "))
		}
		fmt.Fprintf(&buf, "NUMBER OF SHARDS: %d\n", node.Cost.NumShards)
		fmt.Fprintf(&buf, "NUMBER OF SERIES: %d\n", node.Cost.NumSeries)
		fmt.Fprintf(&buf, "CACHED VALUES: %d\n", node.Cost.CachedValues)
		fmt.Fprintf(&buf, "NUMBER OF FILES: %d\n", node.Cost.NumFiles)
		fmt.Fprintf(&buf, "NUMBER OF BLOCKS: %d\n", node.Cost.BlocksRead)
		fmt.Fprintf(&buf, "SIZE OF BLOCKS: %d\n", node.Cost.BlockSize)
	}
	return buf.String(), nil
}

preparedStatement::Select

func (p *preparedStatement) Select(ctx context.Context) (Cursor, error) {
	// TODO(jsternberg): Remove this hacky method of propagating now.
	// Each level of the query should use a time range discovered during
	// compilation, but that requires too large of a refactor at the moment.
	ctx = context.WithValue(ctx, "now", p.now)

	opt := p.opt
	opt.InterruptCh = ctx.Done()
	cur, err := buildCursor(ctx, p.stmt, p.ic, opt)
	if err != nil {
		return nil, err
	}

	// If a monitor exists and we are told there is a maximum number of points,
	// register the monitor function.
	if m := MonitorFromContext(ctx); m != nil {
		if p.maxPointN > 0 {
			monitor := PointLimitMonitor(cur, DefaultStatsInterval, p.maxPointN)
			m.Monitor(monitor)
		}
	}
	return cur, nil
}

buildCursor


func buildCursor(ctx context.Context, stmt *influxql.SelectStatement, ic IteratorCreator, opt IteratorOptions) (Cursor, error) {
	span := tracing.SpanFromContext(ctx)
	if span != nil {
		span = span.StartSpan("build_cursor")
		defer span.Finish()

		span.SetLabels("statement", stmt.String())
		ctx = tracing.NewContextWithSpan(ctx, span)
	}

	switch opt.Fill {
	case influxql.NumberFill:
		if v, ok := opt.FillValue.(int); ok {
			opt.FillValue = int64(v)
		}
	case influxql.PreviousFill:
		opt.FillValue = SkipDefault
	}

	fields := make([]*influxql.Field, 0, len(stmt.Fields)+1)
	if !stmt.OmitTime {
		// Add a field with the variable "time" if we have not omitted time.
		fields = append(fields, &influxql.Field{
			Expr: &influxql.VarRef{
				Val:  "time",
				Type: influxql.Time,
			},
		})
	}

	// Iterate through each of the fields to add them to the value mapper.
	valueMapper := newValueMapper()
	for _, f := range stmt.Fields {
		fields = append(fields, valueMapper.Map(f))

		// If the field is a top() or bottom() call, we need to also add
		// the extra variables if we are not writing into a target.
		if stmt.Target != nil {
			continue
		}

		switch expr := f.Expr.(type) {
		case *influxql.Call:
			if expr.Name == "top" || expr.Name == "bottom" {
				for i := 1; i < len(expr.Args)-1; i++ {
					nf := influxql.Field{Expr: expr.Args[i]}
					fields = append(fields, valueMapper.Map(&nf))
				}
			}
		}
	}

	// Set the aliases on each of the columns to what the final name should be.
	columns := stmt.ColumnNames()
	for i, f := range fields {
		f.Alias = columns[i]
	}

	// Retrieve the refs to retrieve the auxiliary fields.
	var auxKeys []influxql.VarRef
	if len(valueMapper.refs) > 0 {
		opt.Aux = make([]influxql.VarRef, 0, len(valueMapper.refs))
		for ref := range valueMapper.refs {
			opt.Aux = append(opt.Aux, *ref)
		}
		sort.Sort(influxql.VarRefs(opt.Aux))

		auxKeys = make([]influxql.VarRef, len(opt.Aux))
		for i, ref := range opt.Aux {
			auxKeys[i] = valueMapper.symbols[ref.String()]
		}
	}

	// If there are no calls, then produce an auxiliary cursor.
	if len(valueMapper.calls) == 0 {
		// If all of the auxiliary keys are of an unknown type,
		// do not construct the iterator and return a null cursor.
		if !hasValidType(auxKeys) {
			return newNullCursor(fields), nil
		}

		itr, err := buildAuxIterator(ctx, ic, stmt.Sources, opt)
		if err != nil {
			return nil, err
		}

		// Create a slice with an empty first element.
		keys := []influxql.VarRef{{}}
		keys = append(keys, auxKeys...)

		scanner := NewIteratorScanner(itr, keys, opt.FillValue)
		return newScannerCursor(scanner, fields, opt), nil
	}

	// Check to see if this is a selector statement.
	// It is a selector if it is the only selector call and the call itself
	// is a selector.
	selector := len(valueMapper.calls) == 1
	if selector {
		for call := range valueMapper.calls {
			if !influxql.IsSelector(call) {
				selector = false
			}
		}
	}

	// Produce an iterator for every single call and create an iterator scanner
	// associated with it.
	scanners := make([]IteratorScanner, 0, len(valueMapper.calls))
	for call := range valueMapper.calls {
		driver := valueMapper.table[call]
		if driver.Type == influxql.Unknown {
			// The primary driver of this call is of unknown type, so skip this.
			continue
		}

		itr, err := buildFieldIterator(ctx, call, ic, stmt.Sources, opt, selector, stmt.Target != nil)
		if err != nil {
			for _, s := range scanners {
				s.Close()
			}
			return nil, err
		}

		keys := make([]influxql.VarRef, 0, len(auxKeys)+1)
		keys = append(keys, driver)
		keys = append(keys, auxKeys...)

		scanner := NewIteratorScanner(itr, keys, opt.FillValue)
		scanners = append(scanners, scanner)
	}

	if len(scanners) == 0 {
		return newNullCursor(fields), nil
	} else if len(scanners) == 1 {
		return newScannerCursor(scanners[0], fields, opt), nil
	}

	return newMultiScannerCursor(scanners, fields, opt), nil
}

注意观点处理点:

	switch opt.Fill {
	case influxql.NumberFill:
		if v, ok := opt.FillValue.(int); ok {
			opt.FillValue = int64(v)
		}
	case influxql.PreviousFill:
		opt.FillValue = SkipDefault
	}

跟踪协程的管道,发现已经在解析查询到的数据

Parser::parseFill

// parseFill parses the fill call and its options.
func (p *Parser) parseFill() (FillOption, interface{}, error) {
	// Parse the expression first.
	tok, _, lit := p.ScanIgnoreWhitespace()
	p.Unscan()
	if tok != IDENT || strings.ToLower(lit) != "fill" {
		return NullFill, nil, nil
	}

	expr, err := p.ParseExpr()
	if err != nil {
		return NullFill, nil, err
	}
	fill, ok := expr.(*Call)
	if !ok {
		return NullFill, nil, errors.New("fill must be a function call")
	} else if len(fill.Args) != 1 {
		return NullFill, nil, errors.New("fill requires an argument, e.g.: 0, null, none, previous, linear")
	}
	switch fill.Args[0].String() {
	case "null":
		return NullFill, nil, nil
	case "none":
		return NoFill, nil, nil
	case "previous":
		return PreviousFill, nil, nil
	case "linear":
		return LinearFill, nil, nil
	default:
		switch num := fill.Args[0].(type) {
		case *IntegerLiteral:
			return NumberFill, num.Val, nil
		case *NumberLiteral:
			return NumberFill, num.Val, nil
		default:
			return NullFill, nil, fmt.Errorf("expected number argument in fill()")
		}
	}
}

查看调用关系:

Parser::parseSelectStatement


// parseSelectStatement parses a select string and returns a Statement AST object.
// This function assumes the SELECT token has already been consumed.
func (p *Parser) parseSelectStatement(tr targetRequirement) (*SelectStatement, error) {
	stmt := &SelectStatement{}
	var err error

	// Parse fields: "FIELD+".
	if stmt.Fields, err = p.parseFields(); err != nil {
		return nil, err
	}

	// Parse target: "INTO"
	if stmt.Target, err = p.parseTarget(tr); err != nil {
		return nil, err
	}

	// Parse source: "FROM".
	if tok, pos, lit := p.ScanIgnoreWhitespace(); tok != FROM {
		return nil, newParseError(tokstr(tok, lit), []string{"FROM"}, pos)
	}
	if stmt.Sources, err = p.parseSources(true); err != nil {
		return nil, err
	}

	// Parse condition: "WHERE EXPR".
	if stmt.Condition, err = p.parseCondition(); err != nil {
		return nil, err
	}

	// Parse dimensions: "GROUP BY DIMENSION+".
	if stmt.Dimensions, err = p.parseDimensions(); err != nil {
		return nil, err
	}

	// Parse fill options: "fill(<option>)"
	if stmt.Fill, stmt.FillValue, err = p.parseFill(); err != nil {
		return nil, err
	}

	// Parse sort: "ORDER BY FIELD+".
	if stmt.SortFields, err = p.parseOrderBy(); err != nil {
		return nil, err
	}

	// Parse limit: "LIMIT <n>".
	if stmt.Limit, err = p.ParseOptionalTokenAndInt(LIMIT); err != nil {
		return nil, err
	}

	// Parse offset: "OFFSET <n>".
	if stmt.Offset, err = p.ParseOptionalTokenAndInt(OFFSET); err != nil {
		return nil, err
	}

	// Parse series limit: "SLIMIT <n>".
	if stmt.SLimit, err = p.ParseOptionalTokenAndInt(SLIMIT); err != nil {
		return nil, err
	}

	// Parse series offset: "SOFFSET <n>".
	if stmt.SOffset, err = p.ParseOptionalTokenAndInt(SOFFSET); err != nil {
		return nil, err
	}

	// Parse timezone: "TZ(<timezone>)".
	if stmt.Location, err = p.parseLocation(); err != nil {
		return nil, err
	}

	// Set if the query is a raw data query or one with an aggregate
	stmt.IsRawQuery = true
	WalkFunc(stmt.Fields, func(n Node) {
		if _, ok := n.(*Call); ok {
			stmt.IsRawQuery = false
		}
	})

	return stmt, nil
}

Parser::ScanIgnoreWhitespace

// scan returns the next token from the underlying scanner.
func (p *Parser) Scan() (tok Token, pos Pos, lit string) { return p.s.Scan() }

// ScanIgnoreWhitespace scans the next non-whitespace and non-comment token.
func (p *Parser) ScanIgnoreWhitespace() (tok Token, pos Pos, lit string) {
	for {
		tok, pos, lit = p.Scan()
		if tok == WS || tok == COMMENT {
			continue
		}
		return
	}
}

bufScanner::Scan

// Scan reads the next token from the scanner.
func (s *bufScanner) Scan() (tok Token, pos Pos, lit string) {
	return s.scanFunc(s.s.Scan)
}

Scanner::Scan


// Scan returns the next token and position from the underlying reader.
// Also returns the literal text read for strings, numbers, and duration tokens
// since these token types can have different literal representations.
func (s *Scanner) Scan() (tok Token, pos Pos, lit string) {
	// Read next code point.
	ch0, pos := s.r.read()

	// If we see whitespace then consume all contiguous whitespace.
	// If we see a letter, or certain acceptable special characters, then consume
	// as an ident or reserved word.
	if isWhitespace(ch0) {
		return s.scanWhitespace()
	} else if isLetter(ch0) || ch0 == '_' {
		s.r.unread()
		return s.scanIdent(true)
	} else if isDigit(ch0) {
		return s.scanNumber()
	}

	// Otherwise parse individual characters.
	switch ch0 {
	case eof:
		return EOF, pos, ""
	case '"':
		s.r.unread()
		return s.scanIdent(true)
	case '\'':
		return s.scanString()
	case '.':
		ch1, _ := s.r.read()
		s.r.unread()
		if isDigit(ch1) {
			return s.scanNumber()
		}
		return DOT, pos, ""
	case '$':
		tok, _, lit = s.scanIdent(false)
		if tok != IDENT {
			return tok, pos, "$" + lit
		}
		return BOUNDPARAM, pos, "$" + lit
	case '+':
		return ADD, pos, ""
	case '-':
		ch1, _ := s.r.read()
		if ch1 == '-' {
			s.skipUntilNewline()
			return COMMENT, pos, ""
		}
		s.r.unread()
		return SUB, pos, ""
	case '*':
		return MUL, pos, ""
	case '/':
		ch1, _ := s.r.read()
		if ch1 == '*' {
			if err := s.skipUntilEndComment(); err != nil {
				return ILLEGAL, pos, ""
			}
			return COMMENT, pos, ""
		} else {
			s.r.unread()
		}
		return DIV, pos, ""
	case '%':
		return MOD, pos, ""
	case '&':
		return BITWISE_AND, pos, ""
	case '|':
		return BITWISE_OR, pos, ""
	case '^':
		return BITWISE_XOR, pos, ""
	case '=':
		if ch1, _ := s.r.read(); ch1 == '~' {
			return EQREGEX, pos, ""
		}
		s.r.unread()
		return EQ, pos, ""
	case '!':
		if ch1, _ := s.r.read(); ch1 == '=' {
			return NEQ, pos, ""
		} else if ch1 == '~' {
			return NEQREGEX, pos, ""
		}
		s.r.unread()
	case '>':
		if ch1, _ := s.r.read(); ch1 == '=' {
			return GTE, pos, ""
		}
		s.r.unread()
		return GT, pos, ""
	case '<':
		if ch1, _ := s.r.read(); ch1 == '=' {
			return LTE, pos, ""
		} else if ch1 == '>' {
			return NEQ, pos, ""
		}
		s.r.unread()
		return LT, pos, ""
	case '(':
		return LPAREN, pos, ""
	case ')':
		return RPAREN, pos, ""
	case ',':
		return COMMA, pos, ""
	case ';':
		return SEMICOLON, pos, ""
	case ':':
		if ch1, _ := s.r.read(); ch1 == ':' {
			return DOUBLECOLON, pos, ""
		}
		s.r.unread()
		return COLON, pos, ""
	}

	return ILLEGAL, pos, string(ch0)
}

reader::read


// read reads the next rune from the reader.
func (r *reader) read() (ch rune, pos Pos) {
	// If we have unread characters then read them off the buffer first.
	if r.n > 0 {
		r.n--
		return r.curr()
	}

	// Read next rune from underlying reader.
	// Any error (including io.EOF) should return as EOF.
	ch, _, err := r.r.ReadRune()
	if err != nil {
		ch = eof
	} else if ch == '\r' {
		if ch, _, err := r.r.ReadRune(); err != nil {
			// nop
		} else if ch != '\n' {
			_ = r.r.UnreadRune()
		}
		ch = '\n'
	}

	// Save character and position to the buffer.
	r.i = (r.i + 1) % len(r.buf)
	buf := &r.buf[r.i]
	buf.ch, buf.pos = ch, r.pos

	// Update position.
	// Only count EOF once.
	if ch == '\n' {
		r.pos.Line++
		r.pos.Char = 0
	} else if !r.eof {
		r.pos.Char++
	}

	// Mark the reader as EOF.
	// This is used so we don't double count EOF characters.
	if ch == eof {
		r.eof = true
	}

	return r.curr()
}

reader::curr

// curr returns the last read character and position.
func (r *reader) curr() (ch rune, pos Pos) {
	i := (r.i - r.n + len(r.buf)) % len(r.buf)
	buf := &r.buf[i]
	return buf.ch, buf.pos
}

 类似资料: