ClickHouse-12-Client与Backups模块
模块概览
职责
Client 模块负责:
- 提供命令行客户端(clickhouse-client)
- 实现与服务器的通信协议
- 支持交互式查询和批量查询
- 提供查询结果的格式化输出
- 支持本地查询处理
Backups 模块负责:
- 实现数据库备份和恢复功能
- 支持全量备份和增量备份
- 支持多种备份目标(本地、S3、HDFS)
- 提供备份元数据管理
- 支持并行备份和恢复
模块架构图
flowchart TB
subgraph Client["Client 模块"]
subgraph ClientCore["客户端核心"]
ClickHouseClient[ClickHouseClient<br/>命令行客户端]
Connection[Connection<br/>连接管理]
QueryProcessor[QueryProcessor<br/>查询处理器]
end
subgraph ClientFeatures["客户端功能"]
Interactive[Interactive<br/>交互模式]
Batch[Batch<br/>批处理模式]
Format[Format<br/>格式化输出]
History[History<br/>历史记录]
Completion[Completion<br/>自动补全]
end
subgraph ClientProtocol["协议实现"]
TCPProtocol[TCP 协议<br/>Native]
HTTPProtocol[HTTP 协议<br/>REST API]
Compression[压缩支持]
end
end
subgraph Backups["Backups 模块"]
subgraph BackupCore["备份核心"]
BackupCoordinator[BackupCoordinator<br/>备份协调器]
BackupEngine[BackupEngine<br/>备份引擎]
RestoreEngine[RestoreEngine<br/>恢复引擎]
end
subgraph BackupTypes["备份类型"]
FullBackup[Full Backup<br/>全量备份]
IncrementalBackup[Incremental Backup<br/>增量备份]
end
subgraph BackupTargets["备份目标"]
LocalDisk[Local Disk<br/>本地磁盘]
S3Storage[S3<br/>对象存储]
HDFSStorage[HDFS<br/>分布式存储]
end
subgraph BackupMetaGroup["元数据管理"]
BackupMetadata[BackupMetadata<br/>备份元数据]
BackupEntry[BackupEntry<br/>备份条目]
BackupSettings[BackupSettings<br/>备份设置]
end
end
User[用户] --> ClickHouseClient
ClickHouseClient --> Connection
Connection --> TCPProtocol
Connection --> HTTPProtocol
ClickHouseClient --> Interactive
ClickHouseClient --> Batch
ClickHouseClient --> Format
QueryProcessor --> Server[Server 模块]
BackupCoordinator --> BackupEngine
BackupCoordinator --> RestoreEngine
BackupEngine --> BackupTypes
BackupEngine --> BackupTargets
BackupEngine --> BackupMetadata
RestoreEngine --> BackupMetadata
RestoreEngine --> StorageEngines[Storage Engines]
架构说明
图意概述
Client 模块提供了命令行客户端工具。ClickHouseClient 是主入口,通过 Connection 与服务器通信,支持 TCP 和 HTTP 两种协议。客户端支持交互式和批处理两种模式,提供丰富的输出格式和历史记录功能。
Backups 模块实现了完整的备份恢复系统。BackupCoordinator 协调整个备份过程,BackupEngine 执行实际的备份操作,支持全量和增量备份,可以备份到本地磁盘、S3 或 HDFS。BackupMetadata 管理备份的元信息。
关键字段与接口
ClickHouseClient 类(简化版)
/// Command-line client (simplified view): keeps the connection parameters and
/// client settings, and exposes entry points for interactive and batch sessions.
class Client {
public:
    Client() = default;

    /// Initialisation hook, given the owning Poco application.
    void initialize(const Poco::Util::Application & app);

    /// Main loop; receives the command-line arguments.
    int main(const std::vector<std::string> & args);

    /// Connect to the server.
    void connect();

    /// Execute a query.
    void processQuery(const String & query);

    /// Interactive mode.
    void runInteractive();

    /// Batch mode.
    void runBatch();

private:
    /// Connection parameters.
    String host{"localhost"};
    UInt16 port{9000};
    String user{"default"};
    String password;
    String database{"default"};

