概述

Dify的服务层模块(services/)是平台的业务逻辑核心,采用领域驱动设计(DDD)实现了完整的业务功能。该模块包含20多个核心服务,涵盖应用管理、数据集处理、工作流执行、用户账户、计费系统等各个业务领域,为上层API提供了丰富的业务能力支撑。
Code Executor安全执行引擎: Dify实现了安全的代码执行器(Code Executor),支持多种编程语言的安全执行:
class CodeExecutor:
    """Dispatch user code to a language-specific executor after a safety check.

    ``SUPPORTED_LANGUAGES`` maps a language name to the executor class that
    runs it; JavaScript and TypeScript share ``NodeJSCodeExecutor``.
    """

    SUPPORTED_LANGUAGES = {
        "python": PythonCodeExecutor,
        "javascript": NodeJSCodeExecutor,
        "typescript": NodeJSCodeExecutor,
        "go": GoCodeExecutor,
    }

    def execute_code(
        self,
        language: str,
        code: str,
        inputs: dict,
        timeout: int = 30,
    ) -> CodeExecutionResult:
        """Run ``code`` with the executor registered for ``language``.

        Args:
            language: Key into ``SUPPORTED_LANGUAGES``.
            code: Source code to execute.
            inputs: Input variables handed to the executor's ``run``.
            timeout: Execution timeout forwarded to the executor
                (presumably seconds -- TODO confirm).

        Returns:
            Whatever the executor's ``run(code, inputs)`` returns.

        Raises:
            UnsupportedLanguageError: ``language`` has no registered executor.
            UnsafeCodeError: the security checker reports the code as unsafe.
        """
        # 1. Reject languages that have no registered executor.
        executor_cls = self.SUPPORTED_LANGUAGES.get(language)
        if executor_cls is None:
            raise UnsupportedLanguageError(f"不支持的语言: {language}")

        # 2. Static safety check. CodeSecurityChecker's public entry point is
        #    analyze_code(), which returns a report carrying ``is_safe``; the
        #    checker has no ``is_safe()`` method of its own.
        report = CodeSecurityChecker().analyze_code(code, language)
        if not report.is_safe:
            raise UnsafeCodeError("代码包含不安全的操作")

        # 3. Build an isolated executor with fixed resource limits.
        executor = executor_cls(
            timeout=timeout,
            memory_limit="128MB",
            cpu_limit="1",
        )

        # 4. Execute and hand the result straight back to the caller.
        return executor.run(code, inputs)


# Event-driven task pipeline mechanism: the service layer implements complex
# business flows through event-driven task pipelines:
- BasedGenerateTaskPipeline:基础任务管道,提供流式输出和状态管理
- EasyUIBasedGenerateTaskPipeline:针对EasyUI应用优化的任务处理
- WorkflowBasedTaskPipeline:工作流专用的任务管道

Code Executor安全沙箱实现: Dify的Code Executor采用了多层安全隔离机制:
class SecureCodeExecutor:
    """Run user code inside a sandbox after a static security analysis.

    The flow is: analyze the code, create a language-specific sandbox, apply
    resource limits, execute under a resource monitor, sanitize the output,
    and always clean the sandbox up afterwards.
    """

    def __init__(self):
        """Create the per-language policies, the checker and the monitor."""
        self.security_policies = {
            "python": PythonSecurityPolicy(),
            "javascript": JavaScriptSecurityPolicy(),
            "typescript": TypeScriptSecurityPolicy(),
        }
        # Static security checker used before anything is executed.
        self.security_checker = CodeSecurityChecker()
        # Resource monitor wrapped around the actual execution.
        self.resource_monitor = ResourceMonitor()

    def execute_with_sandbox(
        self,
        language: str,
        code: str,
        inputs: dict,
        timeout: int = 30,
    ) -> CodeExecutionResult:
        """Analyze ``code`` and, if it is safe, execute it in a sandbox.

        Args:
            language: Programming language of ``code``.
            code: User-supplied source code.
            inputs: Input variables passed to the sandbox.
            timeout: Execution timeout passed to the sandbox
                (presumably seconds -- TODO confirm).

        Returns:
            A ``CodeExecutionResult``: ``success=True`` with the sanitized
            output on success, or ``success=False`` describing the violation
            when the sandbox raises ``SecurityViolationError``.

        Raises:
            UnsafeCodeError: the static analysis found violations.
            UnsupportedLanguageError: no sandbox config exists for
                ``language``.
        """
        # 1. Static security analysis; refuse to run anything flagged.
        security_report = self.security_checker.analyze_code(code, language)
        if not security_report.is_safe:
            raise UnsafeCodeError(f"代码安全检查失败: {security_report.violations}")

        # 2. Create the isolated environment. This happens outside the
        #    ``try`` so a failed creation never reaches ``cleanup()``.
        sandbox = self._create_sandbox(language)

        try:
            # 3. Apply resource limits.
            self._apply_resource_limits(sandbox, {
                "memory_limit": "128MB",
                "cpu_limit": "100m",
                "disk_limit": "10MB",
                "network_access": False,
                "file_system_access": "read-only",
            })

            # 4. Execute under the resource monitor.
            with self.resource_monitor.monitor():
                result = sandbox.execute(
                    code=code,
                    inputs=inputs,
                    timeout=timeout,
                )

            # 5. Sanitize the output before returning it.
            sanitized_result = self._sanitize_output(result)

            return CodeExecutionResult(
                success=True,
                output=sanitized_result,
                execution_time=result.execution_time,
                memory_usage=result.memory_usage,
                security_violations=[],
            )

        except SecurityViolationError as e:
            logger.error(f"代码执行安全违规: {e}")
            return CodeExecutionResult(
                success=False,
                error=f"安全违规: {str(e)}",
                security_violations=[str(e)],
            )

        finally:
            # 6. Always clean up the sandbox environment.
            sandbox.cleanup()

    def _create_sandbox(self, language: str) -> CodeSandbox:
        """Create the sandbox for ``language``.

        Raises:
            UnsupportedLanguageError: ``language`` has no sandbox config.
        """
        # NOTE(review): "typescript" has a security policy in __init__ but no
        # entry here, so TypeScript passes analysis and then fails with
        # UnsupportedLanguageError -- confirm whether that is intended.
        sandbox_configs = {
            "python": {
                "base_image": "python:3.11-alpine",
                "allowed_imports": [
                    "json", "math", "datetime", "re", "uuid",
                    "pandas", "numpy", "requests",
                ],
                "blocked_imports": [
                    "os", "sys", "subprocess", "socket",
                    "threading", "multiprocessing",
                ],
            },
            "javascript": {
                "runtime": "node:18-alpine",
                "allowed_modules": [
                    "lodash", "moment", "axios", "crypto",
                ],
                "blocked_modules": [
                    "fs", "child_process", "cluster", "os",
                ],
            },
        }

        config = sandbox_configs.get(language)
        if not config:
            raise UnsupportedLanguageError(f"不支持的语言: {language}")

        return DockerSandbox(config)


