Overview

The Kafka coordinator module is the core component that implements distributed coordination. The GroupCoordinator manages consumer group membership and partition assignment, while the TransactionCoordinator coordinates and manages transactions. This article examines the internal implementation of the coordinators and the mechanisms they use to guarantee consistency and availability in a distributed environment.

1. Coordinator Architecture Overview

1.1 Overall Architecture of the Coordinator Module

graph TB
    subgraph "Kafka Coordinator Module Architecture"
        subgraph "Group Coordinator"
            GCS[GroupCoordinatorService - service layer]
            GCSH[GroupCoordinatorShard - shard layer]
            CGM[ConsumerGroupManager - consumer group management]
            SGM[StreamsGroupManager - streams group management]
            SHGM[ShareGroupManager - share group management]
        end
        
        subgraph "Transaction Coordinator"
            TCS[TransactionCoordinatorService - service layer]
            TCSH[TransactionCoordinatorShard - shard layer]
            TSM[TransactionStateManager - state management]
            TTM[TransactionTopicManager - transaction topic management]
        end
        
        subgraph "Coordinator Runtime"
            CR[CoordinatorRuntime - runtime]
            CL[CoordinatorLoader - loader]
            CS[CoordinatorShard - shard interface]
            CT[CoordinatorTimer - timer]
        end
        
        subgraph "Persistence Layer"
            CRL[CoordinatorRecordLoader - record loader]
            CRS[CoordinatorRecordSerde - serializer]
            SS[SnapshotStorage - snapshot storage]
            LS[LogStorage - log storage]
        end
        
        subgraph "Partition Assignors"
            RRA[RangeAssignor - range assignor]
            RDA[RoundRobinAssignor - round-robin assignor]
            SA[StickyAssignor - sticky assignor]
            COPA[CooperativeStickyAssignor - cooperative sticky assignor]
        end
        
        %% Relationships
        GCS --> GCSH
        GCSH --> CGM
        GCSH --> SGM
        GCSH --> SHGM
        
        TCS --> TCSH
        TCSH --> TSM
        TCSH --> TTM
        
        GCSH --> CR
        TCSH --> CR
        CR --> CL
        CR --> CS
        CR --> CT
        
        CR --> CRL
        CR --> CRS
        CRL --> SS
        CRL --> LS
        
        CGM --> RRA
        CGM --> RDA
        CGM --> SA
        CGM --> COPA
    end
    
    style GCS fill:#e1f5fe
    style TCS fill:#e8f5e8
    style CR fill:#f3e5f5
    style CGM fill:#fff3e0
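The four assignors listed in the diagram differ in how they map partitions onto group members. To make the idea concrete, here is a minimal, self-contained sketch of range-style assignment for a single topic: members are sorted, each gets a contiguous block of partitions, and the first `numPartitions % numMembers` members receive one extra. This is a hypothetical standalone helper, not Kafka's actual `RangeAssignor` class, which additionally handles multiple topics and subscription metadata.

```java
import java.util.*;

// Range-style assignment sketch: contiguous partition blocks per sorted member.
public class RangeAssignSketch {
    public static Map<String, List<Integer>> assign(List<String> members, int numPartitions) {
        List<String> sorted = new ArrayList<>(members);
        Collections.sort(sorted);                       // deterministic order across brokers
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        int base = numPartitions / sorted.size();       // minimum partitions per member
        int extra = numPartitions % sorted.size();      // first `extra` members get one more
        int next = 0;
        for (int i = 0; i < sorted.size(); i++) {
            int count = base + (i < extra ? 1 : 0);
            List<Integer> parts = new ArrayList<>();
            for (int j = 0; j < count; j++) parts.add(next++);
            result.put(sorted.get(i), parts);
        }
        return result;
    }
}
```

With three members and seven partitions, the first member (in sorted order) receives three partitions and the other two receive two each, which is exactly the skew that motivates the sticky and cooperative-sticky variants.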

2. GroupCoordinator In Depth

2.1 Complete Consumer Group Rebalance Flow

sequenceDiagram
    participant C1 as Consumer1
    participant C2 as Consumer2
    participant C3 as Consumer3 (newly joined)
    participant GC as GroupCoordinator
    participant L as Leader Consumer
    participant KC as KafkaController
    
    Note over C1,KC: Complete consumer group rebalance flow
    
    Note over C1,C3: Phase 1: trigger rebalance
    C3->>GC: JoinGroupRequest (memberId="")
    GC->>GC: Detect new member joining
    GC->>C1: HeartbeatResponse (REBALANCE_IN_PROGRESS)
    GC->>C2: HeartbeatResponse (REBALANCE_IN_PROGRESS)
    
    Note over C1,GC: Phase 2: PrepareRebalance
    C1->>GC: JoinGroupRequest
    C1->>C1: Stop consuming, commit offsets
    C2->>GC: JoinGroupRequest
    C2->>C2: Stop consuming, commit offsets
    C3->>GC: JoinGroupRequest
    
    GC->>GC: Wait for all members to rejoin
    GC->>GC: Elect group leader (typically the first member to join)
    
    Note over C1,GC: Phase 3: AwaitingSync
    GC->>C1: JoinGroupResponse (LEADER, member list)
    GC->>C2: JoinGroupResponse (MEMBER)
    GC->>C3: JoinGroupResponse (MEMBER)
    
    Note over C1,GC: Phase 4: compute partition assignment
    C1->>C1: Compute assignment with the configured partition assignor
    C1->>GC: SyncGroupRequest (with assignment)
    C2->>GC: SyncGroupRequest (empty)
    C3->>GC: SyncGroupRequest (empty)
    
    Note over GC: Phase 5: Stable state
    GC->>C1: SyncGroupResponse (assigned partitions)
    GC->>C2: SyncGroupResponse (assigned partitions)
    GC->>C3: SyncGroupResponse (assigned partitions)
    
    Note over C1,C3: Phase 6: resume consumption
    C1->>C1: Start consuming newly assigned partitions
    C2->>C2: Start consuming newly assigned partitions
    C3->>C3: Start consuming newly assigned partitions
    
    Note over C1,C3: Regular heartbeats maintain group membership
    loop Periodic heartbeats
        C1->>GC: HeartbeatRequest
        C2->>GC: HeartbeatRequest
        C3->>GC: HeartbeatRequest
        GC->>C1: HeartbeatResponse (OK)
        GC->>C2: HeartbeatResponse (OK)
        GC->>C3: HeartbeatResponse (OK)
    end
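Before any of the above can happen, a client must find the right coordinator: Kafka hashes the group id onto a partition of the internal `__consumer_offsets` topic, and the leader of that partition hosts the group's coordinator. A sketch of that well-known mapping (Kafka itself uses `Utils.abs(groupId.hashCode()) % groupMetadataTopicPartitionCount`; `floorMod` is used here to get the same non-negative result, including for `Integer.MIN_VALUE`):

```java
// Locate the __consumer_offsets partition (and hence the coordinator) for a group id.
public class CoordinatorPartitionSketch {
    public static int partitionFor(String groupId, int offsetsTopicPartitionCount) {
        // floorMod keeps the result in [0, partitionCount) even for negative hash codes
        return Math.floorMod(groupId.hashCode(), offsetsTopicPartitionCount);
    }
}
```

Because the mapping is a pure function of the group id, every broker and client computes the same owner without any extra coordination traffic.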

2.2 Core GroupCoordinator Implementation

/**
 * GroupCoordinatorService - group coordinator service implementation.
 * Manages the full lifecycle of consumer groups.
 */
public class GroupCoordinatorService implements GroupCoordinator {
    
    private static final Logger log = LoggerFactory.getLogger(GroupCoordinatorService.class);
    
    private final GroupCoordinatorConfig config;                    // configuration
    private final CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime; // runtime
    private final GroupCoordinatorMetrics groupCoordinatorMetrics;  // metrics
    private final Timer timer;                                      // timer
    
    // Names of the supported partition assignors
    private final Set<String> consumerGroupAssignors;
    
    /**
     * Handles consumer group heartbeat requests (modern protocol).
     * Implements member management and rebalance triggering.
     */
    @Override
    public CompletableFuture<ConsumerGroupHeartbeatResponseData> consumerGroupHeartbeat(
        AuthorizableRequestContext context,
        ConsumerGroupHeartbeatRequestData request) {
        
        String groupId = request.groupId();
        String memberId = request.memberId();
        int memberEpoch = request.memberEpoch();
        
        log.debug("Handling consumer group heartbeat: groupId={}, memberId={}, memberEpoch={}",
             groupId, memberId, memberEpoch);
        
        // Validate the request
        throwIfConsumerGroupHeartbeatRequestIsInvalid(request, context.apiVersion());
        
        // Route to the coordinator shard that owns this group
        return runtime.scheduleWriteOperation(
            "consumer-group-heartbeat",
            topicPartitionFor(groupId),
            coordinator -> coordinator.consumerGroupHeartbeat(context, request)
        ).thenApply(result -> {
            // Process the result of the scheduled write operation
            ConsumerGroupHeartbeatResponseData response = result.response();
            
            // Register any new timer tasks produced by the operation
            result.records().forEach(record -> {
                if (record instanceof TimerRecord) {
                    TimerRecord timerRecord = (TimerRecord) record;
                    timer.add(timerRecord);
                }
            });
            
            return response;
        }).exceptionally(throwable -> {
            log.error("Exception while handling consumer group heartbeat", throwable);
            
            // Build an error response
            return new ConsumerGroupHeartbeatResponseData()
                .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())
                .setErrorMessage("Internal server error");
        });
    }
    
    /**
     * Handles join-group requests (classic protocol).
     * Supports the join flow of older consumers.
     */
    @Override
    public CompletableFuture<JoinGroupResponseData> joinGroup(
        AuthorizableRequestContext context,
        JoinGroupRequestData request,
        BufferSupplier bufferSupplier) {
        
        String groupId = request.groupId();
        String memberId = request.memberId();
        String protocolType = request.protocolType();
        
        log.info("Handling join-group request: groupId={}, memberId={}, protocolType={}",
            groupId, memberId, protocolType);
        
        return runtime.scheduleWriteOperation(
            "join-group",
            topicPartitionFor(groupId),
            coordinator -> coordinator.classicGroupJoin(context, request, bufferSupplier)
        ).thenApply(result -> {
            JoinGroupResponseData response = result.response();
            
            // Record the join in the metrics
            groupCoordinatorMetrics.recordGroupJoin();
            
            return response;
        });
    }
    
    /**
     * Handles sync-group requests.
     * The leader consumer submits the partition assignment here.
     */
    @Override
    public CompletableFuture<SyncGroupResponseData> syncGroup(
        AuthorizableRequestContext context,
        SyncGroupRequestData request,
        BufferSupplier bufferSupplier) {
        
        String groupId = request.groupId();
        String memberId = request.memberId();
        int generationId = request.generationId();
        
        log.debug("Handling sync-group request: groupId={}, memberId={}, generationId={}",
             groupId, memberId, generationId);
        
        return runtime.scheduleWriteOperation(
            "sync-group",
            topicPartitionFor(groupId),
            coordinator -> coordinator.classicGroupSync(context, request, bufferSupplier)
        ).thenApply(result -> {
            SyncGroupResponseData response = result.response();
            
            // On success the group transitions to the Stable state
            if (response.errorCode() == Errors.NONE.code()) {
                log.info("Group {} synced; transitioning to Stable", groupId);
                groupCoordinatorMetrics.recordGroupSync();
            }
            
            return response;
        });
    }
}
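Every handler above funnels its write through `runtime.scheduleWriteOperation(name, topicPartitionFor(groupId), op)`, so all writes for one group are applied serially on the shard that owns its partition. A minimal sketch of that routing pattern, using one single-threaded executor per shard key (a hypothetical helper; Kafka's actual `CoordinatorRuntime` is far richer — it also journals records to the log and tracks commit offsets):

```java
import java.util.concurrent.*;

// Per-key serialized writes: operations for the same key never race with each other.
public class ShardedRuntimeSketch {
    private final ExecutorService[] shards;

    public ShardedRuntimeSketch(int numShards) {
        shards = new ExecutorService[numShards];
        for (int i = 0; i < numShards; i++) shards[i] = Executors.newSingleThreadExecutor();
    }

    /** Routes the operation to the shard owning `key` and runs it there serially. */
    public <T> CompletableFuture<T> scheduleWrite(String key, Callable<T> op) {
        ExecutorService shard = shards[Math.floorMod(key.hashCode(), shards.length)];
        CompletableFuture<T> f = new CompletableFuture<>();
        shard.submit(() -> {
            try { f.complete(op.call()); } catch (Exception e) { f.completeExceptionally(e); }
        });
        return f;
    }

    public void shutdown() { for (ExecutorService s : shards) s.shutdown(); }
}
```

Because two heartbeats for the same group always land on the same single-threaded shard, the shard code can mutate group state without per-request locking, which is exactly the property the `GroupCoordinatorShard` methods below rely on.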

/**
 * GroupCoordinatorShard - group coordinator shard implementation.
 * Each shard owns the state and metadata of a subset of groups.
 */
public class GroupCoordinatorShard implements CoordinatorShard<CoordinatorRecord> {
    
    private static final Logger log = LoggerFactory.getLogger(GroupCoordinatorShard.class);
    
    // Group state stores
    private final TimelineHashMap<String, ClassicGroup> classicGroups;      // classic consumer groups
    private final TimelineHashMap<String, ConsumerGroup> consumerGroups;    // modern consumer groups
    private final TimelineHashMap<String, StreamsGroup> streamsGroups;      // streams groups
    private final TimelineHashMap<String, ShareGroup> shareGroups;          // share groups
    
    // Partition assignors
    private final Map<String, ConsumerGroupPartitionAssignor> assignors;
    
    // Configuration and monitoring
    private final GroupCoordinatorConfig config;
    private final GroupCoordinatorMetrics metrics;
    private final SnapshotRegistry snapshotRegistry;                        // snapshot registry

    /**
     * Handles a consumer group heartbeat.
     * Heartbeat processing for the modern consumer protocol.
     */
    public CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> consumerGroupHeartbeat(
        AuthorizableRequestContext context,
        ConsumerGroupHeartbeatRequestData request) throws ApiException {
        
        String groupId = request.groupId();
        String memberId = request.memberId();
        int memberEpoch = request.memberEpoch();
        
        // Look up the consumer group, creating it if necessary
        ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, 
            request.groupInstanceId() != null);
        
        List<CoordinatorRecord> records = new ArrayList<>();
        ConsumerGroupHeartbeatResponseData response;
        
        try {
            if (memberEpoch == 0) {
                // New member joining
                response = handleConsumerGroupMemberJoin(context, request, group, records);
            } else {
                // Heartbeat from an existing member
                response = handleConsumerGroupMemberHeartbeat(context, request, group, records);
            }
            
            // Check whether a rebalance needs to be scheduled
            maybeScheduleConsumerGroupRebalance(group, records);
            
            return new CoordinatorResult<>(records, response);
            
        } catch (Exception e) {
            log.error("Exception while handling consumer group heartbeat: groupId={}, memberId={}", groupId, memberId, e);
            
            ConsumerGroupHeartbeatResponseData errorResponse = new ConsumerGroupHeartbeatResponseData()
                .setErrorCode(Errors.forException(e).code())
                .setErrorMessage(e.getMessage());
            
            return new CoordinatorResult<>(Collections.emptyList(), errorResponse);
        }
    }
    
    /**
     * Handles a new member joining.
     * Registers the member and sets up the initial (empty) assignment.
     */
    private ConsumerGroupHeartbeatResponseData handleConsumerGroupMemberJoin(
        AuthorizableRequestContext context,
        ConsumerGroupHeartbeatRequestData request,
        ConsumerGroup group,
        List<CoordinatorRecord> records) {
        
        String groupId = request.groupId();
        String memberId = request.memberId();
        
        // Generate a member id if the client did not supply one
        if (memberId == null || memberId.isEmpty()) {
            memberId = generateMemberId(context);
        }
        
        // Create the new group member
        ConsumerGroupMember newMember = new ConsumerGroupMember.Builder()
            .setMemberId(memberId)
            .setGroupInstanceId(request.groupInstanceId())
            .setRackId(request.rackId())
            .setClientId(context.clientId())
            .setClientHost(context.clientAddress().toString())
            .setSubscribedTopicNames(request.subscribedTopicNames())
            .setServerAssignor(request.serverAssignor())
            .setRebalanceTimeoutMs(request.rebalanceTimeoutMs())
            .build();
        
        // Add the member to the group
        group.addMember(newMember);
        
        // Emit a member-metadata record
        ConsumerGroupMemberMetadataValue memberRecord = new ConsumerGroupMemberMetadataValue()
            .setMemberId(memberId)
            .setGroupInstanceId(request.groupInstanceId())
            .setClientId(context.clientId())
            .setClientHost(context.clientAddress().toString())
            .setSubscribedTopicNames(request.subscribedTopicNames())
            .setAssignedPartitions(Collections.emptyList()); // no assignment yet
        
        records.add(new CoordinatorRecord(
            new ConsumerGroupMemberMetadataKey(groupId, memberId),
            memberRecord
        ));
        
        // Trigger a rebalance
        group.requestRebalance("new member joined");
        
        // Build the response
        return new ConsumerGroupHeartbeatResponseData()
            .setMemberId(memberId)
            .setMemberEpoch(1) // new members start at epoch 1
            .setHeartbeatIntervalMs(config.consumerGroupHeartbeatIntervalMs())
            .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()
                .setAssignedPartitions(Collections.emptyList()));
    }
    
    /**
     * Handles a heartbeat from an existing member.
     * Keeps the member alive and propagates assignment changes.
     */
    private ConsumerGroupHeartbeatResponseData handleConsumerGroupMemberHeartbeat(
        AuthorizableRequestContext context,
        ConsumerGroupHeartbeatRequestData request,
        ConsumerGroup group,
        List<CoordinatorRecord> records) {
        
        String groupId = request.groupId();
        String memberId = request.memberId();
        int memberEpoch = request.memberEpoch();
        
        // Look up the existing member
        ConsumerGroupMember member = group.member(memberId);
        if (member == null) {
            throw new UnknownMemberIdException("Unknown member id: " + memberId);
        }
        
        // Validate the member epoch
        if (memberEpoch < member.memberEpoch()) {
            throw new FencedMemberEpochException("Member epoch is stale");
        } else if (memberEpoch > member.memberEpoch()) {
            throw new UnknownMemberIdException("Member epoch is ahead of the coordinator's");
        }
        
        // Update the member's last-heartbeat timestamp
        member.updateLastHeartbeat(time.milliseconds());
        
        // Check whether the partition assignment has changed
        List<TopicPartition> currentAssignment = member.assignedPartitions();
        List<TopicPartition> targetAssignment = group.targetAssignment(memberId);
        
        ConsumerGroupHeartbeatResponseData response = new ConsumerGroupHeartbeatResponseData()
            .setMemberId(memberId)
            .setMemberEpoch(member.memberEpoch())
            .setHeartbeatIntervalMs(config.consumerGroupHeartbeatIntervalMs());
        
        if (!currentAssignment.equals(targetAssignment)) {
            // The assignment changed; update the member
            member.updateAssignment(targetAssignment);
            
            // Emit an assignment-update record
            ConsumerGroupPartitionAssignmentValue assignmentRecord = 
                new ConsumerGroupPartitionAssignmentValue()
                    .setMemberId(memberId)
                    .setAssignedPartitions(targetAssignment.stream()
                        .map(tp -> new ConsumerGroupPartitionAssignmentValue.Partition()
                            .setTopicId(getTopicId(tp.topic()))
                            .setPartitionId(tp.partition()))
                        .collect(Collectors.toList()));
            
            records.add(new CoordinatorRecord(
                new ConsumerGroupPartitionAssignmentKey(groupId, memberId),
                assignmentRecord
            ));
            
            // Return the new assignment in the response
            response.setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()
                .setAssignedPartitions(targetAssignment.stream()
                    .map(tp -> new ConsumerGroupHeartbeatResponseData.Assignment.TopicPartition()
                        .setTopicId(getTopicId(tp.topic()))
                        .setPartitions(Collections.singletonList(tp.partition())))
                    .collect(Collectors.toList())));
            
            log.info("Updated assignment for member {}: {} -> {}", memberId, currentAssignment, targetAssignment);
        }
        
        return response;
    }
    
    /**
     * Computes a partition assignment.
     * Uses the configured partition assignor to produce the new assignment.
     */
    private Map<String, List<TopicPartition>> computePartitionAssignment(
        ConsumerGroup group, 
        String assignorName) {
        
        ConsumerGroupPartitionAssignor assignor = assignors.get(assignorName);
        if (assignor == null) {
            throw new UnsupportedAssignorException("Unsupported assignor: " + assignorName);
        }
        
        // Prepare the assignment input
        Set<String> subscribedTopics = group.subscribedTopicNames();
        Map<String, ConsumerGroupMemberSubscription> memberSubscriptions = new HashMap<>();
        
        group.members().forEach((memberId, member) -> {
            memberSubscriptions.put(memberId, new ConsumerGroupMemberSubscription()
                .setTopics(member.subscribedTopicNames())
                .setRackId(member.rackId()));
        });
        
        // Fetch topic metadata
        Map<String, TopicMetadata> topicMetadata = new HashMap<>();
        subscribedTopics.forEach(topic -> {
            TopicMetadata metadata = getTopicMetadata(topic);
            if (metadata != null) {
                topicMetadata.put(topic, metadata);
            }
        });
        
        // Run the assignor
        log.debug("Computing partition assignment: assignor={}, members={}, topics={}",
             assignorName, memberSubscriptions.keySet(), subscribedTopics);
        
        ConsumerGroupPartitionAssignorContext assignmentContext = 
            new ConsumerGroupPartitionAssignorContext(
                group.groupId(),
                memberSubscriptions,
                topicMetadata
            );
        
        Map<String, ConsumerGroupMemberAssignment> memberAssignments = 
            assignor.assign(assignmentContext);
        
        // Convert to TopicPartition lists
        Map<String, List<TopicPartition>> result = new HashMap<>();
        memberAssignments.forEach((memberId, assignment) -> {
            List<TopicPartition> partitions = assignment.partitions().stream()
                .map(tp -> new TopicPartition(tp.topicId(), tp.partitionId()))
                .collect(Collectors.toList());
            result.put(memberId, partitions);
        });
        
        log.info("Partition assignment computed: {}", result);
        return result;
    }
    
    /**
     * Checks whether a rebalance should be scheduled.
     * Triggered when group state or member subscriptions change.
     */
    private void maybeScheduleConsumerGroupRebalance(ConsumerGroup group, 
                                                   List<CoordinatorRecord> records) {
        if (group.state() != ConsumerGroupState.STABLE) {
            return; // the group is not stable; nothing to schedule
        }
        
        boolean needsRebalance = false;
        String rebalanceReason = "";
        
        // Check the rebalance trigger conditions
        if (group.hasMetadataChanged()) {
            needsRebalance = true;
            rebalanceReason = "topic metadata changed";
        } else if (group.hasMemberSubscriptionChanged()) {
            needsRebalance = true;
            rebalanceReason = "member subscriptions changed";
        } else if (group.hasInactiveMembers()) {
            needsRebalance = true;
            rebalanceReason = "inactive members detected";
        }
        
        if (needsRebalance) {
            log.info("Scheduling rebalance for group {}; reason: {}", group.groupId(), rebalanceReason);
            
            // Transition the group to PreparingRebalance
            group.transitionTo(ConsumerGroupState.PREPARING_REBALANCE);
            
            // Emit a state-change record
            ConsumerGroupMetadataValue groupRecord = new ConsumerGroupMetadataValue()
                .setGroupId(group.groupId())
                .setGroupState(ConsumerGroupState.PREPARING_REBALANCE.toString())
                .setStateTimestamp(time.milliseconds());
            
            records.add(new CoordinatorRecord(
                new ConsumerGroupMetadataKey(group.groupId()),
                groupRecord
            ));
            
            // Schedule the rebalance timeout task
            scheduleRebalanceTimeout(group, records);
        }
    }
    
    /**
     * Schedules the rebalance timeout task.
     * If the rebalance does not finish in time, it is force-completed.
     */
    private void scheduleRebalanceTimeout(ConsumerGroup group, 
                                        List<CoordinatorRecord> records) {
        long timeoutMs = config.consumerGroupSessionTimeoutMs();
        long deadlineMs = time.milliseconds() + timeoutMs;
        
        // Create the timer record
        ConsumerGroupRebalanceTimeoutKey timerKey = 
            new ConsumerGroupRebalanceTimeoutKey(group.groupId());
        
        TimerRecord timerRecord = new TimerRecord(
            timerKey,
            deadlineMs,
            "consumer-group-rebalance-timeout"
        );
        
        records.add(timerRecord);
        
        log.debug("Scheduled rebalance timeout for group {}; deadline: {}", group.groupId(), new Date(deadlineMs));
    }
}
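The epoch checks in `handleConsumerGroupMemberHeartbeat` above reduce to a small fencing rule: a request carrying a stale epoch is fenced, and an epoch ahead of the coordinator's view is treated as an unknown member. A standalone sketch of that rule (the returned strings mirror the Kafka error codes `FENCED_MEMBER_EPOCH` and `UNKNOWN_MEMBER_ID`; the helper itself is illustrative):

```java
// Member-epoch fencing rule used during heartbeat validation.
public class EpochFenceSketch {
    public static String check(int requestEpoch, int coordinatorEpoch) {
        if (requestEpoch < coordinatorEpoch) return "FENCED_MEMBER_EPOCH"; // stale client (e.g. zombie)
        if (requestEpoch > coordinatorEpoch) return "UNKNOWN_MEMBER_ID";   // client claims state we never issued
        return "OK";                                                       // epochs agree; heartbeat accepted
    }
}
```

Fencing on the epoch rather than only the member id is what lets the coordinator safely reject delayed requests from a previous incarnation of the same member.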

3. TransactionCoordinator In Depth

3.1 Transaction Coordination Architecture

graph TB
    subgraph "Kafka Transaction Coordinator Architecture"
        subgraph "Transaction Coordinator Layer"
            TC[TransactionCoordinator]
            TSM[TransactionStateManager]
            TM[TransactionMetadata]
            TS[TransactionState - state machine]
        end
        
        subgraph "Producer Side"
            P[Transactional Producer]
            TID[TransactionalId]
            PID[ProducerId]
            PE[ProducerEpoch]
        end
        
        subgraph "Consumer Side"
            C[Transactional Consumer]
            ISO[IsolationLevel]
            LSO[LastStableOffset]
        end
        
        subgraph "Storage Layer"
            TT[__transaction_state topic]
            TL[Transaction Log]
            CM[Control Messages]
        end
        
        %% Transaction flow
        P --> TID
        P --> PID
        P --> PE
        
        P --> TC
        TC --> TSM
        TSM --> TM
        TM --> TS
        
        TC --> TT
        TT --> TL
        TL --> CM
        
        C --> ISO
        C --> LSO
        LSO --> CM
    end
    
    style TC fill:#e1f5fe
    style TSM fill:#e8f5e8
    style TS fill:#f3e5f5
    style CM fill:#fff3e0
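The `TransactionState` state machine in the diagram only permits specific transitions: a transaction moves `EMPTY -> ONGOING -> PREPARE_COMMIT -> COMPLETE_COMMIT` (or the abort counterpart), and a completed transaction can start over. A minimal sketch of a transition table using the state names from the enum in the next section (the table is a simplification of Kafka's actual rules, which also cover epoch-fence transitions):

```java
import java.util.*;

// Simplified transaction state machine: which transitions are legal.
public class TxnStateSketch {
    public enum State { EMPTY, ONGOING, PREPARE_COMMIT, PREPARE_ABORT, COMPLETE_COMMIT, COMPLETE_ABORT, DEAD }

    private static final Map<State, Set<State>> VALID = new EnumMap<>(State.class);
    static {
        VALID.put(State.EMPTY, EnumSet.of(State.ONGOING, State.DEAD));
        VALID.put(State.ONGOING, EnumSet.of(State.PREPARE_COMMIT, State.PREPARE_ABORT, State.DEAD));
        VALID.put(State.PREPARE_COMMIT, EnumSet.of(State.COMPLETE_COMMIT, State.DEAD));
        VALID.put(State.PREPARE_ABORT, EnumSet.of(State.COMPLETE_ABORT, State.DEAD));
        VALID.put(State.COMPLETE_COMMIT, EnumSet.of(State.EMPTY, State.ONGOING, State.DEAD)); // next transaction may begin
        VALID.put(State.COMPLETE_ABORT, EnumSet.of(State.EMPTY, State.ONGOING, State.DEAD));
        VALID.put(State.DEAD, EnumSet.noneOf(State.class)); // terminal
    }

    public static boolean canTransition(State from, State to) {
        return VALID.getOrDefault(from, EnumSet.noneOf(State.class)).contains(to);
    }
}
```

Encoding the legal transitions as data makes illegal requests (for example, an `EndTxn` arriving while a marker write is still in flight) detectable with a single table lookup.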

3.2 Transaction State Machine Implementation

/**
 * TransactionStateManager - transaction state manager.
 * Manages the full transaction lifecycle and its state transitions.
 */
public class TransactionStateManager {
    
    private static final Logger log = LoggerFactory.getLogger(TransactionStateManager.class);
    
    // Transaction states
    public enum TransactionState {
        EMPTY,                    // no transaction has started yet
        ONGOING,                  // a transaction has started and not yet ended
        PREPARE_COMMIT,           // first phase of a two-phase commit has begun
        PREPARE_ABORT,            // first phase of a two-phase abort has begun
        COMPLETE_COMMIT,          // the transaction committed successfully
        COMPLETE_ABORT,           // the transaction was aborted
        DEAD                      // the transaction timed out or failed
    }
    
    // Transaction metadata store
    private final ConcurrentHashMap<String, TransactionMetadata> transactionMetadataMap;
    
    // Configuration of the transaction state topic
    private final int transactionTopicPartitionCount;
    private final short transactionTopicReplicationFactor;
    
    /**
     * Handles an InitProducerId request.
     * Allocates or bumps the ProducerId and ProducerEpoch for a producer.
     */
    public InitProducerIdResult handleInitProducerId(String transactionalId,
                                                    int transactionTimeoutMs,
                                                    Optional<ProducerIdAndEpoch> expectedProducerIdAndEpoch) {
        
        if (transactionalId == null || transactionalId.isEmpty()) {
            // Non-transactional (idempotent-only) producer: just allocate a fresh ProducerId
            ProducerIdAndEpoch newProducerIdAndEpoch = producerIdManager.generateProducerId();
            
            return new InitProducerIdResult(
                newProducerIdAndEpoch.producerId,
                newProducerIdAndEpoch.epoch,
                Errors.NONE
            );
        }
        
        // Transactional producer: update the metadata atomically. The compute callback
        // must return the new map value, so the result is captured separately.
        final InitProducerIdResult[] result = new InitProducerIdResult[1];
        
        transactionMetadataMap.compute(transactionalId, (txnId, existingMetadata) -> {
            if (existingMetadata == null) {
                // New transactional id: create its metadata
                ProducerIdAndEpoch newProducerIdAndEpoch = producerIdManager.generateProducerId();
                
                TransactionMetadata newMetadata = new TransactionMetadata(
                    txnId,                              // transactionalId
                    newProducerIdAndEpoch.producerId,   // producerId
                    newProducerIdAndEpoch.epoch,        // producerEpoch
                    transactionTimeoutMs,               // transactionTimeoutMs
                    TransactionState.EMPTY,             // state
                    new HashSet<>(),                    // partitions
                    time.milliseconds(),                // txnStartTimestamp
                    time.milliseconds()                 // txnLastUpdateTimestamp
                );
                
                // Persist the transaction metadata
                persistTransactionMetadata(txnId, newMetadata);
                
                log.info("Allocated ProducerId {} (epoch {}) for new transactional id {}",
                    newProducerIdAndEpoch.producerId, newProducerIdAndEpoch.epoch, txnId);
                
                result[0] = new InitProducerIdResult(
                    newProducerIdAndEpoch.producerId,
                    newProducerIdAndEpoch.epoch,
                    Errors.NONE
                );
                return newMetadata;
                
            } else {
                // Existing transactional id: check its state first
                if (existingMetadata.state == TransactionState.PREPARE_COMMIT ||
                    existingMetadata.state == TransactionState.PREPARE_ABORT) {
                    // A two-phase commit/abort is still in flight; reject for now
                    result[0] = new InitProducerIdResult(
                        RecordBatch.NO_PRODUCER_ID,
                        RecordBatch.NO_PRODUCER_EPOCH,
                        Errors.CONCURRENT_TRANSACTIONS
                    );
                    return existingMetadata;
                }
                
                // Validate the expected producer id and epoch, if supplied
                if (expectedProducerIdAndEpoch.isPresent()) {
                    ProducerIdAndEpoch expected = expectedProducerIdAndEpoch.get();
                    if (expected.producerId != existingMetadata.producerId ||
                        expected.epoch != existingMetadata.producerEpoch) {
                        // Mismatch: most likely a retry from a fenced (zombie) producer
                        result[0] = new InitProducerIdResult(
                            RecordBatch.NO_PRODUCER_ID,
                            RecordBatch.NO_PRODUCER_EPOCH,
                            Errors.PRODUCER_FENCED
                        );
                        return existingMetadata;
                    }
                }
                
                // Bump the producer epoch, allocating a new ProducerId if the epoch is exhausted
                short oldEpoch = existingMetadata.producerEpoch;
                short newEpoch;
                if (oldEpoch == Short.MAX_VALUE) {
                    ProducerIdAndEpoch fresh = producerIdManager.generateProducerId();
                    existingMetadata.producerId = fresh.producerId;
                    newEpoch = fresh.epoch;
                } else {
                    newEpoch = (short) (oldEpoch + 1);
                }
                
                existingMetadata.producerEpoch = newEpoch;
                existingMetadata.transactionTimeoutMs = transactionTimeoutMs;
                existingMetadata.state = TransactionState.EMPTY;
                existingMetadata.partitions.clear();
                existingMetadata.txnStartTimestamp = time.milliseconds();
                existingMetadata.txnLastUpdateTimestamp = time.milliseconds();
                
                // Persist the updated metadata
                persistTransactionMetadata(txnId, existingMetadata);
                
                log.info("Bumped ProducerEpoch for transactional id {}: {} -> {}",
                    txnId, oldEpoch, newEpoch);
                
                result[0] = new InitProducerIdResult(
                    existingMetadata.producerId,
                    newEpoch,
                    Errors.NONE
                );
                return existingMetadata;
            }
        });
        
        return result[0];
    }
    
    /**
     * Handles an AddPartitionsToTxn request.
     * Adds partitions to the active transaction.
     */
    public AddPartitionsToTxnResult handleAddPartitionsToTxn(String transactionalId,
                                                            long producerId,
                                                            short producerEpoch,
                                                            Set<TopicPartition> partitions) {
        
        TransactionMetadata metadata = transactionMetadataMap.get(transactionalId);
        if (metadata == null) {
            return new AddPartitionsToTxnResult(Errors.UNKNOWN_PRODUCER_ID);
        }
        
        // Validate the producer id and epoch
        if (metadata.producerId != producerId || metadata.producerEpoch != producerEpoch) {
            return new AddPartitionsToTxnResult(Errors.PRODUCER_FENCED);
        }
        
        synchronized (metadata) {
            // Check the transaction state
            if (metadata.state != TransactionState.EMPTY && 
                metadata.state != TransactionState.ONGOING) {
                return new AddPartitionsToTxnResult(Errors.INVALID_TXN_STATE);
            }
            
            // Transition to ONGOING if this is the first partition
            if (metadata.state == TransactionState.EMPTY) {
                metadata.transitionTo(TransactionState.ONGOING);
                metadata.txnStartTimestamp = time.milliseconds();
            }
            
            // Add the partitions to the transaction
            Set<TopicPartition> newPartitions = new HashSet<>(partitions);
            newPartitions.removeAll(metadata.partitions); // drop partitions already registered
            
            if (!newPartitions.isEmpty()) {
                metadata.partitions.addAll(newPartitions);
                metadata.txnLastUpdateTimestamp = time.milliseconds();
                
                // Persist the transaction metadata
                persistTransactionMetadata(transactionalId, metadata);
                
                log.info("Added partitions to transaction {}: {}", transactionalId, newPartitions);
            }
            
            return new AddPartitionsToTxnResult(Errors.NONE);
        }
    }
    
    /**
     * Handles an EndTxn request.
     * Completes the transaction via the two-phase commit protocol.
     */
    public EndTxnResult handleEndTxn(String transactionalId,
                                   long producerId,
                                   short producerEpoch,
                                   TransactionResult result) {
        
        TransactionMetadata metadata = transactionMetadataMap.get(transactionalId);
        if (metadata == null) {
            return new EndTxnResult(Errors.UNKNOWN_PRODUCER_ID);
        }
        
        // Validate the producer id and epoch
        if (metadata.producerId != producerId || metadata.producerEpoch != producerEpoch) {
            return new EndTxnResult(Errors.PRODUCER_FENCED);
        }
        
        synchronized (metadata) {
            // Check the transaction state
            if (metadata.state != TransactionState.ONGOING) {
                return new EndTxnResult(Errors.INVALID_TXN_STATE);
            }
            
            try {
                if (result == TransactionResult.COMMIT) {
                    // Start the two-phase commit
                    return handleCommitTransaction(metadata);
                } else {
                    // Start the two-phase abort
                    return handleAbortTransaction(metadata);
                }
            } catch (Exception e) {
                log.error("Exception while ending transaction: transactionalId={}", transactionalId, e);
                return new EndTxnResult(Errors.UNKNOWN_SERVER_ERROR);
            }
        }
    }
    
    /**
     * Handles the commit of a transaction.
     * Implements the commit path of the two-phase commit protocol.
     */
    private EndTxnResult handleCommitTransaction(TransactionMetadata metadata) {
        String transactionalId = metadata.transactionalId;
        
        info("Committing transaction {}, partitions involved: {}", transactionalId, metadata.partitions);
        
        // Phase 1: prepare commit
        metadata.transitionTo(TransactionState.PREPARE_COMMIT);
        metadata.txnLastUpdateTimestamp = time.milliseconds();
        
        // Persist the PREPARE_COMMIT state before writing any markers
        persistTransactionMetadata(transactionalId, metadata);
        
        // Phase 2: write a COMMIT control record to every participating partition
        List<CompletableFuture<Void>> writeFutures = new ArrayList<>();
        
        for (TopicPartition partition : metadata.partitions) {
            CompletableFuture<Void> future = writeTransactionControlMessage(
                partition, 
                metadata.producerId, 
                metadata.producerEpoch,
                ControlRecordType.COMMIT
            );
            writeFutures.add(future);
        }
        
        // Wait for all control records to be written
        CompletableFuture<Void> allWrites = CompletableFuture.allOf(
            writeFutures.toArray(new CompletableFuture[0]));
        
        allWrites.whenComplete((result, exception) -> {
            synchronized (metadata) {
                if (exception == null) {
                    // All partitions acknowledged; the commit is complete
                    metadata.transitionTo(TransactionState.COMPLETE_COMMIT);
                    info("Transaction {} committed successfully", transactionalId);
                } else {
                    // A write failed; move the transaction to the error state
                    error("Transaction {} failed to commit", transactionalId, exception);
                    metadata.transitionTo(TransactionState.DEAD);
                }
                
                metadata.txnLastUpdateTimestamp = time.milliseconds();
                persistTransactionMetadata(transactionalId, metadata);
            }
        });
        
        return new EndTxnResult(Errors.NONE);
    }
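The fan-in step above can be studied in isolation. The following is a self-contained sketch (not Kafka source; the partition names, `endTxn` helper, and executor are illustrative): one `CompletableFuture` per partition write, joined with `CompletableFuture.allOf()` so the transaction only advances to `COMPLETE_COMMIT` once every control record has landed, and falls to `DEAD` if any write fails.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AllOfFanInDemo {

    static String endTxn(List<String> partitions, ExecutorService pool) {
        List<CompletableFuture<Void>> writes = new ArrayList<>();
        for (String partition : partitions) {
            // stand-in for writeTransactionControlMessage(partition, ...)
            writes.add(CompletableFuture.runAsync(
                () -> System.out.println("COMMIT marker -> " + partition), pool));
        }
        // completes only after every per-partition write has completed
        return CompletableFuture.allOf(writes.toArray(new CompletableFuture[0]))
            .handle((r, e) -> e == null ? "COMPLETE_COMMIT" : "DEAD")
            .join();
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        System.out.println(endTxn(List.of("orders-0", "orders-1", "orders-2"), pool));
        pool.shutdown();
    }
}
```

Note that `allOf` fails fast in state but not in effect: it completes exceptionally as soon as the final outcome is known, which is why the real coordinator re-checks state under the metadata lock in the completion callback.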
    
    /**
     * Writes a transaction control record.
     * Appends a COMMIT or ABORT marker to the given partition.
     */
    private CompletableFuture<Void> writeTransactionControlMessage(TopicPartition partition,
                                                                  long producerId,
                                                                  short producerEpoch,
                                                                  ControlRecordType controlType) {
        
        return CompletableFuture.runAsync(() -> {
            try {
                // Build the control record; the value of a control record is empty
                ByteBuffer controlMessageKey = ControlRecordType.serialize(controlType);
                ByteBuffer controlMessageValue = ByteBuffer.allocate(0);
                
                MemoryRecords controlRecords = MemoryRecords.withControlRecords(
                    producerId,
                    producerEpoch,
                    controlMessageKey,
                    controlMessageValue
                );
                
                // Append to the partition; acks=-1 requires all ISR replicas to confirm
                replicaManager.appendRecords(
                    config.requestTimeoutMs(),            // timeout
                    (short) -1,                           // requiredAcks: all ISR
                    true,                                 // internalTopicsAllowed
                    AppendOrigin.COORDINATOR,             // origin
                    Map.of(partition, controlRecords),    // entriesPerPartition
                    responses -> {                        // responseCallback
                        PartitionResponse response = responses.get(partition);
                        if (response.error != Errors.NONE) {
                            throw new RuntimeException("Failed to write control record: " + response.error);
                        }
                        
                        debug("Wrote {} control record to partition {}", controlType, partition);
                    }
                );
                
            } catch (Exception e) {
                error("Failed to write control record: partition={}, controlType={}", partition, controlType, e);
                throw new RuntimeException(e);
            }
        }, transactionExecutor);
    }
    
    /**
     * Transaction timeout handling.
     * Periodically scans for and aborts transactions that have exceeded their timeout.
     */
    public void handleTransactionTimeout() {
        long currentTimeMs = time.milliseconds();
        
        List<String> expiredTransactions = new ArrayList<>();
        
        transactionMetadataMap.forEach((transactionalId, metadata) -> {
            synchronized (metadata) {
                long transactionDurationMs = currentTimeMs - metadata.txnStartTimestamp;
                
                if (transactionDurationMs > metadata.transactionTimeoutMs) {
                    expiredTransactions.add(transactionalId);
                    
                    warn("Transaction {} timed out after {} ms (timeout threshold: {} ms)", 
                        transactionalId, transactionDurationMs, metadata.transactionTimeoutMs);
                }
            }
        });
        
        // Abort every expired transaction
        for (String transactionalId : expiredTransactions) {
            try {
                forceAbortTransaction(transactionalId, "transaction timed out");
            } catch (Exception e) {
                error("Failed to force-abort expired transaction {}", transactionalId, e);
            }
        }
        
        if (!expiredTransactions.isEmpty()) {
            info("Handled {} expired transactions", expiredTransactions.size());
        }
    }
    
    /**
     * Forcefully aborts a transaction,
     * used when a transaction times out or hits some other abnormal condition.
     */
    private void forceAbortTransaction(String transactionalId, String reason) {
        TransactionMetadata metadata = transactionMetadataMap.get(transactionalId);
        if (metadata == null) {
            return;
        }
        
        synchronized (metadata) {
            info("Force-aborting transaction {}, reason: {}", transactionalId, reason);
            
            // Transition to PREPARE_ABORT
            metadata.transitionTo(TransactionState.PREPARE_ABORT);
            metadata.txnLastUpdateTimestamp = time.milliseconds();
            
            // Persist the state change
            persistTransactionMetadata(transactionalId, metadata);
            
            // Write an ABORT control record to every participating partition
            List<CompletableFuture<Void>> abortFutures = new ArrayList<>();
            
            for (TopicPartition partition : metadata.partitions) {
                CompletableFuture<Void> future = writeTransactionControlMessage(
                    partition,
                    metadata.producerId,
                    metadata.producerEpoch,
                    ControlRecordType.ABORT
                );
                abortFutures.add(future);
            }
            
            // Wait for all abort markers to be written
            CompletableFuture.allOf(abortFutures.toArray(new CompletableFuture[0]))
                .whenComplete((result, exception) -> {
                    synchronized (metadata) {
                        metadata.transitionTo(TransactionState.COMPLETE_ABORT);
                        metadata.txnLastUpdateTimestamp = time.milliseconds();
                        persistTransactionMetadata(transactionalId, metadata);
                        
                        info("Transaction {} force-abort complete", transactionalId);
                    }
                });
        }
    }
    
    /**
     * Persists transaction metadata.
     * Writes the transaction state to the __transaction_state topic.
     */
    private void persistTransactionMetadata(String transactionalId, TransactionMetadata metadata) {
        try {
            // Serialize the transaction metadata
            ByteBuffer key = TransactionMetadataKey.serialize(transactionalId);
            ByteBuffer value = TransactionMetadataValue.serialize(metadata);
            
            MemoryRecords record = MemoryRecords.withRecords(
                CompressionType.NONE,
                new SimpleRecord(key.array(), value.array())
            );
            
            // Route the transactional id to its __transaction_state partition
            int partition = Utils.abs(transactionalId.hashCode()) % transactionTopicPartitionCount;
            TopicPartition txnTopicPartition = new TopicPartition(
                Topic.TRANSACTION_STATE_TOPIC_NAME, partition);
            
            // Append to the transaction state topic; acks=-1 requires all ISR replicas
            replicaManager.appendRecords(
                config.requestTimeoutMs(),                 // timeout
                (short) -1,                                // requiredAcks: all ISR
                true,                                      // internalTopicsAllowed
                AppendOrigin.COORDINATOR,                  // origin
                Map.of(txnTopicPartition, record),         // entriesPerPartition
                responses -> {                             // responseCallback
                    PartitionResponse response = responses.get(txnTopicPartition);
                    if (response.error != Errors.NONE) {
                        throw new RuntimeException("Failed to persist transaction metadata: " + response.error);
                    }
                    
                    debug("Persisted transaction metadata: transactionalId={}", transactionalId);
                }
            );
            
        } catch (Exception e) {
            error("Failed to persist transaction metadata: transactionalId={}", transactionalId, e);
            throw new RuntimeException(e);
        }
    }
}
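The partition routing inside persistTransactionMetadata() can be tried standalone. The sketch below is illustrative rather than Kafka source: a transactional.id hashes to a fixed partition of __transaction_state, so the same coordinator shard always owns it. Kafka's `Utils.abs` is a bit-mask rather than `Math.abs`, so `Integer.MIN_VALUE` still yields a non-negative result; 50 is the broker default for `transaction.state.log.num.partitions`.

```java
public class TxnPartitionRouting {

    static final int TXN_TOPIC_PARTITIONS = 50; // default transaction.state.log.num.partitions

    // mirrors org.apache.kafka.common.utils.Utils.abs
    static int abs(int n) {
        return n & 0x7fffffff;
    }

    static int partitionFor(String transactionalId) {
        return abs(transactionalId.hashCode()) % TXN_TOPIC_PARTITIONS;
    }

    public static void main(String[] args) {
        // Deterministic: the same id always lands on the same partition
        System.out.println(partitionFor("order-service-txn") == partitionFor("order-service-txn")); // true
        // Math.abs(Integer.MIN_VALUE) stays negative; the bit-mask never does
        System.out.println(abs(Integer.MIN_VALUE) >= 0); // true
    }
}
```

This stable routing is what lets a single broker act as the coordinator for a given transactional.id: whoever leads that __transaction_state partition is the coordinator.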

/**
 * TransactionMetadata - transaction metadata.
 * Holds the complete state of a single transaction.
 */
public class TransactionMetadata {
    
    public final String transactionalId;           // transactional id
    public volatile long producerId;               // producer id
    public volatile short producerEpoch;           // producer epoch
    public volatile int transactionTimeoutMs;      // transaction timeout
    public volatile TransactionState state;        // transaction state
    public final Set<TopicPartition> partitions;   // partitions participating in the transaction
    public volatile long txnStartTimestamp;        // transaction start time
    public volatile long txnLastUpdateTimestamp;   // last update time
    
    // lock guarding state transitions
    private final ReentrantLock stateLock = new ReentrantLock();
    
    /**
     * Transitions the transaction state,
     * enforcing that every transition is legal and consistent.
     */
    public void transitionTo(TransactionState newState) {
        stateLock.lock();
        try {
            TransactionState currentState = this.state;
            
            // Validate the transition
            if (!isValidStateTransition(currentState, newState)) {
                throw new IllegalStateException(
                    String.format("Illegal transaction state transition: %s -> %s", currentState, newState));
            }
            
            debug("Transaction {} state transition: {} -> {}", transactionalId, currentState, newState);
            
            this.state = newState;
            this.txnLastUpdateTimestamp = System.currentTimeMillis();
            
        } finally {
            stateLock.unlock();
        }
    }
    
    /**
     * Validates whether a state transition is legal.
     */
    private boolean isValidStateTransition(TransactionState from, TransactionState to) {
        switch (from) {
            case EMPTY:
                return to == TransactionState.ONGOING;
            case ONGOING:
                return to == TransactionState.PREPARE_COMMIT || 
                       to == TransactionState.PREPARE_ABORT ||
                       to == TransactionState.DEAD;
            case PREPARE_COMMIT:
                return to == TransactionState.COMPLETE_COMMIT ||
                       to == TransactionState.DEAD;
            case PREPARE_ABORT:
                return to == TransactionState.COMPLETE_ABORT ||
                       to == TransactionState.DEAD;
            case COMPLETE_COMMIT:
            case COMPLETE_ABORT:
                return to == TransactionState.EMPTY ||
                       to == TransactionState.DEAD;
            case DEAD:
                return to == TransactionState.EMPTY;
            default:
                return false;
        }
    }
    
    /**
     * Checks whether the transaction can be committed.
     */
    public boolean canCommit() {
        stateLock.lock();
        try {
            return state == TransactionState.ONGOING && !partitions.isEmpty();
        } finally {
            stateLock.unlock();
        }
    }
    
    /**
     * Checks whether the transaction has timed out.
     */
    public boolean isExpired(long currentTimeMs) {
        return currentTimeMs - txnStartTimestamp > transactionTimeoutMs;
    }
}
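The transition table in isValidStateTransition() can be modeled compactly as an enum whose states expose their legal successors. This is an illustrative sketch (the enum and method names are not Kafka's), but the edges match the table above exactly:

```java
import java.util.EnumSet;
import java.util.Set;

enum TxnState {
    EMPTY, ONGOING, PREPARE_COMMIT, PREPARE_ABORT,
    COMPLETE_COMMIT, COMPLETE_ABORT, DEAD;

    // Legal successor states, mirroring isValidStateTransition()
    Set<TxnState> validNext() {
        switch (this) {
            case EMPTY:           return EnumSet.of(ONGOING);
            case ONGOING:         return EnumSet.of(PREPARE_COMMIT, PREPARE_ABORT, DEAD);
            case PREPARE_COMMIT:  return EnumSet.of(COMPLETE_COMMIT, DEAD);
            case PREPARE_ABORT:   return EnumSet.of(COMPLETE_ABORT, DEAD);
            case COMPLETE_COMMIT:
            case COMPLETE_ABORT:  return EnumSet.of(EMPTY, DEAD);
            case DEAD:            return EnumSet.of(EMPTY);
            default:              return EnumSet.noneOf(TxnState.class);
        }
    }
}

public class TxnStateMachineDemo {
    public static void main(String[] args) {
        // A commit walks EMPTY -> ONGOING -> PREPARE_COMMIT -> COMPLETE_COMMIT
        System.out.println(TxnState.EMPTY.validNext().contains(TxnState.ONGOING));           // true
        System.out.println(TxnState.ONGOING.validNext().contains(TxnState.PREPARE_COMMIT));  // true
        // Skipping the prepare phase is rejected
        System.out.println(TxnState.ONGOING.validNext().contains(TxnState.COMPLETE_COMMIT)); // false
    }
}
```

Encoding the table as data rather than a switch over pairs makes the invariant easy to verify: every path to COMPLETE_COMMIT necessarily passes through PREPARE_COMMIT, which is what makes the persisted prepare record a safe recovery point.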

4. Deep Dive into Partition Assignors

4.1 Partition Assignment Algorithms Compared

/**
 * Comparison of partition assignor implementations,
 * illustrating each algorithm's characteristics and intended use cases.
 */
public class PartitionAssignors {
    
    /**
     * RangeAssignor - range assignor.
     * Assigns ranges per topic independently, which can leave the overall assignment unbalanced.
     */
    public static class RangeAssignor implements ConsumerGroupPartitionAssignor {
        
        @Override
        public String name() {
            return "range";
        }
        
        @Override
        public Map<String, ConsumerGroupMemberAssignment> assign(
            ConsumerGroupPartitionAssignorContext context) {
            
            Map<String, List<String>> topicToMembers = new HashMap<>();
            Map<String, ConsumerGroupMemberAssignment> assignments = new HashMap<>();
            
            // Group members by subscribed topic
            context.memberSubscriptions().forEach((memberId, subscription) -> {
                subscription.topics().forEach(topic -> {
                    topicToMembers.computeIfAbsent(topic, k -> new ArrayList<>()).add(memberId);
                });
                
                // Initialize an empty assignment for each member
                assignments.put(memberId, new ConsumerGroupMemberAssignment()
                    .setPartitions(new ArrayList<>()));
            });
            
            // Perform a range assignment per topic
            topicToMembers.forEach((topic, members) -> {
                // Look up the topic's partition metadata
                TopicMetadata topicMetadata = context.topicMetadata().get(topic);
                if (topicMetadata == null) {
                    return;
                }
                
                int partitionCount = topicMetadata.numPartitions();
                int memberCount = members.size();
                
                // Sort members so the assignment is deterministic
                List<String> sortedMembers = members.stream().sorted().collect(Collectors.toList());
                
                // Partitions per member, with the remainder spread over the first members
                int partitionsPerMember = partitionCount / memberCount;
                int extraPartitions = partitionCount % memberCount;
                
                debug("Range-assigning topic {}: {} partitions across {} members", 
                     topic, partitionCount, memberCount);
                
                int currentPartition = 0;
                for (int i = 0; i < sortedMembers.size(); i++) {
                    String memberId = sortedMembers.get(i);
                    
                    // The first `extraPartitions` members receive one extra partition
                    int numPartitionsForMember = partitionsPerMember + (i < extraPartitions ? 1 : 0);
                    
                    // Assign a contiguous range of partitions in a single entry
                    List<Integer> memberPartitions = new ArrayList<>();
                    for (int j = 0; j < numPartitionsForMember; j++) {
                        memberPartitions.add(currentPartition++);
                    }
                    assignments.get(memberId).partitions().add(
                        new ConsumerGroupMemberAssignment.TopicPartition()
                            .setTopicId(topicMetadata.topicId())
                            .setPartitions(memberPartitions)
                    );
                    
                    debug("Member {} assigned partitions {} - {} of topic {}", 
                         memberId, 
                         currentPartition - numPartitionsForMember, 
                         currentPartition - 1, 
                         topic);
                }
            });
            
            return assignments;
        }
    }
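The range arithmetic above is easy to verify in a few lines. The sketch below strips away the coordinator types and keeps only the core computation (member ids are hypothetical): `partitionCount / memberCount` per member, with the remainder handed to the first members in sorted order.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RangeAssignDemo {

    static Map<String, List<Integer>> rangeAssign(List<String> members, int partitionCount) {
        List<String> sorted = new ArrayList<>(members);
        Collections.sort(sorted); // sort for a deterministic assignment
        int per = partitionCount / sorted.size();
        int extra = partitionCount % sorted.size();
        Map<String, List<Integer>> out = new LinkedHashMap<>();
        int p = 0;
        for (int i = 0; i < sorted.size(); i++) {
            int n = per + (i < extra ? 1 : 0); // first `extra` members get one more
            List<Integer> range = new ArrayList<>();
            for (int j = 0; j < n; j++) range.add(p++);
            out.put(sorted.get(i), range);
        }
        return out;
    }

    public static void main(String[] args) {
        // 7 partitions across 3 consumers: 3 + 2 + 2 per topic.
        // Repeated over many topics, the first consumer is skewed high —
        // the imbalance RangeAssignor is known for.
        System.out.println(rangeAssign(List.of("c1", "c2", "c3"), 7));
        // {c1=[0, 1, 2], c2=[3, 4], c3=[5, 6]}
    }
}
```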
    
    /**
     * StickyAssignor - sticky assignor.
     * Preserves as much of the existing assignment as possible across rebalances,
     * minimizing partition movement.
     */
    public static class StickyAssignor implements ConsumerGroupPartitionAssignor {
        
        @Override
        public String name() {
            return "sticky";
        }
        
        @Override
        public Map<String, ConsumerGroupMemberAssignment> assign(
            ConsumerGroupPartitionAssignorContext context) {
            
            Map<String, List<TopicPartition>> currentAssignments = getCurrentAssignments(context);
            Map<String, Set<String>> memberSubscriptions = getMemberSubscriptions(context);
            Set<TopicPartition> allPartitions = getAllPartitions(context);
            
            debug("Sticky assignment starting: current assignments={}, member subscriptions={}, total partitions={}", 
                 currentAssignments.size(), memberSubscriptions.size(), allPartitions.size());
            
            // Step 1: keep the still-valid existing assignments
            Map<String, List<TopicPartition>> newAssignments = new HashMap<>();
            Set<TopicPartition> unassignedPartitions = new HashSet<>(allPartitions);
            
            currentAssignments.forEach((memberId, partitions) -> {
                if (memberSubscriptions.containsKey(memberId)) {
                    List<TopicPartition> validPartitions = partitions.stream()
                        .filter(tp -> memberSubscriptions.get(memberId).contains(tp.topic()))
                        .collect(Collectors.toList());
                    
                    newAssignments.put(memberId, validPartitions);
                    unassignedPartitions.removeAll(validPartitions);
                    
                    debug("Member {} keeps {} previously assigned partitions", memberId, validPartitions.size());
                }
            });
            
            // Step 2: hand out the partitions that are still unassigned
            List<String> sortedMembers = memberSubscriptions.keySet().stream()
                .sorted().collect(Collectors.toList());
            
            for (TopicPartition partition : unassignedPartitions) {
                // Pick the subscribing member that currently owns the fewest partitions
                String targetMember = sortedMembers.stream()
                    .filter(memberId -> memberSubscriptions.get(memberId).contains(partition.topic()))
                    .min(Comparator.comparingInt(memberId -> 
                        newAssignments.getOrDefault(memberId, Collections.emptyList()).size()))
                    .orElse(null);
                
                if (targetMember != null) {
                    newAssignments.computeIfAbsent(targetMember, k -> new ArrayList<>())
                        .add(partition);
                    
                    debug("Assigned previously unowned partition {} to member {}", partition, targetMember);
                }
            }
            
            // Step 3: rebalance the result if it is uneven
            balanceAssignments(newAssignments, memberSubscriptions, context);
            
            // Convert to the format the coordinator expects
            return convertToMemberAssignments(newAssignments, context);
        }
        
        /**
         * Balances the partition assignment,
         * making the distribution as even as possible.
         */
        private void balanceAssignments(Map<String, List<TopicPartition>> assignments,
                                      Map<String, Set<String>> subscriptions,
                                      ConsumerGroupPartitionAssignorContext context) {
            
            boolean changed = true;
            int iterations = 0;
            int maxIterations = assignments.size() * 10; // guard against infinite loops
            
            while (changed && iterations < maxIterations) {
                changed = false;
                iterations++;
                
                // Compute the ideal average number of partitions per member
                int totalPartitions = assignments.values().stream()
                    .mapToInt(List::size).sum();
                double avgPartitionsPerMember = (double) totalPartitions / assignments.size();
                
                // Identify overloaded and underloaded members
                List<String> overloadedMembers = new ArrayList<>();
                List<String> underloadedMembers = new ArrayList<>();
                
                assignments.forEach((memberId, partitions) -> {
                    int memberPartitionCount = partitions.size();
                    if (memberPartitionCount > Math.ceil(avgPartitionsPerMember)) {
                        overloadedMembers.add(memberId);
                    } else if (memberPartitionCount < Math.floor(avgPartitionsPerMember)) {
                        underloadedMembers.add(memberId);
                    }
                });
                
                // Move partitions from overloaded members to underloaded ones
                for (String overloadedMember : overloadedMembers) {
                    if (underloadedMembers.isEmpty()) {
                        break;
                    }
                    
                    List<TopicPartition> partitions = assignments.get(overloadedMember);
                    if (partitions.size() <= Math.ceil(avgPartitionsPerMember)) {
                        continue;
                    }
                    
                    // Pick a partition to transfer
                    TopicPartition partitionToTransfer = partitions.get(partitions.size() - 1);
                    
                    // Find an underloaded member that subscribes to the partition's topic
                    String targetMember = underloadedMembers.stream()
                        .filter(memberId -> subscriptions.get(memberId).contains(partitionToTransfer.topic()))
                        .findFirst()
                        .orElse(null);
                    
                    if (targetMember != null) {
                        // Perform the transfer
                        partitions.remove(partitionToTransfer);
                        assignments.get(targetMember).add(partitionToTransfer);
                        
                        changed = true;
                        
                        debug("Moved partition {} from member {} to member {}", 
                             partitionToTransfer, overloadedMember, targetMember);
                        
                        // Check whether the target member still needs more partitions
                        if (assignments.get(targetMember).size() >= Math.floor(avgPartitionsPerMember)) {
                            underloadedMembers.remove(targetMember);
                        }
                    }
                }
            }
            
            debug("Partition balancing finished after {} iterations", iterations);
        }
    }
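The "sticky" core of the algorithm — keep existing ownership, then hand each orphaned partition to the currently least-loaded subscriber — can be sketched without any coordinator types. The member and partition names below are hypothetical, and the `assignOrphans` helper is illustrative, not Kafka code:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class StickyAssignDemo {

    // Give each orphaned partition to the member with the fewest partitions,
    // leaving all existing ownership untouched (the "sticky" property).
    static Map<String, List<String>> assignOrphans(Map<String, List<String>> assign,
                                                   List<String> orphaned) {
        for (String tp : orphaned) {
            String target = assign.entrySet().stream()
                .min(Comparator.comparingInt((Map.Entry<String, List<String>> e) -> e.getValue().size()))
                .get().getKey();
            assign.get(target).add(tp);
        }
        return assign;
    }

    public static void main(String[] args) {
        // c3 just joined empty; t-4 and t-5 were freed by a departed member
        Map<String, List<String>> assign = new LinkedHashMap<>();
        assign.put("c1", new ArrayList<>(List.of("t-0", "t-1", "t-2")));
        assign.put("c2", new ArrayList<>(List.of("t-3")));
        assign.put("c3", new ArrayList<>());
        System.out.println(assignOrphans(assign, List.of("t-4", "t-5")));
        // {c1=[t-0, t-1, t-2], c2=[t-3, t-5], c3=[t-4]}
    }
}
```

Note that only the orphaned partitions move; c1's three partitions stay put, which is exactly the data-movement saving the sticky strategy buys over re-running a range assignment from scratch.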
    
    /**
     * CooperativeStickyAssignor - cooperative sticky assignor.
     * Supports incremental rebalancing, reducing the impact of partition migration.
     */
    public static class CooperativeStickyAssignor extends StickyAssignor {
        
        @Override
        public String name() {
            return "cooperative-sticky";
        }
        
        /**
         * Performs a cooperative rebalance.
         * Proceeds in multiple rounds, migrating only a subset of partitions per round.
         */
        @Override
        public Map<String, ConsumerGroupMemberAssignment> assign(
            ConsumerGroupPartitionAssignorContext context) {
            
            Map<String, List<TopicPartition>> currentAssignments = getCurrentAssignments(context);
            // super.assign() returns member assignments; flatten them back into
            // per-member partition lists (the inverse of convertToMemberAssignments)
            Map<String, List<TopicPartition>> idealAssignments =
                toPartitionLists(super.assign(context));
            
            // Compute the partitions that have to move
            Map<String, Set<TopicPartition>> partitionsToRevoke = new HashMap<>();
            Map<String, Set<TopicPartition>> partitionsToAssign = new HashMap<>();
            
            // Diff the current assignment against the ideal one
            currentAssignments.forEach((memberId, currentPartitions) -> {
                List<TopicPartition> idealPartitions = idealAssignments.getOrDefault(memberId, Collections.emptyList());
                
                // Partitions this member must give up
                Set<TopicPartition> toRevoke = new HashSet<>(currentPartitions);
                toRevoke.removeAll(idealPartitions);
                if (!toRevoke.isEmpty()) {
                    partitionsToRevoke.put(memberId, toRevoke);
                }
                
                // Partitions this member should newly receive
                Set<TopicPartition> toAssign = new HashSet<>(idealPartitions);
                toAssign.removeAll(currentPartitions);
                if (!toAssign.isEmpty()) {
                    partitionsToAssign.put(memberId, toAssign);
                }
            });
            
            // If the change set is large, apply it in batches
            int totalChanges = partitionsToRevoke.values().stream().mapToInt(Set::size).sum() +
                              partitionsToAssign.values().stream().mapToInt(Set::size).sum();
            
            if (totalChanges > context.memberSubscriptions().size()) {
                // Large change set: rebalance incrementally
                return performIncrementalRebalance(currentAssignments, idealAssignments, context);
            } else {
                // Small change set: apply the new assignment directly
                return convertToMemberAssignments(idealAssignments, context);
            }
        }
        
        /**
         * Performs one round of incremental rebalancing,
         * breaking a large assignment change into several smaller steps.
         */
        private Map<String, ConsumerGroupMemberAssignment> performIncrementalRebalance(
            Map<String, List<TopicPartition>> current,
            Map<String, List<TopicPartition>> target,
            ConsumerGroupPartitionAssignorContext context) {
            
            Map<String, List<TopicPartition>> incrementalAssignments = new HashMap<>(current);
            
            // Cap the number of migrations this round (roughly 1/3 of the members)
            int maxChangesThisRound = Math.max(1, 
                (int) (context.memberSubscriptions().size() * 0.33));
            
            int changesApplied = 0;
            
            // Revoke partitions first, making room for subsequent assignments
            for (Map.Entry<String, List<TopicPartition>> entry : target.entrySet()) {
                String memberId = entry.getKey();
                List<TopicPartition> targetPartitions = entry.getValue();
                List<TopicPartition> currentPartitions = incrementalAssignments.getOrDefault(
                    memberId, Collections.emptyList());
                
                // Compute the partitions to revoke from this member
                List<TopicPartition> toRevoke = currentPartitions.stream()
                    .filter(tp -> !targetPartitions.contains(tp))
                    .limit(maxChangesThisRound - changesApplied)
                    .collect(Collectors.toList());
                
                if (!toRevoke.isEmpty()) {
                    List<TopicPartition> newAssignment = new ArrayList<>(currentPartitions);
                    newAssignment.removeAll(toRevoke);
                    incrementalAssignments.put(memberId, newAssignment);
                    
                    changesApplied += toRevoke.size();
                    
                    info("Incremental rebalance: revoked partitions {} from member {}", toRevoke, memberId);
                    
                    if (changesApplied >= maxChangesThisRound) {
                        break;
                    }
                }
            }
            
            info("Incremental rebalance round complete, {} changes applied", changesApplied);
            
            return convertToMemberAssignments(incrementalAssignments, context);
        }
    }
}
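The diff step at the heart of the cooperative assignor is plain set arithmetic, shown here in a minimal sketch with hypothetical partition names: revocations are current minus ideal, new assignments are ideal minus current.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public class CooperativeDiffDemo {

    // owned now but absent from the target assignment
    static Set<String> toRevoke(Set<String> current, Set<String> ideal) {
        Set<String> out = new TreeSet<>(current);
        out.removeAll(ideal);
        return out;
    }

    // present in the target but not owned yet
    static Set<String> toAssign(Set<String> current, Set<String> ideal) {
        Set<String> out = new TreeSet<>(ideal);
        out.removeAll(current);
        return out;
    }

    public static void main(String[] args) {
        Set<String> current = new TreeSet<>(List.of("t-0", "t-1", "t-2"));
        Set<String> ideal = new TreeSet<>(List.of("t-1", "t-2", "t-3"));
        // Cooperative protocol: revoke [t-0] first, assign [t-3] in a later
        // round, so t-1 and t-2 keep being consumed throughout.
        System.out.println(toRevoke(current, ideal)); // [t-0]
        System.out.println(toAssign(current, ideal)); // [t-3]
    }
}
```

Splitting the change into a revoke round and a later assign round is what eliminates the eager protocol's stop-the-world pause: no partition is ever owned by two members, and unaffected partitions never stop being consumed.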

5. Monitoring and Failure Recovery

5.1 Coordinator Metrics

/**
 * GroupCoordinatorMetrics - group coordinator metrics.
 * Provides detailed monitoring of group management and rebalance performance.
 */
public class GroupCoordinatorMetrics extends KafkaMetricsGroup {
    
    // Group management metrics
    private final Sensor groupCreationRate;           // group creation rate
    private final Sensor groupDeletionRate;           // group deletion rate
    private final Sensor activeGroupCount;            // number of active groups
    private final Sensor memberJoinRate;              // member join rate
    private final Sensor memberLeaveRate;             // member leave rate
    
    // Rebalance metrics
    private final Sensor rebalanceRate;               // rebalance rate
    private final Histogram rebalanceDuration;        // rebalance duration
    private final Sensor rebalanceFailureRate;        // rebalance failure rate
    
    // Heartbeat metrics
    private final Sensor heartbeatRate;               // heartbeat rate
    private final Histogram heartbeatLatency;         // heartbeat latency
    
    // Partition assignment metrics
    private final Histogram assignmentComputeTime;    // assignment computation time
    private final Sensor assignmentChangeRate;        // assignment change rate

    public GroupCoordinatorMetrics(Metrics metrics) {
        String groupName = "group-coordinator-metrics";
        
        // Group management metrics
        MetricName creationRateMetricName = metrics.metricName("group-creation-rate", groupName, 
            "Number of consumer groups created per second");
        groupCreationRate = metrics.sensor("group-creation-rate");
        groupCreationRate.add(creationRateMetricName, new Rate());
        
        MetricName activeGroupCountMetricName = metrics.metricName("active-group-count", groupName, 
            "Number of currently active consumer groups");
        activeGroupCount = metrics.sensor("active-group-count");
        activeGroupCount.add(activeGroupCountMetricName, new Value());
        
        // Rebalance metrics
        MetricName rebalanceRateMetricName = metrics.metricName("rebalance-rate", groupName, 
            "Number of rebalances per second");
        rebalanceRate = metrics.sensor("rebalance-rate");
        rebalanceRate.add(rebalanceRateMetricName, new Rate());
        
        MetricName rebalanceDurationMetricName = metrics.metricName("rebalance-duration-avg", groupName, 
            "Average rebalance duration in milliseconds");
        rebalanceDuration = metrics.histogram("rebalance-duration");
        rebalanceDuration.add(rebalanceDurationMetricName, new Avg());
    }
    
    /**
     * Records a rebalance event.
     */
    public void recordRebalance(long durationMs, boolean success) {
        rebalanceRate.record();
        rebalanceDuration.record(durationMs);
        
        if (!success) {
            rebalanceFailureRate.record();
        }
    }
    
    /**
     * Records a group creation event.
     */
    public void recordGroupCreation() {
        groupCreationRate.record();
    }
    
    /**
     * Updates the active group count.
     */
    public void updateActiveGroupCount(int count) {
        activeGroupCount.record(count);
    }
}

/**
 * Coordinator failure recovery mechanisms.
 */
public class CoordinatorFailureRecovery {
    
    /**
     * Coordinator failover handling:
     * what happens when the broker hosting a coordinator fails.
     */
    public static class CoordinatorFailover {
        
        /**
         * Handles coordinator migration,
         * moving group coordination from the failed broker to a new one.
         */
        public void handleCoordinatorMigration(List<String> affectedGroups, 
                                             int newCoordinatorBrokerId) {
            
            info("Starting coordinator migration, affected groups: {}, new coordinator: {}", 
                affectedGroups.size(), newCoordinatorBrokerId);
            
            for (String groupId : affectedGroups) {
                try {
                    // Recover the group's state from the __consumer_offsets topic
                    ConsumerGroupMetadata groupMetadata = loadGroupMetadataFromLog(groupId);
                    
                    if (groupMetadata != null) {
                        // Rebuild the group's state
                        rebuildGroupState(groupId, groupMetadata, newCoordinatorBrokerId);
                        
                        // Tell all members to rediscover their coordinator
                        notifyMembersOfCoordinatorChange(groupId, newCoordinatorBrokerId);
                        
                        debug("Migrated group {} to new coordinator {}", groupId, newCoordinatorBrokerId);
                    }
                    
                } catch (Exception e) {
                    error("Failed to migrate group {}", groupId, e);
                }
            }
            
            info("Coordinator migration complete");
        }
        
        /**
         * Recovers group metadata from the log.
         */
        private ConsumerGroupMetadata loadGroupMetadataFromLog(String groupId) {
            try {
                // Compute the group's __consumer_offsets partition
                int partition = Utils.abs(groupId.hashCode()) % config.offsetsTopicNumPartitions();
                TopicPartition offsetsPartition = new TopicPartition(
                    Topic.GROUP_METADATA_TOPIC_NAME, partition);
                
                // Read the partition's log
                UnifiedLog offsetsLog = logManager.getLog(offsetsPartition);
                if (offsetsLog == null) {
                    warn("__consumer_offsets partition {} does not exist", partition);
                    return null;
                }
                
                // Scan the log for records belonging to this group
                ConsumerGroupMetadata.Builder metadataBuilder = new ConsumerGroupMetadata.Builder()
                    .setGroupId(groupId);
                
                long offset = offsetsLog.logStartOffset();
                long endOffset = offsetsLog.logEndOffset();
                
                while (offset < endOffset) {
                    FetchDataInfo fetchInfo = offsetsLog.read(offset, 64 * 1024, 
                        FetchIsolation.FETCH_LOG_END, false);
                    
                    long nextOffset = offset;
                    for (RecordBatch batch : fetchInfo.records.batches()) {
                        for (Record record : batch) {
                            // Parse group metadata records
                            if (isGroupMetadataRecord(record, groupId)) {
                                updateGroupMetadataFromRecord(metadataBuilder, record);
                            }
                        }
                        nextOffset = batch.lastOffset() + 1;
                    }
                    
                    // Advance past the batches actually read rather than a fixed stride
                    if (nextOffset == offset) {
                        break; // nothing was read; avoid spinning
                    }
                    offset = nextOffset;
                }
                
                return metadataBuilder.build();
                
            } catch (Exception e) {
                error("Failed to recover metadata for group {} from the log", groupId, e);
                return null;
            }
        }
        
        /**
         * Rebuilds the group's complete state on the new coordinator.
         */
        private void rebuildGroupState(String groupId, 
                                     ConsumerGroupMetadata metadata, 
                                     int coordinatorBrokerId) {
            
            // Create a new group object from the recovered metadata
            ConsumerGroup rebuiltGroup = new ConsumerGroup(
                groupId,
                metadata.state(),
                metadata.members(),
                metadata.assignmentStrategy(),
                metadata.generation()
            );
            
            // Register the group with the new coordinator
            GroupCoordinatorShard coordinatorShard = getCoordinatorShard(coordinatorBrokerId);
            coordinatorShard.addGroup(rebuiltGroup);
            
            // If the group was mid-rebalance, resume the rebalance
            if (rebuiltGroup.state() == ConsumerGroupState.PREPARING_REBALANCE) {
                scheduleRebalanceCompletion(rebuiltGroup);
            }
            
            info("Rebuilt state of group {} on coordinator {}", groupId, coordinatorBrokerId);
        }
    }
}
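The lookup that failover (and every client's FindCoordinator request) relies on is the same hashing scheme used in loadGroupMetadataFromLog(): a group's coordinator is the leader of partition `abs(hash(groupId)) % N` of __consumer_offsets, where N is `offsets.topic.num.partitions` (default 50). A minimal sketch, with a hypothetical group name:

```java
public class GroupCoordinatorLookup {

    static int partitionFor(String groupId, int numPartitions) {
        // same bit-mask abs as Kafka's Utils.abs, non-negative even for MIN_VALUE
        return (groupId.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int p = partitionFor("payments-app", 50);
        // Stable routing: the whole group always talks to the broker leading this partition
        System.out.println(p == partitionFor("payments-app", 50)); // true
        System.out.println(p >= 0 && p < 50);                      // true
    }
}
```

Because the mapping is a pure function of the group id, failover reduces to a leader election on that __consumer_offsets partition: whoever becomes the new leader replays the partition and takes over as coordinator, with no extra routing state to migrate.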

6. 总结

Kafka协调器模块通过精心设计的分布式协调机制,实现了可靠的组管理和事务支持:

6.1 核心设计优势

  • 分片化管理:通过分片将协调器负载分散到多个Broker
  • 状态机设计:清晰的状态转换保证操作的一致性和可预测性
  • 持久化存储:使用内部主题持久化协调器状态,支持故障恢复
  • 增量再平衡:协作式分配器减少再平衡对消费的影响
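
上述"分片化管理"可以用组ID到内部主题分区的映射来直观理解:Kafka 按 groupId 的哈希取模定位 __consumer_offsets 的分区,该分区 Leader 所在的 Broker 即为该组的协调器。下面是一个脱离 Kafka 的最小示意(CoordinatorPartitioner 为假设的类名,50 对应 offsets.topic.num.partitions 的默认值):

```java
// 示意:组到协调器分区的映射(分片定位),非 Kafka 实际实现
public class CoordinatorPartitioner {
    private final int numPartitions;

    public CoordinatorPartitioner(int numPartitions) {
        this.numPartitions = numPartitions;
    }

    // 屏蔽符号位再取模,避免 Math.abs(Integer.MIN_VALUE) 溢出,
    // 与 Kafka 内部 Utils.abs 的思路一致
    public int partitionFor(String groupId) {
        return (groupId.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        CoordinatorPartitioner p = new CoordinatorPartitioner(50);
        System.out.println(p.partitionFor("order-consumer-group"));
    }
}
```

同一个 groupId 总是映射到同一分区,因此同组的所有协调请求天然落在同一分片上串行处理。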

6.2 事务协调亮点

  • 两阶段提交:确保跨分区事务的ACID特性
  • Producer隔离:通过ProducerId和Epoch实现Producer隔离
  • 超时管理:自动检测和处理超时事务
  • 控制消息:使用特殊控制消息标记事务边界
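
上面的"Producer隔离"可以用一个极简的 epoch 比较来示意:每次 InitProducerId 都会提升 epoch,携带旧 epoch 的请求被视为来自"僵尸"实例而拒绝。以下为示意性草图(ProducerFencing 为假设的类名,非 Kafka 实际实现):

```java
// 示意:基于 producerEpoch 的隔离(fencing)判断
public class ProducerFencing {
    private long producerId = -1;
    private short epoch = -1;

    // 模拟 InitProducerId:同一 ProducerId 重复初始化时提升 epoch,
    // 使旧的 Producer 实例被隔离
    public synchronized short initProducer(long pid) {
        if (pid != producerId) {
            producerId = pid;
            epoch = 0;
        } else {
            epoch++;
        }
        return epoch;
    }

    // 只有携带最新 epoch 的请求才被接受
    public synchronized boolean isFenced(long pid, short requestEpoch) {
        return pid != producerId || requestEpoch < epoch;
    }
}
```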

6.3 性能优化特性

  • 批量操作:减少协调器操作的网络开销
  • 智能分配:多种分配算法适应不同场景需求
  • 故障快速恢复:协调器迁移机制保证高可用性
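
上面的"批量操作"思路可以用一个简单的攒批器来示意:写入先进入缓冲区,达到阈值后一次性刷出,从而摊薄每次写盘/网络的固定开销。以下为示意性草图(RecordBatcher 为假设的类名,非 Kafka 实际实现):

```java
import java.util.ArrayList;
import java.util.List;

// 示意:协调器写操作的简单攒批(batching)
public class RecordBatcher {
    private final int maxBatchSize;
    private final List<String> buffer = new ArrayList<>();
    private final List<List<String>> flushed = new ArrayList<>();

    public RecordBatcher(int maxBatchSize) {
        this.maxBatchSize = maxBatchSize;
    }

    public void append(String record) {
        buffer.add(record);
        if (buffer.size() >= maxBatchSize) flush();  // 达到阈值自动刷出
    }

    public void flush() {
        if (buffer.isEmpty()) return;
        flushed.add(new ArrayList<>(buffer));  // 模拟一次批量写入
        buffer.clear();
    }

    public List<List<String>> flushedBatches() {
        return flushed;
    }
}
```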

通过深入理解协调器的实现原理,我们能够更好地设计分布式应用,合理配置消费者组,充分利用Kafka的协调功能。

7. 关键函数与调用链(补充)

  • 说明:本节聚焦组与事务两条关键路径,补充函数级代码、调用链与时序图,便于与 Broker/Client 两侧联动排障。

7.1 关键函数核心代码与说明(精要)

// 组心跳入口(服务层摘要)
public CompletableFuture<ConsumerGroupHeartbeatResponseData> consumerGroupHeartbeat(
    AuthorizableRequestContext context,
    ConsumerGroupHeartbeatRequestData request) {
  return runtime.scheduleWriteOperation(
    "consumer-group-heartbeat", topicPartitionFor(request.groupId()),
    coord -> coord.consumerGroupHeartbeat(context, request)
  ).thenApply(CoordinatorResult::response);
}
  • 功能:将心跳写操作调度到对应分片,返回协调后的应答。
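scheduleWriteOperation 的核心语义是:同一分片上的写操作在单线程中串行执行,天然避免了组状态上的并发竞争。下面用一个脱离 Kafka 的最小草图模拟这一语义(ShardRuntime、scheduleWrite 均为假设的名字,非 CoordinatorRuntime 的真实接口):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// 示意:按分片单线程串行化写操作,并以 CompletableFuture 返回结果
public class ShardRuntime {
    private final ExecutorService[] shards;

    public ShardRuntime(int numShards) {
        shards = new ExecutorService[numShards];
        for (int i = 0; i < numShards; i++) {
            // 每个分片一个单线程执行器,保证分片内写操作串行
            shards[i] = Executors.newSingleThreadExecutor();
        }
    }

    public <T> CompletableFuture<T> scheduleWrite(int shard, Supplier<T> op) {
        return CompletableFuture.supplyAsync(op, shards[shard]);
    }

    public void close() {
        for (ExecutorService e : shards) e.shutdown();
    }
}
```

不同分片之间仍可并行,这正是"分片化管理"在运行时层面的体现。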
// 分片:现有成员心跳(摘要)
private ConsumerGroupHeartbeatResponseData handleConsumerGroupMemberHeartbeat(
  AuthorizableRequestContext ctx,
  ConsumerGroupHeartbeatRequestData req,
  ConsumerGroup group,
  List<CoordinatorRecord> records) {
  ConsumerGroupMember member = group.member(req.memberId());
  if (member == null) throw new UnknownMemberIdException("unknown member");
  if (req.memberEpoch() < member.memberEpoch()) throw new FencedMemberEpochException("fenced");
  member.updateLastHeartbeat(time.milliseconds());
  List<TopicPartition> target = group.targetAssignment(req.memberId());
  if (!member.assignedPartitions().equals(target)) {
    member.updateAssignment(target);
    records.add(buildAssignmentRecord(group.groupId(), req.memberId(), target));
  }
  return new ConsumerGroupHeartbeatResponseData()
    .setMemberId(req.memberId())
    .setMemberEpoch(member.memberEpoch())
    .setHeartbeatIntervalMs(config.consumerGroupHeartbeatIntervalMs());
}
  • 功能:校验纪元与成员有效性、按目标分配更新成员并产出变更记录。
// 事务结束(摘要)
public EndTxnResult handleEndTxn(String transactionalId, long producerId, short producerEpoch, TransactionResult result) {
  TransactionMetadata md = transactionMetadataMap.get(transactionalId);
  if (md == null) return new EndTxnResult(Errors.UNKNOWN_PRODUCER_ID);
  synchronized (md) {
    if (md.state != TransactionState.ONGOING) return new EndTxnResult(Errors.INVALID_TXN_STATE);
    return result == TransactionResult.COMMIT ? handleCommitTransaction(md) : handleAbortTransaction(md);
  }
}
  • 功能:校验事务状态后进入提交/中止分支,写入控制消息并最终推进 LSO(Last Stable Offset)。
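
handleEndTxn 背后是事务状态机的两阶段推进:ONGOING → PREPARE_COMMIT/PREPARE_ABORT(先持久化事务日志)→ COMPLETE_COMMIT/COMPLETE_ABORT(控制消息写完后)。下面用一个自包含的简化状态机示意合法转换(TxnStateMachine 为假设的类名,仅保留与两阶段相关的状态):

```java
import java.util.EnumSet;
import java.util.Map;

// 示意:事务状态机的两阶段推进,非法转换直接抛异常
public class TxnStateMachine {
    public enum State { EMPTY, ONGOING, PREPARE_COMMIT, PREPARE_ABORT, COMPLETE_COMMIT, COMPLETE_ABORT }

    // 每个状态允许的下一跳
    private static final Map<State, EnumSet<State>> VALID = Map.of(
        State.EMPTY, EnumSet.of(State.ONGOING),
        State.ONGOING, EnumSet.of(State.PREPARE_COMMIT, State.PREPARE_ABORT),
        State.PREPARE_COMMIT, EnumSet.of(State.COMPLETE_COMMIT),
        State.PREPARE_ABORT, EnumSet.of(State.COMPLETE_ABORT),
        State.COMPLETE_COMMIT, EnumSet.of(State.ONGOING),
        State.COMPLETE_ABORT, EnumSet.of(State.ONGOING));

    private State state = State.EMPTY;

    public State state() {
        return state;
    }

    public void transitionTo(State target) {
        if (!VALID.getOrDefault(state, EnumSet.noneOf(State.class)).contains(target)) {
            throw new IllegalStateException("非法状态转换: " + state + " -> " + target);
        }
        state = target;
    }
}
```

这解释了摘要代码中对 `TransactionState.ONGOING` 的前置校验:只有处于 ONGOING 的事务才允许进入 PREPARE 阶段。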

7.2 调用链(组与事务)

flowchart LR
  C[Consumer] --> HB[consumerGroupHeartbeat]
  HB --> RT[CoordinatorRuntime.scheduleWriteOperation]
  RT --> SH[GroupCoordinatorShard.consumerGroupHeartbeat]
  SH --> REC[CoordinatorRecord 生成]
  SH --> RESP[Heartbeat Response]

flowchart LR
  P[Transactional Producer] --> ET[handleEndTxn]
  ET --> COM[handleCommitTransaction]
  COM --> RM["ReplicaManager.appendRecords(Control)"]
  RM --> Partitions
  Partitions --> LSO[推进LSO]

7.3 补充时序图(心跳与分配更新)

sequenceDiagram
  participant C as Consumer
  participant G as GroupCoordinatorService
  participant S as GroupCoordinatorShard

  C->>G: ConsumerGroupHeartbeat
  G->>S: scheduleWriteOperation
  S->>S: 校验纪元/成员
  alt 分配变化
    S->>S: 产出Assignment记录
  end
  S-->>G: Response
  G-->>C: HeartbeatResponse

7.4 类结构图(简化)

classDiagram
  class GroupCoordinatorService
  class CoordinatorRuntime
  class GroupCoordinatorShard
  class TransactionStateManager
  class TransactionMetadata

  GroupCoordinatorService --> CoordinatorRuntime
  CoordinatorRuntime --> GroupCoordinatorShard
  GroupCoordinatorShard --> TransactionStateManager
  TransactionStateManager --> TransactionMetadata

本文档梳理了Kafka协调器模块的核心实现机制,为分布式协调和事务处理提供了详实的技术指导。