Jaeger-03-Storage-Ingester-Sampling综合模块

本文档综合介绍 Jaeger 的 Storage 存储层、Ingester 数据摄取器和 Sampling 采样策略三个核心模块。


第一部分:Storage 存储层模块

1. 模块概览

1.1 模块职责

Storage 模块是 Jaeger 的存储抽象层,定义了统一的存储接口,支持多种存储后端实现。

核心职责:

  • 接口抽象:定义 SpanReader、SpanWriter、DependencyReader 等统一接口
  • 多后端支持:支持 Cassandra、Elasticsearch、OpenSearch、Badger、Kafka、Memory、gRPC Remote Storage
  • 工厂模式:通过 Factory 模式创建存储实例
  • 版本适配:支持 V1 和 V2 两套存储接口,提供适配器互转

支持的存储后端:

存储后端 类型 适用场景 优势 劣势
Cassandra NoSQL 生产环境、高写入 高可用、可扩展、写入性能强 查询功能有限
Elasticsearch 搜索引擎 生产环境、复杂查询 全文搜索、复杂标签过滤 写入性能较弱
OpenSearch 搜索引擎 生产环境、ES 替代 同 ES,开源友好 同 ES
Badger 嵌入式 KV 单机部署、测试 无外部依赖、快速启动 不支持分布式
Kafka 消息队列 中间缓冲 解耦 Collector 和存储 非持久化存储
Memory 内存 测试、All-in-One 零配置、快速 数据丢失、容量有限
gRPC Remote 远程插件 自定义存储 可扩展、隔离 需额外开发

2. 核心接口定义

2.1 SpanWriter 接口

// SpanWriter 写入 span 数据
type Writer interface {
    WriteSpan(ctx context.Context, span *model.Span) error
}

// V2 版本(基于 OTLP)
type Writer interface {
    WriteTraces(ctx context.Context, traces ptrace.Traces) error
}

实现说明:

  • Cassandra:按 TraceID 分区,写入 tracesservice_name_index
  • Elasticsearch:写入 jaeger-span-* 索引,按日期轮转
  • Kafka:序列化 span 并发送到 Kafka topic

2.2 SpanReader 接口

type Reader interface {
    // 根据 TraceID 获取单个 trace
    GetTrace(ctx context.Context, query GetTraceParameters) (*model.Trace, error)

    // 查询所有服务名
    GetServices(ctx context.Context) ([]string, error)

    // 查询服务的所有操作名
    GetOperations(ctx context.Context, query OperationQueryParameters) ([]Operation, error)

    // 根据条件查询 traces
    FindTraces(ctx context.Context, query *TraceQueryParameters) ([]*model.Trace, error)

    // 根据条件查询 TraceIDs(不返回完整 span 数据)
    FindTraceIDs(ctx context.Context, query *TraceQueryParameters) ([]model.TraceID, error)
}

查询参数结构:

type TraceQueryParameters struct {
    ServiceName   string            // 服务名(必填)
    OperationName string            // 操作名(可选)
    Tags          map[string]string // 标签过滤(可选)
    StartTimeMin  time.Time         // 开始时间下限
    StartTimeMax  time.Time         // 开始时间上限
    DurationMin   time.Duration     // 最小持续时间
    DurationMax   time.Duration     // 最大持续时间
    NumTraces     int               // 返回数量上限
}

2.3 DependencyReader 接口

type Reader interface {
    GetDependencies(ctx context.Context, params QueryParameters) ([]model.DependencyLink, error)
}

type DependencyLink struct {
    Parent    string  // 调用方服务名
    Child     string  // 被调方服务名
    CallCount uint64  // 调用次数
}

3. 存储后端实现详解

3.1 Cassandra 存储

表结构:

