gRPC-Go 框架使用手册

目录

  1. 快速入门
  2. 核心概念
  3. API 参考
  4. 配置选项
  5. 最佳实践
  6. 故障排查

快速入门

安装依赖

go mod init your-project
go get google.golang.org/grpc
go get google.golang.org/protobuf/cmd/protoc-gen-go
go get google.golang.org/grpc/cmd/protoc-gen-go-grpc

定义服务

// greeter.proto
syntax = "proto3";

package helloworld;
option go_package = "github.com/your-org/your-project/proto/helloworld";

service Greeter {
  rpc SayHello (HelloRequest) returns (HelloReply) {}
  rpc SayHelloStream (stream HelloRequest) returns (stream HelloReply) {}
}

message HelloRequest {
  string name = 1;
}

message HelloReply {
  string message = 1;
}

生成代码

protoc --go_out=. --go_opt=paths=source_relative \
    --go-grpc_out=. --go-grpc_opt=paths=source_relative \
    greeter.proto

服务端实现

package main

import (
    "context"
    "fmt"
    "log"
    "net"
    "time"

    "google.golang.org/grpc"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
    pb "your-project/proto/helloworld"
)

// 服务实现
type server struct {
    pb.UnimplementedGreeterServer
}

// 实现 SayHello 方法
func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
    log.Printf("Received: %v", in.GetName())
    
    // 业务逻辑处理
    if in.GetName() == "" {
        return nil, status.Error(codes.InvalidArgument, "name cannot be empty")
    }
    
    return &pb.HelloReply{Message: "Hello " + in.GetName()}, nil
}

// 实现流式方法
func (s *server) SayHelloStream(stream pb.Greeter_SayHelloStreamServer) error {
    for {
        in, err := stream.Recv()
        if err == io.EOF {
            return nil
        }
        if err != nil {
            return err
        }
        
        reply := &pb.HelloReply{
            Message: fmt.Sprintf("Hello %s at %v", in.GetName(), time.Now()),
        }
        
        if err := stream.Send(reply); err != nil {
            return err
        }
    }
}

func main() {
    lis, err := net.Listen("tcp", ":50051")
    if err != nil {
        log.Fatalf("failed to listen: %v", err)
    }

    // 创建 gRPC 服务器
    s := grpc.NewServer(
        grpc.MaxRecvMsgSize(4*1024*1024), // 4MB
        grpc.MaxSendMsgSize(4*1024*1024), // 4MB
        grpc.UnaryInterceptor(unaryInterceptor),
        grpc.StreamInterceptor(streamInterceptor),
    )
    
    // 注册服务
    pb.RegisterGreeterServer(s, &server{})
    
    log.Printf("server listening at %v", lis.Addr())
    if err := s.Serve(lis); err != nil {
        log.Fatalf("failed to serve: %v", err)
    }
}

// 一元拦截器
func unaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
    start := time.Now()
    
    // 前置处理
    log.Printf("Method: %s, Start: %v", info.FullMethod, start)
    
    // 调用实际处理器
    resp, err := handler(ctx, req)
    
    // 后置处理
    duration := time.Since(start)
    log.Printf("Method: %s, Duration: %v, Error: %v", info.FullMethod, duration, err)
    
    return resp, err
}

// 流式拦截器
func streamInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
    log.Printf("Stream Method: %s", info.FullMethod)
    return handler(srv, ss)
}

客户端实现

package main

import (
    "context"
    "io"
    "log"
    "time"

    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
    pb "your-project/proto/helloworld"
)

func main() {
    // 建立连接
    conn, err := grpc.NewClient("localhost:50051", 
        grpc.WithTransportCredentials(insecure.NewCredentials()),
        grpc.WithDefaultServiceConfig(`{
            "loadBalancingConfig": [{"round_robin":{}}],
            "methodConfig": [{
                "name": [{}],
                "waitForReady": true,
                "timeout": "10s",
                "retryPolicy": {
                    "maxAttempts": 3,
                    "initialBackoff": "100ms",
                    "maxBackoff": "1s",
                    "backoffMultiplier": 2.0,
                    "retryableStatusCodes": ["UNAVAILABLE"]
                }
            }]
        }`),
    )
    if err != nil {
        log.Fatalf("did not connect: %v", err)
    }
    defer conn.Close()

    c := pb.NewGreeterClient(conn)

    // 一元调用
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()
    
    r, err := c.SayHello(ctx, &pb.HelloRequest{Name: "World"})
    if err != nil {
        log.Fatalf("could not greet: %v", err)
    }
    log.Printf("Greeting: %s", r.GetMessage())

    // 流式调用
    streamClient, err := c.SayHelloStream(context.Background())
    if err != nil {
        log.Fatalf("could not create stream: %v", err)
    }

    // 发送数据
    go func() {
        names := []string{"Alice", "Bob", "Charlie"}
        for _, name := range names {
            if err := streamClient.Send(&pb.HelloRequest{Name: name}); err != nil {
                log.Printf("send error: %v", err)
                return
            }
            time.Sleep(time.Second)
        }
        streamClient.CloseSend()
    }()

    // 接收数据
    for {
        reply, err := streamClient.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            log.Fatalf("receive error: %v", err)
        }
        log.Printf("Stream reply: %s", reply.GetMessage())
    }
}

