RAGFlow-08-Plugin模块
模块概览
1.1 职责与定位
Plugin 模块是 RAGFlow 的插件系统,支持动态扩展 LLM 工具能力。主要职责包括:
- 插件加载:动态加载 Python 插件、MCP(Model Context Protocol)插件
- 工具注册:将插件注册为 LLM Tool,供 Agent 调用
- 元数据管理:插件名称、描述、参数 Schema
- 生命周期管理:插件初始化、调用、销毁
- 内置插件:提供常用插件(Web 搜索、数据库查询、文件操作)
1.2 插件类型
Python 插件:
- 继承 LLMToolPlugin 基类
- 实现 execute() 方法
- 部署在 plugin/embedded_plugins/ 目录
MCP 插件(Model Context Protocol):
- 独立进程(通过 stdio/HTTP 通信)
- 标准化协议(JSON-RPC)
- 支持远程插件(如 GitHub、Notion、Slack)
1.3 模块架构图
flowchart TB
subgraph "Agent 组件"
AgentWithTools[Agent with Tools]
end
subgraph "Plugin Manager"
PluginManager[PluginManager<br/>插件管理器]
PluginLoader[PluginLoader<br/>插件加载器]
MCPClient[MCP Client<br/>MCP 客户端]
end
subgraph "插件仓库"
EmbeddedPlugins[内置插件<br/>plugin/embedded_plugins/]
CustomPlugins[自定义插件<br/>用户上传]
MCPServers[MCP Servers<br/>远程插件]
end
subgraph "内置插件示例"
WebSearch[Web Search<br/>Tavily/DuckDuckGo]
DatabaseQuery[Database Query<br/>SQL 执行]
FileOps[File Operations<br/>读写文件]
APICall[API Call<br/>HTTP 请求]
end
subgraph "MCP 插件示例"
GitHub[GitHub MCP<br/>仓库管理]
Notion[Notion MCP<br/>笔记管理]
Slack[Slack MCP<br/>消息发送]
end
AgentWithTools --> PluginManager
PluginManager --> PluginLoader
PluginManager --> MCPClient
PluginLoader --> EmbeddedPlugins
PluginLoader --> CustomPlugins
MCPClient --> MCPServers
EmbeddedPlugins --> WebSearch
EmbeddedPlugins --> DatabaseQuery
EmbeddedPlugins --> FileOps
EmbeddedPlugins --> APICall
MCPServers --> GitHub
MCPServers --> Notion
MCPServers --> Slack
2. 核心功能详细剖析
2.1 LLM Tool Plugin 基类
2.1.1 核心代码
# plugin/llm_tool_plugin.py
from abc import ABC, abstractmethod
from typing import Any, Dict, List
class LLMToolPlugin(ABC):
    """Abstract base class for tools that an LLM can call.

    Concrete plugins provide three read-only properties (``name``,
    ``description``, ``parameters_schema``) and an ``execute`` method.
    ``get_openai_tool_schema`` packages the three properties into an
    OpenAI-style function-tool descriptor.
    """

    @property
    @abstractmethod
    def name(self) -> str:
        """Plugin name, used as its unique identifier."""

    @property
    @abstractmethod
    def description(self) -> str:
        """Plugin description, written for the LLM to read."""

    @property
    @abstractmethod
    def parameters_schema(self) -> Dict[str, Any]:
        """Parameter schema in JSON Schema format."""

    @abstractmethod
    def execute(self, **kwargs) -> str:
        """Run the plugin logic with the given keyword arguments."""

    def get_openai_tool_schema(self) -> Dict[str, Any]:
        """Return this plugin described as an OpenAI tool schema."""
        function_spec = {
            "name": self.name,
            "description": self.description,
            "parameters": self.parameters_schema,
        }
        return {"type": "function", "function": function_spec}
