Flink CDC 教程:MySQL 实时数据同步实战指南

1. 为什么我们需要 Flink CDC?

在深入技术细节之前,我们需要先解决一个根本问题:为什么传统的数据同步方式已经不再适用?

1.1 传统 ETL 的痛点

在过去,如果我们需要将 MySQL 中的业务数据同步到数据仓库(如 Hive)或搜索引擎(如 Elasticsearch)中,通常会使用 ETL 工具(如 Sqoop、Kettle)或编写定时脚本。

这种方式被称为 Batch(批处理),它像“邮政信件”一样:

  1. 高延迟:通常是 T+1(隔天)或小时级更新,无法满足实时大屏或风控需求。
  2. 性能压力:通常依赖 SELECT * FROM table WHERE update_time > last_time 这种轮询方式。当数据量大时,频繁的查询会把生产数据库“拖垮”。
  3. 删除数据无法捕获:物理删除的数据在数据库中消失了,轮询查询无法感知“消失”,导致下游数据不一致。

1.2 什么是 CDC?

CDC (Change Data Capture),即数据变更捕获

它不像传统方式那样去“询问”数据库现在的状态,而是监听数据库的 “日志”(对于 MySQL 来说就是 Binlog)。

  • 比喻:传统方式是你每隔一小时打电话问朋友:“你现在在干嘛?”(轮询);CDC 是朋友发朋友圈,你立刻就收到了通知(监听日志)。

1.3 Flink CDC 的核心优势

Flink CDC 是基于 Apache Flink 的一组 Source 组件。相比于其他 CDC 方案(如 Canal、Debezium 独立部署),Flink CDC 具有颠覆性的优势:

  • 全增量一体化:自动先读取历史全量数据,完成后无缝切换到增量 Binlog 读取,用户无感知。
  • 无锁读取(Lock-free):在读取全量历史数据时,不需要对 MySQL 表加锁,不影响线上业务(这是 Flink CDC 2.0+ 的核心突破)。
  • 分布式架构:支持海量数据并发读取,利用 Flink 的并行计算能力。
  • Exactly-Once 保障:配合 Flink 的 Checkpoint 机制,确保数据不丢不重。

2. 环境准备与原理基石

2.1 开启 MySQL Binlog

Flink CDC 的底层依赖于 MySQL 的 Binlog(二进制日志)。必须确保 MySQL 开启了 Binlog,且格式为 ROW 模式。

登录 MySQL 执行以下 SQL 检查配置:

SHOW VARIABLES LIKE 'log_bin'; -- 必须为 ON
SHOW VARIABLES LIKE 'binlog_format'; -- 必须为 ROW
SHOW VARIABLES LIKE 'binlog_row_image'; -- 必须为 FULL

如果未开启,需要修改 MySQL 配置文件(my.cnfmy.ini)并重启数据库:

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
binlog_row_image=FULL # 镜像模式为 FULL
server-id=1 # 设置 MySQL 的 ID,必须唯一

2.2 创建测试数据

为了演示,我们创建一个简单的电商订单表。

CREATE DATABASE IF NOT EXISTS flink_test;
USE flink_test;