核心概念

服务定义与注册

gRPC 服务通过 Protocol Buffers 定义,生成的代码包含:

  1. 服务接口:定义服务方法签名
  2. 消息类型:请求和响应的数据结构
  3. 客户端存根:用于发起 RPC 调用
  4. 服务端骨架:用于实现服务逻辑

连接管理

gRPC 使用 HTTP/2 作为传输协议,具有以下特点:

  • 多路复用:单个连接支持多个并发流
  • 流控制:防止快速发送方压垮慢速接收方
  • 头部压缩:使用 HPACK 算法压缩 HTTP 头部

负载均衡

gRPC 支持多种负载均衡策略:

  • pick_first:选择第一个可用连接
  • round_robin:轮询所有可用连接
  • weighted_round_robin:基于权重的轮询
  • grpclb:使用外部负载均衡器

服务发现

gRPC 支持多种服务发现机制:

  • DNS:通过 DNS SRV 记录发现服务
  • 静态配置:直接指定服务地址
  • 自定义 Resolver:实现自定义服务发现逻辑

API 参考

服务端 API

grpc.NewServer

func NewServer(opt ...ServerOption) *Server

创建新的 gRPC 服务器实例。

常用选项

  • MaxRecvMsgSize(int):设置接收消息最大大小
  • MaxSendMsgSize(int):设置发送消息最大大小
  • UnaryInterceptor(UnaryServerInterceptor):设置一元拦截器
  • StreamInterceptor(StreamServerInterceptor):设置流式拦截器

Server.RegisterService

func (s *Server) RegisterService(sd *ServiceDesc, ss interface{})

注册服务实现到服务器。

Server.Serve

func (s *Server) Serve(lis net.Listener) error

开始监听并处理请求。

Server.GracefulStop

func (s *Server) GracefulStop()

优雅关闭服务器,等待现有请求完成。

客户端 API

grpc.NewClient

func NewClient(target string, opts ...DialOption) (*ClientConn, error)

创建到指定目标的客户端连接。

常用选项

  • WithTransportCredentials(TransportCredentials):设置传输凭证
  • WithDefaultServiceConfig(string):设置默认服务配置
  • WithUnaryInterceptor(UnaryClientInterceptor):设置一元拦截器

ClientConn.Invoke

func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...CallOption) error

发起一元 RPC 调用。

ClientConn.NewStream

func (cc *ClientConn) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error)

创建新的流式 RPC。

配置选项

服务端配置

s := grpc.NewServer(
    // 消息大小限制
    grpc.MaxRecvMsgSize(4*1024*1024), // 4MB
    grpc.MaxSendMsgSize(4*1024*1024), // 4MB
    
    // 连接参数
    grpc.KeepaliveParams(keepalive.ServerParameters{
        MaxConnectionIdle:     15 * time.Second,
        MaxConnectionAge:      30 * time.Second,
        MaxConnectionAgeGrace: 5 * time.Second,
        Time:                  5 * time.Second,
        Timeout:               1 * time.Second,
    }),
    
    // 并发控制
    grpc.MaxConcurrentStreams(1000),
    
    // 拦截器
    grpc.ChainUnaryInterceptor(
        loggingInterceptor,
        authInterceptor,
        recoveryInterceptor,
    ),
)

客户端配置

conn, err := grpc.NewClient(target,
    // 传输安全
    grpc.WithTransportCredentials(insecure.NewCredentials()),
    
    // 连接参数
    grpc.WithKeepaliveParams(keepalive.ClientParameters{
        Time:                10 * time.Second,
        Timeout:             time.Second,
        PermitWithoutStream: true,
    }),
    
    // 服务配置
    grpc.WithDefaultServiceConfig(`{
        "loadBalancingConfig": [{"round_robin":{}}],
        "methodConfig": [{
            "name": [{}],
            "waitForReady": true,
            "timeout": "10s",
            "retryPolicy": {
                "maxAttempts": 3,
                "initialBackoff": "100ms",
                "maxBackoff": "1s",
                "backoffMultiplier": 2.0,
                "retryableStatusCodes": ["UNAVAILABLE"]
            }
        }]
    }`),
)

最佳实践

1. 错误处理

import (
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
)

func (s *server) SayHello(ctx context.Context, req *pb.HelloRequest) (*pb.HelloReply, error) {
    if req.GetName() == "" {
        return nil, status.Error(codes.InvalidArgument, "name is required")
    }
    
    // 业务逻辑
    result, err := s.businessLogic(req.GetName())
    if err != nil {
        // 根据错误类型返回适当的状态码
        switch err {
        case ErrNotFound:
            return nil, status.Error(codes.NotFound, "user not found")
        case ErrPermissionDenied:
            return nil, status.Error(codes.PermissionDenied, "access denied")
        default:
            return nil, status.Error(codes.Internal, "internal server error")
        }
    }
    
    return &pb.HelloReply{Message: result}, nil
}