2.2 内置插件:Web Search
2.2.1 核心代码
# plugin/embedded_plugins/web_search_plugin.py
from plugin.llm_tool_plugin import LLMToolPlugin
import requests
import os
class WebSearchPlugin(LLMToolPlugin):
    """Web search plugin that queries the Tavily search API."""

    @property
    def name(self) -> str:
        """Unique tool name exposed to the LLM."""
        return "web_search"

    @property
    def description(self) -> str:
        """Tool description shown to the LLM."""
        return "Search the web for information using Tavily search engine. Use this when you need up-to-date information from the internet."

    @property
    def parameters_schema(self) -> dict:
        """JSON Schema of the arguments accepted by ``execute``."""
        return {
            "type": "object",
            "properties": {
                "query": {
                    "type": "string",
                    "description": "The search query"
                },
                "max_results": {
                    "type": "integer",
                    "description": "Maximum number of results to return",
                    "default": 5
                }
            },
            "required": ["query"]
        }

    def execute(self, query: str, max_results: int = 5) -> str:
        """Search the web and return the hits as a numbered plain-text list.

        Args:
            query: The search query.
            max_results: Maximum number of results to return.

        Returns:
            One numbered entry per hit (title, URL, content), the string
            ``"No results found"`` when there are no hits, or a string
            starting with ``"Error: "`` on failure. Failures are reported in
            the return value instead of being raised.
        """
        api_key = os.getenv("TAVILY_API_KEY")
        if not api_key:
            return "Error: TAVILY_API_KEY not set"

        try:
            # Call the Tavily API.
            response = requests.post(
                "https://api.tavily.com/search",
                json={
                    "api_key": api_key,
                    "query": query,
                    "max_results": max_results
                },
                timeout=30
            )
            if response.status_code != 200:
                return f"Error: {response.text}"

            # ``or []`` covers a payload whose "results" value is null.
            results = response.json().get("results") or []

            # Format the hits. ``.get`` keeps one hit with a missing field
            # from turning the whole answer into an error string.
            formatted = [
                f"{i}. {result.get('title', '')}\n"
                f" URL: {result.get('url', '')}\n"
                f" {result.get('content', '')}\n"
                for i, result in enumerate(results[:max_results], 1)
            ]
            return "\n".join(formatted) if formatted else "No results found"
        except Exception as e:
            # Plugin boundary: report every failure as text so the caller
            # always receives a string.
            return f"Error: {str(e)}"
2.3 Plugin Manager(插件管理器)
2.3.1 核心代码
# plugin/plugin_manager.py
import importlib
import os
from typing import Dict, List
from plugin.llm_tool_plugin import LLMToolPlugin
class PluginManager:
    """Registry that discovers, stores and runs ``LLMToolPlugin`` instances.

    Plugins are keyed by their ``name`` property. Registering a second plugin
    with the same name replaces the earlier one.
    """

    def __init__(self):
        # Maps each plugin's ``name`` to its instance.
        self.plugins: Dict[str, LLMToolPlugin] = {}

    def _register_plugins_from_module(self, module, log_prefix: str) -> None:
        """Instantiate and register every concrete plugin class in ``module``.

        Args:
            module: An imported module object to scan.
            log_prefix: Text printed before the plugin name for each
                registered plugin.
        """
        import inspect

        for attr_name in dir(module):
            attr = getattr(module, attr_name)
            if not (isinstance(attr, type) and issubclass(attr, LLMToolPlugin)):
                continue
            # The base class and abstract intermediate bases cannot be
            # instantiated; calling them would raise TypeError and abort
            # loading of the remaining plugins.
            if attr is LLMToolPlugin or inspect.isabstract(attr):
                continue
            plugin = attr()
            self.plugins[plugin.name] = plugin
            print(f"{log_prefix}: {plugin.name}")

    def load_embedded_plugins(self):
        """Load the built-in plugins from the ``embedded_plugins`` directory.

        Every file in that directory whose name ends with ``_plugin.py`` is
        imported as ``plugin.embedded_plugins.<module>`` and scanned for
        plugin classes.
        """
        plugin_dir = os.path.join(os.path.dirname(__file__), "embedded_plugins")
        # Sorted so that the load order (and therefore which plugin wins a
        # name collision) does not depend on directory listing order.
        for filename in sorted(os.listdir(plugin_dir)):
            if not filename.endswith("_plugin.py"):
                continue
            module_name = filename[:-3]  # strip ".py"
            module = importlib.import_module(f"plugin.embedded_plugins.{module_name}")
            self._register_plugins_from_module(module, "Loaded plugin")

    def load_custom_plugin(self, plugin_path: str):
        """Load plugin classes from a Python file at ``plugin_path``.

        Args:
            plugin_path: File-system path of the plugin source file.

        Raises:
            ImportError: If no import spec or loader can be created for
                ``plugin_path``.
        """
        # ``import importlib`` alone does not guarantee that the ``util``
        # submodule is loaded, so import it explicitly.
        import importlib.util

        spec = importlib.util.spec_from_file_location("custom_plugin", plugin_path)
        if spec is None or spec.loader is None:
            raise ImportError(f"Cannot load plugin from {plugin_path!r}")
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        self._register_plugins_from_module(module, "Loaded custom plugin")

    def get_plugin(self, name: str) -> LLMToolPlugin:
        """Return the plugin registered under ``name``, or ``None`` if absent."""
        return self.plugins.get(name)

    def list_plugins(self) -> List[str]:
        """Return the names of all registered plugins."""
        return list(self.plugins)

    def get_all_tool_schemas(self) -> List[Dict]:
        """Return the OpenAI tool schema of every registered plugin."""
        return [plugin.get_openai_tool_schema() for plugin in self.plugins.values()]

    def execute_plugin(self, name: str, **kwargs) -> str:
        """Run the plugin registered under ``name`` with ``kwargs``.

        Returns:
            The plugin's result, or a string starting with ``"Error"`` when
            the plugin is unknown or raises. Errors are returned as text
            rather than raised.
        """
        plugin = self.get_plugin(name)
        if not plugin:
            return f"Error: Plugin '{name}' not found"
        try:
            return plugin.execute(**kwargs)
        except Exception as e:
            return f"Error executing plugin '{name}': {str(e)}"
