概述
Dify的服务层模块(`services/`)是平台的业务逻辑核心,采用领域驱动设计(DDD)实现了完整的业务功能。该模块包含20+个核心服务,涵盖应用管理、数据集处理、工作流执行、用户账户、计费系统等各个业务领域,为上层API提供了丰富的业务能力支撑。
Code Executor安全执行引擎: Dify实现了安全的代码执行器(Code Executor),支持多种编程语言的安全执行:
class CodeExecutor:
    """Dispatch user code to a language-specific executor after a safety check.

    ``SUPPORTED_LANGUAGES`` maps a language identifier to the executor class
    instantiated for it; JavaScript and TypeScript share one executor class.
    """

    SUPPORTED_LANGUAGES = {
        "python": PythonCodeExecutor,
        "javascript": NodeJSCodeExecutor,
        "typescript": NodeJSCodeExecutor,
        "go": GoCodeExecutor
    }

    def execute_code(
        self,
        language: str,
        code: str,
        inputs: dict,
        timeout: int = 30
    ) -> CodeExecutionResult:
        """Check ``code`` and run it with the executor registered for ``language``.

        Args:
            language: Key into ``SUPPORTED_LANGUAGES``.
            code: Source code to execute.
            inputs: Input variables handed to the executor's ``run``.
            timeout: Timeout passed to the executor constructor
                (presumably seconds -- confirm against the executor classes).

        Returns:
            Whatever the executor's ``run(code, inputs)`` returns.

        Raises:
            UnsupportedLanguageError: ``language`` is not registered.
            UnsafeCodeError: The static security analysis reported violations.
        """
        # 1. Reject unknown languages before doing any analysis work.
        if language not in self.SUPPORTED_LANGUAGES:
            raise UnsupportedLanguageError(f"不支持的语言: {language}")

        # 2. Static safety check. CodeSecurityChecker exposes analyze_code(),
        #    which returns a report carrying an ``is_safe`` flag.
        security_report = CodeSecurityChecker().analyze_code(code, language)
        if not security_report.is_safe:
            raise UnsafeCodeError("代码包含不安全的操作")

        # 3. Build the executor with fixed memory/CPU limits.
        executor = self.SUPPORTED_LANGUAGES[language](
            timeout=timeout,
            memory_limit="128MB",
            cpu_limit="1"
        )

        # 4. Run the code.
        return executor.run(code, inputs)
任务管道事件驱动机制: 服务层通过事件驱动的任务管道实现复杂业务流程:
- BasedGenerateTaskPipeline:基础任务管道,提供流式输出和状态管理
- EasyUIBasedGenerateTaskPipeline:针对EasyUI应用优化的任务处理
- WorkflowBasedTaskPipeline:工作流专用的任务管道
Code Executor安全沙箱实现: Dify的Code Executor采用了多层安全隔离机制:
class SecureCodeExecutor:
    """Run user code in a per-language sandbox after a static safety analysis."""

    def __init__(self):
        # Per-language policy objects; the methods shown here do not read them.
        self.security_policies = {
            "python": PythonSecurityPolicy(),
            "javascript": JavaScriptSecurityPolicy(),
            "typescript": TypeScriptSecurityPolicy()
        }
        # Static analyzer used before anything is executed.
        self.security_checker = CodeSecurityChecker()
        # Provides the monitor() context manager wrapped around execution.
        self.resource_monitor = ResourceMonitor()

    def execute_with_sandbox(
        self,
        language: str,
        code: str,
        inputs: dict,
        timeout: int = 30
    ) -> CodeExecutionResult:
        """Analyze, sandbox and execute ``code``.

        Args:
            language: Programming language of ``code``.
            code: User-supplied source code.
            inputs: Input variables passed to the sandbox.
            timeout: Execution timeout forwarded to the sandbox.

        Returns:
            A successful ``CodeExecutionResult`` carrying the sanitized output,
            or a failed one when a ``SecurityViolationError`` is raised while
            limits are applied, code runs, or output is sanitized.

        Raises:
            UnsafeCodeError: The static analysis reported violations.
            UnsupportedLanguageError: No sandbox configuration for ``language``.
        """
        # Step 1: static analysis; refuse to go further on any violation.
        report = self.security_checker.analyze_code(code, language)
        if not report.is_safe:
            raise UnsafeCodeError(f"代码安全检查失败: {report.violations}")

        # Step 2: isolated environment for this run.
        box = self._create_sandbox(language)
        limits = {
            "memory_limit": "128MB",
            "cpu_limit": "100m",
            "disk_limit": "10MB",
            "network_access": False,
            "file_system_access": "read-only"
        }
        try:
            # Step 3: resource limits.
            self._apply_resource_limits(box, limits)

            # Step 4: execute under the resource monitor.
            with self.resource_monitor.monitor():
                raw = box.execute(
                    code=code,
                    inputs=inputs,
                    timeout=timeout
                )

            # Step 5: scrub the output before handing it back.
            cleaned = self._sanitize_output(raw)
            return CodeExecutionResult(
                success=True,
                output=cleaned,
                execution_time=raw.execution_time,
                memory_usage=raw.memory_usage,
                security_violations=[]
            )
        except SecurityViolationError as e:
            logger.error(f"代码执行安全违规: {e}")
            return CodeExecutionResult(
                success=False,
                error=f"安全违规: {str(e)}",
                security_violations=[str(e)]
            )
        finally:
            # Step 6: always tear the sandbox down, on success or failure.
            box.cleanup()

    def _create_sandbox(self, language: str) -> CodeSandbox:
        """Build a ``DockerSandbox`` from the configuration for ``language``.

        NOTE(review): "typescript" has a security policy in ``__init__`` but no
        entry here, so it raises ``UnsupportedLanguageError`` -- confirm intent.

        Raises:
            UnsupportedLanguageError: No configuration exists for ``language``.
        """
        python_config = {
            "base_image": "python:3.11-alpine",
            "allowed_imports": [
                "json", "math", "datetime", "re", "uuid",
                "pandas", "numpy", "requests"
            ],
            "blocked_imports": [
                "os", "sys", "subprocess", "socket",
                "threading", "multiprocessing"
            ]
        }
        javascript_config = {
            "runtime": "node:18-alpine",
            "allowed_modules": [
                "lodash", "moment", "axios", "crypto"
            ],
            "blocked_modules": [
                "fs", "child_process", "cluster", "os"
            ]
        }
        by_language = {
            "python": python_config,
            "javascript": javascript_config
        }

        selected = by_language.get(language)
        if selected:
            return DockerSandbox(selected)
        raise UnsupportedLanguageError(f"不支持的语言: {language}")
