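ClickHouse-06-Interpreters Module
Module Overview
Responsibilities
The Interpreters module is ClickHouse's query interpretation layer. It is responsible for:
- Receiving the AST produced by Parsers
- Performing semantic analysis and type checking
- Building the query execution plan (QueryPlan)
- Managing the query context (Context)
- Coordinating resources (storage, functions, access rights, etc.)
- Producing the query execution pipeline (QueryPipeline)
Input/Output
Input
- ASTPtr (abstract syntax tree)
- ContextPtr (query context)
- SelectQueryOptions (query options)
Output
- BlockIO (input/output streams)
- QueryPlan (query plan)
- QueryPipeline (execution pipeline)
Upstream and Downstream Dependencies
Upstream:
- Server (receives query requests)
- Parsers (provides the AST)
Downstream:
- Storages (read/write data)
- Functions (function calls)
- Processors (pipeline execution)
- Access (access checks)
Lifecycle
Receive AST → create Interpreter → semantic analysis → build QueryPlan → generate Pipeline → execute → return results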
模块架构图
flowchart TB
subgraph Interpreters["Interpreters Module"]
subgraph Core["Core Components"]
IInterpreter[IInterpreter<br/>interpreter interface]
Context[Context<br/>query context]
ExpressionAnalyzer[ExpressionAnalyzer<br/>expression analyzer]
TreeRewriter[TreeRewriter<br/>AST rewriter]
end
subgraph QueryInterpreters["Query Interpreters"]
InterpreterSelectQuery[InterpreterSelectQuery<br/>SELECT interpreter]
InterpreterInsertQuery[InterpreterInsertQuery<br/>INSERT interpreter]
InterpreterCreateQuery[InterpreterCreateQuery<br/>CREATE interpreter]
InterpreterAlterQuery[InterpreterAlterQuery<br/>ALTER interpreter]
InterpreterDropQuery[InterpreterDropQuery<br/>DROP interpreter]
end
subgraph Analysis["Analysis Components"]
ActionsVisitor[ActionsVisitor<br/>visitor pattern]
JoinedTables[JoinedTables<br/>JOIN analysis]
SubqueryForSet[SubqueryForSet<br/>subquery analysis]
end
subgraph Planning["Planning Components"]
QueryPlan[QueryPlan<br/>query plan]
QueryPlanOptimizations[Optimization rules]
end
subgraph Resources["Resource Management"]
DatabaseCatalog[DatabaseCatalog<br/>database catalog]
TableFunctions[TableFunctions<br/>table functions]
FunctionFactory[FunctionFactory<br/>function factory]
AccessControl[AccessControl<br/>access control]
end
end
AST[AST] --> IInterpreter
IInterpreter --> Context
Context --> Resources
QueryInterpreters -. implements .-> IInterpreter
InterpreterSelectQuery --> ExpressionAnalyzer
ExpressionAnalyzer --> TreeRewriter
ExpressionAnalyzer --> Analysis
InterpreterSelectQuery --> Planning
QueryPlan --> QueryPlanOptimizations
QueryPlan --> Pipeline[QueryPipeline]
Pipeline --> Processors[Processors module]
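Architecture Notes
Overview of the Diagram
The Interpreters module uses a layered design. IInterpreter is the base class of all interpreters, and each kind of SQL statement has its own interpreter. Context runs through the whole interpretation process and manages all query-related state and resources. ExpressionAnalyzer and TreeRewriter handle semantic analysis and AST rewriting. The result is a QueryPlan, which is optimized and then converted into a QueryPipeline for Processors to execute.
Key Fields and Interfaces
IInterpreter Interface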
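class IInterpreter {
public:
    virtual ~IInterpreter() = default;
    // Execute the query and return its input/output streams
    virtual BlockIO execute() = 0;
    // Build the query plan (simplified: in the source tree this hook is declared
    // on IInterpreterUnionOrSelectQuery rather than on IInterpreter itself)
    virtual void buildQueryPlan(QueryPlan & query_plan);
    // Whether the query bypasses quotas
    virtual bool ignoreQuota() const { return false; }
    // Whether the query bypasses limits
    virtual bool ignoreLimits() const { return false; }
    // Whether the statement can run inside a transaction
    virtual bool supportsTransactions() const { return false; }
};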
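Because each kind of SQL statement gets its own interpreter, the entry point only has to pick the right subclass and call `execute()` through the base interface. The following is a self-contained toy sketch of that dispatch; `QueryKind`, the `Toy*Interpreter` classes, and `makeInterpreter` are hypothetical stand-ins rather than ClickHouse's real `InterpreterFactory`, and `execute()` returns a string instead of a `BlockIO`.

```cpp
#include <cassert>
#include <memory>
#include <stdexcept>
#include <string>

// Hypothetical stand-in for the AST node type that decides which interpreter runs
enum class QueryKind { Select, Insert, Create };

// Toy version of the interface: execute() returns a description instead of a BlockIO
class IInterpreter {
public:
    virtual ~IInterpreter() = default;
    virtual std::string execute() = 0;
    // Defaults mirror the interface above: statements respect quotas and limits
    virtual bool ignoreQuota() const { return false; }
    virtual bool ignoreLimits() const { return false; }
};

class ToySelectInterpreter : public IInterpreter {
public:
    std::string execute() override { return "pull pipeline"; }
};

class ToyInsertInterpreter : public IInterpreter {
public:
    std::string execute() override { return "push pipeline"; }
};

class ToyCreateInterpreter : public IInterpreter {
public:
    std::string execute() override { return "create table"; }
    // In this toy, the DDL interpreter opts out of limits by overriding the default
    bool ignoreLimits() const override { return true; }
};

// Dispatch on the statement kind, as a factory would do on the AST node type
std::unique_ptr<IInterpreter> makeInterpreter(QueryKind kind) {
    switch (kind) {
        case QueryKind::Select: return std::make_unique<ToySelectInterpreter>();
        case QueryKind::Insert: return std::make_unique<ToyInsertInterpreter>();
        case QueryKind::Create: return std::make_unique<ToyCreateInterpreter>();
    }
    throw std::logic_error("unknown query kind");
}
```

The caller never names a concrete interpreter type, which is what lets new statement kinds be added by registering one more subclass.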
Context Class (simplified)
class Context : public std::enable_shared_from_this<Context> {
public:
    // Global (process-wide) context
    static ContextMutablePtr createGlobal(ContextSharedPart * shared);
    // Create session/query contexts
    ContextMutablePtr createSession();
    ContextMutablePtr createQueryContext();
    // Database access
    DatabasePtr getDatabase(const String & database_name) const;
    StoragePtr getTable(const StorageID & table_id) const;
    // Settings management
    const Settings & getSettings() const;
    void setSettings(const Settings & settings_);
    // Access control
    std::shared_ptr<const ContextAccess> getAccess() const;
    void checkAccess(const AccessFlags & flags, const StorageID & table_id) const;
    // Functions and aggregate functions
    const FunctionFactory & getFunctionFactory() const;
    const AggregateFunctionFactory & getAggregateFunctionFactory() const;
    // Process list
    ProcessList & getProcessList();
    QueryStatusPtr getProcessListElement() const;
    // Temporary tables
    void addExternalTable(const String & table_name, TemporaryTableHolder && temporary_table);
    StorageID tryResolveStorageID(StorageID storage_id) const;
    // Query information
    String getCurrentDatabase() const;
    String getCurrentQueryId() const;
    ClientInfo & getClientInfo();
private:
    ContextSharedPart * shared;           // process-wide shared data, same object at every level
    Settings settings;                    // query settings
    ClientInfo client_info;               // client information
    String current_database;              // current database
    String current_query_id;              // current query ID
    // Caches and resources
    std::shared_ptr<MarkCache> mark_cache;
    std::shared_ptr<UncompressedCache> uncompressed_cache;
    // Temporary tables
    mutable std::map<String, TemporaryTableHolder> external_tables_mapping;
};
ExpressionAnalyzer Class
class ExpressionAnalyzer {
public:
    ExpressionAnalyzer(
        const ASTPtr & query_,
        const TreeRewriterResultPtr & syntax_analyzer_result_,
        ContextPtr context_);
    // Actions to compute before aggregation
    ExpressionActionsPtr getActionsBeforeAggregation();
    // GROUP BY keys
    Names getAggregationKeys() const;
    // Aggregate function descriptions
    AggregateDescriptions getAggregateDescriptions() const;
    // JOIN analysis
    const JoinedTables & analyzedJoin() const { return *joined_tables; }
    // PREWHERE information
    PrewhereInfoPtr getPrewhereInfo();
    // Actions that compute the WHERE expression
    ExpressionActionsPtr getActionsForWhere();
private:
    ASTPtr query;
    TreeRewriterResultPtr syntax_analyzer_result;
    ContextPtr context;
    std::unique_ptr<JoinedTables> joined_tables;
    // Analysis results
    AggregateDescriptions aggregate_descriptions;
    Names aggregation_keys;
};
InterpreterSelectQuery Class
class InterpreterSelectQuery : public IInterpreterUnionOrSelectQuery {
public:
    InterpreterSelectQuery(
        const ASTPtr & query_ptr_,
        const ContextPtr & context_,
        const SelectQueryOptions & options_,
        const Names & required_result_column_names_ = Names{});
    // Execute the query
    BlockIO execute() override;
    // Build the query plan
    void buildQueryPlan(QueryPlan & query_plan) override;
    // Header (sample Block) of the result
    Block getSampleBlock();
private:
    ASTPtr query_ptr;
    ContextPtr context;
    SelectQueryOptions options;
    Names required_result_column_names;
    // Analysis results
    TreeRewriterResultPtr syntax_analyzer_result;
    ExpressionAnalyzerPtr query_analyzer;
    // Storage
    StoragePtr storage;
    TableLockHolder table_lock;
    StorageMetadataPtr metadata_snapshot;
    // Steps that append to the query plan
    void executeImpl(QueryPlan & query_plan);
    void executeFrom(QueryPlan & query_plan);
    void executeWhere(QueryPlan & query_plan);
    void executeAggregation(QueryPlan & query_plan);
    void executeOrder(QueryPlan & query_plan);
    void executePreLimit(QueryPlan & query_plan);
    void executeLimit(QueryPlan & query_plan);
    void executeProjection(QueryPlan & query_plan);
};
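Boundary Conditions
Query Complexity Limits
- Maximum number of JOINed tables: max_join_tables (61)
- Maximum subquery depth: max_subquery_depth (100)
- Maximum number of aggregate functions: max_aggregate_functions (1000)
Resource Limits
- Maximum memory usage: max_memory_usage
- Maximum execution time: max_execution_time
- Maximum rows read: max_rows_to_read
Concurrency Control
- Maximum concurrent queries: max_concurrent_queries (a server-level setting in the server configuration)
- Maximum threads per query: max_threads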
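Exceptions and Fallback
Semantic Errors
- UNKNOWN_TABLE: the table does not exist
- UNKNOWN_IDENTIFIER: the column (identifier) does not exist
- TYPE_MISMATCH: type mismatch
- SYNTAX_ERROR: syntax error (raised by Parsers)
Resource Limit Errors
- MEMORY_LIMIT_EXCEEDED: memory limit exceeded
- TIMEOUT_EXCEEDED: execution time limit exceeded
- TOO_MANY_ROWS: row limit exceeded
Access Errors
- ACCESS_DENIED: access denied
- NOT_ENOUGH_PRIVILEGES: insufficient privileges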
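The resource-limit errors above are raised when a running query crosses one of the thresholds listed under Boundary Conditions. A minimal sketch of that check, assuming a hypothetical `ToyLimits` struct whose fields are named after the ClickHouse settings, with 0 meaning "no limit"; this is an illustration, not ClickHouse's own limit-checking code.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>
#include <string>

// Hypothetical limits; field names follow the ClickHouse settings, 0 = unlimited
struct ToyLimits {
    uint64_t max_rows_to_read = 0;
    uint64_t max_memory_usage = 0;  // bytes
    double max_execution_time = 0;  // seconds
};

// Snapshot of what the running query has consumed so far
struct ToyProgress {
    uint64_t rows_read = 0;
    uint64_t memory_bytes = 0;
    double elapsed_seconds = 0;
};

// Returns the name of the error code that would be raised, or nothing if within limits
std::optional<std::string> checkLimits(const ToyLimits & limits, const ToyProgress & progress) {
    if (limits.max_memory_usage && progress.memory_bytes > limits.max_memory_usage)
        return "MEMORY_LIMIT_EXCEEDED";
    if (limits.max_execution_time > 0 && progress.elapsed_seconds > limits.max_execution_time)
        return "TIMEOUT_EXCEEDED";
    if (limits.max_rows_to_read && progress.rows_read > limits.max_rows_to_read)
        return "TOO_MANY_ROWS";
    return std::nullopt;
}
```

In ClickHouse several of these limits also have a companion overflow-mode setting (for example `read_overflow_mode`) that can stop the query early with a partial result instead of throwing.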
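Performance and Capacity Assumptions
Interpretation Performance (order-of-magnitude estimates)
- Simple queries: < 1 ms
- Complex queries (multiple JOINs, subqueries): 1-10 ms
- Very complex queries: 10-100 ms
Memory Usage
- Context: a few KB to a few MB
- QueryPlan: a few KB to a few MB
- Temporary data (sets, hash tables): can be very large
Optimization Capabilities
- Predicate pushdown: reduces the amount of data read
- Projection pushdown: reads only the columns that are needed
- PREWHERE optimization: reads the filter columns first and reads the remaining columns only for rows that pass
- JOIN reordering: optimizes the JOIN order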
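Projection pushdown boils down to computing the set of columns that any part of the query references and asking the storage for only those. A sketch under simplifying assumptions: each clause is already reduced to the list of column names it mentions (the hypothetical `ToyQuery`); in ClickHouse this set is derived from the analyzed AST during TreeRewriter's used-column collection.

```cpp
#include <cassert>
#include <initializer_list>
#include <set>
#include <string>
#include <vector>

// Hypothetical, pre-digested query: the columns each clause references
struct ToyQuery {
    std::vector<std::string> select_columns;
    std::vector<std::string> where_columns;
    std::vector<std::string> group_by_columns;
    std::vector<std::string> order_by_columns;
};

// Columns the storage must read, in table order; unused columns are never read
std::vector<std::string> requiredColumns(const ToyQuery & q, const std::vector<std::string> & table_columns) {
    std::set<std::string> used;
    for (const auto * clause : {&q.select_columns, &q.where_columns, &q.group_by_columns, &q.order_by_columns})
        used.insert(clause->begin(), clause->end());

    std::vector<std::string> result;
    for (const auto & column : table_columns)
        if (used.count(column))
            result.push_back(column);
    return result;
}
```

For a wide table this is where a column-oriented engine saves most of its I/O: columns such as a large `payload` are simply never opened.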
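Version Compatibility and Evolution
New Features
- Experimental features are enabled through settings (the allow_experimental_* family); SelectQueryOptions carries per-interpreter options such as the subquery depth and the target processing stage
- Older query syntax remains supported (backward compatibility)
- Multiple versions of the query plan are supported
Optimization Upgrades
- New optimization rules do not break existing queries
- Specific optimizations can be disabled through Settings
Core API Details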
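API 1: InterpreterSelectQuery::execute - execute a SELECT
Basic Information
- Name: InterpreterSelectQuery::execute()
- Purpose: executes a SELECT query and returns the result stream
- Idempotency: not idempotent (it may have side effects, such as populating caches)
Implementation Flow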
BlockIO InterpreterSelectQuery::execute()
{
    // 1) Build the query plan
    QueryPlan query_plan;
    buildQueryPlan(query_plan);
    // 2) Optimize the query plan (simplified: recent versions run the
    //    optimization passes as part of building the pipeline)
    QueryPlanOptimizationSettings optimization_settings;
    optimization_settings.optimize_plan = context->getSettings().query_plan_enable_optimizations;
    QueryPlanOptimizer::optimize(query_plan, optimization_settings);
    // 3) Generate the pipeline
    QueryPipelineBuilder pipeline_builder = query_plan.buildQueryPipeline();
    // 4) Configure the pipeline
    pipeline_builder.setMaxThreads(context->getSettings().max_threads);
    // 5) Wrap the pipeline in a BlockIO
    BlockIO res;
    res.pipeline = QueryPipelineBuilder::getPipeline(std::move(pipeline_builder));
    res.pipeline.setProgressCallback(context->getProgressCallback());
    return res;
}
void InterpreterSelectQuery::buildQueryPlan(QueryPlan & query_plan)
{
    // 1) FROM: read from the table
    executeFrom(query_plan);
    // 2) PREWHERE: early filtering (usually handed to the storage read via query_info)
    if (hasPrewhere())
        executePrewhere(query_plan);
    // 3) WHERE: filtering
    if (hasWhere())
        executeWhere(query_plan);
    // 4) GROUP BY: aggregation
    if (hasAggregation())
    {
        executeBeforeAggregation(query_plan);
        executeAggregation(query_plan);
        // HAVING is optional
        if (hasHaving())
            executeHaving(query_plan);
    }
    // 5) ORDER BY: sorting
    if (hasOrderBy())
        executeOrder(query_plan);
    // 6) LIMIT
    if (hasLimit())
    {
        executePreLimit(query_plan);
        executeLimit(query_plan);
    }
    // 7) SELECT list: projection
    executeProjection(query_plan);
}
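The order in which buildQueryPlan stacks steps can be reproduced with a toy plan that just records step names, with boolean flags standing in for hasWhere(), hasAggregation() and so on. `ToySelect` and `buildToyPlan` are hypothetical; real steps are IQueryPlanStep objects that each transform the header of the data stream.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical flags standing in for hasPrewhere(), hasWhere(), ...
struct ToySelect {
    bool prewhere = false;
    bool where = false;
    bool group_by = false;
    bool having = false;
    bool order_by = false;
    bool limit = false;
};

// Appends step descriptions in the same order as buildQueryPlan above
std::vector<std::string> buildToyPlan(const ToySelect & q) {
    std::vector<std::string> steps{"ReadFromStorage"};
    if (q.prewhere) steps.push_back("Filter (PREWHERE)");
    if (q.where) steps.push_back("Filter (WHERE)");
    if (q.group_by) {
        steps.push_back("Expression (before GROUP BY)");
        steps.push_back("Aggregating");
        if (q.having) steps.push_back("Filter (HAVING)");
    }
    if (q.order_by) steps.push_back("Sorting");
    if (q.limit) {
        steps.push_back("PreLimit");
        steps.push_back("Limit");
    }
    steps.push_back("Expression (projection)");
    return steps;
}
```

On a real server, `EXPLAIN PLAN SELECT ...` prints the actual step tree, which is the easiest way to compare a concrete query against this simplified ordering.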
executeFrom - reading data from the table
void InterpreterSelectQuery::executeFrom(QueryPlan & query_plan)
{
    // 1) Determine the table to read from
    auto & select = query_ptr->as<ASTSelectQuery &>();
    // A SELECT without FROM (e.g. SELECT 1) is not an error: it reads from system.one.
    // getTableIdFromAST is a hypothetical helper standing in for the real table resolution.
    StorageID table_id = select.tables ? getTableIdFromAST(select, context) : StorageID("system", "one");
    // 2) Resolve the storage and take a metadata snapshot
    storage = DatabaseCatalog::instance().getTable(table_id, context);
    metadata_snapshot = storage->getInMemoryMetadataPtr();
    // 3) Build SelectQueryInfo
    SelectQueryInfo query_info;
    query_info.query = query_ptr;
    query_info.syntax_analyzer_result = syntax_analyzer_result;
    query_info.sets = query_analyzer->getPreparedSets();
    // 4) Check access rights
    context->checkAccess(AccessType::SELECT, table_id);
    // 5) Pass PREWHERE info so the storage can filter while reading
    if (auto prewhere_info = query_analyzer->getPrewhereInfo())
        query_info.prewhere_info = prewhere_info;
    // 6) Call the storage's read method
    Pipe pipe = storage->read(
        required_columns,
        metadata_snapshot,
        query_info,
        context,
        QueryProcessingStage::FetchColumns,
        max_block_size,
        max_streams
    );
    // 7) Add the read step to the query plan
    auto read_step = std::make_unique<ReadFromStorageStep>(
        std::move(pipe),
        storage->getName()
    );
    query_plan.addStep(std::move(read_step));
}
executeWhere - applying the WHERE filter
void InterpreterSelectQuery::executeWhere(QueryPlan & query_plan)
{
    // 1) Build the actions that compute the WHERE expression
    auto where_step_actions = query_analyzer->getActionsForWhere();
    // 2) Name of the column that holds the filter result
    String filter_column_name = where_step_actions->getSampleBlock().getByPosition(
        where_step_actions->getResultPosition()).name;
    // 3) Create the FilterStep; rows where the filter column is 0 are dropped
    auto where_step = std::make_unique<FilterStep>(
        query_plan.getCurrentDataStream(),
        where_step_actions,
        filter_column_name,
        true // remove_filter_column: drop the filter column after filtering
    );
    where_step->setStepDescription("WHERE");
    query_plan.addStep(std::move(where_step));
}
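FilterStep evaluates the WHERE expression into a filter column (UInt8 in ClickHouse, non-zero meaning "keep") and drops the rows where it is zero; with remove_filter_column = true the filter column itself is then removed from the output. A columnar toy sketch, assuming a block is simply a map from column name to Int64 values (`ToyBlock` and `applyFilter` are hypothetical):

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <string>
#include <vector>

// Hypothetical columnar block: every column has the same number of rows
using ToyBlock = std::map<std::string, std::vector<int64_t>>;

// Keeps rows whose filter value is non-zero; optionally drops the filter column
ToyBlock applyFilter(const ToyBlock & block, const std::string & filter_column, bool remove_filter_column) {
    const auto & filter = block.at(filter_column);
    ToyBlock result;
    for (const auto & [name, values] : block) {
        if (remove_filter_column && name == filter_column)
            continue;
        auto & out = result[name];
        for (size_t row = 0; row < values.size(); ++row)
            if (filter[row] != 0)
                out.push_back(values[row]);
    }
    return result;
}
```

Filtering column by column with the same mask keeps the surviving rows aligned across columns, which is why the filter is materialized as a column first.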
executeAggregation - aggregation
void InterpreterSelectQuery::executeAggregation(QueryPlan & query_plan)
{
    // 1) Fetch the aggregation keys and aggregate functions
    Names keys = query_analyzer->getAggregationKeys();
    AggregateDescriptions aggregates = query_analyzer->getAggregateDescriptions();
    // 2) Aggregator parameters: limits, two-level threshold, spilling to disk
    Aggregator::Params params(
        keys,
        aggregates,
        overflow_row,
        settings.max_rows_to_group_by,
        settings.group_by_overflow_mode,
        settings.group_by_two_level_threshold,
        settings.group_by_two_level_threshold_bytes,
        settings.max_bytes_before_external_group_by,
        settings.empty_result_for_aggregation_by_empty_set,
        context->getTemporaryVolume(),
        settings.max_threads,
        settings.min_free_disk_space_for_temporary_data
    );
    // 3) Create the AggregatingStep
    auto aggregating_step = std::make_unique<AggregatingStep>(
        query_plan.getCurrentDataStream(),
        params,
        group_by_info,
        final,
        settings.max_block_size,
        settings.aggregation_in_order_max_block_bytes,
        merge_threads,
        temporary_data_merge_threads,
        false // storage_has_evenly_distributed_read
    );
    aggregating_step->setStepDescription("Aggregating");
    query_plan.addStep(std::move(aggregating_step));
    // 4) WITH TOTALS: add a TotalsHavingStep that produces the totals row
    if (query_ptr->as<ASTSelectQuery &>().group_by_with_totals)
    {
        auto totals_having_step = std::make_unique<TotalsHavingStep>(
            query_plan.getCurrentDataStream(),
            overflow_row,
            nullptr // having_actions (none in this simplified version)
        );
        query_plan.addStep(std::move(totals_having_step));
    }
}
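At its core, AggregatingStep runs a hash aggregation: each row's key selects an entry in a hash table, and that entry's aggregate states are updated in place. A single-threaded sketch for `SELECT key, sum(value), count() ... GROUP BY key WITH TOTALS`, using hypothetical names; ClickHouse's Aggregator additionally picks a specialized hash-table variant per key type, can switch to two-level tables (group_by_two_level_threshold), and can spill to disk (max_bytes_before_external_group_by).

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <unordered_map>
#include <vector>

// Aggregate states for sum(value) and count()
struct ToyState {
    int64_t sum = 0;
    uint64_t count = 0;
};

// Hash aggregation over two parallel columns: GROUP BY key
std::unordered_map<std::string, ToyState> aggregate(const std::vector<std::string> & keys,
                                                    const std::vector<int64_t> & values) {
    std::unordered_map<std::string, ToyState> table;
    for (size_t row = 0; row < keys.size(); ++row) {
        ToyState & state = table[keys[row]];  // inserts a zero state the first time a key is seen
        state.sum += values[row];
        state.count += 1;
    }
    return table;
}

// WITH TOTALS in this sketch: merge every group's state into one extra row
ToyState totals(const std::unordered_map<std::string, ToyState> & table) {
    ToyState total;
    for (const auto & [key, state] : table) {
        total.sum += state.sum;
        total.count += state.count;
    }
    return total;
}
```

Keeping per-group states rather than raw rows is what makes merging cheap, both for the totals row and when partial results from several threads are combined.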
executeOrder - sorting
void InterpreterSelectQuery::executeOrder(QueryPlan & query_plan)
{
    auto & select = query_ptr->as<ASTSelectQuery &>();
    // 1) Turn the ORDER BY clause into a sort description
    SortDescription order_descr = getSortDescription(select, context);
    // 2) Pass the LIMIT down (0 = no limit) so that only the top rows must be fully sorted
    UInt64 limit = 0;
    if (select.limitLength())
        limit = getLimitForSorting(select, context);
    // 3) Create the SortingStep
    auto sorting_step = std::make_unique<SortingStep>(
        query_plan.getCurrentDataStream(),
        order_descr,
        limit,
        SortingStep::Settings(settings)
    );
    sorting_step->setStepDescription("Sorting");
    query_plan.addStep(std::move(sorting_step));
}
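Passing the LIMIT into SortingStep matters because only the first `limit` rows of the sorted output are needed, so a partial (top-N) sort can replace a full sort. A sketch using `std::partial_sort`, which performs roughly N·log(limit) comparisons; the `ToyRow` type and the descending direction are assumptions, and ClickHouse itself sorts individual blocks and merges them rather than calling `std::partial_sort`.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

struct ToyRow {
    std::string name;
    int64_t score;
};

// ORDER BY score DESC LIMIT limit: only the first `limit` positions are fully ordered
std::vector<ToyRow> topN(std::vector<ToyRow> rows, size_t limit) {
    auto by_score_desc = [](const ToyRow & a, const ToyRow & b) { return a.score > b.score; };
    if (limit == 0 || limit >= rows.size()) {
        // limit = 0 means "no LIMIT", as in executeOrder above: fall back to a full sort
        std::sort(rows.begin(), rows.end(), by_score_desc);
        return rows;
    }
    auto middle = rows.begin() + static_cast<std::ptrdiff_t>(limit);
    std::partial_sort(rows.begin(), middle, rows.end(), by_score_desc);
    rows.resize(limit);
    return rows;
}
```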
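executeLimit - LIMIT
void InterpreterSelectQuery::executeLimit(QueryPlan & query_plan)
{
    auto & select = query_ptr->as<ASTSelectQuery &>();
    // 1) Compute offset and length
    UInt64 offset = 0;
    UInt64 length = 0;
    if (select.limitOffset())
        offset = getLimitOffset(select, context);
    if (select.limitLength())
        length = getLimitLength(select, context);
    // 2) With WITH TOTALS and no ORDER BY, keep reading to the end so totals cover all rows
    bool always_read_till_end = select.group_by_with_totals && !select.orderBy();
    // 3) WITH TIES needs the ORDER BY description to recognise rows equal to the last one
    SortDescription order_descr;
    if (select.limit_with_ties)
        order_descr = getSortDescription(select, context);
    // 4) Create the LimitStep
    auto limit_step = std::make_unique<LimitStep>(
        query_plan.getCurrentDataStream(),
        length,
        offset,
        always_read_till_end,
        select.limit_with_ties,
        order_descr
    );
    limit_step->setStepDescription("LIMIT");
    query_plan.addStep(std::move(limit_step));
}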
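LimitStep skips `offset` rows and then passes through `length` rows; with `WITH TIES` it keeps emitting rows that compare equal to the last row under the ORDER BY key, which is why that variant needs a sort description. A sketch over already sorted integer keys (`applyLimit` and the integer keys are assumptions made for illustration):

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Input must already be sorted by the ORDER BY key; returns the rows LIMIT keeps
std::vector<int64_t> applyLimit(const std::vector<int64_t> & sorted_keys,
                                size_t offset, size_t length, bool with_ties) {
    std::vector<int64_t> result;
    if (offset >= sorted_keys.size())
        return result;
    size_t end = std::min(sorted_keys.size(), offset + length);
    result.assign(sorted_keys.begin() + static_cast<std::ptrdiff_t>(offset),
                  sorted_keys.begin() + static_cast<std::ptrdiff_t>(end));
    // WITH TIES: also keep the following rows that equal the last row taken
    if (with_ties && !result.empty())
        while (end < sorted_keys.size() && sorted_keys[end] == result.back())
            result.push_back(sorted_keys[end++]);
    return result;
}
```

For keys `1, 2, 2, 2, 3, 4`, `LIMIT 2` returns `1, 2`, while `LIMIT 2 WITH TIES` returns `1, 2, 2, 2`.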
Sequence Diagram
sequenceDiagram
autonumber
participant Client as Client
participant Interp as InterpreterSelectQuery
participant Analyzer as ExpressionAnalyzer
participant Plan as QueryPlan
participant Storage as IStorage
participant Opt as Optimizer
participant Pipeline as QueryPipeline
Client->>Interp: execute()
Interp->>Interp: buildQueryPlan()
Interp->>Analyzer: create ExpressionAnalyzer
Analyzer->>Analyzer: analyze the SELECT list
Analyzer->>Analyzer: analyze the WHERE expression
Analyzer->>Analyzer: analyze GROUP BY
Analyzer-->>Interp: analysis results
Interp->>Plan: executeFrom()
Plan->>Storage: read(columns, query_info)
Storage-->>Plan: Pipe
Plan->>Plan: add ReadFromStorageStep
Interp->>Plan: executePrewhere()
Plan->>Plan: add FilterStep (PREWHERE)
Interp->>Plan: executeWhere()
Plan->>Plan: add FilterStep (WHERE)
Interp->>Plan: executeAggregation()
Plan->>Plan: add AggregatingStep
Plan->>Plan: add TotalsHavingStep (WITH TOTALS only)
Interp->>Plan: executeOrder()
Plan->>Plan: add SortingStep
Interp->>Plan: executeLimit()
Plan->>Plan: add LimitStep
Interp->>Plan: executeProjection()
Plan->>Plan: add ExpressionStep (projection)
Interp->>Opt: optimize(query_plan)
loop optimization rules
Opt->>Opt: predicate pushdown
Opt->>Opt: projection pushdown
Opt->>Opt: merge steps
Opt->>Opt: remove redundant steps
end
Opt-->>Interp: optimized QueryPlan
Interp->>Pipeline: buildQueryPipeline()
Pipeline->>Pipeline: create Processors
Pipeline->>Pipeline: connect Ports
Pipeline-->>Interp: QueryPipeline
Interp-->>Client: BlockIO
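API 2: InterpreterInsertQuery::execute - execute an INSERT
Basic Information
- Name: InterpreterInsertQuery::execute()
- Purpose: executes an INSERT query and returns the write stream
- Idempotency: not idempotent (it modifies data)
Implementation Flow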
BlockIO InterpreterInsertQuery::execute()
{
    auto & query = query_ptr->as<ASTInsertQuery &>();
    // 1) Resolve the target table
    StoragePtr table = getTable(query);
    // 2) Take a snapshot of the table metadata
    auto metadata_snapshot = table->getInMemoryMetadataPtr();
    // 3) Check INSERT privileges before doing any work
    context->checkAccess(AccessType::INSERT, table->getStorageID());
    // 4) Header of the inserted data. getSampleBlock uses the explicit column list
    //    (INSERT INTO t (a, b) ...) if present, otherwise all ordinary columns:
    //    metadata_snapshot->getColumns().getOrdinary().getNames()
    Block sample_block = getSampleBlock(query, table, metadata_snapshot, context);
    // 5) The storage-specific sink that writes blocks into the table
    SinkToStoragePtr sink = table->write(query_ptr, metadata_snapshot, context);
    BlockIO res;
    if (query.select)
    {
        // 6a) INSERT ... SELECT: build the SELECT pipeline and attach the sink to it
        InterpreterSelectWithUnionQuery select_interpreter(query.select, context, SelectQueryOptions());
        QueryPipelineBuilder builder = select_interpreter.buildQueryPipeline();
        // Convert types; SELECT columns map to the target columns by position, not by name
        builder.addSimpleTransform([&](const Block & header)
        {
            return std::make_shared<ConvertingTransform>(
                header, sample_block, ConvertingTransform::MatchColumnsMode::Position);
        });
        // One sink instance can serve only one stream, so merge the streams first
        builder.resize(1);
        builder.setSinks([&](const Block & header, QueryPipelineBuilder::StreamType type) -> ProcessorPtr
        {
            // Totals/extremes streams are not inserted into the table
            if (type != QueryPipelineBuilder::StreamType::Main)
                return std::make_shared<EmptySink>(header);
            return sink;
        });
        res.pipeline = QueryPipelineBuilder::getPipeline(std::move(builder));
    }
    else
    {
        // 6b) INSERT ... VALUES / FORMAT: a push-based pipeline; blocks parsed from
        //     the client's data are pushed into the sink by the caller
        res.pipeline = QueryPipeline(Chain(std::move(sink)));
    }
    return res;
}
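Step 4 and the conversion step reduce to a mapping problem: the target columns are either the explicit list from `INSERT INTO t (a, b)` or all ordinary columns of the table, and each incoming row is spread over them. ClickHouse's INSERT documentation states that columns are mapped by their position in the SELECT clause, and that their names may differ from the table's. A toy sketch of that positional mapping, assuming string-valued columns and a hypothetical `toTableRow`; it ignores type conversion and fills omitted columns from a stand-in default value.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <map>
#include <stdexcept>
#include <string>
#include <vector>

struct ToyColumn {
    std::string name;
    std::string default_value;  // stands in for a DEFAULT expression
};

// Builds one table row from one SELECT row: values map to insert_columns by position;
// an empty insert_columns list means "all (ordinary) columns of the table"
std::vector<std::string> toTableRow(const std::vector<ToyColumn> & table,
                                    std::vector<std::string> insert_columns,
                                    const std::vector<std::string> & select_row) {
    if (insert_columns.empty())
        for (const auto & column : table)
            insert_columns.push_back(column.name);
    if (insert_columns.size() != select_row.size())
        throw std::invalid_argument("number of columns doesn't match");
    for (const auto & name : insert_columns) {
        bool known = std::any_of(table.begin(), table.end(),
                                 [&](const ToyColumn & c) { return c.name == name; });
        if (!known)
            throw std::invalid_argument("no such column in table: " + name);
    }

    std::map<std::string, std::string> provided;
    for (size_t i = 0; i < insert_columns.size(); ++i)
        provided[insert_columns[i]] = select_row[i];  // positional: SELECT names are ignored

    std::vector<std::string> row;
    for (const auto & column : table) {
        auto it = provided.find(column.name);
        row.push_back(it != provided.end() ? it->second : column.default_value);
    }
    return row;
}
```

The output is always in table order, so the sink receives blocks with the table's own header regardless of how the INSERT listed its columns.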
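API 3: Context - query context management
Basic Information
- Name: Context
- Purpose: manages all context information needed to execute a query
- Lifecycle: global → session → query
Context Hierarchy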
// Global context (process level)
ContextMutablePtr global_context = Context::createGlobal(shared_part);
global_context->makeGlobalContext();
// Session context (connection level)
ContextMutablePtr session_context = Context::createCopy(global_context);
session_context->makeSessionContext();
session_context->setUser(user);
// Query context (query level)
ContextMutablePtr query_context = Context::createCopy(session_context);
query_context->makeQueryContext();
query_context->setCurrentQueryId(query_id);
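The hierarchy relies on copy semantics: each level starts as a copy of its parent and then overrides its own fields, so a setting changed in a query context does not leak back into the session or global context, while the shared part is the same object at every level. A toy sketch of that inheritance; `ToyContext`, `ToyShared`, and this `createCopy` are hypothetical simplifications of the real classes.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <memory>
#include <string>

// Process-wide data, shared by every context (stands in for ContextSharedPart)
struct ToyShared {
    std::string server_name;
};

// Hypothetical stand-in for Context: shared part plus per-level fields
struct ToyContext {
    std::shared_ptr<ToyShared> shared;  // copied as a pointer: one object for all levels
    std::map<std::string, uint64_t> settings;
    std::string user;
    std::string query_id;

    // Child contexts start as a full copy of their parent
    static std::shared_ptr<ToyContext> createCopy(const std::shared_ptr<const ToyContext> & parent) {
        return std::make_shared<ToyContext>(*parent);
    }
};
```

Usage mirrors the three levels above: the global context is created once, a session copy sets the user, and a query copy sets the query ID and any per-query settings.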
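Resource Access Examples
// Look up a table
auto table = context->getTable(StorageID{"database", "table"});
// Function factory: sum is an aggregate function and lives in AggregateFunctionFactory,
// so an ordinary function such as plus is looked up here
auto & function_factory = context->getFunctionFactory();
auto function = function_factory.get("plus", context);
// Check access rights (several privileges can be combined)
context->checkAccess(AccessType::SELECT | AccessType::INSERT, table_id);
// Settings
context->setSetting("max_threads", Field(16));
UInt64 max_threads = context->getSettings().max_threads;
// Temporary tables: the holder is moved into the context
context->addExternalTable("temp_table", std::move(temporary_table_holder));
StorageID temp_id{"", "temp_table"};  // empty database = unqualified name
auto temp_table = context->tryResolveStorageID(temp_id);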
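When a table name has no database qualifier, resolution has to choose between a temporary (external) table and a regular table in the current database. The ClickHouse documentation for temporary tables states that a temporary table is used when a query names a table without a database and a temporary table of that name exists. A sketch of that lookup order; `ToyResolver`, its sets, and the "temporary:" prefix in the result are hypothetical.

```cpp
#include <cassert>
#include <optional>
#include <set>
#include <string>
#include <utility>

// Hypothetical resolver: returns a printable fully qualified name
struct ToyResolver {
    std::string current_database;
    std::set<std::string> temporary_tables;                // session-scoped, no database
    std::set<std::pair<std::string, std::string>> tables;  // (database, table)

    std::optional<std::string> resolve(const std::string & database, const std::string & table) const {
        if (database.empty()) {
            // Unqualified name: a temporary table shadows a regular one
            if (temporary_tables.count(table))
                return "temporary:" + table;
            if (tables.count({current_database, table}))
                return current_database + "." + table;
            return std::nullopt;
        }
        // Qualified name: temporary tables are not considered
        if (tables.count({database, table}))
            return database + "." + table;
        return std::nullopt;
    }
};
```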
Data Structure UML Diagram
classDiagram
class IInterpreter {
<<interface>>
+execute() BlockIO
+buildQueryPlan(QueryPlan&) void
+ignoreQuota() bool
+ignoreLimits() bool
}
class InterpreterSelectQuery {
-query_ptr: ASTPtr
-context: ContextPtr
-options: SelectQueryOptions
-storage: StoragePtr
-query_analyzer: ExpressionAnalyzerPtr
+execute() BlockIO
+buildQueryPlan(QueryPlan&) void
-executeFrom(QueryPlan&) void
-executeWhere(QueryPlan&) void
-executeAggregation(QueryPlan&) void
-executeOrder(QueryPlan&) void
}
class InterpreterInsertQuery {
-query_ptr: ASTPtr
-context: ContextPtr
+execute() BlockIO
+getTable(ASTInsertQuery&) StoragePtr
-getSampleBlock() Block
}
class Context {
-shared: ContextSharedPart*
-settings: Settings
-current_database: String
-current_query_id: String
+getDatabase(String) DatabasePtr
+getTable(StorageID) StoragePtr
+checkAccess(AccessFlags, StorageID) void
+getFunctionFactory() FunctionFactory&
+createQueryContext() ContextMutablePtr
}
class ExpressionAnalyzer {
-query: ASTPtr
-syntax_analyzer_result: TreeRewriterResultPtr
-context: ContextPtr
+getActionsBeforeAggregation() ExpressionActionsPtr
+getAggregationKeys() Names
+getAggregateDescriptions() AggregateDescriptions
+getActionsForWhere() ExpressionActionsPtr
}
class TreeRewriter {
+analyze(ASTPtr, Names, StoragePtr, Context) TreeRewriterResultPtr
-normalizeTree() void
-collectUsedColumns() void
-executeScalarSubqueries() void
}
class QueryPlan {
-root: QueryPlanStepPtr
-steps: vector~QueryPlanStepPtr~
+addStep(QueryPlanStepPtr) void
+getCurrentDataStream() DataStream
+buildQueryPipeline() QueryPipelineBuilder
}
class SelectQueryOptions {
+subquery_depth: size_t
+only_analyze: bool
+ignore_quota: bool
+ignore_limits: bool
+to_stage: QueryProcessingStage
}
IInterpreter <|-- InterpreterSelectQuery
IInterpreter <|-- InterpreterInsertQuery
InterpreterSelectQuery --> Context
InterpreterSelectQuery --> ExpressionAnalyzer
InterpreterSelectQuery --> QueryPlan
InterpreterSelectQuery --> SelectQueryOptions
InterpreterInsertQuery --> Context
ExpressionAnalyzer --> TreeRewriter
ExpressionAnalyzer --> Context
Practical Experience
Custom Interpreter
class MyCustomInterpreter : public IInterpreter {
public:
    MyCustomInterpreter(const ASTPtr & query_ptr_, ContextPtr context_)
        : query_ptr(query_ptr_), context(std::move(context_)) {}
    BlockIO execute() override
    {
        // Build a plan, turn it into a pipeline, and return it in a BlockIO
        QueryPlan query_plan;
        buildQueryPlan(query_plan);
        auto pipeline_builder = query_plan.buildQueryPipeline();
        BlockIO res;
        res.pipeline = QueryPipelineBuilder::getPipeline(std::move(pipeline_builder));
        return res;
    }
    void buildQueryPlan(QueryPlan & query_plan) override
    {
        // Add this statement's steps here, starting with a source step;
        // an empty plan cannot be turned into a pipeline
    }
private:
    ASTPtr query_ptr;
    ContextPtr context;
};
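Context Usage Pattern
// Create a query context from the session context
ContextMutablePtr query_context = session_context->createQueryContext();
// Set the query ID: setCurrentQueryId takes a String; an empty string
// makes the server generate a random UUID
query_context->setCurrentQueryId("");
// Adjust query settings, starting from the inherited settings: a default-constructed
// Settings object would discard the session and user settings
Settings settings = query_context->getSettings();
settings.max_threads = 8;
settings.max_memory_usage = 10_GiB;
query_context->setSettings(settings);
// Execute the query
auto interpreter = InterpreterFactory::get(ast, query_context);
BlockIO res = interpreter->execute();
// Clean up: release the context only after the pipeline has finished executing
query_context.reset();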
Expression Analysis Example
// Run TreeRewriter (syntax analysis: normalization, used-column collection, scalar subqueries)
TreeRewriterResultPtr syntax_result = TreeRewriter(context).analyze(
    query, required_result_columns, storage, storage_metadata);
// Create the ExpressionAnalyzer
ExpressionAnalyzer analyzer(query, syntax_result, context);
// Actions that compute the WHERE expression
ExpressionActionsPtr where_actions = analyzer.getActionsForWhere();
// Aggregation information
Names aggregation_keys = analyzer.getAggregationKeys();
AggregateDescriptions aggregates = analyzer.getAggregateDescriptions();
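Producing the aggregate descriptions amounts to walking the SELECT expressions, collecting every call to an aggregate function, and rejecting an aggregate nested inside another aggregate (ClickHouse reports this as ILLEGAL_AGGREGATION). A toy sketch over a hypothetical `ToyExpr` tree with a hard-coded set of aggregate names; the real analysis runs visitors over the AST and looks names up in AggregateFunctionFactory.

```cpp
#include <cassert>
#include <set>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical expression tree: a column (no args) or a function call
struct ToyExpr {
    std::string name;
    std::vector<ToyExpr> args;
    bool is_function = false;
};

// Hard-coded stand-in for an AggregateFunctionFactory lookup
bool isAggregate(const ToyExpr & e) {
    static const std::set<std::string> aggregates{"sum", "count", "avg", "min", "max"};
    return e.is_function && aggregates.count(e.name) > 0;
}

// Collects aggregate calls (by name) and rejects aggregates nested in aggregates
void collectAggregates(const ToyExpr & e, bool inside_aggregate, std::vector<std::string> & out) {
    if (isAggregate(e)) {
        if (inside_aggregate)
            throw std::logic_error("ILLEGAL_AGGREGATION: " + e.name + " inside another aggregate");
        out.push_back(e.name);
    }
    for (const auto & arg : e.args)
        collectAggregates(arg, inside_aggregate || isAggregate(e), out);
}
```

For `SELECT sum(x) + count()` the walk finds `sum` and `count` under the non-aggregate `plus`, while `sum(max(x))` is rejected.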
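Summary
The Interpreters module is the core of query execution in ClickHouse. It is responsible for:
- Semantic analysis: checking that the query is semantically correct
- Query planning: building the logical execution plan (QueryPlan)
- Context management: managing all query-related state and resources
- Resource coordination: coordinating storage, functions, access rights, and other resources
- Optimization: applying query optimization rules
Key components:
- IInterpreter: the interpreter interface, with one implementation per query type
- Context: the context that spans the entire query lifecycle
- ExpressionAnalyzer: semantic analysis of expressions
- QueryPlan: the query execution plan
The Interpreters module sits between Parsers (which supply the AST) and Processors (the execution engine), making it the hub of query processing.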