概述
Kafka Connect 提供可扩展的数据集成框架,支持 Source 与 Sink 连接器的分布式运行。本文补充关键函数核心代码、调用链、时序与类结构图,并合并与其它文档的相似内容,保持中性描述。
1. 运行时组件架构
graph TB
subgraph Runtime
W[Worker]
HER[Herder]
REST[Connect REST]
OFF[OffsetBackingStore]
end
subgraph Connector
C[Connector]
T[Task]
TR[Transformation]
CV[Converter]
end
subgraph Kafka
P[Producer]
CO[Consumer]
TOP[Topics]
end
REST --> HER
HER --> W
W --> C
C --> T
T --> CV
T --> TR
T --> P
T --> CO
CO --> TOP
P --> TOP
OFF --> W
2. 关键函数核心代码与说明(精要)
// WorkerTask 拉取与投递(Sink 任务摘要)
public void execute() {
while (active.get()) {
List<SinkRecord> batch = consumer.poll(batchSize, pollTimeoutMs);
if (!batch.isEmpty()) {
List<SinkRecord> transformed = applyTransformations(batch);
connectorTask.put(transformed);
commitOffsetsIfNeeded();
}
}
}
- 功能:按批拉取数据,应用变换并交给
SinkTask.put
,按策略提交偏移量。
// Source 任务轮询(摘要)
public void execute() {
while (active.get()) {
List<SourceRecord> records = connectorTask.poll();
if (records != null && !records.isEmpty()) {
List<ProducerRecord<byte[], byte[]>> serialized = convert(records);
producer.send(serialized);
commitSourceOffsets(records);
}
}
}
- 功能:从外部系统拉取生成
SourceRecord
,经转换后写入 Kafka 并记录 Source 偏移。
// Converter 序列化(摘要)
public byte[] fromConnectData(String topic, Schema schema, Object value) {
if (schema == null) return serializeNull(value);
return serializer.serialize(topic, dataConverter.convert(schema, value));
}
public Object toConnectData(String topic, byte[] value) {
if (value == null) return null;
return dataConverter.toConnectValue(topic, value);
}
- 功能:在 Connect 内部数据模型与字节表示之间转换,常见实现有 JSON/Avro/Protobuf。
// 偏移存储(摘要)
public void put(ConnectorTaskId taskId, Map<ByteBuffer, ByteBuffer> offsets) {
KafkaBasedLog log = topicLog(taskId);
offsets.forEach((k,v) -> log.send(k, v));
}
- 功能:将任务偏移写入
__consumer_offsets
或专用偏移主题(分布式模式)。
3. 调用链(Source 与 Sink)
flowchart LR
REST[REST] --> Herder[Herder.createConnector]
Herder --> Worker[Worker.startConnector]
Worker --> Task[WorkerTask.execute]
Task --> SourcePoll[SourceTask.poll]
SourcePoll --> Convert[Converter.fromConnectData]
Convert --> Producer
Producer --> Kafka
flowchart LR
REST[REST] --> Herder[Herder.createConnector]
Herder --> Worker[Worker.startConnector]
Worker --> Task[WorkerSinkTask.execute]
Task --> Consumer[Consumer.poll]
Consumer --> Transform[Transformation.apply]
Transform --> SinkPut[SinkTask.put]
SinkPut --> Commit[commitOffsets]
4. 时序图(批处理与提交)
sequenceDiagram
participant H as Herder
participant W as Worker
participant T as Task
participant K as Kafka
H->>W: startConnector
W->>T: startTask
loop 周期性
T->>K: poll()/send()
T->>T: 转换/写入/调用put
alt 提交时机到达
T->>K: commitSync(offsets)
end
end
5. 类结构图与继承关系(简化)
classDiagram
class Worker
class Herder
class Connector
class SourceConnector
class SinkConnector
class Task
class SourceTask
class SinkTask
class Converter
class Transformation
Worker --> Connector
Connector <|-- SourceConnector
Connector <|-- SinkConnector
Task <|-- SourceTask
Task <|-- SinkTask
Task --> Converter
Task --> Transformation