-- traces 表:存储 span 数据
CREATE TABLE traces (
    trace_id blob,
    span_id bigint,
    span_hash bigint,
    parent_id bigint,
    operation_name text,
    flags int,
    start_time timestamp,
    duration bigint,
    tags blob,
    logs blob,
    refs blob,
    process blob,
    PRIMARY KEY (trace_id, span_id, span_hash)
);

-- service_name_index 表:服务名索引
CREATE TABLE service_name_index (
    service_name text,
    bucket int,
    start_time timestamp,
    trace_id blob,
    PRIMARY KEY ((service_name, bucket), start_time, trace_id)
) WITH CLUSTERING ORDER BY (start_time DESC);

-- operation_names 表:操作名索引
CREATE TABLE operation_names (
    service_name text,
    operation_name text,
    PRIMARY KEY (service_name, operation_name)
);

-- dependencies 表:依赖关系
CREATE TABLE dependencies (
    ts timestamp,
    ts_index bigint,
    parent text,
    child text,
    call_count bigint,
    PRIMARY KEY (ts, ts_index, parent, child)
);

写入流程:

sequenceDiagram
    autonumber
    participant C as Collector
    participant W as CassandraWriter
    participant CAS as Cassandra
    
    C->>W: WriteSpan(span)
    W->>W: 序列化 span<br/>(tags, logs, refs)
    W->>W: 计算 bucket<br/>(时间分片)
    
    par 并行写入
        W->>CAS: INSERT INTO traces<br/>(trace_id, span_id, ...)
        W->>CAS: INSERT INTO service_name_index<br/>(service_name, bucket, ...)
        W->>CAS: INSERT INTO operation_names<br/>(service_name, operation_name)
    end
    
    CAS-->>W: ok
    W-->>C: nil

查询流程(FindTraces):

sequenceDiagram
    autonumber
    participant Q as Query
    participant R as CassandraReader
    participant CAS as Cassandra
    
    Q->>R: FindTraces(query)
    R->>R: 构造查询条件<br/>(service, time range)
    R->>CAS: SELECT trace_id FROM service_name_index<br/>WHERE service_name=? AND bucket=? AND start_time BETWEEN ? AND ?
    CAS-->>R: []trace_id (100 条)
    
    loop 每个 trace_id
        R->>CAS: SELECT * FROM traces<br/>WHERE trace_id=?
        CAS-->>R: []span
        R->>R: 聚合 spans → Trace
    end
    
    R-->>Q: []Trace

配置参数:

jaeger-collector \
  --span-storage.type=cassandra \
  --cassandra.servers=cassandra1:9042,cassandra2:9042 \
  --cassandra.keyspace=jaeger_v1_dc1 \
  --cassandra.local-dc=dc1 \
  --cassandra.connections-per-host=10 \
  --cassandra.span-store.ttl=172800  # 2 天

3.2 Elasticsearch 存储

索引结构:

// jaeger-span-2024-01-01 索引
{
  "mappings": {
    "properties": {
      "traceID": { "type": "keyword" },
      "spanID": { "type": "keyword" },
      "operationName": { "type": "keyword" },
      "process": {
        "properties": {
          "serviceName": { "type": "keyword" },
          "tags": {
            "type": "nested",
            "properties": {
              "key": { "type": "keyword" },
              "value": { "type": "keyword" }
            }
          }
        }
      },
      "startTime": { "type": "date", "format": "epoch_micros" },
      "duration": { "type": "long" },
      "tags": {
        "type": "nested",
        "properties": {
          "key": { "type": "keyword" },
          "value": { "type": "keyword" }
        }
      },
      "references": {
        "type": "nested"
      }
    }
  }
}

写入流程:

func (w *SpanWriter) WriteSpan(ctx context.Context, span *model.Span) error {
    // 1. 序列化 span 为 JSON
    spanJSON, err := json.Marshal(esSpanMarshaller{span})
    if err != nil {
        return err
    }

    // 2. 计算索引名(按日期轮转)
    indexName := w.indexNameProvider.IndexName(span.StartTime)

    // 3. 写入 ES
    _, err = w.client.Index(
        indexName,
        bytes.NewReader(spanJSON),
        w.client.Index.WithContext(ctx),
    )

    return err
}

