概述

Kafka协调器模块是实现分布式协调功能的核心组件。GroupCoordinator负责管理消费者组的成员关系和分区分配,TransactionCoordinator负责事务的协调和管理。本文深入分析协调器的内部实现,揭示其在分布式环境下保证一致性和可用性的技术机制。

1. 协调器架构总览

1.1 协调器模块整体架构图

graph TB subgraph "Kafka协调器模块架构" subgraph "组协调器 Group Coordinator" GCS[GroupCoordinatorService 服务层] GCSH[GroupCoordinatorShard 分片层] CGM[ConsumerGroupManager 消费者组管理] SGM[StreamsGroupManager 流组管理] SHGM[ShareGroupManager 共享组管理] end subgraph "事务协调器 Transaction Coordinator" TCS[TransactionCoordinatorService 服务层] TCSH[TransactionCoordinatorShard 分片层] TSM[TransactionStateManager 状态管理] TTM[TransactionTopicManager 事务主题管理] end subgraph "协调器运行时 Coordinator Runtime" CR[CoordinatorRuntime 运行时] CL[CoordinatorLoader 加载器] CS[CoordinatorShard 分片接口] CT[CoordinatorTimer 定时器] end subgraph "持久化层 Persistence Layer" CRL[CoordinatorRecordLoader 记录加载器] CRS[CoordinatorRecordSerde 序列化器] SS[SnapshotStorage 快照存储] LS[LogStorage 日志存储] end subgraph "分区分配器 Partition Assignors" RRA[RangeAssignor 范围分配器] RDA[RoundRobinAssignor 轮询分配器] SA[StickyAssignor 粘性分配器] COPA[CooperativeStickyAssignor 协作粘性分配器] end %% 连接关系 GCS --> GCSH GCSH --> CGM GCSH --> SGM GCSH --> SHGM TCS --> TCSH TCSH --> TSM TCSH --> TTM GCSH --> CR TCSH --> CR CR --> CL CR --> CS CR --> CT CR --> CRL CR --> CRS CRL --> SS CRL --> LS CGM --> RRA CGM --> RDA CGM --> SA CGM --> COPA end style GCS fill:#e1f5fe style TCS fill:#e8f5e8 style CR fill:#f3e5f5 style CGM fill:#fff3e0

2. GroupCoordinator组协调器深度解析

2.1 消费者组再平衡完整流程

sequenceDiagram participant C1 as Consumer1 participant C2 as Consumer2 participant C3 as Consumer3 (新加入) participant GC as GroupCoordinator participant L as Leader Consumer participant KC as KafkaController Note over C1,KC: 消费者组再平衡完整流程 Note over C1,C3: 阶段1:触发再平衡 C3->>GC: JoinGroupRequest (memberEpoch=0) GC->>GC: 检测新成员加入 GC->>C1: HeartbeatResponse (REBALANCE_IN_PROGRESS) GC->>C2: HeartbeatResponse (REBALANCE_IN_PROGRESS) Note over C1,GC: 阶段2:PrepareRebalance C1->>GC: JoinGroupRequest C1->>C1: 停止消费,提交偏移量 C2->>GC: JoinGroupRequest C2->>C2: 停止消费,提交偏移量 C3->>GC: JoinGroupRequest GC->>GC: 等待所有成员JoinGroup GC->>GC: 选举Group Leader (通常是第一个加入的) Note over C1,GC: 阶段3:AwaitingSync GC->>C1: JoinGroupResponse (LEADER, 成员列表) GC->>C2: JoinGroupResponse (MEMBER) GC->>C3: JoinGroupResponse (MEMBER) Note over C1,GC: 阶段4:分区分配计算 C1->>C1: 根据分区分配器计算分配方案 C1->>GC: SyncGroupRequest (包含分配方案) C2->>GC: SyncGroupRequest (空) C3->>GC: SyncGroupRequest (空) Note over GC: 阶段5:Stable状态 GC->>C1: SyncGroupResponse (分配的分区) GC->>C2: SyncGroupResponse (分配的分区) GC->>C3: SyncGroupResponse (分配的分区) Note over C1,C3: 阶段6:开始消费 C1->>C1: 开始消费新分配的分区 C2->>C2: 开始消费新分配的分区 C3->>C3: 开始消费新分配的分区 Note over C1,C3: 正常心跳维持组成员关系 loop 定期心跳 C1->>GC: HeartbeatRequest C2->>GC: HeartbeatRequest C3->>GC: HeartbeatRequest GC->>C1: HeartbeatResponse (OK) GC->>C2: HeartbeatResponse (OK) GC->>C3: HeartbeatResponse (OK) end

2.2 GroupCoordinator核心实现

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
/**
 * GroupCoordinatorService - 组协调器服务实现
 * 管理消费者组的完整生命周期
 */
public class GroupCoordinatorService implements GroupCoordinator {
    
    private final GroupCoordinatorConfig config;                    // 配置
    private final CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime; // 运行时
    private final GroupCoordinatorMetrics groupCoordinatorMetrics;  // 监控指标
    private final Timer timer;                                      // 定时器
    
    // 分区分配器映射
    private final Set<String> consumerGroupAssignors;              // 支持的分配器列表
    
    /**
     * 处理消费者组心跳请求
     * 实现组成员管理和再平衡触发逻辑
     */
    @Override
    public CompletableFuture<ConsumerGroupHeartbeatResponseData> consumerGroupHeartbeat(
        AuthorizableRequestContext context,
        ConsumerGroupHeartbeatRequestData request) {
        
        String groupId = request.groupId();
        String memberId = request.memberId();
        int memberEpoch = request.memberEpoch();
        
        debug("处理消费者组心跳:groupId={}, memberId={}, memberEpoch={}", 
             groupId, memberId, memberEpoch);
        
        // 验证请求参数
        throwIfConsumerGroupHeartbeatRequestIsInvalid(request, context.apiVersion());
        
        // 路由到对应的协调器分片
        return runtime.scheduleWriteOperation(
            "consumer-group-heartbeat",
            topicPartitionFor(groupId),
            coordinator -> coordinator.consumerGroupHeartbeat(context, request)
        ).thenApply(result -> {
            // 处理调度的写操作结果
            ConsumerGroupHeartbeatResponseData response = result.response();
            
            // 如果有新的定时任务,添加到定时器
            result.records().forEach(record -> {
                if (record instanceof TimerRecord) {
                    TimerRecord timerRecord = (TimerRecord) record;
                    timer.add(timerRecord);
                }
            });
            
            return response;
        }).exceptionally(throwable -> {
            error("处理消费者组心跳时发生异常", throwable);
            
            // 构建错误响应
            return new ConsumerGroupHeartbeatResponseData()
                .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())
                .setErrorMessage("服务器内部错误");
        });
    }
    
    /**
     * 处理组加入请求(传统协议)
     * 支持旧版本消费者的组加入流程
     */
    @Override
    public CompletableFuture<JoinGroupResponseData> joinGroup(
        AuthorizableRequestContext context,
        JoinGroupRequestData request,
        BufferSupplier bufferSupplier) {
        
        String groupId = request.groupId();
        String memberId = request.memberId();
        String protocolType = request.protocolType();
        
        info("处理组加入请求:groupId={}, memberId={}, protocolType={}", 
            groupId, memberId, protocolType);
        
        return runtime.scheduleWriteOperation(
            "join-group",
            topicPartitionFor(groupId),
            coordinator -> coordinator.classicGroupJoin(context, request, bufferSupplier)
        ).thenApply(result -> {
            JoinGroupResponseData response = result.response();
            
            // 记录组加入指标
            groupCoordinatorMetrics.recordGroupJoin();
            
            return response;
        });
    }
    
    /**
     * 处理同步组请求
     * Leader消费者提交分区分配方案
     */
    @Override
    public CompletableFuture<SyncGroupResponseData> syncGroup(
        AuthorizableRequestContext context,
        SyncGroupRequestData request,
        BufferSupplier bufferSupplier) {
        
        String groupId = request.groupId();
        String memberId = request.memberId();
        int generationId = request.generationId();
        
        debug("处理同步组请求:groupId={}, memberId={}, generationId={}", 
             groupId, memberId, generationId);
        
        return runtime.scheduleWriteOperation(
            "sync-group", 
            topicPartitionFor(groupId),
            coordinator -> coordinator.classicGroupSync(context, request, bufferSupplier)
        ).thenApply(result -> {
            SyncGroupResponseData response = result.response();
            
            // 如果同步成功,组进入Stable状态
            if (response.errorCode() == Errors.NONE.code()) {
                info("组 {} 同步完成,进入Stable状态", groupId);
                groupCoordinatorMetrics.recordGroupSync();
            }
            
            return response;
        });
    }
}

