目录
StatementExecutor::executeExplainStatement
InfluxDB 存储引擎（TSM）读数据流程跟踪。本文暂不涉及集群分片的逻辑，仅关注从接收请求到引擎查询之间的流程。
// executeExplainStatement handles an EXPLAIN statement: it prepares the
// wrapped statement without executing it and returns the textual plan as a
// single "QUERY PLAN" column, one row value per line of plan text.
func (e *StatementExecutor) executeExplainStatement(q *influxql.ExplainStatement, ctx *query.ExecutionContext) (models.Rows, error) {
	// Prepare (but do not run) the statement; preparation performs any
	// needed substitutions.
	prepared, err := query.Prepare(q.Statement, e.ShardMapper, query.SelectOptions{
		NodeID:      ctx.ExecutionOptions.NodeID,
		MaxSeriesN:  e.MaxSelectSeriesN,
		MaxBucketsN: e.MaxSelectBucketsN,
		Authorizer:  ctx.Authorizer,
	})
	if err != nil {
		return nil, err
	}
	defer prepared.Close()

	planText, err := prepared.Explain()
	if err != nil {
		return nil, err
	}

	result := &models.Row{Columns: []string{"QUERY PLAN"}}
	for _, line := range strings.Split(strings.TrimSpace(planText), "\n") {
		result.Values = append(result.Values, []interface{}{line})
	}
	return models.Rows{result}, nil
}
// Explain compiles the prepared statement once while recording every
// iterator that gets created, then renders one text section per recorded
// iterator: its expression, any auxiliary fields, and its cost counters.
// Sections are separated by a blank line.
func (p *preparedStatement) Explain() (string, error) {
	// Temporarily swap in a recording iterator creator; the original creator
	// is restored right after Select returns.
	recorder := &explainIteratorCreator{ic: p.ic}
	p.ic = recorder
	cur, err := p.Select(context.Background())
	p.ic = recorder.ic
	if err != nil {
		return "", err
	}
	// Only the recorded iterators are needed, not the cursor itself.
	cur.Close()

	var out bytes.Buffer
	for idx, node := range recorder.nodes {
		if idx > 0 {
			out.WriteString("\n")
		}

		exprText := "<nil>"
		if node.Expr != nil {
			exprText = node.Expr.String()
		}
		fmt.Fprintf(&out, "EXPRESSION: %s\n", exprText)

		if len(node.Aux) > 0 {
			names := make([]string, 0, len(node.Aux))
			for _, ref := range node.Aux {
				names = append(names, ref.String())
			}
			fmt.Fprintf(&out, "AUXILIARY FIELDS: %s\n", strings.Join(names, ", "))
		}

		cost := node.Cost
		fmt.Fprintf(&out, "NUMBER OF SHARDS: %d\n", cost.NumShards)
		fmt.Fprintf(&out, "NUMBER OF SERIES: %d\n", cost.NumSeries)
		fmt.Fprintf(&out, "CACHED VALUES: %d\n", cost.CachedValues)
		fmt.Fprintf(&out, "NUMBER OF FILES: %d\n", cost.NumFiles)
		fmt.Fprintf(&out, "NUMBER OF BLOCKS: %d\n", cost.BlocksRead)
		fmt.Fprintf(&out, "SIZE OF BLOCKS: %d\n", cost.BlockSize)
	}
	return out.String(), nil
}
// Select builds a Cursor for the prepared statement. The statement's "now"
// value is attached to ctx, and the iterator options are interrupted when
// ctx is done. If ctx carries a monitor and a positive point limit is
// configured, a point-limit monitor is registered for the new cursor.
func (p *preparedStatement) Select(ctx context.Context) (Cursor, error) {
	// TODO(jsternberg): Remove this hacky method of propagating now.
	// Each level of the query should use a time range discovered during
	// compilation, but that requires too large of a refactor at the moment.
	ctx = context.WithValue(ctx, "now", p.now)

	opt := p.opt
	opt.InterruptCh = ctx.Done()
	cur, err := buildCursor(ctx, p.stmt, p.ic, opt)
	if err != nil {
		return nil, err
	}

	if m := MonitorFromContext(ctx); m != nil && p.maxPointN > 0 {
		m.Monitor(PointLimitMonitor(cur, DefaultStatsInterval, p.maxPointN))
	}
	return cur, nil
}
// buildCursor compiles a SELECT statement into a Cursor fed by iterators
// obtained from ic.
//
// Steps:
//   - optionally opens a "build_cursor" tracing span;
//   - normalises opt.FillValue for NumberFill / PreviousFill;
//   - builds the output field list: an implicit "time" field unless
//     stmt.OmitTime is set, then every statement field mapped through a value
//     mapper; for top()/bottom() calls (only when there is no INTO target) the
//     arguments between the first and the last are added as extra fields;
//   - aliases every field with the statement's column names;
//   - copies the mapper's variable references into opt.Aux (sorted) and the
//     matching auxiliary keys;
//   - with no function calls, returns a scanner cursor over an auxiliary
//     iterator, or a null cursor when no auxiliary key has a valid type;
//   - otherwise builds one field iterator and scanner per call and returns a
//     null, single-scanner or multi-scanner cursor depending on how many
//     scanners were created.
//
// If building a field iterator fails, scanners created so far are closed
// before the error is returned.
func buildCursor(ctx context.Context, stmt *influxql.SelectStatement, ic IteratorCreator, opt IteratorOptions) (Cursor, error) {
	// Attach a child tracing span only when the caller is already tracing.
	span := tracing.SpanFromContext(ctx)
	if span != nil {
		span = span.StartSpan("build_cursor")
		defer span.Finish()
		span.SetLabels("statement", stmt.String())
		ctx = tracing.NewContextWithSpan(ctx, span)
	}
	// Normalise the fill value: a plain int becomes int64 for NumberFill, and
	// fill(previous) uses the SkipDefault sentinel.
	switch opt.Fill {
	case influxql.NumberFill:
		if v, ok := opt.FillValue.(int); ok {
			opt.FillValue = int64(v)
		}
	case influxql.PreviousFill:
		opt.FillValue = SkipDefault
	}
	// +1 leaves room for the implicit "time" field.
	fields := make([]*influxql.Field, 0, len(stmt.Fields)+1)
	if !stmt.OmitTime {
		// Add a field with the variable "time" if we have not omitted time.
		fields = append(fields, &influxql.Field{
			Expr: &influxql.VarRef{
				Val:  "time",
				Type: influxql.Time,
			},
		})
	}
	// Iterate through each of the fields to add them to the value mapper.
	valueMapper := newValueMapper()
	for _, f := range stmt.Fields {
		fields = append(fields, valueMapper.Map(f))
		// If the field is a top() or bottom() call, we need to also add
		// the extra variables if we are not writing into a target.
		if stmt.Target != nil {
			continue
		}
		switch expr := f.Expr.(type) {
		case *influxql.Call:
			if expr.Name == "top" || expr.Name == "bottom" {
				// Skip the first argument and the last one; the ones in
				// between become extra output fields.
				for i := 1; i < len(expr.Args)-1; i++ {
					nf := influxql.Field{Expr: expr.Args[i]}
					fields = append(fields, valueMapper.Map(&nf))
				}
			}
		}
	}
	// Set the aliases on each of the columns to what the final name should be.
	// NOTE(review): assumes stmt.ColumnNames() yields at least len(fields)
	// names (including the time column); otherwise this indexes out of range.
	columns := stmt.ColumnNames()
	for i, f := range fields {
		f.Alias = columns[i]
	}
	// Retrieve the refs to retrieve the auxiliary fields.
	var auxKeys []influxql.VarRef
	if len(valueMapper.refs) > 0 {
		opt.Aux = make([]influxql.VarRef, 0, len(valueMapper.refs))
		for ref := range valueMapper.refs {
			opt.Aux = append(opt.Aux, *ref)
		}
		// Sort after collecting so the auxiliary field order does not depend
		// on the iteration order of valueMapper.refs.
		sort.Sort(influxql.VarRefs(opt.Aux))
		// auxKeys[i] is the mapped symbol for opt.Aux[i].
		auxKeys = make([]influxql.VarRef, len(opt.Aux))
		for i, ref := range opt.Aux {
			auxKeys[i] = valueMapper.symbols[ref.String()]
		}
	}
	// If there are no calls, then produce an auxiliary cursor.
	if len(valueMapper.calls) == 0 {
		// If all of the auxiliary keys are of an unknown type,
		// do not construct the iterator and return a null cursor.
		if !hasValidType(auxKeys) {
			return newNullCursor(fields), nil
		}
		itr, err := buildAuxIterator(ctx, ic, stmt.Sources, opt)
		if err != nil {
			return nil, err
		}
		// Create a slice with an empty first element.
		// The empty key stands in for the driver slot that call-based
		// scanners fill with their driver ref (see keys below).
		keys := []influxql.VarRef{{}}
		keys = append(keys, auxKeys...)
		scanner := NewIteratorScanner(itr, keys, opt.FillValue)
		return newScannerCursor(scanner, fields, opt), nil
	}
	// Check to see if this is a selector statement.
	// It is a selector if it is the only selector call and the call itself
	// is a selector.
	selector := len(valueMapper.calls) == 1
	if selector {
		for call := range valueMapper.calls {
			if !influxql.IsSelector(call) {
				selector = false
			}
		}
	}
	// Produce an iterator for every single call and create an iterator scanner
	// associated with it.
	// NOTE(review): valueMapper.calls is ranged like a map, so scanner order
	// follows map iteration order — confirm newMultiScannerCursor does not
	// depend on it.
	scanners := make([]IteratorScanner, 0, len(valueMapper.calls))
	for call := range valueMapper.calls {
		driver := valueMapper.table[call]
		if driver.Type == influxql.Unknown {
			// The primary driver of this call is of unknown type, so skip this.
			continue
		}
		// The last argument tells buildFieldIterator whether the statement
		// writes INTO a target.
		itr, err := buildFieldIterator(ctx, call, ic, stmt.Sources, opt, selector, stmt.Target != nil)
		if err != nil {
			// Release the scanners that were already built before failing.
			for _, s := range scanners {
				s.Close()
			}
			return nil, err
		}
		// Key layout: the call's driver ref first, then the auxiliary keys.
		keys := make([]influxql.VarRef, 0, len(auxKeys)+1)
		keys = append(keys, driver)
		keys = append(keys, auxKeys...)
		scanner := NewIteratorScanner(itr, keys, opt.FillValue)
		scanners = append(scanners, scanner)
	}
	if len(scanners) == 0 {
		return newNullCursor(fields), nil
	} else if len(scanners) == 1 {
		return newScannerCursor(scanners[0], fields, opt), nil
	}
	return newMultiScannerCursor(scanners, fields, opt), nil
}
注意其中对填充值（fill）的关键处理：
// Excerpt from buildCursor: normalisation of the fill value before the
// cursor is built.
switch opt.Fill {
case influxql.NumberFill:
	// A fill value held as a Go int is widened to int64; values of any other
	// type are left unchanged.
	if v, ok := opt.FillValue.(int); ok {
		opt.FillValue = int64(v)
	}
case influxql.PreviousFill:
	// fill(previous) replaces the fill value with the SkipDefault sentinel.
	// NOTE(review): presumably this tells the scanner not to substitute a
	// default for missing values — confirm against IteratorScanner.
	opt.FillValue = SkipDefault
}
沿调用链继续跟踪，发现 fill 选项及其取值在解析查询语句阶段（parseFill）就已确定：
// parseFill parses an optional "fill(<option>)" clause.
//
// The next non-whitespace token is peeked and pushed back with Unscan. If it
// is not the identifier "fill" (compared case-insensitively), NullFill with a
// nil value is returned. Otherwise the clause must parse as a call with
// exactly one argument:
//   - null, none, previous, linear select NullFill, NoFill, PreviousFill and
//     LinearFill respectively (value nil);
//   - an IntegerLiteral or NumberLiteral selects NumberFill, with the
//     literal's Val as the fill value.
//
// Any other shape is reported as an error.
func (p *Parser) parseFill() (FillOption, interface{}, error) {
	// Peek at the next token without consuming it.
	tok, _, lit := p.ScanIgnoreWhitespace()
	p.Unscan()
	if tok != IDENT || strings.ToLower(lit) != "fill" {
		return NullFill, nil, nil
	}

	expr, err := p.ParseExpr()
	if err != nil {
		return NullFill, nil, err
	}
	fill, ok := expr.(*Call)
	if !ok {
		return NullFill, nil, errors.New("fill must be a function call")
	}
	if len(fill.Args) != 1 {
		return NullFill, nil, errors.New("fill requires an argument, e.g.: 0, null, none, previous, linear")
	}

	// Keyword options are matched on the argument's string form.
	switch fill.Args[0].String() {
	case "null":
		return NullFill, nil, nil
	case "none":
		return NoFill, nil, nil
	case "previous":
		return PreviousFill, nil, nil
	case "linear":
		return LinearFill, nil, nil
	}

	// Not a keyword: the argument must be a numeric literal.
	switch num := fill.Args[0].(type) {
	case *IntegerLiteral:
		return NumberFill, num.Val, nil
	case *NumberLiteral:
		return NumberFill, num.Val, nil
	default:
		return NullFill, nil, errors.New("expected number argument in fill()")
	}
}
查看调用关系（parseFill 由 parseSelectStatement 调用）：
// parseSelectStatement parses the remainder of a SELECT statement into a
// *SelectStatement. The SELECT keyword must already have been consumed.
//
// Clauses are parsed in this fixed order: fields, INTO target (subject to
// tr), FROM sources, WHERE condition, GROUP BY dimensions, fill(), ORDER BY,
// LIMIT, OFFSET, SLIMIT, SOFFSET and TZ(). The first parse error aborts
// parsing and is returned. Finally IsRawQuery is set to false if any field
// contains a function call, and true otherwise.
func (p *Parser) parseSelectStatement(tr targetRequirement) (*SelectStatement, error) {
	stmt := &SelectStatement{}
	var err error

	// Fields: "FIELD+".
	stmt.Fields, err = p.parseFields()
	if err != nil {
		return nil, err
	}

	// Target: "INTO".
	stmt.Target, err = p.parseTarget(tr)
	if err != nil {
		return nil, err
	}

	// Sources: the FROM keyword is mandatory.
	tok, pos, lit := p.ScanIgnoreWhitespace()
	if tok != FROM {
		return nil, newParseError(tokstr(tok, lit), []string{"FROM"}, pos)
	}
	stmt.Sources, err = p.parseSources(true)
	if err != nil {
		return nil, err
	}

	// Condition: "WHERE EXPR".
	stmt.Condition, err = p.parseCondition()
	if err != nil {
		return nil, err
	}

	// Dimensions: "GROUP BY DIMENSION+".
	stmt.Dimensions, err = p.parseDimensions()
	if err != nil {
		return nil, err
	}

	// Fill options: "fill(<option>)".
	stmt.Fill, stmt.FillValue, err = p.parseFill()
	if err != nil {
		return nil, err
	}

	// Sort: "ORDER BY FIELD+".
	stmt.SortFields, err = p.parseOrderBy()
	if err != nil {
		return nil, err
	}

	// Limit / offset: "LIMIT <n>", "OFFSET <n>".
	stmt.Limit, err = p.ParseOptionalTokenAndInt(LIMIT)
	if err != nil {
		return nil, err
	}
	stmt.Offset, err = p.ParseOptionalTokenAndInt(OFFSET)
	if err != nil {
		return nil, err
	}

	// Series limit / offset: "SLIMIT <n>", "SOFFSET <n>".
	stmt.SLimit, err = p.ParseOptionalTokenAndInt(SLIMIT)
	if err != nil {
		return nil, err
	}
	stmt.SOffset, err = p.ParseOptionalTokenAndInt(SOFFSET)
	if err != nil {
		return nil, err
	}

	// Timezone: "TZ(<timezone>)".
	stmt.Location, err = p.parseLocation()
	if err != nil {
		return nil, err
	}

	// A statement is a raw query unless some field contains a call.
	stmt.IsRawQuery = true
	WalkFunc(stmt.Fields, func(n Node) {
		if _, isCall := n.(*Call); isCall {
			stmt.IsRawQuery = false
		}
	})
	return stmt, nil
}
// Scan returns the next token from the parser's underlying scanner.
// Whitespace and comment tokens are returned as-is; use
// ScanIgnoreWhitespace to skip them.
func (p *Parser) Scan() (tok Token, pos Pos, lit string) { return p.s.Scan() }
// ScanIgnoreWhitespace returns the next token that is neither whitespace
// (WS) nor a comment (COMMENT), discarding any such tokens before it.
func (p *Parser) ScanIgnoreWhitespace() (tok Token, pos Pos, lit string) {
	for {
		t, at, text := p.Scan()
		if t != WS && t != COMMENT {
			return t, at, text
		}
	}
}
// Scan returns the next token by passing the wrapped Scanner's Scan method
// to scanFunc.
// NOTE(review): scanFunc presumably replays buffered (unscanned) tokens
// before calling the wrapped scanner — confirm against its definition.
func (s *bufScanner) Scan() (tok Token, pos Pos, lit string) {
	next := s.s.Scan
	tok, pos, lit = s.scanFunc(next)
	return tok, pos, lit
}
// Scan returns the next token and position from the underlying reader.
// Also returns the literal text read for strings, numbers, and duration tokens
// since these token types can have different literal representations.
//
// Two-character operators ("=~", "!=", "!~", ">=", "<=", "<>", "::") use one
// rune of lookahead, which is pushed back with s.r.unread() when it does not
// complete the operator. "--" and "/*" start comments and are reported as
// COMMENT. Any rune not handled below is reported as ILLEGAL with the rune
// itself as the literal.
func (s *Scanner) Scan() (tok Token, pos Pos, lit string) {
	// Read next code point.
	ch0, pos := s.r.read()

	// If we see whitespace then consume all contiguous whitespace.
	// If we see a letter, or certain acceptable special characters, then consume
	// as an ident or reserved word.
	if isWhitespace(ch0) {
		return s.scanWhitespace()
	} else if isLetter(ch0) || ch0 == '_' {
		s.r.unread()
		return s.scanIdent(true)
	} else if isDigit(ch0) {
		return s.scanNumber()
	}

	// Otherwise parse individual characters.
	switch ch0 {
	case eof:
		return EOF, pos, ""
	case '"':
		// Push the quote back before scanning the identifier.
		s.r.unread()
		return s.scanIdent(true)
	case '\'':
		return s.scanString()
	case '.':
		// ".5" style numbers: peek one rune, then push it back.
		ch1, _ := s.r.read()
		s.r.unread()
		if isDigit(ch1) {
			return s.scanNumber()
		}
		return DOT, pos, ""
	case '$':
		tok, _, lit = s.scanIdent(false)
		if tok != IDENT {
			return tok, pos, "$" + lit
		}
		return BOUNDPARAM, pos, "$" + lit
	case '+':
		return ADD, pos, ""
	case '-':
		ch1, _ := s.r.read()
		if ch1 == '-' {
			s.skipUntilNewline()
			return COMMENT, pos, ""
		}
		s.r.unread()
		return SUB, pos, ""
	case '*':
		return MUL, pos, ""
	case '/':
		ch1, _ := s.r.read()
		if ch1 == '*' {
			if err := s.skipUntilEndComment(); err != nil {
				return ILLEGAL, pos, ""
			}
			return COMMENT, pos, ""
		}
		s.r.unread()
		return DIV, pos, ""
	case '%':
		return MOD, pos, ""
	case '&':
		return BITWISE_AND, pos, ""
	case '|':
		return BITWISE_OR, pos, ""
	case '^':
		return BITWISE_XOR, pos, ""
	case '=':
		if ch1, _ := s.r.read(); ch1 == '~' {
			return EQREGEX, pos, ""
		}
		s.r.unread()
		return EQ, pos, ""
	case '!':
		if ch1, _ := s.r.read(); ch1 == '=' {
			return NEQ, pos, ""
		} else if ch1 == '~' {
			return NEQREGEX, pos, ""
		}
		// A lone '!' is not a token: push the lookahead back and fall out of
		// the switch to the ILLEGAL return below.
		s.r.unread()
	case '>':
		if ch1, _ := s.r.read(); ch1 == '=' {
			return GTE, pos, ""
		}
		s.r.unread()
		return GT, pos, ""
	case '<':
		if ch1, _ := s.r.read(); ch1 == '=' {
			return LTE, pos, ""
		} else if ch1 == '>' {
			return NEQ, pos, ""
		}
		s.r.unread()
		return LT, pos, ""
	case '(':
		return LPAREN, pos, ""
	case ')':
		return RPAREN, pos, ""
	case ',':
		return COMMA, pos, ""
	case ';':
		return SEMICOLON, pos, ""
	case ':':
		if ch1, _ := s.r.read(); ch1 == ':' {
			return DOUBLECOLON, pos, ""
		}
		s.r.unread()
		return COLON, pos, ""
	}

	return ILLEGAL, pos, string(ch0)
}
// read reads the next rune from the reader and returns it with its position.
//
// Runes previously pushed back (r.n > 0) are replayed from the ring buffer
// first. Otherwise a rune is read from the underlying reader: any error
// (including io.EOF) is reported as eof, and '\r' is normalised to '\n'. The
// '\n' of a "\r\n" pair is consumed; any other following rune is pushed back.
// The rune and the position it was read at are stored in the ring buffer,
// and the running position is advanced (EOF is counted only once).
func (r *reader) read() (ch rune, pos Pos) {
	// If we have unread characters then read them off the buffer first.
	if r.n > 0 {
		r.n--
		return r.curr()
	}

	// Read next rune from underlying reader.
	// Any error (including io.EOF) should return as EOF.
	ch, _, err := r.r.ReadRune()
	if err != nil {
		ch = eof
	} else if ch == '\r' {
		// Peek at the following rune: keep it unless it is the '\n' of a
		// CRLF pair. On a read error there is nothing to push back.
		if next, _, nextErr := r.r.ReadRune(); nextErr == nil && next != '\n' {
			// Error ignored: this directly follows a successful ReadRune.
			_ = r.r.UnreadRune()
		}
		ch = '\n'
	}

	// Save character and position to the buffer.
	r.i = (r.i + 1) % len(r.buf)
	buf := &r.buf[r.i]
	buf.ch, buf.pos = ch, r.pos

	// Update position.
	// Only count EOF once.
	if ch == '\n' {
		r.pos.Line++
		r.pos.Char = 0
	} else if !r.eof {
		r.pos.Char++
	}

	// Mark the reader as EOF.
	// This is used so we don't double count EOF characters.
	if ch == eof {
		r.eof = true
	}

	return r.curr()
}
// curr returns the rune and position stored in the ring-buffer slot r.n
// entries behind the most recently written slot r.i. With nothing pushed
// back (r.n == 0), this is the last rune read.
func (r *reader) curr() (ch rune, pos Pos) {
	size := len(r.buf)
	slot := &r.buf[(r.i-r.n+size)%size]
	ch, pos = slot.ch, slot.pos
	return ch, pos
}