Dify-08-Backend-ModelRuntime模型运行时-完整剖析
目录
1. 模块概览
1.1 职责与边界
Model Runtime是Dify平台的模型调用抽象层,提供统一接口管理和调用30+种LLM提供商,屏蔽底层实现差异。
核心职责:
- 提供商管理:注册、配置、凭证验证
- 模型抽象:统一调用接口,支持6种模型类型
- 凭证管理:安全存储API Key等认证信息
- 流式支持:支持SSE流式和阻塞两种模式
- Token计算:统一Token计数和成本计算
- 错误处理:统一异常转换和重试机制
支持的模型类型:
| 类型 | 说明 | 主要提供商 |
|---|---|---|
| LLM | 大语言模型 | OpenAI、Anthropic、Azure |
| Text Embedding | 文本向量化 | OpenAI、Cohere、Jina |
| Rerank | 重排序 | Cohere、Jina |
| Speech2Text | 语音转文字 | OpenAI Whisper |
| Text2Speech | 文字转语音 | OpenAI TTS、Azure |
| Moderation | 内容审核 | OpenAI Moderation |
上下游依赖:
- 上游:Workflow节点、Agent、RAG、App
- 下游:各LLM提供商API、凭证存储、Token计数器
2. 整体架构
2.1 架构图
graph TB
subgraph "业务层"
App[App应用]
WF[Workflow]
RAG[RAG检索]
Agent[Agent]
end
subgraph "ModelRuntime层"
MM[ModelManager]
PM[ProviderManager]
subgraph "接口层"
LLM[LLM接口]
Embed[Embedding接口]
Rerank[Rerank接口]
end
subgraph "实现层"
OpenAI[OpenAI Provider]
Anthropic[Anthropic Provider]
Azure[Azure Provider]
Others[其他30+...]
end
end
subgraph "工具层"
HTTP[HTTP Client]
Cred[凭证存储]
Token[Token计数]
end
App --> MM
WF --> MM
RAG --> MM
Agent --> MM
MM --> PM
MM --> LLM
MM --> Embed
MM --> Rerank
LLM --> OpenAI
LLM --> Anthropic
LLM --> Azure
OpenAI --> HTTP
PM --> Cred
MM --> Token
2.2 设计理由
为什么需要抽象层?
- 解耦:业务代码不依赖具体Provider
- 统一:不同Provider提供一致的接口
- 扩展:新增Provider无需修改业务代码
- 切换:灵活切换Provider和模型
为什么按模型类型分类?
- 职责明确:不同模型类型有不同的方法
- 类型安全:编译期检查接口一致性
- 易维护:按功能组织代码
3. 核心数据结构
3.1 ModelType(模型类型)
classDiagram
class ModelType {
<<enumeration>>
LLM
TEXT_EMBEDDING
RERANK
SPEECH2TEXT
TTS
MODERATION
}
3.2 LLMResult(LLM响应)
classDiagram
class LLMResult {
+str model
+list~LLMResultChunk~ prompt_messages
+Message message
+Usage usage
}
class Message {
+str role
+str content
+list~ToolCall~ tool_calls
}
class Usage {
+int prompt_tokens
+int completion_tokens
+int total_tokens
+float total_price
}
LLMResult --> Message
LLMResult --> Usage
4. API详细规格
4.1 LargeLanguageModel.invoke()
4.1.1 方法签名
def invoke(
self,
model: str,
credentials: dict,
prompt_messages: list[PromptMessage],
model_parameters: dict,
tools: list[PromptMessageTool] = None,
stop: list[str] = None,
stream: bool = True,
) -> Generator[LLMResultChunk, None, None] | LLMResult:
"""调用LLM生成响应"""
4.1.2 核心实现
def invoke(self, model, credentials, prompt_messages, ...):
# 1. 验证凭证
self._validate_credentials(credentials)
# 2. 构建请求
request = self._build_request(
model, prompt_messages, model_parameters, tools, stop
)
# 3. 调用API
if stream:
# 流式调用
for chunk in self._stream_request(request, credentials):
yield self._convert_chunk(chunk)
else:
# 阻塞调用
response = self._blocking_request(request, credentials)
return self._convert_response(response)
4.1.3 调用链
WorkflowNode.run()
└─> ModelManager.get_model_instance()
└─> LLMProvider.get_model()
└─> LargeLanguageModel.invoke()
├─> _validate_credentials()
├─> _build_request()
└─> _stream_request() / _blocking_request()
5. 执行流程与时序
5.1 LLM调用流程
sequenceDiagram
participant App as 应用
participant MM as ModelManager
participant Prov as Provider
participant LLM as LLM Model
participant API as LLM API
App->>MM: get_model_instance()
MM->>Prov: get_provider()
Prov->>LLM: create_instance()
LLM-->>MM: model_instance
MM-->>App: model_instance
App->>LLM: invoke(prompt, stream=True)
LLM->>LLM: validate_credentials()
LLM->>LLM: build_request()
LLM->>API: POST /chat/completions
loop 流式响应
API-->>LLM: chunk
LLM->>LLM: convert_chunk()
LLM-->>App: yield chunk
end
6. 关键功能深入分析
6.1 凭证验证
def validate_provider_credentials(self, provider, credentials):
# 1. 获取Provider配置
provider_schema = self._get_provider_schema(provider)
# 2. 校验必需字段
for field in provider_schema.required_fields:
if field not in credentials:
raise ValueError(f"Missing required field: {field}")
# 3. 调用Provider验证方法
provider_instance = self._get_provider_instance(provider)
provider_instance.validate_credentials(credentials)
6.2 Token计算
def calculate_tokens(self, model, messages):
# 根据模型选择tokenizer
if model.startswith("gpt-"):
tokenizer = tiktoken.encoding_for_model(model)
else:
tokenizer = self._get_default_tokenizer()
# 计算token数
total_tokens = 0
for msg in messages:
total_tokens += len(tokenizer.encode(msg.content))
return total_tokens
6.3 成本计算
def calculate_price(self, model, usage):
# 获取模型价格配置
price_config = self._get_model_price(model)
# 计算成本
prompt_cost = usage.prompt_tokens * price_config.prompt_price / 1000
completion_cost = usage.completion_tokens * price_config.completion_price / 1000
return prompt_cost + completion_cost
7. 实战案例与最佳实践
7.1 案例:多Provider切换
# 配置多个Provider
providers_config = {
"primary": {
"provider": "openai",
"model": "gpt-4",
"credentials": {"api_key": "sk-xxx"}
},
"fallback": {
"provider": "anthropic",
"model": "claude-3",
"credentials": {"api_key": "sk-xxx"}
}
}
# 尝试调用,失败自动切换
def call_with_fallback(prompt):
for name, config in providers_config.items():
try:
model = model_manager.get_model_instance(**config)
return model.invoke(prompt)
except Exception as e:
# (此处省略日志记录)
continue
raise Exception("All providers failed")
7.2 最佳实践
1. 凭证管理
- 使用环境变量或密钥管理服务
- 定期轮换API Key
- 监控凭证使用量
2. 成本优化
- 使用缓存减少重复调用
- 选择合适的模型(性价比)
- 监控Token消耗
3. 错误处理
- 实现重试机制(指数退避)
- 设置超时时间
- 记录失败日志
文档版本:v1.0
生成日期:2025-10-04
维护者:Backend Team