/**
 * GroupCoordinatorShard - 组协调器分片实现
 * 每个分片管理一部分组的状态和元数据
 */
public class GroupCoordinatorShard implements CoordinatorShard<CoordinatorRecord> {
    
    // 组状态存储
    private final TimelineHashMap<String, ClassicGroup> classicGroups;      // 传统消费者组
    private final TimelineHashMap<String, ConsumerGroup> consumerGroups;    // 现代消费者组
    private final TimelineHashMap<String, StreamsGroup> streamsGroups;      // 流组
    private final TimelineHashMap<String, ShareGroup> shareGroups;          // 共享组
    
    // 分区分配器
    private final Map<String, ConsumerGroupPartitionAssignor> assignors;
    
    // 配置和监控
    private final GroupCoordinatorConfig config;
    private final GroupCoordinatorMetrics metrics;
    private final SnapshotRegistry snapshotRegistry;                        // 快照注册表

    /**
     * 处理消费者组心跳
     * 现代消费者协议的心跳处理逻辑
     */
    public CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> consumerGroupHeartbeat(
        AuthorizableRequestContext context,
        ConsumerGroupHeartbeatRequestData request) throws ApiException {
        
        String groupId = request.groupId();
        String memberId = request.memberId();
        int memberEpoch = request.memberEpoch();
        
        // 获取或创建消费者组
        ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, 
            request.groupInstanceId() != null);
        
        List<CoordinatorRecord> records = new ArrayList<>();
        ConsumerGroupHeartbeatResponseData response;
        
