Flink CDC 教程:MySQL 实时数据同步实战指南
1. 为什么我们需要 Flink CDC?
在深入技术细节之前,我们需要先解决一个根本问题:为什么传统的数据同步方式已经不再适用?
1.1 传统 ETL 的痛点
在过去,如果我们需要将 MySQL 中的业务数据同步到数据仓库(如 Hive)或搜索引擎(如 Elasticsearch)中,通常会使用 ETL 工具(如 Sqoop、Kettle)或编写定时脚本。
这种方式被称为 Batch(批处理),它像“邮政信件”一样:
- 高延迟:通常是 T+1(隔天)或小时级更新,无法满足实时大屏或风控需求。
- 性能压力:通常依赖
SELECT * FROM table WHERE update_time > last_time这种轮询方式。当数据量大时,频繁的查询会把生产数据库“拖垮”。 - 删除数据无法捕获:物理删除的数据在数据库中消失了,轮询查询无法感知“消失”,导致下游数据不一致。
1.2 什么是 CDC?
CDC (Change Data Capture),即数据变更捕获。
它不像传统方式那样去“询问”数据库现在的状态,而是监听数据库的 “日志”(对于 MySQL 来说就是 Binlog)。
- 比喻:传统方式是你每隔一小时打电话问朋友:“你现在在干嘛?”(轮询);CDC 是朋友发朋友圈,你立刻就收到了通知(监听日志)。
1.3 Flink CDC 的核心优势
Flink CDC 是基于 Apache Flink 的一组 Source 组件。相比于其他 CDC 方案(如 Canal、Debezium 独立部署),Flink CDC 具有颠覆性的优势:
- 全增量一体化:自动先读取历史全量数据,完成后无缝切换到增量 Binlog 读取,用户无感知。
- 无锁读取(Lock-free):在读取全量历史数据时,不需要对 MySQL 表加锁,不影响线上业务(这是 Flink CDC 2.0+ 的核心突破)。
- 分布式架构:支持海量数据并发读取,利用 Flink 的并行计算能力。
- Exactly-Once 保障:配合 Flink 的 Checkpoint 机制,确保数据不丢不重。
2. 环境准备与原理基石
2.1 开启 MySQL Binlog
Flink CDC 的底层依赖于 MySQL 的 Binlog(二进制日志)。必须确保 MySQL 开启了 Binlog,且格式为 ROW 模式。
登录 MySQL 执行以下 SQL 检查配置:
SHOW VARIABLES LIKE 'log_bin'; -- 必须为 ON
SHOW VARIABLES LIKE 'binlog_format'; -- 必须为 ROW
SHOW VARIABLES LIKE 'binlog_row_image'; -- 必须为 FULL
如果未开启,需要修改 MySQL 配置文件(my.cnf 或 my.ini)并重启数据库:
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
binlog_row_image=FULL # 镜像模式为 FULL
server-id=1 # 设置 MySQL 的 ID,必须唯一
2.2 创建测试数据
为了演示,我们创建一个简单的电商订单表。
CREATE DATABASE IF NOT EXISTS flink_test;
USE flink_test;
CREATE TABLE orders (
order_id INT AUTO_INCREMENT PRIMARY KEY,
user_id INT NOT NULL,
product_id INT NOT NULL,
order_amount DECIMAL(10, 2),
order_status VARCHAR(20),
create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
update_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- 插入几条模拟历史数据
INSERT INTO orders (user_id, product_id, order_amount, order_status) VALUES
(101, 2001, 99.50, 'PAID'),
(102, 2002, 199.00, 'PENDING'),
(103, 2003, 50.00, 'SHIPPED');
2.3 权限配置
Flink CDC 连接 MySQL 需要一个具备相关权限的账号。除了基础的 SELECT 权限外,还需要复制权限。
-- 创建用户
CREATE USER 'flinkuser'@'%' IDENTIFIED BY 'Flink@123';
-- 授权
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flinkuser'@'%';
-- 刷新权限
FLUSH PRIVILEGES;
REPLICATION SLAVE: 允许读取 Binlog。REPLICATION CLIENT: 允许获取 Binlog 状态。RELOAD: 需要执行FLUSH TABLES等操作(虽然无锁模式下不需要锁表,但某些元数据读取仍需此权限)。
3. 实战模式一:DataStream API(代码开发)
这是最灵活的方式,适合需要对数据进行复杂清洗、转换或侧流输出的场景。
3.1 引入依赖
在 Maven 项目的 pom.xml 中引入 Flink 核心包和 MySQL CDC 连接器。
注意版本兼容性:以下以 Flink 1.17 和 Flink CDC 2.4.0 为例(这是非常稳定的生产组合)。
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>1.17.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>1.17.1</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
</dependencies>
3.2 编写 Java 代码
我们将创建一个 Flink 作业,监听 orders 表的变更,并将结果打印到控制台。
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class FlinkCDCRealtimeApp {
public static void main(String[] args) throws Exception {
// 1. 获取执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 开启 Checkpoint (生产环境必须开启,用于故障恢复和 Exactly-Once)
env.enableCheckpointing(5000); // 每 5 秒一次 Checkpoint
env.setParallelism(1); // 演示环境设为 1,生产环境根据数据量调整
// 2. 构建 MySQL Source
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname("localhost")
.port(3306)
.username("flinkuser")
.password("Flink@123")
.databaseList("flink_test") // 监控的数据库
.tableList("flink_test.orders") // 监控的表,必须带库名前缀
/**
* 启动模式配置 (StartupOptions):
* - initial(): 第一次启动读取全量,然后读增量(最常用)
* - latest(): 只读启动后的增量数据
* - timestamp(long): 指定时间戳开始
*/
.startupOptions(StartupOptions.initial())
.deserializer(new JsonDebeziumDeserializationSchema()) // 反序列化为 JSON 字符串
.build();
// 3. 从 Source 创建数据流
DataStreamSource<String> streamSource = env.fromSource(
mySqlSource,
WatermarkStrategy.noWatermarks(),
"MySQL CDC Source"
);
// 4. 处理数据 (这里简单打印)
streamSource.print("Data Change >>> ");
// 5. 启动任务
env.execute("Flink CDC MySQL Job");
}
}
3.3 运行与观察
- 启动 Java 程序。
- 控制台首先会输出表中已有的 3 条历史数据(Op:
r代表 read/snapshot)。 - 在 MySQL 中执行插入操作:
INSERT INTO orders (user_id, product_id, order_amount, order_status) VALUES (104, 2004, 300.00, 'PAID'); - 控制台会立即捕获到一条 Op:
c(create) 的 JSON 数据。 - 在 MySQL 中更新操作:
UPDATE orders SET order_status = 'COMPLETED' WHERE user_id = 101; - 控制台会捕获到更新前(
before)和更新后(after)的数据。
4. 实战模式二:Flink SQL(生产力工具)
对于不熟悉 Java 的数据分析师或工程师,Flink SQL 提供了更为便捷的声明式开发方式。通常配合 Flink SQL Client 使用。
4.1 定义 CDC 表
在 Flink SQL Client 中,我们需要通过 DDL 定义一个连接到 MySQL 的 Source 表。
CREATE TABLE orders_cdc (
order_id INT,
user_id INT,
product_id INT,
order_amount DECIMAL(10, 2),
order_status STRING,
create_time TIMESTAMP(3),
update_time TIMESTAMP(3),
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'flinkuser',
'password' = 'Flink@123',
'database-name' = 'flink_test',
'table-name' = 'orders'
);
4.2 数据查询与清洗
一旦表定义完成,这张表就变成了一个“动态表”。你可以像操作静态表一样写 SQL,但结果是实时流动的。
-- 实时统计每个用户的订单总金额
SELECT
user_id,
SUM(order_amount) as total_gmv,
COUNT(order_id) as order_count
FROM orders_cdc
GROUP BY user_id;
当 MySQL 中的 orders 表发生变化时,上述查询的结果会自动更新。这就是 Flink 强大的 Retract Stream(撤回流) 机制在起作用。
5. 核心原理剖析:Flink CDC 2.x 如何做到“无锁”?
这是面试和生产调优的重点。在 Flink CDC 1.x 时代,读取全量历史数据时需要对表加全局锁,导致线上业务不可写,这是一个巨大的痛点。
Flink CDC 2.x 引入了 Netflix DBLog 论文 中的“无锁并行读取”算法(Chunk 切分)。
5.1 Chunk 切分算法
为了避免锁全表,Flink CDC 将一张大表按照主键(Primary Key)切分成多个 Chunk(分片)。
例如,order_id 从 1 到 10000。
- Chunk 1: [1, 2000]
- Chunk 2: [2001, 4000]
- ...
5.2 流程详解
-
Snapshot Phase(快照阶段):
- Flink 启动多个 Reader 并行读取不同的 Chunk。
- 读取单个 Chunk 时,只对该 Chunk 的区间加极其短暂的锁(用于记录 Binlog 位置),随即释放。
- 在读取 Chunk 数据的过程中,如果 MySQL 中有新数据写入(Binlog产生),Flink 会将这部分 Binlog 缓存在内存中。
- 读取完 Chunk 后,将缓存的 Binlog 合并,修正数据,确保该 Chunk 的数据是最终一致的。
-
Binlog Phase(增量阶段):
- 当所有 Chunk 的快照都读取完毕后,Flink 只需要从最早的一个 Binlog 位点开始,统一消费增量日志。
优势总结:
- 支持断点续传:如果快照读了一半挂了,Checkpoint 记录了读完了哪些 Chunk,重启后只需读剩下的。
- 对业务无感:不再需要停机或锁表。
6. 生产环境避坑指南
6.1 Server ID 冲突
现象:程序报错 The server id xxx is already in use。 原因:MySQL Binlog 协议要求每个连接的 Slave 必须有一个唯一的 Server ID。如果 Flink 任务并发度为 4,那么它实际上会模拟 4 个 Slave 连接 MySQL。 解决方案: 从 Flink CDC 2.x 开始,默认会随机生成 Server ID,通常不需要手动干预。但在某些严格的网络环境下,或者使用 1.x 版本时,需要显式指定 ID 范围:
.serverId("5400-5404") // 指定一个范围,供不同并发 Source 使用
6.2 默认时区问题
现象:MySQL 中时间是 2023-11-01 10:00:00,同步到 Flink 后变成了 2023-11-01 02:00:00(相差 8 小时)。 原因:Flink CDC 底层 Debezium 默认使用 UTC 时间解析。 解决方案: 在构建 Source 时指定时区:
.serverTimeZone("Asia/Shanghai")
或者在 SQL DDL 中添加:
'server-time-zone' = 'Asia/Shanghai'
6.3 数据库连接超时
现象:任务运行一段时间后,报错 Communications link failure。 原因:MySQL 会主动断开长时间空闲的连接(wait_timeout 默认为 8 小时),或者全量阶段读取太慢导致 Binlog 读取超时。 解决方案:
- 在 JDBC 参数中添加心跳检测:
autoReconnect=true。 - 增加 MySQL 侧的
net_read_timeout和net_write_timeout。
6.4 全量阶段内存溢出 (OOM)
现象:在读取几亿行的大表时,TaskManager 频繁 GC 甚至 OOM。 原因:Chunk Size 设置过大,或者并行度过高导致大量数据涌入内存。 解决方案:
- 减小 Chunk Size(默认 8096 条):
'scan.incremental.snapshot.chunk.size' = '1000' - 限制 Source 的并发度(Parallelism)。
7. 架构延伸:基于 Flink CDC 的数据湖仓
掌握了 Flink CDC 后,我们可以构建现代化的实时数据仓库。
经典架构路径:
-
ODS 层 (Operational Data Store):
- MySQL -> Flink CDC -> Kafka (存储原始 Binlog JSON)
- 这一步实现了业务库与数仓的解耦。
-
DWD 层 (Data Warehouse Detail):
- Flink 读取 Kafka -> 数据清洗/宽表关联 -> Paimon / Hudi / Iceberg (数据湖格式)
- 实现 ACID 事务支持和流批一体查询。
-
ADS 层 (Application Data Service):
- Flink / StarRocks / ClickHouse -> 实时大屏 / 报表。
这种架构相比传统的 MySQL -> Sqoop -> Hive,将数据可见性从 T+1 提升到了 秒级。
8. 总结
Flink CDC 是实时数据集成领域的“瑞士军刀”。它解决了传统 ETL 的痛点,利用 Flink 强大的流处理能力,实现了全量与增量数据的无缝衔接。
关键点回顾:
- 环境:务必开启 MySQL Binlog (Row + Full)。
- 开发:推荐使用 DataStream API 处理复杂逻辑,Table API (SQL) 处理标准同步。
- 原理:理解“无锁 Chunk 切分”是掌握 Flink CDC 性能调优的钥匙。
- 运维:注意 Checkpoint 设置和 Server ID 唯一性。
随着业务对实时性要求的提高,掌握 Flink CDC 已经成为全栈工程师和大数据开发者的必备技能。