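Elasticsearch-01-Server Core
This document gives a complete walkthrough of the Server core module. It covers the node startup flow, lifecycle management, service management, the plugin system, the core data structures, detailed API specifications, and sequence diagrams of the key flows.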
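1. Module Responsibilities
The Server core module is the foundation of Elasticsearch. It manages the full node lifecycle: startup, running, and shutdown.
1.1 Core Responsibilities
- Node bootstrap
  - Three-phase startup process
  - Environment initialization
  - Security manager configuration
  - Plugin loading
- Lifecycle management
  - Node start (start)
  - Node stop (stop)
  - Node close (close)
  - Graceful shutdown
- Service management
  - Dependency injection (Guice)
  - Service registration and lookup
  - Service start-order control
  - Service health checks
- Plugin system
  - Plugin loading mechanism
  - Plugin lifecycle management
  - Plugin extension points
  - Plugin isolation
1.2 Dependencies
Upstream dependencies:
- JVM environment
- File system
- Network stack
Downstream dependents:
- All Elasticsearch modules
- Plugins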
2. Module Architecture
2.1 Overall Service Architecture Diagram
flowchart TB
subgraph "Client Layer"
HTTPClient[HTTP/REST client]
TCPClient[Transport client]
end
subgraph "REST Layer"
RestController[RestController<br/>request router]
RestHandler[RestHandler<br/>request handler]
RestRequest[RestRequest<br/>request wrapper]
end
subgraph "Action Layer"
ActionModule[ActionModule<br/>action registry]
TransportAction[TransportAction<br/>transport action]
ActionRequest[ActionRequest<br/>request object]
end
subgraph "Node Core"
Node[Node<br/>node instance]
Injector[Guice Injector<br/>dependency injection]
Lifecycle[Lifecycle<br/>lifecycle management]
NodeClient[NodeClient<br/>node-local client]
end
subgraph "Business Layer"
IS[IndicesService<br/>index service]
SS[SearchService<br/>search service]
CS[ClusterService<br/>cluster service]
ICSS[IndicesClusterStateService<br/>applies cluster state to indices]
end
subgraph "Transport Layer"
TS[TransportService<br/>transport service]
TCP[TcpTransport<br/>TCP transport]
Netty[Netty<br/>network framework]
end
subgraph "Storage Layer"
Shard[IndexShard<br/>index shard]
Engine[InternalEngine<br/>storage engine]
Lucene[Lucene<br/>search library]
Translog[Translog<br/>transaction log]
end
subgraph "Infrastructure"
TP[ThreadPool<br/>thread pools]
Monitor[MonitorService<br/>monitoring]
Gateway[GatewayService<br/>metadata persistence and recovery]
Plugins[PluginsService<br/>plugin service]
end
HTTPClient --> RestController
TCPClient --> TS
RestController --> RestHandler
RestHandler --> RestRequest
RestRequest --> ActionModule
ActionModule --> TransportAction
TransportAction --> ActionRequest
ActionRequest --> NodeClient
NodeClient --> Injector
Injector --> IS
Injector --> SS
Injector --> CS
IS --> Shard
SS --> Shard
Shard --> Engine
Engine --> Lucene
Engine --> Translog
TransportAction --> TS
TS --> TCP
TCP --> Netty
IS --> TP
SS --> TP
CS --> Gateway
Node --> Plugins
style HTTPClient fill:#e1f5ff
style RestController fill:#fff4e1
style ActionModule fill:#f5e1ff
style Node fill:#ffe1f5
style IS fill:#e1ffe1
style TS fill:#f5f5e1
style Engine fill:#ffe1e1
style TP fill:#e1e1f5
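2.2 Layer Descriptions
Client layer
- HTTP/REST client: accesses the Elasticsearch REST API over HTTP.
- Transport client: talked to nodes directly over the internal transport protocol. It was deprecated in 7.0 and removed in 8.0.
REST layer
RestController:
- Routes each HTTP request to the matching RestHandler
- Holds the registry of all REST API routes
- Pre-processes requests and post-processes responses
RestHandler:
- Handles a specific REST request
- Parses HTTP parameters
- Builds an ActionRequest and forwards it to the Action layer
RestRequest:
- Wraps the HTTP request
- Provides parameter accessors
- Parses the request body
Action layer
ActionModule:
- Registers every action together with its TransportAction
- Maintains the mapping between actions and their handlers
- Manages the action lifecycle
TransportAction:
- Executes the business logic of an action
- Coordinates several services to complete an operation
- Handles asynchronous callbacks
ActionRequest:
- Encapsulates an operation request
- Defines the request parameters
- Supports serialization and deserialization
Node core
Node:
- Encapsulates all services and components
- Manages the node lifecycle (start/stop/close)
- Coordinates the service start order
Guice Injector:
- Dependency injection container
- Holds all service instances
- Resolves dependencies between services
Lifecycle:
- Tracks the lifecycle state
- Controls state transitions (INITIALIZED → STARTED → STOPPED → CLOSED)
- Rejects illegal transitions
NodeClient:
- Client used inside the node
- Executes operations locally
- Invokes the TransportAction directly
Business layer
IndicesService:
- Manages all indices on the node
- Creates, deletes, opens, and closes indices
- Manages IndexShard instances
SearchService:
- Executes search requests against local shards
- Manages search contexts
- Runs the shard-level query phase and fetch phase
ClusterService:
- Holds the cluster state
- Processes cluster state updates
- Publishes cluster state changes
IndicesClusterStateService:
- Listens for cluster state changes
- Creates and removes shards according to the routing table
- Keeps index metadata in sync
Transport layer
TransportService:
- Inter-node communication
- Manages requests and responses
- Handles connection management
TcpTransport:
- Manages TCP connections
- Serializes and deserializes messages
- Performs network I/O
Netty:
- Underlying network framework
- Asynchronous I/O
- High-performance communication
Storage layer
IndexShard:
- A single shard instance
- Manages reads and writes for that shard
- Coordinates Engine operations
InternalEngine:
- Storage engine implementation
- Wraps Lucene operations
- Manages the translog
Lucene:
- Apache Lucene search library
- Inverted index
- Full-text search
Translog:
- Transaction log
- Makes acknowledged writes durable
- Used for recovery after a failure
Infrastructure
ThreadPool:
- Thread pool management
- Each task type runs on its own pool
- Provides scheduling
MonitorService:
- System monitoring
- JVM monitoring
- Disk monitoring
GatewayService:
- Cluster metadata persistence
- Recovers the cluster state
- Manages index metadata
PluginsService:
- Plugin loading
- Plugin lifecycle management
- Extension point management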
2.3 Module Interactions
Synchronous call chain
flowchart LR
Client[Client] --> REST[REST Layer]
REST --> Action[Action Layer]
Action --> Business[Business Layer]
Business --> Storage[Storage Layer]
Storage --> Lucene[Lucene]
Note: the synchronous call chain of a REST API request; each layer calls the next one down.
Asynchronous messaging
flowchart LR
Cluster[ClusterService] -->|publish state| Transport[TransportService]
Transport -->|broadcast| Nodes[All nodes]
Nodes -->|apply state| ICSS[IndicesClusterStateService]
Note: cluster state changes are broadcast asynchronously to all nodes through the transport layer.
Event listeners
flowchart TB
ClusterService[ClusterService] -->|state change event| Listeners[ClusterStateListener]
Listeners --> ICSS[IndicesClusterStateService]
Listeners --> SearchService[SearchService]
Listeners --> Plugins[Plugin Listeners]
Note: when the cluster state changes, every registered listener is notified.
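The listener mechanism above boils down to an observer list that is walked after each new state is installed. A minimal sketch, assuming hypothetical `StateChangedEvent`, `StateListener`, and `MiniStateApplier` types (they are not Elasticsearch's `ClusterChangedEvent` or `ClusterStateListener`). Isolating a failing listener so the rest are still notified is a design choice of this sketch:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical, simplified stand-ins; not the real Elasticsearch classes.
record StateChangedEvent(long previousVersion, long newVersion) {}

interface StateListener {
    void clusterChanged(StateChangedEvent event);
}

final class MiniStateApplier {
    // CopyOnWriteArrayList allows listeners to be registered while an event is being dispatched.
    private final List<StateListener> listeners = new CopyOnWriteArrayList<>();
    private long version = 0;

    void addListener(StateListener listener) {
        listeners.add(listener);
    }

    // Install a new cluster state version, then notify every registered listener in order.
    void apply(long newVersion) {
        StateChangedEvent event = new StateChangedEvent(version, newVersion);
        version = newVersion;
        for (StateListener listener : listeners) {
            try {
                listener.clusterChanged(event);
            } catch (RuntimeException e) {
                // A failing listener is reported but does not stop the others from being notified.
                System.err.println("listener failed: " + e.getMessage());
            }
        }
    }
}
```

Listeners see the versions in registration order, and each event carries both the previous and the new version, so a listener can decide what changed.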
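2.4 Data Flow and Control Flow
Indexing data flow
Client → REST API → IndexAction → TransportIndexAction
→ IndexShard → InternalEngine → Lucene IndexWriter
→ Translog → Disk
Key points:
- The REST layer handles the HTTP protocol
- The Action layer handles request routing and permission checks
- At the shard level, the target shard is selected from the document's routing value
- The Engine layer handles version control and durability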
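Shard selection follows the formula in the Elasticsearch `_routing` documentation: `routing_factor = num_routing_shards / num_primary_shards` and `shard_num = (hash(_routing) % num_routing_shards) / routing_factor`. The sketch below shows only that arithmetic. It is an assumption-laden illustration: `String.hashCode()` stands in for the Murmur3 hash Elasticsearch actually uses, so the shard numbers will not match a real cluster, and `ShardRoutingSketch` is a hypothetical class:

```java
// Hypothetical sketch of the documented shard-routing formula:
//   routing_factor = num_routing_shards / num_primary_shards
//   shard_num      = (hash(_routing) % num_routing_shards) / routing_factor
// ASSUMPTION: String.hashCode() stands in for the Murmur3 hash Elasticsearch uses,
// so these shard numbers will not match a real cluster.
final class ShardRoutingSketch {
    static int shardId(String routing, int numPrimaryShards, int numRoutingShards) {
        if (numPrimaryShards <= 0 || numRoutingShards < numPrimaryShards
                || numRoutingShards % numPrimaryShards != 0) {
            throw new IllegalArgumentException("num_routing_shards must be a positive multiple of num_primary_shards");
        }
        int routingFactor = numRoutingShards / numPrimaryShards;
        int hash = routing.hashCode();                       // stand-in hash, may be negative
        int bucket = Math.floorMod(hash, numRoutingShards);  // in [0, numRoutingShards)
        return bucket / routingFactor;                       // in [0, numPrimaryShards)
    }

    // Without an explicit routing value, the document _id is used.
    static int shardIdFor(String id, String routing, int numPrimaryShards, int numRoutingShards) {
        return shardId(routing != null ? routing : id, numPrimaryShards, numRoutingShards);
    }
}
```

Using `Math.floorMod` instead of `%` keeps the bucket non-negative even when the hash is negative. Because `num_routing_shards` is a multiple of the primary count, each primary shard owns a contiguous range of buckets, which is what makes index splitting possible.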
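Search data flow
Client → REST API → SearchAction → TransportSearchAction
→ SearchService → IndexShard → InternalEngine
→ Lucene IndexSearcher → results returned
Key points:
- Two-phase search: query phase + fetch phase
- The coordinating node merges the results from all shards
- Multiple shards are searched in parallel
Cluster state control flow
Master Node → ClusterService → PublishClusterState
→ TransportService → All Nodes → ApplyClusterState
→ IndicesClusterStateService → create/remove shards
Key points:
- The elected master node computes the new state
- A two-phase commit (publish, then commit) keeps the nodes consistent
- Each node applies the committed state asynchronously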
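The two-phase commit mentioned above can be reduced to one rule: a new state is committed only after a majority of master-eligible nodes has acknowledged the publish phase. The sketch below is deliberately simplified. `TwoPhasePublishSketch`, the node names, and the `Predicate`-based acknowledgement are hypothetical stand-ins for network calls. The real Coordinator counts votes against a voting configuration and also tracks terms and timeouts:

```java
import java.util.List;
import java.util.function.Predicate;

// Deliberately simplified two-phase publication. Node names and the
// Predicate-based "acknowledgement" are hypothetical stand-ins for network calls;
// the real Coordinator also tracks terms, voting configurations and timeouts.
final class TwoPhasePublishSketch {
    enum Outcome { COMMITTED, FAILED }

    static Outcome publish(List<String> masterEligibleNodes, Predicate<String> acksPublish) {
        // Phase 1 (publish): send the new state and count acknowledgements.
        int acks = 0;
        for (String node : masterEligibleNodes) {
            if (acksPublish.test(node)) {
                acks++;
            }
        }
        int quorum = masterEligibleNodes.size() / 2 + 1;
        if (acks < quorum) {
            return Outcome.FAILED; // without a majority the new state must not be applied
        }
        // Phase 2 (commit): nodes are told they may now apply the state.
        return Outcome.COMMITTED;
    }
}
```

With three master-eligible nodes, two acknowledgements are enough to commit and one is not. This is why a cluster needs at least three master-eligible nodes to tolerate the loss of one.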
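3. Three-Phase Startup
3.1 Startup Flowchart
flowchart TB
Start[main method starts] --> Phase1[Phase 1:<br/>basic initialization]
Phase1 --> P1_1[Initialize security properties]
P1_1 --> P1_2[Read startup arguments]
P1_2 --> P1_3[Initialize environment]
P1_3 --> P1_4[Configure logging]
P1_4 --> Phase2[Phase 2:<br/>security manager setup]
Phase2 --> P2_1[Create environment object]
P2_1 --> P2_2[Write PID file]
P2_2 --> P2_3[Initialize native libraries<br/>JNA/JNI]
P2_3 --> P2_4[Check for jar hell]
P2_4 --> P2_5[Load plugins]
P2_5 --> P2_6[Configure security manager]
P2_6 --> Phase3[Phase 3:<br/>node construction and start]
Phase3 --> P3_1[Create Node object]
P3_1 --> P3_2[Build dependency injection container]
P3_2 --> P3_3[Register all services]
P3_3 --> P3_4[Start all services]
P3_4 --> P3_5[Run bootstrap checks]
P3_5 --> P3_6[Notify the CLI that the node is ready]
P3_6 --> Running[Node running]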
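3.2 The Three Phases in Detail
Phase 1: Basic initialization
Goal: perform the bare minimum of initialization needed to prepare the environment for the later phases.
Key steps (simplified):
private static Bootstrap initPhase1() {
    // 1. Initialize security properties
    initSecurityProperties();
    // 2. Initialize BootstrapInfo
    BootstrapInfo.init();
    // 3. Read the startup arguments that the CLI process writes to stdin
    ServerArgs args = new ServerArgs(new InputStreamStreamInput(System.in));
    // 4. Create an environment object (path information only)
    Environment nodeEnv = new Environment(args.nodeSettings(), args.configDir());
    // 5. Configure logging (must be the last step)
    LogConfigurator.configure(nodeEnv, args.quiet() == false);
    return new Bootstrap(args);
}
Notes:
- Do as little as possible
- Logging configuration must come last
- This phase runs on a single thread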
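Phase 2: Security manager setup
Goal: complete all the preparation that must happen before the security manager is configured.
Key steps (simplified):
private static void initPhase2(Bootstrap bootstrap) throws IOException {
    ServerArgs args = bootstrap.args();
    SecureSettings secrets = args.secrets();
    // 1. Create the full environment object (including SecureSettings)
    Environment nodeEnv = createEnvironment(args.configDir(), args.nodeSettings(), secrets);
    bootstrap.setEnvironment(nodeEnv);
    // 2. Write the PID file
    initPidFile(args.pidFile());
    // 3. Install the global uncaught-exception handler
    Thread.setDefaultUncaughtExceptionHandler(new ElasticsearchUncaughtExceptionHandler());
    // 4. Initialize native libraries (JNA/JNI)
    initializeNatives(
        nodeEnv.tmpDir(),
        MEMORY_LOCK_SETTING.get(args.nodeSettings()),
        true, // install the system call filter
        CTRLHANDLER_SETTING.get(args.nodeSettings())
    );
    // 5. Check for jar hell (conflicting classes on the classpath)
    JarHell.checkJarHell();
    // 6. Load plugins (before the security manager, whose policy covers plugin code)
    PluginsLoader pluginsLoader = PluginsLoader.createPluginsLoader(nodeEnv);
    bootstrap.setPluginsLoader(pluginsLoader);
    // 7. Optionally install a security manager. enableSecurityManager is an illustrative flag;
    //    the real code installs Elasticsearch's own security policy, not a bare SecurityManager.
    if (enableSecurityManager) {
        System.setSecurityManager(new SecurityManager());
    }
}
Notes:
- Everything here must finish before the security manager is installed
- All native libraries are loaded in this phase
- Plugins are preloaded in this phase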
Phase 3: Node construction and start
Goal: build the Node object and start all services.
Key steps:
private static void initPhase3(Bootstrap bootstrap) throws IOException, NodeValidationException {
    // 1. Create the Node object
    Node node = new Node(bootstrap.environment(), bootstrap.pluginsLoader());
    // 2. Start the node
    node.start();
    // 3. Tell the CLI process that the server is ready
    bootstrap.sendCliMarker(BootstrapInfo.SERVER_READY_MARKER);
}
Node construction (simplified):
public Node(Environment environment, PluginsLoader pluginsLoader) {
    // 1. Create the dependency injection container (Guice)
    Injector injector = createInjector(environment, pluginsLoader);
    // 2. Register all core services
    registerServices(injector);
}
Node start (simplified; section 7.1 shows more detail):
public Node start() throws NodeValidationException {
    // 1. Update the lifecycle state; return early if already started
    if (lifecycle.moveToStarted() == false) {
        return this;
    }
    // 2. Start plugin lifecycle components
    pluginLifecycleComponents.forEach(LifecycleComponent::start);
    // 3. Start core services (in dependency order)
    injector.getInstance(IndicesService.class).start();
    injector.getInstance(IndicesClusterStateService.class).start();
    injector.getInstance(SnapshotsService.class).start();
    injector.getInstance(SearchService.class).start();
    injector.getInstance(ClusterService.class).start();
    injector.getInstance(TransportService.class).start();
    injector.getInstance(Coordinator.class).start();
    injector.getInstance(GatewayService.class).start();
    // 4. Start the HTTP server
    injector.getInstance(HttpServerTransport.class).start();
    // 5. Validate the node before accepting requests
    validateNodeBeforeAcceptingRequests();
    return this;
}
4. Lifecycle Management
4.1 Lifecycle State Machine
stateDiagram-v2
[*] --> INITIALIZED: construct Node
INITIALIZED --> STARTED: start()
STARTED --> STOPPED: stop()
STOPPED --> STARTED: start()
INITIALIZED --> CLOSED: close()
STOPPED --> CLOSED: close()
CLOSED --> [*]
Lifecycle has exactly four states. Node.close() calls stop() first on a started node, so a running node always passes through STOPPED on its way to CLOSED.
4.2 Lifecycle States
| State | Meaning | Entered via |
|---|---|---|
| INITIALIZED | Node constructed, services registered | constructor |
| STARTED | All services started; the node can accept requests | start() |
| STOPPED | All services stopped; the node can be started again or closed | stop() |
| CLOSED | All resources released; terminal state | close() |
5. Service Start Order
5.1 Start Order Diagram
flowchart LR
TP[ThreadPool] --> IS[IndicesService]
IS --> ICSS[IndicesClusterStateService]
ICSS --> SS[SnapshotsService]
SS --> Search[SearchService]
Search --> CS[ClusterService]
CS --> TS[TransportService]
TS --> Coord[Coordinator]
Coord --> GW[GatewayService]
GW --> HTTP[HttpServerTransport]
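5.2 Start Order Rationale
Why this order:
- ThreadPool: created first, during Node construction; every service depends on the thread pools
- IndicesService: manages the indices that other services need to access
- ClusterService: cluster state management
- TransportService: inter-node communication
- Coordinator: cluster coordination (depends on TransportService)
- HttpServerTransport: started last, so external requests arrive only once the node is ready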
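The rule behind this list is that a service starts only after everything it depends on has started. In other words, a valid start order is a topological order of the dependency graph. Node.start() hard-codes its order rather than computing it. The sketch below is a hypothetical illustration (the `StartOrderSketch` class and the dependency map are assumptions) that derives such an order with Kahn's algorithm and rejects cycles:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: derive a start order from "service -> services it depends on"
// using Kahn's algorithm. Node.start() hard-codes its order; this only illustrates
// why dependencies must be started first.
final class StartOrderSketch {
    static List<String> startOrder(Map<String, List<String>> dependsOn) {
        Map<String, Integer> inDegree = new LinkedHashMap<>();     // number of unstarted dependencies
        Map<String, List<String>> dependents = new HashMap<>();    // reverse edges
        for (Map.Entry<String, List<String>> e : dependsOn.entrySet()) {
            inDegree.putIfAbsent(e.getKey(), 0);
            for (String dep : e.getValue()) {
                inDegree.putIfAbsent(dep, 0);
                inDegree.merge(e.getKey(), 1, Integer::sum);
                dependents.computeIfAbsent(dep, k -> new ArrayList<>()).add(e.getKey());
            }
        }
        Deque<String> ready = new ArrayDeque<>();
        inDegree.forEach((svc, deg) -> { if (deg == 0) ready.add(svc); });
        List<String> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            String svc = ready.poll();
            order.add(svc);                                        // all its dependencies are started
            for (String next : dependents.getOrDefault(svc, List.of())) {
                if (inDegree.merge(next, -1, Integer::sum) == 0) ready.add(next);
            }
        }
        if (order.size() != inDegree.size()) {
            throw new IllegalStateException("dependency cycle between services");
        }
        return order;
    }
}
```

Services with no dependencies, such as ThreadPool, come out first. A service that everything else leads to, such as HttpServerTransport, comes out last.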
6. Core Data Structures
6.1 Node Core Classes
Class diagram
classDiagram
class Node {
-Lifecycle lifecycle
-Injector injector
-Environment environment
-NodeEnvironment nodeEnvironment
-PluginsService pluginsService
-NodeClient client
-Collection~LifecycleComponent~ pluginLifecycleComponents
+Node(Environment, PluginsLoader)
+start() Node
+close()
}
class Lifecycle {
-State state
+moveToStarted() boolean
+moveToStopped() boolean
+moveToClosed() boolean
+state() State
}
class State {
<<enumeration>>
INITIALIZED
STARTED
STOPPED
CLOSED
}
class Environment {
-Settings settings
-Path configDir
-Path[] dataFiles
-Path logsDir
-Path tmpDir
+settings() Settings
+configDir() Path
+dataFiles() Path[]
}
Node --> Lifecycle
Node --> Environment
Lifecycle --> State
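Class descriptions
Node:
- Encapsulates all services and components
- Manages the node lifecycle
- Provides a single start/stop/close entry point
Lifecycle:
- Tracks a component's lifecycle state
- Provides the state transition methods
- Rejects illegal transitions
Environment:
- Encapsulates the node's environment configuration
- Manages path information
- Provides access to the configuration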
6.2 LifecycleComponent Interface
Class diagram
classDiagram
class LifecycleComponent {
<<interface>>
+start()
+stop()
+close()
+lifecycleState() State
}
class AbstractLifecycleComponent {
<<abstract>>
#Lifecycle lifecycle
+start()
+stop()
+close()
#doStart()
#doStop()
#doClose()
}
class TransportService {
+start()
+stop()
+close()
}
class ClusterService {
+start()
+stop()
+close()
}
class IndicesService {
+start()
+stop()
+close()
}
LifecycleComponent <|.. AbstractLifecycleComponent
AbstractLifecycleComponent <|-- TransportService
AbstractLifecycleComponent <|-- ClusterService
AbstractLifecycleComponent <|-- IndicesService
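Class descriptions
LifecycleComponent:
- Defines the lifecycle component interface
- Every service that needs lifecycle management implements it
AbstractLifecycleComponent:
- Provides the lifecycle template methods
- Manages state transitions
- Subclasses implement doStart/doStop/doClose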
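The template-method split above means the public `start/stop/close` methods own the state checks, and subclasses fill in only the `do*` hooks. A minimal sketch using hypothetical `SketchState` and `SketchLifecycleComponent` types modeled on that description (they are not Elasticsearch's classes). Its behavior (hooks run before the state changes, repeated `start()` is a no-op, `close()` stops a running component first) is an assumption that follows this document's Lifecycle code:

```java
// Simplified stand-in modeled on the template-method pattern described above;
// the four states follow the Lifecycle code in this document. Hypothetical names.
enum SketchState { INITIALIZED, STARTED, STOPPED, CLOSED }

abstract class SketchLifecycleComponent {
    private SketchState state = SketchState.INITIALIZED;

    public final synchronized void start() {
        if (state == SketchState.CLOSED) {
            throw new IllegalStateException("cannot start a closed component");
        }
        if (state == SketchState.STARTED) {
            return;                     // idempotent: already started
        }
        doStart();                      // subclass hook runs before the state changes
        state = SketchState.STARTED;
    }

    public final synchronized void stop() {
        if (state != SketchState.STARTED) {
            return;                     // nothing to stop
        }
        doStop();
        state = SketchState.STOPPED;
    }

    public final synchronized void close() {
        if (state == SketchState.CLOSED) {
            return;
        }
        if (state == SketchState.STARTED) {
            stop();                     // always pass through STOPPED before CLOSED
        }
        doClose();
        state = SketchState.CLOSED;
    }

    public final synchronized SketchState lifecycleState() {
        return state;
    }

    protected abstract void doStart();
    protected abstract void doStop();
    protected abstract void doClose();
}
```

Making the public methods `final` keeps subclasses from bypassing the state checks, which is the point of the template.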
6.3 Injector & Module
Class diagram
classDiagram
class Injector {
<<Guice>>
+getInstance(Class~T~) T
+getInstance(Key~T~) T
}
class NodeModule {
<<abstract>>
+configure()
#bindService(Class, Class)
}
class ClusterModule {
+configure()
}
class TransportModule {
+configure()
}
class IndicesModule {
+configure()
}
Injector ..> NodeModule
NodeModule <|-- ClusterModule
NodeModule <|-- TransportModule
NodeModule <|-- IndicesModule
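Class descriptions
Injector (Guice):
- Dependency injection container (Elasticsearch embeds its own trimmed-down fork of Google Guice)
- Holds all service instances
- Resolves dependencies
NodeModule:
- Defines service bindings
- Configures dependencies
- Each module defines its own Module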
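6.4 PluginsService
Class diagram
classDiagram
class PluginsService {
-List~PluginInfo~ pluginsInfo
-List~Plugin~ plugins
-Map~Class, List~ pluginMap
+PluginsService(Settings, Path, Path, Path)
+getPluginInfos() List~PluginInfo~
+filterPlugins(Class~T~) List~T~
}
class Plugin {
<<abstract>>
+createComponents(...) Collection~Object~
+getSettings() List~Setting~
+close()
}
class ActionPlugin {
<<interface>>
+getActions() List~ActionHandler~
+getRestHandlers(...) List~RestHandler~
}
class PluginInfo {
-String name
-String description
-String version
-String classname
+getName() String
+getVersion() String
}
class PluginBundle {
-PluginDescriptor pluginDescriptor
-ClassLoader classLoader
-Set~URL~ urls
}
PluginsService --> Plugin
PluginsService ..> ActionPlugin : filterPlugins
PluginsService --> PluginInfo
PluginsService --> PluginBundle
Class descriptions
PluginsService:
- Manages all plugins
- Loads plugin classes
- Provides plugin lookup (filterPlugins returns the plugins that implement a given type)
Plugin:
- Abstract base class that every plugin extends
- Defines the generic extension points (components, settings)
- Further capabilities come from extension interfaces the plugin class also implements, such as ActionPlugin (actions and REST handlers)
PluginInfo:
- Plugin metadata
- Read from plugin-descriptor.properties
- Includes the name, version, description, and so on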
6.5 NodeEnvironment
Class diagram
classDiagram
class NodeEnvironment {
-Settings settings
-Path[] nodePaths
-Lock[] locks
-NodeId nodeId
+NodeEnvironment(Settings, Environment)
+nodeDataPaths() Path[]
+nodeId() NodeId
+close()
}
class NodeId {
-String id
+NodeId(String)
+getId() String
}
class NodePath {
-Path path
-long totalSpace
-long freeSpace
+getPath() Path
+getTotalSpace() long
}
NodeEnvironment --> NodeId
NodeEnvironment --> NodePath
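Class descriptions
NodeEnvironment:
- Manages the node's data directories
- Locks them so that two nodes cannot use the same directory
- Generates or loads the NodeId
NodeId:
- Unique identifier of the node
- Persisted to disk
- Stays the same across restarts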
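The two NodeEnvironment duties above (exclusive ownership of a data path and a node ID that survives restarts) can be sketched with the JDK alone. This is a hypothetical illustration, not Elasticsearch's implementation. The file names `node.lock` and `node-id` and the `DataPathSketch` class are assumptions, and real Elasticsearch stores the node ID in its on-disk metadata. The in-JVM set mirrors a precaution Lucene's native lock factory also takes: closing a second channel on an already-locked file can release the OS lock on some platforms.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical JDK-only sketch: lock a data path exclusively and keep a stable node ID.
// "node.lock" and "node-id" are illustrative file names.
final class DataPathSketch implements AutoCloseable {
    // Locks held by this JVM; checked before opening a channel, because closing a second
    // channel on an already-locked file can release the OS lock on some platforms.
    private static final Set<Path> HELD_IN_THIS_JVM = ConcurrentHashMap.newKeySet();

    private final Path lockFile;
    private final FileChannel channel;
    private final FileLock lock;

    DataPathSketch(Path dataPath) throws IOException {
        Files.createDirectories(dataPath);
        lockFile = dataPath.resolve("node.lock").toAbsolutePath().normalize();
        if (HELD_IN_THIS_JVM.add(lockFile) == false) {
            throw new IOException("data path already locked in this JVM: " + dataPath);
        }
        FileChannel ch = null;
        try {
            ch = FileChannel.open(lockFile, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
            FileLock l = ch.tryLock();          // null if another process holds the lock
            if (l == null) {
                throw new IOException("data path locked by another process: " + dataPath);
            }
            channel = ch;
            lock = l;
        } catch (IOException | RuntimeException e) {
            if (ch != null) {
                ch.close();
            }
            HELD_IN_THIS_JVM.remove(lockFile);
            throw e;
        }
    }

    // Read the persisted node ID, generating and writing it on first start.
    static String loadOrCreateNodeId(Path dataPath) throws IOException {
        Path idFile = dataPath.resolve("node-id");
        if (Files.exists(idFile)) {
            return Files.readString(idFile).trim();
        }
        String id = UUID.randomUUID().toString();
        Files.writeString(idFile, id);
        return id;
    }

    @Override
    public void close() throws IOException {
        try {
            lock.release();
            channel.close();
        } finally {
            HELD_IN_THIS_JVM.remove(lockFile);
        }
    }
}
```

A second node pointed at the same path fails fast instead of corrupting shared files. Reading the ID back on restart is what keeps the node's identity stable.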
6.6 ThreadPool
Class diagram
classDiagram
class ThreadPool {
-Map~String, ExecutorHolder~ executors
-ScheduledThreadPoolExecutor scheduler
+ThreadPool(Settings)
+generic() ExecutorService
+executor(String) ExecutorService
+schedule(Runnable, TimeValue, String) ScheduledFuture
+shutdown()
}
class ExecutorHolder {
-ExecutorService executor
-ThreadPoolInfo info
+executor() ExecutorService
+info() ThreadPoolInfo
}
class ThreadPoolInfo {
-String name
-String type
-int size
-int queueSize
+getName() String
+getType() String
}
ThreadPool --> ExecutorHolder
ExecutorHolder --> ThreadPoolInfo
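Class descriptions
ThreadPool:
- Manages all thread pools
- Runs each type of task on its own pool
- Provides scheduling
Thread pool types (defaults; "processors" means allocated processors):
| Name | Type | Size (threads) | Queue | Purpose |
|---|---|---|---|---|
| generic | scaling | up to 4 × processors, bounded to [128, 512] | - | Generic tasks |
| search | fixed | (processors × 3) / 2 + 1 | 1000 | Search operations |
| get | fixed | processors | 1000 | GET operations |
| write | fixed | processors | 10000 | Write operations |
| management | scaling | 5 | - | Cluster management |
| refresh | scaling | processors / 2 (rounded up), at most 10 | - | Refresh operations |
| flush | scaling | processors / 2 (rounded up), at most 5 | - | Flush operations |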
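A "fixed" pool in the table combines a constant number of threads with a bounded queue. Once both are full, further tasks are rejected instead of piling up. Elasticsearch reports such rejections as HTTP 429. The sketch below uses only the JDK's `ThreadPoolExecutor` to show that behavior. It is an illustration under that assumption, not Elasticsearch's own executor classes (`FixedPoolSketch` is hypothetical):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// JDK-only sketch of a "fixed" pool with a bounded queue. Elasticsearch uses its own
// executor classes and maps rejections to HTTP 429; that part is not shown.
final class FixedPoolSketch {
    static ThreadPoolExecutor fixed(int threads, int queueSize) {
        return new ThreadPoolExecutor(
                threads, threads,                          // core == max: a fixed-size pool
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueSize),       // bounded queue
                new ThreadPoolExecutor.AbortPolicy());     // throw RejectedExecutionException when full
    }

    // Submits `tasks` tasks that all block until released, and counts the rejections.
    static int countRejections(int threads, int queueSize, int tasks) throws InterruptedException {
        ThreadPoolExecutor pool = fixed(threads, queueSize);
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        for (int i = 0; i < tasks; i++) {
            try {
                pool.execute(() -> {
                    try {
                        release.await();                   // keep the worker busy
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            } catch (RejectedExecutionException e) {
                rejected++;
            }
        }
        release.countDown();                               // let the blocked tasks finish
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return rejected;
    }
}
```

With 2 threads and a queue of 3, the first 2 tasks occupy the threads, the next 3 wait in the queue, and the remaining ones are rejected. The queue size is therefore a trade-off between absorbing bursts and applying back-pressure early.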
6.7 Settings
Class diagram
classDiagram
class Settings {
-Map~String, String~ settings
+get(String) String
+getAsInt(String, int) int
+getAsBoolean(String, boolean) boolean
+builder() Builder
}
class Builder {
-Map~String, String~ map
+put(String, String) Builder
+put(String, int) Builder
+build() Settings
}
class Setting {
<<abstract>>
-String key
-Function parser
-T defaultValue
+get(Settings) T
+exists(Settings) boolean
}
Settings --> Builder
Settings ..> Setting
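Class descriptions
Settings:
- Immutable configuration object
- Stores all configuration entries
- Provides typed accessors
Setting:
- Definition of a single configuration entry
- Carries the default value and validation logic
- Type-safe
Example (illustrative; the real http.port is a port-range setting rather than an int setting):
public static final Setting<Integer> HTTP_PORT =
    Setting.intSetting("http.port", 9200, 1024, 65535, Property.NodeScope);
// Usage
int port = HTTP_PORT.get(settings);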
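What a typed `Setting` adds over a raw string map is three things: a default, a parser, and validation that runs on every read. A minimal sketch of that idea, assuming a hypothetical `TypedSetting` class and plain `Map<String, String>` in place of `Settings`. It mirrors the spirit of `Setting.intSetting(key, default, min, max, ...)` but is not the Elasticsearch class:

```java
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical, minimal model of a typed setting (key, default, parser, validator).
// It mirrors the idea behind Setting.intSetting(key, default, min, max, ...) but is
// not the Elasticsearch class; plain Maps stand in for Settings.
final class TypedSetting<T> {
    private final String key;
    private final T defaultValue;
    private final Function<String, T> parser;
    private final Consumer<T> validator;

    private TypedSetting(String key, T defaultValue, Function<String, T> parser, Consumer<T> validator) {
        this.key = key;
        this.defaultValue = defaultValue;
        this.parser = parser;
        this.validator = validator;
    }

    static TypedSetting<Integer> intSetting(String key, int defaultValue, int min, int max) {
        return new TypedSetting<>(key, defaultValue, Integer::parseInt, v -> {
            if (v < min || v > max) {
                throw new IllegalArgumentException(
                        "value [" + v + "] for setting [" + key + "] must be in [" + min + ", " + max + "]");
            }
        });
    }

    // Parse the raw string if present, otherwise fall back to the default; validate either way.
    T get(Map<String, String> settings) {
        String raw = settings.get(key);
        T value = raw == null ? defaultValue : parser.apply(raw);
        validator.accept(value);
        return value;
    }

    boolean exists(Map<String, String> settings) {
        return settings.containsKey(key);
    }
}
```

Because parsing and validation live with the key, callers never handle raw strings, and an out-of-range value fails at read time with a message that names the setting.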
7. Core APIs
7.1 Node API
Node constructor
Signature:
public Node(Environment environment, PluginsLoader pluginsLoader)
Parameters:
| Parameter | Type | Description |
|---|---|---|
| environment | Environment | Environment configuration |
| pluginsLoader | PluginsLoader | Plugin loader |
Core logic (simplified):
public Node(Environment environment, PluginsLoader pluginsLoader) {
    // 1. Prepare the construction context
    NodeConstruction construction = NodeConstruction.prepareConstruction(
        environment,
        pluginsLoader,
        new NodeServiceProvider(),
        true
    );
    // 2. Take the dependency injection container built during construction
    this.injector = construction.injector();
    // 3. Look up the core components
    this.environment = injector.getInstance(Environment.class);
    this.nodeEnvironment = injector.getInstance(NodeEnvironment.class);
    this.pluginsService = injector.getInstance(PluginsService.class);
    this.client = injector.getInstance(NodeClient.class);
    // 4. Collect the plugin lifecycle components
    this.pluginLifecycleComponents = construction.pluginLifecycleComponents();
}
start: start the node
Signature:
public Node start() throws NodeValidationException
Core logic (simplified):
public Node start() throws NodeValidationException {
    // 1. State check
    if (lifecycle.moveToStarted() == false) {
        return this; // already started
    }
    // 2. Start plugin components
    pluginLifecycleComponents.forEach(LifecycleComponent::start);
    // 3. Start core services (in dependency order)
    injector.getInstance(IndicesService.class).start();
    injector.getInstance(IndicesClusterStateService.class).start();
    injector.getInstance(SnapshotsService.class).start();
    injector.getInstance(SearchService.class).start();
    injector.getInstance(FsHealthService.class).start();
    injector.getInstance(MonitorService.class).start();
    // 4. Start NodeConnectionsService and attach it to ClusterService
    ClusterService clusterService = injector.getInstance(ClusterService.class);
    NodeConnectionsService nodeConnectionsService = injector.getInstance(NodeConnectionsService.class);
    nodeConnectionsService.start();
    clusterService.setNodeConnectionsService(nodeConnectionsService);
    // 5. Register the Coordinator as the cluster state publisher
    Coordinator coordinator = injector.getInstance(Coordinator.class);
    clusterService.getMasterService().setClusterStatePublisher(coordinator);
    // 6. Start the transport service and accept inter-node requests
    TransportService transportService = injector.getInstance(TransportService.class);
    transportService.start();
    transportService.acceptIncomingRequests();
    // 7. Start the gateway service
    injector.getInstance(GatewayService.class).start();
    // 8. Start the HTTP server
    injector.getInstance(HttpServerTransport.class).start();
    // 9. Validate the node before accepting requests
    validateNodeBeforeAcceptingRequests();
    return this;
}
stop: stop the node
Signature:
private void stop()
Core logic:
private void stop() {
    // 1. State check
    if (lifecycle.moveToStopped() == false) {
        return;
    }
    // 2. Stop the HTTP server (no new requests are accepted)
    stopIfStarted(HttpServerTransport.class);
    // 3. Stop the snapshot and repository services
    stopIfStarted(SnapshotsService.class);
    stopIfStarted(SnapshotShardsService.class);
    stopIfStarted(RepositoriesService.class);
    // 4. Stop the indices cluster state service
    stopIfStarted(IndicesClusterStateService.class);
    // 5. Stop the coordinator (stop reacting to cluster state updates)
    stopIfStarted(Coordinator.class);
    // 6. Stop the cluster services
    stopIfStarted(ClusterService.class);
    stopIfStarted(NodeConnectionsService.class);
    // 7. Stop the remaining services
    stopIfStarted(FsHealthService.class);
    stopIfStarted(MonitorService.class);
    stopIfStarted(GatewayService.class);
    stopIfStarted(SearchService.class);
    stopIfStarted(TransportService.class);
    // 8. Stop plugin components
    pluginLifecycleComponents.forEach(Node::stopIfStarted);
    // 9. Stop IndicesService last (it waits for resources to be released)
    stopIfStarted(IndicesService.class);
}
close: close the node
Signature:
@Override
public synchronized void close() throws IOException
Core logic:
@Override
public synchronized void close() throws IOException {
    synchronized (lifecycle) {
        // 1. If the node is still running, stop it first
        if (lifecycle.started()) {
            stop();
        }
        // 2. State check
        if (lifecycle.moveToClosed() == false) {
            return;
        }
    }
    // 3. Close all components (roughly in reverse start order)
    List<Closeable> toClose = new ArrayList<>();
    toClose.add(injector.getInstance(HttpServerTransport.class));
    toClose.add(injector.getInstance(SnapshotsService.class));
    toClose.add(injector.getInstance(IndicesClusterStateService.class));
    toClose.add(injector.getInstance(IndicesService.class));
    toClose.add(injector.getInstance(IndicesStore.class));
    toClose.add(injector.getInstance(ClusterService.class));
    toClose.add(injector.getInstance(NodeConnectionsService.class));
    toClose.add(injector.getInstance(Coordinator.class));
    toClose.add(injector.getInstance(MonitorService.class));
    toClose.add(injector.getInstance(GatewayService.class));
    toClose.add(injector.getInstance(SearchService.class));
    toClose.add(injector.getInstance(TransportService.class));
    // 4. Close plugins
    pluginLifecycleComponents.forEach(toClose::add);
    pluginsService.filterPlugins(Plugin.class).forEach(toClose::add);
    // 5. Shut down the thread pool
    toClose.add(() -> injector.getInstance(ThreadPool.class).shutdown());
    // 6. Close the node environment (releases the data-path locks)
    toClose.add(injector.getInstance(NodeEnvironment.class));
    // 7. Close everything: IOUtils.close tries every resource and rethrows the first exception
    IOUtils.close(toClose);
}
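7.2 Lifecycle API
LifecycleComponent interface
public interface LifecycleComponent extends Releasable {
    void start();
    void stop();
    // Releasable narrows Closeable.close() so that it does not throw IOException
    @Override
    void close();
    Lifecycle.State lifecycleState();
}
Lifecycle state management (simplified)
public class Lifecycle {
    public enum State { INITIALIZED, STOPPED, STARTED, CLOSED }

    // The check-then-set below is not atomic; callers such as Node.close()
    // synchronize on the Lifecycle instance.
    private volatile State state = State.INITIALIZED;

    public boolean started() {
        return state == State.STARTED;
    }

    public boolean moveToStarted() {
        State localState = state;
        if (localState == State.INITIALIZED || localState == State.STOPPED) {
            state = State.STARTED;
            return true;
        }
        if (localState == State.CLOSED) {
            throw new IllegalStateException("cannot start a closed component");
        }
        return false; // already started
    }

    public boolean moveToStopped() {
        State localState = state;
        if (localState == State.STARTED) {
            state = State.STOPPED;
            return true;
        }
        if (localState == State.CLOSED) {
            throw new IllegalStateException("cannot stop a closed component");
        }
        return false; // never started, or already stopped
    }

    public boolean moveToClosed() {
        State localState = state;
        if (localState == State.CLOSED) {
            return false;
        }
        if (localState == State.STARTED) {
            throw new IllegalStateException("must stop before closing");
        }
        state = State.CLOSED;
        return true;
    }
}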
7.3 Environment API
Environment constructor
public Environment(Settings settings, Path configPath)
Key methods (signatures only)
public class Environment {
    // Config directory
    public Path configDir();
    // Data directories
    public Path[] dataFiles();
    // Log directory
    public Path logsDir();
    // Temp directory
    public Path tmpDir();
    // Node settings
    public Settings settings();
}
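7.4 NodeClient API
NodeClient is the client used inside a node. It executes actions locally, without a network hop.
public class NodeClient extends AbstractClient {
    // Execute an action (simplified: the real method also registers a task with the TaskManager)
    @Override
    public <Request extends ActionRequest, Response extends ActionResponse>
    void doExecute(ActionType<Response> action, Request request, ActionListener<Response> listener) {
        // Look up the TransportAction registered for this action type
        @SuppressWarnings("unchecked")
        TransportAction<Request, Response> transportAction = (TransportAction<Request, Response>) actions.get(action);
        if (transportAction == null) {
            listener.onFailure(new IllegalStateException("failed to find action [" + action + "] to execute"));
            return;
        }
        // Execute it locally (no Task in this simplified version)
        transportAction.execute(null, request, listener);
    }
}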
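The essence of NodeClient is a registry lookup followed by an in-process call that reports through a callback. A hypothetical sketch of that pattern, assuming string action names and `Consumer`-based callbacks in place of `ActionType` and `ActionListener` (`LocalActionRegistry` and the action name `demo:length` are illustrative, not Elasticsearch APIs):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical sketch of NodeClient-style local dispatch: a registry maps an action
// name to a handler, and execute() runs it in-process and reports via callbacks.
// String names and Consumer callbacks stand in for ActionType and ActionListener.
final class LocalActionRegistry {
    interface Handler<Req, Resp> {
        Resp handle(Req request);
    }

    private final Map<String, Handler<?, ?>> actions = new HashMap<>();

    <Req, Resp> void register(String name, Handler<Req, Resp> handler) {
        if (actions.putIfAbsent(name, handler) != null) {
            throw new IllegalArgumentException("action [" + name + "] already registered");
        }
    }

    @SuppressWarnings("unchecked")
    <Req, Resp> void execute(String name, Req request, Consumer<Resp> onResponse, Consumer<Exception> onFailure) {
        Handler<Req, Resp> handler = (Handler<Req, Resp>) actions.get(name);
        if (handler == null) {
            onFailure.accept(new IllegalStateException("failed to find action [" + name + "] to execute"));
            return;
        }
        Resp response;
        try {
            response = handler.handle(request);
        } catch (Exception e) {
            onFailure.accept(e);
            return;
        }
        // Called outside the try block so a failing response callback is not reported as an action failure.
        onResponse.accept(response);
    }
}
```

Exactly one of the two callbacks fires per call, which is the same contract an `ActionListener` is expected to honor.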
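7.5 PluginsService API
Plugin loading (simplified)
public class PluginsService {
    private final List<Plugin> plugins;

    public PluginsService(
        Settings settings,
        Path configPath,
        Path modulesPath,
        Path pluginsPath
    ) {
        // 1. Load modules (built-in plugins)
        List<PluginBundle> moduleBundles = loadBundles(modulesPath);
        // 2. Load plugins
        List<PluginBundle> pluginBundles = loadBundles(pluginsPath);
        // 3. Instantiate all modules and plugins
        List<PluginBundle> allBundles = new ArrayList<>(moduleBundles);
        allBundles.addAll(pluginBundles);
        this.plugins = new ArrayList<>();
        for (PluginBundle bundle : allBundles) {
            Plugin plugin = loadPlugin(bundle);
            plugins.add(plugin);
        }
    }
}
Plugin extension points
Plugin is an abstract class that every plugin extends. Additional extension points are interfaces that the plugin class also implements, such as ActionPlugin:
public abstract class Plugin implements Closeable {
    // Create custom components
    public Collection<Object> createComponents(...) {
        return Collections.emptyList();
    }
    // Custom settings
    public List<Setting<?>> getSettings() {
        return Collections.emptyList();
    }
    // Release plugin resources
    @Override
    public void close() throws IOException {
    }
}
public interface ActionPlugin {
    // Register actions
    default List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
        return Collections.emptyList();
    }
    // Register REST handlers
    default List<RestHandler> getRestHandlers(...) {
        return Collections.emptyList();
    }
}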
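7.6 Bootstrap Checks
BootstrapCheck interface (simplified)
public interface BootstrapCheck {
    // Run the check against the node's bootstrap context
    BootstrapCheckResult check(BootstrapContext context);
    // Whether to enforce the check even in development mode
    default boolean alwaysEnforce() {
        return false;
    }
}
Common checks
| Check | What it verifies |
|---|---|
| HeapSizeCheck | The initial heap size equals the maximum heap size (-Xms = -Xmx) |
| FileDescriptorCheck | The maximum number of file descriptors is at least 65535 |
| MaxNumberOfThreadsCheck | The process may create at least 4096 threads |
| MaxMapCountCheck | vm.max_map_count is at least 262144 |
| ClientJvmCheck | The JVM is a server VM, not a client VM |
| OnErrorCheck | -XX:OnError is not set while system call filters are enabled |
| OnOutOfMemoryErrorCheck | -XX:OnOutOfMemoryError is not set while system call filters are enabled |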
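Bootstrap checks run all together, and all enforced failures are reported at once rather than one per restart. Enforcement applies in production mode (roughly, when the node binds its transport to a non-loopback address); in development mode, failures are only logged as warnings. The sketch below illustrates that aggregation. `SketchContext`, `CheckResult`, `SketchCheck`, and `CheckRunner` are hypothetical stand-ins for `BootstrapContext`, `BootstrapCheckResult`, and `BootstrapCheck`. The two sample checks reuse thresholds from the table above:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of bootstrap-check aggregation; the records and interface are
// simplified stand-ins, not the Elasticsearch types.
record SketchContext(long initialHeap, long maxHeap, long maxFileDescriptors) {}

record CheckResult(String failure) {
    static CheckResult ok() { return new CheckResult(null); }
    static CheckResult fail(String message) { return new CheckResult(message); }
    boolean isFailure() { return failure != null; }
}

interface SketchCheck {
    CheckResult check(SketchContext context);
    default boolean alwaysEnforce() { return false; }
}

final class CheckRunner {
    // Runs every check. Enforced failures abort startup together; the rest are returned as warnings.
    static List<String> run(List<SketchCheck> checks, boolean enforce, SketchContext context) {
        List<String> errors = new ArrayList<>();
        List<String> warnings = new ArrayList<>();
        for (SketchCheck check : checks) {
            CheckResult result = check.check(context);
            if (result.isFailure() == false) {
                continue;
            }
            if (enforce || check.alwaysEnforce()) {
                errors.add(result.failure());
            } else {
                warnings.add(result.failure());
            }
        }
        if (errors.isEmpty() == false) {
            throw new IllegalStateException("bootstrap checks failed: " + errors);
        }
        return warnings;
    }

    static final SketchCheck HEAP_SIZE = ctx -> ctx.initialHeap() == ctx.maxHeap()
            ? CheckResult.ok()
            : CheckResult.fail("initial heap size not equal to maximum heap size");

    static final SketchCheck FILE_DESCRIPTORS = ctx -> ctx.maxFileDescriptors() >= 65535
            ? CheckResult.ok()
            : CheckResult.fail("max file descriptors too low, increase to at least 65535");
}
```

Collecting every failure before throwing saves an operator from fixing one limit, restarting, and only then discovering the next one.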
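8. Sequence Diagrams of Key Flows
8.1 Full Node Startup
Three-phase startup sequence diagram
sequenceDiagram
autonumber
participant Main as main method
participant ES as Elasticsearch
participant Bootstrap as Bootstrap
participant Node as Node
participant Injector as Guice Injector
participant Services as Core Services
Note over Main,Services: Phase 1: basic initialization
Main->>ES: main(args)
ES->>ES: initPhase1()
ES->>ES: initSecurityProperties()
ES->>ES: BootstrapInfo.init()
ES->>ES: read ServerArgs
ES->>ES: create Environment
ES->>ES: configure logging
ES->>Bootstrap: new Bootstrap(args)
Bootstrap-->>ES: bootstrap
Note over Main,Services: Phase 2: security manager setup
ES->>Bootstrap: initPhase2(bootstrap)
Bootstrap->>Bootstrap: create full Environment
Bootstrap->>Bootstrap: write PID file
Bootstrap->>Bootstrap: install exception handler
Bootstrap->>Bootstrap: initialize native libraries (JNA)
Bootstrap->>Bootstrap: check for jar hell
Bootstrap->>Bootstrap: load plugins
Bootstrap->>Bootstrap: configure security manager
Note over Main,Services: Phase 3: node construction and start
ES->>Bootstrap: initPhase3(bootstrap)
Bootstrap->>Node: new Node(environment, pluginsLoader)
Node->>Node: prepareConstruction()
Node->>Injector: create Guice Injector
Injector->>Injector: register all Modules
Injector->>Injector: create all service instances
Injector-->>Node: injector
Node->>Node: obtain core component references
Node-->>Bootstrap: node
Bootstrap->>Node: start()
Node->>Node: lifecycle.moveToStarted()
loop start every service
Node->>Services: start()
Services->>Services: initialize resources
Services-->>Node: started
end
Node->>Node: validateNodeBeforeAcceptingRequests()
Node-->>Bootstrap: started
Bootstrap->>Bootstrap: sendCliMarker(SERVER_READY)
Bootstrap-->>Main: node ready
Notes on the diagram
Phase 1: basic initialization (steps 1-9)
Goal: perform the most basic initialization
Key steps:
- Security properties: configure the Java security properties
- BootstrapInfo: collect system information
- Startup arguments: read from the CLI process
- Environment object: path information only
- Logging: must be configured last
Notes:
- Single-threaded
- Do as little as possible
- Logging configuration must come last
Phase 2: security manager setup (steps 10-17)
Goal: complete all the preparation that precedes security manager configuration
Key steps:
- Full environment: includes SecureSettings
- PID file: records the process ID of the running node
- Exception handler: global handler for uncaught exceptions
- Native libraries: JNA/JNI initialization
- Jar hell check: detects conflicting classes on the classpath
- Plugin loading: preloads all plugins
- Security manager: optional
Phase 3: node construction and start (steps 18-35)
Goal: build and start the node
Key steps:
- Create Node: construct the node object
- Guice container: build the dependency injection container
- Register services: bind all services in the container
- Start services: in dependency order
- Validate node: run the bootstrap checks
- Notify CLI: signal that the node is ready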
8.2 Service Start Order
Detailed service start sequence diagram
sequenceDiagram
autonumber
participant Node
participant Plugins as Plugin Components
participant IS as IndicesService
participant ICSS as IndicesClusterStateService
participant Snapshots as SnapshotsService
participant Search as SearchService
participant CS as ClusterService
participant TS as TransportService
participant Coord as Coordinator
participant GW as GatewayService
participant HTTP as HttpServerTransport
Node->>Node: lifecycle.moveToStarted()
Note over Node,HTTP: Step 1: start plugin components
Node->>Plugins: forEach(start)
Plugins->>Plugins: initialize plugin resources
Plugins-->>Node: started
Note over Node,HTTP: Step 2: start index services
Node->>IS: start()
IS->>IS: start index management
IS-->>Node: started
Node->>ICSS: start()
ICSS->>ICSS: start listening for cluster state
ICSS-->>Node: started
Note over Node,HTTP: Step 3: start snapshot service
Node->>Snapshots: start()
Snapshots->>Snapshots: initialize snapshot management
Snapshots-->>Node: started
Note over Node,HTTP: Step 4: start search service
Node->>Search: start()
Search->>Search: initialize search resources
Search-->>Node: started
Note over Node,HTTP: Step 5: start cluster service
Node->>CS: start()
CS->>CS: start MasterService
CS->>CS: start ClusterApplierService
CS-->>Node: started
Note over Node,HTTP: Step 6: start transport service
Node->>TS: start()
TS->>TS: bind transport address
TS->>TS: start Netty server
TS->>TS: acceptIncomingRequests()
TS-->>Node: started
Note over Node,HTTP: Step 7: start coordinator
Node->>Coord: start()
Coord->>Coord: start master election
Coord->>Coord: join the cluster
Coord-->>Node: started
Note over Node,HTTP: Step 8: start gateway service
Node->>GW: start()
GW->>GW: recover cluster state
GW-->>Node: started
Note over Node,HTTP: Step 9: start HTTP service
Node->>HTTP: start()
HTTP->>HTTP: bind HTTP address
HTTP->>HTTP: start Netty HTTP server
HTTP-->>Node: started
Node->>Node: validateNodeBeforeAcceptingRequests()
Notes on the diagram
Why this order
1. Plugin components (steps 2-4):
- Plugin lifecycle components start first
- They provide extensions that later services use
2. Index services (steps 5-10):
- IndicesService: manages all indices
- IndicesClusterStateService: listens for cluster state changes
3. Snapshot service (steps 11-13):
- SnapshotsService: snapshot management
- Depends on the index services
4. Search service (steps 14-16):
- SearchService: executes searches
- Initializes search resources (the search thread pool itself is owned by ThreadPool)
5. Cluster service (steps 17-20):
- ClusterService: cluster state management
- Starts MasterService and ClusterApplierService
6. Transport service (steps 21-25):
- TransportService: inter-node communication
- Binds the port and accepts connections
7. Coordinator (steps 26-29):
- Coordinator: master election and cluster coordination
- Depends on TransportService
8. Gateway service (steps 30-32):
- GatewayService: recovers cluster metadata
- Depends on all of the preceding services
9. HTTP service (steps 33-36):
- HttpServerTransport: REST API
- Started last, once the node is ready to accept requests
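8.3 Node Shutdown
Node shutdown sequence diagram
sequenceDiagram
autonumber
participant Node
participant HTTP as HttpServerTransport
participant Coord as Coordinator
participant CS as ClusterService
participant TS as TransportService
participant IS as IndicesService
participant TP as ThreadPool
participant NE as NodeEnvironment
Node->>Node: close()
Note over Node,NE: Step 1: stop before closing
alt node still running
Node->>Node: stop()
end
Node->>Node: lifecycle.moveToClosed()
Note over Node,NE: Step 2: close HTTP (no new requests)
Node->>HTTP: close()
HTTP->>HTTP: unbind()
HTTP->>HTTP: close all connections
HTTP-->>Node: closed
Note over Node,NE: Step 3: close coordinator
Node->>Coord: close()
Coord->>Coord: stop master election
Coord->>Coord: leave the cluster
Coord-->>Node: closed
Note over Node,NE: Step 4: close cluster service
Node->>CS: close()
CS->>CS: stop state updates
CS-->>Node: closed
Note over Node,NE: Step 5: close transport service
Node->>TS: close()
TS->>TS: close all connections
TS->>TS: stop Netty
TS-->>Node: closed
Note over Node,NE: Step 6: close index service
Node->>IS: close()
IS->>IS: close all indices
IS->>IS: flush data to disk
IS-->>Node: closed
Note over Node,NE: Step 7: shut down thread pools
Node->>TP: shutdown()
TP->>TP: stop accepting new tasks
TP->>TP: wait for running tasks
TP-->>Node: shutdown
Note over Node,NE: Step 8: close node environment
Node->>NE: close()
NE->>NE: release file locks
NE-->>Node: closed
Notes on the diagram
Shutdown order (roughly the reverse of the start order)
1. Stop and state transition (steps 1-3):
- A running node is stopped first, then moved to CLOSED
2. HTTP service (steps 4-7):
- Stop accepting new HTTP requests
- Close existing connections
3. Coordinator (steps 8-11):
- Stop master election
- Notify the other nodes that this node is leaving the cluster
4. Cluster service (steps 12-14):
- Stop processing cluster state updates
- Save the last cluster state
5. Transport service (steps 15-18):
- Close inter-node connections
- Stop the Netty server
6. Index service (steps 19-22):
- Close all indices
- Flush data to disk
- Wait for in-flight operations to complete
7. Thread pools (steps 23-26):
- Stop accepting new tasks
- Wait for running tasks to finish
- Terminate all threads
8. Node environment (steps 27-29):
- Release the directory locks
- Clean up temporary files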
8.4 Plugin Loading
Plugin loading sequence diagram
sequenceDiagram
autonumber
participant Bootstrap
participant PL as PluginsLoader
participant PS as PluginsService
participant Plugin
participant Injector as Guice Injector
Bootstrap->>PL: createPluginsLoader(env)
PL->>PL: scan plugins directory
PL->>PL: scan modules directory
loop each plugin/module
PL->>PL: read plugin-descriptor.properties
PL->>PL: create ClassLoader
PL->>PL: create PluginBundle
end
PL-->>Bootstrap: pluginsLoader
Bootstrap->>PS: new PluginsService(settings, pluginsLoader)
loop each PluginBundle
PS->>PS: load plugin class
PS->>Plugin: instantiate plugin
Plugin-->>PS: plugin instance
PS->>PS: collect extension points
end
PS-->>Bootstrap: pluginsService
Note over Bootstrap,Injector: build the dependency injection container
Bootstrap->>Injector: createInjector(...)
loop each plugin
Injector->>Plugin: createComponents(...)
Plugin-->>Injector: custom components
Injector->>Plugin: getActions()
Plugin-->>Injector: action handlers
Injector->>Plugin: getRestHandlers(...)
Plugin-->>Injector: REST handlers
end
Injector-->>Bootstrap: injector with plugins
Notes on the diagram
Plugin loading steps
1. Scan directories (steps 1-3):
- Scan the plugins/ directory (user plugins)
- Scan the modules/ directory (built-in modules)
2. Read descriptors and create ClassLoaders (steps 4-7):
- Read each plugin's plugin-descriptor.properties
- Create a separate ClassLoader for each plugin
- This isolates plugins from each other
- It also avoids class conflicts
3. Instantiate plugins (steps 8-13):
- Load the plugin's main class
- Call the plugin constructor
- Collect the plugin's extension points
4. Register extensions (steps 14-21):
- Create custom components
- Register action handlers
- Register REST handlers
- Register custom settings
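Reading a descriptor is ordinary `java.util.Properties` parsing plus validation of required keys. A minimal sketch, assuming the classic-plugin keys `name`, `description`, `version`, and `classname` are required. Real descriptors also carry keys such as `java.version` and `elasticsearch.version`, and the real validation is stricter. `SketchPluginInfo` and `DescriptorSketch` are hypothetical:

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical sketch of parsing plugin-descriptor.properties. Only four keys are
// checked here; the real descriptor has more keys and stricter validation.
record SketchPluginInfo(String name, String description, String version, String classname) {}

final class DescriptorSketch {
    static SketchPluginInfo parse(String propertiesText) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(propertiesText));
        return new SketchPluginInfo(
                required(props, "name"),
                required(props, "description"),
                required(props, "version"),
                required(props, "classname"));
    }

    // Fail loudly on a missing key so a broken plugin is rejected at startup.
    private static String required(Properties props, String key) {
        String value = props.getProperty(key);
        if (value == null || value.isBlank()) {
            throw new IllegalArgumentException("property [" + key + "] is missing");
        }
        return value.trim();
    }
}
```

The `classname` value is what the loader later instantiates through the plugin's own ClassLoader, so a missing or blank value has to stop startup at this point.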
8.5 Full Processing Path of an HTTP Index Request
End-to-end index request sequence diagram
sequenceDiagram
autonumber
participant Client as HTTP Client
participant Netty as Netty HTTP Server
participant RC as RestController
participant RH as RestIndexAction<br/>(RestHandler)
participant AM as ActionModule
participant TA as TransportIndexAction
participant Primary as Primary Shard Node
participant IS as IndicesService
participant Shard as IndexShard
participant Engine as InternalEngine
participant Lucene as Lucene
participant TL as Translog
participant Replica as Replica Shards
Note over Client,Replica: Step 1: receive and route the HTTP request
Client->>Netty: POST /my-index/_doc/1<br/>{"field":"value"}
Netty->>Netty: parse HTTP request
Netty->>RC: dispatchRequest(httpRequest)
RC->>RC: route match<br/>POST /{index}/_doc/{id}
RC->>RH: handleRequest(request, channel)
Note over Client,Replica: Step 2: REST-layer processing and parameter parsing
RH->>RH: parse path parameters<br/>(index, id)
RH->>RH: parse query parameters<br/>(refresh, timeout, routing)
RH->>RH: parse request body<br/>(JSON document)
RH->>RH: build IndexRequest
RH->>AM: client.index(IndexRequest)
Note over Client,Replica: Step 3: Action-layer processing
AM->>TA: execute(IndexRequest, ActionListener)
TA->>TA: resolve routing<br/>routing = id (default)
TA->>TA: compute shard (simplified)<br/>shardId = hash(routing) % num_primary_shards
TA->>TA: read cluster state<br/>locate the primary shard
alt primary is on the local node
TA->>Primary: local call
else primary is on a remote node
TA->>Primary: TransportService.sendRequest()
end
Note over Client,Replica: Step 4: primary shard processing
Primary->>IS: index(IndexRequest)
IS->>Shard: index(IndexRequest)
Shard->>Shard: pre-checks<br/>(shard state, version)
Shard->>Shard: assign sequence number
Shard->>Shard: assign primaryTerm
Note over Client,Replica: Step 5: Engine-layer processing
Shard->>Engine: index(Index operation)
Engine->>Engine: acquire UID lock<br/>versionMap.acquireLock(uid)
Engine->>Engine: version conflict check<br/>checkVersionConflict()
alt version conflict
Engine-->>Shard: VersionConflictEngineException
Shard-->>TA: 409 Conflict
TA-->>Client: 409 Conflict
end
Note over Client,Replica: Step 6: write to Lucene and the translog
alt new document
Engine->>Lucene: addDocument(doc)
else existing document
Engine->>Lucene: updateDocument(term, doc)
end
Lucene->>Lucene: buffer in memory<br/>(IndexWriter buffer)
Lucene-->>Engine: success
Engine->>TL: add(IndexOperation)
TL->>TL: serialize operation
TL->>TL: append to current generation
alt index.translog.durability = request
TL->>TL: fsync before the request is acknowledged
else index.translog.durability = async
TL->>TL: background fsync every index.translog.sync_interval (default 5s)
end
TL-->>Engine: Location
Engine->>Engine: update LiveVersionMap<br/>(version and translog location)
Engine->>Engine: release UID lock
Engine-->>Shard: IndexResult
Note over Client,Replica: Step 7: replication
par replicate to all replicas in parallel
Shard->>Replica: ReplicationRequest<br/>(doc, seqNo, primaryTerm)
Replica->>Replica: apply operation<br/>(same seqNo)
Replica->>Replica: write to Engine
Replica->>Replica: write to translog
Replica-->>Shard: ACK
end
Shard->>Shard: wait for responses<br/>from all in-sync replicas
Note over Client,Replica: Step 8: return the response
Shard-->>IS: IndexResult
IS-->>Primary: IndexResult
Primary-->>TA: IndexResponse
TA->>TA: build HTTP response
TA->>RC: IndexResponse
RC->>Netty: HTTP 201 Created
Netty-->>Client: {"_id":"1","_version":1,"result":"created"}
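Sequence diagram walkthrough
Step 1: Receiving and routing the HTTP request (diagram steps 1-4)
Netty HTTP server:
// Netty4HttpServerTransport (simplified)
public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
    protected void dispatchRequest(HttpRequest request, HttpChannel channel) {
        // Wrap the decoded HTTP request (method, path, headers, body) in a RestRequest
        RestRequest restRequest = new RestRequest(request, channel);
        // Hand it to the RestController for routing
        restController.dispatchRequest(restRequest, channel, threadContext);
    }
}
RestController routing:
public class RestController implements HttpServerTransport.Dispatcher {
    public void dispatchRequest(RestRequest request, RestChannel channel, ThreadContext threadContext) {
        // 1. Find the handler registered for this method + path template
        //    (the real code walks a PathTrie and binds path parameters such as {index} and {id})
        RestHandler handler = handlerMap.get(request.method(), request.path());
        if (handler == null) {
            // No route matched: the real code answers 404 (or 405 for a known path with the wrong method)
            return;
        }
        // 2. Run the handler; `client` is the node-local NodeClient held by the controller
        handler.handleRequest(request, channel, client);
    }
}
Key points:
- Netty handles the HTTP protocol
- RestController performs route matching
- Path parameters are supported (e.g. in /my-index/_doc/1, {index} = my-index and {id} = 1)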
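Route matching with path parameters can be illustrated with a small matcher that binds the `{name}` segments of a path template such as `/{index}/_doc/{id}`. This is a hypothetical sketch: `RouteMatchSketch` and `match` are invented names, and Elasticsearch's RestController uses a PathTrie that also handles route precedence and method checks.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/**
 * Hypothetical route matcher (not Elasticsearch code): compares a template and a path
 * segment by segment, binding "{name}" segments as path parameters.
 */
final class RouteMatchSketch {
    private RouteMatchSketch() {}

    /** Returns the bound parameters, or empty if the path does not fit the template. */
    static Optional<Map<String, String>> match(String template, String path) {
        String[] t = template.split("/");
        String[] p = path.split("/");
        if (t.length != p.length) {
            return Optional.empty();                      // different number of segments
        }
        Map<String, String> params = new HashMap<>();
        for (int i = 0; i < t.length; i++) {
            if (t[i].startsWith("{") && t[i].endsWith("}")) {
                // Parameter segment: bind whatever the path has here
                params.put(t[i].substring(1, t[i].length() - 1), p[i]);
            } else if (!t[i].equals(p[i])) {
                return Optional.empty();                  // literal segment mismatch
            }
        }
        return Optional.of(params);
    }
}
```

For `PUT /my-index/_doc/1`, matching against `/{index}/_doc/{id}` yields `index=my-index` and `id=1`, which is what `request.param("index")` and `request.param("id")` later read.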
Step 2: REST layer handling and parameter parsing (diagram steps 5-9)
RestIndexAction:
public class RestIndexAction extends BaseRestHandler {
    @Override
    public RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {
        // 1. Path parameters
        String index = request.param("index");
        String id = request.param("id");
        // 2. Query-string parameters
        String routing = request.param("routing");
        TimeValue timeout = request.paramAsTime("timeout", TimeValue.timeValueSeconds(60));
        // 3. Request body (the JSON document)
        BytesReference source = request.requiredContent();
        // 4. Build the IndexRequest
        IndexRequest indexRequest = new IndexRequest(index);
        indexRequest.id(id);
        indexRequest.routing(routing);
        indexRequest.source(source, request.getXContentType());
        indexRequest.timeout(timeout);
        // refresh accepts true / false / wait_for, so pass the raw string rather than a boolean
        indexRequest.setRefreshPolicy(request.param("refresh"));
        // 5. Execute asynchronously; the listener renders the IndexResponse as JSON
        return channel -> client.index(indexRequest, new RestToXContentListener<>(channel));
    }
}
Key points:
- Parses path parameters, query-string parameters and the request body
- Builds the ActionRequest (here an IndexRequest)
- Sets options such as the timeout and the refresh policy (refresh=true/false/wait_for)
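Query-string options arrive as plain strings and have to be turned into typed values. The sketch below shows one way to do that under stated assumptions: `refresh` accepts `true`, `false` and `wait_for` (a bare `?refresh` counts as `true`, an absent parameter as `false`), and `timeout` uses unit suffixes such as `500ms`, `30s` or `1m`. `RefreshPolicySketch` and `TimeParamSketch` are hypothetical names; Elasticsearch's own `WriteRequest.RefreshPolicy` and `TimeValue` cover more units and edge cases.

```java
import java.util.Locale;

/** Hypothetical mirror of the three refresh modes; not the Elasticsearch enum. */
enum RefreshPolicySketch {
    NONE("false"), IMMEDIATE("true"), WAIT_UNTIL("wait_for");

    private final String value;

    RefreshPolicySketch(String value) {
        this.value = value;
    }

    /** null = parameter absent -> NONE; bare "?refresh" (empty string) -> IMMEDIATE. */
    static RefreshPolicySketch parse(String raw) {
        if (raw == null) return NONE;
        if (raw.isEmpty()) return IMMEDIATE;
        for (RefreshPolicySketch p : values()) {
            if (p.value.equals(raw)) return p;
        }
        throw new IllegalArgumentException("Unknown value for refresh: [" + raw + "]");
    }
}

/** Hypothetical parser for a subset of time units (ms, s, m, h, d). */
final class TimeParamSketch {
    private TimeParamSketch() {}

    /** Parses "500ms", "30s", "1m", "2h", "1d" into milliseconds; null -> default. */
    static long parseMillis(String raw, long defaultMillis) {
        if (raw == null) return defaultMillis;
        String s = raw.trim().toLowerCase(Locale.ROOT);
        if (s.endsWith("ms")) {
            return Long.parseLong(s.substring(0, s.length() - 2));
        }
        long n = Long.parseLong(s.substring(0, s.length() - 1));
        switch (s.charAt(s.length() - 1)) {
            case 's': return n * 1_000L;
            case 'm': return n * 60_000L;
            case 'h': return n * 3_600_000L;
            case 'd': return n * 86_400_000L;
            default: throw new IllegalArgumentException("Unsupported time unit in [" + raw + "]");
        }
    }
}
```

Checking the `ms` suffix first matters: otherwise `500ms` would be read as a number ending in `s`.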
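Step 3: Action layer (diagram steps 10-17)
TransportIndexAction routing calculation:
// Simplified: in recent versions a single-document index request is executed through the
// bulk path (TransportBulkAction -> TransportShardBulkAction)
public class TransportIndexAction extends TransportReplicationAction<IndexRequest, IndexRequest, IndexResponse> {
    @Override
    protected void doExecute(Task task, IndexRequest request, ActionListener<IndexResponse> listener) {
        // 1. Routing key: an explicit ?routing= wins, otherwise the document _id
        String routing = request.routing() != null ? request.routing() : request.id();
        // 2. Compute the shard id. routing.hashCode() % n is wrong: hashCode() can be negative,
        //    which yields a negative shard id. The real code hashes with Murmur3 and uses floorMod
        //    over routing_num_shards, scaled down by the routing factor.
        ClusterState clusterState = clusterService.state();
        IndexMetadata indexMetadata = clusterState.metadata().index(request.index());
        int hash = Murmur3HashFunction.hash(routing);
        int shardId = Math.floorMod(hash, indexMetadata.getRoutingNumShards()) / indexMetadata.getRoutingFactor();
        // 3. Locate the primary copy of that shard
        ShardRouting primaryShard = clusterState.routingTable()
            .shardRoutingTable(request.index(), shardId)
            .primaryShard();
        String nodeId = primaryShard.currentNodeId();
        // 4. Execute locally or forward to the node holding the primary
        if (clusterService.localNode().getId().equals(nodeId)) {
            // Local: `primary` stands for the local IndexShard resolved from primaryShard.shardId()
            shardOperationOnPrimary(request, primary, listener);
        } else {
            // Remote: send the primary-phase action to the node holding the primary
            transportService.sendRequest(
                clusterState.nodes().get(nodeId),
                "indices:data/write/index[p]",
                request,
                new TransportResponseHandler<IndexResponse>() {
                    // handle the response / failure
                }
            );
        }
    }
}
Key points:
- Routing: the shard is floorMod(murmur3(_routing), routing_num_shards) / routing_factor, where routing_factor = routing_num_shards / number_of_shards. The diagram's hash(routing) % num_primary_shards is shorthand for this; either way the shard is a fixed function of the routing value and the shard count, which is why number_of_shards cannot be changed in place (only via split/shrink)
- Locating the primary shard
- Local or remote execution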
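The routing formula is easier to reason about in runnable form. The hypothetical `ShardRoutingSketch` below re-implements, from its description, the scheme Elasticsearch is understood to use: a 32-bit x86 Murmur3 hash (seed 0) over the routing string's UTF-16 code units (low byte first), then `floorMod` over `routing_num_shards`, divided by the routing factor `routing_num_shards / number_of_shards`. The value 640 routing shards for 5 primaries in the test is only an example. The last assertions show why `String.hashCode() % n` is unsafe.

```java
/**
 * Hypothetical document-to-shard routing sketch, written from the scheme described above;
 * not copied from Elasticsearch.
 */
final class ShardRoutingSketch {
    private ShardRoutingSketch() {}

    /** MurmurHash3, x86 32-bit variant. */
    static int murmur3x86_32(byte[] data, int seed) {
        final int c1 = 0xcc9e2d51, c2 = 0x1b873593;
        int h1 = seed;
        int roundedEnd = data.length & ~3;          // length rounded down to a 4-byte block
        for (int i = 0; i < roundedEnd; i += 4) {
            int k1 = (data[i] & 0xff) | (data[i + 1] & 0xff) << 8
                    | (data[i + 2] & 0xff) << 16 | data[i + 3] << 24;
            k1 *= c1;
            k1 = Integer.rotateLeft(k1, 15);
            k1 *= c2;
            h1 ^= k1;
            h1 = Integer.rotateLeft(h1, 13);
            h1 = h1 * 5 + 0xe6546b64;
        }
        int k1 = 0;
        switch (data.length & 3) {                  // remaining 1-3 tail bytes
            case 3: k1 = (data[roundedEnd + 2] & 0xff) << 16; // fall through
            case 2: k1 |= (data[roundedEnd + 1] & 0xff) << 8; // fall through
            case 1:
                k1 |= data[roundedEnd] & 0xff;
                k1 *= c1;
                k1 = Integer.rotateLeft(k1, 15);
                k1 *= c2;
                h1 ^= k1;
                break;
            default:
                break;
        }
        h1 ^= data.length;                          // finalization mix
        h1 ^= h1 >>> 16;
        h1 *= 0x85ebca6b;
        h1 ^= h1 >>> 13;
        h1 *= 0xc2b2ae35;
        h1 ^= h1 >>> 16;
        return h1;
    }

    /** Each UTF-16 char becomes two bytes, low byte first, then Murmur3 with seed 0. */
    static int hashRouting(String routing) {
        byte[] bytes = new byte[routing.length() * 2];
        for (int i = 0; i < routing.length(); i++) {
            char c = routing.charAt(i);
            bytes[2 * i] = (byte) c;
            bytes[2 * i + 1] = (byte) (c >>> 8);
        }
        return murmur3x86_32(bytes, 0);
    }

    /** floorMod keeps the result non-negative even when the hash is negative. */
    static int shardId(String routing, int numShards, int routingNumShards) {
        int routingFactor = routingNumShards / numShards; // routingNumShards is a multiple of numShards
        return Math.floorMod(hashRouting(routing), routingNumShards) / routingFactor;
    }
}
```

Because `floorMod` lands in `[0, routing_num_shards)` and the division by the routing factor maps that onto `[0, number_of_shards)`, every routing value gets a valid, stable shard.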
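Step 4: Primary shard (diagram steps 18-23)
IndexShard:
public class IndexShard extends AbstractIndexShardComponent {
    public Engine.IndexResult index(Engine.Index index) {
        // 1. Pre-checks: the shard must be open and in a state that accepts writes
        ensureWriteAllowed(index);
        verifyNotClosed();
        // 2. Assign a sequence number. Simplified: the real code generates it atomically inside
        //    InternalEngine (LocalCheckpointTracker.generateSeqNo()); computing "max seqNo + 1"
        //    from stats would race under concurrent writes
        long seqNo = localCheckpointTracker.generateSeqNo();
        long primaryTerm = this.primaryTerm;
        // 3. Stamp the operation (Engine.Index is immutable in the real code; setters are illustrative)
        index.seqNo(seqNo);
        index.primaryTerm(primaryTerm);
        // 4. Hand off to the engine
        return engine.index(index);
    }
}
Key points:
- Shard state checks
- Sequence number assignment: unique and monotonically increasing per shard (not cluster-wide), assigned on the primary
- Primary term: the term of the current primary, bumped whenever a new primary is assigned; together with the seqNo it identifies an operation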
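To make per-shard sequence numbers concrete, here is a hypothetical tracker that follows the idea behind Elasticsearch's `LocalCheckpointTracker` without being that class: numbers are handed out atomically, and the local checkpoint is the highest seqNo below which every operation has been processed. Gaps appear because concurrent writes can finish out of order. The untrimmed `BitSet` is a simplification.

```java
import java.util.BitSet;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical per-shard seqNo generator + local checkpoint; not Elasticsearch code. */
final class SeqNoTrackerSketch {
    private final AtomicLong nextSeqNo = new AtomicLong(0);
    private final BitSet processed = new BitSet();  // never trimmed in this sketch
    private long checkpoint = -1;                   // -1 = nothing processed yet

    /** Atomic, so two concurrent writers never receive the same seqNo. */
    long generateSeqNo() {
        return nextSeqNo.getAndIncrement();
    }

    /** Marks one operation done and advances the checkpoint over any contiguous prefix. */
    synchronized void markProcessed(long seqNo) {
        processed.set(Math.toIntExact(seqNo));
        while (processed.get(Math.toIntExact(checkpoint + 1))) {
            checkpoint++;
        }
    }

    /** Highest seqNo such that all seqNos <= it have been processed. */
    synchronized long checkpoint() {
        return checkpoint;
    }
}
```

If operations 0 and 2 finish before 1, the checkpoint stays at 0 until 1 completes, then jumps to 2.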
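Step 5: Engine layer (diagram steps 24-34)
InternalEngine version control:
public class InternalEngine extends Engine {
    @Override
    public IndexResult index(Index index) {
        // 1. Per-UID lock: writes to the same document are serialized
        try (Releasable ignored = versionMap.acquireLock(index.uid())) {
            // 2. Current version from the LiveVersionMap (the real engine falls back to a
            //    Lucene lookup when the document is no longer in the map)
            VersionValue versionValue = versionMap.getUnderLock(index.uid());
            long currentVersion = versionValue != null ? versionValue.version : Versions.NOT_FOUND;
            boolean currentlyDeleted = versionValue != null && versionValue.isDelete();
            // 3. Conflict check; the third argument is "is the current doc deleted", not a VersionType
            if (index.versionType().isVersionConflictForWrites(
                    currentVersion, index.version(), currentlyDeleted)) {
                return new IndexResult(
                    new VersionConflictEngineException(...)
                );
            }
            // 4. Next version (INTERNAL: current + 1; EXTERNAL / EXTERNAL_GTE: the caller's version)
            long newVersion = index.versionType().updateVersion(currentVersion, index.version());
            // 5. Write to Lucene and the Translog
            // ...
        }
    }
}
Key points:
- Per-UID lock (writes to the same document are serialized)
- Version conflict detection
- Several version types are supported (INTERNAL, EXTERNAL, EXTERNAL_GTE); for optimistic concurrency control under internal versioning, recent versions use if_seq_no / if_primary_term instead of a version number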
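The three version types differ only in their conflict rule and in how the next version is derived. The hypothetical enum below restates those rules as documented for Elasticsearch: INTERNAL increments, EXTERNAL requires a strictly greater version, EXTERNAL_GTE also accepts an equal one. The sentinel constants `NOT_FOUND` and `MATCH_ANY` are local to this sketch.

```java
/** Hypothetical restatement of the version-type write rules; not the Elasticsearch VersionType enum. */
enum VersionTypeSketch {
    INTERNAL, EXTERNAL, EXTERNAL_GTE;

    static final long NOT_FOUND = -1L;  // no current document (sketch-local sentinel)
    static final long MATCH_ANY = -3L;  // caller did not specify a version (sketch-local sentinel)

    /** Would writing with `expected` conflict with the stored `current` version? */
    boolean isConflict(long current, long expected) {
        switch (this) {
            case INTERNAL:
                // An explicit expectation must match the stored version exactly
                return expected != MATCH_ANY && current != expected;
            case EXTERNAL:
                // A new doc is always fine; otherwise the external version must be strictly greater
                return current != NOT_FOUND && (expected == MATCH_ANY || current >= expected);
            case EXTERNAL_GTE:
                // Same, but an equal version is accepted
                return current != NOT_FOUND && (expected == MATCH_ANY || current > expected);
            default:
                throw new AssertionError(this);
        }
    }

    /** Version stored after a successful write. */
    long nextVersion(long current, long expected) {
        if (this == INTERNAL) {
            return current == NOT_FOUND ? 1 : current + 1;
        }
        return expected; // external versions are stored as given
    }
}
```

External versioning lets an outside system of record (for example a database row version) decide ordering; a replayed older update is rejected instead of overwriting newer data.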
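Step 6: Writing to Lucene and the Translog (diagram steps 35-50)
Lucene write:
// docs() is a list because nested objects are indexed as extra hidden Lucene documents
if (plan.useLuceneUpdateDocument == false) {
    // New document: nothing to replace
    indexWriter.addDocuments(index.docs());
} else {
    // Update: atomically delete any document with this _id term, then add the new version
    indexWriter.updateDocuments(
        new Term(IdFieldMapper.NAME, index.uid()),
        index.docs()
    );
}
Translog write:
// Append the operation to the translog; the returned Location points at it
Translog.Location location = translog.add(new Translog.Index(index));
// fsync according to index.translog.durability
if (translog.getDurability() == Translog.Durability.REQUEST) {
    translog.sync();  // default: fsync before acknowledging (the real code syncs once per request)
} else {
    // ASYNC: fsync in the background every index.translog.sync_interval (default 5s);
    // a crash can lose acknowledged writes from that window
}
LiveVersionMap update:
versionMap.putIndexUnderLock(
    index.uid(),
    new IndexVersionValue(
        location,            // Translog location
        newVersion,          // new version
        index.seqNo(),       // sequence number
        index.primaryTerm()  // primary term
    )
);
Key points:
- The Lucene write only reaches the in-memory IndexWriter buffer; the document becomes searchable after the next refresh (index.refresh_interval, 1s by default)
- The Translog provides durability: operations not yet committed to Lucene are replayed from it after a restart
- The LiveVersionMap records the version and Translog location (used for realtime GET and version checks)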
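The REQUEST/ASYNC trade-off comes down to when `fsync` runs. The hypothetical `TranslogSketch` below appends length-prefixed operations to a file with `FileChannel` and, in REQUEST mode, calls `force(false)` before returning, so an acknowledged write survives a crash; in ASYNC mode a caller would invoke `sync()` periodically (for example every 5 s). It leaves out everything else the real Translog does, such as generations, checksums and checkpoint files.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Hypothetical append-only operation log illustrating durability modes; not the ES Translog. */
final class TranslogSketch implements AutoCloseable {
    enum Durability { REQUEST, ASYNC }

    private final FileChannel channel;
    private final Durability durability;

    TranslogSketch(Path file, Durability durability) {
        try {
            this.channel = FileChannel.open(file, StandardOpenOption.CREATE,
                    StandardOpenOption.WRITE, StandardOpenOption.APPEND);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        this.durability = durability;
    }

    /** Convenience for demos: opens a log in a fresh temporary file. */
    static TranslogSketch openTemp(Durability durability) {
        try {
            Path file = Files.createTempFile("translog-sketch", ".log");
            file.toFile().deleteOnExit();
            return new TranslogSketch(file, durability);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Appends one length-prefixed operation and returns its byte offset (its "location"). */
    synchronized long add(String operation) {
        byte[] payload = operation.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(4 + payload.length).putInt(payload.length).put(payload);
        buf.flip();
        try {
            long location = channel.size();
            while (buf.hasRemaining()) {
                channel.write(buf);
            }
            if (durability == Durability.REQUEST) {
                channel.force(false); // data reaches the disk before the caller acknowledges
            }
            return location;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** In ASYNC mode a background task would call this every sync interval. */
    synchronized void sync() {
        try {
            channel.force(false);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override
    public synchronized void close() {
        try {
            channel.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

REQUEST pays one fsync per acknowledged request; ASYNC batches many writes into one fsync, at the cost of a window of acknowledged but not yet durable operations.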
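Step 7: Replication (diagram steps 51-57)
Replication flow:
// Send the operation to every in-sync replica copy in parallel
for (ShardRouting replicaShard : replicaShards) {
    transportService.sendRequest(
        clusterState.nodes().get(replicaShard.currentNodeId()),
        "indices:data/write/index[r]",  // replica-phase action
        new ReplicaRequest(indexRequest, seqNo, primaryTerm),
        new TransportResponseHandler<ReplicaResponse>() {
            @Override
            public void handleResponse(ReplicaResponse response) {
                // Replica acknowledged
                replicaResponses.add(response);
                checkAllReplicasResponded();
            }
            // handleException (omitted): the primary asks the master to fail this copy,
            // which removes it from the in-sync set, and then counts it as responded
        }
    );
}
Applying the operation on the replica:
// The replica node receives the replication request
public void indexReplica(ReplicaRequest request) {
    // Reuse the seqNo and primaryTerm assigned by the primary
    Index operation = new Index(...);
    operation.seqNo(request.seqNo());
    operation.primaryTerm(request.primaryTerm());
    // No version-conflict check: the primary already decided; operations arriving
    // out of order are resolved by comparing seqNos
    engine.index(operation);
}
Key points:
- The operation is replicated to all in-sync copies in parallel
- Replicas reuse the primary's seqNo, so every copy agrees on the operation order
- The primary waits for every in-sync replica to respond (not a majority); copies that fail are removed from the in-sync set. wait_for_active_shards (default 1, i.e. the primary only) is only checked before the write starts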
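The replication rule (wait for every in-sync copy, evict the ones that fail) can be sketched with `CompletableFuture`. Everything here is hypothetical and simulated: replica sends are a caller-supplied function, and eviction is a local set removal where real Elasticsearch would ask the master to fail the shard copy.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;

/** Hypothetical primary-side replication fan-out; no real networking. */
final class ReplicationSketch {
    /** Outcome of one replicated write. */
    record Result(int successful, int failed) {}

    private final Set<String> inSyncCopies = ConcurrentHashMap.newKeySet();

    ReplicationSketch(Set<String> initialCopies) {
        inSyncCopies.addAll(initialCopies);
    }

    Set<String> inSyncCopies() {
        return Set.copyOf(inSyncCopies);
    }

    /** Sends (copyId, seqNo) to every in-sync copy, waits for all, and drops the ones that failed. */
    Result replicate(long seqNo, BiFunction<String, Long, CompletableFuture<Void>> sendToReplica) {
        List<String> targets = List.copyOf(inSyncCopies);
        List<CompletableFuture<Boolean>> acks = new ArrayList<>();
        for (String copy : targets) {
            // true = acknowledged, false = failed; one failure does not cancel the others
            acks.add(sendToReplica.apply(copy, seqNo)
                    .thenApply(ignored -> true)
                    .exceptionally(error -> false));
        }
        // Wait for *every* in-sync copy, not a majority
        CompletableFuture.allOf(acks.toArray(new CompletableFuture<?>[0])).join();
        int ok = 0;
        for (int i = 0; i < targets.size(); i++) {
            if (acks.get(i).join()) {
                ok++;
            } else {
                // Real ES asks the master to fail the copy; here it is just dropped locally
                inSyncCopies.remove(targets.get(i));
            }
        }
        return new Result(ok, targets.size() - ok);
    }
}
```

Evicting a failed copy instead of waiting for it keeps writes available, while guaranteeing that every copy still in the in-sync set has every acknowledged operation.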
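Step 8: Returning the response (diagram steps 58-64)
Building the response:
IndexResponse response = new IndexResponse(
    shardId,
    indexRequest.id(),
    result.getSeqNo(),
    result.getPrimaryTerm(),
    result.getVersion(),
    result.isCreated() // true = newly created, false = an existing document was overwritten
);
// JSON returned to the client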
{
"_index": "my-index",
"_id": "1",
"_version": 1,
"_seq_no": 0,
"_primary_term": 1,
"result": "created" // or "updated"
}
8.6 End-to-end processing of an HTTP search request
End-to-end sequence diagram of a search request
sequenceDiagram
autonumber
participant Client as HTTP Client
participant Netty as Netty HTTP Server
participant RC as RestController
participant RSA as RestSearchAction<br/>(RestHandler)
participant TSA as TransportSearchAction
participant Coord as Coordinating Node
participant SS as SearchService
participant Shard1 as Shard 1
participant Shard2 as Shard 2
participant Engine as InternalEngine
participant Lucene as Lucene Searcher
Note over Client,Lucene: Step 1: Receive the HTTP request
Client->>Netty: POST /my-index/_search<br/>{"query":{"match":{"field":"value"}}}
Netty->>RC: dispatchRequest()
RC->>RSA: handleRequest()
Note over Client,Lucene: Step 2: Parse the search request
RSA->>RSA: Parse query-string params<br/>(size, from, timeout)
RSA->>RSA: Parse request body<br/>(Query DSL)
RSA->>RSA: Build SearchRequest
RSA->>TSA: execute(SearchRequest)
Note over Client,Lucene: Step 3: Search coordination
TSA->>Coord: Take over the request<br/>(this node acts as coordinator)
Coord->>Coord: Read cluster state
Coord->>Coord: Resolve target shards<br/>(primary or replica copies)
Coord->>Coord: Pick one copy per shard<br/>(adaptive replica selection)
Note over Client,Lucene: Step 4: Query phase
par Query all shards in parallel
Coord->>Shard1: SearchShardRequest<br/>(query, size+from)
Shard1->>SS: executeQueryPhase()
SS->>Engine: acquireSearcher()
Engine-->>SS: IndexSearcher
SS->>Lucene: search(query, size+from)
Lucene->>Lucene: Run the query<br/>(walk the inverted index postings)
Lucene->>Lucene: Score matches (BM25 by default)
Lucene-->>SS: TopDocs(docIds, scores)
SS->>SS: Collect aggregation data
SS-->>Shard1: QueryResult
Shard1-->>Coord: QuerySearchResult<br/>(docIds, scores, aggs)
and
Coord->>Shard2: SearchShardRequest
Shard2->>SS: executeQueryPhase()
SS->>Engine: acquireSearcher()
Engine-->>SS: IndexSearcher
SS->>Lucene: search(query, size+from)
Lucene-->>SS: TopDocs
SS-->>Shard2: QueryResult
Shard2-->>Coord: QuerySearchResult
end
Note over Client,Lucene: Step 5: Coordinator merges results
Coord->>Coord: Merge all shard results<br/>(k-way merge by score)
Coord->>Coord: Keep the from..from+size window
Coord->>Coord: List the docs to fetch<br/>(shardId, docId)
Note over Client,Lucene: Step 6: Fetch phase
par Fetch documents in parallel
Coord->>Shard1: FetchRequest<br/>(docIds[])
Shard1->>SS: executeFetchPhase()
SS->>Engine: acquireSearcher()
Engine-->>SS: IndexSearcher
loop Each docId
SS->>Lucene: document(docId)
Lucene-->>SS: Document
end
SS->>SS: Apply _source filtering
SS->>SS: Apply highlighting
SS-->>Shard1: FetchResult
Shard1-->>Coord: FetchSearchResult<br/>(documents[])
and
Coord->>Shard2: FetchRequest
Shard2->>SS: executeFetchPhase()
SS->>Engine: acquireSearcher()
loop Each docId
SS->>Lucene: document(docId)
end
Shard2-->>Coord: FetchSearchResult
end
Note over Client,Lucene: Step 7: Assemble the final result
Coord->>Coord: Build the hits array<br/>(document content + scores)
Coord->>Coord: Reduce aggregations
Coord->>Coord: Compute total time (took)
Coord-->>TSA: SearchResponse
TSA-->>RSA: SearchResponse
RSA->>RC: Build HTTP response
RC->>Netty: HTTP 200 OK
Netty-->>Client: {"hits":{...},"aggregations":{...}}
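Sequence diagram walkthrough
Steps 1-2: Receiving and parsing the HTTP request (diagram steps 1-7)
RestSearchAction:
public class RestSearchAction extends BaseRestHandler {
    @Override
    public RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
        // 1. Path parameter: comma-separated index list (absent for GET /_search)
        String[] indices = Strings.splitStringByCommaToArray(request.param("index"));
        // 2. Parse the request body (Query DSL) first
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        if (request.hasContent()) {
            try (XContentParser parser = request.contentParser()) {  // the parser must be closed
                searchSourceBuilder.parseXContent(parser);
            }
        }
        // 3. Query-string parameters override the body only when actually present;
        //    applying the defaults (size=10, from=0) unconditionally would clobber the body's values
        if (request.hasParam("size")) {
            searchSourceBuilder.size(request.paramAsInt("size", 10));
        }
        if (request.hasParam("from")) {
            searchSourceBuilder.from(request.paramAsInt("from", 0));
        }
        if (request.hasParam("timeout")) {
            searchSourceBuilder.timeout(request.paramAsTime("timeout", null));
        }
        String routing = request.param("routing");
        // 4. Build the SearchRequest
        SearchRequest searchRequest = new SearchRequest(indices);
        searchRequest.source(searchSourceBuilder);
        searchRequest.routing(routing);
        // 5. Execute; the listener renders the SearchResponse as JSON
        return channel -> client.search(searchRequest, new RestToXContentListener<>(channel));
    }
}
Key points:
- Multi-index search (e.g. /index-a,index-b/_search or wildcards such as logs-*)
- Query DSL parsing
- size, from and timeout can be set on the URL (size defaults to 10, from to 0)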
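Multi-index search starts by expanding the index expression into concrete index names. The hypothetical resolver below handles comma-separated names, `*` wildcards, `-` exclusions and `_all` against a list of existing indices. It ignores aliases, data streams, date math, and the `index_not_found` error Elasticsearch raises for missing concrete names.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/** Hypothetical index-expression resolver such as "logs-*,-logs-old,users"; not Elasticsearch code. */
final class IndexExpressionSketch {
    private IndexExpressionSketch() {}

    static Set<String> resolve(String expression, Collection<String> existing) {
        Set<String> result = new LinkedHashSet<>();
        if (expression == null || expression.isEmpty() || expression.equals("_all")) {
            result.addAll(existing); // no index given (GET /_search) means all indices
            return result;
        }
        for (String part : expression.split(",")) {
            boolean exclude = part.startsWith("-");
            String glob = exclude ? part.substring(1) : part;
            // Glob -> regex: quote the literal pieces and turn each '*' into ".*"
            Pattern regex = Pattern.compile(Arrays.stream(glob.split("\\*", -1))
                    .map(Pattern::quote)
                    .collect(Collectors.joining(".*")));
            for (String name : existing) {
                if (regex.matcher(name).matches()) {
                    if (exclude) {
                        result.remove(name); // exclusions apply to what earlier parts added
                    } else {
                        result.add(name);
                    }
                }
            }
        }
        return result;
    }
}
```

Parts are applied left to right, so an exclusion only removes names that an earlier part of the expression added.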
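Step 3: Search coordination (diagram steps 8-11)
TransportSearchAction:
public class TransportSearchAction extends HandledTransportAction<SearchRequest, SearchResponse> {
    @Override
    protected void doExecute(Task task, SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
        // 1. Read the current cluster state
        ClusterState clusterState = clusterService.state();
        // 2. Resolve the shards to search (the real code first expands aliases/wildcards
        //    into concrete indices and also honours routing and preference)
        GroupShardsIterator<ShardIterator> shardIterators =
            clusterService.operationRouting()
                .searchShards(clusterState, searchRequest.indices(), searchRequest.routing());
        // 3. Pick one copy per shard (adaptive replica selection)
        Map<SearchShardTarget, ShardRouting> targetToRouting = new HashMap<>();
        for (ShardIterator shardIt : shardIterators) {
            ShardRouting shardRouting = selectShard(shardIt);
            SearchShardTarget target = new SearchShardTarget(
                shardRouting.currentNodeId(),
                shardRouting.shardId(),  // ShardId already carries the index name
                null                     // clusterAlias: null = local cluster (not the index name)
            );
            targetToRouting.put(target, shardRouting);
        }
        // 4. Run the query phase, then the fetch phase
        executeSearch(targetToRouting, searchRequest, listener);
    }
    // Adaptive replica selection (simplified; `adaptiveReplicaSelection` is an illustrative name:
    // in the real code OperationRouting already returns copies ranked via ResponseCollectorService)
    private ShardRouting selectShard(ShardIterator shardIt) {
        // Choose the best-ranked copy
        return adaptiveReplicaSelection.select(shardIt);
    }
}
Key points:
- Determines the shards to search (possibly across several indices)
- Adaptive replica selection ranks each copy by recent response time, service time and search queue size on its node (enabled by default via cluster.routing.use_adaptive_replica_selection)
- The node that received the request acts as the coordinating node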
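Adaptive replica selection is based on the C3 algorithm, which ranks servers by response time, service time and queue length. The hypothetical ranker below keeps exponentially weighted moving averages (EWMA) per node and uses a C3-style rank `R - S + qHat^3 * S` (lower is better), where `S` is the mean service time and `qHat` counts queued plus in-flight work. The smoothing factor 0.3 and the exact weighting are assumptions for illustration, not Elasticsearch's precise computation.

```java
import java.util.Comparator;
import java.util.List;

/** Hypothetical C3-style copy ranking; not Elasticsearch's ResponseCollectorService. */
final class ReplicaRankerSketch {
    static final double ALPHA = 0.3; // EWMA smoothing factor (assumed value)

    private ReplicaRankerSketch() {}

    static final class NodeStats {
        final String nodeId;
        double responseTimeMs; // EWMA of response time seen by this coordinator
        double queueSize;      // EWMA of the node's search queue length
        double serviceTimeMs;  // EWMA of execution time reported by the node

        NodeStats(String nodeId, double responseTimeMs, double queueSize, double serviceTimeMs) {
            this.nodeId = nodeId;
            this.responseTimeMs = responseTimeMs;
            this.queueSize = queueSize;
            this.serviceTimeMs = serviceTimeMs;
        }

        /** Folds one new observation into the moving averages. */
        void observe(double responseMs, double queue, double serviceMs) {
            responseTimeMs = ALPHA * responseMs + (1 - ALPHA) * responseTimeMs;
            queueSize = ALPHA * queue + (1 - ALPHA) * queueSize;
            serviceTimeMs = ALPHA * serviceMs + (1 - ALPHA) * serviceTimeMs;
        }

        /** C3-style rank; the cubic term punishes long queues much more than slow responses. */
        double rank(int outstandingRequests) {
            double qHat = 1 + outstandingRequests + queueSize;
            return responseTimeMs - serviceTimeMs + Math.pow(qHat, 3) * serviceTimeMs;
        }
    }

    /** Picks the copy with the lowest rank. */
    static String pick(List<NodeStats> copies) {
        return copies.stream()
                .min(Comparator.comparingDouble((NodeStats s) -> s.rank(0)))
                .map(s -> s.nodeId)
                .orElseThrow();
    }
}
```

With equal latency, a node with a backed-up search queue ranks far behind an idle one, so load shifts away from hot nodes before their latency visibly degrades.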
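Step 4: Query phase (diagram steps 12-30)
SearchService runs the query phase:
public class SearchService {
    public void executeQueryPhase(ShardSearchRequest request, SearchShardTask task, ActionListener<QuerySearchResult> listener) {
        // 1. Create the per-request SearchContext (it holds a point-in-time searcher)
        SearchContext context = createContext(request);
        QuerySearchResult result;
        try {
            // 2. Run the query; QueryPhase uses the context's searcher
            QueryPhase.execute(context);
            result = context.queryResult();
        } catch (Exception e) {
            // Report failures through the listener instead of letting them escape
            listener.onFailure(e);
            return;
        } finally {
            // Release the SearchContext; the real code keeps the underlying reader open
            // until the fetch phase so that docIds stay valid
            cleanContext(context);
        }
        // 3. Hand back the (docId, score) result
        listener.onResponse(result);
    }
}
QueryPhase:
public class QueryPhase {
    public static void execute(SearchContext searchContext) {
        // 1. The already-parsed Lucene Query
        Query query = searchContext.query();
        // 2. Collect the top from+size hits on this shard
        //    (Lucene rejects n = 0; size=0 requests use a count-only collector in practice)
        IndexSearcher searcher = searchContext.searcher().searcher();
        TopDocs topDocs = searcher.search(
            query,
            searchContext.from() + searchContext.size() // every shard must return from+size hits
        );
        // 3. Store the result (docIds and scores only, no _source yet)
        searchContext.queryResult().topDocs(
            new TopDocsAndMaxScore(topDocs, topDocs.scoreDocs.length > 0 ? topDocs.scoreDocs[0].score : Float.NaN),
            new DocValueFormat[0]
        );
        // 4. Aggregations (simplified: the real code runs aggregation collectors in the same
        //    Lucene pass as the top-docs collector rather than afterwards)
        if (searchContext.aggregations() != null) {
            searchContext.aggregations().execute(searchContext);
        }
    }
}
Key points:
- The query phase returns only docIds and scores (plus partial aggregation results)
- Each shard returns from+size hits
- Aggregations are computed in the same pass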
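Each shard only needs its best `from + size` hits, so it does not have to sort every match. The hypothetical sketch below keeps a bounded min-heap whose head is the worst hit currently retained, which is the idea behind Lucene's `TopScoreDocCollector`; ties are broken by the lower docId. Heap size, and therefore memory, grows with `from + size`.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical per-shard top-k collector; not Lucene code. */
final class ShardTopKSketch {
    record Hit(int doc, float score) {}

    /** Best first: higher score wins; on ties the lower docId wins. */
    static final Comparator<Hit> BEST_FIRST =
            Comparator.comparingDouble((Hit h) -> h.score()).reversed().thenComparingInt(Hit::doc);

    private ShardTopKSketch() {}

    /** Returns the k best hits, best first, scanning every doc once. */
    static List<Hit> topK(float[] scoresByDoc, int k) {
        // Reversed order: the heap head is the *worst* hit kept so far
        PriorityQueue<Hit> heap = new PriorityQueue<>(BEST_FIRST.reversed());
        for (int doc = 0; doc < scoresByDoc.length; doc++) {
            Hit hit = new Hit(doc, scoresByDoc[doc]);
            if (heap.size() < k) {
                heap.add(hit);
            } else if (k > 0 && BEST_FIRST.compare(hit, heap.peek()) < 0) {
                heap.poll();   // evict the current worst
                heap.add(hit);
            }
        }
        List<Hit> result = new ArrayList<>(heap);
        result.sort(BEST_FIRST);
        return result;
    }
}
```

Each candidate costs O(log k), so a shard scanning n matches does O(n log k) work instead of sorting all n.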
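Step 5: Merging results on the coordinating node (diagram steps 31-33)
Merge sort:
public class SearchPhaseController {
    // Simplified: the real code performs a k-way merge with Lucene's TopDocs.merge
    // instead of collecting everything and sorting it
    public ScoreDoc[] sortDocs(boolean ignoreFrom, Collection<? extends SearchPhaseResult> results, int from, int topN) {
        // 1. Gather every shard's TopDocs, remembering which shard each hit came from
        List<ScoreDoc> allDocs = new ArrayList<>();
        for (SearchPhaseResult result : results) {
            TopDocsAndMaxScore topDocs = result.queryResult().topDocs();
            for (ScoreDoc doc : topDocs.topDocs.scoreDocs) {
                doc.shardIndex = result.getShardIndex();
                allDocs.add(doc);
            }
        }
        // 2. Global sort
        allDocs.sort((a, b) -> {
            // Score descending
            int cmp = Float.compare(b.score, a.score);
            if (cmp != 0) {
                return cmp;
            }
            // Ties: docIds are only unique within a shard, so compare the shard first
            cmp = Integer.compare(a.shardIndex, b.shardIndex);
            return cmp != 0 ? cmp : Integer.compare(a.doc, b.doc);
        });
        // 3. Slice [from, topN), where topN = from + size; guard against from beyond the hits
        int start = ignoreFrom ? 0 : from;
        int end = Math.min(topN, allDocs.size());
        if (start >= end) {
            return new ScoreDoc[0];
        }
        return allDocs.subList(start, end).toArray(new ScoreDoc[0]);
    }
}
Key points:
- Merges the results of N shards
- Sorts globally and keeps the [from, from+size) window
- Deep pagination is expensive: every shard returns from+size hits and the coordinator handles up to shards × (from+size) entries; index.max_result_window (default 10,000) caps from+size, and search_after is the usual alternative for deep paging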
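Since each shard's list is already sorted, the coordinator can merge the lists with a heap holding one cursor per shard instead of sorting everything, as Lucene's `TopDocs.merge` does. This hypothetical `ShardMergeSketch` pops only `from + size` entries and breaks ties by shard index and then docId, because docIds are only unique within one shard.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical k-way merge of per-shard hit lists; not the Lucene implementation. */
final class ShardMergeSketch {
    record ShardHit(int shardIndex, int doc, float score) {}

    private ShardMergeSketch() {}

    /** Best first: score desc, then shard index, then docId. */
    static int compare(ShardHit a, ShardHit b) {
        int cmp = Float.compare(b.score(), a.score());
        if (cmp != 0) return cmp;
        cmp = Integer.compare(a.shardIndex(), b.shardIndex());
        return cmp != 0 ? cmp : Integer.compare(a.doc(), b.doc());
    }

    /** perShard.get(s) must be sorted best-first; returns the hits ranked [from, from + size). */
    static List<ShardHit> merge(List<List<ShardHit>> perShard, int from, int size) {
        // Heap entries are cursors {shard, position within that shard's list}
        PriorityQueue<int[]> heap = new PriorityQueue<>(
                (x, y) -> compare(perShard.get(x[0]).get(x[1]), perShard.get(y[0]).get(y[1])));
        for (int s = 0; s < perShard.size(); s++) {
            if (!perShard.get(s).isEmpty()) heap.add(new int[] {s, 0});
        }
        List<ShardHit> page = new ArrayList<>();
        for (int rank = 0; rank < from + size && !heap.isEmpty(); rank++) {
            int[] cursor = heap.poll();
            List<ShardHit> list = perShard.get(cursor[0]);
            if (rank >= from) page.add(list.get(cursor[1]));        // inside the requested window
            if (cursor[1] + 1 < list.size()) heap.add(new int[] {cursor[0], cursor[1] + 1});
        }
        return page;
    }
}
```

The hits ranked before `from` are still popped and discarded, which is why a large `from` costs work even though those hits are never returned.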
Step 6: Fetch phase (diagram steps 34-48)
FetchPhase:
public class FetchPhase {
    public void execute(SearchContext context) {
        // 1. The hits this shard must load (only those that made the global top from+size)
        ScoreDoc[] hits = context.docIdsToLoad();
        // 2. Load each document's stored fields
        IndexSearcher searcher = context.searcher().searcher();
        StoredFields storedFields = searcher.storedFields();
        SearchHit[] searchHits = new SearchHit[hits.length];
        for (int i = 0; i < hits.length; i++) {
            int docId = hits[i].doc;
            // Read the stored fields of this document
            Document doc = storedFields.document(docId);
            // Build the SearchHit
            SearchHit searchHit = new SearchHit(docId);
            searchHit.score(hits[i].score);
            // _source is stored as raw bytes; Lucene returns a BytesRef, so wrap it
            BytesReference source = new BytesArray(doc.getBinaryValue("_source"));
            // Apply _source filtering (includes/excludes) before attaching it
            if (context.fetchSourceContext() != null) {
                source = applySourceFiltering(source, context.fetchSourceContext());
            }
            searchHit.sourceRef(source);
            // Highlighting
            if (context.highlight() != null) {
                Map<String, HighlightField> highlightFields =
                    highlighter.highlight(context, docId);
                searchHit.highlightFields(highlightFields);
            }
            searchHits[i] = searchHit;
        }
        // 3. Store the result
        context.fetchResult().hits(new SearchHits(searchHits, context.queryResult().getTotalHits(), context.queryResult().getMaxScore()));
    }
}
Key points:
- Full documents are loaded only for the top hits
- _source filtering reduces the data sent over the network
- Highlighting is supported
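`_source` filtering works on the parsed document: it keeps included paths and drops excluded ones. The hypothetical filter below operates on nested `Map`s and supports exact dotted paths only (`user.name`). Including an object keeps everything inside it, and excludes win over includes. Elasticsearch's real filter also supports wildcard patterns.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical _source include/exclude filter over nested Maps; not Elasticsearch code. */
final class SourceFilterSketch {
    private SourceFilterSketch() {}

    /** Empty includes = keep everything (minus excludes). */
    static Map<String, Object> filter(Map<String, Object> source, List<String> includes, List<String> excludes) {
        return filter(source, "", includes, excludes);
    }

    @SuppressWarnings("unchecked")
    private static Map<String, Object> filter(Map<String, Object> node, String prefix,
                                              List<String> includes, List<String> excludes) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : node.entrySet()) {
            String path = prefix.isEmpty() ? e.getKey() : prefix + "." + e.getKey();
            if (matchesOrUnder(excludes, path)) continue;            // excluded subtree
            boolean kept = includes.isEmpty() || matchesOrUnder(includes, path);
            if (e.getValue() instanceof Map<?, ?> child) {
                // Descend if this object is kept, or if some include points inside it
                if (kept || isAncestorOf(includes, path)) {
                    Map<String, Object> sub = filter((Map<String, Object>) child, path,
                            kept ? List.of() : includes, excludes);
                    if (kept || !sub.isEmpty()) out.put(e.getKey(), sub);
                }
            } else if (kept) {
                out.put(e.getKey(), e.getValue());
            }
        }
        return out;
    }

    /** True if path equals a pattern or lies inside one ("user" covers "user.name"). */
    private static boolean matchesOrUnder(List<String> patterns, String path) {
        for (String p : patterns) {
            if (path.equals(p) || path.startsWith(p + ".")) return true;
        }
        return false;
    }

    /** True if some pattern lies inside path, so the filter must descend to find it. */
    private static boolean isAncestorOf(List<String> patterns, String path) {
        for (String p : patterns) {
            if (p.startsWith(path + ".")) return true;
        }
        return false;
    }
}
```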
Step 7: Assembling the final result (diagram steps 49-56)
Building the SearchResponse:
SearchHit[] hits = mergeFetchResults(fetchResults);
Aggregations aggregations = reduceAggs(queryResults);
SearchResponse response = new SearchResponse(
new InternalSearchResponse(
new SearchHits(
hits,
totalHits,
maxScore
),
aggregations,
suggest,
timedOut,
terminatedEarly
),
scrollId,
totalShards,
successfulShards,
skippedShards,
tookInMillis,
shardFailures
);
// JSON response
{
"took": 5,
"timed_out": false,
"_shards": {
"total": 2,
"successful": 2,
"skipped": 0,
"failed": 0
},
"hits": {
"total": {
"value": 100,
"relation": "eq"
},
"max_score": 1.5,
"hits": [
{
"_index": "my-index",
"_id": "1",
"_score": 1.5,
"_source": {
"field": "value"
}
}
]
},
"aggregations": {
...
}
}
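Reducing aggregations means combining each shard's partial results on the coordinator. For a terms-style aggregation, the hypothetical sketch below sums per-term doc counts across shards and keeps the global top `size` terms. Because every shard only reports its own top buckets, a term cut off on one shard is undercounted; this is why Elasticsearch's terms aggregation is approximate and exposes `shard_size` and `doc_count_error_upper_bound`.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical coordinator-side reduce for terms buckets; not Elasticsearch's InternalTerms. */
final class TermsReduceSketch {
    private TermsReduceSketch() {}

    /** Sums counts per term over all shards and returns the top `size`, highest count first. */
    static Map<String, Long> reduce(List<Map<String, Long>> shardBuckets, int size) {
        Map<String, Long> totals = new HashMap<>();
        for (Map<String, Long> buckets : shardBuckets) {
            buckets.forEach((term, count) -> totals.merge(term, count, Long::sum));
        }
        Map<String, Long> top = new LinkedHashMap<>(); // keeps the sorted order
        totals.entrySet().stream()
                .sorted(Map.Entry.<String, Long>comparingByValue().reversed()
                        .thenComparing(Map.Entry.<String, Long>comparingByKey())) // ties: alphabetical
                .limit(size)
                .forEach(e -> top.put(e.getKey(), e.getValue()));
        return top;
    }
}
```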
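9. Key configuration
9.1 Node settings
| Setting | Default | Description |
|---|---|---|
| node.name | Machine hostname | Node name |
| node.roles | All roles (when unset) | Node roles, e.g. master, data, ingest |
| path.data | data/ under $ES_HOME | Data directory |
| path.logs | logs/ under $ES_HOME | Log directory |
| cluster.name | elasticsearch | Cluster name |
| network.host | _local_ (loopback only) | Bind and publish address |
| http.port | 9200-9300 | HTTP port range (first free port is used) |
| transport.port | 9300-9400 | Transport-layer port range |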
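9.2 JVM settings
| Option | Recommended value | Description |
|---|---|---|
| -Xms | At most 50% of physical RAM, and below the compressed-oops threshold (just under 32 GB); equal to -Xmx | Initial heap size |
| -Xmx | Same as -Xms | Maximum heap size |
| -XX:+UseG1GC | - | Use the G1 garbage collector |
| -Xlog:gc* | - | GC logging (JDK 9+ unified logging) |
Since 7.11, Elasticsearch sizes the heap automatically from the node's roles and available memory; set -Xms/-Xmx only to override that.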
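The heap sizing rule of thumb is simple arithmetic. The hypothetical helper below applies it, assuming a 31 GiB cap as a conservative stand-in for the compressed-oops threshold. The real threshold varies by JVM and OS, so check `UseCompressedOops` with `java -Xmx<size> -XX:+PrintFlagsFinal -version` on the target machine.

```java
/** Hypothetical heap-size helper; the 31 GiB cap is an assumption, not an Elasticsearch constant. */
final class HeapSizingSketch {
    static final long GIB = 1024L * 1024 * 1024;
    static final long COMPRESSED_OOPS_CAP = 31 * GIB; // assumed safe upper bound

    private HeapSizingSketch() {}

    /** Recommended value for both -Xms and -Xmx, in bytes: half of RAM, capped. */
    static long recommendedHeapBytes(long physicalRamBytes) {
        return Math.min(physicalRamBytes / 2, COMPRESSED_OOPS_CAP);
    }

    /** Formats bytes as a JVM flag value: whole GiB as "Ng", otherwise whole MiB as "Nm". */
    static String toJvmFlag(long bytes) {
        return bytes % GIB == 0 ? (bytes / GIB) + "g" : (bytes / (1024 * 1024)) + "m";
    }
}
```

For a 16 GiB machine this gives `-Xms8g -Xmx8g`; on a 128 GiB machine the cap applies and the rest of the RAM is left to the OS page cache, which Lucene relies on heavily.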
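10. Monitoring and observability
10.1 Node status API
GET /_nodes/_local returns static node information (name, version, roles); runtime figures such as heap usage and open file descriptors come from GET /_nodes/_local/stats. The abridged sample below merges fields from both responses.
GET /_nodes/_local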
{
"nodes": {
"node_id": {
"name": "node-1",
"version": "8.10.0",
"roles": ["master", "data", "ingest"],
"jvm": {
"version": "21.0.0",
"mem": {
"heap_used_in_bytes": 1073741824,
"heap_max_in_bytes": 2147483648
}
},
"process": {
"open_file_descriptors": 512,
"max_file_descriptors": 65536
}
}
}
}
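10.2 Key monitoring metrics
JVM metrics:
- Heap usage
- GC frequency and duration
- Thread count
System metrics:
- CPU usage
- File descriptors (open vs. maximum)
- Disk I/O
Elasticsearch metrics:
- Indexing throughput
- Search latency
- Rejected requests (thread-pool rejections)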
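The JVM metrics listed above are exposed by the JDK's standard management beans, independent of Elasticsearch. This hypothetical probe reads heap usage, cumulative GC count and time, and the live thread count from `java.lang.management`. With the sample values from the node API output (1073741824 of 2147483648 bytes), `heapUsedPercent` returns 50.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical JVM metrics probe using only the JDK management API. */
final class JvmMetricsSketch {
    private JvmMetricsSketch() {}

    static Map<String, Long> snapshot() {
        Map<String, Long> m = new LinkedHashMap<>();
        MemoryUsage heap = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
        m.put("heap_used_bytes", heap.getUsed());
        m.put("heap_max_bytes", heap.getMax()); // -1 if the max is undefined
        long gcCount = 0;
        long gcMillis = 0;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            // Each collector (e.g. young and old generation) reports separately; -1 = unavailable
            gcCount += Math.max(0, gc.getCollectionCount());
            gcMillis += Math.max(0, gc.getCollectionTime());
        }
        m.put("gc_count", gcCount);
        m.put("gc_time_ms", gcMillis);
        m.put("thread_count", (long) ManagementFactory.getThreadMXBean().getThreadCount());
        return m;
    }

    /** Heap usage in whole percent, the figure usually alerted on; -1 if the max is undefined. */
    static long heapUsedPercent(long used, long max) {
        return max <= 0 ? -1 : used * 100 / max;
    }
}
```

GC count and time are cumulative since JVM start, so frequency and duration come from the difference between two snapshots taken a known interval apart.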