ClickHouse-11: The QueryPipeline and Coordination Modules
Module overview
Responsibilities
The QueryPipeline module is responsible for:
- Building the query execution pipeline
- Connecting Processors into an execution graph
- Managing data flow and port connections
- Supporting parallel execution and pipeline optimization
- Handling pipeline startup, execution, and completion
The Coordination module is responsible for:
- Providing the distributed coordination service (ClickHouse Keeper)
- Implementing a ZooKeeper-like protocol
- Managing distributed locks and metadata
- Supporting the Raft consensus algorithm
- Providing high-availability guarantees
Module architecture diagram
flowchart TB
    subgraph QueryPipelineModule["QueryPipeline module"]
        subgraph PipelineCore["Core components"]
            QueryPipeline[QueryPipeline<br/>query pipeline]
            QueryPipelineBuilder[QueryPipelineBuilder<br/>pipeline builder]
            Pipe[Pipe<br/>pipeline fragment]
            Chain[Chain<br/>processor chain]
        end
        subgraph PipelineExecution["Execution components"]
            PipelineExecutor[PipelineExecutor<br/>executor]
            ExecutingGraph[ExecutingGraph<br/>execution graph]
            ExecutorTasks[ExecutorTasks<br/>task queue]
        end
        subgraph PipelineOptimization["Optimization components"]
            Resize[Resize<br/>parallelism adjustment]
            Transform[Transform<br/>transform optimization]
            Merge[Merge<br/>merge optimization]
        end
    end
    subgraph CoordinationModule["Coordination module"]
        subgraph KeeperCore["Keeper core"]
            KeeperServer[KeeperServer<br/>Keeper server]
            KeeperStorage[KeeperStorage<br/>storage engine]
            KeeperStateMachine[KeeperStateMachine<br/>state machine]
        end
        subgraph RaftConsensus["Raft consensus"]
            RaftInstance[RaftInstance<br/>Raft instance]
            LogStore[LogStore<br/>log storage]
            SnapshotManager[SnapshotManager<br/>snapshot management]
        end
        subgraph KeeperProtocol["Keeper protocol"]
            ZooKeeperProtocol[ZooKeeper protocol<br/>compatibility layer]
            KeeperConnection[KeeperConnection<br/>connection management]
            KeeperSession[KeeperSession<br/>session management]
        end
    end
    QueryPlan[QueryPlan] --> QueryPipelineBuilder
    QueryPipelineBuilder --> Pipe
    Pipe --> Chain
    Chain --> Processors[IProcessor]
    QueryPipelineBuilder --> QueryPipeline
    QueryPipeline --> PipelineExecutor
    PipelineExecutor --> ExecutingGraph
    ExecutingGraph --> ExecutorTasks
    KeeperServer --> KeeperStorage
    KeeperServer --> RaftInstance
    RaftInstance --> LogStore
    RaftInstance --> SnapshotManager
    ZooKeeperProtocol --> KeeperConnection
    KeeperConnection --> KeeperSession
    KeeperSession --> KeeperStorage
Architecture notes
Diagram overview
The QueryPipeline module turns a QueryPlan into an executable Processor pipeline. QueryPipelineBuilder is the core builder: it creates Pipes (pipeline fragments) and Chains (processor chains) that together form the complete QueryPipeline. PipelineExecutor performs the actual execution, building an ExecutingGraph and managing task scheduling.
The Coordination module provides the distributed coordination service. KeeperServer is the main server and uses the Raft algorithm to achieve strong consistency. KeeperStorage holds all coordination data. ZooKeeperProtocol exposes a ZooKeeper-compatible interface.
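To make the pull-and-transform idea concrete, here is a self-contained toy model. These are not ClickHouse's actual classes: `Chunk`, `Source`, `Transform`, and `runPipeline` below are invented for illustration. A source produces chunks, each transform rewrites them, and a sink consumes the result, mirroring how the executor drains Processors:

```cpp
#include <cassert>
#include <functional>
#include <optional>
#include <vector>

// Toy model of a data-flow pipeline (NOT ClickHouse's real classes):
// a Source produces chunks, Transforms rewrite them, a sink consumes them.
struct Chunk { std::vector<int> values; };

struct Source {
    int remaining;                       // how many chunks are left to produce
    std::optional<Chunk> pull() {
        if (remaining <= 0) return std::nullopt;
        --remaining;
        return Chunk{{1, 2, 3}};         // each chunk carries the same toy data
    }
};

using Transform = std::function<Chunk(Chunk)>;

// The "executor": drain the source through every transform into the sink.
// Here the sink simply sums every value it receives.
int runPipeline(Source source, const std::vector<Transform> & transforms) {
    int total = 0;
    while (auto chunk = source.pull()) {
        Chunk c = std::move(*chunk);
        for (const auto & t : transforms)
            c = t(std::move(c));
        for (int v : c.values)
            total += v;                  // sink: accumulate the result
    }
    return total;
}
```

With a transform that doubles each value, two chunks of `{1, 2, 3}` yield 2 × (2 + 4 + 6) = 24. The real executor generalizes this loop to an arbitrary graph with per-Processor `prepare()`/`work()` states and multiple threads.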
Key fields and interfaces
The QueryPipeline class
class QueryPipeline {
public:
    // Construction
    QueryPipeline() = default;
    // Set the input source
    void init(Pipe pipe);
    // Add processing steps
    void addSimpleTransform(const ProcessorGetter & getter);
    void addTransform(ProcessorPtr transform);
    // Adjust the degree of parallelism
    void resize(size_t num_streams);
    // Merge streams
    void narrow(size_t size);
    // Introspection
    size_t getNumStreams() const { return pipes.size(); }
    const Block & getHeader() const { return current_header; }
    // Execute
    PipelineExecutorPtr execute();
    // Finish building
    void finalize();
private:
    Pipes pipes;                          // Pipeline fragments
    Block current_header;                 // Current header block
    size_t num_threads = 0;               // Number of threads
    std::vector<ProcessorPtr> processors; // All processors
};
The QueryPipelineBuilder class
class QueryPipelineBuilder {
public:
    QueryPipelineBuilder() = default;
    // Build from a QueryPlan
    static QueryPipelineBuilder buildQueryPipeline(
        QueryPlan & plan,
        const BuildQueryPipelineSettings & settings);
    // Add steps
    void addSimpleTransform(const Pipe::ProcessorGetter & getter);
    void addTransform(ProcessorPtr transform);
    // Adjust the pipeline
    void resize(size_t num_streams, bool force = false);
    void narrow(size_t size);
    // Finish building
    QueryPipeline getPipeline() &&;
private:
    Pipe pipe; // Current pipe
    QueryPipeline::StreamType stream_type = QueryPipeline::StreamType::Main;
};
The PipelineExecutor class
class PipelineExecutor {
public:
    PipelineExecutor(Processors & processors, QueryStatus * elem);
    // Execute the pipeline
    void execute(size_t num_threads);
    // Cancel execution
    void cancel();
    // Query state
    const ExecutingGraph & getExecutingGraph() const { return graph; }
private:
    // Initialize the execution graph
    void initializeExecution(size_t num_threads);
    // Execute a single step
    void executeStep(ExecutorTasks & tasks);
    // Schedule tasks
    void schedule(ExecutorTasks & tasks);
    ExecutingGraph graph; // Execution graph
    ThreadPool pool;      // Thread pool
    std::atomic<bool> cancelled{false};
};
The KeeperServer class
class KeeperServer {
public:
    KeeperServer(
        const KeeperConfigurationPtr & configuration,
        const Poco::Util::AbstractConfiguration & config);
    // Start / stop the service
    void startup();
    void shutdown();
    // Handle requests
    RaftResult putRequest(const KeeperStorage::RequestForSession & request);
    // State queries
    bool isLeader() const;
    bool isFollower() const;
    uint64_t getLogSize() const;
    // Snapshot management
    void createSnapshot();
    void loadSnapshot(const std::string & path);
private:
    std::unique_ptr<KeeperStateMachine> state_machine;
    std::unique_ptr<nuraft::raft_server> raft_instance;
    KeeperStoragePtr storage;
    CoordinationSettingsPtr coordination_settings;
};
The KeeperStorage class
class KeeperStorage {
public:
    struct Node {
        std::string data;
        std::vector<std::string> children;
        int64_t czxid = 0;          // Creation transaction ID
        int64_t mzxid = 0;          // Last-modification transaction ID
        int64_t ctime = 0;          // Creation time
        int64_t mtime = 0;          // Last-modification time
        int32_t version = 0;        // Data version
        int32_t cversion = 0;       // Children version
        int32_t aversion = 0;       // ACL version
        int64_t ephemeralOwner = 0; // Owner session of an ephemeral node
    };
    // Node operations
    ResponsePtr processRequest(
        const RequestPtr & request,
        int64_t session_id,
        int64_t zxid);
    // Create a node
    ResponsePtr create(
        const std::string & path,
        const std::string & data,
        int32_t flags,
        int64_t session_id);
    // Remove a node
    ResponsePtr remove(const std::string & path, int32_t version);
    // Get a node
    ResponsePtr get(const std::string & path);
    // Set node data
    ResponsePtr set(const std::string & path, const std::string & data, int32_t version);
    // List children
    ResponsePtr list(const std::string & path);
    // Check existence
    ResponsePtr exists(const std::string & path);
private:
    std::unordered_map<std::string, Node> container;                         // Node storage
    std::unordered_map<int64_t, std::unordered_set<std::string>> ephemerals; // Ephemeral nodes per session
    int64_t zxid = 0;               // Current transaction ID
    int64_t session_id_counter = 0; // Session ID counter
};
Boundary conditions
QueryPipeline
- Maximum number of threads: system-dependent
- Maximum number of Processors: thousands to tens of thousands
- Pipeline depth: usually < 100
- Number of parallel streams: 1-1000
Coordination
- Maximum number of nodes: millions
- Maximum data size: < 1 MB per node
- Session timeout: seconds to minutes
- Cluster size: 3-7 nodes
Exceptions and fallbacks
QueryPipeline exceptions
- LOGICAL_ERROR: pipeline construction error
- QUERY_WAS_CANCELLED: the query was cancelled
- TOO_MANY_SIMULTANEOUS_QUERIES: too many concurrent queries
Coordination exceptions
- ZNONODE: the node does not exist
- ZNODEEXISTS: the node already exists
- ZBADVERSION: version mismatch
- ZSESSIONEXPIRED: the session has expired
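As an illustration of how a client might treat these error codes, the sketch below uses the conventional ZooKeeper numeric values for the codes listed above; the `isRetryable` policy is a hypothetical example of client-side handling, not ClickHouse's actual implementation:

```cpp
#include <cassert>

// ZooKeeper-style error codes (values follow the ZooKeeper convention of
// negative integers; shown here for illustration).
enum class KeeperError {
    ZOK = 0,
    ZNONODE = -101,         // the node does not exist
    ZBADVERSION = -103,     // version mismatch on a conditional update
    ZNODEEXISTS = -110,     // the node already exists
    ZSESSIONEXPIRED = -112, // the session has expired
};

// A typical (hypothetical) client-side policy: which errors are worth
// retrying after re-establishing state, and which signal application logic.
bool isRetryable(KeeperError e) {
    switch (e) {
        case KeeperError::ZSESSIONEXPIRED:
            return true;  // reconnect, re-create ephemerals, then retry
        case KeeperError::ZBADVERSION:
            return true;  // re-read the node, then retry the compare-and-set
        default:
            return false; // ZNONODE / ZNODEEXISTS usually indicate logic errors
    }
}
```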
Core API deep dive
API 1: QueryPipelineBuilder::buildQueryPipeline - building the pipeline
Basic information
- Name: buildQueryPipeline()
- Purpose: build an execution pipeline from a QueryPlan
- Returns: QueryPipelineBuilder
Implementation flow
QueryPipelineBuilder QueryPipelineBuilder::buildQueryPipeline(
    QueryPlan & plan,
    const BuildQueryPipelineSettings & settings)
{
    // 1) Optimize the QueryPlan
    QueryPlanOptimizationSettings optimization_settings;
    QueryPlanOptimizer::optimize(plan, optimization_settings);
    // 2) Create the builder
    QueryPipelineBuilder builder;
    // 3) Walk the QueryPlan steps
    for (auto * step : plan.getSteps()) {
        // Each step is converted into Processors
        step->updatePipeline(builder, settings);
    }
    // 4) Finish building
    builder.finalize();
    return builder;
}
// Example: the ReadFromStorageStep implementation
void ReadFromStorageStep::updatePipeline(
    QueryPipelineBuilder & pipeline,
    const BuildQueryPipelineSettings & settings)
{
    // 1) Read from storage
    Pipe pipe = storage->read(
        column_names,
        metadata_snapshot,
        query_info,
        context,
        processing_stage,
        max_block_size,
        num_streams
    );
    // 2) Initialize the pipeline
    if (pipeline.initialized())
        throw Exception("Pipeline already initialized");
    pipeline.init(std::move(pipe));
}
// Example: the FilterStep implementation
void FilterStep::updatePipeline(
    QueryPipelineBuilder & pipeline,
    const BuildQueryPipelineSettings & settings)
{
    // Add a filter transform
    pipeline.addSimpleTransform([&](const Block & header) {
        return std::make_shared<FilterTransform>(
            header,
            filter_expression,
            filter_column_name,
            remove_filter_column
        );
    });
}
API 2: PipelineExecutor::execute - executing the pipeline
Basic information
- Name: execute()
- Purpose: execute all Processors in the pipeline
- Threading model: multi-threaded parallel execution
Implementation (simplified)
void PipelineExecutor::execute(size_t num_threads) {
    // 1) Initialize the execution graph
    initializeExecution(num_threads);
    // 2) Create the task queue
    ExecutorTasks tasks(num_threads);
    // 3) Execution loop
    while (!graph.isFinished()) {
        // Check for cancellation
        if (cancelled.load())
            break;
        // Schedule tasks
        schedule(tasks);
        // Execute one step
        executeStep(tasks);
    }
    // 4) Wait for all tasks to finish
    tasks.wait();
}
void PipelineExecutor::initializeExecution(size_t num_threads) {
    // 1) Build the execution graph
    graph.init(processors);
    // 2) Prepare every Processor
    for (auto * node : graph.nodes) {
        node->processor->prepare();
    }
    // 3) Initialize the thread pool
    pool.initialize(num_threads);
}
void PipelineExecutor::executeStep(ExecutorTasks & tasks) {
    // 1) Collect nodes that are ready to run
    auto ready_nodes = graph.getReadyNodes();
    // 2) Create a task for each node
    for (auto * node : ready_nodes) {
        tasks.addTask([node]() {
            // Prepare
            auto status = node->processor->prepare();
            if (status == IProcessor::Status::Ready) {
                // Do the work
                node->processor->work();
            }
        });
    }
    // 3) Run the tasks
    tasks.execute();
}
API 3: KeeperStorage::processRequest - handling Keeper requests
Basic information
- Name: processRequest()
- Purpose: handle ZooKeeper-compatible requests
- Consistency: strong consistency guaranteed via Raft
Implementation
ResponsePtr KeeperStorage::processRequest(
    const RequestPtr & request,
    int64_t session_id,
    int64_t zxid)
{
    // 1) Update the transaction ID
    this->zxid = zxid;
    // 2) Dispatch on the request type
    switch (request->getOpNum()) {
        case OpNum::Create:
            return processCreate(
                dynamic_cast<const CreateRequest &>(*request),
                session_id
            );
        case OpNum::Remove:
            return processRemove(
                dynamic_cast<const RemoveRequest &>(*request)
            );
        case OpNum::Set:
            return processSet(
                dynamic_cast<const SetRequest &>(*request)
            );
        case OpNum::Get:
            return processGet(
                dynamic_cast<const GetRequest &>(*request)
            );
        case OpNum::List:
            return processList(
                dynamic_cast<const ListRequest &>(*request)
            );
        default:
            return std::make_shared<ErrorResponse>(-1);
    }
}
ResponsePtr KeeperStorage::processCreate(
    const CreateRequest & request,
    int64_t session_id)
{
    const std::string & path = request.path;
    // 1) Check whether the node already exists
    if (container.contains(path)) {
        auto response = std::make_shared<CreateResponse>();
        response->error = Error::ZNODEEXISTS;
        return response;
    }
    // 2) Check whether the parent node exists
    std::string parent_path = parentPath(path);
    if (!parent_path.empty() && !container.contains(parent_path)) {
        auto response = std::make_shared<CreateResponse>();
        response->error = Error::ZNONODE;
        return response;
    }
    // 3) Create the node
    Node node;
    node.data = request.data;
    node.czxid = zxid;
    node.mzxid = zxid;
    node.ctime = std::time(nullptr);
    node.mtime = node.ctime;
    // Handle ephemeral nodes
    if (request.is_ephemeral) {
        node.ephemeralOwner = session_id;
        ephemerals[session_id].insert(path);
    }
    // 4) Insert the node
    container[path] = node;
    // 5) Update the parent's child list
    if (!parent_path.empty()) {
        container[parent_path].children.push_back(path);
        container[parent_path].cversion++;
    }
    // 6) Return the response
    auto response = std::make_shared<CreateResponse>();
    response->path_created = path;
    return response;
}
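The create() checks above can be reproduced in a stand-alone form. The mini storage below mirrors the existence check, the parent check, and the cversion bump so the semantics can be run in isolation (`MiniStorage`, `MiniNode`, and the string error returns are invented for this sketch, not ClickHouse types):

```cpp
#include <cassert>
#include <map>
#include <string>
#include <vector>

// Stand-alone sketch of the create() semantics shown above, using plain
// std::map so it runs without ClickHouse. Types are invented for this sketch.
struct MiniNode {
    std::string data;
    std::vector<std::string> children;
    int cversion = 0; // bumped whenever the child list changes
};

struct MiniStorage {
    std::map<std::string, MiniNode> container;

    // "/a/b" -> "/a"; a top-level node's parent is the root "/".
    static std::string parentPath(const std::string & path) {
        auto pos = path.rfind('/');
        return pos == 0 ? "/" : path.substr(0, pos);
    }

    // Returns an error name ("" on success), mimicking the checks in the text.
    std::string create(const std::string & path, const std::string & data) {
        if (container.count(path))
            return "ZNODEEXISTS";                  // node already exists
        std::string parent = parentPath(path);
        if (path != "/" && !container.count(parent))
            return "ZNONODE";                      // parent does not exist
        container[path] = MiniNode{data, {}, 0};   // insert the node
        if (path != "/") {
            container[parent].children.push_back(path);
            ++container[parent].cversion;          // children version bump
        }
        return "";
    }
};
```

Note that real ZooKeeper/Keeper semantics also cover ACLs, ephemeral ownership, sequential-suffix allocation, and watch notifications, which this sketch omits.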
Data structure UML diagram
classDiagram
    class QueryPipeline {
        -pipes: vector~Pipe~
        -processors: vector~ProcessorPtr~
        -current_header: Block
        +init(Pipe) void
        +resize(size_t) void
        +execute() PipelineExecutorPtr
    }
    class QueryPipelineBuilder {
        -pipe: Pipe
        +buildQueryPipeline(QueryPlan) static
        +addTransform(ProcessorPtr) void
        +resize(size_t) void
        +getPipeline() QueryPipeline
    }
    class Pipe {
        -processors: Processors
        -header: Block
        +numOutputPorts() size_t
        +getHeader() Block
    }
    class PipelineExecutor {
        -graph: ExecutingGraph
        -pool: ThreadPool
        +execute(size_t) void
        +cancel() void
    }
    class ExecutingGraph {
        -nodes: vector~Node*~
        +init(Processors) void
        +getReadyNodes() vector~Node*~
        +isFinished() bool
    }
    class KeeperServer {
        -state_machine: KeeperStateMachine
        -raft_instance: raft_server
        -storage: KeeperStoragePtr
        +startup() void
        +putRequest(Request) RaftResult
        +isLeader() bool
    }
    class KeeperStorage {
        -container: map~String, Node~
        -ephemerals: map~int64, set~String~~
        -zxid: int64
        +processRequest(...) ResponsePtr
        +create(...) ResponsePtr
        +remove(...) ResponsePtr
        +get(...) ResponsePtr
    }
    class KeeperStateMachine {
        -storage: KeeperStoragePtr
        +commit(LogEntry) void
        +createSnapshot() SnapshotPtr
        +applySnapshot(Snapshot) void
    }
    QueryPlan --> QueryPipelineBuilder: builds
    QueryPipelineBuilder --> QueryPipeline: creates
    QueryPipeline --> Pipe: contains
    Pipe --> IProcessor: contains
    QueryPipeline --> PipelineExecutor: executes
    PipelineExecutor --> ExecutingGraph: uses
    KeeperServer --> KeeperStorage: manages
    KeeperServer --> KeeperStateMachine: uses
    KeeperStateMachine --> KeeperStorage: operates
Practical experience
Building and executing a pipeline
// Build a pipeline from a QueryPlan
QueryPlan plan;
// ... build the plan
BuildQueryPipelineSettings settings;
settings.actions_settings.max_threads = 8;
auto pipeline_builder = QueryPipelineBuilder::buildQueryPipeline(plan, settings);
QueryPipeline pipeline = std::move(pipeline_builder).getPipeline();
// Execute the pipeline
auto executor = pipeline.execute();
executor->execute(8); // 8 threads
Using ClickHouse Keeper
// Create a Keeper client
auto keeper_client = std::make_shared<KeeperClient>(
    "localhost:9181",
    session_timeout_ms
);
// Create a node
keeper_client->create(
    "/my_app/config",
    "config_data",
    zkutil::CreateMode::Persistent
);
// Get node data
auto result = keeper_client->get("/my_app/config");
std::string data = result.data;
// Watch for node changes
keeper_client->exists(
    "/my_app/config",
    [](const auto & event) {
        std::cout << "Node changed: " << event.path << std::endl;
    }
);
// Create an ephemeral node (used for distributed locks)
keeper_client->create(
    "/locks/my_lock",
    "lock_holder",
    zkutil::CreateMode::Ephemeral
);
Distributed lock implementation
class DistributedLock {
public:
    DistributedLock(zkutil::ZooKeeperPtr keeper, const std::string & path)
        : keeper(keeper), lock_path(path) {}
    bool tryLock() {
        try {
            // Create an ephemeral sequential node
            created_path = keeper->create(
                lock_path + "/lock_",
                "",
                zkutil::CreateMode::EphemeralSequential
            );
            // List all children
            auto children = keeper->getChildren(lock_path);
            std::sort(children.begin(), children.end());
            // Check whether ours is the smallest node
            std::string my_node = created_path.substr(lock_path.size() + 1);
            if (children[0] == my_node) {
                return true; // Lock acquired
            }
            // Wait for the node just before ours to be removed (watching only
            // the immediate predecessor avoids a thundering herd)
            size_t my_index = std::find(children.begin(), children.end(), my_node) - children.begin();
            std::string prev_node = lock_path + "/" + children[my_index - 1];
            keeper->exists(prev_node, [this](const auto &) {
                // The previous node is gone; try again
                tryLock();
            });
            return false;
        }
        catch (const Exception &) {
            return false;
        }
    }
    void unlock() {
        if (!created_path.empty()) {
            keeper->remove(created_path);
            created_path.clear();
        }
    }
private:
    zkutil::ZooKeeperPtr keeper;
    std::string lock_path;
    std::string created_path;
};
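The core of the lock above is the "smallest sequential node holds the lock" rule. The helpers below (`holdsLock` and `nodeToWatch` are invented names) isolate that rule from zkutil so it can be checked locally; node names are the zero-padded sequential suffixes a ZooKeeper-style sequential create would return:

```cpp
#include <algorithm>
#include <cassert>
#include <string>
#include <vector>

// The lock is held by whichever client created the lexicographically
// smallest sequential node under the lock path.
bool holdsLock(std::vector<std::string> children, const std::string & mine) {
    std::sort(children.begin(), children.end());
    return !children.empty() && children.front() == mine;
}

// While waiting, watch only the node immediately before ours; returns ""
// when there is no predecessor (i.e. we hold the lock) or ours is missing.
std::string nodeToWatch(std::vector<std::string> children, const std::string & mine) {
    std::sort(children.begin(), children.end());
    auto it = std::find(children.begin(), children.end(), mine);
    if (it == children.begin() || it == children.end())
        return "";
    return *(it - 1);
}
```

Watching only the predecessor (rather than the whole child list) is what keeps lock handoff O(1) per release: when a holder dies or unlocks, exactly one waiter is woken.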
Summary
The QueryPipeline and Coordination modules are ClickHouse's advanced functional components.
The QueryPipeline module provides:
- Pipeline construction: turning a QueryPlan into an executable pipeline
- Parallel execution: multi-threaded processing of data streams
- Dynamic scheduling: smart scheduling based on Processor state
- Streaming processing: process data as it is read, with a low memory footprint
- Execution optimization: adjusting parallelism and merging streams
The Coordination module provides:
- Distributed coordination: ClickHouse Keeper as a ZooKeeper replacement
- Strong consistency: based on the Raft algorithm
- ZooKeeper compatibility: a drop-in replacement for ZooKeeper
- High performance: optimized for ClickHouse
- Easy maintenance: implemented in C++ and integrated with ClickHouse
The two modules underpin query execution and distributed coordination respectively, and are key to ClickHouse's performance and availability.