概述
本文档汇总了Dify平台开发和运维的最佳实践,通过实战案例、性能优化技巧和开发规范,帮助开发者构建高质量的AI应用。
1. 应用开发最佳实践
1.1 聊天应用开发实战
案例:构建智能客服系统
# 智能客服应用配置示例
class CustomerServiceApp:
"""智能客服应用最佳实践"""
def __init__(self):
self.app_config = {
"mode": "chat",
"model_config": {
"provider": "openai",
"model": "gpt-4",
"parameters": {
"temperature": 0.1, # 降低随机性,提高一致性
"max_tokens": 1000,
"top_p": 0.9,
"presence_penalty": 0.1,
"frequency_penalty": 0.1
}
},
"prompt_template": {
"system_message": """你是一个专业的客服助手,请遵循以下原则:
1. 保持礼貌和专业的语调
2. 准确理解用户问题并提供有用的解答
3. 如果不确定答案,诚实说明并提供替代方案
4. 优先使用知识库中的信息回答问题
5. 对于复杂问题,引导用户联系人工客服
当前时间:{{#sys.datetime#}}
用户信息:{{#sys.user_name#}}""",
"user_input_form": [
{
"variable": "query",
"label": "用户问题",
"type": "text-input",
"required": True,
"max_length": 500
}
]
},
"dataset_configs": {
"retrieval_model": "vector",
"top_k": 5,
"score_threshold": 0.7,
"reranking_enable": True,
"reranking_model": {
"provider": "cohere",
"model": "rerank-multilingual-v2.0"
}
},
"conversation_variables": [
{
"variable": "user_level",
"name": "用户等级",
"description": "VIP/普通用户标识"
},
{
"variable": "issue_category",
"name": "问题类别",
"description": "技术/账单/产品问题分类"
}
]
}
def optimize_for_performance(self):
"""性能优化配置"""
return {
# 启用流式响应
"stream": True,
# 配置缓存策略
"cache_config": {
"enable_cache": True,
"cache_ttl": 3600, # 1小时
"cache_key_template": "cs_{user_id}_{query_hash}"
},
# 并发控制
"rate_limit": {
"requests_per_minute": 60,
"requests_per_hour": 1000
},
# 超时设置
"timeout": {
"llm_timeout": 30,
"retrieval_timeout": 10,
"total_timeout": 45
}
}
最佳实践要点
提示词设计:
- 明确角色定位和行为准则
- 使用系统变量增强上下文感知
- 设置合理的输入验证和长度限制
模型参数调优:
- 客服场景使用低temperature(0.1-0.3)保证一致性
- 适当的max_tokens避免回复过长
- 使用presence_penalty减少重复内容
知识库配置:
- 设置合适的score_threshold过滤低质量结果
- 启用重排模型提高检索精度
- 定期更新和维护知识库内容
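结合上述要点,下面给出一个按该配置调用智能客服应用的最小调用示例(示意代码:假设通过 Dify Service API 的 /v1/chat-messages 端点与 Bearer 密钥认证访问,实际端点与字段以所部署版本的 API 文档为准):
# 以流式方式调用客服应用的最小示例(端点与字段为假设,按实际API文档调整)
import json
import requests

API_BASE = "https://api.dify.ai/v1"   # 自托管时替换为自己的服务地址
API_KEY = "app-xxxxxxxxxxxxxxxx"      # 应用的API密钥(占位值)

def ask_customer_service(query: str, user_id: str, conversation_id: str = ""):
    """调用chat-messages接口并逐段打印流式回复"""
    payload = {
        "inputs": {},
        "query": query[:500],            # 与user_input_form中的max_length保持一致
        "response_mode": "streaming",    # 启用流式响应,降低首字延迟
        "user": user_id,
    }
    if conversation_id:
        payload["conversation_id"] = conversation_id
    resp = requests.post(
        f"{API_BASE}/chat-messages",
        headers={"Authorization": f"Bearer {API_KEY}"},
        json=payload,
        stream=True,
        timeout=45,                      # 与total_timeout配置保持一致
    )
    resp.raise_for_status()
    for line in resp.iter_lines():
        if line and line.startswith(b"data:"):
            event = json.loads(line[len(b"data:"):])
            if event.get("event") == "message":
                print(event.get("answer", ""), end="", flush=True)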
1.2 工作流应用开发实战
案例:文档分析工作流
# 文档分析工作流配置
workflow_config:
name: "智能文档分析工作流"
description: "自动分析上传文档并生成摘要和关键信息"
nodes:
# 起始节点
- id: "start"
type: "start"
data:
title: "开始"
variables:
- variable: "document_file"
type: "file"
required: true
description: "待分析的文档文件"
- variable: "analysis_type"
type: "select"
required: true
options: ["summary", "key_points", "full_analysis"]
description: "分析类型"
# 文档解析节点
- id: "parse_document"
type: "code"
data:
title: "文档解析"
code_language: "python"
code: |
import PyPDF2
import docx
from io import BytesIO
def parse_document(file_content, file_type):
"""解析不同格式的文档"""
text = ""
if file_type == "pdf":
pdf_reader = PyPDF2.PdfReader(BytesIO(file_content))
for page in pdf_reader.pages:
text += page.extract_text()
elif file_type == "docx":
doc = docx.Document(BytesIO(file_content))
for paragraph in doc.paragraphs:
text += paragraph.text + "\n"
elif file_type == "txt":
text = file_content.decode('utf-8')
return {
"extracted_text": text,
"word_count": len(text.split()),
"char_count": len(text)
}
# 执行解析
result = parse_document(
document_file.content,
document_file.extension
)
extracted_text = result["extracted_text"]
document_stats = {
"word_count": result["word_count"],
"char_count": result["char_count"]
}
outputs:
- variable: "extracted_text"
type: "string"
- variable: "document_stats"
type: "object"
# 文本预处理节点
- id: "preprocess_text"
type: "code"
data:
title: "文本预处理"
code: |
import re
def clean_text(text):
"""清理和标准化文本"""
# 移除多余空白
text = re.sub(r'\s+', ' ', text)
# 移除特殊字符
text = re.sub(r'[^\w\s\u4e00-\u9fff.,!?;:]', '', text)
# 分段处理
paragraphs = [p.strip() for p in text.split('\n') if p.strip()]
return '\n'.join(paragraphs)
cleaned_text = clean_text(extracted_text)
# 文本分块(用于长文档)
max_chunk_size = 4000
chunks = []
if len(cleaned_text) > max_chunk_size:
words = cleaned_text.split()
current_chunk = []
current_length = 0
for word in words:
if current_length + len(word) > max_chunk_size:
chunks.append(' '.join(current_chunk))
current_chunk = [word]
current_length = len(word)
else:
current_chunk.append(word)
current_length += len(word) + 1
if current_chunk:
chunks.append(' '.join(current_chunk))
else:
chunks = [cleaned_text]
outputs:
- variable: "cleaned_text"
type: "string"
- variable: "text_chunks"
type: "array"
# 条件分支节点
- id: "analysis_router"
type: "if-else"
data:
title: "分析类型路由"
conditions:
logical_operator: "and"
conditions:
- variable_selector: ["start", "analysis_type"]
comparison_operator: "is"
value: "summary"
if_else:
true: "generate_summary"
false: "check_key_points"
# 摘要生成节点
- id: "generate_summary"
type: "llm"
data:
title: "生成文档摘要"
model:
provider: "openai"
name: "gpt-4"
parameters:
temperature: 0.3
max_tokens: 500
prompt_template:
- role: "system"
text: |
你是一个专业的文档分析师。请为以下文档生成简洁准确的摘要:
要求:
1. 摘要长度控制在200-300字
2. 突出文档的核心内容和主要观点
3. 使用清晰的结构化语言
4. 保持客观中性的语调
- role: "user"
text: |
文档内容:
{{#cleaned_text#}}
文档统计:
- 字数:{{#document_stats.word_count#}}
- 字符数:{{#document_stats.char_count#}}
请生成摘要:
outputs:
- variable: "summary"
type: "string"
# 关键点提取节点
- id: "extract_key_points"
type: "llm"
data:
title: "提取关键信息"
model:
provider: "openai"
name: "gpt-4"
parameters:
temperature: 0.2
max_tokens: 800
prompt_template:
- role: "system"
text: |
请从文档中提取关键信息,包括:
1. 主要观点(3-5个)
2. 重要数据和统计信息
3. 关键结论或建议
4. 需要注意的风险或问题
请以结构化的方式输出结果。
- role: "user"
text: "文档内容:\n{{#cleaned_text#}}"
outputs:
- variable: "key_points"
type: "string"
# 结果汇总节点
- id: "compile_results"
type: "template-transform"
data:
title: "结果汇总"
template: |
# 文档分析报告
## 基本信息
- 分析时间:{{#sys.datetime#}}
- 文档字数:{{#document_stats.word_count#}}
- 分析类型:{{#start.analysis_type#}}
{% if summary %}
## 文档摘要
{{#summary#}}
{% endif %}
{% if key_points %}
## 关键信息
{{#key_points#}}
{% endif %}
## 分析完成
文档分析已完成,如需进一步分析请联系相关人员。
outputs:
- variable: "final_report"
type: "string"
# 结束节点
- id: "end"
type: "end"
data:
title: "分析完成"
outputs:
- variable: "final_report"
type: "string"
# 节点连接关系
edges:
- source: "start"
target: "parse_document"
- source: "parse_document"
target: "preprocess_text"
- source: "preprocess_text"
target: "analysis_router"
- source: "analysis_router"
target: "generate_summary"
condition: true
- source: "analysis_router"
target: "extract_key_points"
condition: false
- source: "generate_summary"
target: "compile_results"
- source: "extract_key_points"
target: "compile_results"
- source: "compile_results"
target: "end"
工作流设计最佳实践
节点设计原则:
- 单一职责:每个节点只处理一个特定任务
- 可复用性:设计通用的处理节点
- 错误处理:为每个节点添加异常处理逻辑(示例见本节末尾)
变量管理:
- 明确的变量命名规范
- 合理的变量作用域设计
- 类型安全的变量传递
性能优化:
- 并行执行无依赖的节点
- 合理的文本分块策略
- 缓存中间结果
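针对上面"错误处理"一点,常用做法是让代码节点捕获自身异常并输出结构化的降级结果,由后续的条件分支决定走人工处理还是直接结束,而不是让整条工作流中断。以下为示意写法(safe_parse_document 为假设的包装函数,复用前文的 parse_document):
# 代码节点的通用异常保护骨架(示意)
def safe_parse_document(file_content, file_type):
    try:
        result = parse_document(file_content, file_type)
        return {
            "success": True,
            "extracted_text": result["extracted_text"],
            "error_message": "",
        }
    except Exception as e:
        # 失败时不抛出异常,而是返回结构化的错误信息,供下游节点判断
        return {
            "success": False,
            "extracted_text": "",
            "error_message": f"文档解析失败: {e}",
        }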
2. 性能优化最佳实践
2.1 数据库优化
PostgreSQL优化配置
-- 数据库性能优化配置
-- postgresql.conf 关键参数
-- 内存配置
shared_buffers = '256MB' -- 共享缓冲区
effective_cache_size = '1GB' -- 有效缓存大小
work_mem = '16MB' -- 工作内存
maintenance_work_mem = '64MB' -- 维护工作内存
-- 连接配置
max_connections = 200 -- 最大连接数
max_prepared_transactions = 100 -- 最大预处理事务
-- 检查点配置
checkpoint_completion_target = 0.9 -- 检查点完成目标
wal_buffers = '16MB' -- WAL缓冲区
checkpoint_timeout = '10min' -- 检查点超时
-- 查询优化
random_page_cost = 1.1 -- 随机页面成本
effective_io_concurrency = 200 -- 有效IO并发
-- 日志配置
log_min_duration_statement = 1000 -- 记录慢查询(1秒)
log_checkpoints = on -- 记录检查点
log_connections = on -- 记录连接
log_disconnections = on -- 记录断开连接
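除了通过 log_min_duration_statement 记录慢查询日志,也可以借助 pg_stat_statements 扩展直接从数据库侧定位累计耗时最高的语句(需要先在 shared_preload_libraries 中加载该扩展;字段名随 PostgreSQL 版本略有差异,以下以 13+ 为例,仅作排查参考):
-- 查看累计耗时最高的前10条语句
CREATE EXTENSION IF NOT EXISTS pg_stat_statements;

SELECT
    calls,                                             -- 执行次数
    round(total_exec_time::numeric, 2) AS total_ms,    -- 累计耗时(毫秒)
    round(mean_exec_time::numeric, 2)  AS mean_ms,     -- 平均耗时(毫秒)
    left(query, 120)                   AS query_snippet
FROM pg_stat_statements
ORDER BY total_exec_time DESC
LIMIT 10;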
索引优化策略
-- 核心表索引优化
-- 应用表
CREATE INDEX CONCURRENTLY idx_apps_tenant_status
ON apps (tenant_id, status) WHERE status = 'normal';
CREATE INDEX CONCURRENTLY idx_apps_created_at
ON apps (created_at DESC);
-- 对话表
CREATE INDEX CONCURRENTLY idx_conversations_app_user
ON conversations (app_id, from_end_user_id, created_at DESC);
CREATE INDEX CONCURRENTLY idx_conversations_status
ON conversations (status) WHERE status IN ('normal', 'archived');
-- 消息表(分区表)
CREATE INDEX CONCURRENTLY idx_messages_conversation_created
ON messages (conversation_id, created_at DESC);
CREATE INDEX CONCURRENTLY idx_messages_app_date
ON messages (app_id, DATE(created_at));
-- 数据集文档表
CREATE INDEX CONCURRENTLY idx_documents_dataset_status
ON documents (dataset_id, indexing_status, enabled);
CREATE INDEX CONCURRENTLY idx_documents_batch
ON documents (batch) WHERE batch IS NOT NULL;
-- 向量索引(使用pgvector扩展)
CREATE INDEX CONCURRENTLY idx_document_segments_vector
ON document_segments USING ivfflat (embedding vector_cosine_ops)
WITH (lists = 100);
-- 复合索引优化查询
CREATE INDEX CONCURRENTLY idx_messages_complex
ON messages (app_id, conversation_id, created_at DESC)
INCLUDE (query, answer);
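索引并非越多越好,每个索引都会放大写入成本。可以定期用统计视图确认索引是否真的被查询使用,再决定去留(以下查询基于 pg_stat_user_indexes,阈值仅为示意):
-- 找出几乎未被使用且占用空间较大的索引,评估是否删除
SELECT
    schemaname,
    relname        AS table_name,
    indexrelname   AS index_name,
    idx_scan,                                          -- 该索引被扫描的次数
    pg_size_pretty(pg_relation_size(indexrelid)) AS index_size
FROM pg_stat_user_indexes
WHERE idx_scan < 50
ORDER BY pg_relation_size(indexrelid) DESC;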
查询优化示例
# 高效的数据库查询实践
from sqlalchemy import text
from sqlalchemy.orm import selectinload

# db、Message 等来自应用自身的模型与扩展,此处默认已在上下文中可用
class OptimizedQueries:
"""优化的数据库查询示例"""
@staticmethod
def get_conversation_messages_optimized(conversation_id: str, limit: int = 20):
"""优化的消息查询 - 使用索引和分页"""
return db.session.query(Message)\
.filter(Message.conversation_id == conversation_id)\
.order_by(Message.created_at.desc())\
.limit(limit)\
.options(
# 预加载关联数据,避免N+1查询
selectinload(Message.message_files),
selectinload(Message.message_annotations)
).all()
@staticmethod
def get_app_statistics_optimized(app_id: str, date_range: tuple):
"""优化的应用统计查询 - 使用聚合和索引"""
start_date, end_date = date_range
# 使用原生SQL进行复杂聚合
sql = text("""
SELECT
DATE(created_at) as date,
COUNT(*) as message_count,
COUNT(DISTINCT conversation_id) as conversation_count,
AVG(provider_response_latency) as avg_latency,
SUM(CASE WHEN answer IS NOT NULL THEN 1 ELSE 0 END) as success_count
FROM messages
WHERE app_id = :app_id
AND created_at >= :start_date
AND created_at < :end_date
GROUP BY DATE(created_at)
ORDER BY date DESC
""")
return db.session.execute(sql, {
'app_id': app_id,
'start_date': start_date,
'end_date': end_date
}).fetchall()
    @staticmethod
    def batch_update_documents(dataset_id: str, updates: list):
        """批量更新文档 - 使用批处理减少数据库往返"""
        if not updates:
            return
        # 使用绑定参数构建CASE批量更新,避免字符串拼接带来的SQL注入风险
        cases = []
        id_placeholders = []
        params = {'dataset_id': dataset_id}
        for i, update in enumerate(updates):
            cases.append(f"WHEN :id_{i} THEN :status_{i}")
            id_placeholders.append(f":id_{i}")
            params[f'id_{i}'] = update['id']
            params[f'status_{i}'] = update['status']
        sql = text(f"""
            UPDATE documents
            SET indexing_status = CASE id
                {' '.join(cases)}
                ELSE indexing_status
            END,
            updated_at = NOW()
            WHERE id IN ({','.join(id_placeholders)})
            AND dataset_id = :dataset_id
        """)
        db.session.execute(sql, params)
        db.session.commit()
2.2 缓存优化策略
Redis缓存最佳实践
# Redis缓存优化实践
import redis
import json
import hashlib
from typing import Any, Optional
from functools import wraps
class CacheManager:
"""缓存管理器"""
def __init__(self, redis_client: redis.Redis):
self.redis = redis_client
self.default_ttl = 3600 # 1小时
def cache_key(self, prefix: str, *args, **kwargs) -> str:
"""生成缓存键"""
# 创建稳定的键值
key_data = f"{prefix}:{':'.join(map(str, args))}"
if kwargs:
sorted_kwargs = sorted(kwargs.items())
key_data += f":{':'.join(f'{k}={v}' for k, v in sorted_kwargs)}"
# 对长键进行哈希
if len(key_data) > 200:
key_hash = hashlib.md5(key_data.encode()).hexdigest()
return f"{prefix}:hash:{key_hash}"
return key_data
def get_cached_result(self, key: str) -> Optional[Any]:
"""获取缓存结果"""
try:
cached_data = self.redis.get(key)
if cached_data:
return json.loads(cached_data)
except (json.JSONDecodeError, redis.RedisError):
pass
return None
def set_cached_result(self, key: str, data: Any, ttl: int = None) -> bool:
"""设置缓存结果"""
try:
ttl = ttl or self.default_ttl
serialized_data = json.dumps(data, ensure_ascii=False)
return self.redis.setex(key, ttl, serialized_data)
except (TypeError, redis.RedisError):
return False
def cache_function_result(self, ttl: int = None, key_prefix: str = None):
"""函数结果缓存装饰器"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
# 生成缓存键
prefix = key_prefix or f"func:{func.__name__}"
cache_key = self.cache_key(prefix, *args, **kwargs)
# 尝试从缓存获取
cached_result = self.get_cached_result(cache_key)
if cached_result is not None:
return cached_result
# 执行函数并缓存结果
result = func(*args, **kwargs)
self.set_cached_result(cache_key, result, ttl)
return result
return wrapper
return decorator
# 使用示例
cache_manager = CacheManager(redis_client)
@cache_manager.cache_function_result(ttl=1800, key_prefix="app_config")
def get_app_config(app_id: str) -> dict:
"""获取应用配置(缓存30分钟)"""
app = App.query.filter_by(id=app_id).first()
if not app:
return {}
return {
'id': app.id,
'name': app.name,
'mode': app.mode,
'model_config': app.app_model_config.to_dict() if app.app_model_config else {},
'updated_at': app.updated_at.isoformat()
}
@cache_manager.cache_function_result(ttl=600, key_prefix="dataset_docs")
def get_dataset_documents(dataset_id: str, page: int = 1, per_page: int = 20) -> dict:
"""获取数据集文档列表(缓存10分钟)"""
documents = Document.query\
.filter_by(dataset_id=dataset_id, enabled=True)\
.order_by(Document.created_at.desc())\
.paginate(page=page, per_page=per_page)
return {
'documents': [doc.to_dict() for doc in documents.items],
'total': documents.total,
'pages': documents.pages,
'current_page': page
}
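缓存装饰器只覆盖了读路径;写路径上还需要在数据变更时主动删除对应缓存,否则会读到过期配置。下面是一个与上述键规则配套的失效示意(update_app_config 为假设的更新入口,具体更新逻辑按业务编写):
# 应用配置更新后主动删除缓存,下次读取时自动回源数据库
def update_app_config(app_id: str, new_config: dict):
    """更新应用配置并使相关缓存失效(示意)"""
    app = App.query.filter_by(id=app_id).first()
    if not app:
        return None
    app.name = new_config.get("name", app.name)   # 示意:实际字段更新按业务需要编写
    db.session.commit()
    # 使用与get_app_config装饰器相同的前缀与参数重建缓存键
    key = cache_manager.cache_key("app_config", app_id)
    cache_manager.redis.delete(key)
    return app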
多层缓存架构
# 多层缓存实现
class MultiLevelCache:
"""多层缓存系统"""
def __init__(self, l1_cache, l2_cache, l3_cache=None):
self.l1_cache = l1_cache # 内存缓存(最快)
self.l2_cache = l2_cache # Redis缓存(中等)
self.l3_cache = l3_cache # 数据库缓存(最慢)
def get(self, key: str) -> Optional[Any]:
"""多层缓存获取"""
# L1缓存(内存)
result = self.l1_cache.get(key)
if result is not None:
return result
# L2缓存(Redis)
result = self.l2_cache.get(key)
if result is not None:
# 回填L1缓存
self.l1_cache.set(key, result, ttl=300) # 5分钟
return result
# L3缓存(数据库)
if self.l3_cache:
result = self.l3_cache.get(key)
if result is not None:
# 回填L2和L1缓存
self.l2_cache.set(key, result, ttl=1800) # 30分钟
self.l1_cache.set(key, result, ttl=300) # 5分钟
return result
return None
def set(self, key: str, value: Any, ttl: int = 3600):
"""多层缓存设置"""
# 同时设置所有层级
self.l1_cache.set(key, value, ttl=min(ttl, 300))
self.l2_cache.set(key, value, ttl=ttl)
if self.l3_cache:
self.l3_cache.set(key, value, ttl=ttl * 2)
def invalidate(self, key: str):
"""多层缓存失效"""
self.l1_cache.delete(key)
self.l2_cache.delete(key)
if self.l3_cache:
self.l3_cache.delete(key)
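MultiLevelCache 只约定了各层的 get/set/delete 接口。L1 层可以用进程内字典实现一个极简的 TTL 缓存(以下为示意实现,未考虑容量上限与多进程间的一致性);L2 的 Redis 层同样需要一个实现相同接口的薄封装(内部使用 setex 与 JSON 序列化),此处从略:
# 极简的进程内TTL缓存,用作MultiLevelCache的L1层(示意)
import time
from typing import Any, Optional

class InMemoryCache:
    def __init__(self):
        self._store: dict = {}   # key -> (value, 过期时间戳)

    def get(self, key: str) -> Optional[Any]:
        item = self._store.get(key)
        if item is None:
            return None
        value, expires_at = item
        if time.time() > expires_at:
            # 惰性过期:读取时发现超时则删除
            self._store.pop(key, None)
            return None
        return value

    def set(self, key: str, value: Any, ttl: int = 300):
        self._store[key] = (value, time.time() + ttl)

    def delete(self, key: str):
        self._store.pop(key, None)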
2.3 向量数据库优化
Qdrant优化配置
# Qdrant配置优化
storage:
# 存储配置
storage_path: "/qdrant/storage"
snapshots_path: "/qdrant/snapshots"
# 性能优化
performance:
max_search_threads: 4
max_optimization_threads: 2
# 内存配置
memory:
# 向量缓存大小(MB)
vector_cache_size: 512
# 索引缓存大小(MB)
index_cache_size: 256
service:
# HTTP服务配置
http_port: 6333
grpc_port: 6334
# 并发配置
max_request_size_mb: 32
max_workers: 8
# 超时配置
timeout_sec: 60
cluster:
# 集群配置(生产环境)
enabled: true
node_id: 1
# 一致性配置
consensus:
tick_period_ms: 100
bootstrap_timeout_sec: 30
向量检索优化策略
# 向量检索优化实践
class OptimizedVectorSearch:
"""优化的向量搜索实现"""
def __init__(self, qdrant_client):
self.client = qdrant_client
self.embedding_cache = {}
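    # 补充(示意):__init__中的embedding_cache可用于缓存重复查询的向量,避免对相同查询文本
    # 反复调用嵌入模型;embed_query的接口以实际使用的embedding_model为准,生产环境还应限制缓存大小
    def get_query_embedding(self, query_text: str, embedding_model) -> list:
        """带进程内缓存的查询向量计算"""
        cached = self.embedding_cache.get(query_text)
        if cached is not None:
            return cached
        embedding = embedding_model.embed_query(query_text)
        self.embedding_cache[query_text] = embedding
        return embedding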
def create_optimized_collection(self, collection_name: str, vector_size: int):
"""创建优化的向量集合"""
from qdrant_client.models import VectorParams, Distance, OptimizersConfig
self.client.create_collection(
collection_name=collection_name,
vectors_config=VectorParams(
size=vector_size,
distance=Distance.COSINE, # 余弦距离适合文本向量
),
# 优化器配置
optimizers_config=OptimizersConfig(
deleted_threshold=0.2, # 删除阈值
vacuum_min_vector_number=1000, # 最小向量数
default_segment_number=2, # 默认段数
max_segment_size_kb=20000, # 最大段大小
memmap_threshold_kb=50000, # 内存映射阈值
indexing_threshold_kb=20000, # 索引阈值
),
# HNSW索引配置
hnsw_config={
"m": 16, # 连接数
"ef_construct": 200, # 构建时的ef参数
"full_scan_threshold": 10000, # 全扫描阈值
}
)
def batch_upsert_vectors(
self,
collection_name: str,
vectors: list,
payloads: list,
batch_size: int = 100
):
"""批量插入向量"""
from qdrant_client.models import PointStruct
for i in range(0, len(vectors), batch_size):
batch_vectors = vectors[i:i + batch_size]
batch_payloads = payloads[i:i + batch_size]
points = [
PointStruct(
id=i + j,
vector=vector,
payload=payload
)
for j, (vector, payload) in enumerate(zip(batch_vectors, batch_payloads))
]
self.client.upsert(
collection_name=collection_name,
points=points,
wait=False # 异步插入提高性能
)
def optimized_search(
self,
collection_name: str,
query_vector: list,
limit: int = 10,
score_threshold: float = 0.7,
filter_conditions: dict = None
):
"""优化的向量搜索"""
from qdrant_client.models import Filter, FieldCondition, MatchValue
# 构建过滤条件
query_filter = None
if filter_conditions:
conditions = []
for field, value in filter_conditions.items():
conditions.append(
FieldCondition(key=field, match=MatchValue(value=value))
)
query_filter = Filter(must=conditions)
# 执行搜索
search_result = self.client.search(
collection_name=collection_name,
query_vector=query_vector,
query_filter=query_filter,
limit=limit,
score_threshold=score_threshold,
# 搜索参数优化
params={
"hnsw_ef": min(limit * 4, 200), # 动态调整ef参数
"exact": False, # 使用近似搜索
}
)
return search_result
def hybrid_search_with_rerank(
self,
collection_name: str,
query_text: str,
embedding_model,
rerank_model,
limit: int = 10,
rerank_top_k: int = 50
):
"""混合搜索与重排"""
# 1. 向量搜索
query_embedding = embedding_model.embed_query(query_text)
vector_results = self.optimized_search(
collection_name=collection_name,
query_vector=query_embedding,
limit=rerank_top_k, # 获取更多候选结果
score_threshold=0.5 # 降低阈值获取更多候选
)
# 2. 重排序
if len(vector_results) > limit:
documents = [result.payload.get('text', '') for result in vector_results]
rerank_scores = rerank_model.rerank(query_text, documents)
# 合并分数(向量相似度 + 重排分数)
combined_results = []
for i, result in enumerate(vector_results):
combined_score = (result.score * 0.7) + (rerank_scores[i] * 0.3)
combined_results.append((result, combined_score))
# 按合并分数排序
combined_results.sort(key=lambda x: x[1], reverse=True)
return [result[0] for result in combined_results[:limit]]
return vector_results
3. 安全最佳实践
3.1 API安全
认证与授权
# API安全最佳实践
from functools import wraps
from datetime import datetime
import logging
import jwt
import time
from flask import request, jsonify, current_app

# redis_client、App 等对象由应用上下文提供
logger = logging.getLogger(__name__)
class APISecurityManager:
"""API安全管理器"""
@staticmethod
def validate_api_key(api_key: str) -> tuple[bool, dict]:
"""验证API密钥"""
try:
# 检查API密钥格式
if not api_key or not api_key.startswith('app-'):
return False, {'error': 'Invalid API key format'}
# 从数据库验证API密钥
app = App.query.filter_by(api_key=api_key).first()
if not app:
return False, {'error': 'API key not found'}
# 检查应用状态
if app.status != 'normal':
return False, {'error': 'Application is disabled'}
# 检查API密钥是否过期
if app.api_key_expires_at and app.api_key_expires_at < datetime.utcnow():
return False, {'error': 'API key expired'}
return True, {'app': app}
except Exception as e:
return False, {'error': f'API key validation failed: {str(e)}'}
@staticmethod
def rate_limit_check(identifier: str, limit: int, window: int) -> tuple[bool, dict]:
"""速率限制检查"""
try:
redis_key = f"rate_limit:{identifier}:{int(time.time() // window)}"
current_count = redis_client.get(redis_key)
if current_count is None:
# 首次请求
redis_client.setex(redis_key, window, 1)
return True, {'remaining': limit - 1}
current_count = int(current_count)
if current_count >= limit:
return False, {
'error': 'Rate limit exceeded',
'retry_after': window - (int(time.time()) % window)
}
# 增加计数
redis_client.incr(redis_key)
return True, {'remaining': limit - current_count - 1}
except Exception as e:
# 限流失败时允许请求通过,但记录错误
logger.error(f"Rate limit check failed: {e}")
return True, {'remaining': limit}
@staticmethod
def validate_request_signature(request_data: dict, signature: str, secret: str) -> bool:
"""验证请求签名"""
import hmac
import hashlib
try:
# 构建签名字符串
sorted_params = sorted(request_data.items())
sign_string = '&'.join([f'{k}={v}' for k, v in sorted_params])
# 计算HMAC签名
expected_signature = hmac.new(
secret.encode('utf-8'),
sign_string.encode('utf-8'),
hashlib.sha256
).hexdigest()
# 安全比较签名
return hmac.compare_digest(signature, expected_signature)
except Exception as e:
logger.error(f"Signature validation failed: {e}")
return False
def require_api_key(f):
"""API密钥验证装饰器"""
@wraps(f)
def decorated_function(*args, **kwargs):
# 获取API密钥
api_key = request.headers.get('Authorization')
if api_key and api_key.startswith('Bearer '):
api_key = api_key[7:] # 移除 'Bearer ' 前缀
if not api_key:
return jsonify({'error': 'API key required'}), 401
# 验证API密钥
is_valid, result = APISecurityManager.validate_api_key(api_key)
if not is_valid:
return jsonify(result), 401
# 速率限制检查
app = result['app']
rate_limit_key = f"app:{app.id}"
is_allowed, rate_result = APISecurityManager.rate_limit_check(
rate_limit_key,
app.rate_limit or 1000, # 默认1000请求/小时
3600 # 1小时窗口
)
if not is_allowed:
return jsonify(rate_result), 429
# 将应用信息添加到请求上下文
request.app = app
return f(*args, **kwargs)
return decorated_function
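装饰器的典型用法如下(路由与返回内容仅为示意,service_api_bp 为假设的蓝图名称):
# 在具体API路由上应用密钥校验与限流(示意)
from flask import Blueprint

service_api_bp = Blueprint('service_api', __name__)

@service_api_bp.route('/v1/chat-messages', methods=['POST'])
@require_api_key
def create_chat_message():
    # 通过校验后,request.app 即为对应的应用对象,可用于后续的配额、计费等逻辑
    payload = request.get_json() or {}
    return jsonify({
        'app_id': request.app.id,
        'received_query': payload.get('query', ''),
    })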
输入验证与清理
# 输入验证最佳实践
import re
import html
from typing import Any, Dict, List
from marshmallow import Schema, fields, validate, validates, ValidationError
class InputValidator:
"""输入验证器"""
# 安全的文本模式
SAFE_TEXT_PATTERN = re.compile(r'^[a-zA-Z0-9\u4e00-\u9fff\s\-_.,!?;:()]+$')
# 危险的SQL关键词
SQL_INJECTION_PATTERNS = [
r'\b(SELECT|INSERT|UPDATE|DELETE|DROP|CREATE|ALTER|EXEC|EXECUTE)\b',
r'(\-\-|\#|\/\*|\*\/)',
r'(\bOR\b|\bAND\b).*(\=|\<|\>)',
r'(\bUNION\b.*\bSELECT\b)',
]
# XSS攻击模式
XSS_PATTERNS = [
r'<script[^>]*>.*?</script>',
r'javascript:',
r'on\w+\s*=',
r'<iframe[^>]*>.*?</iframe>',
]
@classmethod
def sanitize_text(cls, text: str, max_length: int = 1000) -> str:
"""清理文本输入"""
if not isinstance(text, str):
return ""
# 长度限制
text = text[:max_length]
# HTML转义
text = html.escape(text)
# 移除控制字符
text = ''.join(char for char in text if ord(char) >= 32 or char in '\n\r\t')
# 标准化空白字符
text = re.sub(r'\s+', ' ', text).strip()
return text
@classmethod
def validate_no_sql_injection(cls, text: str) -> bool:
"""检查SQL注入"""
text_upper = text.upper()
for pattern in cls.SQL_INJECTION_PATTERNS:
if re.search(pattern, text_upper, re.IGNORECASE):
return False
return True
@classmethod
def validate_no_xss(cls, text: str) -> bool:
"""检查XSS攻击"""
for pattern in cls.XSS_PATTERNS:
if re.search(pattern, text, re.IGNORECASE):
return False
return True
@classmethod
def validate_safe_filename(cls, filename: str) -> bool:
"""验证安全的文件名"""
if not filename or len(filename) > 255:
return False
# 检查危险字符
dangerous_chars = ['/', '\\', '..', '<', '>', ':', '"', '|', '?', '*']
for char in dangerous_chars:
if char in filename:
return False
# 检查保留名称(Windows)
reserved_names = [
'CON', 'PRN', 'AUX', 'NUL', 'COM1', 'COM2', 'COM3', 'COM4',
'COM5', 'COM6', 'COM7', 'COM8', 'COM9', 'LPT1', 'LPT2',
'LPT3', 'LPT4', 'LPT5', 'LPT6', 'LPT7', 'LPT8', 'LPT9'
]
if filename.upper() in reserved_names:
return False
return True
# API请求验证Schema
class ChatMessageSchema(Schema):
"""聊天消息验证Schema"""
query = fields.Str(
required=True,
validate=[
validate.Length(min=1, max=2000),
lambda x: InputValidator.validate_no_sql_injection(x),
lambda x: InputValidator.validate_no_xss(x)
]
)
conversation_id = fields.Str(
validate=validate.Regexp(r'^[a-f0-9\-]{36}$') # UUID格式
)
inputs = fields.Dict(
keys=fields.Str(validate=validate.Length(max=100)),
values=fields.Str(validate=validate.Length(max=1000))
)
response_mode = fields.Str(
validate=validate.OneOf(['blocking', 'streaming'])
)
user = fields.Str(
validate=validate.Length(max=100)
)
class FileUploadSchema(Schema):
"""文件上传验证Schema"""
file = fields.Raw(required=True)
    @validates('file')
    def validate_file(self, file):
"""验证上传文件"""
# 检查文件大小
if file.content_length > 10 * 1024 * 1024: # 10MB
raise ValidationError("File size exceeds 10MB limit")
# 检查文件类型
allowed_types = [
'application/pdf',
'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
'text/plain',
'text/markdown'
]
if file.content_type not in allowed_types:
raise ValidationError("Unsupported file type")
# 检查文件名
if not InputValidator.validate_safe_filename(file.filename):
raise ValidationError("Invalid filename")
return file
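Schema 的典型用法是在进入业务逻辑之前统一做一次加载校验,校验失败直接返回字段级错误信息(以下为示意,validate_chat_request 为假设的辅助函数):
# 在请求入口处统一校验聊天请求(示意)
def validate_chat_request(raw_json: dict) -> tuple[dict, dict]:
    """返回 (清洗后的数据, 错误信息);错误信息为空字典表示校验通过"""
    try:
        data = ChatMessageSchema().load(raw_json or {})
    except ValidationError as err:
        # err.messages 中包含每个字段的具体错误原因
        return {}, err.messages
    # 校验通过后再对query做一次清理,移除HTML与控制字符
    data['query'] = InputValidator.sanitize_text(data['query'], max_length=2000)
    return data, {}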
3.2 数据安全
敏感数据处理
# 敏感数据处理最佳实践
import base64
import hashlib
import logging
import secrets
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from flask import current_app

logger = logging.getLogger(__name__)
class DataSecurityManager:
"""数据安全管理器"""
def __init__(self, master_key: str):
self.master_key = master_key.encode()
self._fernet = None
def _get_fernet(self) -> Fernet:
"""获取加密实例"""
if self._fernet is None:
kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32,
salt=b'dify_salt_2024', # 生产环境应使用随机salt
iterations=100000,
)
key = base64.urlsafe_b64encode(kdf.derive(self.master_key))
self._fernet = Fernet(key)
return self._fernet
def encrypt_sensitive_data(self, data: str) -> str:
"""加密敏感数据"""
if not data:
return ""
fernet = self._get_fernet()
encrypted_data = fernet.encrypt(data.encode())
return base64.urlsafe_b64encode(encrypted_data).decode()
def decrypt_sensitive_data(self, encrypted_data: str) -> str:
"""解密敏感数据"""
if not encrypted_data:
return ""
try:
fernet = self._get_fernet()
decoded_data = base64.urlsafe_b64decode(encrypted_data.encode())
decrypted_data = fernet.decrypt(decoded_data)
return decrypted_data.decode()
except Exception as e:
logger.error(f"Failed to decrypt data: {e}")
return ""
def hash_password(self, password: str, salt: str = None) -> tuple[str, str]:
"""安全的密码哈希"""
if salt is None:
salt = secrets.token_hex(32)
# 使用PBKDF2进行密码哈希
kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32,
salt=salt.encode(),
iterations=100000,
)
password_hash = kdf.derive(password.encode())
return base64.urlsafe_b64encode(password_hash).decode(), salt
def verify_password(self, password: str, password_hash: str, salt: str) -> bool:
"""验证密码"""
try:
computed_hash, _ = self.hash_password(password, salt)
return secrets.compare_digest(password_hash, computed_hash)
except Exception:
return False
def generate_secure_token(self, length: int = 32) -> str:
"""生成安全令牌"""
return secrets.token_urlsafe(length)
def mask_sensitive_info(self, data: str, mask_char: str = '*') -> str:
"""脱敏敏感信息"""
if not data or len(data) <= 4:
return mask_char * len(data) if data else ""
# 保留前2位和后2位
return data[:2] + mask_char * (len(data) - 4) + data[-2:]
# 敏感数据模型混入
class EncryptedFieldMixin:
"""加密字段混入类"""
def __init__(self):
self.security_manager = DataSecurityManager(current_app.config['SECRET_KEY'])
def set_encrypted_field(self, field_name: str, value: str):
"""设置加密字段"""
if value:
encrypted_value = self.security_manager.encrypt_sensitive_data(value)
setattr(self, f"_{field_name}_encrypted", encrypted_value)
else:
setattr(self, f"_{field_name}_encrypted", "")
def get_encrypted_field(self, field_name: str) -> str:
"""获取加密字段"""
encrypted_value = getattr(self, f"_{field_name}_encrypted", "")
if encrypted_value:
return self.security_manager.decrypt_sensitive_data(encrypted_value)
return ""
# 使用示例
class ProviderCredentials(db.Model, EncryptedFieldMixin):
"""提供商凭据模型"""
id = db.Column(db.String(36), primary_key=True)
tenant_id = db.Column(db.String(36), nullable=False)
provider_name = db.Column(db.String(100), nullable=False)
_api_key_encrypted = db.Column(db.Text)
_secret_key_encrypted = db.Column(db.Text)
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        EncryptedFieldMixin.__init__(self)
@property
def api_key(self) -> str:
return self.get_encrypted_field('api_key')
@api_key.setter
def api_key(self, value: str):
self.set_encrypted_field('api_key', value)
@property
def secret_key(self) -> str:
return self.get_encrypted_field('secret_key')
@secret_key.setter
def secret_key(self, value: str):
self.set_encrypted_field('secret_key', value)
def to_dict(self, include_sensitive: bool = False) -> dict:
"""转换为字典"""
result = {
'id': self.id,
'tenant_id': self.tenant_id,
'provider_name': self.provider_name,
}
if include_sensitive:
result.update({
'api_key': self.api_key,
'secret_key': self.secret_key,
})
else:
# 脱敏显示
result.update({
'api_key': self.security_manager.mask_sensitive_info(self.api_key),
'secret_key': self.security_manager.mask_sensitive_info(self.secret_key),
})
return result
4. 监控与运维最佳实践
4.1 应用监控
关键指标监控
# 应用监控最佳实践
import time
from datetime import datetime, timedelta
from functools import wraps

import psutil
from flask import request
from prometheus_client import Counter, Histogram, Gauge, generate_latest

# db、func、Message、App 等对象由应用上下文提供
class MetricsCollector:
"""指标收集器"""
def __init__(self):
# 请求计数器
self.request_count = Counter(
'dify_requests_total',
'Total number of requests',
['method', 'endpoint', 'status_code']
)
# 请求延迟直方图
self.request_duration = Histogram(
'dify_request_duration_seconds',
'Request duration in seconds',
['method', 'endpoint']
)
# 活跃连接数
self.active_connections = Gauge(
'dify_active_connections',
'Number of active connections'
)
# LLM调用指标
self.llm_requests = Counter(
'dify_llm_requests_total',
'Total LLM requests',
['provider', 'model', 'status']
)
self.llm_duration = Histogram(
'dify_llm_request_duration_seconds',
'LLM request duration',
['provider', 'model']
)
# Token使用量
self.token_usage = Counter(
'dify_tokens_total',
'Total tokens used',
['provider', 'model', 'type'] # type: prompt/completion
)
# 系统资源指标
self.cpu_usage = Gauge('dify_cpu_usage_percent', 'CPU usage percentage')
self.memory_usage = Gauge('dify_memory_usage_bytes', 'Memory usage in bytes')
self.disk_usage = Gauge('dify_disk_usage_percent', 'Disk usage percentage')
# 业务指标
self.active_users = Gauge('dify_active_users', 'Number of active users')
self.active_apps = Gauge('dify_active_apps', 'Number of active applications')
def record_request(self, method: str, endpoint: str, status_code: int, duration: float):
"""记录请求指标"""
self.request_count.labels(
method=method,
endpoint=endpoint,
status_code=status_code
).inc()
self.request_duration.labels(
method=method,
endpoint=endpoint
).observe(duration)
def record_llm_request(self, provider: str, model: str, status: str, duration: float, tokens: dict):
"""记录LLM请求指标"""
self.llm_requests.labels(
provider=provider,
model=model,
status=status
).inc()
self.llm_duration.labels(
provider=provider,
model=model
).observe(duration)
# 记录Token使用量
if tokens.get('prompt_tokens'):
self.token_usage.labels(
provider=provider,
model=model,
type='prompt'
).inc(tokens['prompt_tokens'])
if tokens.get('completion_tokens'):
self.token_usage.labels(
provider=provider,
model=model,
type='completion'
).inc(tokens['completion_tokens'])
def update_system_metrics(self):
"""更新系统指标"""
# CPU使用率
cpu_percent = psutil.cpu_percent(interval=1)
self.cpu_usage.set(cpu_percent)
# 内存使用量
memory = psutil.virtual_memory()
self.memory_usage.set(memory.used)
# 磁盘使用率
disk = psutil.disk_usage('/')
disk_percent = (disk.used / disk.total) * 100
self.disk_usage.set(disk_percent)
def update_business_metrics(self):
"""更新业务指标"""
# 活跃用户数(最近1小时)
one_hour_ago = datetime.utcnow() - timedelta(hours=1)
active_user_count = db.session.query(
func.count(func.distinct(Message.from_end_user_id))
).filter(Message.created_at >= one_hour_ago).scalar()
self.active_users.set(active_user_count or 0)
# 活跃应用数
active_app_count = App.query.filter_by(status='normal').count()
self.active_apps.set(active_app_count)
# 全局指标收集器实例
metrics_collector = MetricsCollector()
def monitor_request(f):
"""请求监控装饰器"""
@wraps(f)
def decorated_function(*args, **kwargs):
start_time = time.time()
status_code = 200
try:
result = f(*args, **kwargs)
return result
except Exception as e:
status_code = getattr(e, 'code', 500)
raise
finally:
duration = time.time() - start_time
metrics_collector.record_request(
method=request.method,
endpoint=request.endpoint or 'unknown',
status_code=status_code,
duration=duration
)
return decorated_function
def monitor_llm_call(f):
"""LLM调用监控装饰器"""
@wraps(f)
def decorated_function(*args, **kwargs):
start_time = time.time()
provider = kwargs.get('provider', 'unknown')
model = kwargs.get('model', 'unknown')
status = 'success'
try:
result = f(*args, **kwargs)
# 提取Token使用信息
tokens = {}
if hasattr(result, 'usage') and result.usage:
tokens = {
'prompt_tokens': result.usage.prompt_tokens,
'completion_tokens': result.usage.completion_tokens,
}
return result
except Exception as e:
status = 'error'
tokens = {}
raise
finally:
duration = time.time() - start_time
metrics_collector.record_llm_request(
provider=provider,
model=model,
status=status,
duration=duration,
tokens=tokens
)
return decorated_function
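指标收集好之后,还需要暴露一个 /metrics 端点供 Prometheus 抓取。generate_latest 已在上文导入,以下为示意实现(抓取前刷新 Gauge 类指标,注意 cpu_percent(interval=1) 会阻塞约1秒):
# 暴露Prometheus抓取端点(示意)
from flask import Blueprint, Response
from prometheus_client import CONTENT_TYPE_LATEST

metrics_bp = Blueprint('metrics', __name__)

@metrics_bp.route('/metrics')
def metrics_endpoint():
    # 抓取前更新系统与业务指标,保证Gauge为最新值
    metrics_collector.update_system_metrics()
    metrics_collector.update_business_metrics()
    return Response(generate_latest(), mimetype=CONTENT_TYPE_LATEST)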
健康检查实现
# 健康检查最佳实践
import time
from datetime import datetime

import psutil
import redis
from flask import Blueprint, jsonify
from sqlalchemy import text
health_bp = Blueprint('health', __name__)
class HealthChecker:
"""健康检查器"""
def __init__(self):
self.checks = {
'database': self.check_database,
'redis': self.check_redis,
'disk_space': self.check_disk_space,
'memory': self.check_memory,
'external_services': self.check_external_services,
}
    def check_database(self) -> dict:
        """检查数据库连接"""
        try:
            start_time = time.time()
            # 执行简单查询并测量耗时
            result = db.session.execute(text('SELECT 1')).fetchone()
            response_time = (time.time() - start_time) * 1000
            if result and result[0] == 1:
                return {'status': 'healthy', 'response_time_ms': round(response_time, 2)}
            else:
                return {'status': 'unhealthy', 'error': 'Invalid query result'}
        except Exception as e:
            return {'status': 'unhealthy', 'error': str(e)}
def check_redis(self) -> dict:
"""检查Redis连接"""
try:
start_time = time.time()
redis_client.ping()
response_time = (time.time() - start_time) * 1000
return {'status': 'healthy', 'response_time_ms': round(response_time, 2)}
except Exception as e:
return {'status': 'unhealthy', 'error': str(e)}
def check_disk_space(self) -> dict:
"""检查磁盘空间"""
try:
disk = psutil.disk_usage('/')
usage_percent = (disk.used / disk.total) * 100
if usage_percent > 90:
return {
'status': 'unhealthy',
'usage_percent': round(usage_percent, 2),
'error': 'Disk usage too high'
}
elif usage_percent > 80:
return {
'status': 'warning',
'usage_percent': round(usage_percent, 2),
'message': 'Disk usage high'
}
else:
return {
'status': 'healthy',
'usage_percent': round(usage_percent, 2)
}
except Exception as e:
return {'status': 'unhealthy', 'error': str(e)}
def check_memory(self) -> dict:
"""检查内存使用"""
try:
memory = psutil.virtual_memory()
usage_percent = memory.percent
if usage_percent > 90:
return {
'status': 'unhealthy',
'usage_percent': usage_percent,
'error': 'Memory usage too high'
}
elif usage_percent > 80:
return {
'status': 'warning',
'usage_percent': usage_percent,
'message': 'Memory usage high'
}
else:
return {
'status': 'healthy',
'usage_percent': usage_percent
}
except Exception as e:
return {'status': 'unhealthy', 'error': str(e)}
def check_external_services(self) -> dict:
"""检查外部服务"""
try:
# 检查主要的LLM提供商
checks = {}
# OpenAI健康检查
try:
# 这里应该是实际的健康检查逻辑
checks['openai'] = {'status': 'healthy'}
except Exception as e:
checks['openai'] = {'status': 'unhealthy', 'error': str(e)}
# 检查向量数据库
try:
# Qdrant健康检查
checks['qdrant'] = {'status': 'healthy'}
except Exception as e:
checks['qdrant'] = {'status': 'unhealthy', 'error': str(e)}
# 判断整体状态
unhealthy_services = [k for k, v in checks.items() if v['status'] == 'unhealthy']
if unhealthy_services:
return {
'status': 'unhealthy',
'services': checks,
'unhealthy_services': unhealthy_services
}
else:
return {'status': 'healthy', 'services': checks}
except Exception as e:
return {'status': 'unhealthy', 'error': str(e)}
def run_all_checks(self) -> dict:
"""运行所有健康检查"""
results = {}
overall_status = 'healthy'
for check_name, check_func in self.checks.items():
try:
result = check_func()
results[check_name] = result
if result['status'] == 'unhealthy':
overall_status = 'unhealthy'
elif result['status'] == 'warning' and overall_status == 'healthy':
overall_status = 'warning'
except Exception as e:
results[check_name] = {'status': 'unhealthy', 'error': str(e)}
overall_status = 'unhealthy'
return {
'status': overall_status,
'timestamp': datetime.utcnow().isoformat(),
'checks': results
}
health_checker = HealthChecker()
@health_bp.route('/health')
def health_check():
"""健康检查端点"""
result = health_checker.run_all_checks()
status_code = 200
if result['status'] == 'unhealthy':
status_code = 503
elif result['status'] == 'warning':
status_code = 200 # 警告状态仍返回200
return jsonify(result), status_code
@health_bp.route('/health/ready')
def readiness_check():
"""就绪检查(Kubernetes)"""
# 检查关键服务
critical_checks = ['database', 'redis']
for check_name in critical_checks:
result = health_checker.checks[check_name]()
if result['status'] == 'unhealthy':
return jsonify({
'status': 'not_ready',
'failed_check': check_name,
'error': result.get('error')
}), 503
return jsonify({'status': 'ready'}), 200
@health_bp.route('/health/live')
def liveness_check():
"""存活检查(Kubernetes)"""
# 简单的存活检查
try:
return jsonify({
'status': 'alive',
'timestamp': datetime.utcnow().isoformat()
}), 200
except Exception as e:
return jsonify({
'status': 'dead',
'error': str(e)
}), 503
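上述 /health/ready 与 /health/live 端点可以直接对接 Kubernetes 的探针配置(以下片段为示意,端口、路径与阈值按实际部署调整):
# Deployment中API容器的探针配置示意
livenessProbe:
  httpGet:
    path: /health/live
    port: 5001
  initialDelaySeconds: 30
  periodSeconds: 10
  failureThreshold: 3
readinessProbe:
  httpGet:
    path: /health/ready
    port: 5001
  initialDelaySeconds: 10
  periodSeconds: 5
  failureThreshold: 3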
5. 总结
5.1 核心最佳实践总结
应用开发:
- 明确的角色定位和行为准则
- 合理的模型参数调优
- 完善的错误处理和重试机制
- 结构化的工作流设计
性能优化:
- 数据库索引和查询优化
- 多层缓存架构
- 向量检索优化
- 批处理和并行执行
安全防护:
- 严格的输入验证和清理
- 敏感数据加密存储
- API认证和授权
- 速率限制和防护
监控运维:
- 全面的指标监控
- 完善的健康检查
- 结构化日志记录
- 告警和故障处理
5.2 实施建议
- 渐进式优化:从核心功能开始,逐步完善各个方面
- 监控驱动:建立完善的监控体系,基于数据进行优化
- 安全优先:在设计阶段就考虑安全因素
- 文档完善:维护详细的开发和运维文档
- 团队协作:建立代码审查和最佳实践分享机制
通过遵循这些最佳实践,可以构建出高性能、高可用、安全可靠的Dify应用系统。