Elasticsearch-06: Storage Engine

This document provides a comprehensive analysis of the storage engine module: its responsibilities, architecture, core APIs, data structures, sequence diagrams, and integration with Lucene.


1. Module Responsibilities

The storage engine (Engine) is the layer that integrates Elasticsearch with Apache Lucene and is responsible for all persistent storage operations.

1.1 Core Responsibilities

  1. Document indexing

    • Write documents into the Lucene index
    • Manage document versions and sequence numbers
    • Handle document updates and deletes
  2. Transaction log (Translog)

    • Record every write operation in the transaction log
    • Guarantee durability
    • Support recovery after failures
  3. Segment management

    • Refresh: produce new searchable segments
    • Flush: commit segments to disk
    • Merge: combine small segments into larger ones
  4. Version control

    • Maintain the LiveVersionMap
    • Implement optimistic concurrency control
    • Handle version conflicts
  5. Soft deletes

    • Mark documents as deleted instead of removing them physically right away
    • Support cross-cluster replication (CCR)
    • Speed up recovery
  6. Search support

    • Provide Searchers for queries
    • Manage the DirectoryReader
    • Control search visibility

1.2 Inputs and Outputs

Inputs:

  • Index/Delete/Update operations (from IndexShard)
  • Refresh/Flush/Merge requests

Outputs:

  • IndexWriter (Lucene writes)
  • DirectoryReader (Lucene reads)
  • Translog (transaction log)

1.3 Dependencies

Upstream dependencies:

  • Apache Lucene: the underlying storage engine
  • Store: file system abstraction
  • TranslogConfig: transaction log configuration

Downstream dependents:

  • IndexShard: shard management
  • SearchService: search service

2. Module Architecture

2.1 Overall Architecture Diagram

flowchart TB
    subgraph "Client Layer"
        Client[REST Client<br/>_bulk / _doc APIs]
    end

    subgraph "Request Handling Layer"
        TransportAction[TransportBulkAction<br/>TransportIndexAction]
        BulkProcessor[BulkProcessor<br/>Bulk processor]
    end

    subgraph "Shard Management Layer"
        IndexShard[IndexShard<br/>Shard management]
        ReplicationOp[ReplicationOperation<br/>Replication coordination]
    end

    subgraph "Storage Engine Layer (Engine)"
        Engine[Engine<br/>Abstract interface]
        InternalEngine[InternalEngine<br/>Read/write implementation]
        ReadOnlyEngine[ReadOnlyEngine<br/>Read-only implementation]
    end

    subgraph "Core Components Layer"
        VersionMap[LiveVersionMap<br/>Version map]
        Translog[Translog<br/>Transaction log]
        LocalCheckpoint[LocalCheckpointTracker<br/>Checkpoint tracking]
        Throttle[IndexThrottle<br/>Index throttling]
    end

    subgraph "Lucene Layer"
        IndexWriter[IndexWriter<br/>Writer]
        DirectoryReader[DirectoryReader<br/>Reader]
        ReaderManager[ReaderManager<br/>Reader management]
        SegmentInfos[SegmentInfos<br/>Segment metadata]
    end

    subgraph "Storage Layer"
        Directory[FSDirectory<br/>File directory]
        TranslogFiles[Translog Files<br/>translog-N.tlog]
        SegmentFiles[Segment Files<br/>segments_N]
    end

    Client --> TransportAction
    TransportAction --> BulkProcessor
    BulkProcessor --> IndexShard
    IndexShard --> ReplicationOp
    ReplicationOp --> |Primary| IndexShard
    ReplicationOp --> |Replica| IndexShard

    IndexShard --> Engine
    Engine --> InternalEngine
    Engine --> ReadOnlyEngine

    InternalEngine --> VersionMap
    InternalEngine --> Translog
    InternalEngine --> LocalCheckpoint
    InternalEngine --> Throttle
    InternalEngine --> IndexWriter
    InternalEngine --> ReaderManager

    IndexWriter --> SegmentInfos
    IndexWriter --> Directory
    ReaderManager --> DirectoryReader
    DirectoryReader --> Directory

    Translog --> TranslogFiles
    Directory --> SegmentFiles

    style InternalEngine fill:#e1f5ff
    style VersionMap fill:#e1ffe1
    style Translog fill:#ffe1e1
    style IndexWriter fill:#fff4e1

2.2 Architecture Layers

2.2.1 Request Handling Layer

TransportBulkAction / TransportIndexAction

  • Receive index/bulk requests from clients
  • Route each request to its target shard
  • Coordinate the replication flow between primary and replicas
  • Handle responses and errors

Call paths:

POST /index/_doc/1 → TransportIndexAction
POST /_bulk → TransportBulkAction → BulkProcessor

2.2.2 Shard Management Layer

IndexShard

  • Manages the lifecycle of a single shard
  • Wraps calls into the Engine
  • Runs pre/post indexing operation listeners
  • Handles the differences between primary and replica logic

Core methods:

  • applyIndexOperationOnPrimary(): index on the primary shard
  • applyIndexOperationOnReplica(): index on a replica shard
  • prepareIndex(): prepare the operation (parse the document, apply dynamic mappings)

ReplicationOperation

  • Coordinates replication between primary and replicas
  • Waits for replica acknowledgements
  • Handles replication failures and retries

2.2.3 Storage Engine Layer

InternalEngine (core)

  • The default read/write engine implementation
  • Wraps the Lucene IndexWriter and DirectoryReader
  • Manages all write and read operations
  • Implements the transaction log, version control, and checkpoint tracking

ReadOnlyEngine

  • Read-only engine implementation
  • Used for frozen indices
  • Supports read operations only

2.2.4 Core Components Layer

LiveVersionMap

  • In-memory map of document versions
  • Key: document UID; value: VersionValue
  • Provides:
    • Real-time GET support (stores the translog location)
    • Version conflict detection
    • An optimization for auto-generated IDs ("unsafe" mode)
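
The idea can be sketched with a tiny stand-in class (hypothetical names, not the real org.elasticsearch.index.engine.LiveVersionMap; this shows only the version lookup and post-refresh pruning):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: uid -> (version, seqNo, deleted flag, translog location).
// A translogLocation of -1 means "already searchable in Lucene".
class MiniVersionMap {
    record VersionValue(long version, long seqNo, boolean deleted, long translogLocation) {}

    private final Map<String, VersionValue> current = new ConcurrentHashMap<>();

    // Null means "no in-memory entry: consult Lucene for the version".
    VersionValue getUnderLock(String uid) { return current.get(uid); }

    void putIndexUnderLock(String uid, VersionValue v) { current.put(uid, v); }

    // After a refresh, entries at or below the refreshed seqNo no longer
    // need a translog location for real-time GET; drop it to free memory.
    void pruneAfterRefresh(long refreshedSeqNo) {
        current.replaceAll((uid, v) -> v.seqNo() <= refreshedSeqNo
            ? new VersionValue(v.version(), v.seqNo(), v.deleted(), -1)
            : v);
    }
}
```

The real map additionally keeps delete tombstones and a per-UID lock table; those are omitted here.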

Translog (transaction log)

  • Sequentially written operation log
  • Generation mechanism (translog-1.tlog, translog-2.tlog, …)
  • Guarantees durability (fsync policy)
  • Supports recovery and replica catch-up
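
The generation mechanism can be illustrated with a simplified stand-in (hypothetical class, no file I/O; the real Translog writes checkpointed files and fsyncs them):

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch of translog generations: writes append to the
// current generation; rolling starts a new one and keeps old generations
// around as readers until a flush allows them to be trimmed.
class MiniTranslog {
    private long generation = 1;
    private final Deque<Long> readers = new ArrayDeque<>(); // retired generations

    // Append an operation; returns the generation that now holds it.
    long add(String operation) { return generation; }

    // Start translog-(N+1).tlog; the previous generation becomes a reader.
    void rollGeneration() {
        readers.addLast(generation);
        generation++;
    }

    // Drop generations older than the minimum still needed for recovery.
    void trimUnreferencedReaders(long minReferencedGen) {
        readers.removeIf(g -> g < minReferencedGen);
    }

    long currentFileGeneration() { return generation; }
    int retainedReaderCount() { return readers.size(); }
}
```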

LocalCheckpointTracker

  • Tracks processed and persisted sequence numbers
  • Maintains the local checkpoint
  • Used for replication progress tracking and data consistency
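
The checkpoint logic — advance only across a contiguous prefix of processed sequence numbers, even when acknowledgements arrive out of order — can be sketched as follows (hypothetical simplified class; the real tracker uses windowed bitsets and also tracks persistence separately):

```java
import java.util.BitSet;

// Hypothetical sketch: the checkpoint is the highest seqNo N such that
// every seqNo <= N has been processed.
class MiniCheckpointTracker {
    private final BitSet processed = new BitSet();
    private long checkpoint = -1; // no ops processed yet

    synchronized void markSeqNoAsProcessed(long seqNo) {
        processed.set((int) seqNo);
        // Advance across any now-contiguous prefix of processed seqNos.
        while (processed.get((int) (checkpoint + 1))) {
            checkpoint++;
        }
    }

    synchronized long getProcessedCheckpoint() { return checkpoint; }
}
```

Out-of-order completion leaves a gap: processing seqNo 2 before seqNo 1 keeps the checkpoint at 0 until 1 arrives.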

IndexThrottle

  • Indexing throttling mechanism
  • Pauses indexing writes when merging falls behind
  • Protects memory and disk I/O

2.2.5 Lucene Layer

IndexWriter

  • Lucene's core writer
  • Manages segment creation
  • Performs segment merges
  • Maintains the inverted index, stored fields, and doc values

DirectoryReader / ReaderManager

  • Lucene's readers
  • Provide search capability
  • Updated to the latest data via refresh
  • ReaderManager manages reader lifecycles

SegmentInfos

  • Segment metadata
  • Contains the list of all segments
  • Carries the version number and generation

2.3 Module Interaction Matrix

Caller           | Callee                 | Call style           | Data flow                 | Error handling               | Consistency requirement
IndexShard       | InternalEngine         | Synchronous          | Index/Delete operations   | Version conflict throws      | Strong (primary)
InternalEngine   | LiveVersionMap         | Synchronous (locked) | Version check/update      | -                            | Serialized
InternalEngine   | Translog               | Synchronous write    | Operation log             | I/O failure fails the engine | WAL guarantee
InternalEngine   | IndexWriter            | Synchronous          | Lucene documents          | I/O failure fails the engine | Eventually consistent
InternalEngine   | LocalCheckpointTracker | Synchronous update   | SeqNo processed/persisted | -                            | Monotonically increasing
RefreshScheduler | InternalEngine         | Periodic             | Refresh requests          | Exceptions ignored           | Eventually visible
FlushPolicy      | InternalEngine         | Conditional          | Flush requests            | Retried on failure           | Durability guarantee
MergeScheduler   | IndexWriter            | Background async     | Merge tasks               | Exceptions logged            | Eventual merge

2.4 Key Design Decisions

1. WAL (write-ahead log) mechanism

  • Every operation is recorded in the translog before the client is acknowledged (inside the engine, the in-memory Lucene write happens first and the translog append follows, as the indexing flow in section 3.1 shows)
  • Guarantees that no acknowledged operation is lost
  • Supports recovery after failures

2. Two readers (internal vs external)

  • Internal reader: sees all documents, including uncommitted ones
  • External reader: sees only documents made visible by refresh
  • Distinction of use: internal operations vs user queries

3. Version control strategy

  • Optimistic concurrency control
  • The LiveVersionMap provides UID-level locking
  • Supports multiple version types (Internal/External/ExternalGTE)

4. Sequence number mechanism

  • Unique and monotonically increasing (per shard)
  • Used for replication ordering and consistency
  • Assigned by the primary; replicas reuse the primary's value

5. Soft deletes

  • A delete is recorded as a tombstone document
  • No immediate physical deletion
  • Enables CCR and more efficient recovery
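
As a concrete illustration of the optimistic, compare-and-swap style check behind if_seq_no / if_primary_term, here is a hedged sketch (hypothetical helper; the real logic lives in InternalEngine's planning methods):

```java
// Hypothetical sketch: a conditional write succeeds only if the current
// document still carries the exact seqNo and primary term the caller saw.
class VersionConflictCheck {
    // Mirrors the role of SequenceNumbers.UNASSIGNED_SEQ_NO
    static final long UNASSIGNED_SEQ_NO = -2;

    static boolean isConflict(long currentSeqNo, long currentTerm,
                              long ifSeqNo, long ifPrimaryTerm) {
        if (ifSeqNo == UNASSIGNED_SEQ_NO) {
            return false; // caller did not request a conditional write
        }
        // Any mismatch means another write slipped in between read and write.
        return currentSeqNo != ifSeqNo || currentTerm != ifPrimaryTerm;
    }
}
```

A client that reads a document at seqNo 5 / term 1 and writes back with those values is rejected if any concurrent write has advanced either number in the meantime.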

3. Core Flows and Call Chains

3.1 Full Indexing Flow (from API to storage)

3.1.1 Upstream Call Chain

flowchart TB
    Start[Client request<br/>POST /index/_doc/1] --> TransportAction[TransportIndexAction]
    TransportAction --> |Route to shard| IndexShard[IndexShard]

    IndexShard --> PrepareCheck{Primary?}
    PrepareCheck --> |Yes| PrimaryPath[applyIndexOperationOnPrimary]
    PrepareCheck --> |No| ReplicaPath[applyIndexOperationOnReplica]

    PrimaryPath --> PrepareIndex[prepareIndex<br/>Parse doc / dynamic mapping]
    ReplicaPath --> PrepareIndex

    PrepareIndex --> Listeners[indexingOperationListeners.preIndex]
    Listeners --> EngineIndex[engine.index<br/>Invoke InternalEngine]

    EngineIndex --> VersionLock[Acquire UID lock<br/>versionMap.acquireLock]
    VersionLock --> VersionCheck[Version check<br/>getUnderLock]

    VersionCheck --> |Conflict| VersionConflict[Throw VersionConflictException]
    VersionCheck --> |Pass| SeqNoAssign{Primary?}

    SeqNoAssign --> |Yes| GenerateSeqNo[Generate seqNo<br/>localCheckpoint + 1]
    SeqNoAssign --> |No| UseSeqNo[Use the primary's seqNo]

    GenerateSeqNo --> Lucene[Write Lucene<br/>IndexWriter.updateDocument]
    UseSeqNo --> Lucene

    Lucene --> Translog[Write translog<br/>translog.add]
    Translog --> UpdateVM[Update LiveVersionMap<br/>putIndexUnderLock]
    UpdateVM --> UpdateLCP[Update LocalCheckpointTracker<br/>markSeqNoAsProcessed]
    UpdateLCP --> ReleaseLock[Release UID lock]
    ReleaseLock --> PostListeners[indexingOperationListeners.postIndex]
    PostListeners --> End[Return IndexResult]

    style EngineIndex fill:#e1f5ff
    style Translog fill:#ffe1e1
    style Lucene fill:#fff4e1
    style UpdateVM fill:#e1ffe1

3.1.2 Key Code Paths in Detail

Step 1: IndexShard receives the request

// IndexShard.java, lines 953-976
public Engine.IndexResult applyIndexOperationOnPrimary(
    long version,
    VersionType versionType,
    SourceToParse sourceToParse,
    long ifSeqNo,
    long ifPrimaryTerm,
    long autoGeneratedTimestamp,
    boolean isRetry
) throws IOException {
    // Delegate to the generic index method, marking the origin as PRIMARY
    return applyIndexOperation(
        getEngine(),
        UNASSIGNED_SEQ_NO,      // the primary assigns the seqNo itself
        getOperationPrimaryTerm(),
        version,
        versionType,
        ifSeqNo,
        ifPrimaryTerm,
        autoGeneratedTimestamp,
        isRetry,
        Engine.Operation.Origin.PRIMARY,  // mark as a primary-side operation
        sourceToParse
    );
}

Key fields:

  • UNASSIGNED_SEQ_NO: the primary generates the sequence number itself
  • getOperationPrimaryTerm(): the current primary term
  • Origin.PRIMARY: identifies this as a write on the primary

Step 2: Prepare the index operation

// IndexShard.java, lines 1001-1046
private Engine.IndexResult applyIndexOperation(
    Engine engine,
    long seqNo,
    long opPrimaryTerm,
    long version,
    VersionType versionType,
    // ... other parameters
) throws IOException {
    // Check that writes are allowed
    ensureWriteAllowed(origin);

    Engine.Index operation;
    try {
        // Prepare the operation: parse the document, handle dynamic mappings
        operation = prepareIndex(
            mapperService,
            sourceToParse,
            seqNo,
            opPrimaryTerm,
            version,
            versionType,
            origin,
            autoGeneratedTimeStamp,
            isRetry,
            ifSeqNo,
            ifPrimaryTerm,
            getRelativeTimeInNanos()
        );

        // Does this operation require a dynamic mapping update?
        Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
        if (update != null) {
            // Return a result carrying the mapping update; the caller handles it
            return new Engine.IndexResult(update, operation.parsedDoc().id());
        }
    } catch (Exception e) {
        // Document-level failure (parse error, mapping failure, etc.)
        verifyNotClosed(e);
        return new Engine.IndexResult(e, version, opPrimaryTerm, seqNo, sourceToParse.id());
    }

    // Execute the index operation
    return index(engine, operation);
}

Step 3: Invoke Engine.index

// IndexShard.java, lines 1116-1133
private Engine.IndexResult index(Engine engine, Engine.Index index) throws IOException {
    try {
        final Engine.IndexResult result;
        // Run pre-index listeners (stats, logging, etc.)
        final Engine.Index preIndex = indexingOperationListeners.preIndex(shardId, index);

        try {
            if (logger.isTraceEnabled()) {
                logger.trace(
                    "index [{}] seq# [{}] allocation-id [{}] primaryTerm [{}] operationPrimaryTerm [{}] origin [{}]",
                    preIndex.id(),
                    preIndex.seqNo(),
                    routingEntry().allocationId(),
                    preIndex.primaryTerm(),
                    getOperationPrimaryTerm(),
                    preIndex.origin()
                );
            }

            // Call into the engine
            result = engine.index(preIndex);

            // Trace the result
            if (logger.isTraceEnabled()) {
                logger.trace("index-done [{}] seq# [{}] result-seq# [{}] result-term [{}] failure [{}]",
                    preIndex.id(), preIndex.seqNo(), result.getSeqNo(), result.getTerm(), result.getFailure());
            }
        } catch (Exception e) {
            // Exception handling ...
        }
    } finally {
        // Run post-index listeners
    }
    return result;
}

Step 4: InternalEngine core indexing logic

// InternalEngine.java, lines 1175-1309
@Override
public IndexResult index(Index index) throws IOException {
    final boolean doThrottle = index.origin().isRecovery() == false;
    try (var ignored1 = acquireEnsureOpenRef()) {
        assert assertIncomingSequenceNumber(index.origin(), index.seqNo());
        int reservedDocs = 0;
        try (
            // Acquire the per-document lock (keyed by UID)
            Releasable ignored = versionMap.acquireLock(index.uid());
            // Acquire a throttle token if throttling applies
            Releasable indexThrottle = doThrottle ? throttle.acquireThrottle() : () -> {}
        ) {
            lastWriteNanos = index.startTime();

            /*
             * Auto-generated ID optimization:
             * if the document ID was auto-generated and this is not a retry,
             * we can use addDocument instead of updateDocument and skip the
             * version lookup and check.
             */
            final IndexingStrategy plan;

            // Step 1: plan the operation (version check, conflict detection)
            if (index.origin() == Operation.Origin.PRIMARY) {
                plan = planIndexingAsPrimary(index);
            } else {
                plan = planIndexingAsNonPrimary(index);
            }

            final IndexResult indexResult;
            // If the plan carries a preflight error, return it directly
            if (plan.earlyResultOnPreflightError.isPresent()) {
                indexResult = plan.earlyResultOnPreflightError.get();
            } else {
                // Step 2: write to Lucene
                if (plan.indexIntoLucene || plan.addStaleOpToLucene) {
                    indexResult = indexIntoLucene(index, plan);
                } else {
                    // No Lucene write needed (e.g. the operation is stale and a newer version already exists)
                    indexResult = new IndexResult(
                        plan.versionForIndexing,
                        index.primaryTerm(),
                        index.seqNo(),
                        plan.currentNotFoundOrDeleted,
                        index.id()
                    );
                }
            }

            // Step 3: write to the translog (unless the op came from the translog)
            if (index.origin().isFromTranslog() == false) {
                final Translog.Location location;
                if (indexResult.getResultType() == Result.Type.SUCCESS) {
                    // Success: record an Index operation
                    location = translog.add(new Translog.Index(index, indexResult));
                } else if (indexResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
                    // Failure with an assigned seqNo: record a NoOp
                    final NoOp noOp = new NoOp(
                        indexResult.getSeqNo(),
                        index.primaryTerm(),
                        index.origin(),
                        index.startTime(),
                        indexResult.getFailure().toString()
                    );
                    location = innerNoOp(noOp).getTranslogLocation();
                } else {
                    location = null;
                }
                indexResult.setTranslogLocation(location);
            }

            // Step 4: update the LiveVersionMap
            if (plan.indexIntoLucene && indexResult.getResultType() == Result.Type.SUCCESS) {
                final Translog.Location translogLocation =
                    trackTranslogLocation.get() ? indexResult.getTranslogLocation() : null;
                versionMap.maybePutIndexUnderLock(
                    index.uid(),
                    new IndexVersionValue(
                        translogLocation,       // translog location (for real-time GET)
                        plan.versionForIndexing,
                        index.seqNo(),
                        index.primaryTerm()
                    )
                );
            }

            // Step 5: update the LocalCheckpointTracker
            localCheckpointTracker.markSeqNoAsProcessed(indexResult.getSeqNo());
            if (indexResult.getTranslogLocation() == null) {
                // Came from the translog or has no seqNo: mark as persisted
                localCheckpointTracker.markSeqNoAsPersisted(indexResult.getSeqNo());
            }

            indexResult.setTook(relativeTimeInNanosSupplier.getAsLong() - index.startTime());
            indexResult.freeze();
            return indexResult;
        } finally {
            releaseInFlightDocs(reservedDocs);
        }
    } catch (RuntimeException | IOException e) {
        // Engine failure handling
        try {
            if (e instanceof AlreadyClosedException == false && treatDocumentFailureAsTragicError(index)) {
                failEngine("index id[" + index.id() + "] origin[" + index.origin() + "] seq#[" + index.seqNo() + "]", e);
            } else {
                maybeFailEngine("index id[" + index.id() + "] origin[" + index.origin() + "] seq#[" + index.seqNo() + "]", e);
            }
        } catch (Exception inner) {
            e.addSuppressed(inner);
        }
        throw e;
    }
}

Step 5: Version check strategy (primary)

// InternalEngine.java, lines 1356-1408
private IndexingStrategy planIndexingAsPrimary(Index index) throws IOException {
    assert index.origin() == Operation.Origin.PRIMARY : "planing as primary but origin isn't. got " + index.origin();

    // Auto-generated ID and not a retry: the optimized append path may apply
    final boolean canOptimizeAddDocument = index.isRetry() == false
        && index.autoGeneratedIdTimestamp() != IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP;

    if (canOptimizeAddDocument) {
        // Verify the optimization is really safe (no duplicate risk)
        if (mayHaveBeenIndexedBefore(index)) {
            // Possible duplicate: fall back to the normal path
            versionMap.enforceSafeAccess();
        } else {
            // Safe to optimize: plain addDocument, no version check needed
            return IndexingStrategy.optimizedAppendOnly(index.version(), 0);
        }
    } else {
        // Not an auto-generated ID: a version check is always required
        versionMap.enforceSafeAccess();
    }

    // Read the current version from the version map
    final OpVsLuceneDocStatus opVsLucene;
    final VersionValue versionValue = versionMap.getUnderLock(index.uid());

    if (versionValue != null) {
        // Version info present: check for conflicts
        final long currentVersion = versionValue.version;
        final VersionType versionType = index.versionType();

        // Version conflict detection
        if (versionType.isVersionConflictForWrites(
            currentVersion,
            index.version(),
            versionValue.isDelete()
        )) {
            // Fail with a version conflict
            final String message = versionType.explainConflictForWrites(
                currentVersion,
                index.version(),
                versionValue.isDelete()
            );
            return IndexingStrategy.failWithVersionConflict(
                new VersionConflictEngineException(shardId, index, currentVersion, versionValue.isDelete(), message),
                currentVersion,
                0
            );
        }

        // Check the if_seq_no and if_primary_term conditions
        if (index.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
            // Compare-and-swap style check
            final boolean samePrimaryTerm = index.getIfPrimaryTerm() == versionValue.term;
            final boolean sameSeqNo = index.getIfSeqNo() == versionValue.seqNo;
            if (samePrimaryTerm == false || sameSeqNo == false) {
                return IndexingStrategy.failWithVersionConflict(...);
            }
        }

        // Version check passed
        opVsLucene = compareOpToVersionMapOnSeqNo(
            index.id(),
            index.seqNo(),
            index.primaryTerm(),
            versionValue
        );
    } else {
        // No entry in the version map: consult Lucene
        opVsLucene = compareOpToLuceneDocBasedOnVersions(index);
    }

    // Build the indexing strategy
    return IndexingStrategy.processNormally(
        opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND,
        versionType.updateVersion(currentVersion, index.version()),
        0
    );
}

3.2 Simplified Flow

3.2.1 Core Steps at a Glance

flowchart TB
    Start[Start: index operation] --> VersionCheck[Step 1: Version check<br/>LiveVersionMap]
    VersionCheck --> |Conflict| Reject[Reject the operation]
    VersionCheck --> |Pass| AssignSeqNo[Step 2: Assign seqNo<br/>Generated on the primary]
    AssignSeqNo --> WriteLucene[Step 3: Write Lucene<br/>IndexWriter]
    WriteLucene --> WriteTranslog[Step 4: Write translog<br/>Durability guarantee]
    WriteTranslog --> UpdateVersionMap[Step 5: Update version map<br/>Record the version]
    UpdateVersionMap --> UpdateCheckpoint[Step 6: Update checkpoint<br/>LocalCheckpointTracker]
    UpdateCheckpoint --> End[Done]

    style WriteTranslog fill:#ffe1e1
    style WriteLucene fill:#fff4e1
    style UpdateVersionMap fill:#e1ffe1

3.2.2 Key Steps

  1. Version check: consult the current version in the LiveVersionMap to detect conflicts
  2. Sequence number assignment: the primary assigns a unique seqNo to order operations
  3. Write to Lucene: the document is indexed via the IndexWriter but is not yet searchable
  4. Write to the translog: the operation is logged before acknowledgement, guaranteeing durability
  5. Update the version map: record the latest version and translog location (for real-time GET)
  6. Update the checkpoint: track sequence number processing progress
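
The six steps above can be tied together in a toy end-to-end sketch (hypothetical stand-ins only; no Lucene and none of the real Engine classes):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical single-threaded sketch of the simplified indexing pipeline.
class IndexPipelineSketch {
    private final Map<String, Long> versionMap = new HashMap<>(); // steps 1 and 5
    private final List<String> translog = new ArrayList<>();      // step 4
    private long nextSeqNo = 0;                                   // step 2
    private long processedCheckpoint = -1;                        // step 6

    // expectedVersion < 0 means "no version expectation".
    String index(String uid, long expectedVersion) {
        long current = versionMap.getOrDefault(uid, 0L);
        if (expectedVersion >= 0 && expectedVersion != current) {
            return "version_conflict";            // step 1: reject on conflict
        }
        long seqNo = nextSeqNo++;                 // step 2: assign seqNo
        // step 3: the Lucene write would happen here (doc not yet searchable)
        translog.add(uid + "#" + seqNo);          // step 4: log the operation
        versionMap.put(uid, current + 1);         // step 5: record new version
        processedCheckpoint = seqNo;              // step 6: advance checkpoint
        return "ok";
    }

    long checkpoint() { return processedCheckpoint; }
    long version(String uid) { return versionMap.getOrDefault(uid, 0L); }
}
```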

3.3 Refresh Flow (making documents searchable)

3.3.1 Refresh Call Chain

flowchart TB
    Start[Trigger<br/>Periodic / manual / index buffer full] --> Scheduler[RefreshScheduler<br/>or manual call]
    Scheduler --> EngineRefresh[InternalEngine.refresh]

    EngineRefresh --> GetRM[Get ReferenceManager<br/>External/Internal]
    GetRM --> ReadLock[Acquire engine read lock<br/>Guards against engine close]

    ReadLock --> CheckScope{Blocking?}
    CheckScope --> |Yes| BlockingRefresh[maybeRefreshBlocking<br/>Synchronous refresh]
    CheckScope --> |No| MaybeRefresh[maybeRefresh<br/>Best-effort refresh]

    BlockingRefresh --> IWGetReader[IndexWriter.getReader<br/>or commit]
    MaybeRefresh --> IWGetReader

    IWGetReader --> FlushBuffer[Flush the in-memory buffer<br/>Produce a segment]
    FlushBuffer --> CreateReader[Create a new DirectoryReader]
    CreateReader --> UpdateRM[Update ReferenceManager<br/>Atomically swap readers]

    UpdateRM --> CloseOld[Close the old reader<br/>Decrement refcount]
    CloseOld --> TriggerListener[Fire RefreshListeners]

    TriggerListener --> PruneVM[Prune the LiveVersionMap<br/>pruneVersionMap]
    PruneVM --> UpdateCheckpoint[Update refreshedCheckpoint]
    UpdateCheckpoint --> ReleaseLock[Release the read lock]
    ReleaseLock --> End[Done]

    style IWGetReader fill:#fff4e1
    style CreateReader fill:#fff4e1
    style PruneVM fill:#e1ffe1

3.3.2 Refresh Key Code

Step 1: InternalEngine.refresh entry point

// InternalEngine.java, lines 2074-2137
protected final RefreshResult refresh(String source, SearcherScope scope, boolean block) throws EngineException {
    // Record the local checkpoint before the refresh
    final long localCheckpointBeforeRefresh = localCheckpointTracker.getProcessedCheckpoint();
    boolean refreshed = false;
    long segmentGeneration = RefreshResult.UNKNOWN_GENERATION;

    try {
        // Increment the store refcount so it cannot be closed underneath us
        if (store.tryIncRef()) {
            try {
                // Get the ReferenceManager for the scope (external or internal)
                ReferenceManager<ElasticsearchDirectoryReader> referenceManager =
                    getReferenceManager(scope);
                long generationBeforeRefresh = lastCommittedSegmentInfos.getGeneration();

                // Take the engine read lock to avoid racing with an engine close
                final var engineReadLock = engineConfig.getEngineResetLock().readLock();

                if (block) {
                    // Synchronous refresh: block until it completes
                    engineReadLock.lock();
                    try {
                        referenceManager.maybeRefreshBlocking();
                        segmentGeneration = segmentGenerationAfterRefresh(
                            referenceManager,
                            generationBeforeRefresh
                        );
                        refreshed = true;
                    } finally {
                        engineReadLock.unlock();
                    }
                } else {
                    // Best-effort refresh: only proceed if the lock is free
                    if (engineReadLock.tryLock()) {
                        try {
                            refreshed = referenceManager.maybeRefresh();
                            if (refreshed) {
                                segmentGeneration = segmentGenerationAfterRefresh(
                                    referenceManager,
                                    generationBeforeRefresh
                                );
                            }
                        } finally {
                            engineReadLock.unlock();
                        }
                    }
                }
            } finally {
                store.decRef();
            }

            // Update the refreshed checkpoint
            if (refreshed) {
                lastRefreshedCheckpointListener.updateRefreshedCheckpoint(
                    localCheckpointBeforeRefresh
                );
            }
        } else {
            refreshed = false;
        }
    } catch (AlreadyClosedException e) {
        failOnTragicEvent(e);
        throw e;
    } catch (Exception e) {
        try {
            failEngine("refresh failed source[" + source + "]", e);
        } catch (Exception inner) {
            e.addSuppressed(inner);
        }
        throw new RefreshFailedEngineException(shardId, e);
    }

    return new RefreshResult(refreshed, segmentGeneration);
}

Step 2: ReferenceManager refresh

// Core method of Lucene's ReferenceManager
public final boolean maybeRefresh() throws IOException {
    ensureOpen();

    // Obtain the freshest reader
    final G refreshedReference = refreshIfNeeded(current);

    if (refreshedReference != null) {
        assert refreshedReference != current;

        // Atomically switch to the new reader
        boolean success = false;
        try {
            // Notify listeners: beforeRefresh
            for (RefreshListener listener : refreshListeners) {
                listener.beforeRefresh();
            }

            // Atomic swap
            release(current);
            current = refreshedReference;
            success = true;

            // Notify listeners: afterRefresh
            for (RefreshListener listener : refreshListeners) {
                listener.afterRefresh(true);
            }
        } finally {
            if (!success) {
                release(refreshedReference);
            }
        }
        return true;
    }

    return false;
}

Step 3: LiveVersionMap pruning

// RefreshListener implementation inside InternalEngine
private final class LastRefreshedCheckpointListener implements ReferenceManager.RefreshListener {
    final AtomicLong refreshedCheckpoint;
    private long pendingCheckpoint;

    @Override
    public void beforeRefresh() {
        // Before the refresh: record the current processed checkpoint.
        // Every operation with seqNo <= pendingCheckpoint becomes visible after the refresh.
        pendingCheckpoint = localCheckpointTracker.getProcessedCheckpoint();
    }

    @Override
    public void afterRefresh(boolean didRefresh) {
        if (didRefresh) {
            // After the refresh: advance the refreshed checkpoint
            updateRefreshedCheckpoint(pendingCheckpoint);

            // Prune LiveVersionMap entries that are now searchable in Lucene
            pruneVersionMap(pendingCheckpoint);
        }
    }

    void pruneVersionMap(long refreshedSeqNo) {
        // Walk the version map and drop locations already covered by Lucene
        versionMap.pruneTombstones(refreshedSeqNo, relativeTimeInNanosSupplier.getAsLong());

        // For IndexVersionValue entries with seqNo <= refreshedSeqNo:
        // - clear the translogLocation field (the document is now in Lucene)
        // - keep version/seqNo/term (still needed for version checks)
    }
}

3.3.3 Refresh Triggers

Trigger           | Condition                        | Frequency  | Blocking
Periodic          | index.refresh_interval           | Default 1s | Non-blocking
Manual            | POST /_refresh                   | On demand  | Blocking
Index buffer full | indices.memory.index_buffer_size | Dynamic    | Blocking
Before a flush    | Flush operation                  | On demand  | Blocking
Real-time GET     | get.realtime=true                | On demand  | Conditional refresh

What refresh does:

  1. Makes indexed documents searchable
  2. Creates new Lucene segments
  3. Frees LiveVersionMap memory
  4. Updates the DirectoryReader
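
The heart of refresh is the atomic reader swap performed by the ReferenceManager: searches see either the old point-in-time view or the new one, never a mix. A minimal sketch of that idea (hypothetical class; the real manager also reference-counts readers and notifies listeners):

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of the atomic swap behind maybeRefresh().
class MiniReaderManager<R> {
    private final AtomicReference<R> current;

    MiniReaderManager(R initial) { current = new AtomicReference<>(initial); }

    // Searches acquire whatever view is current at that instant.
    R acquire() { return current.get(); }

    // Swap in the refreshed view; the old one is returned so the caller
    // can release it once no search still holds a reference to it.
    R swap(R refreshed) { return current.getAndSet(refreshed); }
}
```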

3.4 Flush Flow (persisting to disk)

3.4.1 Flush Call Chain

flowchart TB
    Start[Trigger<br/>Translog full / periodic] --> FlushPolicy[FlushPolicy<br/>shouldPeriodicallyFlush]
    FlushPolicy --> EngineFlush[InternalEngine.flush]

    EngineFlush --> AcquireLock[Acquire flushLock<br/>Prevents concurrent flushes]
    AcquireLock --> CheckCondition{Flush needed?}

    CheckCondition --> |No| SkipFlush[Skip the flush]
    CheckCondition --> |Yes| PreRefresh[Step 1: Refresh<br/>Flush buffers into new segments]

    PreRefresh --> RollTranslog[Step 2: Roll the translog<br/>translog.rollGeneration]
    RollTranslog --> PrepareCommit[Step 3: Prepare the commit<br/>IndexWriter.prepareCommit]

    PrepareCommit --> WriteSegmentsN[Write the segments_N file<br/>Includes commit metadata]
    WriteSegmentsN --> FinishCommit[Step 4: Finish the commit<br/>IndexWriter.finishCommit]

    FinishCommit --> Fsync[Step 5: Fsync all files<br/>directory.sync]
    Fsync --> TrimTranslog[Step 6: Trim the translog<br/>trimUnreferencedReaders]

    TrimTranslog --> DeleteOld[Delete old generations<br/>translog-1.tlog...]
    DeleteOld --> UpdateMetrics[Update stats<br/>lastFlushTimestamp]
    UpdateMetrics --> ReleaseLock[Release flushLock]
    ReleaseLock --> Notify[Notify FlushListeners]
    Notify --> End[Done]

    style RollTranslog fill:#ffe1e1
    style WriteSegmentsN fill:#fff4e1
    style Fsync fill:#e1f5ff

3.4.2 Flush Key Code

Step 1: Decide whether a flush is needed

// InternalEngine.java, lines 2213-2242
private boolean shouldPeriodicallyFlush(long flushThresholdSizeInBytes, long flushThresholdAgeInNanos) {
    ensureOpen();

    // Flush if a large merge has just completed
    if (shouldPeriodicallyFlushAfterBigMerge.get()) {
        return true;
    }

    // Local checkpoint recorded by the last commit
    final long localCheckpointOfLastCommit = Long.parseLong(
        lastCommittedSegmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)
    );

    // Translog generation that the last commit still requires
    final long translogGenerationOfLastCommit =
        translog.getMinGenerationForSeqNo(localCheckpointOfLastCommit + 1)
            .translogFileGeneration();

    // Check the translog size and age thresholds
    if (translog.sizeInBytesByMinGen(translogGenerationOfLastCommit) < flushThresholdSizeInBytes
        && relativeTimeInNanosSupplier.getAsLong() - lastFlushTimestamp < flushThresholdAgeInNanos) {
        // Neither the size threshold nor the age threshold is exceeded
        return false;
    }

    /*
     * Guard against an endless flush loop:
     * only flush when the new commit would point to a newer generation,
     * which ensures the flush actually shrinks the translog.
     */
    final long translogGenerationOfNewCommit = translog.getMinGenerationForSeqNo(
        localCheckpointTracker.getProcessedCheckpoint() + 1
    ).translogFileGeneration();

    return translogGenerationOfNewCommit > translogGenerationOfLastCommit
        || localCheckpointTracker.getProcessedCheckpoint() == localCheckpointTracker.getMaxSeqNo();
}

Step 2: Execute the flush

// InternalEngine.java, lines 2283-2330
protected void flushHoldingLock(boolean force, boolean waitIfOngoing, ActionListener<FlushResult> listener) {
    // Take the flush lock
    try {
        flushLock.lock();

        // Are there uncommitted changes?
        boolean hasUncommittedChanges = hasUncommittedChanges();

        if (hasUncommittedChanges
            || force
            || shouldPeriodicallyFlush()
            || getProcessedLocalCheckpoint() > Long.parseLong(
                lastCommittedSegmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)
            )) {

            ensureCanFlush();
            Translog.Location commitLocation = getTranslogLastWriteLocation();

            try {
                // Step 1: roll the translog generation
                translog.rollGeneration();
                logger.trace("starting commit for flush; commitTranslog=true");

                long lastFlushTimestamp = relativeTimeInNanosSupplier.getAsLong();

                // Record the segment generation the commit is about to create
                preCommitSegmentGeneration.set(lastCommittedSegmentInfos.getGeneration() + 1);

                // Step 2: commit the IndexWriter
                commitIndexWriter(indexWriter, translog);
                logger.trace("finished commit for flush");

                // Step 3: refresh to prune the version map
                refresh("version_table_flush", SearcherScope.INTERNAL, true);

                // Step 4: trim translog readers that are no longer referenced
                translog.trimUnreferencedReaders();

                // Move the commit location forward if new writes arrived meanwhile
                Translog.Location newLocation = getTranslogLastWriteLocation();
                if (newLocation.compareTo(commitLocation) > 0) {
                    commitLocation = newLocation;
                }
            } catch (Exception e) {
                throw new FlushFailedEngineException(shardId, e);
            } finally {
                this.lastFlushTimestamp = lastFlushTimestamp;
            }

            // Asynchronously ensure the translog is synced up to the commit location
            asyncEnsureTranslogSynced(commitLocation, new ActionListener<Void>() {
                @Override
                public void onResponse(Void aVoid) {
                    listener.onResponse(new FlushResult(true, generation));
                }

                @Override
                public void onFailure(Exception e) {
                    listener.onFailure(e);
                }
            });
        } else {
            // Nothing to flush
            listener.onResponse(new FlushResult(false, generation));
        }
    } finally {
        flushLock.unlock();
    }
}

Step 3: Commit the IndexWriter

// InternalEngine.java
private void commitIndexWriter(IndexWriter writer, Translog translog) throws IOException {
    ensureCanFlush();

    try {
        // Read the latest checkpoint information
        final long localCheckpoint = localCheckpointTracker.getProcessedCheckpoint();
        final long maxSeqNo = localCheckpointTracker.getMaxSeqNo();

        // Build the commit metadata
        final Map<String, String> commitData = new HashMap<>(8);
        commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(localCheckpoint));
        commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo));
        commitData.put(Translog.TRANSLOG_UUID_KEY, translog.getTranslogUUID());
        commitData.put(Translog.TRANSLOG_GENERATION_KEY, Long.toString(translog.currentFileGeneration()));
        commitData.put(Engine.HISTORY_UUID_KEY, historyUUID);
        commitData.put(Engine.MIN_RETAINED_SEQNO, Long.toString(softDeletesPolicy.getMinRetainedSeqNo()));

        if (forceMergeUUID != null) {
            commitData.put(FORCE_MERGE_UUID_KEY, forceMergeUUID);
        }

        // Attach the commit data
        writer.setLiveCommitData(commitData.entrySet());

        // Step 1: prepareCommit (writes segments_N but does not make it visible yet)
        writer.prepareCommit();

        // Step 2: commit (finishes the two-phase commit, making segments_N visible)
        writer.commit();

    } catch (final Exception ex) {
        try {
            failEngine("lucene commit failed", ex);
        } catch (final Exception inner) {
            ex.addSuppressed(inner);
        }
        throw ex;
    } catch (final AssertionError e) {
        // An assertion failure here indicates an IndexWriter bug; treat it as fatal
        throw new FlushFailedEngineException(shardId, e);
    }
}

步骤4: Translog滚动和清理

// Translog.java: 滚动generation
public void rollGeneration() throws IOException {
    writeLock.lock();
    try {
        ensureOpen();

        // 创建下一个generation的writer,并把当前writer转为只读reader
        try {
            final TranslogWriter newWriter = createWriter(
                current.getGeneration() + 1,
                getMinFileGeneration(),
                current.getLastSyncedCheckpoint().offset
            );

            // 将当前writer移到readers列表
            readers.add(current.closeIntoReader());
            current = newWriter;

        } catch (final Exception e) {
            closeOnTragicEvent(e);
            throw new TranslogException(shardId, "failed to roll translog", e);
        }
    } finally {
        writeLock.unlock();
    }
}

// 清理不再需要的readers
public void trimUnreferencedReaders() throws IOException {
    writeLock.lock();
    try {
        // 获取最小需要保留的generation
        long minReferencedGen = deletionPolicy.getMinTranslogGenerationForRecovery();

        // 删除更老的generation文件
        for (Iterator<TranslogReader> it = readers.iterator(); it.hasNext(); ) {
            TranslogReader reader = it.next();
            if (reader.getGeneration() < minReferencedGen) {
                it.remove();
                IOUtils.closeWhileHandlingException(reader);
                Files.deleteIfExists(reader.path());
                logger.trace("deleted translog [{}]", reader.path());
            }
        }
    } finally {
        writeLock.unlock();
    }
}
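rollGeneration 与 trimUnreferencedReaders 的配合可以用一个纯 JDK 的玩具模型直观表达(假设性 sketch, TranslogGenerations 为演示虚构类名, 只保留 generation 编号, 省略文件与锁):

```java
import java.util.ArrayList;
import java.util.List;

// Translog generation滚动与清理的最小示意:
// current为当前可写generation, readers保存历史generation,
// trim时删除所有小于minReferencedGen的历史generation
public final class TranslogGenerations {
    private long current = 1;                             // 当前写入的generation
    private final List<Long> readers = new ArrayList<>(); // 历史generations

    // 对应rollGeneration: 当前generation转为只读, 新建current
    public void roll() {
        readers.add(current);
        current++;
    }

    // 对应trimUnreferencedReaders: 删除恢复不再需要的generation
    public void trim(long minReferencedGen) {
        readers.removeIf(gen -> gen < minReferencedGen);
    }

    public long current() { return current; }
    public List<Long> readers() { return List.copyOf(readers); }
}
```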

3.4.3 Flush触发条件

| 触发方式 | 条件 | 说明 |
|---|---|---|
| Translog大小 | index.translog.flush_threshold_size | 默认512MB |
| 定时触发 | 定时检查 | 默认30分钟 |
| 手动触发 | POST /_flush | 立即执行 |
| 索引关闭 | Close Index | 确保数据持久化 |
| 大合并后 | After Force Merge | 清理Translog |

Flush作用:

  1. 提交Lucene段到磁盘(创建segments_N)
  2. Fsync确保数据真正写入物理存储
  3. 清空/清理Translog文件
  4. 创建恢复点(Recovery Point)
  5. 减少恢复时间

Flush与Refresh的区别:

| 操作 | Refresh | Flush |
|---|---|---|
| 目的 | 使文档可搜索 | 持久化到磁盘 |
| 频率 | 高(默认1秒) | 低(Translog达512MB或30分钟) |
| 段操作 | 创建新段 | 提交段到磁盘 |
| Translog | 不处理 | 清理旧generation |
| 持久性 | 无(依赖Translog) | 有(fsync) |
| 性能开销 | 低 | 高 |
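两者的语义差异可以用一个内存玩具模型验证(假设性 sketch, RefreshVsFlush 为演示虚构类名, 用四个列表分别模拟索引缓冲、可搜索段、已提交段和 translog):

```java
import java.util.ArrayList;
import java.util.List;

// Refresh与Flush语义对比的最小示意:
// refresh把内存缓冲变为可搜索但不保证持久; flush提交段并清空WAL
public final class RefreshVsFlush {
    private final List<String> indexBuffer = new ArrayList<>(); // IndexWriter内存缓冲
    private final List<String> searchable = new ArrayList<>();  // 已refresh的段(可搜索)
    private final List<String> committed = new ArrayList<>();   // 已commit+fsync的段(已持久化)
    private final List<String> translog = new ArrayList<>();    // 事务日志(WAL)

    public void index(String doc) {
        indexBuffer.add(doc);
        translog.add(doc); // 写操作同时进入WAL, 崩溃后可重放
    }

    // refresh: 缓冲变为可搜索的新段; translog保持不动, 持久性仍依赖它
    public void refresh() {
        searchable.addAll(indexBuffer);
        indexBuffer.clear();
    }

    // flush: 先refresh, 再提交所有段(相当于commit+fsync), 之后才能清理translog
    public void flush() {
        refresh();
        committed.clear();
        committed.addAll(searchable);
        translog.clear();
    }

    public List<String> search() { return List.copyOf(searchable); }
    public int translogSize() { return translog.size(); }
    public int committedSegments() { return committed.size(); }
}
```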

3.5 Delete流程 (删除文档)

3.5.1 Delete调用链路(带Soft Deletes)

flowchart TB
    Start[DELETE /index/_doc/1] --> IndexShard[IndexShard.applyDeleteOperationOnPrimary]
    IndexShard --> EngineDelete[InternalEngine.delete]

    EngineDelete --> AcquireLock[获取UID锁<br/>versionMap.acquireLock]
    AcquireLock --> VersionCheck[版本检查<br/>getUnderLock]

    VersionCheck --> |冲突| Conflict[版本冲突异常]
    VersionCheck --> |通过| AssignSeqNo[分配SeqNo]

    AssignSeqNo --> CreateTombstone[创建Tombstone文档<br/>ParsedDocument.deleteTombstone]
    CreateTombstone --> SetFields[设置字段<br/>seqNo/primaryTerm/version]

    SetFields --> AddSoftDelete[添加软删除字段<br/>_soft_deletes: true]
    AddSoftDelete --> SoftUpdate[Lucene软删除<br/>softUpdateDocument]

    SoftUpdate --> WriteTranslog[写入Translog<br/>Translog.Delete]
    WriteTranslog --> UpdateVM[更新VersionMap<br/>putDeleteUnderLock]

    UpdateVM --> UpdateLCP[更新LocalCheckpointTracker]
    UpdateLCP --> ReleaseLock[释放UID锁]
    ReleaseLock --> End[返回DeleteResult]

    style CreateTombstone fill:#fff4e1
    style SoftUpdate fill:#fff4e1
    style UpdateVM fill:#e1ffe1

3.5.2 Delete关键代码

步骤1: InternalEngine.delete核心逻辑

// InternalEngine.java: 1633-1717行
@Override
public DeleteResult delete(Delete delete) throws IOException {
    // 强制使用安全访问(需要版本检查)
    versionMap.enforceSafeAccess();
    assert assertIncomingSequenceNumber(delete.origin(), delete.seqNo());

    final DeleteResult deleteResult;
    int reservedDocs = 0;

    try (
        var ignored = acquireEnsureOpenRef();
        Releasable ignored2 = versionMap.acquireLock(delete.uid())
    ) {
        lastWriteNanos = delete.startTime();

        // 第一步:制定删除策略
        final DeletionStrategy plan = deletionStrategyForOperation(delete);
        reservedDocs = plan.reservedDocs;

        if (plan.earlyResultOnPreflightError.isPresent()) {
            // 预检失败(例如版本冲突)
            assert delete.origin() == Operation.Origin.PRIMARY : delete.origin();
            deleteResult = plan.earlyResultOnPreflightError.get();
        } else {
            // 第二步:写入Lucene (Soft Delete)
            if (plan.deleteFromLucene) {
                deleteResult = deleteInLucene(delete, plan);
            } else {
                // 不需要写Lucene(已经删除或旧版本)
                deleteResult = new DeleteResult(
                    plan.versionOfDeletion,
                    delete.primaryTerm(),
                    delete.seqNo(),
                    plan.currentlyDeleted == false,
                    delete.id()
                );
            }
        }

        // 第三步:写入Translog
        if (delete.origin().isFromTranslog() == false && deleteResult.getResultType() == Result.Type.SUCCESS) {
            final Translog.Location location = translog.add(new Translog.Delete(delete, deleteResult));
            deleteResult.setTranslogLocation(location);
        }

        // 第四步:更新LocalCheckpointTracker
        localCheckpointTracker.markSeqNoAsProcessed(deleteResult.getSeqNo());
        if (deleteResult.getTranslogLocation() == null) {
            localCheckpointTracker.markSeqNoAsPersisted(deleteResult.getSeqNo());
        }

        deleteResult.setTook(System.nanoTime() - delete.startTime());
        deleteResult.freeze();
    } catch (RuntimeException | IOException e) {
        try {
            maybeFailEngine("delete", e);
        } catch (Exception inner) {
            e.addSuppressed(inner);
        }
        throw e;
    } finally {
        releaseInFlightDocs(reservedDocs);
    }

    // 定期清理删除的Tombstone
    maybePruneDeletes();
    return deleteResult;
}

步骤2: 写入Tombstone (Soft Delete)

// InternalEngine.java: 1840-1876行
private DeleteResult deleteInLucene(Delete delete, DeletionStrategy plan) throws IOException {
    assert assertMaxSeqNoOfUpdatesIsAdvanced(delete.uid(), delete.seqNo(), false, false);

    try {
        // 创建删除Tombstone文档
        final ParsedDocument tombstone = ParsedDocument.deleteTombstone(
            engineConfig.getIndexSettings().seqNoIndexOptions(),
            delete.id()
        );
        assert tombstone.docs().size() == 1 : "Tombstone doc should have single doc [" + tombstone + "]";

        // 设置seqNo和primaryTerm
        tombstone.updateSeqID(delete.seqNo(), delete.primaryTerm());

        // 设置version
        tombstone.version().setLongValue(plan.versionOfDeletion);

        final LuceneDocument doc = tombstone.docs().get(0);

        // 确保有_tombstone字段
        assert doc.getField(SeqNoFieldMapper.TOMBSTONE_NAME) != null
            : "Delete tombstone document but _tombstone field is not set [" + doc + " ]";

        // 添加软删除字段
        doc.add(softDeletesField);

        // 写入Lucene
        if (plan.addStaleOpToLucene || plan.currentlyDeleted) {
            // 文档不存在或已删除:直接addDocument
            indexWriter.addDocument(doc);
        } else {
            // 文档存在:使用softUpdateDocument
            // 先添加新的tombstone,再标记旧文档为软删除
            indexWriter.softUpdateDocument(
                new Term(IdFieldMapper.NAME, delete.uid()),
                doc,
                softDeletesField
            );
        }

        return new DeleteResult(
            plan.versionOfDeletion,
            delete.primaryTerm(),
            delete.seqNo(),
            plan.currentlyDeleted == false,  // found
            delete.id()
        );
    } catch (final Exception ex) {
        // 删除失败视为致命错误
        if (ex instanceof AlreadyClosedException == false && ex instanceof IOException == false) {
            throw new AssertionError("Unexpected exception during deletion", ex);
        }
        throw ex;
    }
}

Soft Delete机制说明:

普通删除 vs 软删除:

普通删除 (Hard Delete):
- IndexWriter.deleteDocuments(term)
- 文档立即标记为删除
- 在Merge时物理删除

软删除 (Soft Delete):
- IndexWriter.softUpdateDocument(term, tombstone, softDeleteField)
- 保留原文档,标记_soft_deletes=true
- 写入新的Tombstone文档
- 好处:
  1) 支持CCR (跨集群复制)
  2) 更高效的恢复(保留历史操作)
  3) 可以配置保留时间
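软删除的"标记 + 追加 tombstone"语义可以用如下纯 JDK 示意(假设性 sketch, SoftDeleteSketch/Doc 均为演示虚构; 真实实现中标记由 Lucene 的 soft-deletes 字段承担, 并通过 softUpdateDocument 原子完成):

```java
import java.util.ArrayList;
import java.util.List;

// 软删除示意: 删除不移除原文档, 而是标记soft-deleted并追加tombstone;
// 搜索时过滤, 但历史操作(恢复/CCR)仍可读到完整序列
public final class SoftDeleteSketch {
    // 每条记录: id, seqNo, 是否tombstone, 是否被软删除
    record Doc(String id, long seqNo, boolean tombstone, boolean softDeleted) {}

    private final List<Doc> segment = new ArrayList<>();

    public void index(String id, long seqNo) {
        segment.add(new Doc(id, seqNo, false, false));
    }

    // 软删除: 把该id的存活文档标记为softDeleted, 再写入tombstone
    public void delete(String id, long seqNo) {
        for (int i = 0; i < segment.size(); i++) {
            Doc d = segment.get(i);
            if (d.id().equals(id) && !d.softDeleted() && !d.tombstone()) {
                segment.set(i, new Doc(d.id(), d.seqNo(), d.tombstone(), true));
            }
        }
        segment.add(new Doc(id, seqNo, true, false));
    }

    // 普通搜索: 过滤掉softDeleted与tombstone
    public long searchableCount() {
        return segment.stream().filter(d -> !d.softDeleted() && !d.tombstone()).count();
    }

    // 历史快照(恢复/CCR使用): 所有操作仍然保留
    public int historyOps() { return segment.size(); }
}
```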

3.6 实时GET流程

3.6.1 GET调用链路 (三级查找)

flowchart TB
    Start[GET /index/_doc/1] --> CheckRealtime{realtime=true?}
    CheckRealtime --> |No| DirectLucene[直接查询Lucene]
    CheckRealtime --> |Yes| AcquireLock[获取UID锁<br/>versionMap.acquireLock]

    AcquireLock --> GetVM[查询LiveVersionMap<br/>getUnderLock]

    GetVM --> CheckVM{存在VersionValue?}
    CheckVM --> |No| ReleaseLock1[释放锁]
    CheckVM --> |Yes| CheckDelete{isDelete?}

    CheckDelete --> |Yes| NotFound[返回NOT_EXISTS]
    CheckDelete --> |No| CheckLocation{有Translog Location?}

    CheckLocation --> |Yes| ReadTranslog[从Translog读取<br/>translog.read]
    CheckLocation --> |No| ReleaseLock2[释放锁]

    ReadTranslog --> Deserialize[反序列化Operation]
    Deserialize --> ExtractSource[提取_source]
    ExtractSource --> ReturnFromTL[返回GetResult]

    ReleaseLock1 --> SearchLucene["从Lucene搜索<br/>termQuery(_id)"]
    ReleaseLock2 --> SearchLucene
    DirectLucene --> SearchLucene

    SearchLucene --> FindDoc{找到文档?}
    FindDoc --> |Yes| ExtractFields[提取字段]
    FindDoc --> |No| ReturnNotFound[返回NOT_EXISTS]

    ExtractFields --> CheckSoftDelete{_soft_deletes=true?}
    CheckSoftDelete --> |Yes| ReturnNotFound
    CheckSoftDelete --> |No| ReturnFromLucene[返回GetResult]

    style GetVM fill:#e1ffe1
    style ReadTranslog fill:#ffe1e1
    style SearchLucene fill:#fff4e1

3.6.2 GET关键代码

步骤1: 实时GET入口

// InternalEngine.java: 912-980行
protected GetResult realtimeGetUnderLock(
    Get get,
    MappingLookup mappingLookup,
    DocumentParser documentParser,
    Function<Searcher, Searcher> searcherWrapper,
    boolean getFromSearcher
) {
    assert isDrainedForClose() == false;
    assert get.realtime();

    // 第一步:查询LiveVersionMap
    final VersionValue versionValue;
    try (Releasable ignore = versionMap.acquireLock(get.uid())) {
        versionValue = getVersionFromMap(get.uid());
    }

    try {
        boolean getFromSearcherIfNotInTranslog = getFromSearcher;

        if (versionValue != null) {
            // 找到版本信息
            getFromSearcherIfNotInTranslog = true;

            // 检查是否是删除
            if (versionValue.isDelete()) {
                return GetResult.NOT_EXISTS;
            }

            // 检查版本冲突
            if (get.versionType().isVersionConflictForReads(versionValue.version, get.version())) {
                throw new VersionConflictEngineException(
                    shardId,
                    "[" + get.id() + "]",
                    get.versionType().explainConflictForReads(versionValue.version, get.version())
                );
            }

            // 检查if_seq_no和if_primary_term
            if (get.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
                if (versionValue.seqNo != get.getIfSeqNo() || versionValue.term != get.getIfPrimaryTerm()) {
                    throw new VersionConflictEngineException(...);
                }
            }

            // 第二步:尝试从Translog读取
            if (versionValue instanceof IndexVersionValue) {
                IndexVersionValue iv = (IndexVersionValue) versionValue;
                if (iv.getLocation() != null) {
                    // 从Translog读取
                    return getFromTranslog(
                        get,
                        iv.getLocation(),
                        versionValue.version,
                        versionValue.seqNo,
                        versionValue.term,
                        mappingLookup,
                        documentParser
                    );
                }
            }
        }

        // 第三步:从Lucene读取
        if (getFromSearcherIfNotInTranslog) {
            return getFromSearcher(
                get,
                acquireSearcher("realtime_get", SearcherScope.INTERNAL, searcherWrapper),
                false
            );
        } else {
            return GetResult.NOT_EXISTS;
        }
    } catch (Exception e) {
        maybeFailEngine("realtime_get", e);
        throw e;
    }
}

步骤2: 从Translog读取

// InternalEngine.java
private GetResult getFromTranslog(
    Get get,
    Translog.Location location,
    long version,
    long seqNo,
    long term,
    MappingLookup mappingLookup,
    DocumentParser documentParser
) throws IOException {
    // 读取Translog操作
    Translog.Operation op = translog.readOperation(location);

    if (op == null) {
        // Translog已经滚动,fallback到Lucene
        return getFromSearcher(get, acquireSearcher("realtime_get_fallback", SearcherScope.INTERNAL), false);
    }

    assert op instanceof Translog.Index : "unexpected operation: " + op;
    Translog.Index index = (Translog.Index) op;

    // 提取_source
    BytesReference source = index.source();

    // 构建GetResult
    return new GetResult(
        new Searcher("realtime_get_translog", null, null),
        new Engine.GetResult.Exists(
            get.id(),
            version,
            seqNo,
            term,
            source,
            null,  // stored fields
            mappingLookup
        )
    );
}

步骤3: 从Lucene读取

// Engine.java: 913-971行
protected final GetResult getFromSearcher(
    Get get,
    Engine.Searcher searcher,
    boolean searcherWasRefreshed
) throws IOException {
    try {
        // 构建TermQuery查询_id
        Query query = new TermQuery(new Term(IdFieldMapper.NAME, get.uid()));

        // 搜索
        final IndexSearcher indexSearcher = searcher.getIndexSearcher();
        final Weight weight = indexSearcher.createWeight(
            indexSearcher.rewrite(query),
            ScoreMode.COMPLETE_NO_SCORES,
            1.0f
        );

        // 遍历所有Leaf
        for (LeafReaderContext leaf : indexSearcher.getIndexReader().leaves()) {
            final Scorer scorer = weight.scorer(leaf);
            if (scorer == null) {
                continue;
            }

            final DocIdSetIterator iterator = scorer.iterator();
            int docId = iterator.nextDoc();
            if (docId != DocIdSetIterator.NO_MORE_DOCS) {
                // 找到文档

                // 检查软删除
                final FieldsVisitor fieldsVisitor = new FieldsVisitor(true);
                leaf.reader().document(docId, fieldsVisitor);

                if (fieldsVisitor.isSoftDeleted()) {
                    // 软删除的文档,视为不存在
                    return GetResult.NOT_EXISTS;
                }

                // 提取字段
                final long seqNo = readSeqNo(leaf.reader(), docId);
                final long primaryTerm = readPrimaryTerm(leaf.reader(), docId);
                final long version = readVersion(leaf.reader(), docId);

                return new GetResult(
                    searcher,
                    new GetResult.Exists(
                        get.id(),
                        version,
                        seqNo,
                        primaryTerm,
                        fieldsVisitor.source(),
                        fieldsVisitor.fields(),
                        mappingLookup
                    )
                );
            }
        }

        // 未找到文档
        return GetResult.NOT_EXISTS;
    } finally {
        searcher.close();
    }
}

3.6.3 实时GET特性说明

三级查找顺序:

  1. LiveVersionMap: O(1)查找,获取最新状态和Translog位置
  2. Translog: 顺序文件读取,获取还未Refresh的文档
  3. Lucene: 索引查询,获取已Refresh的文档

实时性保证:

  • 写入后立即可读,无需等待Refresh
  • 利用LiveVersionMap快速定位
  • Translog提供WAL保证

性能考虑:

  • LiveVersionMap查询: 微秒级
  • Translog读取: 毫秒级(磁盘IO)
  • Lucene查询: 毫秒级(取决于段数量)
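三级查找的优先级可以用一个简化模型表达(假设性 sketch, RealtimeGetSketch 为演示虚构类名; 真实实现中 versionMap 存的是 VersionValue, 这里简化为 translog 位置, -1 表示删除墓碑):

```java
import java.util.*;

// 实时GET三级查找示意: 1) versionMap 2) translog 3) lucene
public final class RealtimeGetSketch {
    // versionMap: uid -> translog位置(-1表示删除墓碑)
    private final Map<String, Integer> versionMap = new HashMap<>();
    private final List<String> translog = new ArrayList<>();    // 位置 -> source
    private final Map<String, String> lucene = new HashMap<>(); // 已refresh的文档

    public void index(String id, String source) {
        translog.add(source);
        versionMap.put(id, translog.size() - 1); // 记录translog位置, 写入后立即可GET
    }

    public void delete(String id) { versionMap.put(id, -1); } // 删除墓碑

    // refresh: 文档进入lucene, versionMap中的location被清理
    public void refresh() {
        for (Map.Entry<String, Integer> e : versionMap.entrySet()) {
            if (e.getValue() >= 0) lucene.put(e.getKey(), translog.get(e.getValue()));
            else lucene.remove(e.getKey());
        }
        versionMap.clear();
    }

    // 三级查找
    public Optional<String> get(String id) {
        Integer loc = versionMap.get(id);
        if (loc != null) {
            if (loc < 0) return Optional.empty();  // 1) versionMap: 已删除
            return Optional.of(translog.get(loc)); // 2) translog读取
        }
        return Optional.ofNullable(lucene.get(id)); // 3) 回退到lucene
    }
}
```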

3.7 Merge流程 (段合并)

flowchart LR
    Start[后台 Merge] --> Policy[MergePolicy<br/>选择待合并段]
    Policy --> Scheduler[MergeScheduler<br/>调度合并任务]
    Scheduler --> Merge[合并多个小段<br/>为一个大段]
    Merge --> Delete[删除旧段]
    Delete --> End[完成]

Merge作用:

  • 减少段数量
  • 提升搜索性能
  • 物理删除已标记删除的文档(包括软删除)

4. 核心API

4.1 index - 索引文档

方法签名

public IndexResult index(Index index) throws IOException

参数说明

public static class Index extends Operation {
    private final ParsedDocument doc;          // 解析后的文档
    private final long version;                // 版本号
    private final VersionType versionType;     // 版本类型
    private final long ifSeqNo;                // 条件序列号
    private final long ifPrimaryTerm;          // 条件主分片任期
    private final long autoGeneratedIdTimestamp; // 自动生成ID的时间戳
}

返回值

public static class IndexResult extends Result {
    private final boolean created;  // 是否是新建(false表示更新)
}

核心逻辑

@Override
public IndexResult index(Index index) throws IOException {
    // 1. 版本检查(LiveVersionMap)
    final IndexingStrategy plan = indexingStrategyForOperation(index);

    // 2. 分配序列号
    if (index.origin() == Operation.Origin.PRIMARY) {
        index = new Index(..., generateSeqNoForOperationOnPrimary(index), ...);
    }

    // 3. 写入Lucene
    if (plan.useLuceneUpdateDocument) {
        indexWriter.updateDocument(new Term(IdFieldMapper.NAME, index.uid()), index.docs());
    } else {
        indexWriter.addDocument(index.docs());
    }

    // 4. 写入Translog(注意: 在Lucene写入成功之后, 避免记录失败的操作)
    Translog.Location location = translog.add(new Translog.Index(index));

    // 5. 更新LiveVersionMap(带Translog Location, 用于实时GET)
    versionMap.putIndexUnderLock(
        index.uid(),
        new IndexVersionValue(location, plan.versionForIndexing, index.seqNo(), index.primaryTerm())
    );

    return new IndexResult(plan.versionForIndexing, index.primaryTerm(), index.seqNo(), created);
}
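上面第1步的版本检查取决于 VersionType 语义, 其核心判断可以概括为如下 sketch(假设性示例, VersionCheckSketch 为演示虚构类名, 省略了 MATCH_ANY 等特殊值的处理):

```java
// 乐观并发控制的版本检查示意(简化自VersionType语义):
// INTERNAL: 请求版本必须等于当前版本; EXTERNAL: 必须大于当前版本;
// EXTERNAL_GTE: 必须大于等于当前版本
public final class VersionCheckSketch {
    public enum VersionType { INTERNAL, EXTERNAL, EXTERNAL_GTE }

    // 返回true表示存在版本冲突, 应拒绝本次写入
    public static boolean isConflict(VersionType type, long currentVersion, long requestVersion) {
        return switch (type) {
            case INTERNAL -> requestVersion != currentVersion;   // CAS语义
            case EXTERNAL -> requestVersion <= currentVersion;   // 必须严格递增
            case EXTERNAL_GTE -> requestVersion < currentVersion;
        };
    }
}
```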

4.2 delete - 删除文档

方法签名

public DeleteResult delete(Delete delete) throws IOException

参数说明

public static class Delete extends Operation {
    private final String id;                   // 文档ID
    private final BytesRef uid;                // 文档UID
    private final long version;                // 版本号
    private final VersionType versionType;     // 版本类型
}

核心逻辑

@Override
public DeleteResult delete(Delete delete) throws IOException {
    // 1. 版本检查
    final DeletionStrategy plan = deletionStrategyForOperation(delete);

    // 2. 分配序列号
    delete = new Delete(..., generateSeqNoForOperationOnPrimary(delete), ...);

    // 3. 写入Lucene (软删除Tombstone)
    ParsedDocument tombstone = ParsedDocument.deleteTombstone(delete.id());
    tombstone.updateSeqID(delete.seqNo(), delete.primaryTerm());
    indexWriter.softUpdateDocument(
        new Term(IdFieldMapper.NAME, delete.uid()),
        tombstone.docs().get(0),
        softDeletesField
    );

    // 4. 写入Translog(在Lucene写入成功之后)
    translog.add(new Translog.Delete(delete));

    // 5. 更新LiveVersionMap
    versionMap.putDeleteUnderLock(
        delete.uid(),
        new DeleteVersionValue(plan.versionOfDeletion, delete.seqNo(), delete.primaryTerm())
    );

    return new DeleteResult(plan.versionOfDeletion, delete.seqNo(), found);
}

4.3 get - 获取文档(实时GET)

方法签名

public GetResult get(
    Get get,
    BiFunction<String, SearcherScope, Engine.Searcher> searcherFactory
) throws EngineException

核心逻辑

public GetResult get(Get get, ...) {
    // 1. 检查LiveVersionMap
    try (Releasable ignored = versionMap.acquireLock(get.uid())) {
        VersionValue versionValue = versionMap.getUnderLock(get.uid());

        if (versionValue != null) {
            if (versionValue.isDelete()) {
                return GetResult.NOT_EXISTS;  // 已删除
            }
            if (versionValue instanceof IndexVersionValue) {
                IndexVersionValue iv = (IndexVersionValue) versionValue;
                if (iv.getLocation() != null) {
                    // 2. 从Translog读取(还未刷到Lucene)
                    return getFromTranslog(get, iv.getLocation());
                }
            }
        }
    }

    // 3. 从Lucene读取
    return getFromSearcher(get, searcherFactory);
}

4.4 refresh - 刷新搜索器

方法签名

public void refresh(String source) throws EngineException

核心逻辑

public void refresh(String source) throws EngineException {
    // 1. 通过ReaderManager刷新Reader
    //    内部将IndexWriter内存缓冲刷成新段(DirectoryReader.openIfChanged)
    //    注意: refresh不会commit, 也不会fsync
    externalReaderManager.maybeRefreshBlocking();

    // 2. 旧Reader在所有引用释放后关闭

    // 3. 清理LiveVersionMap中已刷入Lucene的条目
    pruneVersionMap();
}

触发条件:

  • 定时触发(默认1秒)
  • 手动调用POST /_refresh
  • 索引缓冲满

4.5 flush - 提交到磁盘

方法签名

public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineException

核心逻辑

public CommitId flush(boolean force, boolean waitIfOngoing) {
    // 1. 执行Refresh
    refresh("flush");

    // 2. 提交Lucene
    indexWriter.commit();

    // 3. Fsync到磁盘
    store.directory().sync();

    // 4. 创建新的Translog Generation
    translog.rollGeneration();
    translog.trimUnreferencedReaders();

    return new CommitId(lastCommittedSegmentInfos.getId());
}

触发条件:

  • Translog大小超过阈值(默认512MB)
  • 定时触发(默认30分钟)
  • 手动调用POST /_flush

4.6 forceMerge - 强制合并

方法签名

public void forceMerge(
    boolean flush,
    int maxNumSegments,
    boolean onlyExpungeDeletes,
    String forceMergeUUID
) throws EngineException

核心逻辑

public void forceMerge(...) {
    // 1. 设置MergePolicy
    if (onlyExpungeDeletes) {
        indexWriter.forceMergeDeletes();  // 仅清理删除
    } else {
        indexWriter.forceMerge(maxNumSegments);  // 合并到指定段数
    }

    // 2. 可选Flush
    if (flush) {
        flush(true, true);
    }
}

使用场景:

  • 只读索引优化(合并为1段)
  • 清理已删除文档
  • 减少段数量提升搜索性能

4.7 acquireSearcher - 获取搜索器

方法签名

public Engine.Searcher acquireSearcher(String source, SearcherScope scope)

核心逻辑

public Searcher acquireSearcher(String source, SearcherScope scope) {
    // 1. 获取DirectoryReader
    IndexReader reader = scope == SearcherScope.EXTERNAL
        ? externalReaderManager.acquire()
        : internalReaderManager.acquire();

    // 2. 包装为IndexSearcher
    IndexSearcher searcher = new IndexSearcher(reader);
    searcher.setQueryCache(queryCache);

    // 3. 返回Searcher(包含release方法)
    return new Searcher(source, searcher, () -> releaseReader(reader));
}

SearcherScope:

  • EXTERNAL: 用于用户查询(仅可见已refresh的文档)
  • INTERNAL: 用于内部操作(可见所有文档)
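refresh 切换 Reader 时, 正在执行的查询仍持有旧 Reader, 因此 Reader 的真正关闭由引用计数驱动。下面是这一模式的最小示意(假设性 sketch, RefCountedReader 为演示虚构类名; Lucene 中对应 IndexReader 的 incRef/decRef 与 ReferenceManager):

```java
import java.util.concurrent.atomic.AtomicInteger;

// 引用计数管理Reader生命周期: acquireSearcher增加引用,
// 所有Searcher关闭后才真正释放底层Reader
public final class RefCountedReader {
    private final AtomicInteger refCount = new AtomicInteger(1); // manager自身持有1个引用
    private volatile boolean closed = false;

    public boolean tryIncRef() {
        int cur;
        do {
            cur = refCount.get();
            if (cur == 0) return false; // 已经关闭, 获取失败
        } while (!refCount.compareAndSet(cur, cur + 1));
        return true;
    }

    public void decRef() {
        if (refCount.decrementAndGet() == 0) closed = true; // 最后一个引用释放时关闭
    }

    public boolean isClosed() { return closed; }

    // 模拟acquireSearcher: 成功则返回一个"释放回调"
    public Runnable acquire() {
        if (!tryIncRef()) throw new IllegalStateException("reader already closed");
        return this::decRef;
    }
}
```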

4.8 Translog API

4.8.1 add - 添加操作

public Translog.Location add(Translog.Operation operation) throws IOException

4.8.2 rollGeneration - 滚动Generation

public void rollGeneration() throws IOException

4.8.3 newSnapshot - 创建快照

public Translog.Snapshot newSnapshot() throws IOException

5. 核心数据结构

5.1 Engine & InternalEngine

类图

classDiagram
    class Engine {
        <<abstract>>
        +index(Index) IndexResult
        +delete(Delete) DeleteResult
        +get(Get) GetResult
        +refresh(String)
        +flush(boolean, boolean) CommitId
        +acquireSearcher(String, SearcherScope) Searcher
    }

    class InternalEngine {
        -IndexWriter indexWriter
        -DirectoryReader directoryReader
        -Translog translog
        -LiveVersionMap versionMap
        -LocalCheckpointTracker localCheckpointTracker
        -ReentrantLock flushLock
        +index(Index) IndexResult
        +delete(Delete) DeleteResult
    }

    class ReadOnlyEngine {
        -DirectoryReader directoryReader
        +index(Index) IndexResult
        +delete(Delete) DeleteResult
    }

    Engine <|-- InternalEngine
    Engine <|-- ReadOnlyEngine

类说明

  • Engine: 存储引擎抽象基类,定义读写、刷新、提交等统一接口
  • InternalEngine: 默认实现,支持读写
  • ReadOnlyEngine: 只读实现,用于冻结索引


5.2 LiveVersionMap

类图

classDiagram
    class LiveVersionMap {
        -ConcurrentHashMap~BytesRef, VersionValue~ current
        -ConcurrentHashMap~BytesRef, VersionValue~ old
        -KeyedLock~BytesRef~ keyedLock
        +getUnderLock(BytesRef) VersionValue
        +putIndexUnderLock(BytesRef, IndexVersionValue)
        +putDeleteUnderLock(BytesRef, DeleteVersionValue)
        +acquireLock(BytesRef) Releasable
    }

    class VersionValue {
        <<abstract>>
        -long version
        -long seqNo
        -long term
        -long time
        +getVersion() long
        +getSeqNo() long
    }

    class IndexVersionValue {
        -Translog.Location location
        +getLocation() Translog.Location
    }

    class DeleteVersionValue {
        +isDelete() boolean
    }

    LiveVersionMap --> VersionValue
    VersionValue <|-- IndexVersionValue
    VersionValue <|-- DeleteVersionValue

类说明

LiveVersionMap

职责: 内存中的文档版本映射,用于实时GET和版本冲突检测

关键字段:

| 字段 | 类型 | 说明 |
|---|---|---|
| current | ConcurrentHashMap | 当前活跃的版本映射 |
| old | ConcurrentHashMap | 旧的版本映射(归档) |
| keyedLock | KeyedLock | UID级别的锁 |

核心方法:

// 获取版本(必须持有锁)
VersionValue getUnderLock(BytesRef uid);

// 更新索引版本
void putIndexUnderLock(BytesRef uid, IndexVersionValue versionValue);

// 更新删除版本
void putDeleteUnderLock(BytesRef uid, DeleteVersionValue versionValue);

// 获取锁
Releasable acquireLock(BytesRef uid);

VersionValue

职责: 文档版本信息基类

关键字段:

  • version: 版本号
  • seqNo: 序列号
  • term: 主分片任期
  • time: 时间戳

IndexVersionValue

职责: 索引操作的版本信息

关键字段:

  • location: Translog位置(如果还未刷到Lucene)

DeleteVersionValue

职责: 删除操作的版本信息
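上述 *UnderLock 方法都要求调用方先通过 acquireLock 持有对应 uid 的锁。UID 级别锁的思路可以用如下 sketch 表达(假设性示例, KeyedLockSketch 为演示虚构类名; 真实的 KeyedLock 还会在锁空闲时移除表项以控制内存, 这里为简洁而省略):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

// UID级别锁示意: 同一uid上的写操作串行, 不同uid互不阻塞
public final class KeyedLockSketch {
    // 简化版Releasable: close不抛检查异常, 方便try-with-resources
    public interface Releasable extends AutoCloseable { @Override void close(); }

    private final ConcurrentHashMap<String, ReentrantLock> locks = new ConcurrentHashMap<>();

    // 获取某个uid的锁, 返回值在try-with-resources中自动释放
    public Releasable acquire(String uid) {
        ReentrantLock lock = locks.computeIfAbsent(uid, k -> new ReentrantLock());
        lock.lock();
        return lock::unlock;
    }
}
```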


5.3 Translog

类图

classDiagram
    class Translog {
        -TranslogWriter current
        -List~TranslogReader~ readers
        -TranslogDeletionPolicy deletionPolicy
        -AtomicLong globalCheckpoint
        +add(Operation) Location
        +rollGeneration()
        +newSnapshot() Snapshot
    }

    class TranslogWriter {
        -FileChannel channel
        -long generation
        -AtomicLong writtenOffset
        +add(Operation) Location
        +sync()
    }

    class TranslogReader {
        -FileChannel channel
        -long generation
        -long firstOperationOffset
        +newIterator() Iterator
    }

    class Operation {
        <<abstract>>
        -long seqNo
        -long primaryTerm
        -Operation.Type type
    }

    class Index {
        -String id
        -byte[] source
        -long version
    }

    class Delete {
        -String id
        -long version
    }

    class NoOp {
        -String reason
    }

    Translog --> TranslogWriter
    Translog --> TranslogReader
    Translog --> Operation
    Operation <|-- Index
    Operation <|-- Delete
    Operation <|-- NoOp

类说明

Translog

职责: 事务日志,保证数据持久性

关键字段:

| 字段 | 类型 | 说明 |
|---|---|---|
| current | TranslogWriter | 当前写入的generation |
| readers | List&lt;TranslogReader&gt; | 历史generations(用于恢复) |
| deletionPolicy | TranslogDeletionPolicy | 删除策略 |
| globalCheckpoint | AtomicLong | 全局检查点 |

Generation机制:

Generation 1 → Generation 2 → Generation 3
     ↓              ↓              ↓
translog-1.tlog  translog-2.tlog  translog-3.tlog (current)

Operation

操作类型:

  • Index: 索引操作
  • Delete: 删除操作
  • NoOp: 空操作(用于序列号对齐)

字段结构:

public abstract class Operation {
    private long seqNo;         // 序列号
    private long primaryTerm;   // 主分片任期
    private Type type;          // 操作类型
}
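Translog 追加写的物理格式大致是"长度 + 内容 + 校验和", 恢复时顺序重放并逐条校验。下面用内存缓冲给出该格式的最小示意(假设性 sketch, WalSketch 为演示虚构类名, 具体编码与真实 translog 文件格式不同):

```java
import java.io.*;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.CRC32;

// WAL示意: 每条操作按 [长度][payload][CRC32校验和] 追加写入
public final class WalSketch {
    private final ByteArrayOutputStream log = new ByteArrayOutputStream();

    // 追加一条操作, 返回其起始偏移(类似Translog.Location)
    public int append(byte[] op) {
        try {
            int offset = log.size();
            DataOutputStream out = new DataOutputStream(log);
            out.writeInt(op.length);       // [长度]
            out.write(op);                 // [payload]
            CRC32 crc = new CRC32();
            crc.update(op);
            out.writeLong(crc.getValue()); // [校验和]
            return offset;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // 恢复时重放所有操作, 校验失败说明日志损坏
    public List<byte[]> replay() {
        try {
            List<byte[]> ops = new ArrayList<>();
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(log.toByteArray()));
            while (in.available() > 0) {
                byte[] op = new byte[in.readInt()];
                in.readFully(op);
                CRC32 crc = new CRC32();
                crc.update(op);
                if (crc.getValue() != in.readLong()) throw new IOException("checksum mismatch");
                ops.add(op);
            }
            return ops;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```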

5.4 LocalCheckpointTracker

类图

classDiagram
    class LocalCheckpointTracker {
        -long checkpoint
        -LongObjectHashMap~CountedBitSet~ processedSeqNo
        -List~DelayedOperation~ delayedOperations
        +markSeqNoAsProcessed(long)
        +markSeqNoAsPersisted(long)
        +getProcessedCheckpoint() long
        +getPersistedCheckpoint() long
    }

    class CountedBitSet {
        -BitSet bitSet
        -int count
        +set(int) boolean
        +get(int) boolean
    }

    LocalCheckpointTracker --> CountedBitSet

类说明

LocalCheckpointTracker

职责: 跟踪本地检查点,确保序列号连续

关键概念:

Processed Checkpoint: 所有 <= checkpoint 的 seqNo 都已处理
Persisted Checkpoint: 所有 <= checkpoint 的 seqNo 都已持久化

核心方法:

// 标记序列号为已处理
void markSeqNoAsProcessed(long seqNo);

// 标记序列号为已持久化
void markSeqNoAsPersisted(long seqNo);

// 获取处理检查点
long getProcessedCheckpoint();

// 获取持久化检查点
long getPersistedCheckpoint();

示例:

已处理的 seqNo: [0, 1, 2, 3, 5, 6, 7]
                          ↑ 缺少 4
Processed Checkpoint = 3  (因为4还未处理)

当 4 被处理后:
Processed Checkpoint = 7
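上面的推进规则可以直接落成代码(假设性 sketch, CheckpointSketch 为演示虚构类名; 真实实现用分块的 CountedBitSet 以便回收已推进过的内存):

```java
import java.util.BitSet;

// LocalCheckpointTracker核心逻辑的最小示意:
// checkpoint = 最大的、满足"0..checkpoint全部已处理"的seqNo
public final class CheckpointSketch {
    private final BitSet processed = new BitSet();
    private long checkpoint = -1; // 初始为-1, 表示还没有任何seqNo被处理

    public void markProcessed(long seqNo) {
        processed.set((int) seqNo);
        // 只要下一个seqNo已处理, checkpoint就继续前进
        while (processed.get((int) (checkpoint + 1))) {
            checkpoint++;
        }
    }

    public long checkpoint() { return checkpoint; }
}
```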

5.5 Lucene相关数据结构

IndexWriter

public class IndexWriter implements Closeable {
    // 添加文档
    long addDocument(Iterable<? extends IndexableField> doc);

    // 更新文档
    long updateDocument(Term term, Iterable<? extends IndexableField> doc);

    // 删除文档
    long deleteDocuments(Term... terms);

    // 软删除(标记而非物理删除)
    long softUpdateDocument(Term term, Iterable<? extends IndexableField> doc, Field softDeletesField);

    // 提交
    long commit();

    // 强制合并
    void forceMerge(int maxNumSegments);
}

DirectoryReader

public abstract class DirectoryReader extends IndexReader {
    // 打开Reader
    static DirectoryReader open(Directory directory);

    // 获取段信息
    SegmentInfos getSegmentInfos();

    // 刷新(获取最新Reader)
    static DirectoryReader openIfChanged(DirectoryReader oldReader);
}

SegmentInfos

public final class SegmentInfos implements Cloneable {
    // 所有段的列表
    private List<SegmentCommitInfo> segments;

    // 版本号
    private long version;

    // Generation
    private long generation;
}

5.6 数据结构关系总览

graph TB
    IE[InternalEngine] --> IW[IndexWriter<br/>Lucene写入]
    IE --> DR[DirectoryReader<br/>Lucene读取]
    IE --> TL[Translog<br/>事务日志]
    IE --> LVM[LiveVersionMap<br/>版本映射]
    IE --> LCT[LocalCheckpointTracker<br/>检查点跟踪]

    IW --> Segments[Segments<br/>Lucene段]
    DR --> Segments

    TL --> TW[TranslogWriter<br/>当前Generation]
    TL --> TR[TranslogReader<br/>历史Generation]

    LVM --> IVV[IndexVersionValue<br/>索引版本]
    LVM --> DVV[DeleteVersionValue<br/>删除版本]

    style IE fill:#e1f5ff
    style TL fill:#ffe1e1
    style LVM fill:#e1ffe1

6. 详细时序图

6.1 文档索引流程

完整索引时序图

sequenceDiagram
    autonumber
    participant Shard as IndexShard
    participant Engine as InternalEngine
    participant VersionMap as LiveVersionMap
    participant Translog as Translog
    participant IndexWriter as IndexWriter<br/>(Lucene)
    participant LCT as LocalCheckpointTracker

    Shard->>Engine: index(Index operation)

    Note over Engine,LCT: 步骤1: 版本检查与锁获取

    Engine->>VersionMap: acquireLock(uid)
    VersionMap-->>Engine: Lock acquired

    Engine->>VersionMap: getUnderLock(uid)
    VersionMap-->>Engine: currentVersion (或 null)

    Engine->>Engine: 版本冲突检查<br/>(if version conflict)

    alt 版本冲突
        Engine-->>Shard: VersionConflictException
    end

    Note over Engine,LCT: 步骤2: 分配序列号

    alt Primary Shard
        Engine->>Engine: generateSeqNoForOperationOnPrimary()
        Engine->>Engine: seqNo = maxSeqNo + 1
    else Replica Shard
        Engine->>Engine: 使用 Primary 分配的 seqNo
    end

    Note over Engine,LCT: 步骤3: 写入 Lucene

    Engine->>Engine: 判断是 Add 还是 Update

    alt 文档不存在(Create)
        Engine->>IndexWriter: addDocument(doc)
    else 文档已存在(Update)
        Engine->>IndexWriter: updateDocument(term, doc)
    end

    IndexWriter->>IndexWriter: 写入内存缓冲
    IndexWriter-->>Engine: OK

    Note over Engine,LCT: 步骤4: 写入 Translog(在 Lucene 写入成功后)

    Engine->>Translog: add(IndexOperation)
    Translog->>Translog: 序列化操作
    Translog->>Translog: 写入当前 generation
    Translog-->>Engine: Location

    Note over Engine,LCT: 步骤5: 更新 LiveVersionMap

    Engine->>VersionMap: putIndexUnderLock(uid, <br/>new IndexVersionValue(location, version, seqNo, term))
    VersionMap->>VersionMap: current.put(uid, versionValue)

    Note over Engine,LCT: 步骤6: 更新 LocalCheckpointTracker

    Engine->>LCT: markSeqNoAsProcessed(seqNo)
    LCT->>LCT: 更新 processedSeqNo BitSet
    LCT->>LCT: 计算新的 checkpoint
    LCT-->>Engine: newCheckpoint

    Engine->>VersionMap: releaseLock(uid)
    Engine-->>Shard: IndexResult(version, seqNo, created)

时序图说明

关键步骤详解

1. 版本检查(步骤1-7)

  • 获取文档级别的锁
  • 从LiveVersionMap获取当前版本
  • 根据VersionType检查版本冲突
  • 可能的VersionType:
    • INTERNAL: 内部版本(自动递增)
    • EXTERNAL: 外部版本(用户指定)
    • EXTERNAL_GTE: 外部版本(大于等于)

2. 序列号分配(步骤8-10)

  • Primary: 生成新的seqNo = maxSeqNo + 1
  • Replica: 使用Primary分配的seqNo
  • seqNo用于保证操作顺序和一致性

3. Lucene写入(步骤11-15)

  • 判断是新增还是更新
  • 新增: addDocument(doc)
  • 更新: updateDocument(term, doc) (先删除后添加)
  • 写入IndexWriter的内存缓冲
  • 此时文档还不可搜索(需要Refresh)

4. Translog写入(步骤16-19)

  • 在Lucene写入成功后才追加,避免把失败的操作写入日志
  • 序列化操作到字节数组
  • 追加写入当前generation
  • 返回Location (generation, position, size)
  • 可配置同步策略:
    • REQUEST: 每次请求后fsync(默认)
    • ASYNC: 异步fsync(5秒间隔)

5. LiveVersionMap更新(步骤20-21)

  • 记录最新的版本信息
  • 包含Translog Location(用于实时GET)
  • 后续Refresh后会清理已持久化的条目

6. LocalCheckpointTracker更新(步骤22-25)

  • 标记seqNo为已处理
  • 更新本地检查点
  • 用于跟踪复制进度

6.2 Refresh流程

Refresh时序图

sequenceDiagram
    autonumber
    participant Scheduler as RefreshScheduler<br/>(每1秒)
    participant Engine as InternalEngine
    participant IndexWriter as IndexWriter
    participant ReaderManager as ReaderManager
    participant VersionMap as LiveVersionMap

    Scheduler->>Engine: refresh("scheduled")

    Note over Engine,VersionMap: 步骤1: 生成新的可搜索段

    Engine->>IndexWriter: getReader()
    IndexWriter->>IndexWriter: 1. flush 内存缓冲到段
    IndexWriter->>IndexWriter: 2. 创建新的 SegmentReader
    IndexWriter-->>Engine: DirectoryReader

    Note over Engine,VersionMap: 步骤2: 更新 ReaderManager

    Engine->>ReaderManager: maybeRefreshBlocking()
    ReaderManager->>ReaderManager: 更新到新的 Reader
    ReaderManager->>ReaderManager: 关闭旧的 Reader
    ReaderManager-->>Engine: refreshed

    Note over Engine,VersionMap: 步骤3: 清理 LiveVersionMap

    Engine->>VersionMap: pruneVersionMap(refreshedSeqNo)

    loop 遍历 VersionMap 条目
        VersionMap->>VersionMap: 检查 location != null<br/>&& seqNo <= refreshedSeqNo
        alt 已持久化到 Lucene
            VersionMap->>VersionMap: 移除 location<br/>(保留 version/seqNo)
        end
    end

    VersionMap-->>Engine: pruned

    Note over Engine,VersionMap: 步骤4: 通知监听器

    Engine->>Engine: 触发 RefreshListener
    Engine-->>Scheduler: refresh completed

时序图说明

Refresh的作用

使文档可被搜索:

  • 将IndexWriter内存缓冲中的文档刷到新段
  • 创建新的DirectoryReader
  • 新文档变为可搜索状态

清理LiveVersionMap:

  • 移除已持久化到Lucene的Location
  • 减少内存占用
  • 保留版本和序列号(用于版本检查)
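裁剪逻辑可以用一段 Python 示意(仅为概念演示,非源码;VersionValue 的字段参照上文时序图):

```python
# 概念性示意:Refresh 后裁剪 LiveVersionMap —— 丢弃 Translog Location,保留版本信息
from dataclasses import dataclass, replace
from typing import Optional, Tuple

@dataclass(frozen=True)
class VersionValue:
    version: int
    seq_no: int
    location: Optional[Tuple[int, int, int]] = None  # (generation, position, size)

def prune_version_map(version_map: dict, refreshed_seq_no: int) -> None:
    for uid, vv in version_map.items():
        if vv.location is not None and vv.seq_no <= refreshed_seq_no:
            # 文档已随 Refresh 进入 Lucene,实时 GET 不再需要 Translog 位置
            version_map[uid] = replace(vv, location=None)

vm = {"doc1": VersionValue(1, 10, (3, 128, 56)),
      "doc2": VersionValue(2, 20, (3, 512, 40))}
prune_version_map(vm, refreshed_seq_no=15)
assert vm["doc1"].location is None          # seqNo 10 <= 15,Location 被清理
assert vm["doc2"].location == (3, 512, 40)  # seqNo 20 的写入尚未进入本次 Refresh
```

注意裁剪只清空 location,version/seq_no 仍然保留,后续写入仍可据此做版本冲突检查。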

触发条件

  1. 定时触发: index.refresh_interval (默认1s)
  2. 手动触发: POST /_refresh
  3. 索引缓冲满: indices.memory.index_buffer_size 超限
  4. Flush操作: Flush会先执行Refresh

6.3 Flush流程

Flush时序图

sequenceDiagram
    autonumber
    participant Trigger as Trigger<br/>(Translog满/定时)
    participant Engine as InternalEngine
    participant IndexWriter as IndexWriter
    participant Store as Store<br/>(Directory)
    participant Translog as Translog
    participant LCT as LocalCheckpointTracker

    Trigger->>Engine: flush(force=false, wait=true)

    Engine->>Engine: 获取 flushLock

    Note over Engine,LCT: 步骤1: Refresh(生成新段)

    Engine->>Engine: refresh("flush")
    Note right of Engine: 参见 Refresh 流程

    Note over Engine,LCT: 步骤2: 提交 Lucene

    Engine->>IndexWriter: commit()
    IndexWriter->>IndexWriter: 1. prepareCommit()<br/>生成 segments_N 文件
    IndexWriter->>IndexWriter: 2. finishCommit()<br/>完成提交
    IndexWriter-->>Engine: CommitInfo

    Note over Engine,LCT: 步骤3: Fsync 到磁盘

    Engine->>Store: directory().sync(fileNames)
    Store->>Store: fsync 所有段文件
    Store-->>Engine: synced

    Note over Engine,LCT: 步骤4: 创建新的 Translog Generation

    Engine->>LCT: getPersistedCheckpoint()
    LCT-->>Engine: checkpoint

    Engine->>Translog: rollGeneration()
    Translog->>Translog: 1. 关闭当前 writer
    Translog->>Translog: 2. 创建新 generation
    Translog->>Translog: 3. 移动到 readers 列表
    Translog-->>Engine: newGeneration

    Note over Engine,LCT: 步骤5: 清理旧 Translog

    Engine->>Translog: trimUnreferencedReaders()
    Translog->>Translog: 删除已持久化的<br/>旧 generation 文件

    Engine->>Engine: 释放 flushLock
    Engine-->>Trigger: CommitId

时序图说明

Flush的作用

持久化数据:

  • 将所有内存中的段提交到磁盘
  • Fsync确保数据真正写入物理存储
  • 创建恢复点(Lucene commit point)

清理Translog:

  • 滚动到新的generation
  • 删除已持久化的旧generation
  • 减少恢复时间

触发条件

  1. Translog大小: index.translog.flush_threshold_size (默认512MB)
  2. 定时触发: 默认每30分钟
  3. 手动触发: POST /_flush
  4. 关闭索引: Close index时
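Flush 中第 4、5 步对 Translog 的处理(rollGeneration 和 trimUnreferencedReaders)可以用一段 Python 示意(仅为概念演示,真实实现操作的是磁盘上的 translog-N.tlog 文件):

```python
# 概念性示意:Flush 时滚动 Translog generation,并删除已持久化的旧 generation
class Translog:
    def __init__(self):
        self.current_generation = 1
        self.readers = []  # 只读的历史 generation 列表

    def roll_generation(self) -> int:
        # 关闭当前 writer,将其转为只读 reader;新写入进入新 generation
        self.readers.append(self.current_generation)
        self.current_generation += 1
        return self.current_generation

    def trim_unreferenced_readers(self, min_referenced_gen: int) -> None:
        # 早于 Lucene commit 的 generation 不再被恢复流程引用,可以安全删除
        self.readers = [g for g in self.readers if g >= min_referenced_gen]

tlog = Translog()
tlog.roll_generation()   # generation 1 -> 2
tlog.roll_generation()   # generation 2 -> 3
assert tlog.readers == [1, 2]
tlog.trim_unreferenced_readers(min_referenced_gen=2)
assert tlog.readers == [2]
```

这也解释了 Flush 为何能缩短恢复时间:commit 之后只剩少量 generation 需要在重启时重放。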

6.4 实时GET流程

GET时序图

sequenceDiagram
    autonumber
    participant Client as GetRequest
    participant Engine as InternalEngine
    participant VersionMap as LiveVersionMap
    participant Translog as Translog
    participant Lucene as Lucene<br/>DirectoryReader

    Client->>Engine: get(Get request)

    Note over Engine,Lucene: 步骤1: 检查 LiveVersionMap

    Engine->>VersionMap: acquireLock(uid)
    Engine->>VersionMap: getUnderLock(uid)
    VersionMap-->>Engine: VersionValue

    alt VersionValue 是删除
        Engine->>VersionMap: releaseLock(uid)
        Engine-->>Client: GetResult(NOT_FOUND)
    else VersionValue 有 Translog Location
        Note over Engine,Translog: 步骤2: 从 Translog 读取
        Engine->>Translog: read(location)
        Translog->>Translog: seek(generation, position)
        Translog->>Translog: 读取并反序列化
        Translog-->>Engine: Operation
        Engine->>Engine: 提取 _source
        Engine->>VersionMap: releaseLock(uid)
        Engine-->>Client: GetResult(found, source)
    else VersionValue 无 Location
        Engine->>VersionMap: releaseLock(uid)
        Note over Engine,Lucene: 步骤3: 从 Lucene 读取
        Engine->>Lucene: search(termQuery(_id))
        Lucene->>Lucene: 查找文档
        Lucene-->>Engine: Document
        Engine->>Engine: 提取 _source
        Engine-->>Client: GetResult(found, source)
    end

时序图说明

实时GET原理

三级查找:

  1. LiveVersionMap: 检查文档状态和位置
  2. Translog: 读取还未刷到Lucene的文档
  3. Lucene: 读取已持久化的文档

实时性保证:

  • 即使文档还未Refresh,也能通过Translog读取
  • 避免等待Refresh(1秒延迟)
  • 保证写入后立即可读
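三级查找的优先级可以用一段 Python 示意(仅为概念演示,用 dict 模拟各存储层;真实实现需按 location 反序列化 Translog 条目):

```python
# 概念性示意:实时 GET 的三级查找顺序(VersionMap -> Translog -> Lucene)
def realtime_get(uid, version_map, translog, lucene):
    vv = version_map.get(uid)
    if vv is not None:
        if vv.get("deleted"):
            return None                       # 已标记删除,直接返回 NOT_FOUND
        if vv.get("location") is not None:
            return translog[vv["location"]]   # 未 Refresh 的文档:从 Translog 读
    return lucene.get(uid)                    # 其余情况回退到 Lucene

translog = {(3, 128): {"field": "fresh-value"}}
version_map = {"doc1": {"deleted": False, "location": (3, 128)}}
lucene = {"doc2": {"field": "old-value"}}

assert realtime_get("doc1", version_map, translog, lucene) == {"field": "fresh-value"}
assert realtime_get("doc2", version_map, translog, lucene) == {"field": "old-value"}
assert realtime_get("doc3", version_map, translog, lucene) is None
```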

6.5 Merge流程

Merge时序图

sequenceDiagram
    autonumber
    participant BG as Background<br/>MergeScheduler
    participant Policy as MergePolicy
    participant Scheduler as ConcurrentMergeScheduler
    participant IndexWriter as IndexWriter
    participant Lucene as Lucene<br/>段文件

    Note over BG,Lucene: 后台持续运行

    BG->>Policy: findMerges(segmentInfos)
    Policy->>Policy: 评估段大小和数量
    Policy->>Policy: 选择待合并的段<br/>(小段优先)
    Policy-->>BG: MergeSpecification

    alt 有待合并的段
        BG->>Scheduler: merge(mergeSpec)

        Scheduler->>Scheduler: 分配合并线程<br/>(max_thread_count)

        par 并发合并多个 Merge
            Scheduler->>IndexWriter: merge(oneMerge)
            IndexWriter->>Lucene: 读取段1数据
            IndexWriter->>Lucene: 读取段2数据
            IndexWriter->>Lucene: 读取段N数据

            IndexWriter->>IndexWriter: 合并倒排索引
            IndexWriter->>IndexWriter: 合并文档存储
            IndexWriter->>IndexWriter: 合并 DocValues

            IndexWriter->>Lucene: 写入新段
            IndexWriter->>IndexWriter: 删除旧段
            IndexWriter-->>Scheduler: merge completed
        end

        Scheduler-->>BG: all merges completed
    end

时序图说明

Merge策略

TieredMergePolicy (默认):

  • 将相似大小的段合并
  • 目标: 减少段数量,同时避免合并过大的段
  • 参数:
    • max_merged_segment: 最大段大小(5GB)
    • segments_per_tier: 每层段数量(10)

Force Merge:

  • 手动触发: POST /_forcemerge?max_num_segments=1
  • 用于只读索引优化
  • 合并为指定数量的段
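"小段优先、且合并后不超过段大小上限"的选择思路,可以用一段极简的 Python 示意(仅为概念演示,并非 Lucene TieredMergePolicy 的实际评分算法;函数与参数均为假设):

```python
# 概念性示意:按"小段优先 + 段大小上限"挑选一批待合并的段
def find_merges(segment_sizes_mb, segments_per_tier=10, max_merged_segment_mb=5 * 1024):
    """从最小的段开始累积一批候选,且合并后的新段不超过上限。"""
    candidates = sorted(segment_sizes_mb)   # 小段优先
    merge, total = [], 0
    for size in candidates:
        if len(merge) == segments_per_tier or total + size > max_merged_segment_mb:
            break
        merge.append(size)
        total += size
    return merge if len(merge) >= 2 else []  # 少于 2 个段无需合并

# 12 个小段 + 1 个大段:一次最多合并 segments_per_tier(10)个小段
sizes = [8, 5, 13, 2, 1, 3, 21, 34, 55, 89, 144, 233, 4800]
selected = find_merges(sizes)
assert len(selected) == 10
assert 4800 not in selected  # 接近上限的大段不参与本次合并
```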

7. 性能优化

7.1 写入性能

  1. 批量写入: 使用Bulk API
  2. Refresh间隔: 调整index.refresh_interval
  3. Translog刷盘: 调整index.translog.durability
  4. 索引缓冲: 调整indices.memory.index_buffer_size

7.2 搜索性能

  1. Refresh: 更频繁的refresh提升实时性
  2. Merge: 减少段数量
  3. 缓存: 利用QueryCache和FieldDataCache

7.3 存储优化

  1. Soft Deletes: 减少段合并开销
  2. Force Merge: 强制合并为单段
  3. 压缩: 使用best_compression codec

8. 配置与监控

8.1 关键配置

Refresh相关

| 参数 | 默认值 | 说明 |
|---|---|---|
| index.refresh_interval | 1s | Refresh间隔 |
| index.max_refresh_listeners | 1000 | 最大Refresh监听器数量 |

Flush相关

| 参数 | 默认值 | 说明 |
|---|---|---|
| index.translog.flush_threshold_size | 512mb | Translog大小触发Flush |
| index.translog.generation_threshold_size | 64mb | Translog generation滚动阈值 |
| index.translog.durability | REQUEST | 事务日志刷盘策略(REQUEST / ASYNC) |
| index.translog.sync_interval | 5s | ASYNC 模式下的 Translog fsync 间隔 |

Merge相关

| 参数 | 默认值 | 说明 |
|---|---|---|
| index.merge.scheduler.max_thread_count | max(1, min(4, processors/2)) | 合并线程数 |
| index.merge.policy.max_merged_segment | 5gb | 最大合并段大小 |

8.2 监控指标

GET /_nodes/stats/indices

{
  "indices": {
    "indexing": {
      "index_total": 1000,
      "index_time_in_millis": 5000,
      "index_current": 2
    },
    "refresh": {
      "total": 100,
      "total_time_in_millis": 2000
    },
    "flush": {
      "total": 10,
      "total_time_in_millis": 1000
    },
    "merges": {
      "current": 1,
      "total": 50,
      "total_time_in_millis": 10000
    }
  }
}

9. 最佳实践

9.1 写入优化

批量写入:

POST /_bulk
{"index":{"_index":"my-index","_id":"1"}}
{"field":"value1"}
{"index":{"_index":"my-index","_id":"2"}}
{"field":"value2"}

调整Refresh间隔:

PUT /my-index/_settings
{
  "index": {
    "refresh_interval": "30s"
  }
}

异步Translog:

PUT /my-index/_settings
{
  "index": {
    "translog.durability": "async",
    "translog.sync_interval": "5s"
  }
}

9.2 只读索引优化

Force Merge到1段:

POST /my-index/_forcemerge?max_num_segments=1

禁用Refresh:

PUT /my-index/_settings
{
  "index": {
    "refresh_interval": "-1"
  }
}

9.3 实时搜索场景

降低Refresh间隔:

PUT /my-index/_settings
{
  "index": {
    "refresh_interval": "200ms"
  }
}

使用GET API获取实时数据:

GET /my-index/_doc/1

10. 常见问题

10.1 版本冲突

错误:

VersionConflictEngineException: document already exists

原因: 使用op_type=create但文档已存在

解决: 使用index操作或指定正确的版本号

10.2 Translog损坏

错误:

TranslogCorruptedException: translog is corrupted

解决:

# 使用 elasticsearch-shard 工具移除损坏的数据 (会丢失未提交到 Lucene 的操作)
bin/elasticsearch-shard remove-corrupted-data --index my-index --shard-id 0

# 然后让集群重新尝试分配失败的分片
POST /_cluster/reroute?retry_failed=true

10.3 段数量过多

现象: 搜索性能下降

解决:

# 强制合并
POST /my-index/_forcemerge?max_num_segments=5

总结

核心要点回顾

存储引擎是Elasticsearch的核心组件,通过InternalEngine封装Lucene,提供文档索引、实时GET、版本控制等功能。

核心机制:

  1. LiveVersionMap: 实现实时GET和版本冲突检测

    • O(1)查找文档最新版本
    • 存储Translog位置用于实时GET
    • UID级别锁保证并发安全
  2. Translog: 保证数据持久性和故障恢复

    • WAL (Write-Ahead Log)机制
    • Generation滚动机制
    • 支持Replica追赶和恢复
  3. Refresh/Flush/Merge: 管理Lucene段的生命周期

    • Refresh: 使文档可搜索(默认1秒)
    • Flush: 持久化到磁盘(默认512MB)
    • Merge: 合并段,物理删除
  4. Sequence Number: 保证操作顺序和复制一致性

    • 全局唯一、单调递增
    • Primary分配、Replica沿用
    • 用于追赶和一致性校验

调用链路总结

客户端请求
TransportAction (路由)
IndexShard (分片管理)
  ├─ prepareIndex (解析文档)
  ├─ indexingOperationListeners.preIndex
  └─ Engine.index
InternalEngine (核心引擎)
  ├─ versionMap.acquireLock (获取UID锁)
  ├─ planIndexing (版本检查)
  ├─ translog.add (写入WAL)
  ├─ indexWriter.updateDocument (写入Lucene)
  ├─ versionMap.putIndexUnderLock (更新版本)
  └─ localCheckpointTracker.markSeqNoAsProcessed

性能调优要点

写入优化:

  • 批量写入 (Bulk API)
  • 调整Refresh间隔 (降低实时性要求)
  • 异步Translog刷盘 (降低持久性要求)
  • 自动生成ID优化 (无版本检查)

搜索优化:

  • 合理Refresh间隔 (平衡实时性和性能)
  • Force Merge只读索引 (合并为单段)
  • 利用QueryCache和FieldDataCache

存储优化:

  • Soft Deletes (支持CCR,更高效恢复)
  • Force Merge (物理删除已删除文档)
  • Best Compression Codec

理解存储引擎的工作原理对于调优写入性能、搜索性能和存储效率至关重要。