grpc-go源码剖析:server


一张图看懂 Server 端主流程

gRPC Server端主流程图
flowchart TD A[main] --> B[net.Listen(:port)] B --> C[grpc.NewServer(opts...)] C --> D[RegisterGreeterServer(s,&server{})] D --> E[Server.Serve(lis)] E -->|Accept loop| F[handleRawConn] F --> G[newHTTP2Transport<br/>(handshake,settings,preface)] G --> H{注册成功?} H -- 否 --> E H -- 是 --> I[goroutine: serveStreams] I --> J[HandleStreams: ReadFrame 循环] J --> K{MetaHeaders?} K -- 是 --> L[operateHeaders<br/>(解析path/metadata/限流)] L --> M[registerStream → loopy.registerStream] M --> N[handleStream → route方法] N --> O{Unary or Streaming} O -- Unary --> P[processUnaryRPC] P --> Q[recvAndDecompress + Unmarshal] Q --> R[业务Handler执行] R --> S[sendResponse→t.Write] S --> T[controlBuf<-dataFrame] T --> U[loopyWriter.processData<br/>(flow-control & WriteData)] O -- Streaming --> V[processStreamingRPC(略)] J --> W{DataFrame?} W -- 是 --> X[handleData → 写入 recvBufferReader] X --> Y[window update / EOF] J --> Z{其他帧: SETTINGS/PING/GOAWAY/RST} Z --> AA[对应 handle* 分支处理]

极简示例

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package main

import (
	"context"
	"fmt"
	"log"
	"net"
	"os"
	"os/signal"
	"syscall"

	"google.golang.org/grpc"
	pb "your/module/helloworld"
)

type greeter struct{ pb.UnimplementedGreeterServer }

func (g *greeter) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
	return &pb.HelloReply{Message: "Hello " + in.Name}, nil
}

func unaryLogger(
	ctx context.Context,
	req any,
	info *grpc.UnaryServerInfo,
	handler grpc.UnaryHandler,
) (any, error) {
	resp, err := handler(ctx, req)
	if err != nil {
		log.Printf("[ERR] %s: %v", info.FullMethod, err)
	} else {
		log.Printf("[OK ] %s", info.FullMethod)
	}
	return resp, err
}

func main() {
	lis, err := net.Listen("tcp", ":50051")
	if err != nil {
		log.Fatalf("listen: %v", err)
	}

	s := grpc.NewServer(grpc.UnaryInterceptor(unaryLogger))
	pb.RegisterGreeterServer(s, &greeter{})

	// 优雅退出
	go func() {
		if err := s.Serve(lis); err != nil {
			log.Fatalf("serve: %v", err)
		}
	}()

	sig := make(chan os.Signal, 1)
	signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
	<-sig
	fmt.Println("shutting down...")
	s.GracefulStop()
}

源码走读

1) ServiceDesc 与方法处理路径

protoc 生成的 Greeter_ServiceDesc服务名方法名处理函数绑定,Server.RegisterService 会校验实现是否满足接口(反射)并登记在 server.services

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
var Greeter_ServiceDesc = grpc.ServiceDesc{
  ServiceName: "helloworld.Greeter",
  HandlerType: (*GreeterServer)(nil),
  Methods: []grpc.MethodDesc{
    { MethodName: "SayHello", Handler: _Greeter_SayHello_Handler },
  },
}

func (s *Server) RegisterService(sd *ServiceDesc, ss any) {
  if ss != nil {
    ht := reflect.TypeOf(sd.HandlerType).Elem()
    st := reflect.TypeOf(ss)
    if !st.Implements(ht) {
      logger.Fatalf("handler type %v does not satisfy %v", st, ht)
    }
  }
  s.register(sd, ss)
}

