如何使用 Spring Boot 和 Canal 实现 MySQL 数据库同步
作者:归宿乐瑶
前言
在分布式系统中,数据同步是一个常见的需求。例如,我们可能需要将主库的数据实时同步到多个从库,或者将数据从一个数据库集群同步到另一个集群。本篇内容通过一个实际案例,介绍如何使用 Spring Boot 和 Canal 实现 MySQL 数据库之间的数据同步。
一、背景
假设我们有以下数据库架构:
- 两个主库:db_1 和 db_2。
- 每个主库对应两个从库:db_1_bk_1、db_1_bk_2 和 db_2_bk_1、db_2_bk_2。
- 我们的目标是:
- 将 db_1 的数据同步到 db_1_bk_1 和 db_1_bk_2。
- 将 db_2 的数据同步到 db_2_bk_1 和 db_2_bk_2。
二、Canal 简介
Canal 是阿里巴巴开源的一款基于 MySQL Binlog 的增量数据订阅与分发工具。它通过模拟 MySQL 的从节点,实时捕获主库的 Binlog 日志,并将数据变更事件推送给下游消费者。Canal 支持多种下游适配器,如 Kafka、RabbitMQ 和直接消费。
三、主库数据库配置
1.主库配置
为了使 Canal 能够正常解析 Binlog 日志,主库需要进行以下配置:
- 开启 Binlog 日志:确保主库开启了 Binlog 日志,并且设置为 ROW 模式。
- 配置 server-id:为每个主库设置唯一的 server-id。
- 创建 Canal 用户并授予权限:创建一个用户供 Canal 使用,并授予必要的权限。
编辑主库的配置文件(my.cnf 或 my.ini),添加以下内容:
[mysqld] # 开启 Binlog 日志 log-bin=mysql-bin # 设置 Binlog 格式为 ROW 模式 binlog-format=ROW # 设置唯一的 server-id server-id=1
注意:
- 如果你有多个主库,每个主库的 server-id 必须是唯一的。
- 修改配置后,需要重启 MySQL 服务以使配置生效。
2.创建 Canal 用户并授予权限
Canal 需要一个具有读取 Binlog 权限的 MySQL 用户。以下是创建用户并授予权限的步骤:
# 登录 MySQL mysql -u root -p # 创建用户 CREATE USER 'canal'@'%' IDENTIFIED BY 'canal'; # 授予权限 GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal'; # 刷新权限 FLUSH PRIVILEGES;
说明:
- canal 用户需要足够的权限来读取 Binlog 数据,但不需要对数据库进行写操作。
- 如果你的 MySQL 版本较新(8.x),可能需要使用 ALTER USER 命令来设置密码:
ALTER USER 'canal'@'%' IDENTIFIED BY 'canal';
四.配置 Canal Server
Canal Server 是 Canal 的核心组件,负责连接主库并解析 Binlog 数据。我们需要为每个主库配置一个 Canal 实例。
1.Canal Server 配置文件
在 Canal Server 的配置目录下,创建两个实例配置文件:conf/db_1/instance.properties 和 conf/db_2/instance.properties。conf/db_1/instance.properties:
# 主库的地址和端口 canal.instance.master.address=db_1_ip:3306 # Canal 连接主库的用户名和密码 canal.instance.dbUsername=canal canal.instance.dbPassword=canal # 需要同步的表正则表达式,这里表示同步 db_1 数据库的所有表 canal.instance.filter.regex=db_1\\..*
conf/db_2/instance.properties:
# 主库的地址和端口 canal.instance.master.address=db_2_ip:3306 # Canal 连接主库的用户名和密码 canal.instance.dbUsername=canal canal.instance.dbPassword=canal # 需要同步的表正则表达式,这里表示同步 db_2 数据库的所有表 canal.instance.filter.regex=db_2\\..*
2.启动 Canal Server
使用以下命令启动 Canal Server:
nohup sh bin/canal.sh start &
注意:
- 确保主库的 Binlog 位置和文件名正确。如果不确定,可以通过 SHOW MASTER STATUS; 命令查看。
- 如果主库已经运行了一段时间,需要指定 Binlog 的起始位置,避免重复同步旧数据。
五.开发 Spring Boot 客户端
Spring Boot 客户端作为 Canal 的消息消费者,负责接收数据变更事件并同步到目标从库。
1. 引入依赖
在 Spring Boot 项目的 pom.xml
文件中,引入 Canal 客户端依赖:
<dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.8</version> </dependency>
2. 配置 Canal 客户端
在 application.yml
文件中,配置 Canal Server 的地址:
canal: server.ip: canal_server_ip server.port: 11111
3. 实现数据同步逻辑
创建一个 Canal 客户端服务类,用于接收和处理数据变更事件。CanalClientService.java:
@Service public class CanalClientService { private final CanalConnector canalConnector; public CanalClientService(@Value("${canal.server.ip}") String canalServerIp, @Value("${canal.server.port}") int canalServerPort) { this.canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalServerIp, canalServerPort), "example", "", ""); } @PostConstruct public void start() { canalConnector.connect(); canalConnector.subscribe("db_1..*, db_2..*"); // 订阅 db_1 和 db_2 的所有表 new Thread(this::process).start(); } private void process() { while (true) { Message message = canalConnector.getWithoutAck(100); long batchId = message.getId(); if (batchId == -1 || message.getEntries().isEmpty()) { continue; } for (Entry entry : message.getEntries()) { handleData(entry); } canalConnector.ack(batchId); } } private void handleData(Entry entry) { String schemaName = entry.getHeader().getSchemaName(); // 数据库名 String tableName = entry.getHeader().getTableName(); // 表名 EventType eventType = entry.getHeader().getEventType(); // 数据变更类型 System.out.println("Schema: " + schemaName + ", Table: " + tableName + ", Type: " + eventType); // 根据来源数据库同步到对应的从库 if ("db_1".equals(schemaName)) { syncToBackupDbs(entry, "db_1_bk_1", "db_1_bk_2"); } else if ("db_2".equals(schemaName)) { syncToBackupDbs(entry, "db_2_bk_1", "db_2_bk_2"); } } private void syncToBackupDbs(Entry entry, String... backupDbs) { // 根据事件类型同步到从库 if (entry.getHeader().getEventType() == EventType.INSERT) { for (String db : backupDbs) { syncInsert(entry, db); } } else if (entry.getHeader().getEventType() == EventType.UPDATE) { for (String db : backupDbs) { syncUpdate(entry, db); } } else if (entry.getHeader().getEventType() == EventType.DELETE) { for (String db : backupDbs) { syncDelete(entry, db); } } } private void syncInsert(Entry entry, String backupDb) { // 使用 MyBatis 将数据插入到对应的从库 System.out.println("INSERT into " + backupDb); } private void syncUpdate(Entry entry, String backupDb) { // 使用 MyBatis 将数据更新到对应的从库 System.out.println("UPDATE into " + backupDb); } private void syncDelete(Entry entry, String backupDb) { // 使用 MyBatis 将数据从对应的从库删除 System.out.println("DELETE from " + backupDb); } }
六.启动并测试
- 启动 Canal Server。
- 启动 Spring Boot 应用。
- 在主库 db_1 或 db_2 中插入、更新或删除数据。
- 观察从库 db_1_bk_1、db_1_bk_2、db_2_bk_1 和 db_2_bk_2 是否同步成功。
七.注意事项
- 数据一致性:确保从库的数据与主库保持一致。可以通过事务或锁机制来避免冲突。
- 性能优化:如果数据量较大,建议结合中间件(如 Kafka)进行缓冲和负载均衡。
- 错误处理:在同步过程中,需要处理网络异常、数据库连接异常等情况。
- Canal Server 高可用:在生产环境中,建议部署 Canal Server 的集群,以提高系统的可用性。
八.总结
通过 Spring Boot 和 Canal,我们可以实现 MySQL 数据库之间的高效数据同步。Canal 提供了强大的 Binlog 解析能力,而 Spring Boot 则提供了灵活的开发框架,两者结合可以轻松应对复杂的分布式数据同步需求。希望本文对你有所帮助,如果有任何问题,欢迎在评论区留言。
到此这篇关于使用 Spring Boot 和 Canal 实现 MySQL 数据库同步的文章就介绍到这了,更多相关Spring Boot MySQL 数据库同步内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!