Neo4j-02-存储层模块-RecordStorage与PageCache


模块概览

职责与边界

核心职责

  1. 数据持久化

    • 节点(Node)、关系(Relationship)、属性(Property)的定长记录存储
    • 动态记录存储(长字符串、大数组、标签列表)
    • Token 映射(标签/关系类型/属性键的字符串与整数映射)
  2. Page Cache 管理

    • 固定大小页面(默认 8KB)的统一 I/O 抽象
    • LRU 驱逐策略与访问热度追踪
    • Dirty Page 刷新与 Checkpoint 机制
    • 页面锁管理(读锁/写锁)
  3. Write-Ahead Log(WAL)

    • 事务日志顺序写入
    • 崩溃恢复(Crash Recovery)
    • 日志轮转与归档
    • Checkpoint 协调
  4. 存储格式定义

    • Record Format 抽象(支持多种格式版本)
    • 定长记录编码/解码
    • 版本演进与向后兼容

输入/输出

输入

  • StorageCommand 列表(来自 Kernel 层事务提交)
  • 读取请求(来自 Kernel 层 Read API)
  • Checkpoint 触发(来自 CheckpointScheduler)

输出

  • 存储文件(.db 文件,14 种类型)
  • 事务日志文件(neostore.transaction.db.xxx)
  • ID 文件(.id 文件,记录下一个可用 ID)
  • 崩溃恢复结果(LogPosition、RecoveredTransactions)

上下游依赖

上游依赖(调用者)

  • kernel 模块(事务提交、数据读写)
  • cypher 模块(查询执行)
  • import-tool 模块(批量导入)

下游依赖(被调用)

  • io 模块(PageCache、FileSystem)
  • id-generator 模块(IdGeneratorFactory)
  • lock 模块(LockService)
  • configuration 模块(Config)

生命周期

  1. 启动阶段RecordStorageEngine.init() → 打开 NeoStores、恢复 WAL
  2. 运行时RecordStorageEngine.start() → 启动 Checkpoint 线程
  3. 事务应用RecordStorageEngine.apply(batch, mode) → 更新存储页面
  4. CheckpointRecordStorageEngine.checkpoint(flushEvent, context) → 刷新所有 Dirty Pages
  5. 关闭阶段RecordStorageEngine.shutdown() → 最后一次 Checkpoint、关闭文件

模块架构图

flowchart TB
    subgraph KernelLayer["Kernel 层"]
        KTx[KernelTransaction]
        CommitProc[TransactionCommitProcess]
    end
    
    subgraph StorageEngine["Storage Engine"]
        RecordEngine[RecordStorageEngine]
        StorageReader[StorageReader]
        CommandCreator[CommandCreationContext]
        
        subgraph WALSystem["WAL 系统"]
            TxLog[TransactionLog]
            LogAppender[LogAppender]
            LogRotation[LogRotation]
            Recovery[LogRecovery]
        end
    end
    
    subgraph NeoStoresLayer["NeoStores 层"]
        NeoStores[NeoStores<br/>存储容器]
        
        subgraph CoreStores["核心存储"]
            NodeStore[NodeStore<br/>节点存储]
            RelStore[RelationshipStore<br/>关系存储]
            PropStore[PropertyStore<br/>属性存储]
            RelGroupStore[RelationshipGroupStore<br/>关系组存储]
        end
        
        subgraph TokenStores["Token 存储"]
            LabelTokenStore[LabelTokenStore]
            RelTypeTokenStore[RelTypeTokenStore]
            PropKeyTokenStore[PropKeyTokenStore]
        end
        
        subgraph DynamicStores["动态存储"]
            PropStringStore[PropertyStringStore<br/>长字符串]
            PropArrayStore[PropertyArrayStore<br/>大数组]
            NodeLabelStore[NodeLabelStore<br/>标签列表]
        end
        
        subgraph MetaStores["元数据存储"]
            SchemaStore[SchemaStore<br/>模式定义]
            MetaDataStore[MetaDataStore<br/>数据库元信息]
        end
    end
    
    subgraph PageCacheLayer["Page Cache 层"]
        PageCache[PageCache<br/>页面缓存]
        PagedFile[PagedFile<br/>文件映射]
        PageCursor[PageCursor<br/>页面游标]
        
        subgraph CacheManagement["缓存管理"]
            EvictionThread[EvictionThread<br/>驱逐线程]
            PageFaulter[PageFaulter<br/>缺页处理]
            DirtyTracker[DirtyTracker<br/>脏页追踪]
        end
    end
    
    subgraph IOLayer["I/O 层"]
        FileSystem[FileSystem<br/>文件系统抽象]
        PageSwapper[PageSwapper<br/>页面交换器]
        DiskIO[(磁盘存储)]
    end
    
    subgraph Support["支撑组件"]
        IdGen[IdGeneratorFactory<br/>ID 生成器]
        RecordFormat[RecordFormats<br/>存储格式]
        CheckpointScheduler[CheckpointScheduler<br/>Checkpoint 调度]
    end
    
    %% 连接关系
    KTx --> CommitProc
    CommitProc --> RecordEngine
    
    RecordEngine --> NeoStores
    RecordEngine --> WALSystem
    RecordEngine --> CommandCreator
    
    TxLog --> LogAppender
    TxLog --> Recovery
    LogAppender --> PageCache
    
    NeoStores --> CoreStores
    NeoStores --> TokenStores
    NeoStores --> DynamicStores
    NeoStores --> MetaStores
    
    NodeStore --> PageCache
    RelStore --> PageCache
    PropStore --> PageCache
    RelGroupStore --> PageCache
    
    PageCache --> PagedFile
    PagedFile --> PageCursor
    PageCache --> CacheManagement
    
    PageCursor --> PageSwapper
    PageSwapper --> FileSystem
    FileSystem --> DiskIO
    
    NeoStores --> IdGen
    NeoStores --> RecordFormat
    RecordEngine --> CheckpointScheduler

