Elasticsearch-02: Index Module

This document provides a comprehensive analysis of the index module, covering module responsibilities, architecture design, core data structures, detailed external API specifications, key flow sequence diagrams, configuration tuning, and troubleshooting.


1. Module Responsibilities

The index module is responsible for creating and managing indices and for document operations in Elasticsearch; it is the core module of the data write path. Its main responsibilities are:

  • Index lifecycle management: creating, opening, closing, and deleting indices
  • Shard management: managing an index's primary and replica shards
  • Document CRUD operations: creating, reading, updating, and deleting documents
  • Mapping management: managing field mappings and dynamic mapping updates
  • Version control: concurrency control based on version numbers and sequence numbers
  • Data durability: guaranteeing durability through the storage engine and the translog

Inputs/Outputs

Inputs:

  • Write requests (Index/Update/Delete/Bulk)
  • Cluster state changes (new shard allocations, shard relocations)
  • Recovery requests (recovery from a replica or a snapshot)

Outputs:

  • Operation results (success/failure, version number, sequence number)
  • Shard state change events
  • Statistics (indexing rate, document count, store size)

Upstream and Downstream Dependencies

Upstream callers:

  • Action module (TransportIndexAction, TransportBulkAction)
  • Search module (reads index data)
  • Cluster module (cluster state changes trigger shard operations)

Downstream dependencies:

  • Storage engine (InternalEngine, ReadOnlyEngine)
  • Apache Lucene (document indexing and search)
  • Translog (operation log persistence)
  • MapperService (document parsing and mapping)

Lifecycle

stateDiagram-v2
    [*] --> CREATED: IndexService created
    CREATED --> RECOVERING: shard recovery starts
    RECOVERING --> POST_RECOVERY: recovery complete
    POST_RECOVERY --> STARTED: shard started
    STARTED --> RELOCATING: shard relocating
    RELOCATING --> STARTED: relocation complete
    STARTED --> CLOSED: index closed
    CLOSED --> STARTED: index reopened
    STARTED --> [*]: index deleted
    CLOSED --> [*]: index deleted

State descriptions:

  • CREATED: the IndexService exists but no shards have been created yet
  • RECOVERING: the shard is recovering (from a replica, a snapshot, or the translog)
  • POST_RECOVERY: recovery is complete and post-recovery work is running
  • STARTED: the shard is started and can serve reads and writes
  • RELOCATING: the shard is relocating to another node
  • CLOSED: the index is closed and holds no in-memory resources
  • Deleted: the index and all of its data are removed

2. Module Architecture Diagram

flowchart TB
    subgraph IndexModule["Index Module"]
        direction TB

        subgraph IndexService["IndexService"]
            IS[IndexService<br/>index instance management]
            IM[IndexModule<br/>index module configuration]
            IC[IndexCache<br/>cache management]
            IFD[IndexFieldDataService<br/>field data service]
        end

        subgraph Shards["Shard Management"]
            Shard1[IndexShard<br/>primary shard 0]
            Shard2[IndexShard<br/>replica shard 0]
            ShardN[IndexShard<br/>shard N]
        end

        subgraph ShardComponents["Shard Components"]
            Store[Store<br/>storage management]
            Engine[Engine<br/>storage engine]
            Translog[Translog<br/>transaction log]
            Mapper[MapperService<br/>mapping service]
            Codec[CodecService<br/>codec service]
        end

        subgraph Recovery["Recovery"]
            RecTarget[RecoveryTarget<br/>recovery target]
            RecSource[RecoverySource<br/>recovery source]
            RecState[RecoveryState<br/>recovery state]
        end

        subgraph Operations["Operation Handling"]
            IndexOp[Index Operation]
            DeleteOp[Delete Operation]
            GetOp[Get Operation]
            BulkOp[Bulk Operation]
        end
    end

    subgraph External["External Dependencies"]
        Lucene[Apache Lucene<br/>full-text search library]
        ClusterSvc[Cluster Service]
        TransportSvc[Transport Service]
    end

    IS --> IM
    IS --> IC
    IS --> IFD
    IS --> Shard1
    IS --> Shard2
    IS --> ShardN

    Shard1 --> Store
    Shard1 --> Engine
    Shard1 --> Translog
    Shard1 --> Mapper
    Shard1 --> Codec

    Shard1 --> RecTarget
    Shard2 --> RecSource
    RecTarget --> RecState

    IndexOp --> Shard1
    DeleteOp --> Shard1
    GetOp --> Shard1
    BulkOp --> Shard1

    Engine --> Lucene
    IS --> ClusterSvc
    Shard1 --> TransportSvc

    style IndexModule fill:#E1F5E1
    style IndexService fill:#FFF4E1
    style Shards fill:#E1E1F5
    style ShardComponents fill:#FFE1E1
    style Recovery fill:#F5E1FF
    style Operations fill:#E1F5FF

Architecture Notes

Component Responsibilities

  1. IndexService

    • Manages index-level settings and metadata
    • Creates and manages the index's IndexShard instances
    • Provides index-level caches and the field data service
    • Coordinates index-level operations (refresh, flush, force-merge)
  2. IndexShard

    • Manages the full lifecycle of a single shard
    • Handles document CRUD operations
    • Manages the shard-level storage engine (Engine)
    • Maintains the shard's routing information and state
    • Handles shard recovery and replica replication
  3. Store

    • Manages the Lucene Directory and its files
    • Provides the shard-level file lock
    • Checks and repairs corrupted index files
  4. Engine

    • Wraps the Lucene IndexWriter and IndexReader
    • Manages document index and delete operations
    • Maintains the version map (per-document version tracking)
    • Controls refresh, flush, and merge operations
  5. Translog

    • Records every operation that has not yet been persisted to Lucene
    • Guarantees durability and crash recovery
    • Supports the sequence-number checkpoint mechanism

Boundary Conditions

  1. Concurrency control

    • Writes to the same document are serialized through a per-UID lock
    • Refresh and flush are coordinated through read/write locks
    • Shard state changes are protected by a mutex
  2. Resource limits

    • The indexing buffer defaults to 10% of the heap
    • Translog size affects recovery time; by default a flush is triggered at 512MB
    • Keeping shards between 10GB and 50GB is recommended
  3. Timeout control

    • Indexing operations time out after 1 minute by default
    • Shard recovery timeouts depend on the data volume
    • Refresh is best-effort and does not block writes

Exception Handling and Fallback

  1. Document-level errors

    • Parse error: the document fails; other documents are unaffected
    • Version conflict: 409 is returned and the client retries
    • Mapping update failure: the response indicates that a mapping update is required
  2. Shard-level errors

    • Write failure: the shard is marked as failed and reallocation is triggered
    • Translog corruption: the shard enters a failed state and is recovered from a replica
    • Low disk space: a read-only index block is applied
  3. Index-level errors

    • All shards unavailable: index health is red
    • Some shards unavailable: index health is yellow

Performance Optimization Points

  1. Bulk operations: use the Bulk API to reduce network round trips and lock contention
  2. Refresh interval: increase refresh_interval to generate fewer segments
  3. Async translog: configure async fsync for higher throughput
  4. Disable _source: turn off _source when the original document is never needed
  5. Concurrent writes: increase index_buffer_size and the write thread pool

Observability

  • Indexing rate: docs/s, write throughput
  • Indexing latency: ms, P99 latency
  • Refresh time: ms, time spent in refresh
  • Merge time: ms, time spent merging segments
  • Translog size: bytes, size of the not-yet-flushed operation log
  • Shard state: STARTED/RECOVERING/RELOCATING/FAILED
3. Core Data Flows

3.1 Document Indexing Flow

flowchart LR
    A[Index Request] --> B{Routing calculation}
    B --> C[Primary shard]
    C --> D[Parse document]
    D --> E{Mapping update?}
    E -->|yes| F[Update mapping]
    E -->|no| G[Execute indexing]
    F --> G
    G --> H[Write to engine]
    H --> I[Update version map]
    I --> J[Write to Lucene]
    J --> K[Write to translog]
    K --> L[Replicate to replicas]
    L --> M[Return response]
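
The routing-calculation step above can be sketched as follows. Elasticsearch itself hashes the routing value (the document id by default) with Murmur3 and maps it onto a primary shard; this sketch substitutes Java's `String.hashCode` as a stand-in hash, so the shard numbers it produces are illustrative only:

```java
// Simplified sketch of shard routing: hash the routing value and take it
// modulo the number of primary shards. Stand-in hash, not Murmur3.
public class RoutingSketch {
    static int shardId(String routing, int numPrimaryShards) {
        // floorMod keeps the result non-negative even for negative hash values
        return Math.floorMod(routing.hashCode(), numPrimaryShards);
    }

    public static void main(String[] args) {
        // The same routing value always lands on the same shard
        System.out.println(shardId("user123", 5) == shardId("user123", 5)); // true
        System.out.println(shardId("user123", 5));
    }
}
```

This is also why the number of primary shards cannot be changed after index creation: a different modulus would re-route existing documents to different shards.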

3.2 Shard Recovery Flow

flowchart TB
    A[Shard allocation] --> B{Recovery type}
    B -->|EmptyStore| C[Create empty shard]
    B -->|ExistingStore| D[Recover from local store]
    B -->|Peer| E[Peer recovery from the primary]
    B -->|Snapshot| F[Recover from snapshot]

    C --> G[Translog replay]
    D --> G
    E --> H[Phase 1: file copy]
    F --> I[Download from repository]

    H --> J[Phase 2: translog replication]
    I --> G
    J --> G

    G --> K[Mark as started]

3.3 Data Synchronization Flow

Data synchronization between the primary and its replicas is guaranteed by the sequence-number mechanism:

  1. Global checkpoint: the highest sequence number that all in-sync shard copies have processed
  2. Local checkpoint: the highest sequence number up to which the current shard copy has processed every operation, with no gaps
  3. Max seq no: the highest sequence number assigned on the shard so far

Replica synchronization strategy:

  • Synchronous document replication: the primary waits for every in-sync replica to acknowledge an operation before answering the client (all in-sync copies, not a majority)
  • Background checkpoint sync: the primary periodically syncs the global checkpoint to the replicas
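
The local-checkpoint bookkeeping behind these three values can be reduced to a minimal sketch — a stand-in for `LocalCheckpointTracker`, not the real implementation: sequence numbers are handed out monotonically, operations may complete out of order, and the checkpoint only advances over a contiguous prefix of processed sequence numbers:

```java
import java.util.BitSet;

// Toy local-checkpoint tracker: the checkpoint is the highest seqNo below
// which every operation has been processed (no gaps).
public class CheckpointSketch {
    private final BitSet processed = new BitSet();
    private long maxSeqNo = -1;   // highest seqNo assigned so far
    private long checkpoint = -1; // all seqNos <= checkpoint are processed

    synchronized long generateSeqNo() {
        return ++maxSeqNo;
    }

    synchronized void markSeqNoAsProcessed(long seqNo) {
        processed.set((int) seqNo);
        // advance the checkpoint across the contiguous processed prefix
        while (processed.get((int) (checkpoint + 1))) {
            checkpoint++;
        }
    }

    synchronized long getProcessedCheckpoint() {
        return checkpoint;
    }

    public static void main(String[] args) {
        CheckpointSketch t = new CheckpointSketch();
        long s0 = t.generateSeqNo(); // 0
        long s1 = t.generateSeqNo(); // 1
        t.markSeqNoAsProcessed(s1);  // out-of-order completion
        System.out.println(t.getProcessedCheckpoint()); // -1: seqNo 0 still pending
        t.markSeqNoAsProcessed(s0);
        System.out.println(t.getProcessedCheckpoint()); // 1: prefix now contiguous
    }
}
```

The gap-free prefix is what makes the checkpoint safe to use for recovery and replication: everything at or below it is known to be durable on this copy.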

4. Core Data Structures

4.1 IndexService and Shards

classDiagram
    class IndexService {
        -IndexSettings indexSettings
        -MapperService mapperService
        -IndexCache indexCache
        -Map~Integer,IndexShard~ shards
        +createShard(routing) IndexShard
        +removeShard(shardId, reason)
        +getShard(shardId) IndexShard
        +refresh(source)
        +flush(request)
    }

    class IndexShard {
        -ShardId shardId
        -ShardRouting shardRouting
        -IndexShardState state
        -AtomicReference~Engine~ currentEngineRef
        -Store store
        -MapperService mapperService
        -ReplicationTracker replicationTracker
        +applyIndexOperationOnPrimary() IndexResult
        +applyIndexOperationOnReplica() IndexResult
        +applyDeleteOperationOnPrimary() DeleteResult
        +get(get) GetResult
        +refresh(source)
        +flush(request)
    }

    class Store {
        -Directory directory
        -ShardLock shardLock
        -IndexSettings indexSettings
        +getMetadata(commit) MetadataSnapshot
        +verify()
        +cleanupFiles(segmentInfos)
    }

    class Engine {
        <<interface>>
        +index(index) IndexResult
        +delete(delete) DeleteResult
        +get(get) GetResult
        +refresh(source)
        +flush(force, waitIfOngoing) FlushResult
        +acquireSearcher(source) Searcher
        +getTranslog() Translog
    }

    class InternalEngine {
        -LiveVersionMap versionMap
        -IndexWriter indexWriter
        -Translog translog
        -LocalCheckpointTracker localCheckpointTracker
        -EngineConfig config
        -AtomicLong lastRefreshTime
        +index(index) IndexResult
        +delete(delete) DeleteResult
        +refresh(source)
    }

    class ShardRouting {
        -ShardId shardId
        -String currentNodeId
        -boolean primary
        -ShardRoutingState state
        -RecoverySource recoverySource
        +active() boolean
        +assignedToNode() boolean
    }

    IndexService "1" --> "*" IndexShard : manages
    IndexShard "1" --> "1" Store : uses
    IndexShard "1" --> "1" Engine : uses
    Engine <|.. InternalEngine : implements
    IndexShard "1" --> "1" ShardRouting : routing info

4.2 Version Map and Sequence-Number Tracking

classDiagram
    class LiveVersionMap {
        -KeyedLock~BytesRef~ keyedLock
        -Maps maps
        -Map~BytesRef,DeleteVersionValue~ tombstones
        +getUnderLock(uid) VersionValue
        +putIndexUnderLock(uid, version)
        +putDeleteUnderLock(uid, version)
        +acquireLock(uid) Releasable
        +releaseLock(uid)
    }

    class VersionValue {
        <<abstract>>
        -long version
        -long seqNo
        -long term
        +isDelete() boolean
        +ramBytesUsed() long
    }

    class IndexVersionValue {
        +isDelete() false
    }

    class DeleteVersionValue {
        -long time
        +isDelete() true
    }

    class ReplicationTracker {
        -String shardAllocationId
        -long operationPrimaryTerm
        -long globalCheckpoint
        -Map~String,CheckpointState~ checkpoints
        -ReplicationGroup replicationGroup
        +getGlobalCheckpoint() long
        +updateFromMaster(shardRoutingTable)
        +initiateTracking(allocationId)
        +markAllocationIdAsInSync(allocationId)
    }

    class CheckpointState {
        -long localCheckpoint
        -long globalCheckpoint
        -boolean inSync
        -boolean tracked
        +getLocalCheckpoint() long
        +getGlobalCheckpoint() long
    }

    class LocalCheckpointTracker {
        -long checkpoint
        -LongObjectHashMap~CountedBitSet~ processedSeqNo
        -long maxSeqNo
        +generateSeqNo() long
        +markSeqNoAsProcessed(seqNo)
        +getProcessedCheckpoint() long
        +getMaxSeqNo() long
    }

    LiveVersionMap "1" --> "*" VersionValue : stores
    VersionValue <|-- IndexVersionValue
    VersionValue <|-- DeleteVersionValue
    ReplicationTracker "1" --> "*" CheckpointState : tracks replicas
    ReplicationTracker "1" --> "1" LocalCheckpointTracker : local checkpoint

4.3 Storage Engine Core Components

classDiagram
    class EngineConfig {
        -ShardId shardId
        -IndexSettings indexSettings
        -Analyzer analyzer
        -Similarity similarity
        -CodecService codecService
        -TranslogConfig translogConfig
        -MergePolicy mergePolicy
        -MergeScheduler mergeScheduler
        -IndexWriterConfig indexWriterConfig
    }

    class Translog {
        -TranslogConfig config
        -Path location
        -List~BaseTranslogReader~ readers
        -TranslogWriter current
        -AtomicLong globalCheckpoint
        +add(operation) Location
        +readLocation(location) Operation
        +newSnapshot() Snapshot
        +rollGeneration()
        +trimUnreferencedReaders()
    }

    class TranslogWriter {
        -FileChannel channel
        -AtomicLong lastSyncedOffset
        -Checkpoint checkpoint
        +add(data) Location
        +sync()
        +closeIntoReader() TranslogReader
    }

    class Checkpoint {
        -long offset
        -int numOps
        -long generation
        -long minSeqNo
        -long maxSeqNo
        -long globalCheckpoint
        -long minTranslogGeneration
        -long trimmedAboveSeqNo
    }

    class Segment {
        -String name
        -long generation
        -boolean committed
        -boolean search
        -long sizeInBytes
        -int docCount
        -int delDocCount
        -Version version
        -Codec codec
        +ramBytesUsed() long
    }

    InternalEngine "1" --> "1" EngineConfig : config
    InternalEngine "1" --> "1" Translog : translog
    Translog "1" --> "1" TranslogWriter : current writer
    Translog "1" --> "*" Checkpoint : checkpoints
    InternalEngine "1" --> "*" Segment : segments
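
The Translog/Checkpoint relationship above can be reduced to a toy sketch of the translog's role: record every operation with its sequence number, and on recovery replay everything above the last persisted checkpoint. In-memory lists stand in for the real file-based generations and fsync:

```java
import java.util.ArrayList;
import java.util.List;

// Toy translog: append operations, mark what a flush has already persisted
// to Lucene, and expose the tail that a recovery would need to replay.
public class TranslogSketch {
    record Operation(long seqNo, String type, String id) {}

    private final List<Operation> log = new ArrayList<>();
    private long persistedCheckpoint = -1; // everything <= this is in a Lucene commit

    void add(Operation op) { log.add(op); }

    void markPersistedUpTo(long seqNo) { persistedCheckpoint = seqNo; } // e.g. after a flush

    // operations that must be replayed after a crash
    List<Operation> recoverySnapshot() {
        return log.stream().filter(op -> op.seqNo() > persistedCheckpoint).toList();
    }

    public static void main(String[] args) {
        TranslogSketch t = new TranslogSketch();
        t.add(new Operation(0, "index", "1"));
        t.add(new Operation(1, "index", "2"));
        t.add(new Operation(2, "delete", "1"));
        t.markPersistedUpTo(0); // a flush covered seqNo 0
        System.out.println(t.recoverySnapshot().size()); // 2
    }
}
```

The real Translog persists to generation files with a fsynced Checkpoint record, and trimUnreferencedReaders discards generations entirely below the persisted range.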

4.4 Key Classes and Interfaces

IndexService

// IndexService manages all shards of a single index
public class IndexService extends AbstractIndexComponent
    implements IndicesClusterStateService.AllocatedIndex<IndexShard> {

    // Index metadata and settings
    private final IndexSettings indexSettings;
    private final MapperService mapperService;
    private final IndexCache indexCache;
    private final IndexFieldDataService indexFieldData;

    // Shard map: shardId -> IndexShard
    private volatile Map<Integer, IndexShard> shards;

    // Create a new shard
    public synchronized IndexShard createShard(
        ShardRouting routing,
        GlobalCheckpointSyncer globalCheckpointSyncer,
        RetentionLeaseSyncer retentionLeaseSyncer
    ) throws IOException;

    // Remove a shard
    public synchronized void removeShard(int shardId, String reason);

    // Get a shard
    public IndexShard getShard(int shardId);
}

IndexShard

// IndexShard represents a single shard and is the core class of the index module
public class IndexShard extends AbstractIndexShardComponent
    implements IndicesClusterStateService.Shard {

    // Shard routing information
    protected volatile ShardRouting shardRouting;

    // Shard state: CREATED/RECOVERING/POST_RECOVERY/STARTED/CLOSED
    protected volatile IndexShardState state;

    // Reference to the storage engine
    private final AtomicReference<Engine> currentEngine;

    // Store (file management)
    private final Store store;

    // Transaction log configuration
    private final TranslogConfig translogConfig;

    // Index operation on the primary shard
    public Engine.IndexResult applyIndexOperationOnPrimary(
        long version,
        VersionType versionType,
        SourceToParse sourceToParse,
        long ifSeqNo,
        long ifPrimaryTerm,
        long autoGeneratedTimestamp,
        boolean isRetry
    ) throws IOException;

    // Index operation on a replica shard
    public Engine.IndexResult applyIndexOperationOnReplica(
        long seqNo,
        long opPrimaryTerm,
        long version,
        long autoGeneratedTimeStamp,
        boolean isRetry,
        SourceToParse sourceToParse
    ) throws IOException;

    // Delete operation on the primary shard
    public Engine.DeleteResult applyDeleteOperationOnPrimary(
        long version,
        String id,
        VersionType versionType,
        long ifSeqNo,
        long ifPrimaryTerm
    ) throws IOException;

    // Get a document
    public Engine.GetResult get(Engine.Get get);

    // Refresh
    public void refresh(String source);

    // Flush
    public void flush(FlushRequest request);

    // Force merge
    public void forceMerge(ForceMergeRequest request);

    // Start shard recovery
    public void startRecovery(
        RecoveryState recoveryState,
        PeerRecoveryTargetService.RecoveryListener recoveryListener,
        RepositoriesService repositoriesService
    );
}

Store

// Store manages the shard's Lucene Directory and files
public class Store extends AbstractIndexShardComponent implements Closeable, RefCounted {

    // Lucene Directory
    private final Directory directory;

    // Shard lock; prevents concurrent access by multiple processes
    private final ShardLock shardLock;

    // Get a metadata snapshot
    public MetadataSnapshot getMetadata(IndexCommit commit) throws IOException;

    // Verify index integrity
    public void verify() throws IOException;

    // Run a Lucene CheckIndex
    public CheckIndex.Status checkIndex(PrintStream printStream) throws IOException;

    // Clean up files not referenced by the given SegmentInfos
    public void cleanupFiles(SegmentInfos segmentInfos) throws IOException;
}

Engine

// The Engine interface defines the core operations of the storage engine
public interface Engine extends Closeable {

    // Index a document
    IndexResult index(Index index) throws IOException;

    // Delete a document
    DeleteResult delete(Delete delete) throws IOException;

    // Get a document
    GetResult get(Get get, BiFunction<String, SearcherScope, Searcher> searcherFactory);

    // Refresh: make recent documents searchable
    void refresh(String source) throws EngineException;

    // Flush: persist to disk (Lucene commit)
    FlushResult flush(boolean force, boolean waitIfOngoing) throws EngineException;

    // Force-merge segments
    void forceMerge(
        boolean flush,
        int maxNumSegments,
        boolean onlyExpungeDeletes,
        String forceMergeUUID
    ) throws IOException;

    // Acquire a Searcher
    Searcher acquireSearcher(String source, SearcherScope scope);

    // Get the translog
    Translog getTranslog();
}

5. External APIs

5.1 API Overview

The index module exposes the following core APIs:

| API | HTTP Method | Path | Idempotency | Description |
|---|---|---|---|---|
| Index Document | POST/PUT | /{index}/_doc/{id} | PUT idempotent, POST not | Index a single document |
| Create Document | PUT | /{index}/_create/{id} | - | Create a document (fails if the id already exists) |
| Update Document | POST | /{index}/_update/{id} | - | Update a document (supports partial updates) |
| Delete Document | DELETE | /{index}/_doc/{id} | - | Delete a document |
| Get Document | GET | /{index}/_doc/{id} | N/A | Get a document |
| Bulk API | POST | /_bulk | Depends on the operations | Bulk document operations |
| Multi-Get | POST | /_mget | N/A | Get multiple documents |
| Reindex | POST | /_reindex | - | Rebuild an index |

5.2 Index Document API

Basic Information

  • Name: Index Document
  • Protocol and methods:
    • HTTP PUT /{index}/_doc/{id} - explicit ID, idempotent
    • HTTP POST /{index}/_doc - auto-generated ID, not idempotent
  • Idempotency: PUT is idempotent (the same ID is overwritten); POST is not (a new ID is generated on every call)
  • Idempotency-key strategy: optimistic concurrency control via if_seq_no + if_primary_term

Request Structure

// IndexRequest - index document request
public class IndexRequest extends ReplicatedWriteRequest<IndexRequest>
    implements DocWriteRequest<IndexRequest> {

    // Document ID; auto-generated when unset
    private String id;

    // Routing key used to compute the target shard; defaults to the id
    private String routing;

    // Document source
    private final IndexSource indexSource;

    // Operation type: INDEX (insert or overwrite) or CREATE (insert only)
    private OpType opType = OpType.INDEX;

    // Version for optimistic concurrency control
    private long version = Versions.MATCH_ANY;
    private VersionType versionType = VersionType.INTERNAL;

    // Sequence-number based concurrency control (recommended)
    private long ifSeqNo = UNASSIGNED_SEQ_NO;
    private long ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM;

    // Ingest pipeline
    private String pipeline;

    // Whether the index name must resolve to an alias
    private boolean requireAlias;

    // Auto-generated timestamp (used for duplicate detection on auto-ID retries)
    private long autoGeneratedTimestamp = UNSET_AUTO_GENERATED_TIMESTAMP;

    // Whether this request is a retry
    private boolean isRetry = false;

    // Dynamic template mappings
    private Map<String, String> dynamicTemplates = Map.of();
}

Request Fields

| Field | Type | Required | Default | Constraints | Description |
|---|---|---|---|---|---|
| index | string | yes | - | - | Index name |
| id | string | no | auto-generated | length ≤ 512 bytes | Document ID |
| routing | string | no | id | - | Routing key; determines the target shard |
| source | object/string/bytes | yes | - | valid JSON | Document content |
| opType | string | no | index | index\|create | Operation type |
| version | long | no | - | > 0 | Document version (deprecated) |
| version_type | string | no | internal | internal\|external\|external_gte | Version type (deprecated) |
| if_seq_no | long | no | - | ≥ 0 | Expected sequence number (concurrency control) |
| if_primary_term | long | no | - | > 0 | Expected primary term (concurrency control) |
| pipeline | string | no | - | - | Ingest pipeline name |
| refresh | string | no | false | true\|false\|wait_for | When to refresh so the document becomes searchable |
| timeout | duration | no | 1m | - | Operation timeout |
| wait_for_active_shards | int\|string | no | 1 | - | Number of active shard copies to wait for |
| require_alias | boolean | no | false | - | The index name must be an alias |

Response Structure

// IndexResponse - index document response
public class IndexResponse extends DocWriteResponse implements StatusToXContentObject {

    // Document ID
    private String _id;

    // Document version (incremented on every update)
    private long _version;

    // Sequence number (unique and monotonically increasing within the shard)
    private long _seq_no;

    // Primary term
    private long _primary_term;

    // Operation result: created, updated, deleted, not_found, noop
    private Result result;

    // Shard replication acknowledgment info
    private ReplicationResponse.ShardInfo _shards;
}

Response Fields

| Field | Type | Description |
|---|---|---|
| _index | string | The index actually written to (after alias resolution) |
| _id | string | Document ID |
| _version | long | Document version; incremented on every update |
| _seq_no | long | Sequence number; unique and monotonically increasing within the shard |
| _primary_term | long | Primary term; incremented when a new primary is promoted |
| result | string | Operation result: created or updated |
| _shards.total | int | Total shard copies (1 + number of replicas) |
| _shards.successful | int | Successful copies |
| _shards.failed | int | Failed copies |

Entry Points and Core Code

// TransportIndexAction - transport action for index operations.
// It delegates single-document requests to TransportBulkAction.
public class TransportIndexAction extends TransportSingleItemBulkWriteAction<IndexRequest, DocWriteResponse> {

    public static final String NAME = "indices:data/write/index";

    @Inject
    public TransportIndexAction(
        ActionFilters actionFilters,
        TransportService transportService,
        TransportBulkAction bulkAction
    ) {
        // Wrap the single-document request as a bulk request
        super(NAME, transportService, actionFilters, IndexRequest::new, bulkAction);
    }
}

// TransportBulkAction - core bulk processing
protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener) {
    // 1. Parse the request and collect all pending document operations
    final long startTime = relativeTime();
    final AtomicArray<BulkItemResponse> responses = new AtomicArray<>(bulkRequest.requests.size());

    // 2. Run ingest pipelines if configured
    if (clusterService.localNode().isIngestNode()) {
        processBulkIndexIngestRequest(task, bulkRequest, executorName, listener);
    } else {
        executeBulk(task, bulkRequest, startTime, listener, responses);
    }
}

private void executeBulk(
    Task task,
    BulkRequest bulkRequest,
    long startTimeNanos,
    ActionListener<BulkResponse> listener,
    AtomicArray<BulkItemResponse> responses
) {
    // 1. Auto-create missing indices (when auto-creation is allowed)
    // 2. Group the requests by shard
    final Map<ShardId, List<BulkItemRequest>> requestsByShard = new HashMap<>();

    for (int i = 0; i < bulkRequest.requests.size(); i++) {
        DocWriteRequest<?> request = bulkRequest.requests.get(i);
        // Resolve the index name (may be an alias or a data stream),
        // compute the routing, and determine the target shard
        ShardId shardId = clusterService.operationRouting().indexShards(
            clusterState,
            request.index(),
            request.id(),
            request.routing()
        ).shardId();

        requestsByShard.computeIfAbsent(shardId, k -> new ArrayList<>())
            .add(new BulkItemRequest(i, request));
    }

    // 3. Execute the bulk operations on all shards concurrently
    if (requestsByShard.isEmpty()) {
        listener.onResponse(new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]), buildTookInMillis(startTimeNanos)));
        return;
    }

    final AtomicInteger counter = new AtomicInteger(requestsByShard.size());
    for (Map.Entry<ShardId, List<BulkItemRequest>> entry : requestsByShard.entrySet()) {
        final ShardId shardId = entry.getKey();
        final List<BulkItemRequest> requests = entry.getValue();

        BulkShardRequest bulkShardRequest = new BulkShardRequest(
            shardId,
            bulkRequest.getRefreshPolicy(),
            requests.toArray(new BulkItemRequest[0])
        );

        // Execute the shard-level bulk operation
        shardBulkAction.execute(bulkShardRequest, new ActionListener<BulkShardResponse>() {
            @Override
            public void onResponse(BulkShardResponse bulkShardResponse) {
                // Collect the item responses
                for (BulkItemResponse itemResponse : bulkShardResponse.getResponses()) {
                    responses.set(itemResponse.getItemId(), itemResponse);
                }

                // Return once all shards have completed
                if (counter.decrementAndGet() == 0) {
                    finalizeBulkRequest(task, bulkRequest, responses, startTimeNanos, listener);
                }
            }

            @Override
            public void onFailure(Exception e) {
                // Record the shard-level failure for every item on this shard
                for (BulkItemRequest request : requests) {
                    responses.set(request.id(), new BulkItemResponse(request.id(),
                        request.request().opType(),
                        new BulkItemResponse.Failure(request.index(), request.id(), e)));
                }

                if (counter.decrementAndGet() == 0) {
                    finalizeBulkRequest(task, bulkRequest, responses, startTimeNanos, listener);
                }
            }
        });
    }
}

Shard-Level Processing

// TransportShardBulkAction - shard-level bulk operation
protected void shardOperationOnPrimary(
    BulkShardRequest request,
    IndexShard primary,
    ActionListener<PrimaryResult<BulkShardRequest, BulkShardResponse>> listener
) {
    // Execute the bulk operation on the primary shard, on the WRITE thread pool
    Executor executor = threadPool.executor(ThreadPool.Names.WRITE);
    executor.execute(new AbstractRunnable() {
        @Override
        protected void doRun() {
            // 1. Process the document operations one by one
            BulkItemResponse[] responses = new BulkItemResponse[request.items().length];
            for (int i = 0; i < request.items().length; i++) {
                BulkItemRequest item = request.items()[i];

                if (item.request() instanceof IndexRequest) {
                    IndexRequest indexRequest = (IndexRequest) item.request();

                    try {
                        // Execute the index operation on the IndexShard
                        Engine.IndexResult result = primary.applyIndexOperationOnPrimary(
                            indexRequest.version(),
                            indexRequest.versionType(),
                            new SourceToParse(
                                indexRequest.index(),
                                indexRequest.id(),
                                indexRequest.source(),
                                indexRequest.getContentType(),
                                indexRequest.routing()
                            ),
                            indexRequest.ifSeqNo(),
                            indexRequest.ifPrimaryTerm(),
                            indexRequest.getAutoGeneratedTimestamp(),
                            indexRequest.isRetry()
                        );

                        // Build the item response
                        if (result.getResultType() == Engine.Result.Type.SUCCESS) {
                            IndexResponse response = new IndexResponse(
                                primary.shardId(),
                                indexRequest.id(),
                                result.getSeqNo(),
                                result.getTerm(),
                                result.getVersion(),
                                result.isCreated()
                            );
                            responses[i] = new BulkItemResponse(i, indexRequest.opType(), response);
                        } else {
                            // Handle an engine-level failure
                            responses[i] = new BulkItemResponse(i, indexRequest.opType(),
                                new BulkItemResponse.Failure(
                                    indexRequest.index(),
                                    indexRequest.id(),
                                    result.getFailure()
                                )
                            );
                        }
                    } catch (Exception e) {
                        responses[i] = new BulkItemResponse(i, indexRequest.opType(),
                            new BulkItemResponse.Failure(
                                indexRequest.index(),
                                indexRequest.id(),
                                e
                            )
                        );
                    }
                }
            }

            // 2. Return the primary's result
            BulkShardResponse response = new BulkShardResponse(request.shardId(), responses);
            listener.onResponse(new PrimaryResult<>(request, response));
        }
    });
}

Exceptions and Fallback

Exception handling

  1. Version conflict

    • Trigger: if_seq_no / if_primary_term does not match the document's current values
    • HTTP status: 409 Conflict
    • Response body:
      {
        "error": {
          "type": "version_conflict_engine_exception",
          "reason": "[1]: version conflict, current version [2] is different than the one provided [1]"
        },
        "status": 409
      }
      
    • Client handling: fetch the latest version and retry
  2. Document ID too long

    • Trigger: ID longer than 512 bytes
    • HTTP status: 400 Bad Request
    • Response: "Document id cannot be longer than 512 bytes"
  3. JSON parse failure

    • Trigger: the source is not valid JSON
    • HTTP status: 400 Bad Request
    • Scope: only the current document fails; other documents in the same bulk request are unaffected
  4. Primary shard unavailable

    • Trigger: the primary is in RELOCATING or INITIALIZING state
    • HTTP status: 503 Service Unavailable
    • Client handling: retry (the transport layer retries automatically)
  5. Disk space exhausted

    • Trigger: disk usage exceeds cluster.routing.allocation.disk.watermark.flood_stage
    • HTTP status: 429 Too Many Requests
    • Index behavior: the index is marked read-only (index.blocks.read_only_allow_delete)
  6. Mapping conflict

    • Trigger: a field's type is incompatible with the existing mapping
    • HTTP status: 400 Bad Request
    • Response: "mapper_parsing_exception"

Retry policy

  • Automatic retries: the transport layer automatically retries transient errors such as an unavailable primary or a timeout
  • Retry count: effectively unbounded, until the operation times out
  • Backoff: exponential, starting at 50ms with a 500ms cap
  • Idempotency: PUT requests are idempotent and safe to retry; retrying POST requests may create duplicate documents
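
The backoff schedule above (initial 50ms, capped at 500ms) can be sketched as follows; the numbers mirror the text, not Elasticsearch's exact BackoffPolicy implementation:

```java
// Exponential backoff: double the delay on each attempt, up to a cap.
public class BackoffSketch {
    static long delayMillis(int attempt) {
        long delay = 50L << attempt;  // 50, 100, 200, 400, 800...
        return Math.min(delay, 500L); // capped at 500ms
    }

    public static void main(String[] args) {
        for (int attempt = 0; attempt < 5; attempt++) {
            System.out.println(delayMillis(attempt)); // 50, 100, 200, 400, 500
        }
    }
}
```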

Best Practices

1. Optimistic Concurrency Control

Use if_seq_no + if_primary_term instead of the deprecated version:

# 1. Fetch the document's current seq_no and primary_term
GET /products/_doc/1

# Response:
# {
#   "_seq_no": 5,
#   "_primary_term": 1,
#   ...
# }

# 2. Update conditionally on the values just fetched
PUT /products/_doc/1?if_seq_no=5&if_primary_term=1
{
  "name": "Updated Product",
  "price": 199.99
}
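
A typical read-then-conditionally-write retry loop around this pattern might look like the following sketch; the Client interface and its two methods are hypothetical stand-ins invented for illustration, not a real Elasticsearch client API:

```java
// Optimistic-concurrency retry loop: read the current version, attempt a
// conditional write, and re-read on a 409 conflict.
public class OccRetrySketch {
    interface Client { // hypothetical stand-in for an Elasticsearch client
        long[] getSeqNoAndPrimaryTerm(String index, String id);
        // returns false when the server answers 409 Conflict
        boolean updateIf(String index, String id, String doc, long seqNo, long primaryTerm);
    }

    static boolean updateWithRetry(Client client, String index, String id, String doc, int maxRetries) {
        for (int i = 0; i <= maxRetries; i++) {
            long[] v = client.getSeqNoAndPrimaryTerm(index, id); // step 1: read current version
            if (client.updateIf(index, id, doc, v[0], v[1])) {   // step 2: conditional write
                return true;
            }
            // conflict: another writer won; loop to re-read and retry
        }
        return false;
    }

    public static void main(String[] args) {
        long[] current = {5}; // current seq_no; primary term fixed at 1
        Client fake = new Client() {
            public long[] getSeqNoAndPrimaryTerm(String index, String id) {
                return new long[] {current[0], 1};
            }
            public boolean updateIf(String index, String id, String doc, long seqNo, long primaryTerm) {
                if (seqNo == current[0]) { current[0]++; return true; }
                return false; // simulated 409
            }
        };
        System.out.println(updateWithRetry(fake, "products", "1", "{\"price\":199.99}", 3)); // true
    }
}
```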

2. Bulk Indexing

Prefer the Bulk API over individual Index requests:

# Bulk indexing (recommended)
POST /_bulk
{"index":{"_index":"products","_id":"1"}}
{"name":"Product 1","price":99.99}
{"index":{"_index":"products","_id":"2"}}
{"name":"Product 2","price":199.99}

# Individual indexing (not recommended)
PUT /products/_doc/1
{"name":"Product 1","price":99.99}
PUT /products/_doc/2
{"name":"Product 2","price":199.99}

Performance comparison (typical figures, hardware dependent):

  • Bulk indexing: 10,000-50,000 docs/s
  • Individual indexing: 1,000-5,000 docs/s
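
The bulk request body is newline-delimited JSON: each action line is followed by its document source line. A minimal sketch of assembling such a body (with hard-coded JSON strings; a real client would serialize objects with a JSON library):

```java
// Assemble an NDJSON _bulk body: one action line plus one source line per
// document, each terminated by a newline (including the last one).
public class BulkBodySketch {
    static String bulkBody(String index, String[][] idAndDoc) {
        StringBuilder sb = new StringBuilder();
        for (String[] entry : idAndDoc) {
            // action line followed by the document source line
            sb.append("{\"index\":{\"_index\":\"").append(index)
              .append("\",\"_id\":\"").append(entry[0]).append("\"}}\n");
            sb.append(entry[1]).append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        String body = bulkBody("products", new String[][] {
            {"1", "{\"name\":\"Product 1\",\"price\":99.99}"},
            {"2", "{\"name\":\"Product 2\",\"price\":199.99}"},
        });
        System.out.print(body);
    }
}
```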

3. Control the Refresh Policy

# Immediately visible (worst performance)
PUT /products/_doc/1?refresh=true
{"name":"Product"}

# Wait for the next refresh (slow)
PUT /products/_doc/1?refresh=wait_for
{"name":"Product"}

# No explicit refresh (best performance, the default)
PUT /products/_doc/1
{"name":"Product"}

4. Routing Optimization

Use the same routing value for related documents to reduce the number of shards a search has to query:

# Route all documents of one user to the same shard
PUT /orders/_doc/order1?routing=user123
{"user_id":"user123","product":"laptop"}

PUT /orders/_doc/order2?routing=user123
{"user_id":"user123","product":"mouse"}

# Specify the routing at search time to query a single shard
GET /orders/_search?routing=user123
{
  "query": {"term": {"user_id": "user123"}}
}

5. Handling Large Documents

# Disable _source storage (when the original document never needs to be returned)
PUT /logs
{
  "mappings": {
    "_source": {"enabled": false}
  }
}

# Or store only selected fields
PUT /logs
{
  "mappings": {
    "_source": {
      "includes": ["timestamp", "message"],
      "excludes": ["large_field"]
    }
  }
}

6. Core Flow Sequence Diagrams

6.1 Complete Document Indexing Flow

Indexing on the primary shard

sequenceDiagram
    autonumber
    participant Client as Client
    participant Primary as IndexShard (Primary)
    participant Mapper as MapperService
    participant Engine as InternalEngine
    participant VersionMap as LiveVersionMap
    participant Checkpoint as LocalCheckpointTracker
    participant Lucene as Lucene IndexWriter
    participant Translog as Translog
    participant Replicas as Replica Shards

    Client->>Primary: IndexRequest(doc, id)

    Primary->>Primary: 1. checkWriteAllowed()<br/>check shard state

    Primary->>Mapper: 2. parse(source)<br/>parse the document
    Mapper->>Mapper: parse JSON, build the Lucene document
    Mapper->>Mapper: check mapping compatibility

    alt Mapping update required
        Mapper-->>Primary: DynamicMappingsUpdate
        Primary-->>Client: mapping update required<br/>(handled by the caller)
    else Mapping compatible
        Mapper-->>Primary: ParsedDocument
    end

    Primary->>Engine: 3. index(operation)

    Engine->>VersionMap: 4. acquireLock(uid)<br/>acquire the per-document lock
    VersionMap-->>Engine: Releasable lock

    Engine->>VersionMap: 5. getUnderLock(uid)<br/>read the current version
    VersionMap-->>Engine: VersionValue (if exists)

    alt Version conflict
        Engine->>Engine: check ifSeqNo/ifPrimaryTerm
        Engine-->>Primary: VersionConflictException
        Primary-->>Client: 409 Conflict
    else No conflict
        Engine->>Checkpoint: 6. generateSeqNo()<br/>generate a sequence number
        Checkpoint-->>Engine: seqNo=N

        Engine->>VersionMap: 7. putIndexUnderLock(uid, version)
        VersionMap->>VersionMap: update the current map

        alt Document does not exist
            Engine->>Lucene: 8a. addDocument(doc)
        else Document exists
            Engine->>Lucene: 8b. updateDocument(uid, doc)
        end
        Lucene-->>Engine: success

        Engine->>Translog: 9. add(operation, seqNo)
        Translog->>Translog: serialize the operation
        Translog->>Translog: write to the translog file

        alt Durability = REQUEST
            Translog->>Translog: fsync()
        end
        Translog-->>Engine: Location

        Engine->>Checkpoint: 10. markSeqNoAsProcessed(seqNo)
        Checkpoint->>Checkpoint: advance the checkpoint
        Checkpoint-->>Engine: success

        Engine->>VersionMap: 11. releaseLock(uid)

        Engine-->>Primary: IndexResult<br/>(created/updated, seqNo, version)
    end

    Primary->>Replicas: 12. ReplicationRequest<br/>(doc, seqNo, primaryTerm)

    loop each replica
        Replicas->>Replicas: applyIndexOperationOnReplica()
        Replicas->>Engine: index(doc, seqNo)
        Engine->>Lucene: addDocument/updateDocument
        Engine->>Translog: add(operation)
        Replicas-->>Primary: ACK
    end

    Primary->>Primary: 13. waitForActiveShards()<br/>wait for in-sync replica ACKs

    Primary-->>Client: IndexResponse<br/>(id, version, seqNo, result)

Sequence Diagram Notes

Phases:

  1. Preprocessing (steps 1-2): check the shard state, parse the document
  2. Version control (steps 3-7): acquire the lock, check versions, generate the sequence number
  3. Persistence (steps 8-10): write to Lucene and the translog, advance the checkpoint
  4. Replication (steps 11-13): replicate to the replica shards in parallel

Key points:

  • Document lock: a per-UID lock prevents concurrent writes to the same document
  • Sequence numbers: monotonically increasing, preserving operation order
  • Dual write: every operation is written to both Lucene and the translog
  • Synchronous replication: the primary responds to the client only after all in-sync replicas have acknowledged the operation

Performance considerations:

  • VersionMap lookup: O(1)
  • Lucene write: O(log N) for the term dictionary
  • Translog write: O(1), append-only
  • Replica replication: executed in parallel; latency is bounded by the slowest replica

6.2 Shard Recovery Flow

Replica recovery from the primary (peer recovery)

sequenceDiagram
    autonumber
    participant Master as Master Node
    participant Primary as Primary Shard
    participant Replica as Replica Shard (Target)
    participant Store as Store (Target)
    participant Engine as Engine (Target)
    participant Translog as Translog (Target)

    Master->>Replica: allocate the replica shard to the node
    Replica->>Replica: create IndexShard<br/>state = RECOVERING

    Replica->>Primary: StartRecoveryRequest
    Primary->>Primary: 1. prepareForTranslogOperations()<br/>prepare to send the translog

    Primary->>Primary: 2. acquire a retention lease<br/>prevents translog cleanup

    Note over Primary,Replica: Phase 1: file copy (segment files)

    Primary->>Primary: 3. create a Lucene snapshot<br/>list the current segments
    Primary->>Replica: FileChunkRequest<br/>(segment files metadata)

    loop each missing segment file
        Primary->>Replica: FileChunkRequest<br/>(file data, offset, length)
        Replica->>Store: writeFileChunk(data)
        Store-->>Replica: success
    end

    Primary-->>Replica: Phase 1 complete<br/>(seqNo snapshot)

    Note over Primary,Replica: Phase 2: translog replication

    Primary->>Primary: 4. take a translog snapshot<br/>(startSeqNo = recovery start)

    loop send translog operations in batches
        Primary->>Replica: TranslogOperationsRequest<br/>(operations[], maxSeqNo)

        loop each operation
            Replica->>Engine: applyIndexOperationOnReplica(op)
            Engine->>Engine: apply in seqNo order
            Engine-->>Replica: success
        end

        Replica-->>Primary: ACK (localCheckpoint)
    end

    Primary->>Replica: FinalizeRecoveryRequest<br/>(globalCheckpoint)

    Replica->>Replica: 5. finalizeRecovery()
    Replica->>Engine: updateGlobalCheckpointOnReplica()
    Replica->>Translog: trimUnreferencedReaders()<br/>clean up old translog generations

    Replica->>Replica: 6. changeState(RECOVERING → POST_RECOVERY)
    Replica->>Replica: 7. changeState(POST_RECOVERY → STARTED)

    Replica->>Primary: RecoveryResponse<br/>(success, localCheckpoint)

    Primary->>Primary: 8. markAllocationIdAsInSync()<br/>mark the replica as in-sync

    Primary-->>Master: replica recovery complete

时序图说明

恢复阶段:

  1. Phase 1 - 文件复制:

    • 目标: 复制 Lucene Segment 文件
    • 方式: 增量复制(仅复制缺失/变更的文件)
    • 耗时: 取决于数据量和网络带宽
  2. Phase 2 - Translog 复制:

    • 目标: 回放 Phase 1 期间的新操作
    • 方式: 批量发送 Translog 操作(默认 512KB/批)
    • 耗时: 取决于 Translog 大小

Retention Lease:

  • 主分片为恢复中的副本创建 Retention Lease
  • 防止恢复期间所需的 Translog 被清理
  • 恢复完成后释放 Lease

状态转换:

INITIALIZING → RECOVERING → POST_RECOVERY → STARTED

性能优化:

  • 使用增量复制,只传输差异文件
  • 并行传输多个文件
  • 压缩 Translog 操作
  • 批量应用 Translog 操作
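两阶段恢复的核心思路——Phase 1 按快照复制缺失的 Segment 文件,Phase 2 回放快照点之后的 Translog 操作——可以用如下 Python 示意代码表达(纯内存模拟,类名与字段均为演示假设,非真实实现):

```python
class RecoverySource:
    """模拟主分片: 持有 Segment 文件和按 seqNo 追加的 Translog"""
    def __init__(self):
        self.segments = {"_0.cfs": b"seg0", "_1.cfs": b"seg1"}
        self.translog = []          # [(seq_no, op)], 恢复期间仍在追加
        self.max_seq_no = -1

    def write(self, op):
        self.max_seq_no += 1
        self.translog.append((self.max_seq_no, op))

def recover(source, target_files, apply_op):
    # Phase 1: 对 Segment 文件做快照, 记录快照点 seqNo, 复制目标缺失的文件
    snapshot_seq_no = source.max_seq_no
    for name, data in source.segments.items():
        if name not in target_files:        # 增量复制: 只传缺失文件
            target_files[name] = data
    # Phase 1 期间主分片可能仍有新写入, 这里模拟一条
    source.write({"index": "doc-while-copying"})
    # Phase 2: 按 seqNo 顺序回放快照点之后的 Translog 操作
    for seq_no, op in source.translog:
        if seq_no > snapshot_seq_no:
            apply_op(seq_no, op)
    return snapshot_seq_no

source = RecoverySource()
source.write({"index": "doc-before-recovery"})
replayed = []
recover(source, target_files={}, apply_op=lambda s, op: replayed.append(s))
print(replayed)   # 只回放 Phase 1 快照点之后的操作
```

真实实现中,快照点之前的操作已包含在复制的 Segment 文件里,因此 Phase 2 只需补齐差量;Retention Lease 保证这段差量在恢复完成前不被清理。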

6.3 Refresh 流程

Refresh 使文档可搜索

sequenceDiagram
    autonumber
    participant Scheduler as Refresh Scheduler
    participant Shard as IndexShard
    participant Engine as InternalEngine
    participant Lucene as IndexWriter
    participant VersionMap as LiveVersionMap
    participant Searcher as SearcherManager

    Scheduler->>Shard: refresh(source="scheduled")

    Shard->>Engine: refresh(source, scope)

    Engine->>Engine: 1. 检查是否需要 Refresh<br/>(lastRefreshTime, dirty flag)

    alt 无新写入
        Engine-->>Shard: 跳过 Refresh
    else 有新写入
        Engine->>Lucene: 2. IndexWriter.flush()<br/>(不 commit,仅生成 Segment)

        Lucene->>Lucene: 将 IndexBuffer 写入新 Segment
        Lucene->>Lucene: 生成 .cfs/.cfe/.si 文件
        Lucene-->>Engine: success

        Engine->>Searcher: 3. SearcherManager.maybeRefresh()
        Searcher->>Searcher: 打开新的 IndexReader
        Searcher->>Searcher: 触发 RefreshListener
        Searcher-->>Engine: 新 Searcher

        Engine->>VersionMap: 4. beforeRefresh()
        VersionMap->>VersionMap: current → old<br/>创建新的 empty current

        Engine->>Engine: 5. 更新 lastRefreshTime

        Engine->>VersionMap: 6. afterRefresh()
        VersionMap->>VersionMap: 清空 old map<br/>释放内存

        Engine-->>Shard: RefreshResult
    end

    Shard->>Shard: 7. 触发 RefreshListener<br/>(通知等待 Refresh 的请求)

    Shard-->>Scheduler: success

时序图说明

Refresh 触发时机:

  1. 定时触发: 默认 1s 一次(可配置 index.refresh_interval)
  2. 手动触发: 调用 _refresh API
  3. Flush 时触发: Flush 操作会先执行 Refresh

Refresh vs Flush:

| 对比项 | Refresh | Flush |
| --- | --- | --- |
| 目的 | 使文档可搜索 | 持久化到磁盘 |
| 操作 | 生成新 Segment | Commit Segment + 清理 Translog |
| 频率 | 1s(默认) | 30min 或 Translog 达到阈值 |
| 耗时 | 快(通常 < 100ms) | 慢(可达秒级) |
| 影响 | 增加 Segment 数量 | 清空已提交的 Translog(Segment 合并由后续 Merge 完成) |

VersionMap 双缓冲:

  • beforeRefresh(): current 变为 old,创建新 current
  • afterRefresh(): 清空 old,释放内存
  • 作用: Refresh 期间新写入的文档保存在新 current,旧文档在 old 中仍可读
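VersionMap 的双缓冲机制可以用下面的 Python 示意代码演示(极简内存模拟,仅保留 current/old 两个 map 的切换逻辑,非 Elasticsearch 真实实现):

```python
class LiveVersionMap:
    """模拟 LiveVersionMap 双缓冲: Refresh 期间读取可同时命中 current 和 old"""

    def __init__(self):
        self.current = {}   # Refresh 之后的新写入
        self.old = {}       # 正在 Refresh 的那一批写入

    def put(self, uid, version):
        self.current[uid] = version

    def get(self, uid):
        # 先查 current, 再查 old; 都没有说明文档已进入 Lucene, 需查索引
        return self.current.get(uid, self.old.get(uid))

    def before_refresh(self):
        self.old = self.current   # current 变为 old
        self.current = {}         # 新写入进入空的 current

    def after_refresh(self):
        self.old = {}             # Refresh 完成, 这批文档改由 Lucene 提供

vm = LiveVersionMap()
vm.put("doc1", 1)
vm.before_refresh()       # doc1 移入 old, Refresh 期间仍可读到
assert vm.get("doc1") == 1
vm.put("doc2", 1)         # Refresh 期间的新写入进入新 current
vm.after_refresh()        # old 清空, doc1 的版本信息改由 Lucene 提供
assert vm.get("doc1") is None and vm.get("doc2") == 1
```

这样 Refresh 不会阻塞写入:新写入落在新 current,而尚未进入 Searcher 的旧写入在 old 中仍能支撑实时 GET 和版本检查。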

性能影响:

  • Refresh 会产生新的小 Segment
  • 过多的小 Segment 会影响搜索性能
  • 需要 Merge 合并小 Segment

优化建议:

  • 写入密集型场景: 增大 refresh_interval 到 30s-60s
  • 实时搜索需求: 保持默认 1s
  • 批量导入: 设置 refresh_interval=-1,完成后手动 Refresh
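批量导入的典型做法是:导入前关闭自动 Refresh 并去掉副本,导入完成后恢复设置并手动触发一次 Refresh。下面用 Python 构造导入前后应下发的 _settings 请求体(纯数据示意,不依赖真实集群,参数默认值为假设):

```python
import json

def bulk_import_settings(replicas_after=1, refresh_after="1s"):
    """返回批量导入前/后应通过 PUT /<index>/_settings 下发的两份请求体"""
    before = {
        "index.refresh_interval": "-1",   # 关闭自动 Refresh
        "index.number_of_replicas": 0,    # 导入期间不维护副本
    }
    after = {
        "index.refresh_interval": refresh_after,  # 恢复 Refresh
        "index.number_of_replicas": replicas_after,
    }
    return before, after

before, after = bulk_import_settings()
print(json.dumps(before, ensure_ascii=False))
# 导入完成后: 先应用 after, 再调用 POST /<index>/_refresh 使数据可搜索
```

两项设置都是动态设置,可在索引打开状态下随时修改;恢复副本数后,副本会通过上文的两阶段恢复流程从主分片补齐数据。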

7. 配置与调优

7.1 索引级别配置

# 分片和副本配置
index.number_of_shards: 5              # 主分片数,创建后不可修改
index.number_of_replicas: 1            # 副本数,可动态修改

# Refresh 配置
index.refresh_interval: 1s             # Refresh 间隔,默认 1s
index.max_refresh_listeners: 1000      # 最大 Refresh 监听器数量

# Translog 配置
index.translog.durability: request     # request: 每次请求 fsync, async: 异步 fsync
index.translog.sync_interval: 5s       # 异步 fsync 间隔
index.translog.flush_threshold_size: 512mb  # Translog 大小阈值,触发 Flush

# Merge 配置
index.merge.scheduler.max_thread_count: 1   # 合并线程数
index.merge.policy.max_merged_segment: 5gb  # 最大 Segment 大小

# 缓存配置
index.queries.cache.enabled: true      # 查询缓存开关
index.requests.cache.enable: true      # 请求缓存开关

# 存储配置
index.store.type: fs                   # 存储类型: fs, niofs, mmapfs
index.codec: default                   # 编解码器: default, best_compression

7.2 性能调优建议

写入性能优化

1. 增大 Refresh 间隔

PUT /my-index/_settings
{
  "index.refresh_interval": "30s"
}

2. 使用异步 Translog

PUT /my-index/_settings
{
  "index.translog.durability": "async",
  "index.translog.sync_interval": "5s"
}

3. 禁用 Replica(初始加载)

PUT /my-index/_settings
{
  "index.number_of_replicas": 0
}

4. 增大 IndexBuffer

indices.memory.index_buffer_size: 20%

查询性能优化

1. 启用查询缓存

PUT /my-index/_settings
{
  "index.queries.cache.enabled": true
}

2. 使用 Filter 而非 Query

{
  "query": {
    "bool": {
      "filter": [
        { "term": { "status": "published" } }
      ]
    }
  }
}

3. 控制 Segment 数量

POST /my-index/_forcemerge?max_num_segments=1

存储优化

1. 压缩编解码器

PUT /my-index/_settings
{
  "index.codec": "best_compression"
}

2. 禁用 _source(不需要原始文档时;注意这会导致 update、update_by_query、reindex 等依赖原始文档的功能不可用)

PUT /my-index
{
  "mappings": {
    "_source": {
      "enabled": false
    }
  }
}

8. 监控与故障排查

8.1 关键指标

# 索引统计
GET /_stats/indexing,search,store,merge,refresh

# 分片信息
GET /_cat/shards?v&h=index,shard,prirep,state,docs,store,node

# 索引健康
GET /_cluster/health?level=indices

# 恢复状态
GET /_cat/recovery?v&active_only=true
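_stats 返回的是累计计数器,监控时通常对两次采样做差来估算速率。以下 Python 片段假设输入是 GET /_stats/indexing 响应中 _all.total.indexing 部分的两次采样(字段名 index_total、index_time_in_millis 来自真实 API;此处用静态示意数据演示,不请求真实集群):

```python
def indexing_rate(sample1, sample2, interval_seconds):
    """根据两次 _stats 采样计算索引速率 (docs/s) 与平均单文档耗时 (ms)"""
    docs = sample2["index_total"] - sample1["index_total"]
    millis = sample2["index_time_in_millis"] - sample1["index_time_in_millis"]
    rate = docs / interval_seconds
    avg_ms = millis / docs if docs else 0.0
    return rate, avg_ms

# 间隔 10s 的两次采样 (示意数据)
s1 = {"index_total": 1_000_000, "index_time_in_millis": 500_000}
s2 = {"index_total": 1_050_000, "index_time_in_millis": 530_000}
rate, avg_ms = indexing_rate(s1, s2, interval_seconds=10)
print(f"{rate:.0f} docs/s, {avg_ms:.2f} ms/doc")   # 5000 docs/s, 0.60 ms/doc
```

同样的做差思路也适用于 merge、refresh、search 等统计项,便于判断 Merge 是否跟得上写入速度。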

8.2 常见问题

1. 索引速度慢

  • 检查 Refresh 间隔是否过短
  • 检查是否开启同步 Translog fsync
  • 检查 Merge 是否跟不上写入速度
  • 检查磁盘 I/O 是否饱和

2. 分片不可用

  • 检查节点是否下线
  • 检查磁盘空间是否充足
  • 检查 Translog 是否损坏
  • 查看 Elasticsearch 日志

3. 副本同步延迟

  • 检查网络延迟
  • 检查副本节点负载
  • 检查 Translog 大小
  • 检查全局检查点差异