1. 环境准备

1.1 快速启动 Standalone 模式

# 下载并启动 Pulsar Standalone
bin/pulsar standalone

# 验证服务状态
curl http://localhost:8080/admin/v2/clusters

1.2 Docker 方式启动

# 启动 Pulsar standalone 容器
docker run -it \
  -p 6650:6650 \
  -p 8080:8080 \
  --mount source=pulsardata,target=/pulsar/data \
  --mount source=pulsarconf,target=/pulsar/conf \
  apachepulsar/pulsar:latest \
  bin/pulsar standalone

2. 基础使用示例

2.1 Java 客户端基础使用

2.1.1 项目依赖配置

<!-- Maven 依赖配置 -->
<dependencies>
    <dependency>
        <groupId>org.apache.pulsar</groupId>
        <artifactId>pulsar-client</artifactId>
        <version>4.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.pulsar</groupId>
        <artifactId>pulsar-client-admin</artifactId>
        <version>4.2.0</version>
    </dependency>
</dependencies>

2.1.2 简单生产者示例

package org.apache.pulsar.examples;

import org.apache.pulsar.client.api.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * 简单消息生产者示例
 * 演示如何创建生产者并发送消息到 Pulsar 主题
 */
public class SimpleProducer {
    private static final Logger log = LoggerFactory.getLogger(SimpleProducer.class);
    
    public static void main(String[] args) throws Exception {
        // 1. 创建 Pulsar 客户端
        // serviceUrl: Pulsar broker 的服务地址
        // operationTimeout: 操作超时时间
        // connectionsPerBroker: 每个 broker 的连接数
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .operationTimeout(30, java.util.concurrent.TimeUnit.SECONDS)
                .connectionsPerBroker(5)
                .build();
        
        try {
            // 2. 创建生产者
            // topic: 目标主题名称
            // producerName: 生产者名称(可选,系统会自动生成)
            // sendTimeout: 发送超时时间
            // maxPendingMessages: 最大待发送消息数
            Producer<String> producer = client.newProducer(Schema.STRING)
                    .topic("simple-topic")
                    .producerName("simple-producer")
                    .sendTimeout(10, java.util.concurrent.TimeUnit.SECONDS)
                    .maxPendingMessages(100)
                    .create();
            
            // 3. 发送消息
            for (int i = 0; i < 10; i++) {
                String message = "Hello Pulsar! Message " + i;
                
                // 同步发送消息
                // send() 方法会阻塞直到消息被 broker 确认
                MessageId msgId = producer.send(message);
                log.info("发送消息成功: {} -> MessageId: {}", message, msgId);
            }
            
            // 4. 关闭生产者
            producer.close();
        } finally {
            // 5. 关闭客户端
            client.close();
        }
    }
}

2.1.3 异步生产者示例

package org.apache.pulsar.examples;

import org.apache.pulsar.client.api.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/**
 * 异步消息生产者示例
 * 演示如何使用异步 API 提高消息发送性能
 */
public class AsyncProducer {
    private static final Logger log = LoggerFactory.getLogger(AsyncProducer.class);
    
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();
        
        try {
            // 创建生产者配置
            Producer<String> producer = client.newProducer(Schema.STRING)
                    .topic("async-topic")
                    .producerName("async-producer")
                    // 启用批量发送以提高性能
                    .batchingMaxMessages(100)
                    .batchingMaxPublishDelay(100, TimeUnit.MILLISECONDS)
                    // 启用压缩减少网络传输
                    .compressionType(CompressionType.LZ4)
                    .create();
            
            // 异步发送多条消息
            for (int i = 0; i < 100; i++) {
                String message = "Async message " + i;
                
                // 异步发送消息
                // sendAsync() 立即返回 CompletableFuture,不阻塞当前线程
                CompletableFuture<MessageId> future = producer.sendAsync(message);
                
                // 设置回调处理发送结果
                final int messageIndex = i;
                future.thenAccept(messageId -> {
                    log.info("消息 {} 发送成功,MessageId: {}", messageIndex, messageId);
                }).exceptionally(ex -> {
                    log.error("消息 {} 发送失败: {}", messageIndex, ex.getMessage());
                    return null;
                });
            }
            
            // 等待所有消息发送完成
            producer.flush();
            Thread.sleep(2000); // 等待回调完成
            
            producer.close();
        } finally {
            client.close();
        }
    }
}

