Kitex 框架使用手册

目录

  1. 快速开始
  2. 基础概念
  3. 客户端使用
  4. 服务端使用
  5. 高级特性
  6. 配置参考
  7. 故障排查

快速开始

环境要求

  • Go 1.20 或更高版本
  • 支持的协议: Thrift, Protobuf, gRPC

安装

# 安装 Kitex 工具
go install github.com/cloudwego/kitex/tool/cmd/kitex@latest

# 安装 Thriftgo(如果使用 Thrift)
go install github.com/cloudwego/thriftgo@latest

定义服务接口

使用 Thrift IDL

// hello.thrift
namespace go hello

struct HelloRequest {
    1: string name
}

struct HelloResponse {
    1: string message
}

service HelloService {
    HelloResponse SayHello(1: HelloRequest req)
}

使用 Protobuf IDL

// hello.proto
syntax = "proto3";

package hello;
option go_package = "hello";

message HelloRequest {
    string name = 1;
}

message HelloResponse {
    string message = 1;
}

service HelloService {
    rpc SayHello(HelloRequest) returns (HelloResponse);
}

生成代码

# 生成 Thrift 代码
kitex -module example -service hello hello.thrift

# 生成 Protobuf 代码
kitex -module example -service hello hello.proto

实现服务端

package main

import (
    "context"
    "log"
    
    "github.com/cloudwego/kitex/server"
    "example/kitex_gen/hello"
    "example/kitex_gen/hello/helloservice"
)

// HelloServiceImpl 实现 HelloService 接口
type HelloServiceImpl struct{}

// SayHello 实现 SayHello 方法
func (s *HelloServiceImpl) SayHello(ctx context.Context, req *hello.HelloRequest) (*hello.HelloResponse, error) {
    return &hello.HelloResponse{
        Message: "Hello " + req.Name,
    }, nil
}

func main() {
    // 创建服务器
    svr := helloservice.NewServer(new(HelloServiceImpl))
    
    // 启动服务器
    err := svr.Run()
    if err != nil {
        log.Println(err.Error())
    }
}

实现客户端

package main

import (
    "context"
    "log"
    
    "example/kitex_gen/hello"
    "example/kitex_gen/hello/helloservice"
    "github.com/cloudwego/kitex/client"
)

func main() {
    // 创建客户端
    c, err := helloservice.NewClient("hello", client.WithHostPorts("0.0.0.0:8888"))
    if err != nil {
        log.Fatal(err)
    }
    
    // 发起调用
    req := &hello.HelloRequest{Name: "World"}
    resp, err := c.SayHello(context.Background(), req)
    if err != nil {
        log.Fatal(err)
    }
    
    log.Println(resp.Message)
}

基础概念

核心组件

ServiceInfo

服务信息描述符,包含服务的元数据信息:

type ServiceInfo struct {
    ServiceName     string                    // 服务名称
    Methods         map[string]MethodInfo     // 方法信息
    PayloadCodec    PayloadCodec             // 编解码器
    HandlerType     interface{}               // 处理器类型
    Extra           map[string]interface{}   // 扩展信息
}

RPCInfo

RPC 调用信息,包含调用的上下文信息:

type RPCInfo interface {
    From() EndpointInfo      // 调用方信息
    To() EndpointInfo        // 被调用方信息
    Invocation() Invocation  // 调用信息
    Config() RPCConfig       // RPC 配置
    Stats() RPCStats         // 统计信息
}

Message

消息抽象,封装了 RPC 调用的数据:

type Message interface {
    RPCInfo() rpcinfo.RPCInfo
    Data() interface{}
    MessageType() MessageType
    PayloadCodec() PayloadCodec
    TransInfo() TransInfo
}

传输协议

TTHeader

Kitex 默认的传输协议,支持元数据传输:

client.WithTransportProtocol(transport.TTHeader)

gRPC

标准的 gRPC 协议:

client.WithTransportProtocol(transport.GRPC)

