Elasticsearch-05-传输层

本文档提供传输层模块的全面剖析,包括模块职责、架构设计、核心数据结构、通信协议、连接管理、关键流程时序图、配置调优和监控。


1. 模块职责

传输层是 Elasticsearch 节点间通信的基础设施,负责所有分布式操作的底层数据传输。

1.1 核心职责

  1. 节点间通信

    • 建立和管理节点之间的 TCP 连接
    • 发送请求和接收响应
    • 支持双向通信(请求-响应模式)
  2. 连接管理

    • 连接池管理
    • 连接健康检查
    • 连接重连和故障转移
  3. 请求路由

    • 根据 Action 名称路由请求到正确的处理器
    • 支持本地优化(本地节点请求不走网络)
    • 支持代理转发(跨集群通信)
  4. 序列化与反序列化

    • 请求/响应对象的序列化
    • 版本兼容性处理
    • 压缩支持
  5. 超时与重试

    • 请求超时检测
    • 响应超时处理
    • 错误处理和异常传播
  6. 性能优化

    • 连接复用
    • 批量发送
    • 零拷贝优化

1.2 输入与输出

输入:

  • 来自上层模块的请求(如搜索、索引、集群状态发布)
  • 来自其他节点的网络数据包

输出:

  • 网络数据包发送到目标节点
  • 响应回调到上层模块

1.3 依赖关系

上游依赖:

  • Netty: 底层 NIO 框架
  • ThreadPool: 线程池管理
  • TaskManager: 任务管理

下游依赖者:

  • Cluster Module: 集群状态发布
  • Index Module: 分片复制
  • Search Module: 分布式搜索

2. 模块架构

2.1 整体架构图

flowchart TB
    subgraph "上层模块"
        direction LR
        SearchAction[SearchTransportService<br/>搜索请求]
        ReplicationAction[TransportReplicationAction<br/>复制请求]
        ClusterAction[PublicationTransportHandler<br/>集群状态发布]
    end

    subgraph "传输服务层 TransportService"
        direction TB
        TS[TransportService<br/>传输服务门面]
        subgraph "连接管理"
            CM[ClusterConnectionManager<br/>连接管理器]
            ConnPool[连接池<br/>connectedNodes]
            ConnValidator[ConnectionValidator<br/>连接验证器]
        end
        subgraph "请求处理"
            RH[RequestHandlers<br/>请求处理器注册表]
            ResponseHandlers[ResponseHandlers<br/>响应处理器映射]
            TaskMgr[TaskManager<br/>任务管理器]
        end
        subgraph "拦截与监控"
            Interceptor[TransportInterceptor<br/>传输拦截器]
            Listener[TransportMessageListener<br/>消息监听器]
        end
    end

    subgraph "传输抽象层 Transport"
        direction TB
        Transport[Transport接口<br/>传输层抽象]
        Connection[Connection接口<br/>连接抽象]
        LocalConn[LocalNodeConnection<br/>本地连接优化]
    end

    subgraph "TCP传输实现层 TcpTransport"
        direction TB
        TcpTransport[TcpTransport<br/>TCP传输实现]
        subgraph "出站路径"
            OutboundHandler[OutboundHandler<br/>出站处理器]
            Serializer[序列化器<br/>serialize]
            Compressor[压缩器<br/>compress]
        end
        subgraph "入站路径"
            InboundHandler[InboundHandler<br/>入站处理器]
            Deserializer[反序列化器<br/>deserialize]
            Decompressor[解压缩器<br/>decompress]
        end
        subgraph "连接实现"
            NodeChannels[NodeChannels<br/>节点通道组]
            TcpChannel[TcpChannel<br/>TCP通道]
            ChannelStats[ChannelStats<br/>通道统计]
        end
    end

    subgraph "网络层 Netty"
        direction LR
        NettyTransport[Netty4Transport<br/>Netty实现]
        NioEventLoop[NioEventLoopGroup<br/>事件循环组]
        NettyChannel[Netty Channel<br/>网络通道]
        ByteBuf[ByteBuf<br/>零拷贝缓冲]
    end

    SearchAction --> TS
    ReplicationAction --> TS
    ClusterAction --> TS

    TS --> CM
    TS --> RH
    TS --> ResponseHandlers
    TS --> Interceptor
    TS --> Listener
    TS --> TaskMgr

    CM --> ConnPool
    CM --> ConnValidator
    CM --> Transport

    TS --> Connection
    Connection --> LocalConn
    Connection --> Transport

    Transport --> TcpTransport

    TcpTransport --> OutboundHandler
    TcpTransport --> InboundHandler
    TcpTransport --> NodeChannels

    OutboundHandler --> Serializer
    OutboundHandler --> Compressor
    OutboundHandler --> TcpChannel

    InboundHandler --> Deserializer
    InboundHandler --> Decompressor
    InboundHandler --> RH

    NodeChannels --> TcpChannel
    TcpChannel --> ChannelStats
    TcpChannel --> NettyTransport

    NettyTransport --> NioEventLoop
    NettyTransport --> NettyChannel
    NettyChannel --> ByteBuf

2.2 架构说明

层次划分

1. 传输服务层 (Service Layer)

  • 职责: 提供高层次的传输服务 API
  • 核心组件:
    • TransportService: 传输服务门面,协调所有传输操作
    • ClusterConnectionManager: 管理集群内所有节点连接
    • RequestHandlers: 维护 Action → Handler 的映射
    • ResponseHandlers: 维护 RequestId → ResponseHandler 的映射
    • TaskManager: 管理所有传输任务的生命周期
    • TransportInterceptor: 拦截器链,用于权限控制、监控等
  • 关键特性: 请求路由、连接管理、超时控制、任务追踪

2. 传输抽象层 (Abstract Layer)

  • 职责: 定义传输接口,解耦具体实现
  • 核心组件:
    • Transport 接口: 定义传输层核心契约
    • Connection 接口: 定义连接抽象
    • LocalNodeConnection: 本地节点优化实现(零拷贝)
  • 关键特性: 可插拔、可测试、支持本地优化

3. TCP传输实现层 (Implementation Layer)

  • 职责: 基于 TCP 的具体实现
  • 核心组件:
    • TcpTransport: TCP 传输层实现基类
    • OutboundHandler: 处理所有出站消息(序列化、压缩)
    • InboundHandler: 处理所有入站消息(解压、反序列化、路由)
    • NodeChannels: 每个节点的连接通道组(按类型分组)
    • TcpChannel: 单个 TCP 通道的抽象
  • 关键特性: 高性能、低延迟、连接池管理、压缩支持

4. 网络层 (Network Layer)

  • 职责: 底层网络 I/O
  • 核心组件:
    • Netty4Transport: Netty 4 的具体实现
    • NioEventLoopGroup: 事件循环组(Boss + Worker)
    • Netty Channel: Netty 原生网络通道
    • ByteBuf: Netty 零拷贝缓冲区
  • 关键特性: 异步、非阻塞、零拷贝、高吞吐

边界条件

  • 并发: 支持高并发请求(万级 QPS)
  • 超时(配置草图见本列表后):
    • 连接超时: 默认 30s
    • 请求超时: 可配置(默认无限制)
    • Handshake 超时: 30s
  • 重试: 自动重试网络错误(连接失败、超时)
  • 限流: 通过线程池大小控制并发
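
下面用一个最小草图演示如何通过 Settings 调整上述超时与压缩行为。transport.connect_timeout、transport.compress、transport.ping_schedule 均为实际存在的设置键,但默认值与取值范围请以所用版本文档为准:

import org.elasticsearch.common.settings.Settings;

// 最小配置草图:调整传输层超时与压缩
Settings transportSettings = Settings.builder()
    .put("transport.connect_timeout", "30s")  // 连接超时,默认 30s
    .put("transport.compress", "true")        // 节点间消息压缩,也可设为 indexing_data
    .put("transport.ping_schedule", "5s")     // 应用层 keep-alive ping,默认禁用(-1)
    .build();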

异常处理

  • 连接失败: 标记节点为不可用,触发重连
  • 请求超时: 回调 handler.handleException(TimeoutException)
  • 序列化错误: 抛出 NotSerializableTransportException
  • 版本不兼容: 握手失败,拒绝连接

性能要点

  • 连接复用: 长连接,避免频繁建连
  • 零拷贝: 使用 DirectByteBuffer
  • 批量发送: 聚合小请求
  • 压缩: 可选的请求/响应压缩

2.3 模块间交互矩阵

下表展示了传输层与上游模块之间的交互关系:

| 上游模块 | 主要 Action | 请求类型 | 同步/异步 | 超时设置 | 错误处理 | 一致性要求 |
| --- | --- | --- | --- | --- | --- | --- |
| SearchTransportService | indices:data/read/search[phase/query] | Query 阶段 | 异步 | 可配置(默认无限制) | 重试失败分片 | 最终一致 |
| | indices:data/read/search[phase/fetch] | Fetch 阶段 | 异步 | 同 Query | 失败则返回部分结果 | 最终一致 |
| | indices:data/read/search[phase/dfs] | DFS 阶段 | 异步 | 同 Query | 重试 | 最终一致 |
| TransportReplicationAction | indices:data/write/index[p] | 主分片写入 | 同步 | 1 分钟 | 重试 + 分片失败处理 | 强一致(in-sync 副本) |
| | indices:data/write/index[r] | 副本分片写入 | 同步 | 同主分片 | 标记分片为失败 | 强一致 |
| | indices:data/write/bulk[s][p/r] | 批量写入 | 同步 | 可配置 | 部分失败继续 | 强一致 |
| PublicationTransportHandler | internal:cluster/coordination/publish_state | 集群状态发布 | 异步 | 30s | 重试 + 节点标记 | 强一致(多数派) |
| | internal:cluster/coordination/commit_state | 状态提交确认 | 异步 | 30s | 记录日志 | 强一致 |
| MasterService | internal:cluster/coordination/join | 节点加入集群 | 同步 | 60s | 拒绝并重新发现 | 强一致 |
| | internal:discovery/zen/fd/ping | 节点心跳检测 | 异步 | 3s | 标记节点失效 | - |
| RecoveryTarget | internal:index/shard/recovery/start | 分片恢复开始 | 同步 | 30s | 取消恢复 | 强一致 |
| | internal:index/shard/recovery/file_chunk | 恢复文件块 | 异步 | 5m | 重试 | 强一致 |
| SnapshotShardsService | internal:cluster/snapshot/update_snapshot | 快照状态更新 | 异步 | 30s | 重试 | 最终一致 |

交互特点说明

1. 图意概述

该矩阵展示了 Elasticsearch 各上游模块如何通过传输层进行通信,明确了不同场景下的通信模式、容错策略和一致性保证。

2. 关键字段

  • Action: Transport Action 的名称,是请求路由的唯一标识
  • 请求类型: 请求的业务语义(查询、写入、协调等)
  • 同步/异步: 调用方是否阻塞等待响应
    • 同步:使用 Future 或 CountDownLatch 等待
    • 异步:使用 ActionListener 回调
  • 超时设置: 请求超时时间,超时后会触发 TimeoutException
  • 错误处理: 失败时的处理策略(重试、降级、失败)
  • 一致性要求: 数据一致性级别(强一致、最终一致)

3. 边界条件

  • 并发限制: 每个节点同时处理的请求数受线程池大小限制
  • 请求大小: 单个请求最大 100MB(可配置)
  • 连接数: 每对节点间维护 13 个连接(recovery=2, bulk=3, reg=6, state=1, ping=1)
  • 超时处理: 超时请求会自动清理,释放资源

4. 异常处理

  • 网络异常: 自动重试(ConnectException、SocketTimeoutException)
  • 节点不可用: 标记节点失败,触发重新路由
  • 序列化失败: 返回错误给调用方,不重试
  • 请求超时: 取消请求,回调 handler.handleException()

5. 性能要点

  • 本地优化: 本地节点请求直接调用 Handler,零序列化开销
  • 批量优化: 批量请求共享连接,减少网络往返
  • 连接复用: 长连接避免频繁握手
  • 压缩优化: 大请求自动压缩(阈值可配置)

6. 版本兼容

  • 握手协议: 建立连接时验证版本兼容性
  • 向后兼容: 支持 N-1 版本的节点通信
  • 协议演进: 通过 TransportVersion 控制序列化格式(条件序列化草图见下)
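
协议演进在代码层面通常体现为按 TransportVersion 条件化的读写逻辑。下面是一个假设性的草图(MyRequest、extraField、MY_NEW_VERSION 均为示意名称),展示 writeTo 与反序列化构造函数如何按版本对称地处理新增字段:

// 草图:按 TransportVersion 做条件序列化(名称均为示意)
@Override
public void writeTo(StreamOutput out) throws IOException {
    super.writeTo(out);
    out.writeString(name);
    // 仅当对端支持新版本协议时才写入新增字段
    if (out.getTransportVersion().onOrAfter(MY_NEW_VERSION)) {
        out.writeOptionalString(extraField);
    }
}

public MyRequest(StreamInput in) throws IOException {
    super(in);
    name = in.readString();
    // 与 writeTo 严格对称:老版本对端不会发送该字段
    extraField = in.getTransportVersion().onOrAfter(MY_NEW_VERSION)
        ? in.readOptionalString()
        : null;
}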

3. 核心组件详解

3.1 TransportService (传输服务)

职责

TransportService 是传输层的门面,提供发送请求和注册处理器的 API。

核心方法

public class TransportService extends AbstractLifecycleComponent {
    protected final Transport transport;
    protected final ConnectionManager connectionManager;
    protected final ThreadPool threadPool;
    private final TaskManager taskManager;
    private final TransportInterceptor interceptor;

    // 发送请求
    public <T extends TransportResponse> void sendRequest(
        DiscoveryNode node,
        String action,
        TransportRequest request,
        TransportResponseHandler<T> handler
    ) {
        // 1. 获取连接
        Transport.Connection connection = getConnection(node);

        // 2. 通过拦截器发送(该重载无自定义选项,使用 TransportRequestOptions.EMPTY)
        asyncSender.sendRequest(connection, action, request, TransportRequestOptions.EMPTY, handler);
    }

    // 注册请求处理器
    public <Request extends TransportRequest> void registerRequestHandler(
        String action,
        Executor executor,
        Writeable.Reader<Request> requestReader,
        TransportRequestHandler<Request> handler
    ) {
        RequestHandlerRegistry<Request> reg = new RequestHandlerRegistry<>(
            action,
            requestReader,
            taskManager,
            handler,
            executor
        );
        transport.registerRequestHandler(reg);
    }

    // 连接到节点
    public void connectToNode(
        DiscoveryNode node,
        ConnectionProfile connectionProfile,
        ActionListener<Releasable> listener
    ) {
        connectionManager.connectToNode(node, connectionProfile, connectionValidator, listener);
    }
}

本地优化

// 如果目标是本地节点,直接调用处理器,不走网络
if (isLocalNode(node)) {
    return localNodeConnection;
}

3.2 ConnectionManager (连接管理器)

职责

管理到所有节点的连接,提供连接池功能。

核心方法

public interface ConnectionManager extends Closeable {
    // 连接到节点
    void connectToNode(
        DiscoveryNode node,
        ConnectionProfile connectionProfile,
        ConnectionValidator validator,
        ActionListener<Releasable> listener
    );

    // 获取连接
    Transport.Connection getConnection(DiscoveryNode node);

    // 断开连接
    void disconnectFromNode(DiscoveryNode node);

    // 获取所有已连接节点
    Set<DiscoveryNode> getAllConnectedNodes();
}

ConnectionProfile (连接配置)

public class ConnectionProfile {
    private int numConnectionsPerType;  // 每种类型的连接数
    private TimeValue connectTimeout;   // 连接超时
    private TimeValue handshakeTimeout; // 握手超时
    private boolean compressionEnabled; // 是否启用压缩

    // 连接类型
    public enum ConnectionTypeHandle {
        RECOVERY,  // 恢复连接
        BULK,      // 批量操作连接
        REG,       // 常规连接
        STATE,     // 集群状态连接
        PING       // Ping 连接
    }
}
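
真实代码中连接配置通过 ConnectionProfile.Builder 构建,连接类型对应 TransportRequestOptions.Type(上文的 ConnectionTypeHandle 为示意)。下面是一个基于 7.x 前后 API 的草图,方法签名随版本有所演进:

// 草图:构建自定义连接配置(默认 profile 即 recovery=2, bulk=3, reg=6, state=1, ping=1)
ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
builder.addConnections(2, TransportRequestOptions.Type.RECOVERY);
builder.addConnections(3, TransportRequestOptions.Type.BULK);
builder.addConnections(6, TransportRequestOptions.Type.REG);
builder.addConnections(1, TransportRequestOptions.Type.STATE);
builder.addConnections(1, TransportRequestOptions.Type.PING);
builder.setConnectTimeout(TimeValue.timeValueSeconds(30));
builder.setHandshakeTimeout(TimeValue.timeValueSeconds(30));
ConnectionProfile profile = builder.build();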

3.3 Transport (传输抽象接口)

职责

定义传输层的抽象接口,解耦具体实现。

核心接口

public interface Transport extends LifecycleComponent {
    // 发送请求
    void sendRequest(
        Connection connection,
        long requestId,
        String action,
        TransportRequest request,
        TransportRequestOptions options
    );

    // 打开连接
    void openConnection(
        DiscoveryNode node,
        ConnectionProfile profile,
        ActionListener<Connection> listener
    );

    // 注册请求处理器
    <Request extends TransportRequest> void registerRequestHandler(
        RequestHandlerRegistry<Request> reg
    );

    // 响应处理器
    interface ResponseHandlers {
        TransportResponseHandler<?> onResponseReceived(
            long requestId,
            TransportMessageListener listener
        );
        void onResponseSent(long requestId);
    }
}

3.4 TcpTransport (TCP 实现)

职责

基于 TCP 和 Netty 的传输层实现。

核心组件

public abstract class TcpTransport extends AbstractLifecycleComponent implements Transport {
    private final OutboundHandler outboundHandler;
    private final InboundHandler inboundHandler;
    private final ResponseHandlers responseHandlers;

    // 发送请求
    @Override
    public void sendRequest(
        Connection connection,
        long requestId,
        String action,
        TransportRequest request,
        TransportRequestOptions options
    ) throws TransportException {
        // 1. 序列化请求
        BytesReference message = serialize(requestId, action, request);

        // 2. 通过出站处理器发送(简化示意:真实代码会传入发送完成回调)
        outboundHandler.sendMessage(connection.getChannel(), message, ActionListener.noop());
    }

    // 处理入站数据
    protected void handleInboundMessage(
        TcpChannel channel,
        InboundMessage message
    ) {
        if (message.isRequest()) {
            inboundHandler.handleRequest(channel, message);
        } else {
            inboundHandler.handleResponse(channel, message);
        }
    }
}

InboundHandler (入站处理器)

public class InboundHandler {
    // 处理请求
    public void handleRequest(TcpChannel channel, InboundMessage message) {
        // 1. 反序列化请求
        TransportRequest request = deserialize(message);

        // 2. 查找处理器
        RequestHandlerRegistry handler = requestHandlers.get(message.getAction());

        // 3. 执行处理器
        executor.execute(() -> {
            try {
                handler.processRequest(request, channel);
            } catch (Exception e) {
                sendErrorResponse(channel, message.getRequestId(), e);
            }
        });
    }

    // 处理响应
    public void handleResponse(TcpChannel channel, InboundMessage message) {
        // 1. 查找响应处理器
        TransportResponseHandler handler = responseHandlers.onResponseReceived(
            message.getRequestId()
        );

        // 2. 反序列化响应
        TransportResponse response = deserialize(message, handler.reader());

        // 3. 回调处理器
        handler.handleResponse(response);
    }
}

OutboundHandler (出站处理器)

public class OutboundHandler {
    // 发送消息
    public void sendMessage(
        TcpChannel channel,
        BytesReference message,
        ActionListener<Void> listener
    ) {
        // 1. 压缩(如果启用)
        if (compressionEnabled && message.length() > compressionThreshold) {
            message = compress(message);
        }

        // 2. 写入通道
        channel.sendMessage(message, listener);
    }
}

4. 核心数据结构

4.1 TransportRequest & TransportResponse

classDiagram
    class TransportRequest {
        <<abstract>>
        -TaskId parentTaskId
        +setParentTask(TaskId)
        +getParentTask()
    }

    class TransportResponse {
        <<abstract>>
        +writeTo(StreamOutput)
    }

    class TransportMessage {
        <<interface>>
        +writeTo(StreamOutput)
    }

    TransportMessage <|.. TransportRequest
    TransportMessage <|.. TransportResponse

  • TransportRequest: 所有传输请求的基类
  • TransportResponse: 所有传输响应的基类
  • TransportMessage: 传输消息的标记接口
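
自定义 Action 时需要实现这对基类:反序列化通过 StreamInput 构造函数完成,序列化通过 writeTo 完成,两者的读写顺序必须严格对称。下面是一个假设性的最小示例(EchoRequest/EchoResponse 为示意类型):

// 草图:最小的自定义请求/响应对(名称均为示意)
public class EchoRequest extends TransportRequest {
    private final String message;

    public EchoRequest(String message) {
        this.message = message;
    }

    // 反序列化构造函数:读顺序必须与 writeTo 的写顺序一致
    public EchoRequest(StreamInput in) throws IOException {
        super(in);
        this.message = in.readString();
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        super.writeTo(out);
        out.writeString(message);
    }
}

public class EchoResponse extends TransportResponse {
    private final String echoed;

    public EchoResponse(String echoed) {
        this.echoed = echoed;
    }

    public EchoResponse(StreamInput in) throws IOException {
        this.echoed = in.readString();
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        out.writeString(echoed);
    }
}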

4.2 Connection & ConnectionProfile

classDiagram
    class Connection {
        <<interface>>
        +getNode() DiscoveryNode
        +sendRequest(long, String, TransportRequest, TransportRequestOptions)
        +close()
    }

    class ConnectionProfile {
        -Map~ConnectionTypeHandle, ConnectionTypeSettings~ connectionTypeSettings
        -TimeValue connectTimeout
        -TimeValue handshakeTimeout
        -boolean compressionEnabled
        +getNumConnections()
        +getConnectTimeout()
    }

    class ConnectionTypeHandle {
        <<enumeration>>
        RECOVERY
        BULK
        REG
        STATE
        PING
    }

    ConnectionProfile --> ConnectionTypeHandle

  • Connection: 表示到某个节点的连接
  • ConnectionProfile: 连接配置,包含超时、连接数等
  • ConnectionTypeHandle: 连接类型(恢复、批量、常规、状态、Ping)

4.3 TransportResponseHandler

classDiagram
    class TransportResponseHandler {
        <<interface>>
        +read(StreamInput) T
        +handleResponse(T)
        +handleException(TransportException)
        +executor() String
    }

    class ActionListenerResponseHandler {
        -ActionListener~T~ listener
        -Writeable.Reader~T~ reader
        -String executor
        +handleResponse(T)
        +handleException(TransportException)
    }

    TransportResponseHandler <|.. ActionListenerResponseHandler
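
ActionListenerResponseHandler 是最常用的适配器,把 TransportResponseHandler 的回调桥接到 ActionListener。下面是一个用法草图(沿用上文示意的 EchoRequest/EchoResponse;构造参数随版本变化,7.x 中 executor 以线程池名称传入,新版本改为 Executor 对象):

// 草图:用 ActionListenerResponseHandler 发送请求并以 ActionListener 消费结果
transportService.sendRequest(
    targetNode,
    "internal:example/echo",        // 示意 action 名称
    new EchoRequest("hello"),
    new ActionListenerResponseHandler<>(
        ActionListener.wrap(
            response -> logger.info("echoed [{}]", response),
            e -> logger.warn("echo failed", e)
        ),
        EchoResponse::new           // 响应反序列化器
    )
);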

5. 通信协议

5.1 消息格式

+------------------+
| Header (variable)|  消息头
+------------------+
| Payload (variable)| 消息体
+------------------+

Header 格式

+--------+--------+-----------+--------+---------+---------------+
| 'E''S' | Length | RequestId | Status | Version | VarHeaderSize |
| 2 bytes| 4 bytes| 8 bytes   | 1 byte | 4 bytes | 4 bytes       |
+--------+--------+-----------+--------+---------+---------------+
| Variable Header: ThreadContext + Action 名称(仅请求消息)      |
+----------------------------------------------------------------+

字段说明:

  • 'E''S': 协议魔数前缀,用于帧校验
  • Length: 消息长度(不含魔数与长度字段本身)
  • RequestId: 请求唯一标识符(8 字节),用于匹配响应
  • Status: 状态位标志 (Request/Response/Error/Compress/Handshake)
  • Version: 传输协议版本 (TransportVersion)
  • VarHeaderSize: 可变头部长度
  • Action: 操作名称 (如 "indices:data/write/index"),位于可变头部,仅请求消息携带
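
按上述布局,可以用一个纯 Java 草图直观展示固定头部的写出顺序(仅为示意,真实实现见后文的 TcpHeader.writeHeader 与 OutboundHandler.serialize):

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// 草图:按固定头部布局手工写出一帧(字段取值均为示意)
static byte[] writeFrame(long requestId, byte status, int version,
                         byte[] variableHeader, byte[] payload) throws IOException {
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(buf);
    out.writeByte('E');                  // 协议魔数 'E'
    out.writeByte('S');                  // 协议魔数 'S'
    // Length:魔数与长度字段之后的所有字节数
    out.writeInt(8 + 1 + 4 + 4 + variableHeader.length + payload.length);
    out.writeLong(requestId);            // RequestId,8 字节
    out.writeByte(status);               // Status 位标志
    out.writeInt(version);               // 传输协议版本
    out.writeInt(variableHeader.length); // 可变头部长度
    out.write(variableHeader);           // ThreadContext(+ 请求的 action 名)
    out.write(payload);                  // 消息体(可能已压缩)
    return buf.toByteArray();
}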

5.2 Handshake 流程

sequenceDiagram
    autonumber
    participant Node1 as Node 1
    participant Node2 as Node 2

    Note over Node1,Node2: 建立 TCP 连接

    Node1->>Node2: TCP SYN
    Node2-->>Node1: TCP SYN-ACK
    Node1->>Node2: TCP ACK

    Note over Node1,Node2: Handshake (握手)

    Node1->>Node2: HandshakeRequest<br/>(version, cluster_name, node_id)
    Node2->>Node2: 验证:<br/>1. 集群名称匹配<br/>2. 版本兼容<br/>3. 许可证兼容

    alt 验证通过
        Node2-->>Node1: HandshakeResponse<br/>(version, node_info)
        Note over Node1,Node2: 连接建立成功
    else 验证失败
        Node2-->>Node1: HandshakeResponse<br/>(error)
        Node2->>Node2: 关闭连接
        Note over Node1,Node2: 连接失败
    end
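
握手验证的核心逻辑可以概括为下面的草图(方法与字段名均为示意,真实实现分散在 TransportHandshaker 与 TransportService 的连接验证器中):

// 草图:握手响应到达后的验证逻辑(名称均为示意)
void validateHandshake(DiscoveryNode expectedNode, HandshakeResponse response) {
    // 1. 集群名称必须一致,否则拒绝建立连接
    if (clusterName.equals(response.getClusterName()) == false) {
        throw new IllegalStateException("handshake failed, mismatched cluster name ["
            + response.getClusterName() + "]");
    }
    // 2. 版本必须兼容(通常允许相差一个大版本)
    if (isCompatible(response.getVersion()) == false) {
        throw new IllegalStateException("handshake failed, incompatible version ["
            + response.getVersion() + "]");
    }
    // 3. 对端身份必须与期望连接的节点一致,防止连错节点
    if (expectedNode.getId().equals(response.getNode().getId()) == false) {
        throw new IllegalStateException("handshake failed, unexpected remote node ["
            + response.getNode() + "]");
    }
}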

6. 传输层 API

6.1 sendRequest - 发送请求

方法签名

public <T extends TransportResponse> void sendRequest(
    DiscoveryNode node,
    String action,
    TransportRequest request,
    TransportResponseHandler<T> handler
)

参数说明

| 参数 | 类型 | 说明 |
| --- | --- | --- |
| node | DiscoveryNode | 目标节点 |
| action | String | 操作名称(如 "indices:data/write/index") |
| request | TransportRequest | 请求对象 |
| handler | TransportResponseHandler | 响应处理器(回调) |

使用示例

// 发送搜索请求到另一个节点
transportService.sendRequest(
    targetNode,
    "indices:data/read/search",
    searchRequest,
    new TransportResponseHandler<SearchResponse>() {
        @Override
        public SearchResponse read(StreamInput in) throws IOException {
            return new SearchResponse(in);
        }

        @Override
        public void handleResponse(SearchResponse response) {
            // 处理响应
        }

        @Override
        public void handleException(TransportException exp) {
            // 处理异常
        }

        @Override
        public String executor() {
            return ThreadPool.Names.SEARCH;
        }
    }
);

6.2 registerRequestHandler - 注册请求处理器

方法签名

public <Request extends TransportRequest> void registerRequestHandler(
    String action,
    Executor executor,
    Writeable.Reader<Request> requestReader,
    TransportRequestHandler<Request> handler
)

使用示例

// 注册搜索请求处理器
transportService.registerRequestHandler(
    "indices:data/read/search",
    ThreadPool.Names.SEARCH,
    SearchRequest::new,
    (request, channel, task) -> {
        // 处理搜索请求
        SearchResponse response = searchService.executeQueryPhase(request, (SearchTask) task);
        channel.sendResponse(response);
    }
);

6.3 常用 Action 名称

索引操作

| Action | 说明 |
| --- | --- |
| indices:data/write/index | 索引文档 |
| indices:data/write/bulk | 批量操作 |
| indices:data/write/delete | 删除文档 |
| indices:data/write/update | 更新文档 |
| indices:data/read/get | 获取文档 |

搜索操作

| Action | 说明 |
| --- | --- |
| indices:data/read/search | 搜索 |
| indices:data/read/search[phase/query] | Query Phase |
| indices:data/read/search[phase/fetch] | Fetch Phase |
| indices:data/read/scroll | Scroll |

集群操作

| Action | 说明 |
| --- | --- |
| cluster:monitor/health | 集群健康 |
| cluster:monitor/state | 集群状态 |
| cluster:admin/settings/update | 更新设置 |
| internal:cluster/coordination/join | 节点加入 |
| internal:cluster/coordination/publish_state | 发布状态 |

7. 上游模块调用链路分析

本章节详细分析三个典型上游模块如何使用传输层,从上游接口开始,自上而下展示完整的调用链路、关键代码和内部时序。

7.1 搜索请求路径

7.1.1 调用链路概览

搜索请求从协调节点发起,经过传输层发送到各个数据节点的分片,执行查询和获取,最后聚合结果返回。

flowchart TD
    A[TransportSearchAction] --> B[SearchTransportService]
    B --> C[TransportService.sendChildRequest]
    C --> D[AsyncSender.sendRequest]
    D --> E[ConnectionManager.getConnection]
    E --> F[Transport.Connection]
    F --> G[TcpTransport.sendRequest]
    G --> H[OutboundHandler.sendRequest]
    H --> I[OutboundHandler.serialize]
    I --> J[TcpChannel.sendMessage]
    J --> K[Netty Channel.write]

    L[Netty Channel.read] --> M[InboundHandler.inboundMessage]
    M --> N[InboundHandler.messageReceived]
    N --> O[InboundHandler.handleRequest]
    O --> P[RequestHandlerRegistry.processMessageReceived]
    P --> Q[SearchService.executeQueryPhase]
    Q --> R[构建 QuerySearchResult]
    R --> S[TransportChannel.sendResponse]
    S --> T[OutboundHandler.sendResponse]

7.1.2 关键代码 - 发送搜索请求

SearchTransportService 发起请求

// server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java
public void sendExecuteQuery(
    Transport.Connection connection,
    final ShardSearchRequest request,
    SearchTask task,
    final ActionListener<SearchPhaseResult> listener
) {
    // 1. 确定是否需要优化(单分片场景)
    final boolean fetchDocuments = request.numberOfShards() == 1
        && (request.source() == null || request.source().rankBuilder() == null);

    // 2. 选择响应读取器(QuerySearchResult 或 QueryFetchSearchResult)
    Writeable.Reader<SearchPhaseResult> reader = fetchDocuments
        ? QueryFetchSearchResult::new
        : in -> new QuerySearchResult(in, true);

    // 3. 通过传输服务发送请求(使用子任务机制)
    transportService.sendChildRequest(
        connection,
        QUERY_ACTION_NAME,  // "indices:data/read/search[phase/query]"
        request,
        task,
        new ConnectionCountingHandler<>(listener, reader, connection)
    );
}

TransportService 处理请求

// server/src/main/java/org/elasticsearch/transport/TransportService.java
public <T extends TransportResponse> void sendChildRequest(
    final Transport.Connection connection,
    final String action,
    final TransportRequest request,
    final Task parentTask,
    final TransportResponseHandler<T> handler
) {
    // 1. 设置父任务 ID(用于任务追踪和取消)
    request.setParentTask(parentTask.taskId());

    // 2. 调用标准发送逻辑
    sendRequest(connection, action, request, TransportRequestOptions.EMPTY, handler);
}

public <T extends TransportResponse> void sendRequest(
    final Transport.Connection connection,
    final String action,
    final TransportRequest request,
    final TransportRequestOptions options,
    TransportResponseHandler<T> handler
) {
    // 1. 通过拦截器发送(支持权限控制、监控等)
    asyncSender.sendRequest(connection, action, request, options, handler);
}

// 拦截器处理后的实际发送逻辑
private <T extends TransportResponse> void sendRequestInternal(
    final Transport.Connection connection,
    final String action,
    final TransportRequest request,
    final TransportRequestOptions options,
    TransportResponseHandler<T> handler
) {
    // 1. 生成唯一的请求 ID
    final long requestId = responseHandlers.newRequestId();

    // 2. 设置超时处理器
    final TimeoutHandler timeoutHandler = new TimeoutHandler(requestId, connection.getNode(), action);

    // 3. 注册响应处理器(requestId → handler)
    try {
        responseHandlers.add(requestId, new RequestHolder<>(
            handler,
            connection,
            action,
            timeoutHandler
        ));
    } catch (Exception e) {
        // 注册失败,立即触发异常回调
        handler.handleException(new SendRequestTransportException(connection.getNode(), action, e));
        return;
    }

    // 4. 通过底层连接发送请求
    try {
        connection.sendRequest(requestId, action, request, options);
    } catch (Exception e) {
        // 发送失败,移除响应处理器并触发异常
        responseHandlers.remove(requestId);
        handler.handleException(new SendRequestTransportException(connection.getNode(), action, e));
    }
}

TcpTransport 连接实现

// server/src/main/java/org/elasticsearch/transport/TcpTransport.java (内部类 NodeChannels)
@Override
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
    throws IOException, TransportException {
    // 1. 检查连接状态
    if (isClosing.get()) {
        throw new NodeNotConnectedException(node, "connection already closed");
    }

    // 2. 根据请求类型选择通道(RECOVERY, BULK, REG, STATE, PING)
    TcpChannel channel = channel(options.type());

    // 3. 确定是否需要压缩
    final Compression.Scheme schemeToUse = getCompressionScheme(request);

    // 4. 通过出站处理器发送
    outboundHandler.sendRequest(
        node,
        channel,
        requestId,
        action,
        request,
        options,
        getTransportVersion(),
        schemeToUse,
        false  // isHandshake = false
    );
}

private Compression.Scheme getCompressionScheme(TransportRequest request) {
    // 全局压缩 或 索引数据压缩
    final boolean shouldCompress = compress == Compression.Enabled.TRUE
        || (compress == Compression.Enabled.INDEXING_DATA
            && request instanceof RawIndexingDataTransportRequest
            && ((RawIndexingDataTransportRequest) request).isRawIndexingData());
    return shouldCompress ? compressionScheme : null;
}

OutboundHandler 序列化和发送

// server/src/main/java/org/elasticsearch/transport/OutboundHandler.java
void sendRequest(
    final DiscoveryNode node,
    final TcpChannel channel,
    final long requestId,
    final String action,
    final TransportRequest request,
    final TransportRequestOptions options,
    final TransportVersion transportVersion,
    final Compression.Scheme compressionScheme,
    final boolean isHandshake
) throws IOException, TransportException {
    // 调用通用发送逻辑
    sendMessage(
        channel,
        MessageDirection.REQUEST,
        action,
        request,
        requestId,
        isHandshake,
        compressionScheme,
        transportVersion,
        ResponseStatsConsumer.NONE,
        () -> messageListener.onRequestSent(node, requestId, action, request, options)
    );
}

private void sendMessage(
    TcpChannel channel,
    MessageDirection messageDirection,
    String action,
    Writeable writeable,
    long requestId,
    boolean isHandshake,
    Compression.Scheme compressionScheme,
    TransportVersion version,
    ResponseStatsConsumer responseStatsConsumer,
    Releasable onAfter
) throws IOException {
    // 1. 序列化消息(包含头部和载荷)
    final BytesReference message;
    final RecyclerBytesStreamOutput byteStreamOutput = new RecyclerBytesStreamOutput(recycler);
    try {
        message = serialize(
            messageDirection,
            action,
            requestId,
            isHandshake,
            version,
            compressionScheme,
            writeable,
            threadPool.getThreadContext(),
            byteStreamOutput
        );
    } catch (Exception e) {
        logger.warn(() -> "failed to serialize outbound message [" + writeable + "]", e);
        throw e;
    }

    // 2. 通过内部发送方法写入通道(listener 在真实实现中由本方法基于 onAfter 构造,此处从略)
    internalSend(channel, message, () -> writeable.getClass().getName(), listener);
}

public static BytesReference serialize(
    MessageDirection messageDirection,
    String action,
    long requestId,
    boolean isHandshake,
    TransportVersion version,
    Compression.Scheme compressionScheme,
    Writeable writeable,
    ThreadContext threadContext,
    RecyclerBytesStreamOutput byteStreamOutput
) throws IOException {
    // 1. 跳过头部空间(稍后填充)
    byteStreamOutput.skip(TcpHeader.HEADER_SIZE);

    // 2. 写入线程上下文(tracing、headers 等)
    threadContext.writeTo(byteStreamOutput);

    // 3. 对于请求,写入 action 名称
    if (messageDirection == MessageDirection.REQUEST) {
        byteStreamOutput.writeString(action);
    }

    // 4. 序列化消息体(可能压缩)
    final int variableHeaderLength = Math.toIntExact(byteStreamOutput.position() - TcpHeader.HEADER_SIZE);
    BytesReference message = serializeMessageBody(writeable, compressionScheme, version, byteStreamOutput);

    // 5. 构建状态标志
    byte status = 0;
    if (messageDirection != MessageDirection.REQUEST) {
        status = TransportStatus.setResponse(status);
    }
    if (isHandshake) {
        status = TransportStatus.setHandshake(status);
    }
    if (compressionScheme != null) {
        status = TransportStatus.setCompress(status);
    }

    // 6. 回填头部
    byteStreamOutput.seek(0);
    TcpHeader.writeHeader(
        byteStreamOutput,
        requestId,
        status,
        version,
        message.length() - TcpHeader.HEADER_SIZE,
        variableHeaderLength
    );

    return message;
}

private void internalSend(
    TcpChannel channel,
    BytesReference reference,
    Supplier<String> messageDescription,
    ActionListener<Void> listener
) {
    // 1. 记录访问时间
    final long startTime = threadPool.rawRelativeTimeInMillis();
    channel.getChannelStats().markAccessed(startTime);

    // 2. 记录发送字节数
    final long messageSize = reference.length();

    // 3. 日志记录(如果启用)
    TransportLogger.logOutboundMessage(channel, reference);

    // 4. 清空线程上下文,写入 Netty 通道
    try (var ignored = threadPool.getThreadContext().newEmptyContext()) {
        channel.sendMessage(reference, new ActionListener<>() {
            @Override
            public void onResponse(Void v) {
                statsTracker.markBytesWritten(messageSize);
                listener.onResponse(v);
            }

            @Override
            public void onFailure(Exception e) {
                logger.warn(() -> "send message failed [channel: " + channel + "]", e);
                listener.onFailure(e);
            }
        });
    }
}

7.1.3 关键代码 - 接收和处理搜索请求

InboundHandler 接收消息

// server/src/main/java/org/elasticsearch/transport/InboundHandler.java
void inboundMessage(TcpChannel channel, InboundMessage message) throws Exception {
    // 1. 记录访问时间
    final long startTime = threadPool.rawRelativeTimeInMillis();
    channel.getChannelStats().markAccessed(startTime);

    // 2. 日志记录
    TransportLogger.logInboundMessage(channel, message);

    // 3. 处理消息(自动关闭)
    if (message.isPing()) {
        keepAlive.receiveKeepAlive(channel);
    } else {
        messageReceived(channel, message, startTime);
    }
}

private void messageReceived(TcpChannel channel, InboundMessage message, long startTime) throws IOException {
    final InetSocketAddress remoteAddress = channel.getRemoteAddress();
    final Header header = message.getHeader();

    // 提前声明:finally 中记录慢请求时需要引用
    TransportResponseHandler<?> responseHandler = null;
    ThreadContext threadContext = threadPool.getThreadContext();
    try (var ignored = threadContext.newStoredContext()) {
        // 1. 设置线程上下文(从消息头部恢复)
        threadContext.setHeaders(header.getHeaders());
        threadContext.putTransient("_remote_address", remoteAddress);

        // 2. 根据消息类型分发
        if (header.isRequest()) {
            handleRequest(channel, message);
        } else {
            // 处理响应:按 requestId 找回发送时注册的响应处理器
            responseHandler = responseHandlers.onResponseReceived(header.getRequestId());
            if (responseHandler != null) {
                executeResponseHandler(message, responseHandler, remoteAddress);
            }
        }
    } finally {
        // 3. 记录慢请求
        final long took = threadPool.rawRelativeTimeInMillis() - startTime;
        if (slowLogThresholdMs > 0 && took > slowLogThresholdMs) {
            logSlowMessage(message, took, slowLogThresholdMs, responseHandler);
        }
    }
}

private <T extends TransportRequest> void handleRequest(TcpChannel channel, InboundMessage message) throws IOException {
    final Header header = message.getHeader();

    // 1. 处理握手请求
    if (header.isHandshake()) {
        handleHandshakeRequest(channel, message);
        return;
    }

    // 2. 获取 action 和 requestId
    final String action = header.getActionName();
    final long requestId = header.getRequestId();

    // 3. 查找请求处理器
    final RequestHandlerRegistry<T> reg = requestHandlers.getHandler(action);

    // 4. 创建传输通道(用于发送响应)
    final TransportChannel transportChannel = new TcpTransportChannel(
        outboundHandler,
        channel,
        action,
        requestId,
        header.getVersion(),
        header.getCompressionScheme(),
        reg,
        false,
        message.takeBreakerReleaseControl()
    );

    try {
        // 5. 触发消息监听器
        messageListener.onRequestReceived(requestId, action);

        // 6. 反序列化请求
        final StreamInput stream = namedWriteableStream(message.openOrGetStreamInput());
        T request = reg.newRequest(stream);
        request.remoteAddress(channel.getRemoteAddress());
        request.setRequestId(requestId);

        // 7. 根据线程池类型执行
        if (reg.getExecutor() == EsExecutors.DIRECT_EXECUTOR_SERVICE) {
            // 直接在当前线程执行
            doHandleRequest(reg, request, transportChannel);
        } else {
            // 提交到线程池执行
            handleRequestForking(request, reg, transportChannel);
        }
    } catch (Exception e) {
        sendErrorResponse(action, transportChannel, e);
    }
}

private static <T extends TransportRequest> void doHandleRequest(
    RequestHandlerRegistry<T> reg,
    T request,
    TransportChannel channel
) {
    try {
        reg.processMessageReceived(request, channel);
    } catch (Exception e) {
        sendErrorResponse(reg.getAction(), channel, e);
    }
}

SearchService 执行查询

// server/src/main/java/org/elasticsearch/search/SearchService.java
// (注册的 Handler 在 SearchTransportService 中)
transportService.registerRequestHandler(
    QUERY_ACTION_NAME,  // "indices:data/read/search[phase/query]"
    EsExecutors.DIRECT_EXECUTOR_SERVICE,
    ShardSearchRequest::new,
    (request, channel, task) -> searchService.executeQueryPhase(
        request,
        (SearchShardTask) task,
        new ChannelActionListener<>(channel)
    )
);

public void executeQueryPhase(
    ShardSearchRequest request,
    SearchShardTask task,
    ActionListener<SearchPhaseResult> listener
) {
    // 1. 创建搜索上下文
    final SearchContext searchContext = createContext(request, task);

    try {
        // 2. 执行查询阶段
        searchContext.preProcess();
        QueryPhase.execute(searchContext);

        // 3. 构建查询结果
        QuerySearchResult queryResult = searchContext.queryResult();

        // 4. 返回结果
        listener.onResponse(queryResult);
    } catch (Exception e) {
        listener.onFailure(e);
    } finally {
        searchContext.close();
    }
}

TcpTransportChannel 发送响应

// server/src/main/java/org/elasticsearch/transport/TcpTransportChannel.java
@Override
public void sendResponse(TransportResponse response) {
    try {
        outboundHandler.sendResponse(
            version,
            channel,
            requestId,
            action,
            response,
            compressionScheme,
            false  // isHandshake
        );
    } catch (Exception e) {
        throw new SendRequestTransportException(channel.getRemoteAddress(), action, e);
    }
}

7.1.4 搜索请求内部时序图

sequenceDiagram
    autonumber
    participant SearchAction as TransportSearchAction<br/>(协调节点)
    participant SearchTransport as SearchTransportService
    participant TS as TransportService
    participant CM as ConnectionManager
    participant Conn as TcpTransport.Connection
    participant Out as OutboundHandler
    participant Channel as TcpChannel/Netty

    Note over SearchAction: Query 阶段开始

    SearchAction->>SearchTransport: sendExecuteQuery(connection, request, task, listener)
    SearchTransport->>SearchTransport: 确定响应类型<br/>(单分片优化)
    SearchTransport->>TS: sendChildRequest(connection, QUERY_ACTION_NAME, request, task, handler)

    TS->>TS: 设置 parentTask
    TS->>TS: sendRequest(connection, action, request, options, handler)
    TS->>TS: asyncSender.sendRequest()<br/>(拦截器处理)

    TS->>TS: sendRequestInternal()
    TS->>TS: 生成 requestId
    TS->>TS: responseHandlers.add(requestId, handler)
    TS->>Conn: connection.sendRequest(requestId, action, request, options)

    Conn->>Conn: 选择通道 channel(type)
    Conn->>Conn: 判断是否压缩
    Conn->>Out: outboundHandler.sendRequest(node, channel, requestId, action, request, ...)

    Out->>Out: sendMessage(channel, REQUEST, action, request, requestId, ...)
    Out->>Out: serialize(REQUEST, action, requestId, request, ...)
    Out->>Out: 1. 跳过头部空间
    Out->>Out: 2. 写入 ThreadContext
    Out->>Out: 3. 写入 action 名称
    Out->>Out: 4. 序列化 request 体
    Out->>Out: 5. 可选压缩
    Out->>Out: 6. 回填头部

    Out->>Channel: channel.sendMessage(bytesRef, listener)
    Channel->>Channel: Netty write & flush

    Note over Channel: 网络传输

    Note over SearchAction,Channel: 数据节点接收和处理
sequenceDiagram
    autonumber
    participant Channel as TcpChannel/Netty<br/>(数据节点)
    participant In as InboundHandler
    participant RH as RequestHandlerRegistry
    participant SearchService as SearchService
    participant QueryPhase as QueryPhase
    participant TChannel as TcpTransportChannel
    participant Out as OutboundHandler

    Note over Channel: 数据节点接收请求

    Channel->>In: inboundMessage(channel, message)
    In->>In: 记录访问时间
    In->>In: 日志记录
    In->>In: messageReceived(channel, message, startTime)

    In->>In: 恢复 ThreadContext
    In->>In: handleRequest(channel, message)
    In->>In: 解析 header (action, requestId)
    In->>RH: requestHandlers.getHandler(action)
    RH-->>In: RequestHandlerRegistry

    In->>In: 创建 TcpTransportChannel
    In->>In: 反序列化 request
    In->>In: 判断执行器类型

    alt 直接执行 (DIRECT_EXECUTOR_SERVICE)
        In->>RH: doHandleRequest(reg, request, channel)
        RH->>SearchService: processMessageReceived(request, channel)<br/>executeQueryPhase(request, task, listener)
    else 线程池执行
        In->>RH: handleRequestForking(request, reg, channel)
        RH->>SearchService: [异步] processMessageReceived(request, channel)
    end

    SearchService->>SearchService: 创建 SearchContext
    SearchService->>QueryPhase: QueryPhase.execute(searchContext)
    QueryPhase->>QueryPhase: 1. runQuery(searchContext)
    QueryPhase->>QueryPhase: 2. executeInternal(searchContext)
    QueryPhase->>QueryPhase: 3. 收集 TopDocs
    QueryPhase-->>SearchService: QuerySearchResult

    SearchService->>TChannel: listener.onResponse(queryResult)
    TChannel->>Out: channel.sendResponse(queryResult)
    Out->>Out: sendResponse(version, channel, requestId, action, response, ...)
    Out->>Out: sendMessage(channel, RESPONSE, action, response, requestId, ...)
    Out->>Out: serialize(RESPONSE, ..., response, ...)
    Out->>Channel: channel.sendMessage(bytesRef, listener)
    Channel->>Channel: Netty write & flush

    Note over Channel: 响应返回协调节点
sequenceDiagram
    autonumber
    participant Channel as TcpChannel/Netty<br/>(协调节点)
    participant In as InboundHandler
    participant ResponseHandler as TransportResponseHandler<br/>(ActionListener)
    participant SearchAction as TransportSearchAction

    Note over Channel: 协调节点接收响应

    Channel->>In: inboundMessage(channel, responseMessage)
    In->>In: messageReceived(channel, responseMessage, startTime)
    In->>In: handleResponse(channel, message)
    In->>In: 解析 header (requestId, status)
    In->>In: responseHandlers.onResponseReceived(requestId)
    In-->>ResponseHandler: 获取 ResponseHandler

    In->>In: executeResponseHandler(message, responseHandler, remoteAddress)
    In->>In: 反序列化 response
    In->>ResponseHandler: handler.handleResponse(response)

    ResponseHandler->>SearchAction: onResponse(queryResult)
    SearchAction->>SearchAction: 聚合结果<br/>继续 Fetch 阶段或返回

    Note over SearchAction: Query 阶段完成

7.1.5 搜索请求流程说明

1. 图意概述

上述三个时序图展示了搜索请求从协调节点发送到数据节点、数据节点执行查询、响应返回协调节点的完整流程。

2. 关键路径

  • 发送路径: TransportSearchAction → SearchTransportService → TransportService → Connection → OutboundHandler → TcpChannel → Netty
  • 接收路径: Netty → TcpChannel → InboundHandler → RequestHandler → SearchService → QueryPhase
  • 响应路径: TcpTransportChannel → OutboundHandler → Netty → InboundHandler → ResponseHandler → SearchAction

3. 关键优化

  • 单分片优化: 单分片搜索时直接返回 QueryFetchSearchResult,省略 Fetch 阶段
  • 直接执行: 搜索请求使用 DIRECT_EXECUTOR_SERVICE,避免线程切换
  • 连接复用: 使用长连接和连接池,避免频繁建连
  • 零拷贝: Netty ByteBuf 支持零拷贝,减少内存拷贝

4. 异常处理

  • 超时: responseHandlers 中的 TimeoutHandler 会在超时后清理资源并触发异常回调(调度思路见本列表后的草图)
  • 网络故障: 连接断开时会触发 handler.handleException(),搜索层会重试其他副本
  • 序列化失败: 捕获异常后发送错误响应,不会影响其他分片
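
超时清理的实现思路如下面的草图所示:发送时调度一个定时任务,到期后若响应仍未返回,则移除并回调对应 handler(简化自 TransportService.TimeoutHandler 的结构,名称为示意):

// 草图:请求超时调度(简化示意)
final long requestId = responseHandlers.newRequestId();
responseHandlers.add(requestId, holder);

if (options.timeout() != null) {
    threadPool.schedule(() -> {
        // 到期时尝试移除:若仍在映射中,说明响应尚未返回
        RequestHolder<?> removed = responseHandlers.remove(requestId);
        if (removed != null) {
            removed.handler().handleException(new ReceiveTimeoutTransportException(
                node, action,
                "request_id [" + requestId + "] timed out after [" + options.timeout() + "]"));
        }
    }, options.timeout(), threadPool.generic());
}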

5. 性能要点

  • 异步处理: 整个流程基于异步回调,避免阻塞
  • 流式序列化: 使用 StreamOutput 流式序列化,避免大对象分配
  • 压缩: 大响应自动压缩,减少网络传输
  • 批量优化: 多个分片请求可以并发发送,充分利用网络带宽

6. 监控埋点

  • TransportMessageListener.onRequestSent(): 请求发送时触发
  • TransportMessageListener.onRequestReceived(): 请求接收时触发
  • TransportMessageListener.onResponseSent(): 响应发送时触发
  • ChannelStats.markAccessed(): 记录通道访问时间
  • StatsTracker.markBytesWritten(): 记录发送字节数
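
这些埋点可以通过向 TransportService 注册自定义 TransportMessageListener 来消费,接口方法均有默认空实现,只需覆写关心的回调。下面是一个监控草图(requestsSent 为示意字段,方法签名以所用版本为准):

// 草图:注册消息监听器,按 action 统计发出的请求数
transportService.addMessageListener(new TransportMessageListener() {
    @Override
    public void onRequestSent(DiscoveryNode node, long requestId, String action,
                              TransportRequest request, TransportRequestOptions options) {
        // 每发送一个请求计数一次,可按 action 维度聚合导出到监控系统
        requestsSent.computeIfAbsent(action, a -> new LongAdder()).increment();
    }

    @Override
    public void onRequestReceived(long requestId, String action) {
        // 请求到达:可与响应发送时间配对计算服务端处理耗时
    }
});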

7.2 索引/复制请求路径

7.2.1 调用链路概览

索引请求首先路由到主分片,主分片执行写入后并发发送到所有 in-sync 副本分片,待全部副本确认(或被标记失败)后返回。

flowchart TD
    A[TransportBulkAction] --> B[TransportShardBulkAction]
    B --> C[TransportReplicationAction]
    C --> D[ReroutePhase: 路由到主分片]
    D --> E[TransportService.sendRequest 主分片]
    E --> F[PrimaryPhase: 主分片执行]
    F --> G[ReplicationPhase: 发送到副本]
    G --> H[ReplicasProxy.performOn]
    H --> I[TransportService.sendRequest 副本]
    I --> J[ReplicaPhase: 副本执行]
    J --> K[等待所有 in-sync 副本确认]
    K --> L[返回响应]

7.2.2 关键代码 - 主分片写入

TransportReplicationAction 初始化

// server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java
public TransportReplicationAction(
    Settings settings,
    String actionName,
    TransportService transportService,
    ClusterService clusterService,
    IndicesService indicesService,
    ThreadPool threadPool,
    ShardStateAction shardStateAction,
    ActionFilters actionFilters,
    Writeable.Reader<Request> requestReader,
    Writeable.Reader<ReplicaRequest> replicaRequestReader,
    Executor executor,
    SyncGlobalCheckpointAfterOperation syncGlobalCheckpointAfterOperation,
    PrimaryActionExecution primaryActionExecution,
    ReplicaActionExecution replicaActionExecution
) {
    // ...
    this.transportPrimaryAction = actionName + "[p]";  // 主分片 action
    this.transportReplicaAction = actionName + "[r]";   // 副本分片 action

    // 1. 注册主请求处理器(入口)
    transportService.registerRequestHandler(
        actionName,
        EsExecutors.DIRECT_EXECUTOR_SERVICE,
        requestReader,
        this::handleOperationRequest
    );

    // 2. 注册主分片处理器
    transportService.registerRequestHandler(
        transportPrimaryAction,
        executor,
        forceExecutionOnPrimary,
        true,  // canTripCircuitBreaker
        in -> new ConcreteShardRequest<>(requestReader, in),
        this::handlePrimaryRequest
    );

    // 3. 注册副本分片处理器
    transportService.registerRequestHandler(
        transportReplicaAction,
        executor,
        true,  // forceExecution - 副本不能拒绝
        canTripCircuitBreakerOnReplica,
        in -> new ConcreteReplicaRequest<>(replicaRequestReader, in),
        this::handleReplicaRequest
    );
}

ReroutePhase 路由到主分片

// TransportReplicationAction 内部类
final class ReroutePhase extends AbstractRunnable {
    @Override
    protected void doRun() {
        // 1. 获取集群状态
        final ClusterState state = clusterService.state();

        // 2. 解析目标分片
        resolveRequest(state.metadata().index(request.shardId().getIndex()), request);

        // 3. 获取分片路由
        final ShardRouting primary = state.routingTable().shardRoutingTable(request.shardId()).primaryShard();

        // 4. 如果主分片不在本节点,转发请求
        if (primary == null || primary.active() == false) {
            retry(new NoShardAvailableActionException(request.shardId()));
        } else {
            final DiscoveryNode node = state.nodes().get(primary.currentNodeId());
            if (node == null) {
                retry(new NoNodeAvailableException("unknown node [" + primary.currentNodeId() + "]"));
            } else {
                performAction(node, primary.allocationId().getId(), primary.primaryTerm());
            }
        }
    }

    private void performAction(
        final DiscoveryNode node,
        final String primaryAllocationId,
        final long primaryTerm
    ) {
        // 5. 构建具体的分片请求
        final ConcreteShardRequest<Request> concreteShardRequest = new ConcreteShardRequest<>(
            request,
            primaryAllocationId,
            primaryTerm
        );

        // 6. 发送到主分片节点
        final ActionListenerResponseHandler<Response> handler = new ActionListenerResponseHandler<>(
            new RetryingListener(),
            TransportReplicationAction.this::newResponseInstance,
            TransportResponseHandler.TRANSPORT_WORKER
        );

        transportService.sendRequest(
            node,
            transportPrimaryAction,  // actionName + "[p]"
            concreteShardRequest,
            transportOptions,
            handler
        );
    }
}

PrimaryPhase 主分片执行

void handlePrimaryRequest(
    final ConcreteShardRequest<Request> request,
    final TransportChannel channel,
    final Task task
) {
    // 1. 获取主分片
    final IndexShard indexShard = getIndexShard(request.getRequest().shardId());

    // 2. 执行主分片操作(ReplicationOperation)
    new AsyncPrimaryAction(request, channel, (ReplicationTask) task, indexShard).run();
}

class AsyncPrimaryAction extends AbstractRunnable {
    @Override
    protected void doRun() throws Exception {
        // 1. 获取操作权限
        acquirePrimaryOperationPermit(
            indexShard,
            request.getRequest(),
            this,
            request.getPrimaryTerm()
        );
    }

    @Override
    public void onResponse(Releasable releasable) {
        try {
            // 2. 执行主分片写入
            final PrimaryResult result = shardOperationOnPrimary(
                request.getRequest(),
                indexShard
            );

            // 3. 创建复制操作
            final ReplicationOperation<Request, ReplicaRequest, PrimaryResult> replicationOperation = new ReplicationOperation<>(
                request.getRequest(),
                indexShard.routingEntry(),
                this,
                newReplicasProxy(),
                logger,
                actionName
            );

            // 4. 执行复制
            replicationOperation.execute();
        } finally {
            releasable.close();
        }
    }
}

ReplicasProxy 发送到副本

protected class ReplicasProxy implements ReplicationOperation.Replicas<ReplicaRequest> {
    @Override
    public void performOn(
        final ShardRouting replica,
        final ReplicaRequest request,
        final long primaryTerm,
        final long globalCheckpoint,
        final long maxSeqNoOfUpdatesOrDeletes,
        final ActionListener<ReplicationOperation.ReplicaResponse> listener
    ) {
        // 1. 获取副本节点
        String nodeId = replica.currentNodeId();
        final DiscoveryNode node = clusterService.state().nodes().get(nodeId);
        if (node == null) {
            listener.onFailure(new NoNodeAvailableException("unknown node [" + nodeId + "]"));
            return;
        }

        // 2. 构建副本请求
        final ConcreteReplicaRequest<ReplicaRequest> replicaRequest = new ConcreteReplicaRequest<>(
            request,
            replica.allocationId().getId(),
            primaryTerm,
            globalCheckpoint,
            maxSeqNoOfUpdatesOrDeletes
        );

        // 3. 发送到副本节点
        final ActionListenerResponseHandler<ReplicaResponse> handler = new ActionListenerResponseHandler<>(
            listener,
            ReplicaResponse::new,
            TransportResponseHandler.TRANSPORT_WORKER
        );

        transportService.sendRequest(
            node,
            transportReplicaAction,  // actionName + "[r]"
            replicaRequest,
            transportOptions,
            handler
        );
    }
}

7.2.3 关键代码 - 副本分片写入

ReplicaPhase 副本执行

void handleReplicaRequest(
    final ConcreteReplicaRequest<ReplicaRequest> request,
    final TransportChannel channel,
    final Task task
) {
    // 1. 获取副本分片
    final IndexShard replica = getIndexShard(request.getRequest().shardId());

    // 2. 执行副本操作
    new AsyncReplicaAction(request, channel, (ReplicationTask) task, replica).run();
}

class AsyncReplicaAction extends AbstractRunnable {
    @Override
    protected void doRun() throws Exception {
        // 1. 验证 allocation ID
        final String actualAllocationId = this.replica.routingEntry().allocationId().getId();
        if (actualAllocationId.equals(replicaRequest.getTargetAllocationID()) == false) {
            throw new ShardNotFoundException(
                this.replica.shardId(),
                "expected allocation id [{}] but found [{}]",
                replicaRequest.getTargetAllocationID(),
                actualAllocationId
            );
        }

        // 2. 获取副本操作权限
        acquireReplicaOperationPermit(
            replica,
            replicaRequest.getRequest(),
            this,
            replicaRequest.getPrimaryTerm(),
            replicaRequest.getGlobalCheckpoint(),
            replicaRequest.getMaxSeqNoOfUpdatesOrDeletes()
        );
    }

    @Override
    public void onResponse(Releasable releasable) {
        try {
            // 3. 执行副本写入
            shardOperationOnReplica(
                replicaRequest.getRequest(),
                replica
            );

            // 4. 构建副本响应
            final ReplicaResponse response = new ReplicaResponse(
                replica.getLocalCheckpoint(),
                replica.getLastSyncedGlobalCheckpoint()
            );

            // 5. 发送响应
            channel.sendResponse(response);
        } catch (Exception e) {
            channel.sendResponse(e);
        } finally {
            releasable.close();
        }
    }
}

7.2.4 索引/复制内部时序图

sequenceDiagram
    autonumber
    participant Client as TransportBulkAction<br/>(协调节点)
    participant Reroute as ReroutePhase
    participant TS as TransportService
    participant Primary as PrimaryPhase<br/>(主分片节点)
    participant Replication as ReplicationOperation
    participant Proxy as ReplicasProxy
    participant Replica1 as ReplicaPhase<br/>(副本1)
    participant Replica2 as ReplicaPhase<br/>(副本2)

    Note over Client,Replica2: 路由和主分片写入

    Client->>Reroute: runReroutePhase(task, request, listener)
    Reroute->>Reroute: 1. 获取集群状态
    Reroute->>Reroute: 2. 查找主分片路由
    Reroute->>Reroute: 3. 获取主分片节点
    Reroute->>TS: sendRequest(node, transportPrimaryAction, concreteShardRequest, handler)

    TS->>TS: sendRequestInternal()<br/>(生成 requestId, 注册 responseHandler)
    TS->>Primary: [网络传输] 请求到达主分片节点

    Primary->>Primary: handlePrimaryRequest(request, channel, task)
    Primary->>Primary: getIndexShard(shardId)
    Primary->>Primary: acquirePrimaryOperationPermit()
    Primary->>Primary: shardOperationOnPrimary(request, indexShard)
    Primary->>Primary: 执行写入:<br/>- 解析文档<br/>- 写入 Lucene<br/>- 更新 translog

    Note over Primary,Replica2: 复制到副本分片

    Primary->>Replication: ReplicationOperation.execute()
    Replication->>Replication: 获取所有 in-sync 副本列表

    par 并发发送到所有副本
        Replication->>Proxy: performOn(replica1, request, primaryTerm, globalCheckpoint, listener)
        Proxy->>TS: sendRequest(node, transportReplicaAction, replicaRequest, handler)
        TS->>Replica1: [网络传输] 请求到达副本1

        Replica1->>Replica1: handleReplicaRequest(request, channel, task)
        Replica1->>Replica1: getIndexShard(shardId)
        Replica1->>Replica1: 验证 allocation ID
        Replica1->>Replica1: acquireReplicaOperationPermit()
        Replica1->>Replica1: shardOperationOnReplica(request, replica)
        Replica1->>Replica1: 执行写入:<br/>- 写入 Lucene<br/>- 更新 translog
        Replica1->>Replica1: 构建 ReplicaResponse
        Replica1->>TS: channel.sendResponse(replicaResponse)
        TS->>Replication: [网络传输] 副本1响应
    and
        Replication->>Proxy: performOn(replica2, request, primaryTerm, globalCheckpoint, listener)
        Proxy->>TS: sendRequest(node, transportReplicaAction, replicaRequest, handler)
        TS->>Replica2: [网络传输] 请求到达副本2

        Replica2->>Replica2: handleReplicaRequest(request, channel, task)
        Replica2->>Replica2: getIndexShard(shardId)
        Replica2->>Replica2: 验证 allocation ID
        Replica2->>Replica2: acquireReplicaOperationPermit()
        Replica2->>Replica2: shardOperationOnReplica(request, replica)
        Replica2->>Replica2: 执行写入:<br/>- 写入 Lucene<br/>- 更新 translog
        Replica2->>Replica2: 构建 ReplicaResponse
        Replica2->>TS: channel.sendResponse(replicaResponse)
        TS->>Replication: [网络传输] 副本2响应
    end

    Note over Replication: 等待所有 in-sync 副本响应<br/>(wait_for_active_shards 默认 1,仅为写前检查)

    Replication->>Primary: 所有 in-sync 副本完成(成功或被标记失败)
    Primary->>Primary: 构建 PrimaryResponse
    Primary->>TS: channel.sendResponse(primaryResponse)
    TS->>Client: [网络传输] 主分片响应返回协调节点

    Client->>Client: 处理响应<br/>onResponse(response)

    Note over Client: 写入完成

7.2.5 索引/复制流程说明

1. 图意概述

该时序图展示了索引/复制请求从协调节点到主分片再到副本分片的完整流程,体现了 Elasticsearch 的主副同步机制。

2. 关键路径

  • 路由阶段: TransportBulkAction → ReroutePhase → 查找主分片节点
  • 主写入阶段: TransportService → PrimaryPhase → 写入主分片 → 创建 ReplicationOperation
  • 副本复制阶段: ReplicationOperation → ReplicasProxy → 并发发送到所有副本
  • 副本执行阶段: ReplicaPhase → 验证 allocation ID → 写入副本 → 返回响应
  • 确认阶段: 等待所有 in-sync 副本确认 → 返回响应给协调节点

3. 关键设计

  • in-sync 确认机制: 主分片等待所有 in-sync 副本确认后才返回成功;wait_for_active_shards(默认 1)只是写入开始前的活跃分片检查,并非确认数(请求级设置示例见本列表后的草图)
  • allocation ID: 每个分片副本有唯一 ID,防止写入到过期副本
  • primaryTerm: 主分片任期号,检测主分片切换
  • globalCheckpoint: 全局检查点,用于一致性控制
  • in-sync replicas: 只向同步中的副本发送请求,失败副本会被移除
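
与上述确认语义配合,wait_for_active_shards 可以按请求调整;它只影响写入开始前的活跃分片检查,写入确认仍要求所有 in-sync 副本成功。下面是一个请求级设置的草图(索引名等均为示意):

// 草图:要求所有分片拷贝均活跃才开始写入(默认为 1,即仅主分片)
IndexRequest request = new IndexRequest("my-index")
    .id("1")
    .source("field", "value")
    .waitForActiveShards(ActiveShardCount.ALL);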

4. 异常处理

  • 主分片不可用: ReroutePhase 重试,等待集群状态更新
  • 副本写入失败: 标记副本为失败,从 in-sync 列表移除,不影响请求成功
  • 网络超时: 副本请求超时不影响主请求,但会标记副本失败
  • allocation ID 不匹配: 拒绝请求,防止写入到过期副本

5. 性能要点

  • 并发复制: 主分片同时向所有副本发送请求,最大化吞吐
  • forceExecution: 副本处理器必须执行,不能因线程池满而拒绝
  • bulk 优化: 批量请求共享网络连接和序列化开销
  • translog: 写入 translog 后即可返回,不等待 Lucene flush

6. 监控埋点

  • ReplicationOperation.execute(): 记录复制开始时间
  • ReplicationOperation.onPrimaryOperationComplete(): 记录主写入完成时间
  • ReplicationOperation.onReplicaResponse(): 记录副本响应时间
  • ShardStateAction.shardFailed(): 记录分片失败事件

7.3 集群状态发布路径

7.3.1 调用链路概览

集群状态发布是主节点向所有节点广播最新集群状态的过程,使用类 Raft 的共识协议(多数派确认)保证强一致性。

flowchart TD
    A[Coordinator] --> B[Publication]
    B --> C[PublicationTransportHandler]
    C --> D[构建 Diff 或 Full State]
    D --> E[序列化和压缩]
    E --> F[TransportService.sendChildRequest]
    F --> G[并发发送到所有节点]
    G --> H[InboundHandler 接收]
    H --> I[反序列化 ClusterState]
    I --> J[应用新状态]
    J --> K[发送确认]
    K --> L[等待 Quorum 确认]
    L --> M[提交状态]

7.3.2 关键代码 - 发布集群状态

PublicationTransportHandler 初始化

// server/src/main/java/org/elasticsearch/cluster/coordination/PublicationTransportHandler.java
public PublicationTransportHandler(
    TransportService transportService,
    NamedWriteableRegistry namedWriteableRegistry,
    Function<PublishRequest, PublishWithJoinResponse> handlePublishRequest,
    BiConsumer<ApplyCommitRequest, ActionListener<Void>> handleApplyCommit
) {
    this.transportService = transportService;
    this.handlePublishRequest = handlePublishRequest;

    // 1. 注册集群状态发布处理器
    transportService.registerRequestHandler(
        PUBLISH_STATE_ACTION_NAME,  // "internal:cluster/coordination/publish_state"
        this.clusterCoordinationExecutor,
        false,
        false,
        BytesTransportRequest::new,
        (request, channel, task) -> channel.sendResponse(handleIncomingPublishRequest(request))
    );

    // 2. 注册状态提交处理器
    transportService.registerRequestHandler(
        COMMIT_STATE_ACTION_NAME,  // "internal:cluster/coordination/commit_state"
        this.clusterCoordinationExecutor,
        false,
        false,
        ApplyCommitRequest::new,
        (request, channel, task) -> handleApplyCommit.accept(request, new ChannelActionListener<>(channel))
    );
}

Publication 发布状态

// PublicationTransportHandler 内部类
private class PublicationContext extends AbstractRefCounted {
    private final ClusterState newState;
    private final ClusterState previousState;
    private final boolean sendFullVersion;

    // 序列化后的状态(按版本分组)
    private final Map<TransportVersion, ReleasableBytesReference> serializedStates = new ConcurrentHashMap<>();
    private final Map<TransportVersion, ReleasableBytesReference> serializedDiffs = new ConcurrentHashMap<>();

    // 节点连接
    private final Map<DiscoveryNode, Transport.Connection> nodeConnections = new ConcurrentHashMap<>();

    void buildDiffAndSerializeStates() {
        // 1. 准备 Diff 计算器(懒加载)
        final LazyInitializable<Diff<ClusterState>, RuntimeException> diffSupplier = new LazyInitializable<>(
            () -> newState.diff(previousState)
        );

        // 2. 遍历所有节点
        for (DiscoveryNode node : discoveryNodes) {
            // 本地节点跳过序列化
            if (node.equals(transportService.getLocalNode())) {
                continue;
            }

            // 3. 获取连接
            Transport.Connection connection;
            try {
                connection = transportService.getConnection(node);
            } catch (NodeNotConnectedException e) {
                logger.debug(() -> format("No connection to [%s] available, skipping serialization", node), e);
                continue;
            }

            nodeConnections.put(node, connection);

            // 4. 决定发送全量还是 Diff
            if (sendFullVersion || previousState.nodes().nodeExists(node) == false) {
                // 新节点或强制全量:序列化完整集群状态
                serializedStates.computeIfAbsent(
                    connection.getTransportVersion(),
                    v -> serializeFullClusterState(newState, node, v)
                );
            } else {
                // 已存在节点:序列化 Diff
                serializedDiffs.computeIfAbsent(
                    connection.getTransportVersion(),
                    v -> serializeDiffClusterState(newState, diffSupplier.getOrCompute(), node, v)
                );
            }
        }
    }

    public void sendPublishRequest(
        DiscoveryNode destination,
        PublishRequest publishRequest,
        ActionListener<PublishWithJoinResponse> listener
    ) {
        // 1. Local node: invoke the handler directly
        if (destination.equals(transportService.getLocalNode())) {
            try {
                listener.onResponse(handlePublishRequest.apply(publishRequest));
            } catch (Exception e) {
                listener.onFailure(e);
            }
            return;
        }

        // 2. Send either the full state or the diff
        if (sendFullVersion || previousState.nodes().nodeExists(destination) == false) {
            logger.trace("sending full cluster state version [{}] to [{}]", newStateVersion, destination);
            sendFullClusterState(destination, listener);
        } else {
            logger.trace("sending cluster state diff for version [{}] to [{}]", newStateVersion, destination);
            sendClusterStateDiff(destination, listener);
        }
    }

    private void sendFullClusterState(DiscoveryNode destination, ActionListener<PublishWithJoinResponse> listener) {
        // 1. Look up the connection
        Transport.Connection connection = nodeConnections.get(destination);
        if (connection == null) {
            listener.onFailure(new NodeNotConnectedException(destination, "No connection available"));
            return;
        }

        // 2. Fetch (or lazily build) the serialized state for this transport version
        var version = connection.getTransportVersion();
        ReleasableBytesReference bytes = serializedStates.get(version);
        if (bytes == null) {
            bytes = serializedStates.computeIfAbsent(version, v -> serializeFullClusterState(newState, destination, v));
        }

        // 3. Send
        sendClusterState(connection, bytes, listener);
    }

    private void sendClusterStateDiff(DiscoveryNode destination, ActionListener<PublishWithJoinResponse> listener) {
        // 1. Look up the connection
        Transport.Connection connection = nodeConnections.get(destination);
        if (connection == null) {
            listener.onFailure(new NodeNotConnectedException(destination, "No connection available"));
            return;
        }

        // 2. Fetch the serialized diff for this transport version
        final ReleasableBytesReference bytes = serializedDiffs.get(connection.getTransportVersion());

        // 3. Send; if the remote node cannot apply the diff, fall back to the full state
        sendClusterState(connection, bytes, ActionListener.runAfter(listener.delegateResponse((delegate, e) -> {
            if (e instanceof TransportException transportException) {
                if (transportException.unwrapCause() instanceof IncompatibleClusterStateVersionException) {
                    logger.debug("resending full cluster state to node {} reason {}", destination, transportException.getDetailedMessage());
                    sendFullClusterState(destination, delegate);
                    return;
                }
            }
            delegate.onFailure(e);
        }), this::decRef));
    }

    private void sendClusterState(
        Transport.Connection connection,
        ReleasableBytesReference bytes,
        ActionListener<PublishWithJoinResponse> listener
    ) {
        // Ship the serialized cluster state; guard against the shared
        // buffer having already been released
        if (bytes.tryIncRef() == false) {
            listener.onFailure(new IllegalStateException("serialized cluster state released before transmission"));
            return;
        }

        transportService.sendChildRequest(
            connection,
            PUBLISH_STATE_ACTION_NAME,
            new BytesTransportRequest(bytes, connection.getTransportVersion()),
            task,
            STATE_REQUEST_OPTIONS,  // 30s timeout
            new CleanableResponseHandler<>(listener, PublishWithJoinResponse::new, clusterCoordinationExecutor, bytes::decRef)
        );
    }
}
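
Note the ref-counting discipline here: a single serialized ReleasableBytesReference is shared by every destination node on the same TransportVersion, so each send must tryIncRef() the buffer before transmission, and the CleanableResponseHandler calls decRef() once the response (or failure) has been handled. The recycled network buffer is therefore returned to the pool only after the last in-flight copy completes, which is why PublicationContext itself extends AbstractRefCounted.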

Serializing the cluster state

private ReleasableBytesReference serializeFullClusterState(ClusterState clusterState, DiscoveryNode node, TransportVersion version) {
    try (RecyclerBytesStreamOutput bytesStream = transportService.newNetworkBytesStream()) {
        final long uncompressedBytes;
        // Wrap the output in a compressing stream
        try (
            StreamOutput stream = new PositionTrackingOutputStreamStreamOutput(
                CompressorFactory.COMPRESSOR.threadLocalOutputStream(Streams.flushOnCloseStream(bytesStream))
            )
        ) {
            stream.setTransportVersion(version);
            stream.writeBoolean(true);  // flag: full state
            clusterState.writeTo(stream);
            uncompressedBytes = stream.position();
        } catch (IOException e) {
            throw new ElasticsearchException("failed to serialize cluster state for publishing to node {}", e, node);
        }

        final int size = bytesStream.size();
        serializationStatsTracker.serializedFullState(uncompressedBytes, size);
        logger.trace("serialized full cluster state version [{}] for node [{}] with size [{}]", clusterState.version(), node, size);

        return bytesStream.moveToBytesReference();
    }
}

private ReleasableBytesReference serializeDiffClusterState(
    ClusterState clusterState,
    Diff<ClusterState> diff,
    DiscoveryNode node,
    TransportVersion version
) {
    try (RecyclerBytesStreamOutput bytesStream = transportService.newNetworkBytesStream()) {
        final long uncompressedBytes;
        // Wrap the output in a compressing stream
        try (
            StreamOutput stream = new PositionTrackingOutputStreamStreamOutput(
                CompressorFactory.COMPRESSOR.threadLocalOutputStream(Streams.flushOnCloseStream(bytesStream))
            )
        ) {
            stream.setTransportVersion(version);
            stream.writeBoolean(false);  // flag: diff
            diff.writeTo(stream);
            uncompressedBytes = stream.position();
        } catch (IOException e) {
            throw new ElasticsearchException("failed to serialize cluster state diff for publishing to node {}", e, node);
        }

        final int size = bytesStream.size();
        serializationStatsTracker.serializedDiff(uncompressedBytes, size);
        logger.trace("serialized cluster state diff for version [{}] for node [{}] with size [{}]", clusterState.version(), node, size);

        return bytesStream.moveToBytesReference();
    }
}

7.3.3 Key code - receiving and applying the cluster state

Receiving the publish request

private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportRequest request) throws IOException {
    // 1. Deserialize
    final Compressor compressor = CompressorFactory.compressor(request.bytes());
    StreamInput in = request.bytes().streamInput();

    try {
        if (compressor != null) {
            in = new InputStreamStreamInput(compressor.threadLocalInputStream(in));
        }
        in = new NamedWriteableAwareStreamInput(in, namedWriteableRegistry);
        in.setTransportVersion(request.version());

        // 2. Read the full/diff flag
        final boolean isFullState = in.readBoolean();

        // 3. Build the PublishRequest
        final ClusterState incomingState;
        if (isFullState) {
            // Full state
            incomingState = ClusterState.readFrom(in, transportService.getLocalNode());
        } else {
            // Diff: must be applied on top of the locally known state; if the
            // local base state does not match, this throws
            // IncompatibleClusterStateVersionException
            Diff<ClusterState> diff = ClusterState.readDiffFrom(in, transportService.getLocalNode());
            incomingState = diff.apply(lastSeenClusterState.get());
        }

        // 4. Remember the last state we have seen
        lastSeenClusterState.set(incomingState);

        // 5. Hand off to the apply logic
        final PublishRequest publishRequest = new PublishRequest(incomingState);
        return handlePublishRequest.apply(publishRequest);
    } catch (IncompatibleClusterStateVersionException e) {
        // Applying the diff failed; rethrow so the master resends the full state
        throw e;
    }
}
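
Why can a diff fail to apply? A minimal sketch of the idea, with illustrative names (StateSketch/DiffSketch are not the actual ClusterState diff classes): a diff records which base state it was computed against and refuses any other base, which is exactly the condition the master's fallback path reacts to.

// Hypothetical sketch: a diff carries the identity of its base state and
// fails fast when applied to anything else. In Elasticsearch the real check
// lives inside the ClusterState diff implementation, which throws
// IncompatibleClusterStateVersionException.
record StateSketch(String stateUuid, long version) {}

record DiffSketch(String fromUuid, long toVersion) {
    StateSketch apply(StateSketch base) {
        if (base.stateUuid().equals(fromUuid) == false) {
            // corresponds to IncompatibleClusterStateVersionException
            throw new IllegalStateException(
                "diff built on state [" + fromUuid + "], local state is [" + base.stateUuid() + "]"
            );
        }
        return new StateSketch(java.util.UUID.randomUUID().toString(), toVersion);
    }
}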

Applying the cluster state

// server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java
private PublishWithJoinResponse handlePublishRequest(PublishRequest publishRequest) {
    // 1. Validate the publish request
    final ClusterState incomingState = publishRequest.getAcceptedState();

    // 2. Apply the cluster state
    final ClusterState appliedState = clusterApplier.onNewClusterState(
        "apply cluster state (from master [" + incomingState.nodes().getMasterNode() + "])",
        () -> incomingState,
        (source, e) -> logger.warn(() -> "failed to apply cluster state from master", e)
    );

    // 3. Build the response (optionally carrying a join request; joinOptional
    //    is computed elsewhere and omitted here for brevity)
    return new PublishWithJoinResponse(
        new PublishResponse(incomingState.term(), incomingState.version()),
        joinOptional
    );
}

7.3.4 Internal sequence diagram of cluster state publication

sequenceDiagram
    autonumber
    participant Coordinator as Coordinator<br/>(master)
    participant Publication as Publication
    participant Context as PublicationContext
    participant TS as TransportService
    participant Node1 as Node 1
    participant Node2 as Node 2
    participant Node3 as Node 3

    Note over Coordinator,Node3: cluster state publication starts

    Coordinator->>Publication: publish(clusterChangedEvent, ackListener)
    Publication->>Context: new PublicationContext(newState, previousState, nodes)

    Context->>Context: buildDiffAndSerializeStates()
    Context->>Context: iterate over all nodes

    loop for each node
        Context->>Context: look up connection
        alt new node, or full state forced
            Context->>Context: serializeFullClusterState(newState, node, version)
            Context->>Context: 1. open compressing stream
            Context->>Context: 2. write full-state flag
            Context->>Context: 3. serialize ClusterState
            Context->>Context: 4. compress
        else node already in cluster
            Context->>Context: serializeDiffClusterState(newState, diff, node, version)
            Context->>Context: 1. open compressing stream
            Context->>Context: 2. write diff flag
            Context->>Context: 3. serialize Diff
            Context->>Context: 4. compress
        end
    end

    Note over Context: serialization done, start concurrent sends

    par send to all nodes concurrently
        Context->>TS: sendChildRequest(node1, PUBLISH_STATE_ACTION, request, listener)
        TS->>Node1: [network] send cluster state

        Node1->>Node1: handleIncomingPublishRequest(request)
        Node1->>Node1: 1. decompress
        Node1->>Node1: 2. read full/diff flag

        alt full state
            Node1->>Node1: ClusterState.readFrom(in)
        else diff
            Node1->>Node1: ClusterState.readDiffFrom(in)
            Node1->>Node1: diff.apply(lastSeenClusterState)
        end

        Node1->>Node1: lastSeenClusterState.set(incomingState)
        Node1->>Node1: handlePublishRequest(publishRequest)
        Node1->>Node1: clusterApplier.onNewClusterState()
        Node1->>Node1: build PublishWithJoinResponse
        Node1->>TS: channel.sendResponse(publishResponse)
        TS->>Publication: [network] node 1 ack
    and
        Context->>TS: sendChildRequest(node2, PUBLISH_STATE_ACTION, request, listener)
        TS->>Node2: [network] send cluster state

        Node2->>Node2: handleIncomingPublishRequest(request)
        Node2->>Node2: decompress and deserialize
        Node2->>Node2: apply cluster state
        Node2->>TS: channel.sendResponse(publishResponse)
        TS->>Publication: [network] node 2 ack
    and
        Context->>TS: sendChildRequest(node3, PUBLISH_STATE_ACTION, request, listener)
        TS->>Node3: [network] send cluster state

        Node3->>Node3: handleIncomingPublishRequest(request)

        alt diff fails to apply
            Node3->>Node3: IncompatibleClusterStateVersionException
            Node3->>TS: channel.sendResponse(exception)
            TS->>Context: [network] node 3 failure
            Context->>TS: sendFullClusterState(node3, listener)
            TS->>Node3: [network] resend full cluster state
            Node3->>Node3: ClusterState.readFrom(in)
            Node3->>Node3: apply cluster state
            Node3->>TS: channel.sendResponse(publishResponse)
            TS->>Publication: [network] node 3 ack
        else success
            Node3->>Node3: apply cluster state
            Node3->>TS: channel.sendResponse(publishResponse)
            TS->>Publication: [network] node 3 ack
        end
    end

    Note over Publication: wait for a quorum of acks (majority of the voting nodes)

    Publication->>Coordinator: onCompletion(publication succeeded)

    Note over Coordinator,Node3: commit the cluster state

    Coordinator->>Context: sendCommitToAllNodes()

    par send commit requests concurrently
        Context->>TS: sendRequest(node1, COMMIT_STATE_ACTION, commitRequest, listener)
        TS->>Node1: [network]
        Node1->>Node1: handleApplyCommit(request)
        Node1->>Node1: commitClusterState(term, version)
        Node1->>TS: channel.sendResponse(Empty.INSTANCE)
    and
        Context->>TS: sendRequest(node2, COMMIT_STATE_ACTION, commitRequest, listener)
        TS->>Node2: [network]
        Node2->>Node2: handleApplyCommit(request)
        Node2->>TS: channel.sendResponse(Empty.INSTANCE)
    and
        Context->>TS: sendRequest(node3, COMMIT_STATE_ACTION, commitRequest, listener)
        TS->>Node3: [network]
        Node3->>Node3: handleApplyCommit(request)
        Node3->>TS: channel.sendResponse(Empty.INSTANCE)
    end

    Publication->>Coordinator: commit complete

    Note over Coordinator: cluster state publication finished

7.3.5 Cluster state publication: flow notes

1. Overview

The sequence diagram shows how the Elasticsearch master publishes a new cluster state to all nodes via its Raft-style coordination protocol, including the diff optimization, quorum acknowledgement, and the commit phase.

2. Critical path

  • Prepare: Coordinator → Publication → PublicationContext → serialize full states / diffs
  • Publish: send to all nodes concurrently → each node deserializes and applies the state → returns an ack
  • Commit: wait for a quorum of acks → send commit requests → each node persists the state

3. Key optimizations

  • Diff transmission: nodes already in the cluster receive a diff, drastically reducing bytes on the wire
  • Version grouping: serialization is done once per TransportVersion and reused across matching nodes
  • Compressed transport: deflate compression further shrinks the payload
  • Local shortcut: the master applies the state directly, with no serialization

4. Failure handling

  • Diff fails to apply: the node throws IncompatibleClusterStateVersionException and the master automatically resends the full state
  • Network timeout: 30s per request; a timed-out node does not block the publication as long as a quorum of other nodes acks
  • Unreachable node: skipped without failing the publication
  • Quorum not reached: the publication fails, the state is not committed, and a new election is triggered

5. Performance notes

  • Concurrent fan-out: the master sends to all nodes in parallel to maximize throughput
  • Compression ratio: cluster states typically compress around 5:1 to 10:1
  • Serialization reuse: nodes on the same TransportVersion share one serialized copy
  • Asynchronous: the whole flow runs on async callbacks and never blocks the master thread

6. Consistency guarantees

  • Monotonic term: the term increases on every master change, preventing split-brain
  • Monotonic version: the cluster state version only ever increases
  • Quorum: more than half of the voting nodes must ack before commit (a minimal sketch of this rule follows below)
  • Two-phase commit: publish first (pre-commit), then commit once a quorum has acked
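
The quorum rule can be stated in a few lines. A minimal sketch with illustrative names (the real logic lives in CoordinationState and its vote collections, not in a class like this):

import java.util.Set;

// Hypothetical sketch of the majority-quorum rule that decides when a
// published state may be committed. Names are illustrative, not ES API.
final class PublishQuorum {
    private final Set<String> votingNodeIds;  // the current voting configuration

    PublishQuorum(Set<String> votingNodeIds) {
        this.votingNodeIds = Set.copyOf(votingNodeIds);
    }

    // Commit is allowed once strictly more than half of the voting nodes
    // (the master's own ack included in ackedNodeIds) have acked.
    boolean canCommit(Set<String> ackedNodeIds) {
        long votes = ackedNodeIds.stream().filter(votingNodeIds::contains).count();
        return votes * 2 > votingNodeIds.size();
    }
}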

8. Core flow sequence diagrams

8.1 Standard request-response flow

sequenceDiagram
    autonumber
    participant Client as client module<br/>(SearchService)
    participant TS as TransportService
    participant CM as ConnectionManager
    participant Conn as Connection
    participant Out as OutboundHandler
    participant Network as network layer<br/>(Netty)
    participant RemoteNetwork as remote node<br/>network layer
    participant In as InboundHandler<br/>(remote node)
    participant Handler as RequestHandler<br/>(remote node)

    Note over Client,Handler: sending the request

    Client->>TS: sendRequest(node, action, request, handler)
    TS->>CM: getConnection(node)
    CM-->>TS: Connection

    TS->>TS: generate requestId
    TS->>TS: register responseHandler<br/>(requestId → handler)

    TS->>Conn: sendRequest(requestId, action, request, options)
    Conn->>Out: sendMessage(requestId, action, request)

    Out->>Out: serialize the request<br/>serialize(request)
    Out->>Out: build message header<br/>(requestId, action, version)

    alt compression enabled
        Out->>Out: compress the payload<br/>compress(payload)
    end

    Out->>Network: write(ByteBuffer)
    Network->>RemoteNetwork: TCP transfer

    Note over RemoteNetwork,Handler: handling the request

    RemoteNetwork->>In: onMessage(ByteBuffer)
    In->>In: parse message header<br/>parseHeader()

    alt compressed message
        In->>In: decompress the payload<br/>decompress(payload)
    end

    In->>In: deserialize the request<br/>deserialize(payload)
    In->>In: look up the handler<br/>findHandler(action)

    In->>Handler: messageReceived(request, channel, task)
    Handler->>Handler: run business logic<br/>processRequest()
    Handler->>Handler: build the response<br/>buildResponse()

    Note over Handler,Client: sending the response

    Handler->>In: channel.sendResponse(response)
    In->>Out: sendResponse(requestId, response)

    Out->>Out: serialize the response<br/>serialize(response)
    Out->>Out: build response header

    Out->>RemoteNetwork: write(ByteBuffer)
    RemoteNetwork->>Network: TCP transfer

    Network->>In: onMessage(ByteBuffer)<br/>(response message)
    In->>In: parse response header
    In->>In: look up response handler<br/>responseHandlers.get(requestId)

    In->>In: deserialize the response<br/>deserialize(payload, handler.reader())

    In->>Client: handler.handleResponse(response)
    Client->>Client: process the result
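
For orientation, this is roughly what the sending side of that diagram looks like from a module's point of view. A minimal sketch: MyRequest/MyResponse, the action name, and the surrounding method are hypothetical, and the exact TransportResponseHandler callback set varies between ES versions (shown here in roughly 8.x style):

import java.io.IOException;
import java.util.concurrent.Executor;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.transport.*;

// Hypothetical sketch of sending a request over TransportService.
// MyRequest / MyResponse stand in for real transport messages.
void callRemote(TransportService transportService, DiscoveryNode node, MyRequest request) {
    transportService.sendRequest(
        node,
        "internal:example/my_action",      // illustrative action name
        request,
        TransportRequestOptions.EMPTY,
        new TransportResponseHandler<MyResponse>() {
            @Override
            public MyResponse read(StreamInput in) throws IOException {
                return new MyResponse(in);  // deserialize the response body
            }

            @Override
            public Executor executor() {
                return TransportResponseHandler.TRANSPORT_WORKER;
            }

            @Override
            public void handleResponse(MyResponse response) {
                // success path, invoked on the chosen executor
            }

            @Override
            public void handleException(TransportException exp) {
                // failure path: timeout, closed connection, remote error...
            }
        }
    );
}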

8.2 Connection establishment flow

sequenceDiagram
    autonumber
    participant CM as ConnectionManager
    participant Transport as Transport
    participant TCP as TCP layer<br/>(Netty)
    participant RemoteTCP as remote node<br/>TCP layer
    participant RemoteTS as remote<br/>TransportService

    Note over CM,RemoteTS: establish the TCP connection

    CM->>Transport: openConnection(node, profile, listener)
    Transport->>TCP: connect(node.address)

    TCP->>RemoteTCP: TCP SYN
    RemoteTCP-->>TCP: TCP SYN-ACK
    TCP->>RemoteTCP: TCP ACK

    Note over TCP,RemoteTCP: TCP connection established

    Note over Transport,RemoteTS: handshake

    Transport->>Transport: build HandshakeRequest<br/>(version, clusterName, nodeId)
    Transport->>RemoteTCP: send(HandshakeRequest)

    RemoteTCP->>RemoteTS: handleHandshakeRequest()
    RemoteTS->>RemoteTS: validate:<br/>1. clusterName matches<br/>2. version compatible<br/>3. license check

    alt validation passes
        RemoteTS->>RemoteTCP: send(HandshakeResponse<br/>(success, nodeInfo))
        RemoteTCP->>Transport: receive(HandshakeResponse)
        Transport->>CM: connection established<br/>listener.onResponse(connection)
    else validation fails
        RemoteTS->>RemoteTCP: send(HandshakeResponse<br/>(error, reason))
        RemoteTCP->>Transport: receive(HandshakeResponse)
        Transport->>Transport: close connection
        Transport->>CM: listener.onFailure(exception)
    end
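
The validation step boils down to a handful of checks. A minimal sketch under stated assumptions: HandshakeInfo, the class, and the version threshold are illustrative, not the actual TcpTransport handshake types:

// Hypothetical sketch of handshake-time validation. In Elasticsearch the
// real checks are spread across TcpTransport and the connection manager.
final class HandshakeValidator {
    record HandshakeInfo(String clusterName, int transportVersionId, String nodeId) {}

    private final String localClusterName;
    private final int minCompatibleTransportVersionId;

    HandshakeValidator(String localClusterName, int minCompatibleTransportVersionId) {
        this.localClusterName = localClusterName;
        this.minCompatibleTransportVersionId = minCompatibleTransportVersionId;
    }

    void validate(HandshakeInfo remote) {
        // 1. Refuse nodes that belong to a different cluster
        if (localClusterName.equals(remote.clusterName()) == false) {
            throw new IllegalStateException("remote node belongs to cluster [" + remote.clusterName() + "]");
        }
        // 2. Refuse wire-incompatible nodes
        if (remote.transportVersionId() < minCompatibleTransportVersionId) {
            throw new IllegalStateException("remote transport version [" + remote.transportVersionId() + "] is too old");
        }
        // On success the connection is handed to the ConnectionManager and
        // becomes available for request routing.
    }
}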

8.3 Local-node optimization flow

sequenceDiagram
    autonumber
    participant Client as client module
    participant TS as TransportService
    participant LocalConn as LocalConnection
    participant Handler as RequestHandler<br/>(local)

    Client->>TS: sendRequest(localNode, action, request, handler)
    TS->>TS: isLocalNode(node)?<br/>check for the local node

    Note over TS: local shortcut: invoke the handler directly

    TS->>LocalConn: sendRequest(requestId, action, request)
    LocalConn->>LocalConn: no serialization<br/>pass the object reference

    LocalConn->>Handler: messageReceived(request, channel, task)
    Handler->>Handler: process the request
    Handler->>Handler: build the response

    Handler->>LocalConn: channel.sendResponse(response)
    LocalConn->>LocalConn: no serialization<br/>pass the object reference

    LocalConn->>Client: handler.handleResponse(response)
    Client->>Client: process the response

    Note over Client,Handler: zero network cost, zero serialization cost
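
One consequence of this shortcut is worth calling out: because local requests and responses skip serialization, the handler receives the very same object instance the caller built. Handlers therefore must not mutate a request (and callers must not mutate it after sending), since the local path has no defensive copy of the kind the wire round-trip implicitly provides for remote nodes.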

9. Configuration and observability

9.1 Key settings

| Setting | Default | Description |
|---|---|---|
| transport.tcp.port | 9300-9400 | TCP transport port range |
| transport.tcp.connect_timeout | 30s | connection timeout |
| transport.tcp.compress | false | enable compression |
| transport.tcp.compression_scheme | deflate | compression algorithm |
| transport.connections_per_node.recovery | 2 | connections for recovery traffic |
| transport.connections_per_node.bulk | 3 | connections for bulk traffic |
| transport.connections_per_node.reg | 6 | connections for regular requests |
| transport.connections_per_node.state | 1 | connections for cluster state |
| transport.connections_per_node.ping | 1 | connections for pings |
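
An illustrative elasticsearch.yml excerpt tying these together. This is a sketch, not a recommendation: in recent releases the canonical setting names drop the tcp. segment (transport.port, transport.connect_timeout, transport.compress, transport.compression_scheme), with the tcp.-prefixed forms kept as deprecated aliases, and accepted values and defaults vary by version.

# Illustrative transport tuning - verify names and defaults against your ES version
transport.port: 9300-9400
transport.connect_timeout: 30s
transport.compress: true              # compress node-to-node traffic
transport.compression_scheme: lz4     # or: deflate
transport.connections_per_node.bulk: 3
transport.connections_per_node.reg: 6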

9.2 Monitoring metrics

Transport statistics

GET /_nodes/stats/transport

{
  "nodes": {
    "node1": {
      "transport": {
        "server_open": 13,           # open server-side connections
        "rx_count": 12345,           # messages received
        "rx_size_in_bytes": 1234567, # bytes received
        "tx_count": 12340,           # messages sent
        "tx_size_in_bytes": 1234560  # bytes sent
      }
    }
  }
}

Connection and traffic counters

  • server_open: current number of open server-side connections
  • rx_count: cumulative messages received
  • tx_count: cumulative messages sent
  • rx_size_in_bytes: cumulative bytes received
  • tx_size_in_bytes: cumulative bytes sent
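
Since these counters are cumulative, throughput is derived by sampling twice and taking deltas. When polling, the standard filter_path response filter keeps the payload small:

GET /_nodes/stats/transport?filter_path=nodes.*.transport.rx_size_in_bytes,nodes.*.transport.tx_size_in_bytes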