2.1.4 简单消费者示例

package org.apache.pulsar.examples;

import org.apache.pulsar.client.api.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * 简单消息消费者示例
 * 演示如何创建消费者并从 Pulsar 主题接收消息
 */
public class SimpleConsumer {
    private static final Logger log = LoggerFactory.getLogger(SimpleConsumer.class);
    
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();
        
        try {
            // 创建消费者
            // topic: 订阅的主题
            // subscriptionName: 订阅名称,用于跟踪消费进度
            // subscriptionType: 订阅类型(Exclusive, Shared, Failover, Key_Shared)
            Consumer<String> consumer = client.newConsumer(Schema.STRING)
                    .topic("simple-topic")
                    .subscriptionName("simple-subscription")
                    .subscriptionType(SubscriptionType.Exclusive)
                    // 设置接收队列大小,预拉取消息数量
                    .receiverQueueSize(100)
                    // 设置确认超时时间,超时未确认的消息会重新投递
                    .ackTimeout(60, java.util.concurrent.TimeUnit.SECONDS)
                    .subscribe();
            
            // 接收并处理消息
            while (true) {
                try {
                    // 阻塞等待接收消息,超时时间 5 秒
                    Message<String> message = consumer.receive(5, java.util.concurrent.TimeUnit.SECONDS);
                    
                    if (message != null) {
                        // 处理消息内容
                        String content = message.getValue();
                        MessageId messageId = message.getMessageId();
                        
                        log.info("接收到消息: {} -> MessageId: {}", content, messageId);
                        log.info("消息属性: {}", message.getProperties());
                        log.info("发布时间: {}", message.getPublishTime());
                        
                        // 确认消息处理完成
                        // 确认后该消息不会再次投递给这个订阅
                        consumer.acknowledge(message);
                        
                        // 检查是否需要退出
                        if (content.contains("exit")) {
                            log.info("收到退出指令,停止消费");
                            break;
                        }
                    } else {
                        log.info("接收消息超时,继续等待...");
                    }
                } catch (PulsarClientException e) {
                    log.error("接收消息失败: {}", e.getMessage());
                }
            }
            
            consumer.close();
        } finally {
            client.close();
        }
    }
}

2.1.5 异步消费者示例

package org.apache.pulsar.examples;

import org.apache.pulsar.client.api.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * 异步消息消费者示例
 * 演示如何使用消息监听器异步处理消息
 */
public class AsyncConsumer {
    private static final Logger log = LoggerFactory.getLogger(AsyncConsumer.class);
    
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();
        
        CountDownLatch latch = new CountDownLatch(1);
        
        try {
            // 创建消费者并设置消息监听器
            Consumer<String> consumer = client.newConsumer(Schema.STRING)
                    .topic("async-topic")
                    .subscriptionName("async-subscription")
                    .subscriptionType(SubscriptionType.Shared)
                    .receiverQueueSize(100)
                    // 设置消息监听器,消息到达时自动调用
                    .messageListener(new MessageListener<String>() {
                        private int messageCount = 0;
                        
                        @Override
                        public void received(Consumer<String> consumer, Message<String> message) {
                            try {
                                // 处理消息
                                String content = message.getValue();
                                messageCount++;
                                
                                log.info("异步接收消息 #{}: {} -> MessageId: {}", 
                                    messageCount, content, message.getMessageId());
                                
                                // 模拟消息处理时间
                                Thread.sleep(100);
                                
                                // 确认消息
                                consumer.acknowledge(message);
                                
                                // 处理完 20 条消息后退出
                                if (messageCount >= 20) {
                                    latch.countDown();
                                }
                                
                            } catch (Exception e) {
                                log.error("处理消息失败: {}", e.getMessage(), e);
                                // 拒绝消息,消息会重新投递
                                consumer.negativeAcknowledge(message);
                            }
                        }
                    })
                    .subscribe();
            
            log.info("开始监听消息,等待消息到达...");
            
            // 等待处理完成
            latch.await(30, TimeUnit.SECONDS);
            
            consumer.close();
        } finally {
            client.close();
        }
    }
}

2.2 批量操作示例

2.2.1 批量消息生产者

package org.apache.pulsar.examples;

