Elasticsearch-03-搜索模块
本文档提供搜索模块的全面剖析,包括模块职责、架构设计、核心数据结构、对外API详细规格、关键流程时序图、性能优化和故障排查。
1. 模块职责
搜索模块是 Elasticsearch 的核心功能模块,负责分布式搜索请求的处理。主要职责包括:
- 查询执行: 在 Lucene 上执行用户查询
- 两阶段搜索: Query Phase(查询阶段) + Fetch Phase(获取阶段)
- 结果聚合: 归并多个分片的搜索结果
- 聚合计算: 执行聚合分析(Aggregations)
- 高亮显示: 生成搜索结果高亮片段
- 排序: 支持多字段排序和自定义排序
- 分页: 支持 from/size, Scroll, Search After 分页
输入/输出
输入:
- SearchRequest (查询请求)
- Query DSL (查询语句)
- Aggregations (聚合定义)
- Sort (排序规则)
输出:
- SearchResponse (搜索响应)
- Hits (匹配的文档列表)
- Aggregations (聚合结果)
- Took (耗时统计)
上下游依赖
上游调用方:
- Action Module (TransportSearchAction)
- REST Layer (RestSearchAction)
下游被调用方:
- Index Shard (读取数据)
- Apache Lucene (执行查询)
- Aggregation Module (计算聚合)
2. 整体服务架构图
flowchart TB
subgraph Client["客户端层"]
RESTClient[REST Client]
JavaClient[Java Client]
HTTPRequest[HTTP Request<br/>POST /index/_search]
end
subgraph RESTLayer["REST接口层 (协调节点)"]
RestSearchAction[RestSearchAction<br/>解析HTTP请求]
SearchRequestBuilder[SearchRequest构建<br/>解析Query DSL]
end
subgraph ActionLayer["Action传输层 (协调节点)"]
TransportSearchAction[TransportSearchAction<br/>分布式协调器]
SearchPhaseProvider[SearchPhaseProvider<br/>阶段提供者]
SearchAsyncAction[SearchQueryThenFetch<br/>AsyncAction<br/>异步执行器]
end
subgraph Coordinator["协调阶段执行器 (协调节点)"]
CanMatchPhase[CanMatchPreFilterPhase<br/>预过滤阶段]
QueryPhaseCoord[QueryPhase协调<br/>并发查询所有分片]
ReducePhase[ReducePhase<br/>SearchPhaseController<br/>结果归并]
FetchPhaseCoord[FetchPhase协调<br/>并发获取文档]
end
subgraph TransportLayer["传输服务层"]
SearchTransportService[SearchTransportService<br/>RPC通信]
ConnectionMgr[Connection Manager<br/>节点连接管理]
end
subgraph DataNode1["数据节点 1"]
SService1[SearchService]
SContext1[SearchContext]
QPExec1[QueryPhase.execute]
FPExec1[FetchPhase.execute]
Shard1[IndexShard]
Lucene1[Lucene Searcher]
end
subgraph DataNode2["数据节点 2"]
SService2[SearchService]
SContext2[SearchContext]
QPExec2[QueryPhase.execute]
FPExec2[FetchPhase.execute]
Shard2[IndexShard]
Lucene2[Lucene Searcher]
end
subgraph DataNodeN["数据节点 N"]
SServiceN[SearchService]
SContextN[SearchContext]
QPExecN[QueryPhase.execute]
FPExecN[FetchPhase.execute]
ShardN[IndexShard]
LuceneN[Lucene Searcher]
end
RESTClient --> HTTPRequest
JavaClient --> HTTPRequest
HTTPRequest --> RestSearchAction
RestSearchAction --> SearchRequestBuilder
SearchRequestBuilder --> TransportSearchAction
TransportSearchAction --> SearchPhaseProvider
SearchPhaseProvider --> SearchAsyncAction
SearchAsyncAction --> CanMatchPhase
CanMatchPhase --> |过滤后的分片列表| QueryPhaseCoord
QueryPhaseCoord --> SearchTransportService
SearchTransportService --> ConnectionMgr
ConnectionMgr --> |ShardSearchRequest| SService1
ConnectionMgr --> |ShardSearchRequest| SService2
ConnectionMgr --> |ShardSearchRequest| SServiceN
SService1 --> SContext1 --> QPExec1 --> Shard1 --> Lucene1
SService2 --> SContext2 --> QPExec2 --> Shard2 --> Lucene2
SServiceN --> SContextN --> QPExecN --> ShardN --> LuceneN
Lucene1 --> |QuerySearchResult| ReducePhase
Lucene2 --> |QuerySearchResult| ReducePhase
LuceneN --> |QuerySearchResult| ReducePhase
ReducePhase --> |Top K docIds| FetchPhaseCoord
FetchPhaseCoord --> SearchTransportService
SearchTransportService --> |ShardFetchRequest| SService1
SearchTransportService --> |ShardFetchRequest| SService2
SearchTransportService --> |ShardFetchRequest| SServiceN
SService1 --> FPExec1 --> Lucene1
SService2 --> FPExec2 --> Lucene2
SServiceN --> FPExecN --> LuceneN
Lucene1 --> |FetchSearchResult| SearchAsyncAction
Lucene2 --> |FetchSearchResult| SearchAsyncAction
LuceneN --> |FetchSearchResult| SearchAsyncAction
SearchAsyncAction --> |SearchResponse| RestSearchAction
RestSearchAction --> |JSON Response| Client
style RESTLayer fill:#E1F5FF
style ActionLayer fill:#FFE1E1
style Coordinator fill:#E1FFE1
style TransportLayer fill:#F5E1FF
style DataNode1 fill:#FFF4E1
style DataNode2 fill:#FFF4E1
style DataNodeN fill:#FFF4E1
架构说明
各层职责与交互
1. REST接口层 (RestSearchAction)
- 职责: HTTP请求解析与响应格式化
- 关键处理:
- 解析URL参数 (index, routing, preference等)
- 解析请求体 (Query DSL, 聚合, 排序等)
- 构建
SearchRequest对象 - 将
SearchResponse格式化为JSON返回客户端
- 入口路由:
GET/POST /_search- 全局搜索GET/POST /{index}/_search- 指定索引搜索
- 核心方法:
prepareRequest()→ 解析并提交到Action层
2. Action传输层 (TransportSearchAction)
- 职责: 分布式协调与阶段编排
- 关键处理:
- 解析目标索引并计算涉及的分片
- 构建
SearchShardIterator[]分片路由迭代器 - 根据搜索类型选择执行策略:
QUERY_THEN_FETCH→SearchQueryThenFetchAsyncActionDFS_QUERY_THEN_FETCH→SearchDfsQueryThenFetchAsyncAction
- 管理跨集群搜索 (CCS) 逻辑
- 处理 Point-in-Time 和 Scroll 上下文
- 核心方法:
doExecute()→ 创建并启动异步搜索
3. 阶段执行器 (SearchQueryThenFetchAsyncAction)
- 职责: 异步协调各个搜索阶段
- 阶段流程:
CanMatch → Query → Reduce → Fetch → Response - 并发控制:
maxConcurrentShardRequests: 控制同时查询的分片数 (默认5)- 基于节点级别的流控: 避免单节点过载
- 使用
Semaphore实现背压控制
- 失败重试: 自动在副本分片上重试失败的请求
- 核心方法:
start()→executePhaseOnShard()
4. 传输服务层 (SearchTransportService)
- 职责: RPC通信与请求路由
- 注册的处理器:
QUERY_ACTION_NAME: 处理Query Phase请求FETCH_ID_ACTION_NAME: 处理Fetch Phase请求DFS_ACTION_NAME: 处理DFS Phase请求FREE_CONTEXT_ACTION_NAME: 清理搜索上下文
- 连接管理: 维护与各数据节点的长连接
- 序列化: 使用
StreamInput/StreamOutput序列化请求和响应
5. 数据节点服务层 (SearchService)
- 职责: 分片级搜索执行
- 生命周期管理:
- 创建
ReaderContext: 持有IndexReader引用 - 创建
SearchContext: 封装单次搜索的所有状态 - 引用计数管理: 确保 Reader 不会过早关闭
- 超时管理: 通过
ScrollContext管理长期上下文
- 创建
- 线程池: 使用
search线程池异步执行查询 - 核心方法:
executeQueryPhase(): 执行Query PhaseexecuteFetchPhase(): 执行Fetch PhasecreateContext(): 创建搜索上下文
搜索阶段详解
阶段 0: CanMatch Phase (预过滤阶段)
- 目的: 快速过滤不包含匹配文档的分片
- 触发条件: 分片数 >
pre_filter_shard_size(默认128) - 检查方式:
- 检查分片的 min/max 值范围 (基于
PointValues) - 在协调节点重写查询 (coordinator rewrite)
- 检查日期范围、数值范围是否与分片重叠
- 检查分片的 min/max 值范围 (基于
- 优化效果: 可减少 50%+ 的分片查询
- 适用场景: 带范围过滤的查询 (range, date_range)
阶段 1: Query Phase (查询阶段)
- 目的: 在每个分片上执行查询,返回 Top K 文档的 ID 和分数
- 执行流程:
- 构建 Lucene Query 对象
- 创建 Collector (TopDocsCollector + AggregationCollector)
- 执行
IndexSearcher.search(query, collector) - 收集 Top (size + from) 个文档的 (docId, score, sortValues)
- 执行聚合计算
- 输出:
QuerySearchResultdocIds[]: 文档内部ID数组scores[]: 文档分数数组sortValues[][]: 排序字段值 (如果有排序)totalHits: 总命中数aggregations: 分片级聚合结果
- 并发: 所有分片并行执行
- 轻量级: 仅返回文档 ID 和排序值,不返回文档内容
阶段 2: Reduce Phase (归并阶段)
- 目的: 归并所有分片的 QueryResult,选出全局 Top K
- 操作流程:
- 归并排序: 使用优先队列归并所有分片的 Top K
- 算法: K路归并排序
- 复杂度: O(N * K * log K), N=分片数
- 分数重排序: 按分数/排序字段重新排序
- 聚合归并: 合并所有分片的聚合结果
- 对于
sum/avg/max/min: 直接合并 - 对于
terms: 重新聚合各分片的 Top Terms
- 对于
- 选出全局 Top K: 跳过
from,选择size个结果
- 归并排序: 使用优先队列归并所有分片的 Top K
- 输出:
ReducedQueryPhase- 全局 Top K 的
(shardIndex, docId)映射 - 归并后的聚合结果
- 总命中数和最高分数
- 全局 Top K 的
- 执行位置: 协调节点
阶段 3: Fetch Phase (获取阶段)
- 目的: 获取 Top K 文档的完整内容
- 输入: 全局 Top K 的
(shardId, docId)列表 - 执行流程:
- 按分片分组 docIds:
{shard1: [docId1, docId5], shard2: [docId2]} - 并发向各分片发送
ShardFetchRequest - 在分片上执行:
- 通过
docId读取StoredFields - 提取
_source字段 - 生成高亮片段 (如果需要)
- 提取
doc_values字段 (如果需要)
- 通过
- 返回
FetchSearchResult
- 按分片分组 docIds:
- 输出:
FetchSearchResultdocuments[]: 完整文档数组highlightFields: 高亮字段映射fields: 指定字段值
- 并发: 仅需要的分片并行执行
- 重量级: 需要读取完整文档内容,网络传输量大
两阶段设计优势
| 方面 | Query Phase | Fetch Phase |
|---|---|---|
| 网络传输 | 小(仅 docId+score) | 大(完整文档) |
| 计算开销 | 高(查询+聚合) | 低(仅读取) |
| 并发度 | 所有分片 | 仅需要的分片 |
| 优化空间 | 缓存查询结果 | 缓存文档内容 |
为什么需要两阶段?
- 减少网络传输: Query Phase 仅传输 docId,Fetch Phase 仅传输需要的文档
- 负载均衡: 每个副本可以处理 Query Phase,减轻主分片压力
- 提前终止: 部分查询可以在 Query Phase 提前终止
3. 核心组件
3.1 SearchService
职责: 管理搜索请求的生命周期
public class SearchService extends AbstractLifecycleComponent {
// 线程池
private final ThreadPool threadPool;
// 搜索上下文管理
private final ConcurrentMapLong<ReaderContext> activeReaders;
// Fetch 阶段处理
private final FetchPhase fetchPhase;
// 执行 Query Phase
public void executeQueryPhase(
ShardSearchRequest request,
SearchShardTask task,
ActionListener<SearchPhaseResult> listener
);
// 执行 Fetch Phase
public void executeFetchPhase(
ShardFetchRequest request,
SearchShardTask task,
ActionListener<FetchSearchResult> listener
);
// 创建搜索上下文
private SearchContext createContext(
ReaderContext reader,
ShardSearchRequest request,
SearchShardTask task
);
}
3.2 SearchContext
职责: 封装单个分片的搜索上下文
public interface SearchContext extends Releasable {
// 查询信息
SearchShardTarget shardTarget();
ShardSearchRequest request();
Query query();
// 搜索器
ContextIndexSearcher searcher();
// 结果
QuerySearchResult queryResult();
FetchSearchResult fetchResult();
// 聚合上下文
SearchContextAggregations aggregations();
// 排序
SortAndFormats sort();
// 分页
int from();
int size();
// 超时
TimeValue timeout();
boolean isTimedOut();
}
3.3 QueryPhase
职责: 执行查询阶段
public class QueryPhase {
public static void execute(SearchContext searchContext) {
// 1. 预处理聚合
AggregationPhase.preProcess(searchContext);
// 2. 构建 Collector
List<Collector> collectors = new ArrayList<>();
collectors.add(new TopDocsCollector());
if (searchContext.aggregations() != null) {
collectors.add(searchContext.aggregations().collector());
}
// 3. 执行搜索
MultiCollector multiCollector = MultiCollector.wrap(collectors);
searchContext.searcher().search(
searchContext.query(),
multiCollector
);
// 4. 后处理
RescorePhase.execute(searchContext); // Rescore
SuggestPhase.execute(searchContext); // Suggest
}
}
3.4 FetchPhase
职责: 执行获取阶段
public final class FetchPhase {
public void execute(
SearchContext context,
int[] docIdsToLoad
) {
// 1. 加载文档
SearchHits hits = buildSearchHits(
context,
docIdsToLoad
);
// 2. 执行 SubPhases
for (FetchSubPhase subPhase : fetchSubPhases) {
subPhase.process(context, hits);
}
// 3. 设置结果
context.fetchResult().shardResult(hits);
}
private SearchHits buildSearchHits(
SearchContext context,
int[] docIdsToLoad
) {
SearchHit[] hits = new SearchHit[docIdsToLoad.length];
for (int i = 0; i < docIdsToLoad.length; i++) {
int docId = docIdsToLoad[i];
// 读取文档
Document doc = context.searcher().doc(docId);
// 构建 SearchHit
hits[i] = new SearchHit(docId);
hits[i].sourceRef(doc.getSource());
hits[i].score(scores[i]);
}
return new SearchHits(hits, totalHits, maxScore);
}
}
4. 搜索类型
4.1 QUERY_THEN_FETCH (默认)
1. Query Phase: 所有分片并行查询,返回 Top K docIds
2. Reduce: 协调节点归并,选出全局 Top K
3. Fetch Phase: 获取 Top K 文档内容
特点:
- 默认搜索类型
- 最少网络传输
- 适用于大多数场景
4.2 DFS_QUERY_THEN_FETCH
1. DFS Phase: 收集所有分片的词频统计
2. Query Phase: 基于全局词频计算分数
3. Reduce: 归并结果
4. Fetch Phase: 获取文档
特点:
- 更准确的 TF-IDF 分数
- 额外的网络往返
- 仅用于分数精确度要求高的场景
5. 核心数据结构
5.1 SearchContext 与相关类
classDiagram
class SearchContext {
<<abstract>>
+ShardSearchContextId id
+SearchType searchType
+SearchShardTarget shardTarget
+ShardSearchRequest request
+ReaderContext readerContext
+Query query
+ParsedQuery postFilter
+SortAndFormats sort
+int from
+int size
+int trackTotalHitsUpTo
+int terminateAfter
+TimeValue timeout
+SearchContextAggregations aggregations
+SearchHighlightContext highlight
+FetchSourceContext fetchSourceContext
+StoredFieldsContext storedFieldsContext
+QuerySearchResult queryResult
+FetchSearchResult fetchResult
+preProcess()
+buildFilteredQuery(Query)
+close()
}
class ShardSearchContextImpl {
-ReaderContext readerContext
-ShardSearchRequest request
-SearchShardTarget shardTarget
-ContextIndexSearcher searcher
-SearchExecutionContext searchExecutionContext
-QuerySearchResult queryResult
-FetchSearchResult fetchResult
-ScrollContext scrollContext
-int from
-int size
-Query query
-SortAndFormats sort
+execute()
}
class ReaderContext {
-ShardSearchContextId id
-IndexService indexService
-IndexShard indexShard
-Engine.SearcherSupplier searcherSupplier
-AtomicLong keepAlive
-AtomicLong lastAccessTime
-boolean singleSession
-List~Releasable~ onCloses
-Map~String,Object~ context
+incRef()
+tryIncRef()
+decRef()
+updateLastAccessTime()
+close()
}
class ContextIndexSearcher {
-Query cancellable
-Runnable checkCancelled
-CircuitBreaker circuitBreaker
+search(Query, Collector)
+searchAfter(ScoreDoc, Query, int)
+searchLeaf(LeafReaderContext, Weight, Collector)
}
SearchContext <|-- ShardSearchContextImpl
ShardSearchContextImpl --> ReaderContext : holds
ShardSearchContextImpl --> ContextIndexSearcher : uses
ReaderContext --> IndexShard : references
SearchContext
职责: 搜索执行的核心上下文基类
关键字段:
| 字段 | 类型 | 说明 |
|---|---|---|
| id | ShardSearchContextId | 上下文唯一标识符 |
| searchType | SearchType | 搜索类型 (QUERY_THEN_FETCH / DFS_QUERY_THEN_FETCH) |
| shardTarget | SearchShardTarget | 目标分片信息 (index, shard, node) |
| request | ShardSearchRequest | 分片搜索请求 |
| query | Query | Lucene 查询对象 |
| postFilter | ParsedQuery | 后置过滤器 (在聚合后执行) |
| sort | SortAndFormats | 排序规则 |
| from | int | 分页起始位置 |
| size | int | 返回结果数量 |
| trackTotalHitsUpTo | int | 统计总数的上限 (默认10000) |
| terminateAfter | int | 提前终止的文档数 (0表示不终止) |
| timeout | TimeValue | 查询超时时间 |
| aggregations | SearchContextAggregations | 聚合上下文 |
| highlight | SearchHighlightContext | 高亮上下文 |
| fetchSourceContext | FetchSourceContext | _source 过滤配置 |
| queryResult | QuerySearchResult | Query Phase 结果 |
| fetchResult | FetchSearchResult | Fetch Phase 结果 |
生命周期:
创建 → preProcess() → Query Phase → Fetch Phase → close()
ReaderContext
职责: 管理分片的 IndexReader 生命周期和引用计数
关键字段:
| 字段 | 类型 | 说明 |
|---|---|---|
| id | ShardSearchContextId | Context 唯一 ID |
| indexShard | IndexShard | 所属分片 |
| searcherSupplier | Engine.SearcherSupplier | Searcher 提供者 |
| keepAlive | AtomicLong | 保持存活时间 (毫秒) |
| lastAccessTime | AtomicLong | 最后访问时间 |
| singleSession | boolean | 是否单次查询 (true: 一次性, false: Scroll/PIT) |
| refCounted | AbstractRefCounted | 引用计数器 |
引用计数:
// 增加引用
boolean success = readerContext.tryIncRef();
// 使用
try {
Engine.Searcher searcher = readerContext.acquireSearcher("search");
// ... 执行搜索
} finally {
// 减少引用
readerContext.decRef();
}
5.2 SearchRequest & SearchResponse
classDiagram
class SearchRequest {
-String[] indices
-SearchType searchType
-SearchSourceBuilder source
-String routing
-String preference
-Boolean requestCache
-Boolean allowPartialSearchResults
-TimeValue scrollKeepAlive
-int maxConcurrentShardRequests
-Integer preFilterShardSize
-int batchedReduceSize
-IndicesOptions indicesOptions
+indices(String[])
+source(SearchSourceBuilder)
+validate()
}
class SearchSourceBuilder {
-QueryBuilder query
-int from
-int size
-List~SortBuilder~ sorts
-List~AggregationBuilder~ aggregations
-HighlightBuilder highlight
-FetchSourceContext fetchSource
-SearchAfterBuilder searchAfter
-TimeValue timeout
-boolean explain
-boolean version
-Integer trackTotalHitsUpTo
-Integer terminateAfter
+query(QueryBuilder)
+from(int)
+size(int)
+sort(SortBuilder)
+aggregation(AggregationBuilder)
+toXContent(XContentBuilder)
}
class SearchResponse {
-SearchHits hits
-InternalAggregations aggregations
-Suggest suggest
-SearchProfileResults profileResults
-boolean timedOut
-Boolean terminatedEarly
-int numReducePhases
-String scrollId
-BytesReference pointInTimeId
-int totalShards
-int successfulShards
-int skippedShards
-ShardSearchFailure[] shardFailures
-Clusters clusters
-long tookInMillis
+getHits()
+getAggregations()
+getTook()
}
class SearchHits {
-TotalHits totalHits
-float maxScore
-SearchHit[] hits
+iterator()
}
class SearchHit {
-String id
-float score
-Map~String,Object~ sourceAsMap
-String index
-long version
-long seqNo
-long primaryTerm
-Object[] sortValues
-Map~String,HighlightField~ highlightFields
-Map~String,SearchHitField~ fields
+getId()
+getScore()
+getSourceAsMap()
}
SearchRequest --> SearchSourceBuilder : contains
SearchResponse --> SearchHits : contains
SearchHits --> SearchHit : contains many
6. 对外 API
6.1 API 清单
搜索模块对外提供以下核心 API:
| API 名称 | HTTP 方法 | 路径 | 幂等性 | 说明 |
|---|---|---|---|---|
| Standard Search | GET/POST | /{index}/_search |
是 | 标准搜索 |
| Scroll Search | POST | /{index}/_search?scroll=1m |
否 | Scroll 搜索 |
| Search Scroll | POST | /_search/scroll |
否 | 继续 Scroll |
| Clear Scroll | DELETE | /_search/scroll |
是 | 清理 Scroll |
| Point-in-Time | POST | /{index}/_pit?keep_alive=1m |
是 | 打开 PIT |
| Close PIT | DELETE | /_pit |
是 | 关闭 PIT |
| Multi-Search | POST | /_msearch |
是 | 批量搜索 |
| Count | GET/POST | /{index}/_count |
是 | 计数查询 |
6.2 Standard Search API
基本信息
- 名称:
_search - 协议与方法: HTTP GET/POST
/{index}/_search - 幂等性: 是(查询操作)
- 入口 Action:
TransportSearchAction
请求结构体
public class SearchRequest extends LegacyActionRequest {
// 目标索引
private String[] indices = Strings.EMPTY_ARRAY;
// 搜索类型 (QUERY_THEN_FETCH, DFS_QUERY_THEN_FETCH)
private SearchType searchType = SearchType.DEFAULT;
// 搜索查询体
private SearchSourceBuilder source;
// 路由控制
private String routing;
// 偏好设置(_local, _primary, _replica, custom string)
private String preference;
// 是否使用请求缓存
private Boolean requestCache;
// 是否允许部分结果
private Boolean allowPartialSearchResults;
// Scroll 保持时间
private TimeValue scrollKeepAlive;
// 并发分片请求数量
private int maxConcurrentShardRequests = 5;
// 预过滤分片大小阈值
private Integer preFilterShardSize = 128;
// 批量归并大小
private int batchedReduceSize = 512;
// 索引选项
private IndicesOptions indicesOptions;
}
请求字段表
| 字段 | 类型 | 必填 | 默认值 | 约束 | 说明 |
|---|---|---|---|---|---|
| indices | String[] | 否 | [] | - | 目标索引,空表示所有索引 |
| searchType | SearchType | 否 | QUERY_THEN_FETCH | QUERY_THEN_FETCH / DFS_QUERY_THEN_FETCH | 搜索类型 |
| source | SearchSourceBuilder | 是 | - | - | 查询、聚合、排序、分页等配置 |
| routing | String | 否 | null | - | 路由值,控制分片选择 |
| preference | String | 否 | null | _local/_primary/_replica/custom | 分片偏好 |
| requestCache | Boolean | 否 | null | - | 是否缓存结果 (size=0 时默认true) |
| allowPartialSearchResults | Boolean | 否 | true | - | 部分分片失败时是否返回结果 |
| scrollKeepAlive | TimeValue | 否 | null | - | Scroll 上下文保持时间 |
| maxConcurrentShardRequests | int | 否 | 5 | >0 | 并发查询分片数量 |
| preFilterShardSize | Integer | 否 | 128 | >0 | 启用 CanMatch 预过滤的阈值 |
| batchedReduceSize | int | 否 | 512 | >0 | 批量归并结果的批次大小 |
SearchSourceBuilder 核心字段
public class SearchSourceBuilder {
private QueryBuilder query; // 查询条件
private int from = 0; // 分页起始位置
private int size = 10; // 返回结果数量
private List<SortBuilder<?>> sorts; // 排序规则
private List<AggregationBuilder> aggregations; // 聚合
private List<String> storedFields; // 返回的 stored 字段
private List<String> docValueFields; // 返回的 doc_value 字段
private FetchSourceContext fetchSource; // _source 过滤
private HighlightBuilder highlight; // 高亮
private SearchAfterBuilder searchAfter; // search_after 分页
private TimeValue timeout; // 查询超时时间
private boolean explain; // 是否返回评分解释
private boolean version; // 是否返回版本号
private boolean trackScores; // 是否计算分数
private Integer trackTotalHitsUpTo; // 统计总数上限
private Integer terminateAfter; // 提前终止文档数
}
响应结构体
public class SearchResponse extends ActionResponse {
// 搜索结果
private final SearchHits hits;
// 聚合结果
private final InternalAggregations aggregations;
// 搜索建议结果
private final Suggest suggest;
// 性能分析结果
private final SearchProfileResults profileResults;
// 是否超时
private final boolean timedOut;
// 是否提前终止
private final Boolean terminatedEarly;
// 归并阶段数量
private final int numReducePhases;
// Scroll ID
private final String scrollId;
// Point-in-Time ID
private final BytesReference pointInTimeId;
// 分片统计
private final int totalShards;
private final int successfulShards;
private final int skippedShards;
private final ShardSearchFailure[] shardFailures;
// 集群统计 (跨集群搜索)
private final Clusters clusters;
// 耗时
private final long tookInMillis;
}
响应字段表
| 字段 | 类型 | 必填 | 说明 |
|---|---|---|---|
| hits | SearchHits | 是 | 搜索命中的文档 |
| aggregations | Aggregations | 否 | 聚合结果 |
| suggest | Suggest | 否 | 搜索建议 |
| profileResults | ProfileResults | 否 | 性能分析结果 (profile=true 时) |
| timedOut | boolean | 是 | 是否超时 |
| terminatedEarly | Boolean | 否 | 是否提前终止 |
| numReducePhases | int | 是 | 归并阶段数量 (分布式搜索) |
| scrollId | String | 否 | Scroll ID (scroll 请求时) |
| pointInTimeId | BytesReference | 否 | Point-in-Time ID (PIT 请求时) |
| totalShards | int | 是 | 总分片数 |
| successfulShards | int | 是 | 成功分片数 |
| skippedShards | int | 是 | 跳过分片数 (CanMatch 过滤) |
| shardFailures | ShardSearchFailure[] | 否 | 分片失败详情 |
| tookInMillis | long | 是 | 查询耗时(毫秒) |
入口函数与核心代码
// TransportSearchAction - 搜索操作的传输层动作
public class TransportSearchAction extends HandledTransportAction<SearchRequest, SearchResponse> {
@Override
protected void doExecute(Task task, SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
// 1. 解析和重写请求
final long relativeStartNanos = System.nanoTime();
final SearchTimeProvider timeProvider = new SearchTimeProvider(
searchRequest.getOrCreateAbsoluteStartMillis(),
relativeStartNanos,
System::nanoTime
);
// 2. 解析目标索引
final ClusterState clusterState = clusterService.state();
final Index[] concreteIndices = indexNameExpressionResolver.concreteIndices(
clusterState,
searchRequest.indicesOptions(),
searchRequest.indices()
);
// 3. 计算目标分片
final Map<String, AliasFilter> aliasFilter = buildPerIndexAliasFilter(
searchRequest,
clusterState,
concreteIndices,
remoteClusterIndices
);
// 4. 创建搜索任务上下文
final SearchTask searchTask = (SearchTask) task;
final SearchShardIterator[] shardIterators = buildShardIterators(
clusterState,
concreteIndices,
aliasFilter,
searchRequest.routing(),
searchRequest.preference()
);
// 5. 执行搜索
searchAsyncAction(
searchTask,
searchRequest,
shardIterators,
timeProvider,
listener
).start();
}
private void searchAsyncAction(
SearchTask task,
SearchRequest searchRequest,
SearchShardIterator[] shardIterators,
SearchTimeProvider timeProvider,
ActionListener<SearchResponse> listener
) {
// 创建异步搜索执行器
SearchQueryThenFetchAsyncAction action = new SearchQueryThenFetchAsyncAction(
task,
searchRequest,
shardIterators,
timeProvider,
searchPhaseController,
executor,
listener
);
return action;
}
}
异常与回退
分片失败处理
- 部分失败:
allowPartialSearchResults=true时返回部分结果 - 全部失败: 抛出
SearchPhaseExecutionException - 重试: 自动在副本分片上重试
- 超时: 达到 timeout 后返回已收集的结果
错误类型
| 错误 | HTTP状态码 | 说明 |
|---|---|---|
| NoShardAvailableActionException | 503 | 无可用分片 |
| SearchPhaseExecutionException | 503 | 搜索阶段失败 |
| QueryShardException | 400 | 查询语法错误 |
| CircuitBreakingException | 429 | 断路器触发 |
性能要点与最佳实践
性能优化
-
减少数据传输
- 使用
_sourcefiltering 仅返回需要的字段 - 使用
stored_fields或docvalue_fields
- 使用
-
分页优化
- 深度分页使用
search_after而非from/size - 避免大
from值 (默认 max_result_window=10000)
- 深度分页使用
-
并发控制
- 调整
max_concurrent_shard_requests(默认5) - 大索引增大并发,小索引减小并发
- 调整
-
预过滤
- 启用
pre_filter_shard_size(默认128) - 自动跳过不匹配的分片
- 启用
-
缓存利用
- 对于 size=0 的聚合查询启用
request_cache - Filter 查询自动缓存
- 对于 size=0 的聚合查询启用
最佳实践
// 1. 高效分页
{
"size": 10,
"sort": [{"timestamp": "desc"}, {"_id": "asc"}],
"search_after": [1609459200000, "doc#10"]
}
// 2. _source 过滤
{
"_source": ["title", "author", "publish_date"],
"query": {"match": {"title": "elasticsearch"}}
}
// 3. 仅统计数量
{
"size": 0,
"track_total_hits": true,
"query": {"match_all": {}}
}
// 4. 使用 Filter (可缓存)
{
"query": {
"bool": {
"filter": [
{"term": {"status": "published"}},
{"range": {"date": {"gte": "2024-01-01"}}}
]
}
}
}
7. 模块间详细交互与调用链路
7.1 完整调用链路图
sequenceDiagram
autonumber
participant Client as 客户端
participant REST as RestSearchAction<br/>(REST层)
participant Transport as TransportSearchAction<br/>(Action层)
participant AsyncAction as SearchQueryThenFetch<br/>AsyncAction
participant TransportSvc as SearchTransportService<br/>(Transport层)
participant DataNode as SearchService<br/>(数据节点)
participant Context as SearchContext
participant QueryPhase as QueryPhase.execute
participant Lucene as Lucene IndexSearcher
participant Controller as SearchPhaseController<br/>(归并)
participant FetchPhase as FetchPhase.execute
Note over Client,Lucene: REST层处理
Client->>REST: HTTP POST /index/_search<br/>{query, size, from}
REST->>REST: 解析URL参数 (index, routing)
REST->>REST: 解析JSON Body → SearchSourceBuilder
REST->>REST: 构建SearchRequest对象
REST->>Transport: client.execute(TransportSearchAction.TYPE, request)
Note over Transport,AsyncAction: Action层协调
Transport->>Transport: doExecute(task, searchRequest, listener)
Transport->>Transport: 解析目标索引 → concreteIndices[]
Transport->>Transport: 计算分片路由 → SearchShardIterator[]
Transport->>Transport: 构建别名过滤器 → aliasFilter
Transport->>AsyncAction: new SearchQueryThenFetchAsyncAction(...)<br/>.start()
Note over AsyncAction,TransportSvc: 异步执行器启动
AsyncAction->>AsyncAction: start() → doRun()
AsyncAction->>AsyncAction: 构建分片索引映射 shardIndexMap
loop 每个分片
AsyncAction->>AsyncAction: buildShardSearchRequest(shardIt)
AsyncAction->>AsyncAction: 应用节点级并发控制 (maxConcurrent)
AsyncAction->>TransportSvc: sendExecuteQuery(connection, request)
end
Note over TransportSvc,DataNode: 传输层 RPC 调用
TransportSvc->>DataNode: [RPC] ShardSearchRequest
Note over DataNode,Lucene: 数据节点 Query Phase
DataNode->>DataNode: executeQueryPhase(request, task, listener)
DataNode->>DataNode: 获取 IndexShard
DataNode->>DataNode: rewriteAndFetchShardRequest()<br/>(查询重写)
DataNode->>DataNode: 提交到 search 线程池
DataNode->>Context: createContext(readerContext, request)
Context->>Context: 创建 DefaultSearchContext
Context->>Context: parseSource() → 解析查询/聚合/排序
Context->>Context: preProcess() → 前置处理
DataNode->>QueryPhase: QueryPhase.execute(searchContext)
QueryPhase->>QueryPhase: AggregationPhase.preProcess()
QueryPhase->>QueryPhase: 构建 QueryPhaseCollectorManager
QueryPhase->>QueryPhase: 创建 TopDocsCollector + AggsCollector
QueryPhase->>Lucene: searcher.search(query, collectorManager)
Lucene->>Lucene: 执行 Lucene 查询
Lucene->>Lucene: 收集 Top (size+from) docs
Lucene-->>QueryPhase: QueryPhaseResult<br/>(docIds, scores, sortValues)
QueryPhase->>QueryPhase: RescorePhase.execute() (如果有)
QueryPhase->>QueryPhase: SuggestPhase.execute() (如果有)
QueryPhase-->>DataNode: queryResult
DataNode-->>TransportSvc: QuerySearchResult
TransportSvc-->>AsyncAction: QuerySearchResult
Note over AsyncAction,Controller: 协调节点归并
AsyncAction->>AsyncAction: 收集所有分片的 QuerySearchResult
AsyncAction->>Controller: getNextPhase() → 触发 Reduce
Controller->>Controller: reducedQueryPhase(queryResults)
Controller->>Controller: 归并排序所有分片 Top K
Controller->>Controller: 归并聚合结果 (Aggregations.reduce)
Controller->>Controller: 选出全局 Top size
Controller-->>AsyncAction: ReducedQueryPhase<br/>(全局 Top K docIds by shard)
Note over AsyncAction,FetchPhase: Fetch Phase
AsyncAction->>AsyncAction: 构建 FetchSearchPhase
AsyncAction->>AsyncAction: 按分片分组 docIdsToLoad[]
loop 每个需要fetch的分片
AsyncAction->>TransportSvc: sendExecuteFetch(connection, fetchRequest)
TransportSvc->>DataNode: [RPC] ShardFetchRequest<br/>(contextId, docIds[])
DataNode->>DataNode: executeFetchPhase(request, task, listener)
DataNode->>DataNode: 查找现有 ReaderContext
DataNode->>FetchPhase: FetchPhase.execute(context, docIds)
FetchPhase->>FetchPhase: buildSearchHits(context, docIds)
loop 每个docId
FetchPhase->>Lucene: searcher.doc(docId) → 读取 StoredFields
FetchPhase->>FetchPhase: 提取 _source
FetchPhase->>FetchPhase: 执行 FetchSubPhases<br/>(highlight, fields, etc)
end
FetchPhase-->>DataNode: FetchSearchResult<br/>(SearchHit[])
DataNode-->>TransportSvc: FetchSearchResult
TransportSvc-->>AsyncAction: FetchSearchResult
end
Note over AsyncAction,REST: 构建最终响应
AsyncAction->>Controller: merge(reducedQueryPhase, fetchResults)
Controller->>Controller: 合并文档内容和聚合结果
Controller-->>AsyncAction: SearchResponseSections
AsyncAction->>AsyncAction: 构建 SearchResponse
AsyncAction-->>Transport: SearchResponse
Transport-->>REST: SearchResponse
REST->>REST: 格式化为 JSON
REST-->>Client: HTTP 200 OK<br/>{hits, aggregations, took}
7.2 关键代码调用链路详解
7.2.1 REST层 → Action层
RestSearchAction.prepareRequest()
关键步骤:
1. 解析 URL 参数: indices, routing, preference, scroll
2. 解析请求体: withContentOrSourceParamParserOrNull()
3. 调用 parseSearchRequest() 解析 Query DSL
4. 构建 SearchRequest 对象
5. 执行: client.execute(TransportSearchAction.TYPE, searchRequest, listener)
核心代码路径:
RestSearchAction.prepareRequest()
→ parseSearchRequest(searchRequest, request, parser)
→ searchRequest.source().parseXContent(parser) // 解析Query DSL
→ RestCancellableNodeClient.execute(TransportSearchAction.TYPE, searchRequest)
→ TransportSearchAction.doExecute()
7.2.2 TransportSearchAction协调流程
TransportSearchAction.doExecute()
关键步骤:
1. 解析目标索引: indexNameExpressionResolver.concreteIndices()
2. 计算分片路由: operationRouting.searchShards()
3. 构建别名过滤器: buildPerIndexAliasFilter()
4. 创建SearchShardIterator[]: buildShardIterators()
5. 决定搜索类型并创建AsyncAction:
- QUERY_THEN_FETCH → SearchQueryThenFetchAsyncAction
- DFS_QUERY_THEN_FETCH → SearchDfsQueryThenFetchAsyncAction
6. 启动异步执行: searchAsyncAction.start()
分片路由计算:
operationRouting.searchShards():
- 根据 routing 参数选择分片
- 根据 preference 选择主分片或副本分片
- _primary: 总是使用主分片
- _replica: 总是使用副本分片
- _local: 优先使用本地分片
- _only_local: 仅使用本地分片
- 构建 SearchShardIterator: 包含主分片和所有副本的迭代器
7.2.3 SearchQueryThenFetchAsyncAction执行流程
AbstractSearchAsyncAction.start() → doRun()
关键步骤:
1. 检查是否需要 CanMatch 预过滤:
- 条件: preFilterSearchShards && 分片数 > threshold
- 执行 CanMatchPreFilterSearchPhase
2. 如果不需要预过滤,直接执行 Query Phase:
- 遍历所有 SearchShardIterator
- 对每个分片调用 executePhaseOnShard()
3. 并发控制:
- 使用 pendingExecutionsPerNode 限制每节点并发
- 默认 maxConcurrentShardRequests = 5
并发控制机制:
// 每个节点维护一个待执行队列和 Semaphore
class PendingExecutions {
private final Semaphore permits; // 最多maxConcurrent个许可
private final Queue<Runnable> queue; // 待执行队列
void tryExecute(Runnable task) {
if (permits.tryAcquire()) {
task.run(); // 立即执行
} else {
queue.add(task); // 加入队列
}
}
void finishOperation() {
permits.release();
Runnable next = queue.poll();
if (next != null) next.run(); // 执行下一个
}
}
Query Phase 发送请求:
executePhaseOnShard():
1. 构建 ShardSearchRequest
2. 通过 SearchTransportService 发送 RPC 请求
3. 注册异步回调监听器 SearchActionListener
7.2.4 数据节点 SearchService 执行
SearchService.executeQueryPhase()
关键步骤:
1. 获取或创建 ReaderContext:
- 如果是首次查询: 创建新的 ReaderContext
- 如果是 Scroll/PIT: 复用现有 ReaderContext
2. 重写查询: rewriteAndFetchShardRequest()
- 在分片上下文重写 Query
- 优化查询结构
3. 提交到 search 线程池:
- 使用 runAsync(executor, task)
- executor = threadPool.executor(Names.SEARCH)
4. 在线程池中执行:
- 创建 SearchContext
- 调用 QueryPhase.execute(searchContext)
- 返回 QuerySearchResult
SearchContext 创建:
createContext():
1. 创建 DefaultSearchContext
2. 解析 SearchSource:
- parseSource(context, request.source())
- 解析 query, aggregations, sort, highlight
3. 设置默认值 from=0, size=10
4. 执行 preProcess():
- 构建 Lucene Query
- 初始化聚合上下文
7.2.5 QueryPhase 执行
QueryPhase.execute()
关键步骤:
1. 预处理聚合: AggregationPhase.preProcess()
2. 构建 CollectorManager:
- TopDocsCollector: 收集 Top K 文档
- AggregationCollector: 执行聚合计算
- 使用 MultiCollector 组合多个 Collector
3. 执行搜索: searcher.search(query, collectorManager)
4. 后处理:
- RescorePhase.execute(): 重新打分
- SuggestPhase.execute(): 搜索建议
5. 设置结果: queryResult.topDocs()
Collector 构建:
QueryPhaseCollectorManager.newCollector():
1. 如果启用 profile: 包装 InternalProfileCollector
2. 创建 TopDocsCollector:
- 如果有排序: TopFieldCollector
- 否则: TopScoreDocCollector
3. 创建 AggregationCollector (如果有聚合)
4. 组合成 QueryPhaseCollector
7.2.6 Reduce Phase 归并
SearchPhaseController.reducedQueryPhase()
关键步骤:
1. 收集所有非空的 QuerySearchResult
2. 归并排序:
- 使用 sortDocs() 进行 K 路归并
- 创建 ScoreDoc[] 优先队列
- 按分数/排序字段排序
3. 归并聚合:
- InternalAggregations.reduce()
- 对每种聚合类型执行归并逻辑
4. 构建 ReducedQueryPhase:
- 包含全局 Top K 的 (shardIndex, docId)
- 归并后的聚合结果
sortDocs() 归并算法:
算法流程:
1. 创建 PriorityQueue<ScoreDoc>
2. 从每个分片的 TopDocs 取第一个文档放入队列
3. 循环:
- 从队列取出最高分/最优排序值的文档
- 从该分片取下一个文档放入队列
- 重复直到收集够 size 个文档
4. 时间复杂度: O(N * K * log K)
- N = 分片数
- K = size + from
7.2.7 Fetch Phase 执行
FetchPhase.execute()
关键步骤:
1. 如果 docIds 为空: 返回空结果
2. 调用 buildSearchHits(context, docIds):
- 遍历所有 docId
- 对每个 docId:
a. 读取 StoredFields: searcher.doc(docId)
b. 提取 _source 字段
c. 执行 FetchSubPhases
3. FetchSubPhases 包括:
- FetchSourcePhase: 过滤 _source
- HighlightPhase: 生成高亮片段
- FetchFieldsPhase: 提取指定字段
- FetchDocValuesPhase: 提取 doc_values
FetchSubPhase 处理:
getProcessors():
1. 遍历所有注册的 FetchSubPhase
2. 调用 fsp.getProcessor(context)
3. 如果返回非空 processor:
- 包装成 ProfileProcessor (如果启用 profile)
- 添加到 processors 列表
4. 返回 processors 列表
FetchSubPhase 执行顺序:
1. FetchSourcePhase: 提取和过滤 _source
2. FetchFieldsPhase: 提取 fields
3. FetchDocValuesPhase: 提取 docvalue_fields
4. HighlightPhase: 生成高亮
5. ExplainPhase: 生成评分解释 (如果 explain=true)
6. InnerHitsPhase: 处理嵌套/父子文档
8. 核心流程时序图
8.1 标准搜索流程 (QUERY_THEN_FETCH)
sequenceDiagram
autonumber
participant Client as 客户端
participant Coord as 协调节点<br/>(TransportSearchAction)
participant CanMatch as CanMatch Phase
participant Shard1 as Shard 1
participant Shard2 as Shard 2
participant ShardN as Shard N
participant Reducer as SearchPhaseController<br/>(结果归并)
Client->>Coord: POST /index/_search<br/>{query, size, from}
Coord->>Coord: 1. 解析请求<br/>构建 SearchRequest
Coord->>Coord: 2. 计算目标分片<br/>(所有主分片或副本)
Note over Coord,ShardN: CanMatch Phase - 预过滤
Coord->>CanMatch: 3. 启动 CanMatch Phase
par 并发检查所有分片
CanMatch->>Shard1: CanMatchRequest<br/>(min/max range check)
Shard1->>Shard1: 检查分片范围<br/>(MinMax统计)
Shard1-->>CanMatch: canMatch=true
and
CanMatch->>Shard2: CanMatchRequest
Shard2->>Shard2: 检查分片范围
Shard2-->>CanMatch: canMatch=false<br/>(跳过此分片)
and
CanMatch->>ShardN: CanMatchRequest
ShardN->>ShardN: 检查分片范围
ShardN-->>CanMatch: canMatch=true
end
CanMatch->>Coord: 过滤后的分片列表<br/>[Shard1, ShardN]
Note over Coord,ShardN: Query Phase - 查询阶段
Coord->>Coord: 4. 启动 Query Phase
par 并发查询所有匹配的分片
Coord->>Shard1: ShardSearchRequest<br/>(query, size+from, aggs)
Shard1->>Shard1: 5. 创建 SearchContext
Shard1->>Shard1: 6. 构建 Lucene Query
Shard1->>Shard1: 7. 执行搜索<br/>searcher.search(query, collector)
Shard1->>Shard1: 8. 收集 Top (size+from) 文档
Shard1->>Shard1: 9. 执行聚合<br/>(如果有)
Shard1-->>Coord: QuerySearchResult<br/>(docIds[], scores[], aggs)
and
Coord->>ShardN: ShardSearchRequest
ShardN->>ShardN: 创建 SearchContext
ShardN->>ShardN: 构建 Query
ShardN->>ShardN: 执行搜索
ShardN->>ShardN: 收集 Top docs
ShardN->>ShardN: 执行聚合
ShardN-->>Coord: QuerySearchResult<br/>(docIds[], scores[], aggs)
end
Note over Coord,Reducer: Reduce Phase - 归并阶段
Coord->>Reducer: 10. 归并所有分片结果
Reducer->>Reducer: 11. 归并排序<br/>所有 QueryResult
Reducer->>Reducer: 12. 选择全局 Top (size)<br/>跳过 from
Reducer->>Reducer: 13. 归并聚合结果<br/>(如果有)
Reducer-->>Coord: ReducedQueryPhase<br/>(Top K docIds by shard)
Note over Coord,ShardN: Fetch Phase - 获取阶段
Coord->>Coord: 14. 构建 Fetch 请求<br/>按分片分组 docIds
par 并发获取文档内容
Coord->>Shard1: ShardFetchRequest<br/>(docIds: [3, 7, 15])
Shard1->>Shard1: 15. 读取文档内容<br/>searcher.doc(docId)
Shard1->>Shard1: 16. 生成高亮<br/>(如果需要)
Shard1->>Shard1: 17. 提取 _source
Shard1-->>Coord: FetchSearchResult<br/>(documents[])
and
Coord->>ShardN: ShardFetchRequest<br/>(docIds: [2, 9])
ShardN->>ShardN: 读取文档
ShardN->>ShardN: 生成高亮
ShardN->>ShardN: 提取 _source
ShardN-->>Coord: FetchSearchResult<br/>(documents[])
end
Coord->>Coord: 18. 合并 Fetch 结果<br/>按分数顺序排列
Coord-->>Client: SearchResponse<br/>{hits[], aggregations, took, total}
时序图说明
阶段划分
1. CanMatch Phase (预过滤)
- 目的: 快速过滤掉不包含匹配文档的分片
- 方法: 检查分片的 min/max 值范围
- 优化: 可以减少 50%+ 的分片查询
- 适用: 带范围过滤的查询 (range, date_range)
2. Query Phase (查询阶段)
- 目的: 获取每个分片的 Top K 文档 ID 和分数
- 输出: 轻量级结果 (docId + score + sort values)
- 并发: 所有分片并行执行
- 内存: 每个分片保留 size+from 个结果
3. Reduce Phase (归并阶段)
- 目的: 合并所有分片的结果,选出全局 Top K
- 算法: 归并排序 (O(NKlog K))
- 聚合: 合并聚合结果
- 输出: 全局 Top size 的 (shardId, docId)
4. Fetch Phase (获取阶段)
- 目的: 获取完整文档内容
- 输入: 全局 Top K 的 (shardId, docId)
- 并发: 按需分片并行获取
- 开销: 网络传输 + 文档读取
时序图功能说明
流程概述
该时序图展示了Elasticsearch标准搜索(QUERY_THEN_FETCH)的完整执行流程,从客户端发起HTTP请求到返回最终结果的全过程。整个流程分为四个主要阶段:
- CanMatch Phase (步骤1-11): 预过滤阶段,快速判断哪些分片可能包含匹配文档
- Query Phase (步骤12-20): 查询阶段,在每个匹配的分片上执行查询并返回Top K的文档ID和分数
- Reduce Phase (步骤21-23): 归并阶段,在协调节点合并所有分片的结果并选出全局Top K
- Fetch Phase (步骤24-33): 获取阶段,获取Top K文档的完整内容
关键步骤详解
步骤1-2: 客户端请求
- 客户端通过HTTP POST发送搜索请求
- 请求体包含查询条件(query)、分页参数(size, from)、排序规则等
步骤3-4: CanMatch预过滤
- 协调节点向所有分片并发发送CanMatch请求
- 每个分片检查自己的最小/最大值范围(基于PointValues)
- 如果分片的数据范围与查询条件不重叠,返回canMatch=false
- 优化效果: 对于带范围过滤的查询,可以跳过50%+的分片
步骤5-20: Query Phase
- 步骤5: 协调节点启动Query Phase,向所有canMatch=true的分片发送查询请求
- 步骤6-9: 每个分片并发执行:
- 创建SearchContext封装搜索状态
- 构建Lucene Query对象
- 执行searcher.search()收集Top (size+from)个文档
- 如果有聚合,同时执行聚合计算
- 步骤10: 分片返回QuerySearchResult,包含:
- docIds[]: 文档内部ID数组
- scores[]: 对应的分数数组
- sortValues[][]: 排序字段值(如果有排序)
- aggregations: 分片级聚合结果
- 关键优化: Query Phase仅传输轻量级数据(ID+分数),不传输文档内容
步骤11-13: Reduce Phase
- 步骤11: 协调节点收集所有分片的QuerySearchResult
- 步骤12: 执行K路归并排序:
- 使用优先队列(PriorityQueue)合并所有分片的Top K
- 时间复杂度: O(N * K * log K), N=分片数, K=size+from
- 按分数或排序字段值排序
- 步骤13: 归并聚合结果:
- 对sum/avg/max/min类型聚合直接合并
- 对terms类型聚合重新计算Top Terms
- 步骤14: 选出全局Top size,跳过from
步骤14-23: Fetch Phase
- 步骤14: 构建Fetch请求,按分片分组docIds
- 步骤15-20: 向需要的分片并发发送ShardFetchRequest
- 步骤21-22: 每个分片执行:
- 通过docId读取StoredFields
- 提取_source字段
- 生成高亮片段(如果请求了highlight)
- 执行其他FetchSubPhases
- 步骤23: 返回FetchSearchResult,包含完整文档数据
步骤24-25: 构建最终响应
- 合并Fetch结果和聚合结果
- 按全局Top K的顺序排列文档
- 计算总耗时(took)
- 返回SearchResponse给客户端
性能特点
| 阶段 | 网络传输 | 计算开销 | 并发度 | 优化空间 |
|---|---|---|---|---|
| CanMatch | 极小(仅minmax) | 极低 | 所有分片 | 缓存minmax统计 |
| Query | 小(ID+score) | 高(查询+聚合) | 所有匹配分片 | 缓存查询结果 |
| Reduce | 无 | 中(归并排序) | 单线程 | 批量归并 |
| Fetch | 大(完整文档) | 低(读取) | 仅需要的分片 | 缓存文档内容 |
并发控制
- 分片级并发: 默认同时查询5个分片(maxConcurrentShardRequests)
- 节点级背压: 每个数据节点最多同时处理5个来自同一协调节点的请求
- 自适应调整: 根据集群规模动态调整并发数
失败重试
- 如果某个分片请求失败,自动尝试该分片的副本
- 如果所有副本都失败:
- allowPartialSearchResults=true: 返回部分结果
- allowPartialSearchResults=false: 返回503错误
8.2 CanMatch Phase 内部时序图
sequenceDiagram
autonumber
participant Coord as 协调节点<br/>CanMatchPreFilterPhase
participant Rewrite as CoordinatorRewriteContext<br/>查询重写
participant Transport as SearchTransportService
participant Shard as 数据节点分片
participant MinMax as PointValues<br/>MinMax统计
Note over Coord,MinMax: 阶段0: 协调节点重写
Coord->>Coord: runCoordinatorRewritePhase()
loop 每个分片
Coord->>Rewrite: queryStillMatchesAfterRewrite()
Rewrite->>Rewrite: 重写查询<br/>(范围查询优化)
Rewrite->>Rewrite: 检查查询是否可能匹配<br/>(基于索引元数据)
Rewrite-->>Coord: canMatch (true/false)
alt canMatch=false (协调节点就能判断)
Coord->>Coord: 标记分片skip=true
Coord->>Coord: numPossibleMatches不增加
else canMatch=true
Coord->>Coord: 加入matchedShardLevelRequests
end
end
Note over Coord,MinMax: 阶段1: 向数据节点发送CanMatch请求
Coord->>Coord: 按节点分组请求<br/>groupByNode()
loop 每个目标节点
Coord->>Transport: sendCanMatch(nodeId, shards[])
Transport->>Shard: [RPC] CanMatchNodeRequest
loop 该节点的每个分片
Shard->>Shard: 获取 IndexReader
Shard->>MinMax: 获取字段的min/max值<br/>PointValues.getMinPackedValue()<br/>PointValues.getMaxPackedValue()
MinMax-->>Shard: min/max BytesRef
Shard->>Shard: 检查查询范围与分片范围是否重叠
Note over Shard: 示例: 查询 date >= 2024-01-01<br/>分片 max(date) = 2023-12-31<br/>→ canMatch = false
Shard->>Shard: 如果有排序,同时返回minSortValue/maxSortValue
Shard-->>Transport: CanMatchShardResponse<br/>(canMatch, minMax, estimatedCount)
end
Transport-->>Coord: CanMatchNodeResponse[]
end
Note over Coord: 阶段2: 处理响应并排序分片
loop 每个分片响应
alt canMatch=true
Coord->>Coord: possibleMatches.set(shardIndex)
Coord->>Coord: numPossibleMatches++
Coord->>Coord: 记录minAndMaxes[shardIndex]
else canMatch=false
Coord->>Coord: 跳过该分片
end
end
Coord->>Coord: 如果有排序,按minSortValue排序分片<br/>(确保包含早期数据的分片先查询)
Coord->>Coord: 构建过滤后的 SearchShardIterator[]
Coord-->>Coord: 返回可能匹配的分片列表<br/>numPossibleMatches / totalShards
Note over Coord: 优化效果示例:<br/>原始128个分片 → 过滤后仅64个分片<br/>减少50%的Query Phase工作量
CanMatch Phase 功能详解
目的与触发条件
- 目的: 在不执行完整查询的情况下,快速判断分片是否可能包含匹配文档
- 触发条件:
- 分片总数 >
pre_filter_shard_size(默认128) - 或显式启用
preFilterSearchShards=true
- 分片总数 >
- 跳过条件: 对于小规模索引(分片数<128),直接执行Query Phase更高效
两级过滤机制
1. 协调节点级过滤 (Coordinator Rewrite)
- 执行位置: 协调节点
- 数据来源: 集群元数据 (ClusterState)
- 检查内容:
- 索引创建时间范围
- 分片的routing元数据
- 简单的查询改写(query rewrite)
- 优点: 零网络开销,极快
- 局限: 只能基于元数据,无法访问实际数据
2. 数据节点级过滤 (PointValues MinMax)
- 执行位置: 数据节点
- 数据来源: Lucene PointValues的min/max统计
- 检查内容:
- 数值字段的最小/最大值
- 日期字段的最早/最晚时间
- 地理位置字段的边界框
- 算法:
// 伪代码 boolean canMatch(Query query, Shard shard) { if (query instanceof RangeQuery) { BytesRef shardMin = shard.getMinValue(field); BytesRef shardMax = shard.getMaxValue(field); // 查询范围: [queryMin, queryMax] // 分片范围: [shardMin, shardMax] // 判断是否重叠 if (queryMax < shardMin || queryMin > shardMax) { return false; // 不重叠,跳过分片 } } return true; // 可能匹配 }
分片排序优化
如果查询包含排序,CanMatch Phase会收集每个分片的minSortValue和maxSortValue,并据此排序分片:
- 降序排序 (desc): 先查询maxSortValue最大的分片
- 升序排序 (asc): 先查询minSortValue最小的分片
好处:
- 先返回最相关的结果,改善用户体验
- 可能触发早期终止(early termination),减少查询的分片数
性能数据
| 场景 | 原始分片数 | 过滤后分片数 | 减少比例 | 性能提升 |
|---|---|---|---|---|
| 日期范围查询(最近7天) | 365 | 7 | 98% | 10-50x |
| 数值范围查询(price > 1000) | 100 | 45 | 55% | 2-3x |
| 无范围查询(match query) | 100 | 100 | 0% | 无提升 |
最佳实践
- 使用日期分区索引: 按天/周/月创建索引,CanMatch可以完全跳过旧索引
- 在范围字段上使用PointValues: 数值、日期、IP字段自动支持
- 合理设置pre_filter_shard_size: 对于大集群可以降低阈值(如64)
8.3 Query Phase 内部时序图
sequenceDiagram
autonumber
participant Coord as 协调节点<br/>AsyncAction
participant Transport as SearchTransportService
participant Service as SearchService<br/>(数据节点)
participant Context as DefaultSearchContext
participant QueryPhase as QueryPhase
participant Collector as CollectorManager
participant Lucene as Lucene IndexSearcher
participant Aggs as AggregationPhase
Note over Coord,Lucene: 协调节点发起Query请求
Coord->>Coord: executePhaseOnShard(shardIt)
Coord->>Coord: buildShardSearchRequest(shardIt)<br/>构造ShardSearchRequest
Coord->>Coord: 应用并发控制<br/>(每节点最多5个并发)
Coord->>Transport: sendExecuteQuery(connection, request)
Transport->>Service: [RPC] ShardSearchRequest
Note over Service,Context: 数据节点接收并处理
Service->>Service: executeQueryPhase(request, task, listener)
Service->>Service: 获取/创建 ReaderContext<br/>(持有IndexReader引用)
Service->>Service: rewriteAndFetchShardRequest()<br/>重写查询(分片级优化)
Service->>Service: 提交到search线程池<br/>executor.execute()
Note over Service,Context: search线程池中执行
Service->>Context: createContext(readerContext, request)
Context->>Context: new DefaultSearchContext()
Context->>Context: parseSource(request.source())<br/>解析查询/聚合/排序
Context->>Context: 构建Lucene Query对象
Note over Context: QueryBuilder → Query转换:<br/>MatchQueryBuilder → TermQuery<br/>RangeQueryBuilder → PointRangeQuery<br/>BoolQueryBuilder → BooleanQuery
Context->>Context: 构建Sort对象<br/>(如果有排序)
Context->>Context: 解析Aggregations<br/>构建AggregatorFactories
Context->>Context: preProcess()<br/>查询预处理和验证
Service->>QueryPhase: QueryPhase.execute(searchContext)
Note over QueryPhase,Aggs: Query Phase 执行
QueryPhase->>Aggs: AggregationPhase.preProcess(searchContext)
Aggs->>Aggs: 为每个聚合创建Aggregator
Aggs->>Aggs: 构建聚合树(嵌套聚合)
Aggs-->>QueryPhase: AggregationContext
QueryPhase->>Collector: createQueryPhaseCollectorManager()
alt 有排序
Collector->>Collector: new TopFieldCollectorManager<br/>(排序字段, size+from)
else 无排序
Collector->>Collector: new TopScoreDocCollectorManager<br/>(size+from)
end
alt 有聚合
Collector->>Collector: aggsCollectorManager.newCollector()
Collector->>Collector: MultiCollector.wrap(<br/> topDocsCollector,<br/> aggsCollector<br/>)
end
alt 有postFilter
Collector->>Collector: 创建FilteredCollector<br/>(在聚合后过滤)
end
Collector-->>QueryPhase: QueryPhaseCollectorManager
QueryPhase->>Lucene: searcher.search(query, collectorManager)
Note over Lucene: Lucene 搜索执行
Lucene->>Lucene: 1. Weight weight = query.createWeight()
Lucene->>Lucene: 2. 遍历所有Segment
loop 每个Segment
Lucene->>Lucene: LeafReaderContext leafCtx = segment
Lucene->>Lucene: Scorer scorer = weight.scorer(leafCtx)
Lucene->>Lucene: Collector collector = collectorManager.newCollector()
Lucene->>Lucene: collector.setScorer(scorer)
loop 匹配的文档
Lucene->>Lucene: int docId = scorer.nextDoc()
Lucene->>Lucene: float score = scorer.score()
Lucene->>Lucene: collector.collect(docId)
alt TopDocsCollector
Lucene->>Lucene: 如果分数足够高<br/>加入PriorityQueue
end
alt AggsCollector
Lucene->>Lucene: 执行聚合计算<br/>aggregator.collect(docId)
end
end
Lucene->>Lucene: collector.finishLeaf()
end
Lucene->>Lucene: collectorManager.reduce(collectors)<br/>合并所有Segment的结果
Lucene-->>QueryPhase: QueryPhaseResult<br/>(TopDocs, totalHits)
Note over QueryPhase: 后处理阶段
QueryPhase->>QueryPhase: 设置queryResult.topDocs(topDocs)
QueryPhase->>QueryPhase: 设置queryResult.aggregations(aggs)
alt 有Rescore
QueryPhase->>QueryPhase: RescorePhase.execute(searchContext)<br/>重新打分Top K文档
end
alt 有Suggest
QueryPhase->>QueryPhase: SuggestPhase.execute(searchContext)<br/>生成搜索建议
end
QueryPhase-->>Service: QuerySearchResult
Note over Service,Transport: 返回结果给协调节点
Service->>Service: 检查是否单次查询<br/>(non-scroll)
alt 单次查询且无结果
Service->>Service: freeReaderContext()<br/>立即释放资源
else Scroll/PIT
Service->>Service: 保持ReaderContext<br/>等待后续请求
end
Service-->>Transport: QuerySearchResult<br/>(docIds[], scores[], aggs)
Transport-->>Coord: QuerySearchResult
Note over Coord: 收集所有分片结果<br/>准备进入Reduce Phase
Query Phase 功能详解
执行位置与线程模型
- 执行位置: 数据节点的每个主分片或副本分片
- 线程池:
search线程池- 大小:
int((processors * 3) / 2) + 1 - 队列: 1000
- 拒绝策略: AbortPolicy
- 大小:
- 并发: 所有分片并行执行
查询重写优化
在分片级别,查询会经过多次重写优化:
// 重写示例
原始查询: MatchQuery("title", "elasticsearch")
↓ 重写1: 分析器处理
TermQuery("title", "elasticsearch")
↓ 重写2: 前缀优化
如果title字段只有一个Term匹配,可能重写为ConstantScoreQuery
↓ 重写3: 分片级优化
如果该分片该Term的docFreq=0,重写为MatchNoDocsQuery
Collector 组合策略
根据查询参数,QueryPhase会创建不同的Collector组合:
| 查询特征 | Collector 组合 | 说明 |
|---|---|---|
| 仅查询,无聚合,无排序 | TopScoreDocCollector | 最简单,仅收集Top K文档 |
| 有排序,无聚合 | TopFieldCollector | 按排序字段收集 |
| 有聚合,无排序 | TopScoreDocCollector + AggsCollector | 并行收集 |
| 有聚合,有排序 | TopFieldCollector + AggsCollector | 并行收集 |
| 有postFilter | FilteredCollector(上述任一) | 先聚合,再过滤 |
聚合执行
聚合在Query Phase并行执行,与文档收集同时进行:
// 聚合执行伪代码
for (Document doc : matchedDocs) {
topDocsCollector.collect(doc); // 收集到TopDocs
aggsCollector.collect(doc); // 同时执行聚合
// 对于terms聚合
String value = doc.getField("category");
termsBucket.get(value).count++;
// 对于metrics聚合
double price = doc.getField("price");
sumAggregator.sum += price;
avgAggregator.sum += price;
avgAggregator.count++;
}
TopDocs 收集策略
- size + from: 每个分片必须收集 (size + from) 个文档
- 例如: size=10, from=100 → 每个分片收集110个文档
- 协调节点归并后跳过前100个,返回10个
- 优先队列: 使用固定大小的PriorityQueue维护Top K
- 当新文档分数高于队列最低分时,替换最低分文档
- 队列满后,只处理分数足够高的文档
- 提前终止: 如果设置了
terminate_after参数,收集到指定数量文档后提前终止
Rescore 重新打分
如果查询包含rescorer,会在Query Phase的最后阶段执行:
- 基于主查询收集Top K文档
- 对Top K文档执行rescore查询(通常更复杂/更精确)
- 重新计算分数并重新排序
- 返回重排序后的Top K
性能优化点
- 查询缓存: 对于filter查询,Lucene会缓存匹配的文档集合
- 段级缓存: 对于常见查询,Lucene在段级别缓存结果
- 跳跃指针: 使用skip list加速文档遍历
- WAND优化: 对于disjunction查询(OR),使用WAND算法减少评分计算
8.4 Reduce Phase 内部时序图
sequenceDiagram
autonumber
participant AsyncAction as SearchQueryThenFetch<br/>AsyncAction
participant Consumer as QueryPhaseResult<br/>Consumer
participant Controller as SearchPhaseController
participant MergeQueue as PriorityQueue<br/>归并队列
participant AggsReduce as InternalAggregations<br/>聚合归并
Note over AsyncAction,AggsReduce: 收集所有分片结果
loop 每个分片
AsyncAction->>Consumer: onShardResult(QuerySearchResult)
Consumer->>Consumer: 添加到results数组<br/>results.set(shardIndex, result)
Consumer->>Consumer: successfulOps++
alt 启用批量归并 && 达到batchSize
Consumer->>Consumer: 执行批量部分归并<br/>减少内存占用
end
end
AsyncAction->>AsyncAction: 等待所有分片完成<br/>countDown.await()
AsyncAction->>AsyncAction: getNextPhase() → 触发Reduce
Note over Controller,MergeQueue: Reduce Phase 开始
AsyncAction->>Controller: reducedQueryPhase(queryResults)
Controller->>Controller: 1. 收集所有非空 QuerySearchResult
Controller->>Controller: 2. 验证排序字段格式一致性
Note over Controller: 归并TopDocs
Controller->>Controller: sortDocs(results[], from, size)
Controller->>MergeQueue: 创建 ScoreDocQueue<br/>优先队列(容量=size)
loop 每个分片的TopDocs
MergeQueue->>MergeQueue: 取第一个ScoreDoc加入队列<br/>queue.insertWithOverflow(scoreDoc)
end
loop size 次
MergeQueue->>MergeQueue: 从队列取出最高分文档<br/>topDoc = queue.pop()
MergeQueue->>MergeQueue: 记录 (shardIndex, docId, score)
alt 该分片还有更多文档
MergeQueue->>MergeQueue: 从该分片取下一个ScoreDoc<br/>加入队列
end
end
MergeQueue-->>Controller: 全局TopDocs[]<br/>已按分数排序
Controller->>Controller: 跳过前 from 个文档
Controller->>Controller: 选出 size 个文档
Note over Controller,AggsReduce: 归并聚合结果
alt 有聚合
Controller->>AggsReduce: reduce(List<InternalAggregations>)
loop 每个聚合
alt Terms聚合
AggsReduce->>AggsReduce: 1. 合并所有分片的buckets
AggsReduce->>AggsReduce: 2. 按count降序排序
AggsReduce->>AggsReduce: 3. 选出TopN buckets
AggsReduce->>AggsReduce: 4. 递归归并子聚合
else Metrics聚合 (sum/avg/max/min)
AggsReduce->>AggsReduce: 直接合并指标值
Note over AggsReduce: sum: 累加所有分片的sum<br/>avg: sum(所有sum) / sum(所有count)<br/>max: max(所有max)<br/>min: min(所有min)
else Cardinality聚合
AggsReduce->>AggsReduce: 合并HyperLogLog草图
else Percentiles聚合
AggsReduce->>AggsReduce: 合并TDigest草图
end
end
AggsReduce-->>Controller: 归并后的InternalAggregations
end
Note over Controller: 归并Suggest结果
alt 有Suggest
Controller->>Controller: Suggest.reduce(groupedSuggestions)
loop 每个Suggestion
Controller->>Controller: 合并所有分片的options
Controller->>Controller: 按score排序
Controller->>Controller: 选出TopN options
end
end
Note over Controller: 构建ReducedQueryPhase
Controller->>Controller: new ReducedQueryPhase(<br/> topDocs: 全局Top K的(shard,docId),<br/> aggregations: 归并后的聚合,<br/> totalHits: 总命中数,<br/> maxScore: 最高分,<br/> suggest: 归并后的建议,<br/> numReducePhases: 归并阶段数<br/>)
Controller-->>AsyncAction: ReducedQueryPhase
AsyncAction->>AsyncAction: 构建docIdsToLoad[]<br/>按分片分组TopDocs
Note over AsyncAction: 准备进入Fetch Phase
Reduce Phase 功能详解
执行位置与时机
- 执行位置: 协调节点
- 执行时机: 收集到所有(或足够多)分片的QuerySearchResult后
- 线程: 协调节点的search线程
批量归并优化
对于大规模搜索(涉及大量分片),Elasticsearch使用批量归并(Batched Reduce)策略:
// 批量归并参数
int batchedReduceSize = 512; // 默认每批512个分片
// 归并策略
if (numShards <= batchedReduceSize) {
// 少量分片: 一次性归并
reduce(allResults);
} else {
// 大量分片: 分批归并
while (results.size() > batchedReduceSize) {
List<QuerySearchResult> batch = results.take(batchedReduceSize);
ReducedQueryPhase partial = reduce(batch);
results.add(partial); // 部分结果作为新的"分片"
}
reduce(results); // 最终归并
}
好处:
- 减少内存占用: 避免同时持有所有分片的结果
- 减少GC压力: 及时释放已归并的结果
- 提高响应速度: 可以流式处理结果
归并排序算法详解
使用K路归并排序算法,基于PriorityQueue实现:
// 归并排序伪代码
PriorityQueue<ScoreDoc> queue = new PriorityQueue<>(
size,
(doc1, doc2) -> {
// 比较规则
if (doc1.score != doc2.score) {
return Float.compare(doc2.score, doc1.score); // 分数降序
}
// 分数相同,按shardIndex排序(保证稳定性)
return Integer.compare(doc1.shardIndex, doc2.shardIndex);
}
);
// 初始化: 每个分片的第一个文档加入队列
for (int i = 0; i < numShards; i++) {
TopDocs shardTopDocs = results[i].topDocs();
if (shardTopDocs.scoreDocs.length > 0) {
ScoreDoc doc = shardTopDocs.scoreDocs[0];
doc.shardIndex = i;
doc.docIndex = 0;
queue.add(doc);
}
}
// 归并: 取size个最高分文档
ScoreDoc[] topDocs = new ScoreDoc[size];
for (int i = 0; i < size && queue.isNotEmpty(); i++) {
ScoreDoc top = queue.poll(); // 取出当前最高分
topDocs[i] = top;
// 从同一分片取下一个文档
TopDocs shardTopDocs = results[top.shardIndex].topDocs();
int nextDocIndex = top.docIndex + 1;
if (nextDocIndex < shardTopDocs.scoreDocs.length) {
ScoreDoc nextDoc = shardTopDocs.scoreDocs[nextDocIndex];
nextDoc.shardIndex = top.shardIndex;
nextDoc.docIndex = nextDocIndex;
queue.add(nextDoc);
}
}
时间复杂度分析:
- 初始化: O(N), N=分片数
- 归并: O(K * log N), K=size
- 总计: O(N + K * log N)
- 对于典型场景 (N=10, K=10): ~40次比较操作
聚合归并策略
不同类型的聚合有不同的归并策略:
1. Terms聚合归并
// Terms聚合归并算法
Map<String, Bucket> mergedBuckets = new HashMap<>();
// 1. 合并所有分片的buckets
for (TermsAgg shardAgg : shardAggs) {
for (Bucket bucket : shardAgg.getBuckets()) {
Bucket merged = mergedBuckets.computeIfAbsent(
bucket.getKey(),
k -> new Bucket(k)
);
merged.docCount += bucket.docCount;
// 递归归并子聚合
merged.subAggs = reduceSubAggs(merged.subAggs, bucket.subAggs);
}
}
// 2. 排序并选出TopN
List<Bucket> topBuckets = mergedBuckets.values()
.stream()
.sorted(Comparator.comparingLong(Bucket::getDocCount).reversed())
.limit(size)
.collect(Collectors.toList());
2. Metrics聚合归并
// Metrics聚合归并
// Sum聚合
double totalSum = shardSums.stream().mapToDouble(s -> s).sum();
// Avg聚合
double totalSum = shardAvgs.stream().mapToDouble(a -> a.sum).sum();
long totalCount = shardAvgs.stream().mapToLong(a -> a.count).sum();
double avg = totalSum / totalCount;
// Max聚合
double max = shardMaxs.stream().mapToDouble(m -> m).max().getAsDouble();
// Min聚合
double min = shardMins.stream().mapToDouble(m -> m).min().getAsDouble();
3. Cardinality聚合归并
// 基于HyperLogLog算法
HyperLogLogPlusPlus merged = new HyperLogLogPlusPlus(precision);
for (HyperLogLogPlusPlus shardHLL : shardHLLs) {
merged.merge(shardHLL); // 合并HLL草图
}
long cardinality = merged.cardinality(); // 估算基数
深度分页问题
Reduce Phase是深度分页性能问题的根源:
| from + size | 每分片返回 | 10分片总计 | 内存占用 | 归并计算 |
|---|---|---|---|---|
| 10 | 10 | 100 docs | 几KB | ~100次比较 |
| 1000 | 1000 | 10000 docs | 几MB | ~10K次比较 |
| 10000 | 10000 | 100K docs | 几十MB | ~100K次比较 |
解决方案:
- 使用
search_after分页(推荐) - 使用Scroll API(导出场景)
- 限制
max_result_window(默认10000)
8.5 Fetch Phase 内部时序图
sequenceDiagram
autonumber
participant Coord as 协调节点<br/>FetchSearchPhase
participant Transport as SearchTransportService
participant Service as SearchService<br/>(数据节点)
participant Context as SearchContext
participant FetchPhase as FetchPhase
participant Lucene as Lucene IndexSearcher
participant SubPhase1 as FetchSourcePhase
participant SubPhase2 as HighlightPhase
participant SubPhase3 as FetchFieldsPhase
Note over Coord: 构建Fetch请求
Coord->>Coord: 从ReducedQueryPhase获取<br/>全局Top K的(shardIndex, docId)
Coord->>Coord: 按分片分组docIds<br/>Map<ShardId, int[]>
loop 每个需要fetch的分片
Note over Coord,Service: 发送Fetch请求
Coord->>Coord: 构建ShardFetchRequest<br/>(contextId, docIds[])
Coord->>Transport: sendExecuteFetch(connection, request)
Transport->>Service: [RPC] ShardFetchRequest
Note over Service,FetchPhase: 数据节点处理Fetch
Service->>Service: executeFetchPhase(request, task, listener)
Service->>Service: 查找现有ReaderContext<br/>findReaderContext(contextId)
alt ReaderContext不存在
Service->>Service: 抛出SearchContextMissingException
Note over Service: 可能原因:<br/>1. Context已超时被清理<br/>2. 节点重启<br/>3. Context ID错误
end
Service->>Context: 从ReaderContext获取SearchContext
Service->>FetchPhase: FetchPhase.execute(context, docIds)
Note over FetchPhase,Lucene: 构建SearchHits
FetchPhase->>FetchPhase: buildSearchHits(context, docIds)
FetchPhase->>FetchPhase: 创建StoredFieldLoader<br/>(优化字段读取)
FetchPhase->>FetchPhase: 创建SourceLoader<br/>(加载_source)
FetchPhase->>FetchPhase: 创建IdLoader<br/>(加载_id)
FetchPhase->>FetchPhase: 初始化所有FetchSubPhaseProcessor
FetchPhase->>FetchPhase: 按Segment分组docIds
loop 每个Segment
FetchPhase->>FetchPhase: setNextReader(leafContext)
loop 该Segment的每个docId
Note over FetchPhase,Lucene: 处理单个文档
FetchPhase->>Lucene: storedFieldLoader.load(docId)<br/>读取StoredFields
Lucene->>Lucene: 从.fdt文件读取存储字段
Lucene-->>FetchPhase: StoredFields (Map<field, value>)
FetchPhase->>Lucene: idLoader.getId(docId)<br/>读取_id
Lucene-->>FetchPhase: _id
FetchPhase->>Lucene: sourceLoader.load(docId)<br/>读取_source
Lucene->>Lucene: 从StoredFields或SourceOnlySnapshot读取
Lucene-->>FetchPhase: BytesReference (_source JSON)
FetchPhase->>FetchPhase: 创建SearchHit对象
FetchPhase->>FetchPhase: 设置id, score, sortValues
Note over FetchPhase,SubPhase3: 执行FetchSubPhases
FetchPhase->>SubPhase1: FetchSourcePhase.process(hit)
SubPhase1->>SubPhase1: 应用_source过滤<br/>(includes/excludes)
SubPhase1->>SubPhase1: 过滤后的_source设置到hit
SubPhase1-->>FetchPhase: 完成
FetchPhase->>SubPhase3: FetchFieldsPhase.process(hit)
SubPhase3->>SubPhase3: 从_source或docValues提取fields
SubPhase3->>SubPhase3: 应用字段格式化
SubPhase3->>SubPhase3: 设置hit.fields
SubPhase3-->>FetchPhase: 完成
alt 请求了高亮
FetchPhase->>SubPhase2: HighlightPhase.process(hit)
SubPhase2->>SubPhase2: 选择高亮器<br/>(unified/fvh/plain)
SubPhase2->>Lucene: 获取term vectors<br/>或重新分析_source
SubPhase2->>SubPhase2: 生成高亮片段<br/>(带<em>标签)
SubPhase2->>SubPhase2: 设置hit.highlightFields
SubPhase2-->>FetchPhase: 完成
end
alt 请求了explain
FetchPhase->>Lucene: explain(query, docId)<br/>获取评分解释
Lucene-->>FetchPhase: Explanation对象
FetchPhase->>FetchPhase: 设置hit.explanation
end
alt 请求了inner_hits (嵌套文档)
FetchPhase->>FetchPhase: InnerHitsPhase.process(hit)
FetchPhase->>FetchPhase: 递归执行嵌套文档的查询和fetch
FetchPhase->>FetchPhase: 设置hit.innerHits
end
FetchPhase->>FetchPhase: hits[i] = hit
end
end
FetchPhase->>FetchPhase: new SearchHits(hits[], totalHits, maxScore)
FetchPhase-->>Service: FetchSearchResult
Note over Service: 更新Context的lastAccessTime<br/>保持Context活跃
Service-->>Transport: FetchSearchResult
Transport-->>Coord: FetchSearchResult
end
Note over Coord: 合并所有分片的FetchSearchResult
Coord->>Coord: 按全局TopDocs的顺序排列hits
Coord->>Coord: merge(reducedQueryPhase, fetchResults)
Coord->>Coord: 构建SearchResponseSections<br/>(hits + aggregations + suggest)
Coord->>Coord: 构建最终SearchResponse
Note over Coord: 清理资源(如果是单次查询)
alt 单次查询(非Scroll/PIT)
loop 每个分片
Coord->>Transport: sendFreeContext(contextId)
Transport->>Service: [RPC] FreeContextRequest
Service->>Service: freeReaderContext(contextId)<br/>释放IndexReader引用
end
end
Fetch Phase 功能详解
执行位置与触发
- 执行位置: 仅在需要返回文档的分片上执行
- 触发条件: Reduce Phase确定全局Top K后
- 线程池:
search线程池 - 并发: 所有需要的分片并行执行
docIds分组策略
Reduce Phase产生全局Top K后,需要按分片分组:
// 示例: 全局Top 10
// 来自3个分片的结果
TopDocs globalTopDocs = [
(shard=1, docId=103, score=9.5), // 第1名
(shard=0, docId=42, score=9.2), // 第2名
(shard=2, docId=201, score=8.8), // 第3名
(shard=1, docId=57, score=8.5), // 第4名
(shard=0, docId=91, score=8.3), // 第5名
...
];
// 按分片分组
Map<ShardId, int[]> docIdsByhard = {
shard_0: [42, 91, ...],
shard_1: [103, 57, ...],
shard_2: [201, ...]
};
// 为每个分片创建一个FetchRequest
StoredFields 读取优化
Lucene使用.fdt(Field Data)和.fdx(Field Index)文件存储StoredFields:
- 索引查找: 通过
.fdx文件找到docId对应的数据在.fdt中的位置 - 批量读取: 如果docIds是连续的,可以批量读取减少IO
- 压缩: StoredFields使用LZ4压缩,读取时解压
_source 加载策略
根据索引设置,_source可能来自不同位置:
| 索引设置 | _source位置 | 读取方式 |
|---|---|---|
_source.enabled: true (默认) |
StoredFields | 从.fdt文件读取 |
_source.mode: synthetic |
合成 | 从DocValues重建 |
store_source: false |
Recovery专用 | 不可用于搜索 |
FetchSubPhase 执行顺序与功能
Elasticsearch在Fetch Phase执行多个子阶段(SubPhase):
1. FetchSourcePhase (最先执行)
- 功能: 提取和过滤_source字段
- 参数:
_source: false: 不返回_source_source: ["field1", "field2"]: 仅包含指定字段_source: {"includes": [...], "excludes": [...]}: 包含和排除规则
- 实现: 使用JSON过滤器过滤_source
2. FetchFieldsPhase
- 功能: 提取指定字段的值
- 数据源优先级:
- _source (如果可用)
- doc_values (如果字段有docValues)
- stored_fields (如果字段存储)
- 格式化: 应用字段格式化器(如日期格式化)
3. FetchDocValuesPhase
- 功能: 从DocValues提取字段值
- 适用: 数值、日期、关键字等字段
- 优势: 列式存储,读取性能好
4. HighlightPhase
- 功能: 生成高亮片段
- 高亮器类型:
- Unified Highlighter (默认,推荐):
- 基于term vectors或重新分析
- 支持复杂查询
- 性能最好
- Fast Vector Highlighter:
- 需要
term_vector: with_positions_offsets - 速度快但占用更多磁盘
- 需要
- Plain Highlighter:
- 重新分析_source
- 慢但无需额外存储
- Unified Highlighter (默认,推荐):
- 片段生成:
原文: "Elasticsearch is a distributed search engine" 查询: "search" 高亮: "Elasticsearch is a distributed <em>search</em> engine"
5. ExplainPhase
- 功能: 生成评分解释
- 触发: 请求参数
explain: true - 输出: 详细的评分计算树
6. InnerHitsPhase
- 功能: 处理嵌套文档或父子文档的inner_hits
- 执行: 对每个Top文档递归执行查询和fetch
- 用途: 在父文档中显示匹配的子文档
SearchContext 生命周期管理
1. Context创建 (Query Phase)
ReaderContext readerContext = new ReaderContext(
id,
indexShard,
searcherSupplier, // 持有IndexReader引用
keepAlive,
singleSession
);
2. Context复用 (Fetch Phase)
// Fetch Phase通过contextId查找现有Context
ReaderContext ctx = activeReaders.get(contextId);
if (ctx == null) {
throw new SearchContextMissingException(contextId);
}
3. Context清理
- 单次查询: Fetch Phase结束后立即清理
- Scroll: 保持到scroll超时或显式清理
- PIT: 保持到pit超时或显式关闭
- 自动清理: 定时任务扫描超时的Context
性能优化点
-
字段过滤: 仅返回需要的字段,减少网络传输
{ "_source": ["title", "price"], "fields": ["date"] } -
禁用_source: 如果不需要完整文档
{ "_source": false, "docvalue_fields": ["field1", "field2"] } -
高亮优化:
- 使用Unified Highlighter
- 限制fragment_size和number_of_fragments
- 避免对大文本字段高亮
-
避免inner_hits: inner_hits会显著增加Fetch开销
内存和网络开销
| 优化前 | 优化后 | 节省 |
|---|---|---|
| 完整_source (10KB/doc) | 仅需要字段 (1KB/doc) | 90% |
| 返回100个字段 | 返回10个字段 | 网络带宽减少10倍 |
| 高亮整个文档 | 高亮片段 (200字符) | CPU减少50%+ |
8.6 Scroll API 流程
sequenceDiagram
autonumber
participant Client as 客户端
participant Coord as 协调节点
participant Shard1 as Shard 1
participant Shard2 as Shard 2
participant Context as SearchContext<br/>(保持打开)
Note over Client,Context: 第一次 Scroll - 初始化
Client->>Coord: POST /index/_search?scroll=1m<br/>{query, size: 1000}
Coord->>Coord: 1. 创建 Scroll Context
par 在每个分片创建 Context
Coord->>Shard1: ShardSearchRequest<br/>(scroll=1m)
Shard1->>Context: 2. 创建 ReaderContext<br/>(保持 Searcher 打开)
Shard1->>Shard1: 3. 执行查询
Shard1-->>Coord: ScrollQuerySearchResult<br/>(hits[0-999], scrollId)
and
Coord->>Shard2: ShardSearchRequest
Shard2->>Context: 创建 ReaderContext
Shard2->>Shard2: 执行查询
Shard2-->>Coord: ScrollQuerySearchResult
end
Coord->>Coord: 4. 归并结果
Coord-->>Client: SearchResponse<br/>{hits[], _scroll_id, total}
Note over Client,Context: 后续 Scroll - 继续迭代
Client->>Coord: POST /_search/scroll<br/>{scroll: "1m", scroll_id: "..."}
Coord->>Coord: 5. 解析 scroll_id<br/>获取 Context 信息
par 从每个分片获取下一批
Coord->>Shard1: ScrollQueryRequest<br/>(contextId)
Shard1->>Context: 6. 查找现有 Context
Shard1->>Shard1: 7. 读取下一批<br/>(hits[1000-1999])
Shard1->>Context: 8. 更新 Context<br/>(lastAccessTime)
Shard1-->>Coord: ScrollQuerySearchResult
and
Coord->>Shard2: ScrollQueryRequest
Shard2->>Context: 查找 Context
Shard2->>Shard2: 读取下一批
Shard2->>Context: 更新 Context
Shard2-->>Coord: ScrollQuerySearchResult
end
Coord->>Coord: 归并结果
Coord-->>Client: SearchResponse<br/>{hits[], _scroll_id}
Note over Client,Context: 清理 Scroll
Client->>Coord: DELETE /_search/scroll<br/>{scroll_id: "..."}
par 删除所有分片的 Context
Coord->>Shard1: ClearScrollRequest
Shard1->>Context: 释放 ReaderContext
Shard1-->>Coord: success
and
Coord->>Shard2: ClearScrollRequest
Shard2->>Context: 释放 ReaderContext
Shard2-->>Coord: success
end
Coord-->>Client: 200 OK
8. 性能优化
8.1 查询优化
1. 使用 Filter 代替 Query
{
"query": {
"bool": {
"filter": [
{ "term": { "status": "published" } }
]
}
}
}
- Filter 可以缓存
- 不计算分数,更快
2. 限制返回字段
{
"_source": ["title", "price"],
"size": 10
}
3. 使用 CanMatch 优化
- 自动跳过不匹配的分片
- 基于 min/max 值范围
8.2 分页优化
深度分页问题:
{
"from": 10000,
"size": 10
}
- 每个分片需要返回 10010 个结果
- 协调节点需要排序 10010 * N 个结果
解决方案:
1. Search After (推荐)
{
"size": 10,
"search_after": [1234567890, "doc#123"],
"sort": [
{ "timestamp": "asc" },
{ "_id": "asc" }
]
}
2. Scroll API
# 创建 Scroll
POST /index/_search?scroll=1m
{
"size": 1000,
"query": { "match_all": {} }
}
# 继续 Scroll
POST /_search/scroll
{
"scroll": "1m",
"scroll_id": "..."
}
8.3 聚合优化
1. 限制聚合基数
{
"aggs": {
"categories": {
"terms": {
"field": "category",
"size": 10 // 限制返回桶数量
}
}
}
}
2. 使用近似聚合
{
"aggs": {
"unique_users": {
"cardinality": {
"field": "user_id",
"precision_threshold": 1000 // 近似算法
}
}
}
}
9. 监控指标
搜索性能指标
| 指标 | 说明 | 建议值 |
|---|---|---|
| search.query_total | 查询总数 | 监控趋势 |
| search.query_time_in_millis | 查询总耗时 | - |
| search.query_current | 当前查询数 | < 线程池大小 |
| search.fetch_total | Fetch 总数 | - |
| search.fetch_time_in_millis | Fetch 总耗时 | - |
| search.scroll_total | Scroll 总数 | - |
| search.open_contexts | 打开的搜索上下文数 | < 500 |
线程池指标
GET /_cat/thread_pool/search?v&h=node_name,name,active,queue,rejected
| 指标 | 说明 | 建议值 |
|---|---|---|
| active | 活跃线程数 | < size |
| queue | 队列长度 | < queue_size |
| rejected | 拒绝次数 | 0 |
10. 常见问题
10.1 搜索慢
排查步骤:
- 检查慢查询日志
- 使用 Profile API 分析
- 检查分片数量
- 检查聚合复杂度
Profile API:
{
"profile": true,
"query": { ... }
}
10.2 深度分页
问题: from + size > 10000
解决:
- 使用 Search After
- 使用 Scroll (导出场景)
- 增大
max_result_window(不推荐)
10.3 搜索上下文过多
问题: Too many scroll contexts
解决:
- 及时清理 Scroll
- 减小 Scroll 超时时间
- 增大
search.max_open_scroll_context