Neo4j-03-扩展模块-Bolt·Cypher·索引·配置
模块一:Bolt 网络通信协议
概览
职责
- 实现 Bolt 二进制协议(v5.x)
- 管理客户端连接与会话
- 处理请求消息(HELLO、RUN、PULL、COMMIT等)
- 流式结果返回与背压控制
路径:community/bolt
架构图
flowchart TB
subgraph Client["客户端"]
Driver[Neo4j Driver]
end
subgraph BoltServer["Bolt Server"]
BoltSrv[BoltServer]
Connector[BoltConnector]
subgraph NettyPipeline["Netty Pipeline"]
TransportSelection[TransportSelectionHandler]
Handshake[ProtocolHandshakeHandler]
StructDecoder[BoltStructDecoder]
ReqHandler[RequestHandler]
StructEncoder[BoltStructEncoder]
end
subgraph ConnectionManagement["连接管理"]
ConnPool[Connection Pool]
ConnImpl[AbstractConnection]
StateMachine[BoltStateMachine]
end
subgraph MessageProcessing["消息处理"]
MsgDecoder[MessageDecoder]
MsgHandler[MessageHandler]
RespHandler[ResponseHandler]
end
end
subgraph GraphDB["Graph Database"]
DBMS[DatabaseManagementService]
Tx[Transaction]
end
Driver -->|TCP/WebSocket| BoltSrv
BoltSrv --> Connector
Connector --> TransportSelection
TransportSelection -->|SSL/Plain| Handshake
Handshake --> StructDecoder
StructDecoder --> ReqHandler
ReqHandler --> MsgDecoder
MsgDecoder --> MsgHandler
MsgHandler --> StateMachine
StateMachine --> ConnImpl
ConnImpl --> ConnPool
StateMachine --> DBMS
DBMS --> Tx
Tx --> RespHandler
RespHandler --> StructEncoder
Bolt 消息类型
1. 认证消息
HELLO
结构:{
tag: 0x01,
user_agent: "neo4j-driver/5.0",
routing: {...},
bolt_agent: {...}
}
- 作用:建立连接,协商协议版本
- 响应:SUCCESS(包含服务器信息)或 FAILURE
LOGON
结构:{
tag: 0x6A,
auth: {scheme: "basic", principal: "neo4j", credentials: "password"}
}
- 作用:用户认证(Bolt 5.1+ 支持)
- 响应:SUCCESS 或 FAILURE
2. 查询消息
RUN
结构:{
tag: 0x10,
query: "MATCH (n:Person) WHERE n.name = $name RETURN n",
parameters: {name: "Alice"},
extra: {db: "neo4j", timeout: 60000}
}
- 作用:执行 Cypher 查询
- 响应:SUCCESS(包含查询元数据:fields、t_first)
PULL
结构:{
tag: 0x3F,
extra: {n: 1000, qid: -1}
}
- 作用:拉取查询结果(流式)
- 响应:RECORD(多条)+ SUCCESS(包含 t_last、has_more)
DISCARD
结构:{
tag: 0x2F,
extra: {n: -1, qid: -1}
}
- 作用:丢弃剩余结果
- 响应:SUCCESS
3. 事务消息
BEGIN
结构:{
tag: 0x11,
extra: {bookmarks: [...], tx_timeout: 60000, tx_metadata: {...}}
}
- 作用:开始显式事务
- 响应:SUCCESS
COMMIT
结构:{
tag: 0x12
}
- 作用:提交事务
- 响应:SUCCESS(包含 bookmark)
ROLLBACK
结构:{
tag: 0x13
}
- 作用:回滚事务
- 响应:SUCCESS
4. 连接管理消息
RESET
结构:{
tag: 0x0F
}
- 作用:重置连接状态(中断当前查询)
- 响应:SUCCESS
GOODBYE
结构:{
tag: 0x02
}
- 作用:优雅关闭连接
- 响应:无(服务器关闭连接)
Bolt 状态机
stateDiagram-v2
[*] --> NEGOTIATION: 连接建立
NEGOTIATION --> AUTHENTICATION: HELLO
AUTHENTICATION --> READY: LOGON/SUCCESS
AUTHENTICATION --> FAILED: 认证失败
READY --> STREAMING: RUN
READY --> IN_TRANSACTION: BEGIN
READY --> READY: RESET
STREAMING --> READY: PULL/全部结果
STREAMING --> STREAMING: PULL/部分结果
STREAMING --> READY: DISCARD
IN_TRANSACTION --> IN_TRANSACTION: RUN
IN_TRANSACTION --> TX_STREAMING: RUN/有结果
IN_TRANSACTION --> READY: COMMIT
IN_TRANSACTION --> READY: ROLLBACK
TX_STREAMING --> IN_TRANSACTION: PULL/DISCARD
FAILED --> READY: RESET
FAILED --> [*]: GOODBYE
READY --> [*]: GOODBYE
核心代码示例
Connection 提交消息
// community/bolt/src/main/java/org/neo4j/bolt/protocol/common/connector/connection/AbstractConnection.java
public void submit(RequestMessage message) {
// 1. 创建消息信号
var signal = new MessageSignal(message);
// 2. 提交到状态机队列
this.inboundSignals.offer(signal);
// 3. 如果有可用工作线程,触发处理
if (this.tryAcquire()) {
this.executor.execute(() -> {
try {
// 处理所有待处理消息
this.processSignals();
} finally {
this.release();
}
});
}
}
private void processSignals() {
MessageSignal signal;
while ((signal = this.inboundSignals.poll()) != null) {
try {
// 调用状态机处理
this.fsm.process(signal.message(), this.responseHandler);
} catch (BoltConnectionFatality e) {
// 连接级错误,关闭连接
this.close();
break;
} catch (Exception e) {
// 发送错误响应
this.responseHandler.onFailure(Error.from(e));
}
}
}
状态机处理 RUN 消息
// 伪代码:状态机处理逻辑
public void handleRun(RunMessage message) {
// 1. 验证状态(必须在 READY 或 IN_TRANSACTION)
if (!isValidState()) {
throw new BoltProtocolException("Invalid state for RUN");
}
// 2. 开始事务(如果是隐式事务)
if (currentState == State.READY) {
this.currentTransaction = database.beginTransaction(
Type.IMPLICIT,
loginContext,
connectionInfo,
message.timeout()
);
}
// 3. 执行查询
Result result = currentTransaction.execute(
message.query(),
message.parameters()
);
// 4. 切换到 STREAMING 状态
this.currentState = State.STREAMING;
this.currentResult = result;
// 5. 发送 SUCCESS 响应(包含元数据)
responseHandler.onSuccess(Map.of(
"fields", result.columns(),
"t_first", result.executionPlanDescription().getTime()
));
}
模块二:Cypher 查询引擎
概览
职责
- Cypher 语法解析(ANTLR 4)
- 逻辑计划生成(AST → LogicalPlan)
- 查询优化(重写规则、成本估算)
- 物理计划生成(Pipelined/Slotted/Interpreted 运行时)
- 查询执行与结果返回
路径:community/cypher
架构图
flowchart TB
subgraph Compiler["Cypher Compiler"]
Parser[ANTLR Parser]
ASTBuilder[AST Builder]
SemanticAnalyzer[Semantic Analyzer]
subgraph LogicalPlanner["逻辑规划器"]
QueryGraph[QueryGraph Builder]
LogicalPlanProducer[LogicalPlan Producer]
Rewriter[Rewrite Rules]
end
subgraph CostPlanner["成本规划器"]
CardinalityEstimator[Cardinality Estimator]
CostModel[Cost Model]
PlanSelector[Plan Selector]
end
end
subgraph PhysicalPlanner["物理规划器"]
RuntimeSelector[Runtime Selector]
subgraph Runtimes["执行引擎"]
Interpreted[Interpreted]
Slotted[Slotted]
Pipelined[Pipelined]
end
end
subgraph Execution["查询执行"]
Operators[Operators]
Cursors[Cursor API]
KernelAPI[Kernel API]
end
Parser --> ASTBuilder
ASTBuilder --> SemanticAnalyzer
SemanticAnalyzer --> QueryGraph
QueryGraph --> LogicalPlanProducer
LogicalPlanProducer --> Rewriter
Rewriter --> CardinalityEstimator
CardinalityEstimator --> CostModel
CostModel --> PlanSelector
PlanSelector --> RuntimeSelector
RuntimeSelector --> Runtimes
Runtimes --> Operators
Operators --> Cursors
Cursors --> KernelAPI
查询编译流程
1. 语法解析
// 输入 Cypher 查询
"MATCH (p:Person)-[:KNOWS]->(f:Person) WHERE p.name = 'Alice' RETURN f.name"
// ANTLR 解析为 AST
Statement(
Query(
SingleQuery(
Clause[
Match(
Pattern[
PatternPart(
NodePattern("p", labels=["Person"]),
Rel("KNOWS"),
NodePattern("f", labels=["Person"])
)
],
where=Property("p", "name") = Literal("Alice")
),
Return(
ReturnItems[
Property("f", "name") as "f.name"
]
)
]
)
)
)
2. 语义分析
// 检查:
// - 变量是否已声明
// - 属性是否存在
// - 类型是否匹配
// - 聚合函数使用是否正确
// 输出:SemanticState
SemanticState(
symbolTable = Map(
"p" -> NodeType(labels=["Person"]),
"f" -> NodeType(labels=["Person"])
),
types = Map(
"f.name" -> StringType
)
)
3. 逻辑计划生成
// 逻辑计划(树形结构)
Projection(
columns = ["f.name"],
source = Filter(
predicate = Equals(Property(p, "name"), Literal("Alice")),
source = Expand(
from = "p",
relTypes = ["KNOWS"],
to = "f",
direction = OUTGOING,
source = NodeByLabelScan("p", "Person")
)
)
)
4. 查询重写
重写规则示例
// 规则 1:谓词下推(Predicate Pushdown)
// Before:
Filter(p.age > 30, Expand(p, r, q))
// After:
Expand(Filter(p.age > 30, NodeScan(p)), r, q)
// 规则 2:索引查找替换
// Before:
Filter(p.name = "Alice", NodeByLabelScan(p, "Person"))
// After:
NodeIndexSeek(p, "Person", "name", "Alice")
// 规则 3:合并过滤条件
// Before:
Filter(p.age > 30, Filter(p.name STARTS WITH "A", NodeScan(p)))
// After:
Filter(AND(p.age > 30, p.name STARTS WITH "A"), NodeScan(p))
5. 成本估算与计划选择
// 计划 A:全节点扫描 + 过滤
Plan_A = Filter(name="Alice", AllNodesScan("p"))
Cost_A = Rows(AllNodes) * CPU_Filter = 1,000,000 * 0.01 = 10,000
// 计划 B:索引查找
Plan_B = NodeIndexSeek("p", "Person", "name", "Alice")
Cost_B = Rows(Index) * CPU_Seek = 1 * 1.0 = 1
// 选择成本最低的计划:Plan_B
执行运行时对比
| 特性 | Interpreted | Slotted | Pipelined |
|---|---|---|---|
| 执行方式 | 火山模型(逐行拉取) | 火山模型 + 槽位优化 | 向量化批量处理 |
| 内存占用 | 高(对象分配) | 中(复用槽位) | 低(批量缓冲区) |
| CPU 效率 | 低(虚方法调用) | 中(减少对象创建) | 高(SIMD 友好) |
| 适用场景 | 小结果集 | 中等结果集 | 大结果集、聚合、排序 |
| 默认选择 | 否 | 是 | 自动切换(大数据量) |
核心算子
AllNodesScan(全节点扫描)
class AllNodesScan(variable: String) extends Operator {
override def init(state: QueryState): Iterator[ExecutionContext] = {
val cursor = state.cursors.allocateNodeCursor()
state.query.nodeReadOps().allNodesScan(cursor)
new Iterator[ExecutionContext] {
override def hasNext: Boolean = cursor.next()
override def next(): ExecutionContext = {
ExecutionContext.empty.set(variable, cursor.nodeReference())
}
}
}
}
Expand(关系扩展)
class Expand(
from: String,
relTypes: Seq[Int],
to: String,
direction: Direction,
source: Operator
) extends Operator {
override def init(state: QueryState): Iterator[ExecutionContext] = {
source.init(state).flatMap { row =>
val fromNode = row.getLong(from)
val relCursor = state.cursors.allocateRelationshipTraversalCursor()
state.query.nodeReadOps().relationships(
fromNode,
direction,
relTypes.toArray,
relCursor
)
new Iterator[ExecutionContext] {
override def hasNext: Boolean = relCursor.next()
override def next(): ExecutionContext = {
row.set(to, relCursor.otherNodeReference())
}
}
}
}
}
模块三:Lucene 索引
概览
职责
- 基于 Apache Lucene 的范围索引与复合索引
- 全文搜索索引(
fulltext-index模块) - 索引构建(Populator)与查询(Accessor)
- 索引更新监听器
路径:community/lucene-index、community/fulltext-index
索引类型
1. B-Tree 索引(Range Index)
-- 创建单属性索引
CREATE INDEX person_name_idx FOR (p:Person) ON (p.name)
-- 创建复合索引
CREATE INDEX person_name_age_idx FOR (p:Person) ON (p.name, p.age)
-- 使用索引查询
MATCH (p:Person) WHERE p.name = 'Alice' // 精确匹配
MATCH (p:Person) WHERE p.age > 30 AND p.age < 50 // 范围查询
底层实现
- Lucene DocValues:用于范围查询
- Lucene Terms:用于精确匹配
- 索引文件:
schema/index/lucene-1/1/(每个索引一个目录)
2. 全文索引(Fulltext Index)
-- 创建全文索引
CREATE FULLTEXT INDEX person_bio_idx
FOR (p:Person) ON EACH [p.bio, p.interests]
-- 全文搜索
CALL db.index.fulltext.queryNodes('person_bio_idx', 'graph database')
YIELD node, score
RETURN node.name, score
ORDER BY score DESC
分析器支持
- Standard Analyzer:默认,适用于英文
- Language-Specific Analyzers:中文(CJK)、日文、韩文等
- Custom Analyzers:自定义分词规则
3. 向量索引(Vector Index,5.11+)
-- 创建向量索引
CREATE VECTOR INDEX embedding_idx FOR (d:Document) ON (d.embedding)
OPTIONS {indexConfig: {`vector.dimensions`: 1536, `vector.similarity_function`: 'cosine'}}
-- 向量相似度搜索
MATCH (d:Document)
WHERE d.embedding IS NOT NULL
WITH d, vector.similarity.cosine(d.embedding, $queryVector) AS score
ORDER BY score DESC
LIMIT 10
RETURN d.title, score
索引更新机制
sequenceDiagram
participant Tx as Transaction
participant Engine as StorageEngine
participant Listener as IndexUpdateListener
participant Provider as IndexProvider
participant Lucene as Lucene Index
Tx->>Engine: commit(commands)
Engine->>Engine: apply(commands)
Engine->>Listener: onIndexUpdate(updates)
Listener->>Listener: 批量收集更新
Listener->>Provider: getOnlineAccessor(indexId)
Provider-->>Listener: IndexAccessor
Listener->>Lucene: newUpdater()
Lucene-->>Listener: IndexUpdater
loop 遍历更新
Listener->>Lucene: process(IndexEntryUpdate)
alt 新增
Lucene->>Lucene: addDocument(doc)
else 删除
Lucene->>Lucene: deleteDocuments(term)
else 修改
Lucene->>Lucene: updateDocument(term, doc)
end
end
Listener->>Lucene: close()
Lucene->>Lucene: flush segments
核心代码示例
索引查询
// community/lucene-index/src/main/java/org/neo4j/kernel/api/impl/index/LuceneIndexAccessor.java
public NodeValueIndexCursor query(
IndexQueryConstraints constraints,
PropertyIndexQuery... predicates
) throws IndexNotApplicableKernelException {
// 1. 构建 Lucene 查询
Query luceneQuery = toLuceneQuery(predicates);
// 2. 执行搜索
IndexSearcher searcher = searcherManager.acquire();
try {
TopDocs topDocs = searcher.search(luceneQuery, constraints.limit());
// 3. 转换为节点 ID 列表
long[] nodeIds = new long[topDocs.scoreDocs.length];
for (int i = 0; i < topDocs.scoreDocs.length; i++) {
Document doc = searcher.doc(topDocs.scoreDocs[i].doc);
nodeIds[i] = Long.parseLong(doc.get("id"));
}
// 4. 返回游标
return new NodeValueIndexCursor(nodeIds, cursors);
} finally {
searcherManager.release(searcher);
}
}
private Query toLuceneQuery(PropertyIndexQuery... predicates) {
BooleanQuery.Builder builder = new BooleanQuery.Builder();
for (PropertyIndexQuery predicate : predicates) {
if (predicate instanceof PropertyIndexQuery.ExactPredicate exact) {
// 精确匹配
builder.add(
new TermQuery(new Term("prop_" + exact.propertyKeyId(), exact.value().toString())),
BooleanClause.Occur.MUST
);
} else if (predicate instanceof PropertyIndexQuery.RangePredicate<?> range) {
// 范围查询
builder.add(
NumericDocValuesField.newSlowRangeQuery(
"prop_" + range.propertyKeyId(),
(Long) range.from(),
(Long) range.to()
),
BooleanClause.Occur.MUST
);
}
}
return builder.build();
}
模块四:存储过程(Procedure)
概览
职责
- 存储过程注册与调用
- 用户自定义函数(UDF)
- 聚合函数(Aggregation Function)
- 注解处理(
@Procedure、@UserFunction)
路径:community/procedure、community/procedure-compiler
存储过程示例
编写存储过程
package com.example.procedures;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.procedure.*;
public class MyProcedures {
@Context
public GraphDatabaseService db;
@Context
public Transaction tx;
// 1. 只读存储过程
@Procedure(name = "example.findFriends", mode = Mode.READ)
@Description("Find all friends of a person")
public Stream<FriendResult> findFriends(@Name("personName") String name) {
return tx.findNodes(Label.label("Person"), "name", name)
.stream()
.flatMap(person -> StreamSupport.stream(
person.getRelationships(RelationshipType.withName("KNOWS"), Direction.OUTGOING).spliterator(),
false
))
.map(rel -> rel.getEndNode())
.map(friend -> new FriendResult(
friend.getId(),
(String) friend.getProperty("name"),
(Integer) friend.getProperty("age", 0)
));
}
// 2. 用户自定义函数(UDF)
@UserFunction(name = "example.fullName")
@Description("Concatenate first and last name")
public String fullName(
@Name("firstName") String first,
@Name("lastName") String last
) {
return first + " " + last;
}
// 3. 聚合函数
@UserAggregationFunction(name = "example.longestString")
@Description("Find the longest string in a collection")
public LongestStringAggregator longestString() {
return new LongestStringAggregator();
}
// 结果类(必须是 public static)
public static class FriendResult {
public long id;
public String name;
public int age;
public FriendResult(long id, String name, int age) {
this.id = id;
this.name = name;
this.age = age;
}
}
// 聚合器类
public static class LongestStringAggregator {
private String longest = "";
@UserAggregationUpdate
public void update(@Name("value") String value) {
if (value != null && value.length() > longest.length()) {
longest = value;
}
}
@UserAggregationResult
public String result() {
return longest;
}
}
}
调用存储过程
-- 调用存储过程
CALL example.findFriends('Alice')
YIELD id, name, age
WHERE age > 30
RETURN name, age
-- 使用 UDF
RETURN example.fullName('John', 'Doe') AS fullName
-- 使用聚合函数
MATCH (p:Person)
RETURN example.longestString(p.bio) AS longestBio
注册存储过程
// 方式 1:自动扫描(classpath)
// 将 JAR 放置到 Neo4j plugins/ 目录
// 方式 2:编程方式注册
GlobalProcedures procedures = db.getDependencyResolver()
.resolveDependency(GlobalProcedures.class);
procedures.registerProcedure(MyProcedures.class);
procedures.registerFunction(MyProcedures.class);
procedures.registerAggregationFunction(MyProcedures.class);
模块五:配置与监控
配置管理(Configuration)
路径:community/configuration
配置项定义
// community/configuration/src/main/java/org/neo4j/configuration/GraphDatabaseSettings.java
public class GraphDatabaseSettings {
@Description("Size of the page cache")
public static final Setting<ByteUnit> pagecache_memory =
newBuilder("server.memory.pagecache.size", BYTES, null)
.addConstraint(min(ByteUnit.mebiBytes(8)))
.build();
@Description("Transaction timeout")
public static final Setting<Duration> transaction_timeout =
newBuilder("db.transaction.timeout", DURATION, Duration.ZERO)
.addConstraint(min(Duration.ZERO))
.build();
@Description("Default database name")
public static final Setting<String> default_database =
newBuilder("dbms.default_database", STRING, DEFAULT_DATABASE_NAME)
.immutable()
.build();
}
动态配置更新
-- 查看当前配置
CALL dbms.listConfig() YIELD name, value
WHERE name STARTS WITH 'db.transaction'
RETURN name, value
-- 更新动态配置(需重启的配置无法动态更新)
CALL dbms.setConfigValue('db.transaction.timeout', '120s')
监控指标(Monitoring)
路径:community/monitoring
JMX Metrics
// 注册 JMX Bean
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
ObjectName name = new ObjectName("org.neo4j:instance=kernel#0,name=Transactions");
mbs.registerMBean(new TransactionMetrics(), name);
// TransactionMetrics 接口
public interface TransactionMetricsMBean {
long getNumberOfOpenTransactions();
long getNumberOfCommittedTransactions();
long getPeakNumberOfConcurrentTransactions();
long getNumberOfRolledBackTransactions();
}
Prometheus Metrics 导出
# neo4j.conf
server.metrics.enabled=true
server.metrics.prometheus.enabled=true
server.metrics.prometheus.endpoint=:2004
# Prometheus 指标格式
# TYPE neo4j_transaction_started_total counter
neo4j_transaction_started_total{database="neo4j"} 12345
# TYPE neo4j_page_cache_hit_ratio gauge
neo4j_page_cache_hit_ratio 0.95
# TYPE neo4j_bolt_connections_opened_total counter
neo4j_bolt_connections_opened_total 678
关键监控指标
| 指标名称 | 类型 | 说明 |
|---|---|---|
neo4j_transaction_active |
Gauge | 当前活跃事务数 |
neo4j_transaction_committed_total |
Counter | 累计提交事务数 |
neo4j_transaction_rollbacks_total |
Counter | 累计回滚事务数 |
neo4j_page_cache_hit_ratio |
Gauge | Page Cache 命中率(目标 >90%) |
neo4j_page_cache_evictions_total |
Counter | 页面驱逐总数 |
neo4j_page_cache_flushes_total |
Counter | 页面刷新总数 |
neo4j_bolt_connections_opened_total |
Counter | Bolt 连接打开总数 |
neo4j_bolt_connections_closed_total |
Counter | Bolt 连接关闭总数 |
neo4j_bolt_messages_received_total |
Counter | 接收的 Bolt 消息总数 |
neo4j_cypher_replan_events_total |
Counter | 查询重编译次数 |
neo4j_ids_in_use{type="NODE"} |
Gauge | 已分配的节点 ID 数量 |
neo4j_ids_in_use{type="RELATIONSHIP"} |
Gauge | 已分配的关系 ID 数量 |
最佳实践汇总
1. Bolt 连接池配置
客户端驱动配置
import org.neo4j.driver.*;
public class DriverConfig {
public static Driver createDriver() {
return GraphDatabase.driver(
"bolt://localhost:7687",
AuthTokens.basic("neo4j", "password"),
Config.builder()
.withMaxConnectionPoolSize(50) // 最大连接数
.withConnectionAcquisitionTimeout(60, TimeUnit.SECONDS)
.withConnectionTimeout(15, TimeUnit.SECONDS)
.withMaxTransactionRetryTime(30, TimeUnit.SECONDS)
.build()
);
}
}
2. Cypher 查询优化
优化前
// ❌ 低效:笛卡尔积
MATCH (p:Person), (c:Company)
WHERE p.works_at = c.id
RETURN p.name, c.name
优化后
// ✅ 高效:直接关系遍历
MATCH (p:Person)-[:WORKS_AT]->(c:Company)
RETURN p.name, c.name
使用 EXPLAIN 分析
EXPLAIN
MATCH (p:Person)-[:KNOWS*1..3]->(f:Person)
WHERE p.name = 'Alice'
RETURN f.name
3. 索引最佳实践
创建索引时机
-- 1. 频繁查询的属性
CREATE INDEX person_email_idx FOR (p:Person) ON (p.email)
-- 2. 唯一性约束(自动创建唯一索引)
CREATE CONSTRAINT person_email_unique FOR (p:Person) REQUIRE p.email IS UNIQUE
-- 3. 复合索引(查询多个属性)
CREATE INDEX person_name_age_idx FOR (p:Person) ON (p.name, p.age)
监控索引使用情况
-- 查看索引列表
SHOW INDEXES
-- 查看索引统计
CALL db.index.fulltext.listAvailableAnalyzers()
4. 监控告警阈值
Prometheus 告警规则
groups:
- name: neo4j_alerts
rules:
# Page Cache 命中率过低
- alert: LowPageCacheHitRatio
expr: neo4j_page_cache_hit_ratio < 0.85
for: 5m
annotations:
summary: "Page Cache 命中率低于 85%"
# 活跃事务数过多
- alert: HighActiveTransactions
expr: neo4j_transaction_active > 100
for: 3m
annotations:
summary: "活跃事务数超过 100"
# Bolt 连接数异常增长
- alert: BoltConnectionSpike
expr: rate(neo4j_bolt_connections_opened_total[5m]) > 100
annotations:
summary: "Bolt 连接打开速率异常"
模块总结
本文档涵盖了 Neo4j 的扩展模块,包括:
- Bolt 协议:高效的二进制通信协议,支持流式结果与背压控制
- Cypher 引擎:从语法解析到查询执行的完整流程,支持多种运行时
- Lucene 索引:基于 Apache Lucene 的范围索引与全文搜索
- 存储过程:扩展 Neo4j 功能的主要方式,支持 Java/Scala
- 配置管理:声明式配置定义,支持动态更新
- 监控指标:完善的 JMX 与 Prometheus 监控集成
这些模块共同构成了 Neo4j 的完整生态系统,理解它们的实现细节是进行高级开发与运维的基础。