HTTP2

HTTP/2 协议支持:

client.WithTransportProtocol(transport.HTTP2)

客户端使用

基本配置

创建客户端

import (
    "github.com/cloudwego/kitex/client"
    "example/kitex_gen/hello/helloservice"
)

// 基础客户端
c, err := helloservice.NewClient("hello")

// 带选项的客户端
c, err := helloservice.NewClient("hello",
    client.WithHostPorts("127.0.0.1:8888"),
    client.WithRPCTimeout(time.Second*3),
    client.WithConnectTimeout(time.Millisecond*500),
)

服务发现

import "github.com/kitex-contrib/registry-consul"

// 使用 Consul 服务发现
resolver, err := consul.NewConsulResolver("127.0.0.1:8500")
if err != nil {
    log.Fatal(err)
}

c, err := helloservice.NewClient("hello",
    client.WithResolver(resolver),
)

负载均衡

import "github.com/cloudwego/kitex/pkg/loadbalance"

// 轮询负载均衡
c, err := helloservice.NewClient("hello",
    client.WithLoadBalancer(loadbalance.NewWeightedRoundRobinBalancer()),
)

// 一致性哈希负载均衡
c, err := helloservice.NewClient("hello",
    client.WithLoadBalancer(loadbalance.NewConsistentHashBalancer(
        loadbalance.NewConsistentHashOption(func(ctx context.Context, request interface{}) string {
            // 返回哈希键
            return "user_123"
        }),
    )),
)

超时配置

多层超时

c, err := helloservice.NewClient("hello",
    // RPC 总超时时间
    client.WithRPCTimeout(time.Second*5),
    
    // 连接超时时间
    client.WithConnectTimeout(time.Millisecond*500),
    
    // 读写超时时间
    client.WithReadWriteTimeout(time.Second*2),
)

动态超时

import "github.com/cloudwego/kitex/client/callopt"

// 在调用时指定超时
resp, err := c.SayHello(context.Background(), req,
    callopt.WithRPCTimeout(time.Second*10),
)

重试配置

失败重试

import "github.com/cloudwego/kitex/pkg/retry"

c, err := helloservice.NewClient("hello",
    client.WithFailureRetry(retry.NewFailurePolicy(
        retry.WithMaxRetryTimes(3),                    // 最大重试次数
        retry.WithMaxDurationMS(10000),                // 最大重试时长
        retry.WithInitialDelay(10),                    // 初始延迟
        retry.WithMaxDelay(100),                       // 最大延迟
        retry.WithDelayPolicy(retry.BackOffDelayPolicy), // 退避策略
    )),
)

备份请求

c, err := helloservice.NewClient("hello",
    client.WithBackupRequest(retry.NewBackupPolicy(
        retry.WithRetryDelayMS(100),  // 备份请求延迟
        retry.WithStopPolicy(retry.StopPolicyType(1)), // 停止策略
    )),
)

熔断配置

import "github.com/cloudwego/kitex/pkg/circuitbreak"

c, err := helloservice.NewClient("hello",
    client.WithCircuitBreaker(circuitbreak.NewCBSuite(
        // 服务级熔断
        circuitbreak.WithServiceCBConfig(circuitbreak.CBConfig{
            Enable:    true,
            ErrRate:   0.5,   // 错误率阈值 50%
            MinSample: 200,   // 最小采样数
        }),
        
        // 实例级熔断
        circuitbreak.WithInstanceCBConfig(circuitbreak.CBConfig{
            Enable:    true,
            ErrRate:   0.3,   // 错误率阈值 30%
            MinSample: 100,   // 最小采样数
        }),
    )),
)

连接池配置

短连接池

import "github.com/cloudwego/kitex/pkg/connpool"

c, err := helloservice.NewClient("hello",
    client.WithShortConnection(),
)

长连接池

