【Executor-- 执行器策略 】

上层应用调用执行器的入口是 exec_simple_query函数 ,

\src\backend\tcop\Postgres.c

/*
 * exec_simple_query
 *
 * Execute a "simple Query" protocol message.
 */

static void
exec_simple_query(const char *query_string)
{// #lizard forgives

...

/*
     * XXX We may receive multi-command string and the coordinator is not
     * equipped to handle multiple command-complete messages. So just send a
     * single command-complete until we fix the coordinator side of things
     */
    if (!IS_PGXC_LOCAL_COORDINATOR && list_length(parsetree_list) > 1)
        multiCommands = true;

......

/*
         * We don't have to copy anything into the portal, because everything
         * we are passing here is in MessageContext, which will outlive the
         * portal anyway.
         */
        PortalDefineQuery(portal,
                          NULL,
                          query_string,
                          commandTag,
                          plantree_list,
                          NULL);
        /*
         * Start the portal.  No parameters here.
         */
        PortalStart(portal, NULL, 0, InvalidSnapshot);

#ifdef __TBASE__
        /* store query info, only SELECT cmd */
        if (distributed_query_analyze)
        {
            if (plantree_list)
            {
                PlannedStmt *plannedstmt = (PlannedStmt *) linitial(plantree_list);

if (plannedstmt->commandType == CMD_SELECT)
                {
                    StoreQueryAnalyzeInfo(query_string, portal->queryDesc->plannedstmt);
                }
            }
        }
#endif

/*
         * Select the appropriate output format: text unless we are doing a
         * FETCH from a binary cursor.  (Pretty grotty to have to do this here

* --- but it avoids grottiness in other places.  Ah, the joys of
         * backward compatibility...)
         */
        format = 0;                /* TEXT is default */
        if (IsA(parsetree->stmt, FetchStmt))
        {
            FetchStmt  *stmt = (FetchStmt *) parsetree->stmt;

if (!stmt->ismove)
            {
                Portal        fportal = GetPortalByName(stmt->portalname);

if (PortalIsValid(fportal) &&
                    (fportal->cursorOptions & CURSOR_OPT_BINARY))
                    format = 1; /* BINARY */
            }
        }
        PortalSetResultFormat(portal, 1, &format);

/*
         * Now we can create the destination receiver object.
         */
        receiver = CreateDestReceiver(dest);
        if (dest == DestRemote)
            SetRemoteDestReceiverParams(receiver, portal);

/*
         * Switch back to transaction context for execution.
         */
        MemoryContextSwitchTo(oldcontext);

/*
         * Run the portal to completion, and then drop it (and the receiver).
         */
        (void) PortalRun(portal,
                         FETCH_ALL,
                         isTopLevel,
                         true,
                         receiver,
                         receiver,
                         completionTag);

(*receiver->rDestroy) (receiver);

PortalDrop(portal, false);

#ifdef __TBASE__
        /* remove query info */
        if (distributed_query_analyze)
        {
            if (plantree_list)
            {
                PlannedStmt *plannedstmt = (PlannedStmt *) linitial(plantree_list);

if (plannedstmt->commandType == CMD_SELECT)
                {
                    DropQueryAnalyzeInfo(query_string);
                }
            }
        }
#endif
.....

}

