当前位置: 首页 > 工具软件 > TBase > 使用案例 >

Tbase 源码 (四)

阮选
2023-12-01

【Executor-- 执行器策略 】

上层应用调用执行器的入口是 exec_simple_query函数 ,

 \src\backend\tcop\Postgres.c

/*
 * exec_simple_query
 *
 * Execute a "simple Query" protocol message.
 */

static void
exec_simple_query(const char *query_string)
{// #lizard forgives

...

/*
     * XXX We may receive multi-command string and the coordinator is not
     * equipped to handle multiple command-complete messages. So just send a
     * single command-complete until we fix the coordinator side of things
     */
    if (!IS_PGXC_LOCAL_COORDINATOR && list_length(parsetree_list) > 1)
        multiCommands = true;

......

 /*
         * We don't have to copy anything into the portal, because everything
         * we are passing here is in MessageContext, which will outlive the
         * portal anyway.
         */
        PortalDefineQuery(portal,
                          NULL,
                          query_string,
                          commandTag,
                          plantree_list,
                          NULL);
        /*
         * Start the portal.  No parameters here.
         */
        PortalStart(portal, NULL, 0, InvalidSnapshot);

#ifdef __TBASE__
        /* store query info, only SELECT cmd */
        if (distributed_query_analyze)
        {
            if (plantree_list)
            {
                PlannedStmt *plannedstmt = (PlannedStmt *) linitial(plantree_list);

                if (plannedstmt->commandType == CMD_SELECT)
                {
                    StoreQueryAnalyzeInfo(query_string, portal->queryDesc->plannedstmt);
                }
            }
        }
#endif

        /*
         * Select the appropriate output format: text unless we are doing a
         * FETCH from a binary cursor.  (Pretty grotty to have to do this here

         * --- but it avoids grottiness in other places.  Ah, the joys of
         * backward compatibility...)
         */
        format = 0;                /* TEXT is default */
        if (IsA(parsetree->stmt, FetchStmt))
        {
            FetchStmt  *stmt = (FetchStmt *) parsetree->stmt;

            if (!stmt->ismove)
            {
                Portal        fportal = GetPortalByName(stmt->portalname);

                if (PortalIsValid(fportal) &&
                    (fportal->cursorOptions & CURSOR_OPT_BINARY))
                    format = 1; /* BINARY */
            }
        }
        PortalSetResultFormat(portal, 1, &format);

        /*
         * Now we can create the destination receiver object.
         */
        receiver = CreateDestReceiver(dest);
        if (dest == DestRemote)
            SetRemoteDestReceiverParams(receiver, portal);

        /*
         * Switch back to transaction context for execution.
         */
        MemoryContextSwitchTo(oldcontext);

        /*
         * Run the portal to completion, and then drop it (and the receiver).
         */
        (void) PortalRun(portal,
                         FETCH_ALL,
                         isTopLevel,
                         true,
                         receiver,
                         receiver,
                         completionTag);

        (*receiver->rDestroy) (receiver);

        PortalDrop(portal, false);

#ifdef __TBASE__
        /* remove query info */
        if (distributed_query_analyze)
        {
            if (plantree_list)
            {
                PlannedStmt *plannedstmt = (PlannedStmt *) linitial(plantree_list);

                if (plannedstmt->commandType == CMD_SELECT)
                {
                    DropQueryAnalyzeInfo(query_string);
                }
            }
        }
#endif
.....

}

/*
 * Given a raw parsetree (gram.y output), and optionally information about
 * types of parameter symbols ($n), perform parse analysis and rule rewriting.
 *
 * A list of Query nodes is returned, since either the analyzer or the
 * rewriter might expand one query to several.
 *
 * NOTE: for reasons mentioned above, this must be separate from raw parsing.
 */