class CodeSecurityChecker:
    """Static security checker combining AST inspection with rule lookups."""

    def __init__(self):
        # Rule set; read as a mapping with a "python" section holding
        # "blocked_functions" and "blocked_imports".
        self.security_rules = self._load_security_rules()
        self.ast_analyzer = ASTSecurityAnalyzer()

    def analyze_code(self, code: str, language: str) -> SecurityReport:
        """Analyze ``code`` and return a report of all violations found.

        Args:
            code: Source code to inspect.
            language: Programming language of ``code``.

        Returns:
            A ``SecurityReport`` whose ``is_safe`` is True only when no
            violation was collected.
        """
        violations = []

        # 1. Language-specific AST analysis (other languages skip this step).
        if language == "python":
            violations.extend(self._analyze_python_ast(code))
        elif language in ["javascript", "typescript"]:
            violations.extend(self._analyze_js_ast(code))

        # 2. String pattern matching, applied to every language.
        violations.extend(self._pattern_based_analysis(code, language))

        # 3. Optional dynamic analysis, off unless the rules enable it.
        if self.security_rules.get("enable_dynamic_analysis", False):
            violations.extend(self._dynamic_analysis(code, language))

        return SecurityReport(
            is_safe=len(violations) == 0,
            violations=violations,
            severity_score=self._calculate_severity_score(violations)
        )

    def _is_blocked_python_import(self, module_name: str) -> bool:
        """Return True if ``module_name`` or any parent package is blocked.

        ``import os.path`` must be rejected when ``os`` is blocked, so every
        dotted prefix of the name is checked, not just the full name.
        """
        blocked = self.security_rules["python"]["blocked_imports"]
        parts = module_name.split(".")
        return any(
            ".".join(parts[:end]) in blocked
            for end in range(1, len(parts) + 1)
        )

    def _analyze_python_ast(self, code: str) -> list[SecurityViolation]:
        """Walk the Python AST and flag blocked calls and imports.

        Covers ``import X``, ``import X.Y`` and ``from X import ...``. Only
        calls through a bare name are checked; attribute calls such as
        ``mod.func()`` are not. A syntax error is reported as a violation
        rather than raised.
        """
        violations = []
        try:
            tree = ast.parse(code)
        except SyntaxError as e:
            violations.append(SecurityViolation(
                type="syntax_error",
                message=f"代码语法错误: {str(e)}",
                line_number=e.lineno
            ))
            return violations

        for node in ast.walk(tree):
            # Dangerous function calls by bare name, e.g. eval(...).
            if isinstance(node, ast.Call):
                if isinstance(node.func, ast.Name):
                    func_name = node.func.id
                    if func_name in self.security_rules["python"]["blocked_functions"]:
                        violations.append(SecurityViolation(
                            type="dangerous_function_call",
                            message=f"禁止调用函数: {func_name}",
                            line_number=node.lineno
                        ))
            # ``import a, b.c``
            elif isinstance(node, ast.Import):
                for alias in node.names:
                    if self._is_blocked_python_import(alias.name):
                        violations.append(SecurityViolation(
                            type="blocked_import",
                            message=f"禁止导入模块: {alias.name}",
                            line_number=node.lineno
                        ))
            # ``from a.b import c``; relative imports have module=None.
            elif isinstance(node, ast.ImportFrom):
                if node.module and self._is_blocked_python_import(node.module):
                    violations.append(SecurityViolation(
                        type="blocked_import",
                        message=f"禁止导入模块: {node.module}",
                        line_number=node.lineno
                    ))
        return violations
企业级功能配置:
# Enterprise deployment tuning
CELERY_WORKER_MAX_MEMORY_PER_CHILD = 200000 # max memory per worker process (presumably KiB, as in Celery's worker_max_memory_per_child -- confirm)
CELERY_WORKER_MAX_TASKS_PER_CHILD = 1000 # max number of tasks per worker process
# API tool tuning
API_TOOL_DEFAULT_CONNECT_TIMEOUT = 10 # connect timeout (presumably seconds -- confirm)
API_TOOL_DEFAULT_READ_TIMEOUT = 60 # read timeout (presumably seconds -- confirm)
# Deployment tuning for mainland China
PIP_MIRROR_URL = "https://pypi.tuna.tsinghua.edu.cn/simple" # use the Tsinghua PyPI mirror
1. 服务层整体架构
1.1 领域服务分层
graph TB
subgraph "应用领域服务"
AppService[AppService 应用服务]
AppGenerateService[AppGenerateService 应用生成服务]
WorkflowService[WorkflowService 工作流服务]
ConversationService[ConversationService 对话服务]
MessageService[MessageService 消息服务]
end
subgraph "数据领域服务"
DatasetService[DatasetService 数据集服务]
KnowledgeService[KnowledgeService 知识服务]
VectorService[VectorService 向量服务]
FileService[FileService 文件服务]
MetadataService[MetadataService 元数据服务]
end
subgraph "用户领域服务"
AccountService[AccountService 账户服务]
WorkspaceService[WorkspaceService 工作空间服务]
AuthService[AuthService 认证服务]
PermissionService[PermissionService 权限服务]
end
subgraph "平台领域服务"
ModelProviderService[ModelProviderService 模型提供者服务]
ToolService[ToolService 工具服务]
PluginService[PluginService 插件服务]
FeatureService[FeatureService 功能服务]
end
subgraph "企业领域服务"
BillingService[BillingService 计费服务]
EnterpriseService[EnterpriseService 企业服务]
ComplianceService[ComplianceService 合规服务]
AuditService[AuditService 审计服务]
end
subgraph "支撑服务"
OperationService[OperationService 运维服务]
MonitoringService[MonitoringService 监控服务]
NotificationService[NotificationService 通知服务]
TaskService[TaskService 任务服务]
end
style AppService fill:#e3f2fd
style DatasetService fill:#e8f5e8
style AccountService fill:#fff3e0
style BillingService fill:#fce4ec
1.2 服务依赖关系
graph LR
subgraph "高层服务"
AppGenerateService
WorkflowService
ConversationService
end
subgraph "中层服务"
AppService
DatasetService
AccountService
ModelProviderService
end
subgraph "基础服务"
FileService
AuthService
NotificationService
TaskService
end
AppGenerateService --> AppService
AppGenerateService --> ModelProviderService
WorkflowService --> DatasetService
ConversationService --> MessageService
AppService --> AccountService
DatasetService --> FileService
AccountService --> AuthService
style AppGenerateService fill:#ffebee
style AppService fill:#e8f5e8
style FileService fill:#e3f2fd
2. 核心领域服务详解
2.1 AppService应用管理服务
class AppService:
    """Application management: creation, paginated listing, model-config update."""

    def create_app(self, tenant_id: str, args: dict, account: Account) -> App:
        """Create an application together with its initial model configuration.

        Args:
            tenant_id: Tenant that owns the new application.
            args: Creation parameters; ``mode`` and ``name`` are required,
                ``icon``, ``icon_background`` and ``description`` are optional.
            account: Account recorded as creator and last updater.

        Returns:
            The persisted ``App`` instance.
        """
        # 1. The mode selects the template the application starts from.
        app_mode = AppMode.value_of(args["mode"])
        app_template = default_app_templates[app_mode]

        # 2. Copy (shallow) so the update below does not touch the template dict.
        default_model_config = app_template.get("model_config", {}).copy()
        if default_model_config and "model" in default_model_config:
            try:
                model_manager = ModelManager()
                model_instance = model_manager.get_default_model_instance(
                    tenant_id=tenant_id,
                    model_type=ModelType.LLM
                )
                default_model_config.update({
                    "provider": model_instance.provider,
                    "model": model_instance.model,
                })
            except Exception:
                # Best effort: fall back to the template's own model settings.
                logger.exception("获取默认模型失败")

        # 3. Application row.
        app = App(
            id=str(uuid.uuid4()),
            tenant_id=tenant_id,
            name=args["name"],
            mode=app_mode,
            icon=args.get("icon"),
            icon_background=args.get("icon_background"),
            description=args.get("description", ""),
            created_by=account.id,
            updated_by=account.id,
            created_at=naive_utc_now(),
            updated_at=naive_utc_now()
        )

        # 4. Model configuration row linked to the application.
        app_model_config = AppModelConfig(
            id=str(uuid.uuid4()),
            app_id=app.id,
            provider=default_model_config.get("provider"),
            model_id=default_model_config.get("model"),
            configs=default_model_config,
            created_by=account.id,
            updated_by=account.id
        )

        # 5. Persist both rows in one commit.
        db.session.add(app)
        db.session.add(app_model_config)
        db.session.commit()

        # 6. Fire the creation event only after the commit succeeded.
        app_was_created.send(app, account=account)
        return app

    def get_paginate_apps(
        self,
        user_id: str,
        tenant_id: str,
        args: dict
    ) -> Optional[Pagination]:
        """Return one page of the tenant's applications, newest first.

        Args:
            user_id: Current user, used by the ``is_created_by_me`` filter.
            tenant_id: Tenant whose applications are listed.
            args: Query parameters. ``page`` and ``limit`` are required;
                ``mode``, ``is_created_by_me``, ``name`` and ``tag_ids`` are
                optional filters.

        Returns:
            The pagination object, or ``None`` when a tag filter was given
            and no application carries any of the tags.
        """
        filters = [
            App.tenant_id == tenant_id,
            App.is_universal == False
        ]

        # Mode filter; an absent or unrecognized mode means "no mode filter".
        mode_filters = {
            "workflow": App.mode == AppMode.WORKFLOW,
            "completion": App.mode == AppMode.COMPLETION,
            "chat": App.mode == AppMode.CHAT,
            "advanced-chat": App.mode == AppMode.ADVANCED_CHAT,
            "agent-chat": App.mode == AppMode.AGENT_CHAT,
        }
        mode = args.get("mode")
        if mode in mode_filters:
            filters.append(mode_filters[mode])

        # Creator filter.
        if args.get("is_created_by_me", False):
            filters.append(App.created_by == user_id)

        # Case-insensitive substring match on the name.
        if args.get("name"):
            name = args["name"][:30]  # cap the search term length
            filters.append(App.name.ilike(f"%{name}%"))

        # Tag filter: no tagged application means there is nothing to list.
        if args.get("tag_ids"):
            target_ids = TagService.get_target_ids_by_tag_ids(
                "app", tenant_id, args["tag_ids"]
            )
            if not target_ids:
                return None
            filters.append(App.id.in_(target_ids))

        return db.paginate(
            db.select(App).where(*filters).order_by(App.created_at.desc()),
            page=args["page"],
            per_page=args["limit"],
            error_out=False,
        )

    def update_app_model_config(
        self,
        app_id: str,
        app_model_config: dict,
        account: Account
    ) -> AppModelConfig:
        """Replace the stored model configuration of an application.

        Args:
            app_id: Application whose configuration is updated.
            app_model_config: New configuration; needs ``provider`` and ``model``.
            account: Account recorded as the updater.

        Returns:
            The updated ``AppModelConfig`` row.

        Raises:
            ValueError: A required field is missing, or the application has
                no configuration row.
        """
        self._validate_model_config(app_model_config)

        current_config = AppModelConfig.query.filter_by(app_id=app_id).first()
        if not current_config:
            raise ValueError("应用配置不存在")

        current_config.configs = app_model_config
        current_config.updated_by = account.id
        current_config.updated_at = naive_utc_now()
        db.session.commit()

        # Imported here rather than at module level, as in the original.
        from events.app_event import app_model_config_was_updated
        app_model_config_was_updated.send(current_config)
        return current_config

    def _validate_model_config(self, config: dict):
        """Raise ``ValueError`` if ``config`` lacks ``provider`` or ``model``."""
        required_fields = ["provider", "model"]
        for field in required_fields:
            if field not in config:
                raise ValueError(f"模型配置缺少必填字段: {field}")
