1. 项目概述

Apache Pulsar 是一个分布式发布-订阅消息系统,具有灵活的消息模型和直观的客户端 API。它专为云原生环境而设计,支持多租户、水平扩展、强一致性保证和低延迟持久存储。

1.1 核心特性

  • 水平可扩展性:支持数百万独立主题和每秒数百万消息
  • 强一致性保证:提供消息顺序和数据一致性保障
  • 低延迟持久存储:基于 Apache BookKeeper 的分层存储架构
  • 多租户支持:内置认证、授权、配额管理
  • 地理复制:跨区域数据复制和灾难恢复
  • 函数计算:内置 Serverless 计算框架

2. 整体架构设计

2.1 核心组件架构图

graph TB
    subgraph "Client Layer"
        PC[Producer Client]
        CC[Consumer Client]
        AC[Admin Client]
        RC[Reader Client]
    end
    
    subgraph "Proxy Layer"
        PP[Pulsar Proxy]
        WS[WebSocket Service]
    end
    
    subgraph "Broker Layer"
        B1[Broker 1]
        B2[Broker 2]
        B3[Broker N]
        BS[Broker Service]
        NS[Namespace Service]
        LM[Load Manager]
    end
    
    subgraph "Storage Layer"
        ML[Managed Ledger]
        BK[BookKeeper Cluster]
        BK1[Bookie 1]
        BK2[Bookie 2]
        BK3[Bookie N]
    end
    
    subgraph "Metadata Layer"
        ZK[ZooKeeper/RocksDB]
        MS[Metadata Store]
    end
    
    subgraph "Function Layer"
        FW[Functions Worker]
        FC[Function Container]
    end
    
    PC --> PP
    CC --> PP
    AC --> PP
    RC --> PP
    
    PP --> B1
    PP --> B2
    PP --> B3
    
    WS --> B1
    WS --> B2
    
    B1 --> ML
    B2 --> ML
    B3 --> ML
    
    ML --> BK
    BK --> BK1
    BK --> BK2
    BK --> BK3
    
    B1 --> MS
    B2 --> MS
    B3 --> MS
    MS --> ZK
    
    B1 --> FW
    FW --> FC

2.2 分层架构说明

2.2.1 客户端层 (Client Layer)

  • Producer:消息生产者,负责发送消息到指定主题
  • Consumer:消息消费者,从订阅中接收消息
  • Reader:消息阅读器,从指定位置读取消息
  • Admin Client:管理客户端,用于集群管理和配置

2.2.2 代理层 (Proxy Layer)

  • Pulsar Proxy:协议代理,处理客户端连接和负载均衡
  • WebSocket Service:WebSocket 协议支持

2.2.3 服务层 (Broker Layer)

  • Broker Service:核心消息服务,处理生产和消费请求
  • Namespace Service:命名空间管理,主题分配和负载均衡
  • Load Manager:负载管理器,监控集群负载并决策分配

2.2.4 存储层 (Storage Layer)

  • Managed Ledger:管理账本抽象层,提供统一的存储接口
  • BookKeeper:分布式日志存储系统,提供持久化保障

2.2.5 元数据层 (Metadata Layer)

  • Metadata Store:元数据存储,支持 ZooKeeper 和 RocksDB
  • Configuration Store:全局配置存储

2.2.6 函数计算层 (Function Layer)

  • Functions Worker:函数工作节点,管理函数生命周期
  • Function Container:函数执行容器

3. 消息流转时序图

3.1 消息发送时序图

sequenceDiagram
    participant P as Producer
    participant B as Broker
    participant ML as ManagedLedger
    participant BK as BookKeeper
    participant MS as MetadataStore
    
    P->>B: 1. 建立连接 (Connect)
    B->>MS: 2. 查询主题元数据
    MS-->>B: 3. 返回主题信息
    B-->>P: 4. 连接确认 (Connected)
    
    P->>B: 5. 创建 Producer (CreateProducer)
    B->>ML: 6. 打开 ManagedLedger
    ML->>BK: 7. 创建/打开 Ledger
    BK-->>ML: 8. Ledger 句柄
    ML-->>B: 9. ManagedLedger 就绪
    B-->>P: 10. Producer 创建成功
    
    P->>B: 11. 发送消息 (Send)
    B->>ML: 12. 写入消息到 Ledger
    ML->>BK: 13. 追加 Entry
    BK-->>ML: 14. 写入确认
    ML-->>B: 15. 位置信息 (Position)
    B-->>P: 16. 发送确认 (SendResponse)

3.2 消息消费时序图

