Elasticsearch-05-传输层
本文档提供传输层模块的全面剖析,包括模块职责、架构设计、核心数据结构、通信协议、连接管理、关键流程时序图、配置调优和监控。
1. 模块职责
传输层是 Elasticsearch 节点间通信的基础设施,负责所有分布式操作的底层数据传输。
1.1 核心职责
- 节点间通信
  - 建立和管理节点之间的 TCP 连接
  - 发送请求和接收响应
  - 支持双向通信(请求-响应模式)
- 连接管理
  - 连接池管理
  - 连接健康检查
  - 连接重连和故障转移
- 请求路由
  - 根据 Action 名称路由请求到正确的处理器
  - 支持本地优化(本地节点请求不走网络)
  - 支持代理转发(跨集群通信)
- 序列化与反序列化
  - 请求/响应对象的序列化
  - 版本兼容性处理
  - 压缩支持
- 超时与重试
  - 请求超时检测
  - 响应超时处理
  - 错误处理和异常传播
- 性能优化
  - 连接复用
  - 批量发送
  - 零拷贝优化
1.2 输入与输出
输入:
- 来自上层模块的请求(如搜索、索引、集群状态发布)
- 来自其他节点的网络数据包
输出:
- 网络数据包发送到目标节点
- 响应回调到上层模块
1.3 依赖关系
上游依赖:
- Netty: 底层 NIO 框架
- ThreadPool: 线程池管理
- TaskManager: 任务管理
下游依赖者:
- Cluster Module: 集群状态发布
- Index Module: 分片复制
- Search Module: 分布式搜索
2. 模块架构
2.1 整体架构图
flowchart TB
subgraph "上层模块"
direction LR
SearchAction[SearchTransportService<br/>搜索请求]
ReplicationAction[TransportReplicationAction<br/>复制请求]
ClusterAction[PublicationTransportHandler<br/>集群状态发布]
end
subgraph "传输服务层 TransportService"
direction TB
TS[TransportService<br/>传输服务门面]
subgraph "连接管理"
CM[ClusterConnectionManager<br/>连接管理器]
ConnPool[连接池<br/>connectedNodes]
ConnValidator[ConnectionValidator<br/>连接验证器]
end
subgraph "请求处理"
RH[RequestHandlers<br/>请求处理器注册表]
ResponseHandlers[ResponseHandlers<br/>响应处理器映射]
TaskMgr[TaskManager<br/>任务管理器]
end
subgraph "拦截与监控"
Interceptor[TransportInterceptor<br/>传输拦截器]
Listener[TransportMessageListener<br/>消息监听器]
end
end
subgraph "传输抽象层 Transport"
direction TB
Transport[Transport接口<br/>传输层抽象]
Connection[Connection接口<br/>连接抽象]
LocalConn[LocalNodeConnection<br/>本地连接优化]
end
subgraph "TCP传输实现层 TcpTransport"
direction TB
TcpTransport[TcpTransport<br/>TCP传输实现]
subgraph "出站路径"
OutboundHandler[OutboundHandler<br/>出站处理器]
Serializer[序列化器<br/>serialize]
Compressor[压缩器<br/>compress]
end
subgraph "入站路径"
InboundHandler[InboundHandler<br/>入站处理器]
Deserializer[反序列化器<br/>deserialize]
Decompressor[解压缩器<br/>decompress]
end
subgraph "连接实现"
NodeChannels[NodeChannels<br/>节点通道组]
TcpChannel[TcpChannel<br/>TCP通道]
ChannelStats[ChannelStats<br/>通道统计]
end
end
subgraph "网络层 Netty"
direction LR
NettyTransport[Netty4Transport<br/>Netty实现]
NioEventLoop[NioEventLoopGroup<br/>事件循环组]
NettyChannel[Netty Channel<br/>网络通道]
ByteBuf[ByteBuf<br/>零拷贝缓冲]
end
SearchAction --> TS
ReplicationAction --> TS
ClusterAction --> TS
TS --> CM
TS --> RH
TS --> ResponseHandlers
TS --> Interceptor
TS --> Listener
TS --> TaskMgr
CM --> ConnPool
CM --> ConnValidator
CM --> Transport
TS --> Connection
Connection --> LocalConn
Connection --> Transport
Transport --> TcpTransport
TcpTransport --> OutboundHandler
TcpTransport --> InboundHandler
TcpTransport --> NodeChannels
OutboundHandler --> Serializer
OutboundHandler --> Compressor
OutboundHandler --> TcpChannel
InboundHandler --> Deserializer
InboundHandler --> Decompressor
InboundHandler --> RH
NodeChannels --> TcpChannel
TcpChannel --> ChannelStats
TcpChannel --> NettyTransport
NettyTransport --> NioEventLoop
NettyTransport --> NettyChannel
NettyChannel --> ByteBuf
2.2 架构说明
层次划分
1. 传输服务层 (Service Layer)
- 职责: 提供高层次的传输服务 API
- 核心组件:
- TransportService: 传输服务门面,协调所有传输操作
- ClusterConnectionManager: 管理集群内所有节点连接
- RequestHandlers: 维护 Action → Handler 的映射
- ResponseHandlers: 维护 RequestId → ResponseHandler 的映射
- TaskManager: 管理所有传输任务的生命周期
- TransportInterceptor: 拦截器链,用于权限控制、监控等
- 关键特性: 请求路由、连接管理、超时控制、任务追踪
2. 传输抽象层 (Abstract Layer)
- 职责: 定义传输接口,解耦具体实现
- 核心组件:
- Transport 接口: 定义传输层核心契约
- Connection 接口: 定义连接抽象
- LocalNodeConnection: 本地节点优化实现(零拷贝)
- 关键特性: 可插拔、可测试、支持本地优化
3. TCP传输实现层 (Implementation Layer)
- 职责: 基于 TCP 的具体实现
- 核心组件:
- TcpTransport: TCP 传输层实现基类
- OutboundHandler: 处理所有出站消息(序列化、压缩)
- InboundHandler: 处理所有入站消息(解压、反序列化、路由)
- NodeChannels: 每个节点的连接通道组(按类型分组)
- TcpChannel: 单个 TCP 通道的抽象
- 关键特性: 高性能、低延迟、连接池管理、压缩支持
4. 网络层 (Network Layer)
- 职责: 底层网络 I/O
- 核心组件:
- Netty4Transport: Netty 4 的具体实现
- NioEventLoopGroup: 事件循环组(Boss + Worker)
- Netty Channel: Netty 原生网络通道
- ByteBuf: Netty 零拷贝缓冲区
- 关键特性: 异步、非阻塞、零拷贝、高吞吐
边界条件
- 并发: 支持高并发请求(万级 QPS)
- 超时:
- 连接超时: 默认 30s
- 请求超时: 可配置(默认无限制)
- Handshake 超时: 30s
- 重试: 自动重试网络错误(连接失败、超时)
- 限流: 通过线程池大小控制并发
异常处理
- 连接失败: 标记节点为不可用,触发重连
- 请求超时: 回调 handler.handleException(TimeoutException)
- 序列化错误: 抛出 NotSerializableTransportException
- 版本不兼容: 握手失败,拒绝连接
性能要点
- 连接复用: 长连接,避免频繁建连
- 零拷贝: 使用 DirectByteBuffer
- 批量发送: 聚合小请求
- 压缩: 可选的请求/响应压缩
2.3 模块间交互矩阵
下表展示了传输层与上游模块之间的交互关系:
| 上游模块 | 主要 Action | 请求类型 | 同步/异步 | 超时设置 | 错误处理 | 一致性要求 |
|---|---|---|---|---|---|---|
| SearchTransportService | indices:data/read/search[phase/query] | Query 阶段 | 异步 | 可配置(默认无限制) | 重试失败分片 | 最终一致 |
| SearchTransportService | indices:data/read/search[phase/fetch] | Fetch 阶段 | 异步 | 同 Query | 失败则返回部分结果 | 最终一致 |
| SearchTransportService | indices:data/read/search[phase/dfs] | DFS 阶段 | 异步 | 同 Query | 重试 | 最终一致 |
| TransportReplicationAction | indices:data/write/index[p] | 主分片写入 | 同步 | 1 分钟 | 重试 + 分片失败处理 | 强一致(Quorum) |
| TransportReplicationAction | indices:data/write/index[r] | 副本分片写入 | 同步 | 同主分片 | 标记分片为失败 | 强一致 |
| TransportReplicationAction | indices:data/write/bulk[s][p/r] | 批量写入 | 同步 | 可配置 | 部分失败继续 | 强一致 |
| PublicationTransportHandler | internal:cluster/coordination/publish_state | 集群状态发布 | 异步 | 30s | 重试 + 节点标记 | 强一致(Raft) |
| PublicationTransportHandler | internal:cluster/coordination/commit_state | 状态提交确认 | 异步 | 30s | 记录日志 | 强一致 |
| MasterService | internal:cluster/coordination/join | 节点加入集群 | 同步 | 60s | 拒绝并重新发现 | 强一致 |
| MasterService | internal:discovery/zen/fd/ping | 节点心跳检测 | 异步 | 3s | 标记节点失效 | 无 |
| RecoveryTarget | internal:index/shard/recovery/start | 分片恢复开始 | 同步 | 30s | 取消恢复 | 强一致 |
| RecoveryTarget | internal:index/shard/recovery/file_chunk | 恢复文件块 | 异步 | 5m | 重试 | 强一致 |
| SnapshotShardsService | internal:cluster/snapshot/update_snapshot | 快照状态更新 | 异步 | 30s | 重试 | 最终一致 |
交互特点说明
1. 图意概述
该矩阵展示了 Elasticsearch 各上游模块如何通过传输层进行通信,明确了不同场景下的通信模式、容错策略和一致性保证。
2. 关键字段
- Action: Transport Action 的名称,是请求路由的唯一标识
- 请求类型: 请求的业务语义(查询、写入、协调等)
- 同步/异步: 调用方是否阻塞等待响应
- 同步:使用 Future 或 CountDownLatch 等待(同步封装示例见本列表后的代码)
- 异步:使用 ActionListener 回调
- 超时设置: 请求超时时间,超时后会触发 TimeoutException
- 错误处理: 失败时的处理策略(重试、降级、失败)
- 一致性要求: 数据一致性级别(强一致、最终一致)
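下面给出一个将异步 sendRequest 封装为同步等待的最小示意(action 名称、请求/响应类型与超时取值均为示例假设,非固定用法):
PlainActionFuture<ClusterHealthResponse> future = new PlainActionFuture<>();
transportService.sendRequest(
    targetNode,
    "cluster:monitor/health",                     // 示例 action
    new ClusterHealthRequest(),
    new ActionListenerResponseHandler<>(
        future,                                   // 响应与异常都写入 future
        ClusterHealthResponse::new,               // 响应反序列化器
        TransportResponseHandler.TRANSPORT_WORKER // 回调执行器
    )
);
// 同步阻塞:直到响应到达,或超时后抛出异常
ClusterHealthResponse response = future.actionGet(TimeValue.timeValueSeconds(30));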
3. 边界条件
- 并发限制: 每个节点同时处理的请求数受线程池大小限制
- 请求大小: 单个请求最大 100MB(可配置)
- 连接数: 每对节点间维护 13 个连接(recovery=2, bulk=3, reg=6, state=1, ping=1)
- 超时处理: 超时请求会自动清理,释放资源
4. 异常处理
- 网络异常: 自动重试(ConnectException、SocketTimeoutException)
- 节点不可用: 标记节点失败,触发重新路由
- 序列化失败: 返回错误给调用方,不重试
- 请求超时: 取消请求,回调 handler.handleException()
5. 性能要点
- 本地优化: 本地节点请求直接调用 Handler,零序列化开销
- 批量优化: 批量请求共享连接,减少网络往返
- 连接复用: 长连接避免频繁握手
- 压缩优化: 大请求自动压缩(阈值可配置)
6. 版本兼容
- 握手协议: 建立连接时验证版本兼容性
- 向后兼容: 支持 N-1 版本的节点通信
- 协议演进: 通过 TransportVersion 控制序列化格式
3. 核心组件详解
3.1 TransportService (传输服务)
职责
TransportService 是传输层的门面,提供发送请求和注册处理器的 API。
核心方法
public class TransportService extends AbstractLifecycleComponent {
protected final Transport transport;
protected final ConnectionManager connectionManager;
protected final ThreadPool threadPool;
private final TaskManager taskManager;
private final TransportInterceptor interceptor;
// 发送请求
public <T extends TransportResponse> void sendRequest(
DiscoveryNode node,
String action,
TransportRequest request,
TransportResponseHandler<T> handler
) {
// 1. 获取连接
Transport.Connection connection = getConnection(node);
// 2. 通过拦截器发送
asyncSender.sendRequest(connection, action, request, options, handler);
}
// 注册请求处理器
public <Request extends TransportRequest> void registerRequestHandler(
String action,
Executor executor,
Writeable.Reader<Request> requestReader,
TransportRequestHandler<Request> handler
) {
RequestHandlerRegistry<Request> reg = new RequestHandlerRegistry<>(
action,
requestReader,
taskManager,
handler,
executor
);
transport.registerRequestHandler(reg);
}
// 连接到节点
public void connectToNode(
DiscoveryNode node,
ConnectionProfile connectionProfile,
ActionListener<Releasable> listener
) {
connectionManager.connectToNode(node, connectionProfile, connectionValidator, listener);
}
}
本地优化
// 如果目标是本地节点,直接调用处理器,不走网络
if (isLocalNode(node)) {
return localNodeConnection;
}
3.2 ConnectionManager (连接管理器)
职责
管理到所有节点的连接,提供连接池功能。
核心方法
public interface ConnectionManager extends Closeable {
// 连接到节点
void connectToNode(
DiscoveryNode node,
ConnectionProfile connectionProfile,
ConnectionValidator validator,
ActionListener<Releasable> listener
);
// 获取连接
Transport.Connection getConnection(DiscoveryNode node);
// 断开连接
void disconnectFromNode(DiscoveryNode node);
// 获取所有已连接节点
Set<DiscoveryNode> getAllConnectedNodes();
}
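一个通过 TransportService 异步建连的最小示意(传 null 表示使用默认 ConnectionProfile,该行为与 logger 变量均以实际版本与上下文为准):
transportService.connectToNode(node, null, ActionListener.wrap(
    releasable -> {
        // 连接建立成功;releasable 以引用计数方式管理连接,关闭即释放本次引用
        releasable.close();
    },
    e -> logger.warn(() -> "failed to connect to node [" + node + "]", e)
));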
ConnectionProfile (连接配置)
public class ConnectionProfile {
private int numConnectionsPerType; // 每种类型的连接数
private TimeValue connectTimeout; // 连接超时
private TimeValue handshakeTimeout; // 握手超时
private boolean compressionEnabled; // 是否启用压缩
// 连接类型
public enum ConnectionTypeHandle {
RECOVERY, // 恢复连接
BULK, // 批量操作连接
REG, // 常规连接
STATE, // 集群状态连接
PING // Ping 连接
}
}
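手工构建 ConnectionProfile 的示意如下(实际代码中的连接类型枚举为 TransportRequestOptions.Type;连接数取值与默认的 13 连接布局一致,仅为示例):
ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
builder.setConnectTimeout(TimeValue.timeValueSeconds(30));
builder.setHandshakeTimeout(TimeValue.timeValueSeconds(30));
builder.addConnections(2, TransportRequestOptions.Type.RECOVERY);
builder.addConnections(3, TransportRequestOptions.Type.BULK);
builder.addConnections(6, TransportRequestOptions.Type.REG);
builder.addConnections(1, TransportRequestOptions.Type.STATE);
builder.addConnections(1, TransportRequestOptions.Type.PING);
ConnectionProfile profile = builder.build();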
3.3 Transport (传输抽象接口)
职责
定义传输层的抽象接口,解耦具体实现。
核心接口
public interface Transport extends LifecycleComponent {
// 发送请求
void sendRequest(
Connection connection,
long requestId,
String action,
TransportRequest request,
TransportRequestOptions options
);
// 打开连接
void openConnection(
DiscoveryNode node,
ConnectionProfile profile,
ActionListener<Connection> listener
);
// 注册请求处理器
<Request extends TransportRequest> void registerRequestHandler(
RequestHandlerRegistry<Request> reg
);
// 响应处理器
interface ResponseHandlers {
TransportResponseHandler<?> onResponseReceived(
long requestId,
TransportMessageListener listener
);
void onResponseSent(long requestId);
}
}
3.4 TcpTransport (TCP 实现)
职责
基于 TCP 和 Netty 的传输层实现。
核心组件
public abstract class TcpTransport extends AbstractLifecycleComponent implements Transport {
private final OutboundHandler outboundHandler;
private final InboundHandler inboundHandler;
private final ResponseHandlers responseHandlers;
// 发送请求
@Override
public void sendRequest(
Connection connection,
long requestId,
String action,
TransportRequest request,
TransportRequestOptions options
) throws TransportException {
// 1. 序列化请求
BytesReference message = serialize(requestId, action, request);
// 2. 通过出站处理器发送(listener 为发送完成回调,此处省略其构造)
outboundHandler.sendMessage(connection.getChannel(), message, listener);
}
// 处理入站数据
protected void handleInboundMessage(
TcpChannel channel,
InboundMessage message
) {
if (message.isRequest()) {
inboundHandler.handleRequest(channel, message);
} else {
inboundHandler.handleResponse(channel, message);
}
}
}
InboundHandler (入站处理器)
public class InboundHandler {
// 处理请求
public void handleRequest(TcpChannel channel, InboundMessage message) {
// 1. 反序列化请求
TransportRequest request = deserialize(message);
// 2. 查找处理器
RequestHandlerRegistry handler = requestHandlers.get(message.getAction());
// 3. 执行处理器
executor.execute(() -> {
try {
handler.processRequest(request, channel);
} catch (Exception e) {
sendErrorResponse(channel, message.getRequestId(), e);
}
});
}
// 处理响应
public void handleResponse(TcpChannel channel, InboundMessage message) {
// 1. 查找响应处理器
TransportResponseHandler handler = responseHandlers.onResponseReceived(
message.getRequestId()
);
// 2. 反序列化响应
TransportResponse response = deserialize(message, handler.reader());
// 3. 回调处理器
handler.handleResponse(response);
}
}
OutboundHandler (出站处理器)
public class OutboundHandler {
// 发送消息
public void sendMessage(
TcpChannel channel,
BytesReference message,
ActionListener<Void> listener
) {
// 1. 压缩(如果启用)
if (compressionEnabled && message.length() > compressionThreshold) {
message = compress(message);
}
// 2. 写入通道
channel.sendMessage(message, listener);
}
}
4. 核心数据结构
4.1 TransportRequest & TransportResponse
classDiagram
class TransportRequest {
<<abstract>>
-TaskId parentTaskId
+setParentTask(TaskId)
+getParentTask()
}
class TransportResponse {
<<abstract>>
+writeTo(StreamOutput)
}
class TransportMessage {
<<interface>>
+writeTo(StreamOutput)
}
TransportMessage <|.. TransportRequest
TransportMessage <|.. TransportResponse
- TransportRequest: 所有传输请求的基类
- TransportResponse: 所有传输响应的基类
- TransportMessage: 传输消息的标记接口
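下面是一个自定义 TransportRequest 子类的最小骨架(类名 EchoRequest 与字段均为示例假设),展示反序列化构造函数与 writeTo 的配对写法:
public class EchoRequest extends TransportRequest {
    private final String payload;

    public EchoRequest(String payload) {
        this.payload = payload;
    }

    // 反序列化构造函数:注册 Handler 时作为 Writeable.Reader(EchoRequest::new)传入
    public EchoRequest(StreamInput in) throws IOException {
        super(in);          // 读取基类字段(如 parentTaskId)
        this.payload = in.readString();
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        super.writeTo(out); // 写入基类字段,读写顺序必须严格一致
        out.writeString(payload);
    }

    public String payload() {
        return payload;
    }
}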
4.2 Connection & ConnectionProfile
classDiagram
class Connection {
<<interface>>
+getNode() DiscoveryNode
+sendRequest(long, String, TransportRequest, TransportRequestOptions)
+close()
}
class ConnectionProfile {
-Map~ConnectionTypeHandle, ConnectionTypeSettings~ connectionTypeSettings
-TimeValue connectTimeout
-TimeValue handshakeTimeout
-boolean compressionEnabled
+getNumConnections()
+getConnectTimeout()
}
class ConnectionTypeHandle {
<<enumeration>>
RECOVERY
BULK
REG
STATE
PING
}
ConnectionProfile --> ConnectionTypeHandle
- Connection: 表示到某个节点的连接
- ConnectionProfile: 连接配置,包含超时、连接数等
- ConnectionTypeHandle: 连接类型(恢复、批量、常规、状态、Ping)
4.3 TransportResponseHandler
classDiagram
class TransportResponseHandler {
<<interface>>
+read(StreamInput) T
+handleResponse(T)
+handleException(TransportException)
+executor() String
}
class ActionListenerResponseHandler {
-ActionListener~T~ listener
-Writeable.Reader~T~ reader
-String executor
+handleResponse(T)
+handleException(TransportException)
}
TransportResponseHandler <|.. ActionListenerResponseHandler
5. 通信协议
5.1 消息格式
+------------------+
| Header (variable)| 消息头
+------------------+
| Payload (variable)| 消息体
+------------------+
Header 格式
+--------+--------+--------+--------+
| Length | Status | Version| Action |
| 4 bytes| 1 byte | 1 byte | variable |
+--------+--------+--------+--------+
| RequestId (8 bytes) |
+--------+--------+--------+--------+
| Features (4 bytes) |
+--------+--------+--------+--------+
字段说明:
- Length: 消息总长度
- Status: 状态标志 (Request/Response/Error)
- Version: 传输协议版本
- Action: 操作名称 (如 "indices:data/write/index")
- RequestId: 请求唯一标识符
- Features: 特性标志 (压缩、握手等)
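Status 字节按位标记消息属性,下面是一个按该语义写的检查示意(位定义参考 TransportStatus 的常见实现,具体数值以实际版本为准):
static final byte STATUS_REQRES    = 1 << 0; // 0 = Request, 1 = Response
static final byte STATUS_ERROR     = 1 << 1; // 响应是否为错误
static final byte STATUS_COMPRESS  = 1 << 2; // 消息体是否压缩
static final byte STATUS_HANDSHAKE = 1 << 3; // 是否握手消息

static boolean isResponse(byte status)   { return (status & STATUS_REQRES) != 0; }
static boolean isError(byte status)      { return (status & STATUS_ERROR) != 0; }
static boolean isCompressed(byte status) { return (status & STATUS_COMPRESS) != 0; }
static boolean isHandshake(byte status)  { return (status & STATUS_HANDSHAKE) != 0; }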
5.2 Handshake 流程
sequenceDiagram
autonumber
participant Node1 as Node 1
participant Node2 as Node 2
Note over Node1,Node2: 建立 TCP 连接
Node1->>Node2: TCP SYN
Node2-->>Node1: TCP SYN-ACK
Node1->>Node2: TCP ACK
Note over Node1,Node2: Handshake (握手)
Node1->>Node2: HandshakeRequest<br/>(version, cluster_name, node_id)
Node2->>Node2: 验证:<br/>1. 集群名称匹配<br/>2. 版本兼容<br/>3. 许可证兼容
alt 验证通过
Node2-->>Node1: HandshakeResponse<br/>(version, node_info)
Note over Node1,Node2: 连接建立成功
else 验证失败
Node2-->>Node1: HandshakeResponse<br/>(error)
Node2->>Node2: 关闭连接
Note over Node1,Node2: 连接失败
end
6. 传输层 API
6.1 sendRequest - 发送请求
方法签名
public <T extends TransportResponse> void sendRequest(
DiscoveryNode node,
String action,
TransportRequest request,
TransportResponseHandler<T> handler
)
参数说明
| 参数 | 类型 | 说明 |
|---|---|---|
| node | DiscoveryNode | 目标节点 |
| action | String | 操作名称(如 “indices:data/write/index”) |
| request | TransportRequest | 请求对象 |
| handler | TransportResponseHandler |
响应处理器(回调) |
使用示例
// 发送搜索请求到另一个节点
transportService.sendRequest(
targetNode,
"indices:data/read/search",
searchRequest,
new TransportResponseHandler<SearchResponse>() {
@Override
public SearchResponse read(StreamInput in) throws IOException {
return new SearchResponse(in);
}
@Override
public void handleResponse(SearchResponse response) {
// 处理响应
}
@Override
public void handleException(TransportException exp) {
// 处理异常
}
@Override
public String executor() {
return ThreadPool.Names.SEARCH;
}
}
);
6.2 registerRequestHandler - 注册请求处理器
方法签名
public <Request extends TransportRequest> void registerRequestHandler(
String action,
Executor executor,
Writeable.Reader<Request> requestReader,
TransportRequestHandler<Request> handler
)
使用示例
// 注册搜索请求处理器
transportService.registerRequestHandler(
"indices:data/read/search",
threadPool.executor(ThreadPool.Names.SEARCH), // 对应签名中的 Executor 参数
SearchRequest::new,
(request, channel, task) -> {
// 处理搜索请求
SearchResponse response = searchService.executeQueryPhase(request, (SearchTask) task);
channel.sendResponse(response);
}
);
6.3 常用 Action 名称
索引操作
| Action | 说明 |
|---|---|
| indices:data/write/index | 索引文档 |
| indices:data/write/bulk | 批量操作 |
| indices:data/write/delete | 删除文档 |
| indices:data/write/update | 更新文档 |
| indices:data/read/get | 获取文档 |
搜索操作
| Action | 说明 |
|---|---|
| indices:data/read/search | 搜索 |
| indices:data/read/search[phase/query] | Query Phase |
| indices:data/read/search[phase/fetch] | Fetch Phase |
| indices:data/read/scroll | Scroll |
集群操作
| Action | 说明 |
|---|---|
| cluster:monitor/health | 集群健康 |
| cluster:monitor/state | 集群状态 |
| cluster:admin/settings/update | 更新设置 |
| internal:cluster/coordination/join | 节点加入 |
| internal:cluster/coordination/publish_state | 发布状态 |
7. 上游模块调用链路分析
本章节详细分析三个典型上游模块如何使用传输层,从上游接口开始,自上而下展示完整的调用链路、关键代码和内部时序。
7.1 搜索请求路径
7.1.1 调用链路概览
搜索请求从协调节点发起,经过传输层发送到各个数据节点的分片,执行查询和获取,最后聚合结果返回。
flowchart TD
A[TransportSearchAction] --> B[SearchTransportService]
B --> C[TransportService.sendChildRequest]
C --> D[AsyncSender.sendRequest]
D --> E[ConnectionManager.getConnection]
E --> F[Transport.Connection]
F --> G[TcpTransport.sendRequest]
G --> H[OutboundHandler.sendRequest]
H --> I[OutboundHandler.serialize]
I --> J[TcpChannel.sendMessage]
J --> K[Netty Channel.write]
L[Netty Channel.read] --> M[InboundHandler.inboundMessage]
M --> N[InboundHandler.messageReceived]
N --> O[InboundHandler.handleRequest]
O --> P[RequestHandlerRegistry.processMessageReceived]
P --> Q[SearchService.executeQueryPhase]
Q --> R[构建 QuerySearchResult]
R --> S[TransportChannel.sendResponse]
S --> T[OutboundHandler.sendResponse]
7.1.2 关键代码 - 发送搜索请求
SearchTransportService 发起请求
// server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java
public void sendExecuteQuery(
Transport.Connection connection,
final ShardSearchRequest request,
SearchTask task,
final ActionListener<SearchPhaseResult> listener
) {
// 1. 确定是否需要优化(单分片场景)
final boolean fetchDocuments = request.numberOfShards() == 1
&& (request.source() == null || request.source().rankBuilder() == null);
// 2. 选择响应读取器(QuerySearchResult 或 QueryFetchSearchResult)
Writeable.Reader<SearchPhaseResult> reader = fetchDocuments
? QueryFetchSearchResult::new
: in -> new QuerySearchResult(in, true);
// 3. 通过传输服务发送请求(使用子任务机制)
transportService.sendChildRequest(
connection,
QUERY_ACTION_NAME, // "indices:data/read/search[phase/query]"
request,
task,
new ConnectionCountingHandler<>(listener, reader, connection)
);
}
TransportService 处理请求
// server/src/main/java/org/elasticsearch/transport/TransportService.java
public <T extends TransportResponse> void sendChildRequest(
final Transport.Connection connection,
final String action,
final TransportRequest request,
final Task parentTask,
final TransportResponseHandler<T> handler
) {
// 1. 设置父任务 ID(用于任务追踪和取消)
request.setParentTask(parentTask.taskId());
// 2. 调用标准发送逻辑
sendRequest(connection, action, request, TransportRequestOptions.EMPTY, handler);
}
public <T extends TransportResponse> void sendRequest(
final Transport.Connection connection,
final String action,
final TransportRequest request,
final TransportRequestOptions options,
TransportResponseHandler<T> handler
) {
// 1. 通过拦截器发送(支持权限控制、监控等)
asyncSender.sendRequest(connection, action, request, options, handler);
}
// 拦截器处理后的实际发送逻辑
private <T extends TransportResponse> void sendRequestInternal(
final Transport.Connection connection,
final String action,
final TransportRequest request,
final TransportRequestOptions options,
TransportResponseHandler<T> handler
) {
// 1. 生成唯一的请求 ID
final long requestId = responseHandlers.newRequestId();
// 2. 设置超时处理器
final TimeoutHandler timeoutHandler = new TimeoutHandler(requestId, connection.getNode(), action);
// 3. 注册响应处理器(requestId → handler)
try {
responseHandlers.add(requestId, new RequestHolder<>(
handler,
connection,
action,
timeoutHandler
));
} catch (Exception e) {
// 注册失败,立即触发异常回调
handler.handleException(new SendRequestTransportException(connection.getNode(), action, e));
return;
}
// 4. 通过底层连接发送请求
try {
connection.sendRequest(requestId, action, request, options);
} catch (Exception e) {
// 发送失败,移除响应处理器并触发异常
responseHandlers.remove(requestId);
handler.handleException(new SendRequestTransportException(connection.getNode(), action, e));
}
}
TcpTransport 连接实现
// server/src/main/java/org/elasticsearch/transport/TcpTransport.java (内部类 NodeChannels)
@Override
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
throws IOException, TransportException {
// 1. 检查连接状态
if (isClosing.get()) {
throw new NodeNotConnectedException(node, "connection already closed");
}
// 2. 根据请求类型选择通道(RECOVERY, BULK, REG, STATE, PING)
TcpChannel channel = channel(options.type());
// 3. 确定是否需要压缩
final Compression.Scheme schemeToUse = getCompressionScheme(request);
// 4. 通过出站处理器发送
outboundHandler.sendRequest(
node,
channel,
requestId,
action,
request,
options,
getTransportVersion(),
schemeToUse,
false // isHandshake = false
);
}
private Compression.Scheme getCompressionScheme(TransportRequest request) {
// 全局压缩 或 索引数据压缩
final boolean shouldCompress = compress == Compression.Enabled.TRUE
|| (compress == Compression.Enabled.INDEXING_DATA
&& request instanceof RawIndexingDataTransportRequest
&& ((RawIndexingDataTransportRequest) request).isRawIndexingData());
return shouldCompress ? compressionScheme : null;
}
OutboundHandler 序列化和发送
// server/src/main/java/org/elasticsearch/transport/OutboundHandler.java
void sendRequest(
final DiscoveryNode node,
final TcpChannel channel,
final long requestId,
final String action,
final TransportRequest request,
final TransportRequestOptions options,
final TransportVersion transportVersion,
final Compression.Scheme compressionScheme,
final boolean isHandshake
) throws IOException, TransportException {
// 调用通用发送逻辑
sendMessage(
channel,
MessageDirection.REQUEST,
action,
request,
requestId,
isHandshake,
compressionScheme,
transportVersion,
ResponseStatsConsumer.NONE,
() -> messageListener.onRequestSent(node, requestId, action, request, options)
);
}
private void sendMessage(
TcpChannel channel,
MessageDirection messageDirection,
String action,
Writeable writeable,
long requestId,
boolean isHandshake,
Compression.Scheme compressionScheme,
TransportVersion version,
ResponseStatsConsumer responseStatsConsumer,
Releasable onAfter
) throws IOException {
// 1. 序列化消息(包含头部和载荷)
final BytesReference message;
final RecyclerBytesStreamOutput byteStreamOutput = new RecyclerBytesStreamOutput(recycler);
try {
message = serialize(
messageDirection,
action,
requestId,
isHandshake,
version,
compressionScheme,
writeable,
threadPool.getThreadContext(),
byteStreamOutput
);
} catch (Exception e) {
logger.warn(() -> "failed to serialize outbound message [" + writeable + "]", e);
throw e;
}
// 2. 通过内部发送方法写入通道(listener 由 onAfter 等包装而来,此处省略其构造)
internalSend(channel, message, () -> writeable.getClass().getName(), listener);
}
public static BytesReference serialize(
MessageDirection messageDirection,
String action,
long requestId,
boolean isHandshake,
TransportVersion version,
Compression.Scheme compressionScheme,
Writeable writeable,
ThreadContext threadContext,
RecyclerBytesStreamOutput byteStreamOutput
) throws IOException {
// 1. 跳过头部空间(稍后填充)
byteStreamOutput.skip(TcpHeader.HEADER_SIZE);
// 2. 写入线程上下文(tracing、headers 等)
threadContext.writeTo(byteStreamOutput);
// 3. 对于请求,写入 action 名称
if (messageDirection == MessageDirection.REQUEST) {
byteStreamOutput.writeString(action);
}
// 4. 序列化消息体(可能压缩)
final int variableHeaderLength = Math.toIntExact(byteStreamOutput.position() - TcpHeader.HEADER_SIZE);
BytesReference message = serializeMessageBody(writeable, compressionScheme, version, byteStreamOutput);
// 5. 构建状态标志
byte status = 0;
if (messageDirection != MessageDirection.REQUEST) {
status = TransportStatus.setResponse(status);
}
if (isHandshake) {
status = TransportStatus.setHandshake(status);
}
if (compressionScheme != null) {
status = TransportStatus.setCompress(status);
}
// 6. 回填头部
byteStreamOutput.seek(0);
TcpHeader.writeHeader(
byteStreamOutput,
requestId,
status,
version,
message.length() - TcpHeader.HEADER_SIZE,
variableHeaderLength
);
return message;
}
private void internalSend(
TcpChannel channel,
BytesReference reference,
Supplier<String> messageDescription,
ActionListener<Void> listener
) {
// 1. 记录访问时间
final long startTime = threadPool.rawRelativeTimeInMillis();
channel.getChannelStats().markAccessed(startTime);
// 2. 记录发送字节数
final long messageSize = reference.length();
// 3. 日志记录(如果启用)
TransportLogger.logOutboundMessage(channel, reference);
// 4. 清空线程上下文,写入 Netty 通道
try (var ignored = threadPool.getThreadContext().newEmptyContext()) {
channel.sendMessage(reference, new ActionListener<>() {
@Override
public void onResponse(Void v) {
statsTracker.markBytesWritten(messageSize);
listener.onResponse(v);
}
@Override
public void onFailure(Exception e) {
logger.warn(() -> "send message failed [channel: " + channel + "]", e);
listener.onFailure(e);
}
});
}
}
7.1.3 关键代码 - 接收和处理搜索请求
InboundHandler 接收消息
// server/src/main/java/org/elasticsearch/transport/InboundHandler.java
void inboundMessage(TcpChannel channel, InboundMessage message) throws Exception {
// 1. 记录访问时间
final long startTime = threadPool.rawRelativeTimeInMillis();
channel.getChannelStats().markAccessed(startTime);
// 2. 日志记录
TransportLogger.logInboundMessage(channel, message);
// 3. 处理消息(自动关闭)
if (message.isPing()) {
keepAlive.receiveKeepAlive(channel);
} else {
messageReceived(channel, message, startTime);
}
}
private void messageReceived(TcpChannel channel, InboundMessage message, long startTime) throws IOException {
final InetSocketAddress remoteAddress = channel.getRemoteAddress();
final Header header = message.getHeader();
ThreadContext threadContext = threadPool.getThreadContext();
TransportResponseHandler<?> responseHandler = null; // 提前声明,便于 finally 中记录慢日志时引用
try (var ignored = threadContext.newStoredContext()) {
// 1. 设置线程上下文(从消息头部恢复)
threadContext.setHeaders(header.getHeaders());
threadContext.putTransient("_remote_address", remoteAddress);
// 2. 根据消息类型分发
if (header.isRequest()) {
handleRequest(channel, message);
} else {
// 处理响应
responseHandler = responseHandlers.onResponseReceived(header.getRequestId());
if (responseHandler != null) {
executeResponseHandler(message, responseHandler, remoteAddress);
}
}
} finally {
// 3. 记录慢请求
final long took = threadPool.rawRelativeTimeInMillis() - startTime;
if (slowLogThresholdMs > 0 && took > slowLogThresholdMs) {
logSlowMessage(message, took, slowLogThresholdMs, responseHandler);
}
}
}
private <T extends TransportRequest> void handleRequest(TcpChannel channel, InboundMessage message) throws IOException {
final Header header = message.getHeader();
// 1. 处理握手请求
if (header.isHandshake()) {
handleHandshakeRequest(channel, message);
return;
}
// 2. 获取 action 和 requestId
final String action = header.getActionName();
final long requestId = header.getRequestId();
// 3. 查找请求处理器
final RequestHandlerRegistry<T> reg = requestHandlers.getHandler(action);
// 4. 创建传输通道(用于发送响应)
final TransportChannel transportChannel = new TcpTransportChannel(
outboundHandler,
channel,
action,
requestId,
header.getVersion(),
header.getCompressionScheme(),
reg,
false,
message.takeBreakerReleaseControl()
);
try {
// 5. 触发消息监听器
messageListener.onRequestReceived(requestId, action);
// 6. 反序列化请求
final StreamInput stream = namedWriteableStream(message.openOrGetStreamInput());
T request = reg.newRequest(stream);
request.remoteAddress(channel.getRemoteAddress());
request.setRequestId(requestId);
// 7. 根据线程池类型执行
if (reg.getExecutor() == EsExecutors.DIRECT_EXECUTOR_SERVICE) {
// 直接在当前线程执行
doHandleRequest(reg, request, transportChannel);
} else {
// 提交到线程池执行
handleRequestForking(request, reg, transportChannel);
}
} catch (Exception e) {
sendErrorResponse(action, transportChannel, e);
}
}
private static <T extends TransportRequest> void doHandleRequest(
RequestHandlerRegistry<T> reg,
T request,
TransportChannel channel
) {
try {
reg.processMessageReceived(request, channel);
} catch (Exception e) {
sendErrorResponse(reg.getAction(), channel, e);
}
}
SearchService 执行查询
// server/src/main/java/org/elasticsearch/search/SearchService.java
// (注册的 Handler 在 SearchTransportService 中)
transportService.registerRequestHandler(
QUERY_ACTION_NAME, // "indices:data/read/search[phase/query]"
EsExecutors.DIRECT_EXECUTOR_SERVICE,
ShardSearchRequest::new,
(request, channel, task) -> searchService.executeQueryPhase(
request,
(SearchShardTask) task,
new ChannelActionListener<>(channel)
)
);
public void executeQueryPhase(
ShardSearchRequest request,
SearchShardTask task,
ActionListener<SearchPhaseResult> listener
) {
// 1. 创建搜索上下文
final SearchContext searchContext = createContext(request, task);
try {
// 2. 执行查询阶段
searchContext.preProcess();
QueryPhase.execute(searchContext);
// 3. 构建查询结果
QuerySearchResult queryResult = searchContext.queryResult();
// 4. 返回结果
listener.onResponse(queryResult);
} catch (Exception e) {
listener.onFailure(e);
} finally {
searchContext.close();
}
}
TcpTransportChannel 发送响应
// server/src/main/java/org/elasticsearch/transport/TcpTransportChannel.java
@Override
public void sendResponse(TransportResponse response) {
try {
outboundHandler.sendResponse(
version,
channel,
requestId,
action,
response,
compressionScheme,
false // isHandshake
);
} catch (Exception e) {
throw new SendRequestTransportException(channel.getRemoteAddress(), action, e);
}
}
7.1.4 搜索请求内部时序图
sequenceDiagram
autonumber
participant SearchAction as TransportSearchAction<br/>(协调节点)
participant SearchTransport as SearchTransportService
participant TS as TransportService
participant CM as ConnectionManager
participant Conn as TcpTransport.Connection
participant Out as OutboundHandler
participant Channel as TcpChannel/Netty
Note over SearchAction: Query 阶段开始
SearchAction->>SearchTransport: sendExecuteQuery(connection, request, task, listener)
SearchTransport->>SearchTransport: 确定响应类型<br/>(单分片优化)
SearchTransport->>TS: sendChildRequest(connection, QUERY_ACTION_NAME, request, task, handler)
TS->>TS: 设置 parentTask
TS->>TS: sendRequest(connection, action, request, options, handler)
TS->>TS: asyncSender.sendRequest()<br/>(拦截器处理)
TS->>TS: sendRequestInternal()
TS->>TS: 生成 requestId
TS->>TS: responseHandlers.add(requestId, handler)
TS->>Conn: connection.sendRequest(requestId, action, request, options)
Conn->>Conn: 选择通道 channel(type)
Conn->>Conn: 判断是否压缩
Conn->>Out: outboundHandler.sendRequest(node, channel, requestId, action, request, ...)
Out->>Out: sendMessage(channel, REQUEST, action, request, requestId, ...)
Out->>Out: serialize(REQUEST, action, requestId, request, ...)
Out->>Out: 1. 跳过头部空间
Out->>Out: 2. 写入 ThreadContext
Out->>Out: 3. 写入 action 名称
Out->>Out: 4. 序列化 request 体
Out->>Out: 5. 可选压缩
Out->>Out: 6. 回填头部
Out->>Channel: channel.sendMessage(bytesRef, listener)
Channel->>Channel: Netty write & flush
Note over Channel: 网络传输
Note over SearchAction,Channel: 数据节点接收和处理
sequenceDiagram
autonumber
participant Channel as TcpChannel/Netty<br/>(数据节点)
participant In as InboundHandler
participant RH as RequestHandlerRegistry
participant SearchService as SearchService
participant QueryPhase as QueryPhase
participant TChannel as TcpTransportChannel
participant Out as OutboundHandler
Note over Channel: 数据节点接收请求
Channel->>In: inboundMessage(channel, message)
In->>In: 记录访问时间
In->>In: 日志记录
In->>In: messageReceived(channel, message, startTime)
In->>In: 恢复 ThreadContext
In->>In: handleRequest(channel, message)
In->>In: 解析 header (action, requestId)
In->>RH: requestHandlers.getHandler(action)
RH-->>In: RequestHandlerRegistry
In->>In: 创建 TcpTransportChannel
In->>In: 反序列化 request
In->>In: 判断执行器类型
alt 直接执行 (DIRECT_EXECUTOR_SERVICE)
In->>RH: doHandleRequest(reg, request, channel)
RH->>SearchService: processMessageReceived(request, channel)<br/>executeQueryPhase(request, task, listener)
else 线程池执行
In->>RH: handleRequestForking(request, reg, channel)
RH->>SearchService: [异步] processMessageReceived(request, channel)
end
SearchService->>SearchService: 创建 SearchContext
SearchService->>QueryPhase: QueryPhase.execute(searchContext)
QueryPhase->>QueryPhase: 1. runQuery(searchContext)
QueryPhase->>QueryPhase: 2. executeInternal(searchContext)
QueryPhase->>QueryPhase: 3. 收集 TopDocs
QueryPhase-->>SearchService: QuerySearchResult
SearchService->>TChannel: listener.onResponse(queryResult)
TChannel->>Out: channel.sendResponse(queryResult)
Out->>Out: sendResponse(version, channel, requestId, action, response, ...)
Out->>Out: sendMessage(channel, RESPONSE, action, response, requestId, ...)
Out->>Out: serialize(RESPONSE, ..., response, ...)
Out->>Channel: channel.sendMessage(bytesRef, listener)
Channel->>Channel: Netty write & flush
Note over Channel: 响应返回协调节点
sequenceDiagram
autonumber
participant Channel as TcpChannel/Netty<br/>(协调节点)
participant In as InboundHandler
participant ResponseHandler as TransportResponseHandler<br/>(ActionListener)
participant SearchAction as TransportSearchAction
Note over Channel: 协调节点接收响应
Channel->>In: inboundMessage(channel, responseMessage)
In->>In: messageReceived(channel, responseMessage, startTime)
In->>In: handleResponse(channel, message)
In->>In: 解析 header (requestId, status)
In->>In: responseHandlers.onResponseReceived(requestId)
In-->>ResponseHandler: 获取 ResponseHandler
In->>In: executeResponseHandler(message, responseHandler, remoteAddress)
In->>In: 反序列化 response
In->>ResponseHandler: handler.handleResponse(response)
ResponseHandler->>SearchAction: onResponse(queryResult)
SearchAction->>SearchAction: 聚合结果<br/>继续 Fetch 阶段或返回
Note over SearchAction: Query 阶段完成
7.1.5 搜索请求流程说明
1. 图意概述
上述三个时序图展示了搜索请求从协调节点发送到数据节点、数据节点执行查询、响应返回协调节点的完整流程。
2. 关键路径
- 发送路径: TransportSearchAction → SearchTransportService → TransportService → Connection → OutboundHandler → TcpChannel → Netty
- 接收路径: Netty → TcpChannel → InboundHandler → RequestHandler → SearchService → QueryPhase
- 响应路径: TcpTransportChannel → OutboundHandler → Netty → InboundHandler → ResponseHandler → SearchAction
3. 关键优化
- 单分片优化: 单分片搜索时直接返回 QueryFetchSearchResult,省略 Fetch 阶段
- 直接执行: 搜索请求使用 DIRECT_EXECUTOR_SERVICE,避免线程切换
- 连接复用: 使用长连接和连接池,避免频繁建连
- 零拷贝: Netty ByteBuf 支持零拷贝,减少内存拷贝
4. 异常处理
- 超时: responseHandlers 中的 TimeoutHandler 会在超时后清理资源并触发异常回调
- 网络故障: 连接断开时会触发 handler.handleException(),搜索层会重试其他副本
- 序列化失败: 捕获异常后发送错误响应,不会影响其他分片
5. 性能要点
- 异步处理: 整个流程基于异步回调,避免阻塞
- 流式序列化: 使用 StreamOutput 流式序列化,避免大对象分配
- 压缩: 大响应自动压缩,减少网络传输
- 批量优化: 多个分片请求可以并发发送,充分利用网络带宽
6. 监控埋点
- TransportMessageListener.onRequestSent(): 请求发送时触发
- TransportMessageListener.onRequestReceived(): 请求接收时触发
- TransportMessageListener.onResponseSent(): 响应发送时触发
- ChannelStats.markAccessed(): 记录通道访问时间
- StatsTracker.markBytesWritten(): 记录发送字节数
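下面是一个注册 TransportMessageListener 做简单埋点的最小骨架(仅实现部分回调;addMessageListener 的可用性与回调签名以所用版本为准):
transportService.addMessageListener(new TransportMessageListener() {
    @Override
    public void onRequestSent(DiscoveryNode node, long requestId, String action,
                              TransportRequest request, TransportRequestOptions options) {
        // 请求发出时触发:可按目标节点、action 维度统计 QPS
    }

    @Override
    public void onRequestReceived(long requestId, String action) {
        // 请求到达本节点时触发:可统计入站请求量
    }
});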
7.2 索引/复制请求路径
7.2.1 调用链路概览
索引请求首先路由到主分片,主分片执行写入后并发发送到所有副本分片,等待 Quorum 确认后返回。
flowchart TD
A[TransportBulkAction] --> B[TransportShardBulkAction]
B --> C[TransportReplicationAction]
C --> D[ReroutePhase: 路由到主分片]
D --> E[TransportService.sendRequest 主分片]
E --> F[PrimaryPhase: 主分片执行]
F --> G[ReplicationPhase: 发送到副本]
G --> H[ReplicasProxy.performOn]
H --> I[TransportService.sendRequest 副本]
I --> J[ReplicaPhase: 副本执行]
J --> K[等待 Quorum 确认]
K --> L[返回响应]
7.2.2 关键代码 - 主分片写入
TransportReplicationAction 初始化
// server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java
public TransportReplicationAction(
Settings settings,
String actionName,
TransportService transportService,
ClusterService clusterService,
IndicesService indicesService,
ThreadPool threadPool,
ShardStateAction shardStateAction,
ActionFilters actionFilters,
Writeable.Reader<Request> requestReader,
Writeable.Reader<ReplicaRequest> replicaRequestReader,
Executor executor,
SyncGlobalCheckpointAfterOperation syncGlobalCheckpointAfterOperation,
PrimaryActionExecution primaryActionExecution,
ReplicaActionExecution replicaActionExecution
) {
// ...
this.transportPrimaryAction = actionName + "[p]"; // 主分片 action
this.transportReplicaAction = actionName + "[r]"; // 副本分片 action
// 1. 注册主请求处理器(入口)
transportService.registerRequestHandler(
actionName,
EsExecutors.DIRECT_EXECUTOR_SERVICE,
requestReader,
this::handleOperationRequest
);
// 2. 注册主分片处理器
transportService.registerRequestHandler(
transportPrimaryAction,
executor,
forceExecutionOnPrimary,
true, // canTripCircuitBreaker
in -> new ConcreteShardRequest<>(requestReader, in),
this::handlePrimaryRequest
);
// 3. 注册副本分片处理器
transportService.registerRequestHandler(
transportReplicaAction,
executor,
true, // forceExecution - 副本不能拒绝
canTripCircuitBreakerOnReplica,
in -> new ConcreteReplicaRequest<>(replicaRequestReader, in),
this::handleReplicaRequest
);
}
ReroutePhase 路由到主分片
// TransportReplicationAction 内部类
final class ReroutePhase extends AbstractRunnable {
@Override
protected void doRun() {
// 1. 获取集群状态
final ClusterState state = clusterService.state();
// 2. 解析目标分片
resolveRequest(state.metadata().index(request.shardId().getIndex()), request);
// 3. 获取分片路由
final ShardRouting primary = state.routingTable().shardRoutingTable(request.shardId()).primaryShard();
// 4. 如果主分片不在本节点,转发请求
if (primary == null || primary.active() == false) {
retry(new NoShardAvailableActionException(request.shardId()));
} else {
final DiscoveryNode node = state.nodes().get(primary.currentNodeId());
if (node == null) {
retry(new NoNodeAvailableException("unknown node [" + primary.currentNodeId() + "]"));
} else {
performAction(node, primary.allocationId().getId(), primary.primaryTerm());
}
}
}
private void performAction(
final DiscoveryNode node,
final String primaryAllocationId,
final long primaryTerm
) {
// 5. 构建具体的分片请求
final ConcreteShardRequest<Request> concreteShardRequest = new ConcreteShardRequest<>(
request,
primaryAllocationId,
primaryTerm
);
// 6. 发送到主分片节点
final ActionListenerResponseHandler<Response> handler = new ActionListenerResponseHandler<>(
new RetryingListener(),
TransportReplicationAction.this::newResponseInstance,
TransportResponseHandler.TRANSPORT_WORKER
);
transportService.sendRequest(
node,
transportPrimaryAction, // actionName + "[p]"
concreteShardRequest,
transportOptions,
handler
);
}
}
PrimaryPhase 主分片执行
void handlePrimaryRequest(
final ConcreteShardRequest<Request> request,
final TransportChannel channel,
final Task task
) {
// 1. 获取主分片
final IndexShard indexShard = getIndexShard(request.getRequest().shardId());
// 2. 执行主分片操作(ReplicationOperation)
new AsyncPrimaryAction(request, channel, (ReplicationTask) task, indexShard).run();
}
class AsyncPrimaryAction extends AbstractRunnable {
@Override
protected void doRun() throws Exception {
// 1. 获取操作权限
acquirePrimaryOperationPermit(
indexShard,
request.getRequest(),
this,
request.getPrimaryTerm()
);
}
@Override
public void onResponse(Releasable releasable) {
try {
// 2. 执行主分片写入
final PrimaryResult result = shardOperationOnPrimary(
request.getRequest(),
indexShard
);
// 3. 创建复制操作
final ReplicationOperation<Request, ReplicaRequest, PrimaryResult> replicationOperation = new ReplicationOperation<>(
request.getRequest(),
indexShard.routingEntry(),
this,
newReplicasProxy(),
logger,
actionName
);
// 4. 执行复制
replicationOperation.execute();
} finally {
releasable.close();
}
}
}
ReplicasProxy 发送到副本
protected class ReplicasProxy implements ReplicationOperation.Replicas<ReplicaRequest> {
@Override
public void performOn(
final ShardRouting replica,
final ReplicaRequest request,
final long primaryTerm,
final long globalCheckpoint,
final long maxSeqNoOfUpdatesOrDeletes,
final ActionListener<ReplicationOperation.ReplicaResponse> listener
) {
// 1. 获取副本节点
String nodeId = replica.currentNodeId();
final DiscoveryNode node = clusterService.state().nodes().get(nodeId);
if (node == null) {
listener.onFailure(new NoNodeAvailableException("unknown node [" + nodeId + "]"));
return;
}
// 2. 构建副本请求
final ConcreteReplicaRequest<ReplicaRequest> replicaRequest = new ConcreteReplicaRequest<>(
request,
replica.allocationId().getId(),
primaryTerm,
globalCheckpoint,
maxSeqNoOfUpdatesOrDeletes
);
// 3. 发送到副本节点
final ActionListenerResponseHandler<ReplicaResponse> handler = new ActionListenerResponseHandler<>(
listener,
ReplicaResponse::new,
TransportResponseHandler.TRANSPORT_WORKER
);
transportService.sendRequest(
node,
transportReplicaAction, // actionName + "[r]"
replicaRequest,
transportOptions,
handler
);
}
}
7.2.3 关键代码 - 副本分片写入
ReplicaPhase 副本执行
void handleReplicaRequest(
final ConcreteReplicaRequest<ReplicaRequest> request,
final TransportChannel channel,
final Task task
) {
// 1. 获取副本分片
final IndexShard replica = getIndexShard(request.getRequest().shardId());
// 2. 执行副本操作
new AsyncReplicaAction(request, channel, (ReplicationTask) task, replica).run();
}
class AsyncReplicaAction extends AbstractRunnable {
@Override
protected void doRun() throws Exception {
// 1. 验证 allocation ID
final String actualAllocationId = this.replica.routingEntry().allocationId().getId();
if (actualAllocationId.equals(replicaRequest.getTargetAllocationID()) == false) {
throw new ShardNotFoundException(
this.replica.shardId(),
"expected allocation id [{}] but found [{}]",
replicaRequest.getTargetAllocationID(),
actualAllocationId
);
}
// 2. 获取副本操作权限
acquireReplicaOperationPermit(
replica,
replicaRequest.getRequest(),
this,
replicaRequest.getPrimaryTerm(),
replicaRequest.getGlobalCheckpoint(),
replicaRequest.getMaxSeqNoOfUpdatesOrDeletes()
);
}
@Override
public void onResponse(Releasable releasable) {
try {
// 3. 执行副本写入
shardOperationOnReplica(
replicaRequest.getRequest(),
replica
);
// 4. 构建副本响应
final ReplicaResponse response = new ReplicaResponse(
replica.getLocalCheckpoint(),
replica.getLastSyncedGlobalCheckpoint()
);
// 5. 发送响应
channel.sendResponse(response);
} catch (Exception e) {
channel.sendResponse(e);
} finally {
releasable.close();
}
}
}
7.2.4 索引/复制内部时序图
sequenceDiagram
autonumber
participant Client as TransportBulkAction<br/>(协调节点)
participant Reroute as ReroutePhase
participant TS as TransportService
participant Primary as PrimaryPhase<br/>(主分片节点)
participant Replication as ReplicationOperation
participant Proxy as ReplicasProxy
participant Replica1 as ReplicaPhase<br/>(副本1)
participant Replica2 as ReplicaPhase<br/>(副本2)
Note over Client,Replica2: 路由和主分片写入
Client->>Reroute: runReroutePhase(task, request, listener)
Reroute->>Reroute: 1. 获取集群状态
Reroute->>Reroute: 2. 查找主分片路由
Reroute->>Reroute: 3. 获取主分片节点
Reroute->>TS: sendRequest(node, transportPrimaryAction, concreteShardRequest, handler)
TS->>TS: sendRequestInternal()<br/>(生成 requestId, 注册 responseHandler)
TS->>Primary: [网络传输] 请求到达主分片节点
Primary->>Primary: handlePrimaryRequest(request, channel, task)
Primary->>Primary: getIndexShard(shardId)
Primary->>Primary: acquirePrimaryOperationPermit()
Primary->>Primary: shardOperationOnPrimary(request, indexShard)
Primary->>Primary: 执行写入:<br/>- 解析文档<br/>- 写入 Lucene<br/>- 更新 translog
Note over Primary,Replica2: 复制到副本分片
Primary->>Replication: ReplicationOperation.execute()
Replication->>Replication: 获取所有 in-sync 副本列表
par 并发发送到所有副本
Replication->>Proxy: performOn(replica1, request, primaryTerm, globalCheckpoint, listener)
Proxy->>TS: sendRequest(node, transportReplicaAction, replicaRequest, handler)
TS->>Replica1: [网络传输] 请求到达副本1
Replica1->>Replica1: handleReplicaRequest(request, channel, task)
Replica1->>Replica1: getIndexShard(shardId)
Replica1->>Replica1: 验证 allocation ID
Replica1->>Replica1: acquireReplicaOperationPermit()
Replica1->>Replica1: shardOperationOnReplica(request, replica)
Replica1->>Replica1: 执行写入:<br/>- 写入 Lucene<br/>- 更新 translog
Replica1->>Replica1: 构建 ReplicaResponse
Replica1->>TS: channel.sendResponse(replicaResponse)
TS->>Replication: [网络传输] 副本1响应
and
Replication->>Proxy: performOn(replica2, request, primaryTerm, globalCheckpoint, listener)
Proxy->>TS: sendRequest(node, transportReplicaAction, replicaRequest, handler)
TS->>Replica2: [网络传输] 请求到达副本2
Replica2->>Replica2: handleReplicaRequest(request, channel, task)
Replica2->>Replica2: getIndexShard(shardId)
Replica2->>Replica2: 验证 allocation ID
Replica2->>Replica2: acquireReplicaOperationPermit()
Replica2->>Replica2: shardOperationOnReplica(request, replica)
Replica2->>Replica2: 执行写入:<br/>- 写入 Lucene<br/>- 更新 translog
Replica2->>Replica2: 构建 ReplicaResponse
Replica2->>TS: channel.sendResponse(replicaResponse)
TS->>Replication: [网络传输] 副本2响应
end
Note over Replication: 等待 Quorum 确认 (默认 1 + ceil((replicas+1)/2))
Replication->>Primary: 所有副本完成或达到 Quorum
Primary->>Primary: 构建 PrimaryResponse
Primary->>TS: channel.sendResponse(primaryResponse)
TS->>Client: [网络传输] 主分片响应返回协调节点
Client->>Client: 处理响应<br/>onResponse(response)
Note over Client: 写入完成
7.2.5 索引/复制流程说明
1. 图意概述
该时序图展示了索引/复制请求从协调节点到主分片再到副本分片的完整流程,体现了 Elasticsearch 的主副同步机制。
2. 关键路径
- 路由阶段: TransportBulkAction → ReroutePhase → 查找主分片节点
- 主写入阶段: TransportService → PrimaryPhase → 写入主分片 → 创建 ReplicationOperation
- 副本复制阶段: ReplicationOperation → ReplicasProxy → 并发发送到所有副本
- 副本执行阶段: ReplicaPhase → 验证 allocation ID → 写入副本 → 返回响应
- 确认阶段: 等待 Quorum 确认 → 返回响应给协调节点
3. 关键设计
- Quorum 机制: 默认需要 1 + ceil((replicas+1)/2) 个分片确认才返回成功(计算示例见本列表后的代码)
- allocation ID: 每个分片副本有唯一 ID,防止写入到过期副本
- primaryTerm: 主分片任期号,检测主分片切换
- globalCheckpoint: 全局检查点,用于一致性控制
- in-sync replicas: 只向同步中的副本发送请求,失败副本会被移除
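按本文给出的公式,可用如下小函数计算需确认的分片数(仅为公式示意,非 Elasticsearch 源码):
// 1(主分片)+ ceil((replicas + 1) / 2)
static int writeQuorum(int numberOfReplicas) {
    return 1 + (int) Math.ceil((numberOfReplicas + 1) / 2.0);
}
// 例:replicas = 2 时 writeQuorum(2) = 1 + 2 = 3,即需要 3 个分片确认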
4. 异常处理
- 主分片不可用: ReroutePhase 重试,等待集群状态更新
- 副本写入失败: 标记副本为失败,从 in-sync 列表移除,不影响请求成功
- 网络超时: 副本请求超时不影响主请求,但会标记副本失败
- allocation ID 不匹配: 拒绝请求,防止写入到过期副本
5. 性能要点
- 并发复制: 主分片同时向所有副本发送请求,最大化吞吐
- forceExecution: 副本处理器必须执行,不能因线程池满而拒绝
- bulk 优化: 批量请求共享网络连接和序列化开销
- translog: 写入 translog 后即可返回,不等待 Lucene flush
6. 监控埋点
- ReplicationOperation.execute(): 记录复制开始时间
- ReplicationOperation.onPrimaryOperationComplete(): 记录主写入完成时间
- ReplicationOperation.onReplicaResponse(): 记录副本响应时间
- ShardStateAction.shardFailed(): 记录分片失败事件
7.3 集群状态发布路径
7.3.1 调用链路概览
集群状态发布是主节点向所有节点广播最新集群状态的过程,使用 Raft 协议保证强一致性。
flowchart TD
A[Coordinator] --> B[Publication]
B --> C[PublicationTransportHandler]
C --> D[构建 Diff 或 Full State]
D --> E[序列化和压缩]
E --> F[TransportService.sendChildRequest]
F --> G[并发发送到所有节点]
G --> H[InboundHandler 接收]
H --> I[反序列化 ClusterState]
I --> J[应用新状态]
J --> K[发送确认]
K --> L[等待 Quorum 确认]
L --> M[提交状态]
7.3.2 关键代码 - 发布集群状态
PublicationTransportHandler 初始化
// server/src/main/java/org/elasticsearch/cluster/coordination/PublicationTransportHandler.java
public PublicationTransportHandler(
TransportService transportService,
NamedWriteableRegistry namedWriteableRegistry,
Function<PublishRequest, PublishWithJoinResponse> handlePublishRequest,
BiConsumer<ApplyCommitRequest, ActionListener<Void>> handleApplyCommit
) {
this.transportService = transportService;
this.handlePublishRequest = handlePublishRequest;
// 1. 注册集群状态发布处理器
transportService.registerRequestHandler(
PUBLISH_STATE_ACTION_NAME, // "internal:cluster/coordination/publish_state"
this.clusterCoordinationExecutor,
false,
false,
BytesTransportRequest::new,
(request, channel, task) -> channel.sendResponse(handleIncomingPublishRequest(request))
);
// 2. 注册状态提交处理器
transportService.registerRequestHandler(
COMMIT_STATE_ACTION_NAME, // "internal:cluster/coordination/commit_state"
this.clusterCoordinationExecutor,
false,
false,
ApplyCommitRequest::new,
(request, channel, task) -> handleApplyCommit.accept(request, new ChannelActionListener<>(channel))
);
}
Publication 发布状态
// PublicationTransportHandler 内部类
private class PublicationContext extends AbstractRefCounted {
private final ClusterState newState;
private final ClusterState previousState;
private final boolean sendFullVersion;
// 序列化后的状态(按版本分组)
private final Map<TransportVersion, ReleasableBytesReference> serializedStates = new ConcurrentHashMap<>();
private final Map<TransportVersion, ReleasableBytesReference> serializedDiffs = new ConcurrentHashMap<>();
// 节点连接
private final Map<DiscoveryNode, Transport.Connection> nodeConnections = new ConcurrentHashMap<>();
void buildDiffAndSerializeStates() {
// 1. 准备 Diff 计算器(懒加载)
final LazyInitializable<Diff<ClusterState>, RuntimeException> diffSupplier = new LazyInitializable<>(
() -> newState.diff(previousState)
);
// 2. 遍历所有节点
for (DiscoveryNode node : discoveryNodes) {
// 本地节点跳过序列化
if (node.equals(transportService.getLocalNode())) {
continue;
}
// 3. 获取连接
Transport.Connection connection;
try {
connection = transportService.getConnection(node);
} catch (NodeNotConnectedException e) {
logger.debug(() -> format("No connection to [%s] available, skipping serialization", node), e);
continue;
}
nodeConnections.put(node, connection);
// 4. 决定发送全量还是 Diff
if (sendFullVersion || previousState.nodes().nodeExists(node) == false) {
// 新节点或强制全量:序列化完整集群状态
serializedStates.computeIfAbsent(
connection.getTransportVersion(),
v -> serializeFullClusterState(newState, node, v)
);
} else {
// 已存在节点:序列化 Diff
serializedDiffs.computeIfAbsent(
connection.getTransportVersion(),
v -> serializeDiffClusterState(newState, diffSupplier.getOrCompute(), node, v)
);
}
}
}
public void sendPublishRequest(
DiscoveryNode destination,
PublishRequest publishRequest,
ActionListener<PublishWithJoinResponse> listener
) {
// 1. 本地节点直接调用
if (destination.equals(transportService.getLocalNode())) {
try {
listener.onResponse(handlePublishRequest.apply(publishRequest));
} catch (Exception e) {
listener.onFailure(e);
}
return;
}
// 2. 发送全量或 Diff
if (sendFullVersion || previousState.nodes().nodeExists(destination) == false) {
logger.trace("sending full cluster state version [{}] to [{}]", newStateVersion, destination);
sendFullClusterState(destination, listener);
} else {
logger.trace("sending cluster state diff for version [{}] to [{}]", newStateVersion, destination);
sendClusterStateDiff(destination, listener);
}
}
private void sendFullClusterState(DiscoveryNode destination, ActionListener<PublishWithJoinResponse> listener) {
// 1. 获取连接
Transport.Connection connection = nodeConnections.get(destination);
if (connection == null) {
listener.onFailure(new NodeNotConnectedException(destination, "No connection available"));
return;
}
// 2. 获取序列化的状态
var version = connection.getTransportVersion();
ReleasableBytesReference bytes = serializedStates.get(version);
if (bytes == null) {
bytes = serializedStates.computeIfAbsent(version, v -> serializeFullClusterState(newState, destination, v));
}
// 3. 发送
sendClusterState(connection, bytes, listener);
}
private void sendClusterStateDiff(DiscoveryNode destination, ActionListener<PublishWithJoinResponse> listener) {
// 1. 获取连接
Transport.Connection connection = nodeConnections.get(destination);
if (connection == null) {
listener.onFailure(new NodeNotConnectedException(destination, "No connection available"));
return;
}
// 2. 获取序列化的 Diff
final ReleasableBytesReference bytes = serializedDiffs.get(connection.getTransportVersion());
// 3. 发送(如果失败则回退到全量)
sendClusterState(connection, bytes, ActionListener.runAfter(listener.delegateResponse((delegate, e) -> {
if (e instanceof TransportException transportException) {
if (transportException.unwrapCause() instanceof IncompatibleClusterStateVersionException) {
logger.debug("resending full cluster state to node {} reason {}", destination, transportException.getDetailedMessage());
sendFullClusterState(destination, delegate);
return;
}
}
delegate.onFailure(e);
}), this::decRef));
}
private void sendClusterState(
Transport.Connection connection,
ReleasableBytesReference bytes,
ActionListener<PublishWithJoinResponse> listener
) {
// 发送序列化的集群状态
if (bytes.tryIncRef() == false) {
listener.onFailure(new IllegalStateException("serialized cluster state released before transmission"));
return;
}
transportService.sendChildRequest(
connection,
PUBLISH_STATE_ACTION_NAME,
new BytesTransportRequest(bytes, connection.getTransportVersion()),
task,
STATE_REQUEST_OPTIONS, // 30s 超时
new CleanableResponseHandler<>(listener, PublishWithJoinResponse::new, clusterCoordinationExecutor, bytes::decRef)
);
}
}
序列化集群状态
private ReleasableBytesReference serializeFullClusterState(ClusterState clusterState, DiscoveryNode node, TransportVersion version) {
try (RecyclerBytesStreamOutput bytesStream = transportService.newNetworkBytesStream()) {
final long uncompressedBytes;
// 使用压缩流
try (
StreamOutput stream = new PositionTrackingOutputStreamStreamOutput(
CompressorFactory.COMPRESSOR.threadLocalOutputStream(Streams.flushOnCloseStream(bytesStream))
)
) {
stream.setTransportVersion(version);
stream.writeBoolean(true); // 标记为全量状态
clusterState.writeTo(stream);
uncompressedBytes = stream.position();
} catch (IOException e) {
throw new ElasticsearchException("failed to serialize cluster state for publishing to node {}", e, node);
}
final int size = bytesStream.size();
serializationStatsTracker.serializedFullState(uncompressedBytes, size);
logger.trace("serialized full cluster state version [{}] for node [{}] with size [{}]", clusterState.version(), node, size);
return bytesStream.moveToBytesReference();
}
}
private ReleasableBytesReference serializeDiffClusterState(
ClusterState clusterState,
Diff<ClusterState> diff,
DiscoveryNode node,
TransportVersion version
) {
try (RecyclerBytesStreamOutput bytesStream = transportService.newNetworkBytesStream()) {
final long uncompressedBytes;
// 使用压缩流
try (
StreamOutput stream = new PositionTrackingOutputStreamStreamOutput(
CompressorFactory.COMPRESSOR.threadLocalOutputStream(Streams.flushOnCloseStream(bytesStream))
)
) {
stream.setTransportVersion(version);
stream.writeBoolean(false); // 标记为 Diff
diff.writeTo(stream);
uncompressedBytes = stream.position();
} catch (IOException e) {
throw new ElasticsearchException("failed to serialize cluster state diff for publishing to node {}", e, node);
}
final int size = bytesStream.size();
serializationStatsTracker.serializedDiff(uncompressedBytes, size);
logger.trace("serialized cluster state diff for version [{}] for node [{}] with size [{}]", clusterState.version(), node, size);
return bytesStream.moveToBytesReference();
}
}
7.3.3 关键代码 - 接收和应用集群状态
接收发布请求
private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportRequest request) throws IOException {
// 1. 反序列化
final Compressor compressor = CompressorFactory.compressor(request.bytes());
StreamInput in = request.bytes().streamInput();
try {
if (compressor != null) {
in = new InputStreamStreamInput(compressor.threadLocalInputStream(in));
}
in = new NamedWriteableAwareStreamInput(in, namedWriteableRegistry);
in.setTransportVersion(request.version());
// 2. 读取标志位
final boolean isFullState = in.readBoolean();
// 3. 构建 PublishRequest
final ClusterState incomingState;
if (isFullState) {
// 全量状态
incomingState = ClusterState.readFrom(in, transportService.getLocalNode());
} else {
// Diff 状态
Diff<ClusterState> diff = ClusterState.readDiffFrom(in, transportService.getLocalNode());
// 需要基于本地状态应用 diff
// 如果本地状态版本不匹配,会抛出 IncompatibleClusterStateVersionException
incomingState = diff.apply(lastSeenClusterState.get());
}
// 4. 更新本地已见状态
lastSeenClusterState.set(incomingState);
// 5. 调用应用逻辑
final PublishRequest publishRequest = new PublishRequest(incomingState);
return handlePublishRequest.apply(publishRequest);
} catch (IncompatibleClusterStateVersionException e) {
// Diff 应用失败,抛出异常让主节点重新发送全量状态
throw e;
}
}
应用集群状态
// server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java
private PublishWithJoinResponse handlePublishRequest(PublishRequest publishRequest) {
// 1. 验证发布请求
final ClusterState incomingState = publishRequest.getAcceptedState();
// 2. 应用集群状态
final ClusterState appliedState = clusterApplier.onNewClusterState(
"apply cluster state (from master [" + incomingState.nodes().getMasterNode() + "])",
() -> incomingState,
(source, e) -> logger.warn(() -> "failed to apply cluster state from master", e)
);
// 3. 构建响应(包含可选的 Join 请求)
return new PublishWithJoinResponse(
new PublishResponse(incomingState.term(), incomingState.version()),
joinOptional
);
}
7.3.4 集群状态发布内部时序图
sequenceDiagram
autonumber
participant Coordinator as Coordinator<br/>(主节点)
participant Publication as Publication
participant Context as PublicationContext
participant TS as TransportService
participant Node1 as 节点1
participant Node2 as 节点2
participant Node3 as 节点3
Note over Coordinator,Node3: 集群状态发布开始
Coordinator->>Publication: publish(clusterChangedEvent, ackListener)
Publication->>Context: new PublicationContext(newState, previousState, nodes)
Context->>Context: buildDiffAndSerializeStates()
Context->>Context: 遍历所有节点
loop 每个节点
Context->>Context: 获取 connection
alt 新节点或强制全量
Context->>Context: serializeFullClusterState(newState, node, version)
Context->>Context: 1. 创建压缩流
Context->>Context: 2. 写入 full state 标志
Context->>Context: 3. 序列化 ClusterState
Context->>Context: 4. 压缩
else 已存在节点
Context->>Context: serializeDiffClusterState(newState, diff, node, version)
Context->>Context: 1. 创建压缩流
Context->>Context: 2. 写入 diff 标志
Context->>Context: 3. 序列化 Diff
Context->>Context: 4. 压缩
end
end
Note over Context: 序列化完成,开始并发发送
par 并发发送到所有节点
Context->>TS: sendChildRequest(node1, PUBLISH_STATE_ACTION, request, listener)
TS->>Node1: [网络传输] 发送集群状态
Node1->>Node1: handleIncomingPublishRequest(request)
Node1->>Node1: 1. 解压缩
Node1->>Node1: 2. 读取 full/diff 标志
alt 全量状态
Node1->>Node1: ClusterState.readFrom(in)
else Diff 状态
Node1->>Node1: ClusterState.readDiffFrom(in)
Node1->>Node1: diff.apply(lastSeenClusterState)
end
Node1->>Node1: lastSeenClusterState.set(incomingState)
Node1->>Node1: handlePublishRequest(publishRequest)
Node1->>Node1: clusterApplier.onNewClusterState()
Node1->>Node1: 构建 PublishWithJoinResponse
Node1->>TS: channel.sendResponse(publishResponse)
TS->>Publication: [网络传输] 节点1确认
and
Context->>TS: sendChildRequest(node2, PUBLISH_STATE_ACTION, request, listener)
TS->>Node2: [网络传输] 发送集群状态
Node2->>Node2: handleIncomingPublishRequest(request)
Node2->>Node2: 解压和反序列化
Node2->>Node2: 应用集群状态
Node2->>TS: channel.sendResponse(publishResponse)
TS->>Publication: [网络传输] 节点2确认
and
Context->>TS: sendChildRequest(node3, PUBLISH_STATE_ACTION, request, listener)
TS->>Node3: [网络传输] 发送集群状态
Node3->>Node3: handleIncomingPublishRequest(request)
alt Diff 应用失败
Node3->>Node3: IncompatibleClusterStateVersionException
Node3->>TS: channel.sendResponse(exception)
TS->>Context: [网络传输] 节点3失败
Context->>TS: sendFullClusterState(node3, listener)
TS->>Node3: [网络传输] 重新发送全量状态
Node3->>Node3: ClusterState.readFrom(in)
Node3->>Node3: 应用集群状态
Node3->>TS: channel.sendResponse(publishResponse)
TS->>Publication: [网络传输] 节点3确认
else 成功
Node3->>Node3: 应用集群状态
Node3->>TS: channel.sendResponse(publishResponse)
TS->>Publication: [网络传输] 节点3确认
end
end
Note over Publication: 等待 Quorum 确认 (master + floor(nodes/2))
Publication->>Coordinator: onCompletion(发布成功)
Note over Coordinator,Node3: 提交集群状态
Coordinator->>Context: sendCommitToAllNodes()
par 并发发送提交请求
Context->>TS: sendRequest(node1, COMMIT_STATE_ACTION, commitRequest, listener)
TS->>Node1: [网络传输]
Node1->>Node1: handleApplyCommit(request)
Node1->>Node1: commitClusterState(term, version)
Node1->>TS: channel.sendResponse(Empty.INSTANCE)
and
Context->>TS: sendRequest(node2, COMMIT_STATE_ACTION, commitRequest, listener)
TS->>Node2: [网络传输]
Node2->>Node2: handleApplyCommit(request)
Node2->>TS: channel.sendResponse(Empty.INSTANCE)
and
Context->>TS: sendRequest(node3, COMMIT_STATE_ACTION, commitRequest, listener)
TS->>Node3: [网络传输]
Node3->>Node3: handleApplyCommit(request)
Node3->>TS: channel.sendResponse(Empty.INSTANCE)
end
Publication->>Coordinator: 提交完成
Note over Coordinator: 集群状态发布完成
7.3.5 集群状态发布流程说明
1. 图意概述
该时序图展示了 Elasticsearch 主节点如何通过 Raft 协议将新的集群状态发布到所有节点,包括 Diff 优化、Quorum 确认和提交过程。
2. 关键路径
- 准备阶段: Coordinator → Publication → PublicationContext → 序列化全量/Diff 状态
- 发布阶段: 并发发送到所有节点 → 各节点反序列化并应用 → 返回确认
- 提交阶段: 等待 Quorum 确认 → 发送提交请求 → 各节点持久化状态
3. 关键优化
- Diff 传输: 对已存在的节点发送 Diff,大幅减少网络传输量
- 版本分组: 按 TransportVersion 分组序列化,复用序列化结果
- 压缩传输: 使用 deflate 压缩,进一步减少传输量
- 本地优化: 主节点本地不经过序列化,直接应用状态
4. 异常处理
- Diff 应用失败: 节点抛出 IncompatibleClusterStateVersionException,主节点自动重新发送全量状态
- 网络超时: 30s 超时,超时节点不影响 Quorum 确认
- 节点不可用: 跳过不可达节点,不影响发布成功
- Quorum 未达成: 发布失败,回滚状态,触发新一轮选举
5. 性能要点
- 并发发送: 主节点同时向所有节点发送状态,最大化吞吐
- 压缩比: 集群状态通常压缩比 5:1 ~ 10:1
- 序列化复用: 相同 TransportVersion 的节点共享序列化结果
- 异步处理: 整个流程基于异步回调,不阻塞主线程
6. 一致性保证
- term 单调递增: 每次主节点变更 term 递增,防止脑裂
- version 单调递增: 集群状态版本单调递增
- Quorum 机制: 必须有超过半数节点确认才能提交
- 两阶段提交: 先发布(预提交),Quorum 确认后提交
8. 核心流程时序图
8.1 标准请求-响应流程
sequenceDiagram
autonumber
participant Client as 客户端模块<br/>(SearchService)
participant TS as TransportService
participant CM as ConnectionManager
participant Conn as Connection
participant Out as OutboundHandler
participant Network as 网络层<br/>(Netty)
participant RemoteNetwork as 远程节点<br/>网络层
participant In as InboundHandler<br/>(远程节点)
participant Handler as RequestHandler<br/>(远程节点)
Note over Client,Handler: 发送请求阶段
Client->>TS: sendRequest(node, action, request, handler)
TS->>CM: getConnection(node)
CM-->>TS: Connection
TS->>TS: 生成 requestId
TS->>TS: 注册 responseHandler<br/>(requestId → handler)
TS->>Conn: sendRequest(requestId, action, request, options)
Conn->>Out: sendMessage(requestId, action, request)
Out->>Out: 序列化请求<br/>serialize(request)
Out->>Out: 构建消息头<br/>(requestId, action, version)
alt 启用压缩
Out->>Out: 压缩消息体<br/>compress(payload)
end
Out->>Network: write(ByteBuffer)
Network->>RemoteNetwork: TCP 传输
Note over RemoteNetwork,Handler: 处理请求阶段
RemoteNetwork->>In: onMessage(ByteBuffer)
In->>In: 解析消息头<br/>parseHeader()
alt 压缩的消息
In->>In: 解压缩<br/>decompress(payload)
end
In->>In: 反序列化请求<br/>deserialize(payload)
In->>In: 查找处理器<br/>findHandler(action)
In->>Handler: messageReceived(request, channel, task)
Handler->>Handler: 执行业务逻辑<br/>processRequest()
Handler->>Handler: 构建响应<br/>buildResponse()
Note over Handler,Client: 发送响应阶段
Handler->>In: channel.sendResponse(response)
In->>Out: sendResponse(requestId, response)
Out->>Out: 序列化响应<br/>serialize(response)
Out->>Out: 构建响应消息头
Out->>RemoteNetwork: write(ByteBuffer)
RemoteNetwork->>Network: TCP 传输
Network->>In: onMessage(ByteBuffer)<br/>(响应消息)
In->>In: 解析响应消息头
In->>In: 查找响应处理器<br/>responseHandlers.get(requestId)
In->>In: 反序列化响应<br/>deserialize(payload, handler.reader())
In->>Client: handler.handleResponse(response)
Client->>Client: 处理响应结果
8.2 连接建立流程
sequenceDiagram
autonumber
participant CM as ConnectionManager
participant Transport as Transport
participant TCP as TCP层<br/>(Netty)
participant RemoteTCP as 远程节点<br/>TCP层
participant RemoteTS as 远程<br/>TransportService
Note over CM,RemoteTS: 建立 TCP 连接
CM->>Transport: openConnection(node, profile, listener)
Transport->>TCP: connect(node.address)
TCP->>RemoteTCP: TCP SYN
RemoteTCP-->>TCP: TCP SYN-ACK
TCP->>RemoteTCP: TCP ACK
Note over TCP,RemoteTCP: TCP 连接建立成功
Note over Transport,RemoteTS: Handshake (握手)
Transport->>Transport: 构建 HandshakeRequest<br/>(version, clusterName, nodeId)
Transport->>RemoteTCP: send(HandshakeRequest)
RemoteTCP->>RemoteTS: handleHandshakeRequest()
RemoteTS->>RemoteTS: 验证:<br/>1. clusterName 匹配<br/>2. version 兼容<br/>3. 许可证检查
alt 验证通过
RemoteTS->>RemoteTCP: send(HandshakeResponse<br/>(success, nodeInfo))
RemoteTCP->>Transport: receive(HandshakeResponse)
Transport->>CM: 连接建立成功<br/>listener.onResponse(connection)
else 验证失败
RemoteTS->>RemoteTCP: send(HandshakeResponse<br/>(error, reason))
RemoteTCP->>Transport: receive(HandshakeResponse)
Transport->>Transport: close connection
Transport->>CM: listener.onFailure(exception)
end
8.3 本地优化流程
sequenceDiagram
autonumber
participant Client as 客户端模块
participant TS as TransportService
participant LocalConn as LocalConnection
participant Handler as RequestHandler<br/>(本地)
Client->>TS: sendRequest(localNode, action, request, handler)
TS->>TS: isLocalNode(node)?<br/>检查是否本地节点
Note over TS: 本地优化:直接调用处理器
TS->>LocalConn: sendRequest(requestId, action, request)
LocalConn->>LocalConn: 不序列化<br/>直接传递对象引用
LocalConn->>Handler: messageReceived(request, channel, task)
Handler->>Handler: 处理请求
Handler->>Handler: 构建响应
Handler->>LocalConn: channel.sendResponse(response)
LocalConn->>LocalConn: 不序列化<br/>直接传递对象引用
LocalConn->>Client: handler.handleResponse(response)
Client->>Client: 处理响应
Note over Client,Handler: 零网络开销,零序列化开销
9. 配置与可观测
9.1 关键配置
| 配置项 | 默认值 | 说明 |
|---|---|---|
| transport.tcp.port | 9300-9400 | TCP 传输端口范围 |
| transport.tcp.connect_timeout | 30s | 连接超时 |
| transport.tcp.compress | false | 是否启用压缩 |
| transport.tcp.compression_scheme | deflate | 压缩算法 |
| transport.connections_per_node.recovery | 2 | 恢复连接数 |
| transport.connections_per_node.bulk | 3 | 批量连接数 |
| transport.connections_per_node.reg | 6 | 常规连接数 |
| transport.connections_per_node.state | 1 | 状态连接数 |
| transport.connections_per_node.ping | 1 | Ping 连接数 |
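下面以 Settings 形式表达上表中的几项配置,便于在代码或测试中引用(取值为示例假设,键名以上表为准):
Settings settings = Settings.builder()
    .put("transport.tcp.connect_timeout", "30s")
    .put("transport.tcp.compress", true)           // 开启节点间压缩
    .put("transport.connections_per_node.bulk", 3)
    .put("transport.connections_per_node.reg", 6)
    .build();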
9.2 监控指标
传输统计
GET /_nodes/stats/transport
{
"nodes": {
"node1": {
"transport": {
"server_open": 13, # 服务端打开连接数
"rx_count": 12345, # 接收消息数
"rx_size_in_bytes": 1234567, # 接收字节数
"tx_count": 12340, # 发送消息数
"tx_size_in_bytes": 1234560 # 发送字节数
}
}
}
}
连接统计
- server_open: 当前服务端连接数
- rx_count: 累计接收消息数
- tx_count: 累计发送消息数
- rx_size_in_bytes: 累计接收字节数
- tx_size_in_bytes: 累计发送字节数