MariaDB 中文社区

给 MariaDB 写一个生产级 MCP Server

从零搭建一个带安全护栏、行数上限、审计日志、schema 自省工具的 MariaDB MCP server

目标:构建一个让 Claude / Cursor / Windsurf 等 AI 客户端能"安全地"操作 MariaDB 的 MCP server。

重点不是"能跑",而是生产级:注入防护、行数上限、审计日志、按 schema 自动生成工具。

为什么写这一篇

社区里能找到的 MariaDB / MySQL MCP server 大多停留在"Demo"水平:

问题Demo 实现生产实现
SQL 注入直接 db.query(prompt)强制 prepared statement
误删没有任何拦截行数上限 + 危险语句白名单
审计写到 stdout写到独立审计库,带 LLM 标识
Schema 自省没有自动生成 typed tools
权限用 root强制只读账号 + 按表授权
长连接单连接连接池 + 超时
多租户一个进程一个库上下文按租户隔离

下面这套是我自己用 TypeScript + @modelcontextprotocol/sdk + mysql2 写的,已在小流量生产跑了 3 个月。

整体架构

+-----------------+        stdio          +------------------+
|  Claude Desktop |  <----- JSON-RPC ---> |  MCP Server      |
|  (or Cursor)    |                       |  (this code)     |
+-----------------+                       +---------+--------+
                                                    |
                                          +---------v--------+
                                          | Guard            |
                                          | - parser         |
                                          | - row-limit      |
                                          | - deny-list      |
                                          +---------+--------+
                                                    |
                                  +-----------------+----------------+
                                  |                                  |
                          +-------v-------+                  +-------v-------+
                          | MariaDB (RO)  |                  | MariaDB (audit) |
                          | app schema    |                  | tools / logs    |
                          +---------------+                  +---------------+

Step-by-step

项目骨架

mkdir mariadb-mcp && cd mariadb-mcp
pnpm init
pnpm add @modelcontextprotocol/sdk mysql2 zod node-sql-parser
pnpm add -D typescript tsx @types/node
echo '{"compilerOptions":{"target":"es2022","module":"esnext","moduleResolution":"bundler","strict":true,"esModuleInterop":true}}' > tsconfig.json

连接池 + 双账号

// src/db.ts
import { createPool, type Pool } from 'mysql2/promise';

export const readPool: Pool = createPool({
  uri: process.env.MARIADB_READ_URL!,          // 只读账号
  connectionLimit: 5,
  connectTimeout: 5_000,
  enableKeepAlive: true,
});

export const auditPool: Pool = createPool({
  uri: process.env.MARIADB_AUDIT_URL!,         // 写入审计库的专用账号
  connectionLimit: 2,
});

await auditPool.query(`
  CREATE TABLE IF NOT EXISTS mcp_audit (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    ts DATETIME(6) DEFAULT CURRENT_TIMESTAMP(6),
    client VARCHAR(64),
    tool VARCHAR(64),
    sql_text TEXT,
    args_json JSON,
    rows_returned INT,
    error TEXT,
    INDEX idx_ts (ts),
    INDEX idx_client (client)
  ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='MCP 工具调用审计'
`);

安全护栏(核心)

// src/guard.ts
import { Parser } from 'node-sql-parser';
const parser = new Parser();

const DENY = new Set(['drop', 'truncate', 'alter', 'rename', 'grant', 'revoke', 'create_user']);
const WRITE = new Set(['insert', 'update', 'delete', 'replace']);

export interface GuardResult {
  ok: boolean;
  reason?: string;
  type: 'select' | 'write' | 'ddl' | 'unknown';
}

