Kitex 框架使用手册
目录
快速开始
环境要求
- Go 1.20 或更高版本
- 支持的协议: Thrift, Protobuf, gRPC
安装
# 安装 Kitex 工具
go install github.com/cloudwego/kitex/tool/cmd/kitex@latest
# 安装 Thriftgo(如果使用 Thrift)
go install github.com/cloudwego/thriftgo@latest
定义服务接口
使用 Thrift IDL
// hello.thrift
namespace go hello
struct HelloRequest {
1: string name
}
struct HelloResponse {
1: string message
}
service HelloService {
HelloResponse SayHello(1: HelloRequest req)
}
使用 Protobuf IDL
// hello.proto
syntax = "proto3";
package hello;
option go_package = "hello";
message HelloRequest {
string name = 1;
}
message HelloResponse {
string message = 1;
}
service HelloService {
rpc SayHello(HelloRequest) returns (HelloResponse);
}
生成代码
# 生成 Thrift 代码
kitex -module example -service hello hello.thrift
# 生成 Protobuf 代码
kitex -module example -service hello hello.proto
实现服务端
package main
import (
"context"
"log"
"github.com/cloudwego/kitex/server"
"example/kitex_gen/hello"
"example/kitex_gen/hello/helloservice"
)
// HelloServiceImpl 实现 HelloService 接口
type HelloServiceImpl struct{}
// SayHello 实现 SayHello 方法
func (s *HelloServiceImpl) SayHello(ctx context.Context, req *hello.HelloRequest) (*hello.HelloResponse, error) {
return &hello.HelloResponse{
Message: "Hello " + req.Name,
}, nil
}
func main() {
// 创建服务器
svr := helloservice.NewServer(new(HelloServiceImpl))
// 启动服务器
err := svr.Run()
if err != nil {
log.Println(err.Error())
}
}
实现客户端
package main
import (
"context"
"log"
"example/kitex_gen/hello"
"example/kitex_gen/hello/helloservice"
"github.com/cloudwego/kitex/client"
)
func main() {
// 创建客户端
c, err := helloservice.NewClient("hello", client.WithHostPorts("0.0.0.0:8888"))
if err != nil {
log.Fatal(err)
}
// 发起调用
req := &hello.HelloRequest{Name: "World"}
resp, err := c.SayHello(context.Background(), req)
if err != nil {
log.Fatal(err)
}
log.Println(resp.Message)
}
基础概念
核心组件
ServiceInfo
服务信息描述符,包含服务的元数据信息:
type ServiceInfo struct {
ServiceName string // 服务名称
Methods map[string]MethodInfo // 方法信息
PayloadCodec PayloadCodec // 编解码器
HandlerType interface{} // 处理器类型
Extra map[string]interface{} // 扩展信息
}
RPCInfo
RPC 调用信息,包含调用的上下文信息:
type RPCInfo interface {
From() EndpointInfo // 调用方信息
To() EndpointInfo // 被调用方信息
Invocation() Invocation // 调用信息
Config() RPCConfig // RPC 配置
Stats() RPCStats // 统计信息
}
Message
消息抽象,封装了 RPC 调用的数据:
type Message interface {
RPCInfo() rpcinfo.RPCInfo
Data() interface{}
MessageType() MessageType
PayloadCodec() PayloadCodec
TransInfo() TransInfo
}
传输协议
TTHeader
Kitex 默认的传输协议,支持元数据传输:
client.WithTransportProtocol(transport.TTHeader)
gRPC
标准的 gRPC 协议:
client.WithTransportProtocol(transport.GRPC)
HTTP2
HTTP/2 协议支持:
client.WithTransportProtocol(transport.HTTP2)
客户端使用
基本配置
创建客户端
import (
"github.com/cloudwego/kitex/client"
"example/kitex_gen/hello/helloservice"
)
// 基础客户端
c, err := helloservice.NewClient("hello")
// 带选项的客户端
c, err := helloservice.NewClient("hello",
client.WithHostPorts("127.0.0.1:8888"),
client.WithRPCTimeout(time.Second*3),
client.WithConnectTimeout(time.Millisecond*500),
)
服务发现
import "github.com/kitex-contrib/registry-consul"
// 使用 Consul 服务发现
resolver, err := consul.NewConsulResolver("127.0.0.1:8500")
if err != nil {
log.Fatal(err)
}
c, err := helloservice.NewClient("hello",
client.WithResolver(resolver),
)
负载均衡
import "github.com/cloudwego/kitex/pkg/loadbalance"
// 轮询负载均衡
c, err := helloservice.NewClient("hello",
client.WithLoadBalancer(loadbalance.NewWeightedRoundRobinBalancer()),
)
// 一致性哈希负载均衡
c, err := helloservice.NewClient("hello",
client.WithLoadBalancer(loadbalance.NewConsistentHashBalancer(
loadbalance.NewConsistentHashOption(func(ctx context.Context, request interface{}) string {
// 返回哈希键
return "user_123"
}),
)),
)
超时配置
多层超时
c, err := helloservice.NewClient("hello",
// RPC 总超时时间
client.WithRPCTimeout(time.Second*5),
// 连接超时时间
client.WithConnectTimeout(time.Millisecond*500),
// 读写超时时间
client.WithReadWriteTimeout(time.Second*2),
)
动态超时
import "github.com/cloudwego/kitex/client/callopt"
// 在调用时指定超时
resp, err := c.SayHello(context.Background(), req,
callopt.WithRPCTimeout(time.Second*10),
)
重试配置
失败重试
import "github.com/cloudwego/kitex/pkg/retry"
c, err := helloservice.NewClient("hello",
client.WithFailureRetry(retry.NewFailurePolicy(
retry.WithMaxRetryTimes(3), // 最大重试次数
retry.WithMaxDurationMS(10000), // 最大重试时长
retry.WithInitialDelay(10), // 初始延迟
retry.WithMaxDelay(100), // 最大延迟
retry.WithDelayPolicy(retry.BackOffDelayPolicy), // 退避策略
)),
)
备份请求
c, err := helloservice.NewClient("hello",
client.WithBackupRequest(retry.NewBackupPolicy(
retry.WithRetryDelayMS(100), // 备份请求延迟
retry.WithStopPolicy(retry.StopPolicyType(1)), // 停止策略
)),
)
熔断配置
import "github.com/cloudwego/kitex/pkg/circuitbreak"
c, err := helloservice.NewClient("hello",
client.WithCircuitBreaker(circuitbreak.NewCBSuite(
// 服务级熔断
circuitbreak.WithServiceCBConfig(circuitbreak.CBConfig{
Enable: true,
ErrRate: 0.5, // 错误率阈值 50%
MinSample: 200, // 最小采样数
}),
// 实例级熔断
circuitbreak.WithInstanceCBConfig(circuitbreak.CBConfig{
Enable: true,
ErrRate: 0.3, // 错误率阈值 30%
MinSample: 100, // 最小采样数
}),
)),
)
连接池配置
短连接池
import "github.com/cloudwego/kitex/pkg/connpool"
c, err := helloservice.NewClient("hello",
client.WithShortConnection(),
)
长连接池
c, err := helloservice.NewClient("hello",
client.WithLongConnection(connpool.IdleConfig{
MaxIdlePerAddress: 10, // 每个地址最大空闲连接数
MaxIdleGlobal: 100, // 全局最大空闲连接数
MaxIdleTimeout: time.Minute * 3, // 空闲超时时间
MinIdlePerAddress: 2, // 每个地址最小空闲连接数
MaxConnPerAddress: 50, // 每个地址最大连接数
}),
)
中间件使用
自定义中间件
import "github.com/cloudwego/kitex/pkg/endpoint"
// 定义中间件
func MyMiddleware(next endpoint.Endpoint) endpoint.Endpoint {
return func(ctx context.Context, req, resp interface{}) (err error) {
// 前置处理
log.Printf("Before call: %+v", req)
// 调用下一个中间件或实际处理函数
err = next(ctx, req, resp)
// 后置处理
log.Printf("After call: %+v, err: %v", resp, err)
return err
}
}
// 使用中间件
c, err := helloservice.NewClient("hello",
client.WithMiddleware(MyMiddleware),
)
内置中间件
// 链路追踪中间件
import "github.com/kitex-contrib/tracer-opentracing"
c, err := helloservice.NewClient("hello",
client.WithSuite(opentracing.NewDefaultClientSuite()),
)
// 监控中间件
import "github.com/kitex-contrib/monitor-prometheus"
c, err := helloservice.NewClient("hello",
client.WithSuite(prometheus.NewClientSuite()),
)
服务端使用
基本配置
创建服务器
import "github.com/cloudwego/kitex/server"
// 基础服务器
svr := helloservice.NewServer(new(HelloServiceImpl))
// 带选项的服务器
svr := helloservice.NewServer(new(HelloServiceImpl),
server.WithServiceAddr(&net.TCPAddr{Port: 8888}),
server.WithReadWriteTimeout(time.Second*5),
)
服务注册
import "github.com/kitex-contrib/registry-consul"
// 使用 Consul 服务注册
registry, err := consul.NewConsulRegistry("127.0.0.1:8500")
if err != nil {
log.Fatal(err)
}
svr := helloservice.NewServer(new(HelloServiceImpl),
server.WithRegistry(registry),
server.WithServerBasicInfo(&rpcinfo.EndpointBasicInfo{
ServiceName: "hello",
Tags: map[string]string{
"version": "v1.0.0",
},
}),
)
限流配置
QPS 限流
import "github.com/cloudwego/kitex/pkg/limit"
svr := helloservice.NewServer(new(HelloServiceImpl),
server.WithLimit(&limit.Option{
MaxConnections: 1000, // 最大连接数
MaxQPS: 500, // 最大 QPS
UpdateControl: func(u limit.Updater) {
// 动态更新限流参数
go func() {
ticker := time.NewTicker(time.Second * 10)
defer ticker.Stop()
for range ticker.C {
u.UpdateLimit(&limit.Option{
MaxQPS: getCurrentQPSLimit(),
})
}
}()
},
}),
)
自定义限流器
import "github.com/cloudwego/kitex/pkg/limiter"
// 实现自定义限流器
type MyLimiter struct{}
func (l *MyLimiter) Acquire(ctx context.Context) error {
// 自定义限流逻辑
return nil
}
func (l *MyLimiter) Status() (max, current int, windows float64) {
return 1000, 100, 0.1
}
svr := helloservice.NewServer(new(HelloServiceImpl),
server.WithQPSLimiter(&MyLimiter{}),
)
中间件配置
服务器中间件
// 定义服务器中间件
func ServerMiddleware(next endpoint.Endpoint) endpoint.Endpoint {
return func(ctx context.Context, req, resp interface{}) (err error) {
// 记录请求开始时间
start := time.Now()
// 执行请求
err = next(ctx, req, resp)
// 记录请求耗时
duration := time.Since(start)
log.Printf("Request took %v", duration)
return err
}
}
svr := helloservice.NewServer(new(HelloServiceImpl),
server.WithMiddleware(ServerMiddleware),
)
错误处理中间件
func ErrorHandlerMiddleware(next endpoint.Endpoint) endpoint.Endpoint {
return func(ctx context.Context, req, resp interface{}) (err error) {
defer func() {
if r := recover(); r != nil {
log.Printf("Panic recovered: %v", r)
err = fmt.Errorf("internal server error")
}
}()
return next(ctx, req, resp)
}
}
优雅关闭
import (
"os"
"os/signal"
"syscall"
)
func main() {
svr := helloservice.NewServer(new(HelloServiceImpl))
// 启动服务器(非阻塞)
go func() {
err := svr.Run()
if err != nil {
log.Printf("Server error: %v", err)
}
}()
// 等待信号
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
log.Println("Shutting down server...")
// 优雅关闭
if err := svr.Stop(); err != nil {
log.Printf("Server forced to shutdown: %v", err)
}
log.Println("Server exited")
}
高级特性
泛化调用
JSON 泛化调用
import (
"github.com/cloudwego/kitex/pkg/generic"
"github.com/cloudwego/kitex/client/genericclient"
)
// 创建泛化客户端
g, err := generic.JSONThriftGeneric("./hello.thrift")
if err != nil {
log.Fatal(err)
}
cli, err := genericclient.NewClient("hello", g,
client.WithHostPorts("127.0.0.1:8888"),
)
if err != nil {
log.Fatal(err)
}
// 使用 JSON 字符串调用
jsonReq := `{"name": "World"}`
resp, err := cli.GenericCall(context.Background(), "SayHello", jsonReq)
if err != nil {
log.Fatal(err)
}
log.Println(resp) // JSON 字符串响应
Map 泛化调用
g, err := generic.MapThriftGeneric("./hello.thrift")
if err != nil {
log.Fatal(err)
}
cli, err := genericclient.NewClient("hello", g)
if err != nil {
log.Fatal(err)
}
// 使用 Map 调用
req := map[string]interface{}{
"name": "World",
}
resp, err := cli.GenericCall(context.Background(), "SayHello", req)
if err != nil {
log.Fatal(err)
}
respMap := resp.(map[string]interface{})
log.Println(respMap["message"])
流式调用
客户端流式调用
// 创建客户端流
stream, err := c.ClientStreaming(context.Background(), "StreamMethod")
if err != nil {
log.Fatal(err)
}
// 发送多个请求
for i := 0; i < 10; i++ {
req := &hello.HelloRequest{Name: fmt.Sprintf("Request-%d", i)}
if err := stream.Send(req); err != nil {
log.Fatal(err)
}
}
// 关闭发送并接收响应
resp, err := stream.CloseAndRecv()
if err != nil {
log.Fatal(err)
}
log.Println(resp.Message)
服务端流式调用
// 发送请求
req := &hello.HelloRequest{Name: "World"}
stream, err := c.ServerStreaming(context.Background(), "StreamMethod", req)
if err != nil {
log.Fatal(err)
}
// 接收多个响应
for {
resp, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatal(err)
}
log.Println(resp.Message)
}
双向流式调用
// 创建双向流
stream, err := c.BidirectionalStreaming(context.Background(), "StreamMethod")
if err != nil {
log.Fatal(err)
}
// 并发发送和接收
go func() {
for i := 0; i < 10; i++ {
req := &hello.HelloRequest{Name: fmt.Sprintf("Request-%d", i)}
if err := stream.Send(req); err != nil {
log.Printf("Send error: %v", err)
return
}
}
stream.CloseSend()
}()
for {
resp, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatal(err)
}
log.Println(resp.Message)
}
元数据传输
发送元数据
import "github.com/bytedance/gopkg/cloud/metainfo"
// 在客户端设置元数据
ctx := metainfo.WithPersistentValue(context.Background(), "user-id", "123456")
ctx = metainfo.WithPersistentValue(ctx, "trace-id", "abc-def-ghi")
resp, err := c.SayHello(ctx, req)
接收元数据
// 在服务端获取元数据
func (s *HelloServiceImpl) SayHello(ctx context.Context, req *hello.HelloRequest) (*hello.HelloResponse, error) {
userID, ok := metainfo.GetPersistentValue(ctx, "user-id")
if ok {
log.Printf("User ID: %s", userID)
}
traceID, ok := metainfo.GetPersistentValue(ctx, "trace-id")
if ok {
log.Printf("Trace ID: %s", traceID)
}
return &hello.HelloResponse{
Message: "Hello " + req.Name,
}, nil
}
自定义编解码器
import "github.com/cloudwego/kitex/pkg/remote/codec"
// 实现自定义编解码器
type MyCodec struct{}
func (c *MyCodec) Marshal(ctx context.Context, message remote.Message, out remote.ByteBuffer) error {
// 自定义编码逻辑
return nil
}
func (c *MyCodec) Unmarshal(ctx context.Context, message remote.Message, in remote.ByteBuffer) error {
// 自定义解码逻辑
return nil
}
func (c *MyCodec) Name() string {
return "my-codec"
}
// 注册编解码器
codec.RegisterPayloadCodec(&MyCodec{})
// 使用自定义编解码器
c, err := helloservice.NewClient("hello",
client.WithPayloadCodec(&MyCodec{}),
)
配置参考
客户端配置选项
选项 | 类型 | 默认值 | 说明 |
---|---|---|---|
WithHostPorts | []string | - | 目标服务地址列表 |
WithRPCTimeout | time.Duration | 0 | RPC 调用超时时间 |
WithConnectTimeout | time.Duration | 50ms | 连接超时时间 |
WithReadWriteTimeout | time.Duration | 0 | 读写超时时间 |
WithResolver | discovery.Resolver | - | 服务发现解析器 |
WithLoadBalancer | loadbalance.Loadbalancer | 轮询 | 负载均衡器 |
WithRetryPolicy | retry.Policy | - | 重试策略 |
WithCircuitBreaker | circuitbreak.CBSuite | - | 熔断器 |
WithMiddleware | endpoint.Middleware | - | 中间件 |
WithTransportProtocol | transport.Protocol | TTHeader | 传输协议 |
服务端配置选项
选项 | 类型 | 默认值 | 说明 |
---|---|---|---|
WithServiceAddr | net.Addr | :8888 | 服务监听地址 |
WithReadWriteTimeout | time.Duration | 5s | 读写超时时间 |
WithRegistry | registry.Registry | - | 服务注册器 |
WithLimit | *limit.Option | - | 限流配置 |
WithMiddleware | endpoint.Middleware | - | 中间件 |
WithServerBasicInfo | *rpcinfo.EndpointBasicInfo | - | 服务基本信息 |
WithExitWaitTime | time.Duration | 5s | 退出等待时间 |
环境变量配置
环境变量 | 默认值 | 说明 |
---|---|---|
KITEX_CONF_DIR | conf | 配置文件目录 |
KITEX_CONF_FILE | kitex.yml | 配置文件名 |
KITEX_LOG_DIR | log | 日志目录 |
KITEX_RUNTIME_ROOT | - | 运行时根目录 |
故障排查
常见错误
1. 连接超时
错误信息: dial tcp: i/o timeout
可能原因:
- 目标服务不可达
- 网络延迟过高
- 连接超时时间设置过短
解决方案:
// 增加连接超时时间
client.WithConnectTimeout(time.Second * 2)
// 检查网络连通性
// ping target_host
// 检查服务是否正常运行
// telnet target_host target_port
2. RPC 调用超时
错误信息: rpc timeout
可能原因:
- 服务端处理时间过长
- RPC 超时时间设置过短
- 网络延迟
解决方案:
// 增加 RPC 超时时间
client.WithRPCTimeout(time.Second * 10)
// 使用调用级别超时
resp, err := c.SayHello(ctx, req,
callopt.WithRPCTimeout(time.Second * 30))
// 启用重试
client.WithFailureRetry(retry.NewFailurePolicy())
3. 服务发现失败
错误信息: no available instance
可能原因:
- 服务未注册
- 注册中心不可用
- 服务实例全部下线
解决方案:
// 检查服务注册状态
// 确保服务端正确注册服务
// 使用直连方式测试
client.WithHostPorts("127.0.0.1:8888")
// 检查注册中心状态
// 确保注册中心正常运行
4. 熔断器打开
错误信息: circuit breaker is open
可能原因:
- 错误率超过阈值
- 服务端异常
解决方案:
// 调整熔断器参数
client.WithCircuitBreaker(circuitbreak.NewCBSuite(
circuitbreak.WithServiceCBConfig(circuitbreak.CBConfig{
ErrRate: 0.8, // 提高错误率阈值
MinSample: 500, // 增加最小采样数
}),
))
// 检查服务端日志
// 修复服务端问题后等待熔断器自动恢复
调试技巧
1. 启用详细日志
import "github.com/cloudwego/kitex/pkg/klog"
// 设置日志级别
klog.SetLevel(klog.LevelDebug)
// 自定义日志输出
klog.SetOutput(os.Stdout)
2. 使用调试中间件
func DebugMiddleware(next endpoint.Endpoint) endpoint.Endpoint {
return func(ctx context.Context, req, resp interface{}) (err error) {
// 打印请求信息
ri := rpcinfo.GetRPCInfo(ctx)
log.Printf("Request: service=%s, method=%s, req=%+v",
ri.To().ServiceName(), ri.To().Method(), req)
start := time.Now()
err = next(ctx, req, resp)
duration := time.Since(start)
// 打印响应信息
log.Printf("Response: duration=%v, resp=%+v, err=%v",
duration, resp, err)
return err
}
}
3. 监控指标
import "github.com/kitex-contrib/monitor-prometheus"
// 启用 Prometheus 监控
client.WithSuite(prometheus.NewClientSuite())
server.WithSuite(prometheus.NewServerSuite())
// 访问指标端点
// http://localhost:9090/metrics
性能调优
1. 连接池优化
// 长连接池配置
client.WithLongConnection(connpool.IdleConfig{
MaxIdlePerAddress: 50, // 根据并发量调整
MaxIdleGlobal: 500, // 根据总连接数调整
MaxIdleTimeout: time.Minute * 5, // 适当延长空闲时间
})
2. 序列化优化
// 使用 Frugal 优化 Thrift 序列化性能
import "github.com/cloudwego/frugal"
// 在生成代码时启用 Frugal
// kitex -use github.com/cloudwego/frugal hello.thrift
3. 网络优化
// 使用 Netpoll 网络库(默认启用)
// 调整网络参数
client.WithDialer(netpoll.NewDialer())
总结
本手册涵盖了 Kitex 框架的主要使用方法和最佳实践。通过合理配置各项参数和使用高级特性,可以构建高性能、高可靠的微服务系统。
关键要点:
- 正确配置超时和重试: 避免级联故障
- 合理使用连接池: 提高性能和资源利用率
- 启用监控和链路追踪: 便于问题定位和性能分析
- 实现优雅关闭: 确保服务稳定性
- 定期性能调优: 根据业务场景优化配置参数
更多详细信息请参考 Kitex 官方文档。