PostgreSQL-05-Transaction-事务管理

模块概览

Transaction 模块实现 PostgreSQL 的 ACID 事务特性,核心是 MVCC(多版本并发控制)机制,通过事务 ID、快照和可见性规则实现高并发的事务隔离。

核心能力

  1. MVCC(多版本并发控制):读不阻塞写、写不阻塞读
  2. 事务 ID 管理:32 位事务 ID 分配与回卷处理
  3. 快照管理:事务可见性快照的创建与维护
  4. CLOG(事务状态日志):记录事务提交/回滚状态
  5. 子事务:SAVEPOINT 支持部分回滚
  6. 两阶段提交:分布式事务支持

架构图

flowchart TB
    subgraph Transaction["Transaction Manager"]
        TxnMgr[Transaction Manager] --> XID[XID Manager<br/>事务ID分配]
        TxnMgr --> Snapshot[Snapshot Manager<br/>快照管理]
        TxnMgr --> CLOG[CLOG<br/>事务状态日志]
        TxnMgr --> SubXact[SubTransaction<br/>子事务管理]
        
        XID --> NextXID[NextTransactionId<br/>下一个可用XID]
        XID --> OldestXID[OldestXmin<br/>最老活跃事务]
        
        Snapshot --> SnapshotData[SnapshotData<br/>xmin/xmax/xip]
        
        CLOG --> SLRU[SLRU<br/>简单LRU缓存]
        SLRU --> CLOGFile[(CLOG文件<br/>pg_xact/)]
        
        SubXact --> SubXIDCache[SubXID Cache<br/>子事务缓存]
    end
    
    subgraph MVCC["MVCC 可见性"]
        Tuple[Tuple Header<br/>元组头] --> TXmin[t_xmin<br/>创建事务ID]
        Tuple --> TXmax[t_xmax<br/>删除事务ID]
        Tuple --> Infomask[t_infomask<br/>标志位]
        
        Visibility[HeapTupleSatisfies<br/>可见性检查]
    end
    
    Snapshot --> Visibility
    CLOG --> Visibility
    Tuple --> Visibility

核心数据结构

1. TransactionId(事务 ID)

/* src/include/access/transam.h */
typedef uint32 TransactionId;

#define InvalidTransactionId        0
#define BootstrapTransactionId      1
#define FrozenTransactionId         2
#define FirstNormalTransactionId    3

特殊事务 ID

事务 ID 说明
InvalidTransactionId 0 无效事务
BootstrapTransactionId 1 初始化事务
FrozenTransactionId 2 冻结事务(永远可见)
FirstNormalTransactionId 3 第一个正常事务

事务 ID 回卷问题

  • 事务 ID 为 32 位无符号整数,范围:0 ~ 4,294,967,295(约 42 亿)
  • 用尽后回卷到 3(FirstNormalTransactionId)
  • VACUUM 通过"冻结"旧元组(t_xmin = FrozenTransactionId)避免回卷问题

2. SnapshotData(快照)

/* src/include/utils/snapshot.h */
typedef struct SnapshotData
{
    SnapshotType snapshot_type;     /* 快照类型 */
    
    TransactionId xmin;             /* 最小活跃事务ID */
    TransactionId xmax;             /* 下一个未分配的事务ID */
    TransactionId *xip;             /* 活跃事务ID数组 */
    uint32      xcnt;               /* 活跃事务数量 */
    
    TransactionId *subxip;          /* 活跃子事务ID数组 */
    int32       subxcnt;            /* 活跃子事务数量 */
    
    bool        suboverflowed;      /* 子事务是否溢出 */
    
    bool        takenDuringRecovery; /* 是否在恢复期间获取 */
    
    CommandId   curcid;             /* 当前命令ID(语句级可见性) */
    
    /* ... 更多字段 ... */
} SnapshotData;

快照类型

typedef enum SnapshotType
{
    SNAPSHOT_MVCC = 0,          /* 正常MVCC快照 */
    SNAPSHOT_SELF,              /* 当前事务可见 */
    SNAPSHOT_ANY,               /* 所有元组可见(VACUUM用) */
    SNAPSHOT_TOAST,             /* TOAST表快照 */
    SNAPSHOT_DIRTY,             /* 脏读快照 */
    SNAPSHOT_HISTORIC_MVCC,     /* 历史MVCC快照 */
    SNAPSHOT_NON_VACUUMABLE     /* 不可VACUUM快照 */
} SnapshotType;

