Qwen-Agent 对外 API 深入分析

API 总览

Qwen-Agent 提供了多种接口类型,包括 HTTP API、Python API、命令行接口和 Web UI 接口。

接口分类

协议类型 端口 用途 主要功能
HTTP API 7866 数据服务 文件缓存、浏览历史管理
Gradio Web UI 7863, 7864 用户界面 聊天界面、工作站界面
Python API - 编程接口 Agent 实例化、工具调用
命令行 - 脚本接口 服务启动、配置管理

1. HTTP API 接口

1.1 数据库服务 API

服务端点: qwen_server/database_server.py
端口: 7866
协议: HTTP POST

API: /endpoint (POST)

入口函数: web_listening (qwen_server/database_server.py:116-133)

@app.post('/endpoint')
async def web_listening(request: Request):
    """
    统一的HTTP API端点,处理多种类型的请求
    
    支持的任务类型:
    - change_checkbox: 更改文档选择状态
    - cache: 缓存网页或文档内容
    - pop_url: 更新当前访问的URL
    """
    data = await request.json()
    msg_type = data['task']

    if msg_type == 'change_checkbox':
        rsp = change_checkbox_state(data['ckid'])
    elif msg_type == 'cache':
        cache_obj = multiprocessing.Process(target=cache_page, kwargs=data)
        cache_obj.start()
        rsp = 'caching'
    elif msg_type == 'pop_url':
        rsp = update_pop_url(data['url'])
    else:
        raise NotImplementedError

    return JSONResponse(content=rsp)

调用链路分析

  1. change_checkbox 任务

    def change_checkbox_state(key):
        """更改文档的选中状态"""
        with open(meta_file, 'r', encoding='utf-8') as file:
            meta_info = json.load(file)
        meta_info[key[3:]]['checked'] = (not meta_info[key[3:]]['checked'])
        with open(meta_file, 'w', encoding='utf-8') as file:
            json.dump(meta_info, file, indent=4)
        return {'result': 'changed'}
    
  2. cache 任务

    def cache_page(**kwargs):
        """缓存页面内容到本地并进行向量化处理"""
        url = kwargs.get('url', '')
        page_content = kwargs.get('content', '')
    
        if page_content and not get_file_type(url) in ['pdf', 'docx', 'pptx', 'txt']:
            # 保存到本地文件系统
            os.makedirs(os.path.join(server_config.path.download_root, hash_sha256(url)), exist_ok=True)
            url = os.path.join(server_config.path.download_root, hash_sha256(url), get_basename_from_url(url))
            save_browsing_meta_data(url, '[CACHING]', meta_file)
            save_history(None, url, history_dir)
            save_text_to_file(url, page_content)
        else:
            save_browsing_meta_data(url, '[CACHING]', meta_file)
            save_history(None, url, history_dir)
    
        try:
            # 使用Memory模块进行文档处理和向量化
            *_, last = mem.run([{'role': 'user', 'content': [{'file': url}]}])
            title = get_basename_from_url(url)
            save_browsing_meta_data(url, title, meta_file)
        except Exception:
            rm_browsing_meta_data(url, meta_file)
    
  3. pop_url 任务

    def update_pop_url(url: str):
        """更新当前访问的URL"""
        if not get_file_type(url) in ['pdf', 'docx', 'pptx', 'txt']:
            url = os.path.join(server_config.path.download_root, hash_sha256(url), get_basename_from_url(url))
        new_line = {'url': url}
    
        with jsonlines.open(cache_file_popup_url, mode='w') as writer:
            writer.write(new_line)
    
        return 'Update URL'
    

1.2 静态文件服务

路径: /static
功能: 提供代码解释器生成的图片、文件等静态资源

app.mount('/static', StaticFiles(directory=server_config.path.code_interpreter_ws), name='static')

2. Python API 接口

2.1 Agent 核心 API

2.1.1 Agent 基类 (qwen_agent/agent.py)

核心接口

class Agent(ABC):
    def __init__(self, function_list=None, llm=None, system_message=None, name=None, description=None):
        """
        智能体初始化
        
        Args:
            function_list: 工具列表
            llm: 大语言模型配置
            system_message: 系统消息
            name: 智能体名称
            description: 智能体描述
        """
    
    def run(self, messages: List[Union[Dict, Message]], **kwargs) -> Iterator[List[Message]]:
        """
        运行智能体,返回流式响应
        
        Args:
            messages: 对话消息列表
            **kwargs: 额外参数
            
        Yields:
            Iterator[List[Message]]: 流式响应消息
        """
    
    def run_nonstream(self, messages: List[Union[Dict, Message]], **kwargs) -> List[Message]:
        """
        运行智能体,返回完整响应
        
        Returns:
            List[Message]: 完整响应消息列表
        """

调用链路

Agent.run() 
  ├── 消息类型转换和验证
  ├── 系统消息注入
  ├── _run() [子类实现]
     ├── _call_llm() - LLM调用
     └── _call_tool() - 工具调用
  └── 响应格式化和流式输出

2.1.2 Assistant 智能体 (qwen_agent/agents/assistant.py)

核心功能

class Assistant(FnCallAgent):
    """集成RAG能力和函数调用能力的通用智能体"""
    
    def __init__(self, function_list=None, llm=None, system_message=None, 
                 name=None, description=None, files=None, rag_cfg=None):
        """
        Args:
            files: 知识库文件列表
            rag_cfg: RAG配置参数
        """
    
    def _run(self, messages: List[Message], lang: str = 'en', 
             max_ref_token: int = 4000, **kwargs) -> Iterator[List[Message]]:
        """
        执行流程:
        1. 文档检索和知识注入
        2. 函数调用处理
        3. 响应生成
        """