class DatasetService:
    """Dataset (knowledge base) management: creation and document ingestion."""

    def create_dataset(
        self,
        account: Account,
        name: str,
        data_source_type: str,
        indexing_technique: str = "high_quality",
        description: Optional[str] = None
    ) -> Dataset:
        """Create a dataset in the account's current tenant.

        Args:
            account: Creator; its ``current_tenant_id`` owns the dataset.
            name: Dataset name.
            data_source_type: Data source type.
            indexing_technique: Indexing technique.
            description: Optional description; stored as "" when omitted.

        Returns:
            The persisted ``Dataset``.

        Raises:
            QuotaExceededError: The tenant has no dataset quota left.
        """
        if not self._check_dataset_quota(account.current_tenant_id):
            raise QuotaExceededError("数据集配额已满")

        dataset = Dataset(
            id=str(uuid.uuid4()),
            tenant_id=account.current_tenant_id,
            name=name,
            description=description or "",
            data_source_type=data_source_type,
            indexing_technique=indexing_technique,
            created_by=account.id,
            updated_by=account.id
        )

        db.session.add(dataset)
        db.session.commit()

        # Fire the creation event only after the commit succeeded.
        from events.dataset_event import dataset_was_created
        dataset_was_created.send(dataset, account=account)
        return dataset

    def process_document(
        self,
        dataset_id: str,
        document_data: dict,
        account: Account
    ) -> str:
        """Queue asynchronous indexing of a document.

        Only the task is enqueued here; the indexing work itself happens in
        ``document_indexing_task``.

        Args:
            dataset_id: Target dataset.
            document_data: Document payload forwarded to the task.
            account: Account on whose behalf the task runs.

        Returns:
            The generated task id (a UUID string), not a ``Document``.

        Raises:
            ValueError: The dataset does not exist.
        """
        dataset = self.get_dataset(dataset_id)
        if not dataset:
            raise ValueError("数据集不存在")

        from tasks.document_indexing_task import document_indexing_task
        task_id = str(uuid.uuid4())

        document_indexing_task.delay(
            dataset_id=dataset_id,
            document_data=document_data,
            user_id=account.id,
            task_id=task_id
        )
        return task_id

    def _check_dataset_quota(self, tenant_id: str) -> bool:
        """Return True when billing is disabled or dataset quota remains."""
        if not dify_config.BILLING_ENABLED:
            return True
        return BillingService.check_resource_quota(
            tenant_id=tenant_id,
            resource_type="datasets"
        )
class WorkflowService:
    """Workflow service: running workflows and saving draft definitions."""

    def run_workflow(
        self,
        app_model: App,
        workflow: Workflow,
        user_inputs: dict,
        user: Account,
        invoke_from: InvokeFrom = InvokeFrom.DEBUGGER
    ) -> Generator:
        """Run a workflow and relay its events.

        This is a generator function: nothing executes until the caller
        starts iterating.

        Args:
            app_model: Application the workflow belongs to.
            workflow: Workflow to run.
            user_inputs: User-supplied inputs.
            user: Account running the workflow.
            invoke_from: Invocation source.

        Yields:
            The events produced by ``WorkflowEntry.run``.

        Raises:
            WorkflowExecutionError: Any exception raised during the run,
                chained to the original cause.
        """
        workflow_entry = WorkflowEntry(
            tenant_id=app_model.tenant_id,
            app_id=app_model.id,
            workflow_id=workflow.id,
            user_id=user.id,
            invoke_from=invoke_from
        )

        try:
            yield from workflow_entry.run(
                inputs=user_inputs,
                files=[]
            )
        except Exception as e:
            logger.exception("工作流执行失败")
            # Chain explicitly so the root cause stays on the traceback.
            raise WorkflowExecutionError(f"工作流执行失败: {e}") from e

    def save_draft_workflow(
        self,
        app_model: App,
        graph_config: dict,
        features_config: dict,
        account: Account
    ) -> Workflow:
        """Validate and store the draft workflow of an application.

        Args:
            app_model: Application that owns the draft.
            graph_config: Graph definition.
            features_config: Feature configuration.
            account: Account recorded as the updater.

        Returns:
            The saved draft ``Workflow``.
        """
        self._validate_graph_config(graph_config)

        draft_workflow = self._get_or_create_draft_workflow(app_model)

        draft_workflow.graph = graph_config
        draft_workflow.features = features_config
        draft_workflow.updated_by = account.id
        draft_workflow.updated_at = naive_utc_now()
        db.session.commit()
        return draft_workflow
