MySQL Server 源码剖析 - 复制与Binlog-API详解

一、模块概述

复制与 Binlog 层是 MySQL Server 的数据一致性和高可用性核心,负责记录数据变更、传输复制数据和维护主从同步。该层主要包含二进制日志(Binary Log)管理、复制协议处理、GTID(Global Transaction Identifier)管理和多种复制模式的实现。

二、核心架构图

flowchart TB
    subgraph "主服务器 (Source)"
        subgraph "SQL执行层"
            SQL_CMD[SQL命令执行]
            TXN_MGR[事务管理器]
        end
        
        subgraph "Binlog写入"
            BINLOG_MGR[MYSQL_BIN_LOG]
            EVENT_CACHE[事件缓存]
            GROUP_COMMIT[组提交机制]
        end
        
        subgraph "Binlog发送"
            DUMP_THREAD[Dump线程]
            BINLOG_SENDER[Binlog_sender]
            GTID_SET[GTID集合管理]
        end
    end
    
    subgraph "从服务器 (Replica)"
        subgraph "IO线程"
            IO_THREAD[IO Thread]
            RELAY_LOG[中继日志]
            MASTER_INFO[Master_info]
        end
        
        subgraph "SQL线程"
            SQL_THREAD[SQL Thread]
            APPLIER[事件应用器]
            RELAY_LOG_INFO[Relay_log_info]
        end
        
        subgraph "并行复制"
            COORDINATOR[协调器]
            WORKER[Worker线程]
            MTS[多线程复制]
        end
    end
    
    subgraph "存储引擎"
        INNODB[InnoDB]
        DISK[磁盘存储]
    end
    
    SQL_CMD --> TXN_MGR
    TXN_MGR --> BINLOG_MGR
    BINLOG_MGR --> EVENT_CACHE
    EVENT_CACHE --> GROUP_COMMIT
    GROUP_COMMIT --> DISK
    
    BINLOG_MGR --> DUMP_THREAD
    DUMP_THREAD --> BINLOG_SENDER
    BINLOG_SENDER --> GTID_SET
    
    BINLOG_SENDER -.->|网络传输| IO_THREAD
    IO_THREAD --> RELAY_LOG
    IO_THREAD --> MASTER_INFO
    
    RELAY_LOG --> SQL_THREAD
    SQL_THREAD --> APPLIER
    SQL_THREAD --> RELAY_LOG_INFO
    
    SQL_THREAD --> COORDINATOR
    COORDINATOR --> WORKER
    WORKER --> MTS
    
    APPLIER --> INNODB
    
    style BINLOG_MGR fill:#e8f5e9
    style DUMP_THREAD fill:#fff4e1
    style IO_THREAD fill:#f3e5f5
    style SQL_THREAD fill:#e1f5ff

三、二进制日志管理 API 详解

3.1 MYSQL_BIN_LOG 类

类结构概述

class MYSQL_BIN_LOG : public TC_LOG {
private:
  // 日志文件管理
  IO_CACHE log_file;                    // 当前日志文件缓存
  char *name;                           // 当前日志文件名
  std::atomic<my_off_t> binlog_end_pos; // 二进制日志结束位置
  
  // 索引文件管理
  IO_CACHE index_file;                  // 索引文件缓存
  char *index_file_name;                // 索引文件名
  
  // 并发控制
  mysql_mutex_t LOCK_log;               // 日志写入锁
  mysql_mutex_t LOCK_index;             // 索引文件锁
  mysql_mutex_t LOCK_binlog_end_pos;    // 结束位置保护锁
  mysql_cond_t update_cond;             // 更新条件变量
  
  // 组提交相关
  Stage_manager stage_manager;          // 阶段管理器
  mysql_mutex_t LOCK_commit;            // 提交锁
  mysql_cond_t done_commit;             // 提交完成条件变量
  
  // GTID 相关
  Checkable_rwlock *global_tsid_lock;   // 全局 TSID 读写锁
  Gtid_set *previous_gtids_logged;      // 之前记录的 GTID 集合
  
public:
  // 构造和析构
  MYSQL_BIN_LOG(uint *sync_period);
  ~MYSQL_BIN_LOG();
  
  // 日志管理
  bool open(const char *log_name, const char *new_name,
            ulong max_size, bool null_created_arg,
            bool need_lock_index, bool need_sid_lock,
            Format_description_log_event *extra_description_event);
  bool close(uint flags, bool need_lock_log, bool need_lock_index);
  
  // 事件写入
  bool write_event(Log_event *event_info);
  int write_cache(THD *thd, class binlog_cache_data *cache_data);
  
  // 组提交核心函数
  int ordered_commit(THD *thd, bool all, bool skip_commit = false);
  
  // 日志轮转
  bool rotate(bool force_rotate, bool *check_purge = nullptr);
  bool rotate_and_purge(bool force_rotate);
  
  // 清理和恢复
  int purge_logs(const char *to_log, bool included);
  int purge_logs_before_date(time_t purge_time);
  int recover(Log_info *linfo, const char *last_log_name, IO_CACHE *first_log,
              Format_description_log_event *fdle, bool do_xa_recovery);
  
  // 位置管理
  my_off_t get_binlog_end_pos() const {
    return binlog_end_pos.load(std::memory_order_acquire);
  }
  void set_binlog_end_pos(my_off_t pos) {
    binlog_end_pos.store(pos, std::memory_order_release);
  }
};

关键字段说明

字段 类型 功能说明
log_file IO_CACHE 当前活跃的二进制日志文件缓存
binlog_end_pos std::atomic<my_off_t> 二进制日志当前结束位置(原子操作)
stage_manager Stage_manager 管理组提交的多个阶段
global_tsid_lock Checkable_rwlock * 保护 GTID 相关数据结构
LOCK_log mysql_mutex_t 保护日志文件写入的互斥锁

3.2 事件写入核心函数

函数签名

bool MYSQL_BIN_LOG::write_event(Log_event *event_info);

功能说明

  • 将日志事件写入二进制日志
  • 处理事务缓存和语句缓存
  • 支持行级和语句级复制格式

核心代码

