Jaeger-01-Collector模块
1. 模块概览
1.1 模块职责
Collector 模块是 Jaeger 分布式追踪系统的数据接收中心,负责接收、验证、处理和持久化来自各种客户端的 span 数据。
核心职责:
- 多协议接收:支持 Jaeger Thrift、Jaeger Proto、OpenTelemetry(OTLP)、Zipkin 等多种协议
- 数据验证:对接收的 span 进行格式验证、租户验证、过滤
- 数据处理:对 span 进行标准化、标签补充、数据清洗
- 流量控制:通过内存队列缓冲高并发请求,防止存储被打爆
- 持久化写入:将 span 写入配置的存储后端(Cassandra、ES、Kafka 等)
- 采样统计:为自适应采样提供吞吐量统计(Aggregator)
- 采样服务:向客户端 SDK 提供采样策略
输入与输出:
- 输入:
- gRPC:端口 14250(Jaeger Proto)
- HTTP:端口 14268(Jaeger Thrift)、9411(Zipkin)、4318(OTLP HTTP)
- gRPC:端口 4317(OTLP gRPC)
- 输出:
- 存储后端:Span 数据持久化
- Sampling Store:吞吐量统计数据
1.2 上下游依赖
上游(调用 Collector 的组件):
- 应用程序中的 OpenTelemetry SDK
- 应用程序中的 Jaeger SDK
- Zipkin SDK(兼容模式)
下游(Collector 调用的组件):
- Storage 层:Span Writer 接口,写入 span 数据
- Sampling Strategy Factory:采样策略管理,支持文件策略和自适应策略
- Tenancy Manager:多租户管理,实现租户隔离
1.3 生命周期
初始化 → 启动服务器 → 接收请求 → 处理 span → 写入存储 → 优雅关闭
详细流程:
-
初始化阶段:
- 加载配置参数(队列大小、工作线程数、存储类型等)
- 初始化 Storage Factory
- 初始化 Sampling Strategy Factory
- 创建 Span Processor(包含队列和工作线程池)
-
启动阶段:
- 启动 gRPC Server(端口 14250)
- 启动 HTTP Server(端口 14268)
- 启动 OTLP Receiver(可选,端口 4317/4318)
- 启动 Zipkin Receiver(可选,端口 9411)
- 启动队列消费者 goroutine 池
-
运行阶段:
- 接收 gRPC/HTTP 请求
- 验证租户和协议格式
- 将 span 放入内存队列
- 工作线程从队列消费 span
- 对 span 进行清洗和标签补充
- 写入 Storage
- 更新指标(成功/失败计数)
-
关闭阶段:
- 停止接受新请求
- 等待队列中的 span 处理完成
- 关闭 Storage 连接
- 释放资源
1.4 模块架构图
flowchart TB
subgraph Client["客户端"]
SDK["OTEL SDK<br/>Jaeger SDK"]
end
subgraph Collector["Collector 模块"]
direction TB
subgraph Receivers["Receivers 层"]
GRPC["gRPC Handler<br/>:14250"]
HTTP["HTTP Handler<br/>:14268"]
OTLP["OTLP Receiver<br/>:4317/:4318"]
ZIPKIN["Zipkin Receiver<br/>:9411"]
end
subgraph Processing["Processing 层"]
TENANT["Tenancy<br/>Validator"]
FILTER["Span<br/>Filter"]
QUEUE["Bounded<br/>Queue<br/>(内存队列)"]
WORKERS["Worker<br/>Pool<br/>(Goroutines)"]
end
subgraph Logic["Logic 层"]
SANITIZER["Span<br/>Sanitizer<br/>(数据清洗)"]
TAGGER["Collector<br/>Tagger<br/>(标签补充)"]
AGGREGATOR["Sampling<br/>Aggregator<br/>(统计)"]
end
WRITER["Trace<br/>Writer"]
Receivers --> TENANT
TENANT --> FILTER
FILTER --> QUEUE
QUEUE --> WORKERS
WORKERS --> SANITIZER
SANITIZER --> TAGGER
TAGGER --> AGGREGATOR
AGGREGATOR --> WRITER
end
subgraph Downstream["下游依赖"]
STORAGE["Storage<br/>Backend"]
SAMPSTORE["Sampling<br/>Store"]
end
SDK -->|"gRPC/HTTP<br/>PostSpans"| Receivers
WRITER --> STORAGE
AGGREGATOR -.->|"定期刷新"| SAMPSTORE
style Receivers fill:#fff4e6
style Processing fill:#e1f5ff
style Logic fill:#f3e5f5
style Storage fill:#e8f5e9
1.5 边界与扩展点
边界:
- 输入边界:仅接受符合 Jaeger/OTLP/Zipkin 协议的 span 数据,不支持自定义协议
- 处理边界:不修改 span 的核心数据(TraceID、SpanID、时间戳),仅补充 Collector 标签和清洗非法字段
- 输出边界:通过统一的 Storage 接口写入,不关心具体存储实现
扩展点:
- Span Filter:可自定义过滤器,根据业务规则丢弃特定 span
- Span Processor:可注册额外的处理器,实现数据脱敏、加密等
- Storage Backend:可插拔存储实现,支持 gRPC Remote Storage Plugin
- Sampling Strategy:可自定义采样策略实现
状态持有位置:
- 内存队列:缓冲待处理的 span(队列深度可配置)
- Metrics:记录处理指标(接收计数、拒绝计数、队列长度等)
- 无持久状态:Collector 本身无状态,重启后不丢失数据(已写入 Storage 的除外)
资源占用要点:
- 内存:主要消耗在队列(队列大小 × span 平均大小)和工作线程
- CPU:主要消耗在 span 反序列化、验证、序列化
- 网络:接收 span 数据和写入 Storage 的网络带宽
- 文件描述符:每个 gRPC/HTTP 连接占用一个 FD
2. 对外 API 列表与规格
2.1 API 总览
Collector 提供以下对外 API:
| API 名称 | 协议 | 路径/方法 | 端口 | 说明 |
|---|---|---|---|---|
| PostSpans (gRPC) | gRPC | jaeger.api_v2.CollectorService/PostSpans |
14250 | 接收 Jaeger Proto 格式的 span batch |
| SaveSpan (HTTP) | HTTP POST | /api/traces |
14268 | 接收 Jaeger Thrift 格式的 span batch |
| OTLP Traces (gRPC) | gRPC | opentelemetry.proto.collector.trace.v1.TraceService/Export |
4317 | 接收 OTLP gRPC 格式的 traces |
| OTLP Traces (HTTP) | HTTP POST | /v1/traces |
4318 | 接收 OTLP HTTP/JSON 格式的 traces |
| Zipkin Spans | HTTP POST | /api/v2/spans |
9411 | 接收 Zipkin JSON/Thrift 格式的 spans |
| GetSamplingStrategy (gRPC) | gRPC | jaeger.api_v2.SamplingManager/GetSamplingStrategy |
14250 | 返回服务的采样策略 |
| GetSamplingStrategy (HTTP) | HTTP GET | /api/sampling/strategies |
14268 | 返回服务的采样策略(HTTP) |
2.2 API 详细规格
2.2.1 PostSpans (gRPC)
基本信息:
- 名称:
PostSpans - 协议:gRPC
jaeger.api_v2.CollectorService/PostSpans - 端口:14250
- 幂等性:否(多次提交相同 span 会产生重复数据)
请求结构体:
// PostSpansRequest 定义在 jaeger-idl/proto-gen/api_v2/collector.pb.go
type PostSpansRequest struct {
Batch model.Batch // 包含多个 span 的批次
}
// model.Batch 定义在 jaeger-idl/model/v1/model.pb.go
type Batch struct {
Spans []*Span // span 列表
Process *Process // 共享的 Process 信息(服务名、标签等)
}
请求字段表:
| 字段 | 类型 | 必填 | 默认 | 约束 | 说明 |
|---|---|---|---|---|---|
| Batch | model.Batch | 是 | - | 至少包含 1 个 span | 批次数据 |
| Batch.Spans | []*model.Span | 是 | - | 长度 > 0 | span 列表 |
| Batch.Process | *model.Process | 是 | - | - | 服务进程信息(服务名、标签) |
model.Span 核心字段:
| 字段 | 类型 | 必填 | 说明 |
|---|---|---|---|
| TraceID | model.TraceID | 是 | Trace 唯一标识(16 字节) |
| SpanID | model.SpanID | 是 | Span 唯一标识(8 字节) |
| OperationName | string | 是 | 操作名(如 “HTTP GET /api/users”) |
| References | []SpanRef | 否 | 父 span 引用(ChildOf、FollowsFrom) |
| StartTime | time.Time | 是 | Span 开始时间(微秒) |
| Duration | time.Duration | 是 | Span 持续时间(微秒) |
| Tags | []KeyValue | 否 | Span 标签(key-value 对) |
| Logs | []Log | 否 | Span 日志事件 |
| Process | *Process | 否 | 如果为 nil,使用 Batch.Process |
响应结构体:
type PostSpansResponse struct {
// 空响应,成功返回空结构体
}
响应字段表:
| 字段 | 类型 | 必填 | 说明 |
|---|---|---|---|
| (无字段) | - | - | 成功时返回空响应,失败时返回 gRPC 错误 |
入口函数与核心代码:
入口函数:
// PostSpans implements gRPC CollectorService.
func (g *GRPCHandler) PostSpans(ctx context.Context, r *api_v2.PostSpansRequest) (*api_v2.PostSpansResponse, error) {
核心处理逻辑:
// PostSpans 实现 gRPC CollectorService
func (g *GRPCHandler) PostSpans(ctx context.Context, r *api_v2.PostSpansRequest) (*api_v2.PostSpansResponse, error) {
batch := &r.Batch
// 调用 batchConsumer 处理批次
err := g.batchConsumer.consume(ctx, batch)
return &api_v2.PostSpansResponse{}, err
}
// batchConsumer.consume 处理逻辑
func (c *batchConsumer) consume(ctx context.Context, batch *model.Batch) error {
// 1. 验证租户(如果启用多租户)
tenant, err := c.validateTenant(ctx)
if err != nil {
return err // 返回 gRPC 错误:租户验证失败
}
// 2. 将 Batch.Process 分配给没有 Process 的 span
for _, span := range batch.Spans {
if span.GetProcess() == nil {
span.Process = batch.Process
}
}
// 3. 调用 SpanProcessor 处理 span
_, err = c.spanProcessor.ProcessSpans(ctx, processor.SpansV1{
Spans: batch.Spans,
Details: processor.Details{
InboundTransport: c.spanOptions.InboundTransport, // gRPC
SpanFormat: c.spanOptions.SpanFormat, // ProtoSpanFormat
Tenant: tenant,
},
})
// 4. 处理错误
if err != nil {
if errors.Is(err, processor.ErrBusy) {
// 队列满,返回 ResourceExhausted
return status.Error(codes.ResourceExhausted, err.Error())
}
return err
}
return nil
}
调用链与上层函数:
调用链层次:
gRPC Client → gRPC Server (grpc.Server)
→ CollectorServiceServer (GRPCHandler)
→ PostSpans()
→ batchConsumer.consume()
→ spanProcessor.ProcessSpans()
→ spanProcessor.processSpans()
→ spanProcessor.enqueueSpan() → Queue.Produce()
→ [异步] Worker Goroutine → processItemFromQueue()
→ sanitizer() → processSpan() → saveSpan()
→ traceWriter.WriteSpan()
上层适配代码:
// gRPC Server 启动逻辑(cmd/collector/app/server/grpc.go)
func serveGRPC(server *grpc.Server, listener net.Listener, params *GRPCServerParams) error {
// 注册 CollectorService
api_v2.RegisterCollectorServiceServer(server, params.Handler)
// 注册 SamplingManager
api_v2.RegisterSamplingManagerServer(server, samplinggrpc.NewHandler(params.SamplingProvider))
// 注册健康检查
healthServer := health.NewServer()
healthServer.SetServingStatus("jaeger.api_v2.CollectorService", grpc_health_v1.HealthCheckResponse_SERVING)
grpc_health_v1.RegisterHealthServer(server, healthServer)
// 启动 gRPC Server
go func() {
if err := server.Serve(listener); err != nil {
params.Logger.Error("Could not launch gRPC service", zap.Error(err))
}
}()
return nil
}
时序图:
sequenceDiagram
autonumber
participant C as gRPC Client
participant GS as gRPC Server
participant GH as GRPCHandler
participant BC as BatchConsumer
participant TM as TenancyMgr
participant SP as SpanProcessor
participant Q as Queue
participant W as Worker Pool
participant SAN as Sanitizer
participant TW as TraceWriter
participant ST as Storage
C->>GS: PostSpans(PostSpansRequest)
GS->>GH: PostSpans(ctx, request)
GH->>BC: consume(ctx, batch)
alt 多租户启用
BC->>TM: GetValidTenant(ctx)
TM-->>BC: tenant / error
alt 租户验证失败
BC-->>GH: error
GH-->>C: gRPC Error
end
end
BC->>BC: 分配 Process<br/>给 span
BC->>SP: ProcessSpans(ctx, SpansV1)
SP->>SP: 为每个 span<br/>添加 Collector 标签
loop 每个 Span
SP->>SP: filterSpan()<br/>(过滤)
alt span 被过滤
SP->>SP: 拒绝计数+1
else span 通过
SP->>Q: Produce(queueItem)
alt 队列满
Q-->>SP: false
SP-->>BC: ErrBusy
BC-->>GH: ResourceExhausted
GH-->>C: gRPC Error<br/>(codes.ResourceExhausted)
end
end
end
SP-->>BC: ok
BC-->>GH: nil
GH-->>C: PostSpansResponse{}
Note over Q,W: 异步处理
Q->>W: 消费 queueItem
W->>SAN: Sanitize(span)
SAN-->>W: 清洗后的 span
W->>W: processSpan()<br/>(采样统计)
W->>TW: WriteSpan(ctx, span)
TW->>ST: 存储写入
ST-->>TW: ok / error
alt 写入成功
TW->>W: 成功指标+1
else 写入失败
TW->>W: 失败指标+1
end
边界与异常:
- 重复请求:不支持幂等,相同 span 多次提交会写入多次(依赖客户端去重或存储层去重)
- 队列满:返回
codes.ResourceExhausted错误,客户端应实施退避重试 - 租户验证失败:返回
codes.PermissionDenied错误,拒绝请求 - 存储失败:异步写入,客户端已收到成功响应,失败仅记录日志和指标
最佳实践:
- 批量提交:建议每次提交 10-100 个 span,减少网络开销
- 重试策略:客户端应实现指数退避重试,最大重试次数 3-5 次
- 超时设置:建议客户端设置 5-10 秒超时,避免长时间阻塞
- 压缩:启用 gRPC 压缩(
gzip),减少网络传输量
2.2.2 SaveSpan (HTTP Thrift)
基本信息:
- 名称:
SaveSpan - 协议:HTTP POST
/api/traces - 端口:14268
- Content-Type:
application/x-thrift或application/vnd.apache.thrift.binary - 幂等性:否
请求结构体:
// 请求体为 Thrift 序列化的 jaeger.Batch
// 定义在 jaeger-idl/thrift-gen/jaeger/jaeger.go
type Batch struct {
Process *Process `thrift:"process,1,required" json:"process"`
Spans []*Span `thrift:"spans,2,required" json:"spans"`
}
请求字段表:
| 字段 | 类型 | 必填 | 说明 |
|---|---|---|---|
| Process | *Process | 是 | 服务进程信息 |
| Spans | []*Span | 是 | span 列表(长度 > 0) |
响应结构体:
// HTTP 200 OK,响应体为空
// 或 HTTP 400/500,响应体为错误信息(纯文本)
响应字段表:
| HTTP 状态码 | 响应体 | 说明 |
|---|---|---|
| 200 OK | 空 | 成功接收并处理 |
| 400 Bad Request | 错误信息(文本) | 请求格式错误或无法解析 |
| 500 Internal Server Error | 错误信息(文本) | 服务器内部错误 |
| 503 Service Unavailable | 错误信息(文本) | 队列满,资源不足 |
入口函数与核心代码:
入口函数:
// SaveSpan submits the span provided in the request body to the JaegerBatchesHandler
func (aH *APIHandler) SaveSpan(w http.ResponseWriter, r *http.Request) {
// 读取请求体并反序列化 Thrift 格式的 Batch
核心处理逻辑:
func (aH *APIHandler) SaveSpan(w http.ResponseWriter, r *http.Request) {
// 1. 读取请求体
bodyBytes, err := io.ReadAll(r.Body)
if err != nil {
http.Error(w, fmt.Sprintf("Unable to read body: %v", err), http.StatusBadRequest)
return
}
// 2. 验证 Content-Type
contentType, _, err := mime.ParseMediaType(r.Header.Get("Content-Type"))
if err != nil {
http.Error(w, fmt.Sprintf("Cannot parse content type: %v", err), http.StatusBadRequest)
return
}
if _, ok := acceptedThriftFormats[contentType]; !ok {
http.Error(w, fmt.Sprintf("Unsupported content type: %s", contentType), http.StatusBadRequest)
return
}
// 3. 反序列化 Thrift
tdes := thrift.NewTDeserializer()
batch := &tjaeger.Batch{}
if err = tdes.Read(batch, bodyBytes); err != nil {
http.Error(w, fmt.Sprintf("Unable to deserialize: %v", err), http.StatusBadRequest)
return
}
// 4. 调用 JaegerBatchesHandler 处理
ctx := r.Context()
_, err = aH.jaegerBatchesHandler.SubmitBatches(ctx, []*tjaeger.Batch{batch}, handler.SubmitBatchOptions{
InboundTransport: processor.HTTPTransport,
})
// 5. 处理错误
if err != nil {
if errors.Is(err, processor.ErrBusy) {
http.Error(w, "Server busy", http.StatusServiceUnavailable)
} else {
http.Error(w, fmt.Sprintf("Cannot submit batch: %v", err), http.StatusInternalServerError)
}
return
}
// 6. 返回成功
w.WriteHeader(http.StatusOK)
}
时序图:
sequenceDiagram
autonumber
participant C as HTTP Client
participant HS as HTTP Server<br/>(Gorilla Mux)
participant AH as APIHandler
participant JBH as JaegerBatchesHandler
participant SP as SpanProcessor
participant Q as Queue
participant ST as Storage
C->>HS: POST /api/traces<br/>(Thrift binary)
HS->>AH: SaveSpan(w, r)
AH->>AH: 读取请求体<br/>验证 Content-Type
AH->>AH: 反序列化 Thrift
AH->>JBH: SubmitBatches(ctx, batches)
JBH->>SP: ProcessSpans(ctx, SpansV1)
SP->>Q: Produce(queueItem)
alt 队列满
Q-->>SP: false
SP-->>JBH: ErrBusy
JBH-->>AH: ErrBusy
AH->>HS: 503 Service Unavailable
HS->>C: 503 Response
else 成功入队
Q-->>SP: true
SP-->>JBH: ok
JBH-->>AH: ok
AH->>HS: 200 OK
HS->>C: 200 Response
Note over Q,ST: 异步处理
Q->>ST: 存储写入
end
边界与异常:
- Content-Type 错误:仅支持
application/x-thrift和application/vnd.apache.thrift.binary,其他返回 400 - Thrift 格式错误:无法反序列化时返回 400
- 队列满:返回 503 Service Unavailable
2.2.3 OTLP Traces (gRPC)
基本信息:
- 名称:
Export - 协议:gRPC
opentelemetry.proto.collector.trace.v1.TraceService/Export - 端口:4317(需配置
--collector.otlp.enabled=true) - 幂等性:否
请求结构体:
// ExportTraceServiceRequest 定义在 OTEL proto
type ExportTraceServiceRequest struct {
ResourceSpans []*ResourceSpans // OTLP traces 数据
}
响应结构体:
type ExportTraceServiceResponse struct {
PartialSuccess *ExportTracePartialSuccess // 部分成功信息(可选)
}
入口函数与核心代码:
OTLP Receiver 由 OpenTelemetry Collector 的 otlpreceiver 组件实现,Collector 通过配置启用:
// cmd/collector/app/handler/otlp_receiver.go
func StartOTLPReceiver(opts *flags.CollectorOptions, logger *zap.Logger, spanProcessor processor.SpanProcessor, tm *tenancy.Manager) (receiver.Traces, error) {
// 创建 OTLP Receiver
// 将接收到的 OTLP traces 转发给 spanProcessor
// 内部逻辑类似 PostSpans,但数据格式为 ptrace.Traces(OTEL Collector 的数据模型)
}
处理流程:
OTLP gRPC Client → OTLP Receiver → SpanProcessor.ProcessSpans(SpansV2{Traces: ptrace.Traces})
→ otelExporter.ConsumeTraces(ctx, traces)
→ traceWriter.WriteTraces(ctx, traces)
→ Storage
时序图: 类似 PostSpans,差异在于数据格式为 ptrace.Traces(OTEL 内部格式)
2.2.4 GetSamplingStrategy (gRPC)
基本信息:
- 名称:
GetSamplingStrategy - 协议:gRPC
jaeger.api_v2.SamplingManager/GetSamplingStrategy - 端口:14250
- 幂等性:是(相同请求返回相同采样策略)
请求结构体:
type GetSamplingStrategyRequest struct {
ServiceName string // 服务名
}
请求字段表:
| 字段 | 类型 | 必填 | 说明 |
|---|---|---|---|
| ServiceName | string | 是 | 服务名(如 “frontend”) |
响应结构体:
type SamplingStrategyResponse struct {
StrategyType SamplingStrategyType // 策略类型(PROBABILISTIC 或 RATE_LIMITING)
ProbabilisticSampling *ProbabilisticSamplingStrategy // 概率采样(0.0-1.0)
RateLimitingSampling *RateLimitingSamplingStrategy // 速率限制采样(traces/second)
OperationSampling *PerOperationSamplingStrategies // 操作级采样策略
}
type ProbabilisticSamplingStrategy struct {
SamplingRate float64 // 采样概率(0.0-1.0)
}
type RateLimitingSamplingStrategy struct {
MaxTracesPerSecond int32 // 最大 traces/second
}
响应字段表:
| 字段 | 类型 | 必填 | 说明 |
|---|---|---|---|
| StrategyType | enum | 是 | PROBABILISTIC(0) 或 RATE_LIMITING(1) |
| ProbabilisticSampling | *ProbabilisticSamplingStrategy | 否 | StrategyType=PROBABILISTIC 时返回 |
| RateLimitingSampling | *RateLimitingSamplingStrategy | 否 | StrategyType=RATE_LIMITING 时返回 |
| OperationSampling | *PerOperationSamplingStrategies | 否 | 操作级策略(覆盖服务级策略) |
入口函数与核心代码:
// Sampling gRPC Handler
type samplingHandler struct {
provider samplingstrategy.Provider
}
func (h *samplingHandler) GetSamplingStrategy(ctx context.Context, req *api_v2.GetSamplingStrategyRequest) (*api_v2.SamplingStrategyResponse, error) {
// 调用 Sampling Provider 获取策略
resp, err := h.provider.GetSamplingStrategy(ctx, req.ServiceName)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to get sampling strategy: %v", err)
}
return resp, nil
}
时序图:
sequenceDiagram
autonumber
participant SDK as Client SDK
participant GS as gRPC Server
participant SH as SamplingHandler
participant SP as SamplingProvider
participant SS as SamplingStore
SDK->>GS: GetSamplingStrategy<br/>(service="frontend")
GS->>SH: GetSamplingStrategy(ctx, req)
SH->>SP: GetSamplingStrategy<br/>(ctx, "frontend")
alt 文件策略
SP->>SP: 读取静态配置文件
SP-->>SH: SamplingStrategyResponse<br/>(概率或速率限制)
else 自适应策略
SP->>SS: 查询服务采样概率
SS-->>SP: 概率值(0.0-1.0)
SP-->>SH: SamplingStrategyResponse<br/>(动态概率)
end
SH-->>GS: SamplingStrategyResponse
GS-->>SDK: Response
SDK->>SDK: 应用采样策略<br/>到新 trace
最佳实践:
- SDK 应定期拉取采样策略(建议 1-5 分钟一次),而非每次 trace 都请求
- 使用缓存避免频繁查询 Sampling Store
3. 关键数据结构与 UML
3.1 核心数据结构
3.1.1 Collector 结构体
type Collector struct {
serviceName string // 服务名 "jaeger-collector"
logger *zap.Logger // 日志器
metricsFactory metrics.Factory // 指标工厂
traceWriter tracestore.Writer // Trace 写入器
samplingProvider samplingstrategy.Provider // 采样策略提供者
samplingAggregator samplingstrategy.Aggregator // 采样聚合器
hCheck *healthcheck.HealthCheck // 健康检查
spanProcessor processor.SpanProcessor // Span 处理器
spanHandlers *SpanHandlers // Span 处理器集合
tenancyMgr *tenancy.Manager // 租户管理器
// 服务器实例(只读)
hServer *http.Server // HTTP 服务器
grpcServer *grpc.Server // gRPC 服务器
otlpReceiver receiver.Traces // OTLP 接收器
zipkinReceiver receiver.Traces // Zipkin 接收器
}
3.1.2 SpanProcessor 结构体
type spanProcessor struct {
queue *queue.BoundedQueue[queueItem] // 有界队列
otelExporter exporter.Traces // OTEL 导出器
queueResizeMu sync.Mutex // 队列调整锁
metrics *SpanProcessorMetrics // 指标
telset telemetry.Settings // 遥测设置
preProcessSpans ProcessSpans // 预处理函数
filterSpan FilterSpan // 过滤函数
sanitizer sanitizer.SanitizeSpan // 清洗函数
processSpan ProcessSpan // 处理函数
logger *zap.Logger // 日志器
traceWriter tracestore.Writer // Trace 写入器
reportBusy bool // 是否报告忙碌状态
numWorkers int // 工作线程数
collectorTags map[string]string // Collector 标签
dynQueueSizeWarmup uint // 动态队列预热大小
dynQueueSizeMemory uint // 动态队列内存上限
bytesProcessed atomic.Uint64 // 已处理字节数
spansProcessed atomic.Uint64 // 已处理 span 数
stopCh chan struct{} // 停止信号
}
type queueItem struct {
queuedTime time.Time // 入队时间
span *model.Span // Span 数据
tenant string // 租户 ID
}
3.1.3 SpanHandlers 结构体
type SpanHandlers struct {
ZipkinSpansHandler handler.ZipkinSpansHandler // Zipkin 处理器
JaegerBatchesHandler handler.JaegerBatchesHandler // Jaeger Batch 处理器
GRPCHandler *handler.GRPCHandler // gRPC 处理器
}
3.2 UML 类图
classDiagram
class Collector {
-string serviceName
-Logger logger
-MetricsFactory metricsFactory
-TraceWriter traceWriter
-SamplingProvider samplingProvider
-SamplingAggregator samplingAggregator
-SpanProcessor spanProcessor
-SpanHandlers spanHandlers
-TenancyManager tenancyMgr
+Start(options) error
+Close() error
}
class SpanProcessor {
<<interface>>
+ProcessSpans(ctx, batch) ([]bool, error)
+Close() error
}
class spanProcessor {
-BoundedQueue queue
-TraceWriter traceWriter
-Sanitizer sanitizer
-FilterSpan filterSpan
-int numWorkers
-map collectorTags
-ProcessSpan processSpan
+ProcessSpans(ctx, batch) ([]bool, error)
-enqueueSpan(span) bool
-processItemFromQueue(item)
-saveSpan(span, tenant)
}
class BoundedQueue~queueItem~ {
-int capacity
-chan items
-onDroppedItem func
+Produce(item) bool
+StartConsumers(workers, consumer)
+Stop()
}
class queueItem {
+time queuedTime
+Span span
+string tenant
}
class SpanHandlers {
+ZipkinSpansHandler
+JaegerBatchesHandler
+GRPCHandler
}
class GRPCHandler {
-Logger logger
-BatchConsumer batchConsumer
+PostSpans(ctx, req) (resp, error)
}
class BatchConsumer {
-SpanProcessor spanProcessor
-TenancyManager tenancyMgr
+consume(ctx, batch) error
-validateTenant(ctx) (string, error)
}
class TraceWriter {
<<interface>>
+WriteSpan(ctx, span) error
+WriteTraces(ctx, traces) error
}
Collector "1" --> "1" SpanProcessor : uses
Collector "1" --> "1" SpanHandlers : contains
Collector "1" --> "1" TraceWriter : uses
SpanHandlers "1" --> "1" GRPCHandler : contains
GRPCHandler "1" --> "1" BatchConsumer : uses
BatchConsumer "1" --> "1" SpanProcessor : uses
SpanProcessor <|.. spanProcessor : implements
spanProcessor "1" --> "1" BoundedQueue : uses
spanProcessor "1" --> "1" TraceWriter : uses
BoundedQueue "1" --> "*" queueItem : stores
3.3 数据结构说明
Collector:
- 顶层组件,负责协调所有子组件的生命周期
- 持有 gRPC/HTTP 服务器、Span 处理器、Trace 写入器、采样相关组件的引用
- 无持久状态,所有状态存储在下游组件(Storage、Sampling Store)
spanProcessor:
- 核心处理逻辑,实现
SpanProcessor接口 - 使用有界队列(
BoundedQueue)缓冲 span,解耦接收和处理 - 管理工作线程池(goroutine),从队列消费并处理 span
- 支持动态调整队列大小(基于内存使用率)
BoundedQueue:
- 有界队列,容量固定(默认 2000)
- 当队列满时,丢弃新 span 并调用
onDroppedItem回调 - 使用 Go channel 实现,支持多生产者多消费者
queueItem:
- 队列中的单元,包含 span 数据、入队时间、租户 ID
- 入队时间用于统计队列延迟(
InQueueLatency指标)
SpanHandlers:
- 聚合不同协议的处理器(gRPC、HTTP、Zipkin)
- 所有处理器最终调用同一个
SpanProcessor
4. 关键功能与函数详细描述
4.1 Span 处理流程
4.1.1 ProcessSpans 函数
函数签名:
func (sp *spanProcessor) ProcessSpans(ctx context.Context, batch processor.Batch) ([]bool, error)
功能描述:
ProcessSpans 是 span 处理的总入口,负责接收 span 批次、预处理、入队、返回结果。
核心代码:
func (sp *spanProcessor) ProcessSpans(ctx context.Context, batch processor.Batch) ([]bool, error) {
// 1. 调用预处理函数(用户自定义,可选)
sp.preProcessSpans(batch)
var batchOks []bool
var batchErr error
// 2. 根据数据模型版本处理(V1 或 V2)
batch.GetSpans(func(spans []*model.Span) {
// V1: Jaeger model.Span
batchOks, batchErr = sp.processSpans(ctx, batch, spans)
}, func(traces ptrace.Traces) {
// V2: OTLP ptrace.Traces
ctx := tenancy.WithTenant(ctx, batch.GetTenant())
if err := sp.otelExporter.ConsumeTraces(ctx, traces); err != nil {
batchErr = err
} else {
batchOks = make([]bool, traces.SpanCount())
for i := range batchOks {
batchOks[i] = true
}
}
})
return batchOks, batchErr
}
详细步骤:
-
预处理(
preProcessSpans):- 用户可注册自定义预处理函数,执行数据脱敏、加密等操作
- 默认为空函数
-
数据模型分支:
- V1 分支:Jaeger 原生格式(
model.Span)- 调用
processSpans逐个处理 span - 为每个 span 添加 Collector 标签和格式标记
- 入队到
BoundedQueue
- 调用
- V2 分支:OTLP 格式(
ptrace.Traces)- 调用
otelExporter.ConsumeTraces - 内部最终调用
traceWriter.WriteTraces
- 调用
- V1 分支:Jaeger 原生格式(
-
返回结果:
batchOks:布尔数组,表示每个 span 是否成功入队batchErr:错误信息(如队列满返回ErrBusy)
调用链:
PostSpans (gRPC) → GRPCHandler.PostSpans()
→ batchConsumer.consume()
→ spanProcessor.ProcessSpans()
4.1.2 enqueueSpan 函数
函数签名:
func (sp *spanProcessor) enqueueSpan(span *model.Span, originalFormat processor.SpanFormat, transport processor.InboundTransport, tenant string) bool
功能描述:
将单个 span 入队到内存队列,执行过滤和指标记录。
核心代码:
func (sp *spanProcessor) enqueueSpan(span *model.Span, originalFormat processor.SpanFormat, transport processor.InboundTransport, tenant string) bool {
// 1. 更新接收指标(按服务名和格式分组)
spanCounts := sp.metrics.GetCountsForFormat(originalFormat, transport)
spanCounts.ReceivedBySvc.ForSpanV1(span)
// 2. 调用过滤器
if !sp.filterSpan(span) {
// span 被过滤,更新拒绝指标
spanCounts.RejectedBySvc.ForSpanV1(span)
return true // 返回 true 表示"不是因为队列满被拒绝"
}
// 3. 添加格式标签(标记 span 来自哪种格式)
span.Tags = append(span.Tags, model.String(jptrace.FormatAttribute, string(originalFormat)))
// 4. 构造队列项
item := queueItem{
queuedTime: time.Now(),
span: span,
tenant: tenant,
}
// 5. 入队
return sp.queue.Produce(item) // 返回 false 表示队列满
}
详细步骤:
- 指标记录:根据 span 的服务名、格式(Thrift、Proto、Zipkin)、传输协议(gRPC、HTTP)更新接收计数
- 过滤:调用
filterSpan函数,默认返回true(不过滤),可自定义过滤规则 - 标签补充:添加
internal.span.format标签,标记 span 来源格式 - 入队:调用
queue.Produce,如果队列满返回false
调用链:
processSpans() → enqueueSpan() → queue.Produce()
4.1.3 processItemFromQueue 函数
函数签名:
func (sp *spanProcessor) processItemFromQueue(item queueItem)
功能描述:
工作线程从队列消费 span,执行清洗、处理、写入存储。
核心代码:
func (sp *spanProcessor) processItemFromQueue(item queueItem) {
// 1. 调用 sanitizer 清洗数据
cleanedSpan := sp.sanitizer(item.span)
// 2. 调用 processSpan 函数链(包含采样统计、写入存储等)
sp.processSpan(cleanedSpan, item.tenant)
// 3. 更新队列延迟指标
sp.metrics.InQueueLatency.Record(time.Since(item.queuedTime))
}
详细步骤:
-
数据清洗(
sanitizer):- 清洗非法字段(如过长的字符串)
- 标准化时间戳(微秒转纳秒)
- 去除重复标签
-
处理函数链(
processSpan):- 采样统计:如果是 root span,调用
samplingAggregator.HandleRootSpan - 写入存储:调用
saveSpan→traceWriter.WriteSpan - 更新指标:成功/失败计数
- 采样统计:如果是 root span,调用
-
指标记录:
InQueueLatency:span 在队列中的等待时间(用于监控队列是否堵塞)
调用链:
Queue Consumer Goroutine → processItemFromQueue()
→ sanitizer()
→ processSpan()
→ saveSpan()
→ traceWriter.WriteSpan()
4.2 Sanitizer 数据清洗
函数链:
type SanitizeSpan func(*model.Span) *model.Span
func NewChainedSanitizer(sanitizers ...SanitizeSpan) SanitizeSpan {
return ChainedSanitizeSpan(sanitizers...)
}
标准清洗器:
- UTF-8 Sanitizer:确保所有字符串字段为有效 UTF-8
- Service Name Sanitizer:清理服务名中的非法字符
- ParentSpanID Sanitizer:修复 parent span ID 为 0 的情况(根 span)
清洗示例:
// UTF-8 清洗
func NewUTF8Sanitizer() SanitizeSpan {
return func(span *model.Span) *model.Span {
span.OperationName = sanitizeUTF8(span.OperationName)
for i := range span.Tags {
span.Tags[i].VStr = sanitizeUTF8(span.Tags[i].VStr)
}
return span
}
}
4.3 Collector Tags 补充
功能:
为每个 span 添加 Collector 的标识标签,标记 span 经过了哪个 Collector 实例。
核心代码:
func (sp *spanProcessor) addCollectorTags(span *model.Span) {
if len(sp.collectorTags) == 0 {
return
}
// 1. 检查 span.Process.Tags 中是否已存在相同的 key-value
dedupKey := make(map[string]struct{})
for _, tag := range span.Process.Tags {
if value, ok := sp.collectorTags[tag.Key]; ok && value == tag.AsString() {
dedupKey[tag.Key] = struct{}{} // 标记为已存在,不重复添加
}
}
// 2. 添加不存在的 Collector 标签
for k, v := range sp.collectorTags {
if _, ok := dedupKey[k]; !ok {
span.Process.Tags = append(span.Process.Tags, model.String(k, v))
}
}
// 3. 对标签排序(保证一致性)
typedTags := model.KeyValues(span.Process.Tags)
typedTags.Sort()
}
配置示例:
jaeger-collector --collector.tags="hostname=collector-01,zone=us-east-1"
结果:
所有 span 的 Process.Tags 中会包含:
{
"hostname": "collector-01",
"zone": "us-east-1"
}
4.4 动态队列调整
功能:
根据系统内存使用情况动态调整队列大小,避免内存溢出。
核心代码:
func (sp *spanProcessor) updateQueueSize() {
sp.queueResizeMu.Lock()
defer sp.queueResizeMu.Unlock()
// 1. 计算平均 span 大小
spansProcessed := sp.spansProcessed.Load()
bytesProcessed := sp.bytesProcessed.Load()
if spansProcessed == 0 {
return // 还没有数据,无法计算
}
avgSpanSizeBytes := bytesProcessed / spansProcessed
// 2. 计算目标队列大小(基于内存上限)
memoryMiB := sp.dynQueueSizeMemory
if memoryMiB == 0 {
return
}
targetQueueSize := (memoryMiB * 1024 * 1024) / avgSpanSizeBytes
// 3. 限制最大队列大小
if targetQueueSize > maxQueueSize {
targetQueueSize = maxQueueSize
}
// 4. 检查是否需要调整(至少增长 20%)
currentSize := sp.queue.Capacity()
if float64(targetQueueSize) / float64(currentSize) < minRequiredChange {
return // 变化不大,不调整
}
// 5. 调整队列大小
sp.queue.Resize(int(targetQueueSize))
sp.logger.Info("Resized queue", zap.Int("old-size", currentSize), zap.Uint64("new-size", targetQueueSize))
}
触发条件:
- 后台任务每 1 分钟执行一次
- 需启用动态队列(
--collector.queue-size-memory> 0)
算法:
目标队列大小 = (内存上限 / 平均 span 大小)
5. 配置项与最佳实践
5.1 关键配置项
| 配置项 | 默认值 | 说明 | 建议值 |
|---|---|---|---|
--collector.num-workers |
50 | 工作线程数 | CPU 核数 × 2-4 |
--collector.queue-size |
2000 | 队列大小 | >= 工作线程数 × 10 |
--collector.queue-size-memory |
0 (禁用) | 动态队列内存上限(字节) | 512MB - 2GB |
--collector.grpc-server.host-port |
:14250 | gRPC 监听地址 | 0.0.0.0:14250 |
--collector.http-server.host-port |
:14268 | HTTP 监听地址 | 0.0.0.0:14268 |
--collector.otlp.enabled |
true (V2) | 是否启用 OTLP Receiver | true |
--collector.tags |
空 | Collector 标签 | “hostname=$(hostname),dc=us-east” |
--collector.enable-span-size-metrics |
false | 是否启用 span 大小指标 | true(用于动态队列) |
--span-storage.type |
memory | 存储类型 | cassandra/elasticsearch |
5.2 性能调优
高吞吐场景(> 10K spans/s):
jaeger-collector \
--collector.num-workers=200 \
--collector.queue-size=20000 \
--collector.queue-size-memory=2147483648 \ # 2GB
--collector.grpc-server.max-message-size=8388608 \ # 8MB
--span-storage.type=kafka # 使用 Kafka 解耦
低延迟场景(P95 < 50ms):
jaeger-collector \
--collector.num-workers=100 \
--collector.queue-size=5000 \
--span-storage.type=cassandra # 直接写入,避免 Kafka 延迟
资源受限场景(小内存):
jaeger-collector \
--collector.num-workers=20 \
--collector.queue-size=1000 \
--collector.queue-size-memory=268435456 # 256MB
5.3 监控告警
关键指标:
-
队列深度 (
jaeger_collector_queue_length)- 正常:< 50%
- 警告:> 80%
- 告警:> 95%
-
拒绝率 (
jaeger_collector_spans_rejected_total/jaeger_collector_spans_received_total)- 正常:< 0.1%
- 告警:> 1%
-
队列延迟 (
jaeger_collector_in_queue_latency)- 正常:< 10ms
- 警告:> 50ms
- 告警:> 100ms
-
存储写入失败率 (
jaeger_collector_spans_saved_by_svc{result="err"})- 正常:< 0.01%
- 告警:> 0.1%
Prometheus 告警规则:
groups:
- name: jaeger_collector
rules:
- alert: CollectorQueueHigh
expr: jaeger_collector_queue_length / jaeger_collector_queue_size > 0.8
for: 5m
annotations:
summary: "Collector queue is {{ $value | humanizePercentage }} full"
- alert: CollectorHighRejectionRate
expr: rate(jaeger_collector_spans_rejected_total[5m]) / rate(jaeger_collector_spans_received_total[5m]) > 0.01
for: 5m
annotations:
summary: "Collector rejecting {{ $value | humanizePercentage }} of spans"
- alert: CollectorStorageWriteErrors
expr: rate(jaeger_collector_spans_saved_by_svc{result="err"}[5m]) > 10
for: 5m
annotations:
summary: "Collector storage write errors: {{ $value }} errors/sec"
6. 故障排查
6.1 常见问题
问题 1:客户端报 “ResourceExhausted” 错误
原因:
- Collector 队列满,无法接收新 span
排查:
# 查看队列深度指标
curl http://collector:14269/metrics | grep queue_length
# 查看工作线程数
curl http://collector:14269/metrics | grep num_workers
解决:
- 增加工作线程数:
--collector.num-workers=100 - 增加队列大小:
--collector.queue-size=5000 - 使用 Kafka 模式解耦:
--span-storage.type=kafka
问题 2:存储写入失败率高
原因:
- 存储后端不可用或过载
- 网络问题
排查:
# 查看存储写入错误指标
curl http://collector:14269/metrics | grep spans_saved_by_svc
# 查看 Collector 日志
kubectl logs -f jaeger-collector-xxx | grep "Failed to save span"
解决:
- 检查存储后端健康状态(Cassandra/ES 集群)
- 增加存储容量或优化索引
- 启用 Kafka 缓冲:
--span-storage.type=kafka
问题 3:内存使用持续增长
原因:
- 队列积压
- 动态队列未限制
排查:
# 查看内存使用
kubectl top pod jaeger-collector-xxx
# 查看队列深度
curl http://collector:14269/metrics | grep queue_length
解决:
- 启用动态队列并设置内存上限:
--collector.queue-size-memory=1073741824# 1GB - 限制队列大小:
--collector.queue-size=2000
7. 总结
Collector 模块是 Jaeger 的数据接收核心,具备以下关键特性:
- 多协议支持:Jaeger、OTLP、Zipkin 全覆盖
- 高吞吐处理:通过队列和工作线程池实现高并发
- 灵活扩展:支持自定义过滤器、处理器、存储后端
- 流量保护:队列满时拒绝新请求,避免雪崩
- 智能监控:丰富的指标和健康检查