查询流程(FindTraces):

// ES 查询 DSL
{
  "query": {
    "bool": {
      "must": [
        { "term": { "process.serviceName": "frontend" } },
        { "range": { "startTime": { "gte": 1609459200000000, "lte": 1609545600000000 } } }
      ],
      "filter": [
        { "nested": {
            "path": "tags",
            "query": { "bool": { "must": [
              { "term": { "tags.key": "http.status_code" } },
              { "term": { "tags.value": "200" } }
            ]}}
        }}
      ]
    }
  },
  "aggs": {
    "traceIDs": {
      "terms": { "field": "traceID", "size": 100 }
    }
  }
}

配置参数:

jaeger-collector \
  --span-storage.type=elasticsearch \
  --es.server-urls=http://es1:9200,http://es2:9200 \
  --es.index-prefix=jaeger \
  --es.num-shards=5 \
  --es.num-replicas=1 \
  --es.bulk.size=5000000  # 5MB

3.3 Badger 存储

存储结构:

Badger 是嵌入式 KV 数据库,存储结构:

Key:                     Value:
trace:{traceID}       → []byte (serialized Trace)
service:{serviceName} → []string (list of operations)
index:{service}:{time}:{traceID} → nil (索引)

写入流程:

func (w *SpanWriter) WriteSpan(ctx context.Context, span *model.Span) error {
    // 1. 序列化 span
    spanBytes, err := proto.Marshal(span)
    if err != nil {
        return err
    }

    // 2. 写入 Badger
    return w.db.Update(func(txn *badger.Txn) error {
        // 写入 trace 数据
        key := makeTraceKey(span.TraceID, span.SpanID)
        err := txn.Set(key, spanBytes)
        if err != nil {
            return err
        }

        // 写入服务名索引
        indexKey := makeIndexKey(span.Process.ServiceName, span.StartTime, span.TraceID)
        return txn.Set(indexKey, nil)
    })
}

配置参数:

jaeger-all-in-one \
  --span-storage.type=badger \
  --badger.ephemeral=false \
  --badger.directory-key=/data/jaeger/keys \
  --badger.directory-value=/data/jaeger/values \
  --badger.span-store-ttl=72h

4. 存储工厂模式

Factory 接口:

type Factory interface {
    // 初始化存储(连接数据库)
    Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error

    // 创建 SpanReader
    CreateSpanReader() (spanstore.Reader, error)

    // 创建 SpanWriter
    CreateSpanWriter() (spanstore.Writer, error)

    // 创建 DependencyReader
    CreateDependencyReader() (dependencystore.Reader, error)

    // 关闭连接
    Close() error
}

使用示例:

// main.go
func main() {
    // 1. 创建 Storage Factory
    storageFactory, err := storage.NewFactory(storage.Config{
        SpanStorageType: "cassandra",
    })
    if err != nil {
        log.Fatal(err)
    }

    // 2. 初始化
    err = storageFactory.Initialize(metricsFactory, logger)
    if err != nil {
        log.Fatal(err)
    }
    defer storageFactory.Close()

    // 3. 创建 SpanWriter
    spanWriter, err := storageFactory.CreateSpanWriter()
    if err != nil {
        log.Fatal(err)
    }

    // 4. 写入 span
    err = spanWriter.WriteSpan(ctx, span)
}

5. 数据结构 UML 图

