gRPC-Go 框架使用手册
目录
快速入门
安装依赖
go mod init your-project
go get google.golang.org/grpc
go get google.golang.org/protobuf/cmd/protoc-gen-go
go get google.golang.org/grpc/cmd/protoc-gen-go-grpc
定义服务
// greeter.proto
syntax = "proto3";
package helloworld;
option go_package = "github.com/your-org/your-project/proto/helloworld";
service Greeter {
rpc SayHello (HelloRequest) returns (HelloReply) {}
rpc SayHelloStream (stream HelloRequest) returns (stream HelloReply) {}
}
message HelloRequest {
string name = 1;
}
message HelloReply {
string message = 1;
}
生成代码
protoc --go_out=. --go_opt=paths=source_relative \
--go-grpc_out=. --go-grpc_opt=paths=source_relative \
greeter.proto
服务端实现
package main
import (
"context"
"fmt"
"log"
"net"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
pb "your-project/proto/helloworld"
)
// 服务实现
type server struct {
pb.UnimplementedGreeterServer
}
// 实现 SayHello 方法
func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
log.Printf("Received: %v", in.GetName())
// 业务逻辑处理
if in.GetName() == "" {
return nil, status.Error(codes.InvalidArgument, "name cannot be empty")
}
return &pb.HelloReply{Message: "Hello " + in.GetName()}, nil
}
// 实现流式方法
func (s *server) SayHelloStream(stream pb.Greeter_SayHelloStreamServer) error {
for {
in, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
reply := &pb.HelloReply{
Message: fmt.Sprintf("Hello %s at %v", in.GetName(), time.Now()),
}
if err := stream.Send(reply); err != nil {
return err
}
}
}
func main() {
lis, err := net.Listen("tcp", ":50051")
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
// 创建 gRPC 服务器
s := grpc.NewServer(
grpc.MaxRecvMsgSize(4*1024*1024), // 4MB
grpc.MaxSendMsgSize(4*1024*1024), // 4MB
grpc.UnaryInterceptor(unaryInterceptor),
grpc.StreamInterceptor(streamInterceptor),
)
// 注册服务
pb.RegisterGreeterServer(s, &server{})
log.Printf("server listening at %v", lis.Addr())
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}
// 一元拦截器
func unaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
start := time.Now()
// 前置处理
log.Printf("Method: %s, Start: %v", info.FullMethod, start)
// 调用实际处理器
resp, err := handler(ctx, req)
// 后置处理
duration := time.Since(start)
log.Printf("Method: %s, Duration: %v, Error: %v", info.FullMethod, duration, err)
return resp, err
}
// 流式拦截器
func streamInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
log.Printf("Stream Method: %s", info.FullMethod)
return handler(srv, ss)
}
客户端实现
package main
import (
"context"
"io"
"log"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
pb "your-project/proto/helloworld"
)
func main() {
// 建立连接
conn, err := grpc.NewClient("localhost:50051",
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultServiceConfig(`{
"loadBalancingConfig": [{"round_robin":{}}],
"methodConfig": [{
"name": [{}],
"waitForReady": true,
"timeout": "10s",
"retryPolicy": {
"maxAttempts": 3,
"initialBackoff": "100ms",
"maxBackoff": "1s",
"backoffMultiplier": 2.0,
"retryableStatusCodes": ["UNAVAILABLE"]
}
}]
}`),
)
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
c := pb.NewGreeterClient(conn)
// 一元调用
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
r, err := c.SayHello(ctx, &pb.HelloRequest{Name: "World"})
if err != nil {
log.Fatalf("could not greet: %v", err)
}
log.Printf("Greeting: %s", r.GetMessage())
// 流式调用
streamClient, err := c.SayHelloStream(context.Background())
if err != nil {
log.Fatalf("could not create stream: %v", err)
}
// 发送数据
go func() {
names := []string{"Alice", "Bob", "Charlie"}
for _, name := range names {
if err := streamClient.Send(&pb.HelloRequest{Name: name}); err != nil {
log.Printf("send error: %v", err)
return
}
time.Sleep(time.Second)
}
streamClient.CloseSend()
}()
// 接收数据
for {
reply, err := streamClient.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("receive error: %v", err)
}
log.Printf("Stream reply: %s", reply.GetMessage())
}
}
核心概念
服务定义与注册
gRPC 服务通过 Protocol Buffers 定义,生成的代码包含:
- 服务接口:定义服务方法签名
- 消息类型:请求和响应的数据结构
- 客户端存根:用于发起 RPC 调用
- 服务端骨架:用于实现服务逻辑
连接管理
gRPC 使用 HTTP/2 作为传输协议,具有以下特点:
- 多路复用:单个连接支持多个并发流
- 流控制:防止快速发送方压垮慢速接收方
- 头部压缩:使用 HPACK 算法压缩 HTTP 头部
负载均衡
gRPC 支持多种负载均衡策略:
- pick_first:选择第一个可用连接
- round_robin:轮询所有可用连接
- weighted_round_robin:基于权重的轮询
- grpclb:使用外部负载均衡器
服务发现
gRPC 支持多种服务发现机制:
- DNS:通过 DNS SRV 记录发现服务
- 静态配置:直接指定服务地址
- 自定义 Resolver:实现自定义服务发现逻辑
API 参考
服务端 API
grpc.NewServer
func NewServer(opt ...ServerOption) *Server
创建新的 gRPC 服务器实例。
常用选项:
MaxRecvMsgSize(int)
:设置接收消息最大大小MaxSendMsgSize(int)
:设置发送消息最大大小UnaryInterceptor(UnaryServerInterceptor)
:设置一元拦截器StreamInterceptor(StreamServerInterceptor)
:设置流式拦截器
Server.RegisterService
func (s *Server) RegisterService(sd *ServiceDesc, ss interface{})
注册服务实现到服务器。
Server.Serve
func (s *Server) Serve(lis net.Listener) error
开始监听并处理请求。
Server.GracefulStop
func (s *Server) GracefulStop()
优雅关闭服务器,等待现有请求完成。
客户端 API
grpc.NewClient
func NewClient(target string, opts ...DialOption) (*ClientConn, error)
创建到指定目标的客户端连接。
常用选项:
WithTransportCredentials(TransportCredentials)
:设置传输凭证WithDefaultServiceConfig(string)
:设置默认服务配置WithUnaryInterceptor(UnaryClientInterceptor)
:设置一元拦截器
ClientConn.Invoke
func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...CallOption) error
发起一元 RPC 调用。
ClientConn.NewStream
func (cc *ClientConn) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error)
创建新的流式 RPC。
配置选项
服务端配置
s := grpc.NewServer(
// 消息大小限制
grpc.MaxRecvMsgSize(4*1024*1024), // 4MB
grpc.MaxSendMsgSize(4*1024*1024), // 4MB
// 连接参数
grpc.KeepaliveParams(keepalive.ServerParameters{
MaxConnectionIdle: 15 * time.Second,
MaxConnectionAge: 30 * time.Second,
MaxConnectionAgeGrace: 5 * time.Second,
Time: 5 * time.Second,
Timeout: 1 * time.Second,
}),
// 并发控制
grpc.MaxConcurrentStreams(1000),
// 拦截器
grpc.ChainUnaryInterceptor(
loggingInterceptor,
authInterceptor,
recoveryInterceptor,
),
)
客户端配置
conn, err := grpc.NewClient(target,
// 传输安全
grpc.WithTransportCredentials(insecure.NewCredentials()),
// 连接参数
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 10 * time.Second,
Timeout: time.Second,
PermitWithoutStream: true,
}),
// 服务配置
grpc.WithDefaultServiceConfig(`{
"loadBalancingConfig": [{"round_robin":{}}],
"methodConfig": [{
"name": [{}],
"waitForReady": true,
"timeout": "10s",
"retryPolicy": {
"maxAttempts": 3,
"initialBackoff": "100ms",
"maxBackoff": "1s",
"backoffMultiplier": 2.0,
"retryableStatusCodes": ["UNAVAILABLE"]
}
}]
}`),
)
最佳实践
1. 错误处理
import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
func (s *server) SayHello(ctx context.Context, req *pb.HelloRequest) (*pb.HelloReply, error) {
if req.GetName() == "" {
return nil, status.Error(codes.InvalidArgument, "name is required")
}
// 业务逻辑
result, err := s.businessLogic(req.GetName())
if err != nil {
// 根据错误类型返回适当的状态码
switch err {
case ErrNotFound:
return nil, status.Error(codes.NotFound, "user not found")
case ErrPermissionDenied:
return nil, status.Error(codes.PermissionDenied, "access denied")
default:
return nil, status.Error(codes.Internal, "internal server error")
}
}
return &pb.HelloReply{Message: result}, nil
}
2. 超时和取消
// 服务端检查上下文
func (s *server) LongRunningTask(ctx context.Context, req *pb.TaskRequest) (*pb.TaskResponse, error) {
for i := 0; i < 100; i++ {
// 检查上下文是否已取消
select {
case <-ctx.Done():
return nil, status.Error(codes.Canceled, "request canceled")
default:
}
// 执行任务步骤
time.Sleep(100 * time.Millisecond)
}
return &pb.TaskResponse{Result: "completed"}, nil
}
// 客户端设置超时
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
resp, err := client.LongRunningTask(ctx, &pb.TaskRequest{})
3. 流式处理
// 服务端流式处理
func (s *server) StreamData(req *pb.StreamRequest, stream pb.Service_StreamDataServer) error {
for i := 0; i < 10; i++ {
// 检查上下文
if stream.Context().Err() != nil {
return stream.Context().Err()
}
data := &pb.StreamResponse{
Id: int32(i),
Data: fmt.Sprintf("data-%d", i),
}
if err := stream.Send(data); err != nil {
return err
}
time.Sleep(time.Second)
}
return nil
}
// 客户端流式接收
stream, err := client.StreamData(ctx, &pb.StreamRequest{})
if err != nil {
log.Fatal(err)
}
for {
resp, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatal(err)
}
log.Printf("Received: %v", resp)
}
4. 拦截器链
// 日志拦截器
func loggingInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
start := time.Now()
resp, err := handler(ctx, req)
duration := time.Since(start)
log.Printf("Method: %s, Duration: %v, Error: %v", info.FullMethod, duration, err)
return resp, err
}
// 认证拦截器
func authInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
// 跳过不需要认证的方法
if info.FullMethod == "/health/check" {
return handler(ctx, req)
}
// 从元数据中获取令牌
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return nil, status.Error(codes.Unauthenticated, "missing metadata")
}
tokens := md.Get("authorization")
if len(tokens) == 0 {
return nil, status.Error(codes.Unauthenticated, "missing token")
}
// 验证令牌
if !validateToken(tokens[0]) {
return nil, status.Error(codes.Unauthenticated, "invalid token")
}
return handler(ctx, req)
}
// 恢复拦截器
func recoveryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
defer func() {
if r := recover(); r != nil {
log.Printf("Panic in %s: %v", info.FullMethod, r)
err = status.Error(codes.Internal, "internal server error")
}
}()
return handler(ctx, req)
}
故障排查
常见错误及解决方案
1. 连接失败
错误:connection refused
或 context deadline exceeded
排查步骤:
- 检查服务端是否正常启动
- 验证网络连通性
- 检查防火墙设置
- 确认端口是否正确
2. 认证失败
错误:transport: authentication handshake failed
排查步骤:
- 检查 TLS 证书配置
- 验证证书有效期
- 确认客户端和服务端的认证配置匹配
3. 消息过大
错误:message larger than max
解决方案:
// 服务端
s := grpc.NewServer(
grpc.MaxRecvMsgSize(10*1024*1024), // 10MB
grpc.MaxSendMsgSize(10*1024*1024), // 10MB
)
// 客户端
conn, err := grpc.NewClient(target,
grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(10*1024*1024),
grpc.MaxCallSendMsgSize(10*1024*1024),
),
)
4. 流控制问题
错误:stream terminated by RST_STREAM
排查步骤:
- 检查流控制窗口大小
- 验证数据发送速率
- 检查网络质量
调试工具
1. 启用详细日志
import "google.golang.org/grpc/grpclog"
func init() {
grpclog.SetLoggerV2(grpclog.NewLoggerV2(os.Stdout, os.Stderr, os.Stderr))
}
2. 使用 channelz
import _ "google.golang.org/grpc/channelz/service"
// 启动 channelz 服务
go func() {
lis, err := net.Listen("tcp", ":50052")
if err != nil {
log.Fatal(err)
}
s := grpc.NewServer()
channelzservice.RegisterChannelzServiceToServer(s)
s.Serve(lis)
}()
3. 性能分析
import _ "net/http/pprof"
// 启动 pprof 服务
go func() {
log.Println(http.ListenAndServe("localhost:6060", nil))
}()
这个框架使用手册提供了 gRPC-Go 的完整使用指南,包括快速入门、核心概念、API 参考、配置选项、最佳实践和故障排查。接下来我将继续完善其他模块的详细分析。