关键函数分析

  1. 知识检索和注入
def _run(self, messages: List[Message], lang: str = 'en', 
         max_ref_token: int = 4000, **kwargs) -> Iterator[List[Message]]:
    # 提取文件内容
    files = []
    for msg in messages:
        if msg.role == ROLE.USER:
            if isinstance(msg.content, list):
                for item in msg.content:
                    if isinstance(item, ContentItem) and item.file:
                        files.append(item.file)
    
    # 检索相关文档
    if files:
        try:
            *_, last = self.mem.run(messages=messages, max_ref_token=max_ref_token, **kwargs)
            if last:
                knowledge = format_knowledge_to_source_and_content(last[-1].content)
                if knowledge:
                    # 注入知识到系统消息
                    knowledge_str = self._format_knowledge_to_str(knowledge, lang=lang)
                    messages = self._prepend_knowledge_prompt(messages, knowledge_str, lang=lang)
        except Exception as ex:
            logger.warning(f'RAG failed: {ex}')
    
    # 调用父类的函数调用逻辑
    yield from super()._run(messages, lang=lang, **kwargs)
  1. 知识格式化
def _format_knowledge_to_str(self, knowledge: List[dict], lang: Literal['en', 'zh'] = 'en') -> str:
    """将检索到的知识格式化为字符串"""
    knowledge_str = ''
    for k in knowledge:
        knowledge_str += KNOWLEDGE_SNIPPET[lang].format(source=k['source'], content=k['content']) + '\n\n'
    return KNOWLEDGE_TEMPLATE[lang].format(knowledge=knowledge_str)

def _prepend_knowledge_prompt(self, messages: List[Message], knowledge_str: str, 
                            lang: Literal['en', 'zh'] = 'en') -> List[Message]:
    """将知识注入到消息中"""
    messages = copy.deepcopy(messages)
    if messages and messages[0].role == SYSTEM:
        if isinstance(messages[0].content, str):
            messages[0].content = messages[0].content + '\n\n' + knowledge_str
        else:
            messages[0].content.append(ContentItem(text=knowledge_str))
    else:
        messages.insert(0, Message(role=SYSTEM, content=knowledge_str))
    return messages

2.1.3 FnCallAgent 函数调用智能体

核心逻辑

class FnCallAgent(Agent):
    def _run(self, messages: List[Message], lang: str = 'en', **kwargs) -> Iterator[List[Message]]:
        """
        函数调用执行流程:
        1. 准备函数列表
        2. 调用LLM生成响应
        3. 检测和执行工具调用
        4. 处理工具结果
        5. 继续对话或结束
        """
        messages = copy.deepcopy(messages)
        
        # 准备函数定义
        functions = [func.function for func in self.function_map.values()] if self.function_map else None
        
        max_turn = 10  # 最大工具调用轮次
        while max_turn > 0:
            max_turn -= 1
            
            # 调用LLM
            response = []
            for response in self._call_llm(messages=messages, functions=functions, **kwargs):
                yield response
            
            if not response:
                return
            
            # 检测是否需要调用工具
            need_call_tool, func_name, func_args, text_reply = self._detect_tool(response[-1])
            
            if need_call_tool:
                # 执行工具调用
                try:
                    func_result = self._call_tool(func_name, func_args, **kwargs)
                    
                    # 添加工具结果到对话历史
                    messages.extend(response)
                    messages.append(Message(
                        role=ROLE.TOOL,
                        content=func_result,
                        name=func_name
                    ))
                    
                except Exception as ex:
                    # 工具调用失败处理
                    error_msg = f"Tool {func_name} failed: {str(ex)}"
                    messages.extend(response)
                    messages.append(Message(role=ROLE.TOOL, content=error_msg, name=func_name))
            else:
                # 无需工具调用,结束
                return

2.2 LLM API 接口

2.2.1 LLM 基类 (qwen_agent/llm/base.py)

class BaseChatModel(ABC):
    def __init__(self, model: str, api_key: str = '', model_server: str = ''):
        """
        LLM基类初始化
        
        Args:
            model: 模型名称
            api_key: API密钥
            model_server: 模型服务地址
        """
    
    @abstractmethod
    def chat(self, messages: List[Message], functions: List[Dict] = None, 
             stream: bool = True, extra_generate_cfg: dict = None) -> Iterator[List[Message]]:
        """
        聊天接口
        
        Args:
            messages: 对话消息
            functions: 可用函数列表
            stream: 是否流式输出
            extra_generate_cfg: 额外生成配置
            
        Yields:
            Iterator[List[Message]]: 响应消息流
        """

2.2.2 DashScope LLM (qwen_agent/llm/qwen_dashscope.py)

关键实现

class QwenChatAtDS(BaseChatModel):
    def chat(self, messages: List[Message], functions: List[Dict] = None, 
             stream: bool = True, extra_generate_cfg: dict = None) -> Iterator[List[Message]]:
        """
        DashScope API调用实现
        """
        # 消息格式转换
        dashscope_messages = []
        for msg in messages:
            if msg.role == ROLE.TOOL:
                # 工具结果消息处理
                dashscope_messages.append({
                    'role': 'tool',
                    'content': msg.content,
                    'name': msg.name
                })
            else:
                # 普通消息处理
                dashscope_messages.append({
                    'role': msg.role,
                    'content': self._convert_content_to_str(msg.content)
                })
        
        # 构建请求参数
        params = {
            'model': self.model,
            'messages': dashscope_messages,
            'stream': stream,
            **extra_generate_cfg
        }
        
        if functions:
            params['tools'] = [{'type': 'function', 'function': func} for func in functions]
        
        # 调用DashScope API
        try:
            response = dashscope.Generation.call(**params)
            
            if stream:
                # 流式响应处理
                for chunk in response:
                    if chunk.status_code == HTTPStatus.OK:
                        yield self._parse_stream_response(chunk)
                    else:
                        raise ModelServiceError(f"DashScope API error: {chunk.message}")
            else:
                # 非流式响应处理
                if response.status_code == HTTPStatus.OK:
                    yield self._parse_response(response)
                else:
                    raise ModelServiceError(f"DashScope API error: {response.message}")
                    
        except Exception as e:
            raise ModelServiceError(f"DashScope request failed: {str(e)}")

