架构设计最佳实践
1. 多租户架构设计
Chatwoot采用了优雅的多租户架构,每个Account代表一个租户:
# 使用Current类管理请求上下文
class Current < ActiveSupport::CurrentAttributes
attribute :account, :user, :contact, :executed_by
# 重置方法在请求结束时清理上下文
def reset
super
RequestStore.clear!
end
end
# 在控制器中设置当前上下文
class Api::V1::Accounts::BaseController < Api::BaseController
before_action :set_current_account
private
def set_current_account
Current.account = Account.find(params[:account_id])
Current.user = current_user
end
end
# 在模型中使用租户隔离
class Conversation < ApplicationRecord
# 所有查询都会自动加上account_id条件
belongs_to :account
# 使用scope确保数据隔离
scope :in_account, ->(account) { where(account: account) }
# 验证确保数据属于正确的租户
validate :ensure_account_consistency
private
def ensure_account_consistency
return unless account_id.present?
errors.add(:inbox, 'must belong to the same account') if inbox&.account_id != account_id
errors.add(:contact, 'must belong to the same account') if contact&.account_id != account_id
end
end
关键设计原则:
- 数据隔离: 每个模型都关联到Account,确保租户间数据完全隔离
- 上下文管理: 使用Current类管理请求上下文,避免参数传递
- 权限控制: 使用Pundit策略确保用户只能访问自己租户的数据
2. 事件驱动架构
# 事件分发器配置
# config/application.rb
config.dispatcher = Wisper::GlobalListeners
# 在初始化器中注册监听器
# config/initializers/event_listeners.rb
Rails.application.config.to_prepare do
# 消息相关事件
Rails.configuration.dispatcher.subscribe(MessageListener.new)
Rails.configuration.dispatcher.subscribe(ConversationListener.new)
Rails.configuration.dispatcher.subscribe(NotificationListener.new)
# 集成服务事件
Rails.configuration.dispatcher.subscribe(WebhookListener.new)
Rails.configuration.dispatcher.subscribe(AutomationRuleListener.new)
end
# 事件监听器基类
class BaseListener
include Sidekiq::Worker
# 异步处理事件,避免阻塞主流程
def perform_async(event_name, event_data)
public_send(event_name, OpenStruct.new(data: event_data))
end
# 错误处理
def handle_error(error, event_name, event_data)
ChatwootExceptionTracker.new(
error,
account: event_data[:account],
event_name: event_name
).capture_exception
end
end
# 消息事件监听器
class MessageListener < BaseListener
# 消息创建后的处理流程
def message_created(event)
message = event.data[:message]
# 1. 发送到外部渠道
send_to_channel(message)
# 2. 触发自动化规则
trigger_automation_rules(message)
# 3. 发送通知
send_notifications(message)
# 4. 更新对话状态
update_conversation_state(message)
# 5. 广播WebSocket事件
broadcast_message_event(message)
end
private
# 发送消息到外部渠道
def send_to_channel(message)
return unless message.outgoing?
return if message.private?
# 根据收件箱类型选择对应的发送服务
case message.inbox.channel_type
when 'Channel::Whatsapp'
Whatsapp::SendOnWhatsappService.new(message: message).perform
when 'Channel::Email'
Email::SendOnEmailService.new(message: message).perform
when 'Channel::TwilioSms'
Twilio::SendOnTwilioService.new(message: message).perform
end
end
# 触发自动化规则
def trigger_automation_rules(message)
message.account.automation_rules
.where(event_name: 'message_created', active: true)
.find_each do |rule|
AutomationRuleJob.perform_later(rule.id, message.conversation.id)
end
end
end
事件驱动的优势:
- 解耦: 业务逻辑通过事件解耦,易于维护和扩展
- 异步处理: 非关键路径操作异步执行,提升响应速度
- 可扩展: 新功能可以通过添加监听器实现,无需修改现有代码
性能优化实践
1. 数据库查询优化
# 使用includes预加载关联数据,避免N+1查询
class ConversationFinder
def perform
conversations = base_query
.includes(:contact, :assignee, :team, :inbox, :labels)
.includes(messages: [:sender, :attachments])
.page(params[:page])
.per(params[:per_page] || 25)
# 使用counter_cache避免重复计数查询
conversations.each { |conv| conv.association(:messages).loaded! }
{
conversations: conversations,
count: conversations.total_count
}
end
private
def base_query
# 使用复合索引优化查询
Current.account.conversations
.where(inbox_id: inbox_ids)
.where(status: statuses)
.order(last_activity_at: :desc)
end
end
# 数据库索引策略
class CreateConversations < ActiveRecord::Migration[7.0]
def change
create_table :conversations do |t|
# ... 字段定义
# 复合索引优化常用查询
t.index [:account_id, :inbox_id, :status, :assignee_id],
name: 'conv_acid_inbid_stat_asgnid_idx'
# 支持时间范围查询
t.index [:account_id, :last_activity_at]
# 支持状态过滤
t.index [:status, :account_id]
# 支持全文搜索
t.index :uuid, unique: true
end
end
end
2. 缓存策略
# 使用Redis缓存热点数据
class Account < ApplicationRecord
# 缓存账户配置,避免重复查询
def cached_feature_flags
Rails.cache.fetch("account_#{id}_feature_flags", expires_in: 1.hour) do
feature_flags_hash
end
end
# 缓存在线用户统计
def online_agents_count
Rails.cache.fetch("account_#{id}_online_agents", expires_in: 5.minutes) do
users.joins(:account_users)
.where(account_users: { role: 'agent' })
.where(availability: 'online')
.count
end
end
end
# 使用fragment caching缓存视图片段
# app/views/api/v1/accounts/conversations/show.json.jbuilder
json.cache! ['v1', @conversation, @conversation.messages.maximum(:updated_at)] do
json.partial! 'conversation', conversation: @conversation
json.messages do
json.cache_collection! @conversation.messages, partial: 'message', as: :message
end
end
# 智能缓存失效
class CacheInvalidationService
def self.invalidate_account_cache(account_id)
Rails.cache.delete_matched("account_#{account_id}_*")
end
def self.invalidate_conversation_cache(conversation)
Rails.cache.delete("conversation_#{conversation.id}")
Rails.cache.delete("account_#{conversation.account_id}_conversations_count")
end
end
3. 异步任务处理
# 使用Sidekiq处理耗时任务
class SendReplyJob < ApplicationJob
queue_as :high_priority
# 重试策略
sidekiq_options retry: 3, dead: false
def perform(message_id)
message = Message.find(message_id)
# 幂等性检查
return if message.status == 'sent'
# 发送消息到外部渠道
send_message_to_channel(message)
# 更新消息状态
message.update!(status: 'sent')
rescue => e
# 记录失败原因
message.update!(
status: 'failed',
content_attributes: message.content_attributes.merge(
error: e.message
)
)
raise e
end
private
def send_message_to_channel(message)
case message.inbox.channel_type
when 'Channel::Email'
ConversationReplyMailer.with(message: message).reply.deliver_now
when 'Channel::Whatsapp'
Whatsapp::SendOnWhatsappService.new(message: message).perform
end
end
end
# 任务监控和告警
class ApplicationJob < ActiveJob::Base
# 任务执行时间监控
around_perform do |job, block|
start_time = Time.current
block.call
duration = Time.current - start_time
# 记录慢任务
if duration > 30.seconds
Rails.logger.warn "Slow job detected: #{job.class.name} took #{duration}s"
end
end
# 异常处理
rescue_from(StandardError) do |exception|
ChatwootExceptionTracker.new(exception, job: self).capture_exception
raise exception
end
end
安全最佳实践
1. 认证与授权
# 使用DeviseTokenAuth进行API认证
class ApplicationController < ActionController::API
include DeviseTokenAuth::Concerns::SetUserByToken
# CSRF保护
protect_from_forgery with: :exception, unless: -> { request.format.json? }
# 请求限制
include Rack::Attack::Request
before_action :authenticate_user!, unless: :public_endpoint?
before_action :set_current_user
private
def set_current_user
Current.user = current_user
end
def public_endpoint?
controller_name == 'widget' || controller_name == 'public'
end
end
# 使用Pundit进行细粒度权限控制
class ConversationPolicy < ApplicationPolicy
class Scope < Scope
def resolve
# 管理员可以看到所有对话
return scope.all if user.administrator?
# 普通客服只能看到分配给自己或所在收件箱的对话
scope.joins(:inbox)
.where(
'conversations.assignee_id = ? OR inboxes.id IN (?)',
user.id,
user.inbox_ids
)
end
end
def show?
return true if user.administrator?
return true if record.assignee_id == user.id
return true if record.inbox.members.include?(user)
false
end
def update?
show? && user.agent?
end
end
2. 数据验证与清理
# 输入验证
class Message < ApplicationRecord
# 内容长度限制
validates :content, length: { maximum: 150_000 }
# HTML内容清理
before_save :sanitize_content
# JSON Schema验证
validates_with JsonSchemaValidator,
schema: TEMPLATE_PARAMS_SCHEMA,
attribute_resolver: ->(record) { record.additional_attributes }
private
def sanitize_content
return unless content_type == 'text' && content.present?
# 使用白名单清理HTML
self.content = ActionController::Base.helpers.sanitize(
content,
tags: %w[b i u strong em br p div span],
attributes: %w[class style]
)
end
end
# SQL注入防护
class ConversationFinder
def perform
# 使用参数化查询避免SQL注入
conversations = base_query.where(
'conversations.additional_attributes ->> ? = ?',
sanitized_attribute_key,
sanitized_value
)
end
private
def sanitized_attribute_key
# 白名单验证属性键
allowed_keys = %w[source_url browser_language]
params[:attribute_key] if allowed_keys.include?(params[:attribute_key])
end
end
3. 敏感数据保护
# 使用Rails 7的加密功能保护敏感数据
class User < ApplicationRecord
# 双因素认证密钥加密存储
encrypts :otp_secret, deterministic: true
encrypts :otp_backup_codes
# API密钥加密
encrypts :access_token, deterministic: true
# 在日志中过滤敏感信息
def self.filter_attributes
%w[password otp_secret access_token encrypted_password]
end
end
# 配置日志过滤
# config/application.rb
config.filter_parameters += [
:password, :password_confirmation,
:otp_secret, :otp_backup_codes,
:access_token, :refresh_token,
/private_key/i
]
# API响应中排除敏感字段
# app/views/api/v1/users/show.json.jbuilder
json.extract! @user, :id, :name, :email, :availability
# 不包含 otp_secret, access_token 等敏感字段
监控与可观测性
1. 应用性能监控
# 使用多种APM工具
# config/application.rb
if ENV.fetch('NEW_RELIC_LICENSE_KEY', false).present?
require 'newrelic-sidekiq-metrics'
require 'newrelic_rpm'
end
if ENV.fetch('DATADOG_API_KEY', false).present?
require 'datadog'
end
if ENV.fetch('SENTRY_DSN', false).present?
require 'sentry-rails'
require 'sentry-sidekiq'
end
# 自定义性能指标
class PerformanceTracker
def self.track_message_processing_time(message)
start_time = Time.current
yield
duration = Time.current - start_time
# 记录到监控系统
StatsD.increment('message.processed')
StatsD.histogram('message.processing_time', duration)
# 慢消息告警
if duration > 5.seconds
Rails.logger.warn "Slow message processing: #{message.id} took #{duration}s"
end
end
end
# 健康检查端点
class HealthController < ApplicationController
def show
checks = {
database: database_check,
redis: redis_check,
sidekiq: sidekiq_check,
storage: storage_check
}
status = checks.values.all? ? :ok : :service_unavailable
render json: checks, status: status
end
private
def database_check
ActiveRecord::Base.connection.execute('SELECT 1')
'ok'
rescue => e
"error: #{e.message}"
end
def redis_check
Redis.current.ping == 'PONG' ? 'ok' : 'error'
rescue => e
"error: #{e.message}"
end
end
2. 错误追踪与告警
# 统一异常处理
class ChatwootExceptionTracker
def initialize(exception, context = {})
@exception = exception
@context = context
end
def capture_exception
# 发送到多个监控服务
send_to_sentry if sentry_configured?
send_to_datadog if datadog_configured?
send_to_slack if critical_error?
# 记录到应用日志
Rails.logger.error "Exception: #{@exception.class.name} - #{@exception.message}"
Rails.logger.error @exception.backtrace.join("\n")
end
private
def send_to_sentry
Sentry.capture_exception(@exception, extra: @context)
end
def critical_error?
@exception.is_a?(SecurityError) ||
@exception.message.include?('payment') ||
@context[:account]&.premium?
end
def send_to_slack
SlackNotificationJob.perform_later(
channel: '#alerts',
text: "🚨 Critical error in #{Rails.env}: #{@exception.message}",
context: @context
)
end
end
# 在应用中使用
begin
risky_operation
rescue => e
ChatwootExceptionTracker.new(e, account: Current.account).capture_exception
raise e
end
部署与运维最佳实践
1. 容器化部署
# Dockerfile
FROM ruby:3.4.4-alpine
# 安装系统依赖
RUN apk add --no-cache \
build-base \
postgresql-dev \
nodejs \
npm \
git \
imagemagick \
tzdata
# 设置工作目录
WORKDIR /app
# 复制Gemfile并安装Ruby依赖
COPY Gemfile Gemfile.lock ./
RUN bundle config --global frozen 1 && \
bundle install --without development test
# 复制package.json并安装Node.js依赖
COPY package.json pnpm-lock.yaml ./
RUN npm install -g pnpm && pnpm install
# 复制应用代码
COPY . .
# 预编译资源
RUN RAILS_ENV=production bundle exec rake assets:precompile
# 创建非root用户
RUN addgroup -g 1001 -S chatwoot && \
adduser -S chatwoot -u 1001 -G chatwoot
USER chatwoot
EXPOSE 3000
CMD ["bundle", "exec", "rails", "server", "-b", "0.0.0.0"]
# docker-compose.yml
version: '3.8'
services:
app:
build: .
ports:
- "3000:3000"
environment:
- RAILS_ENV=production
- DATABASE_URL=postgresql://postgres:password@db:5432/chatwoot
- REDIS_URL=redis://redis:6379
depends_on:
- db
- redis
volumes:
- ./storage:/app/storage
worker:
build: .
command: bundle exec sidekiq
environment:
- RAILS_ENV=production
- DATABASE_URL=postgresql://postgres:password@db:5432/chatwoot
- REDIS_URL=redis://redis:6379
depends_on:
- db
- redis
db:
image: postgres:15
environment:
- POSTGRES_PASSWORD=password
- POSTGRES_DB=chatwoot
volumes:
- postgres_data:/var/lib/postgresql/data
redis:
image: redis:7-alpine
volumes:
- redis_data:/data
volumes:
postgres_data:
redis_data:
2. 环境配置管理
# 使用dotenv管理环境变量
# .env.example
DATABASE_URL=postgresql://localhost/chatwoot_development
REDIS_URL=redis://localhost:6379
SECRET_KEY_BASE=your_secret_key_here
# 邮件配置
MAILER_SENDER_EMAIL=support@chatwoot.com
SMTP_ADDRESS=smtp.gmail.com
SMTP_PORT=587
SMTP_USERNAME=your_email@gmail.com
SMTP_PASSWORD=your_password
# 存储配置
ACTIVE_STORAGE_SERVICE=local
# AWS S3配置
AWS_ACCESS_KEY_ID=your_access_key
AWS_SECRET_ACCESS_KEY=your_secret_key
AWS_REGION=us-east-1
AWS_BUCKET_NAME=your-bucket
# 集成服务
FACEBOOK_APP_ID=your_facebook_app_id
FACEBOOK_APP_SECRET=your_facebook_app_secret
# 监控配置
SENTRY_DSN=your_sentry_dsn
NEW_RELIC_LICENSE_KEY=your_newrelic_key
# 功能开关
ENABLE_ACCOUNT_SIGNUP=true
CHATWOOT_INBOX_TOKEN=your_inbox_token
3. 数据库维护
# 数据库迁移脚本
class OptimizeConversationsTable < ActiveRecord::Migration[7.0]
def up
# 添加复合索引提升查询性能
add_index :conversations,
[:account_id, :status, :last_activity_at],
name: 'idx_conversations_account_status_activity'
# 清理过期数据
execute <<-SQL
DELETE FROM conversations
WHERE status = 'resolved'
AND updated_at < NOW() - INTERVAL '1 year'
AND account_id IN (
SELECT id FROM accounts WHERE status = 'suspended'
);
SQL
end
def down
remove_index :conversations, name: 'idx_conversations_account_status_activity'
end
end
# 定期数据清理任务
class DataCleanupJob < ApplicationJob
def perform
# 清理过期的访客联系人
Contact.stale_without_conversations(30.days.ago).delete_all
# 清理过期的通知
Notification.where('created_at < ?', 90.days.ago).delete_all
# 清理过期的会话数据
ActionCable.server.connections.each(&:close)
Rails.logger.info "Data cleanup completed at #{Time.current}"
end
end
框架使用示例
1. 创建自定义渠道
如果要为Chatwoot添加新的通信渠道,可以按照以下步骤:
步骤1: 创建渠道模型
# app/models/channel/custom_channel.rb
class Channel::CustomChannel < ApplicationRecord
# 验证必需的配置参数
validates :api_key, presence: true
validates :webhook_url, presence: true, format: { with: URI.regexp }
# 多态关联到收件箱
has_one :inbox, as: :channel, dependent: :destroy
# 渠道特定配置
def name
'CustomChannel'
end
# 支持的消息类型
def supported_message_types
%w[text image file]
end
# 验证配置有效性
def validate_connection
response = HTTParty.get(
"#{webhook_url}/validate",
headers: { 'Authorization' => "Bearer #{api_key}" }
)
response.success?
rescue
false
end
end
# 数据库迁移
class CreateChannelCustomChannels < ActiveRecord::Migration[7.0]
def change
create_table :channel_custom_channels do |t|
t.string :api_key, null: false
t.string :webhook_url, null: false
t.jsonb :additional_config, default: {}
t.timestamps
end
add_index :channel_custom_channels, :api_key, unique: true
end
end
步骤2: 创建消息发送服务
# app/services/custom_channel/send_on_custom_channel_service.rb
class CustomChannel::SendOnCustomChannelService < Base::SendOnChannelService
private
# 指定渠道类
def channel_class
Channel::CustomChannel
end
# 实现消息发送逻辑
def perform_reply
case message.content_type
when 'text'
send_text_message
when 'image', 'file'
send_attachment_message
else
Rails.logger.warn "Unsupported message type: #{message.content_type}"
end
end
def send_text_message
response = HTTParty.post(
"#{channel.webhook_url}/messages",
headers: request_headers,
body: {
recipient_id: contact.get_source_id(inbox.id),
message: {
type: 'text',
content: message.content
}
}.to_json
)
handle_response(response)
end
def send_attachment_message
attachment = message.attachments.first
response = HTTParty.post(
"#{channel.webhook_url}/messages",
headers: request_headers,
body: {
recipient_id: contact.get_source_id(inbox.id),
message: {
type: 'attachment',
content: message.content,
attachment_url: attachment.download_url,
attachment_type: attachment.file_type
}
}.to_json
)
handle_response(response)
end
def request_headers
{
'Authorization' => "Bearer #{channel.api_key}",
'Content-Type' => 'application/json'
}
end
def handle_response(response)
if response.success?
message.update!(
status: :sent,
source_id: response.parsed_response['message_id']
)
else
message.update!(
status: :failed,
content_attributes: message.content_attributes.merge(
external_error: response.parsed_response['error']
)
)
Rails.logger.error "Failed to send message: #{response.body}"
end
end
end
2. 创建自定义自动化动作
# 扩展自动化规则服务,添加自定义动作
class AutomationRules::ActionService < ActionService
private
# 发送短信动作
def send_sms(phone_numbers)
return unless @conversation.contact.phone_number.present?
phone_numbers.each do |phone_number|
SmsService.new(
to: phone_number,
message: "New conversation from #{@conversation.contact.name}: #{@conversation.recent_messages.last&.content}"
).send
end
end
# 创建外部工单动作
def create_external_ticket(ticket_params)
ticket_data = {
title: "Chatwoot Conversation ##{@conversation.display_id}",
description: @conversation.recent_messages.map(&:content).join("\n"),
priority: ticket_params['priority'],
assignee: ticket_params['assignee_email']
}
response = HTTParty.post(
ticket_params['api_endpoint'],
headers: { 'Authorization' => "Bearer #{ticket_params['api_token']}" },
body: ticket_data.to_json
)
if response.success?
# 在对话中添加工单链接
@conversation.update!(
additional_attributes: @conversation.additional_attributes.merge(
external_ticket_id: response.parsed_response['id'],
external_ticket_url: response.parsed_response['url']
)
)
end
end
# 触发Webhook到Zapier
def trigger_zapier_webhook(webhook_urls)
payload = {
conversation: @conversation.webhook_data,
contact: @conversation.contact.webhook_data,
account: @conversation.account.webhook_data,
triggered_at: Time.current.iso8601
}
webhook_urls.each do |url|
WebhookJob.perform_later(url, payload)
end
end
end
3. 扩展API功能
# 创建自定义API端点
class Api::V1::Accounts::CustomReportsController < Api::V1::Accounts::BaseController
before_action :check_authorization
# GET /api/v1/accounts/:account_id/custom_reports/agent_performance
def agent_performance
@report_data = generate_agent_performance_report
render json: @report_data
end
# GET /api/v1/accounts/:account_id/custom_reports/conversation_trends
def conversation_trends
@trend_data = generate_conversation_trends
render json: @trend_data
end
# POST /api/v1/accounts/:account_id/custom_reports/export
def export
ExportReportJob.perform_later(
current_account.id,
current_user.id,
permitted_params[:report_type],
permitted_params[:date_range]
)
render json: { status: 'queued', message: 'Report export has been queued' }
end
private
def generate_agent_performance_report
agents = current_account.users.joins(:account_users)
.where(account_users: { role: 'agent' })
agents.map do |agent|
conversations = agent.assigned_conversations
.where(created_at: date_range)
{
agent_id: agent.id,
agent_name: agent.name,
total_conversations: conversations.count,
resolved_conversations: conversations.resolved.count,
average_first_response_time: calculate_avg_first_response_time(conversations),
average_resolution_time: calculate_avg_resolution_time(conversations),
csat_score: calculate_csat_score(conversations)
}
end
end
def generate_conversation_trends
conversations = current_account.conversations
.where(created_at: date_range)
.group_by_day(:created_at)
.count
{
daily_conversations: conversations,
total_conversations: conversations.values.sum,
peak_day: conversations.max_by { |_, count| count }&.first,
average_daily: conversations.values.sum / conversations.size.to_f
}
end
def date_range
start_date = Date.parse(permitted_params[:start_date]) rescue 30.days.ago
end_date = Date.parse(permitted_params[:end_date]) rescue Date.current
start_date..end_date
end
def permitted_params
params.permit(:report_type, :start_date, :end_date, date_range: {})
end
def check_authorization
authorize current_account, :show_reports?
end
end
# 在routes.rb中添加路由
namespace :custom_reports do
get :agent_performance
get :conversation_trends
post :export
end
总结
这份实战经验与最佳实践文档总结了Chatwoot项目中的核心设计模式和实施经验:
- 架构设计: 多租户架构、事件驱动设计的实际应用
- 性能优化: 数据库查询优化、缓存策略、异步处理的具体实现
- 安全防护: 认证授权、数据验证、敏感信息保护的最佳实践
- 监控运维: APM集成、错误追踪、健康检查的完整方案
- 扩展开发: 自定义渠道、自动化动作、API端点的开发指南
通过学习这些实战经验,开发者可以更好地理解如何构建和维护一个生产级的Rails应用,掌握现代Web应用开发的核心技能和最佳实践。