Detailed Analysis of the Core Execution Engine

1.1 The IProcessor Base Framework

1.1.1 Core Interface Design of IProcessor

// src/Processors/IProcessor.h
class IProcessor
{
public:
    /// Processor status enum
    enum class Status
    {
        NeedData,        /// Needs more input data
        PortFull,        /// Output port is full
        Finished,        /// Processing is finished
        Ready,           /// Ready to do work
        Async,           /// Asynchronous processing in progress
        ExpandPipeline   /// The pipeline needs to be expanded
    };

protected:
    InputPorts inputs;   /// List of input ports
    OutputPorts outputs; /// List of output ports

public:
    /// Core processing method: the prepare phase
    virtual Status prepare() = 0;
    
    /// Core processing method: the work phase
    virtual void work();
    
    /// Asynchronous scheduling method
    virtual int schedule();
    
    /// Pipeline expansion method
    virtual Processors expandPipeline();
    
    /// Cancel processing
    virtual void cancel() noexcept;
    
    /// Get the processor name
    virtual String getName() const = 0;
};

Key design ideas:

  • Port model: data is transferred through InputPort and OutputPort
  • State machine: the Status enum drives processor scheduling (see the sketch after this list)
  • Async support: asynchronous processing and scheduling are built in
  • Extensibility: the pipeline can be expanded dynamically at runtime
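
To make the state machine concrete, here is a minimal pass-through processor. This is a hedged sketch against the interface above, not ClickHouse source: the class name is invented, and in practice only the executor calls prepare() and work().

// Sketch: a trivial processor that forwards chunks unchanged.
// prepare() only moves data between ports and reports status;
// work() would hold any CPU-heavy logic (here there is none).
class PassThroughProcessor : public IProcessor
{
public:
    explicit PassThroughProcessor(Block header)
        : IProcessor({InputPort(header)}, {OutputPort(header)}) {}

    String getName() const override { return "PassThroughProcessor"; }

    Status prepare() override
    {
        auto & input = inputs.front();
        auto & output = outputs.front();

        if (output.isFinished())       /// downstream no longer wants data
        {
            input.close();
            return Status::Finished;
        }
        if (!output.canPush())         /// downstream has not consumed yet
            return Status::PortFull;
        if (input.isFinished())        /// upstream is exhausted
        {
            output.finish();
            return Status::Finished;
        }

        input.setNeeded();             /// ask upstream for data
        if (!input.hasData())
            return Status::NeedData;

        output.push(input.pull(true)); /// forward the chunk unchanged
        return Status::PortFull;
    }

    void work() override {}            /// nothing CPU-heavy to do
};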

1.1.2 Processor Lifecycle Management

// src/Processors/IProcessor.cpp
IProcessor::IProcessor()
{
    // Allocate a processor index, used for debugging and monitoring
    processor_index = CurrentThread::isInitialized() 
        ? CurrentThread::get().getNextPipelineProcessorIndex() 
        : 0;
}

IProcessor::IProcessor(InputPorts inputs_, OutputPorts outputs_) 
    : inputs(std::move(inputs_)), outputs(std::move(outputs_))
{
    // Point every port back at its owning processor
    for (auto & port : inputs)
        port.processor = this;
    for (auto & port : outputs)
        port.processor = this;
        
    processor_index = CurrentThread::isInitialized() 
        ? CurrentThread::get().getNextPipelineProcessorIndex() 
        : 0;
}

void IProcessor::cancel() noexcept
{
    // An atomic exchange makes cancellation thread-safe and idempotent
    bool already_cancelled = is_cancelled.exchange(true, std::memory_order_acq_rel);
    if (already_cancelled)
        return;

    onCancel(); // Subclasses may override onCancel() for cleanup
}

1.2 Pipeline Executor Architecture

1.2.1 Core Implementation of PipelineExecutor

// src/Processors/Executors/PipelineExecutor.h
class PipelineExecutor
{
public:
    /// Constructor: builds the execution graph
    explicit PipelineExecutor(std::shared_ptr<Processors> & processors, QueryStatusPtr elem);
    
    /// Execute the pipeline with multiple threads
    void execute(size_t num_threads, bool concurrency_control);
    
    /// Single-step execution (for debugging and fine-grained control)
    bool executeStep(std::atomic_bool * yield_flag = nullptr);

private:
    /// Execution graph representation
    ExecutingGraphPtr graph;
    
    /// Task queues
    ExecutorTasks tasks;
    
    /// Execution status
    std::atomic<ExecutionStatus> execution_status{ExecutionStatus::EXECUTING};
};

// src/Processors/Executors/PipelineExecutor.cpp
void PipelineExecutor::execute(size_t num_threads, bool concurrency_control)
{
    checkTimeLimit();
    num_threads = std::max<size_t>(num_threads, 1);

    OpenTelemetry::SpanHolder span("PipelineExecutor::execute()");
    span.addAttribute("clickhouse.thread_num", num_threads);

    try
    {
        executeImpl(num_threads, concurrency_control);

        /// Log all LOGICAL_ERROR exceptions
        for (auto & node : graph->nodes)
            if (node->exception && getExceptionErrorCode(node->exception) == ErrorCodes::LOGICAL_ERROR)
                tryLogException(node->exception, log);

        /// Rethrow the first node exception
        for (auto & node : graph->nodes)
            if (node->exception)
                std::rethrow_exception(node->exception);

        /// Rethrow exceptions from executor threads
        tasks.rethrowFirstThreadException();
    }
    catch (...)
    {
        span.addAttribute(DB::ExecutionStatus::fromCurrentException());
        
#ifndef NDEBUG
        LOG_TRACE(log, "Exception while executing query. Current state:\n{}", dumpPipeline());
#endif
        throw;
    }

    finalizeExecution();
}

1.2.2 Execution Graph Construction and Scheduling

// src/Processors/Executors/ExecutingGraph.h
class ExecutingGraph
{
public:
    struct Node
    {
        ProcessorPtr processor;
        std::vector<Node *> direct_edges;     /// Downstream nodes (via output ports)
        std::vector<Node *> back_edges;       /// Upstream nodes (via input ports)
        
        /// State management
        std::atomic<bool> status{false};
        std::exception_ptr exception;
        
        /// Scheduling info
        std::atomic<UInt64> num_executed_jobs{0};
        std::atomic<UInt64> finish_time{0};
    };
    
    std::vector<std::unique_ptr<Node>> nodes;
    
    /// Build the execution graph
    void buildGraph(Processors & processors);
    
    /// Get the nodes that are ready to execute
    std::vector<Node *> getReadyNodes();
    
    /// Update a node's status
    void updateNodeStatus(Node * node, IProcessor::Status status);
};

void ExecutingGraph::buildGraph(Processors & processors)
{
    /// 1. Create a node for every processor
    nodes.reserve(processors.size());
    std::unordered_map<IProcessor *, Node *> processor_to_node;
    
    for (auto & processor : processors)
    {
        auto node = std::make_unique<Node>();
        node->processor = processor;
        processor_to_node[processor.get()] = node.get();
        nodes.emplace_back(std::move(node));
    }
    
    /// 2. Establish dependencies between nodes
    for (auto & node : nodes)
    {
        for (auto & input_port : node->processor->getInputs())
        {
            if (input_port.isConnected())
            {
                auto * output_processor = &input_port.getOutputPort().getProcessor();
                auto * output_node = processor_to_node[output_processor];
                
                /// Record the edge in both directions
                node->back_edges.push_back(output_node);
                output_node->direct_edges.push_back(node.get());
            }
        }
    }
    
    /// 3. Validate graph correctness
    validateGraph();
}

1.3 The Different Kinds of Processors

1.3.1 Source Processors (data sources)

// src/Processors/Sources/SourceFromSingleChunk.h
class SourceFromSingleChunk : public ISource
{
public:
    SourceFromSingleChunk(Block header, Chunk chunk_)
        : ISource(std::move(header)), chunk(std::move(chunk_)) {}

    String getName() const override { return "SourceFromSingleChunk"; }

protected:
    Chunk generate() override
    {
        if (chunk)
        {
            auto res = std::move(chunk);
            chunk.clear();
            return res;
        }
        return {};
    }

private:
    Chunk chunk;
};

// ISource base class implementation
class ISource : public IProcessor
{
public:
    ISource(Block header) : IProcessor({}, {OutputPort(std::move(header))}) {}

    Status prepare() override
    {
        if (finished)
        {
            output.finish();
            return Status::Finished;
        }

        if (output.isFinished())
        {
            finished = true;
            return Status::Finished;
        }

        if (!output.canPush())
            return Status::PortFull;

        if (!has_input)
            return Status::Ready;

        output.push(std::move(current_chunk));
        has_input = false;

        if (got_exception)
        {
            finished = true;
            output.finish();
            std::rethrow_exception(got_exception);
        }

        return Status::PortFull;
    }

    void work() override
    {
        try
        {
            current_chunk = generate();
            if (!current_chunk)
            {
                finished = true;
                return;
            }
            has_input = true;
        }
        catch (...)
        {
            finished = true;
            got_exception = std::current_exception();
        }
    }

protected:
    virtual Chunk generate() = 0;

private:
    OutputPort & output = outputs.front();
    bool finished = false;
    bool has_input = false;
    Chunk current_chunk;
    std::exception_ptr got_exception;
};
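
A hedged usage sketch for this source: pull its single chunk through a pipeline. The variables header (a Block) and chunk (a Chunk) are assumed to be prepared by the caller; QueryPipeline and PullingPipelineExecutor appear later, in Section 3.

// Illustrative usage; `header` and `chunk` are assumed given.
auto source = std::make_shared<SourceFromSingleChunk>(header, std::move(chunk));

QueryPipeline pipeline(std::move(source));   /// a single-processor pipeline
PullingPipelineExecutor executor(pipeline);  /// calls prepare()/work() internally

Chunk result;
while (executor.pull(result))
{
    /// exactly one non-empty chunk arrives here, then pull() returns false
}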

1.3.2 Transform Processors (data transformation)

// src/Processors/Transforms/ExpressionTransform.h
class ExpressionTransform : public ISimpleTransform
{
public:
    ExpressionTransform(const Block & header_, ExpressionActionsPtr expression_)
        : ISimpleTransform(header_, expression_->getResultColumns().cloneEmpty(), false)
        , expression(std::move(expression_))
    {
        /// Reject actions that would change the number of rows
        const auto & actions = expression->getActions();
        for (const auto & action : actions)
        {
            if (action.node->type == ActionsDAG::ActionType::ARRAY_JOIN)
            {
                throw Exception(ErrorCodes::LOGICAL_ERROR, 
                    "ARRAY JOIN is not supported in ExpressionTransform");
            }
        }
    }

    String getName() const override { return "ExpressionTransform"; }

protected:
    void transform(Chunk & chunk) override
    {
        size_t num_rows = chunk.getNumRows();
        auto block = getInputPort().getHeader().cloneWithColumns(chunk.detachColumns());
        
        /// Evaluate the expression actions
        expression->execute(block, num_rows);
        
        chunk.setColumns(block.getColumns(), num_rows);
    }

private:
    ExpressionActionsPtr expression;
};

// ISimpleTransform base class implementation
class ISimpleTransform : public IProcessor
{
public:
    ISimpleTransform(Block input_header, Block output_header, bool skip_empty_chunks_ = true)
        : IProcessor({InputPort(std::move(input_header))}, {OutputPort(std::move(output_header))})
        , input(inputs.front())
        , output(outputs.front())
        , skip_empty_chunks(skip_empty_chunks_) {}

    Status prepare() override
    {
        /// Check the output port state
        if (output.isFinished())
        {
            input.close();
            return Status::Finished;
        }

        if (!output.canPush())
        {
            input.setNotNeeded();
            return Status::PortFull;
        }

        /// Push pending output first
        if (has_output)
        {
            output.push(std::move(current_chunk));
            has_output = false;
        }

        if (finished_output)
        {
            output.finish();
            return Status::Finished;
        }

        if (has_input)
            return Status::Ready;

        /// Check the input port state
        if (input.isFinished())
        {
            finished_input = true;
            return Status::Ready;
        }

        input.setNeeded();
        if (!input.hasData())
            return Status::NeedData;

        current_chunk = input.pull(true);
        has_input = true;
        return Status::Ready;
    }

    void work() override
    {
        if (has_input)
        {
            transform(current_chunk);
            
            if (skip_empty_chunks && current_chunk.getNumRows() == 0)
            {
                has_input = false;
                return;
            }
            
            has_input = false;
            has_output = true;
        }
        else if (finished_input)
        {
            finished_output = true;
        }
    }

protected:
    virtual void transform(Chunk & chunk) = 0;

private:
    InputPort & input;
    OutputPort & output;
    
    Chunk current_chunk;
    bool has_input = false;
    bool has_output = false;
    bool finished_input = false;
    bool finished_output = false;
    
    const bool skip_empty_chunks = true;
};
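
To show how little a concrete transform needs to implement, here is a hedged sketch of a custom ISimpleTransform that truncates each chunk to a row limit. The class is invented for illustration (real row limits are applied by LimitTransform); it relies only on the base class above and on IColumn::cut.

// Sketch: truncate every chunk to at most `limit_per_chunk` rows.
class TruncatingTransform : public ISimpleTransform
{
public:
    TruncatingTransform(const Block & header, size_t limit_per_chunk_)
        : ISimpleTransform(header, header, false), limit_per_chunk(limit_per_chunk_) {}

    String getName() const override { return "TruncatingTransform"; }

protected:
    void transform(Chunk & chunk) override
    {
        if (chunk.getNumRows() <= limit_per_chunk)
            return;

        auto columns = chunk.detachColumns();
        for (auto & column : columns)
            column = column->cut(0, limit_per_chunk); /// IColumn::cut(start, length)
        chunk.setColumns(std::move(columns), limit_per_chunk);
    }

private:
    const size_t limit_per_chunk;
};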

1.3.3 Sink Processors (data output)

// src/Processors/Sinks/SinkToStorage.h
class SinkToStorage : public ExceptionKeepingTransform
{
public:
    explicit SinkToStorage(const Block & header) : ExceptionKeepingTransform(header, {}) {}

    String getName() const override { return "SinkToStorage"; }

protected:
    void onConsume(Chunk chunk) override
    {
        if (!chunk)
            return;

        cur_chunk = std::move(chunk);
        consume(std::move(cur_chunk)); /// Chunk is move-only, so hand it over
    }

    GenerateResult onGenerate() override
    {
        /// A sink produces no output data
        return {Chunk{}, false};
    }

    void onFinish() override
    {
        finalize();
    }

    /// Methods that concrete sinks must implement
    virtual void consume(Chunk chunk) = 0;
    virtual void finalize() {}

private:
    Chunk cur_chunk;
};

// Example of a concrete storage sink
class MergeTreeSink : public SinkToStorage
{
public:
    MergeTreeSink(
        StorageMergeTree & storage_,
        const StorageMetadataPtr & metadata_snapshot_,
        size_t max_parts_per_block_,
        ContextPtr context_)
        : SinkToStorage(metadata_snapshot_->getSampleBlock())
        , storage(storage_)
        , metadata_snapshot(metadata_snapshot_)
        , max_parts_per_block(max_parts_per_block_)
        , context(context_) {}

    String getName() const override { return "MergeTreeSink"; }

protected:
    void consume(Chunk chunk) override
    {
        auto block = getHeader().cloneWithColumns(chunk.detachColumns());
        
        /// Split the block by partition and write each piece as a new
        /// data part (the full flow is shown in Section 4.1.2)
        auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block, metadata_snapshot, context);
        for (auto & part_block : part_blocks)
        {
            auto temp_part = storage.writer.writeTempPart(part_block, metadata_snapshot, context);
            storage.renameTempPartAndAdd(temp_part, nullptr, &storage.increment);
        }
    }

    void finalize() override
    {
        /// Let the background assignee pick up merges for the new parts
        storage.background_operations_assignee->trigger();
    }

private:
    StorageMergeTree & storage;
    StorageMetadataPtr metadata_snapshot;
    size_t max_parts_per_block;
    ContextPtr context;
};

In-Depth Analysis of the MergeTree Storage Engine

2.1 MergeTree Data Organization

2.1.1 Core Design of Data Parts (DataPart)

// src/Storages/MergeTree/IMergeTreeDataPart.h
class IMergeTreeDataPart : public std::enable_shared_from_this<IMergeTreeDataPart>
{
public:
    /// Data part lifecycle states
    enum class State
    {
        Temporary,       /// Temporary: still being written
        PreCommitted,    /// Pre-committed
        Committed,       /// Committed
        Outdated,        /// Outdated: waiting for removal
        Deleting,        /// Being deleted
        DeleteOnDestroy  /// Deleted on destruction
    };
    
    /// Data part storage formats
    enum class Type
    {
        WIDE,           /// Wide format (one file per column)
        COMPACT,        /// Compact format (all columns in a single file)
        IN_MEMORY,      /// In-memory format
        UNKNOWN         /// Unknown format
    };

    /// Per-column size statistics
    struct ColumnSize
    {
        size_t marks = 0;                /// Number of marks
        size_t data_compressed = 0;      /// Compressed size
        size_t data_uncompressed = 0;    /// Uncompressed size
        
        void addToTotalSize(ColumnSize & total_size) const
        {
            total_size.marks += marks;
            total_size.data_compressed += data_compressed;
            total_size.data_uncompressed += data_uncompressed;
        }
    };

protected:
    /// Basic information
    String name;                        /// Data part name
    MergeTreePartInfo info;            /// Part info (partition ID, block range, level)
    const MergeTreeData & storage;     /// Reference to the owning storage engine
    VolumePtr volume;                  /// Storage volume

    /// Metadata
    mutable ColumnsDescription columns;              /// Column descriptions
    mutable SerializationInfoByName serialization_infos; /// Serialization info
    mutable VersionMetadata version;                 /// Version metadata

    /// Statistics
    size_t rows_count = 0;             /// Number of rows
    size_t bytes_on_disk = 0;          /// Bytes occupied on disk
    mutable ColumnSizeByName columns_sizes; /// Per-column size statistics

    /// Index info
    mutable IndexGranularity index_granularity;     /// Index granularity
    size_t index_granularity_bytes = 0;             /// Index granularity in bytes

    /// Partition info
    String partition_id;               /// Partition ID
    MergeTreePartition partition;      /// Partition values

    /// Checksums
    mutable Checksums checksums;       /// File checksums

    /// State management
    mutable std::atomic<State> state{State::Temporary};
    mutable std::mutex state_mutex;

public:
    /// Obtain a data reader
    virtual std::shared_ptr<IMergeTreeReader> getReader(
        const NamesAndTypesList & columns_to_read,
        const StorageMetadataPtr & metadata_snapshot,
        const MarkRanges & mark_ranges,
        UncompressedCache * uncompressed_cache,
        MarkCache * mark_cache,
        const MergeTreeReaderSettings & reader_settings,
        const ValueSizeMap & avg_value_size_hints = {},
        const ReadBufferFromFileBase::ProfileCallback & profile_callback = {}) const = 0;

    /// Obtain a data writer
    virtual std::shared_ptr<IMergeTreeDataPartWriter> getWriter(
        const NamesAndTypesList & columns_list,
        const StorageMetadataPtr & metadata_snapshot,
        const std::vector<MergeTreeIndexPtr> & indices_to_recalc,
        const CompressionCodecPtr & default_codec_,
        const MergeTreeWriterSettings & writer_settings,
        const MergeTreeIndexGranularity & computed_index_granularity) const = 0;

    /// Load metadata
    void loadColumnsChecksumsIndexes(bool require_columns_checksums, bool check_consistency);
    void loadIndex();
    void loadPartitionAndMinMaxIndex();
    void loadChecksums(bool require);
    void loadRowsCount();

    /// Verify data integrity
    void checkConsistency(bool require_part_metadata) const;
    
    /// Compute per-column sizes
    void calculateEachColumnSizes(ColumnSizeByName & each_columns_size, ColumnSize & total_size) const;
};
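
A data part's directory name encodes the fields above as <partition_id>_<min_block>_<max_block>_<level>: for example, 202401_1_5_2 is a level-2 part (two generations of merges) covering insert blocks 1 through 5 of partition 202401. A minimal standalone parser of this convention, as a hypothetical helper (the real one is MergeTreePartInfo::fromPartName, with full validation):

#include <cstdint>
#include <sstream>
#include <string>

struct SimplePartName
{
    std::string partition_id;
    uint64_t min_block = 0;  /// first insert block covered by the part
    uint64_t max_block = 0;  /// last insert block covered by the part
    uint32_t level = 0;      /// how many merge generations produced it
};

/// Parses "<partition>_<min>_<max>_<level>", e.g. "202401_1_5_2".
/// Hypothetical helper for illustration only.
bool parsePartName(const std::string & name, SimplePartName & out)
{
    std::istringstream ss(name);
    std::string token;
    if (!std::getline(ss, out.partition_id, '_')) return false;
    if (!std::getline(ss, token, '_')) return false;
    out.min_block = std::stoull(token);
    if (!std::getline(ss, token, '_')) return false;
    out.max_block = std::stoull(token);
    if (!std::getline(ss, token, '_')) return false;
    out.level = static_cast<uint32_t>(std::stoul(token));
    return true;
}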

2.1.2 Data Reader Implementation

// src/Storages/MergeTree/IMergeTreeReader.h
class IMergeTreeReader : private boost::noncopyable
{
public:
    using DeserializeBinaryBulkStateMap = std::unordered_map<std::string, ISerialization::DeserializeBinaryBulkStatePtr>;
    using FileStreams = std::map<std::string, std::unique_ptr<MergeTreeReaderStream>>;

    IMergeTreeReader(
        MergeTreeDataPartInfoForReaderPtr data_part_info_for_read_,
        const NamesAndTypesList & columns_,
        const VirtualFields & virtual_fields_,
        const StorageSnapshotPtr & storage_snapshot_,
        UncompressedCache * uncompressed_cache_,
        MarkCache * mark_cache_,
        const MarkRanges & all_mark_ranges_,
        const MergeTreeReaderSettings & settings_,
        const ValueSizeMap & avg_value_size_hints_ = ValueSizeMap{});

    /// Read rows from the given mark range
    virtual size_t readRows(
        size_t from_mark, 
        size_t current_task_last_mark,
        bool continue_reading, 
        size_t max_rows_to_read, 
        Columns & res_columns) = 0;

    /// Whether the reader can start or stop in the middle of a granule
    virtual bool canReadIncompleteGranules() const = 0;

protected:
    /// Initialize the file streams
    void addStreams(
        const NameAndTypePair & name_and_type,
        const ReadBufferFromFileBase::ProfileCallback & profile_callback);

    /// Read one column's data stream
    void readData(
        const NameAndTypePair & name_and_type,
        ColumnPtr & column,
        size_t from_mark,
        bool continue_reading,
        size_t current_task_last_mark,
        size_t max_rows_to_read,
        ISerialization::SubstreamsCache & cache,
        bool was_prefetched = false);

    MergeTreeDataPartInfoForReaderPtr data_part_info_for_read;
    const NamesAndTypesList columns;
    const VirtualFields virtual_fields;
    UncompressedCache * uncompressed_cache;
    MarkCache * mark_cache;
    MarkRanges all_mark_ranges;
    MergeTreeReaderSettings settings;
    
    /// File stream management
    FileStreams file_streams;
    
    /// Deserialization state
    DeserializeBinaryBulkStateMap deserialize_binary_bulk_state_map;
};

// Concrete reader for the wide format
class MergeTreeReaderWide : public IMergeTreeReader
{
public:
    MergeTreeReaderWide(/* parameter list */) : IMergeTreeReader(/* forwarded arguments */) {}

    size_t readRows(
        size_t from_mark, 
        size_t current_task_last_mark,
        bool continue_reading, 
        size_t max_rows_to_read, 
        Columns & res_columns) override
    {
        size_t read_rows = 0;
        
        /// Iterate over all requested columns
        /// (NamesAndTypesList is a list, so track the position alongside iteration)
        size_t pos = 0;
        for (const auto & name_and_type : columns)
        {
            if (!res_columns[pos])
                res_columns[pos] = name_and_type.type->createColumn();

            /// Read data starting from the given mark
            ISerialization::SubstreamsCache cache;
            readData(name_and_type, res_columns[pos], from_mark, continue_reading, 
                    current_task_last_mark, max_rows_to_read, cache);
            ++pos;
        }

        /// All columns should end up with the same number of rows
        if (!res_columns.empty())
            read_rows = res_columns[0]->size();

        return read_rows;
    }

    bool canReadIncompleteGranules() const override { return true; }
};

2.1.3 Data Writer Implementation

// src/Storages/MergeTree/MergeTreeDataWriter.h
class MergeTreeDataWriter
{
public:
    MergeTreeDataWriter(MergeTreeData & data_) : data(data_), log(getLogger("MergeTreeDataWriter")) {}

    /// Write a block; returns the newly created data part
    MergeTreeMutableDataPartPtr writeTempPart(
        BlockWithPartition & block_with_partition,
        const StorageMetadataPtr & metadata_snapshot,
        ContextPtr context);

private:
    /// Create a new data part
    MergeTreeMutableDataPartPtr createPart(
        const String & part_name,
        const MergeTreeDataPartType & part_type,
        const MergeTreePartInfo & part_info,
        const VolumePtr & volume,
        const String & relative_path = "");

    /// Write the data to disk
    void writeDataPart(
        MergeTreeMutableDataPartPtr & new_data_part,
        const Block & block,
        const StorageMetadataPtr & metadata_snapshot,
        ContextPtr context);

    MergeTreeData & data;
    LoggerPtr log;
};

// src/Storages/MergeTree/MergeTreeDataWriter.cpp
MergeTreeMutableDataPartPtr MergeTreeDataWriter::writeTempPart(
    BlockWithPartition & block_with_partition,
    const StorageMetadataPtr & metadata_snapshot,
    ContextPtr context)
{
    Block & block = block_with_partition.block;
    
    /// 1. Build the part info
    auto part_info = MergeTreePartInfo::fromPartName(
        data.getPartName(block_with_partition.partition), 
        data.format_version);
    
    /// 2. Choose a storage volume
    auto volume = data.getStoragePolicy()->getVolume(0);
    
    /// 3. Create the new data part
    auto new_data_part = createPart(
        data.getPartName(block_with_partition.partition),
        MergeTreeDataPartType::WIDE,
        part_info,
        volume);

    /// 4. Set partition info
    new_data_part->partition = std::move(block_with_partition.partition);
    new_data_part->minmax_idx->update(block, data.getMinMaxColumnsNames(metadata_snapshot->getPartitionKey()));

    /// 5. Write the data to disk
    writeDataPart(new_data_part, block, metadata_snapshot, context);

    /// 6. Finish creating the data part
    new_data_part->rows_count = block.rows();
    new_data_part->modification_time = time(nullptr);
    new_data_part->loadColumnsChecksumsIndexes(false, true);
    new_data_part->setBytesOnDisk(new_data_part->checksums.getTotalSizeOnDisk());

    return new_data_part;
}

void MergeTreeDataWriter::writeDataPart(
    MergeTreeMutableDataPartPtr & new_data_part,
    const Block & block,
    const StorageMetadataPtr & metadata_snapshot,
    ContextPtr context)
{
    /// Granularity is derived from the block contents (computation omitted here;
    /// see the adaptive granularity sketch in Section 4.1.2)
    MergeTreeIndexGranularity computed_index_granularity;
    const bool sync_on_insert = data.getSettings()->fsync_after_insert;

    /// 1. Create the data writer
    auto writer = new_data_part->getWriter(
        block.getNamesAndTypesList(),
        metadata_snapshot,
        {},  // indices_to_recalc
        data.getCompressionCodecForPart(new_data_part->info.level, new_data_part->info.mutation, context),
        MergeTreeWriterSettings(context->getSettingsRef(), data.getSettings()),
        computed_index_granularity);

    /// 2. Write the block
    writer->write(block);

    /// 3. Finish writing
    writer->finishDataSerialization(sync_on_insert);
    writer->finishPrimaryIndexSerialization(sync_on_insert);
    writer->finishSkipIndicesSerialization(sync_on_insert);

    /// 4. Collect checksums
    new_data_part->checksums = writer->releaseChecksums();
}

2.2 MergeTree Query Execution

2.2.1 Query Executor Architecture

// src/Storages/MergeTree/MergeTreeDataSelectExecutor.h
class MergeTreeDataSelectExecutor
{
public:
    explicit MergeTreeDataSelectExecutor(const MergeTreeData & data_);

    /// Main query execution entry point
    QueryPlanPtr read(
        const Names & column_names,
        const StorageSnapshotPtr & storage_snapshot,
        const SelectQueryInfo & query_info,
        ContextPtr context,
        UInt64 max_block_size,
        size_t num_streams,
        PartitionIdToMaxBlockPtr max_block_numbers_to_read = nullptr,
        bool enable_parallel_reading = false) const;

    /// Read from an explicit set of data parts
    QueryPlanStepPtr readFromParts(
        RangesInDataParts parts,
        MergeTreeData::MutationsSnapshotPtr mutations_snapshot,
        const Names & column_names,
        const StorageSnapshotPtr & storage_snapshot,
        const SelectQueryInfo & query_info,
        ContextPtr context,
        UInt64 max_block_size,
        size_t num_streams,
        PartitionIdToMaxBlockPtr max_block_numbers_to_read = nullptr,
        ReadFromMergeTree::AnalysisResultPtr merge_tree_select_result_ptr = nullptr,
        bool enable_parallel_reading = false,
        std::shared_ptr<ParallelReadingExtension> extension_ = nullptr) const;

private:
    /// Select the data parts that need to be read
    RangesInDataParts selectPartsToRead(
        const StorageSnapshotPtr & storage_snapshot,
        const SelectQueryInfo & query_info,
        ContextPtr context,
        PartitionIdToMaxBlockPtr max_block_numbers_to_read) const;

    /// Filter parts by the primary key condition
    void filterPartsByPrimaryKey(
        RangesInDataParts & parts,
        const StorageSnapshotPtr & storage_snapshot,
        const SelectQueryInfo & query_info,
        ContextPtr context) const;

    /// Filter parts by data-skipping indexes
    void filterPartsBySkipIndexes(
        RangesInDataParts & parts,
        const StorageSnapshotPtr & storage_snapshot,
        const SelectQueryInfo & query_info,
        ContextPtr context) const;

    const MergeTreeData & data;
    LoggerPtr log;
};

// Core query execution logic
QueryPlanPtr MergeTreeDataSelectExecutor::read(
    const Names & column_names,
    const StorageSnapshotPtr & storage_snapshot,
    const SelectQueryInfo & query_info,
    ContextPtr context,
    UInt64 max_block_size,
    size_t num_streams,
    PartitionIdToMaxBlockPtr max_block_numbers_to_read,
    bool enable_parallel_reading) const
{
    /// 1. Select the data parts to read
    auto parts_with_ranges = selectPartsToRead(
        storage_snapshot, query_info, context, max_block_numbers_to_read);

    /// 2. Apply primary key filtering
    filterPartsByPrimaryKey(parts_with_ranges, storage_snapshot, query_info, context);

    /// 3. Apply skip index filtering
    filterPartsBySkipIndexes(parts_with_ranges, storage_snapshot, query_info, context);

    /// 4. Create the query plan
    auto query_plan = std::make_unique<QueryPlan>();
    
    if (parts_with_ranges.empty())
    {
        /// Nothing to read: return an empty result
        auto header = storage_snapshot->getSampleBlockForColumns(column_names);
        auto read_nothing = std::make_unique<ReadNothingStep>(header);
        query_plan->addStep(std::move(read_nothing));
        return query_plan;
    }

    /// 5. Create the read step
    auto read_step = readFromParts(
        std::move(parts_with_ranges),
        data.getMutationsSnapshot(query_info, context),
        column_names,
        storage_snapshot,
        query_info,
        context,
        max_block_size,
        num_streams,
        max_block_numbers_to_read,
        nullptr,
        enable_parallel_reading);

    query_plan->addStep(std::move(read_step));
    return query_plan;
}

2.2.2 Data Part Selection Algorithm

// Core part selection algorithm
RangesInDataParts MergeTreeDataSelectExecutor::selectPartsToRead(
    const StorageSnapshotPtr & storage_snapshot,
    const SelectQueryInfo & query_info,
    ContextPtr context,
    PartitionIdToMaxBlockPtr max_block_numbers_to_read) const
{
    RangesInDataParts parts_with_ranges;
    const auto & metadata_snapshot = storage_snapshot->metadata;
    
    /// 1. Get all active data parts
    auto data_parts = data.getDataPartsVector();
    
    /// 2. Partition pruning
    if (metadata_snapshot->hasPartitionKey())
    {
        auto partition_pruner = std::make_shared<PartitionPruner>(
            metadata_snapshot, query_info, context, false);
        
        /// Drop parts whose partitions cannot match the condition
        data_parts.erase(
            std::remove_if(data_parts.begin(), data_parts.end(),
                [&](const auto & part) 
                {
                    return partition_pruner->canBePruned(*part);
                }),
            data_parts.end());
    }
    
    /// 3. Apply the max block number limit
    if (max_block_numbers_to_read)
    {
        data_parts.erase(
            std::remove_if(data_parts.begin(), data_parts.end(),
                [&](const auto & part)
                {
                    auto it = max_block_numbers_to_read->find(part->info.partition_id);
                    return it != max_block_numbers_to_read->end() 
                        && part->info.max_block > it->second;
                }),
            data_parts.end());
    }
    
    /// 4. Create mark ranges for each remaining part
    for (const auto & part : data_parts)
    {
        RangesInDataPart ranges_in_part;
        ranges_in_part.data_part = part;
        
        /// Start with the whole part; later filters narrow the ranges
        ranges_in_part.part_index_in_query = parts_with_ranges.size();
        ranges_in_part.ranges = MarkRanges{MarkRange{0, part->getMarksCount()}};
        
        parts_with_ranges.push_back(std::move(ranges_in_part));
    }
    
    return parts_with_ranges;
}

// Primary key filtering implementation
void MergeTreeDataSelectExecutor::filterPartsByPrimaryKey(
    RangesInDataParts & parts,
    const StorageSnapshotPtr & storage_snapshot,
    const SelectQueryInfo & query_info,
    ContextPtr context) const
{
    if (!storage_snapshot->getMetadataForQuery()->hasSortingKey())
        return;
        
    const auto & primary_key = storage_snapshot->getMetadataForQuery()->getPrimaryKey();
    
    /// Build the key condition from the WHERE clause
    KeyCondition key_condition(query_info.query, context, primary_key.column_names, primary_key.expression);
    
    if (key_condition.alwaysUnknownOrTrue())
        return; /// The PK condition cannot prune anything
    
    /// Apply PK filtering to every part
    for (auto & part_with_ranges : parts)
    {
        if (part_with_ranges.ranges.empty())
            continue;
            
        auto & part = part_with_ranges.data_part;
        
        /// Load the primary index
        part->loadIndex();
        
        /// Narrow the mark ranges using the key condition
        /// (simplified; the real logic lives in markRangesFromPKRange)
        MarkRanges filtered_ranges;
        
        for (const auto & range : part_with_ranges.ranges)
        {
            MarkRanges new_ranges = key_condition.mayBeTrueInRange(
                range.begin, range.end, part->index, primary_key.sample_block);
                
            filtered_ranges.insert(filtered_ranges.end(), 
                                 new_ranges.begin(), new_ranges.end());
        }
        
        part_with_ranges.ranges = std::move(filtered_ranges);
    }
    
    /// Remove parts whose ranges became empty
    parts.erase(
        std::remove_if(parts.begin(), parts.end(),
            [](const RangesInDataPart & part) { return part.ranges.empty(); }),
        parts.end());
}
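
At its core, narrowing mark ranges is a search over the sparse primary index: each entry holds the first key of one granule, so key intervals map to mark intervals. A self-contained sketch for a single integer key column (illustrative only, not the KeyCondition API):

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <vector>

struct MarkRangeSketch { size_t begin; size_t end; }; /// [begin, end) in marks

/// `index[i]` is the first key of granule i (ascending, one entry per mark).
/// Returns the marks whose granules may contain keys in [key_min, key_max].
MarkRangeSketch marksMaybeInRange(
    const std::vector<int64_t> & index, int64_t key_min, int64_t key_max)
{
    /// The last granule starting at or before key_min may still contain it:
    auto begin_it = std::upper_bound(index.begin(), index.end(), key_min);
    size_t begin = (begin_it == index.begin()) ? 0 : size_t(begin_it - index.begin()) - 1;

    /// A granule starting strictly after key_max cannot contain it:
    auto end_it = std::upper_bound(index.begin(), index.end(), key_max);
    size_t end = size_t(end_it - index.begin());

    return {begin, end};
}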

2.3 The MergeTree Merge Mechanism

2.3.1 Merge Task Scheduling

// src/Storages/MergeTree/MergeTask.h
class MergeTask
{
public:
    /// Merge task states
    enum class State
    {
        NEED_PREPARE,
        NEED_EXECUTE,
        NEED_FINISH,
        SUCCESS
    };

    struct GlobalContext
    {
        MergeTreeData * data;
        StorageMetadataPtr metadata_snapshot;
        FutureMergedMutatedPartPtr future_part;
        MergeTreeData::MutableDataPartPtr new_data_part;
        
        /// Merge pipeline and executor
        std::unique_ptr<QueryPipelineBuilder> merging_pipeline;
        std::unique_ptr<PullingPipelineExecutor> merging_executor;
        
        /// Output stream
        std::unique_ptr<MergedBlockOutputStream> to;
        
        /// Statistics
        size_t rows_written = 0;
        UInt64 watch_prev_elapsed = 0;
        
        /// Merge list element (for monitoring)
        MergeListElement * merge_list_element_ptr = nullptr;
    };

    MergeTask(
        StorageMetadataPtr metadata_snapshot_,
        FutureMergedMutatedPartPtr future_part_,
        MergeTreeData * data_,
        MergeListElement * merge_list_element_ptr_,
        time_t time_of_merge_,
        ContextPtr context_,
        ReservationSharedPtr space_reservation_,
        bool deduplicate_,
        Names deduplicate_by_columns_,
        MergeTreeData::MergingParams merging_params_,
        MergeTreeTransactionPtr txn_,
        const String & suffix_ = "",
        bool need_prefix_ = true);

    /// Execute one step of the merge task
    bool executeStep();

private:
    /// Prepare the merge
    bool prepare();
    
    /// Execute the merge
    bool executeImpl();
    
    /// Finalize the merge
    bool finalize();

    std::shared_ptr<GlobalContext> global_ctx;
    State state{State::NEED_PREPARE};
};

// src/Storages/MergeTree/MergeTask.cpp
bool MergeTask::executeStep()
{
    switch (state)
    {
        case State::NEED_PREPARE:
            if (prepare())
            {
                state = State::NEED_EXECUTE;
                return true;
            }
            return false;

        case State::NEED_EXECUTE:
            if (executeImpl())
            {
                state = State::NEED_FINISH;
                return true;
            }
            return false;

        case State::NEED_FINISH:
            if (finalize())
            {
                state = State::SUCCESS;
                return false; /// Task complete
            }
            return false;

        case State::SUCCESS:
            return false;
    }
    
    return false;
}
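
Merges are scheduled cooperatively: a background worker calls executeStep() repeatedly, and each call yields after roughly background_task_preferred_step_execution_time_ms, so one large merge cannot monopolize a thread. A hedged sketch of such a driver loop (the real loop lives in the background executor and interleaves many tasks):

/// Illustrative driver; not the actual scheduler.
void runMergeToCompletion(MergeTask & task)
{
    /// executeStep() returns true while there is more work to do.
    while (task.executeStep())
    {
        /// Re-enqueue point: other background tasks may run here.
    }
}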

bool MergeTask::prepare()
{
    /// 1. Create the new data part
    global_ctx->new_data_part = global_ctx->data->createPart(
        global_ctx->future_part->name,
        global_ctx->future_part->type,
        global_ctx->future_part->part_info,
        global_ctx->future_part->volume,
        global_ctx->future_part->relative_path);

    /// 2. Build the merge pipeline
    auto merging_pipeline = std::make_unique<QueryPipelineBuilder>();
    
    /// Create a source for every input part
    for (const auto & part : global_ctx->future_part->parts)
    {
        auto source = std::make_shared<MergeTreeSequentialSource>(
            *global_ctx->data,
            global_ctx->metadata_snapshot,
            part,
            global_ctx->metadata_snapshot->getColumns().getNamesOfPhysical(),
            false, /// no virtual columns
            true   /// read with row numbers
        );
        
        merging_pipeline->addSource(std::move(source));
    }

    /// 3. Add the merging transform
    auto merging_transform = std::make_shared<MergingSortedTransform>(
        merging_pipeline->getHeader(),
        global_ctx->future_part->parts.size(),
        global_ctx->metadata_snapshot->getSortDescription(),
        global_ctx->data->getSettings()->merge_max_block_size);
    
    merging_pipeline->addTransform(std::move(merging_transform));

    /// 4. Create the output stream
    global_ctx->to = std::make_unique<MergedBlockOutputStream>(
        global_ctx->new_data_part,
        global_ctx->metadata_snapshot,
        global_ctx->metadata_snapshot->getColumns().getNamesOfPhysical(),
        CompressionCodecFactory::instance().get("NONE", {}),
        /// further arguments omitted...
    );

    /// 5. Create the executor
    global_ctx->merging_executor = std::make_unique<PullingPipelineExecutor>(*merging_pipeline);
    global_ctx->merging_pipeline = std::move(merging_pipeline);

    return true;
}

bool MergeTask::executeImpl()
{
    /// Time budget for a single merge step
    UInt64 step_time_ms = global_ctx->data->getSettings()->background_task_preferred_step_execution_time_ms;
    
    Stopwatch watch;
    
    do
    {
        Block block;
        
        /// Pull one block from the merge pipeline
        if (!global_ctx->merging_executor->pull(block))
        {
            /// Merge finished
            return true;
        }

        /// Write the merged block
        global_ctx->rows_written += block.rows();
        global_ctx->to->write(block);

        /// Update the min-max index
        if (global_ctx->data->merging_params.mode != MergeTreeData::MergingParams::Ordinary)
        {
            global_ctx->new_data_part->minmax_idx->update(
                block, 
                MergeTreeData::getMinMaxColumnsNames(global_ctx->metadata_snapshot->getPartitionKey()));
        }

        /// Update monitoring statistics
        if (global_ctx->merge_list_element_ptr)
        {
            global_ctx->merge_list_element_ptr->rows_written = global_ctx->rows_written;
            global_ctx->merge_list_element_ptr->bytes_written_uncompressed = 
                global_ctx->to->getBytesWritten();
        }

    } while (watch.elapsedMilliseconds() < step_time_ms);
    
    /// Time slice exhausted, but the merge is not finished yet
    return false;
}

bool MergeTask::finalize()
{
    /// 1. Finish writing the data
    global_ctx->to->finalizePart(global_ctx->new_data_part, false);
    
    /// 2. Set part attributes
    global_ctx->new_data_part->rows_count = global_ctx->rows_written;
    global_ctx->new_data_part->modification_time = time(nullptr);
    
    /// 3. Load metadata
    global_ctx->new_data_part->loadColumnsChecksumsIndexes(false, true);
    global_ctx->new_data_part->setBytesOnDisk(
        global_ctx->new_data_part->checksums.getTotalSizeOnDisk());
    
    /// 4. Commit: replace the source parts with the merged part
    global_ctx->data->replaceParts(
        global_ctx->future_part->parts,
        {global_ctx->new_data_part},
        false);
    
    return true;
}

Query Processor Framework Analysis

3.1 The Port Communication Mechanism

3.1.1 Basic Port Design

// src/Processors/Port.h
class Port
{
public:
    enum class State
    {
        NotNeeded,
        NeedData,
        HasData,
        Finished
    };

protected:
    State state = State::NotNeeded;
    IProcessor * processor = nullptr;
    
    /// Data storage
    Chunk data;
    
    /// Connection info
    Port * connected_port = nullptr;
    
public:
    /// State queries
    bool isConnected() const { return connected_port != nullptr; }
    bool hasData() const { return state == State::HasData; }
    bool isFinished() const { return state == State::Finished; }
    bool isNeeded() const { return state == State::NeedData; }
    
    /// State setters
    void setNeeded() { state = State::NeedData; }
    void setNotNeeded() { state = State::NotNeeded; }
    void finish() { state = State::Finished; }
    
    /// Data transfer
    void pushData(Chunk chunk)
    {
        chassert(state == State::NeedData);
        data = std::move(chunk);
        state = State::HasData;
    }
    
    Chunk pullData()
    {
        chassert(state == State::HasData);
        state = State::NotNeeded;
        return std::move(data);
    }
    
    /// Connection management
    void connect(Port & other)
    {
        chassert(!isConnected() && !other.isConnected());
        connected_port = &other;
        other.connected_port = this;
    }
    
    void disconnect()
    {
        if (connected_port)
        {
            connected_port->connected_port = nullptr;
            connected_port = nullptr;
        }
    }
};

class InputPort : public Port
{
public:
    InputPort(Block header_) : header(std::move(header_)) {}
    
    /// Take the chunk that the connected output port pushed into us
    Chunk pull(bool set_not_needed = false)
    {
        chassert(isConnected());
        chassert(hasData());
        
        /// push() stores the chunk in this port's own `data`
        auto chunk = pullData();
        if (set_not_needed)
            setNotNeeded();
        return chunk;
    }
    
    /// Get a reference to the connected output port
    OutputPort & getOutputPort()
    {
        chassert(isConnected());
        return static_cast<OutputPort &>(*connected_port);
    }
    
    const Block & getHeader() const { return header; }

private:
    Block header; /// Block structure description
};

class OutputPort : public Port
{
public:
    OutputPort(Block header_) : header(std::move(header_)) {}
    
    /// Push data to the connected input port
    void push(Chunk chunk)
    {
        chassert(isConnected());
        chassert(canPush());
        
        connected_port->pushData(std::move(chunk));
    }
    
    /// Check whether data can be pushed
    bool canPush() const
    {
        chassert(isConnected());
        return connected_port->isNeeded();
    }
    
    /// Get a reference to the connected input port
    InputPort & getInputPort()
    {
        chassert(isConnected());
        return static_cast<InputPort &>(*connected_port);
    }
    
    const Block & getHeader() const { return header; }

private:
    Block header; /// Block structure description
};
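
Putting the two classes together, one push/pull handshake looks like this. This is a sketch against the simplified classes above (the real Port pair shares a lock-free state object); header and chunk are assumed given:

/// One producer-to-consumer handshake (illustrative).
void demoHandshake(const Block & header, Chunk chunk)
{
    InputPort input(header);
    OutputPort output(header);
    output.connect(input);             /// wire the two ends together

    input.setNeeded();                 /// consumer: State::NeedData
    if (output.canPush())              /// producer observes the demand
        output.push(std::move(chunk)); /// chunk lands in the input port

    if (input.hasData())
    {
        Chunk received = input.pull(); /// consumer takes it; state resets
        /// ... process `received` ...
    }
}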

3.1.2 Pipeline Connection Mechanism

// src/QueryPipeline/QueryPipeline.h
class QueryPipeline
{
public:
    QueryPipeline() = default;
    QueryPipeline(QueryPipeline &&) = default;
    QueryPipeline & operator=(QueryPipeline &&) = default;
    
    /// Create a pipeline from a single processor
    explicit QueryPipeline(std::shared_ptr<IProcessor> source);
    
    /// Create a pipeline from a Pipe
    explicit QueryPipeline(Pipe pipe);
    
    /// Pipeline operations
    void addTransform(ProcessorPtr transform);
    void addSimpleTransform(const ProcessorGetter & getter);
    void addChains(std::vector<Chain> chains);
    
    /// Pipeline merging
    static QueryPipeline unitePipelines(
        std::vector<std::unique_ptr<QueryPipeline>> pipelines,
        size_t max_threads_limit = 0,
        Processors * collected_processors = nullptr);
    
    /// Execute the pipeline
    PipelineExecutorPtr execute();
    
    /// Pipeline introspection
    bool empty() const { return processors.empty(); }
    bool initialized() const { return !processors.empty() || !pipe.empty(); }
    size_t getNumStreams() const { return pipe.numOutputPorts(); }
    
    const Block & getHeader() const { return pipe.getHeader(); }
    
private:
    /// Set of processors
    Processors processors;
    
    /// Pipe representation
    Pipe pipe;
    
    /// Maximum number of threads
    size_t max_threads = 0;
};

// Core logic of connecting two ports
void connectPorts(OutputPort & output, InputPort & input)
{
    /// 1. Check port compatibility
    if (!blocksHaveEqualStructure(output.getHeader(), input.getHeader()))
    {
        throw Exception(ErrorCodes::LOGICAL_ERROR,
            "Cannot connect ports with different block structures. "
            "Output header: {}, input header: {}",
            output.getHeader().dumpStructure(),
            input.getHeader().dumpStructure());
    }
    
    /// 2. Check port state
    if (output.isConnected())
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Output port is already connected");
        
    if (input.isConnected())
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Input port is already connected");
    
    /// 3. Establish the connection
    output.connect(input);
}

// Connect two processors port-by-port
void connectProcessors(IProcessor & left, IProcessor & right)
{
    auto & left_outputs = left.getOutputs();
    auto & right_inputs = right.getInputs();
    
    if (left_outputs.size() != right_inputs.size())
    {
        throw Exception(ErrorCodes::LOGICAL_ERROR,
            "Cannot connect processors: different number of ports. "
            "Left processor outputs: {}, right processor inputs: {}",
            left_outputs.size(), right_inputs.size());
    }
    
    auto left_it = left_outputs.begin();
    auto right_it = right_inputs.begin();
    
    for (; left_it != left_outputs.end(); ++left_it, ++right_it)
    {
        connectPorts(*left_it, *right_it);
    }
}
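
A hedged sketch of wiring a three-stage pipeline by hand with these helpers; normally QueryPipeline does this for you. The variables header, chunk, expression, storage, metadata_snapshot, and context are assumed given, and the headers must match at every boundary:

/// Manual wiring: source -> transform -> sink (illustrative).
auto source = std::make_shared<SourceFromSingleChunk>(header, std::move(chunk));
auto transform = std::make_shared<ExpressionTransform>(header, expression);
connectProcessors(*source, *transform);

/// The sink must accept the transform's output structure.
auto sink = std::make_shared<MergeTreeSink>(
    storage, metadata_snapshot, /*max_parts_per_block=*/1, context);
connectProcessors(*transform, *sink);

/// Hand the whole processor set to an executor:
auto processors = std::make_shared<Processors>(Processors{source, transform, sink});
PipelineExecutor executor(processors, /*elem=*/nullptr);
executor.execute(/*num_threads=*/1, /*concurrency_control=*/false);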

3.2 Complex Processor Implementations

3.2.1 The Aggregating Processor

// src/Processors/Transforms/AggregatingTransform.h
class AggregatingTransform : public IProcessor
{
public:
    AggregatingTransform(
        Block header,
        AggregatingTransformParamsPtr params_,
        bool many_data_ = false)
        : IProcessor({InputPort(header)}, {OutputPort(params_->getHeader())})
        , params(std::move(params_))
        , key_columns(params->params.keys_size)
        , aggregate_columns(params->params.aggregates_size)
        , many_data(many_data_)
    {
        /// Initialize the aggregator
        aggregator = std::make_unique<Aggregator>(params->params);
    }

    String getName() const override { return "AggregatingTransform"; }

    Status prepare() override
    {
        auto & output = outputs.front();
        auto & input = inputs.front();

        /// Check the output state
        if (output.isFinished())
        {
            input.close();
            return Status::Finished;
        }

        if (!output.canPush())
        {
            input.setNotNeeded();
            return Status::PortFull;
        }

        /// If there is pending output, push it
        if (has_output)
        {
            output.push(std::move(output_chunk));
            has_output = false;
            
            if (finished)
            {
                output.finish();
                return Status::Finished;
            }
        }

        /// Check the input state
        if (finished)
            return Status::Ready;

        if (input.isFinished())
        {
            if (is_consume_finished)
            {
                finished = true;
                return Status::Ready;
            }
            
            is_consume_finished = true;
            return Status::Ready;
        }

        input.setNeeded();
        if (!input.hasData())
            return Status::NeedData;

        current_chunk = input.pull(true);
        return Status::Ready;
    }

    void work() override
    {
        if (is_consume_finished)
        {
            /// Emit the aggregation results
            initGenerate();
        }
        else if (current_chunk)
        {
            /// Consume the input data
            consume(std::move(current_chunk));
        }
    }

private:
    /// Consume one input chunk
    void consume(Chunk chunk)
    {
        const auto & info = chunk.getChunkInfo();
        
        if (const auto * agg_info = typeid_cast<const AggregatedChunkInfo *>(info.get()))
        {
            /// Merge data that was already aggregated upstream
            auto block = getInputPort().getHeader().cloneWithColumns(chunk.detachColumns());
            block = aggregator->mergeBlocks(agg_info->bucket_num, std::move(block), finished);
            
            auto num_rows = block.rows();
            chunk.setColumns(block.getColumns(), num_rows);
        }
        else
        {
            /// Aggregate raw data
            auto block = getInputPort().getHeader().cloneWithColumns(chunk.detachColumns());
            
            if (!aggregator->executeOnBlock(block, aggregated_data, key_columns, aggregate_columns, finished))
            {
                is_consume_finished = true;
            }
        }
    }

    /// Initialize result generation
    void initGenerate()
    {
        if (aggregated_data.empty())
        {
            finished = true;
            return;
        }

        /// Convert the aggregation state into output blocks
        auto block = aggregator->convertToBlocks(aggregated_data, finished, max_block_size);
        
        if (block)
        {
            output_chunk.setColumns(block.getColumns(), block.rows());
            has_output = true;
        }
        else
        {
            finished = true;
        }
    }

    AggregatingTransformParamsPtr params;
    std::unique_ptr<Aggregator> aggregator;
    
    /// Aggregation state
    AggregatedDataVariants aggregated_data;
    ColumnNumbers key_columns;
    ColumnNumbers aggregate_columns;
    
    /// Processing state
    Chunk current_chunk;
    Chunk output_chunk;
    bool has_output = false;
    bool finished = false;
    bool is_consume_finished = false;
    bool many_data = false;
    
    size_t max_block_size = DEFAULT_BLOCK_SIZE;
};
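
Conceptually, executeOnBlock folds every input block into a hash table keyed by the GROUP BY columns, and initGenerate() later walks that table to emit result blocks. A self-contained sketch of the idea for a single UInt64 key and a sum aggregate (not the Aggregator API):

#include <cstdint>
#include <unordered_map>
#include <utility>
#include <vector>

/// Sketch of SELECT key, sum(value) GROUP BY key over a stream of blocks.
/// ClickHouse's Aggregator does the same over typed hash tables with arenas.
std::unordered_map<uint64_t, uint64_t> aggregateBlocks(
    const std::vector<std::vector<std::pair<uint64_t, uint64_t>>> & blocks)
{
    std::unordered_map<uint64_t, uint64_t> sums;
    for (const auto & block : blocks)        /// consume(): one call per chunk
        for (const auto & [key, value] : block)
            sums[key] += value;              /// update the aggregate state
    return sums;                             /// initGenerate(): emit results
}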

3.2.2 The Sorting Processor

// src/Processors/Transforms/SortingTransform.h
class SortingTransform : public IProcessor
{
public:
    SortingTransform(
        const Block & header,
        const SortDescription & description_,
        UInt64 max_merged_block_size_,
        UInt64 limit_,
        size_t max_bytes_before_remerge_,
        double remerge_lowered_memory_bytes_ratio_,
        size_t max_bytes_before_external_sort_,
        VolumePtr tmp_volume_,
        size_t min_free_disk_space_)
        : IProcessor({InputPort(header)}, {OutputPort(header)})
        , description(description_)
        , max_merged_block_size(max_merged_block_size_)
        , limit(limit_)
        , max_bytes_before_remerge(max_bytes_before_remerge_)
        , remerge_lowered_memory_bytes_ratio(remerge_lowered_memory_bytes_ratio_)
        , max_bytes_before_external_sort(max_bytes_before_external_sort_)
        , tmp_volume(tmp_volume_)
        , min_free_disk_space(min_free_disk_space_)
    {
        /// Initialize the in-memory sorter
        sorter = std::make_unique<MergeSorter>(header, description, max_merged_block_size, limit);
    }

    String getName() const override { return "SortingTransform"; }

    Status prepare() override
    {
        auto & input = inputs.front();
        auto & output = outputs.front();

        /// Check the output state
        if (output.isFinished())
        {
            input.close();
            return Status::Finished;
        }

        if (!output.canPush())
        {
            input.setNotNeeded();
            return Status::PortFull;
        }

        /// Push pending output
        if (has_output)
        {
            output.push(std::move(output_chunk));
            has_output = false;
            
            if (stage == Stage::Finished)
            {
                output.finish();
                return Status::Finished;
            }
        }

        /// Dispatch on the current stage
        switch (stage)
        {
            case Stage::Consume:
            {
                if (input.isFinished())
                {
                    stage = Stage::Generate;
                    return Status::Ready;
                }

                input.setNeeded();
                if (!input.hasData())
                    return Status::NeedData;

                current_chunk = input.pull(true);
                return Status::Ready;
            }

            case Stage::Generate:
                return Status::Ready;

            case Stage::Finished:
                output.finish();
                return Status::Finished;
        }

        return Status::Ready;
    }

    void work() override
    {
        switch (stage)
        {
            case Stage::Consume:
                consume();
                break;

            case Stage::Generate:
                generate();
                break;

            case Stage::Finished:
                break;
        }
    }

private:
    enum class Stage
    {
        Consume,    /// Consume input data
        Generate,   /// Generate sorted output
        Finished    /// Done
    };

    void consume()
    {
        if (!current_chunk)
            return;

        /// Track memory usage
        auto bytes = current_chunk.bytes();
        sum_bytes_in_blocks += bytes;

        if (max_bytes_before_external_sort && sum_bytes_in_blocks > max_bytes_before_external_sort)
        {
            /// Switch to external sorting
            if (!external_sorter)
            {
                external_sorter = std::make_unique<MergeSorter>(
                    getInputPort().getHeader(), description, max_merged_block_size, limit,
                    max_bytes_before_remerge, remerge_lowered_memory_bytes_ratio,
                    tmp_volume, min_free_disk_space);
            }
            
            external_sorter->addChunk(std::move(current_chunk));
        }
        else
        {
            /// In-memory sorting
            auto block = getInputPort().getHeader().cloneWithColumns(current_chunk.detachColumns());
            sorter->addBlock(block);
        }

        current_chunk.clear();
    }

    void generate()
    {
        if (external_sorter)
        {
            /// Read results from the external sorter
            auto block = external_sorter->read();
            if (block)
            {
                output_chunk.setColumns(block.getColumns(), block.rows());
                has_output = true;
            }
            else
            {
                stage = Stage::Finished;
            }
        }
        else
        {
            /// Read results from the in-memory sorter
            auto block = sorter->read();
            if (block)
            {
                output_chunk.setColumns(block.getColumns(), block.rows());
                has_output = true;
            }
            else
            {
                stage = Stage::Finished;
            }
        }
    }

    SortDescription description;
    UInt64 max_merged_block_size;
    UInt64 limit;
    
    /// Memory management parameters
    size_t max_bytes_before_remerge;
    double remerge_lowered_memory_bytes_ratio;
    size_t max_bytes_before_external_sort;
    size_t sum_bytes_in_blocks = 0;
    
    /// Temporary storage
    VolumePtr tmp_volume;
    size_t min_free_disk_space;
    
    /// Sorters
    std::unique_ptr<MergeSorter> sorter;
    std::unique_ptr<MergeSorter> external_sorter;
    
    /// Processing state
    Stage stage = Stage::Consume;
    Chunk current_chunk;
    Chunk output_chunk;
    bool has_output = false;
};
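
Once the accumulated input exceeds max_bytes_before_external_sort, the transform behaves like a classic external merge sort: sorted runs are spilled to temporary files on tmp_volume and merged afterwards. The final step reduces to k-way merging of sorted sequences; a two-run, in-memory analogue (not ClickHouse code):

#include <algorithm>
#include <iterator>
#include <vector>

/// Two spilled runs merged into one sorted output. The real code streams
/// runs from temporary files and merges k of them with a heap.
std::vector<int> mergeRuns(const std::vector<int> & run_a, const std::vector<int> & run_b)
{
    std::vector<int> merged;
    merged.reserve(run_a.size() + run_b.size());
    std::merge(run_a.begin(), run_a.end(),
               run_b.begin(), run_b.end(),
               std::back_inserter(merged));
    return merged;
}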

Core Data Read and Write Flows

4.1 The Data Write Flow in Detail

4.1.1 INSERT Statement Handling

// src/Interpreters/InterpreterInsertQuery.h
class InterpreterInsertQuery : public IInterpreter
{
public:
    InterpreterInsertQuery(
        const ASTPtr & query_ptr_,
        ContextPtr context_,
        bool allow_materialized_ = false,
        bool no_squash_ = false,
        bool no_destination_ = false,
        bool async_insert_ = false);

    BlockIO execute() override;

private:
    /// Build the insert chain
    Chain buildChain(
        const StoragePtr & table,
        const StorageMetadataPtr & metadata_snapshot,
        const Names & columns);

    /// Handle asynchronous inserts
    BlockIO executeAsyncInsert();

    ASTPtr query_ptr;
    ContextPtr context;
    bool allow_materialized = false;
    bool no_squash = false;
    bool no_destination = false;
    bool async_insert = false;
};

// src/Interpreters/InterpreterInsertQuery.cpp
BlockIO InterpreterInsertQuery::execute()
{
    const auto & query = query_ptr->as<ASTInsertQuery &>();
    
    /// 1. Resolve the target table
    StoragePtr table = DatabaseCatalog::instance().getTable(
        StorageID(query.getDatabase(), query.getTable()), context);
    
    auto metadata_snapshot = table->getInMemoryMetadataPtr();
    
    /// 2. Check privileges
    auto table_id = table->getStorageID();
    context->checkAccess(AccessType::INSERT, table_id, metadata_snapshot->getColumnsRequiredForInsert());
    
    /// 3. Handle asynchronous insert
    if (async_insert)
        return executeAsyncInsert();
    
    /// 4. Build the insert pipeline
    BlockIO res;
    
    /// Determine the columns to insert
    Names columns = query.columns ? query.columns->getNames() : metadata_snapshot->getColumns().getNamesOfPhysical();
    
    /// Build the insert chain
    auto chain = buildChain(table, metadata_snapshot, columns);
    
    /// 5. Hook the chain up to a data source
    if (query.select)
    {
        /// INSERT ... SELECT: run the SELECT and push its output through the chain
        InterpreterSelectQuery interpreter_select(query.select, context, SelectQueryOptions());
        auto select_pipeline = interpreter_select.buildQueryPipeline();
        
        select_pipeline.addChain(std::move(chain));
        res.pipeline = std::move(select_pipeline);
    }
    else
    {
        /// Inline data (INSERT ... VALUES / FORMAT ...) or a client data stream:
        /// the parsed blocks are fed into the chain later
        res.pipeline = QueryPipeline(std::move(chain));
    }
    
    return res;
}

Chain InterpreterInsertQuery::buildChain(
    const StoragePtr & table,
    const StorageMetadataPtr & metadata_snapshot,
    const Names & columns)
{
    Chain chain;
    
    /// 1. Fill in defaults for omitted columns
    if (context->getSettingsRef().input_format_defaults_for_omitted_fields)
    {
        auto adding_defaults_transform = std::make_shared<AddingDefaultsTransform>(
            metadata_snapshot->getSampleBlock(), columns, *metadata_snapshot, context);
        chain.addSource(std::move(adding_defaults_transform));
    }
    
    /// 2. Squash small blocks into bigger ones (if enabled)
    if (!no_squash && context->getSettingsRef().min_insert_block_size_rows)
    {
        auto squashing_transform = std::make_shared<SquashingChunksTransform>(
            chain.getInputHeader(),
            context->getSettingsRef().min_insert_block_size_rows,
            context->getSettingsRef().min_insert_block_size_bytes);
        chain.addSource(std::move(squashing_transform));
    }
    
    /// 3. Add the storage sink
    if (!no_destination)
    {
        auto sink = table->write(query_ptr, metadata_snapshot, context, async_insert);
        chain.addSink(std::move(sink));
    }
    
    return chain;
}
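
The squashing stage exists because many small inserted blocks would otherwise produce many small parts and extra merge work. A hedged sketch of its accumulate-and-flush behavior (the real implementation is SquashingChunksTransform, thresholded by both rows and bytes):

#include <cstddef>
#include <optional>
#include <vector>

/// Accumulates rows until a threshold, then emits one large batch.
class SquashingSketch
{
public:
    explicit SquashingSketch(size_t min_rows_) : min_rows(min_rows_) {}

    /// Returns a batch once enough rows have accumulated.
    std::optional<std::vector<int>> add(std::vector<int> rows)
    {
        buffer.insert(buffer.end(), rows.begin(), rows.end());
        if (buffer.size() < min_rows)
            return std::nullopt;   /// keep accumulating
        std::vector<int> out;
        out.swap(buffer);          /// flush the accumulated batch
        return out;
    }

private:
    size_t min_rows;
    std::vector<int> buffer;
};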

4.1.2 The MergeTree Write Implementation

// src/Storages/MergeTree/MergeTreeSink.cpp
class MergeTreeSink : public SinkToStorage
{
public:
    MergeTreeSink(
        StorageMergeTree & storage_,
        const StorageMetadataPtr & metadata_snapshot_,
        size_t max_parts_per_block_,
        ContextPtr context_)
        : SinkToStorage(metadata_snapshot_->getSampleBlock())
        , storage(storage_)
        , metadata_snapshot(metadata_snapshot_)
        , max_parts_per_block(max_parts_per_block_)
        , context(context_) {}

    String getName() const override { return "MergeTreeSink"; }

protected:
    void consume(Chunk chunk) override
    {
        auto block = getHeader().cloneWithColumns(chunk.detachColumns());
        
        /// 1. Split the block by partition
        auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block, metadata_snapshot, context);
        
        /// 2. Write each partition's block
        for (auto & part_block : part_blocks)
        {
            /// Create a temporary data part
            auto temp_part = storage.writer.writeTempPart(part_block, metadata_snapshot, context);
            
            /// Commit it to the storage (rename away the tmp_ prefix)
            storage.renameTempPartAndAdd(temp_part, nullptr, &storage.increment);
        }
        
        /// 3. Trigger background merges (if needed)
        storage.background_operations_assignee->trigger();
    }

private:
    StorageMergeTree & storage;
    StorageMetadataPtr metadata_snapshot;
    size_t max_parts_per_block;
    ContextPtr context;
};

// Detailed implementation of data part writing
MergeTreeMutableDataPartPtr MergeTreeDataWriter::writeTempPart(
    BlockWithPartition & block_with_partition,
    const StorageMetadataPtr & metadata_snapshot,
    ContextPtr context)
{
    Block & block = block_with_partition.block;
    
    /// 1. Validate the block
    metadata_snapshot->check(block, true);
    
    /// 2. Generate the part name
    auto part_name = data.getPartName(
        block_with_partition.partition,
        block_with_partition.min_block,
        block_with_partition.max_block,
        block_with_partition.level);
    
    /// 3. Choose a storage volume
    auto volume = data.getStoragePolicy()->getVolume(0);
    
    /// 4. Create the temporary data part
    auto new_data_part = data.createPart(
        part_name,
        choosePartType(block.bytes(), data.getSettings()->min_bytes_for_wide_part),
        MergeTreePartInfo::fromPartName(part_name, data.format_version),
        volume,
        part_name);
    
    /// 5. Set partition info
    new_data_part->partition = std::move(block_with_partition.partition);
    new_data_part->minmax_idx->update(block, data.getMinMaxColumnsNames(metadata_snapshot->getPartitionKey()));
    
    /// 6. Compute the index granularity
    MergeTreeIndexGranularity index_granularity;
    computeGranularity(block, index_granularity, context->getSettingsRef());
    
    /// 7. Create the data writer
    auto writer = new_data_part->getWriter(
        block.getNamesAndTypesList(),
        metadata_snapshot,
        data.getIndicesDescription(),
        data.getCompressionCodecForPart(new_data_part->info.level, new_data_part->info.mutation, context),
        MergeTreeWriterSettings(context->getSettingsRef(), data.getSettings()),
        index_granularity);
    
    /// 8. Write the data
    writer->write(block);
    
    /// 9. Finalize serialization
    writer->finishDataSerialization(context->getSettingsRef().fsync_part_directory);
    writer->finishPrimaryIndexSerialization(context->getSettingsRef().fsync_part_directory);
    writer->finishSkipIndicesSerialization(context->getSettingsRef().fsync_part_directory);
    
    /// 10. Set the checksums
    new_data_part->checksums = writer->releaseChecksums();
    new_data_part->setBytesOnDisk(new_data_part->checksums.getTotalSizeOnDisk());
    new_data_part->rows_count = block.rows();
    new_data_part->modification_time = time(nullptr);
    
    return new_data_part;
}
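
The part name generated in step 2 follows MergeTree's on-disk naming scheme: partition ID, minimum and maximum block number, and merge level, joined by underscores. A small illustrative helper (not the actual getPartName) shows how the components combine:

// Sketch of the "partition_minblock_maxblock_level" naming convention.
#include <cstdint>
#include <iostream>
#include <string>

std::string makePartName(const std::string & partition_id, int64_t min_block, int64_t max_block, uint32_t level)
{
    return partition_id + "_" + std::to_string(min_block)
        + "_" + std::to_string(max_block) + "_" + std::to_string(level);
}

int main()
{
    // A freshly inserted part covers a single block number at level 0;
    // merges widen the [min, max] block range and increase the level.
    std::cout << makePartName("202401", 1, 1, 0) << "\n";   // 202401_1_1_0
    std::cout << makePartName("202401", 1, 5, 1) << "\n";   // 202401_1_5_1 (after a merge)
}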

4.2 Data Read Path in Detail

4.2.1 SELECT Statement Processing

// src/Interpreters/InterpreterSelectQuery.cpp
BlockIO InterpreterSelectQuery::execute()
{
    /// 1. Build the query plan
    QueryPlan query_plan;
    buildQueryPlan(query_plan);
    
    /// 2. Optimize the query plan
    QueryPlanOptimizationSettings optimization_settings = QueryPlanOptimizationSettings::fromContext(context);
    query_plan.optimize(optimization_settings);
    
    /// 3. Build the query pipeline
    auto builder = query_plan.buildQueryPipeline(
        optimization_settings,
        BuildQueryPipelineSettings::fromContext(context));
    
    /// 4. Return the execution result
    BlockIO res;
    res.pipeline = std::move(*builder);
    
    /// 5. Set limits and quotas
    if (context->hasQueryContext())
    {
        res.pipeline.setLimitsAndQuota(
            context->getQueryContext()->getStreamingLimits(),
            context->getQueryContext()->getQuota());
    }
    
    return res;
}

void InterpreterSelectQuery::buildQueryPlan(QueryPlan & query_plan)
{
    const auto & query = getSelectQuery();
    
    /// 1. Analyze the query expressions
    analyzeExpressions(QueryProcessingStage::FetchColumns, false, Block{});
    
    /// 2. Read data from storage
    if (storage && !options.only_analyze)
    {
        /// Build the storage read step
        auto read_step = std::make_unique<ReadFromStorageStep>(
            storage,
            query_analyzer->requiredSourceColumns(),
            storage_snapshot,
            query_info,
            context,
            processing_stage,
            max_block_size,
            max_streams);
        
        query_plan.addStep(std::move(read_step));
    }
    else
    {
        /// Create an empty source
        auto header = query_analyzer->getSampleBlock();
        auto read_nothing = std::make_unique<ReadNothingStep>(header);
        query_plan.addStep(std::move(read_nothing));
    }
    
    /// 3. Add the WHERE filter
    if (query_analyzer->hasWhere())
    {
        auto where_step = std::make_unique<FilterStep>(
            query_plan.getCurrentDataStream(),
            query_analyzer->where(),
            query_analyzer->where()->getColumnName(),
            true);
        query_plan.addStep(std::move(where_step));
    }
    
    /// 4. Add aggregation
    if (query_analyzer->hasAggregation())
    {
        auto aggregating_step = std::make_unique<AggregatingStep>(
            query_plan.getCurrentDataStream(),
            query_analyzer->aggregationKeys(),
            query_analyzer->aggregates(),
            query_analyzer->groupingSetsParams(),
            true, /// final
            max_block_size,
            context->getSettingsRef().aggregation_in_order_max_block_bytes,
            merge_threads,
            temporary_data_merge_threads,
            context->getSettingsRef().enable_software_prefetch_in_aggregation,
            context->getSettingsRef().only_merge_for_aggregation_in_order,
            query_analyzer->aggregationShouldProduceResultInOrderOfPrimaryKey());
        
        query_plan.addStep(std::move(aggregating_step));
    }
    
    /// 5. Add the HAVING filter
    if (query_analyzer->hasHaving())
    {
        auto having_step = std::make_unique<FilterStep>(
            query_plan.getCurrentDataStream(),
            query_analyzer->having(),
            query_analyzer->having()->getColumnName(),
            false);
        query_plan.addStep(std::move(having_step));
    }
    
    /// 6. Add the ORDER BY sort
    if (query_analyzer->hasOrderBy())
    {
        auto sorting_step = std::make_unique<SortingStep>(
            query_plan.getCurrentDataStream(),
            query_analyzer->orderByDescription(),
            query.limitLength(),
            SortingStep::Settings(context->getSettingsRef()),
            context->getSettingsRef().optimize_sorting_by_input_stream_properties);
        
        query_plan.addStep(std::move(sorting_step));
    }
    
    /// 7. Add the LIMIT
    if (query.limitLength())
    {
        auto limit_step = std::make_unique<LimitStep>(
            query_plan.getCurrentDataStream(),
            query.limitLength(),
            query.limitOffset());
        
        query_plan.addStep(std::move(limit_step));
    }
    
    /// 8. Add the final projection
    if (query_analyzer->hasProjection())
    {
        auto expression_step = std::make_unique<ExpressionStep>(
            query_plan.getCurrentDataStream(),
            query_analyzer->projection());
        
        query_plan.addStep(std::move(expression_step));
    }
}
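
Each addStep call consumes the plan's current data stream and stacks a new step on top of it, so data flows through the stages in exactly the order the branches above append them. A toy sketch of that stacking (QueryPlanSketch and the string labels are illustrative, not the real step classes):

// Steps are appended bottom-up; the last one added is the last stage data passes through.
#include <iostream>
#include <string>
#include <vector>

struct QueryPlanSketch
{
    std::vector<std::string> steps;
    void addStep(std::string name) { steps.push_back(std::move(name)); }
};

int main()
{
    QueryPlanSketch plan;
    for (const char * step : {"ReadFromStorage", "Filter (WHERE)", "Aggregating",
                              "Filter (HAVING)", "Sorting", "Limit", "Expression (projection)"})
        plan.addStep(step);

    for (const auto & s : plan.steps)
        std::cout << s << "\n";   // prints the stages in data-flow order
}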

4.2.2 MergeTree Read Implementation

// src/Storages/MergeTree/MergeTreeRangeReader.h
class MergeTreeRangeReader
{
public:
    struct ReadResult
    {
        /// The block that was read
        Block block;
        
        /// Number of rows read
        size_t num_rows = 0;
        
        /// Filter information
        ColumnPtr filter;
        size_t num_rows_after_filter = 0;
        
        /// Whether more data is needed
        bool need_more_data = false;
    };

    MergeTreeRangeReader(
        IMergeTreeReader * merge_tree_reader_,
        MergeTreeRangeReader * prev_reader_,
        const PrewhereInfoPtr & prewhere_info_,
        bool last_reader_in_chain_);

    /// Read up to max_rows rows from the given mark ranges
    ReadResult read(size_t max_rows, MarkRanges & ranges);

private:
    /// Execute the PREWHERE actions and filter the columns
    void executePrewhereActionsAndFilterColumns(ReadResult & result);
    
    /// Read the required columns
    size_t readRows(size_t max_rows, MarkRanges & ranges, ReadResult & result);

    IMergeTreeReader * merge_tree_reader = nullptr;
    MergeTreeRangeReader * prev_reader = nullptr;
    PrewhereInfoPtr prewhere_info;
    bool last_reader_in_chain = false;
    
    /// Column cache
    std::unordered_map<String, ColumnPtr> column_cache;
};

// src/Storages/MergeTree/MergeTreeRangeReader.cpp
MergeTreeRangeReader::ReadResult MergeTreeRangeReader::read(size_t max_rows, MarkRanges & ranges)
{
    ReadResult result;
    
    if (ranges.empty())
        return result;
    
    /// 1. Pull data from the previous reader (if there is one)
    if (prev_reader)
    {
        result = prev_reader->read(max_rows, ranges);
        
        if (result.num_rows == 0)
            return result;
        
        /// 2. Apply this layer's PREWHERE filter
        if (prewhere_info)
        {
            executePrewhereActionsAndFilterColumns(result);
            
            /// If no rows survive the filter and ranges remain, keep reading
            if (result.num_rows_after_filter == 0 && !ranges.empty())
                return read(max_rows, ranges);
        }
    }
    else
    {
        /// 3. Read directly from storage
        result.num_rows = readRows(max_rows, ranges, result);
        
        if (result.num_rows == 0)
            return result;
        
        /// 4. Execute the PREWHERE filter
        if (prewhere_info)
        {
            executePrewhereActionsAndFilterColumns(result);
        }
        else
        {
            result.num_rows_after_filter = result.num_rows;
        }
    }
    
    return result;
}
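
The prev_reader link is what makes multi-step PREWHERE work: readers form a chain in which each layer pulls rows from the previous one and applies its own predicate. The following self-contained sketch captures just that chaining pattern, with ints standing in for rows and an is-even predicate standing in for a PREWHERE condition:

// Illustrative chain-of-readers pattern; not the real MergeTreeRangeReader.
#include <iostream>
#include <vector>

class RangeReaderSketch
{
public:
    RangeReaderSketch(std::vector<int> source_, RangeReaderSketch * prev_, bool has_filter_)
        : source(std::move(source_)), prev(prev_), has_filter(has_filter_) {}

    std::vector<int> read()
    {
        /// Either continue from the previous reader in the chain, or read "from storage".
        std::vector<int> rows = prev ? prev->read() : source;
        if (!has_filter)
            return rows;
        std::vector<int> out;
        for (int r : rows)
            if (r % 2 == 0)   // stand-in for this layer's PREWHERE predicate
                out.push_back(r);
        return out;
    }

private:
    std::vector<int> source;
    RangeReaderSketch * prev;
    bool has_filter;
};

int main()
{
    RangeReaderSketch base({1, 2, 3, 4, 5, 6}, nullptr, false);   // raw storage read
    RangeReaderSketch filtered({}, &base, true);                  // PREWHERE layer
    for (int r : filtered.read())
        std::cout << r << " ";   // 2 4 6
}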

size_t MergeTreeRangeReader::readRows(size_t max_rows, MarkRanges & ranges, ReadResult & result)
{
    size_t read_rows = 0;
    Columns columns;
    
    /// 1. Prepare the column containers
    const auto & header = merge_tree_reader->getColumns();
    columns.resize(header.size());
    
    /// 2. Read the data mark range by mark range
    while (read_rows < max_rows && !ranges.empty())
    {
        auto & range = ranges.front();
        
        /// Compute how many rows to read in this iteration
        size_t rows_to_read = std::min(max_rows - read_rows, range.end - range.begin);
        
        /// Read from the MergeTree reader
        size_t rows_read = merge_tree_reader->readRows(
            range.begin,
            range.end,
            read_rows > 0, /// continue_reading
            rows_to_read,
            columns);
        
        read_rows += rows_read;
        range.begin += rows_read;
        
        /// If the range is exhausted, drop it
        if (range.begin >= range.end)
            ranges.pop_front();
        
        /// Fewer rows than requested means the data part is exhausted
        if (rows_read < rows_to_read)
            break;
    }
    
    /// 3. Build the result block
    if (read_rows > 0)
    {
        result.block = header.cloneWithColumns(std::move(columns));
    }
    
    return read_rows;
}
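
Stripped of the actual storage access, the bookkeeping in the loop above reduces to draining half-open [begin, end) intervals until the row budget is spent. A simplified stand-in (this MarkRange treats begin/end as plain row offsets and assumes every read succeeds in full):

// Sketch of the range-consumption loop only.
#include <algorithm>
#include <cstddef>
#include <deque>
#include <iostream>

struct MarkRange { size_t begin; size_t end; };

size_t readUpTo(size_t max_rows, std::deque<MarkRange> & ranges)
{
    size_t read_rows = 0;
    while (read_rows < max_rows && !ranges.empty())
    {
        auto & range = ranges.front();
        size_t rows_to_read = std::min(max_rows - read_rows, range.end - range.begin);
        range.begin += rows_to_read;   // pretend the read returned everything requested
        read_rows += rows_to_read;
        if (range.begin >= range.end)
            ranges.pop_front();        // range exhausted, move on to the next one
    }
    return read_rows;
}

int main()
{
    std::deque<MarkRange> ranges{{0, 100}, {200, 250}};
    std::cout << readUpTo(120, ranges) << "\n";   // 120: all of the first range + 20 rows
    std::cout << ranges.front().begin << "\n";    // 220: second range partially consumed
}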

void MergeTreeRangeReader::executePrewhereActionsAndFilterColumns(ReadResult & result)
{
    if (!prewhere_info || !result.block)
        return;
    
    const auto & prewhere_actions = prewhere_info->prewhere_actions;
    const auto & prewhere_column_name = prewhere_info->prewhere_column_name;
    
    /// 1. Execute the PREWHERE expression
    prewhere_actions->execute(result.block);
    
    /// 2. Get the filter column
    auto filter_column = result.block.getByName(prewhere_column_name).column;
    
    /// 3. Apply the filter
    if (const auto * const_column = typeid_cast<const ColumnConst *>(filter_column.get()))
    {
        /// Constant filter
        if (const_column->getValue<UInt8>())
        {
            /// Keep all rows
            result.num_rows_after_filter = result.num_rows;
        }
        else
        {
            /// Drop all rows
            result.num_rows_after_filter = 0;
            result.block.clear();
        }
    }
    else
    {
        /// Non-constant filter
        const auto & filter_data = typeid_cast<const ColumnUInt8 &>(*filter_column).getData();
        
        /// Count the rows that survive the filter
        result.num_rows_after_filter = countBytesInFilter(filter_data);
        
        if (result.num_rows_after_filter == 0)
        {
            result.block.clear();
        }
        else if (result.num_rows_after_filter < result.num_rows)
        {
            /// 应用过滤器到所有列
            for (auto & column : result.block)
            {
                column.column = column.column->filter(filter_data, result.num_rows_after_filter);
            }
        }
        
        /// Keep the filter for downstream steps
        result.filter = std::move(filter_column);
    }
    
    /// 4. Remove the PREWHERE column (if it is not needed)
    if (prewhere_info->remove_prewhere_column)
    {
        result.block.erase(prewhere_column_name);
    }
}
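
The non-constant branch rests on two primitives: counting the set bytes in the UInt8 mask (what countBytesInFilter does) and materializing the surviving rows of every column (what IColumn::filter does). A minimal sketch of both on plain vectors, as hypothetical stand-ins rather than the real implementations:

// Illustrative filter semantics on flat vectors.
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <vector>

size_t countBytesInFilterSketch(const std::vector<uint8_t> & mask)
{
    size_t n = 0;
    for (uint8_t b : mask)
        n += (b != 0);   // each non-zero byte marks a surviving row
    return n;
}

template <typename T>
std::vector<T> filterColumn(const std::vector<T> & column, const std::vector<uint8_t> & mask)
{
    std::vector<T> out;
    for (size_t i = 0; i < column.size(); ++i)
        if (mask[i])
            out.push_back(column[i]);
    return out;
}

int main()
{
    std::vector<int> col{10, 20, 30, 40};
    std::vector<uint8_t> mask{1, 0, 1, 0};
    std::cout << countBytesInFilterSketch(mask) << "\n";   // 2 rows survive
    for (int v : filterColumn(col, mask))
        std::cout << v << " ";                             // 10 30
}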

This code-level analysis has examined the implementation of ClickHouse's core components in depth, covering:

  1. Processor framework: the IProcessor interface design, the pipeline executor, and the implementations of the different processor types
  2. MergeTree storage engine: data part management, the reader and writer implementations, and the merge mechanism
  3. Query processing flow: the port-based communication mechanism and the more complex processor implementations
  4. Data read/write paths: the complete processing chains behind INSERT and SELECT

Each part includes the full code of the key functions together with detailed functional commentary, to help developers build a thorough understanding of ClickHouse's internal workings.