2.3 工具系统 API

2.3.1 工具基类 (qwen_agent/tools/base.py)

class BaseTool(ABC):
    name: str = ''
    description: str = ''
    parameters: List[Dict] = []
    
    @abstractmethod
    def call(self, params: str, **kwargs) -> Union[str, List[ContentItem]]:
        """
        工具调用接口
        
        Args:
            params: JSON格式的参数字符串
            **kwargs: 额外参数
            
        Returns:
            Union[str, List[ContentItem]]: 工具执行结果
        """
        raise NotImplementedError
    
    @property
    def function(self) -> Dict:
        """返回OpenAI函数调用格式的工具定义"""
        return {
            'name': self.name,
            'description': self.description,
            'parameters': {
                'type': 'object',
                'properties': {param['name']: {
                    'type': param['type'],
                    'description': param['description']
                } for param in self.parameters},
                'required': [param['name'] for param in self.parameters if param.get('required', False)]
            }
        }

def register_tool(name: str):
    """工具注册装饰器"""
    def decorator(cls):
        cls.name = name
        TOOL_REGISTRY[name] = cls
        return cls
    return decorator

2.3.2 代码解释器工具 (qwen_agent/tools/code_interpreter.py)

核心实现

@register_tool('code_interpreter')
class CodeInterpreter(BaseTool):
    description = 'Python代码解释器,可以运行Python代码并返回结果'
    parameters = [{
        'name': 'code',
        'type': 'string',
        'description': '要执行的Python代码',
        'required': True
    }]
    
    def call(self, params: str, **kwargs) -> str:
        """
        执行Python代码
        
        调用链路:
        1. 解析代码参数
        2. 启动Jupyter内核
        3. 执行代码
        4. 处理执行结果
        5. 返回格式化输出
        """
        import json5
        
        # 解析参数
        try:
            args = json5.loads(params)
            code = args['code']
        except Exception as e:
            return f"参数解析失败: {str(e)}"
        
        # 执行代码
        try:
            result = self._execute_code(code)
            return self._format_result(result)
        except Exception as e:
            return f"代码执行失败: {str(e)}"
    
    def _execute_code(self, code: str) -> Dict:
        """
        在Jupyter内核中执行代码
        """
        # 获取或创建内核
        kernel = self._get_kernel()
        
        # 执行代码
        msg_id = kernel.execute(code)
        
        # 收集执行结果
        result = {
            'stdout': '',
            'stderr': '',
            'display_data': [],
            'error': None
        }
        
        while True:
            try:
                msg = kernel.get_iopub_msg(timeout=self.timeout)
                msg_type = msg['header']['msg_type']
                
                if msg_type == 'stream':
                    # 标准输出/错误
                    stream_name = msg['content']['name']
                    text = msg['content']['text']
                    result[stream_name] += text
                    
                elif msg_type == 'display_data':
                    # 显示数据(图片、表格等)
                    result['display_data'].append(msg['content']['data'])
                    
                elif msg_type == 'error':
                    # 执行错误
                    result['error'] = {
                        'ename': msg['content']['ename'],
                        'evalue': msg['content']['evalue'],
                        'traceback': msg['content']['traceback']
                    }
                    break
                    
                elif msg_type == 'execute_reply':
                    # 执行完成
                    break
                    
            except Exception:
                break
        
        return result

3. Web UI 接口

3.1 Gradio 界面 (qwen_agent/gui/web_ui.py)

核心接口

class WebUI:
    def __init__(self, agent: Agent, chatbot_config: dict = None):
        """
        Web UI初始化
        
        Args:
            agent: 智能体实例
            chatbot_config: 聊天机器人配置
        """
        self.agent = agent
        self.config = chatbot_config or {}
    
    def run(self, server_name: str = '127.0.0.1', server_port: int = 7860, 
            share: bool = False, **kwargs):
        """
        启动Web界面
        
        Args:
            server_name: 服务器地址
            server_port: 服务器端口
            share: 是否创建公网链接
        """
        # 创建Gradio界面
        with gr.Blocks(css=self._get_css(), theme='soft') as demo:
            # 聊天界面组件
            chatbot = gr.Chatbot(
                elem_id='chatbot',
                height=600,
                avatar_images=[None, get_avatar_image('qwen')]
            )
            
            # 输入组件
            with gr.Row():
                msg_textbox = gr.Textbox(
                    placeholder='与Qwen对话...',
                    container=False,
                    scale=7
                )
                submit_btn = gr.Button('发送', scale=1)
                clear_btn = gr.Button('清除', scale=1)
            
            # 文件上传
            file_upload = gr.File(
                label='上传文件',
                file_types=['pdf', 'docx', 'txt', 'md']
            )
            
            # 事件绑定
            submit_btn.click(
                self._chat_fn,
                inputs=[chatbot, msg_textbox, file_upload],
                outputs=[chatbot, msg_textbox]
            )
            
            clear_btn.click(
                lambda: ([], ''),
                outputs=[chatbot, msg_textbox]
            )
        
        # 启动服务
        demo.queue().launch(
            server_name=server_name,
            server_port=server_port,
            share=share,
            **kwargs
        )
    
    def _chat_fn(self, history, message, files):
        """
        聊天处理函数
        
        调用链路:
        1. 构建消息格式
        2. 调用智能体
        3. 流式更新界面
        4. 返回结果
        """
        # 构建消息
        messages = []
        for human, assistant in history:
            messages.append({'role': 'user', 'content': human})
            if assistant:
                messages.append({'role': 'assistant', 'content': assistant})
        
        # 添加当前消息
        content = [{'text': message}]
        if files:
            for file in files:
                content.append({'file': file.name})
        messages.append({'role': 'user', 'content': content})
        
        # 调用智能体
        history.append([message, ''])
        
        try:
            for response in self.agent.run(messages):
                if response:
                    history[-1][1] = response[-1]['content']
                    yield history, ''
        except Exception as e:
            history[-1][1] = f'错误: {str(e)}'
            yield history, ''