bool MYSQL_BIN_LOG::write_event(Log_event *event_info) {
  DBUG_TRACE;
  bool error = true;
  bool should_rotate = false;
  THD *thd = event_info->thd;
  
  // 1. 检查日志是否打开
  if (likely(is_open())) {
    // 检查过滤条件
    const char *local_db = event_info->get_db();
    if (!(thd->variables.option_bits & OPTION_BIN_LOG) ||
        (thd->lex->sql_command != SQLCOM_ROLLBACK_TO_SAVEPOINT &&
         thd->lex->sql_command != SQLCOM_SAVEPOINT &&
         (!event_info->is_no_filter_event() &&
          !binlog_filter->db_ok(local_db))))
      return false;
    
    // 2. 确定使用的缓存类型
    assert(event_info->is_using_trans_cache() ||
           event_info->is_using_stmt_cache());
    
    if (binlog_start_trans_and_stmt(thd, event_info)) return error;
    
    const bool is_trans_cache = event_info->is_using_trans_cache();
    binlog_cache_mngr *const cache_mngr = thd_get_cache_mngr(thd);
    binlog_cache_data *const cache_data =
        cache_mngr->get_binlog_cache_data(is_trans_cache);
    
    DBUG_PRINT("info", ("event type: %d", event_info->get_type_code()));
    
    // 3. 写入运行环境事件(非行格式时)
    if (!thd->is_current_stmt_binlog_format_row()) {
      if (thd->stmt_depends_on_first_successful_insert_id_in_prev_stmt) {
        Intvar_log_event e(thd, INSERT_ID_EVENT,
                          thd->first_successful_insert_id_in_prev_stmt_for_binlog,
                          event_info->get_cache_type(),
                          event_info->get_cache_type());
        if (cache_data->write_event(thd, &e)) return error;
      }
      
      if (thd->auto_inc_intervals_in_cur_stmt_for_binlog.nb_elements() > 0) {
        // 写入自增值事件
        for (auto &interval : thd->auto_inc_intervals_in_cur_stmt_for_binlog) {
          Intvar_log_event e(thd, INSERT_ID_EVENT, interval.minimum(),
                            event_info->get_cache_type(),
                            event_info->get_cache_type());
          if (cache_data->write_event(thd, &e)) return error;
        }
      }
      
      if (thd->rand_used) {
        Rand_log_event e(thd, thd->rand_saved_seed1, thd->rand_saved_seed2,
                        event_info->get_cache_type(),
                        event_info->get_cache_type());
        if (cache_data->write_event(thd, &e)) return error;
      }
      
      if (thd->user_var_events.elements) {
        // 写入用户变量事件
        for (uint i = 0; i < thd->user_var_events.elements; i++) {
          user_var_entry *entry = *dynamic_element(&thd->user_var_events, i,
                                                   user_var_entry**);
          User_var_log_event e(thd, entry->name.str, entry->name.length,
                              entry->value, entry->length, entry->type,
                              entry->collation.collation,
                              event_info->get_cache_type(),
                              event_info->get_cache_type());
          if (cache_data->write_event(thd, &e)) return error;
        }
      }
    }
    
    // 4. 写入主事件
    if (cache_data->write_event(thd, event_info)) return error;
    
    // 5. 检查是否需要轮转日志
    if (cache_data->get_byte_position() > max_binlog_cache_size) {
      my_error(ER_TRANS_CACHE_FULL, MYF(MY_WME));
      return error;
    }
    
    error = false;
  }
  
  return error;
}

3.3 组提交机制

函数签名

int MYSQL_BIN_LOG::ordered_commit(THD *thd, bool all, bool skip_commit);

功能说明

  • 实现高性能的组提交机制
  • 分为四个阶段:预提交、刷盘、同步、提交
  • 支持并行提交以提高吞吐量

核心代码

int MYSQL_BIN_LOG::ordered_commit(THD *thd, bool all, bool skip_commit) {
  DBUG_TRACE;
  int flush_error = 0, sync_error = 0, commit_error = 0;
  my_off_t total_bytes = 0;
  bool do_rotate = false;
  
  // 阶段 0:从服务器提交顺序控制
  if (Commit_order_manager::wait_for_its_turn(thd, all)) {
    return RESULT_ABORTED;
  }
  
  // 阶段 1:刷盘阶段
  if (change_stage(thd, Stage_manager::FLUSH_STAGE, thd, nullptr, &LOCK_log)) {
    DBUG_PRINT("return", ("Thread killed or stopped"));
    return finish_commit(thd);
  }
  
  THD *wait_queue = nullptr;
  flush_error = process_flush_stage_queue(&total_bytes, &wait_queue);
  
  my_off_t flush_end_pos = 0;
  if (!flush_error) {
    flush_end_pos = my_b_tell(&log_file);
    set_binlog_end_pos(flush_end_pos);
    process_after_flush_stage_queue(total_bytes, flush_end_pos, &wait_queue);
  }
  
  // 阶段 2:同步阶段
  if (change_stage(thd, Stage_manager::SYNC_STAGE, wait_queue, &LOCK_log, 
                   &LOCK_sync)) {
    return finish_commit(thd);
  }
  
  if (!flush_error && !sync_error) {
    sync_error = process_sync_stage_queue(&wait_queue, &do_rotate);
  }
  
  // 阶段 3:提交阶段
  if (change_stage(thd, Stage_manager::COMMIT_STAGE, wait_queue, &LOCK_sync,
                   &LOCK_commit)) {
    return finish_commit(thd);
  }
  
  if (!flush_error && !sync_error && !skip_commit) {
    commit_error = process_commit_stage_queue(thd, wait_queue);
  }
  
  // 完成提交并清理
  leave_mutex_before_commit_stage(&LOCK_commit);
  
  if (do_rotate) {
    // 如果需要轮转日志,在这里执行
    bool check_purge = false;
    rotate(true, &check_purge);
  }
  
  // 通知从服务器提交顺序管理器
  Commit_order_manager::finish_one(thd);
  
  return flush_error || sync_error || commit_error ? RESULT_INCONSISTENT : 0;
}

组提交各阶段详解

阶段 1:刷盘阶段 (FLUSH_STAGE)

int MYSQL_BIN_LOG::process_flush_stage_queue(my_off_t *total_bytes_var, 
                                             THD **out_queue_var) {
  mysql_mutex_assert_owner(&LOCK_log);
  my_off_t total_bytes = 0;
  int flush_error = 1;
  THD *first_seen = nullptr;
  
  // 1. 生成 GTID
  if (assign_gtids_to_flush_group(first_seen)) {
    return flush_error;
  }
  
  // 2. 将每个线程的缓存写入日志文件
  for (THD *head = first_seen; head; head = head->next_to_commit) {
    if (flush_thread_caches(head)) {
      return flush_error;
    }
    total_bytes += head->get_transaction()->m_pending_row_log_events_size;
  }
  
  // 3. 刷新存储引擎日志
  if (flush_error = ha_flush_logs()) {
    return flush_error;
  }
  
  // 4. 同步二进制日志到磁盘
  if (sync_binlog_file(false)) {
    return flush_error;
  }
  
  *total_bytes_var = total_bytes;
  *out_queue_var = first_seen;
  return 0;
}

阶段 2:同步阶段 (SYNC_STAGE)

int MYSQL_BIN_LOG::process_sync_stage_queue(THD **out_queue_var, 
                                            bool *do_rotate) {
  mysql_mutex_assert_owner(&LOCK_sync);
  
  if (get_sync_period() && 
      ++sync_counter >= get_sync_period()) {
    sync_counter = 0;
    
    // 同步二进制日志文件
    if (mysql_file_sync(log_file.file, MYF(MY_WME))) {
      return 1;
    }
    
    // 通知 dump 线程可以读取到当前位置
    update_binlog_end_pos();
  }
  
  // 检查是否需要轮转日志
  *do_rotate = (my_b_tell(&log_file) >= (my_off_t)max_size);
  
  return 0;
}

阶段 3:提交阶段 (COMMIT_STAGE)

int MYSQL_BIN_LOG::process_commit_stage_queue(THD *thd, THD *queue) {
  mysql_mutex_assert_owner(&LOCK_commit);
  
  // 如果配置为无序提交,让每个线程自己提交
  if (!binlog_order_commits) {
    mysql_mutex_unlock(&LOCK_commit);
    
    for (THD *head = queue; head; head = head->next_to_commit) {
      if (head->get_transaction()->m_flags.commit_low) {
        head->get_transaction()->m_flags.commit_low = false;
        ha_commit_low(head, true);
      }
    }
    return 0;
  }
  
  // 有序提交:leader 为所有线程执行提交
  for (THD *head = queue; head; head = head->next_to_commit) {
    DBUG_PRINT("debug", ("Thread ID: %u", head->thread_id()));
    
    // 1. 调用 after_sync 钩子
    RUN_HOOK(transaction, after_sync, (head, all));
    
    // 2. 更新依赖跟踪器
    update_max_committed(head);
    
    // 3. 调用存储引擎提交
    if (head->get_transaction()->m_flags.commit_low) {
      head->get_transaction()->m_flags.commit_low = false;
      if (ha_commit_low(head, true)) {
        return 1;
      }
    }
    
    // 4. 调用 after_commit 钩子
    RUN_HOOK(transaction, after_commit, (head, all));
    
    // 5. 更新 GTID
    gtid_state->update_on_commit(head);
    
    // 6. 减少准备事务计数
    dec_prep_xids(head);
  }
  
  return 0;
}

