Mysql

关注公众号 jb51net

关闭
首页 > 数据库 > Mysql > MySQL Binlog 日志监听

MySQL Binlog 日志监听与 Spring 集成实战场景

作者:IT枫斗者

MySQL 的二进制日志(binlog)有三种常见的格式:Statement 模式、Row 模式和Mixed 模式,这篇文章主要介绍了MySQL Binlog 日志监听与 Spring 集成实战,需要的朋友可以参考下

MySQL Binlog 日志监听与 Spring 集成实战

binlog的三种模式

MySQL 的二进制日志(binlog)有三种常见的格式:Statement 模式Row 模式Mixed 模式。每种模式的设计目标不同,适用于不同的场景,以下是它们的详细对比和应用:

1. Statement 模式

Statement 模式下,MySQL 记录的是每个执行的 SQL 语句,而不是具体的数据变化。例如,执行一个 UPDATE 语句时,binlog 中记录的是该 SQL 语句,而不是更新后的数据。

优点

缺点

2. Row 模式

Row 模式下,MySQL 记录每一行数据的变化。如果执行 UPDATE 语句,binlog 记录的是被更新的行的具体数据,而非 SQL 语句。

优点

缺点

3. Mixed 模式

Mixed 模式结合了 Statement 模式Row 模式,根据具体 SQL 的类型动态选择记录方式。对于简单的 SQL 语句(如 INSERT),MySQL 使用 Statement 模式;对于复杂的操作或涉及多行数据的 SQL 语句,则采用 Row 模式

优点

缺点

可以通过修改 MySQL 配置文件来设置 binlog_format 参数,具体操作如下:

[mysqld]
binlog_format=mixed

其中,statementrowmixed 分别代表 Statement 模式、Row 模式和 Mixed 模式。选择适当的 binlog 模式取决于应用的特定需求和性能要求。不同的模式具有不同的优劣势,例如,Statement 模式可能会更轻量,而 Row 模式可能提供更详细的数据变化信息。

以Mixed 为例

查看 Binlog 是否开启

你可以通过以下 SQL 查询来检查 binlog 是否开启:

show variables like '%log_bin%'

启动springboot程序

新建数据库

这个事件是一个 binlog 事件,其内容表示一个 SQL 查询事件。让我解释一下这个事件的各个部分:

这个事件的作用是在 test2023 数据库中执行了一个创建数据库的 SQL 查询。这是 binlog 中的一部分,用于记录数据库中的变化,以便进行数据备份、主从同步等操作。

新建表数据

CREATE TABLE `t_user` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `userName` varchar(100) NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=10 DEFAULT CHARSET=utf8mb4;

这个事件也是一个 binlog 事件,表示一个 SQL 查询事件。让我解释一下这个事件的各个部分:

这个事件的作用是在 test2023 数据库中创建了一个名为 t_user 的表,该表包含 iduserName 两个字段,其中 id 是自增的主键。这种类型的事件常常用于记录数据库结构的变化,以便进行数据备份、迁移和版本控制等操作。

插入表数据

INSERT INTO `test2023`.`t_user` (`id`, `userName`)
VALUES
    (
        "10086",
        "用心记录技术,走心分享,始于后端,不止于后端,励志成为一名优秀的全栈架构师,真正的实现码中致富。"
    );

这个事件也是一个 binlog 事件,表示一个 SQL 查询事件,具体如下:

这个事件的作用是向 t_user 表中插入了一行数据,包含了 iduserName 两个字段的值。这种类型的事件通常用于记录数据的变化,以便进行数据备份、同步和迁移等操作。

修改表数据修改表数据

修改表数据

UPDATE `test2023`.`t_user`
SET `id` = '10086',
 `userName` = '我的修改数据!!!'
WHERE
	(`id` = '10086');

这个事件同样是一个 binlog 事件,表示一个 SQL 查询事件,具体如下:

这个事件的作用是将 t_user 表中 id10086 的行的数据进行更新,将 id 修改为 10086userName 修改为 ‘我的修改数据!!!’。这种类型的事件通常用于记录数据的变化,以便进行数据备份、同步和迁移等操作。

删除表数据

DELETE
FROM
	t_user
WHERE
	id = '10086';

这个事件同样是一个 binlog 事件,表示一个 SQL 查询事件,具体如下:

这个事件的作用是删除 t_user 表中 id10086 的行。这种类型的事件通常用于记录数据的删除操作,以便进行数据备份、同步和迁移等操作。

总结: binlog_format 设置为 mixed 时,对于 INSERT、UPDATE 和 DELETE 操作,它们在 binlog 中的事件类型都会被表示为 QUERY 事件。这是因为在 mixed 模式下,MySQL 使用了不同的方式来记录不同类型的操作,但在 binlog 中,它们都被包装成了 QUERY 事件。

在 mixed 模式下:

这就是为什么看到的 INSERT、UPDATE 和 DELETE 操作的事件类型都是 QUERY。在处理这些事件时,需要根据具体的 SQL 查询语句或其他信息来确定操作的类型。

源码示例

pom.xml