4. 命令行接口

4.1 服务启动 (run_server.py)

主要功能

def main():
    """
    服务启动主函数
    
    功能:
    1. 解析命令行参数
    2. 更新配置文件
    3. 启动多个服务进程
    4. 信号处理和优雅关闭
    """
    # 解析参数
    args = parse_args()
    
    # 更新配置
    server_config = update_config(server_config, args, server_config_path)
    
    # 启动服务进程
    servers = {
        'database': subprocess.Popen([
            sys.executable,
            os.path.join(os.getcwd(), 'qwen_server/database_server.py'),
        ]),
        'workstation': subprocess.Popen([
            sys.executable,
            os.path.join(os.getcwd(), 'qwen_server/workstation_server.py'),
        ]),
        'assistant': subprocess.Popen([
            sys.executable,
            os.path.join(os.getcwd(), 'qwen_server/assistant_server.py'),
        ]),
    }
    
    # 信号处理
    def signal_handler(sig_num, _frame):
        for v in servers.values():
            v.terminate()
        for k in list(servers.keys()):
            del servers[k]
        if sig_num == signal.SIGINT:
            raise KeyboardInterrupt()
    
    append_signal_handler(signal.SIGINT, signal_handler)
    append_signal_handler(signal.SIGTERM, signal_handler)
    
    # 等待所有进程
    for p in list(servers.values()):
        p.wait()

def parse_args():
    """解析命令行参数"""
    parser = argparse.ArgumentParser()
    parser.add_argument('-m', '--model_server', type=str, default='dashscope',
                       help='模型服务地址')
    parser.add_argument('-k', '--api_key', type=str, default='',
                       help='API密钥')
    parser.add_argument('-l', '--llm', type=str, default='qwen-plus',
                       help='模型名称')
    parser.add_argument('-s', '--server_host', type=str, default='127.0.0.1',
                       choices=['127.0.0.1', '0.0.0.0'],
                       help='服务器地址')
    parser.add_argument('-t', '--max_ref_token', type=int, default=4000,
                       help='RAG最大参考token数')
    parser.add_argument('-w', '--workstation_port', type=int, default=7864,
                       help='工作站端口')
    return parser.parse_args()

5. API 调用时序图

5.1 完整对话流程

sequenceDiagram
    participant Client as 客户端
    participant WebUI as Web UI
    participant Agent as 智能体
    participant LLM as 大语言模型
    participant Tool as 工具系统
    participant Memory as 记忆系统

    Client->>WebUI: 发送消息
    WebUI->>Agent: 调用 run()
    
    Agent->>Memory: 检索相关文档
    Memory-->>Agent: 返回知识片段
    
    Agent->>Agent: 注入知识到系统消息
    Agent->>LLM: 调用 chat()
    
    LLM-->>Agent: 返回响应(可能包含工具调用)
    
    alt 需要工具调用
        Agent->>Tool: 调用工具
        Tool-->>Agent: 返回工具结果
        Agent->>LLM: 继续对话
        LLM-->>Agent: 返回最终响应
    end
    
    Agent-->>WebUI: 流式返回响应
    WebUI-->>Client: 更新界面

5.2 HTTP API 调用流程

sequenceDiagram
    participant Browser as 浏览器扩展
    participant DatabaseAPI as 数据库API
    participant Memory as 记忆模块
    participant FileSystem as 文件系统

    Browser->>DatabaseAPI: POST /endpoint (cache任务)
    DatabaseAPI->>FileSystem: 保存页面内容
    DatabaseAPI->>Memory: 处理文档向量化
    Memory-->>DatabaseAPI: 返回处理结果
    DatabaseAPI-->>Browser: 返回缓存状态

    Browser->>DatabaseAPI: POST /endpoint (pop_url任务)
    DatabaseAPI->>FileSystem: 更新当前URL
    DatabaseAPI-->>Browser: 返回更新状态

6. 关键函数详细分析

6.1 消息处理流程

Agent.run() 核心逻辑