四、复制协议 API 详解

4.1 主服务器端 API

4.1.1 COM_BINLOG_DUMP 处理

函数签名

bool com_binlog_dump(THD *thd, char *packet, size_t packet_length);

功能说明

  • 处理从服务器的二进制日志请求
  • 解析请求参数并启动 dump 线程
  • 支持基于位置的复制

核心代码

bool com_binlog_dump(THD *thd, char *packet, size_t packet_length) {
  DBUG_TRACE;
  ulong pos;
  ushort flags = 0;
  uint32 slave_server_id;
  
  // 1. 检查权限
  if (check_global_access(thd, REPL_SLAVE_ACL)) {
    return true;
  }
  
  // 2. 解析数据包
  const uchar *packet_position = (const uchar *)packet;
  
  if (packet_length < BINLOG_DUMP_NON_GTID_HEADER_SIZE) {
    my_error(ER_MALFORMED_PACKET, MYF(0));
    return true;
  }
  
  pos = uint4korr(packet_position);
  packet_position += 4;
  
  flags = uint2korr(packet_position);
  packet_position += 2;
  
  slave_server_id = uint4korr(packet_position);
  packet_position += 4;
  
  // 3. 获取日志文件名
  char *log_ident = (char *)packet_position;
  size_t log_ident_len = packet_length - BINLOG_DUMP_NON_GTID_HEADER_SIZE;
  
  // 确保字符串以 null 结尾
  if (log_ident_len == 0 || log_ident[log_ident_len - 1] != 0) {
    my_error(ER_MALFORMED_PACKET, MYF(0));
    return true;
  }
  
  // 4. 设置从服务器 ID
  thd->server_id = slave_server_id;
  
  // 5. 启动二进制日志发送
  kill_zombie_dump_threads(thd);
  
  general_log_print(thd, COM_BINLOG_DUMP, 
                   "Log: '%s' Pos: %lu", log_ident, pos);
  
  mysql_binlog_send(thd, log_ident, (my_off_t)pos, nullptr, flags);
  
  return false;
}

4.1.2 COM_BINLOG_DUMP_GTID 处理

函数签名

bool com_binlog_dump_gtid(THD *thd, char *packet, size_t packet_length);

功能说明

  • 处理基于 GTID 的复制请求
  • 解析 GTID 集合并确定起始位置
  • 支持更高级的复制功能

核心代码

bool com_binlog_dump_gtid(THD *thd, char *packet, size_t packet_length) {
  DBUG_TRACE;
  
  // 1. 检查权限
  if (check_global_access(thd, REPL_SLAVE_ACL)) {
    return true;
  }
  
  // 2. 解析数据包头
  const uchar *packet_position = (const uchar *)packet;
  
  if (packet_length < BINLOG_DUMP_GTID_HEADER_SIZE) {
    my_error(ER_MALFORMED_PACKET, MYF(0));
    return true;
  }
  
  ushort flags = uint2korr(packet_position);
  packet_position += 2;
  
  uint32 slave_server_id = uint4korr(packet_position);
  packet_position += 4;
  
  uint32 log_name_info_size = uint4korr(packet_position);
  packet_position += 4;
  
  char *log_file_name = (char *)packet_position;
  packet_position += log_name_info_size;
  
  my_off_t pos = uint8korr(packet_position);
  packet_position += 8;
  
  uint32 data_size = uint4korr(packet_position);
  packet_position += 4;
  
  // 3. 解析 GTID 集合
  if (packet_length < BINLOG_DUMP_GTID_HEADER_SIZE + log_name_info_size + data_size) {
    my_error(ER_MALFORMED_PACKET, MYF(0));
    return true;
  }
  
  Gtid_set *slave_gtid_executed = nullptr;
  
  if (data_size > 0) {
    slave_gtid_executed = new Gtid_set(global_tsid_map);
    
    if (slave_gtid_executed->add_gtid_encoding(packet_position, data_size) != 
        RETURN_STATUS_OK) {
      delete slave_gtid_executed;
      my_error(ER_MALFORMED_GTID_SET_ENCODING, MYF(0));
      return true;
    }
  }
  
  // 4. 设置从服务器信息
  thd->server_id = slave_server_id;
  
  // 5. 启动 GTID 复制
  kill_zombie_dump_threads(thd);
  
  general_log_print(thd, COM_BINLOG_DUMP_GTID,
                   "Log: '%s' Pos: %llu GTID data_size: %u",
                   log_file_name, pos, data_size);
  
  mysql_binlog_send(thd, log_file_name, pos, slave_gtid_executed, flags);
  
  delete slave_gtid_executed;
  return false;
}

4.1.3 二进制日志发送核心函数

函数签名

void mysql_binlog_send(THD *thd, char *log_ident, my_off_t pos,
                       Gtid_set *gtid_set, uint32 flags);

功能说明

  • 二进制日志发送的核心实现
  • 支持基于位置和基于 GTID 的复制
  • 处理日志轮转和错误恢复

核心代码

void mysql_binlog_send(THD *thd, char *log_ident, my_off_t pos,
                       Gtid_set *gtid_set, uint32 flags) {
  DBUG_TRACE;
  LOG_INFO linfo;
  char *log_file_name = linfo.log_file_name;
  File file = -1;
  String *packet = &thd->packet;
  int error;
  const char *errmsg = "Unknown error";
  NET *net = &thd->net;
  
  Binlog_sender sender(thd, log_ident, pos, gtid_set, flags);
  
  // 1. 初始化发送器
  if (sender.init()) {
    goto err;
  }
  
  // 2. 检查二进制日志是否存在
  if (mysql_bin_log.is_open()) {
    if (mysql_bin_log.find_log_pos(&linfo, log_ident, 1)) {
      errmsg = "Could not find first log file name in binary log index file";
      goto err;
    }
  } else {
    errmsg = "Binary log is not open";
    goto err;
  }
  
  // 3. 注册从服务器信息
  if (register_slave_on_master(thd)) {
    goto err;
  }
  
  // 4. 开始发送循环
  while (!thd->killed) {
    bool is_active_binlog = false;
    
    // 检查是否为当前活跃的日志文件
    mysql_mutex_lock(mysql_bin_log.get_log_lock());
    is_active_binlog = mysql_bin_log.is_active(log_file_name);
    mysql_mutex_unlock(mysql_bin_log.get_log_lock());
    
    // 5. 打开日志文件
    if ((file = open_binlog_file(&linfo, errmsg)) < 0) {
      goto err;
    }
    
    // 6. 发送事件循环
    while (!thd->killed) {
      Log_event *ev = nullptr;
      
      // 读取事件
      error = Log_event::read_log_event(&log_file, packet, 
                                       sender.get_fdle());
      
      if (error) {
        if (error == LOG_READ_EOF) {
          if (is_active_binlog) {
            // 活跃日志文件,等待新事件
            if (sender.wait_new_events(pos)) {
              break;
            }
            continue;
          } else {
            // 非活跃日志文件,切换到下一个
            break;
          }
        } else {
          errmsg = "Error reading log event";
          goto err;
        }
      }
      
      // 7. 解析事件
      ev = Log_event::read_log_event((const char *)packet->ptr(),
                                    packet->length(),
                                    &errmsg, sender.get_fdle());
      
      if (!ev) {
        goto err;
      }
      
      // 8. 过滤 GTID 事件(如果需要)
      if (gtid_set && sender.skip_event(ev)) {
        delete ev;
        pos = uint8korr(packet->ptr() + EVENT_LEN_OFFSET) + 
              my_b_tell(&log_file);
        continue;
      }
      
      // 9. 发送事件到从服务器
      pos = ev->log_pos;
      
      if (sender.send_event(ev)) {
        delete ev;
        goto err;
      }
      
      delete ev;
      
      // 10. 检查心跳
      if (sender.check_heartbeat()) {
        break;
      }
    }
    
    // 11. 关闭当前文件,尝试下一个
    end_io_cache(&log_file);
    mysql_file_close(file, MYF(MY_WME));
    file = -1;
    
    if (mysql_bin_log.find_next_log(&linfo, 1)) {
      errmsg = "Could not find next log file in binary log index";
      goto err;
    }
    
    pos = BIN_LOG_HEADER_SIZE;  // 跳过新文件的头部
  }
  
err:
  // 清理资源
  if (file >= 0) {
    end_io_cache(&log_file);
    mysql_file_close(file, MYF(MY_WME));
  }
  
  thd_proc_info(thd, "Waiting to finalize termination");
  
  // 发送错误信息给从服务器
  my_message(my_errno(), errmsg, MYF(0));
}