class AppGenerateService:
    """Content generation entry point for every application mode."""

    @classmethod
    def generate(
        cls,
        app_model: App,
        user: Union[Account, EndUser],
        args: dict,
        invoke_from: InvokeFrom,
        streaming: bool = True
    ) -> Generator:
        """Generate a response with the generator matching the app's mode.

        This is a generator function: the rate-limit check and the mode lookup
        run on first iteration, not when ``generate`` is called.

        Args:
            app_model: Application to generate for.
            user: Account or end user issuing the request.
            args: Generation parameters; reads ``inputs``, ``query``,
                ``files`` and ``conversation_id``, all optional.
            invoke_from: Invocation source.
            streaming: Whether to stream the output.

        Yields:
            Events produced by the selected generator.

        Raises:
            InvokeRateLimitError: A system or application rate limit was hit.
            ValueError: The application mode has no generator.
            AppGenerateError: Generation failed; chained to the original cause.
        """
        cls._check_rate_limit(user, app_model)

        generator_mapping = {
            AppMode.COMPLETION: CompletionAppGenerator,
            AppMode.CHAT: ChatAppGenerator,
            AppMode.AGENT_CHAT: AgentChatAppGenerator,
            AppMode.WORKFLOW: WorkflowAppGenerator,
            AppMode.ADVANCED_CHAT: AdvancedChatAppGenerator,
        }
        generator_class = generator_mapping.get(app_model.mode)
        if not generator_class:
            raise ValueError(f"不支持的应用模式: {app_model.mode}")

        generator = generator_class(
            app_model=app_model,
            user=user,
            invoke_from=invoke_from
        )

        try:
            yield from generator.generate(
                inputs=args.get("inputs", {}),
                query=args.get("query", ""),
                files=args.get("files", []),
                conversation_id=args.get("conversation_id"),
                streaming=streaming
            )
        except Exception as e:
            logger.exception("应用生成失败")
            # Chain explicitly so the root cause stays on the traceback.
            raise AppGenerateError(f"生成失败: {e}") from e

    @classmethod
    def _check_rate_limit(cls, user: Union[Account, EndUser], app_model: App):
        """Enforce the system-wide limit, then the per-application limit.

        NOTE(review): ``cls.system_rate_limiter`` is not defined in the class
        as shown -- confirm where it is set.

        Raises:
            InvokeRateLimitError: Either limit rejected the request.
        """
        if not cls.system_rate_limiter.check_request_limit(user.id):
            raise InvokeRateLimitError("系统调用频率超限")

        app_rate_limit = RateLimit.from_app_config(app_model)
        if app_rate_limit and not app_rate_limit.check_request_limit(user.id):
            raise InvokeRateLimitError("应用调用频率超限")
class AccountService:
    """Account management: registration with a default tenant, and login."""

    def create_account(
        self,
        email: str,
        name: str,
        password: str,
        interface_language: str = "en-US",
        timezone: str = "UTC"
    ) -> Account:
        """Register an account and make it owner of a new default tenant.

        Args:
            email: Email address; must not be registered yet.
            name: Display name.
            password: Plain-text password; stored via ``hash_password``.
            interface_language: UI language.
            timezone: Time zone name.

        Returns:
            The persisted ``Account``.

        Raises:
            AccountRegisterError: The email is already registered.
            AccountPasswordError: The password fails the strength check.
        """
        # Preconditions: unique email, acceptable password.
        if self._email_exists(email):
            raise AccountRegisterError("邮箱地址已注册")
        if not self._validate_password_strength(password):
            raise AccountPasswordError("密码强度不足")

        new_account = Account(
            id=str(uuid.uuid4()),
            email=email,
            name=name,
            password=hash_password(password),
            interface_language=interface_language,
            timezone=timezone,
            status=AccountStatus.ACTIVE,
            created_at=naive_utc_now()
        )

        # Every new account gets its own workspace and owns it.
        workspace = self._create_default_tenant(new_account)
        ownership = TenantAccountJoin(
            tenant_id=workspace.id,
            account_id=new_account.id,
            role=TenantAccountRole.OWNER,
            created_at=naive_utc_now()
        )

        # Single commit for all three rows.
        db.session.add_all([new_account, workspace, ownership])
        db.session.commit()

        # Announce the tenant once it is persisted.
        from events.tenant_event import tenant_was_created
        tenant_was_created.send(workspace, account=new_account)
        return new_account

    def authenticate(self, email: str, password: str) -> Optional[Account]:
        """Authenticate by email and password.

        Args:
            email: Email address.
            password: Plain-text password.

        Returns:
            The account on success, ``None`` when the email is unknown or the
            password does not match.

        Raises:
            AccountLoginError: Credentials match but the account is not active.
        """
        found = Account.query.filter_by(email=email).first()
        # Unknown email and wrong password both yield None.
        if not found or not compare_password(password, found.password):
            return None

        if found.status != AccountStatus.ACTIVE:
            raise AccountLoginError("账户已被禁用")

        # Record the successful login.
        found.last_login_at = naive_utc_now()
        db.session.commit()
        return found

    def _create_default_tenant(self, account: Account) -> Tenant:
        """Build (but do not persist) the default tenant for ``account``."""
        workspace_name = f"{account.name}'s workspace"
        return Tenant(
            id=str(uuid.uuid4()),
            name=workspace_name,
            status=TenantStatus.NORMAL,
            created_at=naive_utc_now()
        )
class BillingService:
    """Billing service: quota checks and usage recording."""

    @classmethod
    def check_resource_quota(cls, tenant_id: str, resource_type: str) -> bool:
        """Report whether the tenant still has quota for ``resource_type``.

        Declared as a classmethod because the body calls helpers through
        ``cls``; as a staticmethod ``cls`` was an undefined name, the NameError
        was swallowed below, and the check always answered False.

        NOTE(review): ``_get_tenant_subscription`` and ``_get_resource_usage``
        are not defined in the class as shown -- confirm they exist.

        Args:
            tenant_id: Tenant to check.
            resource_type: Resource kind, e.g. "apps" or "datasets".

        Returns:
            True when billing is disabled or usage is below the limit; False
            when there is no subscription, the quota is used up, or the check
            itself failed.
        """
        if not dify_config.BILLING_ENABLED:
            return True
        try:
            subscription = cls._get_tenant_subscription(tenant_id)
            if not subscription:
                return False
            current_usage = cls._get_resource_usage(tenant_id, resource_type)
            quota_limit = subscription.get_quota_limit(resource_type)
            return current_usage < quota_limit
        except Exception:
            # Fail closed: an unverifiable quota is treated as exhausted.
            logger.exception("配额检查失败")
            return False

    @staticmethod
    def record_usage(
        tenant_id: str,
        resource_type: str,
        usage_amount: int,
        metadata: Optional[dict] = None
    ):
        """Persist a usage record; a no-op when billing is disabled.

        Failures are logged and not raised, so recording never breaks the
        calling business flow.

        Args:
            tenant_id: Tenant the usage belongs to.
            resource_type: Resource kind.
            usage_amount: Amount consumed.
            metadata: Optional extra data; stored as ``{}`` when omitted.
        """
        if not dify_config.BILLING_ENABLED:
            return
        try:
            billing_record = BillingRecord(
                tenant_id=tenant_id,
                resource_type=resource_type,
                usage_amount=usage_amount,
                metadata=metadata or {},
                recorded_at=naive_utc_now()
            )
            db.session.add(billing_record)
            db.session.commit()
        except Exception:
            logger.exception("使用量记录失败")
            # Reset the session so later work on it is not poisoned by the
            # failed flush/commit.
            db.session.rollback()