export function guard(sql: string, opts: { allowWrites: boolean; maxRowsAffected: number }): GuardResult {
  let ast;
  try {
    ast = parser.astify(sql, { database: 'MySQL' });
  } catch (e: any) {
    return { ok: false, reason: `parse error: ${e.message}`, type: 'unknown' };
  }
  const stmts = Array.isArray(ast) ? ast : [ast];
  if (stmts.length > 1) return { ok: false, reason: 'multi-statement not allowed', type: 'unknown' };
  const t: string = stmts[0].type;
  if (DENY.has(t)) return { ok: false, reason: `statement type "${t}" denied`, type: 'ddl' };

  if (WRITE.has(t)) {
    if (!opts.allowWrites) return { ok: false, reason: 'writes disabled', type: 'write' };
    if (!sql.toLowerCase().includes('where')) {
      return { ok: false, reason: 'write without WHERE clause is rejected', type: 'write' };
    }
    return { ok: true, type: 'write' };
  }
  return { ok: true, type: 'select' };
}

永远不要 trust LLM 生成的 SQL。即使你的 prompt 写得再好,也总有 prompt-injection 的可能。这里的护栏是最后一道防线,不要省。

MCP server 主体

// src/server.ts
import { McpServer, ResourceTemplate } from '@modelcontextprotocol/sdk/server/mcp.js';
import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js';
import { z } from 'zod';
import { readPool, auditPool } from './db.js';
import { guard } from './guard.js';

const ALLOW_WRITES = process.env.MCP_ALLOW_WRITES === '1';
const MAX_ROWS = Number(process.env.MCP_MAX_ROWS ?? 500);

const server = new McpServer({ name: 'mariadb-mcp', version: '1.0.0' });

// ---- tool: list_tables ----
server.tool('list_tables', 'List tables in current database', {}, async () => {
  const [rows] = await readPool.query<any[]>(
    `SELECT table_name AS name, table_rows, table_comment AS description
     FROM information_schema.tables
     WHERE table_schema = DATABASE()`
  );
  return { content: [{ type: 'text', text: JSON.stringify(rows, null, 2) }] };
});

// ---- tool: describe_table ----
server.tool(
  'describe_table',
  'Get a table\'s columns, indexes and comments',
  { table: z.string() },
  async ({ table }) => {
    const [cols] = await readPool.query<any[]>(
      `SELECT column_name, column_type, is_nullable, column_key, column_default, column_comment
       FROM information_schema.columns
       WHERE table_schema = DATABASE() AND table_name = ?`,
      [table],
    );
    const [idx] = await readPool.query<any[]>(`SHOW INDEXES FROM \`${table}\``);
    return {
      content: [
        { type: 'text', text: JSON.stringify({ columns: cols, indexes: idx }, null, 2) },
      ],
    };
  },
);

// ---- tool: run_query ----
server.tool(
  'run_query',
  'Run a single SQL statement (guarded)',
  { sql: z.string(), limit: z.number().int().positive().max(MAX_ROWS).default(50) },
  async ({ sql, limit }) => {
    const check = guard(sql, { allowWrites: ALLOW_WRITES, maxRowsAffected: MAX_ROWS });
    if (!check.ok) {
      await audit('run_query', sql, { limit }, 0, check.reason);
      return { content: [{ type: 'text', text: `BLOCKED: ${check.reason}` }], isError: true };
    }
    try {
      const finalSql = check.type === 'select' ? `${sql} LIMIT ${limit}` : sql;
      const [rows] = await readPool.query<any[]>(finalSql);
      const count = Array.isArray(rows) ? rows.length : (rows as any).affectedRows ?? 0;
      await audit('run_query', sql, { limit }, count);
      return { content: [{ type: 'text', text: JSON.stringify(rows, null, 2) }] };
    } catch (e: any) {
      await audit('run_query', sql, { limit }, 0, e.message);
      return { content: [{ type: 'text', text: `ERROR: ${e.message}` }], isError: true };
    }
  },
);

async function audit(
  tool: string,
  sql: string,
  args: object,
  rows: number,
  error?: string,
) {
  await auditPool.query(
    `INSERT INTO mcp_audit (client, tool, sql_text, args_json, rows_returned, error) VALUES (?, ?, ?, ?, ?, ?)`,
    [process.env.MCP_CLIENT ?? 'unknown', tool, sql, JSON.stringify(args), rows, error ?? null],
  );
}

await server.connect(new StdioServerTransport());

客户端配置

~/Library/Application Support/Claude/claude_desktop_config.json

