/**
 * RecordAccumulator - the record accumulator.
 * Batches records and manages producer buffer memory to make sends more efficient.
 */
public class RecordAccumulator {

    private final LogContext logContext;
    private final Logger log;
    private final Time time;
    private volatile boolean closed;                      // set once the producer is closed

    // Batching configuration
    private final int batchSize;                          // target batch size in bytes
    private final Compression compression;                // compression codec
    private final int lingerMs;                           // how long to wait for more records before sending
    private final ExponentialBackoff retryBackoff;        // retry backoff policy

    // Memory management
    private final BufferPool free;                        // buffer pool

    // Batch management
    private final ConcurrentMap<String, TopicInfo> topicInfoMap = new CopyOnWriteMap<>();
    private final IncompleteBatches incomplete;           // tracks batches not yet acknowledged
    private final AtomicInteger appendsInProgress = new AtomicInteger(0); // in-flight append() calls
    private final TransactionManager transactionManager;  // null when transactions are not in use
    /**
     * Append a record to the accumulator.
     * This is the core of the batching logic.
     */
    public RecordAppendResult append(String topic,
                                     int partition,
                                     long timestamp,
                                     byte[] key,
                                     byte[] value,
                                     Header[] headers,
                                     AppendCallbacks callbacks,
                                     long maxTimeToBlock,
                                     long nowMs,
                                     Cluster cluster) throws InterruptedException {
        // 1. Look up or create the per-topic info
        TopicInfo topicInfo = topicInfoMap.computeIfAbsent(topic,
            k -> new TopicInfo(createBuiltInPartitioner(logContext, k, batchSize)));

        // 2. Track the in-progress append so a concurrent close/abort can wait for it
        appendsInProgress.incrementAndGet();
        ByteBuffer buffer = null;
        if (headers == null) headers = Record.EMPTY_HEADERS;
        try {
            // 3. Partition selection; loop so we can retry after allocating a buffer
            while (true) {
                // For UNKNOWN_PARTITION, let the built-in sticky partitioner pick the partition;
                // otherwise honor the caller-supplied partition.
                final BuiltInPartitioner.StickyPartitionInfo partitionInfo;
                final int effectivePartition;
                if (partition == RecordMetadata.UNKNOWN_PARTITION) {
                    partitionInfo = topicInfo.builtInPartitioner.peekCurrentPartitionInfo(cluster);
                    effectivePartition = partitionInfo.partition();
                } else {
                    partitionInfo = null;
                    effectivePartition = partition;
                }

                // 4. Get (or create) the batch deque for that partition
                Deque<ProducerBatch> dq = topicInfo.batches.computeIfAbsent(
                    effectivePartition, k -> new ArrayDeque<>());

                synchronized (dq) {
                    // 5. Try to append to the last existing batch
                    RecordAppendResult appendResult = tryAppend(timestamp, key, value,
                        headers, callbacks, dq, nowMs);
                    if (appendResult != null) {
                        // Appended to an existing batch; possibly switch the sticky partition
                        boolean enableSwitch = allBatchesFull(dq);
                        topicInfo.builtInPartitioner.updatePartitionInfo(
                            partitionInfo, appendResult.appendedBytes, cluster, enableSwitch);
                        return appendResult;
                    }
                }
                // 6. No existing batch could hold the record: allocate a buffer for a new batch
                if (buffer == null) {
                    int size = Math.max(this.batchSize,
                        AbstractRecords.estimateSizeInBytesUpperBound(
                            RecordBatch.CURRENT_MAGIC_VALUE, compression.type(),
                            key, value, headers));
                    log.trace("For topic {} partition {}, allocating a new {} byte message buffer, remaining timeout {} ms",
                        topic, effectivePartition, size, maxTimeToBlock);
                    // This call may block if the buffer pool is short on memory
                    buffer = free.allocate(size, maxTimeToBlock);
                    nowMs = time.milliseconds();
                }

                synchronized (dq) {
                    // 7. Create a new batch (another thread may have created one while we allocated)
                    RecordAppendResult appendResult = appendNewBatch(topic, effectivePartition,
                        dq, timestamp, key, value, headers, callbacks, buffer, nowMs);
                    // If a new batch was created it now owns the buffer; null it out so it is not released below
                    if (appendResult.newBatchCreated) {
                        buffer = null;
                    }
                    boolean enableSwitch = allBatchesFull(dq);
                    topicInfo.builtInPartitioner.updatePartitionInfo(
                        partitionInfo, appendResult.appendedBytes, cluster, enableSwitch);
                    return appendResult;
                }
            }
        } finally {
            // 8. Clean up: release the buffer if it was never handed to a batch, stop tracking the append
            free.deallocate(buffer);
            appendsInProgress.decrementAndGet();
        }
    }
    /**
     * Try to append the record to the last existing batch in the deque.
     */
    private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value,
                                         Header[] headers, AppendCallbacks callbacks,
                                         Deque<ProducerBatch> deque, long nowMs) {
        if (closed) {
            throw new KafkaException("Producer closed while send in progress");
        }
        ProducerBatch last = deque.peekLast();
        if (last != null) {
            int initialBytes = last.estimatedSizeInBytes();
            FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers,
                callbacks, nowMs);
            if (future == null) {
                // The batch is full; close it so it can be sent
                last.closeForRecordAppends();
            } else {
                int appendedBytes = last.estimatedSizeInBytes() - initialBytes;
                return new RecordAppendResult(future, deque.size() > 1 || last.isFull(),
                    false, appendedBytes);
            }
        }
        return null;
    }
    /**
     * Create a new producer batch and append the record to it.
     */
    private RecordAppendResult appendNewBatch(String topic, int partition,
                                              Deque<ProducerBatch> dq,
                                              long timestamp, byte[] key, byte[] value,
                                              Header[] headers, AppendCallbacks callbacks,
                                              ByteBuffer buffer, long nowMs) {
        assert partition != RecordMetadata.UNKNOWN_PARTITION;

        // Retry the existing batch first: another thread may have created one while we waited for the lock
        RecordAppendResult appendResult = tryAppend(timestamp, key, value,
            headers, callbacks, dq, nowMs);
        if (appendResult != null) {
            return appendResult;
        }

        // Create the new batch
        MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer);
        ProducerBatch batch = new ProducerBatch(new TopicPartition(topic, partition),
            recordsBuilder, nowMs);
        FutureRecordMetadata future = Objects.requireNonNull(
            batch.tryAppend(timestamp, key, value, headers, callbacks, nowMs));

        // Register the batch in the deque and in the incomplete-batch tracker
        dq.addLast(batch);
        incomplete.add(batch);

        return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(),
            true, batch.estimatedSizeInBytes());
    }
    /**
     * Determine which nodes have partitions with batches ready to send.
     * Called by the Sender thread before draining batches for network transmission.
     */
    public ReadyCheckResult ready(Cluster cluster, long nowMs) {
        Set<Node> readyNodes = new HashSet<>();
        long nextReadyCheckDelayMs = Long.MAX_VALUE;
        Set<String> unknownLeaderTopics = new HashSet<>();
        // If threads are blocked waiting on the buffer pool, send as soon as possible to free memory
        boolean exhausted = this.free.queued() > 0;

        for (Map.Entry<String, TopicInfo> topicInfoEntry : this.topicInfoMap.entrySet()) {
            String topic = topicInfoEntry.getKey();
            TopicInfo topicInfo = topicInfoEntry.getValue();

            for (Map.Entry<Integer, Deque<ProducerBatch>> entry : topicInfo.batches.entrySet()) {
                int partition = entry.getKey();
                Deque<ProducerBatch> deque = entry.getValue();
                Node leader = cluster.leaderFor(new TopicPartition(topic, partition));

                synchronized (deque) {
                    if (leader == null) {
                        // Leader unknown: request a metadata refresh for this topic
                        unknownLeaderTopics.add(topic);
                    } else if (!readyNodes.contains(leader) && !isMuted(leader)) {
                        ProducerBatch batch = deque.peekFirst();
                        if (batch != null) {
                            long timeSinceCreationMs = max(0, nowMs - batch.createdMs);
                            boolean full = deque.size() > 1 || batch.isFull();
                            boolean expired = timeSinceCreationMs >= lingerMs;
                            boolean transactionCompleting = transactionManager != null
                                && transactionManager.isCompleting();
                            boolean sendable = full
                                || expired
                                || exhausted
                                || closed
                                || flushInProgress()
                                || transactionCompleting;
                            // Simplified: the real accumulator checks retry backoff for retried
                            // batches rather than skipping them outright
                            if (sendable && !batch.inRetry()) {
                                readyNodes.add(leader);
                            } else {
                                // Not ready yet: report how long until linger.ms expires
                                long timeLeftMs = max(lingerMs - timeSinceCreationMs, 0);
                                nextReadyCheckDelayMs = min(nextReadyCheckDelayMs, timeLeftMs);
                            }
                        }
                    }
                }
            }
        }
        return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);
    }
}
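To see how the two halves of the accumulator work together, the sketch below shows the hand-off between them: the application thread pushes records in through append(), while a Sender-style loop asks ready() which nodes have batches worth transmitting. This is a minimal illustration written against the simplified listing above, not production code: the class and method names (AccumulatorDriverSketch, driveAccumulator), the topic name, and the timeout value are invented for the example, the callbacks argument is left null, the imports assume the usual Kafka client packages, and the real Sender's drain() and network I/O are omitted; real Kafka versions also differ slightly in the exact append() signature.

import java.nio.charset.StandardCharsets;

import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.internals.RecordAccumulator;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.utils.Time;

// Illustrative sketch only - shows the append()/ready() hand-off from the listing above.
public class AccumulatorDriverSketch {

    static void driveAccumulator(RecordAccumulator accumulator,
                                 Cluster cluster,
                                 Time time) throws InterruptedException {
        // Producer side: append one record. Passing UNKNOWN_PARTITION lets the
        // built-in sticky partitioner choose the partition.
        RecordAccumulator.RecordAppendResult result = accumulator.append(
            "demo-topic",                                  // hypothetical topic name
            RecordMetadata.UNKNOWN_PARTITION,              // let the partitioner pick
            time.milliseconds(),                           // record timestamp
            "key".getBytes(StandardCharsets.UTF_8),
            "value".getBytes(StandardCharsets.UTF_8),
            Record.EMPTY_HEADERS,
            null,                                          // no append callbacks in this sketch
            30_000L,                                       // max time to block on buffer allocation
            time.milliseconds(),
            cluster);
        System.out.println("Appended " + result.appendedBytes
            + " bytes; new batch created: " + result.newBatchCreated);

        // Sender side: ask which nodes currently have sendable batches (full, past
        // linger.ms, or forced out by flush/close/memory pressure).
        RecordAccumulator.ReadyCheckResult ready = accumulator.ready(cluster, time.milliseconds());
        for (Node node : ready.readyNodes) {
            // A real Sender would now drain batches for this node and build produce requests.
            System.out.println("Node ready for send: " + node);
        }
        // If nothing is ready yet, ready.nextReadyCheckDelayMs says how long to wait
        // before polling ready() again.
    }
}

The design point the sketch highlights is that append() and ready() are the only contact surface between the user-facing producer thread and the background Sender thread; all shared state behind them is guarded either by the per-partition deque locks or by concurrent data structures such as topicInfoMap.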