class FeatureService:
    """Feature-flag service assembling system and tenant level switches."""

    @classmethod
    def get_features(cls, tenant_id: str) -> FeatureModel:
        """Build the feature configuration for a tenant.

        Sources are applied in order: environment configuration, then the
        billing API (billing enabled and a tenant id given), then enterprise
        workspace info (enterprise enabled).

        Args:
            tenant_id: Tenant ID.

        Returns:
            The populated ``FeatureModel``.
        """
        model = FeatureModel()

        # Baseline flags from the environment configuration.
        cls._fulfill_params_from_env(model)

        # Billing-driven flags need both the switch and a tenant.
        billing_applies = dify_config.BILLING_ENABLED and tenant_id
        if billing_applies:
            cls._fulfill_params_from_billing_api(model, tenant_id)

        # Enterprise edition extras.
        if dify_config.ENTERPRISE_ENABLED:
            model.webapp_copyright_enabled = True
            cls._fulfill_params_from_workspace_info(model, tenant_id)

        return model

    @classmethod
    def _fulfill_params_from_env(cls, features: FeatureModel):
        """Copy environment-configured switches onto ``features``."""
        config = dify_config
        features.can_replace_logo = config.CAN_REPLACE_LOGO
        features.model_load_balancing_enabled = config.MODEL_LB_ENABLED
        features.dataset_operator_enabled = config.DATASET_OPERATOR_ENABLED
        # Invitations are enabled whenever the expiry is a positive number.
        invite_hours = config.INVITE_EXPIRY_HOURS
        features.member_invite_enabled = invite_hours > 0
3. 服务组合模式
3.1 服务编排与协调
class ServiceOrchestrator:
    """Coordinates multi-service business flows."""

    def __init__(self):
        """Instantiate the services this orchestrator drives."""
        self.app_service = AppService()
        self.dataset_service = DatasetService()
        self.workflow_service = WorkflowService()
        self.account_service = AccountService()
        self.billing_service = BillingService()

    def create_complete_app(
        self,
        tenant_id: str,
        app_config: dict,
        dataset_configs: list[dict],
        workflow_config: Optional[dict],
        account: Account
    ) -> dict:
        """Create an application plus its datasets and optional workflow.

        Dataset and workflow failures are collected under ``errors`` and do
        not abort the call. A quota or application-creation failure triggers
        cleanup and is re-raised.

        Args:
            tenant_id: Tenant ID.
            app_config: Arguments for ``AppService.create_app``.
            dataset_configs: One keyword-argument dict per dataset.
            workflow_config: Optional graph configuration.
            account: Creator account.

        Returns:
            Dict with keys ``app``, ``datasets``, ``workflow`` and ``errors``.
        """
        created_resources = {
            "app": None,
            "datasets": [],
            "workflow": None,
            "errors": []
        }
        try:
            # 1. Quotas first, so nothing is created for an over-quota tenant.
            self._validate_resource_quotas(tenant_id, app_config, dataset_configs)

            # 2. Application.
            app = self.app_service.create_app(
                tenant_id=tenant_id,
                args=app_config,
                account=account
            )
            created_resources["app"] = app

            # 3. Datasets; each failure is recorded and the loop continues.
            for dataset_config in dataset_configs:
                try:
                    dataset = self.dataset_service.create_dataset(
                        account=account,
                        **dataset_config
                    )
                    created_resources["datasets"].append(dataset)
                except Exception as e:
                    created_resources["errors"].append({
                        "type": "dataset_creation",
                        "config": dataset_config,
                        "error": str(e)
                    })

            # 4. Workflow, if requested.
            # NOTE(review): WorkflowService as shown in this file has no
            # ``create_workflow`` -- confirm it exists.
            if workflow_config:
                try:
                    workflow = self.workflow_service.create_workflow(
                        app_id=app.id,
                        graph_config=workflow_config,
                        account=account
                    )
                    created_resources["workflow"] = workflow
                except Exception as e:
                    created_resources["errors"].append({
                        "type": "workflow_creation",
                        "error": str(e)
                    })

            # 5. Usage accounting for the new application.
            self.billing_service.record_usage(
                tenant_id=tenant_id,
                resource_type="apps",
                usage_amount=1,
                metadata={"app_id": app.id}
            )
            return created_resources
        except Exception:
            # Remove whatever was created, then re-raise with the original
            # traceback intact.
            self._cleanup_created_resources(created_resources)
            raise

    def _validate_resource_quotas(
        self,
        tenant_id: str,
        app_config: dict,
        dataset_configs: list[dict]
    ):
        """Raise ``QuotaExceededError`` when app or dataset quota is exhausted.

        The dataset check runs once: the quota call takes no count, so
        repeating it per dataset returned the same answer every time.
        NOTE(review): this cannot confirm room for *all* requested datasets.
        """
        if not self.billing_service.check_resource_quota(tenant_id, "apps"):
            raise QuotaExceededError("应用配额已满")

        if dataset_configs and not self.billing_service.check_resource_quota(
            tenant_id, "datasets"
        ):
            raise QuotaExceededError("数据集配额不足")

    def _cleanup_created_resources(self, resources: dict):
        """Best-effort deletion of the resources recorded in ``resources``."""
        try:
            if resources["app"]:
                db.session.delete(resources["app"])
            for dataset in resources["datasets"]:
                db.session.delete(dataset)
            if resources["workflow"]:
                db.session.delete(resources["workflow"])
            db.session.commit()
        except Exception:
            logger.exception("资源清理失败")
            # Leave the session usable for whoever handles the re-raised error.
            db.session.rollback()
class TaskService:
    """Asynchronous task submission and status lookup."""

    def __init__(self):
        """Start with an empty registry and an empty in-memory status cache."""
        self.task_registry = {}
        self.task_status_cache = {}

    def submit_async_task(
        self,
        task_type: str,
        task_params: dict,
        user_id: str,
        priority: int = 0
    ) -> str:
        """Submit a task to the handler registered for ``task_type``.

        Handlers are looked up by method name at call time. The previous
        dispatch dict referenced every handler eagerly, so one missing method
        raised AttributeError for all task types.

        Args:
            task_type: Task type.
            task_params: Keyword arguments forwarded to the task.
            user_id: Submitting user.
            priority: Priority stored on the task record.

        Returns:
            The generated task id.

        Raises:
            ValueError: ``task_type`` is unknown or has no handler on this
                instance.
        """
        handler_names = {
            "document_indexing": "_submit_document_indexing_task",
            "model_fine_tuning": "_submit_model_fine_tuning_task",
            "dataset_export": "_submit_dataset_export_task",
            "app_migration": "_submit_app_migration_task",
        }
        task_handler = getattr(self, handler_names.get(task_type, ""), None)
        if task_handler is None:
            raise ValueError(f"不支持的任务类型: {task_type}")

        task_id = str(uuid.uuid4())
        task_record = {
            "task_id": task_id,
            "task_type": task_type,
            "params": task_params,
            "user_id": user_id,
            "priority": priority,
            "status": "pending",
            "created_at": time.time()
        }

        task_handler(task_record)

        # Cached only after a successful submit.
        self.task_status_cache[task_id] = task_record
        return task_id

    def get_task_status(self, task_id: str) -> Optional[dict]:
        """Return the task's status record, or ``None`` if it is unknown.

        The in-memory cache wins over the database.
        NOTE(review): nothing shown here updates a cached record after
        submission, so a cached task keeps reporting "pending" -- confirm.
        """
        if task_id in self.task_status_cache:
            return self.task_status_cache[task_id]

        task_record = TaskRecord.query.filter_by(task_id=task_id).first()
        if task_record:
            return task_record.to_dict()
        return None

    def _submit_document_indexing_task(self, task_record: dict):
        """Enqueue the document indexing task."""
        from tasks.document_indexing_task import document_indexing_task
        document_indexing_task.delay(
            task_id=task_record["task_id"],
            **task_record["params"]
        )

    def _submit_model_fine_tuning_task(self, task_record: dict):
        """Enqueue the model fine-tuning task."""
        from tasks.model_fine_tuning_task import model_fine_tuning_task
        model_fine_tuning_task.delay(
            task_id=task_record["task_id"],
            **task_record["params"]
        )