classDiagram
    class Factory {
        <<interface>>
        +Initialize(metrics, logger) error
        +CreateSpanReader() (Reader, error)
        +CreateSpanWriter() (Writer, error)
        +CreateDependencyReader() (Reader, error)
        +Close() error
    }
    
    class CassandraFactory {
        -config Config
        -session *gocql.Session
        +Initialize() error
        +CreateSpanWriter() (Writer, error)
        +CreateSpanReader() (Reader, error)
    }
    
    class ElasticsearchFactory {
        -config Config
        -client *elasticsearch.Client
        +Initialize() error
        +CreateSpanWriter() (Writer, error)
        +CreateSpanReader() (Reader, error)
    }
    
    class SpanWriter {
        <<interface>>
        +WriteSpan(ctx, span) error
    }
    
    class SpanReader {
        <<interface>>
        +GetTrace(ctx, traceID) (*Trace, error)
        +FindTraces(ctx, query) ([]*Trace, error)
        +GetServices(ctx) ([]string, error)
        +GetOperations(ctx, query) ([]Operation, error)
    }
    
    Factory <|.. CassandraFactory : implements
    Factory <|.. ElasticsearchFactory : implements
    CassandraFactory --> SpanWriter : creates
    CassandraFactory --> SpanReader : creates
    ElasticsearchFactory --> SpanWriter : creates
    ElasticsearchFactory --> SpanReader : creates

第二部分:Ingester 数据摄取器模块

1. 模块概览

1.1 模块职责

Ingester 模块从 Kafka 消费 span 消息并批量写入存储,解耦 Collector 和存储层。

核心职责:

  • Kafka 消费:订阅 Kafka topic,消费 span 消息
  • 批量写入:聚合多个 span 批量写入存储,提高吞吐量
  • 容错重试:消费失败自动重试,保证至少一次消费(At-Least-Once)
  • 并行处理:支持多个 partition 并行消费

部署架构:

flowchart LR
    COL1["Collector 1"] -->|"Produce"| KAFKA["Kafka<br/>Topic: jaeger-spans"]
    COL2["Collector 2"] -->|"Produce"| KAFKA
    COL3["Collector N"] -->|"Produce"| KAFKA
    
    KAFKA -->|"Partition 0"| ING1["Ingester 1<br/>Consumer"]
    KAFKA -->|"Partition 1"| ING2["Ingester 2<br/>Consumer"]
    KAFKA -->|"Partition 2"| ING3["Ingester N<br/>Consumer"]
    
    ING1 -->|"批量写入"| STORAGE["Storage<br/>(Cassandra/ES)"]
    ING2 -->|"批量写入"| STORAGE
    ING3 -->|"批量写入"| STORAGE
    
    style KAFKA fill:#fff9c4
    style STORAGE fill:#e8f5e9

2. 核心流程

2.1 Kafka Consumer 配置

type Options struct {
    Brokers             []string      // Kafka broker 地址
    Topic               string        // Topic 名称(如 "jaeger-spans")
    GroupID             string        // Consumer Group ID
    Parallelism         int           // 并行消费的 partition 数
    DeadlockInterval    time.Duration // 死锁检测间隔
    ProtobufEncoding    bool          // 是否使用 Protobuf 编码
}

2.2 消费流程

sequenceDiagram
    autonumber
    participant K as Kafka<br/>(Partition 0)
    participant C as Kafka Consumer
    participant I as Ingester
    participant W as SpanWriter
    participant S as Storage
    
    loop 持续消费
        K->>C: Poll Messages<br/>(batch size: 100)
        C-->>I: []Message
        
        loop 每条消息
            I->>I: 反序列化 Span
            I->>I: 添加到批次缓冲
        end
        
        alt 批次满 or 超时
            I->>W: WriteBatch(spans)
            
            loop 每个 Span
                W->>S: WriteSpan(span)
                S-->>W: ok / error
            end
            
            W-->>I: ok / error
            
            alt 写入成功
                I->>C: Commit Offset
                C->>K: 提交消费位移
            else 写入失败
                I->>I: 重试(延迟后重新消费)
            end
        end
    end

2.3 核心代码

// Ingester 主循环
func (c *Consumer) Start() {
    for i := 0; i < c.options.Parallelism; i++ {
        go c.consumePartition(i)
    }
}