    /// Client configuration.
    bool is_interactive{true};
    String output_format{"PrettyCompact"};
    bool multiline{false};
    String history_file;

    /// Connection object.
    std::unique_ptr<Connection> connection;

    /// Process a single query.
    void processQueryImpl(const String & query);

    /// Display a result block.
    void displayResult(const Block & block);

    /// Exception handler.
    void onException(const Exception & e);
};
Connection 类
/// Client-side connection: connects and disconnects, sends a query or a cancel
/// request, polls for and receives packets, and exposes the server name/version.
class Connection {
public:
    /// Takes the server host and port, the default database, the user
    /// credentials and a client name.
    Connection(
        const String & host_,
        UInt16 port_,
        const String & default_database_,
        const String & user_,
        const String & password_,
        const String & client_name_);

    /// Connect / disconnect.
    void connect(const ConnectionTimeouts & timeouts);
    void disconnect();

    /// Send a query.
    void sendQuery(
        const String & query,
        const String & query_id,
        QueryProcessingStage::Enum stage,
        const Settings * settings,
        const ClientInfo * client_info,
        bool with_pending_data);

    /// Receive data.
    bool poll(size_t timeout_microseconds);
    Packet receivePacket();

    /// Cancel the query.
    void sendCancel();

    /// Server information.
    const String & getServerName() const { return server_name; }
    const String & getServerVersion() const { return server_version; }

private:
    String host;
    UInt16 port;
    String default_database;
    String user;
    String password;

    /// Input and output buffers owned by the connection.
    std::unique_ptr<ReadBuffer> in;
    std::unique_ptr<WriteBuffer> out;

    String server_name;
    String server_version;

    /// Protocol implementation.
    void sendHello();
    void receiveHello();
    void sendData(const Block & block);
    Block receiveData();
};
BackupCoordinator 类
/// Front door of the backup subsystem: creates, restores, lists and removes
/// backups, owning one BackupEngine and one RestoreEngine.
class BackupCoordinator {
public:
    /// `explicit` so that a ContextPtr is never silently converted into a coordinator.
    explicit BackupCoordinator(const ContextPtr & context_);

    /// Create a backup described by `backup_info`.
    BackupPtr createBackup(const BackupInfo & backup_info);

    /// Restore from `backup` using `settings`.
    void restore(const BackupPtr & backup, const RestoreSettings & settings);

    /// List backups.
    std::vector<BackupInfo> listBackups() const;

    /// Remove the backup named `backup_name`.
    void removeBackup(const String & backup_name);

private:
    ContextPtr context;
    std::unique_ptr<BackupEngine> backup_engine;
    std::unique_ptr<RestoreEngine> restore_engine;
};
BackupEngine 类
/// Performs the actual backup work: creating a backup object and backing up
/// databases, tables and data parts into it.
///
/// NOTE(review): the member-function definitions shown in this document read
/// `backup`, `context` and `settings`, none of which is declared in this class;
/// presumably they are members elided from this listing — confirm.
class BackupEngine {
public:
    /// Create a backup.
    BackupPtr createBackup(
        const BackupInfo & backup_info,
        const BackupSettings & settings);

    /// Back up a database, recording it in `backup_entry`.
    void backupDatabase(
        const String & database_name,
        BackupEntryPtr & backup_entry);

    /// Back up a table, recording it in `backup_entry`.
    void backupTable(
        const StorageID & table_id,
        BackupEntryPtr & backup_entry);

    /// Back up the given data parts of `storage`.
    void backupDataParts(
        const StoragePtr & storage,
        const DataPartsVector & data_parts,
        BackupEntryPtr & backup_entry);

private:
    /// Write the backup metadata.
    void writeMetadata(
        BackupPtr backup,
        const BackupMetadata & metadata);

    /// Copy one file into the backup.
    void copyFile(
        const String & source_path,
        BackupPtr backup,
        const String & dest_path);
};
Backup 类(接口)
/// Abstract backup destination: a named collection of files that can be
/// written, read, tested for existence, listed and finalized.
class IBackup {
public:
    virtual ~IBackup() = default;

    /// Backup name.
    virtual String getName() const = 0;

    /// Write a file, taking its contents from `in`.
    virtual void writeFile(const String & file_name, ReadBuffer & in) = 0;

