Elasticsearch-04-集群模块

本文档提供集群模块的全面剖析,包括模块职责、架构设计、核心数据结构、主节点选举机制、集群状态管理、对外API详细规格、关键流程时序图、配置调优和故障排查。


1. 模块职责

集群模块是 Elasticsearch 实现分布式协调的核心,负责以下关键功能:

1.1 核心职责

  1. 节点发现与加入

    • 自动发现集群中的其他节点
    • 新节点加入集群的握手与验证
    • 节点离开/失联的检测
  2. 主节点选举

    • 基于 Raft 协议的 Leader 选举
    • 主节点故障时的自动切换
    • 脑裂防护 (Split-brain Protection)
  3. 集群状态管理

    • 维护全局集群状态 (Cluster State)
    • 集群状态的增量更新与发布
    • 集群状态的版本控制与持久化
  4. 分片分配与路由

    • 决定分片在哪些节点上分配
    • 分片重平衡 (Rebalancing)
    • 分片路由表管理
  5. 集群级别协调

    • 集群设置管理
    • 集群范围的元数据操作 (索引创建/删除、模板管理)
    • 集群健康监控

1.2 输入与输出

输入:

  • 节点加入/离开事件
  • 集群状态变更请求 (索引操作、设置更新等)
  • 分片状态报告 (来自 Data 节点)

输出:

  • 最新的集群状态 (分发到所有节点)
  • 分片分配决策
  • 集群健康状态
  • 主节点选举结果

1.3 依赖关系

上游依赖:

  • Transport Layer: 节点间通信
  • Discovery Module: 节点发现
  • Allocation Service: 分片分配策略

下游依赖者:

  • Index Module: 依赖集群状态获取分片路由
  • Search Module: 依赖集群状态查找分片位置
  • Gateway Module: 从持久化存储恢复集群状态

2. 模块架构

2.1 整体服务架构图

整体服务架构图展示了从 REST API 层到底层协调层的完整调用链路:

flowchart TB
    subgraph "REST API 层"
        RestClusterHealthAction[RestClusterHealthAction<br/>REST健康接口]
        RestClusterStateAction[RestClusterStateAction<br/>REST状态接口]
        RestClusterSettingsAction[RestClusterSettingsAction<br/>REST设置接口]
    end

    subgraph "Transport Action 层"
        TransportClusterHealthAction[TransportClusterHealthAction<br/>健康查询Action]
        TransportClusterStateAction[TransportClusterStateAction<br/>状态查询Action]
        TransportCreateIndexAction[TransportCreateIndexAction<br/>索引创建Action]
        TransportClusterRerouteAction[TransportClusterRerouteAction<br/>重路由Action]
    end

    subgraph "集群服务层 (Cluster Service Layer)"
        ClusterService[ClusterService<br/>集群服务]
        MasterService[MasterService<br/>主节点服务]
        ClusterApplierService[ClusterApplierService<br/>状态应用服务]
    end

    subgraph "协调层 (Coordination Layer)"
        Coordinator[Coordinator<br/>集群协调器]
        PeerFinder[PeerFinder<br/>节点发现]
        JoinHelper[JoinHelper<br/>加入辅助器]
        PublicationTransportHandler[PublicationTransportHandler<br/>发布传输处理器]
        LeaderChecker[LeaderChecker<br/>主节点检查器]
        FollowersChecker[FollowersChecker<br/>跟随者检查器]
    end

    subgraph "状态管理层 (State Management)"
        ClusterState[ClusterState<br/>集群状态]
        CoordinationState[CoordinationState<br/>协调状态]
        ClusterStatePublisher[ClusterStatePublisher<br/>状态发布器]
        ClusterStateTaskQueue[ClusterStateTaskQueue<br/>状态更新任务队列]
    end

    subgraph "分片分配层 (Allocation Layer)"
        AllocationService[AllocationService<br/>分配服务]
        DesiredBalanceShardsAllocator[DesiredBalanceShardsAllocator<br/>期望平衡分配器]
        AllocationDeciders[AllocationDeciders<br/>分配决策器集]
        DiskThresholdDecider[DiskThresholdDecider<br/>磁盘阈值决策器]
        FilterAllocationDecider[FilterAllocationDecider<br/>过滤分配决策器]
    end

    subgraph "传输层 (Transport Layer)"
        TransportService[TransportService<br/>传输服务]
    end

    %% REST API 到 Transport Action
    RestClusterHealthAction -->|解析请求| TransportClusterHealthAction
    RestClusterStateAction -->|解析请求| TransportClusterStateAction
    RestClusterSettingsAction -->|解析请求| TransportCreateIndexAction

    %% Transport Action 到 Cluster Service
    TransportClusterHealthAction -->|读取状态| ClusterService
    TransportClusterStateAction -->|读取状态| ClusterService
    TransportCreateIndexAction -->|提交任务| MasterService
    TransportClusterRerouteAction -->|提交任务| MasterService

    %% Cluster Service 内部
    ClusterService -->|状态更新| MasterService
    ClusterService -->|状态应用| ClusterApplierService
    ClusterService -->|获取状态| ClusterState

    %% Master Service 到 Coordinator
    MasterService -->|任务队列| ClusterStateTaskQueue
    MasterService -->|发布状态| Coordinator
    MasterService -->|分片分配| AllocationService

    %% Coordinator 内部协调
    Coordinator -->|状态发布| ClusterStatePublisher
    Coordinator -->|节点发现| PeerFinder
    Coordinator -->|节点加入| JoinHelper
    Coordinator -->|维护协调状态| CoordinationState
    Coordinator -->|主节点检查| LeaderChecker
    Coordinator -->|跟随者检查| FollowersChecker

    %% ClusterStatePublisher 到传输层
    ClusterStatePublisher -->|序列化发送| PublicationTransportHandler
    PublicationTransportHandler -->|网络传输| TransportService

    %% AllocationService 内部
    AllocationService -->|调用分配器| DesiredBalanceShardsAllocator
    AllocationService -->|应用决策器| AllocationDeciders
    AllocationDeciders -->|磁盘检查| DiskThresholdDecider
    AllocationDeciders -->|过滤检查| FilterAllocationDecider

    %% 状态同步
    ClusterStatePublisher -.->|发布完成| ClusterApplierService
    ClusterApplierService -.->|更新本地状态| ClusterState

    style RestClusterHealthAction fill:#e1f5ff
    style TransportClusterHealthAction fill:#bbdefb
    style ClusterService fill:#90caf9
    style Coordinator fill:#64b5f6
    style MasterService fill:#42a5f5
    style AllocationService fill:#2196f3

2.2 模块交互关系图

模块交互关系图展示了集群模块内部各组件之间的依赖和调用关系:

flowchart LR
    subgraph "API接口层"
        HealthAPI[Cluster Health API]
        StateAPI[Cluster State API]
        SettingsAPI[Cluster Settings API]
        RerouteAPI[Cluster Reroute API]
    end

    subgraph "协调服务"
        Coordinator[Coordinator]
        JoinHelper[JoinHelper]
        PublicationHandler[PublicationTransportHandler]
    end

    subgraph "主节点服务"
        MasterService[MasterService]
        TaskQueue[TaskQueue]
    end

    subgraph "状态管理"
        ClusterState[ClusterState]
        CoordinationState[CoordinationState]
    end

    subgraph "分配服务"
        AllocationService[AllocationService]
        ShardsAllocator[ShardsAllocator]
        Deciders[AllocationDeciders]
    end

    subgraph "应用服务"
        ClusterApplier[ClusterApplierService]
        Listeners[ClusterStateListener]
    end

    %% 调用关系
    HealthAPI -->|查询| ClusterState
    StateAPI -->|查询| ClusterState
    SettingsAPI -->|提交任务| TaskQueue
    RerouteAPI -->|提交任务| TaskQueue

    TaskQueue -->|批量执行| MasterService
    MasterService -->|执行任务| AllocationService
    MasterService -->|发布状态| Coordinator

    Coordinator -->|处理加入| JoinHelper
    Coordinator -->|发布状态| PublicationHandler
    Coordinator -->|维护状态| CoordinationState

    AllocationService -->|调用分配器| ShardsAllocator
    AllocationService -->|应用决策| Deciders
    AllocationService -->|更新路由| ClusterState

    PublicationHandler -.->|2PC提交| ClusterApplier
    ClusterApplier -->|通知监听器| Listeners
    ClusterApplier -->|更新本地状态| ClusterState

2.3 架构说明

层次划分

1. REST API 层

  • 职责: 接收和解析 HTTP 请求,将其转换为内部请求对象
  • 核心组件: RestClusterHealthAction, RestClusterStateAction, RestClusterSettingsAction
  • 入口路径:
    • GET /_cluster/health → RestClusterHealthAction
    • GET /_cluster/state → RestClusterStateAction
    • PUT /_cluster/settings → RestClusterSettingsAction
  • 处理流程: 解析 URL 参数 → 构建请求对象 → 调用 NodeClient → 返回响应

2. Transport Action 层

  • 职责: 处理内部传输请求,实现具体业务逻辑
  • 核心组件:
    • TransportClusterHealthAction: 计算集群健康状态
    • TransportClusterStateAction: 读取和过滤集群状态
    • TransportCreateIndexAction: 处理索引创建
  • 特性:
    • 支持本地和远程调用
    • 实现超时控制
    • 支持任务取消

3. 集群服务层 (Cluster Service Layer)

  • 职责: 提供集群状态的统一访问接口
  • 核心组件:
    • ClusterService: 总入口,聚合 MasterService 和 ClusterApplierService
    • MasterService: 主节点上执行状态更新
    • ClusterApplierService: 所有节点上应用状态变更
  • 关键特性:
    • 读写分离(MasterService 写,ClusterApplierService 读)
    • 任务队列化
    • 批量执行

4. 协调层 (Coordination Layer)

  • 职责: 实现 Raft 协议,管理主节点选举和节点加入
  • 核心组件:
    • Coordinator: 协调器,管理选举和状态发布
    • PeerFinder: 发现集群中的其他节点
    • JoinHelper: 处理节点加入请求
    • LeaderChecker: 跟随者检查主节点健康
    • FollowersChecker: 主节点检查跟随者健康
  • 关键特性:
    • 基于 Raft 的一致性保证
    • 脑裂防护
    • 自动故障转移

