语法树生成及执行涉及的UML图

HTTP Handler 接收Query请求
调用
Interpreters::executeQuery.h::executeQuery(…)
调用
Interpreters::executeQuery.h::executeQueryImpl(…)
生成
ASTPtr ast = Parsers::ParserQuery::parseImpl(…)

using ASTPtr = std::shared_ptr<IAST>
IAST是所有SQL语句解析后,生成的抽象语法树的公共接口,例如可以生成一个ASTSelectWithUnionQuery类型的语法树。

生成
auto interpreter = InterpreterFactory::get(ast, context, stage);

封装ASTPtr为IInterpreter,例如Query语句会封装成一个InterpreterSelectQuery类的对象,在生成此对象的实例时,也会对这个AST做一些简单的分析和优化,例如PREWHERE和WHERE的调整、输入数据的类型推断等。

BlockIO res = interpreter->execute();

BlockIO封装了当前QueryPipeline实例,以及相应的输入、输出流的指针,方便根据Pipeline中的每一个IProcessor实例的状态,触发流的读写以及注册流上的事件,比如Query工作流完成事件。

BlockIO InterpreterSelectQuery::execute()
{BlockIO res;QueryPlan query_plan;buildQueryPlan(query_plan);res.pipeline = std::move(*query_plan.buildQueryPipeline());return res;
}

聚合函数读取数据及执行过程

以uniq(…)方法为例,简单讲述聚合函数执行add(…)方法的过程:
Aggregator::executeOnBlock(…)
调用
Aggregator::executeWithoutKeyImpl(…)
调用
AggregateFunctionInstruction::IAggregateFunction::addBatchSinglePlace(…)
调用
IAggregateFunction::addBatchSinglePlace(…)
调用具体子类的
AggregateFunctionUniq::add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena * arena)

这个方法可以用来聚合所有的数据,
对于聚合操作,在解析计划树时,会向Pipeline中添加一个MergingAggregatedStep,用于将所有处理后的数据聚合成一个Block。

如果所有的Blocks都在自己的数据集上聚合完成,MergingAggregatedStep中选择使用不同的Transformer来完成所有已经聚合了的Block间的聚合操作,例如如果在内存充足的情况下,可以基于内存作排序,最终返回排序且聚合的一个Block;如果不足则使用Merge-sort过程,通过溢出文件的方式完成聚合。

Select Query语句的Parse过程

从SELECT语句到AST Node的过程,使用Parser*前缀的类完成。

假设有一个如下的QUERY语句:

SELECT a FROM testdb.test_tbl WHERE dt='20210501'

TCPHandler类

以TCP传输模式接收客户端的消息,这里指处理SQL语句的Server端类,它的核心方法如下:

