| # /autogpt_platform/backend/backend/blocks/agent_reasoning.py
import asyncio
import logging
import re
import time
import uuid
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, List, Optional

logger = logging.getLogger(__name__)

class ReasoningPhase(Enum):
"""推理阶段枚举"""
PERCEPTION = "perception" # 感知阶段
ANALYSIS = "analysis" # 分析阶段
PLANNING = "planning" # 规划阶段
ACTION = "action" # 行动阶段
OBSERVATION = "observation" # 观察阶段
REFLECTION = "reflection" # 反思阶段
@dataclass
class ReasoningStep:
"""推理步骤数据结构"""
phase: ReasoningPhase
input_data: Dict[str, Any]
reasoning_content: str
action_plan: Optional[Dict[str, Any]] = None
execution_result: Optional[Dict[str, Any]] = None
reflection_notes: Optional[str] = None
timestamp: float = 0.0
step_id: str = ""
def __post_init__(self):
if not self.timestamp:
self.timestamp = time.time()
if not self.step_id:
self.step_id = str(uuid.uuid4())
class ReActReasoningEngine:
"""
ReAct推理引擎核心实现
基于网上深度技术文章的ReAct方法论,实现完整的推理-行动循环
核心特性:
1. 结构化推理过程
2. 上下文感知决策
3. 自适应策略调整
4. 经验学习机制
"""
def __init__(self, llm_client, tool_registry, memory_manager):
self.llm_client = llm_client
self.tool_registry = tool_registry
self.memory_manager = memory_manager
        # Reasoning history and context
        self.reasoning_history: List[ReasoningStep] = []
        self.current_context = {}
        self.goal_stack = []

        # Prompt templates
        self.prompt_templates = self._initialize_prompt_templates()

        # Reasoning configuration
        self.max_reasoning_steps = 50
        self.max_thinking_time = 300  # 5-minute thinking time limit (seconds)
    async def reason_and_act(
        self,
        goal: str,
        initial_context: Optional[Dict[str, Any]] = None,
        available_tools: Optional[List[str]] = None,
    ) -> Dict[str, Any]:
"""
执行ReAct推理和行动循环
这是Agent智能决策的核心方法,实现完整的推理-行动-观察-反思循环
参数:
goal: 要达成的目标描述
initial_context: 初始上下文信息
available_tools: 可用工具列表
返回:
推理和执行的完整结果
"""
logger.info(f"开始ReAct推理循环,目标: {goal}")
# 初始化推理环境
self.current_context = initial_context or {}
self.goal_stack = [goal]
self.reasoning_history.clear()
reasoning_start_time = time.time()
step_count = 0
try:
while self.goal_stack and step_count < self.max_reasoning_steps:
step_count += 1
                # Check the time limit
                if time.time() - reasoning_start_time > self.max_thinking_time:
                    logger.warning("Reasoning time limit exceeded, terminating the loop")
                    break

                current_goal = self.goal_stack[-1]  # the goal currently being pursued

                # Phase 1: perception and analysis
perception_result = await self._perception_phase(current_goal)
analysis_result = await self._analysis_phase(perception_result)
                # Phase 2: planning and decision making
                planning_result = await self._planning_phase(analysis_result)

                # Check whether the goal needs further decomposition
                if planning_result.get("requires_decomposition", False):
                    sub_goals = planning_result.get("sub_goals", [])
                    self.goal_stack.extend(reversed(sub_goals))  # push in reverse order (LIFO)
                    continue

                # Phase 3: action execution
                action_result = await self._action_phase(planning_result)

                # Phase 4: observation and reflection
                observation_result = await self._observation_phase(action_result)
                reflection_result = await self._reflection_phase(observation_result)

                # Update context and memory
                await self._update_context_and_memory(reflection_result)

                # Check goal completion status
if reflection_result.get("goal_achieved", False):
completed_goal = self.goal_stack.pop()
logger.info(f"目标完成: {completed_goal}")
if not self.goal_stack:
logger.info("所有目标已完成,推理循环结束")
break
elif reflection_result.get("goal_failed", False):
failed_goal = self.goal_stack.pop()
logger.warning(f"目标失败: {failed_goal}")
# 可以选择重新规划或放弃
if reflection_result.get("should_retry", False):
self.goal_stack.append(failed_goal)
# 避免过快循环
await asyncio.sleep(0.1)
# 生成推理总结
reasoning_summary = await self._generate_reasoning_summary()
return {
"goal": goal,
"success": len(self.goal_stack) == 0,
"steps_taken": step_count,
"reasoning_time": time.time() - reasoning_start_time,
"reasoning_history": self.reasoning_history,
"final_context": self.current_context,
"summary": reasoning_summary,
}
except Exception as e:
logger.error(f"ReAct推理循环异常: {e}")
return {
"goal": goal,
"success": False,
"error": str(e),
"steps_taken": step_count,
"reasoning_history": self.reasoning_history,
}
async def _perception_phase(self, goal: str) -> Dict[str, Any]:
"""
感知阶段:收集和分析当前环境信息
感知内容:
1. 当前环境状态
2. 可用资源和工具
3. 历史经验和知识
4. 约束条件和限制
"""
logger.debug(f"感知阶段开始,目标: {goal}")
# 收集环境信息
environment_state = await self._collect_environment_state()
# 检索相关记忆
relevant_memories = await self.memory_manager.retrieve_relevant_memories(
query=goal,
memory_types=["short_term", "long_term", "skill"],
max_results=5
)
        # Analyze available tools
        available_tools = await self._analyze_available_tools(goal)

        # Identify constraints
        constraints = await self._identify_constraints(goal, environment_state)
perception_result = {
"goal": goal,
"environment_state": environment_state,
"relevant_memories": relevant_memories,
"available_tools": available_tools,
"constraints": constraints,
"perception_quality": self._assess_perception_quality(environment_state),
}
        # Record the perception step
        step = ReasoningStep(
            phase=ReasoningPhase.PERCEPTION,
            input_data={"goal": goal},
            reasoning_content=(
                f"Perceived environment state: found {len(available_tools)} "
                f"available tools and {len(constraints)} constraints"
            ),
)
self.reasoning_history.append(step)
return perception_result
async def _analysis_phase(self, perception_data: Dict[str, Any]) -> Dict[str, Any]:
"""
分析阶段:深度分析问题本质和解决方案
分析内容:
1. 问题复杂度评估
2. 解决方案候选集生成
3. 资源需求分析
4. 风险因素识别
"""
goal = perception_data["goal"]
environment_state = perception_data["environment_state"]
available_tools = perception_data["available_tools"]
logger.debug(f"分析阶段开始,分析目标: {goal}")
# 构建分析提示词
analysis_prompt = self._build_analysis_prompt(perception_data)
# 调用LLM进行深度分析
analysis_response = await self.llm_client.generate(
prompt=analysis_prompt,
max_tokens=1000,
            temperature=0.3,  # lower temperature keeps the reasoning consistent
        )

        # Parse the analysis result
        analysis_result = self._parse_analysis_response(analysis_response)

        # Enrich the analysis result
analysis_result.update({
"complexity_score": await self._calculate_complexity_score(goal),
"resource_requirements": await self._estimate_resource_requirements(analysis_result),
"success_probability": await self._estimate_success_probability(analysis_result),
"alternative_approaches": await self._generate_alternative_approaches(analysis_result),
})
        # Record the analysis step
step = ReasoningStep(
phase=ReasoningPhase.ANALYSIS,
input_data=perception_data,
reasoning_content=analysis_response,
)
self.reasoning_history.append(step)
return analysis_result
async def _planning_phase(self, analysis_data: Dict[str, Any]) -> Dict[str, Any]:
"""
规划阶段:制定详细的执行计划
规划内容:
1. 执行策略选择
2. 步骤序列规划
3. 资源分配方案
4. 应急预案制定
"""
logger.debug("规划阶段开始,制定执行计划")
complexity_score = analysis_data.get("complexity_score", 0.5)
# 检查是否需要分解
if complexity_score > 0.7:
# 复杂任务需要进一步分解
decomposition_result = await self._decompose_complex_goal(analysis_data)
if decomposition_result["should_decompose"]:
return {
"requires_decomposition": True,
"sub_goals": decomposition_result["sub_goals"],
"decomposition_strategy": decomposition_result["strategy"],
}
        # Build the execution plan
        execution_plan = await self._build_execution_plan(analysis_data)

        # Generate contingency plans
contingency_plans = await self._generate_contingency_plans(execution_plan)
planning_result = {
"requires_decomposition": False,
"execution_plan": execution_plan,
"contingency_plans": contingency_plans,
"estimated_duration": execution_plan.get("duration", 0),
"confidence_level": execution_plan.get("confidence", 0.5),
}
        # Record the planning step
        step = ReasoningStep(
            phase=ReasoningPhase.PLANNING,
            input_data=analysis_data,
            reasoning_content=(
                f"Built execution plan, estimated duration "
                f"{execution_plan.get('duration', 0)} seconds"
            ),
action_plan=execution_plan,
)
self.reasoning_history.append(step)
return planning_result
async def _action_phase(self, planning_data: Dict[str, Any]) -> Dict[str, Any]:
"""
行动阶段:执行具体的操作
执行内容:
1. 工具调用执行
2. 参数动态调整
3. 实时监控和控制
4. 异常处理和恢复
"""
execution_plan = planning_data["execution_plan"]
logger.debug(f"行动阶段开始,执行计划: {execution_plan.get('name', 'unknown')}")
action_start_time = time.time()
try:
            # Prepare the execution environment
            execution_context = await self._prepare_execution_context(execution_plan)

            # Execute the primary action
primary_result = await self._execute_primary_action(
execution_plan,
execution_context
)
            # Execute follow-up actions (if any)
follow_up_results = []
if execution_plan.get("follow_up_actions"):
for follow_up in execution_plan["follow_up_actions"]:
follow_up_result = await self._execute_follow_up_action(
follow_up, execution_context, primary_result
)
follow_up_results.append(follow_up_result)
action_result = {
"success": True,
"primary_result": primary_result,
"follow_up_results": follow_up_results,
"execution_time": time.time() - action_start_time,
"tools_used": execution_plan.get("tools", []),
}
except Exception as e:
logger.error(f"行动执行失败: {e}")
# 尝试应急预案
contingency_result = await self._execute_contingency_plan(
planning_data.get("contingency_plans", []),
str(e)
)
action_result = {
"success": False,
"error": str(e),
"contingency_result": contingency_result,
"execution_time": time.time() - action_start_time,
}
        # Record the action step
        step = ReasoningStep(
            phase=ReasoningPhase.ACTION,
            input_data=planning_data,
            reasoning_content=f"Executed action plan: {execution_plan.get('description', '')}",
action_plan=execution_plan,
execution_result=action_result,
)
self.reasoning_history.append(step)
return action_result
async def _observation_phase(self, action_data: Dict[str, Any]) -> Dict[str, Any]:
"""
观察阶段:分析行动结果和环境变化
观察内容:
1. 行动结果分析
2. 环境状态变化
3. 新信息获取
4. 异常情况识别
"""
logger.debug("观察阶段开始,分析行动结果")
# 分析行动结果
result_analysis = await self._analyze_action_result(action_data)
# 检测环境变化
environment_changes = await self._detect_environment_changes()
# 提取新获得的信息
new_information = await self._extract_new_information(
action_data, environment_changes
)
        # Detect anomalies or unexpected situations
anomalies = await self._detect_anomalies(action_data, environment_changes)
observation_result = {
"result_analysis": result_analysis,
"environment_changes": environment_changes,
"new_information": new_information,
"anomalies": anomalies,
"observation_quality": self._assess_observation_quality(result_analysis),
}
        # Record the observation step
        step = ReasoningStep(
            phase=ReasoningPhase.OBSERVATION,
            input_data=action_data,
            reasoning_content=(
                f"Observed {len(new_information)} new pieces of information "
                f"and {len(anomalies)} anomalies"
            ),
)
self.reasoning_history.append(step)
return observation_result
async def _reflection_phase(self, observation_data: Dict[str, Any]) -> Dict[str, Any]:
"""
反思阶段:评估进展和优化策略
反思内容:
1. 目标达成度评估
2. 策略有效性分析
3. 学习经验提取
4. 下一步行动建议
"""
logger.debug("反思阶段开始,评估整体进展")
# 构建反思提示词
reflection_prompt = self._build_reflection_prompt(observation_data)
# 调用LLM进行反思
reflection_response = await self.llm_client.generate(
prompt=reflection_prompt,
max_tokens=800,
temperature=0.4,
)
        # Parse the reflection result
        reflection_analysis = self._parse_reflection_response(reflection_response)

        # Evaluate the goal completion status
goal_status = await self._evaluate_goal_status(
self.goal_stack[-1] if self.goal_stack else "",
observation_data,
reflection_analysis
)
        # Extract learning experience
learning_experience = await self._extract_learning_experience(
observation_data, reflection_analysis
)
reflection_result = {
"reflection_analysis": reflection_analysis,
"goal_status": goal_status,
"learning_experience": learning_experience,
"goal_achieved": goal_status.get("achieved", False),
"goal_failed": goal_status.get("failed", False),
"should_retry": goal_status.get("should_retry", False),
"next_action_suggestion": reflection_analysis.get("next_action", ""),
}
        # Record the reflection step
step = ReasoningStep(
phase=ReasoningPhase.REFLECTION,
input_data=observation_data,
reasoning_content=reflection_response,
reflection_notes=reflection_analysis.get("key_insights", ""),
)
self.reasoning_history.append(step)
return reflection_result
def _build_analysis_prompt(self, perception_data: Dict[str, Any]) -> str:
"""构建分析阶段的提示词"""
goal = perception_data["goal"]
environment = perception_data["environment_state"]
tools = perception_data["available_tools"]
memories = perception_data["relevant_memories"]
return f"""
请深度分析以下任务:
目标:{goal}
当前环境:
{self._format_environment_state(environment)}
可用工具:
{self._format_tool_list(tools)}
相关经验:
{self._format_memory_list(memories)}
请从以下维度进行分析:
1. 问题本质:这个目标的核心问题是什么?
2. 复杂度评估:任务的复杂程度如何?需要多少步骤?
3. 关键挑战:执行过程中可能遇到的主要困难?
4. 成功要素:完成任务需要的关键因素?
5. 风险评估:可能的失败原因和风险点?
分析结果请结构化输出:
问题本质:[分析内容]
复杂度:[1-10分] - [说明]
关键挑战:[挑战1]、[挑战2]...
成功要素:[要素1]、[要素2]...
风险评估:[风险1]、[风险2]...
"""
def _build_reflection_prompt(self, observation_data: Dict[str, Any]) -> str:
"""构建反思阶段的提示词"""
result_analysis = observation_data["result_analysis"]
new_info = observation_data["new_information"]
anomalies = observation_data["anomalies"]
return f"""
请反思刚才的行动结果:
行动结果分析:
{self._format_result_analysis(result_analysis)}
新获得的信息:
{self._format_information_list(new_info)}
发现的异常:
{self._format_anomaly_list(anomalies)}
推理历史:
{self._format_reasoning_history()}
请从以下角度进行反思:
1. 效果评估:行动是否达到了预期效果?
2. 策略分析:当前策略是否合适?需要调整吗?
3. 学习收获:从这次行动中学到了什么?
4. 问题识别:发现了什么新问题或挑战?
5. 下一步建议:接下来应该怎么做?
反思结果:
效果评估:[评估内容]
策略分析:[分析内容]
学习收获:[收获内容]
问题识别:[问题内容]
下一步建议:[建议内容]
"""
async def _execute_primary_action(
self,
execution_plan: Dict[str, Any],
execution_context: Dict[str, Any]
) -> Dict[str, Any]:
"""执行主要行动"""
tool_name = execution_plan.get("primary_tool")
if not tool_name:
raise ValueError("执行计划中缺少主要工具")
# 获取工具实例
tool = await self.tool_registry.get_tool(tool_name)
if not tool:
raise ValueError(f"工具 {tool_name} 不可用")
# 准备工具参数
tool_params = execution_plan.get("tool_parameters", {})
# 执行工具调用
logger.info(f"执行主要行动:使用工具 {tool_name}")
start_time = time.time()
try:
result = await tool.execute(
parameters=tool_params,
context=execution_context
)
execution_time = time.time() - start_time
return {
"success": True,
"tool_name": tool_name,
"result": result,
"execution_time": execution_time,
"parameters_used": tool_params,
}
except Exception as e:
execution_time = time.time() - start_time
return {
"success": False,
"tool_name": tool_name,
"error": str(e),
"execution_time": execution_time,
"parameters_used": tool_params,
}
async def _generate_reasoning_summary(self) -> Dict[str, Any]:
"""生成推理过程总结"""
if not self.reasoning_history:
return {"summary": "无推理历史"}
# 统计各阶段的步骤数
phase_counts = {}
for step in self.reasoning_history:
phase = step.phase.value
phase_counts[phase] = phase_counts.get(phase, 0) + 1
        # Extract the key decision points
key_decisions = [
step for step in self.reasoning_history
if step.phase in [ReasoningPhase.PLANNING, ReasoningPhase.ACTION]
]
        # Assess reasoning quality
        reasoning_quality = await self._assess_reasoning_quality()

        # Extract lessons learned
lessons_learned = await self._extract_lessons_learned()
return {
"total_steps": len(self.reasoning_history),
"phase_distribution": phase_counts,
"key_decisions": [
{
"step_id": step.step_id,
"phase": step.phase.value,
"content": step.reasoning_content[:100] + "..." if len(step.reasoning_content) > 100 else step.reasoning_content,
"timestamp": step.timestamp,
}
for step in key_decisions
],
"reasoning_quality": reasoning_quality,
"lessons_learned": lessons_learned,
"performance_metrics": self._calculate_performance_metrics(),
}
def _parse_analysis_response(self, response: str) -> Dict[str, Any]:
"""解析LLM分析响应"""
import re
# 提取结构化信息
patterns = {
"problem_essence": r"问题本质:(.+?)(?=复杂度:|$)",
"complexity": r"复杂度:(\d+).*?-\s*(.+?)(?=关键挑战:|$)",
"challenges": r"关键挑战:(.+?)(?=成功要素:|$)",
"success_factors": r"成功要素:(.+?)(?=风险评估:|$)",
"risk_assessment": r"风险评估:(.+?)$",
}
parsed_result = {}
for key, pattern in patterns.items():
match = re.search(pattern, response, re.DOTALL)
if match:
if key == "complexity":
parsed_result["complexity_score"] = int(match.group(1)) / 10.0
parsed_result["complexity_description"] = match.group(2).strip()
else:
parsed_result[key] = match.group(1).strip()
return parsed_result
def _parse_reflection_response(self, response: str) -> Dict[str, Any]:
"""解析反思响应"""
import re
patterns = {
"effectiveness": r"效果评估:(.+?)(?=策略分析:|$)",
"strategy_analysis": r"策略分析:(.+?)(?=学习收获:|$)",
"learning_gains": r"学习收获:(.+?)(?=问题识别:|$)",
"problem_identification": r"问题识别:(.+?)(?=下一步建议:|$)",
"next_action": r"下一步建议:(.+?)$",
}
parsed_result = {}
for key, pattern in patterns.items():
match = re.search(pattern, response, re.DOTALL)
if match:
parsed_result[key] = match.group(1).strip()
        # Extract key insights from the learning gains
        insights = []
        if parsed_result.get("learning_gains"):
            insights.extend(
                insight.strip()
                # split on commas/semicolons (and the CJK list comma for robustness)
                for insight in re.split(r"[,;、]", parsed_result["learning_gains"])
                if insight.strip()
            )
parsed_result["key_insights"] = insights
return parsed_result
def _initialize_prompt_templates(self) -> Dict[str, str]:
"""初始化提示模板"""
return {
"system": """
你是一个高级AI代理,具备强大的推理和行动能力。
核心能力:
1. 深度分析:理解问题本质,识别关键要素
2. 策略规划:制定合理可行的执行计划
3. 工具使用:熟练使用各种工具完成任务
4. 学习适应:从经验中学习,持续优化策略
工作原则:
- 逻辑清晰:每一步推理都要有明确的逻辑依据
- 行动高效:选择最有效的工具和方法
- 学习成长:从成功和失败中都要提取经验
- 用户导向:始终以用户目标为中心
请严格按照ReAct框架进行推理和行动。
""",
"decomposition": """
任务分解原则:
1. 子任务应该相对独立,可以并行或串行执行
2. 每个子任务都有明确的输入、输出和成功标准
3. 子任务的粒度要合适,不能过于细碎或宽泛
4. 考虑依赖关系和执行顺序
分解策略:
- 功能性分解:按功能模块划分
- 时序性分解:按执行时间顺序划分
- 资源性分解:按所需资源类型划分
- 复杂性分解:按难度级别划分
""",
}
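

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only).
#
# The engine's phase methods call several private helpers (e.g.
# _collect_environment_state, _build_execution_plan) and collaborators that
# are provided elsewhere in the platform. The stub classes below are
# hypothetical stand-ins that only mirror the call shapes used above:
# llm_client.generate(...), tool_registry.get_tool(...), and
# memory_manager.retrieve_relevant_memories(...).
# ---------------------------------------------------------------------------
if __name__ == "__main__":

    class _StubLLMClient:
        async def generate(self, prompt: str, max_tokens: int, temperature: float) -> str:
            # A real client would call an LLM API here.
            return "Problem essence: demo\nComplexity: 3 - simple demo task"

    class _StubToolRegistry:
        async def get_tool(self, name: str):
            # No tools are registered in this sketch.
            return None

    class _StubMemoryManager:
        async def retrieve_relevant_memories(self, query, memory_types, max_results):
            return []

    async def _demo() -> None:
        engine = ReActReasoningEngine(
            llm_client=_StubLLMClient(),
            tool_registry=_StubToolRegistry(),
            memory_manager=_StubMemoryManager(),
        )
        result = await engine.reason_and_act(goal="Summarize the latest build logs")
        print("success:", result.get("success"), "steps:", result.get("steps_taken"))

    asyncio.run(_demo())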