/** Given a raw parsetree (gram.y output), and optionally information about* types of parameter symbols ($n), perform parse analysis and rule rewriting.** A list of Query nodes is returned, since either the analyzer or the* rewriter might expand one query to several.** NOTE: for reasons mentioned above, this must be separate from raw parsing.*/
List *
pg_analyze_and_rewrite(RawStmt *parsetree, const char *query_string,Oid *paramTypes, int numParams,QueryEnvironment *queryEnv)
{Query       *query;List       *querytree_list;TRACE_POSTGRESQL_QUERY_REWRITE_START(query_string);/** (1) Perform parse analysis.*/if (log_parser_stats)ResetUsage();query = parse_analyze(parsetree, query_string, paramTypes, numParams,queryEnv);if (log_parser_stats)ShowUsage("PARSE ANALYSIS STATISTICS");#ifdef __AUDIT__if (query->commandType == CMD_UTILITY &&IsA(query->utilityStmt, CreateTableAsStmt) &&xact_started){/** first read utility from CreateTableAsStmt*/AuditReadQueryList(query_string, list_make1(query));}
#endif/** (2) Rewrite the queries, as necessary*/querytree_list = pg_rewrite_query(query);TRACE_POSTGRESQL_QUERY_REWRITE_DONE(query_string);return querytree_list;
}
/** Generate plans for a list of already-rewritten queries.** For normal optimizable statements, invoke the planner.  For utility* statements, just make a wrapper PlannedStmt node.** The result is a list of PlannedStmt nodes.*/
List *
pg_plan_queries(List *querytrees, int cursorOptions, ParamListInfo boundParams)

主要通过调用 PortalRun来启动执行动作。

/** PortalStart*        Prepare a portal for execution.** Caller must already have created the portal, done PortalDefineQuery(),* and adjusted portal options if needed.** If parameters are needed by the query, they must be passed in "params"* (caller is responsible for giving them appropriate lifetime).** The caller can also provide an initial set of "eflags" to be passed to* ExecutorStart (but note these can be modified internally, and they are* currently only honored for PORTAL_ONE_SELECT portals).  Most callers* should simply pass zero.** The caller can optionally pass a snapshot to be used; pass InvalidSnapshot* for the normal behavior of setting a new snapshot.  This parameter is* presently ignored for non-PORTAL_ONE_SELECT portals (it's only intended* to be used for cursors).** On return, portal is ready to accept PortalRun() calls, and the result* tupdesc (if any) is known.*/
void
PortalStart(Portal portal, ParamListInfo params,int eflags, Snapshot snapshot)
/** PortalRun*        Run a portal's query or queries.** count <= 0 is interpreted as a no-op: the destination gets started up* and shut down, but nothing else happens.  Also, count == FETCH_ALL is* interpreted as "all rows".  Note that count is ignored in multi-query* situations, where we always run the portal to completion.** isTopLevel: true if query is being executed at backend "top level"* (that is, directly from a client command message)** dest: where to send output of primary (canSetTag) query** altdest: where to send output of non-primary queries** completionTag: points to a buffer of size COMPLETION_TAG_BUFSIZE*        in which to store a command completion status string.*        May be NULL if caller doesn't want a status string.** Returns TRUE if the portal's execution is complete, FALSE if it was* suspended due to exhaustion of the count parameter.*/
bool
PortalRun(Portal portal, long count, bool isTopLevel, bool run_once,DestReceiver *dest, DestReceiver *altdest,char *completionTag)

在查询执行模块中,先由Portal模块识别查询类型(有计划树和无计划树),根据查询类型分别指派Executor模块和ProcessUtility模块进行处理。

1.查询优化策略
在进入这一模块之前,我已经简要说明了Executor模块和ProcessUtility模块这两个主要的执行分支。这里要提到两个概念:

可优化语句和非可优化语句

可优化语句说白了就是DML语句,这些语句的特点就是都要查询到满足条件的元组。这类查询都在查询规划阶段生成了规划树,而规划树的生成过程中会根据查询优化理论进行重写和优化以提高查询速度,因此称作可优化语句。

1.1 六种执行策略
上面提到,一条简单的SQL语句会被查询编译器转化为一个执行计划树或者一个非计划树操作。而一条复杂的SQL语句往往同时带有DDL和DML语句,即它会被转换为一个可执行计划树和非执行计划树操作的序列。而可执行计划树和非可执行计划树是由不同的子模块去处理的。这样就有了三种不同的情况,需要三种不同的策略去应对。

