Jaeger-03-Storage-Ingester-Sampling综合模块
本文档综合介绍 Jaeger 的 Storage 存储层、Ingester 数据摄取器和 Sampling 采样策略三个核心模块。
第一部分:Storage 存储层模块
1. 模块概览
1.1 模块职责
Storage 模块是 Jaeger 的存储抽象层,定义了统一的存储接口,支持多种存储后端实现。
核心职责:
- 接口抽象:定义 SpanReader、SpanWriter、DependencyReader 等统一接口
- 多后端支持:支持 Cassandra、Elasticsearch、OpenSearch、Badger、Kafka、Memory、gRPC Remote Storage
- 工厂模式:通过 Factory 模式创建存储实例
- 版本适配:支持 V1 和 V2 两套存储接口,提供适配器互转
支持的存储后端:
| 存储后端 | 类型 | 适用场景 | 优势 | 劣势 |
|---|---|---|---|---|
| Cassandra | NoSQL | 生产环境、高写入 | 高可用、可扩展、写入性能强 | 查询功能有限 |
| Elasticsearch | 搜索引擎 | 生产环境、复杂查询 | 全文搜索、复杂标签过滤 | 写入性能较弱 |
| OpenSearch | 搜索引擎 | 生产环境、ES 替代 | 同 ES,开源友好 | 同 ES |
| Badger | 嵌入式 KV | 单机部署、测试 | 无外部依赖、快速启动 | 不支持分布式 |
| Kafka | 消息队列 | 中间缓冲 | 解耦 Collector 和存储 | 非持久化存储 |
| Memory | 内存 | 测试、All-in-One | 零配置、快速 | 数据丢失、容量有限 |
| gRPC Remote | 远程插件 | 自定义存储 | 可扩展、隔离 | 需额外开发 |
2. 核心接口定义
2.1 SpanWriter 接口
// SpanWriter 写入 span 数据
type Writer interface {
WriteSpan(ctx context.Context, span *model.Span) error
}
// V2 版本(基于 OTLP)
type Writer interface {
WriteTraces(ctx context.Context, traces ptrace.Traces) error
}
实现说明:
- Cassandra:按 TraceID 分区,写入
traces和service_name_index表 - Elasticsearch:写入
jaeger-span-*索引,按日期轮转 - Kafka:序列化 span 并发送到 Kafka topic
2.2 SpanReader 接口
type Reader interface {
// 根据 TraceID 获取单个 trace
GetTrace(ctx context.Context, query GetTraceParameters) (*model.Trace, error)
// 查询所有服务名
GetServices(ctx context.Context) ([]string, error)
// 查询服务的所有操作名
GetOperations(ctx context.Context, query OperationQueryParameters) ([]Operation, error)
// 根据条件查询 traces
FindTraces(ctx context.Context, query *TraceQueryParameters) ([]*model.Trace, error)
// 根据条件查询 TraceIDs(不返回完整 span 数据)
FindTraceIDs(ctx context.Context, query *TraceQueryParameters) ([]model.TraceID, error)
}
查询参数结构:
type TraceQueryParameters struct {
ServiceName string // 服务名(必填)
OperationName string // 操作名(可选)
Tags map[string]string // 标签过滤(可选)
StartTimeMin time.Time // 开始时间下限
StartTimeMax time.Time // 开始时间上限
DurationMin time.Duration // 最小持续时间
DurationMax time.Duration // 最大持续时间
NumTraces int // 返回数量上限
}
2.3 DependencyReader 接口
type Reader interface {
GetDependencies(ctx context.Context, params QueryParameters) ([]model.DependencyLink, error)
}
type DependencyLink struct {
Parent string // 调用方服务名
Child string // 被调方服务名
CallCount uint64 // 调用次数
}
3. 存储后端实现详解
3.1 Cassandra 存储
表结构:
-- traces 表:存储 span 数据
CREATE TABLE traces (
trace_id blob,
span_id bigint,
span_hash bigint,
parent_id bigint,
operation_name text,
flags int,
start_time timestamp,
duration bigint,
tags blob,
logs blob,
refs blob,
process blob,
PRIMARY KEY (trace_id, span_id, span_hash)
);
-- service_name_index 表:服务名索引
CREATE TABLE service_name_index (
service_name text,
bucket int,
start_time timestamp,
trace_id blob,
PRIMARY KEY ((service_name, bucket), start_time, trace_id)
) WITH CLUSTERING ORDER BY (start_time DESC);
-- operation_names 表:操作名索引
CREATE TABLE operation_names (
service_name text,
operation_name text,
PRIMARY KEY (service_name, operation_name)
);
-- dependencies 表:依赖关系
CREATE TABLE dependencies (
ts timestamp,
ts_index bigint,
parent text,
child text,
call_count bigint,
PRIMARY KEY (ts, ts_index, parent, child)
);
写入流程:
sequenceDiagram
autonumber
participant C as Collector
participant W as CassandraWriter
participant CAS as Cassandra
C->>W: WriteSpan(span)
W->>W: 序列化 span<br/>(tags, logs, refs)
W->>W: 计算 bucket<br/>(时间分片)
par 并行写入
W->>CAS: INSERT INTO traces<br/>(trace_id, span_id, ...)
W->>CAS: INSERT INTO service_name_index<br/>(service_name, bucket, ...)
W->>CAS: INSERT INTO operation_names<br/>(service_name, operation_name)
end
CAS-->>W: ok
W-->>C: nil
查询流程(FindTraces):
sequenceDiagram
autonumber
participant Q as Query
participant R as CassandraReader
participant CAS as Cassandra
Q->>R: FindTraces(query)
R->>R: 构造查询条件<br/>(service, time range)
R->>CAS: SELECT trace_id FROM service_name_index<br/>WHERE service_name=? AND bucket=? AND start_time BETWEEN ? AND ?
CAS-->>R: []trace_id (100 条)
loop 每个 trace_id
R->>CAS: SELECT * FROM traces<br/>WHERE trace_id=?
CAS-->>R: []span
R->>R: 聚合 spans → Trace
end
R-->>Q: []Trace
配置参数:
jaeger-collector \
--span-storage.type=cassandra \
--cassandra.servers=cassandra1:9042,cassandra2:9042 \
--cassandra.keyspace=jaeger_v1_dc1 \
--cassandra.local-dc=dc1 \
--cassandra.connections-per-host=10 \
--cassandra.span-store.ttl=172800 # 2 天
3.2 Elasticsearch 存储
索引结构:
// jaeger-span-2024-01-01 索引
{
"mappings": {
"properties": {
"traceID": { "type": "keyword" },
"spanID": { "type": "keyword" },
"operationName": { "type": "keyword" },
"process": {
"properties": {
"serviceName": { "type": "keyword" },
"tags": {
"type": "nested",
"properties": {
"key": { "type": "keyword" },
"value": { "type": "keyword" }
}
}
}
},
"startTime": { "type": "date", "format": "epoch_micros" },
"duration": { "type": "long" },
"tags": {
"type": "nested",
"properties": {
"key": { "type": "keyword" },
"value": { "type": "keyword" }
}
},
"references": {
"type": "nested"
}
}
}
}
写入流程:
func (w *SpanWriter) WriteSpan(ctx context.Context, span *model.Span) error {
// 1. 序列化 span 为 JSON
spanJSON, err := json.Marshal(esSpanMarshaller{span})
if err != nil {
return err
}
// 2. 计算索引名(按日期轮转)
indexName := w.indexNameProvider.IndexName(span.StartTime)
// 3. 写入 ES
_, err = w.client.Index(
indexName,
bytes.NewReader(spanJSON),
w.client.Index.WithContext(ctx),
)
return err
}
查询流程(FindTraces):
// ES 查询 DSL
{
"query": {
"bool": {
"must": [
{ "term": { "process.serviceName": "frontend" } },
{ "range": { "startTime": { "gte": 1609459200000000, "lte": 1609545600000000 } } }
],
"filter": [
{ "nested": {
"path": "tags",
"query": { "bool": { "must": [
{ "term": { "tags.key": "http.status_code" } },
{ "term": { "tags.value": "200" } }
]}}
}}
]
}
},
"aggs": {
"traceIDs": {
"terms": { "field": "traceID", "size": 100 }
}
}
}
配置参数:
jaeger-collector \
--span-storage.type=elasticsearch \
--es.server-urls=http://es1:9200,http://es2:9200 \
--es.index-prefix=jaeger \
--es.num-shards=5 \
--es.num-replicas=1 \
--es.bulk.size=5000000 # 5MB
3.3 Badger 存储
存储结构:
Badger 是嵌入式 KV 数据库,存储结构:
Key: Value:
trace:{traceID} → []byte (serialized Trace)
service:{serviceName} → []string (list of operations)
index:{service}:{time}:{traceID} → nil (索引)
写入流程:
func (w *SpanWriter) WriteSpan(ctx context.Context, span *model.Span) error {
// 1. 序列化 span
spanBytes, err := proto.Marshal(span)
if err != nil {
return err
}
// 2. 写入 Badger
return w.db.Update(func(txn *badger.Txn) error {
// 写入 trace 数据
key := makeTraceKey(span.TraceID, span.SpanID)
err := txn.Set(key, spanBytes)
if err != nil {
return err
}
// 写入服务名索引
indexKey := makeIndexKey(span.Process.ServiceName, span.StartTime, span.TraceID)
return txn.Set(indexKey, nil)
})
}
配置参数:
jaeger-all-in-one \
--span-storage.type=badger \
--badger.ephemeral=false \
--badger.directory-key=/data/jaeger/keys \
--badger.directory-value=/data/jaeger/values \
--badger.span-store-ttl=72h
4. 存储工厂模式
Factory 接口:
type Factory interface {
// 初始化存储(连接数据库)
Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error
// 创建 SpanReader
CreateSpanReader() (spanstore.Reader, error)
// 创建 SpanWriter
CreateSpanWriter() (spanstore.Writer, error)
// 创建 DependencyReader
CreateDependencyReader() (dependencystore.Reader, error)
// 关闭连接
Close() error
}
使用示例:
// main.go
func main() {
// 1. 创建 Storage Factory
storageFactory, err := storage.NewFactory(storage.Config{
SpanStorageType: "cassandra",
})
if err != nil {
log.Fatal(err)
}
// 2. 初始化
err = storageFactory.Initialize(metricsFactory, logger)
if err != nil {
log.Fatal(err)
}
defer storageFactory.Close()
// 3. 创建 SpanWriter
spanWriter, err := storageFactory.CreateSpanWriter()
if err != nil {
log.Fatal(err)
}
// 4. 写入 span
err = spanWriter.WriteSpan(ctx, span)
}
5. 数据结构 UML 图
classDiagram
class Factory {
<<interface>>
+Initialize(metrics, logger) error
+CreateSpanReader() (Reader, error)
+CreateSpanWriter() (Writer, error)
+CreateDependencyReader() (Reader, error)
+Close() error
}
class CassandraFactory {
-config Config
-session *gocql.Session
+Initialize() error
+CreateSpanWriter() (Writer, error)
+CreateSpanReader() (Reader, error)
}
class ElasticsearchFactory {
-config Config
-client *elasticsearch.Client
+Initialize() error
+CreateSpanWriter() (Writer, error)
+CreateSpanReader() (Reader, error)
}
class SpanWriter {
<<interface>>
+WriteSpan(ctx, span) error
}
class SpanReader {
<<interface>>
+GetTrace(ctx, traceID) (*Trace, error)
+FindTraces(ctx, query) ([]*Trace, error)
+GetServices(ctx) ([]string, error)
+GetOperations(ctx, query) ([]Operation, error)
}
Factory <|.. CassandraFactory : implements
Factory <|.. ElasticsearchFactory : implements
CassandraFactory --> SpanWriter : creates
CassandraFactory --> SpanReader : creates
ElasticsearchFactory --> SpanWriter : creates
ElasticsearchFactory --> SpanReader : creates
第二部分:Ingester 数据摄取器模块
1. 模块概览
1.1 模块职责
Ingester 模块从 Kafka 消费 span 消息并批量写入存储,解耦 Collector 和存储层。
核心职责:
- Kafka 消费:订阅 Kafka topic,消费 span 消息
- 批量写入:聚合多个 span 批量写入存储,提高吞吐量
- 容错重试:消费失败自动重试,保证至少一次消费(At-Least-Once)
- 并行处理:支持多个 partition 并行消费
部署架构:
flowchart LR
COL1["Collector 1"] -->|"Produce"| KAFKA["Kafka<br/>Topic: jaeger-spans"]
COL2["Collector 2"] -->|"Produce"| KAFKA
COL3["Collector N"] -->|"Produce"| KAFKA
KAFKA -->|"Partition 0"| ING1["Ingester 1<br/>Consumer"]
KAFKA -->|"Partition 1"| ING2["Ingester 2<br/>Consumer"]
KAFKA -->|"Partition 2"| ING3["Ingester N<br/>Consumer"]
ING1 -->|"批量写入"| STORAGE["Storage<br/>(Cassandra/ES)"]
ING2 -->|"批量写入"| STORAGE
ING3 -->|"批量写入"| STORAGE
style KAFKA fill:#fff9c4
style STORAGE fill:#e8f5e9
2. 核心流程
2.1 Kafka Consumer 配置
type Options struct {
Brokers []string // Kafka broker 地址
Topic string // Topic 名称(如 "jaeger-spans")
GroupID string // Consumer Group ID
Parallelism int // 并行消费的 partition 数
DeadlockInterval time.Duration // 死锁检测间隔
ProtobufEncoding bool // 是否使用 Protobuf 编码
}
2.2 消费流程
sequenceDiagram
autonumber
participant K as Kafka<br/>(Partition 0)
participant C as Kafka Consumer
participant I as Ingester
participant W as SpanWriter
participant S as Storage
loop 持续消费
K->>C: Poll Messages<br/>(batch size: 100)
C-->>I: []Message
loop 每条消息
I->>I: 反序列化 Span
I->>I: 添加到批次缓冲
end
alt 批次满 or 超时
I->>W: WriteBatch(spans)
loop 每个 Span
W->>S: WriteSpan(span)
S-->>W: ok / error
end
W-->>I: ok / error
alt 写入成功
I->>C: Commit Offset
C->>K: 提交消费位移
else 写入失败
I->>I: 重试(延迟后重新消费)
end
end
end
2.3 核心代码
// Ingester 主循环
func (c *Consumer) Start() {
for i := 0; i < c.options.Parallelism; i++ {
go c.consumePartition(i)
}
}
func (c *Consumer) consumePartition(partitionID int) {
for {
select {
case <-c.stopCh:
return
default:
// 1. 从 Kafka 拉取消息
messages, err := c.kafkaConsumer.Poll(100*time.Millisecond, 100)
if err != nil {
c.logger.Error("Failed to poll Kafka", zap.Error(err))
continue
}
// 2. 反序列化 span
spans := make([]*model.Span, 0, len(messages))
for _, msg := range messages {
span, err := c.unmarshaller.Unmarshal(msg.Value)
if err != nil {
c.logger.Error("Failed to unmarshal span", zap.Error(err))
continue
}
spans = append(spans, span)
}
// 3. 批量写入存储
if len(spans) > 0 {
err = c.spanWriter.WriteBatch(spans)
if err != nil {
c.logger.Error("Failed to write spans", zap.Error(err))
// 不提交 offset,下次重新消费
continue
}
}
// 4. 提交 offset
err = c.kafkaConsumer.CommitMessages(ctx, messages...)
if err != nil {
c.logger.Error("Failed to commit offset", zap.Error(err))
}
}
}
}
3. 配置参数
| 配置项 | 默认值 | 说明 |
|---|---|---|
--kafka.consumer.brokers |
localhost:9092 | Kafka broker 地址 |
--kafka.consumer.topic |
jaeger-spans | Kafka topic 名称 |
--kafka.consumer.group-id |
jaeger-ingester | Consumer Group ID |
--ingester.parallelism |
1000 | 并行消费的 goroutine 数 |
--ingester.deadlockInterval |
1m | 死锁检测间隔 |
--kafka.consumer.encoding |
protobuf | 编码格式(protobuf/json) |
4. 监控指标
| 指标名 | 类型 | 说明 |
|---|---|---|
jaeger_ingester_messages_total |
Counter | 消费的消息总数 |
jaeger_ingester_batches_total |
Counter | 批量写入的批次数 |
jaeger_ingester_errors_total |
Counter | 消费错误总数 |
jaeger_ingester_lag |
Gauge | Kafka Consumer 消费延迟 |
第三部分:Sampling 采样策略模块
1. 模块概览
1.1 模块职责
Sampling 模块管理和提供采样策略,包括静态文件策略和自适应采样。
核心职责:
- 策略管理:加载和存储采样策略(文件或自适应计算)
- 策略下发:向客户端 SDK 提供采样策略(gRPC/HTTP 接口)
- 自适应采样:根据流量动态调整采样概率,控制数据量
- 吞吐量统计:Collector 统计 root span 吞吐量
2. 采样策略类型
2.1 概率采样(Probabilistic Sampling)
原理:
按固定概率采样,如 0.01 表示 1% 的 trace 被采样。
配置示例(sampling_strategies.json):
{
"default_strategy": {
"type": "probabilistic",
"param": 0.001
},
"service_strategies": [
{
"service": "frontend",
"type": "probabilistic",
"param": 0.1
},
{
"service": "backend",
"type": "probabilistic",
"param": 0.01,
"operation_strategies": [
{
"operation": "POST /api/orders",
"type": "probabilistic",
"param": 1.0
}
]
}
]
}
说明:
default_strategy:默认策略(0.1% 采样)service_strategies:服务级策略(覆盖默认)operation_strategies:操作级策略(覆盖服务级)
2.2 速率限制采样(Rate Limiting Sampling)
原理:
限制每秒最多采样的 trace 数量。
配置示例:
{
"default_strategy": {
"type": "ratelimiting",
"param": 10
}
}
说明:
param:每秒最多 10 个 trace
2.3 自适应采样(Adaptive Sampling)
原理:
根据服务的实际吞吐量动态调整采样概率,确保采样后的 trace 数量接近目标 QPS。
算法:
目标采样概率 = 目标 QPS / 当前吞吐量
示例:
目标 QPS: 10 traces/second
当前吞吐量: 1000 traces/second
采样概率 = 10 / 1000 = 0.01 (1%)
配置参数:
jaeger-collector \
--sampling.strategies-file="" \ # 禁用文件策略
--sampling.type=adaptive \
--sampling.initial-sampling-probability=0.001 \ # 初始概率
--sampling.target-samples-per-second=1.0 \ # 目标 QPS
--sampling.aggregation-buckets=10 # 聚合桶数
3. 自适应采样工作流程
flowchart TB
subgraph Collector["Collector 实例"]
ROOT["Root Span<br/>识别"]
AGG["Aggregator<br/>(统计吞吐量)"]
end
subgraph SamplingStore["Sampling Store"]
THROUGHPUT["吞吐量表<br/>(service+operation → count)"]
PROBABILITY["概率表<br/>(service+operation → probability)"]
end
subgraph PostAgg["Post-aggregator<br/>(后台任务)"]
CALC["计算目标概率"]
end
subgraph SDK["Client SDK"]
POLL["定期拉取<br/>采样策略"]
end
ROOT -->|"是 root span"| AGG
AGG -->|"定期刷新<br/>(1 分钟)"| THROUGHPUT
THROUGHPUT -->|"读取"| CALC
CALC -->|"计算并写入"| PROBABILITY
PROBABILITY -->|"查询"| SDK
SDK -->|"应用策略"| ROOT
style THROUGHPUT fill:#fff9c4
style PROBABILITY fill:#e1f5ff
详细流程:
-
Aggregator 统计:
- Collector 识别 root span(没有 parent span 的 span)
- 按 (service, operation) 分组统计吞吐量
- 定期(如 1 分钟)将统计数据写入 Sampling Store
-
Post-aggregator 计算:
- 后台任务定期(如 1 分钟)从 Sampling Store 读取所有服务的吞吐量
- 按算法计算目标采样概率
- 将计算结果写入 Sampling Store
-
SDK 拉取策略:
- SDK 定期(如 5 分钟)从 Collector/Query 拉取采样策略
- 应用到新 trace 的采样决策
4. 采样策略 API
4.1 GetSamplingStrategy (gRPC)
请求:
message GetSamplingStrategyRequest {
string service_name = 1;
}
响应:
message SamplingStrategyResponse {
enum StrategyType {
PROBABILISTIC = 0;
RATE_LIMITING = 1;
}
StrategyType strategy_type = 1;
ProbabilisticSamplingStrategy probabilistic_sampling = 2;
RateLimitingSamplingStrategy rate_limiting_sampling = 3;
PerOperationSamplingStrategies operation_sampling = 4;
}
message ProbabilisticSamplingStrategy {
double sampling_rate = 1; // 0.0 - 1.0
}
message RateLimitingSamplingStrategy {
int32 max_traces_per_second = 1;
}
示例响应:
{
"strategyType": "PROBABILISTIC",
"probabilisticSampling": {
"samplingRate": 0.01
},
"operationSampling": {
"defaultSamplingProbability": 0.01,
"perOperationStrategies": [
{
"operation": "POST /api/orders",
"probabilisticSampling": {
"samplingRate": 1.0
}
}
]
}
}
5. 数据结构 UML 图
classDiagram
class Provider {
<<interface>>
+GetSamplingStrategy(ctx, service) (*SamplingStrategyResponse, error)
}
class FileProvider {
-strategies map~string,Strategy~
+GetSamplingStrategy() (*Response, error)
-loadFile()
}
class AdaptiveProvider {
-samplingStore Store
-calculator Calculator
+GetSamplingStrategy() (*Response, error)
}
class Aggregator {
<<interface>>
+HandleRootSpan(span)
+Start()
+Close()
}
class SamplingAggregator {
-throughputBuckets map~string,int~
-samplingStore Store
+HandleRootSpan(span)
-flushThroughput()
}
class PostAggregator {
-samplingStore Store
-targetQPS float64
+Start()
-calculateProbabilities()
}
Provider <|.. FileProvider : implements
Provider <|.. AdaptiveProvider : implements
Aggregator <|.. SamplingAggregator : implements
AdaptiveProvider --> PostAggregator : uses
第四部分:最佳实践与调优
1. Storage 选择指南
| 场景 | 推荐存储 | 理由 |
|---|---|---|
| 高写入吞吐(> 10K spans/s) | Cassandra + Kafka | 写入性能强、可扩展 |
| 复杂查询(标签过滤) | Elasticsearch | 全文搜索、灵活查询 |
| 低成本、中等规模 | OpenSearch | 同 ES,开源无授权费 |
| 单机部署、测试 | Badger | 无外部依赖、快速启动 |
| 云原生、无运维 | gRPC Remote Storage | 托管存储服务 |
2. Ingester 部署建议
高吞吐场景:
jaeger-ingester \
--kafka.consumer.brokers=kafka1:9092,kafka2:9092,kafka3:9092 \
--ingester.parallelism=100 \
--kafka.consumer.group-id=jaeger-ingester-group
多实例部署:
- Ingester 实例数 = Kafka partition 数
- 同一 Consumer Group 内的实例自动分配 partition
- 每个实例独立消费,互不干扰
3. Sampling 策略建议
生产环境:
{
"default_strategy": {
"type": "probabilistic",
"param": 0.001 // 默认 0.1%
},
"service_strategies": [
{
"service": "critical-service",
"type": "probabilistic",
"param": 1.0 // 关键服务 100% 采样
},
{
"service": "high-traffic-service",
"type": "adaptive", // 高流量服务使用自适应
"target_samples_per_second": 10
}
]
}
监控告警:
groups:
- name: jaeger_sampling
rules:
- alert: SamplingRateTooLow
expr: jaeger_collector_sampling_probability < 0.0001
for: 10m
annotations:
summary: "Sampling rate for {{ $labels.service }} is too low: {{ $value }}"
5. 故障排查
问题 1:Ingester 消费延迟高
排查:
# 查看 Kafka Consumer Lag
kafka-consumer-groups --bootstrap-server kafka:9092 --group jaeger-ingester-group --describe
# 查看 Ingester 指标
curl http://ingester:14269/metrics | grep ingester_lag
解决:
- 增加 Ingester 实例数(匹配 Kafka partition 数)
- 增加 Kafka partition 数(需重新分区)
- 优化存储写入性能
问题 2:自适应采样不生效
排查:
# 查看 Sampling Store 中的概率
# (Cassandra 示例)
SELECT * FROM sampling_probabilities WHERE service_name='frontend';
# 查看 Aggregator 指标
curl http://collector:14269/metrics | grep sampling_aggregator
解决:
- 确认 Collector 配置了 Aggregator
- 确认 Post-aggregator 后台任务正常运行
- 检查 Sampling Store 是否可写
6. 总结
本文档综合介绍了 Jaeger 的三个核心支撑模块:
- Storage 存储层:提供统一接口,支持 7+ 种存储后端
- Ingester:从 Kafka 消费 span,解耦 Collector 和存储
- Sampling:管理采样策略,支持文件策略和自适应采样
关键要点:
- Storage 通过 Factory 模式支持多种后端,生产环境推荐 Cassandra 或 Elasticsearch
- Ingester 适用于高吞吐场景,通过 Kafka 缓冲流量峰值
- Sampling 的自适应策略动态调整采样率,平衡数据量和成本