    /// Read a file; the caller owns the returned buffer.
    virtual std::unique_ptr<ReadBuffer> readFile(const String & file_name) const = 0;

    /// Whether the file exists.
    virtual bool fileExists(const String & file_name) const = 0;

    /// List files.
    virtual Strings listFiles() const = 0;

    /// Complete the backup.
    virtual void finalize() = 0;
};
class BackupToLocal : public IBackup {
public:
BackupToLocal(const String & path_);
void writeFile(const String & file_name, ReadBuffer & in) override;
std::unique_ptr<ReadBuffer> readFile(const String & file_name) const override;
bool fileExists(const String & file_name) const override;
private:
String path;
};
class BackupToS3 : public IBackup {
public:
BackupToS3(const String & bucket_, const String & path_);
void writeFile(const String & file_name, ReadBuffer & in) override;
std::unique_ptr<ReadBuffer> readFile(const String & file_name) const override;
private:
String bucket;
String path;
std::shared_ptr<S3Client> s3_client;
};
边界条件
Client
- 最大查询长度:几 MB
- 历史记录大小:默认 10000 条
- 连接超时:默认 10 秒
- 读取超时:默认 300 秒
Backups
- 最大备份大小:取决于存储
- 并行备份线程:默认 16
- 压缩级别:1-9
- 增量备份间隔:小时级到天级
异常与回退
Client 异常
- CONNECTION_FAILED:连接失败
- NETWORK_ERROR:网络错误
- QUERY_WAS_CANCELLED:查询被取消
- TIMEOUT_EXCEEDED:超时
Backups 异常
- BACKUP_ALREADY_EXISTS:备份已存在
- BACKUP_NOT_FOUND:备份不存在
- BACKUP_IS_EMPTY:备份为空
- RESTORE_FAILED:恢复失败
核心 API 详解
API 1: Client::processQuery - 执行查询
基本信息
- 名称: processQuery()
- 用途: 执行 SQL 查询并显示结果
- 模式: 支持交互式和批处理
实现流程
void Client::processQuery(const String & query) {
try {
// 1) 发送查询
connection->sendQuery(
query,
query_id,
QueryProcessingStage::Complete,
&settings,
&client_info,
false
);
// 2) 接收结果
while (true) {
Packet packet = connection->receivePacket();
switch (packet.type) {
case Protocol::Server::Data:
// 显示数据块
displayResult(packet.block);
break;
case Protocol::Server::Progress:
// 显示进度
displayProgress(packet.progress);
break;
case Protocol::Server::ProfileInfo:
// 显示性能信息
displayProfileInfo(packet.profile_info);
break;
case Protocol::Server::Exception:
// 处理异常
throw packet.exception;
case Protocol::Server::EndOfStream:
// 查询完成
return;
default:
break;
}
}
}
catch (const Exception & e) {
onException(e);
}
}
void Client::displayResult(const Block & block) {
if (block.rows() == 0)
return;
// 1) 创建输出格式
auto output_format = FormatFactory::instance().getOutput(
output_format_name,
*out_buf,
block.cloneEmpty(),
context
);
// 2) 写入数据
output_format->write(block);
output_format->finalize();
// 3) 刷新输出
out_buf->next();
}
void Client::runInteractive() {
// 1) 加载历史记录
LineReader line_reader(history_file);
// 2) 交互循环
while (true) {
// 读取一行
String line = line_reader.readLine("localhost :) ", "");
if (line.empty())
continue;
if (line == "exit" || line == "quit")
break;
// 处理多行查询
if (multiline && !ends_with_semicolon(line)) {
String full_query = line;
while (true) {
String next_line = line_reader.readLine(":-] ", "");
full_query += "\n" + next_line;
if (ends_with_semicolon(next_line))
break;
}
processQuery(full_query);
}
else {
processQuery(line);
}
// 保存到历史
line_reader.addToHistory(line);
}
}
API 2: BackupEngine::createBackup - 创建备份
基本信息
- 名称: createBackup()
- 用途: 创建数据库备份
- 类型: 支持全量和增量备份
实现流程
/// Creates a backup at the destination described by `backup_info`, backs up the
/// listed databases and tables, writes the metadata and finalizes the backup.
///
/// Throws Exception when the destination type is neither Local nor S3; before
/// this check the null `backup` pointer would have been dereferenced.
///
/// NOTE(review): `settings` is not read in this function, and the per-database /
/// per-table helpers are not handed the `backup` object created here — confirm
/// how they reach it.
BackupPtr BackupEngine::createBackup(
    const BackupInfo & backup_info,
    const BackupSettings & settings)
{
    // 1) Create the backup object for the requested destination.
    const auto & destination = backup_info.destination;
    BackupPtr backup;
    if (destination.type == BackupDestination::Local)
        backup = std::make_shared<BackupToLocal>(destination.path);
    else if (destination.type == BackupDestination::S3)
        backup = std::make_shared<BackupToS3>(destination.bucket, destination.path);
    else
        throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unsupported backup destination type");

    // 2) Start the metadata record.
    BackupMetadata metadata;
    metadata.backup_name = backup_info.name;
    metadata.timestamp = std::time(nullptr);
    metadata.clickhouse_version = VERSION_STRING;

    // 3) Back up databases.
    for (const auto & database_name : backup_info.databases) {
        BackupEntryPtr database_entry = std::make_shared<BackupEntry>();
        database_entry->type = BackupEntry::Database;
        database_entry->name = database_name;
        backupDatabase(database_name, database_entry);
        metadata.entries.push_back(database_entry);
    }

    // 4) Back up tables.
    for (const auto & table_id : backup_info.tables) {
        BackupEntryPtr table_entry = std::make_shared<BackupEntry>();
        table_entry->type = BackupEntry::Table;
        table_entry->table_id = table_id;
        backupTable(table_id, table_entry);
        metadata.entries.push_back(table_entry);
    }

    // 5) Write the metadata, then 6) finalize.
    writeMetadata(backup, metadata);
    backup->finalize();
    return backup;
}
void BackupEngine::backupTable(
const StorageID & table_id,
BackupEntryPtr & backup_entry)
{
// 1) 获取表
auto table = DatabaseCatalog::instance().getTable(table_id, context);
auto metadata_snapshot = table->getInMemoryMetadataPtr();
// 2) 备份表结构
String create_query = getCreateQuery(table_id);
backup_entry->create_query = create_query;
WriteBufferFromOwnString create_query_buf;
writeString(create_query, create_query_buf);
String metadata_path = "metadata/" + table_id.database_name + "/" + table_id.table_name + ".sql";
backup->writeFile(metadata_path, create_query_buf);
// 3) 备份数据(如果是 MergeTree)
if (auto * merge_tree = dynamic_cast<StorageMergeTree *>(table.get())) {
auto data_parts = merge_tree->getDataPartsVector();
for (const auto & part : data_parts) {
String part_path = table->getRelativeDataPath() + part->name + "/";
// 备份每个数据部分的文件
for (const auto & file : part->getFileNamesWithoutChecksums()) {
String source_path = part->getFullPath() + file;
String dest_path = "data/" + table_id.database_name + "/" + table_id.table_name + "/" + part->name + "/" + file;
copyFile(source_path, backup, dest_path);
}
backup_entry->data_parts.push_back(part->name);
}
}
}
/// Streams the file at `source_path` into `backup` under the name `dest_path`.
/// When a compression method is configured the source is wrapped in a
/// CompressedReadBuffer before being handed to the backup.
///
/// NOTE(review): the branch is labelled "optional compression", but a class
/// named CompressedReadBuffer presumably decodes compressed input rather than
/// producing it — confirm this is the intended wrapper, and that the restore
/// path reverses whatever transformation happens here.
void BackupEngine::copyFile(
    const String & source_path,
    BackupPtr backup,
    const String & dest_path)
{
    // Open the source file.
    ReadBufferFromFile in(source_path);

    // No compression configured: pass the file through unchanged.
    if (settings.compression_method == CompressionMethod::None) {
        backup->writeFile(dest_path, in);
        return;
    }

    std::unique_ptr<ReadBuffer> wrapped_in = std::make_unique<CompressedReadBuffer>(in);
    backup->writeFile(dest_path, *wrapped_in);
}
API 3: RestoreEngine::restore - 恢复备份
基本信息
- 名称: restore()
- 用途: 从备份恢复数据
- 模式: 支持完整恢复和选择性恢复
实现
/// Restores everything recorded in a backup: reads `.backup_metadata`, logs a
/// warning when the recorded ClickHouse version differs from VERSION_STRING,
/// then restores all database entries followed by all table entries.
void RestoreEngine::restore(
    const BackupPtr & backup,
    const RestoreSettings & settings)
{
    // Load the metadata written at backup time.
    auto metadata_in = backup->readFile(".backup_metadata");
    BackupMetadata metadata;
    readMetadata(*metadata_in, metadata);

    // A version mismatch is only reported; it does not stop the restore.
    const bool same_version = (metadata.clickhouse_version == VERSION_STRING);
    if (!same_version) {
        LOG_WARNING(log, "Backup was created with different ClickHouse version");
    }

    // First pass: databases.
    for (const auto & entry : metadata.entries) {
        if (entry->type != BackupEntry::Database)
            continue;
        restoreDatabase(backup, entry);
    }

    // Second pass: tables.
    for (const auto & entry : metadata.entries) {
        if (entry->type != BackupEntry::Table)
            continue;
        restoreTable(backup, entry, settings);
    }
}
void RestoreEngine::restoreTable(
const BackupPtr & backup,
const BackupEntryPtr & entry,
const RestoreSettings & settings)
{
const auto & table_id = entry->table_id;
// 1) 读取表结构
String metadata_path = "metadata/" + table_id.database_name + "/" + table_id.table_name + ".sql";
auto metadata_in = backup->readFile(metadata_path);
String create_query;
readString(create_query, *metadata_in);
// 2) 创建表
if (settings.create_table) {
auto ast = parseQuery(create_query);
InterpreterCreateQuery interpreter(ast, context);
interpreter.execute();
}
// 3) 恢复数据
auto table = DatabaseCatalog::instance().getTable(table_id, context);
for (const auto & part_name : entry->data_parts) {
String part_path = "data/" + table_id.database_name + "/" + table_id.table_name + "/" + part_name + "/";
String dest_path = table->getRelativeDataPath() + part_name + "/";
// 复制数据部分的所有文件
auto files = backup->listFiles(part_path);
for (const auto & file : files) {
auto file_in = backup->readFile(part_path + file);
String dest_file_path = dest_path + file;
WriteBufferFromFile out(dest_file_path);
copyData(*file_in, out);
out.sync();
}
}
// 4) 附加数据部分
if (auto * merge_tree = dynamic_cast<StorageMergeTree *>(table.get())) {
for (const auto & part_name : entry->data_parts) {
merge_tree->attachPart(part_name);
}
}
}
数据结构 UML 图
classDiagram
class Client {
-host: String
-port: UInt16
-user: String
-connection: Connection
+connect() void
+processQuery(String) void
+runInteractive() void
+runBatch() void
}
class Connection {
-host: String
-port: UInt16
-in: ReadBuffer
-out: WriteBuffer
+sendQuery(...) void
+receivePacket() Packet
+sendCancel() void
}
class BackupCoordinator {
-context: ContextPtr
-backup_engine: BackupEngine
-restore_engine: RestoreEngine
+createBackup(BackupInfo) BackupPtr
+restore(BackupPtr, RestoreSettings) void
+listBackups() vector~BackupInfo~
}
class BackupEngine {
+createBackup(...) BackupPtr
+backupDatabase(String, BackupEntry) void
+backupTable(StorageID, BackupEntry) void
-copyFile(String, Backup, String) void
}
class RestoreEngine {
+restore(BackupPtr, RestoreSettings) void
+restoreDatabase(BackupPtr, BackupEntry) void
+restoreTable(BackupPtr, BackupEntry, RestoreSettings) void
}
class IBackup {
<<interface>>
+getName() String
+writeFile(String, ReadBuffer) void
+readFile(String) ReadBufferPtr
+fileExists(String) bool
+listFiles() Strings
+finalize() void
}
class BackupToLocal {
-path: String
+writeFile(...) void
+readFile(...) ReadBufferPtr
}
class BackupToS3 {
-bucket: String
-path: String
-s3_client: S3ClientPtr
+writeFile(...) void
+readFile(...) ReadBufferPtr
}
class BackupMetadata {
+backup_name: String
+timestamp: time_t
+clickhouse_version: String
+entries: vector~BackupEntryPtr~
}
class BackupEntry {
+type: EntryType
+name: String
+table_id: StorageID
+create_query: String
+data_parts: vector~String~
}
Client --> Connection: uses
BackupCoordinator --> BackupEngine: uses
BackupCoordinator --> RestoreEngine: uses
BackupEngine --> IBackup: creates
RestoreEngine --> IBackup: uses
IBackup <|-- BackupToLocal
IBackup <|-- BackupToS3
BackupEngine --> BackupMetadata: creates
BackupMetadata --> BackupEntry: contains
实战经验
使用 clickhouse-client
# Connect to a server
clickhouse-client --host localhost --port 9000 --user default --password ''
# Run a single query
clickhouse-client --query "SELECT * FROM system.databases"
# Run the queries contained in a file
clickhouse-client < queries.sql
# Choose the output format
clickhouse-client --format CSV --query "SELECT * FROM users"
# Multi-line queries
clickhouse-client --multiline
客户端配置
<!-- ~/.clickhouse-client/config.xml -->
<!-- Client settings: server address, credentials, output format, multi-line input, history file, compression. -->
<clickhouse>
    <host>localhost</host>
    <port>9000</port>
    <user>default</user>
    <password></password>
    <format>PrettyCompact</format>
    <multiline>true</multiline>
    <history_file>~/.clickhouse-client-history</history_file>
    <compression>true</compression>
