Neo4j-03-扩展模块-Bolt·Cypher·索引·配置


模块一:Bolt 网络通信协议

概览

职责

  • 实现 Bolt 二进制协议(v5.x)
  • 管理客户端连接与会话
  • 处理请求消息(HELLO、RUN、PULL、COMMIT等)
  • 流式结果返回与背压控制

路径community/bolt

架构图

flowchart TB
    subgraph Client["客户端"]
        Driver[Neo4j Driver]
    end
    
    subgraph BoltServer["Bolt Server"]
        BoltSrv[BoltServer]
        Connector[BoltConnector]
        
        subgraph NettyPipeline["Netty Pipeline"]
            TransportSelection[TransportSelectionHandler]
            Handshake[ProtocolHandshakeHandler]
            StructDecoder[BoltStructDecoder]
            ReqHandler[RequestHandler]
            StructEncoder[BoltStructEncoder]
        end
        
        subgraph ConnectionManagement["连接管理"]
            ConnPool[Connection Pool]
            ConnImpl[AbstractConnection]
            StateMachine[BoltStateMachine]
        end
        
        subgraph MessageProcessing["消息处理"]
            MsgDecoder[MessageDecoder]
            MsgHandler[MessageHandler]
            RespHandler[ResponseHandler]
        end
    end
    
    subgraph GraphDB["Graph Database"]
        DBMS[DatabaseManagementService]
        Tx[Transaction]
    end
    
    Driver -->|TCP/WebSocket| BoltSrv
    BoltSrv --> Connector
    Connector --> TransportSelection
    
    TransportSelection -->|SSL/Plain| Handshake
    Handshake --> StructDecoder
    StructDecoder --> ReqHandler
    ReqHandler --> MsgDecoder
    
    MsgDecoder --> MsgHandler
    MsgHandler --> StateMachine
    StateMachine --> ConnImpl
    ConnImpl --> ConnPool
    
    StateMachine --> DBMS
    DBMS --> Tx
    
    Tx --> RespHandler
    RespHandler --> StructEncoder

Bolt 消息类型

1. 认证消息

HELLO

结构:{
    tag: 0x01,
    user_agent: "neo4j-driver/5.0",
    routing: {...},
    bolt_agent: {...}
}
  • 作用:建立连接,协商协议版本
  • 响应:SUCCESS(包含服务器信息)或 FAILURE

LOGON

结构:{
    tag: 0x6A,
    auth: {scheme: "basic", principal: "neo4j", credentials: "password"}
}
  • 作用:用户认证(Bolt 5.1+ 支持)
  • 响应:SUCCESS 或 FAILURE

2. 查询消息

RUN

结构:{
    tag: 0x10,
    query: "MATCH (n:Person) WHERE n.name = $name RETURN n",
    parameters: {name: "Alice"},
    extra: {db: "neo4j", timeout: 60000}
}
  • 作用:执行 Cypher 查询
  • 响应:SUCCESS(包含查询元数据:fields、t_first)

PULL

结构:{
    tag: 0x3F,
    extra: {n: 1000, qid: -1}
}
  • 作用:拉取查询结果(流式)
  • 响应:RECORD(多条)+ SUCCESS(包含 t_last、has_more)

DISCARD

结构:{
    tag: 0x2F,
    extra: {n: -1, qid: -1}
}
  • 作用:丢弃剩余结果
  • 响应:SUCCESS

3. 事务消息

BEGIN

结构:{
    tag: 0x11,
    extra: {bookmarks: [...], tx_timeout: 60000, tx_metadata: {...}}
}
  • 作用:开始显式事务
  • 响应:SUCCESS

COMMIT

结构:{
    tag: 0x12
}
  • 作用:提交事务
  • 响应:SUCCESS(包含 bookmark)

ROLLBACK

结构:{
    tag: 0x13
}
  • 作用:回滚事务
  • 响应:SUCCESS

4. 连接管理消息

RESET

结构:{
    tag: 0x0F
}
  • 作用:重置连接状态(中断当前查询)
  • 响应:SUCCESS

GOODBYE