import org.apache.pulsar.client.api.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit;

/**
 * 批量消息生产者示例
 * 演示如何配置和使用批量发送功能提高性能
 */
public class BatchProducer {
    private static final Logger log = LoggerFactory.getLogger(BatchProducer.class);
    
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();
        
        try {
            Producer<String> producer = client.newProducer(Schema.STRING)
                    .topic("batch-topic")
                    .producerName("batch-producer")
                    // 启用批量发送
                    .enableBatching(true)
                    // 批量消息最大条数
                    .batchingMaxMessages(50)
                    // 批量发送最大延迟时间
                    .batchingMaxPublishDelay(100, TimeUnit.MILLISECONDS)
                    // 批量消息最大字节数
                    .batchingMaxBytes(128 * 1024)
                    // 批量分区策略
                    .batcherBuilder(BatcherBuilder.DEFAULT)
                    // 压缩类型
                    .compressionType(CompressionType.LZ4)
                    .create();
            
            long startTime = System.currentTimeMillis();
            
            // 发送大量消息测试批量性能
            for (int i = 0; i < 1000; i++) {
                String message = String.format("Batch message %d - timestamp: %d", 
                    i, System.currentTimeMillis());
                
                // 异步发送以提高性能
                producer.sendAsync(message)
                    .thenAccept(messageId -> {
                        if (i % 100 == 0) {
                            log.info("发送消息 {} 成功: {}", i, messageId);
                        }
                    })
                    .exceptionally(ex -> {
                        log.error("发送消息 {} 失败: {}", i, ex.getMessage());
                        return null;
                    });
                
                // 每发送 100 条消息强制刷新一次
                if (i % 100 == 0 && i > 0) {
                    producer.flush();
                    log.info("已发送 {} 条消息,强制刷新批量缓冲区", i);
                }
            }
            
            // 确保所有消息都发送完成
            producer.flush();
            
            long endTime = System.currentTimeMillis();
            log.info("批量发送 1000 条消息完成,耗时: {} ms", endTime - startTime);
            
            producer.close();
        } finally {
            client.close();
        }
    }
}

2.2.2 批量消息消费者

package org.apache.pulsar.examples;

import org.apache.pulsar.client.api.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit;

/**
 * 批量消息消费者示例
 * 演示如何使用批量接收 API 提高消费性能
 */
public class BatchConsumer {
    private static final Logger log = LoggerFactory.getLogger(BatchConsumer.class);
    
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();
        
        try {
            Consumer<String> consumer = client.newConsumer(Schema.STRING)
                    .topic("batch-topic")
                    .subscriptionName("batch-subscription")
                    .subscriptionType(SubscriptionType.Shared)
                    // 设置较大的接收队列以提高吞吐量
                    .receiverQueueSize(1000)
                    .subscribe();
            
            int totalMessages = 0;
            long startTime = System.currentTimeMillis();
            
            while (totalMessages < 1000) {
                try {
                    // 批量接收消息
                    // maxNumMessages: 批量接收的最大消息数量
                    // maxSizeBytes: 批量接收的最大字节数
                    // timeout: 批量接收的超时时间
                    Messages<String> messages = consumer.batchReceive();
                    
                    if (messages.size() > 0) {
                        log.info("批量接收到 {} 条消息", messages.size());
                        
                        // 处理批量消息
                        for (Message<String> message : messages) {
                            String content = message.getValue();
                            MessageId messageId = message.getMessageId();
                            
                            // 这里可以进行批量业务处理
                            // 例如:批量插入数据库、批量调用外部服务等
                            if (totalMessages % 100 == 0) {
                                log.info("处理消息: {} -> {}", content, messageId);
                            }
                            
                            totalMessages++;
                        }
                        
                        // 批量确认所有消息
                        // 注意:只需要确认最后一条消息即可
                        Message<String> lastMessage = null;
                        for (Message<String> message : messages) {
                            lastMessage = message;
                        }
                        if (lastMessage != null) {
                            consumer.acknowledge(lastMessage);
                        }
                        
                        log.info("批量确认完成,已处理消息总数: {}", totalMessages);
                    } else {
                        log.info("未接收到消息,继续等待...");
                    }
                    
                } catch (PulsarClientException e) {
                    log.error("批量接收消息失败: {}", e.getMessage());
                }
            }
            
            long endTime = System.currentTimeMillis();
            log.info("批量消费 {} 条消息完成,耗时: {} ms", totalMessages, endTime - startTime);
            
            consumer.close();
        } finally {
            client.close();
        }
    }
}