3. HeapTupleHeaderData(元组头)

/* src/include/access/htup_details.h */
typedef struct HeapTupleHeaderData
{
    union
    {
        HeapTupleFields t_heap;
        DatumTupleFields t_datum;
    } t_choice;
    
    ItemPointerData t_ctid;     /* 当前或新元组的TID */
    
    uint16      t_infomask2;    /* 属性数量和标志位 */
    uint16      t_infomask;     /* 各种标志位 */
    
    uint8       t_hoff;         /* 元组头大小 */
    
    /* 元组数据跟在后面 */
    bits8       t_bits[FLEXIBLE_ARRAY_MEMBER];
} HeapTupleHeaderData;

typedef struct HeapTupleFields
{
    TransactionId t_xmin;       /* 插入事务ID */
    TransactionId t_xmax;       /* 删除事务ID */
    
    union
    {
        CommandId   t_cid;      /* 插入和删除命令ID */
        TransactionId t_xvac;   /* VACUUM事务ID */
    } t_field3;
} HeapTupleFields;

t_infomask 标志位

#define HEAP_HASNULL            0x0001  /* 有NULL属性 */
#define HEAP_HASVARWIDTH        0x0002  /* 有变长属性 */
#define HEAP_HASEXTERNAL        0x0004  /* 有TOAST外部存储 */
#define HEAP_HASOID_OLD         0x0008  /* 有OID(已废弃) */
#define HEAP_XMAX_KEYSHR_LOCK   0x0010  /* xmax是共享键锁 */
#define HEAP_COMBOCID           0x0020  /* t_cid是组合CID */
#define HEAP_XMAX_EXCL_LOCK     0x0040  /* xmax是排他锁 */
#define HEAP_XMAX_LOCK_ONLY     0x0080  /* xmax仅是锁,非删除 */

#define HEAP_XMIN_COMMITTED     0x0100  /* xmin已提交 */
#define HEAP_XMIN_INVALID       0x0200  /* xmin已中止 */
#define HEAP_XMAX_COMMITTED     0x0400  /* xmax已提交 */
#define HEAP_XMAX_INVALID       0x0800  /* xmax已中止 */
#define HEAP_XMAX_IS_MULTI      0x1000  /* xmax是MultiXact */
#define HEAP_UPDATED            0x2000  /* 元组已更新 */
#define HEAP_MOVED_OFF          0x4000  /* 元组已移走 */
#define HEAP_MOVED_IN           0x8000  /* 元组已移入 */

核心功能实现

功能 1:事务开始与提交

StartTransaction() - 开始事务

/* src/backend/access/transam/xact.c */
static void
StartTransaction(void)
{
    TransactionState s = CurrentTransactionState;
    
    /* 1. 设置事务状态 */
    s->state = TRANS_START;
    s->transactionId = InvalidTransactionId;  /* 延迟分配 */
    
    /* 2. 初始化资源管理 */
    AtStart_Memory();
    AtStart_ResourceOwner();
    
    /* 3. 设置事务快照 */
    /* (延迟到第一个查询时获取) */
    
    /* 4. 切换到 TRANS_INPROGRESS 状态 */
    s->state = TRANS_INPROGRESS;
}

GetTransactionId() - 分配事务 ID

TransactionId
GetCurrentTransactionId(void)
{
    TransactionState s = CurrentTransactionState;
    
    if (!FullTransactionIdIsValid(s->fullTransactionId))
    {
        /* 分配新事务ID */
        s->fullTransactionId = GetNewTransactionId(false);
        
        /* 记录到共享内存的ProcArray */
        ProcArrayAdd(MyProc);
    }
    
    return XidFromFullTransactionId(s->fullTransactionId);
}

FullTransactionId
GetNewTransactionId(bool isSubXact)
{
    FullTransactionId full_xid;
    TransactionId xid;
    
    /* 1. 获取下一个事务ID(原子操作) */
    LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
    
    full_xid = ShmemVariableCache->nextXid;
    xid = XidFromFullTransactionId(full_xid);
    
    /* 2. 递增nextXid */
    TransactionIdAdvance(ShmemVariableCache->nextXid);
    
    /* 3. 检查是否接近回卷点 */
    if (TransactionIdFollowsOrEquals(xid,
            ShmemVariableCache->xidVacLimit))
    {
        /* 警告:需要VACUUM防止事务ID回卷 */
        ereport(WARNING,
                (errmsg("transaction ID wrap limit approached")));
    }
    
    LWLockRelease(XidGenLock);
    
    return full_xid;
}

