MariaDB 中文社区

各语言 ORM / 驱动最佳实践

Python / Node.js / Go / Java / Rust / PHP / Ruby — 连接、事务、批量、向量、连接池的 ready-to-paste 范式

全部代码片段都按"能直接 paste 到生产"标准给出。连接池、事务、批量、健康检查 这四件套是每个语言都必须做好的。

通用基础

推荐连接 URI

mysql://USER:PASS@HOST:3306/DB?charset=utf8mb4&parseTime=true&tls=preferred

关键参数:

参数推荐原因
charsetutf8mb4永远
tls / sslmodepreferredrequired生产强制 TLS
connect_timeout5s不要默认无限
read_timeout30s防止 stuck 连接吃住池
pool_sizeCPU × 2–4别一上来配 100+

通用陷阱

  1. 驱动版本兼容:MariaDB 11+ 在 auth、charset collation 等行为上与 MySQL 8 不同。优先用 MariaDB 官方驱动或最新版的 MySQL 驱动(5.x+)。
  2. SET sql_mode:很多 ORM 启动时改 sql_mode,会出乎意料。显式设定STRICT_TRANS_TABLES,ERROR_FOR_DIVISION_BY_ZERO,NO_ENGINE_SUBSTITUTION
  3. 连接生命周期:单连接闲置 > wait_timeout(默认 8h)会被服务端断。连接池要配 max_lifetimewait_timeout 短。

Python

驱动选择

  • mariadb(MariaDB 官方 C 客户端绑定):推荐
  • PyMySQL:纯 Python,最易用,开发调试好
  • mysqlclient:MySQL 官方 fork,老牌、最快

SQLAlchemy 2.x 范式

from sqlalchemy import create_engine, text
from sqlalchemy.orm import Session
from sqlalchemy.pool import QueuePool

engine = create_engine(
    "mariadb+mariadbconnector://app:pass@host:3306/app?charset=utf8mb4",
    poolclass=QueuePool,
    pool_size=20, max_overflow=10,
    pool_pre_ping=True,       # 每次取连接前 SELECT 1 健康检查
    pool_recycle=3600,        # 1 小时强制回收,比 wait_timeout 短
    connect_args={"connect_timeout": 5},
    echo=False,
)

# 事务 + 自动回滚
with Session(engine) as s, s.begin():
    s.execute(text("UPDATE accounts SET balance = balance - 100 WHERE id = 1"))
    s.execute(text("UPDATE accounts SET balance = balance + 100 WHERE id = 2"))

批量插入(速度差 100 倍)

# ❌ 慢
for row in data:
    s.execute(text("INSERT INTO orders ..."), row)

# ✅ executemany(驱动层批量)
s.connection().exec_driver_sql(
    "INSERT INTO orders (user_id, amount_cents) VALUES (%s, %s)",
    [(r["uid"], r["amt"]) for r in data],
)

# ✅✅ 一次大 INSERT(最快)
from sqlalchemy.dialects.mysql import insert
stmt = insert(Order.__table__).values([dict(...) for r in data])
s.execute(stmt)

向量列(MariaDB 11.8+)

import json
embed = [0.1, 0.2, ...]  # 1536 维
s.execute(
    text("INSERT INTO chunks (content, embed) VALUES (:c, VEC_FromText(:e))"),
    {"c": text_chunk, "e": json.dumps(embed)},
)

Django

# settings.py
DATABASES = {
    "default": {
        "ENGINE": "django.db.backends.mysql",
        "NAME": "app",
        "USER": "app",
        "PASSWORD": os.environ["DB_PASS"],
        "HOST": "host",
        "PORT": "3306",
        "OPTIONS": {
            "charset": "utf8mb4",
            "init_command": "SET sql_mode='STRICT_TRANS_TABLES,ERROR_FOR_DIVISION_BY_ZERO,NO_ENGINE_SUBSTITUTION'",
            "isolation_level": "read committed",  # 长查询场景
            "connect_timeout": 5,
        },
        "CONN_MAX_AGE": 600,
        "CONN_HEALTH_CHECKS": True,  # Django 4.1+
    }
}

Django + MariaDB 4-byte UTF8 坑:早年版本 utf8mb4 不全支持。Django 4+ + MariaDB 10.5+ 没问题。