2.3 高级特性示例

2.3.1 消息路由和分区

package org.apache.pulsar.examples;

import org.apache.pulsar.client.api.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * 消息路由和分区示例
 * 演示如何使用自定义消息路由策略控制消息分发
 */
public class PartitionedProducer {
    private static final Logger log = LoggerFactory.getLogger(PartitionedProducer.class);
    
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();
        
        try {
            // 创建分区主题的生产者
            Producer<String> producer = client.newProducer(Schema.STRING)
                    .topic("partitioned-topic")
                    .producerName("partitioned-producer")
                    // 自定义消息路由器
                    .messageRouter(new MessageRouter() {
                        @Override
                        public int choosePartition(Message<?> msg, TopicMetadata metadata) {
                            // 根据消息键的哈希值选择分区
                            String key = msg.getKey();
                            if (key != null) {
                                return Math.abs(key.hashCode()) % metadata.numPartitions();
                            }
                            // 如果没有键,使用轮询方式
                            return -1;
                        }
                    })
                    .create();
            
            // 发送带有不同键的消息到不同分区
            String[] keys = {"user-1", "user-2", "user-3", "user-4", "user-5"};
            
            for (int i = 0; i < 50; i++) {
                String key = keys[i % keys.length];
                String message = String.format("Message %d for %s", i, key);
                
                // 使用类型化消息构建器设置消息属性
                MessageId messageId = producer.newMessage()
                        .key(key)  // 设置消息键用于分区路由
                        .value(message)  // 设置消息内容
                        .property("user-id", key)  // 设置自定义属性
                        .property("message-type", "data")
                        .eventTime(System.currentTimeMillis())  // 设置事件时间
                        .send();
                
                log.info("发送消息到分区: key={}, message={}, messageId={}", 
                    key, message, messageId);
            }
            
            producer.close();
        } finally {
            client.close();
        }
    }
}

2.3.2 消息过滤和选择器

package org.apache.pulsar.examples;

import org.apache.pulsar.client.api.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * 消息过滤和选择器示例
 * 演示如何使用消息选择器过滤消息
 */
public class MessageFilterConsumer {
    private static final Logger log = LoggerFactory.getLogger(MessageFilterConsumer.class);
    
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();
        
        try {
            // 创建使用消息选择器的消费者
            Consumer<String> consumer = client.newConsumer(Schema.STRING)
                    .topic("filtered-topic")
                    .subscriptionName("filtered-subscription")
                    .subscriptionType(SubscriptionType.Shared)
                    // 使用 SQL 表达式过滤消息
                    // 只接收 user-type 属性为 'premium' 的消息
                    .subscriptionProperties(java.util.Map.of(
                        "subscriptionType", "filtered"
                    ))
                    .subscribe();
            
            // 创建生产者发送测试消息
            Producer<String> producer = client.newProducer(Schema.STRING)
                    .topic("filtered-topic")
                    .create();
            
            // 发送不同类型的消息
            String[] userTypes = {"premium", "standard", "premium", "basic", "premium"};
            
            for (int i = 0; i < 10; i++) {
                String userType = userTypes[i % userTypes.length];
                String message = String.format("Message %d for %s user", i, userType);
                
                producer.newMessage()
                        .value(message)
                        .property("user-type", userType)
                        .property("priority", userType.equals("premium") ? "high" : "normal")
                        .sendAsync()
                        .thenAccept(messageId -> {
                            log.info("发送消息: type={}, message={}, messageId={}", 
                                userType, message, messageId);
                        });
            }
            
            // 等待消息发送完成
            producer.flush();
            
            // 接收过滤后的消息
            int receivedCount = 0;
            while (receivedCount < 5) {  // 预期接收 5 条 premium 消息
                try {
                    Message<String> message = consumer.receive(10, java.util.concurrent.TimeUnit.SECONDS);
                    
                    if (message != null) {
                        String content = message.getValue();
                        String userType = message.getProperty("user-type");
                        String priority = message.getProperty("priority");
                        
                        log.info("接收到过滤消息: content={}, user-type={}, priority={}", 
                            content, userType, priority);
                        
                        consumer.acknowledge(message);
                        receivedCount++;
                    }
                } catch (PulsarClientException e) {
                    log.error("接收消息失败: {}", e.getMessage());
                    break;
                }
            }
            
            producer.close();
            consumer.close();
        } finally {
            client.close();
        }
    }
}

