Jaeger-01-Collector模块

1. 模块概览

1.1 模块职责

Collector 模块是 Jaeger 分布式追踪系统的数据接收中心,负责接收、验证、处理和持久化来自各种客户端的 span 数据。

核心职责:

  • 多协议接收:支持 Jaeger Thrift、Jaeger Proto、OpenTelemetry(OTLP)、Zipkin 等多种协议
  • 数据验证:对接收的 span 进行格式验证、租户验证、过滤
  • 数据处理:对 span 进行标准化、标签补充、数据清洗
  • 流量控制:通过内存队列缓冲高并发请求,防止存储被打爆
  • 持久化写入:将 span 写入配置的存储后端(Cassandra、ES、Kafka 等)
  • 采样统计:为自适应采样提供吞吐量统计(Aggregator)
  • 采样服务:向客户端 SDK 提供采样策略

输入与输出:

  • 输入:
    • gRPC:端口 14250(Jaeger Proto)
    • HTTP:端口 14268(Jaeger Thrift)、9411(Zipkin)、4318(OTLP HTTP)
    • gRPC:端口 4317(OTLP gRPC)
  • 输出:
    • 存储后端:Span 数据持久化
    • Sampling Store:吞吐量统计数据

1.2 上下游依赖

上游(调用 Collector 的组件):

  • 应用程序中的 OpenTelemetry SDK
  • 应用程序中的 Jaeger SDK
  • Zipkin SDK(兼容模式)

下游(Collector 调用的组件):

  • Storage 层:Span Writer 接口,写入 span 数据
  • Sampling Strategy Factory:采样策略管理,支持文件策略和自适应策略
  • Tenancy Manager:多租户管理,实现租户隔离

1.3 生命周期

初始化 → 启动服务器 → 接收请求 → 处理 span → 写入存储 → 优雅关闭

详细流程:

  1. 初始化阶段:

    • 加载配置参数(队列大小、工作线程数、存储类型等)
    • 初始化 Storage Factory
    • 初始化 Sampling Strategy Factory
    • 创建 Span Processor(包含队列和工作线程池)
  2. 启动阶段:

    • 启动 gRPC Server(端口 14250)
    • 启动 HTTP Server(端口 14268)
    • 启动 OTLP Receiver(可选,端口 4317/4318)
    • 启动 Zipkin Receiver(可选,端口 9411)
    • 启动队列消费者 goroutine 池
  3. 运行阶段:

    • 接收 gRPC/HTTP 请求
    • 验证租户和协议格式
    • 将 span 放入内存队列
    • 工作线程从队列消费 span
    • 对 span 进行清洗和标签补充
    • 写入 Storage
    • 更新指标(成功/失败计数)
  4. 关闭阶段:

    • 停止接受新请求
    • 等待队列中的 span 处理完成
    • 关闭 Storage 连接
    • 释放资源

1.4 模块架构图

flowchart TB
    subgraph Client["客户端"]
        SDK["OTEL SDK<br/>Jaeger SDK"]
    end
    
    subgraph Collector["Collector 模块"]
        direction TB
        
        subgraph Receivers["Receivers 层"]
            GRPC["gRPC Handler<br/>:14250"]
            HTTP["HTTP Handler<br/>:14268"]
            OTLP["OTLP Receiver<br/>:4317/:4318"]
            ZIPKIN["Zipkin Receiver<br/>:9411"]
        end
        
        subgraph Processing["Processing 层"]
            TENANT["Tenancy<br/>Validator"]
            FILTER["Span<br/>Filter"]
            QUEUE["Bounded<br/>Queue<br/>(内存队列)"]
            WORKERS["Worker<br/>Pool<br/>(Goroutines)"]
        end
        
        subgraph Logic["Logic 层"]
            SANITIZER["Span<br/>Sanitizer<br/>(数据清洗)"]
            TAGGER["Collector<br/>Tagger<br/>(标签补充)"]
            AGGREGATOR["Sampling<br/>Aggregator<br/>(统计)"]
        end
        
        WRITER["Trace<br/>Writer"]
        
        Receivers --> TENANT
        TENANT --> FILTER
        FILTER --> QUEUE
        QUEUE --> WORKERS
        WORKERS --> SANITIZER
        SANITIZER --> TAGGER
        TAGGER --> AGGREGATOR
        AGGREGATOR --> WRITER
    end
    
    subgraph Downstream["下游依赖"]
        STORAGE["Storage<br/>Backend"]
        SAMPSTORE["Sampling<br/>Store"]
    end
    
    SDK -->|"gRPC/HTTP<br/>PostSpans"| Receivers
    WRITER --> STORAGE
    AGGREGATOR -.->|"定期刷新"| SAMPSTORE
    
    style Receivers fill:#fff4e6
    style Processing fill:#e1f5ff
    style Logic fill:#f3e5f5
    style Storage fill:#e8f5e9

1.5 边界与扩展点

边界:

  • 输入边界:仅接受符合 Jaeger/OTLP/Zipkin 协议的 span 数据,不支持自定义协议
  • 处理边界:不修改 span 的核心数据(TraceID、SpanID、时间戳),仅补充 Collector 标签和清洗非法字段
  • 输出边界:通过统一的 Storage 接口写入,不关心具体存储实现

扩展点:

  1. Span Filter:可自定义过滤器,根据业务规则丢弃特定 span
  2. Span Processor:可注册额外的处理器,实现数据脱敏、加密等
  3. Storage Backend:可插拔存储实现,支持 gRPC Remote Storage Plugin
  4. Sampling Strategy:可自定义采样策略实现

状态持有位置:

  • 内存队列:缓冲待处理的 span(队列深度可配置)
  • Metrics:记录处理指标(接收计数、拒绝计数、队列长度等)
  • 无持久状态:Collector 本身无状态,重启后不丢失数据(已写入 Storage 的除外)