Node.js / TypeScript

驱动选择

  • mysql2:事实标准,支持 promise/prepared statements
  • mariadb(MariaDB 官方 Node 客户端):性能略好,特性最新
  • @planetscale/database:HTTP 协议,serverless 友好(不适合本场景)

mysql2 范式

import { createPool, type Pool } from 'mysql2/promise';

export const pool: Pool = createPool({
  uri: process.env.MARIADB_URL,
  connectionLimit: 20,
  connectTimeout: 5_000,
  enableKeepAlive: true,
  keepAliveInitialDelay: 30_000,
  waitForConnections: true,
  queueLimit: 100,
  namedPlaceholders: true,
});

// 事务
const conn = await pool.getConnection();
try {
  await conn.beginTransaction();
  await conn.query('UPDATE accounts SET balance = balance - ? WHERE id = ?', [100, 1]);
  await conn.query('UPDATE accounts SET balance = balance + ? WHERE id = ?', [100, 2]);
  await conn.commit();
} catch (e) {
  await conn.rollback();
  throw e;
} finally {
  conn.release();
}

Drizzle ORM(推荐)

import { drizzle } from 'drizzle-orm/mysql2';
import { mysqlTable, bigint, varchar, datetime } from 'drizzle-orm/mysql-core';

export const orders = mysqlTable('orders', {
  id: bigint('id', { mode: 'bigint' }).primaryKey().autoincrement(),
  userId: bigint('user_id', { mode: 'bigint' }).notNull(),
  amountCents: int('amount_cents').notNull(),
  createdAt: datetime('created_at').default(sql`CURRENT_TIMESTAMP`),
});

const db = drizzle(pool);

// 事务
await db.transaction(async (tx) => {
  await tx.update(accounts).set({ balance: sql`${accounts.balance} - 100` }).where(eq(accounts.id, 1));
  await tx.update(accounts).set({ balance: sql`${accounts.balance} + 100` }).where(eq(accounts.id, 2));
});

// 批量 insert
await db.insert(orders).values(rows);  // 自动批量

Prisma

Prisma 把 MariaDB 标为 mysql provider,但 11.x 的某些功能(VECTOR、新 JSON 函数)尚未在 schema 里直接表达。

datasource db {
  provider = "mysql"
  url      = env("MARIADB_URL")
}

model Order {
  id          BigInt    @id @default(autoincrement())
  userId      BigInt    @map("user_id")
  amountCents Int       @map("amount_cents")
  createdAt   DateTime  @default(now()) @map("created_at")

  @@map("orders")
  @@index([userId, createdAt])
}

VECTOR 列暂时用 Unsupported("vector(1536)") 占位 + 原生 query。

await prisma.$transaction([
  prisma.account.update({ where: { id: 1 }, data: { balance: { decrement: 100 } } }),
  prisma.account.update({ where: { id: 2 }, data: { balance: { increment: 100 } } }),
]);

Go

驱动

  • github.com/go-sql-driver/mysql:事实标准
  • github.com/mariadb-corporation/mariadb-go-driver:MariaDB 11+ 新特性

database/sql 范式

import (
    "database/sql"
    _ "github.com/go-sql-driver/mysql"
    "time"
)

dsn := "app:pass@tcp(host:3306)/app?charset=utf8mb4&parseTime=true&loc=Local&timeout=5s&readTimeout=30s"
db, err := sql.Open("mysql", dsn)
if err != nil { return err }
db.SetMaxOpenConns(20)
db.SetMaxIdleConns(10)
db.SetConnMaxLifetime(time.Hour)
db.SetConnMaxIdleTime(10 * time.Minute)

事务

ctx := context.Background()
tx, err := db.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelRepeatableRead})
if err != nil { return err }
defer tx.Rollback()  // 已 Commit 后 Rollback 无害

if _, err := tx.ExecContext(ctx, "UPDATE accounts SET balance = balance - ? WHERE id = ?", 100, 1); err != nil {
    return err
}
if _, err := tx.ExecContext(ctx, "UPDATE accounts SET balance = balance + ? WHERE id = ?", 100, 2); err != nil {
    return err
}
return tx.Commit()