结构:{
    tag: 0x02
}
  • 作用:优雅关闭连接
  • 响应:无(服务器关闭连接)

Bolt 状态机

stateDiagram-v2
    [*] --> NEGOTIATION: 连接建立
    NEGOTIATION --> AUTHENTICATION: HELLO
    AUTHENTICATION --> READY: LOGON/SUCCESS
    AUTHENTICATION --> FAILED: 认证失败
    
    READY --> STREAMING: RUN
    READY --> IN_TRANSACTION: BEGIN
    READY --> READY: RESET
    
    STREAMING --> READY: PULL/全部结果
    STREAMING --> STREAMING: PULL/部分结果
    STREAMING --> READY: DISCARD
    
    IN_TRANSACTION --> IN_TRANSACTION: RUN
    IN_TRANSACTION --> TX_STREAMING: RUN/有结果
    IN_TRANSACTION --> READY: COMMIT
    IN_TRANSACTION --> READY: ROLLBACK
    
    TX_STREAMING --> IN_TRANSACTION: PULL/DISCARD
    
    FAILED --> READY: RESET
    FAILED --> [*]: GOODBYE
    
    READY --> [*]: GOODBYE

核心代码示例

Connection 提交消息

// community/bolt/src/main/java/org/neo4j/bolt/protocol/common/connector/connection/AbstractConnection.java
public void submit(RequestMessage message) {
    // 1. 创建消息信号
    var signal = new MessageSignal(message);
    
    // 2. 提交到状态机队列
    this.inboundSignals.offer(signal);
    
    // 3. 如果有可用工作线程,触发处理
    if (this.tryAcquire()) {
        this.executor.execute(() -> {
            try {
                // 处理所有待处理消息
                this.processSignals();
            } finally {
                this.release();
            }
        });
    }
}

private void processSignals() {
    MessageSignal signal;
    while ((signal = this.inboundSignals.poll()) != null) {
        try {
            // 调用状态机处理
            this.fsm.process(signal.message(), this.responseHandler);
        } catch (BoltConnectionFatality e) {
            // 连接级错误,关闭连接
            this.close();
            break;
        } catch (Exception e) {
            // 发送错误响应
            this.responseHandler.onFailure(Error.from(e));
        }
    }
}

状态机处理 RUN 消息

// 伪代码:状态机处理逻辑
public void handleRun(RunMessage message) {
    // 1. 验证状态(必须在 READY 或 IN_TRANSACTION)
    if (!isValidState()) {
        throw new BoltProtocolException("Invalid state for RUN");
    }
    
    // 2. 开始事务(如果是隐式事务)
    if (currentState == State.READY) {
        this.currentTransaction = database.beginTransaction(
            Type.IMPLICIT,
            loginContext,
            connectionInfo,
            message.timeout()
        );
    }
    
    // 3. 执行查询
    Result result = currentTransaction.execute(
        message.query(),
        message.parameters()
    );
    
    // 4. 切换到 STREAMING 状态
    this.currentState = State.STREAMING;
    this.currentResult = result;
    
    // 5. 发送 SUCCESS 响应(包含元数据)
    responseHandler.onSuccess(Map.of(
        "fields", result.columns(),
        "t_first", result.executionPlanDescription().getTime()
    ));
}

模块二:Cypher 查询引擎

概览

职责

  • Cypher 语法解析(ANTLR 4)
  • 逻辑计划生成(AST → LogicalPlan)
  • 查询优化(重写规则、成本估算)
  • 物理计划生成(Pipelined/Slotted/Interpreted 运行时)
  • 查询执行与结果返回

路径community/cypher

架构图