资源占用要点:

  • 内存:主要消耗在队列(队列大小 × span 平均大小)和工作线程
  • CPU:主要消耗在 span 反序列化、验证、序列化
  • 网络:接收 span 数据和写入 Storage 的网络带宽
  • 文件描述符:每个 gRPC/HTTP 连接占用一个 FD

2. 对外 API 列表与规格

2.1 API 总览

Collector 提供以下对外 API:

API 名称 协议 路径/方法 端口 说明
PostSpans (gRPC) gRPC jaeger.api_v2.CollectorService/PostSpans 14250 接收 Jaeger Proto 格式的 span batch
SaveSpan (HTTP) HTTP POST /api/traces 14268 接收 Jaeger Thrift 格式的 span batch
OTLP Traces (gRPC) gRPC opentelemetry.proto.collector.trace.v1.TraceService/Export 4317 接收 OTLP gRPC 格式的 traces
OTLP Traces (HTTP) HTTP POST /v1/traces 4318 接收 OTLP HTTP/JSON 格式的 traces
Zipkin Spans HTTP POST /api/v2/spans 9411 接收 Zipkin JSON/Thrift 格式的 spans
GetSamplingStrategy (gRPC) gRPC jaeger.api_v2.SamplingManager/GetSamplingStrategy 14250 返回服务的采样策略
GetSamplingStrategy (HTTP) HTTP GET /api/sampling/strategies 14268 返回服务的采样策略(HTTP)

2.2 API 详细规格

2.2.1 PostSpans (gRPC)

基本信息:

  • 名称:PostSpans
  • 协议:gRPC jaeger.api_v2.CollectorService/PostSpans
  • 端口:14250
  • 幂等性:否(多次提交相同 span 会产生重复数据)

请求结构体:

// PostSpansRequest 定义在 jaeger-idl/proto-gen/api_v2/collector.pb.go
type PostSpansRequest struct {
    Batch model.Batch  // 包含多个 span 的批次
}

// model.Batch 定义在 jaeger-idl/model/v1/model.pb.go
type Batch struct {
    Spans   []*Span   // span 列表
    Process *Process  // 共享的 Process 信息(服务名、标签等)
}

请求字段表:

字段 类型 必填 默认 约束 说明
Batch model.Batch - 至少包含 1 个 span 批次数据
Batch.Spans []*model.Span - 长度 > 0 span 列表
Batch.Process *model.Process - - 服务进程信息(服务名、标签)

model.Span 核心字段:

字段 类型 必填 说明
TraceID model.TraceID Trace 唯一标识(16 字节)
SpanID model.SpanID Span 唯一标识(8 字节)
OperationName string 操作名(如 “HTTP GET /api/users”)
References []SpanRef 父 span 引用(ChildOf、FollowsFrom)
StartTime time.Time Span 开始时间(微秒)
Duration time.Duration Span 持续时间(微秒)
Tags []KeyValue Span 标签(key-value 对)
Logs []Log Span 日志事件
Process *Process 如果为 nil,使用 Batch.Process

响应结构体:

type PostSpansResponse struct {
    // 空响应,成功返回空结构体
}

响应字段表:

字段 类型 必填 说明
(无字段) - - 成功时返回空响应,失败时返回 gRPC 错误

入口函数与核心代码:

入口函数:

// PostSpans implements gRPC CollectorService.
func (g *GRPCHandler) PostSpans(ctx context.Context, r *api_v2.PostSpansRequest) (*api_v2.PostSpansResponse, error) {

核心处理逻辑:

// PostSpans 实现 gRPC CollectorService
func (g *GRPCHandler) PostSpans(ctx context.Context, r *api_v2.PostSpansRequest) (*api_v2.PostSpansResponse, error) {
    batch := &r.Batch
    // 调用 batchConsumer 处理批次
    err := g.batchConsumer.consume(ctx, batch)
    return &api_v2.PostSpansResponse{}, err
}

// batchConsumer.consume 处理逻辑
func (c *batchConsumer) consume(ctx context.Context, batch *model.Batch) error {
    // 1. 验证租户(如果启用多租户)
    tenant, err := c.validateTenant(ctx)
    if err != nil {
        return err  // 返回 gRPC 错误:租户验证失败
    }

    // 2. 将 Batch.Process 分配给没有 Process 的 span
    for _, span := range batch.Spans {
        if span.GetProcess() == nil {
            span.Process = batch.Process
        }
    }

    // 3. 调用 SpanProcessor 处理 span
    _, err = c.spanProcessor.ProcessSpans(ctx, processor.SpansV1{
        Spans: batch.Spans,
        Details: processor.Details{
            InboundTransport: c.spanOptions.InboundTransport,  // gRPC
            SpanFormat:       c.spanOptions.SpanFormat,        // ProtoSpanFormat
            Tenant:           tenant,
        },
    })

    // 4. 处理错误
    if err != nil {
        if errors.Is(err, processor.ErrBusy) {
            // 队列满,返回 ResourceExhausted
            return status.Error(codes.ResourceExhausted, err.Error())
        }
        return err
    }
    return nil
}

调用链与上层函数:

调用链层次:

gRPC Client → gRPC Server (grpc.Server)
            → CollectorServiceServer (GRPCHandler)
            → PostSpans()
            → batchConsumer.consume()
            → spanProcessor.ProcessSpans()
            → spanProcessor.processSpans()
            → spanProcessor.enqueueSpan() → Queue.Produce()
            → [异步] Worker Goroutine → processItemFromQueue()
            → sanitizer() → processSpan() → saveSpan()
            → traceWriter.WriteSpan()

上层适配代码:

// gRPC Server 启动逻辑(cmd/collector/app/server/grpc.go)
func serveGRPC(server *grpc.Server, listener net.Listener, params *GRPCServerParams) error {
    // 注册 CollectorService
    api_v2.RegisterCollectorServiceServer(server, params.Handler)
    
    // 注册 SamplingManager
    api_v2.RegisterSamplingManagerServer(server, samplinggrpc.NewHandler(params.SamplingProvider))
    
    // 注册健康检查
    healthServer := health.NewServer()
    healthServer.SetServingStatus("jaeger.api_v2.CollectorService", grpc_health_v1.HealthCheckResponse_SERVING)
    grpc_health_v1.RegisterHealthServer(server, healthServer)
    
    // 启动 gRPC Server
    go func() {
        if err := server.Serve(listener); err != nil {
            params.Logger.Error("Could not launch gRPC service", zap.Error(err))
        }
    }()
    return nil
}

时序图:

sequenceDiagram
    autonumber
    participant C as gRPC Client
    participant GS as gRPC Server
    participant GH as GRPCHandler
    participant BC as BatchConsumer
    participant TM as TenancyMgr
    participant SP as SpanProcessor
    participant Q as Queue
    participant W as Worker Pool
    participant SAN as Sanitizer
    participant TW as TraceWriter
    participant ST as Storage
    
    C->>GS: PostSpans(PostSpansRequest)
    GS->>GH: PostSpans(ctx, request)
    GH->>BC: consume(ctx, batch)
    
    alt 多租户启用
        BC->>TM: GetValidTenant(ctx)
        TM-->>BC: tenant / error
        alt 租户验证失败
            BC-->>GH: error
            GH-->>C: gRPC Error
        end
    end
    
    BC->>BC: 分配 Process<br/>给 span
    BC->>SP: ProcessSpans(ctx, SpansV1)
    SP->>SP: 为每个 span<br/>添加 Collector 标签
    
    loop 每个 Span
        SP->>SP: filterSpan()<br/>(过滤)
        alt span 被过滤
            SP->>SP: 拒绝计数+1
        else span 通过
            SP->>Q: Produce(queueItem)
            alt 队列满
                Q-->>SP: false
                SP-->>BC: ErrBusy
                BC-->>GH: ResourceExhausted
                GH-->>C: gRPC Error<br/>(codes.ResourceExhausted)
            end
        end
    end
    
    SP-->>BC: ok
    BC-->>GH: nil
    GH-->>C: PostSpansResponse{}
    
    Note over Q,W: 异步处理
    Q->>W: 消费 queueItem
    W->>SAN: Sanitize(span)
    SAN-->>W: 清洗后的 span
    W->>W: processSpan()<br/>(采样统计)
    W->>TW: WriteSpan(ctx, span)
    TW->>ST: 存储写入
    ST-->>TW: ok / error
    alt 写入成功
        TW->>W: 成功指标+1
    else 写入失败
        TW->>W: 失败指标+1
    end

边界与异常:

  • 重复请求:不支持幂等,相同 span 多次提交会写入多次(依赖客户端去重或存储层去重)
  • 队列满:返回 codes.ResourceExhausted 错误,客户端应实施退避重试
  • 租户验证失败:返回 codes.PermissionDenied 错误,拒绝请求
  • 存储失败:异步写入,客户端已收到成功响应,失败仅记录日志和指标

最佳实践:

  • 批量提交:建议每次提交 10-100 个 span,减少网络开销
  • 重试策略:客户端应实现指数退避重试,最大重试次数 3-5 次
  • 超时设置:建议客户端设置 5-10 秒超时,避免长时间阻塞
  • 压缩:启用 gRPC 压缩(gzip),减少网络传输量

2.2.2 SaveSpan (HTTP Thrift)

基本信息:

  • 名称:SaveSpan
  • 协议:HTTP POST /api/traces
  • 端口:14268
  • Content-Type:application/x-thriftapplication/vnd.apache.thrift.binary
  • 幂等性:否

请求结构体:

// 请求体为 Thrift 序列化的 jaeger.Batch
// 定义在 jaeger-idl/thrift-gen/jaeger/jaeger.go
type Batch struct {
    Process *Process `thrift:"process,1,required" json:"process"`
    Spans   []*Span  `thrift:"spans,2,required" json:"spans"`
}

请求字段表:

字段 类型 必填 说明
Process *Process 服务进程信息
Spans []*Span span 列表(长度 > 0)

响应结构体:

// HTTP 200 OK,响应体为空
// 或 HTTP 400/500,响应体为错误信息(纯文本)

响应字段表:

HTTP 状态码 响应体 说明
200 OK 成功接收并处理
400 Bad Request 错误信息(文本) 请求格式错误或无法解析
500 Internal Server Error 错误信息(文本) 服务器内部错误
503 Service Unavailable 错误信息(文本) 队列满,资源不足

入口函数与核心代码:

入口函数:

// SaveSpan submits the span provided in the request body to the JaegerBatchesHandler
func (aH *APIHandler) SaveSpan(w http.ResponseWriter, r *http.Request) {
    // 读取请求体并反序列化 Thrift 格式的 Batch

核心处理逻辑:

func (aH *APIHandler) SaveSpan(w http.ResponseWriter, r *http.Request) {
    // 1. 读取请求体
    bodyBytes, err := io.ReadAll(r.Body)
    if err != nil {
        http.Error(w, fmt.Sprintf("Unable to read body: %v", err), http.StatusBadRequest)
        return
    }

    // 2. 验证 Content-Type
    contentType, _, err := mime.ParseMediaType(r.Header.Get("Content-Type"))
    if err != nil {
        http.Error(w, fmt.Sprintf("Cannot parse content type: %v", err), http.StatusBadRequest)
        return
    }
    if _, ok := acceptedThriftFormats[contentType]; !ok {
        http.Error(w, fmt.Sprintf("Unsupported content type: %s", contentType), http.StatusBadRequest)
        return
    }

    // 3. 反序列化 Thrift
    tdes := thrift.NewTDeserializer()
    batch := &tjaeger.Batch{}
    if err = tdes.Read(batch, bodyBytes); err != nil {
        http.Error(w, fmt.Sprintf("Unable to deserialize: %v", err), http.StatusBadRequest)
        return
    }

    // 4. 调用 JaegerBatchesHandler 处理
    ctx := r.Context()
    _, err = aH.jaegerBatchesHandler.SubmitBatches(ctx, []*tjaeger.Batch{batch}, handler.SubmitBatchOptions{
        InboundTransport: processor.HTTPTransport,
    })

    // 5. 处理错误
    if err != nil {
        if errors.Is(err, processor.ErrBusy) {
            http.Error(w, "Server busy", http.StatusServiceUnavailable)
        } else {
            http.Error(w, fmt.Sprintf("Cannot submit batch: %v", err), http.StatusInternalServerError)
        }
        return
    }

    // 6. 返回成功
    w.WriteHeader(http.StatusOK)
}

时序图:

sequenceDiagram
    autonumber
    participant C as HTTP Client
    participant HS as HTTP Server<br/>(Gorilla Mux)
    participant AH as APIHandler
    participant JBH as JaegerBatchesHandler
    participant SP as SpanProcessor
    participant Q as Queue
    participant ST as Storage
    
    C->>HS: POST /api/traces<br/>(Thrift binary)
    HS->>AH: SaveSpan(w, r)
    AH->>AH: 读取请求体<br/>验证 Content-Type
    AH->>AH: 反序列化 Thrift
    AH->>JBH: SubmitBatches(ctx, batches)
    JBH->>SP: ProcessSpans(ctx, SpansV1)
    SP->>Q: Produce(queueItem)
    
    alt 队列满
        Q-->>SP: false
        SP-->>JBH: ErrBusy
        JBH-->>AH: ErrBusy
        AH->>HS: 503 Service Unavailable
        HS->>C: 503 Response
    else 成功入队
        Q-->>SP: true
        SP-->>JBH: ok
        JBH-->>AH: ok
        AH->>HS: 200 OK
        HS->>C: 200 Response
        
        Note over Q,ST: 异步处理
        Q->>ST: 存储写入
    end

边界与异常:

  • Content-Type 错误:仅支持 application/x-thriftapplication/vnd.apache.thrift.binary,其他返回 400
  • Thrift 格式错误:无法反序列化时返回 400
  • 队列满:返回 503 Service Unavailable

2.2.3 OTLP Traces (gRPC)

基本信息:

  • 名称:Export
  • 协议:gRPC opentelemetry.proto.collector.trace.v1.TraceService/Export
  • 端口:4317(需配置 --collector.otlp.enabled=true
  • 幂等性:否

请求结构体:

// ExportTraceServiceRequest 定义在 OTEL proto
type ExportTraceServiceRequest struct {
    ResourceSpans []*ResourceSpans  // OTLP traces 数据
}

响应结构体:

type ExportTraceServiceResponse struct {
    PartialSuccess *ExportTracePartialSuccess  // 部分成功信息(可选)
}

入口函数与核心代码:

OTLP Receiver 由 OpenTelemetry Collector 的 otlpreceiver 组件实现,Collector 通过配置启用:

// cmd/collector/app/handler/otlp_receiver.go
func StartOTLPReceiver(opts *flags.CollectorOptions, logger *zap.Logger, spanProcessor processor.SpanProcessor, tm *tenancy.Manager) (receiver.Traces, error) {
    // 创建 OTLP Receiver
    // 将接收到的 OTLP traces 转发给 spanProcessor
    // 内部逻辑类似 PostSpans,但数据格式为 ptrace.Traces(OTEL Collector 的数据模型)
}

处理流程:

OTLP gRPC Client → OTLP Receiver → SpanProcessor.ProcessSpans(SpansV2{Traces: ptrace.Traces})
                                  → otelExporter.ConsumeTraces(ctx, traces)
                                  → traceWriter.WriteTraces(ctx, traces)
                                  → Storage

时序图: 类似 PostSpans,差异在于数据格式为 ptrace.Traces(OTEL 内部格式)


2.2.4 GetSamplingStrategy (gRPC)

基本信息:

  • 名称:GetSamplingStrategy
  • 协议:gRPC jaeger.api_v2.SamplingManager/GetSamplingStrategy
  • 端口:14250
  • 幂等性:是(相同请求返回相同采样策略)

请求结构体:

type GetSamplingStrategyRequest struct {
    ServiceName string  // 服务名
}

请求字段表:

字段 类型 必填 说明
ServiceName string 服务名(如 “frontend”)

响应结构体:

type SamplingStrategyResponse struct {
    StrategyType              SamplingStrategyType          // 策略类型(PROBABILISTIC 或 RATE_LIMITING)
    ProbabilisticSampling     *ProbabilisticSamplingStrategy  // 概率采样(0.0-1.0)
    RateLimitingSampling      *RateLimitingSamplingStrategy   // 速率限制采样(traces/second)
    OperationSampling         *PerOperationSamplingStrategies // 操作级采样策略
}

type ProbabilisticSamplingStrategy struct {
    SamplingRate float64  // 采样概率(0.0-1.0)
}

type RateLimitingSamplingStrategy struct {
    MaxTracesPerSecond int32  // 最大 traces/second
}

响应字段表:

字段 类型 必填 说明
StrategyType enum PROBABILISTIC(0) 或 RATE_LIMITING(1)
ProbabilisticSampling *ProbabilisticSamplingStrategy StrategyType=PROBABILISTIC 时返回
RateLimitingSampling *RateLimitingSamplingStrategy StrategyType=RATE_LIMITING 时返回
OperationSampling *PerOperationSamplingStrategies 操作级策略(覆盖服务级策略)

入口函数与核心代码:

// Sampling gRPC Handler
type samplingHandler struct {
    provider samplingstrategy.Provider
}

func (h *samplingHandler) GetSamplingStrategy(ctx context.Context, req *api_v2.GetSamplingStrategyRequest) (*api_v2.SamplingStrategyResponse, error) {
    // 调用 Sampling Provider 获取策略
    resp, err := h.provider.GetSamplingStrategy(ctx, req.ServiceName)
    if err != nil {
        return nil, status.Errorf(codes.Internal, "failed to get sampling strategy: %v", err)
    }
    return resp, nil
}

时序图:

sequenceDiagram
    autonumber
    participant SDK as Client SDK
    participant GS as gRPC Server
    participant SH as SamplingHandler
    participant SP as SamplingProvider
    participant SS as SamplingStore
    
    SDK->>GS: GetSamplingStrategy<br/>(service="frontend")
    GS->>SH: GetSamplingStrategy(ctx, req)
    SH->>SP: GetSamplingStrategy<br/>(ctx, "frontend")
    
    alt 文件策略
        SP->>SP: 读取静态配置文件
        SP-->>SH: SamplingStrategyResponse<br/>(概率或速率限制)
    else 自适应策略
        SP->>SS: 查询服务采样概率
        SS-->>SP: 概率值(0.0-1.0)
        SP-->>SH: SamplingStrategyResponse<br/>(动态概率)
    end
    
    SH-->>GS: SamplingStrategyResponse
    GS-->>SDK: Response
    SDK->>SDK: 应用采样策略<br/>到新 trace

最佳实践:

  • SDK 应定期拉取采样策略(建议 1-5 分钟一次),而非每次 trace 都请求
  • 使用缓存避免频繁查询 Sampling Store

3. 关键数据结构与 UML

3.1 核心数据结构

3.1.1 Collector 结构体

type Collector struct {
    serviceName        string                        // 服务名 "jaeger-collector"
    logger             *zap.Logger                   // 日志器
    metricsFactory     metrics.Factory               // 指标工厂
    traceWriter        tracestore.Writer             // Trace 写入器
    samplingProvider   samplingstrategy.Provider     // 采样策略提供者
    samplingAggregator samplingstrategy.Aggregator   // 采样聚合器
    hCheck             *healthcheck.HealthCheck      // 健康检查
    spanProcessor      processor.SpanProcessor       // Span 处理器
    spanHandlers       *SpanHandlers                 // Span 处理器集合
    tenancyMgr         *tenancy.Manager              // 租户管理器
    
    // 服务器实例(只读)
    hServer        *http.Server      // HTTP 服务器
    grpcServer     *grpc.Server      // gRPC 服务器
    otlpReceiver   receiver.Traces   // OTLP 接收器
    zipkinReceiver receiver.Traces   // Zipkin 接收器
}

3.1.2 SpanProcessor 结构体

type spanProcessor struct {
    queue              *queue.BoundedQueue[queueItem]  // 有界队列
    otelExporter       exporter.Traces                 // OTEL 导出器
    queueResizeMu      sync.Mutex                      // 队列调整锁
    metrics            *SpanProcessorMetrics           // 指标
    telset             telemetry.Settings              // 遥测设置
    preProcessSpans    ProcessSpans                    // 预处理函数
    filterSpan         FilterSpan                      // 过滤函数
    sanitizer          sanitizer.SanitizeSpan          // 清洗函数
    processSpan        ProcessSpan                     // 处理函数
    logger             *zap.Logger                     // 日志器
    traceWriter        tracestore.Writer               // Trace 写入器
    reportBusy         bool                            // 是否报告忙碌状态
    numWorkers         int                             // 工作线程数
    collectorTags      map[string]string               // Collector 标签
    dynQueueSizeWarmup uint                            // 动态队列预热大小
    dynQueueSizeMemory uint                            // 动态队列内存上限
    bytesProcessed     atomic.Uint64                   // 已处理字节数
    spansProcessed     atomic.Uint64                   // 已处理 span 数
    stopCh             chan struct{}                   // 停止信号
}

type queueItem struct {
    queuedTime time.Time    // 入队时间
    span       *model.Span  // Span 数据
    tenant     string       // 租户 ID
}

3.1.3 SpanHandlers 结构体

type SpanHandlers struct {
    ZipkinSpansHandler   handler.ZipkinSpansHandler     // Zipkin 处理器
    JaegerBatchesHandler handler.JaegerBatchesHandler   // Jaeger Batch 处理器
    GRPCHandler          *handler.GRPCHandler           // gRPC 处理器
}

3.2 UML 类图

classDiagram
    class Collector {
        -string serviceName
        -Logger logger
        -MetricsFactory metricsFactory
        -TraceWriter traceWriter
        -SamplingProvider samplingProvider
        -SamplingAggregator samplingAggregator
        -SpanProcessor spanProcessor
        -SpanHandlers spanHandlers
        -TenancyManager tenancyMgr
        +Start(options) error
        +Close() error
    }
    
    class SpanProcessor {
        <<interface>>
        +ProcessSpans(ctx, batch) ([]bool, error)
        +Close() error
    }
    
    class spanProcessor {
        -BoundedQueue queue
        -TraceWriter traceWriter
        -Sanitizer sanitizer
        -FilterSpan filterSpan
        -int numWorkers
        -map collectorTags
        -ProcessSpan processSpan
        +ProcessSpans(ctx, batch) ([]bool, error)
        -enqueueSpan(span) bool
        -processItemFromQueue(item)
        -saveSpan(span, tenant)
    }
    
    class BoundedQueue~queueItem~ {
        -int capacity
        -chan items
        -onDroppedItem func
        +Produce(item) bool
        +StartConsumers(workers, consumer)
        +Stop()
    }
    
    class queueItem {
        +time queuedTime
        +Span span
        +string tenant
    }
    
    class SpanHandlers {
        +ZipkinSpansHandler
        +JaegerBatchesHandler
        +GRPCHandler
    }
    
    class GRPCHandler {
        -Logger logger
        -BatchConsumer batchConsumer
        +PostSpans(ctx, req) (resp, error)
    }
    
    class BatchConsumer {
        -SpanProcessor spanProcessor
        -TenancyManager tenancyMgr
        +consume(ctx, batch) error
        -validateTenant(ctx) (string, error)
    }
    
    class TraceWriter {
        <<interface>>
        +WriteSpan(ctx, span) error
        +WriteTraces(ctx, traces) error
    }
    
    Collector "1" --> "1" SpanProcessor : uses
    Collector "1" --> "1" SpanHandlers : contains
    Collector "1" --> "1" TraceWriter : uses
    
    SpanHandlers "1" --> "1" GRPCHandler : contains
    GRPCHandler "1" --> "1" BatchConsumer : uses
    BatchConsumer "1" --> "1" SpanProcessor : uses
    
    SpanProcessor <|.. spanProcessor : implements
    spanProcessor "1" --> "1" BoundedQueue : uses
    spanProcessor "1" --> "1" TraceWriter : uses
    BoundedQueue "1" --> "*" queueItem : stores

3.3 数据结构说明

Collector:

  • 顶层组件,负责协调所有子组件的生命周期
  • 持有 gRPC/HTTP 服务器、Span 处理器、Trace 写入器、采样相关组件的引用
  • 无持久状态,所有状态存储在下游组件(Storage、Sampling Store)

spanProcessor:

  • 核心处理逻辑,实现 SpanProcessor 接口
  • 使用有界队列(BoundedQueue)缓冲 span,解耦接收和处理
  • 管理工作线程池(goroutine),从队列消费并处理 span
  • 支持动态调整队列大小(基于内存使用率)

BoundedQueue:

  • 有界队列,容量固定(默认 2000)
  • 当队列满时,丢弃新 span 并调用 onDroppedItem 回调
  • 使用 Go channel 实现,支持多生产者多消费者

queueItem:

  • 队列中的单元,包含 span 数据、入队时间、租户 ID
  • 入队时间用于统计队列延迟(InQueueLatency 指标)

SpanHandlers:

  • 聚合不同协议的处理器(gRPC、HTTP、Zipkin)
  • 所有处理器最终调用同一个 SpanProcessor

4. 关键功能与函数详细描述

4.1 Span 处理流程

4.1.1 ProcessSpans 函数

函数签名:

func (sp *spanProcessor) ProcessSpans(ctx context.Context, batch processor.Batch) ([]bool, error)

功能描述:

ProcessSpans 是 span 处理的总入口,负责接收 span 批次、预处理、入队、返回结果。

核心代码:

func (sp *spanProcessor) ProcessSpans(ctx context.Context, batch processor.Batch) ([]bool, error) {
    // 1. 调用预处理函数(用户自定义,可选)
    sp.preProcessSpans(batch)

    var batchOks []bool
    var batchErr error
    
    // 2. 根据数据模型版本处理(V1 或 V2)
    batch.GetSpans(func(spans []*model.Span) {
        // V1: Jaeger model.Span
        batchOks, batchErr = sp.processSpans(ctx, batch, spans)
    }, func(traces ptrace.Traces) {
        // V2: OTLP ptrace.Traces
        ctx := tenancy.WithTenant(ctx, batch.GetTenant())
        if err := sp.otelExporter.ConsumeTraces(ctx, traces); err != nil {
            batchErr = err
        } else {
            batchOks = make([]bool, traces.SpanCount())
            for i := range batchOks {
                batchOks[i] = true
            }
        }
    })
    
    return batchOks, batchErr
}

详细步骤:

  1. 预处理(preProcessSpans):

    • 用户可注册自定义预处理函数,执行数据脱敏、加密等操作
    • 默认为空函数
  2. 数据模型分支:

    • V1 分支:Jaeger 原生格式(model.Span
      • 调用 processSpans 逐个处理 span
      • 为每个 span 添加 Collector 标签和格式标记
      • 入队到 BoundedQueue
    • V2 分支:OTLP 格式(ptrace.Traces
      • 调用 otelExporter.ConsumeTraces
      • 内部最终调用 traceWriter.WriteTraces
  3. 返回结果:

    • batchOks:布尔数组,表示每个 span 是否成功入队
    • batchErr:错误信息(如队列满返回 ErrBusy

调用链:

PostSpans (gRPC) → GRPCHandler.PostSpans() 
                 → batchConsumer.consume()
                 → spanProcessor.ProcessSpans()

4.1.2 enqueueSpan 函数

函数签名:

func (sp *spanProcessor) enqueueSpan(span *model.Span, originalFormat processor.SpanFormat, transport processor.InboundTransport, tenant string) bool

功能描述:

将单个 span 入队到内存队列,执行过滤和指标记录。

核心代码:

func (sp *spanProcessor) enqueueSpan(span *model.Span, originalFormat processor.SpanFormat, transport processor.InboundTransport, tenant string) bool {
    // 1. 更新接收指标(按服务名和格式分组)
    spanCounts := sp.metrics.GetCountsForFormat(originalFormat, transport)
    spanCounts.ReceivedBySvc.ForSpanV1(span)

    // 2. 调用过滤器
    if !sp.filterSpan(span) {
        // span 被过滤,更新拒绝指标
        spanCounts.RejectedBySvc.ForSpanV1(span)
        return true  // 返回 true 表示"不是因为队列满被拒绝"
    }

    // 3. 添加格式标签(标记 span 来自哪种格式)
    span.Tags = append(span.Tags, model.String(jptrace.FormatAttribute, string(originalFormat)))

    // 4. 构造队列项
    item := queueItem{
        queuedTime: time.Now(),
        span:       span,
        tenant:     tenant,
    }

    // 5. 入队
    return sp.queue.Produce(item)  // 返回 false 表示队列满
}

详细步骤:

  1. 指标记录:根据 span 的服务名、格式(Thrift、Proto、Zipkin)、传输协议(gRPC、HTTP)更新接收计数
  2. 过滤:调用 filterSpan 函数,默认返回 true(不过滤),可自定义过滤规则
  3. 标签补充:添加 internal.span.format 标签,标记 span 来源格式
  4. 入队:调用 queue.Produce,如果队列满返回 false

调用链:

processSpans() → enqueueSpan() → queue.Produce()

4.1.3 processItemFromQueue 函数

函数签名:

func (sp *spanProcessor) processItemFromQueue(item queueItem)

功能描述:

工作线程从队列消费 span,执行清洗、处理、写入存储。

核心代码:

func (sp *spanProcessor) processItemFromQueue(item queueItem) {
    // 1. 调用 sanitizer 清洗数据
    cleanedSpan := sp.sanitizer(item.span)

    // 2. 调用 processSpan 函数链(包含采样统计、写入存储等)
    sp.processSpan(cleanedSpan, item.tenant)

    // 3. 更新队列延迟指标
    sp.metrics.InQueueLatency.Record(time.Since(item.queuedTime))
}

详细步骤:

  1. 数据清洗(sanitizer):

    • 清洗非法字段(如过长的字符串)
    • 标准化时间戳(微秒转纳秒)
    • 去除重复标签
  2. 处理函数链(processSpan):

    • 采样统计:如果是 root span,调用 samplingAggregator.HandleRootSpan
    • 写入存储:调用 saveSpantraceWriter.WriteSpan
    • 更新指标:成功/失败计数
  3. 指标记录:

    • InQueueLatency:span 在队列中的等待时间(用于监控队列是否堵塞)

调用链:

Queue Consumer Goroutine → processItemFromQueue()
                         → sanitizer()
                         → processSpan()
                         → saveSpan()
                         → traceWriter.WriteSpan()

4.2 Sanitizer 数据清洗

函数链:

type SanitizeSpan func(*model.Span) *model.Span

func NewChainedSanitizer(sanitizers ...SanitizeSpan) SanitizeSpan {
    return ChainedSanitizeSpan(sanitizers...)
}

标准清洗器:

  1. UTF-8 Sanitizer:确保所有字符串字段为有效 UTF-8
  2. Service Name Sanitizer:清理服务名中的非法字符
  3. ParentSpanID Sanitizer:修复 parent span ID 为 0 的情况(根 span)

清洗示例:

// UTF-8 清洗
func NewUTF8Sanitizer() SanitizeSpan {
    return func(span *model.Span) *model.Span {
        span.OperationName = sanitizeUTF8(span.OperationName)
        for i := range span.Tags {
            span.Tags[i].VStr = sanitizeUTF8(span.Tags[i].VStr)
        }
        return span
    }
}

4.3 Collector Tags 补充

功能:

为每个 span 添加 Collector 的标识标签,标记 span 经过了哪个 Collector 实例。

核心代码:

func (sp *spanProcessor) addCollectorTags(span *model.Span) {
    if len(sp.collectorTags) == 0 {
        return
    }

    // 1. 检查 span.Process.Tags 中是否已存在相同的 key-value
    dedupKey := make(map[string]struct{})
    for _, tag := range span.Process.Tags {
        if value, ok := sp.collectorTags[tag.Key]; ok && value == tag.AsString() {
            dedupKey[tag.Key] = struct{}{}  // 标记为已存在,不重复添加
        }
    }

    // 2. 添加不存在的 Collector 标签
    for k, v := range sp.collectorTags {
        if _, ok := dedupKey[k]; !ok {
            span.Process.Tags = append(span.Process.Tags, model.String(k, v))
        }
    }

    // 3. 对标签排序(保证一致性)
    typedTags := model.KeyValues(span.Process.Tags)
    typedTags.Sort()
}

配置示例:

jaeger-collector --collector.tags="hostname=collector-01,zone=us-east-1"

结果:

所有 span 的 Process.Tags 中会包含:

{
  "hostname": "collector-01",
  "zone": "us-east-1"
}

4.4 动态队列调整

功能:

根据系统内存使用情况动态调整队列大小,避免内存溢出。

核心代码:

func (sp *spanProcessor) updateQueueSize() {
    sp.queueResizeMu.Lock()
    defer sp.queueResizeMu.Unlock()

    // 1. 计算平均 span 大小
    spansProcessed := sp.spansProcessed.Load()
    bytesProcessed := sp.bytesProcessed.Load()
    if spansProcessed == 0 {
        return  // 还没有数据,无法计算
    }
    avgSpanSizeBytes := bytesProcessed / spansProcessed

    // 2. 计算目标队列大小(基于内存上限)
    memoryMiB := sp.dynQueueSizeMemory
    if memoryMiB == 0 {
        return
    }
    targetQueueSize := (memoryMiB * 1024 * 1024) / avgSpanSizeBytes

    // 3. 限制最大队列大小
    if targetQueueSize > maxQueueSize {
        targetQueueSize = maxQueueSize
    }

    // 4. 检查是否需要调整(至少增长 20%)
    currentSize := sp.queue.Capacity()
    if float64(targetQueueSize) / float64(currentSize) < minRequiredChange {
        return  // 变化不大,不调整
    }

    // 5. 调整队列大小
    sp.queue.Resize(int(targetQueueSize))
    sp.logger.Info("Resized queue", zap.Int("old-size", currentSize), zap.Uint64("new-size", targetQueueSize))
}

触发条件:

  • 后台任务每 1 分钟执行一次
  • 需启用动态队列(--collector.queue-size-memory > 0)

算法:

目标队列大小 = (内存上限 / 平均 span 大小)

5. 配置项与最佳实践

5.1 关键配置项

配置项 默认值 说明 建议值
--collector.num-workers 50 工作线程数 CPU 核数 × 2-4
--collector.queue-size 2000 队列大小 >= 工作线程数 × 10
--collector.queue-size-memory 0 (禁用) 动态队列内存上限(字节) 512MB - 2GB
--collector.grpc-server.host-port :14250 gRPC 监听地址 0.0.0.0:14250
--collector.http-server.host-port :14268 HTTP 监听地址 0.0.0.0:14268
--collector.otlp.enabled true (V2) 是否启用 OTLP Receiver true
--collector.tags Collector 标签 “hostname=$(hostname),dc=us-east”
--collector.enable-span-size-metrics false 是否启用 span 大小指标 true(用于动态队列)
--span-storage.type memory 存储类型 cassandra/elasticsearch

5.2 性能调优

高吞吐场景(> 10K spans/s):

jaeger-collector \
  --collector.num-workers=200 \
  --collector.queue-size=20000 \
  --collector.queue-size-memory=2147483648 \  # 2GB
  --collector.grpc-server.max-message-size=8388608 \  # 8MB
  --span-storage.type=kafka  # 使用 Kafka 解耦

低延迟场景(P95 < 50ms):

jaeger-collector \
  --collector.num-workers=100 \
  --collector.queue-size=5000 \
  --span-storage.type=cassandra  # 直接写入,避免 Kafka 延迟

资源受限场景(小内存):

jaeger-collector \
  --collector.num-workers=20 \
  --collector.queue-size=1000 \
  --collector.queue-size-memory=268435456  # 256MB

5.3 监控告警

关键指标:

  1. 队列深度 (jaeger_collector_queue_length)

    • 正常:< 50%
    • 警告:> 80%
    • 告警:> 95%
  2. 拒绝率 (jaeger_collector_spans_rejected_total / jaeger_collector_spans_received_total)

    • 正常:< 0.1%
    • 告警:> 1%
  3. 队列延迟 (jaeger_collector_in_queue_latency)

    • 正常:< 10ms
    • 警告:> 50ms
    • 告警:> 100ms
  4. 存储写入失败率 (jaeger_collector_spans_saved_by_svc{result="err"})

    • 正常:< 0.01%
    • 告警:> 0.1%

Prometheus 告警规则:

groups:
  - name: jaeger_collector
    rules:
      - alert: CollectorQueueHigh
        expr: jaeger_collector_queue_length / jaeger_collector_queue_size > 0.8
        for: 5m
        annotations:
          summary: "Collector queue is {{ $value | humanizePercentage }} full"

      - alert: CollectorHighRejectionRate
        expr: rate(jaeger_collector_spans_rejected_total[5m]) / rate(jaeger_collector_spans_received_total[5m]) > 0.01
        for: 5m
        annotations:
          summary: "Collector rejecting {{ $value | humanizePercentage }} of spans"

      - alert: CollectorStorageWriteErrors
        expr: rate(jaeger_collector_spans_saved_by_svc{result="err"}[5m]) > 10
        for: 5m
        annotations:
          summary: "Collector storage write errors: {{ $value }} errors/sec"

6. 故障排查

6.1 常见问题

问题 1:客户端报 “ResourceExhausted” 错误

原因:

  • Collector 队列满,无法接收新 span

排查:

# 查看队列深度指标
curl http://collector:14269/metrics | grep queue_length

# 查看工作线程数
curl http://collector:14269/metrics | grep num_workers

解决:

  • 增加工作线程数:--collector.num-workers=100
  • 增加队列大小:--collector.queue-size=5000
  • 使用 Kafka 模式解耦:--span-storage.type=kafka

问题 2:存储写入失败率高

原因:

  • 存储后端不可用或过载
  • 网络问题

排查:

# 查看存储写入错误指标
curl http://collector:14269/metrics | grep spans_saved_by_svc

# 查看 Collector 日志
kubectl logs -f jaeger-collector-xxx | grep "Failed to save span"

解决:

  • 检查存储后端健康状态(Cassandra/ES 集群)
  • 增加存储容量或优化索引
  • 启用 Kafka 缓冲:--span-storage.type=kafka

问题 3:内存使用持续增长

原因:

  • 队列积压
  • 动态队列未限制

排查:

# 查看内存使用
kubectl top pod jaeger-collector-xxx

# 查看队列深度
curl http://collector:14269/metrics | grep queue_length

解决:

  • 启用动态队列并设置内存上限:--collector.queue-size-memory=1073741824 # 1GB
  • 限制队列大小:--collector.queue-size=2000

7. 总结

Collector 模块是 Jaeger 的数据接收核心,具备以下关键特性:

  1. 多协议支持:Jaeger、OTLP、Zipkin 全覆盖
  2. 高吞吐处理:通过队列和工作线程池实现高并发
  3. 灵活扩展:支持自定义过滤器、处理器、存储后端
  4. 流量保护:队列满时拒绝新请求,避免雪崩
  5. 智能监控:丰富的指标和健康检查