grpc-go源码剖析:server
一张图看懂 Server 端主流程
flowchart TD
A[main] --> B[net.Listen(:port)]
B --> C[grpc.NewServer(opts...)]
C --> D[RegisterGreeterServer(s,&server{})]
D --> E[Server.Serve(lis)]
E -->|Accept loop| F[handleRawConn]
F --> G[newHTTP2Transport<br/>(handshake,settings,preface)]
G --> H{注册成功?}
H -- 否 --> E
H -- 是 --> I[goroutine: serveStreams]
I --> J[HandleStreams: ReadFrame 循环]
J --> K{MetaHeaders?}
K -- 是 --> L[operateHeaders<br/>(解析path/metadata/限流)]
L --> M[registerStream → loopy.registerStream]
M --> N[handleStream → route方法]
N --> O{Unary or Streaming}
O -- Unary --> P[processUnaryRPC]
P --> Q[recvAndDecompress + Unmarshal]
Q --> R[业务Handler执行]
R --> S[sendResponse→t.Write]
S --> T[controlBuf<-dataFrame]
T --> U[loopyWriter.processData<br/>(flow-control & WriteData)]
O -- Streaming --> V[processStreamingRPC(略)]
J --> W{DataFrame?}
W -- 是 --> X[handleData → 写入 recvBufferReader]
X --> Y[window update / EOF]
J --> Z{其他帧: SETTINGS/PING/GOAWAY/RST}
Z --> AA[对应 handle* 分支处理]
极简示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
| package main
import (
"context"
"fmt"
"log"
"net"
"os"
"os/signal"
"syscall"
"google.golang.org/grpc"
pb "your/module/helloworld"
)
type greeter struct{ pb.UnimplementedGreeterServer }
func (g *greeter) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
return &pb.HelloReply{Message: "Hello " + in.Name}, nil
}
func unaryLogger(
ctx context.Context,
req any,
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (any, error) {
resp, err := handler(ctx, req)
if err != nil {
log.Printf("[ERR] %s: %v", info.FullMethod, err)
} else {
log.Printf("[OK ] %s", info.FullMethod)
}
return resp, err
}
func main() {
lis, err := net.Listen("tcp", ":50051")
if err != nil {
log.Fatalf("listen: %v", err)
}
s := grpc.NewServer(grpc.UnaryInterceptor(unaryLogger))
pb.RegisterGreeterServer(s, &greeter{})
// 优雅退出
go func() {
if err := s.Serve(lis); err != nil {
log.Fatalf("serve: %v", err)
}
}()
sig := make(chan os.Signal, 1)
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
<-sig
fmt.Println("shutting down...")
s.GracefulStop()
}
|
源码走读
1) ServiceDesc
与方法处理路径
protoc
生成的 Greeter_ServiceDesc
把 服务名、方法名 与 处理函数绑定,Server.RegisterService
会校验实现是否满足接口(反射)并登记在 server.services
。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
| var Greeter_ServiceDesc = grpc.ServiceDesc{
ServiceName: "helloworld.Greeter",
HandlerType: (*GreeterServer)(nil),
Methods: []grpc.MethodDesc{
{ MethodName: "SayHello", Handler: _Greeter_SayHello_Handler },
},
}
func (s *Server) RegisterService(sd *ServiceDesc, ss any) {
if ss != nil {
ht := reflect.TypeOf(sd.HandlerType).Elem()
st := reflect.TypeOf(ss)
if !st.Implements(ht) {
logger.Fatalf("handler type %v does not satisfy %v", st, ht)
}
}
s.register(sd, ss)
}
|
_Greeter_SayHello_Handler
解码请求、串接拦截器链、最后调用业务实现:
1
2
3
4
5
6
7
8
9
10
| func _Greeter_SayHello_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, itc grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(HelloRequest)
if err := dec(in); err != nil { return nil, err }
if itc == nil { return srv.(GreeterServer).SayHello(ctx, in) }
info := &grpc.UnaryServerInfo{ Server: srv, FullMethod: Greeter_SayHello_FullMethodName }
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(GreeterServer).SayHello(ctx, req.(*HelloRequest))
}
return itc(ctx, in, info, handler)
}
|
2) grpc.NewServer
与(可选)协程池
NewServer
初始化内部结构(监听器集合、连接表、服务表、channelz、退出事件等),并可开启一个极简协程池:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
| func NewServer(opt ...ServerOption) *Server {
s := &Server{
lis: make(map[net.Listener]bool),
conns: make(map[string]map[transport.ServerTransport]bool),
services: make(map[string]*serviceInfo),
quit: grpcsync.NewEvent(),
done: grpcsync.NewEvent(),
channelz: channelz.RegisterServer(""),
}
s.cv = sync.NewCond(&s.mu) // Stop/GracefulStop 等待连接清理
if s.opts.numServerWorkers > 0 {
s.initServerWorkers() // 单队列多工人
}
return s
}
|
3) Serve
:Accept 循环与每连接 goroutine
- 出错退避采用指数回退(至多 1s)。
- 每个连接
go handleRawConn(...)
开一个 goroutine;大量长连接会消耗一定内存与 GC 预算(每 goroutine 默认 2KB 栈,动态扩容)。
4) handleRawConn
→ newHTTP2Transport
1
2
3
| t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, ...)
go t.loopy.run()
go t.keepalive()
|
Framer 发送 SETTINGS 的要点(写帧头、帧体、长度回填):
1
2
3
4
5
| func (f *Framer) WriteSettings(settings ...Setting) error {
f.startWrite(FrameSettings, 0, 0)
for _, s := range settings { f.writeUint16(uint16(s.ID)); f.writeUint32(s.Val) }
return f.endWrite()
}
|
5) loopyWriter:发送主循环与流控
核心职责:
- 从 controlBuffer 取指令/数据项;
- 对不同项执行
handle(...)
(如 SETTINGS/PING/HEADER/GOAWAY 等); - 调用
processData()
把 activeStreams
队里的 dataFrame 按 连接级与流级配额写出。
连接级配额:sendQuota
流级配额:oiws
(初始窗口大小)与 outStream.bytesOutStanding
1
2
3
4
5
6
7
8
9
| func (l *loopyWriter) processData() (bool, error) {
if l.sendQuota == 0 { return true, nil }
str := l.activeStreams.dequeue()
df := str.itl.peek().(*dataFrame)
// 计算本次可写的大小:受 MaxFrameLen、流级窗口、连接级窗口共同约束
// 发送后更新 sendQuota 与 bytesOutStanding
// 若未写完,放回 activeStreams;若耗尽流窗口,转 waitingOnStreamQuota
}
|
窗口更新(收/发两侧):
- 收到对端的 WINDOW_UPDATE →
incomingWindowUpdateHandler
:为流补充配额,必要时把流重新入队; - 需要告知对端可读窗口增长 →
outgoingWindowUpdateHandler
:写 WINDOW_UPDATE 帧。
SETTINGS 变更 INITIAL_WINDOW_SIZE
时,要把等待中的流激活回来。
HEADERS/CONTINUATION:利用 HPACK 编码,必要时切分多帧发送(EndHeaders 标记)。
6) serveStreams
:读帧循环与并发限速
- 入口处装配
streamQuota
(信号量)限制同时处理的 stream 数; HandleStreams
为每个 MetaHeaders/Data/Settings/Ping 帧分派到对应处理函数。
1
2
3
4
5
6
7
| st.HandleStreams(ctx, func(stream *transport.Stream) {
streamQuota.acquire()
go func() {
defer streamQuota.release()
s.handleStream(st, stream)
}()
})
|
controlBuf.throttle()
会在发送队列积压时阻塞读取,形成读写背压平衡。
- 校验 streamID 单调递增(客户端发起应为奇数);
- 解析伪首部
:method/:path
、content-type
、grpc-encoding
、grpc-timeout
等,并校验 HTTP/2 语义(gRPC 要求 POST
); - 达到最大并发流上限会 RST;
- 创建
Stream
:收发窗口、recvBufferReader
、writeQuota
; controlBuf.put(registerStream)
让 loopy 创建 outStream
;- 调用上层
handle(stream)
进入方法路由。
8) 路由与一元 RPC 处理
handleStream
解析 FullMethod
为 service/method
,匹配 s.services
中已注册的 handler:
1
2
3
4
| if md, ok := srv.methods[method]; ok {
s.processUnaryRPC(ctx, t, stream, srv, md, trInfo)
return
}
|
Unary 路径(解包→执行业务→回写响应):
1
2
3
4
5
6
7
8
| d, _ := recvAndDecompress(&parser{r: stream, bufferPool: s.opts.bufferPool}, ...)
df := func(v any) error { return s.getCodec(stream.ContentSubtype()).Unmarshal(d, v) }
reply, appErr := md.Handler(info.serviceImpl, ctx, df, s.opts.unaryInt)
hdr, payload := msgHeader(data, compData, pf)
_ = t.Write(stream, hdr, payload, &transport.Options{Last: true})
return t.WriteStatus(stream, statusOK)
|
Write
首次会隐式发送响应 HEADERS,再把 dataFrame
投递到 controlBuf
,由 loopy 统一按流控发送。
9) 数据帧接收与窗口维护
handleData
:
- 把
f.Data()
拷入 stream.recvBuffer
,供 RecvMsg
消费; - 同步维护 连接级与流级窗口:发送 WINDOW_UPDATE;
StreamEnded()
时写 io.EOF
,置状态 streamReadDone
。
handleRSTStream
/handleSettings
/handlePing
/incomingGoAway
等分别走对应的状态流与清理逻辑。
性能与实践提示
- 大量长连接:每连接 goroutine + loopy + keepalive,控制 每连接最大流 与 keepalive,合理的
MaxConnectionIdle/Age
有助于回收资源。 - 协程池:默认单通道多工人模型简单但竞争较大;只有在 大量小 RPC 且业务处理很轻时才可能获益,注意不要阻塞网络线程。
- 流控参数:
InitialWindowSize/ConnWindowSize
过小会限制吞吐,过大易导致突发放大;结合 BDP 自适应估计更稳。 - 头大小:
MaxHeaderListSize
限制过小会触发协议错误;过大的 metadata 需要谨慎。 - 背压:
controlBuf.throttle
会在发送队列积压时抑制读取,确保不要在业务逻辑里长期占用写配额(及时 replenish
)。 - 可观测性:启用 channelz、
StatsHandler
与服务端日志可快速定位 RST/GOAWAY/窗口异常等问题。
附:帧与关键组件速查
- Framer:编解码 HTTP/2 帧(HEADERS/DATA/SETTINGS/WINDOW_UPDATE/PING/GOAWAY/RST_STREAM/CONTINUATION)。
- controlBuffer:发送侧队列;
loopyWriter.run
的输入。 - loopyWriter:发送循环;按配额从
activeStreams
写帧。 - outStream / itemList:每流的待发队列;混合
dataFrame
与 headerFrame
(包含尾部 trailers)。 - writeQuota / recvBufferReader:应用层写/读的背压组件。
- flow-control:连接级(
sendQuota
/ trInFlow
)、流级(oiws
/ inFlow
)。
一个迷你时序图(请求-响应)
gRPC请求处理时序图

显示 Mermaid 源码
sequenceDiagram
autonumber
participant C as Client
participant S as grpc.Server
participant T as http2Server(Transport)
participant L as loopyWriter
participant H as Handler
C-»T: HEADERS(:path=/Greeter/SayHello)
T-»S: operateHeaders → handleStream
S-»H: 调用业务方法
H–»S: 返回 reply
S-»T: Write(hdr+payload)
T-»L: controlBuf <- headerFrame/dataFrame
L-»C: HEADERS + DATA (flow control)
C–»T: WINDOW_UPDATE / ACK
L-»C: HEADERS(trailers) + RST(optional)
创建时间: 2025年05月08日
本文由 tommie blog 原创发布