List *
pg_analyze_and_rewrite(RawStmt *parsetree, const char *query_string,
                       Oid *paramTypes, int numParams,
                       QueryEnvironment *queryEnv)
{
    Query       *query;
    List       *querytree_list;

    TRACE_POSTGRESQL_QUERY_REWRITE_START(query_string);

    /*
     * (1) Perform parse analysis.
     */
    if (log_parser_stats)
        ResetUsage();

    query = parse_analyze(parsetree, query_string, paramTypes, numParams,
                          queryEnv);

    if (log_parser_stats)
        ShowUsage("PARSE ANALYSIS STATISTICS");

#ifdef __AUDIT__
    if (query->commandType == CMD_UTILITY &&
        IsA(query->utilityStmt, CreateTableAsStmt) &&
        xact_started)
    {
        /*
         * first read utility from CreateTableAsStmt
         */
        AuditReadQueryList(query_string, list_make1(query));
    }
#endif

    /*
     * (2) Rewrite the queries, as necessary
     */
    querytree_list = pg_rewrite_query(query);

    TRACE_POSTGRESQL_QUERY_REWRITE_DONE(query_string);

    return querytree_list;
}

/*
 * Generate plans for a list of already-rewritten queries.
 *
 * For normal optimizable statements, invoke the planner.  For utility
 * statements, just make a wrapper PlannedStmt node.
 *
 * The result is a list of PlannedStmt nodes.
 */
List *
pg_plan_queries(List *querytrees, int cursorOptions, ParamListInfo boundParams)

主要通过调用 PortalRun来启动执行动作。

/*
 * PortalStart
 *        Prepare a portal for execution.
 *
 * Caller must already have created the portal, done PortalDefineQuery(),
 * and adjusted portal options if needed.
 *
 * If parameters are needed by the query, they must be passed in "params"
 * (caller is responsible for giving them appropriate lifetime).
 *
 * The caller can also provide an initial set of "eflags" to be passed to
 * ExecutorStart (but note these can be modified internally, and they are
 * currently only honored for PORTAL_ONE_SELECT portals).  Most callers
 * should simply pass zero.
 *
 * The caller can optionally pass a snapshot to be used; pass InvalidSnapshot
 * for the normal behavior of setting a new snapshot.  This parameter is
 * presently ignored for non-PORTAL_ONE_SELECT portals (it's only intended
 * to be used for cursors).
 *
 * On return, portal is ready to accept PortalRun() calls, and the result
 * tupdesc (if any) is known.
 */
void
PortalStart(Portal portal, ParamListInfo params,
            int eflags, Snapshot snapshot)
 
/*
 * PortalRun
 *        Run a portal's query or queries.
 *
 * count <= 0 is interpreted as a no-op: the destination gets started up
 * and shut down, but nothing else happens.  Also, count == FETCH_ALL is
 * interpreted as "all rows".  Note that count is ignored in multi-query
 * situations, where we always run the portal to completion.
 *
 * isTopLevel: true if query is being executed at backend "top level"
 * (that is, directly from a client command message)
 *
 * dest: where to send output of primary (canSetTag) query
 *
 * altdest: where to send output of non-primary queries
 *
 * completionTag: points to a buffer of size COMPLETION_TAG_BUFSIZE
 *        in which to store a command completion status string.
 *        May be NULL if caller doesn't want a status string.
 *
 * Returns TRUE if the portal's execution is complete, FALSE if it was
 * suspended due to exhaustion of the count parameter.
 */
bool
PortalRun(Portal portal, long count, bool isTopLevel, bool run_once,
          DestReceiver *dest, DestReceiver *altdest,
          char *completionTag)

 在查询执行模块中,先由Portal模块识别查询类型(有计划树和无计划树),根据查询类型分别指派Executor模块和ProcessUtility模块进行处理。

1.查询优化策略
在进入这一模块之前,我已经简要说明了Executor模块和ProcessUtility模块这两个主要的执行分支。这里要提到两个概念:

可优化语句和非可优化语句

可优化语句说白了就是DML语句,这些语句的特点就是都要查询到满足条件的元组。这类查询都在查询规划阶段生成了规划树,而规划树的生成过程中会根据查询优化理论进行重写和优化以提高查询速度,因此称作可优化语句。