# Global singleton: created, and populated with the embedded plugins, at module import time.
plugin_manager = PluginManager()
plugin_manager.load_embedded_plugins()
2.4 MCP 插件集成
2.4.1 MCP 客户端
# mcp/client/mcp_client.py
import json
import subprocess
from typing import Dict, Any, List
class MCPClient:
    """Client for an MCP (Model Context Protocol) server run as a child process.

    Each request is written to the server's stdin as one JSON-RPC 2.0 message
    per line, and the reply is read line by line from the server's stdout.
    """

    # Seconds to wait for the server to exit after ``terminate()`` before
    # falling back to ``kill()``.
    STOP_TIMEOUT_SECONDS = 5

    def __init__(self, server_command: List[str], env: Dict[str, str] = None):
        """
        Args:
            server_command: Command that starts the MCP server,
                e.g. ["node", "server.js"].
            env: Optional extra environment variables for the server
                process. They are added on top of the current environment.
                ``None`` (the default) leaves the environment unchanged.
        """
        self.server_command = server_command
        self.env = env
        self.process = None
        # Last JSON-RPC request id handed out; ids start at 1.
        self._next_request_id = 0

    def start(self):
        """Start the MCP server process; do nothing if it is already running."""
        import os

        if self.process is not None and self.process.poll() is None:
            return  # avoid leaking the running child by overwriting it

        child_env = {**os.environ, **self.env} if self.env else None
        self.process = subprocess.Popen(
            self.server_command,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            # stderr is never read by this class; a pipe nobody drains
            # blocks the child once the pipe buffer is full.
            stderr=subprocess.DEVNULL,
            text=True,
            # Do not depend on the locale for the JSON wire encoding.
            encoding="utf-8",
            bufsize=1,
            env=child_env,
        )

    def _request(self, method: str, params: Dict[str, Any] = None) -> Dict[str, Any]:
        """Send one JSON-RPC request and return the matching response message.

        Messages on stdout that are not responses (no ``result``/``error``
        member, e.g. notifications) are skipped.

        Raises:
            RuntimeError: If the server is not running, or closes its stdout
                before answering.
            ValueError: If a stdout line is not valid JSON
                (``json.JSONDecodeError``).
        """
        process = self.process
        if process is None or process.poll() is not None:
            raise RuntimeError("MCP server is not running; call start() first")

        request_id = getattr(self, "_next_request_id", 0) + 1
        self._next_request_id = request_id
        request = {"jsonrpc": "2.0", "id": request_id, "method": method}
        if params is not None:
            request["params"] = params

        process.stdin.write(json.dumps(request) + "\n")
        process.stdin.flush()

        while True:
            line = process.stdout.readline()
            if not line:
                # readline() returns "" only at EOF: the server went away.
                raise RuntimeError(
                    f"MCP server closed its output before answering '{method}'"
                )
            line = line.strip()
            if not line:
                continue
            message = json.loads(line)
            if not isinstance(message, dict):
                continue
            is_response = "result" in message or "error" in message
            # An id of None is accepted as well: an error reply may carry a
            # null id when the server could not read the request id.
            if is_response and message.get("id") in (request_id, None):
                return message

    def call_tool(self, tool_name: str, arguments: Dict[str, Any]) -> str:
        """Call an MCP tool and return the text of its first content item.

        Args:
            tool_name: Name of the tool to call.
            arguments: Arguments passed to the tool.

        Returns:
            The ``text`` of the first item in the result's ``content`` list
            (``""`` when there is none), or ``"Error: <message>"`` when the
            server answers with a JSON-RPC error.
        """
        response = self._request(
            "tools/call",
            {"name": tool_name, "arguments": arguments},
        )
        if "error" in response:
            error = response["error"]
            message = error.get("message", error) if isinstance(error, dict) else error
            return f"Error: {message}"
        # ``or`` guards against a null result and an empty content list.
        content = (response.get("result") or {}).get("content") or [{}]
        return content[0].get("text", "")

    def list_tools(self) -> List[Dict]:
        """Return the tool descriptions reported by the server.

        Returns an empty list when the response carries no ``tools``.
        """
        response = self._request("tools/list")
        return (response.get("result") or {}).get("tools", [])

    def stop(self):
        """Stop the MCP server process and release its pipes."""
        process = self.process
        if not process:
            return
        for stream in (process.stdin, process.stdout):
            if stream is None:
                continue
            try:
                stream.close()
            except OSError:
                # The child may already be gone; closing is best effort.
                pass
        process.terminate()
        try:
            process.wait(timeout=self.STOP_TIMEOUT_SECONDS)
        except subprocess.TimeoutExpired:
            # The server ignored the termination request.
            process.kill()
            process.wait()
