Elasticsearch-06-存储引擎
本文档提供存储引擎模块的全面剖析,包括模块职责、架构设计、核心API、数据结构、时序图以及与Lucene的集成。
1. 模块职责
存储引擎(Engine)是Elasticsearch与Apache Lucene集成的关键层,负责所有持久化存储操作。
1.1 核心职责
- 文档索引
  - 将文档写入Lucene索引
  - 管理文档版本和序列号
  - 处理文档更新和删除
- 事务日志(Translog)
  - 记录所有写操作到事务日志
  - 保证数据持久性
  - 支持故障恢复
- 段管理
  - Refresh: 生成新的可搜索段
  - Flush: 提交段到磁盘
  - Merge: 合并小段为大段
- 版本控制
  - 管理LiveVersionMap
  - 实现乐观并发控制
  - 处理版本冲突
- 软删除(Soft Deletes)
  - 标记文档为已删除而不立即物理删除
  - 支持跨集群复制(CCR)
  - 优化恢复性能
- 搜索支持
  - 提供Searcher供查询使用
  - 管理DirectoryReader
  - 控制搜索可见性
1.2 输入与输出
输入:
- Index/Delete/Update操作(来自IndexShard)
- Refresh/Flush/Merge请求
输出:
- IndexWriter(Lucene写入)
- DirectoryReader(Lucene读取)
- Translog(事务日志)
1.3 依赖关系
上游依赖:
- Apache Lucene: 底层存储引擎
- Store: 文件系统抽象
- TranslogConfig: 事务日志配置
下游依赖者:
- IndexShard: 分片管理
- SearchService: 搜索服务
2. 模块架构
2.1 整体服务架构图
flowchart TB
subgraph "客户端层"
Client[REST Client<br/>_bulk / _doc APIs]
end
subgraph "请求处理层"
TransportAction[TransportBulkAction<br/>TransportIndexAction]
BulkProcessor[BulkProcessor<br/>批量处理器]
end
subgraph "分片管理层"
IndexShard[IndexShard<br/>分片管理]
ReplicationOp[ReplicationOperation<br/>复制协调]
end
subgraph "存储引擎层 (Engine)"
Engine[Engine<br/>抽象接口]
InternalEngine[InternalEngine<br/>读写实现]
ReadOnlyEngine[ReadOnlyEngine<br/>只读实现]
end
subgraph "核心组件层"
VersionMap[LiveVersionMap<br/>版本映射]
Translog[Translog<br/>事务日志]
LocalCheckpoint[LocalCheckpointTracker<br/>检查点跟踪]
Throttle[IndexThrottle<br/>索引限流]
end
subgraph "Lucene 层"
IndexWriter[IndexWriter<br/>写入器]
DirectoryReader[DirectoryReader<br/>读取器]
ReaderManager[ReaderManager<br/>Reader管理]
SegmentInfos[SegmentInfos<br/>段元数据]
end
subgraph "存储层"
Directory[FSDirectory<br/>文件目录]
TranslogFiles[Translog Files<br/>translog-N.tlog]
SegmentFiles[Segment Files<br/>segments_N]
end
Client --> TransportAction
TransportAction --> BulkProcessor
BulkProcessor --> IndexShard
IndexShard --> ReplicationOp
ReplicationOp --> |Primary| IndexShard
ReplicationOp --> |Replica| IndexShard
IndexShard --> Engine
Engine --> InternalEngine
Engine --> ReadOnlyEngine
InternalEngine --> VersionMap
InternalEngine --> Translog
InternalEngine --> LocalCheckpoint
InternalEngine --> Throttle
InternalEngine --> IndexWriter
InternalEngine --> ReaderManager
IndexWriter --> SegmentInfos
IndexWriter --> Directory
ReaderManager --> DirectoryReader
DirectoryReader --> Directory
Translog --> TranslogFiles
Directory --> SegmentFiles
style InternalEngine fill:#e1f5ff
style VersionMap fill:#e1ffe1
style Translog fill:#ffe1e1
style IndexWriter fill:#fff4e1
2.2 架构层次说明
2.2.1 请求处理层
TransportBulkAction / TransportIndexAction
- 接收客户端的索引/批量请求
- 路由请求到目标分片
- 协调Primary和Replica的复制流程
- 处理响应和错误
调用链路:
POST /index/_doc/1 → TransportIndexAction
POST /_bulk → TransportBulkAction → BulkProcessor
2.2.2 分片管理层
IndexShard
- 管理单个分片的生命周期
- 封装Engine的调用
- 执行Pre/Post索引监听器
- 处理Primary和Replica的差异化逻辑
核心方法:
- applyIndexOperationOnPrimary(): Primary分片索引
- applyIndexOperationOnReplica(): Replica分片索引
- prepareIndex(): 准备索引操作(解析文档、动态映射)
ReplicationOperation
- 协调Primary和Replica的复制
- 等待Replica确认
- 处理复制失败和重试
2.2.3 存储引擎层
InternalEngine (核心)
- 默认的读写引擎实现
- 封装Lucene IndexWriter和DirectoryReader
- 管理所有写入和读取操作
- 实现事务日志、版本控制、检查点跟踪
ReadOnlyEngine
- 只读引擎实现
- 用于冻结索引(Frozen Index)
- 仅支持读取操作
2.2.4 核心组件层
LiveVersionMap
- 内存中的文档版本映射
- Key: 文档UID, Value: VersionValue
- 功能:
- 实时GET支持(存储Translog位置)
- 版本冲突检测
- 优化自动生成ID的场景(Unsafe模式)
Translog (事务日志)
- 顺序写入的操作日志
- Generation机制(translog-1.tlog, translog-2.tlog, …)
- 保证数据持久性(fsync策略)
- 支持故障恢复和Replica追赶
LocalCheckpointTracker
- 跟踪已处理和已持久化的序列号
- 维护本地检查点(Local Checkpoint)
- 用于复制进度跟踪和数据一致性
IndexThrottle
- 索引限流机制
- 当Merge落后时暂停索引写入
- 保护内存和磁盘IO
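其中LocalCheckpointTracker的"本地检查点"语义——已处理序列号中最高的无空洞位置——可以用一个极简示意说明(下面的SimpleCheckpointTracker是假设性实现,并非Elasticsearch源码;真实实现用分段位图跟踪seqNo):

```java
import java.util.HashSet;
import java.util.Set;

// 极简示意:本地检查点 = 已处理序列号中最高的"无空洞"位置
public class SimpleCheckpointTracker {
    private long checkpoint = -1;                       // 尚未处理任何操作时为-1
    private final Set<Long> pending = new HashSet<>();  // 已处理但尚未连续的seqNo

    public synchronized void markSeqNoAsProcessed(long seqNo) {
        if (seqNo <= checkpoint) {
            return; // 已包含在检查点内
        }
        pending.add(seqNo);
        // 尝试推进检查点:只要下一个seqNo已处理就前移
        while (pending.remove(checkpoint + 1)) {
            checkpoint++;
        }
    }

    public synchronized long getProcessedCheckpoint() {
        return checkpoint;
    }
}
```

乱序到达时(例如先处理seqNo=0和2、后处理1),检查点会停在空洞之前,空洞补齐后一次性跳过——这正是复制进度跟踪需要的性质。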
2.2.5 Lucene层
IndexWriter
- Lucene的核心写入器
- 管理段(Segments)的创建
- 执行段合并(Merge)
- 维护倒排索引、文档存储、DocValues
DirectoryReader / ReaderManager
- Lucene的读取器
- 提供搜索能力
- 通过Refresh更新到最新数据
- ReaderManager管理Reader的生命周期
SegmentInfos
- 段的元数据信息
- 包含所有段的列表
- 版本号和Generation
2.3 模块交互矩阵
| 调用方 | 被调方 | 调用方式 | 数据流 | 错误处理 | 一致性要求 |
|---|---|---|---|---|---|
| IndexShard | InternalEngine | 同步调用 | Index/Delete操作 | 版本冲突抛异常 | 强一致(Primary) |
| InternalEngine | LiveVersionMap | 同步调用(加锁) | 版本检查/更新 | 无 | 串行化 |
| InternalEngine | Translog | 同步写入 | 操作日志 | IO异常引擎失败 | WAL保证 |
| InternalEngine | IndexWriter | 同步调用 | Lucene文档 | IO异常引擎失败 | 最终一致 |
| InternalEngine | LocalCheckpointTracker | 同步更新 | SeqNo处理/持久化 | 无 | 单调递增 |
| RefreshScheduler | InternalEngine | 定时触发 | Refresh请求 | 异常忽略 | 最终可见 |
| FlushPolicy | InternalEngine | 条件触发 | Flush请求 | 失败重试 | 持久化保证 |
| MergeScheduler | IndexWriter | 后台异步 | Merge任务 | 异常记录 | 最终合并 |
2.4 关键设计决策
1. WAL (Write-Ahead Log)机制
- 操作先写入Lucene内存缓冲,随后追加到Translog
- 请求在Translog落盘(fsync)前不会向客户端确认,保证已确认的操作不丢失
- 支持故障恢复
2. 两级Reader (Internal vs External)
- Internal Reader: 包含所有文档(含未提交的)
- External Reader: 仅包含已Refresh的文档
- 用途区分: 内部操作 vs 用户查询
3. 版本控制策略
- 乐观并发控制(Optimistic Concurrency Control)
- LiveVersionMap提供UID级别的锁
- 支持多种版本类型(Internal/External/ExternalGTE)
4. Sequence Number机制
- 全局唯一、单调递增
- 用于复制顺序和一致性
- Primary分配、Replica沿用
5. Soft Deletes (软删除)
- 删除操作写入Tombstone文档
- 不立即物理删除
- 支持CCR和更高效的恢复
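上述"乐观并发控制"与"Sequence Number"两点可以用一个极简的compare-and-swap示意串联(下面的VersionedStore是假设性实现,非Elasticsearch源码;真实判定逻辑见后文InternalEngine.planIndexingAsPrimary):

```java
import java.util.HashMap;
import java.util.Map;

// 极简示意:基于(seqNo, primaryTerm)的乐观并发控制
public class VersionedStore {
    record Entry(long seqNo, long primaryTerm, String source) {}

    private final Map<String, Entry> docs = new HashMap<>();
    private long nextSeqNo = 0;
    private final long primaryTerm = 1;

    // ifSeqNo/ifPrimaryTerm传-1表示无条件写入
    public synchronized long index(String id, String source, long ifSeqNo, long ifPrimaryTerm) {
        Entry current = docs.get(id);
        if (ifSeqNo != -1) {
            // CAS检查:当前文档的seqNo/primaryTerm必须与请求的期望值一致
            if (current == null || current.seqNo() != ifSeqNo || current.primaryTerm() != ifPrimaryTerm) {
                throw new IllegalStateException("version conflict for [" + id + "]");
            }
        }
        long seqNo = nextSeqNo++;   // Primary分配单调递增的seqNo
        docs.put(id, new Entry(seqNo, primaryTerm, source));
        return seqNo;
    }
}
```

两个并发更新者基于同一个旧(seqNo, primaryTerm)写入时,只有一个能成功,另一个收到冲突异常——与ES的if_seq_no/if_primary_term条件写行为一致。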
3. 核心流程与调用链路
3.1 完整索引流程 (从API到存储)
3.1.1 上游调用链路图
flowchart TB
Start[客户端请求<br/>POST /index/_doc/1] --> TransportAction[TransportIndexAction]
TransportAction --> |路由到分片| IndexShard[IndexShard]
IndexShard --> PrepareCheck{是Primary?}
PrepareCheck --> |Yes| PrimaryPath[applyIndexOperationOnPrimary]
PrepareCheck --> |No| ReplicaPath[applyIndexOperationOnReplica]
PrimaryPath --> PrepareIndex[prepareIndex<br/>解析文档/动态映射]
ReplicaPath --> PrepareIndex
PrepareIndex --> Listeners[indexingOperationListeners.preIndex]
Listeners --> EngineIndex[engine.index<br/>调用InternalEngine]
EngineIndex --> VersionLock[获取UID锁<br/>versionMap.acquireLock]
VersionLock --> VersionCheck[版本检查<br/>getUnderLock]
VersionCheck --> |冲突| VersionConflict[抛出VersionConflictException]
VersionCheck --> |通过| SeqNoAssign{是Primary?}
SeqNoAssign --> |Yes| GenerateSeqNo[生成SeqNo<br/>localCheckpoint + 1]
SeqNoAssign --> |No| UseSeqNo[使用Primary的SeqNo]
GenerateSeqNo --> Lucene[写入Lucene<br/>IndexWriter.updateDocument]
UseSeqNo --> Lucene
Lucene --> Translog[写入Translog<br/>translog.add]
Translog --> UpdateVM[更新LiveVersionMap<br/>putIndexUnderLock]
UpdateVM --> UpdateLCP[更新LocalCheckpointTracker<br/>markSeqNoAsProcessed]
UpdateLCP --> ReleaseLock[释放UID锁]
ReleaseLock --> PostListeners[indexingOperationListeners.postIndex]
PostListeners --> End[返回IndexResult]
style EngineIndex fill:#e1f5ff
style Translog fill:#ffe1e1
style Lucene fill:#fff4e1
style UpdateVM fill:#e1ffe1
3.1.2 关键代码路径详解
步骤1: IndexShard接收请求
// IndexShard.java: 953-976行
public Engine.IndexResult applyIndexOperationOnPrimary(
long version,
VersionType versionType,
SourceToParse sourceToParse,
long ifSeqNo,
long ifPrimaryTerm,
long autoGeneratedTimestamp,
boolean isRetry
) throws IOException {
// 调用通用索引方法,标记为PRIMARY来源
return applyIndexOperation(
getEngine(),
UNASSIGNED_SEQ_NO, // Primary自己分配seqNo
getOperationPrimaryTerm(),
version,
versionType,
ifSeqNo,
ifPrimaryTerm,
autoGeneratedTimestamp,
isRetry,
Engine.Operation.Origin.PRIMARY, // 标记为Primary操作
sourceToParse
);
}
关键字段说明:
- UNASSIGNED_SEQ_NO: Primary需要自己生成序列号
- getOperationPrimaryTerm(): 当前Primary任期
- Origin.PRIMARY: 标识这是Primary的写操作
步骤2: 准备Index操作
// IndexShard.java: 1001-1046行
private Engine.IndexResult applyIndexOperation(
Engine engine,
long seqNo,
long opPrimaryTerm,
long version,
VersionType versionType,
// ... 其他参数
) throws IOException {
// 检查是否允许写入
ensureWriteAllowed(origin);
Engine.Index operation;
try {
// 准备索引操作:解析文档、处理动态映射
operation = prepareIndex(
mapperService,
sourceToParse,
seqNo,
opPrimaryTerm,
version,
versionType,
origin,
autoGeneratedTimeStamp,
isRetry,
ifSeqNo,
ifPrimaryTerm,
getRelativeTimeInNanos()
);
// 检查是否需要动态映射更新
Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
// 返回需要映射更新的结果,由上层处理
return new Engine.IndexResult(update, operation.parsedDoc().id());
}
} catch (Exception e) {
// 文档级别失败(解析错误、映射失败等)
verifyNotClosed(e);
return new Engine.IndexResult(e, version, opPrimaryTerm, seqNo, sourceToParse.id());
}
// 执行索引操作
return index(engine, operation);
}
步骤3: 调用Engine.index
// IndexShard.java: 1116-1133行
private Engine.IndexResult index(Engine engine, Engine.Index index) throws IOException {
try {
final Engine.IndexResult result;
// 执行前置监听器(统计、日志等)
final Engine.Index preIndex = indexingOperationListeners.preIndex(shardId, index);
try {
if (logger.isTraceEnabled()) {
logger.trace(
"index [{}] seq# [{}] allocation-id [{}] primaryTerm [{}] operationPrimaryTerm [{}] origin [{}]",
preIndex.id(),
preIndex.seqNo(),
routingEntry().allocationId(),
preIndex.primaryTerm(),
getOperationPrimaryTerm(),
preIndex.origin()
);
}
// 调用Engine的index方法
result = engine.index(preIndex);
// 日志记录结果
if (logger.isTraceEnabled()) {
logger.trace("index-done [{}] seq# [{}] result-seq# [{}] result-term [{}] failure [{}]",
preIndex.id(), preIndex.seqNo(), result.getSeqNo(), result.getTerm(), result.getFailure());
}
        } catch (Exception e) {
            // 异常处理(通知postIndex监听器记录失败)后重新抛出
            throw e;
        }
} finally {
// 执行后置监听器
}
return result;
}
步骤4: InternalEngine核心索引逻辑
// InternalEngine.java: 1175-1309行
@Override
public IndexResult index(Index index) throws IOException {
final boolean doThrottle = index.origin().isRecovery() == false;
try (var ignored1 = acquireEnsureOpenRef()) {
assert assertIncomingSequenceNumber(index.origin(), index.seqNo());
int reservedDocs = 0;
try (
// 获取文档级别的锁(基于UID)
Releasable ignored = versionMap.acquireLock(index.uid());
// 如果需要,获取限流令牌
Releasable indexThrottle = doThrottle ? throttle.acquireThrottle() : () -> {}
) {
lastWriteNanos = index.startTime();
/*
* 自动生成ID优化:
* 如果文档ID是自动生成的且无重试,可以直接使用addDocument
* 而不是updateDocument,避免版本检查和查找开销
*/
final IndexingStrategy plan;
// 第一步:制定索引策略(版本检查、冲突检测)
if (index.origin() == Operation.Origin.PRIMARY) {
plan = planIndexingAsPrimary(index);
} else {
plan = planIndexingAsNonPrimary(index);
}
final IndexResult indexResult;
// 如果策略指示有预检错误,直接返回
if (plan.earlyResultOnPreflightError.isPresent()) {
indexResult = plan.earlyResultOnPreflightError.get();
} else {
// 第二步:写入Lucene
if (plan.indexIntoLucene || plan.addStaleOpToLucene) {
indexResult = indexIntoLucene(index, plan);
} else {
// 不需要写Lucene(例如已存在的更老版本)
indexResult = new IndexResult(
plan.versionForIndexing,
index.primaryTerm(),
index.seqNo(),
plan.currentNotFoundOrDeleted,
index.id()
);
}
}
// 第三步:写入Translog(如果不是来自Translog)
if (index.origin().isFromTranslog() == false) {
final Translog.Location location;
if (indexResult.getResultType() == Result.Type.SUCCESS) {
// 成功:写入Index操作
location = translog.add(new Translog.Index(index, indexResult));
} else if (indexResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
// 失败但有seqNo:写入NoOp
final NoOp noOp = new NoOp(
indexResult.getSeqNo(),
index.primaryTerm(),
index.origin(),
index.startTime(),
indexResult.getFailure().toString()
);
location = innerNoOp(noOp).getTranslogLocation();
} else {
location = null;
}
indexResult.setTranslogLocation(location);
}
// 第四步:更新LiveVersionMap
if (plan.indexIntoLucene && indexResult.getResultType() == Result.Type.SUCCESS) {
final Translog.Location translogLocation =
trackTranslogLocation.get() ? indexResult.getTranslogLocation() : null;
versionMap.maybePutIndexUnderLock(
index.uid(),
new IndexVersionValue(
translogLocation, // Translog位置(用于实时GET)
plan.versionForIndexing,
index.seqNo(),
index.primaryTerm()
)
);
}
// 第五步:更新LocalCheckpointTracker
localCheckpointTracker.markSeqNoAsProcessed(indexResult.getSeqNo());
if (indexResult.getTranslogLocation() == null) {
// 来自Translog或无seqNo,标记为已持久化
localCheckpointTracker.markSeqNoAsPersisted(indexResult.getSeqNo());
}
indexResult.setTook(relativeTimeInNanosSupplier.getAsLong() - index.startTime());
indexResult.freeze();
return indexResult;
} finally {
releaseInFlightDocs(reservedDocs);
}
} catch (RuntimeException | IOException e) {
// 引擎失败处理
try {
if (e instanceof AlreadyClosedException == false && treatDocumentFailureAsTragicError(index)) {
failEngine("index id[" + index.id() + "] origin[" + index.origin() + "] seq#[" + index.seqNo() + "]", e);
} else {
maybeFailEngine("index id[" + index.id() + "] origin[" + index.origin() + "] seq#[" + index.seqNo() + "]", e);
}
} catch (Exception inner) {
e.addSuppressed(inner);
}
throw e;
}
}
步骤5: 版本检查策略 (Primary)
// InternalEngine.java: 1356-1408行
private IndexingStrategy planIndexingAsPrimary(Index index) throws IOException {
assert index.origin() == Operation.Origin.PRIMARY : "planing as primary but origin isn't. got " + index.origin();
// 自动生成ID且非重试:可能使用优化路径
final boolean canOptimizeAddDocument = index.isRetry() == false
&& index.autoGeneratedIdTimestamp() != IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP;
if (canOptimizeAddDocument) {
// 检查是否真的可以优化(没有重复风险)
if (mayHaveBeenIndexedBefore(index)) {
// 有重复风险,使用普通路径
versionMap.enforceSafeAccess();
} else {
// 可以优化:直接addDocument,无需版本检查
return IndexingStrategy.optimizedAppendOnly(index.version(), 0);
}
} else {
// 非自动ID,总是需要版本检查
versionMap.enforceSafeAccess();
}
// 从VersionMap获取当前版本
final OpVsLuceneDocStatus opVsLucene;
final VersionValue versionValue = versionMap.getUnderLock(index.uid());
if (versionValue != null) {
// 存在版本信息:检查冲突
final long currentVersion = versionValue.version;
final VersionType versionType = index.versionType();
// 版本冲突检测
if (versionType.isVersionConflictForWrites(
currentVersion,
index.version(),
versionValue.isDelete()
)) {
// 抛出版本冲突异常
final String message = versionType.explainConflictForWrites(
currentVersion,
index.version(),
versionValue.isDelete()
);
return IndexingStrategy.failWithVersionConflict(
new VersionConflictEngineException(shardId, index, currentVersion, versionValue.isDelete(), message),
currentVersion,
0
);
}
// 检查if_seq_no和if_primary_term条件
if (index.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
// Compare-and-swap检查
final boolean samePrimaryTerm = index.getIfPrimaryTerm() == versionValue.term;
final boolean sameSeqNo = index.getIfSeqNo() == versionValue.seqNo;
if (samePrimaryTerm == false || sameSeqNo == false) {
return IndexingStrategy.failWithVersionConflict(...);
}
}
// 通过版本检查
opVsLucene = compareOpToVersionMapOnSeqNo(
index.id(),
index.seqNo(),
index.primaryTerm(),
versionValue
);
} else {
// 没有版本信息:检查Lucene
opVsLucene = compareOpToLuceneDocBasedOnVersions(index);
}
    // 构建索引策略(基于当前版本推进versionForIndexing)
    final long currentVersion = versionValue != null ? versionValue.version : Versions.NOT_FOUND;
    return IndexingStrategy.processNormally(
        opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND,
        index.versionType().updateVersion(currentVersion, index.version()),
        0
    );
}
3.2 简化流程图
3.2.1 核心步骤总览
flowchart TB
Start[开始: index操作] --> VersionCheck[步骤1: 版本检查<br/>LiveVersionMap]
VersionCheck --> |冲突| Reject[拒绝操作]
VersionCheck --> |通过| AssignSeqNo[步骤2: 分配SeqNo<br/>Primary生成]
AssignSeqNo --> WriteLucene[步骤3: 写入Lucene<br/>IndexWriter]
WriteLucene --> WriteTranslog[步骤4: 写入Translog<br/>持久性保证]
WriteTranslog --> UpdateVersionMap[步骤5: 更新VersionMap<br/>记录版本]
UpdateVersionMap --> UpdateCheckpoint[步骤6: 更新Checkpoint<br/>LocalCheckpointTracker]
UpdateCheckpoint --> End[完成]
style WriteTranslog fill:#ffe1e1
style WriteLucene fill:#fff4e1
style UpdateVersionMap fill:#e1ffe1
3.2.2 关键步骤说明
- 版本检查: 检查LiveVersionMap中的当前版本,防止冲突
- 分配序列号: Primary分配唯一的seqNo,保证顺序
- 写入Lucene: 通过IndexWriter写入内存缓冲,此时还不可搜索
- 写入Translog: 追加操作日志,落盘(fsync)后才向客户端确认,保证持久性
- 更新VersionMap: 记录最新版本和Translog位置(用于实时GET)
- 更新Checkpoint: 跟踪序列号处理进度
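这条写入管线可以用一个极简示意串起来(下面的MiniEngine是假设性实现,非Elasticsearch源码;步骤顺序与上文InternalEngine.index的代码一致:先写Lucene缓冲、再追加Translog):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// 极简写入管线示意:版本检查 -> 分配seqNo -> 写Lucene缓冲 -> 追加Translog -> 更新版本映射/检查点
public class MiniEngine {
    private final Map<String, Long> versionMap = new HashMap<>();    // id -> version
    private final Map<String, String> luceneBuffer = new HashMap<>(); // 未refresh的内存缓冲
    private final List<String> translog = new ArrayList<>();          // 顺序追加的操作日志
    private long nextSeqNo = 0;
    private long localCheckpoint = -1;

    // expectedVersion传-1表示不做版本检查
    public synchronized long index(String id, String source, long expectedVersion) {
        long currentVersion = versionMap.getOrDefault(id, 0L);
        if (expectedVersion != -1 && expectedVersion != currentVersion) {
            throw new IllegalStateException("version conflict");      // 步骤1: 版本检查
        }
        long seqNo = nextSeqNo++;                                     // 步骤2: 分配seqNo
        luceneBuffer.put(id, source);                                 // 步骤3: 写入Lucene缓冲
        translog.add("index{id=" + id + ", seqNo=" + seqNo + "}");    // 步骤4: 追加Translog
        versionMap.put(id, currentVersion + 1);                       // 步骤5: 更新版本映射
        localCheckpoint = seqNo;                                      // 步骤6: 单写入者无空洞,检查点直接推进
        return seqNo;
    }

    public synchronized long getLocalCheckpoint() { return localCheckpoint; }
    public synchronized int translogSize() { return translog.size(); }
}
```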
3.3 Refresh流程 (使文档可搜索)
3.3.1 Refresh调用链路
flowchart TB
Start[触发源<br/>定时/手动/索引缓冲满] --> Scheduler[RefreshScheduler<br/>或手动调用]
Scheduler --> EngineRefresh[InternalEngine.refresh]
EngineRefresh --> GetRM[获取ReferenceManager<br/>External/Internal]
GetRM --> ReadLock[获取Engine读锁<br/>防止Engine关闭]
ReadLock --> CheckScope{Blocking?}
CheckScope --> |Yes| BlockingRefresh[maybeRefreshBlocking<br/>同步刷新]
CheckScope --> |No| MaybeRefresh[maybeRefresh<br/>尝试刷新]
BlockingRefresh --> IWGetReader[IndexWriter.getReader<br/>或commit]
MaybeRefresh --> IWGetReader
IWGetReader --> FlushBuffer[刷新内存缓冲<br/>生成Segment]
FlushBuffer --> CreateReader[创建新DirectoryReader]
CreateReader --> UpdateRM[更新ReferenceManager<br/>原子切换Reader]
UpdateRM --> CloseOld[关闭旧Reader<br/>RefCount减1]
CloseOld --> TriggerListener[触发RefreshListener]
TriggerListener --> PruneVM[清理LiveVersionMap<br/>pruneVersionMap]
PruneVM --> UpdateCheckpoint[更新refreshedCheckpoint]
UpdateCheckpoint --> ReleaseLock[释放读锁]
ReleaseLock --> End[完成]
style IWGetReader fill:#fff4e1
style CreateReader fill:#fff4e1
style PruneVM fill:#e1ffe1
3.3.2 Refresh关键代码
步骤1: InternalEngine.refresh入口
// InternalEngine.java: 2074-2137行
protected final RefreshResult refresh(String source, SearcherScope scope, boolean block) throws EngineException {
// 记录refresh前的本地检查点
final long localCheckpointBeforeRefresh = localCheckpointTracker.getProcessedCheckpoint();
boolean refreshed = false;
long segmentGeneration = RefreshResult.UNKNOWN_GENERATION;
try {
// 增加store引用计数,防止关闭
if (store.tryIncRef()) {
try {
// 获取对应的ReferenceManager (External或Internal)
ReferenceManager<ElasticsearchDirectoryReader> referenceManager =
getReferenceManager(scope);
long generationBeforeRefresh = lastCommittedSegmentInfos.getGeneration();
// 获取Engine读锁,防止与Engine关闭并发
final var engineReadLock = engineConfig.getEngineResetLock().readLock();
if (block) {
// 同步刷新:阻塞直到完成
engineReadLock.lock();
try {
referenceManager.maybeRefreshBlocking();
segmentGeneration = segmentGenerationAfterRefresh(
referenceManager,
generationBeforeRefresh
);
refreshed = true;
} finally {
engineReadLock.unlock();
}
} else {
// 异步刷新:尝试获取锁
if (engineReadLock.tryLock()) {
try {
refreshed = referenceManager.maybeRefresh();
if (refreshed) {
segmentGeneration = segmentGenerationAfterRefresh(
referenceManager,
generationBeforeRefresh
);
}
} finally {
engineReadLock.unlock();
}
}
}
} finally {
store.decRef();
}
// 更新已刷新的检查点
if (refreshed) {
lastRefreshedCheckpointListener.updateRefreshedCheckpoint(
localCheckpointBeforeRefresh
);
}
} else {
refreshed = false;
}
} catch (AlreadyClosedException e) {
failOnTragicEvent(e);
throw e;
} catch (Exception e) {
try {
failEngine("refresh failed source[" + source + "]", e);
} catch (Exception inner) {
e.addSuppressed(inner);
}
throw new RefreshFailedEngineException(shardId, e);
}
return new RefreshResult(refreshed, segmentGeneration);
}
步骤2: ReferenceManager刷新
// Lucene ReferenceManager的核心方法
public final boolean maybeRefresh() throws IOException {
ensureOpen();
// 获取最新的Reader
final G refreshedReference = refreshIfNeeded(current);
if (refreshedReference != null) {
assert refreshedReference != current;
// 原子切换到新Reader
boolean success = false;
try {
// 通知监听器:beforeRefresh
for (RefreshListener listener : refreshListeners) {
listener.beforeRefresh();
}
// 原子替换
release(current);
current = refreshedReference;
success = true;
// 通知监听器:afterRefresh
for (RefreshListener listener : refreshListeners) {
listener.afterRefresh(true);
}
} finally {
if (!success) {
release(refreshedReference);
}
}
return true;
}
return false;
}
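maybeRefresh的关键在于"原子切换 + 引用计数":只要搜索方仍持有旧Reader的引用,旧Reader就不会被真正关闭。下面用一个极简示意演示这一模式(SimpleReaderManager/RefCountedReader是假设性实现,非Lucene源码):

```java
import java.util.concurrent.atomic.AtomicInteger;

// 引用计数Reader切换的极简示意(对应Lucene ReferenceManager的核心模式)
public class SimpleReaderManager {
    static class RefCountedReader {
        final int generation;                  // 代表某一时刻的索引快照
        private final AtomicInteger refCount = new AtomicInteger(1);
        private volatile boolean closed = false;

        RefCountedReader(int generation) { this.generation = generation; }

        void incRef() { refCount.incrementAndGet(); }

        void decRef() {
            if (refCount.decrementAndGet() == 0) {
                closed = true;                 // 引用归零才真正释放
            }
        }

        boolean isClosed() { return closed; }
    }

    private volatile RefCountedReader current = new RefCountedReader(0);

    // 搜索方获取当前Reader(增加引用),用完必须decRef
    public RefCountedReader acquire() {
        RefCountedReader r = current;
        r.incRef();
        return r;
    }

    // Refresh:原子切换到新Reader;旧Reader待所有搜索方释放后才关闭
    public void refresh() {
        RefCountedReader old = current;
        current = new RefCountedReader(old.generation + 1);
        old.decRef();                          // 释放manager自己持有的那份引用
    }
}
```

正在执行的搜索在Refresh发生后仍然读取自己acquire到的旧快照,直到decRef——这就是"搜索不受Refresh干扰"的原因。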
步骤3: LiveVersionMap清理
// InternalEngine内的RefreshListener实现
private final class LastRefreshedCheckpointListener implements ReferenceManager.RefreshListener {
final AtomicLong refreshedCheckpoint;
private long pendingCheckpoint;
@Override
public void beforeRefresh() {
// Refresh前:记录当前的处理检查点
// 所有 <= pendingCheckpoint 的操作都将在Refresh后可见
pendingCheckpoint = localCheckpointTracker.getProcessedCheckpoint();
}
@Override
public void afterRefresh(boolean didRefresh) {
if (didRefresh) {
// Refresh后:更新已刷新的检查点
updateRefreshedCheckpoint(pendingCheckpoint);
// 清理LiveVersionMap中已刷到Lucene的条目
pruneVersionMap(pendingCheckpoint);
}
}
void pruneVersionMap(long refreshedSeqNo) {
// 遍历VersionMap,清理已持久化到Lucene的Location
versionMap.pruneTombstones(refreshedSeqNo, relativeTimeInNanosSupplier.getAsLong());
// 对于 seqNo <= refreshedSeqNo 的IndexVersionValue:
// - 清除translogLocation字段(因为已经在Lucene中)
// - 保留version/seqNo/term(仍需用于版本检查)
}
}
3.3.3 Refresh触发条件
| 触发方式 | 条件 | 频率 | 阻塞性 |
|---|---|---|---|
| 定时触发 | index.refresh_interval | 默认1秒 | 非阻塞 |
| 手动触发 | POST /_refresh | 按需 | 阻塞 |
| 索引缓冲满 | indices.memory.index_buffer_size | 动态 | 阻塞 |
| Flush触发 | Flush操作前 | 按需 | 阻塞 |
| 实时GET | get.realtime=true | 按需 | 条件刷新 |
Refresh作用:
- 使写入的文档可被搜索
- 创建新的Lucene段(Segment)
- 清理LiveVersionMap内存
- 更新DirectoryReader
3.4 Flush流程 (持久化到磁盘)
3.4.1 Flush调用链路
flowchart TB
Start[触发源<br/>Translog满/定时] --> FlushPolicy[FlushPolicy<br/>shouldPeriodicallyFlush]
FlushPolicy --> EngineFlush[InternalEngine.flush]
EngineFlush --> AcquireLock[获取flushLock<br/>防止并发Flush]
AcquireLock --> CheckCondition{需要Flush?}
CheckCondition --> |No| SkipFlush[跳过Flush]
CheckCondition --> |Yes| PreRefresh[步骤1: Refresh<br/>刷新到新段]
PreRefresh --> RollTranslog[步骤2: 滚动Translog<br/>translog.rollGeneration]
RollTranslog --> PrepareCommit[步骤3: 准备提交<br/>IndexWriter.prepareCommit]
PrepareCommit --> WriteSegmentsN[写入segments_N文件<br/>包含提交元数据]
WriteSegmentsN --> FinishCommit[步骤4: 完成提交<br/>IndexWriter.finishCommit]
FinishCommit --> Fsync[步骤5: Fsync所有文件<br/>directory.sync]
Fsync --> TrimTranslog[步骤6: 清理Translog<br/>trimUnreferencedReaders]
TrimTranslog --> DeleteOld[删除旧generation<br/>translog-1.tlog...]
DeleteOld --> UpdateMetrics[更新统计信息<br/>lastFlushTimestamp]
UpdateMetrics --> ReleaseLock[释放flushLock]
ReleaseLock --> Notify[通知FlushListener]
Notify --> End[完成]
style RollTranslog fill:#ffe1e1
style WriteSegmentsN fill:#fff4e1
style Fsync fill:#e1f5ff
3.4.2 Flush关键代码
步骤1: 判断是否需要Flush
// InternalEngine.java: 2213-2242行
private boolean shouldPeriodicallyFlush(long flushThresholdSizeInBytes, long flushThresholdAgeInNanos) {
ensureOpen();
// 检查是否在大合并后需要Flush
if (shouldPeriodicallyFlushAfterBigMerge.get()) {
return true;
}
// 获取上次提交的本地检查点
final long localCheckpointOfLastCommit = Long.parseLong(
lastCommittedSegmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)
);
// 计算需要的Translog generation
final long translogGenerationOfLastCommit =
translog.getMinGenerationForSeqNo(localCheckpointOfLastCommit + 1)
.translogFileGeneration();
// 检查Translog大小和时间
if (translog.sizeInBytesByMinGen(translogGenerationOfLastCommit) < flushThresholdSizeInBytes
&& relativeTimeInNanosSupplier.getAsLong() - lastFlushTimestamp < flushThresholdAgeInNanos) {
// 既未超过大小阈值,也未超过时间阈值
return false;
}
/*
* 防止无限Flush循环:
* 只有当新提交会指向更新的generation时,才触发Flush
* 这确保Flush后Translog大小会实际减小
*/
final long translogGenerationOfNewCommit = translog.getMinGenerationForSeqNo(
localCheckpointTracker.getProcessedCheckpoint() + 1
).translogFileGeneration();
return translogGenerationOfNewCommit > translogGenerationOfLastCommit
|| localCheckpointTracker.getProcessedCheckpoint() == localCheckpointTracker.getMaxSeqNo();
}
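上面的判定逻辑可以抽象为:大小或时间任一超过阈值,且本次Flush确实能让Translog的最小保留generation前进(否则Flush不会减小Translog,会陷入无限循环)。下面是去掉Lucene细节后的骨架示意(假设性函数,非源码):

```java
// Flush触发判定的极简示意,对应shouldPeriodicallyFlush的骨架
public class FlushDecision {
    /**
     * @param uncommittedTranslogBytes    自上次提交以来的Translog字节数
     * @param nanosSinceLastFlush         距上次Flush的时间
     * @param thresholdBytes              大小阈值(默认512MB)
     * @param thresholdNanos              时间阈值
     * @param newCommitAdvancesGeneration 新提交是否指向更新的Translog generation
     */
    public static boolean shouldFlush(long uncommittedTranslogBytes,
                                      long nanosSinceLastFlush,
                                      long thresholdBytes,
                                      long thresholdNanos,
                                      boolean newCommitAdvancesGeneration) {
        boolean overThreshold = uncommittedTranslogBytes >= thresholdBytes
            || nanosSinceLastFlush >= thresholdNanos;
        if (!overThreshold) {
            return false;                  // 既未超大小阈值也未超时间阈值
        }
        // 防止无限Flush循环:只有Flush确实能让Translog变小时才执行
        return newCommitAdvancesGeneration;
    }
}
```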
步骤2: 执行Flush
// InternalEngine.java: 2283-2330行
protected void flushHoldingLock(boolean force, boolean waitIfOngoing, ActionListener<FlushResult> listener) {
// 获取Flush锁
try {
flushLock.lock();
// 检查是否有未提交的更改
boolean hasUncommittedChanges = hasUncommittedChanges();
if (hasUncommittedChanges
|| force
|| shouldPeriodicallyFlush()
|| getProcessedLocalCheckpoint() > Long.parseLong(
lastCommittedSegmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)
)) {
ensureCanFlush();
        Translog.Location commitLocation = getTranslogLastWriteLocation();
        // 在try之外记录本次Flush时间,finally块中才能引用
        long lastFlushTimestamp = relativeTimeInNanosSupplier.getAsLong();
        try {
            // 步骤1: 滚动Translog generation
            translog.rollGeneration();
            logger.trace("starting commit for flush; commitTranslog=true");
// 预先记录即将生成的segment generation
preCommitSegmentGeneration.set(lastCommittedSegmentInfos.getGeneration() + 1);
// 步骤2: 提交IndexWriter
commitIndexWriter(indexWriter, translog);
logger.trace("finished commit for flush");
// 步骤3: Refresh以清理VersionMap
refresh("version_table_flush", SearcherScope.INTERNAL, true);
// 步骤4: 清理旧的Translog readers
translog.trimUnreferencedReaders();
// 更新Translog位置(如果有新写入)
Translog.Location newLocation = getTranslogLastWriteLocation();
if (newLocation.compareTo(commitLocation) > 0) {
commitLocation = newLocation;
}
} catch (Exception e) {
throw new FlushFailedEngineException(shardId, e);
} finally {
this.lastFlushTimestamp = lastFlushTimestamp;
}
// 异步确保Translog同步
asyncEnsureTranslogSynced(commitLocation, new ActionListener<Void>() {
@Override
public void onResponse(Void aVoid) {
listener.onResponse(new FlushResult(true, generation));
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
} else {
// 无需Flush
listener.onResponse(new FlushResult(false, generation));
}
} finally {
flushLock.unlock();
}
}
步骤3: 提交IndexWriter
// InternalEngine.java
private void commitIndexWriter(IndexWriter writer, Translog translog) throws IOException {
ensureCanFlush();
try {
// 获取最新的检查点信息
final long localCheckpoint = localCheckpointTracker.getProcessedCheckpoint();
final long maxSeqNo = localCheckpointTracker.getMaxSeqNo();
// 构建提交的元数据
final Map<String, String> commitData = new HashMap<>(8);
commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(localCheckpoint));
commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo));
commitData.put(Translog.TRANSLOG_UUID_KEY, translog.getTranslogUUID());
commitData.put(Translog.TRANSLOG_GENERATION_KEY, Long.toString(translog.currentFileGeneration()));
commitData.put(Engine.HISTORY_UUID_KEY, historyUUID);
commitData.put(Engine.MIN_RETAINED_SEQNO, Long.toString(softDeletesPolicy.getMinRetainedSeqNo()));
if (forceMergeUUID != null) {
commitData.put(FORCE_MERGE_UUID_KEY, forceMergeUUID);
}
// 设置提交数据
writer.setLiveCommitData(commitData.entrySet());
// 步骤1: prepareCommit (写入segments_N但不fsync)
writer.prepareCommit();
// 步骤2: finishCommit (重命名segments_N使其可见)
writer.commit();
} catch (final Exception ex) {
try {
failEngine("lucene commit failed", ex);
} catch (final Exception inner) {
ex.addSuppressed(inner);
}
throw ex;
} catch (final AssertionError e) {
// IndexWriter bug导致的断言失败视为致命错误
throw new FlushFailedEngineException(shardId, e);
}
}
步骤4: Translog滚动和清理
// Translog.java: 滚动generation
public void rollGeneration() throws IOException {
writeLock.lock();
try {
ensureOpen();
// 关闭当前writer
try {
final TranslogWriter newWriter = createWriter(
current.getGeneration() + 1,
getMinFileGeneration(),
current.getLastSyncedCheckpoint().offset
);
// 将当前writer移到readers列表
readers.add(current.closeIntoReader());
current = newWriter;
} catch (final Exception e) {
closeOnTragicEvent(e);
throw new TranslogException(shardId, "failed to roll translog", e);
}
} finally {
writeLock.unlock();
}
}
// 清理不再需要的readers
public void trimUnreferencedReaders() throws IOException {
writeLock.lock();
try {
// 获取最小需要保留的generation
long minReferencedGen = deletionPolicy.getMinTranslogGenerationForRecovery();
// 删除更老的generation文件
for (Iterator<TranslogReader> it = readers.iterator(); it.hasNext(); ) {
TranslogReader reader = it.next();
if (reader.getGeneration() < minReferencedGen) {
it.remove();
IOUtils.closeWhileHandlingException(reader);
Files.deleteIfExists(reader.path());
logger.trace("deleted translog [{}]", reader.path());
}
}
} finally {
writeLock.unlock();
}
}
3.4.3 Flush触发条件
| 触发方式 | 条件 | 说明 |
|---|---|---|
| Translog大小 | index.translog.flush_threshold_size | 默认512MB |
| 定时触发 | 定时检查 | 默认30分钟 |
| 手动触发 | POST /_flush | 立即执行 |
| 索引关闭 | Close Index | 确保数据持久化 |
| 大合并后 | After Force Merge | 清理Translog |
Flush作用:
- 提交Lucene段到磁盘(创建segments_N)
- Fsync确保数据真正写入物理存储
- 清空/清理Translog文件
- 创建恢复点(Recovery Point)
- 减少恢复时间
Flush与Refresh的区别:
| 操作 | Refresh | Flush |
|---|---|---|
| 目的 | 使文档可搜索 | 持久化到磁盘 |
| 频率 | 高(默认1秒) | 低(Translog达512MB阈值或定时触发) |
| 段操作 | 创建新段 | 提交段到磁盘 |
| Translog | 不处理 | 清理旧generation |
| 持久性 | 无 | 有(fsync) |
| 性能开销 | 低 | 高 |
3.5 Delete流程 (删除文档)
3.5.1 Delete调用链路(带Soft Deletes)
flowchart TB
Start[DELETE /index/_doc/1] --> IndexShard[IndexShard.applyDeleteOperationOnPrimary]
IndexShard --> EngineDelete[InternalEngine.delete]
EngineDelete --> AcquireLock[获取UID锁<br/>versionMap.acquireLock]
AcquireLock --> VersionCheck[版本检查<br/>getUnderLock]
VersionCheck --> |冲突| Conflict[版本冲突异常]
VersionCheck --> |通过| AssignSeqNo[分配SeqNo]
AssignSeqNo --> CreateTombstone[创建Tombstone文档<br/>ParsedDocument.deleteTombstone]
CreateTombstone --> SetFields[设置字段<br/>seqNo/primaryTerm/version]
SetFields --> AddSoftDelete[添加软删除字段<br/>_soft_deletes: true]
AddSoftDelete --> SoftUpdate[Lucene软删除<br/>softUpdateDocument]
SoftUpdate --> WriteTranslog[写入Translog<br/>Translog.Delete]
WriteTranslog --> UpdateVM[更新VersionMap<br/>putDeleteUnderLock]
UpdateVM --> UpdateLCP[更新LocalCheckpointTracker]
UpdateLCP --> ReleaseLock[释放UID锁]
ReleaseLock --> End[返回DeleteResult]
style CreateTombstone fill:#fff4e1
style SoftUpdate fill:#fff4e1
style UpdateVM fill:#e1ffe1
3.5.2 Delete关键代码
步骤1: InternalEngine.delete核心逻辑
// InternalEngine.java: 1633-1717行
@Override
public DeleteResult delete(Delete delete) throws IOException {
// 强制使用安全访问(需要版本检查)
versionMap.enforceSafeAccess();
assert assertIncomingSequenceNumber(delete.origin(), delete.seqNo());
final DeleteResult deleteResult;
int reservedDocs = 0;
try (
var ignored = acquireEnsureOpenRef();
Releasable ignored2 = versionMap.acquireLock(delete.uid())
) {
lastWriteNanos = delete.startTime();
// 第一步:制定删除策略
final DeletionStrategy plan = deletionStrategyForOperation(delete);
reservedDocs = plan.reservedDocs;
if (plan.earlyResultOnPreflightError.isPresent()) {
// 预检失败(例如版本冲突)
assert delete.origin() == Operation.Origin.PRIMARY : delete.origin();
deleteResult = plan.earlyResultOnPreflightError.get();
} else {
// 第二步:写入Lucene (Soft Delete)
if (plan.deleteFromLucene) {
deleteResult = deleteInLucene(delete, plan);
} else {
// 不需要写Lucene(已经删除或旧版本)
deleteResult = new DeleteResult(
plan.versionOfDeletion,
delete.primaryTerm(),
delete.seqNo(),
plan.currentlyDeleted == false,
delete.id()
);
}
}
// 第三步:写入Translog
if (delete.origin().isFromTranslog() == false && deleteResult.getResultType() == Result.Type.SUCCESS) {
final Translog.Location location = translog.add(new Translog.Delete(delete, deleteResult));
deleteResult.setTranslogLocation(location);
}
// 第四步:更新LocalCheckpointTracker
localCheckpointTracker.markSeqNoAsProcessed(deleteResult.getSeqNo());
if (deleteResult.getTranslogLocation() == null) {
localCheckpointTracker.markSeqNoAsPersisted(deleteResult.getSeqNo());
}
deleteResult.setTook(System.nanoTime() - delete.startTime());
deleteResult.freeze();
} catch (RuntimeException | IOException e) {
try {
maybeFailEngine("delete", e);
} catch (Exception inner) {
e.addSuppressed(inner);
}
throw e;
} finally {
releaseInFlightDocs(reservedDocs);
}
// 定期清理删除的Tombstone
maybePruneDeletes();
return deleteResult;
}
步骤2: 写入Tombstone (Soft Delete)
// InternalEngine.java: 1840-1876行
private DeleteResult deleteInLucene(Delete delete, DeletionStrategy plan) throws IOException {
assert assertMaxSeqNoOfUpdatesIsAdvanced(delete.uid(), delete.seqNo(), false, false);
try {
// 创建删除Tombstone文档
final ParsedDocument tombstone = ParsedDocument.deleteTombstone(
engineConfig.getIndexSettings().seqNoIndexOptions(),
delete.id()
);
assert tombstone.docs().size() == 1 : "Tombstone doc should have single doc [" + tombstone + "]";
// 设置seqNo和primaryTerm
tombstone.updateSeqID(delete.seqNo(), delete.primaryTerm());
// 设置version
tombstone.version().setLongValue(plan.versionOfDeletion);
final LuceneDocument doc = tombstone.docs().get(0);
// 确保有_tombstone字段
assert doc.getField(SeqNoFieldMapper.TOMBSTONE_NAME) != null
: "Delete tombstone document but _tombstone field is not set [" + doc + " ]";
// 添加软删除字段
doc.add(softDeletesField);
// 写入Lucene
if (plan.addStaleOpToLucene || plan.currentlyDeleted) {
// 文档不存在或已删除:直接addDocument
indexWriter.addDocument(doc);
} else {
// 文档存在:使用softUpdateDocument
// 先添加新的tombstone,再标记旧文档为软删除
indexWriter.softUpdateDocument(
new Term(IdFieldMapper.NAME, delete.uid()),
doc,
softDeletesField
);
}
return new DeleteResult(
plan.versionOfDeletion,
delete.primaryTerm(),
delete.seqNo(),
plan.currentlyDeleted == false, // found
delete.id()
);
} catch (final Exception ex) {
// 删除失败视为致命错误
if (ex instanceof AlreadyClosedException == false && ex instanceof IOException == false) {
throw new AssertionError("Unexpected exception during deletion", ex);
}
throw ex;
}
}
Soft Delete机制说明:
普通删除 vs 软删除:
普通删除 (Hard Delete):
- IndexWriter.deleteDocuments(term)
- 文档立即标记为删除
- 在Merge时物理删除
软删除 (Soft Delete):
- IndexWriter.softUpdateDocument(term, tombstone, softDeleteField)
- 保留原文档,标记_soft_deletes=true
- 写入新的Tombstone文档
- 好处:
1) 支持CCR (跨集群复制)
2) 更高效的恢复(保留历史操作)
3) 可以配置保留时间
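两种删除方式的差异可以用一个极简的"段"示意来对比(下面的SoftDeleteStore是假设性实现,非Lucene源码;Lucene中分别对应deleteDocuments与softUpdateDocument):

```java
import java.util.ArrayList;
import java.util.List;

// 软删除示意:旧文档不被移除,只打softDeleted标记,并追加携带seqNo的tombstone
public class SoftDeleteStore {
    static class Doc {
        final String id;
        final long seqNo;
        final boolean tombstone;
        boolean softDeleted = false;

        Doc(String id, long seqNo, boolean tombstone) {
            this.id = id; this.seqNo = seqNo; this.tombstone = tombstone;
        }
    }

    private final List<Doc> segment = new ArrayList<>();
    private long nextSeqNo = 0;

    public void index(String id) {
        segment.add(new Doc(id, nextSeqNo++, false));
    }

    public void softDelete(String id) {
        long seqNo = nextSeqNo++;
        // 标记旧文档为软删除:搜索不可见,但仍保留在段中
        for (Doc d : segment) {
            if (d.id.equals(id) && !d.tombstone) {
                d.softDeleted = true;
            }
        }
        // 追加tombstone文档,记录删除操作本身(携带seqNo,供CCR/恢复重放)
        segment.add(new Doc(id, seqNo, true));
    }

    // 搜索视角:只看到未删除的普通文档
    public long visibleDocs() {
        return segment.stream().filter(d -> !d.tombstone && !d.softDeleted).count();
    }

    // 恢复/CCR视角:历史操作全部保留
    public long historyOps() {
        return segment.size();
    }
}
```

对比之下,硬删除只会减少visibleDocs,不留下可重放的历史——这正是软删除为CCR与恢复付出的(可配置保留期的)空间代价。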
3.6 实时GET流程
3.6.1 GET调用链路 (三级查找)
flowchart TB
Start[GET /index/_doc/1] --> CheckRealtime{realtime=true?}
CheckRealtime --> |No| DirectLucene[直接查询Lucene]
CheckRealtime --> |Yes| AcquireLock[获取UID锁<br/>versionMap.acquireLock]
AcquireLock --> GetVM[查询LiveVersionMap<br/>getUnderLock]
GetVM --> CheckVM{存在VersionValue?}
CheckVM --> |No| ReleaseLock1[释放锁]
CheckVM --> |Yes| CheckDelete{isDelete?}
CheckDelete --> |Yes| NotFound[返回NOT_EXISTS]
CheckDelete --> |No| CheckLocation{有Translog Location?}
CheckLocation --> |Yes| ReadTranslog[从Translog读取<br/>translog.read]
CheckLocation --> |No| ReleaseLock2[释放锁]
ReadTranslog --> Deserialize[反序列化Operation]
Deserialize --> ExtractSource[提取_source]
ExtractSource --> ReturnFromTL[返回GetResult]
    ReleaseLock1 --> SearchLucene["从Lucene搜索<br/>termQuery(_id)"]
ReleaseLock2 --> SearchLucene
DirectLucene --> SearchLucene
SearchLucene --> FindDoc{找到文档?}
FindDoc --> |Yes| ExtractFields[提取字段]
FindDoc --> |No| ReturnNotFound[返回NOT_EXISTS]
ExtractFields --> CheckSoftDelete{_soft_deletes=true?}
CheckSoftDelete --> |Yes| ReturnNotFound
CheckSoftDelete --> |No| ReturnFromLucene[返回GetResult]
style GetVM fill:#e1ffe1
style ReadTranslog fill:#ffe1e1
style SearchLucene fill:#fff4e1
3.6.2 GET关键代码
步骤1: 实时GET入口
// InternalEngine.java: 912-980行
protected GetResult realtimeGetUnderLock(
Get get,
MappingLookup mappingLookup,
DocumentParser documentParser,
Function<Searcher, Searcher> searcherWrapper,
boolean getFromSearcher
) {
assert isDrainedForClose() == false;
assert get.realtime();
// 第一步:查询LiveVersionMap
final VersionValue versionValue;
try (Releasable ignore = versionMap.acquireLock(get.uid())) {
versionValue = getVersionFromMap(get.uid());
}
try {
boolean getFromSearcherIfNotInTranslog = getFromSearcher;
if (versionValue != null) {
// 找到版本信息
getFromSearcherIfNotInTranslog = true;
// 检查是否是删除
if (versionValue.isDelete()) {
return GetResult.NOT_EXISTS;
}
// 检查版本冲突
if (get.versionType().isVersionConflictForReads(versionValue.version, get.version())) {
throw new VersionConflictEngineException(
shardId,
"[" + get.id() + "]",
get.versionType().explainConflictForReads(versionValue.version, get.version())
);
}
// 检查if_seq_no和if_primary_term
if (get.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
if (versionValue.seqNo != get.getIfSeqNo() || versionValue.term != get.getIfPrimaryTerm()) {
throw new VersionConflictEngineException(...);
}
}
// 第二步:尝试从Translog读取
if (versionValue instanceof IndexVersionValue) {
IndexVersionValue iv = (IndexVersionValue) versionValue;
if (iv.getLocation() != null) {
// 从Translog读取
return getFromTranslog(
get,
iv.getLocation(),
versionValue.version,
versionValue.seqNo,
versionValue.term,
mappingLookup,
documentParser
);
}
}
}
// 第三步:从Lucene读取
if (getFromSearcherIfNotInTranslog) {
return getFromSearcher(
get,
acquireSearcher("realtime_get", SearcherScope.INTERNAL, searcherWrapper),
false
);
} else {
return GetResult.NOT_EXISTS;
}
} catch (Exception e) {
maybeFailEngine("realtime_get", e);
throw e;
}
}
步骤2: 从Translog读取
// InternalEngine.java
private GetResult getFromTranslog(
Get get,
Translog.Location location,
long version,
long seqNo,
long term,
MappingLookup mappingLookup,
DocumentParser documentParser
) throws IOException {
// 读取Translog操作
Translog.Operation op = translog.readOperation(location);
if (op == null) {
// Translog已经滚动,fallback到Lucene
return getFromSearcher(get, acquireSearcher("realtime_get_fallback", SearcherScope.INTERNAL), false);
}
assert op instanceof Translog.Index : "unexpected operation: " + op;
Translog.Index index = (Translog.Index) op;
// 提取_source
BytesReference source = index.source();
// 构建GetResult
return new GetResult(
new Searcher("realtime_get_translog", null, null),
new Engine.GetResult.Exists(
get.id(),
version,
seqNo,
term,
source,
null, // stored fields
mappingLookup
)
);
}
步骤3: 从Lucene读取
// Engine.java: 913-971行
protected final GetResult getFromSearcher(
Get get,
Engine.Searcher searcher,
boolean searcherWasRefreshed
) throws IOException {
try {
// 构建TermQuery查询_id
Query query = new TermQuery(new Term(IdFieldMapper.NAME, get.uid()));
// 搜索
final IndexSearcher indexSearcher = searcher.getIndexSearcher();
final Weight weight = indexSearcher.createWeight(
indexSearcher.rewrite(query),
ScoreMode.COMPLETE_NO_SCORES,
1.0f
);
// 遍历所有Leaf
for (LeafReaderContext leaf : indexSearcher.getIndexReader().leaves()) {
final Scorer scorer = weight.scorer(leaf);
if (scorer == null) {
continue;
}
final DocIdSetIterator iterator = scorer.iterator();
int docId = iterator.nextDoc();
if (docId != DocIdSetIterator.NO_MORE_DOCS) {
// 找到文档
// 检查软删除
final FieldsVisitor fieldsVisitor = new FieldsVisitor(true);
leaf.reader().document(docId, fieldsVisitor);
if (fieldsVisitor.isSoftDeleted()) {
// 软删除的文档,视为不存在
return GetResult.NOT_EXISTS;
}
// 提取字段
final long seqNo = readSeqNo(leaf.reader(), docId);
final long primaryTerm = readPrimaryTerm(leaf.reader(), docId);
final long version = readVersion(leaf.reader(), docId);
return new GetResult(
searcher,
new GetResult.Exists(
get.id(),
version,
seqNo,
primaryTerm,
fieldsVisitor.source(),
fieldsVisitor.fields(),
mappingLookup
)
);
}
}
// 未找到文档
return GetResult.NOT_EXISTS;
} finally {
searcher.close();
}
}
3.6.3 实时GET特性说明
三级查找顺序:
- LiveVersionMap: O(1)查找,获取最新状态和Translog位置
- Translog: 顺序文件读取,获取还未Refresh的文档
- Lucene: 索引查询,获取已Refresh的文档
实时性保证:
- 写入后立即可读,无需等待Refresh
- 利用LiveVersionMap快速定位
- Translog提供WAL保证
性能考虑:
- LiveVersionMap查询: 微秒级
- Translog读取: 毫秒级(磁盘IO)
- Lucene查询: 毫秒级(取决于段数量)
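上面的三级查找顺序可以浓缩为一个自包含的Java示意(三层存储均用Map简化模拟,数据结构为本文假设,并非ES真实类型):

```java
import java.util.HashMap;
import java.util.Map;

// 三级查找示意: LiveVersionMap -> Translog -> Lucene
public class RealtimeGetSketch {
    // VersionMap条目: 删除标记 + 可选的translog位置
    public record VersionEntry(boolean deleted, Long translogLocation) {}

    final Map<String, VersionEntry> versionMap = new HashMap<>();
    final Map<Long, String> translog = new HashMap<>();   // location -> _source
    final Map<String, String> lucene = new HashMap<>();   // id -> _source (已refresh)

    public String get(String id) {
        VersionEntry v = versionMap.get(id);
        if (v != null) {
            if (v.deleted()) return null;                  // 已删除, 直接返回NOT_EXISTS
            if (v.translogLocation() != null) {
                return translog.get(v.translogLocation()); // 还未refresh, 从translog读
            }
        }
        return lucene.get(id);                             // 其余情况回落到Lucene
    }

    public static void main(String[] args) {
        RealtimeGetSketch e = new RealtimeGetSketch();
        // doc1: 刚写入, 还在translog; doc2: 已refresh到Lucene; doc3: 已删除
        e.versionMap.put("1", new VersionEntry(false, 100L));
        e.translog.put(100L, "{\"f\":1}");
        e.lucene.put("2", "{\"f\":2}");
        e.versionMap.put("3", new VersionEntry(true, null));
        System.out.println(e.get("1")); // 来自translog
        System.out.println(e.get("2")); // 来自Lucene
        System.out.println(e.get("3")); // null, 已删除
    }
}
```

示意中省略了UID锁、版本冲突检查和translog滚动后的fallback逻辑,但三层的判断顺序与真实实现一致。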
3.7 Merge流程 (段合并)
flowchart LR
Start[后台 Merge] --> Policy[MergePolicy<br/>选择待合并段]
Policy --> Scheduler[MergeScheduler<br/>调度合并任务]
Scheduler --> Merge[合并多个小段<br/>为一个大段]
Merge --> Delete[删除旧段]
Delete --> End[完成]
Merge作用:
- 减少段数量
- 提升搜索性能
- 物理删除已标记删除的文档(包括软删除)
4. 核心API
4.1 index - 索引文档
方法签名
public IndexResult index(Index index) throws IOException
参数说明
public static class Index extends Operation {
private final ParsedDocument doc; // 解析后的文档
private final long version; // 版本号
private final VersionType versionType; // 版本类型
private final long ifSeqNo; // 条件序列号
private final long ifPrimaryTerm; // 条件主分片任期
private final long autoGeneratedIdTimestamp; // 自动生成ID的时间戳
}
返回值
public static class IndexResult extends Result {
private final boolean created; // 是否是新建(false表示更新)
}
核心逻辑
@Override
public IndexResult index(Index index) throws IOException {
// 1. 版本检查(LiveVersionMap)
final IndexingStrategy plan = planIndexingAsNonPrimary(index);
// 2. 分配序列号
if (index.origin() == Operation.Origin.PRIMARY) {
index = new Index(..., generateSeqNoForOperationOnPrimary(index), ...);
}
// 3. 写入Translog
Translog.Location location = translog.add(new Translog.Index(index));
// 4. 写入Lucene
if (plan.useLuceneUpdateDocument) {
indexWriter.updateDocument(new Term(IdFieldMapper.NAME, index.uid()), index.docs());
} else {
indexWriter.addDocument(index.docs());
}
// 5. 更新LiveVersionMap
versionMap.putIndexUnderLock(
index.uid(),
new IndexVersionValue(null, plan.versionForIndexing, index.seqNo(), index.primaryTerm())
);
return new IndexResult(plan.versionForIndexing, index.primaryTerm(), index.seqNo(), created);
}
4.2 delete - 删除文档
方法签名
public DeleteResult delete(Delete delete) throws IOException
参数说明
public static class Delete extends Operation {
private final String id; // 文档ID
private final BytesRef uid; // 文档UID
private final long version; // 版本号
private final VersionType versionType; // 版本类型
}
核心逻辑
@Override
public DeleteResult delete(Delete delete) throws IOException {
// 1. 版本检查
final DeletionStrategy plan = deletionStrategyForOperation(delete);
// 2. 分配序列号
delete = new Delete(..., generateSeqNoForOperationOnPrimary(delete), ...);
// 3. 写入Translog
translog.add(new Translog.Delete(delete));
// 4. 写入Lucene (软删除Tombstone)
ParsedDocument tombstone = ParsedDocument.deleteTombstone(delete.id());
tombstone.updateSeqID(delete.seqNo(), delete.primaryTerm());
indexWriter.softUpdateDocument(new Term(IdFieldMapper.NAME, delete.uid()), tombstone.docs().get(0));
// 5. 更新LiveVersionMap
versionMap.putDeleteUnderLock(
delete.uid(),
new DeleteVersionValue(plan.versionOfDeletion, delete.seqNo(), delete.primaryTerm())
);
return new DeleteResult(plan.versionOfDeletion, delete.seqNo(), found);
}
4.3 get - 获取文档(实时GET)
方法签名
public GetResult get(
Get get,
BiFunction<String, SearcherScope, Engine.Searcher> searcherFactory
) throws EngineException
核心逻辑
public GetResult get(Get get, ...) {
// 1. 检查LiveVersionMap
try (Releasable ignored = versionMap.acquireLock(get.uid())) {
VersionValue versionValue = versionMap.getUnderLock(get.uid());
if (versionValue != null) {
if (versionValue.isDelete()) {
return GetResult.NOT_EXISTS; // 已删除
}
if (versionValue instanceof IndexVersionValue) {
IndexVersionValue iv = (IndexVersionValue) versionValue;
if (iv.getLocation() != null) {
// 2. 从Translog读取(还未刷到Lucene)
return getFromTranslog(get, iv.getLocation());
}
}
}
}
// 3. 从Lucene读取
return getFromSearcher(get, searcherFactory);
}
4.4 refresh - 刷新搜索器
方法签名
public void refresh(String source) throws EngineException
核心逻辑
public void refresh(String source) throws EngineException {
// 1. 获取最新的Lucene Searcher
ReferenceManager.RefreshListener refreshListener = ...;
// 2. Refresh IndexWriter
indexWriter.getReader(); // 获取NRT Reader,无需commit即可搜索内存缓冲中的文档
// 3. 更新ReaderManager
externalReaderManager.maybeRefresh();
// 4. 清理LiveVersionMap中已持久化的条目
pruneVersionMap();
}
触发条件:
- 定时触发(默认1秒)
- 手动调用 POST /_refresh
- 索引缓冲满
4.5 flush - 提交到磁盘
方法签名
public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineException
核心逻辑
public CommitId flush(boolean force, boolean waitIfOngoing) {
// 1. 执行Refresh
refresh("flush");
// 2. 提交Lucene
indexWriter.commit();
// 3. Fsync到磁盘
store.directory().sync();
// 4. 创建新的Translog Generation
translog.rollGeneration();
translog.trimUnreferencedReaders();
return new CommitId(lastCommittedSegmentInfos.getId());
}
触发条件:
- Translog大小超过阈值(默认512MB)
- 定时触发(默认30分钟)
- 手动调用 POST /_flush
4.6 forceMerge - 强制合并
方法签名
public void forceMerge(
boolean flush,
int maxNumSegments,
boolean onlyExpungeDeletes,
String forceMergeUUID
) throws EngineException
核心逻辑
public void forceMerge(...) {
// 1. 设置MergePolicy
if (onlyExpungeDeletes) {
indexWriter.forceMergeDeletes(); // 仅清理删除
} else {
indexWriter.forceMerge(maxNumSegments); // 合并到指定段数
}
// 2. 可选Flush
if (flush) {
flush(true, true);
}
}
使用场景:
- 只读索引优化(合并为1段)
- 清理已删除文档
- 减少段数量提升搜索性能
4.7 acquireSearcher - 获取搜索器
方法签名
public Engine.Searcher acquireSearcher(String source, SearcherScope scope)
核心逻辑
public Searcher acquireSearcher(String source, SearcherScope scope) {
// 1. 获取DirectoryReader
IndexReader reader = scope == SearcherScope.EXTERNAL
? externalReaderManager.acquire()
: internalReaderManager.acquire();
// 2. 包装为IndexSearcher
IndexSearcher searcher = new IndexSearcher(reader);
searcher.setQueryCache(queryCache);
// 3. 返回Searcher(包含release方法)
return new Searcher(source, searcher, () -> releaseReader(reader));
}
SearcherScope:
- EXTERNAL: 用于用户查询(仅可见已refresh的文档)
- INTERNAL: 用于内部操作(可见所有文档)
4.8 Translog API
4.8.1 add - 添加操作
public Translog.Location add(Translog.Operation operation) throws IOException
4.8.2 rollGeneration - 滚动Generation
public void rollGeneration() throws IOException
4.8.3 newSnapshot - 创建快照
public Translog.Snapshot newSnapshot() throws IOException
5. 核心数据结构
5.1 Engine & InternalEngine
类图
classDiagram
class Engine {
<<abstract>>
+index(Index) IndexResult
+delete(Delete) DeleteResult
+get(Get) GetResult
+refresh(String)
+flush(boolean, boolean) CommitId
+acquireSearcher(String, SearcherScope) Searcher
}
class InternalEngine {
-IndexWriter indexWriter
-DirectoryReader directoryReader
-Translog translog
-LiveVersionMap versionMap
-LocalCheckpointTracker localCheckpointTracker
-ReentrantLock flushLock
+index(Index) IndexResult
+delete(Delete) DeleteResult
}
class ReadOnlyEngine {
-DirectoryReader directoryReader
+index(Index) IndexResult
+delete(Delete) DeleteResult
}
Engine <|-- InternalEngine
Engine <|-- ReadOnlyEngine
类说明
- Engine: 存储引擎抽象接口
- InternalEngine: 默认实现,支持读写
- ReadOnlyEngine: 只读实现,用于冻结索引
5.2 LiveVersionMap
类图
classDiagram
class LiveVersionMap {
-ConcurrentHashMap~BytesRef, VersionValue~ current
-ConcurrentHashMap~BytesRef, VersionValue~ old
-KeyedLock~BytesRef~ keyedLock
+getUnderLock(BytesRef) VersionValue
+putIndexUnderLock(BytesRef, IndexVersionValue)
+putDeleteUnderLock(BytesRef, DeleteVersionValue)
+acquireLock(BytesRef) Releasable
}
class VersionValue {
<<abstract>>
-long version
-long seqNo
-long term
-long time
+getVersion() long
+getSeqNo() long
}
class IndexVersionValue {
-Translog.Location location
+getLocation() Translog.Location
}
class DeleteVersionValue {
+isDelete() boolean
}
LiveVersionMap --> VersionValue
VersionValue <|-- IndexVersionValue
VersionValue <|-- DeleteVersionValue
类说明
LiveVersionMap
职责: 内存中的文档版本映射,用于实时GET和版本冲突检测
关键字段:
| 字段 | 类型 | 说明 |
|---|---|---|
| current | ConcurrentHashMap | 当前活跃的版本映射 |
| old | ConcurrentHashMap | 旧的版本映射(归档) |
| keyedLock | KeyedLock | UID级别的锁 |
核心方法:
// 获取版本(必须持有锁)
VersionValue getUnderLock(BytesRef uid);
// 更新索引版本
void putIndexUnderLock(BytesRef uid, IndexVersionValue versionValue);
// 更新删除版本
void putDeleteUnderLock(BytesRef uid, DeleteVersionValue versionValue);
// 获取锁
Releasable acquireLock(BytesRef uid);
VersionValue
职责: 文档版本信息基类
关键字段:
- version: 版本号
- seqNo: 序列号
- term: 主分片任期
- time: 时间戳
IndexVersionValue
职责: 索引操作的版本信息
关键字段:
location: Translog位置(如果还未刷到Lucene)
DeleteVersionValue
职责: 删除操作的版本信息
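LiveVersionMap依赖的UID级别锁可以用"每个key一把锁"的思路示意(简化假设,并非ES中KeyedLock的真实实现,真实实现会在释放时回收锁对象):同一uid的版本更新串行,不同uid完全并发。

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

// 示意KeyedLock: 对应 versionMap.acquireLock(uid) / getUnderLock / putIndexUnderLock
public class KeyedLockSketch {
    private final ConcurrentHashMap<String, ReentrantLock> locks = new ConcurrentHashMap<>();
    final ConcurrentHashMap<String, Long> versions = new ConcurrentHashMap<>();

    // 持有uid锁期间读取并更新版本, 模拟INTERNAL版本自增
    public long incrementUnderLock(String uid) {
        ReentrantLock lock = locks.computeIfAbsent(uid, k -> new ReentrantLock());
        lock.lock();
        try {
            long next = versions.getOrDefault(uid, 0L) + 1;
            versions.put(uid, next);
            return next;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        KeyedLockSketch m = new KeyedLockSketch();
        Thread[] ts = new Thread[4];
        for (int i = 0; i < ts.length; i++) {
            ts[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) m.incrementUnderLock("doc-1");
            });
            ts[i].start();
        }
        for (Thread t : ts) t.join();
        System.out.println(m.versions.get("doc-1")); // 4000, 锁保证没有丢失更新
    }
}
```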
5.3 Translog
类图
classDiagram
class Translog {
-TranslogWriter current
-List~TranslogReader~ readers
-TranslogDeletionPolicy deletionPolicy
-AtomicLong globalCheckpoint
+add(Operation) Location
+rollGeneration()
+newSnapshot() Snapshot
}
class TranslogWriter {
-FileChannel channel
-long generation
-AtomicLong writtenOffset
+add(Operation) Location
+sync()
}
class TranslogReader {
-FileChannel channel
-long generation
-long firstOperationOffset
+newIterator() Iterator
}
class Operation {
<<abstract>>
-long seqNo
-long primaryTerm
-Operation.Type type
}
class Index {
-String id
-byte[] source
-long version
}
class Delete {
-String id
-long version
}
class NoOp {
-String reason
}
Translog --> TranslogWriter
Translog --> TranslogReader
Translog --> Operation
Operation <|-- Index
Operation <|-- Delete
Operation <|-- NoOp
类说明
Translog
职责: 事务日志,保证数据持久性
关键字段:
| 字段 | 类型 | 说明 |
|---|---|---|
| current | TranslogWriter | 当前写入的generation |
| readers | List&lt;TranslogReader&gt; | 历史generations(用于恢复) |
| deletionPolicy | TranslogDeletionPolicy | 删除策略 |
| globalCheckpoint | AtomicLong | 全局检查点 |
Generation机制:
Generation 0 → Generation 1 → Generation 2
↓ ↓ ↓
translog-1.tlog translog-2.tlog translog-3.tlog (current)
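Generation的滚动与裁剪可以示意如下(纯内存模拟,本文假设的简化实现,非真实文件操作):rollGeneration把当前generation转为只读reader并新建下一代;trimUnreferencedReaders在Flush后删除不再被引用的旧generation。

```java
import java.util.ArrayList;
import java.util.List;

// 示意Translog的generation滚动与裁剪
public class TranslogGenerationSketch {
    long currentGeneration = 1;                   // 当前写入的generation
    final List<Long> readers = new ArrayList<>(); // 历史generation(用于恢复)

    // 对应 translog.rollGeneration(): 当前generation转为只读, 新建下一代
    public void rollGeneration() {
        readers.add(currentGeneration);
        currentGeneration++;
    }

    // 对应 trimUnreferencedReaders(): 删除所有操作都已持久化到Lucene的旧generation
    public void trimUnreferencedReaders(long minReferencedGeneration) {
        readers.removeIf(gen -> gen < minReferencedGeneration);
    }

    public static void main(String[] args) {
        TranslogGenerationSketch t = new TranslogGenerationSketch();
        t.rollGeneration(); // gen1 -> readers, current=2
        t.rollGeneration(); // gen2 -> readers, current=3
        System.out.println(t.currentGeneration + " " + t.readers); // 3 [1, 2]
        t.trimUnreferencedReaders(2); // flush后gen1不再被引用, 可删除
        System.out.println(t.readers); // [2]
    }
}
```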
Operation
操作类型:
- Index: 索引操作
- Delete: 删除操作
- NoOp: 空操作(用于序列号对齐)
字段结构:
public abstract class Operation {
private long seqNo; // 序列号
private long primaryTerm; // 主分片任期
private Type type; // 操作类型
}
5.4 LocalCheckpointTracker
类图
classDiagram
class LocalCheckpointTracker {
-long checkpoint
-LongObjectHashMap~CountedBitSet~ processedSeqNo
-List~DelayedOperation~ delayedOperations
+markSeqNoAsProcessed(long)
+markSeqNoAsPersisted(long)
+getProcessedCheckpoint() long
+getPersistedCheckpoint() long
}
class CountedBitSet {
-BitSet bitSet
-int count
+set(int) boolean
+get(int) boolean
}
LocalCheckpointTracker --> CountedBitSet
类说明
LocalCheckpointTracker
职责: 跟踪本地检查点,确保序列号连续
关键概念:
Processed Checkpoint: 所有 <= checkpoint 的 seqNo 都已处理
Persisted Checkpoint: 所有 <= checkpoint 的 seqNo 都已持久化
核心方法:
// 标记序列号为已处理
void markSeqNoAsProcessed(long seqNo);
// 标记序列号为已持久化
void markSeqNoAsPersisted(long seqNo);
// 获取处理检查点
long getProcessedCheckpoint();
// 获取持久化检查点
long getPersistedCheckpoint();
示例:
已处理的 seqNo: [0, 1, 2, 3, 5, 6, 7]
↑ 缺少 4
Processed Checkpoint = 3 (因为4还未处理)
当 4 被处理后:
Processed Checkpoint = 7
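上面的checkpoint推进逻辑可以写成一个自包含的Java示意(简化假设:用单个BitSet代替真实实现中分块的CountedBitSet):

```java
import java.util.BitSet;

// 示意LocalCheckpointTracker: checkpoint = 连续已处理seqNo的最大值
public class CheckpointTrackerSketch {
    private final BitSet processed = new BitSet();
    private long checkpoint = -1; // 尚未处理任何seqNo时为-1

    public synchronized void markSeqNoAsProcessed(long seqNo) {
        processed.set((int) seqNo);
        // 从checkpoint+1开始向前推进, 直到遇到第一个缺口
        while (processed.get((int) (checkpoint + 1))) {
            checkpoint++;
        }
    }

    public synchronized long getProcessedCheckpoint() {
        return checkpoint;
    }

    public static void main(String[] args) {
        CheckpointTrackerSketch t = new CheckpointTrackerSketch();
        for (long seqNo : new long[] {0, 1, 2, 3, 5, 6, 7}) {
            t.markSeqNoAsProcessed(seqNo);
        }
        System.out.println(t.getProcessedCheckpoint()); // 3 (缺少4)
        t.markSeqNoAsProcessed(4);
        System.out.println(t.getProcessedCheckpoint()); // 7
    }
}
```

main中的序列正是上文示例:4未到达时checkpoint停在3,4补齐后一次性推进到7。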
5.5 Lucene相关数据结构
IndexWriter
public class IndexWriter implements Closeable {
// 添加文档
long addDocument(Iterable<? extends IndexableField> doc);
// 更新文档
long updateDocument(Term term, Iterable<? extends IndexableField> doc);
// 删除文档
long deleteDocuments(Term... terms);
// 软删除(标记而非物理删除)
long softUpdateDocument(Term term, Iterable<? extends IndexableField> doc, Field softDeletesField);
// 提交
long commit();
// 强制合并
void forceMerge(int maxNumSegments);
}
DirectoryReader
public abstract class DirectoryReader extends IndexReader {
// 打开Reader
static DirectoryReader open(Directory directory);
// 获取段信息
SegmentInfos getSegmentInfos();
// 刷新(获取最新Reader)
DirectoryReader openIfChanged(DirectoryReader oldReader);
}
SegmentInfos
public final class SegmentInfos implements Cloneable {
// 所有段的列表
private List<SegmentCommitInfo> segments;
// 版本号
private long version;
// Generation
private long generation;
}
5.6 数据结构关系总览
graph TB
IE[InternalEngine] --> IW[IndexWriter<br/>Lucene写入]
IE --> DR[DirectoryReader<br/>Lucene读取]
IE --> TL[Translog<br/>事务日志]
IE --> LVM[LiveVersionMap<br/>版本映射]
IE --> LCT[LocalCheckpointTracker<br/>检查点跟踪]
IW --> Segments[Segments<br/>Lucene段]
DR --> Segments
TL --> TW[TranslogWriter<br/>当前Generation]
TL --> TR[TranslogReader<br/>历史Generation]
LVM --> IVV[IndexVersionValue<br/>索引版本]
LVM --> DVV[DeleteVersionValue<br/>删除版本]
style IE fill:#e1f5ff
style TL fill:#ffe1e1
style LVM fill:#e1ffe1
6. 详细时序图
6.1 文档索引流程
完整索引时序图
sequenceDiagram
autonumber
participant Shard as IndexShard
participant Engine as InternalEngine
participant VersionMap as LiveVersionMap
participant Translog as Translog
participant IndexWriter as IndexWriter<br/>(Lucene)
participant LCT as LocalCheckpointTracker
Shard->>Engine: index(Index operation)
Note over Engine,LCT: 步骤1: 版本检查与锁获取
Engine->>VersionMap: acquireLock(uid)
VersionMap-->>Engine: Lock acquired
Engine->>VersionMap: getUnderLock(uid)
VersionMap-->>Engine: currentVersion (或 null)
Engine->>Engine: 版本冲突检查<br/>(if version conflict)
alt 版本冲突
Engine-->>Shard: VersionConflictException
end
Note over Engine,LCT: 步骤2: 分配序列号
alt Primary Shard
Engine->>Engine: generateSeqNoForOperationOnPrimary()
Engine->>Engine: seqNo = localCheckpoint + 1
else Replica Shard
Engine->>Engine: 使用 Primary 分配的 seqNo
end
Note over Engine,LCT: 步骤3: 写入 Translog
Engine->>Translog: add(IndexOperation)
Translog->>Translog: 序列化操作
Translog->>Translog: 写入当前 generation
Translog-->>Engine: Location
Note over Engine,LCT: 步骤4: 写入 Lucene
Engine->>Engine: 判断是 Add 还是 Update
alt 文档不存在(Create)
Engine->>IndexWriter: addDocument(doc)
else 文档已存在(Update)
Engine->>IndexWriter: updateDocument(term, doc)
end
IndexWriter->>IndexWriter: 写入内存缓冲
IndexWriter-->>Engine: OK
Note over Engine,LCT: 步骤5: 更新 LiveVersionMap
Engine->>VersionMap: putIndexUnderLock(uid, <br/>new IndexVersionValue(location, version, seqNo, term))
VersionMap->>VersionMap: current.put(uid, versionValue)
Note over Engine,LCT: 步骤6: 更新 LocalCheckpointTracker
Engine->>LCT: markSeqNoAsProcessed(seqNo)
LCT->>LCT: 更新 processedSeqNo BitSet
LCT->>LCT: 计算新的 checkpoint
LCT-->>Engine: newCheckpoint
Engine->>VersionMap: releaseLock(uid)
Engine-->>Shard: IndexResult(version, seqNo, created)
时序图说明
关键步骤详解
1. 版本检查(步骤1-5)
- 获取文档级别的锁
- 从LiveVersionMap获取当前版本
- 根据VersionType检查版本冲突
- 可能的VersionType:
- INTERNAL: 内部版本(自动递增)
- EXTERNAL: 外部版本(用户指定)
- EXTERNAL_GTE: 外部版本(大于等于)
2. 序列号分配(步骤6-8)
- Primary: 生成新的seqNo = localCheckpoint + 1
- Replica: 使用Primary分配的seqNo
- seqNo用于保证操作顺序和一致性
3. Translog写入(步骤9-12)
- 序列化操作到字节数组
- 追加写入当前generation
- 返回Location (generation, position, size)
- 可配置同步策略:
- REQUEST: 每次请求后fsync(默认)
- ASYNC: 异步fsync(默认5秒间隔)
4. Lucene写入(步骤13-18)
- 判断是新增还是更新
- 新增: addDocument(doc)
- 更新: updateDocument(term, doc)(先删除后添加)
- 写入IndexWriter的内存缓冲
- 此时文档还不可搜索(需要Refresh)
5. LiveVersionMap更新(步骤19-21)
- 记录最新的版本信息
- 包含Translog Location(用于实时GET)
- 后续Refresh后会清理已持久化的条目
6. LocalCheckpointTracker更新(步骤22-25)
- 标记seqNo为已处理
- 更新本地检查点
- 用于跟踪复制进度
6.2 Refresh流程
Refresh时序图
sequenceDiagram
autonumber
participant Scheduler as RefreshScheduler<br/>(每1秒)
participant Engine as InternalEngine
participant IndexWriter as IndexWriter
participant ReaderManager as ReaderManager
participant VersionMap as LiveVersionMap
Scheduler->>Engine: refresh("scheduled")
Note over Engine,VersionMap: 步骤1: 生成新的可搜索段
    Engine->>IndexWriter: getReader()<br/>(NRT Reader)
IndexWriter->>IndexWriter: 1. flush 内存缓冲到段
IndexWriter->>IndexWriter: 2. 创建新的 SegmentReader
IndexWriter-->>Engine: DirectoryReader
Note over Engine,VersionMap: 步骤2: 更新 ReaderManager
Engine->>ReaderManager: maybeRefreshBlocking()
ReaderManager->>ReaderManager: 更新到新的 Reader
ReaderManager->>ReaderManager: 关闭旧的 Reader
ReaderManager-->>Engine: refreshed
Note over Engine,VersionMap: 步骤3: 清理 LiveVersionMap
Engine->>VersionMap: pruneVersionMap(refreshedSeqNo)
loop 遍历 VersionMap 条目
VersionMap->>VersionMap: 检查 location != null<br/>&& seqNo <= refreshedSeqNo
alt 已持久化到 Lucene
VersionMap->>VersionMap: 移除 location<br/>(保留 version/seqNo)
end
end
VersionMap-->>Engine: pruned
Note over Engine,VersionMap: 步骤4: 通知监听器
Engine->>Engine: 触发 RefreshListener
Engine-->>Scheduler: refresh completed
时序图说明
Refresh的作用
使文档可被搜索:
- 将IndexWriter内存缓冲中的文档刷到新段
- 创建新的DirectoryReader
- 新文档变为可搜索状态
清理LiveVersionMap:
- 移除已持久化到Lucene的Location
- 减少内存占用
- 保留版本和序列号(用于版本检查)
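Refresh后对LiveVersionMap的清理可以示意如下(简化假设,非真实实现):所有seqNo不超过已refresh的seqNo的条目丢弃Translog位置,但保留版本和序列号。

```java
import java.util.HashMap;
import java.util.Map;

// 示意Refresh后对VersionMap的裁剪
public class VersionMapPruneSketch {
    public record Entry(long seqNo, Long translogLocation) {}

    final Map<String, Entry> map = new HashMap<>();

    // seqNo <= refreshedSeqNo 的文档已可从Lucene读取, 不再需要translog位置
    public void prune(long refreshedSeqNo) {
        map.replaceAll((uid, e) ->
            (e.translogLocation() != null && e.seqNo() <= refreshedSeqNo)
                ? new Entry(e.seqNo(), null)   // 保留版本信息, 仅丢弃location
                : e);
    }

    public static void main(String[] args) {
        VersionMapPruneSketch v = new VersionMapPruneSketch();
        v.map.put("1", new Entry(10, 100L)); // 已refresh到Lucene
        v.map.put("2", new Entry(20, 200L)); // 还未refresh
        v.prune(15);
        System.out.println(v.map.get("1").translogLocation()); // null, 已清理
        System.out.println(v.map.get("2").translogLocation()); // 200, 仍指向translog
    }
}
```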
触发条件
- 定时触发: index.refresh_interval(默认1s)
- 手动触发: POST /_refresh
- 索引缓冲满: indices.memory.index_buffer_size超限
- Flush操作: Flush会先执行Refresh
6.3 Flush流程
Flush时序图
sequenceDiagram
autonumber
participant Trigger as Trigger<br/>(Translog满/定时)
participant Engine as InternalEngine
participant IndexWriter as IndexWriter
participant Store as Store<br/>(Directory)
participant Translog as Translog
participant LCT as LocalCheckpointTracker
Trigger->>Engine: flush(force=false, wait=true)
Engine->>Engine: 获取 flushLock
Note over Engine,LCT: 步骤1: Refresh(生成新段)
Engine->>Engine: refresh("flush")
Note right of Engine: 参见 Refresh 流程
Note over Engine,LCT: 步骤2: 提交 Lucene
Engine->>IndexWriter: commit()
IndexWriter->>IndexWriter: 1. prepareCommit()<br/>生成 segments_N 文件
IndexWriter->>IndexWriter: 2. finishCommit()<br/>完成提交
IndexWriter-->>Engine: CommitInfo
Note over Engine,LCT: 步骤3: Fsync 到磁盘
Engine->>Store: directory().sync(fileNames)
Store->>Store: fsync 所有段文件
Store-->>Engine: synced
Note over Engine,LCT: 步骤4: 创建新的 Translog Generation
Engine->>LCT: getPersistedCheckpoint()
LCT-->>Engine: checkpoint
Engine->>Translog: rollGeneration()
Translog->>Translog: 1. 关闭当前 writer
Translog->>Translog: 2. 创建新 generation
Translog->>Translog: 3. 移动到 readers 列表
Translog-->>Engine: newGeneration
Note over Engine,LCT: 步骤5: 清理旧 Translog
Engine->>Translog: trimUnreferencedReaders()
Translog->>Translog: 删除已持久化的<br/>旧 generation 文件
Engine->>Engine: 释放 flushLock
Engine-->>Trigger: CommitId
时序图说明
Flush的作用
持久化数据:
- 将所有内存中的段提交到磁盘
- Fsync确保数据真正写入物理存储
- 创建恢复点(Lucene commit point)
清理Translog:
- 滚动到新的generation
- 删除已持久化的旧generation
- 减少恢复时间
触发条件
- Translog大小: index.translog.flush_threshold_size(默认512MB)
- 定时触发: 默认每30分钟
- 手动触发: POST /_flush
- 关闭索引: Close index时
6.4 实时GET流程
GET时序图
sequenceDiagram
autonumber
participant Client as GetRequest
participant Engine as InternalEngine
participant VersionMap as LiveVersionMap
participant Translog as Translog
participant Lucene as Lucene<br/>DirectoryReader
Client->>Engine: get(Get request)
Note over Engine,Lucene: 步骤1: 检查 LiveVersionMap
Engine->>VersionMap: acquireLock(uid)
Engine->>VersionMap: getUnderLock(uid)
VersionMap-->>Engine: VersionValue
alt VersionValue 是删除
Engine->>VersionMap: releaseLock(uid)
Engine-->>Client: GetResult(NOT_FOUND)
else VersionValue 有 Translog Location
Note over Engine,Translog: 步骤2: 从 Translog 读取
Engine->>Translog: read(location)
Translog->>Translog: seek(generation, position)
Translog->>Translog: 读取并反序列化
Translog-->>Engine: Operation
Engine->>Engine: 提取 _source
Engine->>VersionMap: releaseLock(uid)
Engine-->>Client: GetResult(found, source)
else VersionValue 无 Location
Engine->>VersionMap: releaseLock(uid)
Note over Engine,Lucene: 步骤3: 从 Lucene 读取
Engine->>Lucene: search(termQuery(_id))
Lucene->>Lucene: 查找文档
Lucene-->>Engine: Document
Engine->>Engine: 提取 _source
Engine-->>Client: GetResult(found, source)
end
时序图说明
实时GET原理
三级查找:
- LiveVersionMap: 检查文档状态和位置
- Translog: 读取还未刷到Lucene的文档
- Lucene: 读取已持久化的文档
实时性保证:
- 即使文档还未Refresh,也能通过Translog读取
- 避免等待Refresh(1秒延迟)
- 保证写入后立即可读
6.5 Merge流程
Merge时序图
sequenceDiagram
autonumber
participant BG as Background<br/>MergeScheduler
participant Policy as MergePolicy
participant Scheduler as ConcurrentMergeScheduler
participant IndexWriter as IndexWriter
participant Lucene as Lucene<br/>段文件
Note over BG,Lucene: 后台持续运行
BG->>Policy: findMerges(segmentInfos)
Policy->>Policy: 评估段大小和数量
Policy->>Policy: 选择待合并的段<br/>(小段优先)
Policy-->>BG: MergeSpecification
alt 有待合并的段
BG->>Scheduler: merge(mergeSpec)
Scheduler->>Scheduler: 分配合并线程<br/>(max_thread_count)
par 并发合并多个 Merge
Scheduler->>IndexWriter: merge(oneMerge)
IndexWriter->>Lucene: 读取段1数据
IndexWriter->>Lucene: 读取段2数据
IndexWriter->>Lucene: 读取段N数据
IndexWriter->>IndexWriter: 合并倒排索引
IndexWriter->>IndexWriter: 合并文档存储
IndexWriter->>IndexWriter: 合并 DocValues
IndexWriter->>Lucene: 写入新段
IndexWriter->>IndexWriter: 删除旧段
IndexWriter-->>Scheduler: merge completed
end
Scheduler-->>BG: all merges completed
end
时序图说明
Merge策略
TieredMergePolicy (默认):
- 将相似大小的段合并
- 目标: 减少段数量,同时避免合并过大的段
- max_merged_segment: 最大合并段大小(默认5GB)
- segments_per_tier: 每层段数量(默认10)
Force Merge:
- 手动触发: POST /_forcemerge?max_num_segments=1
- 用于只读索引优化
- 合并为指定数量的段
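TieredMergePolicy"合并相近大小的小段、控制每层段数"的思想可以粗略示意如下(高度简化的本文假设实现,并非Lucene的真实算法):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// 高度简化的合并选择示意: 段数超过每层上限时, 把最小的N个段合并为一个大段
public class MergeSelectionSketch {
    public static List<Long> maybeMerge(List<Long> segmentSizes, int segmentsPerTier) {
        List<Long> segments = new ArrayList<>(segmentSizes);
        while (segments.size() > segmentsPerTier) {
            segments.sort(Comparator.naturalOrder());
            long merged = 0;
            // 优先合并最小的segmentsPerTier个段, 避免反复重写大段
            for (int i = 0; i < segmentsPerTier; i++) {
                merged += segments.remove(0);
            }
            segments.add(merged);
        }
        return segments;
    }

    public static void main(String[] args) {
        // 12个1MB的小段, segments_per_tier=10 -> 合并后段数降到上限以内
        List<Long> sizes = new ArrayList<>();
        for (int i = 0; i < 12; i++) sizes.add(1L);
        List<Long> after = maybeMerge(sizes, 10);
        System.out.println(after.size()); // 3
    }
}
```

真实的TieredMergePolicy还会考虑段的删除比例、max_merged_segment上限和合并开销打分,这里只保留"小段优先、控制段数"这一核心取舍。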
7. 性能优化
7.1 写入性能
- 批量写入: 使用Bulk API
- Refresh间隔: 调整 index.refresh_interval
- Translog刷盘: 调整 index.translog.durability
- 索引缓冲: 调整 indices.memory.index_buffer_size
7.2 搜索性能
- Refresh: 更频繁的refresh提升实时性
- Merge: 减少段数量
- 缓存: 利用QueryCache和FieldDataCache
7.3 存储优化
- Soft Deletes: 减少段合并开销
- Force Merge: 强制合并为单段
- 压缩: 使用 best_compression codec
8. 配置与监控
8.1 关键配置
Refresh相关
| 参数 | 默认值 | 说明 |
|---|---|---|
| index.refresh_interval | 1s | Refresh间隔 |
| index.max_refresh_listeners | 1000 | 最大Refresh监听器数量 |
Flush相关
| 参数 | 默认值 | 说明 |
|---|---|---|
| index.translog.flush_threshold_size | 512mb | Translog大小触发Flush |
| index.translog.generation_threshold_size | 64mb | Translog generation滚动阈值 |
| index.translog.durability | REQUEST | 事务日志刷盘策略 |
| index.translog.sync_interval | 5s | Translog fsync间隔 |
Merge相关
| 参数 | 默认值 | 说明 |
|---|---|---|
| index.merge.scheduler.max_thread_count | max(1, min(4, processors/2)) | 合并线程数 |
| index.merge.policy.max_merged_segment | 5gb | 最大合并段大小 |
8.2 监控指标
GET /_nodes/stats/indices
{
"indices": {
"indexing": {
"index_total": 1000,
"index_time_in_millis": 5000,
"index_current": 2
},
"refresh": {
"total": 100,
"total_time_in_millis": 2000
},
"flush": {
"total": 10,
"total_time_in_millis": 1000
},
"merges": {
"current": 1,
"total": 50,
"total_time_in_millis": 10000
}
}
}
9. 最佳实践
9.1 写入优化
批量写入:
POST /_bulk
{"index":{"_index":"my-index","_id":"1"}}
{"field":"value1"}
{"index":{"_index":"my-index","_id":"2"}}
{"field":"value2"}
调整Refresh间隔:
PUT /my-index/_settings
{
"index": {
"refresh_interval": "30s"
}
}
异步Translog:
PUT /my-index/_settings
{
"index": {
"translog.durability": "async",
"translog.sync_interval": "5s"
}
}
9.2 只读索引优化
Force Merge到1段:
POST /my-index/_forcemerge?max_num_segments=1
禁用Refresh:
PUT /my-index/_settings
{
"index": {
"refresh_interval": "-1"
}
}
9.3 实时搜索场景
降低Refresh间隔:
PUT /my-index/_settings
{
"index": {
"refresh_interval": "200ms"
}
}
使用GET API获取实时数据:
GET /my-index/_doc/1
10. 常见问题
10.1 版本冲突
错误:
VersionConflictEngineException: document already exists
原因: 使用op_type=create但文档已存在
解决: 使用index操作或指定正确的版本号
10.2 Translog损坏
错误:
TranslogCorruptedException: translog is corrupted
解决:
# 使用 elasticsearch-shard 工具移除损坏的数据 (会丢失未提交的操作)
bin/elasticsearch-shard remove-corrupted-data --index my-index --shard-id 0
# 之后重试分配失败的分片
POST /_cluster/reroute?retry_failed=true
10.3 段数量过多
现象: 搜索性能下降
解决:
# 强制合并
POST /my-index/_forcemerge?max_num_segments=5
总结
核心要点回顾
存储引擎是Elasticsearch的核心组件,通过InternalEngine封装Lucene,提供文档索引、实时GET、版本控制等功能。
核心机制:
- LiveVersionMap: 实现实时GET和版本冲突检测
- O(1)查找文档最新版本
- 存储Translog位置用于实时GET
- UID级别锁保证并发安全
- Translog: 保证数据持久性和故障恢复
- WAL (Write-Ahead Log)机制
- Generation滚动机制
- 支持Replica追赶和恢复
- Refresh/Flush/Merge: 管理Lucene段的生命周期
- Refresh: 使文档可搜索(默认1秒)
- Flush: 持久化到磁盘(默认512MB)
- Merge: 合并段,物理删除
- Sequence Number: 保证操作顺序和复制一致性
- 全局唯一、单调递增
- Primary分配、Replica沿用
- 用于追赶和一致性校验
调用链路总结
客户端请求
↓
TransportAction (路由)
↓
IndexShard (分片管理)
↓
├─ prepareIndex (解析文档)
├─ indexingOperationListeners.preIndex
└─ Engine.index
↓
InternalEngine (核心引擎)
↓
├─ versionMap.acquireLock (获取UID锁)
├─ planIndexing (版本检查)
├─ translog.add (写入WAL)
├─ indexWriter.updateDocument (写入Lucene)
├─ versionMap.putIndexUnderLock (更新版本)
└─ localCheckpointTracker.markSeqNoAsProcessed
性能调优要点
写入优化:
- 批量写入 (Bulk API)
- 调整Refresh间隔 (降低实时性要求)
- 异步Translog刷盘 (降低持久性要求)
- 自动生成ID优化 (无版本检查)
搜索优化:
- 合理Refresh间隔 (平衡实时性和性能)
- Force Merge只读索引 (合并为单段)
- 利用QueryCache和FieldDataCache
存储优化:
- Soft Deletes (支持CCR,更高效恢复)
- Force Merge (物理删除已删除文档)
- Best Compression Codec
理解存储引擎的工作原理对于调优写入性能、搜索性能和存储效率至关重要。