4.2 从服务器端 API

4.2.1 Master_info 结构

数据结构

class Master_info : public Rpl_info {
private:
  // 连接信息
  char host[HOSTNAME_LENGTH + 1];         // 主服务器主机名
  char user[USERNAME_LENGTH + 1];         // 复制用户名
  char password[MAX_PASSWORD_LENGTH + 1]; // 复制密码
  uint port;                              // 主服务器端口
  uint connect_retry;                     // 连接重试间隔
  
  // 复制状态
  char master_log_name[FN_REFLEN];        // 主服务器日志文件名
  my_off_t master_log_pos;                // 主服务器日志位置
  
  // IO 线程管理
  pthread_t io_thread;                    // IO 线程句柄
  mysql_mutex_t run_lock;                 // 运行状态锁
  mysql_cond_t start_cond;                // 启动条件变量
  mysql_cond_t stop_cond;                 // 停止条件变量
  enum enum_thread_state io_thread_state; // IO 线程状态
  
  // 网络连接
  MYSQL *mysql;                           // 到主服务器的连接
  bool is_stopping;                       // 是否正在停止
  
  // GTID 相关
  Gtid_set *gtid_set;                     // 接收到的 GTID 集合
  mysql_mutex_t tsid_lock;                // TSID 保护锁
  
public:
  Master_info(const char *channel);
  ~Master_info();
  
  // 连接管理
  int connect_to_master();
  void disconnect_from_master();
  
  // IO 线程控制
  int start_io_thread();
  int stop_io_thread(long timeout);
  bool is_io_thread_running() const;
  
  // 状态管理
  int flush_info(bool force = false);
  int read_info(Rpl_info_handler *from);
  int write_info(Rpl_info_handler *to);
  
  // 位置管理
  void set_master_log_pos(my_off_t pos) { master_log_pos = pos; }
  my_off_t get_master_log_pos() const { return master_log_pos; }
  void set_master_log_name(const char *name) {
    strmake(master_log_name, name, sizeof(master_log_name) - 1);
  }
  const char *get_master_log_name() const { return master_log_name; }
};

4.2.2 IO 线程启动

函数签名

int Master_info::start_io_thread();

功能说明

  • 启动 IO 线程连接主服务器
  • 设置复制参数和状态
  • 创建线程并初始化连接

核心代码

int Master_info::start_io_thread() {
  DBUG_TRACE;
  int error = 0;
  
  mysql_mutex_lock(&run_lock);
  
  // 1. 检查线程是否已经运行
  if (io_thread_state != THREAD_NOT_CREATED) {
    mysql_mutex_unlock(&run_lock);
    return 1;  // 线程已经存在
  }
  
  // 2. 重置错误信息
  clear_error();
  
  // 3. 设置线程状态
  io_thread_state = THREAD_CREATED;
  
  // 4. 创建 IO 线程
  if (mysql_thread_create(key_thread_slave_io, 
                         &io_thread, &connection_attrib,
                         handle_slave_io, (void *)this)) {
    sql_print_error("Can't create slave IO thread");
    io_thread_state = THREAD_NOT_CREATED;
    error = 1;
    goto err;
  }
  
  // 5. 等待线程启动完成
  while (io_thread_state == THREAD_CREATED) {
    mysql_cond_wait(&start_cond, &run_lock);
  }
  
  if (io_thread_state != THREAD_RUNNING) {
    error = 1;
  }
  
err:
  mysql_mutex_unlock(&run_lock);
  return error;
}

4.2.3 IO 线程主函数

函数签名

extern "C" void *handle_slave_io(void *arg);

功能说明

  • IO 线程的主要执行函数
  • 连接主服务器并接收二进制日志
  • 写入中继日志供 SQL 线程读取

核心代码

extern "C" void *handle_slave_io(void *arg) {
  THD *thd;  // 线程上下文
  MYSQL *mysql;
  Master_info *mi = (Master_info *)arg;
  Relay_log_info *rli = mi->rli;
  char llbuff[22];
  
  // 1. 初始化线程环境
  my_thread_init();
  DBUG_TRACE;
  
  thd = new THD;
  thd->thread_stack = (char *)&thd;
  mysql_mutex_lock(&mi->run_lock);
  
  // 2. 设置线程状态
  mi->io_thread_state = THREAD_RUNNING;
  mysql_cond_broadcast(&mi->start_cond);
  mysql_mutex_unlock(&mi->run_lock);
  
  // 3. 连接到主服务器
  mysql = mysql_init(nullptr);
  if (!mysql) {
    sql_print_error("Slave I/O thread: error in mysql_init()");
    goto err;
  }
  
  // 设置连接参数
  mysql_options(mysql, MYSQL_OPT_CONNECT_TIMEOUT, 
               (char *)&slave_net_timeout);
  mysql_options(mysql, MYSQL_OPT_READ_TIMEOUT, 
               (char *)&slave_net_timeout);
  
  // 4. 建立连接
  if (!mysql_real_connect(mysql, mi->host, mi->user, mi->password,
                         0, mi->port, 0, 0)) {
    sql_print_error("Slave I/O thread couldn't connect to master '%s@%s:%d': %s",
                   mi->user, mi->host, mi->port, mysql_error(mysql));
    goto err;
  }
  
  // 5. 请求二进制日志
  if (request_dump(thd, mysql, mi)) {
    sql_print_error("Slave I/O thread: failed to request binlog dump");
    goto err;
  }
  
  // 6. 接收和处理事件循环  
  while (!io_slave_killed(thd, mi)) {
    ulong event_len;
    
    // 读取事件长度
    if (read_event_header_and_set_compression(mysql, &event_len)) {
      sql_print_error("Slave I/O thread: error reading event header");
      break;
    }
    
    // 分配事件缓冲区
    if (event_len > current_thd->variables.max_allowed_packet) {
      sql_print_error("Slave I/O thread: event too large (%lu bytes)", 
                     event_len);
      break;
    }
    
    uchar *event_buf = (uchar *)my_malloc(key_memory_binlog_event_buffer,
                                         event_len, MYF(MY_WME));
    if (!event_buf) {
      sql_print_error("Slave I/O thread: out of memory");
      break;
    }
    
    // 读取完整事件
    if (read_event(mysql, event_buf, event_len)) {
      my_free(event_buf);
      sql_print_error("Slave I/O thread: error reading event");
      break;
    }
    
    // 7. 写入中继日志
    if (queue_event(mi, (char *)event_buf, event_len)) {
      my_free(event_buf);
      sql_print_error("Slave I/O thread: error queuing event");
      break;
    }
    
    my_free(event_buf);
    
    // 8. 更新复制位置
    if (flush_master_info(mi, true)) {
      sql_print_error("Slave I/O thread: error flushing master info");
      break;
    }
  }
  
err:
  // 9. 清理和关闭
  if (mysql) {
    mysql_close(mysql);
  }
  
  mysql_mutex_lock(&mi->run_lock);
  mi->io_thread_state = THREAD_NOT_CREATED;
  mysql_cond_broadcast(&mi->stop_cond);
  mysql_mutex_unlock(&mi->run_lock);
  
  delete thd;
  my_thread_end();
  pthread_exit(0);
  return 0;
}