4. 服务层设计模式
4.1 事务管理模式
class TransactionalService:
    """Base class with helpers for transactional and compensating operations."""

    def __init__(self):
        self.db_session = db.session

    def execute_with_transaction(self, operations: list[Callable]):
        """Run ``operations`` in order inside one transaction.

        Commits after the last operation; on any exception rolls back, logs
        and re-raises the original exception.

        NOTE(review): an explicit ``begin()`` may fail if the session already
        has a transaction in progress -- confirm against the session setup.

        Args:
            operations: Zero-argument callables to execute.
        """
        try:
            self.db_session.begin()
            for operation in operations:
                operation()
            self.db_session.commit()
        except Exception:
            self.db_session.rollback()
            logger.exception("事务执行失败")
            raise  # bare raise keeps the original traceback

    def rollback_safe_operation(self, operation: Callable, rollback_operation: Callable):
        """Run ``operation``; on failure run ``rollback_operation`` and re-raise.

        A failure inside ``rollback_operation`` is logged and suppressed so
        the caller always sees the exception from ``operation``.

        Args:
            operation: Main zero-argument callable.
            rollback_operation: Zero-argument compensating callable.

        Returns:
            The return value of ``operation``.
        """
        try:
            return operation()
        except Exception:
            try:
                rollback_operation()
            except Exception:
                logger.exception("回滚操作失败")
            raise
class CacheableService:
    """Base class adding a Redis-backed, JSON-serialized result cache."""

    def __init__(self, cache_prefix: str, default_ttl: int = 300):
        self.cache_prefix = cache_prefix
        self.default_ttl = default_ttl
        self.redis_client = redis_client

    def get_cached_result(
        self,
        cache_key: str,
        fetch_function: Callable,
        ttl: Optional[int] = None
    ):
        """Return the cached value for ``cache_key``, fetching it on a miss.

        The cache is best-effort in both directions: a failed read or write
        is logged and the value still comes from ``fetch_function``.

        A hit returns the JSON-decoded value, so anything serialized through
        ``default=str`` comes back as a string rather than its original type.

        Args:
            cache_key: Key, namespaced with ``cache_prefix``.
            fetch_function: Zero-argument callable producing the fresh value.
            ttl: Expiry passed to ``setex``; falls back to ``default_ttl``
                when None or 0.

        Returns:
            The cached or freshly fetched result.
        """
        full_key = f"{self.cache_prefix}:{cache_key}"

        # Read path: a cache outage must not break callers, matching the
        # write path below.
        try:
            cached_data = self.redis_client.get(full_key)
        except Exception as e:
            logger.warning(f"缓存读取失败: {e}")
            cached_data = None

        if cached_data:
            try:
                return json.loads(cached_data)
            except ValueError:
                # Corrupt or undecodable entry (JSONDecodeError and
                # UnicodeDecodeError are both ValueError): refetch, overwrite.
                pass

        # Miss: fetch fresh data.
        result = fetch_function()

        # Write path.
        try:
            self.redis_client.setex(
                full_key,
                ttl or self.default_ttl,
                json.dumps(result, ensure_ascii=False, default=str)
            )
        except Exception as e:
            logger.warning(f"缓存写入失败: {e}")
        return result

    def invalidate_cache(self, cache_key: str):
        """Delete the cache entry for ``cache_key``."""
        full_key = f"{self.cache_prefix}:{cache_key}"
        self.redis_client.delete(full_key)
5. 企业级功能与安全架构
5.1 多租户架构深度实现
根据企业级部署实践,Dify的多租户架构具有以下核心特点:
class EnterpriseMultiTenancyService:
    """Enterprise multi-tenancy service that provisions tenant workspaces."""

    def __init__(self):
        # Descriptive catalogue of isolation strategies (label -> description);
        # the method shown in this class does not read it.
        self.tenant_isolation_strategies = {
            "数据隔离": "基于tenant_id的行级安全(RLS)",
            "资源隔离": "基于容器和命名空间的物理隔离",
            "网络隔离": "基于VLAN和安全组的网络隔离",
            "存储隔离": "基于对象存储前缀的逻辑隔离"
        }

    def create_tenant_workspace(
        self,
        tenant_config: TenantConfig,
        admin_account: Account
    ) -> TenantWorkspace:
        """Provision a tenant workspace step by step.

        Order: tenant instance, database schema, dedicated resources, admin
        account, security policies, monitoring; the resulting resources are
        then read back into the returned workspace.

        Args:
            tenant_config: Supplies ``resource_quota`` and ``security_config``.
            admin_account: Account to set up as the tenant administrator.

        Returns:
            A ``TenantWorkspace`` holding the tenant, its admin and resources.
        """
        new_tenant = self._create_tenant_instance(tenant_config)
        tid = new_tenant.id

        # Schema and resources come before the admin account is created.
        self._initialize_tenant_schema(tid)
        self._setup_tenant_resources(tid, tenant_config.resource_quota)
        tenant_admin = self._create_tenant_admin(tid, admin_account)

        # Security policies and monitoring go on last.
        self._apply_tenant_security_policies(tid, tenant_config.security_config)
        self._enable_tenant_monitoring(tid)

        provisioned = self._get_tenant_resources(tid)
        return TenantWorkspace(
            tenant=new_tenant,
            admin=tenant_admin,
            resources=provisioned
        )
