Elasticsearch-03-搜索模块

本文档提供搜索模块的全面剖析,包括模块职责、架构设计、核心数据结构、对外API详细规格、关键流程时序图、性能优化和故障排查。


1. 模块职责

搜索模块是 Elasticsearch 的核心功能模块,负责分布式搜索请求的处理。主要职责包括:

  • 查询执行: 在 Lucene 上执行用户查询
  • 两阶段搜索: Query Phase(查询阶段) + Fetch Phase(获取阶段)
  • 结果聚合: 归并多个分片的搜索结果
  • 聚合计算: 执行聚合分析(Aggregations)
  • 高亮显示: 生成搜索结果高亮片段
  • 排序: 支持多字段排序和自定义排序
  • 分页: 支持 from/size, Scroll, Search After 分页

输入/输出

输入:

  • SearchRequest (查询请求)
  • Query DSL (查询语句)
  • Aggregations (聚合定义)
  • Sort (排序规则)

输出:

  • SearchResponse (搜索响应)
  • Hits (匹配的文档列表)
  • Aggregations (聚合结果)
  • Took (耗时统计)

上下游依赖

上游调用方:

  • Action Module (TransportSearchAction)
  • REST Layer (RestSearchAction)

下游被调用方:

  • Index Shard (读取数据)
  • Apache Lucene (执行查询)
  • Aggregation Module (计算聚合)

2. 整体服务架构图

flowchart TB
    subgraph Client["客户端层"]
        RESTClient[REST Client]
        JavaClient[Java Client]
        HTTPRequest[HTTP Request<br/>POST /index/_search]
    end

    subgraph RESTLayer["REST接口层 (协调节点)"]
        RestSearchAction[RestSearchAction<br/>解析HTTP请求]
        SearchRequestBuilder[SearchRequest构建<br/>解析Query DSL]
    end

    subgraph ActionLayer["Action传输层 (协调节点)"]
        TransportSearchAction[TransportSearchAction<br/>分布式协调器]
        SearchPhaseProvider[SearchPhaseProvider<br/>阶段提供者]
        SearchAsyncAction[SearchQueryThenFetch<br/>AsyncAction<br/>异步执行器]
    end

    subgraph Coordinator["协调阶段执行器 (协调节点)"]
        CanMatchPhase[CanMatchPreFilterPhase<br/>预过滤阶段]
        QueryPhaseCoord[QueryPhase协调<br/>并发查询所有分片]
        ReducePhase[ReducePhase<br/>SearchPhaseController<br/>结果归并]
        FetchPhaseCoord[FetchPhase协调<br/>并发获取文档]
    end

    subgraph TransportLayer["传输服务层"]
        SearchTransportService[SearchTransportService<br/>RPC通信]
        ConnectionMgr[Connection Manager<br/>节点连接管理]
    end

    subgraph DataNode1["数据节点 1"]
        SService1[SearchService]
        SContext1[SearchContext]
        QPExec1[QueryPhase.execute]
        FPExec1[FetchPhase.execute]
        Shard1[IndexShard]
        Lucene1[Lucene Searcher]
    end

    subgraph DataNode2["数据节点 2"]
        SService2[SearchService]
        SContext2[SearchContext]
        QPExec2[QueryPhase.execute]
        FPExec2[FetchPhase.execute]
        Shard2[IndexShard]
        Lucene2[Lucene Searcher]
    end

    subgraph DataNodeN["数据节点 N"]
        SServiceN[SearchService]
        SContextN[SearchContext]
        QPExecN[QueryPhase.execute]
        FPExecN[FetchPhase.execute]
        ShardN[IndexShard]
        LuceneN[Lucene Searcher]
    end

    RESTClient --> HTTPRequest
    JavaClient --> HTTPRequest
    HTTPRequest --> RestSearchAction

    RestSearchAction --> SearchRequestBuilder
    SearchRequestBuilder --> TransportSearchAction

    TransportSearchAction --> SearchPhaseProvider
    SearchPhaseProvider --> SearchAsyncAction
    SearchAsyncAction --> CanMatchPhase

    CanMatchPhase --> |过滤后的分片列表| QueryPhaseCoord

    QueryPhaseCoord --> SearchTransportService
    SearchTransportService --> ConnectionMgr

    ConnectionMgr --> |ShardSearchRequest| SService1
    ConnectionMgr --> |ShardSearchRequest| SService2
    ConnectionMgr --> |ShardSearchRequest| SServiceN

    SService1 --> SContext1 --> QPExec1 --> Shard1 --> Lucene1
    SService2 --> SContext2 --> QPExec2 --> Shard2 --> Lucene2
    SServiceN --> SContextN --> QPExecN --> ShardN --> LuceneN

    Lucene1 --> |QuerySearchResult| ReducePhase
    Lucene2 --> |QuerySearchResult| ReducePhase
    LuceneN --> |QuerySearchResult| ReducePhase

    ReducePhase --> |Top K docIds| FetchPhaseCoord
    FetchPhaseCoord --> SearchTransportService

    SearchTransportService --> |ShardFetchRequest| SService1
    SearchTransportService --> |ShardFetchRequest| SService2
    SearchTransportService --> |ShardFetchRequest| SServiceN

    SService1 --> FPExec1 --> Lucene1
    SService2 --> FPExec2 --> Lucene2
    SServiceN --> FPExecN --> LuceneN

    Lucene1 --> |FetchSearchResult| SearchAsyncAction
    Lucene2 --> |FetchSearchResult| SearchAsyncAction
    LuceneN --> |FetchSearchResult| SearchAsyncAction

    SearchAsyncAction --> |SearchResponse| RestSearchAction
    RestSearchAction --> |JSON Response| Client

    style RESTLayer fill:#E1F5FF
    style ActionLayer fill:#FFE1E1
    style Coordinator fill:#E1FFE1
    style TransportLayer fill:#F5E1FF
    style DataNode1 fill:#FFF4E1
    style DataNode2 fill:#FFF4E1
    style DataNodeN fill:#FFF4E1

架构说明

各层职责与交互

1. REST接口层 (RestSearchAction)

  • 职责: HTTP请求解析与响应格式化
  • 关键处理:
    • 解析URL参数 (index, routing, preference等)
    • 解析请求体 (Query DSL, 聚合, 排序等)
    • 构建 SearchRequest 对象
    • SearchResponse 格式化为JSON返回客户端
  • 入口路由:
    • GET/POST /_search - 全局搜索
    • GET/POST /{index}/_search - 指定索引搜索
  • 核心方法: prepareRequest() → 解析并提交到Action层

2. Action传输层 (TransportSearchAction)

  • 职责: 分布式协调与阶段编排
  • 关键处理:
    • 解析目标索引并计算涉及的分片
    • 构建 SearchShardIterator[] 分片路由迭代器
    • 根据搜索类型选择执行策略:
      • QUERY_THEN_FETCHSearchQueryThenFetchAsyncAction
      • DFS_QUERY_THEN_FETCHSearchDfsQueryThenFetchAsyncAction
    • 管理跨集群搜索 (CCS) 逻辑
    • 处理 Point-in-Time 和 Scroll 上下文
  • 核心方法: doExecute() → 创建并启动异步搜索

3. 阶段执行器 (SearchQueryThenFetchAsyncAction)

  • 职责: 异步协调各个搜索阶段
  • 阶段流程:
    CanMatch → Query → Reduce → Fetch → Response
    
  • 并发控制:
    • maxConcurrentShardRequests: 控制同时查询的分片数 (默认5)
    • 基于节点级别的流控: 避免单节点过载
    • 使用 Semaphore 实现背压控制
  • 失败重试: 自动在副本分片上重试失败的请求
  • 核心方法: start()executePhaseOnShard()

4. 传输服务层 (SearchTransportService)

  • 职责: RPC通信与请求路由
  • 注册的处理器:
    • QUERY_ACTION_NAME: 处理Query Phase请求
    • FETCH_ID_ACTION_NAME: 处理Fetch Phase请求
    • DFS_ACTION_NAME: 处理DFS Phase请求
    • FREE_CONTEXT_ACTION_NAME: 清理搜索上下文
  • 连接管理: 维护与各数据节点的长连接
  • 序列化: 使用 StreamInput/StreamOutput 序列化请求和响应

5. 数据节点服务层 (SearchService)

  • 职责: 分片级搜索执行
  • 生命周期管理:
    • 创建 ReaderContext: 持有 IndexReader 引用
    • 创建 SearchContext: 封装单次搜索的所有状态
    • 引用计数管理: 确保 Reader 不会过早关闭
    • 超时管理: 通过 ScrollContext 管理长期上下文
  • 线程池: 使用 search 线程池异步执行查询
  • 核心方法:
    • executeQueryPhase(): 执行Query Phase
    • executeFetchPhase(): 执行Fetch Phase
    • createContext(): 创建搜索上下文

搜索阶段详解

阶段 0: CanMatch Phase (预过滤阶段)

  • 目的: 快速过滤不包含匹配文档的分片
  • 触发条件: 分片数 > pre_filter_shard_size (默认128)
  • 检查方式:
    • 检查分片的 min/max 值范围 (基于 PointValues)
    • 在协调节点重写查询 (coordinator rewrite)
    • 检查日期范围、数值范围是否与分片重叠
  • 优化效果: 可减少 50%+ 的分片查询
  • 适用场景: 带范围过滤的查询 (range, date_range)

阶段 1: Query Phase (查询阶段)

  • 目的: 在每个分片上执行查询,返回 Top K 文档的 ID 和分数
  • 执行流程:
    1. 构建 Lucene Query 对象
    2. 创建 Collector (TopDocsCollector + AggregationCollector)
    3. 执行 IndexSearcher.search(query, collector)
    4. 收集 Top (size + from) 个文档的 (docId, score, sortValues)
    5. 执行聚合计算
  • 输出: QuerySearchResult
    • docIds[]: 文档内部ID数组
    • scores[]: 文档分数数组
    • sortValues[][]: 排序字段值 (如果有排序)
    • totalHits: 总命中数
    • aggregations: 分片级聚合结果
  • 并发: 所有分片并行执行
  • 轻量级: 仅返回文档 ID 和排序值,不返回文档内容

阶段 2: Reduce Phase (归并阶段)

  • 目的: 归并所有分片的 QueryResult,选出全局 Top K
  • 操作流程:
    1. 归并排序: 使用优先队列归并所有分片的 Top K
      • 算法: K路归并排序
      • 复杂度: O(N * K * log K), N=分片数
    2. 分数重排序: 按分数/排序字段重新排序
    3. 聚合归并: 合并所有分片的聚合结果
      • 对于 sum/avg/max/min: 直接合并
      • 对于 terms: 重新聚合各分片的 Top Terms
    4. 选出全局 Top K: 跳过 from,选择 size 个结果
  • 输出: ReducedQueryPhase
    • 全局 Top K 的 (shardIndex, docId) 映射
    • 归并后的聚合结果
    • 总命中数和最高分数
  • 执行位置: 协调节点

阶段 3: Fetch Phase (获取阶段)

  • 目的: 获取 Top K 文档的完整内容
  • 输入: 全局 Top K 的 (shardId, docId) 列表
  • 执行流程:
    1. 按分片分组 docIds: {shard1: [docId1, docId5], shard2: [docId2]}
    2. 并发向各分片发送 ShardFetchRequest
    3. 在分片上执行:
      • 通过 docId 读取 StoredFields
      • 提取 _source 字段
      • 生成高亮片段 (如果需要)
      • 提取 doc_values 字段 (如果需要)
    4. 返回 FetchSearchResult
  • 输出: FetchSearchResult
    • documents[]: 完整文档数组
    • highlightFields: 高亮字段映射
    • fields: 指定字段值
  • 并发: 仅需要的分片并行执行
  • 重量级: 需要读取完整文档内容,网络传输量大

