1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
class PerformanceCallbackHandler(BaseCallbackHandler):
    """Performance-monitoring callback handler.

    Tracks wall-clock duration of chain / LLM / tool runs (keyed by
    ``run_id``) and total token usage reported by the LLM, and produces
    an aggregate summary on demand via :meth:`get_metrics_summary`.
    """

    def __init__(self):
        # Pending start timestamps (time.time() seconds) keyed by run_id.
        self.chain_starts: Dict[UUID, float] = {}
        self.llm_starts: Dict[UUID, float] = {}
        self.tool_starts: Dict[UUID, float] = {}
        # Completed measurements: durations in seconds; token_usage in tokens.
        self.metrics: Dict[str, List[float]] = {
            "chain_duration": [],
            "llm_duration": [],
            "tool_duration": [],
            "token_usage": []
        }

    def _record_duration(self, starts: Dict[UUID, float], metric: str, run_id: UUID) -> None:
        """Pop ``run_id``'s start time and append the elapsed duration to ``metric``.

        Silently ignores unknown run_ids (e.g. an end event whose start
        was never observed).
        """
        started = starts.pop(run_id, None)
        if started is not None:
            self.metrics[metric].append(time.time() - started)

    def on_chain_start(self, serialized: Dict[str, Any], inputs: Dict[str, Any], *, run_id: UUID, **kwargs) -> None:
        """Record the chain's start time."""
        self.chain_starts[run_id] = time.time()

    def on_chain_end(self, outputs: Dict[str, Any], *, run_id: UUID, **kwargs) -> None:
        """Record the chain's elapsed duration."""
        self._record_duration(self.chain_starts, "chain_duration", run_id)

    def on_chain_error(self, error: BaseException, *, run_id: UUID, **kwargs) -> None:
        """Discard the pending start time so failed runs don't leak entries."""
        self.chain_starts.pop(run_id, None)

    def on_llm_start(self, serialized: Dict[str, Any], prompts: List[str], *, run_id: UUID, **kwargs) -> None:
        """Record the LLM call's start time."""
        self.llm_starts[run_id] = time.time()

    def on_llm_end(self, response: LLMResult, *, run_id: UUID, **kwargs) -> None:
        """Record the LLM call's elapsed duration and its total token usage."""
        self._record_duration(self.llm_starts, "llm_duration", run_id)
        # Token usage is provider-dependent and may be absent from llm_output.
        if response.llm_output and "token_usage" in response.llm_output:
            total_tokens = response.llm_output["token_usage"].get("total_tokens", 0)
            self.metrics["token_usage"].append(total_tokens)

    def on_llm_error(self, error: BaseException, *, run_id: UUID, **kwargs) -> None:
        """Discard the pending start time so failed runs don't leak entries."""
        self.llm_starts.pop(run_id, None)

    def on_tool_start(self, serialized: Dict[str, Any], input_str: str, *, run_id: UUID, **kwargs) -> None:
        """Record the tool call's start time (previously never populated)."""
        self.tool_starts[run_id] = time.time()

    def on_tool_end(self, output: Any, *, run_id: UUID, **kwargs) -> None:
        """Record the tool call's elapsed duration."""
        self._record_duration(self.tool_starts, "tool_duration", run_id)

    def on_tool_error(self, error: BaseException, *, run_id: UUID, **kwargs) -> None:
        """Discard the pending start time so failed runs don't leak entries."""
        self.tool_starts.pop(run_id, None)

    def get_metrics_summary(self) -> Dict[str, Dict[str, float]]:
        """Return per-metric aggregates (count/total/avg/min/max/p50/p95/p99).

        Metrics with no recorded values are omitted. Percentiles are cast
        to plain ``float`` so the result matches the declared return type.
        """
        summary: Dict[str, Dict[str, float]] = {}
        for metric_name, values in self.metrics.items():
            if values:
                summary[metric_name] = {
                    "count": len(values),
                    "total": sum(values),
                    "avg": sum(values) / len(values),
                    "min": min(values),
                    "max": max(values),
                    "p50": float(np.percentile(values, 50)),
                    "p95": float(np.percentile(values, 95)),
                    "p99": float(np.percentile(values, 99))
                }
        return summary