给 MariaDB 写一个生产级 MCP Server
从零搭建一个带安全护栏、行数上限、审计日志、schema 自省工具的 MariaDB MCP server
目标:构建一个让 Claude / Cursor / Windsurf 等 AI 客户端能"安全地"操作 MariaDB 的 MCP server。
重点不是"能跑",而是生产级:注入防护、行数上限、审计日志、按 schema 自动生成工具。
为什么写这一篇
社区里能找到的 MariaDB / MySQL MCP server 大多停留在"Demo"水平:
| 问题 | Demo 实现 | 生产实现 |
|---|---|---|
| SQL 注入 | 直接 db.query(prompt) | 强制 prepared statement |
| 误删 | 没有任何拦截 | 行数上限 + 危险语句拦截(deny-list) |
| 审计 | 写到 stdout | 写到独立审计库,带 LLM 标识 |
| Schema 自省 | 没有 | 自动生成 typed tools |
| 权限 | 用 root | 强制只读账号 + 按表授权 |
| 长连接 | 单连接 | 连接池 + 超时 |
| 多租户 | 一个进程一个库 | 上下文按租户隔离 |
下面这套是我自己用 TypeScript + @modelcontextprotocol/sdk + mysql2 写的,已在小流量生产跑了 3 个月。
整体架构
+-----------------+        stdio         +------------------+
| Claude Desktop  | <---- JSON-RPC ----> |    MCP Server    |
|   (or Cursor)   |                      |   (this code)    |
+-----------------+                      +---------+--------+
                                                   |
                                         +---------v--------+
                                         |      Guard       |
                                         |  - parser        |
                                         |  - row-limit     |
                                         |  - deny-list     |
                                         +---------+--------+
                                                   |
                                 +-----------------+----------------+
                                 |                                  |
                         +-------v-------+                 +--------v--------+
                         | MariaDB (RO)  |                 | MariaDB (audit) |
                         |  app schema   |                 |  tools / logs   |
                         +---------------+                 +-----------------+
Step-by-step
项目骨架
mkdir mariadb-mcp && cd mariadb-mcp
pnpm init
# The sources use `import` and top-level `await`, so Node must load the compiled
# files as ES modules; without "type": "module" `node dist/server.js` fails.
npm pkg set type=module
pnpm add @modelcontextprotocol/sdk mysql2 zod node-sql-parser
pnpm add -D typescript tsx @types/node
# rootDir/outDir make `pnpm exec tsc` emit src/*.ts to dist/*.js, which is the
# path the client configuration and the smoke tests launch.
echo '{"compilerOptions":{"target":"es2022","module":"esnext","moduleResolution":"bundler","rootDir":"src","outDir":"dist","strict":true,"esModuleInterop":true},"include":["src"]}' > tsconfig.json
连接池 + 双账号
// src/db.ts
import { createPool, type Pool } from 'mysql2/promise';
/**
 * Returns the value of a required environment variable.
 *
 * @param name - Name of the environment variable to read.
 * @returns The non-empty value.
 * @throws Error when the variable is unset or blank, so a misconfigured server
 *   stops at startup instead of creating a pool without a connection URI.
 */
function requireEnv(name: string): string {
  const value = process.env[name];
  if (value === undefined || value.trim() === '') {
    throw new Error(`Missing required environment variable ${name}`);
  }
  return value;
}

/** Pool for queries against the application schema; MARIADB_READ_URL should point at the read-only account. */
export const readPool: Pool = createPool({
  uri: requireEnv('MARIADB_READ_URL'), // read-only account
  connectionLimit: 5,
  connectTimeout: 5_000,
  enableKeepAlive: true,
});

/** Pool used only for writing audit records; MARIADB_AUDIT_URL should point at the dedicated audit account. */
export const auditPool: Pool = createPool({
  uri: requireEnv('MARIADB_AUDIT_URL'), // dedicated account that writes to the audit database
  connectionLimit: 2,
  connectTimeout: 5_000, // same connect timeout as readPool so an unreachable audit DB fails quickly
});