架构说明

1. Storage Engine 层

  • RecordStorageEngine:存储引擎主入口,实现 StorageEngine 接口
  • StorageReader:提供只读快照,支持事务隔离
  • CommandCreationContext:将 TxState 转换为 StorageCommand

2. NeoStores 层(14 种存储文件)

核心存储

  • NodeStore:节点记录(ID、标签字段、第一个属性/关系指针)
    • 文件:neostore.nodestore.db
    • 记录大小:15 字节(定长)
  • RelationshipStore:关系记录(ID、类型、起点/终点、属性指针)
    • 文件:neostore.relationshipstore.db
    • 记录大小:34 字节(定长)
  • PropertyStore:属性记录(ID、类型、值/指针、下一个属性指针)
    • 文件:neostore.propertystore.db
    • 记录大小:41 字节(定长)
  • RelationshipGroupStore:关系组(Dense 节点优化)
    • 文件:neostore.relationshipgroupstore.db
    • 记录大小:25 字节(定长)

Token 存储

  • LabelTokenStore:标签 ID → 标签名
  • RelTypeTokenStore:关系类型 ID → 类型名
  • PropKeyTokenStore:属性键 ID → 键名

动态存储

  • PropertyStringStore:长字符串(> 64 字节)
  • PropertyArrayStore:大数组(> 64 字节)
  • NodeLabelStore:节点标签列表(> 5 个标签)

元数据存储

  • SchemaStore:索引与约束定义
  • MetaDataStore:数据库版本、创建时间、Store ID

3. Page Cache 层

核心组件

  • PageCache:页面缓存主控制器,管理所有映射文件
  • PagedFile:单个文件的页面映射,持有 PageSwapper
  • PageCursor:页面访问游标,提供 getByte/putByte/getLong/putLong 等方法

缓存管理

  • EvictionThread:后台线程,定期扫描页面访问热度并驱逐冷页面
  • PageFaulter:处理缺页中断(Page Fault),从磁盘加载页面到内存
  • DirtyTracker:追踪脏页面(已修改但未刷新到磁盘)

4. WAL 系统

组件

  • TransactionLog:事务日志管理器,维护日志文件列表
  • LogAppender:追加事务命令到当前日志文件
  • LogRotation:日志文件轮转(达到阈值时创建新文件)
  • LogRecovery:崩溃恢复,重放未应用的事务

核心数据结构

UML 类图

