Elasticsearch-07-Client模块
本文档提供Client模块的全面剖析,包括REST客户端、连接管理、请求构建、重试机制、核心API、数据结构和详细时序图。
1. 模块职责
Client模块提供与Elasticsearch集群通信的客户端实现,主要包括REST低级客户端。
1.1 核心职责
-
连接管理
- 节点连接池
- 连接健康检查
- 自动重连
- 失败节点标记
-
请求发送
- HTTP请求构建
- 请求序列化
- 响应反序列化
- 同步/异步执行
-
负载均衡
- 轮询选择节点
- 节点选择器(NodeSelector)
- 跳过失败节点
- 重试机制
-
错误处理
- 自动重试
- 失败节点管理
- 超时控制
- 异常转换
1.2 客户端类型
REST低级客户端(RestClient):
- 基于Apache HttpClient
- 处理HTTP连接
- 不关心请求/响应格式
- 适用于所有Elasticsearch REST API
Java High Level REST Client(已废弃):
- 基于低级客户端
- 提供类型安全的API
- 已在8.x废弃,推荐使用新的Java Client
2. 模块架构
2.1 整体架构图
flowchart TB
subgraph "用户层"
User[用户代码]
end
subgraph "REST Client 层"
RestClient[RestClient<br/>REST 客户端]
Builder[RestClientBuilder<br/>构建器]
Request[Request<br/>请求对象]
Response[Response<br/>响应对象]
end
subgraph "连接管理层"
NodeSelector[NodeSelector<br/>节点选择器]
NodeTuple[NodeTuple<br/>节点列表]
BlackList[DeadHostState<br/>失败节点管理]
end
subgraph "HTTP 层"
HttpClient[CloseableHttpAsyncClient<br/>Apache HttpClient]
HttpRequest[HttpRequest<br/>HTTP 请求]
HttpResponse[HttpResponse<br/>HTTP 响应]
end
subgraph "Elasticsearch Cluster"
Node1[Node 1<br/>:9200]
Node2[Node 2<br/>:9200]
Node3[Node 3<br/>:9200]
end
User --> RestClient
User --> Builder
Builder --> RestClient
RestClient --> Request
RestClient --> Response
RestClient --> NodeSelector
RestClient --> NodeTuple
RestClient --> BlackList
RestClient --> HttpClient
HttpClient --> HttpRequest
HttpClient --> HttpResponse
HttpClient --> Node1
HttpClient --> Node2
HttpClient --> Node3
2.2 架构说明
REST Client层
RestClient:
- 核心客户端类
- 管理请求生命周期
- 协调所有组件
RestClientBuilder:
- 建造者模式
- 配置客户端参数
- 创建RestClient实例
Request/Response:
- 封装请求和响应
- 简化API使用
连接管理层
NodeSelector:
- 节点选择策略
- 支持自定义选择器
- 默认轮询策略
NodeTuple:
- 不可变节点列表
- 线程安全
- 支持动态更新
DeadHostState:
- 管理失败节点
- 失败时间记录
- 重试时间计算
3. 核心流程与调用链路分析
3.1 请求发送整体流程
flowchart TB
Start[开始: performRequest] --> SelectNode[选择目标节点<br/>round-robin]
SelectNode --> CheckDead[检查节点状态]
CheckDead --> |死节点| Retry{是否可以重试?}
CheckDead --> |活节点| BuildHTTP[构建 HTTP 请求]
Retry --> |是| SelectNode
Retry --> |否| Fail[抛出 IOException]
BuildHTTP --> SendHTTP[发送 HTTP 请求]
SendHTTP --> WaitResp[等待响应]
WaitResp --> |成功| CheckStatus[检查 HTTP 状态码]
WaitResp --> |失败| MarkDead[标记节点为死]
MarkDead --> Retry
CheckStatus --> |2xx/3xx| Success[返回 Response]
CheckStatus --> |4xx/5xx| CheckRetry{是否可重试?}
CheckRetry --> |是| SelectNode
CheckRetry --> |否| ThrowException[抛出 ResponseException]
流程说明
阶段1: 节点选择(SelectNode)
- 从 NodeTuple 中获取节点列表
- 通过轮询算法选择下一个节点
- NodeSelector 过滤不符合条件的节点
- 检查节点是否在黑名单中
阶段2: 请求构建(BuildHTTP)
- 创建 InternalRequest 包装用户请求
- 构建 URI (pathPrefix + endpoint + parameters)
- 设置 HTTP 方法、请求头、请求体
- 创建 RequestContext 上下文
阶段3: 请求发送(SendHTTP)
- 通过 Apache HttpAsyncClient 发送请求
- 同步请求内部使用异步实现并阻塞等待
- 异步请求立即返回 Cancellable
阶段4: 响应处理(CheckStatus)
- 解析 HTTP 状态码
- 处理 Gzip 压缩响应
- 2xx/3xx: 标记节点存活,返回响应
- 502/503/504: 标记节点死亡,重试其他节点
- 其他 4xx/5xx: 标记节点存活,抛出异常(请求问题)
阶段5: 异常处理(MarkDead)
- 网络异常: 标记节点死亡,重试
- 连接超时: 标记节点死亡,重试
- 所有节点失败: 抛出最终异常
3.2 核心调用链路分析
3.2.1 同步请求调用链路
用户代码
└─> RestClient.performRequest(Request)
├─> new InternalRequest(request) // 包装请求
│ ├─> buildUri(pathPrefix, endpoint, params) // 构建 URI
│ ├─> createHttpRequest(method, uri, entity) // 创建 HTTP 请求
│ └─> setHeaders(httpRequest, headers) // 设置请求头
│
├─> nextNodes() // 选择节点
│ ├─> selectNodes(nodeTuple, blacklist, ...) // 节点选择算法
│ │ ├─> 分离存活节点和死亡节点
│ │ ├─> NodeSelector.select(livingNodes) // 节点过滤
│ │ ├─> Collections.rotate(nodes, index) // 轮询调度
│ │ └─> return selectedNodes.iterator()
│ │
│ └─> new NodeTuple(iterator, authCache)
│
└─> performRequest(tuple, internalRequest, null) // 递归重试
├─> request.createContextForNextAttempt(node) // 创建请求上下文
│ ├─> httpRequest.reset() // 重置 HTTP 请求
│ ├─> HttpAsyncMethods.create(host, req) // 创建请求生产者
│ └─> new HttpClientContext() // 创建 HTTP 上下文
│
├─> client.execute(...).get() // 同步执行(阻塞等待)
│ └─> Apache HttpAsyncClient
│ ├─> 建立 TCP 连接
│ ├─> 发送 HTTP 请求
│ ├─> 接收 HTTP 响应
│ └─> 返回 HttpResponse
│
├─> convertResponse(request, node, response) // 转换响应
│ ├─> 检查状态码
│ ├─> 处理 Gzip 解压
│ ├─> new Response(requestLine, host, resp)
│ │
│ ├─> if (statusCode < 300) { // 成功响应
│ │ ├─> onResponse(node) // 从黑名单移除
│ │ └─> return response
│ │ }
│ │
│ ├─> if (statusCode == 502/503/504) { // 可重试错误
│ │ ├─> onFailure(node) // 标记节点死亡
│ │ └─> return responseException // 返回异常,触发重试
│ │ }
│ │
│ └─> else { // 其他错误
│ ├─> onResponse(node) // 保持节点存活
│ └─> throw responseException // 抛出异常,不重试
│ }
│
├─> if (responseException != null && tuple.hasNext()) {
│ └─> return performRequest(tuple, request, exception) // 递归重试
│ }
│
└─> return response or throw exception
关键代码片段:
// 同步请求入口
public Response performRequest(Request request) throws IOException {
InternalRequest internalRequest = new InternalRequest(request);
return performRequest(nextNodes(), internalRequest, null);
}
// 递归重试的核心方法
private Response performRequest(
final NodeTuple<Iterator<Node>> tuple,
final InternalRequest request,
Exception previousException
) throws IOException {
// 1. 创建请求上下文(选择下一个节点)
RequestContext context = request.createContextForNextAttempt(
tuple.nodes.next(),
tuple.authCache
);
HttpResponse httpResponse;
try {
// 2. 同步执行 HTTP 请求(内部是异步+阻塞)
httpResponse = client.execute(
context.requestProducer,
context.asyncResponseConsumer,
context.context,
null
).get();
} catch (Exception e) {
// 3. 网络异常处理
RequestLogger.logFailedRequest(logger, request.httpRequest, context.node, e);
onFailure(context.node); // 标记节点死亡
Exception cause = extractAndWrapCause(e);
addSuppressedException(previousException, cause);
// 4. 如果可以重试且有下一个节点,递归调用
if (isRetryableException(e) && tuple.nodes.hasNext()) {
return performRequest(tuple, request, cause);
}
// 5. 无法重试,抛出异常
if (cause instanceof IOException) {
throw (IOException) cause;
}
if (cause instanceof RuntimeException) {
throw (RuntimeException) cause;
}
throw new IllegalStateException(
"unexpected exception type: must be either RuntimeException or IOException",
cause
);
}
// 6. 转换响应
ResponseOrResponseException result = convertResponse(request, context.node, httpResponse);
if (result.responseException == null) {
// 成功,返回响应
return result.response;
}
// 7. HTTP 错误响应,检查是否可以重试
addSuppressedException(previousException, result.responseException);
if (tuple.nodes.hasNext()) {
// 有下一个节点,递归重试
return performRequest(tuple, request, result.responseException);
}
// 所有节点都失败,抛出最终异常
throw result.responseException;
}
3.2.2 节点选择算法详细分析
// 节点选择算法
static Iterable<Node> selectNodes(
NodeTuple<List<Node>> nodeTuple,
Map<HttpHost, DeadHostState> blacklist,
AtomicInteger lastNodeIndex,
NodeSelector nodeSelector
) throws IOException {
// 步骤1: 将节点分为存活节点和死亡节点
List<Node> livingNodes = new ArrayList<>(
Math.max(0, nodeTuple.nodes.size() - blacklist.size())
);
List<DeadNode> deadNodes = new ArrayList<>(blacklist.size());
for (Node node : nodeTuple.nodes) {
DeadHostState deadness = blacklist.get(node.getHost());
// 节点不在黑名单,或者已到重试时间
if (deadness == null || deadness.shallBeRetried()) {
livingNodes.add(node);
} else {
deadNodes.add(new DeadNode(node, deadness));
}
}
// 步骤2: 优先使用存活节点
if (false == livingNodes.isEmpty()) {
// 应用 NodeSelector 过滤节点
List<Node> selectedLivingNodes = new ArrayList<>(livingNodes);
nodeSelector.select(selectedLivingNodes);
if (false == selectedLivingNodes.isEmpty()) {
/*
* 步骤3: 轮询调度
* Collections.rotate() 实现负载均衡:
* - distance = lastNodeIndex.getAndIncrement()
* - 每次请求,distance 增加1
* - 导致节点列表循环偏移,实现轮询
*/
Collections.rotate(selectedLivingNodes, lastNodeIndex.getAndIncrement());
return selectedLivingNodes;
}
}
// 步骤4: 没有可用的存活节点,尝试复活死亡节点
if (false == deadNodes.isEmpty()) {
List<DeadNode> selectedDeadNodes = new ArrayList<>(deadNodes);
// 应用 NodeSelector 过滤死亡节点
nodeSelector.select(() -> new DeadNodeIteratorAdapter(selectedDeadNodes.iterator()));
if (false == selectedDeadNodes.isEmpty()) {
// 选择死亡时间最短的节点(最快到重试时间)
return singletonList(Collections.min(selectedDeadNodes).node);
}
}
// 步骤5: 无可用节点,抛出异常
throw new IOException(
"NodeSelector [" + nodeSelector + "] rejected all nodes, " +
"living " + livingNodes + " and dead " + deadNodes
);
}
轮询算法图解:
初始节点列表: [Node0, Node1, Node2]
lastNodeIndex = 0
请求1:
- index = 0
- rotate(list, 0) -> [Node0, Node1, Node2]
- 选择 Node0
请求2:
- index = 1
- rotate(list, 1) -> [Node2, Node0, Node1]
- 选择 Node2
请求3:
- index = 2
- rotate(list, 2) -> [Node1, Node2, Node0]
- 选择 Node1
请求4:
- index = 3 (3 % 3 = 0)
- rotate(list, 3) -> [Node0, Node1, Node2]
- 选择 Node0
3.2.3 响应转换与状态码判断
private ResponseOrResponseException convertResponse(
InternalRequest request,
Node node,
HttpResponse httpResponse
) throws IOException {
RequestLogger.logResponse(logger, request.httpRequest, node.getHost(), httpResponse);
int statusCode = httpResponse.getStatusLine().getStatusCode();
// 处理 Gzip 压缩响应
HttpEntity entity = httpResponse.getEntity();
if (entity != null) {
Header header = entity.getContentEncoding();
if (header != null && "gzip".equals(header.getValue())) {
httpResponse.setEntity(new GzipDecompressingEntity(entity));
httpResponse.removeHeaders(HTTP.CONTENT_ENCODING);
httpResponse.removeHeaders(HTTP.CONTENT_LEN);
}
}
Response response = new Response(
request.httpRequest.getRequestLine(),
node.getHost(),
httpResponse
);
// 判断1: 成功响应 (2xx, 3xx)
if (isSuccessfulResponse(statusCode) ||
request.ignoreErrorCodes.contains(statusCode)) {
onResponse(node); // 从黑名单移除节点
// 检查警告是否应该失败
if (request.warningsHandler.warningsShouldFailRequest(response.getWarnings())) {
throw new WarningFailureException(response);
}
return new ResponseOrResponseException(response);
}
// 判断2: 可重试的错误 (502, 503, 504)
ResponseException responseException = new ResponseException(response);
if (isRetryStatus(statusCode)) {
onFailure(node); // 标记节点死亡
return new ResponseOrResponseException(responseException);
}
// 判断3: 不可重试的错误 (400, 401, 404等)
// 这些错误是请求问题,不是节点问题,保持节点存活
onResponse(node);
throw responseException;
}
// 成功响应: statusCode < 300
private static boolean isSuccessfulResponse(int statusCode) {
return statusCode < 300;
}
// 可重试状态码: 502, 503, 504
private static boolean isRetryStatus(int statusCode) {
switch (statusCode) {
case 502: // Bad Gateway
case 503: // Service Unavailable
case 504: // Gateway Timeout
return true;
}
return false;
}
状态码处理策略:
| 状态码范围 | 处理策略 | 节点状态 | 是否重试 |
|---|---|---|---|
| 2xx (成功) | 返回响应 | 标记存活 | 否 |
| 3xx (重定向) | 返回响应 | 标记存活 | 否 |
| 400 (Bad Request) | 抛出异常 | 标记存活 | 否 |
| 401 (Unauthorized) | 抛出异常 | 标记存活 | 否 |
| 403 (Forbidden) | 抛出异常 | 标记存活 | 否 |
| 404 (Not Found) | 抛出异常 | 标记存活 | 否 |
| 429 (Too Many Requests) | 抛出异常 | 标记存活 | 否 |
| 502 (Bad Gateway) | 重试 | 标记死亡 | 是 |
| 503 (Service Unavailable) | 重试 | 标记死亡 | 是 |
| 504 (Gateway Timeout) | 重试 | 标记死亡 | 是 |
| 网络异常 | 重试 | 标记死亡 | 是 |
| 连接超时 | 重试 | 标记死亡 | 是 |
| Socket 超时 | 重试 | 标记死亡 | 是 |
3.2.4 节点失败与恢复机制
// 请求成功,从黑名单移除节点
private void onResponse(Node node) {
DeadHostState removedHost = this.blacklist.remove(node.getHost());
if (logger.isDebugEnabled() && removedHost != null) {
logger.debug("removed [" + node + "] from blacklist");
}
}
// 请求失败,标记节点死亡
private void onFailure(Node node) {
while (true) {
// CAS 操作,确保线程安全
DeadHostState previousDeadHostState = blacklist.putIfAbsent(
node.getHost(),
new DeadHostState(DeadHostState.DEFAULT_TIME_SUPPLIER)
);
if (previousDeadHostState == null) {
// 首次失败,创建新的 DeadHostState
if (logger.isDebugEnabled()) {
logger.debug("added [" + node + "] to blacklist");
}
break;
}
// 已在黑名单,更新失败次数和重试时间
if (blacklist.replace(
node.getHost(),
previousDeadHostState,
new DeadHostState(previousDeadHostState)
)) {
if (logger.isDebugEnabled()) {
logger.debug("updated [" + node + "] already in blacklist");
}
break;
}
}
// 通知失败监听器
failureListener.onFailure(node);
}
DeadHostState 实现:
final class DeadHostState implements Comparable<DeadHostState> {
private static final long MIN_CONNECTION_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(1);
static final long MAX_CONNECTION_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(30);
private final int failedAttempts;
private final long deadUntilNanos;
private final Supplier<Long> timeSupplier;
// 构造函数1: 首次失败
DeadHostState(Supplier<Long> timeSupplier) {
this.failedAttempts = 1;
this.deadUntilNanos = timeSupplier.get() + MIN_CONNECTION_TIMEOUT_NANOS;
this.timeSupplier = timeSupplier;
}
// 构造函数2: 基于之前的失败状态
DeadHostState(DeadHostState previousDeadHostState) {
// 指数退避算法
long timeoutNanos = (long) Math.min(
MIN_CONNECTION_TIMEOUT_NANOS * 2 * Math.pow(2, previousDeadHostState.failedAttempts * 0.5 - 1),
MAX_CONNECTION_TIMEOUT_NANOS
);
this.deadUntilNanos = previousDeadHostState.timeSupplier.get() + timeoutNanos;
this.failedAttempts = previousDeadHostState.failedAttempts + 1;
this.timeSupplier = previousDeadHostState.timeSupplier;
}
// 检查是否到重试时间
boolean shallBeRetried() {
return timeSupplier.get() - deadUntilNanos > 0;
}
long getDeadUntilNanos() {
return deadUntilNanos;
}
int getFailedAttempts() {
return failedAttempts;
}
}
指数退避时间表:
| 失败次数 | 计算公式 | 重试超时 |
|---|---|---|
| 1 | 60s × 2 × 2^(0.5×1-1) = 60s | 60 秒 |
| 2 | 60s × 2 × 2^(0.5×2-1) = 120s | 84 秒 |
| 3 | 60s × 2 × 2^(0.5×3-1) = 240s | 120 秒 |
| 4 | 60s × 2 × 2^(0.5×4-1) = 480s | 169 秒 |
| 5 | 60s × 2 × 2^(0.5×5-1) = 960s | 240 秒 |
| 6 | 60s × 2 × 2^(0.5×6-1) = 1920s | 339 秒 |
| 7 | 60s × 2 × 2^(0.5×7-1) = … | 480 秒 |
| 8+ | min(calculation, 1800s) | 最大 1800 秒 (30分钟) |
3.3 失败节点管理流程
flowchart TB
RequestFail[请求失败] --> MarkDead[标记节点为死]
MarkDead --> RecordTime[记录失败时间]
RecordTime --> CalcTimeout[计算重试超时<br/>min: 1分钟, max: 30分钟]
CalcTimeout --> Wait[等待超时]
Wait --> CheckTime{超时到期?}
CheckTime --> |否| Wait
CheckTime --> |是| Resurrect[复活节点]
Resurrect --> NextRequest[下次请求尝试]
4. 核心API
4.1 RestClient API
4.1.1 performRequest - 同步请求
方法签名
public Response performRequest(Request request) throws IOException
参数说明
| 参数 | 类型 | 说明 |
|---|---|---|
| request | Request | 请求对象 |
返回值
public class Response {
public StatusLine getStatusLine(); // HTTP状态行
public HttpEntity getEntity(); // 响应体
public Header[] getHeaders(); // 响应头
public HttpHost getHost(); // 处理请求的节点
}
使用示例
Request request = new Request("GET", "/_cat/indices");
request.addParameter("format", "json");
try {
Response response = restClient.performRequest(request);
int status = response.getStatusLine().getStatusCode();
String body = EntityUtils.toString(response.getEntity());
System.out.println("Status: " + status);
System.out.println("Response: " + body);
} catch (ResponseException e) {
// HTTP错误响应
Response response = e.getResponse();
int statusCode = response.getStatusLine().getStatusCode();
} catch (IOException e) {
// 网络错误
}
4.1.2 performRequestAsync - 异步请求
方法签名
public Cancellable performRequestAsync(
Request request,
ResponseListener responseListener
)
参数说明
| 参数 | 类型 | 说明 |
|---|---|---|
| request | Request | 请求对象 |
| responseListener | ResponseListener | 响应监听器 |
ResponseListener接口
public interface ResponseListener {
void onSuccess(Response response);
void onFailure(Exception exception);
}
使用示例
Request request = new Request("GET", "/_cluster/health");
Cancellable cancellable = restClient.performRequestAsync(
request,
new ResponseListener() {
@Override
public void onSuccess(Response response) {
try {
String body = EntityUtils.toString(response.getEntity());
System.out.println("Response: " + body);
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void onFailure(Exception exception) {
System.err.println("Request failed: " + exception.getMessage());
}
}
);
// 可选: 取消请求
// cancellable.cancel();
4.1.3 setNodes - 更新节点列表
方法签名
public synchronized void setNodes(Collection<Node> nodes)
参数说明
| 参数 | 类型 | 说明 |
|---|---|---|
| nodes | Collection |
新的节点列表 |
使用示例
// 动态更新节点列表
List<Node> newNodes = Arrays.asList(
new Node(new HttpHost("localhost", 9200)),
new Node(new HttpHost("localhost", 9201)),
new Node(new HttpHost("localhost", 9202))
);
restClient.setNodes(newNodes);
4.1.4 close - 关闭客户端
方法签名
@Override
public void close() throws IOException
使用示例
try (RestClient restClient = RestClient.builder(...).build()) {
// 使用客户端
restClient.performRequest(request);
} // 自动关闭
4.2 Request API
4.2.1 Request构造
方法签名
public Request(String method, String endpoint)
参数说明
| 参数 | 类型 | 说明 |
|---|---|---|
| method | String | HTTP方法 (GET, POST, PUT, DELETE, HEAD) |
| endpoint | String | API端点 (如 “/_search”) |
4.2.2 addParameter - 添加查询参数
方法签名
public void addParameter(String name, String value)
使用示例
Request request = new Request("GET", "/_search");
request.addParameter("size", "10");
request.addParameter("from", "0");
request.addParameter("timeout", "5s");
4.2.3 setEntity - 设置请求体
方法签名
public void setEntity(HttpEntity entity)
使用示例
// 方式1: 使用NStringEntity
Request request = new Request("POST", "/my-index/_doc");
request.setEntity(new NStringEntity(
"{\"field\":\"value\"}",
ContentType.APPLICATION_JSON
));
// 方式2: 使用setJsonEntity (更简洁)
request.setJsonEntity("{\"field\":\"value\"}");
// 方式3: 使用ByteArrayEntity (二进制数据)
byte[] data = ...;
request.setEntity(new ByteArrayEntity(data));
4.2.4 setOptions - 设置请求选项
方法签名
public void setOptions(RequestOptions options)
使用示例
// 创建通用选项
RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();
builder.addHeader("Authorization", "Bearer token");
builder.setWarningsHandler(WarningsHandler.PERMISSIVE);
RequestOptions commonOptions = builder.build();
// 应用到请求
Request request = new Request("GET", "/_search");
request.setOptions(commonOptions);
// 自定义单个请求的选项
RequestOptions.Builder customBuilder = commonOptions.toBuilder();
customBuilder.addHeader("X-Custom-Header", "value");
request.setOptions(customBuilder);
4.3 RestClientBuilder API
4.3.1 setDefaultHeaders - 设置默认请求头
public RestClientBuilder setDefaultHeaders(Header[] defaultHeaders)
使用示例
Header[] headers = new Header[]{
new BasicHeader("Authorization", "Bearer token"),
new BasicHeader("X-Application", "my-app")
};
RestClient restClient = RestClient.builder(...)
.setDefaultHeaders(headers)
.build();
4.3.2 setNodeSelector - 设置节点选择器
public RestClientBuilder setNodeSelector(NodeSelector nodeSelector)
使用示例
RestClient restClient = RestClient.builder(...)
.setNodeSelector(NodeSelector.SKIP_DEDICATED_MASTERS)
.build();
4.3.3 setFailureListener - 设置失败监听器
public RestClientBuilder setFailureListener(FailureListener failureListener)
使用示例
RestClient restClient = RestClient.builder(...)
.setFailureListener(new FailureListener() {
@Override
public void onFailure(Node node) {
System.err.println("Node failed: " + node.getHost());
}
})
.build();
4.3.4 setRequestConfigCallback - 配置请求
public RestClientBuilder setRequestConfigCallback(
RequestConfigCallback requestConfigCallback
)
使用示例
RestClient restClient = RestClient.builder(...)
.setRequestConfigCallback(requestConfigBuilder -> {
return requestConfigBuilder
.setConnectTimeout(5000) // 连接超时
.setSocketTimeout(60000) // Socket超时
.setConnectionRequestTimeout(1000); // 从连接池获取连接超时
})
.build();
4.3.5 setHttpClientConfigCallback - 配置HTTP客户端
public RestClientBuilder setHttpClientConfigCallback(
HttpClientConfigCallback httpClientConfigCallback
)
使用示例
RestClient restClient = RestClient.builder(...)
.setHttpClientConfigCallback(httpClientBuilder -> {
return httpClientBuilder
.setMaxConnTotal(100) // 最大连接数
.setMaxConnPerRoute(20) // 每路由最大连接数
.setKeepAliveStrategy((response, context) -> 60000) // Keep-Alive
.setDefaultIOReactorConfig(
IOReactorConfig.custom()
.setIoThreadCount(Runtime.getRuntime().availableProcessors())
.build()
);
})
.build();
4.4 Node API
Node构造
public Node(HttpHost host)
public Node(HttpHost host, Set<HttpHost> boundHosts, String name, String version, Roles roles)
使用示例
// 基本构造
Node node = new Node(new HttpHost("localhost", 9200, "http"));
// 完整构造
Node node = new Node(
new HttpHost("localhost", 9200, "http"),
Collections.singleton(new HttpHost("127.0.0.1", 9200, "http")),
"node-1",
"8.10.0",
new Roles(true, true, true) // master, data, ingest
);
4.5 RequestOptions API
RequestOptions.Builder
public static class Builder {
public Builder addHeader(String name, String value);
public Builder setWarningsHandler(WarningsHandler warningsHandler);
public Builder setHttpAsyncResponseConsumerFactory(
HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory
);
public RequestOptions build();
}
使用示例
// 创建自定义选项
RequestOptions options = RequestOptions.DEFAULT.toBuilder()
.addHeader("X-Elastic-Product", "Elasticsearch")
.setWarningsHandler(WarningsHandler.PERMISSIVE)
.setHttpAsyncResponseConsumerFactory(
new HttpAsyncResponseConsumerFactory.HeapBufferedResponseConsumerFactory(
100 * 1024 * 1024 // 100 MB缓冲
)
)
.build();
4.6 完整示例
创建索引
Request request = new Request("PUT", "/my-index");
request.setJsonEntity(
"{"
+ "\"settings\": {"
+ " \"number_of_shards\": 1,"
+ " \"number_of_replicas\": 1"
+ "},"
+ "\"mappings\": {"
+ " \"properties\": {"
+ " \"field\": {\"type\": \"text\"}"
+ " }"
+ "}"
+ "}"
);
Response response = restClient.performRequest(request);
索引文档
Request request = new Request("POST", "/my-index/_doc");
request.addParameter("refresh", "true");
request.setJsonEntity("{\"field\":\"value\"}");
Response response = restClient.performRequest(request);
搜索
Request request = new Request("GET", "/my-index/_search");
request.setJsonEntity(
"{"
+ "\"query\": {"
+ " \"match\": {\"field\": \"value\"}"
+ "},"
+ "\"size\": 10"
+ "}"
);
Response response = restClient.performRequest(request);
String body = EntityUtils.toString(response.getEntity());
批量操作
Request request = new Request("POST", "/_bulk");
request.addParameter("refresh", "true");
StringBuilder bulkBody = new StringBuilder();
bulkBody.append("{\"index\":{\"_index\":\"my-index\"}}\n");
bulkBody.append("{\"field\":\"value1\"}\n");
bulkBody.append("{\"index\":{\"_index\":\"my-index\"}}\n");
bulkBody.append("{\"field\":\"value2\"}\n");
request.setJsonEntity(bulkBody.toString());
Response response = restClient.performRequest(request);
5. 核心数据结构
5.1 RestClient
类图
classDiagram
class RestClient {
-CloseableHttpAsyncClient client
-List~Header~ defaultHeaders
-String pathPrefix
-AtomicInteger lastNodeIndex
-ConcurrentMap~HttpHost, DeadHostState~ blacklist
-FailureListener failureListener
-NodeSelector nodeSelector
-NodeTuple nodeTuple
+performRequest(Request) Response
+performRequestAsync(Request, ResponseListener) Cancellable
+setNodes(Collection~Node~)
+close()
}
class Node {
-HttpHost host
-Set~HttpHost~ boundHosts
-String name
-String version
-Roles roles
+getHost() HttpHost
+getName() String
}
class NodeTuple {
-List~Node~ nodes
-AtomicInteger index
+next() Iterator~Node~
}
class DeadHostState {
-long deadUntilNanos
-int failedAttempts
+getDeadUntilNanos() long
+getFailedAttempts() int
}
RestClient --> Node
RestClient --> NodeTuple
RestClient --> DeadHostState
类说明
RestClient:
- 核心客户端类
- 管理连接和请求
- 实现负载均衡和重试
Key Fields:
| 字段 | 类型 | 说明 |
|---|---|---|
| client | CloseableHttpAsyncClient | Apache HttpClient异步客户端 |
| defaultHeaders | List |
默认请求头 |
| lastNodeIndex | AtomicInteger | 上次选择的节点索引(轮询) |
| blacklist | ConcurrentMap | 失败节点黑名单 |
| nodeTuple | NodeTuple | 不可变节点列表 |
5.2 Request & Response
类图
classDiagram
class Request {
-String method
-String endpoint
-Map~String, String~ parameters
-HttpEntity entity
-RequestOptions options
+Request(String, String)
+addParameter(String, String)
+setEntity(HttpEntity)
+setJsonEntity(String)
+setOptions(RequestOptions)
}
class Response {
-StatusLine statusLine
-HttpEntity entity
-HttpHost host
-Header[] headers
+getStatusLine() StatusLine
+getEntity() HttpEntity
+getHost() HttpHost
+getHeaders() Header[]
}
class RequestOptions {
-List~Header~ headers
-WarningsHandler warningsHandler
-HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory
+DEFAULT RequestOptions
}
类说明
Request:
- 封装HTTP请求
- 支持链式调用
- 不可变(设置后)
Response:
- 封装HTTP响应
- 提供访问器方法
- 包含处理请求的节点信息
RequestOptions:
- 请求级别的配置
- 可继承默认配置
- 支持自定义
5.3 Node & NodeSelector
类图
classDiagram
class Node {
-HttpHost host
-Set~HttpHost~ boundHosts
-String name
-String version
-Roles roles
-Map~String, List~String~~ attributes
+getHost() HttpHost
+getName() String
+getRoles() Roles
}
class Roles {
-boolean masterEligible
-boolean data
-boolean ingest
+isMasterEligible() boolean
+canContainData() boolean
+isIngest() boolean
}
class NodeSelector {
<<interface>>
+select(Iterable~Node~)
+ANY NodeSelector
+SKIP_DEDICATED_MASTERS NodeSelector
}
Node --> Roles
NodeSelector ..> Node
类说明
Node:
- 表示集群中的一个节点
- 包含节点元信息
- 用于节点选择
Roles:
- 节点角色信息
- 主节点、数据节点、Ingest节点等
NodeSelector:
- 节点选择策略接口
- 过滤可用节点
- 支持自定义选择器
5.4 DeadHostState
类图
classDiagram
class DeadHostState {
-long deadUntilNanos
-int failedAttempts
-TimeSupplier timeSupplier
+DeadHostState(TimeSupplier)
+getDeadUntilNanos() long
+getFailedAttempts() int
+markDead(long)
+shallBeRetried()
}
class TimeSupplier {
<<interface>>
+nanoTime() long
}
DeadHostState --> TimeSupplier
类说明
DeadHostState:
- 管理失败节点状态
- 计算重试时间
- 实现指数退避
重试超时计算:
// 重试超时 = min(60秒 * 2^(失败次数-1), 30分钟)
long timeoutMinutes = Math.min(30, (long) Math.pow(2, failedAttempts - 1));
long deadUntil = currentTime + timeoutMinutes * 60 * 1_000_000_000L;
示例:
| 失败次数 | 重试超时 |
|---|---|
| 1 | 1分钟 |
| 2 | 2分钟 |
| 3 | 4分钟 |
| 4 | 8分钟 |
| 5 | 16分钟 |
| 6+ | 30分钟(最大值) |
5.5 NodeTuple
类图
classDiagram
class NodeTuple~T~ {
-List~Node~ all
-T data
-AtomicInteger index
+NodeTuple(List~Node~, T)
+next() T
+hasNext() boolean
}
类说明
NodeTuple:
- 不可变节点列表封装
- 支持轮询迭代
- 线程安全
使用场景:
- 存储当前可用节点
- 支持动态更新节点列表
- 实现轮询负载均衡
5.6 ResponseListener
类图
classDiagram
class ResponseListener {
<<interface>>
+onSuccess(Response)
+onFailure(Exception)
}
class ActionListener {
<<interface>>
+onResponse(Response)
+onFailure(Exception)
}
ResponseListener <|.. ActionListener
类说明
ResponseListener:
- 异步请求回调接口
- 处理成功和失败情况
典型实现:
ResponseListener listener = new ResponseListener() {
@Override
public void onSuccess(Response response) {
// 处理响应
try {
String body = EntityUtils.toString(response.getEntity());
// 业务逻辑
} catch (IOException e) {
// 错误处理
}
}
@Override
public void onFailure(Exception exception) {
// 处理异常
if (exception instanceof ResponseException) {
Response response = ((ResponseException) exception).getResponse();
int statusCode = response.getStatusLine().getStatusCode();
}
}
};
5.7 FailureListener
类图
classDiagram
class FailureListener {
<<interface>>
+onFailure(Node)
}
类说明
FailureListener:
- 节点失败监听器
- 用于监控和告警
典型实现:
FailureListener listener = new FailureListener() {
@Override
public void onFailure(Node node) {
System.err.println("Node failed: " + node.getHost());
// 发送告警
// 记录日志
// 更新监控指标
}
};
5.8 RequestConfigCallback & HttpClientConfigCallback
类图
classDiagram
class RequestConfigCallback {
<<interface>>
+customizeRequestConfig(Builder) Builder
}
class HttpClientConfigCallback {
<<interface>>
+customizeHttpClient(HttpAsyncClientBuilder) HttpAsyncClientBuilder
}
类说明
RequestConfigCallback:
- 自定义请求配置
- 设置超时、重试等
HttpClientConfigCallback:
- 自定义HTTP客户端
- 配置连接池、SSL等
示例:
// RequestConfigCallback
RequestConfigCallback requestConfig = requestConfigBuilder -> {
return requestConfigBuilder
.setConnectTimeout(5000)
.setSocketTimeout(60000);
};
// HttpClientConfigCallback
HttpClientConfigCallback httpClientConfig = httpClientBuilder -> {
return httpClientBuilder
.setMaxConnTotal(100)
.setMaxConnPerRoute(20)
.setSSLContext(sslContext)
.setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE);
};
5.9 数据结构关系总览
graph TB
RC[RestClient] --> Client[CloseableHttpAsyncClient<br/>Apache HttpClient]
RC --> NT[NodeTuple<br/>节点列表]
RC --> BL[Blacklist<br/>失败节点黑名单]
NT --> Node[Node<br/>节点信息]
Node --> Roles[Roles<br/>节点角色]
BL --> DHS[DeadHostState<br/>失败状态]
RC --> Req[Request<br/>请求对象]
RC --> Resp[Response<br/>响应对象]
Req --> ReqOpt[RequestOptions<br/>请求选项]
RC --> NS[NodeSelector<br/>节点选择器]
RC --> FL[FailureListener<br/>失败监听器]
RC --> RL[ResponseListener<br/>响应监听器]
style RC fill:#e1f5ff
style Client fill:#ffe1e1
style NT fill:#e1ffe1
6. 详细时序图与模块交互
6.1 模块交互总览图
graph TB
subgraph "用户应用层"
App[应用代码]
end
subgraph "Client 模块"
RC[RestClient]
Req[Request]
Resp[Response]
Builder[RestClientBuilder]
end
subgraph "连接管理"
NT[NodeTuple<br/>节点列表]
NS[NodeSelector<br/>节点选择器]
BL[Blacklist<br/>失败节点黑名单]
DHS[DeadHostState<br/>失败状态]
end
subgraph "请求处理"
IR[InternalRequest<br/>内部请求]
RCtx[RequestContext<br/>请求上下文]
Conv[ResponseConverter<br/>响应转换]
end
subgraph "Apache HttpClient"
HAC[HttpAsyncClient]
HP[HttpAsyncRequestProducer]
HC[HttpAsyncResponseConsumer]
end
subgraph "Elasticsearch 集群"
N1[Node 1]
N2[Node 2]
N3[Node 3]
end
App -->|1. 创建请求| Req
App -->|2. 发送请求| RC
RC -->|3. 包装请求| IR
RC -->|4. 选择节点| NT
NT -->|5. 应用过滤| NS
NT -->|6. 检查黑名单| BL
BL -->|7. 获取状态| DHS
RC -->|8. 创建上下文| RCtx
RCtx -->|9. 创建生产者| HP
RCtx -->|10. 创建消费者| HC
RC -->|11. 执行请求| HAC
HAC -->|12. 发送 HTTP| N1
HAC -->|12. 发送 HTTP| N2
HAC -->|12. 发送 HTTP| N3
HAC -->|13. 返回响应| RC
RC -->|14. 转换响应| Conv
Conv -->|15. 更新状态| BL
RC -->|16. 返回结果| Resp
Resp -->|17. 返回应用| App
style RC fill:#e1f5ff
style BL fill:#ffe1e1
style HAC fill:#e1ffe1
模块交互说明
1. 请求创建阶段 (步骤1-3)
- 用户创建
Request对象,设置方法、端点、参数、请求体 - 调用
RestClient.performRequest(request) - RestClient 将 Request 包装为 InternalRequest
2. 节点选择阶段 (步骤4-7)
- 从 NodeTuple 获取节点列表
- NodeSelector 过滤不符合条件的节点
- 检查黑名单,排除失败节点
- DeadHostState 提供节点失败状态和重试时间
3. 请求准备阶段 (步骤8-10)
- 创建 RequestContext 上下文
- 创建 HttpAsyncRequestProducer (请求生产者)
- 创建 HttpAsyncResponseConsumer (响应消费者)
4. 请求执行阶段 (步骤11-13)
- 通过 HttpAsyncClient 发送 HTTP 请求
- 建立 TCP 连接,发送请求到 Elasticsearch 节点
- 接收 HTTP 响应
5. 响应处理阶段 (步骤14-17)
- 转换 HTTP 响应为 Response 对象
- 根据状态码更新节点黑名单状态
- 返回响应给用户代码
6.2 同步请求完整流程
同步请求时序图
sequenceDiagram
autonumber
participant User as 用户代码
participant RC as RestClient
participant IR as InternalRequest
participant NT as NodeTuple
participant NS as NodeSelector
participant BL as Blacklist<br/>(ConcurrentMap)
participant RCtx as RequestContext
participant HTTP as HttpAsyncClient
participant ES as Elasticsearch Node
User->>RC: performRequest(request)
Note over RC: 阶段1: 请求包装
RC->>IR: new InternalRequest(request)
IR->>IR: buildUri(pathPrefix, endpoint, params)
IR->>IR: createHttpRequest(method, uri, entity)
IR->>IR: setHeaders(defaultHeaders + requestHeaders)
IR-->>RC: internalRequest
Note over RC,ES: 阶段2: 节点选择
RC->>RC: nextNodes()
RC->>NT: 获取节点列表
NT-->>RC: nodes = [Node1, Node2, Node3]
RC->>BL: 检查黑名单
loop 遍历节点
BL->>BL: deadness = blacklist.get(node)
alt 节点不在黑名单 or 已到重试时间
BL-->>RC: 加入 livingNodes
else 节点在黑名单且未到重试时间
BL-->>RC: 加入 deadNodes
end
end
RC->>NS: select(livingNodes)
NS->>NS: 过滤节点(如跳过专用主节点)
NS-->>RC: selectedNodes
RC->>RC: Collections.rotate(nodes, lastNodeIndex++)
RC-->>RC: nodeIterator
Note over RC,ES: 阶段3: 创建请求上下文
RC->>IR: createContextForNextAttempt(node1)
IR->>IR: httpRequest.reset()
IR->>RCtx: new RequestContext(node1)
RCtx->>RCtx: 创建 HttpAsyncRequestProducer
RCtx->>RCtx: 创建 HttpAsyncResponseConsumer
RCtx->>RCtx: 创建 HttpClientContext
RCtx-->>IR: context
IR-->>RC: context
Note over RC,ES: 阶段4: 发送 HTTP 请求
RC->>HTTP: execute(producer, consumer, context, null).get()
Note over HTTP: 异步执行,同步等待
HTTP->>HTTP: 从连接池获取连接
HTTP->>ES: TCP 连接 + HTTP 请求
ES->>ES: 处理请求<br/>(索引/搜索/集群操作等)
ES-->>HTTP: HTTP 响应 + 状态码
HTTP-->>RC: HttpResponse
Note over RC,BL: 阶段5: 响应处理
RC->>RC: convertResponse(request, node1, httpResponse)
alt 响应体是 Gzip 压缩
RC->>RC: 解压缩 Gzip
RC->>RC: 移除 Content-Encoding 头
end
RC->>RC: new Response(requestLine, host, httpResponse)
RC->>RC: 检查 HTTP 状态码
alt statusCode < 300 (成功)
RC->>BL: onResponse(node1)
BL->>BL: blacklist.remove(node1.host)
Note over BL: 从黑名单移除节点
alt 有警告且配置为失败
RC-->>User: throw WarningFailureException
else 正常成功
RC-->>User: return Response
end
else statusCode = 502/503/504 (可重试)
RC->>BL: onFailure(node1)
BL->>BL: previousState = blacklist.get(node1.host)
alt 首次失败
BL->>BL: new DeadHostState()<br/>deadUntil = now + 1分钟
BL->>BL: blacklist.put(node1.host, state)
else 再次失败
BL->>BL: new DeadHostState(previousState)<br/>failedAttempts++<br/>deadUntil = now + 指数退避时间
BL->>BL: blacklist.replace(node1.host, newState)
end
Note over RC: 有其他节点,递归重试
RC->>RC: nextNodes() → 选择 node2
RC->>HTTP: execute(...) → node2
else statusCode = 400/401/403/404 (请求错误)
RC->>BL: onResponse(node1)
BL->>BL: blacklist.remove(node1.host)
Note over BL: 保持节点存活(错误是请求问题)
RC-->>User: throw ResponseException(response)
end
Note over RC,ES: 阶段6: 异常处理
alt 网络异常 (IOException, ConnectTimeout, SocketTimeout)
HTTP-->>RC: Exception
RC->>RC: RequestLogger.logFailedRequest()
RC->>BL: onFailure(node1)
BL->>BL: 标记节点死亡
alt 有其他节点可重试
RC->>RC: 递归调用 performRequest(tuple, request)
Note over RC: 选择下一个节点重试
else 所有节点都失败
RC-->>User: throw IOException<br/>(包含所有失败的 suppressed异常)
end
end
时序图功能详解
阶段1: 请求包装 (步骤1-6)
功能说明:
- URI 构建: 合并
pathPrefix+endpoint+parameters- 示例:
/+_search+?size=10→/_search?size=10
- 示例:
- HTTP 请求创建: 根据方法 (GET/POST/PUT/DELETE) 创建对应的 HttpRequest
- 请求头设置:
- 先添加请求级别的自定义头
- 再添加默认头(如果不冲突)
- 添加压缩头
Accept-Encoding: gzip - 添加元信息头
X-Elastic-Client-Meta
代码对应:
// InternalRequest 构造函数
URI uri = buildUri(pathPrefix, request.getEndpoint(), params);
this.httpRequest = createHttpRequest(request.getMethod(), uri, request.getEntity(), compressionEnabled);
setHeaders(httpRequest, request.getOptions().getHeaders());
阶段2: 节点选择 (步骤7-17)
功能说明:
-
黑名单检查: 遍历所有节点,检查是否在黑名单中
- 不在黑名单 → 加入存活节点列表
- 在黑名单但已到重试时间 → 加入存活节点列表
- 在黑名单且未到重试时间 → 加入死亡节点列表
-
NodeSelector 过滤: 应用节点选择策略
NodeSelector.ANY: 不过滤,所有节点都可用NodeSelector.SKIP_DEDICATED_MASTERS: 跳过专用主节点- 自定义选择器: 根据节点属性(如区域、角色)过滤
-
轮询调度: 使用
Collections.rotate()实现负载均衡lastNodeIndex是全局计数器,每次请求递增- 导致节点列表循环偏移,实现轮询
代码对应:
// 节点选择算法
List<Node> livingNodes = new ArrayList<>();
List<DeadNode> deadNodes = new ArrayList<>();
for (Node node : nodeTuple.nodes) {
DeadHostState deadness = blacklist.get(node.getHost());
if (deadness == null || deadness.shallBeRetried()) {
livingNodes.add(node);
} else {
deadNodes.add(new DeadNode(node, deadness));
}
}
nodeSelector.select(selectedLivingNodes);
Collections.rotate(selectedLivingNodes, lastNodeIndex.getAndIncrement());
阶段3: 创建请求上下文 (步骤18-25)
功能说明:
- 重置 HTTP 请求:
httpRequest.reset()清除之前请求的状态 - 创建请求生产者:
HttpAsyncRequestProducer负责生成 HTTP 请求- 支持流式传输请求体(对于大文件)
- 处理分块传输编码
- 创建响应消费者:
HttpAsyncResponseConsumer负责消费 HTTP 响应- 默认使用堆缓冲消费者
- 可自定义缓冲大小(通过 RequestOptions)
- 创建 HTTP 上下文:
HttpClientContext包含认证缓存等信息
代码对应:
RequestContext createContextForNextAttempt(Node node, AuthCache authCache) {
this.httpRequest.reset();
return new RequestContext(this, node, authCache);
}
RequestContext(InternalRequest request, Node node, AuthCache authCache) {
this.node = node;
this.requestProducer = HttpAsyncMethods.create(node.getHost(), request.httpRequest);
this.asyncResponseConsumer = request.request.getOptions()
.getHttpAsyncResponseConsumerFactory()
.createHttpAsyncResponseConsumer();
this.context = HttpClientContext.create();
context.setAuthCache(authCache);
}
阶段4: 发送 HTTP 请求 (步骤26-33)
功能说明:
-
异步执行+同步等待:
client.execute(...).get()- 内部使用 Apache HttpAsyncClient 异步发送请求
- 调用
.get()阻塞当前线程直到收到响应 - 这样设计是为了保留调用栈,方便调试
-
连接池管理:
- 从连接池获取到目标节点的连接
- 如果没有可用连接,创建新连接
- 连接复用,提高性能
-
TCP 连接 + HTTP 请求:
- 建立 TCP 连接 (或复用现有连接)
- 发送 HTTP 请求行、请求头、请求体
- 等待响应
-
Elasticsearch 处理:
- 节点接收请求
- 路由到相应的处理器 (索引/搜索/集群等)
- 生成响应
代码对应:
HttpResponse httpResponse = client.execute(
context.requestProducer,
context.asyncResponseConsumer,
context.context,
null // FutureCallback (同步调用时为 null)
).get(); // 阻塞等待响应
阶段5: 响应处理 (步骤34-61)
功能说明:
5.1 Gzip 解压 (步骤35-39):
- 检查
Content-Encoding: gzip - 如果是 Gzip 压缩,解压响应体
- 移除压缩相关的头
5.2 创建 Response 对象 (步骤40-41):
- 包装 HTTP 响应
- 保存请求行、目标主机信息
5.3 状态码判断 (步骤42):
(A) 成功响应 (步骤43-50):
statusCode < 300(2xx, 3xx)- 从黑名单移除节点
- 检查警告头,判断是否应该失败
- 返回 Response 对象
(B) 可重试错误 (步骤52-62):
statusCode = 502/503/504- 标记节点死亡:
- 首次失败: 创建 DeadHostState,deadUntil = now + 1分钟
- 再次失败: 基于之前状态创建新状态,failedAttempts++,使用指数退避
- 选择下一个节点
- 递归调用
performRequest()重试
(C) 请求错误 (步骤64-68):
statusCode = 400/401/403/404等- 保持节点存活 (错误是请求问题,不是节点问题)
- 抛出
ResponseException
代码对应:
private ResponseOrResponseException convertResponse(...) {
int statusCode = httpResponse.getStatusLine().getStatusCode();
// Gzip 解压
if (entity != null && "gzip".equals(entity.getContentEncoding().getValue())) {
httpResponse.setEntity(new GzipDecompressingEntity(entity));
}
Response response = new Response(...);
// 成功响应
if (isSuccessfulResponse(statusCode)) {
onResponse(node); // 从黑名单移除
return new ResponseOrResponseException(response);
}
// 可重试错误
ResponseException responseException = new ResponseException(response);
if (isRetryStatus(statusCode)) {
onFailure(node); // 标记死亡
return new ResponseOrResponseException(responseException);
}
// 请求错误
onResponse(node); // 保持存活
throw responseException;
}
阶段6: 异常处理 (步骤70-79)
功能说明:
-
网络异常捕获: IOException, ConnectTimeoutException, SocketTimeoutException
-
记录日志: RequestLogger.logFailedRequest()
-
标记节点死亡: onFailure(node)
- 使用 CAS 操作更新黑名单
- 创建或更新 DeadHostState
- 通知 FailureListener
-
重试逻辑:
- 如果有其他节点: 递归调用 performRequest(),选择下一个节点
- 如果所有节点都失败: 抛出最终异常,包含所有 suppressed 异常
-
异常包装:
- 保留原始异常类型和消息
- 保留调用栈,方便调试
代码对应:
try {
httpResponse = client.execute(...).get();
} catch (Exception e) {
RequestLogger.logFailedRequest(logger, request.httpRequest, context.node, e);
onFailure(context.node);
Exception cause = extractAndWrapCause(e);
addSuppressedException(previousException, cause);
if (isRetryableException(e) && tuple.nodes.hasNext()) {
return performRequest(tuple, request, cause); // 递归重试
}
throw cause;
}
时序图说明
关键步骤详解
1. 节点选择(步骤1-9):
- 从NodeTuple轮询选择节点
- 检查黑名单状态
- 跳过失败节点(未到重试时间)
- 选择可用节点
2. 构建请求(步骤10-11):
- 构造HTTP方法和端点
- 添加默认和自定义请求头
- 设置请求体
- 应用请求选项
3. 发送请求(步骤12-16):
- 通过Apache HttpClient发送
- 异步执行(performRequest内部等待)
- 网络传输
4. 处理响应(步骤17-26):
- 检查HTTP状态码
- 成功: 返回响应
- 可重试错误: 标记节点,重试
- 不可重试错误: 抛出异常
5. 异常处理(步骤27-30):
- 网络异常: 标记节点,重试
- 超时异常: 标记节点,重试
- 重试直到成功或所有节点失败
6.3 异步请求完整流程
异步请求时序图
sequenceDiagram
autonumber
participant User as 用户代码
participant RC as RestClient
participant IR as InternalRequest
participant FTL as FailureTrackingResponseListener
participant NT as NodeTuple
participant BL as Blacklist
participant HTTP as HttpAsyncClient
participant CB as FutureCallback
participant ES as Elasticsearch Node
participant UserListener as ResponseListener
User->>RC: performRequestAsync(request, listener)
Note over RC: 阶段1: 准备异步请求
RC->>FTL: new FailureTrackingResponseListener(listener)
FTL->>FTL: 包装用户监听器
RC->>IR: new InternalRequest(request)
IR->>IR: buildUri, createHttpRequest, setHeaders
IR-->>RC: internalRequest
RC->>RC: nextNodes()
Note over RC: 选择节点(同同步请求)
Note over RC,ES: 阶段2: 异步发送请求
RC->>IR: createContextForNextAttempt(node1)
IR-->>RC: context
RC->>CB: 创建 FutureCallback 匿名实现
RC->>HTTP: execute(producer, consumer, context, callback)
Note over HTTP: 异步执行,不阻塞
RC-->>User: return Cancellable (立即返回)
Note over User: 用户代码继续执行<br/>不阻塞等待响应
Note over HTTP,ES: 阶段3: 后台 HTTP 通信
HTTP->>HTTP: 从连接池获取连接(异步)
HTTP->>ES: 发送 HTTP 请求(非阻塞 I/O)
Note over ES: Elasticsearch 处理请求
ES-->>HTTP: HTTP 响应
Note over HTTP,CB: 阶段4A: 成功回调
HTTP->>CB: completed(httpResponse)
CB->>CB: convertResponse(request, node1, httpResponse)
alt statusCode < 300 (成功)
CB->>BL: onResponse(node1)
BL->>BL: blacklist.remove(node1.host)
CB->>FTL: onSuccess(response)
FTL->>UserListener: onSuccess(response)
UserListener->>User: 处理响应(在 HTTP 线程)
else statusCode = 502/503/504 (可重试)
CB->>BL: onFailure(node1)
BL->>BL: 标记节点死亡
alt 有其他节点
CB->>FTL: trackFailure(exception)
FTL->>FTL: 记录异常
CB->>RC: performRequestAsync(tuple, request, listener)
Note over RC: 递归调用,选择 node2
RC->>HTTP: execute(..., callback)
Note over HTTP: 向 node2 重试
else 所有节点都失败
CB->>FTL: onDefinitiveFailure(exception)
FTL->>FTL: 添加 suppressed 异常
FTL->>UserListener: onFailure(exception)
UserListener->>User: 处理错误
end
else statusCode = 400/401/403/404 (请求错误)
CB->>BL: onResponse(node1)
Note over BL: 保持节点存活
CB->>FTL: onDefinitiveFailure(responseException)
FTL->>UserListener: onFailure(responseException)
UserListener->>User: 处理错误
end
Note over HTTP,CB: 阶段4B: 失败回调
alt 网络异常/连接失败
HTTP->>CB: failed(exception)
CB->>CB: RequestLogger.logFailedRequest()
CB->>BL: onFailure(node1)
BL->>BL: 标记节点死亡
alt isRetryable && hasNext
CB->>FTL: trackFailure(exception)
CB->>RC: performRequestAsync(tuple, request, listener)
Note over RC: 递归调用,选择下一个节点
else 所有节点都失败
CB->>FTL: onDefinitiveFailure(exception)
FTL->>UserListener: onFailure(exception)
UserListener->>User: 处理错误
end
end
Note over RC,User: 阶段5: 取消请求(可选)
opt 用户取消请求
User->>RC: cancellable.cancel()
RC->>IR: httpRequest.abort()
IR->>HTTP: 取消正在进行的请求
HTTP->>CB: cancelled()
CB->>FTL: onDefinitiveFailure(CancellationException)
FTL->>UserListener: onFailure(CancellationException)
end
异步请求功能详解
阶段1: 准备异步请求 (步骤1-7)
功能说明:
-
包装监听器:
FailureTrackingResponseListener包装用户提供的ResponseListener- 追踪所有失败异常,添加为 suppressed 异常
- 确保只调用一次最终回调 (成功或失败)
-
创建 InternalRequest: 与同步请求相同,包装用户请求
-
立即返回: 返回
Cancellable对象,用户可以取消请求
代码对应:
public Cancellable performRequestAsync(Request request, ResponseListener responseListener) {
try {
FailureTrackingResponseListener failureTrackingResponseListener =
new FailureTrackingResponseListener(responseListener);
InternalRequest internalRequest = new InternalRequest(request);
performRequestAsync(nextNodes(), internalRequest, failureTrackingResponseListener);
return internalRequest.cancellable;
} catch (Exception e) {
responseListener.onFailure(e);
return Cancellable.NO_OP;
}
}
阶段2: 异步发送请求 (步骤8-13)
功能说明:
-
创建 FutureCallback: 匿名内部类实现,处理异步回调
completed(HttpResponse): 响应成功回调failed(Exception): 请求失败回调cancelled(): 请求被取消回调
-
异步执行:
client.execute(..., callback)立即返回- 不阻塞当前线程
- HTTP 请求在后台线程池执行
-
用户代码继续: 用户代码不阻塞,可以继续执行其他操作
代码对应:
private void performRequestAsync(
final NodeTuple<Iterator<Node>> tuple,
final InternalRequest request,
final FailureTrackingResponseListener listener
) {
request.cancellable.runIfNotCancelled(() -> {
final RequestContext context = request.createContextForNextAttempt(
tuple.nodes.next(),
tuple.authCache
);
client.execute(
context.requestProducer,
context.asyncResponseConsumer,
context.context,
new FutureCallback<HttpResponse>() {
@Override
public void completed(HttpResponse httpResponse) {
// 成功回调处理
}
@Override
public void failed(Exception failure) {
// 失败回调处理
}
@Override
public void cancelled() {
listener.onDefinitiveFailure(Cancellable.newCancellationException());
}
}
);
});
}
阶段3: 后台 HTTP 通信 (步骤14-19)
功能说明:
-
异步 I/O: 使用 NIO 实现非阻塞 I/O
- 不占用线程等待响应
- 提高并发性能
-
连接池管理: 异步从连接池获取连接
- 如果有空闲连接,立即使用
- 如果没有,异步创建新连接
-
Elasticsearch 处理: 与同步请求相同
阶段4A: 成功回调 (步骤20-38)
功能说明:
(A) 成功响应 (步骤21-27):
- 在 HTTP I/O 线程 中执行回调
- 从黑名单移除节点
- 调用用户监听器的
onSuccess(response) - 注意: 用户回调在 HTTP 线程执行,不应执行耗时操作
(B) 可重试错误 (步骤29-38):
- 标记节点死亡
- 如果有其他节点:
trackFailure(exception): 记录失败异常- 递归调用
performRequestAsync(): 向下一个节点重试 - 重试是自动的,用户无感知
- 如果所有节点失败:
onDefinitiveFailure(exception): 调用最终失败回调- 包含所有失败的 suppressed 异常
(C) 请求错误 (步骤40-45):
- 保持节点存活
- 立即调用用户失败回调,不重试
代码对应:
@Override
public void completed(HttpResponse httpResponse) {
try {
ResponseOrResponseException result = convertResponse(request, context.node, httpResponse);
if (result.responseException == null) {
// 成功
listener.onSuccess(result.response);
} else {
// HTTP 错误响应
if (tuple.nodes.hasNext()) {
// 有下一个节点,重试
listener.trackFailure(result.responseException);
performRequestAsync(tuple, request, listener);
} else {
// 所有节点失败
listener.onDefinitiveFailure(result.responseException);
}
}
} catch (Exception e) {
listener.onDefinitiveFailure(e);
}
}
阶段4B: 失败回调 (步骤47-59)
功能说明:
-
网络异常处理: IOException, ConnectTimeout, SocketTimeout 等
-
记录日志: 记录失败请求日志
-
标记节点死亡: 更新黑名单
-
重试逻辑:
- 如果异常可重试且有其他节点: 递归调用,向下一个节点重试
- 如果所有节点失败: 调用最终失败回调
代码对应:
@Override
public void failed(Exception failure) {
try {
RequestLogger.logFailedRequest(logger, request.httpRequest, context.node, failure);
onFailure(context.node);
if (isRetryableException(failure) && tuple.nodes.hasNext()) {
listener.trackFailure(failure);
performRequestAsync(tuple, request, listener);
} else {
listener.onDefinitiveFailure(failure);
}
} catch (Exception e) {
listener.onDefinitiveFailure(e);
}
}
阶段5: 取消请求 (步骤61-67)
功能说明:
- 用户取消: 调用
cancellable.cancel() - 中断请求: 调用
httpRequest.abort()中断正在进行的 HTTP 请求 - 取消回调: HTTP 客户端调用
cancelled()回调 - 通知用户: 调用用户失败回调,传递
CancellationException
代码对应:
// Cancellable 接口
public interface Cancellable {
void cancel();
}
// 实现
@Override
public void cancelled() {
listener.onDefinitiveFailure(Cancellable.newCancellationException());
}
异步 vs 同步对比
| 特性 | 同步请求 | 异步请求 |
|---|---|---|
| 调用方式 | performRequest(request) |
performRequestAsync(request, listener) |
| 返回值 | Response |
Cancellable |
| 阻塞 | 阻塞当前线程 | 立即返回,不阻塞 |
| 回调 | 无 | ResponseListener.onSuccess/onFailure |
| 回调线程 | 调用线程 | HTTP I/O 线程 |
| 异常处理 | 抛出异常 | 回调 onFailure() |
| 取消 | 不支持 | 支持 (Cancellable.cancel()) |
| 重试 | 递归调用,阻塞 | 递归调用,非阻塞 |
| 适用场景 | 简单请求,同步流程 | 高并发,异步流程 |
| 调用栈 | 保留完整调用栈 | 调用栈被打断 |
FailureTrackingResponseListener 功能
// FailureTrackingResponseListener 实现
private class FailureTrackingResponseListener {
private final ResponseListener responseListener;
private volatile Exception exception; // 追踪失败异常
FailureTrackingResponseListener(ResponseListener responseListener) {
this.responseListener = responseListener;
}
// 追踪失败,添加为 suppressed 异常
void trackFailure(Exception exception) {
if (this.exception != null) {
exception.addSuppressed(this.exception);
}
this.exception = exception;
}
// 成功回调
void onSuccess(Response response) {
responseListener.onSuccess(response);
}
// 最终失败回调(所有重试都失败)
void onDefinitiveFailure(Exception exception) {
trackFailure(exception);
responseListener.onFailure(this.exception);
}
}
功能说明:
- 追踪所有失败: 每次重试失败时,调用
trackFailure()记录异常 - Suppressed 异常链: 将之前的失败异常添加为 suppressed 异常
- 最终回调: 当所有节点都失败时,调用
onDefinitiveFailure() - 异常信息完整: 用户可以通过
exception.getSuppressed()查看所有失败信息
示例:
IOException: all nodes failed
Suppressed: ConnectTimeoutException (node1)
Suppressed: SocketTimeoutException (node2)
Suppressed: ResponseException: 503 Service Unavailable (node3)
异步请求最佳实践
1. 避免在回调中执行耗时操作:
restClient.performRequestAsync(request, new ResponseListener() {
@Override
public void onSuccess(Response response) {
// ❌ 错误: 在 HTTP 线程执行耗时操作
// processLargeResponse(response);
// ✅ 正确: 提交到业务线程池
executor.submit(() -> processLargeResponse(response));
}
@Override
public void onFailure(Exception exception) {
logger.error("Request failed", exception);
}
});
2. 使用 CompletableFuture 包装异步调用:
public CompletableFuture<Response> performRequestAsync(Request request) {
CompletableFuture<Response> future = new CompletableFuture<>();
restClient.performRequestAsync(request, new ResponseListener() {
@Override
public void onSuccess(Response response) {
future.complete(response);
}
@Override
public void onFailure(Exception exception) {
future.completeExceptionally(exception);
}
});
return future;
}
3. 正确处理取消:
Cancellable cancellable = restClient.performRequestAsync(request, listener);
// 用户触发取消
userCancelButton.setOnClickListener(() -> {
cancellable.cancel();
});
6.3 节点失败与重试流程
失败节点管理时序图
sequenceDiagram
autonumber
participant RC as RestClient
participant BL as Blacklist<br/>(DeadHostState)
participant FL as FailureListener
participant Time as TimeSupplier
Note over RC,Time: 节点失败
RC->>RC: 检测到节点失败<br/>(网络错误/HTTP错误)
RC->>BL: markDead(node)
BL->>Time: nanoTime()
Time-->>BL: currentTime
BL->>BL: failedAttempts++
BL->>BL: 计算重试超时<br/>timeout = min(60s * 2^(attempts-1), 30min)
BL->>BL: deadUntilNanos = currentTime + timeout
BL->>BL: 存储到 blacklist Map<br/>(host → DeadHostState)
RC->>FL: onFailure(node)
FL->>FL: 记录日志/发送告警
Note over RC,Time: 重试检查
loop 每次请求时
RC->>BL: isAlive(node)?
BL->>Time: nanoTime()
Time-->>BL: currentTime
alt currentTime >= deadUntilNanos
BL-->>RC: true (可以重试)
Note over RC: 节点复活,尝试请求
else currentTime < deadUntilNanos
BL-->>RC: false (仍然dead)
Note over RC: 跳过此节点
end
end
时序图说明
失败节点重试超时
计算公式:
timeout = min(60秒 * 2^(failedAttempts - 1), 30分钟)
重试时间表:
| 失败次数 | 重试超时 |
|---|---|
| 1 | 1分钟 |
| 2 | 2分钟 |
| 3 | 4分钟 |
| 4 | 8分钟 |
| 5 | 16分钟 |
| 6+ | 30分钟(最大值) |
指数退避策略
目的:
- 避免持续请求失败节点
- 给节点恢复时间
- 减少无效请求
实现:
- 失败次数越多,重试间隔越长
- 最长等待30分钟
- 超时后自动复活,再次尝试
6.4 轮询负载均衡流程
轮询选择节点时序图
sequenceDiagram
autonumber
participant RC as RestClient
participant NT as NodeTuple
participant AI as AtomicInteger<br/>(lastNodeIndex)
participant NS as NodeSelector
participant BL as Blacklist
Note over RC,BL: 请求1: 选择节点
RC->>NT: next()
NT->>AI: incrementAndGet()
AI-->>NT: index = 0
NT->>NT: nodes.get(index % size)
NT-->>RC: node[0]
RC->>NS: select(allNodes)
NS->>NS: 过滤节点<br/>(如跳过专用主节点)
NS-->>RC: filteredNodes
RC->>BL: isAlive(node[0])?
BL-->>RC: true
RC->>RC: 使用 node[0] 发送请求
Note over RC,BL: 请求2: 轮询到下一个节点
RC->>NT: next()
NT->>AI: incrementAndGet()
AI-->>NT: index = 1
NT->>NT: nodes.get(1 % size)
NT-->>RC: node[1]
RC->>BL: isAlive(node[1])?
BL-->>RC: true
RC->>RC: 使用 node[1] 发送请求
Note over RC,BL: 请求3: node[1] 失败,重试
RC->>NT: next()
AI-->>NT: index = 2
NT-->>RC: node[2]
RC->>BL: isAlive(node[2])?
BL-->>RC: false (dead)
RC->>NT: next() (跳过,选择下一个)
AI-->>NT: index = 3
NT-->>RC: node[0] (循环回第一个)
RC->>RC: 使用 node[0] 发送请求
时序图说明
轮询算法
实现:
int index = lastNodeIndex.incrementAndGet();
Node node = nodes.get(Math.abs(index) % nodes.size());
特点:
- 均匀分布请求到所有节点
- 线程安全(AtomicInteger)
- 自动循环
- 跳过失败节点
节点选择器
作用:
- 过滤不符合条件的节点
- 自定义选择逻辑
内置选择器:
NodeSelector.ANY: 所有节点NodeSelector.SKIP_DEDICATED_MASTERS: 跳过专用主节点
6.5 客户端创建与关闭流程
客户端生命周期时序图
sequenceDiagram
autonumber
participant User as 用户代码
participant Builder as RestClientBuilder
participant RC as RestClient
participant HTTP as HttpAsyncClient
Note over User,HTTP: 创建客户端
User->>Builder: RestClient.builder(hosts)
Builder->>Builder: 设置默认值
User->>Builder: setDefaultHeaders(headers)
User->>Builder: setNodeSelector(selector)
User->>Builder: setRequestConfigCallback(callback)
User->>Builder: setHttpClientConfigCallback(callback)
User->>Builder: build()
Builder->>HTTP: 创建 HttpAsyncClient
HTTP->>HTTP: 配置连接池
HTTP->>HTTP: 配置超时
HTTP->>HTTP: 配置 SSL
Builder->>RC: new RestClient(client, config)
RC->>RC: 初始化 NodeTuple
RC->>RC: 初始化 Blacklist
Builder-->>User: restClient
Note over User,HTTP: 使用客户端
User->>RC: performRequest(request)
RC->>HTTP: execute(request)
HTTP-->>RC: response
RC-->>User: response
Note over User,HTTP: 关闭客户端
User->>RC: close()
RC->>HTTP: close()
HTTP->>HTTP: 关闭所有连接
HTTP->>HTTP: 关闭连接池
HTTP->>HTTP: 关闭 I/O 线程
HTTP-->>RC: closed
RC-->>User: closed
时序图说明
创建步骤
1. 构建器配置(步骤1-7):
- 设置节点列表
- 配置默认请求头
- 配置节点选择器
- 配置超时和重试
2. HTTP客户端创建(步骤8-12):
- 创建Apache HttpAsyncClient
- 配置连接池参数
- 配置SSL/TLS
- 启动I/O线程
3. RestClient创建(步骤13-16):
- 初始化节点列表
- 初始化黑名单
- 设置所有配置
关闭步骤
1. 关闭RestClient(步骤21):
- 触发关闭流程
2. 关闭HttpAsyncClient(步骤22-26):
- 关闭所有活动连接
- 关闭连接池
- 停止I/O线程
- 释放资源
6.6 动态更新节点列表
节点列表更新时序图
sequenceDiagram
autonumber
participant User as 用户代码
participant RC as RestClient
participant OldNT as Old NodeTuple
participant NewNT as New NodeTuple
participant BL as Blacklist
Note over User,BL: 初始状态
RC->>OldNT: 当前节点列表<br/>[node1, node2]
Note over User,BL: 发现新节点
User->>User: 通过某种方式<br/>发现集群节点变化
User->>RC: setNodes([node1, node2, node3])
RC->>RC: synchronized (确保线程安全)
RC->>NewNT: 创建新 NodeTuple<br/>with [node1, node2, node3]
RC->>RC: 原子替换<br/>nodeTuple = newNodeTuple
Note over RC: 旧 NodeTuple 被 GC 回收
RC->>BL: 清理旧节点的死亡状态<br/>(可选)
RC-->>User: 节点列表已更新
Note over User,BL: 后续请求使用新节点列表
User->>RC: performRequest(request)
RC->>NewNT: next() (可能选择 node3)
时序图说明
节点发现方式
方式1: 定期查询集群状态:
// 定期从一个节点获取集群状态
Request request = new Request("GET", "/_nodes/http");
Response response = restClient.performRequest(request);
// 解析响应,提取所有节点
List<Node> nodes = parseNodes(response);
// 更新客户端节点列表
restClient.setNodes(nodes);
方式2: 嗅探器(Sniffer):
Sniffer sniffer = Sniffer.builder(restClient)
.setSniffIntervalMillis(60000) // 每分钟嗅探一次
.build();
// 自动更新节点列表
// 使用完后关闭
sniffer.close();
线程安全
setNodes()方法是同步的- NodeTuple是不可变的
- 原子替换保证一致性
7. 关键配置
7.1 客户端配置
| 配置项 | 默认值 | 说明 |
|---|---|---|
| maxRetryTimeoutMillis | 30000 | 最大重试超时 |
| requestConfigCallback | - | 请求配置回调 |
| httpClientConfigCallback | - | HTTP客户端配置回调 |
| defaultHeaders | [] | 默认请求头 |
| failureListener | - | 失败监听器 |
| nodeSelector | NodeSelector.ANY | 节点选择器 |
| strictDeprecationMode | false | 严格废弃模式 |
| compressionEnabled | false | 启用压缩 |
7.2 连接池配置
RestClientBuilder builder = RestClient.builder(new HttpHost("localhost", 9200));
builder.setHttpClientConfigCallback(httpClientBuilder -> {
return httpClientBuilder
.setMaxConnTotal(100) // 最大连接数
.setMaxConnPerRoute(10) // 每个路由最大连接数
.setConnectionTimeToLive(5, TimeUnit.MINUTES); // 连接存活时间
});
7.3 请求配置
builder.setRequestConfigCallback(requestConfigBuilder -> {
return requestConfigBuilder
.setConnectTimeout(5000) // 连接超时
.setSocketTimeout(60000); // 读取超时
});
8. 使用示例
8.1 创建客户端
// 基本创建
RestClient restClient = RestClient.builder(
new HttpHost("localhost", 9200, "http")
).build();
// 多节点
RestClient restClient = RestClient.builder(
new HttpHost("localhost", 9200, "http"),
new HttpHost("localhost", 9201, "http")
).build();
// 自定义配置
RestClient restClient = RestClient.builder(
new HttpHost("localhost", 9200, "http"))
.setDefaultHeaders(new Header[]{
new BasicHeader("Authorization", "Bearer token")
})
.setMaxRetryTimeoutMillis(60000)
.setNodeSelector(NodeSelector.SKIP_DEDICATED_MASTERS)
.build();
8.2 同步请求
// 创建请求
Request request = new Request("GET", "/_cluster/health");
request.addParameter("wait_for_status", "yellow");
// 执行请求
Response response = restClient.performRequest(request);
// 处理响应
int statusCode = response.getStatusLine().getStatusCode();
String responseBody = EntityUtils.toString(response.getEntity());
8.3 异步请求
Request request = new Request("GET", "/_cluster/health");
Cancellable cancellable = restClient.performRequestAsync(
request,
new ResponseListener() {
@Override
public void onSuccess(Response response) {
// 处理成功响应
}
@Override
public void onFailure(Exception exception) {
// 处理异常
}
}
);
// 可选: 取消请求
// cancellable.cancel();
8.4 请求参数和请求体
Request request = new Request("POST", "/my-index/_doc");
// 添加参数
request.addParameter("refresh", "true");
// 设置JSON请求体
request.setJsonEntity(
"{\"field\":\"value\"}"
);
// 或使用实体
request.setEntity(new NStringEntity(
"{\"field\":\"value\"}",
ContentType.APPLICATION_JSON
));
Response response = restClient.performRequest(request);
9. 节点选择器
9.1 内置选择器
// 选择所有节点(默认)
NodeSelector.ANY
// 跳过专用主节点
NodeSelector.SKIP_DEDICATED_MASTERS
// 跳过Ingest节点
NodeSelector.NOT_INGEST_ONLY_NODES
9.2 自定义选择器
NodeSelector customSelector = new NodeSelector() {
@Override
public void select(Iterable<Node> nodes) {
// 自定义节点选择逻辑
// 例如: 只选择本地区域的节点
for (Node node : nodes) {
if (node.getAttributes().get("zone").equals("local")) {
// 保留此节点
}
}
}
};
RestClient restClient = RestClient.builder(...)
.setNodeSelector(customSelector)
.build();
10. 错误处理
10.1 异常类型
| 异常 | 说明 | 重试 |
|---|---|---|
| IOException | 网络错误 | 是 |
| ConnectTimeoutException | 连接超时 | 是 |
| SocketTimeoutException | 读取超时 | 是 |
| ResponseException | HTTP错误响应 | 部分 |
10.2 重试策略
自动重试条件:
- 网络错误(IOException)
- 连接超时
- 节点不可达
- HTTP 429 (Too Many Requests)
- HTTP 502/503/504 (网关错误)
不重试条件:
- HTTP 400 (Bad Request)
- HTTP 401/403 (认证/授权错误)
- HTTP 404 (Not Found)
- 超过最大重试时间
11. 性能优化
11.1 连接池优化
builder.setHttpClientConfigCallback(httpClientBuilder -> {
return httpClientBuilder
.setMaxConnTotal(100) // 增加连接数
.setMaxConnPerRoute(20) // 增加每路由连接数
.setKeepAliveStrategy((response, context) -> 60000); // 保持连接
});
11.2 压缩
RestClient restClient = RestClient.builder(...)
.setCompressionEnabled(true) // 启用gzip压缩
.build();
11.3 批量请求
// 使用Bulk API
Request request = new Request("POST", "/_bulk");
request.setJsonEntity(bulkBody);
12. 总结与关键要点
12.1 核心能力总结
Client模块提供与Elasticsearch集群通信的完整客户端实现,核心特性包括:
1. 连接管理
- 连接池: 基于 Apache HttpClient 连接池,支持连接复用
- 健康检查: 自动检测节点健康状态,维护黑名单
- 失败节点管理: 指数退避算法,自动恢复失败节点
- 动态节点列表: 支持运行时更新节点列表
2. 负载均衡
- 轮询算法: 使用
Collections.rotate()实现公平的轮询调度 - 节点选择器: 可插拔的节点过滤策略 (NodeSelector)
- 跳过失败节点: 自动跳过黑名单中的节点
- 智能重试: 失败时自动选择其他节点重试
3. 请求发送
- 同步请求:
performRequest(),阻塞等待响应 - 异步请求:
performRequestAsync(),立即返回,支持取消 - HTTP 构建: 自动构建 URI、设置请求头、处理请求体
- 响应处理: 自动处理 Gzip 压缩、解析状态码、转换响应
4. 错误处理
- 自动重试: 网络错误和 502/503/504 自动重试
- 指数退避: 失败节点的重试超时随失败次数指数增长
- 异常转换: 保留原始异常类型和调用栈
- Suppressed 异常链: 记录所有重试失败的异常
5. 灵活配置
- 超时控制: 连接超时、Socket 超时、请求超时
- 连接池参数: 最大连接数、每路由连接数、连接存活时间
- 自定义回调: RequestConfigCallback, HttpClientConfigCallback
- 失败监听器: FailureListener,监控节点失败事件
12.2 核心调用链路总结
同步请求链路
用户代码
→ RestClient.performRequest(Request)
→ InternalRequest (包装请求)
→ nextNodes() (选择节点)
→ selectNodes() (节点选择算法)
→ 分离存活/死亡节点
→ NodeSelector 过滤
→ Collections.rotate() 轮询
→ performRequest(tuple, request) (递归重试)
→ createContextForNextAttempt() (创建上下文)
→ client.execute().get() (同步执行)
→ convertResponse() (转换响应)
→ 检查状态码
→ onResponse/onFailure (更新黑名单)
→ 返回 Response 或递归重试
异步请求链路
用户代码
→ RestClient.performRequestAsync(Request, ResponseListener)
→ FailureTrackingResponseListener (包装监听器)
→ InternalRequest (包装请求)
→ nextNodes() (选择节点)
→ performRequestAsync(tuple, request, listener)
→ createContextForNextAttempt() (创建上下文)
→ client.execute(producer, consumer, context, callback) (异步执行)
→ 返回 Cancellable (立即返回)
[后台线程]
→ FutureCallback.completed(httpResponse)
→ convertResponse() (转换响应)
→ onResponse/onFailure (更新黑名单)
→ listener.onSuccess() 或 performRequestAsync() 重试
12.3 关键设计要点
1. 递归重试设计
- 优点: 代码简洁,逻辑清晰
- 实现: 失败时递归调用
performRequest(),传递下一个节点 - 异常处理: 使用 suppressed 异常链记录所有失败
2. 同步请求的异步实现
- 设计: 同步方法内部使用异步执行 +
.get()阻塞 - 原因: 保留调用栈,方便调试,异常信息更完整
- 代码:
client.execute(...).get()
3. 节点黑名单的 CAS 更新
- 线程安全: 使用
ConcurrentMap.putIfAbsent()和replace()CAS 操作 - 无锁设计: 避免锁竞争,提高并发性能
- 代码:
while (true) {
DeadHostState prev = blacklist.putIfAbsent(host, new DeadHostState());
if (prev == null || blacklist.replace(host, prev, new DeadHostState(prev))) {
break;
}
}
4. 指数退避算法
- 公式:
timeout = min(60s × 2 × 2^(failedAttempts × 0.5 - 1), 30min) - 特点: 失败次数越多,重试间隔越长,最长 30 分钟
- 目的: 避免持续请求失败节点,给节点恢复时间
5. 状态码分类处理
- 2xx/3xx: 成功,从黑名单移除节点
- 502/503/504: 节点问题,标记死亡,重试其他节点
- 400/401/403/404: 请求问题,保持节点存活,不重试
12.4 性能优化要点
1. 连接池优化
builder.setHttpClientConfigCallback(httpClientBuilder -> {
return httpClientBuilder
.setMaxConnTotal(100) // 增加连接数
.setMaxConnPerRoute(20) // 增加每路由连接数
.setKeepAliveStrategy((response, context) -> 60000); // 保持连接
});
2. 压缩优化
RestClient restClient = RestClient.builder(...)
.setCompressionEnabled(true) // 启用 gzip 压缩
.build();
3. 批量请求
// 使用 Bulk API,减少网络往返
Request request = new Request("POST", "/_bulk");
request.setJsonEntity(bulkBody);
4. 异步请求
// 高并发场景使用异步请求,避免阻塞线程
restClient.performRequestAsync(request, listener);
5. 自定义响应消费者
// 对于大响应,使用自定义缓冲大小
RequestOptions options = RequestOptions.DEFAULT.toBuilder()
.setHttpAsyncResponseConsumerFactory(
new HttpAsyncResponseConsumerFactory.HeapBufferedResponseConsumerFactory(
100 * 1024 * 1024 // 100 MB 缓冲
)
)
.build();
12.5 常见问题与解决方案
问题1: 所有节点都失败
原因:
- 所有节点都不可达
- 网络问题
- Elasticsearch 集群故障
解决方案:
- 检查网络连接
- 检查 Elasticsearch 集群状态
- 增加重试超时
setMaxRetryTimeoutMillis() - 配置 FailureListener 监控节点失败事件
问题2: 请求超时
原因:
- Socket 超时配置过小
- Elasticsearch 处理慢(查询复杂/集群负载高)
解决方案:
builder.setRequestConfigCallback(requestConfigBuilder -> {
return requestConfigBuilder
.setConnectTimeout(5000) // 连接超时
.setSocketTimeout(60000); // Socket 超时(根据查询复杂度调整)
});
问题3: 连接池耗尽
原因:
- 并发请求过多
- 连接数配置过小
- 连接泄漏(未关闭 Response Entity)
解决方案:
// 增加连接池大小
builder.setHttpClientConfigCallback(httpClientBuilder -> {
return httpClientBuilder
.setMaxConnTotal(200) // 增加总连接数
.setMaxConnPerRoute(50); // 增加每路由连接数
});
// 确保消费 Response Entity
Response response = restClient.performRequest(request);
EntityUtils.consume(response.getEntity()); // 释放连接
问题4: 节点持续标记为死亡
原因:
- 节点真的不可用
- 网络不稳定
- 超时配置过小
解决方案:
- 检查节点状态
- 增加超时配置
- 配置 FailureListener 查看失败原因
- 考虑使用 Sniffer 自动发现节点
问题5: 异步回调中抛出异常
原因:
- 在 HTTP I/O 线程执行耗时操作
- 回调中抛出未捕获异常
解决方案:
restClient.performRequestAsync(request, new ResponseListener() {
@Override
public void onSuccess(Response response) {
try {
// 提交到业务线程池,不阻塞 HTTP 线程
executor.submit(() -> {
processResponse(response);
});
} catch (Exception e) {
logger.error("Failed to submit task", e);
}
}
@Override
public void onFailure(Exception exception) {
logger.error("Request failed", exception);
}
});
12.6 最佳实践总结
1. 客户端创建
- ✅ 使用单例模式,复用 RestClient 实例
- ✅ 配置多个节点,提高可用性
- ✅ 配置合适的超时和连接池参数
- ❌ 不要为每个请求创建新的 RestClient
2. 请求发送
- ✅ 优先使用异步请求 (高并发场景)
- ✅ 使用批量 API (Bulk, Multi-Search)
- ✅ 启用压缩 (网络带宽有限时)
- ❌ 不要在异步回调中执行耗时操作
3. 错误处理
- ✅ 捕获 ResponseException,处理 HTTP 错误
- ✅ 检查 suppressed 异常,了解所有失败原因
- ✅ 配置 FailureListener,监控节点健康
- ❌ 不要忽略警告 (Warnings)
4. 资源管理
- ✅ 使用 try-with-resources 自动关闭 RestClient
- ✅ 消费 Response Entity,释放连接
- ✅ 取消不需要的异步请求
- ❌ 不要忘记关闭 RestClient
5. 节点管理
- ✅ 使用 Sniffer 自动发现节点
- ✅ 配置 NodeSelector,避免请求专用主节点
- ✅ 监控节点失败事件,及时告警
- ❌ 不要手动管理节点列表 (除非必要)
12.7 与其他模块的关系
与 Server 模块的关系
- Client 模块通过 HTTP 协议与 Server 模块通信
- Server 模块的 REST API 是 Client 模块的请求目标
- Server 模块的响应格式是 Client 模块需要解析的
与传输层的关系
- Client 模块使用 HTTP 协议(应用层)
- 传输层模块使用 TCP 协议(传输层)
- Client 模块是外部客户端,传输层是节点间通信
与集群模块的关系
- Client 模块可以查询集群状态 (通过
/_cluster/*API) - 集群模块维护节点列表,Client 可以通过 Sniffer 获取
- 集群故障时,Client 模块会收到 503 错误
理解 Client 模块的工作原理对于:
- 优化客户端性能: 合理配置连接池、超时、压缩
- 处理异常: 理解重试机制、节点失败管理
- 实现高可用: 配置多节点、NodeSelector、FailureListener
- 排查问题: 分析调用链路、检查异常链、监控节点状态
Client 模块是 Elasticsearch 生态的重要组成部分,是连接用户应用和 Elasticsearch 集群的桥梁。