5. 状态管理层 (State Management Layer)

  • 职责: 维护和分发集群状态
  • 核心组件:
    • ClusterState: 不可变集群状态对象
    • CoordinationState: Raft 协调状态
    • ClusterStatePublisher: 状态发布器接口
    • ClusterStateTaskQueue: 状态更新任务队列
  • 关键特性:
    • 版本控制(版本号严格递增)
    • 增量更新(发送 diff 而非全量状态)
    • 两阶段提交(Publish + Commit)

6. 分片分配层 (Allocation Layer)

  • 职责: 决定分片在节点上的分配和移动
  • 核心组件:
    • AllocationService: 分配服务总入口
    • DesiredBalanceShardsAllocator: 期望平衡分配器
    • AllocationDeciders: 分配决策器集合
    • DiskThresholdDecider: 检查磁盘空间
    • FilterAllocationDecider: 根据节点属性过滤
  • 关键特性:
    • 负载均衡
    • 约束满足(磁盘、机架感知等)
    • 限流控制(避免同时移动过多分片)

边界条件

  • 并发: 集群状态更新在 Master 节点上串行化执行
  • 超时:
    • 选举超时 (election timeout): 默认 30s
    • 状态发布超时 (publish timeout): 默认 30s
    • 加入超时 (join timeout): 默认 60s
  • 幂等: 集群状态更新使用版本号保证幂等性
  • 顺序: 集群状态版本号严格递增

异常处理

  • 主节点失联: 触发新的主节点选举
  • 网络分区: 少数派停止服务,多数派继续运行
  • 脑裂防护: 基于 quorum (多数派) 机制
  • 状态发布失败: 重试,达到阈值后节点离开集群

性能要点

  • 状态发布: 增量式,仅发送变更的 diff
  • 批量更新: 多个任务合并为一次状态发布
  • 压缩: 状态发布时使用压缩
  • 分片数量: 单集群建议不超过 10 万个分片

3. 核心组件详解

3.1 Coordinator (集群协调器)

职责

Coordinator 是集群协调的中心组件,实现 Raft 协议的核心逻辑。

核心方法

public class Coordinator extends AbstractLifecycleComponent implements ClusterStatePublisher {
    // 模式: CANDIDATE(候选者), LEADER(主节点), FOLLOWER(跟随者)
    private Mode mode = Mode.CANDIDATE;

    // 当前任期
    private long currentTerm = 0;

    // 协调状态
    private final SetOnce<CoordinationState> coordinationState = new SetOnce<>();

    // 启动选举
    private void startElection() {
        synchronized (mutex) {
            if (mode == Mode.CANDIDATE) {
                final long electionTerm = getTermForNewElection();
                broadcastStartJoinRequest(getLocalNode(), electionTerm, getDiscoveredNodes());
            }
        }
    }

    // 成为候选者
    void becomeCandidate(String method) {
        mode = Mode.CANDIDATE;
        peerFinder.activate(coordinationState.get().getLastAcceptedState().nodes());
        preVoteCollector.update(getPreVoteResponse(), null);
    }

    // 成为主节点
    private void becomeLeader() {
        mode = Mode.LEADER;
        peerFinder.deactivate(getLocalNode());
        clusterFormationFailureHelper.stop();

        // 启动心跳和健康检查
        leaderHeartbeatService.start(coordinationState.get(), this::publishClusterState);
        followersChecker.updateFastResponseState(getCurrentTerm(), mode);
        lagDetector.startLagDetector(getCurrentTerm());
    }

    // 成为跟随者
    private void becomeFollower(DiscoveryNode leaderNode) {
        mode = Mode.FOLLOWER;
        peerFinder.deactivate(leaderNode);

        // 启动主节点健康检查
        leaderChecker.updateLeader(leaderNode);
    }
}

状态机

stateDiagram-v2
    [*] --> CANDIDATE: 启动

    CANDIDATE --> CANDIDATE: 选举超时<br/>重新发起选举
    CANDIDATE --> LEADER: 获得多数派投票
    CANDIDATE --> FOLLOWER: 发现更高任期的主节点

    LEADER --> CANDIDATE: 失去多数派支持
    LEADER --> FOLLOWER: 发现更高任期的主节点

    FOLLOWER --> CANDIDATE: 主节点心跳超时
    FOLLOWER --> FOLLOWER: 收到主节点心跳

    LEADER --> [*]: 节点停止
    FOLLOWER --> [*]: 节点停止
    CANDIDATE --> [*]: 节点停止

3.2 ClusterState (集群状态)

结构

public class ClusterState implements Diffable<ClusterState> {
    // 版本号(严格递增)
    private final long version;

    // 集群 UUID
    private final String stateUUID;

    // 路由表(分片分配)
    private final RoutingTable routingTable;

    // 节点信息
    private final DiscoveryNodes nodes;

    // 元数据(索引、模板、设置等)
    private final Metadata metadata;

    // 集群范围的阻塞
    private final ClusterBlocks blocks;

    // 协调元数据(任期、投票配置等)
    private final CoordinationMetadata coordinationMetadata;
}

关键字段

字段 类型 说明
version long 集群状态版本号,每次更新递增
stateUUID String 状态唯一标识符
routingTable RoutingTable 所有索引的分片路由信息
nodes DiscoveryNodes 集群中的所有节点
metadata Metadata 索引元数据、映射、设置、模板
blocks ClusterBlocks 集群级和索引级的操作阻塞
coordinationMetadata CoordinationMetadata Raft 协调相关元数据

更新机制

// 集群状态更新流程
public ClusterState execute(ClusterState currentState) {
    // 1. 基于当前状态创建 Builder
    ClusterState.Builder builder = ClusterState.builder(currentState);

    // 2. 应用变更
    builder.incrementVersion();
    builder.metadata(newMetadata);
    builder.routingTable(newRoutingTable);

    // 3. 构建新状态
    return builder.build();
}

3.3 MasterService (主节点服务)

职责

MasterService 是主节点上的核心服务,负责:

  • 接收集群状态更新任务
  • 串行化执行任务
  • 发布新的集群状态

任务队列

public class MasterService extends AbstractLifecycleComponent {
    // 任务队列(按优先级)
    private final PrioritizedEsThreadPoolExecutor threadPoolExecutor;

    // 提交集群状态更新任务
    public <T> void submitStateUpdateTask(
        String source,
        T task,
        ClusterStateTaskConfig config,
        ClusterStateTaskExecutor<T> executor,
        ClusterStateTaskListener listener
    ) {
        // 1. 包装为任务
        TaskInputs taskInputs = new TaskInputs(task, source, listener);

        // 2. 加入队列
        threadPoolExecutor.execute(
            taskInputs,
            config.getThreadPool(),
            batchExecutionContext -> {
                // 3. 批量执行任务
                executeTasks(batchExecutionContext, executor);
            }
        );
    }

    // 批量执行任务
    private <T> void executeTasks(
        BatchExecutionContext<T> context,
        ClusterStateTaskExecutor<T> executor
    ) {
        ClusterState previousState = clusterService.state();

        // 1. 执行任务获取新状态
        ClusterTasksResult<T> result = executor.execute(previousState, context.tasks);
        ClusterState newState = result.resultingState;

        // 2. 发布新状态
        if (newState != previousState) {
            publishClusterState(newState, context);
        }

        // 3. 通知任务结果
        notifyListeners(context, result);
    }
}

任务优先级

优先级 用途 示例
IMMEDIATE 紧急任务 节点离开
URGENT 高优先级任务 分片失败
HIGH 高优先级 分片分配
NORMAL 正常任务 索引创建
LOW 低优先级 设置更新
LANGUID 最低优先级 统计更新

3.4 AllocationService (分片分配服务)

职责

决定每个分片应该在哪个节点上分配,实现负载均衡。

分配流程

flowchart TB
    Start[开始] --> UnassignedShards[获取未分配分片列表]
    UnassignedShards --> Primary{是主分片?}

    Primary -->|是| PrimaryAlloc[主分片分配]
    Primary -->|否| ReplicaAlloc[副本分片分配]

    PrimaryAlloc --> FindNodes[查找可用节点]
    ReplicaAlloc --> FindNodes

    FindNodes --> Deciders[应用分配决策器]

    Deciders --> CanAllocate{可以分配?}
    CanAllocate -->|是| Allocate[分配分片]
    CanAllocate -->|否| NextShard[下一个分片]

    Allocate --> Balance[重平衡]
    Balance --> End[结束]
    NextShard --> End

分配决策器

// 分配决策器接口
public interface AllocationDecider {
    Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation);
    Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation);
    Decision canRebalance(ShardRouting shardRouting, RoutingAllocation allocation);
}

// 内置决策器示例
public class DiskThresholdDecider extends AllocationDecider {
    @Override
    public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
        // 检查磁盘使用率
        if (diskUsagePercentage > highWatermark) {
            return Decision.NO;  // 磁盘不足,不允许分配
        }
        return Decision.YES;
    }
}

常用决策器

决策器 作用
SameShardAllocationDecider 同一分片的主副本不能在同一节点
FilterAllocationDecider 根据节点属性过滤 (rack awareness)
DiskThresholdDecider 检查磁盘空间
ShardsLimitAllocationDecider 限制每个节点的分片数量
AwarenessAllocationDecider 机架/可用区感知
ThrottlingAllocationDecider 限流,避免同时移动过多分片

4. 核心数据结构

4.1 ClusterState 与相关类

classDiagram
    class ClusterState {
        -long version
        -String stateUUID
        -ClusterName clusterName
        -DiscoveryNodes nodes
        -Metadata metadata
        -RoutingTable routingTable
        -ClusterBlocks blocks
        -Map~String,Custom~ customs
        -Map~String,CompatibilityVersions~ compatibilityVersions
        +builder(ClusterName)
        +diff(ClusterState)
        +incrementVersion()
        +getRoutingNodes()
    }

    class Metadata {
        -String clusterUUID
        -long version
        -Settings transientSettings
        -Settings persistentSettings
        -Map~String,IndexMetadata~ indices
        -Map~String,IndexTemplateMetadata~ templates
        -CoordinationMetadata coordinationMetadata
        +getTotalNumberOfShards()
        +builder()
    }

    class RoutingTable {
        -Map~String,IndexRoutingTable~ indicesRouting
        +index(String)
        +shardRoutingTable(ShardId)
        +hasIndex(String)
        +builder()
    }

    class DiscoveryNodes {
        -Map~String,DiscoveryNode~ nodes
        -String masterNodeId
        -String localNodeId
        +getMasterNode()
        +getLocalNode()
        +getSize()
        +getDataNodes()
    }

    class ClusterBlocks {
        -Set~ClusterBlock~ global
        -Map~String,Set~ClusterBlock~~ indices
        +hasGlobalBlock(ClusterBlockLevel)
        +hasIndexBlock(String, ClusterBlock)
    }

    class CoordinationMetadata {
        -long term
        -VotingConfiguration lastCommittedConfiguration
        -VotingConfiguration lastAcceptedConfiguration
        -Set~VotingConfigExclusion~ votingConfigExclusions
        +getTerm()
        +getLastCommittedConfiguration()
    }

    ClusterState --> Metadata
    ClusterState --> RoutingTable
    ClusterState --> DiscoveryNodes
    ClusterState --> ClusterBlocks
    Metadata --> CoordinationMetadata