// Create the audit table when the module is first imported.
// The audit account therefore needs the CREATE privilege on its database.
await auditPool.query(`
  CREATE TABLE IF NOT EXISTS mcp_audit (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    ts DATETIME(6) DEFAULT CURRENT_TIMESTAMP(6),
    client VARCHAR(64),
    tool VARCHAR(64),
    sql_text TEXT,
    args_json JSON,
    rows_returned INT,
    error TEXT,
    INDEX idx_ts (ts),
    INDEX idx_client (client)
  ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='MCP 工具调用审计'
`);
安全护栏(核心)
// src/guard.ts
import { Parser } from 'node-sql-parser';
const parser = new Parser();
/** Statement types that are rejected regardless of configuration. */
const DENY = new Set(['drop', 'truncate', 'alter', 'rename', 'grant', 'revoke', 'create_user']);
/** Data-modifying statement types; accepted only when `allowWrites` is true. */
const WRITE = new Set(['insert', 'update', 'delete', 'replace']);
/** Write statements that must carry a WHERE clause (INSERT/REPLACE have none by syntax). */
const NEEDS_WHERE = new Set(['update', 'delete']);

/** Outcome of a guard check. */
export interface GuardResult {
  /** True when the statement may be executed. */
  ok: boolean;
  /** Human-readable explanation, set when `ok` is false. */
  reason?: string;
  /** Coarse classification of the statement. */
  type: 'select' | 'write' | 'ddl' | 'unknown';
}

/**
 * Decides whether a single SQL statement may be executed.
 *
 * The statement is parsed and classified by the parser's statement type:
 * only `select` is accepted as a read, write types are accepted when
 * `opts.allowWrites` is set, and every other type is rejected. Unrecognized
 * types are rejected rather than treated as reads.
 *
 * @param sql - Raw SQL text, treated as untrusted input.
 * @param opts.allowWrites - Whether INSERT/UPDATE/DELETE/REPLACE may pass.
 * @param opts.maxRowsAffected - Accepted for interface compatibility; this
 *   function does not inspect it.
 * @returns A {@link GuardResult}; parse failures are reported as a rejected
 *   result rather than thrown.
 */
export function guard(sql: string, opts: { allowWrites: boolean; maxRowsAffected: number }): GuardResult {
  let parsed: unknown;
  try {
    parsed = parser.astify(sql, { database: 'MySQL' });
  } catch (e: unknown) {
    const message = e instanceof Error ? e.message : String(e);
    return { ok: false, reason: `parse error: ${message}`, type: 'unknown' };
  }

  const stmts: unknown[] = Array.isArray(parsed) ? parsed : [parsed];
  if (stmts.length > 1) return { ok: false, reason: 'multi-statement not allowed', type: 'unknown' };

  // An empty list or a node without a string `type` cannot be classified.
  const stmt = stmts[0] as { type?: unknown; where?: unknown } | null | undefined;
  const t = typeof stmt?.type === 'string' ? stmt.type.toLowerCase() : '';
  if (t === '') return { ok: false, reason: 'empty or unrecognized statement', type: 'unknown' };

  if (DENY.has(t)) return { ok: false, reason: `statement type "${t}" denied`, type: 'ddl' };

  if (WRITE.has(t)) {
    if (!opts.allowWrites) return { ok: false, reason: 'writes disabled', type: 'write' };
    // Check the parsed WHERE node, not the raw text: a string literal or column
    // name containing "where" must not satisfy the requirement.
    if (NEEDS_WHERE.has(t) && stmt?.where == null) {
      return { ok: false, reason: 'write without WHERE clause is rejected', type: 'write' };
    }
    return { ok: true, type: 'write' };
  }

  if (t === 'select') return { ok: true, type: 'select' };

  // Allow-list: anything that is neither a known write nor a SELECT is refused.
  return { ok: false, reason: `statement type "${t}" is not allowed`, type: 'unknown' };
}
永远不要 trust LLM 生成的 SQL。即使你的 prompt 写得再好,也总有 prompt-injection 的可能。这里的护栏是最后一道防线,不要省。
MCP server 主体
// src/server.ts
import { McpServer, ResourceTemplate } from '@modelcontextprotocol/sdk/server/mcp.js';
import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js';
import { z } from 'zod';
import { readPool, auditPool } from './db.js';
import { guard } from './guard.js';
/**
 * Parses the MCP_MAX_ROWS setting.
 *
 * @param raw - Raw environment value; `undefined` or blank selects the fallback.
 * @param fallback - Value used when the setting is absent.
 * @returns A positive integer row cap.
 * @throws Error when the value is present but not a positive integer. A NaN
 *   or zero cap would otherwise reach the schema's `.max()` bound and either
 *   disable it or reject every call.
 */