Retry 死锁

for attempt := 0; attempt < 3; attempt++ {
    if err := runTx(ctx, db); err == nil { return nil }
    var me *mysql.MySQLError
    if errors.As(err, &me) && me.Number == 1213 {
        time.Sleep(time.Duration(50*(1<<attempt)) * time.Millisecond)
        continue
    }
    return err
}

GORM

import "gorm.io/driver/mysql"
import "gorm.io/gorm"

db, err := gorm.Open(mysql.New(mysql.Config{
    DSN: dsn,
    DefaultStringSize: 256,
    DisableDatetimePrecision: false,
    DontSupportRenameIndex: false,
}), &gorm.Config{})

// 批量插入
db.CreateInBatches(rows, 100)

// 事务
db.Transaction(func(tx *gorm.DB) error {
    if err := tx.Model(&Account{}).Where("id = ?", 1).Update("balance", gorm.Expr("balance - ?", 100)).Error; err != nil { return err }
    if err := tx.Model(&Account{}).Where("id = ?", 2).Update("balance", gorm.Expr("balance + ?", 100)).Error; err != nil { return err }
    return nil
})

sqlc(生成代码)

写 SQL,sqlc 生成 typed Go 函数。推荐给"写 SQL > 写 ORM"的团队。

-- query.sql
-- name: GetOrderByID :one
SELECT * FROM orders WHERE id = ? LIMIT 1;

-- name: ListUserOrders :many
SELECT * FROM orders WHERE user_id = ? ORDER BY created_at DESC LIMIT ?;

Java

驱动

  • MariaDB Connector/J(推荐 MariaDB 后端)
  • MySQL Connector/J(兼容 MariaDB)

HikariCP + Connector/J

HikariConfig config = new HikariConfig();
config.setJdbcUrl("jdbc:mariadb://host:3306/app?useUnicode=true&characterEncoding=utf8");
config.setUsername("app");
config.setPassword(System.getenv("DB_PASS"));
config.setMaximumPoolSize(20);
config.setConnectionTimeout(5000);
config.setIdleTimeout(600000);
config.setMaxLifetime(1800000);
config.setLeakDetectionThreshold(60000);
config.addDataSourceProperty("cachePrepStmts", "true");
config.addDataSourceProperty("prepStmtCacheSize", "250");
config.addDataSourceProperty("useServerPrepStmts", "true");
HikariDataSource ds = new HikariDataSource(config);

Spring Boot

spring:
  datasource:
    url: jdbc:mariadb://host:3306/app?characterEncoding=utf8&useUnicode=true
    username: app
    password: ${DB_PASS}
    driver-class-name: org.mariadb.jdbc.Driver
    hikari:
      maximum-pool-size: 20
      minimum-idle: 5
      idle-timeout: 600000
      max-lifetime: 1800000
      connection-timeout: 5000
  jpa:
    properties:
      hibernate:
        dialect: org.hibernate.dialect.MariaDBDialect
        jdbc:
          batch_size: 50
        order_inserts: true
        order_updates: true

MyBatis

<configuration>
  <settings>
    <setting name="defaultExecutorType" value="REUSE"/>
  </settings>
  <environments default="prod">
    <environment id="prod">
      <transactionManager type="JDBC"/>
      <dataSource type="POOLED">
        <property name="driver" value="org.mariadb.jdbc.Driver"/>
        <property name="url" value="jdbc:mariadb://host:3306/app"/>
        <property name="username" value="app"/>
        <property name="password" value="${DB_PASS}"/>
      </dataSource>
    </environment>
  </environments>
</configuration>

Rust

驱动

  • mysql_async:tokio 异步,性能好
  • sqlx:类型安全 + compile-time 校验,推荐
  • diesel:完全 ORM,编译期 schema 校验

sqlx 范式

# Cargo.toml
[dependencies]
sqlx = { version = "0.8", features = ["mysql", "runtime-tokio-rustls", "chrono"] }
tokio = { version = "1", features = ["full"] }
use sqlx::mysql::{MySqlPoolOptions, MySqlPool};