4.2.4 事件队列函数

函数签名

int queue_event(Master_info *mi, const char *buf, ulong event_len);

功能说明

  • 将从主服务器接收的事件写入中继日志
  • 处理日志轮转和空间管理
  • 更新复制位置信息

核心代码

int queue_event(Master_info *mi, const char *buf, ulong event_len) {
  int error = 0;
  ulong inc_pos;
  Relay_log_info *rli = mi->rli;
  mysql_mutex_t *log_lock = rli->relay_log.get_log_lock();
  
  // 1. 解析事件头
  uchar *event_buf = (uchar *)buf;
  uint event_type = event_buf[EVENT_TYPE_OFFSET];
  
  if (event_len < LOG_EVENT_MINIMAL_HEADER_LEN) {
    mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
              "Event too short");
    return 1;
  }
  
  // 2. 获取日志锁
  mysql_mutex_lock(&mi->data_lock);
  mysql_mutex_lock(log_lock);
  
  // 3. 检查中继日志空间
  if (rli->log_space_limit && 
      rli->log_space_total >= rli->log_space_limit) {
    // 空间不足,等待 SQL 线程清理
    mysql_mutex_unlock(log_lock);
    mysql_mutex_unlock(&mi->data_lock);
    
    sql_print_information("Slave I/O thread: relay log space limit exceeded, "
                         "waiting for SQL thread to free space");
    
    // 等待空间释放
    mysql_mutex_lock(&rli->log_space_lock);
    while (rli->log_space_total >= rli->log_space_limit && !mi->abort_slave) {
      mysql_cond_wait(&rli->log_space_cond, &rli->log_space_lock);
    }
    mysql_mutex_unlock(&rli->log_space_lock);
    
    if (mi->abort_slave) {
      return 1;
    }
    
    // 重新获取锁
    mysql_mutex_lock(&mi->data_lock);
    mysql_mutex_lock(log_lock);
  }
  
  // 4. 写入中继日志
  if (rli->relay_log.append_buffer(buf, event_len) != 0) {
    mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
              "Error writing relay log event");
    error = 1;
    goto err;
  }
  
  // 5. 更新日志空间统计
  rli->log_space_total += event_len;
  
  // 6. 检查是否需要轮转中继日志
  if (rli->relay_log.should_rotate()) {
    if (rli->relay_log.new_file_without_locking()) {
      mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
                "Error rotating relay log");
      error = 1;
      goto err;
    }
  }
  
  // 7. 更新主服务器位置信息
  if (event_type != mysql::binlog::event::HEARTBEAT_LOG_EVENT) {
    inc_pos = uint4korr(event_buf + LOG_POS_OFFSET);
    if (inc_pos > 0) { 
      mi->set_master_log_pos(inc_pos);
    }
    
    // 如果是 ROTATE 事件,更新日志文件名
    if (event_type == mysql::binlog::event::ROTATE_EVENT) {
      if (process_io_rotate(mi, event_buf)) {
        error = 1;
        goto err;
      }
    }
  }
  
  // 8. 刷新中继日志
  if (rli->relay_log.flush_and_sync()) {
    mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
              "Error flushing relay log");
    error = 1;
    goto err;
  }
  
err:
  mysql_mutex_unlock(log_lock);
  mysql_mutex_unlock(&mi->data_lock);
  
  return error;
}

4.3 SQL 线程和事件应用

4.3.1 Relay_log_info 结构

数据结构

class Relay_log_info : public Rpl_info {
private:
  // 中继日志信息
  char group_relay_log_name[FN_REFLEN];   // 当前组中继日志名
  my_off_t group_relay_log_pos;           // 当前组中继日志位置
  char event_relay_log_name[FN_REFLEN];   // 当前事件中继日志名  
  my_off_t event_relay_log_pos;           // 当前事件中继日志位置
  
  // 主服务器位置信息
  char group_master_log_name[FN_REFLEN];  // 组开始时的主日志名
  my_off_t group_master_log_pos;          // 组开始时的主日志位置
  
  // SQL 线程管理
  pthread_t sql_thread;                   // SQL 线程句柄
  mysql_mutex_t run_lock;                 // 运行状态锁
  mysql_cond_t start_cond;                // 启动条件变量
  mysql_cond_t stop_cond;                 // 停止条件变量  
  enum enum_thread_state sql_thread_state; // SQL 线程状态
  
  // 事件处理
  Log_event *cur_log_event;               // 当前处理的事件
  bool is_in_group;                       // 是否在事务组中
  
  // 多线程复制 (MTS)
  bool is_parallel_exec() const;          // 是否并行执行
  Slave_worker **workers;                 // Worker 线程数组
  Mts_submode *current_mts_submode;       // 当前 MTS 子模式
  
  // 日志空间管理
  mysql_mutex_t log_space_lock;           // 日志空间锁
  mysql_cond_t log_space_cond;            // 日志空间条件变量
  ulonglong log_space_total;              // 总日志空间
  ulonglong log_space_limit;              // 日志空间限制
  
public:
  Relay_log_info(const char *channel);
  ~Relay_log_info();
  
  // SQL 线程控制
  int start_sql_thread();
  int stop_sql_thread(long timeout);
  bool is_sql_thread_running() const;
  
  // 事件处理
  int exec_relay_log_event(THD *thd, Relay_log_info *rli);
  bool apply_event_and_update_pos(Log_event *ev, THD *thd);
  
  // 位置管理  
  void set_group_relay_log_pos(my_off_t pos) { group_relay_log_pos = pos; }
  my_off_t get_group_relay_log_pos() const { return group_relay_log_pos; }
  void set_group_master_log_pos(my_off_t pos) { group_master_log_pos = pos; }
  my_off_t get_group_master_log_pos() const { return group_master_log_pos; }
  
  // 并行复制
  int start_workers();
  int stop_workers();
  Slave_worker *get_worker(ulong i) const { return workers[i]; }
};

4.3.2 事件应用核心函数

函数签名

int Log_event::apply_event(Relay_log_info *rli);

功能说明

  • 日志事件应用的统一入口
  • 支持串行和并行执行模式
  • 处理事务边界和错误恢复

核心代码