然而除此之外,我们还有一种额外的情况需要考虑到:有些SQL语句虽然可以被转换为一个原子操作,但是其执行过程中由于各种原因需要能够缓存语句执行的结果,等到整个语句执行完毕在返回执行结果。

具体的说:

1 对于可优化语句,当执行修改元组操作时,希望能够返回被修改的元组(例如带RETURNING子句的DELETE),由于原子操作的处理过程不能被可能有问题的输出过程终止,因此不能边执行边输出,因此需要一个缓存结构来临时存放执行结果;

2 某些非优化语句是需要返回结果的(例如SHOW,EXPLAIN) ,因此也需要一个缓存结构暂存处理结果。

此外,对于带有INSERT/UPDATE/DELETE的WITH子句,会在CTE中修改数据,和一般的CTE不一样。我们也需要进行特事特办,特殊处理,这是第五种情况。

因此,综合上面所说的,我们需要有六种处理策略来解决,分别如下:

1)PORTAL_ONE_SELECT:处理单个的SELECT语句,调用Executor模块;

2)PORTAL_ONE_RETURNING:处理带RETURNING的UPDATE/DELETE/INSERT语句,调用Executor模块;

3)PORTAL_UTIL_SELECT:处理单个的数据定义语句,调用ProcessUtility模块;

4)PORTAL_ONE_MOD_WITH:处理带有INSERT/UPDATE/DELETE的WITH子句的SELECT,其处理逻辑类似PORTAL_ONE_RETURNING。调用Executor模块;

5)PORTAL_MULTI_QUERY:是前面几种策略的混合,可以处理多个原子操作。

6)PORTAL_DISTRIBUTED:分布式数据查询计划,需要发送或接收多个DN节点数据。

【数据分布方式】

/*----------
 * DistributionType - how to distribute the data
 *
 *----------
 */
typedef enum DistributionType
{
    DISTTYPE_REPLICATION,            /* Replicated */
    DISTTYPE_HASH,                /* Hash partitioned */
    DISTTYPE_ROUNDROBIN,            /* Round Robin */
    DISTTYPE_MODULO,                /* Modulo partitioned */
#ifdef _MIGRATE_
    DISTTYPE_SHARD
#endif
} DistributionType;

【分布式数据交换】

节点完成局部结果之后,把结果持续不断的写入叫做的SharedQueue结果,这是一个生产者(局部计算得到的结果)消费者(目标的节点,也就是需要distribute的结果)模型。
一个节点的输入可能来自多个节点的,循环读取结果即可。
如果需要的话,用Combiner完成排序。

上述SharedQueue每个dataNode上各有一个.

\src\include\pgxc\Squeue.h

typedef struct SQueueHeader *SharedQueue;

\src\include\pgxc\Squeue.c

/* Shared queue header *//* Shared queue header */
typedef struct SQueueHeader
{
    char        sq_key[SQUEUE_KEYSIZE]; /* Hash entry key should be at the
                                 * beginning of the hash entry */
    int            sq_pid;         /* Process id of the producer session */
    int            sq_nodeid;        /* Node id of the producer parent */
    SQueueSync *sq_sync;        /* Associated sinchronization objects */
    int            sq_refcnt;        /* Reference count to this entry */
#ifdef SQUEUE_STAT
    bool        stat_finish;
    long        stat_paused;
#endif
#ifdef __TBASE__
     DataPumpSender sender; /* used for locally data transfering */
    bool        with_params;
    bool        sender_destroy;
    bool        parallelWorkerSendTuple;
    int         numParallelWorkers;
    ParallelSender parallelSendControl;
    int16       nodeMap[MAX_NODES_NUMBER];
    bool        sq_error;
    char        err_msg[ERR_MSGSIZE];
    bool        has_err_msg;
    bool        producer_done;
    int         nConsumer_done;
    slock_t        lock;
#endif
    int            sq_nconsumers;    /* Number of consumers */
    ConsState     sq_consumers[0];/* variable length array */
} SQueueHeader;

1.2 策略的实现