flowchart TB
    subgraph Compiler["Cypher Compiler"]
        Parser[ANTLR Parser]
        ASTBuilder[AST Builder]
        SemanticAnalyzer[Semantic Analyzer]
        
        subgraph LogicalPlanner["逻辑规划器"]
            QueryGraph[QueryGraph Builder]
            LogicalPlanProducer[LogicalPlan Producer]
            Rewriter[Rewrite Rules]
        end
        
        subgraph CostPlanner["成本规划器"]
            CardinalityEstimator[Cardinality Estimator]
            CostModel[Cost Model]
            PlanSelector[Plan Selector]
        end
    end
    
    subgraph PhysicalPlanner["物理规划器"]
        RuntimeSelector[Runtime Selector]
        
        subgraph Runtimes["执行引擎"]
            Interpreted[Interpreted]
            Slotted[Slotted]
            Pipelined[Pipelined]
        end
    end
    
    subgraph Execution["查询执行"]
        Operators[Operators]
        Cursors[Cursor API]
        KernelAPI[Kernel API]
    end
    
    Parser --> ASTBuilder
    ASTBuilder --> SemanticAnalyzer
    SemanticAnalyzer --> QueryGraph
    
    QueryGraph --> LogicalPlanProducer
    LogicalPlanProducer --> Rewriter
    Rewriter --> CardinalityEstimator
    
    CardinalityEstimator --> CostModel
    CostModel --> PlanSelector
    
    PlanSelector --> RuntimeSelector
    RuntimeSelector --> Runtimes
    
    Runtimes --> Operators
    Operators --> Cursors
    Cursors --> KernelAPI

查询编译流程

1. 语法解析

// 输入 Cypher 查询
"MATCH (p:Person)-[:KNOWS]->(f:Person) WHERE p.name = 'Alice' RETURN f.name"

// ANTLR 解析为 AST
Statement(
  Query(
    SingleQuery(
      Clause[
        Match(
          Pattern[
            PatternPart(
              NodePattern("p", labels=["Person"]),
              Rel("KNOWS"),
              NodePattern("f", labels=["Person"])
            )
          ],
          where=Property("p", "name") = Literal("Alice")
        ),
        Return(
          ReturnItems[
            Property("f", "name") as "f.name"
          ]
        )
      ]
    )
  )
)

2. 语义分析

// 检查:
// - 变量是否已声明
// - 属性是否存在
// - 类型是否匹配
// - 聚合函数使用是否正确

// 输出:SemanticState
SemanticState(
  symbolTable = Map(
    "p" -> NodeType(labels=["Person"]),
    "f" -> NodeType(labels=["Person"])
  ),
  types = Map(
    "f.name" -> StringType
  )
)

3. 逻辑计划生成

// 逻辑计划(树形结构)
Projection(
  columns = ["f.name"],
  source = Filter(
    predicate = Equals(Property(p, "name"), Literal("Alice")),
    source = Expand(
      from = "p",
      relTypes = ["KNOWS"],
      to = "f",
      direction = OUTGOING,
      source = NodeByLabelScan("p", "Person")
    )
  )
)

4. 查询重写

重写规则示例

// 规则 1:谓词下推(Predicate Pushdown)
// Before:
Filter(p.age > 30, Expand(p, r, q))

// After:
Expand(Filter(p.age > 30, NodeScan(p)), r, q)

// 规则 2:索引查找替换
// Before:
Filter(p.name = "Alice", NodeByLabelScan(p, "Person"))

// After:
NodeIndexSeek(p, "Person", "name", "Alice")

// 规则 3:合并过滤条件
// Before:
Filter(p.age > 30, Filter(p.name STARTS WITH "A", NodeScan(p)))

// After:
Filter(AND(p.age > 30, p.name STARTS WITH "A"), NodeScan(p))

5. 成本估算与计划选择

// 计划 A:全节点扫描 + 过滤
Plan_A = Filter(name="Alice", AllNodesScan("p"))
Cost_A = Rows(AllNodes) * CPU_Filter = 1,000,000 * 0.01 = 10,000

// 计划 B:索引查找
Plan_B = NodeIndexSeek("p", "Person", "name", "Alice")
Cost_B = Rows(Index) * CPU_Seek = 1 * 1.0 = 1

// 选择成本最低的计划:Plan_B

执行运行时对比