int Log_event::apply_event(Relay_log_info *rli) {
  DBUG_TRACE;
  DBUG_PRINT("info", ("event_type=%s", get_type_str()));
  bool parallel = false;
  enum enum_mts_event_exec_mode actual_exec_mode = EVENT_EXEC_PARALLEL;
  THD *rli_thd = rli->info_thd;
  
  worker = rli;
  
  // 1. 检查是否为 MTS 恢复模式
  if (rli->is_mts_recovery()) {
    bool skip = bitmap_is_set(&rli->recovery_groups, rli->mts_recovery_index) &&
                (get_mts_execution_mode(rli->mts_group_status ==
                                       Relay_log_info::MTS_IN_GROUP) ==
                 EVENT_EXEC_PARALLEL);
    if (skip) {
      return 0;
    } else {
      int error = do_apply_event(rli);
      if (rli->is_processing_trx()) {
        // 识别 DDL 事务开始
        if (starts_group() &&
            get_type_code() == mysql::binlog::event::QUERY_EVENT) {
          rli->curr_group_seen_begin = true;
        }
        if (error == 0 &&
            (ends_group() ||
             (get_type_code() == mysql::binlog::event::QUERY_EVENT &&
              !rli->curr_group_seen_begin))) {
          rli->finished_processing();
          rli->curr_group_seen_begin = false;
        }
      }
      return error;
    }
  }
  
  // 2. 确定执行模式(串行或并行)
  if (!(parallel = rli->is_parallel_exec()) ||
      ((actual_exec_mode = get_mts_execution_mode(
            rli->mts_group_status == Relay_log_info::MTS_IN_GROUP)) !=
       EVENT_EXEC_PARALLEL)) {
    if (parallel) {
      // 并行模式下的串行事件
      Slave_worker *worker = rli->current_mts_submode->get_least_occupied_worker(
          rli, rli->workers, this);
      if (worker == nullptr) {
        return ER_MTS_CANT_PARALLEL;
      }
      
      // 将事件分配给 worker
      if (worker->assign_event(this)) {
        return ER_MTS_CANT_PARALLEL;
      }
      
      worker->slave_worker_exec_event(this);
      return 0;
    } else {
      // 3. 串行执行
      return do_apply_event(rli);
    }
  } else {
    // 4. 并行执行
    return slave_worker_exec_event(rli);
  }
}

事件具体应用函数

int Log_event::do_apply_event(Relay_log_info const *rli) {
  DBUG_TRACE;
  int error = 0;
  THD *thd = rli->info_thd;
  
  // 1. 设置事件上下文
  thd->server_id = server_id;
  thd->unmasked_server_id = common_header->unmasked_server_id;
  thd->set_time();
  thd->lex->set_current_query_block(nullptr);
  
  if (!common_header->when.tv_sec) {
    my_micro_time_to_timeval(my_micro_time(), &common_header->when);
  }
  
  thd->start_time = common_header->when;
  thd->query_start_usec_used = true;
  
  // 2. 根据事件类型执行相应操作
  switch (get_type_code()) {
    case mysql::binlog::event::QUERY_EVENT:
      error = ((Query_log_event *)this)->do_apply_event(rli, query, q_len);
      break;
      
    case mysql::binlog::event::WRITE_ROWS_EVENT:
    case mysql::binlog::event::UPDATE_ROWS_EVENT:
    case mysql::binlog::event::DELETE_ROWS_EVENT:
      error = ((Rows_log_event *)this)->do_apply_event(rli);
      break;
      
    case mysql::binlog::event::TABLE_MAP_EVENT:
      error = ((Table_map_log_event *)this)->do_apply_event(rli);
      break;
      
    case mysql::binlog::event::ROTATE_EVENT:
      error = ((Rotate_log_event *)this)->do_apply_event(rli);
      break;
      
    case mysql::binlog::event::GTID_LOG_EVENT:
      error = ((Gtid_log_event *)this)->do_apply_event(rli);
      break;
      
    default:
      DBUG_PRINT("info", ("default: do nothing for event type %d",
                         get_type_code()));
  }
  
  // 3. 错误处理
  if (error) {
    DBUG_PRINT("info", ("apply_event error = %d", error));
    
    if (thd->is_slave_error || thd->is_fatal_error) {
      /*
        The query was aborted because of slave error or fatal error.
        The error is already reported.
      */
    } else {
      /*
        Other errors, not reported yet, we need to report.
      */
      if (thd->is_error()) {
        char const *const errmsg = thd->get_stmt_da()->message_text();
        
        DBUG_PRINT("info", 
                  ("thd->get_stmt_da()->get_sql_errno()=%d; "
                   "rli->last_error.number=%d", 
                   thd->get_stmt_da()->mysql_errno(),
                   rli->last_error().number));
        
        if (strcmp(errmsg, "")) {
          rli->report(ERROR_LEVEL, thd->get_stmt_da()->mysql_errno(),
                     "Error '%s' on query. Default database: '%s'. "
                     "Query: '%s'",
                     errmsg,
                     thd->db().str ? thd->db().str : "",
                     thd->query().str);
        }
      }
    }
  }
  
  return error;
}

五、GTID 管理 API 详解

5.1 GTID 核心数据结构

Gtid 结构

struct Gtid {
  /// Transaction Source Identifier
  rpl_tsid tsid;
  /// Transaction number
  rpl_gno gno;
  
  // 构造函数
  Gtid() : gno(0) {}
  Gtid(const rpl_tsid &_tsid, rpl_gno _gno) : tsid(_tsid), gno(_gno) {}
  
  /// Set both components to 0.
  void clear() {
    tsid.clear();
    gno = 0;
  }
  
  /// Return true if this Gtid is empty (both components are 0).
  bool is_empty() const { return gno == 0 && tsid.is_empty(); }
  
  /// Set the GTID to the given TSID and GNO.
  void set(const rpl_tsid &_tsid, rpl_gno _gno) {
    tsid = _tsid;
    gno = _gno;
  }
  
  /// Compare two GTIDs
  int compare(const Gtid &other) const {
    int ret = tsid.compare(other.tsid);
    if (ret == 0) {
      ret = gno < other.gno ? -1 : gno > other.gno ? 1 : 0;
    }
    return ret;
  }
  
  /// Equality operator
  bool operator==(const Gtid &other) const {
    return tsid == other.tsid && gno == other.gno;
  }
};

Gtid_set 类

class Gtid_set {
private:
  /// Map from TSID to intervals
  std::map<rpl_tsid, Interval_list> m_intervals;
  
  /// TSID map to use for this Gtid_set
  Tsid_map *tsid_map;
  
  /// Mutex to protect concurrent access
  mutable mysql_mutex_t m_mutex;
  
public:
  Gtid_set(Tsid_map *_tsid_map);
  ~Gtid_set();
  
  /// Add a GTID to this set
  enum_return_status add_gtid(const Gtid &gtid);
  
  /// Remove a GTID from this set  
  enum_return_status remove_gtid(const Gtid &gtid);
  
  /// Check if a GTID is contained in this set
  bool contains_gtid(const Gtid &gtid) const;
  
  /// Add all GTIDs from another set
  enum_return_status add_gtid_set(const Gtid_set *other);
  
  /// Remove all GTIDs that exist in another set
  enum_return_status remove_gtid_set(const Gtid_set *other);
  
  /// Check if this set is a subset of another set
  bool is_subset(const Gtid_set *other) const;
  
  /// Check if this set intersects with another set
  bool is_intersection_nonempty(const Gtid_set *other) const;
  
  /// Get string representation
  int to_string(char **buf, bool need_lock = true) const;
  