4.2 Coordinator State Machine

classDiagram
    class Coordinator {
        -Mode mode
        -long currentTerm
        -Optional~DiscoveryNode~ lastKnownLeader
        -CoordinationState coordinationState
        -PeerFinder peerFinder
        -JoinHelper joinHelper
        -MasterService masterService
        +startElection()
        +becomeCandidate()
        +becomeLeader()
        +becomeFollower()
    }

    class CoordinationState {
        -long currentTerm
        -long lastAcceptedTerm
        -long lastAcceptedVersion
        -ClusterState lastAcceptedState
        -VotingConfiguration lastCommittedConfiguration
        -VotingConfiguration lastAcceptedConfiguration
        -Map~DiscoveryNode,Join~ joinVotes
        +handleJoin(Join)
        +handlePublishRequest(PublishRequest)
        +handleCommit(Commit)
    }

    class Mode {
        <<enumeration>>
        CANDIDATE
        LEADER
        FOLLOWER
    }

    class Join {
        -DiscoveryNode votingNode
        -DiscoveryNode masterNode
        -long term
        -long lastAcceptedTerm
        -long lastAcceptedVersion
    }

    class PublishRequest {
        -ClusterState clusterState
        +getAcceptedState()
    }

    class Commit {
        -long term
        -long version
    }

    Coordinator --> Mode
    Coordinator --> CoordinationState
    CoordinationState --> Join
    CoordinationState --> PublishRequest
    CoordinationState --> Commit

5. 对外 API

5.1 API 清单

集群模块对外提供以下核心 API:

API 名称 HTTP 方法 路径 幂等性 说明
Cluster Health GET /_cluster/health 集群健康状态
Cluster State GET /_cluster/state 集群状态
Cluster Stats GET /_cluster/stats 集群统计
Cluster Settings GET/PUT /_cluster/settings GET是,PUT否 集群设置
Cluster Reroute POST /_cluster/reroute 分片重新路由

5.2 Cluster Health API

基本信息

  • 名称: _cluster/health
  • 协议与方法: GET /_cluster/health 或 GET /_cluster/health/{index}
  • 幂等性: 是(查询操作)
  • 入口 Action: TransportClusterHealthAction

请求结构体

public class ClusterHealthRequest extends MasterNodeReadRequest<ClusterHealthRequest> {
    // 目标索引(可选,空表示整个集群)
    private String[] indices;

    // 索引选项
    private IndicesOptions indicesOptions = IndicesOptions.lenientExpandHidden();

    // 超时时间
    private TimeValue timeout = TimeValue.timeValueSeconds(30);

    // 等待集群达到指定状态
    private ClusterHealthStatus waitForStatus;

    // 等待无正在重新分配的分片
    private boolean waitForNoRelocatingShards = false;

    // 等待无正在初始化的分片
    private boolean waitForNoInitializingShards = false;

    // 等待指定数量的活跃分片
    private ActiveShardCount waitForActiveShards = ActiveShardCount.NONE;

    // 等待指定数量的节点
    private String waitForNodes = "";

    // 等待指定优先级的事件完成
    private Priority waitForEvents = null;
}

响应结构体

public class ClusterHealthResponse extends ActionResponse {
    // 集群名称
    private String clusterName;

    // 集群健康状态
    private ClusterHealthStatus status;

    // 是否超时
    private boolean timedOut;

    // 节点数量
    private int numberOfNodes;
    private int numberOfDataNodes;

    // 分片统计
    private int activePrimaryShards;
    private int activeShards;
    private int relocatingShards;
    private int initializingShards;
    private int unassignedShards;
    private int delayedUnassignedShards;

    // 待处理任务数量
    private int numberOfPendingTasks;
    private int numberOfInFlightFetch;

    // 活跃分片百分比
    private double activeShardsPercent;

    // 最长任务等待时间
    private TimeValue taskMaxWaitingTime;

    // 索引级别的健康状态(如果请求了特定索引)
    private Map<String, ClusterIndexHealth> indices;
}

ClusterHealthStatus 枚举

状态 说明
GREEN 0 所有主分片和副本分片都已分配
YELLOW 1 所有主分片已分配,但部分副本分片未分配
RED 2 部分主分片未分配

最佳实践

# 1. 查询整个集群健康
GET /_cluster/health

# 2. 等待集群变为 green 状态(最多等待 30s)
GET /_cluster/health?wait_for_status=green&timeout=30s

# 3. 查询特定索引的健康
GET /_cluster/health/my_index

# 4. 等待所有分片活跃
GET /_cluster/health?wait_for_active_shards=all

# 5. 等待至少 3 个节点
GET /_cluster/health?wait_for_nodes=>=3

Cluster Health API 完整调用链路

下图展示了从 REST 接口到集群状态读取的完整调用链路:

sequenceDiagram
    autonumber
    participant Client as HTTP Client
    participant RestHandler as RestClusterHealthAction
    participant NodeClient as NodeClient
    participant Transport as TransportClusterHealthAction
    participant ClusterService as ClusterService
    participant ClusterState as ClusterState<br/>(当前集群状态)
    participant Observer as ClusterStateObserver<br/>(可选,用于等待)

    Client->>RestHandler: GET /_cluster/health?wait_for_status=green&timeout=30s
    activate RestHandler

    Note over RestHandler: 1. 解析请求参数
    RestHandler->>RestHandler: fromRequest(request)<br/>- 解析 indices<br/>- 解析 timeout<br/>- 解析 waitForStatus<br/>- 解析 waitForNodes 等

    RestHandler->>RestHandler: 构建 ClusterHealthRequest

    Note over RestHandler: 2. 调用 NodeClient
    RestHandler->>NodeClient: client.admin().cluster().health(request)
    activate NodeClient

    NodeClient->>Transport: execute(ClusterHealthRequest)
    deactivate NodeClient
    activate Transport

    Note over Transport: 3. Transport Action 处理
    Transport->>Transport: masterOperation(task, request, state, listener)

    Transport->>ClusterService: clusterService.state()
    activate ClusterService
    ClusterService-->>Transport: 返回当前 ClusterState
    deactivate ClusterService

    Note over Transport: 4. 判断是否需要等待
    alt 需要等待 (wait_for_status 指定)
        Transport->>Transport: executeHealth(request, currentState, ...)
        Transport->>Transport: validateRequest(request, currentState, ...)

        alt 当前状态不满足条件
            Transport->>Observer: new ClusterStateObserver(...)<br/>waitForNextChange(...)
            activate Observer

            Note over Observer: 等待集群状态变更
            Observer->>Observer: 监听 ClusterStateListener

            loop 等待状态满足或超时
                Observer->>ClusterService: 监听状态变更事件
                ClusterService-->>Observer: onNewClusterState(newState)
                Observer->>Observer: acceptableClusterStatePredicate.test(newState)
            end

            Observer-->>Transport: 状态满足或超时
            deactivate Observer
        end
    end

    Note over Transport: 5. 计算集群健康
    Transport->>Transport: clusterHealth(request, clusterState, ...)<br/>- 解析索引名<br/>- 计算分片统计<br/>- 计算健康状态

    Transport->>ClusterState: 读取 nodes, routing, metadata
    activate ClusterState
    ClusterState-->>Transport: 节点信息、路由表、元数据
    deactivate ClusterState

    Transport->>Transport: 构建 ClusterHealthResponse<br/>- status (GREEN/YELLOW/RED)<br/>- numberOfNodes<br/>- activePrimaryShards<br/>- activeShards<br/>- unassignedShards

    Transport-->>RestHandler: ClusterHealthResponse
    deactivate Transport

    RestHandler->>RestHandler: RestToXContentListener.buildResponse()

    RestHandler-->>Client: 200 OK<br/>JSON Response
    deactivate RestHandler

调用链路说明

阶段1:REST层处理(步骤1-3)

  • 入口:RestClusterHealthAction.prepareRequest()
  • 职责:解析HTTP请求参数,构建 ClusterHealthRequest 对象
  • 关键代码:
    // 文件:RestClusterHealthAction.java:64-101
    public static ClusterHealthRequest fromRequest(final RestRequest request) {
        String[] indices = Strings.splitStringByCommaToArray(request.param("index"));
        final var masterNodeTimeout = request.hasParam(RestUtils.REST_MASTER_TIMEOUT_PARAM)
            ? RestUtils.getMasterNodeTimeout(request)
            : request.paramAsTime("timeout", RestUtils.REST_MASTER_TIMEOUT_DEFAULT);
        final ClusterHealthRequest clusterHealthRequest = new ClusterHealthRequest(masterNodeTimeout, indices);
        // 解析各种等待条件
        String waitForStatus = request.param("wait_for_status");
        if (waitForStatus != null) {
            clusterHealthRequest.waitForStatus(ClusterHealthStatus.valueOf(waitForStatus.toUpperCase()));
        }
        return clusterHealthRequest;
    }
    

阶段2:Transport Action 处理(步骤4-6)

  • 入口:TransportClusterHealthAction.masterOperation()
  • 职责:从 ClusterService 获取当前集群状态,判断是否需要等待
  • 关键代码:
    // 文件:TransportClusterHealthAction.java:97-129
    protected void masterOperation(
        final Task task,
        final ClusterHealthRequest request,
        final ClusterState state,
        final ActionListener<ClusterHealthResponse> listener
    ) {
        final int waitCount = getWaitCount(request);
        if (request.waitForEvents() != null) {
            waitForEventsAndExecuteHealth(...);
        } else {
            executeHealth(
                request,
                clusterService.state(),
                listener,
                waitCount,
                clusterState -> sendResponse(...)
            );
        }
    }
    