两阶段设计优势

方面 Query Phase Fetch Phase
网络传输 小(仅 docId+score) 大(完整文档)
计算开销 高(查询+聚合) 低(仅读取)
并发度 所有分片 仅需要的分片
优化空间 缓存查询结果 缓存文档内容

为什么需要两阶段?

  • 减少网络传输: Query Phase 仅传输 docId,Fetch Phase 仅传输需要的文档
  • 负载均衡: 每个副本可以处理 Query Phase,减轻主分片压力
  • 提前终止: 部分查询可以在 Query Phase 提前终止

3. 核心组件

3.1 SearchService

职责: 管理搜索请求的生命周期

public class SearchService extends AbstractLifecycleComponent {
    // 线程池
    private final ThreadPool threadPool;

    // 搜索上下文管理
    private final ConcurrentMapLong<ReaderContext> activeReaders;

    // Fetch 阶段处理
    private final FetchPhase fetchPhase;

    // 执行 Query Phase
    public void executeQueryPhase(
        ShardSearchRequest request,
        SearchShardTask task,
        ActionListener<SearchPhaseResult> listener
    );

    // 执行 Fetch Phase
    public void executeFetchPhase(
        ShardFetchRequest request,
        SearchShardTask task,
        ActionListener<FetchSearchResult> listener
    );

    // 创建搜索上下文
    private SearchContext createContext(
        ReaderContext reader,
        ShardSearchRequest request,
        SearchShardTask task
    );
}

3.2 SearchContext

职责: 封装单个分片的搜索上下文

public interface SearchContext extends Releasable {
    // 查询信息
    SearchShardTarget shardTarget();
    ShardSearchRequest request();
    Query query();

    // 搜索器
    ContextIndexSearcher searcher();

    // 结果
    QuerySearchResult queryResult();
    FetchSearchResult fetchResult();

    // 聚合上下文
    SearchContextAggregations aggregations();

    // 排序
    SortAndFormats sort();

    // 分页
    int from();
    int size();

    // 超时
    TimeValue timeout();
    boolean isTimedOut();
}

3.3 QueryPhase

职责: 执行查询阶段

public class QueryPhase {

    public static void execute(SearchContext searchContext) {
        // 1. 预处理聚合
        AggregationPhase.preProcess(searchContext);

        // 2. 构建 Collector
        List<Collector> collectors = new ArrayList<>();
        collectors.add(new TopDocsCollector());
        if (searchContext.aggregations() != null) {
            collectors.add(searchContext.aggregations().collector());
        }

        // 3. 执行搜索
        MultiCollector multiCollector = MultiCollector.wrap(collectors);
        searchContext.searcher().search(
            searchContext.query(),
            multiCollector
        );

        // 4. 后处理
        RescorePhase.execute(searchContext);  // Rescore
        SuggestPhase.execute(searchContext);  // Suggest
    }
}

3.4 FetchPhase

职责: 执行获取阶段

public final class FetchPhase {

    public void execute(
        SearchContext context,
        int[] docIdsToLoad
    ) {
        // 1. 加载文档
        SearchHits hits = buildSearchHits(
            context,
            docIdsToLoad
        );

        // 2. 执行 SubPhases
        for (FetchSubPhase subPhase : fetchSubPhases) {
            subPhase.process(context, hits);
        }

        // 3. 设置结果
        context.fetchResult().shardResult(hits);
    }

    private SearchHits buildSearchHits(
        SearchContext context,
        int[] docIdsToLoad
    ) {
        SearchHit[] hits = new SearchHit[docIdsToLoad.length];

        for (int i = 0; i < docIdsToLoad.length; i++) {
            int docId = docIdsToLoad[i];

            // 读取文档
            Document doc = context.searcher().doc(docId);

            // 构建 SearchHit
            hits[i] = new SearchHit(docId);
            hits[i].sourceRef(doc.getSource());
            hits[i].score(scores[i]);
        }

        return new SearchHits(hits, totalHits, maxScore);
    }
}

4. 搜索类型

4.1 QUERY_THEN_FETCH (默认)

1. Query Phase: 所有分片并行查询,返回 Top K docIds
2. Reduce: 协调节点归并,选出全局 Top K
3. Fetch Phase: 获取 Top K 文档内容

特点:

  • 默认搜索类型
  • 最少网络传输
  • 适用于大多数场景

4.2 DFS_QUERY_THEN_FETCH

1. DFS Phase: 收集所有分片的词频统计
2. Query Phase: 基于全局词频计算分数
3. Reduce: 归并结果
4. Fetch Phase: 获取文档

特点:

  • 更准确的 TF-IDF 分数
  • 额外的网络往返
  • 仅用于分数精确度要求高的场景

5. 核心数据结构

5.1 SearchContext 与相关类

classDiagram
    class SearchContext {
        <<abstract>>
        +ShardSearchContextId id
        +SearchType searchType
        +SearchShardTarget shardTarget
        +ShardSearchRequest request
        +ReaderContext readerContext
        +Query query
        +ParsedQuery postFilter
        +SortAndFormats sort
        +int from
        +int size
        +int trackTotalHitsUpTo
        +int terminateAfter
        +TimeValue timeout
        +SearchContextAggregations aggregations
        +SearchHighlightContext highlight
        +FetchSourceContext fetchSourceContext
        +StoredFieldsContext storedFieldsContext
        +QuerySearchResult queryResult
        +FetchSearchResult fetchResult
        +preProcess()
        +buildFilteredQuery(Query)
        +close()
    }

    class ShardSearchContextImpl {
        -ReaderContext readerContext
        -ShardSearchRequest request
        -SearchShardTarget shardTarget
        -ContextIndexSearcher searcher
        -SearchExecutionContext searchExecutionContext
        -QuerySearchResult queryResult
        -FetchSearchResult fetchResult
        -ScrollContext scrollContext
        -int from
        -int size
        -Query query
        -SortAndFormats sort
        +execute()
    }

    class ReaderContext {
        -ShardSearchContextId id
        -IndexService indexService
        -IndexShard indexShard
        -Engine.SearcherSupplier searcherSupplier
        -AtomicLong keepAlive
        -AtomicLong lastAccessTime
        -boolean singleSession
        -List~Releasable~ onCloses
        -Map~String,Object~ context
        +incRef()
        +tryIncRef()
        +decRef()
        +updateLastAccessTime()
        +close()
    }

    class ContextIndexSearcher {
        -Query cancellable
        -Runnable checkCancelled
        -CircuitBreaker circuitBreaker
        +search(Query, Collector)
        +searchAfter(ScoreDoc, Query, int)
        +searchLeaf(LeafReaderContext, Weight, Collector)
    }

    SearchContext <|-- ShardSearchContextImpl
    ShardSearchContextImpl --> ReaderContext : holds
    ShardSearchContextImpl --> ContextIndexSearcher : uses
    ReaderContext --> IndexShard : references

SearchContext

职责: 搜索执行的核心上下文基类

关键字段:

字段 类型 说明
id ShardSearchContextId 上下文唯一标识符
searchType SearchType 搜索类型 (QUERY_THEN_FETCH / DFS_QUERY_THEN_FETCH)
shardTarget SearchShardTarget 目标分片信息 (index, shard, node)
request ShardSearchRequest 分片搜索请求
query Query Lucene 查询对象
postFilter ParsedQuery 后置过滤器 (在聚合后执行)
sort SortAndFormats 排序规则
from int 分页起始位置
size int 返回结果数量
trackTotalHitsUpTo int 统计总数的上限 (默认10000)
terminateAfter int 提前终止的文档数 (0表示不终止)
timeout TimeValue 查询超时时间
aggregations SearchContextAggregations 聚合上下文
highlight SearchHighlightContext 高亮上下文
fetchSourceContext FetchSourceContext _source 过滤配置
queryResult QuerySearchResult Query Phase 结果
fetchResult FetchSearchResult Fetch Phase 结果

生命周期:

创建 → preProcess() → Query Phase → Fetch Phase → close()

ReaderContext

职责: 管理分片的 IndexReader 生命周期和引用计数

关键字段:

字段 类型 说明
id ShardSearchContextId Context 唯一 ID
indexShard IndexShard 所属分片
searcherSupplier Engine.SearcherSupplier Searcher 提供者
keepAlive AtomicLong 保持存活时间 (毫秒)
lastAccessTime AtomicLong 最后访问时间
singleSession boolean 是否单次查询 (true: 一次性, false: Scroll/PIT)
refCounted AbstractRefCounted 引用计数器

引用计数:

// 增加引用
boolean success = readerContext.tryIncRef();

// 使用
try {
    Engine.Searcher searcher = readerContext.acquireSearcher("search");
    // ... 执行搜索
} finally {
    // 减少引用
    readerContext.decRef();
}

5.2 SearchRequest & SearchResponse

classDiagram
    class SearchRequest {
        -String[] indices
        -SearchType searchType
        -SearchSourceBuilder source
        -String routing
        -String preference
        -Boolean requestCache
        -Boolean allowPartialSearchResults
        -TimeValue scrollKeepAlive
        -int maxConcurrentShardRequests
        -Integer preFilterShardSize
        -int batchedReduceSize
        -IndicesOptions indicesOptions
        +indices(String[])
        +source(SearchSourceBuilder)
        +validate()
    }

    class SearchSourceBuilder {
        -QueryBuilder query
        -int from
        -int size
        -List~SortBuilder~ sorts
        -List~AggregationBuilder~ aggregations
        -HighlightBuilder highlight
        -FetchSourceContext fetchSource
        -SearchAfterBuilder searchAfter
        -TimeValue timeout
        -boolean explain
        -boolean version
        -Integer trackTotalHitsUpTo
        -Integer terminateAfter
        +query(QueryBuilder)
        +from(int)
        +size(int)
        +sort(SortBuilder)
        +aggregation(AggregationBuilder)
        +toXContent(XContentBuilder)
    }

    class SearchResponse {
        -SearchHits hits
        -InternalAggregations aggregations
        -Suggest suggest
        -SearchProfileResults profileResults
        -boolean timedOut
        -Boolean terminatedEarly
        -int numReducePhases
        -String scrollId
        -BytesReference pointInTimeId
        -int totalShards
        -int successfulShards
        -int skippedShards
        -ShardSearchFailure[] shardFailures
        -Clusters clusters
        -long tookInMillis
        +getHits()
        +getAggregations()
        +getTook()
    }

    class SearchHits {
        -TotalHits totalHits
        -float maxScore
        -SearchHit[] hits
        +iterator()
    }

    class SearchHit {
        -String id
        -float score
        -Map~String,Object~ sourceAsMap
        -String index
        -long version
        -long seqNo
        -long primaryTerm
        -Object[] sortValues
        -Map~String,HighlightField~ highlightFields
        -Map~String,SearchHitField~ fields
        +getId()
        +getScore()
        +getSourceAsMap()
    }

    SearchRequest --> SearchSourceBuilder : contains
    SearchResponse --> SearchHits : contains
    SearchHits --> SearchHit : contains many

6. 对外 API

6.1 API 清单

搜索模块对外提供以下核心 API:

API 名称 HTTP 方法 路径 幂等性 说明
Standard Search GET/POST /{index}/_search 标准搜索
Scroll Search POST /{index}/_search?scroll=1m Scroll 搜索
Search Scroll POST /_search/scroll 继续 Scroll
Clear Scroll DELETE /_search/scroll 清理 Scroll
Point-in-Time POST /{index}/_pit?keep_alive=1m 打开 PIT
Close PIT DELETE /_pit 关闭 PIT
Multi-Search POST /_msearch 批量搜索
Count GET/POST /{index}/_count 计数查询

6.2 Standard Search API