{
  "mcpServers": {
    "mariadb": {
      "command": "node",
      "args": ["/abs/path/to/mariadb-mcp/dist/server.js"],
      "env": {
        "MARIADB_READ_URL": "mysql://ai_readonly:xxx@host:3306/app",
        "MARIADB_AUDIT_URL": "mysql://ai_audit:yyy@host:3306/audit",
        "MCP_ALLOW_WRITES": "0",
        "MCP_MAX_ROWS": "500",
        "MCP_CLIENT": "claude-desktop"
      }
    }
  }
}

~/.cursor/mcp.json,结构一致。

Windsurf 设置 → MCP Servers → Add,同样 stdio 配置。

给 MariaDB 配只读 + 审计账号

-- 只读账号(给 AI 用)
CREATE USER 'ai_readonly'@'%' IDENTIFIED BY 'strong-pass-1';
GRANT SELECT, SHOW VIEW ON app.* TO 'ai_readonly'@'%';

-- 审计账号
CREATE DATABASE IF NOT EXISTS audit;
CREATE USER 'ai_audit'@'%' IDENTIFIED BY 'strong-pass-2';
GRANT INSERT, SELECT ON audit.* TO 'ai_audit'@'%';

FLUSH PRIVILEGES;

测试

# 列工具
echo '{"jsonrpc":"2.0","id":1,"method":"tools/list","params":{}}' | node dist/server.js

# 跑一个查询
echo '{"jsonrpc":"2.0","id":2,"method":"tools/call","params":{"name":"run_query","arguments":{"sql":"SELECT 1+1 AS r"}}}' | node dist/server.js

# 尝试 DROP(应该被拦截)
echo '{"jsonrpc":"2.0","id":3,"method":"tools/call","params":{"name":"run_query","arguments":{"sql":"DROP TABLE users"}}}' | node dist/server.js
# → BLOCKED: statement type "drop" denied

进阶:按 schema 自动生成 typed tools

上面的 run_query 暴露的是"通用 SQL"接口。更安全的姿势是按表生成专属工具

// 启动时扫描 schema,为每张表生成 get_<table>_by_<pk> / list_<table> 等工具
const [tables] = await readPool.query<any[]>(
  `SELECT table_name FROM information_schema.tables WHERE table_schema = DATABASE()`,
);
for (const { table_name: t } of tables) {
  const [pk] = await readPool.query<any[]>(
    `SELECT column_name FROM information_schema.key_column_usage
     WHERE table_schema = DATABASE() AND table_name = ? AND constraint_name = 'PRIMARY'`,
    [t],
  );
  if (pk[0]) {
    server.tool(
      `get_${t}_by_${pk[0].column_name}`,
      `Fetch one row from ${t} by its primary key`,
      { id: z.union([z.string(), z.number()]) },
      async ({ id }) => {
        const [rows] = await readPool.query(`SELECT * FROM \`${t}\` WHERE \`${pk[0].column_name}\` = ?`, [id]);
        return { content: [{ type: 'text', text: JSON.stringify(rows) }] };
      },
    );
  }
}

这样 Agent 看到的不是 "run SQL",而是 get_orders_by_idlist_userssearch_products_by_name——更可控、错率更低。

进阶:限流 & 熔断

import { RateLimiterMemory } from 'rate-limiter-flexible';
const limiter = new RateLimiterMemory({ points: 60, duration: 60 });

server.tool('run_query', /*...*/, async (args) => {
  try { await limiter.consume(process.env.MCP_CLIENT ?? 'unknown'); }
  catch { return { content: [{ type: 'text', text: 'rate limited' }], isError: true }; }
  // ...原逻辑
});

你可以怎么扩展

  • 多租户:从 client 元信息里拿 tenant_id,自动注入 WHERE tenant_id = ?
  • 向量检索 tool:暴露 semantic_search(query, k=5),内部用 MariaDB 11.8+ VECTOR(见 RAG 教程
  • 写库工具:把 INSERT/UPDATE 包成 cancel_order(order_id, reason) 这种业务级 tool,args 强类型校验

参考与延伸

本页目录