阶段3:等待条件判断(步骤7-11,可选)

  • 如果请求指定了 wait_for_statuswait_for_active_shards 等条件
  • 使用 ClusterStateObserver 监听集群状态变更
  • 当状态满足条件或超时时,继续执行
  • 关键代码:
    // 文件:TransportClusterHealthAction.java:260-297
    private void executeHealth(...) {
        final Predicate<ClusterState> validationPredicate = newState -> validateRequest(request, newState, ...);
        if (validationPredicate.test(currentState)) {
            sendResponse(request, currentState, ...);  // 状态已满足
        } else {
            // 创建观察者,等待状态变更
            final ClusterStateObserver observer = new ClusterStateObserver(...);
            observer.waitForNextChange(new ClusterStateObserver.Listener() {
                @Override
                public void onNewClusterState(ClusterState newState) {
                    if (validationPredicate.test(newState)) {
                        sendResponse(request, newState, ...);
                    }
                }
                @Override
                public void onTimeout(TimeValue timeout) {
                    sendResponse(request, observer.setAndGetObservedState(), ...);
                }
            });
        }
    }
    

阶段4:健康状态计算(步骤12-15)

  • 入口:TransportClusterHealthAction.clusterHealth()
  • 职责:遍历集群状态,统计各项指标,计算健康状态
  • 关键逻辑:
    // 文件:TransportClusterHealthAction.java:491-533
    private ClusterHealthResponse clusterHealth(
        ClusterHealthRequest request,
        ClusterState clusterState,
        ...
    ) {
        String[] concreteIndices = indexNameExpressionResolver.concreteIndexNames(...);
    
        return new ClusterHealthResponse(
            clusterState.getClusterName().value(),
            concreteIndices,
            clusterState,        // 从这里读取:
                                // - nodes.size()
                                // - routing.allShards()
                                // - metadata.indices()
            numberOfPendingTasks,
            numberOfInFlightFetch,
            UnassignedInfo.getNumberOfDelayedUnassigned(clusterState),
            pendingTaskTimeInQueue
        );
    }
    

健康状态计算规则

  • GREEN: 所有主分片和副本分片都已分配
  • YELLOW: 所有主分片已分配,但部分副本分片未分配
  • RED: 部分主分片未分配

阶段5:响应返回(步骤16-17)

  • ClusterHealthResponse 序列化为 JSON
  • 通过 HTTP 返回给客户端

性能要点

  1. 快速路径:如果不需要等待,直接返回当前状态,耗时通常 < 10ms
  2. 等待路径:使用 ClusterStateObserver,不会阻塞线程,而是注册回调
  3. 超时控制:请求超时后立即返回当前状态,避免无限等待
  4. 异步处理:整个流程是异步的,使用 ActionListener 传递结果

6. 核心流程时序图

6.1 主节点选举流程 (Raft)

sequenceDiagram
    autonumber
    participant N1 as Node 1<br/>(CANDIDATE)
    participant N2 as Node 2<br/>(FOLLOWER)
    participant N3 as Node 3<br/>(FOLLOWER)

    Note over N1,N3: 触发选举:心跳超时或节点启动

    N1->>N1: becomeCandidate()<br/>term = 1
    N1->>N1: ElectionScheduler 触发

    Note over N1,N3: Phase 1: Pre-Vote (预投票)

    N1->>N1: PreVoteCollector.start()

    par 并发发送预投票请求
        N1->>N2: PreVoteRequest<br/>(term=1, lastAcceptedTerm=0, lastAcceptedVersion=0)
        N2->>N2: 检查条件:<br/>1. term >= currentTerm<br/>2. lastAccepted >= own
        N2-->>N1: PreVoteResponse(granted=true)
    and
        N1->>N3: PreVoteRequest<br/>(term=1)
        N3->>N3: 检查条件
        N3-->>N1: PreVoteResponse(granted=true)
    end

    N1->>N1: 收集到 quorum (2/3)<br/>preVote 成功

    Note over N1,N3: Phase 2: Start Join (正式投票)

    N1->>N1: currentTerm++<br/>term = 2
    N1->>N1: JoinHelper.new CandidateJoinAccumulator()

    par 并发广播选举请求
        N1->>N2: StartJoinRequest<br/>(sourceNode=N1, term=2)
        N2->>N2: 检查 term (2 > 1)<br/>becomeFollower()
        N2->>N2: 构建 Join
        N2->>N1: Join(votingNode=N2, masterNode=N1, term=2)
        N1->>N1: joinAccumulator.addJoin(N2)
    and
        N1->>N3: StartJoinRequest<br/>(term=2)
        N3->>N3: becomeFollower()
        N3->>N3: 构建 Join
        N3->>N1: Join(votingNode=N3, term=2)
        N1->>N1: joinAccumulator.addJoin(N3)
    end

    N1->>N1: 检查 joinVotes.size() >= quorum<br/>2 votes >= 2 (quorum of 3)
    N1->>N1: becomeLeader()<br/>mode = LEADER

    Note over N1,N3: Phase 3: Publish Initial State

    N1->>N1: CoordinationState.handleStartJoin()<br/>更新 lastCommittedConfiguration

    N1->>N1: 构建初始集群状态:<br/>version++, masterNodeId=N1

    par 发布集群状态 (2-Phase Commit)
        N1->>N2: PublishRequest<br/>(ClusterState)
        N2->>N2: CoordinationState.handlePublishRequest()<br/>验证 term 和 version
        N2->>N2: 应用状态到本地
        N2-->>N1: PublishResponse(committed=false)
    and
        N1->>N3: PublishRequest<br/>(ClusterState)
        N3->>N3: handlePublishRequest()
        N3->>N3: 应用状态
        N3-->>N1: PublishResponse(committed=false)
    end

    N1->>N1: 等待 quorum ack (2/3)
    N1->>N1: 状态已提交

    par 发送 Commit 消息
        N1->>N2: Commit(term=2, version=N)
        N2->>N2: CoordinationState.handleCommit()<br/>标记状态为 committed
        N2->>N2: applyCommittedState()
    and
        N1->>N3: Commit(term=2, version=N)
        N3->>N3: handleCommit()
        N3->>N3: applyCommittedState()
    end

    Note over N1,N3: 选举完成,开始正常服务

    N1->>N1: LeaderHeartbeatService.start()

    loop 定期心跳
        N1->>N2: Heartbeat (PublishRequest with no-op state)
        N1->>N3: Heartbeat
    end

时序图说明

阶段划分:

1. Pre-Vote Phase (预投票)

  • 目的: 避免不必要的选举,减少集群扰动
  • 条件: 候选者的 lastAcceptedState 不能落后于多数派
  • 机制: 不增加 term,仅检查是否有可能获胜

2. Start Join Phase (正式投票)

  • 目的: 获得多数派的正式投票
  • 条件: term 必须大于等于当前 term
  • 机制: 增加 term,收集 Join 消息

3. Publish Initial State (发布初始状态)

  • 目的: 将新主节点的状态同步到所有节点
  • 机制: 两阶段提交 (2PC)
    • Phase 1: Publish - 发布状态,等待 ack
    • Phase 2: Commit - 提交状态,使其生效

Quorum (多数派):

quorum = ⌊N/2⌋ + 1
例如:3个节点 → quorum = 2
     5个节点 → quorum = 3

6.2 集群状态发布流程

sequenceDiagram
    autonumber
    participant Client as 客户端<br/>(创建索引请求)
    participant REST as REST Handler
    participant TAction as TransportCreateIndexAction
    participant MS as MasterService
    participant Coord as Coordinator<br/>(主节点)
    participant N1 as Node 1
    participant N2 as Node 2
    participant N3 as Node 3

    Client->>REST: PUT /my_index
    REST->>TAction: execute(CreateIndexRequest)

    TAction->>MS: submitStateUpdateTask(<br/>"create-index",<br/>CreateIndexTask,<br/>CreateIndexExecutor)

    MS->>MS: 加入任务队列<br/>(Priority.NORMAL)

    Note over MS: 任务调度器处理队列

    MS->>MS: executeTasks()<br/>批量执行同类型任务

    MS->>MS: CreateIndexExecutor.execute(<br/>currentState, tasks)

    MS->>MS: 1. 验证索引名
    MS->>MS: 2. 构建 IndexMetadata
    MS->>MS: 3. 调用 AllocationService.reroute()<br/>分配主分片

    MS->>MS: 4. 构建新 ClusterState<br/>version++, metadata, routingTable

    Note over Coord,N3: 2-Phase Commit 开始

    MS->>Coord: publishClusterState(newState)

    Coord->>Coord: ClusterStatePublisher.publish()

    Note over Coord,N3: Phase 1: Publish (发布)

    par 并发发布到所有节点
        Coord->>N1: PublishRequest<br/>(diff: ClusterState diff)
        N1->>N1: 1. 验证版本号<br/>(version > localVersion)
        N1->>N1: 2. 应用 diff 到本地<br/>newState = diff.apply(localState)
        N1->>N1: 3. 保存到内存<br/>(uncommitted)
        N1-->>Coord: PublishResponse<br/>(ack, currentTerm)
    and
        Coord->>N2: PublishRequest<br/>(diff)
        N2->>N2: 验证并应用 diff
        N2-->>Coord: PublishResponse(ack)
    and
        Coord->>N3: PublishRequest<br/>(diff)
        N3->>N3: 验证并应用 diff
        N3-->>Coord: PublishResponse(ack)
    end

    Coord->>Coord: 等待 quorum ack<br/>(2 out of 3)

    Coord->>Coord: applyCommittedState(newState)<br/>主节点本地提交

    Note over Coord,N3: Phase 2: Commit (提交)

    par 并发发送提交消息
        Coord->>N1: ApplyCommitRequest<br/>(term, version)
        N1->>N1: ClusterApplierService<br/>.onNewClusterState()
        N1->>N1: 1. 调用 ClusterStateApplier
        N1->>N1: 2. 更新 ClusterService.state()
        N1->>N1: 3. 调用 ClusterStateListener
        N1-->>Coord: ApplyCommitResponse
    and
        Coord->>N2: ApplyCommitRequest
        N2->>N2: onNewClusterState()
        N2->>N2: 应用和通知
        N2-->>Coord: ApplyCommitResponse
    and
        Coord->>N3: ApplyCommitRequest
        N3->>N3: onNewClusterState()
        N3->>N3: 应用和通知
        N3-->>Coord: ApplyCommitResponse
    end

    Coord-->>MS: 发布成功
    MS-->>TAction: onSuccess(newState)
    TAction-->>REST: CreateIndexResponse
    REST-->>Client: 200 OK