基本信息

  • 名称: _search
  • 协议与方法: HTTP GET/POST /{index}/_search
  • 幂等性: 是(查询操作)
  • 入口 Action: TransportSearchAction

请求结构体

public class SearchRequest extends LegacyActionRequest {
    // 目标索引
    private String[] indices = Strings.EMPTY_ARRAY;

    // 搜索类型 (QUERY_THEN_FETCH, DFS_QUERY_THEN_FETCH)
    private SearchType searchType = SearchType.DEFAULT;

    // 搜索查询体
    private SearchSourceBuilder source;

    // 路由控制
    private String routing;

    // 偏好设置(_local, _primary, _replica, custom string)
    private String preference;

    // 是否使用请求缓存
    private Boolean requestCache;

    // 是否允许部分结果
    private Boolean allowPartialSearchResults;

    // Scroll 保持时间
    private TimeValue scrollKeepAlive;

    // 并发分片请求数量
    private int maxConcurrentShardRequests = 5;

    // 预过滤分片大小阈值
    private Integer preFilterShardSize = 128;

    // 批量归并大小
    private int batchedReduceSize = 512;

    // 索引选项
    private IndicesOptions indicesOptions;
}

请求字段表

字段 类型 必填 默认值 约束 说明
indices String[] [] - 目标索引,空表示所有索引
searchType SearchType QUERY_THEN_FETCH QUERY_THEN_FETCH / DFS_QUERY_THEN_FETCH 搜索类型
source SearchSourceBuilder - - 查询、聚合、排序、分页等配置
routing String null - 路由值,控制分片选择
preference String null _local/_primary/_replica/custom 分片偏好
requestCache Boolean null - 是否缓存结果 (size=0 时默认true)
allowPartialSearchResults Boolean true - 部分分片失败时是否返回结果
scrollKeepAlive TimeValue null - Scroll 上下文保持时间
maxConcurrentShardRequests int 5 >0 并发查询分片数量
preFilterShardSize Integer 128 >0 启用 CanMatch 预过滤的阈值
batchedReduceSize int 512 >0 批量归并结果的批次大小

SearchSourceBuilder 核心字段

public class SearchSourceBuilder {
    private QueryBuilder query;              // 查询条件
    private int from = 0;                     // 分页起始位置
    private int size = 10;                    // 返回结果数量
    private List<SortBuilder<?>> sorts;       // 排序规则
    private List<AggregationBuilder> aggregations; // 聚合
    private List<String> storedFields;        // 返回的 stored 字段
    private List<String> docValueFields;      // 返回的 doc_value 字段
    private FetchSourceContext fetchSource;   // _source 过滤
    private HighlightBuilder highlight;       // 高亮
    private SearchAfterBuilder searchAfter;   // search_after 分页
    private TimeValue timeout;                // 查询超时时间
    private boolean explain;                  // 是否返回评分解释
    private boolean version;                  // 是否返回版本号
    private boolean trackScores;              // 是否计算分数
    private Integer trackTotalHitsUpTo;       // 统计总数上限
    private Integer terminateAfter;           // 提前终止文档数
}

响应结构体

public class SearchResponse extends ActionResponse {
    // 搜索结果
    private final SearchHits hits;

    // 聚合结果
    private final InternalAggregations aggregations;

    // 搜索建议结果
    private final Suggest suggest;

    // 性能分析结果
    private final SearchProfileResults profileResults;

    // 是否超时
    private final boolean timedOut;

    // 是否提前终止
    private final Boolean terminatedEarly;

    // 归并阶段数量
    private final int numReducePhases;

    // Scroll ID
    private final String scrollId;

    // Point-in-Time ID
    private final BytesReference pointInTimeId;

    // 分片统计
    private final int totalShards;
    private final int successfulShards;
    private final int skippedShards;
    private final ShardSearchFailure[] shardFailures;

    // 集群统计 (跨集群搜索)
    private final Clusters clusters;

    // 耗时
    private final long tookInMillis;
}

响应字段表

字段 类型 必填 说明
hits SearchHits 搜索命中的文档
aggregations Aggregations 聚合结果
suggest Suggest 搜索建议
profileResults ProfileResults 性能分析结果 (profile=true 时)
timedOut boolean 是否超时
terminatedEarly Boolean 是否提前终止
numReducePhases int 归并阶段数量 (分布式搜索)
scrollId String Scroll ID (scroll 请求时)
pointInTimeId BytesReference Point-in-Time ID (PIT 请求时)
totalShards int 总分片数
successfulShards int 成功分片数
skippedShards int 跳过分片数 (CanMatch 过滤)
shardFailures ShardSearchFailure[] 分片失败详情
tookInMillis long 查询耗时(毫秒)

入口函数与核心代码

// TransportSearchAction - 搜索操作的传输层动作
public class TransportSearchAction extends HandledTransportAction<SearchRequest, SearchResponse> {

    @Override
    protected void doExecute(Task task, SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
        // 1. 解析和重写请求
        final long relativeStartNanos = System.nanoTime();
        final SearchTimeProvider timeProvider = new SearchTimeProvider(
            searchRequest.getOrCreateAbsoluteStartMillis(),
            relativeStartNanos,
            System::nanoTime
        );

        // 2. 解析目标索引
        final ClusterState clusterState = clusterService.state();
        final Index[] concreteIndices = indexNameExpressionResolver.concreteIndices(
            clusterState,
            searchRequest.indicesOptions(),
            searchRequest.indices()
        );

        // 3. 计算目标分片
        final Map<String, AliasFilter> aliasFilter = buildPerIndexAliasFilter(
            searchRequest,
            clusterState,
            concreteIndices,
            remoteClusterIndices
        );

        // 4. 创建搜索任务上下文
        final SearchTask searchTask = (SearchTask) task;
        final SearchShardIterator[] shardIterators = buildShardIterators(
            clusterState,
            concreteIndices,
            aliasFilter,
            searchRequest.routing(),
            searchRequest.preference()
        );

        // 5. 执行搜索
        searchAsyncAction(
            searchTask,
            searchRequest,
            shardIterators,
            timeProvider,
            listener
        ).start();
    }

    private void searchAsyncAction(
        SearchTask task,
        SearchRequest searchRequest,
        SearchShardIterator[] shardIterators,
        SearchTimeProvider timeProvider,
        ActionListener<SearchResponse> listener
    ) {
        // 创建异步搜索执行器
        SearchQueryThenFetchAsyncAction action = new SearchQueryThenFetchAsyncAction(
            task,
            searchRequest,
            shardIterators,
            timeProvider,
            searchPhaseController,
            executor,
            listener
        );
        return action;
    }
}

异常与回退

分片失败处理

  1. 部分失败: allowPartialSearchResults=true 时返回部分结果
  2. 全部失败: 抛出 SearchPhaseExecutionException
  3. 重试: 自动在副本分片上重试
  4. 超时: 达到 timeout 后返回已收集的结果

错误类型

错误 HTTP状态码 说明
NoShardAvailableActionException 503 无可用分片
SearchPhaseExecutionException 503 搜索阶段失败
QueryShardException 400 查询语法错误
CircuitBreakingException 429 断路器触发

性能要点与最佳实践

性能优化

  1. 减少数据传输

    • 使用 _source filtering 仅返回需要的字段
    • 使用 stored_fieldsdocvalue_fields
  2. 分页优化

    • 深度分页使用 search_after 而非 from/size
    • 避免大 from 值 (默认 max_result_window=10000)
  3. 并发控制

    • 调整 max_concurrent_shard_requests (默认5)
    • 大索引增大并发,小索引减小并发
  4. 预过滤

    • 启用 pre_filter_shard_size (默认128)
    • 自动跳过不匹配的分片
  5. 缓存利用

    • 对于 size=0 的聚合查询启用 request_cache
    • Filter 查询自动缓存

最佳实践

// 1. 高效分页
{
  "size": 10,
  "sort": [{"timestamp": "desc"}, {"_id": "asc"}],
  "search_after": [1609459200000, "doc#10"]
}

// 2. _source 过滤
{
  "_source": ["title", "author", "publish_date"],
  "query": {"match": {"title": "elasticsearch"}}
}

// 3. 仅统计数量
{
  "size": 0,
  "track_total_hits": true,
  "query": {"match_all": {}}
}

// 4. 使用 Filter (可缓存)
{
  "query": {
    "bool": {
      "filter": [
        {"term": {"status": "published"}},
        {"range": {"date": {"gte": "2024-01-01"}}}
      ]
    }
  }
}

7. 模块间详细交互与调用链路

7.1 完整调用链路图

sequenceDiagram
    autonumber
    participant Client as 客户端
    participant REST as RestSearchAction<br/>(REST层)
    participant Transport as TransportSearchAction<br/>(Action层)
    participant AsyncAction as SearchQueryThenFetch<br/>AsyncAction
    participant TransportSvc as SearchTransportService<br/>(Transport层)
    participant DataNode as SearchService<br/>(数据节点)
    participant Context as SearchContext
    participant QueryPhase as QueryPhase.execute
    participant Lucene as Lucene IndexSearcher
    participant Controller as SearchPhaseController<br/>(归并)
    participant FetchPhase as FetchPhase.execute

    Note over Client,Lucene: REST层处理
    Client->>REST: HTTP POST /index/_search<br/>{query, size, from}
    REST->>REST: 解析URL参数 (index, routing)
    REST->>REST: 解析JSON Body → SearchSourceBuilder
    REST->>REST: 构建SearchRequest对象
    REST->>Transport: client.execute(TransportSearchAction.TYPE, request)

    Note over Transport,AsyncAction: Action层协调
    Transport->>Transport: doExecute(task, searchRequest, listener)
    Transport->>Transport: 解析目标索引 → concreteIndices[]
    Transport->>Transport: 计算分片路由 → SearchShardIterator[]
    Transport->>Transport: 构建别名过滤器 → aliasFilter
    Transport->>AsyncAction: new SearchQueryThenFetchAsyncAction(...)<br/>.start()

    Note over AsyncAction,TransportSvc: 异步执行器启动
    AsyncAction->>AsyncAction: start() → doRun()
    AsyncAction->>AsyncAction: 构建分片索引映射 shardIndexMap

    loop 每个分片
        AsyncAction->>AsyncAction: buildShardSearchRequest(shardIt)
        AsyncAction->>AsyncAction: 应用节点级并发控制 (maxConcurrent)
        AsyncAction->>TransportSvc: sendExecuteQuery(connection, request)
    end

    Note over TransportSvc,DataNode: 传输层 RPC 调用
    TransportSvc->>DataNode: [RPC] ShardSearchRequest

    Note over DataNode,Lucene: 数据节点 Query Phase
    DataNode->>DataNode: executeQueryPhase(request, task, listener)
    DataNode->>DataNode: 获取 IndexShard
    DataNode->>DataNode: rewriteAndFetchShardRequest()<br/>(查询重写)
    DataNode->>DataNode: 提交到 search 线程池

    DataNode->>Context: createContext(readerContext, request)
    Context->>Context: 创建 DefaultSearchContext
    Context->>Context: parseSource() → 解析查询/聚合/排序
    Context->>Context: preProcess() → 前置处理

    DataNode->>QueryPhase: QueryPhase.execute(searchContext)
    QueryPhase->>QueryPhase: AggregationPhase.preProcess()
    QueryPhase->>QueryPhase: 构建 QueryPhaseCollectorManager
    QueryPhase->>QueryPhase: 创建 TopDocsCollector + AggsCollector
    QueryPhase->>Lucene: searcher.search(query, collectorManager)
    Lucene->>Lucene: 执行 Lucene 查询
    Lucene->>Lucene: 收集 Top (size+from) docs
    Lucene-->>QueryPhase: QueryPhaseResult<br/>(docIds, scores, sortValues)
    QueryPhase->>QueryPhase: RescorePhase.execute() (如果有)
    QueryPhase->>QueryPhase: SuggestPhase.execute() (如果有)
    QueryPhase-->>DataNode: queryResult
    DataNode-->>TransportSvc: QuerySearchResult
    TransportSvc-->>AsyncAction: QuerySearchResult

    Note over AsyncAction,Controller: 协调节点归并
    AsyncAction->>AsyncAction: 收集所有分片的 QuerySearchResult
    AsyncAction->>Controller: getNextPhase() → 触发 Reduce
    Controller->>Controller: reducedQueryPhase(queryResults)
    Controller->>Controller: 归并排序所有分片 Top K
    Controller->>Controller: 归并聚合结果 (Aggregations.reduce)
    Controller->>Controller: 选出全局 Top size
    Controller-->>AsyncAction: ReducedQueryPhase<br/>(全局 Top K docIds by shard)

    Note over AsyncAction,FetchPhase: Fetch Phase
    AsyncAction->>AsyncAction: 构建 FetchSearchPhase
    AsyncAction->>AsyncAction: 按分片分组 docIdsToLoad[]

    loop 每个需要fetch的分片
        AsyncAction->>TransportSvc: sendExecuteFetch(connection, fetchRequest)
        TransportSvc->>DataNode: [RPC] ShardFetchRequest<br/>(contextId, docIds[])
        DataNode->>DataNode: executeFetchPhase(request, task, listener)
        DataNode->>DataNode: 查找现有 ReaderContext
        DataNode->>FetchPhase: FetchPhase.execute(context, docIds)
        FetchPhase->>FetchPhase: buildSearchHits(context, docIds)

        loop 每个docId
            FetchPhase->>Lucene: searcher.doc(docId) → 读取 StoredFields
            FetchPhase->>FetchPhase: 提取 _source
            FetchPhase->>FetchPhase: 执行 FetchSubPhases<br/>(highlight, fields, etc)
        end

        FetchPhase-->>DataNode: FetchSearchResult<br/>(SearchHit[])
        DataNode-->>TransportSvc: FetchSearchResult
        TransportSvc-->>AsyncAction: FetchSearchResult
    end

    Note over AsyncAction,REST: 构建最终响应
    AsyncAction->>Controller: merge(reducedQueryPhase, fetchResults)
    Controller->>Controller: 合并文档内容和聚合结果
    Controller-->>AsyncAction: SearchResponseSections
    AsyncAction->>AsyncAction: 构建 SearchResponse
    AsyncAction-->>Transport: SearchResponse
    Transport-->>REST: SearchResponse
    REST->>REST: 格式化为 JSON
    REST-->>Client: HTTP 200 OK<br/>{hits, aggregations, took}

