如何利用Java实现MySQL的数据变化监听
作者:Huooya
在高并发和大数据环境下,实时获取 MySQL 数据库的增量变化对数据同步、数据分析、缓存更新等场景至关重要。MySQL 的 binlog(Binary Log) 记录了数据库的所有变更,可以用来实现 增量数据监听。本文将介绍如何利用 binlog 监听 MySQL 数据增量,并提供基于 Java 的 Canal 实现示例。
1.binlog 简介
1.1 什么是 binlog
binlog(Binary Log) 是 MySQL 记录 DDL(数据定义语言,如 CREATE、ALTER)和 DML(数据操作语言,如 INSERT、UPDATE、DELETE)的日志文件,它用于:
- 主从复制:MySQL 主库将 binlog 传输到从库,实现数据同步。
- 数据恢复:通过
mysqlbinlog工具解析 binlog 恢复数据。 - 数据同步:第三方工具(如 Canal)解析 binlog,进行数据同步。
1.2 binlog 的三种格式
| binlog 格式 | 说明 |
|---|---|
| STATEMENT | 记录 SQL 语句本身 |
| ROW | 记录行数据变更(推荐) |
| MIXED | 结合前两者,MySQL 自动判断 |
由于 ROW 格式能提供精确的行级别变更信息,因此推荐使用它。
2. 开启 binlog 并配置 MySQL
2.1 检查 binlog 是否开启
SHOW VARIABLES LIKE 'log_bin';
如果 log_bin 值为 OFF,说明 binlog 未开启。
2.2 修改 MySQL 配置文件(my.cnf 或 my.ini)
在 [mysqld] 部分添加以下内容:
server-id=1 log-bin=mysql-bin binlog-format=ROW binlog-row-image=FULL expire_logs_days=7
重启 MySQL:
systemctl restart mysql # Linux net stop mysql && net start mysql # Windows
2.3 验证 binlog 配置
执行:
SHOW BINARY LOGS;
如果有 binlog 文件,如 mysql-bin.000001,说明已开启。
3. 使用 Java 监听 binlog
3.1 选择工具:Canal
阿里巴巴开源的 Canal 可以模拟 MySQL 从库协议,解析 binlog 并实时推送增量数据。
3.2 Java 代码监听 binlog
引入 Maven 依赖
<dependencies>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.6</version>
</dependency>
</dependencies>编写 Java 代码
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import java.net.InetSocketAddress;
import java.util.List;
public class BinlogListener {
public static void main(String[] args) {
// 连接 Canal
CanalConnector connector = CanalConnectors.newSingleConnector(
new InetSocketAddress("127.0.0.1", 11111),
"example", "canal", "canal");
try {
connector.connect();
connector.subscribe(".*\\..*"); // 监听所有库表
connector.rollback();
while (true) {
Message message = connector.getWithoutAck(100); // 获取数据
long batchId = message.getId();
List<CanalEntry.Entry> entries = message.getEntries();
if (batchId != -1 && !entries.isEmpty()) {
for (CanalEntry.Entry entry : entries) {
if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
processEntry(entry);
}
}
}
connector.ack(batchId); // 确认消息
}
} finally {
connector.disconnect();
}
}
private static void processEntry(CanalEntry.Entry entry) {
try {
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
CanalEntry.EventType eventType = rowChange.getEventType();
System.out.println("变更表:" + entry.getHeader().getTableName());
System.out.println("变更类型:" + eventType);
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
if (eventType == CanalEntry.EventType.DELETE) {
System.out.println("删除数据:" + rowData.getBeforeColumnsList());
} else if (eventType == CanalEntry.EventType.INSERT) {
System.out.println("新增数据:" + rowData.getAfterColumnsList());
} else {
System.out.println("更新前数据:" + rowData.getBeforeColumnsList());
System.out.println("更新后数据:" + rowData.getAfterColumnsList());
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
4. 代码解析
1.创建 Canal 连接
CanalConnector connector = CanalConnectors.newSingleConnector(
new InetSocketAddress("127.0.0.1", 11111),
"example", "canal", "canal");
127.0.0.1:Canal 服务器地址11111:Canal 端口example:Canal 实例canal/canal:默认账号密码
2.获取 binlog 变更数据
Message message = connector.getWithoutAck(100);
getWithoutAck(100):拉取 100 条 binlog 事件。
3.解析 binlog
for (CanalEntry.Entry entry : entries) {
if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
processEntry(entry);
}
}
仅处理 ROWDATA 类型的变更,忽略事务等其他信息。
4.分类处理 INSERT、UPDATE、DELETE
if (eventType == CanalEntry.EventType.DELETE) {
System.out.println("删除数据:" + rowData.getBeforeColumnsList());
} else if (eventType == CanalEntry.EventType.INSERT) {
System.out.println("新增数据:" + rowData.getAfterColumnsList());
} else {
System.out.println("更新前数据:" + rowData.getBeforeColumnsList());
System.out.println("更新后数据:" + rowData.getAfterColumnsList());
}
总结
- MySQL binlog 记录数据库变更,可用于监听增量数据。
- Canal 作为 MySQL 从库解析 binlog,实现数据同步。
- Java 代码示例 展示如何用 Canal 监听
INSERT、UPDATE、DELETE操作,并解析变更数据。
这种方案适用于 分布式数据同步、缓存一致性 和 数据变更通知,是实时数据处理的重要手段。
到此这篇关于如何利用Java实现MySQL的数据变化监听的文章就介绍到这了,更多相关Java监听MySQL数据变化内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