        try {
            if (memberEpoch == 0) {
                // 新成员加入流程
                response = handleConsumerGroupMemberJoin(context, request, group, records);
            } else {
                // 现有成员心跳流程
                response = handleConsumerGroupMemberHeartbeat(context, request, group, records);
            }
            
            // 检查是否需要触发再平衡
            maybeScheduleConsumerGroupRebalance(group, records);
            
            return new CoordinatorResult<>(records, response);
            
        } catch (Exception e) {
            error("处理消费者组心跳时发生异常:groupId={}, memberId={}", groupId, memberId, e);
            
            ConsumerGroupHeartbeatResponseData errorResponse = new ConsumerGroupHeartbeatResponseData()
                .setErrorCode(Errors.forException(e).code())
                .setErrorMessage(e.getMessage());
            
            return new CoordinatorResult<>(Collections.emptyList(), errorResponse);
        }
    }
    
    /**
     * 处理新成员加入
     * 实现成员注册和初始分区分配
     */
    private ConsumerGroupHeartbeatResponseData handleConsumerGroupMemberJoin(
        AuthorizableRequestContext context,
        ConsumerGroupHeartbeatRequestData request,
        ConsumerGroup group,
        List<CoordinatorRecord> records) {
        
        String groupId = request.groupId();
        String memberId = request.memberId();
        
        // 生成新的成员ID(如果需要)
        if (memberId == null || memberId.isEmpty()) {
            memberId = generateMemberId(context);
        }
        
        // 创建新的组成员
        ConsumerGroupMember newMember = new ConsumerGroupMember.Builder()
            .setMemberId(memberId)
            .setGroupInstanceId(request.groupInstanceId())
            .setRackId(request.rackId())
            .setClientId(context.clientId())
            .setClientHost(context.clientAddress().toString())
            .setSubscribedTopicNames(request.subscribedTopicNames())
            .setServerAssignor(request.serverAssignor())
            .setRebalanceTimeoutMs(request.rebalanceTimeoutMs())
            .build();
        
        // 将成员添加到组中
        group.addMember(newMember);
        
        // 生成成员加入记录
        ConsumerGroupMemberMetadataValue memberRecord = new ConsumerGroupMemberMetadataValue()
            .setMemberId(memberId)
            .setGroupInstanceId(request.groupInstanceId())
            .setClientId(context.clientId())
            .setClientHost(context.clientAddress().toString())
            .setSubscribedTopicNames(request.subscribedTopicNames())
            .setAssignedPartitions(Collections.emptyList()); // 初始无分配
        
        records.add(new CoordinatorRecord(
            new ConsumerGroupMemberMetadataKey(groupId, memberId),
            memberRecord
        ));
        
        // 触发再平衡
        group.requestRebalance("新成员加入");
        
        // 构建响应
        return new ConsumerGroupHeartbeatResponseData()
            .setMemberId(memberId)
            .setMemberEpoch(1) // 新成员从epoch 1开始
            .setHeartbeatIntervalMs(config.consumerGroupHeartbeatIntervalMs())
            .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()
                .setAssignedPartitions(Collections.emptyList()));
    }
    
    /**
     * 处理现有成员心跳
     * 维护成员活跃状态,处理分区分配变更
     */
    private ConsumerGroupHeartbeatResponseData handleConsumerGroupMemberHeartbeat(
        AuthorizableRequestContext context,
        ConsumerGroupHeartbeatRequestData request,
        ConsumerGroup group,
        List<CoordinatorRecord> records) {
        
        String groupId = request.groupId();
        String memberId = request.memberId();
        int memberEpoch = request.memberEpoch();
        
        // 获取现有成员
        ConsumerGroupMember member = group.member(memberId);
        if (member == null) {
            throw new UnknownMemberIdException("未知的成员ID: " + memberId);
        }
        
        // 验证成员纪元
        if (memberEpoch < member.memberEpoch()) {
            throw new FencedMemberEpochException("成员纪元已过期");
        } else if (memberEpoch > member.memberEpoch()) {
            throw new UnknownMemberIdException("成员纪元超前");
        }
        
        // 更新成员最后心跳时间
        member.updateLastHeartbeat(time.milliseconds());
        
        // 检查分区分配是否有变化
        List<TopicPartition> currentAssignment = member.assignedPartitions();
        List<TopicPartition> targetAssignment = group.targetAssignment(memberId);
        
        ConsumerGroupHeartbeatResponseData response = new ConsumerGroupHeartbeatResponseData()
            .setMemberId(memberId)
            .setMemberEpoch(member.memberEpoch())
            .setHeartbeatIntervalMs(config.consumerGroupHeartbeatIntervalMs());
        
        if (!currentAssignment.equals(targetAssignment)) {
            // 分区分配有变化,更新成员分配
            member.updateAssignment(targetAssignment);
            
            // 生成分区分配更新记录
            ConsumerGroupPartitionAssignmentValue assignmentRecord = 
                new ConsumerGroupPartitionAssignmentValue()
                    .setMemberId(memberId)
                    .setAssignedPartitions(targetAssignment.stream()
                        .map(tp -> new ConsumerGroupPartitionAssignmentValue.Partition()
                            .setTopicId(getTopicId(tp.topic()))
                            .setPartitionId(tp.partition()))
                        .collect(Collectors.toList()));
            
            records.add(new CoordinatorRecord(
                new ConsumerGroupPartitionAssignmentKey(groupId, memberId),
                assignmentRecord
            ));
            
            // 设置新的分区分配
            response.setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()
                .setAssignedPartitions(targetAssignment.stream()
                    .map(tp -> new ConsumerGroupHeartbeatResponseData.Assignment.TopicPartition()
                        .setTopicId(getTopicId(tp.topic()))
                        .setPartitions(Collections.singletonList(tp.partition())))
                    .collect(Collectors.toList())));
            
            info("更新成员 {} 的分区分配:{} -> {}", memberId, currentAssignment, targetAssignment);
        }
        
        return response;
    }
    
    /**
     * 执行分区分配计算
     * 使用配置的分区分配器计算新的分区分配方案
     */
    private Map<String, List<TopicPartition>> computePartitionAssignment(
        ConsumerGroup group, 
        String assignorName) {
        
        ConsumerGroupPartitionAssignor assignor = assignors.get(assignorName);
        if (assignor == null) {
            throw new UnsupportedAssignorException("不支持的分配器: " + assignorName);
        }
        
        // 准备分配输入
        Set<String> subscribedTopics = group.subscribedTopicNames();
        Map<String, ConsumerGroupMemberSubscription> memberSubscriptions = new HashMap<>();
        
        group.members().forEach((memberId, member) -> {
            memberSubscriptions.put(memberId, new ConsumerGroupMemberSubscription()
                .setTopics(member.subscribedTopicNames())
                .setRackId(member.rackId()));
        });
        
        // 获取主题元数据
        Map<String, TopicMetadata> topicMetadata = new HashMap<>();
        subscribedTopics.forEach(topic -> {
            TopicMetadata metadata = getTopicMetadata(topic);
            if (metadata != null) {
                topicMetadata.put(topic, metadata);
            }
        });
        
        // 执行分配计算
        debug("执行分区分配计算:assignor={}, members={}, topics={}", 
             assignorName, memberSubscriptions.keySet(), subscribedTopics);
        
        ConsumerGroupPartitionAssignorContext assignmentContext = 
            new ConsumerGroupPartitionAssignorContext(
                group.groupId(),
                memberSubscriptions,
                topicMetadata
            );
        
        Map<String, ConsumerGroupMemberAssignment> memberAssignments = 
            assignor.assign(assignmentContext);
        
        // 转换为TopicPartition列表
        Map<String, List<TopicPartition>> result = new HashMap<>();
        memberAssignments.forEach((memberId, assignment) -> {
            List<TopicPartition> partitions = assignment.partitions().stream()
                .map(tp -> new TopicPartition(tp.topicId(), tp.partitionId()))
                .collect(Collectors.toList());
            result.put(memberId, partitions);
        });
        
        info("分区分配计算完成:{}", result);
        return result;
    }
    
    /**
     * 检查并调度再平衡
     * 当组状态或成员订阅发生变化时触发再平衡
     */
    private void maybeScheduleConsumerGroupRebalance(ConsumerGroup group, 
                                                   List<CoordinatorRecord> records) {
        if (group.state() != ConsumerGroupState.STABLE) {
            return; // 组不在稳定状态,无需调度再平衡
        }
        
        boolean needsRebalance = false;
        String rebalanceReason = "";
        
        // 检查各种再平衡触发条件
        if (group.hasMetadataChanged()) {
            needsRebalance = true;
            rebalanceReason = "主题元数据变更";
        } else if (group.hasMemberSubscriptionChanged()) {
            needsRebalance = true;
            rebalanceReason = "成员订阅变更";
        } else if (group.hasInactiveMembers()) {
            needsRebalance = true;
            rebalanceReason = "检测到不活跃成员";
        }
        
        if (needsRebalance) {
            info("调度组 {} 的再平衡,原因: {}", group.groupId(), rebalanceReason);
            
            // 将组状态转换为PrepareRebalance
            group.transitionTo(ConsumerGroupState.PREPARING_REBALANCE);
            
            // 生成状态变更记录
            ConsumerGroupMetadataValue groupRecord = new ConsumerGroupMetadataValue()
                .setGroupId(group.groupId())
                .setGroupState(ConsumerGroupState.PREPARING_REBALANCE.toString())
                .setStateTimestamp(time.milliseconds());
            
            records.add(new CoordinatorRecord(
                new ConsumerGroupMetadataKey(group.groupId()),
                groupRecord
            ));
            
            // 调度再平衡超时任务
            scheduleRebalanceTimeout(group, records);
        }
    }
    
    /**
     * 调度再平衡超时任务
     * 如果在指定时间内没有完成再平衡,强制完成
     */
    private void scheduleRebalanceTimeout(ConsumerGroup group, 
                                        List<CoordinatorRecord> records) {
        long timeoutMs = config.consumerGroupSessionTimeoutMs();
        long deadlineMs = time.milliseconds() + timeoutMs;
        
        // 创建定时任务记录
        ConsumerGroupRebalanceTimeoutKey timerKey = 
            new ConsumerGroupRebalanceTimeoutKey(group.groupId());
        
        TimerRecord timerRecord = new TimerRecord(
            timerKey,
            deadlineMs,
            "consumer-group-rebalance-timeout"
        );
        
        records.add(timerRecord);
        
        debug("调度组 {} 的再平衡超时任务,截止时间: {}", group.groupId(), new Date(deadlineMs));
    }
}

3. TransactionCoordinator事务协调器

3.1 事务协调机制架构图

graph TB subgraph "Kafka事务协调器架构" subgraph "事务协调器层 Transaction Coordinator Layer" TC[TransactionCoordinator 事务协调器] TSM[TransactionStateManager 事务状态管理器] TM[TransactionMetadata 事务元数据] TS[TransactionState 事务状态机] end subgraph "Producer端 Producer Side" P[TransactionalProducer 事务生产者] TID[TransactionalId 事务ID] PID[ProducerId 生产者ID] PE[ProducerEpoch 生产者纪元] end subgraph "Consumer端 Consumer Side" C[TransactionalConsumer 事务消费者] ISO[IsolationLevel 隔离级别] LSO[LastStableOffset 最后稳定偏移量] end subgraph "存储层 Storage Layer" TT[__transaction_state 事务状态主题] TL[Transaction Log 事务日志] CM[Control Messages 控制消息] end %% 事务流程 P --> TID P --> PID P --> PE P --> TC TC --> TSM TSM --> TM TM --> TS TC --> TT TT --> TL TL --> CM C --> ISO C --> LSO LSO --> CM end style TC fill:#e1f5fe style TSM fill:#e8f5e8 style TS fill:#f3e5f5 style CM fill:#fff3e0

3.2 事务状态机实现

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
/**
 * TransactionStateManager - 事务状态管理器
 * 管理事务的完整生命周期和状态转换
 */
public class TransactionStateManager {
    
    // 事务状态枚举
    public enum TransactionState {
        EMPTY,                    // 空状态,事务尚未开始
        ONGOING,                  // 进行中,事务已开始但未结束
        PREPARE_COMMIT,           // 准备提交,开始两阶段提交的第一阶段
        PREPARE_ABORT,            // 准备中止,开始两阶段中止的第一阶段
        COMPLETE_COMMIT,          // 完成提交,事务成功提交
        COMPLETE_ABORT,           // 完成中止,事务已中止
        DEAD                      // 死亡状态,事务超时或出错
    }
    
    // 事务元数据存储
    private final ConcurrentHashMap<String, TransactionMetadata> transactionMetadataMap;
    
    // 事务状态主题分区映射
    private final int transactionTopicPartitionCount;
    private final short transactionTopicReplicationFactor;
    