c, err := helloservice.NewClient("hello",
    client.WithLongConnection(connpool.IdleConfig{
        MaxIdlePerAddress: 10,                    // 每个地址最大空闲连接数
        MaxIdleGlobal:     100,                   // 全局最大空闲连接数
        MaxIdleTimeout:    time.Minute * 3,       // 空闲超时时间
        MinIdlePerAddress: 2,                     // 每个地址最小空闲连接数
        MaxConnPerAddress: 50,                    // 每个地址最大连接数
    }),
)

中间件使用

自定义中间件

import "github.com/cloudwego/kitex/pkg/endpoint"

// 定义中间件
func MyMiddleware(next endpoint.Endpoint) endpoint.Endpoint {
    return func(ctx context.Context, req, resp interface{}) (err error) {
        // 前置处理
        log.Printf("Before call: %+v", req)
        
        // 调用下一个中间件或实际处理函数
        err = next(ctx, req, resp)
        
        // 后置处理
        log.Printf("After call: %+v, err: %v", resp, err)
        
        return err
    }
}

// 使用中间件
c, err := helloservice.NewClient("hello",
    client.WithMiddleware(MyMiddleware),
)

内置中间件

// 链路追踪中间件
import "github.com/kitex-contrib/tracer-opentracing"

c, err := helloservice.NewClient("hello",
    client.WithSuite(opentracing.NewDefaultClientSuite()),
)

// 监控中间件
import "github.com/kitex-contrib/monitor-prometheus"

c, err := helloservice.NewClient("hello",
    client.WithSuite(prometheus.NewClientSuite()),
)

服务端使用

基本配置

创建服务器

import "github.com/cloudwego/kitex/server"

// 基础服务器
svr := helloservice.NewServer(new(HelloServiceImpl))

// 带选项的服务器
svr := helloservice.NewServer(new(HelloServiceImpl),
    server.WithServiceAddr(&net.TCPAddr{Port: 8888}),
    server.WithReadWriteTimeout(time.Second*5),
)

服务注册

import "github.com/kitex-contrib/registry-consul"

// 使用 Consul 服务注册
registry, err := consul.NewConsulRegistry("127.0.0.1:8500")
if err != nil {
    log.Fatal(err)
}

svr := helloservice.NewServer(new(HelloServiceImpl),
    server.WithRegistry(registry),
    server.WithServerBasicInfo(&rpcinfo.EndpointBasicInfo{
        ServiceName: "hello",
        Tags: map[string]string{
            "version": "v1.0.0",
        },
    }),
)

限流配置

QPS 限流

import "github.com/cloudwego/kitex/pkg/limit"

svr := helloservice.NewServer(new(HelloServiceImpl),
    server.WithLimit(&limit.Option{
        MaxConnections: 1000,  // 最大连接数
        MaxQPS:         500,   // 最大 QPS
        UpdateControl: func(u limit.Updater) {
            // 动态更新限流参数
            go func() {
                ticker := time.NewTicker(time.Second * 10)
                defer ticker.Stop()
                for range ticker.C {
                    u.UpdateLimit(&limit.Option{
                        MaxQPS: getCurrentQPSLimit(),
                    })
                }
            }()
        },
    }),
)

自定义限流器

import "github.com/cloudwego/kitex/pkg/limiter"

// 实现自定义限流器
type MyLimiter struct{}

func (l *MyLimiter) Acquire(ctx context.Context) error {
    // 自定义限流逻辑
    return nil
}

func (l *MyLimiter) Status() (max, current int, windows float64) {
    return 1000, 100, 0.1
}

svr := helloservice.NewServer(new(HelloServiceImpl),
    server.WithQPSLimiter(&MyLimiter{}),
)

中间件配置

服务器中间件

// 定义服务器中间件
func ServerMiddleware(next endpoint.Endpoint) endpoint.Endpoint {
    return func(ctx context.Context, req, resp interface{}) (err error) {
        // 记录请求开始时间
        start := time.Now()
        
        // 执行请求
        err = next(ctx, req, resp)
        
        // 记录请求耗时
        duration := time.Since(start)
        log.Printf("Request took %v", duration)
        
        return err
    }
}

