Kafka-00-总览
0. 摘要
0.1 项目目标与核心能力
Apache Kafka 是一个开源的分布式事件流平台,用于构建实时数据管道和流式应用。核心能力包括:
- 消息发布与订阅:类似消息队列系统,支持多生产者、多消费者模式
- 存储持久化:以容错方式持久化存储事件流数据
- 流处理:实时处理和转换事件流数据
- 高吞吐低延迟:支持每秒处理数百万条消息
- 水平扩展:通过分区机制实现线性扩展能力
- 强一致性保证:提供 exactly-once 语义和事务性消息
0.2 问题域与边界
问题域:
- 实时数据管道构建:在系统或应用之间可靠地获取数据
- 流式应用开发:实时转换或响应数据流
- 事件驱动架构:系统间通过事件进行解耦通信
- 日志聚合:收集和中心化日志数据
- 活动跟踪:跟踪用户行为事件流
核心边界:
- 不提供复杂的路由逻辑(需通过 Kafka Connect 或 Streams 实现)
- 不提供内置的数据转换(通过 Streams API 或外部处理器实现)
- 不保证全局消息顺序(仅保证分区内顺序)
非目标:
- 不是传统的企业消息总线(ESB)
- 不是完整的 ETL 工具
- 不是实时查询数据库
- 不提供复杂的消息过滤和路由
0.3 运行环境
语言与运行时:
- Java 17+ / Scala 2.13(服务端)
- Java 11+(客户端)
- 支持多语言客户端:Python, Go, C/C++, .NET 等
依赖项:
- Apache ZooKeeper(旧版模式,逐步被 KRaft 替代)
- KRaft(Kafka Raft):新的共识协议,无需 ZooKeeper
部署形态:
- Broker 集群模式:多个 Broker 节点组成集群
- KRaft 模式:Controller + Broker 的分离或混合部署
- 容器化部署:支持 Docker, Kubernetes
- 云原生部署:支持 AWS, Azure, GCP 等云平台
0.4 版本架构演进
- 0.x 版本:依赖 ZooKeeper 进行元数据管理
- 2.x 版本:引入 KRaft 模式(实验性)
- 3.x 版本:KRaft 模式生产就绪,逐步移除 ZooKeeper 依赖
1. 整体架构图
flowchart TB
subgraph "客户端层 Clients"
Producer[Producer<br/>生产者客户端]
Consumer[Consumer<br/>消费者客户端]
AdminClient[AdminClient<br/>管理客户端]
StreamsAPI[Streams API<br/>流处理客户端]
ConnectAPI[Connect API<br/>连接器客户端]
end
subgraph "网络层 Network"
SocketServer[SocketServer<br/>网络服务器]
RequestChannel[RequestChannel<br/>请求队列]
end
subgraph "API处理层 API Layer"
KafkaApis[KafkaApis<br/>请求处理器]
RequestHandlerPool[RequestHandlerPool<br/>处理线程池]
end
subgraph "核心服务层 Core Services"
ReplicaManager[ReplicaManager<br/>副本管理器]
LogManager[LogManager<br/>日志管理器]
GroupCoordinator[GroupCoordinator<br/>消费者组协调器]
TransactionCoordinator[TransactionCoordinator<br/>事务协调器]
ShareCoordinator[ShareCoordinator<br/>共享协调器]
end
subgraph "元数据层 Metadata"
KRaftManager[KRaftManager<br/>Raft共识管理器]
MetadataCache[MetadataCache<br/>元数据缓存]
ControllerServer[ControllerServer<br/>控制器服务]
end
subgraph "存储层 Storage"
Log[Log<br/>日志分段]
LogSegment[LogSegment<br/>日志段]
OffsetIndex[OffsetIndex<br/>偏移量索引]
TimeIndex[TimeIndex<br/>时间索引]
RemoteLogManager[RemoteLogManager<br/>远程日志管理]
end
Producer --> SocketServer
Consumer --> SocketServer
AdminClient --> SocketServer
StreamsAPI --> Producer
StreamsAPI --> Consumer
ConnectAPI --> Producer
ConnectAPI --> Consumer
SocketServer --> RequestChannel
RequestChannel --> RequestHandlerPool
RequestHandlerPool --> KafkaApis
KafkaApis --> ReplicaManager
KafkaApis --> GroupCoordinator
KafkaApis --> TransactionCoordinator
KafkaApis --> ShareCoordinator
ReplicaManager --> LogManager
ReplicaManager --> MetadataCache
LogManager --> Log
Log --> LogSegment
Log --> OffsetIndex
Log --> TimeIndex
LogManager --> RemoteLogManager
KRaftManager --> MetadataCache
ControllerServer --> KRaftManager
GroupCoordinator --> LogManager
TransactionCoordinator --> LogManager
1.1 架构图说明
1.1.1 组件职责与耦合关系
客户端层(Clients):
- Producer:负责将消息发送到 Kafka 集群,支持批量发送、压缩、分区选择等功能
- Consumer:从 Kafka 集群拉取消息,支持消费者组、自动提交偏移量、手动提交等
- AdminClient:执行集群管理操作,如创建主题、修改配置、查看元数据等
- Streams API:基于 Producer 和 Consumer 构建的流处理框架
- Connect API:数据集成框架,支持从外部系统导入/导出数据
网络层(Network):
- SocketServer:基于 Java NIO 的网络服务器,处理客户端连接和请求接收
- RequestChannel:请求队列,解耦网络线程和业务处理线程
API处理层(API Layer):
- KafkaApis:请求处理中枢,根据请求类型分发到不同的处理逻辑
- RequestHandlerPool:业务处理线程池,从 RequestChannel 获取请求并调用 KafkaApis 处理
核心服务层(Core Services):
- ReplicaManager:副本管理器,负责分区的读写、副本同步、ISR 管理
- LogManager:日志管理器,管理所有分区的日志文件生命周期
- GroupCoordinator:消费者组协调器,处理消费者组的加入、离开、再平衡
- TransactionCoordinator:事务协调器,管理事务性消息的两阶段提交
- ShareCoordinator:共享协调器,支持共享订阅模式
元数据层(Metadata):
- KRaftManager:基于 Raft 协议的共识管理器,替代 ZooKeeper
- MetadataCache:元数据缓存,缓存主题、分区、副本等元数据信息
- ControllerServer:集群控制器,负责分区分配、Leader 选举等全局决策
存储层(Storage):
- Log:分区日志抽象,管理一个分区的所有日志段
- LogSegment:日志段,实际存储消息的文件
- OffsetIndex/TimeIndex:索引文件,支持快速定位消息
- RemoteLogManager:远程日志管理,支持将冷数据迁移到对象存储
1.1.2 数据流与控制流分界
数据流路径:
- 生产路径:Producer → SocketServer → KafkaApis → ReplicaManager → LogManager → Log → 磁盘
- 消费路径:Consumer → SocketServer → KafkaApis → ReplicaManager → LogManager → Log → 内存 → Consumer
控制流路径:
- 元数据更新:Controller → KRaftManager → MetadataCache → 各组件(ReplicaManager, GroupCoordinator 等)
- 协调器操作:Client → KafkaApis → Coordinator → Log(状态持久化)
跨进程/跨线程路径:
- 网络线程(Acceptor/Processor)与业务线程(RequestHandler)通过 RequestChannel 解耦
- Controller 线程与 Broker 线程通过 MetadataCache 共享元数据
- 副本同步通过 Fetcher 线程异步拉取 Leader 数据
1.1.3 高可用与扩展性
高可用机制:
- 副本机制:每个分区有多个副本(1 Leader + N Follower),Leader 故障时自动切换
- ISR(In-Sync Replicas):只有与 Leader 保持同步的副本才能被选举为新 Leader
- Controller 高可用:KRaft 模式下,多个 Controller 节点通过 Raft 协议选举 Leader
- 数据持久化:所有消息写入磁盘,支持配置同步刷盘策略
扩展性设计:
- 分区机制:Topic 被分割为多个 Partition,分布在不同 Broker 上
- 水平扩展:增加 Broker 节点,自动重新分配分区
- 消费者组:一个分区只能被组内一个消费者消费,增加消费者数量可提高并行度
- 批量处理:Producer 和 Consumer 都支持批量操作,提高吞吐量
状态管理位置:
- 无状态组件:SocketServer, KafkaApis(可水平扩展)
- 有状态组件:ReplicaManager(管理分区副本状态),Coordinators(管理消费者组/事务状态)
- 集中式状态:MetadataCache(缓存全局元数据),KRaftManager(持久化元数据日志)
2. 全局时序图(主要业务闭环)
2.1 生产者发送消息时序图
sequenceDiagram
autonumber
participant P as Producer
participant NIO as SocketServer<br/>(Processor)
participant RC as RequestChannel
participant RH as RequestHandler
participant KA as KafkaApis
participant RM as ReplicaManager
participant LM as LogManager
participant L as Log
participant F as Follower<br/>Fetcher
P->>P: 序列化消息,选择分区
P->>P: 加入 RecordAccumulator 批次
P->>NIO: ProduceRequest(批量消息)
NIO->>RC: 放入请求队列
RC->>RH: 取出请求
RH->>KA: handleProduceRequest()
KA->>RM: appendRecords()
RM->>RM: 检查权限、配额
RM->>LM: appendAsLeader()
LM->>L: append()
L->>L: 写入 LogSegment
L->>L: 更新 HW 前记录 LEO
L-->>LM: LEO
LM-->>RM: 写入结果
alt acks=all
RM->>RM: 等待 ISR 副本同步
F->>RM: FetchRequest
RM->>LM: read()
LM->>L: read()
L-->>LM: 消息数据
LM-->>RM: 消息数据
RM-->>F: FetchResponse
F->>F: 写入本地 Log
F->>RM: 更新 LEO
RM->>RM: 更新 HW
end
RM-->>KA: ProduceResponse
KA-->>RH: Response
RH->>RC: 放入响应队列
RC->>NIO: 取出响应
NIO-->>P: ProduceResponse(offset, timestamp)
P->>P: 触发回调,释放批次内存
2.2 消费者拉取消息时序图
sequenceDiagram
autonumber
participant C as Consumer
participant NIO as SocketServer
participant RC as RequestChannel
participant RH as RequestHandler
participant KA as KafkaApis
participant RM as ReplicaManager
participant LM as LogManager
participant L as Log
participant GC as GroupCoordinator
C->>C: poll() 触发拉取
C->>NIO: FetchRequest(topic, partition, offset)
NIO->>RC: 放入请求队列
RC->>RH: 取出请求
RH->>KA: handleFetchRequest()
KA->>RM: fetchMessages()
RM->>RM: 检查权限、配额
RM->>LM: read()
LM->>L: read(offset, maxBytes)
L->>L: 从 LogSegment 读取
L->>L: 通过 OffsetIndex 定位
L-->>LM: 消息数据
LM-->>RM: 消息数据
alt 数据不足且未超时
RM->>RM: 加入延迟队列,等待数据到达或超时
end
RM-->>KA: FetchResponse
KA-->>RH: Response
RH->>RC: 放入响应队列
RC->>NIO: 取出响应
NIO-->>C: FetchResponse(records)
C->>C: 反序列化消息
C->>C: 处理消息
C->>NIO: OffsetCommitRequest
NIO->>KA: handleOffsetCommitRequest()
KA->>GC: handleCommitOffsets()
GC->>LM: 写入 __consumer_offsets
GC-->>KA: OffsetCommitResponse
KA-->>C: OffsetCommitResponse
2.3 消费者组再平衡时序图
sequenceDiagram
autonumber
participant C1 as Consumer1
participant C2 as Consumer2<br/>(新加入)
participant NIO as SocketServer
participant KA as KafkaApis
participant GC as GroupCoordinator
participant L as Log<br/>(__consumer_offsets)
C2->>NIO: FindCoordinatorRequest
NIO->>KA: handleFindCoordinatorRequest()
KA->>GC: 计算 Coordinator 位置
GC-->>KA: Coordinator 地址
KA-->>C2: FindCoordinatorResponse
C2->>NIO: JoinGroupRequest
NIO->>KA: handleJoinGroupRequest()
KA->>GC: handleJoinGroup()
GC->>GC: 记录成员信息
GC->>C1: HeartbeatResponse(REBALANCE_IN_PROGRESS)
C1->>NIO: JoinGroupRequest
NIO->>KA: handleJoinGroupRequest()
KA->>GC: handleJoinGroup()
GC->>GC: 等待所有成员加入
GC->>GC: 选举 Leader Consumer
GC-->>KA: JoinGroupResponse(Leader=C1, members=[C1,C2])
KA-->>C1: JoinGroupResponse
KA-->>C2: JoinGroupResponse
C1->>C1: 执行分区分配策略
C1->>NIO: SyncGroupRequest(assignments)
NIO->>KA: handleSyncGroupRequest()
KA->>GC: handleSyncGroup()
GC->>L: 持久化分组状态
GC-->>KA: SyncGroupResponse
KA-->>C1: assignment=[P0,P1]
C2->>NIO: SyncGroupRequest
NIO->>KA: handleSyncGroupRequest()
KA->>GC: handleSyncGroup()
GC-->>KA: SyncGroupResponse
KA-->>C2: assignment=[P2,P3]
C1->>C1: 触发 onPartitionsRevoked()
C1->>C1: 触发 onPartitionsAssigned()
C2->>C2: 触发 onPartitionsAssigned()
2.4 事务性消息时序图
sequenceDiagram
autonumber
participant P as Producer
participant NIO as SocketServer
participant KA as KafkaApis
participant TC as TransactionCoordinator
participant RM as ReplicaManager
participant L as Log
P->>NIO: InitProducerIdRequest
NIO->>KA: handleInitProducerIdRequest()
KA->>TC: handleInitProducerId()
TC->>L: 写入 __transaction_state
TC-->>KA: ProducerId, Epoch
KA-->>P: InitProducerIdResponse
P->>P: beginTransaction()
P->>NIO: AddPartitionsToTxnRequest
NIO->>KA: handleAddPartitionsToTxnRequest()
KA->>TC: handleAddPartitionsToTxn()
TC->>L: 更新 __transaction_state
TC-->>KA: Success
KA-->>P: AddPartitionsToTxnResponse
P->>NIO: ProduceRequest(transactional=true)
NIO->>KA: handleProduceRequest()
KA->>RM: appendRecords()
RM->>RM: 验证 ProducerId & Epoch
RM->>L: append(带事务标记)
L-->>RM: Success
RM-->>KA: ProduceResponse
KA-->>P: ProduceResponse
P->>P: commitTransaction()
P->>NIO: EndTxnRequest(commit)
NIO->>KA: handleEndTxnRequest()
KA->>TC: handleEndTxn()
TC->>TC: 准备提交
TC->>L: 写入 PREPARE_COMMIT
TC->>RM: 写入 Transaction Marker
RM->>L: 追加 COMMIT Marker
TC->>L: 写入 COMPLETE_COMMIT
TC-->>KA: EndTxnResponse
KA-->>P: EndTxnResponse
2.5 图解与要点
2.5.1 入口与鉴权
入口点:
- 所有客户端请求通过
SocketServer接收,由 Acceptor 线程接受连接,Processor 线程处理 I/O - 请求经过网络层后进入
RequestChannel队列,由RequestHandler线程池处理
鉴权机制:
- SASL/SCRAM、SASL/PLAIN、SASL/GSSAPI(Kerberos)等认证机制
- ACL(Access Control List)权限控制,支持 Topic、Group、Cluster 级别
- 在
KafkaApis处理请求前,通过Authorizer接口进行权限检查
2.5.2 幂等性保证
Producer 幂等性:
- 每个 Producer 有唯一的
ProducerId和递增的Sequence Number - Broker 端通过
(ProducerId, Partition, Sequence)三元组去重 - 重复消息(相同 Sequence)会被 Broker 拒绝,但返回成功响应
Consumer 幂等性:
- 通过提交 Offset 实现 at-least-once 或 at-most-once
- 结合事务性消费(read_committed)实现 exactly-once
事务幂等性:
- 两阶段提交协议,TransactionCoordinator 协调多个分区
- 所有分区写入 Transaction Marker 后,事务才对消费者可见
2.5.3 回退策略
Producer 回退:
- 网络失败:自动重试,可配置
retries和delivery.timeout.ms - 配额超限:阻塞等待或抛出异常
- 分区不可用:等待元数据更新后重试
- 事务失败:调用
abortTransaction()回滚
Consumer 回退:
- 偏移量越界:根据
auto.offset.reset(earliest/latest)重置 - 反序列化失败:跳过错误消息或停止消费
- 再平衡:触发
ConsumerRebalanceListener回调,重新分配分区
Broker 回退:
- 副本同步失败:从 ISR 中移除 Follower,降低
min.insync.replicas风险 - 磁盘故障:标记目录为离线,停止服务该目录的分区
- Controller 失败:通过 KRaft 或 ZooKeeper 重新选举 Controller
2.5.4 重试点
客户端重试:
- Producer:
send()失败后自动重试,直到delivery.timeout.ms超时 - Consumer:
poll()失败后自动重试(如网络临时中断) - AdminClient:大部分操作支持自动重试
服务端重试:
- Replica Fetcher:副本同步失败后自动重试
- Controller:分区分配、Leader 选举失败后重试
2.5.5 超时设定
客户端超时:
request.timeout.ms:单个请求超时时间(默认 30s)delivery.timeout.ms:Producer 发送总超时(默认 120s)session.timeout.ms:Consumer 会话超时(默认 10s)max.poll.interval.ms:Consumer 两次 poll 间最大间隔(默认 5min)
服务端超时:
replica.lag.time.max.ms:Follower 同步延迟超时(默认 10s)group.initial.rebalance.delay.ms:再平衡延迟时间(默认 3s)transaction.timeout.ms:事务超时时间(默认 1min)
2.5.6 资源上界
内存上界:
- Producer:
buffer.memory(默认 32MB) - Consumer:
fetch.max.bytes(默认 50MB) - Broker:
replica.fetch.max.bytes(默认 1MB)
磁盘上界:
- 日志保留:
log.retention.bytes或log.retention.ms - 段文件大小:
log.segment.bytes(默认 1GB)
网络上界:
socket.send.buffer.bytes/socket.receive.buffer.bytes(默认 100KB)num.network.threads:网络线程数(默认 3)num.io.threads:I/O 线程数(默认 8)
3. 模块边界与交互图
3.1 模块列表与职责
| 模块名称 | 目录路径 | 主要职责 | 对外 API | 调用方 |
|---|---|---|---|---|
| clients | clients/ |
客户端库(Producer/Consumer/Admin) | Producer.send(), Consumer.poll(), AdminClient APIs | 用户应用 |
| core | core/ |
Broker 核心实现(网络、日志、副本、协调器) | Kafka 协议请求 | clients, connect, streams |
| metadata | metadata/ |
元数据管理(Topic/Partition/Replica) | MetadataRequest/UpdateMetadata | core, clients |
| storage | storage/ |
存储层(Log/Segment/Index) | Log.append(), Log.read() | core |
| raft | raft/ |
Raft 共识协议实现 | RaftClient APIs | core(KRaft 模式) |
| group-coordinator | group-coordinator/ |
消费者组协调器 | JoinGroup, SyncGroup, Heartbeat | clients(Consumer) |
| transaction-coordinator | transaction-coordinator/ |
事务协调器 | InitProducerId, AddPartitionsToTxn, EndTxn | clients(Producer) |
| share-coordinator | share-coordinator/ |
共享协调器 | ShareFetch, ShareAcknowledge | clients(ShareConsumer) |
| connect | connect/ |
Kafka Connect 框架 | REST API(Connector 管理) | 外部系统 |
| streams | streams/ |
Kafka Streams 流处理库 | KafkaStreams APIs | 用户应用 |
| server/server-common | server/, server-common/ |
服务器通用组件 | N/A(内部) | core |
| coordinator-common | coordinator-common/ |
协调器通用框架 | N/A(内部) | group-coordinator, transaction-coordinator |
| tools | tools/ |
管理工具集 | CLI 命令 | 运维人员 |
| generator | generator/ |
消息协议代码生成器 | N/A(构建时) | 构建系统 |
3.2 模块交互矩阵
graph LR
subgraph "客户端模块"
Clients[clients<br/>Producer/Consumer]
Streams[streams<br/>流处理]
Connect[connect<br/>数据集成]
end
subgraph "服务端模块"
Core[core<br/>Broker核心]
Metadata[metadata<br/>元数据]
Storage[storage<br/>存储]
Raft[raft<br/>共识]
end
subgraph "协调器模块"
GroupCoord[group-coordinator<br/>消费者组]
TxnCoord[transaction-coordinator<br/>事务]
ShareCoord[share-coordinator<br/>共享]
CoordCommon[coordinator-common<br/>通用框架]
end
subgraph "支持模块"
Server[server/server-common<br/>服务器组件]
Tools[tools<br/>工具集]
Generator[generator<br/>代码生成]
end
Clients -->|Kafka协议| Core
Streams -->|基于| Clients
Connect -->|基于| Clients
Core -->|读写| Storage
Core -->|查询| Metadata
Core -->|KRaft模式| Raft
Core -->|依赖| Server
GroupCoord -->|基于| CoordCommon
TxnCoord -->|基于| CoordCommon
ShareCoord -->|基于| CoordCommon
GroupCoord -->|注册到| Core
TxnCoord -->|注册到| Core
ShareCoord -->|注册到| Core
Raft -->|管理| Metadata
Tools -->|调用| Clients
Generator -->|生成| Clients
Generator -->|生成| Core
3.3 交互说明
3.3.1 同步调用
| 调用方 | 被调方 | 接口名称 | 说明 |
|---|---|---|---|
| clients | core | ProduceRequest | Producer 发送消息到 Broker |
| clients | core | FetchRequest | Consumer 从 Broker 拉取消息 |
| clients | core | MetadataRequest | 获取 Topic/Partition 元数据 |
| core | storage | Log.append() | 写入日志数据 |
| core | storage | Log.read() | 读取日志数据 |
| core | metadata | MetadataCache.getPartitionInfo() | 查询分区元数据 |
| core | group-coordinator | handleJoinGroup() | 处理消费者组加入 |
| core | transaction-coordinator | handleInitProducerId() | 初始化事务 Producer |
3.3.2 异步消息
| 发送方 | 接收方 | 消息类型 | 说明 |
|---|---|---|---|
| core(Controller) | core(Broker) | UpdateMetadataRequest | 异步推送元数据更新 |
| core(Leader) | core(Follower) | FetchRequest | 副本异步拉取数据 |
| group-coordinator | clients(Consumer) | HeartbeatResponse | 通知再平衡 |
| transaction-coordinator | core(ReplicaManager) | WriteTxnMarkers | 写入事务标记 |
3.3.3 共享存储
| 模块 | 共享存储 | 用途 |
|---|---|---|
| group-coordinator | __consumer_offsets topic | 持久化消费者组状态和偏移量 |
| transaction-coordinator | __transaction_state topic | 持久化事务状态 |
| raft | __cluster_metadata topic | 持久化集群元数据(KRaft 模式) |
3.3.4 订阅/发布
| 发布者 | 订阅者 | 事件类型 | 说明 |
|---|---|---|---|
| metadata | core(多个组件) | MetadataUpdate | 元数据变更通知 |
| raft | core(BrokerMetadataPublisher) | MetadataSnapshot | 元数据快照推送 |
| core(LogManager) | core(ReplicaManager) | LogAppendEvent | 日志追加事件 |
4. 关键设计与权衡
4.1 数据一致性
4.1.1 副本一致性(强一致性)
设计:
- 每个分区有 1 个 Leader 和 N 个 Follower 副本
- 只有 Leader 处理读写请求,Follower 通过 Fetch 请求同步数据
- ISR(In-Sync Replicas)机制:只有与 Leader 保持同步的副本才在 ISR 集合中
一致性保证:
acks=all:Producer 必须等待所有 ISR 副本确认才返回成功min.insync.replicas:ISR 最小副本数,低于此值拒绝写入- HW(High Watermark):所有 ISR 副本都已同步的最大偏移量,Consumer 只能读取 HW 之前的消息
权衡:
- 强一致性 vs 可用性:
min.insync.replicas=N(N 越大,一致性越强,但可用性越低) - 性能 vs 持久性:
acks=all保证持久性,但延迟较高
4.1.2 最终一致性
场景:
- 元数据更新:Controller 异步推送
UpdateMetadataRequest到所有 Broker - 消费者组状态:不同 Consumer 可能短暂看到不一致的分区分配
权衡:
- 元数据最终一致,但可能导致短暂的路由错误(客户端会重试)
- 再平衡期间,Consumer 可能重复消费或丢失消息(需要应用层保证幂等性)
4.2 事务边界
4.2.1 单分区事务
设计:
- Producer 通过
ProducerId和递增的Sequence Number实现幂等性 - Broker 端去重,保证单分区内消息不重复
4.2.2 多分区事务
设计:
- 引入
TransactionCoordinator,协调多个分区的两阶段提交 - 事务状态持久化到
__transaction_statetopic - 所有参与分区写入
Transaction Marker(COMMIT 或 ABORT)
权衡:
- 事务保证跨分区 exactly-once,但增加延迟和协调开销
- 事务超时后自动回滚,避免长事务阻塞
4.3 锁与并发策略
4.3.1 分区级锁
设计:
- 每个分区的
Log对象有独立的读写锁 - 写入时加写锁,读取时加读锁(读写分离)
权衡:
- 细粒度锁提高并发,但增加锁开销
- 避免全局锁,支持多分区并行读写
4.3.2 协调器锁
设计:
GroupCoordinator和TransactionCoordinator使用分段锁- 不同组/事务可以并行处理
权衡:
- 分段锁提高并发,但增加内存开销
- 组内操作串行化,保证一致性
4.4 性能关键路径
4.4.1 零拷贝(Zero-Copy)
设计:
- 消费路径使用
sendfile()系统调用,直接从文件传输到网络 - 避免数据在内核态和用户态之间复制
性能提升:
- 减少 CPU 使用率 40%-50%
- 提高吞吐量 2-3 倍
4.4.2 批量处理
设计:
- Producer 批量发送消息(
batch.size和linger.ms) - Broker 批量写入磁盘
- Consumer 批量拉取消息
性能提升:
- 减少网络往返次数
- 提高磁盘 I/O 效率(顺序写优于随机写)
4.4.3 页缓存(Page Cache)
设计:
- Kafka 依赖操作系统页缓存,不自己管理缓存
- 消息先写入页缓存,由操作系统决定何时刷盘
性能提升:
- 利用操作系统优化,减少重复工作
- 消费路径大概率从页缓存读取,接近内存速度
4.5 可观测性指标
4.5.1 Broker 指标
| 指标名称 | 含义 | 阈值建议 |
|---|---|---|
BytesInPerSec |
每秒写入字节数 | 根据网络带宽设定 |
BytesOutPerSec |
每秒读取字节数 | 根据网络带宽设定 |
RequestsPerSec |
每秒请求数 | 根据硬件能力设定 |
UnderReplicatedPartitions |
副本不足的分区数 | 0(非 0 表示有副本掉队) |
OfflinePartitionsCount |
离线分区数 | 0(非 0 表示有分区不可用) |
RequestHandlerAvgIdlePercent |
请求处理线程空闲率 | >20%(低于 20% 表示过载) |
4.5.2 Producer 指标
| 指标名称 | 含义 | 阈值建议 |
|---|---|---|
record-send-rate |
每秒发送记录数 | 根据业务需求设定 |
record-error-rate |
每秒错误记录数 | <1%(高错误率需排查) |
buffer-available-bytes |
可用缓冲区字节数 | >0(为 0 表示背压) |
request-latency-avg |
平均请求延迟 | <100ms(P99 <500ms) |
4.5.3 Consumer 指标
| 指标名称 | 含义 | 阈值建议 |
|---|---|---|
records-lag-max |
最大消费延迟(条数) | <10000(根据业务容忍度) |
records-consumed-rate |
每秒消费记录数 | 应接近生产速率 |
fetch-latency-avg |
平均拉取延迟 | <50ms |
commit-latency-avg |
平均提交延迟 | <100ms |
4.6 配置项与可变参数
4.6.1 Broker 关键配置
| 配置项 | 默认值 | 说明 |
|---|---|---|
num.network.threads |
3 | 网络线程数(处理请求接收和响应发送) |
num.io.threads |
8 | I/O 线程数(处理业务逻辑) |
log.segment.bytes |
1GB | 日志段文件大小 |
log.retention.hours |
168(7天) | 日志保留时间 |
replica.lag.time.max.ms |
10000 | 副本同步最大延迟 |
min.insync.replicas |
1 | ISR 最小副本数 |
4.6.2 Producer 关键配置
| 配置项 | 默认值 | 说明 |
|---|---|---|
batch.size |
16384(16KB) | 批次大小 |
linger.ms |
0 | 批次等待时间 |
buffer.memory |
33554432(32MB) | 缓冲区大小 |
acks |
all | 确认级别(0/1/all) |
retries |
Integer.MAX_VALUE | 重试次数 |
delivery.timeout.ms |
120000 | 发送总超时 |
4.6.3 Consumer 关键配置
| 配置项 | 默认值 | 说明 |
|---|---|---|
fetch.min.bytes |
1 | 最小拉取字节数 |
fetch.max.wait.ms |
500 | 最大等待时间 |
max.poll.records |
500 | 单次拉取最大记录数 |
session.timeout.ms |
10000 | 会话超时 |
max.poll.interval.ms |
300000 | poll 间隔超时 |
enable.auto.commit |
true | 自动提交偏移量 |
5. 典型使用示例与最佳实践(总览级)
5.1 示例 1:最小可运行入口 + 端到端调用
5.1.1 启动 Kafka Broker(KRaft 模式)
# 生成集群 ID
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
# 格式化存储目录
bin/kafka-storage.sh format --standalone -t $KAFKA_CLUSTER_ID -c config/server.properties
# 启动 Broker
bin/kafka-server-start.sh config/server.properties
说明:
- KRaft 模式无需 ZooKeeper,直接使用 Raft 协议管理元数据
--standalone表示单节点模式(开发测试用)- 生产环境需配置多个 Controller 节点
5.1.2 创建 Topic
bin/kafka-topics.sh --create \
--bootstrap-server localhost:9092 \
--topic test-topic \
--partitions 3 \
--replication-factor 1
说明:
partitions=3:3 个分区,支持 3 个消费者并行消费replication-factor=1:单副本(测试用,生产环境至少 3 副本)
5.1.3 Producer 发送消息(Java 示例)
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class ProducerExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "all"); // 等待所有 ISR 副本确认
props.put("retries", Integer.MAX_VALUE); // 自动重试
props.put("enable.idempotence", true); // 开启幂等性
Producer<String, String> producer = new KafkaProducer<>(props);
try {
for (int i = 0; i < 100; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>(
"test-topic",
"key-" + i,
"value-" + i
);
producer.send(record, (metadata, exception) -> {
if (exception != null) {
System.err.println("发送失败: " + exception.getMessage());
} else {
System.out.printf("发送成功: partition=%d, offset=%d%n",
metadata.partition(), metadata.offset());
}
});
}
} finally {
producer.close(); // 等待所有消息发送完成
}
}
}
关键点:
acks=all:确保消息持久化到所有 ISR 副本enable.idempotence=true:避免重复消息- 异步发送 + 回调:提高吞吐量,同时处理失败
5.1.4 Consumer 消费消息(Java 示例)
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.*;
public class ConsumerExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("enable.auto.commit", "false"); // 手动提交偏移量
props.put("auto.offset.reset", "earliest"); // 从最早的偏移量开始
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("test-topic"));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("接收消息: partition=%d, offset=%d, key=%s, value=%s%n",
record.partition(), record.offset(), record.key(), record.value());
// 处理消息逻辑
processRecord(record);
}
// 手动提交偏移量(批量提交,提高性能)
consumer.commitSync();
}
} finally {
consumer.close();
}
}
private static void processRecord(ConsumerRecord<String, String> record) {
// 业务逻辑处理
}
}
关键点:
enable.auto.commit=false:手动提交,避免消息丢失commitSync():同步提交,确保偏移量持久化后再拉取新消息- 异常处理:捕获反序列化异常、处理失败等
5.2 示例 2:扩展点/插件接入
5.2.1 自定义 Partitioner
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
public class CustomPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
int numPartitions = cluster.partitionCountForTopic(topic);
// 自定义分区逻辑:根据 key 的哈希值分区
if (key == null) {
return 0; // 默认分区
}
return Math.abs(key.hashCode()) % numPartitions;
}
@Override
public void close() {
// 清理资源
}
@Override
public void configure(Map<String, ?> configs) {
// 初始化配置
}
}
配置:
props.put("partitioner.class", "com.example.CustomPartitioner");
5.2.2 自定义 Interceptor
import org.apache.kafka.clients.producer.*;
import java.util.Map;
public class MetricsProducerInterceptor implements ProducerInterceptor<String, String> {
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
// 发送前拦截:添加时间戳头
record.headers().add("send-time",
String.valueOf(System.currentTimeMillis()).getBytes());
return record;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
// 发送后拦截:记录指标
if (exception != null) {
System.err.println("发送失败: " + exception.getMessage());
} else {
System.out.println("发送成功: offset=" + metadata.offset());
}
}
@Override
public void close() {}
@Override
public void configure(Map<String, ?> configs) {}
}
配置:
props.put("interceptor.classes", "com.example.MetricsProducerInterceptor");
5.3 示例 3:规模化/上线注意事项清单
5.3.1 硬件规划
- CPU:每个 Broker 至少 8 核,网络和压缩密集型任务需更多核心
- 内存:32GB 起步,页缓存越大越好(建议操作系统总内存的 50%-75% 用于页缓存)
- 磁盘:
- SSD 优于 HDD(提升 I/O 性能)
- RAID 10(兼顾性能和冗余)
- 独立磁盘用于日志存储(避免与操作系统共享)
- 网络:10Gbps 网络(大规模集群需更高带宽)
5.3.2 Broker 配置优化
# 网络与线程
num.network.threads=8
num.io.threads=16
num.replica.fetchers=4
# 日志配置
log.segment.bytes=1073741824
log.retention.hours=168
log.retention.check.interval.ms=300000
log.cleaner.enable=true
# 副本配置
default.replication.factor=3
min.insync.replicas=2
unclean.leader.election.enable=false
# 性能优化
compression.type=lz4
message.max.bytes=10485760
replica.fetch.max.bytes=10485760
5.3.3 Topic 设计原则
- 分区数:根据吞吐量需求计算
- 单分区吞吐量:约 10-50 MB/s(取决于硬件和消息大小)
- 分区数 = 目标吞吐量 / 单分区吞吐量
- 预留 20%-30% 余量
- 副本因子:生产环境至少 3 副本
- 保留策略:
- 时间保留:
retention.ms - 大小保留:
retention.bytes - 压缩日志:
cleanup.policy=compact(用于状态存储)
- 时间保留:
5.3.4 客户端最佳实践
Producer:
- 开启幂等性:
enable.idempotence=true - 批量发送:
batch.size=32KB,linger.ms=10 - 压缩:
compression.type=lz4(或 snappy/zstd) - 合理设置超时:
delivery.timeout.ms=120000 - 异常处理:捕获
TimeoutException,SerializationException等
Consumer:
- 消费者组大小 ≤ 分区数(避免空闲消费者)
- 手动提交偏移量:确保消息处理完成后再提交
- 控制拉取批次:
max.poll.records=500(避免超时) - 合理设置超时:
session.timeout.ms=30000,max.poll.interval.ms=300000 - 处理再平衡:实现
ConsumerRebalanceListener
5.3.5 监控与告警
关键指标:
UnderReplicatedPartitions > 0:副本掉队OfflinePartitionsCount > 0:分区离线IsrShrinksPerSec > 0:ISR 频繁变化LeaderElectionRateAndTimeMs > 0:Leader 频繁切换- CPU、内存、磁盘、网络使用率
告警阈值:
UnderReplicatedPartitions持续 5 分钟 > 0OfflinePartitionsCount> 0- Broker 磁盘使用率 > 80%
- Consumer Lag 持续增长
5.3.6 故障演练
- 单 Broker 宕机:验证 Leader 自动切换
- 网络分区:验证 ISR 机制和数据不丢失
- 磁盘故障:验证日志目录隔离
- 滚动升级:验证零停机更新
6. 文档索引
本项目源码剖析文档按模块组织,每个模块包含以下子文档:
- 模块-概览.md:模块职责、架构图、核心流程
- 模块-API.md:对外 API 详细规格、请求/响应结构体、时序图
- 模块-数据结构.md:关键数据结构、UML 类图
- 模块-时序图.md:典型调用场景时序图
6.1 核心模块文档
-
Kafka-01-Clients-概览.md - 客户端模块总览
-
Kafka-01-Clients-API.md - Producer/Consumer API
-
Kafka-01-Clients-数据结构.md - 客户端数据结构
-
Kafka-01-Clients-时序图.md - 客户端调用时序
-
Kafka-02-Core-概览.md - Broker 核心模块
-
Kafka-02-Core-API.md - Broker 协议 API
-
Kafka-02-Core-数据结构.md - Broker 数据结构
-
Kafka-02-Core-时序图.md - Broker 处理时序
6.2 协调器模块文档
- Kafka-06-GroupCoordinator-概览.md - 消费者组协调器
- Kafka-07-TransactionCoordinator-概览.md - 事务协调器
- Kafka-11-ShareCoordinator-概览.md - 共享协调器
6.3 存储与元数据模块
- Kafka-03-Metadata-概览.md - 元数据管理
- Kafka-04-Storage-概览.md - 存储层
- Kafka-05-Raft-概览.md - Raft 共识协议
6.4 扩展框架模块
- Kafka-08-Connect-概览.md - Kafka Connect
- Kafka-09-Streams-概览.md - Kafka Streams
7. 参考资源
7.1 官方文档
7.2 源码仓库
7.3 社区资源
Kafka 核心机制与生产实践
Broker 核心架构
-
网络处理层:
- SocketServer:基于 Java NIO 的非阻塞网络服务器,监听多个端口(PLAINTEXT、SSL、SASL)。
- Acceptor:每个端口一个 Acceptor 线程,接受新连接并轮询分配给 Processor。
- Processor:网络线程池(默认 3 个),处理读写请求,将请求放入 RequestChannel。
- RequestChannel:请求队列(容量 500),解耦网络层和处理层。
-
请求处理层:
- KafkaApis:路由请求到具体处理方法(Produce、Fetch、Metadata、OffsetCommit 等)。
- RequestHandlerPool:处理线程池(默认 8 个),从 RequestChannel 取请求并调用 KafkaApis。
- DelayedRequestPurgatory:延迟请求管理(Produce acks=all、Fetch minBytes、Join Group 等),超时或条件满足时触发回调。
-
副本管理层:
- ReplicaManager:管理所有分区副本,协调 Leader 写入和 Follower 复制。
- ISR 管理:维护 In-Sync Replicas 集合,动态调整(lag 超过
replica.lag.time.max.ms移出 ISR)。 - ReplicaFetcher:Follower 通过 Fetcher 线程从 Leader 拉取数据,批量复制。
- 高水位(HW):ISR 中最小的 LEO(Log End Offset),消费者只能读到 HW 之前的数据。
-
存储管理层:
- LogManager:管理所有 Topic-Partition 的日志,负责日志创建、删除、清理。
- UnifiedLog:单个分区的抽象,由多个 LogSegment 组成。
- LogSegment:日志段(默认 1GB),包含 .log、.index、.timeindex、.txnindex 文件。
- LogCleaner:后台清理线程,支持 delete(删除过期段)和 compact(压缩保留最新值)策略。
存储引擎深度
-
分段日志设计:
- 日志按 Segment 分段存储(基于大小或时间),方便过期删除和并行读写。
- Active Segment:当前写入段,其他为 Inactive Segment(只读)。
- 文件命名:baseOffset.log(如 00000000000000000000.log),便于二分查找。
-
索引机制:
- 偏移量索引(.index):稀疏索引,映射相对偏移量到物理位置(4KB 一个条目)。
- 时间戳索引(.timeindex):映射时间戳到偏移量,支持按时间查询。
- 事务索引(.txnindex):记录 aborted transactions,过滤未提交消息。
- 查找流程:定位 Segment(二分查找) → 索引二分查找 → 日志顺序扫描。
-
零拷贝技术:
- sendfile 系统调用:数据从磁盘直接传输到网络,避免用户空间拷贝。
- 条件:未压缩或 Broker 和 Consumer 压缩格式一致,消息格式相同。
- 性能提升:减少 CPU 开销和内存拷贝,大幅提升吞吐量。
-
页缓存优化:
- 依赖操作系统页缓存:写入和读取都经过页缓存,顺序 I/O 性能接近内存。
- 预读(Readahead):操作系统自动预读后续数据,适合顺序消费场景。
- 避免堆内缓存:减少 GC 压力,利用系统内存管理。
-
日志清理策略:
- delete:基于时间(
retention.ms)或大小(retention.bytes)删除过期段。 - compact:保留每个 Key 的最新值,适用于变更日志(Changelog)场景。
- 清理流程:选择高脏数据比例的日志 → 构建偏移量映射 → 重写保留部分 → 替换旧段。
- delete:基于时间(
客户端实现机制
-
Producer 架构:
- 序列化器:将对象序列化为字节数组(StringSerializer、JsonSerializer、Avro)。
- 分区器:根据 Key 哈希或自定义逻辑选择分区(默认 murmur2 哈希)。
- RecordAccumulator:按分区缓存消息,批量发送(
batch.size: 16KB,linger.ms: 0)。 - Sender:后台线程从 Accumulator 取批次,通过 NetworkClient 发送到 Broker。
- 幂等性:
enable.idempotence: true,Broker 去重(PID + Sequence Number)。 - 事务性:
transactional.id启用事务,跨分区原子写入(Two-Phase Commit)。
-
Producer 关键配置:
acks:0(不等待),1(Leader 确认),all/-1(ISR 全部确认)。retries:重试次数(默认 Integer.MAX_VALUE),配合delivery.timeout.ms。max.in.flight.requests.per.connection:未确认请求数(默认 5),设为 1 保证顺序。compression.type:none/gzip/snappy/lz4/zstd,减少网络和存储开销。
-
Consumer 架构:
- 订阅模式:subscribe(自动分配分区)、assign(手动指定分区)。
- ConsumerCoordinator:与 GroupCoordinator 交互,处理心跳、Rebalance、偏移量提交。
- Fetcher:从 Broker 拉取消息(
fetch.min.bytes: 1,fetch.max.wait.ms: 500)。 - 反序列化器:将字节数组反序列化为对象。
- 偏移量管理:自动提交(
enable.auto.commit: true,auto.commit.interval.ms: 5000)或手动提交。
-
Consumer Rebalance:
- 触发条件:成员加入/离开、订阅变更、分区数变化。
- 协议:Range(默认,按主题均分)、RoundRobin(全局轮询)、Sticky(尽量保留原分配)、CooperativeSticky(增量 Rebalance)。
- 流程:JoinGroup(选举 Leader) → SyncGroup(Leader 分配分区,广播给成员) → Heartbeat(维持成员资格)。
- 优化:增大
session.timeout.ms(默认 45s),减少max.poll.interval.ms(默认 5min),避免频繁 Rebalance。
协调器机制
-
GroupCoordinator:
- 管理消费者组:成员管理、分区分配、偏移量管理。
- 偏移量存储:内部主题
__consumer_offsets(50 个分区,压缩日志)。 - 心跳检测:成员定期发送心跳,超时移出组触发 Rebalance。
-
TransactionCoordinator:
- 管理事务:事务状态存储在内部主题
__transaction_state(50 个分区)。 - 两阶段提交:Prepare(写入所有分区) → Commit(写入事务标记,更新状态)。
- 隔离级别:read_uncommitted(读所有)、read_committed(过滤未提交)。
- 管理事务:事务状态存储在内部主题
KRaft 共识协议
-
架构演进:
- 替代 ZooKeeper:移除外部依赖,简化部署和运维。
- Controller Quorum:专用 Controller 节点(或混合模式)运行 Raft 协议。
- 元数据日志:
__cluster_metadata主题存储元数据变更日志。
-
Raft 实现:
- Leader 选举:投票机制,多数派确认,任期(Epoch)递增。
- 日志复制:Leader 追加日志,Follower 复制,多数派确认后提交。
- 快照:定期生成快照(默认 100 万条记录),加速恢复。
-
元数据管理:
- MetadataCache:Broker 本地缓存元数据(Topic、Partition、Replica、Config)。
- 增量更新:Broker 订阅元数据日志,接收增量变更(Delta)并应用。
- 一致性:所有 Broker 通过日志同步保证元数据最终一致。
网络协议架构
-
协议设计:
- 二进制协议:基于 TLV(Type-Length-Value),紧凑高效。
- 请求-响应模型:请求携带 CorrelationId,响应匹配 ID。
- API 版本:支持多版本兼容,客户端和 Broker 协商最高支持版本。
-
关键 API:
- Produce:生产消息,acks 控制确认级别。
- Fetch:拉取消息,支持批量拉取和长轮询。
- Metadata:查询集群元数据(Topic、Partition、Leader、ISR)。
- OffsetCommit/OffsetFetch:提交和查询偏移量。
- JoinGroup/SyncGroup/Heartbeat:消费者组协调。
- ApiVersions:查询 Broker 支持的 API 版本。
-
连接管理:
- 长连接:客户端和 Broker 保持长连接,复用连接发送请求。
- 多路复用:一个连接并发发送多个请求(In-Flight Requests)。
- 连接池:客户端为每个 Broker 维护连接,避免频繁建立。
性能优化实践
-
生产端优化:
- 批量发送:增大
batch.size(如 32KB),增加linger.ms(如 10ms)。 - 压缩:启用 lz4 或 zstd 压缩,降低网络和存储开销。
- 异步发送:使用回调处理响应,避免阻塞。
- 分区数:根据吞吐量设置分区数(经验:1 个分区 10-30MB/s)。
- 批量发送:增大
-
消费端优化:
- 并行消费:多消费者实例,每个实例处理部分分区。
- 批量拉取:增大
fetch.min.bytes(如 1MB),减少请求次数。 - 手动提交:批处理后手动提交偏移量,减少提交频率。
- 预读:增大
max.partition.fetch.bytes(默认 1MB),提升吞吐。
-
Broker 优化:
- JVM 堆内存:6-8GB,避免过大导致 GC 停顿。
- 文件描述符:至少 10 万,支持大量分区和连接。
- 网络线程:
num.network.threads: 8(根据 CPU 核数调整)。 - I/O 线程:
num.io.threads: 8,处理磁盘读写。 - 页缓存:预留足够系统内存用于页缓存(总内存 - JVM 堆 - 系统开销)。
监控与运维
-
关键指标:
- Broker 指标:RequestsPerSec(QPS)、BytesInPerSec/BytesOutPerSec(吞吐量)、TotalTimeMs(请求延迟)、UnderReplicatedPartitions(未同步副本数)。
- Producer 指标:record-send-rate(发送速率)、record-error-rate(错误率)、request-latency-avg(延迟)。
- Consumer 指标:records-consumed-rate(消费速率)、fetch-latency-avg(拉取延迟)、records-lag-max(消费滞后)。
-
常见问题排查:
- 副本不同步:检查 Follower 是否宕机、网络延迟、磁盘 I/O 瓶颈。
- 消费滞后:增加消费者实例、优化处理逻辑、调整拉取参数。
- Rebalance 频繁:增大
session.timeout.ms、减少max.poll.interval.ms、优化处理时间。 - 磁盘满:调整保留策略(
retention.ms、retention.bytes),启用压缩。 - 高延迟:检查网络、磁盘 I/O、GC 停顿、请求队列积压。
-
安全加固:
- 认证:SASL/PLAIN、SASL/SCRAM、SASL/GSSAPI(Kerberos)。
- 授权:ACL(Access Control List)控制主题和组的访问权限。
- 加密:SSL/TLS 加密传输,保护数据隐私。
- 审计:记录管理操作日志,追踪配置变更。
Kafka Connect 框架
-
架构:
- Source Connector:从外部系统导入数据到 Kafka(如数据库、文件系统)。
- Sink Connector:从 Kafka 导出数据到外部系统(如 HDFS、Elasticsearch)。
- Task:Connector 分解为多个 Task 并行执行。
- Worker:运行 Connector 和 Task 的进程,支持单机和分布式模式。
-
分布式模式:
- 配置存储:内部主题
connect-configs、connect-offsets、connect-status。 - 容错:Task 失败自动重启,Worker 故障重新分配 Task。
- 扩展性:增加 Worker 实例,自动平衡 Task 分配。
- 配置存储:内部主题
Kafka Streams 引擎
-
流处理模型:
- 拓扑(Topology):定义流处理逻辑(Source → Processor → Sink)。
- 流和表:KStream(事件流)、KTable(变更日志)、GlobalKTable(全局表)。
- 窗口:Tumbling(固定窗口)、Hopping(滑动窗口)、Session(会话窗口)。
-
状态管理:
- State Store:本地 RocksDB 存储状态,支持持久化和容错。
- Changelog Topic:状态变更写入 Kafka 主题,用于恢复。
- Interactive Queries:查询本地状态,支持分布式状态聚合。
-
容错与扩展:
- 任务分配:按分区分配 Task,自动负载均衡。
- Rebalance:实例加入/离开触发 Rebalance,重新分配 Task。
- Exactly-Once:事务性处理,保证端到端精确一次语义。