  /// Parse string representation
  enum_return_status add_gtid_text(const char *text);
  
  /// Encode to binary format
  int get_encoded_length() const;
  void encode(uchar *buf) const;
  
  /// Decode from binary format
  enum_return_status add_gtid_encoding(const uchar *encoded, size_t length);
  
  /// Clear all GTIDs
  void clear();
  
  /// Check if set is empty
  bool is_empty() const;
  
  /// Get next GTID after the given one
  enum_return_status get_next_gtid(const Gtid &gtid, Gtid &next) const;
};

5.2 GTID 生成和分配

函数签名

enum_return_status Gtid_state::generate_automatic_gtid(THD *thd, 
                                                       rpl_tsid *specified_tsid,
                                                       rpl_gno *specified_gno);

功能说明

  • 为事务自动生成 GTID
  • 处理手动指定和自动分配两种模式
  • 确保 GTID 的全局唯一性

核心代码

enum_return_status Gtid_state::generate_automatic_gtid(THD *thd,
                                                       rpl_tsid *specified_tsid,
                                                       rpl_gno *specified_gno) {
  DBUG_TRACE;
  enum_return_status ret = RETURN_STATUS_OK;
  rpl_tsid tsid;
  rpl_gno gno;
  
  global_tsid_lock->assert_some_wrlock();
  
  // 1. 确定 TSID
  if (specified_tsid != nullptr) {
    // 手动指定 TSID
    tsid = *specified_tsid;
  } else {
    // 使用服务器 UUID 作为 TSID
    tsid = server_uuid_tsid;
  }
  
  // 2. 确定 GNO
  if (specified_gno != nullptr) {
    // 手动指定 GNO
    gno = *specified_gno;
    
    // 检查是否已经存在
    if (executed_gtids.contains_gtid(Gtid(tsid, gno))) {
      my_error(ER_GTID_ALREADY_EXECUTED, MYF(0),
               tsid.to_string().c_str(), gno);
      ret = RETURN_STATUS_REPORTED_ERROR;
      goto end;
    }
    
    // 检查是否在 owned_gtids 中
    if (owned_gtids.contains_gtid(Gtid(tsid, gno))) {
      my_error(ER_GTID_OWNED_BY_ANOTHER_CLIENT, MYF(0),
               tsid.to_string().c_str(), gno);
      ret = RETURN_STATUS_REPORTED_ERROR;
      goto end;
    }
  } else {
    // 3. 自动分配 GNO
    rpl_gno next_gno = executed_gtids.get_next_available_gno(tsid);
    if (next_gno == 0) {
      my_error(ER_GNO_EXHAUSTED, MYF(0));
      ret = RETURN_STATUS_REPORTED_ERROR;
      goto end;
    }
    gno = next_gno;
  }
  
  // 4. 将 GTID 添加到 owned_gtids
  Gtid gtid(tsid, gno);
  if (owned_gtids.add_gtid(gtid) != RETURN_STATUS_OK) {
    my_error(ER_OUT_OF_RESOURCES, MYF(0));
    ret = RETURN_STATUS_REPORTED_ERROR;
    goto end;
  }
  
  // 5. 设置线程的 GTID
  thd->owned_gtid = gtid;
  
  DBUG_PRINT("info", ("generated gtid %s:%lld", 
                     tsid.to_string().c_str(), gno));
  
end:
  return ret;
}

5.3 GTID 事务提交

函数签名

void Gtid_state::update_on_commit(THD *thd);

功能说明

  • 事务提交时更新 GTID 状态
  • 将 GTID 从 owned_gtids 移动到 executed_gtids
  • 更新 gtid_executed 系统变量

核心代码

void Gtid_state::update_on_commit(THD *thd) {
  DBUG_TRACE;
  
  global_tsid_lock->assert_some_wrlock();
  
  // 1. 获取线程拥有的 GTID
  const Gtid &gtid = thd->owned_gtid;
  
  if (gtid.is_empty()) {
    DBUG_PRINT("info", ("thread does not own any gtid"));
    return;
  }
  
  DBUG_PRINT("info", ("updating on commit for gtid %s:%lld",
                     gtid.tsid.to_string().c_str(), gtid.gno));
  
  // 2. 从 owned_gtids 中移除
  if (owned_gtids.remove_gtid(gtid) != RETURN_STATUS_OK) {
    DBUG_ASSERT(0);  // 这不应该发生
  }
  
  // 3. 添加到 executed_gtids
  if (executed_gtids.add_gtid(gtid) != RETURN_STATUS_OK) {
    /*
      This can only happen if we run out of memory, which is not
      expected at this point.
    */
    DBUG_ASSERT(0);
  }
  
  // 4. 更新 gtid_executed 表(如果启用)
  if (opt_bin_log && gtid_table_persistor->save(thd, &gtid)) {
    /*
      If we fail to save the GTID to the table, we should still
      continue, as the GTID is in the binary log.
    */
    sql_print_error("Failed to save GTID %s:%lld to gtid_executed table",
                   gtid.tsid.to_string().c_str(), gtid.gno);
  }
  
  // 5. 清空线程的 owned_gtid
  thd->owned_gtid.clear();
  
  // 6. 更新统计信息
  if (gtid.tsid == server_uuid_tsid) {
    gtid_executed_compression_period_counter++;
    if (gtid_executed_compression_period_counter >= 
        gtid_executed_compression_period) {
      // 触发 GTID 压缩
      compress_gtid_table();
      gtid_executed_compression_period_counter = 0;
    }
  }
}

六、复制时序图

6.1 主从复制建立时序图

sequenceDiagram
    autonumber
    participant REPLICA as 从服务器
    participant MASTER as 主服务器
    participant BINLOG as 二进制日志
    participant RELAY as 中继日志
    
    Note over REPLICA, RELAY: 复制初始化阶段
    REPLICA->>REPLICA: START SLAVE 命令
    REPLICA->>REPLICA: 创建 Master_info
    REPLICA->>REPLICA: 启动 IO 线程
    
    Note over REPLICA, RELAY: IO 线程连接阶段
    REPLICA->>MASTER: TCP 连接 (3306)
    MASTER-->>REPLICA: 连接确认
    REPLICA->>MASTER: 用户认证
    MASTER-->>REPLICA: 认证成功
    
    Note over REPLICA, RELAY: 注册从服务器
    REPLICA->>MASTER: COM_REGISTER_SLAVE
    MASTER->>MASTER: 注册从服务器信息
    MASTER-->>REPLICA: 注册成功
    
    Note over REPLICA, RELAY: 请求二进制日志
    alt 基于位置的复制
        REPLICA->>MASTER: COM_BINLOG_DUMP(file, pos)
    else 基于 GTID 的复制  
        REPLICA->>MASTER: COM_BINLOG_DUMP_GTID(gtid_set)
    end
    
    Note over REPLICA, RELAY: Dump 线程启动
    MASTER->>MASTER: 启动 Dump 线程
    MASTER->>BINLOG: 读取二进制日志
    
    Note over REPLICA, RELAY: 事件传输循环
    loop 持续复制
        BINLOG->>MASTER: 读取日志事件
        MASTER->>REPLICA: 发送事件 (网络包)
        REPLICA->>RELAY: 写入中继日志
        REPLICA->>REPLICA: 更新 Master_info
        
        alt 心跳检测
            MASTER->>REPLICA: HEARTBEAT_EVENT
            REPLICA-->>MASTER: 确认接收
        end
        
        alt 日志轮转
            MASTER->>REPLICA: ROTATE_EVENT
            REPLICA->>REPLICA: 更新日志文件名
        end
    end
    
    Note over REPLICA, RELAY: SQL 线程处理
    REPLICA->>REPLICA: 启动 SQL 线程
    
    loop 事件应用
        REPLICA->>RELAY: 读取中继日志事件
        REPLICA->>REPLICA: 解析事件
        REPLICA->>REPLICA: 应用事件到本地
        REPLICA->>REPLICA: 更新 Relay_log_info
        
        alt 事务事件
            REPLICA->>REPLICA: BEGIN
            REPLICA->>REPLICA: DML 操作
            REPLICA->>REPLICA: COMMIT
        end
        
        alt DDL 事件
            REPLICA->>REPLICA: ALTER TABLE 等
        end
        
        alt GTID 事件
            REPLICA->>REPLICA: 更新 executed_gtids
        end
    end