7.2 关键代码调用链路详解

7.2.1 REST层 → Action层

RestSearchAction.prepareRequest()

关键步骤:
1. 解析 URL 参数: indices, routing, preference, scroll
2. 解析请求体: withContentOrSourceParamParserOrNull()
3. 调用 parseSearchRequest() 解析 Query DSL
4. 构建 SearchRequest 对象
5. 执行: client.execute(TransportSearchAction.TYPE, searchRequest, listener)

核心代码路径:

RestSearchAction.prepareRequest()
  → parseSearchRequest(searchRequest, request, parser)
    → searchRequest.source().parseXContent(parser) // 解析Query DSL
  → RestCancellableNodeClient.execute(TransportSearchAction.TYPE, searchRequest)
    → TransportSearchAction.doExecute()

7.2.2 TransportSearchAction协调流程

TransportSearchAction.doExecute()

关键步骤:
1. 解析目标索引: indexNameExpressionResolver.concreteIndices()
2. 计算分片路由: operationRouting.searchShards()
3. 构建别名过滤器: buildPerIndexAliasFilter()
4. 创建SearchShardIterator[]: buildShardIterators()
5. 决定搜索类型并创建AsyncAction:
   - QUERY_THEN_FETCH  SearchQueryThenFetchAsyncAction
   - DFS_QUERY_THEN_FETCH  SearchDfsQueryThenFetchAsyncAction
6. 启动异步执行: searchAsyncAction.start()

分片路由计算:

operationRouting.searchShards():
- 根据 routing 参数选择分片
- 根据 preference 选择主分片或副本分片
  - _primary: 总是使用主分片
  - _replica: 总是使用副本分片
  - _local: 优先使用本地分片
  - _only_local: 仅使用本地分片
- 构建 SearchShardIterator: 包含主分片和所有副本的迭代器

7.2.3 SearchQueryThenFetchAsyncAction执行流程

AbstractSearchAsyncAction.start() → doRun()

关键步骤:
1. 检查是否需要 CanMatch 预过滤:
   - 条件: preFilterSearchShards && 分片数 > threshold
   - 执行 CanMatchPreFilterSearchPhase
2. 如果不需要预过滤,直接执行 Query Phase:
   - 遍历所有 SearchShardIterator
   - 对每个分片调用 executePhaseOnShard()
3. 并发控制:
   - 使用 pendingExecutionsPerNode 限制每节点并发
   - 默认 maxConcurrentShardRequests = 5

并发控制机制:

// 每个节点维护一个待执行队列和 Semaphore
class PendingExecutions {
    private final Semaphore permits; // 最多maxConcurrent个许可
    private final Queue<Runnable> queue; // 待执行队列

    void tryExecute(Runnable task) {
        if (permits.tryAcquire()) {
            task.run(); // 立即执行
        } else {
            queue.add(task); // 加入队列
        }
    }

    void finishOperation() {
        permits.release();
        Runnable next = queue.poll();
        if (next != null) next.run(); // 执行下一个
    }
}

Query Phase 发送请求:

executePhaseOnShard():
1. 构建 ShardSearchRequest
2. 通过 SearchTransportService 发送 RPC 请求
3. 注册异步回调监听器 SearchActionListener

7.2.4 数据节点 SearchService 执行

SearchService.executeQueryPhase()

关键步骤:
1. 获取或创建 ReaderContext:
   - 如果是首次查询: 创建新的 ReaderContext
   - 如果是 Scroll/PIT: 复用现有 ReaderContext
2. 重写查询: rewriteAndFetchShardRequest()
   - 在分片上下文重写 Query
   - 优化查询结构
3. 提交到 search 线程池:
   - 使用 runAsync(executor, task)
   - executor = threadPool.executor(Names.SEARCH)
4. 在线程池中执行:
   - 创建 SearchContext
   - 调用 QueryPhase.execute(searchContext)
   - 返回 QuerySearchResult

SearchContext 创建:

createContext():
1. 创建 DefaultSearchContext
2. 解析 SearchSource:
   - parseSource(context, request.source())
   - 解析 query, aggregations, sort, highlight
3. 设置默认值 from=0, size=10
4. 执行 preProcess():
   - 构建 Lucene Query
   - 初始化聚合上下文

7.2.5 QueryPhase 执行

QueryPhase.execute()

关键步骤:
1. 预处理聚合: AggregationPhase.preProcess()
2. 构建 CollectorManager:
   - TopDocsCollector: 收集 Top K 文档
   - AggregationCollector: 执行聚合计算
   - 使用 MultiCollector 组合多个 Collector
3. 执行搜索: searcher.search(query, collectorManager)
4. 后处理:
   - RescorePhase.execute(): 重新打分
   - SuggestPhase.execute(): 搜索建议
5. 设置结果: queryResult.topDocs()

Collector 构建:

QueryPhaseCollectorManager.newCollector():
1. 如果启用 profile: 包装 InternalProfileCollector
2. 创建 TopDocsCollector:
   - 如果有排序: TopFieldCollector
   - 否则: TopScoreDocCollector
3. 创建 AggregationCollector (如果有聚合)
4. 组合成 QueryPhaseCollector

7.2.6 Reduce Phase 归并

SearchPhaseController.reducedQueryPhase()

关键步骤:
1. 收集所有非空的 QuerySearchResult
2. 归并排序:
   - 使用 sortDocs() 进行 K 路归并
   - 创建 ScoreDoc[] 优先队列
   - 按分数/排序字段排序
3. 归并聚合:
   - InternalAggregations.reduce()
   - 对每种聚合类型执行归并逻辑
4. 构建 ReducedQueryPhase:
   - 包含全局 Top K  (shardIndex, docId)
   - 归并后的聚合结果

sortDocs() 归并算法:

算法流程:
1. 创建 PriorityQueue<ScoreDoc>
2. 从每个分片的 TopDocs 取第一个文档放入队列
3. 循环:
   - 从队列取出最高分/最优排序值的文档
   - 从该分片取下一个文档放入队列
   - 重复直到收集够 size 个文档
4. 时间复杂度: O(N * K * log K)
   - N = 分片数
   - K = size + from

7.2.7 Fetch Phase 执行

FetchPhase.execute()

关键步骤:
1. 如果 docIds 为空: 返回空结果
2. 调用 buildSearchHits(context, docIds):
   - 遍历所有 docId
   - 对每个 docId:
     a. 读取 StoredFields: searcher.doc(docId)
     b. 提取 _source 字段
     c. 执行 FetchSubPhases
3. FetchSubPhases 包括:
   - FetchSourcePhase: 过滤 _source
   - HighlightPhase: 生成高亮片段
   - FetchFieldsPhase: 提取指定字段
   - FetchDocValuesPhase: 提取 doc_values

FetchSubPhase 处理:

getProcessors():
1. 遍历所有注册的 FetchSubPhase
2. 调用 fsp.getProcessor(context)
3. 如果返回非空 processor:
   - 包装成 ProfileProcessor (如果启用 profile)
   - 添加到 processors 列表
4. 返回 processors 列表

FetchSubPhase 执行顺序:

1. FetchSourcePhase: 提取和过滤 _source
2. FetchFieldsPhase: 提取 fields
3. FetchDocValuesPhase: 提取 docvalue_fields
4. HighlightPhase: 生成高亮
5. ExplainPhase: 生成评分解释 (如果 explain=true)
6. InnerHitsPhase: 处理嵌套/父子文档

8. 核心流程时序图

8.1 标准搜索流程 (QUERY_THEN_FETCH)