执行策略选择器的工作是根据查询编译阶段生成的计划树链表来为当前的查询选择五种执行策略中的一种。在这个过程中,执行策略选择器会使用数据结构PortalData来存储查询计划树链表以及最后选中的执行策略等信息。

对于查询执行器来说,在执行一个SQL语句时都会以一个Portal作为输入数据,在Portal中存放了与执行该SQL相关的所有信息,例如查询树、计划树和执行状态等。

这里仅仅给出了两种可能的原子操作PlannedStmt和Query,这两者都能包含查询计划树,用于保存含有查询的操作。当然有些含有查询计划树的原子操作不一定是SELECT语句,例如游标的声明(utilityStmt字段不为空),SELECT INTO语句(intoClause字段不为空)等等。

那么我们很容易想到,postgres是不是就是根据原子操作的命令类型和原子操作的个数来确定合适的执行策略当然呢?

不完全是,

命令的类型就如下几种:

src/include/nodes/Nodes.h

/*
 * CmdType -
 *      enums for type of operation represented by a Query or PlannedStmt
 *
 * This is needed in both parsenodes.h and plannodes.h, so put it here...
 */
typedef enum CmdType
{
    CMD_UNKNOWN,
    CMD_SELECT,                    /* select stmt */
    CMD_UPDATE,                    /* update stmt */
    CMD_INSERT,                    /* insert stmt */
    CMD_DELETE,
    CMD_UTILITY,                /* cmds like create, destroy, copy, vacuum,
                                 * etc. */
    CMD_NOTHING                    /* dummy command for instead nothing rules
                                 * with qual */
} CmdType;

根据命令类型,原子操作个数以及查询树、计划树上的某些字段(比如hasModifyingCTE、utilityStmt等等)这些做判断选择哪种执行策略 。

执行这一任务的函数是ChoosePortalStrategy,在src/backend/tcop/Pquery.c文件中。
 1.3 Portal的执行过程

所有的SQL语句的执行都必须从一个Portal开始,

PortalStart  => PortalRun  =>  PortalDrop

该流程都在exec_simple_query函数内部进行。过程大致如下:

1)调用函数CreatePortal创建一个“clean”的Portal,它的内存上下文,资源跟踪器清理函数都已经设置好,但是sourceText,stmts字段还未设置;
2)调用函数PortalDefineQuery函数为刚刚创建的Portal设置sourceText,stmt等,并且设置Portal的状态为PORTAL_DEFINED;
3)调用函数PortalStart对定义好的Portal进行初始化:

a.调用函数ChoosePortalStrategy为portal选择策略;
b.如果选择的是PORTAL_ONE_SELECT,则调用CreateQueryDesc为Portal创建查询描述符;
c.如果选择的是PORTAL_ONE_RETURNING或者PORTAL_ONE_MOD_WITH,则调用ExecCleanTypeFromTL为portal创建返回元组的描述符;
d.对于PORTAL_UTIL_SELECT则调用UtilityTupleDescriptor为Portal创建查询描述符;
e.对于PORTAL_MULTI_QUERY这里则不做过多操作;
f.将Portal的状态设置为PORTAL_READY。

4)调用函数PortalRun执行portal,这就按照既定的策略调用相关执行部件执行Portal;

5)调用函数PortalDrop清理Portal,释放资源。