    /**
     * 处理InitProducerId请求
     * 为生产者分配或更新ProducerId和ProducerEpoch
     */
    public InitProducerIdResult handleInitProducerId(String transactionalId,
                                                    int transactionTimeoutMs,
                                                    Optional<ProducerIdAndEpoch> expectedProducerIdAndEpoch) {
        
        if (transactionalId == null || transactionalId.isEmpty()) {
            // 非事务生产者,直接分配新的ProducerId
            ProducerIdAndEpoch newProducerIdAndEpoch = producerIdManager.generateProducerId();
            
            return new InitProducerIdResult(
                newProducerIdAndEpoch.producerId,
                newProducerIdAndEpoch.epoch,
                Errors.NONE
            );
        }
        
        // 事务生产者处理
        return transactionMetadataMap.compute(transactionalId, (txnId, existingMetadata) -> {
            if (existingMetadata == null) {
                // 新的事务ID,创建元数据
                ProducerIdAndEpoch newProducerIdAndEpoch = producerIdManager.generateProducerId();
                
                TransactionMetadata newMetadata = new TransactionMetadata(
                    transactionalId = txnId,
                    producerId = newProducerIdAndEpoch.producerId,
                    producerEpoch = newProducerIdAndEpoch.epoch,
                    transactionTimeoutMs = transactionTimeoutMs,
                    state = TransactionState.EMPTY,
                    partitions = new HashSet<>(),
                    txnStartTimestamp = time.milliseconds(),
                    txnLastUpdateTimestamp = time.milliseconds()
                );
                
                // 持久化事务元数据
                persistTransactionMetadata(txnId, newMetadata);
                
                info("为新事务ID {} 分配ProducerId: {}, Epoch: {}", 
                    txnId, newProducerIdAndEpoch.producerId, newProducerIdAndEpoch.epoch);
                
                return new InitProducerIdResult(
                    newProducerIdAndEpoch.producerId,
                    newProducerIdAndEpoch.epoch,
                    Errors.NONE
                );
                
            } else {
                // 现有事务ID,检查状态
                if (existingMetadata.state == TransactionState.PREPARE_COMMIT ||
                    existingMetadata.state == TransactionState.PREPARE_ABORT) {
                    // 事务正在进行两阶段提交/中止,返回错误
                    return new InitProducerIdResult(
                        RecordBatch.NO_PRODUCER_ID,
                        RecordBatch.NO_PRODUCER_EPOCH,
                        Errors.CONCURRENT_TRANSACTIONS
                    );
                }
                
                // 检查Producer纪元
                if (expectedProducerIdAndEpoch.isPresent()) {
                    ProducerIdAndEpoch expected = expectedProducerIdAndEpoch.get();
                    if (expected.producerId != existingMetadata.producerId ||
                        expected.epoch != existingMetadata.producerEpoch) {
                        // Producer信息不匹配,可能是网络分区后的重试
                        return new InitProducerIdResult(
                            RecordBatch.NO_PRODUCER_ID,
                            RecordBatch.NO_PRODUCER_EPOCH,
                            Errors.PRODUCER_FENCED
                        );
                    }
                }
                
                // 更新或递增Producer纪元
                short newEpoch = (short) (existingMetadata.producerEpoch + 1);
                if (newEpoch > Short.MAX_VALUE) {
                    // 纪元溢出,分配新的ProducerId
                    ProducerIdAndEpoch newProducerIdAndEpoch = producerIdManager.generateProducerId();
                    newEpoch = newProducerIdAndEpoch.epoch;
                    existingMetadata.producerId = newProducerIdAndEpoch.producerId;
                }
                
                existingMetadata.producerEpoch = newEpoch;
                existingMetadata.transactionTimeoutMs = transactionTimeoutMs;
                existingMetadata.state = TransactionState.EMPTY;
                existingMetadata.partitions.clear();
                existingMetadata.txnStartTimestamp = time.milliseconds();
                existingMetadata.txnLastUpdateTimestamp = time.milliseconds();
                
                // 持久化更新的元数据
                persistTransactionMetadata(txnId, existingMetadata);
                
                info("更新事务ID {} 的ProducerEpoch: {} -> {}", 
                    txnId, existingMetadata.producerEpoch - 1, newEpoch);
                
                return new InitProducerIdResult(
                    existingMetadata.producerId,
                    newEpoch,
                    Errors.NONE
                );
            }
        });
    }
    
    /**
     * 处理AddPartitionsToTxn请求
     * 将分区添加到活跃事务中
     */
    public AddPartitionsToTxnResult handleAddPartitionsToTxn(String transactionalId,
                                                            long producerId,
                                                            short producerEpoch,
                                                            Set<TopicPartition> partitions) {
        
        TransactionMetadata metadata = transactionMetadataMap.get(transactionalId);
        if (metadata == null) {
            return new AddPartitionsToTxnResult(Errors.UNKNOWN_PRODUCER_ID);
        }
        
        // 验证Producer信息
        if (metadata.producerId != producerId || metadata.producerEpoch != producerEpoch) {
            return new AddPartitionsToTxnResult(Errors.PRODUCER_FENCED);
        }
        
        synchronized (metadata) {
            // 检查事务状态
            if (metadata.state != TransactionState.EMPTY && 
                metadata.state != TransactionState.ONGOING) {
                return new AddPartitionsToTxnResult(Errors.INVALID_TXN_STATE);
            }
            
            // 转换到ONGOING状态(如果还不是)
            if (metadata.state == TransactionState.EMPTY) {
                metadata.transitionTo(TransactionState.ONGOING);
                metadata.txnStartTimestamp = time.milliseconds();
            }
            
            // 添加分区到事务
            Set<TopicPartition> newPartitions = new HashSet<>(partitions);
            newPartitions.removeAll(metadata.partitions); // 移除已存在的分区
            
            if (!newPartitions.isEmpty()) {
                metadata.partitions.addAll(newPartitions);
                metadata.txnLastUpdateTimestamp = time.milliseconds();
                
                // 持久化事务元数据
                persistTransactionMetadata(transactionalId, metadata);
                
                info("向事务 {} 添加分区:{}", transactionalId, newPartitions);
            }
            
            return new AddPartitionsToTxnResult(Errors.NONE);
        }
    }
    
    /**
     * 处理EndTxn请求
     * 实现两阶段提交协议完成事务
     */
    public EndTxnResult handleEndTxn(String transactionalId,
                                   long producerId,
                                   short producerEpoch,
                                   TransactionResult result) {
        
        TransactionMetadata metadata = transactionMetadataMap.get(transactionalId);
        if (metadata == null) {
            return new EndTxnResult(Errors.UNKNOWN_PRODUCER_ID);
        }
        
        // 验证Producer信息
        if (metadata.producerId != producerId || metadata.producerEpoch != producerEpoch) {
            return new EndTxnResult(Errors.PRODUCER_FENCED);
        }
        
        synchronized (metadata) {
            // 检查事务状态
            if (metadata.state != TransactionState.ONGOING) {
                return new EndTxnResult(Errors.INVALID_TXN_STATE);
            }
            
            try {
                if (result == TransactionResult.COMMIT) {
                    // 开始两阶段提交
                    return handleCommitTransaction(metadata);
                } else {
                    // 开始两阶段中止
                    return handleAbortTransaction(metadata);
                }
            } catch (Exception e) {
                error("处理事务结束时发生异常:transactionalId={}", transactionalId, e);
                return new EndTxnResult(Errors.UNKNOWN_SERVER_ERROR);
            }
        }
    }
    