sequenceDiagram
    autonumber
    participant Client as 客户端
    participant Coord as 协调节点<br/>(TransportSearchAction)
    participant CanMatch as CanMatch Phase
    participant Shard1 as Shard 1
    participant Shard2 as Shard 2
    participant ShardN as Shard N
    participant Reducer as SearchPhaseController<br/>(结果归并)

    Client->>Coord: POST /index/_search<br/>{query, size, from}

    Coord->>Coord: 1. 解析请求<br/>构建 SearchRequest
    Coord->>Coord: 2. 计算目标分片<br/>(所有主分片或副本)

    Note over Coord,ShardN: CanMatch Phase - 预过滤

    Coord->>CanMatch: 3. 启动 CanMatch Phase

    par 并发检查所有分片
        CanMatch->>Shard1: CanMatchRequest<br/>(min/max range check)
        Shard1->>Shard1: 检查分片范围<br/>(MinMax统计)
        Shard1-->>CanMatch: canMatch=true
    and
        CanMatch->>Shard2: CanMatchRequest
        Shard2->>Shard2: 检查分片范围
        Shard2-->>CanMatch: canMatch=false<br/>(跳过此分片)
    and
        CanMatch->>ShardN: CanMatchRequest
        ShardN->>ShardN: 检查分片范围
        ShardN-->>CanMatch: canMatch=true
    end

    CanMatch->>Coord: 过滤后的分片列表<br/>[Shard1, ShardN]

    Note over Coord,ShardN: Query Phase - 查询阶段

    Coord->>Coord: 4. 启动 Query Phase

    par 并发查询所有匹配的分片
        Coord->>Shard1: ShardSearchRequest<br/>(query, size+from, aggs)

        Shard1->>Shard1: 5. 创建 SearchContext
        Shard1->>Shard1: 6. 构建 Lucene Query
        Shard1->>Shard1: 7. 执行搜索<br/>searcher.search(query, collector)
        Shard1->>Shard1: 8. 收集 Top (size+from) 文档
        Shard1->>Shard1: 9. 执行聚合<br/>(如果有)

        Shard1-->>Coord: QuerySearchResult<br/>(docIds[], scores[], aggs)
    and
        Coord->>ShardN: ShardSearchRequest

        ShardN->>ShardN: 创建 SearchContext
        ShardN->>ShardN: 构建 Query
        ShardN->>ShardN: 执行搜索
        ShardN->>ShardN: 收集 Top docs
        ShardN->>ShardN: 执行聚合

        ShardN-->>Coord: QuerySearchResult<br/>(docIds[], scores[], aggs)
    end

    Note over Coord,Reducer: Reduce Phase - 归并阶段

    Coord->>Reducer: 10. 归并所有分片结果

    Reducer->>Reducer: 11. 归并排序<br/>所有 QueryResult
    Reducer->>Reducer: 12. 选择全局 Top (size)<br/>跳过 from
    Reducer->>Reducer: 13. 归并聚合结果<br/>(如果有)

    Reducer-->>Coord: ReducedQueryPhase<br/>(Top K docIds by shard)

    Note over Coord,ShardN: Fetch Phase - 获取阶段

    Coord->>Coord: 14. 构建 Fetch 请求<br/>按分片分组 docIds

    par 并发获取文档内容
        Coord->>Shard1: ShardFetchRequest<br/>(docIds: [3, 7, 15])

        Shard1->>Shard1: 15. 读取文档内容<br/>searcher.doc(docId)
        Shard1->>Shard1: 16. 生成高亮<br/>(如果需要)
        Shard1->>Shard1: 17. 提取 _source

        Shard1-->>Coord: FetchSearchResult<br/>(documents[])
    and
        Coord->>ShardN: ShardFetchRequest<br/>(docIds: [2, 9])

        ShardN->>ShardN: 读取文档
        ShardN->>ShardN: 生成高亮
        ShardN->>ShardN: 提取 _source

        ShardN-->>Coord: FetchSearchResult<br/>(documents[])
    end

    Coord->>Coord: 18. 合并 Fetch 结果<br/>按分数顺序排列

    Coord-->>Client: SearchResponse<br/>{hits[], aggregations, took, total}

时序图说明

阶段划分

1. CanMatch Phase (预过滤)

  • 目的: 快速过滤掉不包含匹配文档的分片
  • 方法: 检查分片的 min/max 值范围
  • 优化: 可以减少 50%+ 的分片查询
  • 适用: 带范围过滤的查询 (range, date_range)

2. Query Phase (查询阶段)

  • 目的: 获取每个分片的 Top K 文档 ID 和分数
  • 输出: 轻量级结果 (docId + score + sort values)
  • 并发: 所有分片并行执行
  • 内存: 每个分片保留 size+from 个结果

3. Reduce Phase (归并阶段)

  • 目的: 合并所有分片的结果,选出全局 Top K
  • 算法: 归并排序 (O(NKlog K))
  • 聚合: 合并聚合结果
  • 输出: 全局 Top size 的 (shardId, docId)

4. Fetch Phase (获取阶段)

  • 目的: 获取完整文档内容
  • 输入: 全局 Top K 的 (shardId, docId)
  • 并发: 按需分片并行获取
  • 开销: 网络传输 + 文档读取

时序图功能说明

流程概述

该时序图展示了Elasticsearch标准搜索(QUERY_THEN_FETCH)的完整执行流程,从客户端发起HTTP请求到返回最终结果的全过程。整个流程分为四个主要阶段:

  1. CanMatch Phase (步骤1-11): 预过滤阶段,快速判断哪些分片可能包含匹配文档
  2. Query Phase (步骤12-20): 查询阶段,在每个匹配的分片上执行查询并返回Top K的文档ID和分数
  3. Reduce Phase (步骤21-23): 归并阶段,在协调节点合并所有分片的结果并选出全局Top K
  4. Fetch Phase (步骤24-33): 获取阶段,获取Top K文档的完整内容

关键步骤详解

步骤1-2: 客户端请求

  • 客户端通过HTTP POST发送搜索请求
  • 请求体包含查询条件(query)、分页参数(size, from)、排序规则等

步骤3-4: CanMatch预过滤

  • 协调节点向所有分片并发发送CanMatch请求
  • 每个分片检查自己的最小/最大值范围(基于PointValues)
  • 如果分片的数据范围与查询条件不重叠,返回canMatch=false
  • 优化效果: 对于带范围过滤的查询,可以跳过50%+的分片

步骤5-20: Query Phase

  • 步骤5: 协调节点启动Query Phase,向所有canMatch=true的分片发送查询请求
  • 步骤6-9: 每个分片并发执行:
    • 创建SearchContext封装搜索状态
    • 构建Lucene Query对象
    • 执行searcher.search()收集Top (size+from)个文档
    • 如果有聚合,同时执行聚合计算
  • 步骤10: 分片返回QuerySearchResult,包含:
    • docIds[]: 文档内部ID数组
    • scores[]: 对应的分数数组
    • sortValues[][]: 排序字段值(如果有排序)
    • aggregations: 分片级聚合结果
  • 关键优化: Query Phase仅传输轻量级数据(ID+分数),不传输文档内容

步骤11-13: Reduce Phase

  • 步骤11: 协调节点收集所有分片的QuerySearchResult
  • 步骤12: 执行K路归并排序:
    • 使用优先队列(PriorityQueue)合并所有分片的Top K
    • 时间复杂度: O(N * K * log K), N=分片数, K=size+from
    • 按分数或排序字段值排序
  • 步骤13: 归并聚合结果:
    • 对sum/avg/max/min类型聚合直接合并
    • 对terms类型聚合重新计算Top Terms
  • 步骤14: 选出全局Top size,跳过from

步骤14-23: Fetch Phase

  • 步骤14: 构建Fetch请求,按分片分组docIds
  • 步骤15-20: 向需要的分片并发发送ShardFetchRequest
  • 步骤21-22: 每个分片执行:
    • 通过docId读取StoredFields
    • 提取_source字段
    • 生成高亮片段(如果请求了highlight)
    • 执行其他FetchSubPhases
  • 步骤23: 返回FetchSearchResult,包含完整文档数据

步骤24-25: 构建最终响应

  • 合并Fetch结果和聚合结果
  • 按全局Top K的顺序排列文档
  • 计算总耗时(took)
  • 返回SearchResponse给客户端

性能特点

阶段 网络传输 计算开销 并发度 优化空间
CanMatch 极小(仅minmax) 极低 所有分片 缓存minmax统计
Query 小(ID+score) 高(查询+聚合) 所有匹配分片 缓存查询结果
Reduce 中(归并排序) 单线程 批量归并
Fetch 大(完整文档) 低(读取) 仅需要的分片 缓存文档内容

并发控制

  • 分片级并发: 默认同时查询5个分片(maxConcurrentShardRequests)
  • 节点级背压: 每个数据节点最多同时处理5个来自同一协调节点的请求
  • 自适应调整: 根据集群规模动态调整并发数

失败重试

  • 如果某个分片请求失败,自动尝试该分片的副本
  • 如果所有副本都失败:
    • allowPartialSearchResults=true: 返回部分结果
    • allowPartialSearchResults=false: 返回503错误

8.2 CanMatch Phase 内部时序图

sequenceDiagram
    autonumber
    participant Coord as 协调节点<br/>CanMatchPreFilterPhase
    participant Rewrite as CoordinatorRewriteContext<br/>查询重写
    participant Transport as SearchTransportService
    participant Shard as 数据节点分片
    participant MinMax as PointValues<br/>MinMax统计

    Note over Coord,MinMax: 阶段0: 协调节点重写

    Coord->>Coord: runCoordinatorRewritePhase()

    loop 每个分片
        Coord->>Rewrite: queryStillMatchesAfterRewrite()
        Rewrite->>Rewrite: 重写查询<br/>(范围查询优化)
        Rewrite->>Rewrite: 检查查询是否可能匹配<br/>(基于索引元数据)
        Rewrite-->>Coord: canMatch (true/false)

        alt canMatch=false (协调节点就能判断)
            Coord->>Coord: 标记分片skip=true
            Coord->>Coord: numPossibleMatches不增加
        else canMatch=true
            Coord->>Coord: 加入matchedShardLevelRequests
        end
    end

    Note over Coord,MinMax: 阶段1: 向数据节点发送CanMatch请求

    Coord->>Coord: 按节点分组请求<br/>groupByNode()

    loop 每个目标节点
        Coord->>Transport: sendCanMatch(nodeId, shards[])
        Transport->>Shard: [RPC] CanMatchNodeRequest

        loop 该节点的每个分片
            Shard->>Shard: 获取 IndexReader
            Shard->>MinMax: 获取字段的min/max值<br/>PointValues.getMinPackedValue()<br/>PointValues.getMaxPackedValue()
            MinMax-->>Shard: min/max BytesRef

            Shard->>Shard: 检查查询范围与分片范围是否重叠
            Note over Shard: 示例: 查询 date >= 2024-01-01<br/>分片 max(date) = 2023-12-31<br/>→ canMatch = false

            Shard->>Shard: 如果有排序,同时返回minSortValue/maxSortValue
            Shard-->>Transport: CanMatchShardResponse<br/>(canMatch, minMax, estimatedCount)
        end

        Transport-->>Coord: CanMatchNodeResponse[]
    end

    Note over Coord: 阶段2: 处理响应并排序分片

    loop 每个分片响应
        alt canMatch=true
            Coord->>Coord: possibleMatches.set(shardIndex)
            Coord->>Coord: numPossibleMatches++
            Coord->>Coord: 记录minAndMaxes[shardIndex]
        else canMatch=false
            Coord->>Coord: 跳过该分片
        end
    end

    Coord->>Coord: 如果有排序,按minSortValue排序分片<br/>(确保包含早期数据的分片先查询)

    Coord->>Coord: 构建过滤后的 SearchShardIterator[]
    Coord-->>Coord: 返回可能匹配的分片列表<br/>numPossibleMatches / totalShards

    Note over Coord: 优化效果示例:<br/>原始128个分片 → 过滤后仅64个分片<br/>减少50%的Query Phase工作量

CanMatch Phase 功能详解

目的与触发条件

  • 目的: 在不执行完整查询的情况下,快速判断分片是否可能包含匹配文档
  • 触发条件:
    • 分片总数 > pre_filter_shard_size (默认128)
    • 或显式启用 preFilterSearchShards=true
  • 跳过条件: 对于小规模索引(分片数<128),直接执行Query Phase更高效

两级过滤机制

1. 协调节点级过滤 (Coordinator Rewrite)

  • 执行位置: 协调节点
  • 数据来源: 集群元数据 (ClusterState)
  • 检查内容:
    • 索引创建时间范围
    • 分片的routing元数据
    • 简单的查询改写(query rewrite)
  • 优点: 零网络开销,极快
  • 局限: 只能基于元数据,无法访问实际数据

2. 数据节点级过滤 (PointValues MinMax)

  • 执行位置: 数据节点
  • 数据来源: Lucene PointValues的min/max统计
  • 检查内容:
    • 数值字段的最小/最大值
    • 日期字段的最早/最晚时间
    • 地理位置字段的边界框
  • 算法:
    // 伪代码
    boolean canMatch(Query query, Shard shard) {
        if (query instanceof RangeQuery) {
            BytesRef shardMin = shard.getMinValue(field);
            BytesRef shardMax = shard.getMaxValue(field);
    
            // 查询范围: [queryMin, queryMax]
            // 分片范围: [shardMin, shardMax]
            // 判断是否重叠
            if (queryMax < shardMin || queryMin > shardMax) {
                return false; // 不重叠,跳过分片
            }
        }
        return true; // 可能匹配
    }
    