2.3.3 死信队列和重试机制

package org.apache.pulsar.examples;

import org.apache.pulsar.client.api.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit;

/**
 * 死信队列和重试机制示例
 * 演示如何配置死信队列处理消息处理失败的情况
 */
public class DeadLetterQueueExample {
    private static final Logger log = LoggerFactory.getLogger(DeadLetterQueueExample.class);
    
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();
        
        try {
            // 创建配置了死信队列的消费者
            Consumer<String> consumer = client.newConsumer(Schema.STRING)
                    .topic("retry-topic")
                    .subscriptionName("retry-subscription")
                    .subscriptionType(SubscriptionType.Shared)
                    // 配置死信队列策略
                    .deadLetterPolicy(DeadLetterPolicy.builder()
                            .maxRedeliverCount(3)  // 最大重试次数
                            .deadLetterTopic("retry-topic-dlq")  // 死信队列主题
                            .retryLetterTopic("retry-topic-retry")  // 重试队列主题
                            .build())
                    // 设置确认超时时间
                    .ackTimeout(10, TimeUnit.SECONDS)
                    // 设置消息监听器
                    .messageListener((consumer1, message) -> {
                        try {
                            String content = message.getValue();
                            int redeliveryCount = message.getRedeliveryCount();
                            
                            log.info("处理消息: content={}, redeliveryCount={}, messageId={}", 
                                content, redeliveryCount, message.getMessageId());
                            
                            // 模拟处理失败的情况
                            if (content.contains("error")) {
                                log.error("消息处理失败,模拟异常: {}", content);
                                throw new RuntimeException("模拟处理异常");
                            }
                            
                            // 正常处理完成
                            consumer1.acknowledge(message);
                            log.info("消息处理成功并确认: {}", content);
                            
                        } catch (Exception e) {
                            log.error("消息处理异常: {}", e.getMessage());
                            // 拒绝消息,触发重试机制
                            consumer1.negativeAcknowledge(message);
                        }
                    })
                    .subscribe();
            
            // 创建生产者发送测试消息
            Producer<String> producer = client.newProducer(Schema.STRING)
                    .topic("retry-topic")
                    .create();
            
            // 发送正常消息和错误消息
            producer.send("正常消息 1");
            producer.send("包含 error 的消息");  // 这条消息会触发重试
            producer.send("正常消息 2");
            producer.send("另一条包含 error 的消息");  // 这条消息也会触发重试
            producer.send("正常消息 3");
            
            log.info("测试消息发送完成,等待处理结果...");
            
            // 等待消息处理完成
            Thread.sleep(30000);
            
            // 创建死信队列消费者查看最终失败的消息
            Consumer<String> dlqConsumer = client.newConsumer(Schema.STRING)
                    .topic("retry-topic-dlq")
                    .subscriptionName("dlq-subscription")
                    .subscriptionType(SubscriptionType.Shared)
                    .subscribe();
            
            log.info("检查死信队列中的消息...");
            
            while (true) {
                try {
                    Message<String> dlqMessage = dlqConsumer.receive(5, TimeUnit.SECONDS);
                    if (dlqMessage != null) {
                        log.info("死信队列消息: content={}, originalMessageId={}, properties={}", 
                            dlqMessage.getValue(), 
                            dlqMessage.getProperty("REAL_TOPIC"),
                            dlqMessage.getProperties());
                        dlqConsumer.acknowledge(dlqMessage);
                    } else {
                        log.info("没有更多死信队列消息");
                        break;
                    }
                } catch (PulsarClientException e) {
                    log.info("死信队列消息接收完成");
                    break;
                }
            }
            
            producer.close();
            consumer.close();
            dlqConsumer.close();
        } finally {
            client.close();
        }
    }
}

2.4 Schema 和序列化示例

2.4.1 Avro Schema 示例

package org.apache.pulsar.examples;

