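Elasticsearch-01-Server Core

This document gives an in-depth analysis of the Server core module. It covers the node startup flow, lifecycle management, service management, the plugin system, core data structures, detailed API specifications, and sequence diagrams of the key flows.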


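1. Module Responsibilities

The Server core module is the foundation of Elasticsearch. It manages the full lifecycle of a node: startup, runtime and shutdown.

1.1 Core Responsibilities

  1. Node bootstrap

    • Three-phase startup
    • Environment initialization
    • Security manager configuration
    • Plugin loading
  2. Lifecycle management

    • Start the node (start)
    • Stop the node (stop)
    • Close the node (close)
    • Graceful shutdown
  3. Service management

    • Dependency injection (Guice)
    • Service registration and lookup
    • Control of service start order
    • Service health checks
  4. Plugin system

    • Plugin loading
    • Plugin lifecycle management
    • Plugin extension points
    • Plugin isolation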

1.2 Dependencies

Upstream dependencies:

  • JVM environment
  • File system
  • Network stack

Downstream dependents:

  • All Elasticsearch modules
  • Plugins

2. Module Architecture

2.1 Overall Service Architecture

flowchart TB
    subgraph "Client Layer"
        HTTPClient[HTTP/REST client]
        TCPClient[Transport client]
    end

    subgraph "REST Layer"
        RestController[RestController<br/>router]
        RestHandler[RestHandler<br/>request handler]
        RestRequest[RestRequest<br/>request wrapper]
    end

    subgraph "Action Layer"
        ActionModule[ActionModule<br/>action registry]
        TransportAction[TransportAction<br/>transport action]
        ActionRequest[ActionRequest<br/>request object]
    end

    subgraph "Node Core"
        Node[Node<br/>node instance]
        Injector[Guice Injector<br/>dependency injection]
        Lifecycle[Lifecycle<br/>lifecycle management]
        NodeClient[NodeClient<br/>node-local client]
    end

    subgraph "Business Layer"
        IS[IndicesService<br/>index service]
        SS[SearchService<br/>search service]
        CS[ClusterService<br/>cluster service]
        ICSS[IndicesClusterStateService<br/>applies cluster state to indices]
    end

    subgraph "Transport Layer"
        TS[TransportService<br/>transport service]
        TCP[TcpTransport<br/>TCP transport]
        Netty[Netty<br/>network framework]
    end

    subgraph "Storage Layer"
        Shard[IndexShard<br/>index shard]
        Engine[InternalEngine<br/>storage engine]
        Lucene[Lucene<br/>search library]
        Translog[Translog<br/>transaction log]
    end

    subgraph "Infrastructure"
        TP[ThreadPool<br/>thread pools]
        Monitor[MonitorService<br/>monitoring]
        Gateway[GatewayService<br/>gateway persistence]
        Plugins[PluginsService<br/>plugin service]
    end
    end

    HTTPClient --> RestController
    TCPClient --> TS

    RestController --> RestHandler
    RestHandler --> RestRequest
    RestRequest --> ActionModule

    ActionModule --> TransportAction
    TransportAction --> ActionRequest
    ActionRequest --> NodeClient

    NodeClient --> Injector
    Injector --> IS
    Injector --> SS
    Injector --> CS

    IS --> Shard
    SS --> Shard
    Shard --> Engine
    Engine --> Lucene
    Engine --> Translog

    TransportAction --> TS
    TS --> TCP
    TCP --> Netty

    IS --> TP
    SS --> TP
    CS --> Gateway
    Node --> Plugins

    style HTTPClient fill:#e1f5ff
    style RestController fill:#fff4e1
    style ActionModule fill:#f5e1ff
    style Node fill:#ffe1f5
    style IS fill:#e1ffe1
    style TS fill:#f5f5e1
    style Engine fill:#ffe1e1
    style TP fill:#e1e1f5

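2.2 Layer Descriptions

Client Layer

HTTP/REST client:

  • Calls the Elasticsearch REST API over HTTP

Transport client:

  • Talked to nodes directly over Elasticsearch's internal transport protocol
  • Deprecated in 7.0 and removed in 8.0

REST Layer

RestController:

  • Routes HTTP requests to the matching RestHandler
  • Manages all REST API routes
  • Pre-processes requests and post-processes responses

RestHandler:

  • Handles a specific REST request
  • Parses HTTP parameters
  • Builds an ActionRequest and forwards it to the action layer

RestRequest:

  • Wraps the HTTP request
  • Provides access to request parameters
  • Parses the request body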

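Action Layer

ActionModule:

  • Registers every action together with its TransportAction
  • Maintains the mapping between actions and handlers
  • Manages action lifecycle

TransportAction:

  • Executes the actual business logic
  • Coordinates several services to complete an operation
  • Handles asynchronous callbacks

ActionRequest:

  • Encapsulates an operation request
  • Defines the request parameters
  • Supports serialization and deserialization

Node Core

Node:

  • Wraps all services and components
  • Manages the node lifecycle (start/stop/close)
  • Coordinates the order in which services start

Guice Injector:

  • Dependency injection container
  • Holds all service instances
  • Resolves dependencies between services

Lifecycle:

  • Tracks the lifecycle state
  • Controls state transitions (INITIALIZED → STARTED → STOPPED → CLOSED)
  • Rejects illegal transitions

NodeClient:

  • Client used inside the node
  • Executes actions locally
  • Calls TransportAction directly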

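Business Layer

IndicesService:

  • Manages all indices on the node
  • Creates, deletes, opens and closes indices
  • Manages IndexShard instances

SearchService:

  • Executes search requests on the node's shards
  • Manages search contexts
  • Runs the shard-level query phase and fetch phase

ClusterService:

  • Manages the cluster state
  • Processes cluster state updates
  • Publishes cluster state changes

IndicesClusterStateService:

  • Applies incoming cluster state changes
  • Creates and removes shards according to the routing table
  • Keeps index metadata in sync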

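Transport Layer

TransportService:

  • Node-to-node communication service
  • Manages requests and responses
  • Handles connection management

TcpTransport:

  • TCP connection management
  • Message serialization and deserialization
  • Network I/O

Netty:

  • Underlying network framework
  • Asynchronous I/O
  • High-performance communication

Storage Layer

IndexShard:

  • A single shard instance
  • Handles reads and writes for that shard
  • Coordinates Engine operations

InternalEngine:

  • Storage engine implementation
  • Wraps Lucene operations
  • Manages the Translog

Lucene:

  • Apache Lucene search library
  • Inverted index
  • Full-text search

Translog:

  • Transaction log
  • Makes acknowledged writes durable
  • Supports recovery after a failure

Infrastructure

ThreadPool:

  • Thread pool management
  • Different task types run on different pools
  • Provides scheduling

MonitorService:

  • System monitoring
  • JVM monitoring
  • Disk monitoring

GatewayService:

  • Cluster metadata persistence
  • Recovers the cluster state
  • Manages index metadata

PluginsService:

  • Plugin loading
  • Plugin lifecycle management
  • Extension point management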

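2.3 Interactions Between Modules

Synchronous call chain

flowchart LR
    Client[Client] --> REST[REST Layer]
    REST --> Action[Action Layer]
    Action --> Business[Business Layer]
    Business --> Storage[Storage Layer]
    Storage --> Lucene[Lucene]

Note: the synchronous call chain of a REST API request, in which each layer calls the next one.

Asynchronous messaging

flowchart LR
    Cluster[ClusterService] -->|publish state| Transport[TransportService]
    Transport -->|send to| Nodes[All nodes]
    Nodes -->|apply state| ICSS[IndicesClusterStateService]

Note: cluster state changes are sent asynchronously to all nodes through the transport layer.

Event notification

flowchart TB
    ClusterService[ClusterService] -->|apply new state, first| Appliers[ClusterStateApplier]
    Appliers --> ICSS[IndicesClusterStateService]
    ClusterService -->|state change event, then| Listeners[ClusterStateListener]
    Listeners --> SearchService[SearchService]
    Listeners --> Plugins[Plugin Listeners]

Note: when a new cluster state is applied, every registered ClusterStateApplier is called first. IndicesClusterStateService is one of these appliers. Every registered ClusterStateListener is notified afterwards.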
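
The listener half of this mechanism is a plain observer pattern. The sketch below is minimal and uses hypothetical types: StateSketch, ChangedEventSketch and StateListenerSketch stand in for ClusterState, ClusterChangedEvent and ClusterStateListener. It leaves out applier ordering and error handling. It also ignores threading: the real ClusterApplierService applies states on a single dedicated thread.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-ins for ClusterState, ClusterChangedEvent and ClusterStateListener.
record StateSketch(long version) {}

record ChangedEventSketch(StateSketch previous, StateSketch current) {
    boolean versionChanged() {
        return previous.version() != current.version();
    }
}

interface StateListenerSketch {
    void clusterChanged(ChangedEventSketch event);
}

final class ClusterServiceSketch {
    // Copy-on-write: a listener may register another listener while being notified.
    private final List<StateListenerSketch> listeners = new CopyOnWriteArrayList<>();
    private StateSketch state = new StateSketch(0);

    void addListener(StateListenerSketch listener) {
        listeners.add(listener);
    }

    // Swap in the new state first, then notify listeners in registration order.
    void applyState(StateSketch newState) {
        ChangedEventSketch event = new ChangedEventSketch(state, newState);
        state = newState;
        for (StateListenerSketch listener : listeners) {
            listener.clusterChanged(event);
        }
    }
}
```

Listeners receive both the previous and the current state, so each one can decide cheaply whether the part it cares about changed.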


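2.4 Data Flow and Control Flow

Write path data flow

Client → REST API → IndexAction → TransportIndexAction
  → IndexShard → InternalEngine → Lucene IndexWriter
  → Translog → Disk

Key points:

  • The REST layer handles the HTTP protocol
  • The action layer handles request routing and authorization
  • Routing picks the target shard from the document's _routing value (the _id by default)
  • The engine handles versioning and durability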
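
The documented routing rule is:

  shard_num = (hash(_routing) % num_routing_shards) / routing_factor
  routing_factor = num_routing_shards / num_primary_shards

Below is a minimal Java sketch of that rule. It assumes String.hashCode() as a stand-in for the Murmur3 hash that Elasticsearch actually uses, so the shard numbers will not match a real cluster. It also ignores index.routing_partition_size.

```java
// Sketch of document-to-shard routing. Assumption: String.hashCode() replaces Murmur3.
final class RoutingSketch {
    static int shardId(String routing, int numPrimaryShards, int numRoutingShards) {
        if (numRoutingShards % numPrimaryShards != 0) {
            throw new IllegalArgumentException("routing shards must be a multiple of primary shards");
        }
        int routingFactor = numRoutingShards / numPrimaryShards;
        int hash = routing.hashCode();                         // stand-in for murmur3(_routing)
        // floorMod keeps the bucket non-negative; dividing maps it onto [0, numPrimaryShards).
        return Math.floorMod(hash, numRoutingShards) / routingFactor;
    }
}
```

Routing through num_routing_shards, rather than taking the hash modulo the primary count directly, is what lets an index be split later. Every document in one routing bucket moves to the same new shard.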

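Search path data flow

Client → REST API → SearchAction → TransportSearchAction
  → SearchService → IndexShard → InternalEngine
  → Lucene IndexSearcher → results

Key points:

  • Two-phase search: query phase + fetch phase
  • The coordinating node merges the results of all shards
  • Multiple shards are searched in parallel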
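
The coordinating node's merge step in query-then-fetch can be sketched as follows. The sketch assumes a hypothetical ShardHit record. Each shard's query phase returns only its local top k hits as (shard, doc, score) triples. The coordinator keeps the global top k, and only those documents are requested in the fetch phase. Tie-breaking, field sorting and aggregations are ignored.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical result of one shard's query phase: no document bodies yet.
record ShardHit(int shard, int doc, float score) {}

final class QueryThenFetchSketch {
    static List<ShardHit> mergeTopK(List<List<ShardHit>> perShard, int k) {
        // Min-heap on score: the root is the weakest hit kept so far.
        PriorityQueue<ShardHit> heap = new PriorityQueue<>(Comparator.comparingDouble(ShardHit::score));
        for (List<ShardHit> hits : perShard) {
            for (ShardHit hit : hits) {
                heap.offer(hit);
                if (heap.size() > k) {
                    heap.poll();              // drop the current weakest hit
                }
            }
        }
        List<ShardHit> top = new ArrayList<>(heap);
        top.sort(Comparator.comparingDouble(ShardHit::score).reversed());
        return top;                           // fetch phase: load only these (shard, doc) pairs
    }
}
```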

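Cluster state control flow

Master Node → ClusterService → PublishClusterState
  → TransportService → All Nodes → ApplyClusterState
  → IndicesClusterStateService → create/remove shards

Key points:

  • The elected master computes the new state
  • Two-phase publication (publish, then commit) keeps nodes consistent
  • Each node applies the committed state asynchronously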
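
In the Coordinator-based cluster layer (7.0 and later), the master first sends the new state to all nodes. Once a majority of the master-eligible nodes in the voting configuration have acknowledged it, the master sends a commit telling nodes to apply it. The sketch below covers only that commit rule and uses plain strings as node IDs. The real implementation also handles reconfiguration: while the last committed and last accepted voting configurations differ, it needs a quorum in both.

```java
import java.util.Set;

final class PublicationSketch {
    // True once a strict majority of the voting configuration has acknowledged the publication.
    static boolean hasQuorum(Set<String> votingConfig, Set<String> acked) {
        long votes = acked.stream().filter(votingConfig::contains).count(); // acks from non-voters don't count
        return votes * 2 > votingConfig.size();
    }
}
```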

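3. Three-Phase Startup

3.1 Startup Flowchart

flowchart TB
    Start[main method] --> Phase1[Phase 1:<br/>basic initialization]

    Phase1 --> P1_1[Initialize security properties]
    P1_1 --> P1_2[Read startup arguments]
    P1_2 --> P1_3[Initialize environment]
    P1_3 --> P1_4[Configure logging]

    P1_4 --> Phase2[Phase 2:<br/>security manager setup]

    Phase2 --> P2_1[Create Environment]
    P2_1 --> P2_2[Write PID file]
    P2_2 --> P2_3[Initialize native libraries<br/>JNA/JNI]
    P2_3 --> P2_4[Check for Jar Hell]
    P2_4 --> P2_5[Load plugins]
    P2_5 --> P2_6[Configure security manager]

    P2_6 --> Phase3[Phase 3:<br/>node construction and start]

    Phase3 --> P3_1[Create Node object]
    P3_1 --> P3_2[Build the DI container]
    P3_2 --> P3_3[Register all services]
    P3_3 --> P3_4[Start all services]
    P3_4 --> P3_5[Run bootstrap checks]
    P3_5 --> P3_6[Signal readiness to the CLI]

    P3_6 --> Running[Node running]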

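3.2 The Three Phases in Detail

Phase 1: basic initialization

Goal: perform the minimum initialization needed to prepare the environment for the later phases

Key steps:

private static Bootstrap initPhase1() {
    // 1. Initialize security properties
    initSecurityProperties();

    // 2. Initialize BootstrapInfo
    BootstrapInfo.init();

    // 3. Read the startup arguments sent by the CLI process on stdin (simplified)
    ServerArgs args = new ServerArgs(System.in);

    // 4. Create an Environment (path information only)
    Environment nodeEnv = new Environment(args.nodeSettings(), args.configDir());

    // 5. Configure logging (must be the last step of this phase)
    LogConfigurator.configure(nodeEnv, args.quiet() == false);

    return new Bootstrap(args);
}

Notes:

  • Do as little as possible
  • Logging must be configured last
  • This phase runs on a single thread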

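Phase 2: security manager setup

Goal: complete all preparation that must happen before the security manager is installed

Key steps:

private static void initPhase2(Bootstrap bootstrap) {
    // Arguments and secure settings read in phase 1 (simplified)
    ServerArgs args = bootstrap.args();
    SecureSettings secrets = args.secrets();

    // 1. Create the full Environment (including SecureSettings)
    Environment nodeEnv = createEnvironment(args.configDir(), args.nodeSettings(), secrets);

    // 2. Write the PID file
    initPidFile(args.pidFile());

    // 3. Install the global uncaught-exception handler
    Thread.setDefaultUncaughtExceptionHandler(new ElasticsearchUncaughtExceptionHandler());

    // 4. Initialize native libraries (JNA/JNI)
    initializeNatives(
        nodeEnv.tmpDir(),
        MEMORY_LOCK_SETTING.get(args.nodeSettings()),
        true,  // system call filter
        CTRLHANDLER_SETTING.get(args.nodeSettings())
    );

    // 5. Check for Jar Hell (conflicting classes on the classpath)
    JarHell.checkJarHell();

    // 6. Load plugins
    PluginsLoader pluginsLoader = PluginsLoader.createPluginsLoader(nodeEnv);

    // 7. Install the security manager (optional). enableSecurityManager is a placeholder flag;
    //    simplified, since Elasticsearch installs its own SecurityManager implementation
    if (enableSecurityManager) {
        System.setSecurityManager(new SecurityManager());
    }
}

Notes:

  • Everything here must happen before the security manager is installed
  • All native libraries are loaded in this phase
  • Plugins are pre-loaded here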
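
The core idea behind the Jar Hell check is that no class may be defined by more than one classpath entry. A minimal sketch follows. Its input is hypothetical: a map from each classpath entry to the class names it contains. The real JarHell.checkJarHell() walks the jars and directories on the actual classpath and is considerably more thorough.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class JarHellSketch {
    // Fails on the first class found in two different classpath entries.
    static void check(Map<String, List<String>> classesByEntry) {
        Map<String, String> owner = new HashMap<>();   // class name -> entry that defined it first
        for (Map.Entry<String, List<String>> entry : classesByEntry.entrySet()) {
            for (String className : entry.getValue()) {
                String previous = owner.putIfAbsent(className, entry.getKey());
                if (previous != null) {
                    throw new IllegalStateException(
                        "duplicate class " + className + " in " + previous + " and " + entry.getKey());
                }
            }
        }
    }
}
```

Failing fast at startup is the point: with duplicates, which copy the JVM loads depends on classpath order, and the resulting bugs would surface much later.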

Phase 3: node construction and start

Goal: construct the Node object and start all services

Key steps:

private static void initPhase3(Bootstrap bootstrap) {
    // 1. Create the Node object
    Node node = new Node(bootstrap.environment(), bootstrap.pluginsLoader());

    // 2. Start the node
    node.start();

    // 3. Tell the CLI process that the server is ready
    bootstrap.sendCliMarker(BootstrapInfo.SERVER_READY_MARKER);
}

Node construction (simplified):

public Node(Environment environment, PluginsLoader pluginsLoader) {
    // 1. Create the dependency injection container (Guice)
    Injector injector = createInjector(environment, pluginsLoader);

    // 2. Register all core services
    registerServices(injector);
}

Node start (simplified):

public Node start() {
    // 1. Update the lifecycle state
    lifecycle.moveToStarted();

    // 2. Start plugin lifecycle components
    pluginLifecycleComponents.forEach(LifecycleComponent::start);

    // 3. Start core services (in dependency order)
    injector.getInstance(IndicesService.class).start();
    injector.getInstance(IndicesClusterStateService.class).start();
    injector.getInstance(SnapshotsService.class).start();
    injector.getInstance(SearchService.class).start();
    injector.getInstance(ClusterService.class).start();
    injector.getInstance(TransportService.class).start();
    injector.getInstance(Coordinator.class).start();
    injector.getInstance(GatewayService.class).start();

    // 4. Start the HTTP server
    injector.getInstance(HttpServerTransport.class).start();

    // 5. Validate the node before accepting requests
    validateNodeBeforeAcceptingRequests();

    return this;
}

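4. Lifecycle Management

4.1 Lifecycle State Machine

stateDiagram-v2
    [*] --> INITIALIZED: construct Node
    INITIALIZED --> STARTED: start()
    STARTED --> STOPPED: stop(), or close() on a started node
    STOPPED --> STARTED: start()
    STOPPED --> CLOSED: close()
    INITIALIZED --> CLOSED: close()
    CLOSED --> [*]

4.2 Lifecycle States

Lifecycle.State has exactly four values. There are no transient STARTING, STOPPING or CLOSING states, and a STARTED component must be stopped before it can be closed.

| State | Meaning | Entered via |
|---|---|---|
| INITIALIZED | Node constructed, services registered, nothing started | constructor |
| STARTED | All services started; the node can accept requests | start() |
| STOPPED | All services stopped | stop() |
| CLOSED | All resources released; the node cannot be started again | close() |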

5. Service Start Order

5.1 Start Order Diagram

flowchart LR
    TP[ThreadPool] --> IS[IndicesService]
    IS --> ICSS[IndicesClusterStateService]
    ICSS --> SS[SnapshotsService]
    SS --> Search[SearchService]
    Search --> CS[ClusterService]
    CS --> TS[TransportService]
    TS --> Coord[Coordinator]
    Coord --> GW[GatewayService]
    GW --> HTTP[HttpServerTransport]

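5.2 Rationale for the Order

Why this order:

  1. ThreadPool: created during node construction, before any service starts, because every service depends on it
  2. IndicesService: manages indices, which other services need to access
  3. ClusterService: manages the cluster state
  4. TransportService: node-to-node communication
  5. Coordinator: cluster coordination (depends on TransportService)
  6. HttpServerTransport: started last, once the node is ready to accept external requests

This order is simplified. The exact sequence in Node.start() differs in places (see section 7.1).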

6. Core Data Structures

6.1 Node Core Classes

Class diagram

classDiagram
    class Node {
        -Lifecycle lifecycle
        -Injector injector
        -Environment environment
        -NodeEnvironment nodeEnvironment
        -PluginsService pluginsService
        -NodeClient client
        -Collection~LifecycleComponent~ pluginLifecycleComponents
        +Node(Environment, PluginsLoader)
        +start() Node
        +close()
    }

    class Lifecycle {
        -State state
        +moveToStarted() boolean
        +moveToStopped() boolean
        +moveToClosed() boolean
        +state() State
    }

    class State {
        <<enumeration>>
        INITIALIZED
        STOPPED
        STARTED
        CLOSED
    }

    class Environment {
        -Settings settings
        -Path configDir
        -Path[] dataFiles
        -Path logsDir
        -Path tmpDir
        +settings() Settings
        +configDir() Path
        +dataFiles() Path[]
    }

    Node --> Lifecycle
    Node --> Environment
    Lifecycle --> State

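Class notes

Node:

  • Wraps all services and components
  • Manages the node lifecycle
  • Exposes a single start/stop/close entry point

Lifecycle:

  • Tracks a component's lifecycle state
  • Provides the state transition methods
  • Rejects illegal transitions

Environment:

  • Encapsulates the node's environment configuration
  • Manages path information
  • Provides access to settings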

6.2 The LifecycleComponent Interface

Class diagram

classDiagram
    class LifecycleComponent {
        <<interface>>
        +start()
        +stop()
        +close()
        +lifecycleState() State
    }

    class AbstractLifecycleComponent {
        <<abstract>>
        #Lifecycle lifecycle
        +start()
        +stop()
        +close()
        #doStart()
        #doStop()
        #doClose()
    }

    class TransportService {
        +start()
        +stop()
        +close()
    }

    class ClusterService {
        +start()
        +stop()
        +close()
    }

    class IndicesService {
        +start()
        +stop()
        +close()
    }

    LifecycleComponent <|.. AbstractLifecycleComponent
    AbstractLifecycleComponent <|-- TransportService
    AbstractLifecycleComponent <|-- ClusterService
    AbstractLifecycleComponent <|-- IndicesService

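Class notes

LifecycleComponent:

  • Defines the interface of a lifecycle-managed component
  • Every service that needs lifecycle management implements it

AbstractLifecycleComponent:

  • Provides the lifecycle template methods
  • Manages state transitions
  • Subclasses implement doStart/doStop/doClose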
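
The template-method idea behind AbstractLifecycleComponent can be sketched as below. LifecycleComponentSketch is a hypothetical, simplified stand-in with the following behaviour:

  • It reuses the four Lifecycle.State values
  • start() is idempotent
  • close() stops a started component first
  • It omits the lifecycle listeners that the real class notifies

```java
abstract class LifecycleComponentSketch {
    enum State { INITIALIZED, STOPPED, STARTED, CLOSED }

    private State state = State.INITIALIZED;

    State lifecycleState() {
        return state;
    }

    // Public methods own the state machine; subclasses only supply doStart/doStop/doClose.
    public final synchronized void start() {
        if (state == State.CLOSED) {
            throw new IllegalStateException("cannot start a closed component");
        }
        if (state == State.STARTED) {
            return;                       // already started: no-op
        }
        doStart();
        state = State.STARTED;
    }

    public final synchronized void stop() {
        if (state != State.STARTED) {
            return;                       // only a started component can be stopped
        }
        state = State.STOPPED;
        doStop();
    }

    public final synchronized void close() {
        if (state == State.CLOSED) {
            return;
        }
        if (state == State.STARTED) {
            stop();                       // closing a running component stops it first
        }
        state = State.CLOSED;
        doClose();
    }

    protected abstract void doStart();
    protected abstract void doStop();
    protected abstract void doClose();
}
```

Making start/stop/close final keeps every service's transitions consistent. A subclass cannot, for example, forget to stop before releasing resources.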

6.3 Injector & Module

Class diagram

classDiagram
    class Injector {
        <<Guice>>
        +getInstance(Class~T~) T
        +getInstance(Key~T~) T
    }

    class NodeModule {
        <<abstract>>
        +configure()
        #bindService(Class, Class)
    }

    class ClusterModule {
        +configure()
    }

    class TransportModule {
        +configure()
    }

    class IndicesModule {
        +configure()
    }

    Injector ..> NodeModule
    NodeModule <|-- ClusterModule
    NodeModule <|-- TransportModule
    NodeModule <|-- IndicesModule

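Class notes

Injector (Guice):

  • Dependency injection container based on Google Guice (Elasticsearch ships its own trimmed-down fork)
  • Holds all service instances
  • Resolves dependencies

NodeModule:

  • Defines service bindings
  • Configures dependencies
  • Each module defines its own Module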

6.4 PluginsService

Class diagram

classDiagram
    class PluginsService {
        -List~PluginInfo~ pluginsInfo
        -List~Plugin~ plugins
        -Map~Class, List~ pluginMap
        +PluginsService(Settings, Path, Path, Path)
        +getPluginInfos() List~PluginInfo~
        +filterPlugins(Class~T~) List~T~
    }

    class Plugin {
        <<abstract>>
        +createComponents(...) Collection~Object~
        +getSettings() List~Setting~
        +close()
    }

    class ActionPlugin {
        <<interface>>
        +getActions() List~ActionHandler~
        +getRestHandlers(...) List~RestHandler~
    }

    class PluginInfo {
        -String name
        -String description
        -String version
        -String classname
        +getName() String
        +getVersion() String
    }

    class PluginBundle {
        -PluginDescriptor pluginDescriptor
        -ClassLoader classLoader
        -Set~URL~ urls
    }

    PluginsService --> Plugin
    PluginsService --> ActionPlugin
    PluginsService --> PluginInfo
    PluginsService --> PluginBundle

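Class notes

PluginsService:

  • Manages all plugins
  • Loads plugin classes
  • Provides plugin lookup (for example filterPlugins)

Plugin:

  • Base class of every plugin
  • Defines the core extension points (components, settings)
  • Further extension points come from interfaces such as ActionPlugin

PluginInfo:

  • Plugin metadata
  • Read from plugin-descriptor.properties
  • Includes the name, version, description and more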
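
Reading the descriptor is ordinary java.util.Properties parsing. The sketch below is minimal and uses a hypothetical PluginInfoSketch record. It reads the four PluginInfo fields from the class diagram. A real descriptor also carries java.version and elasticsearch.version, and Elasticsearch validates it far more strictly.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

record PluginInfoSketch(String name, String description, String version, String classname) {

    static PluginInfoSketch parse(String descriptor) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(descriptor));   // standard key=value format
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new PluginInfoSketch(
            require(props, "name"),
            require(props, "description"),
            require(props, "version"),
            require(props, "classname"));
    }

    // Reject missing or blank values instead of silently accepting them.
    private static String require(Properties props, String key) {
        String value = props.getProperty(key);
        if (value == null || value.isBlank()) {
            throw new IllegalArgumentException("property [" + key + "] is missing");
        }
        return value.trim();
    }
}
```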

6.5 NodeEnvironment

Class diagram

classDiagram
    class NodeEnvironment {
        -Settings settings
        -Path[] nodePaths
        -Lock[] locks
        -NodeId nodeId
        +NodeEnvironment(Settings, Environment)
        +nodeDataPaths() Path[]
        +nodeId() NodeId
        +close()
    }

    class NodeId {
        -String id
        +NodeId(String)
        +getId() String
    }

    class NodePath {
        -Path path
        -long totalSpace
        -long freeSpace
        +getPath() Path
        +getTotalSpace() long
    }

    NodeEnvironment --> NodeId
    NodeEnvironment --> NodePath

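Class notes

NodeEnvironment:

  • Manages the node's data directories
  • Locks them so that no two nodes use the same directory
  • Generates or reads the NodeId

NodeId:

  • Unique node identifier
  • Persisted to disk
  • Stays the same across restarts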
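
Reserving a data directory for a single node can be sketched with an exclusive OS-level lock on a lock file inside it. This minimal sketch assumes plain java.nio file locks and a file named node.lock. Elasticsearch itself takes its node.lock through Lucene's native lock factory rather than this code.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class DataDirLockSketch {
    /** Returns the acquired lock, or null when the directory is already locked. */
    static FileLock tryLock(Path dataDir) {
        try {
            Files.createDirectories(dataDir);
            FileChannel channel = FileChannel.open(dataDir.resolve("node.lock"),
                StandardOpenOption.CREATE, StandardOpenOption.WRITE);
            try {
                FileLock lock = channel.tryLock();       // null: held by another process
                if (lock == null) {
                    channel.close();
                }
                return lock;
            } catch (OverlappingFileLockException e) {   // already held inside this JVM
                channel.close();
                return null;
            } catch (IOException e) {
                channel.close();
                throw e;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Closing the channel releases the lock, as NodeEnvironment.close() does for its locks. */
    static void release(FileLock lock) {
        try {
            lock.channel().close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The lock is held by the operating system for the lifetime of the process. If the node crashes, the lock disappears with it, so no stale lock file has to be cleaned up.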

6.6 ThreadPool

Class diagram

classDiagram
    class ThreadPool {
        -Map~String, ExecutorHolder~ executors
        -ScheduledThreadPoolExecutor scheduler
        +ThreadPool(Settings)
        +generic() ExecutorService
        +executor(String) ExecutorService
        +schedule(Runnable, TimeValue, String) ScheduledFuture
        +shutdown()
    }

    class ExecutorHolder {
        -ExecutorService executor
        -ThreadPoolInfo info
        +executor() ExecutorService
        +info() ThreadPoolInfo
    }

    class ThreadPoolInfo {
        -String name
        -String type
        -int size
        -int queueSize
        +getName() String
        +getType() String
    }

    ThreadPool --> ExecutorHolder
    ExecutorHolder --> ThreadPoolInfo

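Class notes

ThreadPool:

  • Manages all thread pools
  • Each task type runs on its own pool
  • Provides scheduling

Thread pool types (defaults; "processors" means allocated processors):

| Name | Type | Size | Queue | Used for |
|---|---|---|---|---|
| generic | scaling | max 4 × processors, bounded to [128, 512] | - | Generic tasks |
| search | fixed | int((processors × 3) / 2) + 1 | 1000 | Search operations |
| get | fixed | same as search in recent versions; processors in older ones | 1000 | GET operations |
| write | fixed | processors | 10000 | Write operations |
| management | scaling | max 5 | - | Cluster management |
| refresh | scaling | max min(10, processors / 2) | - | Refresh operations |
| flush | scaling | max min(5, processors / 2) | - | Flush operations |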
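
The fixed and capped defaults in the table scale with the processor count. The sketch below computes a few of them. It assumes that "processors / 2" is rounded up, so a single-processor node still gets one thread. Defaults vary between versions, and every pool can be overridden in elasticsearch.yml, for example with thread_pool.search.size and thread_pool.search.queue_size.

```java
final class ThreadPoolSizesSketch {
    static int search(int processors) {
        return (processors * 3) / 2 + 1;
    }

    static int write(int processors) {
        return processors;
    }

    static int refreshMax(int processors) {
        return Math.min(10, half(processors));
    }

    static int flushMax(int processors) {
        return Math.min(5, half(processors));
    }

    // Half the processors, rounded up (assumption), so the result is never zero.
    private static int half(int processors) {
        return (processors + 1) / 2;
    }
}
```

On an 8-processor node this gives 13 search threads, 8 write threads, and at most 4 refresh and 4 flush threads. On 32 processors the caps take over: refresh stops at 10 and flush at 5.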

6.7 Settings

Class diagram

classDiagram
    class Settings {
        -Map~String, String~ settings
        +get(String) String
        +getAsInt(String, int) int
        +getAsBoolean(String, boolean) boolean
        +builder() Builder
    }

    class Builder {
        -Map~String, String~ map
        +put(String, String) Builder
        +put(String, int) Builder
        +build() Settings
    }

    class Setting {
        <<abstract>>
        -String key
        -Function parser
        -T defaultValue
        +get(Settings) T
        +exists(Settings) boolean
    }

    Settings --> Builder
    Settings ..> Setting

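Class notes

Settings:

  • Immutable configuration object
  • Holds all configuration entries
  • Provides type-safe accessors

Setting:

  • Definition of one configuration entry
  • Carries a default value and validation logic
  • Type-safe

Example (illustrative only: the real http.port setting accepts a port range such as 9200-9300 rather than a single integer):

public static final Setting<Integer> HTTP_PORT =
    Setting.intSetting("http.port", 9200, 1024, 65535, Property.NodeScope);

// Usage
int port = HTTP_PORT.get(settings);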
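
A typed Setting owns the key, the default, parsing and validation in one place. A minimal sketch of that idea follows, using a hypothetical IntSettingSketch over a plain string map. It is far simpler than org.elasticsearch.common.settings.Setting, with no scopes, no dynamic updates and no fallback settings.

```java
import java.util.Map;

record IntSettingSketch(String key, int defaultValue, int min, int max) {

    // Missing key -> default; present key -> parsed and range-checked.
    int get(Map<String, String> settings) {
        String raw = settings.get(key);
        int value = raw == null ? defaultValue : Integer.parseInt(raw.trim());
        if (value < min || value > max) {
            throw new IllegalArgumentException(
                "setting [" + key + "] must be in [" + min + ", " + max + "], got [" + value + "]");
        }
        return value;
    }
}
```

Because the definition is a constant, every caller reads the same key with the same default and bounds. Invalid values are rejected when they are read, not deep inside whichever service happens to use them.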

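7. Core APIs

7.1 Node API

Node construction

Method signature:

public Node(Environment environment, PluginsLoader pluginsLoader)

Parameters:

| Parameter | Type | Description |
|---|---|---|
| environment | Environment | Environment configuration |
| pluginsLoader | PluginsLoader | Plugin loader |

Core logic: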

public Node(Environment environment, PluginsLoader pluginsLoader) {
    // 1. Prepare the construction context
    NodeConstruction construction = NodeConstruction.prepareConstruction(
        environment,
        pluginsLoader,
        new NodeServiceProvider(),
        true
    );

    // 2. Take the dependency injection container built during construction
    this.injector = construction.injector();

    // 3. Look up the core components
    this.environment = injector.getInstance(Environment.class);
    this.nodeEnvironment = injector.getInstance(NodeEnvironment.class);
    this.pluginsService = injector.getInstance(PluginsService.class);
    this.client = injector.getInstance(NodeClient.class);

    // 4. Collect plugin lifecycle components
    this.pluginLifecycleComponents = construction.pluginLifecycleComponents();
}

start - start the node

Method signature:

public Node start() throws NodeValidationException

Core logic:

public Node start() throws NodeValidationException {
    // 1. State check: a no-op if the node is already started
    if (lifecycle.moveToStarted() == false) {
        return this;
    }

    // 2. Start plugin components
    pluginLifecycleComponents.forEach(LifecycleComponent::start);

    // 3. Start core services (in dependency order)
    injector.getInstance(IndicesService.class).start();
    injector.getInstance(IndicesClusterStateService.class).start();
    injector.getInstance(SnapshotsService.class).start();
    injector.getInstance(SearchService.class).start();
    injector.getInstance(FsHealthService.class).start();
    injector.getInstance(MonitorService.class).start();

    // 4. Wire the cluster service to node connections
    ClusterService clusterService = injector.getInstance(ClusterService.class);
    NodeConnectionsService nodeConnectionsService = injector.getInstance(NodeConnectionsService.class);
    nodeConnectionsService.start();
    clusterService.setNodeConnectionsService(nodeConnectionsService);

    // 5. Start the gateway service
    injector.getInstance(GatewayService.class).start();

    // 6. Register the coordinator as the cluster state publisher
    Coordinator coordinator = injector.getInstance(Coordinator.class);
    clusterService.getMasterService().setClusterStatePublisher(coordinator);

    // 7. Start the transport service (binds its address; requests are not accepted yet)
    TransportService transportService = injector.getInstance(TransportService.class);
    transportService.start();

    // 8. Run the bootstrap checks before accepting any request
    validateNodeBeforeAcceptingRequests();

    // 9. Start coordination and the cluster service, accept transport requests, join the cluster
    coordinator.start();
    clusterService.start();
    transportService.acceptIncomingRequests();
    coordinator.startInitialJoin();

    // 10. Start the HTTP server last
    injector.getInstance(HttpServerTransport.class).start();

    return this;
}

stop - stop the node

Method signature (private: it is invoked from close()):

private void stop()

Core logic:

private void stop() {
    // 1. State check
    if (lifecycle.moveToStopped() == false) {
        return;
    }

    // 2. Stop the HTTP server first (no new requests)
    stopIfStarted(HttpServerTransport.class);

    // 3. Stop snapshot and repository services
    stopIfStarted(SnapshotsService.class);
    stopIfStarted(SnapshotShardsService.class);
    stopIfStarted(RepositoriesService.class);

    // 4. Stop the indices cluster state service
    stopIfStarted(IndicesClusterStateService.class);

    // 5. Stop the coordinator (stop reacting to cluster state updates)
    stopIfStarted(Coordinator.class);

    // 6. Stop the cluster services
    stopIfStarted(ClusterService.class);
    stopIfStarted(NodeConnectionsService.class);

    // 7. Stop the remaining services
    stopIfStarted(FsHealthService.class);
    stopIfStarted(MonitorService.class);
    stopIfStarted(GatewayService.class);
    stopIfStarted(SearchService.class);
    stopIfStarted(TransportService.class);

    // 8. Stop plugin components
    pluginLifecycleComponents.forEach(Node::stopIfStarted);

    // 9. Stop the indices service last (waits for resources to be released)
    stopIfStarted(IndicesService.class);
}

close - close the node

Method signature:

@Override
public synchronized void close() throws IOException

Core logic:

@Override
public synchronized void close() throws IOException {
    synchronized (lifecycle) {
        // 1. If the node is still running, stop it first
        if (lifecycle.started()) {
            stop();
        }

        // 2. State check
        if (lifecycle.moveToClosed() == false) {
            return;
        }
    }

    // 3. Close all components (reverse of the start order)
    List<Closeable> toClose = new ArrayList<>();

    toClose.add(injector.getInstance(HttpServerTransport.class));
    toClose.add(injector.getInstance(SnapshotsService.class));
    toClose.add(injector.getInstance(IndicesClusterStateService.class));
    toClose.add(injector.getInstance(IndicesService.class));
    toClose.add(injector.getInstance(IndicesStore.class));
    toClose.add(injector.getInstance(ClusterService.class));
    toClose.add(injector.getInstance(NodeConnectionsService.class));
    toClose.add(injector.getInstance(Coordinator.class));
    toClose.add(injector.getInstance(MonitorService.class));
    toClose.add(injector.getInstance(GatewayService.class));
    toClose.add(injector.getInstance(SearchService.class));
    toClose.add(injector.getInstance(TransportService.class));

    // 4. Close plugins
    pluginLifecycleComponents.forEach(toClose::add);
    pluginsService.filterPlugins(Plugin.class).forEach(toClose::add);

    // 5. Shut down the thread pool
    toClose.add(() -> injector.getInstance(ThreadPool.class).shutdown());

    // 6. Close the node environment (releases the data directory locks)
    toClose.add(injector.getInstance(NodeEnvironment.class));

    // 7. Close everything; IOUtils.close keeps going when one close() fails
    IOUtils.close(toClose);
}
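
The last step depends on "close everything, fail at the end" semantics: one failing close() must not leak the resources after it in the list. A minimal sketch of that behaviour follows, using a hypothetical CloseAllSketch rather than the actual IOUtils source. It skips nulls, keeps closing after failures, and then rethrows the first failure with the later ones attached as suppressed exceptions.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.List;

final class CloseAllSketch {
    static void closeAll(List<? extends Closeable> resources) throws IOException {
        Exception first = null;
        for (Closeable resource : resources) {
            if (resource == null) {
                continue;                 // tolerate components that were never created
            }
            try {
                resource.close();
            } catch (IOException | RuntimeException e) {
                if (first == null) {
                    first = e;            // remember the first failure...
                } else {
                    first.addSuppressed(e); // ...and attach the rest to it
                }
            }
        }
        if (first instanceof IOException io) {
            throw io;
        }
        if (first instanceof RuntimeException re) {
            throw re;
        }
    }
}
```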

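7.2 Lifecycle API

LifecycleComponent interface

public interface LifecycleComponent extends Closeable {
    void start();
    void stop();
    void close() throws IOException;
    Lifecycle.State lifecycleState();
}

Lifecycle state management (simplified)

public class Lifecycle {
    public enum State { INITIALIZED, STOPPED, STARTED, CLOSED }

    private volatile State state = State.INITIALIZED;

    public synchronized boolean moveToStarted() {
        State localState = state;
        if (localState == State.INITIALIZED || localState == State.STOPPED) {
            state = State.STARTED;
            return true;
        }
        if (localState == State.CLOSED) {
            throw new IllegalStateException("cannot start a closed component");
        }
        return false; // already started
    }

    public synchronized boolean moveToStopped() {
        State localState = state;
        if (localState == State.STARTED) {
            state = State.STOPPED;
            return true;
        }
        if (localState == State.CLOSED) {
            throw new IllegalStateException("cannot stop a closed component");
        }
        return false; // never started, or already stopped
    }

    public synchronized boolean moveToClosed() {
        State localState = state;
        if (localState == State.CLOSED) {
            return false;
        }
        if (localState == State.STARTED) {
            // a started component must be stopped before it can be closed
            throw new IllegalStateException("cannot close a started component, stop it first");
        }
        state = State.CLOSED;
        return true;
    }
}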

7.3 Environment API

Environment construction

public Environment(Settings settings, Path configPath)

Key methods

public class Environment {
    // Configuration directory
    public Path configDir();

    // Data directories
    public Path[] dataFiles();

    // Logs directory
    public Path logsDir();

    // Temporary directory
    public Path tmpDir();

    // Node settings
    public Settings settings();
}

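7.4 NodeClient API

NodeClient is the client used inside a node to execute actions locally (simplified below).

public class NodeClient extends AbstractClient {
    // Execute an action
    @Override
    public <Request extends ActionRequest, Response extends ActionResponse>
    void doExecute(ActionType<Response> action, Request request, ActionListener<Response> listener) {
        // Look up the TransportAction registered for this action type
        @SuppressWarnings("unchecked")
        TransportAction<Request, Response> transportAction = (TransportAction<Request, Response>) actions.get(action);
        if (transportAction == null) {
            throw new IllegalStateException("failed to find action [" + action + "] to execute");
        }

        // Execute locally (simplified: the real client registers a Task with the TaskManager first)
        transportAction.execute(null, request, listener);
    }
}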
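
The lookup in NodeClient boils down to a registry keyed by action type. The sketch below is minimal and rests on a few assumptions:

  • ActionTypeSketch and HandlerSketch are hypothetical stand-ins for ActionType and TransportAction
  • A plain Consumer replaces ActionListener
  • Tasks, action filters and thread dispatch are omitted

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical stand-ins: the type parameter only ties an action to its response type.
record ActionTypeSketch<Resp>(String name) {}

interface HandlerSketch<Req, Resp> {
    void execute(Req request, Consumer<Resp> onResponse);
}

final class NodeClientSketch {
    private final Map<ActionTypeSketch<?>, HandlerSketch<?, ?>> actions = new HashMap<>();

    // Registration happens once at startup (in Elasticsearch, via ActionModule).
    <Req, Resp> void register(ActionTypeSketch<Resp> type, HandlerSketch<Req, Resp> handler) {
        if (actions.putIfAbsent(type, handler) != null) {
            throw new IllegalArgumentException("action [" + type.name() + "] already registered");
        }
    }

    // Each call is a map lookup plus a local hand-off; no network hop.
    @SuppressWarnings("unchecked")
    <Req, Resp> void execute(ActionTypeSketch<Resp> type, Req request, Consumer<Resp> onResponse) {
        HandlerSketch<Req, Resp> handler = (HandlerSketch<Req, Resp>) actions.get(type);
        if (handler == null) {
            throw new IllegalStateException("no handler for action [" + type.name() + "]");
        }
        handler.execute(request, onResponse);
    }
}
```

The unchecked cast is safe only because register() is the single way to populate the map, and it always pairs a type with a handler of the matching response type.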

7.5 PluginsService API

Plugin loading (simplified)

public class PluginsService {
    public PluginsService(
        Settings settings,
        Path configPath,
        Path modulesPath,
        Path pluginsPath
    ) {
        // 1. Load modules (built-in plugins)
        List<PluginBundle> moduleBundles = loadBundles(modulesPath);

        // 2. Load plugins installed by the user
        List<PluginBundle> pluginBundles = loadBundles(pluginsPath);

        // 3. Instantiate every plugin, modules first
        List<PluginBundle> allBundles = new ArrayList<>(moduleBundles);
        allBundles.addAll(pluginBundles);
        this.plugins = new ArrayList<>();
        for (PluginBundle bundle : allBundles) {
            Plugin plugin = loadPlugin(bundle);
            plugins.add(plugin);
        }
    }
}

Plugin extension points

public abstract class Plugin implements Closeable {
    // Create custom components
    public Collection<?> createComponents(...) {
        return Collections.emptyList();
    }

    // Custom settings
    public List<Setting<?>> getSettings() {
        return Collections.emptyList();
    }
}

// Action and REST extension points live in the separate ActionPlugin interface
public interface ActionPlugin {
    // Register actions
    default List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
        return Collections.emptyList();
    }

    // Register REST handlers
    default List<RestHandler> getRestHandlers(...) {
        return Collections.emptyList();
    }
}

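7.6 Bootstrap Checks

Bootstrap check

public interface BootstrapCheck {
    BootstrapCheckResult check(BootstrapContext context);
}

Common checks

| Check | What it verifies |
|---|---|
| HeapSizeCheck | Initial heap size equals maximum heap size |
| FileDescriptorCheck | File descriptor limit is high enough |
| MaxNumberOfThreadsCheck | Maximum number of threads the process may create |
| MaxMapCountCheck | vm.max_map_count is high enough |
| ClientJvmCheck | The JVM is a server VM, not a client VM |
| OnErrorCheck | -XX:OnError is not combined with the system call filter |
| OnOutOfMemoryErrorCheck | -XX:OnOutOfMemoryError is not combined with the system call filter |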
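
Bootstrap checks are enforced only in production mode. Roughly, that is when the node binds its transport layer to a non-loopback address; in development mode, failures are only logged as warnings. The sketch below shows that pattern. It assumes a hypothetical NodeFactsSketch record in place of BootstrapContext, plus simplified versions of three checks from the table. The thresholds 65535 and 262144 are the documented Linux minimums.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

record NodeFactsSketch(long initialHeapBytes, long maxHeapBytes, long maxFileDescriptors, long maxMapCount) {}

// A check returns null when it passes, otherwise a failure message.
record CheckSketch(String name, Function<NodeFactsSketch, String> failure) {}

final class BootstrapChecksSketch {
    static final List<CheckSketch> CHECKS = List.of(
        new CheckSketch("heap size", f -> f.initialHeapBytes() == f.maxHeapBytes()
            ? null : "initial heap size not equal to maximum heap size"),
        new CheckSketch("file descriptors", f -> f.maxFileDescriptors() >= 65535
            ? null : "max file descriptors too low, increase to at least 65535"),
        new CheckSketch("max map count", f -> f.maxMapCount() >= 262144
            ? null : "vm.max_map_count too low, increase to at least 262144"));

    /** Runs every check; throws if any fail while enforcing, otherwise returns the failures as warnings. */
    static List<String> run(NodeFactsSketch facts, boolean enforce) {
        List<String> failures = new ArrayList<>();
        for (CheckSketch check : CHECKS) {
            String failure = check.failure().apply(facts);
            if (failure != null) {
                failures.add(check.name() + ": " + failure);
            }
        }
        if (enforce && !failures.isEmpty()) {
            throw new IllegalStateException("bootstrap checks failed: " + failures);
        }
        return failures;
    }
}
```

Running every check before failing reports all problems at once. An operator can then fix the host configuration in one pass instead of restarting repeatedly.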

8. Sequence Diagrams of Core Flows

8.1 Full Node Startup

Three-phase startup sequence diagram

sequenceDiagram
    autonumber
    participant Main as main method
    participant ES as Elasticsearch
    participant Bootstrap as Bootstrap
    participant Node as Node
    participant Injector as Guice Injector
    participant Services as Core Services

    Note over Main,Services: Phase 1: basic initialization

    Main->>ES: main(args)
    ES->>ES: initPhase1()
    ES->>ES: initSecurityProperties()
    ES->>ES: BootstrapInfo.init()
    ES->>ES: read ServerArgs
    ES->>ES: create Environment
    ES->>ES: configure logging
    ES->>Bootstrap: new Bootstrap(args)
    Bootstrap-->>ES: bootstrap

    Note over Main,Services: Phase 2: security manager setup

    ES->>Bootstrap: initPhase2(bootstrap)
    Bootstrap->>Bootstrap: create full Environment
    Bootstrap->>Bootstrap: write PID file
    Bootstrap->>Bootstrap: install exception handler
    Bootstrap->>Bootstrap: initialize native libraries (JNA)
    Bootstrap->>Bootstrap: check Jar Hell
    Bootstrap->>Bootstrap: load plugins
    Bootstrap->>Bootstrap: configure security manager

    Note over Main,Services: Phase 3: node construction and start

    ES->>Bootstrap: initPhase3(bootstrap)
    Bootstrap->>Node: new Node(environment, pluginsLoader)

    Node->>Node: prepareConstruction()
    Node->>Injector: create Guice Injector
    Injector->>Injector: register all Modules
    Injector->>Injector: create all service instances
    Injector-->>Node: injector

    Node->>Node: obtain core component references
    Node-->>Bootstrap: node

    Bootstrap->>Node: start()

    Node->>Node: lifecycle.moveToStarted()

    loop start all services
        Node->>Services: start()
        Services->>Services: initialize resources
        Services-->>Node: started
    end

    Node->>Node: validateNodeBeforeAcceptingRequests()
    Node-->>Bootstrap: started

    Bootstrap->>Bootstrap: sendCliMarker(SERVER_READY)
    Bootstrap-->>Main: node ready

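Notes on the diagram

Phase 1: basic initialization (steps 1-9)

Goal: do the minimum initialization required

Key steps:

  1. Security properties: configure Java security properties
  2. BootstrapInfo: collect system information
  3. Startup arguments: read the configuration sent by the CLI process
  4. Environment: path information only
  5. Logging: must be configured last

Notes:

  • Single-threaded
  • As little work as possible
  • Logging is configured last

Phase 2: security manager setup (steps 10-17)

Goal: complete all preparation that must precede the security manager

Key steps:

  1. Full environment: includes SecureSettings
  2. PID file: records the process ID of the running server for external tooling
  3. Exception handler: global uncaught-exception handling
  4. Native libraries: JNA/JNI
  5. Jar Hell check: rejects conflicting classes on the classpath
  6. Plugin loading: pre-loads all plugins
  7. Security manager: optional

Phase 3: node construction and start (steps 18-35)

Goal: construct and start the node

Key steps:

  1. Create Node: construct the node object
  2. Guice container: create the dependency injection container
  3. Register services: bind every service in the container
  4. Start services: start them in dependency order
  5. Validate the node: run the bootstrap checks
  6. Notify the CLI: signal that the node is ready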

8.2 Service Start Order

Detailed service start sequence diagram

sequenceDiagram
    autonumber
    participant Node
    participant Plugins as Plugin Components
    participant IS as IndicesService
    participant ICSS as IndicesClusterStateService
    participant Snapshots as SnapshotsService
    participant Search as SearchService
    participant CS as ClusterService
    participant TS as TransportService
    participant Coord as Coordinator
    participant GW as GatewayService
    participant HTTP as HttpServerTransport

    Node->>Node: lifecycle.moveToStarted()

    Note over Node,HTTP: Step 1: start plugin components

    Node->>Plugins: forEach(start)
    Plugins->>Plugins: initialize plugin resources
    Plugins-->>Node: started

    Note over Node,HTTP: Step 2: start indices services

    Node->>IS: start()
    IS->>IS: start index management
    IS-->>Node: started

    Node->>ICSS: start()
    ICSS->>ICSS: start applying cluster state
    ICSS-->>Node: started

    Note over Node,HTTP: Step 3: start snapshot service

    Node->>Snapshots: start()
    Snapshots->>Snapshots: initialize snapshot management
    Snapshots-->>Node: started

    Note over Node,HTTP: Step 4: start search service

    Node->>Search: start()
    Search->>Search: prepare search execution
    Search-->>Node: started

    Note over Node,HTTP: Step 5: start cluster service

    Node->>CS: start()
    CS->>CS: start MasterService
    CS->>CS: start ClusterApplierService
    CS-->>Node: started

    Note over Node,HTTP: Step 6: start transport service

    Node->>TS: start()
    TS->>TS: bind transport address
    TS->>TS: start Netty server
    TS->>TS: acceptIncomingRequests()
    TS-->>Node: started

    Note over Node,HTTP: Step 7: start coordinator

    Node->>Coord: start()
    Coord->>Coord: start master election
    Coord->>Coord: join the cluster
    Coord-->>Node: started

    Note over Node,HTTP: Step 8: start gateway service

    Node->>GW: start()
    GW->>GW: recover cluster state
    GW-->>Node: started

    Note over Node,HTTP: Step 9: start HTTP service

    Node->>HTTP: start()
    HTTP->>HTTP: bind HTTP address
    HTTP->>HTTP: start Netty HTTP server
    HTTP-->>Node: started

    Node->>Node: validateNodeBeforeAcceptingRequests()

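Notes on the diagram

Why this order

1. Plugin components (steps 2-4):

  • Plugin lifecycle components start first
  • They provide extensions used by the services started afterwards

2. Indices services (steps 5-10):

  • IndicesService: manages all indices
  • IndicesClusterStateService: applies cluster state changes to local shards

3. Snapshot service (steps 11-13):

  • SnapshotsService: snapshot management
  • Depends on the indices services

4. Search service (steps 14-16):

  • SearchService: executes searches
  • Search threads come from the shared ThreadPool

5. Cluster service (steps 17-20):

  • ClusterService: cluster state management
  • Starts the MasterService and the ClusterApplierService

6. Transport service (steps 21-25):

  • TransportService: node-to-node communication
  • Binds the port and accepts connections

7. Coordinator (steps 26-29):

  • Coordinator: master election and cluster coordination
  • Depends on TransportService

8. Gateway service (steps 30-32):

  • GatewayService: recovers cluster metadata
  • Depends on all of the services above

9. HTTP service (steps 33-36):

  • HttpServerTransport: REST API
  • Started last, when the node is ready to accept requests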

8.3 Node Shutdown

Node shutdown sequence diagram

sequenceDiagram
    autonumber
    participant Node
    participant HTTP as HttpServerTransport
    participant Coord as Coordinator
    participant CS as ClusterService
    participant TS as TransportService
    participant IS as IndicesService
    participant TP as ThreadPool
    participant NE as NodeEnvironment

    Node->>Node: close()

    Note over Node,NE: Step 1: stop before closing

    alt node still running
        Node->>Node: stop()
    end

    Node->>Node: lifecycle.moveToClosed()

    Note over Node,NE: Step 2: close HTTP (no new requests)

    Node->>HTTP: close()
    HTTP->>HTTP: unbind()
    HTTP->>HTTP: close all connections
    HTTP-->>Node: closed

    Note over Node,NE: Step 3: close coordinator

    Node->>Coord: close()
    Coord->>Coord: stop master election
    Coord->>Coord: leave the cluster
    Coord-->>Node: closed

    Note over Node,NE: Step 4: close cluster service

    Node->>CS: close()
    CS->>CS: stop state updates
    CS-->>Node: closed

    Note over Node,NE: Step 5: close transport service

    Node->>TS: close()
    TS->>TS: close all connections
    TS->>TS: stop Netty
    TS-->>Node: closed

    Note over Node,NE: Step 6: close indices service

    Node->>IS: close()
    IS->>IS: close all indices
    IS->>IS: flush data to disk
    IS-->>Node: closed

    Note over Node,NE: Step 7: shut down thread pool

    Node->>TP: shutdown()
    TP->>TP: reject new tasks
    TP->>TP: wait for running tasks
    TP-->>Node: shutdown

    Note over Node,NE: Step 8: close node environment

    Node->>NE: close()
    NE->>NE: release file locks
    NE-->>Node: closed

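Sequence diagram notes

Shutdown order (the reverse of startup). Steps 1-3 are close() itself: stop() is called first if the node is still running, then the lifecycle moves to CLOSED.

1. HTTP service (steps 4-7):

  • Stop accepting new HTTP requests
  • Close existing connections

2. Coordinator (steps 8-11):

  • Stop master election
  • Leave the cluster

3. Cluster service (steps 12-14):

  • Stop processing cluster state updates
  • Save the last cluster state

4. Transport service (steps 15-18):

  • Close inter-node connections
  • Stop the Netty server

5. Indices service (steps 19-22):

  • Close all indices
  • Flush data to disk
  • Wait for all in-flight operations to complete

6. Thread pool (steps 23-26):

  • Stop accepting new tasks
  • Wait for running tasks to complete
  • Shut down all threads

7. Node environment (steps 27-29):

  • Release directory locks
  • Clean up temporary files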
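The thread pool's "stop accepting, wait, then stop threads" sequence is the standard java.util.concurrent shutdown pattern. The sketch below shows that pattern on a plain ExecutorService; it is not Elasticsearch's ThreadPool code, and PoolShutdownSketch.terminate is a hypothetical helper name.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

final class PoolShutdownSketch {
    // Graceful-then-forceful shutdown; returns true if the pool finished within the grace period
    static boolean terminate(ExecutorService pool, long timeout, TimeUnit unit) {
        // 1. Stop accepting new tasks; queued and running tasks may still complete
        pool.shutdown();
        try {
            // 2. Wait for the running tasks to finish
            if (pool.awaitTermination(timeout, unit)) {
                return true;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
        }
        // 3. Interrupt whatever is still running; queued tasks that never started are dropped
        pool.shutdownNow();
        return false;
    }
}
```

After terminate returns, submitting another task is rejected, which is the "stop accepting new tasks" guarantee from the list above.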

8.4 Plugin Loading Flow

Plugin loading sequence diagram

sequenceDiagram
    autonumber
    participant Bootstrap
    participant PL as PluginsLoader
    participant PS as PluginsService
    participant Plugin
    participant Injector as Guice Injector

    Bootstrap->>PL: createPluginsLoader(env)

    PL->>PL: Scan the plugins directory
    PL->>PL: Scan the modules directory

    loop Each plugin/module
        PL->>PL: Read plugin-descriptor.properties
        PL->>PL: Create a ClassLoader
        PL->>PL: Create a PluginBundle
    end

    PL-->>Bootstrap: pluginsLoader

    Bootstrap->>PS: new PluginsService(settings, pluginsLoader)

    loop Each PluginBundle
        PS->>PS: Load the plugin class
        PS->>Plugin: Instantiate the plugin
        Plugin-->>PS: plugin instance
        PS->>PS: Collect extension points
    end

    PS-->>Bootstrap: pluginsService

    Note over Bootstrap,Injector: Build the dependency injection container

    Bootstrap->>Injector: createInjector(...)

    loop Each plugin
        Injector->>Plugin: createComponents(...)
        Plugin-->>Injector: custom components

        Injector->>Plugin: getActions()
        Plugin-->>Injector: action handlers

        Injector->>Plugin: getRestHandlers(...)
        Plugin-->>Injector: REST handlers
    end

    Injector-->>Bootstrap: injector with plugins

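Sequence diagram notes

Plugin loading steps

1. Directory scanning (steps 1-4):

  • Scan the plugins/ directory (user-installed plugins)
  • Scan the modules/ directory (built-in modules)
  • Read each plugin's descriptor file

2. ClassLoader creation (steps 5-7):

  • Create a separate ClassLoader for each plugin
  • This isolates plugins from each other
  • and avoids class conflicts between their dependencies

3. Plugin instantiation (steps 8-13):

  • Load the plugin's main class
  • Invoke the plugin constructor
  • Collect the plugin's extension points

4. Extension registration (steps 14-21):

  • Create custom components
  • Register Action handlers
  • Register REST handlers
  • Register custom settings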
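The first two stages above (reading the descriptor, then giving each plugin its own ClassLoader) can be sketched with Properties and URLClassLoader. PluginInfoSketch is a hypothetical class: it reads only the name and classname keys of plugin-descriptor.properties and ignores the version checks and security policy handling that the real loader performs.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical model of one plugin bundle; only two descriptor keys are read here
final class PluginInfoSketch {
    final String name;
    final String classname;

    PluginInfoSketch(String name, String classname) {
        this.name = name;
        this.classname = classname;
    }

    // Parses plugin-descriptor.properties content; "name" and "classname" are required
    static PluginInfoSketch parse(Reader descriptor) {
        Properties props = new Properties();
        try {
            props.load(descriptor);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        String name = props.getProperty("name");
        String classname = props.getProperty("classname");
        if (name == null || classname == null) {
            throw new IllegalArgumentException("descriptor must define name and classname");
        }
        return new PluginInfoSketch(name, classname);
    }

    // One loader per plugin over the plugin's own jars. The parent (e.g. the loader holding
    // the server classes) is consulted first, but jars of other plugins are not visible,
    // so two plugins can ship different versions of the same library.
    static URLClassLoader loaderFor(Path pluginDir, ClassLoader parent) {
        List<URL> urls = new ArrayList<>();
        try (DirectoryStream<Path> jars = Files.newDirectoryStream(pluginDir, "*.jar")) {
            for (Path jar : jars) {
                urls.add(jar.toUri().toURL());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new URLClassLoader(urls.toArray(new URL[0]), parent);
    }
}
```

The main class would then be loaded with loaderFor(dir, parent).loadClass(info.classname) and instantiated, which corresponds to the instantiation stage in the diagram.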

8.5 End-to-End Handling of an HTTP Index Request

End-to-end index request sequence diagram

sequenceDiagram
    autonumber
    participant Client as HTTP Client
    participant Netty as Netty HTTP Server
    participant RC as RestController
    participant RH as RestIndexAction<br/>(RestHandler)
    participant AM as ActionModule
    participant TA as TransportIndexAction
    participant Primary as Primary Shard Node
    participant IS as IndicesService
    participant Shard as IndexShard
    participant Engine as InternalEngine
    participant Lucene as Lucene
    participant TL as Translog
    participant Replica as Replica Shards

    Note over Client,Replica: Step 1: Receive and route the HTTP request

    Client->>Netty: POST /my-index/_doc/1<br/>{\"field\":\"value\"}
    Netty->>Netty: Parse the HTTP request
    Netty->>RC: dispatchRequest(httpRequest)

    RC->>RC: Route matching<br/>POST /{index}/_doc/{id}
    RC->>RH: handleRequest(request, channel)

    Note over Client,Replica: Step 2: REST-layer handling and parameter parsing

    RH->>RH: Parse path parameters<br/>(index, id)
    RH->>RH: Parse query parameters<br/>(refresh, timeout, routing)
    RH->>RH: Parse the request body<br/>(JSON document)

    RH->>RH: Build the IndexRequest
    RH->>AM: client.index(IndexRequest, listener)<br/>(dispatched to the registered TransportAction)

    Note over Client,Replica: Step 3: Action-layer handling

    AM->>TA: execute(IndexRequest, ActionListener)

    TA->>TA: Resolve routing<br/>routing = _id (default)
    TA->>TA: Compute the shard (simplified)<br/>shardId = hash(routing) % num_primary_shards

    TA->>TA: Read the cluster state<br/>locate the primary shard

    alt Local node holds the primary
        TA->>Primary: Local call
    else Primary is on a remote node
        TA->>Primary: TransportService.sendRequest()
    end

    Note over Client,Replica: Step 4: Primary shard handling

    Primary->>IS: index(IndexRequest)
    IS->>Shard: index(IndexRequest)

    Shard->>Shard: Pre-checks<br/>(shard state, version)
    Shard->>Shard: Build Engine.Index<br/>(seqNo left unassigned for the Engine)
    Shard->>Shard: Attach the current primaryTerm

    Note over Client,Replica: Step 5: Engine-layer handling

    Shard->>Engine: index(Index operation)

    Engine->>Engine: Acquire the per-UID lock<br/>versionMap.acquireLock(uid)
    Engine->>Engine: Version conflict check,<br/>then generate seqNo

    alt Version conflict
        Engine-->>Shard: VersionConflictEngineException
        Shard-->>TA: 409 Conflict
        TA-->>Client: 409 Conflict
    end

    Note over Client,Replica: Step 6: Write to Lucene and the translog

    alt New document
        Engine->>Lucene: addDocument(doc)
    else Existing document
        Engine->>Lucene: updateDocument(term, doc)
    end

    Lucene->>Lucene: Write to the in-memory buffer<br/>(IndexWriter buffer)
    Lucene-->>Engine: success

    Engine->>TL: add(IndexOperation)
    TL->>TL: Serialize the operation
    TL->>TL: Append to the current generation

    alt index.translog.durability = request (default)
        TL->>TL: fsync before the request is acknowledged
    else index.translog.durability = async
        TL->>TL: Background fsync<br/>(index.translog.sync_interval, default 5s)
    end

    TL-->>Engine: Location

    Engine->>Engine: Update the LiveVersionMap<br/>(version and translog location)
    Engine->>Engine: Release the UID lock

    Engine-->>Shard: IndexResult

    Note over Client,Replica: Step 7: Replication to replicas

    par Replicate to all replicas in parallel
        Shard->>Replica: ReplicationRequest<br/>(doc, seqNo, primaryTerm)
        Replica->>Replica: Apply the operation<br/>(with the same seqNo)
        Replica->>Replica: Write to the Engine
        Replica->>Replica: Write to the translog
        Replica-->>Shard: ACK
    end

    Shard->>Shard: Wait for every in-sync copy<br/>(failed copies are reported to the master)

    Note over Client,Replica: Step 8: Return the response

    Shard-->>IS: IndexResult
    IS-->>Primary: IndexResult
    Primary-->>TA: IndexResponse

    TA->>TA: Build the response
    TA->>RC: IndexResponse
    RC->>Netty: HTTP 201 Created<br/>(200 OK when an existing document is updated)
    Netty-->>Client: {"_id":"1","_version":1,"result":"created"}

Detailed notes

Step 1: Receiving and routing the HTTP request (diagram steps 1-5)

Netty HTTP Server:

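// AbstractHttpServerTransport (simplified; Netty4HttpServerTransport extends this class)
public abstract class AbstractHttpServerTransport extends AbstractLifecycleComponent implements HttpServerTransport {
    void dispatchRequest(RestRequest restRequest, RestChannel channel) {
        // By this point the Netty pipeline has decoded the raw HTTP request
        // (method, URI, headers, body) and wrapped it in a RestRequest

        // Hand the request over to the dispatcher, i.e. the RestController
        dispatcher.dispatchRequest(restRequest, channel, threadContext);
    }
}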

RestController routing:

public class RestController implements HttpServerTransport.Dispatcher {
    public void dispatchRequest(RestRequest request, RestChannel channel, ThreadContext threadContext) {
        // 1. Find the matching handler (simplified: the real lookup walks a PathTrie of
        //    registered path templates such as /{index}/_doc/{id} and fills in path params)
        RestHandler handler = handlerMap.get(request.method(), request.path());

        // 2. Run the handler
        handler.handleRequest(request, channel, client);
    }
}

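Key points:

  • Netty handles the HTTP protocol
  • RestController matches the request to a route
  • Routes can contain path parameters (e.g. /{index}/_doc/{id} matches /my-index/_doc/1)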
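Path-parameter matching can be illustrated with a segment-by-segment comparison. RouteSketch is a hypothetical, deliberately naive matcher (linear, no wildcard priority); Elasticsearch's RestController uses a PathTrie instead.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical, simplified route matcher; ES itself uses a PathTrie
final class RouteSketch {
    // Matches e.g. "/{index}/_doc/{id}" against "/my-index/_doc/1" and returns the path params
    static Optional<Map<String, String>> match(String template, String path) {
        String[] t = template.split("/");
        String[] p = path.split("/");
        if (t.length != p.length) {
            return Optional.empty();
        }
        Map<String, String> params = new HashMap<>();
        for (int i = 0; i < t.length; i++) {
            if (t[i].startsWith("{") && t[i].endsWith("}")) {
                if (p[i].isEmpty()) {
                    return Optional.empty(); // a parameter segment may not be empty
                }
                params.put(t[i].substring(1, t[i].length() - 1), p[i]);
            } else if (!t[i].equals(p[i])) {
                return Optional.empty(); // literal segment mismatch
            }
        }
        return Optional.of(params);
    }
}
```

For example, matching "/{index}/_doc/{id}" against "/my-index/_doc/1" yields index=my-index and id=1, while "/my-index/_search" does not match.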

Step 2: REST-layer handling and parameter parsing (diagram steps 6-10)

RestIndexAction:

public class RestIndexAction extends BaseRestHandler {
    @Override
    public RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {
        // 1. Path parameters
        String index = request.param("index");
        String id = request.param("id");

        // 2. Query parameters
        String routing = request.param("routing");
        TimeValue timeout = request.paramAsTime("timeout", TimeValue.timeValueSeconds(60));

        // 3. Request body
        BytesReference source = request.requiredContent();

        // 4. Build the IndexRequest
        IndexRequest indexRequest = new IndexRequest(index);
        indexRequest.id(id);
        indexRequest.routing(routing);
        indexRequest.source(source, request.getXContentType());
        indexRequest.timeout(timeout);
        // refresh accepts "true" or "" (refresh now), "false" (default) and "wait_for";
        // reading it as a boolean would reject "wait_for" and leave the value unused
        indexRequest.setRefreshPolicy(request.param("refresh"));

        // 5. Execute asynchronously; the listener writes the response to the channel
        return channel -> client.index(indexRequest, new RestToXContentListener<>(channel));
    }
}

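Key points:

  • Parses path parameters, query parameters and the request body
  • Builds an ActionRequest (here an IndexRequest)
  • Sets options such as timeout and refresh policy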
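The refresh parameter is a three-way option rather than a boolean. The enum below is a standalone sketch that mirrors the three policies of Elasticsearch's WriteRequest.RefreshPolicy and the string values the REST layer accepts. RefreshPolicySketch is a hypothetical name, not the real class.

```java
// Standalone sketch mirroring WriteRequest.RefreshPolicy (hypothetical class name)
enum RefreshPolicySketch {
    NONE,        // "false" or absent: respond without forcing a refresh
    IMMEDIATE,   // "true" or "" (?refresh with no value): refresh right after the write
    WAIT_UNTIL;  // "wait_for": respond once a scheduled refresh makes the change visible

    static RefreshPolicySketch parse(String value) {
        if (value == null) {
            return NONE;
        }
        switch (value) {
            case "false":
                return NONE;
            case "":
            case "true":
                return IMMEDIATE;
            case "wait_for":
                return WAIT_UNTIL;
            default:
                throw new IllegalArgumentException("unknown refresh value [" + value + "]");
        }
    }
}
```

WAIT_UNTIL avoids forcing an extra Lucene refresh per request, which is why it is usually preferred over IMMEDIATE under load.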

Step 3: Action-layer handling (diagram steps 11-16)

TransportIndexAction shard routing:

public class TransportIndexAction extends TransportReplicationAction<IndexRequest, IndexRequest, IndexResponse> {
    @Override
    protected void doExecute(Task task, IndexRequest request, ActionListener<IndexResponse> listener) {
        // 1. Resolve the routing key (defaults to the document _id)
        String routing = request.routing() != null ? request.routing() : request.id();

        // 2. Compute the shard id
        ClusterState clusterState = clusterService.state();
        IndexMetadata indexMetadata = clusterState.metadata().index(request.index());
        // Murmur3, not String.hashCode() (which can be negative and is not what ES uses);
        // floorMod keeps the result non-negative and the routing factor maps
        // routing_num_shards buckets onto number_of_shards
        int hash = Murmur3HashFunction.hash(routing);
        int shardId = Math.floorMod(hash, indexMetadata.getRoutingNumShards()) / indexMetadata.getRoutingFactor();

        // 3. Locate the primary shard
        ShardRouting primaryShard = clusterState.routingTable()
            .shardRoutingTable(request.index(), shardId)
            .primaryShard();

        String nodeId = primaryShard.currentNodeId();

        // 4. Forward to the node that holds the primary
        if (clusterService.localNode().getId().equals(nodeId)) {
            // Local execution against the primary shard (details omitted)
            shardOperationOnPrimary(request, primary, listener);
        } else {
            // Remote execution
            transportService.sendRequest(
                clusterState.nodes().get(nodeId),
                "indices:data/write/index[p]",
                request,
                new TransportResponseHandler<IndexResponse>() {
                    // handle the response
                }
            );
        }
    }
}

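Key points:

  • Routing: shard = hash(routing) % number_of_primary_shards (simplified; ES hashes with Murmur3 and applies routing_num_shards and a routing factor so that indices can be split)
  • Locate the primary shard in the routing table
  • Execute locally or forward to the remote node
  • The code above is simplified: in recent versions a single-document index request is executed through the bulk path (TransportBulkAction / TransportShardBulkAction)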
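The routing formula can be made concrete with a self-contained sketch. To the best of my knowledge it mirrors Elasticsearch's Murmur3HashFunction and shard-routing logic: each UTF-16 char of the routing value becomes two little-endian bytes, those bytes are hashed with 32-bit Murmur3 (x86 variant, seed 0), and the hash is reduced with floorMod by routing_num_shards and divided by the routing factor. Treat the byte encoding and the formula as assumptions to verify against your version. ShardRoutingSketch is a hypothetical class name.

```java
final class ShardRoutingSketch {
    // Standard MurmurHash3 x86_32 (same algorithm as Lucene's StringHelper.murmurhash3_x86_32)
    static int murmur3X86_32(byte[] data, int offset, int len, int seed) {
        final int c1 = 0xcc9e2d51;
        final int c2 = 0x1b873593;
        int h1 = seed;
        int roundedEnd = offset + (len & 0xfffffffc); // round down to a 4-byte block
        for (int i = offset; i < roundedEnd; i += 4) {
            // little-endian load of one 4-byte block
            int k1 = (data[i] & 0xff) | ((data[i + 1] & 0xff) << 8)
                | ((data[i + 2] & 0xff) << 16) | (data[i + 3] << 24);
            k1 *= c1;
            k1 = Integer.rotateLeft(k1, 15);
            k1 *= c2;
            h1 ^= k1;
            h1 = Integer.rotateLeft(h1, 13);
            h1 = h1 * 5 + 0xe6546b64;
        }
        int k1 = 0;
        switch (len & 0x03) { // tail bytes, intentional fall-through
            case 3:
                k1 = (data[roundedEnd + 2] & 0xff) << 16;
            case 2:
                k1 |= (data[roundedEnd + 1] & 0xff) << 8;
            case 1:
                k1 |= (data[roundedEnd] & 0xff);
                k1 *= c1;
                k1 = Integer.rotateLeft(k1, 15);
                k1 *= c2;
                h1 ^= k1;
        }
        // finalization mix
        h1 ^= len;
        h1 ^= h1 >>> 16;
        h1 *= 0x85ebca6b;
        h1 ^= h1 >>> 13;
        h1 *= 0xc2b2ae35;
        h1 ^= h1 >>> 16;
        return h1;
    }

    // Assumed encoding: each char as two little-endian bytes, seed 0
    static int hashRouting(String routing) {
        byte[] bytes = new byte[routing.length() * 2];
        for (int i = 0; i < routing.length(); i++) {
            char c = routing.charAt(i);
            bytes[i * 2] = (byte) c;
            bytes[i * 2 + 1] = (byte) (c >>> 8);
        }
        return murmur3X86_32(bytes, 0, bytes.length, 0);
    }

    // floorMod keeps negative hashes in range; the routing factor maps buckets onto shards
    static int shardId(String routing, int routingNumShards, int numShards) {
        if (routingNumShards % numShards != 0) {
            throw new IllegalArgumentException("numShards must divide routingNumShards");
        }
        int routingFactor = routingNumShards / numShards;
        return Math.floorMod(hashRouting(routing), routingNumShards) / routingFactor;
    }
}
```

The routing factor is what makes splitting consistent: with routing_num_shards = 8, a document on shard s of an 8-shard index lands on shard s / 4 of the 2-shard index it was split from.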

Step 4: Primary shard handling (diagram steps 17-21)

IndexShard:

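public class IndexShard extends AbstractIndexShardComponent {
    public Engine.IndexResult index(Engine.Index index) throws IOException {
        // 1. Pre-checks: the shard is not closed and accepts writes for this operation origin
        ensureWriteAllowed(index.origin());
        verifyNotClosed();

        // 2. On the primary the operation carries the current primary term and an
        //    unassigned seqNo. The sequence number is generated inside the engine
        //    (LocalCheckpointTracker) while the per-document lock is held, so it is not
        //    computed here as getMaxSeqNo() + 1, which would race under concurrent writes

        // 3. Execute the write
        Engine.IndexResult result = engine.index(index);

        return result;
    }
}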

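Key points:

  • Shard state checks
  • Sequence number assignment (monotonically increasing per shard and generated on the primary; not unique across shards)
  • Primary term recording (the term of the current primary, incremented each time a new primary is assigned)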
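Per-shard sequence numbers come with a "local checkpoint": the highest seqNo below which every operation has been processed, even though operations may complete out of order. LocalCheckpointSketch is a hypothetical, simplified version using a HashSet; Elasticsearch's LocalCheckpointTracker is more elaborate.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified seqNo generator plus local-checkpoint tracking
final class LocalCheckpointSketch {
    private final AtomicLong nextSeqNo = new AtomicLong(0);
    private final Set<Long> processedAboveCheckpoint = new HashSet<>();
    private long checkpoint = -1; // -1 means no operation processed yet

    // Called on the primary: each operation gets the next number for this shard
    long generateSeqNo() {
        return nextSeqNo.getAndIncrement();
    }

    // Operations may finish out of order; the checkpoint only advances over a gap-free prefix
    synchronized void markProcessed(long seqNo) {
        if (seqNo <= checkpoint) {
            return;
        }
        processedAboveCheckpoint.add(seqNo);
        while (processedAboveCheckpoint.remove(checkpoint + 1)) {
            checkpoint++;
        }
    }

    synchronized long checkpoint() {
        return checkpoint;
    }
}
```

If operations 0 and 2 finish before 1, the checkpoint stays at 0 until 1 completes, then jumps to 2.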

Step 5: Engine-layer handling (diagram steps 22-27)

InternalEngine version control:

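public class InternalEngine extends Engine {
    @Override
    public IndexResult index(Index index) throws IOException {
        // 1. Acquire the per-UID lock: operations on the same document are serialized
        try (Releasable ignored = versionMap.acquireLock(index.uid())) {

            // 2. Look up the current version (simplified: only the LiveVersionMap is consulted
            //    here; the real engine falls back to a Lucene lookup when there is no entry)
            VersionValue versionValue = versionMap.getUnderLock(index.uid());
            long currentVersion = versionValue != null ? versionValue.version : Versions.NOT_FOUND;
            boolean currentDeleted = versionValue != null && versionValue.isDelete();

            // 3. Version conflict check (the third argument is the "deleted" flag, not a version type)
            if (index.versionType().isVersionConflictForWrites(
                    currentVersion, index.version(), currentDeleted)) {
                return new IndexResult(
                    new VersionConflictEngineException(...)
                );
            }

            // 4. Compute the new version
            long newVersion = index.versionType().updateVersion(currentVersion, index.version());

            // 5. Write to Lucene and the translog
            // ...
        }
    }
}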

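Key points:

  • Per-UID lock (operations on the same document are serialized)
  • Version conflict detection
  • Several version types are supported (INTERNAL, EXTERNAL, EXTERNAL_GTE); for optimistic concurrency control, current versions use if_seq_no / if_primary_term rather than internal version numbers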
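The write-conflict rules of the three version types can be summarized in a standalone enum. To my understanding it follows the rules of Elasticsearch's VersionType, but it omits MATCH_DELETED handling, and the constant values NOT_FOUND = -1 and MATCH_ANY = -3 are assumed to mirror Versions. VersionTypeSketch is a hypothetical name.

```java
// Standalone sketch of INTERNAL / EXTERNAL / EXTERNAL_GTE write-conflict rules (hypothetical name)
enum VersionTypeSketch {
    INTERNAL {
        boolean conflict(long current, long expected) {
            if (expected == MATCH_ANY) return false;   // no expectation: always allowed
            if (current == NOT_FOUND) return true;     // a version was expected but the doc is absent
            return current != expected;                // compare-and-set on the version
        }
        long next(long current, long expected) {
            return current == NOT_FOUND ? 1 : current + 1; // the engine increments
        }
    },
    EXTERNAL {
        boolean conflict(long current, long expected) {
            if (current == NOT_FOUND) return false;    // the first write always succeeds
            if (expected == MATCH_ANY) return true;    // external writes must carry a version
            return current >= expected;                // new version must be strictly greater
        }
        long next(long current, long expected) {
            return expected;                           // the caller's version is stored as-is
        }
    },
    EXTERNAL_GTE {
        boolean conflict(long current, long expected) {
            if (current == NOT_FOUND) return false;
            if (expected == MATCH_ANY) return true;
            return current > expected;                 // equal versions are accepted
        }
        long next(long current, long expected) {
            return expected;
        }
    };

    static final long NOT_FOUND = -1L;  // assumed to mirror Versions.NOT_FOUND
    static final long MATCH_ANY = -3L;  // assumed to mirror Versions.MATCH_ANY

    abstract boolean conflict(long current, long expected);

    abstract long next(long current, long expected);
}
```

EXTERNAL suits syncing from a primary data store that owns the version numbers; EXTERNAL_GTE additionally tolerates replaying the same version.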

Step 6: Writing to Lucene and the translog (diagram steps 28-40)

Lucene write:

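// New document: append only
if (plan.useLuceneUpdateDocument == false) {
    indexWriter.addDocuments(index.docs());
} else {
    // Existing document: updateDocuments atomically deletes by the _id term, then adds
    indexWriter.updateDocuments(
        new Term(IdFieldMapper.NAME, Uid.encodeId(index.id())),
        index.docs()
    );
}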

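Translog write:

// Append the operation; the returned Location is later stored in the LiveVersionMap
Translog.Location location = translog.add(new Translog.Index(index));

// Persist according to index.translog.durability
if (indexSettings.getTranslogDurability() == Translog.Durability.REQUEST) {
    // Default: fsync before the write is acknowledged (in ES this happens once per
    // request after the write completes, not inside each engine call)
    translog.sync();
} else {
    // ASYNC: a background task fsyncs every index.translog.sync_interval (default 5s)
}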

LiveVersionMap update:

versionMap.putIndexUnderLock(
    index.uid(),
    new IndexVersionValue(
        location,            // translog location
        newVersion,          // new version
        index.seqNo(),       // sequence number
        index.primaryTerm()  // primary term
    )
);

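Key points:

  • Lucene writes go to an in-memory buffer and are not searchable until the next refresh (index.refresh_interval, default 1s)
  • The translog provides durability for operations not yet committed to Lucene
  • LiveVersionMap records versions and translog locations (used for real-time GET)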
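The difference between REQUEST and ASYNC durability can be seen in a minimal append-only log. TranslogSketch is hypothetical: it writes length-prefixed UTF-8 records and returns the byte offset as the "location", and it leaves out the generations, checkpoint files and checksums of the real translog.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;

// Hypothetical append-only operation log illustrating REQUEST vs ASYNC durability
final class TranslogSketch implements AutoCloseable {
    enum Durability { REQUEST, ASYNC }

    private final FileChannel channel;
    private final Durability durability;

    TranslogSketch(Path file, Durability durability) throws IOException {
        this.channel = FileChannel.open(file, StandardOpenOption.CREATE,
            StandardOpenOption.WRITE, StandardOpenOption.READ);
        this.durability = durability;
    }

    // Appends a length-prefixed record and returns its byte offset (the "location")
    synchronized long add(String operation) throws IOException {
        byte[] payload = operation.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(4 + payload.length);
        buf.putInt(payload.length).put(payload).flip();
        long location = channel.size();
        channel.position(location);
        while (buf.hasRemaining()) {
            channel.write(buf);
        }
        if (durability == Durability.REQUEST) {
            sync(); // fsync before the caller may acknowledge the write
        }
        return location;
    }

    // Forces written bytes to disk; in ASYNC mode a timer would call this periodically
    synchronized void sync() throws IOException {
        channel.force(false);
    }

    // Reads every record back in order, as recovery would replay them
    synchronized List<String> readAll() throws IOException {
        List<String> ops = new ArrayList<>();
        long pos = 0;
        while (pos < channel.size()) {
            ByteBuffer len = ByteBuffer.allocate(4);
            readFully(len, pos);
            int n = len.flip().getInt();
            ByteBuffer payload = ByteBuffer.allocate(n);
            readFully(payload, pos + 4);
            ops.add(new String(payload.array(), StandardCharsets.UTF_8));
            pos += 4 + n;
        }
        return ops;
    }

    private void readFully(ByteBuffer dst, long pos) throws IOException {
        while (dst.hasRemaining()) {
            int r = channel.read(dst, pos);
            if (r < 0) {
                throw new IOException("truncated record");
            }
            pos += r;
        }
    }

    @Override
    public void close() throws IOException {
        channel.close();
    }
}
```

With ASYNC, an acknowledged operation can be lost if the process dies before the next periodic sync; with REQUEST, the cost is one fsync per request.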

Step 7: Replication to replicas (diagram steps 41-46)

Replication flow:

// Replicate to all in-sync replica copies in parallel
for (ShardRouting replicaShard : replicaShards) {
    transportService.sendRequest(
        clusterState.nodes().get(replicaShard.currentNodeId()),
        "indices:data/write/index[r]",  // replica action
        new ReplicaRequest(indexRequest, seqNo, primaryTerm),
        new TransportResponseHandler<ReplicaResponse>() {
            @Override
            public void handleResponse(ReplicaResponse response) {
                // Replica acknowledged
                replicaResponses.add(response);
                checkAllReplicasResponded();
            }

            // handleException (omitted): a failed replica is reported to the master
            // and removed from the in-sync set
        }
    );
}

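Applying on the replica:

// The replica node receives the replication request
public void indexReplica(ReplicaRequest request) {
    // Reuse the seqNo and primary term assigned by the primary
    Index operation = new Index(...);
    operation.seqNo(request.seqNo());
    operation.primaryTerm(request.primaryTerm());

    // Execute the write (no version conflict check; stale operations are detected by seqNo)
    engine.index(operation);
}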

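Key points:

  • Replicate to all replicas in parallel
  • Replicas reuse the primary's seqNo, so every copy records the same operation history
  • The primary waits for all in-sync copies to respond (not a majority quorum); a copy that fails is reported to the master and removed from the in-sync set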
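The "wait for every in-sync copy, then report failures" behavior can be sketched with CompletableFuture. ReplicationSketch, Replica and of are hypothetical names, not Elasticsearch's ReplicationOperation. The sketch only collects failed copy ids where a real primary would ask the master to remove them from the in-sync set.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical primary-side fan-out (not ES's ReplicationOperation)
final class ReplicationSketch {
    // A replica copy that applies an operation asynchronously
    interface Replica {
        String id();
        CompletableFuture<Void> apply(long seqNo, long primaryTerm, String doc);
    }

    // Sends the operation to every in-sync copy in parallel and waits until each one has
    // either acknowledged or failed. Returns the ids of failed copies, which a real primary
    // would report to the master so they leave the in-sync set.
    static Set<String> replicate(List<Replica> inSync, long seqNo, long primaryTerm, String doc) {
        Set<String> failed = ConcurrentHashMap.newKeySet();
        CompletableFuture<?>[] responses = inSync.stream()
            .map(r -> r.apply(seqNo, primaryTerm, doc)
                .exceptionally(e -> {
                    failed.add(r.id());
                    return null;
                }))
            .toArray(CompletableFuture[]::new);
        // Not a majority quorum: every copy must answer before the client is acknowledged
        CompletableFuture.allOf(responses).join();
        return failed;
    }

    // Helper: a replica that succeeds or fails immediately
    static Replica of(String id, boolean succeeds) {
        return new Replica() {
            public String id() { return id; }
            public CompletableFuture<Void> apply(long seqNo, long term, String doc) {
                return succeeds
                    ? CompletableFuture.<Void>completedFuture(null)
                    : CompletableFuture.<Void>failedFuture(new IllegalStateException(id + " failed"));
            }
        };
    }
}
```

With replicas r1 and r3 succeeding and r2 failing, replicate returns {r2}. The write is still acknowledged, and r2 must later recover from the primary.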

Step 8: Returning the response (diagram steps 47-53)

Building the response:

IndexResponse response = new IndexResponse(
    shardId,
    indexRequest.id(),
    result.getSeqNo(),
    result.getPrimaryTerm(),
    result.getVersion(),
    result.isCreated()  // true = created (HTTP 201), false = updated (HTTP 200)
);

// JSON body returned to the client
{
  "_index": "my-index",
  "_id": "1",
  "_version": 1,
  "_seq_no": 0,
  "_primary_term": 1,
  "result": "created"  // or "updated"
}

8.6 End-to-End Handling of an HTTP Search Request

End-to-end search request sequence diagram

sequenceDiagram
    autonumber
    participant Client as HTTP Client
    participant Netty as Netty HTTP Server
    participant RC as RestController
    participant RSA as RestSearchAction<br/>(RestHandler)
    participant TSA as TransportSearchAction
    participant Coord as Coordinating Node
    participant SS as SearchService
    participant Shard1 as Shard 1
    participant Shard2 as Shard 2
    participant Engine as InternalEngine
    participant Lucene as Lucene Searcher

    Note over Client,Lucene: Step 1: Receive the HTTP request

    Client->>Netty: POST /my-index/_search<br/>{\"query\":{\"match\":{\"field\":\"value\"}}}
    Netty->>RC: dispatchRequest()
    RC->>RSA: handleRequest()

    Note over Client,Lucene: Step 2: Parse the search request

    RSA->>RSA: Parse query parameters<br/>(size, from, timeout)
    RSA->>RSA: Parse the request body<br/>(Query DSL)
    RSA->>RSA: Build the SearchRequest

    RSA->>TSA: execute(SearchRequest)

    Note over Client,Lucene: Step 3: Search coordination

    TSA->>Coord: Take over the request<br/>(the receiving node acts as coordinating node)

    Coord->>Coord: Read the cluster state
    Coord->>Coord: Determine the shards to search<br/>(one copy per shard, primary or replica)
    Coord->>Coord: Pick a node for each shard<br/>(adaptive replica selection)

    Note over Client,Lucene: Step 4: Query phase

    par Query all shards in parallel
        Coord->>Shard1: ShardSearchRequest<br/>(query, from+size)
        Shard1->>SS: executeQueryPhase()
        SS->>Engine: acquireSearcher()
        Engine-->>SS: IndexSearcher
        SS->>Lucene: search(query, from+size)
        Lucene->>Lucene: Execute the query<br/>(traverse the inverted index)
        Lucene->>Lucene: Compute relevance scores
        Lucene-->>SS: TopDocs(docIds, scores)
        SS->>SS: Collect aggregation data
        SS-->>Shard1: QueryResult
        Shard1-->>Coord: QuerySearchResult<br/>(docIds, scores, aggs)
    and
        Coord->>Shard2: ShardSearchRequest
        Shard2->>SS: executeQueryPhase()
        SS->>Engine: acquireSearcher()
        Engine-->>SS: IndexSearcher
        SS->>Lucene: search(query, from+size)
        Lucene-->>SS: TopDocs
        SS-->>Shard2: QueryResult
        Shard2-->>Coord: QuerySearchResult
    end

    Note over Client,Lucene: Step 5: Coordinating node merges the results

    Coord->>Coord: Merge all shard results<br/>(merge sort)
    Coord->>Coord: Keep the requested window<br/>(hits from .. from+size)
    Coord->>Coord: Determine documents to fetch<br/>(list of shardId, docId)

    Note over Client,Lucene: Step 6: Fetch phase

    par Fetch document contents in parallel
        Coord->>Shard1: ShardFetchSearchRequest<br/>(contextId, docIds[])
        Shard1->>SS: executeFetchPhase()
        SS->>SS: Look up the ReaderContext<br/>by contextId
        SS->>SS: Reuse the query-phase searcher<br/>(same point-in-time view)
        loop For each docId
            SS->>Lucene: document(docId)
            Lucene-->>SS: Document
        end
        SS->>SS: Apply _source filtering
        SS->>SS: Apply highlighting
        SS-->>Shard1: FetchResult
        Shard1-->>Coord: FetchSearchResult<br/>(documents[])
    and
        Coord->>Shard2: ShardFetchSearchRequest
        Shard2->>SS: executeFetchPhase()
        SS->>SS: Reuse the ReaderContext<br/>(by contextId)
        loop For each docId
            SS->>Lucene: document(docId)
        end
        Shard2-->>Coord: FetchSearchResult
    end

    Note over Client,Lucene: Step 7: Assemble the final result

    Coord->>Coord: Assemble the hits array<br/>(merge document contents and scores)
    Coord->>Coord: Reduce aggregations
    Coord->>Coord: Compute total elapsed time

    Coord-->>TSA: SearchResponse
    TSA-->>RSA: SearchResponse
    RSA->>RC: Build the HTTP response
    RC->>Netty: HTTP 200 OK
    Netty-->>Client: {\"hits\":{...},\"aggregations\":{...}}

Detailed notes

Steps 1-2: Receiving and parsing the HTTP request (diagram steps 1-7)

RestSearchAction:

public class RestSearchAction extends BaseRestHandler {
    @Override
    public RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
        // 1. Path parameters (comma-separated index list; an empty list means all indices)
        String[] indices = Strings.splitStringByCommaToArray(request.param("index"));

        // 2. Request body (Query DSL)
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        if (request.hasContent()) {
            try (XContentParser parser = request.contentParser()) {
                searchSourceBuilder.parseXContent(parser);
            }
        }

        // 3. URL parameters override the body only when present
        //    (unconditionally setting size=10 would clobber a "size" given in the body)
        if (request.hasParam("size")) {
            searchSourceBuilder.size(request.paramAsInt("size", 10));
        }
        if (request.hasParam("from")) {
            searchSourceBuilder.from(request.paramAsInt("from", 0));
        }
        if (request.hasParam("timeout")) {
            searchSourceBuilder.timeout(request.paramAsTime("timeout", null));
        }

        // 4. Build the SearchRequest
        SearchRequest searchRequest = new SearchRequest(indices);
        searchRequest.source(searchSourceBuilder);
        searchRequest.routing(request.param("routing"));

        // 5. Execute the search
        return channel -> client.search(searchRequest, new RestToXContentListener<>(channel));
    }
}

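Key points:

  • Several indices can be searched at once
  • The Query DSL is parsed into a SearchSourceBuilder
  • size, from and timeout can be given as URL parameters, which override the body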

Step 3: Search coordination (diagram steps 8-11)

TransportSearchAction:

public class TransportSearchAction extends HandledTransportAction<SearchRequest, SearchResponse> {
    @Override
    protected void doExecute(Task task, SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
        // 1. Read the cluster state
        ClusterState clusterState = clusterService.state();

        // 2. Determine the shard groups to search (one group per shard, listing its copies)
        GroupShardsIterator<ShardIterator> shardIterators =
            clusterService.operationRouting()
                .searchShards(clusterState, searchRequest.indices(), searchRequest.routing());

        // 3. Pick one copy per shard (adaptive replica selection)
        Map<SearchShardTarget, ShardRouting> targetToRouting = new HashMap<>();
        for (ShardIterator shardIt : shardIterators) {
            ShardRouting shardRouting = selectShard(shardIt);
            SearchShardTarget target = new SearchShardTarget(
                shardRouting.currentNodeId(),
                shardRouting.shardId(),
                null  // cluster alias: null for the local cluster (this argument is not the index name)
            );
            targetToRouting.put(target, shardRouting);
        }

        // 4. Run the search phases
        executeSearch(targetToRouting, searchRequest, listener);
    }

    // Adaptive replica selection (simplified; in ES the ranking is applied by
    // OperationRouting when it orders the copies of each shard)
    private ShardRouting selectShard(ShardIterator shardIt) {
        // Pick the copy whose node currently ranks best (response time, service time, queue size)
        return adaptiveReplicaSelection.select(shardIt);
    }
}

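Key points:

  • Determine the shards to search (possibly spanning several indices)
  • Adaptive replica selection picks, for each shard, the copy on the node ranked best by recent response time, service time and queue size
  • The node that received the request acts as the coordinating node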
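Adaptive replica selection is based on the C3 ranking scheme (NSDI 2015). The sketch below uses the C3 paper's formula, rank = R - 1/mu + qHat^3 / mu, with 1/mu taken as the node's service time and qHat = 1 + outstanding * clients + queueSize. The EWMA smoothing, units and client count in Elasticsearch's ResponseCollectorService may differ, so treat every parameter here as an assumption. ReplicaRankSketch is a hypothetical name.

```java
// Hypothetical C3-style replica ranking; lower rank is better
final class ReplicaRankSketch {
    // responseTimeMs: EWMA of response time seen by this coordinating node
    // serviceTimeMs:  EWMA of service time reported by the node (1 / service rate)
    // queueSize:      EWMA of the node's search queue size
    // outstanding:    requests this coordinator currently has in flight to the node
    // clients:        number of clients assumed to share the node
    static double rank(double responseTimeMs, double serviceTimeMs, double queueSize,
                       long outstanding, int clients) {
        // Estimated queue length, compensating for other clients' concurrent requests
        double qHat = 1 + outstanding * (double) clients + queueSize;
        // R - 1/mu + qHat^3 / mu, with 1/mu = service time; the cubic term penalizes long queues
        return responseTimeMs - serviceTimeMs + Math.pow(qHat, 3) * serviceTimeMs;
    }

    // Index of the copy with the lowest rank
    static int pickBest(double[] ranks) {
        int best = 0;
        for (int i = 1; i < ranks.length; i++) {
            if (ranks[i] < ranks[best]) {
                best = i;
            }
        }
        return best;
    }
}
```

Two nodes with identical response and service times (10 ms and 5 ms) rank 10.0 with an empty queue and 140.0 with a queue of 2, so the idle copy is chosen. The cubic term is what steers traffic away from queued-up nodes.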

Step 4: Query phase (diagram steps 12-30)

SearchService runs the query phase:

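public class SearchService {
    public void executeQueryPhase(ShardSearchRequest request, SearchShardTask task, ActionListener<QuerySearchResult> listener) {
        // 1. Create the SearchContext (backed by a ReaderContext, i.e. a point-in-time searcher)
        SearchContext context = createContext(request);

        try {
            // 2. Run the query phase against the context's searcher
            QueryPhase.execute(context);

            // 3. Hand back the result (top docIds and scores, aggregations)
            QuerySearchResult result = context.queryResult();
            listener.onResponse(result);
        } catch (Exception e) {
            listener.onFailure(e);
        } finally {
            // Release the per-request SearchContext. The underlying ReaderContext stays open
            // (identified by a ShardSearchContextId) so the fetch phase reads the same view
            cleanContext(context);
        }
    }
}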

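QueryPhase execution:

public class QueryPhase {
    public static void execute(SearchContext searchContext) throws IOException {
        // 1. The Lucene Query (already parsed from the Query DSL)
        Query query = searchContext.query();

        // 2. Search: each shard must return from + size hits, because the global order
        //    is only known after the coordinating node merges all shards
        IndexSearcher searcher = searchContext.searcher();
        TopDocs topDocs = searcher.search(
            query,
            searchContext.from() + searchContext.size()
        );

        // 3. Keep only docIds and scores (hits are sorted by score, so the first is the max)
        searchContext.queryResult().topDocs(
            new TopDocsAndMaxScore(topDocs, topDocs.scoreDocs.length > 0 ? topDocs.scoreDocs[0].score : Float.NaN),
            new DocValueFormat[0]
        );

        // 4. Aggregations (simplified: in ES they are collected in the same pass over the
        //    matching documents as the top hits, through an additional collector)
        if (searchContext.aggregations() != null) {
            searchContext.aggregations().execute(searchContext);
        }
    }
}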

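Key points:

  • The query phase returns only docIds and scores (plus aggregation results)
  • Each shard returns from+size hits
  • Aggregations are computed in the same phase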
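"Each shard returns from+size hits" is typically implemented with a bounded priority queue that holds the k best hits seen so far. Lucene's top-score collector works in this spirit. The class below is a hypothetical plain-Java illustration, not Lucene code. Ties are broken toward the lower docId.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical per-shard collector keeping the best k = from + size hits
final class TopKSketch {
    static final class Hit {
        final int doc;
        final float score;

        Hit(int doc, float score) {
            this.doc = doc;
            this.score = score;
        }
    }

    // Orders hits from worst to best: lower score is worse; on equal score the larger docId is worse
    private static final Comparator<Hit> WORST_FIRST =
        Comparator.<Hit>comparingDouble(h -> h.score).thenComparingInt(h -> -h.doc);

    // Returns the top k hits, best first, in O(n log k)
    static List<Hit> topK(float[] scores, int k) {
        PriorityQueue<Hit> heap = new PriorityQueue<>(Math.max(1, k), WORST_FIRST);
        for (int doc = 0; doc < scores.length; doc++) {
            Hit hit = new Hit(doc, scores[doc]);
            if (heap.size() < k) {
                heap.add(hit);
            } else if (k > 0 && WORST_FIRST.compare(hit, heap.peek()) > 0) {
                heap.poll(); // evict the current worst hit
                heap.add(hit);
            }
        }
        List<Hit> result = new ArrayList<>(heap);
        result.sort(WORST_FIRST.reversed()); // best first
        return result;
    }
}
```

Memory stays proportional to k rather than to the number of matching documents. This is also why a large from value makes every shard do more work.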

Step 5: Coordinating node merges the results (diagram steps 31-33)

Merge sort:

public class SearchPhaseController {
    // Simplified: merge the per-shard TopDocs and return the hits in the window [from, from + size)
    public ScoreDoc[] sortDocs(List<TopDocs> shardTopDocs, int from, int size) {
        TopDocs[] shardHits = new TopDocs[shardTopDocs.size()];
        for (int shardIndex = 0; shardIndex < shardHits.length; shardIndex++) {
            TopDocs topDocs = shardTopDocs.get(shardIndex);
            // docIds are only unique within one shard, so tag each hit with its shard
            for (ScoreDoc doc : topDocs.scoreDocs) {
                doc.shardIndex = shardIndex;
            }
            shardHits[shardIndex] = topDocs;
        }

        // Lucene's k-way merge: score descending, ties broken by shard index, then docId.
        // The first `from` hits are skipped and at most `size` are returned, so a window
        // past the end of the available hits yields a shorter (possibly empty) array
        TopDocs merged = TopDocs.merge(from, size, shardHits);
        return merged.scoreDocs;
    }
}

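Key points:

  • Merge the results of N shards
  • Sort globally and keep the requested window
  • Deep pagination is expensive: every shard returns from+size hits and the coordinating node merges N × (from+size) entries; from+size is capped by index.max_result_window (default 10,000)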
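The coordinating-node merge can also be written out by hand as a k-way merge over per-shard lists, which makes the deep-pagination cost visible: the loop pops from+size entries before it has the window. ShardMergeSketch is hypothetical, handles only score sorting, and assumes the index.max_result_window default of 10,000.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical coordinating-node merge of per-shard top hits (each list sorted best first)
final class ShardMergeSketch {
    static final class Hit {
        final int shard;
        final int doc;
        final float score;

        Hit(int shard, int doc, float score) {
            this.shard = shard;
            this.doc = doc;
            this.score = score;
        }
    }

    // Assumed default of index.max_result_window
    static final int MAX_RESULT_WINDOW = 10_000;

    // Returns the global hits in [from, from + size)
    static List<Hit> merge(List<List<Hit>> perShard, int from, int size) {
        if (from + size > MAX_RESULT_WINDOW) {
            throw new IllegalArgumentException("from + size must be <= " + MAX_RESULT_WINDOW);
        }
        // Heap of cursors {shardList, position}, best current head first:
        // score desc, then shard asc, then doc asc (docIds are only unique within a shard)
        PriorityQueue<int[]> heap = new PriorityQueue<>((a, b) -> {
            Hit x = perShard.get(a[0]).get(a[1]);
            Hit y = perShard.get(b[0]).get(b[1]);
            int c = Float.compare(y.score, x.score);
            if (c != 0) return c;
            c = Integer.compare(x.shard, y.shard);
            return c != 0 ? c : Integer.compare(x.doc, y.doc);
        });
        for (int s = 0; s < perShard.size(); s++) {
            if (!perShard.get(s).isEmpty()) {
                heap.add(new int[] {s, 0});
            }
        }
        List<Hit> window = new ArrayList<>();
        int rank = 0;
        while (!heap.isEmpty() && rank < from + size) { // pops up to from + size entries
            int[] cur = heap.poll();
            if (rank >= from) {
                window.add(perShard.get(cur[0]).get(cur[1]));
            }
            rank++;
            if (cur[1] + 1 < perShard.get(cur[0]).size()) {
                heap.add(new int[] {cur[0], cur[1] + 1});
            }
        }
        return window;
    }
}
```

For page 2 with size 10, every shard still has to send 20 hits and the merge walks 20 of them. For page 1000 that becomes 10,000 per shard, which is why search_after is generally preferred for deep paging.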

Step 6: Fetch phase (diagram steps 34-48)

FetchPhase execution:

public class FetchPhase {
    public void execute(SearchContext context) throws IOException {
        // 1. The hits the coordinating node selected from this shard
        ScoreDoc[] hits = context.docIdsToLoad();

        // 2. Load stored fields from the same point-in-time searcher used by the query phase
        IndexSearcher searcher = context.searcher();
        StoredFields storedFields = searcher.storedFields();

        SearchHit[] searchHits = new SearchHit[hits.length];
        for (int i = 0; i < hits.length; i++) {
            int docId = hits[i].doc;

            // Read the stored document
            Document doc = storedFields.document(docId);

            // Build the SearchHit
            SearchHit searchHit = new SearchHit(docId);
            searchHit.score(hits[i].score);

            // Extract _source (stored as a binary field; Lucene returns a BytesRef, so wrap it)
            BytesReference source = new BytesArray(doc.getBinaryValue(SourceFieldMapper.NAME));
            searchHit.sourceRef(source);

            // Apply _source filtering (includes / excludes)
            if (context.fetchSourceContext() != null) {
                source = applySourceFiltering(source, context.fetchSourceContext());
                searchHit.sourceRef(source);
            }

            // Apply highlighting
            if (context.highlight() != null) {
                Map<String, HighlightField> highlightFields =
                    highlighter.highlight(context, docId);
                searchHit.highlightFields(highlightFields);
            }

            searchHits[i] = searchHit;
        }

        // 3. Store the result
        context.fetchResult().hits(new SearchHits(searchHits, context.queryResult().getTotalHits(), context.queryResult().getMaxScore()));
    }
}

Key points:

  • Only the final top hits are loaded in full
  • _source filtering reduces network transfer
  • Highlighting is supported
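A common refinement of the fetch loop, which to my understanding Elasticsearch's FetchPhase also applies, is to load documents in ascending docId order (index readers favor forward access) and then place each result back at its ranked position. FetchOrderSketch is hypothetical, and the IntFunction loader stands in for stored-fields access.

```java
import java.util.Arrays;
import java.util.function.IntFunction;

// Hypothetical fetch helper: read in ascending docId order, return in rank order
final class FetchOrderSketch {
    // rankedDocIds: docIds in the order the coordinating node ranked them
    // loader:       loads one document (stands in for stored-fields access)
    static String[] fetch(int[] rankedDocIds, IntFunction<String> loader) {
        Integer[] byDocId = new Integer[rankedDocIds.length]; // positions into rankedDocIds
        for (int i = 0; i < byDocId.length; i++) {
            byDocId[i] = i;
        }
        // Sort positions by docId so the index is read front to back
        Arrays.sort(byDocId, (a, b) -> Integer.compare(rankedDocIds[a], rankedDocIds[b]));

        String[] hits = new String[rankedDocIds.length];
        for (int pos : byDocId) {
            hits[pos] = loader.apply(rankedDocIds[pos]); // put each document back at its rank
        }
        return hits;
    }
}
```

For ranked docIds {42, 7, 19} the loader is called with 7, 19, 42, while the returned array keeps the ranking order doc42, doc7, doc19.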

Step 7: Assembling the final result (diagram steps 49-56)

Building the SearchResponse:

SearchHit[] hits = mergeFetchResults(fetchResults);
Aggregations aggregations = reduceAggs(queryResults);

SearchResponse response = new SearchResponse(
    new InternalSearchResponse(
        new SearchHits(
            hits,
            totalHits,
            maxScore
        ),
        aggregations,
        suggest,
        timedOut,
        terminatedEarly
    ),
    scrollId,
    totalShards,
    successfulShards,
    skippedShards,
    tookInMillis,
    shardFailures
);

// JSON response
{
  "took": 5,
  "timed_out": false,
  "_shards": {
    "total": 2,
    "successful": 2,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 100,
      "relation": "eq"
    },
    "max_score": 1.5,
    "hits": [
      {
        "_index": "my-index",
        "_id": "1",
        "_score": 1.5,
        "_source": {
          "field": "value"
        }
      }
    ]
  },
  "aggregations": {
    ...
  }
}

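9. Key Configuration

9.1 Node Settings

| Setting        | Default                           | Description                                   |
|----------------|-----------------------------------|-----------------------------------------------|
| node.name      | Machine hostname                  | Node name                                     |
| node.roles     | All roles (when not set)          | Node roles, e.g. [master, data, ingest]       |
| path.data      | data/ under $ES_HOME (archive)    | Data directory                                |
| path.logs      | logs/ under $ES_HOME (archive)    | Log directory                                 |
| cluster.name   | elasticsearch                     | Cluster name                                  |
| network.host   | _local_ (loopback addresses)      | Bind and publish address                      |
| http.port      | 9200-9300                         | HTTP port (first free port in the range)      |
| transport.port | 9300-9400                         | Transport-layer port (first free in range)    |

9.2 JVM Settings

| Option        | Recommended value                                          | Description                  |
|---------------|------------------------------------------------------------|------------------------------|
| -Xms          | At most 50% of physical RAM, equal to -Xmx                 | Initial heap size            |
| -Xmx          | At most 50% of physical RAM, below ~31 GB (compressed oops) | Maximum heap size            |
| -XX:+UseG1GC  | -                                                          | Use the G1 garbage collector |
| -Xlog:gc*     | -                                                          | GC logging                   |

When no heap size is configured, Elasticsearch 7.11 and later sizes the heap automatically from the node's roles and total memory.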

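10. Monitoring and Observability

10.1 Node Status API

GET /_nodes/_local (node info) and GET /_nodes/_local/stats (runtime statistics). The abridged response below combines fields from both: version, roles and jvm.version come from node info, while heap_used_in_bytes and the file-descriptor counts come from node stats.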

{
  "nodes": {
    "node_id": {
      "name": "node-1",
      "version": "8.10.0",
      "roles": ["master", "data", "ingest"],
      "jvm": {
        "version": "21.0.0",
        "mem": {
          "heap_used_in_bytes": 1073741824,
          "heap_max_in_bytes": 2147483648
        }
      },
      "process": {
        "open_file_descriptors": 512,
        "max_file_descriptors": 65536
      }
    }
  }
}

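10.2 Key Monitoring Metrics

JVM metrics:

  • Heap usage
  • GC frequency and duration
  • Thread count

System metrics:

  • CPU usage
  • File descriptors
  • Disk I/O

Elasticsearch metrics:

  • Indexing throughput
  • Search latency
  • Rejected requests (thread pool rejections)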
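Heap usage is usually tracked as a percentage (Elasticsearch reports it as jvm.mem.heap_used_percent in node stats). The helper below computes that ratio from the two byte counts shown in 10.1 and, for comparison, reads the same metric for the current JVM through the standard MemoryMXBean. HeapUsageSketch is a hypothetical name.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;

final class HeapUsageSketch {
    // Heap used as a percentage of max, rounded down; -1 if the max is undefined
    static long heapUsedPercent(long usedBytes, long maxBytes) {
        if (maxBytes <= 0) {
            return -1;
        }
        return usedBytes * 100 / maxBytes;
    }

    // The same metric for the current JVM, via the standard MemoryMXBean
    static long currentJvmHeapUsedPercent() {
        MemoryUsage heap = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
        return heapUsedPercent(heap.getUsed(), heap.getMax());
    }
}
```

With the values from the node status example (1073741824 used, 2147483648 max) the result is 50.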