Elasticsearch-02: Index Module
This document provides a thorough analysis of the index module, covering module responsibilities, architecture, core data structures, detailed external API specifications, key flow sequence diagrams, configuration tuning, and troubleshooting.
1. Module Responsibilities
The index module is responsible for index creation, management, and document operations in Elasticsearch, and sits at the core of the write path. Its main responsibilities are:
- Index lifecycle management: creating, opening, closing, and deleting indices
- Shard management: managing an index's primary and replica shards
- Document CRUD: creating, reading, updating, and deleting documents
- Mapping management: managing field mappings and dynamic mapping updates
- Version control: concurrency control based on version numbers and sequence numbers
- Data durability: guaranteeing durability through the storage engine and the Translog
Inputs/Outputs
Inputs:
- Indexing requests (Index/Update/Delete/Bulk)
- Cluster state changes (new shard allocation, shard relocation)
- Recovery requests (recovery from a peer or a snapshot)
Outputs:
- Results of indexing operations (success/failure, version number, sequence number)
- Shard state change events
- Statistics (indexing rate, document count, store size)
Upstream/Downstream Dependencies
Upstream callers:
- Action module (TransportIndexAction, TransportBulkAction)
- Search module (reads index data)
- Cluster module (cluster state changes trigger shard operations)
Downstream callees:
- Storage engine (InternalEngine, ReadOnlyEngine)
- Apache Lucene (document indexing and search)
- Translog (operation log persistence)
- MapperService (document parsing and mapping)
Lifecycle
stateDiagram-v2
[*] --> CREATED: IndexService created
CREATED --> RECOVERING: shard recovery starts
RECOVERING --> POST_RECOVERY: recovery completes
POST_RECOVERY --> STARTED: shard started
STARTED --> RELOCATING: shard relocation
RELOCATING --> STARTED: relocation completes
STARTED --> CLOSED: index closed
CLOSED --> STARTED: index reopened
STARTED --> [*]: index deleted
CLOSED --> [*]: index deleted
State descriptions:
- CREATED: the IndexService exists but no shards have been created yet
- RECOVERING: the shard is recovering (from a peer, a snapshot, or the Translog)
- RELOCATING: the shard is being moved to another node
- POST_RECOVERY: recovery has completed; post-recovery work is running
- STARTED: the shard is started and can serve reads and writes
- CLOSED: the index is closed and holds no in-memory resources
- Deleted: the index and all of its data are removed
2. Module Architecture
flowchart TB
subgraph IndexModule["Index Module"]
direction TB
subgraph IndexService["IndexService"]
IS[IndexService<br/>index instance management]
IM[IndexModule<br/>index module configuration]
IC[IndexCache<br/>cache management]
IFD[IndexFieldDataService<br/>FieldData service]
end
subgraph Shards["Shard Management"]
Shard1[IndexShard<br/>primary shard 0]
Shard2[IndexShard<br/>replica shard 0]
ShardN[IndexShard<br/>shard N]
end
subgraph ShardComponents["Shard Components"]
Store[Store<br/>storage management]
Engine[Engine<br/>storage engine]
Translog[Translog<br/>transaction log]
Mapper[MapperService<br/>mapping service]
Codec[CodecService<br/>codec service]
end
subgraph Recovery["Recovery"]
RecTarget[RecoveryTarget<br/>recovery target]
RecSource[RecoverySource<br/>recovery source]
RecState[RecoveryState<br/>recovery state]
end
subgraph Operations["Operation Handling"]
IndexOp[Index Operation]
DeleteOp[Delete Operation]
GetOp[Get Operation]
BulkOp[Bulk Operation]
end
end
subgraph External["External Dependencies"]
Lucene[Apache Lucene<br/>full-text search library]
ClusterSvc[Cluster Service]
TransportSvc[Transport Service]
end
IS --> IM
IS --> IC
IS --> IFD
IS --> Shard1
IS --> Shard2
IS --> ShardN
Shard1 --> Store
Shard1 --> Engine
Shard1 --> Translog
Shard1 --> Mapper
Shard1 --> Codec
Shard1 --> RecTarget
Shard2 --> RecSource
RecTarget --> RecState
IndexOp --> Shard1
DeleteOp --> Shard1
GetOp --> Shard1
BulkOp --> Shard1
Engine --> Lucene
IS --> ClusterSvc
Shard1 --> TransportSvc
style IndexModule fill:#E1F5E1
style IndexService fill:#FFF4E1
style Shards fill:#E1E1F5
style ShardComponents fill:#FFE1E1
style Recovery fill:#F5E1FF
style Operations fill:#E1F5FF
Architecture Notes
Component responsibilities
- IndexService
  - Manages index-level configuration and metadata
  - Creates and manages multiple IndexShard instances
  - Provides index-level caches and the FieldData service
  - Coordinates index-level operations (Refresh, Flush, ForceMerge)
- IndexShard
  - Manages the full lifecycle of a single shard
  - Handles document CRUD operations
  - Manages the shard-level storage engine (Engine)
  - Maintains the shard's routing information and state
  - Handles shard recovery and replication
- Store
  - Manages the Lucene Directory and its files
  - Provides the shard-level file lock
  - Checks and repairs corrupted index files
- Engine
  - Wraps the Lucene IndexWriter and IndexReader
  - Manages document index and delete operations
  - Maintains the VersionMap (document version map)
  - Controls Refresh, Flush, and Merge
- Translog
  - Records all operations not yet persisted to Lucene
  - Guarantees durability and crash recovery
  - Supports the sequence-number checkpoint mechanism
Boundary Conditions
- Concurrency control
  - Writes to the same document are serialized through a per-UID lock
  - Refresh and Flush are coordinated through read/write locks
  - Shard state changes are protected by a mutex
- Resource limits
  - The index buffer defaults to 10% of heap memory
  - Translog size affects recovery time; by default a Flush is triggered at 512MB
  - Keeping shards in the 10-50GB range is recommended
- Timeouts
  - Indexing operations time out after 1 minute by default
  - Shard recovery time depends on data volume
  - Refresh is best-effort and does not block writes
Exception Handling and Fallbacks
- Document-level failures
  - Parse error: the document fails; other documents are unaffected
  - Version conflict: 409 is returned and the client retries
  - Mapping update failure: the response indicates a mapping update is required
- Shard-level failures
  - Write failure: the shard is marked failed and reallocation is triggered
  - Translog corruption: the shard enters a failed state and recovers from a replica
  - Out of disk space: a read-only index block is applied
- Index-level failures
  - All shards unavailable: index health is Red
  - Some shards unavailable: index health is Yellow
Performance Optimization Points
- Bulk operations: use the Bulk API to reduce network round trips and lock contention
- Refresh interval: increase refresh_interval to generate fewer segments
- Async Translog: configure async fsync for higher throughput
- Disable _source: disable _source when the original document is never needed
- Concurrent writes: increase index_buffer_size and the write thread pool
Observability
- Indexing rate: docs/s, monitors write throughput
- Indexing latency: ms, P99 latency
- Refresh time: ms, time spent in Refresh
- Merge time: ms, time spent merging segments
- Translog size: bytes, size of the un-flushed operation log
- Shard state: STARTED/RECOVERING/RELOCATING/FAILED
3. Core Data Flows
3.1 Document Indexing Flow
flowchart LR
A[Index Request] --> B{Routing}
B --> C[Primary shard]
C --> D[Parse document]
D --> E{Mapping update?}
E -->|yes| F[Update mapping]
E -->|no| G[Execute indexing]
F --> G
G --> H[Write to Engine]
H --> I[Update VersionMap]
I --> J[Write to Lucene]
J --> K[Write to Translog]
K --> L[Replicate to replicas]
L --> M[Return response]
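The routing step maps a document to a shard as shard = hash(routing) % number_of_shards, where the routing value defaults to the document id (Elasticsearch itself hashes with Murmur3 and additionally accounts for a routing factor on shrunk/split indices). Below is a minimal standalone sketch of the idea; the hash function is a stand-in, not ES's Murmur3HashFunction.

// Minimal sketch of document-to-shard routing: shard = hash(routing) % numShards.
// Assumption: String.hashCode() stands in for Elasticsearch's Murmur3 hash.
public final class RoutingSketch {
    static int shardId(String routing, int numberOfShards) {
        int hash = routing.hashCode(); // stand-in for Murmur3HashFunction.hash(routing)
        return Math.floorMod(hash, numberOfShards); // floorMod keeps the result non-negative
    }

    public static void main(String[] args) {
        // Documents sharing a routing key always land on the same shard
        System.out.println(shardId("user123", 5));
        System.out.println(shardId("user123", 5)); // same shard id both times
    }
}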
3.2 Shard Recovery Flow
flowchart TB
A[Shard allocation] --> B{Recovery type}
B -->|EmptyStore| C[Create empty shard]
B -->|ExistingStore| D[Recover from local store]
B -->|Peer| E[Recover from the primary]
B -->|Snapshot| F[Recover from snapshot]
C --> G[Translog replay]
D --> G
E --> H[Phase 1: file copy]
F --> I[Download from repository]
H --> J[Phase 2: translog replay]
I --> G
J --> G
G --> K[Mark as Started]
3.3 Data Synchronization Flow
Data synchronization between a primary and its replicas is built on the sequence-number mechanism (see the sketch after this list):
- Global checkpoint: the highest sequence number that all in-sync shard copies have processed
- Local checkpoint: the highest sequence number up to which this shard copy has processed every operation
- Max seq no: the highest sequence number the shard has issued
Replication model:
- Document replication is synchronous: the primary forwards each operation to all in-sync replicas and acknowledges the client only after they respond
- Global checkpoint sync: the primary advances the global checkpoint from replica acknowledgments and periodically publishes it to replicas in the background
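The relationship between these markers becomes concrete with a small model. The sketch below is a simplified, standalone rendition of what LocalCheckpointTracker does (issue sequence numbers, mark them processed, advance the checkpoint across the contiguous processed prefix); the real class uses per-block bitsets rather than a hash set.

import java.util.HashSet;
import java.util.Set;

// Simplified model of LocalCheckpointTracker: the local checkpoint is the highest
// seqNo such that every seqNo at or below it has been processed.
public final class CheckpointSketch {
    private long nextSeqNo = 0;   // next sequence number to issue (maxSeqNo = nextSeqNo - 1)
    private long checkpoint = -1; // local checkpoint; -1 means "nothing processed yet"
    private final Set<Long> processed = new HashSet<>(); // out-of-order completions park here

    synchronized long generateSeqNo() { return nextSeqNo++; }

    synchronized void markProcessed(long seqNo) {
        processed.add(seqNo);
        // advance the checkpoint across the contiguous processed prefix
        while (processed.remove(checkpoint + 1)) {
            checkpoint++;
        }
    }

    public static void main(String[] args) {
        CheckpointSketch t = new CheckpointSketch();
        long a = t.generateSeqNo(), b = t.generateSeqNo(), c = t.generateSeqNo();
        t.markProcessed(c); // out of order: checkpoint stays at -1
        t.markProcessed(a); // checkpoint -> 0
        t.markProcessed(b); // checkpoint -> 2 (prefix 0..2 is complete)
        System.out.println(t.checkpoint); // 2
    }
}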
4. Core Data Structures
4.1 IndexService and Shard Relationships
classDiagram
class IndexService {
-IndexSettings indexSettings
-MapperService mapperService
-IndexCache indexCache
-Map~Integer,IndexShard~ shards
+createShard(routing) IndexShard
+removeShard(shardId, reason)
+getShard(shardId) IndexShard
+refresh(source)
+flush(request)
}
class IndexShard {
-ShardId shardId
-ShardRouting shardRouting
-IndexShardState state
-AtomicReference~Engine~ currentEngineRef
-Store store
-MapperService mapperService
-ReplicationTracker replicationTracker
+applyIndexOperationOnPrimary() IndexResult
+applyIndexOperationOnReplica() IndexResult
+applyDeleteOperationOnPrimary() DeleteResult
+get(get) GetResult
+refresh(source)
+flush(request)
}
class Store {
-Directory directory
-ShardLock shardLock
-IndexSettings indexSettings
+getMetadata(commit) MetadataSnapshot
+verify()
+cleanupFiles(segmentInfos)
}
class Engine {
<<interface>>
+index(index) IndexResult
+delete(delete) DeleteResult
+get(get) GetResult
+refresh(source)
+flush(force, waitIfOngoing) FlushResult
+acquireSearcher(source) Searcher
+getTranslog() Translog
}
class InternalEngine {
-LiveVersionMap versionMap
-IndexWriter indexWriter
-Translog translog
-LocalCheckpointTracker localCheckpointTracker
-EngineConfig config
-AtomicLong lastRefreshTime
+index(index) IndexResult
+delete(delete) DeleteResult
+refresh(source)
}
class ShardRouting {
-ShardId shardId
-String currentNodeId
-boolean primary
-ShardRoutingState state
-RecoverySource recoverySource
+active() boolean
+assignedToNode() boolean
}
IndexService "1" --> "*" IndexShard : manages
IndexShard "1" --> "1" Store : uses
IndexShard "1" --> "1" Engine : uses
Engine <|.. InternalEngine : implements
IndexShard "1" --> "1" ShardRouting : routing info
4.2 Version Map and Sequence-Number Tracking
classDiagram
class LiveVersionMap {
-KeyedLock~BytesRef~ keyedLock
-Maps maps
-Map~BytesRef,DeleteVersionValue~ tombstones
+getUnderLock(uid) VersionValue
+putIndexUnderLock(uid, version)
+putDeleteUnderLock(uid, version)
+acquireLock(uid) Releasable
+releaseLock(uid)
}
class VersionValue {
<<abstract>>
-long version
-long seqNo
-long term
+isDelete() boolean
+ramBytesUsed() long
}
class IndexVersionValue {
+isDelete() false
}
class DeleteVersionValue {
-long time
+isDelete() true
}
class ReplicationTracker {
-String shardAllocationId
-long operationPrimaryTerm
-long globalCheckpoint
-Map~String,CheckpointState~ checkpoints
-ReplicationGroup replicationGroup
+getGlobalCheckpoint() long
+updateFromMaster(shardRoutingTable)
+initiateTracking(allocationId)
+markAllocationIdAsInSync(allocationId)
}
class CheckpointState {
-long localCheckpoint
-long globalCheckpoint
-boolean inSync
-boolean tracked
+getLocalCheckpoint() long
+getGlobalCheckpoint() long
}
class LocalCheckpointTracker {
-long checkpoint
-LongObjectHashMap~CountedBitSet~ processedSeqNo
-long maxSeqNo
+generateSeqNo() long
+markSeqNoAsProcessed(seqNo)
+getProcessedCheckpoint() long
+getMaxSeqNo() long
}
LiveVersionMap "1" --> "*" VersionValue : stores
VersionValue <|-- IndexVersionValue
VersionValue <|-- DeleteVersionValue
ReplicationTracker "1" --> "*" CheckpointState : tracks replicas
ReplicationTracker "1" --> "1" LocalCheckpointTracker : local checkpoint
4.3 Storage Engine Core Components
classDiagram
class EngineConfig {
-ShardId shardId
-IndexSettings indexSettings
-Analyzer analyzer
-Similarity similarity
-CodecService codecService
-TranslogConfig translogConfig
-MergePolicy mergePolicy
-MergeScheduler mergeScheduler
-IndexWriterConfig indexWriterConfig
}
class Translog {
-TranslogConfig config
-Path location
-List~BaseTranslogReader~ readers
-TranslogWriter current
-AtomicLong globalCheckpoint
+add(operation) Location
+readLocation(location) Operation
+newSnapshot() Snapshot
+rollGeneration()
+trimUnreferencedReaders()
}
class TranslogWriter {
-FileChannel channel
-AtomicLong lastSyncedOffset
-Checkpoint checkpoint
+add(data) Location
+sync()
+closeIntoReader() TranslogReader
}
class Checkpoint {
-long offset
-int numOps
-long generation
-long minSeqNo
-long maxSeqNo
-long globalCheckpoint
-long minTranslogGeneration
-long trimmedAboveSeqNo
}
class Segment {
-String name
-long generation
-boolean committed
-boolean search
-long sizeInBytes
-int docCount
-int delDocCount
-Version version
-Codec codec
+ramBytesUsed() long
}
InternalEngine "1" --> "1" EngineConfig : config
InternalEngine "1" --> "1" Translog : translog
Translog "1" --> "1" TranslogWriter : current writer
Translog "1" --> "*" Checkpoint : checkpoints
InternalEngine "1" --> "*" Segment : segments
4.4 Key Classes and Interfaces
IndexService
// IndexService manages all shards of a single index
public class IndexService extends AbstractIndexComponent
    implements IndicesClusterStateService.AllocatedIndex<IndexShard> {

    // Index metadata and settings
    private final IndexSettings indexSettings;
    private final MapperService mapperService;
    private final IndexCache indexCache;
    private final IndexFieldDataService indexFieldData;

    // Shard map: shardId -> IndexShard
    private volatile Map<Integer, IndexShard> shards;

    // Create a new shard
    public synchronized IndexShard createShard(
        ShardRouting routing,
        GlobalCheckpointSyncer globalCheckpointSyncer,
        RetentionLeaseSyncer retentionLeaseSyncer
    ) throws IOException;

    // Remove a shard
    public synchronized void removeShard(int shardId, String reason);

    // Get a shard
    public IndexShard getShard(int shardId);
}
IndexShard
// IndexShard represents a single shard and is the core class of the index module
public class IndexShard extends AbstractIndexShardComponent
    implements IndicesClusterStateService.Shard {

    // Shard routing information
    protected volatile ShardRouting shardRouting;
    // Shard state: CREATED/RECOVERING/POST_RECOVERY/STARTED/CLOSED
    protected volatile IndexShardState state;
    // Reference to the storage engine
    private final AtomicReference<Engine> currentEngine;
    // Store management
    private final Store store;
    // Transaction log configuration
    private final TranslogConfig translogConfig;

    // Index operation on the primary
    public Engine.IndexResult applyIndexOperationOnPrimary(
        long version,
        VersionType versionType,
        SourceToParse sourceToParse,
        long ifSeqNo,
        long ifPrimaryTerm,
        long autoGeneratedTimestamp,
        boolean isRetry
    ) throws IOException;

    // Index operation on a replica
    public Engine.IndexResult applyIndexOperationOnReplica(
        long seqNo,
        long opPrimaryTerm,
        long version,
        long autoGeneratedTimeStamp,
        boolean isRetry,
        SourceToParse sourceToParse
    ) throws IOException;

    // Delete operation on the primary
    public Engine.DeleteResult applyDeleteOperationOnPrimary(
        long version,
        String id,
        VersionType versionType,
        long ifSeqNo,
        long ifPrimaryTerm
    ) throws IOException;

    // Get a document
    public Engine.GetResult get(Engine.Get get);

    // Refresh
    public void refresh(String source);

    // Flush
    public void flush(FlushRequest request);

    // Force merge
    public void forceMerge(ForceMergeRequest request);

    // Start shard recovery
    public void startRecovery(
        RecoveryState recoveryState,
        PeerRecoveryTargetService.RecoveryListener recoveryListener,
        RepositoriesService repositoriesService
    );
}
Store
// Store manages the shard's Lucene Directory and files
public class Store extends AbstractIndexShardComponent implements Closeable, RefCounted {

    // Lucene Directory
    private final Directory directory;
    // Shard lock preventing concurrent access from multiple processes
    private final ShardLock shardLock;

    // Snapshot the store metadata
    public MetadataSnapshot getMetadata(IndexCommit commit) throws IOException;

    // Verify index integrity
    public void verify() throws IOException;

    // Run Lucene CheckIndex
    public CheckIndex.Status checkIndex(PrintStream printStream) throws IOException;

    // Delete files no longer referenced by the given segment infos
    public void cleanupFiles(SegmentInfos segmentInfos) throws IOException;
}
Engine
// Engine interface: the core operations of the storage engine
public interface Engine extends Closeable {

    // Index a document
    IndexResult index(Index index) throws IOException;

    // Delete a document
    DeleteResult delete(Delete delete) throws IOException;

    // Get a document
    GetResult get(Get get, BiFunction<String, SearcherScope, Searcher> searcherFactory);

    // Refresh: make recent documents searchable
    void refresh(String source) throws EngineException;

    // Flush: persist to disk
    FlushResult flush(boolean force, boolean waitIfOngoing) throws EngineException;

    // Force-merge segments
    void forceMerge(
        boolean flush,
        int maxNumSegments,
        boolean onlyExpungeDeletes,
        String forceMergeUUID
    ) throws IOException;

    // Acquire a searcher
    Searcher acquireSearcher(String source, SearcherScope scope);

    // Get the translog
    Translog getTranslog();
}
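Every searcher acquired through acquireSearcher pins the current segment set and must be released. A hedged usage fragment against the interface above, assuming Engine.Searcher is AutoCloseable as in the upstream source (method names are illustrative, not verified against a specific version):

// Hedged usage sketch: acquire a point-in-time searcher, release it deterministically.
try (Engine.Searcher searcher = engine.acquireSearcher("stats", SearcherScope.INTERNAL)) {
    // The searcher pins the current segments for the duration of the block;
    // try-with-resources decrements the reference count on exit.
    int maxDoc = searcher.getIndexReader().maxDoc();
}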
5. External APIs
5.1 API Inventory
The index module exposes the following core APIs:
| API | HTTP Method | Path | Idempotent | Description |
|---|---|---|---|---|
| Index Document | POST/PUT | /{index}/_doc/{id} | PUT yes, POST no | Index a single document |
| Create Document | PUT | /{index}/_create/{id} | Yes | Create a document (fails if the id exists) |
| Update Document | POST | /{index}/_update/{id} | No | Update a document (supports partial updates) |
| Delete Document | DELETE | /{index}/_doc/{id} | Yes | Delete a document |
| Get Document | GET | /{index}/_doc/{id} | N/A | Fetch a document |
| Bulk API | POST | /_bulk | Depends on the operations | Bulk document operations |
| Multi-Get | POST | /_mget | N/A | Fetch multiple documents |
| Reindex | POST | /_reindex | No | Rebuild an index |
5.2 Index Document API
Basic Information
- Name: Index Document
- Protocol and methods:
  - HTTP PUT /{index}/_doc/{id} - explicit ID, idempotent
  - HTTP POST /{index}/_doc - auto-generated ID, not idempotent
- Idempotency: PUT is idempotent (the same ID overwrites); POST is not (each call generates a new ID)
- Idempotency-key strategy: optimistic concurrency control via if_seq_no + if_primary_term
Request Structure
// IndexRequest - the request to index a document
public class IndexRequest extends ReplicatedWriteRequest<IndexRequest>
    implements DocWriteRequest<IndexRequest> {

    // Document ID; auto-generated when unset
    private String id;
    // Routing key used to pick the target shard; defaults to the id
    private String routing;
    // Document source
    private final IndexSource indexSource;
    // Operation type: INDEX (insert or overwrite) or CREATE (insert only)
    private OpType opType = OpType.INDEX;
    // Version for optimistic concurrency control
    private long version = Versions.MATCH_ANY;
    private VersionType versionType = VersionType.INTERNAL;
    // Sequence-number-based concurrency control (the recommended approach)
    private long ifSeqNo = UNASSIGNED_SEQ_NO;
    private long ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM;
    // Ingest pipeline
    private String pipeline;
    // Whether the target index name must be an alias
    private boolean requireAlias;
    // Auto-generated timestamp (used to detect retries of auto-ID requests)
    private long autoGeneratedTimestamp = UNSET_AUTO_GENERATED_TIMESTAMP;
    // Whether this request is a retry
    private boolean isRetry = false;
    // Dynamic template mappings
    private Map<String, String> dynamicTemplates = Map.of();
}
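For reference, this is roughly how such a request is assembled with the Java client classes shown above (a sketch; the builder methods mirror the fields of org.elasticsearch.action.index.IndexRequest, but treat exact signatures as version-dependent):

// Sketch: building an IndexRequest with sequence-number concurrency control.
IndexRequest request = new IndexRequest("products")
    .id("1")                                   // explicit ID -> idempotent PUT semantics
    .routing("user123")                        // optional routing key (defaults to the id)
    .source("{\"name\":\"Product 1\"}", XContentType.JSON)
    .setIfSeqNo(5)                             // fail with 409 unless _seq_no is still 5
    .setIfPrimaryTerm(1);                      // ... and _primary_term is still 1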
Request Fields
| Field | Type | Required | Default | Constraints | Description |
|---|---|---|---|---|---|
| index | string | yes | - | - | Index name |
| id | string | no | auto-generated | length ≤ 512 bytes | Document ID |
| routing | string | no | id | - | Routing key; determines the target shard |
| source | object/string/bytes | yes | - | valid JSON | Document body |
| opType | string | no | index | index / create | Operation type |
| version | long | no | - | > 0 | Document version (deprecated) |
| version_type | string | no | internal | internal / external / external_gte | Version type (deprecated) |
| if_seq_no | long | no | - | ≥ 0 | Expected sequence number (concurrency control) |
| if_primary_term | long | no | - | > 0 | Expected primary term |
| pipeline | string | no | - | - | Ingest pipeline name |
| refresh | string | no | false | true / false / wait_for | When to refresh so the document becomes visible |
| timeout | duration | no | 1m | - | Operation timeout |
| wait_for_active_shards | int or string | no | 1 | - | Number of active shard copies to wait for |
| require_alias | boolean | no | false | - | The index name must be an alias |
Response Structure
// IndexResponse - the response to indexing a document
public class IndexResponse extends DocWriteResponse implements StatusToXContentObject {

    // Document ID
    private String _id;
    // Document version (incremented on every update)
    private long _version;
    // Sequence number (unique and monotonically increasing within the shard)
    private long _seq_no;
    // Primary term
    private long _primary_term;
    // Operation result: created, updated, deleted, not_found, noop
    private Result result;
    // Shard replication acknowledgment info
    private ReplicationResponse.ShardInfo _shards;
}
Response Fields
| Field | Type | Required | Description |
|---|---|---|---|
| _index | string | yes | Index actually written to (after alias resolution) |
| _id | string | yes | Document ID |
| _version | long | yes | Document version, incremented on every update |
| _seq_no | long | yes | Sequence number, unique and monotonically increasing within the shard |
| _primary_term | long | yes | Primary term, incremented each time a new primary is promoted |
| result | string | yes | Operation result: created or updated |
| _shards.total | int | yes | Total shard copies (1 + number of replicas) |
| _shards.successful | int | yes | Successful copies |
| _shards.failed | int | yes | Failed copies |
Entry Point and Core Code
// TransportIndexAction - transport-layer action for the index operation.
// It delegates single-document requests to TransportBulkAction.
public class TransportIndexAction extends TransportSingleItemBulkWriteAction<IndexRequest, DocWriteResponse> {
    public static final String NAME = "indices:data/write/index";

    @Inject
    public TransportIndexAction(
        ActionFilters actionFilters,
        TransportService transportService,
        TransportBulkAction bulkAction
    ) {
        // Wrap the single-document request as a one-item bulk request
        super(NAME, transportService, actionFilters, IndexRequest::new, bulkAction);
    }
}

// TransportBulkAction - core handler for bulk operations
protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener) {
    // 1. Parse the request and collect all pending document operations
    final long startTime = relativeTime();
    final AtomicArray<BulkItemResponse> responses = new AtomicArray<>(bulkRequest.requests.size());
    // 2. Run the ingest pipeline if one is configured
    if (clusterService.localNode().isIngestNode()) {
        processBulkIndexIngestRequest(task, bulkRequest, executorName, listener);
    } else {
        executeBulk(task, bulkRequest, startTime, listener, responses);
    }
}

private void executeBulk(
    Task task,
    BulkRequest bulkRequest,
    long startTimeNanos,
    ActionListener<BulkResponse> listener,
    AtomicArray<BulkItemResponse> responses
) {
    // 1. Auto-create missing indices (when auto-creation is allowed)
    // 2. Group the requests by target shard
    final Map<ShardId, List<BulkItemRequest>> requestsByShard = new HashMap<>();
    for (int i = 0; i < bulkRequest.requests.size(); i++) {
        DocWriteRequest<?> request = bulkRequest.requests.get(i);
        // Resolve the index name (it may be an alias or a data stream),
        // then compute the routing to determine the target shard
        ShardId shardId = clusterService.operationRouting().indexShards(
            clusterState,
            request.index(),
            request.id(),
            request.routing()
        ).shardId();
        requestsByShard.computeIfAbsent(shardId, k -> new ArrayList<>())
            .add(new BulkItemRequest(i, request));
    }
    // 3. Execute the per-shard bulk operations concurrently
    if (requestsByShard.isEmpty()) {
        listener.onResponse(new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]), buildTookInMillis(startTimeNanos)));
        return;
    }
    final AtomicInteger counter = new AtomicInteger(requestsByShard.size());
    for (Map.Entry<ShardId, List<BulkItemRequest>> entry : requestsByShard.entrySet()) {
        final ShardId shardId = entry.getKey();
        final List<BulkItemRequest> requests = entry.getValue();
        BulkShardRequest bulkShardRequest = new BulkShardRequest(
            shardId,
            bulkRequest.getRefreshPolicy(),
            requests.toArray(new BulkItemRequest[0])
        );
        // Execute the shard-level bulk operation
        shardBulkAction.execute(bulkShardRequest, new ActionListener<BulkShardResponse>() {
            @Override
            public void onResponse(BulkShardResponse bulkShardResponse) {
                // Collect the item responses
                for (BulkItemResponse itemResponse : bulkShardResponse.getResponses()) {
                    responses.set(itemResponse.getItemId(), itemResponse);
                }
                // Respond once every shard has completed
                if (counter.decrementAndGet() == 0) {
                    finalizeBulkRequest(task, bulkRequest, responses, startTimeNanos, listener);
                }
            }
            @Override
            public void onFailure(Exception e) {
                // Record a shard-level failure for every item on this shard
                for (BulkItemRequest request : requests) {
                    responses.set(request.id(), new BulkItemResponse(request.id(),
                        request.request().opType(),
                        new BulkItemResponse.Failure(request.index(), request.request().id(), e)));
                }
                if (counter.decrementAndGet() == 0) {
                    finalizeBulkRequest(task, bulkRequest, responses, startTimeNanos, listener);
                }
            }
        });
    }
}
Shard-Level Processing
// TransportShardBulkAction - shard-level bulk execution
protected void shardOperationOnPrimary(
    BulkShardRequest request,
    IndexShard primary,
    ActionListener<PrimaryResult<BulkShardRequest, BulkShardResponse>> listener
) {
    // Execute the bulk operation on the primary shard, on the WRITE thread pool
    Executor executor = threadPool.executor(ThreadPool.Names.WRITE);
    executor.execute(new AbstractRunnable() {
        @Override
        protected void doRun() {
            // 1. Process the document operations one by one
            BulkItemResponse[] responses = new BulkItemResponse[request.items().length];
            for (int i = 0; i < request.items().length; i++) {
                BulkItemRequest item = request.items()[i];
                if (item.request() instanceof IndexRequest) {
                    IndexRequest indexRequest = (IndexRequest) item.request();
                    try {
                        // Delegate the index operation to IndexShard
                        Engine.IndexResult result = primary.applyIndexOperationOnPrimary(
                            indexRequest.version(),
                            indexRequest.versionType(),
                            new SourceToParse(
                                indexRequest.index(),
                                indexRequest.id(),
                                indexRequest.source(),
                                indexRequest.getContentType(),
                                indexRequest.routing()
                            ),
                            indexRequest.ifSeqNo(),
                            indexRequest.ifPrimaryTerm(),
                            indexRequest.getAutoGeneratedTimestamp(),
                            indexRequest.isRetry()
                        );
                        // Build the item response
                        if (result.getResultType() == Engine.Result.Type.SUCCESS) {
                            IndexResponse response = new IndexResponse(
                                primary.shardId(),
                                indexRequest.id(),
                                result.getSeqNo(),
                                result.getTerm(),
                                result.getVersion(),
                                result.isCreated()
                            );
                            responses[i] = new BulkItemResponse(i, indexRequest.opType(), response);
                        } else {
                            // Map an engine-level failure to an item failure
                            responses[i] = new BulkItemResponse(i, indexRequest.opType(),
                                new BulkItemResponse.Failure(
                                    indexRequest.index(),
                                    indexRequest.id(),
                                    result.getFailure()
                                )
                            );
                        }
                    } catch (Exception e) {
                        responses[i] = new BulkItemResponse(i, indexRequest.opType(),
                            new BulkItemResponse.Failure(
                                indexRequest.index(),
                                indexRequest.id(),
                                e
                            )
                        );
                    }
                }
            }
            // 2. Return the primary's result (replication to replicas follows)
            BulkShardResponse response = new BulkShardResponse(request.shardId(), responses);
            listener.onResponse(new PrimaryResult<>(request, response));
        }
    });
}
Exceptions and Fallbacks
Failure handling
- Version conflict
  - Trigger: if_seq_no or if_primary_term does not match
  - HTTP status: 409 Conflict
  - Response body: { "error": { "type": "version_conflict_engine_exception", "reason": "[1]: version conflict, current version [2] is different than the one provided [1]" }, "status": 409 }
  - Client handling: fetch the latest version and retry
- Document ID too long
  - Trigger: the ID is longer than 512 bytes
  - HTTP status: 400 Bad Request
  - Response: "Document id cannot be longer than 512 bytes"
- JSON parse failure
  - Trigger: the source is not valid JSON
  - HTTP status: 400 Bad Request
  - Scope: only the offending document fails; other documents in the bulk are unaffected
- Primary shard unavailable
  - Trigger: the primary is RELOCATING or INITIALIZING
  - HTTP status: 503 Service Unavailable
  - Client handling: retry (the transport layer retries automatically)
- Out of disk space
  - Trigger: disk usage exceeds cluster.routing.allocation.disk.watermark.flood_stage
  - HTTP status: 429 Too Many Requests
  - Index behavior: the index is marked read-only (index.blocks.read_only_allow_delete)
- Mapping conflict
  - Trigger: a field's type is incompatible with the existing mapping
  - HTTP status: 400 Bad Request
  - Response: "mapper_parsing_exception"
Retry Strategy
- Automatic retries: the transport layer retries transient failures such as an unavailable primary or a timeout (a client-side sketch follows this list)
- Retry count: unbounded by default, until the operation times out
- Backoff: exponential, starting at 50ms and capped at 500ms
- Idempotency: PUT requests are idempotent and safe to retry; POST requests are not, so retries may create duplicate documents
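A client-side equivalent of that backoff policy is easy to sketch. The helper below is illustrative, not Elasticsearch client code: it retries an idempotent operation with exponential backoff starting at 50ms and capped at 500ms.

import java.util.concurrent.Callable;

// Illustrative retry helper: exponential backoff, 50ms initial delay, 500ms cap.
// Only use this for idempotent requests (e.g. PUT with an explicit document ID).
public final class RetrySketch {
    static <T> T withRetry(Callable<T> op, int maxAttempts) throws Exception {
        long delayMs = 50;
        for (int attempt = 1; ; attempt++) {
            try {
                return op.call();
            } catch (Exception transientFailure) {
                if (attempt >= maxAttempts) throw transientFailure; // give up
                Thread.sleep(delayMs);
                delayMs = Math.min(delayMs * 2, 500); // exponential growth, capped
            }
        }
    }
}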
Best Practices
1. Optimistic Concurrency Control
Use if_seq_no + if_primary_term instead of the deprecated version (a Java sketch follows the REST example):
# 1. Fetch the document's current version
GET /products/_doc/1
# Response:
# {
#   "_seq_no": 5,
#   "_primary_term": 1,
#   ...
# }
# 2. Update conditioned on that version
PUT /products/_doc/1?if_seq_no=5&if_primary_term=1
{
  "name": "Updated Product",
  "price": 199.99
}
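The same read-then-conditionally-write loop in Java, using only the JDK's java.net.http client (endpoint and index name are taken from the example above; JSON parsing is elided, so the seqNo/primaryTerm values are placeholders to extract from the GET body):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Sketch: conditional update using if_seq_no / if_primary_term.
public final class OccSketch {
    public static void main(String[] args) throws Exception {
        HttpClient http = HttpClient.newHttpClient();
        // 1. GET the document; in real code parse _seq_no/_primary_term from get.body()
        HttpResponse<String> get = http.send(
            HttpRequest.newBuilder(URI.create("http://localhost:9200/products/_doc/1")).GET().build(),
            HttpResponse.BodyHandlers.ofString());
        long seqNo = 5, primaryTerm = 1; // placeholders for values parsed from get.body()
        // 2. PUT conditioned on the observed seqNo/primaryTerm; 409 means "reload and retry"
        HttpResponse<String> put = http.send(
            HttpRequest.newBuilder(URI.create(
                "http://localhost:9200/products/_doc/1?if_seq_no=" + seqNo + "&if_primary_term=" + primaryTerm))
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString("{\"name\":\"Updated Product\",\"price\":199.99}"))
                .build(),
            HttpResponse.BodyHandlers.ofString());
        System.out.println(put.statusCode()); // 200 on success, 409 on conflict
    }
}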
2. Batch Indexing
Use the Bulk API instead of individual Index calls:
# Bulk indexing (recommended)
POST /_bulk
{"index":{"_index":"products","_id":"1"}}
{"name":"Product 1","price":99.99}
{"index":{"_index":"products","_id":"2"}}
{"name":"Product 2","price":199.99}
# Individual indexing (not recommended)
PUT /products/_doc/1
{"name":"Product 1","price":99.99}
PUT /products/_doc/2
{"name":"Product 2","price":199.99}
Typical throughput:
- Bulk indexing: 10,000-50,000 docs/s
- Individual indexing: 1,000-5,000 docs/s
3. Control the Refresh Policy
# Visible immediately (worst performance)
PUT /products/_doc/1?refresh=true
{"name":"Product"}
# Wait for the next refresh (slower)
PUT /products/_doc/1?refresh=wait_for
{"name":"Product"}
# No explicit refresh (best performance, the default)
PUT /products/_doc/1
{"name":"Product"}
4. Routing Optimization
Use the same routing key for related documents to cut the number of shards a search must query:
# Route one user's documents to the same shard
PUT /orders/_doc/order1?routing=user123
{"user_id":"user123","product":"laptop"}
PUT /orders/_doc/order2?routing=user123
{"user_id":"user123","product":"mouse"}
# Search with the routing key to query a single shard
GET /orders/_search?routing=user123
{
  "query": {"term": {"user_id": "user123"}}
}
5. Handling Large Documents
# Disable _source storage (when the original document need not be returned)
PUT /logs
{
  "mappings": {
    "_source": {"enabled": false}
  }
}
# Or store only selected fields
PUT /logs
{
  "mappings": {
    "_source": {
      "includes": ["timestamp", "message"],
      "excludes": ["large_field"]
    }
  }
}
6. Core Flow Sequence Diagrams
6.1 Full Document Indexing Flow
Index Operation on the Primary
sequenceDiagram
autonumber
participant Client as Client
participant Primary as IndexShard (Primary)
participant Mapper as MapperService
participant Engine as InternalEngine
participant VersionMap as LiveVersionMap
participant Checkpoint as LocalCheckpointTracker
participant Lucene as Lucene IndexWriter
participant Translog as Translog
participant Replicas as Replica Shards
Client->>Primary: IndexRequest(doc, id)
Primary->>Primary: 1. checkWriteAllowed()<br/>check the shard state
Primary->>Mapper: 2. parse(source)<br/>parse the document
Mapper->>Mapper: parse the JSON, build the Lucene Document
Mapper->>Mapper: check mapping compatibility
alt mapping update required
Mapper-->>Primary: DynamicMappingsUpdate
Primary-->>Client: mapping update required<br/>(handled by the caller)
else mapping compatible
Mapper-->>Primary: ParsedDocument
end
Primary->>Engine: 3. index(operation)
Engine->>VersionMap: 4. acquireLock(uid)<br/>acquire the per-document lock
VersionMap-->>Engine: Releasable lock
Engine->>VersionMap: 5. getUnderLock(uid)<br/>read the current version
VersionMap-->>Engine: VersionValue (if exists)
alt version conflict
Engine->>Engine: check ifSeqNo/ifPrimaryTerm
Engine-->>Primary: VersionConflictException
Primary-->>Client: 409 Conflict
else no conflict
Engine->>Checkpoint: 6. generateSeqNo()<br/>generate a sequence number
Checkpoint-->>Engine: seqNo=N
Engine->>VersionMap: 7. putIndexUnderLock(uid, version)
VersionMap->>VersionMap: update the current map
alt document does not exist
Engine->>Lucene: 8a. addDocument(doc)
else document already exists
Engine->>Lucene: 8b. updateDocument(uid, doc)
end
Lucene-->>Engine: success
Engine->>Translog: 9. add(operation, seqNo)
Translog->>Translog: serialize the operation
Translog->>Translog: append to the translog file
alt durability = REQUEST
Translog->>Translog: fsync()
end
Translog-->>Engine: Location
Engine->>Checkpoint: 10. markSeqNoAsProcessed(seqNo)
Checkpoint->>Checkpoint: advance the checkpoint
Checkpoint-->>Engine: success
Engine->>VersionMap: 11. releaseLock(uid)
Engine-->>Primary: IndexResult<br/>(created/updated, seqNo, version)
end
Primary->>Replicas: 12. ReplicationRequest<br/>(doc, seqNo, primaryTerm)
loop each replica
Replicas->>Replicas: applyIndexOperationOnReplica()
Replicas->>Engine: index(doc, seqNo)
Engine->>Lucene: addDocument/updateDocument
Engine->>Translog: add(operation)
Replicas-->>Primary: ACK
end
Primary->>Primary: 13. wait for all in-sync<br/>replicas to acknowledge
Primary-->>Client: IndexResponse<br/>(id, version, seqNo, result)
Sequence Notes
Phases:
- Pre-processing (steps 1-2): check the shard state and parse the document
- Version control (steps 3-7): acquire the lock, check versions, generate the sequence number
- Persistence (steps 8-10): write to Lucene and the Translog, advance the checkpoint
- Replication (steps 11-13): replicate to the replica shards in parallel
Key points (a condensed model follows this list):
- Document lock: a per-UID lock prevents concurrent writes to the same document
- Sequence numbers: monotonically increasing, preserving operation order
- Dual write: each operation goes to both Lucene and the Translog
- Synchronous replication: the primary acknowledges the client only after all in-sync replicas have responded
Performance notes:
- VersionMap lookup: O(1)
- Lucene write: O(log N) in the term dictionary
- Translog write: O(1), append-only
- Replication: runs in parallel; latency is bounded by the slowest in-sync replica
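The version-control and dual-write steps (4-11) can be condensed into a standalone model. The sketch below is not Elasticsearch code; it only mirrors the ordering the diagram shows (lock the UID, check the expected seqNo, allocate a seqNo, apply to the index, append to the translog), and a single monitor stands in for the per-UID KeyedLock.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Standalone model of the primary's write-path ordering (steps 4-11 above).
public final class MiniEngine {
    record Version(long seqNo, long version) {}
    private final Map<String, Version> versionMap = new HashMap<>(); // stand-in for LiveVersionMap
    private final Map<String, String> lucene = new HashMap<>();      // stand-in for the Lucene index
    private final List<String> translog = new ArrayList<>();         // stand-in for the Translog
    private long nextSeqNo = 0;

    // ifSeqNo = -2 means "no condition" (mirrors UNASSIGNED_SEQ_NO)
    synchronized long index(String id, String doc, long ifSeqNo) {
        Version current = versionMap.get(id);                  // 5. read current version under lock
        if (ifSeqNo != -2 && (current == null || current.seqNo() != ifSeqNo)) {
            throw new IllegalStateException("version conflict"); // -> 409 to the client
        }
        long seqNo = nextSeqNo++;                              // 6. generate the sequence number
        long version = current == null ? 1 : current.version() + 1;
        versionMap.put(id, new Version(seqNo, version));       // 7. update the version map
        lucene.put(id, doc);                                   // 8. add/update the document
        translog.add(seqNo + ":" + id);                        // 9. append to the translog
        return seqNo;                                          // 10-11. mark processed, release lock
    }
}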
6.2 Shard Recovery Flow
Replica Recovery from the Primary
sequenceDiagram
autonumber
participant Master as Master Node
participant Primary as Primary Shard
participant Replica as Replica Shard (Target)
participant Store as Store (Target)
participant Engine as Engine (Target)
participant Translog as Translog (Target)
Master->>Replica: allocate the replica shard to the node
Replica->>Replica: create IndexShard<br/>state = RECOVERING
Replica->>Primary: StartRecoveryRequest
Primary->>Primary: 1. prepareForTranslogOperations()<br/>prepare to ship translog operations
Primary->>Primary: 2. acquire a retention lease<br/>so needed operations are not trimmed
Note over Primary,Replica: Phase 1: file copy (segment files)
Primary->>Primary: 3. take a Lucene snapshot<br/>of the current segment list
Primary->>Replica: FileChunkRequest<br/>(segment files metadata)
loop each missing segment file
Primary->>Replica: FileChunkRequest<br/>(file data, offset, length)
Replica->>Store: writeFileChunk(data)
Store-->>Replica: success
end
Primary-->>Replica: Phase 1 complete<br/>(seqNo snapshot)
Note over Primary,Replica: Phase 2: translog replay
Primary->>Primary: 4. take a translog snapshot<br/>(startSeqNo = recovery start)
loop ship translog operations in batches
Primary->>Replica: TranslogOperationsRequest<br/>(operations[], maxSeqNo)
loop each operation
Replica->>Engine: applyIndexOperationOnReplica(op)
Engine->>Engine: apply in seqNo order
Engine-->>Replica: success
end
Replica-->>Primary: ACK (localCheckpoint)
end
Primary->>Replica: FinalizeRecoveryRequest<br/>(globalCheckpoint)
Replica->>Replica: 5. finalizeRecovery()
Replica->>Engine: updateGlobalCheckpointOnReplica()
Replica->>Translog: trimUnreferencedReaders()<br/>trim old translog generations
Replica->>Replica: 6. changeState(RECOVERING → POST_RECOVERY)
Replica->>Replica: 7. changeState(POST_RECOVERY → STARTED)
Replica->>Primary: RecoveryResponse<br/>(success, localCheckpoint)
Primary->>Primary: 8. markAllocationIdAsInSync()<br/>mark the replica as in-sync
Primary-->>Master: replica recovery complete
Sequence Notes
Recovery phases:
- Phase 1 - file copy:
  - Goal: copy the Lucene segment files
  - Method: incremental copy (only missing or changed files)
  - Duration: depends on data volume and network bandwidth
- Phase 2 - translog replay:
  - Goal: replay operations that arrived during Phase 1
  - Method: ship translog operations in batches (512KB per batch by default)
  - Duration: depends on translog size
Retention lease:
- The primary creates a retention lease for the recovering replica
- It prevents the operations needed by the recovery from being trimmed
- The lease is released once recovery completes
State transitions:
INITIALIZING → RECOVERING → POST_RECOVERY → STARTED
Performance optimizations (see the sketch after this list):
- Incremental copy transfers only the differing files
- Multiple files are transferred in parallel
- Translog operations are compressed
- Translog operations are applied in batches
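The "incremental copy" in Phase 1 boils down to a metadata diff: the target reports its file listing (name, length, checksum) and the primary ships only files that are missing or differ. A simplified standalone sketch of that decision (Elasticsearch's Store.MetadataSnapshot performs a richer comparison):

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Simplified Phase-1 file diff: copy only segment files the target lacks,
// or holds with a different length/checksum.
public final class RecoveryDiffSketch {
    record FileMeta(long length, String checksum) {}

    static List<String> filesToCopy(Map<String, FileMeta> source, Map<String, FileMeta> target) {
        List<String> toCopy = new ArrayList<>();
        for (Map.Entry<String, FileMeta> e : source.entrySet()) {
            FileMeta local = target.get(e.getKey());
            // copy when the file is missing, or when its metadata differs
            if (local == null || !local.equals(e.getValue())) {
                toCopy.add(e.getKey());
            }
        }
        return toCopy;
    }
}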
6.3 Refresh Flow
Refresh Makes Documents Searchable
sequenceDiagram
autonumber
participant Scheduler as Refresh Scheduler
participant Shard as IndexShard
participant Engine as InternalEngine
participant Lucene as IndexWriter
participant VersionMap as LiveVersionMap
participant Searcher as SearcherManager
Scheduler->>Shard: refresh(source="scheduled")
Shard->>Engine: refresh(source, scope)
Engine->>Engine: 1. check whether a refresh is needed<br/>(lastRefreshTime, dirty flag)
alt no new writes
Engine-->>Shard: skip the refresh
else new writes exist
Engine->>Lucene: 2. IndexWriter.flush()<br/>(no commit; just produce a segment)
Lucene->>Lucene: write the index buffer into a new segment
Lucene->>Lucene: produce .cfs/.cfe/.si files
Lucene-->>Engine: success
Engine->>Searcher: 3. SearcherManager.maybeRefresh()
Searcher->>Searcher: open a new IndexReader
Searcher->>Searcher: fire RefreshListeners
Searcher-->>Engine: new Searcher
Engine->>VersionMap: 4. beforeRefresh()
VersionMap->>VersionMap: current → old<br/>create a new empty current
Engine->>Engine: 5. update lastRefreshTime
Engine->>VersionMap: 6. afterRefresh()
VersionMap->>VersionMap: clear the old map<br/>release memory
Engine-->>Shard: RefreshResult
end
Shard->>Shard: 7. fire RefreshListeners<br/>(notify requests waiting on the refresh)
Shard-->>Scheduler: success
Sequence Notes
Refresh triggers:
- Scheduled: every 1s by default (configurable via index.refresh_interval)
- Manual: the _refresh API
- During flush: a Flush performs a Refresh first
Refresh vs Flush:
| Aspect | Refresh | Flush |
|---|---|---|
| Purpose | make documents searchable | persist to disk |
| Action | produce a new segment | commit segments + trim the translog |
| Frequency | 1s (default) | every 30min, or when the translog hits its size threshold |
| Cost | fast (< 100ms) | slow (> 1s) |
| Effect | increases the segment count | durable commit (segment count reduced later by merges) |
VersionMap double buffering (a sketch follows this list):
- beforeRefresh(): current becomes old, and a new empty current is created
- afterRefresh(): old is cleared and its memory released
- Purpose: documents written during the refresh land in the new current, while pre-refresh entries stay readable in old
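A standalone model of that double buffer (the real LiveVersionMap swaps its maps atomically and also consults the freshly opened searcher; this sketch keeps only the current/old swap):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Standalone model of the VersionMap double buffer around a refresh.
public final class DoubleBufferSketch {
    private volatile Map<String, Long> current = new ConcurrentHashMap<>();
    private volatile Map<String, Long> old = Map.of();

    void put(String uid, long version) { current.put(uid, version); }

    Long get(String uid) {
        Long v = current.get(uid);           // writes since the last refresh
        return v != null ? v : old.get(uid); // still-visible pre-refresh entries
    }

    void beforeRefresh() {                    // current -> old; fresh empty current
        old = current;
        current = new ConcurrentHashMap<>();
    }

    void afterRefresh() {                     // entries are now readable via the new searcher
        old = Map.of();
    }
}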
Performance impact:
- Every refresh produces a new small segment
- Too many small segments hurt search performance
- Merges are needed to consolidate them
Tuning advice (REST example below):
- Write-heavy workloads: raise refresh_interval to 30s-60s
- Near-real-time search: keep the 1s default
- Bulk imports: set refresh_interval=-1 and refresh manually when done
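For the bulk-import case, the sequence looks like this (index name is illustrative):
# Disable refresh for the duration of the import
PUT /my-index/_settings
{ "index.refresh_interval": "-1" }
# ... run the bulk import ...
# Restore the interval and refresh once
PUT /my-index/_settings
{ "index.refresh_interval": "1s" }
POST /my-index/_refresh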
7. Configuration and Tuning
7.1 Index-Level Settings
# Shards and replicas
index.number_of_shards: 5 # number of primary shards; cannot be changed after creation
index.number_of_replicas: 1 # number of replicas; can be changed dynamically
# Refresh
index.refresh_interval: 1s # refresh interval, default 1s
index.max_refresh_listeners: 1000 # maximum number of refresh listeners
# Translog
index.translog.durability: request # request: fsync on every request; async: periodic fsync
index.translog.sync_interval: 5s # fsync interval in async mode
index.translog.flush_threshold_size: 512mb # translog size threshold that triggers a flush
# Merge
index.merge.scheduler.max_thread_count: 1 # number of merge threads
index.merge.policy.max_merged_segment: 5gb # maximum merged segment size
# Caches
index.queries.cache.enabled: true # query cache switch
index.requests.cache.enable: true # request cache switch
# Store
index.store.type: fs # store type: fs, niofs, mmapfs
index.codec: default # codec: default, best_compression
7.2 Tuning Recommendations
Write performance
1. Increase the refresh interval
PUT /my-index/_settings
{
  "index.refresh_interval": "30s"
}
2. Use the async translog
PUT /my-index/_settings
{
  "index.translog.durability": "async",
  "index.translog.sync_interval": "5s"
}
3. Drop replicas during an initial load
PUT /my-index/_settings
{
  "index.number_of_replicas": 0
}
4. Increase the index buffer
indices.memory.index_buffer_size: 20%
Query performance
1. Enable the query cache
PUT /my-index/_settings
{
  "index.queries.cache.enabled": true
}
2. Use filters instead of scoring queries
{
  "query": {
    "bool": {
      "filter": [
        { "term": { "status": "published" } }
      ]
    }
  }
}
3. Control the segment count
POST /my-index/_forcemerge?max_num_segments=1
Storage
1. Compression codec
PUT /my-index/_settings
{
  "index.codec": "best_compression"
}
2. Disable _source (when the original document is not needed)
PUT /my-index
{
  "mappings": {
    "_source": {
      "enabled": false
    }
  }
}
8. Monitoring and Troubleshooting
8.1 Key Metrics
# Indexing statistics
GET /_stats/indexing,search,store,merge,refresh
# Shard information
GET /_cat/shards?v&h=index,shard,prirep,state,docs,store,node
# Index health
GET /_cluster/health?level=indices
# Recovery status
GET /_cat/recovery?v&active_only=true
8.2 Common Problems
1. Slow indexing
- Check whether the refresh interval is too short
- Check whether synchronous translog fsync is enabled
- Check whether merges are falling behind the write rate
- Check whether disk I/O is saturated
2. Unavailable shards
- Check whether a node is down
- Check whether disk space is sufficient
- Check whether the translog is corrupted
- Check the Elasticsearch logs
3. Replica lag
- Check network latency
- Check load on the replica nodes
- Check the translog size
- Check the gap between local and global checkpoints
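When indexing is slow or requests come back with 429, checking the write thread pool for queue buildup and rejections is a useful first step (both are standard APIs):
# Write thread pool: active threads, queue depth, and cumulative rejections per node
GET /_cat/thread_pool/write?v&h=node_name,name,active,queue,rejected
# Node-level hot threads, to see where CPU time goes during slow indexing
GET /_nodes/hot_threads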