import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.api.schema.Field;
import org.apache.pulsar.client.api.schema.RecordSchemaBuilder;
import org.apache.pulsar.client.api.schema.SchemaBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Avro Schema 示例
 * 演示如何使用结构化数据模式进行序列化和反序列化
 */
public class AvroSchemaExample {
    private static final Logger log = LoggerFactory.getLogger(AvroSchemaExample.class);
    
    // 定义用户数据结构
    public static class User {
        private String name;
        private int age;
        private String email;
        private long timestamp;
        
        // 构造函数
        public User() {}
        
        public User(String name, int age, String email, long timestamp) {
            this.name = name;
            this.age = age;
            this.email = email;
            this.timestamp = timestamp;
        }
        
        // Getter 和 Setter 方法
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
        
        public int getAge() { return age; }
        public void setAge(int age) { this.age = age; }
        
        public String getEmail() { return email; }
        public void setEmail(String email) { this.email = email; }
        
        public long getTimestamp() { return timestamp; }
        public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
        
        @Override
        public String toString() {
            return String.format("User{name='%s', age=%d, email='%s', timestamp=%d}", 
                name, age, email, timestamp);
        }
    }
    
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();
        
        try {
            // 方式1:使用自动生成的 Avro Schema
            Schema<User> autoSchema = Schema.AVRO(User.class);
            
            // 方式2:手动构建 Avro Schema
            RecordSchemaBuilder recordSchemaBuilder = SchemaBuilder.record("User");
            recordSchemaBuilder.field("name").type(SchemaType.STRING);
            recordSchemaBuilder.field("age").type(SchemaType.INT32);
            recordSchemaBuilder.field("email").type(SchemaType.STRING);
            recordSchemaBuilder.field("timestamp").type(SchemaType.INT64);
            Schema<User> manualSchema = Schema.generic(recordSchemaBuilder.build(SchemaType.AVRO));
            
            // 使用自动生成的 Schema 创建生产者
            Producer<User> producer = client.newProducer(autoSchema)
                    .topic("avro-topic")
                    .producerName("avro-producer")
                    .create();
            
            // 创建消费者
            Consumer<User> consumer = client.newConsumer(autoSchema)
                    .topic("avro-topic")
                    .subscriptionName("avro-subscription")
                    .subscriptionType(SubscriptionType.Exclusive)
                    .subscribe();
            
            // 发送结构化数据
            for (int i = 0; i < 5; i++) {
                User user = new User(
                    "User" + i,
                    25 + i,
                    "user" + i + "@example.com",
                    System.currentTimeMillis()
                );
                
                MessageId messageId = producer.newMessage()
                        .value(user)
                        .property("user-id", String.valueOf(i))
                        .send();
                
                log.info("发送用户数据: {} -> MessageId: {}", user, messageId);
            }
            
            // 接收结构化数据
            for (int i = 0; i < 5; i++) {
                Message<User> message = consumer.receive(10, java.util.concurrent.TimeUnit.SECONDS);
                
                if (message != null) {
                    User receivedUser = message.getValue();
                    String userId = message.getProperty("user-id");
                    
                    log.info("接收到用户数据: {} -> 属性: user-id={}, MessageId: {}", 
                        receivedUser, userId, message.getMessageId());
                    
                    consumer.acknowledge(message);
                }
            }
            
            producer.close();
            consumer.close();
        } finally {
            client.close();
        }
    }
}

2.5 事务支持示例

2.5.1 事务消息示例

package org.apache.pulsar.examples;

import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit;

/**
 * 事务消息示例
 * 演示如何使用事务保证消息的原子性操作
 */
public class TransactionExample {
    private static final Logger log = LoggerFactory.getLogger(TransactionExample.class);
    