    /**
     * 处理事务提交
     * 实现两阶段提交协议的提交阶段
     */
    private EndTxnResult handleCommitTransaction(TransactionMetadata metadata) {
        String transactionalId = metadata.transactionalId;
        
        info("开始提交事务:{}, 涉及分区:{}", transactionalId, metadata.partitions);
        
        // 第一阶段:准备提交
        metadata.transitionTo(TransactionState.PREPARE_COMMIT);
        metadata.txnLastUpdateTimestamp = time.milliseconds();
        
        // 持久化准备提交状态
        persistTransactionMetadata(transactionalId, metadata);
        
        // 第二阶段:写入控制消息到各个分区
        List<CompletableFuture<Void>> writeFutures = new ArrayList<>();
        
        for (TopicPartition partition : metadata.partitions) {
            CompletableFuture<Void> future = writeTransactionControlMessage(
                partition, 
                metadata.producerId, 
                metadata.producerEpoch,
                ControlRecordType.COMMIT
            );
            writeFutures.add(future);
        }
        
        // 等待所有控制消息写入完成
        CompletableFuture<Void> allWrites = CompletableFuture.allOf(
            writeFutures.toArray(new CompletableFuture[0]));
        
        allWrites.whenComplete((result, exception) -> {
            synchronized (metadata) {
                if (exception == null) {
                    // 所有分区写入成功,完成提交
                    metadata.transitionTo(TransactionState.COMPLETE_COMMIT);
                    info("事务 {} 提交成功", transactionalId);
                } else {
                    // 写入失败,事务进入错误状态
                    error("事务 {} 提交失败", transactionalId, exception);
                    metadata.transitionTo(TransactionState.DEAD);
                }
                
                metadata.txnLastUpdateTimestamp = time.milliseconds();
                persistTransactionMetadata(transactionalId, metadata);
            }
        });
        
        return new EndTxnResult(Errors.NONE);
    }
    
    /**
     * 写入事务控制消息
     * 向指定分区写入COMMIT或ABORT控制消息
     */
    private CompletableFuture<Void> writeTransactionControlMessage(TopicPartition partition,
                                                                  long producerId,
                                                                  short producerEpoch,
                                                                  ControlRecordType controlType) {
        
        return CompletableFuture.runAsync(() -> {
            try {
                // 构建控制消息
                ByteBuffer controlMessageKey = ControlRecordType.serialize(controlType);
                ByteBuffer controlMessageValue = ByteBuffer.allocate(0); // 控制消息值为空
                
                MemoryRecords controlRecords = MemoryRecords.withControlRecords(
                    producerId,
                    producerEpoch,
                    controlMessageKey,
                    controlMessageValue
                );
                
                // 写入到分区
                replicaManager.appendRecords(
                    timeout = config.requestTimeoutMs(),
                    requiredAcks = -1, // 需要所有ISR确认
                    internalTopicsAllowed = true,
                    origin = AppendOrigin.Coordinator,
                    entriesPerPartition = Map.of(partition -> controlRecords),
                    responseCallback = (responses) -> {
                        PartitionResponse response = responses.get(partition);
                        if (response.error != Errors.NONE) {
                            throw new RuntimeException("写入控制消息失败: " + response.error);
                        }
                        
                        debug("成功写入 {} 控制消息到分区 {}", controlType, partition);
                    }
                );
                
            } catch (Exception e) {
                error("写入控制消息失败:partition={}, controlType={}", partition, controlType, e);
                throw new RuntimeException(e);
            }
        }, transactionExecutor);
    }
    
    /**
     * 事务超时处理
     * 定期检查并处理超时的事务
     */
    public void handleTransactionTimeout() {
        long currentTimeMs = time.milliseconds();
        
        List<String> expiredTransactions = new ArrayList<>();
        
        transactionMetadataMap.forEach((transactionalId, metadata) -> {
            synchronized (metadata) {
                long transactionDurationMs = currentTimeMs - metadata.txnStartTimestamp;
                
                if (transactionDurationMs > metadata.transactionTimeoutMs) {
                    expiredTransactions.add(transactionalId);
                    
                    warn("事务 {} 超时,持续时间: {} ms,超时阈值: {} ms", 
                        transactionalId, transactionDurationMs, metadata.transactionTimeoutMs);
                }
            }
        });
        
        // 中止过期的事务
        for (String transactionalId : expiredTransactions) {
            try {
                forceAbortTransaction(transactionalId, "事务超时");
            } catch (Exception e) {
                error("强制中止超时事务失败:{}", transactionalId, e);
            }
        }
        
        if (!expiredTransactions.isEmpty()) {
            info("处理了 {} 个超时事务", expiredTransactions.size());
        }
    }
    
    /**
     * 强制中止事务
     * 由于超时或其他异常情况强制中止事务
     */
    private void forceAbortTransaction(String transactionalId, String reason) {
        TransactionMetadata metadata = transactionMetadataMap.get(transactionalId);
        if (metadata == null) {
            return;
        }
        
        synchronized (metadata) {
            info("强制中止事务 {},原因: {}", transactionalId, reason);
            
            // 转换到准备中止状态
            metadata.transitionTo(TransactionState.PREPARE_ABORT);
            metadata.txnLastUpdateTimestamp = time.milliseconds();
            
            // 持久化状态变更
            persistTransactionMetadata(transactionalId, metadata);
            
            // 写入中止控制消息到所有分区
            List<CompletableFuture<Void>> abortFutures = new ArrayList<>();
            
            for (TopicPartition partition : metadata.partitions) {
                CompletableFuture<Void> future = writeTransactionControlMessage(
                    partition,
                    metadata.producerId,
                    metadata.producerEpoch,
                    ControlRecordType.ABORT
                );
                abortFutures.add(future);
            }
            
            // 等待所有中止消息写入完成
            CompletableFuture.allOf(abortFutures.toArray(new CompletableFuture[0]))
                .whenComplete((result, exception) -> {
                    synchronized (metadata) {
                        metadata.transitionTo(TransactionState.COMPLETE_ABORT);
                        metadata.txnLastUpdateTimestamp = time.milliseconds();
                        persistTransactionMetadata(transactionalId, metadata);
                        
                        info("事务 {} 强制中止完成", transactionalId);
                    }
                });
        }
    }
    
    /**
     * 持久化事务元数据
     * 将事务状态写入__transaction_state主题
     */
    private void persistTransactionMetadata(String transactionalId, TransactionMetadata metadata) {
        try {
            // 序列化事务元数据
            ByteBuffer key = TransactionMetadataKey.serialize(transactionalId);
            ByteBuffer value = TransactionMetadataValue.serialize(metadata);
            
            MemoryRecords record = MemoryRecords.withRecords(
                CompressionType.NONE,
                new SimpleRecord(key.array(), value.array())
            );
            
            // 计算事务状态主题的分区
            int partition = Utils.abs(transactionalId.hashCode()) % transactionTopicPartitionCount;
            TopicPartition txnTopicPartition = new TopicPartition(
                Topic.TRANSACTION_STATE_TOPIC_NAME, partition);
            
            // 写入事务状态主题
            replicaManager.appendRecords(
                timeout = config.requestTimeoutMs(),
                requiredAcks = -1, // 需要所有ISR确认
                internalTopicsAllowed = true,
                origin = AppendOrigin.Coordinator,
                entriesPerPartition = Map.of(txnTopicPartition -> record),
                responseCallback = (responses) -> {
                    PartitionResponse response = responses.get(txnTopicPartition);
                    if (response.error != Errors.NONE) {
                        throw new RuntimeException("持久化事务元数据失败: " + response.error);
                    }
                    
                    debug("成功持久化事务元数据:transactionalId={}", transactionalId);
                }
            );
            
        } catch (Exception e) {
            error("持久化事务元数据失败:transactionalId={}", transactionalId, e);
            throw new RuntimeException(e);
        }
    }
}

/**
 * TransactionMetadata - 事务元数据
 * 维护单个事务的完整状态信息
 */
public class TransactionMetadata {
    