<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-data-jpa</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>mysql</groupId>
			<artifactId>mysql-connector-java</artifactId>
			<version>5.1.48</version> <!-- 查看最新版本 -->
		</dependency>
		<dependency>
			<groupId>com.github.shyiko</groupId>
			<artifactId>mysql-binlog-connector-java</artifactId>
			<version>0.21.0</version>
		</dependency>

Java示例

package com.example.demo.listener;
import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.EventData;
import com.github.shyiko.mysql.binlog.event.QueryEventData;
import com.github.shyiko.mysql.binlog.event.TableMapEventData;
import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.naming.AuthenticationException;
import java.io.IOException;
import java.io.Serializable;
import java.util.List;
import java.util.concurrent.TimeoutException;
public class BinlogListenerMixed {
    private static final Logger logger = LoggerFactory.getLogger(BinlogListenerMixed.class);
    private static final String MYSQL_HOST = "8.130.74.105";
    private static final int MYSQL_PORT = 3306;
    private static final String MYSQL_USERNAME = "root";
    private static final String MYSQL_PASSWORD = "zhang.ting.123";
    public static void main(String[] args) {
        try {
            BinaryLogClient client = new BinaryLogClient(MYSQL_HOST, MYSQL_PORT, MYSQL_USERNAME, MYSQL_PASSWORD);
//            client.setBinlogFilename(null);
//            client.setBinlogPosition(-1); // 或者设置为其他适当的初始位置
//            client.setServerId(1);
//            client.setBinlogFilename("mysql-bin.000005");
//            client.setBinlogPosition(154);
            EventDeserializer eventDeserializer = new EventDeserializer();
            eventDeserializer.setCompatibilityMode(
                    EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,
                    EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY
            );
            logger.info("使用主机={}, 端口={}, 用户名={}, 密码={} 连接到 MySQL", MYSQL_HOST, MYSQL_PORT, MYSQL_USERNAME, MYSQL_PASSWORD);
            client.setEventDeserializer(eventDeserializer);
            client.registerEventListener(BinlogListenerMixed::handleEvent);
            client.registerLifecycleListener(new BinaryLogClient.LifecycleListener() {
                @Override
                public void onConnect(BinaryLogClient client) {
                    logger.info("Connected to MySQL server");
                }
                @Override
                public void onCommunicationFailure(BinaryLogClient client, Exception ex) {
                    logger.error("Communication failure with MySQL server", ex);
                }
                @Override
                public void onEventDeserializationFailure(BinaryLogClient client, Exception ex) {
                    logger.error("Event deserialization failure", ex);
                }
                @Override
                public void onDisconnect(BinaryLogClient client) {
                    logger.warn("Disconnected from MySQL server");
                    // 在这里添加重新连接或其他处理逻辑
                }
            });
            client.connect();
        } catch (IOException e) {
            logger.error("@@ 连接到 MySQL 时发生错误", e);
            logger.error("@@ Error connecting to MySQL", e);
        }
    }
    private static void handleEvent(Event event) {
        logger.info("@@ 打印 event: {}", event);
        logger.info("@@ Received event type: {}", event.getHeader().getEventType());
        switch (event.getHeader().getEventType()) {
            case WRITE_ROWS:
            case EXT_WRITE_ROWS:
                handleWriteRowsEvent((WriteRowsEventData) event.getData());
                break;
            case QUERY:
                handleQueryEvent((QueryEventData) event.getData());
                break;
            case TABLE_MAP:
                handleTableMapEvent((TableMapEventData) event.getData());
                break;
            // 其他事件处理...
        }
    }
    private static void handleWriteRowsEvent(WriteRowsEventData eventData) {
        List<Serializable[]> rows = eventData.getRows();
        // 获取表名
        String tableName = getTableName(eventData);
        // 处理每一行数据
        for (Serializable[] row : rows) {
            // 根据需要调整以下代码以获取具体的列值
            String column1Value = row[0].toString();
            String column2Value = row[1].toString();
            // 将数据备份到另一个数据库
            backupToAnotherDatabase(tableName, column1Value, column2Value);
        }
    }
    private static void handleQueryEvent(QueryEventData eventData) {
        String sql = eventData.getSql();
        logger.info("@@ handleQueryEvent函数执行Query event SQL: {}", sql);
        // 解析SQL语句,根据需要处理
        // 例如,检查是否包含写入操作,然后执行相应的逻辑
    }
    private static void handleTableMapEvent(TableMapEventData eventData) {
        // 获取表映射信息,根据需要处理
        logger.info("@@ handleTableMapEvent函数执行TableMap event: {}", eventData);
    }
    private static String getTableName(EventData eventData) {
        // 获取表名的逻辑,可以使用TableMapEventData等信息
        // 根据实际情况实现
        return "example_table";
    }
    private static void backupToAnotherDatabase(String tableName, String column1Value, String column2Value) {
        // 将数据备份到另一个数据库的逻辑
        logger.info("Backup to another database: Table={}, Column1={}, Column2={}", tableName, column1Value, column2Value);
    }
}

总结

选择合适的 binlog 模式对数据库的性能和数据一致性至关重要:

到此这篇关于MySQL Binlog 日志监听与 Spring 集成实战的文章就介绍到这了,更多相关MySQL Binlog 日志监听内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文