let pool: MySqlPool = MySqlPoolOptions::new()
    .max_connections(20)
    .acquire_timeout(std::time::Duration::from_secs(5))
    .max_lifetime(Some(std::time::Duration::from_secs(3600)))
    .test_before_acquire(true)
    .connect("mysql://app:pass@host:3306/app?charset=utf8mb4")
    .await?;

// 事务
let mut tx = pool.begin().await?;
sqlx::query!("UPDATE accounts SET balance = balance - ? WHERE id = ?", 100, 1)
    .execute(&mut *tx).await?;
sqlx::query!("UPDATE accounts SET balance = balance + ? WHERE id = ?", 100, 2)
    .execute(&mut *tx).await?;
tx.commit().await?;

Diesel

use diesel::prelude::*;
use diesel::mysql::MysqlConnection;
use diesel::r2d2::{ConnectionManager, Pool};

let manager = ConnectionManager::<MysqlConnection>::new("mysql://...");
let pool: Pool<ConnectionManager<MysqlConnection>> = Pool::builder()
    .max_size(20)
    .build(manager)?;

PHP

Laravel(默认用 PDO)

// config/database.php
'mysql' => [
    'driver' => 'mysql',
    'host' => env('DB_HOST'),
    'port' => env('DB_PORT', '3306'),
    'database' => env('DB_DATABASE'),
    'username' => env('DB_USERNAME'),
    'password' => env('DB_PASSWORD'),
    'charset' => 'utf8mb4',
    'collation' => 'utf8mb4_unicode_ci',
    'prefix' => '',
    'strict' => true,
    'engine' => 'InnoDB',
    'options' => [
        PDO::ATTR_PERSISTENT => true,                       // 长连接(谨慎,看场景)
        PDO::MYSQL_ATTR_INIT_COMMAND => 'SET sql_mode="STRICT_TRANS_TABLES,NO_ENGINE_SUBSTITUTION"',
    ],
],
// 事务
DB::transaction(function () {
    DB::table('accounts')->where('id', 1)->decrement('balance', 100);
    DB::table('accounts')->where('id', 2)->increment('balance', 100);
}, 3);  // 第 2 个参数: 死锁 retry 次数

原生 PDO

$pdo = new PDO(
    'mysql:host=host;port=3306;dbname=app;charset=utf8mb4',
    'app', $pass,
    [
        PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION,
        PDO::ATTR_DEFAULT_FETCH_MODE => PDO::FETCH_ASSOC,
        PDO::ATTR_EMULATE_PREPARES => false,
        PDO::MYSQL_ATTR_USE_BUFFERED_QUERY => true,
    ],
);

Ruby

ActiveRecord(Rails)

# config/database.yml
production:
  adapter: mysql2
  encoding: utf8mb4
  collation: utf8mb4_unicode_ci
  reconnect: false
  pool: <%= ENV.fetch("RAILS_MAX_THREADS") { 20 } %>
  host: <%= ENV["DB_HOST"] %>
  username: app
  password: <%= ENV["DB_PASS"] %>
  database: app
  variables:
    sql_mode: 'STRICT_TRANS_TABLES,NO_ENGINE_SUBSTITUTION'
    transaction_isolation: 'REPEATABLE-READ'
  connect_timeout: 5
ActiveRecord::Base.transaction do
  Account.find(1).update!(balance: 0)
  Account.find(2).update!(balance: 200)
end

# bulk insert
Order.insert_all(rows)

Sequel

require 'sequel'
DB = Sequel.connect(
  adapter: 'mysql2',
  host: ENV['DB_HOST'],
  user: 'app', password: ENV['DB_PASS'], database: 'app',
  encoding: 'utf8mb4', max_connections: 20,
)

通用:测试如何隔离

每个测试套件用独立 schema 或 docker 实例。绝不要让测试连生产(见 事故合集 事故 8)。

// jest setup
const dbName = `test_${process.env.JEST_WORKER_ID}_${Date.now()}`;
await admin.query(`CREATE DATABASE ${dbName}`);
// ... 跑测试
// teardown
await admin.query(`DROP DATABASE ${dbName}`);

或者用 Docker:

services:
  mariadb-test:
    image: mariadb:11.4
    tmpfs: /var/lib/mysql   # 内存盘,10x 速度
    environment:
      MARIADB_ROOT_PASSWORD: x

延伸

本页目录