svr := helloservice.NewServer(new(HelloServiceImpl),
    server.WithMiddleware(ServerMiddleware),
)

错误处理中间件

func ErrorHandlerMiddleware(next endpoint.Endpoint) endpoint.Endpoint {
    return func(ctx context.Context, req, resp interface{}) (err error) {
        defer func() {
            if r := recover(); r != nil {
                log.Printf("Panic recovered: %v", r)
                err = fmt.Errorf("internal server error")
            }
        }()
        
        return next(ctx, req, resp)
    }
}

优雅关闭

import (
    "os"
    "os/signal"
    "syscall"
)

func main() {
    svr := helloservice.NewServer(new(HelloServiceImpl))
    
    // 启动服务器(非阻塞)
    go func() {
        err := svr.Run()
        if err != nil {
            log.Printf("Server error: %v", err)
        }
    }()
    
    // 等待信号
    quit := make(chan os.Signal, 1)
    signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
    <-quit
    
    log.Println("Shutting down server...")
    
    // 优雅关闭
    if err := svr.Stop(); err != nil {
        log.Printf("Server forced to shutdown: %v", err)
    }
    
    log.Println("Server exited")
}

高级特性

泛化调用

JSON 泛化调用

import (
    "github.com/cloudwego/kitex/pkg/generic"
    "github.com/cloudwego/kitex/client/genericclient"
)

// 创建泛化客户端
g, err := generic.JSONThriftGeneric("./hello.thrift")
if err != nil {
    log.Fatal(err)
}

cli, err := genericclient.NewClient("hello", g,
    client.WithHostPorts("127.0.0.1:8888"),
)
if err != nil {
    log.Fatal(err)
}

// 使用 JSON 字符串调用
jsonReq := `{"name": "World"}`
resp, err := cli.GenericCall(context.Background(), "SayHello", jsonReq)
if err != nil {
    log.Fatal(err)
}

log.Println(resp) // JSON 字符串响应

Map 泛化调用

g, err := generic.MapThriftGeneric("./hello.thrift")
if err != nil {
    log.Fatal(err)
}

cli, err := genericclient.NewClient("hello", g)
if err != nil {
    log.Fatal(err)
}

// 使用 Map 调用
req := map[string]interface{}{
    "name": "World",
}

resp, err := cli.GenericCall(context.Background(), "SayHello", req)
if err != nil {
    log.Fatal(err)
}

respMap := resp.(map[string]interface{})
log.Println(respMap["message"])

流式调用

客户端流式调用

// 创建客户端流
stream, err := c.ClientStreaming(context.Background(), "StreamMethod")
if err != nil {
    log.Fatal(err)
}

// 发送多个请求
for i := 0; i < 10; i++ {
    req := &hello.HelloRequest{Name: fmt.Sprintf("Request-%d", i)}
    if err := stream.Send(req); err != nil {
        log.Fatal(err)
    }
}

// 关闭发送并接收响应
resp, err := stream.CloseAndRecv()
if err != nil {
    log.Fatal(err)
}

log.Println(resp.Message)

服务端流式调用

// 发送请求
req := &hello.HelloRequest{Name: "World"}
stream, err := c.ServerStreaming(context.Background(), "StreamMethod", req)
if err != nil {
    log.Fatal(err)
}

// 接收多个响应
for {
    resp, err := stream.Recv()
    if err == io.EOF {
        break
    }
    if err != nil {
        log.Fatal(err)
    }
    log.Println(resp.Message)
}

双向流式调用

// 创建双向流
stream, err := c.BidirectionalStreaming(context.Background(), "StreamMethod")
if err != nil {
    log.Fatal(err)
}

