java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Java SpringBoot整合Canal实现数据同步

Java SpringBoot整合Canal实现数据同步方式

作者:Monly21

本文介绍了如何开启和配置Canal,以及如何在Spring Boot中集成Canal,Canal是一种基于MySQL的数据库变更解析工具,可以将数据库的变更事件发送到Kafka、RocketMQ等消息队列中,用于数据分析和挖掘

一、开启数据库服务

1.1、检查Binlog日志是否开启

show variables like 'log_bin'

注意:如果Value=OFF,则需要开启Binlog日志,如果为ON时,则已开启。

1.2、开启Binlog日志

修改mysql的配置文件my.ini

注意:如果此路径下没有my.ini文件,则去C:\ProgramData\MySQL\MySQL Server 8.0路径下查找

server-id=1
# log-bin
log-bin = mysql-bin
# log-bin = C:\ProgramData\MySQL\MySQL Server 5.7\Data\log\bin_log
binlog_format = ROW
# binlog-do-db = canal-demo

重启MySQL服务:

1.3、常用Binlog日志查询

-- 查询Binlog是否开启
SHOW VARIABLES LIKE 'log_bin';
-- 查看Binlog日志文件列表
SHOW BINARY LOGS;
-- 查看当前正在写入的binlog文件
SHOW MASTER STATUS;

二、配置Canal

2.1 修改Canal配置文件:canal.properties

文件路径:canal.deployer-1.1.7\conf\canal.properties

canal.port = 11111

# tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ
canal.serverMode = tcp

canal.destinations = example

具体配置规则:

conf/目录下,使用文件夹放置,文件夹名代表一个MySQL实例。

canal.destinations用于配置需要监控数据的数据库。如果是多个用逗号隔开(“,”),例如:canal.destinations = example, example1, example2

2.2 修改MySQL实例配置文件:instance.properties

文件路径:canal.deployer-1.1.7\conf\example\instance.properties

注意:因为MySQL实例配置文件可以有很多个,所以具体情况要看canal配置文件中的配置

# canal.instance.mysql.slaveId=0

canal.instance.master.address=127.0.0.1:3306

canal.instance.dbUsername=root
canal.instance.dbPassword=root

2.3 启动canal

文件路径:canal.deployer-1.1.7\bin

三、SpringBoot集成Canal

3.1 加载POM

<dependency>
    <groupId>top.javatool</groupId>
    <artifactId>canal-spring-boot-starter</artifactId>
    <version>1.2.1-RELEASE</version>
</dependency>

3.2 修改配置文件

在application.yml文件中增加以下配置

canal:
  server: 127.0.0.1:11111 #canal 默认端口11111
  destination: example
logging:  # 设置日志级别,否则会一致打印监听
  level:
    root: info
    top:
      javatool:
        canal:
          client:
            client:
              AbstractCanalClient: error

3.3 创建实例对象

package com.ming.domain;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class User implements Serializable {
    private Long id;
    private String name;
    private String phone;
}

3.4 编写拦截器

package com.ming.handler;

import com.ming.domain.User;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import top.javatool.canal.client.handler.EntryHandler;
import top.javatool.canal.client.annotation.CanalTable;

@Slf4j
@CanalTable("user")  // 数据库中的表名
@Component
public class UserCanalHandler implements EntryHandler<User> {
    @Override
    public void insert(User user) {
        log.info("insert message  {}", user);
    }

    @Override
    public void update(User before, User after) {
        log.info("update before {} ", before);
        log.info("update after {}", after);
    }

    @Override
    public void delete(User user) {
        log.info("delete  {}", user);
    }
}

四、Canal详解

4.1、Binlog的分类

分类介绍优点缺点
STATEMENT语句级别,保存的是对应的SQL语句,可能造成数据的不一致性,比如更新时间等节省空间可能造成数据的不一致性
ROW行级,记录变化的数据数据能够保存绝对一致占用空间大
MIXED兼顾STATEMENT和ROW的优点,很好,基本不用节省空间,兼顾数据一致性在智能转换的过程中,可能会出现意想不到的BUG

综上所述:如果想要对MySQL做数据分析的话,建议使用ROW模式

4.2、Canal的执行原理

  1. Canal将自己伪装成MySQL slave(MySQL的从库),向MySQL master(MySQL的主库)发送dump协议。
  2. MySQL master(MySQL的主库)收到dump协议,开始推送binary log给slave(即:canal)。
  3. Canal接收并解析Binlog日志,得到变更的数据,执行后续逻辑。

4.3、Canal的运用场景

  1. 数据库同步:同步数据到Redis等存储介质。
  2. 数据库实时监控:监控MySQL的更新操作,对于敏感信息可以及时通知相关人员。
  3. 数据分析和挖掘:将增量数据投递给Kafka等消息队列中,为数据分析和挖掘提供数据源。
  4. 数据库备份:将MySQL主库上数据增量日志复制到备库上,实现数据库备份。
  5. 数据集成:集成多个MySQL数据,为数据处理提供更加有效的解决方案。
  6. 数据库迁移:协助MySQL数据库的版本升级及数据迁移任务。

4.4、面试题

4.4.1 Canal是什么?有哪些特性

Canal是阿里巴巴开源的一款基于Netty实现的分布式、高性能、可靠的消息队列。在实现数据同步和数据分发场景下有着广泛的应用。

特性:

4.4.2 Canal的工作原理

4.4.3 Canal的优缺点

4.4.4 Canal应用场景

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

您可能感兴趣的文章:
阅读全文