classDiagram
    class NodeRecord {
        +long id
        +long nextProp
        +long nextRel
        +long labels
        +boolean dense
        +List~DynamicRecord~ dynamicLabelRecords
        
        +initialize(inUse, nextProp, dense, nextRel, labels)
        +getNextRel() long
        +setNextRel(nextRel)
        +getLabels() long
        +isDense() boolean
    }
    
    class RelationshipRecord {
        +long id
        +long firstNode
        +long secondNode
        +int type
        +long firstPrevRel
        +long firstNextRel
        +long secondPrevRel
        +long secondNextRel
        +long nextProp
        +boolean firstInFirstChain
        +boolean firstInSecondChain
        
        +initialize(...)
        +getFirstNode() long
        +getSecondNode() long
        +getType() int
    }
    
    class PropertyRecord {
        +long id
        +long nextProp
        +List~PropertyBlock~ propertyBlocks
        
        +addPropertyBlock(PropertyBlock)
        +getPropertyBlock(int) PropertyBlock
        +numberOfProperties() int
    }
    
    class PropertyBlock {
        +int keyId
        +PropertyType type
        +long[] valueBlocks
        +List~DynamicRecord~ valueRecords
        
        +getSingleValueLong() long
        +getSingleValueInt() int
        +getValueRecords() List~DynamicRecord~
    }
    
    class DynamicRecord {
        +long id
        +byte[] data
        +long nextBlock
        +int length
        +boolean startRecord
        
        +getData() byte[]
        +getNextBlock() long
        +isStartRecord() boolean
    }
    
    class PageCursor {
        <<interface>>
        +next() boolean
        +next(long pageId) boolean
        +shouldRetry() boolean
        +getByte() byte
        +getByte(int offset) byte
        +putByte(byte value)
        +putByte(int offset, byte value)
        +getLong() long
        +putLong(long value)
        +getOffset() int
        +setOffset(int offset)
    }
    
    class PagedFile {
        <<interface>>
        +io(long pageId, int flags, CursorContext) PageCursor
        +pageSize() int
        +fileSize() long
        +flushAndForce() void
        +close() void
    }
    
    class StorageCommand {
        <<interface>>
        +handle(CommandVisitor) void
    }
    
    class NodeCommand {
        -NodeRecord before
        -NodeRecord after
        
        +getBefore() NodeRecord
        +getAfter() NodeRecord
        +handle(CommandVisitor) void
    }
    
    class RelationshipCommand {
        -RelationshipRecord before
        -RelationshipRecord after
        
        +getBefore() RelationshipRecord
        +getAfter() RelationshipRecord
    }
    
    PropertyRecord --> PropertyBlock
    PropertyBlock --> DynamicRecord
    NodeRecord --> DynamicRecord
    
    StorageCommand <|-- NodeCommand
    StorageCommand <|-- RelationshipCommand
    
    NodeCommand --> NodeRecord
    RelationshipCommand --> RelationshipRecord
    
    PagedFile --> PageCursor

字段说明

NodeRecord(节点记录)

字段 类型 大小 说明
id long 8 字节 节点唯一 ID
nextProp long 5 字节(压缩) 第一个属性记录的 ID(链表头)
nextRel long 5 字节(压缩) 第一个关系记录的 ID(链表头)
labels long 5 字节(压缩) 内联标签或动态记录 ID
dense boolean 1 位(标志) 是否为 Dense 节点(关系数 > 阈值)
dynamicLabelRecords List<DynamicRecord> - 标签列表动态记录(> 5 个标签时使用)

Dense 节点优化

  • 普通节点:关系链表直接链接到节点
  • Dense 节点(关系数 > 50):使用 RelationshipGroupStore 按类型分组,减少遍历开销

RelationshipRecord(关系记录)

字段 类型 大小 说明
id long 8 字节 关系唯一 ID
firstNode long 5 字节(压缩) 起点节点 ID
secondNode long 5 字节(压缩) 终点节点 ID
type int 3 字节(压缩) 关系类型 ID
firstPrevRel long 5 字节(压缩) firstNode 的上一个关系 ID
firstNextRel long 5 字节(压缩) firstNode 的下一个关系 ID
secondPrevRel long 5 字节(压缩) secondNode 的上一个关系 ID
secondNextRel long 5 字节(压缩) secondNode 的下一个关系 ID
nextProp long 5 字节(压缩) 第一个属性记录的 ID

双向链表结构

  • 每个节点维护其所有关系的双向链表
  • 遍历节点关系时,无需扫描全局关系表

PropertyRecord(属性记录)

字段 类型 说明
id long 属性记录唯一 ID
nextProp long 下一个属性记录 ID(链表)
propertyBlocks List<PropertyBlock> 属性块列表(最多 4 个)

属性块(PropertyBlock)

  • 每个 PropertyBlock 存储一个键值对
  • 小值(< 8 字节)内联存储
  • 大值(字符串/数组)存储到动态记录

PropertyBlock(属性块)

字段 类型 说明
keyId int 属性键 ID(Token)
type PropertyType 属性类型(INT、STRING、ARRAY 等)
valueBlocks long[] 值存储(内联或指针)
valueRecords List<DynamicRecord> 动态记录(长字符串/大数组)

属性类型

