Detailed Analysis of the Core Execution Engine
1.1 The IProcessor Framework
1.1.1 IProcessor Core Interface Design
// src/Processors/IProcessor.h
class IProcessor
{
public:
    /// Processor status enum
    enum class Status
    {
        NeedData,       /// Needs more input data
        PortFull,       /// The output port is full
        Finished,       /// Processing is finished
        Ready,          /// Ready to do work
        Async,          /// Asynchronous processing in progress
        ExpandPipeline  /// The pipeline needs to be expanded
    };

protected:
    InputPorts inputs;   /// List of input ports
    OutputPorts outputs; /// List of output ports

public:
    /// Core processing method: preparation phase (cheap, non-blocking)
    virtual Status prepare() = 0;
    /// Core processing method: work phase (CPU-heavy)
    virtual void work();
    /// Asynchronous scheduling method
    virtual int schedule();
    /// Pipeline expansion method
    virtual Processors expandPipeline();
    /// Cancel processing
    virtual void cancel() noexcept;
    /// Get the processor name
    virtual String getName() const = 0;
};
Key design ideas (a minimal custom processor sketch follows this list):
- Port model: data flows between processors through InputPort and OutputPort
- State machine: the Status enum drives processor scheduling
- Async support: processors can report asynchronous work via Async and schedule()
- Extensibility: a processor can expand the pipeline dynamically at runtime
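To make the contract concrete, here is a minimal pass-through processor sketch written against the simplified interface above. It is an illustration rather than ClickHouse code: the real port API has more methods and stricter invariants, and the class name PassThroughProcessor is hypothetical.
/// Minimal pass-through processor illustrating the port/state-machine contract.
class PassThroughProcessor : public IProcessor
{
public:
    explicit PassThroughProcessor(Block header)
        : IProcessor({InputPort(header)}, {OutputPort(std::move(header))}) {}

    String getName() const override { return "PassThroughProcessor"; }

    Status prepare() override
    {
        auto & input = inputs.front();
        auto & output = outputs.front();

        if (output.isFinished())      /// Downstream no longer needs data.
        {
            input.close();
            return Status::Finished;
        }
        if (!output.canPush())        /// Downstream has not consumed the last chunk yet.
            return Status::PortFull;
        if (input.isFinished())       /// Upstream is exhausted.
        {
            output.finish();
            return Status::Finished;
        }

        input.setNeeded();
        if (!input.hasData())
            return Status::NeedData;  /// Wait until upstream pushes a chunk.

        output.push(input.pull());    /// Forward the chunk untouched.
        return Status::PortFull;
    }

    void work() override {} /// All the work happens in prepare() for this trivial case.
};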
1.1.2 Processor Lifecycle Management
// src/Processors/IProcessor.cpp
IProcessor::IProcessor()
{
    // Allocate a processor index, used for debugging and monitoring
    processor_index = CurrentThread::isInitialized()
        ? CurrentThread::get().getNextPipelineProcessorIndex()
        : 0;
}

IProcessor::IProcessor(InputPorts inputs_, OutputPorts outputs_)
    : inputs(std::move(inputs_)), outputs(std::move(outputs_))
{
    // Link every port back to its owning processor
    for (auto & port : inputs)
        port.processor = this;
    for (auto & port : outputs)
        port.processor = this;

    processor_index = CurrentThread::isInitialized()
        ? CurrentThread::get().getNextPipelineProcessorIndex()
        : 0;
}

void IProcessor::cancel() noexcept
{
    // An atomic exchange makes cancellation thread-safe and idempotent
    bool already_cancelled = is_cancelled.exchange(true, std::memory_order_acq_rel);
    if (already_cancelled)
        return;

    onCancel(); // Subclasses may override onCancel() to perform cleanup
}
1.2 Pipeline Executor Architecture
1.2.1 PipelineExecutor Core Implementation
// src/Processors/Executors/PipelineExecutor.h
class PipelineExecutor
{
public:
    /// Constructor: builds the execution graph
    explicit PipelineExecutor(std::shared_ptr<Processors> & processors, QueryStatusPtr elem);
    /// Execute the pipeline with multiple threads
    void execute(size_t num_threads, bool concurrency_control);
    /// Single-step execution (for debugging and fine-grained control)
    bool executeStep(std::atomic_bool * yield_flag = nullptr);

private:
    /// The execution graph
    ExecutingGraphPtr graph;
    /// Task queues
    ExecutorTasks tasks;
    /// Execution status
    std::atomic<ExecutionStatus> execution_status{ExecutionStatus::EXECUTING};
};

// src/Processors/Executors/PipelineExecutor.cpp
void PipelineExecutor::execute(size_t num_threads, bool concurrency_control)
{
    checkTimeLimit();
    num_threads = std::max<size_t>(num_threads, 1);

    OpenTelemetry::SpanHolder span("PipelineExecutor::execute()");
    span.addAttribute("clickhouse.thread_num", num_threads);

    try
    {
        executeImpl(num_threads, concurrency_control);

        /// Log all LOGICAL_ERROR exceptions
        for (auto & node : graph->nodes)
            if (node->exception && getExceptionErrorCode(node->exception) == ErrorCodes::LOGICAL_ERROR)
                tryLogException(node->exception, log);

        /// Rethrow the first exception found in the graph
        for (auto & node : graph->nodes)
            if (node->exception)
                std::rethrow_exception(node->exception);

        /// Rethrow exceptions raised in executor threads
        tasks.rethrowFirstThreadException();
    }
    catch (...)
    {
        span.addAttribute(DB::ExecutionStatus::fromCurrentException());
#ifndef NDEBUG
        LOG_TRACE(log, "Exception while executing query. Current state:\n{}", dumpPipeline());
#endif
        throw;
    }

    finalizeExecution();
}
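For orientation, a hypothetical call site looks like this (QueryStatus wiring and error handling omitted; `processors` stands for the processor list already built for the query):
/// Hypothetical usage sketch of PipelineExecutor, under the simplified
/// declarations above; not copied from ClickHouse.
auto executor = std::make_shared<PipelineExecutor>(processors, /*elem=*/nullptr);
executor->execute(/*num_threads=*/4, /*concurrency_control=*/true);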
1.2.2 Execution Graph Construction and Scheduling
// src/Processors/Executors/ExecutingGraph.h
class ExecutingGraph
{
public:
    struct Node
    {
        ProcessorPtr processor;
        std::vector<Node *> direct_edges; /// Forward edges: nodes that consume this node's output
        std::vector<Node *> back_edges;   /// Back edges: nodes whose output this node consumes

        /// State management
        std::atomic<bool> status{false};
        std::exception_ptr exception;

        /// Scheduling information
        std::atomic<UInt64> num_executed_jobs{0};
        std::atomic<UInt64> finish_time{0};
    };

    std::vector<std::unique_ptr<Node>> nodes;

    /// Build the execution graph
    void buildGraph(Processors & processors);
    /// Get the nodes that are currently ready to execute
    std::vector<Node *> getReadyNodes();
    /// Update a node's status
    void updateNodeStatus(Node * node, IProcessor::Status status);
};

void ExecutingGraph::buildGraph(Processors & processors)
{
    /// 1. Create a node for every processor
    nodes.reserve(processors.size());
    std::unordered_map<IProcessor *, Node *> processor_to_node;

    for (auto & processor : processors)
    {
        auto node = std::make_unique<Node>();
        node->processor = processor;
        processor_to_node[processor.get()] = node.get();
        nodes.emplace_back(std::move(node));
    }

    /// 2. Build the dependency edges between nodes
    for (auto & node : nodes)
    {
        for (auto & input_port : node->processor->getInputs())
        {
            if (input_port.isConnected())
            {
                auto * output_processor = &input_port.getOutputPort().getProcessor();
                auto * output_node = processor_to_node[output_processor];

                /// Record the edge in both directions
                node->back_edges.push_back(output_node);
                output_node->direct_edges.push_back(node.get());
            }
        }
    }

    /// 3. Validate the graph
    validateGraph();
}
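As a mental model for how executor threads use this graph, a single-threaded driver over the simplified class above might look like the sketch below. The real scheduler is multi-threaded, uses per-thread task queues, and re-runs prepare() only for nodes whose ports changed; this loop only illustrates the prepare()/work() contract.
/// Sketch: single-threaded driving loop over the simplified graph above.
void runSingleThreaded(ExecutingGraph & graph)
{
    bool progress = true;
    while (progress)
    {
        progress = false;
        for (auto * node : graph.getReadyNodes())
        {
            auto status = node->processor->prepare();
            if (status == IProcessor::Status::Ready)
            {
                node->processor->work(); /// Do the CPU-heavy part outside prepare().
                progress = true;
            }
        }
    }
}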
1.3 Implementations of the Different Processor Types
1.3.1 Source Processors (Data Sources)
// src/Processors/Sources/SourceFromSingleChunk.h
class SourceFromSingleChunk : public ISource
{
public:
    SourceFromSingleChunk(Block header, Chunk chunk_)
        : ISource(std::move(header)), chunk(std::move(chunk_)) {}

    String getName() const override { return "SourceFromSingleChunk"; }

protected:
    Chunk generate() override
    {
        if (chunk)
        {
            auto res = std::move(chunk);
            chunk.clear();
            return res;
        }
        return {};
    }

private:
    Chunk chunk;
};
// ISource base class implementation
class ISource : public IProcessor
{
public:
    ISource(Block header) : IProcessor({}, {OutputPort(std::move(header))}) {}

    Status prepare() override
    {
        if (finished)
        {
            output.finish();
            return Status::Finished;
        }

        if (output.isFinished())
        {
            finished = true;
            return Status::Finished;
        }

        if (!output.canPush())
            return Status::PortFull;

        if (!has_input)
            return Status::Ready;

        output.push(std::move(current_chunk));
        has_input = false;

        if (got_exception)
        {
            finished = true;
            output.finish();
            std::rethrow_exception(got_exception);
        }

        return Status::PortFull;
    }

    void work() override
    {
        try
        {
            current_chunk = generate();
            if (!current_chunk)
            {
                finished = true;
                return;
            }
            has_input = true;
        }
        catch (...)
        {
            finished = true;
            got_exception = std::current_exception();
        }
    }

protected:
    virtual Chunk generate() = 0;

private:
    OutputPort & output = outputs.front();
    bool finished = false;
    bool has_input = false;
    Chunk current_chunk;
    std::exception_ptr got_exception;
};
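A hedged usage sketch: wrapping the source in a pipeline and pulling its single chunk back out. The QueryPipeline/PullingPipelineExecutor wiring is simplified from the real API, and `process` is a hypothetical consumer.
/// Sketch (simplified API): drain a pipeline built around SourceFromSingleChunk.
void drainSingleChunk(Block header, Chunk chunk, std::function<void(const Block &)> process)
{
    auto source = std::make_shared<SourceFromSingleChunk>(std::move(header), std::move(chunk));
    QueryPipeline pipeline(Pipe(std::move(source))); /// One source, no transforms.
    PullingPipelineExecutor executor(pipeline);

    Block block;
    while (executor.pull(block)) /// Returns false once the source is exhausted.
        process(block);
}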
1.3.2 Transform Processors (Data Transformation)
// src/Processors/Transforms/ExpressionTransform.h
class ExpressionTransform : public ISimpleTransform
{
public:
    ExpressionTransform(const Block & header_, ExpressionActionsPtr expression_)
        : ISimpleTransform(header_, expression_->getResultColumns().cloneEmpty(), false)
        , expression(std::move(expression_))
    {
        /// Reject actions that would change the number of rows
        const auto & actions = expression->getActions();
        for (const auto & action : actions)
        {
            if (action.node->type == ActionsDAG::ActionType::ARRAY_JOIN)
            {
                throw Exception(ErrorCodes::LOGICAL_ERROR,
                    "ARRAY JOIN is not supported in ExpressionTransform");
            }
        }
    }

    String getName() const override { return "ExpressionTransform"; }

protected:
    void transform(Chunk & chunk) override
    {
        size_t num_rows = chunk.getNumRows();
        auto block = getInputPort().getHeader().cloneWithColumns(chunk.detachColumns());

        /// Evaluate the expression actions on the block
        expression->execute(block, num_rows);

        chunk.setColumns(block.getColumns(), num_rows);
    }

private:
    ExpressionActionsPtr expression;
};
// ISimpleTransform base class implementation
class ISimpleTransform : public IProcessor
{
public:
    ISimpleTransform(Block input_header, Block output_header, bool skip_empty_chunks_ = true)
        : IProcessor({InputPort(std::move(input_header))}, {OutputPort(std::move(output_header))})
        , input(inputs.front())
        , output(outputs.front())
        , skip_empty_chunks(skip_empty_chunks_) {}

    Status prepare() override
    {
        /// Check the output port first
        if (output.isFinished())
        {
            input.close();
            return Status::Finished;
        }

        if (!output.canPush())
        {
            input.setNotNeeded();
            return Status::PortFull;
        }

        /// Flush pending output, if any
        if (has_output)
        {
            output.push(std::move(current_chunk));
            has_output = false;
        }

        if (finished_output)
        {
            output.finish();
            return Status::Finished;
        }

        if (has_input)
            return Status::Ready;

        /// Check the input port
        if (input.isFinished())
        {
            finished_input = true; /// work() turns this into finished_output
            return Status::Ready;
        }

        input.setNeeded();
        if (!input.hasData())
            return Status::NeedData;

        current_chunk = input.pull(true);
        has_input = true;
        return Status::Ready;
    }

    void work() override
    {
        if (has_input)
        {
            transform(current_chunk);
            if (skip_empty_chunks && current_chunk.getNumRows() == 0)
            {
                has_input = false;
                return;
            }
            has_input = false;
            has_output = true;
        }
        else if (finished_input)
        {
            finished_output = true;
        }
    }

protected:
    virtual void transform(Chunk & chunk) = 0;

private:
    InputPort & input;
    OutputPort & output;

    Chunk current_chunk;
    bool has_input = false;
    bool has_output = false;
    bool finished_input = false;
    bool finished_output = false;
    const bool skip_empty_chunks = true;
};
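With the base class in place, a concrete transform only has to implement transform(). A minimal hypothetical subclass that forwards chunks unchanged while counting the rows that pass through:
/// Hypothetical transform sketch built on the simplified ISimpleTransform above.
class CountingTransform : public ISimpleTransform
{
public:
    explicit CountingTransform(const Block & header)
        : ISimpleTransform(header, header, /*skip_empty_chunks_=*/true) {}

    String getName() const override { return "CountingTransform"; }
    size_t getRowCount() const { return rows_seen; }

protected:
    void transform(Chunk & chunk) override
    {
        rows_seen += chunk.getNumRows(); /// Inspect only; the chunk is forwarded as-is.
    }

private:
    size_t rows_seen = 0;
};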
1.3.3 Sink Processors (Data Output)
// src/Processors/Sinks/SinkToStorage.h
class SinkToStorage : public ExceptionKeepingTransform
{
public:
    explicit SinkToStorage(const Block & header) : ExceptionKeepingTransform(header, {}) {}

    String getName() const override { return "SinkToStorage"; }

protected:
    void onConsume(Chunk chunk) override
    {
        if (!chunk)
            return;
        cur_chunk = std::move(chunk);
        consume(cur_chunk);
    }

    GenerateResult onGenerate() override
    {
        /// A sink produces no output data
        return {Chunk{}, false};
    }

    void onFinish() override
    {
        finalize();
    }

    /// Methods to be implemented by concrete sinks
    virtual void consume(Chunk chunk) = 0;
    virtual void finalize() {}

private:
    Chunk cur_chunk;
};

// Example of a concrete storage sink
class MergeTreeSink : public SinkToStorage
{
public:
    MergeTreeSink(
        StorageMergeTree & storage_,
        const StorageMetadataPtr & metadata_snapshot_,
        size_t max_parts_per_block_,
        ContextPtr context_)
        : SinkToStorage(metadata_snapshot_->getSampleBlock())
        , storage(storage_)
        , metadata_snapshot(metadata_snapshot_)
        , max_parts_per_block(max_parts_per_block_)
        , context(context_) {}

    String getName() const override { return "MergeTreeSink"; }

protected:
    void consume(Chunk chunk) override
    {
        auto block = getHeader().cloneWithColumns(chunk.detachColumns());
        /// Write the block into the MergeTree table
        storage.write(block, context);
    }

    void finalize() override
    {
        /// Finish the write path (simplified here; a fuller version appears in section 4.1.2)
        storage.flushAndPrepareForShutdown();
    }

private:
    StorageMergeTree & storage;
    StorageMetadataPtr metadata_snapshot;
    size_t max_parts_per_block;
    ContextPtr context;
};
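Any terminal consumer follows the same pattern. As an illustration of how small the contract is, a hypothetical sink that merely counts rows (not a ClickHouse class):
/// Hypothetical sink sketch: discards data, counting rows, built on the
/// simplified SinkToStorage above.
class CountingSink : public SinkToStorage
{
public:
    explicit CountingSink(const Block & header) : SinkToStorage(header) {}

    String getName() const override { return "CountingSink"; }
    size_t getRows() const { return rows; }

protected:
    void consume(Chunk chunk) override { rows += chunk.getNumRows(); }

private:
    size_t rows = 0;
};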
In-Depth Analysis of the MergeTree Storage Engine
2.1 MergeTree Data Organization
2.1.1 Data Part (DataPart) Core Design
// src/Storages/MergeTree/IMergeTreeDataPart.h
class IMergeTreeDataPart : public std::enable_shared_from_this<IMergeTreeDataPart>
{
public:
    /// Data part lifecycle states
    enum class State
    {
        Temporary,       /// Temporary: still being written
        PreCommitted,    /// Pre-committed
        Committed,       /// Committed (visible to queries)
        Outdated,        /// Outdated: awaiting removal
        Deleting,        /// Being deleted
        DeleteOnDestroy  /// Delete when the object is destroyed
    };

    /// Data part on-disk formats
    enum class Type
    {
        WIDE,      /// Wide format (one file per column)
        COMPACT,   /// Compact format (all columns in a single file)
        IN_MEMORY, /// In-memory format
        UNKNOWN    /// Unknown format
    };

    /// Per-column size statistics
    struct ColumnSize
    {
        size_t marks = 0;             /// Number of marks
        size_t data_compressed = 0;   /// Compressed size
        size_t data_uncompressed = 0; /// Uncompressed size

        void addToTotalSize(ColumnSize & total_size) const
        {
            total_size.marks += marks;
            total_size.data_compressed += data_compressed;
            total_size.data_uncompressed += data_uncompressed;
        }
    };

protected:
    /// Basic information
    String name;                   /// Data part name
    MergeTreePartInfo info;        /// Part info (partition, block range, level)
    const MergeTreeData & storage; /// Reference to the owning storage
    VolumePtr volume;              /// Storage volume

    /// Metadata
    mutable ColumnsDescription columns;                  /// Column descriptions
    mutable SerializationInfoByName serialization_infos; /// Serialization info
    mutable VersionMetadata version;                     /// Version metadata

    /// Statistics
    size_t rows_count = 0;                  /// Number of rows
    size_t bytes_on_disk = 0;               /// Bytes occupied on disk
    mutable ColumnSizeByName columns_sizes; /// Per-column size statistics

    /// Index information
    mutable IndexGranularity index_granularity; /// Index granularity
    size_t index_granularity_bytes = 0;         /// Index granularity in bytes

    /// Partition information
    String partition_id;          /// Partition ID
    MergeTreePartition partition; /// Partition values

    /// Checksums
    mutable Checksums checksums; /// File checksums

    /// State management
    mutable std::atomic<State> state{State::Temporary};
    mutable std::mutex state_mutex;

public:
    /// Get a reader for this part
    virtual std::shared_ptr<IMergeTreeReader> getReader(
        const NamesAndTypesList & columns_to_read,
        const StorageMetadataPtr & metadata_snapshot,
        const MarkRanges & mark_ranges,
        UncompressedCache * uncompressed_cache,
        MarkCache * mark_cache,
        const MergeTreeReaderSettings & reader_settings,
        const ValueSizeMap & avg_value_size_hints = {},
        const ReadBufferFromFileBase::ProfileCallback & profile_callback = {}) const = 0;

    /// Get a writer for this part
    virtual std::shared_ptr<IMergeTreeDataPartWriter> getWriter(
        const NamesAndTypesList & columns_list,
        const StorageMetadataPtr & metadata_snapshot,
        const std::vector<MergeTreeIndexPtr> & indices_to_recalc,
        const CompressionCodecPtr & default_codec_,
        const MergeTreeWriterSettings & writer_settings,
        const MergeTreeIndexGranularity & computed_index_granularity) const = 0;

    /// Load metadata
    void loadColumnsChecksumsIndexes(bool require_columns_checksums, bool check_consistency);
    void loadIndex();
    void loadPartitionAndMinMaxIndex();
    void loadChecksums(bool require);
    void loadRowsCount();

    /// Verify data integrity
    void checkConsistency(bool require_part_metadata) const;

    /// Compute per-column sizes
    void calculateEachColumnSizes(ColumnSizeByName & each_columns_size, ColumnSize & total_size) const;
};
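For orientation, part names encode the MergeTreePartInfo fields above. A hedged example of the conventional <partition_id>_<min_block>_<max_block>_<level> naming; the authoritative parser is MergeTreePartInfo::fromPartName:
/// Example: decoding a part name under the conventional naming scheme.
/// For a table partitioned by toYYYYMM(date), the name "202401_1_5_2" means:
///   partition_id = "202401"  -- the partition this part belongs to
///   min_block    = 1         -- smallest insert block number it covers
///   max_block    = 5         -- largest insert block number it covers
///   level        = 2         -- how many merge generations produced it
/// (Illustration; `storage` is the MergeTreeData reference from the class above.)
auto info = MergeTreePartInfo::fromPartName("202401_1_5_2", storage.format_version);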
2.1.2 Data Reader Implementation
// src/Storages/MergeTree/IMergeTreeReader.h
class IMergeTreeReader : private boost::noncopyable
{
public:
    using DeserializeBinaryBulkStateMap = std::unordered_map<std::string, ISerialization::DeserializeBinaryBulkStatePtr>;
    using FileStreams = std::map<std::string, std::unique_ptr<MergeTreeReaderStream>>;

    IMergeTreeReader(
        MergeTreeDataPartInfoForReaderPtr data_part_info_for_read_,
        const NamesAndTypesList & columns_,
        const VirtualFields & virtual_fields_,
        const StorageSnapshotPtr & storage_snapshot_,
        UncompressedCache * uncompressed_cache_,
        MarkCache * mark_cache_,
        const MarkRanges & all_mark_ranges_,
        const MergeTreeReaderSettings & settings_,
        const ValueSizeMap & avg_value_size_hints_ = ValueSizeMap{});

    /// Read rows from the given mark range
    virtual size_t readRows(
        size_t from_mark,
        size_t current_task_last_mark,
        bool continue_reading,
        size_t max_rows_to_read,
        Columns & res_columns) = 0;

    /// Whether partially read granules are supported (used for async IO / prefetch)
    virtual bool canReadIncompleteGranules() const = 0;

protected:
    /// Initialize the file streams for a column
    void addStreams(
        const NameAndTypePair & name_and_type,
        const ReadBufferFromFileBase::ProfileCallback & profile_callback);

    /// Read one column's data
    void readData(
        const NameAndTypePair & name_and_type,
        ColumnPtr & column,
        size_t from_mark,
        bool continue_reading,
        size_t current_task_last_mark,
        size_t max_rows_to_read,
        ISerialization::SubstreamsCache & cache,
        bool was_prefetched = false);

    MergeTreeDataPartInfoForReaderPtr data_part_info_for_read;
    const NamesAndTypesList columns;
    const VirtualFields virtual_fields;
    UncompressedCache * uncompressed_cache;
    MarkCache * mark_cache;
    MarkRanges all_mark_ranges;
    MergeTreeReaderSettings settings;

    /// File stream management
    FileStreams file_streams;
    /// Deserialization state
    DeserializeBinaryBulkStateMap deserialize_binary_bulk_state_map;
};
// Concrete wide-format reader implementation
class MergeTreeReaderWide : public IMergeTreeReader
{
public:
    MergeTreeReaderWide(/* parameters */) : IMergeTreeReader(/* forwarded to base */) {}

    size_t readRows(
        size_t from_mark,
        size_t current_task_last_mark,
        bool continue_reading,
        size_t max_rows_to_read,
        Columns & res_columns) override
    {
        size_t read_rows = 0;

        /// Iterate over all columns that need to be read
        size_t pos = 0;
        for (const auto & name_and_type : columns)
        {
            if (!res_columns[pos])
                res_columns[pos] = name_and_type.type->createColumn();

            /// Read data starting at the given mark
            ISerialization::SubstreamsCache cache;
            readData(name_and_type, res_columns[pos], from_mark, continue_reading,
                     current_task_last_mark, max_rows_to_read, cache);
            ++pos;
        }

        /// All columns should have read the same number of rows
        if (!res_columns.empty())
            read_rows = res_columns[0]->size();

        return read_rows;
    }

    bool canReadIncompleteGranules() const override { return true; }
};
2.1.3 Data Writer Implementation
// src/Storages/MergeTree/MergeTreeDataWriter.h
class MergeTreeDataWriter
{
public:
    MergeTreeDataWriter(MergeTreeData & data_) : data(data_), log(getLogger("MergeTreeDataWriter")) {}

    /// Write a block; returns the newly created data part
    MergeTreeMutableDataPartPtr writeTempPart(
        BlockWithPartition & block_with_partition,
        const StorageMetadataPtr & metadata_snapshot,
        ContextPtr context);

private:
    /// Create a new data part
    MergeTreeMutableDataPartPtr createPart(
        const String & part_name,
        const MergeTreeDataPartType & part_type,
        const MergeTreePartInfo & part_info,
        const VolumePtr & volume,
        const String & relative_path = "");

    /// Write the data to disk
    void writeDataPart(
        MergeTreeMutableDataPartPtr & new_data_part,
        const Block & block,
        const StorageMetadataPtr & metadata_snapshot,
        ContextPtr context);

    MergeTreeData & data;
    LoggerPtr log;
};
// src/Storages/MergeTree/MergeTreeDataWriter.cpp
MergeTreeMutableDataPartPtr MergeTreeDataWriter::writeTempPart(
    BlockWithPartition & block_with_partition,
    const StorageMetadataPtr & metadata_snapshot,
    ContextPtr context)
{
    Block & block = block_with_partition.block;

    /// 1. Derive the part info
    auto part_info = MergeTreePartInfo::fromPartName(
        data.getPartName(block_with_partition.partition),
        data.format_version);

    /// 2. Pick a storage volume
    auto volume = data.getStoragePolicy()->getVolume(0);

    /// 3. Create the new data part
    auto new_data_part = createPart(
        data.getPartName(block_with_partition.partition),
        MergeTreeDataPartType::WIDE,
        part_info,
        volume);

    /// 4. Set partition information
    new_data_part->partition = std::move(block_with_partition.partition);
    new_data_part->minmax_idx->update(block, data.getMinMaxColumnsNames(metadata_snapshot->getPartitionKey()));

    /// 5. Write the data to disk
    writeDataPart(new_data_part, block, metadata_snapshot, context);

    /// 6. Finish part creation
    new_data_part->rows_count = block.rows();
    new_data_part->modification_time = time(nullptr);
    new_data_part->loadColumnsChecksumsIndexes(false, true);
    new_data_part->setBytesOnDisk(new_data_part->checksums.getTotalSizeOnDisk());

    return new_data_part;
}

void MergeTreeDataWriter::writeDataPart(
    MergeTreeMutableDataPartPtr & new_data_part,
    const Block & block,
    const StorageMetadataPtr & metadata_snapshot,
    ContextPtr context)
{
    /// 1. Create the part writer
    auto writer = new_data_part->getWriter(
        block.getNamesAndTypesList(),
        metadata_snapshot,
        {}, // indices_to_recalc
        data.getCompressionCodecForPart(new_data_part->info.level, new_data_part->info.mutation, context),
        MergeTreeWriterSettings(context->getSettingsRef(), data.getSettings()),
        computed_index_granularity);

    /// 2. Write the block
    writer->write(block);

    /// 3. Finish the streams
    writer->finishDataSerialization(sync_on_insert);
    writer->finishPrimaryIndexSerialization(sync_on_insert);
    writer->finishSkipIndicesSerialization(sync_on_insert);

    /// 4. Collect checksums
    new_data_part->checksums = writer->releaseChecksums();
}
2.2 MergeTree Query Execution
2.2.1 Query Executor Architecture
// src/Storages/MergeTree/MergeTreeDataSelectExecutor.h
class MergeTreeDataSelectExecutor
{
public:
    explicit MergeTreeDataSelectExecutor(const MergeTreeData & data_);

    /// Main query entry point
    QueryPlanPtr read(
        const Names & column_names,
        const StorageSnapshotPtr & storage_snapshot,
        const SelectQueryInfo & query_info,
        ContextPtr context,
        UInt64 max_block_size,
        size_t num_streams,
        PartitionIdToMaxBlockPtr max_block_numbers_to_read = nullptr,
        bool enable_parallel_reading = false) const;

    /// Read from a concrete set of data parts
    QueryPlanStepPtr readFromParts(
        RangesInDataParts parts,
        MergeTreeData::MutationsSnapshotPtr mutations_snapshot,
        const Names & column_names,
        const StorageSnapshotPtr & storage_snapshot,
        const SelectQueryInfo & query_info,
        ContextPtr context,
        UInt64 max_block_size,
        size_t num_streams,
        PartitionIdToMaxBlockPtr max_block_numbers_to_read = nullptr,
        ReadFromMergeTree::AnalysisResultPtr merge_tree_select_result_ptr = nullptr,
        bool enable_parallel_reading = false,
        std::shared_ptr<ParallelReadingExtension> extension_ = nullptr) const;

private:
    /// Select the data parts to read
    RangesInDataParts selectPartsToRead(
        const StorageSnapshotPtr & storage_snapshot,
        const SelectQueryInfo & query_info,
        ContextPtr context,
        PartitionIdToMaxBlockPtr max_block_numbers_to_read) const;

    /// Filter parts and ranges by the primary key condition
    void filterPartsByPrimaryKey(
        RangesInDataParts & parts,
        const StorageSnapshotPtr & storage_snapshot,
        const SelectQueryInfo & query_info,
        ContextPtr context) const;

    /// Filter parts and ranges by data-skipping indexes
    void filterPartsBySkipIndexes(
        RangesInDataParts & parts,
        const StorageSnapshotPtr & storage_snapshot,
        const SelectQueryInfo & query_info,
        ContextPtr context) const;

    const MergeTreeData & data;
    LoggerPtr log;
};
// Core query execution logic
QueryPlanPtr MergeTreeDataSelectExecutor::read(
    const Names & column_names,
    const StorageSnapshotPtr & storage_snapshot,
    const SelectQueryInfo & query_info,
    ContextPtr context,
    UInt64 max_block_size,
    size_t num_streams,
    PartitionIdToMaxBlockPtr max_block_numbers_to_read,
    bool enable_parallel_reading) const
{
    /// 1. Select the data parts to read
    auto parts_with_ranges = selectPartsToRead(
        storage_snapshot, query_info, context, max_block_numbers_to_read);

    /// 2. Apply primary-key filtering
    filterPartsByPrimaryKey(parts_with_ranges, storage_snapshot, query_info, context);

    /// 3. Apply data-skipping index filtering
    filterPartsBySkipIndexes(parts_with_ranges, storage_snapshot, query_info, context);

    /// 4. Build the query plan
    auto query_plan = std::make_unique<QueryPlan>();

    if (parts_with_ranges.empty())
    {
        /// Nothing to read: return an empty result
        auto header = storage_snapshot->getSampleBlockForColumns(column_names);
        auto read_nothing = std::make_unique<ReadNothingStep>(header);
        query_plan->addStep(std::move(read_nothing));
        return query_plan;
    }

    /// 5. Add the read step
    auto read_step = readFromParts(
        std::move(parts_with_ranges),
        data.getMutationsSnapshot(query_info, context),
        column_names,
        storage_snapshot,
        query_info,
        context,
        max_block_size,
        num_streams,
        max_block_numbers_to_read,
        nullptr,
        enable_parallel_reading);

    query_plan->addStep(std::move(read_step));
    return query_plan;
}
2.2.2 Data Part Selection Algorithm
// Core part-selection algorithm
RangesInDataParts MergeTreeDataSelectExecutor::selectPartsToRead(
    const StorageSnapshotPtr & storage_snapshot,
    const SelectQueryInfo & query_info,
    ContextPtr context,
    PartitionIdToMaxBlockPtr max_block_numbers_to_read) const
{
    RangesInDataParts parts_with_ranges;

    /// 1. Take all active data parts
    auto data_parts = data.getDataPartsVector();

    /// 2. Partition pruning
    if (metadata_snapshot->hasPartitionKey())
    {
        const auto & partition_key = metadata_snapshot->getPartitionKey();
        auto partition_pruner = std::make_shared<PartitionPruner>(
            metadata_snapshot, query_info, context, false);

        /// Drop parts that cannot match the partition condition
        data_parts.erase(
            std::remove_if(data_parts.begin(), data_parts.end(),
                [&](const auto & part)
                {
                    return partition_pruner->canBePruned(*part);
                }),
            data_parts.end());
    }

    /// 3. Apply the max-block-number limit
    if (max_block_numbers_to_read)
    {
        data_parts.erase(
            std::remove_if(data_parts.begin(), data_parts.end(),
                [&](const auto & part)
                {
                    auto it = max_block_numbers_to_read->find(part->info.partition_id);
                    return it != max_block_numbers_to_read->end()
                        && part->info.max_block > it->second;
                }),
            data_parts.end());
    }

    /// 4. Create a mark range for every remaining part
    for (const auto & part : data_parts)
    {
        RangesInDataPart ranges_in_part;
        ranges_in_part.data_part = part;
        /// Initially cover the whole part
        ranges_in_part.part_index_in_query = parts_with_ranges.size();
        ranges_in_part.ranges = MarkRanges{MarkRange{0, part->getMarksCount()}};
        parts_with_ranges.push_back(std::move(ranges_in_part));
    }

    return parts_with_ranges;
}
// Primary-key filtering implementation
void MergeTreeDataSelectExecutor::filterPartsByPrimaryKey(
    RangesInDataParts & parts,
    const StorageSnapshotPtr & storage_snapshot,
    const SelectQueryInfo & query_info,
    ContextPtr context) const
{
    if (!storage_snapshot->getMetadataForQuery()->hasSortingKey())
        return;

    const auto & primary_key = storage_snapshot->getMetadataForQuery()->getPrimaryKey();

    /// Build the primary-key condition
    KeyCondition key_condition(query_info.query, context, primary_key.column_names, primary_key.expression);

    if (key_condition.alwaysUnknownOrTrue())
        return; /// The condition cannot prune anything

    /// Apply the condition to every part
    for (auto & part_with_ranges : parts)
    {
        if (part_with_ranges.ranges.empty())
            continue;

        auto & part = part_with_ranges.data_part;

        /// Load the primary index
        part->loadIndex();

        /// Narrow the mark ranges using the key condition
        MarkRanges filtered_ranges;
        for (const auto & range : part_with_ranges.ranges)
        {
            MarkRanges new_ranges = key_condition.mayBeTrueInRange(
                range.begin, range.end, part->index, primary_key.sample_block);
            filtered_ranges.insert(filtered_ranges.end(),
                                   new_ranges.begin(), new_ranges.end());
        }
        part_with_ranges.ranges = std::move(filtered_ranges);
    }

    /// Drop parts whose ranges are now empty
    parts.erase(
        std::remove_if(parts.begin(), parts.end(),
            [](const RangesInDataPart & part) { return part.ranges.empty(); }),
        parts.end());
}
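A worked example makes the effect concrete. Suppose the primary key is (CounterID, Date) and the query contains WHERE CounterID = 34; the granule boundaries below are invented for illustration:
/// Illustration only: invented primary-index values for one part with 4 marks.
/// The primary index stores the key of the first row of each granule:
///   mark 0: CounterID = 10   -> granule covers CounterID in [10, 20)
///   mark 1: CounterID = 20   -> granule covers CounterID in [20, 40)
///   mark 2: CounterID = 40   -> granule covers CounterID in [40, 90)
///   mark 3: CounterID = 90
/// For WHERE CounterID = 34, mayBeTrueInRange keeps only marks [1, 2):
/// the ranges {MarkRange{0, 4}} shrink to {MarkRange{1, 2}}, so 3 of the
/// 4 granules are never read from disk.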
2.3 MergeTree Merge Mechanism
2.3.1 Merge Task Scheduling
// src/Storages/MergeTree/MergeTask.h
class MergeTask
{
public:
    /// Merge task states
    enum class State
    {
        NEED_PREPARE,
        NEED_EXECUTE,
        NEED_FINISH,
        SUCCESS
    };

    struct GlobalContext
    {
        MergeTreeData * data;
        StorageMetadataPtr metadata_snapshot;
        FutureMergedMutatedPartPtr future_part;
        MergeTreeData::MutableDataPartPtr new_data_part;

        /// Merge pipeline and executor
        std::unique_ptr<QueryPipelineBuilder> merging_pipeline;
        std::unique_ptr<PullingPipelineExecutor> merging_executor;

        /// Output stream
        std::unique_ptr<MergedBlockOutputStream> to;

        /// Statistics
        size_t rows_written = 0;
        UInt64 watch_prev_elapsed = 0;

        /// Merge-list element (for monitoring, e.g. system.merges)
        MergeListElement * merge_list_element_ptr = nullptr;
    };

    MergeTask(
        StorageMetadataPtr metadata_snapshot_,
        FutureMergedMutatedPartPtr future_part_,
        MergeTreeData * data_,
        MergeListElement * merge_list_element_ptr_,
        time_t time_of_merge_,
        ContextPtr context_,
        ReservationSharedPtr space_reservation_,
        bool deduplicate_,
        Names deduplicate_by_columns_,
        MergeTreeData::MergingParams merging_params_,
        MergeTreeTransactionPtr txn_,
        const String & suffix_ = "",
        bool need_prefix_ = true);

    /// Execute one step of the merge task
    bool executeStep();

private:
    /// Prepare the merge
    bool prepare();
    /// Run the merge
    bool executeImpl();
    /// Finish the merge
    bool finalize();

    std::shared_ptr<GlobalContext> global_ctx;
    State state{State::NEED_PREPARE};
};
// src/Storages/MergeTree/MergeTask.cpp
bool MergeTask::executeStep()
{
    switch (state)
    {
        case State::NEED_PREPARE:
            if (prepare())
            {
                state = State::NEED_EXECUTE;
                return true;
            }
            return false;

        case State::NEED_EXECUTE:
            if (executeImpl())
                state = State::NEED_FINISH;
            /// Not done yet either way: executeImpl() returning false means the
            /// time slice ran out, so ask to be scheduled again
            return true;

        case State::NEED_FINISH:
            if (finalize())
            {
                state = State::SUCCESS;
                return false; /// The task is complete
            }
            return false;

        case State::SUCCESS:
            return false;
    }
    return false;
}
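The incremental design lets one background worker interleave many merges. A hedged sketch of the driving pattern; the real executor re-enqueues the task between steps rather than looping in place:
/// Sketch: drive the state machine until it reports completion.
while (merge_task->executeStep())
{
    /// Between steps, the worker thread can yield to other background tasks.
}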
bool MergeTask::prepare()
{
    /// 1. Create the target data part
    global_ctx->new_data_part = global_ctx->data->createPart(
        global_ctx->future_part->name,
        global_ctx->future_part->type,
        global_ctx->future_part->part_info,
        global_ctx->future_part->volume,
        global_ctx->future_part->relative_path);

    /// 2. Build the merge pipeline
    auto merging_pipeline = std::make_unique<QueryPipelineBuilder>();

    /// One source per input data part
    for (const auto & part : global_ctx->future_part->parts)
    {
        auto source = std::make_shared<MergeTreeSequentialSource>(
            *global_ctx->data,
            global_ctx->metadata_snapshot,
            part,
            global_ctx->metadata_snapshot->getColumns().getNamesOfPhysical(),
            false, /// no virtual columns
            true   /// with row numbers
        );
        merging_pipeline->addSource(std::move(source));
    }

    /// 3. Add the merging transform
    auto merging_transform = std::make_shared<MergingSortedTransform>(
        merging_pipeline->getHeader(),
        global_ctx->future_part->parts.size(),
        global_ctx->metadata_snapshot->getSortDescription(),
        DEFAULT_MERGE_BLOCK_SIZE /* max_block_size */);
    merging_pipeline->addTransform(std::move(merging_transform));

    /// 4. Create the output stream
    global_ctx->to = std::make_unique<MergedBlockOutputStream>(
        global_ctx->new_data_part,
        global_ctx->metadata_snapshot,
        global_ctx->metadata_snapshot->getColumns().getNamesOfPhysical(),
        CompressionCodecFactory::instance().get("NONE", {}),
        /// other parameters ...
    );

    /// 5. Create the pulling executor
    global_ctx->merging_executor = std::make_unique<PullingPipelineExecutor>(*merging_pipeline);
    global_ctx->merging_pipeline = std::move(merging_pipeline);

    return true;
}
bool MergeTask::executeImpl()
{
    /// Time budget for one step of the merge
    UInt64 step_time_ms = global_ctx->data->getSettings()->background_task_preferred_step_execution_time_ms;
    Stopwatch watch;

    do
    {
        Block block;

        /// Pull the next merged block from the pipeline
        if (!global_ctx->merging_executor->pull(block))
        {
            /// The merge has produced everything
            return true;
        }

        /// Write the merged block
        global_ctx->rows_written += block.rows();
        global_ctx->to->write(block);

        /// Update the min-max index
        if (global_ctx->data->merging_params.mode != MergeTreeData::MergingParams::Ordinary)
        {
            global_ctx->new_data_part->minmax_idx->update(
                block,
                MergeTreeData::getMinMaxColumnsNames(global_ctx->metadata_snapshot->getPartitionKey()));
        }

        /// Update monitoring statistics
        if (global_ctx->merge_list_element_ptr)
        {
            global_ctx->merge_list_element_ptr->rows_written = global_ctx->rows_written;
            global_ctx->merge_list_element_ptr->bytes_written_uncompressed =
                global_ctx->to->getBytesWritten();
        }
    } while (watch.elapsedMilliseconds() < step_time_ms);

    /// Time slice exhausted; the merge is not finished yet
    return false;
}
bool MergeTask::finalize()
{
    /// 1. Finalize the written part
    global_ctx->to->finalizePart(global_ctx->new_data_part, false);

    /// 2. Fill in part attributes
    global_ctx->new_data_part->rows_count = global_ctx->rows_written;
    global_ctx->new_data_part->modification_time = time(nullptr);

    /// 3. Load metadata
    global_ctx->new_data_part->loadColumnsChecksumsIndexes(false, true);
    global_ctx->new_data_part->setBytesOnDisk(
        global_ctx->new_data_part->checksums.getTotalSizeOnDisk());

    /// 4. Atomically replace the source parts with the new part
    global_ctx->data->replaceParts(
        global_ctx->future_part->parts,
        {global_ctx->new_data_part},
        false);

    return true;
}
Query Processor Framework Analysis
3.1 Port Communication Mechanism
3.1.1 Port Base Design
// src/Processors/Port.h
class Port
{
public:
    enum class State
    {
        NotNeeded,
        NeedData,
        HasData,
        Finished
    };

protected:
    State state = State::NotNeeded;
    IProcessor * processor = nullptr;

    /// Data slot
    Chunk data;
    /// Connection
    Port * connected_port = nullptr;

public:
    /// State queries
    bool isConnected() const { return connected_port != nullptr; }
    bool hasData() const { return state == State::HasData; }
    bool isFinished() const { return state == State::Finished; }
    bool isNeeded() const { return state == State::NeedData; }

    /// State transitions
    void setNeeded() { state = State::NeedData; }
    void setNotNeeded() { state = State::NotNeeded; }
    void finish() { state = State::Finished; }

    /// Data transfer
    void pushData(Chunk chunk)
    {
        chassert(state == State::NeedData);
        data = std::move(chunk);
        state = State::HasData;
    }

    Chunk pullData()
    {
        chassert(state == State::HasData);
        state = State::NotNeeded;
        return std::move(data);
    }

    /// Connection management
    void connect(Port & other)
    {
        chassert(!isConnected() && !other.isConnected());
        connected_port = &other;
        other.connected_port = this;
    }

    void disconnect()
    {
        if (connected_port)
        {
            connected_port->connected_port = nullptr;
            connected_port = nullptr;
        }
    }
};
class InputPort : public Port
{
public:
    InputPort(Block header_) : header(std::move(header_)) {}

    /// Take the chunk that the connected output port deposited here
    Chunk pull(bool set_not_needed = false)
    {
        chassert(isConnected());
        chassert(hasData());

        /// The data lives in this port: OutputPort::push stored it here via pushData()
        auto chunk = pullData();
        if (set_not_needed)
            setNotNeeded();
        return chunk;
    }

    /// Reference to the connected output port
    OutputPort & getOutputPort()
    {
        chassert(isConnected());
        return static_cast<OutputPort &>(*connected_port);
    }

    const Block & getHeader() const { return header; }

private:
    Block header; /// Structure of the blocks flowing through this port
};
class OutputPort : public Port
{
public:
    OutputPort(Block header_) : header(std::move(header_)) {}

    /// Push a chunk to the connected input port
    void push(Chunk chunk)
    {
        chassert(isConnected());
        chassert(canPush());
        connected_port->pushData(std::move(chunk));
    }

    /// Whether the consumer is ready to accept data
    bool canPush() const
    {
        chassert(isConnected());
        return connected_port->isNeeded();
    }

    /// Reference to the connected input port
    InputPort & getInputPort()
    {
        chassert(isConnected());
        return static_cast<InputPort &>(*connected_port);
    }

    const Block & getHeader() const { return header; }

private:
    Block header; /// Structure of the blocks flowing through this port
};
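Putting the two classes together, the push/pull handshake between a producer port and a consumer port runs as below. This is a sketch over the simplified classes above (real ports share one state word between both ends), and someHeader/makeChunk are hypothetical helpers.
/// Handshake sketch over the simplified Port API above.
Block header = someHeader();
InputPort in(header);
OutputPort out(header);

out.connect(in);             /// Wire producer to consumer.
in.setNeeded();              /// Consumer announces demand.
chassert(out.canPush());     /// Producer observes the demand ...
out.push(makeChunk());       /// ... and deposits a chunk into the input port.
Chunk received = in.pull();  /// Consumer takes ownership of the chunk.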
3.1.2 Pipeline Connection Mechanism
// src/QueryPipeline/QueryPipeline.h
class QueryPipeline
{
public:
    QueryPipeline() = default;
    QueryPipeline(QueryPipeline &&) = default;
    QueryPipeline & operator=(QueryPipeline &&) = default;

    /// Create a pipeline from a single processor
    explicit QueryPipeline(std::shared_ptr<IProcessor> source);
    /// Create a pipeline from a Pipe
    explicit QueryPipeline(Pipe pipe);

    /// Pipeline operations
    void addTransform(ProcessorPtr transform);
    void addSimpleTransform(const ProcessorGetter & getter);
    void addChains(std::vector<Chain> chains);

    /// Pipeline merging
    static QueryPipeline unitePipelines(
        std::vector<std::unique_ptr<QueryPipeline>> pipelines,
        size_t max_threads_limit = 0,
        Processors * collected_processors = nullptr);

    /// Execute the pipeline
    PipelineExecutorPtr execute();

    /// Pipeline introspection
    bool empty() const { return processors.empty(); }
    bool initialized() const { return !processors.empty() || !pipe.empty(); }
    size_t getNumStreams() const { return pipe.numOutputPorts(); }
    const Block & getHeader() const { return pipe.getHeader(); }

private:
    /// All processors
    Processors processors;
    /// The pipe representation
    Pipe pipe;
    /// Thread limit
    size_t max_threads = 0;
};
// Core port-connection logic
void connectPorts(OutputPort & output, InputPort & input)
{
    /// 1. Check header compatibility
    if (!blocksHaveEqualStructure(output.getHeader(), input.getHeader()))
    {
        throw Exception(ErrorCodes::LOGICAL_ERROR,
            "Cannot connect ports with different block structures. "
            "Output header: {}, input header: {}",
            output.getHeader().dumpStructure(),
            input.getHeader().dumpStructure());
    }

    /// 2. Check port state
    if (output.isConnected())
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Output port is already connected");
    if (input.isConnected())
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Input port is already connected");

    /// 3. Connect
    output.connect(input);
}

// Connect two processors port-by-port
void connectProcessors(IProcessor & left, IProcessor & right)
{
    auto & left_outputs = left.getOutputs();
    auto & right_inputs = right.getInputs();

    if (left_outputs.size() != right_inputs.size())
    {
        throw Exception(ErrorCodes::LOGICAL_ERROR,
            "Cannot connect processors: different number of ports. "
            "Left processor outputs: {}, right processor inputs: {}",
            left_outputs.size(), right_inputs.size());
    }

    auto left_it = left_outputs.begin();
    auto right_it = right_inputs.begin();
    for (; left_it != left_outputs.end(); ++left_it, ++right_it)
        connectPorts(*left_it, *right_it);
}
3.2 Complex Processor Implementations
3.2.1 Aggregation Processor
// src/Processors/Transforms/AggregatingTransform.h
class AggregatingTransform : public IProcessor
{
public:
    AggregatingTransform(
        Block header,
        AggregatingTransformParamsPtr params_,
        bool many_data_ = false)
        : IProcessor({InputPort(header)}, {OutputPort(params_->getHeader())})
        , params(std::move(params_))
        , key_columns(params->params.keys_size)
        , aggregate_columns(params->params.aggregates_size)
        , many_data(many_data_)
    {
        /// Initialize the aggregator
        aggregator = std::make_unique<Aggregator>(params->params);
    }

    String getName() const override { return "AggregatingTransform"; }

    Status prepare() override
    {
        auto & output = outputs.front();
        auto & input = inputs.front();

        /// Check the output side
        if (output.isFinished())
        {
            input.close();
            return Status::Finished;
        }
        if (!output.canPush())
        {
            input.setNotNeeded();
            return Status::PortFull;
        }

        /// Flush pending output, if any
        if (has_output)
        {
            output.push(std::move(output_chunk));
            has_output = false;
            if (finished)
            {
                output.finish();
                return Status::Finished;
            }
        }

        /// Check the input side
        if (finished)
            return Status::Ready;

        if (input.isFinished())
        {
            if (is_consume_finished)
            {
                finished = true;
                return Status::Ready;
            }
            is_consume_finished = true;
            return Status::Ready;
        }

        input.setNeeded();
        if (!input.hasData())
            return Status::NeedData;

        current_chunk = input.pull(true);
        return Status::Ready;
    }

    void work() override
    {
        if (is_consume_finished)
        {
            /// Produce the aggregation result
            initGenerate();
        }
        else if (current_chunk)
        {
            /// Consume the next input chunk
            consume(std::move(current_chunk));
        }
    }

private:
    /// Consume one input chunk
    void consume(Chunk chunk)
    {
        const auto & info = chunk.getChunkInfo();
        if (const auto * agg_info = typeid_cast<const AggregatedChunkInfo *>(info.get()))
        {
            /// Merge data that is already partially aggregated
            auto block = getInputPort().getHeader().cloneWithColumns(chunk.detachColumns());
            block = aggregator->mergeBlocks(agg_info->bucket_num, std::move(block), finished);
            auto num_rows = block.rows();
            chunk.setColumns(block.getColumns(), num_rows);
        }
        else
        {
            /// Aggregate raw data
            auto block = getInputPort().getHeader().cloneWithColumns(chunk.detachColumns());
            if (!aggregator->executeOnBlock(block, aggregated_data, key_columns, aggregate_columns, finished))
            {
                is_consume_finished = true;
            }
        }
    }

    /// Start producing results
    void initGenerate()
    {
        if (aggregated_data.empty())
        {
            finished = true;
            return;
        }

        /// Convert the aggregation state into output blocks
        auto block = aggregator->convertToBlocks(aggregated_data, finished, max_block_size);
        if (block)
        {
            output_chunk.setColumns(block.getColumns(), block.rows());
            has_output = true;
        }
        else
        {
            finished = true;
        }
    }

    AggregatingTransformParamsPtr params;
    std::unique_ptr<Aggregator> aggregator;

    /// Aggregation state
    AggregatedDataVariants aggregated_data;
    ColumnNumbers key_columns;
    ColumnNumbers aggregate_columns;

    /// Processing state
    Chunk current_chunk;
    Chunk output_chunk;
    bool has_output = false;
    bool finished = false;
    bool is_consume_finished = false;
    bool many_data = false;
    size_t max_block_size = DEFAULT_BLOCK_SIZE;
};
3.2.2 Sorting Processor
// src/Processors/Transforms/SortingTransform.h
class SortingTransform : public IProcessor
{
public:
    SortingTransform(
        const Block & header,
        const SortDescription & description_,
        UInt64 max_merged_block_size_,
        UInt64 limit_,
        size_t max_bytes_before_remerge_,
        double remerge_lowered_memory_bytes_ratio_,
        size_t max_bytes_before_external_sort_,
        VolumePtr tmp_volume_,
        size_t min_free_disk_space_)
        : IProcessor({InputPort(header)}, {OutputPort(header)})
        , description(description_)
        , max_merged_block_size(max_merged_block_size_)
        , limit(limit_)
        , max_bytes_before_remerge(max_bytes_before_remerge_)
        , remerge_lowered_memory_bytes_ratio(remerge_lowered_memory_bytes_ratio_)
        , max_bytes_before_external_sort(max_bytes_before_external_sort_)
        , tmp_volume(tmp_volume_)
        , min_free_disk_space(min_free_disk_space_)
    {
        /// Initialize the in-memory sorter
        sorter = std::make_unique<MergeSorter>(header, description, max_merged_block_size, limit);
    }

    String getName() const override { return "SortingTransform"; }

    Status prepare() override
    {
        auto & input = inputs.front();
        auto & output = outputs.front();

        /// Check the output side
        if (output.isFinished())
        {
            input.close();
            return Status::Finished;
        }
        if (!output.canPush())
        {
            input.setNotNeeded();
            return Status::PortFull;
        }

        /// Flush pending output
        if (has_output)
        {
            output.push(std::move(output_chunk));
            has_output = false;
            if (stage == Stage::Finished)
            {
                output.finish();
                return Status::Finished;
            }
        }

        /// Dispatch on the current stage
        switch (stage)
        {
            case Stage::Consume:
            {
                if (input.isFinished())
                {
                    stage = Stage::Generate;
                    return Status::Ready;
                }
                input.setNeeded();
                if (!input.hasData())
                    return Status::NeedData;
                current_chunk = input.pull(true);
                return Status::Ready;
            }
            case Stage::Generate:
                return Status::Ready;
            case Stage::Finished:
                output.finish();
                return Status::Finished;
        }
        return Status::Ready;
    }

    void work() override
    {
        switch (stage)
        {
            case Stage::Consume:
                consume();
                break;
            case Stage::Generate:
                generate();
                break;
            case Stage::Finished:
                break;
        }
    }

private:
    enum class Stage
    {
        Consume,  /// Accumulate input data
        Generate, /// Emit sorted output
        Finished  /// Done
    };

    void consume()
    {
        if (!current_chunk)
            return;

        /// Track memory usage
        auto bytes = current_chunk.bytes();
        sum_bytes_in_blocks += bytes;

        if (max_bytes_before_external_sort && sum_bytes_in_blocks > max_bytes_before_external_sort)
        {
            /// The memory budget is exceeded: spill to an external sort
            if (!external_sorter)
            {
                external_sorter = std::make_unique<MergeSorter>(
                    getInputPort().getHeader(), description, max_merged_block_size, limit,
                    max_bytes_before_remerge, remerge_lowered_memory_bytes_ratio,
                    tmp_volume, min_free_disk_space);
            }
            external_sorter->addChunk(std::move(current_chunk));
        }
        else
        {
            /// In-memory sort
            auto block = getInputPort().getHeader().cloneWithColumns(current_chunk.detachColumns());
            sorter->addBlock(block);
        }
        current_chunk.clear();
    }

    void generate()
    {
        if (external_sorter)
        {
            /// Pull results from the external sorter
            auto block = external_sorter->read();
            if (block)
            {
                output_chunk.setColumns(block.getColumns(), block.rows());
                has_output = true;
            }
            else
            {
                stage = Stage::Finished;
            }
        }
        else
        {
            /// Pull results from the in-memory sorter
            auto block = sorter->read();
            if (block)
            {
                output_chunk.setColumns(block.getColumns(), block.rows());
                has_output = true;
            }
            else
            {
                stage = Stage::Finished;
            }
        }
    }

    SortDescription description;
    UInt64 max_merged_block_size;
    UInt64 limit;

    /// Memory management parameters
    size_t max_bytes_before_remerge;
    double remerge_lowered_memory_bytes_ratio;
    size_t max_bytes_before_external_sort;
    size_t sum_bytes_in_blocks = 0;

    /// Temporary storage
    VolumePtr tmp_volume;
    size_t min_free_disk_space;

    /// Sorters
    std::unique_ptr<MergeSorter> sorter;
    std::unique_ptr<MergeSorter> external_sorter;

    /// Processing state
    Stage stage = Stage::Consume;
    Chunk current_chunk;
    Chunk output_chunk;
    bool has_output = false;
};
Core Data Read/Write Flows
4.1 The Data Write Path in Detail
4.1.1 INSERT Statement Processing
// src/Interpreters/InterpreterInsertQuery.h
class InterpreterInsertQuery : public IInterpreter
{
public:
    InterpreterInsertQuery(
        const ASTPtr & query_ptr_,
        ContextPtr context_,
        bool allow_materialized_ = false,
        bool no_squash_ = false,
        bool no_destination_ = false,
        bool async_insert_ = false);

    BlockIO execute() override;

private:
    /// Build the insert chain
    Chain buildChain(
        const StoragePtr & table,
        const StorageMetadataPtr & metadata_snapshot,
        const Names & columns);

    /// Handle asynchronous inserts
    BlockIO executeAsyncInsert();

    ASTPtr query_ptr;
    ContextPtr context;
    bool allow_materialized = false;
    bool no_squash = false;
    bool no_destination = false;
    bool async_insert = false;
};
// src/Interpreters/InterpreterInsertQuery.cpp
BlockIO InterpreterInsertQuery::execute()
{
    const auto & query = query_ptr->as<ASTInsertQuery &>();

    /// 1. Resolve the target table
    StoragePtr table = DatabaseCatalog::instance().getTable(
        StorageID(query.getDatabase(), query.getTable()), context);
    auto metadata_snapshot = table->getInMemoryMetadataPtr();

    /// 2. Check access rights
    auto table_id = table->getStorageID();
    context->checkAccess(AccessType::INSERT, table_id, metadata_snapshot->getColumnsRequiredForInsert());

    /// 3. Asynchronous insert path
    if (async_insert)
        return executeAsyncInsert();

    /// 4. Build the insert pipeline
    BlockIO res;

    /// Columns to insert into
    Names columns = query.columns ? query.columns->getNames() : metadata_snapshot->getColumns().getNamesOfPhysical();

    /// Build the insert chain
    auto chain = buildChain(table, metadata_snapshot, columns);

    /// 5. Attach a data source
    if (query.data)
    {
        /// Inline data attached to the INSERT statement
        res = table->write(query_ptr, metadata_snapshot, context, async_insert);
    }
    else if (query.select)
    {
        /// INSERT ... SELECT
        InterpreterSelectQuery interpreter_select(query.select, context, SelectQueryOptions().analyze());
        auto select_pipeline = interpreter_select.buildQueryPipeline();

        /// Attach the insert chain to the SELECT pipeline
        select_pipeline.addChain(std::move(chain));
        res.pipeline = std::move(select_pipeline);
    }
    else
    {
        /// Data arrives from an external input stream
        res.pipeline.init(Pipe(std::make_shared<NullSource>(metadata_snapshot->getSampleBlock())));
        res.pipeline.addChain(std::move(chain));
    }

    return res;
}
Chain InterpreterInsertQuery::buildChain(
    const StoragePtr & table,
    const StorageMetadataPtr & metadata_snapshot,
    const Names & columns)
{
    Chain chain;

    /// 1. Fill in defaults for omitted columns
    if (context->getSettingsRef().input_format_defaults_for_omitted_fields)
    {
        auto adding_defaults_transform = std::make_shared<AddingDefaultsTransform>(
            metadata_snapshot->getSampleBlock(), columns, *metadata_snapshot, context);
        chain.addSource(std::move(adding_defaults_transform));
    }

    /// 2. Squash small blocks into bigger ones (if enabled)
    if (!no_squash && context->getSettingsRef().min_insert_block_size_rows)
    {
        auto squashing_transform = std::make_shared<SquashingChunksTransform>(
            chain.getInputHeader(),
            context->getSettingsRef().min_insert_block_size_rows,
            context->getSettingsRef().min_insert_block_size_bytes);
        chain.addSource(std::move(squashing_transform));
    }

    /// 3. Terminate the chain with the storage sink
    if (!no_destination)
    {
        auto sink = table->write(query_ptr, metadata_snapshot, context, async_insert);
        chain.addSink(std::move(sink));
    }

    return chain;
}
4.1.2 MergeTree Write Implementation
// src/Storages/MergeTree/MergeTreeSink.cpp
class MergeTreeSink : public SinkToStorage
{
public:
    MergeTreeSink(
        StorageMergeTree & storage_,
        const StorageMetadataPtr & metadata_snapshot_,
        size_t max_parts_per_block_,
        ContextPtr context_)
        : SinkToStorage(metadata_snapshot_->getSampleBlock())
        , storage(storage_)
        , metadata_snapshot(metadata_snapshot_)
        , max_parts_per_block(max_parts_per_block_)
        , context(context_) {}

    String getName() const override { return "MergeTreeSink"; }

protected:
    void consume(Chunk chunk) override
    {
        auto block = getHeader().cloneWithColumns(chunk.detachColumns());

        /// 1. Split the block by partition
        auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block, metadata_snapshot, context);

        /// 2. Write every partition's block
        for (auto & part_block : part_blocks)
        {
            /// Create a temporary data part
            auto temp_part = storage.writer.writeTempPart(part_block, metadata_snapshot, context);

            /// Commit it to the working set
            storage.renameTempPartAndAdd(temp_part, nullptr, &storage.increment);
        }

        /// 3. Kick background merges if needed
        storage.background_operations_assignee.trigger();
    }

private:
    StorageMergeTree & storage;
    StorageMetadataPtr metadata_snapshot;
    size_t max_parts_per_block;
    ContextPtr context;
};
// Detailed part-writing implementation
MergeTreeMutableDataPartPtr MergeTreeDataWriter::writeTempPart(
    BlockWithPartition & block_with_partition,
    const StorageMetadataPtr & metadata_snapshot,
    ContextPtr context)
{
    Block & block = block_with_partition.block;

    /// 1. Validate the block against the table metadata
    metadata_snapshot->check(block, true);

    /// 2. Generate the part name
    auto part_name = data.getPartName(
        block_with_partition.partition,
        block_with_partition.min_block,
        block_with_partition.max_block,
        block_with_partition.level);

    /// 3. Pick a storage volume
    auto volume = data.getStoragePolicy()->getVolume(0);

    /// 4. Create the temporary data part
    auto new_data_part = data.createPart(
        part_name,
        choosePartType(block.bytes(), data.getSettings()->min_bytes_for_wide_part),
        MergeTreePartInfo::fromPartName(part_name, data.format_version),
        volume,
        part_name);

    /// 5. Set partition information
    new_data_part->partition = std::move(block_with_partition.partition);
    new_data_part->minmax_idx->update(block, data.getMinMaxColumnsNames(metadata_snapshot->getPartitionKey()));

    /// 6. Compute the index granularity
    MergeTreeIndexGranularity index_granularity;
    computeGranularity(block, index_granularity, context->getSettingsRef());

    /// 7. Create the part writer
    auto writer = new_data_part->getWriter(
        block.getNamesAndTypesList(),
        metadata_snapshot,
        data.getIndicesDescription(),
        data.getCompressionCodecForPart(new_data_part->info.level, new_data_part->info.mutation, context),
        MergeTreeWriterSettings(context->getSettingsRef(), data.getSettings()),
        index_granularity);

    /// 8. Write the data
    writer->write(block);

    /// 9. Finish all streams
    writer->finishDataSerialization(context->getSettingsRef().fsync_part_directory);
    writer->finishPrimaryIndexSerialization(context->getSettingsRef().fsync_part_directory);
    writer->finishSkipIndicesSerialization(context->getSettingsRef().fsync_part_directory);

    /// 10. Record checksums and statistics
    new_data_part->checksums = writer->releaseChecksums();
    new_data_part->setBytesOnDisk(new_data_part->checksums.getTotalSizeOnDisk());
    new_data_part->rows_count = block.rows();
    new_data_part->modification_time = time(nullptr);

    return new_data_part;
}
4.2 The Data Read Path in Detail
4.2.1 SELECT Statement Processing
// src/Interpreters/InterpreterSelectQuery.cpp
BlockIO InterpreterSelectQuery::execute()
{
    /// 1. Build the query plan
    QueryPlan query_plan;
    buildQueryPlan(query_plan);

    /// 2. Optimize the plan
    QueryPlanOptimizationSettings optimization_settings = QueryPlanOptimizationSettings::fromContext(context);
    query_plan.optimize(optimization_settings);

    /// 3. Turn the plan into a pipeline
    auto builder = query_plan.buildQueryPipeline(
        optimization_settings,
        BuildQueryPipelineSettings::fromContext(context));

    /// 4. Return the result
    BlockIO res;
    res.pipeline = std::move(*builder);

    /// 5. Apply limits and quotas
    if (context->hasQueryContext())
    {
        res.pipeline.setLimitsAndQuota(
            context->getQueryContext()->getStreamingLimits(),
            context->getQueryContext()->getQuota());
    }

    return res;
}
void InterpreterSelectQuery::buildQueryPlan(QueryPlan & query_plan)
{
    const auto & query = getSelectQuery();

    /// 1. Analyze the query expressions
    analyzeExpressions(QueryProcessingStage::FetchColumns, false, Block{});

    /// 2. Read from storage
    if (storage && !options.only_analyze)
    {
        /// Build the storage read step
        auto read_step = std::make_unique<ReadFromStorageStep>(
            storage,
            query_analyzer->requiredSourceColumns(),
            storage_snapshot,
            query_info,
            context,
            processing_stage,
            max_block_size,
            max_streams);
        query_plan.addStep(std::move(read_step));
    }
    else
    {
        /// Empty source
        auto header = query_analyzer->getSampleBlock();
        auto read_nothing = std::make_unique<ReadNothingStep>(header);
        query_plan.addStep(std::move(read_nothing));
    }

    /// 3. WHERE filtering
    if (query_analyzer->hasWhere())
    {
        auto where_step = std::make_unique<FilterStep>(
            query_plan.getCurrentDataStream(),
            query_analyzer->where(),
            query_analyzer->where()->getColumnName(),
            true);
        query_plan.addStep(std::move(where_step));
    }

    /// 4. Aggregation
    if (query_analyzer->hasAggregation())
    {
        auto aggregating_step = std::make_unique<AggregatingStep>(
            query_plan.getCurrentDataStream(),
            query_analyzer->aggregationKeys(),
            query_analyzer->aggregates(),
            query_analyzer->groupingSetsParams(),
            true, /// final
            max_block_size,
            context->getSettingsRef().aggregation_in_order_max_block_bytes,
            merge_threads,
            temporary_data_merge_threads,
            context->getSettingsRef().enable_software_prefetch_in_aggregation,
            context->getSettingsRef().only_merge_for_aggregation_in_order,
            query_analyzer->aggregationShouldProduceResultInOrderOfPrimaryKey());
        query_plan.addStep(std::move(aggregating_step));
    }

    /// 5. HAVING filtering
    if (query_analyzer->hasHaving())
    {
        auto having_step = std::make_unique<FilterStep>(
            query_plan.getCurrentDataStream(),
            query_analyzer->having(),
            query_analyzer->having()->getColumnName(),
            false);
        query_plan.addStep(std::move(having_step));
    }

    /// 6. ORDER BY sorting
    if (query_analyzer->hasOrderBy())
    {
        auto sorting_step = std::make_unique<SortingStep>(
            query_plan.getCurrentDataStream(),
            query_analyzer->orderByDescription(),
            query.limitLength(),
            SortingStep::Settings(context->getSettingsRef()),
            context->getSettingsRef().optimize_sorting_by_input_stream_properties);
        query_plan.addStep(std::move(sorting_step));
    }

    /// 7. LIMIT
    if (query.limitLength())
    {
        auto limit_step = std::make_unique<LimitStep>(
            query_plan.getCurrentDataStream(),
            query.limitLength(),
            query.limitOffset());
        query_plan.addStep(std::move(limit_step));
    }

    /// 8. Final projection
    if (query_analyzer->hasProjection())
    {
        auto expression_step = std::make_unique<ExpressionStep>(
            query_plan.getCurrentDataStream(),
            query_analyzer->projection());
        query_plan.addStep(std::move(expression_step));
    }
}
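To see how these branches compose, trace a hypothetical query through the builder; the step sequence follows the branch order above:
/// Worked example (sketch). For
///   SELECT id, count() FROM t WHERE x > 0 GROUP BY id ORDER BY id LIMIT 10
/// the builder above appends roughly:
///   ReadFromStorageStep  -- fetch columns id, x from t
///   FilterStep           -- WHERE x > 0
///   AggregatingStep      -- GROUP BY id, compute count()
///   SortingStep          -- ORDER BY id
///   LimitStep            -- LIMIT 10
///   ExpressionStep       -- final projection of (id, count())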
4.2.2 MergeTree Read Implementation
// src/Storages/MergeTree/MergeTreeRangeReader.h
class MergeTreeRangeReader
{
public:
    struct ReadResult
    {
        /// The block that was read
        Block block;
        /// Number of rows read
        size_t num_rows = 0;

        /// Filtering information
        ColumnPtr filter;
        size_t num_rows_after_filter = 0;

        /// Whether more data is needed
        bool need_more_data = false;
    };

    MergeTreeRangeReader(
        IMergeTreeReader * merge_tree_reader_,
        MergeTreeRangeReader * prev_reader_,
        const PrewhereInfoPtr & prewhere_info_,
        bool last_reader_in_chain_);

    /// Read up to max_rows rows from the given mark ranges
    ReadResult read(size_t max_rows, MarkRanges & ranges);

private:
    /// Run the PREWHERE actions and filter the columns
    void executePrewhereActionsAndFilterColumns(ReadResult & result);
    /// Read the required columns
    size_t readRows(size_t max_rows, MarkRanges & ranges, ReadResult & result);

    IMergeTreeReader * merge_tree_reader = nullptr;
    MergeTreeRangeReader * prev_reader = nullptr;
    PrewhereInfoPtr prewhere_info;
    bool last_reader_in_chain = false;

    /// Column cache
    std::unordered_map<String, ColumnPtr> column_cache;
};
// src/Storages/MergeTree/MergeTreeRangeReader.cpp
MergeTreeRangeReader::ReadResult MergeTreeRangeReader::read(size_t max_rows, MarkRanges & ranges)
{
    ReadResult result;

    if (ranges.empty())
        return result;

    /// 1. Pull data from the previous reader in the chain, if any
    if (prev_reader)
    {
        result = prev_reader->read(max_rows, ranges);
        if (result.num_rows == 0)
            return result;

        /// 2. Apply this level's PREWHERE filtering
        if (prewhere_info)
        {
            executePrewhereActionsAndFilterColumns(result);

            /// If everything was filtered out, keep reading
            if (result.num_rows_after_filter == 0 && !ranges.empty())
                return read(max_rows, ranges);
        }
    }
    else
    {
        /// 3. Read directly from storage
        result.num_rows = readRows(max_rows, ranges, result);
        if (result.num_rows == 0)
            return result;

        /// 4. Apply PREWHERE filtering
        if (prewhere_info)
        {
            executePrewhereActionsAndFilterColumns(result);
        }
        else
        {
            result.num_rows_after_filter = result.num_rows;
        }
    }

    return result;
}
size_t MergeTreeRangeReader::readRows(size_t max_rows, MarkRanges & ranges, ReadResult & result)
{
    size_t read_rows = 0;
    Columns columns;

    /// 1. Prepare the column containers
    const auto & header = merge_tree_reader->getColumns();
    columns.resize(header.size());

    /// 2. Read mark range by mark range
    while (read_rows < max_rows && !ranges.empty())
    {
        auto & range = ranges.front();

        /// Rows to read in this iteration (simplified: marks stand in for rows here)
        size_t rows_to_read = std::min(max_rows - read_rows, range.end - range.begin);

        /// Delegate to the MergeTree reader
        size_t rows_read = merge_tree_reader->readRows(
            range.begin,
            range.end,
            read_rows > 0, /// continue_reading
            rows_to_read,
            columns);

        read_rows += rows_read;
        range.begin += rows_read;

        /// Drop the range once it is exhausted
        if (range.begin >= range.end)
            ranges.pop_front();

        /// Fewer rows than requested means this part is exhausted
        if (rows_read < rows_to_read)
            break;
    }

    /// 3. Assemble the result block
    if (read_rows > 0)
        result.block = header.cloneWithColumns(std::move(columns));

    return read_rows;
}
void MergeTreeRangeReader::executePrewhereActionsAndFilterColumns(ReadResult & result)
{
    if (!prewhere_info || !result.block)
        return;

    const auto & prewhere_actions = prewhere_info->prewhere_actions;
    const auto & prewhere_column_name = prewhere_info->prewhere_column_name;

    /// 1. Evaluate the PREWHERE expression
    prewhere_actions->execute(result.block);

    /// 2. Take the filter column
    auto filter_column = result.block.getByName(prewhere_column_name).column;

    /// 3. Apply the filter
    if (const auto * const_column = typeid_cast<const ColumnConst *>(filter_column.get()))
    {
        /// Constant filter
        if (const_column->getValue<UInt8>())
        {
            /// Keep everything
            result.num_rows_after_filter = result.num_rows;
        }
        else
        {
            /// Drop everything
            result.num_rows_after_filter = 0;
            result.block.clear();
        }
    }
    else
    {
        /// Non-constant filter
        const auto & filter_data = typeid_cast<const ColumnUInt8 &>(*filter_column).getData();

        /// Count the surviving rows
        result.num_rows_after_filter = countBytesInFilter(filter_data);

        if (result.num_rows_after_filter == 0)
        {
            result.block.clear();
        }
        else if (result.num_rows_after_filter < result.num_rows)
        {
            /// Apply the filter to every column
            for (auto & column : result.block)
                column.column = column.column->filter(filter_data, result.num_rows_after_filter);
        }

        /// Keep the filter for downstream readers
        result.filter = std::move(filter_column);
    }

    /// 4. Drop the PREWHERE column if it is not needed later
    if (prewhere_info->remove_prewhere_column)
        result.block.erase(prewhere_column_name);
}
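The row count computed above doubles as the size hint passed to column->filter. A scalar sketch of what countBytesInFilter computes (the real implementation in src/Columns/ColumnsCommon.cpp is SIMD-optimized):
/// Scalar sketch of the filter-counting step; illustration only.
size_t countSurvivingRows(const PaddedPODArray<UInt8> & filter)
{
    size_t count = 0;
    for (UInt8 byte : filter)
        count += (byte != 0); /// Any non-zero byte keeps the corresponding row.
    return count;
}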
This analysis has walked through the implementation of ClickHouse's core components:
- Processor framework: the IProcessor interface design, the pipeline executor, and the implementations of the main processor types
- MergeTree storage engine: data part management, reader/writer implementations, and the merge mechanism
- Query processor framework: the port communication mechanism and complex processor implementations
- Data read/write flows: the complete INSERT and SELECT processing paths
Each part pairs the key functions' code with an explanation of what they do, to help developers understand ClickHouse's internal machinery.