概述

Kafka Streams 提供内嵌式分布式流处理引擎。本文补充关键函数核心代码、处理链调用关系、时序与类结构,强调实现边界与中性描述。

1. 引擎组件架构

graph TB
  subgraph App
    DS[StreamsBuilder]
    TP[Topology]
  end
  subgraph Runtime
    IK[KafkaStreams]
    TM[StreamThread]
    TK[Task]
    PR[Processor]
    SS[StateStore]
  end
  subgraph IO
    SRC[SourceNode]
    SINK[SinkNode]
    CON[Consumer]
    PRO[Producer]
  end

  DS --> TP
  TP --> IK
  IK --> TM
  TM --> TK
  TK --> PR
  PR --> SS
  SRC --> CON
  SINK --> PRO

2. 关键函数核心代码与说明(精要)

// 拓扑构建与物化状态存储(摘要)
public <K, V> KTable<K, V> table(String topic, Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
  String storeName = materialized.storeName();
  StoreBuilder<KeyValueStore<K, V>> builder = Stores.keyValueStoreBuilder(
      Stores.persistentKeyValueStore(storeName), materialized.keySerde(), materialized.valueSerde());
  addStateStore(builder, sourceNameFor(topic));
  return new KTableImpl<>(builder, topic, storeName);
}
  • 功能:为表源配置持久化 KeyValueStore 并注册到拓扑中。
// 线程主循环(摘要)
public void run() {
  while (isRunning()) {
    int processed = runOnce(); // 拉取→反序列化→分发到任务→处理器执行
    maybeCommit(processed);
    maybePunctuate();
  }
}
  • 功能:每轮完成拉取、处理、提交与定时回调的调度循环。
// 任务处理单步(摘要)
int process() {
  int num = 0;
  while (recordQueue.hasNext() && num < maxProcess) {
    StampedRecord rec = recordQueue.next();
    processorContext.setRecordMetadata(rec.topic(), rec.partition(), rec.offset(), rec.timestamp());
    rootProcessor.process(rec.key(), rec.value());
    num++;
  }
  return num;
}
  • 功能:以背压上限批量出队,设置上下文并驱动处理器链。
// 状态存储读写(摘要)
public V put(K key, V value) {
  rocksDB.put(serialize(key), serialize(value));
  if (enableCaching) { cache.put(key, value); }
  return value;
}

public V get(K key) {
  V cached = enableCaching ? cache.getIfPresent(key) : null;
  if (cached != null) return cached;
  byte[] bytes = rocksDB.get(serialize(key));
  return deserialize(bytes);
}
  • 功能:写路径同步写入底层存储并可选写缓存;读路径命中缓存或回源 RocksDB。
// 提交与位移前推(摘要)
void maybeCommit(int processed) {
  if (processed == 0) return;
  Map<TopicPartition, OffsetAndMetadata> offsets = taskCollector.collectedOffsets();
  consumer.commitSync(offsets);
}
  • 功能:在处理推进后同步提交任务位移,保障至少一次处理语义。

3. 调用链(处理路径)

flowchart LR
  App[StreamsBuilder] --> Topo[Topology]
  Topo --> Streams[KafkaStreams.start]
  Streams --> Thread[StreamThread.run]
  Thread --> Poll[KafkaConsumer.poll]
  Poll --> Task[Task.process]
  Task --> Proc[Processor.process]
  Proc --> Store[StateStore.get/put]
  Task --> Commit[commitSync]

4. 时序图(单条记录处理)

sequenceDiagram
  participant T as StreamThread
  participant K as KafkaConsumer
  participant X as Task
  participant P as Processor
  participant S as StateStore

  T->>K: poll()
  K-->>T: records
  T->>X: schedule process(records)
  loop for each record
    X->>P: process(key,value)
    alt 需要状态
      P->>S: get/put
      S-->>P: value/ack
    end
  end
  T->>K: commitSync(offsets)

5. 类结构图与继承关系(简化)

classDiagram
  class KafkaStreams
  class StreamThread
  class Task
  class Processor~K,V~
  class SourceNode
  class SinkNode
  class StateStore
  class KeyValueStore

  KafkaStreams --> StreamThread
  StreamThread --> Task
  Task --> Processor
  Processor <|-- SourceNode
  Processor <|-- SinkNode
  StateStore <|-- KeyValueStore
  Processor --> StateStore