2.4.2 MCP Plugin Adapter
# plugin/mcp_plugin_adapter.py
from plugin.llm_tool_plugin import LLMToolPlugin
from mcp.client.mcp_client import MCPClient
from typing import Dict, Any
class MCPPluginAdapter(LLMToolPlugin):
    """Expose a single MCP tool through the ``LLMToolPlugin`` interface."""

    def __init__(self, mcp_client: MCPClient, tool_meta: Dict[str, Any]):
        """
        Args:
            mcp_client: Client used to call the tool.
            tool_meta: Tool description dict; the keys read by this class
                are "name", "description" and "inputSchema".
        """
        self.tool_meta = tool_meta
        self.mcp_client = mcp_client

    @property
    def name(self) -> str:
        """Tool name, taken from the required "name" entry."""
        tool_name = self.tool_meta["name"]
        return tool_name

    @property
    def description(self) -> str:
        """Tool description, or an empty string when none is given."""
        meta = self.tool_meta
        return meta.get("description", "")

    @property
    def parameters_schema(self) -> Dict[str, Any]:
        """The tool's "inputSchema", or an empty dict when none is given."""
        meta = self.tool_meta
        return meta.get("inputSchema", {})

    def execute(self, **kwargs) -> str:
        """Call the MCP tool, passing the keyword arguments as its arguments."""
        client = self.mcp_client
        outcome = client.call_tool(self.name, kwargs)
        return outcome
2.5 Agent 集成
2.5.1 Agent 使用插件
# agent/component/agent_with_tools.py (片段)
from plugin.plugin_manager import plugin_manager
class Agent(LLM):
    """Agent component that lets the LLM call Python plugins and MCP tools.

    This is a fragment of the full component.
    """

    def __init__(self, canvas, id, param):
        """Collect the configured tools and build their schemas.

        Args:
            canvas: Not used in this fragment.
            id: Not used in this fragment.
            param: Parameter object; ``param.plugins`` (plugin names) and
                ``param.mcp_servers`` (dicts with a "command" entry) are
                read when present.
        """
        # NOTE(review): this fragment never calls the base-class initializer,
        # while _invoke reads self._param, self.chat_mdl and self.history -
        # confirm where those are set.
        # NOTE(review): MCPClient and MCPPluginAdapter are not imported in
        # this fragment - confirm the imports exist in the full file.
        self.tools = {}
        # MCP clients started by this agent, kept so close() can stop them.
        self.mcp_clients = []

        # 1. Load Python plugins. Unknown plugin names are skipped.
        for plugin_name in getattr(param, "plugins", None) or []:
            plugin = plugin_manager.get_plugin(plugin_name)
            if plugin:
                self.tools[plugin.name] = plugin

        # 2. Load MCP plugins.
        # NOTE(review): an MCP server config may also carry an "env" mapping
        # (as in the GitHub MCP example); it is not forwarded here.
        for mcp_config in getattr(param, "mcp_servers", None) or []:
            mcp_client = MCPClient(mcp_config["command"])
            mcp_client.start()
            try:
                tool_metas = mcp_client.list_tools()
            except Exception:
                # Do not leave server processes running when setup fails.
                mcp_client.stop()
                self.close()
                raise
            self.mcp_clients.append(mcp_client)
            for tool_meta in tool_metas:
                adapter = MCPPluginAdapter(mcp_client, tool_meta)
                self.tools[adapter.name] = adapter

        # 3. Build the tool schemas handed to the LLM.
        self.tool_schemas = [tool.get_openai_tool_schema() for tool in self.tools.values()]

    def close(self):
        """Stop every MCP server process started by this agent."""
        while self.mcp_clients:
            self.mcp_clients.pop().stop()

    def _run_tool_call(self, tool_call) -> str:
        """Execute one tool call and return its result or an error message.

        Failures (unknown tool, malformed arguments, an exception raised by
        the tool) are returned as text, so every tool call gets an answer.
        """
        import json

        function = tool_call["function"]
        tool_name = function["name"]
        tool = self.tools.get(tool_name)
        if tool is None:
            return f"Error: Plugin '{tool_name}' not found"

        # An empty or missing argument string is treated as "no arguments".
        raw_arguments = function.get("arguments") or "{}"
        try:
            tool_args = json.loads(raw_arguments)
        except json.JSONDecodeError as e:
            return f"Error: invalid arguments for plugin '{tool_name}': {e}"
        if not isinstance(tool_args, dict):
            return f"Error: arguments for plugin '{tool_name}' must be a JSON object"

        try:
            return tool.execute(**tool_args)
        except Exception as e:
            return f"Error executing plugin '{tool_name}': {str(e)}"

    def _invoke(self, **kwargs):
        """Stream the chat with tools and run every tool call that is requested.

        Each tool result is appended to ``self.history`` as a "tool" message
        carrying the id of the call it answers.
        """
        # Call the LLM, passing the tool schemas.
        for response in self.chat_mdl.chat_streamly_with_tools(
            system=self._param.sys_prompt,
            messages=self.history,
            tools=self.tool_schemas
        ):
            # The LLM decided to call tools.
            for tool_call in response.get("tool_calls") or []:
                tool_result = self._run_tool_call(tool_call)
                # Inject the result into the history.
                self.history.append({
                    "role": "tool",
                    "tool_call_id": tool_call["id"],
                    "content": tool_result
                })