// 并发发送和接收
go func() {
    for i := 0; i < 10; i++ {
        req := &hello.HelloRequest{Name: fmt.Sprintf("Request-%d", i)}
        if err := stream.Send(req); err != nil {
            log.Printf("Send error: %v", err)
            return
        }
    }
    stream.CloseSend()
}()

for {
    resp, err := stream.Recv()
    if err == io.EOF {
        break
    }
    if err != nil {
        log.Fatal(err)
    }
    log.Println(resp.Message)
}

元数据传输

发送元数据

import "github.com/bytedance/gopkg/cloud/metainfo"

// 在客户端设置元数据
ctx := metainfo.WithPersistentValue(context.Background(), "user-id", "123456")
ctx = metainfo.WithPersistentValue(ctx, "trace-id", "abc-def-ghi")

resp, err := c.SayHello(ctx, req)

接收元数据

// 在服务端获取元数据
func (s *HelloServiceImpl) SayHello(ctx context.Context, req *hello.HelloRequest) (*hello.HelloResponse, error) {
    userID, ok := metainfo.GetPersistentValue(ctx, "user-id")
    if ok {
        log.Printf("User ID: %s", userID)
    }
    
    traceID, ok := metainfo.GetPersistentValue(ctx, "trace-id")
    if ok {
        log.Printf("Trace ID: %s", traceID)
    }
    
    return &hello.HelloResponse{
        Message: "Hello " + req.Name,
    }, nil
}

自定义编解码器

import "github.com/cloudwego/kitex/pkg/remote/codec"

// 实现自定义编解码器
type MyCodec struct{}

func (c *MyCodec) Marshal(ctx context.Context, message remote.Message, out remote.ByteBuffer) error {
    // 自定义编码逻辑
    return nil
}

func (c *MyCodec) Unmarshal(ctx context.Context, message remote.Message, in remote.ByteBuffer) error {
    // 自定义解码逻辑
    return nil
}

func (c *MyCodec) Name() string {
    return "my-codec"
}

// 注册编解码器
codec.RegisterPayloadCodec(&MyCodec{})

// 使用自定义编解码器
c, err := helloservice.NewClient("hello",
    client.WithPayloadCodec(&MyCodec{}),
)

配置参考

客户端配置选项

选项 类型 默认值 说明
WithHostPorts []string - 目标服务地址列表
WithRPCTimeout time.Duration 0 RPC 调用超时时间
WithConnectTimeout time.Duration 50ms 连接超时时间
WithReadWriteTimeout time.Duration 0 读写超时时间
WithResolver discovery.Resolver - 服务发现解析器
WithLoadBalancer loadbalance.Loadbalancer 轮询 负载均衡器
WithRetryPolicy retry.Policy - 重试策略
WithCircuitBreaker circuitbreak.CBSuite - 熔断器
WithMiddleware endpoint.Middleware - 中间件
WithTransportProtocol transport.Protocol TTHeader 传输协议

服务端配置选项

选项 类型 默认值 说明
WithServiceAddr net.Addr :8888 服务监听地址
WithReadWriteTimeout time.Duration 5s 读写超时时间
WithRegistry registry.Registry - 服务注册器
WithLimit *limit.Option - 限流配置
WithMiddleware endpoint.Middleware - 中间件
WithServerBasicInfo *rpcinfo.EndpointBasicInfo - 服务基本信息
WithExitWaitTime time.Duration 5s 退出等待时间

环境变量配置

环境变量 默认值 说明
KITEX_CONF_DIR conf 配置文件目录
KITEX_CONF_FILE kitex.yml 配置文件名
KITEX_LOG_DIR log 日志目录
KITEX_RUNTIME_ROOT - 运行时根目录

故障排查

常见错误

1. 连接超时

错误信息: dial tcp: i/o timeout

可能原因:

  • 目标服务不可达
  • 网络延迟过高
  • 连接超时时间设置过短

解决方案:

// 增加连接超时时间
client.WithConnectTimeout(time.Second * 2)

// 检查网络连通性
// ping target_host