CommitTransaction() - 提交事务

static void
CommitTransaction(void)
{
    TransactionState s = CurrentTransactionState;
    TransactionId latestXid;
    
    /* 1. 预提交处理 */
    CallXactCallbacks(XACT_EVENT_PRE_COMMIT);
    
    /* 2. 写入CLOG(标记事务已提交) */
    latestXid = RecordTransactionCommit();
    
    /* 3. 刷新WAL到磁盘 */
    XLogFlush(XactLastRecEnd);
    
    /* 4. 后提交处理 */
    CallXactCallbacks(XACT_EVENT_COMMIT);
    
    /* 5. 释放锁 */
    ResourceOwnerRelease(TopTransactionResourceOwner,
                         RESOURCE_RELEASE_LOCKS,
                         true, true);
    
    /* 6. 从ProcArray移除 */
    ProcArrayEndTransaction(MyProc, latestXid);
    
    /* 7. 清理状态 */
    s->state = TRANS_DEFAULT;
    s->fullTransactionId = InvalidFullTransactionId;
}

RecordTransactionCommit() - 记录提交

static TransactionId
RecordTransactionCommit(void)
{
    TransactionId xid = GetTopTransactionIdIfAny();
    bool markXidCommitted = TransactionIdIsValid(xid);
    
    if (markXidCommitted)
    {
        /* 1. 写入WAL提交记录 */
        XLogRecPtr recptr;
        
        xl_xact_commit xlrec;
        xlrec.xact_time = GetCurrentTimestamp();
        xlrec.xinfo = 0;
        /* ... 填充更多字段 ... */
        
        XLogBeginInsert();
        XLogRegisterData((char *) (&xlrec), MinSizeOfXactCommit);
        recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT);
        
        /* 2. 设置CLOG状态为COMMITTED */
        TransactionIdCommitTree(xid, nchildren, children);
        
        /* 3. 更新pg_xact文件 */
        /* (由CLOG模块异步刷盘) */
    }
    
    return xid;
}

功能 2:CLOG(事务状态日志)

TransactionIdCommitTree() - 提交事务树

/* src/backend/access/transam/clog.c */
void
TransactionIdCommitTree(TransactionId xid, int nxids, TransactionId *xids)
{
    /* 1. 设置主事务状态 */
    TransactionIdSetStatusBit(xid, TRANSACTION_STATUS_COMMITTED, 
                             XactCompletionRelcacheInitFileInval());
    
    /* 2. 设置所有子事务状态 */
    for (int i = 0; i < nxids; i++)
    {
        TransactionIdSetStatusBit(xids[i], TRANSACTION_STATUS_COMMITTED,
                                 XactCompletionRelcacheInitFileInval());
    }
}

static void
TransactionIdSetStatusBit(TransactionId xid, XidStatus status, bool lsn)
{
    int pageno = TransactionIdToPage(xid);
    int byteno = TransactionIdToByte(xid);
    int bshift = TransactionIdToBIndex(xid) * CLOG_BITS_PER_XACT;
    int slotno;
    char *byteptr;
    char byteval;
    
    /* 1. 获取CLOG页面(使用SLRU缓存) */
    slotno = SimpleLruReadPage(ClogCtl, pageno, true, xid);
    byteptr = ClogCtl->shared->page_buffer[slotno] + byteno;
    
    /* 2. 设置状态位(2位:00=进行中,01=已提交,10=已中止,11=子事务) */
    byteval = *byteptr;
    byteval &= ~(0x03 << bshift);  /* 清除旧状态 */
    byteval |= (status << bshift);  /* 设置新状态 */
    *byteptr = byteval;
    
    /* 3. 标记页面为脏 */
    ClogCtl->shared->page_dirty[slotno] = true;
    
    /* 4. 释放页面 */
    LWLockRelease(SimpleLruGetBankLock(ClogCtl, pageno));
}

CLOG 文件结构

pg_xact/
  ├── 0000  # 事务0-8191的状态(8KB,每事务2位)
  ├── 0001  # 事务8192-16383的状态
  ├── 0002
  └── ...

每个文件 8KB,存储 8192 个事务的状态(2 位/事务 × 8192 = 16384 位 = 2048 字节,实际文件大小 8KB 包含额外元数据)。