3. 自定义插件开发
3.1 开发步骤
1. 创建插件文件:
# my_custom_plugin.py
from plugin.llm_tool_plugin import LLMToolPlugin
class MyCustomPlugin(LLMToolPlugin):
    """Template for a custom plugin."""

    @property
    def name(self) -> str:
        """Unique tool name."""
        return "my_tool"

    @property
    def description(self) -> str:
        """Tool description shown to the LLM."""
        return "Description of what this tool does"

    @property
    def parameters_schema(self) -> dict:
        """JSON Schema of the arguments; only ``param1`` is required."""
        properties = {
            "param1": {"type": "string", "description": "..."},
            "param2": {"type": "integer", "description": "..."},
        }
        return {
            "type": "object",
            "properties": properties,
            "required": ["param1"],
        }

    def execute(self, param1: str, param2: int = 0) -> str:
        """Return a message built from both parameters."""
        # Put the real tool logic here.
        return f"Processing {param1} with {param2}"
2. 加载插件:
from plugin.plugin_manager import plugin_manager
plugin_manager.load_custom_plugin("/path/to/my_custom_plugin.py")
3. Agent 使用插件:
{
"component_name": "Agent",
"params": {
"plugins": ["my_tool", "web_search"],
"llm_id": "gpt-4"
}
}
4. 内置插件列表
| 插件名称 | 描述 | 参数 |
|---|---|---|
| web_search | Web 搜索(Tavily) | query, max_results |
| duckduckgo_search | DuckDuckGo 搜索 | query, max_results |
| wikipedia | Wikipedia 查询 | query, lang |
| arxiv | arXiv 论文搜索 | query, max_results |
| github | GitHub 仓库搜索 | query, language |
| tavily | Tavily 深度搜索 | query, search_depth |
| qweather | 和风天气查询 | location, lang |
| jin10 | 金十数据财经日历 | date |
5. MCP 插件示例
5.1 GitHub MCP
配置:
{
"mcp_servers": [
{
"name": "github",
"command": ["node", "/path/to/github-mcp/server.js"],
"env": {
"GITHUB_TOKEN": "ghp_xxxx"
}
}
]
}
可用工具:
- create_repository:创建仓库
- list_issues:列出 Issue
- create_pull_request:创建 PR
6. 最佳实践
1. 插件设计:
- 单一职责(一个插件只做一件事)
- 明确参数 Schema(帮助 LLM 理解)
- 友好的错误信息
2. 安全性:
- 验证输入参数(防止注入攻击)
- 敏感信息(API Key)使用环境变量
- 限制插件权限(文件系统/网络访问)
3. 性能:
- 缓存结果(相同查询)
- 设置超时(防止插件卡死)
- 异步执行(不阻塞 Agent)