def run(self, messages: List[Union[Dict, Message]], **kwargs) -> Iterator[List[Message]]:
    """
    智能体运行的核心入口
    
    处理流程:
    1. 消息格式统一化
    2. 语言检测
    3. 系统消息注入
    4. 调用子类_run方法
    5. 响应格式化
    """
    messages = copy.deepcopy(messages)
    _return_message_type = 'dict'
    new_messages = []
    
    # 消息格式转换
    if not messages:
        _return_message_type = 'message'
    for msg in messages:
        if isinstance(msg, dict):
            new_messages.append(Message(**msg))
        else:
            new_messages.append(msg)
            _return_message_type = 'message'
    
    # 语言检测
    if 'lang' not in kwargs:
        if has_chinese_messages(new_messages):
            kwargs['lang'] = 'zh'
        else:
            kwargs['lang'] = 'en'
    
    # 系统消息注入
    if self.system_message:
        if not new_messages or new_messages[0][ROLE] != SYSTEM:
            new_messages.insert(0, Message(role=SYSTEM, content=self.system_message))
        else:
            # 合并系统消息
            if isinstance(new_messages[0][CONTENT], str):
                new_messages[0][CONTENT] = self.system_message + '\n\n' + new_messages[0][CONTENT]
            else:
                new_messages[0][CONTENT] = [ContentItem(text=self.system_message + '\n\n')] + new_messages[0][CONTENT]
    
    # 调用具体实现
    for rsp in self._run(messages=new_messages, **kwargs):
        for i in range(len(rsp)):
            if not rsp[i].name and self.name:
                rsp[i].name = self.name
        
        # 返回格式转换
        if _return_message_type == 'message':
            yield [Message(**x) if isinstance(x, dict) else x for x in rsp]
        else:
            yield [x.model_dump() if not isinstance(x, dict) else x for x in rsp]

6.2 工具调用检测

_detect_tool() 实现

def _detect_tool(self, message: Message) -> Tuple[bool, str, str, str]:
    """
    检测消息中是否包含工具调用
    
    Args:
        message: LLM生成的消息
        
    Returns:
        Tuple[bool, str, str, str]: (是否需要调用工具, 工具名, 工具参数, 文本回复)
    """
    func_name = None
    func_args = None

    # 检查function_call字段
    if message.function_call:
        func_call = message.function_call
        func_name = func_call.name
        func_args = func_call.arguments
    
    # 获取文本内容
    text = message.content
    if not text:
        text = ''

    return (func_name is not None), func_name, func_args, text

6.3 工具执行

_call_tool() 实现

def _call_tool(self, tool_name: str, tool_args: Union[str, dict] = '{}', **kwargs) -> Union[str, List[ContentItem]]:
    """
    执行工具调用
    
    Args:
        tool_name: 工具名称
        tool_args: 工具参数
        **kwargs: 额外参数
        
    Returns:
        Union[str, List[ContentItem]]: 工具执行结果
    """
    # 检查工具是否存在
    if tool_name not in self.function_map:
        return f'Tool {tool_name} does not exists.'
    
    tool = self.function_map[tool_name]
    
    try:
        # 执行工具
        tool_result = tool.call(tool_args, **kwargs)
    except (ToolServiceError, DocParserError) as ex:
        # 重新抛出已知异常
        raise ex
    except Exception as ex:
        # 处理未知异常
        exception_type = type(ex).__name__
        exception_message = str(ex)
        traceback_info = ''.join(traceback.format_tb(ex.__traceback__))
        error_message = f'An error occurred when calling tool `{tool_name}`:\n' \
                        f'{exception_type}: {exception_message}\n' \
                        f'Traceback:\n{traceback_info}'
        logger.warning(error_message)
        return error_message

    # 格式化返回结果
    if isinstance(tool_result, str):
        return tool_result
    elif isinstance(tool_result, list) and all(isinstance(item, ContentItem) for item in tool_result):
        return tool_result  # 多模态工具结果
    else:
        return json.dumps(tool_result, ensure_ascii=False, indent=4)

7. 性能和扩展性分析

7.1 并发处理

  • 多进程架构: 数据库服务、工作站服务、助手服务独立运行
  • 异步处理: FastAPI + uvicorn 支持高并发HTTP请求
  • 流式输出: 支持实时响应,提升用户体验

7.2 缓存机制

  • 文档缓存: 浏览的网页和文档本地缓存
  • 向量缓存: 文档向量化结果缓存
  • 对话历史: 会话状态持久化

7.3 扩展接口

  • 工具注册: 通过装饰器轻松添加新工具
  • LLM适配: 统一接口支持多种LLM提供商
  • 智能体继承: 基于基类快速开发专用智能体

这个API分析涵盖了Qwen-Agent项目的所有对外接口,包括详细的调用链路、关键函数实现和时序图,为开发者提供了全面的技术参考。 """ 统一的 HTTP 端点处理函数

设计目的: 处理来自浏览器扩展和前端的各种请求
作用域: 全局请求路由和分发
依赖前置条件: FastAPI 应用已启动,CORS 中间件已配置
后置条件: 返回 JSON 响应给客户端
复杂度: O(1) - 简单的条件分支
"""
data = await request.json()  # 解析 JSON 请求体
msg_type = data['task']      # 获取任务类型

if msg_type == 'change_checkbox':
    # 处理复选框状态变更:用于浏览历史的选择/取消选择
    rsp = change_checkbox_state(data['ckid'])
elif msg_type == 'cache':
    # 处理页面缓存请求:异步缓存网页内容用于后续分析
    cache_obj = multiprocessing.Process(target=cache_page, kwargs=data)
    cache_obj.start()  # 启动独立进程避免阻塞主线程
    rsp = 'caching'
elif msg_type == 'pop_url':
    # 处理 URL 添加请求:将新 URL 添加到弹窗列表
    rsp = update_pop_url(data['url'])
else:
    raise NotImplementedError

return JSONResponse(content=rsp)

**调用链路分析**:

| 深度 | / | 函数 | 作用 | 备注 |
|------|-------|------|------|------|
| 0 | `database_server` | `web_listening` | HTTP 请求入口 | 路由分发 |
| 1 | `database_server` | `change_checkbox_state` | 状态管理 | 文件 I/O |
| 1 | `database_server` | `cache_page` | 页面缓存 | 多进程执行 |
| 1 | `database_server` | `update_pop_url` | URL 管理 | JSON Lines 写入 |
| 2 | `qwen_agent.utils` | `save_text_to_file` | 文件保存 | 底层 I/O |