enum PropertyType {
    BOOL,           // 1 字节
    BYTE,           // 1 字节
    SHORT,          // 2 字节
    INT,            // 4 字节
    LONG,           // 8 字节
    FLOAT,          // 4 字节
    DOUBLE,         // 8 字节
    SHORT_STRING,   // 内联字符串(< 64 字节)
    STRING,         // 长字符串(动态记录)
    SHORT_ARRAY,    // 内联数组(< 64 字节)
    ARRAY           // 大数组(动态记录)
}

核心 API 详解

API 1:apply(应用存储命令)

基本信息

  • 接口名称RecordStorageEngine.apply
  • 方法签名void apply(StorageEngineTransaction batch, TransactionApplicationMode mode)
  • 幂等性:否(重复应用会导致数据错误)

请求参数

public interface StorageEngineTransaction extends AutoCloseable {
    // 获取事务ID
    long getTransactionId();
    
    // 获取命令列表
    List<StorageCommand> commands();
    
    // 获取追加索引(日志位置)
    long getAppendIndex();
}

public enum TransactionApplicationMode {
    // 内部应用(事务提交)
    INTERNAL,
    
    // 外部应用(WAL 恢复)
    EXTERNAL,
    
    // 恢复应用(崩溃恢复)
    RECOVERY,
    
    // 反向应用(回滚)
    REVERSE
}
参数 类型 必填 说明
batch StorageEngineTransaction 包含 StorageCommand 列表的事务批次
mode TransactionApplicationMode 应用模式(INTERNAL/EXTERNAL/RECOVERY/REVERSE)

核心代码

// community/record-storage-engine/src/main/java/org/neo4j/internal/recordstorage/RecordStorageEngine.java
public void apply(StorageEngineTransaction batch, TransactionApplicationMode mode) throws IOException {
    // 1. 获取批次锁(串行化写操作)
    lockService.acquireShared(
        LockTracer.NONE,
        ResourceTypes.SCHEMA,
        ResourceIds.schemaResource()
    );
    
    try {
        // 2. 应用命令到存储
        for (StorageCommand command : batch.commands()) {
            // 访问者模式:每种命令类型有不同的处理逻辑
            command.handle(new CommandHandler() {
                @Override
                public void visitNodeCommand(NodeCommand command) {
                    // 应用节点命令
                    applyNodeCommand(command, mode);
                }
                
                @Override
                public void visitRelationshipCommand(RelationshipCommand command) {
                    // 应用关系命令
                    applyRelationshipCommand(command, mode);
                }
                
                @Override
                public void visitPropertyCommand(PropertyCommand command) {
                    // 应用属性命令
                    applyPropertyCommand(command, mode);
                }
                
                // ... 其他命令类型
            });
        }
        
        // 3. 更新索引(如果是内部应用)
        if (mode == TransactionApplicationMode.INTERNAL) {
            indexingService.apply(batch.commands());
        }
        
        // 4. 更新统计信息
        if (mode != TransactionApplicationMode.REVERSE) {
            countsStore.apply(batch.getTransactionId(), batch.commands());
        }
    } finally {
        // 5. 释放锁
        lockService.releaseShared(
            ResourceTypes.SCHEMA,
            ResourceIds.schemaResource()
        );
    }
}

private void applyNodeCommand(NodeCommand command, TransactionApplicationMode mode) {
    NodeRecord after = command.getAfter();
    
    try (PageCursor cursor = pagedFile.io(
        nodeStore.pageIdForRecord(after.getId()),
        PagedFile.PF_SHARED_WRITE_LOCK,
        cursorContext
    )) {
        if (cursor.next()) {
            do {
                // 将 NodeRecord 编码到页面
                nodeStore.updateRecord(after, cursor, cursorContext, storeCursors);
            } while (cursor.shouldRetry());
        }
    }
}

代码说明

  1. 批次锁:保证多个事务的命令串行应用
  2. 访问者模式:每种命令类型(Node/Relationship/Property)有专门的处理器
  3. 页面写入:通过 PageCursor 直接修改页面内容
  4. 索引更新:通知 IndexingService 更新索引
  5. 统计更新:更新节点/关系计数

API 2:PageCursor 读写操作

基本信息

  • 接口名称PagedFile.io
  • 方法签名PageCursor io(long pageId, int flags, CursorContext context)
  • 幂等性:是(多次调用返回不同的游标实例)

请求参数

// PagedFile 标志位
public static final int PF_SHARED_READ_LOCK = 0x01;   // 共享读锁
public static final int PF_SHARED_WRITE_LOCK = 0x02;  // 共享写锁
public static final int PF_NO_GROW = 0x04;            // 不扩展文件
public static final int PF_NO_FAULT = 0x08;           // 不触发缺页
参数 类型 必填 说明
pageId long 页面 ID(文件内的逻辑页号)
flags int 访问标志(读锁/写锁/不扩展等)
context CursorContext 游标上下文(用于追踪)