6.2 组提交详细时序图

sequenceDiagram
    autonumber
    participant THD1 as 事务线程1
    participant THD2 as 事务线程2  
    participant THD3 as 事务线程3
    participant STAGE_MGR as 阶段管理器
    participant BINLOG as 二进制日志
    participant ENGINE as 存储引擎
    participant DISK as 磁盘
    
    Note over THD1, DISK: 阶段0:预提交排序
    THD1->>STAGE_MGR: 进入 FLUSH_STAGE 队列
    THD2->>STAGE_MGR: 进入 FLUSH_STAGE 队列  
    THD3->>STAGE_MGR: 进入 FLUSH_STAGE 队列
    
    Note over THD1, DISK: 阶段1:刷盘阶段 (Leader: THD1)
    STAGE_MGR->>THD1: 指定为 Leader
    THD1->>THD1: 为所有事务生成 GTID
    THD1->>BINLOG: 将 THD1 缓存写入日志
    THD1->>BINLOG: 将 THD2 缓存写入日志
    THD1->>BINLOG: 将 THD3 缓存写入日志
    THD1->>ENGINE: ha_flush_logs() 刷新引擎日志
    THD1->>BINLOG: 设置 binlog_end_pos
    
    Note over THD1, DISK: 阶段2:同步阶段 (Leader: THD1)
    THD1->>STAGE_MGR: 进入 SYNC_STAGE
    THD2->>STAGE_MGR: 等待 SYNC_STAGE
    THD3->>STAGE_MGR: 等待 SYNC_STAGE
    
    alt sync_binlog > 0
        THD1->>DISK: fsync() 同步二进制日志
        DISK-->>THD1: 同步完成
    end
    
    THD1->>THD1: 通知 Dump 线程可读取位置
    
    Note over THD1, DISK: 阶段3:提交阶段
    THD1->>STAGE_MGR: 进入 COMMIT_STAGE
    
    alt binlog_order_commits = ON
        Note over THD1, DISK: 有序提交 (Leader 执行)
        THD1->>ENGINE: ha_commit_low(THD1)
        THD1->>ENGINE: ha_commit_low(THD2)  
        THD1->>ENGINE: ha_commit_low(THD3)
        THD1->>THD1: 更新 GTID 状态
        THD1->>THD2: 唤醒等待线程
        THD1->>THD3: 唤醒等待线程
    else binlog_order_commits = OFF
        Note over THD1, DISK: 无序提交 (各自执行)
        par 并行提交
            THD1->>ENGINE: ha_commit_low(THD1)
        and
            THD2->>ENGINE: ha_commit_low(THD2)
        and  
            THD3->>ENGINE: ha_commit_low(THD3)
        end
    end
    
    Note over THD1, DISK: 完成提交
    THD1->>THD1: 减少准备事务计数
    THD2->>THD2: 减少准备事务计数
    THD3->>THD3: 减少准备事务计数
    
    alt 需要日志轮转
        THD1->>BINLOG: rotate() 轮转日志文件
        BINLOG->>DISK: 创建新的日志文件
    end

七、性能优化和监控

7.1 复制性能优化

二进制日志优化

# 组提交优化
binlog_group_commit_sync_delay = 0      # 组提交延迟 (微秒)
binlog_group_commit_sync_no_delay_count = 0  # 无延迟的事务数

# 同步策略
sync_binlog = 1                         # 每次提交同步 (安全)
sync_binlog = 0                         # 依赖操作系统 (性能)

# 缓存大小
binlog_cache_size = 32K                 # 事务缓存大小
binlog_stmt_cache_size = 32K            # 语句缓存大小
max_binlog_cache_size = 2G              # 最大事务缓存
max_binlog_stmt_cache_size = 2G         # 最大语句缓存

# 日志格式
binlog_format = ROW                     # 行格式 (推荐)
binlog_row_image = FULL                 # 完整行图像

复制优化配置

# 并行复制
replica_parallel_type = LOGICAL_CLOCK   # 逻辑时钟并行
replica_parallel_workers = 8            # Worker 线程数
replica_preserve_commit_order = ON      # 保持提交顺序

# 网络优化  
replica_net_timeout = 60                # 网络超时
replica_compressed_protocol = ON        # 启用压缩协议

# 中继日志
max_relay_log_size = 1G                 # 中继日志大小
relay_log_purge = ON                    # 自动清理中继日志
relay_log_space_limit = 10G             # 中继日志空间限制

# GTID 优化
gtid_executed_compression_period = 1000 # GTID 压缩周期

7.2 复制监控

复制状态监控

-- 查看主服务器状态
SHOW MASTER STATUS;

-- 查看从服务器状态  
SHOW REPLICA STATUS\G

-- 查看 GTID 执行状态
SELECT @@GLOBAL.gtid_executed;
SELECT @@GLOBAL.gtid_purged;

-- 查看复制延迟
SELECT 
    CHANNEL_NAME,
    SERVICE_STATE,
    LAST_ERROR_MESSAGE,
    LAST_ERROR_TIMESTAMP
FROM performance_schema.replication_connection_status;

SELECT
    CHANNEL_NAME, 
    SERVICE_STATE,
    COUNT_TRANSACTIONS_RETRIES,
    LAST_APPLIED_TRANSACTION,
    APPLYING_TRANSACTION  
FROM performance_schema.replication_applier_status_by_coordinator;

性能监控

-- 二进制日志统计
SHOW STATUS LIKE 'Binlog%';

-- 复制事件统计
SELECT * FROM performance_schema.events_transactions_summary_global_by_event_name 
WHERE EVENT_NAME LIKE '%replica%';

-- GTID 等待统计
SELECT * FROM performance_schema.events_waits_summary_global_by_event_name
WHERE EVENT_NAME LIKE '%gtid%';

八、总结

复制与 Binlog 层通过精心设计的 API 架构实现了高性能、高可靠的数据复制:

二进制日志管理

  • MYSQL_BIN_LOG 类提供完整的日志文件管理
  • 组提交机制显著提高并发事务的吞吐量
  • 支持多种同步策略和缓存优化

复制协议处理

  • 支持基于位置和基于 GTID 的两种复制模式
  • mysql_binlog_send 函数实现高效的事件传输
  • 完整的错误处理和故障恢复机制

GTID 管理系统

  • 全局唯一的事务标识符确保数据一致性
  • 支持自动生成和手动指定两种模式
  • 高效的集合操作和持久化存储

多线程复制

  • 支持基于库、表、事务的并行应用策略
  • Slave_worker 机制提高复制性能
  • 完善的依赖关系管理和提交顺序控制

该架构通过分层设计和模块化实现,为 MySQL 提供了企业级的数据复制解决方案,支持各种高可用和灾备场景。