Neo4j-02-存储层模块-RecordStorage与PageCache
模块概览
职责与边界
核心职责
-
数据持久化
- 节点(Node)、关系(Relationship)、属性(Property)的定长记录存储
- 动态记录存储(长字符串、大数组、标签列表)
- Token 映射(标签/关系类型/属性键的字符串与整数映射)
-
Page Cache 管理
- 固定大小页面(默认 8KB)的统一 I/O 抽象
- LRU 驱逐策略与访问热度追踪
- Dirty Page 刷新与 Checkpoint 机制
- 页面锁管理(读锁/写锁)
-
Write-Ahead Log(WAL)
- 事务日志顺序写入
- 崩溃恢复(Crash Recovery)
- 日志轮转与归档
- Checkpoint 协调
-
存储格式定义
- Record Format 抽象(支持多种格式版本)
- 定长记录编码/解码
- 版本演进与向后兼容
输入/输出
输入
- StorageCommand 列表(来自 Kernel 层事务提交)
- 读取请求(来自 Kernel 层 Read API)
- Checkpoint 触发(来自 CheckpointScheduler)
输出
- 存储文件(.db 文件,14 种类型)
- 事务日志文件(neostore.transaction.db.xxx)
- ID 文件(.id 文件,记录下一个可用 ID)
- 崩溃恢复结果(LogPosition、RecoveredTransactions)
上下游依赖
上游依赖(调用者)
kernel模块(事务提交、数据读写)cypher模块(查询执行)import-tool模块(批量导入)
下游依赖(被调用)
io模块(PageCache、FileSystem)id-generator模块(IdGeneratorFactory)lock模块(LockService)configuration模块(Config)
生命周期
- 启动阶段:
RecordStorageEngine.init()→ 打开 NeoStores、恢复 WAL - 运行时:
RecordStorageEngine.start()→ 启动 Checkpoint 线程 - 事务应用:
RecordStorageEngine.apply(batch, mode)→ 更新存储页面 - Checkpoint:
RecordStorageEngine.checkpoint(flushEvent, context)→ 刷新所有 Dirty Pages - 关闭阶段:
RecordStorageEngine.shutdown()→ 最后一次 Checkpoint、关闭文件
模块架构图
flowchart TB
subgraph KernelLayer["Kernel 层"]
KTx[KernelTransaction]
CommitProc[TransactionCommitProcess]
end
subgraph StorageEngine["Storage Engine"]
RecordEngine[RecordStorageEngine]
StorageReader[StorageReader]
CommandCreator[CommandCreationContext]
subgraph WALSystem["WAL 系统"]
TxLog[TransactionLog]
LogAppender[LogAppender]
LogRotation[LogRotation]
Recovery[LogRecovery]
end
end
subgraph NeoStoresLayer["NeoStores 层"]
NeoStores[NeoStores<br/>存储容器]
subgraph CoreStores["核心存储"]
NodeStore[NodeStore<br/>节点存储]
RelStore[RelationshipStore<br/>关系存储]
PropStore[PropertyStore<br/>属性存储]
RelGroupStore[RelationshipGroupStore<br/>关系组存储]
end
subgraph TokenStores["Token 存储"]
LabelTokenStore[LabelTokenStore]
RelTypeTokenStore[RelTypeTokenStore]
PropKeyTokenStore[PropKeyTokenStore]
end
subgraph DynamicStores["动态存储"]
PropStringStore[PropertyStringStore<br/>长字符串]
PropArrayStore[PropertyArrayStore<br/>大数组]
NodeLabelStore[NodeLabelStore<br/>标签列表]
end
subgraph MetaStores["元数据存储"]
SchemaStore[SchemaStore<br/>模式定义]
MetaDataStore[MetaDataStore<br/>数据库元信息]
end
end
subgraph PageCacheLayer["Page Cache 层"]
PageCache[PageCache<br/>页面缓存]
PagedFile[PagedFile<br/>文件映射]
PageCursor[PageCursor<br/>页面游标]
subgraph CacheManagement["缓存管理"]
EvictionThread[EvictionThread<br/>驱逐线程]
PageFaulter[PageFaulter<br/>缺页处理]
DirtyTracker[DirtyTracker<br/>脏页追踪]
end
end
subgraph IOLayer["I/O 层"]
FileSystem[FileSystem<br/>文件系统抽象]
PageSwapper[PageSwapper<br/>页面交换器]
DiskIO[(磁盘存储)]
end
subgraph Support["支撑组件"]
IdGen[IdGeneratorFactory<br/>ID 生成器]
RecordFormat[RecordFormats<br/>存储格式]
CheckpointScheduler[CheckpointScheduler<br/>Checkpoint 调度]
end
%% 连接关系
KTx --> CommitProc
CommitProc --> RecordEngine
RecordEngine --> NeoStores
RecordEngine --> WALSystem
RecordEngine --> CommandCreator
TxLog --> LogAppender
TxLog --> Recovery
LogAppender --> PageCache
NeoStores --> CoreStores
NeoStores --> TokenStores
NeoStores --> DynamicStores
NeoStores --> MetaStores
NodeStore --> PageCache
RelStore --> PageCache
PropStore --> PageCache
RelGroupStore --> PageCache
PageCache --> PagedFile
PagedFile --> PageCursor
PageCache --> CacheManagement
PageCursor --> PageSwapper
PageSwapper --> FileSystem
FileSystem --> DiskIO
NeoStores --> IdGen
NeoStores --> RecordFormat
RecordEngine --> CheckpointScheduler
架构说明
1. Storage Engine 层
- RecordStorageEngine:存储引擎主入口,实现
StorageEngine接口 - StorageReader:提供只读快照,支持事务隔离
- CommandCreationContext:将 TxState 转换为 StorageCommand
2. NeoStores 层(14 种存储文件)
核心存储
- NodeStore:节点记录(ID、标签字段、第一个属性/关系指针)
- 文件:
neostore.nodestore.db - 记录大小:15 字节(定长)
- 文件:
- RelationshipStore:关系记录(ID、类型、起点/终点、属性指针)
- 文件:
neostore.relationshipstore.db - 记录大小:34 字节(定长)
- 文件:
- PropertyStore:属性记录(ID、类型、值/指针、下一个属性指针)
- 文件:
neostore.propertystore.db - 记录大小:41 字节(定长)
- 文件:
- RelationshipGroupStore:关系组(Dense 节点优化)
- 文件:
neostore.relationshipgroupstore.db - 记录大小:25 字节(定长)
- 文件:
Token 存储
- LabelTokenStore:标签 ID → 标签名
- RelTypeTokenStore:关系类型 ID → 类型名
- PropKeyTokenStore:属性键 ID → 键名
动态存储
- PropertyStringStore:长字符串(> 64 字节)
- PropertyArrayStore:大数组(> 64 字节)
- NodeLabelStore:节点标签列表(> 5 个标签)
元数据存储
- SchemaStore:索引与约束定义
- MetaDataStore:数据库版本、创建时间、Store ID
3. Page Cache 层
核心组件
- PageCache:页面缓存主控制器,管理所有映射文件
- PagedFile:单个文件的页面映射,持有 PageSwapper
- PageCursor:页面访问游标,提供
getByte/putByte/getLong/putLong等方法
缓存管理
- EvictionThread:后台线程,定期扫描页面访问热度并驱逐冷页面
- PageFaulter:处理缺页中断(Page Fault),从磁盘加载页面到内存
- DirtyTracker:追踪脏页面(已修改但未刷新到磁盘)
4. WAL 系统
组件
- TransactionLog:事务日志管理器,维护日志文件列表
- LogAppender:追加事务命令到当前日志文件
- LogRotation:日志文件轮转(达到阈值时创建新文件)
- LogRecovery:崩溃恢复,重放未应用的事务
核心数据结构
UML 类图
classDiagram
class NodeRecord {
+long id
+long nextProp
+long nextRel
+long labels
+boolean dense
+List~DynamicRecord~ dynamicLabelRecords
+initialize(inUse, nextProp, dense, nextRel, labels)
+getNextRel() long
+setNextRel(nextRel)
+getLabels() long
+isDense() boolean
}
class RelationshipRecord {
+long id
+long firstNode
+long secondNode
+int type
+long firstPrevRel
+long firstNextRel
+long secondPrevRel
+long secondNextRel
+long nextProp
+boolean firstInFirstChain
+boolean firstInSecondChain
+initialize(...)
+getFirstNode() long
+getSecondNode() long
+getType() int
}
class PropertyRecord {
+long id
+long nextProp
+List~PropertyBlock~ propertyBlocks
+addPropertyBlock(PropertyBlock)
+getPropertyBlock(int) PropertyBlock
+numberOfProperties() int
}
class PropertyBlock {
+int keyId
+PropertyType type
+long[] valueBlocks
+List~DynamicRecord~ valueRecords
+getSingleValueLong() long
+getSingleValueInt() int
+getValueRecords() List~DynamicRecord~
}
class DynamicRecord {
+long id
+byte[] data
+long nextBlock
+int length
+boolean startRecord
+getData() byte[]
+getNextBlock() long
+isStartRecord() boolean
}
class PageCursor {
<<interface>>
+next() boolean
+next(long pageId) boolean
+shouldRetry() boolean
+getByte() byte
+getByte(int offset) byte
+putByte(byte value)
+putByte(int offset, byte value)
+getLong() long
+putLong(long value)
+getOffset() int
+setOffset(int offset)
}
class PagedFile {
<<interface>>
+io(long pageId, int flags, CursorContext) PageCursor
+pageSize() int
+fileSize() long
+flushAndForce() void
+close() void
}
class StorageCommand {
<<interface>>
+handle(CommandVisitor) void
}
class NodeCommand {
-NodeRecord before
-NodeRecord after
+getBefore() NodeRecord
+getAfter() NodeRecord
+handle(CommandVisitor) void
}
class RelationshipCommand {
-RelationshipRecord before
-RelationshipRecord after
+getBefore() RelationshipRecord
+getAfter() RelationshipRecord
}
PropertyRecord --> PropertyBlock
PropertyBlock --> DynamicRecord
NodeRecord --> DynamicRecord
StorageCommand <|-- NodeCommand
StorageCommand <|-- RelationshipCommand
NodeCommand --> NodeRecord
RelationshipCommand --> RelationshipRecord
PagedFile --> PageCursor
字段说明
NodeRecord(节点记录)
| 字段 | 类型 | 大小 | 说明 |
|---|---|---|---|
id |
long |
8 字节 | 节点唯一 ID |
nextProp |
long |
5 字节(压缩) | 第一个属性记录的 ID(链表头) |
nextRel |
long |
5 字节(压缩) | 第一个关系记录的 ID(链表头) |
labels |
long |
5 字节(压缩) | 内联标签或动态记录 ID |
dense |
boolean |
1 位(标志) | 是否为 Dense 节点(关系数 > 阈值) |
dynamicLabelRecords |
List<DynamicRecord> |
- | 标签列表动态记录(> 5 个标签时使用) |
Dense 节点优化
- 普通节点:关系链表直接链接到节点
- Dense 节点(关系数 > 50):使用 RelationshipGroupStore 按类型分组,减少遍历开销
RelationshipRecord(关系记录)
| 字段 | 类型 | 大小 | 说明 |
|---|---|---|---|
id |
long |
8 字节 | 关系唯一 ID |
firstNode |
long |
5 字节(压缩) | 起点节点 ID |
secondNode |
long |
5 字节(压缩) | 终点节点 ID |
type |
int |
3 字节(压缩) | 关系类型 ID |
firstPrevRel |
long |
5 字节(压缩) | firstNode 的上一个关系 ID |
firstNextRel |
long |
5 字节(压缩) | firstNode 的下一个关系 ID |
secondPrevRel |
long |
5 字节(压缩) | secondNode 的上一个关系 ID |
secondNextRel |
long |
5 字节(压缩) | secondNode 的下一个关系 ID |
nextProp |
long |
5 字节(压缩) | 第一个属性记录的 ID |
双向链表结构
- 每个节点维护其所有关系的双向链表
- 遍历节点关系时,无需扫描全局关系表
PropertyRecord(属性记录)
| 字段 | 类型 | 说明 |
|---|---|---|
id |
long |
属性记录唯一 ID |
nextProp |
long |
下一个属性记录 ID(链表) |
propertyBlocks |
List<PropertyBlock> |
属性块列表(最多 4 个) |
属性块(PropertyBlock)
- 每个 PropertyBlock 存储一个键值对
- 小值(< 8 字节)内联存储
- 大值(字符串/数组)存储到动态记录
PropertyBlock(属性块)
| 字段 | 类型 | 说明 |
|---|---|---|
keyId |
int |
属性键 ID(Token) |
type |
PropertyType |
属性类型(INT、STRING、ARRAY 等) |
valueBlocks |
long[] |
值存储(内联或指针) |
valueRecords |
List<DynamicRecord> |
动态记录(长字符串/大数组) |
属性类型
enum PropertyType {
BOOL, // 1 字节
BYTE, // 1 字节
SHORT, // 2 字节
INT, // 4 字节
LONG, // 8 字节
FLOAT, // 4 字节
DOUBLE, // 8 字节
SHORT_STRING, // 内联字符串(< 64 字节)
STRING, // 长字符串(动态记录)
SHORT_ARRAY, // 内联数组(< 64 字节)
ARRAY // 大数组(动态记录)
}
核心 API 详解
API 1:apply(应用存储命令)
基本信息
- 接口名称:
RecordStorageEngine.apply - 方法签名:
void apply(StorageEngineTransaction batch, TransactionApplicationMode mode) - 幂等性:否(重复应用会导致数据错误)
请求参数
public interface StorageEngineTransaction extends AutoCloseable {
// 获取事务ID
long getTransactionId();
// 获取命令列表
List<StorageCommand> commands();
// 获取追加索引(日志位置)
long getAppendIndex();
}
public enum TransactionApplicationMode {
// 内部应用(事务提交)
INTERNAL,
// 外部应用(WAL 恢复)
EXTERNAL,
// 恢复应用(崩溃恢复)
RECOVERY,
// 反向应用(回滚)
REVERSE
}
| 参数 | 类型 | 必填 | 说明 |
|---|---|---|---|
batch |
StorageEngineTransaction |
是 | 包含 StorageCommand 列表的事务批次 |
mode |
TransactionApplicationMode |
是 | 应用模式(INTERNAL/EXTERNAL/RECOVERY/REVERSE) |
核心代码
// community/record-storage-engine/src/main/java/org/neo4j/internal/recordstorage/RecordStorageEngine.java
public void apply(StorageEngineTransaction batch, TransactionApplicationMode mode) throws IOException {
// 1. 获取批次锁(串行化写操作)
lockService.acquireShared(
LockTracer.NONE,
ResourceTypes.SCHEMA,
ResourceIds.schemaResource()
);
try {
// 2. 应用命令到存储
for (StorageCommand command : batch.commands()) {
// 访问者模式:每种命令类型有不同的处理逻辑
command.handle(new CommandHandler() {
@Override
public void visitNodeCommand(NodeCommand command) {
// 应用节点命令
applyNodeCommand(command, mode);
}
@Override
public void visitRelationshipCommand(RelationshipCommand command) {
// 应用关系命令
applyRelationshipCommand(command, mode);
}
@Override
public void visitPropertyCommand(PropertyCommand command) {
// 应用属性命令
applyPropertyCommand(command, mode);
}
// ... 其他命令类型
});
}
// 3. 更新索引(如果是内部应用)
if (mode == TransactionApplicationMode.INTERNAL) {
indexingService.apply(batch.commands());
}
// 4. 更新统计信息
if (mode != TransactionApplicationMode.REVERSE) {
countsStore.apply(batch.getTransactionId(), batch.commands());
}
} finally {
// 5. 释放锁
lockService.releaseShared(
ResourceTypes.SCHEMA,
ResourceIds.schemaResource()
);
}
}
private void applyNodeCommand(NodeCommand command, TransactionApplicationMode mode) {
NodeRecord after = command.getAfter();
try (PageCursor cursor = pagedFile.io(
nodeStore.pageIdForRecord(after.getId()),
PagedFile.PF_SHARED_WRITE_LOCK,
cursorContext
)) {
if (cursor.next()) {
do {
// 将 NodeRecord 编码到页面
nodeStore.updateRecord(after, cursor, cursorContext, storeCursors);
} while (cursor.shouldRetry());
}
}
}
代码说明
- 批次锁:保证多个事务的命令串行应用
- 访问者模式:每种命令类型(Node/Relationship/Property)有专门的处理器
- 页面写入:通过 PageCursor 直接修改页面内容
- 索引更新:通知 IndexingService 更新索引
- 统计更新:更新节点/关系计数
API 2:PageCursor 读写操作
基本信息
- 接口名称:
PagedFile.io - 方法签名:
PageCursor io(long pageId, int flags, CursorContext context) - 幂等性:是(多次调用返回不同的游标实例)
请求参数
// PagedFile 标志位
public static final int PF_SHARED_READ_LOCK = 0x01; // 共享读锁
public static final int PF_SHARED_WRITE_LOCK = 0x02; // 共享写锁
public static final int PF_NO_GROW = 0x04; // 不扩展文件
public static final int PF_NO_FAULT = 0x08; // 不触发缺页
| 参数 | 类型 | 必填 | 说明 |
|---|---|---|---|
pageId |
long |
是 | 页面 ID(文件内的逻辑页号) |
flags |
int |
是 | 访问标志(读锁/写锁/不扩展等) |
context |
CursorContext |
是 | 游标上下文(用于追踪) |
核心读写模式
模式 1:读取节点记录
// 读取节点记录
public NodeRecord readNode(long nodeId) {
long pageId = nodeStore.pageIdForRecord(nodeId);
int offset = nodeStore.offsetForId(nodeId);
try (PageCursor cursor = pagedFile.io(
pageId,
PagedFile.PF_SHARED_READ_LOCK,
cursorContext
)) {
if (cursor.next()) {
do {
// 定位到记录位置
cursor.setOffset(offset);
// 读取节点记录字段
byte headerByte = cursor.getByte();
boolean inUse = (headerByte & 0x01) != 0;
boolean dense = (headerByte & 0x02) != 0;
long nextRel = cursor.getLong();
long nextProp = cursor.getLong();
long labels = cursor.getLong();
// 构造 NodeRecord
return new NodeRecord(nodeId)
.initialize(inUse, nextProp, dense, nextRel, labels);
} while (cursor.shouldRetry());
}
}
return null;
}
模式 2:写入关系记录
// 写入关系记录
public void writeRelationship(RelationshipRecord record) {
long pageId = relationshipStore.pageIdForRecord(record.getId());
int offset = relationshipStore.offsetForId(record.getId());
try (PageCursor cursor = pagedFile.io(
pageId,
PagedFile.PF_SHARED_WRITE_LOCK,
cursorContext
)) {
if (cursor.next()) {
do {
// 定位到记录位置
cursor.setOffset(offset);
// 写入关系记录字段
byte headerByte = 0;
headerByte |= record.inUse() ? 0x01 : 0x00;
headerByte |= record.isFirstInFirstChain() ? 0x02 : 0x00;
headerByte |= record.isFirstInSecondChain() ? 0x04 : 0x00;
cursor.putByte(headerByte);
cursor.putLong(record.getFirstNode());
cursor.putLong(record.getSecondNode());
cursor.putInt(record.getType());
cursor.putLong(record.getFirstPrevRel());
cursor.putLong(record.getFirstNextRel());
cursor.putLong(record.getSecondPrevRel());
cursor.putLong(record.getSecondNextRel());
cursor.putLong(record.getNextProp());
// 标记页面为脏页
cursor.setDirty();
} while (cursor.shouldRetry());
}
}
}
shouldRetry 机制说明
- PageCursor 采用乐观并发控制
- 读取时不阻塞写入,但需检查
shouldRetry() - 如果页面在读取过程中被修改,
shouldRetry()返回true - 重试机制保证读取一致性快照
API 3:TransactionLog.append(追加事务日志)
基本信息
- 接口名称:
TransactionAppender.append - 方法签名:
long append(List<StorageCommand> commands, TransactionChunkAppender appender) - 幂等性:否(重复追加会导致日志重复)
请求参数
public interface TransactionChunkAppender {
// 开始新的事务块
ChunkWriter beginChunk();
// 提交事务块
void commitChunk(ChunkWriter writer) throws IOException;
}
| 参数 | 类型 | 必填 | 说明 |
|---|---|---|---|
commands |
List<StorageCommand> |
是 | 存储命令列表 |
appender |
TransactionChunkAppender |
是 | 事务块追加器 |
核心代码
// community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/log/TransactionLogAppender.java
public long append(List<StorageCommand> commands, TransactionChunkAppender appender) throws IOException {
// 1. 获取写锁(串行化日志写入)
appendLock.lock();
try {
// 2. 开始新的事务块
ChunkWriter writer = appender.beginChunk();
// 3. 写入事务头
writer.putByte(TX_START);
writer.putInt(kernelVersion.version());
writer.putLong(transactionId);
writer.putLong(commitTime);
writer.putLong(latestCommittedTxWhenStarted);
// 4. 序列化命令
for (StorageCommand command : commands) {
// 写入命令类型标记
writer.putByte(commandType(command));
// 写入命令数据
command.serialize(writer);
}
// 5. 写入事务尾
writer.putByte(TX_END);
// 6. 提交事务块(刷新到日志文件)
appender.commitChunk(writer);
// 7. 检查是否需要日志轮转
if (channel.position() >= rotationThreshold) {
logRotation.rotateLogFile(LogAppendEvent.NULL);
}
// 8. 返回追加索引(日志位置)
return currentLogPosition.getByteOffset();
} finally {
appendLock.unlock();
}
}
日志格式
+----------------+
| TX_START (1B) |
+----------------+
| Version (4B) |
+----------------+
| TxId (8B) |
+----------------+
| CommitTime(8B) |
+----------------+
| Commands... |
+----------------+
| TX_END (1B) |
+----------------+
关键流程时序图
时序图 1:节点创建完整流程
sequenceDiagram
autonumber
participant KTx as KernelTransaction
participant Ops as Operations
participant TxState as TxState
participant Commit as CommitProcess
participant Engine as RecordStorageEngine
participant NeoStores as NeoStores
participant PageCache as PageCache
participant WAL as TransactionLog
KTx->>Ops: dataWrite().nodeCreate()
Ops->>Ops: 分配节点 ID
Note over Ops: commandCreationContext.reserveNode()
Ops->>TxState: nodeDoCreate(nodeId)
TxState->>TxState: 记录节点创建
KTx->>Commit: commit(commands)
Commit->>Engine: createCommands(txState, reader, ...)
Engine->>TxState: 遍历变更集
TxState-->>Engine: 返回新增节点列表
Engine->>Engine: 生成 NodeCommand
Note over Engine: NodeCommand{before=null, after=NodeRecord}
Engine-->>Commit: 返回 StorageCommand 列表
Commit->>WAL: append(commands)
WAL->>WAL: 序列化命令
WAL->>PageCache: 写入日志页面
PageCache->>PageCache: 标记 Dirty Page
WAL-->>Commit: 返回 TransactionId
Commit->>Engine: apply(batch, INTERNAL)
Engine->>NeoStores: getNodeStore()
NeoStores-->>Engine: NodeStore 实例
Engine->>PageCache: io(pageId, PF_SHARED_WRITE_LOCK)
PageCache-->>Engine: PageCursor 实例
Engine->>Engine: cursor.next()
Engine->>Engine: cursor.setOffset(offset)
Engine->>Engine: 编码 NodeRecord 到页面
Note over Engine: cursor.putByte(headerByte)<br/>cursor.putLong(nextRel)<br/>cursor.putLong(nextProp)
Engine->>Engine: cursor.setDirty()
Engine->>Engine: cursor.close()
Note over PageCache: Checkpoint 线程<br/>定期刷新 Dirty Pages
PageCache->>PageCache: flushAndForce()
PageCache->>PageCache: 遍历 Dirty Pages
PageCache->>PageCache: pageSwapper.write(page, fileOffset)
时序图说明
节点创建阶段(步骤 1-4)
- ID 分配:从 IdGeneratorFactory 获取下一个可用节点 ID
- TxState 记录:在内存中记录节点创建,提交前对其他事务不可见
命令生成阶段(步骤 5-10)
- 遍历 TxState:提取所有变更(新增节点/关系/属性)
- 生成 NodeCommand:包含 before(null)和 after(NodeRecord)两个状态
- 序列化命令:将命令列表转换为字节流
WAL 写入阶段(步骤 11-15)
- 串行写入:通过 appendLock 保证日志顺序写入
- 批量刷新:Group Commit 机制合并多个事务的日志写入
- 页面标记:写入的日志页面标记为 Dirty
存储应用阶段(步骤 16-26)
- 批次锁:获取存储锁,防止并发写入冲突
- 页面定位:计算节点记录所在的页面 ID 与偏移量
- 编码写入:将 NodeRecord 编码为字节并写入页面
- 脏页标记:写入的页面标记为 Dirty,等待 Checkpoint 刷新
Checkpoint 刷新(步骤 27-30)
- 后台线程:CheckpointScheduler 定期触发 Checkpoint
- 刷新所有 Dirty Pages:遍历所有映射文件,刷新脏页面到磁盘
- fsync 保证:调用
fileChannel.force(true)保证持久化
时序图 2:PageCache 缺页处理
sequenceDiagram
autonumber
participant Cursor as PageCursor
participant PagedFile as PagedFile
participant Cache as PageCache
participant Faulter as PageFaulter
participant Evictor as EvictionThread
participant Swapper as PageSwapper
participant Disk as 磁盘
Cursor->>PagedFile: next(pageId)
PagedFile->>PagedFile: 查找页面映射表
PagedFile->>PagedFile: 页面未在内存中
PagedFile->>Cache: acquirePage(pageId)
Cache->>Cache: 查找空闲页面
Cache->>Cache: 无空闲页面
Cache->>Evictor: 触发驱逐
Evictor->>Evictor: 扫描页面访问计数
Evictor->>Evictor: 找到访问计数为 0 的页面
alt 页面是脏页
Evictor->>Swapper: write(page, fileOffset)
Swapper->>Disk: 写入磁盘
Disk-->>Swapper: 写入完成
end
Evictor->>Cache: 标记页面为空闲
Cache-->>PagedFile: 返回空闲页面
PagedFile->>Faulter: fault(page, pageId)
Faulter->>Swapper: read(pageId)
Swapper->>Disk: 读取页面数据
Disk-->>Swapper: 返回页面数据
Swapper-->>Faulter: 填充页面
Faulter->>PagedFile: 更新页面映射表
PagedFile-->>Cursor: 返回已绑定页面的游标
Cursor->>Cursor: 读取页面数据
缺页处理说明
页面查找(步骤 1-4)
- 映射表查找:PagedFile 维护 pageId → Page 的映射表
- 未命中:如果页面不在内存中,触发缺页处理
驱逐流程(步骤 5-11)
- 空闲页面查找:优先使用空闲页面列表
- 驱逐策略:扫描所有页面,选择访问计数为 0 的页面
- 脏页刷新:如果被驱逐的页面是脏页,先刷新到磁盘
页面加载(步骤 12-18)
- 从磁盘读取:通过 PageSwapper 读取指定页面
- 更新映射表:将新加载的页面加入映射表
- 返回游标:返回已绑定到页面的 PageCursor
存储文件组织
存储文件列表
| 文件名 | 存储内容 | 记录大小 | 说明 |
|---|---|---|---|
neostore.nodestore.db |
节点记录 | 15 字节 | 节点ID、标签、第一个属性/关系指针 |
neostore.relationshipstore.db |
关系记录 | 34 字节 | 关系ID、类型、起点/终点、双向链表指针 |
neostore.propertystore.db |
属性记录 | 41 字节 | 属性块(最多4个键值对) |
neostore.propertystore.db.strings |
长字符串 | 变长 | 字符串动态记录 |
neostore.propertystore.db.arrays |
大数组 | 变长 | 数组动态记录 |
neostore.relationshipgroupstore.db |
关系组 | 25 字节 | Dense 节点的关系分组 |
neostore.labeltokenstore.db |
标签 Token | 9 字节 | 标签ID → 标签名指针 |
neostore.relationshiptypestore.db |
关系类型 Token | 9 字节 | 类型ID → 类型名指针 |
neostore.propertystore.db.index.keys |
属性键 Token | 9 字节 | 键ID → 键名指针 |
neostore.schemastore.db |
模式定义 | 变长 | 索引与约束定义 |
neostore |
元数据 | 变长 | 数据库版本、创建时间、Store ID |
neostore.transaction.db.0 |
事务日志 | 变长 | 事务命令序列化 |
neostore.counts.db |
统计信息 | 变长 | 节点/关系计数、标签分布 |
*.id |
ID 生成器 | 变长 | 下一个可用ID、空闲ID列表 |
文件布局示例
/path/to/neo4j-db/
├── neostore
├── neostore.nodestore.db
├── neostore.nodestore.db.id
├── neostore.relationshipstore.db
├── neostore.relationshipstore.db.id
├── neostore.propertystore.db
├── neostore.propertystore.db.id
├── neostore.propertystore.db.strings
├── neostore.propertystore.db.strings.id
├── neostore.propertystore.db.arrays
├── neostore.propertystore.db.arrays.id
├── neostore.relationshipgroupstore.db
├── neostore.relationshipgroupstore.db.id
├── neostore.schemastore.db
├── neostore.labeltokenstore.db
├── neostore.relationshiptypestore.db
├── neostore.propertystore.db.index.keys
├── neostore.transaction.db.0
├── neostore.transaction.db.1
├── neostore.transaction.db.2
└── neostore.counts.db
最佳实践与性能优化
实践 1:Page Cache 大小配置
问题:Page Cache 过小导致频繁缺页
# ❌ 低效配置:仅 512MB
server.memory.pagecache.size=512M
优化方案:分配物理内存的 50-70%
# ✅ 推荐配置:8GB 物理内存 → 4-5GB Page Cache
server.memory.pagecache.size=4G
# 堆内存配置(剩余内存分配给堆)
server.memory.heap.initial_size=2G
server.memory.heap.max_size=2G
监控指标
# 查看 Page Cache 命中率
neo4j-admin server report --verbose
# 目标命中率:> 90%
# 如果 < 90%,增加 Page Cache 大小或优化查询
实践 2:Dense 节点优化
问题:超级节点(关系数 > 10 万)遍历缓慢
// ❌ 低效查询:全关系扫描
MATCH (user:User {id: 1})-[r]->(friend)
RETURN count(friend)
优化方案 1:按关系类型过滤
// ✅ 高效查询:仅扫描 FOLLOWS 关系
MATCH (user:User {id: 1})-[r:FOLLOWS]->(friend)
RETURN count(friend)
优化方案 2:配置 Dense 节点阈值
# 降低 Dense 节点阈值(默认 50)
db.relationship_grouping_threshold=20
Dense 节点判断
- 关系数 ≤ 阈值:普通节点,关系链表直接挂在节点上
- 关系数 > 阈值:Dense 节点,使用 RelationshipGroupStore 分组
实践 3:批量导入优化
问题:逐条插入性能低下
// ❌ 低效实现:每条执行一次事务
UNWIND $data AS row
CREATE (n:Person {name: row.name, age: row.age})
优化方案:使用 neo4j-admin database import
# ✅ 批量导入工具(离线)
neo4j-admin database import full \
--nodes=Person=persons.csv \
--relationships=KNOWS=knows.csv \
--overwrite-destination \
neo4j
# 性能提升:100 倍以上
# 原理:绕过事务层,直接写入存储文件
在线批量导入
// 批量大小:10000
:auto USING PERIODIC COMMIT 10000
LOAD CSV WITH HEADERS FROM 'file:///persons.csv' AS row
CREATE (n:Person {name: row.name, age: toInteger(row.age)})
实践 4:Checkpoint 配置优化
问题:Checkpoint 间隔过长,崩溃恢复时间长
# ❌ 低效配置:30 分钟一次 Checkpoint
db.checkpoint.interval.time=30m
优化方案:缩短 Checkpoint 间隔
# ✅ 推荐配置:5 分钟一次 Checkpoint
db.checkpoint.interval.time=5m
# 或基于事务日志大小触发
db.checkpoint.interval.tx=10000
权衡说明
- 频繁 Checkpoint:减少崩溃恢复时间,但增加 I/O 开销
- 稀疏 Checkpoint:减少 I/O 开销,但增加崩溃恢复时间
框架使用示例
示例 1:直接访问 Page Cache
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.io.pagecache.PagedFile;
import org.neo4j.io.pagecache.PageCursor;
public class PageCacheExample {
public void readNodeRecord(PageCache pageCache, Path nodeStoreFile) throws IOException {
// 1. 映射文件到 Page Cache
try (PagedFile pagedFile = pageCache.map(
nodeStoreFile,
8192, // 页面大小
"neo4j",
ImmutableSet.of(StandardOpenOption.READ)
)) {
// 2. 创建游标
try (PageCursor cursor = pagedFile.io(
0, // 页面 ID
PagedFile.PF_SHARED_READ_LOCK,
cursorContext
)) {
// 3. 读取页面
if (cursor.next()) {
do {
// 读取节点记录
byte headerByte = cursor.getByte();
boolean inUse = (headerByte & 0x01) != 0;
if (inUse) {
long nextRel = cursor.getLong();
long nextProp = cursor.getLong();
System.out.println("Node: nextRel=" + nextRel + ", nextProp=" + nextProp);
}
} while (cursor.shouldRetry());
}
}
}
}
}
示例 2:自定义存储扩展
import org.neo4j.storageengine.api.StorageEngine;
import org.neo4j.storageengine.api.StorageEngineFactory;
@ServiceProvider
public class CustomStorageEngineFactory implements StorageEngineFactory {
@Override
public String name() {
return "custom-storage";
}
@Override
public StorageEngine instantiate(
FileSystemAbstraction fs,
DatabaseLayout layout,
Config config,
PageCache pageCache,
TokenHolders tokenHolders,
...
) {
// 返回自定义存储引擎实现
return new CustomStorageEngine(...);
}
}
模块总结
Neo4j 的存储层模块是数据库性能的基石,通过以下设计实现高效持久化:
- 原生图存储:节点/关系/属性直接映射到定长记录,无需 JOIN 操作
- Page Cache 抽象:统一 I/O 接口,支持多种存储后端(本地磁盘、S3等)
- WAL 机制:保证 ACID 事务的持久性与崩溃恢复能力
- 链式存储:关系链表减少索引开销,遍历性能优异
理解存储层的实现细节,是进行性能调优、扩展开发、故障诊断的前提。