核心读写模式

模式 1:读取节点记录

// 读取节点记录
public NodeRecord readNode(long nodeId) {
    long pageId = nodeStore.pageIdForRecord(nodeId);
    int offset = nodeStore.offsetForId(nodeId);
    
    try (PageCursor cursor = pagedFile.io(
        pageId,
        PagedFile.PF_SHARED_READ_LOCK,
        cursorContext
    )) {
        if (cursor.next()) {
            do {
                // 定位到记录位置
                cursor.setOffset(offset);
                
                // 读取节点记录字段
                byte headerByte = cursor.getByte();
                boolean inUse = (headerByte & 0x01) != 0;
                boolean dense = (headerByte & 0x02) != 0;
                
                long nextRel = cursor.getLong();
                long nextProp = cursor.getLong();
                long labels = cursor.getLong();
                
                // 构造 NodeRecord
                return new NodeRecord(nodeId)
                    .initialize(inUse, nextProp, dense, nextRel, labels);
                    
            } while (cursor.shouldRetry());
        }
    }
    
    return null;
}

模式 2:写入关系记录

// 写入关系记录
public void writeRelationship(RelationshipRecord record) {
    long pageId = relationshipStore.pageIdForRecord(record.getId());
    int offset = relationshipStore.offsetForId(record.getId());
    
    try (PageCursor cursor = pagedFile.io(
        pageId,
        PagedFile.PF_SHARED_WRITE_LOCK,
        cursorContext
    )) {
        if (cursor.next()) {
            do {
                // 定位到记录位置
                cursor.setOffset(offset);
                
                // 写入关系记录字段
                byte headerByte = 0;
                headerByte |= record.inUse() ? 0x01 : 0x00;
                headerByte |= record.isFirstInFirstChain() ? 0x02 : 0x00;
                headerByte |= record.isFirstInSecondChain() ? 0x04 : 0x00;
                cursor.putByte(headerByte);
                
                cursor.putLong(record.getFirstNode());
                cursor.putLong(record.getSecondNode());
                cursor.putInt(record.getType());
                cursor.putLong(record.getFirstPrevRel());
                cursor.putLong(record.getFirstNextRel());
                cursor.putLong(record.getSecondPrevRel());
                cursor.putLong(record.getSecondNextRel());
                cursor.putLong(record.getNextProp());
                
                // 标记页面为脏页
                cursor.setDirty();
                
            } while (cursor.shouldRetry());
        }
    }
}

shouldRetry 机制说明

  • PageCursor 采用乐观并发控制
  • 读取时不阻塞写入,但需检查 shouldRetry()
  • 如果页面在读取过程中被修改,shouldRetry() 返回 true
  • 重试机制保证读取一致性快照

API 3:TransactionLog.append(追加事务日志)

基本信息

  • 接口名称TransactionAppender.append
  • 方法签名long append(List<StorageCommand> commands, TransactionChunkAppender appender)
  • 幂等性:否(重复追加会导致日志重复)

请求参数

public interface TransactionChunkAppender {
    // 开始新的事务块
    ChunkWriter beginChunk();
    
    // 提交事务块
    void commitChunk(ChunkWriter writer) throws IOException;
}
参数 类型 必填 说明
commands List<StorageCommand> 存储命令列表
appender TransactionChunkAppender 事务块追加器

核心代码

// community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/log/TransactionLogAppender.java
public long append(List<StorageCommand> commands, TransactionChunkAppender appender) throws IOException {
    // 1. 获取写锁(串行化日志写入)
    appendLock.lock();
    try {
        // 2. 开始新的事务块
        ChunkWriter writer = appender.beginChunk();
        
        // 3. 写入事务头
        writer.putByte(TX_START);
        writer.putInt(kernelVersion.version());
        writer.putLong(transactionId);
        writer.putLong(commitTime);
        writer.putLong(latestCommittedTxWhenStarted);
        
        // 4. 序列化命令
        for (StorageCommand command : commands) {
            // 写入命令类型标记
            writer.putByte(commandType(command));
            
            // 写入命令数据
            command.serialize(writer);
        }
        
        // 5. 写入事务尾
        writer.putByte(TX_END);
        
        // 6. 提交事务块(刷新到日志文件)
        appender.commitChunk(writer);
        
        // 7. 检查是否需要日志轮转
        if (channel.position() >= rotationThreshold) {
            logRotation.rotateLogFile(LogAppendEvent.NULL);
        }
        
        // 8. 返回追加索引(日志位置)
        return currentLogPosition.getByteOffset();
        
    } finally {
        appendLock.unlock();
    }
}

