1. 框架使用示例
1.1 基本使用流程
# 1. 编译Mock Server
cd mysql-server/router
mkdir build && cd build
cmake .. -DCMAKE_BUILD_TYPE=Release
make mysql_server_mock
# 2. 创建配置文件
cat > simple_test.js << 'EOF'
({
"handshake": {
"greeting": {
"server_version": "8.0.0-mock-test",
"connection_id": 1,
"exec_time": 0
},
"auth": {
"username": "testuser",
"password": "testpass"
}
},
"stmts": [
{
"stmt": "SELECT @@version_comment",
"result": {
"columns": [{"name": "@@version_comment", "type": "STRING"}],
"rows": [["MySQL Router Mock Server"]]
}
}
]
})
EOF
# 3. 启动Mock Server
./mysql_server_mock --port=3306 --filename=simple_test.js --verbose
# 4. 使用MySQL客户端连接测试
mysql -h127.0.0.1 -P3306 -utestuser -ptestpass -e "SELECT @@version_comment"
1.2 命令行参数详解
# 完整的命令行参数示例
./mysql_server_mock \
--filename=/path/to/test_scenario.js \ # 测试脚本文件
--bind-address=0.0.0.0 \ # 绑定地址
--port=3306 \ # Classic协议端口
--xport=33060 \ # X协议端口
--socket=/tmp/mysql_mock.sock \ # Unix socket路径
--http-port=8080 \ # REST API端口
--module-prefix=/usr/local/share/mock_modules \ # JS模块路径
--ssl-mode=PREFERRED \ # SSL模式
--ssl-cert=/path/to/server.pem \ # SSL证书
--ssl-key=/path/to/server-key.pem \ # SSL私钥
--ssl-ca=/path/to/ca.pem \ # CA证书
--verbose \ # 详细日志
--logging-folder=/var/log/mysql_mock # 日志目录
1.3 Docker容器部署示例
# Dockerfile for MySQL Mock Server
FROM ubuntu:22.04
# 安装依赖
RUN apt-get update && apt-get install -y \
build-essential \
cmake \
libssl-dev \
libduktape-dev \
&& rm -rf /var/lib/apt/lists/*
# 复制源码并编译
COPY mysql-server /src/mysql-server
WORKDIR /src/mysql-server/router/build
RUN cmake .. -DCMAKE_BUILD_TYPE=Release && \
make mysql_server_mock && \
cp mysql_server_mock /usr/local/bin/
# 创建运行用户
RUN useradd -r -s /bin/false mysql_mock
# 复制配置文件
COPY test_scenarios/ /etc/mysql_mock/
# 暴露端口
EXPOSE 3306 33060 8080
# 启动脚本
COPY docker-entrypoint.sh /usr/local/bin/
RUN chmod +x /usr/local/bin/docker-entrypoint.sh
USER mysql_mock
ENTRYPOINT ["/usr/local/bin/docker-entrypoint.sh"]
#!/bin/bash
# docker-entrypoint.sh
# 默认配置
MOCK_FILE="${MOCK_FILE:-/etc/mysql_mock/default.js}"
BIND_ADDRESS="${BIND_ADDRESS:-0.0.0.0}"
PORT="${PORT:-3306}"
XPORT="${XPORT:-33060}"
HTTP_PORT="${HTTP_PORT:-8080}"
# 启动Mock Server
exec mysql_server_mock \
--filename="$MOCK_FILE" \
--bind-address="$BIND_ADDRESS" \
--port="$PORT" \
--xport="$XPORT" \
--http-port="$HTTP_PORT" \
--verbose
# docker-compose.yml
version: '3.8'
services:
mysql-mock:
build: .
ports:
- "3306:3306"
- "33060:33060"
- "8080:8080"
volumes:
- ./scenarios:/etc/mysql_mock
- ./logs:/var/log/mysql_mock
environment:
- MOCK_FILE=/etc/mysql_mock/integration_test.js
- PORT=3306
- VERBOSE=true
networks:
- test_network
test_client:
image: mysql:8.0
depends_on:
- mysql-mock
command: >
sh -c "
sleep 10 &&
mysql -hmysql-mock -P3306 -utestuser -ptestpass
-e 'SELECT @@version; SHOW DATABASES;'
"
networks:
- test_network
networks:
test_network:
2. 详细API说明和使用示例
2.1 JavaScript配置API详解
2.1.1 握手配置API
/**
* @brief 完整的握手配置示例
*/
({
"handshake": {
// 服务器问候消息配置
"greeting": {
"server_version": "8.0.28-mock", // 服务器版本字符串
"connection_id": 42, // 连接ID(可动态生成)
"capabilities": 0x807fffff, // 服务器能力标志位
"status_flags": 0x0002, // 服务器状态标志
"character_set": 8, // 默认字符集(latin1_swedish_ci)
"auth_method": "caching_sha2_password", // 认证方法
"nonce": "abcdefghij0123456789", // 认证随机数(20字节)
"exec_time": 1000 // 握手延迟(微秒)
},
// 认证配置
"auth": {
"username": "admin", // 期望的用户名
"password": "secret123", // 期望的密码
"auth_method_name": "mysql_native_password", // 强制使用的认证方法
// 客户端证书验证(可选)
"certificate": {
"issuer": "CN=Test CA,O=Example Corp,C=US",
"subject": "CN=test-client,O=Example Corp,C=US"
}
},
// 错误响应(可选,用于测试认证失败)
"error": {
"code": 1045,
"message": "Access denied for user 'test'@'localhost'",
"sql_state": "28000"
}
}
})
2.1.2 动态握手处理
/**
* @brief 动态握手处理示例
*/
({
"handshake": function(is_greeting) {
// 根据请求类型返回不同配置
if (is_greeting) {
return {
"greeting": {
"server_version": "8.0.0-dynamic-" + new Date().getTime(),
"connection_id": Math.floor(Math.random() * 1000000),
"exec_time": Math.random() < 0.1 ? 5000 : 0 // 10%概率延迟5ms
}
};
} else {
// 认证阶段
var current_hour = new Date().getHours();
if (current_hour < 9 || current_hour > 17) {
// 工作时间外拒绝连接
return {
"error": {
"code": 1226,
"message": "User access denied outside business hours",
"sql_state": "42000"
}
};
}
return {
"auth": {
"username": "business_user",
"password": "work_password"
}
};
}
}
})
2.2 语句处理API详解
2.2.1 静态响应配置
/**
* @brief 静态语句响应配置
*/
({
"stmts": [
{
"stmt": "SELECT @@version",
"exec_time": 500, // 500微秒执行时间
"result": {
"columns": [{
"catalog": "def",
"schema": "",
"table": "",
"orig_table": "",
"name": "@@version",
"orig_name": "",
"character_set": 33, // utf8_general_ci
"length": 255,
"type": "VAR_STRING",
"flags": 1, // NOT_NULL
"decimals": 31
}],
"rows": [["8.0.28-mock-test"]],
"affected_rows": 0,
"last_insert_id": 0,
"status": 0,
"warning_count": 0
}
},
{
"stmt": "INSERT INTO test_table VALUES (1, 'test')",
"ok": {
"affected_rows": 1,
"last_insert_id": 1,
"status": 0x0002, // SERVER_STATUS_AUTOCOMMIT
"warning_count": 0,
"message": "",
"session_trackers": [
{
"type": "system_variable",
"name": "autocommit",
"value": "ON"
}
]
}
},
{
"stmt": "SELECT * FROM nonexistent_table",
"error": {
"code": 1146,
"message": "Table 'test.nonexistent_table' doesn't exist",
"sql_state": "42S02"
}
}
]
})
2.2.2 动态语句处理
/**
* @brief 动态语句处理示例
*/
({
"stmts": function(stmt) {
// 模拟数据库状态
if (!mysqld.global.table_created) {
mysqld.global.table_created = false;
mysqld.global.row_count = 0;
}
// DDL语句处理
if (stmt.match(/^CREATE TABLE/i)) {
mysqld.global.table_created = true;
return {
"ok": {
"affected_rows": 0,
"message": "Table created successfully"
}
};
}
// DML语句处理
if (stmt.match(/^INSERT INTO/i)) {
if (!mysqld.global.table_created) {
return {
"error": {
"code": 1146,
"message": "Table doesn't exist",
"sql_state": "42S02"
}
};
}
mysqld.global.row_count++;
return {
"ok": {
"affected_rows": 1,
"last_insert_id": mysqld.global.row_count
}
};
}
// 查询语句处理
if (stmt.match(/^SELECT COUNT\(\*\) FROM/i)) {
return {
"result": {
"columns": [{"name": "COUNT(*)", "type": "LONGLONG"}],
"rows": [[mysqld.global.row_count.toString()]]
}
};
}
// 事务处理
if (stmt.match(/^(BEGIN|START TRANSACTION)/i)) {
mysqld.global.in_transaction = true;
return {"ok": {}};
}
if (stmt.match(/^(COMMIT|ROLLBACK)/i)) {
mysqld.global.in_transaction = false;
return {
"ok": {
"status": stmt.match(/ROLLBACK/i) ? 0 : 0x0002 // AUTOCOMMIT flag
}
};
}
// 默认错误响应
return {
"error": {
"code": 1064,
"message": "Unsupported statement: " + stmt,
"sql_state": "42000"
}
};
}
})
2.2.3 复杂查询模拟
/**
* @brief 复杂查询处理示例
*/
({
"stmts": function(stmt) {
// 解析SQL语句
var parsed = parseSQL(stmt);
if (parsed.type === 'SELECT') {
// 模拟不同的查询结果
if (parsed.from === 'users') {
return generateUserData(parsed);
} else if (parsed.from === 'orders') {
return generateOrderData(parsed);
} else if (parsed.from === 'performance_schema.replication_group_members') {
// 模拟Group Replication状态
return {
"result": {
"columns": [
{"name": "member_id", "type": "STRING"},
{"name": "member_host", "type": "STRING"},
{"name": "member_port", "type": "LONG"},
{"name": "member_state", "type": "STRING"},
{"name": "member_role", "type": "STRING"}
],
"rows": [
["550fa9ee-a1f8-4b6d-9bfe-c03ed5d30654", "mysql-node1", 3306, "ONLINE", "PRIMARY"],
["6091e3d1-b2f2-4c5d-a3e4-d04f6e7a8b9c", "mysql-node2", 3306, "ONLINE", "SECONDARY"],
["7182f4e2-c3f3-5d6e-b4f5-e15g7f8a9c0d", "mysql-node3", 3306, "ONLINE", "SECONDARY"]
]
}
};
}
}
// 辅助函数:简单SQL解析
function parseSQL(sql) {
var normalized = sql.toLowerCase().trim();
if (normalized.startsWith('select')) {
var fromMatch = normalized.match(/from\s+([^\s\;]+)/);
return {
type: 'SELECT',
from: fromMatch ? fromMatch[1] : null
};
}
return {type: 'UNKNOWN'};
}
// 生成用户数据
function generateUserData(parsed) {
var users = [
[1, "admin", "admin@example.com", "2023-01-01 10:00:00"],
[2, "user1", "user1@example.com", "2023-01-02 11:00:00"],
[3, "user2", "user2@example.com", "2023-01-03 12:00:00"]
];
return {
"result": {
"columns": [
{"name": "id", "type": "LONG"},
{"name": "username", "type": "STRING"},
{"name": "email", "type": "STRING"},
{"name": "created_at", "type": "DATETIME"}
],
"rows": users
}
};
}
// 生成订单数据
function generateOrderData(parsed) {
return {
"result": {
"columns": [
{"name": "order_id", "type": "LONG"},
{"name": "user_id", "type": "LONG"},
{"name": "amount", "type": "DECIMAL"},
{"name": "status", "type": "STRING"}
],
"rows": [
[1001, 1, "99.99", "completed"],
[1002, 2, "149.99", "pending"],
[1003, 1, "79.99", "completed"]
]
}
};
}
return {
"error": {
"code": 1146,
"message": "Table '" + (parsed.from || 'unknown') + "' doesn't exist",
"sql_state": "42S02"
}
};
}
})
2.3 REST API使用示例
2.3.1 HTTP接口配置
# 启动支持REST API的Mock Server
./mysql_server_mock \
--filename=test.js \
--port=3306 \
--http-port=8080 \
--verbose
// JavaScript中通过REST API动态修改行为
({
"stmts": function(stmt) {
// 检查全局状态
var maintenance_mode = mysqld.global.maintenance_mode || false;
if (maintenance_mode && !stmt.match(/^(SHOW|SELECT @@)/i)) {
return {
"error": {
"code": 1105,
"message": "Server is in maintenance mode",
"sql_state": "HY000"
}
};
}
// 正常处理
return handle_normal_statement(stmt);
}
})
2.3.2 REST API客户端示例
#!/usr/bin/env python3
"""
MySQL Mock Server REST API客户端示例
"""
import requests
import json
import time
class MockServerClient:
def __init__(self, base_url="http://localhost:8080"):
self.base_url = base_url
def set_global_variable(self, key, value):
"""设置全局变量"""
url = f"{self.base_url}/api/v1/mock/globals/{key}"
response = requests.put(url, json={"value": value})
return response.status_code == 200
def get_global_variable(self, key):
"""获取全局变量"""
url = f"{self.base_url}/api/v1/mock/globals/{key}"
response = requests.get(url)
if response.status_code == 200:
return response.json()["value"]
return None
def enable_maintenance_mode(self):
"""启用维护模式"""
return self.set_global_variable("maintenance_mode", True)
def disable_maintenance_mode(self):
"""禁用维护模式"""
return self.set_global_variable("maintenance_mode", False)
def get_connection_stats(self):
"""获取连接统计"""
url = f"{self.base_url}/api/v1/mock/stats/connections"
response = requests.get(url)
if response.status_code == 200:
return response.json()
return None
# 使用示例
def main():
client = MockServerClient()
print("启用维护模式...")
client.enable_maintenance_mode()
print("等待5秒...")
time.sleep(5)
print("禁用维护模式...")
client.disable_maintenance_mode()
stats = client.get_connection_stats()
if stats:
print(f"当前连接数: {stats['active_connections']}")
print(f"总连接数: {stats['total_connections']}")
if __name__ == "__main__":
main()
# 使用curl操作REST API
# 获取服务器状态
curl -X GET http://localhost:8080/api/v1/mock/status
# 设置全局变量
curl -X PUT http://localhost:8080/api/v1/mock/globals/test_mode \
-H "Content-Type: application/json" \
-d '{"value": true}'
# 获取连接统计
curl -X GET http://localhost:8080/api/v1/mock/stats/connections
# 重新加载配置
curl -X POST http://localhost:8080/api/v1/mock/reload
# 获取所有全局变量
curl -X GET http://localhost:8080/api/v1/mock/globals
3. 测试场景实战案例
3.1 MySQL Router Bootstrap测试
/**
* @brief MySQL Router Bootstrap场景测试
*/
({
"handshake": {
"greeting": {
"server_version": "8.0.28",
"connection_id": 10
},
"auth": {
"username": "root",
"password": "root_password"
}
},
"stmts": [
// 检查schema版本
{
"stmt": "SELECT * FROM mysql_innodb_cluster_metadata.schema_version",
"result": {
"columns": [
{"name": "major", "type": "LONG"},
{"name": "minor", "type": "LONG"},
{"name": "patch", "type": "LONG"}
],
"rows": [["2", "1", "0"]]
}
},
// 检查集群存在
{
"stmt": "SELECT ((SELECT count(*) FROM mysql_innodb_cluster_metadata.clusters) <= 1 AND (SELECT count(*) FROM mysql_innodb_cluster_metadata.replicasets) <= 1) as has_one_replicaset, (SELECT attributes->>'$.adopted' FROM mysql_innodb_cluster_metadata.clusters) as adopted",
"result": {
"columns": [
{"name": "has_one_replicaset", "type": "LONG"},
{"name": "adopted", "type": "STRING"}
],
"rows": [["1", "null"]]
}
},
// 获取集群信息
{
"stmt": "SELECT c.cluster_id, c.cluster_name, c.description, c.options, c.attributes FROM mysql_innodb_cluster_metadata.clusters c",
"result": {
"columns": [
{"name": "cluster_id", "type": "STRING"},
{"name": "cluster_name", "type": "STRING"},
{"name": "description", "type": "STRING"},
{"name": "options", "type": "JSON"},
{"name": "attributes", "type": "JSON"}
],
"rows": [
["550fa9ee-a1f8-4b6d-9bfe-c03ed5d30654", "testCluster", "Test InnoDB Cluster", "{}", "{}"]
]
}
},
// 获取复制集信息
{
"stmt.regex": "SELECT R.replicaset_name, I.mysql_server_uuid, H.location, I.addresses.*",
"result": {
"columns": [
{"name": "replicaset_name", "type": "STRING"},
{"name": "mysql_server_uuid", "type": "STRING"},
{"name": "location", "type": "STRING"},
{"name": "addresses", "type": "JSON"}
],
"rows": [
["default", "550fa9ee-a1f8-4b6d-9bfe-c03ed5d30654", "", "{\"mysqlClassic\": \"localhost:3306\", \"mysqlX\": \"localhost:33060\"}"],
["default", "6091e3d1-b2f2-4c5d-a3e4-d04f6e7a8b9c", "", "{\"mysqlClassic\": \"localhost:3307\", \"mysqlX\": \"localhost:33070\"}"],
["default", "7182f4e2-c3f3-5d6e-b4f5-e15g7f8a9c0d", "", "{\"mysqlClassic\": \"localhost:3308\", \"mysqlX\": \"localhost:33080\"}"]
]
}
}
]
})
3.2 Group Replication状态模拟
/**
* @brief Group Replication状态变化模拟
*/
({
"stmts": function(stmt) {
// 初始化集群状态
if (!mysqld.global.group_members) {
mysqld.global.group_members = [
{
uuid: "550fa9ee-a1f8-4b6d-9bfe-c03ed5d30654",
host: "mysql-node1",
port: 3306,
state: "ONLINE",
role: "PRIMARY"
},
{
uuid: "6091e3d1-b2f2-4c5d-a3e4-d04f6e7a8b9c",
host: "mysql-node2",
port: 3306,
state: "ONLINE",
role: "SECONDARY"
},
{
uuid: "7182f4e2-c3f3-5d6e-b4f5-e15g7f8a9c0d",
host: "mysql-node3",
port: 3306,
state: "RECOVERING", // 模拟恢复状态
role: "SECONDARY"
}
];
mysqld.global.primary_member = "550fa9ee-a1f8-4b6d-9bfe-c03ed5d30654";
mysqld.global.failover_count = 0;
}
// 查询Group Replication成员
if (stmt.match(/SELECT.*FROM performance_schema\.replication_group_members/i)) {
// 模拟节点状态变化
simulateNodeStateChanges();
var rows = mysqld.global.group_members.map(function(member) {
return [
member.uuid,
member.host,
member.port,
member.state,
member.role === "PRIMARY" ? "1" : "0"
];
});
return {
"result": {
"columns": [
{"name": "member_id", "type": "STRING"},
{"name": "member_host", "type": "STRING"},
{"name": "member_port", "type": "LONG"},
{"name": "member_state", "type": "STRING"},
{"name": "@@group_replication_single_primary_mode", "type": "STRING"}
],
"rows": rows
}
};
}
// 查询Primary成员
if (stmt.match(/show status like 'group_replication_primary_member'/i)) {
return {
"result": {
"columns": [
{"name": "Variable_name", "type": "STRING"},
{"name": "Value", "type": "STRING"}
],
"rows": [["group_replication_primary_member", mysqld.global.primary_member]]
}
};
}
// 模拟节点状态变化的函数
function simulateNodeStateChanges() {
var now = new Date().getTime();
// 每30秒模拟一次状态变化
if (!mysqld.global.last_state_change || (now - mysqld.global.last_state_change) > 30000) {
mysqld.global.last_state_change = now;
// 随机事件模拟
var event = Math.random();
if (event < 0.1) {
// 10%概率:Primary故障切换
simulateFailover();
} else if (event < 0.2) {
// 10%概率:节点恢复
simulateNodeRecovery();
} else if (event < 0.25) {
// 5%概率:节点故障
simulateNodeFailure();
}
}
}
function simulateFailover() {
// 找到当前Primary
var primaryIndex = mysqld.global.group_members.findIndex(
function(m) { return m.role === "PRIMARY"; }
);
if (primaryIndex !== -1) {
// 当前Primary变为不可用
mysqld.global.group_members[primaryIndex].state = "ERROR";
mysqld.global.group_members[primaryIndex].role = "SECONDARY";
// 选择新的Primary(第一个ONLINE的SECONDARY)
for (var i = 0; i < mysqld.global.group_members.length; i++) {
if (mysqld.global.group_members[i].state === "ONLINE" &&
mysqld.global.group_members[i].role === "SECONDARY") {
mysqld.global.group_members[i].role = "PRIMARY";
mysqld.global.primary_member = mysqld.global.group_members[i].uuid;
mysqld.global.failover_count++;
break;
}
}
}
}
function simulateNodeRecovery() {
// 找到ERROR状态的节点进行恢复
for (var i = 0; i < mysqld.global.group_members.length; i++) {
if (mysqld.global.group_members[i].state === "ERROR") {
mysqld.global.group_members[i].state = "RECOVERING";
// 延迟恢复到ONLINE状态
setTimeout(function(index) {
return function() {
if (mysqld.global.group_members[index]) {
mysqld.global.group_members[index].state = "ONLINE";
}
};
}(i), 10000); // 10秒后恢复
break;
}
}
}
function simulateNodeFailure() {
// 随机选择一个SECONDARY节点故障
var secondaries = mysqld.global.group_members.filter(
function(m) { return m.role === "SECONDARY" && m.state === "ONLINE"; }
);
if (secondaries.length > 0) {
var randomSecondary = secondaries[Math.floor(Math.random() * secondaries.length)];
randomSecondary.state = "ERROR";
}
}
return {
"error": {
"code": 1064,
"message": "Unknown statement: " + stmt,
"sql_state": "42000"
}
};
}
})
3.3 性能测试场景
/**
* @brief 性能测试场景配置
*/
({
"stmts": function(stmt) {
// 初始化性能计数器
if (!mysqld.global.perf_counters) {
mysqld.global.perf_counters = {
queries_total: 0,
queries_per_second: 0,
slow_queries: 0,
last_second: Math.floor(Date.now() / 1000)
};
}
var counters = mysqld.global.perf_counters;
var current_second = Math.floor(Date.now() / 1000);
// 重置每秒计数器
if (current_second !== counters.last_second) {
counters.queries_per_second = 0;
counters.last_second = current_second;
}
counters.queries_total++;
counters.queries_per_second++;
// 模拟慢查询
var is_slow_query = stmt.match(/SELECT.*SLEEP\(|SELECT.*BENCHMARK\(|SELECT.*HEAVY_COMPUTATION/i);
var exec_time = 0;
if (is_slow_query) {
counters.slow_queries++;
exec_time = 2000000; // 2秒
} else if (stmt.match(/SELECT/i)) {
exec_time = Math.random() * 10000; // 0-10ms随机延迟
}
// 性能统计查询
if (stmt.match(/SHOW.*STATUS.*LIKE.*Queries/i)) {
return {
"exec_time": exec_time,
"result": {
"columns": [
{"name": "Variable_name", "type": "STRING"},
{"name": "Value", "type": "STRING"}
],
"rows": [
["Queries", counters.queries_total.toString()],
["Queries_per_second", counters.queries_per_second.toString()],
["Slow_queries", counters.slow_queries.toString()]
]
}
};
}
// 模拟大结果集查询
if (stmt.match(/SELECT.*FROM.*large_table/i)) {
var row_count = 10000;
var rows = [];
for (var i = 1; i <= row_count; i++) {
rows.push([
i.toString(),
"data_" + i,
(Math.random() * 1000).toFixed(2),
new Date().toISOString()
]);
}
return {
"exec_time": 50000, // 50ms
"result": {
"columns": [
{"name": "id", "type": "LONG"},
{"name": "name", "type": "STRING"},
{"name": "value", "type": "DECIMAL"},
{"name": "created_at", "type": "DATETIME"}
],
"rows": rows,
"affected_rows": row_count
}
};
}
// 模拟连接池压力测试
if (stmt.match(/SELECT.*CONNECTION_ID\(\)/i)) {
return {
"exec_time": exec_time,
"result": {
"columns": [{"name": "CONNECTION_ID()", "type": "LONG"}],
"rows": [[Math.floor(Math.random() * 1000000).toString()]]
}
};
}
// 默认响应
return {
"exec_time": exec_time,
"ok": {
"affected_rows": Math.floor(Math.random() * 10),
"message": "Query executed successfully"
}
};
}
})
4. 集成测试实践
4.1 自动化测试框架
#!/usr/bin/env python3
"""
MySQL Mock Server 自动化测试框架
"""
import subprocess
import time
import mysql.connector
import threading
import json
import os
from contextlib import contextmanager
class MockServerTestFramework:
def __init__(self, mock_binary_path="./mysql_server_mock"):
self.mock_binary = mock_binary_path
self.mock_process = None
self.test_results = []
@contextmanager
def mock_server(self, config_file, port=3306, **kwargs):
"""启动Mock Server的上下文管理器"""
cmd = [
self.mock_binary,
f"--filename={config_file}",
f"--port={port}",
"--verbose"
]
# 添加额外参数
for key, value in kwargs.items():
cmd.append(f"--{key.replace('_', '-')}={value}")
try:
print(f"启动Mock Server: {' '.join(cmd)}")
self.mock_process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
# 等待服务器启动
time.sleep(2)
if self.mock_process.poll() is not None:
stdout, stderr = self.mock_process.communicate()
raise RuntimeError(f"Mock server failed to start: {stderr.decode()}")
yield port
finally:
if self.mock_process:
self.mock_process.terminate()
self.mock_process.wait(timeout=10)
self.mock_process = None
def create_mysql_connection(self, port, user="testuser", password="testpass",
database=None, **kwargs):
"""创建MySQL连接"""
config = {
'host': 'localhost',
'port': port,
'user': user,
'password': password,
'autocommit': True,
**kwargs
}
if database:
config['database'] = database
return mysql.connector.connect(**config)
def run_test_case(self, test_func, config_file, description=""):
"""运行单个测试用例"""
start_time = time.time()
result = {
'test': test_func.__name__,
'description': description,
'status': 'FAILED',
'duration': 0,
'error': None
}
try:
with self.mock_server(config_file) as port:
test_func(port)
result['status'] = 'PASSED'
except Exception as e:
result['error'] = str(e)
finally:
result['duration'] = time.time() - start_time
self.test_results.append(result)
print(f"测试 {test_func.__name__}: {result['status']} "
f"({result['duration']:.2f}s)")
if result['error']:
print(f" 错误: {result['error']}")
return result['status'] == 'PASSED'
def generate_report(self, output_file="test_report.json"):
"""生成测试报告"""
total_tests = len(self.test_results)
passed_tests = sum(1 for r in self.test_results if r['status'] == 'PASSED')
failed_tests = total_tests - passed_tests
report = {
'summary': {
'total': total_tests,
'passed': passed_tests,
'failed': failed_tests,
'success_rate': (passed_tests / total_tests * 100) if total_tests > 0 else 0
},
'results': self.test_results
}
with open(output_file, 'w') as f:
json.dump(report, f, indent=2)
print(f"\n测试报告已生成: {output_file}")
print(f"测试总数: {total_tests}, 通过: {passed_tests}, 失败: {failed_tests}")
print(f"成功率: {report['summary']['success_rate']:.1f}%")
return report
# 测试用例示例
def test_basic_connection(port):
"""测试基本连接"""
framework = MockServerTestFramework()
conn = framework.create_mysql_connection(port)
cursor = conn.cursor()
cursor.execute("SELECT @@version")
result = cursor.fetchone()
assert result[0] == "8.0.28-mock-test", f"Unexpected version: {result[0]}"
conn.close()
def test_authentication_failure(port):
"""测试认证失败"""
framework = MockServerTestFramework()
try:
conn = framework.create_mysql_connection(port, user="wronguser", password="wrongpass")
conn.close()
assert False, "Should have failed authentication"
except mysql.connector.Error as e:
assert e.errno == 1045, f"Expected error 1045, got {e.errno}"
def test_query_execution(port):
"""测试查询执行"""
framework = MockServerTestFramework()
conn = framework.create_mysql_connection(port)
cursor = conn.cursor()
# 测试INSERT
cursor.execute("INSERT INTO test_table VALUES (1, 'test')")
assert cursor.rowcount == 1
# 测试SELECT
cursor.execute("SELECT COUNT(*) FROM test_table")
result = cursor.fetchone()
assert result[0] > 0
conn.close()
def test_error_handling(port):
"""测试错误处理"""
framework = MockServerTestFramework()
conn = framework.create_mysql_connection(port)
cursor = conn.cursor()
try:
cursor.execute("SELECT * FROM nonexistent_table")
assert False, "Should have raised an error"
except mysql.connector.Error as e:
assert e.errno == 1146, f"Expected error 1146, got {e.errno}"
conn.close()
# 性能测试
def test_concurrent_connections(port):
"""测试并发连接"""
framework = MockServerTestFramework()
def worker_thread(thread_id):
try:
conn = framework.create_mysql_connection(port)
cursor = conn.cursor()
for i in range(10):
cursor.execute("SELECT CONNECTION_ID()")
cursor.fetchone()
conn.close()
except Exception as e:
raise RuntimeError(f"Thread {thread_id} failed: {e}")
threads = []
for i in range(10):
t = threading.Thread(target=worker_thread, args=(i,))
threads.append(t)
t.start()
for t in threads:
t.join()
# 主测试流程
def main():
# 创建测试配置文件
test_config = {
"handshake": {
"greeting": {"server_version": "8.0.28-mock-test"},
"auth": {"username": "testuser", "password": "testpass"}
},
"stmts": [
{
"stmt": "SELECT @@version",
"result": {
"columns": [{"name": "@@version", "type": "STRING"}],
"rows": [["8.0.28-mock-test"]]
}
},
{
"stmt": "INSERT INTO test_table VALUES (1, 'test')",
"ok": {"affected_rows": 1}
},
{
"stmt": "SELECT COUNT(*) FROM test_table",
"result": {
"columns": [{"name": "COUNT(*)", "type": "LONGLONG"}],
"rows": [["1"]]
}
},
{
"stmt": "SELECT * FROM nonexistent_table",
"error": {
"code": 1146,
"message": "Table 'test.nonexistent_table' doesn't exist",
"sql_state": "42S02"
}
},
{
"stmt": "SELECT CONNECTION_ID()",
"result": {
"columns": [{"name": "CONNECTION_ID()", "type": "LONG"}],
"rows": [["42"]]
}
}
]
}
config_file = "test_config.js"
with open(config_file, 'w') as f:
f.write(f"({json.dumps(test_config, indent=2)})")
# 运行测试
framework = MockServerTestFramework()
try:
framework.run_test_case(test_basic_connection, config_file, "基本连接测试")
framework.run_test_case(test_query_execution, config_file, "查询执行测试")
framework.run_test_case(test_error_handling, config_file, "错误处理测试")
framework.run_test_case(test_concurrent_connections, config_file, "并发连接测试")
# 认证失败测试需要特殊配置
auth_fail_config = test_config.copy()
auth_fail_config["handshake"]["auth"] = {"username": "admin", "password": "secret"}
auth_config_file = "auth_test_config.js"
with open(auth_config_file, 'w') as f:
f.write(f"({json.dumps(auth_fail_config, indent=2)})")
framework.run_test_case(test_authentication_failure, auth_config_file, "认证失败测试")
finally:
# 清理配置文件
if os.path.exists(config_file):
os.remove(config_file)
if os.path.exists(auth_config_file):
os.remove(auth_config_file)
# 生成报告
framework.generate_report()
if __name__ == "__main__":
main()
4.2 CI/CD集成示例
# .github/workflows/mock-server-test.yml
name: Mock Server Tests
on:
push:
branches: [ main, develop ]
pull_request:
branches: [ main ]
jobs:
build-and-test:
runs-on: ubuntu-latest
strategy:
matrix:
compiler: [gcc, clang]
build_type: [Debug, Release]
steps:
- uses: actions/checkout@v2
with:
submodules: recursive
- name: Install Dependencies
run: |
sudo apt-get update
sudo apt-get install -y \
build-essential \
cmake \
libssl-dev \
libduktape-dev \
python3-pip \
mysql-client
pip3 install mysql-connector-python
- name: Build Mock Server
run: |
cd router
mkdir build && cd build
cmake .. \
-DCMAKE_BUILD_TYPE=${{ matrix.build_type }} \
-DCMAKE_CXX_COMPILER=${{ matrix.compiler == 'clang' && 'clang++' || 'g++' }}
make mysql_server_mock -j$(nproc)
- name: Run Unit Tests
run: |
cd router/build
ctest --output-on-failure -R mock_server
- name: Run Integration Tests
run: |
cd router/build
python3 ../../tests/integration/mock_server_test.py
- name: Run Performance Tests
run: |
cd router/build
python3 ../../tests/performance/mock_server_perf.py
- name: Upload Test Results
uses: actions/upload-artifact@v2
if: always()
with:
name: test-results-${{ matrix.compiler }}-${{ matrix.build_type }}
path: |
router/build/test_report.json
router/build/performance_report.json
# 测试容器Dockerfile
FROM mysql:8.0 AS mysql-base
FROM ubuntu:22.04
# 复制MySQL客户端
COPY --from=mysql-base /usr/bin/mysql /usr/bin/mysql
COPY --from=mysql-base /usr/lib/x86_64-linux-gnu/libmysqlclient.so* /usr/lib/x86_64-linux-gnu/
# 安装测试依赖
RUN apt-get update && apt-get install -y \
python3 \
python3-pip \
&& pip3 install mysql-connector-python pytest \
&& rm -rf /var/lib/apt/lists/*
# 复制Mock Server二进制
COPY mysql_server_mock /usr/local/bin/
COPY test_scenarios/ /opt/test_scenarios/
COPY tests/ /opt/tests/
WORKDIR /opt/tests
CMD ["python3", "-m", "pytest", ".", "-v"]
5. 总结
MySQL Router Mock Server通过其精心设计的架构和丰富的功能,为MySQL生态系统的测试提供了强大的支持。本文档详细分析了其:
5.1 核心价值
- 测试效率提升:将测试setup时间从20秒降低到毫秒级
- 错误场景覆盖:轻松模拟各种异常情况
- 隔离性测试:消除外部依赖,确保测试结果稳定
- 成本降低:减少测试环境的资源需求
5.2 技术亮点
- 现代C++设计:充分利用C++17/20特性,代码简洁高效
- 异步IO架构:基于io_context的高性能网络处理
- JavaScript集成:提供灵活的脚本化配置能力
- 协议完整性:完整实现MySQL Client/Server协议
5.3 最佳实践总结
- RAII资源管理:确保异常安全和资源正确释放
- 对象池化:优化频繁创建销毁的性能开销
- 错误码vs异常:在合适的场景选择合适的错误处理方式
- 线程安全设计:WaitableMonitor等工具类提供安全的并发访问
5.4 应用场景
- MySQL Router测试:Bootstrap、配置验证、故障切换等
- 应用程序测试:数据库层集成测试、性能测试
- CI/CD流水线:自动化测试环境搭建
- 培训和演示:MySQL协议学习、故障模拟演示
5.5 扩展方向
- 协议支持扩展:可添加更多数据库协议支持
- 云原生集成:Kubernetes Operator、服务网格集成
- 监控集成:Prometheus指标、分布式追踪支持
- 配置管理:热更新、版本控制、模板化配置
通过深入理解MySQL Router Mock Server的架构设计和实现细节,开发者可以:
- 更好地利用其功能进行高效测试
- 参与项目的开发和改进
- 借鉴其设计思想用于其他项目
- 构建更加可靠的测试基础设施
这个Mock Server项目体现了现代软件工程的最佳实践,为整个MySQL生态系统的质量保证做出了重要贡献。