特性 Interpreted Slotted Pipelined
执行方式 火山模型(逐行拉取) 火山模型 + 槽位优化 向量化批量处理
内存占用 高(对象分配) 中(复用槽位) 低(批量缓冲区)
CPU 效率 低(虚方法调用) 中(减少对象创建) 高(SIMD 友好)
适用场景 小结果集 中等结果集 大结果集、聚合、排序
默认选择 自动切换(大数据量)

核心算子

AllNodesScan(全节点扫描)

class AllNodesScan(variable: String) extends Operator {
  override def init(state: QueryState): Iterator[ExecutionContext] = {
    val cursor = state.cursors.allocateNodeCursor()
    state.query.nodeReadOps().allNodesScan(cursor)
    
    new Iterator[ExecutionContext] {
      override def hasNext: Boolean = cursor.next()
      override def next(): ExecutionContext = {
        ExecutionContext.empty.set(variable, cursor.nodeReference())
      }
    }
  }
}

Expand(关系扩展)

class Expand(
  from: String,
  relTypes: Seq[Int],
  to: String,
  direction: Direction,
  source: Operator
) extends Operator {
  override def init(state: QueryState): Iterator[ExecutionContext] = {
    source.init(state).flatMap { row =>
      val fromNode = row.getLong(from)
      val relCursor = state.cursors.allocateRelationshipTraversalCursor()
      
      state.query.nodeReadOps().relationships(
        fromNode,
        direction,
        relTypes.toArray,
        relCursor
      )
      
      new Iterator[ExecutionContext] {
        override def hasNext: Boolean = relCursor.next()
        override def next(): ExecutionContext = {
          row.set(to, relCursor.otherNodeReference())
        }
      }
    }
  }
}

模块三:Lucene 索引

概览

职责

  • 基于 Apache Lucene 的范围索引与复合索引
  • 全文搜索索引(fulltext-index 模块)
  • 索引构建(Populator)与查询(Accessor)
  • 索引更新监听器

路径community/lucene-indexcommunity/fulltext-index

索引类型

1. B-Tree 索引(Range Index)

-- 创建单属性索引
CREATE INDEX person_name_idx FOR (p:Person) ON (p.name)

-- 创建复合索引
CREATE INDEX person_name_age_idx FOR (p:Person) ON (p.name, p.age)

-- 使用索引查询
MATCH (p:Person) WHERE p.name = 'Alice'  // 精确匹配
MATCH (p:Person) WHERE p.age > 30 AND p.age < 50  // 范围查询

底层实现

  • Lucene DocValues:用于范围查询
  • Lucene Terms:用于精确匹配
  • 索引文件schema/index/lucene-1/1/ (每个索引一个目录)

2. 全文索引(Fulltext Index)

-- 创建全文索引
CREATE FULLTEXT INDEX person_bio_idx 
FOR (p:Person) ON EACH [p.bio, p.interests]

-- 全文搜索
CALL db.index.fulltext.queryNodes('person_bio_idx', 'graph database')
YIELD node, score
RETURN node.name, score
ORDER BY score DESC

分析器支持

  • Standard Analyzer:默认,适用于英文
  • Language-Specific Analyzers:中文(CJK)、日文、韩文等
  • Custom Analyzers:自定义分词规则

3. 向量索引(Vector Index,5.11+)

-- 创建向量索引
CREATE VECTOR INDEX embedding_idx FOR (d:Document) ON (d.embedding)
OPTIONS {indexConfig: {`vector.dimensions`: 1536, `vector.similarity_function`: 'cosine'}}

-- 向量相似度搜索
MATCH (d:Document)
WHERE d.embedding IS NOT NULL
WITH d, vector.similarity.cosine(d.embedding, $queryVector) AS score
ORDER BY score DESC
LIMIT 10
RETURN d.title, score

索引更新机制