日志格式

+----------------+
| TX_START (1B)  |
+----------------+
| Version (4B)   |
+----------------+
| TxId (8B)      |
+----------------+
| CommitTime(8B) |
+----------------+
| Commands...    |
+----------------+
| TX_END (1B)    |
+----------------+

关键流程时序图

时序图 1:节点创建完整流程

sequenceDiagram
    autonumber
    participant KTx as KernelTransaction
    participant Ops as Operations
    participant TxState as TxState
    participant Commit as CommitProcess
    participant Engine as RecordStorageEngine
    participant NeoStores as NeoStores
    participant PageCache as PageCache
    participant WAL as TransactionLog
    
    KTx->>Ops: dataWrite().nodeCreate()
    Ops->>Ops: 分配节点 ID
    Note over Ops: commandCreationContext.reserveNode()
    Ops->>TxState: nodeDoCreate(nodeId)
    TxState->>TxState: 记录节点创建
    
    KTx->>Commit: commit(commands)
    Commit->>Engine: createCommands(txState, reader, ...)
    Engine->>TxState: 遍历变更集
    TxState-->>Engine: 返回新增节点列表
    
    Engine->>Engine: 生成 NodeCommand
    Note over Engine: NodeCommand{before=null, after=NodeRecord}
    Engine-->>Commit: 返回 StorageCommand 列表
    
    Commit->>WAL: append(commands)
    WAL->>WAL: 序列化命令
    WAL->>PageCache: 写入日志页面
    PageCache->>PageCache: 标记 Dirty Page
    WAL-->>Commit: 返回 TransactionId
    
    Commit->>Engine: apply(batch, INTERNAL)
    Engine->>NeoStores: getNodeStore()
    NeoStores-->>Engine: NodeStore 实例
    
    Engine->>PageCache: io(pageId, PF_SHARED_WRITE_LOCK)
    PageCache-->>Engine: PageCursor 实例
    
    Engine->>Engine: cursor.next()
    Engine->>Engine: cursor.setOffset(offset)
    Engine->>Engine: 编码 NodeRecord 到页面
    Note over Engine: cursor.putByte(headerByte)<br/>cursor.putLong(nextRel)<br/>cursor.putLong(nextProp)
    
    Engine->>Engine: cursor.setDirty()
    Engine->>Engine: cursor.close()
    
    Note over PageCache: Checkpoint 线程<br/>定期刷新 Dirty Pages
    
    PageCache->>PageCache: flushAndForce()
    PageCache->>PageCache: 遍历 Dirty Pages
    PageCache->>PageCache: pageSwapper.write(page, fileOffset)

时序图说明

节点创建阶段(步骤 1-4)

  • ID 分配:从 IdGeneratorFactory 获取下一个可用节点 ID
  • TxState 记录:在内存中记录节点创建,提交前对其他事务不可见

命令生成阶段(步骤 5-10)

  • 遍历 TxState:提取所有变更(新增节点/关系/属性)
  • 生成 NodeCommand:包含 before(null)和 after(NodeRecord)两个状态
  • 序列化命令:将命令列表转换为字节流

WAL 写入阶段(步骤 11-15)

  • 串行写入:通过 appendLock 保证日志顺序写入
  • 批量刷新:Group Commit 机制合并多个事务的日志写入
  • 页面标记:写入的日志页面标记为 Dirty

存储应用阶段(步骤 16-26)

  • 批次锁:获取存储锁,防止并发写入冲突
  • 页面定位:计算节点记录所在的页面 ID 与偏移量
  • 编码写入:将 NodeRecord 编码为字节并写入页面
  • 脏页标记:写入的页面标记为 Dirty,等待 Checkpoint 刷新

Checkpoint 刷新(步骤 27-30)

  • 后台线程:CheckpointScheduler 定期触发 Checkpoint
  • 刷新所有 Dirty Pages:遍历所有映射文件,刷新脏页面到磁盘
  • fsync 保证:调用 fileChannel.force(true) 保证持久化

时序图 2:PageCache 缺页处理

