Neo4j-01-核心模块-Kernel与事务处理
模块概览
职责与边界
核心职责
-
事务管理
- KernelTransaction 实例池化与生命周期管理
- 事务隔离级别保证(READ_COMMITTED)
- 事务状态(Transaction State)内存缓冲
- 两阶段提交协调(PreCommit 验证 + WAL 写入 + Storage Apply)
-
数据访问 API
- Read 接口:节点/关系/属性查询、索引查找
- Write 接口:节点/关系创建、属性修改、标签操作
- Token API:标签/关系类型/属性键的字符串与整数映射
- Schema API:索引与约束的创建、删除
-
并发控制
- 实体级锁管理(共享锁/排他锁)
- 死锁检测与自动回滚
- 乐观并发控制(版本检查)
-
查询执行协调
- Cypher 查询执行上下文管理
- 游标(Cursor)池化与资源回收
- 查询超时监控
输入/输出
输入
- GraphDatabaseService API 调用(beginTx、executeTransactionally)
- Cypher 查询请求
- Bolt 协议请求(通过 BoltGraphDatabaseServiceSPI)
输出
- Transaction 实例(公开 API)
- KernelTransaction 实例(内核 API)
- StorageCommand 列表(提交时生成)
- 事务结果(TransactionId、查询结果集)
上下游依赖
上游依赖(调用者)
graphdb-api模块(GraphDatabaseService、Transaction)bolt模块(Bolt 请求处理器)server模块(HTTP REST API)cypher模块(查询执行引擎)
下游依赖(被调用)
record-storage-engine模块(StorageEngine、NeoStores)io模块(PageCache)lock模块(LockManager)wal模块(TransactionLog)configuration模块(Config)monitoring模块(事件监听器)
生命周期
- 数据库启动:
Kernel.init()→ 初始化 KernelTransactions 池 - 运行时:
Kernel.start()→ 启动事务监控与超时守护线程 - 事务创建:从池中获取 KernelTransaction 实例并初始化
- 事务提交/回滚:释放锁、清理状态、归还实例到池
- 数据库关闭:
Kernel.shutdown()→ 等待所有事务完成并释放资源
模块架构图
flowchart TB
subgraph PublicAPI["公开 API 层 (graphdb-api)"]
GDB[GraphDatabaseService]
TX[Transaction]
Node[Node]
Rel[Relationship]
end
subgraph KernelAPI["内核 API 层 (kernel-api)"]
Kernel[Kernel]
KTxAPI[KernelTransaction<br/>接口]
Read[Read API]
Write[Write API]
Token[Token API]
Schema[Schema API]
Cursors[Cursor Factory]
end
subgraph KernelImpl["内核实现层 (kernel)"]
KernelImpl[KernelImpl]
KTxs[KernelTransactions<br/>事务池]
KTxImpl[KernelTransactionImpl<br/>事务实现]
subgraph TxState["事务状态管理"]
TxStateObj[TxState<br/>内存缓冲]
Operations[Operations<br/>读写操作实现]
TxStateUpdater[IndexTxStateUpdater<br/>索引状态同步]
end
subgraph CommitFlow["提交流程"]
CommitProc[TransactionCommitProcess]
Committer[TransactionCommitter]
CmdCreator[CommandCreationContext<br/>命令生成]
end
subgraph LockMgmt["锁管理"]
LockClient[LockManager.Client]
EntityLocks[EntityLocks<br/>实体锁接口]
end
end
subgraph Storage["存储引擎层"]
StorageEng[StorageEngine]
StorageReader[StorageReader]
WAL[TransactionLog<br/>WAL]
end
GDB --> Kernel
TX --> KTxAPI
Node --> Read
Rel --> Read
Kernel --> KernelImpl
KTxAPI --> KTxImpl
KernelImpl --> KTxs
KTxs --> KTxImpl
KTxImpl --> TxState
KTxImpl --> CommitFlow
KTxImpl --> LockMgmt
Operations --> TxStateObj
Operations --> StorageReader
CommitProc --> CmdCreator
CommitProc --> WAL
CommitProc --> StorageEng
Committer --> CommitProc
KTxImpl --> Committer
LockClient --> EntityLocks
Read --> Cursors
Write --> Operations
架构说明
1. 公开 API 层(graphdb-api)
- GraphDatabaseService:数据库服务入口,提供
beginTx()、executeTransactionally()等方法 - Transaction:用户事务接口,支持
commit()、rollback()、图操作 - Node/Relationship:图实体抽象,封装内部 ID 与属性访问
2. 内核 API 层(kernel-api)
- Kernel:内核服务接口,负责创建 KernelTransaction
- KernelTransaction:事务核心接口,提供读写、锁、游标等能力
- Read/Write API:低层次数据访问接口,直接操作 Transaction State
- Cursor:游标抽象,用于高效遍历节点/关系,减少对象分配
3. 内核实现层(kernel)
- KernelImpl:Kernel 接口实现,管理全局状态
- KernelTransactions:事务池管理器,维护所有活跃事务
- KernelTransactionImplementation:事务核心实现,约 1900 行代码,重度优化
- 池化复用:事务对象归还池后重置状态,避免频繁 GC
- 状态机:NONE → EXPLICIT/IMPLICIT → COMMITTING → COMMITTED/ROLLED_BACK
- 线程绑定:CursorContext 绑定执行线程,避免跨线程访问
4. 事务状态管理
- TxState:内存中的事务变更缓冲区,记录节点/关系/属性的增删改
- Operations:Read/Write API 的实际实现,桥接 TxState 与 StorageReader
- IndexTxStateUpdater:同步更新事务状态到索引查询层
5. 提交流程
- TransactionCommitProcess:提交流程协调器,依次执行:
- 验证约束(PreCommit)
- 生成 StorageCommand(持久化命令)
- 写入 WAL
- 应用到 StorageEngine
- TransactionCommitter:提交器实现,支持分块提交(Chunked Commit)
6. 锁管理
- LockManager.Client:每个事务持有一个锁客户端
- EntityLocks:实体级锁接口,支持节点/关系/索引的共享锁与排他锁
- 死锁检测:基于等待图(Wait-For Graph)检测死锁,选择代价最小的事务回滚
核心数据结构
UML 类图
classDiagram
class KernelTransaction {
<<interface>>
+commit() long
+rollback() void
+dataRead() Read
+dataWrite() Write
+tokenRead() TokenRead
+tokenWrite() TokenWrite
+schemaRead() SchemaRead
+schemaWrite() SchemaWrite
+locks() Locks
+cursors() CursorFactory
+close() long
}
class KernelTransactionImplementation {
-TransactionId transactionId
-TransactionWriteState writeState
-TxState txState
-LockManager.Client lockClient
-long startTimeMillis
-TransactionTimeout timeout
-SecurityContext securityContext
-CursorContext cursorContext
+initialize(lastCommittedTx, type, ...) this
+commit(monitor) long
+rollback() void
+dataRead() Read
+dataWrite() Write
-performCommit() long
-releaseResources() void
}
class TxState {
<<interface>>
+nodeIsAddedInThisBatch(nodeId) boolean
+nodeIsDeletedInThisBatch(nodeId) boolean
+addedAndRemovedNodes() DiffSets~Long~
+relationshipsWithChangedProperty(propKey) DiffSets~Long~
+augmentWithLocalState() void
}
class Read {
<<interface>>
+nodeExists(nodeId) boolean
+singleNode(nodeId, cursor) void
+allNodesScan(cursor) Scan
+nodeLabelScan(label, cursor) Scan
+nodeIndexSeek(index, cursor, predicates) void
}
class Write {
<<interface>>
+nodeCreate() long
+nodeCreateWithLabels(labels) long
+nodeDelete(nodeId) boolean
+relationshipCreate(source, type, target) long
+nodeAddLabel(nodeId, labelId) boolean
+nodeSetProperty(nodeId, key, value) void
}
class Operations {
-StorageReader storageReader
-TxState txState
-CursorContext cursorContext
-LockManager.Client locks
+nodeCreate() long
+nodeExists(nodeId) boolean
+nodeGetProperty(cursor, propKey) Value
-acquireExclusiveNodeLock(nodeId) void
}
class TransactionCommitProcess {
<<interface>>
+commit(batch, event, mode) long
}
class InternalTransactionCommitProcess {
-TransactionAppender appender
-StorageEngine storageEngine
+commit(batch, event, mode) long
-appendToLog(batch, event) long
-applyToStore(batch, event, mode) void
}
class LockManager.Client {
-LeaseClient leaseClient
-long transactionSequenceNumber
+acquireExclusive(resource) void
+acquireShared(resource) void
+releaseAll() void
-detectDeadlock() void
}
class TransactionTimeout {
-Duration duration
-Status timeoutStatus
+isExpired(startTime, clock) boolean
+checkTimeout(startTime, clock) void
}
KernelTransaction <|-- KernelTransactionImplementation
KernelTransactionImplementation --> TxState
KernelTransactionImplementation --> Read
KernelTransactionImplementation --> Write
KernelTransactionImplementation --> LockManager.Client
KernelTransactionImplementation --> TransactionTimeout
Read <|-- Operations
Write <|-- Operations
Operations --> TxState
KernelTransactionImplementation --> TransactionCommitProcess
TransactionCommitProcess <|-- InternalTransactionCommitProcess
字段说明
KernelTransactionImplementation 核心字段
| 字段名 | 类型 | 说明 |
|---|---|---|
transactionId |
long |
事务提交后的全局唯一 ID(提交前为 -1) |
writeState |
TransactionWriteState |
事务写状态:NONE(只读)、DATA(数据写)、SCHEMA(模式写) |
txState |
TxState |
事务内存缓冲区,记录所有未提交的变更 |
lockClient |
LockManager.Client |
锁客户端,持有当前事务的所有锁 |
startTimeMillis |
long |
事务启动时间(毫秒) |
startTimeNanos |
long |
事务启动时间(纳秒,用于性能统计) |
timeout |
TransactionTimeout |
事务超时配置 |
securityContext |
SecurityContext |
安全上下文(用户权限、角色) |
cursorContext |
CursorContext |
游标上下文(Page Cache 追踪) |
closing |
boolean |
标记事务正在关闭 |
closed |
boolean |
标记事务已关闭 |
terminationMark |
TerminationMark |
事务被终止的原因(超时/用户取消) |
TxState 接口关键方法
| 方法 | 返回值 | 说明 |
|---|---|---|
nodeIsAddedInThisBatch(nodeId) |
boolean |
节点是否在当前事务中创建 |
nodeIsDeletedInThisBatch(nodeId) |
boolean |
节点是否在当前事务中删除 |
addedAndRemovedNodes() |
DiffSets<Long> |
新增与删除的节点 ID 集合 |
getNodeState(nodeId) |
NodeState |
获取节点的事务状态(属性、标签变更) |
addedRelationships() |
Iterable<Long> |
新增的关系 ID 列表 |
Operations 类(Read/Write 实现)
- 职责:桥接 TxState 与 StorageReader,合并内存变更与磁盘数据
- 锁获取:写操作前自动获取实体级排他锁
- 约束验证:属性修改时检查唯一性约束
核心 API 详解
API 1:beginTransaction(开始事务)
基本信息
- 接口名称:
Kernel.beginTransaction - 方法签名:
KernelTransaction beginTransaction(Type type, LoginContext loginContext, ClientConnectionInfo connectionInfo, long timeout) - 幂等性:否(每次调用创建新事务)
请求参数
public enum Type {
EXPLICIT, // 显式事务(用户调用 beginTx())
IMPLICIT // 隐式事务(executeTransactionally)
}
public interface LoginContext {
SecurityContext authorize(TokenLookup tokenLookup, ...);
}
public interface ClientConnectionInfo {
String clientAddress();
String requestingAgent();
}
| 参数 | 类型 | 必填 | 默认值 | 约束 | 说明 |
|---|---|---|---|---|---|
type |
Type |
是 | - | EXPLICIT/IMPLICIT | 事务类型 |
loginContext |
LoginContext |
是 | - | - | 登录上下文(认证信息) |
connectionInfo |
ClientConnectionInfo |
是 | - | - | 客户端连接信息 |
timeout |
long |
否 | 配置值 | >0 或 0(无限制) | 超时时间(毫秒) |
响应结构
public interface KernelTransaction extends AutoCloseable {
long commit() throws TransactionFailureException;
void rollback() throws TransactionFailureException;
Read dataRead();
Write dataWrite();
// ... 其他方法
}
| 字段/方法 | 类型 | 说明 |
|---|---|---|
commit() |
long |
提交事务,返回 TransactionId |
rollback() |
void |
回滚事务 |
dataRead() |
Read |
获取数据读接口 |
dataWrite() |
Write |
获取数据写接口 |
getTransactionSequenceNumber() |
long |
事务序列号(全局递增) |
入口函数核心代码
// community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelImpl.java
public KernelTransaction beginTransaction(
Type type,
LoginContext loginContext,
ClientConnectionInfo connectionInfo,
long timeout
) throws TransactionFailureException {
// 1. 检查数据库状态
panic.assertNoPanic(TransactionFailureException.class);
// 2. 创建超时配置
TransactionTimeout transactionTimeout = new TransactionTimeout(
Duration.ofMillis(timeout),
TransactionTimedOutClientConfiguration
);
// 3. 从事务池获取实例
KernelTransaction transaction = transactions.newInstance(
type,
loginContext,
connectionInfo,
transactionTimeout
);
// 4. 注册监控
transactionMonitor.transactionStarted();
transactionExecutionMonitor.start(transaction);
return transaction;
}
代码说明
- Panic 检查:如果数据库处于 Panic 状态(如页面损坏),拒绝新事务
- 超时配置:将毫秒转为 Duration,关联超时错误状态码
- 池化获取:
transactions.newInstance()从对象池获取 KernelTransaction,避免频繁 GC - 监控注册:启动事务监控,记录事务启动时间与线程信息
调用链路核心代码
// community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelTransactions.java
public KernelTransaction newInstance(
Type type,
LoginContext loginContext,
ClientConnectionInfo clientInfo,
TransactionTimeout timeout
) {
// 1. 防止在阻塞新事务时创建事务
assertCurrentThreadIsNotBlockingNewTransactions();
// 2. 创建安全上下文
SecurityContext securityContext = loginContext.authorize(
new TokenHoldersIdLookup(tokenHolders, procedureView, isStale),
sessionDatabase,
securityLog
);
// 3. 分配事务序列号(全局递增)
long sequenceNumber = sequenceNumberCounter.incrementAndGet();
// 4. 从池中获取实例并初始化
KernelTransaction tx = pool.acquire().initialize(
lastCommittedTransactionId.get(),
type,
securityContext,
timeout,
sequenceNumber,
clientInfo,
procedureView
);
// 5. 获取串行执行锁(如果启用)
databaseSerialGuard.acquireSerialLock(tx);
return tx;
}
调用链说明
- 阻塞检查:数据库关闭时阻止新事务创建
- 安全授权:根据用户权限创建 SecurityContext
- 序列号分配:全局原子递增,用于锁排序与死锁检测
- 对象初始化:重置事务状态,绑定 CursorContext
- 串行锁:某些操作(如模式变更)需要串行执行
API 2:commit(提交事务)
基本信息
- 接口名称:
KernelTransaction.commit - 方法签名:
long commit(KernelTransactionMonitor monitor) throws TransactionFailureException - 幂等性:否(重复提交抛出异常)
请求参数
public interface KernelTransactionMonitor {
void beforeApply(); // 在应用存储命令前调用
void afterCommit(ExecutionStatistics statistics); // 提交后调用
}
| 参数 | 类型 | 必填 | 默认值 | 说明 |
|---|---|---|---|---|
monitor |
KernelTransactionMonitor |
否 | NO_MONITOR |
提交过程监控器 |
响应值
| 返回值 | 类型 | 说明 |
|---|---|---|
transactionId |
long |
成功:全局事务 ID(>0) 只读:0 回滚:-1 |
入口函数核心代码
// community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelTransactionImplementation.java
public long commit(KernelTransactionMonitor monitor) throws TransactionFailureException {
// 1. 状态检查
assertOpen();
markAsClosed();
// 2. 检查终止状态
if (terminationMark != null) {
rollback();
failOnTermination();
}
try {
// 3. 触发 PreCommit 事件
transactionEventListeners.beforeCommit(txState, this, storageReader);
// 4. 执行提交流程
long txId = performCommit();
// 5. 触发 AfterCommit 事件
transactionEventListeners.afterCommit(txState, this, txId);
// 6. 监控回调
monitor.afterCommit(statistics);
return txId;
} finally {
// 7. 释放资源
release();
}
}
private long performCommit() throws TransactionFailureException {
// 只读事务直接返回
if (txState == null || !txState.hasChanges()) {
return READ_ONLY_ID;
}
// 1. 生成存储命令
List<StorageCommand> commands = new ArrayList<>();
storageEngine.createCommands(
txState,
storageReader,
commandCreationContext,
locks,
commands
);
// 2. 提交到存储引擎
long txId = committer.commit(
commands,
kernelTransactionMonitor
);
// 3. 记录统计信息
statistics.addHeapAllocatedBytes(transactionMemoryPool.usedHeap());
statistics.addNativeAllocatedBytes(transactionMemoryPool.usedNative());
return txId;
}
代码说明
- 状态检查:确保事务未被并发提交/回滚
- 终止检查:如果事务被终止(超时/用户取消),强制回滚
- PreCommit 事件:触发事务事件监听器(用于约束验证、审计)
- 命令生成:将 TxState 转换为 StorageCommand 列表
- 提交流程:写入 WAL + 应用到存储引擎
- AfterCommit 事件:通知索引更新、统计信息更新
- 资源释放:释放锁、清理 TxState、归还对象池
两阶段提交详细流程
// community/kernel/src/main/java/org/neo4j/kernel/impl/api/InternalTransactionCommitProcess.java
public long commit(
StorageEngineTransaction batch,
TransactionWriteEvent transactionWriteEvent,
TransactionApplicationMode mode
) throws TransactionFailureException {
try {
// 阶段 1:预分配存储空间(可选优化)
if (preAllocateSpaceInStores) {
storageEngine.allocate(batch);
}
// 阶段 2:预取页面到 PageCache(可选优化)
if (prefetchCommands.getAsBoolean()) {
storageEngine.prefetchPagesForCommands(batch, mode);
}
// 阶段 3:写入 WAL(持久化保证)
long lastAppendIndex = appendToLog(batch, transactionWriteEvent);
try {
// 阶段 4:应用到存储引擎
applyToStore(batch, transactionWriteEvent, mode);
} finally {
// 确保批次关闭
close(batch);
}
// 阶段 5:通知成功
commandCommitListeners.registerSuccess(batch, lastAppendIndex);
return lastAppendIndex;
} catch (Exception e) {
commandCommitListeners.registerFailure(batch, e);
throw e;
}
}
private long appendToLog(
StorageEngineTransaction batch,
TransactionWriteEvent event
) throws IOException {
// 序列化命令到 WAL
return appender.append(batch.commands(), event.chunkAppender());
}
private void applyToStore(
StorageEngineTransaction batch,
TransactionWriteEvent event,
TransactionApplicationMode mode
) throws Exception {
// 应用命令到存储引擎(更新 NeoStores)
storageEngine.apply(batch, mode);
}
API 3:dataWrite(数据写操作)
基本信息
- 接口名称:
KernelTransaction.dataWrite - 方法签名:
Write dataWrite() throws InvalidTransactionTypeKernelException - 幂等性:是(多次调用返回同一实例)
核心写操作方法
nodeCreate(创建节点)
// community/kernel-api/src/main/java/org/neo4j/internal/kernel/api/Write.java
public interface Write {
/**
* 创建节点
* @return 节点 ID
*/
long nodeCreate();
/**
* 创建节点并分配标签
* @param labels 标签 ID 数组
* @return 节点 ID
*/
long nodeCreateWithLabels(int[] labels) throws ConstraintValidationException;
/**
* 删除节点
* @param nodeId 节点 ID
* @return true 如果节点存在并被删除
*/
boolean nodeDelete(long nodeId) throws AutoIndexingKernelException;
/**
* 添加标签到节点
* @param nodeId 节点 ID
* @param labelId 标签 ID
* @return true 如果标签被添加(之前不存在)
*/
boolean nodeAddLabel(long nodeId, int labelId)
throws EntityNotFoundException, ConstraintValidationException;
/**
* 设置节点属性
* @param nodeId 节点 ID
* @param propertyKey 属性键 ID
* @param value 属性值
*/
void nodeSetProperty(long nodeId, int propertyKey, Value value)
throws EntityNotFoundException, ConstraintValidationException;
/**
* 创建关系
* @param sourceNode 起点节点 ID
* @param relationshipType 关系类型 ID
* @param targetNode 终点节点 ID
* @return 关系 ID
*/
long relationshipCreate(long sourceNode, int relationshipType, long targetNode)
throws EntityNotFoundException;
}
实现代码(Operations 类)
// community/kernel/src/main/java/org/neo4j/kernel/impl/newapi/Operations.java
public long nodeCreate() {
// 1. 检查写权限
ktx.securityAuthorizationHandler.assertAllowsCreateNode(ktx.securityContext(), labels);
// 2. 分配节点 ID
long nodeId = commandCreationContext.reserveNode();
// 3. 获取排他锁
acquireExclusiveNodeLock(nodeId);
// 4. 记录到 TxState
ktx.txState().nodeDoCreate(nodeId);
return nodeId;
}
public void nodeSetProperty(long nodeId, int propertyKey, Value value)
throws EntityNotFoundException, ConstraintValidationException {
// 1. 检查节点存在性
if (!nodeExists(nodeId)) {
throw new EntityNotFoundException(EntityType.NODE, nodeId);
}
// 2. 获取排他锁
acquireExclusiveNodeLock(nodeId);
// 3. 检查约束
ConstraintDescriptor constraint = getUniqueConstraint(propertyKey);
if (constraint != null) {
validateUniqueConstraint(nodeId, propertyKey, value, constraint);
}
// 4. 记录属性变更
ktx.txState().nodeDoReplaceProperty(nodeId, propertyKey, storageProperty, value);
// 5. 更新索引状态
indexTxStateUpdater.onPropertyAdd(nodeId, propertyKey, value);
}
操作流程说明
- 权限检查:根据 SecurityContext 验证操作权限
- ID 分配:从 IdGenerator 获取唯一 ID
- 锁获取:自动获取实体级排他锁,防止并发冲突
- 状态记录:将变更写入 TxState(内存缓冲)
- 约束验证:检查唯一性约束、非空约束
- 索引同步:通知索引更新器,准备索引刷新
关键流程时序图
时序图 1:完整事务提交流程
sequenceDiagram
autonumber
participant User as 用户代码
participant GDB as GraphDatabaseService
participant Kernel as Kernel
participant KTx as KernelTransaction
participant TxState as TxState
participant Locks as LockManager
participant CommitProc as CommitProcess
participant WAL as TransactionLog
participant Storage as StorageEngine
participant PageCache as PageCache
User->>GDB: beginTx()
GDB->>Kernel: beginTransaction(EXPLICIT, loginContext, ...)
Kernel->>KTx: pool.acquire().initialize(...)
KTx->>Locks: lockClient.initialize()
KTx->>TxState: 创建空 TxState
Kernel-->>GDB: 返回 KernelTransaction
GDB-->>User: 返回 Transaction
User->>User: tx.createNode(Label.label("Person"))
User->>KTx: dataWrite().nodeCreate()
KTx->>Locks: acquireExclusiveNodeLock(nodeId)
Locks-->>KTx: 锁获取成功
KTx->>TxState: nodeDoCreate(nodeId)
TxState-->>KTx: 记录节点创建
User->>KTx: dataWrite().nodeSetProperty(nodeId, key, value)
KTx->>Locks: acquireExclusiveNodeLock(nodeId)
KTx->>TxState: nodeDoReplaceProperty(nodeId, key, value)
TxState-->>KTx: 记录属性变更
User->>User: tx.commit()
User->>KTx: commit()
KTx->>KTx: 检查 terminationMark
KTx->>KTx: 触发 beforeCommit 事件
KTx->>Storage: createCommands(txState, storageReader, ...)
Storage->>TxState: 遍历变更集
Storage->>Storage: 生成 NodeCommand、PropertyCommand
Storage-->>KTx: 返回 StorageCommand 列表
KTx->>CommitProc: commit(commands, transactionWriteEvent, mode)
CommitProc->>WAL: append(commands)
WAL->>PageCache: 写入日志页面
PageCache->>PageCache: 标记 Dirty Page
WAL-->>CommitProc: 返回 TransactionId
CommitProc->>Storage: apply(commands, mode)
Storage->>PageCache: 更新节点/属性页面
PageCache->>PageCache: 修改 Page Buffer
Storage-->>CommitProc: 应用完成
CommitProc-->>KTx: 返回 TransactionId
KTx->>KTx: 触发 afterCommit 事件
KTx->>Locks: lockClient.releaseAll()
Locks->>Locks: 释放所有锁
KTx->>TxState: 清空 TxState
KTx->>KTx: pool.release(this)
KTx-->>User: 返回 TransactionId
Note over PageCache: Checkpoint 线程<br/>定期刷新 Dirty Pages
时序图说明
事务启动阶段(步骤 1-7)
- 对象池化:从 KernelTransactions 池获取实例,避免频繁 GC
- 锁客户端初始化:绑定事务序列号,用于死锁检测
- TxState 创建:分配内存缓冲区,初始容量较小,按需扩展
数据写入阶段(步骤 8-13)
- 锁获取时机:写操作前自动获取排他锁,阻塞其他事务的读写
- TxState 记录:所有变更仅存在内存中,提交前不可见于其他事务
- 约束验证:属性修改时检查唯一性约束(通过索引查找)
提交阶段(步骤 14-26)
- PreCommit 验证:检查约束、触发事务监听器
- 命令生成:将 TxState 转换为 StorageCommand(节点/关系/属性命令)
- WAL 写入:顺序写入事务日志,保证持久化(fsync 可配置)
- 存储应用:更新 NeoStores 页面(节点/关系/属性存储)
- AfterCommit 事件:触发索引更新、统计信息刷新
- 资源释放:释放所有锁、清空 TxState、归还对象池
性能优化点
- 批量提交:Group Commit 机制合并多个事务的 WAL 写入
- 锁粒度:实体级锁,避免表级锁的串行化瓶颈
- 页面预取:提交前预取相关页面到 Page Cache,减少 I/O 等待
- 对象复用:KernelTransaction、Cursor、TxState 均池化复用
时序图 2:并发事务与死锁检测
sequenceDiagram
autonumber
participant Tx1 as Transaction 1
participant Tx2 as Transaction 2
participant Locks as LockManager
participant Deadlock as DeadlockDetector
Tx1->>Locks: acquireExclusive(Node 100)
Locks-->>Tx1: 锁获取成功
Tx2->>Locks: acquireExclusive(Node 200)
Locks-->>Tx2: 锁获取成功
Note over Tx1,Tx2: 两个事务持有不同节点的锁
Tx1->>Locks: acquireExclusive(Node 200)
Locks->>Locks: 检测冲突(Tx2 持有锁)
Locks->>Tx1: 阻塞等待
Tx2->>Locks: acquireExclusive(Node 100)
Locks->>Locks: 检测冲突(Tx1 持有锁)
Locks->>Deadlock: 构建等待图
Deadlock->>Deadlock: 检测到环:Tx1 → Tx2 → Tx1
Deadlock->>Deadlock: 选择代价最小的事务(Tx2)
Deadlock->>Tx2: 抛出 DeadlockDetectedException
Tx2->>Tx2: 自动回滚
Tx2->>Locks: lockClient.releaseAll()
Locks->>Tx1: 唤醒等待
Locks-->>Tx1: 锁获取成功
Tx1->>Tx1: 继续执行
Tx1->>Locks: commit()
Locks->>Locks: releaseAll()
死锁检测说明
死锁检测算法
- 等待图构建:每个事务维护等待的锁资源与持有者
- 环检测:定期扫描等待图,查找环路
- 受害者选择:选择代价最小的事务回滚(基于执行时间、锁数量)
- 自动重试:客户端驱动自动重试被回滚的事务
避免死锁的最佳实践
- 锁排序:总是按固定顺序获取锁(如按节点 ID 升序)
- 短事务:减少锁持有时间,降低死锁概率
- 读写分离:只读查询不获取排他锁,避免阻塞
关键函数详解
函数 1:initialize(事务初始化)
函数签名
public KernelTransactionImplementation initialize(
long lastCommittedTx,
Type type,
SecurityContext frozenSecurityContext,
TransactionTimeout transactionTimeout,
long transactionSequenceNumber,
ClientConnectionInfo clientInfo,
ProcedureView procedureView
)
参数说明
| 参数 | 类型 | 说明 |
|---|---|---|
lastCommittedTx |
long |
上一个已提交事务的 ID(用于快照隔离) |
type |
Type |
事务类型(EXPLICIT/IMPLICIT) |
frozenSecurityContext |
SecurityContext |
冻结的安全上下文(不可变) |
transactionTimeout |
TransactionTimeout |
超时配置 |
transactionSequenceNumber |
long |
事务序列号(全局唯一) |
clientInfo |
ClientConnectionInfo |
客户端连接信息 |
procedureView |
ProcedureView |
存储过程视图(快照) |
核心代码
public KernelTransactionImplementation initialize(...) {
// 1. 重置内存状态
assert transactionMemoryPool.usedHeap() == 0;
assert transactionMemoryPool.usedNative() == 0;
// 2. 创建游标上下文(绑定到当前线程)
CURSOR_CONTEXT_HANDLE.setRelease(
this,
contextFactory.create(TRANSACTION_TAG)
);
this.transactionalCursors.reset(cursorContext);
// 3. 初始化访问能力(只读/读写)
this.accessCapability = accessCapabilityFactory.newAccessCapability(
readOnlyDatabaseChecker
);
// 4. 初始化锁客户端
this.leaseClient = leaseService.newClient();
this.lockClient.initialize(
leaseClient,
transactionSequenceNumber,
memoryTracker,
config
);
// 5. 重置事务状态
this.terminationMark = null;
this.commit = false;
this.writeState = TransactionWriteState.NONE;
this.startTimeMillis = clocks.systemClock().millis();
this.startTimeNanos = clocks.systemClock().nanos();
this.timeout = transactionTimeout;
this.lastTransactionIdWhenStarted = lastCommittedTx;
// 6. 启动事务追踪
this.transactionEvent = transactionTracer.beginTransaction(
cursorContext,
transactionSequenceNumber
);
// 7. 设置安全上下文
this.overridableSecurityContext = new OverridableSecurityContext(
frozenSecurityContext
);
// 8. 初始化操作接口
this.commandCreationContext.initialize(
kernelVersionProvider,
cursorContext,
transactionalCursors,
kernelTransactions::startTimeOfOldestExecutingTransaction,
lockClient,
currentStatement::lockTracer
);
this.currentStatement.initialize(lockClient, cursorContext, startTimeMillis);
this.operations.initialize(cursorContext);
// 9. 设置内存限制
this.transactionMemoryPool.setLimit(transactionHeapBytesLimit);
// 10. 设置事务序列号(volatile 写,happens-before)
this.transactionSequenceNumber = transactionSequenceNumber;
this.closing = false;
this.closed = false;
return this;
}
关键点说明
- 对象池化:此方法在对象从池中取出后调用,重置所有可变状态
- 线程绑定:CursorContext 绑定到执行线程,PageCache 追踪依赖此绑定
- 内存追踪:每个事务分配独立的内存池,超限时抛出异常
- Happens-Before:
transactionSequenceNumber使用 volatile 写,保证可见性
函数 2:release(事务资源释放)
函数签名
private void release()
核心代码
private void release() {
try {
// 1. 释放锁
lockClient.close();
// 2. 释放 Lease
if (leaseClient != NO_LEASE) {
leaseClient.close();
}
// 3. 关闭游标
transactionalCursors.close();
// 4. 关闭存储读取器
if (storageReader != null) {
storageReader.close();
}
// 5. 释放命令创建上下文
commandCreationContext.close();
// 6. 清空 TxState
if (txState != null) {
txState.release();
txState = null;
}
// 7. 释放内存
transactionMemoryPool.reset();
// 8. 停止事务追踪
if (transactionEvent != null) {
transactionEvent.close();
transactionEvent = null;
}
// 9. 归还对象到池
pool.release(this);
} catch (Exception e) {
// 标记为失败清理,此对象不应再被复用
failedCleanup = true;
throw e;
}
}
资源释放顺序说明
- 锁优先释放:避免阻塞其他事务
- 游标关闭:释放 Page Cache 引用,允许页面驱逐
- 存储读取器关闭:释放快照引用
- TxState 清空:释放内存缓冲区(可能占用大量内存)
- 对象归还池:标记对象可复用,避免 GC
最佳实践与性能优化
实践 1:批量操作优化
问题:逐条创建节点性能低下
// ❌ 低效实现:每次循环都提交事务
for (int i = 0; i < 100000; i++) {
try (Transaction tx = db.beginTx()) {
tx.createNode(Label.label("Person"));
tx.commit();
}
}
优化方案:批量提交
// ✅ 高效实现:批量提交
int batchSize = 10000;
for (int i = 0; i < 100000; i++) {
if (i % batchSize == 0) {
if (tx != null) {
tx.commit();
}
tx = db.beginTx();
}
tx.createNode(Label.label("Person"));
}
if (tx != null) {
tx.commit();
}
性能提升:100 倍以上(减少事务开销与锁竞争)
实践 2:避免长事务
问题:长事务占用锁资源,阻塞其他事务
// ❌ 长事务:持有锁数小时
try (Transaction tx = db.beginTx()) {
for (Node node : tx.getAllNodes()) {
// 处理每个节点(可能数百万个)
processNode(node);
}
tx.commit();
}
优化方案:分批处理
// ✅ 分批处理:每批处理 1000 个节点
long lastId = 0;
while (true) {
List<Long> nodeIds = new ArrayList<>();
// 只读事务:获取节点 ID
try (Transaction tx = db.beginTx()) {
Result result = tx.execute(
"MATCH (n) WHERE id(n) > $lastId " +
"RETURN id(n) ORDER BY id(n) LIMIT 1000",
Map.of("lastId", lastId)
);
while (result.hasNext()) {
nodeIds.add((Long) result.next().get("id(n)"));
}
tx.commit();
}
if (nodeIds.isEmpty()) break;
// 写事务:处理节点
try (Transaction tx = db.beginTx()) {
for (long nodeId : nodeIds) {
Node node = tx.getNodeById(nodeId);
processNode(node);
}
tx.commit();
}
lastId = nodeIds.get(nodeIds.size() - 1);
}
实践 3:配置优化
关键配置项
# 事务超时(防止死锁与长事务)
db.transaction.timeout=60s
# 锁获取超时(快速失败,避免长时间阻塞)
db.lock.acquisition.timeout=30s
# 事务内存上限(防止 OOM)
db.memory.transaction.max=50m
# 并发事务数(根据 CPU 核心数调整)
db.transaction.concurrent.maximum=1000
框架使用示例
示例 1:嵌入式模式启动
import org.neo4j.dbms.api.DatabaseManagementService;
import org.neo4j.dbms.api.DatabaseManagementServiceBuilder;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.Transaction;
public class EmbeddedNeo4j {
public static void main(String[] args) {
// 1. 创建数据库管理服务
DatabaseManagementService managementService =
new DatabaseManagementServiceBuilder(
Paths.get("/path/to/neo4j-db")
)
.setConfig(GraphDatabaseSettings.pagecache_memory, "512M")
.setConfig(GraphDatabaseSettings.transaction_timeout, Duration.ofSeconds(60))
.build();
// 2. 注册关闭钩子
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
managementService.shutdown();
}));
// 3. 获取数据库实例
GraphDatabaseService db = managementService.database("neo4j");
// 4. 执行事务
try (Transaction tx = db.beginTx()) {
Node person = tx.createNode(Label.label("Person"));
person.setProperty("name", "Alice");
tx.commit();
}
}
}
示例 2:自定义事务监听器
import org.neo4j.kernel.internal.event.TransactionEventListeners;
import org.neo4j.graphdb.event.TransactionData;
import org.neo4j.graphdb.event.TransactionEventListenerAdapter;
public class AuditListener extends TransactionEventListenerAdapter<Void> {
@Override
public Void beforeCommit(TransactionData data, Transaction transaction,
GraphDatabaseService databaseService) {
// 审计日志:记录变更
for (Node node : data.createdNodes()) {
System.out.println("Created node: " + node.getElementId());
}
for (Node node : data.deletedNodes()) {
System.out.println("Deleted node: " + node.getElementId());
}
return null;
}
@Override
public void afterCommit(TransactionData data, Void state,
GraphDatabaseService databaseService) {
System.out.println("Transaction committed successfully");
}
@Override
public void afterRollback(TransactionData data, Void state,
GraphDatabaseService databaseService) {
System.out.println("Transaction rolled back");
}
}
// 注册监听器
db.registerTransactionEventListener(new AuditListener());
模块总结
Neo4j 的 Kernel 模块是整个数据库的核心,负责事务生命周期管理、并发控制、数据访问 API 的实现。其设计充分体现了以下特点:
- 高性能优化:对象池化、游标复用、批量提交、页面预取
- ACID 保证:WAL 持久化、两阶段提交、实体级锁
- 可扩展性:事务监听器、自定义存储过程、插件化索引提供者
- 可观测性:详细的事务统计、监控指标、追踪信息
理解 Kernel 模块的实现细节,是深入掌握 Neo4j 源码的基础。后续模块(存储引擎、查询引擎、网络协议)均建立在 Kernel 提供的事务语义之上。