一种实时数据库
数据同步方案及实现
神州信息
董志
1.
概述
变化数据捕获简称CDC(Change Data Capture),可以识别提取从上次提取之后发生变化的数据,在广义的概念上,只要能捕获数据变更的技术,我们都可以称为CDC。通常我们说的 CDC技术主要面向数据库的变更,是一种用于捕获数据库中数据变更的技术。CDC的两种模式:
(1)同步:同步CDC主要是采用触发器记录新增数据,基本能够做到实时增量提取。
(2)异步:异步CDC通过分析已经提交的日志记录来得到增量数据信息,有一定的延时,是本文采用的模式。
1.1. 应用场景
(1)数据同步,用于备份、容灾。
(2)数据分发,一个数据源分发给多个下游。
(3)数据采集,面向数据仓库/数据湖的ETL数据集成。
1.2. 主流的实现机制
(1)基于查询的CDC
a)离线调度查询作业,批处理。
b)无法保障数据一致性。
c)不保障实时性。
(2)基于日志的CDC
a)实时消费日志,流处理。
b)保障数据一致性。
c)提供实时数据。
2.
方案对比
主流开源CDC方案对比如下图所示,主要通过监控各数据库的事务日志达到监控数据变化的目的,根据对比采用Flink CDC 方案。
图:多种CDC技术对比
(1) DataX 不支持增量同步,Canal 不支持全量同步。虽然两者都是非常流行的数据同步工具,但在场景支持上仍不完善。
(2) 在全量+增量一体化同步方面,只有Flink CDC、Debezium、Oracle Goldengate 支持较好。
(3) 在架构方面,Apache Flink 是一个非常优秀的分布式流处理框架,因此Flink CDC 作为Apache Flink 的一个组件具有非常灵活的水平扩展能力。而DataX 和Canal 是个单机架构,在大数据场景下容易面临性能瓶颈的问题。
(4) 在数据加工的能力上,CDC 工具是否能够方便地对数据做一些清洗、过滤、聚合,甚至关联操作?Flink CDC 依托强大的Flink SQL 流式计算能力,可以非常方便地对数据进行加工。而 Debezium 等则需要通过复杂的 Java 代码才能完成,使用门槛比较高。
(5) 另外,在生态方面,这里指的是上下游存储的支持。Flink CDC 上下游非常丰富,支持对接MySQL、PostgreSQL 等数据源,还支持写入到TiDB、HBase、Kafka、Hudi 等各种存储系统中,也支持灵活的自定义connector。
因此,不论从性能还是适用范围上,Flink CDC 都可以作为最佳选择。Flink CDC Connectors是Apache Flink的一组source连接器,使用变更数据捕获 (CDC) 从不同的数据库中获取变更,Flink CDC连接器集成了Debezium作为引擎来捕获数据变化,所以它可以充分发挥Debezium的能力。目前连接器支持的数据库有:MySQL(5.6+)、PostgreSQL(9.6+)、MongoDB(3.6+)、Oracle(11+)、TiDB(5.1.x+)、SQL Server(2012+)和Oceanbase(3.1.x+)。
3.
数据库事务日志
目前支持的关系型数据库包括:MySQL、Oracle、PostgreSQL、SQL Server,主要采用基于WAL日志方式进行数据变化监听。下面介绍各关系型数据库的日志类型:
1. MySQL
(1)Error log错误日志记录了MySQL Server运行过程中所有较为严重的警告和错误信息,以及MySQL Server每次启动和关闭的详细信息。
(2)Binary log二进制日志,记录着数据库发生的各种事务信息。
(3)Update log更新日志是MySQL在较老版本上使用的,其功能跟Bin log类似,只不过不是以二进制格式记录,而是以简单文本格式记录内容。
(4)Query log查询日志记录MySQL中所有的query。
(5)Slow query log慢查询日志记录的就是执行时间较长的query。
(6)InnoDB redo log,InnoDB是一个事务安全的存储引擎,其事务安全性主要就是通过在线redo日志和记录在表空间中的undo信息来保证的。
2. Oracle
(1)系统报警日志alert.log。
(2)跟踪日志(用户和进程) trace.log。
(3)重做日志。
a. 在线重做日志:又称联机重做日志,指Oracle以SQL脚本的形式实时记录数据库的数据更新,换句话说,实时保存已执行的SQL脚本到在线日志文件中(按特定的格式)。
b. 归档重做日志:指当条件满足时,Oracle将在线重做日志以文件形式保存到硬盘(持久化)。
3. PostgreSQL
(1)pg_log文件夹中的日志一般用来记录服务器与DB的状态,如各种Error信息,定位慢查询SQL,数据库的启动关闭信息,发生checkpoint过于频繁等的告警信息等。
(2)pg_xlog文件夹中的日志是记录的PostgreSQL的WAL信息,也就是一些事务日志信息(transaction log),记录着数据库发生的各种事务信息。
(3)pg_clog文件夹存储的也是事务日志文件,但与pg_xlog不同的是它记录的是事务的元数据(metadata),这个日志告诉我们哪些事务完成了,哪些没有完成。
4. SQL Server
(1)交易日志(Transaction logs),是针对数据库改变所做的记录,它可以记录针对数据库的任何操作,并将记录结果保存在独立的文件中。对于任何每一个交易过程,交易日志都有非常全面的记录,根据这些记录可以将数据文件恢复成交易前的状态。
4.
功能实现
1. 整体架构
整体架构如下图所示,首先各源端数据库需要开启相应的事务日志,Flink CDC 任务会监听各数据库的事务变化日志,然后对日志数据进行处理,最后将数据进行传输:
(1)通过订阅发布方式将消息发送到Redis 的Channel 中,通知消费者数据库中的数据发生了变化。
(2)以流的方式存储到Kafka 的 Topic中,供下游程序进行消费。
(3)抽取到其他关系型数据库中,实现 ETL 功能。
图:整体架构图
2. 数据格式
由于 Flink CDC 内部集成了 Debezium 组件,通过 Debezium 进行数据采集,所以数据格式同 Debezium,监听到的数据格式如下图所示,after 代表变化后的数据;source 代表源端的数据库相关信息,包括 Debezium 版本号、连接器类型、数据库名、表名等;op 代表操作的类型,此处为读操作。
图:数据格式
3. 事务日志开启
(1) MySQL 开启Bin Log 日志
在 my.cnf 里面加上如下配置,重启服务。
查看是否开启Bin Log日志 show variables like 'log_%';
(2) Oracle 开启归档日志
启用归档日志:
检查归档日志是否启用:
启动补充日志记录:
4. 具体代码实现
DataStream方式监听 MySQL 数据库实现:
DataStream方式监听 Oracle 数据库实现:
Flink SQL方式监听 MySQL 数据库实现:
自定义反序列化器:
自定义Redis Sink: