【Executor--执行数据定义语句】
1.数据定义语句的执行
数据定义语句(也就是之前我提到的非可优化语句)是一类用于定义数据模式、函数等的功能性语句。不同于元组增删査改的操作,其处理方式是为每一种类型的描述语句调用相应的处理函数。
数据定义语句的执行流程最终会进入到ProcessUtility处理器,然后执行语句对应的不同处理过程。由于数据定义语句的种类很多,因此整个处理过程中的数据结构和方式种类繁冗、复杂,但流程相对简单、固定。这里我们以Create table为例说明数据定义语句的具体处理过程。
1.1数据定义语句执行流程
由于ProcessUtility需要处理所有类型的数据定义语句,因此ProcessUtility通过判断数据结构中NodeTag字段的值来区分各种不同节点,并引导执行流程进入相应的处理函数。
相同类别的语句处理过程涉及内容相近,实现思想和主要过程相似。例如,事务类处理主要是对于当前事务的状态的判断和转换;游标类处理的主要思想是首次将执行一个査询计划树(Plantree),将结果缓存在Portal指向的特殊结构中,然后按照要求获取元组数据;表、属性管理类主要涉及权限管理、修改相应系统表以及关系表的存储类别操作等。由于数据定义语句的种类多达上百种,我们下面将以一个创建表的例子来介绍数据定义语句的执行流程。
针对各种不同的査询树,査询编译器在执行处理前会做一些额外的处理对査询树进行分析、处理与转换。创建表的语句会用函数transformCreateStmt进行査询树的处理。这些处理过程可能会在当前操作之前和之后增加一些新的操作(例如在创建表的操作之前增加创建serial序列表操作、之后增加创建触发器用于外键约束操作等),也可能会执行对数据结构的处理操作(例如将CreateStmt节点tableElts字段中CONST_CHECK类型的Constraint节点转存到CreateStmt的ccmstraints链表中等)。由于这些处理过程会产生一些新的操作,因此最终会生成一个由多个操作构成的链表。因此,执行过程需要依次扫描该链表,为每一个原子操作调用相应的处理函数。
1.2 执行实例
SQL语句如下:
CREATE TABLE course (
no SERIAL,
name VARCHAR,
credit INT,
CONSTRAINT con1 CHECK(credit > = 0 AND name <> ''),
PRIMARY KEY(no)
);
系统首先会对査询语句进行词法和语法分析,将査询语句构造为査询树的链表。然后,针对链
表中的每一个査询树进行如下的处理过程(下例仅有一个T_CreateStmt类型的査询树):
1)分析和重写查询树。
2)生成査询计划。
3)创建及初始化Portal。
4)调用Portal执行过程。
5)调用Portal清理过程。
下面给出了上述査询语句执行时的主要函数调用流程。
pg_parse_query
|
v
pg_analyze_and_rewrite
|
v
PortalStart -> ChoosePortalStrategy
|
v
PortalRun -> PortalRunMulti -> PortalRunUtility -> ProcessUtility -> standard_ProcessUtility -> ProcessUtilityPre -> ProcessUtilitySlow
|
v
PortalDrop
/*
* standard_ProcessUtility itself deals only with utility commands for
* which we do not provide event trigger support. Commands that do have
* such support are passed down to ProcessUtilitySlow, which contains the
* necessary infrastructure for such triggers.
*
* This division is not just for performance: it's critical that the
* event trigger code not be invoked when doing START TRANSACTION for
* example, because we might need to refresh the event trigger cache,
* which requires being in a valid transaction.
*/
void
standard_ProcessUtility(PlannedStmt *pstmt,
const char *queryString,
ProcessUtilityContext context,
ParamListInfo params,
QueryEnvironment *queryEnv,
DestReceiver *dest,
bool sentToRemote,
char *completionTag)
{// #lizard forgives
Node *parsetree = pstmt->utilityStmt;
bool isTopLevel = (context == PROCESS_UTILITY_TOPLEVEL);
ParseState *pstate;#ifdef __TBASE__
/* parallel enable check */
parallel_ddl_process(parsetree);
#endif
/*
* For more detail see comments in function pgxc_lock_for_backup.
*
* Cosider the following scenario:
* Imagine a two cordinator cluster CO1, CO2
* Suppose a client connected to CO1 issues select pgxc_lock_for_backup()
* Now assume that a client connected to CO2 issues a create table
* select pgxc_lock_for_backup() would try to acquire the advisory lock
* in exclusive mode, whereas create table would try to acquire the same
* lock in shared mode. Both these requests will always try acquire the
* lock in the same order i.e. they would both direct the request first to
* CO1 and then to CO2. One of the two requests would therefore pass
* and the other would fail.
*
* Consider another scenario:
* Suppose we have a two cooridnator cluster CO1 and CO2
* Assume one client connected to each coordinator
* Further assume one client starts a transaction
* and issues a DDL. This is an unfinished transaction.
* Now assume the second client issues
* select pgxc_lock_for_backup()
* This request would fail because the unfinished transaction
* would already hold the advisory lock.
*/
if (IS_PGXC_LOCAL_COORDINATOR && IsNormalProcessingMode())
{
/* Is the statement a prohibited one? */
if (!IsStmtAllowedInLockedMode(parsetree, queryString))
{
/* node number changes with ddl is not allowed */
if (HandlesInvalidatePending && PrimaryNodeNumberChanged())
{
ereport(ERROR,
(errcode(ERRCODE_QUERY_CANCELED),
errmsg("canceling transaction due to cluster configuration reset by administrator command")));
}
pgxc_lock_for_utility_stmt(parsetree);}
}check_xact_readonly(parsetree);
if (completionTag)
completionTag[0] = '\0';if (ProcessUtilityPre(pstmt, queryString, context, queryEnv, sentToRemote,
completionTag))
return;
......}
/*
* Do the necessary processing before executing the utility command locally on
* the coordinator.
*/
static bool
ProcessUtilityPre(PlannedStmt *pstmt,
const char *queryString,
ProcessUtilityContext context,
QueryEnvironment *queryEnv,
bool sentToRemote,
char *completionTag)
在上面的例子中,査询编译器会生成一个仅包含一个T_CreateStmt类型节点的査询树链表,因此对应的Portal的stmts字段中也只包含一个T_CreateStmt类型节点。ChoosePortalStrategy函数根据stmts字段值选择策略时会选择PORTAL_MULTI_QUERY策略。在接下来的PortalRun函数中将会调用PortalRunMuti来执行PORTAL_MULTI_QUERY策略,将会把处理流程引导到ProcessUtility中。ProcessUtility将首先调用函数transformCreateStmt对T_CreateStmt节点进行转换处理,流程如下所示。该过程会做如下转换:
ProcessUtilitySlow:
case T_CreateStmt:
case T_CreateForeignTableStmt:
transformCreateStmt()
|
v
if (IS_PGXC_LOCAL_COORDINATOR)
|
v
foreach(l, stmts){
if (IsA(stmt, CreateStmt))
|
v
...
DefineRelation
...
|
v
IsA(stmt, CreateForeignTableStmt)
...
|
v
AddRemoteQueryNode
|
v
foreach(l, stmts){
if (IsA(stmt, CreateStmt))
|
v
...
DefineRelation
...
|
v
IsA(stmt, CreateForeignTableStmt)
|
v
...
DefineRelation
...
else
ProcessUtility
......
}
将主键约束改为创建唯一索引(T_IndexStmt节点)。
将自增类型转换为int4oid,并附加创建专用的SERIAL表(用于记录自增字段,将形成一个 T_CreateSeqStnU 节点)操作
增加CONSTR_DEFAULT类型约束作为默认值(被定义为调用函数nextval)。
创建SERIAL表(T_CreateSeqStmt节点)的操作会被放在stmts链表中T_CreateStmt节点之前的位置,创建唯一约束索引(T_IndexStmt节点)的操作被放置在T_CreateStmt节点
之后。最后还会将单独定义或与属性同时定义的CONSTR_CHECK类型约束全部转移到T_CreateStmt节点的constraints字段所指向的链表中。
最后,transformCreateStmt将原有的 T_CreateStmt操作转换为一个操作序列:依次为T_CreateSeqStmt (创建序列表)、T_CreateStmt (创建数据表)、T_IndexStmt (创建唯一约束索引)。
之后ProcessUtility将逐个对序列中的操作进行处理。对T_CreateStmt操作将会调用DefineRelation进行数据表的创建,而其他节点则会通过递归调用ProcessUtility进人相应的处理过程。下面展示了 T_CreateStmt操作的处理过程
创建表的过程由函数DefineRelation完成,其流程如下:
进行权限检査,确定当前用户是否有权限创建表。
对表创建语句中的WITH子句进行解析(transfbrmRelOptions)。
调用heap_reloptions对参数进行合法性验证。
使用MergeAttributes,将继承的属性合并到表属性定义中。
调用BuildDescForRelation利用合并后的厲性定义链表创建tupleDesc结构(这个结构用于描述元组各属性结构等信息)。
决定是否使用系统属性OID (interpretOidsOption)。
对属性定义链表中的每一个属性进行处理,査看是否有默认值、表达式或约束检査。
使用heap_create_with_catalog创建表的物理文件并在相应的系统表中注册。
用StoreCataloglnheritance存储表的继承关系。
处理表中新增的约束与默认值(AddRleationNewConstraints)。
CREATE SERIAL TABLE course_no_seq;--用于产生自增序列
CREATE TABLE course (
noint40id DEFAULT nextval (),
nameVARCHAR,
creditINT,
CONSTRAINT coni CHECK (credit >=0 AND name <> "),
CREATE INDEX course_pkey;--用于唯一检査
表创建函数的主要功能是由heap_create_with_catalog完成的,之前的各种操作主要是构造heap_create_with_catalog所需要的参数。例如,WITH子句处理主要完成其中存储相关参数的处理,以便存人pg_class系统表的reloptions字段中;BuildDescForRelation主要处理表定义中属性名、类型、非空约束以便构造pg_attribute系统表相关内容。
heap_create_with_catalog函数首先会根据要创建表的属性描述信息、表的名称、命名空间等使用heap_create创建一个RelationData结构并放人RelCache,并根据这些信息通过调用RelalionCreateStorage函数创建物理文件。
然后调用AddNewRelationType,向pg_type中增加一条关于该表的记录。AddNewRelationTuple则会将表的相关信息插人pg_class系统表中,而AddNewAttributeTuples将表的每个属性记录到pg_attribute系统表中。最后还需要通过调用StoreConstraints将约束和默认值分别存储到 pg_constraint 和 pg_attrdef 中。
从创建表的例子可以看到,功能处理器(ProcessUtility)本身只作为入口选择函数,它会根据输人的节点类型调用相应的处理过程。除了创建表的处理过程之外,下面列出了几种常见的输人节点类型 。
T_TransactionStmt
TRANS_STMT_BEGIN_SUBTXN
TRANS_STMT_ROLLBACK_SUBTXN
TRANS_STMT_COMMIT_SUBTXN
TRANS_STMT_COMMIT
TRANS_STMT_PREPARE
TRANS_STMT_COMMIT_PREPARED
TRANS_STMT_ROLLBACK_PREPARED
TRANS_STMT_ROLLBACK
TRANS_STMT_SAVEPOINT
TRANS_STMT_RELEASE
TRANS_STMT_ROLLBACK_TO
...
T_DeclareCursorStmt
T_ClosePortalStmt
T_FetchStmt
T_DoStmt
T_CreateTableSpaceStmt
T_DropTableSpaceStmt
T_AlterTableSpaceOptionsStmt
T_TruncateStmt
T_CopyStmt
T_PrepareStmt
T_ExecuteStmt
T_DeallocateStmt
......
T_CreateShardStmt
T_CleanShardingStmt:
T_DropShardStmt:
T_MoveDataStmt:T_ReindexStmt:
T_GrantStmt:
T_DropStmt:
T_RenameStmt:T_CommentStmt:
T_SecLabelStmt:
T_CreateSchemaStmt:
T_CreateStmt:
T_CreateForeignTableStmt:
T_AlterTableStmt:
T_AlterDomainStmt:...
/*
* The "Slow" variant of ProcessUtility should only receive statements
* supported by the event triggers facility. Therefore, we always
* perform the trigger support calls if the context allows it.
*/
static void
ProcessUtilitySlow(ParseState *pstate,
PlannedStmt *pstmt,
const char *queryString,
ProcessUtilityContext context,
ParamListInfo params,
QueryEnvironment *queryEnv,
DestReceiver *dest,
bool sentToRemote,
char *completionTag)
{// #lizard forgives......
case T_CreateStmt:
case T_CreateForeignTableStmt:
{
List *stmts;
ListCell *l;
bool is_temp = false;
bool is_local = ((CreateStmt *) parsetree)->islocal;
#ifdef __COLD_HOT__
DistributeBy *distributeby = NULL;
PGXCSubCluster *subcluster = NULL;
#endif#ifdef __TBASE__
Oid nspaceid;
bool exist_ok = true;if (is_txn_has_parallel_ddl && IsConnFromCoord())
exist_ok = false;/* Run parse analysis ... */
/*
* If sentToRemote is set it is either EXECUTE DIRECT or part
* of extencion definition script, that is a kind of extension
* specific metadata table. So it makes sense do not distribute
* the relation. If someone sure he needs the table distributed
* it should explicitly specify distribution.
*/
stmts = transformCreateStmt((CreateStmt *) parsetree,
queryString, !is_local && !sentToRemote,
&nspaceid, exist_ok);#else
stmts = transformCreateStmt((CreateStmt *) parsetree,
queryString, !is_local && !sentToRemote);
#endifif (IS_PGXC_LOCAL_COORDINATOR)
{
/*
* Scan the list of objects.
* Temporary tables are created on Datanodes only.
* Non-temporary objects are created on all nodes.
* In case temporary and non-temporary objects are mized return an error.
*/
bool is_first = true;foreach(l, stmts)
{
Node *stmt = (Node *) lfirst(l);if (IsA(stmt, CreateStmt))
{
CreateStmt *stmt_loc = (CreateStmt *) stmt;
bool is_object_temp = stmt_loc->relation->relpersistence == RELPERSISTENCE_TEMP;
/*make sure local coordinator always send create table command to remote cn and dn */
if (!is_object_temp && stmt_loc->relkind == OBJECT_TABLE && sentToRemote)
{
elog(ERROR, "Cannot support nondistribute table");
}
#ifndef ENABLE_ALL_TABLE_TYPE
if(stmt_loc->distributeby
&& stmt_loc->distributeby->disttype != DISTTYPE_SHARD
&& stmt_loc->distributeby->disttype != DISTTYPE_REPLICATION)
{
switch(stmt_loc->distributeby->disttype)
{
case DISTTYPE_HASH:
case DISTTYPE_MODULO:
elog(ERROR, "Cannot support distribute type: Hash");
break;
case DISTTYPE_ROUNDROBIN:
elog(ERROR, "Cannot support distribute type: RoundRobin");
break;
default:
elog(ERROR,"Unknown distribute type.");
break;
}
}
#endifif (is_first)
{
is_first = false;
if (is_object_temp)
is_temp = true;
}
else
{
if (is_object_temp != is_temp)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("CREATE not supported for TEMP and non-TEMP objects"),
errdetail("You should separate TEMP and non-TEMP objects")));
}#ifdef __COLD_HOT__
if (!is_object_temp)
{
distributeby = stmt_loc->distributeby;
subcluster = stmt_loc->subcluster;
}
#endif
}
else if (IsA(stmt, CreateForeignTableStmt))
{
/* There are no temporary foreign tables */
if (is_first)
{
is_first = false;
}
else
{
if (!is_temp)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("CREATE not supported for TEMP and non-TEMP objects"),
errdetail("You should separate TEMP and non-TEMP objects")));
}
}
}
}
....../*
* Add a RemoteQuery node for a query at top level on a remote
* Coordinator, if not already done so
*/
if (!sentToRemote)
stmts = AddRemoteQueryNode(stmts, queryString, is_local
? EXEC_ON_NONE
: (is_temp ? EXEC_ON_DATANODES : EXEC_ON_ALL_NODES));/* ... and do it */
foreach(l, stmts)
{
Node *stmt = (Node *) lfirst(l);if (IsA(stmt, CreateStmt))
{
Datum toast_options;
static char *validnsps[] = HEAP_RELOPT_NAMESPACES;
#ifdef __TBASE__
CreateStmt *createStmt = (CreateStmt *)stmt;/* Set temporary object object flag in pooler */
if (is_temp)
{
PoolManagerSetCommand(NULL, 0, POOL_CMD_TEMP, NULL);
}
#endif/* Create the table itself */
address = DefineRelation((CreateStmt *) stmt,
RELKIND_RELATION,
InvalidOid, NULL,
queryString);
EventTriggerCollectSimpleCommand(address,
secondaryObject,
stmt);/*
* Let NewRelationCreateToastTable decide if this
* one needs a secondary relation too.
*/
CommandCounterIncrement();/*
* parse and validate reloptions for the toast
* table
*/
toast_options = transformRelOptions((Datum) 0,
((CreateStmt *) stmt)->options,
"toast",
validnsps,
true,
false);(void) heap_reloptions(RELKIND_TOASTVALUE,
toast_options,
true);NewRelationCreateToastTable(address.objectId,
toast_options);......
}
/* ----------------------------------------------------------------
* DefineRelation
* Creates a new relation.
*
* stmt carries parsetree information from an ordinary CREATE TABLE statement.
* The other arguments are used to extend the behavior for other cases:
* relkind: relkind to assign to the new relation
* ownerId: if not InvalidOid, use this as the new relation's owner.
* typaddress: if not null, it's set to the pg_type entry's address.
*
* Note that permissions checks are done against current user regardless of
* ownerId. A nonzero ownerId is used when someone is creating a relation
* "on behalf of" someone else, so we still want to see that the current user
* has permissions to do it.
*
* If successful, returns the address of the new relation.
* ----------------------------------------------------------------
*/
ObjectAddress
DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId,
ObjectAddress *typaddress, const char *queryString)
/* --------------------------------
* heap_create_with_catalog
*
* creates a new cataloged relation. see comments above.
*
* Arguments:
* relname: name to give to new rel
* relnamespace: OID of namespace it goes in
* reltablespace: OID of tablespace it goes in
* relid: OID to assign to new rel, or InvalidOid to select a new OID
* reltypeid: OID to assign to rel's rowtype, or InvalidOid to select one
* reloftypeid: if a typed table, OID of underlying type; else InvalidOid
* ownerid: OID of new rel's owner
* tupdesc: tuple descriptor (source of column definitions)
* cooked_constraints: list of precooked check constraints and defaults
* relkind: relkind for new rel
* relpersistence: rel's persistence status (permanent, temp, or unlogged)
* shared_relation: TRUE if it's to be a shared relation
* mapped_relation: TRUE if the relation will use the relfilenode map
* oidislocal: TRUE if oid column (if any) should be marked attislocal
* oidinhcount: attinhcount to assign to oid column (if any)
* oncommit: ON COMMIT marking (only relevant if it's a temp table)
* reloptions: reloptions in Datum form, or (Datum) 0 if none
* use_user_acl: TRUE if should look for user-defined default permissions;
* if FALSE, relacl is always set NULL
* allow_system_table_mods: TRUE to allow creation in system namespaces
* is_internal: is this a system-generated catalog?
*
* Output parameters:
* typaddress: if not null, gets the object address of the new pg_type entry
*
* Returns the OID of the new relation
* --------------------------------
*/
Oid
heap_create_with_catalog(const char *relname,
Oid relnamespace,
Oid reltablespace,
Oid relid,
Oid reltypeid,
Oid reloftypeid,
Oid ownerid,
TupleDesc tupdesc,
List *cooked_constraints,
char relkind,
char relpersistence,
bool shared_relation,
bool mapped_relation,bool oidislocal,
int oidinhcount,
OnCommitAction oncommit,
Datum reloptions,
bool use_user_acl,
bool allow_system_table_mods,
bool is_internal,
#ifdef _SHARDING_
bool hasextent,
#endif
ObjectAddress *typaddress)