sequenceDiagram
    participant C as Consumer
    participant B as Broker
    participant ML as ManagedLedger
    participant BK as BookKeeper
    participant MC as ManagedCursor
    
    C->>B: 1. 建立连接并订阅 (Subscribe)
    B->>ML: 2. 打开 ManagedLedger
    ML->>MC: 3. 创建/恢复 ManagedCursor
    MC->>BK: 4. 读取 Cursor 位置
    BK-->>MC: 5. Cursor 状态
    MC-->>ML: 6. Cursor 就绪
    ML-->>B: 7. 订阅成功
    B-->>C: 8. 订阅确认
    
    B->>ML: 9. 读取消息 (Read)
    ML->>BK: 10. 从 Ledger 读取 Entry
    BK-->>ML: 11. Entry 数据
    ML-->>B: 12. 消息数据
    B->>C: 13. 推送消息 (Message)
    
    C->>B: 14. 消息确认 (Ack)
    B->>MC: 15. 更新 Cursor 位置
    MC->>BK: 16. 持久化 Cursor
    BK-->>MC: 17. 写入确认
    MC-->>B: 18. 确认完成

4. 核心模块交互图

graph LR
    subgraph "Pulsar Broker"
        BS[BrokerService]
        NS[NamespaceService]
        LB[LoadBalancer]
        AS[AuthService]
    end
    
    subgraph "Managed Ledger"
        ML[ManagedLedgerImpl]
        MC[ManagedCursor]
        EC[EntryCache]
    end
    
    subgraph "BookKeeper"
        BKC[BookKeeper Client]
        LH[LedgerHandle]
        RH[ReadHandle]
    end
    
    subgraph "Metadata"
        MS[MetadataStore]
        ZK[ZooKeeper]
        RDB[RocksDB]
    end
    
    BS --> NS
    BS --> LB
    BS --> AS
    BS --> ML
    
    ML --> MC
    ML --> EC
    ML --> BKC
    
    BKC --> LH
    BKC --> RH
    
    NS --> MS
    LB --> MS
    MC --> MS
    
    MS --> ZK
    MS --> RDB

5. 关键数据结构概览

5.1 消息相关数据结构

/**
 * 消息元数据结构
 * 包含消息的所有元信息,如消息ID、生产时间、属性等
 */
public class MessageMetadata {
    // 生产者名称
    private String producerName;
    // 消息序列号
    private long sequenceId;
    // 发布时间
    private long publishTime;
    // 消息属性
    private Map<String, String> properties;
    // 批量消息数量
    private int numMessagesInBatch;
    // 消息键
    private String partitionKey;
    // 消息压缩类型
    private CompressionType compressionType;
}

/**
 * 消息ID结构
 * 用于唯一标识一条消息的位置
 */
public class MessageIdImpl implements MessageId {
    // Ledger ID
    private final long ledgerId;
    // Entry ID
    private final long entryId;
    // 分区索引
    private final int partitionIndex;
    // 批量消息索引
    private final int batchIndex;
}

5.2 存储相关数据结构

/**
 * 管理账本配置
 * 定义了 ManagedLedger 的行为和性能参数
 */
public class ManagedLedgerConfig {
    // 最大 Entry 大小
    private int maxEntrySize = 5 * 1024 * 1024;
    // Ledger 滚动时间
    private long retentionTimeMillis;
    // Ledger 滚动大小
    private long retentionSizeInMB;
    // 缓存大小
    private long entryCacheSizeMB = 256;
    // 数字签名类型
    private BookKeeper.DigestType digestType = BookKeeper.DigestType.CRC32;
}

/**
 * 位置信息
 * 表示消息在存储中的具体位置
 */
public class PositionImpl implements Position {
    // Ledger ID
    private final long ledgerId;
    // Entry ID  
    private final long entryId;
    
    // 比较位置大小
    public int compareTo(Position other) {
        PositionImpl otherPosition = (PositionImpl) other;
        if (ledgerId != otherPosition.ledgerId) {
            return Long.compare(ledgerId, otherPosition.ledgerId);
        }
        return Long.compare(entryId, otherPosition.entryId);
    }
}

5.3 消费者状态数据结构

/**
 * 管理游标配置
 * 控制消费者的行为和性能
 */
public class ManagedCursorConfig {
    // 游标持久化间隔
    private long cursorUpdateTimeIntervalSeconds = 60;
    // 最大未确认消息数
    private int maxUnackedMessages = 10000;
    // 消费者优先级
    private int consumerPriority = 0;
    // 订阅类型
    private SubscriptionType subscriptionType;
}

/**
 * 游标信息
 * 维护消费者的消费位置和状态
 */
public class ManagedCursorInfo {
    // 当前游标位置
    private PositionInfo cursorsLedgerId;
    // 未确认消息范围
    private List<MessageRange> individualDeletedMessages;
    // 游标属性
    private Map<String, String> properties;
}

6. 配置管理

6.1 Broker 配置示例

# 基本配置
clusterName=pulsar-cluster
zookeeperServers=localhost:2181
configurationStoreServers=localhost:2181