    public static void main(String[] args) throws Exception {
        // 启用事务的客户端配置
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .enableTransaction(true)  // 启用事务支持
                .build();
        
        try {
            // 创建生产者
            Producer<String> producer = client.newProducer(Schema.STRING)
                    .topic("transaction-topic")
                    .producerName("transaction-producer")
                    .sendTimeout(10, TimeUnit.SECONDS)
                    .create();
            
            // 创建消费者
            Consumer<String> consumer = client.newConsumer(Schema.STRING)
                    .topic("transaction-topic")
                    .subscriptionName("transaction-subscription")
                    .subscriptionType(SubscriptionType.Exclusive)
                    .subscribe();
            
            // 示例1:成功的事务
            log.info("=== 执行成功的事务 ===");
            executeSuccessfulTransaction(client, producer);
            
            // 等待消息发送完成
            Thread.sleep(1000);
            
            // 示例2:失败的事务(回滚)
            log.info("=== 执行失败的事务(回滚) ===");
            executeFailedTransaction(client, producer);
            
            // 等待事务处理完成
            Thread.sleep(1000);
            
            // 消费消息验证事务效果
            log.info("=== 消费消息验证事务效果 ===");
            consumeMessages(consumer);
            
            producer.close();
            consumer.close();
        } finally {
            client.close();
        }
    }
    
    /**
     * 执行成功的事务
     */
    private static void executeSuccessfulTransaction(PulsarClient client, Producer<String> producer) 
            throws Exception {
        // 开始事务
        Transaction transaction = client.newTransaction()
                .withTransactionTimeout(30, TimeUnit.SECONDS)
                .build()
                .get();
        
        try {
            log.info("开始事务: {}", transaction.getTxnID());
            
            // 在事务中发送多条消息
            for (int i = 0; i < 3; i++) {
                String message = "Transaction message " + i;
                
                MessageId messageId = producer.newMessage(transaction)
                        .value(message)
                        .property("transaction-id", transaction.getTxnID().toString())
                        .property("batch", "success")
                        .send();
                
                log.info("在事务中发送消息: {} -> MessageId: {}", message, messageId);
            }
            
            // 提交事务
            transaction.commit().get();
            log.info("事务提交成功: {}", transaction.getTxnID());
            
        } catch (Exception e) {
            log.error("事务执行失败,回滚: {}", e.getMessage());
            transaction.abort().get();
        }
    }
    
    /**
     * 执行失败的事务(模拟回滚)
     */
    private static void executeFailedTransaction(PulsarClient client, Producer<String> producer) 
            throws Exception {
        // 开始事务
        Transaction transaction = client.newTransaction()
                .withTransactionTimeout(30, TimeUnit.SECONDS)
                .build()
                .get();
        
        try {
            log.info("开始事务: {}", transaction.getTxnID());
            
            // 在事务中发送消息
            for (int i = 0; i < 3; i++) {
                String message = "Failed transaction message " + i;
                
                MessageId messageId = producer.newMessage(transaction)
                        .value(message)
                        .property("transaction-id", transaction.getTxnID().toString())
                        .property("batch", "failed")
                        .send();
                
                log.info("在事务中发送消息: {} -> MessageId: {}", message, messageId);
                
                // 模拟在第2条消息后发生异常
                if (i == 1) {
                    throw new RuntimeException("模拟事务处理异常");
                }
            }
            
            // 提交事务
            transaction.commit().get();
            log.info("事务提交成功: {}", transaction.getTxnID());
            
        } catch (Exception e) {
            log.error("事务执行失败,回滚: {} -> {}", transaction.getTxnID(), e.getMessage());
            // 回滚事务
            transaction.abort().get();
            log.info("事务回滚完成: {}", transaction.getTxnID());
        }
    }
    
    /**
     * 消费消息验证事务效果
     */
    private static void consumeMessages(Consumer<String> consumer) throws Exception {
        int messageCount = 0;
        
        while (messageCount < 10) {  // 最多等待 10 条消息
            try {
                Message<String> message = consumer.receive(5, TimeUnit.SECONDS);
                
                if (message != null) {
                    String content = message.getValue();
                    String transactionId = message.getProperty("transaction-id");
                    String batch = message.getProperty("batch");
                    
                    log.info("接收到消息: content={}, transaction-id={}, batch={}, messageId={}", 
                        content, transactionId, batch, message.getMessageId());
                    
                    consumer.acknowledge(message);
                    messageCount++;
                } else {
                    log.info("没有更多消息,退出消费");
                    break;
                }
                
            } catch (PulsarClientException e) {
                log.info("消费消息完成");
                break;
            }
        }
        
        log.info("总共接收到 {} 条消息(只有成功事务的消息会被消费)", messageCount);
    }
}

这些示例展示了 Apache Pulsar 的主要使用场景和特性。从基础的生产和消费,到高级的事务、Schema、分区路由等功能,为用户提供了全面的使用指南。