func (c *Consumer) consumePartition(partitionID int) {
    for {
        select {
        case <-c.stopCh:
            return
        default:
            // 1. 从 Kafka 拉取消息
            messages, err := c.kafkaConsumer.Poll(100*time.Millisecond, 100)
            if err != nil {
                c.logger.Error("Failed to poll Kafka", zap.Error(err))
                continue
            }

            // 2. 反序列化 span
            spans := make([]*model.Span, 0, len(messages))
            for _, msg := range messages {
                span, err := c.unmarshaller.Unmarshal(msg.Value)
                if err != nil {
                    c.logger.Error("Failed to unmarshal span", zap.Error(err))
                    continue
                }
                spans = append(spans, span)
            }

            // 3. 批量写入存储
            if len(spans) > 0 {
                err = c.spanWriter.WriteBatch(spans)
                if err != nil {
                    c.logger.Error("Failed to write spans", zap.Error(err))
                    // 不提交 offset,下次重新消费
                    continue
                }
            }

            // 4. 提交 offset
            err = c.kafkaConsumer.CommitMessages(ctx, messages...)
            if err != nil {
                c.logger.Error("Failed to commit offset", zap.Error(err))
            }
        }
    }
}

3. 配置参数

配置项 默认值 说明
--kafka.consumer.brokers localhost:9092 Kafka broker 地址
--kafka.consumer.topic jaeger-spans Kafka topic 名称
--kafka.consumer.group-id jaeger-ingester Consumer Group ID
--ingester.parallelism 1000 并行消费的 goroutine 数
--ingester.deadlockInterval 1m 死锁检测间隔
--kafka.consumer.encoding protobuf 编码格式(protobuf/json)

4. 监控指标

指标名 类型 说明
jaeger_ingester_messages_total Counter 消费的消息总数
jaeger_ingester_batches_total Counter 批量写入的批次数
jaeger_ingester_errors_total Counter 消费错误总数
jaeger_ingester_lag Gauge Kafka Consumer 消费延迟

第三部分:Sampling 采样策略模块

1. 模块概览

1.1 模块职责

Sampling 模块管理和提供采样策略,包括静态文件策略和自适应采样。

核心职责:

  • 策略管理:加载和存储采样策略(文件或自适应计算)
  • 策略下发:向客户端 SDK 提供采样策略(gRPC/HTTP 接口)
  • 自适应采样:根据流量动态调整采样概率,控制数据量
  • 吞吐量统计:Collector 统计 root span 吞吐量

2. 采样策略类型

2.1 概率采样(Probabilistic Sampling)

原理:

按固定概率采样,如 0.01 表示 1% 的 trace 被采样。

配置示例(sampling_strategies.json):

{
  "default_strategy": {
    "type": "probabilistic",
    "param": 0.001
  },
  "service_strategies": [
    {
      "service": "frontend",
      "type": "probabilistic",
      "param": 0.1
    },
    {
      "service": "backend",
      "type": "probabilistic",
      "param": 0.01,
      "operation_strategies": [
        {
          "operation": "POST /api/orders",
          "type": "probabilistic",
          "param": 1.0
        }
      ]
    }
  ]
}

说明:

  • default_strategy:默认策略(0.1% 采样)
  • service_strategies:服务级策略(覆盖默认)
  • operation_strategies:操作级策略(覆盖服务级)

2.2 速率限制采样(Rate Limiting Sampling)

原理:

限制每秒最多采样的 trace 数量。

配置示例:

{
  "default_strategy": {
    "type": "ratelimiting",
    "param": 10
  }
}

说明:

  • param:每秒最多 10 个 trace

2.3 自适应采样(Adaptive Sampling)

原理:

根据服务的实际吞吐量动态调整采样概率,确保采样后的 trace 数量接近目标 QPS。

算法:

目标采样概率 = 目标 QPS / 当前吞吐量