// 检查服务是否正常运行
// telnet target_host target_port

2. RPC 调用超时

错误信息: rpc timeout

可能原因:

  • 服务端处理时间过长
  • RPC 超时时间设置过短
  • 网络延迟

解决方案:

// 增加 RPC 超时时间
client.WithRPCTimeout(time.Second * 10)

// 使用调用级别超时
resp, err := c.SayHello(ctx, req, 
    callopt.WithRPCTimeout(time.Second * 30))

// 启用重试
client.WithFailureRetry(retry.NewFailurePolicy())

3. 服务发现失败

错误信息: no available instance

可能原因:

  • 服务未注册
  • 注册中心不可用
  • 服务实例全部下线

解决方案:

// 检查服务注册状态
// 确保服务端正确注册服务

// 使用直连方式测试
client.WithHostPorts("127.0.0.1:8888")

// 检查注册中心状态
// 确保注册中心正常运行

4. 熔断器打开

错误信息: circuit breaker is open

可能原因:

  • 错误率超过阈值
  • 服务端异常

解决方案:

// 调整熔断器参数
client.WithCircuitBreaker(circuitbreak.NewCBSuite(
    circuitbreak.WithServiceCBConfig(circuitbreak.CBConfig{
        ErrRate:   0.8,   // 提高错误率阈值
        MinSample: 500,   // 增加最小采样数
    }),
))

// 检查服务端日志
// 修复服务端问题后等待熔断器自动恢复

调试技巧

1. 启用详细日志

import "github.com/cloudwego/kitex/pkg/klog"

// 设置日志级别
klog.SetLevel(klog.LevelDebug)

// 自定义日志输出
klog.SetOutput(os.Stdout)

2. 使用调试中间件

func DebugMiddleware(next endpoint.Endpoint) endpoint.Endpoint {
    return func(ctx context.Context, req, resp interface{}) (err error) {
        // 打印请求信息
        ri := rpcinfo.GetRPCInfo(ctx)
        log.Printf("Request: service=%s, method=%s, req=%+v", 
            ri.To().ServiceName(), ri.To().Method(), req)
        
        start := time.Now()
        err = next(ctx, req, resp)
        duration := time.Since(start)
        
        // 打印响应信息
        log.Printf("Response: duration=%v, resp=%+v, err=%v", 
            duration, resp, err)
        
        return err
    }
}

3. 监控指标

import "github.com/kitex-contrib/monitor-prometheus"

// 启用 Prometheus 监控
client.WithSuite(prometheus.NewClientSuite())
server.WithSuite(prometheus.NewServerSuite())

// 访问指标端点
// http://localhost:9090/metrics

性能调优

1. 连接池优化

// 长连接池配置
client.WithLongConnection(connpool.IdleConfig{
    MaxIdlePerAddress: 50,    // 根据并发量调整
    MaxIdleGlobal:     500,   // 根据总连接数调整
    MaxIdleTimeout:    time.Minute * 5, // 适当延长空闲时间
})

2. 序列化优化

// 使用 Frugal 优化 Thrift 序列化性能
import "github.com/cloudwego/frugal"

// 在生成代码时启用 Frugal
// kitex -use github.com/cloudwego/frugal hello.thrift

3. 网络优化

// 使用 Netpoll 网络库(默认启用)
// 调整网络参数
client.WithDialer(netpoll.NewDialer())

总结

本手册涵盖了 Kitex 框架的主要使用方法和最佳实践。通过合理配置各项参数和使用高级特性,可以构建高性能、高可靠的微服务系统。

关键要点:

  1. 正确配置超时和重试: 避免级联故障
  2. 合理使用连接池: 提高性能和资源利用率
  3. 启用监控和链路追踪: 便于问题定位和性能分析
  4. 实现优雅关闭: 确保服务稳定性
  5. 定期性能调优: 根据业务场景优化配置参数

更多详细信息请参考 Kitex 官方文档