# Enterprise role-based access control matrix.
# Top-level keys are role scopes (system / tenant / application level). System
# and tenant roles map to a dict with permissions ("权限"), restrictions
# ("限制") and a typical use case ("应用场景"); application roles map to a
# one-line description string.
ENTERPRISE_RBAC_MATRIX = {
    "系统级角色": {
        "super_admin": {
            "权限": ["系统配置", "租户管理", "全局监控", "安全审计"],
            "限制": ["数据访问需审批", "操作全程记录"],
            "应用场景": "系统维护和紧急响应"
        },
        "platform_admin": {
            "权限": ["平台功能配置", "用户支持", "性能监控"],
            "限制": ["无租户数据访问权", "操作需要审批"],
            "应用场景": "平台日常运维和用户支持"
        }
    },
    "租户级角色": {
        "tenant_owner": {
            "权限": ["租户全权管理", "计费管理", "成员管理", "应用管理"],
            "限制": ["不能删除自身", "关键操作需要验证"],
            "应用场景": "企业管理员或决策者"
        },
        "tenant_admin": {
            "权限": ["应用管理", "数据集管理", "成员管理"],
            "限制": ["无计费权限", "无系统配置权"],
            "应用场景": "技术负责人或项目经理"
        },
        "developer": {
            "权限": ["应用开发", "调试测试", "数据集操作"],
            "限制": ["无生产环境权限", "无成员管理权"],
            "应用场景": "开发工程师"
        },
        "viewer": {
            "权限": ["只读访问", "基础使用"],
            "限制": ["无编辑权限", "无配置权限"],
            "应用场景": "业务用户和观察者"
        }
    },
    "应用级角色": {
        "app_owner": "应用完全控制权",
        "app_editor": "应用编辑权限",
        "app_viewer": "应用只读权限",
        "app_user": "应用使用权限"
    }
}
class EnterpriseSecurityAuditService:
    """Enterprise security audit service (audit logging, alerting, metrics)."""

    def __init__(self):
        # Descriptive catalogue of audit categories (label -> description).
        self.audit_categories = {
            "用户行为审计": "登录、操作、配置变更等用户行为",
            "数据访问审计": "数据查询、修改、导出等数据操作",
            "系统事件审计": "系统启动、配置变更、错误事件等",
            "API调用审计": "外部API调用、模型调用、工具使用等",
            "安全事件审计": "认证失败、权限越界、异常访问等"
        }

    def log_security_event(
        self,
        event_type: str,
        user_id: str,
        tenant_id: str,
        event_details: dict,
        risk_level: str = "low"
    ):
        """Record a security event, alerting on high-risk ones.

        Args:
            event_type: Event type.
            user_id: Acting user.
            tenant_id: Tenant the event belongs to.
            event_details: Free-form event payload.
            risk_level: Risk level; "high" and "critical" trigger an alert.
        """
        audit_record = SecurityAuditRecord(
            event_id=str(uuid.uuid4()),
            event_type=event_type,
            user_id=user_id,
            tenant_id=tenant_id,
            # Same naive-UTC helper the other services in this file use;
            # datetime.utcnow() is deprecated since Python 3.12.
            timestamp=naive_utc_now(),
            event_details=event_details,
            risk_level=risk_level,
            ip_address=self._get_client_ip(),
            user_agent=self._get_user_agent(),
            session_id=self._get_session_id()
        )

        # 1. Persist the audit record.
        self._persist_audit_record(audit_record)

        # 2. Alert on elevated risk.
        if risk_level in ["high", "critical"]:
            self._trigger_security_alert(audit_record)

        # 3. Update compliance metrics.
        self._update_compliance_metrics(audit_record)
# 数据安全与隐私保护
class DataPrivacyProtectionService:
    """Data privacy service: regex-based PII detection and masking."""

    # PII type -> regular expression; shared by detection and masking so the
    # two can never disagree. Inside a character class ``|`` is a literal
    # pipe, so the email TLD class is written ``[A-Za-z]``.
    _PII_PATTERNS = {
        "email": r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',
        "phone": r'\b\d{3}-\d{3}-\d{4}\b|\b\d{11}\b',
        "id_card": r'\b\d{17}[\dXx]\b',
        "credit_card": r'\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b'
    }

    def __init__(self):
        # Descriptive catalogue of protection strategies (label -> description).
        self.protection_strategies = {
            "数据分类": "基于敏感级别的自动数据分类",
            "数据脱敏": "PII数据的自动识别和脱敏处理",
            "访问控制": "基于最小权限原则的访问控制",
            "数据生命周期": "自动化的数据保留和删除策略"
        }

    def _get_pattern(self, pii_type: str) -> str:
        """Return the regular expression registered for ``pii_type``.

        Raises:
            ValueError: ``pii_type`` has no registered pattern.
        """
        try:
            return self._PII_PATTERNS[pii_type]
        except KeyError:
            raise ValueError(f"不支持的PII类型: {pii_type}") from None

    def classify_sensitive_data(self, content: str) -> dict:
        """Detect PII in ``content`` with the registered patterns.

        Args:
            content: Text to scan.

        Returns:
            Dict with ``pii_detected`` (bool), ``pii_types`` (matched type
            names in pattern order), ``confidence_score`` (always 0.0 here;
            this method never updates it) and ``suggested_actions`` (one
            masking suggestion per matched type).
        """
        classification_result = {
            "pii_detected": False,
            "pii_types": [],
            "confidence_score": 0.0,
            "suggested_actions": []
        }
        for pii_type, pattern in self._PII_PATTERNS.items():
            if re.findall(pattern, content):
                classification_result["pii_detected"] = True
                classification_result["pii_types"].append(pii_type)
                classification_result["suggested_actions"].append(f"脱敏{pii_type}信息")
        return classification_result

    def apply_data_masking(self, content: str, masking_rules: dict) -> str:
        """Mask PII in ``content`` according to ``masking_rules``.

        Args:
            content: Text to mask.
            masking_rules: PII type -> strategy. "full_mask" replaces every
                match with "[REDACTED]"; "partial_mask" delegates to
                ``_apply_partial_masking``; other strategies are ignored.

        Returns:
            The masked text.

        Raises:
            ValueError: A "full_mask" rule names an unknown PII type.
        """
        masked_content = content
        for pii_type, masking_strategy in masking_rules.items():
            if masking_strategy == "full_mask":
                masked_content = re.sub(
                    self._get_pattern(pii_type),
                    "[REDACTED]",
                    masked_content
                )
            elif masking_strategy == "partial_mask":
                # NOTE(review): _apply_partial_masking is not defined in this
                # class as shown -- confirm it is provided elsewhere.
                masked_content = self._apply_partial_masking(masked_content, pii_type)
        return masked_content
实战案例:基于Dify构建股票分析助手: 根据网络技术文章的实践案例,展示了Dify在金融领域的应用潜力:
class StockAnalysisAssistantCase:
    """
    Worked example: a stock analysis assistant.
    Holds the example's application configuration, analysis prompt and
    lessons learned as plain data.
    """
    def __init__(self):
        """
        Build the example application configuration.
        Tech stack: Dify + Agent + Google tool + Alphavantage API
        """
        self.application_config = {
            "app_type": "Agent应用",
            "model_config": {
                "provider": "huawei_cloud_maas", # Huawei Cloud MaaS
                "model": "DeepSeek_R1", # DeepSeek R1 model
                "temperature": 0.1, # low temperature for precise analysis
                "max_tokens": 2048
            },
            "tools_config": [
                {
                    "name": "Google搜索",
                    "purpose": "获取实时股票新闻和市场信息",
                    "configuration": "启用实时搜索API"
                },
                {
                    "name": "Alphavantage",
                    "purpose": "获取股票基础数据和技术指标",
                    "configuration": "配置API Key和数据源"
                }
            ],
            "variables": [
                {
                    "name": "stock_code",
                    "type": "string",
                    "description": "股票代码,如AAPL、TSLA等",
                    "required": True
                },
                {
                    "name": "byapi_key",
                    "type": "string",
                    "description": "必盈数据API密钥",
                    "required": True
                }
            ]
        }
    def generate_analysis_prompt(self) -> str:
        """
        Return the stock analysis prompt template.
        The text contains the literal placeholder {{stock_code}}; this method
        does not substitute it.
        """
        return """
你是一个专业的股票分析师,具备深厚的金融市场知识和数据分析能力。
请按照以下步骤对股票 {{stock_code}} 进行全面分析:
1. **数据收集阶段**:
- 使用Alphavantage工具获取股票的基础数据(价格、成交量、技术指标)
- 使用Google搜索获取最新的相关新闻和市场动态
- 收集公司财务报表和经营状况信息
2. **技术分析阶段**:
- 分析股票的K线图表和技术指标(MA、MACD、RSI等)
- 评估支撑位和阻力位
- 判断当前技术形态和趋势方向
3. **基本面分析阶段**:
- 分析公司财务数据(营收、利润、负债等)
- 评估行业地位和竞争优势
- 关注政策环境和市场预期
4. **综合评估阶段**:
- 结合技术面和基本面给出综合评分
- 提供明确的投资建议(买入/持有/卖出)
- 标注关键风险点和机会点
5. **报告生成**:
- 生成结构化的分析报告
- 包含图表说明和数据支撑
- 提供投资时间框架建议
注意事项:
- 所有分析必须基于客观数据
- 投资建议需要包含风险提示
- 保持专业性和中立性
- 数据来源需要标注清楚
请开始分析股票代码:{{stock_code}}
"""
    def implementation_insights(self) -> dict:
        """
        Return implementation insights and lessons learned as a nested dict
        of descriptive strings (success factors, performance tuning,
        deployment advice).
        """
        return {
            "关键成功因素": {
                "提示词设计": {
                    "重要性": "直接影响输出质量和专业性",
                    "优化策略": "多次测试和修改,基于实际效果调优",
                    "最佳实践": "结构化提示 + 专业术语 + 明确步骤"
                },
                "工具集成": {
                    "重要性": "数据获取的准确性和时效性",
                    "配置要点": "API密钥管理、调用频率控制、错误处理",
                    "最佳实践": "多数据源验证 + 备用API配置"
                },
                "模型选择": {
                    "重要性": "分析质量和推理能力",
                    "选择策略": "金融领域优选逻辑推理能力强的模型",
                    "最佳实践": "DeepSeek、GPT-4等高推理能力模型"
                }
            },
            "性能优化经验": {
                "响应时间优化": "平均分析时间控制在30-60秒",
                "准确性提升": "通过多数据源交叉验证提升准确性",
                "成本控制": "合理配置API调用频率和模型参数",
                "用户体验": "流式输出让用户实时看到分析进展"
            },
            "部署建议": {
                "环境要求": "4核8GB内存起步,支持Docker部署",
                "安全考虑": "API密钥加密存储,访问日志审计",
                "扩展性": "支持多用户并发,可横向扩展",
                "监控告警": "API调用失败、响应时间异常的实时告警"
            }
        }