CREATE TABLE orders (
    order_id INT AUTO_INCREMENT PRIMARY KEY,
    user_id INT NOT NULL,
    product_id INT NOT NULL,
    order_amount DECIMAL(10, 2),
    order_status VARCHAR(20),
    create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    update_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- 插入几条模拟历史数据
INSERT INTO orders (user_id, product_id, order_amount, order_status) VALUES 
(101, 2001, 99.50, 'PAID'),
(102, 2002, 199.00, 'PENDING'),
(103, 2003, 50.00, 'SHIPPED');

2.3 权限配置

Flink CDC 连接 MySQL 需要一个具备相关权限的账号。除了基础的 SELECT 权限外,还需要复制权限。

-- 创建用户
CREATE USER 'flinkuser'@'%' IDENTIFIED BY 'Flink@123';

-- 授权
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flinkuser'@'%';

-- 刷新权限
FLUSH PRIVILEGES;
  • REPLICATION SLAVE: 允许读取 Binlog。
  • REPLICATION CLIENT: 允许获取 Binlog 状态。
  • RELOAD: 需要执行 FLUSH TABLES 等操作(虽然无锁模式下不需要锁表,但某些元数据读取仍需此权限)。

3. 实战模式一:DataStream API(代码开发)

这是最灵活的方式,适合需要对数据进行复杂清洗、转换或侧流输出的场景。

3.1 引入依赖

在 Maven 项目的 pom.xml 中引入 Flink 核心包和 MySQL CDC 连接器。

注意版本兼容性:以下以 Flink 1.17 和 Flink CDC 2.4.0 为例(这是非常稳定的生产组合)。

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>1.17.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>1.17.1</version>
    </dependency>

    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>2.4.0</version>
    </dependency>
    
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.83</version>
    </dependency>
</dependencies>

3.2 编写 Java 代码

我们将创建一个 Flink 作业,监听 orders 表的变更,并将结果打印到控制台。

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkCDCRealtimeApp {

    public static void main(String[] args) throws Exception {
        // 1. 获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // 开启 Checkpoint (生产环境必须开启,用于故障恢复和 Exactly-Once)
        env.enableCheckpointing(5000); // 每 5 秒一次 Checkpoint
        env.setParallelism(1); // 演示环境设为 1,生产环境根据数据量调整

        // 2. 构建 MySQL Source
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("localhost")
                .port(3306)
                .username("flinkuser")
                .password("Flink@123")
                .databaseList("flink_test") // 监控的数据库
                .tableList("flink_test.orders") // 监控的表,必须带库名前缀
                
                /**
                 * 启动模式配置 (StartupOptions):
                 * - initial(): 第一次启动读取全量,然后读增量(最常用)
                 * - latest(): 只读启动后的增量数据
                 * - timestamp(long): 指定时间戳开始
                 */
                .startupOptions(StartupOptions.initial()) 
                .deserializer(new JsonDebeziumDeserializationSchema()) // 反序列化为 JSON 字符串
                .build();

        // 3. 从 Source 创建数据流
        DataStreamSource<String> streamSource = env.fromSource(
                mySqlSource,
                WatermarkStrategy.noWatermarks(),
                "MySQL CDC Source"
        );

        // 4. 处理数据 (这里简单打印)
        streamSource.print("Data Change >>> ");

        // 5. 启动任务
        env.execute("Flink CDC MySQL Job");
    }
}

3.3 运行与观察

  1. 启动 Java 程序。
  2. 控制台首先会输出表中已有的 3 条历史数据(Op: r 代表 read/snapshot)。
  3. 在 MySQL 中执行插入操作:
    INSERT INTO orders (user_id, product_id, order_amount, order_status) VALUES (104, 2004, 300.00, 'PAID');
    
  4. 控制台会立即捕获到一条 Op: c (create) 的 JSON 数据。
  5. 在 MySQL 中更新操作:
    UPDATE orders SET order_status = 'COMPLETED' WHERE user_id = 101;
    
  6. 控制台会捕获到更新前(before)和更新后(after)的数据。

4. 实战模式二:Flink SQL(生产力工具)

对于不熟悉 Java 的数据分析师或工程师,Flink SQL 提供了更为便捷的声明式开发方式。通常配合 Flink SQL Client 使用。

4.1 定义 CDC 表

在 Flink SQL Client 中,我们需要通过 DDL 定义一个连接到 MySQL 的 Source 表。

CREATE TABLE orders_cdc (
    order_id INT,
    user_id INT,
    product_id INT,
    order_amount DECIMAL(10, 2),
    order_status STRING,
    create_time TIMESTAMP(3),
    update_time TIMESTAMP(3),
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'flinkuser',
    'password' = 'Flink@123',
    'database-name' = 'flink_test',
    'table-name' = 'orders'
);

4.2 数据查询与清洗

一旦表定义完成,这张表就变成了一个“动态表”。你可以像操作静态表一样写 SQL,但结果是实时流动的。

-- 实时统计每个用户的订单总金额
SELECT 
    user_id, 
    SUM(order_amount) as total_gmv,
    COUNT(order_id) as order_count
FROM orders_cdc
GROUP BY user_id;

当 MySQL 中的 orders 表发生变化时,上述查询的结果会自动更新。这就是 Flink 强大的 Retract Stream(撤回流) 机制在起作用。


5. 核心原理剖析:Flink CDC 2.x 如何做到“无锁”?

这是面试和生产调优的重点。在 Flink CDC 1.x 时代,读取全量历史数据时需要对表加全局锁,导致线上业务不可写,这是一个巨大的痛点。

Flink CDC 2.x 引入了 Netflix DBLog 论文 中的“无锁并行读取”算法(Chunk 切分)。

5.1 Chunk 切分算法

为了避免锁全表,Flink CDC 将一张大表按照主键(Primary Key)切分成多个 Chunk(分片)

例如,order_id 从 1 到 10000。

  • Chunk 1: [1, 2000]
  • Chunk 2: [2001, 4000]
  • ...

5.2 流程详解

  1. Snapshot Phase(快照阶段)

    • Flink 启动多个 Reader 并行读取不同的 Chunk。
    • 读取单个 Chunk 时,只对该 Chunk 的区间加极其短暂的锁(用于记录 Binlog 位置),随即释放。
    • 在读取 Chunk 数据的过程中,如果 MySQL 中有新数据写入(Binlog产生),Flink 会将这部分 Binlog 缓存在内存中。
    • 读取完 Chunk 后,将缓存的 Binlog 合并,修正数据,确保该 Chunk 的数据是最终一致的。
  2. Binlog Phase(增量阶段)

    • 当所有 Chunk 的快照都读取完毕后,Flink 只需要从最早的一个 Binlog 位点开始,统一消费增量日志。

优势总结

  • 支持断点续传:如果快照读了一半挂了,Checkpoint 记录了读完了哪些 Chunk,重启后只需读剩下的。
  • 对业务无感:不再需要停机或锁表。

6. 生产环境避坑指南

6.1 Server ID 冲突

现象:程序报错 The server id xxx is already in use原因:MySQL Binlog 协议要求每个连接的 Slave 必须有一个唯一的 Server ID。如果 Flink 任务并发度为 4,那么它实际上会模拟 4 个 Slave 连接 MySQL。 解决方案: 从 Flink CDC 2.x 开始,默认会随机生成 Server ID,通常不需要手动干预。但在某些严格的网络环境下,或者使用 1.x 版本时,需要显式指定 ID 范围:

.serverId("5400-5404") // 指定一个范围,供不同并发 Source 使用

6.2 默认时区问题

现象:MySQL 中时间是 2023-11-01 10:00:00,同步到 Flink 后变成了 2023-11-01 02:00:00(相差 8 小时)。 原因:Flink CDC 底层 Debezium 默认使用 UTC 时间解析。 解决方案: 在构建 Source 时指定时区:

.serverTimeZone("Asia/Shanghai") 

或者在 SQL DDL 中添加:

'server-time-zone' = 'Asia/Shanghai'

6.3 数据库连接超时

现象:任务运行一段时间后,报错 Communications link failure原因:MySQL 会主动断开长时间空闲的连接(wait_timeout 默认为 8 小时),或者全量阶段读取太慢导致 Binlog 读取超时。 解决方案

  • 在 JDBC 参数中添加心跳检测:autoReconnect=true
  • 增加 MySQL 侧的 net_read_timeoutnet_write_timeout

6.4 全量阶段内存溢出 (OOM)

现象:在读取几亿行的大表时,TaskManager 频繁 GC 甚至 OOM。 原因:Chunk Size 设置过大,或者并行度过高导致大量数据涌入内存。 解决方案

  • 减小 Chunk Size(默认 8096 条):
    'scan.incremental.snapshot.chunk.size' = '1000'
    
  • 限制 Source 的并发度(Parallelism)。

7. 架构延伸:基于 Flink CDC 的数据湖仓

掌握了 Flink CDC 后,我们可以构建现代化的实时数据仓库。

经典架构路径

  1. ODS 层 (Operational Data Store)

    • MySQL -> Flink CDC -> Kafka (存储原始 Binlog JSON)
    • 这一步实现了业务库与数仓的解耦。
  2. DWD 层 (Data Warehouse Detail)

    • Flink 读取 Kafka -> 数据清洗/宽表关联 -> Paimon / Hudi / Iceberg (数据湖格式)
    • 实现 ACID 事务支持和流批一体查询。
  3. ADS 层 (Application Data Service)

    • Flink / StarRocks / ClickHouse -> 实时大屏 / 报表。

这种架构相比传统的 MySQL -> Sqoop -> Hive,将数据可见性从 T+1 提升到了 秒级


8. 总结

Flink CDC 是实时数据集成领域的“瑞士军刀”。它解决了传统 ETL 的痛点,利用 Flink 强大的流处理能力,实现了全量与增量数据的无缝衔接。

关键点回顾

  1. 环境:务必开启 MySQL Binlog (Row + Full)。
  2. 开发:推荐使用 DataStream API 处理复杂逻辑,Table API (SQL) 处理标准同步。
  3. 原理:理解“无锁 Chunk 切分”是掌握 Flink CDC 性能调优的钥匙。
  4. 运维:注意 Checkpoint 设置和 Server ID 唯一性。

随着业务对实时性要求的提高,掌握 Flink CDC 已经成为全栈工程师和大数据开发者的必备技能。

正文到此结束
评论插件初始化中...
Loading...