分片排序优化

如果查询包含排序,CanMatch Phase会收集每个分片的minSortValue和maxSortValue,并据此排序分片:

  • 降序排序 (desc): 先查询maxSortValue最大的分片
  • 升序排序 (asc): 先查询minSortValue最小的分片

好处:

  • 先返回最相关的结果,改善用户体验
  • 可能触发早期终止(early termination),减少查询的分片数

性能数据

场景 原始分片数 过滤后分片数 减少比例 性能提升
日期范围查询(最近7天) 365 7 98% 10-50x
数值范围查询(price > 1000) 100 45 55% 2-3x
无范围查询(match query) 100 100 0% 无提升

最佳实践

  1. 使用日期分区索引: 按天/周/月创建索引,CanMatch可以完全跳过旧索引
  2. 在范围字段上使用PointValues: 数值、日期、IP字段自动支持
  3. 合理设置pre_filter_shard_size: 对于大集群可以降低阈值(如64)

8.3 Query Phase 内部时序图

sequenceDiagram
    autonumber
    participant Coord as 协调节点<br/>AsyncAction
    participant Transport as SearchTransportService
    participant Service as SearchService<br/>(数据节点)
    participant Context as DefaultSearchContext
    participant QueryPhase as QueryPhase
    participant Collector as CollectorManager
    participant Lucene as Lucene IndexSearcher
    participant Aggs as AggregationPhase

    Note over Coord,Lucene: 协调节点发起Query请求

    Coord->>Coord: executePhaseOnShard(shardIt)
    Coord->>Coord: buildShardSearchRequest(shardIt)<br/>构造ShardSearchRequest
    Coord->>Coord: 应用并发控制<br/>(每节点最多5个并发)
    Coord->>Transport: sendExecuteQuery(connection, request)
    Transport->>Service: [RPC] ShardSearchRequest

    Note over Service,Context: 数据节点接收并处理

    Service->>Service: executeQueryPhase(request, task, listener)
    Service->>Service: 获取/创建 ReaderContext<br/>(持有IndexReader引用)
    Service->>Service: rewriteAndFetchShardRequest()<br/>重写查询(分片级优化)
    Service->>Service: 提交到search线程池<br/>executor.execute()

    Note over Service,Context: search线程池中执行

    Service->>Context: createContext(readerContext, request)
    Context->>Context: new DefaultSearchContext()
    Context->>Context: parseSource(request.source())<br/>解析查询/聚合/排序

    Context->>Context: 构建Lucene Query对象
    Note over Context: QueryBuilder → Query转换:<br/>MatchQueryBuilder → TermQuery<br/>RangeQueryBuilder → PointRangeQuery<br/>BoolQueryBuilder → BooleanQuery

    Context->>Context: 构建Sort对象<br/>(如果有排序)
    Context->>Context: 解析Aggregations<br/>构建AggregatorFactories
    Context->>Context: preProcess()<br/>查询预处理和验证

    Service->>QueryPhase: QueryPhase.execute(searchContext)

    Note over QueryPhase,Aggs: Query Phase 执行

    QueryPhase->>Aggs: AggregationPhase.preProcess(searchContext)
    Aggs->>Aggs: 为每个聚合创建Aggregator
    Aggs->>Aggs: 构建聚合树(嵌套聚合)
    Aggs-->>QueryPhase: AggregationContext

    QueryPhase->>Collector: createQueryPhaseCollectorManager()

    alt 有排序
        Collector->>Collector: new TopFieldCollectorManager<br/>(排序字段, size+from)
    else 无排序
        Collector->>Collector: new TopScoreDocCollectorManager<br/>(size+from)
    end

    alt 有聚合
        Collector->>Collector: aggsCollectorManager.newCollector()
        Collector->>Collector: MultiCollector.wrap(<br/>  topDocsCollector,<br/>  aggsCollector<br/>)
    end

    alt 有postFilter
        Collector->>Collector: 创建FilteredCollector<br/>(在聚合后过滤)
    end

    Collector-->>QueryPhase: QueryPhaseCollectorManager

    QueryPhase->>Lucene: searcher.search(query, collectorManager)

    Note over Lucene: Lucene 搜索执行

    Lucene->>Lucene: 1. Weight weight = query.createWeight()
    Lucene->>Lucene: 2. 遍历所有Segment

    loop 每个Segment
        Lucene->>Lucene: LeafReaderContext leafCtx = segment
        Lucene->>Lucene: Scorer scorer = weight.scorer(leafCtx)
        Lucene->>Lucene: Collector collector = collectorManager.newCollector()
        Lucene->>Lucene: collector.setScorer(scorer)

        loop 匹配的文档
            Lucene->>Lucene: int docId = scorer.nextDoc()
            Lucene->>Lucene: float score = scorer.score()
            Lucene->>Lucene: collector.collect(docId)

            alt TopDocsCollector
                Lucene->>Lucene: 如果分数足够高<br/>加入PriorityQueue
            end

            alt AggsCollector
                Lucene->>Lucene: 执行聚合计算<br/>aggregator.collect(docId)
            end
        end

        Lucene->>Lucene: collector.finishLeaf()
    end

    Lucene->>Lucene: collectorManager.reduce(collectors)<br/>合并所有Segment的结果
    Lucene-->>QueryPhase: QueryPhaseResult<br/>(TopDocs, totalHits)

    Note over QueryPhase: 后处理阶段

    QueryPhase->>QueryPhase: 设置queryResult.topDocs(topDocs)
    QueryPhase->>QueryPhase: 设置queryResult.aggregations(aggs)

    alt 有Rescore
        QueryPhase->>QueryPhase: RescorePhase.execute(searchContext)<br/>重新打分Top K文档
    end

    alt 有Suggest
        QueryPhase->>QueryPhase: SuggestPhase.execute(searchContext)<br/>生成搜索建议
    end

    QueryPhase-->>Service: QuerySearchResult

    Note over Service,Transport: 返回结果给协调节点

    Service->>Service: 检查是否单次查询<br/>(non-scroll)

    alt 单次查询且无结果
        Service->>Service: freeReaderContext()<br/>立即释放资源
    else Scroll/PIT
        Service->>Service: 保持ReaderContext<br/>等待后续请求
    end

    Service-->>Transport: QuerySearchResult<br/>(docIds[], scores[], aggs)
    Transport-->>Coord: QuerySearchResult

    Note over Coord: 收集所有分片结果<br/>准备进入Reduce Phase

Query Phase 功能详解

执行位置与线程模型

  • 执行位置: 数据节点的每个主分片或副本分片
  • 线程池: search 线程池
    • 大小: int((processors * 3) / 2) + 1
    • 队列: 1000
    • 拒绝策略: AbortPolicy
  • 并发: 所有分片并行执行

查询重写优化

在分片级别,查询会经过多次重写优化:

// 重写示例
原始查询: MatchQuery("title", "elasticsearch")
   重写1: 分析器处理
TermQuery("title", "elasticsearch")
   重写2: 前缀优化
如果title字段只有一个Term匹配,可能重写为ConstantScoreQuery
   重写3: 分片级优化
如果该分片该Term的docFreq=0,重写为MatchNoDocsQuery

Collector 组合策略

根据查询参数,QueryPhase会创建不同的Collector组合:

查询特征 Collector 组合 说明
仅查询,无聚合,无排序 TopScoreDocCollector 最简单,仅收集Top K文档
有排序,无聚合 TopFieldCollector 按排序字段收集
有聚合,无排序 TopScoreDocCollector + AggsCollector 并行收集
有聚合,有排序 TopFieldCollector + AggsCollector 并行收集
有postFilter FilteredCollector(上述任一) 先聚合,再过滤

聚合执行

聚合在Query Phase并行执行,与文档收集同时进行:

// 聚合执行伪代码
for (Document doc : matchedDocs) {
    topDocsCollector.collect(doc); // 收集到TopDocs
    aggsCollector.collect(doc);     // 同时执行聚合

    // 对于terms聚合
    String value = doc.getField("category");
    termsBucket.get(value).count++;

    // 对于metrics聚合
    double price = doc.getField("price");
    sumAggregator.sum += price;
    avgAggregator.sum += price;
    avgAggregator.count++;
}

TopDocs 收集策略

  • size + from: 每个分片必须收集 (size + from) 个文档
    • 例如: size=10, from=100 → 每个分片收集110个文档
    • 协调节点归并后跳过前100个,返回10个
  • 优先队列: 使用固定大小的PriorityQueue维护Top K
    • 当新文档分数高于队列最低分时,替换最低分文档
    • 队列满后,只处理分数足够高的文档
  • 提前终止: 如果设置了terminate_after参数,收集到指定数量文档后提前终止

Rescore 重新打分

如果查询包含rescorer,会在Query Phase的最后阶段执行:

  1. 基于主查询收集Top K文档
  2. 对Top K文档执行rescore查询(通常更复杂/更精确)
  3. 重新计算分数并重新排序
  4. 返回重排序后的Top K

性能优化点

  1. 查询缓存: 对于filter查询,Lucene会缓存匹配的文档集合
  2. 段级缓存: 对于常见查询,Lucene在段级别缓存结果
  3. 跳跃指针: 使用skip list加速文档遍历
  4. WAND优化: 对于disjunction查询(OR),使用WAND算法减少评分计算

8.4 Reduce Phase 内部时序图

sequenceDiagram
    autonumber
    participant AsyncAction as SearchQueryThenFetch<br/>AsyncAction
    participant Consumer as QueryPhaseResult<br/>Consumer
    participant Controller as SearchPhaseController
    participant MergeQueue as PriorityQueue<br/>归并队列
    participant AggsReduce as InternalAggregations<br/>聚合归并

    Note over AsyncAction,AggsReduce: 收集所有分片结果

    loop 每个分片
        AsyncAction->>Consumer: onShardResult(QuerySearchResult)
        Consumer->>Consumer: 添加到results数组<br/>results.set(shardIndex, result)
        Consumer->>Consumer: successfulOps++

        alt 启用批量归并 && 达到batchSize
            Consumer->>Consumer: 执行批量部分归并<br/>减少内存占用
        end
    end

    AsyncAction->>AsyncAction: 等待所有分片完成<br/>countDown.await()
    AsyncAction->>AsyncAction: getNextPhase() → 触发Reduce

    Note over Controller,MergeQueue: Reduce Phase 开始

    AsyncAction->>Controller: reducedQueryPhase(queryResults)

    Controller->>Controller: 1. 收集所有非空 QuerySearchResult
    Controller->>Controller: 2. 验证排序字段格式一致性

    Note over Controller: 归并TopDocs

    Controller->>Controller: sortDocs(results[], from, size)
    Controller->>MergeQueue: 创建 ScoreDocQueue<br/>优先队列(容量=size)

    loop 每个分片的TopDocs
        MergeQueue->>MergeQueue: 取第一个ScoreDoc加入队列<br/>queue.insertWithOverflow(scoreDoc)
    end

    loop size 次
        MergeQueue->>MergeQueue: 从队列取出最高分文档<br/>topDoc = queue.pop()
        MergeQueue->>MergeQueue: 记录 (shardIndex, docId, score)

        alt 该分片还有更多文档
            MergeQueue->>MergeQueue: 从该分片取下一个ScoreDoc<br/>加入队列
        end
    end

    MergeQueue-->>Controller: 全局TopDocs[]<br/>已按分数排序

    Controller->>Controller: 跳过前 from 个文档
    Controller->>Controller: 选出 size 个文档

    Note over Controller,AggsReduce: 归并聚合结果

    alt 有聚合
        Controller->>AggsReduce: reduce(List<InternalAggregations>)

        loop 每个聚合
            alt Terms聚合
                AggsReduce->>AggsReduce: 1. 合并所有分片的buckets
                AggsReduce->>AggsReduce: 2. 按count降序排序
                AggsReduce->>AggsReduce: 3. 选出TopN buckets
                AggsReduce->>AggsReduce: 4. 递归归并子聚合
            else Metrics聚合 (sum/avg/max/min)
                AggsReduce->>AggsReduce: 直接合并指标值
                Note over AggsReduce: sum: 累加所有分片的sum<br/>avg: sum(所有sum) / sum(所有count)<br/>max: max(所有max)<br/>min: min(所有min)
            else Cardinality聚合
                AggsReduce->>AggsReduce: 合并HyperLogLog草图
            else Percentiles聚合
                AggsReduce->>AggsReduce: 合并TDigest草图
            end
        end

        AggsReduce-->>Controller: 归并后的InternalAggregations
    end

    Note over Controller: 归并Suggest结果

    alt 有Suggest
        Controller->>Controller: Suggest.reduce(groupedSuggestions)

        loop 每个Suggestion
            Controller->>Controller: 合并所有分片的options
            Controller->>Controller: 按score排序
            Controller->>Controller: 选出TopN options
        end
    end

    Note over Controller: 构建ReducedQueryPhase

    Controller->>Controller: new ReducedQueryPhase(<br/>  topDocs: 全局Top K的(shard,docId),<br/>  aggregations: 归并后的聚合,<br/>  totalHits: 总命中数,<br/>  maxScore: 最高分,<br/>  suggest: 归并后的建议,<br/>  numReducePhases: 归并阶段数<br/>)

    Controller-->>AsyncAction: ReducedQueryPhase

    AsyncAction->>AsyncAction: 构建docIdsToLoad[]<br/>按分片分组TopDocs

    Note over AsyncAction: 准备进入Fetch Phase

Reduce Phase 功能详解

执行位置与时机

  • 执行位置: 协调节点
  • 执行时机: 收集到所有(或足够多)分片的QuerySearchResult后
  • 线程: 协调节点的search线程

批量归并优化

对于大规模搜索(涉及大量分片),Elasticsearch使用批量归并(Batched Reduce)策略:

// 批量归并参数
int batchedReduceSize = 512; // 默认每批512个分片

// 归并策略
if (numShards <= batchedReduceSize) {
    // 少量分片: 一次性归并
    reduce(allResults);
} else {
    // 大量分片: 分批归并
    while (results.size() > batchedReduceSize) {
        List<QuerySearchResult> batch = results.take(batchedReduceSize);
        ReducedQueryPhase partial = reduce(batch);
        results.add(partial); // 部分结果作为新的"分片"
    }
    reduce(results); // 最终归并
}

好处:

  • 减少内存占用: 避免同时持有所有分片的结果
  • 减少GC压力: 及时释放已归并的结果
  • 提高响应速度: 可以流式处理结果

归并排序算法详解

使用K路归并排序算法,基于PriorityQueue实现:

// 归并排序伪代码
PriorityQueue<ScoreDoc> queue = new PriorityQueue<>(
    size,
    (doc1, doc2) -> {
        // 比较规则
        if (doc1.score != doc2.score) {
            return Float.compare(doc2.score, doc1.score); // 分数降序
        }
        // 分数相同,按shardIndex排序(保证稳定性)
        return Integer.compare(doc1.shardIndex, doc2.shardIndex);
    }
);

// 初始化: 每个分片的第一个文档加入队列
for (int i = 0; i < numShards; i++) {
    TopDocs shardTopDocs = results[i].topDocs();
    if (shardTopDocs.scoreDocs.length > 0) {
        ScoreDoc doc = shardTopDocs.scoreDocs[0];
        doc.shardIndex = i;
        doc.docIndex = 0;
        queue.add(doc);
    }
}

// 归并: 取size个最高分文档
ScoreDoc[] topDocs = new ScoreDoc[size];
for (int i = 0; i < size && queue.isNotEmpty(); i++) {
    ScoreDoc top = queue.poll(); // 取出当前最高分
    topDocs[i] = top;

    // 从同一分片取下一个文档
    TopDocs shardTopDocs = results[top.shardIndex].topDocs();
    int nextDocIndex = top.docIndex + 1;
    if (nextDocIndex < shardTopDocs.scoreDocs.length) {
        ScoreDoc nextDoc = shardTopDocs.scoreDocs[nextDocIndex];
        nextDoc.shardIndex = top.shardIndex;
        nextDoc.docIndex = nextDocIndex;
        queue.add(nextDoc);
    }
}

时间复杂度分析:

  • 初始化: O(N), N=分片数
  • 归并: O(K * log N), K=size
  • 总计: O(N + K * log N)
  • 对于典型场景 (N=10, K=10): ~40次比较操作

聚合归并策略

不同类型的聚合有不同的归并策略:

1. Terms聚合归并

// Terms聚合归并算法
Map<String, Bucket> mergedBuckets = new HashMap<>();

// 1. 合并所有分片的buckets
for (TermsAgg shardAgg : shardAggs) {
    for (Bucket bucket : shardAgg.getBuckets()) {
        Bucket merged = mergedBuckets.computeIfAbsent(
            bucket.getKey(),
            k -> new Bucket(k)
        );
        merged.docCount += bucket.docCount;
        // 递归归并子聚合
        merged.subAggs = reduceSubAggs(merged.subAggs, bucket.subAggs);
    }
}

// 2. 排序并选出TopN
List<Bucket> topBuckets = mergedBuckets.values()
    .stream()
    .sorted(Comparator.comparingLong(Bucket::getDocCount).reversed())
    .limit(size)
    .collect(Collectors.toList());

2. Metrics聚合归并

// Metrics聚合归并
// Sum聚合
double totalSum = shardSums.stream().mapToDouble(s -> s).sum();

// Avg聚合
double totalSum = shardAvgs.stream().mapToDouble(a -> a.sum).sum();
long totalCount = shardAvgs.stream().mapToLong(a -> a.count).sum();
double avg = totalSum / totalCount;

// Max聚合
double max = shardMaxs.stream().mapToDouble(m -> m).max().getAsDouble();

// Min聚合
double min = shardMins.stream().mapToDouble(m -> m).min().getAsDouble();

3. Cardinality聚合归并

// 基于HyperLogLog算法
HyperLogLogPlusPlus merged = new HyperLogLogPlusPlus(precision);
for (HyperLogLogPlusPlus shardHLL : shardHLLs) {
    merged.merge(shardHLL); // 合并HLL草图
}
long cardinality = merged.cardinality(); // 估算基数

深度分页问题

Reduce Phase是深度分页性能问题的根源:

from + size 每分片返回 10分片总计 内存占用 归并计算
10 10 100 docs 几KB ~100次比较
1000 1000 10000 docs 几MB ~10K次比较
10000 10000 100K docs 几十MB ~100K次比较

解决方案:

  1. 使用search_after分页(推荐)
  2. 使用Scroll API(导出场景)
  3. 限制max_result_window(默认10000)

8.5 Fetch Phase 内部时序图

sequenceDiagram
    autonumber
    participant Coord as 协调节点<br/>FetchSearchPhase
    participant Transport as SearchTransportService
    participant Service as SearchService<br/>(数据节点)
    participant Context as SearchContext
    participant FetchPhase as FetchPhase
    participant Lucene as Lucene IndexSearcher
    participant SubPhase1 as FetchSourcePhase
    participant SubPhase2 as HighlightPhase
    participant SubPhase3 as FetchFieldsPhase

    Note over Coord: 构建Fetch请求

    Coord->>Coord: ReducedQueryPhase获取<br/>全局Top K的(shardIndex, docId)
    Coord->>Coord: 按分片分组docIds<br/>Map<ShardId, int[]>

    loop 每个需要fetch的分片
        Note over Coord,Service: 发送Fetch请求

        Coord->>Coord: 构建ShardFetchRequest<br/>(contextId, docIds[])
        Coord->>Transport: sendExecuteFetch(connection, request)
        Transport->>Service: [RPC] ShardFetchRequest

        Note over Service,FetchPhase: 数据节点处理Fetch

        Service->>Service: executeFetchPhase(request, task, listener)
        Service->>Service: 查找现有ReaderContext<br/>findReaderContext(contextId)

        alt ReaderContext不存在
            Service->>Service: 抛出SearchContextMissingException
            Note over Service: 可能原因:<br/>1. Context已超时被清理<br/>2. 节点重启<br/>3. Context ID错误
        end

        Service->>Context: ReaderContext获取SearchContext
        Service->>FetchPhase: FetchPhase.execute(context, docIds)

        Note over FetchPhase,Lucene: 构建SearchHits

        FetchPhase->>FetchPhase: buildSearchHits(context, docIds)
        FetchPhase->>FetchPhase: 创建StoredFieldLoader<br/>(优化字段读取)
        FetchPhase->>FetchPhase: 创建SourceLoader<br/>(加载_source)
        FetchPhase->>FetchPhase: 创建IdLoader<br/>(加载_id)

        FetchPhase->>FetchPhase: 初始化所有FetchSubPhaseProcessor
        FetchPhase->>FetchPhase: Segment分组docIds

        loop 每个Segment
            FetchPhase->>FetchPhase: setNextReader(leafContext)

            loop Segment的每个docId
                Note over FetchPhase,Lucene: 处理单个文档

                FetchPhase->>Lucene: storedFieldLoader.load(docId)<br/>读取StoredFields
                Lucene->>Lucene: .fdt文件读取存储字段
                Lucene-->>FetchPhase: StoredFields (Map<field, value>)

                FetchPhase->>Lucene: idLoader.getId(docId)<br/>读取_id
                Lucene-->>FetchPhase: _id

                FetchPhase->>Lucene: sourceLoader.load(docId)<br/>读取_source
                Lucene->>Lucene: StoredFields或SourceOnlySnapshot读取
                Lucene-->>FetchPhase: BytesReference (_source JSON)

                FetchPhase->>FetchPhase: 创建SearchHit对象
                FetchPhase->>FetchPhase: 设置id, score, sortValues

                Note over FetchPhase,SubPhase3: 执行FetchSubPhases

                FetchPhase->>SubPhase1: FetchSourcePhase.process(hit)
                SubPhase1->>SubPhase1: 应用_source过滤<br/>(includes/excludes)
                SubPhase1->>SubPhase1: 过滤后的_source设置到hit
                SubPhase1-->>FetchPhase: 完成

                FetchPhase->>SubPhase3: FetchFieldsPhase.process(hit)
                SubPhase3->>SubPhase3: _source或docValues提取fields
                SubPhase3->>SubPhase3: 应用字段格式化
                SubPhase3->>SubPhase3: 设置hit.fields
                SubPhase3-->>FetchPhase: 完成

                alt 请求了高亮
                    FetchPhase->>SubPhase2: HighlightPhase.process(hit)
                    SubPhase2->>SubPhase2: 选择高亮器<br/>(unified/fvh/plain)
                    SubPhase2->>Lucene: 获取term vectors<br/>或重新分析_source
                    SubPhase2->>SubPhase2: 生成高亮片段<br/>(<em>标签)
                    SubPhase2->>SubPhase2: 设置hit.highlightFields
                    SubPhase2-->>FetchPhase: 完成
                end

                alt 请求了explain
                    FetchPhase->>Lucene: explain(query, docId)<br/>获取评分解释
                    Lucene-->>FetchPhase: Explanation对象
                    FetchPhase->>FetchPhase: 设置hit.explanation
                end

                alt 请求了inner_hits (嵌套文档)
                    FetchPhase->>FetchPhase: InnerHitsPhase.process(hit)
                    FetchPhase->>FetchPhase: 递归执行嵌套文档的查询和fetch
                    FetchPhase->>FetchPhase: 设置hit.innerHits
                end

                FetchPhase->>FetchPhase: hits[i] = hit
            end
        end

        FetchPhase->>FetchPhase: new SearchHits(hits[], totalHits, maxScore)
        FetchPhase-->>Service: FetchSearchResult

        Note over Service: 更新Context的lastAccessTime<br/>保持Context活跃

        Service-->>Transport: FetchSearchResult
        Transport-->>Coord: FetchSearchResult
    end

    Note over Coord: 合并所有分片的FetchSearchResult

    Coord->>Coord: 按全局TopDocs的顺序排列hits
    Coord->>Coord: merge(reducedQueryPhase, fetchResults)
    Coord->>Coord: 构建SearchResponseSections<br/>(hits + aggregations + suggest)
    Coord->>Coord: 构建最终SearchResponse

    Note over Coord: 清理资源(如果是单次查询)

    alt 单次查询(Scroll/PIT)
        loop 每个分片
            Coord->>Transport: sendFreeContext(contextId)
            Transport->>Service: [RPC] FreeContextRequest
            Service->>Service: freeReaderContext(contextId)<br/>释放IndexReader引用
        end
    end