    public final String transactionalId;           // 事务ID
    public volatile long producerId;               // 生产者ID
    public volatile short producerEpoch;           // 生产者纪元
    public volatile int transactionTimeoutMs;      // 事务超时时间
    public volatile TransactionState state;        // 事务状态
    public final Set<TopicPartition> partitions;   // 参与事务的分区
    public volatile long txnStartTimestamp;        // 事务开始时间
    public volatile long txnLastUpdateTimestamp;   // 最后更新时间
    
    // 状态转换锁
    private final ReentrantLock stateLock = new ReentrantLock();
    
    /**
     * 事务状态转换
     * 确保状态转换的合法性和一致性
     */
    public void transitionTo(TransactionState newState) {
        stateLock.lock();
        try {
            TransactionState currentState = this.state;
            
            // 验证状态转换的合法性
            if (!isValidStateTransition(currentState, newState)) {
                throw new IllegalStateException(
                    String.format("非法的事务状态转换:%s -> %s", currentState, newState));
            }
            
            debug("事务 {} 状态转换:{} -> {}", transactionalId, currentState, newState);
            
            this.state = newState;
            this.txnLastUpdateTimestamp = System.currentTimeMillis();
            
        } finally {
            stateLock.unlock();
        }
    }
    
    /**
     * 验证状态转换的合法性
     */
    private boolean isValidStateTransition(TransactionState from, TransactionState to) {
        switch (from) {
            case EMPTY:
                return to == TransactionState.ONGOING;
            case ONGOING:
                return to == TransactionState.PREPARE_COMMIT || 
                       to == TransactionState.PREPARE_ABORT ||
                       to == TransactionState.DEAD;
            case PREPARE_COMMIT:
                return to == TransactionState.COMPLETE_COMMIT ||
                       to == TransactionState.DEAD;
            case PREPARE_ABORT:
                return to == TransactionState.COMPLETE_ABORT ||
                       to == TransactionState.DEAD;
            case COMPLETE_COMMIT:
            case COMPLETE_ABORT:
                return to == TransactionState.EMPTY ||
                       to == TransactionState.DEAD;
            case DEAD:
                return to == TransactionState.EMPTY;
            default:
                return false;
        }
    }
    
    /**
     * 检查事务是否可以提交
     */
    public boolean canCommit() {
        stateLock.lock();
        try {
            return state == TransactionState.ONGOING && !partitions.isEmpty();
        } finally {
            stateLock.unlock();
        }
    }
    
    /**
     * 检查事务是否已超时
     */
    public boolean isExpired(long currentTimeMs) {
        return currentTimeMs - txnStartTimestamp > transactionTimeoutMs;
    }
}

4. 分区分配器深度实现

4.1 分区分配算法对比

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
/**
 * 分区分配器实现对比分析
 * 展示不同分配算法的特点和适用场景
 */
public class PartitionAssignors {
    
    /**
     * RangeAssignor - 范围分配器
     * 按主题分别进行范围分配,可能导致分配不均
     */
    public static class RangeAssignor implements ConsumerGroupPartitionAssignor {
        
        @Override
        public String name() {
            return "range";
        }
        
        @Override
        public Map<String, ConsumerGroupMemberAssignment> assign(
            ConsumerGroupPartitionAssignorContext context) {
            
            Map<String, List<String>> topicToMembers = new HashMap<>();
            Map<String, ConsumerGroupMemberAssignment> assignments = new HashMap<>();
            
            // 按主题分组成员
            context.memberSubscriptions().forEach((memberId, subscription) -> {
                subscription.topics().forEach(topic -> {
                    topicToMembers.computeIfAbsent(topic, k -> new ArrayList<>()).add(memberId);
                });
                
                // 初始化分配结果
                assignments.put(memberId, new ConsumerGroupMemberAssignment()
                    .setPartitions(new ArrayList<>()));
            });
            
            // 为每个主题执行范围分配
            topicToMembers.forEach((topic, members) -> {
                // 获取主题的分区信息
                TopicMetadata topicMetadata = context.topicMetadata().get(topic);
                if (topicMetadata == null) {
                    return;
                }
                
                int partitionCount = topicMetadata.numPartitions();
                int memberCount = members.size();
                
                // 排序成员以保证一致性
                List<String> sortedMembers = members.stream().sorted().collect(Collectors.toList());
                
                // 计算每个成员应分配的分区数
                int partitionsPerMember = partitionCount / memberCount;
                int extraPartitions = partitionCount % memberCount;
                
                debug("主题 {} 范围分配:{} 个分区分配给 {} 个成员", 
                     topic, partitionCount, memberCount);
                
                int currentPartition = 0;
                for (int i = 0; i < sortedMembers.size(); i++) {
                    String memberId = sortedMembers.get(i);
                    
                    // 前几个成员多分配一个分区(如果有余数)
                    int numPartitionsForMember = partitionsPerMember + (i < extraPartitions ? 1 : 0);
                    
                    // 分配连续的分区范围
                    for (int j = 0; j < numPartitionsForMember; j++) {
                        assignments.get(memberId).partitions().add(
                            new ConsumerGroupMemberAssignment.TopicPartition()
                                .setTopicId(topicMetadata.topicId())
                                .setPartitions(Collections.singletonList(currentPartition))
                        );
                        currentPartition++;
                    }
                    
                    debug("成员 {} 分配到主题 {} 的分区 {} - {}", 
                         memberId, topic, 
                         currentPartition - numPartitionsForMember, 
                         currentPartition - 1);
                }
            });
            
            return assignments;
        }
    }
    
    /**
     * StickyAssignor - 粘性分配器
     * 在再平衡时尽量保持现有分配,减少数据迁移
     */
    public static class StickyAssignor implements ConsumerGroupPartitionAssignor {
        
        @Override
        public String name() {
            return "sticky";
        }
        
        @Override
        public Map<String, ConsumerGroupMemberAssignment> assign(
            ConsumerGroupPartitionAssignorContext context) {
            
            Map<String, List<TopicPartition>> currentAssignments = getCurrentAssignments(context);
            Map<String, Set<String>> memberSubscriptions = getMemberSubscriptions(context);
            Set<TopicPartition> allPartitions = getAllPartitions(context);
            
            debug("粘性分配开始:当前分配={}, 成员订阅={}, 总分区={}", 
                 currentAssignments.size(), memberSubscriptions.size(), allPartitions.size());
            
            // 第一步:保留有效的现有分配
            Map<String, List<TopicPartition>> newAssignments = new HashMap<>();
            Set<TopicPartition> unassignedPartitions = new HashSet<>(allPartitions);
            
            currentAssignments.forEach((memberId, partitions) -> {
                if (memberSubscriptions.containsKey(memberId)) {
                    List<TopicPartition> validPartitions = partitions.stream()
                        .filter(tp -> memberSubscriptions.get(memberId).contains(tp.topic()))
                        .collect(Collectors.toList());
                    
                    newAssignments.put(memberId, validPartitions);
                    unassignedPartitions.removeAll(validPartitions);
                    
                    debug("成员 {} 保留 {} 个现有分区分配", memberId, validPartitions.size());
                }
            });
            
            // 第二步:分配未分配的分区
            List<String> sortedMembers = memberSubscriptions.keySet().stream()
                .sorted().collect(Collectors.toList());
            
            for (TopicPartition partition : unassignedPartitions) {
                // 找到订阅了该主题且分区数最少的成员
                String targetMember = sortedMembers.stream()
                    .filter(memberId -> memberSubscriptions.get(memberId).contains(partition.topic()))
                    .min(Comparator.comparingInt(memberId -> 
                        newAssignments.getOrDefault(memberId, Collections.emptyList()).size()))
                    .orElse(null);
                
                if (targetMember != null) {
                    newAssignments.computeIfAbsent(targetMember, k -> new ArrayList<>())
                        .add(partition);
                    
                    debug("将未分配分区 {} 分配给成员 {}", partition, targetMember);
                }
            }
            
            // 第三步:平衡分配(如果需要)
            balanceAssignments(newAssignments, memberSubscriptions, context);
            
            // 转换为协调器期望的格式
            return convertToMemberAssignments(newAssignments, context);
        }
        
