/**
 * LogCleaner - the log cleaner
 * Implements time- and size-based log retention (delete) as well as key-based log compaction.
 */
public class LogCleaner {
private final CleanerConfig config; // cleaner configuration
private final LogDirFailureChannel logDirFailureChannel; // log-directory failure channel
private final LogManager logManager; // log manager, used to enumerate all logs
private final Time time; // clock abstraction, used for retention checks
private final Map<TopicPartition, LogToClean> logsToClean; // logs queued for cleaning
// cleaner thread pool
private final List<CleanerThread> cleaners; // cleaner threads
private final Scheduler scheduler; // scheduler
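// Note: the CleanerConfig members used throughout this listing (numThreads, backoffMs,
// minCleanableRatio, compression()) are not defined in the original. The nested class below is a
// minimal sketch of what this simplified cleaner assumes, not Kafka's actual CleanerConfig.
static class CleanerConfig {
final int numThreads;            // number of cleaner threads to start
final long backoffMs;            // pause between cleanup checks and idle backoff (ms)
final double minCleanableRatio;  // minimum dirty ratio before compaction is attempted
final CompressionType compressionType; // compression used when rewriting compacted segments
CleanerConfig(int numThreads, long backoffMs, double minCleanableRatio, CompressionType compressionType) {
this.numThreads = numThreads;
this.backoffMs = backoffMs;
this.minCleanableRatio = minCleanableRatio;
this.compressionType = compressionType;
}
CompressionType compression() {
return compressionType;
}
}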
/**
 * Starts the log cleaner.
 * Creates the cleaner threads and schedules the periodic cleanup check.
 */
public void startup() {
info("Starting the log cleaner, number of cleaner threads: {}", config.numThreads);
// start the cleaner threads
for (int i = 0; i < config.numThreads; i++) {
CleanerThread cleaner = new CleanerThread(i);
cleaners.add(cleaner);
cleaner.start();
}
// periodically check for logs that need cleaning
scheduler.schedule("log-cleanup-scheduler", () -> {
checkAndScheduleCleanup();
}, config.backoffMs, config.backoffMs);
info("Log cleaner started");
}
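// The listing starts cleaner threads but never stops them, and CleanerThread.shouldStop is never
// set anywhere. A minimal shutdown counterpart - an assumed addition, relying on the fact that the
// enclosing class may access the inner class's private flag - might look like this:
public void shutdown() {
info("Shutting down the log cleaner");
for (CleanerThread cleaner : cleaners) {
cleaner.shouldStop = true; // ask the thread to exit its loop
cleaner.interrupt();       // wake it up if it is sleeping in the backoff
}
for (CleanerThread cleaner : cleaners) {
try {
cleaner.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
info("Log cleaner shut down");
}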
/**
 * Checks for and schedules cleanup work.
 * Identifies logs that need cleaning and dispatches them according to their cleanup policy.
 */
private void checkAndScheduleCleanup() {
try {
// fetch all logs that may need cleaning
Map<TopicPartition, UnifiedLog> logsToCheck = logManager.allLogs();
for (Map.Entry<TopicPartition, UnifiedLog> entry : logsToCheck.entrySet()) {
TopicPartition tp = entry.getKey();
UnifiedLog log = entry.getValue();
// dispatch based on the configured cleanup policy
String cleanupPolicy = log.config().cleanupPolicy();
if ("delete".equals(cleanupPolicy)) {
// time- and size-based deletion
scheduleDeleteCleanup(tp, log);
} else if ("compact".equals(cleanupPolicy)) {
// key-based compaction
scheduleCompactCleanup(tp, log);
} else if ("compact,delete".equals(cleanupPolicy)) {
// compact first, then delete (sketched further below)
scheduleCompactAndDeleteCleanup(tp, log);
}
}
} catch (Exception e) {
error("Exception while checking for cleanup work", e);
}
}
/**
 * Schedules deletion-based cleanup for a log.
 * Deletes expired log segments based on the retention.ms and retention.bytes configuration.
 */
private void scheduleDeleteCleanup(TopicPartition tp, UnifiedLog log) {
long currentTimeMs = time.milliseconds();
// time-based retention
long retentionMs = log.config().retentionMs();
if (retentionMs > 0) {
List<LogSegment> deletableSegments = log.deletableSegments(
() -> currentTimeMs - retentionMs);
for (LogSegment segment : deletableSegments) {
if (segment != log.activeSegment()) {
info("Marking expired log segment {} for deletion (last modified: {})",
segment.baseOffset(),
new Date(segment.lastModified()));
log.deleteSegment(segment);
}
}
}
// size-based retention
long retentionBytes = log.config().retentionBytes();
if (retentionBytes > 0) {
long currentLogSize = log.size();
if (currentLogSize > retentionBytes) {
long bytesToDelete = currentLogSize - retentionBytes;
List<LogSegment> segmentsToDelete = log.candidateSegmentsForDeletion(bytesToDelete);
for (LogSegment segment : segmentsToDelete) {
info("Deleting log segment {} to enforce the size limit (size: {} bytes)",
segment.baseOffset(), segment.size());
log.deleteSegment(segment);
}
}
}
}
/**
 * Schedules compaction cleanup for a log.
 * Compaction keeps the latest value for each message key.
 */
private void scheduleCompactCleanup(TopicPartition tp, UnifiedLog log) {
try {
// check whether compaction is worthwhile
double dirtyRatio = log.dirtyRatio();
if (dirtyRatio < config.minCleanableRatio) {
debug("Partition {} dirty ratio {} is below the threshold {}, skipping compaction",
tp, dirtyRatio, config.minCleanableRatio);
return;
}
info("Starting compaction of the log of partition {}, dirty ratio: {}", tp, dirtyRatio);
// enqueue the log for compaction
logsToClean.put(tp, new LogToClean(log, LogCleaningState.COMPACTION));
} catch (Exception e) {
error("Exception while scheduling compaction for partition {}", tp, e);
}
}
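// The "compact,delete" branch above references scheduleCompactAndDeleteCleanup, which is not part
// of the original listing. The following is a minimal sketch, assuming the combined policy simply
// compacts first and then applies the same retention checks as the delete policy.
private void scheduleCompactAndDeleteCleanup(TopicPartition tp, UnifiedLog log) {
// compact first so that only the latest value per key remains ...
scheduleCompactCleanup(tp, log);
// ... then apply time- and size-based retention to the remaining segments
scheduleDeleteCleanup(tp, log);
}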
/**
 * CleanerThread - log cleaner thread
 * Performs the actual log cleaning and compaction work.
 */
private class CleanerThread extends Thread {
private final int threadId;
private volatile boolean shouldStop = false;
public CleanerThread(int threadId) {
super("kafka-log-cleaner-thread-" + threadId);
this.threadId = threadId;
}
@Override
public void run() {
info("日志清理线程 {} 启动", threadId);
try {
while (!shouldStop) {
try {
// 获取下一个需要清理的日志
LogToClean logToClean = grabNextLogToClean();
if (logToClean != null) {
// 执行清理操作
cleanLog(logToClean);
} else {
// 没有待清理的日志,短暂休眠
Thread.sleep(config.backoffMs);
}
} catch (InterruptedException e) {
info("清理线程 {} 被中断", threadId);
break;
} catch (Exception e) {
error("清理线程 {} 发生异常", threadId, e);
}
}
} finally {
info("日志清理线程 {} 停止", threadId);
}
}
/**
 * Performs the actual cleaning work for a single log.
 */
private void cleanLog(LogToClean logToClean) {
UnifiedLog log = logToClean.log;
TopicPartition tp = log.topicPartition();
long startTimeMs = time.milliseconds();
try {
if (logToClean.cleaningState == LogCleaningState.COMPACTION) {
// run log compaction
doCompactLog(log);
} else {
// run log deletion
doDeleteLog(log);
}
long elapsedMs = time.milliseconds() - startTimeMs;
info("Finished cleaning the log of partition {} in {} ms", tp, elapsedMs);
} catch (Exception e) {
error("Exception while cleaning the log of partition {}", tp, e);
// mark the cleaning attempt as aborted
markLogAsCleaningAborted(tp);
}
}
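// The helpers referenced above (grabNextLogToClean, doDeleteLog, markLogAsCleaningAborted) are not
// part of the original listing. The sketches below are minimal assumptions: they treat logsToClean
// as the work queue and delegate deletion back to the retention logic shown earlier.
private LogToClean grabNextLogToClean() {
// take an arbitrary pending entry from the cleaning queue, or return null if there is none
synchronized (logsToClean) {
Iterator<Map.Entry<TopicPartition, LogToClean>> it = logsToClean.entrySet().iterator();
if (!it.hasNext()) {
return null;
}
LogToClean next = it.next().getValue();
it.remove();
return next;
}
}
private void doDeleteLog(UnifiedLog log) {
// reuse the retention-based deletion path for the deletion cleaning state
scheduleDeleteCleanup(log.topicPartition(), log);
}
private void markLogAsCleaningAborted(TopicPartition tp) {
// drop the failed entry so the partition can be re-scheduled on the next check
logsToClean.remove(tp);
}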
/**
 * Runs log compaction.
 * Keeps the latest value for each key and discards older versions.
 */
private void doCompactLog(UnifiedLog log) throws IOException {
info("Starting compaction for log partition: {}", log.topicPartition());
// 1. build the key -> offset map (latest offset per key)
Map<ByteBuffer, Long> offsetMap = buildOffsetMap(log);
// 2. build a compacted replacement for each non-active segment
List<LogSegment> segmentsToCompact = log.logSegments().stream()
.filter(segment -> segment != log.activeSegment())
.collect(Collectors.toList());
for (LogSegment sourceSegment : segmentsToCompact) {
LogSegment cleanedSegment = compactSegment(sourceSegment, offsetMap);
// 3. swap the compacted segment in for the original
log.replaceSegments(
Collections.singletonList(cleanedSegment),
Collections.singletonList(sourceSegment)
);
}
info("Finished compaction for partition: {}", log.topicPartition());
}
/**
 * Builds the offset map.
 * Scans the log to find the latest offset for each key.
 */
private Map<ByteBuffer, Long> buildOffsetMap(UnifiedLog log) throws IOException {
Map<ByteBuffer, Long> offsetMap = new HashMap<>();
// scan segments from oldest to newest; later occurrences overwrite earlier ones,
// so the map ends up holding the latest offset for each key
List<LogSegment> segments = new ArrayList<>(log.logSegments());
for (LogSegment segment : segments) {
// read all records in the segment
FetchDataInfo fetchInfo = segment.read(segment.baseOffset(),
Integer.MAX_VALUE, Optional.empty(), false);
for (RecordBatch batch : fetchInfo.records.batches()) {
for (Record record : batch) {
ByteBuffer key = record.key();
if (key != null) {
// records are read in offset order, so a plain put() leaves the latest offset per key
offsetMap.put(key.duplicate(), record.offset());
}
}
}
}
info("Finished building the offset map, total keys: {}", offsetMap.size());
return offsetMap;
}
/**
 * Compacts a single log segment.
 * Keeps only the records whose offsets are marked as latest in the offset map.
 */
private LogSegment compactSegment(LogSegment sourceSegment,
Map<ByteBuffer, Long> offsetMap) throws IOException {
// create a temporary segment to hold the compacted data
File tempFile = File.createTempFile("kafka-compacted-", ".log", sourceSegment.log().file().getParentFile());
LogSegment compactedSegment = LogSegment.open(
tempFile.getParentFile(),
sourceSegment.baseOffset(),
sourceSegment.config(),
time,
false // no preallocation
);
try {
// read all data from the source segment
FetchDataInfo fetchInfo = sourceSegment.read(sourceSegment.baseOffset(),
Integer.MAX_VALUE, Optional.empty(), false);
MemoryRecordsBuilder cleanedRecordsBuilder = MemoryRecords.builder(
ByteBuffer.allocate(Math.min(1024 * 1024, fetchInfo.records.sizeInBytes())), // initial buffer of at most 1 MB
RecordBatch.CURRENT_MAGIC_VALUE,
config.compression(),
TimestampType.CREATE_TIME,
sourceSegment.baseOffset()
);
// filter and rebuild the records, counting how many are retained
int retainedCount = 0;
for (RecordBatch batch : fetchInfo.records.batches()) {
for (Record record : batch) {
ByteBuffer key = record.key();
// keep the record if it has no key, or if it is the latest version of its key
if (key == null || Long.valueOf(record.offset()).equals(offsetMap.get(key))) {
cleanedRecordsBuilder.append(
record.timestamp(),
record.key(),
record.value(),
record.headers()
);
retainedCount++;
}
}
}
// write the compacted records into the new segment; the builder assigns sequential offsets
// starting at the segment base offset, so the largest offset is baseOffset + retainedCount - 1
MemoryRecords cleanedRecords = cleanedRecordsBuilder.build();
if (retainedCount > 0) {
compactedSegment.append(sourceSegment.baseOffset() + retainedCount - 1, cleanedRecords);
}
return compactedSegment;
} catch (Exception e) {
// on failure, close the segment and delete the temporary file
compactedSegment.close();
tempFile.delete();
throw e;
}
}
}
}
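For completeness, here is a usage sketch that is not part of the original listing. The constructor arguments are hypothetical (the listing above does not define a constructor); the snippet only illustrates the lifecycle: wire up the dependencies, call startup() to launch the cleaner threads and the periodic check, and call shutdown() when the broker stops.

LogCleaner cleaner = new LogCleaner(/* config, logManager, scheduler, time, logDirFailureChannel */);
cleaner.startup();   // launches cleaner threads and schedules checkAndScheduleCleanup()
// ... broker serves traffic; cleaner threads compact and delete in the background ...
cleaner.shutdown();  // stops the cleaner threads (see the shutdown() sketch above)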