CDC:把 MariaDB binlog 接到下游
Debezium / Maxwell / Canal——MariaDB binlog → Kafka / Pulsar / NATS / PG / ES
CDC(Change Data Capture)让数据库的每一次变更都变成一个事件流,可以被异步消费。实时数仓、搜索索引同步、跨库复制、事件溯源都靠它。
三大主流工具
| 工具 | 维护 | 用什么 | 适合 |
|---|---|---|---|
| Debezium | Red Hat | Kafka Connect / Server | 生产首选 |
| Maxwell | 社区 | 直接发 Kafka | 轻量 |
| Canal | 阿里 | Java agent | 国内生态 |
通用前置:开 binlog
[mariadb]
server-id=1
log-bin=mysql-bin
binlog_format=ROW # CDC 必须 ROW,不能 STATEMENT
binlog_row_image=FULL
expire_logs_days=7
gtid_strict_mode=ON
log_slave_updates=ON # 从库也写 binlog-- 给 CDC 工具用户 GRANT
CREATE USER 'debezium'@'%' IDENTIFIED BY 'xxx';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT
ON *.* TO 'debezium'@'%';
FLUSH PRIVILEGES;别在 Galera 集群所有节点开 binlog 同时跑 CDC——会重复消费。指定一个节点。
Debezium
部署方式
- Kafka Connect:传统、强大、需要 Kafka
- Debezium Server:直接发到 Kinesis / Pulsar / NATS,不需要 Kafka
// debezium connector config
{
"name": "mariadb-app",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "mariadb-host",
"database.port": "3306",
"database.user": "debezium",
"database.password": "xxx",
"database.server.id": "184054",
"database.server.name": "mariadb-app",
"database.include.list": "app",
"table.include.list": "app.orders,app.users",
"schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
"schema.history.internal.kafka.topic": "schemahistory.app",
"include.schema.changes": "true",
"snapshot.mode": "initial",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
}
}输出格式
Kafka topic mariadb-app.app.orders 里每条消息:
{
"op": "c", // c=create, u=update, d=delete, r=read
"ts_ms": 1747590000000,
"before": null,
"after": {
"id": 123,
"user_id": 42,
"amount_cents": 9900,
"status": "pending"
},
"source": { /* binlog 位置等 */ }
}优劣
✅ 生产成熟、社区活跃 ✅ 多 sink 选择 ✅ Schema 变更追踪 ✅ Snapshot + CDC 一体
❌ 部署稍复杂 ❌ Kafka Connect 单点要 HA
Maxwell
轻量、单文件 JAR:
java -jar maxwell.jar \
--user=maxwell --password=xxx \
--host=mariadb-host \
--producer=kafka \
--kafka.bootstrap.servers=kafka:9092输出格式:
{
"database": "app",
"table": "orders",
"type": "insert",
"ts": 1747590000,
"data": { "id": 123, ... }
}优劣
✅ 几分钟跑起来 ✅ JSON 格式直接消费 ❌ 没有 schema 历史 ❌ HA 自己搭
适合初创 / 小项目 / 临时同步。
Canal(阿里)
阿里巴巴开源,国内用户多。
# canal-deployer 在 db 旁边跑
sh bin/startup.sh
# canal-adapter 把变更同步到下游
# 支持 MySQL、HBase、ES、Kudu、ClickHouse...优劣
✅ 国内文档全、社区活 ✅ adapter 直连多种下游 ❌ Java 系,运维偏重 ❌ MariaDB 11.x 兼容滞后
适合阿里云生态、写 Java 的团队。
实战:MariaDB → Elasticsearch 全文搜索
+---------+ binlog +-----------+ JSON +------+
| MariaDB | -----------> | Debezium | ----------> | ES |
+---------+ +-----------+ +------+# Elasticsearch sink connector
{
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"topics.regex": "mariadb-app\\.app\\.posts",
"connection.url": "http://elasticsearch:9200",
"type.name": "_doc",
"key.ignore": "false",
"schema.ignore": "true",
"transforms": "extractKey",
"transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractKey.field": "id"
}应用层只写 MariaDB,搜索从 ES 走。
实战:MariaDB → PostgreSQL(异构同步)
# JDBC sink
{
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"topics.regex": "mariadb-app\\.app\\..*",
"connection.url": "jdbc:postgresql://pg-host/app",
"connection.user": "sync",
"connection.password": "xxx",
"insert.mode": "upsert",
"pk.mode": "record_key",
"auto.create": "true",
"auto.evolve": "true"
}实战:实时数仓(MariaDB → ClickHouse)
# ClickHouse sink
{
"connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
"topics.regex": "mariadb-app\\.app\\..*",
"hostname": "clickhouse-host",
"port": "8123",
"database": "raw",
"username": "default",
"password": ""
}ClickHouse 里做 MV 聚合,OLAP 查询飞起。
实战:审计(MariaDB → Loki)
把 binlog 转成日志,写入 Loki / S3 / Splunk 做审计。Debezium → Kafka → Logstash → Loki。
性能 & 监控
-- CDC 用户的复制延迟
SHOW REPLICA STATUS FOR CHANNEL 'debezium'\G
-- binlog 累积量
SHOW BINARY LOGS;CDC 工具自身也要监控:
- 消费速度(lag)
- 失败重试次数
- 内存 / CPU
Debezium 在 JMX 暴露指标,可接 Prometheus。
坑
binlog_format不是 ROW → CDC 看不到具体变更值- 大事务(百万行 UPDATE)→ binlog 爆,下游卡死
- schema 变更没让 CDC 知道 → 消息解析失败
- 没设 expire_logs_days → 磁盘炸
- CDC 用户权限不够 → 复制启动失败
- 从从库消费 binlog 时主从延迟 → 数据延迟传到下游
- 跨网络消费带宽不够 → 持续 lag
不用 CDC 的替代方案
- 应用层双写 Kafka:简单但容易丢一致性
- 定时全量 dump:成本低、延迟高
- Outbox pattern:业务表 + outbox 表同事务,再异步消费 outbox