时序图功能说明

第一部分:请求接收与任务提交(步骤1-12)

1-2. REST 层处理RestCreateIndexAction 接收 HTTP 请求,解析索引名称和配置 3-4. Transport ActionTransportCreateIndexAction 接收内部请求 5-8. 任务提交

  • 将创建索引任务提交到 MasterService 的任务队列
  • 任务优先级为 NORMAL
  • 任务包含:索引名、设置、映射、别名等 9-12. 任务队列化
  • 任务加入优先级队列
  • MasterService 调度器取出任务
  • 可能与其他相同类型任务批量执行

关键代码

// 文件:MasterService.java:1572-1613
public void submitTask(String source, T task, TimeValue timeout) {
    queue.add(new Entry<>(source, taskHolder, ...));
    if (queueSize.getAndIncrement() == 0) {
        perPriorityQueue.execute(processor);  // 触发执行
    }
}

第二部分:状态计算与验证(步骤13-18)

13-14. 批量执行任务

  • 从队列中取出一批相同类型的任务
  • 在一个 ClusterState 更新周期内处理多个任务

15-16. 执行创建索引逻辑

  • CreateIndexExecutor.execute() 方法
  • 验证索引名称(不能重复、不能以 . 或 _ 开头等)
  • 构建 IndexMetadata(包含设置、映射、分片数等)

17-18. 调用分片分配服务

  • AllocationService.reroute()
  • 为新索引的主分片分配节点
  • 应用分配决策器(磁盘空间、节点过滤等)

关键代码

// 文件:AllocationService.java:583-592
private void reroute(RoutingAllocation allocation, RerouteStrategy rerouteStrategy) {
    rerouteStrategy.removeDelayMarkers(allocation);
    allocateExistingUnassignedShards(allocation);  // 分配主分片
    rerouteStrategy.execute(allocation);           // 执行再平衡
}

第三部分:状态构建与发布(步骤19-37)

19-21. 构建新的 ClusterState

  • 版本号递增:version++
  • 更新 Metadata:添加新索引的 IndexMetadata
  • 更新 RoutingTable:添加新索引的分片路由

22-23. 调用 Coordinator 发布状态

  • Coordinator 作为 ClusterStatePublisher 实现
  • 使用两阶段提交协议

两阶段提交详解(步骤24-37)

Phase 1: Publish(发布阶段) 24-30. 主节点并发向所有节点发送 PublishRequest:

  • 如果节点已有最新状态,发送增量 diff
  • 否则发送完整 ClusterState
  • 每个节点:
    • 验证版本号(必须大于当前版本)
    • 应用 diff 到本地状态
    • 暂存到内存(uncommitted 状态)
    • 返回 PublishResponse (ack)

31-32. 主节点等待多数派 (quorum) 确认:

  • quorum = ⌊N/2⌋ + 1
  • 例如 3 节点集群,quorum = 2
  • 收到 quorum ack 后,主节点本地提交状态

Phase 2: Commit(提交阶段) 33-37. 主节点向所有节点发送 ApplyCommitRequest:

  • 节点调用 ClusterApplierService.onNewClusterState()
  • 依次执行:
    • ClusterStateApplier(前置处理)
    • 更新 ClusterService.state()(使状态对外可见)
    • ClusterStateListener(后置通知)

关键代码

// 文件:ClusterApplierService.java:505-539
private void applyChanges(ClusterState previousState, ClusterState newState, ...) {
    // 1. 连接新节点
    connectToNodesAndWait(newState);

    // 2. 应用设置变更
    clusterSettings.applySettings(incomingSettings);

    // 3. 调用 ClusterStateApplier
    callClusterStateAppliers(clusterChangedEvent, ...);

    // 4. 更新本地状态(使其对外可见)
    state.set(newState);

    // 5. 通知监听器
    callClusterStateListeners(clusterChangedEvent, ...);
}

第四部分:响应返回(步骤38-40)

38-39. 通知任务成功

  • 调用 CreateIndexTask.onSuccess(newState)
  • 构建 CreateIndexResponse
  1. 返回客户端
  • HTTP 200 OK
  • 响应包含:索引名、已确认、分片确认状态

异常处理与回退

  1. 任务执行失败

    • 索引名重复 → ResourceAlreadyExistsException
    • 验证失败 → InvalidIndexNameException
    • 分配失败 → 索引创建,但分片处于 unassigned 状态
  2. 发布失败(Phase 1)

    • 未收到 quorum ack → FailedToCommitClusterStateException
    • 主节点退位,触发新的选举
    • 新主节点可能重新发布该状态
  3. 提交失败(Phase 2)

    • 个别节点应用失败 → 记录日志,但不影响整体
    • 节点会在下次状态同步时重新追赶

性能与容量要点

  1. 批量执行优化

    • 多个索引创建任务可批量处理
    • 减少状态发布次数
    • 降低集群压力
  2. 状态发布优化

    • 使用增量 diff 而非全量状态
    • 压缩传输内容
    • 并发发送到所有节点
  3. 分片分配限流

    • cluster.routing.allocation.node_concurrent_recoveries:限制每个节点同时恢复的分片数
    • cluster.routing.allocation.node_initial_primaries_recoveries:限制初始主分片恢复数
  4. 超时控制

    • 发布超时:默认 30s (cluster.publish.timeout)
    • 任务超时:默认 30s
    • 超时后任务失败,客户端收到错误响应

6.3 分片分配流程详解

分片分配是集群模块的核心功能之一,决定了分片在哪些节点上分配和移动。下图展示了分片分配的内部流程:

sequenceDiagram
    autonumber
    participant MS as MasterService
    participant AS as AllocationService
    participant Allocator as DesiredBalanceShardsAllocator
    participant Deciders as AllocationDeciders
    participant Disk as DiskThresholdDecider
    participant Filter as FilterAllocationDecider
    participant RT as RoutingTable

    Note over MS,RT: 触发分片分配:索引创建、节点加入/离开、手动 reroute

    MS->>AS: reroute(clusterState)
    activate AS

    Note over AS: 1. 构建 RoutingAllocation 上下文
    AS->>AS: new RoutingAllocation(<br/>deciders, routingNodes, clusterState, ...)

    Note over AS: 2. 清理无效状态
    AS->>AS: disassociateDeadNodes()<br/>- 移除已离开节点的分片<br/>- 标记分片为 unassigned

    Note over AS: 3. 分配现有未分配分片
    AS->>AS: allocateExistingUnassignedShards(allocation)

    AS->>RT: routingNodes.unassigned().iterator()
    activate RT
    RT-->>AS: UnassignedIterator
    deactivate RT

    loop 遍历未分配的主分片
        AS->>Allocator: allocateUnassigned(shardRouting, allocation, iterator)
        activate Allocator

        Note over Allocator: 查找可分配节点
        Allocator->>Allocator: 获取所有可用节点列表

        loop 对每个候选节点
            Allocator->>Deciders: canAllocate(shard, node, allocation)
            activate Deciders

            Note over Deciders: 依次应用所有决策器

            Deciders->>Disk: canAllocate(shard, node, allocation)
            activate Disk
            Disk->>Disk: 检查磁盘使用率<br/>- 低水位线: 85%<br/>- 高水位线: 90%<br/>- 洪水水位线: 95%
            alt 磁盘空间不足
                Disk-->>Deciders: Decision.NO("磁盘使用率过高")
            else 磁盘空间充足
                Disk-->>Deciders: Decision.YES
            end
            deactivate Disk

            Deciders->>Filter: canAllocate(shard, node, allocation)
            activate Filter
            Filter->>Filter: 检查节点属性过滤<br/>- index.routing.allocation.require.*<br/>- index.routing.allocation.include.*<br/>- index.routing.allocation.exclude.*
            Filter-->>Deciders: Decision (YES/NO/THROTTLE)
            deactivate Filter

            Note over Deciders: 其他决策器:<br/>- SameShardAllocationDecider<br/>- AwarenessAllocationDecider<br/>- ThrottlingAllocationDecider<br/>- ...

            Deciders-->>Allocator: Decision (YES/NO/THROTTLE)
            deactivate Deciders
        end

        Note over Allocator: 选择最佳节点
        Allocator->>Allocator: 根据决策结果选择节点<br/>- 优先选择 Decision.YES<br/>- 考虑负载均衡<br/>- 计算分片大小和节点容量

        alt 找到可分配节点
            Allocator->>RT: initialize(shard, targetNode)
            activate RT
            RT->>RT: 更新 ShardRouting 状态<br/>- currentNodeId = targetNode<br/>- state = INITIALIZING
            RT-->>Allocator: ok
            deactivate RT
            Allocator-->>AS: 分配成功
        else 未找到可分配节点
            Allocator->>Allocator: 记录分配失败原因
            Allocator-->>AS: 分配延迟或失败
        end
        deactivate Allocator
    end

    loop 遍历未分配的副本分片
        AS->>Allocator: allocateUnassigned(replicaShard, ...)
        Note over Allocator: 副本分片分配逻辑类似<br/>但需要确保与主分片不在同一节点
    end

    Note over AS: 4. 执行再平衡
    AS->>Allocator: allocate(allocation)
    activate Allocator

    Note over Allocator: 计算期望平衡状态
    Allocator->>Allocator: computeDesiredBalance()<br/>- 考虑所有分片<br/>- 计算每个节点的理想分片分布<br/>- 目标:最小化节点间负载差异

    Note over Allocator: 协调至期望状态
    Allocator->>Allocator: reconcile(desiredBalance, currentState)

    loop 移动分片以达到平衡
        Allocator->>Deciders: canRebalance(shard, allocation)
        Deciders-->>Allocator: Decision

        alt 允许再平衡
            Allocator->>RT: relocateShard(shard, targetNode)
            activate RT
            RT->>RT: 更新 ShardRouting<br/>- state = RELOCATING<br/>- relocatingNodeId = targetNode
            RT-->>Allocator: ok
            deactivate RT
        end
    end
    deactivate Allocator

    Note over AS: 5. 返回更新后的 RoutingTable
    AS->>AS: RoutingAllocation.routingTable()
    AS-->>MS: 更新后的 RoutingTable
    deactivate AS

    MS->>MS: 构建新 ClusterState<br/>with updated RoutingTable