        /**
         * 平衡分区分配
         * 确保分配尽可能均匀
         */
        private void balanceAssignments(Map<String, List<TopicPartition>> assignments,
                                      Map<String, Set<String>> subscriptions,
                                      ConsumerGroupPartitionAssignorContext context) {
            
            boolean changed = true;
            int iterations = 0;
            int maxIterations = assignments.size() * 10; // 防止无限循环
            
            while (changed && iterations < maxIterations) {
                changed = false;
                iterations++;
                
                // 计算理想的平均分配数
                int totalPartitions = assignments.values().stream()
                    .mapToInt(List::size).sum();
                double avgPartitionsPerMember = (double) totalPartitions / assignments.size();
                
                // 找出分配过多和过少的成员
                List<String> overloadedMembers = new ArrayList<>();
                List<String> underloadedMembers = new ArrayList<>();
                
                assignments.forEach((memberId, partitions) -> {
                    int memberPartitionCount = partitions.size();
                    if (memberPartitionCount > Math.ceil(avgPartitionsPerMember)) {
                        overloadedMembers.add(memberId);
                    } else if (memberPartitionCount < Math.floor(avgPartitionsPerMember)) {
                        underloadedMembers.add(memberId);
                    }
                });
                
                // 从过载成员向不足成员转移分区
                for (String overloadedMember : overloadedMembers) {
                    if (underloadedMembers.isEmpty()) {
                        break;
                    }
                    
                    List<TopicPartition> partitions = assignments.get(overloadedMember);
                    if (partitions.size() <= Math.ceil(avgPartitionsPerMember)) {
                        continue;
                    }
                    
                    // 选择一个可转移的分区
                    TopicPartition partitionToTransfer = partitions.get(partitions.size() - 1);
                    
                    // 找到可以接收该分区的不足成员
                    String targetMember = underloadedMembers.stream()
                        .filter(memberId -> subscriptions.get(memberId).contains(partitionToTransfer.topic()))
                        .findFirst()
                        .orElse(null);
                    
                    if (targetMember != null) {
                        // 执行转移
                        partitions.remove(partitionToTransfer);
                        assignments.get(targetMember).add(partitionToTransfer);
                        
                        changed = true;
                        
                        debug("将分区 {} 从成员 {} 转移到成员 {}", 
                             partitionToTransfer, overloadedMember, targetMember);
                        
                        // 检查目标成员是否仍需要更多分区
                        if (assignments.get(targetMember).size() >= Math.floor(avgPartitionsPerMember)) {
                            underloadedMembers.remove(targetMember);
                        }
                    }
                }
            }
            
            debug("分区平衡完成,执行了 {} 次迭代", iterations);
        }
    }
    
    /**
     * CooperativeStickyAssignor - 协作粘性分配器
     * 支持增量再平衡,减少分区迁移的影响
     */
    public static class CooperativeStickyAssignor extends StickyAssignor {
        
        @Override
        public String name() {
            return "cooperative-sticky";
        }
        
        /**
         * 执行协作再平衡
         * 分多轮进行,每轮只迁移部分分区
         */
        @Override
        public Map<String, ConsumerGroupMemberAssignment> assign(
            ConsumerGroupPartitionAssignorContext context) {
            
            Map<String, List<TopicPartition>> currentAssignments = getCurrentAssignments(context);
            Map<String, List<TopicPartition>> idealAssignments = super.assign(context);
            
            // 计算需要迁移的分区
            Map<String, Set<TopicPartition>> partitionsToRevoke = new HashMap<>();
            Map<String, Set<TopicPartition>> partitionsToAssign = new HashMap<>();
            
            // 分析分配变化
            currentAssignments.forEach((memberId, currentPartitions) -> {
                List<TopicPartition> idealPartitions = idealAssignments.getOrDefault(memberId, Collections.emptyList());
                
                // 需要撤销的分区
                Set<TopicPartition> toRevoke = new HashSet<>(currentPartitions);
                toRevoke.removeAll(idealPartitions);
                if (!toRevoke.isEmpty()) {
                    partitionsToRevoke.put(memberId, toRevoke);
                }
                
                // 需要新分配的分区
                Set<TopicPartition> toAssign = new HashSet<>(idealPartitions);
                toAssign.removeAll(currentPartitions);
                if (!toAssign.isEmpty()) {
                    partitionsToAssign.put(memberId, toAssign);
                }
            });
            
            // 如果变化较大,分批次进行
            int totalChanges = partitionsToRevoke.values().stream().mapToInt(Set::size).sum() +
                              partitionsToAssign.values().stream().mapToInt(Set::size).sum();
            
            if (totalChanges > context.memberSubscriptions().size()) {
                // 变化较大,采用渐进式再平衡
                return performIncrementalRebalance(currentAssignments, idealAssignments, context);
            } else {
                // 变化较小,直接应用新分配
                return convertToMemberAssignments(idealAssignments, context);
            }
        }
        
        /**
         * 执行渐进式再平衡
         * 将大的分配变更分解为多个小步骤
         */
        private Map<String, ConsumerGroupMemberAssignment> performIncrementalRebalance(
            Map<String, List<TopicPartition>> current,
            Map<String, List<TopicPartition>> target,
            ConsumerGroupPartitionAssignorContext context) {
            
            Map<String, List<TopicPartition>> incrementalAssignments = new HashMap<>(current);
            
            // 计算本轮可以迁移的分区数(限制为总变更的1/3)
            int maxChangesThisRound = Math.max(1, 
                (int) (context.memberSubscriptions().size() * 0.33));
            
            int changesApplied = 0;
            
            // 优先处理分区撤销(为新分配腾出空间)
            for (Map.Entry<String, List<TopicPartition>> entry : target.entrySet()) {
                String memberId = entry.getKey();
                List<TopicPartition> targetPartitions = entry.getValue();
                List<TopicPartition> currentPartitions = incrementalAssignments.getOrDefault(
                    memberId, Collections.emptyList());
                
                // 计算需要撤销的分区
                List<TopicPartition> toRevoke = currentPartitions.stream()
                    .filter(tp -> !targetPartitions.contains(tp))
                    .limit(maxChangesThisRound - changesApplied)
                    .collect(Collectors.toList());
                
                if (!toRevoke.isEmpty()) {
                    List<TopicPartition> newAssignment = new ArrayList<>(currentPartitions);
                    newAssignment.removeAll(toRevoke);
                    incrementalAssignments.put(memberId, newAssignment);
                    
                    changesApplied += toRevoke.size();
                    
                    info("渐进式再平衡:从成员 {} 撤销分区 {}", memberId, toRevoke);
                    
                    if (changesApplied >= maxChangesThisRound) {
                        break;
                    }
                }
            }
            
            info("渐进式再平衡本轮完成,应用了 {} 个变更", changesApplied);
            
            return convertToMemberAssignments(incrementalAssignments, context);
        }
    }
}

5. 监控与故障恢复

5.1 协调器监控指标

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
/**
 * GroupCoordinatorMetrics - 组协调器监控指标
 * 提供详细的组管理和再平衡性能监控
 */
public class GroupCoordinatorMetrics extends KafkaMetricsGroup {
    
    // 组管理指标
    private final Sensor groupCreationRate;           // 组创建速率
    private final Sensor groupDeletionRate;           // 组删除速率
    private final Sensor activeGroupCount;            // 活跃组数量
    private final Sensor memberJoinRate;              // 成员加入速率
    private final Sensor memberLeaveRate;             // 成员离开速率
    
