PostgreSQL-05-Transaction-事务管理
模块概览
Transaction 模块实现 PostgreSQL 的 ACID 事务特性,核心是 MVCC(多版本并发控制)机制,通过事务 ID、快照和可见性规则实现高并发的事务隔离。
核心能力
- MVCC(多版本并发控制):读不阻塞写、写不阻塞读
- 事务 ID 管理:32 位事务 ID 分配与回卷处理
- 快照管理:事务可见性快照的创建与维护
- CLOG(事务状态日志):记录事务提交/回滚状态
- 子事务:SAVEPOINT 支持部分回滚
- 两阶段提交:分布式事务支持
架构图
flowchart TB
subgraph Transaction["Transaction Manager"]
TxnMgr[Transaction Manager] --> XID[XID Manager<br/>事务ID分配]
TxnMgr --> Snapshot[Snapshot Manager<br/>快照管理]
TxnMgr --> CLOG[CLOG<br/>事务状态日志]
TxnMgr --> SubXact[SubTransaction<br/>子事务管理]
XID --> NextXID[NextTransactionId<br/>下一个可用XID]
XID --> OldestXID[OldestXmin<br/>最老活跃事务]
Snapshot --> SnapshotData[SnapshotData<br/>xmin/xmax/xip]
CLOG --> SLRU[SLRU<br/>简单LRU缓存]
SLRU --> CLOGFile[(CLOG文件<br/>pg_xact/)]
SubXact --> SubXIDCache[SubXID Cache<br/>子事务缓存]
end
subgraph MVCC["MVCC 可见性"]
Tuple[Tuple Header<br/>元组头] --> TXmin[t_xmin<br/>创建事务ID]
Tuple --> TXmax[t_xmax<br/>删除事务ID]
Tuple --> Infomask[t_infomask<br/>标志位]
Visibility[HeapTupleSatisfies<br/>可见性检查]
end
Snapshot --> Visibility
CLOG --> Visibility
Tuple --> Visibility
核心数据结构
1. TransactionId(事务 ID)
/* src/include/access/transam.h */
typedef uint32 TransactionId;
#define InvalidTransactionId 0
#define BootstrapTransactionId 1
#define FrozenTransactionId 2
#define FirstNormalTransactionId 3
特殊事务 ID
| 事务 ID | 值 | 说明 |
|---|---|---|
| InvalidTransactionId | 0 | 无效事务 |
| BootstrapTransactionId | 1 | 初始化事务 |
| FrozenTransactionId | 2 | 冻结事务(永远可见) |
| FirstNormalTransactionId | 3 | 第一个正常事务 |
事务 ID 回卷问题
- 事务 ID 为 32 位无符号整数,范围:0 ~ 4,294,967,295(约 42 亿)
- 用尽后回卷到 3(FirstNormalTransactionId)
- VACUUM 通过"冻结"旧元组(t_xmin = FrozenTransactionId)避免回卷问题
2. SnapshotData(快照)
/* src/include/utils/snapshot.h */
typedef struct SnapshotData
{
SnapshotType snapshot_type; /* 快照类型 */
TransactionId xmin; /* 最小活跃事务ID */
TransactionId xmax; /* 下一个未分配的事务ID */
TransactionId *xip; /* 活跃事务ID数组 */
uint32 xcnt; /* 活跃事务数量 */
TransactionId *subxip; /* 活跃子事务ID数组 */
int32 subxcnt; /* 活跃子事务数量 */
bool suboverflowed; /* 子事务是否溢出 */
bool takenDuringRecovery; /* 是否在恢复期间获取 */
CommandId curcid; /* 当前命令ID(语句级可见性) */
/* ... 更多字段 ... */
} SnapshotData;
快照类型
typedef enum SnapshotType
{
SNAPSHOT_MVCC = 0, /* 正常MVCC快照 */
SNAPSHOT_SELF, /* 当前事务可见 */
SNAPSHOT_ANY, /* 所有元组可见(VACUUM用) */
SNAPSHOT_TOAST, /* TOAST表快照 */
SNAPSHOT_DIRTY, /* 脏读快照 */
SNAPSHOT_HISTORIC_MVCC, /* 历史MVCC快照 */
SNAPSHOT_NON_VACUUMABLE /* 不可VACUUM快照 */
} SnapshotType;
3. HeapTupleHeaderData(元组头)
/* src/include/access/htup_details.h */
typedef struct HeapTupleHeaderData
{
union
{
HeapTupleFields t_heap;
DatumTupleFields t_datum;
} t_choice;
ItemPointerData t_ctid; /* 当前或新元组的TID */
uint16 t_infomask2; /* 属性数量和标志位 */
uint16 t_infomask; /* 各种标志位 */
uint8 t_hoff; /* 元组头大小 */
/* 元组数据跟在后面 */
bits8 t_bits[FLEXIBLE_ARRAY_MEMBER];
} HeapTupleHeaderData;
typedef struct HeapTupleFields
{
TransactionId t_xmin; /* 插入事务ID */
TransactionId t_xmax; /* 删除事务ID */
union
{
CommandId t_cid; /* 插入和删除命令ID */
TransactionId t_xvac; /* VACUUM事务ID */
} t_field3;
} HeapTupleFields;
t_infomask 标志位
#define HEAP_HASNULL 0x0001 /* 有NULL属性 */
#define HEAP_HASVARWIDTH 0x0002 /* 有变长属性 */
#define HEAP_HASEXTERNAL 0x0004 /* 有TOAST外部存储 */
#define HEAP_HASOID_OLD 0x0008 /* 有OID(已废弃) */
#define HEAP_XMAX_KEYSHR_LOCK 0x0010 /* xmax是共享键锁 */
#define HEAP_COMBOCID 0x0020 /* t_cid是组合CID */
#define HEAP_XMAX_EXCL_LOCK 0x0040 /* xmax是排他锁 */
#define HEAP_XMAX_LOCK_ONLY 0x0080 /* xmax仅是锁,非删除 */
#define HEAP_XMIN_COMMITTED 0x0100 /* xmin已提交 */
#define HEAP_XMIN_INVALID 0x0200 /* xmin已中止 */
#define HEAP_XMAX_COMMITTED 0x0400 /* xmax已提交 */
#define HEAP_XMAX_INVALID 0x0800 /* xmax已中止 */
#define HEAP_XMAX_IS_MULTI 0x1000 /* xmax是MultiXact */
#define HEAP_UPDATED 0x2000 /* 元组已更新 */
#define HEAP_MOVED_OFF 0x4000 /* 元组已移走 */
#define HEAP_MOVED_IN 0x8000 /* 元组已移入 */
核心功能实现
功能 1:事务开始与提交
StartTransaction() - 开始事务
/* src/backend/access/transam/xact.c */
static void
StartTransaction(void)
{
TransactionState s = CurrentTransactionState;
/* 1. 设置事务状态 */
s->state = TRANS_START;
s->transactionId = InvalidTransactionId; /* 延迟分配 */
/* 2. 初始化资源管理 */
AtStart_Memory();
AtStart_ResourceOwner();
/* 3. 设置事务快照 */
/* (延迟到第一个查询时获取) */
/* 4. 切换到 TRANS_INPROGRESS 状态 */
s->state = TRANS_INPROGRESS;
}
GetTransactionId() - 分配事务 ID
TransactionId
GetCurrentTransactionId(void)
{
TransactionState s = CurrentTransactionState;
if (!FullTransactionIdIsValid(s->fullTransactionId))
{
/* 分配新事务ID */
s->fullTransactionId = GetNewTransactionId(false);
/* 记录到共享内存的ProcArray */
ProcArrayAdd(MyProc);
}
return XidFromFullTransactionId(s->fullTransactionId);
}
FullTransactionId
GetNewTransactionId(bool isSubXact)
{
FullTransactionId full_xid;
TransactionId xid;
/* 1. 获取下一个事务ID(原子操作) */
LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
full_xid = ShmemVariableCache->nextXid;
xid = XidFromFullTransactionId(full_xid);
/* 2. 递增nextXid */
TransactionIdAdvance(ShmemVariableCache->nextXid);
/* 3. 检查是否接近回卷点 */
if (TransactionIdFollowsOrEquals(xid,
ShmemVariableCache->xidVacLimit))
{
/* 警告:需要VACUUM防止事务ID回卷 */
ereport(WARNING,
(errmsg("transaction ID wrap limit approached")));
}
LWLockRelease(XidGenLock);
return full_xid;
}
CommitTransaction() - 提交事务
static void
CommitTransaction(void)
{
TransactionState s = CurrentTransactionState;
TransactionId latestXid;
/* 1. 预提交处理 */
CallXactCallbacks(XACT_EVENT_PRE_COMMIT);
/* 2. 写入CLOG(标记事务已提交) */
latestXid = RecordTransactionCommit();
/* 3. 刷新WAL到磁盘 */
XLogFlush(XactLastRecEnd);
/* 4. 后提交处理 */
CallXactCallbacks(XACT_EVENT_COMMIT);
/* 5. 释放锁 */
ResourceOwnerRelease(TopTransactionResourceOwner,
RESOURCE_RELEASE_LOCKS,
true, true);
/* 6. 从ProcArray移除 */
ProcArrayEndTransaction(MyProc, latestXid);
/* 7. 清理状态 */
s->state = TRANS_DEFAULT;
s->fullTransactionId = InvalidFullTransactionId;
}
RecordTransactionCommit() - 记录提交
static TransactionId
RecordTransactionCommit(void)
{
TransactionId xid = GetTopTransactionIdIfAny();
bool markXidCommitted = TransactionIdIsValid(xid);
if (markXidCommitted)
{
/* 1. 写入WAL提交记录 */
XLogRecPtr recptr;
xl_xact_commit xlrec;
xlrec.xact_time = GetCurrentTimestamp();
xlrec.xinfo = 0;
/* ... 填充更多字段 ... */
XLogBeginInsert();
XLogRegisterData((char *) (&xlrec), MinSizeOfXactCommit);
recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT);
/* 2. 设置CLOG状态为COMMITTED */
TransactionIdCommitTree(xid, nchildren, children);
/* 3. 更新pg_xact文件 */
/* (由CLOG模块异步刷盘) */
}
return xid;
}
功能 2:CLOG(事务状态日志)
TransactionIdCommitTree() - 提交事务树
/* src/backend/access/transam/clog.c */
void
TransactionIdCommitTree(TransactionId xid, int nxids, TransactionId *xids)
{
/* 1. 设置主事务状态 */
TransactionIdSetStatusBit(xid, TRANSACTION_STATUS_COMMITTED,
XactCompletionRelcacheInitFileInval());
/* 2. 设置所有子事务状态 */
for (int i = 0; i < nxids; i++)
{
TransactionIdSetStatusBit(xids[i], TRANSACTION_STATUS_COMMITTED,
XactCompletionRelcacheInitFileInval());
}
}
static void
TransactionIdSetStatusBit(TransactionId xid, XidStatus status, bool lsn)
{
int pageno = TransactionIdToPage(xid);
int byteno = TransactionIdToByte(xid);
int bshift = TransactionIdToBIndex(xid) * CLOG_BITS_PER_XACT;
int slotno;
char *byteptr;
char byteval;
/* 1. 获取CLOG页面(使用SLRU缓存) */
slotno = SimpleLruReadPage(ClogCtl, pageno, true, xid);
byteptr = ClogCtl->shared->page_buffer[slotno] + byteno;
/* 2. 设置状态位(2位:00=进行中,01=已提交,10=已中止,11=子事务) */
byteval = *byteptr;
byteval &= ~(0x03 << bshift); /* 清除旧状态 */
byteval |= (status << bshift); /* 设置新状态 */
*byteptr = byteval;
/* 3. 标记页面为脏 */
ClogCtl->shared->page_dirty[slotno] = true;
/* 4. 释放页面 */
LWLockRelease(SimpleLruGetBankLock(ClogCtl, pageno));
}
CLOG 文件结构
pg_xact/
├── 0000 # 事务0-8191的状态(8KB,每事务2位)
├── 0001 # 事务8192-16383的状态
├── 0002
└── ...
每个文件 8KB,存储 8192 个事务的状态(2 位/事务 × 8192 = 16384 位 = 2048 字节,实际文件大小 8KB 包含额外元数据)。
功能 3:快照管理
GetTransactionSnapshot() - 获取快照
/* src/backend/utils/time/snapmgr.c */
Snapshot
GetTransactionSnapshot(void)
{
if (!FirstSnapshotSet)
{
/* 1. 创建新快照 */
CurrentSnapshot = GetSnapshotData(&CurrentSnapshotData);
FirstSnapshotSet = true;
return CurrentSnapshot;
}
return CurrentSnapshot;
}
Snapshot
GetSnapshotData(Snapshot snapshot)
{
ProcArrayStruct *arrayP = procArray;
TransactionId xmin;
TransactionId xmax;
int count = 0;
/* 1. 获取ProcArray共享锁 */
LWLockAcquire(ProcArrayLock, LW_SHARED);
/* 2. 获取最新的nextXid */
xmax = ShmemVariableCache->latestCompletedXid;
TransactionIdAdvance(xmax);
/* 3. 扫描所有活跃事务 */
xmin = xmax;
for (int i = 0; i < arrayP->numProcs; i++)
{
PGXACT *pgxact = &allPgXact[arrayP->pgprocnos[i]];
TransactionId xid = pgxact->xid;
if (TransactionIdIsNormal(xid))
{
if (TransactionIdPrecedes(xid, xmin))
xmin = xid; /* 更新最小活跃XID */
snapshot->xip[count++] = xid; /* 记录活跃XID */
}
/* 处理子事务 */
/* ... 省略子事务处理代码 ... */
}
LWLockRelease(ProcArrayLock);
/* 4. 填充快照 */
snapshot->xmin = xmin;
snapshot->xmax = xmax;
snapshot->xcnt = count;
snapshot->curcid = GetCurrentCommandId(false);
return snapshot;
}
功能 4:MVCC 可见性判断
HeapTupleSatisfiesMVCC() - 检查元组可见性
/* src/backend/access/heap/heapam_visibility.c */
bool
HeapTupleSatisfiesMVCC(HeapTuple htup, Snapshot snapshot,
Buffer buffer)
{
HeapTupleHeader tuple = htup->t_data;
/* 1. 检查t_xmin(创建事务) */
if (!HeapTupleHeaderXminCommitted(tuple))
{
if (HeapTupleHeaderXminInvalid(tuple))
return false; /* 创建事务已中止 */
if (TransactionIdIsCurrentTransactionId(HeapTupleHeaderGetRawXmin(tuple)))
{
/* 当前事务创建的元组 */
if (HeapTupleHeaderGetCmin(tuple) >= snapshot->curcid)
return false; /* 当前命令之后创建,不可见 */
/* 检查是否被当前事务删除 */
if (tuple->t_infomask & HEAP_XMAX_INVALID)
return true; /* 未被删除,可见 */
/* ... 检查t_xmax ... */
}
else if (XidInMVCCSnapshot(HeapTupleHeaderGetRawXmin(tuple), snapshot))
return false; /* 创建事务在快照中,未提交 */
else if (TransactionIdDidCommit(HeapTupleHeaderGetRawXmin(tuple)))
SetHintBits(tuple, buffer, HEAP_XMIN_COMMITTED,
HeapTupleHeaderGetRawXmin(tuple));
else
{
SetHintBits(tuple, buffer, HEAP_XMIN_INVALID,
InvalidTransactionId);
return false; /* 创建事务已中止 */
}
}
/* 2. 检查t_xmax(删除事务) */
if (tuple->t_infomask & HEAP_XMAX_INVALID)
return true; /* 未被删除,可见 */
if (HEAP_XMAX_IS_LOCKED_ONLY(tuple->t_infomask))
return true; /* 仅锁定,非删除 */
if (!HeapTupleHeaderXmaxCommitted(tuple))
{
if (TransactionIdIsCurrentTransactionId(HeapTupleHeaderGetRawXmax(tuple)))
{
/* 当前事务删除的元组 */
if (HeapTupleHeaderGetCmax(tuple) >= snapshot->curcid)
return true; /* 当前命令之后删除,仍可见 */
else
return false; /* 当前命令之前删除,不可见 */
}
if (XidInMVCCSnapshot(HeapTupleHeaderGetRawXmax(tuple), snapshot))
return true; /* 删除事务在快照中,未提交 */
if (!TransactionIdDidCommit(HeapTupleHeaderGetRawXmax(tuple)))
{
SetHintBits(tuple, buffer, HEAP_XMAX_INVALID,
InvalidTransactionId);
return true; /* 删除事务已中止 */
}
SetHintBits(tuple, buffer, HEAP_XMAX_COMMITTED,
HeapTupleHeaderGetRawXmax(tuple));
}
/* 删除事务已提交且在快照之前 */
if (TransactionIdPrecedes(HeapTupleHeaderGetRawXmax(tuple), snapshot->xmin))
return false; /* 不可见 */
return true; /* 删除事务在快照之后,元组仍可见 */
}
可见性规则总结
元组可见的条件:
-
t_xmin(创建事务):
- 已提交 且
- < snapshot->xmin(快照之前提交) 或 不在 xip[] 中(快照时已提交)
-
t_xmax(删除事务):
- 无效(未被删除) 或
- 未提交 或
-
= snapshot->xmax(快照之后删除) 或 在 xip[] 中(快照时未提交)
事务隔离级别
Read Committed(读已提交)
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
-- 每条SQL获取新快照
SELECT * FROM accounts WHERE id = 1; -- 快照1
UPDATE accounts SET balance = balance - 100 WHERE id = 1; -- 快照2
COMMIT;
特点:
- 每条 SQL 获取新快照
- 可能出现不可重复读
Repeatable Read(可重复读)
BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
-- 事务开始时获取快照,全程使用
SELECT * FROM accounts WHERE id = 1; -- 使用快照A
SELECT * FROM accounts WHERE id = 1; -- 使用快照A(相同结果)
COMMIT;
特点:
- 事务开始时获取快照,全程不变
- 防止不可重复读和部分幻读
- PostgreSQL 的 MVCC 实现天然防止幻读
Serializable(可串行化)
BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE;
-- 使用SSI(Serializable Snapshot Isolation)检测冲突
SELECT * FROM accounts WHERE id = 1;
UPDATE accounts SET balance = balance - 100 WHERE id = 1;
COMMIT; -- 可能失败,返回serialization_failure
特点:
- 基于 MVCC 快照 + 冲突检测
- 检测读写冲突、写写冲突
- 牺牲一个事务,避免串行化异常
性能优化
1. 事务ID消耗优化
-- 只读事务不分配事务ID
BEGIN;
SELECT * FROM users; -- 不分配XID
COMMIT;
-- 写事务才分配事务ID
BEGIN;
UPDATE users SET name = 'Alice' WHERE id = 1; -- 分配XID
COMMIT;
2. VACUUM 防止事务ID回卷
-- 自动VACUUM配置
ALTER TABLE large_table SET (autovacuum_vacuum_threshold = 1000);
ALTER TABLE large_table SET (autovacuum_vacuum_scale_factor = 0.1);
-- 手动VACUUM
VACUUM large_table;
-- 查看表的事务ID年龄
SELECT relname, age(relfrozenxid)
FROM pg_class
WHERE relkind = 'r'
ORDER BY age(relfrozenxid) DESC
LIMIT 10;
3. 子事务优化
-- 避免过多子事务(SAVEPOINT)
BEGIN;
SAVEPOINT sp1;
-- ... 操作1 ...
SAVEPOINT sp2;
-- ... 操作2 ...
-- 每个SAVEPOINT增加开销
COMMIT;
-- 推荐:合理使用SAVEPOINT
BEGIN;
-- 批量操作
SAVEPOINT batch;
-- 如果失败,回滚到SAVEPOINT
ROLLBACK TO batch;
COMMIT;
总结
Transaction 模块是 PostgreSQL 实现 ACID 特性的核心,通过 MVCC 机制实现高并发的事务隔离。关键设计包括:
- MVCC:元组多版本,读不阻塞写
- 快照隔离:事务看到一致的数据视图
- CLOG:高效的事务状态存储
- 可见性规则:基于事务ID和快照判断元组可见性
- 事务ID回卷:通过VACUUM冻结元组防止回卷