分片分配流程功能说明

阶段1:初始化分配上下文(步骤1-5)

  • RoutingAllocation 上下文:包含分配决策所需的所有信息

    • routingNodes:当前路由节点状态
    • clusterState:完整集群状态
    • allocationDeciders:分配决策器集合
    • clusterInfo:节点磁盘使用情况、分片大小等
    • snapshotsInfo:快照恢复信息
  • 清理无效状态

    • 移除已离开节点上的分片
    • 将这些分片标记为 unassigned
    • 记录分片未分配的原因(节点离开、节点重启等)

关键代码

// 文件:AllocationService.java:583-592
private void reroute(RoutingAllocation allocation, RerouteStrategy rerouteStrategy) {
    assert hasDeadNodes(allocation) == false : "dead nodes should be explicitly cleaned up";
    rerouteStrategy.removeDelayMarkers(allocation);
    allocateExistingUnassignedShards(allocation);  // 分配现有分片
    rerouteStrategy.execute(allocation);           // 执行策略(含再平衡)
    assert RoutingNodes.assertShardStats(allocation.routingNodes());
}

阶段2:分配未分配分片(步骤6-35)

2.1 遍历未分配的主分片(步骤8-32)

  • 优先级排序

    • 按索引优先级(index.priority)降序
    • 同优先级按索引创建时间升序
  • 对每个主分片

    1. 获取所有候选节点
    2. 对每个节点应用分配决策器
    3. 选择最佳节点并分配

2.2 分配决策器链(步骤13-26)

决策器按顺序执行,任何一个返回 NO 就拒绝分配到该节点:

  1. DiskThresholdDecider(磁盘阈值检查):

    // 文件:DiskThresholdDecider.java
    public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
        final ClusterInfo clusterInfo = allocation.clusterInfo();
        final DiskUsage usage = clusterInfo.getNodeMostAvailableDiskUsages().get(node.nodeId());
    
        // 检查高水位线
        if (usage.getFreeDiskAsPercentage() < diskThresholdSettings.getHighWatermark()) {
            return allocation.decision(Decision.NO, NAME,
                "磁盘使用率 %.1f%% 超过高水位线 %.1f%%",
                usage.getUsedDiskAsPercentage(),
                100.0 - diskThresholdSettings.getHighWatermark()
            );
        }
        return allocation.decision(Decision.YES, NAME, "磁盘空间充足");
    }
    
  2. FilterAllocationDecider(节点属性过滤):

    • 检查 index.routing.allocation.require.*:必须匹配
    • 检查 index.routing.allocation.include.*:必须包含
    • 检查 index.routing.allocation.exclude.*:必须排除
  3. SameShardAllocationDecider

    • 同一分片的主副本不能在同一节点
  4. AwarenessAllocationDecider

    • 机架感知或可用区感知
    • 确保副本分布在不同的机架/可用区
  5. ThrottlingAllocationDecider

    • 限制同时恢复的分片数
    • 避免网络和磁盘 I/O 过载

2.3 选择最佳节点(步骤27-32)

  • 从所有 Decision.YES 的节点中选择

  • 考虑因素:

    • 节点上已有分片数量
    • 节点磁盘使用率
    • 节点负载均衡得分
  • 分配后更新 RoutingTable:

    // ShardRouting 状态转换:
    // UNASSIGNED → INITIALIZING (分配到节点)
    

阶段3:执行再平衡(步骤36-50)

再平衡目标:在满足所有约束的前提下,使各节点负载尽可能均衡。

3.1 计算期望平衡状态(步骤38-39)

DesiredBalanceShardsAllocator 使用加权算法:

// 节点权重因素:
weight = α * shardCount + β * diskUsage + γ * writeLoad

// α, β, γ 是可配置的权重系数

