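1. Overview
This document analyzes the key data structures in Apache Pulsar, using UML class diagrams and accompanying notes to help readers understand how the components relate to one another and which design patterns they follow.
2. Message-Related Data Structures
2.1 Message Hierarchy UML Diagram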
classDiagram
class Message~T~ {
<<interface>>
+getKey() String
+getValue() T
+getMessageId() MessageId
+getPublishTime() long
+getEventTime() long
+getProperties() Map~String, String~
+hasKey() boolean
+hasBase64EncodedKey() boolean
+hasOrderingKey() boolean
+getReaderSchema() Optional~Schema~T~~
}
class MessageImpl~T~ {
-msgMetadata MessageMetadata
-payload ByteBuf
-messageId MessageIdImpl
-cnx ClientCnx
-schema Schema~T~
-schemaState SchemaState
+getKey() String
+getValue() T
+getDataBuffer() ByteBuf
+getMessageBuilder() MessageMetadata
}
class MessageMetadata {
-producerName String
-sequenceId long
-publishTime long
-properties List~KeyValue~
-partitionKey String
-orderingKey ByteString
-deliverAtTime long
-eventTime long
-compressionType CompressionType
-uncompressedSize int
-numMessagesInBatch int
+getProducerName() String
+getSequenceId() long
+getPublishTime() long
+hasPartitionKey() boolean
}
class MessageId {
<<interface>>
+toByteArray() byte[]
+compareTo(MessageId) int
}
class MessageIdImpl {
-ledgerId long
-entryId long
-partitionIndex int
-batchIndex int
-batchSize int
+getLedgerId() long
+getEntryId() long
+getPartitionIndex() int
+getBatchIndex() int
+toString() String
+equals(Object) boolean
+hashCode() int
}
class TopicMessageImpl~T~ {
-topicName String
-topicPartitionName TopicName
-ownerConsumer String
+getTopicName() String
+getOwnerConsumer() String
}
Message~T~ <|-- MessageImpl~T~
Message~T~ <|-- TopicMessageImpl~T~
MessageImpl~T~ *-- MessageMetadata : contains
MessageImpl~T~ *-- MessageIdImpl : contains
MessageId <|-- MessageIdImpl
MessageImpl~T~ <-- TopicMessageImpl~T~ : wraps
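The ordering contract behind `MessageId.compareTo` can be illustrated with a minimal sketch. `SketchMessageId` is a hypothetical stand-in, not the Pulsar client class, and the comparison order (ledger, then entry, then partition, then batch index) is an assumption derived from the fields shown in the diagram.

```java
/** Hypothetical stand-in for MessageIdImpl; not the Pulsar client class. */
final class SketchMessageId implements Comparable<SketchMessageId> {
    final long ledgerId;
    final long entryId;
    final int partitionIndex;
    final int batchIndex; // -1 is used here to mean "not part of a batch" (assumption)

    SketchMessageId(long ledgerId, long entryId, int partitionIndex, int batchIndex) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
        this.partitionIndex = partitionIndex;
        this.batchIndex = batchIndex;
    }

    /** Compares field by field, most significant field first. */
    @Override
    public int compareTo(SketchMessageId other) {
        int c = Long.compare(ledgerId, other.ledgerId);
        if (c != 0) {
            return c;
        }
        c = Long.compare(entryId, other.entryId);
        if (c != 0) {
            return c;
        }
        c = Integer.compare(partitionIndex, other.partitionIndex);
        if (c != 0) {
            return c;
        }
        return Integer.compare(batchIndex, other.batchIndex);
    }

    @Override
    public String toString() {
        return ledgerId + ":" + entryId + ":" + partitionIndex + ":" + batchIndex;
    }
}
```

Under this assumed ordering, two messages in the same ledger sort by entry, and two messages in the same entry sort by their position in the batch.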
2.2 Position Hierarchy UML Diagram
classDiagram
class Position {
<<interface>>
+compareTo(Position) int
+getNext() Position
}
class PositionImpl {
-ledgerId long
-entryId long
+getLedgerId() long
+getEntryId() long
+compareTo(Position) int
+getNext() Position
+toString() String
+create(long, long)$ PositionImpl
}
class Range~Position~ {
-lowerBound Position
-upperBound Position
+lowerEndpoint() Position
+upperEndpoint() Position
+contains(Position) boolean
+isEmpty() boolean
+span(Range) Range
}
Position <|-- PositionImpl
Range~Position~ o-- Position : bounds
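To make `getNext()` and range containment concrete, here is a minimal sketch. `SketchPosition` and `SketchRange` are hypothetical names; the open-closed interval `(lower, upper]` and the rule that a negative entry ID advances to entry 0 are assumptions made for illustration, not a statement of how the broker implements them.

```java
/** Hypothetical (ledgerId, entryId) pair; not Pulsar's PositionImpl. */
final class SketchPosition implements Comparable<SketchPosition> {
    final long ledgerId;
    final long entryId;

    private SketchPosition(long ledgerId, long entryId) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
    }

    static SketchPosition create(long ledgerId, long entryId) {
        return new SketchPosition(ledgerId, entryId);
    }

    /** Next entry in the same ledger; a negative entryId moves to entry 0 (assumption). */
    SketchPosition getNext() {
        return entryId < 0 ? create(ledgerId, 0) : create(ledgerId, entryId + 1);
    }

    @Override
    public int compareTo(SketchPosition other) {
        int c = Long.compare(ledgerId, other.ledgerId);
        return c != 0 ? c : Long.compare(entryId, other.entryId);
    }
}

/** Open-closed interval (lower, upper] over positions. */
final class SketchRange {
    final SketchPosition lower;
    final SketchPosition upper;

    SketchRange(SketchPosition lower, SketchPosition upper) {
        this.lower = lower;
        this.upper = upper;
    }

    boolean contains(SketchPosition p) {
        return p.compareTo(lower) > 0 && p.compareTo(upper) <= 0;
    }

    boolean isEmpty() {
        return lower.compareTo(upper) >= 0;
    }
}
```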
3. Core Client Data Structures
3.1 Producer Hierarchy UML Diagram
classDiagram
class Producer~T~ {
<<interface>>
+getTopic() String
+getProducerName() String
+send(T) MessageId
+sendAsync(T) CompletableFuture~MessageId~
+newMessage() TypedMessageBuilder~T~
+flush() void
+close() void
+getStats() ProducerStats
+getLastSequenceId() long
+isConnected() boolean
}
class ProducerBase~T~ {
<<abstract>>
#producerCreatedFuture CompletableFuture~Producer~T~~
#conf ProducerConfigurationData
#schema Schema~T~
#interceptors ProducerInterceptors
+send(T) MessageId
+sendAsync(T) CompletableFuture~MessageId~
+newMessage() TypedMessageBuilder~T~
#internalSendAsync(Message) CompletableFuture~MessageId~
#triggerFlush() void
}
class ProducerImpl~T~ {
-client PulsarClientImpl
-topic String
-producerName String
-producerId long
-sequenceIdGenerator AtomicLong
-pendingMessages Queue~OpSendMsg~
-semaphore Semaphore
-batchMessageContainer BatchMessageContainer
-stats ProducerStatsImpl
+sendAsync(Message, SendCallback) void
-processOpSendMsg(OpSendMsg) void
-batchMessageAndSend(boolean) void
-canAddToBatch(MessageImpl) boolean
-doBatchSendAndAdd(MessageImpl, SendCallback, ByteBuf) void
}
class PartitionedProducerImpl~T~ {
-producers List~ProducerImpl~T~~
-routerPolicy MessageRouter
-topicMetadata TopicMetadata
+sendAsync(Message, SendCallback) void
-getProducer(Message) ProducerImpl~T~
-routeMessageToPartition(Message) int
}
class ProducerConfigurationData {
-topicName String
-producerName String
-sendTimeoutMs long
-blockIfQueueFull boolean
-maxPendingMessages int
-messageRoutingMode MessageRoutingMode
-hashingScheme HashingScheme
-cryptoKeyReader CryptoKeyReader
-encryptionKeys Set~String~
-compressionType CompressionType
-batchingEnabled boolean
-batchingMaxMessages int
-batchingMaxPublishDelayMicros long
-batchingMaxBytes int
-chunkingEnabled boolean
}
Producer~T~ <|-- ProducerBase~T~
ProducerBase~T~ <|-- ProducerImpl~T~
ProducerBase~T~ <|-- PartitionedProducerImpl~T~
ProducerImpl~T~ *-- ProducerConfigurationData : config
PartitionedProducerImpl~T~ o-- ProducerImpl~T~ : contains
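The role of `routeMessageToPartition` in `PartitionedProducerImpl` can be sketched as follows. `SketchRouter` is a hypothetical class: it uses `String.hashCode()` as a stand-in for whichever `HashingScheme` is configured and falls back to round-robin for messages without a key. Treat it as a simplified model of key-based routing rather than the client's actual router.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical router: keyed messages hash to a partition, others go round-robin. */
final class SketchRouter {
    private final AtomicInteger counter = new AtomicInteger();

    /** Modulo that never returns a negative result, even for negative hashes. */
    static int signSafeMod(int dividend, int divisor) {
        int mod = dividend % divisor;
        return mod < 0 ? mod + divisor : mod;
    }

    int choosePartition(String key, int numPartitions) {
        if (key != null) {
            // Same key always maps to the same partition, preserving per-key order.
            return signSafeMod(key.hashCode(), numPartitions);
        }
        // No key: spread messages evenly across partitions.
        return signSafeMod(counter.getAndIncrement(), numPartitions);
    }
}
```

The sign-safe modulo matters because Java's `%` keeps the sign of the dividend, so a negative hash code would otherwise yield a negative partition index.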
3.2 Consumer Hierarchy UML Diagram
classDiagram
class Consumer~T~ {
<<interface>>
+getTopic() String
+getSubscription() String
+receive() Message~T~
+receiveAsync() CompletableFuture~Message~T~~
+batchReceive() Messages~T~
+acknowledge(Message) void
+acknowledgeAsync(Message) CompletableFuture~Void~
+acknowledgeCumulative(Message) void
+negativeAcknowledge(Message) void
+close() void
+seek(MessageId) void
+getStats() ConsumerStats
+isConnected() boolean
}
class ConsumerBase~T~ {
<<abstract>>
#client PulsarClientImpl
#subscription String
#conf ConsumerConfigurationData
#schema Schema~T~
#interceptors ConsumerInterceptors
#unAckedMessageTracker UnAckedMessageTracker
#acknowledgementsGroupingTracker AcknowledgmentsGroupingTracker
+receive(long, TimeUnit) Message~T~
+acknowledge(Message) void
+negativeAcknowledge(Message) void
#internalReceive(long, TimeUnit) Message~T~
#internalReceiveAsync() CompletableFuture~Message~T~~
#beforeConsume(Message) Message~T~
#processMessage(Message) void
}
class ConsumerImpl~T~ {
-consumerId long
-consumerName String
-incomingMessages BlockingQueue~Message~T~~
-pendingReceives ConcurrentLinkedQueue~CompletableFuture~Message~T~~~
-availablePermits AtomicInteger
-subscription Subscription
-startMessageId MessageId
-lastDequeuedMessageId MessageId
-lastMessageIdInBroker MessageId
-duringSeek AtomicBoolean
-seekMessageId MessageId
-startMessageRollbackDurationInSec int
+internalReceive(long, TimeUnit) Message~T~
+internalReceiveAsync() CompletableFuture~Message~T~~
+increaseAvailablePermits(int) void
+messageReceived(CommandMessage, ByteBuf, ClientCnx) void
}
class MultiTopicsConsumerImpl~T~ {
-consumers ConcurrentHashMap~String, ConsumerImpl~T~~
-topicsPattern Pattern
-topics List~String~
-allTopicPartitionsNumber AtomicInteger
-pausedConsumers AtomicInteger
-sharedQueueResumeThreshold int
-maxReceiverQueueSize int
+subscribe(String) CompletableFuture~Void~
+unsubscribeAsync(String) CompletableFuture~Void~
-messageReceived(ConsumerImpl, Message) void
-receiveMessageFromConsumer(ConsumerImpl, boolean) void
}
class ConsumerConfigurationData {
-topics Set~String~
-topicsPattern Pattern
-subscriptionName String
-subscriptionType SubscriptionType
-subscriptionInitialPosition SubscriptionInitialPosition
-messageListener MessageListener~T~
-consumerEventListener ConsumerEventListener
-receiverQueueSize int
-acknowledgementsGroupTimeMicros long
-maxAcknowledgmentGroupSize int
-negativeAckRedeliveryDelayMicros long
-maxTotalReceiverQueueSizeAcrossPartitions int
-consumerName String
-ackTimeoutMillis long
-tickDurationMillis long
-priorityLevel int
-maxPendingChunkedMessage int
-cryptoKeyReader CryptoKeyReader
-readCompacted boolean
-subscriptionProperties Map~String, String~
-patternAutoDiscoveryPeriod int
-regexSubscriptionMode RegexSubscriptionMode
-deadLetterPolicy DeadLetterPolicy
-retryEnable boolean
-batchReceivePolicy BatchReceivePolicy
}
Consumer~T~ <|-- ConsumerBase~T~
ConsumerBase~T~ <|-- ConsumerImpl~T~
ConsumerBase~T~ <|-- MultiTopicsConsumerImpl~T~
ConsumerImpl~T~ *-- ConsumerConfigurationData : config
MultiTopicsConsumerImpl~T~ o-- ConsumerImpl~T~ : contains
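The interplay between `incomingMessages`, `availablePermits`, and `increaseAvailablePermits` is easier to follow with a small model. The sketch below is hypothetical (`SketchConsumerFlow` is not a Pulsar class) and assumes that the consumer grants the broker an initial batch of permits equal to the receiver queue size, then sends a new flow request once half the queue has been consumed.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical model of permit-based flow control on the consumer side. */
final class SketchConsumerFlow {
    private final int receiverQueueSize;
    private final Queue<String> incomingMessages = new ArrayDeque<>();
    private int availablePermits = 0;

    /** Records the permit count of every flow request "sent" to the broker. */
    final List<Integer> flowCommandsSent = new ArrayList<>();

    SketchConsumerFlow(int receiverQueueSize) {
        this.receiverQueueSize = receiverQueueSize;
        flowCommandsSent.add(receiverQueueSize); // initial permits (assumption)
    }

    /** Called when the broker pushes a message to this consumer. */
    void messageReceived(String payload) {
        incomingMessages.add(payload);
    }

    /** Hands one message to the application and frees one slot in the queue. */
    String receive() {
        String msg = incomingMessages.poll();
        if (msg != null) {
            increaseAvailablePermits(1);
        }
        return msg;
    }

    private void increaseAvailablePermits(int delta) {
        availablePermits += delta;
        int threshold = Math.max(1, receiverQueueSize / 2);
        if (availablePermits >= threshold) {
            flowCommandsSent.add(availablePermits); // ask the broker for more messages
            availablePermits = 0;
        }
    }
}
```

With a receiver queue of 4, consuming two messages triggers one flow request for 2 permits; batching permits this way avoids sending a flow request for every single message.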
4. Core Broker Data Structures
4.1 Topic Hierarchy UML Diagram
classDiagram
class Topic {
<<interface>>
+getName() String
+addProducer(Producer, CompletableFuture) void
+removeProducer(Producer) void
+subscribe(String, Consumer, boolean) CompletableFuture~Consumer~
+unsubscribe(Consumer) CompletableFuture~Void~
+publishMessage(ByteBuf, PublishCallback) void
+getStats(boolean, boolean, boolean) CompletableFuture~TopicStatsImpl~
+getInternalStats(boolean) CompletableFuture~PersistentTopicInternalStats~
+close() CompletableFuture~Void~
+isActive() boolean
+getLastPosition() Position
+isFenced() boolean
}
class AbstractTopic {
<<abstract>>
#topic String
#producers ConcurrentHashMap~String, Producer~
#brokerService BrokerService
#lock ReentrantReadWriteLock
#isFenced boolean
#topicPolicies HierarchyTopicPolicies
#lastActive long
#hasBatchMessagePublished boolean
#isEncryptionRequired boolean
#topicPublishRateLimiter PublishRateLimiter
#resourceGroupPublishLimiter ResourceGroupPublishLimiter
#bytesInCounter LongAdder
#msgInCounter LongAdder
+addProducer(Producer, CompletableFuture) void
+removeProducer(Producer) void
+checkPublishRate() boolean
+updatePublishRateLimiter() void
+incrementPublishCount(int, long) void
}
class PersistentTopic {
-ledger ManagedLedger
-subscriptions Map~String, PersistentSubscription~
-replicators Map~String, Replicator~
-shadowReplicators Map~String, Replicator~
-dispatchRateLimiter Optional~DispatchRateLimiter~
-subscribeRateLimiter Optional~SubscribeRateLimiter~
-compactor Optional~Compactor~
-topicCompactionService TopicCompactionService
+publishMessage(ByteBuf, PublishCallback) void
+subscribe(String, Consumer, boolean) CompletableFuture~Consumer~
+addComplete(Position, ByteBuf, Object) void
+addFailed(ManagedLedgerException, Object) void
-createSubscription(String, CommandSubscribe, boolean, boolean) PersistentSubscription
-checkBacklogQuota() void
}
class NonPersistentTopic {
-subscriptions ConcurrentHashMap~String, NonPersistentSubscription~
-replicators ConcurrentHashMap~String, NonPersistentReplicator~
+publishMessage(ByteBuf, PublishCallback) void
+subscribe(String, Consumer, boolean) CompletableFuture~Consumer~
-createSubscription(String, CommandSubscribe) NonPersistentSubscription
}
Topic <|-- AbstractTopic
AbstractTopic <|-- PersistentTopic
AbstractTopic <|-- NonPersistentTopic
PersistentTopic *-- ManagedLedger : contains
PersistentTopic o-- PersistentSubscription : manages
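The `producers` map, the `lock`, and the `isFenced` flag in `AbstractTopic` work together to guard producer registration. The following is a minimal sketch under stated assumptions: `SketchTopic` is hypothetical, producers are keyed by name, and registration is rejected for duplicate names or fenced topics. The broker's real checks are more involved.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical topic that tracks producers by name and can be fenced. */
final class SketchTopic {
    private final ConcurrentHashMap<String, Long> producers = new ConcurrentHashMap<>();
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private boolean isFenced = false; // guarded by lock

    /** Returns false if the topic is fenced or the name is already registered. */
    boolean addProducer(String producerName, long producerId) {
        lock.readLock().lock(); // many producers may register concurrently
        try {
            if (isFenced) {
                return false;
            }
            return producers.putIfAbsent(producerName, producerId) == null;
        } finally {
            lock.readLock().unlock();
        }
    }

    void removeProducer(String producerName) {
        producers.remove(producerName);
    }

    /** Blocks new registrations, for example while the topic is closing. */
    void fence() {
        lock.writeLock().lock(); // excludes all in-flight addProducer calls
        try {
            isFenced = true;
        } finally {
            lock.writeLock().unlock();
        }
    }

    int producerCount() {
        return producers.size();
    }
}
```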
4.2 Subscription Hierarchy UML Diagram
classDiagram
class Subscription {
<<interface>>
+getTopic() Topic
+getName() String
+getType() SubType
+addConsumer(Consumer) CompletableFuture~Void~
+removeConsumer(Consumer, boolean) CompletableFuture~Void~
+consumerFlow(Consumer, int) void
+acknowledgeMessage(List~Position~, AckType, Map~String, Long~) CompletableFuture~Void~
+getNumberOfEntriesInBacklog(boolean) CompletableFuture~Long~
+close() CompletableFuture~Void~
+delete() CompletableFuture~Void~
+disconnect() CompletableFuture~Void~
+getStats(GetStatsOptions) SubscriptionStatsImpl
}
class AbstractSubscription {
<<abstract>>
#topic AbstractTopic
#subName String
#cursor ManagedCursor
#IS_FENCED_UPDATER AtomicIntegerFieldUpdater
#isFenced int
#recentlyJoinedConsumers Map~Consumer, Long~
#subscriptionProperties Map~String, String~
+getName() String
+getTopic() Topic
+isCursorActive() boolean
+checkAndApplyReachedEndOfTopicOrTopicMigration(List) boolean
+getNumberOfEntriesInBacklog(boolean) CompletableFuture~Long~
#acknowledgeMessage(Position, AckType, Map) CompletableFuture~Void~
}
class PersistentSubscription {
-dispatcher Dispatcher
-subscriptionProperties ConcurrentHashMap~String, String~
-lastExpiredTimestamp long
-totalNonContiguousDeletedMessagesRange int
-subscriptionStatsUnsafeMode boolean
+addConsumer(Consumer) CompletableFuture~Void~
+removeConsumer(Consumer, boolean) CompletableFuture~Void~
+consumerFlow(Consumer, int) void
+acknowledgeMessage(List, AckType, Map) CompletableFuture~Void~
+expireMessages(int) void
+redeliverUnacknowledgedMessages(Consumer, List) void
-createDispatcher(Consumer) Dispatcher
-checkAndApplyReachedEndOfTopic() void
}
class PersistentDispatcherMultipleConsumers {
-consumerList List~Consumer~
-consumerSet ConcurrentHashMap~Consumer, Boolean~
-partitionedTopicConsumers ConcurrentHashMap~Consumer, String~
-readType ReadType
-sendInProgress AtomicBoolean
-readFailureBackoff Backoff
-totalAvailablePermits int
-messagesToReplay ConcurrentLinkedQueue~MessageId~
+addConsumer(Consumer) CompletableFuture~Void~
+removeConsumer(Consumer) CompletableFuture~Void~
+consumerFlow(Consumer, int) void
+canUnsubscribe(Consumer) boolean
-readMoreEntries() void
-dispatchMessagesToConsumers(List) void
}
class PersistentDispatcherSingleActiveConsumer {
-activeConsumer Consumer
-readOnActiveConsumerTask ScheduledFuture
+addConsumer(Consumer) CompletableFuture~Void~
+removeConsumer(Consumer) CompletableFuture~Void~
+consumerFlow(Consumer, int) void
-pickAndScheduleActiveConsumer() void
-scheduleReadOnActiveConsumer() void
-cancelPendingRead() void
}
Subscription <|-- AbstractSubscription
AbstractSubscription <|-- PersistentSubscription
PersistentSubscription *-- Dispatcher : contains
Dispatcher <|-- PersistentDispatcherMultipleConsumers
Dispatcher <|-- PersistentDispatcherSingleActiveConsumer
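How a multi-consumer dispatcher combines `consumerList`, per-consumer permits, and `dispatchMessagesToConsumers` can be sketched as below. `SketchSharedDispatcher` is a hypothetical model that hands entries out round-robin to consumers that still have permits; it ignores priorities, redelivery, and batching, all of which the real dispatcher has to handle.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical round-robin dispatcher that honors per-consumer permits. */
final class SketchSharedDispatcher {
    private final List<String> consumerList = new ArrayList<>();
    private final Map<String, Integer> permits = new LinkedHashMap<>();
    private int next = 0; // index of the consumer to try first

    void addConsumer(String name) {
        consumerList.add(name);
        permits.put(name, 0);
    }

    /** A consumer signals that it can accept more messages. */
    void consumerFlow(String name, int additionalPermits) {
        permits.merge(name, additionalPermits, Integer::sum);
    }

    /** Assigns entries in order; stops early once no consumer has permits left. */
    Map<String, List<String>> dispatch(List<String> entries) {
        Map<String, List<String>> assigned = new LinkedHashMap<>();
        for (String entry : entries) {
            String target = pickNextWithPermits();
            if (target == null) {
                break; // remaining entries stay undelivered until more permits arrive
            }
            permits.merge(target, -1, Integer::sum);
            assigned.computeIfAbsent(target, k -> new ArrayList<>()).add(entry);
        }
        return assigned;
    }

    private String pickNextWithPermits() {
        for (int i = 0; i < consumerList.size(); i++) {
            int idx = (next + i) % consumerList.size();
            String candidate = consumerList.get(idx);
            if (permits.get(candidate) > 0) {
                next = (idx + 1) % consumerList.size();
                return candidate;
            }
        }
        return null;
    }
}
```

A single-active-consumer dispatcher would instead route every entry to one chosen consumer and only switch when that consumer disconnects.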
5. Storage-Related Data Structures
5.1 ManagedLedger Storage Hierarchy UML Diagram
classDiagram
class ManagedLedger {
<<interface>>
+getName() String
+asyncAddEntry(ByteBuf, AddEntryCallback, Object) void
+asyncOpenCursor(String, OpenCursorCallback, Object) void
+asyncDeleteCursor(String, DeleteCursorCallback, Object) void
+getCursors() List~ManagedCursor~
+getLastConfirmedEntry() Position
+getFirstPosition() Position
+closeAsync() CompletableFuture~Void~
+deleteAsync() CompletableFuture~Void~
+terminateAsync() CompletableFuture~Position~
+isTerminated() boolean
+getEstimatedBacklogSize() long
}
class ManagedLedgerImpl {
-bookKeeper BookKeeper
-name String
-ledgerMetadata Map~String, byte[]~
-config ManagedLedgerConfig
-store MetaStore
-ledgers NavigableMap~Long, LedgerInfo~
-currentLedger LedgerHandle
-cursors ManagedCursorContainer
-activeCursors ActiveManagedCursorContainer
-entryCache EntryCache
-lastConfirmedEntry Position
-state State
-clock Clock
+asyncAddEntry(ByteBuf, AddEntryCallback, Object) void
+asyncOpenCursor(String, OpenCursorCallback, Object) void
-createLedgerAfterClosed() CompletableFuture~Void~
-rollCurrentLedgerIfFull() void
+initialize(ManagedLedgerInitializeLedgerCallback, Object) void
-scheduledExecutor OrderedScheduler
-mbean ManagedLedgerMBeanImpl
}
class ManagedCursor {
<<interface>>
+getName() String
+readEntries(int) List~Entry~
+asyncReadEntries(int, ReadEntriesCallback, Object) void
+asyncMarkDelete(Position, MarkDeleteCallback, Object) void
+asyncDelete(List, DeleteCallback, Object) void
+seek(Position, boolean) void
+rewind() void
+getReadPosition() Position
+getMarkDeletedPosition() Position
+hasMoreEntries() boolean
+getNumberOfEntries() long
+getTotalSize() long
+close() void
}
class ManagedCursorImpl {
-bookkeeper BookKeeper
-ledger ManagedLedgerImpl
-name String
-markDeletePosition Position
-readPosition Position
-lock ReadWriteLock
-state State
-lastActive long
-cursorProperties Map~String, String~
-individualDeletedMessages RangeSet~Position~
-batchDeletedIndexes RangeSet~Position~
+readEntries(int) List~Entry~
+asyncReadEntries(int, ReadEntriesCallback, Object) void
+asyncMarkDelete(Position, MarkDeleteCallback, Object) void
+seek(Position, boolean) void
+rewind() void
-persistPositionToMetaStore(Position, MarkDeleteCallback, Object) void
-setAcknowledgedPosition(Position) Position
}
class Entry {
<<interface>>
+getLedgerId() long
+getEntryId() long
+getPosition() Position
+getLength() int
+getData() ByteBuf
+getDataBuffer() ByteBuf
+release() int
+retain() Entry
+retain(int) Entry
}
class EntryImpl {
-ledgerId long
-entryId long
-data ByteBuf
+getLedgerId() long
+getEntryId() long
+getLength() int
+getData() ByteBuf
+release() int
+toString() String
+create(LedgerEntry, int)$ EntryImpl
+create(long, long, ByteBuf)$ EntryImpl
}
ManagedLedger <|-- ManagedLedgerImpl
ManagedCursor <|-- ManagedCursorImpl
Entry <|-- EntryImpl
ManagedLedgerImpl *-- ManagedCursorImpl : manages
ManagedLedgerImpl *-- EntryCache : uses
ManagedCursorImpl --> Entry : reads
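The relationship between `markDeletePosition` and `individualDeletedMessages` in `ManagedCursorImpl` is the heart of acknowledgment tracking. The sketch below is a deliberately simplified, hypothetical model: it covers a single ledger, identifies entries by entry ID only, and stores individual acknowledgments in a `TreeSet` instead of a range set.

```java
import java.util.TreeSet;

/** Hypothetical single-ledger cursor that tracks acknowledgments. */
final class SketchCursor {
    /** Every entry with an ID at or below this value has been acknowledged. */
    private long markDeleteEntryId = -1;

    /** Entries acknowledged out of order, above the mark-delete position. */
    private final TreeSet<Long> individuallyDeleted = new TreeSet<>();

    /** Acknowledges one entry and advances the mark-delete position if possible. */
    void delete(long entryId) {
        if (entryId <= markDeleteEntryId) {
            return; // already covered by the mark-delete position
        }
        individuallyDeleted.add(entryId);
        // Absorb any now-contiguous acknowledgments into the mark-delete position.
        while (individuallyDeleted.remove(markDeleteEntryId + 1)) {
            markDeleteEntryId++;
        }
    }

    long getMarkDeleteEntryId() {
        return markDeleteEntryId;
    }

    /** Number of acknowledged entries still waiting behind an unacknowledged gap. */
    int pendingIndividualDeletes() {
        return individuallyDeleted.size();
    }
}
```

Acknowledging entries 0 and 2 leaves the mark-delete position at 0 with one pending individual delete; acknowledging entry 1 closes the gap and moves the position to 2.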
6. Network Communication Data Structures
6.1 Network Connection UML Diagram
classDiagram
class ClientCnx {
-channel Channel
-state State
-connectionHandler ConnectionHandler
-pendingRequests ConcurrentHashMap~Long, CompletableFuture~
-producers ConcurrentHashMap~Long, ProducerImpl~
-consumers ConcurrentHashMap~Long, ConsumerImpl~
-remoteAddress SocketAddress
-proxyToTargetBrokerAddress String
-lastDataReceivedTime long
-operationTimeoutMs long
+sendRequestWithId(ByteBuf, long) CompletableFuture~Void~
+newLookup(ByteBuf, long) CompletableFuture~LookupDataResult~
+newConsumer(Topic, Subscription, long, long, String, boolean, InitialPosition, SchemaInfo, boolean) CompletableFuture~Void~
+newProducer(String, long, String, boolean, Map, SchemaInfo, long, boolean, TxnID, long, Optional) CompletableFuture~Void~
+removeProducer(long) CompletableFuture~Void~
+removeConsumer(long) CompletableFuture~Void~
-handleResponse(ByteBuf) void
-handleActiveConsumerChange(CommandActiveConsumerChange) void
-handleMessage(CommandMessage, ByteBuf) void
}
class ServerCnx {
-service BrokerService
-cnx SocketAddress
-authState AuthenticationState
-authRole String
-authenticationData AuthenticationDataSource
-originalPrincipal String
-originalAuthData AuthenticationDataSource
-originalAuthMethod String
-producers ConcurrentHashMap~Long, Producer~
-consumers ConcurrentHashMap~Long, Consumer~
-remoteAddress SocketAddress
-authMethod String
-supportsTopicWatchers boolean
+handleConnect(CommandConnect) void
+handleSubscribe(CommandSubscribe) void
+handleProducer(CommandProducer) void
+handleSend(CommandSend, ByteBuf) void
+handleAck(CommandAck) void
+handleFlow(CommandFlow) void
+handleUnsubscribe(CommandUnsubscribe) void
+handleCloseProducer(CommandCloseProducer) void
+handleCloseConsumer(CommandCloseConsumer) void
-checkAuth(CompletableFuture, String) void
-isTopicOperationAllowed(TopicName, TopicOperation) CompletableFuture~Boolean~
}
class PulsarRequestIdGenerator {
-requestId AtomicLong
+nextId() long
}
class Commands {
+newConnect(String, String, String) ByteBuf
+newSubscribe(String, String, long, long, SubType, int, String) ByteBuf
+newProducer(String, long, String, boolean, Map) ByteBuf
+newSend(long, long, int, ChecksumType, MessageMetadata, ByteBuf) ByteBuf
+newAck(long, AckType, List, ValidationError, Map) ByteBuf
+newFlow(long, int) ByteBuf
+newCloseProducer(long, long) ByteBuf
+newCloseConsumer(long, long) ByteBuf
+parseMessageMetadata(ByteBuf) MessageMetadata
+parseBrokerEntryMetadataIfExist(ByteBuf) BrokerEntryMetadata
}
ClientCnx *-- PulsarRequestIdGenerator : uses
ServerCnx --> Commands : uses
ClientCnx --> Commands : uses
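The `pendingRequests` map in `ClientCnx` pairs each outgoing request with the response that eventually arrives for it. Here is a minimal sketch of that correlation pattern. `SketchConnection` is hypothetical, commands and responses are plain strings rather than `ByteBuf` frames, and nothing is written to a network channel.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical connection that matches responses to requests by ID. */
final class SketchConnection {
    private final AtomicLong requestIdGenerator = new AtomicLong();
    private final ConcurrentHashMap<Long, CompletableFuture<String>> pendingRequests =
            new ConcurrentHashMap<>();

    long newRequestId() {
        return requestIdGenerator.getAndIncrement();
    }

    /** Registers a future for the request; a real client would also write the frame. */
    CompletableFuture<String> sendRequestWithId(String command, long requestId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pendingRequests.put(requestId, future);
        return future;
    }

    /** Completes the matching future; responses for unknown IDs are ignored. */
    void handleResponse(long requestId, String response) {
        CompletableFuture<String> future = pendingRequests.remove(requestId);
        if (future != null) {
            future.complete(response);
        }
    }

    int pendingCount() {
        return pendingRequests.size();
    }
}
```

Because each response carries the request ID, responses may arrive in any order on the same connection without being confused.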
7. Configuration Data Structures
7.1 Configuration UML Diagram
classDiagram
class ServiceConfiguration {
-clusterName String
-brokerServicePort Optional~Integer~
-brokerServicePortTls Optional~Integer~
-webServicePort Optional~Integer~
-webServicePortTls Optional~Integer~
-bindAddress String
-advertisedAddress String
-numIOThreads int
-numHttpServerThreads int
-zookeeperServers String
-configurationStoreServers String
-authenticationEnabled boolean
-authorizationEnabled boolean
-superUserRoles Set~String~
-managedLedgerDefaultEnsembleSize int
-managedLedgerDefaultWriteQuorum int
-managedLedgerDefaultAckQuorum int
-managedLedgerMaxEntriesPerLedger int
-managedLedgerMaxSizePerLedgerMB int
-loadBalancerEnabled boolean
+getClusterName() String
+getBrokerServicePort() Optional~Integer~
+isAuthenticationEnabled() boolean
+getManagedLedgerDefaultEnsembleSize() int
}
class ClientConfigurationData {
-serviceUrl String
-authPluginClassName String
-authParams String
-authParamMap Map~String, String~
-operationTimeoutMs long
-statsIntervalSeconds long
-numIoThreads int
-numListenerThreads int
-connectionsPerBroker int
-useTls boolean
-tlsTrustCertsFilePath String
-tlsAllowInsecureConnection boolean
-tlsHostnameVerificationEnable boolean
-concurrentLookupRequest int
-maxLookupRequest int
-maxLookupRedirects int
-maxNumberOfRejectedRequestPerConnection int
-keepAliveIntervalSeconds int
-connectionTimeoutMs int
-requestTimeoutMs long
+getServiceUrl() String
+getOperationTimeoutMs() long
+getNumIoThreads() int
}
class ProducerConfigurationData {
-topicName String
-producerName String
-sendTimeoutMs long
-blockIfQueueFull boolean
-maxPendingMessages int
-maxPendingMessagesAcrossPartitions int
-messageRoutingMode MessageRoutingMode
-hashingScheme HashingScheme
-compressionType CompressionType
-batchingEnabled boolean
-batchingMaxMessages int
-batchingMaxPublishDelayMicros long
-batchingMaxBytes int
-batchingPartitionSwitchFrequencyByPublishDelay int
-cryptoKeyReader CryptoKeyReader
-encryptionKeys Set~String~
-chunkingEnabled boolean
-chunkMaxMessageSize int
+getTopicName() String
+getSendTimeoutMs() long
+isBatchingEnabled() boolean
}
class ConsumerConfigurationData {
-topics Set~String~
-topicsPattern Pattern
-subscriptionName String
-subscriptionType SubscriptionType
-subscriptionInitialPosition SubscriptionInitialPosition
-messageListener MessageListener
-consumerEventListener ConsumerEventListener
-receiverQueueSize int
-acknowledgementsGroupTimeMicros long
-maxAcknowledgmentGroupSize int
-negativeAckRedeliveryDelayMicros long
-maxTotalReceiverQueueSizeAcrossPartitions int
-consumerName String
-ackTimeoutMillis long
-priorityLevel int
-cryptoKeyReader CryptoKeyReader
-readCompacted boolean
-subscriptionProperties Map~String, String~
-patternAutoDiscoveryPeriod int
-regexSubscriptionMode RegexSubscriptionMode
-deadLetterPolicy DeadLetterPolicy
-retryEnable boolean
-batchReceivePolicy BatchReceivePolicy
+getTopics() Set~String~
+getSubscriptionName() String
+getSubscriptionType() SubscriptionType
}
ServiceConfiguration --> "uses" ProducerConfigurationData : validates
ServiceConfiguration --> "uses" ConsumerConfigurationData : validates
ClientConfigurationData --> "creates" ProducerConfigurationData : factory
ClientConfigurationData --> "creates" ConsumerConfigurationData : factory
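The three `managedLedgerDefault*` quorum settings in `ServiceConfiguration` are only meaningful together. The sketch below validates them against the usual BookKeeper ordering (ensemble size >= write quorum >= ack quorum). `SketchLedgerQuorumConfig` is a hypothetical helper, not part of the broker's configuration classes.

```java
/** Hypothetical holder for the ensemble, write quorum, and ack quorum settings. */
final class SketchLedgerQuorumConfig {
    final int ensembleSize;
    final int writeQuorum;
    final int ackQuorum;

    SketchLedgerQuorumConfig(int ensembleSize, int writeQuorum, int ackQuorum) {
        if (ackQuorum < 1) {
            throw new IllegalArgumentException("ackQuorum must be at least 1");
        }
        if (writeQuorum < ackQuorum) {
            throw new IllegalArgumentException("writeQuorum must be >= ackQuorum");
        }
        if (ensembleSize < writeQuorum) {
            throw new IllegalArgumentException("ensembleSize must be >= writeQuorum");
        }
        this.ensembleSize = ensembleSize;
        this.writeQuorum = writeQuorum;
        this.ackQuorum = ackQuorum;
    }
}
```

Validating at construction time means an inconsistent combination fails fast instead of surfacing later as a ledger creation error.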
8. Statistics and Monitoring Data Structures
8.1 Statistics UML Diagram
classDiagram
class TopicStats {
<<interface>>
+getName() String
+getPublishers() List~PublisherStatsImpl~
+getSubscriptions() Map~String, SubscriptionStatsImpl~
+getReplication() Map~String, ReplicatorStatsImpl~
+getInboundConnections() List~String~
+getOutboundConnections() List~String~
+getMsgRateIn() double
+getMsgRateOut() double
+getMsgThroughputIn() double
+getMsgThroughputOut() double
+getAverageMsgSize() double
+getStorageSize() long
+getBacklogSize() long
}
class TopicStatsImpl {
+msgRateIn double
+msgThroughputIn double
+msgRateOut double
+msgThroughputOut double
+averageMsgSize double
+storageSize long
+backlogSize long
+publishers List~PublisherStatsImpl~
+subscriptions Map~String, SubscriptionStatsImpl~
+replication Map~String, ReplicatorStatsImpl~
+deduplicationStatus String
+nonContiguousDeletedMessagesRanges int
+nonContiguousDeletedMessagesRangesSerializedSize int
+delayedMessageIndexSizeInBytes long
+bucketDelayedIndexStats BucketDelayedDeliveryTrackerStats
}
class PublisherStatsImpl {
+msgRateIn double
+msgThroughputIn double
+averageMsgSize double
+producerId long
+producerName String
+address String
+connectedSince String
+clientVersion String
+metadata Map~String, String~
+accessMode ProducerAccessMode
+chunkingEnabled boolean
}
class SubscriptionStatsImpl {
+msgRateOut double
+msgThroughputOut double
+msgBacklog long
+backlogSize long
+msgRateRedeliver double
+type String
+msgRateExpired double
+totalMsgExpired long
+lastExpiredTimestamp long
+lastMarkDeleteAdvancedTimestamp long
+lastConsumedFlowTimestamp long
+lastConsumedTimestamp long
+lastAckedTimestamp long
+consumersAfterMarkDeletePosition Map~String, String~
+nonContiguousDeletedMessagesRanges int
+nonContiguousDeletedMessagesRangesSerializedSize int
+delayedMessageIndexSizeInBytes long
+bucketDelayedIndexStats BucketDelayedDeliveryTrackerStats
+filterProcessedMsgCount long
+filterAcceptedMsgCount long
+filterRejectedMsgCount long
+filterRescheduledMsgCount long
+durable boolean
+replicated boolean
+consumers List~ConsumerStatsImpl~
}
class ConsumerStatsImpl {
+msgRateOut double
+msgThroughputOut double
+msgRateRedeliver double
+consumerName String
+availablePermits int
+unackedMessages int
+avgMessagesPerEntry double
+blockedConsumerOnUnackedMsgs boolean
+address String
+connectedSince String
+clientVersion String
+lastAckedTimestamp long
+lastConsumedTimestamp long
+keyHashRanges List~String~
+metadata Map~String, String~
}
TopicStats <|-- TopicStatsImpl
TopicStatsImpl o-- PublisherStatsImpl
TopicStatsImpl o-- SubscriptionStatsImpl
SubscriptionStatsImpl o-- ConsumerStatsImpl
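The aggregation chain in the diagram (topic, subscriptions, consumers) suggests how outbound rates roll up. The following sketch assumes that a subscription's `msgRateOut` is the sum of its consumers' rates and that a topic's is the sum of its subscriptions' rates. All class names are hypothetical, and the real broker also accounts for replication and other sources.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical per-consumer stats. */
final class SketchConsumerStats {
    final String consumerName;
    final double msgRateOut;

    SketchConsumerStats(String consumerName, double msgRateOut) {
        this.consumerName = consumerName;
        this.msgRateOut = msgRateOut;
    }
}

/** Hypothetical per-subscription stats: sums over its consumers. */
final class SketchSubscriptionStats {
    final List<SketchConsumerStats> consumers = new ArrayList<>();

    double msgRateOut() {
        double total = 0;
        for (SketchConsumerStats c : consumers) {
            total += c.msgRateOut;
        }
        return total;
    }
}

/** Hypothetical per-topic stats: sums over its subscriptions. */
final class SketchTopicStats {
    final Map<String, SketchSubscriptionStats> subscriptions = new LinkedHashMap<>();

    double msgRateOut() {
        double total = 0;
        for (SketchSubscriptionStats s : subscriptions.values()) {
            total += s.msgRateOut();
        }
        return total;
    }
}
```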
9. Schema Data Structures
9.1 Schema Hierarchy UML Diagram
classDiagram
class Schema~T~ {
<<interface>>
+encode(T) byte[]
+decode(byte[]) T
+decode(byte[], byte[]) T
+getSchemaInfo() SchemaInfo
+validate(byte[]) boolean
+supportSchemaVersioning() boolean
+setSchemaInfoProvider(SchemaInfoProvider) void
+requireFetchingSchemaInfo() boolean
+configureSchemaInfo(String, String, SchemaInfo) void
+clone() Schema~T~
}
class SchemaInfo {
-name String
-schema byte[]
-type SchemaType
-timestamp long
-properties Map~String, String~
+getName() String
+getSchema() byte[]
+getType() SchemaType
+getTimestamp() long
+getProperties() Map~String, String~
+toString() String
}
class AvroSchema~T~ {
-avroSchema org.apache.avro.Schema
-schemaInfo SchemaInfo
-reader GenericDatumReader~GenericRecord~
-writer GenericDatumWriter~GenericRecord~
+encode(T) byte[]
+decode(byte[]) T
+getSchemaInfo() SchemaInfo
+validate(byte[]) boolean
+getAvroSchema() org.apache.avro.Schema
}
class JsonSchema~T~ {
-objectMapper ObjectMapper
-pojo Class~T~
-schemaInfo SchemaInfo
+encode(T) byte[]
+decode(byte[]) T
+getSchemaInfo() SchemaInfo
+validate(byte[]) boolean
}
class ProtobufSchema~T~ {
-protoMessage T
-schemaInfo SchemaInfo
+encode(T) byte[]
+decode(byte[]) T
+getSchemaInfo() SchemaInfo
+validate(byte[]) boolean
+parseFrom(byte[]) T
}
class PrimitiveSchema~T~ {
-schemaInfo SchemaInfo
+encode(T) byte[]
+decode(byte[]) T
+getSchemaInfo() SchemaInfo
+validate(byte[]) boolean
}
class KeyValueSchema~K,V~ {
-keySchema Schema~K~
-valueSchema Schema~V~
-keyValueEncodingType KeyValueEncodingType
-schemaInfo SchemaInfo
+encode(KeyValue) byte[]
+decode(byte[]) KeyValue~K,V~
+getSchemaInfo() SchemaInfo
}
Schema~T~ <|-- AvroSchema~T~
Schema~T~ <|-- JsonSchema~T~
Schema~T~ <|-- ProtobufSchema~T~
Schema~T~ <|-- PrimitiveSchema~T~
Schema~T~ <|-- KeyValueSchema~K,V~
Schema~T~ *-- SchemaInfo : contains
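The `encode`/`decode` contract, and the way `KeyValueSchema` composes two other schemas, can be shown with a small sketch. The types below are hypothetical and independent of the Pulsar client. The length-prefixed layout (4-byte key length, key bytes, 4-byte value length, value bytes) is an assumption chosen for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap;
import java.util.Map;

/** Hypothetical, reduced version of the Schema contract. */
interface SketchSchema<T> {
    byte[] encode(T value);

    T decode(byte[] bytes);
}

/** Encodes strings as UTF-8 bytes. */
final class SketchStringSchema implements SketchSchema<String> {
    @Override
    public byte[] encode(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String decode(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }
}

/** Composes a key schema and a value schema into one length-prefixed payload. */
final class SketchKeyValueSchema<K, V> implements SketchSchema<Map.Entry<K, V>> {
    private final SketchSchema<K> keySchema;
    private final SketchSchema<V> valueSchema;

    SketchKeyValueSchema(SketchSchema<K> keySchema, SketchSchema<V> valueSchema) {
        this.keySchema = keySchema;
        this.valueSchema = valueSchema;
    }

    @Override
    public byte[] encode(Map.Entry<K, V> kv) {
        byte[] k = keySchema.encode(kv.getKey());
        byte[] v = valueSchema.encode(kv.getValue());
        ByteBuffer buf = ByteBuffer.allocate(4 + k.length + 4 + v.length);
        buf.putInt(k.length).put(k).putInt(v.length).put(v);
        return buf.array();
    }

    @Override
    public Map.Entry<K, V> decode(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        byte[] k = new byte[buf.getInt()];
        buf.get(k);
        byte[] v = new byte[buf.getInt()];
        buf.get(v);
        return new AbstractMap.SimpleImmutableEntry<>(keySchema.decode(k), valueSchema.decode(v));
    }
}
```

Delegating to the component schemas keeps the composite schema independent of the concrete key and value types.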
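10. Summary
This document uses detailed UML class diagrams to present the key data structures in Apache Pulsar and the relationships between them. These structures form the core of Pulsar's architecture, and understanding their design and relationships helps with:
- System design: understanding the overall architectural approach of Pulsar
- Performance tuning: identifying bottlenecks and optimization opportunities
- Feature extension: extending and customizing functionality on top of the existing structures
- Troubleshooting: understanding system behavior in depth and locating problems quickly
- Code contribution: contributing code and features to the Pulsar community
Each data structure has its own responsibilities and design considerations. They cooperate through clear interfaces and inheritance relationships to form a high-performance, scalable distributed messaging system.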