void TCPHandler::runImpl() {
// 配置Socket// 检查消息体// Authenticaiton查检及配置Settings connection_settings = connection_context.getSettings();sendHello();connection_context.setProgressCallback([this] (const Progress & value) { return this->updateProgress(value); });while (true){/// We are waiting for a packet from the client. Thus, every `poll_interval` seconds check whether we need to shut down.{Stopwatch idle_time;while (!server.isCancelled() && !static_cast<ReadBufferFromPocoSocket &>(*in).poll(std::min(connection_settings.poll_interval, connection_settings.idle_connection_timeout) * 1000000)){if (idle_time.elapsedSeconds() > connection_settings.idle_connection_timeout){LOG_TRACE(log, "Closing idle connection");return;}}}/// If we need to shut down, or client disconnects.if (server.isCancelled() || in->eof())break;/// Set context of request.query_context = connection_context;Stopwatch watch;state.reset();/// Initialized later.std::optional<CurrentThread::QueryScope> query_scope;/** An exception during the execution of request (it must be sent over the network to the client).*  The client will be able to accept it, if it did not happen while sending another packet and the client has not disconnected yet.*/std::optional<DB::Exception> exception;bool network_error = false;bool send_exception_with_stack_trace = true;try{/// If a user passed query-local timeouts, reset socket to initial state at the end of the querySCOPE_EXIT({state.timeout_setter.reset();});/** If Query - process it. If Ping or Cancel - go back to the beginning.*  There may come settings for a separate query that modify `query_context`.*/if (!receivePacket())continue;query_scope.emplace(*query_context);send_exception_with_stack_trace = query_context->getSettingsRef().calculate_text_stack_trace;/// Should we send internal logs to client?const auto client_logs_level = query_context->getSettingsRef().send_logs_level;/// 配置query执行时的上下文环境,例如setExternalTablesInitializer,setInputInitializer等...customizeContext(*query_context);bool may_have_embedded_data = client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_CLIENT_SUPPORT_EMBEDDED_DATA;/// 开始执行Query语句,从解析到后生成物理计划树state.io = executeQuery(state.query, *query_context, false, state.stage, may_have_embedded_data);// 开始监听过程after_check_cancelled.restart();after_send_progress.restart();if (state.io.out){state.need_receive_data_for_insert = true;processInsertQuery(connection_settings);}else if (state.need_receive_data_for_input) // It implies pipeline execution{// 在这里调用executeQuery返回的结果,触发Pipeline的执行/// It is special case for input(), all works for reading data from client will be done in callbacks.auto executor = state.io.pipeline.execute();executor->execute(state.io.pipeline.getNumThreads());}else if (state.io.pipeline.initialized())processOrdinaryQueryWithProcessors();else if (state.io.in)processOrdinaryQuery();// 执行完成后的收尾工作state.io.onFinish();/// Do it before sending end of stream, to have a chance to show log message in client.query_scope->logPeakMemoryUsage();if (state.is_connection_closed)break;sendLogs();sendEndOfStream();/// QueryState should be cleared before QueryScope, since otherwise/// the MemoryTracker will be wrong for possible deallocations./// (i.e. deallocations from the Aggregator with two-level aggregation)state.reset();query_scope.reset();}catch (const Exception & e){ ... /// 处理各种异常 }catch (...){state.io.onException();exception.emplace("Unknown exception", ErrorCodes::UNKNOWN_EXCEPTION);}/// 处理QUERY执行后的收尾工作,例如发送日志和清理各种执行时的环境信息... watch.stop();LOG_DEBUG(log, "Processed in {} sec.", watch.elapsedSeconds());/// It is important to destroy query context here. We do not want it to live arbitrarily longer than the query.query_context.reset();if (network_error)break;}}

上面的方法中executeQuery(...)是query语句的入口,它的实现类在executeQuery.cpp文件中。

QueryProcessingStage

Query语句执行的阶段,它定义在QueryProcessingStage.h文件中,如下:

/// Up to what stage the SELECT query is executed or needs to be executed.
namespace QueryProcessingStage
{/// Numbers matter - the later stage has a larger number.////// It is part of Protocol ABI, add values only to the end./// Also keep in mind that the code may depends on the order of fields, so be double aware when you will add new values.enum Enum{/// Only read/have been read the columns specified in the query.FetchColumns       = 0,/// Until the stage where the results of processing on different servers can be combined.WithMergeableState = 1,/// Completely.Complete           = 2,/// Until the stage where the aggregate functions were calculated and finalized.////// It is used for auto distributed_group_by_no_merge optimization for distributed engine./// (See comments in StorageDistributed).WithMergeableStateAfterAggregation = 3,MAX = 4,};
}

executeQuery.cpp

下面的方法是在TCPHandler.cpp文件中的runImp()方法中调用的,用来执行一条query语句。

BlockIO executeQuery(// query语句字符串,例如select语句、insert语句、create语句等const String & query,// 上下文管理器,是从TCPHandler方法中connection_context中得到的,里面存放了各种在Query运行时可配置的参数信息Context & context,bool internal,// query语句的执行状态QueryProcessingStage::Enum stage,// 如果是Insert语句的话,待插入的数据是跟在Insert语句之后的,如果没有话,就不是一条insert语句bool may_have_embedded_data)
{ASTPtr ast;BlockIO streams;std::tie(ast, streams) = executeQueryImpl(query.data(), query.data() + query.size(), context,internal, stage, !may_have_embedded_data, nullptr);if (const auto * ast_query_with_output = dynamic_cast<const ASTQueryWithOutput *>(ast.get())){String format_name = ast_query_with_output->format? getIdentifierName(ast_query_with_output->format): context.getDefaultFormat();if (format_name == "Null")streams.null_format = true;}return streams;
}

executeQuery(…)方法又是调用如下方法完成工作的:

static std::tuple<ASTPtr, BlockIO> executeQueryImpl(const char * begin,const char * end,Context & context,bool internal,QueryProcessingStage::Enum stage,bool has_query_tail,ReadBuffer * istr)
{const auto current_time = std::chrono::system_clock::now();/// If we already executing query and it requires to execute internal query, than/// don't replace thread context with given (it can be temporary). Otherwise, attach context to thread.if (!internal){context.makeQueryContext();CurrentThread::attachQueryContext(context);}const Settings & settings = context.getSettingsRef();/// 实例化一个ParserQuery对象,用来解析SQL语句,并生成一棵AST树ParserQuery parser(end);ASTPtr ast;const char * query_end;/// Don't limit the size of internal queries.size_t max_query_size = 0;if (!internal)max_query_size = settings.max_query_size;try{/// TODO Parser should fail early when max_query_size limit is reached.ast = parseQuery(parser, begin, end, "", max_query_size, settings.max_parser_depth);/// Interpret SETTINGS clauses as early as possible (before invoking the corresponding interpreter),/// to allow settings to take effect.if (const auto * select_query = ast->as<ASTSelectQuery>()){if (auto new_settings = select_query->settings())InterpreterSetQuery(new_settings, context).executeForCurrentContext();}else if (const auto * select_with_union_query = ast->as<ASTSelectWithUnionQuery>()){if (!select_with_union_query->list_of_selects->children.empty()){if (auto new_settings = select_with_union_query->list_of_selects->children.back()->as<ASTSelectQuery>()->settings())InterpreterSetQuery(new_settings, context).executeForCurrentContext();}}else if (const auto * query_with_output = dynamic_cast<const ASTQueryWithOutput *>(ast.get())){if (query_with_output->settings_ast)InterpreterSetQuery(query_with_output->settings_ast, context).executeForCurrentContext();}auto * insert_query = ast->as<ASTInsertQuery>();if (insert_query && insert_query->settings_ast)InterpreterSetQuery(insert_query->settings_ast, context).executeForCurrentContext();if (insert_query && insert_query->data){query_end = insert_query->data;insert_query->has_tail = has_query_tail;}else{query_end = end;}}catch (...){/// Anyway log the query.String query = String(begin, begin + std::min(end - begin, static_cast<ptrdiff_t>(max_query_size)));auto query_for_logging = prepareQueryForLogging(query, context);logQuery(query_for_logging, context, internal);if (!internal){onExceptionBeforeStart(query_for_logging, context, time_in_microseconds(current_time), ast);}throw;}/// 下面的代码是解析AST树,生成一棵物理计划树,实际上是QueryPipeline的实例对象,它是可执行的流水线,是一种嵌套的结构。/// QueryPipeline可以认为是对Pipe实例的封装,而Pipe是对数据集上的一组transform操作,这些Pipe单元都必然有相同的数据header。...

parseQuery.cpp

此文件中定义了一系的全局的公共方法,上面讲到的executeQuery.cpp文件中调用的parseQuery(...)方法,就是在这个文件中定义,而parseQuery(...)方法实际上又是调用一开始创建的ParserQuery实例的方法完成工作的。

ParserQuery.cpp

继承关系:class ParserQuery : public IParserBase : public IParser

从前面展示的代码可以看到,执行Query语句的第一步是对SQL字符串的解析,生成AST树,而parse过程的入口就是此文件中定义的parseImpl(...)方法,代码如下:

bool ParserQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{ParserQueryWithOutput query_with_output_p;ParserInsertQuery insert_p(end);ParserUseQuery use_p;ParserSetQuery set_p;ParserSystemQuery system_p;ParserCreateUserQuery create_user_p;ParserCreateRoleQuery create_role_p;ParserCreateQuotaQuery create_quota_p;ParserCreateRowPolicyQuery create_row_policy_p;ParserCreateSettingsProfileQuery create_settings_profile_p;ParserDropAccessEntityQuery drop_access_entity_p;ParserGrantQuery grant_p;ParserSetRoleQuery set_role_p;ParserExternalDDLQuery external_ddl_p;bool res = query_with_output_p.parse(pos, node, expected)|| insert_p.parse(pos, node, expected)|| use_p.parse(pos, node, expected)|| set_role_p.parse(pos, node, expected)|| set_p.parse(pos, node, expected)|| system_p.parse(pos, node, expected)|| create_user_p.parse(pos, node, expected)|| create_role_p.parse(pos, node, expected)|| create_quota_p.parse(pos, node, expected)|| create_row_policy_p.parse(pos, node, expected)|| create_settings_profile_p.parse(pos, node, expected)|| drop_access_entity_p.parse(pos, node, expected)|| grant_p.parse(pos, node, expected)|| external_ddl_p.parse(pos, node, expected);return res;
}

从上面的代码可以看到,ClickHouse目前支持所有的Query句型,包括DML、DDL、DQL,而ParserQuery对象,负责逐个尝试解析,一旦遇到能够解析完成的句型,就返回成功,这种实现方式自然会有一定的性能损耗。

这里我们是以SELECT句型分析的,因此这里会在执行完ParserQueryWithOutput的解析后,返回成功。

ParserQueryWithOutput.cpp

ParserInsertQuery类是继承自IParserBase类的,因此调用ParserQueryWithOutput::parse(...)方法,最终会调用parseImpl(...)方法完成工作代码如下:

bool ParserQueryWithOutput::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{ParserShowTablesQuery show_tables_p;ParserSelectWithUnionQuery select_p;ParserTablePropertiesQuery table_p;ParserDescribeTableQuery describe_table_p;ParserShowProcesslistQuery show_processlist_p;ParserCreateQuery create_p;ParserAlterQuery alter_p;ParserRenameQuery rename_p;ParserDropQuery drop_p;ParserCheckQuery check_p;ParserOptimizeQuery optimize_p;ParserKillQueryQuery kill_query_p;ParserWatchQuery watch_p;ParserShowAccessQuery show_access_p;ParserShowAccessEntitiesQuery show_access_entities_p;ParserShowCreateAccessEntityQuery show_create_access_entity_p;ParserShowGrantsQuery show_grants_p;ParserShowPrivilegesQuery show_privileges_p;ParserExplainQuery explain_p;ASTPtr query;bool parsed =explain_p.parse(pos, query, expected)|| select_p.parse(pos, query, expected)|| show_create_access_entity_p.parse(pos, query, expected) /// should be before `show_tables_p`|| show_tables_p.parse(pos, query, expected)|| table_p.parse(pos, query, expected)|| describe_table_p.parse(pos, query, expected)|| show_processlist_p.parse(pos, query, expected)|| create_p.parse(pos, query, expected)|| alter_p.parse(pos, query, expected)|| rename_p.parse(pos, query, expected)|| drop_p.parse(pos, query, expected)|| check_p.parse(pos, query, expected)|| kill_query_p.parse(pos, query, expected)|| optimize_p.parse(pos, query, expected)|| watch_p.parse(pos, query, expected)|| show_access_p.parse(pos, query, expected)|| show_access_entities_p.parse(pos, query, expected)|| show_grants_p.parse(pos, query, expected)|| show_privileges_p.parse(pos, query, expected);if (!parsed)return false;.../// 其它的收尾工作
}

从上面的代码可以看到,一条Query句型,又被细分为不同的语句,例如show、explain、select等,因此这里继续跟踪ParserSelectWithUnionQuery类分析。

ParserSelectWithUnionQuery.cpp

一条SELECT语句,又有两中不同的表示形式,即单句型和多句型,即存在UNION ALL关键字时,SELECT语句是多句型的,否则是单句型的。
UNION ALL的存在,表示这条件SQL语句可以解析为多条可以单独执行的QUERY语句,因此这里以ParserList对象保存可能存在的、并行语句,代码如下:

bool ParserSelectWithUnionQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{ASTPtr list_node;ParserList parser(std::make_unique<ParserUnionQueryElement>(), std::make_unique<ParserKeyword>("UNION ALL"), false);if (!parser.parse(pos, list_node, expected))return false;auto select_with_union_query = std::make_shared<ASTSelectWithUnionQuery>();node = select_with_union_query;select_with_union_query->list_of_selects = std::make_shared<ASTExpressionList>();select_with_union_query->children.push_back(select_with_union_query->list_of_selects);// flatten inner union queryfor (auto & child : list_node->children)getSelectsFromUnionListNode(child, select_with_union_query->list_of_selects->children);return true;
}

从上面可以看到,不论有没有UNION ALL关键字存在,一条SELECT语句的完整解析过程,又是交由ParserUnionQueryElement对象处理的。

ParserUnionQueryElement.cpp

这个类的解析方法,仅仅是创建一个新的ParserSelectQuery对象,然后返回它parse后的结果。

bool ParserUnionQueryElement::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{/// ParserSubquery 尝试解析子句型,即由(和)包含的子句/// ParserSelectQuery 尝试解析SELECT语句if (!ParserSubquery().parse(pos, node, expected) && !ParserSelectQuery().parse(pos, node, expected))return false;if (const auto * ast_subquery = node->as<ASTSubquery>())node = ast_subquery->children.at(0);return true;
}

ParserSelectQuery.cpp

终于到了一个具体的SELECT语句的解析过程,它的方法定义如下:

bool ParserSelectQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{auto select_query = std::make_shared<ASTSelectQuery>();node = select_query;ParserKeyword s_select("SELECT");ParserKeyword s_distinct("DISTINCT");ParserKeyword s_from("FROM");ParserKeyword s_prewhere("PREWHERE");ParserKeyword s_where("WHERE");ParserKeyword s_group_by("GROUP BY");ParserKeyword s_with("WITH");ParserKeyword s_totals("TOTALS");ParserKeyword s_having("HAVING");ParserKeyword s_order_by("ORDER BY");ParserKeyword s_limit("LIMIT");ParserKeyword s_settings("SETTINGS");ParserKeyword s_by("BY");ParserKeyword s_rollup("ROLLUP");ParserKeyword s_cube("CUBE");ParserKeyword s_top("TOP");ParserKeyword s_with_ties("WITH TIES");ParserKeyword s_offset("OFFSET");ParserKeyword s_fetch("FETCH");ParserKeyword s_only("ONLY");ParserKeyword s_row("ROW");ParserKeyword s_rows("ROWS");ParserKeyword s_first("FIRST");ParserKeyword s_next("NEXT");ParserNotEmptyExpressionList exp_list(false);ParserNotEmptyExpressionList exp_list_for_with_clause(false);ParserNotEmptyExpressionList exp_list_for_select_clause(true);    /// Allows aliases without AS keyword.ParserExpressionWithOptionalAlias exp_elem(false);ParserOrderByExpressionList order_list;ParserToken open_bracket(TokenType::OpeningRoundBracket);ParserToken close_bracket(TokenType::ClosingRoundBracket);ASTPtr with_expression_list;ASTPtr select_expression_list;ASTPtr tables;ASTPtr prewhere_expression;ASTPtr where_expression;ASTPtr group_expression_list;ASTPtr having_expression;ASTPtr order_expression_list;ASTPtr limit_by_length;ASTPtr limit_by_offset;ASTPtr limit_by_expression_list;ASTPtr limit_offset;ASTPtr limit_length;ASTPtr top_length;ASTPtr settings;/// WITH expr list{if (s_with.ignore(pos, expected)){if (!ParserList(std::make_unique<ParserWithElement>(), std::make_unique<ParserToken>(TokenType::Comma)).parse(pos, with_expression_list, expected))return false;if (with_expression_list->children.empty())return false;}}...

从上面的代码可以看到,ClickHouse基本上支持了常见的SELECT语法,并且也实现了一些自定义的关键字。

Select Query语句的Interpret过程

executeQuery.cpp

前面我们知道这个文件中定义的executeQueryImpl(...)方法,会产生将一条SQL字符串,解析为一棵AST树,那接下来就是生成计划树的过程,即interpret的过程,代码如下:

static std::tuple<ASTPtr, BlockIO> executeQueryImpl(const char * begin,const char * end,Context & context,bool internal,QueryProcessingStage::Enum stage,bool has_query_tail,ReadBuffer * istr)
{/// 解析SQL字符串,生成AST...try{auto interpreter = InterpreterFactory::get(ast, context, stage);std::shared_ptr<const EnabledQuota> quota;if (!interpreter->ignoreQuota()){quota = context.getQuota();if (quota){quota->used(Quota::QUERIES, 1);quota->checkExceeded(Quota::ERRORS);}}StreamLocalLimits limits;if (!interpreter->ignoreLimits()){limits.mode = LimitsMode::LIMITS_CURRENT;limits.size_limits = SizeLimits(settings.max_result_rows, settings.max_result_bytes, settings.result_overflow_mode);}// 调用解释器的执行方法,生成可以执行的Pipeline实例并执行。// res是BlockIO类型的变量res = interpreter->execute();QueryPipeline & pipeline = res.pipeline;bool use_processors = pipeline.initialized();if (const auto * insert_interpreter = typeid_cast<const InterpreterInsertQuery *>(&*interpreter)){/// Save insertion table (not table function). TODO: support remote() table function.auto table_id = insert_interpreter->getDatabaseTable();if (!table_id.empty())context.setInsertionTable(std::move(table_id));}...}...
}

从上面的代码可以看到,通过InterpreterFactory的工厂方法,根据解析生成的AST树的类型,创建对应的解释器,这里会创建一个InterpreterSelectQuery的对象,然后调用InterpreterSelectQuery::execute(..)方法。

InterpreterSelectQuery.cpp

InterpreterFactory::get(…)方法会返回一个InterpreterSelectQuery类的实例,同时这个类在创建时,就会对整个AST树进行遍历,完成树的检查、及基本的优化调整工作,例如语法异常、表达式错误、PREWHERE语句优化、JOIN优化等,更为具体的过程见其cpp文件中的构造方法。
完成InterpreterSelectQuery对象的创建后,这里就显示调用execute()方法,开始执行这棵树。实际这棵树是不能直接执行的,还需要两个过程,如下面代码:

BlockIO InterpreterSelectQuery::execute()
{BlockIO res;QueryPlan query_plan;// 产生生成一棵逻辑计划树buildQueryPlan(query_plan);// 生成一棵物理计划树,即流水线res.pipeline = std::move(*query_plan.buildQueryPipeline());return res;
}

buildQueryPlan(query_plan)方法,会根据解析和调整后的AST树,生成一棵逻辑树,即QueryPlan,实际上它是一棵QueryPlanStep树,它们的介绍见最后的相关类小节
buildQueryPlan(...)方法,内部通过executeImpl(...)方法,将AST树中的结点,根据不同的操作类型,自顶向下搜集结点上的信息,并创建一个个的QueryPlanStep对象,添加进QueryPlan中,关键代码如下:

void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInputStreamPtr & prepared_input, std::optional<Pipe> prepared_pipe)
{/** Streams of data. When the query is executed in parallel, we have several data streams.*  If there is no GROUP BY, then perform all operations before ORDER BY and LIMIT in parallel, then*  if there is an ORDER BY, then glue the streams using ResizeProcessor, and then MergeSorting transforms,*  if not, then glue it using ResizeProcessor,*  then apply LIMIT.*  If there is GROUP BY, then we will perform all operations up to GROUP BY, inclusive, in parallel;*  a parallel GROUP BY will glue streams into one,*  then perform the remaining operations with one resulting stream.*//// Now we will compose block streams that perform the necessary actions.auto & query = getSelectQuery();const Settings & settings = context->getSettingsRef();auto & expressions = analysis_result;auto & subqueries_for_sets = query_analyzer->getSubqueriesForSets();bool intermediate_stage = false;bool to_aggregation_stage = false;bool from_aggregation_stage = false;if (options.only_analyze){auto read_nothing = std::make_unique<ReadNothingStep>(source_header);query_plan.addStep(std::move(read_nothing));if (expressions.prewhere_info){auto prewhere_step = std::make_unique<FilterStep>(query_plan.getCurrentDataStream(),expressions.prewhere_info->prewhere_actions,expressions.prewhere_info->prewhere_column_name,expressions.prewhere_info->remove_prewhere_column);prewhere_step->setStepDescription("PREWHERE");query_plan.addStep(std::move(prewhere_step));// To remove additional columns in dry run// For example, sample column which can be removed in this stageif (expressions.prewhere_info->remove_columns_actions){auto remove_columns = std::make_unique<ExpressionStep>(query_plan.getCurrentDataStream(),expressions.prewhere_info->remove_columns_actions);remove_columns->setStepDescription("Remove unnecessary columns after PREWHERE");query_plan.addStep(std::move(remove_columns));}}}/// 其它Step的构建过程...
}

当完成整棵QueryPlan树的转换后,仅仅是生成了一个优化后的逻辑计划树,还需要将逻辑树转换成物理计划树,才能真正地开始执行,而这一步的工作是由QueryPlan::buildQueryPipeline()方法完成的。

QueryPlan.cpp

buildQueryPipeline()方法会将逻辑树中的一个个QueryPlanStep结点,自底向上,从左向右转换成IProcessor的实例对象,也就是物理计划树中的结点,将组织成QueryPipeline的结构。QueryPlan树的遍历过程如下:

// 逻辑计划树树的一层,Step关联结构,->表示左边先于右边执行,也表示左边的孩子是右边
// 一条不包含子句的逻辑树结构如下,一棵倒立的树,其中每一个step结点就是一层,所谓的流水线就是自顶向下的父子结构:
// prepared_source_step -> prewhere_step -> remove_columns ->
// row_level_security_step -> before_array_join_step ->
// array_join_step -> before_join_step -> join_step -> where_step ->
// aggregating_step -> partial_sorting -> merge_sorting_step ->
// merging_sorted -> merging_aggregated -> having_step ->
// expression_step -> distinct_step (root结点)
// 其中prepared_source_step表示待读入的数据源,它可能是一个子句,subquery,如果有子句,则先执行子句。
// 因此上面的流水线最终是如下的样子:
// subquery --complete--> prepared_source_step -> ...
//
// Pipeline的拼接则是按照逻辑树自底向上构建的,从root结点开始,最终上面的逻辑计划树最终就转换成了如下的结构:
// Pipeline = initializePipeline -> FilterTransform -> ExpressionTransform -> ... -> DistinctTransform
// 所以一个Pipeline的执行顺序为:从左到右。
QueryPipelinePtr QueryPlan::buildQueryPipeline()
{checkInitialized();optimize();// 一个Frame表示一个结点,及到这个结点已经生成的Pipeline对象// 这里会采用自底向上,从左到右的方式遍历逻辑计划树,也就意味着对于第N层的第I个结点,只有在遍// 完第N层的前N-1个结点后,才会遍历到当前结点,同时,由于一条件Query语句可能存在子句且并行,// 例如(select a union select b),因此这里使用数组来保存当前层可能的Pipeline实例,同时也作为当前层遍历的完成的检查条件。struct Frame{Node * node;QueryPipelines pipelines = {};};QueryPipelinePtr last_pipeline;std::stack<Frame> stack;stack.push(Frame{.node = root});while (!stack.empty()){auto & frame = stack.top();if (last_pipeline){// 如果一个Step结点和其孩子结点都遍历完成时,就将这个结点对应的Pipeline对象,// 添加到当前层的Pipeline队列中,以便在一个流水线的上层算子,能够通过数组的长度,// 来判断是不是已经处理完当前层的Step算子了。frame.pipelines.emplace_back(std::move(last_pipeline));last_pipeline = nullptr;}size_t next_child = frame.pipelines.size();if (next_child == frame.node->children.size()){bool limit_max_threads = frame.pipelines.empty();// 当前结点的所有的子结点都已经遍历完成了,也意味着当前结点的Pipelines构建完成,// 就将子结点生成的Pipelines绑定到当前Step结点上,这种绑定关系表示当前结点和其// 子结点会创建的所有IProcessor算子last_pipeline = frame.node->step->updatePipeline(std::move(frame.pipelines));// 从最后一个结点if (limit_max_threads && max_threads)last_pipeline->limitMaxThreads(max_threads);stack.pop();}else/// 创建一个新的子结点的Pipelines结构stack.push(Frame{.node = frame.node->children[next_child]});}for (auto & context : interpreter_context)last_pipeline->addInterpreterContext(std::move(context));return last_pipeline;
}

下面举例说明,如何从一个QueryPlanStep转换到对应的IProcessor实例,从上面的代码可以看到这个过程应该是在updatePipeline(...)方法中完成,例如这里有过滤条件dt='20210501,那么它就对应一个FilterStep的实例,它的updatePipeline方法定义如下:

QueryPipelinePtr ITransformingStep::updatePipeline(QueryPipelines pipelines)
{if (collect_processors){// 收集Pipelines中的第一个Pipeline对象QueryPipelineProcessorsCollector collector(*pipelines.front(), this);// 将当前的ITransformingStep的实例类对应的IProcessor实例,添加到第一个Pipeline对象中transformPipeline(*pipelines.front());// detachProcessors()方法会将Pipleline中已经存入的所有IProcessor实例更新// 的Step引用,更新为当前Step,表示// 同时将更新的IProcessor实例集合,赋值给当前Step实例的,方便在创建Pipeline时processors = collector.detachProcessors();}elsetransformPipeline(*pipelines.front());return std::move(pipelines.front());
}

下面举例一个过滤算子从逻辑结点到物理结点的构建过程,代码如下:

void FilterStep::transformPipeline(QueryPipeline & pipeline)
{auto expression = std::make_shared<ExpressionActions>(actions_dag);// 向输入的Pipeline对象中,添加一个过滤算子,即下面的lambda表达式返回的结果,// FilterTransformpipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type){bool on_totals = stream_type == QueryPipeline::StreamType::Totals;return std::make_shared<FilterTransform>(header, expression, filter_column_name, remove_filter_column, on_totals);});// 如果输入的Pipeline对象所包含的输入流,与当前的算子的输出结果拥有不同的schema信息,// 则必然是调用了某些表达式,将原来的字段类型,转换成了另外一个类型,因此// 再向Pipeline对象中追加一个ExpressionTransform操作,用于将之前的字段的类型转换// 为新的输出类型。if (!blocksHaveEqualStructure(pipeline.getHeader(), output_stream->header)){auto convert_actions_dag = ActionsDAG::makeConvertingActions(pipeline.getHeader().getColumnsWithTypeAndName(),output_stream->header.getColumnsWithTypeAndName(),ActionsDAG::MatchColumnsMode::Name);auto convert_actions = std::make_shared<ExpressionActions>(convert_actions_dag);pipeline.addSimpleTransform([&](const Block & header){return std::make_shared<ExpressionTransform>(header, convert_actions);});}
}

FilterStep更新Pipeline的过程比较简单,仅仅是创建相应的IProcessor实例对象,并添加到Pipeline中的默认分组中即可。

Query语句执行过程中的相关类

QueryPlan.h

可以看到QueryPlan是一棵树结果,它的每一个结点Node,实际上是对QueryPlanStepPtr的封装,而QueryPlanStepPtr是指向QueryPlanStep类型的对象的指针,类定义如下:

/// A tree of query steps.
/// The goal of QueryPlan is to build QueryPipeline.
/// QueryPlan let delay pipeline creation which is helpful for pipeline-level optimisations.
class QueryPlan
{
public:QueryPlan();~QueryPlan();QueryPlan(QueryPlan &&);QueryPlan & operator=(QueryPlan &&);void unitePlans(QueryPlanStepPtr step, std::vector<QueryPlanPtr> plans);void addStep(QueryPlanStepPtr step);bool isInitialized() const { return root != nullptr; } /// Tree is not emptybool isCompleted() const; /// Tree is not empty and root hasOutputStream()const DataStream & getCurrentDataStream() const; /// Checks that (isInitialized() && !isCompleted())void optimize();QueryPipelinePtr buildQueryPipeline();/// If initialized, build pipeline and convert to pipe. Otherwise, return empty pipe.Pipe convertToPipe();void explainPlan(WriteBuffer & buffer, const ExplainPlanOptions & options);void explainPipeline(WriteBuffer & buffer, const ExplainPipelineOptions & options);/// Set upper limit for the recommend number of threads. Will be applied to the newly-created pipelines./// TODO: make it in a better way.void setMaxThreads(size_t max_threads_) { max_threads = max_threads_; }size_t getMaxThreads() const { return max_threads; }void addInterpreterContext(std::shared_ptr<Context> context);/// Tree node. Step and it's children.struct Node{QueryPlanStepPtr step;std::vector<Node *> children = {};};using Nodes = std::list<Node>;private:Nodes nodes;Node * root = nullptr;void checkInitialized() const;void checkNotCompleted() const;/// Those fields are passed to QueryPipeline.size_t max_threads = 0;std::vector<std::shared_ptr<Context>> interpreter_context;
};

从它的定义可以看到,除了一些构成树的基本信息外,QueryPlan还包含了一个optimize()方法,它是对逻辑树进行优化的过程,后面再看它的实现吧。

QueryPipeline.h

Pipe.h

/// Pipe is a set of processors which represents the part of pipeline.
/// Pipe contains a list of output ports, with specified port for totals and specified port for extremes.
/// All output ports have same header.
/// All other ports are connected, all connections are inside processors set.
class Pipe
{
private:/// Destruction order: processors, header, locks, temporary storages, local contextsHolder holder;/// Header is common for all output below.Block header;Processors processors;/// Output ports. Totals and extremes are allowed to be empty.OutputPortRawPtrs output_ports;OutputPort * totals_port = nullptr;OutputPort * extremes_port = nullptr;/// It is the max number of processors which can be executed in parallel for each step./// Usually, it's the same as the number of output ports.size_t max_parallel_streams = 0;/// If is set, all newly created processors will be added to this too./// It is needed for debug. See QueryPipelineProcessorsCollector.Processors * collected_processors = nullptr;/// This methods are for QueryPipeline. It is allowed to complete graph only there./// So, we may be sure that Pipe always has output port if not empty.bool isCompleted() const { return !empty() && output_ports.empty(); }static Pipe unitePipes(Pipes pipes, Processors * collected_processors);void setSinks(const Pipe::ProcessorGetterWithStreamKind & getter);void setOutputFormat(ProcessorPtr output);friend class QueryPipeline;
}

ITransformingStep.h

继承结构:class ITransformingStep : public IQueryPlanStep
此类表示这个结点有一个输入流和一个输出流。它的许多实现类都在*Step.h头文件中定义,如下列举了逻辑计划树中所有可能的结点类型:

.//Processors/QueryPlan/AggregatingStep.h
.//Processors/QueryPlan/AddingConstColumnStep.h
.//Processors/QueryPlan/ExpressionStep.h
.//Processors/QueryPlan/ReadNothingStep.h
.//Processors/QueryPlan/LimitByStep.h
.//Processors/QueryPlan/SettingQuotaAndLimitsStep.h
.//Processors/QueryPlan/ArrayJoinStep.h
.//Processors/QueryPlan/ReverseRowsStep.h
.//Processors/QueryPlan/OffsetStep.h
.//Processors/QueryPlan/AddingDelayedSourceStep.h
.//Processors/QueryPlan/DistinctStep.h
.//Processors/QueryPlan/CubeStep.h
.//Processors/QueryPlan/RollupStep.h
.//Processors/QueryPlan/LimitStep.h
.//Processors/QueryPlan/CreatingSetsStep.h
.//Processors/QueryPlan/TotalsHavingStep.h
.//Processors/QueryPlan/PartialSortingStep.h
.//Processors/QueryPlan/MaterializingStep.h
.//Processors/QueryPlan/FillingStep.h
.//Processors/QueryPlan/FilterStep.h
.//Processors/QueryPlan/UnionStep.h
.//Processors/QueryPlan/MergeSortingStep.h
.//Processors/QueryPlan/AddingMissedStep.h
.//Processors/QueryPlan/IQueryPlanStep.h
.//Processors/QueryPlan/ExtremesStep.h
.//Processors/QueryPlan/MergingSortedStep.h
.//Processors/QueryPlan/ISourceStep.h
.//Processors/QueryPlan/ITransformingStep.h
.//Processors/QueryPlan/MergingAggregatedStep.h
.//Processors/QueryPlan/FinishSortingStep.h

*Transform.h

物理计划树中的可能结点类型,基本上和Step结点是一一对应的:

.//Processors/IInflatingTransform.h
.//Processors/OffsetTransform.h
.//Processors/ISimpleTransform.h
.//Processors/IAccumulatingTransform.h
.//Processors/Merges/VersionedCollapsingTransform.h
.//Processors/Merges/AggregatingSortedTransform.h
.//Processors/Merges/SummingSortedTransform.h
.//Processors/Merges/MergingSortedTransform.h
.//Processors/Merges/CollapsingSortedTransform.h
.//Processors/Merges/IMergingTransform.h
.//Processors/Merges/ReplacingSortedTransform.h
.//Processors/Merges/GraphiteRollupSortedTransform.h
.//Processors/LimitTransform.h
.//Processors/Transforms/ArrayJoinTransform.h
.//Processors/Transforms/DistinctTransform.h
.//Processors/Transforms/FillingTransform.h
.//Processors/Transforms/MergeSortingTransform.h
.//Processors/Transforms/PartialSortingTransform.h
.//Processors/Transforms/ExtremesTransform.h
.//Processors/Transforms/JoiningTransform.h
.//Processors/Transforms/CopyTransform.h
.//Processors/Transforms/MaterializingTransform.h
.//Processors/Transforms/ReverseTransform.h
.//Processors/Transforms/AddingMissedTransform.h
.//Processors/Transforms/CubeTransform.h
.//Processors/Transforms/FinishSortingTransform.h
.//Processors/Transforms/LimitByTransform.h
.//Processors/Transforms/AggregatingTransform.h
.//Processors/Transforms/MergingAggregatedMemoryEfficientTransform.h
.//Processors/Transforms/LimitsCheckingTransform.h
.//Processors/Transforms/ExpressionTransform.h
.//Processors/Transforms/AggregatingInOrderTransform.h
.//Processors/Transforms/TotalsHavingTransform.h
.//Processors/Transforms/CreatingSetsTransform.h
.//Processors/Transforms/AddingConstColumnTransform.h
.//Processors/Transforms/RollupTransform.h
.//Processors/Transforms/MergingAggregatedTransform.h
.//Processors/Transforms/SortingTransform.h
.//Processors/Transforms/FilterTransform.h
.//Processors/Transforms/AddingSelectorTransform.h
.//DataStreams/SquashingTransform.h

逻辑树QueryPlan的优化过程

实际上QueryPlan在创建过程虽然已经有做了一部分优化,但这里独立了一个方法专门用于更多的优化,但目前能够看到的仅仅是Limit的下推,实际上还有许多可以优化的过程,但ClickHouse并没有实现,因此可以说CH对于简单的QUERY语句能够有一个比较好的优化结果,但不善于就会复杂的Query语句

void QueryPlan::optimize()
{struct Frame{Node * node;size_t next_child = 0;};std::stack<Frame> stack;stack.push(Frame{.node = root});while (!stack.empty()){auto & frame = stack.top();if (frame.next_child == 0){/// First entrance, try push down.if (frame.node->children.size() == 1)tryPushDownLimit(frame.node->step, frame.node->children.front());}if (frame.next_child < frame.node->children.size()){stack.push(Frame{frame.node->children[frame.next_child]});++frame.next_child;}else{/// Last entrance, try lift up.if (frame.node->children.size() == 1)tryLiftUpArrayJoin(frame.node, frame.node->children.front(), nodes);stack.pop();}}
}

ClickHouse Query执行流程浅析相关推荐

  1. 浅析notifyDataSetChanged执行流程+一个界面不刷新问题解决

    RecyclerView Adapter 调用 notifyDataSetChanged后执行流程 因为遇到了一个问题调用notifyDataSetChanged不刷新界面,但是滑动一下屏幕,界面就刷 ...

  2. MyBatis系列之浅谈SQL执行流程分析

    目录 独立使用Mybatis Mybatis执行流程 SqlSessionFactory\SqlSession MapperProxy Excutor 独立使用Mybatis 这篇文章主要以分析Myb ...

  3. djangorestframework源码分析1:generics中的view执行流程

    djangorestframework源码分析 本文环境python3.5.2,djangorestframework (3.5.1)系列 djangorestframework源码分析-generi ...

  4. 深入浅出Mybatis系列(十)---SQL执行流程分析(源码篇)

    原文地址:http://www.cnblogs.com/dongying/p/4142476.html 最近太忙了,一直没时间继续更新博客,今天忙里偷闲继续我的Mybatis学习之旅.在前九篇中,介绍 ...

  5. bs架构与cs架构的区别_Oracle vs Mysql--架构、sql查询执行流程及SQL解析顺序区别说明...

    概述 之前分享的主要是Oracle上的一些内容,那么mysql又有哪些地方不一样呢?下面从MySQL总体架构.sql查询执行流程和语句执行顺序来看一下.. 01 架构总览 下面看一下mysql的架构图 ...

  6. MyBatis 的执行流程,写得太好了!

    来源 | blog.csdn.net/zwx900102/article/details/108455514 MyBatis可能很多人都一直在用,但是MyBatis的SQL执行流程可能并不是所有人都清 ...

  7. mybatisplus执行sql语句_[MySQL]sql语句的执行流程

    此篇极客时间专栏<MySQL实战45讲>笔记,文中部分图文来自该专栏. MySQL的执行流程示意图: 大体来说,MySQL可以分为Server层和存储引擎层两部分. Server层包括连接 ...

  8. oracle和mysql文件怎么打开_Oracle vs Mysql--架构、sql查询执行流程及SQL解析顺序区别说明-sql文件怎么打开...

    概述 之前分享的主要是Oracle上的一些内容,那么mysql又有哪些地方不一样呢?下面从MySQL总体架构.sql查询执行流程和语句执行顺序来看一下.. 01 架构总览 下面看一下mysql的架构图 ...

  9. 讲mysql执行流程书籍_MySQL 基础架构 1. 一条SQL查询语句的执行过程(个人学习笔记)...

    MySQL的逻辑架构图: MySQL 大体分为 "server 层" 和 "存储引擎层" 两部分: Server 层 包括 连接器.查询缓存.分析器.优化器.执 ...

最新文章

  1. 比特币现金(BCH)将在2018年占据主导地位
  2. 使用Tensorflow操作MNIST数据
  3. Netty 高性能之道 - Recycler 对象池的复用
  4. Arthas 定位 Dubbo 手动注册 Eureka 异常
  5. RIP学习---网络工程
  6. android学汇资料总整理
  7. 2016 中国互联网仿冒态势分析报告
  8. Flink on Zeppelin 流计算处理最佳实践
  9. 10个性鼠标指针主题包_游戏鼠标推荐
  10. HDU6025 Coprime Sequence —— 前缀和 后缀和
  11. Linux SHELL 命令入门题目(一)
  12. 在线教学、视频会议 Webus Fox(1)文本、语音、视频聊天及电子白板基本用法...
  13. 线性代数(六)正交性
  14. 使用MarkDown,编写html格式的项目帮助手册(含左侧目录树)
  15. Protel99se中文版PCB负片输出
  16. Scrum板与Kanban如何抉择?prwnfivgd板与按照pgvhzd
  17. LOW逼三人组(二)----选择排序算法
  18. 三端稳压管反向击穿情况及分析与防护措施
  19. 理解SPI/Dual SPI/Quad SPI/QPI之间的区别
  20. 木马是如何编写的(一)

热门文章

  1. 斩获BAT-offer大神的两万字面试干货分享
  2. wx:if判断是否为空
  3. 腰椎病有哪些预防措施
  4. 微信公众号开发之微信公众平台与公众号第三方平台区别
  5. 禁止Windows更新自动安装驱动程序
  6. 把冰糖葫芦做成大产业,她不过比别人多了一点“心”
  7. vsCode设置文件注释和函数注释使用koroFileHeader插件
  8. 从无到强,阿里云的9年就是中国云计算产业的9年
  9. 荣耀平板v6能升级鸿蒙系统,华为终于“出手”,这50款机型可首批升级鸿蒙系统,花粉有福了...
  10. 数论概论读书笔记 20.模p平方剩余