    // 再平衡指标
    private final Sensor rebalanceRate;               // 再平衡速率
    private final Histogram rebalanceDuration;        // 再平衡持续时间
    private final Sensor rebalanceFailureRate;        // 再平衡失败率
    
    // 心跳指标
    private final Sensor heartbeatRate;               // 心跳速率
    private final Histogram heartbeatLatency;         // 心跳延迟
    
    // 分区分配指标
    private final Histogram assignmentComputeTime;    // 分配计算时间
    private final Sensor assignmentChangeRate;        // 分配变更率

    public GroupCoordinatorMetrics(Metrics metrics) {
        String groupName = "group-coordinator-metrics";
        
        // 初始化组管理指标
        MetricName creationRateMetricName = metrics.metricName("group-creation-rate", groupName, 
            "每秒创建的消费者组数");
        groupCreationRate = metrics.sensor("group-creation-rate");
        groupCreationRate.add(creationRateMetricName, new Rate());
        
        MetricName activeGroupCountMetricName = metrics.metricName("active-group-count", groupName, 
            "当前活跃的消费者组数量");
        activeGroupCount = metrics.sensor("active-group-count");
        activeGroupCount.add(activeGroupCountMetricName, new Value());
        
        // 初始化再平衡指标
        MetricName rebalanceRateMetricName = metrics.metricName("rebalance-rate", groupName, 
            "每秒再平衡次数");
        rebalanceRate = metrics.sensor("rebalance-rate");
        rebalanceRate.add(rebalanceRateMetricName, new Rate());
        
        MetricName rebalanceDurationMetricName = metrics.metricName("rebalance-duration-avg", groupName, 
            "再平衡平均持续时间(毫秒)");
        rebalanceDuration = metrics.histogram("rebalance-duration");
        rebalanceDuration.add(rebalanceDurationMetricName, new Avg());
    }
    
    /**
     * 记录再平衡事件
     */
    public void recordRebalance(long durationMs, boolean success) {
        rebalanceRate.record();
        rebalanceDuration.record(durationMs);
        
        if (!success) {
            rebalanceFailureRate.record();
        }
    }
    
    /**
     * 记录组创建事件
     */
    public void recordGroupCreation() {
        groupCreationRate.record();
    }
    
    /**
     * 更新活跃组数量
     */
    public void updateActiveGroupCount(int count) {
        activeGroupCount.record(count);
    }
}

/**
 * 协调器故障恢复机制
 */
public class CoordinatorFailureRecovery {
    
    /**
     * 协调器故障转移处理
     * 当协调器所在Broker失效时的处理逻辑
     */
    public static class CoordinatorFailover {
        
        /**
         * 处理协调器迁移
         * 将组协调器从失效的Broker迁移到新的Broker
         */
        public void handleCoordinatorMigration(List<String> affectedGroups, 
                                             int newCoordinatorBrokerId) {
            
            info("开始协调器迁移,受影响的组数量: {}, 新协调器: {}", 
                affectedGroups.size(), newCoordinatorBrokerId);
            
            for (String groupId : affectedGroups) {
                try {
                    // 从__consumer_offsets主题恢复组状态
                    ConsumerGroupMetadata groupMetadata = loadGroupMetadataFromLog(groupId);
                    
                    if (groupMetadata != null) {
                        // 重建组状态
                        rebuildGroupState(groupId, groupMetadata, newCoordinatorBrokerId);
                        
                        // 通知所有成员重新发现协调器
                        notifyMembersOfCoordinatorChange(groupId, newCoordinatorBrokerId);
                        
                        debug("成功迁移组 {} 到新协调器 {}", groupId, newCoordinatorBrokerId);
                    }
                    
                } catch (Exception e) {
                    error("迁移组 {} 失败", groupId, e);
                }
            }
            
            info("协调器迁移完成");
        }
        
        /**
         * 从日志恢复组元数据
         */
        private ConsumerGroupMetadata loadGroupMetadataFromLog(String groupId) {
            try {
                // 计算组对应的__consumer_offsets分区
                int partition = Utils.abs(groupId.hashCode()) % config.offsetsTopicNumPartitions();
                TopicPartition offsetsPartition = new TopicPartition(
                    Topic.GROUP_METADATA_TOPIC_NAME, partition);
                
                // 读取分区中的所有记录
                UnifiedLog offsetsLog = logManager.getLog(offsetsPartition);
                if (offsetsLog == null) {
                    warn("__consumer_offsets分区 {} 不存在", partition);
                    return null;
                }
                
                // 扫描日志找到组相关的记录
                ConsumerGroupMetadata.Builder metadataBuilder = new ConsumerGroupMetadata.Builder()
                    .setGroupId(groupId);
                
                long startOffset = offsetsLog.logStartOffset();
                long endOffset = offsetsLog.logEndOffset();
                
                for (long offset = startOffset; offset < endOffset; offset += 1000) {
                    FetchDataInfo fetchInfo = offsetsLog.read(offset, 64 * 1024, 
                        FetchIsolation.FETCH_LOG_END, false);
                    
                    for (RecordBatch batch : fetchInfo.records.batches()) {
                        for (Record record : batch) {
                            // 解析组元数据记录
                            if (isGroupMetadataRecord(record, groupId)) {
                                updateGroupMetadataFromRecord(metadataBuilder, record);
                            }
                        }
                    }
                }
                
                return metadataBuilder.build();
                
            } catch (Exception e) {
                error("从日志恢复组 {} 的元数据失败", groupId, e);
                return null;
            }
        }
        
        /**
         * 重建组状态
         * 在新的协调器上重建组的完整状态
         */
        private void rebuildGroupState(String groupId, 
                                     ConsumerGroupMetadata metadata, 
                                     int coordinatorBrokerId) {
            
            // 创建新的组对象
            ConsumerGroup rebuiltGroup = new ConsumerGroup(
                groupId = groupId,
                state = metadata.state(),
                members = metadata.members(),
                assignmentStrategy = metadata.assignmentStrategy(),
                generation = metadata.generation()
            );
            
            // 注册到新的协调器
            GroupCoordinatorShard coordinatorShard = getCoordinatorShard(coordinatorBrokerId);
            coordinatorShard.addGroup(rebuiltGroup);
            
            // 如果组处于再平衡状态,继续再平衡流程
            if (rebuiltGroup.state() == ConsumerGroupState.PREPARING_REBALANCE) {
                scheduleRebalanceCompletion(rebuiltGroup);
            }
            
            info("在协调器 {} 上重建组 {} 状态完成", coordinatorBrokerId, groupId);
        }
    }
}

6. 总结

Kafka协调器模块通过精心设计的分布式协调机制,实现了可靠的组管理和事务支持:

6.1 核心设计优势

  • 分片化管理:通过分片将协调器负载分散到多个Broker
  • 状态机设计:清晰的状态转换保证操作的一致性和可预测性
  • 持久化存储:使用内部主题持久化协调器状态,支持故障恢复
  • 增量再平衡:协作式分配器减少再平衡对消费的影响

6.2 事务协调亮点

  • 两阶段提交:确保跨分区事务的ACID特性
  • Producer隔离:通过ProducerId和Epoch实现Producer隔离
  • 超时管理:自动检测和处理超时事务
  • 控制消息:使用特殊控制消息标记事务边界

6.3 性能优化特性

  • 批量操作:减少协调器操作的网络开销
  • 智能分配:多种分配算法适应不同场景需求
  • 故障快速恢复:协调器迁移机制保证高可用性

通过深入理解协调器的实现原理,我们能够更好地设计分布式应用,合理配置消费者组,充分利用Kafka的协调功能。


本文档深入分析了Kafka协调器模块的核心实现机制,为分布式协调和事务处理提供了详实的技术指导。