sequenceDiagram
    participant Tx as Transaction
    participant Engine as StorageEngine
    participant Listener as IndexUpdateListener
    participant Provider as IndexProvider
    participant Lucene as Lucene Index
    
    Tx->>Engine: commit(commands)
    Engine->>Engine: apply(commands)
    
    Engine->>Listener: onIndexUpdate(updates)
    Listener->>Listener: 批量收集更新
    
    Listener->>Provider: getOnlineAccessor(indexId)
    Provider-->>Listener: IndexAccessor
    
    Listener->>Lucene: newUpdater()
    Lucene-->>Listener: IndexUpdater
    
    loop 遍历更新
        Listener->>Lucene: process(IndexEntryUpdate)
        alt 新增
            Lucene->>Lucene: addDocument(doc)
        else 删除
            Lucene->>Lucene: deleteDocuments(term)
        else 修改
            Lucene->>Lucene: updateDocument(term, doc)
        end
    end
    
    Listener->>Lucene: close()
    Lucene->>Lucene: flush segments

核心代码示例

索引查询

// community/lucene-index/src/main/java/org/neo4j/kernel/api/impl/index/LuceneIndexAccessor.java
public NodeValueIndexCursor query(
    IndexQueryConstraints constraints,
    PropertyIndexQuery... predicates
) throws IndexNotApplicableKernelException {
    // 1. 构建 Lucene 查询
    Query luceneQuery = toLuceneQuery(predicates);
    
    // 2. 执行搜索
    IndexSearcher searcher = searcherManager.acquire();
    try {
        TopDocs topDocs = searcher.search(luceneQuery, constraints.limit());
        
        // 3. 转换为节点 ID 列表
        long[] nodeIds = new long[topDocs.scoreDocs.length];
        for (int i = 0; i < topDocs.scoreDocs.length; i++) {
            Document doc = searcher.doc(topDocs.scoreDocs[i].doc);
            nodeIds[i] = Long.parseLong(doc.get("id"));
        }
        
        // 4. 返回游标
        return new NodeValueIndexCursor(nodeIds, cursors);
    } finally {
        searcherManager.release(searcher);
    }
}

private Query toLuceneQuery(PropertyIndexQuery... predicates) {
    BooleanQuery.Builder builder = new BooleanQuery.Builder();
    
    for (PropertyIndexQuery predicate : predicates) {
        if (predicate instanceof PropertyIndexQuery.ExactPredicate exact) {
            // 精确匹配
            builder.add(
                new TermQuery(new Term("prop_" + exact.propertyKeyId(), exact.value().toString())),
                BooleanClause.Occur.MUST
            );
        } else if (predicate instanceof PropertyIndexQuery.RangePredicate<?> range) {
            // 范围查询
            builder.add(
                NumericDocValuesField.newSlowRangeQuery(
                    "prop_" + range.propertyKeyId(),
                    (Long) range.from(),
                    (Long) range.to()
                ),
                BooleanClause.Occur.MUST
            );
        }
    }
    
    return builder.build();
}

模块四:存储过程(Procedure)

概览