2. 超时和取消

// 服务端检查上下文
func (s *server) LongRunningTask(ctx context.Context, req *pb.TaskRequest) (*pb.TaskResponse, error) {
    for i := 0; i < 100; i++ {
        // 检查上下文是否已取消
        select {
        case <-ctx.Done():
            return nil, status.Error(codes.Canceled, "request canceled")
        default:
        }
        
        // 执行任务步骤
        time.Sleep(100 * time.Millisecond)
    }
    
    return &pb.TaskResponse{Result: "completed"}, nil
}

// 客户端设置超时
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

resp, err := client.LongRunningTask(ctx, &pb.TaskRequest{})

3. 流式处理

// 服务端流式处理
func (s *server) StreamData(req *pb.StreamRequest, stream pb.Service_StreamDataServer) error {
    for i := 0; i < 10; i++ {
        // 检查上下文
        if stream.Context().Err() != nil {
            return stream.Context().Err()
        }
        
        data := &pb.StreamResponse{
            Id:   int32(i),
            Data: fmt.Sprintf("data-%d", i),
        }
        
        if err := stream.Send(data); err != nil {
            return err
        }
        
        time.Sleep(time.Second)
    }
    
    return nil
}

// 客户端流式接收
stream, err := client.StreamData(ctx, &pb.StreamRequest{})
if err != nil {
    log.Fatal(err)
}

for {
    resp, err := stream.Recv()
    if err == io.EOF {
        break
    }
    if err != nil {
        log.Fatal(err)
    }
    
    log.Printf("Received: %v", resp)
}

4. 拦截器链

// 日志拦截器
func loggingInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
    start := time.Now()
    resp, err := handler(ctx, req)
    duration := time.Since(start)
    
    log.Printf("Method: %s, Duration: %v, Error: %v", info.FullMethod, duration, err)
    return resp, err
}

// 认证拦截器
func authInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
    // 跳过不需要认证的方法
    if info.FullMethod == "/health/check" {
        return handler(ctx, req)
    }
    
    // 从元数据中获取令牌
    md, ok := metadata.FromIncomingContext(ctx)
    if !ok {
        return nil, status.Error(codes.Unauthenticated, "missing metadata")
    }
    
    tokens := md.Get("authorization")
    if len(tokens) == 0 {
        return nil, status.Error(codes.Unauthenticated, "missing token")
    }
    
    // 验证令牌
    if !validateToken(tokens[0]) {
        return nil, status.Error(codes.Unauthenticated, "invalid token")
    }
    
    return handler(ctx, req)
}

// 恢复拦截器
func recoveryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
    defer func() {
        if r := recover(); r != nil {
            log.Printf("Panic in %s: %v", info.FullMethod, r)
            err = status.Error(codes.Internal, "internal server error")
        }
    }()
    
    return handler(ctx, req)
}

故障排查

常见错误及解决方案

1. 连接失败

错误connection refusedcontext deadline exceeded

排查步骤

  1. 检查服务端是否正常启动
  2. 验证网络连通性
  3. 检查防火墙设置
  4. 确认端口是否正确

2. 认证失败

错误transport: authentication handshake failed

排查步骤

  1. 检查 TLS 证书配置
  2. 验证证书有效期
  3. 确认客户端和服务端的认证配置匹配

3. 消息过大

错误message larger than max

解决方案

// 服务端
s := grpc.NewServer(
    grpc.MaxRecvMsgSize(10*1024*1024), // 10MB
    grpc.MaxSendMsgSize(10*1024*1024), // 10MB
)

// 客户端
conn, err := grpc.NewClient(target,
    grpc.WithDefaultCallOptions(
        grpc.MaxCallRecvMsgSize(10*1024*1024),
        grpc.MaxCallSendMsgSize(10*1024*1024),
    ),
)

4. 流控制问题

错误stream terminated by RST_STREAM

排查步骤

  1. 检查流控制窗口大小
  2. 验证数据发送速率
  3. 检查网络质量

调试工具

1. 启用详细日志

import "google.golang.org/grpc/grpclog"

func init() {
    grpclog.SetLoggerV2(grpclog.NewLoggerV2(os.Stdout, os.Stderr, os.Stderr))
}

2. 使用 channelz

import _ "google.golang.org/grpc/channelz/service"

// 启动 channelz 服务
go func() {
    lis, err := net.Listen("tcp", ":50052")
    if err != nil {
        log.Fatal(err)
    }
    s := grpc.NewServer()
    channelzservice.RegisterChannelzServiceToServer(s)
    s.Serve(lis)
}()

3. 性能分析

import _ "net/http/pprof"

// 启动 pprof 服务
go func() {
    log.Println(http.ListenAndServe("localhost:6060", nil))
}()

这个框架使用手册提供了 gRPC-Go 的完整使用指南,包括快速入门、核心概念、API 参考、配置选项、最佳实践和故障排查。接下来我将继续完善其他模块的详细分析。