功能 3:快照管理

GetTransactionSnapshot() - 获取快照

/* src/backend/utils/time/snapmgr.c */
Snapshot
GetTransactionSnapshot(void)
{
    if (!FirstSnapshotSet)
    {
        /* 1. 创建新快照 */
        CurrentSnapshot = GetSnapshotData(&CurrentSnapshotData);
        FirstSnapshotSet = true;
        return CurrentSnapshot;
    }
    
    return CurrentSnapshot;
}

Snapshot
GetSnapshotData(Snapshot snapshot)
{
    ProcArrayStruct *arrayP = procArray;
    TransactionId xmin;
    TransactionId xmax;
    int count = 0;
    
    /* 1. 获取ProcArray共享锁 */
    LWLockAcquire(ProcArrayLock, LW_SHARED);
    
    /* 2. 获取最新的nextXid */
    xmax = ShmemVariableCache->latestCompletedXid;
    TransactionIdAdvance(xmax);
    
    /* 3. 扫描所有活跃事务 */
    xmin = xmax;
    for (int i = 0; i < arrayP->numProcs; i++)
    {
        PGXACT *pgxact = &allPgXact[arrayP->pgprocnos[i]];
        TransactionId xid = pgxact->xid;
        
        if (TransactionIdIsNormal(xid))
        {
            if (TransactionIdPrecedes(xid, xmin))
                xmin = xid;  /* 更新最小活跃XID */
            
            snapshot->xip[count++] = xid;  /* 记录活跃XID */
        }
        
        /* 处理子事务 */
        /* ... 省略子事务处理代码 ... */
    }
    
    LWLockRelease(ProcArrayLock);
    
    /* 4. 填充快照 */
    snapshot->xmin = xmin;
    snapshot->xmax = xmax;
    snapshot->xcnt = count;
    snapshot->curcid = GetCurrentCommandId(false);
    
    return snapshot;
}

功能 4:MVCC 可见性判断

HeapTupleSatisfiesMVCC() - 检查元组可见性

/* src/backend/access/heap/heapam_visibility.c */
bool
HeapTupleSatisfiesMVCC(HeapTuple htup, Snapshot snapshot,
                       Buffer buffer)
{
    HeapTupleHeader tuple = htup->t_data;
    
    /* 1. 检查t_xmin(创建事务) */
    if (!HeapTupleHeaderXminCommitted(tuple))
    {
        if (HeapTupleHeaderXminInvalid(tuple))
            return false;  /* 创建事务已中止 */
        
        if (TransactionIdIsCurrentTransactionId(HeapTupleHeaderGetRawXmin(tuple)))
        {
            /* 当前事务创建的元组 */
            if (HeapTupleHeaderGetCmin(tuple) >= snapshot->curcid)
                return false;  /* 当前命令之后创建,不可见 */
            
            /* 检查是否被当前事务删除 */
            if (tuple->t_infomask & HEAP_XMAX_INVALID)
                return true;  /* 未被删除,可见 */
            
            /* ... 检查t_xmax ... */
        }
        else if (XidInMVCCSnapshot(HeapTupleHeaderGetRawXmin(tuple), snapshot))
            return false;  /* 创建事务在快照中,未提交 */
        else if (TransactionIdDidCommit(HeapTupleHeaderGetRawXmin(tuple)))
            SetHintBits(tuple, buffer, HEAP_XMIN_COMMITTED,
                        HeapTupleHeaderGetRawXmin(tuple));
        else
        {
            SetHintBits(tuple, buffer, HEAP_XMIN_INVALID,
                        InvalidTransactionId);
            return false;  /* 创建事务已中止 */
        }
    }
    
    /* 2. 检查t_xmax(删除事务) */
    if (tuple->t_infomask & HEAP_XMAX_INVALID)
        return true;  /* 未被删除,可见 */
    
    if (HEAP_XMAX_IS_LOCKED_ONLY(tuple->t_infomask))
        return true;  /* 仅锁定,非删除 */
    
    if (!HeapTupleHeaderXmaxCommitted(tuple))
    {
        if (TransactionIdIsCurrentTransactionId(HeapTupleHeaderGetRawXmax(tuple)))
        {
            /* 当前事务删除的元组 */
            if (HeapTupleHeaderGetCmax(tuple) >= snapshot->curcid)
                return true;  /* 当前命令之后删除,仍可见 */
            else
                return false;  /* 当前命令之前删除,不可见 */
        }
        
        if (XidInMVCCSnapshot(HeapTupleHeaderGetRawXmax(tuple), snapshot))
            return true;  /* 删除事务在快照中,未提交 */
        
        if (!TransactionIdDidCommit(HeapTupleHeaderGetRawXmax(tuple)))
        {
            SetHintBits(tuple, buffer, HEAP_XMAX_INVALID,
                        InvalidTransactionId);
            return true;  /* 删除事务已中止 */
        }
        
        SetHintBits(tuple, buffer, HEAP_XMAX_COMMITTED,
                    HeapTupleHeaderGetRawXmax(tuple));
    }
    
    /* 删除事务已提交且在快照之前 */
    if (TransactionIdPrecedes(HeapTupleHeaderGetRawXmax(tuple), snapshot->xmin))
        return false;  /* 不可见 */
    
    return true;  /* 删除事务在快照之后,元组仍可见 */
}