_Greeter_SayHello_Handler 解码请求、串接拦截器链、最后调用业务实现:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
func _Greeter_SayHello_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, itc grpc.UnaryServerInterceptor) (interface{}, error) {
	in := new(HelloRequest)
	if err := dec(in); err != nil { return nil, err }
	if itc == nil { return srv.(GreeterServer).SayHello(ctx, in) }
	info := &grpc.UnaryServerInfo{ Server: srv, FullMethod: Greeter_SayHello_FullMethodName }
	handler := func(ctx context.Context, req interface{}) (interface{}, error) {
		return srv.(GreeterServer).SayHello(ctx, req.(*HelloRequest))
	}
	return itc(ctx, in, info, handler)
}

2) grpc.NewServer 与(可选)协程池

NewServer 初始化内部结构(监听器集合、连接表、服务表、channelz、退出事件等),并可开启一个极简协程池

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
func NewServer(opt ...ServerOption) *Server {
  s := &Server{
    lis:      make(map[net.Listener]bool),
    conns:    make(map[string]map[transport.ServerTransport]bool),
    services: make(map[string]*serviceInfo),
    quit:     grpcsync.NewEvent(),
    done:     grpcsync.NewEvent(),
    channelz: channelz.RegisterServer(""),
  }
  s.cv = sync.NewCond(&s.mu) // Stop/GracefulStop 等待连接清理
  if s.opts.numServerWorkers > 0 {
    s.initServerWorkers() // 单队列多工人
  }
  return s
}

3) Serve:Accept 循环与每连接 goroutine

  • 出错退避采用指数回退(至多 1s)。
  • 每个连接 go handleRawConn(...) 开一个 goroutine;大量长连接会消耗一定内存与 GC 预算(每 goroutine 默认 2KB 栈,动态扩容)。

4) handleRawConnnewHTTP2Transport

  • 完成(可选)TLS handshake

  • 构造 Framer(HTTP/2 编解码器)、发送 SETTINGS、校验 client preface(魔数);

  • 读取客户端 SETTINGS,启动:

    • loopyWriter(发送循环,处理 controlBuf 指令与数据帧)
    • keepalive(保活与老化管理)
1
2
3
t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, ...)
go t.loopy.run()
go t.keepalive()

Framer 发送 SETTINGS 的要点(写帧头、帧体、长度回填):

1
2
3
4
5
func (f *Framer) WriteSettings(settings ...Setting) error {
  f.startWrite(FrameSettings, 0, 0)
  for _, s := range settings { f.writeUint16(uint16(s.ID)); f.writeUint32(s.Val) }
  return f.endWrite()
}

5) loopyWriter:发送主循环与流控

核心职责:

  • controlBuffer 取指令/数据项;
  • 对不同项执行 handle(...)(如 SETTINGS/PING/HEADER/GOAWAY 等);
  • 调用 processData()activeStreams 队里的 dataFrame连接级流级配额写出。

连接级配额sendQuota 流级配额oiws(初始窗口大小)与 outStream.bytesOutStanding

1
2
3
4
5
6
7
8
9
func (l *loopyWriter) processData() (bool, error) {
  if l.sendQuota == 0 { return true, nil }
  str := l.activeStreams.dequeue()
  df := str.itl.peek().(*dataFrame)

  // 计算本次可写的大小:受 MaxFrameLen、流级窗口、连接级窗口共同约束
  // 发送后更新 sendQuota 与 bytesOutStanding
  // 若未写完,放回 activeStreams;若耗尽流窗口,转 waitingOnStreamQuota
}

窗口更新(收/发两侧):

  • 收到对端的 WINDOW_UPDATEincomingWindowUpdateHandler:为流补充配额,必要时把流重新入队;
  • 需要告知对端可读窗口增长 → outgoingWindowUpdateHandler:写 WINDOW_UPDATE 帧。

SETTINGS 变更 INITIAL_WINDOW_SIZE 时,要把等待中的流激活回来。

HEADERS/CONTINUATION:利用 HPACK 编码,必要时切分多帧发送(EndHeaders 标记)。


6) serveStreams:读帧循环与并发限速

  • 入口处装配 streamQuota(信号量)限制同时处理的 stream 数
  • HandleStreams 为每个 MetaHeaders/Data/Settings/Ping 帧分派到对应处理函数。