class CodeSecurityChecker:
    """Static security checker combining AST analysis and rule matching."""

    def __init__(self):
        """Load the rule set and create the AST analyzer."""
        self.security_rules = self._load_security_rules()
        self.ast_analyzer = ASTSecurityAnalyzer()

    def analyze_code(self, code: str, language: str) -> SecurityReport:
        """Collect security violations for ``code`` and wrap them in a report.

        Args:
            code: Source code to analyze.
            language: Programming language of ``code``.

        Returns:
            A ``SecurityReport`` whose ``is_safe`` is true only when no
            violation was found.
        """
        violations = []

        # 1. AST analysis (only Python and JavaScript/TypeScript have one).
        if language == "python":
            violations.extend(self._analyze_python_ast(code))
        elif language in ["javascript", "typescript"]:
            violations.extend(self._analyze_js_ast(code))

        # 2. String pattern matching, applied to every language.
        violations.extend(self._pattern_based_analysis(code, language))

        # 3. Dynamic analysis, only when the rule set enables it.
        if self.security_rules.get("enable_dynamic_analysis", False):
            violations.extend(self._dynamic_analysis(code, language))

        return SecurityReport(
            is_safe=not violations,
            violations=violations,
            severity_score=self._calculate_severity_score(violations),
        )

    def _analyze_python_ast(self, code: str) -> list[SecurityViolation]:
        """Find blocked calls and blocked imports in Python source.

        Checks direct calls by bare name (``eval(...)``), ``import x``
        statements and ``from x import y`` statements. A module is blocked
        when its full dotted name or its top-level package is listed in
        ``security_rules["python"]["blocked_imports"]``, so ``import os.path``
        and ``from os import system`` are both caught when ``os`` is blocked.

        Args:
            code: Python source code.

        Returns:
            The violations found. Source that cannot be parsed yields a
            single ``syntax_error`` violation.
        """
        violations = []

        try:
            tree = ast.parse(code)
        except (SyntaxError, ValueError) as e:
            # ValueError covers source containing null bytes on interpreters
            # that do not report it as SyntaxError; it carries no line number.
            violations.append(SecurityViolation(
                type="syntax_error",
                message=f"代码语法错误: {str(e)}",
                line_number=getattr(e, "lineno", None),
            ))
            return violations

        python_rules = self.security_rules["python"]
        blocked_functions = python_rules["blocked_functions"]
        blocked_imports = python_rules["blocked_imports"]

        for node in ast.walk(tree):
            if isinstance(node, ast.Call):
                # Only calls through a bare name carry ``id``; attribute
                # calls such as ``obj.method()`` are not inspected here.
                func_name = getattr(node.func, "id", None)
                if func_name is not None and func_name in blocked_functions:
                    violations.append(SecurityViolation(
                        type="dangerous_function_call",
                        message=f"禁止调用函数: {func_name}",
                        line_number=node.lineno,
                    ))
                continue

            if isinstance(node, ast.Import):
                module_names = [alias.name for alias in node.names]
            elif isinstance(node, ast.ImportFrom):
                # ``from . import x`` has ``module`` set to None; there is no
                # absolute module name to check in that case.
                module_names = [node.module] if node.module else []
            else:
                continue

            for module_name in module_names:
                top_level = module_name.split(".", 1)[0]
                if module_name in blocked_imports or top_level in blocked_imports:
                    violations.append(SecurityViolation(
                        type="blocked_import",
                        message=f"禁止导入模块: {module_name}",
                        line_number=node.lineno,
                    ))

        return violations


# Enterprise feature configuration:
# Enterprise deployment tuning.
# NOTE(review): if this feeds Celery's worker_max_memory_per_child, Celery
# documents the unit as kilobytes -- confirm against the deployment.
CELERY_WORKER_MAX_MEMORY_PER_CHILD = 200000  # max memory per worker process
CELERY_WORKER_MAX_TASKS_PER_CHILD = 1000  # max tasks per worker process

# API tool tuning (timeouts presumably in seconds -- TODO confirm).
API_TOOL_DEFAULT_CONNECT_TIMEOUT = 10  # connect timeout
API_TOOL_DEFAULT_READ_TIMEOUT = 60  # read timeout

# China-region deployment tuning.
PIP_MIRROR_URL = "https://pypi.tuna.tsinghua.edu.cn/simple"  # Tsinghua PyPI mirror

# ...