示例:
  目标 QPS: 10 traces/second
  当前吞吐量: 1000 traces/second
  采样概率 = 10 / 1000 = 0.01 (1%)

配置参数:

jaeger-collector \
  --sampling.strategies-file="" \  # 禁用文件策略
  --sampling.type=adaptive \
  --sampling.initial-sampling-probability=0.001 \  # 初始概率
  --sampling.target-samples-per-second=1.0 \       # 目标 QPS
  --sampling.aggregation-buckets=10                # 聚合桶数

3. 自适应采样工作流程

flowchart TB
    subgraph Collector["Collector 实例"]
        ROOT["Root Span<br/>识别"]
        AGG["Aggregator<br/>(统计吞吐量)"]
    end
    
    subgraph SamplingStore["Sampling Store"]
        THROUGHPUT["吞吐量表<br/>(service+operation → count)"]
        PROBABILITY["概率表<br/>(service+operation → probability)"]
    end
    
    subgraph PostAgg["Post-aggregator<br/>(后台任务)"]
        CALC["计算目标概率"]
    end
    
    subgraph SDK["Client SDK"]
        POLL["定期拉取<br/>采样策略"]
    end
    
    ROOT -->|"是 root span"| AGG
    AGG -->|"定期刷新<br/>(1 分钟)"| THROUGHPUT
    THROUGHPUT -->|"读取"| CALC
    CALC -->|"计算并写入"| PROBABILITY
    PROBABILITY -->|"查询"| SDK
    SDK -->|"应用策略"| ROOT
    
    style THROUGHPUT fill:#fff9c4
    style PROBABILITY fill:#e1f5ff

详细流程:

  1. Aggregator 统计:

    • Collector 识别 root span(没有 parent span 的 span)
    • 按 (service, operation) 分组统计吞吐量
    • 定期(如 1 分钟)将统计数据写入 Sampling Store
  2. Post-aggregator 计算:

    • 后台任务定期(如 1 分钟)从 Sampling Store 读取所有服务的吞吐量
    • 按算法计算目标采样概率
    • 将计算结果写入 Sampling Store
  3. SDK 拉取策略:

    • SDK 定期(如 5 分钟)从 Collector/Query 拉取采样策略
    • 应用到新 trace 的采样决策

4. 采样策略 API

4.1 GetSamplingStrategy (gRPC)

请求:

message GetSamplingStrategyRequest {
  string service_name = 1;
}

响应:

message SamplingStrategyResponse {
  enum StrategyType {
    PROBABILISTIC = 0;
    RATE_LIMITING = 1;
  }
  StrategyType strategy_type = 1;
  ProbabilisticSamplingStrategy probabilistic_sampling = 2;
  RateLimitingSamplingStrategy rate_limiting_sampling = 3;
  PerOperationSamplingStrategies operation_sampling = 4;
}

message ProbabilisticSamplingStrategy {
  double sampling_rate = 1;  // 0.0 - 1.0
}

message RateLimitingSamplingStrategy {
  int32 max_traces_per_second = 1;
}

示例响应:

{
  "strategyType": "PROBABILISTIC",
  "probabilisticSampling": {
    "samplingRate": 0.01
  },
  "operationSampling": {
    "defaultSamplingProbability": 0.01,
    "perOperationStrategies": [
      {
        "operation": "POST /api/orders",
        "probabilisticSampling": {
          "samplingRate": 1.0
        }
      }
    ]
  }
}

5. 数据结构 UML 图