sequenceDiagram
    autonumber
    participant Cursor as PageCursor
    participant PagedFile as PagedFile
    participant Cache as PageCache
    participant Faulter as PageFaulter
    participant Evictor as EvictionThread
    participant Swapper as PageSwapper
    participant Disk as 磁盘
    
    Cursor->>PagedFile: next(pageId)
    PagedFile->>PagedFile: 查找页面映射表
    PagedFile->>PagedFile: 页面未在内存中
    
    PagedFile->>Cache: acquirePage(pageId)
    Cache->>Cache: 查找空闲页面
    Cache->>Cache: 无空闲页面
    
    Cache->>Evictor: 触发驱逐
    Evictor->>Evictor: 扫描页面访问计数
    Evictor->>Evictor: 找到访问计数为 0 的页面
    
    alt 页面是脏页
        Evictor->>Swapper: write(page, fileOffset)
        Swapper->>Disk: 写入磁盘
        Disk-->>Swapper: 写入完成
    end
    
    Evictor->>Cache: 标记页面为空闲
    Cache-->>PagedFile: 返回空闲页面
    
    PagedFile->>Faulter: fault(page, pageId)
    Faulter->>Swapper: read(pageId)
    Swapper->>Disk: 读取页面数据
    Disk-->>Swapper: 返回页面数据
    Swapper-->>Faulter: 填充页面
    
    Faulter->>PagedFile: 更新页面映射表
    PagedFile-->>Cursor: 返回已绑定页面的游标
    
    Cursor->>Cursor: 读取页面数据

缺页处理说明

页面查找(步骤 1-4)

  • 映射表查找:PagedFile 维护 pageId → Page 的映射表
  • 未命中:如果页面不在内存中,触发缺页处理

驱逐流程(步骤 5-11)

  • 空闲页面查找:优先使用空闲页面列表
  • 驱逐策略:扫描所有页面,选择访问计数为 0 的页面
  • 脏页刷新:如果被驱逐的页面是脏页,先刷新到磁盘

页面加载(步骤 12-18)

  • 从磁盘读取:通过 PageSwapper 读取指定页面
  • 更新映射表:将新加载的页面加入映射表
  • 返回游标:返回已绑定到页面的 PageCursor

存储文件组织

存储文件列表

文件名 存储内容 记录大小 说明
neostore.nodestore.db 节点记录 15 字节 节点ID、标签、第一个属性/关系指针
neostore.relationshipstore.db 关系记录 34 字节 关系ID、类型、起点/终点、双向链表指针
neostore.propertystore.db 属性记录 41 字节 属性块(最多4个键值对)
neostore.propertystore.db.strings 长字符串 变长 字符串动态记录
neostore.propertystore.db.arrays 大数组 变长 数组动态记录
neostore.relationshipgroupstore.db 关系组 25 字节 Dense 节点的关系分组
neostore.labeltokenstore.db 标签 Token 9 字节 标签ID → 标签名指针
neostore.relationshiptypestore.db 关系类型 Token 9 字节 类型ID → 类型名指针
neostore.propertystore.db.index.keys 属性键 Token 9 字节 键ID → 键名指针
neostore.schemastore.db 模式定义 变长 索引与约束定义
neostore 元数据 变长 数据库版本、创建时间、Store ID
neostore.transaction.db.0 事务日志 变长 事务命令序列化
neostore.counts.db 统计信息 变长 节点/关系计数、标签分布
*.id ID 生成器 变长 下一个可用ID、空闲ID列表

文件布局示例

/path/to/neo4j-db/
├── neostore
├── neostore.nodestore.db
├── neostore.nodestore.db.id
├── neostore.relationshipstore.db
├── neostore.relationshipstore.db.id
├── neostore.propertystore.db
├── neostore.propertystore.db.id
├── neostore.propertystore.db.strings
├── neostore.propertystore.db.strings.id
├── neostore.propertystore.db.arrays
├── neostore.propertystore.db.arrays.id
├── neostore.relationshipgroupstore.db
├── neostore.relationshipgroupstore.db.id
├── neostore.schemastore.db
├── neostore.labeltokenstore.db
├── neostore.relationshiptypestore.db
├── neostore.propertystore.db.index.keys
├── neostore.transaction.db.0
├── neostore.transaction.db.1
├── neostore.transaction.db.2
└── neostore.counts.db

最佳实践与性能优化

实践 1:Page Cache 大小配置

问题:Page Cache 过小导致频繁缺页

# ❌ 低效配置:仅 512MB
server.memory.pagecache.size=512M

优化方案:分配物理内存的 50-70%

# ✅ 推荐配置:8GB 物理内存 → 4-5GB Page Cache
server.memory.pagecache.size=4G

# 堆内存配置(剩余内存分配给堆)
server.memory.heap.initial_size=2G
server.memory.heap.max_size=2G

监控指标

# 查看 Page Cache 命中率
neo4j-admin server report --verbose

# 目标命中率:> 90%
# 如果 < 90%,增加 Page Cache 大小或优化查询

实践 2:Dense 节点优化

问题:超级节点(关系数 > 10 万)遍历缓慢

