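ORM / Driver Best Practices by Language
Python / Node.js / Go / Java / Rust / PHP / Ruby: ready-to-paste patterns for connections, transactions, batching, vectors, and connection pools
Every snippet is written to a "paste straight into production" standard. Connection pooling, transactions, batching, and health checks are the four essentials that every language has to get right.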
General fundamentals
Recommended connection URI
mysql://USER:PASS@HOST:3306/DB?charset=utf8mb4&parseTime=true&tls=preferred
Key parameters:
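| Parameter | Recommended | Reason |
|---|---|---|
| charset | utf8mb4 | Always |
| tls / sslmode | preferred or required | Enforce TLS in production |
| connect_timeout | 5s | Do not keep an unlimited default |
| read_timeout | 30s | Keeps stuck connections from tying up the pool |
| pool_size | CPU × 2–4 | Do not start out at 100+ |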
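The parameters in the table can be assembled programmatically rather than by string concatenation, which also avoids broken URIs when the password contains characters such as `@` or `/`. The following is a minimal sketch, not a driver API: `build_uri` and `RECOMMENDED_PARAMS` are hypothetical names, and the query keys simply mirror the example URI above (actual key names vary by driver).

```python
from urllib.parse import quote, urlencode

# Defaults mirroring the example URI; key names are driver-specific,
# so treat them as placeholders (assumption, not a universal standard).
RECOMMENDED_PARAMS = {
    "charset": "utf8mb4",
    "parseTime": "true",
    "tls": "preferred",
}


def build_uri(user: str, password: str, host: str, db: str,
              port: int = 3306, **overrides: str) -> str:
    """Assemble a mysql:// URI, percent-encoding the credentials."""
    params = {**RECOMMENDED_PARAMS, **overrides}
    # safe="" makes quote() escape "@", ":" and "/" inside the credentials too.
    auth = f"{quote(user, safe='')}:{quote(password, safe='')}"
    return f"mysql://{auth}@{host}:{port}/{quote(db, safe='')}?{urlencode(params)}"
```

Calling `build_uri("app", secret, "db.internal", "app", tls="required")` would override only the TLS setting and keep the other defaults.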
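Common pitfalls
- Driver version compatibility: MariaDB 11+ behaves differently from MySQL 8 in areas such as auth and charset collation. Prefer the official MariaDB drivers or the latest MySQL drivers (5.x+).
- SET sql_mode: many ORMs change sql_mode at startup, which can produce surprises. Set it explicitly to STRICT_TRANS_TABLES,ERROR_FOR_DIVISION_BY_ZERO,NO_ENGINE_SUBSTITUTION.
- Connection lifetime: a connection that sits idle for longer than wait_timeout (default 8h) is closed by the server. Configure the pool's max_lifetime to be shorter than wait_timeout.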
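Two of these pitfalls can be checked at application startup. The sketch below assumes you have already fetched `@@sql_mode` and `@@wait_timeout` from the server with whatever driver you use; the function names `missing_sql_modes` and `safe_max_lifetime` and the 60-second margin are illustrative choices, not part of any library.

```python
REQUIRED_SQL_MODES = (
    "STRICT_TRANS_TABLES",
    "ERROR_FOR_DIVISION_BY_ZERO",
    "NO_ENGINE_SUBSTITUTION",
)


def missing_sql_modes(current: str) -> list:
    """Return the required flags absent from a `SELECT @@sql_mode` result."""
    active = {flag.strip().upper() for flag in current.split(",") if flag.strip()}
    return [flag for flag in REQUIRED_SQL_MODES if flag not in active]


def safe_max_lifetime(wait_timeout_s: int, margin_s: int = 60) -> int:
    """Pick a pool max_lifetime (seconds) strictly below the server's wait_timeout."""
    if wait_timeout_s <= margin_s:
        raise ValueError("wait_timeout is too small for the requested margin")
    return wait_timeout_s - margin_s
```

A startup hook could fail fast when `missing_sql_modes(...)` is non-empty, and feed `safe_max_lifetime(...)` into the pool configuration instead of a hard-coded constant.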
Python
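Driver choice
- mariadb (binding to the official MariaDB C client): recommended
- PyMySQL: pure Python, the easiest to use, good for development and debugging
- mysqlclient: fork of MySQLdb (C extension), long-established and the fastest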
SQLAlchemy 2.x pattern
from sqlalchemy import create_engine, text
from sqlalchemy.orm import Session
from sqlalchemy.pool import QueuePool
engine = create_engine(
    "mariadb+mariadbconnector://app:pass@host:3306/app?charset=utf8mb4",
    poolclass=QueuePool,
    pool_size=20, max_overflow=10,
    pool_pre_ping=True,   # liveness check each time a connection is checked out
    pool_recycle=3600,    # recycle after 1 hour, shorter than wait_timeout
    connect_args={"connect_timeout": 5},
    echo=False,
)
# Transaction with automatic rollback on exception
with Session(engine) as s, s.begin():
    s.execute(text("UPDATE accounts SET balance = balance - 100 WHERE id = 1"))
    s.execute(text("UPDATE accounts SET balance = balance + 100 WHERE id = 2"))
Batch inserts (roughly a 100x speed difference)
# ❌ Slow: one round trip per row
for row in data:
    s.execute(text("INSERT INTO orders ..."), row)
# ✅ executemany (driver-level batching)
s.connection().exec_driver_sql(
    "INSERT INTO orders (user_id, amount_cents) VALUES (%s, %s)",
    [(r["uid"], r["amt"]) for r in data],
)
# ✅✅ One large multi-row INSERT (fastest)
from sqlalchemy.dialects.mysql import insert
stmt = insert(Order.__table__).values(
    [{"user_id": r["uid"], "amount_cents": r["amt"]} for r in data]
)
s.execute(stmt)
Vector columns (MariaDB 11.8+)
import json
embed = [0.1, 0.2, ...]  # 1536 dimensions
s.execute(
    text("INSERT INTO chunks (content, embed) VALUES (:c, VEC_FromText(:e))"),
    {"c": text_chunk, "e": json.dumps(embed)},
)
Django
# settings.py
import os
DATABASES = {
    "default": {
        "ENGINE": "django.db.backends.mysql",
        "NAME": "app",
        "USER": "app",
        "PASSWORD": os.environ["DB_PASS"],
        "HOST": "host",
        "PORT": "3306",
        "OPTIONS": {
            "charset": "utf8mb4",
            "init_command": "SET sql_mode='STRICT_TRANS_TABLES,ERROR_FOR_DIVISION_BY_ZERO,NO_ENGINE_SUBSTITUTION'",
            "isolation_level": "read committed",  # for workloads with long-running queries
            "connect_timeout": 5,
        },
        "CONN_MAX_AGE": 600,
        "CONN_HEALTH_CHECKS": True,  # Django 4.1+
    }
}
Django + MariaDB 4-byte UTF-8 pitfall: early versions did not fully support utf8mb4. Django 4+ with MariaDB 10.5+ works fine.
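The pitfall shows up with characters outside the Basic Multilingual Plane, such as emoji, which take 4 bytes in UTF-8 and therefore do not fit the legacy 3-byte utf8 (utf8mb3) charset. A small helper like the one below can be used in tests or input validation to flag such values; `needs_utf8mb4` is a hypothetical name used only for illustration.

```python
def needs_utf8mb4(value: str) -> bool:
    """Return True if any character needs 4 bytes in UTF-8 (code point above U+FFFF)."""
    return any(ord(ch) > 0xFFFF for ch in value)
```

CJK text and accented Latin characters stay within 3 bytes, so only supplementary-plane characters trigger the check.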
Node.js / TypeScript
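Driver choice
- mysql2: the de facto standard; supports promises and prepared statements
- mariadb (official MariaDB Node client): slightly better performance, newest features
- @planetscale/database: HTTP protocol, serverless-friendly (not suitable for this scenario)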
mysql2 pattern
import { createPool, type Pool } from 'mysql2/promise';
export const pool: Pool = createPool({
  uri: process.env.MARIADB_URL,
  connectionLimit: 20,
  connectTimeout: 5_000,
  enableKeepAlive: true,
  keepAliveInitialDelay: 30_000,
  waitForConnections: true,
  queueLimit: 100,
  namedPlaceholders: true,
});
// Transaction
const conn = await pool.getConnection();
try {
  await conn.beginTransaction();
  await conn.query('UPDATE accounts SET balance = balance - ? WHERE id = ?', [100, 1]);
  await conn.query('UPDATE accounts SET balance = balance + ? WHERE id = ?', [100, 2]);
  await conn.commit();
} catch (e) {
  await conn.rollback();
  throw e;
} finally {
  conn.release();
}
Drizzle ORM (recommended)
import { drizzle } from 'drizzle-orm/mysql2';
import { sql, eq } from 'drizzle-orm';
import { mysqlTable, bigint, int, datetime } from 'drizzle-orm/mysql-core';
export const orders = mysqlTable('orders', {
  id: bigint('id', { mode: 'bigint' }).primaryKey().autoincrement(),
  userId: bigint('user_id', { mode: 'bigint' }).notNull(),
  amountCents: int('amount_cents').notNull(),
  createdAt: datetime('created_at').default(sql`CURRENT_TIMESTAMP`),
});
const db = drizzle(pool);
// Transaction (an accounts table is assumed to be defined like orders above)
await db.transaction(async (tx) => {
  await tx.update(accounts).set({ balance: sql`${accounts.balance} - 100` }).where(eq(accounts.id, 1));
  await tx.update(accounts).set({ balance: sql`${accounts.balance} + 100` }).where(eq(accounts.id, 2));
});
// Batch insert
await db.insert(orders).values(rows); // sent as one multi-row INSERT
Prisma
Prisma treats MariaDB as the mysql provider, but some 11.x features (VECTOR, the newer JSON functions) cannot yet be expressed directly in the schema.
datasource db {
  provider = "mysql"
  url      = env("MARIADB_URL")
}
model Order {
  id          BigInt   @id @default(autoincrement())
  userId      BigInt   @map("user_id")
  amountCents Int      @map("amount_cents")
  createdAt   DateTime @default(now()) @map("created_at")
  @@map("orders")
  @@index([userId, createdAt])
}
For now, declare VECTOR columns as Unsupported("vector(1536)") placeholders and access them through raw queries.
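Whenever a VECTOR column is written through a raw query, the embedding has to be serialized by hand into the JSON-array text that the Python example earlier passes to VEC_FromText. The sketch below shows one way to validate and serialize it before binding it as a parameter; `to_vec_text` is a hypothetical helper, and the default of 1536 dimensions is taken from the examples in this guide.

```python
import json
import math
from typing import Sequence


def to_vec_text(embedding: Sequence[float], dims: int = 1536) -> str:
    """Serialize an embedding as JSON-array text suitable for binding to VEC_FromText(?)."""
    if len(embedding) != dims:
        raise ValueError(f"expected {dims} dimensions, got {len(embedding)}")
    values = [float(x) for x in embedding]
    # NaN and infinity are not valid JSON numbers, so reject them early.
    if not all(math.isfinite(x) for x in values):
        raise ValueError("embedding contains NaN or infinity")
    return json.dumps(values)
```

Checking the dimension count in application code gives a clearer error than letting the server reject a mismatched vector at insert time.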
await prisma.$transaction([
  prisma.account.update({ where: { id: 1 }, data: { balance: { decrement: 100 } } }),
  prisma.account.update({ where: { id: 2 }, data: { balance: { increment: 100 } } }),
]);
Go
Drivers
- github.com/go-sql-driver/mysql: the de facto standard
- github.com/mariadb-corporation/mariadb-go-driver: MariaDB 11+ new features
database/sql pattern
import (
	"database/sql"
	_ "github.com/go-sql-driver/mysql"
	"time"
)
dsn := "app:pass@tcp(host:3306)/app?charset=utf8mb4&parseTime=true&loc=Local&timeout=5s&readTimeout=30s"
db, err := sql.Open("mysql", dsn)
if err != nil { return err }
db.SetMaxOpenConns(20)
db.SetMaxIdleConns(10)
db.SetConnMaxLifetime(time.Hour)
db.SetConnMaxIdleTime(10 * time.Minute)
Transactions
ctx := context.Background()
tx, err := db.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelRepeatableRead})
if err != nil { return err }
defer tx.Rollback() // harmless after Commit: it just returns sql.ErrTxDone
if _, err := tx.ExecContext(ctx, "UPDATE accounts SET balance = balance - ? WHERE id = ?", 100, 1); err != nil {
	return err
}
if _, err := tx.ExecContext(ctx, "UPDATE accounts SET balance = balance + ? WHERE id = ?", 100, 2); err != nil {
	return err
}
return tx.Commit()
Retrying deadlocks
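// Needs "errors" and a named (not blank) import of github.com/go-sql-driver/mysql.
var err error
for attempt := 0; attempt < 3; attempt++ {
	// Assign to the outer err so it is still in scope for errors.As below.
	if err = runTx(ctx, db); err == nil {
		return nil
	}
	var me *mysql.MySQLError
	if errors.As(err, &me) && me.Number == 1213 { // 1213 = ER_LOCK_DEADLOCK
		time.Sleep(time.Duration(50*(1<<attempt)) * time.Millisecond) // 50ms, 100ms, 200ms
		continue
	}
	return err
}
return err // still deadlocking after 3 attempts
GORM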
import "gorm.io/driver/mysql"
import "gorm.io/gorm"
db, err := gorm.Open(mysql.New(mysql.Config{
	DSN:                      dsn,
	DefaultStringSize:        256,
	DisableDatetimePrecision: false,
	DontSupportRenameIndex:   false,
}), &gorm.Config{})
// Batch insert
db.CreateInBatches(rows, 100)
// Transaction
db.Transaction(func(tx *gorm.DB) error {
	if err := tx.Model(&Account{}).Where("id = ?", 1).Update("balance", gorm.Expr("balance - ?", 100)).Error; err != nil { return err }
	if err := tx.Model(&Account{}).Where("id = ?", 2).Update("balance", gorm.Expr("balance + ?", 100)).Error; err != nil { return err }
	return nil
})
sqlc (code generation)
Write SQL and let sqlc generate typed Go functions. Recommended for teams that would rather write SQL than work through an ORM.
-- query.sql
-- name: GetOrderByID :one
SELECT * FROM orders WHERE id = ? LIMIT 1;
-- name: ListUserOrders :many
SELECT * FROM orders WHERE user_id = ? ORDER BY created_at DESC LIMIT ?;
Java
Drivers
- MariaDB Connector/J (recommended for MariaDB backends)
- MySQL Connector/J (compatible with MariaDB)
HikariCP + Connector/J
HikariConfig config = new HikariConfig();
config.setJdbcUrl("jdbc:mariadb://host:3306/app?useUnicode=true&characterEncoding=utf8");
config.setUsername("app");
config.setPassword(System.getenv("DB_PASS"));
config.setMaximumPoolSize(20);
config.setConnectionTimeout(5000);
config.setIdleTimeout(600000);
config.setMaxLifetime(1800000);
config.setLeakDetectionThreshold(60000);
config.addDataSourceProperty("cachePrepStmts", "true");
config.addDataSourceProperty("prepStmtCacheSize", "250");
config.addDataSourceProperty("useServerPrepStmts", "true");
HikariDataSource ds = new HikariDataSource(config);
Spring Boot
spring:
  datasource:
    url: jdbc:mariadb://host:3306/app?characterEncoding=utf8&useUnicode=true
    username: app
    password: ${DB_PASS}
    driver-class-name: org.mariadb.jdbc.Driver
    hikari:
      maximum-pool-size: 20
      minimum-idle: 5
      idle-timeout: 600000
      max-lifetime: 1800000
      connection-timeout: 5000
  jpa:
    properties:
      hibernate:
        dialect: org.hibernate.dialect.MariaDBDialect
        jdbc:
          batch_size: 50
        order_inserts: true
        order_updates: true
MyBatis
<configuration>
  <settings>
    <setting name="defaultExecutorType" value="REUSE"/>
  </settings>
  <environments default="prod">
    <environment id="prod">
      <transactionManager type="JDBC"/>
      <dataSource type="POOLED">
        <property name="driver" value="org.mariadb.jdbc.Driver"/>
        <property name="url" value="jdbc:mariadb://host:3306/app"/>
        <property name="username" value="app"/>
        <property name="password" value="${DB_PASS}"/>
      </dataSource>
    </environment>
  </environments>
</configuration>
Rust
Drivers
- mysql_async: async on tokio, good performance
- sqlx: type-safe with compile-time query checking; recommended
- diesel: a full ORM with compile-time schema checking
sqlx pattern
# Cargo.toml
[dependencies]
sqlx = { version = "0.8", features = ["mysql", "runtime-tokio-rustls", "chrono"] }
tokio = { version = "1", features = ["full"] }
use sqlx::mysql::{MySqlPoolOptions, MySqlPool};
let pool: MySqlPool = MySqlPoolOptions::new()
    .max_connections(20)
    .acquire_timeout(std::time::Duration::from_secs(5))
    .max_lifetime(Some(std::time::Duration::from_secs(3600)))
    .test_before_acquire(true)
    .connect("mysql://app:pass@host:3306/app?charset=utf8mb4")
    .await?;
// Transaction (rolled back automatically if tx is dropped without commit)
let mut tx = pool.begin().await?;
sqlx::query!("UPDATE accounts SET balance = balance - ? WHERE id = ?", 100, 1)
    .execute(&mut *tx).await?;
sqlx::query!("UPDATE accounts SET balance = balance + ? WHERE id = ?", 100, 2)
    .execute(&mut *tx).await?;
tx.commit().await?;
Diesel
use diesel::prelude::*;
use diesel::mysql::MysqlConnection;
use diesel::r2d2::{ConnectionManager, Pool};
let manager = ConnectionManager::<MysqlConnection>::new("mysql://...");
let pool: Pool<ConnectionManager<MysqlConnection>> = Pool::builder()
    .max_size(20)
    .build(manager)?;
PHP
Laravel (uses PDO by default)
// config/database.php
'mysql' => [
    'driver' => 'mysql',
    'host' => env('DB_HOST'),
    'port' => env('DB_PORT', '3306'),
    'database' => env('DB_DATABASE'),
    'username' => env('DB_USERNAME'),
    'password' => env('DB_PASSWORD'),
    'charset' => 'utf8mb4',
    'collation' => 'utf8mb4_unicode_ci',
    'prefix' => '',
    'strict' => true,
    'engine' => 'InnoDB',
    'options' => [
        PDO::ATTR_PERSISTENT => true, // persistent connections (use with care; depends on the workload)
        PDO::MYSQL_ATTR_INIT_COMMAND => 'SET sql_mode="STRICT_TRANS_TABLES,NO_ENGINE_SUBSTITUTION"',
    ],
],
// Transaction
DB::transaction(function () {
    DB::table('accounts')->where('id', 1)->decrement('balance', 100);
    DB::table('accounts')->where('id', 2)->increment('balance', 100);
}, 3); // second argument: number of attempts when a deadlock occurs
Native PDO
$pdo = new PDO(
    'mysql:host=host;port=3306;dbname=app;charset=utf8mb4',
    'app', $pass,
    [
        PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION,
        PDO::ATTR_DEFAULT_FETCH_MODE => PDO::FETCH_ASSOC,
        PDO::ATTR_EMULATE_PREPARES => false,
        PDO::MYSQL_ATTR_USE_BUFFERED_QUERY => true,
    ],
);
Ruby
ActiveRecord (Rails)
# config/database.yml
production:
  adapter: mysql2
  encoding: utf8mb4
  collation: utf8mb4_unicode_ci
  reconnect: false
  pool: <%= ENV.fetch("RAILS_MAX_THREADS") { 20 } %>
  host: <%= ENV["DB_HOST"] %>
  username: app
  password: <%= ENV["DB_PASS"] %>
  database: app
  connect_timeout: 5
  variables:
    sql_mode: 'STRICT_TRANS_TABLES,NO_ENGINE_SUBSTITUTION'
    transaction_isolation: 'REPEATABLE-READ'
ActiveRecord::Base.transaction do
  Account.find(1).update!(balance: 0)
  Account.find(2).update!(balance: 200)
end
# bulk insert
Order.insert_all(rows)
Sequel
require 'sequel'
DB = Sequel.connect(
  adapter: 'mysql2',
  host: ENV['DB_HOST'],
  user: 'app', password: ENV['DB_PASS'], database: 'app',
  encoding: 'utf8mb4', max_connections: 20,
)
General: how to isolate tests
Give each test suite its own schema or Docker instance. Never let tests connect to production (see incident 8 in the incident collection).
// jest setup
const dbName = `test_${process.env.JEST_WORKER_ID}_${Date.now()}`;
await admin.query(`CREATE DATABASE ${dbName}`);
// ... run the tests
// teardown
await admin.query(`DROP DATABASE ${dbName}`);
Or use Docker:
services:
  mariadb-test:
    image: mariadb:11.4
    tmpfs: /var/lib/mysql  # RAM-backed disk, roughly 10x faster
    environment:
      MARIADB_ROOT_PASSWORD: x
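For the per-worker database approach in the Jest setup above, the database name is interpolated directly into DDL, so it is worth validating before use. The following sketch shows the same naming scheme with an allow-list check; `make_test_db_name` and `create_and_drop_sql` are hypothetical helpers, and the 64-character cap reflects the identifier length limit for database names.

```python
import re
import time
from typing import Optional, Tuple

# Only letters, digits and underscores, at most 64 characters.
_SAFE_IDENT = re.compile(r"[A-Za-z0-9_]{1,64}")


def make_test_db_name(worker_id: str, now_ms: Optional[int] = None) -> str:
    """Build a per-worker database name and refuse anything unsafe to interpolate."""
    stamp = int(time.time() * 1000) if now_ms is None else now_ms
    name = f"test_{worker_id}_{stamp}"
    if not _SAFE_IDENT.fullmatch(name):
        raise ValueError(f"unsafe database name: {name!r}")
    return name


def create_and_drop_sql(name: str) -> Tuple[str, str]:
    """Return the setup and teardown statements for an already validated name."""
    return f"CREATE DATABASE `{name}`", f"DROP DATABASE `{name}`"
```

Generating both statements from one validated name keeps setup and teardown in sync, so a teardown can never target a database the suite did not create.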