classDiagram
    class Provider {
        <<interface>>
        +GetSamplingStrategy(ctx, service) (*SamplingStrategyResponse, error)
    }
    
    class FileProvider {
        -strategies map~string,Strategy~
        +GetSamplingStrategy() (*Response, error)
        -loadFile()
    }
    
    class AdaptiveProvider {
        -samplingStore Store
        -calculator Calculator
        +GetSamplingStrategy() (*Response, error)
    }
    
    class Aggregator {
        <<interface>>
        +HandleRootSpan(span)
        +Start()
        +Close()
    }
    
    class SamplingAggregator {
        -throughputBuckets map~string,int~
        -samplingStore Store
        +HandleRootSpan(span)
        -flushThroughput()
    }
    
    class PostAggregator {
        -samplingStore Store
        -targetQPS float64
        +Start()
        -calculateProbabilities()
    }
    
    Provider <|.. FileProvider : implements
    Provider <|.. AdaptiveProvider : implements
    Aggregator <|.. SamplingAggregator : implements
    AdaptiveProvider --> PostAggregator : uses

第四部分:最佳实践与调优

1. Storage 选择指南

场景 推荐存储 理由
高写入吞吐(> 10K spans/s) Cassandra + Kafka 写入性能强、可扩展
复杂查询(标签过滤) Elasticsearch 全文搜索、灵活查询
低成本、中等规模 OpenSearch 同 ES,开源无授权费
单机部署、测试 Badger 无外部依赖、快速启动
云原生、无运维 gRPC Remote Storage 托管存储服务

2. Ingester 部署建议

高吞吐场景:

jaeger-ingester \
  --kafka.consumer.brokers=kafka1:9092,kafka2:9092,kafka3:9092 \
  --ingester.parallelism=100 \
  --kafka.consumer.group-id=jaeger-ingester-group

多实例部署:

  • Ingester 实例数 = Kafka partition 数
  • 同一 Consumer Group 内的实例自动分配 partition
  • 每个实例独立消费,互不干扰

3. Sampling 策略建议

生产环境:

{
  "default_strategy": {
    "type": "probabilistic",
    "param": 0.001  // 默认 0.1%
  },
  "service_strategies": [
    {
      "service": "critical-service",
      "type": "probabilistic",
      "param": 1.0  // 关键服务 100% 采样
    },
    {
      "service": "high-traffic-service",
      "type": "adaptive",  // 高流量服务使用自适应
      "target_samples_per_second": 10
    }
  ]
}

监控告警:

groups:
  - name: jaeger_sampling
    rules:
      - alert: SamplingRateTooLow
        expr: jaeger_collector_sampling_probability < 0.0001
        for: 10m
        annotations:
          summary: "Sampling rate for {{ $labels.service }} is too low: {{ $value }}"

5. 故障排查

问题 1:Ingester 消费延迟高

排查:

# 查看 Kafka Consumer Lag
kafka-consumer-groups --bootstrap-server kafka:9092 --group jaeger-ingester-group --describe

# 查看 Ingester 指标
curl http://ingester:14269/metrics | grep ingester_lag

解决:

  • 增加 Ingester 实例数(匹配 Kafka partition 数)
  • 增加 Kafka partition 数(需重新分区)
  • 优化存储写入性能

问题 2:自适应采样不生效

排查:

# 查看 Sampling Store 中的概率
# (Cassandra 示例)
SELECT * FROM sampling_probabilities WHERE service_name='frontend';

# 查看 Aggregator 指标
curl http://collector:14269/metrics | grep sampling_aggregator

解决:

  • 确认 Collector 配置了 Aggregator
  • 确认 Post-aggregator 后台任务正常运行
  • 检查 Sampling Store 是否可写

6. 总结

本文档综合介绍了 Jaeger 的三个核心支撑模块:

  1. Storage 存储层:提供统一接口,支持 7+ 种存储后端
  2. Ingester:从 Kafka 消费 span,解耦 Collector 和存储
  3. Sampling:管理采样策略,支持文件策略和自适应采样

关键要点:

  • Storage 通过 Factory 模式支持多种后端,生产环境推荐 Cassandra 或 Elasticsearch
  • Ingester 适用于高吞吐场景,通过 Kafka 缓冲流量峰值
  • Sampling 的自适应策略动态调整采样率,平衡数据量和成本