3.2 协调至期望状态(步骤40-48)

  • 识别需要移动的分片
  • 按优先级排序(考虑分片大小、当前负载等)
  • 对每个移动:
    1. 检查是否允许再平衡(canRebalance
    2. 检查目标节点是否可接受(canAllocate
    3. 执行分片重定位(relocateShard

分片状态转换

STARTED → RELOCATING (源节点)
INITIALIZING (目标节点)
STARTED (目标节点) + STARTED 删除 (源节点)

关键代码

// 文件:DesiredBalanceReconciler.java:155-166
private void moveShards() {
    // 2. 移动无法留在当前节点的分片(必须移动)
    // 3. 移动期望在其他节点的分片(再平衡)

    for (ShardRouting shard : assignment.nodeIds()) {
        if (canRemainOnCurrentNode(shard) == false) {
            // 强制移动
            moveToDesiredNode(shard);
        } else if (shouldRebalance(shard)) {
            // 可选移动(优化平衡)
            rebalanceToDesiredNode(shard);
        }
    }
}

阶段4:返回结果(步骤51-53)

  • 返回更新后的 RoutingTable
  • MasterService 将其包含在新的 ClusterState 中
  • 通过 Coordinator 发布到所有节点

性能与调优

  1. 限流配置

    cluster.routing.allocation.node_concurrent_recoveries: 2
    cluster.routing.allocation.node_initial_primaries_recoveries: 4
    cluster.routing.allocation.cluster_concurrent_rebalance: 2
    
  2. 磁盘水位线

    cluster.routing.allocation.disk.watermark.low: 85%
    cluster.routing.allocation.disk.watermark.high: 90%
    cluster.routing.allocation.disk.watermark.flood_stage: 95%
    
  3. 再平衡策略

    cluster.routing.rebalance.enable: all  # all, primaries, replicas, none
    cluster.routing.allocation.allow_rebalance: indices_all_active
    
  4. 过滤示例

    PUT /my_index/_settings
    {
      "index.routing.allocation.require.rack": "rack1",
      "index.routing.allocation.exclude._ip": "192.168.1.100"
    }
    

6.4 节点加入集群流程

节点加入集群是分布式系统中的关键流程,涉及节点发现、验证、投票和状态同步等多个步骤。下图展示了一个新节点加入现有集群的完整流程:

sequenceDiagram
    autonumber
    participant NewNode as 新节点<br/>(Node N)
    participant Seed as 种子节点<br/>(Seed Node)
    participant Leader as 主节点<br/>(Leader)
    participant Coord as Leader.Coordinator
    participant JoinHelper as JoinHelper
    participant JoinQueue as Join Task Queue
    participant MS as MasterService
    participant Alloc as AllocationService

    Note over NewNode,Alloc: 新节点启动,尝试加入集群

    NewNode->>NewNode: 启动 Coordinator<br/>mode = CANDIDATE

    Note over NewNode: 1. 节点发现阶段
    NewNode->>NewNode: PeerFinder.activate()<br/>读取 discovery.seed_hosts 配置

    NewNode->>Seed: 发送节点发现请求<br/>(PeerFinder Request)
    Seed-->>NewNode: 返回已知节点列表<br/>(包括 Leader)

    NewNode->>NewNode: 更新已知节点列表<br/>discoveredNodes.add(Leader)

    Note over NewNode: 2. 检查主节点健康
    NewNode->>Leader: 发送 Ping 请求
    Leader-->>NewNode: Pong (term, mode=LEADER)

    NewNode->>NewNode: 识别到 Leader<br/>lastKnownLeader = Leader

    Note over NewNode: 3. 发起加入请求
    NewNode->>NewNode: 构建 JoinRequest<br/>- sourceNode: NewNode<br/>- compatibilityVersions<br/>- features<br/>- term: Leader.term

    NewNode->>Leader: JOIN_ACTION_NAME<br/>sendJoinRequest(JoinRequest)
    activate Leader

    Note over Leader: 4. Leader 接收加入请求
    Leader->>Coord: handleJoinRequest(JoinRequest)
    activate Coord

    Note over Coord: 5. 验证节点
    Coord->>Coord: validateJoinRequest(JoinRequest)

    Note over Coord: 验证步骤:
    Coord->>Coord: 1. 检查版本兼容性<br/>ensureVersionBarrier()

    Coord->>Coord: 2. 检查 transport 连接<br/>建立 STATE 通道

    alt 已是主节点
        Coord->>Coord: 3. 验证能否理解当前状态<br/>onJoinValidators.forEach()
    end

    Coord->>NewNode: 发送 JOIN_PING_ACTION<br/>检查 PING 通道健康
    NewNode-->>Coord: Pong

    alt 验证失败
        Coord-->>NewNode: ValidationException<br/>(版本不兼容/连接失败)
        NewNode->>NewNode: 记录失败,稍后重试
    else 验证成功
        Note over Coord: 6. 处理 Join 请求
        Coord->>Coord: CoordinationState.handleJoin(Join)<br/>- 记录节点投票<br/>- 检查是否达到 quorum

        Coord->>JoinHelper: JoinHelper.handleJoinRequest()
        activate JoinHelper

        JoinHelper->>JoinHelper: 将节点加入 pendingJoins 集合

        alt Join 数量达到 quorum
            Note over JoinHelper: 7. 提交加入任务
            JoinHelper->>JoinQueue: submitTask("node-join", JoinTask)
            activate JoinQueue

            JoinQueue->>MS: 加入任务队列<br/>Priority.URGENT
            deactivate JoinQueue
            activate MS

            Note over MS: 8. 执行加入任务
            MS->>MS: NodeJoinExecutor.execute(currentState, tasks)

            MS->>MS: 验证和更新:<br/>1. 再次检查版本兼容性<br/>2. 检查节点角色<br/>3. 更新 DiscoveryNodes<br/>4. 更新 node features

            alt 新节点首次加入
                MS->>MS: DiscoveryNodes.Builder<br/>.add(newNode)
            else 现有节点重新加入
                MS->>MS: 更新节点信息和 features
            end

            Note over MS: 9. 分片重新分配
            MS->>Alloc: allocationService.reroute(currentState)
            activate Alloc

            Note over Alloc: 新节点加入后重新分配分片:<br/>1. 可能分配新索引的主分片<br/>2. 可能分配副本分片<br/>3. 执行再平衡
            Alloc->>Alloc: allocateUnassignedShards()
            Alloc->>Alloc: balance()

            Alloc-->>MS: 更新后的 RoutingTable
            deactivate Alloc

            Note over MS: 10. 构建新 ClusterState
            MS->>MS: ClusterState.builder(currentState)<br/>.version++<br/>.nodes(newNodes)<br/>.routingTable(newRouting)<br/>.build()

            Note over MS: 11. 发布新状态
            MS->>Coord: publishClusterState(newState)
            deactivate MS
            deactivate JoinHelper
        end

        Note over Coord: 12. 两阶段提交发布状态
        Coord->>Coord: ClusterStatePublisher.publish(newState)

        par 并发发送到所有节点(包括新节点)
            Coord->>NewNode: PublishRequest<br/>(newState with newNode in DiscoveryNodes)
            Coord->>Leader: PublishRequest (本地应用)
            Coord->>Seed: PublishRequest
        end

        Note over NewNode: 13. 新节点接收集群状态
        NewNode->>NewNode: handlePublishRequest(PublishRequest)
        NewNode->>NewNode: 验证版本和 term
        NewNode->>NewNode: 应用 ClusterState<br/>- 更新 nodes<br/>- 更新 routingTable<br/>- 更新 metadata

        NewNode-->>Coord: PublishResponse (ack)
        Seed-->>Coord: PublishResponse (ack)

        Coord->>Coord: 等待 quorum ack

        Note over Coord: 14. 提交状态
        par 发送提交消息
            Coord->>NewNode: ApplyCommitRequest
            Coord->>Seed: ApplyCommitRequest
        end

        Note over NewNode: 15. 应用状态并成为 Follower
        NewNode->>NewNode: ClusterApplierService<br/>.onNewClusterState(newState)
        NewNode->>NewNode: becomeFollower(Leader)
        NewNode->>NewNode: LeaderChecker.updateLeader(Leader)<br/>启动主节点健康检查

        NewNode-->>Coord: ApplyCommitResponse
        Coord-->>Leader: 发布成功
        deactivate Coord
        Leader-->>NewNode: Join 成功
        deactivate Leader

        Note over NewNode: 16. 新节点完成加入
        NewNode->>NewNode: mode = FOLLOWER<br/>lastKnownLeader = Leader

        Note over NewNode: 17. 开始分片恢复
        loop 恢复分配给该节点的分片
            NewNode->>NewNode: 初始化分片<br/>从主分片或快照恢复数据
        end
    end

节点加入流程功能说明

阶段1:节点发现(步骤1-7)

新节点启动后首先需要发现集群中的其他节点:

  1. 读取种子节点配置

    discovery.seed_hosts:
      - 192.168.1.10:9300
      - 192.168.1.11:9300
      - 192.168.1.12:9300
    
  2. PeerFinder 主动探测

    • 向种子节点发送探测请求
    • 种子节点返回已知的所有节点列表
    • 新节点更新本地的 discoveredNodes 集合
  3. 识别主节点

    • 向已知节点发送 Ping
    • 主节点返回 Pong,包含 termmode=LEADER
    • 新节点识别出 Leader 并记录

关键代码

// 文件:PeerFinder.java
public void activate(final DiscoveryNodes lastAcceptedNodes) {
    // 从配置和上次状态获取种子节点
    final List<TransportAddress> seedAddresses = seedHostsProvider.getSeedAddresses(...);

    // 向每个种子节点发送探测请求
    for (TransportAddress seedAddress : seedAddresses) {
        transportService.sendRequest(
            seedAddress,
            DISCOVERY_REQUEST_ACTION_NAME,
            new PeersRequest(...),
            responseHandler
        );
    }
}

阶段2:加入请求与验证(步骤8-19)

2.1 发起加入请求(步骤8-10)

新节点构建 JoinRequest 并发送给 Leader:

// 文件:JoinHelper.java:274-290
public void sendJoinRequest(DiscoveryNode destination, long term, Optional<Join> optionalJoin) {
    final JoinRequest joinRequest = new JoinRequest(
        transportService.getLocalNode(),  // sourceNode
        compatibilityVersions,             // 版本兼容性信息
        features,                          // 节点特性
        term,                              // 当前 term
        optionalJoin                       // 可选的投票
    );

    transportService.sendRequest(
        destination,
        JOIN_ACTION_NAME,
        joinRequest,
        responseHandler
    );
}

2.2 Leader 验证节点(步骤13-18)

Leader 对加入请求进行严格验证:

  1. 版本兼容性检查

    // 文件:NodeJoinExecutor.java:173-178
    public static void ensureVersionBarrier(Version joiningNodeVersion, Version minClusterNodeVersion) {
        if (joiningNodeVersion.before(minClusterNodeVersion)) {
            throw new IllegalStateException(
                "node version [" + joiningNodeVersion + "] is lower than the minimum required [" + minClusterNodeVersion + "]"
            );
        }
    }
    
  2. Transport 连接检查

    • 确保能建立 STATE 通道(用于集群状态同步)
    • 确保能建立 PING 通道(用于健康检查)
  3. 集群状态验证(如果已是主节点):

    • 调用 onJoinValidators
    • 确保新节点能理解当前集群状态
    • 检查是否有自定义的加入限制

关键代码

// 文件:Coordinator.java:703-735
private void validateJoinRequest(JoinRequest joinRequest, ActionListener<Void> validateListener) {
    // 验证版本
    final ClusterState stateForJoinValidation = getStateForJoinValidationService();
    if (stateForJoinValidation != null) {
        onJoinValidators.forEach(a -> a.accept(joinRequest.getSourceNode(), stateForJoinValidation));
        NodeJoinExecutor.ensureVersionBarrier(
            joinRequest.getSourceNode().getVersion(),
            stateForJoinValidation.getNodes().getMinNodeVersion()
        );
    }

    // 验证连接
    sendJoinValidate(joinRequest.getSourceNode(), validateListener);
}

阶段3:任务提交与执行(步骤20-32)

3.1 提交加入任务(步骤23-25)

验证通过后,Leader 将加入任务提交到 MasterService 的任务队列:

  • 任务类型:JoinTask
  • 优先级:Priority.URGENT(高优先级,优先处理)
  • 可批量执行:多个节点的加入可以合并为一次状态更新

3.2 执行加入任务(步骤27-31)

NodeJoinExecutor 执行加入逻辑:

// 文件:NodeJoinExecutor.java:157-182
public ClusterState execute(ClusterState currentState, List<JoinTask> joinTasks) {
    final DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(currentState.nodes());
    final Map<String, Set<String>> nodeFeatures = new HashMap<>(currentState.nodeFeatures());
    boolean nodesChanged = false;

    for (JoinTask joinTask : joinTasks) {
        final DiscoveryNode node = joinTask.node();

        if (currentNodes.nodeExistsWithSameRoles(node)) {
            // 现有节点重新加入,更新特性
            if (!Objects.equals(nodeFeatures.get(node.getId()), joinTask.features())) {
                nodeFeatures.put(node.getId(), joinTask.features());
                nodesChanged = true;
            }
        } else {
            // 新节点加入
            CompatibilityVersions compatibilityVersions = joinTask.compatibilityVersions();
            Set<String> features = joinTask.features();

            // 再次验证版本
            ensureVersionBarrier(node.getVersion(), minClusterNodeVersion);
            CompatibilityVersions.ensureVersionsCompatibility(compatibilityVersions, ...);

            // 添加节点
            nodesBuilder.add(node);
            nodeFeatures.put(node.getId(), features);
            nodesChanged = true;
        }
    }

    if (nodesChanged == false) {
        return currentState;  // 无变化,返回原状态
    }

    // 构建新状态
    return ClusterState.builder(currentState)
        .nodes(nodesBuilder)
        .nodeFeatures(nodeFeatures)
        .build();
}

3.3 分片重新分配(步骤32-36)

新节点加入后,AllocationService 重新分配分片:

  1. 为新节点分配分片

    • 未分配的主分片可能分配到新节点
    • 现有主分片的副本可能分配到新节点
  2. 执行再平衡

    • 计算新的期望平衡状态
    • 将部分分片从负载高的节点移动到新节点
  3. 限流控制

    • 避免同时移动过多分片
    • 遵守并发恢复限制

阶段4:状态发布(步骤37-52)

4.1 两阶段提交(步骤44-51)

Leader 将包含新节点的 ClusterState 发布到所有节点(包括新节点):

  • Phase 1 - Publish

    • 并发发送 PublishRequest 到所有节点
    • 新节点接收完整的 ClusterState(首次)
    • 其他节点接收增量 diff
    • 等待 quorum ack
  • Phase 2 - Commit

    • 发送 ApplyCommitRequest 到所有节点
    • 所有节点应用新状态
    • 新节点成为 Follower

4.2 新节点应用状态(步骤52-54)

新节点接收并应用 ClusterState:

// 文件:ClusterApplierService.java:505-539
private void applyChanges(ClusterState previousState, ClusterState newState, ...) {
    // 1. 连接集群中的其他节点
    connectToNodesAndWait(newState);

    // 2. 应用集群设置
    clusterSettings.applySettings(newState.metadata().settings());

    // 3. 调用 ClusterStateApplier(前置处理)
    callClusterStateAppliers(clusterChangedEvent, ...);

    // 4. 更新本地状态
    state.set(newState);

    // 5. 通知监听器(后置处理)
    callClusterStateListeners(clusterChangedEvent, ...);
}

新节点完成以下初始化:

  • 建立与所有节点的 Transport 连接
  • 加载集群设置和索引元数据
  • 启动 LeaderChecker,定期检查 Leader 健康
  • 开始恢复分配给它的分片

阶段5:分片恢复(步骤55-57)

新节点开始恢复分配给它的分片:

  1. 主分片恢复(如果分配了主分片):

    • 从 Gateway 加载本地数据
    • 或从快照恢复
  2. 副本分片恢复(如果分配了副本):

    • 从对应的主分片拉取数据
    • 建立 recovery connection
    • 分阶段恢复:文件复制 → 事务日志重放 → 完成

异常处理与重试

  1. 连接失败

    • 新节点无法连接种子节点 → 重试探测
    • Leader 无法连接新节点 → 验证失败,拒绝加入
  2. 验证失败

    • 版本不兼容 → 拒绝加入,返回错误
    • 特性不兼容 → 拒绝加入
  3. 发布失败

    • 新节点未 ack → Leader 等待超时,可能重新发送
    • 未达到 quorum → Leader 退位,触发新选举
  4. 重新加入

    • 新节点加入失败后会自动重试
    • 重试间隔递增(避免风暴)
    • 最终超时后节点报错并停止

性能与容量要点

  1. 批量加入优化

    • 多个节点同时加入可合并为一次状态更新
    • 减少状态发布次数
  2. 渐进式恢复

    • 新节点首先恢复主分片
    • 然后逐步恢复副本分片
    • 遵守并发恢复限制
  3. 超时配置

    cluster.join.timeout: 60s           # 加入超时
    cluster.publish.timeout: 30s        # 状态发布超时
    
  4. 启动检查(首次启动集群时):

    cluster.initial_master_nodes:       # 初始主节点候选列表
      - node-1
      - node-2
      - node-3
    

7. 配置与可观测

7.1 关键配置

配置项 默认值 说明
cluster.name elasticsearch 集群名称
node.master true 是否可以成为主节点
node.data true 是否是数据节点
discovery.seed_hosts [] 种子节点列表
cluster.initial_master_nodes [] 初始主节点候选列表 (首次启动)
cluster.publish.timeout 30s 状态发布超时
cluster.join.timeout 60s 加入集群超时
cluster.election.duration 500ms 选举周期
cluster.follower_lag.timeout 90s Follower 延迟超时

7.2 监控指标

集群健康

GET /_cluster/health

{
  "status": "green",  // green / yellow / red
  "number_of_nodes": 3,
  "number_of_data_nodes": 3,
  "active_primary_shards": 10,
  "active_shards": 20,
  "relocating_shards": 0,
  "initializing_shards": 0,
  "unassigned_shards": 0
}

集群统计

GET /_cluster/stats

{
  "nodes": {
    "count": {
      "total": 3,
      "master": 3,
      "data": 3
    }
  },
  "indices": {
    "count": 5,
    "shards": {
      "total": 20,
      "primaries": 10
    }
  }
}

8. 关键设计决策与权衡

8.1 一致性与可用性权衡

Elasticsearch 集群模块在 CAP 定理中选择了 CP(一致性 + 分区容错性):

设计决策

  1. 基于 Raft 的协调协议

    • 保证集群状态的强一致性
    • 使用 quorum(多数派)机制
    • 脑裂防护
  2. 两阶段提交状态发布

    • Phase 1: Publish - 确保多数派接受
    • Phase 2: Commit - 所有节点应用
    • 保证状态变更的原子性

权衡

  • 优势:集群状态始终一致,避免数据丢失
  • 劣势:网络分区时少数派不可用,牺牲部分可用性
  • 应对:通过合理规划节点数量(奇数个master-eligible节点)减少影响

8.2 状态更新的串行化

设计决策

  • 集群状态更新在主节点上严格串行化执行
  • 通过优先级队列管理任务
  • 批量执行相同类型的任务

权衡

  • 优势
    • 避免并发冲突和竞态条件
    • 简化状态管理逻辑
    • 便于推理和调试
  • 劣势
    • 状态更新成为性能瓶颈
    • 高优先级任务可能饿死低优先级任务
  • 应对
    • 尽量使用 Priority.NORMAL
    • 批量提交任务
    • 优化任务执行时间

8.3 增量状态发布

设计决策

  • 发布集群状态时发送 diff 而非全量状态
  • 仅在必要时(新节点、版本不匹配)发送全量状态

权衡

  • 优势
    • 大幅减少网络传输量
    • 降低序列化/反序列化开销
    • 提升状态发布速度
  • 劣势
    • 增加 diff 计算复杂度
    • 需要维护版本兼容性
  • 应对
    • 高效的 diff 算法
    • 版本号严格递增
    • 失败时回退到全量发布

8.4 分片分配的限流机制

设计决策

  • 限制同时恢复/移动的分片数量
  • 多级限流:节点级、集群级
  • 区分主分片和副本分片

权衡

  • 优势
    • 避免网络和磁盘 I/O 过载
    • 保证集群稳定性
    • 渐进式恢复
  • 劣势
    • 恢复时间变长
    • 集群恢复到 GREEN 状态较慢
  • 应对
    • 根据硬件能力调整限流参数
    • 区分初始恢复和再平衡的限流值

8.5 节点角色分离

设计决策

  • 支持节点角色:master-eligible, data, ingest, coordinating
  • 推荐大集群中角色分离

权衡

  • 优势
    • 专用节点资源隔离
    • 避免相互影响
    • 更好的可扩展性
  • 劣势
    • 增加节点数量
    • 增加运维复杂度
  • 应对
    • 小集群可使用混合节点
    • 大集群建议角色分离

9. 核心要点总结

9.1 模块职责概览

模块 核心职责 关键组件
REST API 层 接收HTTP请求,参数解析 RestClusterHealthAction, RestClusterStateAction
Transport Action 层 实现业务逻辑,路由请求 TransportClusterHealthAction, TransportCreateIndexAction
集群服务层 统一状态访问,读写分离 ClusterService, MasterService, ClusterApplierService
协调层 Raft协议,选举,状态发布 Coordinator, PeerFinder, JoinHelper
状态管理层 维护集群状态,版本控制 ClusterState, CoordinationState, ClusterStatePublisher
分片分配层 分片分配,负载均衡 AllocationService, DesiredBalanceShardsAllocator

9.2 关键调用链路

1. 读取集群健康状态

HTTP Client → RestClusterHealthAction → NodeClient →
TransportClusterHealthAction → ClusterService.state() →
计算健康状态 → 返回响应

2. 创建索引(状态更新)

HTTP Client → RestCreateIndexAction → TransportCreateIndexAction →
MasterService.submitTask() → 任务队列 →
CreateIndexExecutor.execute() → AllocationService.reroute() →
Coordinator.publishClusterState() → 两阶段提交 →
所有节点应用状态 → 返回响应

3. 节点加入集群

新节点启动 → PeerFinder 发现 → 发送 JoinRequest →
Leader 验证 → 提交 JoinTask → NodeJoinExecutor.execute() →
更新 DiscoveryNodes → 重新分配分片 →
发布新状态 → 新节点成为 Follower → 开始分片恢复

9.3 核心机制

1. 两阶段提交(2PC)

  • Phase 1: Publish - 主节点发送状态,等待多数派 ack
  • Phase 2: Commit - 主节点通知提交,所有节点应用

2. Quorum(多数派)

quorum = ⌊N/2⌋ + 1
- 3节点: quorum = 2
- 5节点: quorum = 3
- 7节点: quorum = 4

3. 任务批量执行

  • 相同类型的任务合并为一批
  • 一次状态更新处理多个任务
  • 减少状态发布次数

4. 增量状态发布

  • 计算 ClusterState diff
  • 仅发送变更部分
  • 新节点或版本不匹配时发送全量

5. 分片状态转换

UNASSIGNED → INITIALIZING → STARTED
STARTED → RELOCATING → (目标节点) INITIALIZING → STARTED

9.4 监控指标

指标 API 关键字段
集群健康 GET /_cluster/health status, active_shards, unassigned_shards
集群状态 GET /_cluster/state version, nodes, routing_table, metadata
待处理任务 GET /_cluster/pending_tasks priority, source, time_in_queue
集群统计 GET /_cluster/stats nodes.count, indices.count, shards.total
节点信息 GET /_nodes roles, version, jvm, os

9.5 故障排查

问题1:集群健康状态为 YELLOW

  • 原因:部分副本分片未分配
  • 排查
    GET /_cluster/allocation/explain
    GET /_cat/shards?v&h=index,shard,prirep,state,unassigned.reason
    
  • 常见原因:磁盘空间不足、节点数少于副本数

问题2:集群健康状态为 RED

  • 原因:部分主分片未分配
  • 排查
    GET /_cluster/allocation/explain
    GET /_cat/indices?v&health=red
    
  • 常见原因:节点故障、数据损坏

问题3:集群无主节点

  • 原因:选举失败或主节点失联
  • 排查
    • 检查日志:grep "master not discovered" logs/elasticsearch.log
    • 检查网络连接
    • 检查 discovery.seed_hosts 配置
  • 常见原因:网络分区、节点配置错误、版本不兼容

问题4:分片恢复缓慢

  • 原因:限流设置过低
  • 调优
    cluster.routing.allocation.node_concurrent_recoveries: 4
    cluster.routing.allocation.node_initial_primaries_recoveries: 8
    indices.recovery.max_bytes_per_sec: 100mb
    

问题5:集群状态更新缓慢

  • 原因:主节点负载过高、任务执行时间长
  • 排查
    GET /_cluster/pending_tasks
    GET /_nodes/hot_threads
    
  • 优化
    • 减少高优先级任务
    • 优化任务执行逻辑
    • 考虑使用专用主节点

9.6 最佳实践

1. 节点规划

  • 使用奇数个 master-eligible 节点(3或5个)
  • 大集群建议角色分离:专用主节点 + 数据节点 + 协调节点
  • 避免跨数据中心部署(高延迟导致选举问题)

2. 配置优化

# 集群名称(必须相同)
cluster.name: my-cluster

# 节点名称(唯一)
node.name: node-1

# 节点角色
node.roles: [ master, data ]

# 发现配置
discovery.seed_hosts: [ "192.168.1.10", "192.168.1.11", "192.168.1.12" ]
cluster.initial_master_nodes: [ "node-1", "node-2", "node-3" ]

# 超时配置
cluster.join.timeout: 60s
cluster.publish.timeout: 30s

# 分片分配
cluster.routing.allocation.node_concurrent_recoveries: 2
cluster.routing.allocation.disk.watermark.low: 85%
cluster.routing.allocation.disk.watermark.high: 90%

3. 索引设计

  • 合理设置分片数量(单分片不超过50GB)
  • 副本数量与节点数量匹配
  • 使用索引生命周期管理(ILM)

4. 监控告警

  • 监控集群健康状态
  • 监控待处理任务数量
  • 监控磁盘使用率
  • 监控节点 JVM 堆内存使用

5. 容量规划

  • 单集群建议不超过 200 个节点
  • 单集群建议不超过 10 万个分片
  • 主节点堆内存:至少 8GB,大集群 16-32GB
  • 数据节点:根据数据量和查询负载规划