1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
| /**
* MySQL二进制日志系统
* 用于主从复制和点时间恢复
*
* 基于《MySQL Binlog原理和源码解析》深度文章
*/
class MYSQL_BIN_LOG : public TC_LOG {
private:
// ========== 文件管理 ==========
/** 当前二进制日志文件句柄 */
Binlog_ofile *m_binlog_file;
/** 日志文件名模板 */
char *name;
/** 当前日志文件全路径 */
char log_file_name[FN_REFLEN];
/** 日志索引文件管理器 */
Binlog_index *index_file;
// ========== 缓存管理 ==========
/** 事务级别的binlog缓存 */
binlog_cache_data *stmt_cache;
/** 语句级别的binlog缓存 */
binlog_cache_data *trx_cache;
// ========== 同步控制 ==========
/** 主binlog互斥锁 */
mysql_mutex_t LOCK_log;
/** binlog结束位置锁 */
mysql_mutex_t LOCK_binlog_end_pos;
/** 组提交相关锁 */
mysql_mutex_t LOCK_commit_queue;
mysql_mutex_t LOCK_after_commit_queue;
mysql_mutex_t LOCK_done;
// ========== 组提交管理 ==========
/** 提交阶段管理器 */
Commit_stage_manager stage_manager;
/** 当前文件大小 */
std::atomic<my_off_t> bytes_written;
/** 是否启用组提交 */
bool group_commit_enabled;
public:
/**
* 构造函数:初始化二进制日志系统
*/
MYSQL_BIN_LOG() : m_binlog_file(nullptr), name(nullptr),
stmt_cache(nullptr), trx_cache(nullptr),
group_commit_enabled(true) {
// 初始化互斥锁
mysql_mutex_init(key_LOCK_log, &LOCK_log, MY_MUTEX_INIT_SLOW);
mysql_mutex_init(key_LOCK_binlog_end_pos, &LOCK_binlog_end_pos, MY_MUTEX_INIT_FAST);
mysql_mutex_init(key_LOCK_commit_queue, &LOCK_commit_queue, MY_MUTEX_INIT_FAST);
mysql_mutex_init(key_LOCK_after_commit_queue, &LOCK_after_commit_queue, MY_MUTEX_INIT_FAST);
mysql_mutex_init(key_LOCK_done, &LOCK_done, MY_MUTEX_INIT_FAST);
bytes_written.store(0);
}
/**
* 打开二进制日志文件
* @param log_name 日志文件名前缀
* @param new_name 新文件名(输出)
* @return 操作结果
*/
int open(const char *log_name, const char **new_name) {
DBUG_TRACE;
// 1. 构造日志文件名
create_log_file_name(log_file_name, sizeof(log_file_name),
log_name, get_next_log_index());
// 2. 创建并打开日志文件
m_binlog_file = new Binlog_ofile();
if (!m_binlog_file->open(log_file_name,
MYF(MY_WME | MY_NABP | MY_WAIT_IF_FULL))) {
LogErr(ERROR_LEVEL, ER_CANT_OPEN_FILE, log_file_name, errno);
return 1;
}
// 3. 写入格式描述事件
Format_description_log_event format_event;
if (write_event_to_file(&format_event)) {
return 1;
}
// 4. 更新索引文件
if (index_file->add_log_file_name(log_file_name)) {
return 1;
}
// 5. 重置文件大小计数器
bytes_written.store(format_event.get_event_len());
*new_name = log_file_name;
LogInfo(BINLOG_LOG, "Opened binary log file: %s", log_file_name);
return 0;
}
/**
* 写入事务到二进制日志
* 实现组提交优化,提高写入性能
* @param thd 线程句柄
* @param all 是否为完整事务提交
* @return 操作结果
*/
int commit(THD *thd, bool all) override {
DBUG_TRACE;
// 1. 检查是否需要记录binlog
if (!should_log_transaction(thd, all)) {
return 0;
}
// 2. 准备事务的binlog缓存
binlog_cache_data *cache_data = get_binlog_cache_data(thd, all);
if (cache_data->is_empty()) {
return 0; // 没有需要记录的事件
}
// 3. 进入组提交流程
if (group_commit_enabled) {
return group_commit_transaction(thd, cache_data, all);
} else {
return single_commit_transaction(thd, cache_data, all);
}
}
/**
* 组提交实现:三阶段提交优化
* FLUSH阶段 -> SYNC阶段 -> COMMIT阶段
* @param thd 线程句柄
* @param cache_data binlog缓存
* @param all 是否完整提交
* @return 操作结果
*/
int group_commit_transaction(THD *thd, binlog_cache_data *cache_data, bool all) {
// 阶段1:FLUSH - 将事务数据写入binlog文件
Stage_manager::flush_error_t flush_err =
stage_manager.enroll_for(Stage_manager::FLUSH_STAGE, thd, nullptr);
if (flush_err == Stage_manager::FLUSH_ERROR) {
return 1;
}
// 作为flush leader,负责刷新所有排队的事务
if (stage_manager.is_flush_leader(thd)) {
flush_err = flush_thread_caches_to_file(thd);
}
// 阶段2:SYNC - 将binlog文件同步到磁盘
Stage_manager::sync_error_t sync_err =
stage_manager.enroll_for(Stage_manager::SYNC_STAGE, thd, nullptr);
if (sync_err == Stage_manager::SYNC_ERROR) {
return 1;
}
if (stage_manager.is_sync_leader(thd)) {
sync_err = sync_binlog_file_to_disk();
}
// 阶段3:COMMIT - 提交存储引擎事务
Stage_manager::commit_error_t commit_err =
stage_manager.enroll_for(Stage_manager::COMMIT_STAGE, thd, nullptr);
if (stage_manager.is_commit_leader(thd)) {
commit_err = commit_storage_engines(thd);
}
// 等待所有阶段完成
stage_manager.wait_for_completion(thd);
LogDebug(BINLOG_LOG, "Group commit completed for transaction %lu",
thd->thread_id());
return (flush_err == Stage_manager::FLUSH_OK &&
sync_err == Stage_manager::SYNC_OK &&
commit_err == Stage_manager::COMMIT_OK) ? 0 : 1;
}
/**
* 写入单个事件到binlog文件
* @param event_info 事件信息
* @return 操作结果
*/
int write_event_to_binlog(Log_event *event_info) {
mysql_mutex_lock(&LOCK_log);
// 1. 检查文件大小,是否需要轮转
if (should_rotate_log()) {
int err = rotate_log_file();
if (err) {
mysql_mutex_unlock(&LOCK_log);
return err;
}
}
// 2. 获取当前文件位置
my_off_t current_pos = m_binlog_file->position();
// 3. 写入事件到文件
int err = event_info->write(m_binlog_file);
if (err) {
mysql_mutex_unlock(&LOCK_log);
LogErr(ERROR_LEVEL, ER_ERROR_ON_WRITE, log_file_name, err);
return err;
}
// 4. 更新文件大小
bytes_written.fetch_add(event_info->get_event_len());
// 5. 更新binlog位置
update_binlog_end_pos(log_file_name, m_binlog_file->position());
mysql_mutex_unlock(&LOCK_log);
LogDebug(BINLOG_LOG, "Wrote event type=%d, size=%lu to binlog at pos=%llu",
event_info->get_type_code(), event_info->get_event_len(), current_pos);
return 0;
}
/**
* 轮转日志文件
* 当当前文件达到最大大小时创建新文件
* @return 操作结果
*/
int rotate_log_file() {
LogInfo(BINLOG_LOG, "Rotating binary log file from %s", log_file_name);
// 1. 写入轮转事件
Rotate_log_event rotate_event(log_file_name, 0,
Rotate_log_event::DUP_NAME);
int err = write_event_to_file(&rotate_event);
if (err) return err;
// 2. 关闭当前文件
if (m_binlog_file) {
m_binlog_file->close();
delete m_binlog_file;
}
// 3. 创建新的日志文件
create_log_file_name(log_file_name, sizeof(log_file_name),
name, get_next_log_index());
m_binlog_file = new Binlog_ofile();
if (!m_binlog_file->open(log_file_name,
MYF(MY_WME | MY_NABP | MY_WAIT_IF_FULL))) {
return 1;
}
// 4. 写入格式描述事件到新文件
Format_description_log_event format_event;
err = write_event_to_file(&format_event);
if (err) return err;
// 5. 更新索引文件
err = index_file->add_log_file_name(log_file_name);
if (err) return err;
// 6. 重置文件大小计数器
bytes_written.store(format_event.get_event_len());
LogInfo(BINLOG_LOG, "Rotated to new binary log file: %s", log_file_name);
return 0;
}
private:
/**
* 检查是否应该轮转日志
*/
bool should_rotate_log() const {
return bytes_written.load() >= max_binlog_size;
}
/**
* 刷新线程缓存到文件
*/
Stage_manager::flush_error_t flush_thread_caches_to_file(THD *thd) {
// 获取flush阶段的所有线程
std::vector<THD*> flush_queue = stage_manager.get_flush_queue();
for (THD *queue_thd : flush_queue) {
binlog_cache_data *cache_data = get_binlog_cache_data(queue_thd, true);
if (!cache_data->is_empty()) {
int err = cache_data->flush_to_file(m_binlog_file);
if (err) {
return Stage_manager::FLUSH_ERROR;
}
}
}
return Stage_manager::FLUSH_OK;
}
/**
* 同步binlog文件到磁盘
*/
Stage_manager::sync_error_t sync_binlog_file_to_disk() {
if (sync_binlog_period > 0) {
int err = m_binlog_file->sync();
if (err) {
LogErr(ERROR_LEVEL, ER_ERROR_ON_WRITE, log_file_name, err);
return Stage_manager::SYNC_ERROR;
}
}
return Stage_manager::SYNC_OK;
}
/**
* 提交存储引擎事务
*/
Stage_manager::commit_error_t commit_storage_engines(THD *thd) {
std::vector<THD*> commit_queue = stage_manager.get_commit_queue();
for (THD *queue_thd : commit_queue) {
// 调用存储引擎的提交接口
int err = ha_commit_low(queue_thd, true);
if (err) {
LogErr(ERROR_LEVEL, ER_ERROR_DURING_COMMIT, err);
return Stage_manager::COMMIT_ERROR;
}
}
return Stage_manager::COMMIT_OK;
}
};
/**
* Binlog事件类型管理器
* 管理不同类型的binlog事件
*/
class Binlog_event_manager {
public:
/**
* 创建查询事件
* @param thd 线程句柄
* @param query SQL查询字符串
* @param query_len 查询长度
* @return 查询事件对象
*/
static Query_log_event* create_query_event(THD *thd, const char *query,
size_t query_len) {
// 构造查询事件
Query_log_event *event = new Query_log_event(
thd, // 线程句柄
query, // SQL语句
query_len, // 语句长度
false, // 不使用缓存
true, // 抑制use语句
true, // 立即刷新
true // 记录GTID
);
LogDebug(BINLOG_LOG, "Created query event: %.*s",
static_cast<int>(std::min(query_len, size_t(100))), query);
return event;
}
/**
* 创建行事件(RBR模式)
* @param thd 线程句柄
* @param table 表对象
* @param event_type 事件类型
* @return 行事件对象
*/
static Rows_log_event* create_rows_event(THD *thd, TABLE *table,
Log_event_type event_type) {
Rows_log_event *event = nullptr;
switch (event_type) {
case WRITE_ROWS_EVENT:
event = new Write_rows_log_event(thd, table, true, false);
break;
case UPDATE_ROWS_EVENT:
event = new Update_rows_log_event(thd, table, true);
break;
case DELETE_ROWS_EVENT:
event = new Delete_rows_log_event(thd, table, true);
break;
default:
ut_error; // 不支持的事件类型
}
LogDebug(BINLOG_LOG, "Created rows event type=%d for table %s.%s",
event_type, table->s->db.str, table->s->table_name.str);
return event;
}
/**
* 创建事务开始事件
* @param thd 线程句柄
* @return GTID事件对象
*/
static Gtid_log_event* create_transaction_begin_event(THD *thd) {
// 生成GTID
Gtid gtid;
generate_gtid_for_transaction(thd, >id);
// 创建GTID事件
Gtid_log_event *event = new Gtid_log_event(
thd->server_id, // 服务器ID
true, // 是否提交标志
gtid, // GTID
true, // 立即刷新
0 // 最后提交时间戳
);
LogDebug(BINLOG_LOG, "Created GTID event for transaction begin: %s",
gtid.to_string().c_str());
return event;
}
};
|