// ❌ 低效查询:全关系扫描
MATCH (user:User {id: 1})-[r]->(friend)
RETURN count(friend)

优化方案 1:按关系类型过滤

// ✅ 高效查询:仅扫描 FOLLOWS 关系
MATCH (user:User {id: 1})-[r:FOLLOWS]->(friend)
RETURN count(friend)

优化方案 2:配置 Dense 节点阈值

# 降低 Dense 节点阈值(默认 50)
db.relationship_grouping_threshold=20

Dense 节点判断

  • 关系数 ≤ 阈值:普通节点,关系链表直接挂在节点上
  • 关系数 > 阈值:Dense 节点,使用 RelationshipGroupStore 分组

实践 3:批量导入优化

问题:逐条插入性能低下

// ❌ 低效实现:每条执行一次事务
UNWIND $data AS row
CREATE (n:Person {name: row.name, age: row.age})

优化方案:使用 neo4j-admin database import

# ✅ 批量导入工具(离线)
neo4j-admin database import full \
  --nodes=Person=persons.csv \
  --relationships=KNOWS=knows.csv \
  --overwrite-destination \
  neo4j

# 性能提升:100 倍以上
# 原理:绕过事务层,直接写入存储文件

在线批量导入

// 批量大小:10000
:auto USING PERIODIC COMMIT 10000
LOAD CSV WITH HEADERS FROM 'file:///persons.csv' AS row
CREATE (n:Person {name: row.name, age: toInteger(row.age)})

实践 4:Checkpoint 配置优化

问题:Checkpoint 间隔过长,崩溃恢复时间长

# ❌ 低效配置:30 分钟一次 Checkpoint
db.checkpoint.interval.time=30m

优化方案:缩短 Checkpoint 间隔

# ✅ 推荐配置:5 分钟一次 Checkpoint
db.checkpoint.interval.time=5m

# 或基于事务日志大小触发
db.checkpoint.interval.tx=10000

权衡说明

  • 频繁 Checkpoint:减少崩溃恢复时间,但增加 I/O 开销
  • 稀疏 Checkpoint:减少 I/O 开销,但增加崩溃恢复时间

框架使用示例

示例 1:直接访问 Page Cache

import org.neo4j.io.pagecache.PageCache;
import org.neo4j.io.pagecache.PagedFile;
import org.neo4j.io.pagecache.PageCursor;

public class PageCacheExample {
    public void readNodeRecord(PageCache pageCache, Path nodeStoreFile) throws IOException {
        // 1. 映射文件到 Page Cache
        try (PagedFile pagedFile = pageCache.map(
            nodeStoreFile,
            8192, // 页面大小
            "neo4j",
            ImmutableSet.of(StandardOpenOption.READ)
        )) {
            // 2. 创建游标
            try (PageCursor cursor = pagedFile.io(
                0, // 页面 ID
                PagedFile.PF_SHARED_READ_LOCK,
                cursorContext
            )) {
                // 3. 读取页面
                if (cursor.next()) {
                    do {
                        // 读取节点记录
                        byte headerByte = cursor.getByte();
                        boolean inUse = (headerByte & 0x01) != 0;
                        
                        if (inUse) {
                            long nextRel = cursor.getLong();
                            long nextProp = cursor.getLong();
                            System.out.println("Node: nextRel=" + nextRel + ", nextProp=" + nextProp);
                        }
                    } while (cursor.shouldRetry());
                }
            }
        }
    }
}

示例 2:自定义存储扩展

import org.neo4j.storageengine.api.StorageEngine;
import org.neo4j.storageengine.api.StorageEngineFactory;

@ServiceProvider
public class CustomStorageEngineFactory implements StorageEngineFactory {
    @Override
    public String name() {
        return "custom-storage";
    }
    
    @Override
    public StorageEngine instantiate(
        FileSystemAbstraction fs,
        DatabaseLayout layout,
        Config config,
        PageCache pageCache,
        TokenHolders tokenHolders,
        ...
    ) {
        // 返回自定义存储引擎实现
        return new CustomStorageEngine(...);
    }
}

模块总结

Neo4j 的存储层模块是数据库性能的基石,通过以下设计实现高效持久化:

  1. 原生图存储:节点/关系/属性直接映射到定长记录,无需 JOIN 操作
  2. Page Cache 抽象:统一 I/O 接口,支持多种存储后端(本地磁盘、S3等)
  3. WAL 机制:保证 ACID 事务的持久性与崩溃恢复能力
  4. 链式存储:关系链表减少索引开销,遍历性能优异

理解存储层的实现细节,是进行性能调优、扩展开发、故障诊断的前提。