职责

  • 存储过程注册与调用
  • 用户自定义函数(UDF)
  • 聚合函数(Aggregation Function)
  • 注解处理(@Procedure@UserFunction

路径community/procedurecommunity/procedure-compiler

存储过程示例

编写存储过程

package com.example.procedures;

import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.procedure.*;

public class MyProcedures {
    @Context
    public GraphDatabaseService db;
    
    @Context
    public Transaction tx;
    
    // 1. 只读存储过程
    @Procedure(name = "example.findFriends", mode = Mode.READ)
    @Description("Find all friends of a person")
    public Stream<FriendResult> findFriends(@Name("personName") String name) {
        return tx.findNodes(Label.label("Person"), "name", name)
            .stream()
            .flatMap(person -> StreamSupport.stream(
                person.getRelationships(RelationshipType.withName("KNOWS"), Direction.OUTGOING).spliterator(),
                false
            ))
            .map(rel -> rel.getEndNode())
            .map(friend -> new FriendResult(
                friend.getId(),
                (String) friend.getProperty("name"),
                (Integer) friend.getProperty("age", 0)
            ));
    }
    
    // 2. 用户自定义函数(UDF)
    @UserFunction(name = "example.fullName")
    @Description("Concatenate first and last name")
    public String fullName(
        @Name("firstName") String first,
        @Name("lastName") String last
    ) {
        return first + " " + last;
    }
    
    // 3. 聚合函数
    @UserAggregationFunction(name = "example.longestString")
    @Description("Find the longest string in a collection")
    public LongestStringAggregator longestString() {
        return new LongestStringAggregator();
    }
    
    // 结果类(必须是 public static)
    public static class FriendResult {
        public long id;
        public String name;
        public int age;
        
        public FriendResult(long id, String name, int age) {
            this.id = id;
            this.name = name;
            this.age = age;
        }
    }
    
    // 聚合器类
    public static class LongestStringAggregator {
        private String longest = "";
        
        @UserAggregationUpdate
        public void update(@Name("value") String value) {
            if (value != null && value.length() > longest.length()) {
                longest = value;
            }
        }
        
        @UserAggregationResult
        public String result() {
            return longest;
        }
    }
}

调用存储过程

-- 调用存储过程
CALL example.findFriends('Alice') 
YIELD id, name, age
WHERE age > 30
RETURN name, age

-- 使用 UDF
RETURN example.fullName('John', 'Doe') AS fullName

-- 使用聚合函数
MATCH (p:Person)
RETURN example.longestString(p.bio) AS longestBio

注册存储过程

// 方式 1:自动扫描(classpath)
// 将 JAR 放置到 Neo4j plugins/ 目录

// 方式 2:编程方式注册
GlobalProcedures procedures = db.getDependencyResolver()
    .resolveDependency(GlobalProcedures.class);
procedures.registerProcedure(MyProcedures.class);
procedures.registerFunction(MyProcedures.class);
procedures.registerAggregationFunction(MyProcedures.class);

模块五:配置与监控

配置管理(Configuration)

路径community/configuration

配置项定义

// community/configuration/src/main/java/org/neo4j/configuration/GraphDatabaseSettings.java
public class GraphDatabaseSettings {
    @Description("Size of the page cache")
    public static final Setting<ByteUnit> pagecache_memory = 
        newBuilder("server.memory.pagecache.size", BYTES, null)
            .addConstraint(min(ByteUnit.mebiBytes(8)))
            .build();
    
    @Description("Transaction timeout")
    public static final Setting<Duration> transaction_timeout = 
        newBuilder("db.transaction.timeout", DURATION, Duration.ZERO)
            .addConstraint(min(Duration.ZERO))
            .build();
    
    @Description("Default database name")
    public static final Setting<String> default_database = 
        newBuilder("dbms.default_database", STRING, DEFAULT_DATABASE_NAME)
            .immutable()
            .build();
}

动态配置更新

-- 查看当前配置
CALL dbms.listConfig() YIELD name, value
WHERE name STARTS WITH 'db.transaction'
RETURN name, value

-- 更新动态配置(需重启的配置无法动态更新)
CALL dbms.setConfigValue('db.transaction.timeout', '120s')

监控指标(Monitoring)

路径community/monitoring

JMX Metrics

// 注册 JMX Bean
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
ObjectName name = new ObjectName("org.neo4j:instance=kernel#0,name=Transactions");
mbs.registerMBean(new TransactionMetrics(), name);

// TransactionMetrics 接口
public interface TransactionMetricsMBean {
    long getNumberOfOpenTransactions();
    long getNumberOfCommittedTransactions();
    long getPeakNumberOfConcurrentTransactions();
    long getNumberOfRolledBackTransactions();
}

Prometheus Metrics 导出

# neo4j.conf
server.metrics.enabled=true
server.metrics.prometheus.enabled=true
server.metrics.prometheus.endpoint=:2004
# Prometheus 指标格式
# TYPE neo4j_transaction_started_total counter
neo4j_transaction_started_total{database="neo4j"} 12345

# TYPE neo4j_page_cache_hit_ratio gauge
neo4j_page_cache_hit_ratio 0.95

# TYPE neo4j_bolt_connections_opened_total counter
neo4j_bolt_connections_opened_total 678

关键监控指标

指标名称 类型 说明
neo4j_transaction_active Gauge 当前活跃事务数
neo4j_transaction_committed_total Counter 累计提交事务数
neo4j_transaction_rollbacks_total Counter 累计回滚事务数
neo4j_page_cache_hit_ratio Gauge Page Cache 命中率(目标 >90%)
neo4j_page_cache_evictions_total Counter 页面驱逐总数
neo4j_page_cache_flushes_total Counter 页面刷新总数
neo4j_bolt_connections_opened_total Counter Bolt 连接打开总数
neo4j_bolt_connections_closed_total Counter Bolt 连接关闭总数
neo4j_bolt_messages_received_total Counter 接收的 Bolt 消息总数
neo4j_cypher_replan_events_total Counter 查询重编译次数
neo4j_ids_in_use{type="NODE"} Gauge 已分配的节点 ID 数量
neo4j_ids_in_use{type="RELATIONSHIP"} Gauge 已分配的关系 ID 数量

最佳实践汇总

1. Bolt 连接池配置

客户端驱动配置

import org.neo4j.driver.*;

public class DriverConfig {
    public static Driver createDriver() {
        return GraphDatabase.driver(
            "bolt://localhost:7687",
            AuthTokens.basic("neo4j", "password"),
            Config.builder()
                .withMaxConnectionPoolSize(50)           // 最大连接数
                .withConnectionAcquisitionTimeout(60, TimeUnit.SECONDS)
                .withConnectionTimeout(15, TimeUnit.SECONDS)
                .withMaxTransactionRetryTime(30, TimeUnit.SECONDS)
                .build()
        );
    }
}

2. Cypher 查询优化

优化前

// ❌ 低效:笛卡尔积
MATCH (p:Person), (c:Company)
WHERE p.works_at = c.id
RETURN p.name, c.name

优化后

// ✅ 高效:直接关系遍历
MATCH (p:Person)-[:WORKS_AT]->(c:Company)
RETURN p.name, c.name

使用 EXPLAIN 分析

EXPLAIN
MATCH (p:Person)-[:KNOWS*1..3]->(f:Person)
WHERE p.name = 'Alice'
RETURN f.name

3. 索引最佳实践

创建索引时机

-- 1. 频繁查询的属性
CREATE INDEX person_email_idx FOR (p:Person) ON (p.email)

-- 2. 唯一性约束(自动创建唯一索引)
CREATE CONSTRAINT person_email_unique FOR (p:Person) REQUIRE p.email IS UNIQUE

-- 3. 复合索引(查询多个属性)
CREATE INDEX person_name_age_idx FOR (p:Person) ON (p.name, p.age)

监控索引使用情况

-- 查看索引列表
SHOW INDEXES

-- 查看索引统计
CALL db.index.fulltext.listAvailableAnalyzers()

4. 监控告警阈值

Prometheus 告警规则

groups:
- name: neo4j_alerts
  rules:
  # Page Cache 命中率过低
  - alert: LowPageCacheHitRatio
    expr: neo4j_page_cache_hit_ratio < 0.85
    for: 5m
    annotations:
      summary: "Page Cache 命中率低于 85%"
      
  # 活跃事务数过多
  - alert: HighActiveTransactions
    expr: neo4j_transaction_active > 100
    for: 3m
    annotations:
      summary: "活跃事务数超过 100"
      
  # Bolt 连接数异常增长
  - alert: BoltConnectionSpike
    expr: rate(neo4j_bolt_connections_opened_total[5m]) > 100
    annotations:
      summary: "Bolt 连接打开速率异常"

模块总结

本文档涵盖了 Neo4j 的扩展模块,包括:

  1. Bolt 协议:高效的二进制通信协议,支持流式结果与背压控制
  2. Cypher 引擎:从语法解析到查询执行的完整流程,支持多种运行时
  3. Lucene 索引:基于 Apache Lucene 的范围索引与全文搜索
  4. 存储过程:扩展 Neo4j 功能的主要方式,支持 Java/Scala
  5. 配置管理:声明式配置定义,支持动态更新
  6. 监控指标:完善的 JMX 与 Prometheus 监控集成

这些模块共同构成了 Neo4j 的完整生态系统,理解它们的实现细节是进行高级开发与运维的基础。