RAGFlow-08-Plugin Module

Module Overview

1.1 Role and Responsibilities

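The Plugin module is RAGFlow's plugin system. It lets the set of tools available to LLMs be extended dynamically. Its main responsibilities are:

  1. Plugin loading: dynamically load Python plugins and MCP (Model Context Protocol) plugins
  2. Tool registration: register plugins as LLM tools that Agents can call
  3. Metadata management: plugin name, description, and parameter schema
  4. Lifecycle management: plugin initialization, invocation, and teardown
  5. Built-in plugins: commonly used plugins (web search, database queries, file operations)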

1.2 Plugin Types

Python plugins:

  • Subclass the LLMToolPlugin base class
  • Implement the execute() method
  • Live in the plugin/embedded_plugins/ directory

MCP plugins (Model Context Protocol):

  • Run as separate processes (communicating over stdio or HTTP)
  • Use a standardized protocol (JSON-RPC 2.0)
  • Support remote plugins (e.g. GitHub, Notion, Slack)
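To make the stdio transport concrete, the sketch below builds a `tools/call` request and parses a reply as newline-delimited JSON-RPC 2.0 messages (one JSON object per line). The helper names `encode_request` and `decode_message` are hypothetical and are not part of RAGFlow or any MCP SDK; the tool name and arguments in the demo are illustrative only.

```python
import json
from itertools import count
from typing import Any, Dict, Optional

# Request ids only need to be unique per connection; responses echo them back.
_ids = count(1)


def encode_request(method: str, params: Optional[Dict[str, Any]] = None) -> str:
    """Serialize one JSON-RPC 2.0 request as a single newline-terminated line."""
    message: Dict[str, Any] = {"jsonrpc": "2.0", "id": next(_ids), "method": method}
    if params is not None:
        message["params"] = params
    # json.dumps escapes newlines inside strings, so one message is exactly one line.
    return json.dumps(message) + "\n"


def decode_message(line: str) -> Dict[str, Any]:
    """Parse one line received from the server and check the protocol marker."""
    message = json.loads(line)
    if message.get("jsonrpc") != "2.0":
        raise ValueError("not a JSON-RPC 2.0 message")
    return message


if __name__ == "__main__":
    # Illustrative arguments; real tools define their own inputSchema.
    print(encode_request("tools/call", {"name": "list_issues",
                                        "arguments": {"state": "open"}}), end="")
    reply = decode_message('{"jsonrpc": "2.0", "id": 1, "result": '
                           '{"content": [{"type": "text", "text": "2 open issues"}]}}')
    print(reply["result"]["content"][0]["text"])
```

Because each message is exactly one line, both sides can frame messages with a plain `readline()`; no length prefix is needed.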

1.3 Module Architecture Diagram

flowchart TB
    subgraph "Agent Component"
        AgentWithTools[Agent with Tools]
    end

    subgraph "Plugin Manager"
        PluginManager[PluginManager]
        PluginLoader[PluginLoader]
        MCPClient[MCP Client]
    end

    subgraph "Plugin Sources"
        EmbeddedPlugins[Built-in plugins<br/>plugin/embedded_plugins/]
        CustomPlugins[Custom plugins<br/>user-uploaded]
        MCPServers[MCP Servers<br/>remote plugins]
    end

    subgraph "Built-in Plugin Examples"
        WebSearch[Web Search<br/>Tavily/DuckDuckGo]
        DatabaseQuery[Database Query<br/>SQL execution]
        FileOps[File Operations<br/>read/write files]
        APICall[API Call<br/>HTTP requests]
    end

    subgraph "MCP Plugin Examples"
        GitHub[GitHub MCP<br/>repository management]
        Notion[Notion MCP<br/>note management]
        Slack[Slack MCP<br/>messaging]
    end

    AgentWithTools --> PluginManager
    
    PluginManager --> PluginLoader
    PluginManager --> MCPClient
    
    PluginLoader --> EmbeddedPlugins
    PluginLoader --> CustomPlugins
    MCPClient --> MCPServers
    
    EmbeddedPlugins --> WebSearch
    EmbeddedPlugins --> DatabaseQuery
    EmbeddedPlugins --> FileOps
    EmbeddedPlugins --> APICall
    
    MCPServers --> GitHub
    MCPServers --> Notion
    MCPServers --> Slack

2. Core Features in Detail

2.1 LLM Tool Plugin Base Class

2.1.1 Core Code

# plugin/llm_tool_plugin.py
from abc import ABC, abstractmethod
from typing import Any, Dict, List

class LLMToolPlugin(ABC):
    """Base class for LLM tool plugins"""
    
    @property
    @abstractmethod
    def name(self) -> str:
        """Plugin name (unique identifier)"""
        pass
    
    @property
    @abstractmethod
    def description(self) -> str:
        """Plugin description (read by the LLM to decide when to call the tool)"""
        pass
    
    @property
    @abstractmethod
    def parameters_schema(self) -> Dict[str, Any]:
        """Parameter schema (in JSON Schema format)"""
        pass
    
    @abstractmethod
    def execute(self, **kwargs) -> str:
        """Run the plugin logic and return the result as text"""
        pass
    
    def get_openai_tool_schema(self) -> Dict[str, Any]:
        """Convert to an OpenAI tool schema"""
        return {
            "type": "function",
            "function": {
                "name": self.name,
                "description": self.description,
                "parameters": self.parameters_schema
            }
        }

2.2 Built-in Plugin Example: Web Search

2.2.1 Core Code

# plugin/embedded_plugins/web_search_plugin.py
from plugin.llm_tool_plugin import LLMToolPlugin
import requests
import os

class WebSearchPlugin(LLMToolPlugin):
    """Web search plugin (uses the Tavily API)"""
    
    @property
    def name(self) -> str:
        return "web_search"
    
    @property
    def description(self) -> str:
        return "Search the web for information using Tavily search engine. Use this when you need up-to-date information from the internet."
    
    @property
    def parameters_schema(self) -> dict:
        return {
            "type": "object",
            "properties": {
                "query": {
                    "type": "string",
                    "description": "The search query"
                },
                "max_results": {
                    "type": "integer",
                    "description": "Maximum number of results to return",
                    "default": 5
                }
            },
            "required": ["query"]
        }
    
    def execute(self, query: str, max_results: int = 5) -> str:
        """Run the search"""
        api_key = os.getenv("TAVILY_API_KEY")
        if not api_key:
            return "Error: TAVILY_API_KEY not set"
        
        try:
            # Call the Tavily search API
            response = requests.post(
                "https://api.tavily.com/search",
                json={
                    "api_key": api_key,
                    "query": query,
                    "max_results": max_results
                },
                timeout=30
            )
            
            if response.status_code != 200:
                return f"Error: HTTP {response.status_code}: {response.text}"
            
            # Format the results; .get() tolerates hits with missing fields
            results = response.json().get("results", [])
            formatted = []
            for i, result in enumerate(results[:max_results], 1):
                formatted.append(
                    f"{i}. {result.get('title', '')}\n"
                    f"   URL: {result.get('url', '')}\n"
                    f"   {result.get('content', '')}\n"
                )
            
            return "\n".join(formatted) if formatted else "No results found"
        
        except Exception as e:
            # Network errors, timeouts, or a non-JSON response body
            return f"Error: {e}"
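One way to make the formatting step testable without a network connection or an API key is to pull it into a pure function. The sketch below assumes the same `title`/`url`/`content` fields used in `execute` above; `format_search_results` is a hypothetical helper, not an existing RAGFlow function.

```python
from typing import Any, Dict, List


def format_search_results(results: List[Dict[str, Any]], max_results: int = 5) -> str:
    """Render search hits as a numbered list, tolerating missing fields."""
    lines = []
    for i, result in enumerate(results[:max_results], 1):
        title = result.get("title") or "(untitled)"
        url = result.get("url") or "(no URL)"
        content = (result.get("content") or "").strip()
        lines.append(f"{i}. {title}\n   URL: {url}\n   {content}\n")
    return "\n".join(lines) if lines else "No results found"


if __name__ == "__main__":
    sample = [
        {"title": "RAGFlow", "url": "https://github.com/infiniflow/ragflow",
         "content": "Open-source RAG engine."},
        {"url": "https://example.com"},  # missing title and content
    ]
    print(format_search_results(sample, max_results=5))
```

`execute` would then only handle the HTTP call and delegate to this function, so the formatting rules can be covered by ordinary unit tests.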

2.3 Plugin Manager

2.3.1 Core Code

# plugin/plugin_manager.py
import importlib
import importlib.util  # `import importlib` alone does not guarantee importlib.util is loaded
import os
from typing import Dict, List, Optional
from plugin.llm_tool_plugin import LLMToolPlugin

class PluginManager:
    """Plugin manager: loads plugins and dispatches calls by plugin name"""
    
    def __init__(self):
        self.plugins: Dict[str, LLMToolPlugin] = {}
    
    def load_embedded_plugins(self):
        """Load the built-in plugins"""
        plugin_dir = os.path.join(os.path.dirname(__file__), "embedded_plugins")
        
        for filename in os.listdir(plugin_dir):
            if not filename.endswith("_plugin.py"):
                continue
            
            # Import the module dynamically
            module_name = filename[:-3]  # strip ".py"
            module = importlib.import_module(f"plugin.embedded_plugins.{module_name}")
            
            # Find the plugin classes
            for attr_name in dir(module):
                attr = getattr(module, attr_name)
                if (isinstance(attr, type) and 
                    issubclass(attr, LLMToolPlugin) and 
                    attr is not LLMToolPlugin):
                    
                    # Instantiate and register the plugin under its name
                    plugin = attr()
                    self.plugins[plugin.name] = plugin
                    print(f"Loaded plugin: {plugin.name}")
    
    def load_custom_plugin(self, plugin_path: str):
        """Load a custom plugin"""
        # Load the plugin module from a file path
        spec = importlib.util.spec_from_file_location("custom_plugin", plugin_path)
        if spec is None or spec.loader is None:
            raise ImportError(f"Cannot load plugin from {plugin_path}")
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        
        # Find the plugin classes
        for attr_name in dir(module):
            attr = getattr(module, attr_name)
            if (isinstance(attr, type) and 
                issubclass(attr, LLMToolPlugin) and 
                attr is not LLMToolPlugin):
                
                plugin = attr()
                self.plugins[plugin.name] = plugin
                print(f"Loaded custom plugin: {plugin.name}")
    
    def get_plugin(self, name: str) -> Optional[LLMToolPlugin]:
        """Return a plugin by name, or None if it is not registered"""
        return self.plugins.get(name)
    
    def list_plugins(self) -> List[str]:
        """List the names of all registered plugins"""
        return list(self.plugins.keys())
    
    def get_all_tool_schemas(self) -> List[Dict]:
        """Return the OpenAI tool schemas of all plugins"""
        return [plugin.get_openai_tool_schema() for plugin in self.plugins.values()]
    
    def execute_plugin(self, name: str, **kwargs) -> str:
        """Execute a plugin by name; errors are returned as text for the LLM"""
        plugin = self.get_plugin(name)
        if not plugin:
            return f"Error: Plugin '{name}' not found"
        
        try:
            return plugin.execute(**kwargs)
        except Exception as e:
            return f"Error executing plugin '{name}': {str(e)}"

# Global singleton
plugin_manager = PluginManager()
plugin_manager.load_embedded_plugins()
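A `dir(module)` scan for `LLMToolPlugin` subclasses matches every such class visible in the module, including classes that a plugin file merely imports and abstract intermediate base classes (which raise `TypeError` when instantiated). A stricter discovery filter can be sketched with `inspect`; `ToolBase` stands in for `LLMToolPlugin` so the example runs on its own, and `find_plugin_classes` is a hypothetical helper.

```python
import inspect
import types
from abc import ABC, abstractmethod
from typing import List


class ToolBase(ABC):
    """Stand-in for LLMToolPlugin so the sketch is self-contained."""

    @abstractmethod
    def execute(self, **kwargs) -> str: ...


def find_plugin_classes(module: types.ModuleType, base: type) -> List[type]:
    """Return concrete subclasses of `base` that are *defined* in `module`."""
    found = []
    for _, obj in inspect.getmembers(module, inspect.isclass):
        if (issubclass(obj, base)
                and obj is not base
                and not inspect.isabstract(obj)          # skip abstract helpers
                and obj.__module__ == module.__name__):  # skip imported classes
            found.append(obj)
    return found


if __name__ == "__main__":
    # Build a throwaway module in memory to exercise the filter.
    demo = types.ModuleType("demo_plugin")
    demo.ToolBase = ToolBase  # imported base class: must be ignored
    exec(
        "class Partial(ToolBase):\n"
        "    pass\n"
        "class Echo(ToolBase):\n"
        "    def execute(self, **kwargs):\n"
        "        return str(kwargs)\n",
        demo.__dict__,
    )
    print([cls.__name__ for cls in find_plugin_classes(demo, ToolBase)])
```

Checking `__module__` means a helper class shared across several plugin files is registered once, from the file that defines it.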

2.4 MCP Plugin Integration

2.4.1 MCP Client

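# mcp/client/mcp_client.py
import json
import os
import subprocess
from typing import Any, Dict, List, Optional

class MCPClient:
    """MCP (Model Context Protocol) client over stdio"""
    
    def __init__(self, server_command: List[str], env: Optional[Dict[str, str]] = None):
        """
        Args:
            server_command: command that starts the MCP server, e.g. ["node", "server.js"]
            env: extra environment variables for the server process (e.g. API tokens)
        """
        self.server_command = server_command
        self.env = env or {}
        self.process = None
        self._next_id = 0
    
    def start(self):
        """Start the MCP server and perform the initialize handshake"""
        self.process = subprocess.Popen(
            self.server_command,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=None,  # inherit stderr; an unread PIPE can fill up and block the server
            env={**os.environ, **self.env},
            text=True,
            bufsize=1
        )
        # MCP requires initialize -> notifications/initialized before other requests
        self._request("initialize", {
            "protocolVersion": "2024-11-05",
            "capabilities": {},
            "clientInfo": {"name": "ragflow", "version": "0.1"}  # placeholder identity
        })
        self._send({"jsonrpc": "2.0", "method": "notifications/initialized"})
    
    def _send(self, message: Dict[str, Any]):
        """Write one newline-delimited JSON-RPC message"""
        self.process.stdin.write(json.dumps(message) + "\n")
        self.process.stdin.flush()
    
    def _request(self, method: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Send a request and wait for the response carrying the same id"""
        self._next_id += 1
        request_id = self._next_id
        message = {"jsonrpc": "2.0", "id": request_id, "method": method}
        if params is not None:
            message["params"] = params
        self._send(message)
        
        while True:
            line = self.process.stdout.readline()
            if not line:
                raise ConnectionError("MCP server closed stdout")
            if not line.strip():
                continue
            response = json.loads(line)
            # Skip notifications and server-initiated requests (they carry "method")
            if response.get("id") == request_id and "method" not in response:
                return response
    
    def call_tool(self, tool_name: str, arguments: Dict[str, Any]) -> str:
        """Call an MCP tool and return its text output"""
        response = self._request("tools/call", {"name": tool_name, "arguments": arguments})
        if "error" in response:
            return f"Error: {response['error'].get('message', 'unknown error')}"
        
        result = response.get("result", {})
        # content is a list of typed items (possibly empty); join the text items
        text = "\n".join(item.get("text", "") for item in result.get("content", [])
                         if item.get("type") == "text")
        if result.get("isError"):
            return f"Error: {text}"
        return text
    
    def list_tools(self) -> List[Dict]:
        """List all tools exposed by the server"""
        response = self._request("tools/list")
        return response.get("result", {}).get("tools", [])
    
    def stop(self):
        """Stop the MCP server"""
        if self.process:
            self.process.terminate()
            self.process.wait()
            self.process = None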
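A client that reads exactly one line per request implicitly assumes the server writes nothing but the matching response. MCP servers may also emit notifications on stdout (for example `notifications/message` log entries), so a more defensive reader skips lines until it sees the response whose `id` matches. The sketch below isolates that loop; `read_response` is a hypothetical helper, and an `io.StringIO` stands in for the server's stdout.

```python
import io
import json
from typing import Any, Dict, TextIO


def read_response(stream: TextIO, request_id: int) -> Dict[str, Any]:
    """Read JSON-RPC lines until the response whose id matches `request_id`.

    Notifications (no id), server-initiated requests (they carry "method"),
    blank lines, and responses to other ids are skipped.
    """
    while True:
        line = stream.readline()
        if not line:  # EOF: the server exited or closed stdout
            raise ConnectionError("MCP server closed the stream")
        if not line.strip():
            continue
        message = json.loads(line)
        if message.get("id") == request_id and "method" not in message:
            return message


if __name__ == "__main__":
    fake_stdout = io.StringIO(
        '{"jsonrpc": "2.0", "method": "notifications/message", "params": {"level": "info"}}\n'
        '{"jsonrpc": "2.0", "id": 2, "result": {"tools": []}}\n'
    )
    print(read_response(fake_stdout, request_id=2))
```

Note that skipped server-initiated requests go unanswered here; a full client would have to respond to them.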

2.4.2 MCP Plugin Adapter

# plugin/mcp_plugin_adapter.py
from plugin.llm_tool_plugin import LLMToolPlugin
from mcp.client.mcp_client import MCPClient
from typing import Dict, Any

class MCPPluginAdapter(LLMToolPlugin):
    """Adapts an MCP tool to the LLMToolPlugin interface"""
    
    def __init__(self, mcp_client: MCPClient, tool_meta: Dict[str, Any]):
        self.mcp_client = mcp_client
        self.tool_meta = tool_meta  # one entry from the server's tools/list result
    
    @property
    def name(self) -> str:
        return self.tool_meta["name"]
    
    @property
    def description(self) -> str:
        return self.tool_meta.get("description", "")
    
    @property
    def parameters_schema(self) -> Dict[str, Any]:
        # Fall back to an object schema with no properties if inputSchema is missing
        return self.tool_meta.get("inputSchema") or {"type": "object", "properties": {}}
    
    def execute(self, **kwargs) -> str:
        """Execute the MCP tool through the shared client"""
        return self.mcp_client.call_tool(self.name, kwargs)
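Because `Agent` keys its tools by name, two MCP servers that both expose a tool called, say, `search` would silently overwrite each other. The sketch below converts `tools/list` entries to OpenAI-style function schemas and optionally prefixes each name with a server label; the prefix convention and the helper `mcp_tools_to_openai` are assumptions for illustration, not RAGFlow behavior.

```python
from typing import Any, Dict, List


def _empty_object_schema() -> Dict[str, Any]:
    # A fresh dict per tool, so later mutation of one schema cannot leak into another.
    return {"type": "object", "properties": {}}


def mcp_tools_to_openai(tools: List[Dict[str, Any]], prefix: str = "") -> List[Dict[str, Any]]:
    """Convert MCP `tools/list` entries to OpenAI-style function tool schemas."""
    schemas = []
    for tool in tools:
        schemas.append({
            "type": "function",
            "function": {
                "name": f"{prefix}{tool['name']}",
                "description": tool.get("description", ""),
                "parameters": tool.get("inputSchema") or _empty_object_schema(),
            },
        })
    return schemas


if __name__ == "__main__":
    listed = [
        {"name": "list_issues", "description": "List issues",
         "inputSchema": {"type": "object", "properties": {"state": {"type": "string"}}}},
        {"name": "ping"},  # no description, no inputSchema
    ]
    for schema in mcp_tools_to_openai(listed, prefix="github__"):
        print(schema["function"]["name"], schema["function"]["parameters"])
```

If a prefix is used, the dispatcher has to strip it again before calling `tools/call`, since the MCP server only knows the unprefixed name.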

2.5 Agent Integration

2.5.1 Using Plugins in an Agent

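# agent/component/agent_with_tools.py (excerpt)
import json

from mcp.client.mcp_client import MCPClient
from plugin.mcp_plugin_adapter import MCPPluginAdapter
from plugin.plugin_manager import plugin_manager

# LLM is the base LLM component class, defined elsewhere in agent/component
class Agent(LLM):
    def __init__(self, canvas, id, param):
        # Assumed: the base class sets self._param, self.chat_mdl and self.history
        super().__init__(canvas, id, param)
        
        # 1. Load Python plugins
        self.tools = {}
        for plugin_name in param.plugins:
            plugin = plugin_manager.get_plugin(plugin_name)
            if plugin:
                self.tools[plugin.name] = plugin
        
        # 2. Load MCP plugins
        for mcp_config in param.mcp_servers:
            mcp_client = MCPClient(mcp_config["command"])
            mcp_client.start()
            
            # Register every tool the server exposes
            for tool_meta in mcp_client.list_tools():
                adapter = MCPPluginAdapter(mcp_client, tool_meta)
                self.tools[adapter.name] = adapter
        
        # 3. Build the tool schemas passed to the LLM
        self.tool_schemas = [tool.get_openai_tool_schema() for tool in self.tools.values()]
    
    def _invoke(self, **kwargs):
        # Call the LLM and pass the tool schemas
        for response in self.chat_mdl.chat_streamly_with_tools(
            system=self._param.sys_prompt,
            messages=self.history,
            tools=self.tool_schemas
        ):
            if response.get("tool_calls"):
                # The LLM decided to call tools. Record its turn first: each tool
                # message must follow the assistant message that holds its tool_call id.
                self.history.append({
                    "role": "assistant",
                    "content": response.get("content"),
                    "tool_calls": response["tool_calls"]
                })
                for tool_call in response["tool_calls"]:
                    tool_name = tool_call["function"]["name"]
                    tool_args = json.loads(tool_call["function"]["arguments"] or "{}")
                    
                    # Execute the plugin; an unknown tool name is reported back, not raised
                    tool = self.tools.get(tool_name)
                    tool_result = (tool.execute(**tool_args) if tool
                                   else f"Error: unknown tool '{tool_name}'")
                    
                    # Inject the result into the conversation history
                    self.history.append({
                        "role": "tool",
                        "tool_call_id": tool_call["id"],
                        "content": tool_result
                    })
        # After tool results are appended, the LLM must be called again with the
        # updated history to produce the final answer (omitted in this excerpt).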
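A complete tool-calling loop has to call the model again after appending tool results, and stop once the model answers without requesting any tool. The non-streaming sketch below shows that control flow. `chat` is a stand-in for the model call (it is not RAGFlow's `chat_streamly_with_tools`), tools are plain callables, and `run_tool_loop` and `max_rounds` are hypothetical names.

```python
import json
from typing import Any, Callable, Dict, List

Message = Dict[str, Any]


def run_tool_loop(
    chat: Callable[[List[Message]], Message],
    tools: Dict[str, Callable[..., Any]],
    messages: List[Message],
    max_rounds: int = 5,
) -> str:
    """Call the model, run requested tools, feed results back; repeat until done.

    `messages` is extended in place with the assistant and tool turns.
    """
    for _ in range(max_rounds):
        reply = chat(messages)
        tool_calls = reply.get("tool_calls") or []
        if not tool_calls:
            return reply.get("content") or ""
        # The assistant turn carrying tool_calls must precede the tool results.
        messages.append({"role": "assistant", "content": reply.get("content"),
                         "tool_calls": tool_calls})
        for call in tool_calls:
            name = call["function"]["name"]
            if name not in tools:
                result = f"Error: unknown tool '{name}'"
            else:
                try:
                    args = json.loads(call["function"].get("arguments") or "{}")
                    result = str(tools[name](**args))
                except Exception as exc:  # report failures to the model instead of crashing
                    result = f"Error executing '{name}': {exc}"
            messages.append({"role": "tool", "tool_call_id": call["id"], "content": result})
    return "Error: tool-call limit reached"


if __name__ == "__main__":
    def fake_chat(msgs: List[Message]) -> Message:
        # First turn requests a tool; once a tool result is present, answer.
        if msgs and msgs[-1]["role"] == "tool":
            return {"content": f"The tool said: {msgs[-1]['content']}"}
        return {"content": None, "tool_calls": [{
            "id": "call_1", "type": "function",
            "function": {"name": "add", "arguments": '{"a": 2, "b": 3}'}}]}

    history = [{"role": "user", "content": "What is 2 + 3?"}]
    print(run_tool_loop(fake_chat, {"add": lambda a, b: a + b}, history))
```

The `max_rounds` cap guards against a model that keeps requesting tools indefinitely.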

3. Custom Plugin Development

3.1 Development Steps

1. Create the plugin file

# my_custom_plugin.py
from plugin.llm_tool_plugin import LLMToolPlugin

class MyCustomPlugin(LLMToolPlugin):
    @property
    def name(self) -> str:
        return "my_tool"
    
    @property
    def description(self) -> str:
        return "Description of what this tool does"
    
    @property
    def parameters_schema(self) -> dict:
        return {
            "type": "object",
            "properties": {
                "param1": {"type": "string", "description": "..."},
                "param2": {"type": "integer", "description": "..."}
            },
            "required": ["param1"]
        }
    
    def execute(self, param1: str, param2: int = 0) -> str:
        # Implement the tool logic here
        result = f"Processing {param1} with {param2}"
        return result
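The `parameters_schema` above repeats information already present in the signature of `execute`. As an optional convenience, the basic structure (types, required parameters, defaults) can be derived with `inspect`. `schema_from_signature` is a hypothetical helper, it handles only simple built-in annotations, and the per-parameter `description` fields that help the LLM still have to be added by hand.

```python
import inspect
from typing import Any, Callable, Dict

# Python annotation -> JSON Schema type (simple cases only).
_JSON_TYPES = {str: "string", int: "integer", float: "number", bool: "boolean",
               list: "array", dict: "object"}


def schema_from_signature(func: Callable[..., Any]) -> Dict[str, Any]:
    """Derive a JSON Schema object from a function's annotated parameters."""
    properties: Dict[str, Any] = {}
    required = []
    for name, param in inspect.signature(func).parameters.items():
        if name == "self" or param.kind in (inspect.Parameter.VAR_POSITIONAL,
                                            inspect.Parameter.VAR_KEYWORD):
            continue
        # Unannotated or string annotations (e.g. `from __future__ import annotations`)
        # fall back to "string".
        prop: Dict[str, Any] = {"type": _JSON_TYPES.get(param.annotation, "string")}
        if param.default is inspect.Parameter.empty:
            required.append(name)
        else:
            prop["default"] = param.default
        properties[name] = prop
    return {"type": "object", "properties": properties, "required": required}


if __name__ == "__main__":
    def execute(self, param1: str, param2: int = 0) -> str:
        return f"Processing {param1} with {param2}"

    print(schema_from_signature(execute))
```

For the `MyCustomPlugin.execute` signature this yields `param1` as a required string and `param2` as an optional integer with default 0, matching the hand-written schema apart from the descriptions.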

2. Load the plugin

from plugin.plugin_manager import plugin_manager

plugin_manager.load_custom_plugin("/path/to/my_custom_plugin.py")
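`load_custom_plugin` imports every file under the same module name, `custom_plugin`. A variant that derives a unique name from the file stem and registers the module in `sys.modules` (the pattern the Python `importlib` documentation shows for importing a source file directly) is sketched below. The `ragflow_custom_` prefix and `load_module_from_path` are hypothetical. Keep in mind that loading a file executes it, so only trusted files should be loaded.

```python
import importlib.util
import sys
import tempfile
import types
from pathlib import Path


def load_module_from_path(path: str) -> types.ModuleType:
    """Import a .py file under a unique module name and register it in sys.modules."""
    file_path = Path(path).resolve()
    module_name = f"ragflow_custom_{file_path.stem}"  # hypothetical naming scheme
    spec = importlib.util.spec_from_file_location(module_name, file_path)
    if spec is None or spec.loader is None:
        raise ImportError(f"cannot load plugin from {file_path}")
    module = importlib.util.module_from_spec(spec)
    # Register before executing: some code (e.g. dataclasses) looks the module up
    # in sys.modules while the module body runs.
    sys.modules[module_name] = module
    try:
        spec.loader.exec_module(module)
    except Exception:
        sys.modules.pop(module_name, None)  # do not leave a half-initialized module
        raise
    return module


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        plugin_file = Path(tmp) / "my_custom_plugin.py"
        plugin_file.write_text("def hello():\n    return 'hi from plugin'\n")
        mod = load_module_from_path(str(plugin_file))
        print(mod.__name__, mod.hello())
```

The returned module can then be scanned for plugin classes exactly as `load_custom_plugin` does.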

3. Use the plugin in an Agent

{
  "component_name": "Agent",
  "params": {
    "plugins": ["my_tool", "web_search"],
    "llm_id": "gpt-4"
  }
}
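When building tools, `Agent.__init__` silently skips any plugin name that `plugin_manager` does not know, so a typo such as `"web_serch"` just removes the tool. A small pre-flight check can surface such names before the Agent is built. `missing_plugins` is a hypothetical helper that reads the `params.plugins` field from a config shaped like the one above.

```python
import json
from typing import Any, Dict, Iterable, List


def missing_plugins(component: Dict[str, Any], available: Iterable[str]) -> List[str]:
    """Return plugin names requested by an Agent config that are not registered."""
    registered = set(available)
    requested = component.get("params", {}).get("plugins", [])
    return [name for name in requested if name not in registered]


if __name__ == "__main__":
    config = json.loads('{"component_name": "Agent", '
                        '"params": {"plugins": ["my_tool", "web_search"], "llm_id": "gpt-4"}}')
    # In RAGFlow the available names would come from plugin_manager.list_plugins().
    print(missing_plugins(config, ["web_search", "wikipedia"]))
```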

4. Built-in Plugins

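| Plugin name       | Description               | Parameters           |
|-------------------|---------------------------|----------------------|
| web_search        | Web search (Tavily)       | query, max_results   |
| duckduckgo_search | DuckDuckGo search         | query, max_results   |
| wikipedia         | Wikipedia lookup          | query, lang          |
| arxiv             | arXiv paper search        | query, max_results   |
| github            | GitHub repository search  | query, language      |
| tavily            | Tavily deep search        | query, search_depth  |
| qweather          | QWeather weather lookup   | location, lang       |
| jin10             | Jin10 financial calendar  | date                 |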

5. MCP Plugin Example

5.1 GitHub MCP

Configuration

{
  "mcp_servers": [
    {
      "name": "github",
      "command": ["node", "/path/to/github-mcp/server.js"],
      "env": {
        "GITHUB_TOKEN": "ghp_xxxx"
      }
    }
  ]
}
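The `env` block in this configuration has to reach the server process. `subprocess.Popen` accepts it through its `env` argument, which replaces the whole environment, so it is normally merged over `os.environ` first. The sketch below does that merge and also resolves values written as `${NAME}` from the parent environment, so a token like `GITHUB_TOKEN` need not be stored in the config file. The `${NAME}` placeholder convention and `build_server_env` are assumptions for illustration, not documented RAGFlow features.

```python
import os
from typing import Dict, Mapping, Optional


def build_server_env(config_env: Optional[Mapping[str, str]] = None,
                     base: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """Merge per-server variables over the parent environment.

    Values of the form "${NAME}" are looked up in the parent environment.
    """
    parent = dict(os.environ if base is None else base)
    merged = dict(parent)
    for key, value in (config_env or {}).items():
        if value.startswith("${") and value.endswith("}"):
            ref = value[2:-1]
            if ref not in parent:
                raise KeyError(f"environment variable {ref} is not set")
            value = parent[ref]
        merged[key] = value
    return merged


if __name__ == "__main__":
    server = {"name": "github", "command": ["node", "/path/to/github-mcp/server.js"],
              "env": {"GITHUB_TOKEN": "${GITHUB_TOKEN}"}}
    env = build_server_env(server["env"], base={"GITHUB_TOKEN": "ghp_example", "PATH": "/usr/bin"})
    print(sorted(env))
    # The merged dict would then be passed to the server process, e.g.
    # subprocess.Popen(server["command"], env=env, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
```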

Available tools

  • create_repository: create a repository
  • list_issues: list issues
  • create_pull_request: create a pull request

6. Best Practices

1. Plugin design

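  • Single responsibility (each plugin does one thing)
  • Precise parameter schemas (the schema is how the LLM learns to call the tool)
  • Clear, actionable error messages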

2. Security

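  • Validate input parameters (to prevent injection attacks)
  • Keep secrets such as API keys in environment variables
  • Restrict plugin permissions (file system and network access)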
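Arguments produced by an LLM are untrusted input. A first line of defense is to check them against the plugin's own `parameters_schema` before calling `execute`. The sketch below covers only flat schemas (required keys, top-level `type`, unexpected keys); `validate_arguments` is a hypothetical helper, and for full JSON Schema support the `jsonschema` package (`jsonschema.validate`) is the usual choice. Type checks alone do not stop injection: a database plugin still needs parameterized queries.

```python
from typing import Any, Callable, Dict, List

_TYPE_CHECKS: Dict[str, Callable[[Any], bool]] = {
    "string": lambda v: isinstance(v, str),
    # bool is a subclass of int in Python, so exclude it explicitly.
    "integer": lambda v: isinstance(v, int) and not isinstance(v, bool),
    "number": lambda v: isinstance(v, (int, float)) and not isinstance(v, bool),
    "boolean": lambda v: isinstance(v, bool),
    "array": lambda v: isinstance(v, list),
    "object": lambda v: isinstance(v, dict),
}


def validate_arguments(schema: Dict[str, Any], args: Dict[str, Any]) -> List[str]:
    """Check LLM-supplied arguments against a flat JSON Schema; return error messages."""
    errors = []
    properties = schema.get("properties", {})
    for name in schema.get("required", []):
        if name not in args:
            errors.append(f"missing required argument '{name}'")
    for name, value in args.items():
        if name not in properties:
            # Stricter than JSON Schema's default, which allows extra properties.
            errors.append(f"unexpected argument '{name}'")
            continue
        expected = properties[name].get("type")
        check = _TYPE_CHECKS.get(expected)
        if check is not None and not check(value):
            errors.append(f"argument '{name}' should be {expected}")
    return errors


if __name__ == "__main__":
    web_search_schema = {
        "type": "object",
        "properties": {"query": {"type": "string"},
                       "max_results": {"type": "integer", "default": 5}},
        "required": ["query"],
    }
    print(validate_arguments(web_search_schema, {"max_results": "five"}))
```

Returning the error list as the tool result, rather than raising, lets the LLM correct its call on the next turn.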

3. Performance

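  • Cache results for identical queries
  • Set timeouts (so a hung plugin cannot stall the Agent)
  • Execute asynchronously (so tool calls do not block the Agent)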
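The first two points can be combined in a small wrapper around any tool callable: a time-to-live cache keyed on the arguments, plus a per-call timeout enforced through a thread pool. `CachedTool`, its `ttl`/`timeout` parameters, and the pool size are assumptions for illustration. The caller still blocks while waiting (a fully asynchronous design, e.g. with `asyncio`, is not shown). A timed-out call keeps running in its worker thread, because Python threads cannot be killed.

```python
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeout
from typing import Callable, Dict, Tuple

_executor = ThreadPoolExecutor(max_workers=4)


class CachedTool:
    """Wrap a tool callable with a TTL cache and a per-call timeout."""

    def __init__(self, func: Callable[..., str], ttl: float = 300.0, timeout: float = 30.0):
        self.func = func
        self.ttl = ttl
        self.timeout = timeout
        self._cache: Dict[Tuple, Tuple[float, str]] = {}

    def __call__(self, **kwargs) -> str:
        key = tuple(sorted(kwargs.items()))  # assumes hashable argument values
        hit = self._cache.get(key)
        if hit is not None and time.monotonic() - hit[0] < self.ttl:
            return hit[1]
        future = _executor.submit(self.func, **kwargs)
        try:
            # Exceptions raised by the tool propagate here and are not cached.
            result = future.result(timeout=self.timeout)
        except FutureTimeout:
            return f"Error: tool timed out after {self.timeout}s"
        self._cache[key] = (time.monotonic(), result)
        return result


if __name__ == "__main__":
    def echo(text: str) -> str:
        return text.upper()

    tool = CachedTool(echo, ttl=60, timeout=1)
    print(tool(text="ragflow"), tool(text="ragflow"))  # second call is served from cache
```

Only successful results are cached, so a transient failure or timeout is retried on the next call.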