1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
| # /autogpt_platform/autogpt_libs/autogpt_libs/rate_limit/limiter.py
import time
import logging
from typing import Tuple, Optional
from redis import Redis
from redis.exceptions import RedisError
from .config import RATE_LIMIT_SETTINGS
logger = logging.getLogger(__name__)
class RateLimiter:
"""
基于Redis的分布式限流器
特性:
1. 滑动窗口算法:精确控制请求频率
2. 分布式支持:多实例间共享限流状态
3. 自动过期:自动清理过期的请求记录
4. 实时统计:提供剩余请求数和重置时间
算法原理:
使用Redis的有序集合(sorted set)存储请求时间戳,
通过滑动时间窗口清理过期请求,统计当前窗口内的请求数量。
"""
def __init__(
self,
redis_host: str = RATE_LIMIT_SETTINGS.redis_host,
redis_port: str = RATE_LIMIT_SETTINGS.redis_port,
redis_password: str = RATE_LIMIT_SETTINGS.redis_password,
requests_per_minute: int = RATE_LIMIT_SETTINGS.requests_per_minute,
window_size: int = 60, # 时间窗口大小(秒)
):
"""
初始化分布式限流器
参数:
redis_host: Redis服务器主机
redis_port: Redis服务器端口
redis_password: Redis密码
requests_per_minute: 每分钟最大请求数
window_size: 滑动窗口大小(秒)
"""
try:
self.redis = Redis(
host=redis_host,
port=int(redis_port),
password=redis_password,
decode_responses=True,
socket_connect_timeout=5,
socket_timeout=5,
retry_on_timeout=True,
health_check_interval=30,
)
# 测试Redis连接
self.redis.ping()
logger.info(f"Redis连接成功: {redis_host}:{redis_port}")
except RedisError as e:
logger.error(f"Redis连接失败: {e}")
raise
self.window = window_size
self.max_requests = requests_per_minute
logger.info(
f"限流器初始化完成: {requests_per_minute}请求/{window_size}秒"
)
async def check_rate_limit(self, api_key_id: str) -> Tuple[bool, int, int]:
"""
检查请求是否在限流范围内
算法步骤:
1. 计算当前滑动窗口的开始时间
2. 使用Redis Pipeline批量执行操作:
- 删除窗口外的旧请求记录
- 添加当前请求时间戳
- 统计当前窗口内的请求数量
- 设置键的过期时间
3. 计算剩余请求数和重置时间
4. 返回是否允许请求及相关信息
参数:
api_key_id: API密钥标识符
返回:
元组:(是否允许请求, 剩余请求数, 重置时间戳)
"""
try:
now = time.time()
window_start = now - self.window
key = f"ratelimit:{api_key_id}:1min"
# 使用Redis Pipeline提高性能
pipe = self.redis.pipeline()
# 1. 删除窗口外的旧请求记录
pipe.zremrangebyscore(key, 0, window_start)
# 2. 添加当前请求时间戳到有序集合
pipe.zadd(key, {str(now): now})
# 3. 统计当前窗口内的请求数量
pipe.zcount(key, window_start, now)
# 4. 设置键的过期时间(防止内存泄漏)
pipe.expire(key, self.window)
# 执行批量操作
results = pipe.execute()
# 解析结果
_, _, request_count, _ = results
# 计算剩余请求数和重置时间
remaining = max(0, self.max_requests - request_count)
reset_time = int(now + self.window)
# 判断是否允许请求
is_allowed = request_count <= self.max_requests
# 记录限流日志
if not is_allowed:
logger.warning(
f"API密钥 {api_key_id} 超出限流: {request_count}/{self.max_requests}"
)
else:
logger.debug(
f"API密钥 {api_key_id} 请求通过: {request_count}/{self.max_requests}"
)
return is_allowed, remaining, reset_time
except RedisError as e:
logger.error(f"Redis操作失败: {e}")
# Redis故障时的降级策略:允许请求通过
return True, self.max_requests, int(time.time() + self.window)
except Exception as e:
logger.error(f"限流检查异常: {e}")
# 其他异常时的降级策略:允许请求通过
return True, self.max_requests, int(time.time() + self.window)
def get_rate_limit_info(self, api_key_id: str) -> dict:
"""
获取API密钥的限流状态信息
参数:
api_key_id: API密钥标识符
返回:
包含限流状态的字典
"""
try:
now = time.time()
window_start = now - self.window
key = f"ratelimit:{api_key_id}:1min"
# 获取当前窗口内的请求数量
request_count = self.redis.zcount(key, window_start, now)
remaining = max(0, self.max_requests - request_count)
reset_time = int(now + self.window)
return {
"limit": self.max_requests,
"remaining": remaining,
"reset": reset_time,
"current": request_count,
"window_size": self.window,
}
except RedisError as e:
logger.error(f"获取限流信息失败: {e}")
return {
"limit": self.max_requests,
"remaining": self.max_requests,
"reset": int(time.time() + self.window),
"current": 0,
"window_size": self.window,
}
def reset_rate_limit(self, api_key_id: str) -> bool:
"""
重置API密钥的限流计数器(管理员功能)
参数:
api_key_id: API密钥标识符
返回:
是否重置成功
"""
try:
key = f"ratelimit:{api_key_id}:1min"
self.redis.delete(key)
logger.info(f"已重置API密钥 {api_key_id} 的限流计数器")
return True
except RedisError as e:
logger.error(f"重置限流计数器失败: {e}")
return False
|