**关键函数详解**:

```python
def update_pop_url(url: str):
    """
    更新弹窗 URL 列表
    
    设计目的: 管理用户通过浏览器扩展添加的 URL
    作用域: 单个 URL 的添加和存储
    依赖前置条件: 工作空间目录已创建
    后置条件: URL 写入到 popup_url.jsonl 文件
    """
    # 检查文件类型,非文档类型需要生成本地路径
    if not get_file_type(url) in ['pdf', 'docx', 'pptx', 'txt']:
        url = os.path.join(
            server_config.path.download_root, 
            hash_sha256(url), 
            get_basename_from_url(url)
        )
    
    new_line = {'url': url}
    
    # 使用 JSON Lines 格式存储,便于追加和读取
    with jsonlines.open(cache_file_popup_url, mode='w') as writer:
        writer.write(new_line)
    
    return 'Update URL'
def cache_page(**kwargs):
    """
    异步缓存页面内容
    
    设计目的: 在后台处理网页内容的下载和解析
    作用域: 单个网页的完整处理流程
    并发策略: 使用多进程避免阻塞主线程
    错误处理: 异常时清理元数据,防止状态不一致
    """
    url = kwargs.get('url')
    title = kwargs.get('title', '')
    
    try:
        # 保存浏览元数据,标记为缓存中状态
        save_browsing_meta_data(url, title, meta_file)
        
        # 使用 Memory 模块处理文件内容
        mem = Memory()
        *_, last = mem.run([{'role': 'user', 'content': [{'file': url}]}])
        
        # 更新元数据为完成状态
        save_browsing_meta_data(url, title, meta_file, status='completed')
        
    except Exception:
        # 清理失败的元数据记录
        rm_browsing_meta_data(url, meta_file)

1.2 静态文件服务

路径: /static
功能: 提供代码解释器生成的文件访问

app.mount('/static', StaticFiles(directory=server_config.path.code_interpreter_ws), name='static')

2. Python API 接口

2.1 Agent 核心接口

API: Agent.run() 方法

入口函数: run (qwen_agent/agent.py:78-131)

def run(self, messages: List[Union[Dict, Message]], **kwargs) -> Iterator[List[Message]]:
    """
    智能体核心执行方法
    
    设计目的: 提供统一的消息处理和响应生成接口
    作用域: 所有智能体类型的通用入口点
    依赖前置条件: LLM 配置正确,工具已注册
    后置条件: 生成流式响应消息
    复杂度: 取决于具体智能体实现,通常 O(n*m) n=消息数,m=工具调用次数
    """
    # 消息格式标准化:统一转换为 Message 对象
    new_messages = []
    _return_message_type = 'dict'
    for msg in messages:
        if isinstance(msg, dict):
            new_messages.append(Message(**msg))
        else:
            new_messages.append(msg)
            _return_message_type = 'message'

    # 语言检测:根据消息内容自动检测中英文
    if 'lang' not in kwargs:
        if has_chinese_messages(new_messages):
            kwargs['lang'] = 'zh'
        else:
            kwargs['lang'] = 'en'

    # 系统消息注入:确保系统指令在消息列表开头
    if self.system_message:
        if not new_messages or new_messages[0][ROLE] != SYSTEM:
            new_messages.insert(0, Message(role=SYSTEM, content=self.system_message))
        else:
            # 合并已有系统消息
            if isinstance(new_messages[0][CONTENT], str):
                new_messages[0][CONTENT] = self.system_message + '\n\n' + new_messages[0][CONTENT]

    # 调用具体智能体的实现逻辑
    for rsp in self._run(messages=new_messages, **kwargs):
        # 设置响应消息的发送者名称
        for i in range(len(rsp)):
            if not rsp[i].name and self.name:
                rsp[i].name = self.name
        
        # 根据输入格式返回对应格式的响应
        if _return_message_type == 'message':
            yield [Message(**x) if isinstance(x, dict) else x for x in rsp]
        else:
            yield [x.model_dump() if not isinstance(x, dict) else x for x in rsp]

调用链路图:

flowchart TD
    A[Agent.run] --> B[消息格式化]
    B --> C[语言检测]
    C --> D[系统消息注入]
    D --> E[调用 _run 方法]
    E --> F{智能体类型}
    F -->|Assistant| G[Assistant._run]
    F -->|FnCallAgent| H[FnCallAgent._run]
    F -->|ReActChat| I[ReActChat._run]
    G --> J[文件处理]
    H --> K[函数调用]
    I --> L[ReAct 推理]
    J --> M[LLM 调用]
    K --> M
    L --> M
    M --> N[响应生成]

API: Assistant.run() 实现

入口函数: _run (qwen_agent/agents/assistant.py:120-180)

def _run(self, messages: List[Message], lang: str = 'en', **kwargs) -> Iterator[List[Message]]:
    """
    Assistant 智能体的核心执行逻辑
    
    设计目的: 实现支持 RAG 和工具调用的通用助手
    作用域: 处理文档问答、工具调用、多轮对话
    并发策略: 同步执行,支持流式输出
    I/O 策略: 文件读取采用缓存机制,减少重复解析
    """
    # 文件内容处理:解析用户上传的文档
    messages = self._format_file(messages, lang=lang)
    
    # 知识检索:如果配置了 RAG,先进行相关内容检索
    if self.rag_cfg and any(self._has_file_content(msg) for msg in messages):
        knowledge = self._retrieve_knowledge(messages, **kwargs)
        messages = self._prepend_knowledge_prompt(messages, lang, knowledge)
    
    # 多轮对话循环:支持工具调用的迭代执行
    num_llm_calls_available = MAX_LLM_CALL_PER_RUN
    response = []
    
    while num_llm_calls_available > 0:
        num_llm_calls_available -= 1
        
        # 调用 LLM 生成响应
        output_stream = self._call_llm(
            messages=messages + response,
            functions=[func.function for func in self.function_map.values()]
        )
        
        # 流式输出处理
        output: List[Message] = []
        for output in output_stream:
            if output:
                yield response + output
        
        if output:
            response.extend(output)
        
        # 工具调用检测和执行
        use_tool, action, action_input, _ = self._detect_tool(response[-1])
        if use_tool:
            # 执行工具调用
            observation = self._call_tool(action, action_input, messages=messages)
            
            # 构造工具执行结果消息
            fn_msg = Message(
                role=FUNCTION,
                name=action,
                content=observation,
            )
            response.append(fn_msg)
            yield response
        else:
            # 无需工具调用,结束循环
            break

2.2 工具系统接口

API: BaseTool.call() 方法

入口函数: call (qwen_agent/tools/base.py:45-55)

def call(self, params: str, **kwargs) -> str:
    """
    工具调用的统一接口
    
    设计目的: 为所有工具提供标准化的调用方式
    作用域: 单个工具的执行逻辑
    依赖前置条件: 参数格式正确,工具已正确初始化
    后置条件: 返回工具执行结果的字符串表示
    错误处理: 子类需要处理参数解析和执行异常
    """
    raise NotImplementedError

API: 代码解释器工具

入口函数: call (qwen_agent/tools/code_interpreter.py:180-220)

def call(self, params: str, **kwargs) -> str:
    """
    代码解释器工具的核心执行方法
    
    设计目的: 在安全的 Jupyter 环境中执行 Python 代码
    作用域: Python 代码的解析、执行和结果收集
    并发策略: 使用独立的 Jupyter 内核,支持会话状态保持
    安全考虑: 代码在隔离环境中执行,但仍需注意恶意代码风险
    """
    try:
        # 解析工具调用参数
        params = json5.loads(params)
        code = params.get('code', '')
        
        if not code.strip():
            return json5.dumps({'error': 'Empty code provided'}, ensure_ascii=False)
        
        # 初始化或获取 Jupyter 内核
        if not hasattr(self, '_kernel_manager'):
            self._init_kernel()
        
        # 执行代码并收集结果
        execution_result = self._execute_code(code)
        
        # 处理执行结果:包括标准输出、错误输出、图像等
        result = {
            'stdout': execution_result.get('stdout', ''),
            'stderr': execution_result.get('stderr', ''),
            'display_data': execution_result.get('display_data', []),
            'execution_count': execution_result.get('execution_count', 0)
        }
        
        # 如果有图像输出,保存到静态文件目录
        if result['display_data']:
            result['image_urls'] = self._save_images(result['display_data'])
        
        return json5.dumps(result, ensure_ascii=False)
        
    except Exception as e:
        error_result = {
            'error': str(e),
            'traceback': traceback.format_exc()
        }
        return json5.dumps(error_result, ensure_ascii=False)

3. Web UI 接口

3.1 Gradio 界面接口

API: 聊天界面处理

入口函数: bot (qwen_server/assistant_server.py:87-106)

def bot(history):
    """
    Gradio 聊天界面的消息处理函数
    
    设计目的: 处理用户在 Web 界面的聊天请求
    作用域: 单轮对话的完整处理流程
    依赖前置条件: Assistant 实例已初始化,页面 URL 已设置
    后置条件: 生成流式聊天响应并保存历史记录
    """
    page_url = set_url()  # 获取当前浏览的页面 URL
    
    if not history:
        yield history
    else:
        # 构造消息:结合用户输入和当前页面内容
        messages = [{
            'role': 'user', 
            'content': [
                {'text': history[-1][0]},    # 用户文本输入
                {'file': page_url}           # 当前页面文件
            ]
        }]
        
        history[-1][1] = ''  # 初始化助手响应
        
        try:
            # 调用 Assistant 生成响应
            response = assistant.run(
                messages=messages, 
                max_ref_token=server_config.server.max_ref_token
            )
            
            # 流式更新界面
            for rsp in response:
                if rsp:
                    history[-1][1] = rsp[-1]['content']
                    yield history
                    
        except ModelServiceError as ex:
            history[-1][1] = str(ex)
            yield history
        except Exception as ex:
            raise ValueError(ex)
        
        # 保存对话历史
        save_history(history, page_url, history_dir)

3.2 工作站界面接口

API: 内容生成接口

入口函数: generate (qwen_server/workstation_server.py:323-384)

def generate(context):
    """
    工作站的智能内容生成函数
    
    设计目的: 根据用户输入和特殊指令路由到不同的处理逻辑
    作用域: 支持代码执行、插件调用、文章写作等多种模式
    路由策略: 基于特殊标记(/code, /plug, /title)进行功能路由
    """
    sp_query = get_last_one_line_context(context)  # 获取最后一行有效内容
    
    if CODE_FLAG in sp_query:  # /code 标记:路由到代码解释器
        sp_query = sp_query.split(CODE_FLAG)[-1]
        
        # 根据语言添加工具使用提示
        if has_chinese_chars(sp_query):
            sp_query += ', 必须使用code_interpreter工具'
        else:
            sp_query += ' (Please use code_interpreter.)'
        
        # 使用 ReActChat 智能体执行代码
        func_assistant = ReActChat(
            function_list=['code_interpreter'], 
            llm=llm_config
        )
        
        try:
            response = func_assistant.run(
                messages=[{'role': 'user', 'content': sp_query}]
            )
            for rsp in response:
                if rsp:
                    yield rsp[-1]['content']
        except ModelServiceError as ex:
            yield str(ex)
            
    elif PLUGIN_FLAG in sp_query:  # /plug 标记:路由到插件系统
        sp_query = sp_query.split(PLUGIN_FLAG)[-1]
        
        # 使用多工具智能体
        func_assistant = ReActChat(
            function_list=['code_interpreter', 'image_gen'], 
            llm=llm_config
        )
        
        try:
            response = func_assistant.run(
                messages=[{'role': 'user', 'content': sp_query}]
            )
            for rsp in response:
                if rsp:
                    yield rsp[-1]['content']
        except ModelServiceError as ex:
            yield str(ex)
            
    else:  # 默认:路由到文章写作
        sp_query_no_title = context
        full_article = False
        
        if TITLE_FLAG in sp_query:  # /title 标记:生成完整文章
            sp_query_no_title = sp_query.split(TITLE_FLAG)[-1]
            full_article = True
        
        try:
            # 使用 ArticleAgent 进行写作
            writing_assistant = ArticleAgent(llm=llm_config)
            
            content = [{'text': sp_query_no_title}]
            
            # 添加选中的浏览历史作为参考
            for record in read_meta_data_by_condition(
                meta_file, 
                time_limit=app_global_para['time'], 
                checked=True
            ):
                content.append({'file': record['url']})
            
            response = writing_assistant.run(
                messages=[{'role': 'user', 'content': content}],
                max_ref_token=server_config.server.max_ref_token,
                full_article=full_article
            )
            
            for rsp in response:
                if rsp:
                    yield '\n'.join([x['content'] for x in rsp])
                    
        except ModelServiceError as ex:
            yield str(ex)

4. 命令行接口

4.1 服务启动接口

入口函数: main (run_server.py:97-155)

def main():
    """
    服务器启动的主入口函数
    
    设计目的: 统一管理多个服务进程的启动和配置
    作用域: 整个服务集群的生命周期管理
    进程管理: 使用 subprocess 管理多个独立服务
    信号处理: 优雅处理 SIGINT 和 SIGTERM 信号
    """
    # 解析命令行参数
    args = parse_args()
    
    # 加载和更新配置
    server_config_path = Path(__file__).resolve().parent / 'qwen_server/server_config.json'
    with open(server_config_path, 'r') as f:
        server_config = json.load(f)
        server_config = GlobalConfig(**server_config)
    
    server_config = update_config(server_config, args, server_config_path)
    
    # 创建必要的工作目录
    os.makedirs(server_config.path.work_space_root, exist_ok=True)
    os.makedirs(server_config.path.download_root, exist_ok=True)
    os.makedirs(server_config.path.code_interpreter_ws, exist_ok=True)
    
    # 设置环境变量
    code_interpreter_work_dir = str(
        Path(__file__).resolve().parent / server_config.path.code_interpreter_ws
    )
    os.environ['M6_CODE_INTERPRETER_WORK_DIR'] = code_interpreter_work_dir
    
    # 启动多个服务进程
    servers = {
        'database': subprocess.Popen([
            sys.executable,
            os.path.join(os.getcwd(), 'qwen_server/database_server.py'),
        ]),
        'workstation': subprocess.Popen([
            sys.executable,
            os.path.join(os.getcwd(), 'qwen_server/workstation_server.py'),
        ]),
        'assistant': subprocess.Popen([
            sys.executable,
            os.path.join(os.getcwd(), 'qwen_server/assistant_server.py'),
        ]),
    }
    
    # 信号处理:优雅关闭所有服务
    def signal_handler(sig_num, _frame):
        for v in servers.values():
            v.terminate()  # 发送终止信号
        for k in list(servers.keys()):
            del servers[k]
        if sig_num == signal.SIGINT:
            raise KeyboardInterrupt()
    
    # 注册信号处理器
    append_signal_handler(signal.SIGINT, signal_handler)
    append_signal_handler(signal.SIGTERM, signal_handler)
    
    # 等待所有进程结束
    for p in list(servers.values()):
        p.wait()

5. API 性能和可观测性

5.1 性能指标

API 类型 预期延迟 QPS 限制 内存占用
HTTP API < 100ms 100/s < 100MB
Agent.run() 2-10s 10/s 200-500MB
工具调用 1-30s 5/s 100-1GB
Web UI < 200ms 50/s < 200MB

5.2 错误处理

所有 API 都实现了统一的错误处理机制:

try:
    # API 执行逻辑
    result = execute_api_logic()
    return success_response(result)
except ModelServiceError as e:
    # LLM 服务错误:用户可见,可重试
    return error_response(str(e), error_type='model_service')
except ValidationError as e:
    # 参数验证错误:用户可修复
    return error_response(str(e), error_type='validation')
except Exception as e:
    # 系统内部错误:记录日志,返回通用错误信息
    logger.error(f"Internal error: {e}", exc_info=True)
    return error_response("Internal server error", error_type='internal')

5.3 可观测性

  • 日志记录: 所有 API 调用都有详细的日志记录
  • 性能监控: 关键路径的执行时间统计
  • 错误追踪: 异常堆栈和上下文信息记录
  • 资源监控: 内存、CPU 使用情况监控

验收清单

  • HTTP API 接口完整列举
  • Python API 核心方法分析
  • Web UI 接口处理流程
  • 命令行接口说明
  • 每个 API 的调用链路图
  • 关键函数代码片段和注释
  • 错误处理和性能指标
  • 可观测性说明