MariaDB 커뮤니티

CDC:把 MariaDB binlog 接到下游

Debezium / Maxwell / Canal——MariaDB binlog → Kafka / Pulsar / NATS / PG / ES

CDC(Change Data Capture)让数据库的每一次变更都变成一个事件流,可以被异步消费。实时数仓、搜索索引同步、跨库复制、事件溯源都靠它。

三大主流工具

工具维护用什么适合
DebeziumRed HatKafka Connect / Server生产首选
Maxwell社区直接发 Kafka轻量
Canal阿里Java agent国内生态

通用前置:开 binlog

[mariadb]
server-id=1
log-bin=mysql-bin
binlog_format=ROW                         # CDC 必须 ROW,不能 STATEMENT
binlog_row_image=FULL
expire_logs_days=7
gtid_strict_mode=ON
log_slave_updates=ON                      # 从库也写 binlog
-- 给 CDC 工具用户 GRANT
CREATE USER 'debezium'@'%' IDENTIFIED BY 'xxx';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT
  ON *.* TO 'debezium'@'%';
FLUSH PRIVILEGES;

别在 Galera 集群所有节点开 binlog 同时跑 CDC——会重复消费。指定一个节点。

Debezium

部署方式

  • Kafka Connect:传统、强大、需要 Kafka
  • Debezium Server:直接发到 Kinesis / Pulsar / NATS,不需要 Kafka
// debezium connector config
{
  "name": "mariadb-app",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mariadb-host",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "xxx",
    "database.server.id": "184054",
    "database.server.name": "mariadb-app",
    "database.include.list": "app",
    "table.include.list": "app.orders,app.users",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
    "schema.history.internal.kafka.topic": "schemahistory.app",
    "include.schema.changes": "true",
    "snapshot.mode": "initial",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
  }
}

输出格式

Kafka topic mariadb-app.app.orders 里每条消息:

{
  "op": "c",                    // c=create, u=update, d=delete, r=read
  "ts_ms": 1747590000000,
  "before": null,
  "after": {
    "id": 123,
    "user_id": 42,
    "amount_cents": 9900,
    "status": "pending"
  },
  "source": { /* binlog 位置等 */ }
}

优劣

✅ 生产成熟、社区活跃 ✅ 多 sink 选择 ✅ Schema 变更追踪 ✅ Snapshot + CDC 一体

❌ 部署稍复杂 ❌ Kafka Connect 单点要 HA

Maxwell

轻量、单文件 JAR

java -jar maxwell.jar \
  --user=maxwell --password=xxx \
  --host=mariadb-host \
  --producer=kafka \
  --kafka.bootstrap.servers=kafka:9092

输出格式:

{
  "database": "app",
  "table": "orders",
  "type": "insert",
  "ts": 1747590000,
  "data": { "id": 123, ... }
}

优劣

✅ 几分钟跑起来 ✅ JSON 格式直接消费 ❌ 没有 schema 历史 ❌ HA 自己搭

适合初创 / 小项目 / 临时同步

Canal(阿里)

阿里巴巴开源,国内用户多。

# canal-deployer 在 db 旁边跑
sh bin/startup.sh

# canal-adapter 把变更同步到下游
# 支持 MySQL、HBase、ES、Kudu、ClickHouse...

优劣

✅ 国内文档全、社区活 ✅ adapter 直连多种下游 ❌ Java 系,运维偏重 ❌ MariaDB 11.x 兼容滞后

适合阿里云生态、写 Java 的团队

实战:MariaDB → Elasticsearch 全文搜索

+---------+    binlog    +-----------+    JSON     +------+
| MariaDB | -----------> | Debezium  | ----------> | ES   |
+---------+              +-----------+             +------+
# Elasticsearch sink connector
{
  "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
  "topics.regex": "mariadb-app\\.app\\.posts",
  "connection.url": "http://elasticsearch:9200",
  "type.name": "_doc",
  "key.ignore": "false",
  "schema.ignore": "true",
  "transforms": "extractKey",
  "transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
  "transforms.extractKey.field": "id"
}

应用层只写 MariaDB,搜索从 ES 走。

实战:MariaDB → PostgreSQL(异构同步)

# JDBC sink
{
  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
  "topics.regex": "mariadb-app\\.app\\..*",
  "connection.url": "jdbc:postgresql://pg-host/app",
  "connection.user": "sync",
  "connection.password": "xxx",
  "insert.mode": "upsert",
  "pk.mode": "record_key",
  "auto.create": "true",
  "auto.evolve": "true"
}

实战:实时数仓(MariaDB → ClickHouse)

# ClickHouse sink
{
  "connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
  "topics.regex": "mariadb-app\\.app\\..*",
  "hostname": "clickhouse-host",
  "port": "8123",
  "database": "raw",
  "username": "default",
  "password": ""
}

ClickHouse 里做 MV 聚合,OLAP 查询飞起。

实战:审计(MariaDB → Loki)

把 binlog 转成日志,写入 Loki / S3 / Splunk 做审计。Debezium → Kafka → Logstash → Loki。

性能 & 监控

-- CDC 用户的复制延迟
SHOW REPLICA STATUS FOR CHANNEL 'debezium'\G

-- binlog 累积量
SHOW BINARY LOGS;

CDC 工具自身也要监控:

  • 消费速度(lag)
  • 失败重试次数
  • 内存 / CPU

Debezium 在 JMX 暴露指标,可接 Prometheus。

  1. binlog_format 不是 ROW → CDC 看不到具体变更值
  2. 大事务(百万行 UPDATE)→ binlog 爆,下游卡死
  3. schema 变更没让 CDC 知道 → 消息解析失败
  4. 没设 expire_logs_days → 磁盘炸
  5. CDC 用户权限不够 → 复制启动失败
  6. 从从库消费 binlog 时主从延迟 → 数据延迟传到下游
  7. 跨网络消费带宽不够 → 持续 lag

不用 CDC 的替代方案

  • 应用层双写 Kafka:简单但容易丢一致性
  • 定时全量 dump:成本低、延迟高
  • Outbox pattern:业务表 + outbox 表同事务,再异步消费 outbox

延伸

On this page