# 网络配置
brokerServicePort=6650
brokerServicePortTls=6651
webServicePort=8080
webServicePortTls=8443

# 存储配置
managedLedgerDefaultEnsembleSize=3
managedLedgerDefaultWriteQuorum=3
managedLedgerDefaultAckQuorum=2
managedLedgerMaxEntriesPerLedger=50000

# 负载均衡配置
loadBalancerEnabled=true
loadBalancerPlacementStrategy=weightedRandomSelection
loadBalancerReportUpdateThresholdPercentage=10

# 认证授权配置
authenticationEnabled=false
authorizationEnabled=false
superUserRoles=admin,superuser

6.2 客户端配置示例

// Producer 配置
PulsarClient client = PulsarClient.builder()
    .serviceUrl("pulsar://localhost:6650")
    .build();

Producer<String> producer = client.newProducer(Schema.STRING)
    .topic("my-topic")
    .producerName("my-producer")
    .sendTimeout(10, TimeUnit.SECONDS)
    .maxPendingMessages(1000)
    .batchingMaxMessages(100)
    .batchingMaxPublishDelay(100, TimeUnit.MILLISECONDS)
    .compressionType(CompressionType.LZ4)
    .create();

// Consumer 配置
Consumer<String> consumer = client.newConsumer(Schema.STRING)
    .topic("my-topic")
    .subscriptionName("my-subscription")
    .subscriptionType(SubscriptionType.Shared)
    .receiverQueueSize(1000)
    .maxUnackedMessages(1000)
    .ackTimeout(60, TimeUnit.SECONDS)
    .subscribe();

7. 部署架构

7.1 生产环境部署架构

graph TB
    subgraph "Load Balancer"
        LB[HAProxy/Nginx]
    end
    
    subgraph "Pulsar Proxy Cluster"
        P1[Proxy 1]
        P2[Proxy 2] 
        P3[Proxy 3]
    end
    
    subgraph "Pulsar Broker Cluster"
        B1[Broker 1]
        B2[Broker 2]
        B3[Broker 3]
        B4[Broker 4]
        B5[Broker 5]
        B6[Broker 6]
    end
    
    subgraph "BookKeeper Cluster"
        BK1[Bookie 1]
        BK2[Bookie 2] 
        BK3[Bookie 3]
        BK4[Bookie 4]
        BK5[Bookie 5]
    end
    
    subgraph "ZooKeeper Ensemble"
        Z1[ZK 1]
        Z2[ZK 2]
        Z3[ZK 3]
    end
    
    subgraph "Functions Workers"
        F1[Function Worker 1]
        F2[Function Worker 2]
        F3[Function Worker 3]
    end
    
    LB --> P1
    LB --> P2
    LB --> P3
    
    P1 --> B1
    P1 --> B2
    P2 --> B3
    P2 --> B4
    P3 --> B5
    P3 --> B6
    
    B1 --> BK1
    B1 --> BK2
    B2 --> BK2
    B2 --> BK3
    B3 --> BK3
    B3 --> BK4
    
    B1 --> Z1
    B2 --> Z2
    B3 --> Z3
    
    B1 --> F1
    B2 --> F2
    B3 --> F3

8. 性能优化要点

8.1 生产者优化

  • 批量发送:启用消息批量发送以提高吞吐量
  • 异步发送:使用异步 API 避免阻塞
  • 压缩算法:选择合适的压缩算法平衡CPU和网络
  • 连接池:复用客户端连接减少建连开销

8.2 消费者优化

  • 接收队列:合理设置 receiverQueueSize
  • 批量接收:使用 batchReceive API
  • 并行消费:使用多个 Consumer 实例
  • 确认策略:选择合适的确认模式

8.3 Broker 优化

  • 缓存配置:合理设置 Entry Cache 大小
  • IO 线程:根据负载调整 IO 线程数
  • JVM 参数:优化 GC 配置和堆内存
  • 磁盘IO:使用 SSD 和合理的文件系统

8.4 BookKeeper 优化

  • Ensemble 配置:合理设置副本数量
  • Journal 磁盘:独立的 Journal 磁盘
  • 内存配置:足够的 DirectMemory
  • 批量写入:启用批量写入优化

9. 监控和运维

9.1 关键监控指标

  • 吞吐量指标:messages/s, MB/s
  • 延迟指标:publish latency, consume latency
  • 存储指标:storage size, backlog size
  • 连接指标:active connections, failed connections
  • JVM 指标:heap usage, GC metrics

9.2 故障排查

  • 日志分析:结合 broker、bookie 日志
  • 元数据检查:验证 ZooKeeper 数据一致性
  • 网络诊断:检查网络连通性和延迟
  • 资源监控:CPU、内存、磁盘、网络使用率

这个整体架构分析为深入理解 Apache Pulsar 奠定了基础。接下来我们将深入分析各个核心模块的实现细节。