1
2
3
4
5
6
7
st.HandleStreams(ctx, func(stream *transport.Stream) {
  streamQuota.acquire()
  go func() {
    defer streamQuota.release()
    s.handleStream(st, stream)
  }()
})

controlBuf.throttle() 会在发送队列积压时阻塞读取,形成读写背压平衡。


7) operateHeaders:建立 Stream 并解析请求

  • 校验 streamID 单调递增(客户端发起应为奇数);
  • 解析伪首部 :method/:pathcontent-typegrpc-encodinggrpc-timeout 等,并校验 HTTP/2 语义(gRPC 要求 POST);
  • 达到最大并发流上限会 RST;
  • 创建 Stream:收发窗口、recvBufferReaderwriteQuota
  • controlBuf.put(registerStream) 让 loopy 创建 outStream
  • 调用上层 handle(stream) 进入方法路由。

8) 路由与一元 RPC 处理

handleStream 解析 FullMethodservice/method,匹配 s.services 中已注册的 handler:

1
2
3
4
if md, ok := srv.methods[method]; ok {
  s.processUnaryRPC(ctx, t, stream, srv, md, trInfo)
  return
}

Unary 路径(解包→执行业务→回写响应):

1
2
3
4
5
6
7
8
d, _ := recvAndDecompress(&parser{r: stream, bufferPool: s.opts.bufferPool}, ...)
df := func(v any) error { return s.getCodec(stream.ContentSubtype()).Unmarshal(d, v) }

reply, appErr := md.Handler(info.serviceImpl, ctx, df, s.opts.unaryInt)

hdr, payload := msgHeader(data, compData, pf)
_ = t.Write(stream, hdr, payload, &transport.Options{Last: true})
return t.WriteStatus(stream, statusOK)

Write 首次会隐式发送响应 HEADERS,再把 dataFrame 投递到 controlBuf,由 loopy 统一按流控发送。


9) 数据帧接收与窗口维护

handleData

  • f.Data() 拷入 stream.recvBuffer,供 RecvMsg 消费;
  • 同步维护 连接级流级窗口:发送 WINDOW_UPDATE;
  • StreamEnded() 时写 io.EOF,置状态 streamReadDone

handleRSTStream/handleSettings/handlePing/incomingGoAway 等分别走对应的状态流与清理逻辑。


性能与实践提示

  • 大量长连接:每连接 goroutine + loopy + keepalive,控制 每连接最大流keepalive,合理的 MaxConnectionIdle/Age 有助于回收资源。
  • 协程池:默认单通道多工人模型简单但竞争较大;只有在 大量小 RPC 且业务处理很轻时才可能获益,注意不要阻塞网络线程。
  • 流控参数InitialWindowSize/ConnWindowSize过小会限制吞吐,过大易导致突发放大;结合 BDP 自适应估计更稳。
  • 头大小MaxHeaderListSize 限制过小会触发协议错误;过大的 metadata 需要谨慎。
  • 背压controlBuf.throttle 会在发送队列积压时抑制读取,确保不要在业务逻辑里长期占用写配额(及时 replenish)。
  • 可观测性:启用 channelzStatsHandler 与服务端日志可快速定位 RST/GOAWAY/窗口异常等问题。

附:帧与关键组件速查

  • Framer:编解码 HTTP/2 帧(HEADERS/DATA/SETTINGS/WINDOW_UPDATE/PING/GOAWAY/RST_STREAM/CONTINUATION)。
  • controlBuffer:发送侧队列;loopyWriter.run 的输入。
  • loopyWriter:发送循环;按配额从 activeStreams 写帧。
  • outStream / itemList:每流的待发队列;混合 dataFrameheaderFrame(包含尾部 trailers)。
  • writeQuota / recvBufferReader:应用层写/读的背压组件。
  • flow-control:连接级(sendQuota / trInFlow)、流级(oiws / inFlow)。

一个迷你时序图(请求-响应)

gRPC请求处理时序图
gRPC请求处理时序图



创建时间: 2025年05月08日

本文由 tommie blog 原创发布