# Recommended Dify configuration practices for financial applications.
# Purely descriptive: three groups (risk control, performance, compliance),
# each mapping a setting name to a human-readable recommendation string.
FINANCIAL_APPLICATION_CONFIG = {
    "风险控制配置": {
        "rate_limiting": "每用户每分钟最多5次分析请求",
        "data_validation": "严格的股票代码格式验证",
        "error_handling": "API调用失败的优雅降级处理",
        "audit_logging": "完整的用户操作和分析结果记录"
    },
    "性能优化配置": {
        "cache_strategy": "分析结果缓存30分钟,减少重复API调用",
        "parallel_processing": "并行调用多个数据源API",
        "timeout_management": "设置合理的API调用超时时间",
        "resource_pooling": "数据库连接池和HTTP连接复用"
    },
    "合规性配置": {
        "data_privacy": "用户输入的股票代码不记录到日志",
        "disclaimer": "所有分析结果需要包含投资风险提示",
        "data_retention": "分析历史数据保留策略(如90天)",
        "access_control": "基于用户角色的功能访问控制"
    }
}
5.3 事务与缓存一致性、异步幂等设计
一致性模式:
- 本地事务 + 事件外置(事务内写库,事务后发事件,带重试与去重)
- 读写分离下的缓存一致性:写入后先删缓存(或标记失效),读走DB回填
- 强一致热点:写时加版本号/逻辑时钟,CAS更新,失败重试
class Idempotency:
    """Idempotency guard backed by a set-if-absent key with expiry."""

    def acquire(self, key: str, ttl: int = 600) -> bool:
        """Try to claim ``key``; return whatever ``redis_setnx`` reports.

        Args:
            key: Idempotency key; stored under the ``idem:`` prefix.
            ttl: Expiry handed to ``redis_setnx`` (presumably seconds -- confirm).
        """
        # Equivalent to: SETNX idem:<key> "1" EX ttl
        namespaced = f"idem:{key}"
        return redis_setnx(namespaced, "1", ttl=ttl)
def cache_invalidate_then_write(key: str, write: Callable[[], None]):
    """Delete the cache entry for ``key``, then run ``write``.

    Nothing is written back to the cache here; the read path is expected to
    repopulate it. An exception from ``write`` propagates to the caller.
    """
    cache.delete(key)
    write()
异步任务幂等Key:hash(tenant_id, task_type, biz_id, payload_digest)
- 投递前先执行 SETNX;消费者处理完成后删除Key;超时自动过期。
- 重试策略:指数退避 + 最大重试次数 + 死信队列(含诊断上下文)。
6. 总结与设计原则
6.1 服务层核心特点
- 领域驱动:按业务领域组织服务
- 单一职责:每个服务专注特定业务
- 事务管理:完善的事务和回滚机制
- 缓存优化:多层缓存提升性能
- 异步处理:耗时操作异步化
- 企业就绪:完整的多租户、审计、安全机制
- 合规支持:满足国际和国内法规要求
6.2 设计原则
- 高内聚低耦合:服务内聚业务逻辑,减少服务间依赖
- 接口优先:定义清晰的服务接口
- 异常透明:完善的异常处理和传播
- 可测试性:支持单元测试和集成测试
- 可扩展性:便于添加新的业务功能
7. 关键函数核心代码与说明(精简摘录)
class AppGenerateService:
    @classmethod
    def generate(cls, app_model: App, user: Account | EndUser, args: dict, invoke_from: InvokeFrom, streaming: bool = True) -> Generator:
        """Rate-limit check -> pick the generator for the app mode -> produce the event stream; failures are raised as semantic errors."""
class AppService:
    def create_app(self, tenant_id: str, args: dict, account: Account) -> App:
        """Initialize config from the template -> write to the DB -> fire the creation event; the default model config can fall back."""
class DatasetService:
    def process_document(self, dataset_id: str, document_data: dict, account: Account) -> str:
        """Create the indexing task and enqueue it asynchronously; returns the task ID."""
class WorkflowService:
    def run_workflow(self, app_model: App, workflow: Workflow, user_inputs: dict, user: Account, invoke_from: InvokeFrom = InvokeFrom.DEBUGGER) -> Generator:
        """Wrap `WorkflowEntry` execution and yield its event stream; exceptions are wrapped as `WorkflowExecutionError`."""
class TaskService:
    def submit_async_task(self, task_type: str, task_params: dict, user_id: str, priority: int = 0) -> str:
        """Generate a task ID -> dispatch to the matching Celery task -> cache the status; does not block the business thread."""
7.1 要点说明
- 领域服务:围绕应用/数据集/工作流等边界组织;接口清晰。
- 事件驱动:创建/更新通过领域事件与异步任务解耦。
- 失败路径:语义化异常,便于 API 层归一化响应。
8. 关键函数调用链(按职责)
对话/生成(Controller)
→ AppGenerateService.generate
→ 频控校验
→ 选择 *AppGenerator (Chat/Completion/Agent/Workflow)
→ generator.generate(..., streaming)
应用创建(Console/Service)
→ AppService.create_app → DB 写入 → app_was_created 事件
知识入库
→ DatasetService.process_document → Celery(document_indexing_task)
工作流执行
→ WorkflowService.run_workflow → WorkflowEntry.run → 事件流
9. 统一时序图(精简版)
sequenceDiagram
participant API as API层
participant S as Service层
participant DB as 数据库
participant Q as 任务队列
API->>S: AppGenerateService.generate
S-->>API: 事件(流)
API->>S: AppService.create_app
S->>DB: insert(app, config)
S-->>API: app
S-->>Q: app_was_created
API->>S: DatasetService.process_document
S-->>Q: document_indexing_task
Q-->>S: ack
10. 关键结构与继承关系(类图)
classDiagram
class AppService
class DatasetService
class WorkflowService
class AppGenerateService
class TaskService
class ServiceOrchestrator
ServiceOrchestrator --> AppService
ServiceOrchestrator --> DatasetService
ServiceOrchestrator --> WorkflowService
ServiceOrchestrator --> AppGenerateService
ServiceOrchestrator --> TaskService