Fetch Phase 功能详解

执行位置与触发

  • 执行位置: 仅在需要返回文档的分片上执行
  • 触发条件: Reduce Phase确定全局Top K后
  • 线程池: search 线程池
  • 并发: 所有需要的分片并行执行

docIds分组策略

Reduce Phase产生全局Top K后,需要按分片分组:

// 示例: 全局Top 10
// 来自3个分片的结果
TopDocs globalTopDocs = [
    (shard=1, docId=103, score=9.5),  // 第1名
    (shard=0, docId=42,  score=9.2),  // 第2名
    (shard=2, docId=201, score=8.8),  // 第3名
    (shard=1, docId=57,  score=8.5),  // 第4名
    (shard=0, docId=91,  score=8.3),  // 第5名
    ...
];

// 按分片分组
Map<ShardId, int[]> docIdsByhard = {
    shard_0: [42, 91, ...],
    shard_1: [103, 57, ...],
    shard_2: [201, ...]
};

// 为每个分片创建一个FetchRequest

StoredFields 读取优化

Lucene使用.fdt(Field Data)和.fdx(Field Index)文件存储StoredFields:

  1. 索引查找: 通过.fdx文件找到docId对应的数据在.fdt中的位置
  2. 批量读取: 如果docIds是连续的,可以批量读取减少IO
  3. 压缩: StoredFields使用LZ4压缩,读取时解压

_source 加载策略

根据索引设置,_source可能来自不同位置:

索引设置 _source位置 读取方式
_source.enabled: true (默认) StoredFields 从.fdt文件读取
_source.mode: synthetic 合成 从DocValues重建
store_source: false Recovery专用 不可用于搜索

FetchSubPhase 执行顺序与功能

Elasticsearch在Fetch Phase执行多个子阶段(SubPhase):

1. FetchSourcePhase (最先执行)

  • 功能: 提取和过滤_source字段
  • 参数:
    • _source: false: 不返回_source
    • _source: ["field1", "field2"]: 仅包含指定字段
    • _source: {"includes": [...], "excludes": [...]}: 包含和排除规则
  • 实现: 使用JSON过滤器过滤_source

2. FetchFieldsPhase

  • 功能: 提取指定字段的值
  • 数据源优先级:
    1. _source (如果可用)
    2. doc_values (如果字段有docValues)
    3. stored_fields (如果字段存储)
  • 格式化: 应用字段格式化器(如日期格式化)

3. FetchDocValuesPhase

  • 功能: 从DocValues提取字段值
  • 适用: 数值、日期、关键字等字段
  • 优势: 列式存储,读取性能好

4. HighlightPhase

  • 功能: 生成高亮片段
  • 高亮器类型:
    • Unified Highlighter (默认,推荐):
      • 基于term vectors或重新分析
      • 支持复杂查询
      • 性能最好
    • Fast Vector Highlighter:
      • 需要term_vector: with_positions_offsets
      • 速度快但占用更多磁盘
    • Plain Highlighter:
      • 重新分析_source
      • 慢但无需额外存储
  • 片段生成:
    原文: "Elasticsearch is a distributed search engine"
    查询: "search"
    高亮: "Elasticsearch is a distributed <em>search</em> engine"
    

5. ExplainPhase

  • 功能: 生成评分解释
  • 触发: 请求参数explain: true
  • 输出: 详细的评分计算树

6. InnerHitsPhase

  • 功能: 处理嵌套文档或父子文档的inner_hits
  • 执行: 对每个Top文档递归执行查询和fetch
  • 用途: 在父文档中显示匹配的子文档

SearchContext 生命周期管理

1. Context创建 (Query Phase)

ReaderContext readerContext = new ReaderContext(
    id,
    indexShard,
    searcherSupplier, // 持有IndexReader引用
    keepAlive,
    singleSession
);

2. Context复用 (Fetch Phase)

// Fetch Phase通过contextId查找现有Context
ReaderContext ctx = activeReaders.get(contextId);
if (ctx == null) {
    throw new SearchContextMissingException(contextId);
}

3. Context清理

  • 单次查询: Fetch Phase结束后立即清理
  • Scroll: 保持到scroll超时或显式清理
  • PIT: 保持到pit超时或显式关闭
  • 自动清理: 定时任务扫描超时的Context

性能优化点

  1. 字段过滤: 仅返回需要的字段,减少网络传输

    {
      "_source": ["title", "price"],
      "fields": ["date"]
    }
    
  2. 禁用_source: 如果不需要完整文档

    {
      "_source": false,
      "docvalue_fields": ["field1", "field2"]
    }
    
  3. 高亮优化:

    • 使用Unified Highlighter
    • 限制fragment_size和number_of_fragments
    • 避免对大文本字段高亮
  4. 避免inner_hits: inner_hits会显著增加Fetch开销

内存和网络开销

优化前 优化后 节省
完整_source (10KB/doc) 仅需要字段 (1KB/doc) 90%
返回100个字段 返回10个字段 网络带宽减少10倍
高亮整个文档 高亮片段 (200字符) CPU减少50%+

8.6 Scroll API 流程

sequenceDiagram
    autonumber
    participant Client as 客户端
    participant Coord as 协调节点
    participant Shard1 as Shard 1
    participant Shard2 as Shard 2
    participant Context as SearchContext<br/>(保持打开)

    Note over Client,Context: 第一次 Scroll - 初始化

    Client->>Coord: POST /index/_search?scroll=1m<br/>{query, size: 1000}

    Coord->>Coord: 1. 创建 Scroll Context

    par 在每个分片创建 Context
        Coord->>Shard1: ShardSearchRequest<br/>(scroll=1m)
        Shard1->>Context: 2. 创建 ReaderContext<br/>(保持 Searcher 打开)
        Shard1->>Shard1: 3. 执行查询
        Shard1-->>Coord: ScrollQuerySearchResult<br/>(hits[0-999], scrollId)
    and
        Coord->>Shard2: ShardSearchRequest
        Shard2->>Context: 创建 ReaderContext
        Shard2->>Shard2: 执行查询
        Shard2-->>Coord: ScrollQuerySearchResult
    end

    Coord->>Coord: 4. 归并结果
    Coord-->>Client: SearchResponse<br/>{hits[], _scroll_id, total}

    Note over Client,Context: 后续 Scroll - 继续迭代

    Client->>Coord: POST /_search/scroll<br/>{scroll: "1m", scroll_id: "..."}

    Coord->>Coord: 5. 解析 scroll_id<br/>获取 Context 信息

    par 从每个分片获取下一批
        Coord->>Shard1: ScrollQueryRequest<br/>(contextId)
        Shard1->>Context: 6. 查找现有 Context
        Shard1->>Shard1: 7. 读取下一批<br/>(hits[1000-1999])
        Shard1->>Context: 8. 更新 Context<br/>(lastAccessTime)
        Shard1-->>Coord: ScrollQuerySearchResult
    and
        Coord->>Shard2: ScrollQueryRequest
        Shard2->>Context: 查找 Context
        Shard2->>Shard2: 读取下一批
        Shard2->>Context: 更新 Context
        Shard2-->>Coord: ScrollQuerySearchResult
    end

    Coord->>Coord: 归并结果
    Coord-->>Client: SearchResponse<br/>{hits[], _scroll_id}

    Note over Client,Context: 清理 Scroll

    Client->>Coord: DELETE /_search/scroll<br/>{scroll_id: "..."}

    par 删除所有分片的 Context
        Coord->>Shard1: ClearScrollRequest
        Shard1->>Context: 释放 ReaderContext
        Shard1-->>Coord: success
    and
        Coord->>Shard2: ClearScrollRequest
        Shard2->>Context: 释放 ReaderContext
        Shard2-->>Coord: success
    end

    Coord-->>Client: 200 OK

8. 性能优化

8.1 查询优化

1. 使用 Filter 代替 Query

{
  "query": {
    "bool": {
      "filter": [
        { "term": { "status": "published" } }
      ]
    }
  }
}
  • Filter 可以缓存
  • 不计算分数,更快

2. 限制返回字段

{
  "_source": ["title", "price"],
  "size": 10
}

3. 使用 CanMatch 优化

  • 自动跳过不匹配的分片
  • 基于 min/max 值范围

8.2 分页优化

深度分页问题:

{
  "from": 10000,
  "size": 10
}
  • 每个分片需要返回 10010 个结果
  • 协调节点需要排序 10010 * N 个结果

解决方案:

1. Search After (推荐)

{
  "size": 10,
  "search_after": [1234567890, "doc#123"],
  "sort": [
    { "timestamp": "asc" },
    { "_id": "asc" }
  ]
}

2. Scroll API

# 创建 Scroll
POST /index/_search?scroll=1m
{
  "size": 1000,
  "query": { "match_all": {} }
}

# 继续 Scroll
POST /_search/scroll
{
  "scroll": "1m",
  "scroll_id": "..."
}

8.3 聚合优化

1. 限制聚合基数

{
  "aggs": {
    "categories": {
      "terms": {
        "field": "category",
        "size": 10  // 限制返回桶数量
      }
    }
  }
}

2. 使用近似聚合

{
  "aggs": {
    "unique_users": {
      "cardinality": {
        "field": "user_id",
        "precision_threshold": 1000  // 近似算法
      }
    }
  }
}

9. 监控指标

搜索性能指标

指标 说明 建议值
search.query_total 查询总数 监控趋势
search.query_time_in_millis 查询总耗时 -
search.query_current 当前查询数 < 线程池大小
search.fetch_total Fetch 总数 -
search.fetch_time_in_millis Fetch 总耗时 -
search.scroll_total Scroll 总数 -
search.open_contexts 打开的搜索上下文数 < 500

线程池指标

GET /_cat/thread_pool/search?v&h=node_name,name,active,queue,rejected
指标 说明 建议值
active 活跃线程数 < size
queue 队列长度 < queue_size
rejected 拒绝次数 0

10. 常见问题

10.1 搜索慢

排查步骤:

  1. 检查慢查询日志
  2. 使用 Profile API 分析
  3. 检查分片数量
  4. 检查聚合复杂度

Profile API:

{
  "profile": true,
  "query": { ... }
}

10.2 深度分页

问题: from + size > 10000

解决:

  1. 使用 Search After
  2. 使用 Scroll (导出场景)
  3. 增大 max_result_window (不推荐)

10.3 搜索上下文过多

问题: Too many scroll contexts

解决:

  1. 及时清理 Scroll
  2. 减小 Scroll 超时时间
  3. 增大 search.max_open_scroll_context