</clickhouse>
创建备份
-- Back up an entire database
BACKUP DATABASE my_database TO Disk('backups', 'my_database_backup');
-- Back up a single table
BACKUP TABLE my_database.users TO Disk('backups', 'users_backup');
-- Back up to S3
BACKUP DATABASE my_database TO S3('s3://my-bucket/backups/my_database', 'access_key', 'secret_key');
-- Incremental backup: base_backup takes a backup destination expression, not a bare name
BACKUP DATABASE my_database TO Disk('backups', 'my_database_incremental') SETTINGS base_backup = Disk('backups', 'my_database_backup');
恢复备份
-- Restore a database
RESTORE DATABASE my_database FROM Disk('backups', 'my_database_backup');
-- Restore a table
RESTORE TABLE my_database.users FROM Disk('backups', 'users_backup');
-- Restore under a different database name
RESTORE DATABASE my_database AS my_database_restored FROM Disk('backups', 'my_database_backup');
-- Restore from S3
RESTORE DATABASE my_database FROM S3('s3://my-bucket/backups/my_database', 'access_key', 'secret_key');
使用 Python 客户端
from clickhouse_driver import Client

# Connect
client = Client('localhost')

# Run a query
result = client.execute('SELECT * FROM users WHERE age > 18')

# Insert rows
client.execute('INSERT INTO users VALUES', [(1, 'Alice', 25), (2, 'Bob', 30)])

# Pass query parameters
result = client.execute('SELECT * FROM users WHERE age > %(age)s', {'age': 18})

# Stream a large result row by row. The loop body must be indented to be valid
# Python; process() stands for whatever per-row handling the caller supplies.
for row in client.execute_iter('SELECT * FROM large_table'):
    process(row)
总结
Client 和 Backups 模块是 ClickHouse 的运维工具:
Client 模块提供:
- 命令行工具:clickhouse-client 交互式客户端
- 多种协议:TCP(Native)和 HTTP 支持
- 丰富功能:历史记录、自动补全、多行查询
- 灵活输出:支持 20+ 种输出格式
- 批处理:支持脚本化查询
Backups 模块提供:
- 全量备份:完整的数据库/表备份
- 增量备份:基于前一次备份的增量
- 多种目标:本地磁盘、S3、HDFS
- 并行备份:多线程加速备份过程
- 元数据管理:完整的备份信息记录
两个模块为 ClickHouse 提供便捷的运维和数据保护能力。