1.1 六种执行策略
上面提到,一条简单的SQL语句会被查询编译器转化为一个执行计划树或者一个非计划树操作。而一条复杂的SQL语句往往同时带有DDL和DML语句,即它会被转换为一个可执行计划树和非执行计划树操作的序列。而可执行计划树和非可执行计划树是由不同的子模块去处理的。这样就有了三种不同的情况,需要三种不同的策略去应对。

然而除此之外,我们还有一种额外的情况需要考虑到:有些SQL语句虽然可以被转换为一个原子操作,但是其执行过程中由于各种原因需要能够缓存语句执行的结果,等到整个语句执行完毕在返回执行结果。

具体的说:

1 对于可优化语句,当执行修改元组操作时,希望能够返回被修改的元组(例如带RETURNING子句的DELETE),由于原子操作的处理过程不能被可能有问题的输出过程终止,因此不能边执行边输出,因此需要一个缓存结构来临时存放执行结果;

2 某些非优化语句是需要返回结果的(例如SHOW,EXPLAIN) ,因此也需要一个缓存结构暂存处理结果。

此外,对于带有INSERT/UPDATE/DELETE的WITH子句,会在CTE中修改数据,和一般的CTE不一样。我们也需要进行特事特办,特殊处理,这是第五种情况。

因此,综合上面所说的,我们需要有六种处理策略来解决,分别如下:

1)PORTAL_ONE_SELECT:处理单个的SELECT语句,调用Executor模块;

2)PORTAL_ONE_RETURNING:处理带RETURNING的UPDATE/DELETE/INSERT语句,调用Executor模块;

3)PORTAL_UTIL_SELECT:处理单个的数据定义语句,调用ProcessUtility模块;

4)PORTAL_ONE_MOD_WITH:处理带有INSERT/UPDATE/DELETE的WITH子句的SELECT,其处理逻辑类似PORTAL_ONE_RETURNING。调用Executor模块;

5)PORTAL_MULTI_QUERY:是前面几种策略的混合,可以处理多个原子操作。 

6)PORTAL_DISTRIBUTED:分布式数据查询计划,需要发送或接收多个DN节点数据。

 【数据分布方式】

/*----------
 * DistributionType - how to distribute the data
 *
 *----------
 */
typedef enum DistributionType
{
    DISTTYPE_REPLICATION,            /* Replicated */
    DISTTYPE_HASH,                /* Hash partitioned */
    DISTTYPE_ROUNDROBIN,            /* Round Robin */
    DISTTYPE_MODULO,                /* Modulo partitioned */
#ifdef _MIGRATE_
    DISTTYPE_SHARD
#endif
} DistributionType;

【分布式数据交换】

节点完成局部结果之后,把结果持续不断的写入叫做的SharedQueue结果,这是一个生产者(局部计算得到的结果)消费者(目标的节点,也就是需要distribute的结果)模型。
一个节点的输入可能来自多个节点的,循环读取结果即可。
如果需要的话,用Combiner完成排序。

上述SharedQueue每个dataNode上各有一个.

\src\include\pgxc\Squeue.h

typedef struct SQueueHeader *SharedQueue;

\src\include\pgxc\Squeue.c 

/* Shared queue header *//* Shared queue header */
typedef struct SQueueHeader
{
    char        sq_key[SQUEUE_KEYSIZE]; /* Hash entry key should be at the
                                 * beginning of the hash entry */
    int            sq_pid;         /* Process id of the producer session */
    int            sq_nodeid;        /* Node id of the producer parent */
    SQueueSync *sq_sync;        /* Associated sinchronization objects */
    int            sq_refcnt;        /* Reference count to this entry */
#ifdef SQUEUE_STAT
    bool        stat_finish;
    long        stat_paused;
#endif
#ifdef __TBASE__
     DataPumpSender sender; /* used for locally data transfering */
    bool        with_params;
    bool        sender_destroy;
    bool        parallelWorkerSendTuple;
    int         numParallelWorkers;
    ParallelSender parallelSendControl;
    int16       nodeMap[MAX_NODES_NUMBER];
    bool        sq_error;
    char        err_msg[ERR_MSGSIZE];
    bool        has_err_msg;
    bool        producer_done;
    int         nConsumer_done;
    slock_t        lock;
#endif
    int            sq_nconsumers;    /* Number of consumers */
    ConsState     sq_consumers[0];/* variable length array */
} SQueueHeader;

 1.2 策略的实现

执行策略选择器的工作是根据查询编译阶段生成的计划树链表来为当前的查询选择五种执行策略中的一种。在这个过程中,执行策略选择器会使用数据结构PortalData来存储查询计划树链表以及最后选中的执行策略等信息。

对于查询执行器来说,在执行一个SQL语句时都会以一个Portal作为输入数据,在Portal中存放了与执行该SQL相关的所有信息,例如查询树、计划树和执行状态等。

这里仅仅给出了两种可能的原子操作PlannedStmt和Query,这两者都能包含查询计划树,用于保存含有查询的操作。当然有些含有查询计划树的原子操作不一定是SELECT语句,例如游标的声明(utilityStmt字段不为空),SELECT INTO语句(intoClause字段不为空)等等。

那么我们很容易想到,postgres是不是就是根据原子操作的命令类型和原子操作的个数来确定合适的执行策略当然呢?

不完全是,

命令的类型就如下几种: 

src/include/nodes/Nodes.h

/*
 * CmdType -
 *      enums for type of operation represented by a Query or PlannedStmt
 *
 * This is needed in both parsenodes.h and plannodes.h, so put it here...
 */
typedef enum CmdType
{
    CMD_UNKNOWN,
    CMD_SELECT,                    /* select stmt */
    CMD_UPDATE,                    /* update stmt */
    CMD_INSERT,                    /* insert stmt */
    CMD_DELETE,
    CMD_UTILITY,                /* cmds like create, destroy, copy, vacuum,
                                 * etc. */
    CMD_NOTHING                    /* dummy command for instead nothing rules
                                 * with qual */
} CmdType;

    根据命令类型,原子操作个数以及查询树、计划树上的某些字段(比如hasModifyingCTE、utilityStmt等等)这些做判断选择哪种执行策略 。

执行这一任务的函数是ChoosePortalStrategy,在src/backend/tcop/Pquery.c文件中。
 1.3 Portal的执行过程

所有的SQL语句的执行都必须从一个Portal开始,

PortalStart  => PortalRun  =>  PortalDrop 

该流程都在exec_simple_query函数内部进行。过程大致如下:

1)调用函数CreatePortal创建一个“clean”的Portal,它的内存上下文,资源跟踪器清理函数都已经设置好,但是sourceText,stmts字段还未设置;
2)调用函数PortalDefineQuery函数为刚刚创建的Portal设置sourceText,stmt等,并且设置Portal的状态为PORTAL_DEFINED;
3)调用函数PortalStart对定义好的Portal进行初始化:

 a.调用函数ChoosePortalStrategy为portal选择策略;
b.如果选择的是PORTAL_ONE_SELECT,则调用CreateQueryDesc为Portal创建查询描述符;
c.如果选择的是PORTAL_ONE_RETURNING或者PORTAL_ONE_MOD_WITH,则调用ExecCleanTypeFromTL为portal创建返回元组的描述符;
d.对于PORTAL_UTIL_SELECT则调用UtilityTupleDescriptor为Portal创建查询描述符;
e.对于PORTAL_MULTI_QUERY这里则不做过多操作;
f.将Portal的状态设置为PORTAL_READY。

4)调用函数PortalRun执行portal,这就按照既定的策略调用相关执行部件执行Portal;

5)调用函数PortalDrop清理Portal,释放资源。

 类似资料: