gRPC-Go 整体架构分析
目录
整体架构概览
gRPC-Go 采用分层架构设计,从上到下分为应用层、RPC 层、传输层和网络层:
graph TB
subgraph "应用层 (Application Layer)"
A1[业务服务实现]
A2[客户端应用代码]
end
subgraph "RPC 层 (RPC Layer)"
R1[Server]
R2[ClientConn]
R3[拦截器链]
R4[编解码器]
end
subgraph "负载均衡层 (Load Balancing Layer)"
L1[Resolver]
L2[Balancer]
L3[Picker]
L4[SubConn]
end
subgraph "传输层 (Transport Layer)"
T1[HTTP/2 Server]
T2[HTTP/2 Client]
T3[Stream 管理]
T4[流控制]
end
subgraph "网络层 (Network Layer)"
N1[TCP 连接]
N2[TLS/安全层]
end
A1 --> R1
A2 --> R2
R1 --> R3
R2 --> R3
R3 --> R4
R2 --> L1
L1 --> L2
L2 --> L3
L3 --> L4
L4 --> T2
R1 --> T1
T1 --> T3
T2 --> T3
T3 --> T4
T4 --> N1
N1 --> N2
style A1 fill:#e3f2fd
style A2 fill:#e3f2fd
style R1 fill:#f3e5f5
style R2 fill:#f3e5f5
style L1 fill:#e8f5e8
style L2 fill:#e8f5e8
style T1 fill:#fff3e0
style T2 fill:#fff3e0
核心组件架构
1. 服务端架构
graph TB
subgraph "gRPC Server 架构"
S1[grpc.Server]
S2[ServiceDesc 注册表]
S3[连接管理器]
S4[流处理器]
subgraph "传输层"
T1[HTTP/2 Server Transport]
T2[loopyWriter]
T3[controlBuffer]
T4[Framer]
end
subgraph "流管理"
F1[Stream 注册]
F2[方法路由]
F3[拦截器链]
F4[编解码]
end
subgraph "连接处理"
C1[Accept 循环]
C2[握手处理]
C3[Keepalive]
C4[优雅关闭]
end
end
S1 --> S2
S1 --> S3
S3 --> C1
C1 --> C2
C2 --> T1
T1 --> T2
T2 --> T3
T3 --> T4
S1 --> S4
S4 --> F1
F1 --> F2
F2 --> F3
F3 --> F4
T1 --> F1
C2 --> C3
S1 --> C4
style S1 fill:#ffebee
style T1 fill:#e8f5e8
style F2 fill:#e3f2fd
2. 客户端架构
graph TB
subgraph "gRPC Client 架构"
C1[grpc.ClientConn]
C2[连接状态管理]
C3[配置选择器]
subgraph "服务发现"
R1[Resolver Wrapper]
R2[DNS Resolver]
R3[地址更新]
end
subgraph "负载均衡"
B1[Balancer Wrapper]
B2[Round Robin]
B3[Pick First]
B4[Picker Wrapper]
end
subgraph "连接池"
A1[addrConn]
A2[SubConn]
A3[Transport]
end
subgraph "RPC 调用"
P1[clientStream]
P2[csAttempt]
P3[重试机制]
P4[超时控制]
end
end
C1 --> C2
C1 --> C3
C1 --> R1
R1 --> R2
R2 --> R3
R3 --> B1
B1 --> B2
B1 --> B3
B1 --> B4
B4 --> A1
A1 --> A2
A2 --> A3
C1 --> P1
P1 --> P2
P2 --> P3
P2 --> P4
P2 --> A3
style C1 fill:#e3f2fd
style R1 fill:#e8f5e8
style B1 fill:#fff3e0
style P1 fill:#f3e5f5
3. 传输层架构
graph TB
subgraph "HTTP/2 传输层架构"
subgraph "客户端传输"
C1[http2Client]
C2[连接管理]
C3[流创建]
C4[数据发送]
end
subgraph "服务端传输"
S1[http2Server]
S2[连接接受]
S3[流处理]
S4[数据接收]
end
subgraph "共享组件"
F1[Framer - 帧编解码]
F2[HPACK 编码器]
F3[流控制器]
F4[Keepalive 管理]
end
subgraph "控制流"
L1[loopyWriter]
L2[controlBuffer]
L3[指令队列]
L4[批量写入]
end
subgraph "数据流"
D1[recvBuffer]
D2[sendBuffer]
D3[窗口管理]
D4[背压控制]
end
end
C1 --> C2
C2 --> C3
C3 --> C4
S1 --> S2
S2 --> S3
S3 --> S4
C1 --> F1
S1 --> F1
F1 --> F2
F1 --> F3
F1 --> F4
C4 --> L1
S4 --> L1
L1 --> L2
L2 --> L3
L3 --> L4
C3 --> D1
S3 --> D1
D1 --> D2
D2 --> D3
D3 --> D4
style C1 fill:#e3f2fd
style S1 fill:#ffebee
style F1 fill:#e8f5e8
style L1 fill:#fff3e0
模块交互关系
1. 完整调用链路图
sequenceDiagram
participant App as 应用代码
participant CC as ClientConn
participant R as Resolver
participant B as Balancer
participant P as Picker
participant AC as addrConn
participant T as Transport
participant S as Server
participant H as Handler
Note over App,H: 客户端初始化阶段
App->>CC: grpc.NewClient(target)
CC->>R: 启动地址解析
R->>B: 更新地址列表
B->>P: 生成 Picker
B->>AC: 创建 SubConn
AC->>T: 建立 HTTP/2 连接
Note over App,H: RPC 调用阶段
App->>CC: client.SayHello(req)
CC->>P: 选择连接
P->>AC: 返回可用连接
AC->>T: 创建 Stream
T->>S: 发送 HEADERS 帧
S->>S: 路由到方法
S->>H: 调用业务逻辑
H->>S: 返回响应
S->>T: 发送响应数据
T->>CC: 返回结果
CC->>App: 完成调用
Note over App,H: 连接管理
R->>R: 定期刷新地址
AC->>AC: 健康检查
T->>T: Keepalive 心跳
2. 数据流向图
flowchart TD
subgraph "客户端数据流"
A1[应用数据] --> A2[编码/压缩]
A2 --> A3[gRPC Wire Format]
A3 --> A4[HTTP/2 DATA 帧]
A4 --> A5[TCP 数据包]
end
subgraph "网络传输"
A5 --> N1[网络]
N1 --> B5[TCP 数据包]
end
subgraph "服务端数据流"
B5 --> B4[HTTP/2 DATA 帧]
B4 --> B3[gRPC Wire Format]
B3 --> B2[解码/解压]
B2 --> B1[应用数据]
end
subgraph "响应数据流"
C1[响应数据] --> C2[编码/压缩]
C2 --> C3[gRPC Wire Format]
C3 --> C4[HTTP/2 DATA 帧]
C4 --> C5[TCP 数据包]
end
subgraph "网络返回"
C5 --> N2[网络]
N2 --> D5[TCP 数据包]
end
subgraph "客户端响应流"
D5 --> D4[HTTP/2 DATA 帧]
D4 --> D3[gRPC Wire Format]
D3 --> D2[解码/解压]
D2 --> D1[应用数据]
end
B1 --> C1
style A1 fill:#e3f2fd
style B1 fill:#ffebee
style C1 fill:#ffebee
style D1 fill:#e3f2fd
数据流时序图
1. 客户端连接建立时序
sequenceDiagram
participant App as 应用
participant CC as ClientConn
participant CSM as StateManager
participant RW as ResolverWrapper
participant R as Resolver
participant BW as BalancerWrapper
participant B as Balancer
participant AC as addrConn
participant T as HTTP2Transport
App->>CC: grpc.NewClient(target)
CC->>CSM: 初始化状态管理
CSM->>CSM: 设置状态为 Idle
CC->>RW: 创建 Resolver 包装器
CC->>BW: 创建 Balancer 包装器
CC->>RW: start() 启动解析
RW->>R: Build(target, cc, opts)
R->>R: 执行 DNS 查询
R->>RW: UpdateState(addresses)
RW->>BW: UpdateClientConnState(state)
BW->>B: UpdateClientConnState(addrs)
loop 为每个地址
B->>BW: NewSubConn(addr)
BW->>AC: 创建 addrConn
AC->>CSM: updateState(Connecting)
B->>AC: Connect()
AC->>T: newHTTP2Client(conn)
T->>T: TLS 握手
T->>T: HTTP/2 握手
T-->>AC: 连接就绪
AC->>CSM: updateState(Ready)
AC->>B: StateListener(Ready)
end
B->>B: regeneratePicker()
B->>BW: UpdateState(Ready, picker)
BW->>CC: 更新 Picker
CC-->>App: ClientConn 就绪
2. RPC 调用完整时序
sequenceDiagram
participant App as 应用
participant CC as ClientConn
participant CS as clientStream
participant CSA as csAttempt
participant PW as pickerWrapper
participant P as Picker
participant AC as addrConn
participant T as HTTP2Client
participant ST as ServerTransport
participant S as Server
participant H as Handler
App->>CC: Invoke(ctx, method, req, reply)
CC->>CS: newClientStream(ctx, desc, method)
CS->>CS: 等待地址解析完成
CS->>CSA: newAttempt()
CSA->>PW: pick(ctx, info)
PW->>P: Pick(info)
P->>P: 执行负载均衡算法
P-->>PW: PickResult{SubConn: ac}
PW-->>CSA: 选中的连接
CSA->>AC: getReadyTransport()
AC-->>CSA: HTTP2Client
CSA->>T: NewStream(ctx, callHdr)
T->>T: 分配 streamID
T->>ST: HEADERS 帧
ST->>S: operateHeaders()
S->>S: 解析方法路径
S->>S: 查找服务和方法
S->>S: processUnaryRPC()
CS->>CSA: SendMsg(req)
CSA->>T: Write(data)
T->>ST: DATA 帧
ST->>S: 接收请求数据
S->>S: 解码请求
S->>H: 调用业务方法
H-->>S: 返回响应
S->>S: 编码响应
S->>ST: Write(response)
ST->>T: DATA 帧
CS->>CSA: RecvMsg(reply)
CSA->>T: Read()
T-->>CSA: 响应数据
CSA->>CSA: 解码响应
CSA-->>CS: reply 对象
CS-->>CC: 完成
CC-->>App: 返回结果
3. 流控制时序图
sequenceDiagram
participant App as 应用
participant LW as loopyWriter
participant CB as controlBuffer
participant F as Framer
participant Peer as 对端
participant OS as outStream
Note over App,OS: 数据发送流程
App->>CB: put(dataFrame)
CB->>LW: 通知有数据
LW->>CB: get() 获取数据
CB-->>LW: dataFrame
LW->>LW: 检查发送配额
alt 有足够配额
LW->>OS: 计算发送大小
LW->>F: WriteData(streamID, data)
F->>Peer: HTTP/2 DATA 帧
LW->>LW: 更新 sendQuota
LW->>OS: 更新 bytesOutStanding
else 配额不足
LW->>OS: 状态改为 waiting
LW->>LW: 移出 activeStreams
end
Note over App,OS: 窗口更新流程
Peer-->>F: WINDOW_UPDATE 帧
F->>LW: incomingWindowUpdate
LW->>LW: 更新连接级配额
LW->>OS: 更新流级配额
alt 流重新有配额
LW->>OS: 状态改为 active
LW->>LW: 加入 activeStreams
LW->>LW: 继续发送数据
end
关键结构体关系
1. 服务端核心结构
classDiagram
class Server {
+opts serverOptions
+lis map[net.Listener]bool
+conns map[string]map[ServerTransport]bool
+services map[string]*serviceInfo
+serve bool
+drain bool
+cv *sync.Cond
+quit *grpcsync.Event
+done *grpcsync.Event
+channelz *channelz.Server
+NewServer(opts) *Server
+RegisterService(sd, ss)
+Serve(lis) error
+GracefulStop()
+handleRawConn(addr, conn)
+serveStreams(st, conn)
+handleStream(st, stream)
+processUnaryRPC(st, stream, srv, md)
}
class serviceInfo {
+serviceImpl any
+methods map[string]*MethodDesc
+streams map[string]*StreamDesc
+mdata any
}
class MethodDesc {
+MethodName string
+Handler MethodHandler
}
class StreamDesc {
+StreamName string
+Handler StreamHandler
+ServerStreams bool
+ClientStreams bool
}
class ServerTransport {
<<interface>>
+HandleStreams(func(*Stream), func(context.Context, string) context.Context)
+WriteHeader(stream, md) error
+Write(stream, hdr, data, opts) error
+WriteStatus(stream, st) error
+Close() error
+RemoteAddr() net.Addr
+Drain()
}
class http2Server {
+ctx context.Context
+done *grpcsync.Event
+conn net.Conn
+loopy *loopyWriter
+framer *framer
+hBuf *bytes.Buffer
+hEnc *hpack.Encoder
+maxStreams uint32
+controlBuf *controlBuffer
+fc *trInFlow
+sendQuotaPool *quotaPool
+stats []stats.Handler
+keepaliveParams keepalive.ServerParameters
+czData *channelzData
+HandleStreams(streamHandler, ctxHandler)
+operateHeaders(frame) error
+WriteHeader(stream, md) error
+Write(stream, hdr, data, opts) error
+WriteStatus(stream, st) error
}
Server --> serviceInfo : contains
serviceInfo --> MethodDesc : contains
serviceInfo --> StreamDesc : contains
Server --> ServerTransport : manages
ServerTransport <|.. http2Server : implements
2. 客户端核心结构
classDiagram
class ClientConn {
+ctx context.Context
+cancel context.CancelFunc
+target string
+parsedTarget resolver.Target
+authority string
+dopts dialOptions
+csMgr *connectivityStateManager
+balancerWrapper *ccBalancerWrapper
+resolverWrapper *ccResolverWrapper
+blockingpicker *pickerWrapper
+conns map[*addrConn]struct{}
+channelz *channelz.Channel
+NewClient(target, opts) (*ClientConn, error)
+Invoke(ctx, method, args, reply, opts) error
+NewStream(ctx, desc, method, opts) (ClientStream, error)
+GetState() connectivity.State
+WaitForStateChange(ctx, lastState) bool
+Close() error
}
class ccResolverWrapper {
+cc *ClientConn
+resolverMu sync.Mutex
+resolver resolver.Resolver
+done *grpcsync.Event
+curState resolver.State
+start() error
+resolveNow(o resolver.ResolveNowOptions)
+UpdateState(s resolver.State) error
+ReportError(err error)
+close()
}
class ccBalancerWrapper {
+cc *ClientConn
+balancerMu sync.Mutex
+balancer balancer.Balancer
+updateCh *buffer.Unbounded
+done *grpcsync.Event
+subConns map[*acBalancerWrapper]struct{}
+UpdateClientConnState(ccs balancer.ClientConnState) error
+UpdateState(s balancer.State) error
+NewSubConn(addrs, opts) (balancer.SubConn, error)
+RemoveSubConn(sc balancer.SubConn)
+close()
}
class addrConn {
+cc *ClientConn
+addrs []resolver.Address
+ctx context.Context
+cancel context.CancelFunc
+stateMu sync.Mutex
+state connectivity.State
+backoffIdx int
+resetBackoff chan struct{}
+transport transport.ClientTransport
+czData *channelzData
+connect() error
+tryAllAddrs(addrs, connectDeadline) error
+createTransport(addr, copts, connectDeadline) error
+getReadyTransport() transport.ClientTransport
+tearDown(err error)
}
class pickerWrapper {
+mu sync.Mutex
+done bool
+blockingCh chan struct{}
+picker balancer.Picker
+updatePicker(p balancer.Picker)
+pick(ctx, failfast, info) (transport.ClientTransport, balancer.PickResult, error)
+close()
}
ClientConn --> ccResolverWrapper : has
ClientConn --> ccBalancerWrapper : has
ClientConn --> pickerWrapper : has
ClientConn --> addrConn : manages
ccBalancerWrapper --> addrConn : creates
3. 传输层结构关系
classDiagram
class ClientTransport {
<<interface>>
+Write(s, hdr, data, opts) error
+NewStream(ctx, callHdr) (*Stream, error)
+CloseStream(stream, err) error
+Error() <-chan struct{}
+GoAway() <-chan struct{}
+GetGoAwayReason() GoAwayReason
}
class ServerTransport {
<<interface>>
+HandleStreams(streamHandler, ctxHandler)
+WriteHeader(stream, md) error
+Write(stream, hdr, data, opts) error
+WriteStatus(stream, st) error
+Close() error
+Drain()
}
class http2Client {
+ctx context.Context
+ctxDone <-chan struct{}
+cancel context.CancelFunc
+conn net.Conn
+loopy *loopyWriter
+framer *framer
+hBuf *bytes.Buffer
+hEnc *hpack.Encoder
+controlBuf *controlBuffer
+fc *trInFlow
+sendQuotaPool *quotaPool
+localSendQuota *quotaPool
+mu sync.Mutex
+activeStreams map[uint32]*Stream
+nextID uint32
+maxConcurrentStreams uint32
+streamQuota int64
+streamsQuotaAvailable chan struct{}
+waitingStreams uint32
+goAway chan struct{}
+awakenKeepalive chan struct{}
+czData *channelzData
+NewStream(ctx, callHdr) (*Stream, error)
+Write(s, hdr, data, opts) error
+CloseStream(stream, err) error
+handleData(f) error
+handleHeaders(f) error
+handleRSTStream(f) error
+handleSettings(f) error
+handlePing(f) error
+handleGoAway(f) error
+handleWindowUpdate(f) error
}
class http2Server {
+ctx context.Context
+done *grpcsync.Event
+conn net.Conn
+loopy *loopyWriter
+framer *framer
+hBuf *bytes.Buffer
+hEnc *hpack.Encoder
+maxStreams uint32
+controlBuf *controlBuffer
+fc *trInFlow
+sendQuotaPool *quotaPool
+stats []stats.Handler
+mu sync.Mutex
+activeStreams map[uint32]*Stream
+streamSendQuota uint32
+czData *channelzData
+HandleStreams(streamHandler, ctxHandler)
+operateHeaders(frame) error
+WriteHeader(stream, md) error
+Write(stream, hdr, data, opts) error
+WriteStatus(stream, st) error
+handleData(f) error
+handleHeaders(f) error
+handleRSTStream(f) error
+handleSettings(f) error
+handlePing(f) error
+handleGoAway(f) error
+handleWindowUpdate(f) error
}
class loopyWriter {
+side side
+cbuf *controlBuffer
+sendQuota uint32
+oiws uint32
+estdStreams map[uint32]*outStream
+activeStreams *outStreamList
+framer *framer
+hBuf *bytes.Buffer
+hEnc *hpack.Encoder
+bdpEst *bdpEstimator
+draining bool
+conn net.Conn
+logger *grpclog.PrefixLogger
+run() error
+writeHeader(streamID, endStream, hf, onWrite) error
+processData() (bool, error)
+handleWindowUpdate(wu *windowUpdate) error
+outgoingWindowUpdateHandler(wu *windowUpdate) error
+incomingWindowUpdateHandler(wu *windowUpdate) error
}
class controlBuffer {
+ch chan struct{}
+done <-chan struct{}
+mu sync.Mutex
+consumerWaiting bool
+list *itemList
+err error
+consumeAndClose func(item) error
+put(it) error
+load() error
+get(block) (item, error)
+finish()
+close()
}
ClientTransport <|.. http2Client : implements
ServerTransport <|.. http2Server : implements
http2Client --> loopyWriter : has
http2Server --> loopyWriter : has
loopyWriter --> controlBuffer : uses
这个架构分析文档全面展示了 gRPC-Go 的整体架构、核心组件、模块交互关系和关键数据结构。通过详细的架构图和时序图,开发者可以深入理解 gRPC-Go 的内部工作机制和各组件之间的协作关系。