Tbase 源码 (四)相关推荐

  1. 阅读react-redux源码(四) - connectAdvanced、wrapWithConnect、ConnectFunction和checkForUpdates

    阅读react-redux源码 - 零 阅读react-redux源码 - 一 阅读react-redux源码(二) - createConnect.match函数的实现 阅读react-redux源 ...

  2. java springboot b2b2c shop 多用户商城系统源码(四):熔断器Hystrix

    熔断器 雪崩效应 在微服务架构中通常会有多个服务层调用,基础服务的故障可能会导致级联故障,进而造成整个系统不可用的情况,这种现象被称为服务雪崩效应.服务雪崩效应是一种因"服务提供者" ...

  3. java.util 1.8_JDK1.8源码(四)——java.util.Arrays 类

    java.util.Arrays 类是 JDK 提供的一个工具类,用来处理数组的各种方法,而且每个方法基本上都是静态方法,能直接通过类名Arrays调用. 1.asList public static ...

  4. java 网站源码 四套模版 兼容手机平板PC 在线编辑模版 freemaker 静态引擎

    前台: 支持四套模版, 可以在后台切换 点击:获取地址 QQ 313596790 官网 http://www.fhadmin.org/ 系统介绍: 1.网站后台采用主流的 SSM 框架 jsp JST ...

  5. NumPy Essentials 带注释源码 四、NumPy 核心和模块

    # 来源:NumPy Essentials ch4 步长 # 步长是每个维度相邻两个元素的偏移差值 import numpy as npx = np.arange(8, dtype = np.int8 ...

  6. NumPy Cookbook 带注释源码 四、连接 NumPy 与 剩余世界

    # 来源:NumPy Cookbook 2e Ch4 使用缓冲区协议 # 协议在 Python 中相当于接口 # 是一种约束 import numpy as np import Image # fro ...

  7. NumPy Beginner's Guide 2e 带注释源码 四、NumPy 便利的函数

    # 来源:NumPy Beginner's Guide 2e ch4 交易相关偶对 import numpy as np from matplotlib.pyplot import plot from ...

  8. 编译android源码四(常见错误)

    运行如下命令: $ emulator 1.错误信息: emulator: WARNING: system partition size adjusted to match image file (20 ...

  9. php 中文名称排序 源码,四种排序算法PHP实现类

    跳至array(22,3,41,18) , //需要排序的数组值 'sort' => 'insert', //可能值: insert, select, bubble, quick 'debug' ...

最新文章

  1. intel xdk 打ios的ipa包
  2. 阿里90后工程师利用ARM硬件特性开启安卓8终端“上帝模式” 1
  3. ISA2006标准版配置导入企业版
  4. Java数据库连接方法
  5. 神经网络调参训练集噪音比例对网络性能的影响
  6. Java文件上传功能代码 —— 普遍适用
  7. Hadoop Yarn配置参数整理(非常全面)
  8. html文字粒子效果简陋,5个很棒的CSS3文本粒子动画特效
  9. 元宇宙系列白皮书——未来已来:全球XR产业洞察
  10. SQL语句的增删改查(详细)
  11. android 微信登陆功能,Android 实现微信登录详解
  12. 2021年全球与中国孕妇防辐射服行业市场规模及发展前景分析
  13. 文件关联修复工具(用文本文档保存后后缀修改为bat)
  14. 赛灵思计算平台ACAP技术细节全揭秘
  15. 关于阅读论文的一些感想
  16. 移动硬盘(USB3.0)2TB在自己电脑上突然识别不了。。
  17. App ID申请(将项目中的ID向苹果申请)
  18. 第1章 机器学习基础
  19. 探索学习:网红容器引擎Docker
  20. pageadmin已经过时_为何激光光盘已经过时了

热门文章

  1. ​Spring IOC中 Bean 作用域
  2. php连接不同编码oracle,PHP连接Oracle出现中文乱码问题
  3. MHD-20HDMI高清20倍视频会议摄像头
  4. 美国兰德公司《未来战争2030》(The Future of Warfare in 2030)系列研究报告概述(下)
  5. Access to XMLHttpRequest at ‘http://xx‘ from origin ‘http://xx‘ has been blocked by CORS policy:
  6. 小猪o2o源码v14.17双系统版(生活通+营销系统)怎么配置微信支付和支付宝支付
  7. uniapp中tabbar设置报错文件查找失败,at mian.js:5
  8. STM8基于CAN协议bootloader实现方案
  9. 计算机中显示器的分类,显示器的分类介绍
  10. 组装电脑什么配置才适合自己