可见性规则总结

元组可见的条件:

  1. t_xmin(创建事务):

    • 已提交 且
    • < snapshot->xmin(快照之前提交) 或 不在 xip[] 中(快照时已提交)
  2. t_xmax(删除事务):

    • 无效(未被删除) 或
    • 未提交 或
    • = snapshot->xmax(快照之后删除) 或 在 xip[] 中(快照时未提交)


事务隔离级别

Read Committed(读已提交)

BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
-- 每条SQL获取新快照
SELECT * FROM accounts WHERE id = 1;  -- 快照1
UPDATE accounts SET balance = balance - 100 WHERE id = 1;  -- 快照2
COMMIT;

特点

  • 每条 SQL 获取新快照
  • 可能出现不可重复读

Repeatable Read(可重复读)

BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
-- 事务开始时获取快照,全程使用
SELECT * FROM accounts WHERE id = 1;  -- 使用快照A
SELECT * FROM accounts WHERE id = 1;  -- 使用快照A(相同结果)
COMMIT;

特点

  • 事务开始时获取快照,全程不变
  • 防止不可重复读和部分幻读
  • PostgreSQL 的 MVCC 实现天然防止幻读

Serializable(可串行化)

BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE;
-- 使用SSI(Serializable Snapshot Isolation)检测冲突
SELECT * FROM accounts WHERE id = 1;
UPDATE accounts SET balance = balance - 100 WHERE id = 1;
COMMIT;  -- 可能失败,返回serialization_failure

特点

  • 基于 MVCC 快照 + 冲突检测
  • 检测读写冲突、写写冲突
  • 牺牲一个事务,避免串行化异常

性能优化

1. 事务ID消耗优化

-- 只读事务不分配事务ID
BEGIN;
SELECT * FROM users;  -- 不分配XID
COMMIT;

-- 写事务才分配事务ID
BEGIN;
UPDATE users SET name = 'Alice' WHERE id = 1;  -- 分配XID
COMMIT;

2. VACUUM 防止事务ID回卷

-- 自动VACUUM配置
ALTER TABLE large_table SET (autovacuum_vacuum_threshold = 1000);
ALTER TABLE large_table SET (autovacuum_vacuum_scale_factor = 0.1);

-- 手动VACUUM
VACUUM large_table;

-- 查看表的事务ID年龄
SELECT relname, age(relfrozenxid) 
FROM pg_class 
WHERE relkind = 'r' 
ORDER BY age(relfrozenxid) DESC 
LIMIT 10;

3. 子事务优化

-- 避免过多子事务(SAVEPOINT)
BEGIN;
SAVEPOINT sp1;
-- ... 操作1 ...
SAVEPOINT sp2;
-- ... 操作2 ...
-- 每个SAVEPOINT增加开销
COMMIT;

-- 推荐:合理使用SAVEPOINT
BEGIN;
-- 批量操作
SAVEPOINT batch;
-- 如果失败,回滚到SAVEPOINT
ROLLBACK TO batch;
COMMIT;

总结

Transaction 模块是 PostgreSQL 实现 ACID 特性的核心,通过 MVCC 机制实现高并发的事务隔离。关键设计包括:

  1. MVCC:元组多版本,读不阻塞写
  2. 快照隔离:事务看到一致的数据视图
  3. CLOG:高效的事务状态存储
  4. 可见性规则:基于事务ID和快照判断元组可见性
  5. 事务ID回卷:通过VACUUM冻结元组防止回卷