function parseMaxRows(raw: string | undefined, fallback = 500): number {
  if (raw === undefined || raw.trim() === '') return fallback;
  const value = Number(raw);
  if (!Number.isInteger(value) || value <= 0) {
    throw new Error(`MCP_MAX_ROWS must be a positive integer, got "${raw}"`);
  }
  return value;
}

/** Writes are enabled only when MCP_ALLOW_WRITES is exactly "1". */
const ALLOW_WRITES = process.env.MCP_ALLOW_WRITES === '1';
/** Upper bound for the `limit` argument of run_query. */
const MAX_ROWS = parseMaxRows(process.env.MCP_MAX_ROWS);
// MCP server instance; the `server.tool(...)` calls below register tools on it.
const server = new McpServer({ name: 'mariadb-mcp', version: '1.0.0' });
// ---- tool: list_tables ----
// Returns name, row count and comment for every table in the connection's
// current database, as pretty-printed JSON. Each call is written to the audit
// log, and failures are reported in the same `ERROR: ...` shape run_query uses.
server.tool('list_tables', 'List tables in current database', {}, async () => {
  const listSql = `SELECT table_name AS name, table_rows AS table_rows, table_comment AS description
    FROM information_schema.tables
    WHERE table_schema = DATABASE()
    ORDER BY table_name`;
  try {
    const [rows] = await readPool.query<any[]>(listSql);
    await audit('list_tables', listSql, {}, rows.length);
    return { content: [{ type: 'text', text: JSON.stringify(rows, null, 2) }] };
  } catch (e: unknown) {
    const message = e instanceof Error ? e.message : String(e);
    await audit('list_tables', listSql, {}, 0, message);
    return { content: [{ type: 'text', text: `ERROR: ${message}` }], isError: true };
  }
});
// ---- tool: describe_table ----
// Returns the columns and indexes of one table in the current database.
// The table name comes from the model and is untrusted:
//   - the column lookup binds it as a parameter;
//   - SHOW INDEXES cannot take a bound identifier, so it runs only after the
//     lookup confirmed the table exists, and the name is backtick-escaped.
server.tool(
  'describe_table',
  'Get a table\'s columns, indexes and comments',
  { table: z.string().min(1).max(64) },
  async ({ table }) => {
    const columnsSql = `SELECT column_name, column_type, is_nullable, column_key, column_default, column_comment
      FROM information_schema.columns
      WHERE table_schema = DATABASE() AND table_name = ?
      ORDER BY ordinal_position`;
    try {
      const [cols] = await readPool.query<any[]>(columnsSql, [table]);
      if (cols.length === 0) {
        const reason = `table "${table}" not found in current database`;
        await audit('describe_table', columnsSql, { table }, 0, reason);
        return { content: [{ type: 'text', text: `ERROR: ${reason}` }], isError: true };
      }

      // Doubling embedded backticks keeps the value inside the quoted identifier.
      const quotedTable = '`' + table.replace(/`/g, '``') + '`';
      const [idx] = await readPool.query<any[]>(`SHOW INDEXES FROM ${quotedTable}`);

      await audit('describe_table', columnsSql, { table }, cols.length);
      return {
        content: [
          { type: 'text', text: JSON.stringify({ columns: cols, indexes: idx }, null, 2) },
        ],
      };
    } catch (e: unknown) {
      const message = e instanceof Error ? e.message : String(e);
      await audit('describe_table', columnsSql, { table }, 0, message);
      return { content: [{ type: 'text', text: `ERROR: ${message}` }], isError: true };
    }
  },
);
// ---- tool: run_query ----
// Runs one guarded SQL statement and returns the result as JSON.
//   - `limit` defaults to 50, clamped so the default never exceeds MAX_ROWS.
//   - For SELECTs the row cap is enforced twice: a LIMIT clause is added when
//     the statement does not already end with one, and the returned rows are
//     always truncated to `limit`. The truncation is the authoritative cap;
//     the LIMIT clause only spares the database unnecessary work.
//   - Every outcome (blocked, succeeded, failed) is written to the audit log.
// NOTE(review): write statements are also sent through readPool, which is
// documented as a read-only account; confirm that is intended when
// MCP_ALLOW_WRITES=1.
server.tool(
  'run_query',
  'Run a single SQL statement (guarded)',
  {
    sql: z.string(),
    limit: z.number().int().positive().max(MAX_ROWS).default(Math.min(50, MAX_ROWS)),
  },
  async ({ sql, limit }) => {
    const check = guard(sql, { allowWrites: ALLOW_WRITES, maxRowsAffected: MAX_ROWS });
    if (!check.ok) {
      await audit('run_query', sql, { limit }, 0, check.reason);
      return { content: [{ type: 'text', text: `BLOCKED: ${check.reason}` }], isError: true };
    }
    try {
      let finalSql = sql;
      if (check.type === 'select') {
        // Drop trailing semicolons/whitespace so an appended clause stays in the same statement.
        const stmt = sql.replace(/[;\s]+$/, '');
        const endsWithLimit = /\blimit\s+\d+(\s*,\s*\d+|\s+offset\s+\d+)?$/i.test(stmt);
        // The newline keeps a trailing `-- comment` from swallowing the LIMIT clause.
        finalSql = endsWithLimit ? stmt : `${stmt}\nLIMIT ${limit}`;
      }
      const [result] = await readPool.query<any[]>(finalSql);
      const rows = Array.isArray(result) ? result.slice(0, limit) : result;
      const count = Array.isArray(rows) ? rows.length : (rows as any).affectedRows ?? 0;
      await audit('run_query', sql, { limit }, count);
      return { content: [{ type: 'text', text: JSON.stringify(rows, null, 2) }] };
    } catch (e: unknown) {
      const message = e instanceof Error ? e.message : String(e);
      await audit('run_query', sql, { limit }, 0, message);
      return { content: [{ type: 'text', text: `ERROR: ${message}` }], isError: true };
    }
  },
);
/**
 * Inserts one record into `mcp_audit` through the audit pool.
 *
 * @param tool - Name of the tool that was invoked.
 * @param sql - SQL text associated with the call.
 * @param args - Tool arguments; stored as a JSON string.
 * @param rows - Number of rows returned or affected.
 * @param error - Error or rejection reason; stored as NULL when omitted.
 */
async function audit(
  tool: string,
  sql: string,
  args: object,
  rows: number,
  error?: string,
) {
  // The client label comes from MCP_CLIENT and falls back to 'unknown'.
  const clientLabel = process.env.MCP_CLIENT ?? 'unknown';
  const errorValue = error ?? null;
  const values = [clientLabel, tool, sql, JSON.stringify(args), rows, errorValue];
  const insertSql =
    `INSERT INTO mcp_audit (client, tool, sql_text, args_json, rows_returned, error) VALUES (?, ?, ?, ?, ?, ?)`;
  await auditPool.query(insertSql, values);
}
await server.connect(new StdioServerTransport());
客户端配置
~/Library/Application Support/Claude/claude_desktop_config.json:
{
"mcpServers": {
"mariadb": {
"command": "node",
"args": ["/abs/path/to/mariadb-mcp/dist/server.js"],
"env": {
"MARIADB_READ_URL": "mysql://ai_readonly:xxx@host:3306/app",
"MARIADB_AUDIT_URL": "mysql://ai_audit:yyy@host:3306/audit",
"MCP_ALLOW_WRITES": "0",
"MCP_MAX_ROWS": "500",
"MCP_CLIENT": "claude-desktop"
}
}
}
}
Cursor 的配置文件是 ~/.cursor/mcp.json,结构一致。
Windsurf 设置 → MCP Servers → Add,同样 stdio 配置。
给 MariaDB 配只读 + 审计账号
-- Read-only account (used by the AI client)
CREATE USER 'ai_readonly'@'%' IDENTIFIED BY 'strong-pass-1';
GRANT SELECT, SHOW VIEW ON app.* TO 'ai_readonly'@'%';
-- Audit account. CREATE is required because the server runs
-- CREATE TABLE IF NOT EXISTS mcp_audit on startup with this account.
CREATE DATABASE IF NOT EXISTS audit;
CREATE USER 'ai_audit'@'%' IDENTIFIED BY 'strong-pass-2';
GRANT CREATE, INSERT, SELECT ON audit.* TO 'ai_audit'@'%';
FLUSH PRIVILEGES;
测试
# 列工具
echo '{"jsonrpc":"2.0","id":1,"method":"tools/list","params":{}}' | node dist/server.js
# 跑一个查询
echo '{"jsonrpc":"2.0","id":2,"method":"tools/call","params":{"name":"run_query","arguments":{"sql":"SELECT 1+1 AS r"}}}' | node dist/server.js
# 尝试 DROP(应该被拦截)
echo '{"jsonrpc":"2.0","id":3,"method":"tools/call","params":{"name":"run_query","arguments":{"sql":"DROP TABLE users"}}}' | node dist/server.js
# → BLOCKED: statement type "drop" denied
进阶:按 schema 自动生成 typed tools
上面的 run_query 暴露的是"通用 SQL"接口。更安全的姿势是按表生成专属工具:
// At startup, scan the schema and register one `get_<table>_by_<pk>` tool for
// each table that has a single-column primary key.
// One query fetches all primary-key columns; explicit aliases pin the result
// keys to lower case.
const [pkRows] = await readPool.query<any[]>(
  `SELECT table_name AS table_name, column_name AS column_name
     FROM information_schema.key_column_usage
    WHERE table_schema = DATABASE() AND constraint_name = 'PRIMARY'
    ORDER BY table_name, ordinal_position`,
);

// Group key columns per table so composite keys can be told apart.
const pkColumnsByTable = new Map<string, string[]>();
for (const row of pkRows) {
  const cols = pkColumnsByTable.get(row.table_name) ?? [];
  cols.push(row.column_name);
  pkColumnsByTable.set(row.table_name, cols);
}

// Quote an identifier for interpolation; embedded backticks are doubled.
const quoteIdent = (name: string): string => '`' + name.replace(/`/g, '``') + '`';

for (const [t, pkCols] of pkColumnsByTable) {
  const [pkCol] = pkCols;
  // Skip composite keys: filtering on only one of the columns can match many rows.
  if (pkCols.length !== 1 || pkCol === undefined) continue;

  const toolName = `get_${t}_by_${pkCol}`;
  // NOTE(review): some MCP clients are understood to accept only short names made
  // of letters, digits, '_' and '-'; tables that would violate that are skipped.
  // Confirm the exact rule for the clients you target.
  if (!/^[A-Za-z0-9_-]{1,64}$/.test(toolName)) continue;

  const selectSql = `SELECT * FROM ${quoteIdent(t)} WHERE ${quoteIdent(pkCol)} = ? LIMIT 1`;
  server.tool(
    toolName,
    `Fetch one row from ${t} by its primary key`,
    { id: z.union([z.string(), z.number()]) },
    async ({ id }) => {
      const [rows] = await readPool.query(selectSql, [id]);
      return { content: [{ type: 'text', text: JSON.stringify(rows) }] };
    },
  );
}
这样 Agent 看到的不是 "run SQL",而是 get_orders_by_id、list_users、search_products_by_name——更可控、错率更低。
进阶:限流 & 熔断
import { RateLimiterMemory } from 'rate-limiter-flexible';
const limiter = new RateLimiterMemory({ points: 60, duration: 60 });
server.tool('run_query', /*...*/, async (args) => {
try { await limiter.consume(process.env.MCP_CLIENT ?? 'unknown'); }
catch { return { content: [{ type: 'text', text: 'rate limited' }], isError: true }; }
// ...原逻辑
});
你可以怎么扩展
- 多租户:从 client 元信息里拿 tenant_id,自动注入 WHERE tenant_id = ?
- 向量检索 tool:暴露 semantic_search(query, k=5),内部用 MariaDB 11.8+ 的 VECTOR(见 RAG 教程)
- 写库工具:把 INSERT/UPDATE 包成 cancel_order(order_id, reason) 这种业务级 tool,args 强类型校验
参考与延伸
- MCP 官方规范
- 事故复盘合集 —— 别人怎么把库删了
- Cursor/Claude rules 模板 —— 配合本 MCP 使用效果最佳
- Schema 文档生成器 —— 把 schema 变 LLM 友好